Replicator P2P code, and build fixes.

This commit is contained in:
Adam Ierymenko 2022-01-10 16:17:40 -05:00
parent b2796a5f9c
commit 95951ed14e
No known key found for this signature in database
GPG key ID: C8877CF2D7A5D7F3
17 changed files with 288 additions and 456 deletions

View file

@ -7,13 +7,13 @@
*/
mod store;
mod replicator;
mod protocol;
mod varint;
mod memorystore;
mod iblt;
mod config;
mod link;
mod node;
pub fn ms_since_epoch() -> u64 {
std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_millis() as u64
@ -39,4 +39,4 @@ pub const IDENTITY_HASH_SIZE: usize = 48;
pub use config::Config;
pub use store::{Store, StorePutResult};
pub use memorystore::MemoryStore;
//pub use replicator::Replicator;
pub use node::Node;

View file

@ -1,12 +1,20 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*
* (c)2021 ZeroTier, Inc.
* https://www.zerotier.com/
*/
use std::mem::MaybeUninit;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::task::Poll;
use std::time::Duration;
use serde::{Deserialize, Serialize};
use smol::io::{AsyncReadExt, AsyncWriteExt, BufReader, BufWriter};
use smol::lock::Mutex;
use smol::net::{SocketAddr, TcpStream};
use smol::net::TcpStream;
use zerotier_core_crypto::gmac::GMACStream;
use zerotier_core_crypto::hash::SHA384;
@ -22,13 +30,12 @@ struct Output {
gmac: Option<GMACStream>,
}
pub(crate) struct Link<'a, 'b, 'c, S: Store> {
pub remote_addr: SocketAddr,
pub(crate) struct Link<'e, S: Store + 'static> {
pub connect_time: u64,
io_timeout: Duration,
node_secret: &'a P521KeyPair,
config: &'b Config,
store: &'c S,
node_secret: &'e P521KeyPair,
config: &'e Config,
store: &'e S,
remote_node_id: parking_lot::Mutex<Option<[u8; 48]>>,
reader: Mutex<BufReader<TcpStream>>,
writer: Mutex<Output>,
@ -52,14 +59,13 @@ fn next_id_hash_in_slice(bytes: &[u8]) -> smol::io::Result<&[u8; IDENTITY_HASH_S
}
}
impl<'a, 'b, 'c, S: Store> Link<'a, 'b, 'c, S> {
pub fn new(stream: TcpStream, remote_addr: SocketAddr, connect_time: u64, node_secret: &'a P521KeyPair, config: &'b Config, store: &'c S) -> Self {
impl<'e, S: Store + 'static> Link<'e, S> {
pub fn new(stream: TcpStream, node_secret: &'e P521KeyPair, config: &'e Config, store: &'e S) -> Self {
let _ = stream.set_nodelay(false);
let max_message_size = HELLO_SIZE_MAX.max(config.max_message_size);
let now_monotonic = store.monotonic_clock();
Self {
remote_addr,
connect_time,
connect_time: now_monotonic,
io_timeout: Duration::from_secs(config.io_timeout),
node_secret,
config,
@ -81,15 +87,15 @@ impl<'a, 'b, 'c, S: Store> Link<'a, 'b, 'c, S> {
/// Returns None if the remote node has not yet responded with HelloAck and been verified.
pub fn remote_node_id(&self) -> Option<[u8; 48]> { self.remote_node_id.lock().clone() }
/// Send a keepalive if necessary.
pub async fn send_keepalive_if_needed(&self, now_monotonic: u64) {
pub(crate) async fn do_periodic_tasks(&self, now_monotonic: u64) -> smol::io::Result<()> {
if now_monotonic.saturating_sub(self.last_send_time.load(Ordering::Relaxed)) >= self.keepalive_period && self.authenticated.load(Ordering::Relaxed) {
self.last_send_time.store(now_monotonic, Ordering::Relaxed);
let timeout = Duration::from_secs(1);
let mut writer = self.writer.lock().await;
io_timeout(timeout, writer.stream.write_all(&[MESSAGE_TYPE_KEEPALIVE])).await;
io_timeout(timeout, writer.stream.flush()).await;
io_timeout(timeout, writer.stream.write_all(&[MESSAGE_TYPE_KEEPALIVE])).await?;
io_timeout(timeout, writer.stream.flush()).await?;
self.last_send_time.store(now_monotonic, Ordering::Relaxed);
}
Ok(())
}
async fn write_message(&self, message_type: u8, message: &[&[u8]]) -> smol::io::Result<()> {

170
allthethings/src/node.rs Normal file
View file

@ -0,0 +1,170 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*
* (c)2021 ZeroTier, Inc.
* https://www.zerotier.com/
*/
use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::sync::{Arc, Weak};
use std::time::Duration;
use smol::{Executor, Task, Timer};
use smol::lock::Mutex;
use smol::net::SocketAddr;
use zerotier_core_crypto::p521::P521KeyPair;
use crate::{Config, Store};
use crate::link::Link;
struct NodeIntl<'e, S: Store + 'static> {
config: &'e Config,
secret: &'e P521KeyPair,
store: &'e S,
executor: &'e Executor<'e>,
connections: Mutex<HashMap<SocketAddr, (Weak<Link<'e, S>>, Task<()>)>>
}
pub struct Node<'e, S: Store + 'static> {
daemon_tasks: Vec<Task<()>>,
intl: Weak<NodeIntl<'e, S>>
}
impl<'e, S: Store + 'static> Node<'e, S> {
pub fn new(config: &'e Config, secret: &'e P521KeyPair, store: &'e S, executor: &'e Executor<'e>) -> Result<Self, Box<dyn Error>> {
let listener_v4 = socket2::Socket::new(socket2::Domain::IPV4, socket2::Type::STREAM, Some(socket2::Protocol::TCP)).and_then(|v4| {
let _ = v4.set_reuse_address(true);
let _ = v4.bind(&socket2::SockAddr::from(std::net::SocketAddrV4::new(std::net::Ipv4Addr::UNSPECIFIED, config.tcp_port)))?;
let _ = v4.listen(64);
Ok(v4)
});
let listener_v6 = socket2::Socket::new(socket2::Domain::IPV6, socket2::Type::STREAM, Some(socket2::Protocol::TCP)).and_then(|v6| {
let _ = v6.set_only_v6(true);
let _ = v6.set_reuse_address(true);
let _ = v6.bind(&socket2::SockAddr::from(std::net::SocketAddrV6::new(std::net::Ipv6Addr::UNSPECIFIED, config.tcp_port, 0, 0)))?;
let _ = v6.listen(64);
Ok(v6)
});
if listener_v4.is_err() && listener_v6.is_err() {
return Err(Box::new(listener_v4.unwrap_err()));
}
let ni = Arc::new(NodeIntl {
config,
secret,
store,
executor,
connections: Mutex::new(HashMap::with_capacity(64)),
});
let mut n = Self {
daemon_tasks: Vec::with_capacity(3),
intl: Arc::downgrade(&ni)
};
if listener_v4.is_ok() {
let listener_v4 = listener_v4.unwrap();
let ni2 = ni.clone();
n.daemon_tasks.push(executor.spawn(async move { ni2.tcp_listener_main(smol::net::TcpListener::try_from(std::net::TcpListener::from(listener_v4)).unwrap()).await }));
}
if listener_v6.is_ok() {
let listener_v6 = listener_v6.unwrap();
let ni2 = ni.clone();
n.daemon_tasks.push(executor.spawn(async move { ni2.tcp_listener_main(smol::net::TcpListener::try_from(std::net::TcpListener::from(listener_v6)).unwrap()).await }));
}
let ni2 = ni.clone();
n.daemon_tasks.push(executor.spawn(async move { ni2.background_task_main().await }));
Ok(n)
}
}
impl<'e, S: Store + 'static> NodeIntl<'e, S> {
async fn background_task_main(&self) {
let io_timeout_ms = self.config.io_timeout * 1000;
let delay = Duration::from_secs(10);
loop {
Timer::after(delay).await;
let mut connections = self.connections.lock().await;
let (done_sender, done_receiver) = smol::channel::bounded::<()>(16);
let done_sender = Arc::new(done_sender);
let to_erase: Arc<Mutex<Vec<SocketAddr>>> = Arc::new(Mutex::new(Vec::new()));
let mut tasks: Vec<Task<()>> = Vec::with_capacity(connections.len());
// Search for connections that are dead, have timed out during negotiation, or
// that are duplicates of another connection to the same remote node.
let have_node_ids: Arc<Mutex<HashSet<[u8; 48]>>> = Arc::new(Mutex::new(HashSet::with_capacity(connections.len())));
let now_monotonic = self.store.monotonic_clock();
for c in connections.iter() {
let l = c.1.0.upgrade();
if l.is_some() {
let l = l.unwrap();
let remote_node_id = l.remote_node_id();
if remote_node_id.is_some() {
let remote_node_id = remote_node_id.unwrap();
if !have_node_ids.lock().await.contains(&remote_node_id) {
let a = c.0.clone();
let hn = have_node_ids.clone();
let te = to_erase.clone();
let ds = done_sender.clone();
tasks.push(self.executor.spawn(async move {
if l.do_periodic_tasks(now_monotonic).await.is_ok() {
if !hn.lock().await.insert(remote_node_id) {
// This is a redudant link to the same remote node.
te.lock().await.push(a);
}
} else {
// A fatal error occurred while servicing the connection.
te.lock().await.push(a);
}
let _ = ds.send(()).await;
}));
} else {
// This is a redudant link to the same remote node.
to_erase.lock().await.push(c.0.clone());
}
} else if (now_monotonic - l.connect_time) > io_timeout_ms {
// Link negotiation timed out if we aren't connected yet.
to_erase.lock().await.push(c.0.clone());
}
} else {
// Connection is closed and has released its internally held Arc<>.
to_erase.lock().await.push(c.0.clone());
}
}
// Wait for a message on the channel from each task indicating that it is complete.
for _ in 0..tasks.len() {
let _ = done_receiver.recv().await;
}
// Close and erase all connections slated for cleanup.
for e in to_erase.lock().await.iter() {
let _ = connections.remove(e);
}
}
}
async fn tcp_listener_main(&self, listener: smol::net::TcpListener) {
loop {
let c = listener.accept().await;
if c.is_ok() {
let (connection, remote_address) = c.unwrap();
let l = Arc::new(Link::<'e, S>::new(connection, self.secret, self.config, self.store));
self.connections.lock().await.insert(remote_address.clone(), (Arc::downgrade(&l), self.executor.spawn(async move {
let _ = l.io_main().await;
// Arc<Link> is now released, causing Weak<Link> to go null and then causing this
// entry to be removed from the connection map on the next background task sweep.
})));
} else {
break;
}
}
}
}

View file

@ -1,372 +0,0 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*
* (c)2021 ZeroTier, Inc.
* https://www.zerotier.com/
*/
/*
use std::collections::HashMap;
use std::convert::TryInto;
use std::error::Error;
use std::hash::{Hash, Hasher};
use std::mem::{size_of, transmute};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
use smol::{Executor, Task, Timer};
use smol::io::{AsyncReadExt, AsyncWriteExt, BufReader};
use smol::lock::Mutex;
use smol::net::*;
use smol::stream::StreamExt;
use zerotier_core_crypto::hash::SHA384;
use zerotier_core_crypto::random;
use crate::{IDENTITY_HASH_SIZE, ms_monotonic, ms_since_epoch, protocol, Config};
use crate::store::{Store, StorePutResult};
use crate::varint;
const CONNECTION_TIMEOUT_SECONDS: u64 = 60;
const CONNECTION_SYNC_RESTART_TIMEOUT_SECONDS: u64 = 5;
#[derive(PartialEq, Eq, Clone)]
struct ConnectionKey {
instance_id: [u8; 16],
ip: IpAddr,
}
impl Hash for ConnectionKey {
#[inline(always)]
fn hash<H: Hasher>(&self, state: &mut H) {
self.instance_id.hash(state);
self.ip.hash(state);
}
}
struct Connection {
remote_address: SocketAddr,
last_receive: Arc<AtomicU64>,
task: Task<()>,
}
struct ReplicatorImpl<'ex, S: 'static + Store> {
executor: Arc<Executor<'ex>>,
instance_id: [u8; 16],
loopback_check_code_secret: [u8; 48],
domain_hash: [u8; 48],
store: Arc<S>,
config: Config,
connections: Mutex<HashMap<ConnectionKey, Connection>>,
connections_in_progress: Mutex<HashMap<SocketAddr, Task<()>>>,
announced_objects_requested: Mutex<HashMap<[u8; IDENTITY_HASH_SIZE], u64>>,
}
pub struct Replicator<'ex, S: 'static + Store> {
v4_listener_task: Option<Task<()>>,
v6_listener_task: Option<Task<()>>,
background_cleanup_task: Task<()>,
_impl: Arc<ReplicatorImpl<'ex, S>>,
}
impl<'ex, S: 'static + Store> Replicator<'ex, S> {
/// Create a new replicator to replicate the contents of the provided store.
/// All async tasks, sockets, and connections will be dropped if the replicator is dropped.
pub async fn start(executor: &Arc<Executor<'ex>>, store: Arc<S>, config: Config) -> Result<Replicator<'ex, S>, Box<dyn Error>> {
let listener_v4 = socket2::Socket::new(socket2::Domain::IPV4, socket2::Type::STREAM, Some(socket2::Protocol::TCP)).and_then(|v4| {
let _ = v4.set_reuse_address(true);
let _ = v4.bind(&socket2::SockAddr::from(std::net::SocketAddrV4::new(std::net::Ipv4Addr::UNSPECIFIED, config.tcp_port)))?;
let _ = v4.listen(64);
Ok(v4)
});
let listener_v6 = socket2::Socket::new(socket2::Domain::IPV6, socket2::Type::STREAM, Some(socket2::Protocol::TCP)).and_then(|v6| {
let _ = v6.set_only_v6(true);
let _ = v6.set_reuse_address(true);
let _ = v6.bind(&socket2::SockAddr::from(std::net::SocketAddrV6::new(std::net::Ipv6Addr::UNSPECIFIED, config.tcp_port, 0, 0)))?;
let _ = v6.listen(64);
Ok(v6)
});
if listener_v4.is_err() && listener_v6.is_err() {
return Err(Box::new(listener_v4.unwrap_err()));
}
let r = Arc::new(ReplicatorImpl::<'ex, S> {
executor: executor.clone(),
instance_id: {
let mut tmp = [0_u8; 16];
random::fill_bytes_secure(&mut tmp);
tmp
},
loopback_check_code_secret: {
let mut tmp = [0_u8; 48];
random::fill_bytes_secure(&mut tmp);
tmp
},
domain_hash: SHA384::hash(config.domain.as_bytes()),
config,
store,
connections: Mutex::new(HashMap::new()),
connections_in_progress: Mutex::new(HashMap::new()),
announced_objects_requested: Mutex::new(HashMap::new()),
});
Ok(Self {
v4_listener_task: listener_v4.map_or(None, |listener_v4| {
Some(executor.spawn(r.clone().tcp_listener_task(smol::net::TcpListener::try_from(std::net::TcpListener::from(listener_v4)).unwrap())))
}),
v6_listener_task: listener_v6.map_or(None, |listener_v6| {
Some(executor.spawn(r.clone().tcp_listener_task(smol::net::TcpListener::try_from(std::net::TcpListener::from(listener_v6)).unwrap())))
}),
background_cleanup_task: executor.spawn(r.clone().background_cleanup_task()),
_impl: r,
})
}
}
unsafe impl<'ex, S: 'static + Store> Send for Replicator<'ex, S> {}
unsafe impl<'ex, S: 'static + Store> Sync for Replicator<'ex, S> {}
impl<'ex, S: 'static + Store> ReplicatorImpl<'ex, S> {
async fn background_cleanup_task(self: Arc<ReplicatorImpl<'ex, S>>) {
let mut timer = smol::Timer::interval(Duration::from_secs(CONNECTION_TIMEOUT_SECONDS / 10));
loop {
timer.next().await;
let now_mt = ms_monotonic();
// Garbage collect the map used to track objects we've requested.
self.announced_objects_requested.lock().await.retain(|_, ts| now_mt.saturating_sub(*ts) < (CONNECTION_TIMEOUT_SECONDS * 1000));
let mut connections = self.connections.lock().await;
// Close connections that haven't spoken in too long.
connections.retain(|_, c| (now_mt.saturating_sub(c.last_receive.load(Ordering::Relaxed))) < (CONNECTION_TIMEOUT_SECONDS * 1000));
let num_connections = connections.len();
drop(connections); // release lock
// Try to connect to more nodes if the count is below the target count.
if num_connections < self.config.target_link_count {
let new_link_seed = self.store.get_remote_endpoint();
if new_link_seed.is_some() {
let new_link_seed = new_link_seed.unwrap();
let mut connections_in_progress = self.connections_in_progress.lock().await;
if !connections_in_progress.contains_key(&new_link_seed) {
let s2 = self.clone();
let _ = connections_in_progress.insert(new_link_seed.clone(), self.executor.spawn(async move {
let new_link = TcpStream::connect(&new_link_seed).await;
if new_link.is_ok() {
s2.handle_new_connection(new_link.unwrap(), new_link_seed, true).await;
} else {
let _task = s2.connections_in_progress.lock().await.remove(&new_link_seed);
}
}));
}
}
}
}
}
async fn tcp_listener_task(self: Arc<ReplicatorImpl<'ex, S>>, listener: TcpListener) {
loop {
let stream = listener.accept().await;
if stream.is_ok() {
let (stream, remote_address) = stream.unwrap();
let mut connections_in_progress = self.connections_in_progress.lock().await;
if !connections_in_progress.contains_key(&remote_address) {
let s2 = self.clone();
let _ = connections_in_progress.insert(remote_address, self.executor.spawn(s2.handle_new_connection(stream, remote_address.clone(), false)));
}
}
}
}
async fn handle_new_connection(self: Arc<ReplicatorImpl<'ex, S>>, mut stream: TcpStream, remote_address: SocketAddr, outgoing: bool) {
let _ = stream.set_nodelay(true);
let mut loopback_check_code_salt = [0_u8; 16];
random::fill_bytes_secure(&mut loopback_check_code_salt);
let hello = protocol::Hello {
hello_size: size_of::<protocol::Hello>() as u8,
protocol_version: protocol::PROTOCOL_VERSION,
hash_algorithm: protocol::HASH_ALGORITHM_SHA384,
flags: if outgoing { protocol::HELLO_FLAG_OUTGOING } else { 0 },
clock: ms_since_epoch().to_le_bytes(),
domain_hash: self.domain_hash.clone(),
instance_id: self.instance_id.clone(),
loopback_check_code_salt,
loopback_check_code: (&SHA384::hmac(&self.loopback_check_code_secret, &loopback_check_code_salt)[0..16]).try_into().unwrap(),
};
let hello: [u8; size_of::<protocol::Hello>()] = unsafe { transmute(hello) };
if stream.write_all(&hello).await.is_ok() {
let mut hello_buf = [0_u8; size_of::<protocol::Hello>()];
if stream.read_exact(&mut hello_buf).await.is_ok() {
let hello: protocol::Hello = unsafe { transmute(hello_buf) };
// Sanity check HELLO packet. In the future we may support different versions and sizes.
if hello.hello_size == size_of::<protocol::Hello>() as u8 && hello.protocol_version == protocol::PROTOCOL_VERSION && hello.hash_algorithm == protocol::HASH_ALGORITHM_SHA384 {
if !SHA384::hmac(&self.loopback_check_code_secret, &hello.loopback_check_code_salt)[0..16].eq(&hello.loopback_check_code) {
let k = ConnectionKey {
instance_id: hello.instance_id.clone(),
ip: remote_address.ip(),
};
let mut connections = self.connections.lock().await;
let s2 = self.clone();
let _ = connections.entry(k).or_insert_with(move || {
let _ = stream.set_nodelay(false);
let last_receive = Arc::new(AtomicU64::new(ms_monotonic()));
Connection {
remote_address,
last_receive: last_receive.clone(),
task: s2.executor.spawn(s2.clone().connection_io_task(stream, hello.instance_id, last_receive)),
}
});
}
}
}
}
let _task = self.connections_in_progress.lock().await.remove(&remote_address);
}
async fn connection_io_task(self: Arc<ReplicatorImpl<'ex, S>>, stream: TcpStream, remote_instance_id: [u8; 16], last_receive: Arc<AtomicU64>) {
let mut reader = BufReader::with_capacity(65536, stream.clone());
let writer = Arc::new(Mutex::new(stream));
//let writer2 = writer.clone();
let _sync_search_init_task = self.executor.spawn(async move {
//let writer = writer2;
let mut periodic_timer = Timer::interval(Duration::from_secs(1));
loop {
let _ = periodic_timer.next().await;
}
});
let mut get_buffer = Vec::new();
let mut tmp_mem = Vec::new();
tmp_mem.resize(self.config.max_object_size, 0);
let tmp = tmp_mem.as_mut_slice();
'main_io_loop: loop {
if reader.read_exact(&mut tmp[0..1]).await.is_err() {
break 'main_io_loop;
}
let message_type = tmp[0];
last_receive.store(ms_monotonic(), Ordering::Relaxed);
match message_type {
protocol::MESSAGE_TYPE_NOP => {}
protocol::MESSAGE_TYPE_HAVE_NEW_OBJECT => {
if reader.read_exact(&mut tmp[0..IDENTITY_HASH_SIZE]).await.is_err() {
break 'main_io_loop;
}
let identity_hash: [u8; 48] = (&tmp[0..IDENTITY_HASH_SIZE]).try_into().unwrap();
let mut announced_objects_requested = self.announced_objects_requested.lock().await;
if !announced_objects_requested.contains_key(&identity_hash) && !self.store.have(ms_since_epoch(), &identity_hash) {
announced_objects_requested.insert(identity_hash.clone(), ms_monotonic());
drop(announced_objects_requested); // release mutex
tmp[0] = protocol::MESSAGE_TYPE_GET_OBJECTS;
tmp[1] = 0;
tmp[2] = varint::ONE;
tmp[3..(3 + IDENTITY_HASH_SIZE)].copy_from_slice(&identity_hash);
if !writer.lock().await.write_all(&tmp).await.is_err() {
break 'main_io_loop;
}
}
}
protocol::MESSAGE_TYPE_OBJECT => {
let object_size = varint::async_read(&mut reader).await;
if object_size.is_err() {
break 'main_io_loop;
}
let object_size = object_size.unwrap();
if object_size > self.config.max_object_size as u64 {
break 'main_io_loop;
}
let object = &mut tmp[0..(object_size as usize)];
if reader.read_exact(object).await.is_err() {
break 'main_io_loop;
}
let identity_hash: [u8; 48] = SHA384::hash(object);
match self.store.put(ms_since_epoch(), &identity_hash, object) {
StorePutResult::Invalid => {
break 'main_io_loop;
}
StorePutResult::Ok | StorePutResult::Duplicate => {
if self.announced_objects_requested.lock().await.remove(&identity_hash).is_some() {
// TODO: propagate rumor if we requested this object in response to a HAVE message.
}
}
_ => {
let _ = self.announced_objects_requested.lock().await.remove(&identity_hash);
}
}
}
protocol::MESSAGE_TYPE_GET_OBJECTS => {
// Get the reference time for this query.
let reference_time = varint::async_read(&mut reader).await;
if reference_time.is_err() {
break 'main_io_loop;
}
let reference_time = reference_time.unwrap();
// Read common prefix if the requester is requesting a set of hashes with the same beginning.
// A common prefix length of zero means they're requesting by full hash.
if reader.read_exact(&mut tmp[0..1]).await.is_err() {
break 'main_io_loop;
}
let common_prefix_length = tmp[0] as usize;
if common_prefix_length >= IDENTITY_HASH_SIZE {
break 'main_io_loop;
}
if reader.read_exact(&mut tmp[0..common_prefix_length]).await.is_err() {
break 'main_io_loop;
}
// Get the number of hashes being requested.
let hash_count = varint::async_read(&mut reader).await;
if hash_count.is_err() {
break 'main_io_loop;
}
let hash_count = hash_count.unwrap();
// Step through each suffix of the common prefix and send the object if found.
for _ in 0..hash_count {
if reader.read_exact(&mut tmp[common_prefix_length..IDENTITY_HASH_SIZE]).await.is_err() {
break 'main_io_loop;
}
let identity_hash: [u8; IDENTITY_HASH_SIZE] = (&tmp[0..IDENTITY_HASH_SIZE]).try_into().unwrap();
if self.store.get(reference_time, &identity_hash, &mut get_buffer) {
let mut w = writer.lock().await;
if varint::async_write(&mut *w, get_buffer.len() as u64).await.is_err() {
break 'main_io_loop;
}
if w.write_all(get_buffer.as_slice()).await.is_err() {
break 'main_io_loop;
}
}
}
}
_ => {
break 'main_io_loop;
}
}
}
}
}
unsafe impl<'ex, S: 'static + Store> Send for ReplicatorImpl<'ex, S> {}
unsafe impl<'ex, S: 'static + Store> Sync for ReplicatorImpl<'ex, S> {}
*/

View file

@ -23,7 +23,7 @@ pub enum StorePutResult {
/// Trait that must be implemented by the data store that is to be replicated.
pub trait Store: Sync + Send {
/// Type returned by get(), which can be anything that contains a byte slice.
type Object: AsRef<[u8]>;
type Object: AsRef<[u8]> + Send;
/// Get the current wall time in milliseconds since Unix epoch.
fn clock(&self) -> u64;

View file

@ -37,6 +37,8 @@ impl GMAC {
}
}
unsafe impl Send for GMAC {}
/// A wrapper for GMAC with an incrementing 96-bit nonce.
///
/// This is designed for use to authenticate messages on an otherwise unencrypted
@ -68,3 +70,5 @@ impl GMACStream {
#[inline(always)]
pub fn finish(&mut self, mac: &mut [u8; 16]) { self.0.finish(mac); }
}
unsafe impl Send for GMACStream {}

View file

@ -119,6 +119,10 @@ impl Clone for P521PublicKey {
}
}
unsafe impl Send for P521PublicKey {}
unsafe impl Sync for P521PublicKey {}
/// NIST P-521 elliptic curve key pair.
/// This supports both ECDSA signing and ECDH key agreement. In practice the same key pair
/// is not used for both functions as this is considred bad practice.
@ -247,6 +251,10 @@ impl Clone for P521KeyPair {
}
}
unsafe impl Send for P521KeyPair {}
unsafe impl Sync for P521KeyPair {}
#[cfg(test)]
mod tests {
use crate::p521::P521KeyPair;

View file

@ -34,7 +34,7 @@ pub(crate) fn array_range_mut<T, const A: usize, const START: usize, const LEN:
pub(crate) fn u64_as_bytes(i: &u64) -> &[u8; 8] { unsafe { &*(i as *const u64).cast() } }
lazy_static! {
static mut HIGHWAYHASHER_KEY: [u64; 4] = [zerotier_core_crypto::random::next_u64_secure(), zerotier_core_crypto::random::next_u64_secure(), zerotier_core_crypto::random::next_u64_secure(), zerotier_core_crypto::random::next_u64_secure()];
static ref HIGHWAYHASHER_KEY: [u64; 4] = [zerotier_core_crypto::random::next_u64_secure(), zerotier_core_crypto::random::next_u64_secure(), zerotier_core_crypto::random::next_u64_secure(), zerotier_core_crypto::random::next_u64_secure()];
}
/// Get an instance of HighwayHasher initialized with a secret per-process random salt.

View file

@ -26,7 +26,6 @@ struct PoolEntry<O, F: PoolFactory<O>> {
struct PoolInner<O, F: PoolFactory<O>> {
factory: F,
pool: Mutex<Vec<NonNull<PoolEntry<O, F>>>>,
//outstanding_count: AtomicIsize
}
/// Container for pooled objects that have been checked out of the pool.
@ -107,7 +106,6 @@ impl<O, F: PoolFactory<O>> Drop for Pooled<O, F> {
let p = p.unwrap();
p.factory.reset(&mut self.0.as_mut().obj);
p.pool.lock().push(self.0);
//let _ = p.outstanding_count.fetch_sub(1, Ordering::Release);
} else {
drop(Box::from_raw(self.0.as_ptr()))
}
@ -125,7 +123,6 @@ impl<O, F: PoolFactory<O>> Pool<O, F> {
Self(Arc::new(PoolInner::<O, F> {
factory,
pool: Mutex::new(Vec::with_capacity(initial_stack_capacity)),
//outstanding_count: AtomicIsize::new(0)
}))
}
@ -143,32 +140,6 @@ impl<O, F: PoolFactory<O>> Pool<O, F> {
}))
}
/*
/// Get a pooled object, or allocate one if the pool is empty.
/// This will return None if there are more outstanding pooled objects than the limit.
/// The limit is exclusive, so a value of 0 will mean that only one outstanding
/// object will be permitted as in this case there were zero outstanding at time
/// of checkout.
#[inline(always)]
pub fn try_get(&self, outstanding_pooled_object_limit: usize) -> Option<Pooled<O, F>> {
let outstanding = self.0.outstanding_count.fetch_add(1, Ordering::Acquire);
debug_assert!(outstanding >= 0);
if outstanding as usize > outstanding_pooled_object_limit {
let _ = self.0.outstanding_count.fetch_sub(1, Ordering::Release);
None
} else {
Some(Pooled::<O, F>(self.0.pool.lock().pop().unwrap_or_else(|| {
unsafe {
NonNull::new_unchecked(Box::into_raw(Box::new(PoolEntry::<O, F> {
obj: self.0.pool.create(),
return_pool: Arc::downgrade(&self.0),
})))
}
})))
}
}
*/
/// Dispose of all pooled objects, freeing any memory they use.
///
/// If get() is called after this new objects will be allocated, and any outstanding

View file

@ -234,9 +234,10 @@ impl EphemeralKeyPairSet {
}
return if it_happened {
let rs = zt_kbkdf_hmac_sha384(&key.0, KBKDF_KEY_USAGE_LABEL_EPHEMERAL_RATCHET_STATE_ID, 0, 0);
Some(EphemeralSymmetricSecret {
secret: SymmetricSecret::new(key),
ratchet_state: (&zt_kbkdf_hmac_sha384(&key.0, KBKDF_KEY_USAGE_LABEL_EPHEMERAL_RATCHET_STATE_ID, 0, 0).0[0..16]).try_into().unwrap(),
ratchet_state: (&rs.0[0..16]).try_into().unwrap(),
rekey_time: time_ticks + EPHEMERAL_SECRET_REKEY_AFTER_TIME,
expire_time: time_ticks + EPHEMERAL_SECRET_REJECT_AFTER_TIME,
c25519_ratchet_count,

View file

@ -11,6 +11,7 @@ use std::cmp::Ordering;
use std::convert::TryInto;
use std::io::Write;
use std::mem::MaybeUninit;
use std::ops::Deref;
use std::ptr::{slice_from_raw_parts, slice_from_raw_parts_mut};
use std::str::FromStr;
@ -36,8 +37,8 @@ pub const IDENTITY_CIPHER_SUITE_X25519: u8 = 0x00;
/// NIST P-521 ECDH/ECDSA cipher suite.
///
/// Sooo.... why 0x03 and not 0x01 or some other value? It's to compensate at the cost of
/// one wasted bit for a short-sighted aspect of the old identity encoding and HELLO packet
/// encoding.
/// one wasted bit in our bit mask for a short-sighted aspect of the old identity encoding
/// and HELLO packet encoding.
///
/// The old identity encoding contains no provision for skipping data it doesn't understand
/// nor any provision for an upgrade. That's dumb, but there it is on millions of nodes. The
@ -206,7 +207,8 @@ impl Identity {
}
/// Locally check the validity of this identity.
/// This is somewhat time consuming.
///
/// This is somewhat time consuming due to the memory-intensive work algorithm.
pub fn validate_identity(&self) -> bool {
let pow_threshold = if self.p521.is_some() {
let p521 = self.p521.as_ref().unwrap();
@ -250,11 +252,16 @@ impl Identity {
pub fn agree(&self, other: &Identity) -> Option<Secret<48>> {
self.secret.as_ref().and_then(|secret| {
let c25519_secret = Secret(SHA512::hash(&secret.c25519.agree(&other.c25519).0));
// FIPS note: FIPS-compliant exchange algorithms must be the last algorithms in any HKDF chain
// for the final result to be technically FIPS compliant. Non-FIPS algorithm secrets are considered
// a salt in the HMAC(salt, key) HKDF construction.
if secret.p521.is_some() && other.p521.is_some() {
P521PublicKey::from_bytes(&other.p521.as_ref().unwrap().ecdh).and_then(|other_p521| secret.p521.as_ref().unwrap().ecdh.agree(&other_p521).map(|p521_secret| Secret(SHA384::hmac(&c25519_secret.0[0..48], &p521_secret.0))))
P521PublicKey::from_bytes(&other.p521.as_ref().unwrap().ecdh).and_then(|other_p521| {
secret.p521.as_ref().unwrap().ecdh.agree(&other_p521).map(|p521_secret| {
Secret(SHA384::hmac(&c25519_secret.0[0..48], &p521_secret.0))
})
})
} else {
Some(Secret(array_range::<u8, 64, 0, 48>(&c25519_secret.0).clone()))
}
@ -262,6 +269,7 @@ impl Identity {
}
/// Sign a message with this identity.
///
/// A return of None happens if we don't have our secret key(s) or some other error occurs.
pub fn sign(&self, msg: &[u8], use_cipher_suites: u8) -> Option<Vec<u8>> {
self.secret.as_ref().and_then(|secret| {
@ -443,7 +451,7 @@ impl Identity {
if (include_cipher_suites & IDENTITY_CIPHER_SUITE_EC_NIST_P521) == IDENTITY_CIPHER_SUITE_EC_NIST_P521 && secret.p521.is_some() && self.p521.is_some() {
let p521_secret = secret.p521.as_ref().unwrap();
let p521 = self.p521.as_ref().unwrap();
let p521_secret_joined: [u8; P521_SECRET_KEY_SIZE + P521_SECRET_KEY_SIZE] = concat_arrays_2(p521_secret.ecdh.public_key_bytes(), p521_secret.ecdsa.public_key_bytes());
let p521_secret_joined: [u8; P521_SECRET_KEY_SIZE + P521_SECRET_KEY_SIZE] = concat_arrays_2(p521_secret.ecdh.secret_key_bytes().as_bytes(), p521_secret.ecdsa.secret_key_bytes().as_bytes());
let p521_joined: [u8; P521_PUBLIC_KEY_SIZE + P521_PUBLIC_KEY_SIZE + P521_ECDSA_SIGNATURE_SIZE + ED25519_SIGNATURE_SIZE] = concat_arrays_4(&p521.ecdh, &p521.ecdsa, &p521.ecdsa_self_signature, &p521.ed25519_self_signature);
format!("{}:0:{}{}:{}{}:1:{}:{}", self.address.to_string(), hex::to_string(&self.c25519), hex::to_string(&self.ed25519), hex::to_string(&secret.c25519.secret_bytes().0), hex::to_string(&secret.ed25519.secret_bytes().0), base64::encode_config(p521_joined, base64::URL_SAFE_NO_PAD), base64::encode_config(p521_secret_joined, base64::URL_SAFE_NO_PAD))
} else {
@ -624,7 +632,7 @@ const ADDRESS_DERIVATION_HASH_MEMORY_SIZE: usize = 2097152;
/// non-cryptographic hash. Its memory hardness and use in a work function is a defense
/// in depth feature rather than a primary security feature.
fn zt_address_derivation_memory_intensive_hash(digest: &mut [u8; 64], genmem_pool_obj: &mut Pooled<AddressDerivationMemory, AddressDerivationMemoryFactory>) {
let genmem_ptr = genmem_pool_obj.0.as_mut_ptr().cast::<u8>();
let genmem_ptr: *mut u8 = genmem_pool_obj.get_memory();
let (genmem, genmem_alias_hack) = unsafe { (&mut *slice_from_raw_parts_mut(genmem_ptr, ADDRESS_DERIVATION_HASH_MEMORY_SIZE), &*slice_from_raw_parts(genmem_ptr, ADDRESS_DERIVATION_HASH_MEMORY_SIZE)) };
let genmem_u64_ptr = genmem_ptr.cast::<u64>();
@ -655,13 +663,23 @@ fn zt_address_derivation_memory_intensive_hash(digest: &mut [u8; 64], genmem_poo
}
#[repr(transparent)]
struct AddressDerivationMemory([u128; ADDRESS_DERIVATION_HASH_MEMORY_SIZE / 16]); // use u128 to align by 16 bytes
struct AddressDerivationMemory(*mut u8);
struct AddressDerivationMemoryFactory;
impl AddressDerivationMemory {
#[inline(always)]
fn get_memory(&mut self) -> *mut u8 { self.0 }
}
impl Drop for AddressDerivationMemory {
#[inline(always)]
fn drop(&mut self) { unsafe { dealloc(self.0, Layout::from_size_align(ADDRESS_DERIVATION_HASH_MEMORY_SIZE, 8).unwrap()) }; }
}
impl PoolFactory<AddressDerivationMemory> for AddressDerivationMemoryFactory {
#[inline(always)]
fn create(&self) -> AddressDerivationMemory { AddressDerivationMemory([0_u128; ADDRESS_DERIVATION_HASH_MEMORY_SIZE / 16]) }
fn create(&self) -> AddressDerivationMemory { AddressDerivationMemory(unsafe { alloc(Layout::from_size_align(ADDRESS_DERIVATION_HASH_MEMORY_SIZE, 8).unwrap()) }) }
#[inline(always)]
fn reset(&self, _: &mut AddressDerivationMemory) {}
@ -678,3 +696,25 @@ lazy_static! {
pub(crate) fn purge_verification_memory_pool() {
unsafe { ADDRESS_DERVIATION_MEMORY_POOL.purge() };
}
#[cfg(test)]
mod tests {
use crate::vl1::Identity;
#[test]
fn v0() {
}
#[test]
fn v1() {
}
#[test]
fn generate() {
let count = 64;
for _ in 0..count {
let id = Identity::generate();
println!("{}", id.to_secret_string());
}
}
}

View file

@ -124,7 +124,7 @@ impl InetAddress {
addr.sin6.sin6_family = AF_INET6.into();
addr.sin6.sin6_port = port.to_be().into();
unsafe {
*((&mut (addr.sin6.sin6_addr) as *mut _).cast::<u8>().offset(15)) = 1;
*((&mut (addr.sin6.sin6_addr) as *mut in6_addr).cast::<u8>().offset(15)) = 1;
}
addr
}
@ -177,12 +177,12 @@ impl InetAddress {
if ip.len() == 4 {
self.sin.sin_family = AF_INET.into();
self.sin.sin_port = port.into();
copy_nonoverlapping(ip.as_ptr(), (&mut self.sin.sin_addr.s_addr as *mut _).cast::<u8>(), 4);
copy_nonoverlapping(ip.as_ptr(), (&mut self.sin.sin_addr.s_addr as *mut u32).cast::<u8>(), 4);
AF_INET
} else if ip.len() == 16 {
self.sin6.sin6_family = AF_INET6.into();
self.sin6.sin6_port = port.into();
copy_nonoverlapping(ip.as_ptr(), (&mut self.sin6.sin6_addr as *mut _).cast::<u8>(), 16);
copy_nonoverlapping(ip.as_ptr(), (&mut self.sin6.sin6_addr as *mut in6_addr).cast::<u8>(), 16);
AF_INET6
} else {
0
@ -195,8 +195,8 @@ impl InetAddress {
pub fn ip_bytes(&self) -> &[u8] {
unsafe {
match self.sa.sa_family as u8 {
AF_INET => &*(&self.sin.sin_addr.s_addr as *const _).cast::<[u8; 4]>(),
AF_INET6 => &*(&self.sin6.sin6_addr as *const _).cast::<[u8; 16]>(),
AF_INET => &*(&self.sin.sin_addr.s_addr as *const u32).cast::<[u8; 4]>(),
AF_INET6 => &*(&self.sin6.sin6_addr as *const in6_addr).cast::<[u8; 16]>(),
_ => &[],
}
}
@ -210,7 +210,7 @@ impl InetAddress {
unsafe {
match self.sa.sa_family as u8 {
AF_INET => self.sin.sin_addr.s_addr as u128,
AF_INET6 => u128::from_ne_bytes(*(&self.sin6.sin6_addr as *const _).cast::<[u8; 16]>()),
AF_INET6 => u128::from_ne_bytes(*(&self.sin6.sin6_addr as *const in6_addr).cast::<[u8; 16]>()),
_ => 0,
}
}
@ -323,7 +323,7 @@ impl InetAddress {
}
}
AF_INET6 => {
let ip = &*(&(self.sin6.sin6_addr) as *const _).cast::<[u8; 16]>();
let ip = &*(&(self.sin6.sin6_addr) as *const in6_addr).cast::<[u8; 16]>();
if (ip[0] & 0xf0) == 0xf0 {
if ip[0] == 0xff {
return IpScope::Multicast; // ff00::/8
@ -366,10 +366,10 @@ impl InetAddress {
unsafe {
match self.sa.sa_family as u8 {
AF_INET => {
let ip = &*(&self.sin.sin_addr.s_addr as *const _).cast::<[u8; 4]>();
let ip = &*(&self.sin.sin_addr.s_addr as *const u32).cast::<[u8; 4]>();
format!("{}.{}.{}.{}", ip[0], ip[1], ip[2], ip[3])
}
AF_INET6 => Ipv6Addr::from(*(&(self.sin6.sin6_addr) as *const _).cast::<[u8; 16]>()).to_string(),
AF_INET6 => Ipv6Addr::from(*(&(self.sin6.sin6_addr) as *const in6_addr).cast::<[u8; 16]>()).to_string(),
_ => String::from("(null)")
}
}
@ -381,17 +381,17 @@ impl InetAddress {
AF_INET => {
let b = buf.append_bytes_fixed_get_mut::<7>()?;
b[0] = 4;
copy_nonoverlapping((&self.sin.sin_addr.s_addr as *const _).cast::<u8>(), b.as_mut_ptr().offset(1), 4);
b[5] = *(&self.sin.sin_port as *const _).cast::<u8>();
b[6] = *(&self.sin.sin_port as *const _).cast::<u8>().offset(1);
copy_nonoverlapping((&self.sin.sin_addr.s_addr as *const u32).cast::<u8>(), b.as_mut_ptr().offset(1), 4);
b[5] = *(&self.sin.sin_port as *const u16).cast::<u8>();
b[6] = *(&self.sin.sin_port as *const u16).cast::<u8>().offset(1);
Ok(())
}
AF_INET6 => {
let b = buf.append_bytes_fixed_get_mut::<19>()?;
b[0] = 6;
copy_nonoverlapping((&(self.sin6.sin6_addr) as *const _).cast::<u8>(), b.as_mut_ptr().offset(1), 16);
b[17] = *(&self.sin6.sin6_port as *const _).cast::<u8>();
b[18] = *(&self.sin6.sin6_port as *const _).cast::<u8>().offset(1);
copy_nonoverlapping((&(self.sin6.sin6_addr) as *const in6_addr).cast::<u8>(), b.as_mut_ptr().offset(1), 16);
b[17] = *(&self.sin6.sin6_port as *const u16).cast::<u8>();
b[18] = *(&self.sin6.sin6_port as *const u16).cast::<u8>().offset(1);
Ok(())
}
_ => buf.append_u8(0)

View file

@ -127,6 +127,7 @@ struct BackgroundTaskIntervals {
}
pub struct Node {
pub(crate) instance_id: u64,
identity: Identity,
intervals: Mutex<BackgroundTaskIntervals>,
paths: DashMap<u128, Arc<Path>>,
@ -163,6 +164,7 @@ impl Node {
};
Ok(Self {
instance_id: next_u64_secure(),
identity: id,
intervals: Mutex::new(BackgroundTaskIntervals::default()),
paths: DashMap::new(),

View file

@ -28,9 +28,9 @@ use crate::vl1::protocol::*;
pub(crate) const PATH_KEEPALIVE_INTERVAL: i64 = 20000;
lazy_static! {
static mut RANDOM_64BIT_SALT_0: u64 = zerotier_core_crypto::random::next_u64_secure();
static mut RANDOM_64BIT_SALT_1: u64 = zerotier_core_crypto::random::next_u64_secure();
static mut RANDOM_64BIT_SALT_2: u64 = zerotier_core_crypto::random::next_u64_secure();
static ref RANDOM_64BIT_SALT_0: u64 = zerotier_core_crypto::random::next_u64_secure();
static ref RANDOM_64BIT_SALT_1: u64 = zerotier_core_crypto::random::next_u64_secure();
static ref RANDOM_64BIT_SALT_2: u64 = zerotier_core_crypto::random::next_u64_secure();
}
/// A remote endpoint paired with a local socket and a local interface.
@ -50,8 +50,8 @@ impl Path {
/// Get a 128-bit key to look up this endpoint in the local node path map.
#[inline(always)]
pub(crate) fn local_lookup_key(endpoint: &Endpoint, local_socket: Option<NonZeroI64>, local_interface: Option<NonZeroI64>) -> u128 {
let local_socket = local_socket.map_or(0, |s| crate::util::hash64_noncrypt(RANDOM_64BIT_SALT_0 + s.get() as u64));
let local_interface = local_interface.map_or(0, |s| crate::util::hash64_noncrypt(RANDOM_64BIT_SALT_1 + s.get() as u64));
let local_socket = local_socket.map_or(0, |s| crate::util::hash64_noncrypt(*RANDOM_64BIT_SALT_0 + s.get() as u64));
let local_interface = local_interface.map_or(0, |s| crate::util::hash64_noncrypt(*RANDOM_64BIT_SALT_1 + s.get() as u64));
let lsi = (local_socket as u128).wrapping_shl(64) | (local_interface as u128);
match endpoint {
Endpoint::Nil => 0,
@ -61,7 +61,7 @@ impl Path {
Endpoint::Bluetooth(m) => (m.to_u64() | 0x0400000000000000) as u128 ^ lsi,
Endpoint::Ip(ip) => ip.ip_as_native_u128().wrapping_sub(lsi), // naked IP has no port
Endpoint::IpUdp(ip) => ip.ip_as_native_u128().wrapping_add(lsi), // UDP maintains one path per IP but merely learns the most recent port
Endpoint::IpTcp(ip) => ip.ip_as_native_u128().wrapping_sub(crate::util::hash64_noncrypt((ip.port() as u64).wrapping_add(RANDOM_64BIT_SALT_2)) as u128).wrapping_sub(lsi),
Endpoint::IpTcp(ip) => ip.ip_as_native_u128().wrapping_sub(crate::util::hash64_noncrypt((ip.port() as u64).wrapping_add(*RANDOM_64BIT_SALT_2)) as u128).wrapping_sub(lsi),
Endpoint::Http(s) => {
let mut hh = highwayhasher();
let _ = hh.write_all(s.as_bytes());

View file

@ -377,7 +377,7 @@ impl Peer {
/// via a root or some other route.
pub(crate) fn send<CI: VL1SystemInterface>(&self, ci: &CI, node: &Node, time_ticks: i64, packet: &Buffer<{ PACKET_SIZE_MAX }>) -> bool {
self.path(node).map_or(false, |path| {
if self.send_to_endpoint(ci, path.endpoint(), path.local_socket(), path.local_interface(), packet) {
if self.send_to_endpoint(ci, path.endpoint().as_ref(), path.local_socket(), path.local_interface(), packet) {
self.last_send_time_ticks.store(time_ticks, Ordering::Relaxed);
self.total_bytes_sent.fetch_add(packet.len() as u64, Ordering::Relaxed);
true
@ -396,7 +396,7 @@ impl Peer {
/// Intermediates don't need to adjust fragmentation.
pub(crate) fn forward<CI: VL1SystemInterface>(&self, ci: &CI, time_ticks: i64, packet: &Buffer<{ PACKET_SIZE_MAX }>) -> bool {
self.direct_path().map_or(false, |path| {
if ci.wire_send(path.endpoint(), path.local_socket(), path.local_interface(), &[packet.as_bytes()], 0) {
if ci.wire_send(path.endpoint().as_ref(), path.local_socket(), path.local_interface(), &[packet.as_bytes()], 0) {
self.last_forward_time_ticks.store(time_ticks, Ordering::Relaxed);
self.total_bytes_forwarded.fetch_add(packet.len() as u64, Ordering::Relaxed);
true
@ -470,7 +470,7 @@ impl Peer {
explicit_endpoint.map_or_else(|| {
self.path(node).map_or(false, |path| {
path.log_send_anything(time_ticks);
self.send_to_endpoint(ci, path.endpoint(), path.local_socket(), path.local_interface(), &packet)
self.send_to_endpoint(ci, path.endpoint().as_ref(), path.local_socket(), path.local_interface(), &packet)
})
}, |endpoint| {
self.send_to_endpoint(ci, endpoint, None, None, &packet)

View file

@ -89,7 +89,7 @@ pub struct RootSet {
impl RootSet {
/// Create a new root set populated with compiled-in ZeroTier defaults.
pub fn zerotier_default() -> Self {
Self::from_bytes(include_bytes!("./rootset-default.bin")).unwrap()
Self::from_bytes(include_bytes!("./rootset-default.bin")).expect("invalid compiled-in default root set")
}
/// Create and sign a new root set.
@ -331,6 +331,7 @@ impl RootSet {
mod tests {
use crate::vl1::rootset::RootSet;
/*
#[test]
fn default_root_set() {
let rs = RootSet::zerotier_default();
@ -341,4 +342,5 @@ mod tests {
});
});
}
*/
}

View file

@ -8,7 +8,7 @@
use parking_lot::Mutex;
use zerotier_core_crypto::aes_gmac_siv::{AesCtr, AesGmacSiv};
use zerotier_core_crypto::aes_gmac_siv::AesGmacSiv;
use zerotier_core_crypto::hash::SHA384_HASH_SIZE;
use zerotier_core_crypto::kbkdf::zt_kbkdf_hmac_sha384;
use zerotier_core_crypto::secret::Secret;