diff --git a/attic/flatkv.rs b/attic/flatkv.rs new file mode 100644 index 000000000..77c0574d7 --- /dev/null +++ b/attic/flatkv.rs @@ -0,0 +1,261 @@ +// (c) 2020-2022 ZeroTier, Inc. -- currently propritery pending actual release and licensing. See LICENSE.md. + +use std::borrow::Cow; + +/// A flat key/value store implemented in terms of arrays tuples of (key, value). +#[repr(transparent)] +#[derive(Clone, PartialEq, Eq)] +pub struct FlatKV(Vec<(&'static str, Value)>); + +/// Value variant for FlatKV. +#[derive(Clone, PartialEq, Eq)] +pub enum Value { + N, + KV(FlatKV), + S(Cow<'static, str>), + I(i64), + UI(u64), + B(bool), + Endpoint(crate::vl1::Endpoint), + Identity(crate::vl1::Identity), +} + +impl Into for FlatKV { + #[inline(always)] + fn into(self) -> Value { + Value::KV(self) + } +} + +impl Into for &'static str { + #[inline(always)] + fn into(self) -> Value { + Value::S(Cow::Borrowed(self)) + } +} + +impl Into for String { + #[inline(always)] + fn into(self) -> Value { + Value::S(Cow::Owned(self)) + } +} + +impl Into for i64 { + #[inline(always)] + fn into(self) -> Value { + Value::I(self) + } +} + +impl Into for u64 { + #[inline(always)] + fn into(self) -> Value { + Value::UI(self) + } +} + +impl Into for isize { + #[inline(always)] + fn into(self) -> Value { + Value::I(self as i64) + } +} + +impl Into for usize { + #[inline(always)] + fn into(self) -> Value { + Value::UI(self as u64) + } +} + +impl Into for i32 { + #[inline(always)] + fn into(self) -> Value { + Value::I(self as i64) + } +} + +impl Into for u32 { + #[inline(always)] + fn into(self) -> Value { + Value::UI(self as u64) + } +} + +impl Into for i16 { + #[inline(always)] + fn into(self) -> Value { + Value::I(self as i64) + } +} + +impl Into for u16 { + #[inline(always)] + fn into(self) -> Value { + Value::UI(self as u64) + } +} + +impl Into for i8 { + #[inline(always)] + fn into(self) -> Value { + Value::I(self as i64) + } +} + +impl Into for u8 { + #[inline(always)] + fn into(self) -> Value { + Value::UI(self as u64) + } +} + +impl Into for bool { + #[inline(always)] + fn into(self) -> Value { + Value::B(self) + } +} + +impl Into for crate::vl1::Endpoint { + #[inline(always)] + fn into(self) -> Value { + Value::Endpoint(self) + } +} + +impl Into for crate::vl1::InetAddress { + #[inline(always)] + fn into(self) -> Value { + Value::Endpoint(crate::vl1::Endpoint::IpUdp(self)) + } +} + +impl Into for crate::vl1::Identity { + #[inline(always)] + fn into(self) -> Value { + Value::Identity(self) + } +} + +impl ToString for Value { + fn to_string(&self) -> String { + match self { + Value::N => "(null)".into(), + Value::KV(x) => x.to_string(), + Value::S(x) => x.to_string(), + Value::I(x) => x.to_string(), + Value::UI(x) => x.to_string(), + Value::B(x) => x.to_string(), + Value::Endpoint(x) => x.to_string(), + Value::Identity(x) => x.to_string(), + } + } +} + +impl FlatKV { + #[inline(always)] + pub fn add>(&mut self, k: &'static str, v: T) { + self.0.push((k, v.into())) + } +} + +fn json_escape(src: &str, escaped: &mut String) { + use std::fmt::Write; + let mut utf16_buf = [0u16; 2]; + for c in src.chars() { + match c { + '\x08' => escaped.push_str("\\b"), + '\x0c' => escaped.push_str("\\f"), + '\n' => escaped.push_str("\\n"), + '\r' => escaped.push_str("\\r"), + '\t' => escaped.push_str("\\t"), + '"' => escaped.push_str("\\\""), + '\\' => escaped.push_str("\\\\"), + '/' => escaped.push_str("\\/"), + c if c.is_ascii_graphic() => escaped.push(c), + c => { + let encoded = c.encode_utf16(&mut utf16_buf); + for utf16 in encoded { 
+ write!(escaped, "\\u{:04X}", utf16).unwrap(); + } + } + } + } +} + +impl Default for FlatKV { + #[inline(always)] + fn default() -> Self { + Self(Vec::new()) + } +} + +impl FlatKV { + #[inline(always)] + pub fn new() -> Self { + Self(Vec::new()) + } +} + +impl ToString for FlatKV { + /// Output a JSON formatted map of values or maps. + fn to_string(&self) -> String { + let mut first = true; + let mut tmp = String::new(); + tmp.push_str("{ "); //} //" + for (k, v) in self.0.iter() { + if first { + first = false; + } else { + tmp.push_str(", "); + } + tmp.push('"'); + json_escape(*k, &mut tmp); + tmp.push_str("\": "); + match v { + Value::S(_) | Value::Endpoint(_) | Value::Identity(_) => { + tmp.push('"'); + json_escape(v.to_string().as_str(), &mut tmp); + tmp.push('"'); + } + _ => tmp.push_str(v.to_string().as_str()), + } + } + tmp.push_str("} "); + tmp + } +} + +#[macro_export] +macro_rules! kv { + ($($key:expr => $value:expr,)+) => (kv!($($key => $value),+)); + ( $($key:expr => $value:expr),* ) => { + { + #[allow(unused_mut)] + let mut _kv = crate::util::flatkv::FlatKV(Vec::new()); + $( + _kv.add($key, $value); + )* + _kv + } + }; +} + +#[cfg(test)] +mod tests { + #[test] + fn kv_macro() { + let kv = kv!( + "foo" => 0_u64, + "bar" => "bar", + "baz" => -1_i64, + "lala" => false, + "lol" => kv!( + "boo" => 1_u16, + "far" => 2_u32, + ) + ); + } +} diff --git a/zerotier-core-crypto/src/random.rs b/zerotier-core-crypto/src/random.rs index 812686383..a9588a4c0 100644 --- a/zerotier-core-crypto/src/random.rs +++ b/zerotier-core-crypto/src/random.rs @@ -1,11 +1,12 @@ // (c) 2020-2022 ZeroTier, Inc. -- currently propritery pending actual release and licensing. See LICENSE.md. -use openssl::rand::rand_bytes; use std::mem::MaybeUninit; +use std::sync::atomic::{AtomicU64, Ordering}; -pub struct SecureRandom; +use openssl::rand::rand_bytes; + +use lazy_static::lazy_static; -#[inline(always)] pub fn next_u32_secure() -> u32 { unsafe { let mut tmp: [u32; 1] = MaybeUninit::uninit().assume_init(); @@ -14,7 +15,6 @@ pub fn next_u32_secure() -> u32 { } } -#[inline(always)] pub fn next_u64_secure() -> u64 { unsafe { let mut tmp: [u64; 1] = MaybeUninit::uninit().assume_init(); @@ -23,6 +23,14 @@ pub fn next_u64_secure() -> u64 { } } +pub fn next_u128_secure() -> u128 { + unsafe { + let mut tmp: [u128; 1] = MaybeUninit::uninit().assume_init(); + assert!(rand_bytes(&mut *(tmp.as_mut_ptr().cast::<[u8; 16]>())).is_ok()); + tmp[0] + } +} + #[inline(always)] pub fn fill_bytes_secure(dest: &mut [u8]) { assert!(rand_bytes(dest).is_ok()); @@ -35,6 +43,15 @@ pub fn get_bytes_secure() -> [u8; COUNT] { tmp } +pub struct SecureRandom; + +impl Default for SecureRandom { + #[inline(always)] + fn default() -> Self { + Self + } +} + impl SecureRandom { #[inline(always)] pub fn get() -> Self { @@ -66,19 +83,21 @@ impl rand_core::RngCore for SecureRandom { impl rand_core::CryptoRng for SecureRandom {} +unsafe impl Sync for SecureRandom {} + unsafe impl Send for SecureRandom {} -static mut XORSHIFT64_STATE: u64 = 0; +lazy_static! { + static ref XORSHIFT64_STATE: AtomicU64 = AtomicU64::new(next_u64_secure()); +} /// Get a non-cryptographic random number. 
pub fn xorshift64_random() -> u64 { - let mut x = unsafe { XORSHIFT64_STATE }; - while x == 0 { - x = next_u64_secure(); - } + let mut x = XORSHIFT64_STATE.load(Ordering::Relaxed); + x = x.wrapping_add((x == 0) as u64); x ^= x.wrapping_shl(13); x ^= x.wrapping_shr(7); x ^= x.wrapping_shl(17); - unsafe { XORSHIFT64_STATE = x }; + XORSHIFT64_STATE.store(x, Ordering::Relaxed); x } diff --git a/zerotier-network-hypervisor/Cargo.toml b/zerotier-network-hypervisor/Cargo.toml index 4b7680a2d..f610987e3 100644 --- a/zerotier-network-hypervisor/Cargo.toml +++ b/zerotier-network-hypervisor/Cargo.toml @@ -11,6 +11,10 @@ opt-level = 3 codegen-units = 1 panic = 'abort' +[features] +default = ["zt_trace"] +zt_trace = [] + [dependencies] zerotier-core-crypto = { path = "../zerotier-core-crypto" } base64 = "^0" diff --git a/zerotier-network-hypervisor/src/event.rs b/zerotier-network-hypervisor/src/event.rs new file mode 100644 index 000000000..a1c6873e9 --- /dev/null +++ b/zerotier-network-hypervisor/src/event.rs @@ -0,0 +1,49 @@ +// (c) 2020-2022 ZeroTier, Inc. -- currently propritery pending actual release and licensing. See LICENSE.md. + +use crate::vl1::*; + +#[derive(Clone)] +pub enum Event { + // Change in node online status. + Online(bool), + + // Tracing: source file, line number, message (if enabled in build). + Trace(&'static str, u32, String), + + // An anomalous event has been encountered that could indicate a possible security problem. + SecurityWarning(String), + + // A fatal error has occurred. + FatalError(String), + + // This node has automatically generated an identity. + IdentityAutoGenerated(Identity), + + // This node's identity has automatically been upgraded, contains old and new. + IdentityAutoUpgraded(Identity, Identity), + + // The list of roots has been updated, contains old and new. 
+ UpdatedRoots(Vec, Vec), +} + +impl ToString for Event { + fn to_string(&self) -> String { + match self { + Event::Online(online) => format!("[vl1] online == {}", online), + Event::Trace(l, f, m) => format!("[trace] {}:{} {}", l, f, m), + Event::SecurityWarning(w) => format!("[global] security warning: {}", w), + Event::FatalError(e) => format!("[global] FATAL: {}", e), + Event::IdentityAutoGenerated(id) => format!("[vl1] identity auto-generated: {}", id.to_string()), + Event::IdentityAutoUpgraded(oldid, newid) => format!("[vl1] identity upgrade: {} => {}", oldid.to_string(), newid.to_string()), + Event::UpdatedRoots(_, newroots) => { + let mut tmp = String::with_capacity(128); + tmp.push_str("[vl1] updated root set:"); + for r in newroots.iter() { + tmp.push(' '); + tmp.push_str(r.address.to_string().as_str()); + } + tmp + } + } + } +} diff --git a/zerotier-network-hypervisor/src/lib.rs b/zerotier-network-hypervisor/src/lib.rs index 6f5082dae..b1b350577 100644 --- a/zerotier-network-hypervisor/src/lib.rs +++ b/zerotier-network-hypervisor/src/lib.rs @@ -9,7 +9,9 @@ pub mod util; pub mod vl1; pub mod vl2; +mod event; mod networkhypervisor; +pub use event::Event; pub use networkhypervisor::{Interface, NetworkHypervisor}; pub use vl1::protocol::{PacketBuffer, PooledPacketBuffer}; diff --git a/zerotier-network-hypervisor/src/networkhypervisor.rs b/zerotier-network-hypervisor/src/networkhypervisor.rs index 744308717..c9fbf6360 100644 --- a/zerotier-network-hypervisor/src/networkhypervisor.rs +++ b/zerotier-network-hypervisor/src/networkhypervisor.rs @@ -15,9 +15,9 @@ pub struct NetworkHypervisor { } impl NetworkHypervisor { - pub fn new(ii: &I, auto_generate_identity: bool) -> Result { + pub fn new(ii: &I, auto_generate_identity: bool, auto_upgrade_identity: bool) -> Result { Ok(NetworkHypervisor { - vl1: Node::new(ii, auto_generate_identity)?, + vl1: Node::new(ii, auto_generate_identity, auto_upgrade_identity)?, vl2: Switch::new(), }) } @@ -43,8 +43,8 @@ impl NetworkHypervisor { } #[inline(always)] - pub fn wire_receive(&self, ii: &I, source_endpoint: &Endpoint, source_local_socket: &I::LocalSocket, source_local_interface: &I::LocalInterface, data: PooledPacketBuffer) { - self.vl1.wire_receive(ii, &self.vl2, source_endpoint, source_local_socket, source_local_interface, data) + pub fn handle_incoming_physical_packet(&self, ii: &I, source_endpoint: &Endpoint, source_local_socket: &I::LocalSocket, source_local_interface: &I::LocalInterface, data: PooledPacketBuffer) { + self.vl1.handle_incoming_physical_packet(ii, &self.vl2, source_endpoint, source_local_socket, source_local_interface, data) } #[inline(always)] diff --git a/zerotier-network-hypervisor/src/util/marshalable.rs b/zerotier-network-hypervisor/src/util/marshalable.rs index 8bbe93397..c1cc41443 100644 --- a/zerotier-network-hypervisor/src/util/marshalable.rs +++ b/zerotier-network-hypervisor/src/util/marshalable.rs @@ -4,7 +4,7 @@ use crate::util::buffer::Buffer; /// Must be larger than any object we want to use with to_bytes() or from_bytes(). /// This hack can go away once Rust allows us to reference trait consts as generics. -const TEMP_BUF_SIZE: usize = 16384; +const TEMP_BUF_SIZE: usize = 8192; /// A super-lightweight zero-allocation serialization interface. pub trait Marshalable: Sized { @@ -25,7 +25,6 @@ pub trait Marshalable: Sized { /// This will return an Err if the buffer is too small or some other error occurs. It's just /// a shortcut to creating a buffer and marshaling into it. 
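// A sketch (not part of this patch) of how the Marshalable helpers described above are
// typically used, with Endpoint as the example type; the HELLO session-metadata code later
// in this diff marshals endpoints the same way. The import paths are assumptions.
use crate::util::marshalable::Marshalable;
use crate::vl1::Endpoint;

fn marshal_endpoint_example(ep: &Endpoint) -> Vec<u8> {
    // to_buffer() marshals into a fixed-capacity stack Buffer sized by a const generic,
    // while to_bytes() copies the marshaled form into a heap Vec.
    let _stack_buf = ep.to_buffer::<{ Endpoint::MAX_MARSHAL_SIZE }>().expect("endpoint too large for buffer");
    ep.to_bytes()
}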
fn to_buffer(&self) -> std::io::Result> { - assert!(BL >= Self::MAX_MARSHAL_SIZE); let mut tmp = Buffer::new(); self.marshal(&mut tmp)?; Ok(tmp) @@ -41,9 +40,8 @@ pub trait Marshalable: Sized { /// Marshal and convert to a Rust vector. fn to_bytes(&self) -> Vec { - assert!(Self::MAX_MARSHAL_SIZE <= TEMP_BUF_SIZE); let mut tmp = Buffer::::new(); - assert!(self.marshal(&mut tmp).is_ok()); + assert!(self.marshal(&mut tmp).is_ok()); // panics if TEMP_BUF_SIZE is too small tmp.as_bytes().to_vec() } diff --git a/zerotier-network-hypervisor/src/util/mod.rs b/zerotier-network-hypervisor/src/util/mod.rs index ae5aa839b..4e0584a71 100644 --- a/zerotier-network-hypervisor/src/util/mod.rs +++ b/zerotier-network-hypervisor/src/util/mod.rs @@ -10,71 +10,23 @@ pub use zerotier_core_crypto::varint; pub(crate) const ZEROES: [u8; 64] = [0_u8; 64]; +#[cfg(target_feature = "zt_trace")] +macro_rules! zt_trace { + ($si:expr, $fmt:expr $(, $($arg:tt)*)?) => { + $si.event(crate::Event::Trace(file!(), line!(), format!($fmt, $($($arg)*)?))); + } +} + +#[cfg(not(target_feature = "zt_trace"))] +macro_rules! zt_trace { + ($si:expr, $fmt:expr $(, $($arg:tt)*)?) => {}; +} + +pub(crate) use zt_trace; + /// Obtain a reference to a sub-array within an existing byte array. #[inline(always)] pub(crate) fn byte_array_range(a: &[u8; A]) -> &[u8; LEN] { assert!((START + LEN) <= A); unsafe { &*a.as_ptr().add(START).cast::<[u8; LEN]>() } } - -/// A super-minimal hasher for u64 keys for keys already fairly randomly distributed like addresses and network IDs. -#[derive(Copy, Clone)] -pub(crate) struct U64NoOpHasher(u64); - -impl U64NoOpHasher { - #[inline(always)] - pub fn new() -> Self { - Self(0) - } -} - -impl std::hash::Hasher for U64NoOpHasher { - #[inline(always)] - fn finish(&self) -> u64 { - self.0.wrapping_add(self.0.wrapping_shr(32)) - } - - #[inline(always)] - fn write_u64(&mut self, i: u64) { - self.0 = self.0.wrapping_add(i); - } - - #[inline(always)] - fn write_i64(&mut self, i: i64) { - self.0 = self.0.wrapping_add(i as u64); - } - - #[inline(always)] - fn write_usize(&mut self, i: usize) { - self.0 = self.0.wrapping_add(i as u64); - } - - #[inline(always)] - fn write_isize(&mut self, i: isize) { - self.0 = self.0.wrapping_add(i as u64); - } - - #[inline(always)] - fn write_u32(&mut self, i: u32) { - self.0 = self.0.wrapping_add(i as u64); - } - - #[inline(always)] - fn write_i32(&mut self, i: i32) { - self.0 = self.0.wrapping_add(i as u64); - } - - #[inline(always)] - fn write(&mut self, _: &[u8]) { - panic!("U64NoOpHasher should only be used with u64 and i64 types"); - } -} - -impl std::hash::BuildHasher for U64NoOpHasher { - type Hasher = Self; - - #[inline(always)] - fn build_hasher(&self) -> Self::Hasher { - Self(0) - } -} diff --git a/zerotier-network-hypervisor/src/util/pool.rs b/zerotier-network-hypervisor/src/util/pool.rs index 74741ede4..1bba90bb4 100644 --- a/zerotier-network-hypervisor/src/util/pool.rs +++ b/zerotier-network-hypervisor/src/util/pool.rs @@ -63,6 +63,10 @@ impl> Pooled { } } +unsafe impl> Send for Pooled where O: Send {} + +unsafe impl> Sync for Pooled where O: Sync {} + impl> std::ops::Deref for Pooled { type Target = O; diff --git a/zerotier-network-hypervisor/src/vl1/fragmentedpacket.rs b/zerotier-network-hypervisor/src/vl1/fragmentedpacket.rs index cfdf9e504..a7a28667d 100644 --- a/zerotier-network-hypervisor/src/vl1/fragmentedpacket.rs +++ b/zerotier-network-hypervisor/src/vl1/fragmentedpacket.rs @@ -7,9 +7,6 @@ use crate::vl1::protocol::*; /// Performance note: PacketBuffer is Pooled 
which is NotNull<*mut Buffer>. /// That means Option is just a pointer, since NotNull permits the /// compiler to optimize out any additional state in Option. -/// -/// This will need to be modified if we ever support more than 8 fragments to increase -/// the size of frags[] and the number of bits in 'have' and 'expecting'. pub(crate) struct FragmentedPacket { pub ts_ticks: i64, pub frags: [Option; packet_constants::FRAGMENT_COUNT_MAX], @@ -18,20 +15,22 @@ pub(crate) struct FragmentedPacket { } impl FragmentedPacket { - #[inline] pub fn new(ts: i64) -> Self { + // 'have' and 'expecting' must be expanded if this is >8 + debug_assert!(packet_constants::FRAGMENT_COUNT_MAX <= 8); + Self { ts_ticks: ts, - frags: [None, None, None, None, None, None, None, None], + frags: Default::default(), have: 0, expecting: 0, } } /// Add a fragment to this fragment set and return true if all fragments are present. - #[inline] + #[inline(always)] pub fn add_fragment(&mut self, frag: PooledPacketBuffer, no: u8, expecting: u8) -> bool { - self.frags.get_mut(no as usize).map_or(false, |entry| { + if let Some(entry) = self.frags.get_mut(no as usize) { /* * This works by setting bit N in the 'have' bit mask and then setting X bits * in 'expecting' if the 'expecting' field is non-zero. Since the packet head @@ -59,6 +58,8 @@ impl FragmentedPacket { self.have |= 1_u8.wrapping_shl(no as u32); self.expecting |= 0xff_u8.wrapping_shr(8 - (expecting as u32)); self.have == self.expecting - }) + } else { + false + } } } diff --git a/zerotier-network-hypervisor/src/vl1/identity.rs b/zerotier-network-hypervisor/src/vl1/identity.rs index 0d3df6202..45785daa9 100644 --- a/zerotier-network-hypervisor/src/vl1/identity.rs +++ b/zerotier-network-hypervisor/src/vl1/identity.rs @@ -277,16 +277,16 @@ impl Identity { /// Sign a message with this identity. /// - /// If legacy_compatibility is true this generates only an ed25519 signature and uses the old + /// If legacy_ed25519_only is true this generates only an ed25519 signature and uses the old /// format that also includes part of the plaintext hash at the end. The include_algorithms mask /// will be ignored. Otherwise it will generate a signature for every algorithm with a secret /// in this identity and that is specified in the include_algorithms bit mask. /// /// A return of None happens if we don't have our secret key(s) or some other error occurs. 
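// A sketch (not part of this patch): a standalone illustration of the 'have'/'expecting'
// bit masks that FragmentedPacket::add_fragment() above maintains, assuming
// FRAGMENT_COUNT_MAX == 8. The fragment numbers and total count here are made up.
fn fragment_mask_example() {
    let (mut have, mut expecting) = (0u8, 0u8);
    // A packet split into three fragments arriving out of order; each fragment reports
    // the total count (3), which sets the low three bits of 'expecting'.
    for (no, total) in [(0u8, 3u32), (2u8, 3u32), (1u8, 3u32)] {
        have |= 1u8.wrapping_shl(no as u32);
        expecting |= 0xffu8.wrapping_shr(8 - total);
    }
    assert_eq!(expecting, 0b0000_0111);
    assert_eq!(have, 0b0000_0111);
    assert!(have == expecting); // add_fragment() reports "fully assembled" at this point
}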
- pub fn sign(&self, msg: &[u8], include_algorithms: u8, legacy_compatibility: bool) -> Option> { + pub fn sign(&self, msg: &[u8], include_algorithms: u8, legacy_ed25519_only: bool) -> Option> { if self.secret.is_some() { let secret = self.secret.as_ref().unwrap(); - if legacy_compatibility { + if legacy_ed25519_only { Some(secret.ed25519.sign_zt(msg).to_vec()) } else { let mut tmp: Vec = Vec::with_capacity(1 + P384_ECDSA_SIGNATURE_SIZE + ED25519_SIGNATURE_SIZE); diff --git a/zerotier-network-hypervisor/src/vl1/inetaddress.rs b/zerotier-network-hypervisor/src/vl1/inetaddress.rs index 30f2a0308..5b274af62 100644 --- a/zerotier-network-hypervisor/src/vl1/inetaddress.rs +++ b/zerotier-network-hypervisor/src/vl1/inetaddress.rs @@ -3,7 +3,7 @@ use std::cmp::Ordering; use std::hash::{Hash, Hasher}; use std::mem::{size_of, transmute_copy, zeroed, MaybeUninit}; -use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; +use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, ToSocketAddrs}; use std::ptr::{copy_nonoverlapping, null, slice_from_raw_parts, write_bytes}; use std::str::FromStr; @@ -74,9 +74,27 @@ pub union InetAddress { ss: sockaddr_storage, // some external code may expect the struct to be this full length } +impl ToSocketAddrs for InetAddress { + type Iter = std::iter::Once; + + #[inline(always)] + fn to_socket_addrs(&self) -> std::io::Result { + self.try_into().map_or_else(|_| Err(std::io::Error::new(std::io::ErrorKind::Other, "not an IP address")), |sa| Ok(std::iter::once(sa))) + } +} + impl TryInto for InetAddress { type Error = crate::error::InvalidParameterError; + #[inline(always)] + fn try_into(self) -> Result { + (&self).try_into() + } +} + +impl TryInto for &InetAddress { + type Error = crate::error::InvalidParameterError; + #[inline(always)] fn try_into(self) -> Result { match unsafe { self.sa.sa_family } { @@ -90,6 +108,15 @@ impl TryInto for InetAddress { impl TryInto for InetAddress { type Error = crate::error::InvalidParameterError; + #[inline(always)] + fn try_into(self) -> Result { + (&self).try_into() + } +} + +impl TryInto for &InetAddress { + type Error = crate::error::InvalidParameterError; + #[inline(always)] fn try_into(self) -> Result { match unsafe { self.sa.sa_family } { @@ -102,6 +129,15 @@ impl TryInto for InetAddress { impl TryInto for InetAddress { type Error = crate::error::InvalidParameterError; + #[inline(always)] + fn try_into(self) -> Result { + (&self).try_into() + } +} + +impl TryInto for &InetAddress { + type Error = crate::error::InvalidParameterError; + #[inline(always)] fn try_into(self) -> Result { match unsafe { self.sa.sa_family } { @@ -114,6 +150,15 @@ impl TryInto for InetAddress { impl TryInto for InetAddress { type Error = crate::error::InvalidParameterError; + #[inline(always)] + fn try_into(self) -> Result { + (&self).try_into() + } +} + +impl TryInto for &InetAddress { + type Error = crate::error::InvalidParameterError; + #[inline(always)] fn try_into(self) -> Result { unsafe { @@ -129,6 +174,15 @@ impl TryInto for InetAddress { impl TryInto for InetAddress { type Error = crate::error::InvalidParameterError; + #[inline(always)] + fn try_into(self) -> Result { + (&self).try_into() + } +} + +impl TryInto for &InetAddress { + type Error = crate::error::InvalidParameterError; + #[inline(always)] fn try_into(self) -> Result { unsafe { @@ -143,6 +197,15 @@ impl TryInto for InetAddress { impl TryInto for InetAddress { type Error = crate::error::InvalidParameterError; + #[inline(always)] + 
fn try_into(self) -> Result { + (&self).try_into() + } +} + +impl TryInto for &InetAddress { + type Error = crate::error::InvalidParameterError; + #[inline(always)] fn try_into(self) -> Result { unsafe { @@ -328,6 +391,9 @@ impl<'de> Deserialize<'de> for InetAddress { } impl InetAddress { + pub const AF_INET: u8 = AF_INET; + pub const AF_INET6: u8 = AF_INET6; + /// Get a new zero/nil InetAddress. #[inline(always)] pub fn new() -> InetAddress { diff --git a/zerotier-network-hypervisor/src/vl1/node.rs b/zerotier-network-hypervisor/src/vl1/node.rs index c9c23166e..70f717c1b 100644 --- a/zerotier-network-hypervisor/src/vl1/node.rs +++ b/zerotier-network-hypervisor/src/vl1/node.rs @@ -2,7 +2,6 @@ use std::collections::HashMap; use std::hash::Hash; -use std::num::NonZeroI64; use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; @@ -12,11 +11,13 @@ use parking_lot::{Mutex, RwLock}; use crate::error::InvalidParameterError; use crate::util::gate::IntervalGate; +use crate::util::zt_trace; use crate::vl1::path::Path; use crate::vl1::peer::Peer; use crate::vl1::protocol::*; use crate::vl1::whoisqueue::{QueuedPacket, WhoisQueue}; use crate::vl1::{Address, Endpoint, Identity, RootSet}; +use crate::Event; /// Trait implemented by external code to handle events and provide an interface to the system or application. /// @@ -24,25 +25,16 @@ use crate::vl1::{Address, Endpoint, Identity, RootSet}; /// during calls to things like wire_recieve() and do_background_tasks(). pub trait SystemInterface: Sync + Send + 'static { /// Type for local system sockets. - type LocalSocket: Sync + Send + Sized + Hash + PartialEq + Eq + Clone; + type LocalSocket: Sync + Send + Sized + Hash + PartialEq + Eq + Clone + ToString; /// Type for local system interfaces. - type LocalInterface: Sync + Send + Sized + Hash + PartialEq + Eq + Clone; + type LocalInterface: Sync + Send + Sized + Hash + PartialEq + Eq + Clone + ToString; - /// Node is up and ready for operation. - fn event_node_is_up(&self); - - /// Node is shutting down. - fn event_node_is_down(&self); - - /// Node has gone online or offline. - fn event_online_status_change(&self, online: bool); + /// An event occurred. + fn event(&self, event: Event); /// A USER_MESSAGE packet was received. - fn event_user_message(&self, source: &Identity, message_type: u64, message: &[u8]); - - /// VL1 core generated a security warning. - fn event_security_warning(&self, warning: &str); + fn user_message(&self, source: &Identity, message_type: u64, message: &[u8]); /// Check a local socket for validity. /// @@ -75,7 +67,7 @@ pub trait SystemInterface: Sync + Send + 'static { fn check_path(&self, id: &Identity, endpoint: &Endpoint, local_socket: Option<&Self::LocalSocket>, local_interface: Option<&Self::LocalInterface>) -> bool; /// Called to look up any statically defined or memorized paths to known nodes. - fn get_path_hints(&self, id: &Identity) -> Option, Option)>>; + fn get_path_hints(&self, id: &Identity) -> Option, Option)>>; /// Called to get the current time in milliseconds from the system monotonically increasing clock. /// This needs to be accurate to about 250 milliseconds resolution or better. 
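// A sketch (not part of this patch): the former event_* callbacks are now a single
// SystemInterface::event(Event) method, so a host can fan all events out through one
// dispatcher. Routing through the `log` crate is an assumption here (it matches the
// dependency added to zerotier-system-service later in this diff).
use zerotier_network_hypervisor::Event;

fn dispatch_event(event: Event) {
    match &event {
        Event::FatalError(_) | Event::SecurityWarning(_) => log::error!("{}", event.to_string()),
        Event::Trace(_, _, _) => log::trace!("{}", event.to_string()),
        _ => log::info!("{}", event.to_string()),
    }
}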
@@ -119,8 +111,8 @@ struct BackgroundTaskIntervals { whois: IntervalGate<{ crate::vl1::whoisqueue::SERVICE_INTERVAL_MS }>, paths: IntervalGate<{ crate::vl1::path::SERVICE_INTERVAL_MS }>, peers: IntervalGate<{ crate::vl1::peer::SERVICE_INTERVAL_MS }>, - root_sync: IntervalGate, - root_hello: IntervalGate, + root_sync: IntervalGate<{ ROOT_SYNC_INTERVAL_MS }>, + root_hello: IntervalGate<{ ROOT_HELLO_INTERVAL }>, } struct RootInfo { @@ -132,7 +124,7 @@ struct RootInfo { /// A VL1 global P2P network node. pub struct Node { /// A random ID generated to identify this particular running instance. - pub instance_id: u64, + pub instance_id: [u8; 16], /// This node's identity and permanent keys. pub identity: Identity, @@ -161,7 +153,7 @@ pub struct Node { impl Node { /// Create a new Node. - pub fn new(si: &SI, auto_generate_identity: bool) -> Result { + pub fn new(si: &SI, auto_generate_identity: bool, auto_upgrade_identity: bool) -> Result { let mut id = { let id = si.load_node_identity(); if id.is_none() { @@ -169,6 +161,7 @@ impl Node { return Err(InvalidParameterError("no identity found and auto-generate not enabled")); } else { let id = Identity::generate(); + si.event(Event::IdentityAutoGenerated(id.clone())); si.save_node_identity(&id); id } @@ -177,13 +170,16 @@ impl Node { } }; - // Automatically upgrade old type identities to add P-384 keys. - if id.upgrade()? { - si.save_node_identity(&id); + if auto_upgrade_identity { + let old = id.clone(); + if id.upgrade()? { + si.save_node_identity(&id); + si.event(Event::IdentityAutoUpgraded(old, id.clone())); + } } Ok(Self { - instance_id: zerotier_core_crypto::random::next_u64_secure(), + instance_id: zerotier_core_crypto::random::get_bytes_secure(), identity: id, intervals: Mutex::new(BackgroundTaskIntervals::default()), paths: DashMap::new(), @@ -206,15 +202,14 @@ impl Node { } /// Get a peer by address. - #[inline(always)] pub fn peer(&self, a: Address) -> Option>> { self.peers.get(&a).map(|peer| peer.value().clone()) } /// Run background tasks and return desired delay until next call in milliseconds. /// - /// This should only be called periodically from a single thread, but that thread can be - /// different each time. Calling it concurrently won't crash but won't accomplish anything. + /// This shouldn't be called concurrently by more than one loop. Doing so would be harmless + /// but would be a waste of compute cycles. pub fn do_background_tasks(&self, si: &SI) -> Duration { let mut intervals = self.intervals.lock(); let tt = si.time_ticks(); @@ -225,7 +220,11 @@ impl Node { // Sychronize root info with root sets info if the latter has changed. 
if *sets_modified { *sets_modified = false; - roots.clear(); + zt_trace!(si, "root sets modified, rescanning..."); + + let mut old_root_identities: Vec = roots.drain().map(|r| r.0.identity.clone()).collect(); + let mut new_root_identities = Vec::new(); + let mut colliding_root_addresses = Vec::new(); // see security note below for (_, rc) in sets.iter() { for m in rc.members.iter() { @@ -252,16 +251,28 @@ impl Node { Ok(root_peer_entry) } else { colliding_root_addresses.push(m.identity.address); - si.event_security_warning( - format!("address/identity collision between root {} (from root cluster definition '{}') and known peer {}", m.identity.address.to_string(), rc.name, rp.identity.to_string()).as_str(), - ); + si.event(Event::SecurityWarning(format!( + "address/identity collision between root {} (from root cluster definition '{}') and known peer {}, ignoring this root!", + m.identity.address.to_string(), + rc.name, + rp.identity.to_string() + ))); Err(crate::error::UnexpectedError) } }) - .map(|r| roots.insert(r.value().clone(), m.endpoints.as_ref().unwrap().iter().map(|e| e.clone()).collect())); + .map(|r| { + new_root_identities.push(r.value().identity.clone()); + roots.insert(r.value().clone(), m.endpoints.as_ref().unwrap().iter().cloned().collect()); + }); } } } + + old_root_identities.sort_unstable(); + new_root_identities.sort_unstable(); + if !old_root_identities.eq(&new_root_identities) { + si.event(Event::UpdatedRoots(old_root_identities, new_root_identities)); + } } // Say HELLO to all roots periodically. For roots we send HELLO to every single endpoint @@ -272,6 +283,7 @@ impl Node { if intervals.root_hello.gate(tt) { for (root, endpoints) in roots.iter() { for ep in endpoints.iter() { + zt_trace!(si, "sending HELLO to root {}", root.identity.address.to_string()); root.send_hello(si, self, Some(ep)); } } @@ -300,7 +312,8 @@ impl Node { } if intervals.paths.gate(tt) { - // Service all paths, removing expired or invalid ones. + // Service all paths, removing expired or invalid ones. This is done in two passes to + // avoid introducing latency into a flow. self.paths.retain(|_, pbs| { let mut expired_paths = Vec::new(); for (ls, path) in pbs.read().iter() { @@ -319,14 +332,16 @@ impl Node { } if intervals.whois.gate(tt) { - let _ = self.whois.service(si, self, tt); + self.whois.service(si, self, tt); } Duration::from_millis((ROOT_SYNC_INTERVAL_MS.min(crate::vl1::whoisqueue::SERVICE_INTERVAL_MS).min(crate::vl1::path::SERVICE_INTERVAL_MS).min(crate::vl1::peer::SERVICE_INTERVAL_MS) as u64) / 2) } /// Called when a packet is received on the physical wire. 
- pub fn wire_receive(&self, si: &SI, ph: &PH, source_endpoint: &Endpoint, source_local_socket: &SI::LocalSocket, source_local_interface: &SI::LocalInterface, mut data: PooledPacketBuffer) { + pub fn handle_incoming_physical_packet(&self, si: &SI, ph: &PH, source_endpoint: &Endpoint, source_local_socket: &SI::LocalSocket, source_local_interface: &SI::LocalInterface, mut data: PooledPacketBuffer) { + zt_trace!(si, "<< incoming packet from {} length {} via socket {}@{}", source_endpoint.to_string(), data.len(), source_local_socket.to_string(), source_local_interface.to_string()); + if let Ok(fragment_header) = data.struct_mut_at::(0) { if let Some(dest) = Address::from_bytes_fixed(&fragment_header.dest) { let time_ticks = si.time_ticks(); @@ -339,7 +354,7 @@ impl Node { if fragment_header.is_fragment() { if let Some(assembled_packet) = path.receive_fragment(u64::from_ne_bytes(fragment_header.id), fragment_header.fragment_no(), fragment_header.total_fragments(), data, time_ticks) { if let Some(frag0) = assembled_packet.frags[0].as_ref() { - // Fragmented packet is fully assembled. + zt_trace!(si, "fragmented packet fully assembled!"); let packet_header = frag0.struct_at::(0); if packet_header.is_ok() { @@ -356,7 +371,7 @@ impl Node { } } else { if let Ok(packet_header) = data.struct_at::(0) { - // Packet is not fragmented. + zt_trace!(si, "parsing unfragmented packet"); if let Some(source) = Address::from_bytes(&packet_header.src) { if let Some(peer) = self.peer(source) { @@ -389,12 +404,11 @@ impl Node { peer.forward(si, time_ticks, data.as_ref()); } } - }; + } } } /// Get the current best root peer that we should use for WHOIS, relaying, etc. - #[inline(always)] pub fn root(&self) -> Option>> { self.best_root.read().clone() } diff --git a/zerotier-network-hypervisor/src/vl1/path.rs b/zerotier-network-hypervisor/src/vl1/path.rs index 5142de58c..096574128 100644 --- a/zerotier-network-hypervisor/src/vl1/path.rs +++ b/zerotier-network-hypervisor/src/vl1/path.rs @@ -1,6 +1,7 @@ // (c) 2020-2022 ZeroTier, Inc. -- currently propritery pending actual release and licensing. See LICENSE.md. 
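// A sketch (not part of this patch) of the minimal loop a host uses to drive the node:
// packets are pushed in through handle_incoming_physical_packet() (the service's receive
// tasks later in this diff do exactly that), and do_background_tasks() returns the Duration
// to sleep before it should be called again. The `I: Interface` bound and the type parameter
// on NetworkHypervisor are assumptions about generics not visible in this diff.
use zerotier_network_hypervisor::{Interface, NetworkHypervisor};

async fn drive_node<I: Interface>(interface: &I, node: &NetworkHypervisor<I>) {
    loop {
        tokio::time::sleep(node.do_background_tasks(interface)).await;
    }
}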
use std::collections::HashMap; +use std::hash::{BuildHasher, Hasher}; use std::sync::atomic::{AtomicI64, Ordering}; use parking_lot::Mutex; @@ -21,10 +22,10 @@ pub struct Path { pub endpoint: Endpoint, pub local_socket: SI::LocalSocket, pub local_interface: SI::LocalInterface, - pub(crate) last_send_time_ticks: AtomicI64, - pub(crate) last_receive_time_ticks: AtomicI64, - pub(crate) create_time_ticks: i64, - fragmented_packets: Mutex>, + last_send_time_ticks: AtomicI64, + last_receive_time_ticks: AtomicI64, + create_time_ticks: i64, + fragmented_packets: Mutex>, } impl Path { @@ -36,7 +37,7 @@ impl Path { last_send_time_ticks: AtomicI64::new(0), last_receive_time_ticks: AtomicI64::new(0), create_time_ticks: time_ticks, - fragmented_packets: Mutex::new(HashMap::with_capacity_and_hasher(4, U64NoOpHasher::new())), + fragmented_packets: Mutex::new(HashMap::with_capacity_and_hasher(4, PacketIdHasher(zerotier_core_crypto::random::xorshift64_random()))), } } @@ -92,3 +93,36 @@ impl Path { } } } + +#[repr(transparent)] +struct PacketIdHasher(u64); + +impl Hasher for PacketIdHasher { + #[inline(always)] + fn finish(&self) -> u64 { + self.0 + } + + #[inline(always)] + fn write(&mut self, _: &[u8]) { + panic!("u64 only"); + } + + #[inline(always)] + fn write_u64(&mut self, i: u64) { + let mut x = self.0.wrapping_add(i); + x ^= x.wrapping_shl(13); + x ^= x.wrapping_shr(7); + x ^= x.wrapping_shl(17); + self.0 = x; + } +} + +impl BuildHasher for PacketIdHasher { + type Hasher = Self; + + #[inline(always)] + fn build_hasher(&self) -> Self::Hasher { + Self(0) + } +} diff --git a/zerotier-network-hypervisor/src/vl1/peer.rs b/zerotier-network-hypervisor/src/vl1/peer.rs index e30dceba4..8728f5647 100644 --- a/zerotier-network-hypervisor/src/vl1/peer.rs +++ b/zerotier-network-hypervisor/src/vl1/peer.rs @@ -46,10 +46,10 @@ pub struct Peer { paths: Mutex>>, // Statistics and times of events. - pub(crate) last_send_time_ticks: AtomicI64, - pub(crate) last_receive_time_ticks: AtomicI64, + last_send_time_ticks: AtomicI64, + last_receive_time_ticks: AtomicI64, pub(crate) last_hello_reply_time_ticks: AtomicI64, - pub(crate) last_forward_time_ticks: AtomicI64, + last_forward_time_ticks: AtomicI64, // Counter for assigning sequential message IDs. message_id_counter: AtomicU64, @@ -442,7 +442,7 @@ impl Peer { // because the whole packet is authenticated. Data in the session is not technically secret in a // cryptographic sense but we encrypt it for privacy and as a defense in depth. let mut fields = Dictionary::new(); - fields.set_u64(session_metadata::INSTANCE_ID, node.instance_id); + fields.set_bytes(session_metadata::INSTANCE_ID, node.instance_id.to_vec()); fields.set_u64(session_metadata::CLOCK, si.time_clock() as u64); fields.set_bytes(session_metadata::SENT_TO, destination.to_buffer::<{ Endpoint::MAX_MARSHAL_SIZE }>().unwrap().as_bytes().to_vec()); let fields = fields.to_bytes(); diff --git a/zerotier-network-hypervisor/src/vl1/symmetricsecret.rs b/zerotier-network-hypervisor/src/vl1/symmetricsecret.rs index 48a5b9b34..f88b2164e 100644 --- a/zerotier-network-hypervisor/src/vl1/symmetricsecret.rs +++ b/zerotier-network-hypervisor/src/vl1/symmetricsecret.rs @@ -1,6 +1,6 @@ // (c) 2020-2022 ZeroTier, Inc. -- currently propritery pending actual release and licensing. See LICENSE.md. 
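// A sketch (not part of this patch): the fragmented_packets map above keys on already-random
// 64-bit packet IDs, so PacketIdHasher just mixes the key into a per-Path random seed rather
// than doing real hashing. A simplified standalone equivalent (hypothetical names, with the
// seed propagated explicitly by the builder):
use std::collections::HashMap;
use std::hash::{BuildHasher, Hasher};

struct U64KeyHasher(u64);
impl Hasher for U64KeyHasher {
    fn finish(&self) -> u64 { self.0 }
    fn write(&mut self, _: &[u8]) { panic!("u64 keys only"); }
    fn write_u64(&mut self, i: u64) {
        let mut x = self.0.wrapping_add(i);
        x ^= x.wrapping_shl(13);
        x ^= x.wrapping_shr(7);
        x ^= x.wrapping_shl(17);
        self.0 = x;
    }
}

#[derive(Clone, Copy)]
struct U64KeyHasherBuilder(u64);
impl BuildHasher for U64KeyHasherBuilder {
    type Hasher = U64KeyHasher;
    fn build_hasher(&self) -> U64KeyHasher { U64KeyHasher(self.0) }
}

fn packet_id_map_example() {
    let seed = zerotier_core_crypto::random::xorshift64_random();
    let mut by_packet_id: HashMap<u64, &'static str, U64KeyHasherBuilder> =
        HashMap::with_capacity_and_hasher(4, U64KeyHasherBuilder(seed));
    by_packet_id.insert(0x1234_5678_9abc_def0, "fragment set");
    assert!(by_packet_id.contains_key(&0x1234_5678_9abc_def0));
}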
-use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::atomic::AtomicUsize; use zerotier_core_crypto::aes_gmac_siv::AesGmacSiv; use zerotier_core_crypto::kbkdf::*; @@ -22,9 +22,6 @@ pub(crate) struct SymmetricSecret { /// Key used for HMAC extended validation on packets like HELLO. pub packet_hmac_key: Secret<64>, - /// Key used with ephemeral keying/re-keying. - pub ephemeral_ratchet_key: Secret<64>, - /// Pool of keyed AES-GMAC-SIV engines (pooled to avoid AES re-init every time). pub aes_gmac_siv: Pool, } @@ -34,14 +31,12 @@ impl SymmetricSecret { pub fn new(key: Secret<64>) -> SymmetricSecret { let hello_private_section_key = zt_kbkdf_hmac_sha384(&key.0, security_constants::KBKDF_KEY_USAGE_LABEL_HELLO_PRIVATE_SECTION); let packet_hmac_key = zt_kbkdf_hmac_sha512(&key.0, security_constants::KBKDF_KEY_USAGE_LABEL_PACKET_HMAC); - let ephemeral_ratchet_key = zt_kbkdf_hmac_sha512(&key.0, security_constants::KBKDF_KEY_USAGE_LABEL_EPHEMERAL_RATCHET_KEY); let aes_factory = AesGmacSivPoolFactory(zt_kbkdf_hmac_sha384(&key.0[..48], security_constants::KBKDF_KEY_USAGE_LABEL_AES_GMAC_SIV_K0).first_n(), zt_kbkdf_hmac_sha384(&key.0[..48], security_constants::KBKDF_KEY_USAGE_LABEL_AES_GMAC_SIV_K1).first_n()); SymmetricSecret { key, hello_private_section_key, packet_hmac_key, - ephemeral_ratchet_key, aes_gmac_siv: Pool::new(2, aes_factory), } } @@ -50,24 +45,7 @@ impl SymmetricSecret { /// An ephemeral symmetric secret with usage timers and counters. pub(crate) struct EphemeralSymmetricSecret { pub secret: SymmetricSecret, - pub rekey_time_ticks: i64, - pub expire_time_ticks: i64, - pub ratchet_count: u64, - pub encrypt_uses: AtomicUsize, pub decrypt_uses: AtomicUsize, - pub fips_compliant_exchange: bool, -} - -impl EphemeralSymmetricSecret { - #[inline(always)] - pub fn should_rekey(&self, time_ticks: i64) -> bool { - time_ticks >= self.rekey_time_ticks || self.encrypt_uses.load(Ordering::Relaxed).max(self.decrypt_uses.load(Ordering::Relaxed)) >= security_constants::EPHEMERAL_SECRET_REKEY_AFTER_USES - } - - #[inline(always)] - pub fn is_expired(&self, time_ticks: i64) -> bool { - time_ticks >= self.expire_time_ticks || self.encrypt_uses.load(Ordering::Relaxed).max(self.decrypt_uses.load(Ordering::Relaxed)) >= security_constants::EPHEMERAL_SECRET_REJECT_AFTER_USES - } } pub(crate) struct AesGmacSivPoolFactory(Secret<32>, Secret<32>); diff --git a/zerotier-network-hypervisor/src/vl1/whoisqueue.rs b/zerotier-network-hypervisor/src/vl1/whoisqueue.rs index 0dc1e9972..3d4ff3e43 100644 --- a/zerotier-network-hypervisor/src/vl1/whoisqueue.rs +++ b/zerotier-network-hypervisor/src/vl1/whoisqueue.rs @@ -62,7 +62,7 @@ impl WhoisQueue { todo!() } - pub(crate) fn service(&self, si: &SI, node: &Node, time_ticks: i64) -> bool { + pub(crate) fn service(&self, si: &SI, node: &Node, time_ticks: i64) { let mut targets: Vec
= Vec::new(); self.0.lock().retain(|target, qi| { if qi.retry_count < WHOIS_RETRY_MAX { @@ -78,6 +78,5 @@ impl WhoisQueue { if !targets.is_empty() { self.send_whois(node, si, targets.as_slice()); } - true } } diff --git a/zerotier-network-hypervisor/src/vl2/networkid.rs b/zerotier-network-hypervisor/src/vl2/networkid.rs index 45e5e264f..6d80ad8b0 100644 --- a/zerotier-network-hypervisor/src/vl2/networkid.rs +++ b/zerotier-network-hypervisor/src/vl2/networkid.rs @@ -9,6 +9,7 @@ use serde::{Deserialize, Deserializer, Serialize, Serializer}; use crate::error::InvalidFormatError; use crate::util::buffer::Buffer; use crate::util::hex::HEX_CHARS; +use crate::util::marshalable::Marshalable; #[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] #[repr(transparent)] @@ -43,15 +44,19 @@ impl NetworkId { pub fn to_u64(&self) -> u64 { self.0.get() } +} + +impl Marshalable for NetworkId { + const MAX_MARSHAL_SIZE: usize = 8; #[inline(always)] - pub(crate) fn marshal(&self, buf: &mut Buffer) -> std::io::Result<()> { + fn marshal(&self, buf: &mut Buffer) -> std::io::Result<()> { buf.append_u64(self.0.get()) } #[inline(always)] - pub(crate) fn unmarshal(buf: &Buffer, cursor: &mut usize) -> std::io::Result> { - Ok(Self::from_u64(buf.read_u64(cursor)?)) + fn unmarshal(buf: &Buffer, cursor: &mut usize) -> std::io::Result { + Self::from_u64(buf.read_u64(cursor)?).map_or_else(|| Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "cannot be zero")), |a| Ok(a)) } } diff --git a/zerotier-system-service/Cargo.lock b/zerotier-system-service/Cargo.lock index 6ce71ee85..b5cb17645 100644 --- a/zerotier-system-service/Cargo.lock +++ b/zerotier-system-service/Cargo.lock @@ -1065,6 +1065,7 @@ dependencies = [ "clap", "lazy_static", "libc", + "log", "num-traits", "parking_lot", "serde", diff --git a/zerotier-system-service/Cargo.toml b/zerotier-system-service/Cargo.toml index 7120a0b90..8432d1e71 100644 --- a/zerotier-system-service/Cargo.toml +++ b/zerotier-system-service/Cargo.toml @@ -21,6 +21,7 @@ serde_json = { version = "^1", features = ["std"], default-features = false } parking_lot = "^0" lazy_static = "^1" clap = { version = "^3", features = ["std", "suggestions"], default-features = false } +log = "^0" [target."cfg(windows)".dependencies] winapi = { version = "^0", features = ["handleapi", "ws2ipdef", "ws2tcpip"] } diff --git a/zerotier-system-service/src/datadir.rs b/zerotier-system-service/src/datadir.rs index 5a6a39745..3b522fa20 100644 --- a/zerotier-system-service/src/datadir.rs +++ b/zerotier-system-service/src/datadir.rs @@ -86,6 +86,7 @@ impl DataDir { tmp.push(AUTH_TOKEN_POSSIBLE_CHARS.as_bytes()[(next_u32_secure() as usize) % AUTH_TOKEN_POSSIBLE_CHARS.len()] as char); } tokio::fs::write(&authtoken_path, tmp.as_bytes()).await?; + assert!(crate::utils::fs_restrict_permissions(&authtoken_path)); *authtoken = tmp; } else { *authtoken = String::from_utf8_lossy(authtoken_bytes.unwrap().as_slice()).into(); diff --git a/zerotier-system-service/src/getifaddrs.rs b/zerotier-system-service/src/getifaddrs.rs index 730d9244c..8b0e993eb 100644 --- a/zerotier-system-service/src/getifaddrs.rs +++ b/zerotier-system-service/src/getifaddrs.rs @@ -5,6 +5,8 @@ use std::ptr::{copy_nonoverlapping, null_mut}; use zerotier_network_hypervisor::vl1::InetAddress; +use crate::localinterface::LocalInterface; + #[allow(unused)] #[inline(always)] fn s6_addr_as_ptr(a: &A) -> *const A { @@ -13,7 +15,7 @@ fn s6_addr_as_ptr(a: &A) -> *const A { /// Call supplied function or closure for each physical IP address in the system. 
#[cfg(unix)] -pub fn for_each_address(mut f: F) { +pub fn for_each_address(mut f: F) { unsafe { let mut ifa_name = [0_u8; libc::IFNAMSIZ as usize]; let mut ifap: *mut libc::ifaddrs = null_mut(); @@ -66,7 +68,7 @@ pub fn for_each_address(mut f: F) { if namlen > 0 { let dev = String::from_utf8_lossy(&ifa_name[0..namlen]); if dev.len() > 0 { - f(&a, dev.as_ref()); + f(&a, &LocalInterface::from_unix_interface_name(dev.as_ref())); } } } @@ -79,12 +81,13 @@ pub fn for_each_address(mut f: F) { #[cfg(test)] mod tests { + use crate::localinterface::LocalInterface; use zerotier_network_hypervisor::vl1::InetAddress; #[test] fn test_getifaddrs() { println!("starting getifaddrs..."); - crate::vnic::getifaddrs::for_each_address(|a: &InetAddress, dev: &str| println!(" {} {}", dev, a.to_string())); + crate::getifaddrs::for_each_address(|a: &InetAddress, interface: &LocalInterface| println!(" {} {}", interface.to_string(), a.to_string())); println!("done.") } } diff --git a/zerotier-system-service/src/localconfig.rs b/zerotier-system-service/src/localconfig.rs index 15aca8248..7a09ec0f9 100644 --- a/zerotier-system-service/src/localconfig.rs +++ b/zerotier-system-service/src/localconfig.rs @@ -75,14 +75,21 @@ impl Default for NetworkSettings { #[derive(Serialize, Deserialize, Clone, PartialEq, Eq)] #[serde(default)] pub struct GlobalSettings { + /// Primary ZeroTier port that is always bound, default is 9993. #[serde(rename = "primaryPort")] pub primary_port: u16, + + /// Enable uPnP, NAT-PMP, and other router port mapping technologies? #[serde(rename = "portMapping")] pub port_mapping: bool, + + /// Interface name prefix blacklist for local bindings (not remote IPs). #[serde(rename = "interfacePrefixBlacklist")] pub interface_prefix_blacklist: Vec, - #[serde(rename = "explicitAddresses")] - pub explicit_addresses: Vec, + + /// IP/bits CIDR blacklist for local bindings (not remote IPs). + #[serde(rename = "cidrBlacklist")] + pub cidr_blacklist: Vec, } impl Default for GlobalSettings { @@ -97,7 +104,7 @@ impl Default for GlobalSettings { primary_port: DEFAULT_PORT, port_mapping: true, interface_prefix_blacklist: bl, - explicit_addresses: Vec::new(), + cidr_blacklist: Vec::new(), } } } diff --git a/zerotier-system-service/src/localinterface.rs b/zerotier-system-service/src/localinterface.rs new file mode 100644 index 000000000..fedd0d23a --- /dev/null +++ b/zerotier-system-service/src/localinterface.rs @@ -0,0 +1,41 @@ +// (c) 2020-2022 ZeroTier, Inc. -- currently propritery pending actual release and licensing. See LICENSE.md. + +use std::hash::Hash; + +/// Lightweight container for local system network interface names/IDs. +/// +/// On *nix systems this will be an interface like 'eth0' stuffed into a u128. On Windows +/// this will be a network device GUID, which is also 128-bit. This will need to be revised +/// if there are OSes out there that use interface names or IDs longer than 16 bytes. The +/// point here is to have something tiny and cheap for the core to store internally. +#[derive(Clone, Copy, PartialEq, Eq, Hash)] +#[repr(transparent)] +pub struct LocalInterface(u128); + +impl LocalInterface { + #[cfg(unix)] + pub fn from_unix_interface_name(name: &str) -> Self { + let mut tmp = [0_u8; 16]; + let nb = name.as_bytes(); + let l = nb.len(); + assert!(l <= 16); // do any *nix OSes have device names longer than 16 bytes? 
+ tmp[..l].copy_from_slice(&nb[..l]); + Self(u128::from_le_bytes(tmp)) + } +} + +impl ToString for LocalInterface { + #[cfg(unix)] + fn to_string(&self) -> String { + let b = self.0.to_le_bytes(); + let mut l = 0; + for _ in 0..16 { + if b[l] > 0 { + l += 1; + } else { + break; + } + } + String::from_utf8_lossy(&b[..l]).to_string() + } +} diff --git a/zerotier-system-service/src/main.rs b/zerotier-system-service/src/main.rs index a98505528..bd4626de5 100644 --- a/zerotier-system-service/src/main.rs +++ b/zerotier-system-service/src/main.rs @@ -6,6 +6,7 @@ pub mod exitcode; pub mod getifaddrs; pub mod jsonformatter; pub mod localconfig; +pub mod localinterface; pub mod service; pub mod udp; pub mod utils; @@ -122,7 +123,7 @@ pub struct Flags { async fn async_main(flags: Flags, global_args: Box) -> i32 { #[allow(unused)] - return match global_args.subcommand() { + match global_args.subcommand() { Some(("help", _)) => { print_help(); exitcode::OK @@ -138,8 +139,9 @@ async fn async_main(flags: Flags, global_args: Box) -> i32 { Some(("join", cmd_args)) => todo!(), Some(("leave", cmd_args)) => todo!(), Some(("service", _)) => { - drop(global_args); // free unnecessary heap - assert!(service::Service::new(flags.base_path.as_str()).await.is_ok()); + drop(global_args); // free unnecessary heap before starting service as we're done with CLI args + assert!(service::Service::new(tokio::runtime::Handle::current(), &flags.base_path, true).await.is_ok()); + exitcode::OK } Some(("identity", cmd_args)) => todo!(), Some(("rootset", cmd_args)) => cli::rootset::cmd(flags, cmd_args).await, @@ -147,7 +149,7 @@ async fn async_main(flags: Flags, global_args: Box) -> i32 { eprintln!("Invalid command line. Use 'help' for help."); exitcode::ERR_USAGE } - }; + } } fn main() { diff --git a/zerotier-system-service/src/service.rs b/zerotier-system-service/src/service.rs index f71b92ac0..030976753 100644 --- a/zerotier-system-service/src/service.rs +++ b/zerotier-system-service/src/service.rs @@ -1,72 +1,153 @@ // (c) 2020-2022 ZeroTier, Inc. -- currently propritery pending actual release and licensing. See LICENSE.md. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; +use std::error::Error; use std::hash::Hash; -use std::num::NonZeroI64; -use std::sync::atomic::AtomicUsize; +use std::path::Path; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Weak}; -use tokio::net::UdpSocket; - use zerotier_network_hypervisor::vl1::*; use zerotier_network_hypervisor::vl2::*; use zerotier_network_hypervisor::*; +use zerotier_core_crypto::random; + +use tokio::time::Duration; + use crate::datadir::DataDir; -use crate::udp::BoundUdpSocket; +use crate::localinterface::LocalInterface; +use crate::udp::{BoundUdpPort, BoundUdpSocket}; use crate::utils::{ms_monotonic, ms_since_epoch}; -pub type DynamicError = Box; +const UDP_UPDATE_BINDINGS_INTERVAL_MS: Duration = Duration::from_millis(2500); +/// ZeroTier system service, which presents virtual networks as VPN connections. 
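// A sketch (not part of this patch) showing the round trip LocalInterface above provides:
// a short *nix interface name is packed little-endian into a u128 and recovered by
// to_string(). The interface name is illustrative.
use crate::localinterface::LocalInterface;

fn local_interface_example() {
    let iface = LocalInterface::from_unix_interface_name("eth0");
    assert_eq!(iface.to_string(), "eth0");
}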
pub struct Service { + udp_binding_task: tokio::task::JoinHandle<()>, + core_background_service_task: tokio::task::JoinHandle<()>, + internal: Arc, +} + +struct ServiceImpl { pub rt: tokio::runtime::Handle, pub data: DataDir, pub local_socket_unique_id_counter: AtomicUsize, - pub udp_sockets: parking_lot::RwLock>>>, - pub core: Option>, + pub udp_sockets: tokio::sync::RwLock>, + pub num_listeners_per_socket: usize, + _core: Option>, +} + +impl Drop for Service { + fn drop(&mut self) { + self.internal.rt.block_on(async { + // Kill all background tasks associated with this service. + self.udp_binding_task.abort(); + self.core_background_service_task.abort(); + + // Wait for all tasks to actually stop. + let _ = self.udp_binding_task.await; + let _ = self.core_background_service_task.await; + + // Drop all bound sockets since these can hold circular Arc<> references to 'internal'. + self.internal.udp_sockets.write().await.clear(); + }); + } } impl Service { - pub async fn new(base_path: &str) -> Result { - let mut svc = Self { - rt: tokio::runtime::Handle::current(), + pub async fn new>(rt: tokio::runtime::Handle, base_path: P, auto_upgrade_identity: bool) -> Result> { + let mut si = ServiceImpl { + rt, data: DataDir::open(base_path).await.map_err(|e| Box::new(e))?, local_socket_unique_id_counter: AtomicUsize::new(1), - udp_sockets: parking_lot::RwLock::new(HashMap::with_capacity(4)), - core: None, + udp_sockets: tokio::sync::RwLock::new(HashMap::with_capacity(4)), + num_listeners_per_socket: std::thread::available_parallelism().unwrap().get(), + _core: None, }; - let _ = svc.core.insert(NetworkHypervisor::new(&svc, true).map_err(|e| Box::new(e))?); + let _ = si._core.insert(NetworkHypervisor::new(&si, true, auto_upgrade_identity)?); + let si = Arc::new(si); - let config = svc.data.config().await; - - Ok(svc) + let (si1, si2) = (si.clone(), si.clone()); + Ok(Self { + udp_binding_task: si.rt.spawn(si1.udp_binding_task_main()), + core_background_service_task: si.rt.spawn(si2.core_background_service_task_main()), + internal: si, + }) } } -/// Local socket wrapper implementing equality and hash in terms of an arbitrary unique ID. -#[derive(Clone)] -struct LocalSocket(Weak, usize); - -impl PartialEq for LocalSocket { +impl ServiceImpl { #[inline(always)] - fn eq(&self, other: &Self) -> bool { - self.1 == other.1 + fn core(&self) -> &NetworkHypervisor { + debug_assert!(self._core.is_some()); + unsafe { self._core.as_ref().unwrap_unchecked() } + } + + async fn update_bindings_for_port(self: &Arc, port: u16, interface_prefix_blacklist: &Vec, cidr_blacklist: &Vec) -> Option> { + let mut udp_sockets = self.udp_sockets.write().await; + let bp = udp_sockets.entry(port).or_insert_with(|| BoundUdpPort::new(port)); + let (errors, new_sockets) = bp.update_bindings(interface_prefix_blacklist, cidr_blacklist); + if bp.sockets.is_empty() { + return Some(errors); + } + drop(udp_sockets); // release lock + + for ns in new_sockets.iter() { + // We start a task for each CPU core. Tokio multiplexes but since each packet takes a bit of CPU + // to parse, decrypt, etc. we want to be able to saturate the CPU for any given socket to virtual + // network path. The alternative would be to use MPMC channels but that would almost certainly be + // a lot slower as it would involve more sync/atomic bottlenecks and probably extra malloc/free. 
+ let mut kill_on_drop = ns.kill_on_drop.lock(); + for _ in 0..self.num_listeners_per_socket { + let self2 = self.clone(); + let socket = ns.socket.clone(); + let interface = ns.interface.clone(); + let local_socket = LocalSocket(Arc::downgrade(ns), self.local_socket_unique_id_counter.fetch_add(1, Ordering::SeqCst)); + kill_on_drop.push(self.rt.spawn(async move { + let core = self2.core(); + loop { + let mut buf = core.get_packet_buffer(); + if let Ok((bytes, source)) = socket.recv_from(unsafe { buf.entire_buffer_mut() }).await { + unsafe { buf.set_size_unchecked(bytes) }; + core.handle_incoming_physical_packet(&self2, &Endpoint::IpUdp(InetAddress::from(source)), &local_socket, &interface, buf); + } else { + break; + } + } + })); + } + } + + return None; + } + + async fn udp_binding_task_main(self: Arc) { + loop { + let config = self.data.config().await; + + if let Some(errors) = self.update_bindings_for_port(config.settings.primary_port, &config.settings.interface_prefix_blacklist, &config.settings.cidr_blacklist).await { + for e in errors.iter() { + println!("BIND ERROR: {} {} {}", e.0.to_string(), e.1.to_string(), e.2.to_string()); + } + // TODO: report errors properly + } + + tokio::time::sleep(UDP_UPDATE_BINDINGS_INTERVAL_MS).await; + } + } + + async fn core_background_service_task_main(self: Arc) { + tokio::time::sleep(Duration::from_secs(1)).await; + loop { + tokio::time::sleep(self.core().do_background_tasks(&self)).await; + } } } -impl Eq for LocalSocket {} - -impl Hash for LocalSocket { - #[inline(always)] - fn hash(&self, state: &mut H) { - self.1.hash(state) - } -} - -impl SystemInterface for Service { +impl SystemInterface for ServiceImpl { type LocalSocket = crate::service::LocalSocket; - - type LocalInterface = String; + type LocalInterface = crate::localinterface::LocalInterface; fn event_node_is_up(&self) {} @@ -91,7 +172,55 @@ impl SystemInterface for Service { } fn wire_send(&self, endpoint: &Endpoint, local_socket: Option<&Self::LocalSocket>, local_interface: Option<&Self::LocalInterface>, data: &[&[u8]], packet_ttl: u8) -> bool { - todo!() + match endpoint { + Endpoint::IpUdp(address) => { + // This is the fast path -- the socket is known to the core so just send it. + if let Some(s) = local_socket { + if let Some(s) = s.0.upgrade() { + return s.send_sync_nonblock(&self.rt, address, data, packet_ttl); + } else { + return false; + } + } + + // Otherwise we try to send from one socket on every interface or from the specified interface. + // This path only happens when the core is trying new endpoints. The fast path is for most packets. 
+ return self.rt.block_on(async { + let sockets = self.udp_sockets.read().await; + if !sockets.is_empty() { + if let Some(specific_interface) = local_interface { + for (_, p) in sockets.iter() { + for s in p.sockets.iter() { + if s.interface.eq(specific_interface) { + if s.send_async(&self.rt, address, data, packet_ttl).await { + return true; + } + } + } + } + } else { + let bound_ports: Vec<&u16> = sockets.keys().collect(); + let mut sent_on_interfaces = HashSet::with_capacity(4); + let rn = random::xorshift64_random() as usize; + for i in 0..bound_ports.len() { + let p = sockets.get(*bound_ports.get(rn.wrapping_add(i) % bound_ports.len()).unwrap()).unwrap(); + for s in p.sockets.iter() { + if !sent_on_interfaces.contains(&s.interface) { + if s.send_async(&self.rt, address, data, packet_ttl).await { + sent_on_interfaces.insert(s.interface.clone()); + } + } + } + } + return !sent_on_interfaces.is_empty(); + } + } + return false; + }); + } + _ => {} + } + return false; } fn check_path(&self, id: &Identity, endpoint: &Endpoint, local_socket: Option<&Self::LocalSocket>, local_interface: Option<&Self::LocalInterface>) -> bool { @@ -113,6 +242,54 @@ impl SystemInterface for Service { } } -impl SwitchInterface for Service {} +impl SwitchInterface for ServiceImpl {} -impl Interface for Service {} +impl Interface for ServiceImpl {} + +/// Local socket wrapper to provide to the core. +/// +/// This implements very fast hash and equality in terms of an arbitrary unique ID assigned at +/// construction and holds a weak reference to the bound socket so dead sockets will silently +/// cease to exist or work. This also means that this code can check the weak count to determine +/// if the core is currently holding/using a socket for any reason. +#[derive(Clone)] +pub struct LocalSocket(Weak, usize); + +impl LocalSocket { + /// Returns true if the wrapped socket appears to be in use by the core. + #[inline(always)] + pub fn in_use(&self) -> bool { + self.0.weak_count() > 0 + } + + #[inline(always)] + pub fn socket(&self) -> Option> { + self.0.upgrade() + } +} + +impl PartialEq for LocalSocket { + #[inline(always)] + fn eq(&self, other: &Self) -> bool { + self.1 == other.1 + } +} + +impl Eq for LocalSocket {} + +impl Hash for LocalSocket { + #[inline(always)] + fn hash(&self, state: &mut H) { + self.1.hash(state) + } +} + +impl ToString for LocalSocket { + fn to_string(&self) -> String { + if let Some(s) = self.0.upgrade() { + s.address.to_string() + } else { + "(closed socket)".into() + } + } +} diff --git a/zerotier-system-service/src/udp.rs b/zerotier-system-service/src/udp.rs index 8a9628414..d766c6bf2 100644 --- a/zerotier-system-service/src/udp.rs +++ b/zerotier-system-service/src/udp.rs @@ -1,83 +1,239 @@ // (c) 2020-2022 ZeroTier, Inc. -- currently propritery pending actual release and licensing. See LICENSE.md. 
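// A sketch (not part of this patch) of what code in the service can do with the LocalSocket
// handle defined just above: because it only holds a Weak reference plus a unique ID, it
// stays cheap to hash and compare, and it degrades gracefully once the underlying
// BoundUdpSocket has been closed.
use crate::service::LocalSocket;

fn describe_local_socket(ls: &LocalSocket) -> String {
    if let Some(bound) = ls.socket() {
        format!("bound to {} (held by core: {})", bound.address.to_string(), ls.in_use())
    } else {
        "(closed socket)".into()
    }
}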
-use std::collections::{HashMap, HashSet};
-use std::hash::Hash;
-use std::num::NonZeroI64;
+use std::collections::HashMap;
+#[allow(unused_imports)]
+use std::mem::{size_of, transmute, MaybeUninit};
+#[allow(unused_imports)]
+use std::net::SocketAddr;
+#[allow(unused_imports)]
+use std::os::raw::*;
+#[allow(unused_imports)]
+use std::ptr::{null, null_mut};
 use std::sync::Arc;
 
 #[cfg(unix)]
 use std::os::unix::io::{FromRawFd, RawFd};
 
-use lazy_static::lazy_static;
+use crate::getifaddrs;
+use crate::localinterface::LocalInterface;
 #[allow(unused_imports)]
 use num_traits::AsPrimitive;
 
-use crate::getifaddrs;
+use zerotier_network_hypervisor::vl1::{InetAddress, IpScope};
-use zerotier_network_hypervisor::vl1::inetaddress::{InetAddress, IpScope};
 
+/// A local port to which one or more UDP sockets is bound.
+///
+/// To bind a port we must bind sockets to each interface/IP pair directly. Sockets must
+/// be "hard" bound to the interface so default route override can work.
+pub struct BoundUdpPort {
+    pub sockets: Vec<Arc<BoundUdpSocket>>,
+    pub port: u16,
+}
 
-/// A locally bound UDP socket.
+/// A socket bound to a specific interface and IP.
 pub struct BoundUdpSocket {
-    /// Locally bound address.
+    /// Local IP address to which this socket is bound.
     pub address: InetAddress,
-    /// Locally bound (to device) socket.
-    pub socket: tokio::net::UdpSocket,
-    /// Local interface device name or other unique identifier (OS-specific).
-    pub interface: String,
-    /// Raw socket FD, which only remains valid as long as 'socket' exists.
-    pub fd: RawFd,
-    /// Monotonic time of last activity.
-    pub last_activity_time_ticks: i64,
+
+    /// High-level async socket, but UDP also supports non-blocking sync send.
+    pub socket: Arc<tokio::net::UdpSocket>,
+
+    /// Local interface on which socket appears.
+    pub interface: LocalInterface,
+
+    /// Add tasks here that should be aborted when this socket is closed.
+    pub kill_on_drop: parking_lot::Mutex<Vec<tokio::task::JoinHandle<()>>>,
+
+    fd: RawFd,
+}
+
+impl Drop for BoundUdpSocket {
+    fn drop(&mut self) {
+        for t in self.kill_on_drop.lock().drain(..) {
+            t.abort();
+        }
+    }
+}
 
 impl BoundUdpSocket {
-    /// Update 'sockets' by adding any missing local bindings and removing any that are no longer valid.
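+    /// Set the IPv4 TTL on the underlying socket; send paths set it before a send and restore 0xff afterward.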
+    #[cfg(unix)]
+    #[inline(always)]
+    fn set_ttl(&self, packet_ttl: u8) {
+        let ttl = packet_ttl as c_int;
+        unsafe { libc::setsockopt(self.fd.as_(), libc::IPPROTO_IP.as_(), libc::IP_TTL.as_(), (&ttl as *const c_int).cast(), std::mem::size_of::<c_int>().as_()) };
+    }
+
+    #[cfg(any(target_os = "macos", target_os = "freebsd"))]
+    pub fn send_sync_nonblock(&self, _: &tokio::runtime::Handle, dest: &InetAddress, b: &[&[u8]], packet_ttl: u8) -> bool {
+        let mut ok = false;
+        if dest.family() == self.address.family() {
+            if packet_ttl > 0 && dest.is_ipv4() {
+                self.set_ttl(packet_ttl);
+            }
+            unsafe {
+                if b.len() == 1 {
+                    let bb = *b.get_unchecked(0);
+                    ok = libc::sendto(self.fd.as_(), bb.as_ptr().cast(), bb.len().as_(), 0, transmute(dest as *const InetAddress), size_of::<InetAddress>().as_()) > 0;
+                } else {
+                    let mut iov: [libc::iovec; 16] = MaybeUninit::uninit().assume_init();
+                    assert!(b.len() <= iov.len());
+                    for i in 0..b.len() {
+                        let bb = *b.get_unchecked(i);
+                        let ii = iov.get_unchecked_mut(i);
+                        ii.iov_base = transmute(bb.as_ptr());
+                        ii.iov_len = bb.len().as_();
+                    }
+                    let msghdr = libc::msghdr {
+                        msg_name: transmute(dest as *const InetAddress),
+                        msg_namelen: size_of::<InetAddress>().as_(),
+                        msg_iov: iov.as_mut_ptr(),
+                        msg_iovlen: b.len().as_(),
+                        msg_control: null_mut(),
+                        msg_controllen: 0,
+                        msg_flags: 0,
+                    };
+                    ok = libc::sendmsg(self.fd.as_(), &msghdr, 0) > 0;
+                }
+            }
+            if packet_ttl > 0 && dest.is_ipv4() {
+                self.set_ttl(0xff);
+            }
+        }
+        ok
+    }
+
+    #[cfg(not(any(target_os = "macos", target_os = "freebsd")))]
+    pub fn send_sync_nonblock(&self, _: &tokio::runtime::Handle, dest: &InetAddress, b: &[&[u8]], packet_ttl: u8) -> bool {
+        let mut ok = false;
+        if dest.family() == self.address.family() {
+            let mut tmp: [u8; 16384] = unsafe { std::mem::MaybeUninit::uninit().assume_init() };
+            let data = if b.len() == 1 {
+                *unsafe { b.get_unchecked(0) }
+            } else {
+                let mut p = 0;
+                for bb in b.iter() {
+                    let pp = p + bb.len();
+                    if pp < 16384 {
+                        tmp[p..pp].copy_from_slice(*bb);
+                        p = pp;
+                    } else {
+                        return false;
+                    }
+                }
+                &tmp[..p]
+            };
+
+            if packet_ttl > 0 && dest.is_ipv4() {
+                self.set_ttl(packet_ttl);
+            }
+            ok = self.socket.try_send_to(data, dest.try_into().unwrap()).is_ok();
+            if packet_ttl > 0 && dest.is_ipv4() {
+                self.set_ttl(0xff);
+            }
+        }
+        ok
+    }
+
+    pub async fn send_async(&self, _: &tokio::runtime::Handle, dest: &InetAddress, b: &[&[u8]], packet_ttl: u8) -> bool {
+        let mut ok = false;
+        if dest.family() == self.address.family() {
+            let mut tmp: [u8; 16384] = unsafe { std::mem::MaybeUninit::uninit().assume_init() };
+            let data = if b.len() == 1 {
+                *unsafe { b.get_unchecked(0) }
+            } else {
+                let mut p = 0;
+                for bb in b.iter() {
+                    let pp = p + bb.len();
+                    if pp < 16384 {
+                        tmp[p..pp].copy_from_slice(*bb);
+                        p = pp;
+                    } else {
+                        return false;
+                    }
+                }
+                &tmp[..p]
+            };
+
+            if packet_ttl > 0 && dest.is_ipv4() {
+                self.set_ttl(packet_ttl);
+            }
+            let sa: SocketAddr = dest.try_into().unwrap();
+            ok = self.socket.send_to(data, sa).await.is_ok();
+            if packet_ttl > 0 && dest.is_ipv4() {
+                self.set_ttl(0xff);
+            }
+        }
+        ok
+    }
+}
+
+impl BoundUdpPort {
+    /// Create a new port binding.
+    ///
+    /// You must call update_bindings() after this to actually bind to system interfaces.
+    pub fn new(port: u16) -> Self {
+        Self { sockets: Vec::new(), port }
+    }
+
+    /// Synchronize bindings with devices and IPs in system.
     ///
     /// Any device or local IP within any of the supplied blacklists is ignored. Multicast or loopback addresses are
-    /// also ignored. All errors encountered are returned.
+    /// also ignored.
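+    /// Bindings whose interface or IP no longer exists on the system are dropped.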
     ///
-    /// This should always be called on the same port for the same socket collection. Calling on the same 'sockets'
-    /// with different ports will lead to redundant or missed bindings.
-    ///
-    /// We must bind directly to each device/address pair for each port so default route override can work.
-    fn update_bindings_for_port(sockets: &mut Vec<Arc<BoundUdpSocket>>, port: u16, device_prefix_blacklist: &Vec<String>, cidr_blacklist: &Vec<InetAddress>) -> Vec<std::io::Error> {
+    /// The caller can check the 'sockets' member variable after calling to determine which, if any, bindings were
+    /// successful. Any errors that occurred are returned as tuples of (interface, address, error). The second vector
+    /// returned contains newly bound sockets.
+    pub fn update_bindings(&mut self, interface_prefix_blacklist: &Vec<String>, cidr_blacklist: &Vec<InetAddress>) -> (Vec<(LocalInterface, InetAddress, std::io::Error)>, Vec<Arc<BoundUdpSocket>>) {
+        let mut existing_bindings: HashMap<LocalInterface, HashMap<InetAddress, Arc<BoundUdpSocket>>> = HashMap::with_capacity(4);
+        for s in self.sockets.drain(..) {
+            existing_bindings.entry(s.interface.clone()).or_insert_with(|| HashMap::with_capacity(4)).insert(s.address.clone(), s);
+        }
+
         let mut errors = Vec::new();
-        let mut existing_bind_points: HashMap<String, Vec<InetAddress>> = HashMap::with_capacity(4);
-        let now = crate::utils::ms_monotonic();
-        getifaddrs::for_each_address(|address, device| {
+        let mut new_sockets = Vec::new();
+        getifaddrs::for_each_address(|address, interface| {
+            let interface_str = interface.to_string();
             if address.is_ip()
                 && matches!(address.scope(), IpScope::Global | IpScope::PseudoPrivate | IpScope::Private | IpScope::Shared)
-                && !device_prefix_blacklist.iter().any(|pfx| device.starts_with(pfx.as_str()))
+                && !interface_prefix_blacklist.iter().any(|pfx| interface_str.starts_with(pfx.as_str()))
                 && !cidr_blacklist.iter().any(|r| address.is_within(r))
             {
-                existing_bind_points.entry(device.to_string()).or_default().push(address.clone());
-                if !sockets.iter().any(|_, s| s.address == address || s.local_device_id == did) {
-                    let s = unsafe { bind_udp_to_device(device, address) };
+                let mut found = false;
+                if let Some(byaddr) = existing_bindings.get(interface) {
+                    if let Some(socket) = byaddr.get(address) {
+                        found = true;
+                        self.sockets.push(socket.clone());
+                    }
+                }
+                if !found {
+                    let mut addr_with_port = address.clone();
+                    addr_with_port.set_port(self.port);
+                    let s = unsafe { bind_udp_to_device(interface_str.as_str(), &addr_with_port) };
                     if s.is_ok() {
                         let fd = s.unwrap();
                         let s = tokio::net::UdpSocket::from_std(unsafe { std::net::UdpSocket::from_raw_fd(fd) });
                         if s.is_ok() {
-                            id_assignment_state.socket_id_counter += 1;
-                            let lsid = NonZeroI64::new(id_assignment_state.socket_id_counter).unwrap();
-                            sockets.push(Arc::new(BoundUdpSocket {
-                                address: address.clone(),
-                                socket: s.unwrap(),
-                                interface: device.to_string(),
+                            let s = Arc::new(BoundUdpSocket {
+                                address: addr_with_port,
+                                socket: Arc::new(s.unwrap()),
+                                kill_on_drop: parking_lot::Mutex::new(Vec::new()),
+                                interface: interface.clone(),
                                 fd,
-                                last_activity_time_ticks: now,
-                            }));
+                            });
+                            self.sockets.push(s.clone());
+                            new_sockets.push(s);
                         } else {
-                            errors.push(s.err().unwrap());
+                            errors.push((interface.clone(), addr_with_port, s.err().unwrap()));
                         }
                     } else {
-                        errors.push(std::io::Error::new(std::io::ErrorKind::AddrInUse, s.err().unwrap()));
+                        errors.push((interface.clone(), addr_with_port, std::io::Error::new(std::io::ErrorKind::Other, s.err().unwrap())));
                     }
                 }
             }
         });
 
-        sockets.retain(|s| existing_bind_points.get(&s.local_interface).map_or(false, |addr_list| addr_list.contains(&s.address)));
-        errors
+
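+        // Anything still left in existing_bindings no longer exists on the system; dropping it lets
+        // BoundUdpSocket::drop abort its kill_on_drop tasks once the last reference goes away.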
+        (errors, new_sockets)
     }
 }
 
@@ -85,8 +241,8 @@ impl BoundUdpSocket {
 #[cfg(unix)]
 unsafe fn bind_udp_to_device(device_name: &str, address: &InetAddress) -> Result<RawFd, &'static str> {
     let (af, sa_len) = match address.family() {
-        InetAddressFamily::IPv4 => (libc::AF_INET, std::mem::size_of::<libc::sockaddr_in>().as_()),
-        InetAddressFamily::IPv6 => (libc::AF_INET6, std::mem::size_of::<libc::sockaddr_in6>().as_()),
+        InetAddress::AF_INET => (libc::AF_INET, std::mem::size_of::<libc::sockaddr_in>().as_()),
+        InetAddress::AF_INET6 => (libc::AF_INET6, std::mem::size_of::<libc::sockaddr_in6>().as_()),
         _ => {
             return Err("unrecognized address family");
         }
@@ -128,7 +284,7 @@ unsafe fn bind_udp_to_device(device_name: &str, address: &InetAddress) -> Result<RawFd, &'static str> {
     #[cfg(not(target_os = "linux"))]
     {
         fl = 0;
-        libc::setsockopt(s, libc::IPPROTO_IP.as_(), libc::IP_DF.as_(), (&mut fl as *mut c_int).cast(), std::mem::size_of::<c_int>().as_());
+        libc::setsockopt(s, libc::IPPROTO_IP.as_(), libc::IP_DONTFRAG.as_(), (&mut fl as *mut c_int).cast(), std::mem::size_of::<c_int>().as_());
     }
     #[cfg(target_os = "linux")]
     {
@@ -143,14 +299,14 @@ unsafe fn bind_udp_to_device(device_name: &str, address: &InetAddress) -> Result<RawFd, &'static str> {
     }
 
     fl = 1048576;
-    while fl >= 131072 {
+    while fl >= 65536 {
         if libc::setsockopt(s, libc::SOL_SOCKET.as_(), libc::SO_RCVBUF.as_(), (&mut fl as *mut c_int).cast(), std::mem::size_of::<c_int>().as_()) == 0 {
             break;
         }
         fl -= 65536;
     }
     fl = 1048576;
-    while fl >= 131072 {
+    while fl >= 65536 {
         if libc::setsockopt(s, libc::SOL_SOCKET.as_(), libc::SO_SNDBUF.as_(), (&mut fl as *mut c_int).cast(), std::mem::size_of::<c_int>().as_()) == 0 {
             break;
         }