diff --git a/zerotier-network-hypervisor/src/vl1/peer.rs b/zerotier-network-hypervisor/src/vl1/peer.rs
index 6bf8e5d0d..2179c7487 100644
--- a/zerotier-network-hypervisor/src/vl1/peer.rs
+++ b/zerotier-network-hypervisor/src/vl1/peer.rs
@@ -55,6 +55,8 @@ pub struct Peer {
     pub(crate) last_hello_reply_time_ticks: AtomicI64,
     last_forward_time_ticks: AtomicI64,
     create_time_ticks: i64,
+
+    // A random offset added to timestamps sent with HELLO to measure latency without advertising the actual tick counter.
     random_ticks_offset: u64,
 
     // Counter for assigning sequential message IDs.
@@ -148,6 +150,11 @@ fn try_aead_decrypt(secret: &SymmetricSecret, packet_frag0_payload_bytes: &[u8],
     }
 }
 
+/// Sort a list of paths by quality or priority, with best paths first.
+fn prioritize_paths<SI: SystemInterface>(paths: &mut Vec<PeerPath<SI>>) {
+    paths.sort_unstable_by(|a, b| a.last_receive_time_ticks.cmp(&b.last_receive_time_ticks).reverse());
+}
+
 impl<SI: SystemInterface> Peer<SI> {
     /// Create a new peer.
     ///
@@ -212,7 +219,7 @@ impl Peer {
         return None;
     }
 
-    /// Get either the current best direct path or an indirect path.
+    /// Get either the current best direct path or an indirect path, e.g. via a root.
    pub fn path(&self, node: &Node<SI>) -> Option<Arc<Path<SI>>> {
         let direct_path = self.direct_path();
         if direct_path.is_some() {
@@ -224,20 +231,18 @@ impl Peer {
         return None;
     }
 
-    /// Sort a list of paths by quality or priority, with best paths first.
-    fn prioritize_paths(paths: &mut Vec<PeerPath<SI>>) {
-        paths.sort_unstable_by(|a, b| a.last_receive_time_ticks.cmp(&b.last_receive_time_ticks).reverse());
-    }
-
     pub(crate) fn learn_path(&self, si: &SI, new_path: &Arc<Path<SI>>, time_ticks: i64) {
         let mut paths = self.paths.lock();
 
-        // If this is an IpUdp endpoint, scan the existing paths and replace any that come from
-        // the same IP address but a different port. This prevents the accumulation of duplicate
-        // paths to the same peer over different ports.
         match &new_path.endpoint {
             Endpoint::IpUdp(new_ip) => {
+                // If this is an IpUdp endpoint, scan the existing paths and replace any that come from
+                // the same IP address but a different port. This prevents the accumulation of duplicate
+                // paths to the same peer over different ports.
                 for pi in paths.iter_mut() {
+                    if pi.canonical_instance_id == new_path.canonical.canonical_instance_id() {
+                        return;
+                    }
                     if let Some(p) = pi.path.upgrade() {
                         match &p.endpoint {
                             Endpoint::IpUdp(existing_ip) => {
@@ -246,7 +251,7 @@ impl Peer {
                                     pi.path = Arc::downgrade(new_path);
                                     pi.canonical_instance_id = new_path.canonical.canonical_instance_id();
                                     pi.last_receive_time_ticks = time_ticks;
-                                    Self::prioritize_paths(&mut paths);
+                                    prioritize_paths(&mut paths);
                                     return;
                                 }
                             }
@@ -255,17 +260,23 @@ impl Peer {
                    }
                }
            }
-            _ => {}
+            _ => {
+                for pi in paths.iter() {
+                    if pi.canonical_instance_id == new_path.canonical.canonical_instance_id() {
+                        return;
+                    }
+                }
+            }
         }
 
-        // Otherwise learn new path.
+        // Learn this as a new path, since it is not a duplicate and did not replace an existing path above.
         debug_event!(si, "[vl1] {} learned new path: {}", self.identity.address.to_string(), new_path.endpoint.to_string());
         paths.push(PeerPath::<SI> {
             path: Arc::downgrade(new_path),
             canonical_instance_id: new_path.canonical.canonical_instance_id(),
             last_receive_time_ticks: time_ticks,
         });
-        Self::prioritize_paths(&mut paths);
+        prioritize_paths(&mut paths);
     }
 
     /// Called every SERVICE_INTERVAL_MS by the background service loop in Node.
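Note: the relocated `prioritize_paths` helper keeps the most recently heard-from path at the front of the list. Below is a minimal, self-contained sketch of that ordering; `DemoPath`, `prioritize_demo_paths`, and the `main` harness are illustrative stand-ins and not code from this diff.

```rust
// Illustrative stand-in for PeerPath; only the field used by the sort is modeled.
struct DemoPath {
    last_receive_time_ticks: i64,
}

// Same comparator shape as prioritize_paths(): compare ascending, then reverse,
// which is equivalent to sorting by last_receive_time_ticks descending.
fn prioritize_demo_paths(paths: &mut Vec<DemoPath>) {
    paths.sort_unstable_by(|a, b| a.last_receive_time_ticks.cmp(&b.last_receive_time_ticks).reverse());
}

fn main() {
    let mut paths = vec![
        DemoPath { last_receive_time_ticks: 100 },
        DemoPath { last_receive_time_ticks: 300 },
        DemoPath { last_receive_time_ticks: 200 },
    ];
    prioritize_demo_paths(&mut paths);
    let order: Vec<i64> = paths.iter().map(|p| p.last_receive_time_ticks).collect();
    assert_eq!(order, vec![300, 200, 100]); // most recently heard-from path sorts first
}
```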
@@ -273,118 +284,11 @@ impl Peer {
         {
             let mut paths = self.paths.lock();
             paths.retain(|p| ((time_ticks - p.last_receive_time_ticks) < PEER_EXPIRATION_TIME) && (p.path.strong_count() > 0));
-            Self::prioritize_paths(&mut paths);
+            prioritize_paths(&mut paths);
         }
         (time_ticks - self.last_receive_time_ticks.load(Ordering::Relaxed).max(self.create_time_ticks)) < PEER_EXPIRATION_TIME
     }
 
-    /// Receive, decrypt, authenticate, and process an incoming packet from this peer.
-    ///
-    /// If the packet comes in multiple fragments, the fragments slice should contain all
-    /// those fragments after the main packet header and first chunk.
-    ///
-    /// This returns true if the packet decrypted and passed authentication.
-    pub(crate) async fn receive(&self, node: &Node<SI>, si: &SI, ph: &PH, time_ticks: i64, source_path: &Arc<Path<SI>>, packet_header: &PacketHeader, frag0: &PacketBuffer, fragments: &[Option]) {
-        if let Ok(packet_frag0_payload_bytes) = frag0.as_bytes_starting_at(packet_constants::VERB_INDEX) {
-            //let mut payload = unsafe { PacketBuffer::new_without_memzero() };
-            let mut payload = PacketBuffer::new();
-
-            // First try decrypting and authenticating with an ephemeral secret if one is negotiated.
-            let (forward_secrecy, mut message_id) = if let Some(ephemeral_secret) = self.ephemeral_symmetric_key.read().as_ref() {
-                if let Some(message_id) = try_aead_decrypt(&ephemeral_secret.secret, packet_frag0_payload_bytes, packet_header, fragments, &mut payload) {
-                    // Decryption successful with ephemeral secret
-                    (true, message_id)
-                } else {
-                    // Decryption failed with ephemeral secret, which may indicate that it's obsolete.
-                    (false, 0)
-                }
-            } else {
-                // There is no ephemeral secret negotiated (yet?).
-                (false, 0)
-            };
-
-            // If forward_secrecy is false it means the ephemeral key failed. Try decrypting with the permanent key.
-            if !forward_secrecy {
-                payload.clear();
-                if let Some(message_id2) = try_aead_decrypt(&self.identity_symmetric_key, packet_frag0_payload_bytes, packet_header, fragments, &mut payload) {
-                    // Decryption successful with static secret.
-                    message_id = message_id2;
-                } else {
-                    // Packet failed to decrypt using either ephemeral or permament key, reject.
-                    debug_event!(si, "[vl1] #{:0>16x} failed authentication", u64::from_be_bytes(packet_header.id));
-                    return;
-                }
-            }
-
-            if let Ok(mut verb) = payload.u8_at(0) {
-                let extended_authentication = (verb & packet_constants::VERB_FLAG_EXTENDED_AUTHENTICATION) != 0;
-                if extended_authentication {
-                    if payload.len() >= SHA384_HASH_SIZE {
-                        let actual_end_of_payload = payload.len() - SHA384_HASH_SIZE;
-                        let mut hmac = HMACSHA384::new(self.identity_symmetric_key.packet_hmac_key.as_bytes());
-                        hmac.update(&node.identity.fingerprint);
-                        hmac.update(&self.identity.fingerprint);
-                        hmac.update(&message_id.to_ne_bytes());
-                        hmac.update(&payload.as_bytes()[..actual_end_of_payload]);
-                        if !hmac.finish().eq(&payload.as_bytes()[actual_end_of_payload..]) {
-                            return;
-                        }
-                        payload.set_size(actual_end_of_payload);
-                    } else {
-                        return;
-                    }
-                }
-
-                // ---------------------------------------------------------------
-                // If we made it here it decrypted and passed authentication.
-                // ---------------------------------------------------------------
-
-                debug_event!(si, "[vl1] #{:0>16x} decrypted and authenticated, verb: {} ({:0>2x})", u64::from_be_bytes(packet_header.id), verbs::name(verb & packet_constants::VERB_MASK), (verb & packet_constants::VERB_MASK) as u32);
-
-                if (verb & packet_constants::VERB_FLAG_COMPRESSED) != 0 {
-                    let mut decompressed_payload: [u8; packet_constants::SIZE_MAX] = unsafe { MaybeUninit::uninit().assume_init() };
-                    decompressed_payload[0] = verb;
-                    if let Ok(dlen) = lz4_flex::block::decompress_into(&payload.as_bytes()[1..], &mut decompressed_payload[1..]) {
-                        payload.set_to(&decompressed_payload[..(dlen + 1)]);
-                    } else {
-                        return;
-                    }
-                }
-
-                self.last_receive_time_ticks.store(time_ticks, Ordering::Relaxed);
-
-                let mut path_is_known = false;
-                for p in self.paths.lock().iter_mut() {
-                    if p.canonical_instance_id == source_path.canonical.canonical_instance_id() {
-                        p.last_receive_time_ticks = time_ticks;
-                        path_is_known = true;
-                        break;
-                    }
-                }
-
-                // For performance reasons we let VL2 handle packets first. It returns false
-                // if it didn't handle the packet, in which case it's handled at VL1. This is
-                // because the most performance critical path is the handling of the ???_FRAME
-                // verbs, which are in VL2.
-                verb &= packet_constants::VERB_MASK; // mask off flags
-                if !ph.handle_packet(self, &source_path, forward_secrecy, extended_authentication, verb, &payload).await {
-                    match verb {
-                        //VERB_VL1_NOP => {}
-                        verbs::VL1_HELLO => self.handle_incoming_hello(si, node, time_ticks, source_path, &payload).await,
-                        verbs::VL1_ERROR => self.handle_incoming_error(si, ph, node, time_ticks, source_path, forward_secrecy, extended_authentication, &payload).await,
-                        verbs::VL1_OK => self.handle_incoming_ok(si, ph, node, time_ticks, source_path, packet_header.hops(), path_is_known, forward_secrecy, extended_authentication, &payload).await,
-                        verbs::VL1_WHOIS => self.handle_incoming_whois(si, node, time_ticks, source_path, &payload).await,
-                        verbs::VL1_RENDEZVOUS => self.handle_incoming_rendezvous(si, node, time_ticks, source_path, &payload).await,
-                        verbs::VL1_ECHO => self.handle_incoming_echo(si, node, time_ticks, source_path, &payload).await,
-                        verbs::VL1_PUSH_DIRECT_PATHS => self.handle_incoming_push_direct_paths(si, node, time_ticks, source_path, &payload).await,
-                        verbs::VL1_USER_MESSAGE => self.handle_incoming_user_message(si, node, time_ticks, source_path, &payload).await,
-                        _ => {}
-                    }
-                }
-            }
-        }
-    }
-
     /// Send to an endpoint, fragmenting if needed.
     ///
     /// This does not set the fragmentation field in the packet header, MAC, or encrypt the packet. The sender
@@ -575,6 +479,113 @@ impl Peer {
         }
     }
 
+    /// Receive, decrypt, authenticate, and process an incoming packet from this peer.
+    ///
+    /// If the packet comes in multiple fragments, the fragments slice should contain all
+    /// those fragments after the main packet header and first chunk.
+    ///
+    /// This returns true if the packet decrypted and passed authentication.
+    pub(crate) async fn receive(&self, node: &Node<SI>, si: &SI, ph: &PH, time_ticks: i64, source_path: &Arc<Path<SI>>, packet_header: &PacketHeader, frag0: &PacketBuffer, fragments: &[Option]) {
+        if let Ok(packet_frag0_payload_bytes) = frag0.as_bytes_starting_at(packet_constants::VERB_INDEX) {
+            //let mut payload = unsafe { PacketBuffer::new_without_memzero() };
+            let mut payload = PacketBuffer::new();
+
+            // First try decrypting and authenticating with an ephemeral secret if one is negotiated.
+            let (forward_secrecy, mut message_id) = if let Some(ephemeral_secret) = self.ephemeral_symmetric_key.read().as_ref() {
+                if let Some(message_id) = try_aead_decrypt(&ephemeral_secret.secret, packet_frag0_payload_bytes, packet_header, fragments, &mut payload) {
+                    // Decryption successful with ephemeral secret
+                    (true, message_id)
+                } else {
+                    // Decryption failed with ephemeral secret, which may indicate that it's obsolete.
+                    (false, 0)
+                }
+            } else {
+                // There is no ephemeral secret negotiated (yet?).
+                (false, 0)
+            };
+
+            // If forward_secrecy is false it means the ephemeral key failed. Try decrypting with the permanent key.
+            if !forward_secrecy {
+                payload.clear();
+                if let Some(message_id2) = try_aead_decrypt(&self.identity_symmetric_key, packet_frag0_payload_bytes, packet_header, fragments, &mut payload) {
+                    // Decryption successful with static secret.
+                    message_id = message_id2;
+                } else {
+                    // Packet failed to decrypt using either ephemeral or permanent key, reject.
+                    debug_event!(si, "[vl1] #{:0>16x} failed authentication", u64::from_be_bytes(packet_header.id));
+                    return;
+                }
+            }
+
+            if let Ok(mut verb) = payload.u8_at(0) {
+                let extended_authentication = (verb & packet_constants::VERB_FLAG_EXTENDED_AUTHENTICATION) != 0;
+                if extended_authentication {
+                    if payload.len() >= SHA384_HASH_SIZE {
+                        let actual_end_of_payload = payload.len() - SHA384_HASH_SIZE;
+                        let mut hmac = HMACSHA384::new(self.identity_symmetric_key.packet_hmac_key.as_bytes());
+                        hmac.update(&node.identity.fingerprint);
+                        hmac.update(&self.identity.fingerprint);
+                        hmac.update(&message_id.to_ne_bytes());
+                        hmac.update(&payload.as_bytes()[..actual_end_of_payload]);
+                        if !hmac.finish().eq(&payload.as_bytes()[actual_end_of_payload..]) {
+                            return;
+                        }
+                        payload.set_size(actual_end_of_payload);
+                    } else {
+                        return;
+                    }
+                }
+
+                // ---------------------------------------------------------------
+                // If we made it here it decrypted and passed authentication.
+                // ---------------------------------------------------------------
+
+                debug_event!(si, "[vl1] #{:0>16x} decrypted and authenticated, verb: {} ({:0>2x})", u64::from_be_bytes(packet_header.id), verbs::name(verb & packet_constants::VERB_MASK), (verb & packet_constants::VERB_MASK) as u32);
+
+                if (verb & packet_constants::VERB_FLAG_COMPRESSED) != 0 {
+                    let mut decompressed_payload: [u8; packet_constants::SIZE_MAX] = unsafe { MaybeUninit::uninit().assume_init() };
+                    decompressed_payload[0] = verb;
+                    if let Ok(dlen) = lz4_flex::block::decompress_into(&payload.as_bytes()[1..], &mut decompressed_payload[1..]) {
+                        payload.set_to(&decompressed_payload[..(dlen + 1)]);
+                    } else {
+                        return;
+                    }
+                }
+
+                self.last_receive_time_ticks.store(time_ticks, Ordering::Relaxed);
+
+                let mut path_is_known = false;
+                for p in self.paths.lock().iter_mut() {
+                    if p.canonical_instance_id == source_path.canonical.canonical_instance_id() {
+                        p.last_receive_time_ticks = time_ticks;
+                        path_is_known = true;
+                        break;
+                    }
+                }
+
+                // For performance reasons we let VL2 handle packets first. It returns false
+                // if it didn't handle the packet, in which case it's handled at VL1. This is
+                // because the most performance critical path is the handling of the ???_FRAME
+                // verbs, which are in VL2.
+                verb &= packet_constants::VERB_MASK; // mask off flags
+                if !ph.handle_packet(self, &source_path, forward_secrecy, extended_authentication, verb, &payload).await {
+                    match verb {
+                        //VERB_VL1_NOP => {}
+                        verbs::VL1_HELLO => self.handle_incoming_hello(si, node, time_ticks, source_path, &payload).await,
+                        verbs::VL1_ERROR => self.handle_incoming_error(si, ph, node, time_ticks, source_path, forward_secrecy, extended_authentication, &payload).await,
+                        verbs::VL1_OK => self.handle_incoming_ok(si, ph, node, time_ticks, source_path, packet_header.hops(), path_is_known, forward_secrecy, extended_authentication, &payload).await,
+                        verbs::VL1_WHOIS => self.handle_incoming_whois(si, node, time_ticks, source_path, &payload).await,
+                        verbs::VL1_RENDEZVOUS => self.handle_incoming_rendezvous(si, node, time_ticks, source_path, &payload).await,
+                        verbs::VL1_ECHO => self.handle_incoming_echo(si, node, time_ticks, source_path, &payload).await,
+                        verbs::VL1_PUSH_DIRECT_PATHS => self.handle_incoming_push_direct_paths(si, node, time_ticks, source_path, &payload).await,
+                        verbs::VL1_USER_MESSAGE => self.handle_incoming_user_message(si, node, time_ticks, source_path, &payload).await,
+                        _ => {}
+                    }
+                }
+            }
+        }
+    }
+
     #[allow(unused)]
     async fn handle_incoming_hello(&self, si: &SI, node: &Node<SI>, time_ticks: i64, source_path: &Arc<Path<SI>>, payload: &PacketBuffer) {}
diff --git a/zerotier-system-service/src/service.rs b/zerotier-system-service/src/service.rs
index a433234b7..41ab80d89 100644
--- a/zerotier-system-service/src/service.rs
+++ b/zerotier-system-service/src/service.rs
@@ -34,7 +34,7 @@ pub struct Service {
 struct ServiceImpl {
     pub rt: tokio::runtime::Handle,
     pub data: DataDir,
-    pub udp_sockets: tokio::sync::RwLock<HashMap<u16, BoundUdpPort>>,
+    pub udp_sockets_by_port: tokio::sync::RwLock<HashMap<u16, BoundUdpPort>>,
     pub num_listeners_per_socket: usize,
     _core: Option>,
 }
@@ -49,7 +49,7 @@ impl Drop for Service {
         // This shouldn't have to loop much if at all to acquire the lock, but it might if something
         // is still completing somewhere in an aborting task.
         loop {
-            if let Ok(mut udp_sockets) = self.internal.udp_sockets.try_write() {
+            if let Ok(mut udp_sockets) = self.internal.udp_sockets_by_port.try_write() {
                 udp_sockets.clear();
                 break;
             }
@@ -67,7 +67,7 @@ impl Service {
         let mut si = ServiceImpl {
             rt,
             data: DataDir::open(base_path).await.map_err(|e| Box::new(e))?,
-            udp_sockets: tokio::sync::RwLock::new(HashMap::with_capacity(4)),
+            udp_sockets_by_port: tokio::sync::RwLock::new(HashMap::with_capacity(4)),
             num_listeners_per_socket: std::thread::available_parallelism().unwrap().get(),
             _core: None,
         };
@@ -93,8 +93,8 @@ impl ServiceImpl {
     /// Called in udp_binding_task_main() to service a particular UDP port.
     async fn update_udp_bindings_for_port(self: &Arc<Self>, port: u16, interface_prefix_blacklist: &Vec, cidr_blacklist: &Vec) -> Option> {
         for ns in {
-            let mut udp_sockets = self.udp_sockets.write().await;
-            let bp = udp_sockets.entry(port).or_insert_with(|| BoundUdpPort::new(port));
+            let mut udp_sockets_by_port = self.udp_sockets_by_port.write().await;
+            let bp = udp_sockets_by_port.entry(port).or_insert_with(|| BoundUdpPort::new(port));
             let (errors, new_sockets) = bp.update_bindings(interface_prefix_blacklist, cidr_blacklist);
             if bp.sockets.is_empty() {
                 return Some(errors);
@@ -196,12 +196,11 @@ impl SystemInterface for ServiceImpl {
             }
         }
 
-        // Otherwise we try to send from one socket on every interface or from the specified interface.
-        // This path only happens when the core is trying new endpoints. The fast path is for most packets.
-        let sockets = self.udp_sockets.read().await;
-        if !sockets.is_empty() {
+        let udp_sockets_by_port = self.udp_sockets_by_port.read().await;
+        if !udp_sockets_by_port.is_empty() {
             if let Some(specific_interface) = local_interface {
-                for (_, p) in sockets.iter() {
+                // Send from the specified interface if one was given.
+                for (_, p) in udp_sockets_by_port.iter() {
                     if !p.sockets.is_empty() {
                         let mut i = (random::next_u32_secure() as usize) % p.sockets.len();
                         for _ in 0..p.sockets.len() {
@@ -216,11 +215,9 @@ impl SystemInterface for ServiceImpl {
                 }
             }
         } else {
-            let bound_ports: Vec<&u16> = sockets.keys().collect();
+            // Otherwise send from one socket on every interface.
             let mut sent_on_interfaces = HashSet::with_capacity(4);
-            let rn = random::xorshift64_random() as usize;
-            for i in 0..bound_ports.len() {
-                let p = sockets.get(*bound_ports.get(rn.wrapping_add(i) % bound_ports.len()).unwrap()).unwrap();
+            for p in udp_sockets_by_port.values() {
                 if !p.sockets.is_empty() {
                     let mut i = (random::next_u32_secure() as usize) % p.sockets.len();
                     for _ in 0..p.sockets.len() {
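Note: the rename to `udp_sockets_by_port` makes the layout explicit: a map from local port to a `BoundUdpPort` holding the sockets bound on individual interfaces, and the "every interface" branch now just walks `values()` starting at a random socket index per port. A rough sketch of that iteration pattern follows; `DemoSocket`, `DemoBoundPort`, and the `rand` parameter are simplified stand-ins, not the real `BoundUdpPort` API.

```rust
use std::collections::HashMap;

// Illustrative stand-ins; the real BoundUdpPort wraps actual OS sockets.
struct DemoSocket {
    interface: String,
}

struct DemoBoundPort {
    sockets: Vec<DemoSocket>,
}

// Walk every bound port and pick a pseudo-random starting socket, mirroring the
// "send from one socket on every interface" loop. `rand` stands in for a call
// like random::next_u32_secure() in the real code.
fn send_on_every_port(udp_sockets_by_port: &HashMap<u16, DemoBoundPort>, rand: u32) {
    for (port, bp) in udp_sockets_by_port.iter() {
        if !bp.sockets.is_empty() {
            // Start at a random index; the real loop then tries subsequent sockets
            // (wrapping around) until a send succeeds on a not-yet-used interface.
            let start = (rand as usize) % bp.sockets.len();
            let s = &bp.sockets[start];
            println!("port {}: would try a send starting from the socket on {}", port, s.interface);
        }
    }
}

fn main() {
    let mut udp_sockets_by_port = HashMap::new();
    udp_sockets_by_port.insert(
        9993u16, // arbitrary sample port
        DemoBoundPort {
            sockets: vec![DemoSocket { interface: "eth0".into() }, DemoSocket { interface: "wlan0".into() }],
        },
    );
    send_on_every_port(&udp_sockets_by_port, 7);
}
```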