From f0158ee8ae82b4eb6dec363816e79760f9219314 Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Tue, 14 Mar 2023 13:30:35 -0400 Subject: [PATCH] Simplify some Rust generic madness. --- {utils/src => attic}/thing.rs | 0 controller/src/controller.rs | 140 ++++++++---------- network-hypervisor/src/vl1/node.rs | 128 ++++++---------- network-hypervisor/src/vl1/path.rs | 31 +--- network-hypervisor/src/vl1/peer.rs | 114 +++++++------- .../src/vl2/multicastauthority.rs | 22 +-- network-hypervisor/src/vl2/switch.rs | 18 +-- utils/src/cast.rs | 33 +++++ utils/src/gate.rs | 33 ----- utils/src/lib.rs | 2 +- vl1-service/src/vl1service.rs | 10 +- 11 files changed, 228 insertions(+), 303 deletions(-) rename {utils/src => attic}/thing.rs (100%) create mode 100644 utils/src/cast.rs diff --git a/utils/src/thing.rs b/attic/thing.rs similarity index 100% rename from utils/src/thing.rs rename to attic/thing.rs diff --git a/controller/src/controller.rs b/controller/src/controller.rs index bbf7ddae1..ebe9fde5c 100644 --- a/controller/src/controller.rs +++ b/controller/src/controller.rs @@ -17,6 +17,7 @@ use zerotier_network_hypervisor::vl2::v1::Revocation; use zerotier_network_hypervisor::vl2::NetworkId; use zerotier_utils::blob::Blob; use zerotier_utils::buffer::OutOfBoundsError; +use zerotier_utils::cast::cast_ref; use zerotier_utils::error::InvalidParameterError; use zerotier_utils::reaper::Reaper; use zerotier_utils::tokio; @@ -139,62 +140,56 @@ impl Controller { } /// Compose and send network configuration packet (either V1 or V2) - fn send_network_config( + fn send_network_config( &self, - peer: &Peer, + app: &Application, + node: &Node, + peer: &Peer, config: &NetworkConfig, in_re_message_id: Option, // None for unsolicited push ) { - if let Some(host_system) = self.service.read().unwrap().upgrade() { - peer.send( - host_system.as_ref(), - host_system.node(), - None, - ms_monotonic(), - |packet| -> Result<(), OutOfBoundsError> { - let payload_start = packet.len(); + peer.send(app, node, None, ms_monotonic(), |packet| -> Result<(), OutOfBoundsError> { + let payload_start = packet.len(); - if let Some(in_re_message_id) = in_re_message_id { - let ok_header = packet.append_struct_get_mut::()?; - ok_header.verb = protocol::message_type::VL1_OK; - ok_header.in_re_verb = protocol::message_type::VL2_NETWORK_CONFIG_REQUEST; - ok_header.in_re_message_id = in_re_message_id.to_be_bytes(); - } else { - packet.append_u8(protocol::message_type::VL2_NETWORK_CONFIG)?; - } + if let Some(in_re_message_id) = in_re_message_id { + let ok_header = packet.append_struct_get_mut::()?; + ok_header.verb = protocol::message_type::VL1_OK; + ok_header.in_re_verb = protocol::message_type::VL2_NETWORK_CONFIG_REQUEST; + ok_header.in_re_message_id = in_re_message_id.to_be_bytes(); + } else { + packet.append_u8(protocol::message_type::VL2_NETWORK_CONFIG)?; + } - if peer.is_v2() { - todo!() - } else { - let config_data = if let Some(config_dict) = config.v1_proto_to_dictionary(&self.local_identity) { - config_dict.to_bytes() - } else { - eprintln!("WARNING: unexpected error serializing network config into V1 format dictionary"); - return Err(OutOfBoundsError); // abort - }; - if config_data.len() > (u16::MAX as usize) { - eprintln!("WARNING: network config is larger than 65536 bytes!"); - return Err(OutOfBoundsError); // abort - } + if peer.is_v2() { + todo!() + } else { + let config_data = if let Some(config_dict) = config.v1_proto_to_dictionary(&self.local_identity) { + config_dict.to_bytes() + } else { + eprintln!("WARNING: unexpected error serializing network config into V1 format dictionary"); + return Err(OutOfBoundsError); // abort + }; + if config_data.len() > (u16::MAX as usize) { + eprintln!("WARNING: network config is larger than 65536 bytes!"); + return Err(OutOfBoundsError); // abort + } - packet.append_u64(config.network_id.into())?; - packet.append_u16(config_data.len() as u16)?; - packet.append_bytes(config_data.as_slice())?; + packet.append_u64(config.network_id.into())?; + packet.append_u16(config_data.len() as u16)?; + packet.append_bytes(config_data.as_slice())?; - // TODO: for V1 we may need to introduce use of the chunking mechanism for large configs. - } + // TODO: for V1 we may need to introduce use of the chunking mechanism for large configs. + } - let new_payload_len = protocol::compress(&mut packet.as_bytes_mut()[payload_start..]); - packet.set_size(payload_start + new_payload_len); + let new_payload_len = protocol::compress(&mut packet.as_bytes_mut()[payload_start..]); + packet.set_size(payload_start + new_payload_len); - Ok(()) - }, - ); - } + Ok(()) + }); } /// Send one or more revocation object(s) to a peer. The provided vector is drained. - fn send_revocations(&self, peer: &Peer, revocations: &mut Vec) { + fn send_revocations(&self, peer: &Peer>, revocations: &mut Vec) { if let Some(host_system) = self.service.read().unwrap().upgrade() { let time_ticks = ms_monotonic(); while !revocations.is_empty() { @@ -496,20 +491,12 @@ impl Controller { } impl InnerProtocolLayer for Controller { - #[inline(always)] - fn should_respond_to(&self, _: &Valid) -> bool { - // Controllers always have to establish sessions to process requests. We don't really know if - // a member is relevant until we have looked up both the network and the member, since whether - // or not to "learn" unknown members is a network level option. - true - } - - fn handle_packet( + fn handle_packet( &self, - host_system: &HostSystemImpl, - _: &Node, - source: &Arc, - source_path: &Arc, + app: &Application, + node: &Node, + source: &Arc>, + source_path: &Arc>, source_hops: u8, message_id: u64, verb: u8, @@ -518,10 +505,6 @@ impl InnerProtocolLayer for Controller { ) -> PacketHandlerResult { match verb { protocol::message_type::VL2_NETWORK_CONFIG_REQUEST => { - if !self.should_respond_to(&source.identity) { - return PacketHandlerResult::Ok; // handled and ignored - } - let network_id = payload.read_u64(&mut cursor); if network_id.is_err() { return PacketHandlerResult::Error; @@ -533,7 +516,7 @@ impl InnerProtocolLayer for Controller { let network_id = network_id.unwrap(); debug_event!( - host_system, + app, "[vl2] NETWORK_CONFIG_REQUEST from {}({}) for {:0>16x}", source.identity.address.to_string(), source_path.endpoint.to_string(), @@ -565,7 +548,8 @@ impl InnerProtocolLayer for Controller { let (result, config) = match self2.authorize(&source.identity, network_id, now).await { Result::Ok((result, Some(config))) => { //println!("{}", serde_yaml::to_string(&config).unwrap()); - self2.send_network_config(source.as_ref(), &config, Some(message_id)); + let app = self2.service.read().unwrap().upgrade().unwrap(); + self2.send_network_config(app.as_ref(), app.node(), cast_ref(source.as_ref()).unwrap(), &config, Some(message_id)); (result, Some(config)) } Result::Ok((result, None)) => (result, None), @@ -618,23 +602,21 @@ impl InnerProtocolLayer for Controller { } protocol::message_type::VL2_MULTICAST_GATHER => { - if let Some(service) = self.service.read().unwrap().upgrade() { - let auth = self.recently_authorized.read().unwrap(); - let time_ticks = ms_monotonic(); - self.multicast_authority.handle_vl2_multicast_gather( - |network_id, identity| { - auth.get(&identity.fingerprint) - .map_or(false, |t| t.get(&network_id).map_or(false, |t| *t > time_ticks)) - }, - time_ticks, - service.as_ref(), - service.node(), - source, - message_id, - payload, - cursor, - ); - } + let auth = self.recently_authorized.read().unwrap(); + let time_ticks = ms_monotonic(); + self.multicast_authority.handle_vl2_multicast_gather( + |network_id, identity| { + auth.get(&identity.fingerprint) + .map_or(false, |t| t.get(&network_id).map_or(false, |t| *t > time_ticks)) + }, + time_ticks, + app, + node, + source, + message_id, + payload, + cursor, + ); PacketHandlerResult::Ok } diff --git a/network-hypervisor/src/vl1/node.rs b/network-hypervisor/src/vl1/node.rs index 0118af34a..e02c65dc0 100644 --- a/network-hypervisor/src/vl1/node.rs +++ b/network-hypervisor/src/vl1/node.rs @@ -25,13 +25,12 @@ use zerotier_utils::gate::IntervalGate; use zerotier_utils::hex; use zerotier_utils::marshalable::Marshalable; use zerotier_utils::ringbuffer::RingBuffer; -use zerotier_utils::thing::Thing; /// Interface trait to be implemented by code that's using the ZeroTier network hypervisor. /// /// This is analogous to a C struct full of function pointers to callbacks along with some /// associated type definitions. -pub trait ApplicationLayer: Sync + Send { +pub trait ApplicationLayer: Sync + Send + 'static { /// Type for local system sockets. type LocalSocket: Sync + Send + Hash + PartialEq + Eq + Clone + ToString + Sized + 'static; @@ -56,6 +55,9 @@ pub trait ApplicationLayer: Sync + Send { /// unbound, etc. fn local_socket_is_valid(&self, socket: &Self::LocalSocket) -> bool; + /// Check if this node should respond to messages from a given peer at all. + fn should_respond_to(&self, id: &Valid) -> bool; + /// Called to send a packet over the physical network (virtual -> physical). /// /// This sends with UDP-like semantics. It should do whatever best effort it can and return. @@ -130,13 +132,6 @@ pub enum PacketHandlerResult { /// it could also be implemented for testing or "off label" use of VL1 to carry different protocols. #[allow(unused)] pub trait InnerProtocolLayer: Sync + Send { - /// Check if this node should respond to messages from a given peer at all. - /// - /// The default implementation always returns true. - fn should_respond_to(&self, id: &Valid) -> bool { - true - } - /// Handle a packet, returning true if it was handled by the next layer. /// /// Do not attempt to handle OK or ERROR. Instead implement handle_ok() and handle_error(). @@ -144,9 +139,9 @@ pub trait InnerProtocolLayer: Sync + Send { fn handle_packet( &self, app: &Application, - node: &Node, - source: &Arc, - source_path: &Arc, + node: &Node, + source: &Arc>, + source_path: &Arc>, source_hops: u8, message_id: u64, verb: u8, @@ -161,9 +156,9 @@ pub trait InnerProtocolLayer: Sync + Send { fn handle_error( &self, app: &Application, - node: &Node, - source: &Arc, - source_path: &Arc, + node: &Node, + source: &Arc>, + source_path: &Arc>, source_hops: u8, message_id: u64, in_re_verb: u8, @@ -180,9 +175,9 @@ pub trait InnerProtocolLayer: Sync + Send { fn handle_ok( &self, app: &Application, - node: &Node, - source: &Arc, - source_path: &Arc, + node: &Node, + source: &Arc>, + source_path: &Arc>, source_hops: u8, message_id: u64, in_re_verb: u8, @@ -194,12 +189,12 @@ pub trait InnerProtocolLayer: Sync + Send { } } -struct RootInfo { +struct RootInfo { /// Root sets to which we are a member. sets: HashMap>, /// Root peers and their statically defined endpoints (from root sets). - roots: HashMap, Vec>, + roots: HashMap>, Vec>, /// If this node is a root, these are the root sets to which it's a member in binary serialized form. /// Set to None if this node is not a root, meaning it doesn't appear in any of its root sets. @@ -225,17 +220,15 @@ struct BackgroundTaskIntervals { whois_queue_retry: IntervalGate<{ WHOIS_RETRY_INTERVAL }>, } -struct WhoisQueueItem { - v1_proto_waiting_packets: RingBuffer<(Weak, PooledPacketBuffer), WHOIS_MAX_WAITING_PACKETS>, +struct WhoisQueueItem { + v1_proto_waiting_packets: + RingBuffer<(Weak>, PooledPacketBuffer), WHOIS_MAX_WAITING_PACKETS>, last_retry_time: i64, retry_count: u16, } -const PATH_MAP_SIZE: usize = std::mem::size_of::() + 128], Arc>>(); -type PathMap = HashMap, Arc>; - /// A ZeroTier VL1 node that can communicate securely with the ZeroTier peer-to-peer network. -pub struct Node { +pub struct Node { /// A random ID generated to identify this particular running instance. pub instance_id: [u8; 16], @@ -246,27 +239,23 @@ pub struct Node { intervals: Mutex, /// Canonicalized network paths, held as Weak<> to be automatically cleaned when no longer in use. - paths: RwLock>, // holds a PathMap<> but as a Thing<> to hide ApplicationLayer template parameter + paths: RwLock, Arc>>>, /// Peers with which we are currently communicating. - peers: RwLock>>, + peers: RwLock>>>, /// This node's trusted roots, sorted in ascending order of quality/preference, and cluster definitions. - roots: RwLock, + roots: RwLock>, /// Current best root. - best_root: RwLock>>, + best_root: RwLock>>>, /// Queue of identities being looked up. - whois_queue: Mutex>, + whois_queue: Mutex>>, } -impl Node { - pub fn new( - app: &Application, - auto_generate_identity: bool, - auto_upgrade_identity: bool, - ) -> Result { +impl Node { + pub fn new(app: &Application, auto_generate_identity: bool, auto_upgrade_identity: bool) -> Result { let mut id = { let id = app.load_node_identity(); if id.is_none() { @@ -297,7 +286,7 @@ impl Node { instance_id: random::get_bytes_secure(), identity: id, intervals: Mutex::new(BackgroundTaskIntervals::default()), - paths: RwLock::new(Thing::new(PathMap::::new())), + paths: RwLock::new(HashMap::new()), peers: RwLock::new(HashMap::new()), roots: RwLock::new(RootInfo { sets: HashMap::new(), @@ -312,7 +301,7 @@ impl Node { } #[inline] - pub fn peer(&self, a: Address) -> Option> { + pub fn peer(&self, a: Address) -> Option>> { self.peers.read().unwrap().get(&a).cloned() } @@ -323,13 +312,13 @@ impl Node { /// Get the current "best" root from among this node's trusted roots. #[inline] - pub fn best_root(&self) -> Option> { + pub fn best_root(&self) -> Option>> { self.best_root.read().unwrap().clone() } /// Check whether a peer is a root according to any root set trusted by this node. #[inline] - pub fn is_peer_root(&self, peer: &Peer) -> bool { + pub fn is_peer_root(&self, peer: &Peer) -> bool { self.roots.read().unwrap().roots.keys().any(|p| p.identity.eq(&peer.identity)) } @@ -380,7 +369,7 @@ impl Node { self.roots.read().unwrap().sets.values().cloned().map(|s| s.remove_typestate()).collect() } - pub fn do_background_tasks(&self, app: &Application) -> Duration { + pub fn do_background_tasks(&self, app: &Application) -> Duration { const INTERVAL_MS: i64 = 1000; const INTERVAL: Duration = Duration::from_millis(INTERVAL_MS as u64); let time_ticks = app.time_ticks(); @@ -611,7 +600,7 @@ impl Node { let mut need_keepalive = Vec::new(); // First check all paths in read mode to avoid blocking the entire node. - for (k, path) in self.paths.read().unwrap().get::>().iter() { + for (k, path) in self.paths.read().unwrap().iter() { if app.local_socket_is_valid(k.local_socket()) { match path.service(time_ticks) { PathServiceResult::Ok => {} @@ -625,19 +614,13 @@ impl Node { // Lock in write mode and remove dead paths, doing so piecemeal to again avoid blocking. for dp in dead_paths.iter() { - self.paths.write().unwrap().get_mut::>().remove(dp); + self.paths.write().unwrap().remove(dp); } // Finally run keepalive sends as a batch. let keepalive_buf = [time_ticks as u8]; // just an arbitrary byte, no significance for p in need_keepalive.iter() { - app.wire_send( - &p.endpoint, - Some(p.local_socket::()), - Some(p.local_interface::()), - &keepalive_buf, - 0, - ); + app.wire_send(&p.endpoint, Some(&p.local_socket), Some(&p.local_interface), &keepalive_buf, 0); } } @@ -663,7 +646,7 @@ impl Node { INTERVAL } - pub fn handle_incoming_physical_packet( + pub fn handle_incoming_physical_packet( &self, app: &Application, inner: &Inner, @@ -685,14 +668,7 @@ impl Node { source_local_interface.to_string() ); - // An 0xff value at byte [8] means this is a ZSSP packet. This is accomplished via the - // backward compatibility hack of always having 0xff at byte [4] of 6-byte session IDs - // and by having 0xffffffffffff be the "nil" session ID for session init packets. ZSSP - // is the new V2 Noise-based forward-secure transport protocol. What follows below this - // is legacy handling of the old v1 protocol. - if packet.u8_at(8).map_or(false, |x| x == 0xff) { - todo!(); - } + // TODO: detect inbound ZSSP sessions, handle ZSSP mode. // Legacy ZeroTier V1 packet handling if let Ok(fragment_header) = packet.struct_mut_at::(0) { @@ -701,7 +677,7 @@ impl Node { if dest == self.identity.address { let fragment_header = &*fragment_header; // discard mut - let path = self.canonical_path::(source_endpoint, source_local_socket, source_local_interface, time_ticks); + let path = self.canonical_path(source_endpoint, source_local_socket, source_local_interface, time_ticks); path.log_receive_anything(time_ticks); if fragment_header.is_fragment() { @@ -816,8 +792,8 @@ impl Node { if let Some(forward_path) = peer.direct_path() { app.wire_send( &forward_path.endpoint, - Some(forward_path.local_socket::()), - Some(forward_path.local_interface::()), + Some(&forward_path.local_socket), + Some(&forward_path.local_interface), packet.as_bytes(), 0, ); @@ -834,11 +810,11 @@ impl Node { } /// Enqueue and send a WHOIS query for a given address, adding the supplied packet (if any) to the list to be processed on reply. - fn whois( + fn whois( &self, app: &Application, address: Address, - waiting_packet: Option<(Weak, PooledPacketBuffer)>, + waiting_packet: Option<(Weak>, PooledPacketBuffer)>, time_ticks: i64, ) { { @@ -862,7 +838,7 @@ impl Node { } /// Send a WHOIS query to the current best root. - fn send_whois(&self, app: &Application, mut addresses: &[Address], time_ticks: i64) { + fn send_whois(&self, app: &Application, mut addresses: &[Address], time_ticks: i64) { debug_assert!(!addresses.is_empty()); debug_event!(app, "[vl1] [v1] sending WHOIS for {}", { let mut tmp = String::new(); @@ -894,7 +870,7 @@ impl Node { } /// Called by Peer when an identity is received from another node, e.g. via OK(WHOIS). - pub(crate) fn handle_incoming_identity( + pub(crate) fn handle_incoming_identity( &self, app: &Application, inner: &Inner, @@ -907,7 +883,7 @@ impl Node { let mut whois_queue = self.whois_queue.lock().unwrap(); if let Some(qi) = whois_queue.get_mut(&received_identity.address) { let address = received_identity.address; - if inner.should_respond_to(&received_identity) { + if app.should_respond_to(&received_identity) { let mut peers = self.peers.write().unwrap(); if let Some(peer) = peers.get(&address).cloned().or_else(|| { Peer::new(&self.identity, received_identity, time_ticks) @@ -946,31 +922,23 @@ impl Node { } /// Get the canonical Path object corresponding to an endpoint. - pub(crate) fn canonical_path( + pub(crate) fn canonical_path( &self, ep: &Endpoint, local_socket: &Application::LocalSocket, local_interface: &Application::LocalInterface, time_ticks: i64, - ) -> Arc { + ) -> Arc> { let paths = self.paths.read().unwrap(); - if let Some(path) = paths.get::>().get(&PathKey::Ref(ep, local_socket)) { + if let Some(path) = paths.get(&PathKey::Ref(ep, local_socket)) { path.clone() } else { drop(paths); self.paths .write() .unwrap() - .get_mut::>() .entry(PathKey::Copied(ep.clone(), local_socket.clone())) - .or_insert_with(|| { - Arc::new(Path::new::( - ep.clone(), - local_socket.clone(), - local_interface.clone(), - time_ticks, - )) - }) + .or_insert_with(|| Arc::new(Path::new(ep.clone(), local_socket.clone(), local_interface.clone(), time_ticks))) .clone() } } diff --git a/network-hypervisor/src/vl1/path.rs b/network-hypervisor/src/vl1/path.rs index 5a59364ec..da32ae857 100644 --- a/network-hypervisor/src/vl1/path.rs +++ b/network-hypervisor/src/vl1/path.rs @@ -7,10 +7,8 @@ use std::sync::Mutex; use crate::protocol; use crate::vl1::endpoint::Endpoint; -use crate::vl1::node::*; use zerotier_crypto::random; -use zerotier_utils::thing::Thing; use zerotier_utils::NEVER_HAPPENED_TICKS; pub(crate) const SERVICE_INTERVAL_MS: i64 = protocol::PATH_KEEPALIVE_INTERVAL; @@ -26,27 +24,22 @@ pub(crate) enum PathServiceResult { /// These are maintained in Node and canonicalized so that all unique paths have /// one and only one unique path object. That enables statistics to be tracked /// for them and uniform application of things like keepalives. -pub struct Path { +pub struct Path { pub endpoint: Endpoint, - local_socket: Thing<64>, - local_interface: Thing<64>, + pub local_socket: LocalSocket, + pub local_interface: LocalInterface, last_send_time_ticks: AtomicI64, last_receive_time_ticks: AtomicI64, create_time_ticks: i64, fragmented_packets: Mutex>, } -impl Path { - pub(crate) fn new( - endpoint: Endpoint, - local_socket: Application::LocalSocket, - local_interface: Application::LocalInterface, - time_ticks: i64, - ) -> Self { +impl Path { + pub(crate) fn new(endpoint: Endpoint, local_socket: LocalSocket, local_interface: LocalInterface, time_ticks: i64) -> Self { Self { endpoint, - local_socket: Thing::new(local_socket), // enlarge Thing<> if this panics - local_interface: Thing::new(local_interface), // enlarge Thing<> if this panics + local_socket, + local_interface, last_send_time_ticks: AtomicI64::new(NEVER_HAPPENED_TICKS), last_receive_time_ticks: AtomicI64::new(NEVER_HAPPENED_TICKS), create_time_ticks: time_ticks, @@ -54,16 +47,6 @@ impl Path { } } - #[inline(always)] - pub(crate) fn local_socket(&self) -> &Application::LocalSocket { - self.local_socket.get() - } - - #[inline(always)] - pub(crate) fn local_interface(&self) -> &Application::LocalInterface { - self.local_interface.get() - } - /// Receive a fragment and return a FragmentedPacket if the entire packet was assembled. /// This returns None if more fragments are needed to assemble the packet. pub(crate) fn v1_proto_receive_fragment( diff --git a/network-hypervisor/src/vl1/peer.rs b/network-hypervisor/src/vl1/peer.rs index 1b2bc3e86..774b5a8b0 100644 --- a/network-hypervisor/src/vl1/peer.rs +++ b/network-hypervisor/src/vl1/peer.rs @@ -24,11 +24,11 @@ use crate::{VERSION_MAJOR, VERSION_MINOR, VERSION_REVISION}; pub(crate) const SERVICE_INTERVAL_MS: i64 = 10000; -pub struct Peer { +pub struct Peer { pub identity: Valid, v1_proto_static_secret: v1::SymmetricSecret, - paths: Mutex>, + paths: Mutex>>, pub(crate) last_send_time_ticks: AtomicI64, pub(crate) last_receive_time_ticks: AtomicI64, @@ -41,8 +41,8 @@ pub struct Peer { remote_node_info: RwLock, } -struct PeerPath { - path: Weak, +struct PeerPath { + path: Weak>, last_receive_time_ticks: i64, } @@ -53,11 +53,11 @@ struct RemoteNodeInfo { } /// Sort a list of paths by quality or priority, with best paths first. -fn prioritize_paths(paths: &mut Vec) { +fn prioritize_paths(paths: &mut Vec>) { paths.sort_unstable_by(|a, b| a.last_receive_time_ticks.cmp(&b.last_receive_time_ticks).reverse()); } -impl Peer { +impl Peer { /// Create a new peer. /// /// This only returns None if this_node_identity does not have its secrets or if some @@ -113,7 +113,7 @@ impl Peer { /// Get current best path or None if there are no direct paths to this peer. #[inline] - pub fn direct_path(&self) -> Option> { + pub fn direct_path(&self) -> Option>> { for p in self.paths.lock().unwrap().iter() { let pp = p.path.upgrade(); if pp.is_some() { @@ -125,7 +125,7 @@ impl Peer { /// Get either the current best direct path or an indirect path via e.g. a root. #[inline] - pub fn path(&self, node: &Node) -> Option> { + pub fn path(&self, node: &Node) -> Option>> { let direct_path = self.direct_path(); if direct_path.is_some() { return direct_path; @@ -136,7 +136,7 @@ impl Peer { return None; } - fn learn_path(&self, app: &Application, new_path: &Arc, time_ticks: i64) { + fn learn_path(&self, app: &Application, new_path: &Arc>, time_ticks: i64) { let mut paths = self.paths.lock().unwrap(); // TODO: check path filter @@ -202,7 +202,7 @@ impl Peer { } /// Called every SERVICE_INTERVAL_MS by the background service loop in Node. - pub(crate) fn service(&self, _: &Application, _: &Node, time_ticks: i64) -> bool { + pub(crate) fn service(&self, _: &Application, _: &Node, time_ticks: i64) -> bool { // Prune dead paths and sort in descending order of quality. { let mut paths = self.paths.lock().unwrap(); @@ -223,7 +223,7 @@ impl Peer { } /// Send a prepared and encrypted packet using the V1 protocol with fragmentation if needed. - fn v1_proto_internal_send( + fn v1_proto_internal_send( &self, app: &Application, endpoint: &Endpoint, @@ -281,11 +281,11 @@ impl Peer { /// The builder function must append the verb (with any verb flags) and packet payload. If it returns /// an error, the error is returned immediately and the send is aborted. None is returned if the send /// function itself fails for some reason such as no paths being available. - pub fn send Result>( + pub fn send Result>( &self, app: &Application, - node: &Node, - path: Option<&Arc>, + node: &Node, + path: Option<&Arc>>, time_ticks: i64, builder_function: BuilderFunction, ) -> Option> { @@ -369,8 +369,8 @@ impl Peer { self.v1_proto_internal_send( app, &path.endpoint, - Some(path.local_socket::()), - Some(path.local_interface::()), + Some(&path.local_socket), + Some(&path.local_interface), max_fragment_size, packet, ); @@ -389,12 +389,7 @@ impl Peer { /// Unlike other messages HELLO is sent partially in the clear and always with the long-lived /// static identity key. Authentication in old versions is via Poly1305 and in new versions /// via HMAC-SHA512. - pub(crate) fn send_hello( - &self, - app: &Application, - node: &Node, - explicit_endpoint: Option<&Endpoint>, - ) -> bool { + pub(crate) fn send_hello(&self, app: &Application, node: &Node, explicit_endpoint: Option<&Endpoint>) -> bool { let mut path = None; let destination = if let Some(explicit_endpoint) = explicit_endpoint { explicit_endpoint @@ -454,8 +449,8 @@ impl Peer { self.v1_proto_internal_send( app, destination, - Some(p.local_socket::()), - Some(p.local_interface::()), + Some(&p.local_socket), + Some(&p.local_interface), max_fragment_size, packet, ); @@ -473,13 +468,13 @@ impl Peer { /// those fragments after the main packet header and first chunk. /// /// This returns true if the packet decrypted and passed authentication. - pub(crate) fn v1_proto_receive( + pub(crate) fn v1_proto_receive( self: &Arc, - node: &Node, + node: &Node, app: &Application, inner: &Inner, time_ticks: i64, - source_path: &Arc, + source_path: &Arc>, packet_header: &v1::PacketHeader, frag0: &PacketBuffer, fragments: &[Option], @@ -539,7 +534,7 @@ impl Peer { return match verb { message_type::VL1_NOP => PacketHandlerResult::Ok, - message_type::VL1_HELLO => self.handle_incoming_hello(app, inner, node, time_ticks, message_id, source_path, &payload), + message_type::VL1_HELLO => self.handle_incoming_hello(app, node, time_ticks, message_id, source_path, &payload), message_type::VL1_ERROR => { self.handle_incoming_error(app, inner, node, time_ticks, source_path, packet_header.hops(), message_id, &payload) } @@ -554,9 +549,9 @@ impl Peer { path_is_known, &payload, ), - message_type::VL1_WHOIS => self.handle_incoming_whois(app, inner, node, time_ticks, message_id, &payload), + message_type::VL1_WHOIS => self.handle_incoming_whois(app, node, time_ticks, message_id, &payload), message_type::VL1_RENDEZVOUS => self.handle_incoming_rendezvous(app, node, time_ticks, message_id, source_path, &payload), - message_type::VL1_ECHO => self.handle_incoming_echo(app, inner, node, time_ticks, message_id, &payload), + message_type::VL1_ECHO => self.handle_incoming_echo(app, node, time_ticks, message_id, &payload), message_type::VL1_PUSH_DIRECT_PATHS => self.handle_incoming_push_direct_paths(app, node, time_ticks, source_path, &payload), message_type::VL1_USER_MESSAGE => self.handle_incoming_user_message(app, node, time_ticks, source_path, &payload), _ => inner.handle_packet(app, node, self, &source_path, packet_header.hops(), message_id, verb, &payload, 1), @@ -567,17 +562,16 @@ impl Peer { return PacketHandlerResult::Error; } - fn handle_incoming_hello( + fn handle_incoming_hello( &self, app: &Application, - inner: &Inner, - node: &Node, + node: &Node, time_ticks: i64, message_id: MessageId, - source_path: &Arc, + source_path: &Arc>, payload: &PacketBuffer, ) -> PacketHandlerResult { - if !(inner.should_respond_to(&self.identity) || node.this_node_is_root() || node.is_peer_root(self)) { + if !(app.should_respond_to(&self.identity) || node.this_node_is_root() || node.is_peer_root(self)) { debug_event!( app, "[vl1] dropping HELLO from {} due to lack of trust relationship", @@ -621,13 +615,13 @@ impl Peer { return PacketHandlerResult::Error; } - fn handle_incoming_error( + fn handle_incoming_error( self: &Arc, app: &Application, inner: &Inner, - node: &Node, + node: &Node, _time_ticks: i64, - source_path: &Arc, + source_path: &Arc>, source_hops: u8, message_id: u64, payload: &PacketBuffer, @@ -659,13 +653,13 @@ impl Peer { return PacketHandlerResult::Error; } - fn handle_incoming_ok( + fn handle_incoming_ok( self: &Arc, app: &Application, inner: &Inner, - node: &Node, + node: &Node, time_ticks: i64, - source_path: &Arc, + source_path: &Arc>, source_hops: u8, message_id: u64, path_is_known: bool, @@ -761,16 +755,15 @@ impl Peer { return PacketHandlerResult::Error; } - fn handle_incoming_whois( + fn handle_incoming_whois( self: &Arc, app: &Application, - inner: &Inner, - node: &Node, + node: &Node, time_ticks: i64, _message_id: MessageId, payload: &PacketBuffer, ) -> PacketHandlerResult { - if node.this_node_is_root() || inner.should_respond_to(&self.identity) { + if node.this_node_is_root() || app.should_respond_to(&self.identity) { let mut addresses = payload.as_bytes(); while addresses.len() >= ADDRESS_SIZE { if !self @@ -794,29 +787,28 @@ impl Peer { return PacketHandlerResult::Ok; } - fn handle_incoming_rendezvous( + fn handle_incoming_rendezvous( self: &Arc, _app: &Application, - node: &Node, + node: &Node, _time_ticks: i64, _message_id: MessageId, - _source_path: &Arc, + _source_path: &Arc>, _payload: &PacketBuffer, ) -> PacketHandlerResult { if node.is_peer_root(self) {} return PacketHandlerResult::Ok; } - fn handle_incoming_echo( + fn handle_incoming_echo( &self, app: &Application, - inner: &Inner, - node: &Node, + node: &Node, time_ticks: i64, message_id: MessageId, payload: &PacketBuffer, ) -> PacketHandlerResult { - if inner.should_respond_to(&self.identity) || node.is_peer_root(self) { + if app.should_respond_to(&self.identity) || node.is_peer_root(self) { self.send(app, node, None, time_ticks, |packet| { let mut f: &mut OkHeader = packet.append_struct_get_mut().unwrap(); f.verb = message_type::VL1_OK; @@ -834,44 +826,44 @@ impl Peer { return PacketHandlerResult::Ok; } - fn handle_incoming_push_direct_paths( + fn handle_incoming_push_direct_paths( self: &Arc, _app: &Application, - _node: &Node, + _node: &Node, _time_ticks: i64, - _source_path: &Arc, + _source_path: &Arc>, _payload: &PacketBuffer, ) -> PacketHandlerResult { PacketHandlerResult::Ok } - fn handle_incoming_user_message( + fn handle_incoming_user_message( self: &Arc, _app: &Application, - _node: &Node, + _node: &Node, _time_ticks: i64, - _source_path: &Arc, + _source_path: &Arc>, _payload: &PacketBuffer, ) -> PacketHandlerResult { PacketHandlerResult::Ok } } -impl Hash for Peer { +impl Hash for Peer { #[inline(always)] fn hash(&self, state: &mut H) { state.write_u64(self.identity.address.into()); } } -impl PartialEq for Peer { +impl PartialEq for Peer { #[inline(always)] fn eq(&self, other: &Self) -> bool { self.identity.fingerprint.eq(&other.identity.fingerprint) } } -impl Eq for Peer {} +impl Eq for Peer {} fn v1_proto_try_aead_decrypt( secret: &v1::SymmetricSecret, diff --git a/network-hypervisor/src/vl2/multicastauthority.rs b/network-hypervisor/src/vl2/multicastauthority.rs index c253ca4cc..d81f40090 100644 --- a/network-hypervisor/src/vl2/multicastauthority.rs +++ b/network-hypervisor/src/vl2/multicastauthority.rs @@ -3,19 +3,13 @@ use std::sync::{Arc, Mutex, RwLock}; use crate::protocol; use crate::protocol::PacketBuffer; -use crate::vl1::{Address, ApplicationLayer, Identity, Node, PacketHandlerResult, Peer, MAC}; +use crate::vl1::*; use crate::vl2::{MulticastGroup, NetworkId}; use zerotier_utils::buffer::OutOfBoundsError; use zerotier_utils::sync::RMaybeWLockGuard; /// Handler implementations for VL2_MULTICAST_LIKE and VL2_MULTICAST_GATHER. -/// -/// Both controllers and roots will want to handle these, with the latter supporting them for legacy -/// reasons only. Regular nodes may also want to handle them in the future. So, break this out to allow -/// easy code reuse. To integrate call the appropriate method when the appropriate message type is -/// received and pass in a function to check whether specific network/identity combinations should be -/// processed. The GATHER implementation will send reply packets to the source peer. pub struct MulticastAuthority { subscriptions: RwLock>>>, } @@ -47,11 +41,11 @@ impl MulticastAuthority { } /// Call for VL2_MULTICAST_LIKE packets. - pub fn handle_vl2_multicast_like bool>( + pub fn handle_vl2_multicast_like bool>( &self, auth: Authenticator, time_ticks: i64, - source: &Arc, + source: &Arc>, payload: &PacketBuffer, mut cursor: usize, ) -> PacketHandlerResult { @@ -84,13 +78,13 @@ impl MulticastAuthority { } /// Call for VL2_MULTICAST_GATHER packets. - pub fn handle_vl2_multicast_gather bool>( + pub fn handle_vl2_multicast_gather bool>( &self, auth: Authenticator, time_ticks: i64, - host_system: &HostSystemImpl, - node: &Node, - source: &Arc, + app: &Application, + node: &Node, + source: &Arc>, message_id: u64, payload: &PacketBuffer, mut cursor: usize, @@ -114,7 +108,7 @@ impl MulticastAuthority { } while !gathered.is_empty() { - source.send(host_system, node, None, time_ticks, |packet| -> Result<(), OutOfBoundsError> { + source.send(app, node, None, time_ticks, |packet| -> Result<(), OutOfBoundsError> { let ok_header = packet.append_struct_get_mut::()?; ok_header.verb = protocol::message_type::VL1_OK; ok_header.in_re_verb = protocol::message_type::VL2_MULTICAST_GATHER; diff --git a/network-hypervisor/src/vl2/switch.rs b/network-hypervisor/src/vl2/switch.rs index 3e2a3f32f..1dd807421 100644 --- a/network-hypervisor/src/vl2/switch.rs +++ b/network-hypervisor/src/vl2/switch.rs @@ -14,9 +14,9 @@ impl InnerProtocolLayer for Switch { fn handle_packet( &self, app: &Application, - node: &Node, - source: &Arc, - source_path: &Arc, + node: &Node, + source: &Arc>, + source_path: &Arc>, source_hops: u8, message_id: u64, verb: u8, @@ -29,9 +29,9 @@ impl InnerProtocolLayer for Switch { fn handle_error( &self, app: &Application, - node: &Node, - source: &Arc, - source_path: &Arc, + node: &Node, + source: &Arc>, + source_path: &Arc>, source_hops: u8, message_id: u64, in_re_verb: u8, @@ -46,9 +46,9 @@ impl InnerProtocolLayer for Switch { fn handle_ok( &self, app: &Application, - node: &Node, - source: &Arc, - source_path: &Arc, + node: &Node, + source: &Arc>, + source_path: &Arc>, source_hops: u8, message_id: u64, in_re_verb: u8, diff --git a/utils/src/cast.rs b/utils/src/cast.rs new file mode 100644 index 000000000..61c3d4124 --- /dev/null +++ b/utils/src/cast.rs @@ -0,0 +1,33 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + * + * (c) ZeroTier, Inc. + * https://www.zerotier.com/ + */ + +use std::any::TypeId; +use std::mem::size_of; + +#[inline(always)] +pub fn same_type() -> bool { + TypeId::of::() == TypeId::of::() && size_of::() == size_of::() +} + +#[inline(always)] +pub fn cast_ref(u: &U) -> Option<&V> { + if same_type::() { + Some(unsafe { std::mem::transmute::<&U, &V>(u) }) + } else { + None + } +} + +#[inline(always)] +pub fn cast_mut(u: &mut U) -> Option<&mut V> { + if same_type::() { + Some(unsafe { std::mem::transmute::<&mut U, &mut V>(u) }) + } else { + None + } +} diff --git a/utils/src/gate.rs b/utils/src/gate.rs index 037edcc26..e3a8aa6bb 100644 --- a/utils/src/gate.rs +++ b/utils/src/gate.rs @@ -33,36 +33,3 @@ impl IntervalGate { } } } - -/* -/// Boolean rate limiter with atomic semantics. -#[repr(transparent)] -pub struct AtomicIntervalGate(AtomicI64); - -impl Default for AtomicIntervalGate { - #[inline(always)] - fn default() -> Self { - Self(AtomicI64::new(crate::util::NEVER_HAPPENED_TICKS)) - } -} - -impl AtomicIntervalGate { - #[inline(always)] - #[allow(unused)] - pub fn new(initial_ts: i64) -> Self { - Self(AtomicI64::new(initial_ts)) - } - - #[inline(always)] - #[allow(unused)] - pub fn gate(&self, mut time: i64) -> bool { - let prev_time = self.0.load(Ordering::Acquire); - if (time - prev_time) < FREQ { - false - } else { - self.0.store(time, Ordering::Release); - true - } - } -} -*/ diff --git a/utils/src/lib.rs b/utils/src/lib.rs index 795f172d8..4158587d0 100644 --- a/utils/src/lib.rs +++ b/utils/src/lib.rs @@ -9,6 +9,7 @@ pub mod arrayvec; pub mod blob; pub mod buffer; +pub mod cast; pub mod defer; pub mod dictionary; pub mod error; @@ -26,7 +27,6 @@ pub mod pool; pub mod reaper; pub mod ringbuffer; pub mod sync; -pub mod thing; pub mod varint; #[cfg(feature = "tokio")] diff --git a/vl1-service/src/vl1service.rs b/vl1-service/src/vl1service.rs index f7f8c6669..32450c8a9 100644 --- a/vl1-service/src/vl1service.rs +++ b/vl1-service/src/vl1service.rs @@ -37,7 +37,7 @@ pub struct VL1Service { vl1_data_storage: Arc, inner: Arc, buffer_pool: Arc, - node_container: Option, // never None, set in new() + node_container: Option>, // never None, set in new() } struct VL1ServiceMutableState { @@ -79,7 +79,7 @@ impl VL1Service { } #[inline(always)] - pub fn node(&self) -> &Node { + pub fn node(&self) -> &Node { debug_assert!(self.node_container.is_some()); unsafe { self.node_container.as_ref().unwrap_unchecked() } } @@ -216,6 +216,12 @@ impl ApplicationLayer for VL1Servi socket.is_valid() } + #[inline] + fn should_respond_to(&self, _: &Valid) -> bool { + // TODO: provide a way for the user of VL1Service to control this + true + } + #[inline] fn load_node_identity(&self) -> Option> { self.vl1_data_storage.load_node_identity()