diff --git a/controller/src/controller.rs b/controller/src/controller.rs index b9e2d77ed..13ea8f821 100644 --- a/controller/src/controller.rs +++ b/controller/src/controller.rs @@ -47,7 +47,7 @@ impl Controller { } impl PathFilter for Controller { - fn check_path( + fn should_use_physical_path( &self, _id: &Identity, _endpoint: &zerotier_network_hypervisor::vl1::Endpoint, diff --git a/network-hypervisor/src/vl1/node.rs b/network-hypervisor/src/vl1/node.rs index e55e21a6e..f33d299fc 100644 --- a/network-hypervisor/src/vl1/node.rs +++ b/network-hypervisor/src/vl1/node.rs @@ -90,10 +90,10 @@ pub trait NodeStorage: Sync + Send + 'static { fn save_node_identity(&self, id: &Identity); } -/// Trait to be implemented to provide path hints and a filter to approve physical paths +/// Trait to be implemented to provide path hints and a filter to approve physical paths. pub trait PathFilter: Sync + Send + 'static { /// Called to check and see if a physical address should be used for ZeroTier traffic to a node. - fn check_path( + fn should_use_physical_path( &self, id: &Identity, endpoint: &Endpoint, @@ -190,6 +190,7 @@ struct RootInfo { online: bool, } +/// Interval gate objects used to fire off background tasks, see do_background_tasks(). #[derive(Default)] struct BackgroundTaskIntervals { root_sync: IntervalGate<{ ROOT_SYNC_INTERVAL_MS }>, @@ -200,8 +201,9 @@ struct BackgroundTaskIntervals { whois_queue_retry: IntervalGate<{ WHOIS_RETRY_INTERVAL }>, } +/// WHOIS requests and any packets that are waiting on them to be decrypted and authenticated. 
struct WhoisQueueItem { - waiting_packets: RingBuffer<(Weak>, PooledPacketBuffer), WHOIS_MAX_WAITING_PACKETS>, + v1_proto_waiting_packets: RingBuffer<(Weak>, PooledPacketBuffer), WHOIS_MAX_WAITING_PACKETS>, last_retry_time: i64, retry_count: u16, } @@ -295,65 +297,6 @@ impl Node { self.roots.read().online } - fn update_best_root(&self, host_system: &HostSystemImpl, time_ticks: i64) { - let roots = self.roots.read(); - - // The best root is the one that has replied to a HELLO most recently. Since we send HELLOs in unison - // this is a proxy for latency and also causes roots that fail to reply to drop out quickly. - let mut best = None; - let mut latest_hello_reply = 0; - for (r, _) in roots.roots.iter() { - let t = r.last_hello_reply_time_ticks.load(Ordering::Relaxed); - if t > latest_hello_reply { - latest_hello_reply = t; - let _ = best.insert(r); - } - } - - if let Some(best) = best { - let mut best_root = self.best_root.write(); - if let Some(best_root) = best_root.as_mut() { - if !Arc::ptr_eq(best_root, best) { - debug_event!( - host_system, - "[vl1] new best root: {} (replaced {})", - best.identity.address.to_string(), - best_root.identity.address.to_string() - ); - *best_root = best.clone(); - } - } else { - debug_event!( - host_system, - "[vl1] new best root: {} (was empty)", - best.identity.address.to_string() - ); - let _ = best_root.insert(best.clone()); - } - } else { - if let Some(old_best) = self.best_root.write().take() { - debug_event!( - host_system, - "[vl1] new best root: NONE (replaced {})", - old_best.identity.address.to_string() - ); - } - } - - // Determine if the node is online by whether there is a currently reachable root. 
- if (time_ticks - latest_hello_reply) < (ROOT_HELLO_INTERVAL * 2) && best.is_some() { - if !roots.online { - drop(roots); - self.roots.write().online = true; - host_system.event(Event::Online(true)); - } - } else if roots.online { - drop(roots); - self.roots.write().online = false; - host_system.event(Event::Online(false)); - } - } - pub fn do_background_tasks(&self, host_system: &HostSystemImpl) -> Duration { const INTERVAL_MS: i64 = 1000; const INTERVAL: Duration = Duration::from_millis(INTERVAL_MS as u64); @@ -514,7 +457,65 @@ impl Node { } } - self.update_best_root(host_system, time_ticks); + { + let roots = self.roots.read(); + + // The best root is the one that has replied to a HELLO most recently. Since we send HELLOs in unison + // this is a proxy for latency and also causes roots that fail to reply to drop out quickly. + let mut best = None; + let mut latest_hello_reply = 0; + for (r, _) in roots.roots.iter() { + let t = r.last_hello_reply_time_ticks.load(Ordering::Relaxed); + if t > latest_hello_reply { + latest_hello_reply = t; + let _ = best.insert(r); + } + } + + if let Some(best) = best { + let best_root = self.best_root.upgradable_read(); + if best_root.as_ref().map_or(true, |br| !Arc::ptr_eq(&br, best)) { + let mut best_root = RwLockUpgradableReadGuard::upgrade(best_root); + if let Some(best_root) = best_root.as_mut() { + debug_event!( + host_system, + "[vl1] selected new best root: {} (replaced {})", + best.identity.address.to_string(), + best_root.identity.address.to_string() + ); + *best_root = best.clone(); + } else { + debug_event!( + host_system, + "[vl1] selected new best root: {} (was empty)", + best.identity.address.to_string() + ); + let _ = best_root.insert(best.clone()); + } + } + } else { + if let Some(old_best) = self.best_root.write().take() { + debug_event!( + host_system, + "[vl1] selected new best root: NONE (replaced {})", + old_best.identity.address.to_string() + ); + } + } + + // Determine if the node is online by whether there 
is a currently reachable root. + if (time_ticks - latest_hello_reply) < (ROOT_HELLO_INTERVAL * 2) && best.is_some() { + if !roots.online { + drop(roots); + self.roots.write().online = true; + host_system.event(Event::Online(true)); + } + } else if roots.online { + drop(roots); + self.roots.write().online = false; + host_system.event(Event::Online(false)); + } + } } // Say HELLO to all roots periodically. For roots we send HELLO to every single endpoint @@ -622,17 +623,22 @@ impl Node { source_local_socket: &HostSystemImpl::LocalSocket, source_local_interface: &HostSystemImpl::LocalInterface, time_ticks: i64, - mut data: PooledPacketBuffer, + mut packet: PooledPacketBuffer, ) { debug_event!( host_system, "[vl1] {} -> #{} {}->{} length {} (on socket {}@{})", source_endpoint.to_string(), - data.bytes_fixed_at::<8>(0) + packet + .bytes_fixed_at::<8>(0) .map_or("????????????????".into(), |pid| hex::to_string(pid)), - data.bytes_fixed_at::<5>(13).map_or("??????????".into(), |src| hex::to_string(src)), - data.bytes_fixed_at::<5>(8).map_or("??????????".into(), |dest| hex::to_string(dest)), - data.len(), + packet + .bytes_fixed_at::<5>(13) + .map_or("??????????".into(), |src| hex::to_string(src)), + packet + .bytes_fixed_at::<5>(8) + .map_or("??????????".into(), |dest| hex::to_string(dest)), + packet.len(), source_local_socket.to_string(), source_local_interface.to_string() ); @@ -642,12 +648,12 @@ impl Node { // and by having 0xffffffffffff be the "nil" session ID for session init packets. ZSSP // is the new V2 Noise-based forward-secure transport protocol. What follows below this // is legacy handling of the old v1 protocol. 
- if data.u8_at(8).map_or(false, |x| x == 0xff) { + if packet.u8_at(8).map_or(false, |x| x == 0xff) { todo!(); } // Legacy ZeroTier V1 packet handling - if let Ok(fragment_header) = data.struct_mut_at::(0) { + if let Ok(fragment_header) = packet.struct_mut_at::(0) { if let Some(dest) = Address::from_bytes_fixed(&fragment_header.dest) { // Packet is addressed to this node. @@ -671,7 +677,7 @@ impl Node { fragment_header.packet_id(), fragment_header.fragment_no(), fragment_header.total_fragments(), - data, + packet, time_ticks, ) { if let Some(frag0) = assembled_packet.frags[0].as_ref() { @@ -681,7 +687,7 @@ impl Node { if let Ok(packet_header) = frag0.struct_at::(0) { if let Some(source) = Address::from_bytes(&packet_header.src) { if let Some(peer) = self.peer(source) { - peer.receive( + peer.v1_proto_receive( self, host_system, inner, @@ -708,11 +714,11 @@ impl Node { self.whois(host_system, source, Some((Arc::downgrade(&path), combined_packet)), time_ticks); } } - } - } - } - } - } else if let Ok(packet_header) = data.struct_at::(0) { + } // else source address invalid + } // else header incomplete + } // else reassembly failed (in a way that shouldn't be possible) + } // else packet not fully assembled yet + } else if let Ok(packet_header) = packet.struct_at::(0) { debug_event!( host_system, "[vl1] [v1] #{:0>16x} is unfragmented", @@ -721,18 +727,19 @@ impl Node { if let Some(source) = Address::from_bytes(&packet_header.src) { if let Some(peer) = self.peer(source) { - peer.receive(self, host_system, inner, time_ticks, &path, packet_header, data.as_ref(), &[]); + peer.v1_proto_receive(self, host_system, inner, time_ticks, &path, packet_header, packet.as_ref(), &[]); } else { - self.whois(host_system, source, Some((Arc::downgrade(&path), data)), time_ticks); + self.whois(host_system, source, Some((Arc::downgrade(&path), packet)), time_ticks); } } - } - } else { - // Packet is addressed somewhere else. 
+ } // else not fragment and header incomplete + } else if self.this_node_is_root() { + // Packet is addressed somewhere else, forward if this node is a root. #[cfg(debug_assertions)] let debug_packet_id; + // Increment and check hop count in packet header, return if max hops exceeded or error. if fragment_header.is_fragment() { #[cfg(debug_assertions)] { @@ -749,7 +756,7 @@ impl Node { debug_event!(host_system, "[vl1] [v1] #{:0>16x} discarded: max hops exceeded!", debug_packet_id); return; } - } else if let Ok(packet_header) = data.struct_mut_at::(0) { + } else if let Ok(packet_header) = packet.struct_mut_at::(0) { #[cfg(debug_assertions)] { debug_packet_id = u64::from_be_bytes(packet_header.id); @@ -774,12 +781,22 @@ impl Node { } if let Some(peer) = self.peer(dest) { - // TODO: SHOULD we forward? Need a way to check. - peer.forward(host_system, time_ticks, data.as_ref()); - #[cfg(debug_assertions)] - debug_event!(host_system, "[vl1] [v1] #{:0>16x} forwarded successfully", debug_packet_id); + if let Some(forward_path) = peer.direct_path() { + host_system.wire_send( + &forward_path.endpoint, + Some(&forward_path.local_socket), + Some(&forward_path.local_interface), + packet.as_bytes(), + 0, + ); + + peer.last_forward_time_ticks.store(time_ticks, Ordering::Relaxed); + + #[cfg(debug_assertions)] + debug_event!(host_system, "[vl1] [v1] #{:0>16x} forwarded successfully", debug_packet_id); + } } - } + } // else not for this node and shouldn't be forwarded } } } @@ -796,12 +813,12 @@ impl Node { { let mut whois_queue = self.whois_queue.lock(); let qi = whois_queue.entry(address).or_insert_with(|| WhoisQueueItem:: { - waiting_packets: RingBuffer::new(), + v1_proto_waiting_packets: RingBuffer::new(), last_retry_time: 0, retry_count: 0, }); if let Some(p) = waiting_packet { - qi.waiting_packets.add(p); + qi.v1_proto_waiting_packets.add(p); } if qi.retry_count > 0 { return; @@ -859,10 +876,10 @@ impl Node { .and_then(|peer| 
Some(peers.entry(address).or_insert(peer).clone())) }) { drop(peers); - for p in qi.waiting_packets.iter() { + for p in qi.v1_proto_waiting_packets.iter() { if let Some(path) = p.0.upgrade() { if let Ok(packet_header) = p.1.struct_at::(0) { - peer.receive(self, host_system, inner, time_ticks, &path, packet_header, &p.1, &[]); + peer.v1_proto_receive(self, host_system, inner, time_ticks, &path, packet_header, &p.1, &[]); } } } @@ -1070,7 +1087,7 @@ pub struct DummyPathFilter; impl PathFilter for DummyPathFilter { #[inline(always)] - fn check_path( + fn should_use_physical_path( &self, _id: &Identity, _endpoint: &Endpoint, diff --git a/network-hypervisor/src/vl1/peer.rs b/network-hypervisor/src/vl1/peer.rs index ea51187cc..e1ff660a3 100644 --- a/network-hypervisor/src/vl1/peer.rs +++ b/network-hypervisor/src/vl1/peer.rs @@ -106,12 +106,6 @@ impl Peer { } } - /// Get the next message ID for sending a message to this peer. - #[inline(always)] - pub(crate) fn next_message_id(&self) -> MessageId { - self.message_id_counter.fetch_add(1, Ordering::SeqCst) - } - /// Get current best path or None if there are no direct paths to this peer. pub fn direct_path(&self) -> Option>> { for p in self.paths.lock().iter() { @@ -192,13 +186,24 @@ impl Peer { prioritize_paths(&mut paths); } + #[inline(always)] + pub(crate) fn v1_proto_next_message_id(&self) -> MessageId { + self.message_id_counter.fetch_add(1, Ordering::SeqCst) + } + /// Called every SERVICE_INTERVAL_MS by the background service loop in Node. pub(crate) fn service(&self, _: &HostSystemImpl, _: &Node, time_ticks: i64) -> bool { + // Prune dead paths and sort in descending order of quality. { let mut paths = self.paths.lock(); paths.retain(|p| ((time_ticks - p.last_receive_time_ticks) < PEER_EXPIRATION_TIME) && (p.path.strong_count() > 0)); + if paths.capacity() > 16 { + paths.shrink_to_fit(); + } prioritize_paths(&mut paths); } + + // Prune dead entries from the map of reported local endpoints (e.g. 
externally visible IPs). self.remote_node_info .write() .reported_local_endpoints @@ -206,7 +211,8 @@ impl Peer { (time_ticks - self.last_receive_time_ticks.load(Ordering::Relaxed).max(self.create_time_ticks)) < PEER_EXPIRATION_TIME } - fn internal_send( + /// Send a prepared and encrypted packet using the V1 protocol with fragmentation if needed. + fn v1_proto_internal_send( &self, host_system: &HostSystemImpl, endpoint: &Endpoint, @@ -280,7 +286,8 @@ impl Peer { }; let max_fragment_size = path.endpoint.max_fragment_size(); - if self.remote_node_info.read().remote_protocol_version >= 12 { + + if self.remote_node_info.read().remote_protocol_version >= 12 || self.identity.p384.is_some() { let flags_cipher_hops = if packet.len() > max_fragment_size { v1::HEADER_FLAG_FRAGMENTED | v1::CIPHER_AES_GMAC_SIV } else { @@ -288,7 +295,7 @@ impl Peer { }; let mut aes_gmac_siv = self.static_symmetric_key.aes_gmac_siv.get(); - aes_gmac_siv.encrypt_init(&self.next_message_id().to_ne_bytes()); + aes_gmac_siv.encrypt_init(&self.v1_proto_next_message_id().to_ne_bytes()); aes_gmac_siv.encrypt_set_aad(&v1::get_packet_aad_bytes( self.identity.address, node.identity.address, @@ -317,11 +324,11 @@ impl Peer { v1::CIPHER_SALSA2012_POLY1305 }; - let (mut salsa, poly1305_otk) = salsa_poly_create( + let (mut salsa, poly1305_otk) = v1_proto_salsa_poly_create( &self.static_symmetric_key, { let header = packet.struct_mut_at::(0).unwrap(); - header.id = self.next_message_id().to_ne_bytes(); + header.id = self.v1_proto_next_message_id().to_ne_bytes(); header.dest = self.identity.address.to_bytes(); header.src = node.identity.address.to_bytes(); header.flags_cipher_hops = flags_cipher_hops; @@ -339,7 +346,7 @@ impl Peer { packet.as_bytes_mut()[v1::MAC_FIELD_INDEX..(v1::MAC_FIELD_INDEX + 8)].copy_from_slice(&tag[0..8]); } - self.internal_send( + self.v1_proto_internal_send( host_system, &path.endpoint, Some(&path.local_socket), @@ -353,28 +360,6 @@ impl Peer { return true; } - /// Forward a packet 
to this peer. - /// - /// This is called when we receive a packet not addressed to this node and - /// want to pass it along. - /// - /// This doesn't fragment large packets since fragments are forwarded individually. - /// Intermediates don't need to adjust fragmentation. - pub(crate) fn forward(&self, host_system: &HostSystemImpl, time_ticks: i64, packet: &PacketBuffer) -> bool { - if let Some(path) = self.direct_path() { - host_system.wire_send( - &path.endpoint, - Some(&path.local_socket), - Some(&path.local_interface), - packet.as_bytes(), - 0, - ); - self.last_forward_time_ticks.store(time_ticks, Ordering::Relaxed); - return true; - } - return false; - } - /// Send a HELLO to this peer. /// /// If explicit_endpoint is not None the packet will be sent directly to this endpoint. @@ -406,7 +391,7 @@ impl Peer { let mut packet = PacketBuffer::new(); { - let message_id = self.next_message_id(); + let message_id = self.v1_proto_next_message_id(); { let f: &mut (v1::PacketHeader, v1::message_component_structs::HelloFixedHeaderFields) = @@ -426,7 +411,7 @@ impl Peer { debug_assert_eq!(packet.len(), 41); assert!(node.identity.write_public(&mut packet, self.identity.p384.is_none()).is_ok()); - let (_, poly1305_key) = salsa_poly_create( + let (_, poly1305_key) = v1_proto_salsa_poly_create( &self.static_symmetric_key, packet.struct_at::(0).unwrap(), packet.len(), @@ -446,7 +431,7 @@ impl Peer { } if let Some(p) = path.as_ref() { - self.internal_send( + self.v1_proto_internal_send( host_system, destination, Some(&p.local_socket), @@ -456,7 +441,7 @@ impl Peer { ); p.log_send_anything(time_ticks); } else { - self.internal_send(host_system, destination, None, None, max_fragment_size, &packet); + self.v1_proto_internal_send(host_system, destination, None, None, max_fragment_size, &packet); } return true; @@ -468,7 +453,7 @@ impl Peer { /// those fragments after the main packet header and first chunk. 
/// /// This returns true if the packet decrypted and passed authentication. - pub(crate) fn receive( + pub(crate) fn v1_proto_receive( self: &Arc, node: &Node, host_system: &HostSystemImpl, @@ -482,7 +467,7 @@ impl Peer { if let Ok(packet_frag0_payload_bytes) = frag0.as_bytes_starting_at(v1::VERB_INDEX) { let mut payload = PacketBuffer::new(); - let message_id = if let Some(message_id2) = try_aead_decrypt( + let message_id = if let Some(message_id2) = v1_proto_try_aead_decrypt( &self.static_symmetric_key, packet_frag0_payload_bytes, packet_header, @@ -584,7 +569,7 @@ impl Peer { time_ticks: i64, message_id: MessageId, source_path: &Arc>, - hops: u8, + _hops: u8, payload: &PacketBuffer, ) -> PacketHandlerResult { if !(inner.should_communicate_with(&self.identity) || node.this_node_is_root() || node.is_peer_root(self)) { @@ -737,7 +722,7 @@ impl Peer { self.identity.address.to_string(), r.err().unwrap().to_string() ); - break; + return PacketHandlerResult::Error; } } } else { @@ -884,8 +869,7 @@ impl PartialEq for Peer { impl Eq for Peer {} -/// Attempt ZeroTier V1 protocol AEAD packet encryption and MAC validation. Returns message ID on success. -fn try_aead_decrypt( +fn v1_proto_try_aead_decrypt( secret: &SymmetricSecret, packet_frag0_payload_bytes: &[u8], packet_header: &v1::PacketHeader, @@ -904,7 +888,7 @@ fn try_aead_decrypt( } } - let (mut salsa, poly1305_key) = salsa_poly_create(secret, packet_header, payload.len() + v1::HEADER_SIZE); + let (mut salsa, poly1305_key) = v1_proto_salsa_poly_create(secret, packet_header, payload.len() + v1::HEADER_SIZE); let mac = poly1305::compute(&poly1305_key, &payload.as_bytes()); if mac[0..8].eq(&packet_header.mac) { let message_id = u64::from_ne_bytes(packet_header.id); @@ -968,9 +952,7 @@ fn try_aead_decrypt( } } -/// Create initialized instances of Salsa20/12 and Poly1305 for a packet. -/// (Note that this is a legacy cipher suite.) 
-fn salsa_poly_create(secret: &SymmetricSecret, header: &v1::PacketHeader, packet_size: usize) -> (Salsa<12>, [u8; 32]) { +fn v1_proto_salsa_poly_create(secret: &SymmetricSecret, header: &v1::PacketHeader, packet_size: usize) -> (Salsa<12>, [u8; 32]) { // Create a per-packet key from the IV, source, destination, and packet size. let mut key: Secret<32> = secret.key.first_n_clone(); let hb = header.as_bytes(); diff --git a/network-hypervisor/src/vl1/rootset.rs b/network-hypervisor/src/vl1/rootset.rs index 4b672f301..f5de2d98a 100644 --- a/network-hypervisor/src/vl1/rootset.rs +++ b/network-hypervisor/src/vl1/rootset.rs @@ -220,8 +220,10 @@ impl RootSet { fn marshal_internal(&self, buf: &mut Buffer, include_signatures: bool) -> Result<(), UnmarshalError> { buf.append_u8(0)?; // version byte for future use + buf.append_varint(self.name.as_bytes().len() as u64)?; buf.append_bytes(self.name.as_bytes())?; + if self.url.is_some() { let url = self.url.as_ref().unwrap().as_bytes(); buf.append_varint(url.len() as u64)?; @@ -229,7 +231,9 @@ impl RootSet { } else { buf.append_varint(0)?; } + buf.append_varint(self.revision)?; + buf.append_varint(self.members.len() as u64)?; for m in self.members.iter() { m.identity.marshal(buf)?; @@ -251,7 +255,9 @@ impl RootSet { buf.append_u8(m.protocol_version)?; buf.append_varint(0)?; // size of additional fields for future use } + buf.append_varint(0)?; // size of additional fields for future use + Ok(()) } }