diff --git a/.gitignore b/.gitignore index ed11fee73..332cc9e43 100644 --- a/.gitignore +++ b/.gitignore @@ -1,8 +1,11 @@ /target /**/target + /aes-gmac-siv/Cargo.lock /zerotier-core-crypto/Cargo.lock /zerotier-network-hypervisor/Cargo.lock +/allthethings/Cargo.lock + .DS_* .Icon* ._* diff --git a/Makefile b/Makefile index 7ae8565a9..1d5bb2607 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ all: clean: FORCE - rm -rf zerotier-core-crypto/target zerotier-network-hypervisor/target zerotier-system-service/target + rm -rf zerotier-core-crypto/target zerotier-network-hypervisor/target zerotier-system-service/target allthethings/target FORCE: diff --git a/allthethings/.gitignore b/allthethings/.gitignore new file mode 100644 index 000000000..96ef6c0b9 --- /dev/null +++ b/allthethings/.gitignore @@ -0,0 +1,2 @@ +/target +Cargo.lock diff --git a/allthethings/Cargo.toml b/allthethings/Cargo.toml new file mode 100644 index 000000000..348aaaffb --- /dev/null +++ b/allthethings/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "allthethings" +version = "0.1.0" +edition = "2021" + +[dependencies] +sha2 = { version = "^0", features = ["asm"] } +smol = { version = "^1", features = [] } +getrandom = "^0" diff --git a/allthethings/src/lib.rs b/allthethings/src/lib.rs new file mode 100644 index 000000000..69cae67a7 --- /dev/null +++ b/allthethings/src/lib.rs @@ -0,0 +1,12 @@ +mod store; +mod replicator; +mod protocol; +mod varint; + +pub(crate) fn ms_since_epoch() -> u64 { + std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_millis() as u64 +} + +pub(crate) fn ms_monotonic() -> u64 { + std::time::Instant::now().elapsed().as_millis() as u64 +} diff --git a/allthethings/src/protocol.rs b/allthethings/src/protocol.rs new file mode 100644 index 000000000..a8be3b11f --- /dev/null +++ b/allthethings/src/protocol.rs @@ -0,0 +1,27 @@ +pub(crate) const PROTOCOL_VERSION: u8 = 1; + +/// No operation and no payload, sent as a heartbeat. +/// This is the only message type NOT followed by a message size varint. It's just one byte. +pub(crate) const MESSAGE_TYPE_NOP: u8 = 0; + +/// An object either sent in response to a query or because it is new. +/// Payload is simply the object. The hash is not included as we compute it locally for security. +pub(crate) const MESSAGE_TYPE_OBJECT: u8 = 1; + +/// Request one or more objects by identity hash with optional common prefix. +pub(crate) const MESSAGE_TYPE_GET_OBJECTS: u8 = 2; + +/// HELLO message, which is all u8's and is packed and so can be parsed directly in place. +/// This message is sent at the start of any connection by both sides. +#[repr(packed)] +pub(crate) struct Hello { + pub hello_size: u8, // technically a varint but below 0x80 + pub protocol_version: u8, + pub flags: [u8; 4], // u32, little endian + pub clock: [u8; 8], // u64, little endian + pub data_set_size: [u8; 8], // u64, little endian + pub domain_hash: [u8; 32], + pub instance_id: [u8; 16], + pub loopback_check_code_salt: [u8; 8], + pub loopback_check_code: [u8; 16], +} diff --git a/allthethings/src/replicator.rs b/allthethings/src/replicator.rs new file mode 100644 index 000000000..1dc947ae6 --- /dev/null +++ b/allthethings/src/replicator.rs @@ -0,0 +1,273 @@ +use std::collections::HashMap; +use std::convert::TryInto; +use std::future::Future; +use std::hash::{Hash, Hasher}; +use std::io::Read; +use std::mem::{size_of, transmute}; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}; +use std::time::Duration; + +use getrandom::getrandom; +use sha2::{Digest, Sha256}; +use sha2::digest::{FixedOutput, Reset, Update}; +use smol::{Executor, Task, Timer}; +use smol::future; +use smol::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader}; +use smol::lock::Mutex; +use smol::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6, TcpListener, TcpStream, SocketAddr}; +use smol::stream::StreamExt; + +use crate::{ms_monotonic, ms_since_epoch, protocol}; +use crate::store::Store; +use crate::varint; + +const CONNECTION_TIMEOUT_SECONDS: u64 = 30; + +pub struct Config { + /// TCP port to which this should bind. + pub tcp_port: u16, + + /// A name for this replicated data set. This is just used to prevent linking to peers replicating different data. + pub domain: String, +} + +#[derive(PartialEq, Eq, Clone)] +struct ConnectionKey { + instance_id: [u8; 16], + ip: IpAddr, +} + +impl Hash for ConnectionKey { + #[inline(always)] + fn hash(&self, state: &mut H) { + state.write(&self.instance_id); + self.ip.hash(state); + } +} + +struct ReplicatorImpl { + instance_id: [u8; 16], + loopback_check_code_secret: [u8; 16], + domain_hash: [u8; 32], + store: Arc, + config: Config, + connections: Mutex)>>, +} + +pub struct Replicator { + state: Arc>, + v4_listener_task: Option>, + v6_listener_task: Option>, + service_task: Task<()>, +} + +impl Replicator { + /// Create a new replicator to replicate the contents of the provided store. + /// All async tasks, sockets, and connections will be dropped if the replicator is dropped. Use + /// the shutdown() method for a graceful shutdown. + pub async fn start(executor: Arc, store: Arc, config: Config) -> Result> { + let listener_v4 = TcpListener::bind(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, config.tcp_port)).await; + let listener_v6 = TcpListener::bind(SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, config.tcp_port, 0, 0)).await; + if listener_v4.is_err() && listener_v6.is_err() { + return Err(Box::new(listener_v4.unwrap_err())); + } + + let r = Arc::new(ReplicatorImpl:: { + instance_id: { + let mut tmp = [0_u8; 16]; + getrandom(&mut tmp).expect("getrandom failed"); + tmp + }, + loopback_check_code_secret: { + let mut tmp = [0_u8; 16]; + getrandom(&mut tmp).expect("getrandom failed"); + tmp + }, + domain_hash: { + let mut h = Sha256::new(); + h.update(config.domain.as_bytes()); + h.finalize_fixed().into() + }, + config, + store, + connections: Mutex::new(HashMap::new()), + }); + + let (e0, e1) = (executor.clone(), executor.clone()); + Ok(Self { + state: r, + v4_listener_task: listener_v4.map_or(None, |listener_v4| Some(executor.spawn(r.listener_task_main(listener_v4, e0)))), + v6_listener_task: listener_v6.map_or(None, |listener_v6| Some(executor.spawn(r.listener_task_main(listener_v6, e1)))), + service_task: executor.spawn(r.service_main(executor.clone())), + }) + } + + pub async fn shutdown(self) { + // Get a joined future including our service task and one or both listeners. There is always + // at least one listener. If there are no listeners this is a bug and will panic. + let main_tasks = self.v4_listener_task.map_or_else(|| { + future::zip(self.service_task.cancel(), self.v6_listener_task.unwrap().cancel()) + }, |v4| { + self.v6_listener_task.map_or_else(|| { + future::zip(self.service_task.cancel(), v4.cancel()) + }, |v6| { + future::zip(self.service_task.cancel(), future::zip(v4.cancel(), v6.cancel())) + }) + }); + + // Just dropping all connections is fine. + self.state.connections.lock().await.clear(); + + // Then gracefully wait for the main tasks to finish. + let _ = main_tasks.await; + } +} + +impl ReplicatorImpl { + async fn service_main(&self, executor: Arc) { + let mut timer = smol::Timer::interval(Duration::from_secs(1)); + let mut to_close: Vec = Vec::new(); + loop { + timer.next().await; + + let mut connections = self.connections.lock().await; + + let now_mt = ms_monotonic(); + for cc in connections.iter_mut() { + let c = &(*cc.1).0; + if c.closed.load(Ordering::Relaxed) || (now_mt - c.last_receive_time.load(Ordering::Relaxed)) > CONNECTION_TIMEOUT_SECONDS { + to_close.push(cc.0.clone()); + } + } + + for tc in to_close.iter() { + let _ = connections.remove(tc); + } + to_close.clear(); + + drop(connections); + } + } + + async fn listener_task_main(&self, listener: TcpListener, executor: Arc) { + loop { + let stream = listener.accept().await; + if stream.is_ok() { + let (mut stream, remote_address) = stream.unwrap(); + self.handle_new_connection(stream, remote_address, false, executor.clone()).await; + } + } + } + + async fn handle_new_connection(&self, mut stream: TcpStream, remote_address: SocketAddr, outgoing: bool, executor: Arc) { + stream.set_nodelay(true); + + let mut loopback_check_code_salt = [0_u8; 8]; + getrandom(&mut tmp).expect("getrandom failed"); + + let mut h = Sha256::new(); + h.update(&loopback_check_code_salt); + h.update(&self.loopback_check_code_secret); + let loopback_check_code: [u8; 32] = h.finalize_fixed().into(); + + let hello = protocol::Hello { + hello_size: size_of::() as u8, + protocol_version: protocol::PROTOCOL_VERSION, + flags: [0_u8; 4], + clock: ms_since_epoch().to_le_bytes(), + data_set_size: self.store.total_size().await.to_le_bytes(), + domain_hash: self.domain_hash.clone(), + instance_id: self.instance_id.clone(), + loopback_check_code_salt, + loopback_check_code: (&loopback_check_code[0..16]).try_into().unwrap() + }; + let hello: [u8; size_of::()] = unsafe { transmute(hello) }; + + if stream.write_all(&hello).await.is_ok() { + let mut hello_buf = [0_u8; size_of::()]; + if stream.read_exact(&mut hello_buf).await.is_ok() { + let hello: protocol::Hello = unsafe { transmute(hello_buf) }; + + // Sanity check HELLO packet. In the future we may support different versions and sizes. + if hello.hello_size == size_of::() as u8 && hello.protocol_version == protocol::PROTOCOL_VERSION { + // If this hash's first 16 bytes are equal to the one in the HELLO, this connection is + // from this node and should be dropped. + let mut h = Sha256::new(); + h.update(&hello.loopback_check_code_salt); + h.update(&self.loopback_check_code_secret); + let loopback_if_equal: [u8; 32] = h.finalize_fixed().into(); + + if !loopback_if_equal[0..16].eq(&hello.loopback_check_code) { + let k = ConnectionKey { + instance_id: hello.instance_id.clone(), + ip: remote_address.ip() + }; + let mut connections = self.connections.lock().await; + let _ = connections.entry(k).or_insert_with(move || { + stream.set_nodelay(false); + (remote_address.clone(), executor.spawn(self.connection_io_task_main(stream, remote_address, false, executor.clone()))) + }); + } + } + } + } + } + + async fn connection_sync_init_task_main(&self, writer: Arc>) { + let mut periodic_timer = Timer::interval(Duration::from_secs(1)); + loop { + let _ = periodic_timer.next().await; + } + } + + async fn connection_io_task_main(&self, mut stream: TcpStream, remote_address: SocketAddr, outgoing: bool, executor: Arc) { + let mut reader = BufReader::with_capacity(S::MAX_OBJECT_SIZE * 2, stream.clone()); + let writer = Arc::new(Mutex::new(stream)); + + let _sync_search_init_task = executor.spawn(self.connection_sync_init_task_main(writer.clone())); + + let mut tmp = [0_u8; 4096]; + 'main_io_loop: loop { + if reader.read_exact(&mut tmp[0..1]).await.is_err() { + break 'main_io_loop; + } + let message_type = tmp[0]; + + if message_type == protocol::MESSAGE_TYPE_NOP { + continue 'main_io_loop; + } + + let message_size = varint::async_read(&mut reader).await; + if message_size.is_err() { + break 'main_io_loop; + } + let mut message_size = message_size.unwrap(); + + if message_size > S::MAX_OBJECT_SIZE as u64 { + break 'main_io_loop; + } + + match message_type { + protocol::MESSAGE_TYPE_OBJECT => { + }, + protocol::MESSAGE_TYPE_GET_OBJECTS => { + }, + _ => { + // Skip the bodies of unrecognized message types. + while message_size >= tmp.len() as u64 { + if reader.read_exact(&tmp).await.is_err() { + break 'main_io_loop; + } + message_size -= tmp.len() as u64; + } + if message_size > 0 { + if reader.read_exact(&mut tmp[0..(message_size as usize)]).await.is_err() { + break 'main_io_loop; + } + } + } + } + } + } +} diff --git a/allthethings/src/store.rs b/allthethings/src/store.rs new file mode 100644 index 000000000..bf61897b4 --- /dev/null +++ b/allthethings/src/store.rs @@ -0,0 +1,68 @@ +use smol::net::SocketAddr; + +/// Result code from the put() method in Database. +pub enum PutObjectResult { + /// Datum stored successfully. + Ok, + /// Datum is one we already have. + Duplicate, + /// Value is invalid. (this may result in dropping connections to peers, etc.) + Invalid, +} + +/// Trait that must be implemented for the data store that is to be replicated. +/// +/// Each datum is identified by an identity hash, which is a cryptographic hash of HASH_SIZE +/// bytes of its value. The implementation assumes that's what it is, but the hash function +/// is not specified in this library. +pub trait Store: Sync + Send { + /// The size in bytes of the identity hash. + const HASH_SIZE: usize; + + /// The maximum size of the objects supported by this store (and thus replication domain). + const MAX_OBJECT_SIZE: usize; + + /// Object type returned by get(), must implement AsRef<[u8]>. + type GetOutput: AsRef<[u8]>; + + /// Compute a hash of a data object using the hash associated with this store. + /// This returns the identity hash which can then be used as a key with get(), put(), etc. + fn hash(&self, object: &[u8]) -> [u8; Self::HASH_SIZE]; + + /// Get the total size of this data set in objects. + async fn total_size(&self) -> u64; + + /// Get an object from the database, returning None if it is not found or there is an error. + async fn get(&self, identity_hash: &[u8; Self::HASH_SIZE]) -> Option; + + /// Store an entry in the database. + async fn put(&self, identity_hash: &[u8; Self::HASH_SIZE], object: &[u8]) -> PutObjectResult; + + /// Count the number of identity hash keys in this range (inclusive) of identity hashes. + /// This may return None if an error occurs, but should return 0 if the set is empty. + async fn count(&self, start: &[u8; Self::HASH_SIZE], end: &[u8; Self::HASH_SIZE]) -> Option; + + /// Called when a connection to a remote node was successful. + /// This is always called on successful outbound connect. + async fn save_remote_endpoint(&self, to_address: &SocketAddr); + + /// Get a remote endpoint to try. + /// This can return endpoints in any order and is used to try to establish outbound links. + async fn get_remote_endpoint(&self) -> Option; + + /* + /// Execute a function for every hash/value in a range. + /// Iteration stops if the supplied function returns false. + async fn for_each_entry(&self, start: &[u8; Self::HASH_SIZE], end: &[u8; Self::HASH_SIZE], function: F) + where + F: Fn(&[u8; Self::HASH_SIZE], &[u8]) -> FF, + FF: Future; + + /// Execute a function for every hash in a range. + /// Iteration stops if the supplied function returns false. + async fn for_each_hash_key(&self, start: &[u8; Self::HASH_SIZE], end: &[u8; Self::HASH_SIZE], function: F) + where + F: Fn(&[u8; Self::HASH_SIZE], &[u8]) -> FF, + FF: Future; + */ +} diff --git a/allthethings/src/varint.rs b/allthethings/src/varint.rs new file mode 100644 index 000000000..8992d7ddd --- /dev/null +++ b/allthethings/src/varint.rs @@ -0,0 +1,43 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + * + * (c)2021 ZeroTier, Inc. + * https://www.zerotier.com/ + */ + +use smol::io::{AsyncReadExt, AsyncWrite, AsyncRead, AsyncWriteExt}; + +pub async fn async_write(w: &mut W, mut v: u64) -> std::io::Result<()> { + let mut b = [0_u8; 10]; + let mut i = 0; + loop { + if v > 0x7f { + b[i] = (v as u8) & 0x7f; + i += 1; + v = v.wrapping_shr(7); + } else { + b[i] = (v as u8) | 0x80; + i += 1; + break; + } + } + w.write_all(&b[0..i]).await +} + +pub async fn async_read(r: &mut R) -> std::io::Result { + let mut v = 0_u64; + let mut buf = [0_u8; 1]; + let mut pos = 0; + loop { + let _ = r.read_exact(&mut buf).await?; + let b = buf[0]; + if b <= 0x7f { + v |= (b as u64).wrapping_shl(pos); + pos += 7; + } else { + v |= ((b & 0x7f) as u64).wrapping_shl(pos); + return Ok(v); + } + } +} diff --git a/zerotier-network-hypervisor/Cargo.toml b/zerotier-network-hypervisor/Cargo.toml index 29f5b2c2e..6f92e7bbe 100644 --- a/zerotier-network-hypervisor/Cargo.toml +++ b/zerotier-network-hypervisor/Cargo.toml @@ -17,7 +17,9 @@ base64 = "^0" lz4_flex = { version = "^0", features = ["safe-encode", "safe-decode", "checked-decode"] } dashmap = "^4" parking_lot = "^0" +arc-swap = { version = "^1", features = [], default-features = false } lazy_static = "^1" +highway = "^0" [target."cfg(not(windows))".dependencies] libc = "^0" diff --git a/zerotier-network-hypervisor/src/util/mod.rs b/zerotier-network-hypervisor/src/util/mod.rs index aec1daa62..d9a78bc3a 100644 --- a/zerotier-network-hypervisor/src/util/mod.rs +++ b/zerotier-network-hypervisor/src/util/mod.rs @@ -29,15 +29,33 @@ pub(crate) fn array_range_mut() } } +/// Cast a u64 to a byte array. #[inline(always)] pub(crate) fn u64_as_bytes(i: &u64) -> &[u8; 8] { unsafe { &*(i as *const u64).cast() } } +lazy_static! { + static mut HIGHWAYHASHER_KEY: [u64; 4] = [zerotier_core_crypto::random::next_u64_secure(), zerotier_core_crypto::random::next_u64_secure(), zerotier_core_crypto::random::next_u64_secure(), zerotier_core_crypto::random::next_u64_secure()]; +} + +/// Get an instance of HighwayHasher initialized with a secret per-process random salt. +/// The random salt is generated at process start and so will differ for each invocation of whatever process this is inside. +#[inline(always)] +pub(crate) fn highwayhasher() -> highway::HighwayHasher { highway::HighwayHasher::new(highway::Key(HIGHWAYHASHER_KEY.clone())) } + +/// Non-cryptographic 64-bit bit mixer for things like local hashing. +#[inline(always)] +pub(crate) fn hash64_noncrypt(mut x: u64) -> u64 { + x ^= x.wrapping_shr(30); + x = x.wrapping_mul(0xbf58476d1ce4e5b9); + x ^= x.wrapping_shr(27); + x = x.wrapping_mul(0x94d049bb133111eb); + x ^ x.wrapping_shr(31) +} + /// A hasher for maps that just returns u64 values as-is. -/// -/// This should be used only for things like ZeroTier addresses that are already random -/// and that aren't vulnerable to malicious crafting of identifiers. +/// Used with things like ZeroTier addresses and network IDs that are already randomly distributed. #[derive(Copy, Clone)] -pub struct U64NoOpHasher(u64); +pub(crate) struct U64NoOpHasher(u64); impl U64NoOpHasher { #[inline(always)] diff --git a/zerotier-network-hypervisor/src/vl1/inetaddress.rs b/zerotier-network-hypervisor/src/vl1/inetaddress.rs index 0ad50c4a7..546eabbae 100644 --- a/zerotier-network-hypervisor/src/vl1/inetaddress.rs +++ b/zerotier-network-hypervisor/src/vl1/inetaddress.rs @@ -124,7 +124,7 @@ impl InetAddress { addr.sin6.sin6_family = AF_INET6.into(); addr.sin6.sin6_port = port.to_be().into(); unsafe { - *((&mut (addr.sin6.sin6_addr) as *mut in6_addr).cast::().offset(15)) = 1; + *((&mut (addr.sin6.sin6_addr) as *mut _).cast::().offset(15)) = 1; } addr } @@ -169,7 +169,7 @@ impl InetAddress { /// Set the IP and port of this InetAddress. /// Whether this is IPv4 or IPv6 is inferred from the size of ip[], which must be /// either 4 or 16 bytes. The family (AF_INET or AF_INET6) is returned, or zero on - /// success. + /// failure. pub fn set(&mut self, ip: &[u8], port: u16) -> u8 { self.zero(); let port = port.to_be(); @@ -177,12 +177,12 @@ impl InetAddress { if ip.len() == 4 { self.sin.sin_family = AF_INET.into(); self.sin.sin_port = port.into(); - copy_nonoverlapping(ip.as_ptr(), (&mut self.sin.sin_addr.s_addr as *mut u32).cast::(), 4); + copy_nonoverlapping(ip.as_ptr(), (&mut self.sin.sin_addr.s_addr as *mut _).cast::(), 4); AF_INET } else if ip.len() == 16 { self.sin6.sin6_family = AF_INET6.into(); self.sin6.sin6_port = port.into(); - copy_nonoverlapping(ip.as_ptr(), (&mut self.sin6.sin6_addr as *mut in6_addr).cast::(), 16); + copy_nonoverlapping(ip.as_ptr(), (&mut self.sin6.sin6_addr as *mut _).cast::(), 16); AF_INET6 } else { 0 @@ -195,13 +195,27 @@ impl InetAddress { pub fn ip_bytes(&self) -> &[u8] { unsafe { match self.sa.sa_family as u8 { - AF_INET => &*(&self.sin.sin_addr.s_addr as *const u32).cast::<[u8; 4]>(), - AF_INET6 => &*(&(self.sin6.sin6_addr) as *const in6_addr).cast::<[u8; 16]>(), + AF_INET => &*(&self.sin.sin_addr.s_addr as *const _).cast::<[u8; 4]>(), + AF_INET6 => &*(&self.sin6.sin6_addr as *const _).cast::<[u8; 16]>(), _ => &[], } } } + /// Get raw IP bytes packed into a u128. + /// Bytes are packed in native endian so the resulting u128 may not be the same between systems. + /// This value is intended for local lookup use only. + #[inline(always)] + pub fn ip_as_native_u128(&self) -> u128 { + unsafe { + match self.sa.sa_family as u8 { + AF_INET => self.sin.sin_addr.s_addr as u128, + AF_INET6 => u128::from_ne_bytes(*(&self.sin6.sin6_addr as *const _).cast::<[u8; 16]>()), + _ => 0, + } + } + } + /// Get the IP port for this InetAddress. #[inline(always)] pub fn port(&self) -> u16 { @@ -309,7 +323,7 @@ impl InetAddress { } } AF_INET6 => { - let ip = &*(&(self.sin6.sin6_addr) as *const in6_addr).cast::<[u8; 16]>(); + let ip = &*(&(self.sin6.sin6_addr) as *const _).cast::<[u8; 16]>(); if (ip[0] & 0xf0) == 0xf0 { if ip[0] == 0xff { return IpScope::Multicast; // ff00::/8 @@ -352,10 +366,10 @@ impl InetAddress { unsafe { match self.sa.sa_family as u8 { AF_INET => { - let ip = &*(&self.sin.sin_addr.s_addr as *const u32).cast::<[u8; 4]>(); + let ip = &*(&self.sin.sin_addr.s_addr as *const _).cast::<[u8; 4]>(); format!("{}.{}.{}.{}", ip[0], ip[1], ip[2], ip[3]) } - AF_INET6 => Ipv6Addr::from(*(&(self.sin6.sin6_addr) as *const in6_addr).cast::<[u8; 16]>()).to_string(), + AF_INET6 => Ipv6Addr::from(*(&(self.sin6.sin6_addr) as *const _).cast::<[u8; 16]>()).to_string(), _ => String::from("(null)") } } @@ -367,17 +381,17 @@ impl InetAddress { AF_INET => { let b = buf.append_bytes_fixed_get_mut::<7>()?; b[0] = 4; - copy_nonoverlapping((&self.sin.sin_addr.s_addr as *const u32).cast::(), b.as_mut_ptr().offset(1), 4); - b[5] = *(&self.sin.sin_port as *const u16).cast::(); - b[6] = *(&self.sin.sin_port as *const u16).cast::().offset(1); + copy_nonoverlapping((&self.sin.sin_addr.s_addr as *const _).cast::(), b.as_mut_ptr().offset(1), 4); + b[5] = *(&self.sin.sin_port as *const _).cast::(); + b[6] = *(&self.sin.sin_port as *const _).cast::().offset(1); Ok(()) } AF_INET6 => { let b = buf.append_bytes_fixed_get_mut::<19>()?; b[0] = 6; - copy_nonoverlapping((&(self.sin6.sin6_addr) as *const in6_addr).cast::(), b.as_mut_ptr().offset(1), 16); - b[17] = *(&self.sin6.sin6_port as *const u16).cast::(); - b[18] = *(&self.sin6.sin6_port as *const u16).cast::().offset(1); + copy_nonoverlapping((&(self.sin6.sin6_addr) as *const _).cast::(), b.as_mut_ptr().offset(1), 16); + b[17] = *(&self.sin6.sin6_port as *const _).cast::(); + b[18] = *(&self.sin6.sin6_port as *const _).cast::().offset(1); Ok(()) } _ => buf.append_u8(0) diff --git a/zerotier-network-hypervisor/src/vl1/node.rs b/zerotier-network-hypervisor/src/vl1/node.rs index b890fe1c8..4aa3b50c8 100644 --- a/zerotier-network-hypervisor/src/vl1/node.rs +++ b/zerotier-network-hypervisor/src/vl1/node.rs @@ -123,7 +123,7 @@ pub struct Node { pub(crate) instance_id: u64, identity: Identity, intervals: Mutex, - paths: DashMap>, + paths: DashMap>, peers: DashMap>, roots: Mutex>>, root_sets: Mutex>, @@ -236,7 +236,7 @@ impl Node { // Handle packets addressed to this node. let path = self.path(source_endpoint, source_local_socket, source_local_interface); - path.log_receive(time_ticks); + path.log_receive_anything(time_ticks); if fragment_header.is_fragment() { @@ -251,7 +251,7 @@ impl Node { let source = source.unwrap(); let peer = self.peer(source); if peer.is_some() { - peer.unwrap().receive(self, ci, ph, time_ticks, &path, &packet_header, frag0, &assembled_packet.frags[1..(assembled_packet.have as usize)]); + peer.unwrap().receive(self, ci, ph, time_ticks, source_endpoint, &path, &packet_header, frag0, &assembled_packet.frags[1..(assembled_packet.have as usize)]); } else { self.whois.query(self, ci, source, Some(QueuedPacket::Fragmented(assembled_packet))); } @@ -270,7 +270,7 @@ impl Node { let source = source.unwrap(); let peer = self.peer(source); if peer.is_some() { - peer.unwrap().receive(self, ci, ph, time_ticks, &path, &packet_header, data.as_ref(), &[]); + peer.unwrap().receive(self, ci, ph, time_ticks, source_endpoint, &path, &packet_header, data.as_ref(), &[]); } else { self.whois.query(self, ci, source, Some(QueuedPacket::Unfragmented(data))); } @@ -315,9 +315,10 @@ impl Node { /// This is a canonicalizing function that returns a unique path object for every tuple /// of endpoint, local socket, and local interface. pub fn path(&self, ep: &Endpoint, local_socket: Option, local_interface: Option) -> Arc { - self.paths.get(ep).map_or_else(|| { + let key = Path::local_lookup_key(ep); + self.paths.get(&key).map_or_else(|| { let p = Arc::new(Path::new(ep.clone(), local_socket, local_interface)); - self.paths.insert(ep.clone(), p.clone()).unwrap_or(p) // if another thread added one, return that instead + self.paths.insert(key, p.clone()).unwrap_or(p) // if another thread added one, return that instead }, |path| path.value().clone()) } } diff --git a/zerotier-network-hypervisor/src/vl1/path.rs b/zerotier-network-hypervisor/src/vl1/path.rs index 4e4b85f7a..fd2849902 100644 --- a/zerotier-network-hypervisor/src/vl1/path.rs +++ b/zerotier-network-hypervisor/src/vl1/path.rs @@ -7,13 +7,18 @@ */ use std::collections::HashMap; +use std::hash::Hasher; +use std::io::Write; use std::num::NonZeroI64; +use std::sync::Arc; use std::sync::atomic::{AtomicI64, Ordering}; +use arc_swap::ArcSwap; +use highway::HighwayHash; use parking_lot::Mutex; use crate::PacketBuffer; -use crate::util::U64NoOpHasher; +use crate::util::{highwayhasher, U64NoOpHasher}; use crate::vl1::Endpoint; use crate::vl1::fragmentedpacket::FragmentedPacket; use crate::vl1::node::NodeInterface; @@ -22,12 +27,18 @@ use crate::vl1::protocol::*; /// Keepalive interval for paths in milliseconds. pub(crate) const PATH_KEEPALIVE_INTERVAL: i64 = 20000; +lazy_static! { + static mut RANDOM_64BIT_SALT_0: u64 = zerotier_core_crypto::random::next_u64_secure(); + static mut RANDOM_64BIT_SALT_1: u64 = zerotier_core_crypto::random::next_u64_secure(); + static mut RANDOM_64BIT_SALT_2: u64 = zerotier_core_crypto::random::next_u64_secure(); +} + /// A remote endpoint paired with a local socket and a local interface. /// These are maintained in Node and canonicalized so that all unique paths have /// one and only one unique path object. That enables statistics to be tracked /// for them and uniform application of things like keepalives. pub struct Path { - endpoint: Endpoint, + endpoint: ArcSwap, local_socket: Option, local_interface: Option, last_send_time_ticks: AtomicI64, @@ -36,10 +47,42 @@ pub struct Path { } impl Path { + /// Get a 128-bit key to look up this endpoint in the local node path map. + #[inline(always)] + pub(crate) fn local_lookup_key(endpoint: &Endpoint, local_socket: Option, local_interface: Option) -> u128 { + let local_socket = local_socket.map_or(0, |s| crate::util::hash64_noncrypt(RANDOM_64BIT_SALT_0 + s.get() as u64)); + let local_interface = local_interface.map_or(0, |s| crate::util::hash64_noncrypt(RANDOM_64BIT_SALT_1 + s.get() as u64)); + let lsi = (local_socket as u128).wrapping_shl(64) | (local_interface as u128); + match endpoint { + Endpoint::Nil => 0, + Endpoint::ZeroTier(a) => a.to_u64() as u128, + Endpoint::Ethernet(m) => (m.to_u64() | 0x0100000000000000) as u128 ^ lsi, + Endpoint::WifiDirect(m) => (m.to_u64() | 0x0200000000000000) as u128 ^ lsi, + Endpoint::Bluetooth(m) => (m.to_u64() | 0x0400000000000000) as u128 ^ lsi, + Endpoint::Ip(ip) => ip.ip_as_native_u128().wrapping_sub(lsi), // naked IP has no port + Endpoint::IpUdp(ip) => ip.ip_as_native_u128().wrapping_add(lsi), // UDP maintains one path per IP but merely learns the most recent port + Endpoint::IpTcp(ip) => ip.ip_as_native_u128().wrapping_sub(crate::util::hash64_noncrypt((ip.port() as u64).wrapping_add(RANDOM_64BIT_SALT_2)) as u128).wrapping_sub(lsi), + Endpoint::Http(s) => { + let mut hh = highwayhasher(); + let _ = hh.write_all(s.as_bytes()); + let _ = hh.write_u64(local_socket); + let _ = hh.write_u64(local_interface); + u128::from_ne_bytes(unsafe { *hh.finalize128().as_ptr().cast() }) + } + Endpoint::WebRTC(b) => { + let mut hh = highwayhasher(); + let _ = hh.write_u64(local_socket); + let _ = hh.write_u64(local_interface); + let _ = hh.write_all(b.as_slice()); + u128::from_ne_bytes(unsafe { *hh.finalize128().as_ptr().cast() }) + } + } + } + #[inline(always)] pub fn new(endpoint: Endpoint, local_socket: Option, local_interface: Option) -> Self { Self { - endpoint, + endpoint: ArcSwap::new(Arc::new(endpoint)), local_socket, local_interface, last_send_time_ticks: AtomicI64::new(0), @@ -49,7 +92,7 @@ impl Path { } #[inline(always)] - pub fn endpoint(&self) -> &Endpoint { &self.endpoint } + pub fn endpoint(&self) -> Arc { self.endpoint.load_full() } #[inline(always)] pub fn local_socket(&self) -> Option { self.local_socket } @@ -93,19 +136,40 @@ impl Path { } #[inline(always)] - pub(crate) fn log_receive(&self, time_ticks: i64) { + pub(crate) fn log_receive_anything(&self, time_ticks: i64) { self.last_receive_time_ticks.store(time_ticks, Ordering::Relaxed); } #[inline(always)] - pub(crate) fn log_send(&self, time_ticks: i64) { + pub(crate) fn log_receive_authenticated_packet(&self, _bytes: usize, source_endpoint: &Endpoint) { + let mut replace = false; + match source_endpoint { + Endpoint::IpUdp(ip) => { + let ep = self.endpoint.load(); + match ep.as_ref() { + Endpoint::IpUdp(ip_orig) => { + debug_assert!(ip_orig.ip_bytes().eq(ip.ip_bytes())); + if ip_orig.port() != ip.port() { + replace = true; + } + }, + _ => {} + } + }, + _ => {} + } + if replace { + self.endpoint.swap(Arc::new(source_endpoint.clone())); + } + } + + #[inline(always)] + pub(crate) fn log_send_anything(&self, time_ticks: i64) { self.last_send_time_ticks.store(time_ticks, Ordering::Relaxed); } - /// Desired period between calls to call_every_interval(). pub(crate) const CALL_EVERY_INTERVAL_MS: i64 = PATH_KEEPALIVE_INTERVAL; - /// Called every INTERVAL during background tasks. #[inline(always)] pub(crate) fn call_every_interval(&self, ct: &CI, time_ticks: i64) { self.fragmented_packets.lock().retain(|packet_id, frag| (time_ticks - frag.ts_ticks) < PACKET_FRAGMENT_EXPIRATION); diff --git a/zerotier-network-hypervisor/src/vl1/peer.rs b/zerotier-network-hypervisor/src/vl1/peer.rs index fd4c8ca1a..ff7474447 100644 --- a/zerotier-network-hypervisor/src/vl1/peer.rs +++ b/zerotier-network-hypervisor/src/vl1/peer.rs @@ -243,29 +243,37 @@ impl Peer { /// Receive, decrypt, authenticate, and process an incoming packet from this peer. /// If the packet comes in multiple fragments, the fragments slice should contain all /// those fragments after the main packet header and first chunk. - pub(crate) fn receive(&self, node: &Node, ci: &CI, ph: &PH, time_ticks: i64, source_path: &Arc, header: &PacketHeader, packet: &Buffer<{ PACKET_SIZE_MAX }>, fragments: &[Option]) { + pub(crate) fn receive(&self, node: &Node, ci: &CI, ph: &PH, time_ticks: i64, source_endpoint: &Endpoint, source_path: &Arc, header: &PacketHeader, packet: &Buffer<{ PACKET_SIZE_MAX }>, fragments: &[Option]) { let _ = packet.as_bytes_starting_at(PACKET_VERB_INDEX).map(|packet_frag0_payload_bytes| { let mut payload: Buffer = unsafe { Buffer::new_without_memzero() }; let mut message_id = 0_u64; let ephemeral_secret: Option> = self.ephemeral_secret.lock().clone(); - let forward_secrecy = if !ephemeral_secret.map_or(false, |ephemeral_secret| try_aead_decrypt(&ephemeral_secret.secret, packet_frag0_payload_bytes, header, fragments, &mut payload, &mut message_id)) { + let forward_secrecy = if ephemeral_secret.map_or(false, |ephemeral_secret| try_aead_decrypt(&ephemeral_secret.secret, packet_frag0_payload_bytes, header, fragments, &mut payload, &mut message_id)) { + // Decrypted and authenticated by the ephemeral secret. + true + } else { + // There is no ephemeral secret, or authentication with it failed. unsafe { payload.set_size_unchecked(0); } if !try_aead_decrypt(&self.static_secret, packet_frag0_payload_bytes, header, fragments, &mut payload, &mut message_id) { + // Static secret also failed, reject packet. return; } false - } else { - true }; + // --------------------------------------------------------------- + // If we made it here it decrypted and passed authentication. + // --------------------------------------------------------------- + self.last_receive_time_ticks.store(time_ticks, Ordering::Relaxed); self.total_bytes_received.fetch_add((payload.len() + PACKET_HEADER_SIZE) as u64, Ordering::Relaxed); + source_path.log_receive_authenticated_packet(payload.len() + PACKET_HEADER_SIZE, source_endpoint); debug_assert!(!payload.is_empty()); // should be impossible since this fails in try_aead_decrypt() let mut verb = payload.as_bytes()[0]; // If this flag is set, the end of the payload is a full HMAC-SHA384 authentication - // tag for much stronger authentication. + // tag for much stronger authentication than is offered by the packet MAC. let extended_authentication = (verb & VERB_FLAG_EXTENDED_AUTHENTICATION) != 0; if extended_authentication { if payload.len() >= (1 + SHA384_HASH_SIZE) { @@ -295,7 +303,7 @@ impl Peer { // if it didn't handle the packet, in which case it's handled at VL1. This is // because the most performance critical path is the handling of the ???_FRAME // verbs, which are in VL2. - verb &= VERB_MASK; + verb &= VERB_MASK; // mask off flags if !ph.handle_packet(self, source_path, forward_secrecy, extended_authentication, verb, &payload) { match verb { //VERB_VL1_NOP => {} @@ -461,7 +469,7 @@ impl Peer { explicit_endpoint.map_or_else(|| { self.path(node).map_or(false, |path| { - path.log_send(time_ticks); + path.log_send_anything(time_ticks); self.send_to_endpoint(ci, path.endpoint(), path.local_socket(), path.local_interface(), &packet) }) }, |endpoint| { diff --git a/zerotier-system-service/Cargo.lock b/zerotier-system-service/Cargo.lock index 27c05a084..d17ff95fe 100644 --- a/zerotier-system-service/Cargo.lock +++ b/zerotier-system-service/Cargo.lock @@ -88,6 +88,12 @@ version = "1.0.45" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ee10e43ae4a853c0a3591d4e2ada1719e553be18199d9da9d4a83f5927c2f5c7" +[[package]] +name = "arc-swap" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c5d78ce20460b82d3fa150275ed9d55e21064fc7951177baacf86a145c4a4b1f" + [[package]] name = "async-channel" version = "1.6.1" @@ -928,6 +934,12 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "highway" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a310093553e2397bd2936564960446b23233864bbffee554ec5847572e2dfd93" + [[package]] name = "hkdf" version = "0.10.0" @@ -2217,8 +2229,10 @@ dependencies = [ name = "zerotier-network-hypervisor" version = "2.0.0" dependencies = [ + "arc-swap", "base64", "dashmap", + "highway", "lazy_static", "libc", "lz4_flex",