diff --git a/network-hypervisor/src/vl1/node.rs b/network-hypervisor/src/vl1/node.rs index b598b9d0d..36e58c948 100644 --- a/network-hypervisor/src/vl1/node.rs +++ b/network-hypervisor/src/vl1/node.rs @@ -4,7 +4,7 @@ use std::collections::HashMap; use std::hash::Hash; use std::io::Write; use std::sync::atomic::Ordering; -use std::sync::Arc; +use std::sync::{Arc, Weak}; use std::time::Duration; use parking_lot::{Mutex, RwLock, RwLockUpgradableReadGuard}; @@ -41,6 +41,9 @@ pub trait HostSystem: Sync + Send + 'static { /// A VL1 level event occurred. fn event(&self, event: Event); + /// Get a pooled packet buffer for internal use. + fn get_buffer(&self) -> PooledPacketBuffer; + /// Check a local socket for validity. /// /// This could return false if the socket's interface no longer exists, its port has been @@ -197,9 +200,8 @@ struct BackgroundTaskIntervals { whois_queue_retry: IntervalGate<{ WHOIS_RETRY_INTERVAL }>, } -#[derive(Default)] -struct WhoisQueueItem { - waiting_packets: RingBuffer, +struct WhoisQueueItem { + waiting_packets: RingBuffer<(Weak>, PooledPacketBuffer), WHOIS_MAX_WAITING_PACKETS>, retry_count: u16, } @@ -230,7 +232,7 @@ pub struct Node { best_root: RwLock>>>, /// Queue of identities being looked up. - whois_queue: Mutex>, + whois_queue: Mutex>>, } impl Node { @@ -644,6 +646,7 @@ impl Node { if let Ok(fragment_header) = data.struct_mut_at::(0) { if let Some(dest) = Address::from_bytes_fixed(&fragment_header.dest) { if dest == self.identity.address { + let fragment_header = &*fragment_header; // discard mut let path = self.canonical_path(source_endpoint, source_local_socket, source_local_interface, time_ticks); path.log_receive_anything(time_ticks); @@ -683,7 +686,9 @@ impl Node { &assembled_packet.frags[1..(assembled_packet.have as usize)], ); } else { - let mut combined_packet = PooledPacketBuffer::naked(PacketBuffer::new()); + // If WHOIS is needed we need to go ahead and combine the packet so it can be cached + // for later processing when a WHOIS reply comes back. + let mut combined_packet = host_system.get_buffer(); let mut ok = combined_packet.append_bytes(frag0.as_bytes()).is_ok(); for i in 1..assembled_packet.have { if let Some(f) = assembled_packet.frags[i as usize].as_ref() { @@ -694,7 +699,7 @@ impl Node { } } if ok { - self.whois(host_system, source, Some(combined_packet), time_ticks); + self.whois(host_system, source, Some((Arc::downgrade(&path), combined_packet)), time_ticks); } } } @@ -702,7 +707,6 @@ impl Node { } } } else { - #[cfg(debug_assertions)] if let Ok(packet_header) = data.struct_at::(0) { debug_event!( host_system, @@ -714,7 +718,7 @@ impl Node { if let Some(peer) = self.peer(source) { peer.receive(self, host_system, inner, time_ticks, &path, packet_header, data.as_ref(), &[]); } else { - self.whois(host_system, source, Some(PooledPacketBuffer::naked(data.clone())), time_ticks); + self.whois(host_system, source, Some((Arc::downgrade(&path), data)), time_ticks); } } } @@ -777,11 +781,19 @@ impl Node { } /// Enqueue and send a WHOIS query for a given address, adding the supplied packet (if any) to the list to be processed on reply. - fn whois(&self, host_system: &HostSystemImpl, address: Address, waiting_packet: Option, time_ticks: i64) { + fn whois( + &self, + host_system: &HostSystemImpl, + address: Address, + waiting_packet: Option<(Weak>, PooledPacketBuffer)>, + time_ticks: i64, + ) { debug_event!(host_system, "[vl1] [v1] WHOIS {}", address.to_string()); { let mut whois_queue = self.whois_queue.lock(); - let qi = whois_queue.entry(address).or_default(); + let qi = whois_queue + .entry(address) + .or_insert_with(|| WhoisQueueItem:: { waiting_packets: RingBuffer::new(), retry_count: 0 }); if let Some(p) = waiting_packet { qi.waiting_packets.add(p); } @@ -817,6 +829,43 @@ impl Node { } } + /// Called by Peer when an identity is received from another node, e.g. via OK(WHOIS). + pub(crate) fn handle_incoming_identity( + &self, + host_system: &HostSystemImpl, + inner: &InnerProtocolImpl, + received_identity: Identity, + time_ticks: i64, + authoritative: bool, + ) { + if authoritative { + if received_identity.validate_identity() { + let mut whois_queue = self.whois_queue.lock(); + if let Some(qi) = whois_queue.get_mut(&received_identity.address) { + let address = received_identity.address; + if inner.should_communicate_with(&received_identity) { + let mut peers = self.peers.write(); + if let Some(peer) = peers.get(&address).cloned().or_else(|| { + Peer::new(&self.identity, received_identity, time_ticks) + .map(|p| Arc::new(p)) + .and_then(|peer| Some(peers.entry(address).or_insert(peer).clone())) + }) { + drop(peers); + for p in qi.waiting_packets.iter() { + if let Some(path) = p.0.upgrade() { + if let Ok(packet_header) = p.1.struct_at::(0) { + peer.receive(self, host_system, inner, time_ticks, &path, packet_header, &p.1, &[]); + } + } + } + } + } + whois_queue.remove(&address); + } + } + } + } + /// Get the current "best" root from among this node's trusted roots. pub fn best_root(&self) -> Option>> { self.best_root.read().clone() @@ -891,9 +940,11 @@ impl Node { local_interface: &HostSystemImpl::LocalInterface, time_ticks: i64, ) -> Arc> { - if let Some(path) = self.paths.read().get(&PathKey::Ref(ep, local_socket)) { + let paths = self.paths.read(); + if let Some(path) = paths.get(&PathKey::Ref(ep, local_socket)) { path.clone() } else { + drop(paths); self.paths .write() .entry(PathKey::Copied(ep.clone(), local_socket.clone())) diff --git a/network-hypervisor/src/vl1/peer.rs b/network-hypervisor/src/vl1/peer.rs index ca87ead77..770c3f582 100644 --- a/network-hypervisor/src/vl1/peer.rs +++ b/network-hypervisor/src/vl1/peer.rs @@ -135,7 +135,7 @@ impl Peer { return None; } - pub(crate) fn learn_path(&self, host_system: &HostSystemImpl, new_path: &Arc>, time_ticks: i64) { + fn learn_path(&self, host_system: &HostSystemImpl, new_path: &Arc>, time_ticks: i64) { let mut paths = self.paths.lock(); match &new_path.endpoint { @@ -259,8 +259,6 @@ impl Peer { /// /// This will go directly if there is an active path, or otherwise indirectly /// via a root or some other route. - /// - /// It encrypts and sets the MAC and cipher fields and packet ID and other things. pub(crate) fn send( &self, host_system: &HostSystemImpl, @@ -663,10 +661,11 @@ impl Peer { if self.message_id_counter.load(Ordering::Relaxed).wrapping_sub(in_re_message_id) <= PACKET_RESPONSE_COUNTER_DELTA_MAX { match ok_header.in_re_verb { verbs::VL1_HELLO => { - if let Ok(ok_hello_fixed_header_fields) = + if let Ok(_ok_hello_fixed_header_fields) = payload.read_struct::(&mut cursor) { if hops == 0 { + debug_event!(host_system, "[vl1] {} OK(HELLO)", self.identity.address.to_string(),); if let Ok(reported_endpoint) = Endpoint::unmarshal(&payload, &mut cursor) { #[cfg(debug_assertions)] let reported_endpoint2 = reported_endpoint.clone(); @@ -699,8 +698,14 @@ impl Peer { verbs::VL1_WHOIS => { if node.is_peer_root(self) { while cursor < payload.len() { - if let Ok(_whois_response) = Identity::read_bytes(&mut BufferReader::new(payload, &mut cursor)) { - // TODO + if let Ok(received_identity) = Identity::read_bytes(&mut BufferReader::new(payload, &mut cursor)) { + debug_event!( + host_system, + "[vl1] {} OK(WHOIS): {}", + self.identity.address.to_string(), + received_identity.to_string() + ); + node.handle_incoming_identity(host_system, inner, received_identity, time_ticks, true); } else { break; } diff --git a/vl1-service/src/localsocket.rs b/vl1-service/src/localsocket.rs index 4746a3671..9b6b514af 100644 --- a/vl1-service/src/localsocket.rs +++ b/vl1-service/src/localsocket.rs @@ -52,7 +52,7 @@ impl Hash for LocalSocket { impl ToString for LocalSocket { fn to_string(&self) -> String { if let Some(s) = self.0.upgrade() { - s.address.to_string() + s.bind_address.to_string() } else { "(closed socket)".into() } diff --git a/vl1-service/src/settings.rs b/vl1-service/src/settings.rs index e5ba3d8ed..363e96a13 100644 --- a/vl1-service/src/settings.rs +++ b/vl1-service/src/settings.rs @@ -24,7 +24,8 @@ pub struct VL1Settings { impl VL1Settings { #[cfg(target_os = "macos")] - pub const DEFAULT_PREFIX_BLACKLIST: [&'static str; 10] = ["lo", "utun", "gif", "stf", "iptap", "pktap", "feth", "zt", "llw", "anpi"]; + pub const DEFAULT_PREFIX_BLACKLIST: [&'static str; 11] = + ["lo", "utun", "gif", "stf", "iptap", "pktap", "feth", "zt", "llw", "anpi", "bridge"]; #[cfg(target_os = "linux")] pub const DEFAULT_PREFIX_BLACKLIST: [&'static str; 5] = ["lo", "tun", "tap", "ipsec", "zt"]; diff --git a/vl1-service/src/sys/udp.rs b/vl1-service/src/sys/udp.rs index 4c88e5358..de9c8d3d2 100644 --- a/vl1-service/src/sys/udp.rs +++ b/vl1-service/src/sys/udp.rs @@ -21,6 +21,9 @@ use zerotier_utils::ms_monotonic; use crate::sys::{getifaddrs, ipv6}; +/// UDP socket receive timeout to allow sockets to close properly on some systems (seconds). +const SOCKET_RECV_TIMEOUT_SECONDS: i64 = 2; + fn socket_read_concurrency() -> usize { const MAX_PER_SOCKET_CONCURRENCY: usize = 8; @@ -60,7 +63,7 @@ pub struct BoundUdpPort { /// A socket bound to a specific interface and IP. pub struct BoundUdpSocket { - pub address: InetAddress, + pub bind_address: InetAddress, pub interface: LocalInterface, last_receive_time: AtomicI64, fd: i32, @@ -92,7 +95,7 @@ impl BoundUdpSocket { #[cfg(unix)] pub fn send(&self, dest: &InetAddress, data: &[u8], packet_ttl: u8) -> bool { - if dest.family() == self.address.family() { + if dest.family() == self.bind_address.family() { let (c_sockaddr, c_addrlen) = dest.c_sockaddr(); if packet_ttl == 0 || !dest.is_ipv4() { unsafe { @@ -183,7 +186,7 @@ impl BoundUdpPort { existing_bindings .entry(s.interface) .or_insert_with(|| HashMap::with_capacity(4)) - .insert(s.address.clone(), s); + .insert(s.bind_address.clone(), s); } let mut errors: Vec<(LocalInterface, InetAddress, std::io::Error)> = Vec::new(); @@ -215,7 +218,7 @@ impl BoundUdpPort { let fd = s.unwrap(); if s.is_ok() { let s = Arc::new(BoundUdpSocket { - address: addr_with_port, + bind_address: addr_with_port, interface: interface.clone(), last_receive_time: AtomicI64::new(i64::MIN), fd, @@ -324,7 +327,7 @@ unsafe fn bind_udp_to_device(device_name: &str, address: &InetAddress) -> Result //assert_ne!(libc::fcntl(s, libc::F_SETFL, libc::O_NONBLOCK), -1); let mut timeo: libc::timeval = std::mem::zeroed(); - timeo.tv_sec = 1; + timeo.tv_sec = SOCKET_RECV_TIMEOUT_SECONDS.as_(); timeo.tv_usec = 0; setsockopt_results |= libc::setsockopt( s, diff --git a/vl1-service/src/vl1service.rs b/vl1-service/src/vl1service.rs index 61ba4d5f7..6a3c9650d 100644 --- a/vl1-service/src/vl1service.rs +++ b/vl1-service/src/vl1service.rs @@ -226,11 +226,16 @@ impl bool { socket.is_valid() } + #[inline] + fn get_buffer(&self) -> zerotier_network_hypervisor::protocol::PooledPacketBuffer { + self.buffer_pool.get() + } + fn wire_send( &self, endpoint: &Endpoint,