From c3ce40b5ba784f590999a4b10e2a1d064ded8bd3 Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Wed, 8 Jun 2022 19:05:54 -0400 Subject: [PATCH] Wire through RootSet init, a bunch more work, almost ready to test! --- zerotier-core-crypto/Cargo.toml | 4 +- zerotier-network-hypervisor/Cargo.toml | 4 +- zerotier-network-hypervisor/src/event.rs | 6 +- .../src/networkhypervisor.rs | 41 ++++++++- .../src/util/buffer.rs | 87 ++++++++++++++----- zerotier-network-hypervisor/src/util/gate.rs | 10 +-- zerotier-network-hypervisor/src/util/mod.rs | 15 ++-- .../src/vl1/identity.rs | 5 +- zerotier-network-hypervisor/src/vl1/mod.rs | 2 +- zerotier-network-hypervisor/src/vl1/node.rs | 74 ++++++++-------- zerotier-network-hypervisor/src/vl1/path.rs | 4 +- zerotier-network-hypervisor/src/vl1/peer.rs | 8 +- .../src/vl1/protocol.rs | 16 ++++ zerotier-network-hypervisor/src/vl2/mod.rs | 5 +- zerotier-system-service/src/cli/rootset.rs | 6 +- zerotier-system-service/src/localconfig.rs | 12 +-- zerotier-system-service/src/localinterface.rs | 13 ++- zerotier-system-service/src/main.rs | 12 +-- zerotier-system-service/src/service.rs | 74 +++++++++------- zerotier-system-service/src/udp.rs | 8 +- zerotier-system-service/src/utils.rs | 2 + 21 files changed, 261 insertions(+), 147 deletions(-) diff --git a/zerotier-core-crypto/Cargo.toml b/zerotier-core-crypto/Cargo.toml index f513bed50..d95ab9b62 100644 --- a/zerotier-core-crypto/Cargo.toml +++ b/zerotier-core-crypto/Cargo.toml @@ -14,8 +14,8 @@ panic = 'abort' [dependencies] rand_core = "0.5.0" aes-gmac-siv = { path = "../aes-gmac-siv" } -x25519-dalek = { version = "1.2.0", features = ["u64_backend"] } -ed25519-dalek = { version = "1.0.1", features = ["u64_backend"] } +x25519-dalek = { version = "1.2.0", features = ["std", "u64_backend"], default-features = false } +ed25519-dalek = { version = "1.0.1", features = ["std", "u64_backend"], default-features = false } heapless = "0.7.7" subtle = "2.4.1" openssl = { version = "^0", features = [], default-features = false } diff --git a/zerotier-network-hypervisor/Cargo.toml b/zerotier-network-hypervisor/Cargo.toml index f610987e3..cdc0032ff 100644 --- a/zerotier-network-hypervisor/Cargo.toml +++ b/zerotier-network-hypervisor/Cargo.toml @@ -12,8 +12,8 @@ codegen-units = 1 panic = 'abort' [features] -default = ["zt_trace"] -zt_trace = [] +default = ["debug_events"] +debug_events = [] [dependencies] zerotier-core-crypto = { path = "../zerotier-core-crypto" } diff --git a/zerotier-network-hypervisor/src/event.rs b/zerotier-network-hypervisor/src/event.rs index a1c6873e9..9dd50217f 100644 --- a/zerotier-network-hypervisor/src/event.rs +++ b/zerotier-network-hypervisor/src/event.rs @@ -7,8 +7,8 @@ pub enum Event { // Change in node online status. Online(bool), - // Tracing: source file, line number, message (if enabled in build). - Trace(&'static str, u32, String), + // Debug event: source file, line number, message (only if feature 'debug_events' is enabled). + Debug(&'static str, u32, String), // An anomalous event has been encountered that could indicate a possible security problem. SecurityWarning(String), @@ -30,7 +30,7 @@ impl ToString for Event { fn to_string(&self) -> String { match self { Event::Online(online) => format!("[vl1] online == {}", online), - Event::Trace(l, f, m) => format!("[trace] {}:{} {}", l, f, m), + Event::Debug(l, f, m) => format!("[debug] {}:{} {}", l, f, m), Event::SecurityWarning(w) => format!("[global] security warning: {}", w), Event::FatalError(e) => format!("[global] FATAL: {}", e), Event::IdentityAutoGenerated(id) => format!("[vl1] identity auto-generated: {}", id.to_string()), diff --git a/zerotier-network-hypervisor/src/networkhypervisor.rs b/zerotier-network-hypervisor/src/networkhypervisor.rs index c9fbf6360..4758ce8a9 100644 --- a/zerotier-network-hypervisor/src/networkhypervisor.rs +++ b/zerotier-network-hypervisor/src/networkhypervisor.rs @@ -3,9 +3,12 @@ use std::time::Duration; use crate::error::InvalidParameterError; +use crate::util::buffer::Buffer; +use crate::util::marshalable::Marshalable; +use crate::vl1::node::*; use crate::vl1::protocol::PooledPacketBuffer; -use crate::vl1::{Address, Endpoint, Identity, Node, RootSet, SystemInterface}; -use crate::vl2::{Switch, SwitchInterface}; +use crate::vl1::*; +use crate::vl2::switch::*; pub trait Interface: SystemInterface + SwitchInterface {} @@ -37,6 +40,10 @@ impl NetworkHypervisor { &self.vl1.identity } + /// Run background tasks and return desired delay until next call in milliseconds. + /// + /// This shouldn't be called concurrently by more than one loop. Doing so would be harmless + /// but would be a waste of compute cycles. #[inline(always)] pub fn do_background_tasks(&self, ii: &I) -> Duration { self.vl1.do_background_tasks(ii) @@ -47,8 +54,38 @@ impl NetworkHypervisor { self.vl1.handle_incoming_physical_packet(ii, &self.vl2, source_endpoint, source_local_socket, source_local_interface, data) } + /// Add or update a root set. + /// + /// If no root set exists by this name, a new root set is added. If one already + /// exists it's checked against the new one and updated if the new set is valid + /// and should supersede it. + /// + /// Changes will take effect within a few seconds when root sets are next + /// examined and synchronized with peer and root list state. + /// + /// This returns true if the new root set was accepted and false otherwise. #[inline(always)] pub fn add_update_root_set(&self, rs: RootSet) -> bool { self.vl1.add_update_root_set(rs) } + + /// Add or update the compiled-in default ZeroTier RootSet. + /// + /// This is equivalent to unmarshaling default-rootset/root.zerotier.com.bin and then + /// calling add_update_root_set(). + pub fn add_update_default_root_set(&self) -> bool { + let mut buf: Buffer<4096> = Buffer::new(); + buf.set_to(include_bytes!("../default-rootset/root.zerotier.com.bin")); + let mut cursor = 0; + self.add_update_root_set(RootSet::unmarshal(&buf, &mut cursor).unwrap()) + } + + /// Call add_update_default_root_set if there are no roots defined, otherwise do nothing and return false. + pub fn add_update_default_root_set_if_none(&self) -> bool { + if self.vl1.has_roots_defined() { + false + } else { + self.add_update_default_root_set() + } + } } diff --git a/zerotier-network-hypervisor/src/util/buffer.rs b/zerotier-network-hypervisor/src/util/buffer.rs index 306966363..3a84a6f94 100644 --- a/zerotier-network-hypervisor/src/util/buffer.rs +++ b/zerotier-network-hypervisor/src/util/buffer.rs @@ -5,7 +5,17 @@ use std::mem::{size_of, MaybeUninit}; use crate::util::pool::PoolFactory; -/// A safe bounds checked I/O buffer with extensions for convenient appending of RawObject types. +/// An I/O buffer with extensions for efficiently reading and writing various objects. +/// +/// WARNING: Structures can only be handled through raw read/write here if they are +/// tagged a Copy, meaning they are safe to just copy as raw memory. Care must also +/// be taken to ensure that access to them is safe on architectures that do not support +/// unaligned access. In vl1/protocol.rs this is accomplished by only using byte arrays +/// (including for integers) and accessing via things like u64::from_be_bytes() etc. +/// +/// Needless to say anything with non-Copy internal members or that depends on Drop to +/// not leak resources or other higher level semantics won't work here, but Rust should +/// not let you tag that as Copy in safe code. #[derive(Clone, Copy, PartialEq, Eq)] pub struct Buffer(usize, [u8; L]); @@ -25,28 +35,26 @@ impl Buffer { #[cfg(any(target_arch = "x86", target_arch = "x86_64", target_arch = "aarch64", target_arch = "powerpc64"))] #[inline(always)] - fn read_obj_internal(&self, i: usize) -> T { - unsafe { *self.1.as_ptr().add(i).cast() } + unsafe fn read_obj_internal(&self, i: usize) -> T { + *self.1.as_ptr().add(i).cast() } #[cfg(not(any(target_arch = "x86", target_arch = "x86_64", target_arch = "aarch64", target_arch = "powerpc64")))] #[inline(always)] - fn read_obj_internal(&self, i: usize) -> T { - unsafe { std::mem::transmute_copy(&*self.1.as_ptr().add(i).cast::()) } + unsafe fn read_obj_internal(&self, i: usize) -> T { + std::mem::transmute_copy(&*self.1.as_ptr().add(i).cast::()) } #[cfg(any(target_arch = "x86", target_arch = "x86_64", target_arch = "aarch64", target_arch = "powerpc64"))] #[inline(always)] - fn write_obj_internal(&mut self, i: usize, o: T) { - unsafe { *self.1.as_mut_ptr().add(i).cast::() = o }; + unsafe fn write_obj_internal(&mut self, i: usize, o: T) { + *self.1.as_mut_ptr().add(i).cast::() = o; } #[cfg(not(any(target_arch = "x86", target_arch = "x86_64", target_arch = "aarch64", target_arch = "powerpc64")))] #[inline(always)] - fn write_obj_internal(&mut self, i: usize, o: T) { - unsafe { - std::ptr::copy_nonoverlapping((&o as *const T).cast::(), self.1.as_mut_ptr().add(i), size_of::()); - } + unsafe fn write_obj_internal(&mut self, i: usize, o: T) { + std::ptr::copy_nonoverlapping((&o as *const T).cast::(), self.1.as_mut_ptr().add(i), size_of::()) } /// Create an empty zeroed buffer. @@ -94,12 +102,6 @@ impl Buffer { &mut self.1[0..self.0] } - /// Get a mutable reference to the entire buffer regardless of the current 'size'. - #[inline(always)] - pub unsafe fn entire_buffer_mut(&mut self) -> &mut [u8; L] { - &mut self.1 - } - #[inline(always)] pub fn as_ptr(&self) -> *const u8 { self.1.as_ptr() @@ -156,6 +158,12 @@ impl Buffer { } } + /// Get a mutable reference to the entire buffer regardless of the current 'size'. + #[inline(always)] + pub unsafe fn entire_buffer_mut(&mut self) -> &mut [u8; L] { + &mut self.1 + } + /// Set the size of the data in this buffer without checking bounds or zeroing new space. #[inline(always)] pub unsafe fn set_size_unchecked(&mut self, s: usize) { @@ -269,7 +277,7 @@ impl Buffer { let end = ptr + 2; if end <= L { self.0 = end; - self.write_obj_internal(ptr, i.to_be()); + unsafe { self.write_obj_internal(ptr, i.to_be()) }; Ok(()) } else { Err(overflow_err()) @@ -282,7 +290,7 @@ impl Buffer { let end = ptr + 4; if end <= L { self.0 = end; - self.write_obj_internal(ptr, i.to_be()); + unsafe { self.write_obj_internal(ptr, i.to_be()) }; Ok(()) } else { Err(overflow_err()) @@ -295,7 +303,7 @@ impl Buffer { let end = ptr + 8; if end <= L { self.0 = end; - self.write_obj_internal(ptr, i.to_be()); + unsafe { self.write_obj_internal(ptr, i.to_be()) }; Ok(()) } else { Err(overflow_err()) @@ -347,6 +355,39 @@ impl Buffer { } } + #[inline(always)] + pub fn u16_at(&self, ptr: usize) -> std::io::Result { + let end = ptr + 2; + debug_assert!(end <= L); + if end <= self.0 { + Ok(u64::from_be(unsafe { self.read_obj_internal(ptr) })) + } else { + Err(overflow_err()) + } + } + + #[inline(always)] + pub fn u32_at(&self, ptr: usize) -> std::io::Result { + let end = ptr + 4; + debug_assert!(end <= L); + if end <= self.0 { + Ok(u64::from_be(unsafe { self.read_obj_internal(ptr) })) + } else { + Err(overflow_err()) + } + } + + #[inline(always)] + pub fn u64_at(&self, ptr: usize) -> std::io::Result { + let end = ptr + 8; + debug_assert!(end <= L); + if end <= self.0 { + Ok(u64::from_be(unsafe { self.read_obj_internal(ptr) })) + } else { + Err(overflow_err()) + } + } + #[inline(always)] pub fn read_struct(&self, cursor: &mut usize) -> std::io::Result<&T> { let ptr = *cursor; @@ -420,7 +461,7 @@ impl Buffer { debug_assert!(end <= L); if end <= self.0 { *cursor = end; - Ok(u16::from_be(self.read_obj_internal(ptr))) + Ok(u16::from_be(unsafe { self.read_obj_internal(ptr) })) } else { Err(overflow_err()) } @@ -433,7 +474,7 @@ impl Buffer { debug_assert!(end <= L); if end <= self.0 { *cursor = end; - Ok(u32::from_be(self.read_obj_internal(ptr))) + Ok(u32::from_be(unsafe { self.read_obj_internal(ptr) })) } else { Err(overflow_err()) } @@ -446,7 +487,7 @@ impl Buffer { debug_assert!(end <= L); if end <= self.0 { *cursor = end; - Ok(u64::from_be(self.read_obj_internal(ptr))) + Ok(u64::from_be(unsafe { self.read_obj_internal(ptr) })) } else { Err(overflow_err()) } diff --git a/zerotier-network-hypervisor/src/util/gate.rs b/zerotier-network-hypervisor/src/util/gate.rs index 3cc09c424..2f8cc9864 100644 --- a/zerotier-network-hypervisor/src/util/gate.rs +++ b/zerotier-network-hypervisor/src/util/gate.rs @@ -2,7 +2,7 @@ use std::sync::atomic::{AtomicI64, Ordering}; -/// Boolean rate limiter with normal (non-atomic, thread unsafe) semantics. +/// Boolean rate limiter with normal (non-atomic) semantics. #[repr(transparent)] pub struct IntervalGate(i64); @@ -30,9 +30,7 @@ impl IntervalGate { } } -unsafe impl Send for IntervalGate {} - -/// Boolean rate limiter with atomic (thread safe) semantics. +/// Boolean rate limiter with atomic semantics. #[repr(transparent)] pub struct AtomicIntervalGate(AtomicI64); @@ -60,7 +58,3 @@ impl AtomicIntervalGate { } } } - -unsafe impl Send for AtomicIntervalGate {} - -unsafe impl Sync for AtomicIntervalGate {} diff --git a/zerotier-network-hypervisor/src/util/mod.rs b/zerotier-network-hypervisor/src/util/mod.rs index 4e0584a71..afea5641f 100644 --- a/zerotier-network-hypervisor/src/util/mod.rs +++ b/zerotier-network-hypervisor/src/util/mod.rs @@ -10,19 +10,22 @@ pub use zerotier_core_crypto::varint; pub(crate) const ZEROES: [u8; 64] = [0_u8; 64]; -#[cfg(target_feature = "zt_trace")] -macro_rules! zt_trace { +#[cfg(target_feature = "debug_events")] +#[allow(unused_macros)] +macro_rules! debug_event { ($si:expr, $fmt:expr $(, $($arg:tt)*)?) => { - $si.event(crate::Event::Trace(file!(), line!(), format!($fmt, $($($arg)*)?))); + $si.event(crate::Event::Debug(file!(), line!(), format!($fmt, $($($arg)*)?))); } } -#[cfg(not(target_feature = "zt_trace"))] -macro_rules! zt_trace { +#[cfg(not(target_feature = "debug_events"))] +#[allow(unused_macros)] +macro_rules! debug_event { ($si:expr, $fmt:expr $(, $($arg:tt)*)?) => {}; } -pub(crate) use zt_trace; +#[allow(unused_imports)] +pub(crate) use debug_event; /// Obtain a reference to a sub-array within an existing byte array. #[inline(always)] diff --git a/zerotier-network-hypervisor/src/vl1/identity.rs b/zerotier-network-hypervisor/src/vl1/identity.rs index 45785daa9..6b71cfdb1 100644 --- a/zerotier-network-hypervisor/src/vl1/identity.rs +++ b/zerotier-network-hypervisor/src/vl1/identity.rs @@ -5,7 +5,6 @@ use std::cmp::Ordering; use std::convert::TryInto; use std::hash::{Hash, Hasher}; use std::io::Write; -use std::mem::MaybeUninit; use std::ptr::{slice_from_raw_parts, slice_from_raw_parts_mut}; use std::str::FromStr; @@ -74,7 +73,7 @@ pub struct Identity { #[inline(always)] fn concat_arrays_2(a: &[u8; A], b: &[u8; B]) -> [u8; S] { assert_eq!(A + B, S); - let mut tmp: [u8; S] = unsafe { MaybeUninit::uninit().assume_init() }; + let mut tmp = [0_u8; S]; tmp[..A].copy_from_slice(a); tmp[A..].copy_from_slice(b); tmp @@ -83,7 +82,7 @@ fn concat_arrays_2(a: &[u8; A], #[inline(always)] fn concat_arrays_4(a: &[u8; A], b: &[u8; B], c: &[u8; C], d: &[u8; D]) -> [u8; S] { assert_eq!(A + B + C + D, S); - let mut tmp: [u8; S] = unsafe { MaybeUninit::uninit().assume_init() }; + let mut tmp = [0_u8; S]; tmp[..A].copy_from_slice(a); tmp[A..(A + B)].copy_from_slice(b); tmp[(A + B)..(A + B + C)].copy_from_slice(c); diff --git a/zerotier-network-hypervisor/src/vl1/mod.rs b/zerotier-network-hypervisor/src/vl1/mod.rs index df6251777..63c5a3ddc 100644 --- a/zerotier-network-hypervisor/src/vl1/mod.rs +++ b/zerotier-network-hypervisor/src/vl1/mod.rs @@ -23,7 +23,7 @@ pub use endpoint::Endpoint; pub use identity::*; pub use inetaddress::{InetAddress, IpScope}; pub use mac::MAC; -pub use node::{InnerProtocolInterface, Node, SystemInterface}; +pub use node::SystemInterface; pub use path::Path; pub use peer::Peer; pub use rootset::{Root, RootSet}; diff --git a/zerotier-network-hypervisor/src/vl1/node.rs b/zerotier-network-hypervisor/src/vl1/node.rs index 70f717c1b..c08b53347 100644 --- a/zerotier-network-hypervisor/src/vl1/node.rs +++ b/zerotier-network-hypervisor/src/vl1/node.rs @@ -10,8 +10,8 @@ use dashmap::DashMap; use parking_lot::{Mutex, RwLock}; use crate::error::InvalidParameterError; +use crate::util::debug_event; use crate::util::gate::IntervalGate; -use crate::util::zt_trace; use crate::vl1::path::Path; use crate::vl1::peer::Peer; use crate::vl1::protocol::*; @@ -195,32 +195,27 @@ impl Node { }) } - /// Get a packet buffer that will automatically check itself back into the pool on drop. #[inline(always)] pub fn get_packet_buffer(&self) -> PooledPacketBuffer { self.buffer_pool.get() } - /// Get a peer by address. pub fn peer(&self, a: Address) -> Option>> { self.peers.get(&a).map(|peer| peer.value().clone()) } - /// Run background tasks and return desired delay until next call in milliseconds. - /// - /// This shouldn't be called concurrently by more than one loop. Doing so would be harmless - /// but would be a waste of compute cycles. pub fn do_background_tasks(&self, si: &SI) -> Duration { let mut intervals = self.intervals.lock(); let tt = si.time_ticks(); + assert!(ROOT_SYNC_INTERVAL_MS <= (ROOT_HELLO_INTERVAL / 2)); if intervals.root_sync.gate(tt) { match &mut (*self.roots.lock()) { RootInfo { roots, sets, sets_modified } => { - // Sychronize root info with root sets info if the latter has changed. + // Update internal data structures if the root set configuration has changed. if *sets_modified { *sets_modified = false; - zt_trace!(si, "root sets modified, rescanning..."); + debug_event!(si, "root sets modified, synchronizing internal data structures"); let mut old_root_identities: Vec = roots.drain().map(|r| r.0.identity.clone()).collect(); let mut new_root_identities = Vec::new(); @@ -279,11 +274,10 @@ impl Node { // they have, which is a behavior that differs from normal peers. This allows roots to // e.g. see our IPv4 and our IPv6 address which can be important for us to learn our // external addresses from them. - assert!(ROOT_SYNC_INTERVAL_MS <= (ROOT_HELLO_INTERVAL / 2)); if intervals.root_hello.gate(tt) { for (root, endpoints) in roots.iter() { for ep in endpoints.iter() { - zt_trace!(si, "sending HELLO to root {}", root.identity.address.to_string()); + debug_event!(si, "sending HELLO to root {} (root interval: {})", root.identity.address.to_string(), ROOT_HELLO_INTERVAL); root.send_hello(si, self, Some(ep)); } } @@ -300,6 +294,7 @@ impl Node { let _ = best.insert(r); } } + debug_event!(si, "new best root: {}", best.clone().map_or("none".into(), |p| p.identity.address.to_string())); *(self.best_root.write()) = best.cloned(); } } @@ -338,23 +333,31 @@ impl Node { Duration::from_millis((ROOT_SYNC_INTERVAL_MS.min(crate::vl1::whoisqueue::SERVICE_INTERVAL_MS).min(crate::vl1::path::SERVICE_INTERVAL_MS).min(crate::vl1::peer::SERVICE_INTERVAL_MS) as u64) / 2) } - /// Called when a packet is received on the physical wire. pub fn handle_incoming_physical_packet(&self, si: &SI, ph: &PH, source_endpoint: &Endpoint, source_local_socket: &SI::LocalSocket, source_local_interface: &SI::LocalInterface, mut data: PooledPacketBuffer) { - zt_trace!(si, "<< incoming packet from {} length {} via socket {}@{}", source_endpoint.to_string(), data.len(), source_local_socket.to_string(), source_local_interface.to_string()); + debug_event!( + si, + "<< #{} ->{} from {} length {} via socket {}@{}", + data.bytes_fixed_at::<8>(0).map_or("????????????????".into(), |pid| zerotier_core_crypto::hex::to_string(pid)), + data.bytes_fixed_at::<5>(8).map_or("??????????".into(), |dest| zerotier_core_crypto::hex::to_string(dest)), + source_endpoint.to_string(), + data.len(), + source_local_socket.to_string(), + source_local_interface.to_string() + ); if let Ok(fragment_header) = data.struct_mut_at::(0) { if let Some(dest) = Address::from_bytes_fixed(&fragment_header.dest) { let time_ticks = si.time_ticks(); if dest == self.identity.address { - // Handle packets addressed to this node. - let path = self.canonical_path(source_endpoint, source_local_socket, source_local_interface, time_ticks); path.log_receive_anything(time_ticks); if fragment_header.is_fragment() { - if let Some(assembled_packet) = path.receive_fragment(u64::from_ne_bytes(fragment_header.id), fragment_header.fragment_no(), fragment_header.total_fragments(), data, time_ticks) { + debug_event!(si, "-- #{:0>16x} fragment {} of {} received", u64::from_be_bytes(fragment_header.id), fragment_header.fragment_no(), fragment_header.total_fragments()); + + if let Some(assembled_packet) = path.receive_fragment(fragment_header.packet_id(), fragment_header.fragment_no(), fragment_header.total_fragments(), data, time_ticks) { if let Some(frag0) = assembled_packet.frags[0].as_ref() { - zt_trace!(si, "fragmented packet fully assembled!"); + debug_event!(si, "-- #{:0>16x} packet fully assembled!", u64::from_be_bytes(fragment_header.id)); let packet_header = frag0.struct_at::(0); if packet_header.is_ok() { @@ -371,7 +374,7 @@ impl Node { } } else { if let Ok(packet_header) = data.struct_at::(0) { - zt_trace!(si, "parsing unfragmented packet"); + debug_event!(si, "-- #{:0>16x} is unfragmented", u64::from_be_bytes(fragment_header.id)); if let Some(source) = Address::from_bytes(&packet_header.src) { if let Some(peer) = self.peer(source) { @@ -383,16 +386,17 @@ impl Node { } } } else { - // Forward packets not destined for this node. - // TODO: SHOULD we forward? Need a way to check. - if fragment_header.is_fragment() { + debug_event!(si, "-- #{:0>16x} forwarding packet fragment to {}", u64::from_be_bytes(fragment_header.id), dest.to_string()); if fragment_header.increment_hops() > FORWARD_MAX_HOPS { + debug_event!(si, "-- #{:0>16x} discarded: max hops exceeded!", u64::from_be_bytes(fragment_header.id)); return; } } else { if let Ok(packet_header) = data.struct_mut_at::(0) { + debug_event!(si, "-- #{:0>16x} forwarding packet to {}", u64::from_be_bytes(fragment_header.id), dest.to_string()); if packet_header.increment_hops() > FORWARD_MAX_HOPS { + debug_event!(si, "-- #{:0>16x} discarded: max hops exceeded!", u64::from_be_bytes(fragment_header.id)); return; } } else { @@ -401,33 +405,23 @@ impl Node { } if let Some(peer) = self.peer(dest) { + // TODO: SHOULD we forward? Need a way to check. peer.forward(si, time_ticks, data.as_ref()); + debug_event!(si, "-- #{:0>16x} forwarded successfully", u64::from_be_bytes(fragment_header.id)); } } } } } - /// Get the current best root peer that we should use for WHOIS, relaying, etc. pub fn root(&self) -> Option>> { self.best_root.read().clone() } - /// Return true if a peer is a root. pub fn is_peer_root(&self, peer: &Peer) -> bool { self.roots.lock().roots.contains_key(peer) } - /// Add or update a root set. - /// - /// If no root set exists by this name, a new root set is added. If one already - /// exists it's checked against the new one and updated if the new set is valid - /// and should supersede it. - /// - /// Changes will take effect within a few seconds when root sets are next - /// examined and synchronized with peer and root list state. - /// - /// This returns true if the new root set was accepted and false otherwise. pub fn add_update_root_set(&self, rs: RootSet) -> bool { let mut roots = self.roots.lock(); if let Some(entry) = roots.sets.get_mut(&rs.name) { @@ -444,10 +438,14 @@ impl Node { return false; } - /// Get the canonical Path object for a given endpoint and local socket information. - /// - /// This is a canonicalizing function that returns a unique path object for every tuple - /// of endpoint, local socket, and local interface. + pub fn has_roots_defined(&self) -> bool { + self.roots.lock().sets.iter().any(|rs| !rs.1.members.is_empty()) + } + + pub fn root_sets(&self) -> Vec { + self.roots.lock().sets.values().cloned().collect() + } + pub fn canonical_path(&self, ep: &Endpoint, local_socket: &SI::LocalSocket, local_interface: &SI::LocalInterface, time_ticks: i64) -> Arc> { // It's faster to do a read only lookup first since most of the time this will succeed. The second // version below this only gets invoked if it's a new path. @@ -460,7 +458,7 @@ impl Node { return self .paths .entry(ep.clone()) - .or_insert_with(|| parking_lot::RwLock::new(HashMap::with_capacity(2))) + .or_insert_with(|| parking_lot::RwLock::new(HashMap::with_capacity(4))) .value_mut() .write() .entry(local_socket.clone()) diff --git a/zerotier-network-hypervisor/src/vl1/path.rs b/zerotier-network-hypervisor/src/vl1/path.rs index 096574128..70fc78761 100644 --- a/zerotier-network-hypervisor/src/vl1/path.rs +++ b/zerotier-network-hypervisor/src/vl1/path.rs @@ -25,7 +25,7 @@ pub struct Path { last_send_time_ticks: AtomicI64, last_receive_time_ticks: AtomicI64, create_time_ticks: i64, - fragmented_packets: Mutex>, + fragmented_packets: Mutex>, } impl Path { @@ -43,7 +43,7 @@ impl Path { /// Receive a fragment and return a FragmentedPacket if the entire packet was assembled. /// This returns None if more fragments are needed to assemble the packet. - pub(crate) fn receive_fragment(&self, packet_id: u64, fragment_no: u8, fragment_expecting_count: u8, packet: PooledPacketBuffer, time_ticks: i64) -> Option { + pub(crate) fn receive_fragment(&self, packet_id: PacketId, fragment_no: u8, fragment_expecting_count: u8, packet: PooledPacketBuffer, time_ticks: i64) -> Option { let mut fp = self.fragmented_packets.lock(); // Discard some old waiting packets if the total incoming fragments for a path exceeds a diff --git a/zerotier-network-hypervisor/src/vl1/peer.rs b/zerotier-network-hypervisor/src/vl1/peer.rs index 8728f5647..9c2fd605d 100644 --- a/zerotier-network-hypervisor/src/vl1/peer.rs +++ b/zerotier-network-hypervisor/src/vl1/peer.rs @@ -79,7 +79,7 @@ fn salsa_poly_create(secret: &SymmetricSecret, header: &PacketHeader, packet_siz } /// Attempt AEAD packet encryption and MAC validation. Returns message ID on success. -fn try_aead_decrypt(secret: &SymmetricSecret, packet_frag0_payload_bytes: &[u8], header: &PacketHeader, fragments: &[Option], payload: &mut PacketBuffer) -> Option { +fn try_aead_decrypt(secret: &SymmetricSecret, packet_frag0_payload_bytes: &[u8], header: &PacketHeader, fragments: &[Option], payload: &mut PacketBuffer) -> Option { packet_frag0_payload_bytes.get(0).map_or(None, |verb| { match header.cipher() { security_constants::CIPHER_NOCRYPT_POLY1305 => { @@ -173,6 +173,9 @@ impl Peer { * still want to avoid it. If the clock is at least marginally correct this will mean that message IDs * will remain unique for over a hundred years. Message IDs are kept secret as well because they are * encrypted along with a GMAC code to form an opaque 128-bit packet tag. + * + * Keep in mind that we re-key (when talking to new nodes) so not only are duplicate message IDs not + * particularly dangerous in SIV but they'd have to occur while using the same key. */ Self { identity: id, @@ -192,8 +195,7 @@ impl Peer { /// Get the next message ID for sending a message to this peer. #[inline(always)] - pub(crate) fn next_message_id(&self) -> u64 { - // SECURITY NOTE: uses the strictest memory ordering to avoid duplicate IDs on loose architectures like ARM64. + pub(crate) fn next_message_id(&self) -> MessageId { self.message_id_counter.fetch_add(1, Ordering::SeqCst) } diff --git a/zerotier-network-hypervisor/src/vl1/protocol.rs b/zerotier-network-hypervisor/src/vl1/protocol.rs index 9a7431b47..0f37c840f 100644 --- a/zerotier-network-hypervisor/src/vl1/protocol.rs +++ b/zerotier-network-hypervisor/src/vl1/protocol.rs @@ -58,6 +58,12 @@ pub type PooledPacketBuffer = crate::util::pool::Pooled; +/// 64-bit packet (outer) ID. +pub type PacketId = u64; + +/// 64-bit message ID (obtained after AEAD decryption). +pub type MessageId = u64; + pub mod verbs { pub const VL1_NOP: u8 = 0x00; pub const VL1_HELLO: u8 = 0x01; @@ -289,6 +295,11 @@ pub struct PacketHeader { } impl PacketHeader { + #[inline(always)] + pub fn packet_id(&self) -> PacketId { + u64::from_ne_bytes(self.id) + } + #[inline(always)] pub fn cipher(&self) -> u8 { self.flags_cipher_hops & packet_constants::FLAGS_FIELD_MASK_CIPHER @@ -352,6 +363,11 @@ pub struct FragmentHeader { } impl FragmentHeader { + #[inline(always)] + pub fn packet_id(&self) -> PacketId { + u64::from_ne_bytes(self.id) + } + #[inline(always)] pub fn is_fragment(&self) -> bool { self.fragment_indicator == packet_constants::FRAGMENT_INDICATOR diff --git a/zerotier-network-hypervisor/src/vl2/mod.rs b/zerotier-network-hypervisor/src/vl2/mod.rs index 6a4f1f248..c858d64da 100644 --- a/zerotier-network-hypervisor/src/vl2/mod.rs +++ b/zerotier-network-hypervisor/src/vl2/mod.rs @@ -2,8 +2,9 @@ mod multicastgroup; mod networkid; -mod switch; + +pub(crate) mod switch; pub use multicastgroup::MulticastGroup; pub use networkid::NetworkId; -pub use switch::{Switch, SwitchInterface}; +pub use switch::SwitchInterface; diff --git a/zerotier-system-service/src/cli/rootset.rs b/zerotier-system-service/src/cli/rootset.rs index 8d0af9d11..7da5281c4 100644 --- a/zerotier-system-service/src/cli/rootset.rs +++ b/zerotier-system-service/src/cli/rootset.rs @@ -11,9 +11,9 @@ use zerotier_network_hypervisor::vl1::RootSet; pub async fn cmd(_: Flags, cmd_args: &ArgMatches) -> i32 { match cmd_args.subcommand() { - Some(("trust", sc_args)) => todo!(), + Some(("add", sc_args)) => todo!(), - Some(("untrust", sc_args)) => todo!(), + Some(("remove", sc_args)) => todo!(), Some(("list", _)) => todo!(), @@ -106,7 +106,7 @@ pub async fn cmd(_: Flags, cmd_args: &ArgMatches) -> i32 { } } - Some(("default", _)) => { + Some(("restoredefault", _)) => { let _ = std::io::stdout().write_all(crate::utils::to_json_pretty(&RootSet::zerotier_default()).as_bytes()); } diff --git a/zerotier-system-service/src/localconfig.rs b/zerotier-system-service/src/localconfig.rs index 7a09ec0f9..34e8b6e6d 100644 --- a/zerotier-system-service/src/localconfig.rs +++ b/zerotier-system-service/src/localconfig.rs @@ -4,7 +4,7 @@ use std::collections::BTreeMap; use serde::{Deserialize, Serialize}; -use zerotier_network_hypervisor::vl1::{Address, InetAddress}; +use zerotier_network_hypervisor::vl1::{Address, Endpoint, InetAddress}; use zerotier_network_hypervisor::vl2::NetworkId; /// A list of unassigned or obsolete ports under 1024 that could possibly be squatted. @@ -34,12 +34,12 @@ impl Default for PhysicalPathSettings { #[derive(Serialize, Deserialize, Clone, PartialEq, Eq)] #[serde(default)] -pub struct VirtualNetworkSettings { +pub struct VirtualPathSettings { #[serde(rename = "try")] - pub try_: Vec, + pub try_: Vec, } -impl Default for VirtualNetworkSettings { +impl Default for VirtualPathSettings { fn default() -> Self { Self { try_: Vec::new() } } @@ -132,9 +132,9 @@ impl GlobalSettings { #[derive(Serialize, Deserialize, Clone, PartialEq, Eq)] #[serde(default)] pub struct Config { - pub physical: BTreeMap, + pub physical: BTreeMap, #[serde(rename = "virtual")] - pub virtual_: BTreeMap, + pub virtual_: BTreeMap, pub network: BTreeMap, pub settings: GlobalSettings, } diff --git a/zerotier-system-service/src/localinterface.rs b/zerotier-system-service/src/localinterface.rs index fedd0d23a..249e56521 100644 --- a/zerotier-system-service/src/localinterface.rs +++ b/zerotier-system-service/src/localinterface.rs @@ -20,17 +20,17 @@ impl LocalInterface { let l = nb.len(); assert!(l <= 16); // do any *nix OSes have device names longer than 16 bytes? tmp[..l].copy_from_slice(&nb[..l]); - Self(u128::from_le_bytes(tmp)) + Self(u128::from_be_bytes(tmp)) } } impl ToString for LocalInterface { #[cfg(unix)] fn to_string(&self) -> String { - let b = self.0.to_le_bytes(); + let b = self.0.to_be_bytes(); let mut l = 0; - for _ in 0..16 { - if b[l] > 0 { + for bb in b.iter() { + if *bb > 0 { l += 1; } else { break; @@ -38,4 +38,9 @@ impl ToString for LocalInterface { } String::from_utf8_lossy(&b[..l]).to_string() } + + #[cfg(windows)] + fn to_string(&self) -> String { + zerotier_core_crypto::hex::to_string(&self.0.to_be_bytes()) + } } diff --git a/zerotier-system-service/src/main.rs b/zerotier-system-service/src/main.rs index bd4626de5..bf0a9065e 100644 --- a/zerotier-system-service/src/main.rs +++ b/zerotier-system-service/src/main.rs @@ -79,13 +79,13 @@ Advanced Operations: verify <@file> Verify a signature rootset [args] -· trust <@root set> Add or update a root set -· untrust Stop using a root set +· add <@root set> Add or update a root set +· remove Stop using a root set · list List root sets in use sign Sign a root set with an identity verify Load and verify a root set marshal Dump root set as binary to stdout - default Dump the default root set as JSON + restoredefault (Re-)add built-in default root set service Start local service (usually not invoked manually) @@ -196,13 +196,13 @@ fn main() { ) .subcommand( Command::new("rootset") - .subcommand(Command::new("trust").arg(Arg::new("path").index(1).required(true))) - .subcommand(Command::new("untrust").arg(Arg::new("name").index(1).required(true))) + .subcommand(Command::new("add").arg(Arg::new("path").index(1).required(true))) + .subcommand(Command::new("remove").arg(Arg::new("name").index(1).required(true))) .subcommand(Command::new("list")) .subcommand(Command::new("sign").arg(Arg::new("path").index(1).required(true)).arg(Arg::new("secret").index(2).required(true))) .subcommand(Command::new("verify").arg(Arg::new("path").index(1).required(true))) .subcommand(Command::new("marshal").arg(Arg::new("path").index(1).required(true))) - .subcommand(Command::new("default")), + .subcommand(Command::new("restoredefault")), ) .override_help(help.as_str()) .override_usage("") diff --git a/zerotier-system-service/src/service.rs b/zerotier-system-service/src/service.rs index 030976753..77ffd156c 100644 --- a/zerotier-system-service/src/service.rs +++ b/zerotier-system-service/src/service.rs @@ -22,7 +22,7 @@ use crate::utils::{ms_monotonic, ms_since_epoch}; const UDP_UPDATE_BINDINGS_INTERVAL_MS: Duration = Duration::from_millis(2500); -/// ZeroTier system service, which presents virtual networks as VPN connections. +/// ZeroTier system service, which presents virtual networks as VPN connections on Windows/macOS/Linux/BSD/etc. pub struct Service { udp_binding_task: tokio::task::JoinHandle<()>, core_background_service_task: tokio::task::JoinHandle<()>, @@ -45,10 +45,6 @@ impl Drop for Service { self.udp_binding_task.abort(); self.core_background_service_task.abort(); - // Wait for all tasks to actually stop. - let _ = self.udp_binding_task.await; - let _ = self.core_background_service_task.await; - // Drop all bound sockets since these can hold circular Arc<> references to 'internal'. self.internal.udp_sockets.write().await.clear(); }); @@ -68,10 +64,11 @@ impl Service { let _ = si._core.insert(NetworkHypervisor::new(&si, true, auto_upgrade_identity)?); let si = Arc::new(si); - let (si1, si2) = (si.clone(), si.clone()); + si._core.as_ref().unwrap().add_update_default_root_set_if_none(); + Ok(Self { - udp_binding_task: si.rt.spawn(si1.udp_binding_task_main()), - core_background_service_task: si.rt.spawn(si2.core_background_service_task_main()), + udp_binding_task: si.rt.spawn(si.clone().udp_binding_task_main()), + core_background_service_task: si.rt.spawn(si.clone().core_background_service_task_main()), internal: si, }) } @@ -80,11 +77,11 @@ impl Service { impl ServiceImpl { #[inline(always)] fn core(&self) -> &NetworkHypervisor { - debug_assert!(self._core.is_some()); - unsafe { self._core.as_ref().unwrap_unchecked() } + self._core.as_ref().unwrap() } - async fn update_bindings_for_port(self: &Arc, port: u16, interface_prefix_blacklist: &Vec, cidr_blacklist: &Vec) -> Option> { + /// Called in udp_binding_task_main() to service a particular UDP port. + async fn update_udp_bindings_for_port(self: &Arc, port: u16, interface_prefix_blacklist: &Vec, cidr_blacklist: &Vec) -> Option> { let mut udp_sockets = self.udp_sockets.write().await; let bp = udp_sockets.entry(port).or_insert_with(|| BoundUdpPort::new(port)); let (errors, new_sockets) = bp.update_bindings(interface_prefix_blacklist, cidr_blacklist); @@ -94,17 +91,20 @@ impl ServiceImpl { drop(udp_sockets); // release lock for ns in new_sockets.iter() { - // We start a task for each CPU core. Tokio multiplexes but since each packet takes a bit of CPU - // to parse, decrypt, etc. we want to be able to saturate the CPU for any given socket to virtual - // network path. The alternative would be to use MPMC channels but that would almost certainly be - // a lot slower as it would involve more sync/atomic bottlenecks and probably extra malloc/free. - let mut kill_on_drop = ns.kill_on_drop.lock(); + /* + * Start a task (not actual thread) for each CPU core. + * + * The async runtime is itself multithreaded but each packet takes a little bit of CPU to handle. + * This makes sure that when one packet is in processing the async runtime is immediately able to + * cue up another receiver for this socket. + */ + let mut socket_associated_tasks = ns.socket_associated_tasks.lock(); for _ in 0..self.num_listeners_per_socket { let self2 = self.clone(); let socket = ns.socket.clone(); let interface = ns.interface.clone(); let local_socket = LocalSocket(Arc::downgrade(ns), self.local_socket_unique_id_counter.fetch_add(1, Ordering::SeqCst)); - kill_on_drop.push(self.rt.spawn(async move { + socket_associated_tasks.push(self.rt.spawn(async move { let core = self2.core(); loop { let mut buf = core.get_packet_buffer(); @@ -122,11 +122,12 @@ impl ServiceImpl { return None; } + /// Background task to update per-interface/per-port bindings if system interface configuration changes. async fn udp_binding_task_main(self: Arc) { loop { let config = self.data.config().await; - if let Some(errors) = self.update_bindings_for_port(config.settings.primary_port, &config.settings.interface_prefix_blacklist, &config.settings.cidr_blacklist).await { + if let Some(errors) = self.update_udp_bindings_for_port(config.settings.primary_port, &config.settings.interface_prefix_blacklist, &config.settings.cidr_blacklist).await { for e in errors.iter() { println!("BIND ERROR: {} {} {}", e.0.to_string(), e.1.to_string(), e.2.to_string()); } @@ -137,6 +138,7 @@ impl ServiceImpl { } } + /// Periodically calls do_background_tasks() in the ZeroTier core. async fn core_background_service_task_main(self: Arc) { tokio::time::sleep(Duration::from_secs(1)).await; loop { @@ -149,16 +151,16 @@ impl SystemInterface for ServiceImpl { type LocalSocket = crate::service::LocalSocket; type LocalInterface = crate::localinterface::LocalInterface; - fn event_node_is_up(&self) {} + fn event(&self, event: Event) { + println!("{}", event.to_string()); + match event { + _ => {} + } + } - fn event_node_is_down(&self) {} - - fn event_online_status_change(&self, online: bool) {} - - fn event_user_message(&self, source: &Identity, message_type: u64, message: &[u8]) {} - - fn event_security_warning(&self, warning: &str) {} + fn user_message(&self, _source: &Identity, _message_type: u64, _message: &[u8]) {} + #[inline(always)] fn local_socket_is_valid(&self, socket: &Self::LocalSocket) -> bool { socket.0.strong_count() > 0 } @@ -223,12 +225,26 @@ impl SystemInterface for ServiceImpl { return false; } - fn check_path(&self, id: &Identity, endpoint: &Endpoint, local_socket: Option<&Self::LocalSocket>, local_interface: Option<&Self::LocalInterface>) -> bool { - true + fn check_path(&self, _id: &Identity, endpoint: &Endpoint, _local_socket: Option<&Self::LocalSocket>, _local_interface: Option<&Self::LocalInterface>) -> bool { + self.rt.block_on(async { + let config = self.data.config().await; + if let Some(pps) = config.physical.get(endpoint) { + !pps.blacklist + } else { + true + } + }) } fn get_path_hints(&self, id: &Identity) -> Option, Option)>> { - None + self.rt.block_on(async { + let config = self.data.config().await; + if let Some(vns) = config.virtual_.get(&id.address) { + Some(vns.try_.iter().map(|ep| (ep.clone(), None, None)).collect()) + } else { + None + } + }) } #[inline(always)] diff --git a/zerotier-system-service/src/udp.rs b/zerotier-system-service/src/udp.rs index d766c6bf2..587eb99cc 100644 --- a/zerotier-system-service/src/udp.rs +++ b/zerotier-system-service/src/udp.rs @@ -43,14 +43,14 @@ pub struct BoundUdpSocket { pub interface: LocalInterface, /// Add tasks here that should be aborted when this socket is closed. - pub kill_on_drop: parking_lot::Mutex>>, + pub socket_associated_tasks: parking_lot::Mutex>>, fd: RawFd, } impl Drop for BoundUdpSocket { fn drop(&mut self) { - for t in self.kill_on_drop.lock().drain(..) { + for t in self.socket_associated_tasks.lock().drain(..) { t.abort(); } } @@ -64,7 +64,7 @@ impl BoundUdpSocket { unsafe { libc::setsockopt(self.fd.as_(), libc::IPPROTO_IP.as_(), libc::IP_TOS.as_(), (&ttl as *const c_int).cast(), std::mem::size_of::().as_()) }; } - #[cfg(any(target_os = "macos", target_os = "freebsd"))] + #[cfg(any(target_os = "macos"))] pub fn send_sync_nonblock(&self, _: &tokio::runtime::Handle, dest: &InetAddress, b: &[&[u8]], packet_ttl: u8) -> bool { let mut ok = false; if dest.family() == self.address.family() { @@ -217,7 +217,7 @@ impl BoundUdpPort { let s = Arc::new(BoundUdpSocket { address: addr_with_port, socket: Arc::new(s.unwrap()), - kill_on_drop: parking_lot::Mutex::new(Vec::new()), + socket_associated_tasks: parking_lot::Mutex::new(Vec::new()), interface: interface.clone(), fd, }); diff --git a/zerotier-system-service/src/utils.rs b/zerotier-system-service/src/utils.rs index a7bb60b62..b63794f04 100644 --- a/zerotier-system-service/src/utils.rs +++ b/zerotier-system-service/src/utils.rs @@ -24,11 +24,13 @@ lazy_static! { } /// Get milliseconds since unix epoch. +#[inline(always)] pub fn ms_since_epoch() -> i64 { SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as i64 } /// Get milliseconds since an arbitrary time in the past, guaranteed to monotonically increase. +#[inline(always)] pub fn ms_monotonic() -> i64 { Instant::now().duration_since(*STARTUP_INSTANT).as_millis() as i64 }