diff --git a/controller/Cargo.toml b/controller/Cargo.toml index e7c090170..d01579c6f 100644 --- a/controller/Cargo.toml +++ b/controller/Cargo.toml @@ -12,6 +12,8 @@ zerotier-crypto = { path = "../crypto" } zerotier-utils = { path = "../utils" } zerotier-network-hypervisor = { path = "../network-hypervisor" } zerotier-vl1-service = { path = "../vl1-service" } +async-trait = "^0" +tokio = { version = "^1", features = ["fs", "io-util", "io-std", "net", "parking_lot", "process", "rt", "rt-multi-thread", "signal", "sync", "time"], default-features = false } +parking_lot = { version = "^0", features = [], default-features = false } serde = { version = "^1", features = ["derive"], default-features = false } serde_json = { version = "^1", features = ["std"], default-features = false } -async-trait = "^0" diff --git a/controller/src/controller.rs b/controller/src/controller.rs index 5a15ee5d7..341260982 100644 --- a/controller/src/controller.rs +++ b/controller/src/controller.rs @@ -1,102 +1,135 @@ -use std::sync::Arc; - use crate::database::Database; -use async_trait::async_trait; +use std::sync::Arc; + +use tokio::time::{Duration, Instant}; use zerotier_network_hypervisor::protocol::{verbs, PacketBuffer}; use zerotier_network_hypervisor::util::dictionary::Dictionary; -use zerotier_network_hypervisor::util::marshalable::MarshalUnmarshalError; -use zerotier_network_hypervisor::vl1::{HostSystem, Identity, InnerProtocol, Path, Peer}; +use zerotier_network_hypervisor::vl1::{HostSystem, Identity, InnerProtocol, PacketHandlerResult, Path, Peer}; use zerotier_network_hypervisor::vl2::NetworkId; +use zerotier_utils::reaper::Reaper; + +const REQUEST_TIMEOUT: Duration = Duration::from_secs(10); + pub struct Controller { - pub database: Arc, + database: Arc, + reaper: Reaper, } impl Controller { pub async fn new(database: Arc) -> Arc { - Arc::new(Self { database }) + Arc::new(Self { database, reaper: Reaper::new() }) } async fn handle_network_config_request( - &self, - source: &Peer, - source_path: &Path, - payload: &PacketBuffer, - ) -> Result<(), MarshalUnmarshalError> { - let mut cursor = 0; - let network_id = NetworkId::from_u64(payload.read_u64(&mut cursor)?); - if network_id.is_none() { - return Err(MarshalUnmarshalError::InvalidData); - } - let network_id = network_id.unwrap(); - let meta_data = if cursor < payload.len() { - let meta_data_len = payload.read_u16(&mut cursor)?; - let d = Dictionary::from_bytes(payload.read_bytes(meta_data_len as usize, &mut cursor)?); - if d.is_none() { - return Err(MarshalUnmarshalError::InvalidData); - } - d.unwrap() - } else { - Dictionary::new() - }; - let (have_revision, have_timestamp) = if cursor < payload.len() { - let r = payload.read_u64(&mut cursor)?; - let t = payload.read_u64(&mut cursor)?; - (Some(r), Some(t)) - } else { - (None, None) - }; - - if let Ok(Some(network)) = self.database.get_network(network_id).await {} - - return Ok(()); + database: Arc, + source: Arc>, + source_path: Arc>, + network_id: NetworkId, + meta_data: Dictionary, + have_revision: Option, + have_timestamp: Option, + ) { + if let Ok(Some(network)) = database.get_network(network_id).await {} } } -#[async_trait] impl InnerProtocol for Controller { - async fn handle_packet( + fn handle_packet( &self, - source: &Peer, - source_path: &Path, + source: &Arc>, + source_path: &Arc>, verb: u8, payload: &PacketBuffer, - ) -> bool { + ) -> PacketHandlerResult { match verb { verbs::VL2_VERB_NETWORK_CONFIG_REQUEST => { - let _ = self.handle_network_config_request(source, source_path, payload).await; - // TODO: display/log errors - true + let mut cursor = 0; + let network_id = payload.read_u64(&mut cursor); + if network_id.is_err() { + return PacketHandlerResult::Error; + } + let network_id = NetworkId::from_u64(network_id.unwrap()); + if network_id.is_none() { + return PacketHandlerResult::Error; + } + let network_id = network_id.unwrap(); + let meta_data = if cursor < payload.len() { + let meta_data_len = payload.read_u16(&mut cursor); + if meta_data_len.is_err() { + return PacketHandlerResult::Error; + } + if let Ok(d) = payload.read_bytes(meta_data_len.unwrap() as usize, &mut cursor) { + let d = Dictionary::from_bytes(d); + if d.is_none() { + return PacketHandlerResult::Error; + } + d.unwrap() + } else { + return PacketHandlerResult::Error; + } + } else { + Dictionary::new() + }; + let (have_revision, have_timestamp) = if cursor < payload.len() { + let r = payload.read_u64(&mut cursor); + let t = payload.read_u64(&mut cursor); + if r.is_err() || t.is_err() { + return PacketHandlerResult::Error; + } + (Some(r.unwrap()), Some(t.unwrap())) + } else { + (None, None) + }; + + if let Some(deadline) = Instant::now().checked_add(REQUEST_TIMEOUT) { + self.reaper.add( + tokio::spawn(Self::handle_network_config_request( + self.database.clone(), + source.clone(), + source_path.clone(), + network_id, + meta_data, + have_revision, + have_timestamp, + )), + deadline, + ); + } else { + eprintln!("WARNING: instant + REQUEST_TIMEOUT overflowed! should be impossible."); + } + + PacketHandlerResult::Ok } - _ => false, + _ => PacketHandlerResult::NotHandled, } } - async fn handle_error( + fn handle_error( &self, - source: &Peer, - source_path: &Path, + source: &Arc>, + source_path: &Arc>, in_re_verb: u8, in_re_message_id: u64, error_code: u8, payload: &PacketBuffer, cursor: &mut usize, - ) -> bool { - false + ) -> PacketHandlerResult { + PacketHandlerResult::NotHandled } - async fn handle_ok( + fn handle_ok( &self, - source: &Peer, - source_path: &Path, + source: &Arc>, + source_path: &Arc>, in_re_verb: u8, in_re_message_id: u64, payload: &PacketBuffer, cursor: &mut usize, - ) -> bool { - false + ) -> PacketHandlerResult { + PacketHandlerResult::NotHandled } fn should_communicate_with(&self, _: &Identity) -> bool { diff --git a/crypto/Cargo.toml b/crypto/Cargo.toml index a044e002e..13132c0b0 100644 --- a/crypto/Cargo.toml +++ b/crypto/Cargo.toml @@ -7,17 +7,18 @@ version = "0.1.0" [dependencies] zerotier-utils = { path = "../utils" } -ed25519-dalek = {version = "1.0.1", features = ["std", "u64_backend"], default-features = false} +ed25519-dalek = { version = "1.0.1", features = ["std", "u64_backend"], default-features = false } foreign-types = "0.3.1" lazy_static = "^1" -openssl = {version = "^0", features = [], default-features = false} -parking_lot = {version = "^0", features = [], default-features = false} -poly1305 = {version = "0.7.2", features = [], default-features = false} -pqc_kyber = {path = "../third_party/kyber", features = ["kyber1024", "reference"], default-features = false} +openssl = { version = "^0", features = [], default-features = false } +parking_lot = { version = "^0", features = [], default-features = false } +poly1305 = { version = "0.7.2", features = [], default-features = false } +pqc_kyber = { path = "../third_party/kyber", features = ["kyber1024", "reference"], default-features = false } +#pqc_kyber = { version = "^0", features = ["kyber1024", "reference"], default-features = false } rand_core = "0.5.1" -rand_core_062 = {package = "rand_core", version = "0.6.2"} +rand_core_062 = { package = "rand_core", version = "0.6.2" } subtle = "2.4.1" -x25519-dalek = {version = "1.2.0", features = ["std", "u64_backend"], default-features = false} +x25519-dalek = { version = "1.2.0", features = ["std", "u64_backend"], default-features = false } [target."cfg(not(any(target_os = \"macos\", target_os = \"ios\")))".dependencies] openssl = "^0" diff --git a/network-hypervisor/Cargo.toml b/network-hypervisor/Cargo.toml index 98af897b6..1d88632c5 100644 --- a/network-hypervisor/Cargo.toml +++ b/network-hypervisor/Cargo.toml @@ -12,7 +12,6 @@ debug_events = [] [dependencies] zerotier-crypto = { path = "../crypto" } zerotier-utils = { path = "../utils" } -async-trait = "^0" base64 = "^0" lz4_flex = { version = "^0", features = ["safe-encode", "safe-decode", "checked-decode"] } parking_lot = { version = "^0", features = [], default-features = false } @@ -22,7 +21,7 @@ serde = { version = "^1", features = ["derive"], default-features = false } rand = "*" serde_json = "*" serde_cbor = "*" -criterion = "0.3" +criterion = "^0" [target."cfg(not(windows))".dependencies] libc = "^0" diff --git a/network-hypervisor/src/protocol.rs b/network-hypervisor/src/protocol.rs index 4d0a70338..730c4d56f 100644 --- a/network-hypervisor/src/protocol.rs +++ b/network-hypervisor/src/protocol.rs @@ -9,7 +9,7 @@ use zerotier_utils::buffer::{Buffer, PooledBufferFactory}; use zerotier_utils::pool::{Pool, Pooled}; /* - * Protocol versions + * Legacy V1 protocol versions: * * 1 - 0.2.0 ... 0.2.5 * 2 - 0.3.0 ... 0.4.5 @@ -46,11 +46,14 @@ pub const PROTOCOL_VERSION: u8 = 20; /// We could probably push it back to 8 or 9 with some added support for sending Salsa/Poly packets. pub const PROTOCOL_VERSION_MIN: u8 = 11; +/// Size of a pooled packet buffer. +pub const PACKET_BUFFER_SIZE: usize = 16384; + /// Buffer sized for ZeroTier packets. -pub type PacketBuffer = Buffer<{ v1::SIZE_MAX }>; +pub type PacketBuffer = Buffer; /// Factory type to supply to a new PacketBufferPool, used in PooledPacketBuffer and PacketBufferPool types. -pub type PacketBufferFactory = PooledBufferFactory<{ crate::protocol::v1::SIZE_MAX }>; +pub type PacketBufferFactory = PooledBufferFactory; /// Packet buffer checked out of pool, automatically returns on drop. pub type PooledPacketBuffer = Pooled; @@ -58,9 +61,6 @@ pub type PooledPacketBuffer = Pooled; /// Source for instances of PacketBuffer pub type PacketBufferPool = Pool; -/// 64-bit packet (outer) ID. -pub type PacketId = u64; - /// 64-bit message ID (obtained after AEAD decryption). pub type MessageId = u64; @@ -116,9 +116,6 @@ pub const ADDRESS_SIZE_STRING: usize = 10; /// Prefix indicating reserved addresses (that can't actually be addresses). pub const ADDRESS_RESERVED_PREFIX: u8 = 0xff; -/// Size of an identity fingerprint (SHA384) -pub const IDENTITY_FINGERPRINT_SIZE: usize = 48; - pub(crate) mod v1 { use super::*; @@ -276,7 +273,7 @@ pub(crate) mod v1 { impl PacketHeader { #[inline(always)] - pub fn packet_id(&self) -> PacketId { + pub fn packet_id(&self) -> u64 { u64::from_ne_bytes(self.id) } @@ -344,7 +341,7 @@ pub(crate) mod v1 { impl FragmentHeader { #[inline(always)] - pub fn packet_id(&self) -> PacketId { + pub fn packet_id(&self) -> u64 { u64::from_ne_bytes(self.id) } @@ -424,38 +421,38 @@ pub(crate) mod v1 { } } -/// Maximum difference between current message ID and OK/ERROR in-re message ID. -pub const PACKET_RESPONSE_COUNTER_DELTA_MAX: u64 = 4096; +/// Maximum delta between the message ID of a sent packet and its response. +pub(crate) const PACKET_RESPONSE_COUNTER_DELTA_MAX: u64 = 256; -/// Frequency for WHOIS retries -pub const WHOIS_RETRY_INTERVAL: i64 = 1000; +/// Frequency for WHOIS retries in milliseconds. +pub(crate) const WHOIS_RETRY_INTERVAL: i64 = 1500; /// Maximum number of WHOIS retries -pub const WHOIS_RETRY_MAX: u16 = 3; +pub(crate) const WHOIS_RETRY_COUNT_MAX: u16 = 3; /// Maximum number of packets to queue up behind a WHOIS. -pub const WHOIS_MAX_WAITING_PACKETS: usize = 64; +pub(crate) const WHOIS_MAX_WAITING_PACKETS: usize = 32; /// Keepalive interval for paths in milliseconds. -pub const PATH_KEEPALIVE_INTERVAL: i64 = 20000; +pub(crate) const PATH_KEEPALIVE_INTERVAL: i64 = 20000; /// Path object expiration time in milliseconds since last receive. -pub const PATH_EXPIRATION_TIME: i64 = (PATH_KEEPALIVE_INTERVAL * 2) + 10000; +pub(crate) const PATH_EXPIRATION_TIME: i64 = (PATH_KEEPALIVE_INTERVAL * 2) + 10000; /// How often to send HELLOs to roots, which is more often than normal peers. -pub const ROOT_HELLO_INTERVAL: i64 = PATH_KEEPALIVE_INTERVAL * 2; +pub(crate) const ROOT_HELLO_INTERVAL: i64 = PATH_KEEPALIVE_INTERVAL * 2; /// How often to send HELLOs to roots when we are offline. -pub const ROOT_HELLO_SPAM_INTERVAL: i64 = 5000; +pub(crate) const ROOT_HELLO_SPAM_INTERVAL: i64 = 5000; /// How often to send HELLOs to regular peers. -pub const PEER_HELLO_INTERVAL_MAX: i64 = 300000; +pub(crate) const PEER_HELLO_INTERVAL_MAX: i64 = 300000; /// Timeout for path association with peers and for peers themselves. -pub const PEER_EXPIRATION_TIME: i64 = (PEER_HELLO_INTERVAL_MAX * 2) + 10000; +pub(crate) const PEER_EXPIRATION_TIME: i64 = (PEER_HELLO_INTERVAL_MAX * 2) + 10000; /// Proof of work difficulty (threshold) for identity generation. -pub const IDENTITY_POW_THRESHOLD: u8 = 17; +pub(crate) const IDENTITY_POW_THRESHOLD: u8 = 17; #[cfg(test)] mod tests { diff --git a/network-hypervisor/src/vl1/endpoint.rs b/network-hypervisor/src/vl1/endpoint.rs index bfcce1b51..c7050c4d9 100644 --- a/network-hypervisor/src/vl1/endpoint.rs +++ b/network-hypervisor/src/vl1/endpoint.rs @@ -7,8 +7,8 @@ use std::str::FromStr; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use crate::error::InvalidFormatError; -use crate::protocol::IDENTITY_FINGERPRINT_SIZE; use crate::util::marshalable::*; +use crate::vl1::identity::IDENTITY_FINGERPRINT_SIZE; use crate::vl1::inetaddress::InetAddress; use crate::vl1::{Address, MAC}; diff --git a/network-hypervisor/src/vl1/identity.rs b/network-hypervisor/src/vl1/identity.rs index 442dc419f..f85ae4f19 100644 --- a/network-hypervisor/src/vl1/identity.rs +++ b/network-hypervisor/src/vl1/identity.rs @@ -19,11 +19,14 @@ use zerotier_utils::hex; use zerotier_utils::memory::{as_byte_array, as_flat_object}; use crate::error::{InvalidFormatError, InvalidParameterError}; -use crate::protocol::{ADDRESS_SIZE, ADDRESS_SIZE_STRING, IDENTITY_FINGERPRINT_SIZE, IDENTITY_POW_THRESHOLD}; +use crate::protocol::{ADDRESS_SIZE, ADDRESS_SIZE_STRING, IDENTITY_POW_THRESHOLD}; use crate::vl1::Address; /// Current maximum size for an identity signature. -pub const MAX_SIGNATURE_SIZE: usize = P384_ECDSA_SIGNATURE_SIZE + 1; +pub const IDENTITY_MAX_SIGNATURE_SIZE: usize = P384_ECDSA_SIGNATURE_SIZE + 1; + +/// Size of an identity fingerprint (SHA384) +pub const IDENTITY_FINGERPRINT_SIZE: usize = 48; /// Secret keys associated with NIST P-384 public keys. #[derive(Clone)] @@ -358,7 +361,7 @@ impl Identity { /// set the old 96-byte signature plus hash format used in ZeroTier v1 is used. /// /// A return of None happens if we don't have our secret key(s) or some other error occurs. - pub fn sign(&self, msg: &[u8], legacy_ed25519_only: bool) -> Option> { + pub fn sign(&self, msg: &[u8], legacy_ed25519_only: bool) -> Option> { if let Some(secret) = self.secret.as_ref() { if legacy_ed25519_only { Some(secret.ed25519.sign_zt(msg).into()) diff --git a/network-hypervisor/src/vl1/mod.rs b/network-hypervisor/src/vl1/mod.rs index d5c4c888a..33208747f 100644 --- a/network-hypervisor/src/vl1/mod.rs +++ b/network-hypervisor/src/vl1/mod.rs @@ -9,7 +9,6 @@ mod path; mod peer; mod rootset; mod symmetricsecret; -mod whoisqueue; pub(crate) mod node; @@ -22,7 +21,7 @@ pub use event::Event; pub use identity::Identity; pub use inetaddress::InetAddress; pub use mac::MAC; -pub use node::{DummyInnerProtocol, DummyPathFilter, HostSystem, InnerProtocol, Node, NodeStorage, PathFilter}; +pub use node::{DummyInnerProtocol, DummyPathFilter, HostSystem, InnerProtocol, Node, NodeStorage, PacketHandlerResult, PathFilter}; pub use path::Path; pub use peer::Peer; pub use rootset::{Root, RootSet}; diff --git a/network-hypervisor/src/vl1/node.rs b/network-hypervisor/src/vl1/node.rs index bb74bc379..b6e0192f3 100644 --- a/network-hypervisor/src/vl1/node.rs +++ b/network-hypervisor/src/vl1/node.rs @@ -7,7 +7,7 @@ use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; -use async_trait::async_trait; +use parking_lot::{Mutex, RwLock, RwLockUpgradableReadGuard}; use crate::error::InvalidParameterError; use crate::protocol::*; @@ -21,33 +21,26 @@ use crate::vl1::identity::Identity; use crate::vl1::path::{Path, PathServiceResult}; use crate::vl1::peer::Peer; use crate::vl1::rootset::RootSet; -use crate::vl1::whoisqueue::{QueuedPacket, WhoisQueue}; use zerotier_crypto::random; use zerotier_crypto::verified::Verified; use zerotier_utils::hex; +use zerotier_utils::ringbuffer::RingBuffer; /// Trait implemented by external code to handle events and provide an interface to the system or application. /// /// These methods are basically callbacks that the core calls to request or transmit things. They are called /// during calls to things like wire_recieve() and do_background_tasks(). -#[async_trait] pub trait HostSystem: Sync + Send + 'static { /// Type for local system sockets. - type LocalSocket: Sync + Send + Sized + Hash + PartialEq + Eq + Clone + ToString; + type LocalSocket: Sync + Send + Hash + PartialEq + Eq + Clone + ToString + 'static; /// Type for local system interfaces. - type LocalInterface: Sync + Send + Sized + Hash + PartialEq + Eq + Clone + ToString; + type LocalInterface: Sync + Send + Hash + PartialEq + Eq + Clone + ToString; - /// An event occurred. - /// - /// This isn't async to avoid all kinds of issues in code that deals with locks. If you need - /// it to be async use a channel or something. + /// A VL1 level event occurred. fn event(&self, event: Event); - /// A USER_MESSAGE packet was received. - async fn user_message(&self, source: &Identity, message_type: u64, message: &[u8]); - /// Check a local socket for validity. /// /// This could return false if the socket's interface no longer exists, its port has been @@ -56,8 +49,7 @@ pub trait HostSystem: Sync + Send + 'static { /// Called to send a packet over the physical network (virtual -> physical). /// - /// This may return false if the send definitely failed. Otherwise it should return true - /// which indicates possible success but with no guarantee (UDP semantics). + /// This sends with UDP-like semantics. It should do whatever best effort it can and return. /// /// If a local socket is specified the implementation should send from that socket or not /// at all (returning false). If a local interface is specified the implementation should @@ -66,15 +58,16 @@ pub trait HostSystem: Sync + Send + 'static { /// /// For endpoint types that support a packet TTL, the implementation may set the TTL /// if the 'ttl' parameter is not zero. If the parameter is zero or TTL setting is not - /// supported, the default TTL should be used. - async fn wire_send( + /// supported, the default TTL should be used. This parameter is ignored for types that + /// don't support it. + fn wire_send( &self, endpoint: &Endpoint, local_socket: Option<&Self::LocalSocket>, local_interface: Option<&Self::LocalInterface>, data: &[u8], packet_ttl: u8, - ) -> bool; + ); /// Called to get the current time in milliseconds from the system monotonically increasing clock. /// This needs to be accurate to about 250 milliseconds resolution or better. @@ -86,20 +79,18 @@ pub trait HostSystem: Sync + Send + 'static { } /// Trait to be implemented by outside code to provide object storage to VL1 -#[async_trait] pub trait NodeStorage: Sync + Send + 'static { /// Load this node's identity from the data store. - async fn load_node_identity(&self) -> Option; + fn load_node_identity(&self) -> Option; /// Save this node's identity to the data store. - async fn save_node_identity(&self, id: &Identity); + fn save_node_identity(&self, id: &Identity); } /// Trait to be implemented to provide path hints and a filter to approve physical paths -#[async_trait] pub trait PathFilter: Sync + Send + 'static { /// Called to check and see if a physical address should be used for ZeroTier traffic to a node. - async fn check_path( + fn check_path( &self, id: &Identity, endpoint: &Endpoint, @@ -108,7 +99,7 @@ pub trait PathFilter: Sync + Send + 'static { ) -> bool; /// Called to look up any statically defined or memorized paths to known nodes. - async fn get_path_hints( + fn get_path_hints( &self, id: &Identity, ) -> Option< @@ -120,45 +111,56 @@ pub trait PathFilter: Sync + Send + 'static { >; } +/// Result of a packet handler. +pub enum PacketHandlerResult { + /// Packet was handled successfully. + Ok, + + /// Packet was handled and an error occurred (malformed, authentication failure, etc.) + Error, + + /// Packet was not handled by this handler. + NotHandled, +} + /// Interface between VL1 and higher/inner protocol layers. /// /// This is implemented by Switch in VL2. It's usually not used outside of VL2 in the core but /// it could also be implemented for testing or "off label" use of VL1 to carry different protocols. -#[async_trait] pub trait InnerProtocol: Sync + Send + 'static { /// Handle a packet, returning true if it was handled by the next layer. /// /// Do not attempt to handle OK or ERROR. Instead implement handle_ok() and handle_error(). - async fn handle_packet( + fn handle_packet( &self, - source: &Peer, - source_path: &Path, + source: &Arc>, + source_path: &Arc>, verb: u8, payload: &PacketBuffer, - ) -> bool; + ) -> PacketHandlerResult; /// Handle errors, returning true if the error was recognized. - async fn handle_error( + fn handle_error( &self, - source: &Peer, - source_path: &Path, + source: &Arc>, + source_path: &Arc>, in_re_verb: u8, in_re_message_id: u64, error_code: u8, payload: &PacketBuffer, cursor: &mut usize, - ) -> bool; + ) -> PacketHandlerResult; /// Handle an OK, returing true if the OK was recognized. - async fn handle_ok( + fn handle_ok( &self, - source: &Peer, - source_path: &Path, + source: &Arc>, + source_path: &Arc>, in_re_verb: u8, in_re_message_id: u64, payload: &PacketBuffer, cursor: &mut usize, - ) -> bool; + ) -> PacketHandlerResult; /// Check if this peer should communicate with another at all. fn should_communicate_with(&self, id: &Identity) -> bool; @@ -167,17 +169,6 @@ pub trait InnerProtocol: Sync + Send + 'static { /// How often to check the root cluster definitions against the root list and update. const ROOT_SYNC_INTERVAL_MS: i64 = 1000; -#[derive(Default)] -struct BackgroundTaskIntervals { - root_sync: IntervalGate<{ ROOT_SYNC_INTERVAL_MS }>, - root_hello: IntervalGate<{ ROOT_HELLO_INTERVAL }>, - root_spam_hello: IntervalGate<{ ROOT_HELLO_SPAM_INTERVAL }>, - peer_service: IntervalGate<{ crate::vl1::peer::SERVICE_INTERVAL_MS }>, - path_service: IntervalGate<{ crate::vl1::path::SERVICE_INTERVAL_MS }>, - whois_service: IntervalGate<{ crate::vl1::whoisqueue::SERVICE_INTERVAL_MS }>, -} - -/// Mutable fields related to roots and root sets. struct RootInfo { /// Root sets to which we are a member. sets: HashMap>, @@ -196,56 +187,20 @@ struct RootInfo { online: bool, } -/// Key used to look up paths in a hash map -/// This supports copied keys for storing and refs for fast lookup without having to copy anything. -enum PathKey<'a, HostSystemImpl: HostSystem> { - Copied(Endpoint, HostSystemImpl::LocalSocket), - Ref(&'a Endpoint, &'a HostSystemImpl::LocalSocket), +#[derive(Default)] +struct BackgroundTaskIntervals { + root_sync: IntervalGate<{ ROOT_SYNC_INTERVAL_MS }>, + root_hello: IntervalGate<{ ROOT_HELLO_INTERVAL }>, + root_spam_hello: IntervalGate<{ ROOT_HELLO_SPAM_INTERVAL }>, + peer_service: IntervalGate<{ crate::vl1::peer::SERVICE_INTERVAL_MS }>, + path_service: IntervalGate<{ crate::vl1::path::SERVICE_INTERVAL_MS }>, + whois_queue_retry: IntervalGate<{ WHOIS_RETRY_INTERVAL }>, } -impl<'a, HostSystemImpl: HostSystem> Hash for PathKey<'a, HostSystemImpl> { - fn hash(&self, state: &mut H) { - match self { - Self::Copied(ep, ls) => { - ep.hash(state); - ls.hash(state); - } - Self::Ref(ep, ls) => { - (*ep).hash(state); - (*ls).hash(state); - } - } - } -} - -impl<'a, HostSystemImpl: HostSystem> PartialEq for PathKey<'_, HostSystemImpl> { - fn eq(&self, other: &Self) -> bool { - match (self, other) { - (Self::Copied(ep1, ls1), Self::Copied(ep2, ls2)) => ep1.eq(ep2) && ls1.eq(ls2), - (Self::Copied(ep1, ls1), Self::Ref(ep2, ls2)) => ep1.eq(*ep2) && ls1.eq(*ls2), - (Self::Ref(ep1, ls1), Self::Copied(ep2, ls2)) => (*ep1).eq(ep2) && (*ls1).eq(ls2), - (Self::Ref(ep1, ls1), Self::Ref(ep2, ls2)) => (*ep1).eq(*ep2) && (*ls1).eq(*ls2), - } - } -} - -impl<'a, HostSystemImpl: HostSystem> Eq for PathKey<'_, HostSystemImpl> {} - -impl<'a, HostSystemImpl: HostSystem> PathKey<'a, HostSystemImpl> { - #[inline(always)] - fn local_socket(&self) -> &HostSystemImpl::LocalSocket { - match self { - Self::Copied(_, ls) => ls, - Self::Ref(_, ls) => *ls, - } - } - - fn to_copied(&self) -> PathKey<'static, HostSystemImpl> { - match self { - Self::Copied(ep, ls) => PathKey::<'static, HostSystemImpl>::Copied(ep.clone(), ls.clone()), - Self::Ref(ep, ls) => PathKey::<'static, HostSystemImpl>::Copied((*ep).clone(), (*ls).clone()), - } - } +#[derive(Default)] +struct WhoisQueueItem { + waiting_packets: RingBuffer, + retry_count: u16, } /// A ZeroTier VL1 node that can communicate securely with the ZeroTier peer-to-peer network. @@ -260,40 +215,40 @@ pub struct Node { pub identity: Identity, /// Interval latches for periodic background tasks. - intervals: parking_lot::Mutex, + intervals: Mutex, /// Canonicalized network paths, held as Weak<> to be automatically cleaned when no longer in use. - paths: parking_lot::RwLock, Arc>>>, + paths: RwLock, Arc>>>, /// Peers with which we are currently communicating. - peers: parking_lot::RwLock>>>, + peers: RwLock>>>, /// This node's trusted roots, sorted in ascending order of quality/preference, and cluster definitions. - roots: parking_lot::RwLock>, + roots: RwLock>, /// Current best root. - best_root: parking_lot::RwLock>>>, + best_root: RwLock>>>, - /// Identity lookup queue, also holds packets waiting on a lookup. - whois: WhoisQueue, + /// Queue of identities being looked up. + whois_queue: Mutex>, } impl Node { - pub async fn new( + pub fn new( host_system: &HostSystemImpl, storage: &NodeStorageImpl, auto_generate_identity: bool, auto_upgrade_identity: bool, ) -> Result { let mut id = { - let id = storage.load_node_identity().await; + let id = storage.load_node_identity(); if id.is_none() { if !auto_generate_identity { return Err(InvalidParameterError("no identity found and auto-generate not enabled")); } else { let id = Identity::generate(); host_system.event(Event::IdentityAutoGenerated(id.clone())); - storage.save_node_identity(&id).await; + storage.save_node_identity(&id); id } } else { @@ -304,7 +259,7 @@ impl Node { if auto_upgrade_identity { let old = id.clone(); if id.upgrade()? { - storage.save_node_identity(&id).await; + storage.save_node_identity(&id); host_system.event(Event::IdentityAutoUpgraded(old, id.clone())); } } @@ -314,18 +269,18 @@ impl Node { Ok(Self { instance_id: random::get_bytes_secure(), identity: id, - intervals: parking_lot::Mutex::new(BackgroundTaskIntervals::default()), - paths: parking_lot::RwLock::new(HashMap::new()), - peers: parking_lot::RwLock::new(HashMap::new()), - roots: parking_lot::RwLock::new(RootInfo { + intervals: Mutex::new(BackgroundTaskIntervals::default()), + paths: RwLock::new(HashMap::new()), + peers: RwLock::new(HashMap::new()), + roots: RwLock::new(RootInfo { sets: HashMap::new(), roots: HashMap::new(), this_root_sets: None, sets_modified: false, online: false, }), - best_root: parking_lot::RwLock::new(None), - whois: WhoisQueue::new(), + best_root: RwLock::new(None), + whois_queue: Mutex::new(HashMap::new()), }) } @@ -396,17 +351,20 @@ impl Node { } } - pub async fn do_background_tasks(&self, host_system: &HostSystemImpl) -> Duration { - let tt = host_system.time_ticks(); - let (root_sync, root_hello, mut root_spam_hello, peer_service, path_service, whois_service) = { + pub fn do_background_tasks(&self, host_system: &HostSystemImpl) -> Duration { + const INTERVAL_MS: i64 = 1000; + const INTERVAL: Duration = Duration::from_millis(INTERVAL_MS as u64); + let time_ticks = host_system.time_ticks(); + + let (root_sync, root_hello, mut root_spam_hello, peer_service, path_service, whois_queue_retry) = { let mut intervals = self.intervals.lock(); ( - intervals.root_sync.gate(tt), - intervals.root_hello.gate(tt), - intervals.root_spam_hello.gate(tt), - intervals.peer_service.gate(tt), - intervals.path_service.gate(tt), - intervals.whois_service.gate(tt), + intervals.root_sync.gate(time_ticks), + intervals.root_hello.gate(time_ticks), + intervals.root_spam_hello.gate(time_ticks), + intervals.peer_service.gate(time_ticks), + intervals.path_service.gate(time_ticks), + intervals.whois_queue_retry.gate(time_ticks), ) }; @@ -443,11 +401,11 @@ impl Node { } else { "" }, - if whois_service { - " whois_service" + if whois_queue_retry { + " whois_queue_retry" } else { "" - }, + } ); if root_sync { @@ -509,9 +467,9 @@ impl Node { if let Some(peer) = peers.get(&m.identity.address) { new_roots.insert(peer.clone(), m.endpoints.as_ref().unwrap().iter().cloned().collect()); } else { - if let Some(peer) = Peer::::new(&self.identity, m.identity.clone(), tt) { + if let Some(peer) = Peer::::new(&self.identity, m.identity.clone(), time_ticks) { new_roots.insert( - parking_lot::RwLockUpgradableReadGuard::upgrade(peers) + RwLockUpgradableReadGuard::upgrade(peers) .entry(m.identity.address) .or_insert_with(|| Arc::new(peer)) .clone(), @@ -553,7 +511,7 @@ impl Node { } } - self.update_best_root(host_system, tt); + self.update_best_root(host_system, time_ticks); } // Say HELLO to all roots periodically. For roots we send HELLO to every single endpoint @@ -577,7 +535,9 @@ impl Node { root.identity.address.to_string(), ROOT_HELLO_INTERVAL ); - root.send_hello(host_system, self, Some(ep)).await; + let root = root.clone(); + let ep = ep.clone(); + root.send_hello(host_system, self, Some(&ep)); } } } @@ -589,7 +549,7 @@ impl Node { { let roots = self.roots.read(); for (a, peer) in self.peers.read().iter() { - if !peer.service(host_system, self, tt) && !roots.roots.contains_key(peer) { + if !peer.service(host_system, self, time_ticks) && !roots.roots.contains_key(peer) { dead_peers.push(*a); } } @@ -600,13 +560,13 @@ impl Node { } if path_service { - // Service all paths, removing expired or invalid ones. This is done in two passes to - // avoid introducing latency into a flow. let mut dead_paths = Vec::new(); let mut need_keepalive = Vec::new(); + + // First check all paths in read mode to avoid blocking the entire node. for (k, path) in self.paths.read().iter() { if host_system.local_socket_is_valid(k.local_socket()) { - match path.service(tt) { + match path.service(time_ticks) { PathServiceResult::Ok => {} PathServiceResult::Dead => dead_paths.push(k.to_copied()), PathServiceResult::NeedsKeepalive => need_keepalive.push(path.clone()), @@ -615,26 +575,40 @@ impl Node { dead_paths.push(k.to_copied()); } } + + // Lock in write mode and remove dead paths, doing so piecemeal to again avoid blocking. for dp in dead_paths.iter() { self.paths.write().remove(dp); } - let ka = [tt as u8]; // send different bytes every time for keepalive in case some things filter zero packets + + // Finally run keepalive sends as a batch. + let keepalive_buf = [time_ticks as u8]; // just an arbitrary byte, no significance for p in need_keepalive.iter() { - host_system - .wire_send(&p.endpoint, Some(&p.local_socket), Some(&p.local_interface), &ka[..1], 0) - .await; + host_system.wire_send(&p.endpoint, Some(&p.local_socket), Some(&p.local_interface), &keepalive_buf, 0); } } - if whois_service { - self.whois.service(host_system, self, tt); + if whois_queue_retry { + let need_whois = { + let mut need_whois = Vec::new(); + let mut whois_queue = self.whois_queue.lock(); + whois_queue.retain(|_, qi| qi.retry_count <= WHOIS_RETRY_COUNT_MAX); + for (address, qi) in whois_queue.iter_mut() { + qi.retry_count += 1; + need_whois.push(*address); + } + need_whois + }; + if !need_whois.is_empty() { + self.send_whois(host_system, need_whois.as_slice()); + } } debug_event!(host_system, "[vl1] do_background_tasks DONE ----"); - Duration::from_millis(1000) + INTERVAL } - pub async fn handle_incoming_physical_packet( + pub fn handle_incoming_physical_packet( &self, host_system: &HostSystemImpl, inner: &InnerProtocolImpl, @@ -656,6 +630,16 @@ impl Node { source_local_interface.to_string() ); + // An 0xff value at byte [8] means this is a ZSSP packet. This is accomplished via the + // backward compatibilty hack of always having 0xff at byte [4] of 6-byte session IDs + // and by having 0xffffffffffff be the "nil" session ID for session init packets. ZSSP + // is the new V2 Noise-based forward-secure transport protocol. What follows below this + // is legacy handling of the old v1 protocol. + if data.u8_at(8).map_or(false, |x| x == 0xff) { + todo!(); + } + + // Legacy ZeroTier V1 packet handling if let Ok(fragment_header) = data.struct_mut_at::(0) { if let Some(dest) = Address::from_bytes_fixed(&fragment_header.dest) { let time_ticks = host_system.time_ticks(); @@ -668,7 +652,7 @@ impl Node { let fragment_header_id = u64::from_be_bytes(fragment_header.id); debug_event!( host_system, - "[vl1] #{:0>16x} fragment {} of {} received", + "[vl1] [v1] #{:0>16x} fragment {} of {} received", u64::from_be_bytes(fragment_header.id), fragment_header.fragment_no(), fragment_header.total_fragments() @@ -683,7 +667,7 @@ impl Node { ) { if let Some(frag0) = assembled_packet.frags[0].as_ref() { #[cfg(debug_assertions)] - debug_event!(host_system, "[vl1] #{:0>16x} packet fully assembled!", fragment_header_id); + debug_event!(host_system, "[vl1] [v1] #{:0>16x} packet fully assembled!", fragment_header_id); if let Ok(packet_header) = frag0.struct_at::(0) { if let Some(source) = Address::from_bytes(&packet_header.src) { @@ -697,11 +681,16 @@ impl Node { packet_header, frag0, &assembled_packet.frags[1..(assembled_packet.have as usize)], - ) - .await; + ); } else { - self.whois - .query(self, host_system, source, Some(QueuedPacket::Fragmented(assembled_packet))); + /* + self.whois_lookup_queue.query( + self, + host_system, + source, + Some(QueuedPacket::Fragmented(assembled_packet)), + ); + */ } } } @@ -710,14 +699,17 @@ impl Node { } else { #[cfg(debug_assertions)] if let Ok(packet_header) = data.struct_at::(0) { - debug_event!(host_system, "[vl1] #{:0>16x} is unfragmented", u64::from_be_bytes(packet_header.id)); + debug_event!( + host_system, + "[vl1] [v1] #{:0>16x} is unfragmented", + u64::from_be_bytes(packet_header.id) + ); if let Some(source) = Address::from_bytes(&packet_header.src) { if let Some(peer) = self.peer(source) { - peer.receive(self, host_system, inner, time_ticks, &path, packet_header, data.as_ref(), &[]) - .await; + peer.receive(self, host_system, inner, time_ticks, &path, packet_header, data.as_ref(), &[]); } else { - self.whois.query(self, host_system, source, Some(QueuedPacket::Unfragmented(data))); + self.whois(host_system, source, Some(data)); } } } @@ -732,14 +724,14 @@ impl Node { debug_packet_id = u64::from_be_bytes(fragment_header.id); debug_event!( host_system, - "[vl1] #{:0>16x} forwarding packet fragment to {}", + "[vl1] [v1] #{:0>16x} forwarding packet fragment to {}", debug_packet_id, dest.to_string() ); } if fragment_header.increment_hops() > v1::FORWARD_MAX_HOPS { #[cfg(debug_assertions)] - debug_event!(host_system, "[vl1] #{:0>16x} discarded: max hops exceeded!", debug_packet_id); + debug_event!(host_system, "[vl1] [v1] #{:0>16x} discarded: max hops exceeded!", debug_packet_id); return; } } else { @@ -749,7 +741,7 @@ impl Node { debug_packet_id = u64::from_be_bytes(packet_header.id); debug_event!( host_system, - "[vl1] #{:0>16x} forwarding packet to {}", + "[vl1] [v1] #{:0>16x} forwarding packet to {}", debug_packet_id, dest.to_string() ); @@ -758,7 +750,7 @@ impl Node { #[cfg(debug_assertions)] debug_event!( host_system, - "[vl1] #{:0>16x} discarded: max hops exceeded!", + "[vl1] [v1] #{:0>16x} discarded: max hops exceeded!", u64::from_be_bytes(packet_header.id) ); return; @@ -770,15 +762,35 @@ impl Node { if let Some(peer) = self.peer(dest) { // TODO: SHOULD we forward? Need a way to check. - peer.forward(host_system, time_ticks, data.as_ref()).await; + peer.forward(host_system, time_ticks, data.as_ref()); #[cfg(debug_assertions)] - debug_event!(host_system, "[vl1] #{:0>16x} forwarded successfully", debug_packet_id); + debug_event!(host_system, "[vl1] [v1] #{:0>16x} forwarded successfully", debug_packet_id); } } } } } + fn whois(&self, host_system: &HostSystemImpl, address: Address, waiting_packet: Option) { + { + let mut whois_queue = self.whois_queue.lock(); + let qi = whois_queue.entry(address).or_default(); + if let Some(p) = waiting_packet { + qi.waiting_packets.add(p); + } + if qi.retry_count > 0 { + return; + } else { + qi.retry_count += 1; + } + } + self.send_whois(host_system, &[address]); + } + + fn send_whois(&self, host_system: &HostSystemImpl, addresses: &[Address]) { + if let Some(root) = self.best_root() {} + } + /// Get the current "best" root from among this node's trusted roots. pub fn best_root(&self) -> Option>> { self.best_root.read().clone() @@ -865,47 +877,103 @@ impl Node { } } +/// Key used to look up paths in a hash map +/// This supports copied keys for storing and refs for fast lookup without having to copy anything. +enum PathKey<'a, 'b, HostSystemImpl: HostSystem> { + Copied(Endpoint, HostSystemImpl::LocalSocket), + Ref(&'a Endpoint, &'b HostSystemImpl::LocalSocket), +} + +impl<'a, 'b, HostSystemImpl: HostSystem> Hash for PathKey<'a, 'b, HostSystemImpl> { + fn hash(&self, state: &mut H) { + match self { + Self::Copied(ep, ls) => { + ep.hash(state); + ls.hash(state); + } + Self::Ref(ep, ls) => { + (*ep).hash(state); + (*ls).hash(state); + } + } + } +} + +impl PartialEq for PathKey<'_, '_, HostSystemImpl> { + fn eq(&self, other: &Self) -> bool { + match (self, other) { + (Self::Copied(ep1, ls1), Self::Copied(ep2, ls2)) => ep1.eq(ep2) && ls1.eq(ls2), + (Self::Copied(ep1, ls1), Self::Ref(ep2, ls2)) => ep1.eq(*ep2) && ls1.eq(*ls2), + (Self::Ref(ep1, ls1), Self::Copied(ep2, ls2)) => (*ep1).eq(ep2) && (*ls1).eq(ls2), + (Self::Ref(ep1, ls1), Self::Ref(ep2, ls2)) => (*ep1).eq(*ep2) && (*ls1).eq(*ls2), + } + } +} + +impl Eq for PathKey<'_, '_, HostSystemImpl> {} + +impl<'a, 'b, HostSystemImpl: HostSystem> PathKey<'a, 'b, HostSystemImpl> { + #[inline(always)] + fn local_socket(&self) -> &HostSystemImpl::LocalSocket { + match self { + Self::Copied(_, ls) => ls, + Self::Ref(_, ls) => *ls, + } + } + + #[inline(always)] + fn to_copied(&self) -> PathKey<'static, 'static, HostSystemImpl> { + match self { + Self::Copied(ep, ls) => PathKey::<'static, 'static, HostSystemImpl>::Copied(ep.clone(), ls.clone()), + Self::Ref(ep, ls) => PathKey::<'static, 'static, HostSystemImpl>::Copied((*ep).clone(), (*ls).clone()), + } + } +} + /// Dummy no-op inner protocol for debugging and testing. #[derive(Default)] pub struct DummyInnerProtocol; -#[async_trait] impl InnerProtocol for DummyInnerProtocol { - async fn handle_packet( + #[inline(always)] + fn handle_packet( &self, - _source: &Peer, - _source_path: &Path, + _source: &Arc>, + _source_path: &Arc>, _verb: u8, _payload: &PacketBuffer, - ) -> bool { - false + ) -> PacketHandlerResult { + PacketHandlerResult::NotHandled } - async fn handle_error( + #[inline(always)] + fn handle_error( &self, - _source: &Peer, - _source_path: &Path, + _source: &Arc>, + _source_path: &Arc>, _in_re_verb: u8, _in_re_message_id: u64, _error_code: u8, _payload: &PacketBuffer, _cursor: &mut usize, - ) -> bool { - false + ) -> PacketHandlerResult { + PacketHandlerResult::NotHandled } - async fn handle_ok( + #[inline(always)] + fn handle_ok( &self, - _source: &Peer, - _source_path: &Path, + _source: &Arc>, + _source_path: &Arc>, _in_re_verb: u8, _in_re_message_id: u64, _payload: &PacketBuffer, _cursor: &mut usize, - ) -> bool { - false + ) -> PacketHandlerResult { + PacketHandlerResult::NotHandled } + #[inline(always)] fn should_communicate_with(&self, _id: &Identity) -> bool { true } @@ -915,9 +983,9 @@ impl InnerProtocol for DummyInnerProtocol { #[derive(Default)] pub struct DummyPathFilter; -#[async_trait] impl PathFilter for DummyPathFilter { - async fn check_path( + #[inline(always)] + fn check_path( &self, _id: &Identity, _endpoint: &Endpoint, @@ -927,7 +995,8 @@ impl PathFilter for DummyPathFilter { true } - async fn get_path_hints( + #[inline(always)] + fn get_path_hints( &self, _id: &Identity, ) -> Option< diff --git a/network-hypervisor/src/vl1/path.rs b/network-hypervisor/src/vl1/path.rs index e5280db2a..3335db789 100644 --- a/network-hypervisor/src/vl1/path.rs +++ b/network-hypervisor/src/vl1/path.rs @@ -32,7 +32,7 @@ pub struct Path { last_send_time_ticks: AtomicI64, last_receive_time_ticks: AtomicI64, create_time_ticks: i64, - fragmented_packets: Mutex>, + fragmented_packets: Mutex>, } impl Path { @@ -57,7 +57,7 @@ impl Path { /// This returns None if more fragments are needed to assemble the packet. pub(crate) fn receive_fragment( &self, - packet_id: PacketId, + packet_id: u64, fragment_no: u8, fragment_expecting_count: u8, packet: PooledPacketBuffer, diff --git a/network-hypervisor/src/vl1/peer.rs b/network-hypervisor/src/vl1/peer.rs index 7a9f48f9e..a88a47f80 100644 --- a/network-hypervisor/src/vl1/peer.rs +++ b/network-hypervisor/src/vl1/peer.rs @@ -206,7 +206,7 @@ impl Peer { (time_ticks - self.last_receive_time_ticks.load(Ordering::Relaxed).max(self.create_time_ticks)) < PEER_EXPIRATION_TIME } - async fn internal_send( + fn internal_send( &self, host_system: &HostSystemImpl, endpoint: &Endpoint, @@ -214,16 +214,11 @@ impl Peer { local_interface: Option<&HostSystemImpl::LocalInterface>, max_fragment_size: usize, packet: &PacketBuffer, - ) -> bool { + ) { let packet_size = packet.len(); if packet_size > max_fragment_size { let bytes = packet.as_bytes(); - if !host_system - .wire_send(endpoint, local_socket, local_interface, &bytes[0..UDP_DEFAULT_MTU], 0) - .await - { - return false; - } + host_system.wire_send(endpoint, local_socket, local_interface, &bytes[0..UDP_DEFAULT_MTU], 0); let mut pos = UDP_DEFAULT_MTU; let overrun_size = (packet_size - UDP_DEFAULT_MTU) as u32; @@ -247,23 +242,16 @@ impl Peer { let fragment_size = v1::FRAGMENT_HEADER_SIZE + chunk_size; tmp_buf[..v1::FRAGMENT_HEADER_SIZE].copy_from_slice(header.as_bytes()); tmp_buf[v1::FRAGMENT_HEADER_SIZE..fragment_size].copy_from_slice(&bytes[pos..next_pos]); - if !host_system - .wire_send(endpoint, local_socket, local_interface, &tmp_buf[..fragment_size], 0) - .await - { - return false; - } + host_system.wire_send(endpoint, local_socket, local_interface, &tmp_buf[..fragment_size], 0); pos = next_pos; if pos < packet_size { chunk_size = (packet_size - pos).min(UDP_DEFAULT_MTU - v1::HEADER_SIZE); } else { - return true; + break; } } } else { - return host_system - .wire_send(endpoint, local_socket, local_interface, packet.as_bytes(), 0) - .await; + host_system.wire_send(endpoint, local_socket, local_interface, packet.as_bytes(), 0); } } @@ -273,7 +261,7 @@ impl Peer { /// via a root or some other route. /// /// It encrypts and sets the MAC and cipher fields and packet ID and other things. - pub(crate) async fn send( + pub(crate) fn send( &self, host_system: &HostSystemImpl, path: Option<&Arc>>, @@ -326,22 +314,18 @@ impl Peer { return false; } - if self - .internal_send( - host_system, - &path.endpoint, - Some(&path.local_socket), - Some(&path.local_interface), - max_fragment_size, - packet, - ) - .await - { - self.last_send_time_ticks.store(time_ticks, Ordering::Relaxed); - return true; - } + self.internal_send( + host_system, + &path.endpoint, + Some(&path.local_socket), + Some(&path.local_interface), + max_fragment_size, + packet, + ); - return false; + self.last_send_time_ticks.store(time_ticks, Ordering::Relaxed); + + return true; } /// Forward a packet to this peer. @@ -351,21 +335,17 @@ impl Peer { /// /// This doesn't fragment large packets since fragments are forwarded individually. /// Intermediates don't need to adjust fragmentation. - pub(crate) async fn forward(&self, host_system: &HostSystemImpl, time_ticks: i64, packet: &PacketBuffer) -> bool { + pub(crate) fn forward(&self, host_system: &HostSystemImpl, time_ticks: i64, packet: &PacketBuffer) -> bool { if let Some(path) = self.direct_path() { - if host_system - .wire_send( - &path.endpoint, - Some(&path.local_socket), - Some(&path.local_interface), - packet.as_bytes(), - 0, - ) - .await - { - self.last_forward_time_ticks.store(time_ticks, Ordering::Relaxed); - return true; - } + host_system.wire_send( + &path.endpoint, + Some(&path.local_socket), + Some(&path.local_interface), + packet.as_bytes(), + 0, + ); + self.last_forward_time_ticks.store(time_ticks, Ordering::Relaxed); + return true; } return false; } @@ -378,7 +358,7 @@ impl Peer { /// Unlike other messages HELLO is sent partially in the clear and always with the long-lived /// static identity key. Authentication in old versions is via Poly1305 and in new versions /// via HMAC-SHA512. - pub(crate) async fn send_hello( + pub(crate) fn send_hello( &self, host_system: &HostSystemImpl, node: &Node, @@ -445,26 +425,20 @@ impl Peer { } if let Some(p) = path.as_ref() { - if self - .internal_send( - host_system, - destination, - Some(&p.local_socket), - Some(&p.local_interface), - max_fragment_size, - &packet, - ) - .await - { - p.log_send_anything(time_ticks); - true - } else { - false - } + self.internal_send( + host_system, + destination, + Some(&p.local_socket), + Some(&p.local_interface), + max_fragment_size, + &packet, + ); + p.log_send_anything(time_ticks); } else { - self.internal_send(host_system, destination, None, None, max_fragment_size, &packet) - .await + self.internal_send(host_system, destination, None, None, max_fragment_size, &packet); } + + return true; } /// Receive, decrypt, authenticate, and process an incoming packet from this peer. @@ -473,8 +447,8 @@ impl Peer { /// those fragments after the main packet header and first chunk. /// /// This returns true if the packet decrypted and passed authentication. - pub(crate) async fn receive( - &self, + pub(crate) fn receive( + self: &Arc, node: &Node, host_system: &HostSystemImpl, inner: &InnerProtocolImpl, @@ -483,7 +457,7 @@ impl Peer { packet_header: &v1::PacketHeader, frag0: &PacketBuffer, fragments: &[Option], - ) -> bool { + ) -> PacketHandlerResult { if let Ok(packet_frag0_payload_bytes) = frag0.as_bytes_starting_at(v1::VERB_INDEX) { let mut payload = PacketBuffer::new(); @@ -503,7 +477,7 @@ impl Peer { "[vl1] #{:0>16x} failed authentication", u64::from_be_bytes(packet_header.id) ); - return false; + return PacketHandlerResult::Error; }; if let Ok(mut verb) = payload.u8_at(0) { @@ -513,7 +487,7 @@ impl Peer { if let Ok(dlen) = lz4_flex::block::decompress_into(&payload.as_bytes()[1..], &mut decompressed_payload[1..]) { payload.set_to(&decompressed_payload[..(dlen + 1)]); } else { - return false; + return PacketHandlerResult::Error; } } @@ -541,68 +515,47 @@ impl Peer { verb as u32 ); - if match verb { - verbs::VL1_NOP => true, - verbs::VL1_HELLO => { - self.handle_incoming_hello( - host_system, - inner, - node, - time_ticks, - message_id, - source_path, - packet_header.hops(), - &payload, - ) - .await - } - verbs::VL1_ERROR => { - self.handle_incoming_error(host_system, inner, node, time_ticks, source_path, &payload) - .await - } - verbs::VL1_OK => { - self.handle_incoming_ok( - host_system, - inner, - node, - time_ticks, - source_path, - packet_header.hops(), - path_is_known, - &payload, - ) - .await - } - verbs::VL1_WHOIS => { - self.handle_incoming_whois(host_system, inner, node, time_ticks, message_id, &payload) - .await - } + return match verb { + verbs::VL1_NOP => PacketHandlerResult::Ok, + verbs::VL1_HELLO => self.handle_incoming_hello( + host_system, + inner, + node, + time_ticks, + message_id, + source_path, + packet_header.hops(), + &payload, + ), + verbs::VL1_ERROR => self.handle_incoming_error(host_system, inner, node, time_ticks, source_path, &payload), + verbs::VL1_OK => self.handle_incoming_ok( + host_system, + inner, + node, + time_ticks, + source_path, + packet_header.hops(), + path_is_known, + &payload, + ), + verbs::VL1_WHOIS => self.handle_incoming_whois(host_system, inner, node, time_ticks, message_id, &payload), verbs::VL1_RENDEZVOUS => { self.handle_incoming_rendezvous(host_system, node, time_ticks, message_id, source_path, &payload) - .await - } - verbs::VL1_ECHO => { - self.handle_incoming_echo(host_system, inner, node, time_ticks, message_id, &payload) - .await } + verbs::VL1_ECHO => self.handle_incoming_echo(host_system, inner, node, time_ticks, message_id, &payload), verbs::VL1_PUSH_DIRECT_PATHS => { self.handle_incoming_push_direct_paths(host_system, node, time_ticks, source_path, &payload) - .await } - verbs::VL1_USER_MESSAGE => { - self.handle_incoming_user_message(host_system, node, time_ticks, source_path, &payload) - .await - } - _ => inner.handle_packet(self, &source_path, verb, &payload).await, - } { - return true; - } + verbs::VL1_USER_MESSAGE => self.handle_incoming_user_message(host_system, node, time_ticks, source_path, &payload), + _ => inner.handle_packet(self, &source_path, verb, &payload), + }; } } - return false; + + return PacketHandlerResult::Error; } - async fn handle_incoming_hello( + fn handle_incoming_hello( &self, host_system: &HostSystemImpl, inner: &InnerProtocolImpl, @@ -612,14 +565,14 @@ impl Peer { source_path: &Arc>, hops: u8, payload: &PacketBuffer, - ) -> bool { + ) -> PacketHandlerResult { if !(inner.should_communicate_with(&self.identity) || node.this_node_is_root() || node.is_peer_root(self)) { debug_event!( host_system, "[vl1] dropping HELLO from {} due to lack of trust relationship", self.identity.address.to_string() ); - return true; // packet wasn't invalid, just ignored + return PacketHandlerResult::Ok; // packet wasn't invalid, just ignored } let mut cursor = 0; @@ -653,48 +606,48 @@ impl Peer { f.1.version_revision = VERSION_REVISION.to_be_bytes(); } - return self.send(host_system, Some(source_path), node, time_ticks, &mut packet).await; + self.send(host_system, Some(source_path), node, time_ticks, &mut packet); + return PacketHandlerResult::Ok; } } } - return false; + + return PacketHandlerResult::Error; } - async fn handle_incoming_error( - &self, + fn handle_incoming_error( + self: &Arc, _: &HostSystemImpl, inner: &InnerProtocolImpl, _: &Node, _: i64, source_path: &Arc>, payload: &PacketBuffer, - ) -> bool { + ) -> PacketHandlerResult { let mut cursor = 0; if let Ok(error_header) = payload.read_struct::(&mut cursor) { let in_re_message_id: MessageId = u64::from_ne_bytes(error_header.in_re_message_id); if self.message_id_counter.load(Ordering::Relaxed).wrapping_sub(in_re_message_id) <= PACKET_RESPONSE_COUNTER_DELTA_MAX { match error_header.in_re_verb { _ => { - return inner - .handle_error( - self, - &source_path, - error_header.in_re_verb, - in_re_message_id, - error_header.error_code, - payload, - &mut cursor, - ) - .await; + return inner.handle_error( + self, + &source_path, + error_header.in_re_verb, + in_re_message_id, + error_header.error_code, + payload, + &mut cursor, + ); } } } } - return false; + return PacketHandlerResult::Error; } - async fn handle_incoming_ok( - &self, + fn handle_incoming_ok( + self: &Arc, host_system: &HostSystemImpl, inner: &InnerProtocolImpl, node: &Node, @@ -703,7 +656,7 @@ impl Peer { hops: u8, path_is_known: bool, payload: &PacketBuffer, - ) -> bool { + ) -> PacketHandlerResult { let mut cursor = 0; if let Ok(ok_header) = payload.read_struct::(&mut cursor) { let in_re_message_id: MessageId = u64::from_ne_bytes(ok_header.in_re_message_id); @@ -753,30 +706,28 @@ impl Peer { } } } else { - return true; // not invalid, just ignored + return PacketHandlerResult::Ok; // not invalid, just ignored } } _ => { - return inner - .handle_ok(self, &source_path, ok_header.in_re_verb, in_re_message_id, payload, &mut cursor) - .await; + return inner.handle_ok(self, &source_path, ok_header.in_re_verb, in_re_message_id, payload, &mut cursor); } } } } - return false; + return PacketHandlerResult::Error; } - async fn handle_incoming_whois( - &self, + fn handle_incoming_whois( + self: &Arc, host_system: &HostSystemImpl, inner: &InnerProtocolImpl, node: &Node, time_ticks: i64, message_id: MessageId, payload: &PacketBuffer, - ) -> bool { + ) -> PacketHandlerResult { if node.this_node_is_root() || inner.should_communicate_with(&self.identity) { let mut packet = PacketBuffer::new(); packet.set_size(v1::HEADER_SIZE); @@ -793,33 +744,31 @@ impl Peer { if let Some(peer) = node.peer(zt_address) { if !packet.append_bytes((&peer.identity.to_public_bytes()).into()).is_ok() { debug_event!(host_system, "unexpected error serializing an identity into a WHOIS packet response"); - return false; + return PacketHandlerResult::Error; } } } } - self.send(host_system, None, node, time_ticks, &mut packet).await - } else { - true // packet wasn't invalid, just ignored + self.send(host_system, None, node, time_ticks, &mut packet); } + return PacketHandlerResult::Ok; } - #[allow(unused)] - async fn handle_incoming_rendezvous( - &self, + fn handle_incoming_rendezvous( + self: &Arc, host_system: &HostSystemImpl, node: &Node, time_ticks: i64, message_id: MessageId, source_path: &Arc>, payload: &PacketBuffer, - ) -> bool { + ) -> PacketHandlerResult { if node.is_peer_root(self) {} - return true; + return PacketHandlerResult::Ok; } - async fn handle_incoming_echo( + fn handle_incoming_echo( &self, host_system: &HostSystemImpl, inner: &InnerProtocolImpl, @@ -827,7 +776,7 @@ impl Peer { time_ticks: i64, message_id: MessageId, payload: &PacketBuffer, - ) -> bool { + ) -> PacketHandlerResult { if inner.should_communicate_with(&self.identity) || node.is_peer_root(self) { let mut packet = PacketBuffer::new(); packet.set_size(v1::HEADER_SIZE); @@ -838,9 +787,7 @@ impl Peer { f.in_re_message_id = message_id.to_ne_bytes(); } if packet.append_bytes(payload.as_bytes()).is_ok() { - self.send(host_system, None, node, time_ticks, &mut packet).await - } else { - false + self.send(host_system, None, node, time_ticks, &mut packet); } } else { debug_event!( @@ -848,32 +795,30 @@ impl Peer { "[vl1] dropping ECHO from {} due to lack of trust relationship", self.identity.address.to_string() ); - true // packet wasn't invalid, just ignored } + return PacketHandlerResult::Ok; } - #[allow(unused)] - async fn handle_incoming_push_direct_paths( - &self, + fn handle_incoming_push_direct_paths( + self: &Arc, host_system: &HostSystemImpl, node: &Node, time_ticks: i64, source_path: &Arc>, payload: &PacketBuffer, - ) -> bool { - false + ) -> PacketHandlerResult { + PacketHandlerResult::Ok } - #[allow(unused)] - async fn handle_incoming_user_message( - &self, + fn handle_incoming_user_message( + self: &Arc, host_system: &HostSystemImpl, node: &Node, time_ticks: i64, source_path: &Arc>, payload: &PacketBuffer, - ) -> bool { - false + ) -> PacketHandlerResult { + PacketHandlerResult::Ok } } diff --git a/network-hypervisor/src/vl1/rootset.rs b/network-hypervisor/src/vl1/rootset.rs index df0af9392..b710273d4 100644 --- a/network-hypervisor/src/vl1/rootset.rs +++ b/network-hypervisor/src/vl1/rootset.rs @@ -4,7 +4,7 @@ use std::collections::BTreeSet; use std::io::Write; use crate::util::marshalable::*; -use crate::vl1::identity::{Identity, MAX_SIGNATURE_SIZE}; +use crate::vl1::identity::{Identity, IDENTITY_MAX_SIGNATURE_SIZE}; use crate::vl1::Endpoint; use zerotier_utils::arrayvec::ArrayVec; @@ -33,7 +33,7 @@ pub struct Root { /// This is populated by the sign() method when the completed root set is signed by each member. /// All member roots must sign. #[serde(default)] - pub signature: ArrayVec, + pub signature: ArrayVec, /// Priority (higher number is lower priority, 0 is default). /// diff --git a/network-hypervisor/src/vl1/whoisqueue.rs b/network-hypervisor/src/vl1/whoisqueue.rs deleted file mode 100644 index 35d7227dc..000000000 --- a/network-hypervisor/src/vl1/whoisqueue.rs +++ /dev/null @@ -1,84 +0,0 @@ -// (c) 2020-2022 ZeroTier, Inc. -- currently propritery pending actual release and licensing. See LICENSE.md. - -use std::collections::{HashMap, LinkedList}; - -use parking_lot::Mutex; - -use crate::protocol::{PooledPacketBuffer, WHOIS_MAX_WAITING_PACKETS, WHOIS_RETRY_INTERVAL, WHOIS_RETRY_MAX}; -use crate::util::gate::IntervalGate; -use crate::vl1::fragmentedpacket::FragmentedPacket; -use crate::vl1::node::{HostSystem, Node}; -use crate::vl1::Address; - -pub(crate) const SERVICE_INTERVAL_MS: i64 = WHOIS_RETRY_INTERVAL; - -pub(crate) enum QueuedPacket { - Unfragmented(PooledPacketBuffer), - Fragmented(FragmentedPacket), -} - -struct WhoisQueueItem { - packet_queue: LinkedList, - retry_gate: IntervalGate<{ WHOIS_RETRY_INTERVAL }>, - retry_count: u16, -} - -pub(crate) struct WhoisQueue(Mutex>); - -impl WhoisQueue { - pub fn new() -> Self { - Self(Mutex::new(HashMap::new())) - } - - /// Launch or renew a WHOIS query and enqueue a packet to be processed when (if) it is received. - pub fn query(&self, node: &Node, si: &SI, target: Address, packet: Option) { - let mut q = self.0.lock(); - - let qi = q.entry(target).or_insert_with(|| WhoisQueueItem { - packet_queue: LinkedList::new(), - retry_gate: IntervalGate::new(0), - retry_count: 0, - }); - - if qi.retry_gate.gate(si.time_ticks()) { - qi.retry_count += 1; - if packet.is_some() { - while qi.packet_queue.len() >= WHOIS_MAX_WAITING_PACKETS { - let _ = qi.packet_queue.pop_front(); - } - qi.packet_queue.push_back(packet.unwrap()); - } - self.send_whois(node, si, &[target]); - } - } - - /// Remove a WHOIS request from the queue and call the supplied function for all queued packets. - #[allow(unused)] - pub fn response_received_get_packets(&self, address: Address, packet_handler: F) { - let mut qi = self.0.lock().remove(&address); - let _ = qi.map(|mut qi| qi.packet_queue.iter_mut().for_each(packet_handler)); - } - - #[allow(unused)] - fn send_whois(&self, node: &Node, si: &SI, targets: &[Address]) { - todo!() - } - - pub(crate) fn service(&self, si: &SI, node: &Node, time_ticks: i64) { - let mut targets: Vec
= Vec::new(); - self.0.lock().retain(|target, qi| { - if qi.retry_count < WHOIS_RETRY_MAX { - if qi.retry_gate.gate(time_ticks) { - qi.retry_count += 1; - targets.push(*target); - } - true - } else { - false - } - }); - if !targets.is_empty() { - self.send_whois(node, si, targets.as_slice()); - } - } -} diff --git a/network-hypervisor/src/vl2/certificateofmembership.rs b/network-hypervisor/src/vl2/certificateofmembership.rs index 9a67b203c..d4cd550e7 100644 --- a/network-hypervisor/src/vl2/certificateofmembership.rs +++ b/network-hypervisor/src/vl2/certificateofmembership.rs @@ -14,5 +14,5 @@ pub struct CertificateOfMembership { pub network_id: NetworkId, pub timestamp: i64, pub max_delta: i64, - pub signature: ArrayVec, + pub signature: ArrayVec, } diff --git a/network-hypervisor/src/vl2/switch.rs b/network-hypervisor/src/vl2/switch.rs index ca764bdc7..be3fc0b17 100644 --- a/network-hypervisor/src/vl2/switch.rs +++ b/network-hypervisor/src/vl2/switch.rs @@ -1,56 +1,51 @@ // (c) 2020-2022 ZeroTier, Inc. -- currently propritery pending actual release and licensing. See LICENSE.md. -use async_trait::async_trait; +use std::sync::Arc; use crate::protocol::PacketBuffer; -use crate::vl1::node::{HostSystem, InnerProtocol}; +use crate::vl1::node::{HostSystem, InnerProtocol, PacketHandlerResult}; use crate::vl1::{Identity, Path, Peer}; pub trait SwitchInterface: Sync + Send {} pub struct Switch {} -#[async_trait] impl InnerProtocol for Switch { - #[allow(unused)] - async fn handle_packet( + fn handle_packet( &self, - peer: &Peer, - source_path: &Path, + source: &Arc>, + source_path: &Arc>, verb: u8, payload: &PacketBuffer, - ) -> bool { - false + ) -> PacketHandlerResult { + PacketHandlerResult::NotHandled } - #[allow(unused)] - async fn handle_error( + fn handle_error( &self, - peer: &Peer, - source_path: &Path, + source: &Arc>, + source_path: &Arc>, in_re_verb: u8, in_re_message_id: u64, error_code: u8, payload: &PacketBuffer, cursor: &mut usize, - ) -> bool { - false + ) -> PacketHandlerResult { + PacketHandlerResult::NotHandled } - #[allow(unused)] - async fn handle_ok( + fn handle_ok( &self, - peer: &Peer, - source_path: &Path, + source: &Arc>, + source_path: &Arc>, in_re_verb: u8, in_re_message_id: u64, payload: &PacketBuffer, cursor: &mut usize, - ) -> bool { - false + ) -> PacketHandlerResult { + PacketHandlerResult::NotHandled } - #[allow(unused)] fn should_communicate_with(&self, id: &Identity) -> bool { true } diff --git a/network-hypervisor/src/vl2/tag.rs b/network-hypervisor/src/vl2/tag.rs index d8016eacc..e59bf9bb5 100644 --- a/network-hypervisor/src/vl2/tag.rs +++ b/network-hypervisor/src/vl2/tag.rs @@ -14,5 +14,5 @@ pub struct Tag { pub timestamp: i64, pub issued_to: Address, pub signed_by: Address, - pub signature: ArrayVec, + pub signature: ArrayVec, } diff --git a/service/src/cli/rootset.rs b/service/src/cli/rootset.rs index b7cb12dc4..982bd5fa0 100644 --- a/service/src/cli/rootset.rs +++ b/service/src/cli/rootset.rs @@ -73,7 +73,7 @@ pub async fn cmd(_: Flags, cmd_args: &ArgMatches) -> i32 { return exitcode::ERR_IOERR; } let root_set = root_set.unwrap(); - if root_set.verify() { + if root_set.verify().is_some() { println!("OK"); } else { println!("FAILED"); @@ -108,7 +108,9 @@ pub async fn cmd(_: Flags, cmd_args: &ArgMatches) -> i32 { } Some(("restoredefault", _)) => { - let _ = std::io::stdout().write_all(to_json_pretty(&RootSet::zerotier_default()).as_bytes()); + let rs = RootSet::zerotier_default(); + let _ = std::io::stdout().write_all(to_json_pretty(&*rs).as_bytes()); + // TODO: re-add } _ => panic!(), diff --git a/service/src/datadir.rs b/service/src/datadir.rs index c1c588db3..bb12b3cd0 100644 --- a/service/src/datadir.rs +++ b/service/src/datadir.rs @@ -7,12 +7,10 @@ use std::sync::Arc; use crate::localconfig::Config; use crate::utils::{read_limit, DEFAULT_FILE_IO_READ_LIMIT}; -use async_trait::async_trait; - use parking_lot::{Mutex, RwLock}; use zerotier_crypto::random::next_u32_secure; -use zerotier_network_hypervisor::vl1::{Identity, Storage}; +use zerotier_network_hypervisor::vl1::{Identity, NodeStorage}; use zerotier_utils::json::to_json_pretty; const AUTH_TOKEN_DEFAULT_LENGTH: usize = 48; @@ -29,29 +27,35 @@ pub struct DataDir { authtoken: Mutex, } -#[async_trait] -impl Storage for DataDir { - async fn load_node_identity(&self) -> Option { - let id_data = read_limit(self.base_path.join(IDENTITY_SECRET_FILENAME), 4096).await; - if id_data.is_err() { - return None; - } - let id_data = Identity::from_str(String::from_utf8_lossy(id_data.unwrap().as_slice()).as_ref()); - if id_data.is_err() { - return None; - } - Some(id_data.unwrap()) +impl NodeStorage for DataDir { + fn load_node_identity(&self) -> Option { + todo!() + /* + tokio::runtime::Handle::current().spawn(async { + let id_data = read_limit(self.base_path.join(IDENTITY_SECRET_FILENAME), 4096).await; + if id_data.is_err() { + return None; + } + let id_data = Identity::from_str(String::from_utf8_lossy(id_data.unwrap().as_slice()).as_ref()); + if id_data.is_err() { + return None; + } + Some(id_data.unwrap()) + }) + */ } - async fn save_node_identity(&self, id: &Identity) { - assert!(id.secret.is_some()); - let id_secret_str = id.to_secret_string(); - let id_public_str = id.to_string(); - let secret_path = self.base_path.join(IDENTITY_SECRET_FILENAME); - // TODO: handle errors - let _ = tokio::fs::write(&secret_path, id_secret_str.as_bytes()).await; - assert!(crate::utils::fs_restrict_permissions(&secret_path)); - let _ = tokio::fs::write(self.base_path.join(IDENTITY_PUBLIC_FILENAME), id_public_str.as_bytes()).await; + fn save_node_identity(&self, id: &Identity) { + tokio::runtime::Handle::current().block_on(async { + assert!(id.secret.is_some()); + let id_secret_str = id.to_secret_string(); + let id_public_str = id.to_string(); + let secret_path = self.base_path.join(IDENTITY_SECRET_FILENAME); + // TODO: handle errors + let _ = tokio::fs::write(&secret_path, id_secret_str.as_bytes()).await; + assert!(crate::utils::fs_restrict_permissions(&secret_path)); + let _ = tokio::fs::write(self.base_path.join(IDENTITY_PUBLIC_FILENAME), id_public_str.as_bytes()).await; + }) } } diff --git a/utils/Cargo.toml b/utils/Cargo.toml index cde0e2365..be97870d6 100644 --- a/utils/Cargo.toml +++ b/utils/Cargo.toml @@ -9,3 +9,4 @@ version = "0.1.0" parking_lot = { version = "^0", features = [], default-features = false } serde = { version = "^1", features = ["derive"], default-features = false } serde_json = { version = "^1", features = ["std"], default-features = false } +tokio = { version = "^1", features = ["fs", "io-util", "io-std", "net", "parking_lot", "process", "rt", "rt-multi-thread", "signal", "sync", "time"], default-features = false } diff --git a/utils/src/arrayvec.rs b/utils/src/arrayvec.rs index 6a183fb2f..81048f601 100644 --- a/utils/src/arrayvec.rs +++ b/utils/src/arrayvec.rs @@ -142,8 +142,8 @@ impl ArrayVec { #[inline(always)] pub fn push(&mut self, v: T) { - if self.s < C { - let i = self.s; + let i = self.s; + if i < C { unsafe { self.a.get_unchecked_mut(i).write(v) }; self.s = i + 1; } else { diff --git a/utils/src/lib.rs b/utils/src/lib.rs index 04e1aab6a..d34a92efe 100644 --- a/utils/src/lib.rs +++ b/utils/src/lib.rs @@ -10,6 +10,8 @@ pub mod hex; pub mod json; pub mod memory; pub mod pool; +pub mod reaper; +pub mod ringbuffer; pub mod ringbuffermap; pub mod varint; diff --git a/utils/src/reaper.rs b/utils/src/reaper.rs new file mode 100644 index 000000000..71f4b3b58 --- /dev/null +++ b/utils/src/reaper.rs @@ -0,0 +1,49 @@ +use std::collections::VecDeque; +use std::sync::Arc; + +use tokio::sync::Notify; +use tokio::task::JoinHandle; +use tokio::time::Instant; + +/// Watches tokio jobs and times them out if they run past a deadline or aborts them all if the reaper is dropped. +pub struct Reaper { + q: Arc<(parking_lot::Mutex, Instant)>>, Notify)>, + finisher: JoinHandle<()>, +} + +impl Reaper { + pub fn new() -> Self { + let q = Arc::new((parking_lot::Mutex::new(VecDeque::with_capacity(16)), Notify::new())); + Self { + q: q.clone(), + finisher: tokio::spawn(async move { + loop { + q.1.notified().await; + loop { + let j = q.0.lock().pop_front(); + if let Some(j) = j { + let _ = tokio::time::timeout_at(j.1, j.0).await; + } else { + break; + } + } + } + }), + } + } + + /// Add a job to be executed with timeout at a given instant. + #[inline] + pub fn add(&self, job: JoinHandle<()>, deadline: Instant) { + self.q.0.lock().push_back((job, deadline)); + self.q.1.notify_waiters(); + } +} + +impl Drop for Reaper { + #[inline] + fn drop(&mut self) { + self.finisher.abort(); + self.q.0.lock().drain(..).for_each(|j| j.0.abort()); + } +} diff --git a/utils/src/ringbuffer.rs b/utils/src/ringbuffer.rs new file mode 100644 index 000000000..a5ab34dbd --- /dev/null +++ b/utils/src/ringbuffer.rs @@ -0,0 +1,113 @@ +// (c) 2020-2022 ZeroTier, Inc. -- currently propritery pending actual release and licensing. See LICENSE.md. + +use std::mem::MaybeUninit; + +/// A FIFO ring buffer. +pub struct RingBuffer { + a: [MaybeUninit; C], + p: usize, +} + +impl RingBuffer { + #[inline] + pub fn new() -> Self { + let mut tmp: Self = unsafe { MaybeUninit::uninit().assume_init() }; + tmp.p = 0; + tmp + } + + /// Add an element to the buffer, replacing old elements if full. + #[inline] + pub fn add(&mut self, o: T) { + let p = self.p; + if p < C { + unsafe { self.a.get_unchecked_mut(p).write(o) }; + } else { + unsafe { *self.a.get_unchecked_mut(p % C).assume_init_mut() = o }; + } + self.p = p.wrapping_add(1); + } + + /// Clear the buffer and drop all elements. + #[inline] + pub fn clear(&mut self) { + for i in 0..C.min(self.p) { + unsafe { self.a.get_unchecked_mut(i).assume_init_drop() }; + } + self.p = 0; + } + + /// Gets an iterator that dumps the contents of the buffer in FIFO order. + #[inline] + pub fn iter(&self) -> RingBufferIterator<'_, T, C> { + let s = C.min(self.p); + RingBufferIterator { b: self, s, i: self.p.wrapping_sub(s) } + } +} + +impl Default for RingBuffer { + #[inline(always)] + fn default() -> Self { + Self::new() + } +} + +impl Drop for RingBuffer { + #[inline(always)] + fn drop(&mut self) { + self.clear(); + } +} + +pub struct RingBufferIterator<'a, T, const C: usize> { + b: &'a RingBuffer, + s: usize, + i: usize, +} + +impl<'a, T, const C: usize> Iterator for RingBufferIterator<'a, T, C> { + type Item = &'a T; + + #[inline] + fn next(&mut self) -> Option { + let s = self.s; + if s > 0 { + let i = self.i; + self.s = s.wrapping_sub(1); + self.i = i.wrapping_add(1); + Some(unsafe { self.b.a.get_unchecked(i % C).assume_init_ref() }) + } else { + None + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn fifo() { + let mut tmp: RingBuffer = RingBuffer::new(); + let mut tmp2 = Vec::new(); + for i in 0..4 { + tmp.add(i); + tmp2.push(i); + } + for (i, j) in tmp.iter().zip(tmp2.iter()) { + assert_eq!(*i, *j); + } + tmp.clear(); + tmp2.clear(); + for i in 0..23 { + tmp.add(i); + tmp2.push(i); + } + while tmp2.len() > 8 { + tmp2.remove(0); + } + for (i, j) in tmp.iter().zip(tmp2.iter()) { + assert_eq!(*i, *j); + } + } +} diff --git a/vl1-service/Cargo.toml b/vl1-service/Cargo.toml index 6cd63a365..7cb6e11a8 100644 --- a/vl1-service/Cargo.toml +++ b/vl1-service/Cargo.toml @@ -9,11 +9,11 @@ license = "MPL-2.0" zerotier-network-hypervisor = { path = "../network-hypervisor" } zerotier-crypto = { path = "../crypto" } zerotier-utils = { path = "../utils" } -async-trait = "^0" num-traits = "^0" tokio = { version = "^1", features = ["fs", "io-util", "io-std", "net", "parking_lot", "process", "rt", "rt-multi-thread", "signal", "sync", "time"], default-features = false } parking_lot = { version = "^0", features = [], default-features = false } serde = { version = "^1", features = ["derive"], default-features = false } +serde_json = { version = "^1", features = ["std"], default-features = false } [target."cfg(windows)".dependencies] winapi = { version = "^0", features = ["handleapi", "ws2ipdef", "ws2tcpip"] } diff --git a/vl1-service/src/vl1service.rs b/vl1-service/src/vl1service.rs index 546c164ab..b748474bf 100644 --- a/vl1-service/src/vl1service.rs +++ b/vl1-service/src/vl1service.rs @@ -4,10 +4,9 @@ use std::collections::{HashMap, HashSet}; use std::error::Error; use std::sync::Arc; -use async_trait::async_trait; - use zerotier_crypto::random; -use zerotier_network_hypervisor::vl1::{Endpoint, Event, HostSystem, Identity, InetAddress, InnerProtocol, Node, NodeStorage, PathFilter}; +use zerotier_network_hypervisor::protocol::{PacketBufferFactory, PacketBufferPool}; +use zerotier_network_hypervisor::vl1::*; use zerotier_utils::{ms_monotonic, ms_since_epoch}; use crate::constants::UNASSIGNED_PRIVILEGED_PORTS; @@ -24,11 +23,16 @@ use tokio::time::Duration; /// talks to the physical network, manages the vl1 node, and presents a templated interface for /// whatever inner protocol implementation is using it. This would typically be VL2 but could be /// a test harness or just the controller for a controller that runs stand-alone. -pub struct VL1Service { - state: tokio::sync::RwLock, +pub struct VL1Service< + NodeStorageImpl: NodeStorage + 'static, + PathFilterImpl: PathFilter + 'static, + InnerProtocolImpl: InnerProtocol + 'static, +> { + state: parking_lot::RwLock, storage: Arc, inner: Arc, path_filter: Arc, + buffer_pool: PacketBufferPool, node_container: Option>, } @@ -38,7 +42,7 @@ struct VL1ServiceMutableState { settings: VL1Settings, } -impl +impl VL1Service { pub async fn new( @@ -48,7 +52,7 @@ impl Result, Box> { let mut service = VL1Service { - state: tokio::sync::RwLock::new(VL1ServiceMutableState { + state: parking_lot::RwLock::new(VL1ServiceMutableState { daemons: Vec::with_capacity(2), udp_sockets: HashMap::with_capacity(8), settings, @@ -56,17 +60,19 @@ impl Vec { - self.state.read().await.udp_sockets.keys().cloned().collect() + pub fn bound_udp_ports(&self) -> Vec { + self.state.read().udp_sockets.keys().cloned().collect() } async fn udp_bind_daemon(self: Arc) { loop { - let state = self.state.read().await; - let mut need_fixed_ports: HashSet = HashSet::from_iter(state.settings.fixed_ports.iter().cloned()); - let mut have_random_port_count = 0; - for (p, _) in state.udp_sockets.iter() { - need_fixed_ports.remove(p); - have_random_port_count += (!state.settings.fixed_ports.contains(p)) as usize; - } - let desired_random_port_count = state.settings.random_port_count; - - let state = if !need_fixed_ports.is_empty() || have_random_port_count != desired_random_port_count { - drop(state); - let mut state = self.state.write().await; - - for p in need_fixed_ports.iter() { - state.udp_sockets.insert(*p, parking_lot::RwLock::new(BoundUdpPort::new(*p))); + { + let state = self.state.read(); + let mut need_fixed_ports: HashSet = HashSet::from_iter(state.settings.fixed_ports.iter().cloned()); + let mut have_random_port_count = 0; + for (p, _) in state.udp_sockets.iter() { + need_fixed_ports.remove(p); + have_random_port_count += (!state.settings.fixed_ports.contains(p)) as usize; } + let desired_random_port_count = state.settings.random_port_count; - while have_random_port_count > desired_random_port_count { - let mut most_stale_binding_liveness = (usize::MAX, i64::MAX); - let mut most_stale_binding_port = 0; - for (p, s) in state.udp_sockets.iter() { - if !state.settings.fixed_ports.contains(p) { - let (total_smart_ptr_handles, most_recent_receive) = s.read().liveness(); - if total_smart_ptr_handles < most_stale_binding_liveness.0 - || (total_smart_ptr_handles == most_stale_binding_liveness.0 - && most_recent_receive <= most_stale_binding_liveness.1) - { - most_stale_binding_liveness.0 = total_smart_ptr_handles; - most_stale_binding_liveness.1 = most_recent_receive; - most_stale_binding_port = *p; + let state = if !need_fixed_ports.is_empty() || have_random_port_count != desired_random_port_count { + drop(state); + let mut state = self.state.write(); + + for p in need_fixed_ports.iter() { + state.udp_sockets.insert(*p, parking_lot::RwLock::new(BoundUdpPort::new(*p))); + } + + while have_random_port_count > desired_random_port_count { + let mut most_stale_binding_liveness = (usize::MAX, i64::MAX); + let mut most_stale_binding_port = 0; + for (p, s) in state.udp_sockets.iter() { + if !state.settings.fixed_ports.contains(p) { + let (total_smart_ptr_handles, most_recent_receive) = s.read().liveness(); + if total_smart_ptr_handles < most_stale_binding_liveness.0 + || (total_smart_ptr_handles == most_stale_binding_liveness.0 + && most_recent_receive <= most_stale_binding_liveness.1) + { + most_stale_binding_liveness.0 = total_smart_ptr_handles; + most_stale_binding_liveness.1 = most_recent_receive; + most_stale_binding_port = *p; + } } } - } - if most_stale_binding_port != 0 { - have_random_port_count -= state.udp_sockets.remove(&most_stale_binding_port).is_some() as usize; - } else { - break; - } - } - - 'outer_add_port_loop: while have_random_port_count < desired_random_port_count { - let rn = random::xorshift64_random() as usize; - for i in 0..UNASSIGNED_PRIVILEGED_PORTS.len() { - let p = UNASSIGNED_PRIVILEGED_PORTS[rn.wrapping_add(i) % UNASSIGNED_PRIVILEGED_PORTS.len()]; - if !state.udp_sockets.contains_key(&p) && udp_test_bind(p) { - let _ = state.udp_sockets.insert(p, parking_lot::RwLock::new(BoundUdpPort::new(p))); - continue 'outer_add_port_loop; + if most_stale_binding_port != 0 { + have_random_port_count -= state.udp_sockets.remove(&most_stale_binding_port).is_some() as usize; + } else { + break; } } - let p = 50000 + ((random::xorshift64_random() as u16) % 15535); - if !state.udp_sockets.contains_key(&p) && udp_test_bind(p) { - have_random_port_count += state - .udp_sockets - .insert(p, parking_lot::RwLock::new(BoundUdpPort::new(p))) - .is_none() as usize; + 'outer_add_port_loop: while have_random_port_count < desired_random_port_count { + let rn = random::xorshift64_random() as usize; + for i in 0..UNASSIGNED_PRIVILEGED_PORTS.len() { + let p = UNASSIGNED_PRIVILEGED_PORTS[rn.wrapping_add(i) % UNASSIGNED_PRIVILEGED_PORTS.len()]; + if !state.udp_sockets.contains_key(&p) && udp_test_bind(p) { + let _ = state.udp_sockets.insert(p, parking_lot::RwLock::new(BoundUdpPort::new(p))); + continue 'outer_add_port_loop; + } + } + + let p = 50000 + ((random::xorshift64_random() as u16) % 15535); + if !state.udp_sockets.contains_key(&p) && udp_test_bind(p) { + have_random_port_count += state + .udp_sockets + .insert(p, parking_lot::RwLock::new(BoundUdpPort::new(p))) + .is_none() as usize; + } } - } - drop(state); - self.state.read().await - } else { - state - }; + drop(state); + self.state.read() + } else { + state + }; - let num_cores = std::thread::available_parallelism().map_or(1, |c| c.get()); - for (_, binding) in state.udp_sockets.iter() { - let mut binding = binding.write(); - let (_, mut new_sockets) = - binding.update_bindings(&state.settings.interface_prefix_blacklist, &state.settings.cidr_blacklist); - for s in new_sockets.drain(..) { - // Start one async task per system core. This is technically not necessary because tokio - // schedules and multiplexes, but this enables tokio to grab and schedule packets - // concurrently for up to the number of cores available for any given socket and is - // probably faster than other patterns that involve iterating through sockets and creating - // arrays of futures or using channels. - let mut socket_tasks = Vec::with_capacity(num_cores); - for _ in 0..num_cores { - let self_copy = self.clone(); - let s_copy = s.clone(); - let local_socket = LocalSocket::new(&s); - socket_tasks.push(tokio::spawn(async move { - loop { - let mut buf = self_copy.node().get_packet_buffer(); - let now = ms_monotonic(); - if let Ok((bytes, from_sockaddr)) = s_copy.receive(unsafe { buf.entire_buffer_mut() }, now).await { - unsafe { buf.set_size_unchecked(bytes) }; - self_copy - .node() - .handle_incoming_physical_packet( + let num_cores = std::thread::available_parallelism().map_or(1, |c| c.get()); + for (_, binding) in state.udp_sockets.iter() { + let mut binding = binding.write(); + let (_, mut new_sockets) = + binding.update_bindings(&state.settings.interface_prefix_blacklist, &state.settings.cidr_blacklist); + for s in new_sockets.drain(..) { + // Start one async task per system core. This is technically not necessary because tokio + // schedules and multiplexes, but this enables tokio to grab and schedule packets + // concurrently for up to the number of cores available for any given socket and is + // probably faster than other patterns that involve iterating through sockets and creating + // arrays of futures or using channels. + let mut socket_tasks = Vec::with_capacity(num_cores); + for _ in 0..num_cores { + let self_copy = self.clone(); + let s_copy = s.clone(); + let local_socket = LocalSocket::new(&s); + socket_tasks.push(tokio::spawn(async move { + loop { + let mut buf = self_copy.buffer_pool.get(); + let now = ms_monotonic(); + if let Ok((bytes, from_sockaddr)) = s_copy.receive(unsafe { buf.entire_buffer_mut() }, now).await { + unsafe { buf.set_size_unchecked(bytes) }; + self_copy.node().handle_incoming_physical_packet( &*self_copy, &*self_copy.inner, &Endpoint::IpUdp(InetAddress::from(from_sockaddr)), &local_socket, &s_copy.interface, buf, - ) - .await; + ); + } } - } - })); + })); + } + debug_assert!(s.associated_tasks.lock().is_empty()); + *s.associated_tasks.lock() = socket_tasks; } - debug_assert!(s.associated_tasks.lock().is_empty()); - *s.associated_tasks.lock() = socket_tasks; } } @@ -197,12 +202,11 @@ impl) { tokio::time::sleep(Duration::from_secs(1)).await; loop { - tokio::time::sleep(self.node().do_background_tasks(self.as_ref()).await).await; + tokio::time::sleep(self.node().do_background_tasks(self.as_ref())).await; } } } -#[async_trait] impl HostSystem for VL1Service { @@ -216,37 +220,35 @@ impl bool { socket.is_valid() } - async fn wire_send( + fn wire_send( &self, endpoint: &Endpoint, local_socket: Option<&Self::LocalSocket>, local_interface: Option<&Self::LocalInterface>, data: &[u8], packet_ttl: u8, - ) -> bool { + ) { match endpoint { Endpoint::IpUdp(address) => { // This is the fast path -- the socket is known to the core so just send it. if let Some(s) = local_socket { if let Some(s) = s.0.upgrade() { - return s.send_sync_nonblock(address, data, packet_ttl); + s.send_sync_nonblock(address, data, packet_ttl); } else { - return false; + return; } } - let state = self.state.read().await; + let state = self.state.read(); if !state.udp_sockets.is_empty() { if let Some(specific_interface) = local_interface { // Send from a specific interface if that interface is specified. - for (_, p) in state.udp_sockets.iter() { + 'socket_search: for (_, p) in state.udp_sockets.iter() { let p = p.read(); if !p.sockets.is_empty() { let mut i = (random::next_u32_secure() as usize) % p.sockets.len(); @@ -254,7 +256,7 @@ impl {} } - return false; } #[inline(always)] @@ -305,15 +303,10 @@ impl { fn drop(&mut self) { - loop { - if let Ok(mut state) = self.state.try_write() { - for d in state.daemons.drain(..) { - d.abort(); - } - state.udp_sockets.clear(); - break; - } - std::thread::sleep(Duration::from_millis(2)); + let mut state = self.state.write(); + for d in state.daemons.drain(..) { + d.abort(); } + state.udp_sockets.clear(); } }