diff --git a/allthethings/src/lib.rs b/allthethings/src/lib.rs index be5b65182..eede2da9a 100644 --- a/allthethings/src/lib.rs +++ b/allthethings/src/lib.rs @@ -7,13 +7,13 @@ */ mod store; -mod replicator; mod protocol; mod varint; mod memorystore; mod iblt; mod config; mod link; +mod node; pub fn ms_since_epoch() -> u64 { std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_millis() as u64 @@ -39,4 +39,4 @@ pub const IDENTITY_HASH_SIZE: usize = 48; pub use config::Config; pub use store::{Store, StorePutResult}; pub use memorystore::MemoryStore; -//pub use replicator::Replicator; +pub use node::Node; diff --git a/allthethings/src/link.rs b/allthethings/src/link.rs index 84411c0d5..7c052b6bf 100644 --- a/allthethings/src/link.rs +++ b/allthethings/src/link.rs @@ -1,12 +1,20 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + * + * (c)2021 ZeroTier, Inc. + * https://www.zerotier.com/ + */ + use std::mem::MaybeUninit; +use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; -use std::task::Poll; use std::time::Duration; use serde::{Deserialize, Serialize}; use smol::io::{AsyncReadExt, AsyncWriteExt, BufReader, BufWriter}; use smol::lock::Mutex; -use smol::net::{SocketAddr, TcpStream}; +use smol::net::TcpStream; use zerotier_core_crypto::gmac::GMACStream; use zerotier_core_crypto::hash::SHA384; @@ -22,13 +30,12 @@ struct Output { gmac: Option, } -pub(crate) struct Link<'a, 'b, 'c, S: Store> { - pub remote_addr: SocketAddr, +pub(crate) struct Link<'e, S: Store + 'static> { pub connect_time: u64, io_timeout: Duration, - node_secret: &'a P521KeyPair, - config: &'b Config, - store: &'c S, + node_secret: &'e P521KeyPair, + config: &'e Config, + store: &'e S, remote_node_id: parking_lot::Mutex>, reader: Mutex>, writer: Mutex, @@ -52,14 +59,13 @@ fn next_id_hash_in_slice(bytes: &[u8]) -> smol::io::Result<&[u8; IDENTITY_HASH_S } } -impl<'a, 'b, 'c, S: Store> Link<'a, 'b, 'c, S> { - pub fn new(stream: TcpStream, remote_addr: SocketAddr, connect_time: u64, node_secret: &'a P521KeyPair, config: &'b Config, store: &'c S) -> Self { +impl<'e, S: Store + 'static> Link<'e, S> { + pub fn new(stream: TcpStream, node_secret: &'e P521KeyPair, config: &'e Config, store: &'e S) -> Self { let _ = stream.set_nodelay(false); let max_message_size = HELLO_SIZE_MAX.max(config.max_message_size); let now_monotonic = store.monotonic_clock(); Self { - remote_addr, - connect_time, + connect_time: now_monotonic, io_timeout: Duration::from_secs(config.io_timeout), node_secret, config, @@ -81,15 +87,15 @@ impl<'a, 'b, 'c, S: Store> Link<'a, 'b, 'c, S> { /// Returns None if the remote node has not yet responded with HelloAck and been verified. pub fn remote_node_id(&self) -> Option<[u8; 48]> { self.remote_node_id.lock().clone() } - /// Send a keepalive if necessary. - pub async fn send_keepalive_if_needed(&self, now_monotonic: u64) { + pub(crate) async fn do_periodic_tasks(&self, now_monotonic: u64) -> smol::io::Result<()> { if now_monotonic.saturating_sub(self.last_send_time.load(Ordering::Relaxed)) >= self.keepalive_period && self.authenticated.load(Ordering::Relaxed) { - self.last_send_time.store(now_monotonic, Ordering::Relaxed); let timeout = Duration::from_secs(1); let mut writer = self.writer.lock().await; - io_timeout(timeout, writer.stream.write_all(&[MESSAGE_TYPE_KEEPALIVE])).await; - io_timeout(timeout, writer.stream.flush()).await; + io_timeout(timeout, writer.stream.write_all(&[MESSAGE_TYPE_KEEPALIVE])).await?; + io_timeout(timeout, writer.stream.flush()).await?; + self.last_send_time.store(now_monotonic, Ordering::Relaxed); } + Ok(()) } async fn write_message(&self, message_type: u8, message: &[&[u8]]) -> smol::io::Result<()> { diff --git a/allthethings/src/node.rs b/allthethings/src/node.rs new file mode 100644 index 000000000..150fbee78 --- /dev/null +++ b/allthethings/src/node.rs @@ -0,0 +1,170 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + * + * (c)2021 ZeroTier, Inc. + * https://www.zerotier.com/ + */ + +use std::collections::{HashMap, HashSet}; +use std::error::Error; +use std::sync::{Arc, Weak}; +use std::time::Duration; + +use smol::{Executor, Task, Timer}; +use smol::lock::Mutex; +use smol::net::SocketAddr; + +use zerotier_core_crypto::p521::P521KeyPair; + +use crate::{Config, Store}; +use crate::link::Link; + +struct NodeIntl<'e, S: Store + 'static> { + config: &'e Config, + secret: &'e P521KeyPair, + store: &'e S, + executor: &'e Executor<'e>, + connections: Mutex>, Task<()>)>> +} + +pub struct Node<'e, S: Store + 'static> { + daemon_tasks: Vec>, + intl: Weak> +} + +impl<'e, S: Store + 'static> Node<'e, S> { + pub fn new(config: &'e Config, secret: &'e P521KeyPair, store: &'e S, executor: &'e Executor<'e>) -> Result> { + let listener_v4 = socket2::Socket::new(socket2::Domain::IPV4, socket2::Type::STREAM, Some(socket2::Protocol::TCP)).and_then(|v4| { + let _ = v4.set_reuse_address(true); + let _ = v4.bind(&socket2::SockAddr::from(std::net::SocketAddrV4::new(std::net::Ipv4Addr::UNSPECIFIED, config.tcp_port)))?; + let _ = v4.listen(64); + Ok(v4) + }); + let listener_v6 = socket2::Socket::new(socket2::Domain::IPV6, socket2::Type::STREAM, Some(socket2::Protocol::TCP)).and_then(|v6| { + let _ = v6.set_only_v6(true); + let _ = v6.set_reuse_address(true); + let _ = v6.bind(&socket2::SockAddr::from(std::net::SocketAddrV6::new(std::net::Ipv6Addr::UNSPECIFIED, config.tcp_port, 0, 0)))?; + let _ = v6.listen(64); + Ok(v6) + }); + if listener_v4.is_err() && listener_v6.is_err() { + return Err(Box::new(listener_v4.unwrap_err())); + } + + let ni = Arc::new(NodeIntl { + config, + secret, + store, + executor, + connections: Mutex::new(HashMap::with_capacity(64)), + }); + + let mut n = Self { + daemon_tasks: Vec::with_capacity(3), + intl: Arc::downgrade(&ni) + }; + + if listener_v4.is_ok() { + let listener_v4 = listener_v4.unwrap(); + let ni2 = ni.clone(); + n.daemon_tasks.push(executor.spawn(async move { ni2.tcp_listener_main(smol::net::TcpListener::try_from(std::net::TcpListener::from(listener_v4)).unwrap()).await })); + } + if listener_v6.is_ok() { + let listener_v6 = listener_v6.unwrap(); + let ni2 = ni.clone(); + n.daemon_tasks.push(executor.spawn(async move { ni2.tcp_listener_main(smol::net::TcpListener::try_from(std::net::TcpListener::from(listener_v6)).unwrap()).await })); + } + + let ni2 = ni.clone(); + n.daemon_tasks.push(executor.spawn(async move { ni2.background_task_main().await })); + + Ok(n) + } +} + +impl<'e, S: Store + 'static> NodeIntl<'e, S> { + async fn background_task_main(&self) { + let io_timeout_ms = self.config.io_timeout * 1000; + let delay = Duration::from_secs(10); + loop { + Timer::after(delay).await; + + let mut connections = self.connections.lock().await; + let (done_sender, done_receiver) = smol::channel::bounded::<()>(16); + let done_sender = Arc::new(done_sender); + + let to_erase: Arc>> = Arc::new(Mutex::new(Vec::new())); + let mut tasks: Vec> = Vec::with_capacity(connections.len()); + + // Search for connections that are dead, have timed out during negotiation, or + // that are duplicates of another connection to the same remote node. + let have_node_ids: Arc>> = Arc::new(Mutex::new(HashSet::with_capacity(connections.len()))); + let now_monotonic = self.store.monotonic_clock(); + for c in connections.iter() { + let l = c.1.0.upgrade(); + if l.is_some() { + let l = l.unwrap(); + let remote_node_id = l.remote_node_id(); + if remote_node_id.is_some() { + let remote_node_id = remote_node_id.unwrap(); + if !have_node_ids.lock().await.contains(&remote_node_id) { + let a = c.0.clone(); + let hn = have_node_ids.clone(); + let te = to_erase.clone(); + let ds = done_sender.clone(); + tasks.push(self.executor.spawn(async move { + if l.do_periodic_tasks(now_monotonic).await.is_ok() { + if !hn.lock().await.insert(remote_node_id) { + // This is a redudant link to the same remote node. + te.lock().await.push(a); + } + } else { + // A fatal error occurred while servicing the connection. + te.lock().await.push(a); + } + let _ = ds.send(()).await; + })); + } else { + // This is a redudant link to the same remote node. + to_erase.lock().await.push(c.0.clone()); + } + } else if (now_monotonic - l.connect_time) > io_timeout_ms { + // Link negotiation timed out if we aren't connected yet. + to_erase.lock().await.push(c.0.clone()); + } + } else { + // Connection is closed and has released its internally held Arc<>. + to_erase.lock().await.push(c.0.clone()); + } + } + + // Wait for a message on the channel from each task indicating that it is complete. + for _ in 0..tasks.len() { + let _ = done_receiver.recv().await; + } + + // Close and erase all connections slated for cleanup. + for e in to_erase.lock().await.iter() { + let _ = connections.remove(e); + } + } + } + + async fn tcp_listener_main(&self, listener: smol::net::TcpListener) { + loop { + let c = listener.accept().await; + if c.is_ok() { + let (connection, remote_address) = c.unwrap(); + let l = Arc::new(Link::<'e, S>::new(connection, self.secret, self.config, self.store)); + self.connections.lock().await.insert(remote_address.clone(), (Arc::downgrade(&l), self.executor.spawn(async move { + let _ = l.io_main().await; + // Arc is now released, causing Weak to go null and then causing this + // entry to be removed from the connection map on the next background task sweep. + }))); + } else { + break; + } + } + } +} diff --git a/allthethings/src/replicator.rs b/allthethings/src/replicator.rs deleted file mode 100644 index 18ca92fa3..000000000 --- a/allthethings/src/replicator.rs +++ /dev/null @@ -1,372 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at https://mozilla.org/MPL/2.0/. - * - * (c)2021 ZeroTier, Inc. - * https://www.zerotier.com/ - */ - - - -/* -use std::collections::HashMap; -use std::convert::TryInto; -use std::error::Error; -use std::hash::{Hash, Hasher}; -use std::mem::{size_of, transmute}; -use std::sync::Arc; -use std::sync::atomic::{AtomicU64, Ordering}; -use std::time::Duration; - -use smol::{Executor, Task, Timer}; -use smol::io::{AsyncReadExt, AsyncWriteExt, BufReader}; -use smol::lock::Mutex; -use smol::net::*; -use smol::stream::StreamExt; - -use zerotier_core_crypto::hash::SHA384; -use zerotier_core_crypto::random; - -use crate::{IDENTITY_HASH_SIZE, ms_monotonic, ms_since_epoch, protocol, Config}; -use crate::store::{Store, StorePutResult}; -use crate::varint; - -const CONNECTION_TIMEOUT_SECONDS: u64 = 60; -const CONNECTION_SYNC_RESTART_TIMEOUT_SECONDS: u64 = 5; - -#[derive(PartialEq, Eq, Clone)] -struct ConnectionKey { - instance_id: [u8; 16], - ip: IpAddr, -} - -impl Hash for ConnectionKey { - #[inline(always)] - fn hash(&self, state: &mut H) { - self.instance_id.hash(state); - self.ip.hash(state); - } -} - -struct Connection { - remote_address: SocketAddr, - last_receive: Arc, - task: Task<()>, -} - -struct ReplicatorImpl<'ex, S: 'static + Store> { - executor: Arc>, - instance_id: [u8; 16], - loopback_check_code_secret: [u8; 48], - domain_hash: [u8; 48], - store: Arc, - config: Config, - connections: Mutex>, - connections_in_progress: Mutex>>, - announced_objects_requested: Mutex>, -} - -pub struct Replicator<'ex, S: 'static + Store> { - v4_listener_task: Option>, - v6_listener_task: Option>, - background_cleanup_task: Task<()>, - _impl: Arc>, -} - -impl<'ex, S: 'static + Store> Replicator<'ex, S> { - /// Create a new replicator to replicate the contents of the provided store. - /// All async tasks, sockets, and connections will be dropped if the replicator is dropped. - pub async fn start(executor: &Arc>, store: Arc, config: Config) -> Result, Box> { - let listener_v4 = socket2::Socket::new(socket2::Domain::IPV4, socket2::Type::STREAM, Some(socket2::Protocol::TCP)).and_then(|v4| { - let _ = v4.set_reuse_address(true); - let _ = v4.bind(&socket2::SockAddr::from(std::net::SocketAddrV4::new(std::net::Ipv4Addr::UNSPECIFIED, config.tcp_port)))?; - let _ = v4.listen(64); - Ok(v4) - }); - let listener_v6 = socket2::Socket::new(socket2::Domain::IPV6, socket2::Type::STREAM, Some(socket2::Protocol::TCP)).and_then(|v6| { - let _ = v6.set_only_v6(true); - let _ = v6.set_reuse_address(true); - let _ = v6.bind(&socket2::SockAddr::from(std::net::SocketAddrV6::new(std::net::Ipv6Addr::UNSPECIFIED, config.tcp_port, 0, 0)))?; - let _ = v6.listen(64); - Ok(v6) - }); - if listener_v4.is_err() && listener_v6.is_err() { - return Err(Box::new(listener_v4.unwrap_err())); - } - - let r = Arc::new(ReplicatorImpl::<'ex, S> { - executor: executor.clone(), - instance_id: { - let mut tmp = [0_u8; 16]; - random::fill_bytes_secure(&mut tmp); - tmp - }, - loopback_check_code_secret: { - let mut tmp = [0_u8; 48]; - random::fill_bytes_secure(&mut tmp); - tmp - }, - domain_hash: SHA384::hash(config.domain.as_bytes()), - config, - store, - connections: Mutex::new(HashMap::new()), - connections_in_progress: Mutex::new(HashMap::new()), - announced_objects_requested: Mutex::new(HashMap::new()), - }); - - Ok(Self { - v4_listener_task: listener_v4.map_or(None, |listener_v4| { - Some(executor.spawn(r.clone().tcp_listener_task(smol::net::TcpListener::try_from(std::net::TcpListener::from(listener_v4)).unwrap()))) - }), - v6_listener_task: listener_v6.map_or(None, |listener_v6| { - Some(executor.spawn(r.clone().tcp_listener_task(smol::net::TcpListener::try_from(std::net::TcpListener::from(listener_v6)).unwrap()))) - }), - background_cleanup_task: executor.spawn(r.clone().background_cleanup_task()), - _impl: r, - }) - } -} - -unsafe impl<'ex, S: 'static + Store> Send for Replicator<'ex, S> {} - -unsafe impl<'ex, S: 'static + Store> Sync for Replicator<'ex, S> {} - -impl<'ex, S: 'static + Store> ReplicatorImpl<'ex, S> { - async fn background_cleanup_task(self: Arc>) { - let mut timer = smol::Timer::interval(Duration::from_secs(CONNECTION_TIMEOUT_SECONDS / 10)); - loop { - timer.next().await; - let now_mt = ms_monotonic(); - - // Garbage collect the map used to track objects we've requested. - self.announced_objects_requested.lock().await.retain(|_, ts| now_mt.saturating_sub(*ts) < (CONNECTION_TIMEOUT_SECONDS * 1000)); - - let mut connections = self.connections.lock().await; - - // Close connections that haven't spoken in too long. - connections.retain(|_, c| (now_mt.saturating_sub(c.last_receive.load(Ordering::Relaxed))) < (CONNECTION_TIMEOUT_SECONDS * 1000)); - let num_connections = connections.len(); - drop(connections); // release lock - - // Try to connect to more nodes if the count is below the target count. - if num_connections < self.config.target_link_count { - let new_link_seed = self.store.get_remote_endpoint(); - if new_link_seed.is_some() { - let new_link_seed = new_link_seed.unwrap(); - let mut connections_in_progress = self.connections_in_progress.lock().await; - if !connections_in_progress.contains_key(&new_link_seed) { - let s2 = self.clone(); - let _ = connections_in_progress.insert(new_link_seed.clone(), self.executor.spawn(async move { - let new_link = TcpStream::connect(&new_link_seed).await; - if new_link.is_ok() { - s2.handle_new_connection(new_link.unwrap(), new_link_seed, true).await; - } else { - let _task = s2.connections_in_progress.lock().await.remove(&new_link_seed); - } - })); - } - } - } - } - } - - async fn tcp_listener_task(self: Arc>, listener: TcpListener) { - loop { - let stream = listener.accept().await; - if stream.is_ok() { - let (stream, remote_address) = stream.unwrap(); - let mut connections_in_progress = self.connections_in_progress.lock().await; - if !connections_in_progress.contains_key(&remote_address) { - let s2 = self.clone(); - let _ = connections_in_progress.insert(remote_address, self.executor.spawn(s2.handle_new_connection(stream, remote_address.clone(), false))); - } - } - } - } - - async fn handle_new_connection(self: Arc>, mut stream: TcpStream, remote_address: SocketAddr, outgoing: bool) { - let _ = stream.set_nodelay(true); - - let mut loopback_check_code_salt = [0_u8; 16]; - random::fill_bytes_secure(&mut loopback_check_code_salt); - let hello = protocol::Hello { - hello_size: size_of::() as u8, - protocol_version: protocol::PROTOCOL_VERSION, - hash_algorithm: protocol::HASH_ALGORITHM_SHA384, - flags: if outgoing { protocol::HELLO_FLAG_OUTGOING } else { 0 }, - clock: ms_since_epoch().to_le_bytes(), - domain_hash: self.domain_hash.clone(), - instance_id: self.instance_id.clone(), - loopback_check_code_salt, - loopback_check_code: (&SHA384::hmac(&self.loopback_check_code_secret, &loopback_check_code_salt)[0..16]).try_into().unwrap(), - }; - let hello: [u8; size_of::()] = unsafe { transmute(hello) }; - - if stream.write_all(&hello).await.is_ok() { - let mut hello_buf = [0_u8; size_of::()]; - if stream.read_exact(&mut hello_buf).await.is_ok() { - let hello: protocol::Hello = unsafe { transmute(hello_buf) }; - - // Sanity check HELLO packet. In the future we may support different versions and sizes. - if hello.hello_size == size_of::() as u8 && hello.protocol_version == protocol::PROTOCOL_VERSION && hello.hash_algorithm == protocol::HASH_ALGORITHM_SHA384 { - if !SHA384::hmac(&self.loopback_check_code_secret, &hello.loopback_check_code_salt)[0..16].eq(&hello.loopback_check_code) { - let k = ConnectionKey { - instance_id: hello.instance_id.clone(), - ip: remote_address.ip(), - }; - let mut connections = self.connections.lock().await; - let s2 = self.clone(); - let _ = connections.entry(k).or_insert_with(move || { - let _ = stream.set_nodelay(false); - let last_receive = Arc::new(AtomicU64::new(ms_monotonic())); - Connection { - remote_address, - last_receive: last_receive.clone(), - task: s2.executor.spawn(s2.clone().connection_io_task(stream, hello.instance_id, last_receive)), - } - }); - } - } - } - } - - let _task = self.connections_in_progress.lock().await.remove(&remote_address); - } - - async fn connection_io_task(self: Arc>, stream: TcpStream, remote_instance_id: [u8; 16], last_receive: Arc) { - let mut reader = BufReader::with_capacity(65536, stream.clone()); - let writer = Arc::new(Mutex::new(stream)); - - //let writer2 = writer.clone(); - let _sync_search_init_task = self.executor.spawn(async move { - //let writer = writer2; - let mut periodic_timer = Timer::interval(Duration::from_secs(1)); - loop { - let _ = periodic_timer.next().await; - } - }); - - let mut get_buffer = Vec::new(); - let mut tmp_mem = Vec::new(); - tmp_mem.resize(self.config.max_object_size, 0); - let tmp = tmp_mem.as_mut_slice(); - 'main_io_loop: loop { - if reader.read_exact(&mut tmp[0..1]).await.is_err() { - break 'main_io_loop; - } - let message_type = tmp[0]; - - last_receive.store(ms_monotonic(), Ordering::Relaxed); - - match message_type { - protocol::MESSAGE_TYPE_NOP => {} - - protocol::MESSAGE_TYPE_HAVE_NEW_OBJECT => { - if reader.read_exact(&mut tmp[0..IDENTITY_HASH_SIZE]).await.is_err() { - break 'main_io_loop; - } - let identity_hash: [u8; 48] = (&tmp[0..IDENTITY_HASH_SIZE]).try_into().unwrap(); - let mut announced_objects_requested = self.announced_objects_requested.lock().await; - if !announced_objects_requested.contains_key(&identity_hash) && !self.store.have(ms_since_epoch(), &identity_hash) { - announced_objects_requested.insert(identity_hash.clone(), ms_monotonic()); - drop(announced_objects_requested); // release mutex - - tmp[0] = protocol::MESSAGE_TYPE_GET_OBJECTS; - tmp[1] = 0; - tmp[2] = varint::ONE; - tmp[3..(3 + IDENTITY_HASH_SIZE)].copy_from_slice(&identity_hash); - if !writer.lock().await.write_all(&tmp).await.is_err() { - break 'main_io_loop; - } - } - } - - protocol::MESSAGE_TYPE_OBJECT => { - let object_size = varint::async_read(&mut reader).await; - if object_size.is_err() { - break 'main_io_loop; - } - let object_size = object_size.unwrap(); - if object_size > self.config.max_object_size as u64 { - break 'main_io_loop; - } - - let object = &mut tmp[0..(object_size as usize)]; - if reader.read_exact(object).await.is_err() { - break 'main_io_loop; - } - - let identity_hash: [u8; 48] = SHA384::hash(object); - match self.store.put(ms_since_epoch(), &identity_hash, object) { - StorePutResult::Invalid => { - break 'main_io_loop; - } - StorePutResult::Ok | StorePutResult::Duplicate => { - if self.announced_objects_requested.lock().await.remove(&identity_hash).is_some() { - // TODO: propagate rumor if we requested this object in response to a HAVE message. - } - } - _ => { - let _ = self.announced_objects_requested.lock().await.remove(&identity_hash); - } - } - } - - protocol::MESSAGE_TYPE_GET_OBJECTS => { - // Get the reference time for this query. - let reference_time = varint::async_read(&mut reader).await; - if reference_time.is_err() { - break 'main_io_loop; - } - let reference_time = reference_time.unwrap(); - - // Read common prefix if the requester is requesting a set of hashes with the same beginning. - // A common prefix length of zero means they're requesting by full hash. - if reader.read_exact(&mut tmp[0..1]).await.is_err() { - break 'main_io_loop; - } - let common_prefix_length = tmp[0] as usize; - if common_prefix_length >= IDENTITY_HASH_SIZE { - break 'main_io_loop; - } - if reader.read_exact(&mut tmp[0..common_prefix_length]).await.is_err() { - break 'main_io_loop; - } - - // Get the number of hashes being requested. - let hash_count = varint::async_read(&mut reader).await; - if hash_count.is_err() { - break 'main_io_loop; - } - let hash_count = hash_count.unwrap(); - - // Step through each suffix of the common prefix and send the object if found. - for _ in 0..hash_count { - if reader.read_exact(&mut tmp[common_prefix_length..IDENTITY_HASH_SIZE]).await.is_err() { - break 'main_io_loop; - } - let identity_hash: [u8; IDENTITY_HASH_SIZE] = (&tmp[0..IDENTITY_HASH_SIZE]).try_into().unwrap(); - if self.store.get(reference_time, &identity_hash, &mut get_buffer) { - let mut w = writer.lock().await; - if varint::async_write(&mut *w, get_buffer.len() as u64).await.is_err() { - break 'main_io_loop; - } - if w.write_all(get_buffer.as_slice()).await.is_err() { - break 'main_io_loop; - } - } - } - } - - _ => { - break 'main_io_loop; - } - } - } - } -} - -unsafe impl<'ex, S: 'static + Store> Send for ReplicatorImpl<'ex, S> {} - -unsafe impl<'ex, S: 'static + Store> Sync for ReplicatorImpl<'ex, S> {} - */ diff --git a/allthethings/src/store.rs b/allthethings/src/store.rs index bb6b73e92..7881fbb72 100644 --- a/allthethings/src/store.rs +++ b/allthethings/src/store.rs @@ -23,7 +23,7 @@ pub enum StorePutResult { /// Trait that must be implemented by the data store that is to be replicated. pub trait Store: Sync + Send { /// Type returned by get(), which can be anything that contains a byte slice. - type Object: AsRef<[u8]>; + type Object: AsRef<[u8]> + Send; /// Get the current wall time in milliseconds since Unix epoch. fn clock(&self) -> u64; diff --git a/zerotier-core-crypto/src/gmac.rs b/zerotier-core-crypto/src/gmac.rs index 24ac472a9..d08c085e5 100644 --- a/zerotier-core-crypto/src/gmac.rs +++ b/zerotier-core-crypto/src/gmac.rs @@ -37,6 +37,8 @@ impl GMAC { } } +unsafe impl Send for GMAC {} + /// A wrapper for GMAC with an incrementing 96-bit nonce. /// /// This is designed for use to authenticate messages on an otherwise unencrypted @@ -68,3 +70,5 @@ impl GMACStream { #[inline(always)] pub fn finish(&mut self, mac: &mut [u8; 16]) { self.0.finish(mac); } } + +unsafe impl Send for GMACStream {} diff --git a/zerotier-core-crypto/src/p521.rs b/zerotier-core-crypto/src/p521.rs index 01e85bd4f..77d77e47c 100644 --- a/zerotier-core-crypto/src/p521.rs +++ b/zerotier-core-crypto/src/p521.rs @@ -119,6 +119,10 @@ impl Clone for P521PublicKey { } } +unsafe impl Send for P521PublicKey {} + +unsafe impl Sync for P521PublicKey {} + /// NIST P-521 elliptic curve key pair. /// This supports both ECDSA signing and ECDH key agreement. In practice the same key pair /// is not used for both functions as this is considred bad practice. @@ -247,6 +251,10 @@ impl Clone for P521KeyPair { } } +unsafe impl Send for P521KeyPair {} + +unsafe impl Sync for P521KeyPair {} + #[cfg(test)] mod tests { use crate::p521::P521KeyPair; diff --git a/zerotier-network-hypervisor/src/util/mod.rs b/zerotier-network-hypervisor/src/util/mod.rs index d9a78bc3a..db45fa846 100644 --- a/zerotier-network-hypervisor/src/util/mod.rs +++ b/zerotier-network-hypervisor/src/util/mod.rs @@ -34,7 +34,7 @@ pub(crate) fn array_range_mut &[u8; 8] { unsafe { &*(i as *const u64).cast() } } lazy_static! { - static mut HIGHWAYHASHER_KEY: [u64; 4] = [zerotier_core_crypto::random::next_u64_secure(), zerotier_core_crypto::random::next_u64_secure(), zerotier_core_crypto::random::next_u64_secure(), zerotier_core_crypto::random::next_u64_secure()]; + static ref HIGHWAYHASHER_KEY: [u64; 4] = [zerotier_core_crypto::random::next_u64_secure(), zerotier_core_crypto::random::next_u64_secure(), zerotier_core_crypto::random::next_u64_secure(), zerotier_core_crypto::random::next_u64_secure()]; } /// Get an instance of HighwayHasher initialized with a secret per-process random salt. diff --git a/zerotier-network-hypervisor/src/util/pool.rs b/zerotier-network-hypervisor/src/util/pool.rs index 208ce74d1..2b72dc284 100644 --- a/zerotier-network-hypervisor/src/util/pool.rs +++ b/zerotier-network-hypervisor/src/util/pool.rs @@ -26,7 +26,6 @@ struct PoolEntry> { struct PoolInner> { factory: F, pool: Mutex>>>, - //outstanding_count: AtomicIsize } /// Container for pooled objects that have been checked out of the pool. @@ -107,7 +106,6 @@ impl> Drop for Pooled { let p = p.unwrap(); p.factory.reset(&mut self.0.as_mut().obj); p.pool.lock().push(self.0); - //let _ = p.outstanding_count.fetch_sub(1, Ordering::Release); } else { drop(Box::from_raw(self.0.as_ptr())) } @@ -125,7 +123,6 @@ impl> Pool { Self(Arc::new(PoolInner:: { factory, pool: Mutex::new(Vec::with_capacity(initial_stack_capacity)), - //outstanding_count: AtomicIsize::new(0) })) } @@ -143,32 +140,6 @@ impl> Pool { })) } - /* - /// Get a pooled object, or allocate one if the pool is empty. - /// This will return None if there are more outstanding pooled objects than the limit. - /// The limit is exclusive, so a value of 0 will mean that only one outstanding - /// object will be permitted as in this case there were zero outstanding at time - /// of checkout. - #[inline(always)] - pub fn try_get(&self, outstanding_pooled_object_limit: usize) -> Option> { - let outstanding = self.0.outstanding_count.fetch_add(1, Ordering::Acquire); - debug_assert!(outstanding >= 0); - if outstanding as usize > outstanding_pooled_object_limit { - let _ = self.0.outstanding_count.fetch_sub(1, Ordering::Release); - None - } else { - Some(Pooled::(self.0.pool.lock().pop().unwrap_or_else(|| { - unsafe { - NonNull::new_unchecked(Box::into_raw(Box::new(PoolEntry:: { - obj: self.0.pool.create(), - return_pool: Arc::downgrade(&self.0), - }))) - } - }))) - } - } - */ - /// Dispose of all pooled objects, freeing any memory they use. /// /// If get() is called after this new objects will be allocated, and any outstanding diff --git a/zerotier-network-hypervisor/src/vl1/ephemeral.rs b/zerotier-network-hypervisor/src/vl1/ephemeral.rs index d7e4d0618..ea02dc80a 100644 --- a/zerotier-network-hypervisor/src/vl1/ephemeral.rs +++ b/zerotier-network-hypervisor/src/vl1/ephemeral.rs @@ -234,9 +234,10 @@ impl EphemeralKeyPairSet { } return if it_happened { + let rs = zt_kbkdf_hmac_sha384(&key.0, KBKDF_KEY_USAGE_LABEL_EPHEMERAL_RATCHET_STATE_ID, 0, 0); Some(EphemeralSymmetricSecret { secret: SymmetricSecret::new(key), - ratchet_state: (&zt_kbkdf_hmac_sha384(&key.0, KBKDF_KEY_USAGE_LABEL_EPHEMERAL_RATCHET_STATE_ID, 0, 0).0[0..16]).try_into().unwrap(), + ratchet_state: (&rs.0[0..16]).try_into().unwrap(), rekey_time: time_ticks + EPHEMERAL_SECRET_REKEY_AFTER_TIME, expire_time: time_ticks + EPHEMERAL_SECRET_REJECT_AFTER_TIME, c25519_ratchet_count, diff --git a/zerotier-network-hypervisor/src/vl1/identity.rs b/zerotier-network-hypervisor/src/vl1/identity.rs index 589bdeb26..3cf3b1708 100644 --- a/zerotier-network-hypervisor/src/vl1/identity.rs +++ b/zerotier-network-hypervisor/src/vl1/identity.rs @@ -11,6 +11,7 @@ use std::cmp::Ordering; use std::convert::TryInto; use std::io::Write; use std::mem::MaybeUninit; +use std::ops::Deref; use std::ptr::{slice_from_raw_parts, slice_from_raw_parts_mut}; use std::str::FromStr; @@ -36,8 +37,8 @@ pub const IDENTITY_CIPHER_SUITE_X25519: u8 = 0x00; /// NIST P-521 ECDH/ECDSA cipher suite. /// /// Sooo.... why 0x03 and not 0x01 or some other value? It's to compensate at the cost of -/// one wasted bit for a short-sighted aspect of the old identity encoding and HELLO packet -/// encoding. +/// one wasted bit in our bit mask for a short-sighted aspect of the old identity encoding +/// and HELLO packet encoding. /// /// The old identity encoding contains no provision for skipping data it doesn't understand /// nor any provision for an upgrade. That's dumb, but there it is on millions of nodes. The @@ -206,7 +207,8 @@ impl Identity { } /// Locally check the validity of this identity. - /// This is somewhat time consuming. + /// + /// This is somewhat time consuming due to the memory-intensive work algorithm. pub fn validate_identity(&self) -> bool { let pow_threshold = if self.p521.is_some() { let p521 = self.p521.as_ref().unwrap(); @@ -250,11 +252,16 @@ impl Identity { pub fn agree(&self, other: &Identity) -> Option> { self.secret.as_ref().and_then(|secret| { let c25519_secret = Secret(SHA512::hash(&secret.c25519.agree(&other.c25519).0)); + // FIPS note: FIPS-compliant exchange algorithms must be the last algorithms in any HKDF chain // for the final result to be technically FIPS compliant. Non-FIPS algorithm secrets are considered // a salt in the HMAC(salt, key) HKDF construction. if secret.p521.is_some() && other.p521.is_some() { - P521PublicKey::from_bytes(&other.p521.as_ref().unwrap().ecdh).and_then(|other_p521| secret.p521.as_ref().unwrap().ecdh.agree(&other_p521).map(|p521_secret| Secret(SHA384::hmac(&c25519_secret.0[0..48], &p521_secret.0)))) + P521PublicKey::from_bytes(&other.p521.as_ref().unwrap().ecdh).and_then(|other_p521| { + secret.p521.as_ref().unwrap().ecdh.agree(&other_p521).map(|p521_secret| { + Secret(SHA384::hmac(&c25519_secret.0[0..48], &p521_secret.0)) + }) + }) } else { Some(Secret(array_range::(&c25519_secret.0).clone())) } @@ -262,6 +269,7 @@ impl Identity { } /// Sign a message with this identity. + /// /// A return of None happens if we don't have our secret key(s) or some other error occurs. pub fn sign(&self, msg: &[u8], use_cipher_suites: u8) -> Option> { self.secret.as_ref().and_then(|secret| { @@ -443,7 +451,7 @@ impl Identity { if (include_cipher_suites & IDENTITY_CIPHER_SUITE_EC_NIST_P521) == IDENTITY_CIPHER_SUITE_EC_NIST_P521 && secret.p521.is_some() && self.p521.is_some() { let p521_secret = secret.p521.as_ref().unwrap(); let p521 = self.p521.as_ref().unwrap(); - let p521_secret_joined: [u8; P521_SECRET_KEY_SIZE + P521_SECRET_KEY_SIZE] = concat_arrays_2(p521_secret.ecdh.public_key_bytes(), p521_secret.ecdsa.public_key_bytes()); + let p521_secret_joined: [u8; P521_SECRET_KEY_SIZE + P521_SECRET_KEY_SIZE] = concat_arrays_2(p521_secret.ecdh.secret_key_bytes().as_bytes(), p521_secret.ecdsa.secret_key_bytes().as_bytes()); let p521_joined: [u8; P521_PUBLIC_KEY_SIZE + P521_PUBLIC_KEY_SIZE + P521_ECDSA_SIGNATURE_SIZE + ED25519_SIGNATURE_SIZE] = concat_arrays_4(&p521.ecdh, &p521.ecdsa, &p521.ecdsa_self_signature, &p521.ed25519_self_signature); format!("{}:0:{}{}:{}{}:1:{}:{}", self.address.to_string(), hex::to_string(&self.c25519), hex::to_string(&self.ed25519), hex::to_string(&secret.c25519.secret_bytes().0), hex::to_string(&secret.ed25519.secret_bytes().0), base64::encode_config(p521_joined, base64::URL_SAFE_NO_PAD), base64::encode_config(p521_secret_joined, base64::URL_SAFE_NO_PAD)) } else { @@ -624,7 +632,7 @@ const ADDRESS_DERIVATION_HASH_MEMORY_SIZE: usize = 2097152; /// non-cryptographic hash. Its memory hardness and use in a work function is a defense /// in depth feature rather than a primary security feature. fn zt_address_derivation_memory_intensive_hash(digest: &mut [u8; 64], genmem_pool_obj: &mut Pooled) { - let genmem_ptr = genmem_pool_obj.0.as_mut_ptr().cast::(); + let genmem_ptr: *mut u8 = genmem_pool_obj.get_memory(); let (genmem, genmem_alias_hack) = unsafe { (&mut *slice_from_raw_parts_mut(genmem_ptr, ADDRESS_DERIVATION_HASH_MEMORY_SIZE), &*slice_from_raw_parts(genmem_ptr, ADDRESS_DERIVATION_HASH_MEMORY_SIZE)) }; let genmem_u64_ptr = genmem_ptr.cast::(); @@ -655,13 +663,23 @@ fn zt_address_derivation_memory_intensive_hash(digest: &mut [u8; 64], genmem_poo } #[repr(transparent)] -struct AddressDerivationMemory([u128; ADDRESS_DERIVATION_HASH_MEMORY_SIZE / 16]); // use u128 to align by 16 bytes +struct AddressDerivationMemory(*mut u8); struct AddressDerivationMemoryFactory; +impl AddressDerivationMemory { + #[inline(always)] + fn get_memory(&mut self) -> *mut u8 { self.0 } +} + +impl Drop for AddressDerivationMemory { + #[inline(always)] + fn drop(&mut self) { unsafe { dealloc(self.0, Layout::from_size_align(ADDRESS_DERIVATION_HASH_MEMORY_SIZE, 8).unwrap()) }; } +} + impl PoolFactory for AddressDerivationMemoryFactory { #[inline(always)] - fn create(&self) -> AddressDerivationMemory { AddressDerivationMemory([0_u128; ADDRESS_DERIVATION_HASH_MEMORY_SIZE / 16]) } + fn create(&self) -> AddressDerivationMemory { AddressDerivationMemory(unsafe { alloc(Layout::from_size_align(ADDRESS_DERIVATION_HASH_MEMORY_SIZE, 8).unwrap()) }) } #[inline(always)] fn reset(&self, _: &mut AddressDerivationMemory) {} @@ -678,3 +696,25 @@ lazy_static! { pub(crate) fn purge_verification_memory_pool() { unsafe { ADDRESS_DERVIATION_MEMORY_POOL.purge() }; } + +#[cfg(test)] +mod tests { + use crate::vl1::Identity; + + #[test] + fn v0() { + } + + #[test] + fn v1() { + } + + #[test] + fn generate() { + let count = 64; + for _ in 0..count { + let id = Identity::generate(); + println!("{}", id.to_secret_string()); + } + } +} diff --git a/zerotier-network-hypervisor/src/vl1/inetaddress.rs b/zerotier-network-hypervisor/src/vl1/inetaddress.rs index 546eabbae..0754683d0 100644 --- a/zerotier-network-hypervisor/src/vl1/inetaddress.rs +++ b/zerotier-network-hypervisor/src/vl1/inetaddress.rs @@ -124,7 +124,7 @@ impl InetAddress { addr.sin6.sin6_family = AF_INET6.into(); addr.sin6.sin6_port = port.to_be().into(); unsafe { - *((&mut (addr.sin6.sin6_addr) as *mut _).cast::().offset(15)) = 1; + *((&mut (addr.sin6.sin6_addr) as *mut in6_addr).cast::().offset(15)) = 1; } addr } @@ -177,12 +177,12 @@ impl InetAddress { if ip.len() == 4 { self.sin.sin_family = AF_INET.into(); self.sin.sin_port = port.into(); - copy_nonoverlapping(ip.as_ptr(), (&mut self.sin.sin_addr.s_addr as *mut _).cast::(), 4); + copy_nonoverlapping(ip.as_ptr(), (&mut self.sin.sin_addr.s_addr as *mut u32).cast::(), 4); AF_INET } else if ip.len() == 16 { self.sin6.sin6_family = AF_INET6.into(); self.sin6.sin6_port = port.into(); - copy_nonoverlapping(ip.as_ptr(), (&mut self.sin6.sin6_addr as *mut _).cast::(), 16); + copy_nonoverlapping(ip.as_ptr(), (&mut self.sin6.sin6_addr as *mut in6_addr).cast::(), 16); AF_INET6 } else { 0 @@ -195,8 +195,8 @@ impl InetAddress { pub fn ip_bytes(&self) -> &[u8] { unsafe { match self.sa.sa_family as u8 { - AF_INET => &*(&self.sin.sin_addr.s_addr as *const _).cast::<[u8; 4]>(), - AF_INET6 => &*(&self.sin6.sin6_addr as *const _).cast::<[u8; 16]>(), + AF_INET => &*(&self.sin.sin_addr.s_addr as *const u32).cast::<[u8; 4]>(), + AF_INET6 => &*(&self.sin6.sin6_addr as *const in6_addr).cast::<[u8; 16]>(), _ => &[], } } @@ -210,7 +210,7 @@ impl InetAddress { unsafe { match self.sa.sa_family as u8 { AF_INET => self.sin.sin_addr.s_addr as u128, - AF_INET6 => u128::from_ne_bytes(*(&self.sin6.sin6_addr as *const _).cast::<[u8; 16]>()), + AF_INET6 => u128::from_ne_bytes(*(&self.sin6.sin6_addr as *const in6_addr).cast::<[u8; 16]>()), _ => 0, } } @@ -323,7 +323,7 @@ impl InetAddress { } } AF_INET6 => { - let ip = &*(&(self.sin6.sin6_addr) as *const _).cast::<[u8; 16]>(); + let ip = &*(&(self.sin6.sin6_addr) as *const in6_addr).cast::<[u8; 16]>(); if (ip[0] & 0xf0) == 0xf0 { if ip[0] == 0xff { return IpScope::Multicast; // ff00::/8 @@ -366,10 +366,10 @@ impl InetAddress { unsafe { match self.sa.sa_family as u8 { AF_INET => { - let ip = &*(&self.sin.sin_addr.s_addr as *const _).cast::<[u8; 4]>(); + let ip = &*(&self.sin.sin_addr.s_addr as *const u32).cast::<[u8; 4]>(); format!("{}.{}.{}.{}", ip[0], ip[1], ip[2], ip[3]) } - AF_INET6 => Ipv6Addr::from(*(&(self.sin6.sin6_addr) as *const _).cast::<[u8; 16]>()).to_string(), + AF_INET6 => Ipv6Addr::from(*(&(self.sin6.sin6_addr) as *const in6_addr).cast::<[u8; 16]>()).to_string(), _ => String::from("(null)") } } @@ -381,17 +381,17 @@ impl InetAddress { AF_INET => { let b = buf.append_bytes_fixed_get_mut::<7>()?; b[0] = 4; - copy_nonoverlapping((&self.sin.sin_addr.s_addr as *const _).cast::(), b.as_mut_ptr().offset(1), 4); - b[5] = *(&self.sin.sin_port as *const _).cast::(); - b[6] = *(&self.sin.sin_port as *const _).cast::().offset(1); + copy_nonoverlapping((&self.sin.sin_addr.s_addr as *const u32).cast::(), b.as_mut_ptr().offset(1), 4); + b[5] = *(&self.sin.sin_port as *const u16).cast::(); + b[6] = *(&self.sin.sin_port as *const u16).cast::().offset(1); Ok(()) } AF_INET6 => { let b = buf.append_bytes_fixed_get_mut::<19>()?; b[0] = 6; - copy_nonoverlapping((&(self.sin6.sin6_addr) as *const _).cast::(), b.as_mut_ptr().offset(1), 16); - b[17] = *(&self.sin6.sin6_port as *const _).cast::(); - b[18] = *(&self.sin6.sin6_port as *const _).cast::().offset(1); + copy_nonoverlapping((&(self.sin6.sin6_addr) as *const in6_addr).cast::(), b.as_mut_ptr().offset(1), 16); + b[17] = *(&self.sin6.sin6_port as *const u16).cast::(); + b[18] = *(&self.sin6.sin6_port as *const u16).cast::().offset(1); Ok(()) } _ => buf.append_u8(0) diff --git a/zerotier-network-hypervisor/src/vl1/node.rs b/zerotier-network-hypervisor/src/vl1/node.rs index d70b3a90a..ebe613745 100644 --- a/zerotier-network-hypervisor/src/vl1/node.rs +++ b/zerotier-network-hypervisor/src/vl1/node.rs @@ -127,6 +127,7 @@ struct BackgroundTaskIntervals { } pub struct Node { + pub(crate) instance_id: u64, identity: Identity, intervals: Mutex, paths: DashMap>, @@ -163,6 +164,7 @@ impl Node { }; Ok(Self { + instance_id: next_u64_secure(), identity: id, intervals: Mutex::new(BackgroundTaskIntervals::default()), paths: DashMap::new(), diff --git a/zerotier-network-hypervisor/src/vl1/path.rs b/zerotier-network-hypervisor/src/vl1/path.rs index 53bf0a25e..2f61c27a7 100644 --- a/zerotier-network-hypervisor/src/vl1/path.rs +++ b/zerotier-network-hypervisor/src/vl1/path.rs @@ -28,9 +28,9 @@ use crate::vl1::protocol::*; pub(crate) const PATH_KEEPALIVE_INTERVAL: i64 = 20000; lazy_static! { - static mut RANDOM_64BIT_SALT_0: u64 = zerotier_core_crypto::random::next_u64_secure(); - static mut RANDOM_64BIT_SALT_1: u64 = zerotier_core_crypto::random::next_u64_secure(); - static mut RANDOM_64BIT_SALT_2: u64 = zerotier_core_crypto::random::next_u64_secure(); + static ref RANDOM_64BIT_SALT_0: u64 = zerotier_core_crypto::random::next_u64_secure(); + static ref RANDOM_64BIT_SALT_1: u64 = zerotier_core_crypto::random::next_u64_secure(); + static ref RANDOM_64BIT_SALT_2: u64 = zerotier_core_crypto::random::next_u64_secure(); } /// A remote endpoint paired with a local socket and a local interface. @@ -50,8 +50,8 @@ impl Path { /// Get a 128-bit key to look up this endpoint in the local node path map. #[inline(always)] pub(crate) fn local_lookup_key(endpoint: &Endpoint, local_socket: Option, local_interface: Option) -> u128 { - let local_socket = local_socket.map_or(0, |s| crate::util::hash64_noncrypt(RANDOM_64BIT_SALT_0 + s.get() as u64)); - let local_interface = local_interface.map_or(0, |s| crate::util::hash64_noncrypt(RANDOM_64BIT_SALT_1 + s.get() as u64)); + let local_socket = local_socket.map_or(0, |s| crate::util::hash64_noncrypt(*RANDOM_64BIT_SALT_0 + s.get() as u64)); + let local_interface = local_interface.map_or(0, |s| crate::util::hash64_noncrypt(*RANDOM_64BIT_SALT_1 + s.get() as u64)); let lsi = (local_socket as u128).wrapping_shl(64) | (local_interface as u128); match endpoint { Endpoint::Nil => 0, @@ -61,7 +61,7 @@ impl Path { Endpoint::Bluetooth(m) => (m.to_u64() | 0x0400000000000000) as u128 ^ lsi, Endpoint::Ip(ip) => ip.ip_as_native_u128().wrapping_sub(lsi), // naked IP has no port Endpoint::IpUdp(ip) => ip.ip_as_native_u128().wrapping_add(lsi), // UDP maintains one path per IP but merely learns the most recent port - Endpoint::IpTcp(ip) => ip.ip_as_native_u128().wrapping_sub(crate::util::hash64_noncrypt((ip.port() as u64).wrapping_add(RANDOM_64BIT_SALT_2)) as u128).wrapping_sub(lsi), + Endpoint::IpTcp(ip) => ip.ip_as_native_u128().wrapping_sub(crate::util::hash64_noncrypt((ip.port() as u64).wrapping_add(*RANDOM_64BIT_SALT_2)) as u128).wrapping_sub(lsi), Endpoint::Http(s) => { let mut hh = highwayhasher(); let _ = hh.write_all(s.as_bytes()); diff --git a/zerotier-network-hypervisor/src/vl1/peer.rs b/zerotier-network-hypervisor/src/vl1/peer.rs index 23c3b957f..ffa06ecbf 100644 --- a/zerotier-network-hypervisor/src/vl1/peer.rs +++ b/zerotier-network-hypervisor/src/vl1/peer.rs @@ -377,7 +377,7 @@ impl Peer { /// via a root or some other route. pub(crate) fn send(&self, ci: &CI, node: &Node, time_ticks: i64, packet: &Buffer<{ PACKET_SIZE_MAX }>) -> bool { self.path(node).map_or(false, |path| { - if self.send_to_endpoint(ci, path.endpoint(), path.local_socket(), path.local_interface(), packet) { + if self.send_to_endpoint(ci, path.endpoint().as_ref(), path.local_socket(), path.local_interface(), packet) { self.last_send_time_ticks.store(time_ticks, Ordering::Relaxed); self.total_bytes_sent.fetch_add(packet.len() as u64, Ordering::Relaxed); true @@ -396,7 +396,7 @@ impl Peer { /// Intermediates don't need to adjust fragmentation. pub(crate) fn forward(&self, ci: &CI, time_ticks: i64, packet: &Buffer<{ PACKET_SIZE_MAX }>) -> bool { self.direct_path().map_or(false, |path| { - if ci.wire_send(path.endpoint(), path.local_socket(), path.local_interface(), &[packet.as_bytes()], 0) { + if ci.wire_send(path.endpoint().as_ref(), path.local_socket(), path.local_interface(), &[packet.as_bytes()], 0) { self.last_forward_time_ticks.store(time_ticks, Ordering::Relaxed); self.total_bytes_forwarded.fetch_add(packet.len() as u64, Ordering::Relaxed); true @@ -470,7 +470,7 @@ impl Peer { explicit_endpoint.map_or_else(|| { self.path(node).map_or(false, |path| { path.log_send_anything(time_ticks); - self.send_to_endpoint(ci, path.endpoint(), path.local_socket(), path.local_interface(), &packet) + self.send_to_endpoint(ci, path.endpoint().as_ref(), path.local_socket(), path.local_interface(), &packet) }) }, |endpoint| { self.send_to_endpoint(ci, endpoint, None, None, &packet) diff --git a/zerotier-network-hypervisor/src/vl1/rootset.rs b/zerotier-network-hypervisor/src/vl1/rootset.rs index f65b04f51..73502380e 100644 --- a/zerotier-network-hypervisor/src/vl1/rootset.rs +++ b/zerotier-network-hypervisor/src/vl1/rootset.rs @@ -89,7 +89,7 @@ pub struct RootSet { impl RootSet { /// Create a new root set populated with compiled-in ZeroTier defaults. pub fn zerotier_default() -> Self { - Self::from_bytes(include_bytes!("./rootset-default.bin")).unwrap() + Self::from_bytes(include_bytes!("./rootset-default.bin")).expect("invalid compiled-in default root set") } /// Create and sign a new root set. @@ -331,6 +331,7 @@ impl RootSet { mod tests { use crate::vl1::rootset::RootSet; + /* #[test] fn default_root_set() { let rs = RootSet::zerotier_default(); @@ -341,4 +342,5 @@ mod tests { }); }); } + */ } diff --git a/zerotier-network-hypervisor/src/vl1/symmetricsecret.rs b/zerotier-network-hypervisor/src/vl1/symmetricsecret.rs index f5a325607..5cea99196 100644 --- a/zerotier-network-hypervisor/src/vl1/symmetricsecret.rs +++ b/zerotier-network-hypervisor/src/vl1/symmetricsecret.rs @@ -8,7 +8,7 @@ use parking_lot::Mutex; -use zerotier_core_crypto::aes_gmac_siv::{AesCtr, AesGmacSiv}; +use zerotier_core_crypto::aes_gmac_siv::AesGmacSiv; use zerotier_core_crypto::hash::SHA384_HASH_SIZE; use zerotier_core_crypto::kbkdf::zt_kbkdf_hmac_sha384; use zerotier_core_crypto::secret::Secret;