diff --git a/zerotier-core-crypto/src/hash.rs b/zerotier-core-crypto/src/hash.rs index e1a6904e6..905d51b42 100644 --- a/zerotier-core-crypto/src/hash.rs +++ b/zerotier-core-crypto/src/hash.rs @@ -51,6 +51,8 @@ impl Write for SHA512 { } } +unsafe impl Send for SHA512 {} + pub struct SHA384(Option); impl SHA384 { @@ -93,6 +95,8 @@ impl Write for SHA384 { } } +unsafe impl Send for SHA384 {} + //#[link(name="crypto")] extern "C" { fn HMAC_CTX_new() -> *mut c_void; diff --git a/zerotier-core-crypto/src/poly1305.rs b/zerotier-core-crypto/src/poly1305.rs index d8fa9c4ab..bf6ca7d8e 100644 --- a/zerotier-core-crypto/src/poly1305.rs +++ b/zerotier-core-crypto/src/poly1305.rs @@ -26,3 +26,5 @@ impl Poly1305 { self.0.finalize().into_bytes().into() } } + +unsafe impl Send for Poly1305 {} diff --git a/zerotier-network-hypervisor/Cargo.toml b/zerotier-network-hypervisor/Cargo.toml index cdc0032ff..763a3badd 100644 --- a/zerotier-network-hypervisor/Cargo.toml +++ b/zerotier-network-hypervisor/Cargo.toml @@ -17,9 +17,9 @@ debug_events = [] [dependencies] zerotier-core-crypto = { path = "../zerotier-core-crypto" } +async-trait = "^0" base64 = "^0" lz4_flex = { version = "^0", features = ["safe-encode", "safe-decode", "checked-decode"] } -dashmap = "^5" parking_lot = "^0" lazy_static = "^1" serde = { version = "^1", features = ["derive"], default-features = false } diff --git a/zerotier-network-hypervisor/src/lib.rs b/zerotier-network-hypervisor/src/lib.rs index b1b350577..95a7d3dde 100644 --- a/zerotier-network-hypervisor/src/lib.rs +++ b/zerotier-network-hypervisor/src/lib.rs @@ -15,3 +15,5 @@ mod networkhypervisor; pub use event::Event; pub use networkhypervisor::{Interface, NetworkHypervisor}; pub use vl1::protocol::{PacketBuffer, PooledPacketBuffer}; + +pub use async_trait::async_trait; diff --git a/zerotier-network-hypervisor/src/networkhypervisor.rs b/zerotier-network-hypervisor/src/networkhypervisor.rs index 4758ce8a9..3f7b40bfa 100644 --- a/zerotier-network-hypervisor/src/networkhypervisor.rs +++ b/zerotier-network-hypervisor/src/networkhypervisor.rs @@ -2,6 +2,8 @@ use std::time::Duration; +use async_trait::async_trait; + use crate::error::InvalidParameterError; use crate::util::buffer::Buffer; use crate::util::marshalable::Marshalable; @@ -10,6 +12,7 @@ use crate::vl1::protocol::PooledPacketBuffer; use crate::vl1::*; use crate::vl2::switch::*; +#[async_trait] pub trait Interface: SystemInterface + SwitchInterface {} pub struct NetworkHypervisor { @@ -18,10 +21,10 @@ pub struct NetworkHypervisor { } impl NetworkHypervisor { - pub fn new(ii: &I, auto_generate_identity: bool, auto_upgrade_identity: bool) -> Result { + pub async fn new(ii: &I, auto_generate_identity: bool, auto_upgrade_identity: bool) -> Result { Ok(NetworkHypervisor { - vl1: Node::new(ii, auto_generate_identity, auto_upgrade_identity)?, - vl2: Switch::new(), + vl1: Node::new(ii, auto_generate_identity, auto_upgrade_identity).await?, + vl2: Switch::new().await, }) } @@ -45,13 +48,14 @@ impl NetworkHypervisor { /// This shouldn't be called concurrently by more than one loop. Doing so would be harmless /// but would be a waste of compute cycles. #[inline(always)] - pub fn do_background_tasks(&self, ii: &I) -> Duration { - self.vl1.do_background_tasks(ii) + pub async fn do_background_tasks(&self, ii: &I) -> Duration { + self.vl1.do_background_tasks(ii).await } + /// Process a physical packet received over a network interface. #[inline(always)] - pub fn handle_incoming_physical_packet(&self, ii: &I, source_endpoint: &Endpoint, source_local_socket: &I::LocalSocket, source_local_interface: &I::LocalInterface, data: PooledPacketBuffer) { - self.vl1.handle_incoming_physical_packet(ii, &self.vl2, source_endpoint, source_local_socket, source_local_interface, data) + pub async fn handle_incoming_physical_packet(&self, ii: &I, source_endpoint: &Endpoint, source_local_socket: &I::LocalSocket, source_local_interface: &I::LocalInterface, data: PooledPacketBuffer) { + self.vl1.handle_incoming_physical_packet(ii, &self.vl2, source_endpoint, source_local_socket, source_local_interface, data).await } /// Add or update a root set. diff --git a/zerotier-network-hypervisor/src/util/mod.rs b/zerotier-network-hypervisor/src/util/mod.rs index afea5641f..50ed13e78 100644 --- a/zerotier-network-hypervisor/src/util/mod.rs +++ b/zerotier-network-hypervisor/src/util/mod.rs @@ -14,7 +14,7 @@ pub(crate) const ZEROES: [u8; 64] = [0_u8; 64]; #[allow(unused_macros)] macro_rules! debug_event { ($si:expr, $fmt:expr $(, $($arg:tt)*)?) => { - $si.event(crate::Event::Debug(file!(), line!(), format!($fmt, $($($arg)*)?))); + $si.event(crate::Event::Debug(file!(), line!(), format!($fmt, $($($arg)*)?))).await; } } diff --git a/zerotier-network-hypervisor/src/vl1/node.rs b/zerotier-network-hypervisor/src/vl1/node.rs index c08b53347..1e5bd11a5 100644 --- a/zerotier-network-hypervisor/src/vl1/node.rs +++ b/zerotier-network-hypervisor/src/vl1/node.rs @@ -1,18 +1,18 @@ // (c) 2020-2022 ZeroTier, Inc. -- currently propritery pending actual release and licensing. See LICENSE.md. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::hash::Hash; use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; -use dashmap::DashMap; +use async_trait::async_trait; use parking_lot::{Mutex, RwLock}; use crate::error::InvalidParameterError; use crate::util::debug_event; use crate::util::gate::IntervalGate; -use crate::vl1::path::Path; +use crate::vl1::path::{Path, PathServiceResult}; use crate::vl1::peer::Peer; use crate::vl1::protocol::*; use crate::vl1::whoisqueue::{QueuedPacket, WhoisQueue}; @@ -23,6 +23,7 @@ use crate::Event; /// /// These methods are basically callbacks that the core calls to request or transmit things. They are called /// during calls to things like wire_recieve() and do_background_tasks(). +#[async_trait] pub trait SystemInterface: Sync + Send + 'static { /// Type for local system sockets. type LocalSocket: Sync + Send + Sized + Hash + PartialEq + Eq + Clone + ToString; @@ -31,10 +32,10 @@ pub trait SystemInterface: Sync + Send + 'static { type LocalInterface: Sync + Send + Sized + Hash + PartialEq + Eq + Clone + ToString; /// An event occurred. - fn event(&self, event: Event); + async fn event(&self, event: Event); /// A USER_MESSAGE packet was received. - fn user_message(&self, source: &Identity, message_type: u64, message: &[u8]); + async fn user_message(&self, source: &Identity, message_type: u64, message: &[u8]); /// Check a local socket for validity. /// @@ -43,10 +44,10 @@ pub trait SystemInterface: Sync + Send + 'static { fn local_socket_is_valid(&self, socket: &Self::LocalSocket) -> bool; /// Load this node's identity from the data store. - fn load_node_identity(&self) -> Option; + async fn load_node_identity(&self) -> Option; /// Save this node's identity. - fn save_node_identity(&self, id: &Identity); + async fn save_node_identity(&self, id: &Identity); /// Called to send a packet over the physical network (virtual -> physical). /// @@ -61,13 +62,13 @@ pub trait SystemInterface: Sync + Send + 'static { /// For endpoint types that support a packet TTL, the implementation may set the TTL /// if the 'ttl' parameter is not zero. If the parameter is zero or TTL setting is not /// supported, the default TTL should be used. - fn wire_send(&self, endpoint: &Endpoint, local_socket: Option<&Self::LocalSocket>, local_interface: Option<&Self::LocalInterface>, data: &[&[u8]], packet_ttl: u8) -> bool; + async fn wire_send(&self, endpoint: &Endpoint, local_socket: Option<&Self::LocalSocket>, local_interface: Option<&Self::LocalInterface>, data: &[&[u8]], packet_ttl: u8) -> bool; /// Called to check and see if a physical address should be used for ZeroTier traffic to a node. - fn check_path(&self, id: &Identity, endpoint: &Endpoint, local_socket: Option<&Self::LocalSocket>, local_interface: Option<&Self::LocalInterface>) -> bool; + async fn check_path(&self, id: &Identity, endpoint: &Endpoint, local_socket: Option<&Self::LocalSocket>, local_interface: Option<&Self::LocalInterface>) -> bool; /// Called to look up any statically defined or memorized paths to known nodes. - fn get_path_hints(&self, id: &Identity) -> Option, Option)>>; + async fn get_path_hints(&self, id: &Identity) -> Option, Option)>>; /// Called to get the current time in milliseconds from the system monotonically increasing clock. /// This needs to be accurate to about 250 milliseconds resolution or better. @@ -82,19 +83,31 @@ pub trait SystemInterface: Sync + Send + 'static { /// /// This is implemented by Switch in VL2. It's usually not used outside of VL2 in the core but /// it could also be implemented for testing or "off label" use of VL1 to carry different protocols. +#[async_trait] pub trait InnerProtocolInterface: Sync + Send + 'static { /// Handle a packet, returning true if it was handled by the next layer. /// /// Do not attempt to handle OK or ERROR. Instead implement handle_ok() and handle_error(). /// The return values of these must follow the same semantic of returning true if the message /// was handled. - fn handle_packet(&self, peer: &Peer, source_path: &Path, forward_secrecy: bool, extended_authentication: bool, verb: u8, payload: &PacketBuffer) -> bool; + async fn handle_packet(&self, source: &Peer, source_path: &Path, forward_secrecy: bool, extended_authentication: bool, verb: u8, payload: &PacketBuffer) -> bool; /// Handle errors, returning true if the error was recognized. - fn handle_error(&self, peer: &Peer, source_path: &Path, forward_secrecy: bool, extended_authentication: bool, in_re_verb: u8, in_re_message_id: u64, error_code: u8, payload: &PacketBuffer, cursor: &mut usize) -> bool; + async fn handle_error( + &self, + source: &Peer, + source_path: &Path, + forward_secrecy: bool, + extended_authentication: bool, + in_re_verb: u8, + in_re_message_id: u64, + error_code: u8, + payload: &PacketBuffer, + cursor: &mut usize, + ) -> bool; /// Handle an OK, returing true if the OK was recognized. - fn handle_ok(&self, peer: &Peer, source_path: &Path, forward_secrecy: bool, extended_authentication: bool, in_re_verb: u8, in_re_message_id: u64, payload: &PacketBuffer, cursor: &mut usize) -> bool; + async fn handle_ok(&self, source: &Peer, source_path: &Path, forward_secrecy: bool, extended_authentication: bool, in_re_verb: u8, in_re_message_id: u64, payload: &PacketBuffer, cursor: &mut usize) -> bool; /// Check if this remote peer has a trust relationship with this node. /// @@ -108,11 +121,11 @@ const ROOT_SYNC_INTERVAL_MS: i64 = 1000; #[derive(Default)] struct BackgroundTaskIntervals { - whois: IntervalGate<{ crate::vl1::whoisqueue::SERVICE_INTERVAL_MS }>, - paths: IntervalGate<{ crate::vl1::path::SERVICE_INTERVAL_MS }>, - peers: IntervalGate<{ crate::vl1::peer::SERVICE_INTERVAL_MS }>, root_sync: IntervalGate<{ ROOT_SYNC_INTERVAL_MS }>, root_hello: IntervalGate<{ ROOT_HELLO_INTERVAL }>, + peers: IntervalGate<{ crate::vl1::peer::SERVICE_INTERVAL_MS }>, + paths: IntervalGate<{ crate::vl1::path::SERVICE_INTERVAL_MS }>, + whois: IntervalGate<{ crate::vl1::whoisqueue::SERVICE_INTERVAL_MS }>, } struct RootInfo { @@ -121,6 +134,57 @@ struct RootInfo { sets_modified: bool, } +enum PathKey<'a, SI: SystemInterface> { + Copied(Endpoint, SI::LocalSocket), + Ref(&'a Endpoint, &'a SI::LocalSocket), +} + +impl<'a, SI: SystemInterface> Hash for PathKey<'a, SI> { + fn hash(&self, state: &mut H) { + match self { + Self::Copied(ep, ls) => { + ep.hash(state); + ls.hash(state); + } + Self::Ref(ep, ls) => { + (*ep).hash(state); + (*ls).hash(state); + } + } + } +} + +impl<'a, SI: SystemInterface> PartialEq for PathKey<'_, SI> { + fn eq(&self, other: &Self) -> bool { + match (self, other) { + (Self::Copied(ep1, ls1), Self::Copied(ep2, ls2)) => ep1.eq(ep2) && ls1.eq(ls2), + (Self::Copied(ep1, ls1), Self::Ref(ep2, ls2)) => ep1.eq(*ep2) && ls1.eq(*ls2), + (Self::Ref(ep1, ls1), Self::Copied(ep2, ls2)) => (*ep1).eq(ep2) && (*ls1).eq(ls2), + (Self::Ref(ep1, ls1), Self::Ref(ep2, ls2)) => (*ep1).eq(*ep2) && (*ls1).eq(*ls2), + } + } +} + +impl<'a, SI: SystemInterface> Eq for PathKey<'_, SI> {} + +impl<'a, SI: SystemInterface> PathKey<'a, SI> { + #[inline(always)] + fn local_socket(&self) -> &SI::LocalSocket { + match self { + Self::Copied(_, ls) => ls, + Self::Ref(_, ls) => *ls, + } + } + + #[inline(always)] + fn to_copied(&self) -> PathKey<'static, SI> { + match self { + Self::Copied(ep, ls) => PathKey::<'static, SI>::Copied(ep.clone(), ls.clone()), + Self::Ref(ep, ls) => PathKey::<'static, SI>::Copied((*ep).clone(), (*ls).clone()), + } + } +} + /// A VL1 global P2P network node. pub struct Node { /// A random ID generated to identify this particular running instance. @@ -133,13 +197,13 @@ pub struct Node { intervals: Mutex, /// Canonicalized network paths, held as Weak<> to be automatically cleaned when no longer in use. - paths: DashMap>>>>, + paths: parking_lot::RwLock, Arc>>>, /// Peers with which we are currently communicating. - peers: DashMap>>, + peers: parking_lot::RwLock>>>, /// This node's trusted roots, sorted in ascending order of quality/preference, and cluster definitions. - roots: Mutex>, + roots: RwLock>, /// Current best root. best_root: RwLock>>>, @@ -152,17 +216,16 @@ pub struct Node { } impl Node { - /// Create a new Node. - pub fn new(si: &SI, auto_generate_identity: bool, auto_upgrade_identity: bool) -> Result { + pub async fn new(si: &SI, auto_generate_identity: bool, auto_upgrade_identity: bool) -> Result { let mut id = { - let id = si.load_node_identity(); + let id = si.load_node_identity().await; if id.is_none() { if !auto_generate_identity { return Err(InvalidParameterError("no identity found and auto-generate not enabled")); } else { let id = Identity::generate(); - si.event(Event::IdentityAutoGenerated(id.clone())); - si.save_node_identity(&id); + si.event(Event::IdentityAutoGenerated(id.clone())).await; + si.save_node_identity(&id).await; id } } else { @@ -173,8 +236,8 @@ impl Node { if auto_upgrade_identity { let old = id.clone(); if id.upgrade()? { - si.save_node_identity(&id); - si.event(Event::IdentityAutoUpgraded(old, id.clone())); + si.save_node_identity(&id).await; + si.event(Event::IdentityAutoUpgraded(old, id.clone())).await; } } @@ -182,9 +245,9 @@ impl Node { instance_id: zerotier_core_crypto::random::get_bytes_secure(), identity: id, intervals: Mutex::new(BackgroundTaskIntervals::default()), - paths: DashMap::new(), - peers: DashMap::new(), - roots: Mutex::new(RootInfo { + paths: parking_lot::RwLock::new(HashMap::new()), + peers: parking_lot::RwLock::new(HashMap::new()), + roots: RwLock::new(RootInfo { roots: HashMap::new(), sets: HashMap::new(), sets_modified: false, @@ -201,139 +264,177 @@ impl Node { } pub fn peer(&self, a: Address) -> Option>> { - self.peers.get(&a).map(|peer| peer.value().clone()) + self.peers.read().get(&a).cloned() } - pub fn do_background_tasks(&self, si: &SI) -> Duration { - let mut intervals = self.intervals.lock(); + pub async fn do_background_tasks(&self, si: &SI) -> Duration { let tt = si.time_ticks(); + let (root_sync, root_hello, peer_check, path_check, whois_check) = { + let mut intervals = self.intervals.lock(); + (intervals.root_sync.gate(tt), intervals.root_hello.gate(tt), intervals.peers.gate(tt), intervals.paths.gate(tt), intervals.whois.gate(tt)) + }; - assert!(ROOT_SYNC_INTERVAL_MS <= (ROOT_HELLO_INTERVAL / 2)); - if intervals.root_sync.gate(tt) { - match &mut (*self.roots.lock()) { - RootInfo { roots, sets, sets_modified } => { - // Update internal data structures if the root set configuration has changed. - if *sets_modified { - *sets_modified = false; - debug_event!(si, "root sets modified, synchronizing internal data structures"); + if root_sync { + if { + let mut roots = self.roots.write(); + if roots.sets_modified { + roots.sets_modified = false; + true + } else { + false + } + } { + let (mut old_root_identities, address_collisions, new_roots, bad_identities) = { + let roots = self.roots.read(); - let mut old_root_identities: Vec = roots.drain().map(|r| r.0.identity.clone()).collect(); - let mut new_root_identities = Vec::new(); + let old_root_identities: Vec = roots.roots.iter().map(|(p, _)| p.identity.clone()).collect(); + let mut new_roots = HashMap::new(); + let mut bad_identities = Vec::new(); - let mut colliding_root_addresses = Vec::new(); // see security note below - for (_, rc) in sets.iter() { + // This is a sanity check to make sure we don't have root sets that contain roots with the same address + // but a different identity. If we do, the offending address is blacklisted. This would indicate something + // weird and possibly nasty happening with whomever is making your root set definitions. + let mut address_collisions = Vec::new(); + { + let mut address_collision_check = HashMap::with_capacity(roots.sets.len() * 8); + for (_, rc) in roots.sets.iter() { for m in rc.members.iter() { - if m.endpoints.is_some() && !colliding_root_addresses.contains(&m.identity.address) { - /* - * SECURITY NOTE: it should be impossible to get an address/identity collision here unless - * the user adds a maliciously crafted root set with an identity that collides another. Under - * normal circumstances the root backplane combined with the address PoW should rule this - * out. However since we trust roots as identity lookup authorities it's important to take - * extra care to check for this case. If it's detected, all roots with the offending - * address are ignored/disabled. - * - * The apparently over-thought functional chain here on peers.entry() is to make access to - * the peer map atomic since we use a "lock-free" data structure here (DashMap). - */ - - let _ = self - .peers - .entry(m.identity.address) - .or_try_insert_with(|| Peer::::new(&self.identity, m.identity.clone(), si.time_clock()).map_or(Err(crate::error::UnexpectedError), |new_root| Ok(Arc::new(new_root)))) - .and_then(|root_peer_entry| { - let rp = root_peer_entry.value(); - if rp.identity.eq(&m.identity) { - Ok(root_peer_entry) - } else { - colliding_root_addresses.push(m.identity.address); - si.event(Event::SecurityWarning(format!( - "address/identity collision between root {} (from root cluster definition '{}') and known peer {}, ignoring this root!", - m.identity.address.to_string(), - rc.name, - rp.identity.to_string() - ))); - Err(crate::error::UnexpectedError) - } - }) - .map(|r| { - new_root_identities.push(r.value().identity.clone()); - roots.insert(r.value().clone(), m.endpoints.as_ref().unwrap().iter().cloned().collect()); - }); + if self.peers.read().get(&m.identity.address).map_or(false, |p| !p.identity.eq(&m.identity)) || address_collision_check.insert(m.identity.address, &m.identity).map_or(false, |old_id| !old_id.eq(&m.identity)) { + address_collisions.push(m.identity.address); } } } - - old_root_identities.sort_unstable(); - new_root_identities.sort_unstable(); - if !old_root_identities.eq(&new_root_identities) { - si.event(Event::UpdatedRoots(old_root_identities, new_root_identities)); - } } - // Say HELLO to all roots periodically. For roots we send HELLO to every single endpoint - // they have, which is a behavior that differs from normal peers. This allows roots to - // e.g. see our IPv4 and our IPv6 address which can be important for us to learn our - // external addresses from them. - if intervals.root_hello.gate(tt) { - for (root, endpoints) in roots.iter() { - for ep in endpoints.iter() { - debug_event!(si, "sending HELLO to root {} (root interval: {})", root.identity.address.to_string(), ROOT_HELLO_INTERVAL); - root.send_hello(si, self, Some(ep)); + for (_, rc) in roots.sets.iter() { + for m in rc.members.iter() { + if m.endpoints.is_some() && !address_collisions.contains(&m.identity.address) { + let peers = self.peers.upgradable_read(); + if let Some(peer) = peers.get(&m.identity.address) { + new_roots.insert(peer.clone(), m.endpoints.as_ref().unwrap().iter().cloned().collect()); + } else { + if let Some(peer) = Peer::::new(&self.identity, m.identity.clone(), si.time_clock()) { + new_roots.insert(parking_lot::RwLockUpgradableReadGuard::upgrade(peers).entry(m.identity.address).or_insert_with(|| Arc::new(peer)).clone(), m.endpoints.as_ref().unwrap().iter().cloned().collect()); + } else { + bad_identities.push(m.identity.clone()); + } + } } } } - // The best root is the one that has replied to a HELLO most recently. Since we send HELLOs in unison - // this is a proxy for latency and also causes roots that fail to reply to drop out quickly. - let mut latest_hello_reply = 0; - let mut best: Option<&Arc>> = None; - for (r, _) in roots.iter() { - let t = r.last_hello_reply_time_ticks.load(Ordering::Relaxed); - if t >= latest_hello_reply { - latest_hello_reply = t; - let _ = best.insert(r); + (old_root_identities, address_collisions, new_roots, bad_identities) + }; + + for c in address_collisions.iter() { + si.event(Event::SecurityWarning(format!("address/identity collision in root sets! address {} collides across root sets or with an existing peer and is being ignored as a root!", c.to_string()))).await; + } + for i in bad_identities.iter() { + si.event(Event::SecurityWarning(format!("bad identity detected for address {} in at least one root set, ignoring (error creating peer object)", i.address.to_string()))).await; + } + + let mut new_root_identities: Vec = new_roots.iter().map(|(p, _)| p.identity.clone()).collect(); + + old_root_identities.sort_unstable(); + new_root_identities.sort_unstable(); + if !old_root_identities.eq(&new_root_identities) { + let mut best: Option>> = None; + + { + let mut roots = self.roots.write(); + roots.roots = new_roots; + + // The best root is the one that has replied to a HELLO most recently. Since we send HELLOs in unison + // this is a proxy for latency and also causes roots that fail to reply to drop out quickly. + let mut latest_hello_reply = 0; + for (r, _) in roots.roots.iter() { + let t = r.last_hello_reply_time_ticks.load(Ordering::Relaxed); + if t >= latest_hello_reply { + latest_hello_reply = t; + let _ = best.insert(r.clone()); + } } } - debug_event!(si, "new best root: {}", best.clone().map_or("none".into(), |p| p.identity.address.to_string())); - *(self.best_root.write()) = best.cloned(); + + *(self.best_root.write()) = best; + + //debug_event!(si, "new best root: {}", best.as_ref().map_or("none".into(), |p| p.identity.address.to_string())); + //si.event(Event::UpdatedRoots(old_root_identities, new_root_identities)).await; } } } - if intervals.peers.gate(tt) { + // Say HELLO to all roots periodically. For roots we send HELLO to every single endpoint + // they have, which is a behavior that differs from normal peers. This allows roots to + // e.g. see our IPv4 and our IPv6 address which can be important for us to learn our + // external addresses from them. + if root_hello { + let roots = { + let roots = self.roots.read(); + let mut roots_copy = Vec::with_capacity(roots.roots.len()); + for (root, endpoints) in roots.roots.iter() { + roots_copy.push((root.clone(), endpoints.clone())); + } + roots_copy + }; + for (root, endpoints) in roots.iter() { + for ep in endpoints.iter() { + debug_event!(si, "sending HELLO to root {} (root interval: {})", root.identity.address.to_string(), ROOT_HELLO_INTERVAL); + root.send_hello(si, self, Some(ep)).await; + } + } + } + + if peer_check { // Service all peers, removing any whose service() method returns false AND that are not // roots. Roots on the other hand remain in the peer list as long as they are roots. - self.peers.retain(|_, peer| if peer.service(si, self, tt) { true } else { !self.roots.lock().roots.contains_key(peer) }); - } - - if intervals.paths.gate(tt) { - // Service all paths, removing expired or invalid ones. This is done in two passes to - // avoid introducing latency into a flow. - self.paths.retain(|_, pbs| { - let mut expired_paths = Vec::new(); - for (ls, path) in pbs.read().iter() { - if !si.local_socket_is_valid(ls) || !path.service(si, self, tt) { - expired_paths.push(Arc::as_ptr(path)); + let mut dead_peers = Vec::new(); + { + let roots = self.roots.read(); + for (a, peer) in self.peers.read().iter() { + if !peer.service(si, self, tt) && !roots.roots.contains_key(peer) { + dead_peers.push(*a); } } - if expired_paths.is_empty() { - true - } else { - let mut pbs_w = pbs.write(); - pbs_w.retain(|_, path| !expired_paths.contains(&Arc::as_ptr(path))); - !pbs_w.is_empty() - } - }) + } + for dp in dead_peers.iter() { + self.peers.write().remove(dp); + } } - if intervals.whois.gate(tt) { + if path_check { + // Service all paths, removing expired or invalid ones. This is done in two passes to + // avoid introducing latency into a flow. + let mut dead_paths = Vec::new(); + let mut need_keepalive = Vec::new(); + for (k, path) in self.paths.read().iter() { + if si.local_socket_is_valid(k.local_socket()) { + match path.service(tt) { + PathServiceResult::Ok => {} + PathServiceResult::Dead => dead_paths.push(k.to_copied()), + PathServiceResult::NeedsKeepalive => need_keepalive.push(path.clone()), + } + } else { + dead_paths.push(k.to_copied()); + } + } + for dp in dead_paths.iter() { + self.paths.write().remove(dp); + } + let z = [&crate::util::ZEROES[..1]]; + for ka in need_keepalive.iter() { + si.wire_send(&ka.endpoint, Some(&ka.local_socket), Some(&ka.local_interface), &z, 0).await; + } + } + + if whois_check { self.whois.service(si, self, tt); } Duration::from_millis((ROOT_SYNC_INTERVAL_MS.min(crate::vl1::whoisqueue::SERVICE_INTERVAL_MS).min(crate::vl1::path::SERVICE_INTERVAL_MS).min(crate::vl1::peer::SERVICE_INTERVAL_MS) as u64) / 2) } - pub fn handle_incoming_physical_packet(&self, si: &SI, ph: &PH, source_endpoint: &Endpoint, source_local_socket: &SI::LocalSocket, source_local_interface: &SI::LocalInterface, mut data: PooledPacketBuffer) { + pub async fn handle_incoming_physical_packet(&self, si: &SI, ph: &PH, source_endpoint: &Endpoint, source_local_socket: &SI::LocalSocket, source_local_interface: &SI::LocalInterface, mut data: PooledPacketBuffer) { debug_event!( si, "<< #{} ->{} from {} length {} via socket {}@{}", @@ -359,12 +460,10 @@ impl Node { if let Some(frag0) = assembled_packet.frags[0].as_ref() { debug_event!(si, "-- #{:0>16x} packet fully assembled!", u64::from_be_bytes(fragment_header.id)); - let packet_header = frag0.struct_at::(0); - if packet_header.is_ok() { - let packet_header = packet_header.unwrap(); + if let Ok(packet_header) = frag0.struct_at::(0) { if let Some(source) = Address::from_bytes(&packet_header.src) { if let Some(peer) = self.peer(source) { - peer.receive(self, si, ph, time_ticks, &path, &packet_header, frag0, &assembled_packet.frags[1..(assembled_packet.have as usize)]); + peer.receive(self, si, ph, time_ticks, &path, &packet_header, frag0, &assembled_packet.frags[1..(assembled_packet.have as usize)]).await; } else { self.whois.query(self, si, source, Some(QueuedPacket::Fragmented(assembled_packet))); } @@ -378,7 +477,7 @@ impl Node { if let Some(source) = Address::from_bytes(&packet_header.src) { if let Some(peer) = self.peer(source) { - peer.receive(self, si, ph, time_ticks, &path, &packet_header, data.as_ref(), &[]); + peer.receive(self, si, ph, time_ticks, &path, &packet_header, data.as_ref(), &[]).await; } else { self.whois.query(self, si, source, Some(QueuedPacket::Unfragmented(data))); } @@ -406,7 +505,7 @@ impl Node { if let Some(peer) = self.peer(dest) { // TODO: SHOULD we forward? Need a way to check. - peer.forward(si, time_ticks, data.as_ref()); + peer.forward(si, time_ticks, data.as_ref()).await; debug_event!(si, "-- #{:0>16x} forwarded successfully", u64::from_be_bytes(fragment_header.id)); } } @@ -419,11 +518,11 @@ impl Node { } pub fn is_peer_root(&self, peer: &Peer) -> bool { - self.roots.lock().roots.contains_key(peer) + self.roots.read().roots.contains_key(peer) } pub fn add_update_root_set(&self, rs: RootSet) -> bool { - let mut roots = self.roots.lock(); + let mut roots = self.roots.write(); if let Some(entry) = roots.sets.get_mut(&rs.name) { if rs.should_replace(entry) { *entry = rs; @@ -439,30 +538,18 @@ impl Node { } pub fn has_roots_defined(&self) -> bool { - self.roots.lock().sets.iter().any(|rs| !rs.1.members.is_empty()) + self.roots.read().sets.iter().any(|rs| !rs.1.members.is_empty()) } pub fn root_sets(&self) -> Vec { - self.roots.lock().sets.values().cloned().collect() + self.roots.read().sets.values().cloned().collect() } pub fn canonical_path(&self, ep: &Endpoint, local_socket: &SI::LocalSocket, local_interface: &SI::LocalInterface, time_ticks: i64) -> Arc> { - // It's faster to do a read only lookup first since most of the time this will succeed. The second - // version below this only gets invoked if it's a new path. - if let Some(path) = self.paths.get(ep) { - if let Some(path) = path.value().read().get(local_socket) { - return path.clone(); - } + if let Some(path) = self.paths.read().get(&PathKey::Ref(ep, local_socket)) { + path.clone() + } else { + self.paths.write().entry(PathKey::Copied(ep.clone(), local_socket.clone())).or_insert_with(|| Arc::new(Path::new(ep.clone(), local_socket.clone(), local_interface.clone(), time_ticks))).clone() } - - return self - .paths - .entry(ep.clone()) - .or_insert_with(|| parking_lot::RwLock::new(HashMap::with_capacity(4))) - .value_mut() - .write() - .entry(local_socket.clone()) - .or_insert_with(|| Arc::new(Path::new(ep.clone(), local_socket.clone(), local_interface.clone(), time_ticks))) - .clone(); } } diff --git a/zerotier-network-hypervisor/src/vl1/path.rs b/zerotier-network-hypervisor/src/vl1/path.rs index 70fc78761..6f2d022a4 100644 --- a/zerotier-network-hypervisor/src/vl1/path.rs +++ b/zerotier-network-hypervisor/src/vl1/path.rs @@ -2,11 +2,11 @@ use std::collections::HashMap; use std::hash::{BuildHasher, Hasher}; -use std::sync::atomic::{AtomicI64, Ordering}; +use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering}; +use lazy_static::lazy_static; use parking_lot::Mutex; -use crate::util::*; use crate::vl1::endpoint::Endpoint; use crate::vl1::fragmentedpacket::FragmentedPacket; use crate::vl1::node::*; @@ -14,6 +14,16 @@ use crate::vl1::protocol::*; pub(crate) const SERVICE_INTERVAL_MS: i64 = PATH_KEEPALIVE_INTERVAL; +pub(crate) enum PathServiceResult { + Ok, + Dead, + NeedsKeepalive, +} + +lazy_static! { + static ref INSTANCE_ID_COUNTER: AtomicUsize = AtomicUsize::new(0); +} + /// A remote endpoint paired with a local socket and a local interface. /// These are maintained in Node and canonicalized so that all unique paths have /// one and only one unique path object. That enables statistics to be tracked @@ -22,6 +32,7 @@ pub struct Path { pub endpoint: Endpoint, pub local_socket: SI::LocalSocket, pub local_interface: SI::LocalInterface, + pub(crate) internal_instance_id: usize, // arbitrary local ID that should be globally unique to a given path object instance last_send_time_ticks: AtomicI64, last_receive_time_ticks: AtomicI64, create_time_ticks: i64, @@ -34,6 +45,7 @@ impl Path { endpoint, local_socket, local_interface, + internal_instance_id: INSTANCE_ID_COUNTER.fetch_add(1, Ordering::SeqCst), last_send_time_ticks: AtomicI64::new(0), last_receive_time_ticks: AtomicI64::new(0), create_time_ticks: time_ticks, @@ -78,18 +90,19 @@ impl Path { self.last_send_time_ticks.store(time_ticks, Ordering::Relaxed); } - pub(crate) fn service(&self, si: &SI, _: &Node, time_ticks: i64) -> bool { + pub(crate) fn service(&self, time_ticks: i64) -> PathServiceResult { self.fragmented_packets.lock().retain(|_, frag| (time_ticks - frag.ts_ticks) < packet_constants::FRAGMENT_EXPIRATION); if (time_ticks - self.last_receive_time_ticks.load(Ordering::Relaxed)) < PATH_EXPIRATION_TIME { if (time_ticks - self.last_send_time_ticks.load(Ordering::Relaxed)) >= PATH_KEEPALIVE_INTERVAL { self.last_send_time_ticks.store(time_ticks, Ordering::Relaxed); - si.wire_send(&self.endpoint, Some(&self.local_socket), Some(&self.local_interface), &[&ZEROES[..1]], 0); + PathServiceResult::NeedsKeepalive + } else { + PathServiceResult::Ok } - true } else if (time_ticks - self.create_time_ticks) < PATH_EXPIRATION_TIME { - true + PathServiceResult::Ok } else { - false + PathServiceResult::Dead } } } diff --git a/zerotier-network-hypervisor/src/vl1/peer.rs b/zerotier-network-hypervisor/src/vl1/peer.rs index d7ea97f42..72f8b974b 100644 --- a/zerotier-network-hypervisor/src/vl1/peer.rs +++ b/zerotier-network-hypervisor/src/vl1/peer.rs @@ -15,6 +15,7 @@ use zerotier_core_crypto::salsa::Salsa; use zerotier_core_crypto::secret::Secret; use crate::util::byte_array_range; +use crate::util::debug_event; use crate::util::marshalable::Marshalable; use crate::vl1::node::*; use crate::vl1::protocol::*; @@ -26,6 +27,7 @@ pub(crate) const SERVICE_INTERVAL_MS: i64 = security_constants::EPHEMERAL_SECRET struct PeerPath { path: Weak>, + path_internal_instance_id: usize, last_receive_time_ticks: i64, } @@ -205,7 +207,7 @@ impl Peer { /// /// If the packet comes in multiple fragments, the fragments slice should contain all /// those fragments after the main packet header and first chunk. - pub(crate) fn receive(&self, node: &Node, si: &SI, vi: &VI, time_ticks: i64, source_path: &Arc>, header: &PacketHeader, frag0: &PacketBuffer, fragments: &[Option]) { + pub(crate) async fn receive(&self, node: &Node, si: &SI, ph: &PH, time_ticks: i64, source_path: &Arc>, header: &PacketHeader, frag0: &PacketBuffer, fragments: &[Option]) { if let Ok(packet_frag0_payload_bytes) = frag0.as_bytes_starting_at(packet_constants::VERB_INDEX) { let mut payload = unsafe { PacketBuffer::new_without_memzero() }; @@ -268,9 +270,8 @@ impl Peer { } } - let source_path_ptr = Arc::as_ptr(source_path); for p in self.paths.lock().iter_mut() { - if Weak::as_ptr(&p.path) == source_path_ptr { + if p.path_internal_instance_id == source_path.internal_instance_id { p.last_receive_time_ticks = time_ticks; break; } @@ -281,17 +282,17 @@ impl Peer { // because the most performance critical path is the handling of the ???_FRAME // verbs, which are in VL2. verb &= packet_constants::VERB_MASK; // mask off flags - if !vi.handle_packet(self, source_path, forward_secrecy, extended_authentication, verb, &payload) { + if !ph.handle_packet(self, &source_path, forward_secrecy, extended_authentication, verb, &payload).await { match verb { //VERB_VL1_NOP => {} - verbs::VL1_HELLO => self.receive_hello(si, node, time_ticks, source_path, &payload), - verbs::VL1_ERROR => self.receive_error(si, vi, node, time_ticks, source_path, forward_secrecy, extended_authentication, &payload), - verbs::VL1_OK => self.receive_ok(si, vi, node, time_ticks, source_path, forward_secrecy, extended_authentication, &payload), - verbs::VL1_WHOIS => self.receive_whois(si, node, time_ticks, source_path, &payload), - verbs::VL1_RENDEZVOUS => self.receive_rendezvous(si, node, time_ticks, source_path, &payload), - verbs::VL1_ECHO => self.receive_echo(si, node, time_ticks, source_path, &payload), - verbs::VL1_PUSH_DIRECT_PATHS => self.receive_push_direct_paths(si, node, time_ticks, source_path, &payload), - verbs::VL1_USER_MESSAGE => self.receive_user_message(si, node, time_ticks, source_path, &payload), + verbs::VL1_HELLO => self.receive_hello(si, node, time_ticks, source_path, &payload).await, + verbs::VL1_ERROR => self.receive_error(si, ph, node, time_ticks, source_path, forward_secrecy, extended_authentication, &payload).await, + verbs::VL1_OK => self.receive_ok(si, ph, node, time_ticks, source_path, forward_secrecy, extended_authentication, &payload).await, + verbs::VL1_WHOIS => self.receive_whois(si, node, time_ticks, source_path, &payload).await, + verbs::VL1_RENDEZVOUS => self.receive_rendezvous(si, node, time_ticks, source_path, &payload).await, + verbs::VL1_ECHO => self.receive_echo(si, node, time_ticks, source_path, &payload).await, + verbs::VL1_PUSH_DIRECT_PATHS => self.receive_push_direct_paths(si, node, time_ticks, source_path, &payload).await, + verbs::VL1_USER_MESSAGE => self.receive_user_message(si, node, time_ticks, source_path, &payload).await, _ => {} } } @@ -299,13 +300,13 @@ impl Peer { } } - fn send_to_endpoint(&self, si: &SI, endpoint: &Endpoint, local_socket: Option<&SI::LocalSocket>, local_interface: Option<&SI::LocalInterface>, packet: &PacketBuffer) -> bool { + async fn send_to_endpoint(&self, si: &SI, endpoint: &Endpoint, local_socket: Option<&SI::LocalSocket>, local_interface: Option<&SI::LocalInterface>, packet: &PacketBuffer) -> bool { match endpoint { Endpoint::Ip(_) | Endpoint::IpUdp(_) | Endpoint::Ethernet(_) | Endpoint::Bluetooth(_) | Endpoint::WifiDirect(_) => { let packet_size = packet.len(); if packet_size > UDP_DEFAULT_MTU { let bytes = packet.as_bytes(); - if !si.wire_send(endpoint, local_socket, local_interface, &[&bytes[0..UDP_DEFAULT_MTU]], 0) { + if !si.wire_send(endpoint, local_socket, local_interface, &[&bytes[0..UDP_DEFAULT_MTU]], 0).await { return false; } @@ -327,7 +328,7 @@ impl Peer { loop { header.total_and_fragment_no += 1; let next_pos = pos + chunk_size; - if !si.wire_send(endpoint, local_socket, local_interface, &[header.as_bytes(), &bytes[pos..next_pos]], 0) { + if !si.wire_send(endpoint, local_socket, local_interface, &[header.as_bytes(), &bytes[pos..next_pos]], 0).await { return false; } pos = next_pos; @@ -338,11 +339,11 @@ impl Peer { } } } else { - return si.wire_send(endpoint, local_socket, local_interface, &[packet.as_bytes()], 0); + return si.wire_send(endpoint, local_socket, local_interface, &[packet.as_bytes()], 0).await; } } _ => { - return si.wire_send(endpoint, local_socket, local_interface, &[packet.as_bytes()], 0); + return si.wire_send(endpoint, local_socket, local_interface, &[packet.as_bytes()], 0).await; } } } @@ -351,9 +352,9 @@ impl Peer { /// /// This will go directly if there is an active path, or otherwise indirectly /// via a root or some other route. - pub(crate) fn send(&self, si: &SI, node: &Node, time_ticks: i64, packet: &PacketBuffer) -> bool { + pub(crate) async fn send(&self, si: &SI, node: &Node, time_ticks: i64, packet: &PacketBuffer) -> bool { if let Some(path) = self.path(node) { - if self.send_to_endpoint(si, &path.endpoint, Some(&path.local_socket), Some(&path.local_interface), packet) { + if self.send_to_endpoint(si, &path.endpoint, Some(&path.local_socket), Some(&path.local_interface), packet).await { self.last_send_time_ticks.store(time_ticks, Ordering::Relaxed); return true; } @@ -368,9 +369,9 @@ impl Peer { /// /// This doesn't fragment large packets since fragments are forwarded individually. /// Intermediates don't need to adjust fragmentation. - pub(crate) fn forward(&self, si: &SI, time_ticks: i64, packet: &PacketBuffer) -> bool { + pub(crate) async fn forward(&self, si: &SI, time_ticks: i64, packet: &PacketBuffer) -> bool { if let Some(path) = self.direct_path() { - if si.wire_send(&path.endpoint, Some(&path.local_socket), Some(&path.local_interface), &[packet.as_bytes()], 0) { + if si.wire_send(&path.endpoint, Some(&path.local_socket), Some(&path.local_interface), &[packet.as_bytes()], 0).await { self.last_forward_time_ticks.store(time_ticks, Ordering::Relaxed); return true; } @@ -385,110 +386,109 @@ impl Peer { /// /// Unlike other messages HELLO is sent partially in the clear and always with the long-lived /// static identity key. - pub(crate) fn send_hello(&self, si: &SI, node: &Node, explicit_endpoint: Option<&Endpoint>) -> bool { + pub(crate) async fn send_hello(&self, si: &SI, node: &Node, explicit_endpoint: Option<&Endpoint>) -> bool { let mut path = None; - let destination = explicit_endpoint.map_or_else( - || { - self.path(node).map_or(None, |p| { - let _ = path.insert(p.clone()); - Some(p.endpoint.clone()) - }) - }, - |endpoint| Some(endpoint.clone()), - ); - if destination.is_none() { - return false; - } - let destination = destination.unwrap(); + let destination = if let Some(explicit_endpoint) = explicit_endpoint { + explicit_endpoint.clone() + } else { + if let Some(p) = self.path(node) { + let _ = path.insert(p); + path.as_ref().unwrap().endpoint.clone() + } else { + return false; + } + }; - let mut packet = PacketBuffer::new(); let time_ticks = si.time_ticks(); - let message_id = self.next_message_id(); - + let mut packet = PacketBuffer::new(); { - let packet_header: &mut PacketHeader = packet.append_struct_get_mut().unwrap(); - packet_header.id = message_id.to_ne_bytes(); // packet ID and message ID are the same when Poly1305 MAC is used - packet_header.dest = self.identity.address.to_bytes(); - packet_header.src = node.identity.address.to_bytes(); - packet_header.flags_cipher_hops = security_constants::CIPHER_NOCRYPT_POLY1305; + let message_id = self.next_message_id(); + + { + let packet_header: &mut PacketHeader = packet.append_struct_get_mut().unwrap(); + packet_header.id = message_id.to_ne_bytes(); // packet ID and message ID are the same when Poly1305 MAC is used + packet_header.dest = self.identity.address.to_bytes(); + packet_header.src = node.identity.address.to_bytes(); + packet_header.flags_cipher_hops = security_constants::CIPHER_NOCRYPT_POLY1305; + } + + { + let hello_fixed_headers: &mut message_component_structs::HelloFixedHeaderFields = packet.append_struct_get_mut().unwrap(); + hello_fixed_headers.verb = verbs::VL1_HELLO | packet_constants::VERB_FLAG_EXTENDED_AUTHENTICATION; + hello_fixed_headers.version_proto = PROTOCOL_VERSION; + hello_fixed_headers.version_major = VERSION_MAJOR; + hello_fixed_headers.version_minor = VERSION_MINOR; + hello_fixed_headers.version_revision = (VERSION_REVISION as u16).to_be_bytes(); + hello_fixed_headers.timestamp = (time_ticks as u64).to_be_bytes(); + } + + // Full identity of the node establishing the session. + assert!(self.identity.marshal_with_options(&mut packet, Identity::ALGORITHM_ALL, false).is_ok()); + + // 8 reserved bytes, must be zero for compatibility with old nodes. + assert!(packet.append_padding(0, 8).is_ok()); + + // Generate a 12-byte nonce for the private section of HELLO. + let mut nonce = get_bytes_secure::<12>(); + + // LEGACY: create a 16-bit encrypted field that specifies zero "moons." Current nodes ignore this + // and treat it as part of the random AES-CTR nonce, but old versions need it to parse the packet + // correctly. + let mut salsa_iv = message_id.to_ne_bytes(); + salsa_iv[7] &= 0xf8; + Salsa::<12>::new(&self.identity_symmetric_key.key.0[0..32], &salsa_iv).crypt(&[0_u8, 0_u8], &mut nonce[8..10]); + + // Append 12-byte AES-CTR nonce. + assert!(packet.append_bytes_fixed(&nonce).is_ok()); + + // Add session meta-data, which is encrypted using plain AES-CTR. No authentication (AEAD) is needed + // because the whole packet is authenticated. Data in the session is not technically secret in a + // cryptographic sense but we encrypt it for privacy and as a defense in depth. + let mut fields = Dictionary::new(); + fields.set_bytes(session_metadata::INSTANCE_ID, node.instance_id.to_vec()); + fields.set_u64(session_metadata::CLOCK, si.time_clock() as u64); + fields.set_bytes(session_metadata::SENT_TO, destination.to_buffer::<{ Endpoint::MAX_MARSHAL_SIZE }>().unwrap().as_bytes().to_vec()); + let fields = fields.to_bytes(); + assert!(fields.len() <= 0xffff); // sanity check, should be impossible + assert!(packet.append_u16(fields.len() as u16).is_ok()); // prefix with unencrypted size + let private_section_start = packet.len(); + assert!(packet.append_bytes(fields.as_slice()).is_ok()); + let mut aes = AesCtr::new(&self.identity_symmetric_key.hello_private_section_key.as_bytes()[0..32]); + aes.init(&nonce); + aes.crypt_in_place(&mut packet.as_mut()[private_section_start..]); + + // Seal packet with HMAC-SHA512 extended authentication. + let mut hmac = HMACSHA512::new(self.identity_symmetric_key.packet_hmac_key.as_bytes()); + hmac.update(&message_id.to_ne_bytes()); + hmac.update(&packet.as_bytes()[packet_constants::HEADER_SIZE..]); + assert!(packet.append_bytes_fixed(&hmac.finish()).is_ok()); + drop(hmac); + + // Set poly1305 in header, which is the only authentication for old nodes. + let (_, mut poly) = salsa_poly_create(&self.identity_symmetric_key, packet.struct_at::(0).unwrap(), packet.len()); + poly.update(packet.as_bytes_starting_at(packet_constants::HEADER_SIZE).unwrap()); + packet.as_mut()[packet_constants::MAC_FIELD_INDEX..packet_constants::MAC_FIELD_INDEX + 8].copy_from_slice(&poly.finish()[0..8]); + + self.last_send_time_ticks.store(time_ticks, Ordering::Relaxed); + + debug_event!(si, "HELLO -> {} @ {} ({} bytes)", self.identity.address.to_string(), destination.to_string(), packet.len()); } - { - let hello_fixed_headers: &mut message_component_structs::HelloFixedHeaderFields = packet.append_struct_get_mut().unwrap(); - hello_fixed_headers.verb = verbs::VL1_HELLO | packet_constants::VERB_FLAG_EXTENDED_AUTHENTICATION; - hello_fixed_headers.version_proto = PROTOCOL_VERSION; - hello_fixed_headers.version_major = VERSION_MAJOR; - hello_fixed_headers.version_minor = VERSION_MINOR; - hello_fixed_headers.version_revision = (VERSION_REVISION as u16).to_be_bytes(); - hello_fixed_headers.timestamp = (time_ticks as u64).to_be_bytes(); + if let Some(p) = path { + if self.send_to_endpoint(si, &destination, Some(&p.local_socket), Some(&p.local_interface), &packet).await { + p.log_send_anything(time_ticks); + true + } else { + false + } + } else { + self.send_to_endpoint(si, &destination, None, None, &packet).await } - - // Full identity of the node establishing the session. - assert!(self.identity.marshal_with_options(&mut packet, Identity::ALGORITHM_ALL, false).is_ok()); - - // 8 reserved bytes, must be zero for compatibility with old nodes. - assert!(packet.append_padding(0, 8).is_ok()); - - // Generate a 12-byte nonce for the private section of HELLO. - let mut nonce = get_bytes_secure::<12>(); - - // LEGACY: create a 16-bit encrypted field that specifies zero "moons." Current nodes ignore this - // and treat it as part of the random AES-CTR nonce, but old versions need it to parse the packet - // correctly. - let mut salsa_iv = message_id.to_ne_bytes(); - salsa_iv[7] &= 0xf8; - Salsa::<12>::new(&self.identity_symmetric_key.key.0[0..32], &salsa_iv).crypt(&[0_u8, 0_u8], &mut nonce[8..10]); - - // Append 12-byte AES-CTR nonce. - assert!(packet.append_bytes_fixed(&nonce).is_ok()); - - // Add session meta-data, which is encrypted using plain AES-CTR. No authentication (AEAD) is needed - // because the whole packet is authenticated. Data in the session is not technically secret in a - // cryptographic sense but we encrypt it for privacy and as a defense in depth. - let mut fields = Dictionary::new(); - fields.set_bytes(session_metadata::INSTANCE_ID, node.instance_id.to_vec()); - fields.set_u64(session_metadata::CLOCK, si.time_clock() as u64); - fields.set_bytes(session_metadata::SENT_TO, destination.to_buffer::<{ Endpoint::MAX_MARSHAL_SIZE }>().unwrap().as_bytes().to_vec()); - let fields = fields.to_bytes(); - assert!(fields.len() <= 0xffff); // sanity check, should be impossible - assert!(packet.append_u16(fields.len() as u16).is_ok()); // prefix with unencrypted size - let private_section_start = packet.len(); - assert!(packet.append_bytes(fields.as_slice()).is_ok()); - let mut aes = AesCtr::new(&self.identity_symmetric_key.hello_private_section_key.as_bytes()[0..32]); - aes.init(&nonce); - aes.crypt_in_place(&mut packet.as_mut()[private_section_start..]); - drop(aes); - drop(fields); - - // Seal packet with HMAC-SHA512 extended authentication. - let mut hmac = HMACSHA512::new(self.identity_symmetric_key.packet_hmac_key.as_bytes()); - hmac.update(&message_id.to_ne_bytes()); - hmac.update(&packet.as_bytes()[packet_constants::HEADER_SIZE..]); - assert!(packet.append_bytes_fixed(&hmac.finish()).is_ok()); - - // Set poly1305 in header, which is the only authentication for old nodes. - let (_, mut poly) = salsa_poly_create(&self.identity_symmetric_key, packet.struct_at::(0).unwrap(), packet.len()); - poly.update(packet.as_bytes_starting_at(packet_constants::HEADER_SIZE).unwrap()); - packet.as_mut()[packet_constants::MAC_FIELD_INDEX..packet_constants::MAC_FIELD_INDEX + 8].copy_from_slice(&poly.finish()[0..8]); - - self.last_send_time_ticks.store(time_ticks, Ordering::Relaxed); - - path.map_or_else( - || self.send_to_endpoint(si, &destination, None, None, &packet), - |p| { - if self.send_to_endpoint(si, &destination, Some(&p.local_socket), Some(&p.local_interface), &packet) { - p.log_send_anything(time_ticks); - true - } else { - false - } - }, - ) } - fn receive_hello(&self, si: &SI, node: &Node, time_ticks: i64, source_path: &Arc>, payload: &PacketBuffer) {} + async fn receive_hello(&self, si: &SI, node: &Node, time_ticks: i64, source_path: &Arc>, payload: &PacketBuffer) {} - fn receive_error(&self, si: &SI, ph: &PH, node: &Node, time_ticks: i64, source_path: &Arc>, forward_secrecy: bool, extended_authentication: bool, payload: &PacketBuffer) { + async fn receive_error(&self, si: &SI, ph: &PH, node: &Node, time_ticks: i64, source_path: &Arc>, forward_secrecy: bool, extended_authentication: bool, payload: &PacketBuffer) { let mut cursor: usize = 1; if let Ok(error_header) = payload.read_struct::(&mut cursor) { let in_re_message_id = u64::from_ne_bytes(error_header.in_re_message_id); @@ -496,14 +496,14 @@ impl Peer { if current_packet_id_counter.wrapping_sub(in_re_message_id) <= PACKET_RESPONSE_COUNTER_DELTA_MAX { match error_header.in_re_verb { _ => { - ph.handle_error(self, source_path, forward_secrecy, extended_authentication, error_header.in_re_verb, in_re_message_id, error_header.error_code, payload, &mut cursor); + ph.handle_error(self, &source_path, forward_secrecy, extended_authentication, error_header.in_re_verb, in_re_message_id, error_header.error_code, payload, &mut cursor).await; } } } } } - fn receive_ok(&self, si: &SI, ph: &PH, node: &Node, time_ticks: i64, source_path: &Arc>, forward_secrecy: bool, extended_authentication: bool, payload: &PacketBuffer) { + async fn receive_ok(&self, si: &SI, ph: &PH, node: &Node, time_ticks: i64, source_path: &Arc>, forward_secrecy: bool, extended_authentication: bool, payload: &PacketBuffer) { let mut cursor: usize = 1; if let Ok(ok_header) = payload.read_struct::(&mut cursor) { let in_re_message_id = u64::from_ne_bytes(ok_header.in_re_message_id); @@ -515,22 +515,22 @@ impl Peer { } verbs::VL1_WHOIS => {} _ => { - ph.handle_ok(self, source_path, forward_secrecy, extended_authentication, ok_header.in_re_verb, in_re_message_id, payload, &mut cursor); + ph.handle_ok(self, &source_path, forward_secrecy, extended_authentication, ok_header.in_re_verb, in_re_message_id, payload, &mut cursor).await; } } } } } - fn receive_whois(&self, si: &SI, node: &Node, time_ticks: i64, source_path: &Arc>, payload: &PacketBuffer) {} + async fn receive_whois(&self, si: &SI, node: &Node, time_ticks: i64, source_path: &Arc>, payload: &PacketBuffer) {} - fn receive_rendezvous(&self, si: &SI, node: &Node, time_ticks: i64, source_path: &Arc>, payload: &PacketBuffer) {} + async fn receive_rendezvous(&self, si: &SI, node: &Node, time_ticks: i64, source_path: &Arc>, payload: &PacketBuffer) {} - fn receive_echo(&self, si: &SI, node: &Node, time_ticks: i64, source_path: &Arc>, payload: &PacketBuffer) {} + async fn receive_echo(&self, si: &SI, node: &Node, time_ticks: i64, source_path: &Arc>, payload: &PacketBuffer) {} - fn receive_push_direct_paths(&self, si: &SI, node: &Node, time_ticks: i64, source_path: &Arc>, payload: &PacketBuffer) {} + async fn receive_push_direct_paths(&self, si: &SI, node: &Node, time_ticks: i64, source_path: &Arc>, payload: &PacketBuffer) {} - fn receive_user_message(&self, si: &SI, node: &Node, time_ticks: i64, source_path: &Arc>, payload: &PacketBuffer) {} + async fn receive_user_message(&self, si: &SI, node: &Node, time_ticks: i64, source_path: &Arc>, payload: &PacketBuffer) {} /// Get current best path or None if there are no direct paths to this peer. pub fn direct_path(&self) -> Option>> { diff --git a/zerotier-network-hypervisor/src/vl2/switch.rs b/zerotier-network-hypervisor/src/vl2/switch.rs index 571785a2c..a159b52dc 100644 --- a/zerotier-network-hypervisor/src/vl2/switch.rs +++ b/zerotier-network-hypervisor/src/vl2/switch.rs @@ -1,5 +1,7 @@ // (c) 2020-2022 ZeroTier, Inc. -- currently propritery pending actual release and licensing. See LICENSE.md. +use async_trait::async_trait; + use crate::vl1::node::{InnerProtocolInterface, SystemInterface}; use crate::vl1::protocol::*; use crate::vl1::{Identity, Path, Peer}; @@ -8,16 +10,28 @@ pub trait SwitchInterface: Sync + Send {} pub struct Switch {} +#[async_trait] impl InnerProtocolInterface for Switch { - fn handle_packet(&self, peer: &Peer, source_path: &Path, forward_secrecy: bool, extended_authentication: bool, verb: u8, payload: &PacketBuffer) -> bool { + async fn handle_packet(&self, peer: &Peer, source_path: &Path, forward_secrecy: bool, extended_authentication: bool, verb: u8, payload: &PacketBuffer) -> bool { false } - fn handle_error(&self, peer: &Peer, source_path: &Path, forward_secrecy: bool, extended_authentication: bool, in_re_verb: u8, in_re_message_id: u64, error_code: u8, payload: &PacketBuffer, cursor: &mut usize) -> bool { + async fn handle_error( + &self, + peer: &Peer, + source_path: &Path, + forward_secrecy: bool, + extended_authentication: bool, + in_re_verb: u8, + in_re_message_id: u64, + error_code: u8, + payload: &PacketBuffer, + cursor: &mut usize, + ) -> bool { false } - fn handle_ok(&self, peer: &Peer, source_path: &Path, forward_secrecy: bool, extended_authentication: bool, in_re_verb: u8, in_re_message_id: u64, payload: &PacketBuffer, cursor: &mut usize) -> bool { + async fn handle_ok(&self, peer: &Peer, source_path: &Path, forward_secrecy: bool, extended_authentication: bool, in_re_verb: u8, in_re_message_id: u64, payload: &PacketBuffer, cursor: &mut usize) -> bool { false } @@ -27,7 +41,7 @@ impl InnerProtocolInterface for Switch { } impl Switch { - pub fn new() -> Self { + pub async fn new() -> Self { Self {} } } diff --git a/zerotier-system-service/Cargo.lock b/zerotier-system-service/Cargo.lock index 2549e76b2..0c2c8e4aa 100644 --- a/zerotier-system-service/Cargo.lock +++ b/zerotier-system-service/Cargo.lock @@ -18,6 +18,17 @@ dependencies = [ "memchr", ] +[[package]] +name = "async-trait" +version = "0.1.56" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96cf8829f67d2eab0b2dfa42c5d0ef737e0724e4a82b01b3e292456202b19716" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "atomic-polyfill" version = "0.1.8" @@ -173,18 +184,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "dashmap" -version = "5.3.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3495912c9c1ccf2e18976439f4443f3fee0fd61f424ff99fde6a66b15ecb448f" -dependencies = [ - "cfg-if", - "hashbrown 0.12.1", - "lock_api", - "parking_lot_core", -] - [[package]] name = "digest" version = "0.9.0" @@ -278,12 +277,6 @@ version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e" -[[package]] -name = "hashbrown" -version = "0.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db0d4cf898abf0081f964436dc980e96670a0f36863e4b83aaacdb65c9d7ccc3" - [[package]] name = "heapless" version = "0.7.13" @@ -313,7 +306,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6012d540c5baa3589337a98ce73408de9b5a25ec9fc2c6fd6be8f0d39e0ca5a" dependencies = [ "autocfg", - "hashbrown 0.11.2", + "hashbrown", ] [[package]] @@ -1048,8 +1041,8 @@ dependencies = [ name = "zerotier-network-hypervisor" version = "2.0.0" dependencies = [ + "async-trait", "base64", - "dashmap", "lazy_static", "libc", "lz4_flex", diff --git a/zerotier-system-service/src/main.rs b/zerotier-system-service/src/main.rs index a4a753531..09196c0f1 100644 --- a/zerotier-system-service/src/main.rs +++ b/zerotier-system-service/src/main.rs @@ -141,8 +141,14 @@ async fn async_main(flags: Flags, global_args: Box) -> i32 { Some(("leave", cmd_args)) => todo!(), Some(("service", _)) => { drop(global_args); // free unnecessary heap before starting service as we're done with CLI args - assert!(service::Service::new(tokio::runtime::Handle::current(), &flags.base_path, true).await.is_ok()); - exitcode::OK + let svc = service::Service::new(tokio::runtime::Handle::current(), &flags.base_path, true).await; + if svc.is_ok() { + let _ = tokio::signal::ctrl_c().await; + exitcode::OK + } else { + println!("FATAL: error launching service: {}", svc.err().unwrap().to_string()); + exitcode::ERR_IOERR + } } Some(("identity", cmd_args)) => todo!(), Some(("rootset", cmd_args)) => cli::rootset::cmd(flags, cmd_args).await, diff --git a/zerotier-system-service/src/service.rs b/zerotier-system-service/src/service.rs index a7b033954..e05f03b5f 100644 --- a/zerotier-system-service/src/service.rs +++ b/zerotier-system-service/src/service.rs @@ -6,6 +6,7 @@ use std::path::Path; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; +use zerotier_network_hypervisor::async_trait; use zerotier_network_hypervisor::vl1::*; use zerotier_network_hypervisor::vl2::*; use zerotier_network_hypervisor::*; @@ -52,6 +53,10 @@ impl Drop for Service { } impl Service { + /// Start ZeroTier service. + /// + /// This launches a number of background tasks in the async runtime that will run as long as this object exists. + /// When this is dropped these tasks are killed. pub async fn new>(rt: tokio::runtime::Handle, base_path: P, auto_upgrade_identity: bool) -> Result> { let mut si = ServiceImpl { rt, @@ -61,7 +66,7 @@ impl Service { num_listeners_per_socket: std::thread::available_parallelism().unwrap().get(), _core: None, }; - let _ = si._core.insert(NetworkHypervisor::new(&si, true, auto_upgrade_identity)?); + let _ = si._core.insert(NetworkHypervisor::new(&si, true, auto_upgrade_identity).await?); let si = Arc::new(si); si._core.as_ref().unwrap().add_update_default_root_set_if_none(); @@ -88,7 +93,7 @@ impl ServiceImpl { if bp.sockets.is_empty() { return Some(errors); } - drop(udp_sockets); // release lock + drop(udp_sockets); for ns in new_sockets.iter() { /* @@ -98,19 +103,18 @@ impl ServiceImpl { * This makes sure that when one packet is in processing the async runtime is immediately able to * cue up another receiver for this socket. */ - let mut socket_associated_tasks = ns.socket_associated_tasks.lock(); for _ in 0..self.num_listeners_per_socket { let self2 = self.clone(); let socket = ns.socket.clone(); let interface = ns.interface.clone(); let local_socket = LocalSocket(Arc::downgrade(ns), self.local_socket_unique_id_counter.fetch_add(1, Ordering::SeqCst)); - socket_associated_tasks.push(self.rt.spawn(async move { + ns.socket_associated_tasks.lock().push(self.rt.spawn(async move { let core = self2.core(); loop { let mut buf = core.get_packet_buffer(); if let Ok((bytes, source)) = socket.recv_from(unsafe { buf.entire_buffer_mut() }).await { unsafe { buf.set_size_unchecked(bytes) }; - core.handle_incoming_physical_packet(&self2, &Endpoint::IpUdp(InetAddress::from(source)), &local_socket, &interface, buf); + core.handle_incoming_physical_packet(&self2, &Endpoint::IpUdp(InetAddress::from(source)), &local_socket, &interface, buf).await; } else { break; } @@ -142,38 +146,39 @@ impl ServiceImpl { async fn core_background_service_task_main(self: Arc) { tokio::time::sleep(Duration::from_secs(1)).await; loop { - tokio::time::sleep(self.core().do_background_tasks(&self)).await; + tokio::time::sleep(self.core().do_background_tasks(&self).await).await; } } } +#[async_trait] impl SystemInterface for ServiceImpl { type LocalSocket = crate::service::LocalSocket; type LocalInterface = crate::localinterface::LocalInterface; - fn event(&self, event: Event) { + async fn event(&self, event: Event) { println!("{}", event.to_string()); match event { _ => {} } } - fn user_message(&self, _source: &Identity, _message_type: u64, _message: &[u8]) {} + async fn user_message(&self, _source: &Identity, _message_type: u64, _message: &[u8]) {} #[inline(always)] fn local_socket_is_valid(&self, socket: &Self::LocalSocket) -> bool { socket.0.strong_count() > 0 } - fn load_node_identity(&self) -> Option { - self.rt.block_on(async { self.data.load_identity().await.map_or(None, |i| Some(i)) }) + async fn load_node_identity(&self) -> Option { + self.data.load_identity().await.map_or(None, |i| Some(i)) } - fn save_node_identity(&self, id: &Identity) { - self.rt.block_on(async { assert!(self.data.save_identity(id).await.is_ok()) }); + async fn save_node_identity(&self, id: &Identity) { + assert!(self.data.save_identity(id).await.is_ok()) } - fn wire_send(&self, endpoint: &Endpoint, local_socket: Option<&Self::LocalSocket>, local_interface: Option<&Self::LocalInterface>, data: &[&[u8]], packet_ttl: u8) -> bool { + async fn wire_send(&self, endpoint: &Endpoint, local_socket: Option<&Self::LocalSocket>, local_interface: Option<&Self::LocalInterface>, data: &[&[u8]], packet_ttl: u8) -> bool { match endpoint { Endpoint::IpUdp(address) => { // This is the fast path -- the socket is known to the core so just send it. @@ -187,64 +192,58 @@ impl SystemInterface for ServiceImpl { // Otherwise we try to send from one socket on every interface or from the specified interface. // This path only happens when the core is trying new endpoints. The fast path is for most packets. - return self.rt.block_on(async { - let sockets = self.udp_sockets.read().await; - if !sockets.is_empty() { - if let Some(specific_interface) = local_interface { - for (_, p) in sockets.iter() { - for s in p.sockets.iter() { - if s.interface.eq(specific_interface) { - if s.send_async(&self.rt, address, data, packet_ttl).await { - return true; - } + let sockets = self.udp_sockets.read().await; + if !sockets.is_empty() { + if let Some(specific_interface) = local_interface { + for (_, p) in sockets.iter() { + for s in p.sockets.iter() { + if s.interface.eq(specific_interface) { + if s.send_async(&self.rt, address, data, packet_ttl).await { + return true; } } } - } else { - let bound_ports: Vec<&u16> = sockets.keys().collect(); - let mut sent_on_interfaces = HashSet::with_capacity(4); - let rn = random::xorshift64_random() as usize; - for i in 0..bound_ports.len() { - let p = sockets.get(*bound_ports.get(rn.wrapping_add(i) % bound_ports.len()).unwrap()).unwrap(); - for s in p.sockets.iter() { - if !sent_on_interfaces.contains(&s.interface) { - if s.send_async(&self.rt, address, data, packet_ttl).await { - sent_on_interfaces.insert(s.interface.clone()); - } - } - } - } - return !sent_on_interfaces.is_empty(); } + } else { + let bound_ports: Vec<&u16> = sockets.keys().collect(); + let mut sent_on_interfaces = HashSet::with_capacity(4); + let rn = random::xorshift64_random() as usize; + for i in 0..bound_ports.len() { + let p = sockets.get(*bound_ports.get(rn.wrapping_add(i) % bound_ports.len()).unwrap()).unwrap(); + for s in p.sockets.iter() { + if !sent_on_interfaces.contains(&s.interface) { + if s.send_async(&self.rt, address, data, packet_ttl).await { + sent_on_interfaces.insert(s.interface.clone()); + } + } + } + } + return !sent_on_interfaces.is_empty(); } - return false; - }); + } + return false; } _ => {} } return false; } - fn check_path(&self, _id: &Identity, endpoint: &Endpoint, _local_socket: Option<&Self::LocalSocket>, _local_interface: Option<&Self::LocalInterface>) -> bool { - self.rt.block_on(async { - let config = self.data.config().await; - if let Some(pps) = config.physical.get(endpoint) { - !pps.blacklist - } else { - true - } - }) + async fn check_path(&self, _id: &Identity, endpoint: &Endpoint, _local_socket: Option<&Self::LocalSocket>, _local_interface: Option<&Self::LocalInterface>) -> bool { + let config = self.data.config().await; + if let Some(pps) = config.physical.get(endpoint) { + !pps.blacklist + } else { + true + } } - fn get_path_hints(&self, id: &Identity) -> Option, Option)>> { - self.rt.block_on(async { - let config = self.data.config().await; - if let Some(vns) = config.virtual_.get(&id.address) { - Some(vns.try_.iter().map(|ep| (ep.clone(), None, None)).collect()) - } else { - None - } - }) + async fn get_path_hints(&self, id: &Identity) -> Option, Option)>> { + let config = self.data.config().await; + if let Some(vns) = config.virtual_.get(&id.address) { + Some(vns.try_.iter().map(|ep| (ep.clone(), None, None)).collect()) + } else { + None + } } #[inline(always)]