Wire through RootSet init, a bunch more work, almost ready to test!

Adam Ierymenko 2022-06-08 19:05:54 -04:00
parent ded7c25786
commit c3ce40b5ba
No known key found for this signature in database
GPG key ID: C8877CF2D7A5D7F3
21 changed files with 261 additions and 147 deletions

View file

@@ -14,8 +14,8 @@ panic = 'abort'
 [dependencies]
 rand_core = "0.5.0"
 aes-gmac-siv = { path = "../aes-gmac-siv" }
-x25519-dalek = { version = "1.2.0", features = ["u64_backend"] }
-ed25519-dalek = { version = "1.0.1", features = ["u64_backend"] }
+x25519-dalek = { version = "1.2.0", features = ["std", "u64_backend"], default-features = false }
+ed25519-dalek = { version = "1.0.1", features = ["std", "u64_backend"], default-features = false }
 heapless = "0.7.7"
 subtle = "2.4.1"
 openssl = { version = "^0", features = [], default-features = false }

View file

@@ -12,8 +12,8 @@ codegen-units = 1
 panic = 'abort'

 [features]
-default = ["zt_trace"]
-zt_trace = []
+default = ["debug_events"]
+debug_events = []

 [dependencies]
 zerotier-core-crypto = { path = "../zerotier-core-crypto" }

View file

@@ -7,8 +7,8 @@ pub enum Event {
     // Change in node online status.
     Online(bool),

-    // Tracing: source file, line number, message (if enabled in build).
-    Trace(&'static str, u32, String),
+    // Debug event: source file, line number, message (only if feature 'debug_events' is enabled).
+    Debug(&'static str, u32, String),

     // An anomalous event has been encountered that could indicate a possible security problem.
     SecurityWarning(String),
@@ -30,7 +30,7 @@ impl ToString for Event {
     fn to_string(&self) -> String {
         match self {
             Event::Online(online) => format!("[vl1] online == {}", online),
-            Event::Trace(l, f, m) => format!("[trace] {}:{} {}", l, f, m),
+            Event::Debug(l, f, m) => format!("[debug] {}:{} {}", l, f, m),
             Event::SecurityWarning(w) => format!("[global] security warning: {}", w),
             Event::FatalError(e) => format!("[global] FATAL: {}", e),
             Event::IdentityAutoGenerated(id) => format!("[vl1] identity auto-generated: {}", id.to_string()),

View file

@@ -3,9 +3,12 @@
 use std::time::Duration;

 use crate::error::InvalidParameterError;
+use crate::util::buffer::Buffer;
+use crate::util::marshalable::Marshalable;
+use crate::vl1::node::*;
 use crate::vl1::protocol::PooledPacketBuffer;
-use crate::vl1::{Address, Endpoint, Identity, Node, RootSet, SystemInterface};
-use crate::vl2::{Switch, SwitchInterface};
+use crate::vl1::*;
+use crate::vl2::switch::*;

 pub trait Interface: SystemInterface + SwitchInterface {}
@@ -37,6 +40,10 @@ impl<I: Interface> NetworkHypervisor<I> {
         &self.vl1.identity
     }

+    /// Run background tasks and return desired delay until next call in milliseconds.
+    ///
+    /// This shouldn't be called concurrently by more than one loop. Doing so would be harmless
+    /// but would be a waste of compute cycles.
     #[inline(always)]
     pub fn do_background_tasks(&self, ii: &I) -> Duration {
         self.vl1.do_background_tasks(ii)
@@ -47,8 +54,38 @@ impl<I: Interface> NetworkHypervisor<I> {
         self.vl1.handle_incoming_physical_packet(ii, &self.vl2, source_endpoint, source_local_socket, source_local_interface, data)
     }

+    /// Add or update a root set.
+    ///
+    /// If no root set exists by this name, a new root set is added. If one already
+    /// exists it's checked against the new one and updated if the new set is valid
+    /// and should supersede it.
+    ///
+    /// Changes will take effect within a few seconds when root sets are next
+    /// examined and synchronized with peer and root list state.
+    ///
+    /// This returns true if the new root set was accepted and false otherwise.
     #[inline(always)]
     pub fn add_update_root_set(&self, rs: RootSet) -> bool {
         self.vl1.add_update_root_set(rs)
     }
+
+    /// Add or update the compiled-in default ZeroTier RootSet.
+    ///
+    /// This is equivalent to unmarshaling default-rootset/root.zerotier.com.bin and then
+    /// calling add_update_root_set().
+    pub fn add_update_default_root_set(&self) -> bool {
+        let mut buf: Buffer<4096> = Buffer::new();
+        buf.set_to(include_bytes!("../default-rootset/root.zerotier.com.bin"));
+        let mut cursor = 0;
+        self.add_update_root_set(RootSet::unmarshal(&buf, &mut cursor).unwrap())
+    }
+
+    /// Call add_update_default_root_set if there are no roots defined, otherwise do nothing and return false.
+    pub fn add_update_default_root_set_if_none(&self) -> bool {
+        if self.vl1.has_roots_defined() {
+            false
+        } else {
+            self.add_update_default_root_set()
+        }
+    }
 }
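For context, the new bootstrap path ends up being used like this (a minimal sketch; the surrounding service wiring is hypothetical, with the NetworkHypervisor::new arguments taken from the service code later in this commit):

    // Bring up a hypervisor and fall back to the compiled-in default root set
    // when no roots are configured yet.
    let nh = NetworkHypervisor::new(&system_interface, true, auto_upgrade_identity)?;
    if nh.add_update_default_root_set_if_none() {
        // root.zerotier.com.bin was unmarshaled and accepted as a new root set.
    }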

View file

@@ -5,7 +5,17 @@ use std::mem::{size_of, MaybeUninit};

 use crate::util::pool::PoolFactory;

-/// A safe bounds checked I/O buffer with extensions for convenient appending of RawObject types.
+/// An I/O buffer with extensions for efficiently reading and writing various objects.
+///
+/// WARNING: Structures can only be handled through raw read/write here if they are
+/// tagged as Copy, meaning they are safe to just copy as raw memory. Care must also
+/// be taken to ensure that access to them is safe on architectures that do not support
+/// unaligned access. In vl1/protocol.rs this is accomplished by only using byte arrays
+/// (including for integers) and accessing via things like u64::from_be_bytes() etc.
+///
+/// Needless to say anything with non-Copy internal members or that depends on Drop to
+/// not leak resources or other higher level semantics won't work here, but Rust should
+/// not let you tag that as Copy in safe code.
 #[derive(Clone, Copy, PartialEq, Eq)]
 pub struct Buffer<const L: usize>(usize, [u8; L]);
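A sketch of the access pattern the new warning describes, assuming the read_struct() accessor shown further down and a hypothetical header type: structures are plain Copy byte arrays, and multi-byte integers are decoded explicitly so unaligned loads never happen:

    #[derive(Clone, Copy)]
    #[repr(C, packed)]
    struct ExampleHeader {
        id: [u8; 8],   // raw bytes rather than u64, so alignment never matters
        dest: [u8; 5],
    }

    let mut cursor = 0;
    let h: &ExampleHeader = buf.read_struct(&mut cursor)?; // 'buf' is any Buffer<L>
    let id = u64::from_be_bytes(h.id);                     // explicit, alignment-safe decode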
@@ -25,28 +35,26 @@ impl<const L: usize> Buffer<L> {
     #[cfg(any(target_arch = "x86", target_arch = "x86_64", target_arch = "aarch64", target_arch = "powerpc64"))]
     #[inline(always)]
-    fn read_obj_internal<T: Sized + Copy>(&self, i: usize) -> T {
-        unsafe { *self.1.as_ptr().add(i).cast() }
+    unsafe fn read_obj_internal<T: Sized + Copy>(&self, i: usize) -> T {
+        *self.1.as_ptr().add(i).cast()
     }

     #[cfg(not(any(target_arch = "x86", target_arch = "x86_64", target_arch = "aarch64", target_arch = "powerpc64")))]
     #[inline(always)]
-    fn read_obj_internal<T: Sized + Copy>(&self, i: usize) -> T {
-        unsafe { std::mem::transmute_copy(&*self.1.as_ptr().add(i).cast::<T>()) }
+    unsafe fn read_obj_internal<T: Sized + Copy>(&self, i: usize) -> T {
+        std::mem::transmute_copy(&*self.1.as_ptr().add(i).cast::<T>())
     }

     #[cfg(any(target_arch = "x86", target_arch = "x86_64", target_arch = "aarch64", target_arch = "powerpc64"))]
     #[inline(always)]
-    fn write_obj_internal<T: Sized + Copy>(&mut self, i: usize, o: T) {
-        unsafe { *self.1.as_mut_ptr().add(i).cast::<T>() = o };
+    unsafe fn write_obj_internal<T: Sized + Copy>(&mut self, i: usize, o: T) {
+        *self.1.as_mut_ptr().add(i).cast::<T>() = o;
     }

     #[cfg(not(any(target_arch = "x86", target_arch = "x86_64", target_arch = "aarch64", target_arch = "powerpc64")))]
     #[inline(always)]
-    fn write_obj_internal<T: Sized + Copy>(&mut self, i: usize, o: T) {
-        unsafe {
-            std::ptr::copy_nonoverlapping((&o as *const T).cast::<u8>(), self.1.as_mut_ptr().add(i), size_of::<T>())
-        }
+    unsafe fn write_obj_internal<T: Sized + Copy>(&mut self, i: usize, o: T) {
+        std::ptr::copy_nonoverlapping((&o as *const T).cast::<u8>(), self.1.as_mut_ptr().add(i), size_of::<T>());
     }

     /// Create an empty zeroed buffer.
@@ -94,12 +102,6 @@ impl<const L: usize> Buffer<L> {
         &mut self.1[0..self.0]
     }

-    /// Get a mutable reference to the entire buffer regardless of the current 'size'.
-    #[inline(always)]
-    pub unsafe fn entire_buffer_mut(&mut self) -> &mut [u8; L] {
-        &mut self.1
-    }
-
     #[inline(always)]
     pub fn as_ptr(&self) -> *const u8 {
         self.1.as_ptr()
@@ -156,6 +158,12 @@ impl<const L: usize> Buffer<L> {
         }
     }

+    /// Get a mutable reference to the entire buffer regardless of the current 'size'.
+    #[inline(always)]
+    pub unsafe fn entire_buffer_mut(&mut self) -> &mut [u8; L] {
+        &mut self.1
+    }
+
     /// Set the size of the data in this buffer without checking bounds or zeroing new space.
     #[inline(always)]
     pub unsafe fn set_size_unchecked(&mut self, s: usize) {
@@ -269,7 +277,7 @@ impl<const L: usize> Buffer<L> {
         let end = ptr + 2;
         if end <= L {
             self.0 = end;
-            self.write_obj_internal(ptr, i.to_be());
+            unsafe { self.write_obj_internal(ptr, i.to_be()) };
             Ok(())
         } else {
             Err(overflow_err())
@@ -282,7 +290,7 @@ impl<const L: usize> Buffer<L> {
         let end = ptr + 4;
         if end <= L {
             self.0 = end;
-            self.write_obj_internal(ptr, i.to_be());
+            unsafe { self.write_obj_internal(ptr, i.to_be()) };
             Ok(())
         } else {
             Err(overflow_err())
@@ -295,7 +303,7 @@ impl<const L: usize> Buffer<L> {
         let end = ptr + 8;
         if end <= L {
             self.0 = end;
-            self.write_obj_internal(ptr, i.to_be());
+            unsafe { self.write_obj_internal(ptr, i.to_be()) };
             Ok(())
         } else {
             Err(overflow_err())
@@ -347,6 +355,39 @@ impl<const L: usize> Buffer<L> {
         }
     }

+    #[inline(always)]
+    pub fn u16_at(&self, ptr: usize) -> std::io::Result<u16> {
+        let end = ptr + 2;
+        debug_assert!(end <= L);
+        if end <= self.0 {
+            Ok(u16::from_be(unsafe { self.read_obj_internal(ptr) }))
+        } else {
+            Err(overflow_err())
+        }
+    }
+
+    #[inline(always)]
+    pub fn u32_at(&self, ptr: usize) -> std::io::Result<u32> {
+        let end = ptr + 4;
+        debug_assert!(end <= L);
+        if end <= self.0 {
+            Ok(u32::from_be(unsafe { self.read_obj_internal(ptr) }))
+        } else {
+            Err(overflow_err())
+        }
+    }
+
+    #[inline(always)]
+    pub fn u64_at(&self, ptr: usize) -> std::io::Result<u64> {
+        let end = ptr + 8;
+        debug_assert!(end <= L);
+        if end <= self.0 {
+            Ok(u64::from_be(unsafe { self.read_obj_internal(ptr) }))
+        } else {
+            Err(overflow_err())
+        }
+    }
+
     #[inline(always)]
     pub fn read_struct<T: Copy>(&self, cursor: &mut usize) -> std::io::Result<&T> {
         let ptr = *cursor;
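A small usage sketch for the new positional readers (that set_to() accepts an arbitrary byte slice is assumed from its use with include_bytes!() elsewhere in this commit):

    let mut buf: Buffer<64> = Buffer::new();
    buf.set_to(&[0x12, 0x34, 0xde, 0xad, 0xbe, 0xef, 0xca, 0xfe, 0xba, 0xbe]);
    assert_eq!(buf.u16_at(0).unwrap(), 0x1234);             // big-endian, bounds checked
    assert_eq!(buf.u64_at(2).unwrap(), 0xdeadbeefcafebabe);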
@@ -420,7 +461,7 @@ impl<const L: usize> Buffer<L> {
         debug_assert!(end <= L);
         if end <= self.0 {
             *cursor = end;
-            Ok(u16::from_be(self.read_obj_internal(ptr)))
+            Ok(u16::from_be(unsafe { self.read_obj_internal(ptr) }))
         } else {
             Err(overflow_err())
         }
@@ -433,7 +474,7 @@ impl<const L: usize> Buffer<L> {
         debug_assert!(end <= L);
         if end <= self.0 {
             *cursor = end;
-            Ok(u32::from_be(self.read_obj_internal(ptr)))
+            Ok(u32::from_be(unsafe { self.read_obj_internal(ptr) }))
         } else {
             Err(overflow_err())
         }
@@ -446,7 +487,7 @@ impl<const L: usize> Buffer<L> {
         debug_assert!(end <= L);
         if end <= self.0 {
             *cursor = end;
-            Ok(u64::from_be(self.read_obj_internal(ptr)))
+            Ok(u64::from_be(unsafe { self.read_obj_internal(ptr) }))
         } else {
             Err(overflow_err())
         }

View file

@@ -2,7 +2,7 @@

 use std::sync::atomic::{AtomicI64, Ordering};

-/// Boolean rate limiter with normal (non-atomic, thread unsafe) semantics.
+/// Boolean rate limiter with normal (non-atomic) semantics.
 #[repr(transparent)]
 pub struct IntervalGate<const FREQ: i64>(i64);
@@ -30,9 +30,7 @@ impl<const FREQ: i64> IntervalGate<FREQ> {
     }
 }

-unsafe impl<const FREQ: i64> Send for IntervalGate<FREQ> {}
-
-/// Boolean rate limiter with atomic (thread safe) semantics.
+/// Boolean rate limiter with atomic semantics.
 #[repr(transparent)]
 pub struct AtomicIntervalGate<const FREQ: i64>(AtomicI64);
@@ -60,7 +58,3 @@ impl<const FREQ: i64> AtomicIntervalGate<FREQ> {
     }
 }
-
-unsafe impl<const FREQ: i64> Send for AtomicIntervalGate<FREQ> {}
-
-unsafe impl<const FREQ: i64> Sync for AtomicIntervalGate<FREQ> {}
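These gates are consumed as plain booleans elsewhere in this commit (e.g. intervals.root_sync.gate(tt) in node.rs). A minimal sketch of the calling pattern, assuming a new(initial_time) constructor:

    // Returns true at most once per FREQ milliseconds (here 1000).
    let mut sync_gate: IntervalGate<1000> = IntervalGate::new(0); // `new` assumed
    for now_ms in [0_i64, 500, 1200, 1900, 2300] {
        if sync_gate.gate(now_ms) {
            println!("running rate limited work at {} ms", now_ms);
        }
    }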

View file

@@ -10,19 +10,22 @@ pub use zerotier_core_crypto::varint;

 pub(crate) const ZEROES: [u8; 64] = [0_u8; 64];

-#[cfg(target_feature = "zt_trace")]
-macro_rules! zt_trace {
+#[cfg(feature = "debug_events")]
+#[allow(unused_macros)]
+macro_rules! debug_event {
     ($si:expr, $fmt:expr $(, $($arg:tt)*)?) => {
-        $si.event(crate::Event::Trace(file!(), line!(), format!($fmt, $($($arg)*)?)));
+        $si.event(crate::Event::Debug(file!(), line!(), format!($fmt, $($($arg)*)?)));
     }
 }

-#[cfg(not(target_feature = "zt_trace"))]
-macro_rules! zt_trace {
+#[cfg(not(feature = "debug_events"))]
+#[allow(unused_macros)]
+macro_rules! debug_event {
     ($si:expr, $fmt:expr $(, $($arg:tt)*)?) => {};
 }

-pub(crate) use zt_trace;
+#[allow(unused_imports)]
+pub(crate) use debug_event;

 /// Obtain a reference to a sub-array within an existing byte array.
 #[inline(always)]
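As wired up in node.rs later in this diff, call sites pass the SystemInterface handle followed by an ordinary format string:

    debug_event!(si, "sending HELLO to root {}", root.identity.address.to_string());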

View file

@@ -5,7 +5,6 @@ use std::cmp::Ordering;
 use std::convert::TryInto;
 use std::hash::{Hash, Hasher};
 use std::io::Write;
-use std::mem::MaybeUninit;
 use std::ptr::{slice_from_raw_parts, slice_from_raw_parts_mut};
 use std::str::FromStr;
@@ -74,7 +73,7 @@ pub struct Identity {
 #[inline(always)]
 fn concat_arrays_2<const A: usize, const B: usize, const S: usize>(a: &[u8; A], b: &[u8; B]) -> [u8; S] {
     assert_eq!(A + B, S);
-    let mut tmp: [u8; S] = unsafe { MaybeUninit::uninit().assume_init() };
+    let mut tmp = [0_u8; S];
     tmp[..A].copy_from_slice(a);
     tmp[A..].copy_from_slice(b);
     tmp
@@ -83,7 +82,7 @@ fn concat_arrays_2<const A: usize, const B: usize, const S: usize>(a: &[u8; A],
 #[inline(always)]
 fn concat_arrays_4<const A: usize, const B: usize, const C: usize, const D: usize, const S: usize>(a: &[u8; A], b: &[u8; B], c: &[u8; C], d: &[u8; D]) -> [u8; S] {
     assert_eq!(A + B + C + D, S);
-    let mut tmp: [u8; S] = unsafe { MaybeUninit::uninit().assume_init() };
+    let mut tmp = [0_u8; S];
     tmp[..A].copy_from_slice(a);
     tmp[A..(A + B)].copy_from_slice(b);
     tmp[(A + B)..(A + B + C)].copy_from_slice(c);

View file

@@ -23,7 +23,7 @@ pub use endpoint::Endpoint;
 pub use identity::*;
 pub use inetaddress::{InetAddress, IpScope};
 pub use mac::MAC;
-pub use node::{InnerProtocolInterface, Node, SystemInterface};
+pub use node::SystemInterface;
 pub use path::Path;
 pub use peer::Peer;
 pub use rootset::{Root, RootSet};

View file

@@ -10,8 +10,8 @@ use dashmap::DashMap;
 use parking_lot::{Mutex, RwLock};

 use crate::error::InvalidParameterError;
+use crate::util::debug_event;
 use crate::util::gate::IntervalGate;
-use crate::util::zt_trace;
 use crate::vl1::path::Path;
 use crate::vl1::peer::Peer;
 use crate::vl1::protocol::*;
@@ -195,32 +195,27 @@ impl<SI: SystemInterface> Node<SI> {
         })
     }

-    /// Get a packet buffer that will automatically check itself back into the pool on drop.
     #[inline(always)]
     pub fn get_packet_buffer(&self) -> PooledPacketBuffer {
         self.buffer_pool.get()
     }

-    /// Get a peer by address.
     pub fn peer(&self, a: Address) -> Option<Arc<Peer<SI>>> {
         self.peers.get(&a).map(|peer| peer.value().clone())
     }

-    /// Run background tasks and return desired delay until next call in milliseconds.
-    ///
-    /// This shouldn't be called concurrently by more than one loop. Doing so would be harmless
-    /// but would be a waste of compute cycles.
     pub fn do_background_tasks(&self, si: &SI) -> Duration {
         let mut intervals = self.intervals.lock();
         let tt = si.time_ticks();

+        assert!(ROOT_SYNC_INTERVAL_MS <= (ROOT_HELLO_INTERVAL / 2));
         if intervals.root_sync.gate(tt) {
             match &mut (*self.roots.lock()) {
                 RootInfo { roots, sets, sets_modified } => {
-                    // Sychronize root info with root sets info if the latter has changed.
+                    // Update internal data structures if the root set configuration has changed.
                     if *sets_modified {
                         *sets_modified = false;
-                        zt_trace!(si, "root sets modified, rescanning...");
+                        debug_event!(si, "root sets modified, synchronizing internal data structures");

                         let mut old_root_identities: Vec<Identity> = roots.drain().map(|r| r.0.identity.clone()).collect();
                         let mut new_root_identities = Vec::new();
@@ -279,11 +274,10 @@ impl<SI: SystemInterface> Node<SI> {
                 // they have, which is a behavior that differs from normal peers. This allows roots to
                 // e.g. see our IPv4 and our IPv6 address which can be important for us to learn our
                 // external addresses from them.
-                assert!(ROOT_SYNC_INTERVAL_MS <= (ROOT_HELLO_INTERVAL / 2));
                 if intervals.root_hello.gate(tt) {
                     for (root, endpoints) in roots.iter() {
                         for ep in endpoints.iter() {
-                            zt_trace!(si, "sending HELLO to root {}", root.identity.address.to_string());
+                            debug_event!(si, "sending HELLO to root {} (root interval: {})", root.identity.address.to_string(), ROOT_HELLO_INTERVAL);
                             root.send_hello(si, self, Some(ep));
                         }
                     }
@@ -300,6 +294,7 @@ impl<SI: SystemInterface> Node<SI> {
                         let _ = best.insert(r);
                     }
                 }
+                debug_event!(si, "new best root: {}", best.clone().map_or("none".into(), |p| p.identity.address.to_string()));
                 *(self.best_root.write()) = best.cloned();
             }
         }
@@ -338,23 +333,31 @@ impl<SI: SystemInterface> Node<SI> {
         Duration::from_millis((ROOT_SYNC_INTERVAL_MS.min(crate::vl1::whoisqueue::SERVICE_INTERVAL_MS).min(crate::vl1::path::SERVICE_INTERVAL_MS).min(crate::vl1::peer::SERVICE_INTERVAL_MS) as u64) / 2)
     }

+    /// Called when a packet is received on the physical wire.
     pub fn handle_incoming_physical_packet<PH: InnerProtocolInterface>(&self, si: &SI, ph: &PH, source_endpoint: &Endpoint, source_local_socket: &SI::LocalSocket, source_local_interface: &SI::LocalInterface, mut data: PooledPacketBuffer) {
-        zt_trace!(si, "<< incoming packet from {} length {} via socket {}@{}", source_endpoint.to_string(), data.len(), source_local_socket.to_string(), source_local_interface.to_string());
+        debug_event!(
+            si,
+            "<< #{} ->{} from {} length {} via socket {}@{}",
+            data.bytes_fixed_at::<8>(0).map_or("????????????????".into(), |pid| zerotier_core_crypto::hex::to_string(pid)),
+            data.bytes_fixed_at::<5>(8).map_or("??????????".into(), |dest| zerotier_core_crypto::hex::to_string(dest)),
+            source_endpoint.to_string(),
+            data.len(),
+            source_local_socket.to_string(),
+            source_local_interface.to_string()
+        );

         if let Ok(fragment_header) = data.struct_mut_at::<FragmentHeader>(0) {
             if let Some(dest) = Address::from_bytes_fixed(&fragment_header.dest) {
                 let time_ticks = si.time_ticks();
                 if dest == self.identity.address {
+                    // Handle packets addressed to this node.
                     let path = self.canonical_path(source_endpoint, source_local_socket, source_local_interface, time_ticks);
                     path.log_receive_anything(time_ticks);

                     if fragment_header.is_fragment() {
+                        debug_event!(si, "-- #{:0>16x} fragment {} of {} received", u64::from_be_bytes(fragment_header.id), fragment_header.fragment_no(), fragment_header.total_fragments());
-                        if let Some(assembled_packet) = path.receive_fragment(u64::from_ne_bytes(fragment_header.id), fragment_header.fragment_no(), fragment_header.total_fragments(), data, time_ticks) {
+                        if let Some(assembled_packet) = path.receive_fragment(fragment_header.packet_id(), fragment_header.fragment_no(), fragment_header.total_fragments(), data, time_ticks) {
                             if let Some(frag0) = assembled_packet.frags[0].as_ref() {
-                                zt_trace!(si, "fragmented packet fully assembled!");
+                                debug_event!(si, "-- #{:0>16x} packet fully assembled!", u64::from_be_bytes(fragment_header.id));

                                 let packet_header = frag0.struct_at::<PacketHeader>(0);
                                 if packet_header.is_ok() {
@@ -371,7 +374,7 @@ impl<SI: SystemInterface> Node<SI> {
                         }
                     } else {
                         if let Ok(packet_header) = data.struct_at::<PacketHeader>(0) {
-                            zt_trace!(si, "parsing unfragmented packet");
+                            debug_event!(si, "-- #{:0>16x} is unfragmented", u64::from_be_bytes(fragment_header.id));
                             if let Some(source) = Address::from_bytes(&packet_header.src) {
                                 if let Some(peer) = self.peer(source) {
@@ -383,16 +386,17 @@ impl<SI: SystemInterface> Node<SI> {
                        }
                    }
                } else {
+                    // Forward packets not destined for this node.
+                    // TODO: SHOULD we forward? Need a way to check.
                    if fragment_header.is_fragment() {
+                        debug_event!(si, "-- #{:0>16x} forwarding packet fragment to {}", u64::from_be_bytes(fragment_header.id), dest.to_string());
                        if fragment_header.increment_hops() > FORWARD_MAX_HOPS {
+                            debug_event!(si, "-- #{:0>16x} discarded: max hops exceeded!", u64::from_be_bytes(fragment_header.id));
                            return;
                        }
                    } else {
                        if let Ok(packet_header) = data.struct_mut_at::<PacketHeader>(0) {
+                            debug_event!(si, "-- #{:0>16x} forwarding packet to {}", u64::from_be_bytes(fragment_header.id), dest.to_string());
                            if packet_header.increment_hops() > FORWARD_MAX_HOPS {
+                                debug_event!(si, "-- #{:0>16x} discarded: max hops exceeded!", u64::from_be_bytes(fragment_header.id));
                                return;
                            }
                        } else {
@@ -401,33 +405,23 @@ impl<SI: SystemInterface> Node<SI> {
                    }

                    if let Some(peer) = self.peer(dest) {
-                        // TODO: SHOULD we forward? Need a way to check.
                        peer.forward(si, time_ticks, data.as_ref());
+                        debug_event!(si, "-- #{:0>16x} forwarded successfully", u64::from_be_bytes(fragment_header.id));
                    }
                }
            }
        }
    }

-    /// Get the current best root peer that we should use for WHOIS, relaying, etc.
    pub fn root(&self) -> Option<Arc<Peer<SI>>> {
        self.best_root.read().clone()
    }

-    /// Return true if a peer is a root.
    pub fn is_peer_root(&self, peer: &Peer<SI>) -> bool {
        self.roots.lock().roots.contains_key(peer)
    }

-    /// Add or update a root set.
-    ///
-    /// If no root set exists by this name, a new root set is added. If one already
-    /// exists it's checked against the new one and updated if the new set is valid
-    /// and should supersede it.
-    ///
-    /// Changes will take effect within a few seconds when root sets are next
-    /// examined and synchronized with peer and root list state.
-    ///
-    /// This returns true if the new root set was accepted and false otherwise.
    pub fn add_update_root_set(&self, rs: RootSet) -> bool {
        let mut roots = self.roots.lock();
        if let Some(entry) = roots.sets.get_mut(&rs.name) {
@@ -444,10 +438,14 @@ impl<SI: SystemInterface> Node<SI> {
         return false;
     }

-    /// Get the canonical Path object for a given endpoint and local socket information.
-    ///
-    /// This is a canonicalizing function that returns a unique path object for every tuple
-    /// of endpoint, local socket, and local interface.
+    pub fn has_roots_defined(&self) -> bool {
+        self.roots.lock().sets.iter().any(|rs| !rs.1.members.is_empty())
+    }
+
+    pub fn root_sets(&self) -> Vec<RootSet> {
+        self.roots.lock().sets.values().cloned().collect()
+    }
+
     pub fn canonical_path(&self, ep: &Endpoint, local_socket: &SI::LocalSocket, local_interface: &SI::LocalInterface, time_ticks: i64) -> Arc<Path<SI>> {
         // It's faster to do a read only lookup first since most of the time this will succeed. The second
         // version below this only gets invoked if it's a new path.
@@ -460,7 +458,7 @@ impl<SI: SystemInterface> Node<SI> {
         return self
             .paths
             .entry(ep.clone())
-            .or_insert_with(|| parking_lot::RwLock::new(HashMap::with_capacity(2)))
+            .or_insert_with(|| parking_lot::RwLock::new(HashMap::with_capacity(4)))
             .value_mut()
             .write()
             .entry(local_socket.clone())

View file

@@ -25,7 +25,7 @@ pub struct Path<SI: SystemInterface> {
     last_send_time_ticks: AtomicI64,
     last_receive_time_ticks: AtomicI64,
     create_time_ticks: i64,
-    fragmented_packets: Mutex<HashMap<u64, FragmentedPacket, PacketIdHasher>>,
+    fragmented_packets: Mutex<HashMap<PacketId, FragmentedPacket, PacketIdHasher>>,
 }

 impl<SI: SystemInterface> Path<SI> {
@@ -43,7 +43,7 @@ impl<SI: SystemInterface> Path<SI> {
     /// Receive a fragment and return a FragmentedPacket if the entire packet was assembled.
     /// This returns None if more fragments are needed to assemble the packet.
-    pub(crate) fn receive_fragment(&self, packet_id: u64, fragment_no: u8, fragment_expecting_count: u8, packet: PooledPacketBuffer, time_ticks: i64) -> Option<FragmentedPacket> {
+    pub(crate) fn receive_fragment(&self, packet_id: PacketId, fragment_no: u8, fragment_expecting_count: u8, packet: PooledPacketBuffer, time_ticks: i64) -> Option<FragmentedPacket> {
         let mut fp = self.fragmented_packets.lock();

         // Discard some old waiting packets if the total incoming fragments for a path exceeds a

View file

@@ -79,7 +79,7 @@ fn salsa_poly_create(secret: &SymmetricSecret, header: &PacketHeader, packet_siz
 }

 /// Attempt AEAD packet decryption and MAC validation. Returns message ID on success.
-fn try_aead_decrypt(secret: &SymmetricSecret, packet_frag0_payload_bytes: &[u8], header: &PacketHeader, fragments: &[Option<PooledPacketBuffer>], payload: &mut PacketBuffer) -> Option<u64> {
+fn try_aead_decrypt(secret: &SymmetricSecret, packet_frag0_payload_bytes: &[u8], header: &PacketHeader, fragments: &[Option<PooledPacketBuffer>], payload: &mut PacketBuffer) -> Option<MessageId> {
     packet_frag0_payload_bytes.get(0).map_or(None, |verb| {
         match header.cipher() {
             security_constants::CIPHER_NOCRYPT_POLY1305 => {
@@ -173,6 +173,9 @@ impl<SI: SystemInterface> Peer<SI> {
          * still want to avoid it. If the clock is at least marginally correct this will mean that message IDs
          * will remain unique for over a hundred years. Message IDs are kept secret as well because they are
          * encrypted along with a GMAC code to form an opaque 128-bit packet tag.
+         *
+         * Keep in mind that we re-key (when talking to new nodes) so not only are duplicate message IDs not
+         * particularly dangerous in SIV but they'd have to occur while using the same key.
          */
         Self {
             identity: id,
@@ -192,8 +195,7 @@ impl<SI: SystemInterface> Peer<SI> {

     /// Get the next message ID for sending a message to this peer.
     #[inline(always)]
-    pub(crate) fn next_message_id(&self) -> u64 {
-        // SECURITY NOTE: uses the strictest memory ordering to avoid duplicate IDs on loose architectures like ARM64.
+    pub(crate) fn next_message_id(&self) -> MessageId {
         self.message_id_counter.fetch_add(1, Ordering::SeqCst)
     }

View file

@@ -58,6 +58,12 @@ pub type PooledPacketBuffer = crate::util::pool::Pooled<PacketBuffer, PacketBuff

 /// Source for instances of PacketBuffer
 pub type PacketBufferPool = crate::util::pool::Pool<PacketBuffer, PacketBufferFactory>;

+/// 64-bit packet (outer) ID.
+pub type PacketId = u64;
+
+/// 64-bit message ID (obtained after AEAD decryption).
+pub type MessageId = u64;
+
 pub mod verbs {
     pub const VL1_NOP: u8 = 0x00;
     pub const VL1_HELLO: u8 = 0x01;
@@ -289,6 +295,11 @@ pub struct PacketHeader {
 }

 impl PacketHeader {
+    #[inline(always)]
+    pub fn packet_id(&self) -> PacketId {
+        u64::from_ne_bytes(self.id)
+    }
+
     #[inline(always)]
     pub fn cipher(&self) -> u8 {
         self.flags_cipher_hops & packet_constants::FLAGS_FIELD_MASK_CIPHER
@@ -352,6 +363,11 @@ pub struct FragmentHeader {
 }

 impl FragmentHeader {
+    #[inline(always)]
+    pub fn packet_id(&self) -> PacketId {
+        u64::from_ne_bytes(self.id)
+    }
+
     #[inline(always)]
     pub fn is_fragment(&self) -> bool {
         self.fragment_indicator == packet_constants::FRAGMENT_INDICATOR
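One observation on this diff (not a documented guarantee): packet_id() decodes the raw ID with from_ne_bytes because the value is only used as an opaque key, e.g. the fragmented_packets map keyed by PacketId in path.rs above, while the node.rs debug output formats the same bytes with from_be_bytes so the printed hex matches wire byte order:

    let id: [u8; 8] = [0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08];
    let key = u64::from_ne_bytes(id);   // opaque; numeric value depends on host endianness
    let shown = u64::from_be_bytes(id); // 0x0102030405060708 on every host, matches the wire
    assert_eq!(shown, 0x0102030405060708);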

View file

@@ -2,8 +2,9 @@

 mod multicastgroup;
 mod networkid;
-mod switch;
+pub(crate) mod switch;

 pub use multicastgroup::MulticastGroup;
 pub use networkid::NetworkId;
-pub use switch::{Switch, SwitchInterface};
+pub use switch::SwitchInterface;

View file

@@ -11,9 +11,9 @@ use zerotier_network_hypervisor::vl1::RootSet;

 pub async fn cmd(_: Flags, cmd_args: &ArgMatches) -> i32 {
     match cmd_args.subcommand() {
-        Some(("trust", sc_args)) => todo!(),
-        Some(("untrust", sc_args)) => todo!(),
+        Some(("add", sc_args)) => todo!(),
+        Some(("remove", sc_args)) => todo!(),
         Some(("list", _)) => todo!(),
@@ -106,7 +106,7 @@ pub async fn cmd(_: Flags, cmd_args: &ArgMatches) -> i32 {
             }
         }

-        Some(("default", _)) => {
+        Some(("restoredefault", _)) => {
             let _ = std::io::stdout().write_all(crate::utils::to_json_pretty(&RootSet::zerotier_default()).as_bytes());
         }

View file

@@ -4,7 +4,7 @@ use std::collections::BTreeMap;

 use serde::{Deserialize, Serialize};

-use zerotier_network_hypervisor::vl1::{Address, InetAddress};
+use zerotier_network_hypervisor::vl1::{Address, Endpoint, InetAddress};
 use zerotier_network_hypervisor::vl2::NetworkId;

 /// A list of unassigned or obsolete ports under 1024 that could possibly be squatted.
@@ -34,12 +34,12 @@ impl Default for PhysicalPathSettings {

 #[derive(Serialize, Deserialize, Clone, PartialEq, Eq)]
 #[serde(default)]
-pub struct VirtualNetworkSettings {
+pub struct VirtualPathSettings {
     #[serde(rename = "try")]
-    pub try_: Vec<InetAddress>,
+    pub try_: Vec<Endpoint>,
 }

-impl Default for VirtualNetworkSettings {
+impl Default for VirtualPathSettings {
     fn default() -> Self {
         Self { try_: Vec::new() }
     }
@@ -132,9 +132,9 @@ impl GlobalSettings {
 #[derive(Serialize, Deserialize, Clone, PartialEq, Eq)]
 #[serde(default)]
 pub struct Config {
-    pub physical: BTreeMap<InetAddress, PhysicalPathSettings>,
+    pub physical: BTreeMap<Endpoint, PhysicalPathSettings>,
     #[serde(rename = "virtual")]
-    pub virtual_: BTreeMap<Address, VirtualNetworkSettings>,
+    pub virtual_: BTreeMap<Address, VirtualPathSettings>,
     pub network: BTreeMap<NetworkId, NetworkSettings>,
     pub settings: GlobalSettings,
 }

View file

@@ -20,17 +20,17 @@ impl LocalInterface {
         let l = nb.len();
         assert!(l <= 16); // do any *nix OSes have device names longer than 16 bytes?
         tmp[..l].copy_from_slice(&nb[..l]);
-        Self(u128::from_le_bytes(tmp))
+        Self(u128::from_be_bytes(tmp))
     }
 }

 impl ToString for LocalInterface {
     #[cfg(unix)]
     fn to_string(&self) -> String {
-        let b = self.0.to_le_bytes();
+        let b = self.0.to_be_bytes();
         let mut l = 0;
-        for _ in 0..16 {
-            if b[l] > 0 {
+        for bb in b.iter() {
+            if *bb > 0 {
                 l += 1;
             } else {
                 break;
@@ -38,4 +38,9 @@ impl ToString for LocalInterface {
         }
         String::from_utf8_lossy(&b[..l]).to_string()
     }
+
+    #[cfg(windows)]
+    fn to_string(&self) -> String {
+        zerotier_core_crypto::hex::to_string(&self.0.to_be_bytes())
+    }
 }
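A quick sanity sketch of the little- to big-endian switch: the interface name now occupies the leading bytes of the u128, so to_be_bytes() recovers it in order:

    let mut tmp = [0_u8; 16];
    let name = b"eth0";
    tmp[..name.len()].copy_from_slice(name);
    let packed = u128::from_be_bytes(tmp);
    let b = packed.to_be_bytes();
    let len = b.iter().take_while(|&&bb| bb > 0).count();
    assert_eq!(String::from_utf8_lossy(&b[..len]), "eth0");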

View file

@@ -79,13 +79,13 @@ Advanced Operations:
     verify <?identity> <@file> <sig>       Verify a signature

   rootset <command> [args]
-    trust <@root set>                      Add or update a root set
-    untrust <root set name>                Stop using a root set
+    add <@root set>                        Add or update a root set
+    remove <root set name>                 Stop using a root set
     list                                   List root sets in use
     sign <path> <?identity secret>         Sign a root set with an identity
     verify <path>                          Load and verify a root set
     marshal <path>                         Dump root set as binary to stdout
-    default                                Dump the default root set as JSON
+    restoredefault                         (Re-)add built-in default root set

   service                                  Start local service
     (usually not invoked manually)
@@ -196,13 +196,13 @@ fn main() {
         )
         .subcommand(
             Command::new("rootset")
-                .subcommand(Command::new("trust").arg(Arg::new("path").index(1).required(true)))
-                .subcommand(Command::new("untrust").arg(Arg::new("name").index(1).required(true)))
+                .subcommand(Command::new("add").arg(Arg::new("path").index(1).required(true)))
+                .subcommand(Command::new("remove").arg(Arg::new("name").index(1).required(true)))
                 .subcommand(Command::new("list"))
                 .subcommand(Command::new("sign").arg(Arg::new("path").index(1).required(true)).arg(Arg::new("secret").index(2).required(true)))
                 .subcommand(Command::new("verify").arg(Arg::new("path").index(1).required(true)))
                 .subcommand(Command::new("marshal").arg(Arg::new("path").index(1).required(true)))
-                .subcommand(Command::new("default")),
+                .subcommand(Command::new("restoredefault")),
         )
         .override_help(help.as_str())
         .override_usage("")

View file

@@ -22,7 +22,7 @@ use crate::utils::{ms_monotonic, ms_since_epoch};

 const UDP_UPDATE_BINDINGS_INTERVAL_MS: Duration = Duration::from_millis(2500);

-/// ZeroTier system service, which presents virtual networks as VPN connections.
+/// ZeroTier system service, which presents virtual networks as VPN connections on Windows/macOS/Linux/BSD/etc.
 pub struct Service {
     udp_binding_task: tokio::task::JoinHandle<()>,
     core_background_service_task: tokio::task::JoinHandle<()>,
@@ -45,10 +45,6 @@ impl Drop for Service {
             self.udp_binding_task.abort();
             self.core_background_service_task.abort();

-            // Wait for all tasks to actually stop.
-            let _ = self.udp_binding_task.await;
-            let _ = self.core_background_service_task.await;
-
             // Drop all bound sockets since these can hold circular Arc<> references to 'internal'.
             self.internal.udp_sockets.write().await.clear();
         });
@@ -68,10 +64,11 @@ impl Service {
         let _ = si._core.insert(NetworkHypervisor::new(&si, true, auto_upgrade_identity)?);
         let si = Arc::new(si);
-        let (si1, si2) = (si.clone(), si.clone());
+
+        si._core.as_ref().unwrap().add_update_default_root_set_if_none();

         Ok(Self {
-            udp_binding_task: si.rt.spawn(si1.udp_binding_task_main()),
-            core_background_service_task: si.rt.spawn(si2.core_background_service_task_main()),
+            udp_binding_task: si.rt.spawn(si.clone().udp_binding_task_main()),
+            core_background_service_task: si.rt.spawn(si.clone().core_background_service_task_main()),
             internal: si,
         })
     }
@@ -80,11 +77,11 @@ impl Service {
 impl ServiceImpl {
     #[inline(always)]
     fn core(&self) -> &NetworkHypervisor<ServiceImpl> {
-        debug_assert!(self._core.is_some());
-        unsafe { self._core.as_ref().unwrap_unchecked() }
+        self._core.as_ref().unwrap()
     }

-    async fn update_bindings_for_port(self: &Arc<Self>, port: u16, interface_prefix_blacklist: &Vec<String>, cidr_blacklist: &Vec<InetAddress>) -> Option<Vec<(LocalInterface, InetAddress, std::io::Error)>> {
+    /// Called in udp_binding_task_main() to service a particular UDP port.
+    async fn update_udp_bindings_for_port(self: &Arc<Self>, port: u16, interface_prefix_blacklist: &Vec<String>, cidr_blacklist: &Vec<InetAddress>) -> Option<Vec<(LocalInterface, InetAddress, std::io::Error)>> {
         let mut udp_sockets = self.udp_sockets.write().await;
         let bp = udp_sockets.entry(port).or_insert_with(|| BoundUdpPort::new(port));
         let (errors, new_sockets) = bp.update_bindings(interface_prefix_blacklist, cidr_blacklist);
@@ -94,17 +91,20 @@ impl ServiceImpl {
         drop(udp_sockets); // release lock

         for ns in new_sockets.iter() {
-            // We start a task for each CPU core. Tokio multiplexes but since each packet takes a bit of CPU
-            // to parse, decrypt, etc. we want to be able to saturate the CPU for any given socket to virtual
-            // network path. The alternative would be to use MPMC channels but that would almost certainly be
-            // a lot slower as it would involve more sync/atomic bottlenecks and probably extra malloc/free.
-            let mut kill_on_drop = ns.kill_on_drop.lock();
+            /*
+             * Start a task (not actual thread) for each CPU core.
+             *
+             * The async runtime is itself multithreaded but each packet takes a little bit of CPU to handle.
+             * This makes sure that when one packet is in processing the async runtime is immediately able to
+             * cue up another receiver for this socket.
+             */
+            let mut socket_associated_tasks = ns.socket_associated_tasks.lock();
             for _ in 0..self.num_listeners_per_socket {
                 let self2 = self.clone();
                 let socket = ns.socket.clone();
                 let interface = ns.interface.clone();
                 let local_socket = LocalSocket(Arc::downgrade(ns), self.local_socket_unique_id_counter.fetch_add(1, Ordering::SeqCst));
-                kill_on_drop.push(self.rt.spawn(async move {
+                socket_associated_tasks.push(self.rt.spawn(async move {
                     let core = self2.core();
                     loop {
                         let mut buf = core.get_packet_buffer();
@@ -122,11 +122,12 @@ impl ServiceImpl {
         return None;
     }

+    /// Background task to update per-interface/per-port bindings if system interface configuration changes.
     async fn udp_binding_task_main(self: Arc<Self>) {
         loop {
             let config = self.data.config().await;
-            if let Some(errors) = self.update_bindings_for_port(config.settings.primary_port, &config.settings.interface_prefix_blacklist, &config.settings.cidr_blacklist).await {
+            if let Some(errors) = self.update_udp_bindings_for_port(config.settings.primary_port, &config.settings.interface_prefix_blacklist, &config.settings.cidr_blacklist).await {
                 for e in errors.iter() {
                     println!("BIND ERROR: {} {} {}", e.0.to_string(), e.1.to_string(), e.2.to_string());
                 }
@@ -137,6 +138,7 @@ impl ServiceImpl {
         }
     }

+    /// Periodically calls do_background_tasks() in the ZeroTier core.
     async fn core_background_service_task_main(self: Arc<Self>) {
         tokio::time::sleep(Duration::from_secs(1)).await;
         loop {
@@ -149,16 +151,16 @@ impl SystemInterface for ServiceImpl {
     type LocalSocket = crate::service::LocalSocket;
     type LocalInterface = crate::localinterface::LocalInterface;

-    fn event_node_is_up(&self) {}
-
-    fn event_node_is_down(&self) {}
-
-    fn event_online_status_change(&self, online: bool) {}
-
-    fn event_user_message(&self, source: &Identity, message_type: u64, message: &[u8]) {}
-
-    fn event_security_warning(&self, warning: &str) {}
+    fn event(&self, event: Event) {
+        println!("{}", event.to_string());
+        match event {
+            _ => {}
+        }
+    }
+
+    fn user_message(&self, _source: &Identity, _message_type: u64, _message: &[u8]) {}

+    #[inline(always)]
     fn local_socket_is_valid(&self, socket: &Self::LocalSocket) -> bool {
         socket.0.strong_count() > 0
     }
@@ -223,12 +225,26 @@ impl SystemInterface for ServiceImpl {
         return false;
     }

-    fn check_path(&self, id: &Identity, endpoint: &Endpoint, local_socket: Option<&Self::LocalSocket>, local_interface: Option<&Self::LocalInterface>) -> bool {
-        true
+    fn check_path(&self, _id: &Identity, endpoint: &Endpoint, _local_socket: Option<&Self::LocalSocket>, _local_interface: Option<&Self::LocalInterface>) -> bool {
+        self.rt.block_on(async {
+            let config = self.data.config().await;
+            if let Some(pps) = config.physical.get(endpoint) {
+                !pps.blacklist
+            } else {
+                true
+            }
+        })
     }

     fn get_path_hints(&self, id: &Identity) -> Option<Vec<(Endpoint, Option<Self::LocalSocket>, Option<Self::LocalInterface>)>> {
-        None
+        self.rt.block_on(async {
+            let config = self.data.config().await;
+            if let Some(vns) = config.virtual_.get(&id.address) {
+                Some(vns.try_.iter().map(|ep| (ep.clone(), None, None)).collect())
+            } else {
+                None
+            }
+        })
     }

     #[inline(always)]

View file

@@ -43,14 +43,14 @@ pub struct BoundUdpSocket {
     pub interface: LocalInterface,
     /// Add tasks here that should be aborted when this socket is closed.
-    pub kill_on_drop: parking_lot::Mutex<Vec<tokio::task::JoinHandle<()>>>,
+    pub socket_associated_tasks: parking_lot::Mutex<Vec<tokio::task::JoinHandle<()>>>,
     fd: RawFd,
 }

 impl Drop for BoundUdpSocket {
     fn drop(&mut self) {
-        for t in self.kill_on_drop.lock().drain(..) {
+        for t in self.socket_associated_tasks.lock().drain(..) {
             t.abort();
         }
     }
@@ -64,7 +64,7 @@ impl BoundUdpSocket {
         unsafe { libc::setsockopt(self.fd.as_(), libc::IPPROTO_IP.as_(), libc::IP_TOS.as_(), (&ttl as *const c_int).cast(), std::mem::size_of::<c_int>().as_()) };
     }

-    #[cfg(any(target_os = "macos", target_os = "freebsd"))]
+    #[cfg(any(target_os = "macos"))]
     pub fn send_sync_nonblock(&self, _: &tokio::runtime::Handle, dest: &InetAddress, b: &[&[u8]], packet_ttl: u8) -> bool {
         let mut ok = false;
         if dest.family() == self.address.family() {
@@ -217,7 +217,7 @@ impl BoundUdpPort {
                 let s = Arc::new(BoundUdpSocket {
                     address: addr_with_port,
                     socket: Arc::new(s.unwrap()),
-                    kill_on_drop: parking_lot::Mutex::new(Vec::new()),
+                    socket_associated_tasks: parking_lot::Mutex::new(Vec::new()),
                     interface: interface.clone(),
                     fd,
                 });

View file

@@ -24,11 +24,13 @@ lazy_static! {
 }

 /// Get milliseconds since unix epoch.
+#[inline(always)]
 pub fn ms_since_epoch() -> i64 {
     SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as i64
 }

 /// Get milliseconds since an arbitrary time in the past, guaranteed to monotonically increase.
+#[inline(always)]
 pub fn ms_monotonic() -> i64 {
     Instant::now().duration_since(*STARTUP_INSTANT).as_millis() as i64
 }