diff --git a/Cargo.toml b/Cargo.toml index 7425b56f0..b22aa4702 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,11 +1,12 @@ [workspace] members = [ - "crypto", - "network-hypervisor", - "controller", - "system-service", - "utils", + "crypto", + "network-hypervisor", + "controller", + "service", + "vl1-service", + "utils", ] [profile.release] diff --git a/network-hypervisor/src/lib.rs b/network-hypervisor/src/lib.rs index af3a413fe..4ecbe590e 100644 --- a/network-hypervisor/src/lib.rs +++ b/network-hypervisor/src/lib.rs @@ -9,9 +9,4 @@ pub mod util; pub mod vl1; pub mod vl2; -mod event; -mod networkhypervisor; - -pub use event::Event; -pub use networkhypervisor::{Interface, NetworkHypervisor}; pub use vl1::protocol::{PacketBuffer, PooledPacketBuffer}; diff --git a/network-hypervisor/src/networkhypervisor.rs b/network-hypervisor/src/networkhypervisor.rs deleted file mode 100644 index e72119dc2..000000000 --- a/network-hypervisor/src/networkhypervisor.rs +++ /dev/null @@ -1,101 +0,0 @@ -// (c) 2020-2022 ZeroTier, Inc. -- currently propritery pending actual release and licensing. See LICENSE.md. - -use std::time::Duration; - -use async_trait::async_trait; - -use crate::error::InvalidParameterError; -use crate::util::buffer::Buffer; -use crate::util::marshalable::Marshalable; -use crate::vl1::node::*; -use crate::vl1::protocol::PooledPacketBuffer; -use crate::vl1::*; -use crate::vl2::switch::*; - -#[async_trait] -pub trait Interface: SystemInterface + SwitchInterface {} - -pub struct NetworkHypervisor { - vl1: Node, - vl2: Switch, -} - -impl NetworkHypervisor { - pub async fn new(ii: &I, auto_generate_identity: bool, auto_upgrade_identity: bool) -> Result { - Ok(NetworkHypervisor { - vl1: Node::new(ii, auto_generate_identity, auto_upgrade_identity).await?, - vl2: Switch::new().await, - }) - } - - #[inline(always)] - pub fn get_packet_buffer(&self) -> PooledPacketBuffer { - self.vl1.get_packet_buffer() - } - - #[inline(always)] - pub fn address(&self) -> Address { - self.vl1.identity.address - } - - #[inline(always)] - pub fn identity(&self) -> &Identity { - &self.vl1.identity - } - - /// Run background tasks and return desired delay until next call in milliseconds. - /// - /// This shouldn't be called concurrently by more than one loop. Doing so would be harmless - /// but would be a waste of compute cycles. - #[inline(always)] - pub async fn do_background_tasks(&self, ii: &I) -> Duration { - self.vl1.do_background_tasks(ii).await - } - - /// Process a physical packet received over a network interface. - #[inline(always)] - pub async fn handle_incoming_physical_packet( - &self, - ii: &I, - source_endpoint: &Endpoint, - source_local_socket: &I::LocalSocket, - source_local_interface: &I::LocalInterface, - data: PooledPacketBuffer, - ) { - self.vl1 - .handle_incoming_physical_packet(ii, &self.vl2, source_endpoint, source_local_socket, source_local_interface, data) - .await - } - - /// Add or update a root set. - /// - /// If no root set exists by this name, a new root set is added. If one already - /// exists it's checked against the new one and updated if the new set is valid - /// and should supersede it. - /// - /// Changes will take effect within a few seconds when root sets are next - /// examined and synchronized with peer and root list state. - /// - /// This returns true if the new root set was accepted and false otherwise. - #[inline(always)] - pub fn add_update_root_set(&self, rs: RootSet) -> bool { - self.vl1.add_update_root_set(rs) - } - - /// Add or update the compiled-in default ZeroTier RootSet. - /// - /// This is equivalent to unmarshaling default-rootset/root.zerotier.com.bin and then - /// calling add_update_root_set(). - pub fn add_update_default_root_set(&self) -> bool { - let mut buf: Buffer<4096> = Buffer::new(); - //buf.set_to(include_bytes!("../default-rootset/root.zerotier.com.bin")); - buf.set_to(include_bytes!("../default-rootset/test-root.bin")); - let mut cursor = 0; - self.add_update_root_set(RootSet::unmarshal(&buf, &mut cursor).unwrap()) - } - - /// Call add_update_default_root_set if there are no roots defined, otherwise do nothing and return false. - pub fn add_update_default_root_set_if_none(&self) { - assert!(self.add_update_default_root_set()); - } -} diff --git a/network-hypervisor/src/util/mod.rs b/network-hypervisor/src/util/mod.rs index 385ae3cb0..d565a77df 100644 --- a/network-hypervisor/src/util/mod.rs +++ b/network-hypervisor/src/util/mod.rs @@ -15,7 +15,7 @@ pub mod testutil; #[allow(unused_macros)] macro_rules! debug_event { ($si:expr, $fmt:expr $(, $($arg:tt)*)?) => { - $si.event(crate::Event::Debug(file!(), line!(), format!($fmt, $($($arg)*)?))); + $si.event(crate::vl1::Event::Debug(file!(), line!(), format!($fmt, $($($arg)*)?))); } } diff --git a/network-hypervisor/src/event.rs b/network-hypervisor/src/vl1/event.rs similarity index 100% rename from network-hypervisor/src/event.rs rename to network-hypervisor/src/vl1/event.rs diff --git a/network-hypervisor/src/vl1/mod.rs b/network-hypervisor/src/vl1/mod.rs index cd3b90d76..d16cfaec5 100644 --- a/network-hypervisor/src/vl1/mod.rs +++ b/network-hypervisor/src/vl1/mod.rs @@ -2,8 +2,8 @@ mod address; mod endpoint; +mod event; mod fragmentedpacket; -mod identity; mod mac; mod path; mod peer; @@ -15,14 +15,16 @@ pub(crate) mod node; #[allow(unused)] pub(crate) mod protocol; +pub mod identity; pub mod inetaddress; pub use address::Address; pub use endpoint::Endpoint; -pub use identity::*; +pub use event::Event; +pub use identity::Identity; pub use inetaddress::InetAddress; pub use mac::MAC; -pub use node::SystemInterface; +pub use node::{HostSystem, InnerProtocol, Node, PathFilter, Storage}; pub use path::Path; pub use peer::Peer; pub use rootset::{Root, RootSet}; diff --git a/network-hypervisor/src/vl1/node.rs b/network-hypervisor/src/vl1/node.rs index 8631296cd..a749e393d 100644 --- a/network-hypervisor/src/vl1/node.rs +++ b/network-hypervisor/src/vl1/node.rs @@ -14,12 +14,15 @@ use crate::error::InvalidParameterError; use crate::util::debug_event; use crate::util::gate::IntervalGate; use crate::util::marshalable::Marshalable; +use crate::vl1::address::Address; +use crate::vl1::endpoint::Endpoint; +use crate::vl1::event::Event; +use crate::vl1::identity::Identity; use crate::vl1::path::{Path, PathServiceResult}; use crate::vl1::peer::Peer; use crate::vl1::protocol::*; +use crate::vl1::rootset::RootSet; use crate::vl1::whoisqueue::{QueuedPacket, WhoisQueue}; -use crate::vl1::{Address, Endpoint, Identity, RootSet}; -use crate::Event; use zerotier_crypto::random; use zerotier_utils::hex; @@ -29,7 +32,7 @@ use zerotier_utils::hex; /// These methods are basically callbacks that the core calls to request or transmit things. They are called /// during calls to things like wire_recieve() and do_background_tasks(). #[async_trait] -pub trait SystemInterface: Sync + Send + 'static { +pub trait HostSystem: Sync + Send + 'static { /// Type for local system sockets. type LocalSocket: Sync + Send + Sized + Hash + PartialEq + Eq + Clone + ToString; @@ -51,12 +54,6 @@ pub trait SystemInterface: Sync + Send + 'static { /// unbound, etc. fn local_socket_is_valid(&self, socket: &Self::LocalSocket) -> bool; - /// Load this node's identity from the data store. - async fn load_node_identity(&self) -> Option; - - /// Save this node's identity to the data store. - async fn save_node_identity(&self, id: &Identity); - /// Called to send a packet over the physical network (virtual -> physical). /// /// This may return false if the send definitely failed. Otherwise it should return true @@ -75,22 +72,10 @@ pub trait SystemInterface: Sync + Send + 'static { endpoint: &Endpoint, local_socket: Option<&Self::LocalSocket>, local_interface: Option<&Self::LocalInterface>, - data: &[&[u8]], + data: &[u8], packet_ttl: u8, ) -> bool; - /// Called to check and see if a physical address should be used for ZeroTier traffic to a node. - async fn check_path( - &self, - id: &Identity, - endpoint: &Endpoint, - local_socket: Option<&Self::LocalSocket>, - local_interface: Option<&Self::LocalInterface>, - ) -> bool; - - /// Called to look up any statically defined or memorized paths to known nodes. - async fn get_path_hints(&self, id: &Identity) -> Option, Option)>>; - /// Called to get the current time in milliseconds from the system monotonically increasing clock. /// This needs to be accurate to about 250 milliseconds resolution or better. fn time_ticks(&self) -> i64; @@ -100,23 +85,63 @@ pub trait SystemInterface: Sync + Send + 'static { fn time_clock(&self) -> i64; } +/// Trait to be implemented by outside code to provide object storage to VL1 +#[async_trait] +pub trait Storage: Sync + Send + 'static { + /// Load this node's identity from the data store. + async fn load_node_identity(&self) -> Option; + + /// Save this node's identity to the data store. + async fn save_node_identity(&self, id: &Identity); +} + +/// Trait to be implemented to provide path hints and a filter to approve physical paths +#[async_trait] +pub trait PathFilter: Sync + Send + 'static { + /// Called to check and see if a physical address should be used for ZeroTier traffic to a node. + async fn check_path( + &self, + id: &Identity, + endpoint: &Endpoint, + local_socket: Option<&HostSystemImpl::LocalSocket>, + local_interface: Option<&HostSystemImpl::LocalInterface>, + ) -> bool; + + /// Called to look up any statically defined or memorized paths to known nodes. + async fn get_path_hints( + &self, + id: &Identity, + ) -> Option< + Vec<( + Endpoint, + Option, + Option, + )>, + >; +} + /// Interface between VL1 and higher/inner protocol layers. /// /// This is implemented by Switch in VL2. It's usually not used outside of VL2 in the core but /// it could also be implemented for testing or "off label" use of VL1 to carry different protocols. #[async_trait] -pub trait InnerProtocolInterface: Sync + Send + 'static { +pub trait InnerProtocol: Sync + Send + 'static { /// Handle a packet, returning true if it was handled by the next layer. /// /// Do not attempt to handle OK or ERROR. Instead implement handle_ok() and handle_error(). - async fn handle_packet(&self, source: &Peer, source_path: &Path, verb: u8, payload: &PacketBuffer) - -> bool; + async fn handle_packet( + &self, + source: &Peer, + source_path: &Path, + verb: u8, + payload: &PacketBuffer, + ) -> bool; /// Handle errors, returning true if the error was recognized. - async fn handle_error( + async fn handle_error( &self, - source: &Peer, - source_path: &Path, + source: &Peer, + source_path: &Path, in_re_verb: u8, in_re_message_id: u64, error_code: u8, @@ -125,10 +150,10 @@ pub trait InnerProtocolInterface: Sync + Send + 'static { ) -> bool; /// Handle an OK, returing true if the OK was recognized. - async fn handle_ok( + async fn handle_ok( &self, - source: &Peer, - source_path: &Path, + source: &Peer, + source_path: &Path, in_re_verb: u8, in_re_message_id: u64, payload: &PacketBuffer, @@ -152,20 +177,20 @@ struct BackgroundTaskIntervals { whois_service: IntervalGate<{ crate::vl1::whoisqueue::SERVICE_INTERVAL_MS }>, } -struct RootInfo { +struct RootInfo { sets: HashMap, - roots: HashMap>, Vec>, + roots: HashMap>, Vec>, this_root_sets: Option>, sets_modified: bool, online: bool, } -enum PathKey<'a, SI: SystemInterface> { - Copied(Endpoint, SI::LocalSocket), - Ref(&'a Endpoint, &'a SI::LocalSocket), +enum PathKey<'a, HostSystemImpl: HostSystem> { + Copied(Endpoint, HostSystemImpl::LocalSocket), + Ref(&'a Endpoint, &'a HostSystemImpl::LocalSocket), } -impl<'a, SI: SystemInterface> Hash for PathKey<'a, SI> { +impl<'a, HostSystemImpl: HostSystem> Hash for PathKey<'a, HostSystemImpl> { fn hash(&self, state: &mut H) { match self { Self::Copied(ep, ls) => { @@ -180,7 +205,7 @@ impl<'a, SI: SystemInterface> Hash for PathKey<'a, SI> { } } -impl<'a, SI: SystemInterface> PartialEq for PathKey<'_, SI> { +impl<'a, HostSystemImpl: HostSystem> PartialEq for PathKey<'_, HostSystemImpl> { fn eq(&self, other: &Self) -> bool { match (self, other) { (Self::Copied(ep1, ls1), Self::Copied(ep2, ls2)) => ep1.eq(ep2) && ls1.eq(ls2), @@ -191,11 +216,11 @@ impl<'a, SI: SystemInterface> PartialEq for PathKey<'_, SI> { } } -impl<'a, SI: SystemInterface> Eq for PathKey<'_, SI> {} +impl<'a, HostSystemImpl: HostSystem> Eq for PathKey<'_, HostSystemImpl> {} -impl<'a, SI: SystemInterface> PathKey<'a, SI> { +impl<'a, HostSystemImpl: HostSystem> PathKey<'a, HostSystemImpl> { #[inline(always)] - fn local_socket(&self) -> &SI::LocalSocket { + fn local_socket(&self) -> &HostSystemImpl::LocalSocket { match self { Self::Copied(_, ls) => ls, Self::Ref(_, ls) => *ls, @@ -203,16 +228,16 @@ impl<'a, SI: SystemInterface> PathKey<'a, SI> { } #[inline(always)] - fn to_copied(&self) -> PathKey<'static, SI> { + fn to_copied(&self) -> PathKey<'static, HostSystemImpl> { match self { - Self::Copied(ep, ls) => PathKey::<'static, SI>::Copied(ep.clone(), ls.clone()), - Self::Ref(ep, ls) => PathKey::<'static, SI>::Copied((*ep).clone(), (*ls).clone()), + Self::Copied(ep, ls) => PathKey::<'static, HostSystemImpl>::Copied(ep.clone(), ls.clone()), + Self::Ref(ep, ls) => PathKey::<'static, HostSystemImpl>::Copied((*ep).clone(), (*ls).clone()), } } } /// A VL1 global P2P network node. -pub struct Node { +pub struct Node { /// A random ID generated to identify this particular running instance. pub instance_id: [u8; 16], @@ -223,16 +248,16 @@ pub struct Node { intervals: Mutex, /// Canonicalized network paths, held as Weak<> to be automatically cleaned when no longer in use. - paths: parking_lot::RwLock, Arc>>>, + paths: parking_lot::RwLock, Arc>>>, /// Peers with which we are currently communicating. - peers: parking_lot::RwLock>>>, + peers: parking_lot::RwLock>>>, /// This node's trusted roots, sorted in ascending order of quality/preference, and cluster definitions. - roots: RwLock>, + roots: RwLock>, /// Current best root. - best_root: RwLock>>>, + best_root: RwLock>>>, /// Identity lookup queue, also holds packets waiting on a lookup. whois: WhoisQueue, @@ -241,17 +266,22 @@ pub struct Node { buffer_pool: PacketBufferPool, } -impl Node { - pub async fn new(si: &SI, auto_generate_identity: bool, auto_upgrade_identity: bool) -> Result { +impl Node { + pub async fn new( + host_system: &HostSystemImpl, + storage: &StorageImpl, + auto_generate_identity: bool, + auto_upgrade_identity: bool, + ) -> Result { let mut id = { - let id = si.load_node_identity().await; + let id = storage.load_node_identity().await; if id.is_none() { if !auto_generate_identity { return Err(InvalidParameterError("no identity found and auto-generate not enabled")); } else { let id = Identity::generate(); - si.event(Event::IdentityAutoGenerated(id.clone())); - si.save_node_identity(&id).await; + host_system.event(Event::IdentityAutoGenerated(id.clone())); + storage.save_node_identity(&id).await; id } } else { @@ -262,12 +292,12 @@ impl Node { if auto_upgrade_identity { let old = id.clone(); if id.upgrade()? { - si.save_node_identity(&id).await; - si.event(Event::IdentityAutoUpgraded(old, id.clone())); + storage.save_node_identity(&id).await; + host_system.event(Event::IdentityAutoUpgraded(old, id.clone())); } } - debug_event!(si, "[vl1] loaded identity {}", id.to_string()); + debug_event!(host_system, "[vl1] loaded identity {}", id.to_string()); Ok(Self { instance_id: random::get_bytes_secure(), @@ -293,7 +323,7 @@ impl Node { self.buffer_pool.get() } - pub fn peer(&self, a: Address) -> Option>> { + pub fn peer(&self, a: Address) -> Option>> { self.peers.read().get(&a).cloned() } @@ -301,7 +331,7 @@ impl Node { self.roots.read().online } - fn update_best_root(&self, si: &SI, time_ticks: i64) { + fn update_best_root(&self, host_system: &HostSystemImpl, time_ticks: i64) { let roots = self.roots.read(); // The best root is the one that has replied to a HELLO most recently. Since we send HELLOs in unison @@ -321,7 +351,7 @@ impl Node { if let Some(best_root) = best_root.as_mut() { if !Arc::ptr_eq(best_root, best) { debug_event!( - si, + host_system, "[vl1] new best root: {} (replaced {})", best.identity.address.to_string(), best_root.identity.address.to_string() @@ -329,12 +359,20 @@ impl Node { *best_root = best.clone(); } } else { - debug_event!(si, "[vl1] new best root: {} (was empty)", best.identity.address.to_string()); + debug_event!( + host_system, + "[vl1] new best root: {} (was empty)", + best.identity.address.to_string() + ); let _ = best_root.insert(best.clone()); } } else { if let Some(old_best) = self.best_root.write().take() { - debug_event!(si, "[vl1] new best root: NONE (replaced {})", old_best.identity.address.to_string()); + debug_event!( + host_system, + "[vl1] new best root: NONE (replaced {})", + old_best.identity.address.to_string() + ); } } @@ -343,17 +381,17 @@ impl Node { if !roots.online { drop(roots); self.roots.write().online = true; - si.event(Event::Online(true)); + host_system.event(Event::Online(true)); } } else if roots.online { drop(roots); self.roots.write().online = false; - si.event(Event::Online(false)); + host_system.event(Event::Online(false)); } } - pub async fn do_background_tasks(&self, si: &SI) -> Duration { - let tt = si.time_ticks(); + pub async fn do_background_tasks(&self, host_system: &HostSystemImpl) -> Duration { + let tt = host_system.time_ticks(); let (root_sync, root_hello, mut root_spam_hello, peer_service, path_service, whois_service) = { let mut intervals = self.intervals.lock(); ( @@ -372,7 +410,7 @@ impl Node { } debug_event!( - si, + host_system, "[vl1] do_background_tasks:{}{}{}{}{}{} ----", if root_sync { " root_sync" @@ -416,7 +454,7 @@ impl Node { false } } { - debug_event!(si, "[vl1] root sets modified, synchronizing internal data structures"); + debug_event!(host_system, "[vl1] root sets modified, synchronizing internal data structures"); let (mut old_root_identities, address_collisions, new_roots, bad_identities, my_root_sets) = { let roots = self.roots.read(); @@ -456,7 +494,7 @@ impl Node { if m.endpoints.is_some() && !address_collisions.contains(&m.identity.address) && !m.identity.eq(&self.identity) { debug_event!( - si, + host_system, "[vl1] examining root {} with {} endpoints", m.identity.address.to_string(), m.endpoints.as_ref().map_or(0, |e| e.len()) @@ -465,7 +503,7 @@ impl Node { if let Some(peer) = peers.get(&m.identity.address) { new_roots.insert(peer.clone(), m.endpoints.as_ref().unwrap().iter().cloned().collect()); } else { - if let Some(peer) = Peer::::new(&self.identity, m.identity.clone(), tt) { + if let Some(peer) = Peer::::new(&self.identity, m.identity.clone(), tt) { new_roots.insert( parking_lot::RwLockUpgradableReadGuard::upgrade(peers) .entry(m.identity.address) @@ -485,13 +523,13 @@ impl Node { }; for c in address_collisions.iter() { - si.event(Event::SecurityWarning(format!( + host_system.event(Event::SecurityWarning(format!( "address/identity collision in root sets! address {} collides across root sets or with an existing peer and is being ignored as a root!", c.to_string() ))); } for i in bad_identities.iter() { - si.event(Event::SecurityWarning(format!( + host_system.event(Event::SecurityWarning(format!( "bad identity detected for address {} in at least one root set, ignoring (error creating peer object)", i.address.to_string() ))); @@ -505,11 +543,11 @@ impl Node { let mut roots = self.roots.write(); roots.roots = new_roots; roots.this_root_sets = my_root_sets; - si.event(Event::UpdatedRoots(old_root_identities, new_root_identities)); + host_system.event(Event::UpdatedRoots(old_root_identities, new_root_identities)); } } - self.update_best_root(si, tt); + self.update_best_root(host_system, tt); } // Say HELLO to all roots periodically. For roots we send HELLO to every single endpoint @@ -528,12 +566,12 @@ impl Node { for (root, endpoints) in roots.iter() { for ep in endpoints.iter() { debug_event!( - si, + host_system, "sending HELLO to root {} (root interval: {})", root.identity.address.to_string(), ROOT_HELLO_INTERVAL ); - root.send_hello(si, self, Some(ep)).await; + root.send_hello(host_system, self, Some(ep)).await; } } } @@ -545,7 +583,7 @@ impl Node { { let roots = self.roots.read(); for (a, peer) in self.peers.read().iter() { - if !peer.service(si, self, tt) && !roots.roots.contains_key(peer) { + if !peer.service(host_system, self, tt) && !roots.roots.contains_key(peer) { dead_peers.push(*a); } } @@ -561,7 +599,7 @@ impl Node { let mut dead_paths = Vec::new(); let mut need_keepalive = Vec::new(); for (k, path) in self.paths.read().iter() { - if si.local_socket_is_valid(k.local_socket()) { + if host_system.local_socket_is_valid(k.local_socket()) { match path.service(tt) { PathServiceResult::Ok => {} PathServiceResult::Dead => dead_paths.push(k.to_copied()), @@ -575,32 +613,32 @@ impl Node { self.paths.write().remove(dp); } let ka = [tt as u8]; // send different bytes every time for keepalive in case some things filter zero packets - let ka2 = [&ka[..1]]; - for ka in need_keepalive.iter() { - si.wire_send(&ka.endpoint, Some(&ka.local_socket), Some(&ka.local_interface), &ka2, 0) + for p in need_keepalive.iter() { + host_system + .wire_send(&p.endpoint, Some(&p.local_socket), Some(&p.local_interface), &ka[..1], 0) .await; } } if whois_service { - self.whois.service(si, self, tt); + self.whois.service(host_system, self, tt); } - debug_event!(si, "[vl1] do_background_tasks DONE ----"); + debug_event!(host_system, "[vl1] do_background_tasks DONE ----"); Duration::from_millis(1000) } - pub async fn handle_incoming_physical_packet( + pub async fn handle_incoming_physical_packet( &self, - si: &SI, - ph: &PH, + host_system: &HostSystemImpl, + inner: &InnerProtocolImpl, source_endpoint: &Endpoint, - source_local_socket: &SI::LocalSocket, - source_local_interface: &SI::LocalInterface, + source_local_socket: &HostSystemImpl::LocalSocket, + source_local_interface: &HostSystemImpl::LocalInterface, mut data: PooledPacketBuffer, ) { debug_event!( - si, + host_system, "[vl1] {} -> #{} {}->{} length {} (on socket {}@{})", source_endpoint.to_string(), data.bytes_fixed_at::<8>(0) @@ -614,7 +652,7 @@ impl Node { if let Ok(fragment_header) = data.struct_mut_at::(0) { if let Some(dest) = Address::from_bytes_fixed(&fragment_header.dest) { - let time_ticks = si.time_ticks(); + let time_ticks = host_system.time_ticks(); if dest == self.identity.address { let path = self.canonical_path(source_endpoint, source_local_socket, source_local_interface, time_ticks); path.log_receive_anything(time_ticks); @@ -623,7 +661,7 @@ impl Node { #[cfg(debug_assertions)] let fragment_header_id = u64::from_be_bytes(fragment_header.id); debug_event!( - si, + host_system, "[vl1] #{:0>16x} fragment {} of {} received", u64::from_be_bytes(fragment_header.id), fragment_header.fragment_no(), @@ -639,15 +677,15 @@ impl Node { ) { if let Some(frag0) = assembled_packet.frags[0].as_ref() { #[cfg(debug_assertions)] - debug_event!(si, "[vl1] #{:0>16x} packet fully assembled!", fragment_header_id); + debug_event!(host_system, "[vl1] #{:0>16x} packet fully assembled!", fragment_header_id); if let Ok(packet_header) = frag0.struct_at::(0) { if let Some(source) = Address::from_bytes(&packet_header.src) { if let Some(peer) = self.peer(source) { peer.receive( self, - si, - ph, + host_system, + inner, time_ticks, &path, packet_header, @@ -656,7 +694,8 @@ impl Node { ) .await; } else { - self.whois.query(self, si, source, Some(QueuedPacket::Fragmented(assembled_packet))); + self.whois + .query(self, host_system, source, Some(QueuedPacket::Fragmented(assembled_packet))); } } } @@ -665,14 +704,14 @@ impl Node { } else { #[cfg(debug_assertions)] if let Ok(packet_header) = data.struct_at::(0) { - debug_event!(si, "[vl1] #{:0>16x} is unfragmented", u64::from_be_bytes(packet_header.id)); + debug_event!(host_system, "[vl1] #{:0>16x} is unfragmented", u64::from_be_bytes(packet_header.id)); if let Some(source) = Address::from_bytes(&packet_header.src) { if let Some(peer) = self.peer(source) { - peer.receive(self, si, ph, time_ticks, &path, packet_header, data.as_ref(), &[]) + peer.receive(self, host_system, inner, time_ticks, &path, packet_header, data.as_ref(), &[]) .await; } else { - self.whois.query(self, si, source, Some(QueuedPacket::Unfragmented(data))); + self.whois.query(self, host_system, source, Some(QueuedPacket::Unfragmented(data))); } } } @@ -686,7 +725,7 @@ impl Node { { debug_packet_id = u64::from_be_bytes(fragment_header.id); debug_event!( - si, + host_system, "[vl1] #{:0>16x} forwarding packet fragment to {}", debug_packet_id, dest.to_string() @@ -694,7 +733,7 @@ impl Node { } if fragment_header.increment_hops() > v1::FORWARD_MAX_HOPS { #[cfg(debug_assertions)] - debug_event!(si, "[vl1] #{:0>16x} discarded: max hops exceeded!", debug_packet_id); + debug_event!(host_system, "[vl1] #{:0>16x} discarded: max hops exceeded!", debug_packet_id); return; } } else { @@ -702,12 +741,17 @@ impl Node { #[cfg(debug_assertions)] { debug_packet_id = u64::from_be_bytes(packet_header.id); - debug_event!(si, "[vl1] #{:0>16x} forwarding packet to {}", debug_packet_id, dest.to_string()); + debug_event!( + host_system, + "[vl1] #{:0>16x} forwarding packet to {}", + debug_packet_id, + dest.to_string() + ); } if packet_header.increment_hops() > v1::FORWARD_MAX_HOPS { #[cfg(debug_assertions)] debug_event!( - si, + host_system, "[vl1] #{:0>16x} discarded: max hops exceeded!", u64::from_be_bytes(packet_header.id) ); @@ -720,9 +764,9 @@ impl Node { if let Some(peer) = self.peer(dest) { // TODO: SHOULD we forward? Need a way to check. - peer.forward(si, time_ticks, data.as_ref()).await; + peer.forward(host_system, time_ticks, data.as_ref()).await; #[cfg(debug_assertions)] - debug_event!(si, "[vl1] #{:0>16x} forwarded successfully", debug_packet_id); + debug_event!(host_system, "[vl1] #{:0>16x} forwarded successfully", debug_packet_id); } } } @@ -730,16 +774,21 @@ impl Node { } /// Get the current "best" root from among this node's trusted roots. - pub fn best_root(&self) -> Option>> { + pub fn best_root(&self) -> Option>> { self.best_root.read().clone() } /// Check whether this peer is a root according to any root set trusted by this node. - pub fn is_peer_root(&self, peer: &Peer) -> bool { + pub fn is_peer_root(&self, peer: &Peer) -> bool { self.roots.read().roots.keys().any(|p| p.identity.eq(&peer.identity)) } - /// Called when a remote node sends us a root set update. + /// Called when a remote node sends us a root set update, applying the update if it is valid and applicable. + /// + /// This will only replace an existing root set with a newer one. It won't add a new root set, which must be + /// done by an authorized user or administrator not just by a root. + /// + /// SECURITY NOTE: this DOES NOT validate certificates in the supplied root set! Caller must do that first! pub(crate) fn remote_update_root_set(&self, received_from: &Identity, rs: RootSet) { let mut roots = self.roots.write(); if let Some(entry) = roots.sets.get_mut(&rs.name) { @@ -776,11 +825,6 @@ impl Node { self.roots.read().sets.values().cloned().collect() } - /// Get the root set(s) to which this node belongs if it is a root. - pub(crate) fn this_root_sets_as_bytes(&self) -> Option> { - self.roots.read().this_root_sets.clone() - } - /// Returns true if this node is a member of a root set (that it knows about). pub fn this_node_is_root(&self) -> bool { self.roots.read().this_root_sets.is_some() @@ -790,10 +834,10 @@ impl Node { pub(crate) fn canonical_path( &self, ep: &Endpoint, - local_socket: &SI::LocalSocket, - local_interface: &SI::LocalInterface, + local_socket: &HostSystemImpl::LocalSocket, + local_interface: &HostSystemImpl::LocalInterface, time_ticks: i64, - ) -> Arc> { + ) -> Arc> { if let Some(path) = self.paths.read().get(&PathKey::Ref(ep, local_socket)) { path.clone() } else { diff --git a/network-hypervisor/src/vl1/path.rs b/network-hypervisor/src/vl1/path.rs index 90390089f..4b85ea144 100644 --- a/network-hypervisor/src/vl1/path.rs +++ b/network-hypervisor/src/vl1/path.rs @@ -25,18 +25,23 @@ pub(crate) enum PathServiceResult { /// These are maintained in Node and canonicalized so that all unique paths have /// one and only one unique path object. That enables statistics to be tracked /// for them and uniform application of things like keepalives. -pub struct Path { +pub struct Path { pub endpoint: Endpoint, - pub local_socket: SI::LocalSocket, - pub local_interface: SI::LocalInterface, + pub local_socket: HostSystemImpl::LocalSocket, + pub local_interface: HostSystemImpl::LocalInterface, last_send_time_ticks: AtomicI64, last_receive_time_ticks: AtomicI64, create_time_ticks: i64, fragmented_packets: Mutex>, } -impl Path { - pub fn new(endpoint: Endpoint, local_socket: SI::LocalSocket, local_interface: SI::LocalInterface, time_ticks: i64) -> Self { +impl Path { + pub fn new( + endpoint: Endpoint, + local_socket: HostSystemImpl::LocalSocket, + local_interface: HostSystemImpl::LocalInterface, + time_ticks: i64, + ) -> Self { Self { endpoint, local_socket, diff --git a/network-hypervisor/src/vl1/peer.rs b/network-hypervisor/src/vl1/peer.rs index 0895c392b..fb96b1aa4 100644 --- a/network-hypervisor/src/vl1/peer.rs +++ b/network-hypervisor/src/vl1/peer.rs @@ -25,11 +25,11 @@ use crate::{VERSION_MAJOR, VERSION_MINOR, VERSION_REVISION}; pub(crate) const SERVICE_INTERVAL_MS: i64 = 10000; -pub struct Peer { +pub struct Peer { pub identity: Identity, static_symmetric_key: SymmetricSecret, - paths: Mutex>>, + paths: Mutex>>, pub(crate) last_send_time_ticks: AtomicI64, pub(crate) last_receive_time_ticks: AtomicI64, @@ -42,8 +42,8 @@ pub struct Peer { remote_node_info: RwLock, } -struct PeerPath { - path: Weak>, +struct PeerPath { + path: Weak>, last_receive_time_ticks: i64, } @@ -53,6 +53,846 @@ struct RemoteNodeInfo { remote_version: (u8, u8, u16), } +/// Sort a list of paths by quality or priority, with best paths first. +fn prioritize_paths(paths: &mut Vec>) { + paths.sort_unstable_by(|a, b| a.last_receive_time_ticks.cmp(&b.last_receive_time_ticks).reverse()); +} + +impl Peer { + /// Create a new peer. + /// + /// This only returns None if this_node_identity does not have its secrets or if some + /// fatal error occurs performing key agreement between the two identities. + pub(crate) fn new(this_node_identity: &Identity, id: Identity, time_ticks: i64) -> Option { + this_node_identity.agree(&id).map(|static_secret| -> Self { + Self { + identity: id, + static_symmetric_key: SymmetricSecret::new(static_secret), + paths: Mutex::new(Vec::with_capacity(4)), + last_send_time_ticks: AtomicI64::new(crate::util::NEVER_HAPPENED_TICKS), + last_receive_time_ticks: AtomicI64::new(crate::util::NEVER_HAPPENED_TICKS), + last_forward_time_ticks: AtomicI64::new(crate::util::NEVER_HAPPENED_TICKS), + last_hello_reply_time_ticks: AtomicI64::new(crate::util::NEVER_HAPPENED_TICKS), + create_time_ticks: time_ticks, + random_ticks_offset: random::xorshift64_random() as u32, + message_id_counter: AtomicU64::new(random::xorshift64_random()), + remote_node_info: RwLock::new(RemoteNodeInfo { + reported_local_endpoints: HashMap::new(), + remote_protocol_version: 0, + remote_version: (0, 0, 0), + }), + } + }) + } + + /// Get the remote version of this peer: major, minor, revision. + /// Returns None if it's not yet known. + pub fn version(&self) -> Option<(u8, u8, u16)> { + let rv = self.remote_node_info.read().remote_version; + if rv.0 != 0 || rv.1 != 0 || rv.2 != 0 { + Some(rv) + } else { + None + } + } + + /// Get the remote protocol version of this peer or None if not yet known. + pub fn protocol_version(&self) -> Option { + let pv = self.remote_node_info.read().remote_protocol_version; + if pv != 0 { + Some(pv) + } else { + None + } + } + + /// Get the next message ID for sending a message to this peer. + #[inline(always)] + pub(crate) fn next_message_id(&self) -> MessageId { + self.message_id_counter.fetch_add(1, Ordering::SeqCst) + } + + /// Get current best path or None if there are no direct paths to this peer. + pub fn direct_path(&self) -> Option>> { + for p in self.paths.lock().iter() { + let pp = p.path.upgrade(); + if pp.is_some() { + return pp; + } + } + return None; + } + + /// Get either the current best direct path or an indirect path via e.g. a root. + pub fn path(&self, node: &Node) -> Option>> { + let direct_path = self.direct_path(); + if direct_path.is_some() { + return direct_path; + } + if let Some(root) = node.best_root() { + return root.direct_path(); + } + return None; + } + + pub(crate) fn learn_path(&self, host_system: &HostSystemImpl, new_path: &Arc>, time_ticks: i64) { + let mut paths = self.paths.lock(); + + match &new_path.endpoint { + Endpoint::IpUdp(new_ip) => { + // If this is an IpUdp endpoint, scan the existing paths and replace any that come from + // the same IP address but a different port. This prevents the accumulation of duplicate + // paths to the same peer over different ports. + for pi in paths.iter_mut() { + if std::ptr::eq(pi.path.as_ptr(), new_path.as_ref()) { + return; + } + if let Some(p) = pi.path.upgrade() { + match &p.endpoint { + Endpoint::IpUdp(existing_ip) => { + if existing_ip.ip_bytes().eq(new_ip.ip_bytes()) { + debug_event!( + host_system, + "[vl1] {} replacing path {} with {} (same IP, different port)", + self.identity.address.to_string(), + p.endpoint.to_string(), + new_path.endpoint.to_string() + ); + pi.path = Arc::downgrade(new_path); + pi.last_receive_time_ticks = time_ticks; + prioritize_paths(&mut paths); + return; + } + } + _ => {} + } + } + } + } + _ => { + for pi in paths.iter() { + if std::ptr::eq(pi.path.as_ptr(), new_path.as_ref()) { + return; + } + } + } + } + + // Learn new path if it's not a duplicate or should not replace an existing path. + debug_event!( + host_system, + "[vl1] {} learned new path: {}", + self.identity.address.to_string(), + new_path.endpoint.to_string() + ); + paths.push(PeerPath:: { + path: Arc::downgrade(new_path), + last_receive_time_ticks: time_ticks, + }); + prioritize_paths(&mut paths); + } + + /// Called every SERVICE_INTERVAL_MS by the background service loop in Node. + pub(crate) fn service(&self, _: &HostSystemImpl, _: &Node, time_ticks: i64) -> bool { + { + let mut paths = self.paths.lock(); + paths.retain(|p| ((time_ticks - p.last_receive_time_ticks) < PEER_EXPIRATION_TIME) && (p.path.strong_count() > 0)); + prioritize_paths(&mut paths); + } + self.remote_node_info + .write() + .reported_local_endpoints + .retain(|_, ts| (time_ticks - *ts) < PEER_EXPIRATION_TIME); + (time_ticks - self.last_receive_time_ticks.load(Ordering::Relaxed).max(self.create_time_ticks)) < PEER_EXPIRATION_TIME + } + + async fn internal_send( + &self, + host_system: &HostSystemImpl, + endpoint: &Endpoint, + local_socket: Option<&HostSystemImpl::LocalSocket>, + local_interface: Option<&HostSystemImpl::LocalInterface>, + max_fragment_size: usize, + packet: &PacketBuffer, + ) -> bool { + let packet_size = packet.len(); + if packet_size > max_fragment_size { + let bytes = packet.as_bytes(); + if !host_system + .wire_send(endpoint, local_socket, local_interface, &bytes[0..UDP_DEFAULT_MTU], 0) + .await + { + return false; + } + let mut pos = UDP_DEFAULT_MTU; + + let overrun_size = (packet_size - UDP_DEFAULT_MTU) as u32; + let fragment_count = (overrun_size / (UDP_DEFAULT_MTU - v1::FRAGMENT_HEADER_SIZE) as u32) + + (((overrun_size % (UDP_DEFAULT_MTU - v1::FRAGMENT_HEADER_SIZE) as u32) != 0) as u32); + debug_assert!(fragment_count <= v1::FRAGMENT_COUNT_MAX as u32); + + let mut header = v1::FragmentHeader { + id: *packet.bytes_fixed_at(0).unwrap(), + dest: *packet.bytes_fixed_at(v1::DESTINATION_INDEX).unwrap(), + fragment_indicator: v1::FRAGMENT_INDICATOR, + total_and_fragment_no: ((fragment_count + 1) << 4) as u8, + reserved_hops: 0, + }; + + let mut chunk_size = (packet_size - pos).min(UDP_DEFAULT_MTU - v1::HEADER_SIZE); + let mut tmp_buf: [u8; v1::SIZE_MAX] = unsafe { std::mem::MaybeUninit::uninit().assume_init() }; + loop { + header.total_and_fragment_no += 1; + let next_pos = pos + chunk_size; + let fragment_size = v1::FRAGMENT_HEADER_SIZE + chunk_size; + tmp_buf[..v1::FRAGMENT_HEADER_SIZE].copy_from_slice(header.as_bytes()); + tmp_buf[v1::FRAGMENT_HEADER_SIZE..fragment_size].copy_from_slice(&bytes[pos..next_pos]); + if !host_system + .wire_send(endpoint, local_socket, local_interface, &tmp_buf[..fragment_size], 0) + .await + { + return false; + } + pos = next_pos; + if pos < packet_size { + chunk_size = (packet_size - pos).min(UDP_DEFAULT_MTU - v1::HEADER_SIZE); + } else { + return true; + } + } + } else { + return host_system + .wire_send(endpoint, local_socket, local_interface, packet.as_bytes(), 0) + .await; + } + } + + /// Send a packet to this peer, returning true on (potential) success. + /// + /// This will go directly if there is an active path, or otherwise indirectly + /// via a root or some other route. + /// + /// It encrypts and sets the MAC and cipher fields and packet ID and other things. + pub(crate) async fn send( + &self, + host_system: &HostSystemImpl, + path: Option<&Arc>>, + node: &Node, + time_ticks: i64, + packet: &mut PacketBuffer, + ) -> bool { + let mut _path_arc = None; + let path = if let Some(path) = path { + path + } else { + _path_arc = self.path(node); + if let Some(path) = _path_arc.as_ref() { + path + } else { + return false; + } + }; + + let max_fragment_size = if path.endpoint.requires_fragmentation() { + UDP_DEFAULT_MTU + } else { + usize::MAX + }; + let flags_cipher_hops = if packet.len() > max_fragment_size { + v1::HEADER_FLAG_FRAGMENTED | v1::CIPHER_AES_GMAC_SIV + } else { + v1::CIPHER_AES_GMAC_SIV + }; + + let mut aes_gmac_siv = self.static_symmetric_key.aes_gmac_siv.get(); + aes_gmac_siv.encrypt_init(&self.next_message_id().to_ne_bytes()); + aes_gmac_siv.encrypt_set_aad(&v1::get_packet_aad_bytes( + self.identity.address, + node.identity.address, + flags_cipher_hops, + )); + if let Ok(payload) = packet.as_bytes_starting_at_mut(v1::HEADER_SIZE) { + aes_gmac_siv.encrypt_first_pass(payload); + aes_gmac_siv.encrypt_first_pass_finish(); + aes_gmac_siv.encrypt_second_pass_in_place(payload); + let tag = aes_gmac_siv.encrypt_second_pass_finish(); + let header = packet.struct_mut_at::(0).unwrap(); + header.id = *array_range::(tag); + header.dest = self.identity.address.to_bytes(); + header.src = node.identity.address.to_bytes(); + header.flags_cipher_hops = flags_cipher_hops; + header.mac = *array_range::(tag); + } else { + return false; + } + + if self + .internal_send( + host_system, + &path.endpoint, + Some(&path.local_socket), + Some(&path.local_interface), + max_fragment_size, + packet, + ) + .await + { + self.last_send_time_ticks.store(time_ticks, Ordering::Relaxed); + return true; + } + + return false; + } + + /// Forward a packet to this peer. + /// + /// This is called when we receive a packet not addressed to this node and + /// want to pass it along. + /// + /// This doesn't fragment large packets since fragments are forwarded individually. + /// Intermediates don't need to adjust fragmentation. + pub(crate) async fn forward(&self, host_system: &HostSystemImpl, time_ticks: i64, packet: &PacketBuffer) -> bool { + if let Some(path) = self.direct_path() { + if host_system + .wire_send( + &path.endpoint, + Some(&path.local_socket), + Some(&path.local_interface), + packet.as_bytes(), + 0, + ) + .await + { + self.last_forward_time_ticks.store(time_ticks, Ordering::Relaxed); + return true; + } + } + return false; + } + + /// Send a HELLO to this peer. + /// + /// If explicit_endpoint is not None the packet will be sent directly to this endpoint. + /// Otherwise it will be sent via the best direct or indirect path known. + /// + /// Unlike other messages HELLO is sent partially in the clear and always with the long-lived + /// static identity key. Authentication in old versions is via Poly1305 and in new versions + /// via HMAC-SHA512. + pub(crate) async fn send_hello( + &self, + host_system: &HostSystemImpl, + node: &Node, + explicit_endpoint: Option<&Endpoint>, + ) -> bool { + let mut path = None; + let destination = if let Some(explicit_endpoint) = explicit_endpoint { + explicit_endpoint + } else { + if let Some(p) = self.path(node) { + let _ = path.insert(p); + &path.as_ref().unwrap().endpoint + } else { + return false; + } + }; + + let max_fragment_size = if destination.requires_fragmentation() { + UDP_DEFAULT_MTU + } else { + usize::MAX + }; + let time_ticks = host_system.time_ticks(); + + let mut packet = PacketBuffer::new(); + { + let message_id = self.next_message_id(); + + { + let f: &mut (v1::PacketHeader, v1::message_component_structs::HelloFixedHeaderFields) = + packet.append_struct_get_mut().unwrap(); + f.0.id = message_id.to_ne_bytes(); + f.0.dest = self.identity.address.to_bytes(); + f.0.src = node.identity.address.to_bytes(); + f.0.flags_cipher_hops = v1::CIPHER_NOCRYPT_POLY1305; + f.1.verb = verbs::VL1_HELLO | v1::VERB_FLAG_EXTENDED_AUTHENTICATION; + f.1.version_proto = PROTOCOL_VERSION; + f.1.version_major = VERSION_MAJOR; + f.1.version_minor = VERSION_MINOR; + f.1.version_revision = VERSION_REVISION.to_be_bytes(); + f.1.timestamp = (time_ticks as u64).wrapping_add(self.random_ticks_offset as u64).to_be_bytes(); + } + + debug_assert_eq!(packet.len(), 41); + assert!(packet.append_bytes((&node.identity.to_public_bytes()).into()).is_ok()); + + let (_, poly1305_key) = salsa_poly_create( + &self.static_symmetric_key, + packet.struct_at::(0).unwrap(), + packet.len(), + ); + let mac = poly1305::compute(&poly1305_key, packet.as_bytes_starting_at(v1::HEADER_SIZE).unwrap()); + packet.as_mut()[v1::MAC_FIELD_INDEX..v1::MAC_FIELD_INDEX + 8].copy_from_slice(&mac[0..8]); + + self.last_send_time_ticks.store(time_ticks, Ordering::Relaxed); + + debug_event!( + host_system, + "HELLO -> {} @ {} ({} bytes)", + self.identity.address.to_string(), + destination.to_string(), + packet.len() + ); + } + + if let Some(p) = path.as_ref() { + if self + .internal_send( + host_system, + destination, + Some(&p.local_socket), + Some(&p.local_interface), + max_fragment_size, + &packet, + ) + .await + { + p.log_send_anything(time_ticks); + true + } else { + false + } + } else { + self.internal_send(host_system, destination, None, None, max_fragment_size, &packet) + .await + } + } + + /// Receive, decrypt, authenticate, and process an incoming packet from this peer. + /// + /// If the packet comes in multiple fragments, the fragments slice should contain all + /// those fragments after the main packet header and first chunk. + /// + /// This returns true if the packet decrypted and passed authentication. + pub(crate) async fn receive( + &self, + node: &Node, + host_system: &HostSystemImpl, + inner: &InnerProtocolImpl, + time_ticks: i64, + source_path: &Arc>, + packet_header: &v1::PacketHeader, + frag0: &PacketBuffer, + fragments: &[Option], + ) -> bool { + if let Ok(packet_frag0_payload_bytes) = frag0.as_bytes_starting_at(v1::VERB_INDEX) { + let mut payload = PacketBuffer::new(); + + let message_id = if let Some(message_id2) = try_aead_decrypt( + &self.static_symmetric_key, + packet_frag0_payload_bytes, + packet_header, + fragments, + &mut payload, + ) { + // Decryption successful with static secret. + message_id2 + } else { + // Packet failed to decrypt using either ephemeral or permament key, reject. + debug_event!( + host_system, + "[vl1] #{:0>16x} failed authentication", + u64::from_be_bytes(packet_header.id) + ); + return false; + }; + + if let Ok(mut verb) = payload.u8_at(0) { + if (verb & v1::VERB_FLAG_COMPRESSED) != 0 { + let mut decompressed_payload = [0u8; v1::SIZE_MAX]; + decompressed_payload[0] = verb; + if let Ok(dlen) = lz4_flex::block::decompress_into(&payload.as_bytes()[1..], &mut decompressed_payload[1..]) { + payload.set_to(&decompressed_payload[..(dlen + 1)]); + } else { + return false; + } + } + + // --------------------------------------------------------------- + // If we made it here it decrypted and passed authentication. + // --------------------------------------------------------------- + + self.last_receive_time_ticks.store(time_ticks, Ordering::Relaxed); + + let mut path_is_known = false; + for p in self.paths.lock().iter_mut() { + if std::ptr::eq(p.path.as_ptr(), source_path.as_ref()) { + p.last_receive_time_ticks = time_ticks; + path_is_known = true; + break; + } + } + + verb &= v1::VERB_MASK; // mask off flags + debug_event!( + host_system, + "[vl1] #{:0>16x} decrypted and authenticated, verb: {} ({:0>2x})", + u64::from_be_bytes(packet_header.id), + verbs::name(verb), + verb as u32 + ); + + if match verb { + verbs::VL1_NOP => true, + verbs::VL1_HELLO => { + self.handle_incoming_hello( + host_system, + inner, + node, + time_ticks, + message_id, + source_path, + packet_header.hops(), + &payload, + ) + .await + } + verbs::VL1_ERROR => { + self.handle_incoming_error(host_system, inner, node, time_ticks, source_path, &payload) + .await + } + verbs::VL1_OK => { + self.handle_incoming_ok( + host_system, + inner, + node, + time_ticks, + source_path, + packet_header.hops(), + path_is_known, + &payload, + ) + .await + } + verbs::VL1_WHOIS => { + self.handle_incoming_whois(host_system, inner, node, time_ticks, message_id, &payload) + .await + } + verbs::VL1_RENDEZVOUS => { + self.handle_incoming_rendezvous(host_system, node, time_ticks, message_id, source_path, &payload) + .await + } + verbs::VL1_ECHO => { + self.handle_incoming_echo(host_system, inner, node, time_ticks, message_id, &payload) + .await + } + verbs::VL1_PUSH_DIRECT_PATHS => { + self.handle_incoming_push_direct_paths(host_system, node, time_ticks, source_path, &payload) + .await + } + verbs::VL1_USER_MESSAGE => { + self.handle_incoming_user_message(host_system, node, time_ticks, source_path, &payload) + .await + } + _ => inner.handle_packet(self, &source_path, verb, &payload).await, + } { + return true; + } + } + } + return false; + } + + async fn handle_incoming_hello( + &self, + host_system: &HostSystemImpl, + inner: &InnerProtocolImpl, + node: &Node, + time_ticks: i64, + message_id: MessageId, + source_path: &Arc>, + hops: u8, + payload: &PacketBuffer, + ) -> bool { + if !(inner.should_communicate_with(&self.identity) || node.this_node_is_root() || node.is_peer_root(self)) { + debug_event!( + host_system, + "[vl1] dropping HELLO from {} due to lack of trust relationship", + self.identity.address.to_string() + ); + return true; // packet wasn't invalid, just ignored + } + + let mut cursor = 0; + if let Ok(hello_fixed_headers) = payload.read_struct::(&mut cursor) { + if let Ok(identity) = Identity::read_bytes(&mut BufferReader::new(payload, &mut cursor)) { + if identity.eq(&self.identity) { + { + let mut remote_node_info = self.remote_node_info.write(); + remote_node_info.remote_protocol_version = hello_fixed_headers.version_proto; + remote_node_info.remote_version = ( + hello_fixed_headers.version_major, + hello_fixed_headers.version_minor, + u16::from_be_bytes(hello_fixed_headers.version_revision), + ); + } + + let mut packet = PacketBuffer::new(); + packet.set_size(v1::HEADER_SIZE); + { + let f: &mut ( + v1::message_component_structs::OkHeader, + v1::message_component_structs::OkHelloFixedHeaderFields, + ) = packet.append_struct_get_mut().unwrap(); + f.0.verb = verbs::VL1_OK; + f.0.in_re_verb = verbs::VL1_HELLO; + f.0.in_re_message_id = message_id.to_ne_bytes(); + f.1.timestamp_echo = hello_fixed_headers.timestamp; + f.1.version_proto = PROTOCOL_VERSION; + f.1.version_major = VERSION_MAJOR; + f.1.version_minor = VERSION_MINOR; + f.1.version_revision = VERSION_REVISION.to_be_bytes(); + } + + return self.send(host_system, Some(source_path), node, time_ticks, &mut packet).await; + } + } + } + return false; + } + + async fn handle_incoming_error( + &self, + _: &HostSystemImpl, + inner: &InnerProtocolImpl, + _: &Node, + _: i64, + source_path: &Arc>, + payload: &PacketBuffer, + ) -> bool { + let mut cursor = 0; + if let Ok(error_header) = payload.read_struct::(&mut cursor) { + let in_re_message_id: MessageId = u64::from_ne_bytes(error_header.in_re_message_id); + if self.message_id_counter.load(Ordering::Relaxed).wrapping_sub(in_re_message_id) <= PACKET_RESPONSE_COUNTER_DELTA_MAX { + match error_header.in_re_verb { + _ => { + return inner + .handle_error( + self, + &source_path, + error_header.in_re_verb, + in_re_message_id, + error_header.error_code, + payload, + &mut cursor, + ) + .await; + } + } + } + } + return false; + } + + async fn handle_incoming_ok( + &self, + host_system: &HostSystemImpl, + inner: &InnerProtocolImpl, + node: &Node, + time_ticks: i64, + source_path: &Arc>, + hops: u8, + path_is_known: bool, + payload: &PacketBuffer, + ) -> bool { + let mut cursor = 0; + if let Ok(ok_header) = payload.read_struct::(&mut cursor) { + let in_re_message_id: MessageId = u64::from_ne_bytes(ok_header.in_re_message_id); + if self.message_id_counter.load(Ordering::Relaxed).wrapping_sub(in_re_message_id) <= PACKET_RESPONSE_COUNTER_DELTA_MAX { + match ok_header.in_re_verb { + verbs::VL1_HELLO => { + if let Ok(ok_hello_fixed_header_fields) = + payload.read_struct::(&mut cursor) + { + if hops == 0 { + if let Ok(reported_endpoint) = Endpoint::unmarshal(&payload, &mut cursor) { + #[cfg(debug_assertions)] + let reported_endpoint2 = reported_endpoint.clone(); + if self + .remote_node_info + .write() + .reported_local_endpoints + .insert(reported_endpoint, time_ticks) + .is_none() + { + #[cfg(debug_assertions)] + debug_event!( + host_system, + "[vl1] {} reported new remote perspective, local endpoint: {}", + self.identity.address.to_string(), + reported_endpoint2.to_string() + ); + } + } + } + + if hops == 0 && !path_is_known { + self.learn_path(host_system, source_path, time_ticks); + } + + self.last_hello_reply_time_ticks.store(time_ticks, Ordering::Relaxed); + } + } + + verbs::VL1_WHOIS => { + if node.is_peer_root(self) { + while cursor < payload.len() { + if let Ok(_whois_response) = Identity::read_bytes(&mut BufferReader::new(payload, &mut cursor)) { + // TODO + } else { + break; + } + } + } else { + return true; // not invalid, just ignored + } + } + + _ => { + return inner + .handle_ok(self, &source_path, ok_header.in_re_verb, in_re_message_id, payload, &mut cursor) + .await; + } + } + } + } + return false; + } + + async fn handle_incoming_whois( + &self, + host_system: &HostSystemImpl, + inner: &InnerProtocolImpl, + node: &Node, + time_ticks: i64, + message_id: MessageId, + payload: &PacketBuffer, + ) -> bool { + if node.this_node_is_root() || inner.should_communicate_with(&self.identity) { + let mut packet = PacketBuffer::new(); + packet.set_size(v1::HEADER_SIZE); + { + let mut f: &mut v1::message_component_structs::OkHeader = packet.append_struct_get_mut().unwrap(); + f.verb = verbs::VL1_OK; + f.in_re_verb = verbs::VL1_WHOIS; + f.in_re_message_id = message_id.to_ne_bytes(); + } + + let mut cursor = 0; + while cursor < payload.len() { + if let Ok(zt_address) = Address::unmarshal(payload, &mut cursor) { + if let Some(peer) = node.peer(zt_address) { + if !packet.append_bytes((&peer.identity.to_public_bytes()).into()).is_ok() { + debug_event!(host_system, "unexpected error serializing an identity into a WHOIS packet response"); + return false; + } + } + } + } + + self.send(host_system, None, node, time_ticks, &mut packet).await + } else { + true // packet wasn't invalid, just ignored + } + } + + #[allow(unused)] + async fn handle_incoming_rendezvous( + &self, + host_system: &HostSystemImpl, + node: &Node, + time_ticks: i64, + message_id: MessageId, + source_path: &Arc>, + payload: &PacketBuffer, + ) -> bool { + if node.is_peer_root(self) {} + return true; + } + + async fn handle_incoming_echo( + &self, + host_system: &HostSystemImpl, + inner: &InnerProtocolImpl, + node: &Node, + time_ticks: i64, + message_id: MessageId, + payload: &PacketBuffer, + ) -> bool { + if inner.should_communicate_with(&self.identity) || node.is_peer_root(self) { + let mut packet = PacketBuffer::new(); + packet.set_size(v1::HEADER_SIZE); + { + let mut f: &mut v1::message_component_structs::OkHeader = packet.append_struct_get_mut().unwrap(); + f.verb = verbs::VL1_OK; + f.in_re_verb = verbs::VL1_ECHO; + f.in_re_message_id = message_id.to_ne_bytes(); + } + if packet.append_bytes(payload.as_bytes()).is_ok() { + self.send(host_system, None, node, time_ticks, &mut packet).await + } else { + false + } + } else { + debug_event!( + host_system, + "[vl1] dropping ECHO from {} due to lack of trust relationship", + self.identity.address.to_string() + ); + true // packet wasn't invalid, just ignored + } + } + + #[allow(unused)] + async fn handle_incoming_push_direct_paths( + &self, + host_system: &HostSystemImpl, + node: &Node, + time_ticks: i64, + source_path: &Arc>, + payload: &PacketBuffer, + ) -> bool { + false + } + + #[allow(unused)] + async fn handle_incoming_user_message( + &self, + host_system: &HostSystemImpl, + node: &Node, + time_ticks: i64, + source_path: &Arc>, + payload: &PacketBuffer, + ) -> bool { + false + } +} + +impl Hash for Peer { + #[inline(always)] + fn hash(&self, state: &mut H) { + state.write_u64(self.identity.address.into()); + } +} + +impl PartialEq for Peer { + #[inline(always)] + fn eq(&self, other: &Self) -> bool { + self.identity.fingerprint.eq(&other.identity.fingerprint) + } +} + +impl Eq for Peer {} + /// Attempt ZeroTier V1 protocol AEAD packet encryption and MAC validation. Returns message ID on success. fn try_aead_decrypt( secret: &SymmetricSecret, @@ -155,807 +995,3 @@ fn salsa_poly_create(secret: &SymmetricSecret, header: &v1::PacketHeader, packet salsa.crypt_in_place(&mut poly1305_key); (salsa, poly1305_key) } - -/// Sort a list of paths by quality or priority, with best paths first. -fn prioritize_paths(paths: &mut Vec>) { - paths.sort_unstable_by(|a, b| a.last_receive_time_ticks.cmp(&b.last_receive_time_ticks).reverse()); -} - -impl Peer { - /// Create a new peer. - /// - /// This only returns None if this_node_identity does not have its secrets or if some - /// fatal error occurs performing key agreement between the two identities. - pub(crate) fn new(this_node_identity: &Identity, id: Identity, time_ticks: i64) -> Option> { - this_node_identity.agree(&id).map(|static_secret| -> Self { - Self { - identity: id, - static_symmetric_key: SymmetricSecret::new(static_secret), - paths: Mutex::new(Vec::with_capacity(4)), - last_send_time_ticks: AtomicI64::new(crate::util::NEVER_HAPPENED_TICKS), - last_receive_time_ticks: AtomicI64::new(crate::util::NEVER_HAPPENED_TICKS), - last_forward_time_ticks: AtomicI64::new(crate::util::NEVER_HAPPENED_TICKS), - last_hello_reply_time_ticks: AtomicI64::new(crate::util::NEVER_HAPPENED_TICKS), - create_time_ticks: time_ticks, - random_ticks_offset: random::xorshift64_random() as u32, - message_id_counter: AtomicU64::new(random::xorshift64_random()), - remote_node_info: RwLock::new(RemoteNodeInfo { - reported_local_endpoints: HashMap::new(), - remote_protocol_version: 0, - remote_version: (0, 0, 0), - }), - } - }) - } - - /// Get the remote version of this peer: major, minor, revision. - /// Returns None if it's not yet known. - pub fn version(&self) -> Option<(u8, u8, u16)> { - let rv = self.remote_node_info.read().remote_version; - if rv.0 != 0 || rv.1 != 0 || rv.2 != 0 { - Some(rv) - } else { - None - } - } - - /// Get the remote protocol version of this peer or None if not yet known. - pub fn protocol_version(&self) -> Option { - let pv = self.remote_node_info.read().remote_protocol_version; - if pv != 0 { - Some(pv) - } else { - None - } - } - - /// Get the next message ID for sending a message to this peer. - #[inline(always)] - pub(crate) fn next_message_id(&self) -> MessageId { - self.message_id_counter.fetch_add(1, Ordering::SeqCst) - } - - /// Get current best path or None if there are no direct paths to this peer. - pub fn direct_path(&self) -> Option>> { - for p in self.paths.lock().iter() { - let pp = p.path.upgrade(); - if pp.is_some() { - return pp; - } - } - return None; - } - - /// Get either the current best direct path or an indirect path via e.g. a root. - pub fn path(&self, node: &Node) -> Option>> { - let direct_path = self.direct_path(); - if direct_path.is_some() { - return direct_path; - } - if let Some(root) = node.best_root() { - return root.direct_path(); - } - return None; - } - - pub(crate) fn learn_path(&self, si: &SI, new_path: &Arc>, time_ticks: i64) { - let mut paths = self.paths.lock(); - - match &new_path.endpoint { - Endpoint::IpUdp(new_ip) => { - // If this is an IpUdp endpoint, scan the existing paths and replace any that come from - // the same IP address but a different port. This prevents the accumulation of duplicate - // paths to the same peer over different ports. - for pi in paths.iter_mut() { - if std::ptr::eq(pi.path.as_ptr(), new_path.as_ref()) { - return; - } - if let Some(p) = pi.path.upgrade() { - match &p.endpoint { - Endpoint::IpUdp(existing_ip) => { - if existing_ip.ip_bytes().eq(new_ip.ip_bytes()) { - debug_event!( - si, - "[vl1] {} replacing path {} with {} (same IP, different port)", - self.identity.address.to_string(), - p.endpoint.to_string(), - new_path.endpoint.to_string() - ); - pi.path = Arc::downgrade(new_path); - pi.last_receive_time_ticks = time_ticks; - prioritize_paths(&mut paths); - return; - } - } - _ => {} - } - } - } - } - _ => { - for pi in paths.iter() { - if std::ptr::eq(pi.path.as_ptr(), new_path.as_ref()) { - return; - } - } - } - } - - // Learn new path if it's not a duplicate or should not replace an existing path. - debug_event!( - si, - "[vl1] {} learned new path: {}", - self.identity.address.to_string(), - new_path.endpoint.to_string() - ); - paths.push(PeerPath:: { - path: Arc::downgrade(new_path), - last_receive_time_ticks: time_ticks, - }); - prioritize_paths(&mut paths); - } - - /// Called every SERVICE_INTERVAL_MS by the background service loop in Node. - pub(crate) fn service(&self, _: &SI, _: &Node, time_ticks: i64) -> bool { - { - let mut paths = self.paths.lock(); - paths.retain(|p| ((time_ticks - p.last_receive_time_ticks) < PEER_EXPIRATION_TIME) && (p.path.strong_count() > 0)); - prioritize_paths(&mut paths); - } - self.remote_node_info - .write() - .reported_local_endpoints - .retain(|_, ts| (time_ticks - *ts) < PEER_EXPIRATION_TIME); - (time_ticks - self.last_receive_time_ticks.load(Ordering::Relaxed).max(self.create_time_ticks)) < PEER_EXPIRATION_TIME - } - - /// Send to an endpoint, fragmenting if needed. - /// - /// This does not set the fragmentation field in the packet header, MAC, or encrypt the packet. The sender - /// must do that while building the packet. The fragmentation flag must be set if fragmentation will be needed. - async fn internal_send( - &self, - si: &SI, - endpoint: &Endpoint, - local_socket: Option<&SI::LocalSocket>, - local_interface: Option<&SI::LocalInterface>, - max_fragment_size: usize, - packet: &PacketBuffer, - ) -> bool { - let packet_size = packet.len(); - if packet_size > max_fragment_size { - let bytes = packet.as_bytes(); - if !si - .wire_send(endpoint, local_socket, local_interface, &[&bytes[0..UDP_DEFAULT_MTU]], 0) - .await - { - return false; - } - let mut pos = UDP_DEFAULT_MTU; - - let overrun_size = (packet_size - UDP_DEFAULT_MTU) as u32; - let fragment_count = (overrun_size / (UDP_DEFAULT_MTU - v1::FRAGMENT_HEADER_SIZE) as u32) - + (((overrun_size % (UDP_DEFAULT_MTU - v1::FRAGMENT_HEADER_SIZE) as u32) != 0) as u32); - debug_assert!(fragment_count <= v1::FRAGMENT_COUNT_MAX as u32); - - let mut header = v1::FragmentHeader { - id: *packet.bytes_fixed_at(0).unwrap(), - dest: *packet.bytes_fixed_at(v1::DESTINATION_INDEX).unwrap(), - fragment_indicator: v1::FRAGMENT_INDICATOR, - total_and_fragment_no: ((fragment_count + 1) << 4) as u8, - reserved_hops: 0, - }; - - let mut chunk_size = (packet_size - pos).min(UDP_DEFAULT_MTU - v1::HEADER_SIZE); - loop { - header.total_and_fragment_no += 1; - let next_pos = pos + chunk_size; - if !si - .wire_send( - endpoint, - local_socket, - local_interface, - &[header.as_bytes(), &bytes[pos..next_pos]], - 0, - ) - .await - { - return false; - } - pos = next_pos; - if pos < packet_size { - chunk_size = (packet_size - pos).min(UDP_DEFAULT_MTU - v1::HEADER_SIZE); - } else { - return true; - } - } - } else { - return si.wire_send(endpoint, local_socket, local_interface, &[packet.as_bytes()], 0).await; - } - } - - /// Send a packet to this peer, returning true on (potential) success. - /// - /// This will go directly if there is an active path, or otherwise indirectly - /// via a root or some other route. - /// - /// It encrypts and sets the MAC and cipher fields and packet ID and other things. - pub(crate) async fn send( - &self, - si: &SI, - path: Option<&Arc>>, - node: &Node, - time_ticks: i64, - packet: &mut PacketBuffer, - ) -> bool { - let mut _path_arc = None; - let path = if let Some(path) = path { - path - } else { - _path_arc = self.path(node); - if let Some(path) = _path_arc.as_ref() { - path - } else { - return false; - } - }; - - let max_fragment_size = if path.endpoint.requires_fragmentation() { - UDP_DEFAULT_MTU - } else { - usize::MAX - }; - let flags_cipher_hops = if packet.len() > max_fragment_size { - v1::HEADER_FLAG_FRAGMENTED | v1::CIPHER_AES_GMAC_SIV - } else { - v1::CIPHER_AES_GMAC_SIV - }; - - let mut aes_gmac_siv = self.static_symmetric_key.aes_gmac_siv.get(); - aes_gmac_siv.encrypt_init(&self.next_message_id().to_ne_bytes()); - aes_gmac_siv.encrypt_set_aad(&v1::get_packet_aad_bytes( - self.identity.address, - node.identity.address, - flags_cipher_hops, - )); - if let Ok(payload) = packet.as_bytes_starting_at_mut(v1::HEADER_SIZE) { - aes_gmac_siv.encrypt_first_pass(payload); - aes_gmac_siv.encrypt_first_pass_finish(); - aes_gmac_siv.encrypt_second_pass_in_place(payload); - let tag = aes_gmac_siv.encrypt_second_pass_finish(); - let header = packet.struct_mut_at::(0).unwrap(); - header.id = *array_range::(tag); - header.dest = self.identity.address.to_bytes(); - header.src = node.identity.address.to_bytes(); - header.flags_cipher_hops = flags_cipher_hops; - header.mac = *array_range::(tag); - } else { - return false; - } - - if self - .internal_send( - si, - &path.endpoint, - Some(&path.local_socket), - Some(&path.local_interface), - max_fragment_size, - packet, - ) - .await - { - self.last_send_time_ticks.store(time_ticks, Ordering::Relaxed); - return true; - } - - return false; - } - - /// Forward a packet to this peer. - /// - /// This is called when we receive a packet not addressed to this node and - /// want to pass it along. - /// - /// This doesn't fragment large packets since fragments are forwarded individually. - /// Intermediates don't need to adjust fragmentation. - pub(crate) async fn forward(&self, si: &SI, time_ticks: i64, packet: &PacketBuffer) -> bool { - if let Some(path) = self.direct_path() { - if si - .wire_send( - &path.endpoint, - Some(&path.local_socket), - Some(&path.local_interface), - &[packet.as_bytes()], - 0, - ) - .await - { - self.last_forward_time_ticks.store(time_ticks, Ordering::Relaxed); - return true; - } - } - return false; - } - - /// Send a HELLO to this peer. - /// - /// If explicit_endpoint is not None the packet will be sent directly to this endpoint. - /// Otherwise it will be sent via the best direct or indirect path known. - /// - /// Unlike other messages HELLO is sent partially in the clear and always with the long-lived - /// static identity key. Authentication in old versions is via Poly1305 and in new versions - /// via HMAC-SHA512. - pub(crate) async fn send_hello(&self, si: &SI, node: &Node, explicit_endpoint: Option<&Endpoint>) -> bool { - let mut path = None; - let destination = if let Some(explicit_endpoint) = explicit_endpoint { - explicit_endpoint - } else { - if let Some(p) = self.path(node) { - let _ = path.insert(p); - &path.as_ref().unwrap().endpoint - } else { - return false; - } - }; - - let max_fragment_size = if destination.requires_fragmentation() { - UDP_DEFAULT_MTU - } else { - usize::MAX - }; - let time_ticks = si.time_ticks(); - - let mut packet = PacketBuffer::new(); - { - let message_id = self.next_message_id(); - - { - let f: &mut (v1::PacketHeader, v1::message_component_structs::HelloFixedHeaderFields) = - packet.append_struct_get_mut().unwrap(); - f.0.id = message_id.to_ne_bytes(); - f.0.dest = self.identity.address.to_bytes(); - f.0.src = node.identity.address.to_bytes(); - f.0.flags_cipher_hops = v1::CIPHER_NOCRYPT_POLY1305; - f.1.verb = verbs::VL1_HELLO | v1::VERB_FLAG_EXTENDED_AUTHENTICATION; - f.1.version_proto = PROTOCOL_VERSION; - f.1.version_major = VERSION_MAJOR; - f.1.version_minor = VERSION_MINOR; - f.1.version_revision = VERSION_REVISION.to_be_bytes(); - f.1.timestamp = (time_ticks as u64).wrapping_add(self.random_ticks_offset as u64).to_be_bytes(); - } - - debug_assert_eq!(packet.len(), 41); - assert!(packet.append_bytes((&node.identity.to_public_bytes()).into()).is_ok()); - - let (_, poly1305_key) = salsa_poly_create( - &self.static_symmetric_key, - packet.struct_at::(0).unwrap(), - packet.len(), - ); - let mac = poly1305::compute(&poly1305_key, packet.as_bytes_starting_at(v1::HEADER_SIZE).unwrap()); - packet.as_mut()[v1::MAC_FIELD_INDEX..v1::MAC_FIELD_INDEX + 8].copy_from_slice(&mac[0..8]); - - self.last_send_time_ticks.store(time_ticks, Ordering::Relaxed); - - debug_event!( - si, - "HELLO -> {} @ {} ({} bytes)", - self.identity.address.to_string(), - destination.to_string(), - packet.len() - ); - } - - if let Some(p) = path.as_ref() { - if self - .internal_send( - si, - destination, - Some(&p.local_socket), - Some(&p.local_interface), - max_fragment_size, - &packet, - ) - .await - { - p.log_send_anything(time_ticks); - true - } else { - false - } - } else { - self.internal_send(si, destination, None, None, max_fragment_size, &packet).await - } - } - - /// Receive, decrypt, authenticate, and process an incoming packet from this peer. - /// - /// If the packet comes in multiple fragments, the fragments slice should contain all - /// those fragments after the main packet header and first chunk. - /// - /// This returns true if the packet decrypted and passed authentication. - pub(crate) async fn receive( - &self, - node: &Node, - si: &SI, - ph: &PH, - time_ticks: i64, - source_path: &Arc>, - packet_header: &v1::PacketHeader, - frag0: &PacketBuffer, - fragments: &[Option], - ) -> bool { - if let Ok(packet_frag0_payload_bytes) = frag0.as_bytes_starting_at(v1::VERB_INDEX) { - let mut payload = PacketBuffer::new(); - - let message_id = if let Some(message_id2) = try_aead_decrypt( - &self.static_symmetric_key, - packet_frag0_payload_bytes, - packet_header, - fragments, - &mut payload, - ) { - // Decryption successful with static secret. - message_id2 - } else { - // Packet failed to decrypt using either ephemeral or permament key, reject. - debug_event!(si, "[vl1] #{:0>16x} failed authentication", u64::from_be_bytes(packet_header.id)); - return false; - }; - - if let Ok(mut verb) = payload.u8_at(0) { - if (verb & v1::VERB_FLAG_COMPRESSED) != 0 { - let mut decompressed_payload = [0u8; v1::SIZE_MAX]; - decompressed_payload[0] = verb; - if let Ok(dlen) = lz4_flex::block::decompress_into(&payload.as_bytes()[1..], &mut decompressed_payload[1..]) { - payload.set_to(&decompressed_payload[..(dlen + 1)]); - } else { - return false; - } - } - - // --------------------------------------------------------------- - // If we made it here it decrypted and passed authentication. - // --------------------------------------------------------------- - - self.last_receive_time_ticks.store(time_ticks, Ordering::Relaxed); - - let mut path_is_known = false; - for p in self.paths.lock().iter_mut() { - if std::ptr::eq(p.path.as_ptr(), source_path.as_ref()) { - p.last_receive_time_ticks = time_ticks; - path_is_known = true; - break; - } - } - - verb &= v1::VERB_MASK; // mask off flags - debug_event!( - si, - "[vl1] #{:0>16x} decrypted and authenticated, verb: {} ({:0>2x})", - u64::from_be_bytes(packet_header.id), - verbs::name(verb), - verb as u32 - ); - - if match verb { - verbs::VL1_NOP => true, - verbs::VL1_HELLO => { - self.handle_incoming_hello(si, ph, node, time_ticks, message_id, source_path, packet_header.hops(), &payload) - .await - } - verbs::VL1_ERROR => self.handle_incoming_error(si, ph, node, time_ticks, source_path, &payload).await, - verbs::VL1_OK => { - self.handle_incoming_ok(si, ph, node, time_ticks, source_path, packet_header.hops(), path_is_known, &payload) - .await - } - verbs::VL1_WHOIS => self.handle_incoming_whois(si, ph, node, time_ticks, message_id, &payload).await, - verbs::VL1_RENDEZVOUS => { - self.handle_incoming_rendezvous(si, node, time_ticks, message_id, source_path, &payload) - .await - } - verbs::VL1_ECHO => self.handle_incoming_echo(si, ph, node, time_ticks, message_id, &payload).await, - verbs::VL1_PUSH_DIRECT_PATHS => { - self.handle_incoming_push_direct_paths(si, node, time_ticks, source_path, &payload) - .await - } - verbs::VL1_USER_MESSAGE => self.handle_incoming_user_message(si, node, time_ticks, source_path, &payload).await, - _ => ph.handle_packet(self, &source_path, verb, &payload).await, - } { - return true; - } - } - } - return false; - } - - async fn handle_incoming_hello( - &self, - si: &SI, - ph: &PH, - node: &Node, - time_ticks: i64, - message_id: MessageId, - source_path: &Arc>, - hops: u8, - payload: &PacketBuffer, - ) -> bool { - if !(ph.should_communicate_with(&self.identity) || node.this_node_is_root() || node.is_peer_root(self)) { - debug_event!( - si, - "[vl1] dropping HELLO from {} due to lack of trust relationship", - self.identity.address.to_string() - ); - return true; // packet wasn't invalid, just ignored - } - - let mut cursor = 0; - if let Ok(hello_fixed_headers) = payload.read_struct::(&mut cursor) { - if let Ok(identity) = Identity::read_bytes(&mut BufferReader::new(payload, &mut cursor)) { - if identity.eq(&self.identity) { - { - let mut remote_node_info = self.remote_node_info.write(); - remote_node_info.remote_protocol_version = hello_fixed_headers.version_proto; - remote_node_info.remote_version = ( - hello_fixed_headers.version_major, - hello_fixed_headers.version_minor, - u16::from_be_bytes(hello_fixed_headers.version_revision), - ); - } - - let mut packet = PacketBuffer::new(); - packet.set_size(v1::HEADER_SIZE); - { - let f: &mut ( - v1::message_component_structs::OkHeader, - v1::message_component_structs::OkHelloFixedHeaderFields, - ) = packet.append_struct_get_mut().unwrap(); - f.0.verb = verbs::VL1_OK; - f.0.in_re_verb = verbs::VL1_HELLO; - f.0.in_re_message_id = message_id.to_ne_bytes(); - f.1.timestamp_echo = hello_fixed_headers.timestamp; - f.1.version_proto = PROTOCOL_VERSION; - f.1.version_major = VERSION_MAJOR; - f.1.version_minor = VERSION_MINOR; - f.1.version_revision = VERSION_REVISION.to_be_bytes(); - } - - return self.send(si, Some(source_path), node, time_ticks, &mut packet).await; - } - } - } - return false; - } - - async fn handle_incoming_error( - &self, - _si: &SI, - ph: &PH, - _node: &Node, - _time_ticks: i64, - source_path: &Arc>, - payload: &PacketBuffer, - ) -> bool { - let mut cursor = 0; - if let Ok(error_header) = payload.read_struct::(&mut cursor) { - let in_re_message_id: MessageId = u64::from_ne_bytes(error_header.in_re_message_id); - if self.message_id_counter.load(Ordering::Relaxed).wrapping_sub(in_re_message_id) <= PACKET_RESPONSE_COUNTER_DELTA_MAX { - match error_header.in_re_verb { - _ => { - return ph - .handle_error( - self, - &source_path, - error_header.in_re_verb, - in_re_message_id, - error_header.error_code, - payload, - &mut cursor, - ) - .await; - } - } - } - } - return false; - } - - async fn handle_incoming_ok( - &self, - si: &SI, - ph: &PH, - node: &Node, - time_ticks: i64, - source_path: &Arc>, - hops: u8, - path_is_known: bool, - payload: &PacketBuffer, - ) -> bool { - let mut cursor = 0; - if let Ok(ok_header) = payload.read_struct::(&mut cursor) { - let in_re_message_id: MessageId = u64::from_ne_bytes(ok_header.in_re_message_id); - if self.message_id_counter.load(Ordering::Relaxed).wrapping_sub(in_re_message_id) <= PACKET_RESPONSE_COUNTER_DELTA_MAX { - match ok_header.in_re_verb { - verbs::VL1_HELLO => { - if let Ok(ok_hello_fixed_header_fields) = - payload.read_struct::(&mut cursor) - { - if hops == 0 { - if let Ok(reported_endpoint) = Endpoint::unmarshal(&payload, &mut cursor) { - #[cfg(debug_assertions)] - let reported_endpoint2 = reported_endpoint.clone(); - if self - .remote_node_info - .write() - .reported_local_endpoints - .insert(reported_endpoint, time_ticks) - .is_none() - { - #[cfg(debug_assertions)] - debug_event!( - si, - "[vl1] {} reported new remote perspective, local endpoint: {}", - self.identity.address.to_string(), - reported_endpoint2.to_string() - ); - } - } - } - - if hops == 0 && !path_is_known { - self.learn_path(si, source_path, time_ticks); - } - - self.last_hello_reply_time_ticks.store(time_ticks, Ordering::Relaxed); - } - } - - verbs::VL1_WHOIS => { - if node.is_peer_root(self) { - while cursor < payload.len() { - if let Ok(_whois_response) = Identity::read_bytes(&mut BufferReader::new(payload, &mut cursor)) { - // TODO - } else { - break; - } - } - } else { - return true; // not invalid, just ignored - } - } - - _ => { - return ph - .handle_ok(self, &source_path, ok_header.in_re_verb, in_re_message_id, payload, &mut cursor) - .await; - } - } - } - } - return false; - } - - async fn handle_incoming_whois( - &self, - si: &SI, - ph: &PH, - node: &Node, - time_ticks: i64, - message_id: MessageId, - payload: &PacketBuffer, - ) -> bool { - if node.this_node_is_root() || ph.should_communicate_with(&self.identity) { - let mut packet = PacketBuffer::new(); - packet.set_size(v1::HEADER_SIZE); - { - let mut f: &mut v1::message_component_structs::OkHeader = packet.append_struct_get_mut().unwrap(); - f.verb = verbs::VL1_OK; - f.in_re_verb = verbs::VL1_WHOIS; - f.in_re_message_id = message_id.to_ne_bytes(); - } - - let mut cursor = 0; - while cursor < payload.len() { - if let Ok(zt_address) = Address::unmarshal(payload, &mut cursor) { - if let Some(peer) = node.peer(zt_address) { - if !packet.append_bytes((&peer.identity.to_public_bytes()).into()).is_ok() { - debug_event!(si, "unexpected error serializing an identity into a WHOIS packet response"); - return false; - } - } - } - } - - self.send(si, None, node, time_ticks, &mut packet).await - } else { - true // packet wasn't invalid, just ignored - } - } - - #[allow(unused)] - async fn handle_incoming_rendezvous( - &self, - si: &SI, - node: &Node, - time_ticks: i64, - message_id: MessageId, - source_path: &Arc>, - payload: &PacketBuffer, - ) -> bool { - if node.is_peer_root(self) {} - return true; - } - - async fn handle_incoming_echo( - &self, - si: &SI, - ph: &PH, - node: &Node, - time_ticks: i64, - message_id: MessageId, - payload: &PacketBuffer, - ) -> bool { - if ph.should_communicate_with(&self.identity) || node.is_peer_root(self) { - let mut packet = PacketBuffer::new(); - packet.set_size(v1::HEADER_SIZE); - { - let mut f: &mut v1::message_component_structs::OkHeader = packet.append_struct_get_mut().unwrap(); - f.verb = verbs::VL1_OK; - f.in_re_verb = verbs::VL1_ECHO; - f.in_re_message_id = message_id.to_ne_bytes(); - } - if packet.append_bytes(payload.as_bytes()).is_ok() { - self.send(si, None, node, time_ticks, &mut packet).await - } else { - false - } - } else { - debug_event!( - si, - "[vl1] dropping ECHO from {} due to lack of trust relationship", - self.identity.address.to_string() - ); - true // packet wasn't invalid, just ignored - } - } - - #[allow(unused)] - async fn handle_incoming_push_direct_paths( - &self, - si: &SI, - node: &Node, - time_ticks: i64, - source_path: &Arc>, - payload: &PacketBuffer, - ) -> bool { - false - } - - #[allow(unused)] - async fn handle_incoming_user_message( - &self, - si: &SI, - node: &Node, - time_ticks: i64, - source_path: &Arc>, - payload: &PacketBuffer, - ) -> bool { - false - } -} - -impl Hash for Peer { - #[inline(always)] - fn hash(&self, state: &mut H) { - state.write_u64(self.identity.address.into()); - } -} - -impl PartialEq for Peer { - #[inline(always)] - fn eq(&self, other: &Self) -> bool { - self.identity.fingerprint.eq(&other.identity.fingerprint) - } -} - -impl Eq for Peer {} diff --git a/network-hypervisor/src/vl1/whoisqueue.rs b/network-hypervisor/src/vl1/whoisqueue.rs index 940ae6f03..f3100539e 100644 --- a/network-hypervisor/src/vl1/whoisqueue.rs +++ b/network-hypervisor/src/vl1/whoisqueue.rs @@ -6,7 +6,7 @@ use parking_lot::Mutex; use crate::util::gate::IntervalGate; use crate::vl1::fragmentedpacket::FragmentedPacket; -use crate::vl1::node::{Node, SystemInterface}; +use crate::vl1::node::{HostSystem, Node}; use crate::vl1::protocol::{PooledPacketBuffer, WHOIS_MAX_WAITING_PACKETS, WHOIS_RETRY_INTERVAL, WHOIS_RETRY_MAX}; use crate::vl1::Address; @@ -31,7 +31,7 @@ impl WhoisQueue { } /// Launch or renew a WHOIS query and enqueue a packet to be processed when (if) it is received. - pub fn query(&self, node: &Node, si: &SI, target: Address, packet: Option) { + pub fn query(&self, node: &Node, si: &SI, target: Address, packet: Option) { let mut q = self.0.lock(); let qi = q.entry(target).or_insert_with(|| WhoisQueueItem { @@ -60,11 +60,11 @@ impl WhoisQueue { } #[allow(unused)] - fn send_whois(&self, node: &Node, si: &SI, targets: &[Address]) { + fn send_whois(&self, node: &Node, si: &SI, targets: &[Address]) { todo!() } - pub(crate) fn service(&self, si: &SI, node: &Node, time_ticks: i64) { + pub(crate) fn service(&self, si: &SI, node: &Node, time_ticks: i64) { let mut targets: Vec
= Vec::new(); self.0.lock().retain(|target, qi| { if qi.retry_count < WHOIS_RETRY_MAX { diff --git a/network-hypervisor/src/vl2/switch.rs b/network-hypervisor/src/vl2/switch.rs index 45fe5ac7a..0723633d2 100644 --- a/network-hypervisor/src/vl2/switch.rs +++ b/network-hypervisor/src/vl2/switch.rs @@ -2,7 +2,7 @@ use async_trait::async_trait; -use crate::vl1::node::{InnerProtocolInterface, SystemInterface}; +use crate::vl1::node::{HostSystem, InnerProtocol}; use crate::vl1::protocol::*; use crate::vl1::{Identity, Path, Peer}; @@ -11,17 +11,23 @@ pub trait SwitchInterface: Sync + Send {} pub struct Switch {} #[async_trait] -impl InnerProtocolInterface for Switch { +impl InnerProtocol for Switch { #[allow(unused)] - async fn handle_packet(&self, peer: &Peer, source_path: &Path, verb: u8, payload: &PacketBuffer) -> bool { + async fn handle_packet( + &self, + peer: &Peer, + source_path: &Path, + verb: u8, + payload: &PacketBuffer, + ) -> bool { false } #[allow(unused)] - async fn handle_error( + async fn handle_error( &self, - peer: &Peer, - source_path: &Path, + peer: &Peer, + source_path: &Path, in_re_verb: u8, in_re_message_id: u64, error_code: u8, @@ -32,10 +38,10 @@ impl InnerProtocolInterface for Switch { } #[allow(unused)] - async fn handle_ok( + async fn handle_ok( &self, - peer: &Peer, - source_path: &Path, + peer: &Peer, + source_path: &Path, in_re_verb: u8, in_re_message_id: u64, payload: &PacketBuffer, @@ -50,8 +56,4 @@ impl InnerProtocolInterface for Switch { } } -impl Switch { - pub async fn new() -> Self { - Self {} - } -} +impl Switch {} diff --git a/system-service/Cargo.toml b/service/Cargo.toml similarity index 93% rename from system-service/Cargo.toml rename to service/Cargo.toml index 766d9faec..63e43128a 100644 --- a/system-service/Cargo.toml +++ b/service/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "zerotier-system-service" +name = "zerotier-service" version = "0.1.0" authors = ["ZeroTier, Inc. ", "Adam Ierymenko "] edition = "2021" @@ -13,8 +13,8 @@ path = "src/main.rs" zerotier-network-hypervisor = { path = "../network-hypervisor" } zerotier-crypto = { path = "../crypto" } zerotier-utils = { path = "../utils" } +zerotier-vl1-service = { path = "../vl1-service" } async-trait = "^0" -num-traits = "^0" tokio = { version = "^1", features = ["fs", "io-util", "io-std", "net", "parking_lot", "process", "rt", "rt-multi-thread", "signal", "sync", "time"], default-features = false } serde = { version = "^1", features = ["derive"], default-features = false } serde_json = { version = "^1", features = ["std"], default-features = false } diff --git a/system-service/rustfmt.toml b/service/rustfmt.toml similarity index 100% rename from system-service/rustfmt.toml rename to service/rustfmt.toml diff --git a/system-service/src/cli/mod.rs b/service/src/cli/mod.rs similarity index 100% rename from system-service/src/cli/mod.rs rename to service/src/cli/mod.rs diff --git a/system-service/src/cli/rootset.rs b/service/src/cli/rootset.rs similarity index 100% rename from system-service/src/cli/rootset.rs rename to service/src/cli/rootset.rs diff --git a/system-service/src/cmdline_help.rs b/service/src/cmdline_help.rs similarity index 100% rename from system-service/src/cmdline_help.rs rename to service/src/cmdline_help.rs diff --git a/system-service/src/datadir.rs b/service/src/datadir.rs similarity index 100% rename from system-service/src/datadir.rs rename to service/src/datadir.rs diff --git a/system-service/src/exitcode.rs b/service/src/exitcode.rs similarity index 100% rename from system-service/src/exitcode.rs rename to service/src/exitcode.rs diff --git a/system-service/src/jsonformatter.rs b/service/src/jsonformatter.rs similarity index 100% rename from system-service/src/jsonformatter.rs rename to service/src/jsonformatter.rs diff --git a/system-service/src/localconfig.rs b/service/src/localconfig.rs similarity index 100% rename from system-service/src/localconfig.rs rename to service/src/localconfig.rs diff --git a/system-service/src/main.rs b/service/src/main.rs similarity index 100% rename from system-service/src/main.rs rename to service/src/main.rs diff --git a/system-service/src/utils.rs b/service/src/utils.rs similarity index 100% rename from system-service/src/utils.rs rename to service/src/utils.rs diff --git a/system-service/src/vnic/common.rs b/service/src/vnic/common.rs similarity index 100% rename from system-service/src/vnic/common.rs rename to service/src/vnic/common.rs diff --git a/system-service/src/vnic/mac_feth_tap.rs b/service/src/vnic/mac_feth_tap.rs similarity index 100% rename from system-service/src/vnic/mac_feth_tap.rs rename to service/src/vnic/mac_feth_tap.rs diff --git a/system-service/src/vnic/mod.rs b/service/src/vnic/mod.rs similarity index 100% rename from system-service/src/vnic/mod.rs rename to service/src/vnic/mod.rs diff --git a/system-service/src/vnic/vnic.rs b/service/src/vnic/vnic.rs similarity index 100% rename from system-service/src/vnic/vnic.rs rename to service/src/vnic/vnic.rs diff --git a/system-service/src/service.rs b/system-service/src/service.rs deleted file mode 100644 index 0c26ce09f..000000000 --- a/system-service/src/service.rs +++ /dev/null @@ -1,312 +0,0 @@ -// (c) 2020-2022 ZeroTier, Inc. -- currently propritery pending actual release and licensing. See LICENSE.md. - -use std::collections::{HashMap, HashSet}; -use std::error::Error; -use std::path::Path; -use std::sync::Arc; - -use async_trait::async_trait; - -use zerotier_network_hypervisor::vl1::*; -use zerotier_network_hypervisor::vl2::*; -use zerotier_network_hypervisor::*; - -use zerotier_crypto::random; -use zerotier_utils::{ms_monotonic, ms_since_epoch}; - -use tokio::time::Duration; - -use crate::datadir::DataDir; -use crate::localinterface::LocalInterface; -use crate::localsocket::LocalSocket; -use crate::udp::*; - -/// Interval between scans of system network interfaces to update port bindings. -const UDP_UPDATE_BINDINGS_INTERVAL_MS: Duration = Duration::from_millis(5000); - -/// ZeroTier system service, which presents virtual networks as VPN connections on Windows/macOS/Linux/BSD/etc. -pub struct Service { - udp_binding_task: tokio::task::JoinHandle<()>, - core_background_service_task: tokio::task::JoinHandle<()>, - internal: Arc, -} - -struct ServiceImpl { - pub rt: tokio::runtime::Handle, - pub data: DataDir, - pub udp_sockets_by_port: tokio::sync::RwLock>, - pub num_listeners_per_socket: usize, - _core: Option>, -} - -impl Drop for Service { - fn drop(&mut self) { - // Kill all background tasks associated with this service. - self.udp_binding_task.abort(); - self.core_background_service_task.abort(); - - // Drop all bound sockets since these can hold circular Arc<> references to 'internal'. - // This shouldn't have to loop much if at all to acquire the lock, but it might if something - // is still completing somewhere in an aborting task. - loop { - if let Ok(mut udp_sockets) = self.internal.udp_sockets_by_port.try_write() { - udp_sockets.clear(); - break; - } - std::thread::sleep(Duration::from_millis(5)); - } - } -} - -impl Service { - /// Start ZeroTier service. - /// - /// This launches a number of background tasks in the async runtime that will run as long as this object exists. - /// When this is dropped these tasks are killed. - pub async fn new>( - rt: tokio::runtime::Handle, - base_path: P, - auto_upgrade_identity: bool, - ) -> Result> { - let mut si = ServiceImpl { - rt, - data: DataDir::open(base_path).await.map_err(|e| Box::new(e))?, - udp_sockets_by_port: tokio::sync::RwLock::new(HashMap::with_capacity(4)), - num_listeners_per_socket: std::thread::available_parallelism().unwrap().get(), - _core: None, - }; - let _ = si._core.insert(NetworkHypervisor::new(&si, true, auto_upgrade_identity).await?); - let si = Arc::new(si); - - si._core.as_ref().unwrap().add_update_default_root_set_if_none(); - - Ok(Self { - udp_binding_task: si.rt.spawn(si.clone().udp_binding_task_main()), - core_background_service_task: si.rt.spawn(si.clone().core_background_service_task_main()), - internal: si, - }) - } -} - -impl ServiceImpl { - #[inline(always)] - fn core(&self) -> &NetworkHypervisor { - self._core.as_ref().unwrap() - } - - /// Called in udp_binding_task_main() to service a particular UDP port. - async fn update_udp_bindings_for_port( - self: &Arc, - port: u16, - interface_prefix_blacklist: &Vec, - cidr_blacklist: &Vec, - ) -> Option> { - for ns in { - let mut udp_sockets_by_port = self.udp_sockets_by_port.write().await; - let bp = udp_sockets_by_port.entry(port).or_insert_with(|| BoundUdpPort::new(port)); - let (errors, new_sockets) = bp.update_bindings(interface_prefix_blacklist, cidr_blacklist); - if bp.sockets.is_empty() { - return Some(errors); - } - new_sockets - } - .iter() - { - /* - * Start a task (not actual thread) for each CPU core. - * - * The async runtime is itself multithreaded but each packet takes a little bit of CPU to handle. - * This makes sure that when one packet is in processing the async runtime is immediately able to - * cue up another receiver for this socket. - */ - for _ in 0..self.num_listeners_per_socket { - let self2 = self.clone(); - let socket = ns.socket.clone(); - let interface = ns.interface.clone(); - let local_socket = LocalSocket::new(ns); - ns.socket_associated_tasks.lock().push(self.rt.spawn(async move { - let core = self2.core(); - loop { - let mut buf = core.get_packet_buffer(); - if let Ok((bytes, source)) = socket.recv_from(unsafe { buf.entire_buffer_mut() }).await { - unsafe { buf.set_size_unchecked(bytes) }; - core.handle_incoming_physical_packet( - &self2, - &Endpoint::IpUdp(InetAddress::from(source)), - &local_socket, - &interface, - buf, - ) - .await; - } else { - break; - } - } - })); - } - } - return None; - } - - /// Background task to update per-interface/per-port bindings if system interface configuration changes. - async fn udp_binding_task_main(self: Arc) { - loop { - let config = self.data.config().await; - - if let Some(errors) = self - .update_udp_bindings_for_port( - config.settings.primary_port, - &config.settings.interface_prefix_blacklist, - &config.settings.cidr_blacklist, - ) - .await - { - for e in errors.iter() { - println!("BIND ERROR: {} {} {}", e.0.to_string(), e.1.to_string(), e.2.to_string()); - } - // TODO: report errors properly - } - - tokio::time::sleep(UDP_UPDATE_BINDINGS_INTERVAL_MS).await; - } - } - - /// Periodically calls do_background_tasks() in the ZeroTier core. - async fn core_background_service_task_main(self: Arc) { - tokio::time::sleep(Duration::from_secs(1)).await; - loop { - tokio::time::sleep(self.core().do_background_tasks(&self).await).await; - } - } -} - -#[async_trait] -impl SystemInterface for ServiceImpl { - type LocalSocket = crate::service::LocalSocket; - type LocalInterface = crate::localinterface::LocalInterface; - - fn event(&self, event: Event) { - println!("{}", event.to_string()); - match event { - _ => {} - } - } - - async fn user_message(&self, _source: &Identity, _message_type: u64, _message: &[u8]) {} - - #[inline(always)] - fn local_socket_is_valid(&self, socket: &Self::LocalSocket) -> bool { - socket.0.strong_count() > 0 - } - - async fn load_node_identity(&self) -> Option { - self.data.load_identity().await.map_or(None, |i| Some(i)) - } - - async fn save_node_identity(&self, id: &Identity) { - assert!(self.data.save_identity(id).await.is_ok()) - } - - async fn wire_send( - &self, - endpoint: &Endpoint, - local_socket: Option<&Self::LocalSocket>, - local_interface: Option<&Self::LocalInterface>, - data: &[&[u8]], - packet_ttl: u8, - ) -> bool { - match endpoint { - Endpoint::IpUdp(address) => { - // This is the fast path -- the socket is known to the core so just send it. - if let Some(s) = local_socket { - if let Some(s) = s.0.upgrade() { - return s.send_sync_nonblock(address, data, packet_ttl); - } else { - return false; - } - } - - let udp_sockets_by_port = self.udp_sockets_by_port.read().await; - if !udp_sockets_by_port.is_empty() { - if let Some(specific_interface) = local_interface { - // Send from a specific interface if that interface is specified. - for (_, p) in udp_sockets_by_port.iter() { - if !p.sockets.is_empty() { - let mut i = (random::next_u32_secure() as usize) % p.sockets.len(); - for _ in 0..p.sockets.len() { - let s = p.sockets.get(i).unwrap(); - if s.interface.eq(specific_interface) { - if s.send_sync_nonblock(address, data, packet_ttl) { - return true; - } - } - i = (i + 1) % p.sockets.len(); - } - } - } - } else { - // Otherwise send from one socket on every interface. - let mut sent_on_interfaces = HashSet::with_capacity(4); - for p in udp_sockets_by_port.values() { - if !p.sockets.is_empty() { - let mut i = (random::next_u32_secure() as usize) % p.sockets.len(); - for _ in 0..p.sockets.len() { - let s = p.sockets.get(i).unwrap(); - if !sent_on_interfaces.contains(&s.interface) { - if s.send_sync_nonblock(address, data, packet_ttl) { - sent_on_interfaces.insert(s.interface.clone()); - } - } - i = (i + 1) % p.sockets.len(); - } - } - } - return !sent_on_interfaces.is_empty(); - } - } - - return false; - } - _ => {} - } - return false; - } - - async fn check_path( - &self, - _id: &Identity, - endpoint: &Endpoint, - _local_socket: Option<&Self::LocalSocket>, - _local_interface: Option<&Self::LocalInterface>, - ) -> bool { - let config = self.data.config().await; - if let Some(pps) = config.physical.get(endpoint) { - !pps.blacklist - } else { - true - } - } - - async fn get_path_hints(&self, id: &Identity) -> Option, Option)>> { - let config = self.data.config().await; - if let Some(vns) = config.virtual_.get(&id.address) { - Some(vns.try_.iter().map(|ep| (ep.clone(), None, None)).collect()) - } else { - None - } - } - - #[inline(always)] - fn time_ticks(&self) -> i64 { - ms_monotonic() - } - - #[inline(always)] - fn time_clock(&self) -> i64 { - ms_since_epoch() - } -} - -impl SwitchInterface for ServiceImpl {} - -impl Interface for ServiceImpl {} diff --git a/utils/src/pool.rs b/utils/src/pool.rs index ff17980ea..6cb85f03f 100644 --- a/utils/src/pool.rs +++ b/utils/src/pool.rs @@ -7,22 +7,11 @@ use std::sync::{Arc, Weak}; use parking_lot::Mutex; /// Each pool requires a factory that creates and resets (for re-use) pooled objects. -pub trait PoolFactory { +pub trait PoolFactory { fn create(&self) -> O; fn reset(&self, obj: &mut O); } -#[repr(C)] -struct PoolEntry> { - obj: O, // must be first - return_pool: Weak>, -} - -struct PoolInner> { - factory: F, - pool: Mutex>>>, -} - /// Container for pooled objects that have been checked out of the pool. /// /// When this is dropped the object is returned to the pool or if the pool or is @@ -34,9 +23,15 @@ struct PoolInner> { /// Note that pooled objects are not clonable. If you want to share them use Rc<> /// or Arc<>. #[repr(transparent)] -pub struct Pooled>(NonNull>); +pub struct Pooled>(NonNull>); -impl> Pooled { +#[repr(C)] +struct PoolEntry> { + obj: O, // must be first + return_pool: Weak>, +} + +impl> Pooled { /// Get a raw pointer to the object wrapped by this pooled object container. /// The returned raw pointer MUST be restored into a Pooled instance with /// from_raw() or memory will leak. @@ -67,10 +62,10 @@ impl> Pooled { } } -unsafe impl> Send for Pooled {} -unsafe impl> Sync for Pooled where O: Sync {} +unsafe impl> Send for Pooled where O: Send {} +unsafe impl> Sync for Pooled where O: Sync {} -impl> Deref for Pooled { +impl> Deref for Pooled { type Target = O; #[inline(always)] @@ -79,29 +74,29 @@ impl> Deref for Pooled { } } -impl> DerefMut for Pooled { +impl> DerefMut for Pooled { #[inline(always)] fn deref_mut(&mut self) -> &mut Self::Target { unsafe { &mut self.0.as_mut().obj } } } -impl> AsRef for Pooled { +impl> AsRef for Pooled { #[inline(always)] fn as_ref(&self) -> &O { unsafe { &self.0.as_ref().obj } } } -impl> AsMut for Pooled { +impl> AsMut for Pooled { #[inline(always)] fn as_mut(&mut self) -> &mut O { unsafe { &mut self.0.as_mut().obj } } } -impl> Drop for Pooled { - #[inline(always)] +impl> Drop for Pooled { + #[inline] fn drop(&mut self) { let internal = unsafe { self.0.as_mut() }; if let Some(p) = internal.return_pool.upgrade() { @@ -116,9 +111,14 @@ impl> Drop for Pooled { /// An object pool for Reusable objects. /// Checked out objects are held by a guard object that returns them when dropped if /// the pool still exists or drops them if the pool has itself been dropped. -pub struct Pool>(Arc>); +pub struct Pool>(Arc>); -impl> Pool { +struct PoolInner> { + factory: F, + pool: Mutex>>>, +} + +impl> Pool { pub fn new(initial_stack_capacity: usize, factory: F) -> Self { Self(Arc::new(PoolInner:: { factory, @@ -127,7 +127,7 @@ impl> Pool { } /// Get a pooled object, or allocate one if the pool is empty. - #[inline(always)] + #[inline] pub fn get(&self) -> Pooled { if let Some(o) = self.0.pool.lock().pop() { return Pooled::(o); @@ -152,7 +152,7 @@ impl> Pool { } } -impl> Drop for Pool { +impl> Drop for Pool { #[inline(always)] fn drop(&mut self) { self.purge(); diff --git a/vl1-service/Cargo.toml b/vl1-service/Cargo.toml new file mode 100644 index 000000000..c1325306a --- /dev/null +++ b/vl1-service/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "zerotier-vl1-service" +version = "0.1.0" +authors = ["ZeroTier, Inc. ", "Adam Ierymenko "] +edition = "2021" +license = "MPL-2.0" + +[dependencies] +zerotier-network-hypervisor = { path = "../network-hypervisor" } +zerotier-crypto = { path = "../crypto" } +zerotier-utils = { path = "../utils" } +async-trait = "^0" +num-traits = "^0" +tokio = { version = "^1", features = ["fs", "io-util", "io-std", "net", "parking_lot", "process", "rt", "rt-multi-thread", "signal", "sync", "time"], default-features = false } +parking_lot = { version = "^0", features = [], default-features = false } + +[target."cfg(windows)".dependencies] +winapi = { version = "^0", features = ["handleapi", "ws2ipdef", "ws2tcpip"] } + +[target."cfg(not(windows))".dependencies] +libc = "^0" diff --git a/vl1-service/rustfmt.toml b/vl1-service/rustfmt.toml new file mode 120000 index 000000000..39f97b043 --- /dev/null +++ b/vl1-service/rustfmt.toml @@ -0,0 +1 @@ +../rustfmt.toml \ No newline at end of file diff --git a/vl1-service/src/lib.rs b/vl1-service/src/lib.rs new file mode 100644 index 000000000..6d97e5415 --- /dev/null +++ b/vl1-service/src/lib.rs @@ -0,0 +1,11 @@ +// (c) 2020-2022 ZeroTier, Inc. -- currently propritery pending actual release and licensing. See LICENSE.md. + +mod localinterface; +mod localsocket; +mod vl1service; + +pub mod sys; + +pub use localinterface::LocalInterface; +pub use localsocket::LocalSocket; +pub use vl1service::*; diff --git a/system-service/src/localinterface.rs b/vl1-service/src/localinterface.rs similarity index 100% rename from system-service/src/localinterface.rs rename to vl1-service/src/localinterface.rs diff --git a/system-service/src/localsocket.rs b/vl1-service/src/localsocket.rs similarity index 90% rename from system-service/src/localsocket.rs rename to vl1-service/src/localsocket.rs index 4f3cf911d..4746a3671 100644 --- a/system-service/src/localsocket.rs +++ b/vl1-service/src/localsocket.rs @@ -4,7 +4,7 @@ use std::hash::Hash; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Weak}; -use crate::udp::BoundUdpSocket; +use crate::sys::udp::BoundUdpSocket; static LOCAL_SOCKET_UNIQUE_ID_COUNTER: AtomicUsize = AtomicUsize::new(0); @@ -22,10 +22,9 @@ impl LocalSocket { Self(Arc::downgrade(s), LOCAL_SOCKET_UNIQUE_ID_COUNTER.fetch_add(1, Ordering::SeqCst)) } - /// Returns true if the wrapped socket appears to be in use by the core. #[inline(always)] - pub fn in_use(&self) -> bool { - self.0.weak_count() > 0 + pub fn is_valid(&self) -> bool { + self.0.strong_count() > 0 } #[inline(always)] diff --git a/system-service/src/getifaddrs.rs b/vl1-service/src/sys/getifaddrs.rs similarity index 93% rename from system-service/src/getifaddrs.rs rename to vl1-service/src/sys/getifaddrs.rs index c9f135580..1e98899d7 100644 --- a/system-service/src/getifaddrs.rs +++ b/vl1-service/src/sys/getifaddrs.rs @@ -89,15 +89,12 @@ pub fn for_each_address(mut f: F) { #[cfg(test)] mod tests { - use crate::localinterface::LocalInterface; - use zerotier_network_hypervisor::vl1::InetAddress; + use super::*; #[test] fn test_getifaddrs() { println!("starting getifaddrs..."); - crate::getifaddrs::for_each_address(|a: &InetAddress, interface: &LocalInterface| { - println!(" {} {}", interface.to_string(), a.to_string()) - }); + for_each_address(|a: &InetAddress, interface: &LocalInterface| println!(" {} {}", interface.to_string(), a.to_string())); println!("done.") } } diff --git a/system-service/src/ipv6.rs b/vl1-service/src/sys/ipv6.rs similarity index 100% rename from system-service/src/ipv6.rs rename to vl1-service/src/sys/ipv6.rs diff --git a/vl1-service/src/sys/mod.rs b/vl1-service/src/sys/mod.rs new file mode 100644 index 000000000..8c5d381ff --- /dev/null +++ b/vl1-service/src/sys/mod.rs @@ -0,0 +1,5 @@ +// (c) 2020-2022 ZeroTier, Inc. -- currently propritery pending actual release and licensing. See LICENSE.md. + +pub mod getifaddrs; +pub mod ipv6; +pub mod udp; diff --git a/system-service/src/udp.rs b/vl1-service/src/sys/udp.rs similarity index 78% rename from system-service/src/udp.rs rename to vl1-service/src/sys/udp.rs index 531ae325a..ad3519348 100644 --- a/system-service/src/udp.rs +++ b/vl1-service/src/sys/udp.rs @@ -12,8 +12,6 @@ use std::sync::Arc; #[cfg(unix)] use std::os::unix::io::{FromRawFd, RawFd}; -use crate::getifaddrs; -use crate::ipv6; use crate::localinterface::LocalInterface; #[allow(unused_imports)] @@ -21,6 +19,8 @@ use num_traits::AsPrimitive; use zerotier_network_hypervisor::vl1::inetaddress::*; +use crate::sys::{getifaddrs, ipv6}; + /// A local port to which one or more UDP sockets is bound. /// /// To bind a port we must bind sockets to each interface/IP pair directly. Sockets must @@ -35,18 +35,9 @@ pub struct BoundUdpSocket { pub address: InetAddress, pub socket: Arc, pub interface: LocalInterface, - pub socket_associated_tasks: parking_lot::Mutex>>, fd: RawFd, } -impl Drop for BoundUdpSocket { - fn drop(&mut self) { - for t in self.socket_associated_tasks.lock().drain(..) { - t.abort(); - } - } -} - impl BoundUdpSocket { #[cfg(unix)] #[inline(always)] @@ -64,43 +55,21 @@ impl BoundUdpSocket { } #[cfg(any(target_os = "macos", target_os = "freebsd"))] - pub fn send_sync_nonblock(&self, dest: &InetAddress, b: &[&[u8]], packet_ttl: u8) -> bool { + pub fn send_sync_nonblock(&self, dest: &InetAddress, b: &[u8], packet_ttl: u8) -> bool { let mut ok = false; if dest.family() == self.address.family() { if packet_ttl > 0 && dest.is_ipv4() { self.set_ttl(packet_ttl); } unsafe { - if b.len() == 1 { - let bb = *b.get_unchecked(0); - ok = libc::sendto( - self.fd.as_(), - bb.as_ptr().cast(), - bb.len().as_(), - 0, - (dest as *const InetAddress).cast(), - size_of::().as_(), - ) > 0; - } else { - let mut iov: [libc::iovec; 16] = MaybeUninit::uninit().assume_init(); - assert!(b.len() <= iov.len()); - for i in 0..b.len() { - let bb = *b.get_unchecked(i); - let ii = iov.get_unchecked_mut(i); - ii.iov_base = transmute(bb.as_ptr()); - ii.iov_len = bb.len().as_(); - } - let msghdr = libc::msghdr { - msg_name: transmute(dest as *const InetAddress), - msg_namelen: size_of::().as_(), - msg_iov: iov.as_mut_ptr(), - msg_iovlen: b.len().as_(), - msg_control: null_mut(), - msg_controllen: 0, - msg_flags: 0, - }; - ok = libc::sendmsg(self.fd.as_(), &msghdr, 0) > 0; - } + ok = libc::sendto( + self.fd.as_(), + b.as_ptr().cast(), + b.len().as_(), + 0, + (dest as *const InetAddress).cast(), + size_of::().as_(), + ) > 0; } if packet_ttl > 0 && dest.is_ipv4() { self.set_ttl(0xff); @@ -110,32 +79,15 @@ impl BoundUdpSocket { } #[cfg(not(any(target_os = "macos", target_os = "freebsd")))] - pub fn send_sync_nonblock(&self, dest: &InetAddress, b: &[&[u8]], packet_ttl: u8) -> bool { + pub fn send_sync_nonblock(&self, dest: &InetAddress, b: &[u8], packet_ttl: u8) -> bool { let mut ok = false; if dest.family() == self.address.family() { - let mut tmp: [u8; 16384] = unsafe { std::mem::MaybeUninit::uninit().assume_init() }; - let data = if b.len() == 1 { - *unsafe { b.get_unchecked(0) } - } else { - let mut p = 0; - for bb in b.iter() { - let pp = p + bb.len(); - if pp < 16384 { - tmp[p..pp].copy_from_slice(*bb); - p = pp; - } else { - return false; - } - } - &tmp[..p] - }; - if packet_ttl > 0 && dest.is_ipv4() { self.set_ttl(packet_ttl); - ok = self.socket.try_send_to(data, dest.try_into().unwrap()).is_ok(); + ok = self.socket.try_send_to(b, dest.try_into().unwrap()).is_ok(); self.set_ttl(0xff); } else { - ok = self.socket.try_send_to(data, dest.try_into().unwrap()).is_ok(); + ok = self.socket.try_send_to(b, dest.try_into().unwrap()).is_ok(); } } ok @@ -202,7 +154,6 @@ impl BoundUdpPort { let s = Arc::new(BoundUdpSocket { address: addr_with_port, socket: Arc::new(s.unwrap()), - socket_associated_tasks: parking_lot::Mutex::new(Vec::new()), interface: interface.clone(), fd, }); diff --git a/vl1-service/src/vl1service.rs b/vl1-service/src/vl1service.rs new file mode 100644 index 000000000..9930772ee --- /dev/null +++ b/vl1-service/src/vl1service.rs @@ -0,0 +1,191 @@ +// (c) 2020-2022 ZeroTier, Inc. -- currently propritery pending actual release and licensing. See LICENSE.md. + +use std::collections::{HashMap, HashSet}; +use std::error::Error; +use std::sync::Arc; + +use async_trait::async_trait; + +use zerotier_crypto::random; +use zerotier_network_hypervisor::vl1::{Endpoint, Event, HostSystem, Identity, InnerProtocol, Node, PathFilter, Storage}; +use zerotier_utils::{ms_monotonic, ms_since_epoch}; + +use crate::sys::udp::BoundUdpPort; + +use tokio::task::JoinHandle; +use tokio::time::Duration; + +/// VL1 service that connects to the physical network and hosts an inner protocol like ZeroTier VL2. +/// +/// This is the "outward facing" half of a full ZeroTier stack on a normal system. It binds sockets, +/// talks to the physical network, manages the vl1 node, and presents a templated interface for +/// whatever inner protocol implementation is using it. This would typically be VL2 but could be +/// a test harness or just the controller for a controller that runs stand-alone. +pub struct VL1Service, InnerProtocolImpl: InnerProtocol> { + daemons: parking_lot::Mutex>>, + udp_sockets_by_port: tokio::sync::RwLock>, + storage: Arc, + inner: Arc, + path_filter: Arc, + node_container: Option>, +} + +impl, InnerProtocolImpl: InnerProtocol> + VL1Service +{ + pub async fn new( + storage: Arc, + inner: Arc, + path_filter: Arc, + ) -> Result, Box> { + let mut service = VL1Service { + daemons: parking_lot::Mutex::new(Vec::with_capacity(2)), + udp_sockets_by_port: tokio::sync::RwLock::new(HashMap::with_capacity(8)), + storage, + inner, + path_filter, + node_container: None, + }; + + service + .node_container + .replace(Node::new(&service, &*service.storage, true, false).await?); + + let service = Arc::new(service); + + let mut daemons = service.daemons.lock(); + daemons.push(tokio::spawn(service.clone().udp_bind_daemon())); + daemons.push(tokio::spawn(service.clone().node_background_task_daemon())); + drop(daemons); + + Ok(service) + } + + #[inline(always)] + pub fn node(&self) -> &Node { + debug_assert!(self.node_container.is_some()); + unsafe { self.node_container.as_ref().unwrap_unchecked() } + } + + async fn udp_bind_daemon(self: Arc) {} + + async fn node_background_task_daemon(self: Arc) {} +} + +#[async_trait] +impl, InnerProtocolImpl: InnerProtocol> HostSystem + for VL1Service +{ + type LocalSocket = crate::LocalSocket; + type LocalInterface = crate::LocalInterface; + + fn event(&self, event: Event) { + println!("{}", event.to_string()); + match event { + _ => {} + } + } + + async fn user_message(&self, _source: &Identity, _message_type: u64, _message: &[u8]) {} + + #[inline(always)] + fn local_socket_is_valid(&self, socket: &Self::LocalSocket) -> bool { + socket.is_valid() + } + + async fn wire_send( + &self, + endpoint: &Endpoint, + local_socket: Option<&Self::LocalSocket>, + local_interface: Option<&Self::LocalInterface>, + data: &[u8], + packet_ttl: u8, + ) -> bool { + match endpoint { + Endpoint::IpUdp(address) => { + // This is the fast path -- the socket is known to the core so just send it. + if let Some(s) = local_socket { + if let Some(s) = s.0.upgrade() { + return s.send_sync_nonblock(address, data, packet_ttl); + } else { + return false; + } + } + + let udp_sockets_by_port = self.udp_sockets_by_port.read().await; + if !udp_sockets_by_port.is_empty() { + if let Some(specific_interface) = local_interface { + // Send from a specific interface if that interface is specified. + for (_, p) in udp_sockets_by_port.iter() { + if !p.sockets.is_empty() { + let mut i = (random::next_u32_secure() as usize) % p.sockets.len(); + for _ in 0..p.sockets.len() { + let s = p.sockets.get(i).unwrap(); + if s.interface.eq(specific_interface) { + if s.send_sync_nonblock(address, data, packet_ttl) { + return true; + } + } + i = (i + 1) % p.sockets.len(); + } + } + } + } else { + // Otherwise send from one socket on every interface. + let mut sent_on_interfaces = HashSet::with_capacity(4); + for p in udp_sockets_by_port.values() { + if !p.sockets.is_empty() { + let mut i = (random::next_u32_secure() as usize) % p.sockets.len(); + for _ in 0..p.sockets.len() { + let s = p.sockets.get(i).unwrap(); + if !sent_on_interfaces.contains(&s.interface) { + if s.send_sync_nonblock(address, data, packet_ttl) { + sent_on_interfaces.insert(s.interface.clone()); + } + } + i = (i + 1) % p.sockets.len(); + } + } + } + return !sent_on_interfaces.is_empty(); + } + } + + return false; + } + _ => {} + } + return false; + } + + #[inline(always)] + fn time_ticks(&self) -> i64 { + ms_monotonic() + } + + #[inline(always)] + fn time_clock(&self) -> i64 { + ms_since_epoch() + } +} + +impl, InnerProtocolImpl: InnerProtocol> Drop + for VL1Service +{ + fn drop(&mut self) { + for d in self.daemons.lock().drain(..) { + d.abort(); + } + + // Drop all bound sockets since these can hold circular Arc<> references to 'internal'. + // This shouldn't have to loop much if at all to acquire the lock, but it might if something + // is still completing somewhere in an aborting task. + loop { + if let Ok(mut udp_sockets) = self.udp_sockets_by_port.try_write() { + udp_sockets.clear(); + break; + } + std::thread::sleep(Duration::from_millis(2)); + } + } +}