From 1670a3aa316aa743ae8c793e745250c5365e3fc7 Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Tue, 28 Mar 2023 08:58:39 -0400 Subject: [PATCH] Split off VL1 API traits into a separate file, remove ?Sized from some traits as it is unnecessary and causes issues, cleanup. --- controller/src/controller.rs | 135 +++++------- controller/src/main.rs | 5 +- controller/src/model/mod.rs | 4 - network-hypervisor/src/vl1/api.rs | 175 +++++++++++++++ network-hypervisor/src/vl1/mod.rs | 4 +- network-hypervisor/src/vl1/node.rs | 208 +++--------------- network-hypervisor/src/vl1/peer.rs | 31 +-- network-hypervisor/src/vl1/peermap.rs | 6 +- network-hypervisor/src/vl1/whois.rs | 11 +- .../src/vl2/multicastauthority.rs | 4 +- network-hypervisor/src/vl2/switch.rs | 6 +- vl1-service/src/vl1service.rs | 21 +- 12 files changed, 304 insertions(+), 306 deletions(-) create mode 100644 network-hypervisor/src/vl1/api.rs diff --git a/controller/src/controller.rs b/controller/src/controller.rs index d983b90ae..a6736c342 100644 --- a/controller/src/controller.rs +++ b/controller/src/controller.rs @@ -31,7 +31,6 @@ const REQUEST_TIMEOUT: Duration = Duration::from_secs(10); /// ZeroTier VL2 network controller packet handler, answers VL2 netconf queries. pub struct Controller { self_ref: Weak, - service: RwLock>>, reaper: Reaper, runtime: tokio::runtime::Handle, database: Arc, @@ -49,40 +48,25 @@ pub struct Controller { } impl Controller { - /* - /// Start an inner protocol handler answer ZeroTier VL2 network controller queries. - /// - /// The start() method must be called once the service this will run within is also created. - pub async fn new(database: Arc, runtime: tokio::runtime::Handle) -> Result, Box> { - if let Some(local_identity) = database.load_node_identity() { - assert!(local_identity.secret.is_some()); - Ok(Arc::new_cyclic(|r| Self { - self_ref: r.clone(), - service: RwLock::new(Weak::default()), - reaper: Reaper::new(&runtime), - runtime, - database: database.clone(), - multicast_authority: MulticastAuthority::new(), - daemons: Mutex::new(Vec::with_capacity(2)), - recently_authorized: RwLock::new(HashMap::new()), - })) - } else { - Err(Box::new(InvalidParameterError("local controller's identity not readable by database"))) - } - } + pub async fn new( + runtime: tokio::runtime::Handle, + local_identity: IdentitySecret, + database: Arc, + ) -> Result, Box> { + let c = Arc::new_cyclic(|self_ref| Self { + self_ref: self_ref.clone(), + reaper: Reaper::new(&runtime), + runtime, + database: database.clone(), + local_identity, + multicast_authority: MulticastAuthority::new(), + daemons: Mutex::new(Vec::with_capacity(2)), + recently_authorized: RwLock::new(HashMap::new()), + }); - /// Set the service and HostSystem implementation for this controller and start daemons. - /// - /// This must be called once the service that uses this handler is up or the controller - /// won't actually do anything. The controller holds a weak reference to VL1Service so - /// be sure it's not dropped. - pub async fn start(&self, service: &Arc>) { - *self.service.write().unwrap() = Arc::downgrade(service); - - // Create database change listener. - if let Some(cw) = self.database.changes().await.map(|mut ch| { - let self2 = self.self_ref.clone(); - self.runtime.spawn(async move { + if let Some(cw) = c.database.changes().await.map(|mut ch| { + let self2 = c.self_ref.clone(); + c.runtime.spawn(async move { loop { if let Ok(change) = ch.recv().await { if let Some(self2) = self2.upgrade() { @@ -97,12 +81,11 @@ impl Controller { } }) }) { - self.daemons.lock().unwrap().push(cw); + c.daemons.lock().unwrap().push(cw); } - // Create background task to expire multicast subscriptions and recent authorizations. - let self2 = self.self_ref.clone(); - self.daemons.lock().unwrap().push(self.runtime.spawn(async move { + let self2 = c.self_ref.clone(); + c.daemons.lock().unwrap().push(c.runtime.spawn(async move { let sleep_duration = Duration::from_millis((protocol::VL2_DEFAULT_MULTICAST_LIKE_EXPIRE / 2).min(2500) as u64); loop { tokio::time::sleep(sleep_duration).await; @@ -119,8 +102,9 @@ impl Controller { } } })); + + Ok(c) } - */ /// Launched as a task when the DB informs us of a change. async fn handle_change_notification(self: Arc, change: Change) { @@ -139,7 +123,7 @@ impl Controller { } /// Compose and send network configuration packet (either V1 or V2) - fn send_network_config( + fn send_network_config( &self, app: &Application, node: &Node, @@ -176,8 +160,6 @@ impl Controller { packet.append_u64(config.network_id.to_legacy_u64())?; packet.append_u16(config_data.len() as u16)?; packet.append_bytes(config_data.as_slice())?; - - // TODO: for V1 we may need to introduce use of the chunking mechanism for large configs. } let new_payload_len = protocol::compress(&mut packet.as_bytes_mut()[payload_start..]); @@ -188,48 +170,40 @@ impl Controller { } /// Send one or more revocation object(s) to a peer. The provided vector is drained. - fn send_revocations(&self, peer: &Peer>, revocations: &mut Vec) { - if let Some(host_system) = self.service.read().unwrap().upgrade() { - let time_ticks = ms_monotonic(); - while !revocations.is_empty() { - let send_count = revocations.len().min(protocol::UDP_DEFAULT_MTU / 256); - debug_assert!(send_count <= (u16::MAX as usize)); - peer.send( - host_system.as_ref(), - &host_system.node, - None, - time_ticks, - |packet| -> Result<(), OutOfBoundsError> { - let payload_start = packet.len(); + fn send_revocations(&self, app: &Arc>, peer: &Peer>, revocations: &mut Vec) { + let time_ticks = ms_monotonic(); + while !revocations.is_empty() { + let send_count = revocations.len().min(protocol::UDP_DEFAULT_MTU / 256); + debug_assert!(send_count <= (u16::MAX as usize)); + peer.send(app.as_ref(), &app.node, None, time_ticks, |packet| -> Result<(), OutOfBoundsError> { + let payload_start = packet.len(); - packet.append_u8(protocol::message_type::VL2_NETWORK_CREDENTIALS)?; - packet.append_u8(0)?; - packet.append_u16(0)?; - packet.append_u16(0)?; - packet.append_u16(send_count as u16)?; - for _ in 0..send_count { - let r = revocations.pop().unwrap(); - packet.append_bytes(r.v1_proto_to_bytes(&self.local_identity.public.address).as_bytes())?; - } - packet.append_u16(0)?; + packet.append_u8(protocol::message_type::VL2_NETWORK_CREDENTIALS)?; + packet.append_u8(0)?; + packet.append_u16(0)?; + packet.append_u16(0)?; + packet.append_u16(send_count as u16)?; + for _ in 0..send_count { + let r = revocations.pop().unwrap(); + packet.append_bytes(r.v1_proto_to_bytes(&self.local_identity.public.address).as_bytes())?; + } + packet.append_u16(0)?; - let new_payload_len = protocol::compress(&mut packet.as_bytes_mut()[payload_start..]); - packet.set_size(payload_start + new_payload_len); + let new_payload_len = protocol::compress(&mut packet.as_bytes_mut()[payload_start..]); + packet.set_size(payload_start + new_payload_len); - Ok(()) - }, - ); - } + Ok(()) + }); } } - async fn deauthorize_member(&self, member: &Member) { + async fn deauthorize_member(&self, app: &Arc>, member: &Member) { let time_clock = ms_since_epoch(); let mut revocations = Vec::with_capacity(1); if let Ok(all_network_members) = self.database.list_members(&member.network_id).await { for m in all_network_members.iter() { if member.node_id != *m { - if let Some(peer) = self.service.read().unwrap().upgrade().and_then(|s| s.node.peer(m)) { + if let Some(peer) = app.node.peer(m) { revocations.clear(); revocations.push(Revocation::new( &member.network_id, @@ -239,7 +213,7 @@ impl Controller { &self.local_identity, false, )); - self.send_revocations(&peer, &mut revocations); + self.send_revocations(&app, &peer, &mut revocations); } } } @@ -258,7 +232,7 @@ impl Controller { source_identity: &Valid, network_id: &NetworkId, time_clock: i64, - ) -> Result<(AuthenticationResult, Option), Box> { + ) -> Result<(AuthenticationResult, Option>), Box> { let network = self.database.get_network(&network_id).await?; if network.is_none() { return Ok((AuthenticationResult::Rejected, None)); @@ -326,7 +300,7 @@ impl Controller { // Check and if necessary auto-assign static IPs for this member. member_changed |= network.assign_ip_addresses(self.database.as_ref(), &mut member).await; - let mut nc = NetworkConfig::new(network_id.clone(), source_identity.address.clone()); + let mut nc = Box::new(NetworkConfig::new(network_id.clone(), source_identity.address.clone())); nc.name = network.name.clone(); nc.private = network.private; @@ -434,7 +408,7 @@ impl Controller { } impl InnerProtocolLayer for Controller { - fn handle_packet( + fn handle_packet( &self, app: &Application, node: &Node, @@ -481,6 +455,8 @@ impl InnerProtocolLayer for Controller { }; // Launch handler as an async background task. + let app: &VL1Service = cast_ref(app).unwrap(); + let app = app.get(); let (self2, source, source_remote_endpoint) = (self.self_ref.upgrade().unwrap(), source.clone(), source_path.endpoint.clone()); self.reaper.add( self.runtime.spawn(async move { @@ -490,15 +466,13 @@ impl InnerProtocolLayer for Controller { let (result, config) = match self2.authorize(&source.identity, &network_id, now).await { Result::Ok((result, Some(config))) => { //println!("{}", serde_yaml::to_string(&config).unwrap()); - let app = self2.service.read().unwrap().upgrade().unwrap(); self2.send_network_config(app.as_ref(), &app.node, cast_ref(source.as_ref()).unwrap(), &config, Some(message_id)); (result, Some(config)) } Result::Ok((result, None)) => (result, None), Result::Err(e) => { #[cfg(debug_assertions)] - let host = self2.service.read().unwrap().clone().upgrade().unwrap(); - debug_event!(host, "[vl2] ERROR getting network config: {}", e.to_string()); + debug_event!(app, "[vl2] ERROR getting network config: {}", e.to_string()); return; } }; @@ -516,7 +490,6 @@ impl InnerProtocolLayer for Controller { source_remote_endpoint, source_hops, result, - config, }) .await; }), diff --git a/controller/src/main.rs b/controller/src/main.rs index 6c9d29f3c..ea058fb6f 100644 --- a/controller/src/main.rs +++ b/controller/src/main.rs @@ -5,18 +5,19 @@ use std::sync::Arc; use zerotier_network_controller::database::Database; use zerotier_network_controller::filedatabase::FileDatabase; use zerotier_network_controller::Controller; +use zerotier_network_hypervisor::vl1::identity::IdentitySecret; use zerotier_network_hypervisor::{VERSION_MAJOR, VERSION_MINOR, VERSION_REVISION}; use zerotier_utils::exitcode; use zerotier_utils::tokio::runtime::Runtime; use zerotier_vl1_service::VL1Service; -async fn run(database: Arc, runtime: &Runtime) -> i32 { +async fn run(identity: IdentitySecret, runtime: &Runtime) -> i32 { match Controller::new(database.clone(), runtime.handle().clone()).await { Err(err) => { eprintln!("FATAL: error initializing handler: {}", err.to_string()); exitcode::ERR_CONFIG } - Ok(handler) => match VL1Service::new(database.clone(), handler.clone(), zerotier_vl1_service::VL1Settings::default()) { + Ok(handler) => match VL1Service::new(identity, handler.clone(), zerotier_vl1_service::VL1Settings::default()) { Err(err) => { eprintln!("FATAL: error launching service: {}", err.to_string()); exitcode::ERR_IOERR diff --git a/controller/src/model/mod.rs b/controller/src/model/mod.rs index 455b0a0fa..fab148844 100644 --- a/controller/src/model/mod.rs +++ b/controller/src/model/mod.rs @@ -11,7 +11,6 @@ use std::collections::HashMap; use serde::{Deserialize, Serialize}; use zerotier_network_hypervisor::vl1::{Address, Endpoint}; -use zerotier_network_hypervisor::vl2::v1::networkconfig::NetworkConfig; use zerotier_network_hypervisor::vl2::NetworkId; #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] @@ -118,9 +117,6 @@ pub struct RequestLogItem { #[serde(rename = "r")] pub result: AuthenticationResult, - - #[serde(rename = "nc")] - pub config: Option, } impl ToString for RequestLogItem { diff --git a/network-hypervisor/src/vl1/api.rs b/network-hypervisor/src/vl1/api.rs new file mode 100644 index 000000000..ceb364655 --- /dev/null +++ b/network-hypervisor/src/vl1/api.rs @@ -0,0 +1,175 @@ +use std::hash::Hash; +use std::sync::Arc; + +use super::endpoint::Endpoint; +use super::event::Event; +use super::identity::Identity; +use super::node::Node; +use super::path::Path; +use super::peer::Peer; + +use crate::protocol::{PacketBuffer, PooledPacketBuffer}; +use zerotier_crypto::typestate::Valid; + +/// Interface trait to be implemented by code that's using the ZeroTier network hypervisor. +/// +/// This is analogous to a C struct full of function pointers to callbacks along with some +/// associated type definitions. +pub trait ApplicationLayer: Sync + Send + 'static { + /// Type for local system sockets. + type LocalSocket: Sync + Send + Hash + PartialEq + Eq + Clone + ToString + Sized + 'static; + + /// Type for local system interfaces. + type LocalInterface: Sync + Send + Hash + PartialEq + Eq + Clone + ToString + Sized + 'static; + + /// A VL1 level event occurred. + fn event(&self, event: Event); + + /// Get a pooled packet buffer for internal use. + fn get_buffer(&self) -> PooledPacketBuffer; + + /// Check a local socket for validity. + /// + /// This could return false if the socket's interface no longer exists, its port has been + /// unbound, etc. + fn local_socket_is_valid(&self, socket: &Self::LocalSocket) -> bool; + + /// Check if this node should respond to messages from a given peer at all. + /// + /// The default implementation always returns true. Typically this is what you want for a + /// controller or a root but not a regular node (unless required for backward compatibility). + #[allow(unused)] + fn should_respond_to(&self, id: &Valid) -> bool { + true + } + + /// Called to send a packet over the physical network (virtual -> physical). + /// + /// This sends with UDP-like semantics. It should do whatever best effort it can and return. + /// + /// If a local socket is specified the implementation should send from that socket or not + /// at all (returning false). If a local interface is specified the implementation should + /// send from all sockets on that interface. If neither is specified the packet may be + /// sent on all sockets or a random subset. + /// + /// For endpoint types that support a packet TTL, the implementation may set the TTL + /// if the 'ttl' parameter is not zero. If the parameter is zero or TTL setting is not + /// supported, the default TTL should be used. This parameter is ignored for types that + /// don't support it. + fn wire_send( + &self, + endpoint: &Endpoint, + local_socket: Option<&Self::LocalSocket>, + local_interface: Option<&Self::LocalInterface>, + data: &[u8], + packet_ttl: u8, + ); + + /// Called to check and see if a physical address should be used for ZeroTier traffic to a node. + /// + /// The default implementation always returns true. + #[allow(unused_variables)] + fn should_use_physical_path( + &self, + id: &Valid, + endpoint: &Endpoint, + local_socket: Option<&Application::LocalSocket>, + local_interface: Option<&Application::LocalInterface>, + ) -> bool { + true + } + + /// Called to look up any statically defined or memorized paths to known nodes. + /// + /// The default implementation always returns None. + #[allow(unused_variables)] + fn get_path_hints( + &self, + id: &Valid, + ) -> Option, Option)>> { + None + } + + /// Called to get the current time in milliseconds from the system monotonically increasing clock. + /// This needs to be accurate to about 250 milliseconds resolution or better. + fn time_ticks(&self) -> i64; + + /// Called to get the current time in milliseconds since epoch from the real-time clock. + /// This needs to be accurate to about one second resolution or better. + fn time_clock(&self) -> i64; +} + +/// Result of a packet handler in the InnerProtocolLayer trait. +pub enum PacketHandlerResult { + /// Packet was handled successfully. + Ok, + + /// Packet was handled and an error occurred (malformed, authentication failure, etc.) + Error, + + /// Packet was not handled by this handler. + NotHandled, +} + +/// Interface between VL1 and higher/inner protocol layers. +/// +/// This is implemented by Switch in VL2. It's usually not used outside of VL2 in the core but +/// it could also be implemented for testing or "off label" use of VL1 to carry different protocols. +#[allow(unused)] +pub trait InnerProtocolLayer: Sync + Send { + /// Handle a packet, returning true if it was handled by the next layer. + /// + /// Do not attempt to handle OK or ERROR. Instead implement handle_ok() and handle_error(). + /// The default version returns NotHandled. + fn handle_packet( + &self, + app: &Application, + node: &Node, + source: &Arc>, + source_path: &Arc>, + source_hops: u8, + message_id: u64, + verb: u8, + payload: &PacketBuffer, + cursor: usize, + ) -> PacketHandlerResult { + PacketHandlerResult::NotHandled + } + + /// Handle errors, returning true if the error was recognized. + /// The default version returns NotHandled. + fn handle_error( + &self, + app: &Application, + node: &Node, + source: &Arc>, + source_path: &Arc>, + source_hops: u8, + message_id: u64, + in_re_verb: u8, + in_re_message_id: u64, + error_code: u8, + payload: &PacketBuffer, + cursor: usize, + ) -> PacketHandlerResult { + PacketHandlerResult::NotHandled + } + + /// Handle an OK, returning true if the OK was recognized. + /// The default version returns NotHandled. + fn handle_ok( + &self, + app: &Application, + node: &Node, + source: &Arc>, + source_path: &Arc>, + source_hops: u8, + message_id: u64, + in_re_verb: u8, + in_re_message_id: u64, + payload: &PacketBuffer, + cursor: usize, + ) -> PacketHandlerResult { + PacketHandlerResult::NotHandled + } +} diff --git a/network-hypervisor/src/vl1/mod.rs b/network-hypervisor/src/vl1/mod.rs index 30f462ff2..2cd2306e4 100644 --- a/network-hypervisor/src/vl1/mod.rs +++ b/network-hypervisor/src/vl1/mod.rs @@ -1,6 +1,7 @@ // (c) 2020-2022 ZeroTier, Inc. -- currently proprietary pending actual release and licensing. See LICENSE.md. mod address; +mod api; mod endpoint; mod event; mod mac; @@ -15,11 +16,12 @@ pub mod identity; pub mod inetaddress; pub use address::{Address, PartialAddress}; +pub use api::{ApplicationLayer, InnerProtocolLayer, PacketHandlerResult}; pub use endpoint::Endpoint; pub use event::Event; pub use inetaddress::InetAddress; pub use mac::MAC; -pub use node::{ApplicationLayer, InnerProtocolLayer, Node, PacketHandlerResult}; +pub use node::Node; pub use path::Path; pub use peer::Peer; pub use rootset::{Root, RootSet}; diff --git a/network-hypervisor/src/vl1/node.rs b/network-hypervisor/src/vl1/node.rs index 5459718af..a2b552208 100644 --- a/network-hypervisor/src/vl1/node.rs +++ b/network-hypervisor/src/vl1/node.rs @@ -6,16 +6,17 @@ use std::sync::atomic::Ordering; use std::sync::{Arc, Mutex, RwLock}; use std::time::Duration; +use super::address::{Address, PartialAddress}; +use super::api::{ApplicationLayer, InnerProtocolLayer, PacketHandlerResult}; +use super::debug_event; +use super::endpoint::Endpoint; +use super::event::Event; +use super::identity::{Identity, IdentitySecret}; +use super::path::{Path, PathServiceResult}; +use super::peer::Peer; +use super::peermap::PeerMap; +use super::rootset::RootSet; use crate::protocol::*; -use crate::vl1::address::{Address, PartialAddress}; -use crate::vl1::debug_event; -use crate::vl1::endpoint::Endpoint; -use crate::vl1::event::Event; -use crate::vl1::identity::{Identity, IdentitySecret}; -use crate::vl1::path::{Path, PathServiceResult}; -use crate::vl1::peer::Peer; -use crate::vl1::peermap::PeerMap; -use crate::vl1::rootset::RootSet; use zerotier_crypto::typestate::{Valid, Verified}; use zerotier_utils::gate::IntervalGate; @@ -23,170 +24,22 @@ use zerotier_utils::hex; use zerotier_utils::marshalable::Marshalable; use zerotier_utils::tokio::io::AsyncWriteExt; -/// Interface trait to be implemented by code that's using the ZeroTier network hypervisor. +/// A VL1 node on the ZeroTier global peer to peer network. /// -/// This is analogous to a C struct full of function pointers to callbacks along with some -/// associated type definitions. -pub trait ApplicationLayer: Sync + Send + 'static { - /// Type for local system sockets. - type LocalSocket: Sync + Send + Hash + PartialEq + Eq + Clone + ToString + Sized + 'static; - - /// Type for local system interfaces. - type LocalInterface: Sync + Send + Hash + PartialEq + Eq + Clone + ToString + Sized + 'static; - - /// A VL1 level event occurred. - fn event(&self, event: Event); - - /// Get a pooled packet buffer for internal use. - fn get_buffer(&self) -> PooledPacketBuffer; - - /// Check a local socket for validity. - /// - /// This could return false if the socket's interface no longer exists, its port has been - /// unbound, etc. - fn local_socket_is_valid(&self, socket: &Self::LocalSocket) -> bool; - - /// Check if this node should respond to messages from a given peer at all. - /// - /// The default implementation always returns true. Typically this is what you want for a - /// controller or a root but not a regular node (unless required for backward compatibility). - #[allow(unused)] - fn should_respond_to(&self, id: &Valid) -> bool { - true - } - - /// Called to send a packet over the physical network (virtual -> physical). - /// - /// This sends with UDP-like semantics. It should do whatever best effort it can and return. - /// - /// If a local socket is specified the implementation should send from that socket or not - /// at all (returning false). If a local interface is specified the implementation should - /// send from all sockets on that interface. If neither is specified the packet may be - /// sent on all sockets or a random subset. - /// - /// For endpoint types that support a packet TTL, the implementation may set the TTL - /// if the 'ttl' parameter is not zero. If the parameter is zero or TTL setting is not - /// supported, the default TTL should be used. This parameter is ignored for types that - /// don't support it. - fn wire_send( - &self, - endpoint: &Endpoint, - local_socket: Option<&Self::LocalSocket>, - local_interface: Option<&Self::LocalInterface>, - data: &[u8], - packet_ttl: u8, - ); - - /// Called to check and see if a physical address should be used for ZeroTier traffic to a node. - /// - /// The default implementation always returns true. - #[allow(unused_variables)] - fn should_use_physical_path( - &self, - id: &Valid, - endpoint: &Endpoint, - local_socket: Option<&Application::LocalSocket>, - local_interface: Option<&Application::LocalInterface>, - ) -> bool { - true - } - - /// Called to look up any statically defined or memorized paths to known nodes. - /// - /// The default implementation always returns None. - #[allow(unused_variables)] - fn get_path_hints( - &self, - id: &Valid, - ) -> Option, Option)>> { - None - } - - /// Called to get the current time in milliseconds from the system monotonically increasing clock. - /// This needs to be accurate to about 250 milliseconds resolution or better. - fn time_ticks(&self) -> i64; - - /// Called to get the current time in milliseconds since epoch from the real-time clock. - /// This needs to be accurate to about one second resolution or better. - fn time_clock(&self) -> i64; +/// VL1 nodes communicate to/from both the outside world and the inner protocol layer via the two +/// supplied API traits that must be implemented by the application. ApplicationLayer provides a +/// means of interacting with the application/OS and InnerProtocolLayer provides the interface for +/// implementing the protocol (e.g. ZeroTier VL2) that will be carried by VL1. +pub struct Node { + pub identity: IdentitySecret, + intervals: Mutex, + paths: RwLock, Arc>>>, + pub(super) peers: PeerMap, + roots: RwLock>, + best_root: RwLock>>>, } -/// Result of a packet handler in the InnerProtocolLayer trait. -pub enum PacketHandlerResult { - /// Packet was handled successfully. - Ok, - - /// Packet was handled and an error occurred (malformed, authentication failure, etc.) - Error, - - /// Packet was not handled by this handler. - NotHandled, -} - -/// Interface between VL1 and higher/inner protocol layers. -/// -/// This is implemented by Switch in VL2. It's usually not used outside of VL2 in the core but -/// it could also be implemented for testing or "off label" use of VL1 to carry different protocols. -#[allow(unused)] -pub trait InnerProtocolLayer: Sync + Send { - /// Handle a packet, returning true if it was handled by the next layer. - /// - /// Do not attempt to handle OK or ERROR. Instead implement handle_ok() and handle_error(). - /// The default version returns NotHandled. - fn handle_packet( - &self, - app: &Application, - node: &Node, - source: &Arc>, - source_path: &Arc>, - source_hops: u8, - message_id: u64, - verb: u8, - payload: &PacketBuffer, - cursor: usize, - ) -> PacketHandlerResult { - PacketHandlerResult::NotHandled - } - - /// Handle errors, returning true if the error was recognized. - /// The default version returns NotHandled. - fn handle_error( - &self, - app: &Application, - node: &Node, - source: &Arc>, - source_path: &Arc>, - source_hops: u8, - message_id: u64, - in_re_verb: u8, - in_re_message_id: u64, - error_code: u8, - payload: &PacketBuffer, - cursor: usize, - ) -> PacketHandlerResult { - PacketHandlerResult::NotHandled - } - - /// Handle an OK, returning true if the OK was recognized. - /// The default version returns NotHandled. - fn handle_ok( - &self, - app: &Application, - node: &Node, - source: &Arc>, - source_path: &Arc>, - source_hops: u8, - message_id: u64, - in_re_verb: u8, - in_re_message_id: u64, - payload: &PacketBuffer, - cursor: usize, - ) -> PacketHandlerResult { - PacketHandlerResult::NotHandled - } -} - -struct RootInfo { +struct RootInfo { /// Root sets to which we are a member. sets: HashMap>, @@ -217,16 +70,7 @@ struct BackgroundTaskIntervals { whois_queue_retry: IntervalGate<{ WHOIS_RETRY_INTERVAL }>, } -pub struct Node { - pub identity: IdentitySecret, - intervals: Mutex, - paths: RwLock, Arc>>>, - pub(super) peers: PeerMap, - roots: RwLock>, - best_root: RwLock>>>, -} - -impl Node { +impl Node { pub fn new(identity_secret: IdentitySecret) -> Self { Self { identity: identity_secret, @@ -550,7 +394,7 @@ impl Node { INTERVAL } - pub fn handle_incoming_physical_packet( + pub fn handle_incoming_physical_packet( &self, app: &Application, inner: &Inner, @@ -753,7 +597,7 @@ impl Node { } } -/// Key used to look up paths in a hash map efficiently. +/// Key used to look up paths in a hash map efficiently. It can be constructed for lookup without full copy. enum PathKey<'a, 'b, LocalSocket: Hash + PartialEq + Eq + Clone> { Copied(Endpoint, LocalSocket), Ref(&'a Endpoint, &'b LocalSocket), diff --git a/network-hypervisor/src/vl1/peer.rs b/network-hypervisor/src/vl1/peer.rs index a47319bb1..559069c8f 100644 --- a/network-hypervisor/src/vl1/peer.rs +++ b/network-hypervisor/src/vl1/peer.rs @@ -14,19 +14,20 @@ use zerotier_utils::marshalable::Marshalable; use zerotier_utils::memory::array_range; use zerotier_utils::NEVER_HAPPENED_TICKS; +use super::api::*; +use super::debug_event; +use super::identity::{Identity, IdentitySecret}; +use super::node::*; +use super::Valid; +use super::{Address, Endpoint, Path}; use crate::protocol::*; -use crate::vl1::debug_event; -use crate::vl1::identity::{Identity, IdentitySecret}; -use crate::vl1::node::*; -use crate::vl1::Valid; -use crate::vl1::{Address, Endpoint, Path}; use crate::{VERSION_MAJOR, VERSION_MINOR, VERSION_REVISION}; use super::PartialAddress; pub(crate) const SERVICE_INTERVAL_MS: i64 = 10000; -pub struct Peer { +pub struct Peer { pub identity: Valid, v1_proto_static_secret: v1::SymmetricSecret, @@ -43,7 +44,7 @@ pub struct Peer { remote_node_info: RwLock, } -struct PeerPath { +struct PeerPath { path: Weak>, last_receive_time_ticks: i64, } @@ -55,11 +56,11 @@ struct RemoteNodeInfo { } /// Sort a list of paths by quality or priority, with best paths first. -fn prioritize_paths(paths: &mut Vec>) { +fn prioritize_paths(paths: &mut Vec>) { paths.sort_unstable_by(|a, b| a.last_receive_time_ticks.cmp(&b.last_receive_time_ticks).reverse()); } -impl Peer { +impl Peer { /// Create a new peer. /// /// This only returns None if this_node_identity does not have its secrets or if some @@ -470,7 +471,7 @@ impl Peer { /// those fragments after the main packet header and first chunk. /// /// This returns true if the packet decrypted and passed authentication. - pub(crate) fn v1_proto_receive( + pub(crate) fn v1_proto_receive( self: &Arc, node: &Node, app: &Application, @@ -617,7 +618,7 @@ impl Peer { return PacketHandlerResult::Error; } - fn handle_incoming_error( + fn handle_incoming_error( self: &Arc, app: &Application, inner: &Inner, @@ -655,7 +656,7 @@ impl Peer { return PacketHandlerResult::Error; } - fn handle_incoming_ok( + fn handle_incoming_ok( self: &Arc, app: &Application, inner: &Inner, @@ -852,21 +853,21 @@ impl Peer { } } -impl Hash for Peer { +impl Hash for Peer { #[inline(always)] fn hash(&self, state: &mut H) { self.identity.address.hash(state) } } -impl PartialEq for Peer { +impl PartialEq for Peer { #[inline(always)] fn eq(&self, other: &Self) -> bool { self.identity.eq(&other.identity) } } -impl Eq for Peer {} +impl Eq for Peer {} fn v1_proto_try_aead_decrypt( secret: &v1::SymmetricSecret, diff --git a/network-hypervisor/src/vl1/peermap.rs b/network-hypervisor/src/vl1/peermap.rs index 6a82a3c59..f6d11122e 100644 --- a/network-hypervisor/src/vl1/peermap.rs +++ b/network-hypervisor/src/vl1/peermap.rs @@ -3,17 +3,17 @@ use std::ops::Bound; use std::sync::{Arc, RwLock}; use super::address::{Address, PartialAddress}; +use super::api::ApplicationLayer; use super::identity::{Identity, IdentitySecret}; -use super::node::ApplicationLayer; use super::peer::Peer; use zerotier_crypto::typestate::Valid; -pub struct PeerMap { +pub struct PeerMap { maps: [RwLock>>>; 256], } -impl PeerMap { +impl PeerMap { pub fn new() -> Self { Self { maps: std::array::from_fn(|_| RwLock::new(BTreeMap::new())) } } diff --git a/network-hypervisor/src/vl1/whois.rs b/network-hypervisor/src/vl1/whois.rs index 33ccdaa69..c37dbe175 100644 --- a/network-hypervisor/src/vl1/whois.rs +++ b/network-hypervisor/src/vl1/whois.rs @@ -4,8 +4,9 @@ use std::ops::Bound; use std::sync::{Mutex, Weak}; use super::address::PartialAddress; +use super::api::{ApplicationLayer, InnerProtocolLayer}; use super::identity::Identity; -use super::node::{ApplicationLayer, InnerProtocolLayer, Node}; +use super::node::Node; use super::path::Path; use crate::debug_event; use crate::protocol; @@ -13,17 +14,17 @@ use crate::protocol; use zerotier_crypto::typestate::Valid; use zerotier_utils::ringbuffer::RingBuffer; -pub(super) struct Whois { +pub(super) struct Whois { whois_queue: Mutex>>, } -struct WhoisQueueItem { +struct WhoisQueueItem { pending_v1_packets: RingBuffer<(Weak>, protocol::PooledPacketBuffer), { protocol::WHOIS_MAX_WAITING_PACKETS }>, last_retry_time: i64, retry_count: u16, } -impl Whois { +impl Whois { pub fn new() -> Self { Self { whois_queue: Mutex::new(BTreeMap::new()) } } @@ -37,7 +38,7 @@ impl Whois { ) { } - pub fn handle_incoming_identity( + pub fn handle_incoming_identity( &self, app: &Application, node: &Node, diff --git a/network-hypervisor/src/vl2/multicastauthority.rs b/network-hypervisor/src/vl2/multicastauthority.rs index d17e40f0b..c8551c236 100644 --- a/network-hypervisor/src/vl2/multicastauthority.rs +++ b/network-hypervisor/src/vl2/multicastauthority.rs @@ -42,7 +42,7 @@ impl MulticastAuthority { } /// Call for VL2_MULTICAST_LIKE packets. - pub fn handle_vl2_multicast_like bool>( + pub fn handle_vl2_multicast_like bool>( &self, auth: Authenticator, time_ticks: i64, @@ -79,7 +79,7 @@ impl MulticastAuthority { } /// Call for VL2_MULTICAST_GATHER packets. - pub fn handle_vl2_multicast_gather bool>( + pub fn handle_vl2_multicast_gather bool>( &self, auth: Authenticator, time_ticks: i64, diff --git a/network-hypervisor/src/vl2/switch.rs b/network-hypervisor/src/vl2/switch.rs index 279d20911..7b13de8c0 100644 --- a/network-hypervisor/src/vl2/switch.rs +++ b/network-hypervisor/src/vl2/switch.rs @@ -11,7 +11,7 @@ pub struct Switch {} #[allow(unused_variables)] impl InnerProtocolLayer for Switch { - fn handle_packet( + fn handle_packet( &self, app: &Application, node: &Node, @@ -26,7 +26,7 @@ impl InnerProtocolLayer for Switch { PacketHandlerResult::NotHandled } - fn handle_error( + fn handle_error( &self, app: &Application, node: &Node, @@ -43,7 +43,7 @@ impl InnerProtocolLayer for Switch { PacketHandlerResult::NotHandled } - fn handle_ok( + fn handle_ok( &self, app: &Application, node: &Node, diff --git a/vl1-service/src/vl1service.rs b/vl1-service/src/vl1service.rs index 9f3dd894f..690ce3303 100644 --- a/vl1-service/src/vl1service.rs +++ b/vl1-service/src/vl1service.rs @@ -2,8 +2,7 @@ use std::collections::{HashMap, HashSet}; use std::error::Error; -use std::sync::Arc; -use std::sync::RwLock; +use std::sync::{Arc, RwLock, Weak}; use std::thread::JoinHandle; use std::time::Duration; @@ -27,8 +26,9 @@ const UPDATE_UDP_BINDINGS_EVERY_SECS: usize = 10; /// talks to the physical network, manages the vl1 node, and presents a templated interface for /// whatever inner protocol implementation is using it. This would typically be VL2 but could be /// a test harness or just the controller for a controller that runs stand-alone. -pub struct VL1Service { +pub struct VL1Service { pub node: Node, + self_ref: Weak, state: RwLock, inner: Arc, buffer_pool: Arc, @@ -41,10 +41,11 @@ struct VL1ServiceMutableState { running: bool, } -impl VL1Service { +impl VL1Service { pub fn new(identity: IdentitySecret, inner: Arc, settings: VL1Settings) -> Result, Box> { - let service = Arc::new(Self { + let service = Arc::new_cyclic(|self_ref| Self { node: Node::::new(identity), + self_ref: self_ref.clone(), state: RwLock::new(VL1ServiceMutableState { daemons: Vec::with_capacity(2), udp_sockets: HashMap::with_capacity(8), @@ -68,6 +69,10 @@ impl VL1Service { Ok(service) } + pub fn get(&self) -> Arc { + self.self_ref.upgrade().unwrap() + } + pub fn bound_udp_ports(&self) -> Vec { self.state.read().unwrap().udp_sockets.keys().cloned().collect() } @@ -162,7 +167,7 @@ impl VL1Service { } } -impl UdpPacketHandler for VL1Service { +impl UdpPacketHandler for VL1Service { #[inline(always)] fn incoming_udp_packet( self: &Arc, @@ -183,7 +188,7 @@ impl UdpPacketHandler for VL1Servi } } -impl ApplicationLayer for VL1Service { +impl ApplicationLayer for VL1Service { type LocalSocket = crate::LocalSocket; type LocalInterface = crate::LocalInterface; @@ -280,7 +285,7 @@ impl ApplicationLayer for VL1Servi } } -impl Drop for VL1Service { +impl Drop for VL1Service { fn drop(&mut self) { let mut state = self.state.write().unwrap(); state.running = false;