mirror of
https://github.com/zerotier/ZeroTierOne.git
synced 2025-05-02 11:53:43 +02:00
Replicator code, and cleanup.
This commit is contained in:
parent
5fd0e2998b
commit
734e86dfed
16 changed files with 599 additions and 41 deletions
3
.gitignore
vendored
3
.gitignore
vendored
|
@ -1,8 +1,11 @@
|
||||||
/target
|
/target
|
||||||
/**/target
|
/**/target
|
||||||
|
|
||||||
/aes-gmac-siv/Cargo.lock
|
/aes-gmac-siv/Cargo.lock
|
||||||
/zerotier-core-crypto/Cargo.lock
|
/zerotier-core-crypto/Cargo.lock
|
||||||
/zerotier-network-hypervisor/Cargo.lock
|
/zerotier-network-hypervisor/Cargo.lock
|
||||||
|
/allthethings/Cargo.lock
|
||||||
|
|
||||||
.DS_*
|
.DS_*
|
||||||
.Icon*
|
.Icon*
|
||||||
._*
|
._*
|
||||||
|
|
2
Makefile
2
Makefile
|
@ -1,6 +1,6 @@
|
||||||
all:
|
all:
|
||||||
|
|
||||||
clean: FORCE
|
clean: FORCE
|
||||||
rm -rf zerotier-core-crypto/target zerotier-network-hypervisor/target zerotier-system-service/target
|
rm -rf zerotier-core-crypto/target zerotier-network-hypervisor/target zerotier-system-service/target allthethings/target
|
||||||
|
|
||||||
FORCE:
|
FORCE:
|
||||||
|
|
2
allthethings/.gitignore
vendored
Normal file
2
allthethings/.gitignore
vendored
Normal file
|
@ -0,0 +1,2 @@
|
||||||
|
/target
|
||||||
|
Cargo.lock
|
9
allthethings/Cargo.toml
Normal file
9
allthethings/Cargo.toml
Normal file
|
@ -0,0 +1,9 @@
|
||||||
|
[package]
|
||||||
|
name = "allthethings"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2021"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
sha2 = { version = "^0", features = ["asm"] }
|
||||||
|
smol = { version = "^1", features = [] }
|
||||||
|
getrandom = "^0"
|
12
allthethings/src/lib.rs
Normal file
12
allthethings/src/lib.rs
Normal file
|
@ -0,0 +1,12 @@
|
||||||
|
mod store;
|
||||||
|
mod replicator;
|
||||||
|
mod protocol;
|
||||||
|
mod varint;
|
||||||
|
|
||||||
|
pub(crate) fn ms_since_epoch() -> u64 {
|
||||||
|
std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_millis() as u64
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn ms_monotonic() -> u64 {
|
||||||
|
std::time::Instant::now().elapsed().as_millis() as u64
|
||||||
|
}
|
27
allthethings/src/protocol.rs
Normal file
27
allthethings/src/protocol.rs
Normal file
|
@ -0,0 +1,27 @@
|
||||||
|
pub(crate) const PROTOCOL_VERSION: u8 = 1;
|
||||||
|
|
||||||
|
/// No operation and no payload, sent as a heartbeat.
|
||||||
|
/// This is the only message type NOT followed by a message size varint. It's just one byte.
|
||||||
|
pub(crate) const MESSAGE_TYPE_NOP: u8 = 0;
|
||||||
|
|
||||||
|
/// An object either sent in response to a query or because it is new.
|
||||||
|
/// Payload is simply the object. The hash is not included as we compute it locally for security.
|
||||||
|
pub(crate) const MESSAGE_TYPE_OBJECT: u8 = 1;
|
||||||
|
|
||||||
|
/// Request one or more objects by identity hash with optional common prefix.
|
||||||
|
pub(crate) const MESSAGE_TYPE_GET_OBJECTS: u8 = 2;
|
||||||
|
|
||||||
|
/// HELLO message, which is all u8's and is packed and so can be parsed directly in place.
|
||||||
|
/// This message is sent at the start of any connection by both sides.
|
||||||
|
#[repr(packed)]
|
||||||
|
pub(crate) struct Hello {
|
||||||
|
pub hello_size: u8, // technically a varint but below 0x80
|
||||||
|
pub protocol_version: u8,
|
||||||
|
pub flags: [u8; 4], // u32, little endian
|
||||||
|
pub clock: [u8; 8], // u64, little endian
|
||||||
|
pub data_set_size: [u8; 8], // u64, little endian
|
||||||
|
pub domain_hash: [u8; 32],
|
||||||
|
pub instance_id: [u8; 16],
|
||||||
|
pub loopback_check_code_salt: [u8; 8],
|
||||||
|
pub loopback_check_code: [u8; 16],
|
||||||
|
}
|
273
allthethings/src/replicator.rs
Normal file
273
allthethings/src/replicator.rs
Normal file
|
@ -0,0 +1,273 @@
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::convert::TryInto;
|
||||||
|
use std::future::Future;
|
||||||
|
use std::hash::{Hash, Hasher};
|
||||||
|
use std::io::Read;
|
||||||
|
use std::mem::{size_of, transmute};
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use getrandom::getrandom;
|
||||||
|
use sha2::{Digest, Sha256};
|
||||||
|
use sha2::digest::{FixedOutput, Reset, Update};
|
||||||
|
use smol::{Executor, Task, Timer};
|
||||||
|
use smol::future;
|
||||||
|
use smol::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
|
||||||
|
use smol::lock::Mutex;
|
||||||
|
use smol::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6, TcpListener, TcpStream, SocketAddr};
|
||||||
|
use smol::stream::StreamExt;
|
||||||
|
|
||||||
|
use crate::{ms_monotonic, ms_since_epoch, protocol};
|
||||||
|
use crate::store::Store;
|
||||||
|
use crate::varint;
|
||||||
|
|
||||||
|
const CONNECTION_TIMEOUT_SECONDS: u64 = 30;
|
||||||
|
|
||||||
|
pub struct Config {
|
||||||
|
/// TCP port to which this should bind.
|
||||||
|
pub tcp_port: u16,
|
||||||
|
|
||||||
|
/// A name for this replicated data set. This is just used to prevent linking to peers replicating different data.
|
||||||
|
pub domain: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(PartialEq, Eq, Clone)]
|
||||||
|
struct ConnectionKey {
|
||||||
|
instance_id: [u8; 16],
|
||||||
|
ip: IpAddr,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Hash for ConnectionKey {
|
||||||
|
#[inline(always)]
|
||||||
|
fn hash<H: Hasher>(&self, state: &mut H) {
|
||||||
|
state.write(&self.instance_id);
|
||||||
|
self.ip.hash(state);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct ReplicatorImpl<S: Store> {
|
||||||
|
instance_id: [u8; 16],
|
||||||
|
loopback_check_code_secret: [u8; 16],
|
||||||
|
domain_hash: [u8; 32],
|
||||||
|
store: Arc<S>,
|
||||||
|
config: Config,
|
||||||
|
connections: Mutex<HashMap<ConnectionKey, (SocketAddr, Task<()>)>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct Replicator<S: Store> {
|
||||||
|
state: Arc<ReplicatorImpl<S>>,
|
||||||
|
v4_listener_task: Option<Task<()>>,
|
||||||
|
v6_listener_task: Option<Task<()>>,
|
||||||
|
service_task: Task<()>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S: Store> Replicator<S> {
|
||||||
|
/// Create a new replicator to replicate the contents of the provided store.
|
||||||
|
/// All async tasks, sockets, and connections will be dropped if the replicator is dropped. Use
|
||||||
|
/// the shutdown() method for a graceful shutdown.
|
||||||
|
pub async fn start(executor: Arc<Executor>, store: Arc<S>, config: Config) -> Result<Self, Box<dyn Err>> {
|
||||||
|
let listener_v4 = TcpListener::bind(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, config.tcp_port)).await;
|
||||||
|
let listener_v6 = TcpListener::bind(SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, config.tcp_port, 0, 0)).await;
|
||||||
|
if listener_v4.is_err() && listener_v6.is_err() {
|
||||||
|
return Err(Box::new(listener_v4.unwrap_err()));
|
||||||
|
}
|
||||||
|
|
||||||
|
let r = Arc::new(ReplicatorImpl::<S> {
|
||||||
|
instance_id: {
|
||||||
|
let mut tmp = [0_u8; 16];
|
||||||
|
getrandom(&mut tmp).expect("getrandom failed");
|
||||||
|
tmp
|
||||||
|
},
|
||||||
|
loopback_check_code_secret: {
|
||||||
|
let mut tmp = [0_u8; 16];
|
||||||
|
getrandom(&mut tmp).expect("getrandom failed");
|
||||||
|
tmp
|
||||||
|
},
|
||||||
|
domain_hash: {
|
||||||
|
let mut h = Sha256::new();
|
||||||
|
h.update(config.domain.as_bytes());
|
||||||
|
h.finalize_fixed().into()
|
||||||
|
},
|
||||||
|
config,
|
||||||
|
store,
|
||||||
|
connections: Mutex::new(HashMap::new()),
|
||||||
|
});
|
||||||
|
|
||||||
|
let (e0, e1) = (executor.clone(), executor.clone());
|
||||||
|
Ok(Self {
|
||||||
|
state: r,
|
||||||
|
v4_listener_task: listener_v4.map_or(None, |listener_v4| Some(executor.spawn(r.listener_task_main(listener_v4, e0)))),
|
||||||
|
v6_listener_task: listener_v6.map_or(None, |listener_v6| Some(executor.spawn(r.listener_task_main(listener_v6, e1)))),
|
||||||
|
service_task: executor.spawn(r.service_main(executor.clone())),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn shutdown(self) {
|
||||||
|
// Get a joined future including our service task and one or both listeners. There is always
|
||||||
|
// at least one listener. If there are no listeners this is a bug and will panic.
|
||||||
|
let main_tasks = self.v4_listener_task.map_or_else(|| {
|
||||||
|
future::zip(self.service_task.cancel(), self.v6_listener_task.unwrap().cancel())
|
||||||
|
}, |v4| {
|
||||||
|
self.v6_listener_task.map_or_else(|| {
|
||||||
|
future::zip(self.service_task.cancel(), v4.cancel())
|
||||||
|
}, |v6| {
|
||||||
|
future::zip(self.service_task.cancel(), future::zip(v4.cancel(), v6.cancel()))
|
||||||
|
})
|
||||||
|
});
|
||||||
|
|
||||||
|
// Just dropping all connections is fine.
|
||||||
|
self.state.connections.lock().await.clear();
|
||||||
|
|
||||||
|
// Then gracefully wait for the main tasks to finish.
|
||||||
|
let _ = main_tasks.await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S: Store> ReplicatorImpl<S> {
|
||||||
|
async fn service_main(&self, executor: Arc<Executor>) {
|
||||||
|
let mut timer = smol::Timer::interval(Duration::from_secs(1));
|
||||||
|
let mut to_close: Vec<ConnectionKey> = Vec::new();
|
||||||
|
loop {
|
||||||
|
timer.next().await;
|
||||||
|
|
||||||
|
let mut connections = self.connections.lock().await;
|
||||||
|
|
||||||
|
let now_mt = ms_monotonic();
|
||||||
|
for cc in connections.iter_mut() {
|
||||||
|
let c = &(*cc.1).0;
|
||||||
|
if c.closed.load(Ordering::Relaxed) || (now_mt - c.last_receive_time.load(Ordering::Relaxed)) > CONNECTION_TIMEOUT_SECONDS {
|
||||||
|
to_close.push(cc.0.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for tc in to_close.iter() {
|
||||||
|
let _ = connections.remove(tc);
|
||||||
|
}
|
||||||
|
to_close.clear();
|
||||||
|
|
||||||
|
drop(connections);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn listener_task_main(&self, listener: TcpListener, executor: Arc<Executor>) {
|
||||||
|
loop {
|
||||||
|
let stream = listener.accept().await;
|
||||||
|
if stream.is_ok() {
|
||||||
|
let (mut stream, remote_address) = stream.unwrap();
|
||||||
|
self.handle_new_connection(stream, remote_address, false, executor.clone()).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_new_connection(&self, mut stream: TcpStream, remote_address: SocketAddr, outgoing: bool, executor: Arc<Executor>) {
|
||||||
|
stream.set_nodelay(true);
|
||||||
|
|
||||||
|
let mut loopback_check_code_salt = [0_u8; 8];
|
||||||
|
getrandom(&mut tmp).expect("getrandom failed");
|
||||||
|
|
||||||
|
let mut h = Sha256::new();
|
||||||
|
h.update(&loopback_check_code_salt);
|
||||||
|
h.update(&self.loopback_check_code_secret);
|
||||||
|
let loopback_check_code: [u8; 32] = h.finalize_fixed().into();
|
||||||
|
|
||||||
|
let hello = protocol::Hello {
|
||||||
|
hello_size: size_of::<protocol::Hello>() as u8,
|
||||||
|
protocol_version: protocol::PROTOCOL_VERSION,
|
||||||
|
flags: [0_u8; 4],
|
||||||
|
clock: ms_since_epoch().to_le_bytes(),
|
||||||
|
data_set_size: self.store.total_size().await.to_le_bytes(),
|
||||||
|
domain_hash: self.domain_hash.clone(),
|
||||||
|
instance_id: self.instance_id.clone(),
|
||||||
|
loopback_check_code_salt,
|
||||||
|
loopback_check_code: (&loopback_check_code[0..16]).try_into().unwrap()
|
||||||
|
};
|
||||||
|
let hello: [u8; size_of::<protocol::Hello>()] = unsafe { transmute(hello) };
|
||||||
|
|
||||||
|
if stream.write_all(&hello).await.is_ok() {
|
||||||
|
let mut hello_buf = [0_u8; size_of::<protocol::Hello>()];
|
||||||
|
if stream.read_exact(&mut hello_buf).await.is_ok() {
|
||||||
|
let hello: protocol::Hello = unsafe { transmute(hello_buf) };
|
||||||
|
|
||||||
|
// Sanity check HELLO packet. In the future we may support different versions and sizes.
|
||||||
|
if hello.hello_size == size_of::<protocol::Hello>() as u8 && hello.protocol_version == protocol::PROTOCOL_VERSION {
|
||||||
|
// If this hash's first 16 bytes are equal to the one in the HELLO, this connection is
|
||||||
|
// from this node and should be dropped.
|
||||||
|
let mut h = Sha256::new();
|
||||||
|
h.update(&hello.loopback_check_code_salt);
|
||||||
|
h.update(&self.loopback_check_code_secret);
|
||||||
|
let loopback_if_equal: [u8; 32] = h.finalize_fixed().into();
|
||||||
|
|
||||||
|
if !loopback_if_equal[0..16].eq(&hello.loopback_check_code) {
|
||||||
|
let k = ConnectionKey {
|
||||||
|
instance_id: hello.instance_id.clone(),
|
||||||
|
ip: remote_address.ip()
|
||||||
|
};
|
||||||
|
let mut connections = self.connections.lock().await;
|
||||||
|
let _ = connections.entry(k).or_insert_with(move || {
|
||||||
|
stream.set_nodelay(false);
|
||||||
|
(remote_address.clone(), executor.spawn(self.connection_io_task_main(stream, remote_address, false, executor.clone())))
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn connection_sync_init_task_main(&self, writer: Arc<Mutex<TcpStream>>) {
|
||||||
|
let mut periodic_timer = Timer::interval(Duration::from_secs(1));
|
||||||
|
loop {
|
||||||
|
let _ = periodic_timer.next().await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn connection_io_task_main(&self, mut stream: TcpStream, remote_address: SocketAddr, outgoing: bool, executor: Arc<Executor>) {
|
||||||
|
let mut reader = BufReader::with_capacity(S::MAX_OBJECT_SIZE * 2, stream.clone());
|
||||||
|
let writer = Arc::new(Mutex::new(stream));
|
||||||
|
|
||||||
|
let _sync_search_init_task = executor.spawn(self.connection_sync_init_task_main(writer.clone()));
|
||||||
|
|
||||||
|
let mut tmp = [0_u8; 4096];
|
||||||
|
'main_io_loop: loop {
|
||||||
|
if reader.read_exact(&mut tmp[0..1]).await.is_err() {
|
||||||
|
break 'main_io_loop;
|
||||||
|
}
|
||||||
|
let message_type = tmp[0];
|
||||||
|
|
||||||
|
if message_type == protocol::MESSAGE_TYPE_NOP {
|
||||||
|
continue 'main_io_loop;
|
||||||
|
}
|
||||||
|
|
||||||
|
let message_size = varint::async_read(&mut reader).await;
|
||||||
|
if message_size.is_err() {
|
||||||
|
break 'main_io_loop;
|
||||||
|
}
|
||||||
|
let mut message_size = message_size.unwrap();
|
||||||
|
|
||||||
|
if message_size > S::MAX_OBJECT_SIZE as u64 {
|
||||||
|
break 'main_io_loop;
|
||||||
|
}
|
||||||
|
|
||||||
|
match message_type {
|
||||||
|
protocol::MESSAGE_TYPE_OBJECT => {
|
||||||
|
},
|
||||||
|
protocol::MESSAGE_TYPE_GET_OBJECTS => {
|
||||||
|
},
|
||||||
|
_ => {
|
||||||
|
// Skip the bodies of unrecognized message types.
|
||||||
|
while message_size >= tmp.len() as u64 {
|
||||||
|
if reader.read_exact(&tmp).await.is_err() {
|
||||||
|
break 'main_io_loop;
|
||||||
|
}
|
||||||
|
message_size -= tmp.len() as u64;
|
||||||
|
}
|
||||||
|
if message_size > 0 {
|
||||||
|
if reader.read_exact(&mut tmp[0..(message_size as usize)]).await.is_err() {
|
||||||
|
break 'main_io_loop;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
68
allthethings/src/store.rs
Normal file
68
allthethings/src/store.rs
Normal file
|
@ -0,0 +1,68 @@
|
||||||
|
use smol::net::SocketAddr;
|
||||||
|
|
||||||
|
/// Result code from the put() method in Database.
|
||||||
|
pub enum PutObjectResult {
|
||||||
|
/// Datum stored successfully.
|
||||||
|
Ok,
|
||||||
|
/// Datum is one we already have.
|
||||||
|
Duplicate,
|
||||||
|
/// Value is invalid. (this may result in dropping connections to peers, etc.)
|
||||||
|
Invalid,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Trait that must be implemented for the data store that is to be replicated.
|
||||||
|
///
|
||||||
|
/// Each datum is identified by an identity hash, which is a cryptographic hash of HASH_SIZE
|
||||||
|
/// bytes of its value. The implementation assumes that's what it is, but the hash function
|
||||||
|
/// is not specified in this library.
|
||||||
|
pub trait Store: Sync + Send {
|
||||||
|
/// The size in bytes of the identity hash.
|
||||||
|
const HASH_SIZE: usize;
|
||||||
|
|
||||||
|
/// The maximum size of the objects supported by this store (and thus replication domain).
|
||||||
|
const MAX_OBJECT_SIZE: usize;
|
||||||
|
|
||||||
|
/// Object type returned by get(), must implement AsRef<[u8]>.
|
||||||
|
type GetOutput: AsRef<[u8]>;
|
||||||
|
|
||||||
|
/// Compute a hash of a data object using the hash associated with this store.
|
||||||
|
/// This returns the identity hash which can then be used as a key with get(), put(), etc.
|
||||||
|
fn hash(&self, object: &[u8]) -> [u8; Self::HASH_SIZE];
|
||||||
|
|
||||||
|
/// Get the total size of this data set in objects.
|
||||||
|
async fn total_size(&self) -> u64;
|
||||||
|
|
||||||
|
/// Get an object from the database, returning None if it is not found or there is an error.
|
||||||
|
async fn get(&self, identity_hash: &[u8; Self::HASH_SIZE]) -> Option<Self::GetOutput>;
|
||||||
|
|
||||||
|
/// Store an entry in the database.
|
||||||
|
async fn put(&self, identity_hash: &[u8; Self::HASH_SIZE], object: &[u8]) -> PutObjectResult;
|
||||||
|
|
||||||
|
/// Count the number of identity hash keys in this range (inclusive) of identity hashes.
|
||||||
|
/// This may return None if an error occurs, but should return 0 if the set is empty.
|
||||||
|
async fn count(&self, start: &[u8; Self::HASH_SIZE], end: &[u8; Self::HASH_SIZE]) -> Option<u64>;
|
||||||
|
|
||||||
|
/// Called when a connection to a remote node was successful.
|
||||||
|
/// This is always called on successful outbound connect.
|
||||||
|
async fn save_remote_endpoint(&self, to_address: &SocketAddr);
|
||||||
|
|
||||||
|
/// Get a remote endpoint to try.
|
||||||
|
/// This can return endpoints in any order and is used to try to establish outbound links.
|
||||||
|
async fn get_remote_endpoint(&self) -> Option<SocketAddr>;
|
||||||
|
|
||||||
|
/*
|
||||||
|
/// Execute a function for every hash/value in a range.
|
||||||
|
/// Iteration stops if the supplied function returns false.
|
||||||
|
async fn for_each_entry<F, FF>(&self, start: &[u8; Self::HASH_SIZE], end: &[u8; Self::HASH_SIZE], function: F)
|
||||||
|
where
|
||||||
|
F: Fn(&[u8; Self::HASH_SIZE], &[u8]) -> FF,
|
||||||
|
FF: Future<Output=bool>;
|
||||||
|
|
||||||
|
/// Execute a function for every hash in a range.
|
||||||
|
/// Iteration stops if the supplied function returns false.
|
||||||
|
async fn for_each_hash_key<F, FF>(&self, start: &[u8; Self::HASH_SIZE], end: &[u8; Self::HASH_SIZE], function: F)
|
||||||
|
where
|
||||||
|
F: Fn(&[u8; Self::HASH_SIZE], &[u8]) -> FF,
|
||||||
|
FF: Future<Output=bool>;
|
||||||
|
*/
|
||||||
|
}
|
43
allthethings/src/varint.rs
Normal file
43
allthethings/src/varint.rs
Normal file
|
@ -0,0 +1,43 @@
|
||||||
|
/* This Source Code Form is subject to the terms of the Mozilla Public
|
||||||
|
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||||
|
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||||
|
*
|
||||||
|
* (c)2021 ZeroTier, Inc.
|
||||||
|
* https://www.zerotier.com/
|
||||||
|
*/
|
||||||
|
|
||||||
|
use smol::io::{AsyncReadExt, AsyncWrite, AsyncRead, AsyncWriteExt};
|
||||||
|
|
||||||
|
pub async fn async_write<W: AsyncWrite>(w: &mut W, mut v: u64) -> std::io::Result<()> {
|
||||||
|
let mut b = [0_u8; 10];
|
||||||
|
let mut i = 0;
|
||||||
|
loop {
|
||||||
|
if v > 0x7f {
|
||||||
|
b[i] = (v as u8) & 0x7f;
|
||||||
|
i += 1;
|
||||||
|
v = v.wrapping_shr(7);
|
||||||
|
} else {
|
||||||
|
b[i] = (v as u8) | 0x80;
|
||||||
|
i += 1;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
w.write_all(&b[0..i]).await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn async_read<R: AsyncRead>(r: &mut R) -> std::io::Result<u64> {
|
||||||
|
let mut v = 0_u64;
|
||||||
|
let mut buf = [0_u8; 1];
|
||||||
|
let mut pos = 0;
|
||||||
|
loop {
|
||||||
|
let _ = r.read_exact(&mut buf).await?;
|
||||||
|
let b = buf[0];
|
||||||
|
if b <= 0x7f {
|
||||||
|
v |= (b as u64).wrapping_shl(pos);
|
||||||
|
pos += 7;
|
||||||
|
} else {
|
||||||
|
v |= ((b & 0x7f) as u64).wrapping_shl(pos);
|
||||||
|
return Ok(v);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -17,7 +17,9 @@ base64 = "^0"
|
||||||
lz4_flex = { version = "^0", features = ["safe-encode", "safe-decode", "checked-decode"] }
|
lz4_flex = { version = "^0", features = ["safe-encode", "safe-decode", "checked-decode"] }
|
||||||
dashmap = "^4"
|
dashmap = "^4"
|
||||||
parking_lot = "^0"
|
parking_lot = "^0"
|
||||||
|
arc-swap = { version = "^1", features = [], default-features = false }
|
||||||
lazy_static = "^1"
|
lazy_static = "^1"
|
||||||
|
highway = "^0"
|
||||||
|
|
||||||
[target."cfg(not(windows))".dependencies]
|
[target."cfg(not(windows))".dependencies]
|
||||||
libc = "^0"
|
libc = "^0"
|
||||||
|
|
|
@ -29,15 +29,33 @@ pub(crate) fn array_range_mut<T, const A: usize, const START: usize, const LEN:
|
||||||
unsafe { &mut *a.as_mut_ptr().add(START).cast::<[T; LEN]>() }
|
unsafe { &mut *a.as_mut_ptr().add(START).cast::<[T; LEN]>() }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Cast a u64 to a byte array.
|
||||||
#[inline(always)]
|
#[inline(always)]
|
||||||
pub(crate) fn u64_as_bytes(i: &u64) -> &[u8; 8] { unsafe { &*(i as *const u64).cast() } }
|
pub(crate) fn u64_as_bytes(i: &u64) -> &[u8; 8] { unsafe { &*(i as *const u64).cast() } }
|
||||||
|
|
||||||
|
lazy_static! {
|
||||||
|
static mut HIGHWAYHASHER_KEY: [u64; 4] = [zerotier_core_crypto::random::next_u64_secure(), zerotier_core_crypto::random::next_u64_secure(), zerotier_core_crypto::random::next_u64_secure(), zerotier_core_crypto::random::next_u64_secure()];
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get an instance of HighwayHasher initialized with a secret per-process random salt.
|
||||||
|
/// The random salt is generated at process start and so will differ for each invocation of whatever process this is inside.
|
||||||
|
#[inline(always)]
|
||||||
|
pub(crate) fn highwayhasher() -> highway::HighwayHasher { highway::HighwayHasher::new(highway::Key(HIGHWAYHASHER_KEY.clone())) }
|
||||||
|
|
||||||
|
/// Non-cryptographic 64-bit bit mixer for things like local hashing.
|
||||||
|
#[inline(always)]
|
||||||
|
pub(crate) fn hash64_noncrypt(mut x: u64) -> u64 {
|
||||||
|
x ^= x.wrapping_shr(30);
|
||||||
|
x = x.wrapping_mul(0xbf58476d1ce4e5b9);
|
||||||
|
x ^= x.wrapping_shr(27);
|
||||||
|
x = x.wrapping_mul(0x94d049bb133111eb);
|
||||||
|
x ^ x.wrapping_shr(31)
|
||||||
|
}
|
||||||
|
|
||||||
/// A hasher for maps that just returns u64 values as-is.
|
/// A hasher for maps that just returns u64 values as-is.
|
||||||
///
|
/// Used with things like ZeroTier addresses and network IDs that are already randomly distributed.
|
||||||
/// This should be used only for things like ZeroTier addresses that are already random
|
|
||||||
/// and that aren't vulnerable to malicious crafting of identifiers.
|
|
||||||
#[derive(Copy, Clone)]
|
#[derive(Copy, Clone)]
|
||||||
pub struct U64NoOpHasher(u64);
|
pub(crate) struct U64NoOpHasher(u64);
|
||||||
|
|
||||||
impl U64NoOpHasher {
|
impl U64NoOpHasher {
|
||||||
#[inline(always)]
|
#[inline(always)]
|
||||||
|
|
|
@ -124,7 +124,7 @@ impl InetAddress {
|
||||||
addr.sin6.sin6_family = AF_INET6.into();
|
addr.sin6.sin6_family = AF_INET6.into();
|
||||||
addr.sin6.sin6_port = port.to_be().into();
|
addr.sin6.sin6_port = port.to_be().into();
|
||||||
unsafe {
|
unsafe {
|
||||||
*((&mut (addr.sin6.sin6_addr) as *mut in6_addr).cast::<u8>().offset(15)) = 1;
|
*((&mut (addr.sin6.sin6_addr) as *mut _).cast::<u8>().offset(15)) = 1;
|
||||||
}
|
}
|
||||||
addr
|
addr
|
||||||
}
|
}
|
||||||
|
@ -169,7 +169,7 @@ impl InetAddress {
|
||||||
/// Set the IP and port of this InetAddress.
|
/// Set the IP and port of this InetAddress.
|
||||||
/// Whether this is IPv4 or IPv6 is inferred from the size of ip[], which must be
|
/// Whether this is IPv4 or IPv6 is inferred from the size of ip[], which must be
|
||||||
/// either 4 or 16 bytes. The family (AF_INET or AF_INET6) is returned, or zero on
|
/// either 4 or 16 bytes. The family (AF_INET or AF_INET6) is returned, or zero on
|
||||||
/// success.
|
/// failure.
|
||||||
pub fn set(&mut self, ip: &[u8], port: u16) -> u8 {
|
pub fn set(&mut self, ip: &[u8], port: u16) -> u8 {
|
||||||
self.zero();
|
self.zero();
|
||||||
let port = port.to_be();
|
let port = port.to_be();
|
||||||
|
@ -177,12 +177,12 @@ impl InetAddress {
|
||||||
if ip.len() == 4 {
|
if ip.len() == 4 {
|
||||||
self.sin.sin_family = AF_INET.into();
|
self.sin.sin_family = AF_INET.into();
|
||||||
self.sin.sin_port = port.into();
|
self.sin.sin_port = port.into();
|
||||||
copy_nonoverlapping(ip.as_ptr(), (&mut self.sin.sin_addr.s_addr as *mut u32).cast::<u8>(), 4);
|
copy_nonoverlapping(ip.as_ptr(), (&mut self.sin.sin_addr.s_addr as *mut _).cast::<u8>(), 4);
|
||||||
AF_INET
|
AF_INET
|
||||||
} else if ip.len() == 16 {
|
} else if ip.len() == 16 {
|
||||||
self.sin6.sin6_family = AF_INET6.into();
|
self.sin6.sin6_family = AF_INET6.into();
|
||||||
self.sin6.sin6_port = port.into();
|
self.sin6.sin6_port = port.into();
|
||||||
copy_nonoverlapping(ip.as_ptr(), (&mut self.sin6.sin6_addr as *mut in6_addr).cast::<u8>(), 16);
|
copy_nonoverlapping(ip.as_ptr(), (&mut self.sin6.sin6_addr as *mut _).cast::<u8>(), 16);
|
||||||
AF_INET6
|
AF_INET6
|
||||||
} else {
|
} else {
|
||||||
0
|
0
|
||||||
|
@ -195,13 +195,27 @@ impl InetAddress {
|
||||||
pub fn ip_bytes(&self) -> &[u8] {
|
pub fn ip_bytes(&self) -> &[u8] {
|
||||||
unsafe {
|
unsafe {
|
||||||
match self.sa.sa_family as u8 {
|
match self.sa.sa_family as u8 {
|
||||||
AF_INET => &*(&self.sin.sin_addr.s_addr as *const u32).cast::<[u8; 4]>(),
|
AF_INET => &*(&self.sin.sin_addr.s_addr as *const _).cast::<[u8; 4]>(),
|
||||||
AF_INET6 => &*(&(self.sin6.sin6_addr) as *const in6_addr).cast::<[u8; 16]>(),
|
AF_INET6 => &*(&self.sin6.sin6_addr as *const _).cast::<[u8; 16]>(),
|
||||||
_ => &[],
|
_ => &[],
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get raw IP bytes packed into a u128.
|
||||||
|
/// Bytes are packed in native endian so the resulting u128 may not be the same between systems.
|
||||||
|
/// This value is intended for local lookup use only.
|
||||||
|
#[inline(always)]
|
||||||
|
pub fn ip_as_native_u128(&self) -> u128 {
|
||||||
|
unsafe {
|
||||||
|
match self.sa.sa_family as u8 {
|
||||||
|
AF_INET => self.sin.sin_addr.s_addr as u128,
|
||||||
|
AF_INET6 => u128::from_ne_bytes(*(&self.sin6.sin6_addr as *const _).cast::<[u8; 16]>()),
|
||||||
|
_ => 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Get the IP port for this InetAddress.
|
/// Get the IP port for this InetAddress.
|
||||||
#[inline(always)]
|
#[inline(always)]
|
||||||
pub fn port(&self) -> u16 {
|
pub fn port(&self) -> u16 {
|
||||||
|
@ -309,7 +323,7 @@ impl InetAddress {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
AF_INET6 => {
|
AF_INET6 => {
|
||||||
let ip = &*(&(self.sin6.sin6_addr) as *const in6_addr).cast::<[u8; 16]>();
|
let ip = &*(&(self.sin6.sin6_addr) as *const _).cast::<[u8; 16]>();
|
||||||
if (ip[0] & 0xf0) == 0xf0 {
|
if (ip[0] & 0xf0) == 0xf0 {
|
||||||
if ip[0] == 0xff {
|
if ip[0] == 0xff {
|
||||||
return IpScope::Multicast; // ff00::/8
|
return IpScope::Multicast; // ff00::/8
|
||||||
|
@ -352,10 +366,10 @@ impl InetAddress {
|
||||||
unsafe {
|
unsafe {
|
||||||
match self.sa.sa_family as u8 {
|
match self.sa.sa_family as u8 {
|
||||||
AF_INET => {
|
AF_INET => {
|
||||||
let ip = &*(&self.sin.sin_addr.s_addr as *const u32).cast::<[u8; 4]>();
|
let ip = &*(&self.sin.sin_addr.s_addr as *const _).cast::<[u8; 4]>();
|
||||||
format!("{}.{}.{}.{}", ip[0], ip[1], ip[2], ip[3])
|
format!("{}.{}.{}.{}", ip[0], ip[1], ip[2], ip[3])
|
||||||
}
|
}
|
||||||
AF_INET6 => Ipv6Addr::from(*(&(self.sin6.sin6_addr) as *const in6_addr).cast::<[u8; 16]>()).to_string(),
|
AF_INET6 => Ipv6Addr::from(*(&(self.sin6.sin6_addr) as *const _).cast::<[u8; 16]>()).to_string(),
|
||||||
_ => String::from("(null)")
|
_ => String::from("(null)")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -367,17 +381,17 @@ impl InetAddress {
|
||||||
AF_INET => {
|
AF_INET => {
|
||||||
let b = buf.append_bytes_fixed_get_mut::<7>()?;
|
let b = buf.append_bytes_fixed_get_mut::<7>()?;
|
||||||
b[0] = 4;
|
b[0] = 4;
|
||||||
copy_nonoverlapping((&self.sin.sin_addr.s_addr as *const u32).cast::<u8>(), b.as_mut_ptr().offset(1), 4);
|
copy_nonoverlapping((&self.sin.sin_addr.s_addr as *const _).cast::<u8>(), b.as_mut_ptr().offset(1), 4);
|
||||||
b[5] = *(&self.sin.sin_port as *const u16).cast::<u8>();
|
b[5] = *(&self.sin.sin_port as *const _).cast::<u8>();
|
||||||
b[6] = *(&self.sin.sin_port as *const u16).cast::<u8>().offset(1);
|
b[6] = *(&self.sin.sin_port as *const _).cast::<u8>().offset(1);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
AF_INET6 => {
|
AF_INET6 => {
|
||||||
let b = buf.append_bytes_fixed_get_mut::<19>()?;
|
let b = buf.append_bytes_fixed_get_mut::<19>()?;
|
||||||
b[0] = 6;
|
b[0] = 6;
|
||||||
copy_nonoverlapping((&(self.sin6.sin6_addr) as *const in6_addr).cast::<u8>(), b.as_mut_ptr().offset(1), 16);
|
copy_nonoverlapping((&(self.sin6.sin6_addr) as *const _).cast::<u8>(), b.as_mut_ptr().offset(1), 16);
|
||||||
b[17] = *(&self.sin6.sin6_port as *const u16).cast::<u8>();
|
b[17] = *(&self.sin6.sin6_port as *const _).cast::<u8>();
|
||||||
b[18] = *(&self.sin6.sin6_port as *const u16).cast::<u8>().offset(1);
|
b[18] = *(&self.sin6.sin6_port as *const _).cast::<u8>().offset(1);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
_ => buf.append_u8(0)
|
_ => buf.append_u8(0)
|
||||||
|
|
|
@ -123,7 +123,7 @@ pub struct Node {
|
||||||
pub(crate) instance_id: u64,
|
pub(crate) instance_id: u64,
|
||||||
identity: Identity,
|
identity: Identity,
|
||||||
intervals: Mutex<BackgroundTaskIntervals>,
|
intervals: Mutex<BackgroundTaskIntervals>,
|
||||||
paths: DashMap<Endpoint, Arc<Path>>,
|
paths: DashMap<u128, Arc<Path>>,
|
||||||
peers: DashMap<Address, Arc<Peer>>,
|
peers: DashMap<Address, Arc<Peer>>,
|
||||||
roots: Mutex<Vec<Arc<Peer>>>,
|
roots: Mutex<Vec<Arc<Peer>>>,
|
||||||
root_sets: Mutex<Vec<RootSet>>,
|
root_sets: Mutex<Vec<RootSet>>,
|
||||||
|
@ -236,7 +236,7 @@ impl Node {
|
||||||
// Handle packets addressed to this node.
|
// Handle packets addressed to this node.
|
||||||
|
|
||||||
let path = self.path(source_endpoint, source_local_socket, source_local_interface);
|
let path = self.path(source_endpoint, source_local_socket, source_local_interface);
|
||||||
path.log_receive(time_ticks);
|
path.log_receive_anything(time_ticks);
|
||||||
|
|
||||||
if fragment_header.is_fragment() {
|
if fragment_header.is_fragment() {
|
||||||
|
|
||||||
|
@ -251,7 +251,7 @@ impl Node {
|
||||||
let source = source.unwrap();
|
let source = source.unwrap();
|
||||||
let peer = self.peer(source);
|
let peer = self.peer(source);
|
||||||
if peer.is_some() {
|
if peer.is_some() {
|
||||||
peer.unwrap().receive(self, ci, ph, time_ticks, &path, &packet_header, frag0, &assembled_packet.frags[1..(assembled_packet.have as usize)]);
|
peer.unwrap().receive(self, ci, ph, time_ticks, source_endpoint, &path, &packet_header, frag0, &assembled_packet.frags[1..(assembled_packet.have as usize)]);
|
||||||
} else {
|
} else {
|
||||||
self.whois.query(self, ci, source, Some(QueuedPacket::Fragmented(assembled_packet)));
|
self.whois.query(self, ci, source, Some(QueuedPacket::Fragmented(assembled_packet)));
|
||||||
}
|
}
|
||||||
|
@ -270,7 +270,7 @@ impl Node {
|
||||||
let source = source.unwrap();
|
let source = source.unwrap();
|
||||||
let peer = self.peer(source);
|
let peer = self.peer(source);
|
||||||
if peer.is_some() {
|
if peer.is_some() {
|
||||||
peer.unwrap().receive(self, ci, ph, time_ticks, &path, &packet_header, data.as_ref(), &[]);
|
peer.unwrap().receive(self, ci, ph, time_ticks, source_endpoint, &path, &packet_header, data.as_ref(), &[]);
|
||||||
} else {
|
} else {
|
||||||
self.whois.query(self, ci, source, Some(QueuedPacket::Unfragmented(data)));
|
self.whois.query(self, ci, source, Some(QueuedPacket::Unfragmented(data)));
|
||||||
}
|
}
|
||||||
|
@ -315,9 +315,10 @@ impl Node {
|
||||||
/// This is a canonicalizing function that returns a unique path object for every tuple
|
/// This is a canonicalizing function that returns a unique path object for every tuple
|
||||||
/// of endpoint, local socket, and local interface.
|
/// of endpoint, local socket, and local interface.
|
||||||
pub fn path(&self, ep: &Endpoint, local_socket: Option<NonZeroI64>, local_interface: Option<NonZeroI64>) -> Arc<Path> {
|
pub fn path(&self, ep: &Endpoint, local_socket: Option<NonZeroI64>, local_interface: Option<NonZeroI64>) -> Arc<Path> {
|
||||||
self.paths.get(ep).map_or_else(|| {
|
let key = Path::local_lookup_key(ep);
|
||||||
|
self.paths.get(&key).map_or_else(|| {
|
||||||
let p = Arc::new(Path::new(ep.clone(), local_socket, local_interface));
|
let p = Arc::new(Path::new(ep.clone(), local_socket, local_interface));
|
||||||
self.paths.insert(ep.clone(), p.clone()).unwrap_or(p) // if another thread added one, return that instead
|
self.paths.insert(key, p.clone()).unwrap_or(p) // if another thread added one, return that instead
|
||||||
}, |path| path.value().clone())
|
}, |path| path.value().clone())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,13 +7,18 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
use std::hash::Hasher;
|
||||||
|
use std::io::Write;
|
||||||
use std::num::NonZeroI64;
|
use std::num::NonZeroI64;
|
||||||
|
use std::sync::Arc;
|
||||||
use std::sync::atomic::{AtomicI64, Ordering};
|
use std::sync::atomic::{AtomicI64, Ordering};
|
||||||
|
|
||||||
|
use arc_swap::ArcSwap;
|
||||||
|
use highway::HighwayHash;
|
||||||
use parking_lot::Mutex;
|
use parking_lot::Mutex;
|
||||||
|
|
||||||
use crate::PacketBuffer;
|
use crate::PacketBuffer;
|
||||||
use crate::util::U64NoOpHasher;
|
use crate::util::{highwayhasher, U64NoOpHasher};
|
||||||
use crate::vl1::Endpoint;
|
use crate::vl1::Endpoint;
|
||||||
use crate::vl1::fragmentedpacket::FragmentedPacket;
|
use crate::vl1::fragmentedpacket::FragmentedPacket;
|
||||||
use crate::vl1::node::NodeInterface;
|
use crate::vl1::node::NodeInterface;
|
||||||
|
@ -22,12 +27,18 @@ use crate::vl1::protocol::*;
|
||||||
/// Keepalive interval for paths in milliseconds.
|
/// Keepalive interval for paths in milliseconds.
|
||||||
pub(crate) const PATH_KEEPALIVE_INTERVAL: i64 = 20000;
|
pub(crate) const PATH_KEEPALIVE_INTERVAL: i64 = 20000;
|
||||||
|
|
||||||
|
lazy_static! {
|
||||||
|
static mut RANDOM_64BIT_SALT_0: u64 = zerotier_core_crypto::random::next_u64_secure();
|
||||||
|
static mut RANDOM_64BIT_SALT_1: u64 = zerotier_core_crypto::random::next_u64_secure();
|
||||||
|
static mut RANDOM_64BIT_SALT_2: u64 = zerotier_core_crypto::random::next_u64_secure();
|
||||||
|
}
|
||||||
|
|
||||||
/// A remote endpoint paired with a local socket and a local interface.
|
/// A remote endpoint paired with a local socket and a local interface.
|
||||||
/// These are maintained in Node and canonicalized so that all unique paths have
|
/// These are maintained in Node and canonicalized so that all unique paths have
|
||||||
/// one and only one unique path object. That enables statistics to be tracked
|
/// one and only one unique path object. That enables statistics to be tracked
|
||||||
/// for them and uniform application of things like keepalives.
|
/// for them and uniform application of things like keepalives.
|
||||||
pub struct Path {
|
pub struct Path {
|
||||||
endpoint: Endpoint,
|
endpoint: ArcSwap<Endpoint>,
|
||||||
local_socket: Option<NonZeroI64>,
|
local_socket: Option<NonZeroI64>,
|
||||||
local_interface: Option<NonZeroI64>,
|
local_interface: Option<NonZeroI64>,
|
||||||
last_send_time_ticks: AtomicI64,
|
last_send_time_ticks: AtomicI64,
|
||||||
|
@ -36,10 +47,42 @@ pub struct Path {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Path {
|
impl Path {
|
||||||
|
/// Get a 128-bit key to look up this endpoint in the local node path map.
|
||||||
|
#[inline(always)]
|
||||||
|
pub(crate) fn local_lookup_key(endpoint: &Endpoint, local_socket: Option<NonZeroI64>, local_interface: Option<NonZeroI64>) -> u128 {
|
||||||
|
let local_socket = local_socket.map_or(0, |s| crate::util::hash64_noncrypt(RANDOM_64BIT_SALT_0 + s.get() as u64));
|
||||||
|
let local_interface = local_interface.map_or(0, |s| crate::util::hash64_noncrypt(RANDOM_64BIT_SALT_1 + s.get() as u64));
|
||||||
|
let lsi = (local_socket as u128).wrapping_shl(64) | (local_interface as u128);
|
||||||
|
match endpoint {
|
||||||
|
Endpoint::Nil => 0,
|
||||||
|
Endpoint::ZeroTier(a) => a.to_u64() as u128,
|
||||||
|
Endpoint::Ethernet(m) => (m.to_u64() | 0x0100000000000000) as u128 ^ lsi,
|
||||||
|
Endpoint::WifiDirect(m) => (m.to_u64() | 0x0200000000000000) as u128 ^ lsi,
|
||||||
|
Endpoint::Bluetooth(m) => (m.to_u64() | 0x0400000000000000) as u128 ^ lsi,
|
||||||
|
Endpoint::Ip(ip) => ip.ip_as_native_u128().wrapping_sub(lsi), // naked IP has no port
|
||||||
|
Endpoint::IpUdp(ip) => ip.ip_as_native_u128().wrapping_add(lsi), // UDP maintains one path per IP but merely learns the most recent port
|
||||||
|
Endpoint::IpTcp(ip) => ip.ip_as_native_u128().wrapping_sub(crate::util::hash64_noncrypt((ip.port() as u64).wrapping_add(RANDOM_64BIT_SALT_2)) as u128).wrapping_sub(lsi),
|
||||||
|
Endpoint::Http(s) => {
|
||||||
|
let mut hh = highwayhasher();
|
||||||
|
let _ = hh.write_all(s.as_bytes());
|
||||||
|
let _ = hh.write_u64(local_socket);
|
||||||
|
let _ = hh.write_u64(local_interface);
|
||||||
|
u128::from_ne_bytes(unsafe { *hh.finalize128().as_ptr().cast() })
|
||||||
|
}
|
||||||
|
Endpoint::WebRTC(b) => {
|
||||||
|
let mut hh = highwayhasher();
|
||||||
|
let _ = hh.write_u64(local_socket);
|
||||||
|
let _ = hh.write_u64(local_interface);
|
||||||
|
let _ = hh.write_all(b.as_slice());
|
||||||
|
u128::from_ne_bytes(unsafe { *hh.finalize128().as_ptr().cast() })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[inline(always)]
|
#[inline(always)]
|
||||||
pub fn new(endpoint: Endpoint, local_socket: Option<NonZeroI64>, local_interface: Option<NonZeroI64>) -> Self {
|
pub fn new(endpoint: Endpoint, local_socket: Option<NonZeroI64>, local_interface: Option<NonZeroI64>) -> Self {
|
||||||
Self {
|
Self {
|
||||||
endpoint,
|
endpoint: ArcSwap::new(Arc::new(endpoint)),
|
||||||
local_socket,
|
local_socket,
|
||||||
local_interface,
|
local_interface,
|
||||||
last_send_time_ticks: AtomicI64::new(0),
|
last_send_time_ticks: AtomicI64::new(0),
|
||||||
|
@ -49,7 +92,7 @@ impl Path {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline(always)]
|
#[inline(always)]
|
||||||
pub fn endpoint(&self) -> &Endpoint { &self.endpoint }
|
pub fn endpoint(&self) -> Arc<Endpoint> { self.endpoint.load_full() }
|
||||||
|
|
||||||
#[inline(always)]
|
#[inline(always)]
|
||||||
pub fn local_socket(&self) -> Option<NonZeroI64> { self.local_socket }
|
pub fn local_socket(&self) -> Option<NonZeroI64> { self.local_socket }
|
||||||
|
@ -93,19 +136,40 @@ impl Path {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline(always)]
|
#[inline(always)]
|
||||||
pub(crate) fn log_receive(&self, time_ticks: i64) {
|
pub(crate) fn log_receive_anything(&self, time_ticks: i64) {
|
||||||
self.last_receive_time_ticks.store(time_ticks, Ordering::Relaxed);
|
self.last_receive_time_ticks.store(time_ticks, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline(always)]
|
#[inline(always)]
|
||||||
pub(crate) fn log_send(&self, time_ticks: i64) {
|
pub(crate) fn log_receive_authenticated_packet(&self, _bytes: usize, source_endpoint: &Endpoint) {
|
||||||
|
let mut replace = false;
|
||||||
|
match source_endpoint {
|
||||||
|
Endpoint::IpUdp(ip) => {
|
||||||
|
let ep = self.endpoint.load();
|
||||||
|
match ep.as_ref() {
|
||||||
|
Endpoint::IpUdp(ip_orig) => {
|
||||||
|
debug_assert!(ip_orig.ip_bytes().eq(ip.ip_bytes()));
|
||||||
|
if ip_orig.port() != ip.port() {
|
||||||
|
replace = true;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
if replace {
|
||||||
|
self.endpoint.swap(Arc::new(source_endpoint.clone()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline(always)]
|
||||||
|
pub(crate) fn log_send_anything(&self, time_ticks: i64) {
|
||||||
self.last_send_time_ticks.store(time_ticks, Ordering::Relaxed);
|
self.last_send_time_ticks.store(time_ticks, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Desired period between calls to call_every_interval().
|
|
||||||
pub(crate) const CALL_EVERY_INTERVAL_MS: i64 = PATH_KEEPALIVE_INTERVAL;
|
pub(crate) const CALL_EVERY_INTERVAL_MS: i64 = PATH_KEEPALIVE_INTERVAL;
|
||||||
|
|
||||||
/// Called every INTERVAL during background tasks.
|
|
||||||
#[inline(always)]
|
#[inline(always)]
|
||||||
pub(crate) fn call_every_interval<CI: NodeInterface>(&self, ct: &CI, time_ticks: i64) {
|
pub(crate) fn call_every_interval<CI: NodeInterface>(&self, ct: &CI, time_ticks: i64) {
|
||||||
self.fragmented_packets.lock().retain(|packet_id, frag| (time_ticks - frag.ts_ticks) < PACKET_FRAGMENT_EXPIRATION);
|
self.fragmented_packets.lock().retain(|packet_id, frag| (time_ticks - frag.ts_ticks) < PACKET_FRAGMENT_EXPIRATION);
|
||||||
|
|
|
@ -243,29 +243,37 @@ impl Peer {
|
||||||
/// Receive, decrypt, authenticate, and process an incoming packet from this peer.
|
/// Receive, decrypt, authenticate, and process an incoming packet from this peer.
|
||||||
/// If the packet comes in multiple fragments, the fragments slice should contain all
|
/// If the packet comes in multiple fragments, the fragments slice should contain all
|
||||||
/// those fragments after the main packet header and first chunk.
|
/// those fragments after the main packet header and first chunk.
|
||||||
pub(crate) fn receive<CI: NodeInterface, PH: VL1PacketHandler>(&self, node: &Node, ci: &CI, ph: &PH, time_ticks: i64, source_path: &Arc<Path>, header: &PacketHeader, packet: &Buffer<{ PACKET_SIZE_MAX }>, fragments: &[Option<PacketBuffer>]) {
|
pub(crate) fn receive<CI: NodeInterface, PH: VL1PacketHandler>(&self, node: &Node, ci: &CI, ph: &PH, time_ticks: i64, source_endpoint: &Endpoint, source_path: &Arc<Path>, header: &PacketHeader, packet: &Buffer<{ PACKET_SIZE_MAX }>, fragments: &[Option<PacketBuffer>]) {
|
||||||
let _ = packet.as_bytes_starting_at(PACKET_VERB_INDEX).map(|packet_frag0_payload_bytes| {
|
let _ = packet.as_bytes_starting_at(PACKET_VERB_INDEX).map(|packet_frag0_payload_bytes| {
|
||||||
let mut payload: Buffer<PACKET_SIZE_MAX> = unsafe { Buffer::new_without_memzero() };
|
let mut payload: Buffer<PACKET_SIZE_MAX> = unsafe { Buffer::new_without_memzero() };
|
||||||
let mut message_id = 0_u64;
|
let mut message_id = 0_u64;
|
||||||
let ephemeral_secret: Option<Arc<EphemeralSymmetricSecret>> = self.ephemeral_secret.lock().clone();
|
let ephemeral_secret: Option<Arc<EphemeralSymmetricSecret>> = self.ephemeral_secret.lock().clone();
|
||||||
let forward_secrecy = if !ephemeral_secret.map_or(false, |ephemeral_secret| try_aead_decrypt(&ephemeral_secret.secret, packet_frag0_payload_bytes, header, fragments, &mut payload, &mut message_id)) {
|
let forward_secrecy = if ephemeral_secret.map_or(false, |ephemeral_secret| try_aead_decrypt(&ephemeral_secret.secret, packet_frag0_payload_bytes, header, fragments, &mut payload, &mut message_id)) {
|
||||||
|
// Decrypted and authenticated by the ephemeral secret.
|
||||||
|
true
|
||||||
|
} else {
|
||||||
|
// There is no ephemeral secret, or authentication with it failed.
|
||||||
unsafe { payload.set_size_unchecked(0); }
|
unsafe { payload.set_size_unchecked(0); }
|
||||||
if !try_aead_decrypt(&self.static_secret, packet_frag0_payload_bytes, header, fragments, &mut payload, &mut message_id) {
|
if !try_aead_decrypt(&self.static_secret, packet_frag0_payload_bytes, header, fragments, &mut payload, &mut message_id) {
|
||||||
|
// Static secret also failed, reject packet.
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
false
|
false
|
||||||
} else {
|
|
||||||
true
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// ---------------------------------------------------------------
|
||||||
|
// If we made it here it decrypted and passed authentication.
|
||||||
|
// ---------------------------------------------------------------
|
||||||
|
|
||||||
self.last_receive_time_ticks.store(time_ticks, Ordering::Relaxed);
|
self.last_receive_time_ticks.store(time_ticks, Ordering::Relaxed);
|
||||||
self.total_bytes_received.fetch_add((payload.len() + PACKET_HEADER_SIZE) as u64, Ordering::Relaxed);
|
self.total_bytes_received.fetch_add((payload.len() + PACKET_HEADER_SIZE) as u64, Ordering::Relaxed);
|
||||||
|
source_path.log_receive_authenticated_packet(payload.len() + PACKET_HEADER_SIZE, source_endpoint);
|
||||||
|
|
||||||
debug_assert!(!payload.is_empty()); // should be impossible since this fails in try_aead_decrypt()
|
debug_assert!(!payload.is_empty()); // should be impossible since this fails in try_aead_decrypt()
|
||||||
let mut verb = payload.as_bytes()[0];
|
let mut verb = payload.as_bytes()[0];
|
||||||
|
|
||||||
// If this flag is set, the end of the payload is a full HMAC-SHA384 authentication
|
// If this flag is set, the end of the payload is a full HMAC-SHA384 authentication
|
||||||
// tag for much stronger authentication.
|
// tag for much stronger authentication than is offered by the packet MAC.
|
||||||
let extended_authentication = (verb & VERB_FLAG_EXTENDED_AUTHENTICATION) != 0;
|
let extended_authentication = (verb & VERB_FLAG_EXTENDED_AUTHENTICATION) != 0;
|
||||||
if extended_authentication {
|
if extended_authentication {
|
||||||
if payload.len() >= (1 + SHA384_HASH_SIZE) {
|
if payload.len() >= (1 + SHA384_HASH_SIZE) {
|
||||||
|
@ -295,7 +303,7 @@ impl Peer {
|
||||||
// if it didn't handle the packet, in which case it's handled at VL1. This is
|
// if it didn't handle the packet, in which case it's handled at VL1. This is
|
||||||
// because the most performance critical path is the handling of the ???_FRAME
|
// because the most performance critical path is the handling of the ???_FRAME
|
||||||
// verbs, which are in VL2.
|
// verbs, which are in VL2.
|
||||||
verb &= VERB_MASK;
|
verb &= VERB_MASK; // mask off flags
|
||||||
if !ph.handle_packet(self, source_path, forward_secrecy, extended_authentication, verb, &payload) {
|
if !ph.handle_packet(self, source_path, forward_secrecy, extended_authentication, verb, &payload) {
|
||||||
match verb {
|
match verb {
|
||||||
//VERB_VL1_NOP => {}
|
//VERB_VL1_NOP => {}
|
||||||
|
@ -461,7 +469,7 @@ impl Peer {
|
||||||
|
|
||||||
explicit_endpoint.map_or_else(|| {
|
explicit_endpoint.map_or_else(|| {
|
||||||
self.path(node).map_or(false, |path| {
|
self.path(node).map_or(false, |path| {
|
||||||
path.log_send(time_ticks);
|
path.log_send_anything(time_ticks);
|
||||||
self.send_to_endpoint(ci, path.endpoint(), path.local_socket(), path.local_interface(), &packet)
|
self.send_to_endpoint(ci, path.endpoint(), path.local_socket(), path.local_interface(), &packet)
|
||||||
})
|
})
|
||||||
}, |endpoint| {
|
}, |endpoint| {
|
||||||
|
|
14
zerotier-system-service/Cargo.lock
generated
14
zerotier-system-service/Cargo.lock
generated
|
@ -88,6 +88,12 @@ version = "1.0.45"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "ee10e43ae4a853c0a3591d4e2ada1719e553be18199d9da9d4a83f5927c2f5c7"
|
checksum = "ee10e43ae4a853c0a3591d4e2ada1719e553be18199d9da9d4a83f5927c2f5c7"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "arc-swap"
|
||||||
|
version = "1.5.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "c5d78ce20460b82d3fa150275ed9d55e21064fc7951177baacf86a145c4a4b1f"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "async-channel"
|
name = "async-channel"
|
||||||
version = "1.6.1"
|
version = "1.6.1"
|
||||||
|
@ -928,6 +934,12 @@ version = "0.4.3"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
|
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "highway"
|
||||||
|
version = "0.7.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "a310093553e2397bd2936564960446b23233864bbffee554ec5847572e2dfd93"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "hkdf"
|
name = "hkdf"
|
||||||
version = "0.10.0"
|
version = "0.10.0"
|
||||||
|
@ -2217,8 +2229,10 @@ dependencies = [
|
||||||
name = "zerotier-network-hypervisor"
|
name = "zerotier-network-hypervisor"
|
||||||
version = "2.0.0"
|
version = "2.0.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
"arc-swap",
|
||||||
"base64",
|
"base64",
|
||||||
"dashmap",
|
"dashmap",
|
||||||
|
"highway",
|
||||||
"lazy_static",
|
"lazy_static",
|
||||||
"libc",
|
"libc",
|
||||||
"lz4_flex",
|
"lz4_flex",
|
||||||
|
|
Loading…
Add table
Reference in a new issue