diff --git a/controller/src/controller.rs b/controller/src/controller.rs index 8ec15314e..c5b64434a 100644 --- a/controller/src/controller.rs +++ b/controller/src/controller.rs @@ -8,18 +8,17 @@ use tokio::time::{Duration, Instant}; use zerotier_network_hypervisor::protocol; use zerotier_network_hypervisor::protocol::{PacketBuffer, DEFAULT_MULTICAST_LIMIT, ZEROTIER_VIRTUAL_NETWORK_DEFAULT_MTU}; -use zerotier_network_hypervisor::vl1::{ - debug_event, HostSystem, Identity, InnerProtocol, Node, PacketHandlerResult, Path, PathFilter, Peer, -}; -use zerotier_network_hypervisor::vl2; +use zerotier_network_hypervisor::vl1::*; use zerotier_network_hypervisor::vl2::networkconfig::*; use zerotier_network_hypervisor::vl2::v1::Revocation; use zerotier_network_hypervisor::vl2::NetworkId; +use zerotier_network_hypervisor::vl2::{self, MulticastGroup}; use zerotier_utils::blob::Blob; use zerotier_utils::buffer::OutOfBoundsError; use zerotier_utils::dictionary::Dictionary; use zerotier_utils::error::InvalidParameterError; use zerotier_utils::reaper::Reaper; +use zerotier_utils::sync::RMaybeWLockGuard; use zerotier_utils::tokio; use zerotier_utils::{ms_monotonic, ms_since_epoch}; use zerotier_vl1_service::VL1Service; @@ -35,10 +34,19 @@ pub struct Controller { self_ref: Weak, service: RwLock>>, reaper: Reaper, - daemons: Mutex>>, // drop() aborts these runtime: tokio::runtime::Handle, database: Arc, local_identity: Identity, + + // Async tasks that should be killed when the controller is dropped. + daemons: Mutex>>, // drop() aborts these + + // Multicast "likes" recently received. + multicast_subscriptions: RwLock>>>, + + // Recently authorized network members and when that authorization expires (in monotonic ticks). + // Note that this is not and should not be used for real authentication, just for locking up multicast info. + recently_authorized: RwLock>, } impl Controller { @@ -52,10 +60,12 @@ impl Controller { self_ref: r.clone(), service: RwLock::new(Weak::default()), reaper: Reaper::new(&runtime), - daemons: Mutex::new(Vec::with_capacity(1)), runtime, database: database.clone(), local_identity, + daemons: Mutex::new(Vec::with_capacity(2)), + multicast_subscriptions: RwLock::new(HashMap::new()), + recently_authorized: RwLock::new(HashMap::new()), })) } else { Err(Box::new(InvalidParameterError( @@ -73,21 +83,63 @@ impl Controller { pub async fn start(&self, service: &Arc>) { *self.service.write().unwrap() = Arc::downgrade(service); + // Create database change listener. if let Some(cw) = self.database.changes().await.map(|mut ch| { - let self2 = self.self_ref.upgrade().unwrap(); + let self2 = self.self_ref.clone(); self.runtime.spawn(async move { loop { if let Ok(change) = ch.recv().await { - self2.reaper.add( - self2.runtime.spawn(self2.clone().handle_change_notification(change)), - Instant::now().checked_add(REQUEST_TIMEOUT).unwrap(), - ); + if let Some(self2) = self2.upgrade() { + self2.reaper.add( + self2.runtime.spawn(self2.clone().handle_change_notification(change)), + Instant::now().checked_add(REQUEST_TIMEOUT).unwrap(), + ); + } else { + break; + } } } }) }) { self.daemons.lock().unwrap().push(cw); } + + // Create background task to expire multicast subscriptions and recent authorizations. + let self2 = self.self_ref.clone(); + self.daemons.lock().unwrap().push(self.runtime.spawn(async move { + loop { + tokio::time::sleep(Duration::from_millis((protocol::VL2_DEFAULT_MULTICAST_LIKE_EXPIRE / 2) as u64)).await; + + if let Some(self2) = self2.upgrade() { + let time_ticks = ms_monotonic(); + let exp_before = time_ticks - protocol::VL2_DEFAULT_MULTICAST_LIKE_EXPIRE; + let mut empty_subscription_entries = Vec::new(); + + for (network_group, subs) in self2.multicast_subscriptions.read().unwrap().iter() { + let mut subs = subs.lock().unwrap(); + subs.retain(|_, t| *t > exp_before); + if subs.is_empty() { + empty_subscription_entries.push(network_group.clone()); + } + } + + if !empty_subscription_entries.is_empty() { + let mut ms = self2.multicast_subscriptions.write().unwrap(); + for e in empty_subscription_entries.iter() { + ms.remove(e); + } + } + + self2 + .recently_authorized + .write() + .unwrap() + .retain(|_, timeout| *timeout > time_ticks); + } else { + break; + } + } + })); } /// Compose and send network configuration packet. @@ -108,11 +160,11 @@ impl Controller { if let Some(in_re_message_id) = in_re_message_id { let ok_header = packet.append_struct_get_mut::()?; - ok_header.verb = protocol::verbs::VL1_OK; - ok_header.in_re_verb = protocol::verbs::VL2_VERB_NETWORK_CONFIG_REQUEST; + ok_header.verb = protocol::message_type::VL1_OK; + ok_header.in_re_verb = protocol::message_type::VL2_NETWORK_CONFIG_REQUEST; ok_header.in_re_message_id = in_re_message_id.to_be_bytes(); } else { - packet.append_u8(protocol::verbs::VL2_VERB_NETWORK_CONFIG)?; + packet.append_u8(protocol::message_type::VL2_NETWORK_CONFIG)?; } if peer.is_v2() { @@ -149,6 +201,8 @@ impl Controller { } } + fn send_revocations(&self, peer: &Peer, revocations: &Vec) {} + /// Called when the DB informs us of a change. async fn handle_change_notification(self: Arc, change: Change) { match change { @@ -267,6 +321,7 @@ impl Controller { nc.revision = now as u64; nc.mtu = network.mtu.unwrap_or(ZEROTIER_VIRTUAL_NETWORK_DEFAULT_MTU as u16); nc.multicast_limit = network.multicast_limit.unwrap_or(DEFAULT_MULTICAST_LIMIT as u32); + nc.multicast_like_expire = Some(protocol::VL2_DEFAULT_MULTICAST_LIKE_EXPIRE as u32); nc.routes = network.ip_routes; nc.static_ips = member.ip_assignments.clone(); nc.rules = network.rules; @@ -335,6 +390,12 @@ impl Controller { // TODO: populate node info for V2 networks } + let _ = self + .recently_authorized + .write() + .unwrap() + .insert((network_id, source_identity.fingerprint), ms_monotonic() + nc.credential_ttl); + network_config = Some(nc); } @@ -346,9 +407,6 @@ impl Controller { } } -// Default PathFilter implementations permit anything. -impl PathFilter for Controller {} - impl InnerProtocol for Controller { fn handle_packet( &self, @@ -363,7 +421,11 @@ impl InnerProtocol for Controller { mut cursor: usize, ) -> PacketHandlerResult { match verb { - protocol::verbs::VL2_VERB_NETWORK_CONFIG_REQUEST => { + protocol::message_type::VL2_NETWORK_CONFIG_REQUEST => { + if !host_system.should_respond_to(&source.identity) { + return PacketHandlerResult::Ok; // handled and ignored + } + let network_id = payload.read_u64(&mut cursor); if network_id.is_err() { return PacketHandlerResult::Error; @@ -400,19 +462,6 @@ impl InnerProtocol for Controller { Dictionary::new() }; - /* - let (have_revision, have_timestamp) = if (cursor + 16) <= payload.len() { - let r = payload.read_u64(&mut cursor); - let t = payload.read_u64(&mut cursor); - if r.is_err() || t.is_err() { - return PacketHandlerResult::Error; - } - (Some(r.unwrap()), Some(t.unwrap())) - } else { - (None, None) - }; - */ - // Launch handler as an async background task. let (self2, peer, source_remote_endpoint) = (self.self_ref.upgrade().unwrap(), source.clone(), source_path.endpoint.clone()); @@ -427,6 +476,9 @@ impl InnerProtocol for Controller { Result::Ok((result, Some(config), revocations)) => { //println!("{}", serde_yaml::to_string(&config).unwrap()); self2.send_network_config(peer.as_ref(), &config, Some(message_id)); + if let Some(revocations) = revocations { + self2.send_revocations(peer.as_ref(), &revocations); + } (result, Some(config)) } Result::Ok((result, None, _)) => (result, None), @@ -448,6 +500,8 @@ impl InnerProtocol for Controller { } else { meta_data.to_bytes() }, + peer_version: peer.version(), + peer_protocol_version: peer.protocol_version(), timestamp: now, source_remote_endpoint, source_hops, @@ -461,6 +515,105 @@ impl InnerProtocol for Controller { PacketHandlerResult::Ok } + + protocol::message_type::VL2_MULTICAST_LIKE => { + let mut subscriptions = RMaybeWLockGuard::new_read(&self.multicast_subscriptions); + let auth = self.recently_authorized.read().unwrap(); + let time_ticks = ms_monotonic(); + + while payload.len() >= (8 + 6 + 4) { + let network_id = NetworkId::from_bytes_fixed(payload.read_bytes_fixed(&mut cursor).unwrap()); + if let Some(network_id) = network_id { + let mac = MAC::from_bytes_fixed(payload.read_bytes_fixed(&mut cursor).unwrap()); + if let Some(mac) = mac { + if auth + .get(&(network_id, source.identity.fingerprint)) + .map_or(false, |t| *t > time_ticks) + { + let sub_key = (network_id, MulticastGroup { mac, adi: payload.read_u32(&mut cursor).unwrap() }); + if let Some(sub) = subscriptions.read().get(&sub_key) { + let _ = sub.lock().unwrap().insert(source.identity.address, time_ticks); + } else { + let _ = subscriptions + .write(&self.multicast_subscriptions) + .entry(sub_key) + .or_insert_with(|| Mutex::new(HashMap::new())) + .lock() + .unwrap() + .insert(source.identity.address, time_ticks); + } + } + } + } + } + + PacketHandlerResult::Ok + } + + protocol::message_type::VL2_MULTICAST_GATHER => { + if let Some(service) = self.service.read().unwrap().upgrade() { + if let Some(network_id) = payload + .read_bytes_fixed(&mut cursor) + .map_or(None, |network_id| NetworkId::from_bytes_fixed(network_id)) + { + let time_ticks = ms_monotonic(); + if self + .recently_authorized + .read() + .unwrap() + .get(&(network_id, source.identity.fingerprint)) + .map_or(false, |t| *t > time_ticks) + { + cursor += 1; // skip flags, currently unused + if let Some(mac) = payload.read_bytes_fixed(&mut cursor).map_or(None, |mac| MAC::from_bytes_fixed(mac)) { + let mut gathered = Vec::new(); + let adi = payload.read_u32(&mut cursor).unwrap_or(0); + let subscriptions = self.multicast_subscriptions.read().unwrap(); + if let Some(sub) = subscriptions.get(&(network_id, MulticastGroup { mac, adi })) { + let sub = sub.lock().unwrap(); + for a in sub.keys() { + gathered.push(*a); + } + } + + while !gathered.is_empty() { + source.send( + service.as_ref(), + service.node(), + None, + time_ticks, + |packet| -> Result<(), OutOfBoundsError> { + let ok_header = packet.append_struct_get_mut::()?; + ok_header.verb = protocol::message_type::VL1_OK; + ok_header.in_re_verb = protocol::message_type::VL2_MULTICAST_GATHER; + ok_header.in_re_message_id = message_id.to_be_bytes(); + + packet.append_bytes_fixed(&network_id.to_bytes())?; + packet.append_bytes_fixed(&mac.to_bytes())?; + packet.append_u32(adi)?; + packet.append_u32(gathered.len() as u32)?; + + let in_this_packet = gathered + .len() + .clamp(1, (packet.capacity() - packet.len()) / protocol::ADDRESS_SIZE) + .min(u16::MAX as usize); + + packet.append_u16(in_this_packet as u16)?; + for _ in 0..in_this_packet { + packet.append_bytes_fixed(&gathered.pop().unwrap().to_bytes())?; + } + + Ok(()) + }, + ); + } + } + } + } + } + PacketHandlerResult::Ok + } + _ => PacketHandlerResult::NotHandled, } } @@ -497,11 +650,17 @@ impl InnerProtocol for Controller { ) -> PacketHandlerResult { PacketHandlerResult::NotHandled } +} - fn should_respond_to(&self, _: &Identity) -> bool { - // Controllers respond to anyone. +impl VL1AuthProvider for Controller { + #[inline(always)] + fn should_respond_to(&self, id: &Identity) -> bool { true } + + fn has_trust_relationship(&self, id: &Identity) -> bool { + false + } } impl Drop for Controller { diff --git a/controller/src/model/mod.rs b/controller/src/model/mod.rs index 0767965cd..3e2c6e5fc 100644 --- a/controller/src/model/mod.rs +++ b/controller/src/model/mod.rs @@ -82,26 +82,40 @@ impl ToString for AuthorizationResult { #[derive(Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct RequestLogItem { - #[serde(rename = "nwid")] + #[serde(rename = "nw")] pub network_id: NetworkId, - #[serde(rename = "nid")] + #[serde(rename = "n")] pub node_id: Address, #[serde(rename = "nf")] pub node_fingerprint: Blob<48>, - #[serde(rename = "cid")] + #[serde(rename = "c")] pub controller_node_id: Address, + #[serde(skip_serializing_if = "Vec::is_empty")] #[serde(default)] #[serde(rename = "md")] pub metadata: Vec, + + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(default)] + #[serde(rename = "pv")] + pub peer_version: Option<(u8, u8, u16)>, + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(default)] + #[serde(rename = "ppv")] + pub peer_protocol_version: Option, + #[serde(rename = "ts")] pub timestamp: i64, + #[serde(rename = "s")] pub source_remote_endpoint: Endpoint, #[serde(rename = "sh")] pub source_hops: u8, + #[serde(rename = "r")] pub result: AuthorizationResult, + #[serde(rename = "nc")] pub config: Option, } diff --git a/network-hypervisor/src/protocol.rs b/network-hypervisor/src/protocol.rs index 9d0425052..f6851a5b3 100644 --- a/network-hypervisor/src/protocol.rs +++ b/network-hypervisor/src/protocol.rs @@ -73,7 +73,8 @@ pub type PacketBufferPool = Pool; /// 64-bit message ID (obtained after AEAD decryption). pub type MessageId = u64; -pub mod verbs { +/// ZeroTier VL1 and VL2 wire protocol message types. +pub mod message_type { pub const VL1_NOP: u8 = 0x00; pub const VL1_HELLO: u8 = 0x01; pub const VL1_ERROR: u8 = 0x02; @@ -84,10 +85,10 @@ pub mod verbs { pub const VL1_PUSH_DIRECT_PATHS: u8 = 0x10; pub const VL1_USER_MESSAGE: u8 = 0x14; - pub const VL2_VERB_MULTICAST_LIKE: u8 = 0x09; - pub const VL2_VERB_NETWORK_CONFIG_REQUEST: u8 = 0x0b; - pub const VL2_VERB_NETWORK_CONFIG: u8 = 0x0c; - pub const VL2_VERB_MULTICAST_GATHER: u8 = 0x0d; + pub const VL2_MULTICAST_LIKE: u8 = 0x09; + pub const VL2_NETWORK_CONFIG_REQUEST: u8 = 0x0b; + pub const VL2_NETWORK_CONFIG: u8 = 0x0c; + pub const VL2_MULTICAST_GATHER: u8 = 0x0d; pub fn name(verb: u8) -> &'static str { match verb { @@ -100,10 +101,10 @@ pub mod verbs { VL1_ECHO => "VL1_ECHO", VL1_PUSH_DIRECT_PATHS => "VL1_PUSH_DIRECT_PATHS", VL1_USER_MESSAGE => "VL1_USER_MESSAGE", - VL2_VERB_MULTICAST_LIKE => "VL2_VERB_MULTICAST_LIKE", - VL2_VERB_NETWORK_CONFIG_REQUEST => "VL2_VERB_NETWORK_CONFIG_REQUEST", - VL2_VERB_NETWORK_CONFIG => "VL2_VERB_NETWORK_CONFIG", - VL2_VERB_MULTICAST_GATHER => "VL2_VERB_MULTICAST_GATHER", + VL2_MULTICAST_LIKE => "VL2_MULTICAST_LIKE", + VL2_NETWORK_CONFIG_REQUEST => "VL2_NETWORK_CONFIG_REQUEST", + VL2_NETWORK_CONFIG => "VL2_NETWORK_CONFIG", + VL2_MULTICAST_GATHER => "VL2_MULTICAST_GATHER", _ => "???", } } @@ -588,9 +589,8 @@ pub(crate) const PEER_EXPIRATION_TIME: i64 = (PEER_HELLO_INTERVAL_MAX * 2) + 100 /// Proof of work difficulty (threshold) for identity generation. pub(crate) const IDENTITY_POW_THRESHOLD: u8 = 17; -/// Maximum number of key/value pairs in a single Tag credential. -/// (This is for V2 only. In V1 tag credentials can have only one pair.) -pub(crate) const MAX_TAG_KEY_VALUE_PAIRS: usize = 128; +// Multicast LIKE expire time in milliseconds. +pub const VL2_DEFAULT_MULTICAST_LIKE_EXPIRE: i64 = 600000; #[cfg(test)] mod tests { diff --git a/network-hypervisor/src/vl1/mod.rs b/network-hypervisor/src/vl1/mod.rs index b3cf1a9ef..6725397ef 100644 --- a/network-hypervisor/src/vl1/mod.rs +++ b/network-hypervisor/src/vl1/mod.rs @@ -19,7 +19,7 @@ pub use event::Event; pub use identity::Identity; pub use inetaddress::InetAddress; pub use mac::MAC; -pub use node::{DummyInnerProtocol, DummyPathFilter, HostSystem, InnerProtocol, Node, NodeStorage, PacketHandlerResult, PathFilter}; +pub use node::{DummyInnerProtocol, HostSystem, InnerProtocol, Node, NodeStorage, PacketHandlerResult, VL1AuthProvider}; pub use path::Path; pub use peer::Peer; pub use rootset::{Root, RootSet}; diff --git a/network-hypervisor/src/vl1/node.rs b/network-hypervisor/src/vl1/node.rs index f5e3e7a4e..5e80cadc3 100644 --- a/network-hypervisor/src/vl1/node.rs +++ b/network-hypervisor/src/vl1/node.rs @@ -27,17 +27,42 @@ use zerotier_utils::marshalable::Marshalable; use zerotier_utils::ringbuffer::RingBuffer; use zerotier_utils::thing::Thing; -/// Trait implemented by external code to handle events and provide an interface to the system or application. +/// Trait providing VL1 authentication functions to determine which nodes we should talk to. /// -/// These methods are basically callbacks that the core calls to request or transmit things. They are called -/// during calls to things like wire_recieve() and do_background_tasks(). -pub trait HostSystem: Sync + Send + 'static { +/// This is included in HostSystem but is provided as a separate trait to make it easy for +/// implementers of HostSystem to break this out and allow a user to specify it. +pub trait VL1AuthProvider: Sync + Send { + /// Check if this node should respond to messages from a given peer at all. + /// + /// If this returns false, the node simply drops messages on the floor and refuses + /// to init V2 sessions. + fn should_respond_to(&self, id: &Identity) -> bool; + + /// Check if this node has any trust relationship with the provided identity. + /// + /// This should return true if there is any special trust relationship such as mutual + /// membership in a network or for controllers the peer's membership in any network + /// they control. + fn has_trust_relationship(&self, id: &Identity) -> bool; +} + +/// Trait to be implemented by outside code to provide object storage to VL1 +/// +/// This is included in HostSystem but is provided as a separate trait to make it easy for +/// implementers of HostSystem to break this out and allow a user to specify it. +pub trait NodeStorage: Sync + Send { + /// Load this node's identity from the data store. + fn load_node_identity(&self) -> Option; + + /// Save this node's identity to the data store. + fn save_node_identity(&self, id: &Identity); +} + +/// Trait implemented by external code to handle events and provide an interface to the system or application. +pub trait HostSystem: VL1AuthProvider + NodeStorage + 'static { /// Type for implementation of NodeStorage. type Storage: NodeStorage + ?Sized; - /// Path filter implementation for this host. - type PathFilter: PathFilter + ?Sized; - /// Type for local system sockets. type LocalSocket: Sync + Send + Hash + PartialEq + Eq + Clone + ToString + Sized + 'static; @@ -50,9 +75,6 @@ pub trait HostSystem: Sync + Send + 'static { /// Get a reference to the local storage implementation at this host. fn storage(&self) -> &Self::Storage; - /// Get the path filter implementation for this host. - fn path_filter(&self) -> &Self::PathFilter; - /// Get a pooled packet buffer for internal use. fn get_buffer(&self) -> PooledPacketBuffer; @@ -84,26 +106,6 @@ pub trait HostSystem: Sync + Send + 'static { packet_ttl: u8, ); - /// Called to get the current time in milliseconds from the system monotonically increasing clock. - /// This needs to be accurate to about 250 milliseconds resolution or better. - fn time_ticks(&self) -> i64; - - /// Called to get the current time in milliseconds since epoch from the real-time clock. - /// This needs to be accurate to about one second resolution or better. - fn time_clock(&self) -> i64; -} - -/// Trait to be implemented by outside code to provide object storage to VL1 -pub trait NodeStorage: Sync + Send { - /// Load this node's identity from the data store. - fn load_node_identity(&self) -> Option; - - /// Save this node's identity to the data store. - fn save_node_identity(&self, id: &Identity); -} - -/// Trait to be implemented to provide path hints and a filter to approve physical paths. -pub trait PathFilter: Sync + Send { /// Called to check and see if a physical address should be used for ZeroTier traffic to a node. /// /// The default implementation always returns true. @@ -134,6 +136,14 @@ pub trait PathFilter: Sync + Send { > { None } + + /// Called to get the current time in milliseconds from the system monotonically increasing clock. + /// This needs to be accurate to about 250 milliseconds resolution or better. + fn time_ticks(&self) -> i64; + + /// Called to get the current time in milliseconds since epoch from the real-time clock. + /// This needs to be accurate to about one second resolution or better. + fn time_clock(&self) -> i64; } /// Result of a packet handler. @@ -199,9 +209,6 @@ pub trait InnerProtocol: Sync + Send { payload: &PacketBuffer, cursor: usize, ) -> PacketHandlerResult; - - /// Check if this node should respond to messages from a given peer. - fn should_respond_to(&self, id: &Identity) -> bool; } /// How often to check the root cluster definitions against the root list and update. @@ -949,7 +956,7 @@ impl Node { while !addresses.is_empty() { if !root .send(host_system, self, None, time_ticks, |packet| -> Result<(), Infallible> { - assert!(packet.append_u8(verbs::VL1_WHOIS).is_ok()); + assert!(packet.append_u8(message_type::VL1_WHOIS).is_ok()); while !addresses.is_empty() && (packet.len() + ADDRESS_SIZE) <= UDP_DEFAULT_MTU { assert!(packet.append_bytes_fixed(&addresses[0].to_bytes()).is_ok()); addresses = &addresses[1..]; @@ -978,7 +985,7 @@ impl Node { let mut whois_queue = self.whois_queue.lock().unwrap(); if let Some(qi) = whois_queue.get_mut(&received_identity.address) { let address = received_identity.address; - if inner.should_respond_to(&received_identity) { + if host_system.should_respond_to(&received_identity) { let mut peers = self.peers.write().unwrap(); if let Some(peer) = peers.get(&address).cloned().or_else(|| { Peer::new(&self.identity, received_identity, time_ticks) @@ -1154,15 +1161,16 @@ impl InnerProtocol for DummyInnerProtocol { ) -> PacketHandlerResult { PacketHandlerResult::NotHandled } +} + +impl VL1AuthProvider for DummyInnerProtocol { + #[inline(always)] + fn should_respond_to(&self, id: &Identity) -> bool { + true + } #[inline(always)] - fn should_respond_to(&self, _id: &Identity) -> bool { + fn has_trust_relationship(&self, id: &Identity) -> bool { true } } - -/// Dummy no-op path filter for debugging and testing. -#[derive(Default)] -pub struct DummyPathFilter; - -impl PathFilter for DummyPathFilter {} diff --git a/network-hypervisor/src/vl1/peer.rs b/network-hypervisor/src/vl1/peer.rs index a59c772e2..16a2a20a1 100644 --- a/network-hypervisor/src/vl1/peer.rs +++ b/network-hypervisor/src/vl1/peer.rs @@ -424,7 +424,7 @@ impl Peer { f.0.dest = self.identity.address.to_bytes(); f.0.src = node.identity.address.to_bytes(); f.0.flags_cipher_hops = v1::CIPHER_NOCRYPT_POLY1305; - f.1.verb = verbs::VL1_HELLO | v1::VERB_FLAG_EXTENDED_AUTHENTICATION; + f.1.verb = message_type::VL1_HELLO | v1::VERB_FLAG_EXTENDED_AUTHENTICATION; f.1.version_proto = PROTOCOL_VERSION; f.1.version_major = VERSION_MAJOR; f.1.version_minor = VERSION_MINOR; @@ -541,14 +541,16 @@ impl Peer { host_system, "[vl1] #{:0>16x} decrypted and authenticated, verb: {} ({:0>2x})", u64::from_be_bytes(packet_header.id), - verbs::name(verb), + message_type::name(verb), verb as u32 ); return match verb { - verbs::VL1_NOP => PacketHandlerResult::Ok, - verbs::VL1_HELLO => self.handle_incoming_hello(host_system, inner, node, time_ticks, message_id, source_path, &payload), - verbs::VL1_ERROR => self.handle_incoming_error( + message_type::VL1_NOP => PacketHandlerResult::Ok, + message_type::VL1_HELLO => { + self.handle_incoming_hello(host_system, inner, node, time_ticks, message_id, source_path, &payload) + } + message_type::VL1_ERROR => self.handle_incoming_error( host_system, inner, node, @@ -558,7 +560,7 @@ impl Peer { message_id, &payload, ), - verbs::VL1_OK => self.handle_incoming_ok( + message_type::VL1_OK => self.handle_incoming_ok( host_system, inner, node, @@ -569,15 +571,17 @@ impl Peer { path_is_known, &payload, ), - verbs::VL1_WHOIS => self.handle_incoming_whois(host_system, inner, node, time_ticks, message_id, &payload), - verbs::VL1_RENDEZVOUS => { + message_type::VL1_WHOIS => self.handle_incoming_whois(host_system, inner, node, time_ticks, message_id, &payload), + message_type::VL1_RENDEZVOUS => { self.handle_incoming_rendezvous(host_system, node, time_ticks, message_id, source_path, &payload) } - verbs::VL1_ECHO => self.handle_incoming_echo(host_system, inner, node, time_ticks, message_id, &payload), - verbs::VL1_PUSH_DIRECT_PATHS => { + message_type::VL1_ECHO => self.handle_incoming_echo(host_system, inner, node, time_ticks, message_id, &payload), + message_type::VL1_PUSH_DIRECT_PATHS => { self.handle_incoming_push_direct_paths(host_system, node, time_ticks, source_path, &payload) } - verbs::VL1_USER_MESSAGE => self.handle_incoming_user_message(host_system, node, time_ticks, source_path, &payload), + message_type::VL1_USER_MESSAGE => { + self.handle_incoming_user_message(host_system, node, time_ticks, source_path, &payload) + } _ => inner.handle_packet( host_system, node, @@ -606,7 +610,7 @@ impl Peer { source_path: &Arc, payload: &PacketBuffer, ) -> PacketHandlerResult { - if !(inner.should_respond_to(&self.identity) || node.this_node_is_root() || node.is_peer_root(self)) { + if !(host_system.should_respond_to(&self.identity) || node.this_node_is_root() || node.is_peer_root(self)) { debug_event!( host_system, "[vl1] dropping HELLO from {} due to lack of trust relationship", @@ -637,8 +641,8 @@ impl Peer { |packet| -> Result<(), Infallible> { let f: &mut (OkHeader, v1::message_component_structs::OkHelloFixedHeaderFields) = packet.append_struct_get_mut().unwrap(); - f.0.verb = verbs::VL1_OK; - f.0.in_re_verb = verbs::VL1_HELLO; + f.0.verb = message_type::VL1_OK; + f.0.in_re_verb = message_type::VL1_HELLO; f.0.in_re_message_id = message_id.to_ne_bytes(); f.1.timestamp_echo = hello_fixed_headers.timestamp; f.1.version_proto = PROTOCOL_VERSION; @@ -714,7 +718,7 @@ impl Peer { // TODO: replay attack prevention filter match ok_header.in_re_verb { - verbs::VL1_HELLO => { + message_type::VL1_HELLO => { if let Ok(_ok_hello_fixed_header_fields) = payload.read_struct::(&mut cursor) { @@ -750,7 +754,7 @@ impl Peer { } } - verbs::VL1_WHOIS => { + message_type::VL1_WHOIS => { debug_event!(host_system, "[vl1] OK(WHOIS)"); if node.is_peer_root(self) { while cursor < payload.len() { @@ -806,7 +810,7 @@ impl Peer { message_id: MessageId, payload: &PacketBuffer, ) -> PacketHandlerResult { - if node.this_node_is_root() || inner.should_respond_to(&self.identity) { + if node.this_node_is_root() || host_system.should_respond_to(&self.identity) { let mut addresses = payload.as_bytes(); while addresses.len() >= ADDRESS_SIZE { if !self @@ -852,11 +856,11 @@ impl Peer { message_id: MessageId, payload: &PacketBuffer, ) -> PacketHandlerResult { - if inner.should_respond_to(&self.identity) || node.is_peer_root(self) { + if host_system.should_respond_to(&self.identity) || node.is_peer_root(self) { self.send(host_system, node, None, time_ticks, |packet| { let mut f: &mut OkHeader = packet.append_struct_get_mut().unwrap(); - f.verb = verbs::VL1_OK; - f.in_re_verb = verbs::VL1_ECHO; + f.verb = message_type::VL1_OK; + f.in_re_verb = message_type::VL1_ECHO; f.in_re_message_id = message_id.to_ne_bytes(); packet.append_bytes(payload.as_bytes()) }); @@ -935,7 +939,7 @@ fn v1_proto_try_aead_decrypt( if cipher == v1::CIPHER_SALSA2012_POLY1305 { salsa.crypt_in_place(payload.as_bytes_mut()); Some(message_id) - } else if (payload.u8_at(0).unwrap_or(0) & v1::VERB_MASK) == verbs::VL1_HELLO { + } else if (payload.u8_at(0).unwrap_or(0) & v1::VERB_MASK) == message_type::VL1_HELLO { Some(message_id) } else { // SECURITY: fail if there is no encryption and the message is not HELLO. No other types are allowed diff --git a/network-hypervisor/src/vl2/mod.rs b/network-hypervisor/src/vl2/mod.rs index 3e93e3c29..6cc318e55 100644 --- a/network-hypervisor/src/vl2/mod.rs +++ b/network-hypervisor/src/vl2/mod.rs @@ -4,6 +4,7 @@ mod multicastgroup; mod networkid; mod switch; +pub mod multicastauthority; pub mod networkconfig; pub mod rule; pub mod v1; diff --git a/network-hypervisor/src/vl2/multicastauthority.rs b/network-hypervisor/src/vl2/multicastauthority.rs new file mode 100644 index 000000000..28b44a3db --- /dev/null +++ b/network-hypervisor/src/vl2/multicastauthority.rs @@ -0,0 +1,44 @@ +use std::collections::HashMap; +use std::sync::{Arc, Mutex, RwLock}; + +use crate::protocol; +use crate::protocol::PacketBuffer; +use crate::vl1::{Address, HostSystem, Identity, PacketHandlerResult, Peer}; +use crate::vl2::{MulticastGroup, NetworkId}; + +/// Handler implementations for VL2_MULTICAST_LIKE and VL2_MULTICAST_GATHER. +/// +/// Both controllers and roots will want to handle these, with the latter supporting them for legacy +/// reasons only. Regular nodes may also want to handle them in the future. So, break this out to allow +/// easy code reuse. To integrate call the appropriate method when the appropriate message type is +/// received and pass in a function to check whether specific network/identity combinations should be +/// processed. The GATHER implementation will send reply packets to the source peer. +pub struct MulticastAuthority { + subscriptions: RwLock>>>, +} + +impl MulticastAuthority { + fn handle_vl2_multicast_like bool>( + &self, + auth: Authenticator, + host_system: &HostSystemImpl, + source: &Arc, + message_id: u64, + payload: &PacketBuffer, + mut cursor: usize, + ) -> PacketHandlerResult { + PacketHandlerResult::Ok + } + + fn handle_vl2_multicast_gather bool>( + &self, + auth: Authenticator, + host_system: &HostSystemImpl, + source: &Arc, + message_id: u64, + payload: &PacketBuffer, + mut cursor: usize, + ) -> PacketHandlerResult { + PacketHandlerResult::Ok + } +} diff --git a/network-hypervisor/src/vl2/multicastgroup.rs b/network-hypervisor/src/vl2/multicastgroup.rs index fe7f1be09..f5cdcf0e2 100644 --- a/network-hypervisor/src/vl2/multicastgroup.rs +++ b/network-hypervisor/src/vl2/multicastgroup.rs @@ -1,8 +1,11 @@ // (c) 2020-2022 ZeroTier, Inc. -- currently propritery pending actual release and licensing. See LICENSE.md. -use crate::vl1::MAC; -use std::cmp::Ordering; use std::hash::{Hash, Hasher}; +use std::str::FromStr; + +use crate::vl1::MAC; + +use serde::{Deserialize, Deserializer, Serialize, Serializer}; #[derive(Clone, Copy, PartialEq, Eq)] pub struct MulticastGroup { @@ -24,23 +27,6 @@ impl From for MulticastGroup { } } -impl Ord for MulticastGroup { - fn cmp(&self, other: &Self) -> Ordering { - let o = self.mac.cmp(&other.mac); - match o { - Ordering::Equal => self.adi.cmp(&other.adi), - _ => o, - } - } -} - -impl PartialOrd for MulticastGroup { - #[inline(always)] - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - impl Hash for MulticastGroup { #[inline(always)] fn hash(&self, state: &mut H) { @@ -48,3 +34,74 @@ impl Hash for MulticastGroup { state.write_u32(self.adi); } } +impl Serialize for MulticastGroup { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + if serializer.is_human_readable() { + serializer.serialize_str(format!("{}/{}", self.mac.to_string(), self.adi.to_string()).as_str()) + } else { + let mut tmp = [0u8; 10]; + tmp[0..6].copy_from_slice(&self.mac.to_bytes()); + tmp[6..10].copy_from_slice(&self.adi.to_be_bytes()); + serializer.serialize_bytes(&tmp) + } + } +} + +struct MulticastGroupVisitor; + +impl<'de> serde::de::Visitor<'de> for MulticastGroupVisitor { + type Value = MulticastGroup; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("a ZeroTier network ID") + } + + fn visit_bytes(self, v: &[u8]) -> Result + where + E: serde::de::Error, + { + if v.len() == 10 { + Ok(MulticastGroup { + mac: MAC::from_bytes(&v[..6]).ok_or(E::custom("invalid MAC address"))?, + adi: u32::from_be_bytes((&v[6..10]).try_into().unwrap()), + }) + } else { + Err(E::custom("object too large")) + } + } + + fn visit_str(self, v: &str) -> Result + where + E: serde::de::Error, + { + let mut mac = None; + let mut adi: u32 = 0; + let mut x = 0; + for f in v.split('/') { + if x == 0 { + mac = Some(MAC::from_str(f).map_err(|_| E::custom("invalid MAC address"))?); + x = 1; + } else { + adi = u32::from_str(f).unwrap_or(0); + break; + } + } + Ok(MulticastGroup { mac: mac.ok_or(E::custom("invalid MAC address"))?, adi }) + } +} + +impl<'de> Deserialize<'de> for MulticastGroup { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + if deserializer.is_human_readable() { + deserializer.deserialize_str(MulticastGroupVisitor) + } else { + deserializer.deserialize_bytes(MulticastGroupVisitor) + } + } +} diff --git a/network-hypervisor/src/vl2/networkconfig.rs b/network-hypervisor/src/vl2/networkconfig.rs index 100212451..28237079b 100644 --- a/network-hypervisor/src/vl2/networkconfig.rs +++ b/network-hypervisor/src/vl2/networkconfig.rs @@ -53,6 +53,11 @@ pub struct NetworkConfig { /// Suggested horizon limit for multicast (not a hard limit, but 0 disables multicast) pub multicast_limit: u32, + /// Multicast "like" expire time in milliseconds (default if omitted). + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(default)] + pub multicast_like_expire: Option, + /// ZeroTier-assigned L3 routes for this node. #[serde(skip_serializing_if = "HashSet::is_empty")] #[serde(default)] @@ -107,6 +112,7 @@ impl NetworkConfig { revision: 0, mtu: 0, multicast_limit: 0, + multicast_like_expire: None, routes: HashSet::new(), static_ips: HashSet::new(), rules: Vec::new(), diff --git a/network-hypervisor/src/vl2/switch.rs b/network-hypervisor/src/vl2/switch.rs index 52f6a2d94..60b177971 100644 --- a/network-hypervisor/src/vl2/switch.rs +++ b/network-hypervisor/src/vl2/switch.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use crate::protocol::PacketBuffer; use crate::vl1::node::{HostSystem, InnerProtocol, Node, PacketHandlerResult}; -use crate::vl1::{Identity, Path, Peer}; +use crate::vl1::{Path, Peer}; pub trait SwitchInterface: Sync + Send {} @@ -59,10 +59,6 @@ impl InnerProtocol for Switch { ) -> PacketHandlerResult { PacketHandlerResult::NotHandled } - - fn should_respond_to(&self, id: &Identity) -> bool { - true - } } impl Switch {} diff --git a/service/src/main.rs b/service/src/main.rs index 4a94ef5e1..7520ad7f7 100644 --- a/service/src/main.rs +++ b/service/src/main.rs @@ -210,9 +210,13 @@ fn main() { drop(global_args); // free unnecessary heap before starting service as we're done with CLI args if let Ok(_tokio_runtime) = zerotier_utils::tokio::runtime::Builder::new_multi_thread().enable_all().build() { let test_inner = Arc::new(zerotier_network_hypervisor::vl1::DummyInnerProtocol::default()); - let test_path_filter = Arc::new(zerotier_network_hypervisor::vl1::DummyPathFilter::default()); let datadir = open_datadir(&flags); - let svc = VL1Service::new(datadir, test_inner, test_path_filter, zerotier_vl1_service::VL1Settings::default()); + let svc = VL1Service::new( + datadir, + test_inner.clone(), + test_inner, + zerotier_vl1_service::VL1Settings::default(), + ); if svc.is_ok() { let svc = svc.unwrap(); svc.node().init_default_roots(); diff --git a/utils/src/arrayvec.rs b/utils/src/arrayvec.rs index a76535bf5..7dc4cee75 100644 --- a/utils/src/arrayvec.rs +++ b/utils/src/arrayvec.rs @@ -178,6 +178,11 @@ impl ArrayVec { self.s } + #[inline(always)] + pub fn capacity_remaining(&self) -> usize { + C - self.s + } + #[inline] pub fn pop(&mut self) -> Option { if self.s > 0 { diff --git a/utils/src/lib.rs b/utils/src/lib.rs index de7bd725e..a08df3bfa 100644 --- a/utils/src/lib.rs +++ b/utils/src/lib.rs @@ -17,6 +17,7 @@ pub mod memory; pub mod pool; pub mod ringbuffer; pub mod ringbuffermap; +pub mod sync; pub mod thing; pub mod varint; diff --git a/utils/src/sync.rs b/utils/src/sync.rs new file mode 100644 index 000000000..edebd1f46 --- /dev/null +++ b/utils/src/sync.rs @@ -0,0 +1,39 @@ +use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; + +/// Variant version of lock for RwLock with automatic conversion to a write lock as needed. +pub enum RMaybeWLockGuard<'a, T> { + R(Option>), + W(RwLockWriteGuard<'a, T>), +} + +impl<'a, T> RMaybeWLockGuard<'a, T> { + #[inline(always)] + pub fn new_read(l: &'a RwLock) -> Self { + Self::R(Some(l.read().unwrap())) + } + + /// Get a readable reference to the object. + #[inline] + pub fn read(&self) -> &T { + match self { + Self::R(r) => &*(r.as_ref().unwrap()), + Self::W(w) => &*w, + } + } + + /// Get a writable reference to the object, converting this to a write lock if needed. + #[inline] + pub fn write(&mut self, l: &'a RwLock) -> &mut T { + match self { + Self::R(r) => { + let _ = r.take(); + *self = Self::W(l.write().unwrap()); + match self { + Self::W(w) => &mut *w, + _ => panic!(), + } + } + Self::W(w) => &mut *w, + } + } +} diff --git a/vl1-service/src/vl1service.rs b/vl1-service/src/vl1service.rs index 18be9a497..70eafc6a8 100644 --- a/vl1-service/src/vl1service.rs +++ b/vl1-service/src/vl1service.rs @@ -28,13 +28,13 @@ const UPDATE_UDP_BINDINGS_EVERY_SECS: usize = 10; /// a test harness or just the controller for a controller that runs stand-alone. pub struct VL1Service< NodeStorageImpl: NodeStorage + ?Sized + 'static, - PathFilterImpl: PathFilter + ?Sized + 'static, + VL1AuthProviderImpl: VL1AuthProvider + ?Sized + 'static, InnerProtocolImpl: InnerProtocol + ?Sized + 'static, > { state: RwLock, storage: Arc, + vl1_auth_provider: Arc, inner: Arc, - path_filter: Arc, buffer_pool: Arc, node_container: Option, // never None, set in new() } @@ -48,14 +48,14 @@ struct VL1ServiceMutableState { impl< NodeStorageImpl: NodeStorage + ?Sized + 'static, - PathFilterImpl: PathFilter + ?Sized + 'static, + VL1AuthProviderImpl: VL1AuthProvider + ?Sized + 'static, InnerProtocolImpl: InnerProtocol + ?Sized + 'static, - > VL1Service + > VL1Service { pub fn new( storage: Arc, + vl1_auth_provider: Arc, inner: Arc, - path_filter: Arc, settings: VL1Settings, ) -> Result, Box> { let mut service = Self { @@ -66,8 +66,8 @@ impl< running: true, }), storage, + vl1_auth_provider, inner, - path_filter, buffer_pool: Arc::new(PacketBufferPool::new( std::thread::available_parallelism().map_or(2, |c| c.get() + 2), PacketBufferFactory::new(), @@ -191,9 +191,9 @@ impl< impl< NodeStorageImpl: NodeStorage + ?Sized + 'static, - PathFilterImpl: PathFilter + ?Sized + 'static, + VL1AuthProviderImpl: VL1AuthProvider + ?Sized + 'static, InnerProtocolImpl: InnerProtocol + ?Sized + 'static, - > UdpPacketHandler for VL1Service + > UdpPacketHandler for VL1Service { #[inline(always)] fn incoming_udp_packet( @@ -217,12 +217,11 @@ impl< impl< NodeStorageImpl: NodeStorage + ?Sized + 'static, - PathFilterImpl: PathFilter + ?Sized + 'static, + VL1AuthProviderImpl: VL1AuthProvider + ?Sized + 'static, InnerProtocolImpl: InnerProtocol + ?Sized + 'static, - > HostSystem for VL1Service + > HostSystem for VL1Service { type Storage = NodeStorageImpl; - type PathFilter = PathFilterImpl; type LocalSocket = crate::LocalSocket; type LocalInterface = crate::LocalInterface; @@ -243,11 +242,6 @@ impl< self.storage.as_ref() } - #[inline(always)] - fn path_filter(&self) -> &Self::PathFilter { - self.path_filter.as_ref() - } - #[inline] fn get_buffer(&self) -> zerotier_network_hypervisor::protocol::PooledPacketBuffer { self.buffer_pool.get() @@ -329,9 +323,43 @@ impl< impl< NodeStorageImpl: NodeStorage + ?Sized + 'static, - PathFilterImpl: PathFilter + ?Sized + 'static, + VL1AuthProviderImpl: VL1AuthProvider + ?Sized + 'static, InnerProtocolImpl: InnerProtocol + ?Sized + 'static, - > Drop for VL1Service + > NodeStorage for VL1Service +{ + #[inline(always)] + fn load_node_identity(&self) -> Option { + self.storage.load_node_identity() + } + + #[inline(always)] + fn save_node_identity(&self, id: &Identity) { + self.storage.save_node_identity(id) + } +} + +impl< + NodeStorageImpl: NodeStorage + ?Sized + 'static, + VL1AuthProviderImpl: VL1AuthProvider + ?Sized + 'static, + InnerProtocolImpl: InnerProtocol + ?Sized + 'static, + > VL1AuthProvider for VL1Service +{ + #[inline(always)] + fn should_respond_to(&self, id: &Identity) -> bool { + self.vl1_auth_provider.should_respond_to(id) + } + + #[inline(always)] + fn has_trust_relationship(&self, id: &Identity) -> bool { + self.vl1_auth_provider.has_trust_relationship(id) + } +} + +impl< + NodeStorageImpl: NodeStorage + ?Sized + 'static, + VL1AuthProviderImpl: VL1AuthProvider + ?Sized + 'static, + InnerProtocolImpl: InnerProtocol + ?Sized + 'static, + > Drop for VL1Service { fn drop(&mut self) { let mut state = self.state.write().unwrap();