From 99611f8781ace09e147d6560767dd0d83f4867ad Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Fri, 14 Jan 2022 17:14:35 -0500 Subject: [PATCH] A bunch of simplification of logic. --- .../src/networkhypervisor.rs | 14 +- zerotier-network-hypervisor/src/util/mod.rs | 3 + .../src/vl1/ephemeral.rs | 45 ++---- .../src/vl1/fragmentedpacket.rs | 21 ++- .../src/vl1/identity.rs | 8 +- zerotier-network-hypervisor/src/vl1/mod.rs | 2 +- zerotier-network-hypervisor/src/vl1/node.rs | 152 +++++++++--------- zerotier-network-hypervisor/src/vl1/path.rs | 7 +- zerotier-network-hypervisor/src/vl1/peer.rs | 102 ++++++------ .../src/vl1/whoisqueue.rs | 8 +- zerotier-network-hypervisor/src/vl2/switch.rs | 2 +- 11 files changed, 180 insertions(+), 184 deletions(-) diff --git a/zerotier-network-hypervisor/src/networkhypervisor.rs b/zerotier-network-hypervisor/src/networkhypervisor.rs index cd98f73bb..3eb1565cc 100644 --- a/zerotier-network-hypervisor/src/networkhypervisor.rs +++ b/zerotier-network-hypervisor/src/networkhypervisor.rs @@ -11,11 +11,11 @@ use std::sync::Arc; use std::time::Duration; use crate::error::InvalidParameterError; -use crate::vl1::{Address, Identity, Endpoint, VL1SystemInterface, Node}; +use crate::vl1::{Address, Identity, Endpoint, SystemInterface, Node}; use crate::vl2::{Switch, SwitchInterface}; use crate::{PacketBuffer, PacketBufferPool}; -pub trait Interface: VL1SystemInterface + SwitchInterface {} +pub trait Interface: SystemInterface + SwitchInterface {} pub struct NetworkHypervisor { vl1: Node, @@ -36,22 +36,18 @@ impl NetworkHypervisor { #[inline(always)] pub fn get_packet_buffer(&self) -> PacketBuffer { self.vl1.get_packet_buffer() } - /// Get a direct reference to the packet buffer pool. #[inline(always)] - pub fn packet_buffer_pool(&self) -> &Arc { self.vl1.packet_buffer_pool() } + pub fn address(&self) -> Address { self.vl1.identity.address } #[inline(always)] - pub fn address(&self) -> Address { self.vl1.address() } - - #[inline(always)] - pub fn identity(&self) -> &Identity { self.vl1.identity() } + pub fn identity(&self) -> &Identity { &self.vl1.identity } pub fn do_background_tasks(&self, ci: &CI) -> Duration { self.vl1.do_background_tasks(ci) } #[inline(always)] - pub fn wire_receive(&self, ci: &CI, source_endpoint: &Endpoint, source_local_socket: Option, source_local_interface: Option, mut data: PacketBuffer) { + pub fn wire_receive(&self, ci: &CI, source_endpoint: &Endpoint, source_local_socket: Option, source_local_interface: Option, mut data: PacketBuffer) { self.vl1.wire_receive(ci, &self.vl2, source_endpoint, source_local_socket, source_local_interface, data) } } diff --git a/zerotier-network-hypervisor/src/util/mod.rs b/zerotier-network-hypervisor/src/util/mod.rs index db45fa846..01a3c6a81 100644 --- a/zerotier-network-hypervisor/src/util/mod.rs +++ b/zerotier-network-hypervisor/src/util/mod.rs @@ -42,6 +42,9 @@ lazy_static! { #[inline(always)] pub(crate) fn highwayhasher() -> highway::HighwayHasher { highway::HighwayHasher::new(highway::Key(HIGHWAYHASHER_KEY.clone())) } +#[inline(always)] +pub(crate) fn u128_from_2xu64_ne(x: [u64; 2]) -> u128 { unsafe { std::mem::transmute(x) } } + /// Non-cryptographic 64-bit bit mixer for things like local hashing. #[inline(always)] pub(crate) fn hash64_noncrypt(mut x: u64) -> u64 { diff --git a/zerotier-network-hypervisor/src/vl1/ephemeral.rs b/zerotier-network-hypervisor/src/vl1/ephemeral.rs index d07432ce5..0ee7855f5 100644 --- a/zerotier-network-hypervisor/src/vl1/ephemeral.rs +++ b/zerotier-network-hypervisor/src/vl1/ephemeral.rs @@ -32,7 +32,7 @@ pub const ALGORITHM_NISTP521ECDH: u8 = 0x02; pub const ALGORITHM_SIDHP751: u8 = 0x04; pub enum EphemeralKeyAgreementError { - OldPublic, + OutdatedPublic, StateMismatch, InvalidData, NoCompatibleAlgorithms @@ -41,7 +41,7 @@ pub enum EphemeralKeyAgreementError { impl Display for EphemeralKeyAgreementError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - EphemeralKeyAgreementError::OldPublic => f.write_str("old (replayed?) public key data from remote"), + EphemeralKeyAgreementError::OutdatedPublic => f.write_str("old (replayed?) public key data from remote"), EphemeralKeyAgreementError::StateMismatch => f.write_str("ratchet state mismatch"), EphemeralKeyAgreementError::InvalidData => f.write_str("invalid public key data"), EphemeralKeyAgreementError::NoCompatibleAlgorithms => f.write_str("no compatible algorithms in public key data") @@ -110,9 +110,9 @@ impl EphemeralKeyPairSet { let _ = varint::write(&mut b, self.previous_ratchet_count); - if self.state_hmac.is_some() { + if let Some(state_hmac) = self.state_hmac.as_ref() { b.push(EPHEMERAL_PUBLIC_FLAG_HAVE_RATCHET_STATE_HMAC); - let _ = b.write_all(self.state_hmac.as_ref().unwrap()); + let _ = b.write_all(state_hmac); } else { b.push(0); } @@ -121,7 +121,7 @@ impl EphemeralKeyPairSet { let _ = varint::write(&mut b, C25519_PUBLIC_KEY_SIZE as u64); let _ = b.write_all(&self.c25519.public_bytes()); - let _ = self.sidhp751.as_ref().map(|sidhp751| { + if let Some(sidhp751) = self.sidhp751.as_ref() { b.push(ALGORITHM_SIDHP751); let _ = varint::write(&mut b, (SIDH_P751_PUBLIC_KEY_SIZE + 1) as u64); b.push(sidhp751.role()); @@ -130,7 +130,7 @@ impl EphemeralKeyPairSet { SIDHEphemeralKeyPair::Bob(b, _) => b.to_bytes() }; let _ = b.write_all(&pk); - }); + } // FIPS note: any FIPS compliant ciphers must be last or the exchange will not be FIPS compliant. That's // because we chain/ratchet using KHDF and non-FIPS ciphers are considered "salt" inputs for HKDF from a @@ -152,6 +152,7 @@ impl EphemeralKeyPairSet { /// the ratchet sequence, or rather a key derived from it for this purpose. /// /// Since ephemeral secrets should only be used once, this consumes the object. + #[allow(non_snake_case)] pub fn agree(self, time_ticks: i64, static_secret: &SymmetricSecret, previous_ephemeral_secret: Option<&EphemeralSymmetricSecret>, other_public_bytes: &[u8]) -> Result { let (mut key, mut ratchet_count, mut c25519_ratchet_count, mut sidhp751_ratchet_count, mut nistp521_ratchet_count) = previous_ephemeral_secret.map_or_else(|| { ( @@ -184,7 +185,7 @@ impl EphemeralKeyPairSet { } let other_ratchet_count = other_ratchet_count.unwrap().0; if other_ratchet_count < ratchet_count { - return Err(EphemeralKeyAgreementError::OldPublic); + return Err(EphemeralKeyAgreementError::OutdatedPublic); } else if other_ratchet_count > ratchet_count { return Err(EphemeralKeyAgreementError::StateMismatch); } @@ -204,7 +205,7 @@ impl EphemeralKeyPairSet { other_public_bytes = &other_public_bytes[49..]; } else { if self.state_hmac.is_some() { - return Err(EphemeralKeyAgreementError::OldPublic); + return Err(EphemeralKeyAgreementError::OutdatedPublic); } other_public_bytes = &other_public_bytes[1..]; } @@ -321,38 +322,26 @@ pub(crate) struct EphemeralSymmetricSecret { /// Current ephemeral secret key. pub secret: SymmetricSecret, /// Total number of ratchets that has occurred. - ratchet_count: u64, + pub ratchet_count: u64, /// Time at or after which we should start trying to re-key. - rekey_time: i64, + pub rekey_time: i64, /// Time after which this key is no longer valid. - expire_time: i64, + pub expire_time: i64, /// Number of C25519 agreements so far in ratchet. - c25519_ratchet_count: u64, + pub c25519_ratchet_count: u64, /// Number of SIDH P-751 agreements so far in ratchet. - sidhp751_ratchet_count: u64, + pub sidhp751_ratchet_count: u64, /// Number of NIST P-521 ECDH agreements so far in ratchet. - nistp521_ratchet_count: u64, + pub nistp521_ratchet_count: u64, /// Number of times this secret has been used to encrypt. - encrypt_uses: AtomicU32, + pub encrypt_uses: AtomicU32, /// Number of times this secret has been used to decrypt. - decrypt_uses: AtomicU32, + pub decrypt_uses: AtomicU32, /// True if most recent key exchange was NIST/FIPS compliant. pub fips_compliant_exchange: bool, } impl EphemeralSymmetricSecret { - #[inline(always)] - pub fn use_secret_to_encrypt(&self) -> &SymmetricSecret { - let _ = self.encrypt_uses.fetch_add(1, Ordering::Relaxed); - &self.secret - } - - #[inline(always)] - pub fn use_secret_to_decrypt(&self) -> &SymmetricSecret { - let _ = self.decrypt_uses.fetch_add(1, Ordering::Relaxed); - &self.secret - } - #[inline(always)] pub fn should_rekey(&self, time_ticks: i64) -> bool { time_ticks >= self.rekey_time || self.encrypt_uses.load(Ordering::Relaxed).max(self.decrypt_uses.load(Ordering::Relaxed)) >= EPHEMERAL_SECRET_REKEY_AFTER_USES diff --git a/zerotier-network-hypervisor/src/vl1/fragmentedpacket.rs b/zerotier-network-hypervisor/src/vl1/fragmentedpacket.rs index 91de3a24d..06280bafc 100644 --- a/zerotier-network-hypervisor/src/vl1/fragmentedpacket.rs +++ b/zerotier-network-hypervisor/src/vl1/fragmentedpacket.rs @@ -32,16 +32,21 @@ impl FragmentedPacket { } } - /// Add a fragment to this fragment set and return true if the packet appears complete. + /// Add a fragment to this fragment set and return true if all fragments are present. #[inline(always)] pub fn add_fragment(&mut self, frag: PacketBuffer, no: u8, expecting: u8) -> bool { - if no < PACKET_FRAGMENT_COUNT_MAX as u8 { - if self.frags[no as usize].replace(frag).is_none() { - self.have = self.have.wrapping_add(1); - self.expecting |= expecting; // in valid streams expecting is either 0 or the (same) total - return self.have == self.expecting && self.have < PACKET_FRAGMENT_COUNT_MAX as u8; + self.frags.get_mut(no as usize).map_or(false, |entry| { + // Note that a duplicate fragment just gets silently replaced. This shouldn't happen + // unless a dupe occurred at the network level, in which case this is usually a + // no-op event. There is no security implication since the whole packet gets MAC'd + // after assembly. + if entry.replace(frag).is_none() { + self.have += 1; + self.expecting |= expecting; // expecting is either 0 or the expected total + self.have == self.expecting + } else { + false } - } - false + }) } } diff --git a/zerotier-network-hypervisor/src/vl1/identity.rs b/zerotier-network-hypervisor/src/vl1/identity.rs index df7723316..faa22df2c 100644 --- a/zerotier-network-hypervisor/src/vl1/identity.rs +++ b/zerotier-network-hypervisor/src/vl1/identity.rs @@ -27,7 +27,7 @@ use zerotier_core_crypto::salsa::Salsa; use zerotier_core_crypto::secret::Secret; use crate::error::InvalidFormatError; -use crate::util::{array_range, highwayhasher}; +use crate::util::{array_range, highwayhasher, u128_from_2xu64_ne}; use crate::util::buffer::Buffer; use crate::util::pool::{Pool, Pooled, PoolFactory}; use crate::vl1::Address; @@ -150,7 +150,7 @@ impl Identity { Self { address, - fast_eq_hash: u128::from_ne_bytes(unsafe { *hh.finalize128().as_ptr().cast() }), + fast_eq_hash: u128_from_2xu64_ne(hh.finalize128()), c25519: c25519_pub, ed25519: ed25519_pub, p521: Some(IdentityP521Public { @@ -471,7 +471,7 @@ impl Identity { Ok(Identity { address, - fast_eq_hash: u128::from_ne_bytes(unsafe { *hh.finalize128().as_ptr().cast() }), + fast_eq_hash: u128_from_2xu64_ne(hh.finalize128()), c25519: x25519_public.0.clone(), ed25519: x25519_public.1.clone(), p521: if p521_ecdh_ecdsa_public.is_some() { @@ -617,7 +617,7 @@ impl FromStr for Identity { Ok(Identity { address, - fast_eq_hash: u128::from_ne_bytes(unsafe { *hh.finalize128().as_ptr().cast() }), + fast_eq_hash: u128_from_2xu64_ne(hh.finalize128()), c25519: keys[0].as_slice()[0..32].try_into().unwrap(), ed25519: keys[0].as_slice()[32..64].try_into().unwrap(), p521: if keys[2].is_empty() { diff --git a/zerotier-network-hypervisor/src/vl1/mod.rs b/zerotier-network-hypervisor/src/vl1/mod.rs index 2f478d415..a768c74f8 100644 --- a/zerotier-network-hypervisor/src/vl1/mod.rs +++ b/zerotier-network-hypervisor/src/vl1/mod.rs @@ -31,6 +31,6 @@ pub use dictionary::Dictionary; pub use inetaddress::InetAddress; pub use peer::Peer; pub use path::Path; -pub use node::{Node, VL1SystemInterface}; +pub use node::{Node, SystemInterface}; pub use protocol::{PACKET_SIZE_MAX, PACKET_FRAGMENT_COUNT_MAX}; diff --git a/zerotier-network-hypervisor/src/vl1/node.rs b/zerotier-network-hypervisor/src/vl1/node.rs index 03177c92c..7ca25f242 100644 --- a/zerotier-network-hypervisor/src/vl1/node.rs +++ b/zerotier-network-hypervisor/src/vl1/node.rs @@ -8,14 +8,12 @@ use std::num::NonZeroI64; use std::str::FromStr; -use std::sync::Arc; +use std::sync::{Arc, Weak}; use std::time::Duration; use dashmap::DashMap; use parking_lot::Mutex; -use zerotier_core_crypto::random::{next_u64_secure, SecureRandom}; - use crate::{PacketBuffer, PacketBufferFactory, PacketBufferPool}; use crate::error::InvalidParameterError; use crate::util::buffer::Buffer; @@ -28,7 +26,10 @@ use crate::vl1::protocol::*; use crate::vl1::whoisqueue::{QueuedPacket, WhoisQueue}; /// Trait implemented by external code to handle events and provide an interface to the system or application. -pub trait VL1SystemInterface { +/// +/// These methods are basically callbacks that the core calls to request or transmit things. They are called +/// during calls to things like wire_recieve() and do_background_tasks(). +pub trait SystemInterface: Sync + Send { /// Node is up and ready for operation. fn event_node_is_up(&self); @@ -84,21 +85,12 @@ pub trait VL1SystemInterface { fn time_clock(&self) -> i64; } -/// Trait implemented by VL2 to handle messages after they are unwrapped by VL1. +/// Interface between VL1 and higher/inner protocol layers. /// -/// This normally isn't used from outside this crate except for testing or if you want to harness VL1 -/// for some entirely unrelated purpose. -pub trait VL1VirtualInterface { - /// Handle a packet, returning true if it belonged to VL2. - /// - /// If this is a VL2 packet, this must return true. True must be returned even if subsequent - /// logic determines that the VL2 packet is not valid or if it is rejected due to lack of - /// security credentials. - /// - /// That's because VL1 calls this before matching the packet's verb against VL1 verbs. This - /// is done to reduce the number of CPU branches between packet receive and the performance - /// critical handling of virtual network frames. A return value of true here indicates that - /// the packet was handled, and false means it may be a VL1 packet. +/// This is implemented by Switch in VL2. It's usually not used outside of VL2 in the core but +/// it could also be implemented for testing or "off label" use of VL1. +pub trait VL1VirtualInterface: Sync + Send { + /// Handle a packet, returning true if it was handled by the next layer. /// /// Do not attempt to handle OK or ERROR. Instead implement handle_ok() and handle_error(). /// The return values of these must follow the same semantic of returning true if the message @@ -126,20 +118,34 @@ struct BackgroundTaskIntervals { } pub struct Node { - pub(crate) instance_id: u64, - identity: Identity, + /// A random ID generated to identify this particular running instance. + pub instance_id: u64, + + /// This node's identity and permanent keys. + pub identity: Identity, + + /// Interval latches for periodic background tasks. intervals: Mutex, - paths: DashMap>, + + /// Canonicalized network paths, held as Weak<> to be automatically cleaned when no longer in use. + paths: DashMap>, + + /// Peers with which we are currently communicating. peers: DashMap>, + + /// This node's trusted roots, sorted in descending order of preference. roots: Mutex>>, + + /// Identity lookup queue, also holds packets waiting on a lookup. whois: WhoisQueue, + + /// Reusable network buffer pool. buffer_pool: Arc, - secure_prng: SecureRandom, } impl Node { /// Create a new Node. - pub fn new(ci: &I, auto_generate_identity: bool) -> Result { + pub fn new(ci: &I, auto_generate_identity: bool) -> Result { let id = { let id_str = ci.load_node_identity(); if id_str.is_none() { @@ -162,30 +168,21 @@ impl Node { }; Ok(Self { - instance_id: next_u64_secure(), + instance_id: zerotier_core_crypto::random::next_u64_secure(), identity: id, intervals: Mutex::new(BackgroundTaskIntervals::default()), - paths: DashMap::new(), - peers: DashMap::new(), + paths: DashMap::with_capacity(128), + peers: DashMap::with_capacity(128), roots: Mutex::new(Vec::new()), whois: WhoisQueue::new(), buffer_pool: Arc::new(PacketBufferPool::new(64, PacketBufferFactory::new())), - secure_prng: SecureRandom::get(), }) } + /// Get a packet buffer that will automatically check itself back into the pool on drop. #[inline(always)] pub fn get_packet_buffer(&self) -> PacketBuffer { self.buffer_pool.get() } - #[inline(always)] - pub fn packet_buffer_pool(&self) -> &Arc { &self.buffer_pool } - - #[inline(always)] - pub fn address(&self) -> Address { self.identity.address } - - #[inline(always)] - pub fn identity(&self) -> &Identity { &self.identity } - /// Get a peer by address. pub fn peer(&self, a: Address) -> Option> { self.peers.get(&a).map(|peer| peer.value().clone()) } @@ -200,19 +197,19 @@ impl Node { } /// Run background tasks and return desired delay until next call in milliseconds. - pub fn do_background_tasks(&self, ci: &I) -> Duration { + /// + /// This should only be called periodically from a single thread, but that thread can be + /// different each time. Calling it concurrently won't crash but won't accomplish anything. + pub fn do_background_tasks(&self, ci: &I) -> Duration { let mut intervals = self.intervals.lock(); let tt = ci.time_ticks(); - if intervals.whois.gate(tt) { - self.whois.call_every_interval(self, ci, tt); - } - if intervals.paths.gate(tt) { self.paths.retain(|_, path| { - path.call_every_interval(ci, tt); - todo!(); - true + path.upgrade().map_or(false, |p| { + p.call_every_interval(ci, tt); + true + }) }); } @@ -224,18 +221,18 @@ impl Node { }); } - Duration::from_millis(1000) + if intervals.whois.gate(tt) { + self.whois.call_every_interval(self, ci, tt); + } + + Duration::from_millis(WhoisQueue::INTERVAL.min(Path::CALL_EVERY_INTERVAL_MS).min(Peer::CALL_EVERY_INTERVAL_MS) as u64 / 4) } /// Called when a packet is received on the physical wire. - pub fn wire_receive(&self, ci: &I, ph: &PH, source_endpoint: &Endpoint, source_local_socket: Option, source_local_interface: Option, mut data: PacketBuffer) { - let fragment_header = data.struct_mut_at::(0); - if fragment_header.is_ok() { - let fragment_header = fragment_header.unwrap(); - let dest = Address::from_bytes(&fragment_header.dest); - if dest.is_some() { + pub fn wire_receive(&self, ci: &I, ph: &PH, source_endpoint: &Endpoint, source_local_socket: Option, source_local_interface: Option, mut data: PacketBuffer) { + if let Ok(fragment_header) = data.struct_mut_at::(0) { + if let Some(dest) = Address::from_bytes(&fragment_header.dest) { let time_ticks = ci.time_ticks(); - let dest = dest.unwrap(); if dest == self.identity.address { // Handle packets addressed to this node. @@ -244,37 +241,28 @@ impl Node { if fragment_header.is_fragment() { - let _ = path.receive_fragment(u64::from_ne_bytes(fragment_header.id), fragment_header.fragment_no(), fragment_header.total_fragments(), data, time_ticks).map(|assembled_packet| { - if assembled_packet.frags[0].is_some() { - let frag0 = assembled_packet.frags[0].as_ref().unwrap(); + if let Some(assembled_packet) = path.receive_fragment(u64::from_ne_bytes(fragment_header.id), fragment_header.fragment_no(), fragment_header.total_fragments(), data, time_ticks) { + if let Some(frag0) = assembled_packet.frags[0].as_ref() { let packet_header = frag0.struct_at::(0); if packet_header.is_ok() { let packet_header = packet_header.unwrap(); - let source = Address::from_bytes(&packet_header.src); - if source.is_some() { - let source = source.unwrap(); - let peer = self.peer(source); - if peer.is_some() { - peer.unwrap().receive(self, ci, ph, time_ticks, source_endpoint, &path, &packet_header, frag0, &assembled_packet.frags[1..(assembled_packet.have as usize)]); + if let Some(source) = Address::from_bytes(&packet_header.src) { + if let Some(peer) = self.peer(source) { + peer.receive(self, ci, ph, time_ticks, source_endpoint, &path, &packet_header, frag0, &assembled_packet.frags[1..(assembled_packet.have as usize)]); } else { self.whois.query(self, ci, source, Some(QueuedPacket::Fragmented(assembled_packet))); } } } } - }); + } } else { - let packet_header = data.struct_at::(0); - if packet_header.is_ok() { - let packet_header = packet_header.unwrap(); - let source = Address::from_bytes(&packet_header.src); - if source.is_some() { - let source = source.unwrap(); - let peer = self.peer(source); - if peer.is_some() { - peer.unwrap().receive(self, ci, ph, time_ticks, source_endpoint, &path, &packet_header, data.as_ref(), &[]); + if let Ok(packet_header) = data.struct_at::(0) { + if let Some(source) = Address::from_bytes(&packet_header.src) { + if let Some(peer) = self.peer(source) { + peer.receive(self, ci, ph, time_ticks, source_endpoint, &path, &packet_header, data.as_ref(), &[]); } else { self.whois.query(self, ci, source, Some(QueuedPacket::Unfragmented(data))); } @@ -292,24 +280,24 @@ impl Node { return; } } else { - let packet_header = data.struct_mut_at::(0); - if packet_header.is_ok() { - if packet_header.unwrap().increment_hops() > FORWARD_MAX_HOPS { + if let Ok(packet_header) = data.struct_mut_at::(0) { + if packet_header.increment_hops() > FORWARD_MAX_HOPS { return; } } else { return; } } - let _ = self.peer(dest).map(|peer| peer.forward(ci, time_ticks, data.as_ref())); - + if let Some(peer) = self.peer(dest) { + peer.forward(ci, time_ticks, data.as_ref()); + } } }; } } /// Get the current best root peer that we should use for WHOIS, relaying, etc. - pub fn root(&self) -> Option> { self.roots.lock().first().map(|p| p.clone()) } + pub fn root(&self) -> Option> { self.roots.lock().first().cloned() } /// Return true if a peer is a root. pub fn is_peer_root(&self, peer: &Peer) -> bool { self.roots.lock().iter().any(|p| Arc::as_ptr(p) == (peer as *const Peer)) } @@ -320,10 +308,14 @@ impl Node { /// of endpoint, local socket, and local interface. pub fn path(&self, ep: &Endpoint, local_socket: Option, local_interface: Option) -> Arc { let key = Path::local_lookup_key(ep, local_socket, local_interface); - self.paths.get(&key).map_or_else(|| { + let mut path_entry = self.paths.entry(key).or_insert_with(|| Weak::new()); + if let Some(path) = path_entry.value().upgrade() { + path + } else { let p = Arc::new(Path::new(ep.clone(), local_socket, local_interface)); - self.paths.insert(key, p.clone()).unwrap_or(p) // if another thread added one, return that instead - }, |path| path.value().clone()) + *path_entry.value_mut() = Arc::downgrade(&p); + p + } } } diff --git a/zerotier-network-hypervisor/src/vl1/path.rs b/zerotier-network-hypervisor/src/vl1/path.rs index f8937777c..8e5a5b700 100644 --- a/zerotier-network-hypervisor/src/vl1/path.rs +++ b/zerotier-network-hypervisor/src/vl1/path.rs @@ -22,7 +22,7 @@ use crate::PacketBuffer; use crate::util::{array_range, highwayhasher, U64NoOpHasher}; use crate::vl1::Endpoint; use crate::vl1::fragmentedpacket::FragmentedPacket; -use crate::vl1::node::VL1SystemInterface; +use crate::vl1::node::SystemInterface; use crate::vl1::protocol::*; /// Keepalive interval for paths in milliseconds. @@ -129,7 +129,6 @@ impl Path { } } - // This is optimized for the fragmented case because that's the most common when transferring data. if fp.entry(packet_id).or_insert_with(|| FragmentedPacket::new(time_ticks)).add_fragment(packet, fragment_no, fragment_expecting_count) { fp.remove(&packet_id) } else { @@ -173,7 +172,7 @@ impl Path { pub(crate) const CALL_EVERY_INTERVAL_MS: i64 = PATH_KEEPALIVE_INTERVAL; #[inline(always)] - pub(crate) fn call_every_interval(&self, ct: &CI, time_ticks: i64) { - self.fragmented_packets.lock().retain(|packet_id, frag| (time_ticks - frag.ts_ticks) < PACKET_FRAGMENT_EXPIRATION); + pub(crate) fn call_every_interval(&self, ct: &CI, time_ticks: i64) { + self.fragmented_packets.lock().retain(|_, frag| (time_ticks - frag.ts_ticks) < PACKET_FRAGMENT_EXPIRATION); } } diff --git a/zerotier-network-hypervisor/src/vl1/peer.rs b/zerotier-network-hypervisor/src/vl1/peer.rs index 1133dbfe4..cd47eb5e9 100644 --- a/zerotier-network-hypervisor/src/vl1/peer.rs +++ b/zerotier-network-hypervisor/src/vl1/peer.rs @@ -13,6 +13,7 @@ use std::num::NonZeroI64; use std::sync::Arc; use std::sync::atomic::{AtomicI64, AtomicU64, AtomicU8, Ordering}; +use arc_swap::ArcSwapOption; use parking_lot::Mutex; use zerotier_core_crypto::hash::{SHA384, SHA384_HASH_SIZE}; @@ -24,7 +25,7 @@ use zerotier_core_crypto::secret::Secret; use crate::{PacketBuffer, VERSION_MAJOR, VERSION_MINOR, VERSION_PROTO, VERSION_REVISION}; use crate::util::{array_range, u64_as_bytes}; use crate::util::buffer::Buffer; -use crate::vl1::{Endpoint, Identity, InetAddress, Path}; +use crate::vl1::{Endpoint, Identity, InetAddress, Path, ephemeral}; use crate::vl1::ephemeral::EphemeralSymmetricSecret; use crate::vl1::identity::{IDENTITY_ALGORITHM_ALL, IDENTITY_ALGORITHM_X25519}; use crate::vl1::node::*; @@ -41,8 +42,8 @@ pub struct Peer { // Static shared secret computed from agreement with identity. static_secret: SymmetricSecret, - // Latest ephemeral secret acknowledged with OK(HELLO). - ephemeral_secret: Mutex>>, + // Latest ephemeral secret or None if not yet negotiated. + ephemeral_secret: ArcSwapOption, // Paths sorted in descending order of quality / preference. paths: Mutex>>, @@ -100,8 +101,8 @@ fn salsa_poly_create(secret: &SymmetricSecret, header: &PacketHeader, packet_siz } /// Attempt AEAD packet encryption and MAC validation. -fn try_aead_decrypt(secret: &SymmetricSecret, packet_frag0_payload_bytes: &[u8], header: &PacketHeader, fragments: &[Option], payload: &mut Buffer, message_id: &mut u64) -> bool { - packet_frag0_payload_bytes.get(0).map_or(false, |verb| { +fn try_aead_decrypt(secret: &SymmetricSecret, packet_frag0_payload_bytes: &[u8], header: &PacketHeader, fragments: &[Option], payload: &mut Buffer) -> Option { + packet_frag0_payload_bytes.get(0).map_or(None, |verb| { match header.cipher() { CIPHER_NOCRYPT_POLY1305 => { if (verb & VERB_MASK) == VERB_VL1_HELLO { @@ -116,14 +117,13 @@ fn try_aead_decrypt(secret: &SymmetricSecret, packet_frag0_payload_bytes: &[u8], let (_, mut poly) = salsa_poly_create(secret, header, total_packet_len); poly.update(payload.as_bytes()); if poly.finish()[0..8].eq(&header.mac) { - *message_id = u64::from_ne_bytes(header.id); - true + Some(u64::from_ne_bytes(header.id)) } else { - false + None } } else { // Only HELLO is permitted without payload encryption. Drop other packet types if sent this way. - false + None } } @@ -142,10 +142,9 @@ fn try_aead_decrypt(secret: &SymmetricSecret, packet_frag0_payload_bytes: &[u8], })); } if poly.finish()[0..8].eq(&header.mac) { - *message_id = u64::from_ne_bytes(header.id); - true + Some(u64::from_ne_bytes(header.id)) } else { - false + None } } @@ -163,16 +162,15 @@ fn try_aead_decrypt(secret: &SymmetricSecret, packet_frag0_payload_bytes: &[u8], }) }); } - aes.decrypt_finish().map_or(false, |tag| { + aes.decrypt_finish().map_or(None, |tag| { // AES-GMAC-SIV encrypts the packet ID too as part of its computation of a single // opaque 128-bit tag, so to get the original packet ID we have to grab it from the // decrypted tag. - *message_id = u64::from_ne_bytes(*array_range::(tag)); - true + Some(u64::from_ne_bytes(*array_range::(tag))) }) } - _ => false, + _ => None, } }) } @@ -182,11 +180,11 @@ impl Peer { /// This only returns None if this_node_identity does not have its secrets or if some /// fatal error occurs performing key agreement between the two identities. pub(crate) fn new(this_node_identity: &Identity, id: Identity) -> Option { - this_node_identity.agree(&id).map(|static_secret| { + this_node_identity.agree(&id).map(|static_secret| -> Peer { Peer { identity: id, static_secret: SymmetricSecret::new(static_secret), - ephemeral_secret: Mutex::new(None), + ephemeral_secret: ArcSwapOption::const_empty(), paths: Mutex::new(Vec::new()), reported_local_ip: Mutex::new(None), last_send_time_ticks: AtomicI64::new(0), @@ -211,23 +209,29 @@ impl Peer { /// Receive, decrypt, authenticate, and process an incoming packet from this peer. /// If the packet comes in multiple fragments, the fragments slice should contain all /// those fragments after the main packet header and first chunk. - pub(crate) fn receive(&self, node: &Node, ci: &CI, ph: &PH, time_ticks: i64, source_endpoint: &Endpoint, source_path: &Arc, header: &PacketHeader, packet: &Buffer<{ PACKET_SIZE_MAX }>, fragments: &[Option]) { + pub(crate) fn receive(&self, node: &Node, ci: &CI, ph: &PH, time_ticks: i64, source_endpoint: &Endpoint, source_path: &Arc, header: &PacketHeader, packet: &Buffer<{ PACKET_SIZE_MAX }>, fragments: &[Option]) { let _ = packet.as_bytes_starting_at(PACKET_VERB_INDEX).map(|packet_frag0_payload_bytes| { let mut payload: Buffer = unsafe { Buffer::new_without_memzero() }; - let mut message_id = 0_u64; - let ephemeral_secret: Option> = self.ephemeral_secret.lock().clone(); - let forward_secrecy = if ephemeral_secret.map_or(false, |ephemeral_secret| try_aead_decrypt(&ephemeral_secret.secret, packet_frag0_payload_bytes, header, fragments, &mut payload, &mut message_id)) { - // Decrypted and authenticated by the ephemeral secret. - true + + let (forward_secrecy, mut message_id) = if let Some(ephemeral_secret) = self.ephemeral_secret.load_full() { + if let Some(message_id) = try_aead_decrypt(&ephemeral_secret.secret, packet_frag0_payload_bytes, header, fragments, &mut payload) { + ephemeral_secret.decrypt_uses.fetch_add(1, Ordering::Relaxed); + (true, message_id) + } else { + (false, 0) + } } else { - // There is no ephemeral secret, or authentication with it failed. - unsafe { payload.set_size_unchecked(0); } - if !try_aead_decrypt(&self.static_secret, packet_frag0_payload_bytes, header, fragments, &mut payload, &mut message_id) { - // Static secret also failed, reject packet. + (false, 0) + }; + if !forward_secrecy { + if let Some(message_id2) = try_aead_decrypt(&self.static_secret, packet_frag0_payload_bytes, header, fragments, &mut payload) { + message_id = message_id2; + } else { + // Packet failed to decrypt using either ephemeral or permament key, reject. return; } - false - }; + } + debug_assert!(!payload.is_empty()); // --------------------------------------------------------------- // If we made it here it decrypted and passed authentication. @@ -237,7 +241,6 @@ impl Peer { self.total_bytes_received.fetch_add((payload.len() + PACKET_HEADER_SIZE) as u64, Ordering::Relaxed); source_path.log_receive_authenticated_packet(payload.len() + PACKET_HEADER_SIZE, source_endpoint); - debug_assert!(!payload.is_empty()); // should be impossible since this fails in try_aead_decrypt() let mut verb = payload.as_bytes()[0]; // If this flag is set, the end of the payload is a full HMAC-SHA384 authentication @@ -285,11 +288,20 @@ impl Peer { VERB_VL1_USER_MESSAGE => self.receive_user_message(ci, node, time_ticks, source_path, &payload), _ => {} } + } else { + #[cfg(debug)] { + if match verb { + VERB_VL1_NOP | VERB_VL1_HELLO | VERB_VL1_ERROR | VERB_VL1_OK | VERB_VL1_WHOIS | VERB_VL1_RENDEZVOUS | VERB_VL1_ECHO | VERB_VL1_PUSH_DIRECT_PATHS | VERB_VL1_USER_MESSAGE => true, + _ => false + } { + panic!("The next layer handled a VL1 packet! It should not do this."); + } + } } }); } - fn send_to_endpoint(&self, ci: &CI, endpoint: &Endpoint, local_socket: Option, local_interface: Option, packet: &Buffer<{ PACKET_SIZE_MAX }>) -> bool { + fn send_to_endpoint(&self, ci: &CI, endpoint: &Endpoint, local_socket: Option, local_interface: Option, packet: &Buffer<{ PACKET_SIZE_MAX }>) -> bool { debug_assert!(packet.len() <= PACKET_SIZE_MAX); debug_assert!(packet.len() >= PACKET_SIZE_MIN); match endpoint { @@ -343,7 +355,7 @@ impl Peer { /// /// This will go directly if there is an active path, or otherwise indirectly /// via a root or some other route. - pub(crate) fn send(&self, ci: &CI, node: &Node, time_ticks: i64, packet: &Buffer<{ PACKET_SIZE_MAX }>) -> bool { + pub(crate) fn send(&self, ci: &CI, node: &Node, time_ticks: i64, packet: &Buffer<{ PACKET_SIZE_MAX }>) -> bool { self.path(node).map_or(false, |path| { if self.send_to_endpoint(ci, path.endpoint().as_ref(), path.local_socket(), path.local_interface(), packet) { self.last_send_time_ticks.store(time_ticks, Ordering::Relaxed); @@ -362,7 +374,7 @@ impl Peer { /// /// This doesn't fragment large packets since fragments are forwarded individually. /// Intermediates don't need to adjust fragmentation. - pub(crate) fn forward(&self, ci: &CI, time_ticks: i64, packet: &Buffer<{ PACKET_SIZE_MAX }>) -> bool { + pub(crate) fn forward(&self, ci: &CI, time_ticks: i64, packet: &Buffer<{ PACKET_SIZE_MAX }>) -> bool { self.direct_path().map_or(false, |path| { if ci.wire_send(path.endpoint().as_ref(), path.local_socket(), path.local_interface(), &[packet.as_bytes()], 0) { self.last_forward_time_ticks.store(time_ticks, Ordering::Relaxed); @@ -378,7 +390,7 @@ impl Peer { /// /// If explicit_endpoint is not None the packet will be sent directly to this endpoint. /// Otherwise it will be sent via the best direct or indirect path known. - pub(crate) fn send_hello(&self, ci: &CI, node: &Node, explicit_endpoint: Option<&Endpoint>) -> bool { + pub(crate) fn send_hello(&self, ci: &CI, node: &Node, explicit_endpoint: Option<&Endpoint>) -> bool { let mut packet: Buffer<{ PACKET_SIZE_MAX }> = Buffer::new(); let time_ticks = ci.time_ticks(); @@ -387,7 +399,7 @@ impl Peer { let packet_header: &mut PacketHeader = packet.append_struct_get_mut().unwrap(); packet_header.id = message_id.to_ne_bytes(); // packet ID and message ID are the same when Poly1305 MAC is used packet_header.dest = self.identity.address.to_bytes(); - packet_header.src = node.address().to_bytes(); + packet_header.src = node.identity.address.to_bytes(); packet_header.flags_cipher_hops = CIPHER_NOCRYPT_POLY1305; } { @@ -450,13 +462,13 @@ impl Peer { /// Called every INTERVAL during background tasks. #[inline(always)] - pub(crate) fn call_every_interval(&self, ct: &CI, time_ticks: i64) {} + pub(crate) fn call_every_interval(&self, ct: &CI, time_ticks: i64) {} #[inline(always)] - fn receive_hello(&self, ci: &CI, node: &Node, time_ticks: i64, source_path: &Arc, payload: &Buffer<{ PACKET_SIZE_MAX }>) {} + fn receive_hello(&self, ci: &CI, node: &Node, time_ticks: i64, source_path: &Arc, payload: &Buffer<{ PACKET_SIZE_MAX }>) {} #[inline(always)] - fn receive_error(&self, ci: &CI, ph: &PH, node: &Node, time_ticks: i64, source_path: &Arc, forward_secrecy: bool, extended_authentication: bool, payload: &Buffer<{ PACKET_SIZE_MAX }>) { + fn receive_error(&self, ci: &CI, ph: &PH, node: &Node, time_ticks: i64, source_path: &Arc, forward_secrecy: bool, extended_authentication: bool, payload: &Buffer<{ PACKET_SIZE_MAX }>) { let mut cursor: usize = 0; let _ = payload.read_struct::(&mut cursor).map(|error_header| { let in_re_message_id = u64::from_ne_bytes(error_header.in_re_message_id); @@ -472,7 +484,7 @@ impl Peer { } #[inline(always)] - fn receive_ok(&self, ci: &CI, ph: &PH, node: &Node, time_ticks: i64, source_path: &Arc, forward_secrecy: bool, extended_authentication: bool, payload: &Buffer<{ PACKET_SIZE_MAX }>) { + fn receive_ok(&self, ci: &CI, ph: &PH, node: &Node, time_ticks: i64, source_path: &Arc, forward_secrecy: bool, extended_authentication: bool, payload: &Buffer<{ PACKET_SIZE_MAX }>) { let mut cursor: usize = 0; let _ = payload.read_struct::(&mut cursor).map(|ok_header| { let in_re_message_id = u64::from_ne_bytes(ok_header.in_re_message_id); @@ -492,19 +504,19 @@ impl Peer { } #[inline(always)] - fn receive_whois(&self, ci: &CI, node: &Node, time_ticks: i64, source_path: &Arc, payload: &Buffer<{ PACKET_SIZE_MAX }>) {} + fn receive_whois(&self, ci: &CI, node: &Node, time_ticks: i64, source_path: &Arc, payload: &Buffer<{ PACKET_SIZE_MAX }>) {} #[inline(always)] - fn receive_rendezvous(&self, ci: &CI, node: &Node, time_ticks: i64, source_path: &Arc, payload: &Buffer<{ PACKET_SIZE_MAX }>) {} + fn receive_rendezvous(&self, ci: &CI, node: &Node, time_ticks: i64, source_path: &Arc, payload: &Buffer<{ PACKET_SIZE_MAX }>) {} #[inline(always)] - fn receive_echo(&self, ci: &CI, node: &Node, time_ticks: i64, source_path: &Arc, payload: &Buffer<{ PACKET_SIZE_MAX }>) {} + fn receive_echo(&self, ci: &CI, node: &Node, time_ticks: i64, source_path: &Arc, payload: &Buffer<{ PACKET_SIZE_MAX }>) {} #[inline(always)] - fn receive_push_direct_paths(&self, ci: &CI, node: &Node, time_ticks: i64, source_path: &Arc, payload: &Buffer<{ PACKET_SIZE_MAX }>) {} + fn receive_push_direct_paths(&self, ci: &CI, node: &Node, time_ticks: i64, source_path: &Arc, payload: &Buffer<{ PACKET_SIZE_MAX }>) {} #[inline(always)] - fn receive_user_message(&self, ci: &CI, node: &Node, time_ticks: i64, source_path: &Arc, payload: &Buffer<{ PACKET_SIZE_MAX }>) {} + fn receive_user_message(&self, ci: &CI, node: &Node, time_ticks: i64, source_path: &Arc, payload: &Buffer<{ PACKET_SIZE_MAX }>) {} /// Get current best path or None if there are no direct paths to this peer. #[inline(always)] diff --git a/zerotier-network-hypervisor/src/vl1/whoisqueue.rs b/zerotier-network-hypervisor/src/vl1/whoisqueue.rs index 24ce83cc9..671113038 100644 --- a/zerotier-network-hypervisor/src/vl1/whoisqueue.rs +++ b/zerotier-network-hypervisor/src/vl1/whoisqueue.rs @@ -13,7 +13,7 @@ use parking_lot::Mutex; use crate::util::gate::IntervalGate; use crate::vl1::Address; use crate::vl1::fragmentedpacket::FragmentedPacket; -use crate::vl1::node::{Node, VL1SystemInterface}; +use crate::vl1::node::{Node, SystemInterface}; use crate::vl1::protocol::{WHOIS_RETRY_INTERVAL, WHOIS_MAX_WAITING_PACKETS, WHOIS_RETRY_MAX}; use crate::PacketBuffer; @@ -36,7 +36,7 @@ impl WhoisQueue { pub fn new() -> Self { Self(Mutex::new(HashMap::new())) } /// Launch or renew a WHOIS query and enqueue a packet to be processed when (if) it is received. - pub fn query(&self, node: &Node, ci: &CI, target: Address, packet: Option) { + pub fn query(&self, node: &Node, ci: &CI, target: Address, packet: Option) { let mut q = self.0.lock(); let qi = q.entry(target).or_insert_with(|| WhoisQueueItem { @@ -64,7 +64,7 @@ impl WhoisQueue { } /// Called every INTERVAL during background tasks. - pub fn call_every_interval(&self, node: &Node, ci: &CI, time_ticks: i64) { + pub fn call_every_interval(&self, node: &Node, ci: &CI, time_ticks: i64) { let mut targets: Vec
= Vec::new(); self.0.lock().retain(|target, qi| { if qi.retry_count < WHOIS_RETRY_MAX { @@ -82,7 +82,7 @@ impl WhoisQueue { } } - fn send_whois(&self, node: &Node, ci: &CI, targets: &[Address]) { + fn send_whois(&self, node: &Node, ci: &CI, targets: &[Address]) { todo!() } } diff --git a/zerotier-network-hypervisor/src/vl2/switch.rs b/zerotier-network-hypervisor/src/vl2/switch.rs index fbcfcf928..3c73ed47a 100644 --- a/zerotier-network-hypervisor/src/vl2/switch.rs +++ b/zerotier-network-hypervisor/src/vl2/switch.rs @@ -13,7 +13,7 @@ use crate::vl1::node::VL1VirtualInterface; use crate::vl1::{Peer, Path, Identity}; use crate::vl1::protocol::*; -pub trait SwitchInterface { +pub trait SwitchInterface: Sync + Send { } pub struct Switch {