diff --git a/zerotier-network-hypervisor/Cargo.toml b/zerotier-network-hypervisor/Cargo.toml index d0db44825..fb2308def 100644 --- a/zerotier-network-hypervisor/Cargo.toml +++ b/zerotier-network-hypervisor/Cargo.toml @@ -15,6 +15,7 @@ panic = 'abort' zerotier-core-crypto = { path = "../zerotier-core-crypto" } base64 = "^0" lz4_flex = { version = "^0", features = ["safe-encode", "safe-decode", "checked-decode"] } +metrohash = "^1" dashmap = "^5" parking_lot = "^0" lazy_static = "^1" diff --git a/zerotier-network-hypervisor/src/error.rs b/zerotier-network-hypervisor/src/error.rs index 1e93f68a5..65b666476 100644 --- a/zerotier-network-hypervisor/src/error.rs +++ b/zerotier-network-hypervisor/src/error.rs @@ -9,6 +9,23 @@ use std::error::Error; use std::fmt::{Debug, Display}; +pub struct UnexpectedError; + +impl Display for UnexpectedError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("UnexpectedError") + } +} + +impl Debug for UnexpectedError { + #[inline(always)] + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + ::fmt(self, f) + } +} + +impl Error for UnexpectedError {} + pub struct InvalidFormatError; impl Display for InvalidFormatError { @@ -26,8 +43,6 @@ impl Debug for InvalidFormatError { impl Error for InvalidFormatError {} -/****/ - pub struct InvalidParameterError(pub(crate) &'static str); impl Display for InvalidParameterError { @@ -45,8 +60,6 @@ impl Debug for InvalidParameterError { impl Error for InvalidParameterError {} -/****/ - pub struct MalformedRecordError(pub(crate) &'static str); impl Display for MalformedRecordError { diff --git a/zerotier-network-hypervisor/src/util/mod.rs b/zerotier-network-hypervisor/src/util/mod.rs index 642dcb970..c1822346d 100644 --- a/zerotier-network-hypervisor/src/util/mod.rs +++ b/zerotier-network-hypervisor/src/util/mod.rs @@ -23,13 +23,6 @@ pub(crate) fn byte_array_range() } } -/// Obtain a reference to a sub-array within an existing byte array. -//#[inline(always)] -//pub(crate) fn byte_array_range_mut(a: &mut [u8; A]) -> &mut [u8; LEN] { -// assert!((START + LEN) <= A); -// unsafe { &mut *a.as_mut_ptr().add(START).cast::<[u8; LEN]>() } -//} - /// Non-cryptographic 64-bit bit mixer for things like local hashing. #[inline(always)] pub(crate) fn hash64_noncrypt(mut x: u64) -> u64 { diff --git a/zerotier-network-hypervisor/src/vl1/endpoint.rs b/zerotier-network-hypervisor/src/vl1/endpoint.rs index c2d37f305..202f372ec 100644 --- a/zerotier-network-hypervisor/src/vl1/endpoint.rs +++ b/zerotier-network-hypervisor/src/vl1/endpoint.rs @@ -123,12 +123,6 @@ impl Endpoint { None } } - - pub fn to_bytes(&self) -> Vec { - let mut b: Buffer = Buffer::new(); - self.marshal(&mut b).expect("internal error marshaling Endpoint"); - b.as_bytes().to_vec() - } } impl Marshalable for Endpoint { diff --git a/zerotier-network-hypervisor/src/vl1/inetaddress.rs b/zerotier-network-hypervisor/src/vl1/inetaddress.rs index 9f57494e2..a9bed2f50 100644 --- a/zerotier-network-hypervisor/src/vl1/inetaddress.rs +++ b/zerotier-network-hypervisor/src/vl1/inetaddress.rs @@ -471,20 +471,6 @@ impl InetAddress { } } - /// Get raw IP bytes packed into a u128. - /// Bytes are packed in native endian so the resulting u128 may not be the same between systems. - /// This value is intended for local lookup use only. - #[inline(always)] - pub(crate) fn ip_as_native_u128(&self) -> u128 { - unsafe { - match self.sa.sa_family as u8 { - AF_INET => self.sin.sin_addr.s_addr as u128, - AF_INET6 => u128::from_ne_bytes(*(&self.sin6.sin6_addr as *const in6_addr).cast::<[u8; 16]>()), - _ => 0, - } - } - } - /// Get the IP port for this InetAddress. pub fn port(&self) -> u16 { unsafe { diff --git a/zerotier-network-hypervisor/src/vl1/node.rs b/zerotier-network-hypervisor/src/vl1/node.rs index 516a6dff9..fdfd51e0f 100644 --- a/zerotier-network-hypervisor/src/vl1/node.rs +++ b/zerotier-network-hypervisor/src/vl1/node.rs @@ -6,13 +6,16 @@ * https://www.zerotier.com/ */ +use std::collections::HashMap; use std::num::NonZeroI64; use std::str::FromStr; +use std::sync::atomic::Ordering; use std::sync::{Arc, Weak}; use std::time::Duration; use dashmap::DashMap; -use parking_lot::Mutex; +use lazy_static::lazy_static; +use parking_lot::{Mutex, RwLock}; use crate::error::InvalidParameterError; use crate::util::buffer::Buffer; @@ -21,7 +24,7 @@ use crate::vl1::path::Path; use crate::vl1::peer::Peer; use crate::vl1::protocol::*; use crate::vl1::whoisqueue::{QueuedPacket, WhoisQueue}; -use crate::vl1::{Address, Endpoint, Identity}; +use crate::vl1::{Address, Endpoint, Identity, RootCluster}; use crate::{PacketBuffer, PacketBufferFactory, PacketBufferPool}; /// Trait implemented by external code to handle events and provide an interface to the system or application. @@ -41,6 +44,9 @@ pub trait SystemInterface: Sync + Send { /// A USER_MESSAGE packet was received. fn event_user_message(&self, source: &Identity, message_type: u64, message: &[u8]); + /// VL1 core generated a security warning. + fn event_security_warning(&self, warning: &str); + /// Load this node's identity from the data store. fn load_node_identity(&self) -> Option>; @@ -64,11 +70,8 @@ pub trait SystemInterface: Sync + Send { /// Called to check and see if a physical address should be used for ZeroTier traffic to a node. fn check_path(&self, id: &Identity, endpoint: &Endpoint, local_socket: Option, local_interface: Option) -> bool; - /// Called to look up a path to a known node. - /// - /// If a path is found, this returns a tuple of an endpoint and optional local socket and local - /// interface IDs. If these are None they will be None when this is sent with wire_send. - fn get_path_hints(&self, id: &Identity) -> Option<&[(&Endpoint, Option, Option)]>; + /// Called to look up any statically defined or memorized paths to known nodes. + fn get_path_hints(&self, id: &Identity) -> Option, Option)>>; /// Called to get the current time in milliseconds from the system monotonically increasing clock. /// This needs to be accurate to about 250 milliseconds resolution or better. @@ -113,13 +116,23 @@ pub(crate) trait BackgroundServicable { fn service(&self, si: &SI, node: &Node, time_ticks: i64) -> bool; } +/// How often to check the root cluster definitions against the root list and update. +const ROOT_SYNC_INTERVAL_MS: i64 = 1000; + +lazy_static! { + static ref BACKGROUND_TASK_INTERVAL: Duration = Duration::from_millis((ROOT_SYNC_INTERVAL_MS.min(WhoisQueue::SERVICE_INTERVAL_MS).min(Path::SERVICE_INTERVAL_MS).min(Peer::SERVICE_INTERVAL_MS) as u64) / 2); +} + #[derive(Default)] struct BackgroundTaskIntervals { whois: IntervalGate<{ WhoisQueue::SERVICE_INTERVAL_MS }>, paths: IntervalGate<{ Path::SERVICE_INTERVAL_MS }>, peers: IntervalGate<{ Peer::SERVICE_INTERVAL_MS }>, + root_sync: IntervalGate, + root_hello: IntervalGate, } +/// A VL1 global P2P network node. pub struct Node { /// A random ID generated to identify this particular running instance. pub instance_id: u64, @@ -136,14 +149,17 @@ pub struct Node { /// Peers with which we are currently communicating. peers: DashMap>, - /// This node's trusted roots, sorted in descending order of preference. - roots: Mutex>>, + /// This node's trusted roots, sorted in ascending order of quality/preference, and cluster definitions. + roots: Mutex<(Vec>, Vec)>, + + /// Current best root. + best_root: RwLock>>, /// Identity lookup queue, also holds packets waiting on a lookup. whois: WhoisQueue, /// Reusable network buffer pool. - buffer_pool: Arc, + buffer_pool: PacketBufferPool, } impl Node { @@ -179,11 +195,12 @@ impl Node { instance_id: zerotier_core_crypto::random::next_u64_secure(), identity: id, intervals: Mutex::new(BackgroundTaskIntervals::default()), - paths: DashMap::with_capacity(256), - peers: DashMap::with_capacity(128), - roots: Mutex::new(Vec::new()), + paths: DashMap::new(), + peers: DashMap::new(), + roots: Mutex::new((Vec::new(), Vec::new())), + best_root: RwLock::new(None), whois: WhoisQueue::new(), - buffer_pool: Arc::new(PacketBufferPool::new(64, PacketBufferFactory::new())), + buffer_pool: PacketBufferPool::new(64, PacketBufferFactory::new()), }) } @@ -194,6 +211,7 @@ impl Node { } /// Get a peer by address. + #[inline(always)] pub fn peer(&self, a: Address) -> Option> { self.peers.get(&a).map(|peer| peer.value().clone()) } @@ -216,8 +234,91 @@ impl Node { let mut intervals = self.intervals.lock(); let tt = si.time_ticks(); + if intervals.root_sync.gate(tt) { + let mut roots_lock = self.roots.lock(); + let (roots, root_clusters) = &mut *roots_lock; + + // Look at root cluster definitions and make sure all have corresponding root peers. + let mut root_endpoints = HashMap::with_capacity(roots.len() * 2); + for rc in root_clusters.iter() { + for m in rc.members.iter() { + if m.endpoints.is_some() { + let endpoints = m.endpoints.as_ref().unwrap(); + + /* + * SECURITY NOTE: we take extra care to handle the case where we have a peer whose identity + * differs from that of a root but whose address is the same. It should be impossible to + * make this happen, but we check anyway. It would require a colliding identity to be + * approved by one of your existing roots and somehow retrieved via WHOIS, but this would + * be hard because this background task loop populates the peer list with specified root + * identities before this happens. It could also happen if you have two root cluster + * definitions with a colliding address, which would itself be hard to produce and would + * probably mean someone is doing something nasty. + * + * In this case the response is to ignore this root entirely and generate a warning. + */ + + // This functional stuff on entry() is to do all this atomically while holding the map's entry + // object, since this is a "lock-free" structure. + let _ = self + .peers + .entry(m.identity.address) + .or_try_insert_with(|| { + Peer::new(&self.identity, m.identity.clone(), tt).map_or(Err(crate::error::UnexpectedError), |new_root| { + let new_root = Arc::new(new_root); + roots.retain(|r| r.identity.address != m.identity.address); // sanity check, should be impossible + roots.push(new_root.clone()); + Ok(new_root) + }) + }) + .and_then(|root_peer_entry| { + let rp = root_peer_entry.value(); + if rp.identity.eq(&m.identity) { + Ok(root_peer_entry) + } else { + roots.retain(|r| r.identity.address != m.identity.address); + si.event_security_warning(format!("address/identity collision between root {} (from root cluster definition '{}') and known peer {}", m.identity.address.to_string(), rc.name, rp.identity.to_string()).as_str()); + Err(crate::error::UnexpectedError) + } + }) + .map(|_| { + let _ = root_endpoints.insert(m.identity.address, endpoints); + }); + } + } + } + + // Remove all roots not in any current root cluster definition. + roots.retain(|r| root_endpoints.contains_key(&r.identity.address)); + + // Say HELLO to all roots periodically. For roots we send HELLO to every single endpoint + // they have, which is a behavior that differs from normal peers. This allows roots to + // e.g. see our IPv4 and our IPv6 address which can be important for us to learn our + // external addresses from them. + assert!(ROOT_SYNC_INTERVAL_MS <= (ROOT_HELLO_INTERVAL / 2)); + if intervals.root_hello.gate(tt) { + for r in roots.iter() { + for ep in root_endpoints.get(&r.identity.address).unwrap().iter() { + r.send_hello(si, self, Some(ep)); + } + } + } + + // Update best root fast lookup field. + if !roots.is_empty() { + let _ = self.best_root.write().insert(roots.last().unwrap().clone()); + } else { + // The best root is the one that has replied to a HELLO most recently. Since we send HELLOs in unison + // this is a proxy for latency and also causes roots that fail to reply to drop out quickly. + roots.sort_unstable_by(|a, b| a.last_hello_reply_time_ticks.load(Ordering::Relaxed).cmp(&b.last_hello_reply_time_ticks.load(Ordering::Relaxed))); + let _ = self.best_root.write().take(); + } + } + if intervals.peers.gate(tt) { - self.peers.retain(|_, peer| peer.service(si, self, tt)); + // Service all peers, removing any whose service() method returns false AND that are not + // roots. Roots on the other hand remain in the peer list as long as they are roots. + self.peers.retain(|_, peer| if peer.service(si, self, tt) { true } else { !self.roots.lock().0.iter().any(|r| Arc::ptr_eq(peer, r)) }); } if intervals.paths.gate(tt) { @@ -228,7 +329,7 @@ impl Node { let _ = self.whois.service(si, self, tt); } - Duration::from_millis((WhoisQueue::SERVICE_INTERVAL_MS.min(Path::SERVICE_INTERVAL_MS).min(Peer::SERVICE_INTERVAL_MS) as u64) / 2) + *BACKGROUND_TASK_INTERVAL } /// Called when a packet is received on the physical wire. @@ -252,7 +353,7 @@ impl Node { let packet_header = packet_header.unwrap(); if let Some(source) = Address::from_bytes(&packet_header.src) { if let Some(peer) = self.peer(source) { - peer.receive(self, si, ph, time_ticks, source_endpoint, &path, &packet_header, frag0, &assembled_packet.frags[1..(assembled_packet.have as usize)]); + peer.receive(self, si, ph, time_ticks, &path, &packet_header, frag0, &assembled_packet.frags[1..(assembled_packet.have as usize)]); } else { self.whois.query(self, si, source, Some(QueuedPacket::Fragmented(assembled_packet))); } @@ -266,7 +367,7 @@ impl Node { if let Some(source) = Address::from_bytes(&packet_header.src) { if let Some(peer) = self.peer(source) { - peer.receive(self, si, ph, time_ticks, source_endpoint, &path, &packet_header, data.as_ref(), &[]); + peer.receive(self, si, ph, time_ticks, &path, &packet_header, data.as_ref(), &[]); } else { self.whois.query(self, si, source, Some(QueuedPacket::Unfragmented(data))); } @@ -275,6 +376,7 @@ impl Node { } } else { // Forward packets not destined for this node. + // TODO: SHOULD we forward? Need a way to check. if fragment_header.is_fragment() { if fragment_header.increment_hops() > FORWARD_MAX_HOPS { @@ -299,13 +401,14 @@ impl Node { } /// Get the current best root peer that we should use for WHOIS, relaying, etc. + #[inline(always)] pub fn root(&self) -> Option> { - self.roots.lock().first().cloned() + self.best_root.read().clone() } /// Return true if a peer is a root. pub fn is_peer_root(&self, peer: &Peer) -> bool { - self.roots.lock().iter().any(|p| Arc::as_ptr(p) == (peer as *const Peer)) + self.roots.lock().0.iter().any(|p| Arc::as_ptr(p) == (peer as *const Peer)) } /// Get the canonical Path object for a given endpoint and local socket information. diff --git a/zerotier-network-hypervisor/src/vl1/path.rs b/zerotier-network-hypervisor/src/vl1/path.rs index d74e0d03a..44d4ae5f0 100644 --- a/zerotier-network-hypervisor/src/vl1/path.rs +++ b/zerotier-network-hypervisor/src/vl1/path.rs @@ -7,32 +7,24 @@ */ use std::collections::HashMap; -use std::hash::Hasher; +use std::hash::{Hash, Hasher}; use std::num::NonZeroI64; use std::sync::atomic::{AtomicI64, Ordering}; -use std::sync::Arc; use lazy_static::lazy_static; +use metrohash::MetroHash128; use parking_lot::Mutex; -use zerotier_core_crypto::hash::SHA512_HASH_SIZE; +use zerotier_core_crypto::random; use crate::util::*; use crate::vl1::fragmentedpacket::FragmentedPacket; use crate::vl1::node::*; use crate::vl1::protocol::*; -use crate::vl1::Endpoint; +use crate::vl1::{endpoint, Endpoint}; use crate::PacketBuffer; -// A bunch of random values used to randomize the local_lookup_key() function's mappings of addresses to 128-bit internal keys. lazy_static! { - static ref RANDOM_64BIT_SALT_0: u64 = zerotier_core_crypto::random::next_u64_secure(); - static ref RANDOM_64BIT_SALT_1: u64 = zerotier_core_crypto::random::next_u64_secure(); - static ref RANDOM_64BIT_SALT_2: u64 = zerotier_core_crypto::random::next_u64_secure(); - static ref RANDOM_128BIT_SALT_0: u128 = (zerotier_core_crypto::random::next_u64_secure().wrapping_shl(64) as u128) ^ (zerotier_core_crypto::random::next_u64_secure() as u128); - static ref RANDOM_128BIT_SALT_1: u128 = (zerotier_core_crypto::random::next_u64_secure().wrapping_shl(64) as u128) ^ (zerotier_core_crypto::random::next_u64_secure() as u128); - static ref RANDOM_128BIT_SALT_2: u128 = (zerotier_core_crypto::random::next_u64_secure().wrapping_shl(64) as u128) ^ (zerotier_core_crypto::random::next_u64_secure() as u128); - static ref RANDOM_128BIT_SALT_3: u128 = (zerotier_core_crypto::random::next_u64_secure().wrapping_shl(64) as u128) ^ (zerotier_core_crypto::random::next_u64_secure() as u128); - static ref RANDOM_128BIT_SALT_4: u128 = (zerotier_core_crypto::random::next_u64_secure().wrapping_shl(64) as u128) ^ (zerotier_core_crypto::random::next_u64_secure() as u128); + static ref METROHASH_SEED: u64 = random::next_u64_secure(); } /// A remote endpoint paired with a local socket and a local interface. @@ -40,9 +32,9 @@ lazy_static! { /// one and only one unique path object. That enables statistics to be tracked /// for them and uniform application of things like keepalives. pub struct Path { - endpoint: Mutex>, - pub(crate) local_socket: Option, - pub(crate) local_interface: Option, + pub endpoint: Endpoint, + pub local_socket: Option, + pub local_interface: Option, last_send_time_ticks: AtomicI64, last_receive_time_ticks: AtomicI64, fragmented_packets: Mutex>, @@ -50,41 +42,60 @@ pub struct Path { impl Path { /// Get a 128-bit key to look up this endpoint in the local node path map. - #[inline(always)] pub(crate) fn local_lookup_key(endpoint: &Endpoint, local_socket: Option, local_interface: Option) -> u128 { - let local_socket = local_socket.map_or(0, |s| crate::util::hash64_noncrypt(*RANDOM_64BIT_SALT_0 + s.get() as u64)); - let local_interface = local_interface.map_or(0, |s| crate::util::hash64_noncrypt(*RANDOM_64BIT_SALT_1 + s.get() as u64)); - let lsi = (local_socket as u128).wrapping_shl(64) | (local_interface as u128); + let mut h = MetroHash128::with_seed(*METROHASH_SEED); + h.write_u64(local_socket.map_or(0, |s| s.get() as u64)); + h.write_u64(local_interface.map_or(0, |s| s.get() as u64)); match endpoint { - Endpoint::Nil => 0, - Endpoint::ZeroTier(_, h) => u128::from_ne_bytes(*byte_array_range::(h)), - Endpoint::Ethernet(m) => RANDOM_128BIT_SALT_0.wrapping_add(lsi as u128).wrapping_add(m.to_u64() as u128), - Endpoint::WifiDirect(m) => RANDOM_128BIT_SALT_1.wrapping_add(lsi as u128).wrapping_add(m.to_u64() as u128), - Endpoint::Bluetooth(m) => RANDOM_128BIT_SALT_2.wrapping_add(lsi as u128).wrapping_add(m.to_u64() as u128), - Endpoint::Ip(ip) => ip.ip_as_native_u128().wrapping_sub(lsi), // naked IP has no port - Endpoint::IpUdp(ip) => ip.ip_as_native_u128().wrapping_add(lsi), // UDP maintains one path per IP but merely learns the most recent port - Endpoint::IpTcp(ip) => ip.ip_as_native_u128().wrapping_sub(crate::util::hash64_noncrypt((ip.port() as u64).wrapping_add(*RANDOM_64BIT_SALT_2)) as u128).wrapping_sub(lsi), + Endpoint::Nil => h.write_u8(endpoint::TYPE_NIL), + Endpoint::ZeroTier(_, fingerprint) => { + h.write_u8(endpoint::TYPE_ZEROTIER); + h.write(fingerprint); + } + Endpoint::Ethernet(m) => { + h.write_u8(endpoint::TYPE_ETHERNET); + h.write_u64(m.to_u64()); + } + Endpoint::WifiDirect(m) => { + h.write_u8(endpoint::TYPE_WIFIDIRECT); + h.write_u64(m.to_u64()); + } + Endpoint::Bluetooth(m) => { + h.write_u8(endpoint::TYPE_BLUETOOTH); + h.write_u64(m.to_u64()); + } + Endpoint::Ip(ip) => { + h.write_u8(endpoint::TYPE_IP); + h.write(ip.ip_bytes()); + } + Endpoint::IpUdp(ip) => { + h.write_u8(endpoint::TYPE_IPUDP); + ip.hash(&mut h); + } + Endpoint::IpTcp(ip) => { + h.write_u8(endpoint::TYPE_IPTCP); + ip.hash(&mut h); + } Endpoint::Http(s) => { - let mut hh = std::collections::hash_map::DefaultHasher::new(); - hh.write_u64(local_socket); - hh.write_u64(local_interface); - hh.write(s.as_bytes()); - RANDOM_128BIT_SALT_3.wrapping_add(hh.finish() as u128) + h.write_u8(endpoint::TYPE_HTTP); + h.write(s.as_bytes()); } Endpoint::WebRTC(b) => { - let mut hh = std::collections::hash_map::DefaultHasher::new(); - hh.write_u64(local_socket); - hh.write_u64(local_interface); - hh.write(b.as_slice()); - RANDOM_128BIT_SALT_4.wrapping_add(hh.finish() as u128) + h.write_u8(endpoint::TYPE_WEBRTC); + h.write(b.as_slice()); + } + Endpoint::ZeroTierEncap(_, fingerprint) => { + h.write_u8(endpoint::TYPE_ZEROTIER_ENCAP); + h.write(fingerprint); } - Endpoint::ZeroTierEncap(_, h) => u128::from_ne_bytes(*byte_array_range::(h)), } + assert_eq!(std::mem::size_of::<(u64, u64)>(), std::mem::size_of::()); + unsafe { std::mem::transmute(h.finish128()) } } pub fn new(endpoint: Endpoint, local_socket: Option, local_interface: Option) -> Self { Self { - endpoint: Mutex::new(Arc::new(endpoint)), + endpoint, local_socket, local_interface, last_send_time_ticks: AtomicI64::new(0), @@ -93,11 +104,6 @@ impl Path { } } - #[inline(always)] - pub fn endpoint(&self) -> Arc { - self.endpoint.lock().clone() - } - /// Receive a fragment and return a FragmentedPacket if the entire packet was assembled. /// This returns None if more fragments are needed to assemble the packet. pub(crate) fn receive_fragment(&self, packet_id: u64, fragment_no: u8, fragment_expecting_count: u8, packet: PacketBuffer, time_ticks: i64) -> Option { @@ -131,30 +137,6 @@ impl Path { self.last_receive_time_ticks.store(time_ticks, Ordering::Relaxed); } - /// Called when a real packet is received and passes authentication checks. - pub(crate) fn log_receive_authenticated_packet(&self, _bytes: usize, source_endpoint: &Endpoint) { - match source_endpoint { - Endpoint::IpUdp(ip) => { - // If an IPv4 UDP remote IP is the same but the port changes, learn the new port by replacing the - // endpoint with the new one. This is because IPv4 NATs will occasionally remap IPs at random. - if ip.is_ipv4() { - let mut ep = self.endpoint.lock(); - match ep.as_ref() { - Endpoint::IpUdp(ip_orig) => { - // These should always be equal because this path would have been looked up by IP, but sanity check in debug. - debug_assert_eq!(ip_orig.ip_bytes(), ip.ip_bytes()); - if ip_orig.port() != ip.port() { - (*ep) = Arc::new(source_endpoint.clone()); - } - } - _ => {} - } - } - } - _ => {} - } - } - #[inline(always)] pub(crate) fn log_send_anything(&self, time_ticks: i64) { self.last_send_time_ticks.store(time_ticks, Ordering::Relaxed); @@ -164,9 +146,12 @@ impl Path { impl BackgroundServicable for Path { const SERVICE_INTERVAL_MS: i64 = PATH_KEEPALIVE_INTERVAL; - fn service(&self, si: &SI, node: &Node, time_ticks: i64) -> bool { + fn service(&self, si: &SI, _: &Node, time_ticks: i64) -> bool { self.fragmented_packets.lock().retain(|_, frag| (time_ticks - frag.ts_ticks) < PACKET_FRAGMENT_EXPIRATION); - // TODO: keepalives + if (time_ticks - self.last_send_time_ticks.load(Ordering::Relaxed)) >= PATH_KEEPALIVE_INTERVAL { + self.last_send_time_ticks.store(time_ticks, Ordering::Relaxed); + si.wire_send(&self.endpoint, self.local_socket, self.local_interface, &[&ZEROES[..1]], 0); + } true } } diff --git a/zerotier-network-hypervisor/src/vl1/peer.rs b/zerotier-network-hypervisor/src/vl1/peer.rs index cb928b04d..38daa7b7d 100644 --- a/zerotier-network-hypervisor/src/vl1/peer.rs +++ b/zerotier-network-hypervisor/src/vl1/peer.rs @@ -16,7 +16,6 @@ use parking_lot::Mutex; use zerotier_core_crypto::aes_gmac_siv::AesCtr; use zerotier_core_crypto::hash::*; -use zerotier_core_crypto::kbkdf::zt_kbkdf_hmac_sha384; use zerotier_core_crypto::poly1305::Poly1305; use zerotier_core_crypto::random::{get_bytes_secure, next_u64_secure}; use zerotier_core_crypto::salsa::Salsa; @@ -24,6 +23,7 @@ use zerotier_core_crypto::secret::Secret; use crate::util::buffer::Buffer; use crate::util::byte_array_range; +use crate::util::marshalable::Marshalable; use crate::vl1::hybridkey::HybridKeyPair; use crate::vl1::identity::{IDENTITY_ALGORITHM_ALL, IDENTITY_ALGORITHM_X25519}; use crate::vl1::node::*; @@ -37,7 +37,7 @@ use crate::{PacketBuffer, VERSION_MAJOR, VERSION_MINOR, VERSION_PROTO, VERSION_R /// send/receive is not uncommon. pub struct Peer { // This peer's identity. - identity: Identity, + pub(crate) identity: Identity, // Static shared secret computed from agreement with identity. identity_symmetric_key: SymmetricSecret, @@ -58,8 +58,9 @@ pub struct Peer { reported_local_ip: Mutex>, // Statistics and times of events. - last_send_time_ticks: AtomicI64, - last_receive_time_ticks: AtomicI64, + pub(crate) last_send_time_ticks: AtomicI64, + pub(crate) last_receive_time_ticks: AtomicI64, + pub(crate) last_hello_reply_time_ticks: AtomicI64, last_forward_time_ticks: AtomicI64, total_bytes_sent: AtomicU64, total_bytes_sent_indirect: AtomicU64, @@ -198,6 +199,7 @@ impl Peer { reported_local_ip: Mutex::new(None), last_send_time_ticks: AtomicI64::new(0), last_receive_time_ticks: AtomicI64::new(0), + last_hello_reply_time_ticks: AtomicI64::new(0), last_forward_time_ticks: AtomicI64::new(0), total_bytes_sent: AtomicU64::new(0), total_bytes_sent_indirect: AtomicU64::new(0), @@ -221,18 +223,7 @@ impl Peer { /// /// If the packet comes in multiple fragments, the fragments slice should contain all /// those fragments after the main packet header and first chunk. - pub(crate) fn receive( - &self, - node: &Node, - si: &SI, - vi: &VI, - time_ticks: i64, - source_endpoint: &Endpoint, - source_path: &Arc, - header: &PacketHeader, - frag0: &Buffer<{ PACKET_SIZE_MAX }>, - fragments: &[Option], - ) { + pub(crate) fn receive(&self, node: &Node, si: &SI, vi: &VI, time_ticks: i64, source_path: &Arc, header: &PacketHeader, frag0: &Buffer<{ PACKET_SIZE_MAX }>, fragments: &[Option]) { let _ = frag0.as_bytes_starting_at(PACKET_VERB_INDEX).map(|packet_frag0_payload_bytes| { let mut payload: Buffer = unsafe { Buffer::new_without_memzero() }; @@ -266,7 +257,6 @@ impl Peer { self.last_receive_time_ticks.store(time_ticks, Ordering::Relaxed); self.total_bytes_received.fetch_add((payload.len() + PACKET_HEADER_SIZE) as u64, Ordering::Relaxed); - source_path.log_receive_authenticated_packet(payload.len() + PACKET_HEADER_SIZE, source_endpoint); let mut verb = payload.as_bytes()[0]; @@ -386,7 +376,7 @@ impl Peer { /// via a root or some other route. pub(crate) fn send(&self, si: &SI, node: &Node, time_ticks: i64, packet: &Buffer<{ PACKET_SIZE_MAX }>) -> bool { self.path(node).map_or(false, |path| { - if self.send_to_endpoint(si, path.endpoint().as_ref(), path.local_socket, path.local_interface, packet) { + if self.send_to_endpoint(si, &path.endpoint, path.local_socket, path.local_interface, packet) { self.last_send_time_ticks.store(time_ticks, Ordering::Relaxed); self.total_bytes_sent.fetch_add(packet.len() as u64, Ordering::Relaxed); true @@ -405,7 +395,7 @@ impl Peer { /// Intermediates don't need to adjust fragmentation. pub(crate) fn forward(&self, si: &SI, time_ticks: i64, packet: &Buffer<{ PACKET_SIZE_MAX }>) -> bool { self.direct_path().map_or(false, |path| { - if si.wire_send(path.endpoint().as_ref(), path.local_socket, path.local_interface, &[packet.as_bytes()], 0) { + if si.wire_send(&path.endpoint, path.local_socket, path.local_interface, &[packet.as_bytes()], 0) { self.last_forward_time_ticks.store(time_ticks, Ordering::Relaxed); self.total_bytes_forwarded.fetch_add(packet.len() as u64, Ordering::Relaxed); true @@ -428,7 +418,7 @@ impl Peer { || { self.path(node).map_or(None, |p| { path = Some(p.clone()); - Some(p.endpoint().as_ref().clone()) + Some(p.endpoint.clone()) }) }, |endpoint| Some(endpoint.clone()), @@ -438,11 +428,10 @@ impl Peer { } let destination = destination.unwrap(); - let mut packet: Buffer<{ PACKET_SIZE_MAX }> = Buffer::new(); + let mut packet: Buffer = unsafe { Buffer::new_without_memzero() }; let time_ticks = si.time_ticks(); - - // Create packet headers and the first fixed-size fields in HELLO. let message_id = self.next_message_id(); + { let packet_header: &mut PacketHeader = packet.append_struct_get_mut().unwrap(); packet_header.id = message_id.to_ne_bytes(); // packet ID and message ID are the same when Poly1305 MAC is used @@ -450,27 +439,21 @@ impl Peer { packet_header.src = node.identity.address.to_bytes(); packet_header.flags_cipher_hops = CIPHER_NOCRYPT_POLY1305; } + { let hello_fixed_headers: &mut message_component_structs::HelloFixedHeaderFields = packet.append_struct_get_mut().unwrap(); hello_fixed_headers.verb = VERB_VL1_HELLO | VERB_FLAG_EXTENDED_AUTHENTICATION; - - // Protocol version so remote can do version-dependent things. hello_fixed_headers.version_proto = VERSION_PROTO; - - // Software version (if this is the "official" ZeroTier implementation). hello_fixed_headers.version_major = VERSION_MAJOR; hello_fixed_headers.version_minor = VERSION_MINOR; hello_fixed_headers.version_revision = (VERSION_REVISION as u16).to_be_bytes(); - - // Timestamp for purposes of latency determination (not wall clock). hello_fixed_headers.timestamp = (time_ticks as u64).to_be_bytes(); } - // Add this node's identity. assert!(self.identity.marshal_with_options(&mut packet, IDENTITY_ALGORITHM_ALL, false).is_ok()); if self.identity.algorithms() == IDENTITY_ALGORITHM_X25519 { // LEGACY: append an extra zero when marshaling identities containing only x25519 keys. - // See comments in Identity::marshal(). + // See comments in Identity::marshal(). This can go away eventually. assert!(packet.append_u8(0).is_ok()); } @@ -495,29 +478,25 @@ impl Peer { let mut fields = Dictionary::new(); fields.set_u64(SESSION_METADATA_INSTANCE_ID, node.instance_id); fields.set_u64(SESSION_METADATA_CLOCK, si.time_clock() as u64); - fields.set_bytes(SESSION_METADATA_SENT_TO, destination.to_bytes()); - let ephemeral_secret = self.ephemeral_symmetric_key.lock(); - let _ = ephemeral_secret.as_ref().map(|s| fields.set_bytes(SESSION_METADATA_EPHEMERAL_CURRENT_SYMMETRIC_KEY_ID, s.id.to_vec())); - drop(ephemeral_secret); // release lock - let ephemeral_offer = self.ephemeral_offer.lock(); - let _ = ephemeral_offer.as_ref().map(|p| fields.set_bytes(SESSION_METADATA_EPHEMERAL_PUBLIC_OFFER, p.0.public_bytes())); - drop(ephemeral_offer); // release lock + fields.set_bytes(SESSION_METADATA_SENT_TO, destination.to_buffer::<{ Endpoint::MAX_MARSHAL_SIZE }>().unwrap().as_bytes().to_vec()); let fields = fields.to_bytes(); assert!(fields.len() <= 0xffff); // sanity check, should be impossible assert!(packet.append_u16(fields.len() as u16).is_ok()); // prefix with unencrypted size let private_section_start = packet.len(); assert!(packet.append_bytes(fields.as_slice()).is_ok()); - let mut aes = AesCtr::new(&zt_kbkdf_hmac_sha384(&self.identity_symmetric_key.key.as_bytes()[0..48], KBKDF_KEY_USAGE_LABEL_HELLO_PRIVATE_SECTION, 0, 0).as_bytes()[0..32]); + let mut aes = AesCtr::new(&self.identity_symmetric_key.hello_private_section_key.as_bytes()[0..32]); aes.init(&nonce); aes.crypt_in_place(&mut packet.as_mut()[private_section_start..]); + drop(aes); + drop(fields); - // Add extended HMAC-SHA512 authentication. + // Add extended authentication at end of packet. let mut hmac = HMACSHA512::new(self.identity_symmetric_key.packet_hmac_key.as_bytes()); hmac.update(&message_id.to_ne_bytes()); hmac.update(&packet.as_bytes()[PACKET_HEADER_SIZE..]); assert!(packet.append_bytes_fixed(&hmac.finish()).is_ok()); - // Set legacy poly1305 MAC in packet header. Newer nodes check HMAC-SHA512 but older ones only use this. + // Set legacy poly1305 MAC in packet header. Newer nodes also check HMAC-SHA512 but older ones only use this. let (_, mut poly) = salsa_poly_create(&self.identity_symmetric_key, packet.struct_at::(0).unwrap(), packet.len()); poly.update(packet.as_bytes_starting_at(PACKET_HEADER_SIZE).unwrap()); packet.as_mut()[HEADER_MAC_FIELD_INDEX..HEADER_MAC_FIELD_INDEX + 8].copy_from_slice(&poly.finish()[0..8]); @@ -565,7 +544,10 @@ impl Peer { let current_packet_id_counter = self.message_id_counter.load(Ordering::Relaxed); if current_packet_id_counter.wrapping_sub(in_re_message_id) <= PACKET_RESPONSE_COUNTER_DELTA_MAX { match ok_header.in_re_verb { - VERB_VL1_HELLO => {} + VERB_VL1_HELLO => { + // TODO + self.last_hello_reply_time_ticks.store(time_ticks, Ordering::Relaxed); + } VERB_VL1_WHOIS => {} _ => { ph.handle_ok(self, source_path, forward_secrecy, extended_authentication, ok_header.in_re_verb, in_re_message_id, payload, &mut cursor); diff --git a/zerotier-network-hypervisor/src/vl1/protocol.rs b/zerotier-network-hypervisor/src/vl1/protocol.rs index 168125bbd..bd716ab48 100644 --- a/zerotier-network-hypervisor/src/vl1/protocol.rs +++ b/zerotier-network-hypervisor/src/vl1/protocol.rs @@ -183,6 +183,9 @@ pub const WHOIS_MAX_WAITING_PACKETS: usize = 64; /// Keepalive interval for paths in milliseconds. pub const PATH_KEEPALIVE_INTERVAL: i64 = 20000; +/// How often to send HELLOs to roots, which is more often than normal peers. +pub const ROOT_HELLO_INTERVAL: i64 = PATH_KEEPALIVE_INTERVAL * 2; + /// Proof of work difficulty (threshold) for identity generation. pub const IDENTITY_POW_THRESHOLD: u8 = 17; diff --git a/zerotier-network-hypervisor/src/vl1/rootcluster.rs b/zerotier-network-hypervisor/src/vl1/rootcluster.rs index b51fb3e40..cabc90be3 100644 --- a/zerotier-network-hypervisor/src/vl1/rootcluster.rs +++ b/zerotier-network-hypervisor/src/vl1/rootcluster.rs @@ -21,8 +21,8 @@ pub struct Root { /// Full identity of this node. pub identity: Identity, - /// Addresses of this root or None if this is a former member attesting to an update that removes it. - pub addresses: Option>, + /// Endpoints for this root or None if this is a former member attesting to an update that removes it. + pub endpoints: Option>, /// Signature of entire cluster by this identity. pub cluster_signature: Vec, @@ -80,10 +80,10 @@ impl RootCluster { buf.append_varint(self.members.len() as u64)?; for m in self.members.iter() { m.identity.marshal_with_options(buf, IDENTITY_ALGORITHM_ALL, false)?; - if m.addresses.is_some() { - let addresses = m.addresses.as_ref().unwrap(); - buf.append_varint(addresses.len() as u64)?; - for a in addresses.iter() { + if m.endpoints.is_some() { + let endpoints = m.endpoints.as_ref().unwrap(); + buf.append_varint(endpoints.len() as u64)?; + for a in endpoints.iter() { a.marshal(buf)?; } } else { @@ -124,13 +124,13 @@ impl RootCluster { } /// Add a member to this definition, replacing any current entry for this identity. - pub fn add<'a, I: Iterator>(&mut self, member_identity: &Identity, addresses: Option) { + pub fn add<'a, I: Iterator>(&mut self, member_identity: &Identity, endpoints: Option) { self.members.retain(|m| !m.identity.eq(member_identity)); let _ = self.members.push(Root { identity: member_identity.clone_without_secret(), - addresses: addresses.map(|addresses| { + endpoints: endpoints.map(|endpoints| { let mut tmp = BTreeSet::new(); - for a in addresses { + for a in endpoints { tmp.insert(a.clone()); } tmp @@ -156,7 +156,7 @@ impl RootCluster { self.members.retain(|m| !m.identity.eq(member_identity)); let _ = self.members.push(Root { identity: unsigned_entry.identity, - addresses: unsigned_entry.addresses, + endpoints: unsigned_entry.endpoints, cluster_signature: signature.unwrap(), flags: unsigned_entry.flags, }); @@ -195,7 +195,7 @@ impl RootCluster { let mut previous_count: isize = 0; let mut witness_count: isize = 0; for m in previous.members.iter() { - if m.addresses.is_some() { + if m.endpoints.is_some() { previous_count += 1; witness_count += my_signers.contains(&m.identity.fingerprint) as isize; } @@ -231,18 +231,18 @@ impl Marshalable for RootCluster { for _ in 0..member_count { let mut m = Root { identity: Identity::unmarshal(buf, cursor)?, - addresses: None, + endpoints: None, cluster_signature: Vec::new(), flags: 0, }; - let address_count = buf.read_varint(cursor)?; - if address_count > 0 { - let mut addresses = BTreeSet::new(); - for _ in 0..address_count { - addresses.insert(Endpoint::unmarshal(buf, cursor)?); + let endpoint_count = buf.read_varint(cursor)?; + if endpoint_count > 0 { + let mut endpoints = BTreeSet::new(); + for _ in 0..endpoint_count { + endpoints.insert(Endpoint::unmarshal(buf, cursor)?); } - let _ = m.addresses.insert(addresses); + let _ = m.endpoints.insert(endpoints); } let signature_size = buf.read_varint(cursor)?; diff --git a/zerotier-network-hypervisor/src/vl1/symmetricsecret.rs b/zerotier-network-hypervisor/src/vl1/symmetricsecret.rs index adfa6d185..57ef06dfc 100644 --- a/zerotier-network-hypervisor/src/vl1/symmetricsecret.rs +++ b/zerotier-network-hypervisor/src/vl1/symmetricsecret.rs @@ -37,6 +37,9 @@ pub(crate) struct SymmetricSecret { /// Master key from which other keys are derived. pub key: Secret<64>, + /// Key for private fields in HELLO packets. + pub hello_private_section_key: Secret<48>, + /// Key used for HMAC extended validation on packets like HELLO. pub packet_hmac_key: Secret<64>, @@ -59,11 +62,13 @@ impl Eq for SymmetricSecret {} impl SymmetricSecret { /// Create a new symmetric secret, deriving all sub-keys and such. pub fn new(key: Secret<64>) -> SymmetricSecret { + let hello_private_section_key = zt_kbkdf_hmac_sha384(&key.0, KBKDF_KEY_USAGE_LABEL_HELLO_PRIVATE_SECTION, 0, 0); let packet_hmac_key = zt_kbkdf_hmac_sha512(&key.0, KBKDF_KEY_USAGE_LABEL_PACKET_HMAC, 0, 0); let ephemeral_ratchet_key = zt_kbkdf_hmac_sha512(&key.0, KBKDF_KEY_USAGE_LABEL_EPHEMERAL_RATCHET_KEY, 0, 0); let aes_factory = AesGmacSivPoolFactory(zt_kbkdf_hmac_sha384(&key.0[0..48], KBKDF_KEY_USAGE_LABEL_AES_GMAC_SIV_K0, 0, 0).first_n(), zt_kbkdf_hmac_sha384(&key.0[0..48], KBKDF_KEY_USAGE_LABEL_AES_GMAC_SIV_K1, 0, 0).first_n()); SymmetricSecret { key, + hello_private_section_key, packet_hmac_key, ephemeral_ratchet_key, aes_gmac_siv: Pool::new(2, aes_factory), diff --git a/zerotier-system-service/Cargo.lock b/zerotier-system-service/Cargo.lock index f1599c87a..afa482f27 100644 --- a/zerotier-system-service/Cargo.lock +++ b/zerotier-system-service/Cargo.lock @@ -313,6 +313,12 @@ version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a" +[[package]] +name = "metrohash" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ba553cb19e2acbc54baa16faef215126243fe45e53357a3b2e9f4ebc7b0506c" + [[package]] name = "nb" version = "0.1.3" @@ -873,6 +879,7 @@ dependencies = [ "lazy_static", "libc", "lz4_flex", + "metrohash", "parking_lot", "serde", "winapi",