diff --git a/controller/src/handler.rs b/controller/src/handler.rs index 4aed8161c..ab66722c9 100644 --- a/controller/src/handler.rs +++ b/controller/src/handler.rs @@ -1,6 +1,5 @@ // (c) 2020-2022 ZeroTier, Inc. -- currently propritery pending actual release and licensing. See LICENSE.md. -use std::any::Any; use std::error::Error; use std::sync::{Arc, Mutex, RwLock, Weak}; @@ -127,9 +126,9 @@ impl InnerProtocol for Handler { fn handle_packet( &self, _host_system: &HostSystemImpl, - _node: &Node, - source: &Arc>, - source_path: &Arc>, + _node: &Node, + source: &Arc, + source_path: &Arc, source_hops: u8, message_id: u64, verb: u8, @@ -193,7 +192,7 @@ impl InnerProtocol for Handler { .await { Result::Ok((result, Some(config))) => { - inner.send_network_config(peer, &config, Some(message_id)); + inner.send_network_config(peer.as_ref(), &config, Some(message_id)); result } Result::Ok((result, None)) => result, @@ -234,9 +233,9 @@ impl InnerProtocol for Handler { fn handle_error( &self, _host_system: &HostSystemImpl, - _node: &Node, - _source: &Arc>, - _source_path: &Arc>, + _node: &Node, + _source: &Arc, + _source_path: &Arc, _source_hops: u8, _message_id: u64, _in_re_verb: u8, @@ -251,9 +250,9 @@ impl InnerProtocol for Handler { fn handle_ok( &self, _host_system: &HostSystemImpl, - _node: &Node, - _source: &Arc>, - _source_path: &Arc>, + _node: &Node, + _source: &Arc, + _source_path: &Arc, _source_hops: u8, _message_id: u64, _in_re_verb: u8, @@ -272,57 +271,53 @@ impl InnerProtocol for Handler { impl Inner { fn send_network_config( &self, - peer: Arc, // hack can go away when Rust has specialization + peer: &Peer, config: &NetworkConfig, in_re_message_id: Option, // None for unsolicited push ) { if let Some(host_system) = self.service.read().unwrap().upgrade() { - if let Some(peer) = peer.downcast_ref::, Handler>>>() { - peer.send( - host_system.as_ref(), - host_system.node(), - None, - ms_monotonic(), - |packet| -> Result<(), OutOfBoundsError> { - if let Some(in_re_message_id) = in_re_message_id { - let ok_header = packet.append_struct_get_mut::()?; - ok_header.verb = protocol::verbs::VL1_OK; - ok_header.in_re_verb = protocol::verbs::VL2_VERB_NETWORK_CONFIG_REQUEST; - ok_header.in_re_message_id = in_re_message_id.to_be_bytes(); + peer.send( + host_system.as_ref(), + host_system.node(), + None, + ms_monotonic(), + |packet| -> Result<(), OutOfBoundsError> { + if let Some(in_re_message_id) = in_re_message_id { + let ok_header = packet.append_struct_get_mut::()?; + ok_header.verb = protocol::verbs::VL1_OK; + ok_header.in_re_verb = protocol::verbs::VL2_VERB_NETWORK_CONFIG_REQUEST; + ok_header.in_re_message_id = in_re_message_id.to_be_bytes(); + } else { + packet.append_u8(protocol::verbs::VL2_VERB_NETWORK_CONFIG)?; + } + + if peer.is_v2() { + todo!() + } else { + let config_data = if let Some(config_dict) = config.v1_proto_to_dictionary(&self.local_identity) { + config_dict.to_bytes() } else { - packet.append_u8(protocol::verbs::VL2_VERB_NETWORK_CONFIG)?; + eprintln!("WARNING: unexpected error serializing network config into V1 format dictionary"); + return Err(OutOfBoundsError); // abort + }; + if config_data.len() > (u16::MAX as usize) { + eprintln!("WARNING: network config is larger than 65536 bytes!"); + return Err(OutOfBoundsError); // abort } - if peer.is_v2() { - todo!() - } else { - let config_data = if let Some(config_dict) = config.v1_proto_to_dictionary(&self.local_identity) { - config_dict.to_bytes() - } else { - eprintln!("WARNING: unexpected error serializing network config into V1 format dictionary"); - return Err(OutOfBoundsError); // abort - }; - if config_data.len() > (u16::MAX as usize) { - eprintln!("WARNING: network config is larger than 65536 bytes!"); - return Err(OutOfBoundsError); // abort - } + packet.append_u64(config.network_id.into())?; + packet.append_u16(config_data.len() as u16)?; + packet.append_bytes(config_data.as_slice())?; - packet.append_u64(config.network_id.into())?; - packet.append_u16(config_data.len() as u16)?; - packet.append_bytes(config_data.as_slice())?; + // TODO: compress - // TODO: compress + // NOTE: V1 supports a bunch of other things like chunking but it was never truly used and is optional. + // Omit it here as it adds overhead. + } - // NOTE: V1 supports a bunch of other things like chunking but it was never truly used and is optional. - // Omit it here as it adds overhead. - } - - Ok(()) - }, - ); - } else { - panic!("HostSystem implementation mismatch with service to which controller is harnessed"); - } + Ok(()) + }, + ); } } diff --git a/network-hypervisor/src/vl1/node.rs b/network-hypervisor/src/vl1/node.rs index 0ad67c97a..d642a97d9 100644 --- a/network-hypervisor/src/vl1/node.rs +++ b/network-hypervisor/src/vl1/node.rs @@ -24,6 +24,7 @@ use zerotier_utils::error::InvalidParameterError; use zerotier_utils::gate::IntervalGate; use zerotier_utils::hex; use zerotier_utils::marshalable::Marshalable; +use zerotier_utils::pocket::Pocket; use zerotier_utils::ringbuffer::RingBuffer; /// Trait implemented by external code to handle events and provide an interface to the system or application. @@ -35,7 +36,7 @@ pub trait HostSystem: Sync + Send + 'static { type LocalSocket: Sync + Send + Hash + PartialEq + Eq + Clone + ToString + Sized + 'static; /// Type for local system interfaces. - type LocalInterface: Sync + Send + Hash + PartialEq + Eq + Clone + ToString + Sized; + type LocalInterface: Sync + Send + Hash + PartialEq + Eq + Clone + ToString + Sized + 'static; /// A VL1 level event occurred. fn event(&self, event: Event); @@ -136,9 +137,9 @@ pub trait InnerProtocol: Sync + Send { fn handle_packet( &self, host_system: &HostSystemImpl, - node: &Node, - source: &Arc>, - source_path: &Arc>, + node: &Node, + source: &Arc, + source_path: &Arc, source_hops: u8, message_id: u64, verb: u8, @@ -149,9 +150,9 @@ pub trait InnerProtocol: Sync + Send { fn handle_error( &self, host_system: &HostSystemImpl, - node: &Node, - source: &Arc>, - source_path: &Arc>, + node: &Node, + source: &Arc, + source_path: &Arc, source_hops: u8, message_id: u64, in_re_verb: u8, @@ -165,9 +166,9 @@ pub trait InnerProtocol: Sync + Send { fn handle_ok( &self, host_system: &HostSystemImpl, - node: &Node, - source: &Arc>, - source_path: &Arc>, + node: &Node, + source: &Arc, + source_path: &Arc, source_hops: u8, message_id: u64, in_re_verb: u8, @@ -183,12 +184,12 @@ pub trait InnerProtocol: Sync + Send { /// How often to check the root cluster definitions against the root list and update. const ROOT_SYNC_INTERVAL_MS: i64 = 1000; -struct RootInfo { +struct RootInfo { /// Root sets to which we are a member. sets: HashMap>, /// Root peers and their statically defined endpoints (from root sets). - roots: HashMap>, Vec>, + roots: HashMap, Vec>, /// If this node is a root, these are the root sets to which it's a member in binary serialized form. /// Set to None if this node is not a root, meaning it doesn't appear in any of its root sets. @@ -213,14 +214,17 @@ struct BackgroundTaskIntervals { } /// WHOIS requests and any packets that are waiting on them to be decrypted and authenticated. -struct WhoisQueueItem { - v1_proto_waiting_packets: RingBuffer<(Weak>, PooledPacketBuffer), WHOIS_MAX_WAITING_PACKETS>, +struct WhoisQueueItem { + v1_proto_waiting_packets: RingBuffer<(Weak, PooledPacketBuffer), WHOIS_MAX_WAITING_PACKETS>, last_retry_time: i64, retry_count: u16, } +const PATH_MAP_SIZE: usize = std::mem::size_of::() + 128], Arc>>(); +type PathMap = HashMap, Arc>; + /// A ZeroTier VL1 node that can communicate securely with the ZeroTier peer-to-peer network. -pub struct Node { +pub struct Node { /// A random ID generated to identify this particular running instance. /// /// This can be used to implement multi-homing by allowing remote nodes to distinguish instances @@ -234,23 +238,23 @@ pub struct Node { intervals: Mutex, /// Canonicalized network paths, held as Weak<> to be automatically cleaned when no longer in use. - paths: RwLock, Arc>>>, + paths: RwLock>, /// Peers with which we are currently communicating. - peers: RwLock>>>, + peers: RwLock>>, /// This node's trusted roots, sorted in ascending order of quality/preference, and cluster definitions. - roots: RwLock>, + roots: RwLock, /// Current best root. - best_root: RwLock>>>, + best_root: RwLock>>, /// Queue of identities being looked up. - whois_queue: Mutex>>, + whois_queue: Mutex>, } -impl Node { - pub fn new( +impl Node { + pub fn new( host_system: &HostSystemImpl, storage: &NodeStorageImpl, auto_generate_identity: bool, @@ -286,7 +290,7 @@ impl Node { instance_id: random::get_bytes_secure(), identity: id, intervals: Mutex::new(BackgroundTaskIntervals::default()), - paths: RwLock::new(HashMap::new()), + paths: RwLock::new(Pocket::new(PathMap::::new())), peers: RwLock::new(HashMap::new()), roots: RwLock::new(RootInfo { sets: HashMap::new(), @@ -300,7 +304,7 @@ impl Node { }) } - pub fn peer(&self, a: Address) -> Option>> { + pub fn peer(&self, a: Address) -> Option> { self.peers.read().unwrap().get(&a).cloned() } @@ -309,12 +313,12 @@ impl Node { } /// Get the current "best" root from among this node's trusted roots. - pub fn best_root(&self) -> Option>> { + pub fn best_root(&self) -> Option> { self.best_root.read().unwrap().clone() } /// Check whether a peer is a root according to any root set trusted by this node. - pub fn is_peer_root(&self, peer: &Peer) -> bool { + pub fn is_peer_root(&self, peer: &Peer) -> bool { self.roots.read().unwrap().roots.keys().any(|p| p.identity.eq(&peer.identity)) } @@ -360,7 +364,7 @@ impl Node { self.roots.read().unwrap().sets.values().cloned().map(|s| s.unwrap()).collect() } - pub fn do_background_tasks(&self, host_system: &HostSystemImpl) -> Duration { + pub fn do_background_tasks(&self, host_system: &HostSystemImpl) -> Duration { const INTERVAL_MS: i64 = 1000; const INTERVAL: Duration = Duration::from_millis(INTERVAL_MS as u64); let time_ticks = host_system.time_ticks(); @@ -477,7 +481,7 @@ impl Node { if let Some(peer) = peers.get(&m.identity.address) { new_roots.insert(peer.clone(), m.endpoints.as_ref().unwrap().iter().cloned().collect()); } else { - if let Some(peer) = Peer::::new(&self.identity, m.identity.clone(), time_ticks) { + if let Some(peer) = Peer::new(&self.identity, m.identity.clone(), time_ticks) { drop(peers); new_roots.insert( self.peers @@ -636,7 +640,7 @@ impl Node { let mut need_keepalive = Vec::new(); // First check all paths in read mode to avoid blocking the entire node. - for (k, path) in self.paths.read().unwrap().iter() { + for (k, path) in self.paths.read().unwrap().get::>().iter() { if host_system.local_socket_is_valid(k.local_socket()) { match path.service(time_ticks) { PathServiceResult::Ok => {} @@ -650,13 +654,19 @@ impl Node { // Lock in write mode and remove dead paths, doing so piecemeal to again avoid blocking. for dp in dead_paths.iter() { - self.paths.write().unwrap().remove(dp); + self.paths.write().unwrap().get_mut::>().remove(dp); } // Finally run keepalive sends as a batch. let keepalive_buf = [time_ticks as u8]; // just an arbitrary byte, no significance for p in need_keepalive.iter() { - host_system.wire_send(&p.endpoint, Some(&p.local_socket), Some(&p.local_interface), &keepalive_buf, 0); + host_system.wire_send( + &p.endpoint, + Some(p.local_socket::()), + Some(p.local_interface::()), + &keepalive_buf, + 0, + ); } } @@ -683,7 +693,7 @@ impl Node { INTERVAL } - pub fn handle_incoming_physical_packet( + pub fn handle_incoming_physical_packet( &self, host_system: &HostSystemImpl, inner: &InnerProtocolImpl, @@ -727,7 +737,8 @@ impl Node { if dest == self.identity.address { let fragment_header = &*fragment_header; // discard mut - let path = self.canonical_path(source_endpoint, source_local_socket, source_local_interface, time_ticks); + let path = + self.canonical_path::(source_endpoint, source_local_socket, source_local_interface, time_ticks); path.log_receive_anything(time_ticks); if fragment_header.is_fragment() { @@ -852,8 +863,8 @@ impl Node { if let Some(forward_path) = peer.direct_path() { host_system.wire_send( &forward_path.endpoint, - Some(&forward_path.local_socket), - Some(&forward_path.local_interface), + Some(forward_path.local_socket::()), + Some(forward_path.local_interface::()), packet.as_bytes(), 0, ); @@ -870,17 +881,17 @@ impl Node { } /// Enqueue and send a WHOIS query for a given address, adding the supplied packet (if any) to the list to be processed on reply. - fn whois( + fn whois( &self, host_system: &HostSystemImpl, address: Address, - waiting_packet: Option<(Weak>, PooledPacketBuffer)>, + waiting_packet: Option<(Weak, PooledPacketBuffer)>, time_ticks: i64, ) { debug_event!(host_system, "[vl1] [v1] WHOIS {}", address.to_string()); { let mut whois_queue = self.whois_queue.lock().unwrap(); - let qi = whois_queue.entry(address).or_insert_with(|| WhoisQueueItem:: { + let qi = whois_queue.entry(address).or_insert_with(|| WhoisQueueItem { v1_proto_waiting_packets: RingBuffer::new(), last_retry_time: 0, retry_count: 0, @@ -899,7 +910,7 @@ impl Node { } /// Send a WHOIS query to the current best root. - fn send_whois(&self, host_system: &HostSystemImpl, mut addresses: &[Address], time_ticks: i64) { + fn send_whois(&self, host_system: &HostSystemImpl, mut addresses: &[Address], time_ticks: i64) { debug_assert!(!addresses.is_empty()); if let Some(root) = self.best_root() { while !addresses.is_empty() { @@ -921,7 +932,7 @@ impl Node { } /// Called by Peer when an identity is received from another node, e.g. via OK(WHOIS). - pub(crate) fn handle_incoming_identity( + pub(crate) fn handle_incoming_identity( &self, host_system: &HostSystemImpl, inner: &InnerProtocolImpl, @@ -972,23 +983,31 @@ impl Node { } /// Get the canonical Path object corresponding to an endpoint. - pub(crate) fn canonical_path( + pub(crate) fn canonical_path( &self, ep: &Endpoint, local_socket: &HostSystemImpl::LocalSocket, local_interface: &HostSystemImpl::LocalInterface, time_ticks: i64, - ) -> Arc> { + ) -> Arc { let paths = self.paths.read().unwrap(); - if let Some(path) = paths.get(&PathKey::Ref(ep, local_socket)) { + if let Some(path) = paths.get::>().get(&PathKey::Ref(ep, local_socket)) { path.clone() } else { drop(paths); self.paths .write() .unwrap() + .get_mut::>() .entry(PathKey::Copied(ep.clone(), local_socket.clone())) - .or_insert_with(|| Arc::new(Path::new(ep.clone(), local_socket.clone(), local_interface.clone(), time_ticks))) + .or_insert_with(|| { + Arc::new(Path::new::( + ep.clone(), + local_socket.clone(), + local_interface.clone(), + time_ticks, + )) + }) .clone() } } @@ -1056,9 +1075,9 @@ impl InnerProtocol for DummyInnerProtocol { fn handle_packet( &self, _host_system: &HostSystemImpl, - _node: &Node, - _source: &Arc>, - _source_path: &Arc>, + _node: &Node, + _source: &Arc, + _source_path: &Arc, _source_hops: u8, _message_id: u64, _verb: u8, @@ -1071,9 +1090,9 @@ impl InnerProtocol for DummyInnerProtocol { fn handle_error( &self, _host_system: &HostSystemImpl, - _node: &Node, - _source: &Arc>, - _source_path: &Arc>, + _node: &Node, + _source: &Arc, + _source_path: &Arc, _source_hops: u8, _message_id: u64, _in_re_verb: u8, @@ -1089,9 +1108,9 @@ impl InnerProtocol for DummyInnerProtocol { fn handle_ok( &self, _host_system: &HostSystemImpl, - _node: &Node, - _source: &Arc>, - _source_path: &Arc>, + _node: &Node, + _source: &Arc, + _source_path: &Arc, _source_hops: u8, _message_id: u64, _in_re_verb: u8, diff --git a/network-hypervisor/src/vl1/path.rs b/network-hypervisor/src/vl1/path.rs index 37eb2b04c..78fdb362a 100644 --- a/network-hypervisor/src/vl1/path.rs +++ b/network-hypervisor/src/vl1/path.rs @@ -10,6 +10,7 @@ use crate::vl1::endpoint::Endpoint; use crate::vl1::node::*; use zerotier_crypto::random; +use zerotier_utils::pocket::Pocket; use zerotier_utils::NEVER_HAPPENED_TICKS; pub(crate) const SERVICE_INTERVAL_MS: i64 = PATH_KEEPALIVE_INTERVAL; @@ -24,19 +25,18 @@ pub(crate) enum PathServiceResult { /// These are maintained in Node and canonicalized so that all unique paths have /// one and only one unique path object. That enables statistics to be tracked /// for them and uniform application of things like keepalives. -pub struct Path { +pub struct Path { pub endpoint: Endpoint, - pub local_socket: HostSystemImpl::LocalSocket, - pub local_interface: HostSystemImpl::LocalInterface, + local_socket: Pocket<64>, + local_interface: Pocket<64>, last_send_time_ticks: AtomicI64, last_receive_time_ticks: AtomicI64, create_time_ticks: i64, fragmented_packets: Mutex>, } -impl Path { - #[inline] - pub fn new( +impl Path { + pub(crate) fn new( endpoint: Endpoint, local_socket: HostSystemImpl::LocalSocket, local_interface: HostSystemImpl::LocalInterface, @@ -44,8 +44,8 @@ impl Path { ) -> Self { Self { endpoint, - local_socket, - local_interface, + local_socket: Pocket::new(local_socket), + local_interface: Pocket::new(local_interface), last_send_time_ticks: AtomicI64::new(NEVER_HAPPENED_TICKS), last_receive_time_ticks: AtomicI64::new(NEVER_HAPPENED_TICKS), create_time_ticks: time_ticks, @@ -53,9 +53,18 @@ impl Path { } } + #[inline(always)] + pub(crate) fn local_socket(&self) -> &HostSystemImpl::LocalSocket { + self.local_socket.get() + } + + #[inline(always)] + pub(crate) fn local_interface(&self) -> &HostSystemImpl::LocalInterface { + self.local_socket.get() + } + /// Receive a fragment and return a FragmentedPacket if the entire packet was assembled. /// This returns None if more fragments are needed to assemble the packet. - #[inline] pub(crate) fn receive_fragment( &self, packet_id: u64, @@ -102,7 +111,6 @@ impl Path { self.last_send_time_ticks.store(time_ticks, Ordering::Relaxed); } - #[inline] pub(crate) fn service(&self, time_ticks: i64) -> PathServiceResult { self.fragmented_packets .lock() diff --git a/network-hypervisor/src/vl1/peer.rs b/network-hypervisor/src/vl1/peer.rs index 2ee7664c7..43d1a28aa 100644 --- a/network-hypervisor/src/vl1/peer.rs +++ b/network-hypervisor/src/vl1/peer.rs @@ -23,11 +23,11 @@ use crate::{VERSION_MAJOR, VERSION_MINOR, VERSION_REVISION}; pub(crate) const SERVICE_INTERVAL_MS: i64 = 10000; -pub struct Peer { +pub struct Peer { pub identity: Identity, v1_proto_static_secret: v1::SymmetricSecret, - paths: Mutex>>, + paths: Mutex>, pub(crate) last_send_time_ticks: AtomicI64, pub(crate) last_receive_time_ticks: AtomicI64, @@ -40,8 +40,8 @@ pub struct Peer { remote_node_info: RwLock, } -struct PeerPath { - path: Weak>, +struct PeerPath { + path: Weak, last_receive_time_ticks: i64, } @@ -52,11 +52,11 @@ struct RemoteNodeInfo { } /// Sort a list of paths by quality or priority, with best paths first. -fn prioritize_paths(paths: &mut Vec>) { +fn prioritize_paths(paths: &mut Vec) { paths.sort_unstable_by(|a, b| a.last_receive_time_ticks.cmp(&b.last_receive_time_ticks).reverse()); } -impl Peer { +impl Peer { /// Create a new peer. /// /// This only returns None if this_node_identity does not have its secrets or if some @@ -112,7 +112,7 @@ impl Peer { /// Get current best path or None if there are no direct paths to this peer. #[inline] - pub fn direct_path(&self) -> Option>> { + pub fn direct_path(&self) -> Option> { for p in self.paths.lock().unwrap().iter() { let pp = p.path.upgrade(); if pp.is_some() { @@ -124,7 +124,7 @@ impl Peer { /// Get either the current best direct path or an indirect path via e.g. a root. #[inline] - pub fn path(&self, node: &Node) -> Option>> { + pub fn path(&self, node: &Node) -> Option> { let direct_path = self.direct_path(); if direct_path.is_some() { return direct_path; @@ -135,7 +135,7 @@ impl Peer { return None; } - fn learn_path(&self, host_system: &HostSystemImpl, new_path: &Arc>, time_ticks: i64) { + fn learn_path(&self, host_system: &HostSystemImpl, new_path: &Arc, time_ticks: i64) { let mut paths = self.paths.lock().unwrap(); match &new_path.endpoint { @@ -160,7 +160,7 @@ impl Peer { ); pi.path = Arc::downgrade(new_path); pi.last_receive_time_ticks = time_ticks; - prioritize_paths(&mut paths); + prioritize_paths::(&mut paths); return; } } @@ -185,11 +185,11 @@ impl Peer { self.identity.address.to_string(), new_path.endpoint.to_string() ); - paths.push(PeerPath:: { + paths.push(PeerPath { path: Arc::downgrade(new_path), last_receive_time_ticks: time_ticks, }); - prioritize_paths(&mut paths); + prioritize_paths::(&mut paths); } /// Get the next sequential message ID for use with the V1 transport protocol. @@ -199,7 +199,7 @@ impl Peer { } /// Called every SERVICE_INTERVAL_MS by the background service loop in Node. - pub(crate) fn service(&self, _: &HostSystemImpl, _: &Node, time_ticks: i64) -> bool { + pub(crate) fn service(&self, _: &HostSystemImpl, _: &Node, time_ticks: i64) -> bool { // Prune dead paths and sort in descending order of quality. { let mut paths = self.paths.lock().unwrap(); @@ -207,7 +207,7 @@ impl Peer { if paths.capacity() > 16 { paths.shrink_to_fit(); } - prioritize_paths(&mut paths); + prioritize_paths::(&mut paths); } // Prune dead entries from the map of reported local endpoints (e.g. externally visible IPs). @@ -220,7 +220,7 @@ impl Peer { } /// Send a prepared and encrypted packet using the V1 protocol with fragmentation if needed. - fn v1_proto_internal_send( + fn v1_proto_internal_send( &self, host_system: &HostSystemImpl, endpoint: &Endpoint, @@ -278,11 +278,11 @@ impl Peer { /// The builder function must append the verb (with any verb flags) and packet payload. If it returns /// an error, the error is returned immediately and the send is aborted. None is returned if the send /// function itself fails for some reason such as no paths being available. - pub fn send Result>( + pub fn send Result>( &self, host_system: &HostSystemImpl, - node: &Node, - path: Option<&Arc>>, + node: &Node, + path: Option<&Arc>, time_ticks: i64, builder_function: BuilderFunction, ) -> Option> { @@ -370,8 +370,8 @@ impl Peer { self.v1_proto_internal_send( host_system, &path.endpoint, - Some(&path.local_socket), - Some(&path.local_interface), + Some(path.local_socket::()), + Some(path.local_interface::()), max_fragment_size, packet, ); @@ -390,10 +390,10 @@ impl Peer { /// Unlike other messages HELLO is sent partially in the clear and always with the long-lived /// static identity key. Authentication in old versions is via Poly1305 and in new versions /// via HMAC-SHA512. - pub(crate) fn send_hello( + pub(crate) fn send_hello( &self, host_system: &HostSystemImpl, - node: &Node, + node: &Node, explicit_endpoint: Option<&Endpoint>, ) -> bool { let mut path = None; @@ -456,8 +456,8 @@ impl Peer { self.v1_proto_internal_send( host_system, destination, - Some(&p.local_socket), - Some(&p.local_interface), + Some(p.local_socket::()), + Some(p.local_interface::()), max_fragment_size, packet, ); @@ -475,13 +475,13 @@ impl Peer { /// those fragments after the main packet header and first chunk. /// /// This returns true if the packet decrypted and passed authentication. - pub(crate) fn v1_proto_receive( + pub(crate) fn v1_proto_receive( self: &Arc, - node: &Node, + node: &Node, host_system: &HostSystemImpl, inner: &InnerProtocolImpl, time_ticks: i64, - source_path: &Arc>, + source_path: &Arc, packet_header: &v1::PacketHeader, frag0: &PacketBuffer, fragments: &[Option], @@ -593,14 +593,14 @@ impl Peer { return PacketHandlerResult::Error; } - fn handle_incoming_hello( + fn handle_incoming_hello( &self, host_system: &HostSystemImpl, inner: &InnerProtocolImpl, - node: &Node, + node: &Node, time_ticks: i64, message_id: MessageId, - source_path: &Arc>, + source_path: &Arc, payload: &PacketBuffer, ) -> PacketHandlerResult { if !(inner.should_communicate_with(&self.identity) || node.this_node_is_root() || node.is_peer_root(self)) { @@ -654,13 +654,13 @@ impl Peer { return PacketHandlerResult::Error; } - fn handle_incoming_error( + fn handle_incoming_error( self: &Arc, host_system: &HostSystemImpl, inner: &InnerProtocolImpl, - node: &Node, - _: i64, - source_path: &Arc>, + node: &Node, + _time_ticks: i64, + source_path: &Arc, source_hops: u8, message_id: u64, payload: &PacketBuffer, @@ -691,13 +691,13 @@ impl Peer { return PacketHandlerResult::Error; } - fn handle_incoming_ok( + fn handle_incoming_ok( self: &Arc, host_system: &HostSystemImpl, inner: &InnerProtocolImpl, - node: &Node, + node: &Node, time_ticks: i64, - source_path: &Arc>, + source_path: &Arc, source_hops: u8, message_id: u64, path_is_known: bool, @@ -791,11 +791,11 @@ impl Peer { return PacketHandlerResult::Error; } - fn handle_incoming_whois( + fn handle_incoming_whois( self: &Arc, host_system: &HostSystemImpl, inner: &InnerProtocolImpl, - node: &Node, + node: &Node, time_ticks: i64, message_id: MessageId, payload: &PacketBuffer, @@ -824,24 +824,24 @@ impl Peer { return PacketHandlerResult::Ok; } - fn handle_incoming_rendezvous( + fn handle_incoming_rendezvous( self: &Arc, host_system: &HostSystemImpl, - node: &Node, + node: &Node, time_ticks: i64, message_id: MessageId, - source_path: &Arc>, + source_path: &Arc, payload: &PacketBuffer, ) -> PacketHandlerResult { if node.is_peer_root(self) {} return PacketHandlerResult::Ok; } - fn handle_incoming_echo( + fn handle_incoming_echo( &self, host_system: &HostSystemImpl, inner: &InnerProtocolImpl, - node: &Node, + node: &Node, time_ticks: i64, message_id: MessageId, payload: &PacketBuffer, @@ -864,44 +864,44 @@ impl Peer { return PacketHandlerResult::Ok; } - fn handle_incoming_push_direct_paths( + fn handle_incoming_push_direct_paths( self: &Arc, host_system: &HostSystemImpl, - node: &Node, + node: &Node, time_ticks: i64, - source_path: &Arc>, + source_path: &Arc, payload: &PacketBuffer, ) -> PacketHandlerResult { PacketHandlerResult::Ok } - fn handle_incoming_user_message( + fn handle_incoming_user_message( self: &Arc, host_system: &HostSystemImpl, - node: &Node, + node: &Node, time_ticks: i64, - source_path: &Arc>, + source_path: &Arc, payload: &PacketBuffer, ) -> PacketHandlerResult { PacketHandlerResult::Ok } } -impl Hash for Peer { +impl Hash for Peer { #[inline(always)] fn hash(&self, state: &mut H) { state.write_u64(self.identity.address.into()); } } -impl PartialEq for Peer { +impl PartialEq for Peer { #[inline(always)] fn eq(&self, other: &Self) -> bool { self.identity.fingerprint.eq(&other.identity.fingerprint) } } -impl Eq for Peer {} +impl Eq for Peer {} fn v1_proto_try_aead_decrypt( secret: &v1::SymmetricSecret, diff --git a/network-hypervisor/src/vl2/switch.rs b/network-hypervisor/src/vl2/switch.rs index 4331b055c..5372b2b88 100644 --- a/network-hypervisor/src/vl2/switch.rs +++ b/network-hypervisor/src/vl2/switch.rs @@ -14,9 +14,9 @@ impl InnerProtocol for Switch { fn handle_packet( &self, host_system: &HostSystemImpl, - node: &Node, - source: &Arc>, - source_path: &Arc>, + node: &Node, + source: &Arc, + source_path: &Arc, source_hops: u8, message_id: u64, verb: u8, @@ -28,9 +28,9 @@ impl InnerProtocol for Switch { fn handle_error( &self, host_system: &HostSystemImpl, - node: &Node, - source: &Arc>, - source_path: &Arc>, + node: &Node, + source: &Arc, + source_path: &Arc, source_hops: u8, message_id: u64, in_re_verb: u8, @@ -45,9 +45,9 @@ impl InnerProtocol for Switch { fn handle_ok( &self, host_system: &HostSystemImpl, - node: &Node, - source: &Arc>, - source_path: &Arc>, + node: &Node, + source: &Arc, + source_path: &Arc, source_hops: u8, message_id: u64, in_re_verb: u8, diff --git a/utils/src/lib.rs b/utils/src/lib.rs index dd14641b2..4e4d5313d 100644 --- a/utils/src/lib.rs +++ b/utils/src/lib.rs @@ -14,6 +14,7 @@ pub mod io; pub mod json; pub mod marshalable; pub mod memory; +pub mod pocket; pub mod pool; pub mod ringbuffer; pub mod ringbuffermap; diff --git a/utils/src/pocket.rs b/utils/src/pocket.rs new file mode 100644 index 000000000..d9ffc2784 --- /dev/null +++ b/utils/src/pocket.rs @@ -0,0 +1,124 @@ +// (c) 2020-2022 ZeroTier, Inc. -- currently propritery pending actual release and licensing. See LICENSE.md. + +use std::any::TypeId; +use std::mem::{forget, size_of, MaybeUninit}; +use std::ptr::{drop_in_place, read, write}; + +/// A statically sized container that acts a bit like Box. +/// +/// This is used in a few places to avoid cascades of templates by allowing templated +/// objects to be held generically and accessed only within templated functions. It does so +/// with very low to zero runtime overhead or memory overhead and panics if misused. +/// +/// This will panic if the capacity is too small. If that occurs, it must be enlarged. It will +/// also panic if any of the accessors (other than the try_ versions) are used to try to get +/// a type other than the one it was +pub struct Pocket { + storage: [u8; CAPACITY], + dropper: fn(*mut u8), + data_type: TypeId, +} + +impl Pocket { + #[inline(always)] + pub fn new(x: T) -> Self { + assert!(size_of::() <= CAPACITY); + let mut p = Self { + storage: unsafe { MaybeUninit::uninit().assume_init() }, + dropper: |s: *mut u8| unsafe { + drop_in_place::((*s.cast::()).storage.as_mut_ptr().cast()); + }, + data_type: TypeId::of::(), + }; + unsafe { write(p.storage.as_mut_ptr().cast(), x) }; + p + } + + #[inline(always)] + pub fn get(&self) -> &T { + assert_eq!(TypeId::of::(), self.data_type); + unsafe { &*self.storage.as_ptr().cast() } + } + + #[inline(always)] + pub fn get_mut(&mut self) -> &mut T { + assert_eq!(TypeId::of::(), self.data_type); + unsafe { &mut *self.storage.as_mut_ptr().cast() } + } + + #[inline(always)] + pub fn try_get(&self) -> Option<&T> { + if TypeId::of::() == self.data_type { + Some(unsafe { &*self.storage.as_ptr().cast() }) + } else { + None + } + } + + #[inline(always)] + pub fn try_get_mut(&mut self) -> Option<&mut T> { + if TypeId::of::() == self.data_type { + Some(unsafe { &mut *self.storage.as_mut_ptr().cast() }) + } else { + None + } + } + + #[inline(always)] + pub fn unwrap(self) -> T { + assert_eq!(TypeId::of::(), self.data_type); + let x = unsafe { read(self.storage.as_ptr().cast()) }; + forget(self); + x + } +} + +impl AsRef for Pocket { + #[inline(always)] + fn as_ref(&self) -> &T { + assert_eq!(TypeId::of::(), self.data_type); + unsafe { &*self.storage.as_ptr().cast() } + } +} + +impl AsMut for Pocket { + #[inline(always)] + fn as_mut(&mut self) -> &mut T { + assert_eq!(TypeId::of::(), self.data_type); + unsafe { &mut *self.storage.as_mut_ptr().cast() } + } +} + +impl Drop for Pocket { + #[inline(always)] + fn drop(&mut self) { + (self.dropper)((self as *mut Self).cast()); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::rc::Rc; + + #[test] + fn typing_and_life_cycle() { + let test_obj = Rc::new(1i32); + assert_eq!(Rc::strong_count(&test_obj), 1); + let a = Pocket::<32>::new(test_obj.clone()); + let b = Pocket::<32>::new(test_obj.clone()); + let c = Pocket::<32>::new(test_obj.clone()); + assert!(a.get::>().eq(b.get())); + assert!(a.try_get::>().is_some()); + assert!(a.try_get::>().is_none()); + assert_eq!(Rc::strong_count(&test_obj), 4); + drop(a); + assert_eq!(Rc::strong_count(&test_obj), 3); + drop(b); + assert_eq!(Rc::strong_count(&test_obj), 2); + let c = c.unwrap::>(); + assert_eq!(Rc::strong_count(&test_obj), 2); + drop(c); + assert_eq!(Rc::strong_count(&test_obj), 1); + } +} diff --git a/vl1-service/src/vl1service.rs b/vl1-service/src/vl1service.rs index 3e924a05c..3766bfa32 100644 --- a/vl1-service/src/vl1service.rs +++ b/vl1-service/src/vl1service.rs @@ -36,7 +36,7 @@ pub struct VL1Service< inner: Arc, path_filter: Arc, buffer_pool: Arc, - node_container: Option>, // never None, set in new() + node_container: Option, // never None, set in new() } struct VL1ServiceMutableState { @@ -86,7 +86,7 @@ impl &Node { + pub fn node(&self) -> &Node { debug_assert!(self.node_container.is_some()); unsafe { self.node_container.as_ref().unwrap_unchecked() } } @@ -198,8 +198,8 @@ impl