diff --git a/attic/syncwhole/Cargo.toml b/attic/syncwhole/Cargo.toml deleted file mode 100644 index 01203e812..000000000 --- a/attic/syncwhole/Cargo.toml +++ /dev/null @@ -1,37 +0,0 @@ -[package] -name = "syncwhole" -version = "0.1.0" -edition = "2021" -license = "MPL-2.0" -authors = ["Adam Ierymenko "] - -[profile.release] -opt-level = 3 -lto = true -codegen-units = 1 -panic = 'abort' - -[lib] -name = "syncwhole" -path = "src/lib.rs" -doc = true - -[[bin]] -name = "syncwhole_local_testnet" -path = "src/main.rs" -doc = false -required-features = ["include_sha2_lib"] - -[dependencies] -tokio = { version = "^1", features = ["net", "rt", "parking_lot", "time", "io-std", "io-util", "sync", "rt-multi-thread"], default-features = false } -serde = { version = "^1", features = ["derive"], default-features = false } -serde_bytes = "^0" -rmp = "^0" -rmp-serde = "^1" -sha2 = { version = "^0", optional = true } -async-trait = "^0" -futures-core = "^0" -iblt = { version = "^0", path = "../iblt" } - -[features] -include_sha2_lib = ["sha2"] diff --git a/attic/syncwhole/rustfmt.toml b/attic/syncwhole/rustfmt.toml deleted file mode 120000 index 39f97b043..000000000 --- a/attic/syncwhole/rustfmt.toml +++ /dev/null @@ -1 +0,0 @@ -../rustfmt.toml \ No newline at end of file diff --git a/attic/syncwhole/src/datastore.rs b/attic/syncwhole/src/datastore.rs deleted file mode 100644 index 3ef939088..000000000 --- a/attic/syncwhole/src/datastore.rs +++ /dev/null @@ -1,105 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at https://mozilla.org/MPL/2.0/. - * - * (c)2022 ZeroTier, Inc. - * https://www.zerotier.com/ - */ - -use async_trait::async_trait; - -/// Size of keys, which is the size of a 512-bit hash. This is a protocol constant. -pub const KEY_SIZE: usize = 64; - -/// Result returned by DataStore::store(). -pub enum StoreResult { - /// Entry was accepted. - Ok, - - /// Entry was a duplicate of one we already have but was otherwise valid. - Duplicate, - - /// Entry was valid but was ignored for an unspecified reason. - Ignored, - - /// Entry was rejected as malformed or otherwise invalid (e.g. failed signature check). - Rejected, -} - -/// API to be implemented by the data set we want to replicate. -/// -/// Keys as used in this API are SHA512 hashes of values. -/// -/// Range queries take an optional subset parameter. The format and interpretation of -/// this is entirely up to the implementer of DataStore. It could contain a time, a SQL -/// query, a set of certificates, anything. Its purpose is to select which items we want -/// from remote nodes so that we can replicate only a subset of a larger set of data. -/// Other nodes can also supply a subset to this one, so it's important that remote subset -/// values supplied to the local data store be handled correctly. -#[async_trait] -pub trait DataStore: Sync + Send { - /// Container for values returned by load(). - /// - /// Making this a trait defined type lets you use Arc<[u8]>, etc. as well as obvious - /// ones like Box<[u8]> and Vec. - type ValueRef: AsRef<[u8]> + Sync + Send + Clone; - - /// Key hash size, always 64 for SHA512. - const KEY_SIZE: usize = KEY_SIZE; - - /// Maximum size of a value in bytes. - const MAX_VALUE_SIZE: usize; - - /// Get the subset that should be sent to remote nodes in queries. - async fn local_subset(&self) -> Option; - - /// Get an item by identity hash key if it exists. 
- async fn load(&self, key: &[u8; KEY_SIZE]) -> Option; - - /// Store an item in the data store and return its status. - /// - /// Note that no time is supplied here. The data store must determine this in an implementation - /// dependent manner if this is a temporally subjective data store. It could be determined by - /// the wall clock, from the object itself, etc. - /// - /// The key supplied here will always be the SHA512 hash of the value. There is no need to - /// re-compute and check the key, but the value must be validated. - /// - /// Validation of the value and returning the appropriate StoreResult is important to the - /// operation of the synchronization algorithm: - /// - /// StoreResult::Ok - Value was valid and was accepted and saved. - /// - /// StoreResult::Duplicate - Value was valid but is a duplicate of one we already have. - /// - /// StoreResult::Ignored - Value was valid but for some other reason was not saved. - /// - /// StoreResult::Rejected - Value was not valid, causes link to peer to be dropped. - /// - /// Rejected should only be returned if the value actually fails a validity check, signature - /// verification, proof of work check, or some other required criteria. Ignored must be - /// returned if the value is valid but is too old or was rejected for some other normal reason. - async fn store(&self, key: &[u8; KEY_SIZE], value: &[u8]) -> StoreResult; - - /// Iterate through keys in a range. - /// - /// Keys MUST be output in ascending binary sort order. - async fn keys bool>( - &self, - subset: Option<&[u8]>, - range_start: &[u8; KEY_SIZE], - range_end: &[u8; KEY_SIZE], - f: F, - ); - - /// Iterate through values in a range. - /// - /// Entries MUST be output in ascending binary sort order. - async fn values bool>( - &self, - subset: Option<&[u8]>, - range_start: &[u8; KEY_SIZE], - range_end: &[u8; KEY_SIZE], - f: F, - ); -} diff --git a/attic/syncwhole/src/host.rs b/attic/syncwhole/src/host.rs deleted file mode 100644 index 713f026ff..000000000 --- a/attic/syncwhole/src/host.rs +++ /dev/null @@ -1,156 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at https://mozilla.org/MPL/2.0/. - * - * (c)2022 ZeroTier, Inc. - * https://www.zerotier.com/ - */ - -use std::net::SocketAddr; -use std::sync::Arc; - -#[cfg(feature = "include_sha2_lib")] -use sha2::digest::{Digest, FixedOutput}; - -use serde::{Deserialize, Serialize}; - -use crate::node::RemoteNodeInfo; - -/// Configuration setttings for a syncwhole node. -#[derive(Serialize, Deserialize, Clone, Eq, PartialEq)] -pub struct Config { - /// A list of peer addresses to which we always want to stay connected. - /// The library will try to maintain connectivity to these regardless of connection limits. - pub anchors: Vec, - - /// A list of other peer addresses that we can try in order to achieve desired_connection_count. - /// If this includes the anchors too there will be no effect since the anchors are tried first anyway. - pub peers: Vec, - - /// The maximum number of TCP connections we should allow. - pub max_connection_count: usize, - - /// The desired number of peering links. - pub desired_connection_count: usize, - - /// Synchronization interval in milliseconds. - pub sync_interval: u64, - - /// Connection inactivity timeout in milliseconds. - pub connection_timeout: u64, - - /// An arbitrary name for this data set to avoid connecting to irrelevant nodes. 
- pub domain: String, - - /// An optional name for this node to advertise to other nodes. - pub name: String, - - /// An optional contact string for this node to advertise to other nodes. - /// Example: bighead@stanford.edu or https://www.piedpiper.com/ - pub contact: String, -} - -impl Default for Config { - fn default() -> Self { - Self { - anchors: Vec::new(), - peers: Vec::new(), - max_connection_count: 128, - desired_connection_count: 64, - sync_interval: 500, - connection_timeout: 500 * 10, - domain: String::new(), - name: String::new(), - contact: String::new(), - } - } -} - -/// A trait that users of syncwhole implement to provide configuration information and listen for events. -pub trait Host: Sync + Send { - /// Get the current configuration for this node. - fn node_config(&self) -> Arc; - - /// Test whether an inbound connection should be allowed from an address. - /// - /// This is called on first incoming connection before any init is received. The authenticate() - /// method is called once init has been received and is another decision point. The default - /// implementation of this always returns true. - /// - /// This is not called for outbound connections. - #[allow(unused_variables)] - fn allow(&self, remote_address: &SocketAddr) -> bool { - true - } - - /// Compute HMAC-SHA512(secret, challenge). - /// - /// A return of None indicates that the connection should be dropped. If authentication is - /// not enabled, the response should be computed using an all-zero secret key. This is - /// what the default implementation does, so if you don't want authentication there is no - /// need to override and implement this. - /// - /// This actually gets called twice per link: once when Init is received to compute the - /// response, and once when InitResponse is received to verify the response to our challenge. - /// - /// The default implementation authenticates with an all-zero key. Leave it this way if - /// you don't want authentication. - #[allow(unused_variables)] - fn authenticate(&self, info: &RemoteNodeInfo, challenge: &[u8]) -> Option<[u8; 64]> { - Some(Self::hmac_sha512(&[0_u8; 64], challenge)) - } - - /// Called when an attempt is made to connect to a remote address. - fn on_connect_attempt(&self, address: &SocketAddr); - - /// Called when a connection has been successfully established. - /// - /// Hosts are encouraged to learn endpoints when a successful outbound connection is made. Check the - /// inbound flag in the remote node info structure. - fn on_connect(&self, info: &RemoteNodeInfo); - - /// Called when an open connection is closed for any reason. - fn on_connection_closed(&self, info: &RemoteNodeInfo, reason: String); - - /// Fill a buffer with secure random bytes. - /// - /// The implementer must call a secure random number generator or source to implement this. - fn get_secure_random(&self, buf: &mut [u8]); - - /// Compute a SHA512 digest of the input. - /// - /// Input can consist of one or more slices that will be processed in order. - /// - /// If the feature "include_sha2_lib" is enabled a default implementation in terms of the - /// Rust sha2 crate is generated. Otherwise the implementer must supply their own - /// SHA512 function. 
- #[cfg(not(feature = "include_sha2_lib"))] - fn sha512(msg: &[&[u8]]) -> [u8; 64]; - #[cfg(feature = "include_sha2_lib")] - fn sha512(msg: &[&[u8]]) -> [u8; 64] { - let mut h = sha2::Sha512::new(); - for b in msg.iter() { - h.update(*b); - } - h.finalize_fixed().try_into().unwrap() - } - - /// Compute HMAC-SHA512 using key and input. - /// - /// Supplied key will always be 64 bytes in length. - /// - /// The default implementation is HMAC implemented in terms of sha512() above. Specialize - /// to provide your own implementation. - fn hmac_sha512(key: &[u8], msg: &[u8]) -> [u8; 64] { - let mut opad = [0x5c_u8; 128]; - let mut ipad = [0x36_u8; 128]; - assert!(key.len() >= 64); - for i in 0..64 { - opad[i] ^= key[i]; - } - for i in 0..64 { - ipad[i] ^= key[i]; - } - Self::sha512(&[&opad, &Self::sha512(&[&ipad, msg])]) - } -} diff --git a/attic/syncwhole/src/lib.rs b/attic/syncwhole/src/lib.rs deleted file mode 100644 index 2c7aa018f..000000000 --- a/attic/syncwhole/src/lib.rs +++ /dev/null @@ -1,17 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at https://mozilla.org/MPL/2.0/. - * - * (c)2022 ZeroTier, Inc. - * https://www.zerotier.com/ - */ - -pub(crate) mod protocol; -pub(crate) mod utils; -pub(crate) mod varint; - -pub mod datastore; -pub mod host; -pub mod node; - -pub use async_trait; diff --git a/attic/syncwhole/src/main.rs b/attic/syncwhole/src/main.rs deleted file mode 100644 index b7e98d5c5..000000000 --- a/attic/syncwhole/src/main.rs +++ /dev/null @@ -1,196 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at https://mozilla.org/MPL/2.0/. - * - * (c)2022 ZeroTier, Inc. - * https://www.zerotier.com/ - */ - -use std::collections::BTreeMap; -use std::io::{stdout, Write}; -use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; -use std::ops::Bound::Included; -use std::sync::atomic::{AtomicU64, Ordering}; -use std::sync::Arc; -use std::time::{Duration, Instant, SystemTime}; - -use async_trait::async_trait; - -use sha2::digest::Digest; -use sha2::Sha512; - -use syncwhole::datastore::*; -use syncwhole::host::*; -use syncwhole::node::*; -use syncwhole::utils::*; - -const TEST_NODE_COUNT: usize = 8; -const TEST_PORT_RANGE_START: u16 = 21384; -const TEST_STARTING_RECORDS_PER_NODE: usize = 16; - -static mut RANDOM_CTR: u128 = 0; - -fn get_random_bytes(mut buf: &mut [u8]) { - // This is only for testing and is not really secure. 
- let mut ctr = unsafe { RANDOM_CTR }; - if ctr == 0 { - ctr = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_nanos() * (1 + Instant::now().elapsed().as_nanos()); - } - while !buf.is_empty() { - let l = buf.len().min(64); - ctr = ctr.wrapping_add(1); - buf[..l].copy_from_slice(&Sha512::digest(&ctr.to_ne_bytes()).as_slice()[..l]); - buf = &mut buf[l..]; - } - unsafe { RANDOM_CTR = ctr }; -} - -pub struct TestNodeHost { - pub name: String, - pub config: Config, - pub records: tokio::sync::RwLock>, -} - -impl TestNodeHost { - pub fn new_random(test_no: usize) -> Self { - let mut s = BTreeMap::new(); - for _ in 0..TEST_STARTING_RECORDS_PER_NODE { - let mut v = [0_u8; 64]; - get_random_bytes(&mut v); - let k = Self::sha512(&[&v]); - s.insert(k, v); - } - Self { - name: test_no.to_string(), - config: Config::default(), - records: tokio::sync::RwLock::new(s), - } - } -} - -impl Host for TestNodeHost { - fn node_config(&self) -> Config { - self.config.clone() - } - - fn on_connect_attempt(&self, _address: &SocketAddr) { - //println!("{:5}: connecting to {}", self.name, _address.to_string()); - } - - fn on_connect(&self, info: &RemoteNodeInfo) { - //println!("{:5}: connected to {} ({}, {})", self.name, info.remote_address.to_string(), info.node_name.as_ref().map_or("null", |s| s.as_str()), if info.inbound { "inbound" } else { "outbound" }); - } - - fn on_connection_closed(&self, info: &RemoteNodeInfo, reason: String) { - //println!("{:5}: closed connection to {}: {} ({}, {})", self.name, info.remote_address.to_string(), reason, if info.inbound { "inbound" } else { "outbound" }, if info.initialized { "initialized" } else { "not initialized" }); - } - - fn get_secure_random(&self, buf: &mut [u8]) { - // This is only for testing and is not really secure. 
- get_random_bytes(buf); - } -} - -#[async_trait] -impl DataStore for TestNodeHost { - type ValueRef = [u8; 64]; - - const MAX_VALUE_SIZE: usize = 1024; - - fn clock(&self) -> i64 { - ms_since_epoch() - } - - fn domain(&self) -> &str { - "test" - } - - async fn load(&self, key: &[u8; 64]) -> Option { - let records = self.records.read().await; - let value = records.get(key); - if value.is_some() { - Some(value.unwrap().clone()) - } else { - None - } - } - - async fn store(&self, key: &[u8; 64], value: &[u8]) -> StoreResult { - let value: [u8; 64] = value.try_into(); - if value.is_ok() { - if self.records.write().await.insert(key.clone(), value).is_none() { - StoreResult::Ok - } else { - StoreResult::Duplicate - } - } else { - StoreResult::Rejected - } - } - - async fn keys_under bool>(&self, reference_time: i64, prefix: u64, prefix_bits: u32, f: F) { - let (start, end) = prefix_to_range(prefix, prefix_bits); - let records = self.records.read().await; - for (k, v) in records.range((Included(start), Included(end))) { - if !f(k) { - break; - } - } - } -} - -fn main() { - tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap() - .block_on(async { - println!( - "Running syncwhole local self-test network with {} nodes starting at 127.0.0.1:{}", - TEST_NODE_COUNT, TEST_PORT_RANGE_START - ); - println!(); - - println!("Starting nodes on 127.0.0.1..."); - let mut nodes: Vec> = Vec::with_capacity(TEST_NODE_COUNT); - for port in TEST_PORT_RANGE_START..(TEST_PORT_RANGE_START + (TEST_NODE_COUNT as u16)) { - let mut peers: Vec = Vec::with_capacity(TEST_NODE_COUNT); - for port2 in TEST_PORT_RANGE_START..(TEST_PORT_RANGE_START + (TEST_NODE_COUNT as u16)) { - if port != port2 { - peers.push(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port2))); - } - } - let mut th = TestNodeHost::new_random(port as usize); - th.config.anchors = peers; - th.config.name = port.to_string(); - let nh = Arc::new(th); - //println!("Starting node on 127.0.0.1:{}...", port, nh.db.lock().unwrap().len()); - nodes.push( - Node::new(nh.clone(), nh.clone(), SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port))) - .await - .unwrap(), - ); - } - - print!("Waiting for all connections to be established..."); - let _ = stdout().flush(); - loop { - tokio::time::sleep(Duration::from_secs(1)).await; - let mut count = 0; - for n in nodes.iter() { - count += n.connection_count().await; - } - if count == (TEST_NODE_COUNT * (TEST_NODE_COUNT - 1)) { - println!(" {} connections up.", count); - break; - } else { - print!("."); - let _ = stdout().flush(); - } - } - - loop { - tokio::time::sleep(Duration::from_secs(1)).await; - } - }); -} diff --git a/attic/syncwhole/src/node.rs b/attic/syncwhole/src/node.rs deleted file mode 100644 index a23c15521..000000000 --- a/attic/syncwhole/src/node.rs +++ /dev/null @@ -1,701 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at https://mozilla.org/MPL/2.0/. - * - * (c)2022 ZeroTier, Inc. 
- * https://www.zerotier.com/ - */ - -use std::collections::{HashMap, HashSet}; -use std::io::IoSlice; -use std::mem::MaybeUninit; -use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; -use std::ops::Add; -use std::sync::atomic::{AtomicBool, AtomicI64, Ordering}; -use std::sync::Arc; - -use iblt::IBLT; - -use serde::{Deserialize, Serialize}; - -use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf}; -use tokio::net::{TcpListener, TcpSocket, TcpStream}; -use tokio::sync::Mutex; -use tokio::task::JoinHandle; -use tokio::time::{Duration, Instant}; - -use crate::datastore::*; -use crate::host::Host; -use crate::protocol::*; -use crate::utils::*; -use crate::varint; - -// Interval for announcing queued HaveRecords items, in milliseconds. -const ANNOUNCE_PERIOD: i64 = 100; - -/// Information about a remote node to which we are connected. -#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct RemoteNodeInfo { - /// Optional name advertised by remote node (arbitrary). - pub name: String, - - /// Optional contact information advertised by remote node (arbitrary). - pub contact: String, - - /// Actual remote endpoint address. - pub remote_address: SocketAddr, - - /// Explicitly advertised remote addresses supplied by remote node (not necessarily verified). - pub explicit_addresses: Vec, - - /// Time TCP connection was established (ms since epoch). - pub connect_time: i64, - - /// Time TCP connection was estaablished (ms, monotonic). - pub connect_instant: i64, - - /// True if this is an inbound TCP connection. - pub inbound: bool, - - /// True if this connection has exchanged init messages successfully. - pub initialized: bool, -} - -/// An instance of the syncwhole data set synchronization engine. -/// -/// This holds a number of async tasks that are terminated or aborted if this object -/// is dropped. In other words this implements structured concurrency. -pub struct Node { - internal: Arc>, - housekeeping_task: JoinHandle<()>, - announce_task: JoinHandle<()>, - listener_task: JoinHandle<()>, -} - -impl Node { - pub async fn new(db: Arc, host: Arc, bind_address: SocketAddr) -> std::io::Result { - let listener = if bind_address.is_ipv4() { - TcpSocket::new_v4() - } else { - TcpSocket::new_v6() - }?; - configure_tcp_socket(&listener)?; - listener.bind(bind_address.clone())?; - let listener = listener.listen(1024)?; - - let internal = Arc::new(NodeInternal:: { - anti_loopback_secret: { - let mut tmp = [0_u8; 64]; - host.get_secure_random(&mut tmp); - tmp - }, - datastore: db.clone(), - host: host.clone(), - connections: Mutex::new(HashMap::new()), - connecting_to: Mutex::new(HashSet::new()), - announce_queue: Mutex::new(HashMap::new()), - bind_address, - starting_instant: Instant::now(), - }); - - Ok(Self { - internal: internal.clone(), - housekeeping_task: tokio::spawn(internal.clone().housekeeping_task_main()), - announce_task: tokio::spawn(internal.clone().announce_task_main()), - listener_task: tokio::spawn(internal.listener_task_main(listener)), - }) - } - - #[inline(always)] - pub fn datastore(&self) -> &Arc { - &self.internal.datastore - } - - #[inline(always)] - pub fn host(&self) -> &Arc { - &self.internal.host - } - - /// Attempt to connect to an explicitly specified TCP endpoint. - /// - /// Ok(true) is returned if a new connection was made. Ok(false) means there is already a connection - /// to the endpoint. An error is returned if the connection fails. 
- pub async fn connect(&self, address: &SocketAddr) -> std::io::Result { - if self.internal.connecting_to.lock().await.insert(address.clone()) { - self.internal - .connect( - address, - Instant::now().add(Duration::from_millis(self.internal.host.node_config().connection_timeout)), - ) - .await - } else { - Ok(false) - } - } - - /// Get a list of all open peer to peer connections. - pub async fn list_connections(&self) -> Vec { - let connections = self.internal.connections.lock().await; - let mut cl: Vec = Vec::with_capacity(connections.len()); - for (_, c) in connections.iter() { - cl.push(c.info.lock().unwrap().clone()); - } - cl - } - - /// Get the number of open peer to peer connections. - pub async fn connection_count(&self) -> usize { - self.internal.connections.lock().await.len() - } -} - -impl Drop for Node { - fn drop(&mut self) { - self.housekeeping_task.abort(); - self.announce_task.abort(); - self.listener_task.abort(); - } -} - -/********************************************************************************************************************/ - -fn configure_tcp_socket(socket: &TcpSocket) -> std::io::Result<()> { - let _ = socket.set_linger(None); - if socket.set_reuseport(true).is_ok() { - Ok(()) - } else { - socket.set_reuseaddr(true) - } -} - -fn decode_msgpack<'a, T: Deserialize<'a>>(b: &'a [u8]) -> std::io::Result { - rmp_serde::from_slice(b).map_err(|e| { - std::io::Error::new( - std::io::ErrorKind::InvalidData, - format!("invalid msgpack object: {}", e.to_string()), - ) - }) -} - -pub struct NodeInternal { - // Secret used to perform HMAC to detect and drop loopback connections to self. - anti_loopback_secret: [u8; 64], - - // Outside code implementations of DataStore and Host traits. - datastore: Arc, - host: Arc, - - // Connections and their task join handles, by remote endpoint address. - connections: Mutex>>, - - // Outgoing connections in progress. - connecting_to: Mutex>, - - // Records received since last announce and the endpoints that we know already have them. - announce_queue: Mutex>>, - - // Local address to which this node is bound - bind_address: SocketAddr, - - // Instant this node started. - starting_instant: Instant, -} - -impl NodeInternal { - fn ms_monotonic(&self) -> i64 { - Instant::now().duration_since(self.starting_instant).as_millis() as i64 - } - - async fn housekeeping_task_main(self: Arc) { - let tasks = AsyncTaskReaper::new(); - let mut sleep_for = Duration::from_millis(500); - loop { - tokio::time::sleep(sleep_for).await; - - let config = self.host.node_config(); - let mut connections = self.connections.lock().await; - let mut connecting_to = self.connecting_to.lock().await; - let now = self.ms_monotonic(); - - // Drop dead or timed out connections, and for live connections handle sending sync requests. 
- connections.retain(|_, c| { - if !c.closed.load(Ordering::Relaxed) { - let cc = c.clone(); - if (now - c.last_receive_time.load(Ordering::Relaxed)) < (config.connection_timeout as i64) { - // TODO: sync init if not waiting for a sync response - true // keep connection - } else { - let _ = c.read_task.lock().unwrap().take().map(|j| j.abort()); - let host = self.host.clone(); - tasks.spawn(async move { - host.on_connection_closed(&*cc.info.lock().unwrap(), "timeout".to_string()); - }); - false // discard connection - } - } else { - let host = self.host.clone(); - let cc = c.clone(); - let j = c.read_task.lock().unwrap().take(); - tasks.spawn(async move { - if j.is_some() { - let e = j.unwrap().await; - if e.is_ok() { - let e = e.unwrap(); - host.on_connection_closed( - &*cc.info.lock().unwrap(), - e.map_or_else(|e| e.to_string(), |_| "unknown error".to_string()), - ); - } else { - host.on_connection_closed(&*cc.info.lock().unwrap(), "remote host closed connection".to_string()); - } - } else { - host.on_connection_closed(&*cc.info.lock().unwrap(), "remote host closed connection".to_string()); - } - }); - false // discard connection - } - }); - - let connect_timeout_at = Instant::now().add(Duration::from_millis(config.connection_timeout)); - - // Always try to connect to anchor peers. - for sa in config.anchors.iter() { - if !connections.contains_key(sa) && connecting_to.insert(sa.clone()) { - let self2 = self.clone(); - let sa = sa.clone(); - tasks.spawn(async move { - let _ = self2.connect(&sa, connect_timeout_at).await; - }); - } - } - - // Try to connect to more peers until desired connection count is reached. - let desired_connection_count = config.desired_connection_count.min(config.max_connection_count); - for sa in config.peers.iter() { - if (connections.len() + connecting_to.len()) >= desired_connection_count { - break; - } - if !connections.contains_key(sa) && connecting_to.insert(sa.clone()) { - let self2 = self.clone(); - let sa = sa.clone(); - tasks.spawn(async move { - let _ = self2.connect(&sa, connect_timeout_at).await; - }); - } - } - - sleep_for = Duration::from_millis(config.sync_interval.min(config.connection_timeout)); - } - } - - async fn announce_task_main(self: Arc) { - let sleep_for = Duration::from_millis(ANNOUNCE_PERIOD as u64); - let mut to_announce: Vec<([u8; KEY_SIZE], Vec)> = Vec::new(); - let background_tasks = AsyncTaskReaper::new(); - let announce_timeout = Duration::from_millis(self.host.node_config().connection_timeout); - loop { - tokio::time::sleep(sleep_for).await; - - for (key, already_has) in self.announce_queue.lock().await.drain() { - to_announce.push((key, already_has)); - } - - let now = self.ms_monotonic(); - let have_records_est_size = (to_announce.len() * KEY_SIZE) + 2; - let mut have_records: Vec = Vec::with_capacity(have_records_est_size); - for c in self.connections.lock().await.iter() { - if c.1.announce_new_records.load(Ordering::Relaxed) { - for (key, already_has) in to_announce.iter() { - if !already_has.contains(c.0) { - let _ = std::io::Write::write_all(&mut have_records, key); - } - } - if !have_records.is_empty() { - let c2 = c.1.clone(); - background_tasks.spawn(async move { - // If the connection dies this will either fail or time out in 1s. Usually these execute instantly due to - // write buffering but a short timeout prevents them from building up too much. 
- let _ = tokio::time::timeout( - announce_timeout, - c2.send_msg(MessageType::HaveRecords, have_records.as_slice(), now), - ); - }); - have_records = Vec::with_capacity(have_records_est_size); - } - } - } - - to_announce.clear(); - } - } - - async fn listener_task_main(self: Arc, listener: TcpListener) { - loop { - let socket = listener.accept().await; - if socket.is_ok() { - let (stream, address) = socket.unwrap(); - if self.host.allow(&address) { - let config = self.host.node_config(); - if self.connections.lock().await.len() < config.max_connection_count || config.anchors.contains(&address) { - Self::connection_start(&self, address, stream, true).await; - } - } - } - } - } - - /// Internal connection method. - /// - /// Note that this does not add the address to connecting_to. Instead it's done by the caller - /// to avoid races and similar things. It is removed from connecting_to once the connection - /// either succeeds or fails. - async fn connect(self: &Arc, address: &SocketAddr, deadline: Instant) -> std::io::Result { - let f = async { - let stream = if address.is_ipv4() { - TcpSocket::new_v4() - } else { - TcpSocket::new_v6() - }?; - configure_tcp_socket(&stream)?; - stream.bind(self.bind_address.clone())?; - - let stream = tokio::time::timeout_at(deadline, stream.connect(address.clone())); - self.host.on_connect_attempt(address); - let stream = stream.await; - - if stream.is_ok() { - Ok(self.connection_start(address.clone(), stream.unwrap()?, false).await) - } else { - Err(std::io::Error::new(std::io::ErrorKind::TimedOut, "connect timed out")) - } - }; - let r = f.await; - let _ = self.connecting_to.lock().await.remove(address); - r - } - - /// Initialize and start a connection whether incoming or outgoing. - async fn connection_start(self: &Arc, address: SocketAddr, stream: TcpStream, inbound: bool) -> bool { - let mut ok = false; - let _ = self.connections.lock().await.entry(address.clone()).or_insert_with(|| { - ok = true; - let _ = stream.set_nodelay(false); - let (reader, writer) = stream.into_split(); - let now = self.ms_monotonic(); - let connection = Arc::new(Connection { - writer: Mutex::new(writer), - last_send_time: AtomicI64::new(now), - last_receive_time: AtomicI64::new(now), - info: std::sync::Mutex::new(RemoteNodeInfo { - name: String::new(), - contact: String::new(), - remote_address: address.clone(), - explicit_addresses: Vec::new(), - connect_time: ms_since_epoch(), - connect_instant: now, - inbound, - initialized: false, - }), - read_task: std::sync::Mutex::new(None), - announce_new_records: AtomicBool::new(false), - closed: AtomicBool::new(false), - }); - let self2 = self.clone(); - let c2 = connection.clone(); - connection.read_task.lock().unwrap().replace(tokio::spawn(async move { - let result = self2.connection_io_task_main(&c2, address, reader).await; - c2.closed.store(true, Ordering::Relaxed); - result - })); - connection - }); - ok - } - - async fn connection_io_task_main( - self: Arc, - connection: &Arc, - remote_address: SocketAddr, - mut reader: OwnedReadHalf, - ) -> std::io::Result<()> { - const BUF_CHUNK_SIZE: usize = 4096; - const READ_BUF_INITIAL_SIZE: usize = 65536; // should be a multiple of BUF_CHUNK_SIZE - - let mut write_buffer: Vec = Vec::with_capacity(BUF_CHUNK_SIZE); - let mut read_buffer: Vec = Vec::new(); - read_buffer.resize(READ_BUF_INITIAL_SIZE, 0); - - let config = self.host.node_config(); - let mut anti_loopback_challenge_sent = [0_u8; 64]; - let mut domain_challenge_sent = [0_u8; 64]; - let mut auth_challenge_sent = [0_u8; 64]; - 
self.host.get_secure_random(&mut anti_loopback_challenge_sent); - self.host.get_secure_random(&mut domain_challenge_sent); - self.host.get_secure_random(&mut auth_challenge_sent); - connection - .send_obj( - &mut write_buffer, - MessageType::Init, - &msg::Init { - anti_loopback_challenge: &anti_loopback_challenge_sent, - domain_challenge: &domain_challenge_sent, - auth_challenge: &auth_challenge_sent, - node_name: config.name.as_str(), - node_contact: config.contact.as_str(), - locally_bound_port: self.bind_address.port(), - explicit_ipv4: None, - explicit_ipv6: None, - }, - self.ms_monotonic(), - ) - .await?; - drop(config); - - let max_message_size = ((D::MAX_VALUE_SIZE * 8) + (D::KEY_SIZE * 1024) + 65536) as u64; // sanity limit - let mut initialized = false; - let mut init_received = false; - let mut buffer_fill = 0_usize; - loop { - let message_type: MessageType; - let message_size: usize; - let header_size: usize; - let total_size: usize; - loop { - buffer_fill += reader.read(&mut read_buffer.as_mut_slice()[buffer_fill..]).await?; - if buffer_fill >= 2 { - // type and at least one byte of varint - let ms = varint::decode(&read_buffer.as_slice()[1..]); - if ms.1 > 0 { - // varint is all there and parsed correctly - if ms.0 > max_message_size { - return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "message too large")); - } - - message_type = MessageType::from(*read_buffer.get(0).unwrap()); - message_size = ms.0 as usize; - header_size = 1 + ms.1; - total_size = header_size + message_size; - - if read_buffer.len() < total_size { - read_buffer.resize(((total_size / BUF_CHUNK_SIZE) + 1) * BUF_CHUNK_SIZE, 0); - } - while buffer_fill < total_size { - buffer_fill += reader.read(&mut read_buffer.as_mut_slice()[buffer_fill..]).await?; - } - - break; - } - } - } - let message = &read_buffer.as_slice()[header_size..total_size]; - - let now = self.ms_monotonic(); - connection.last_receive_time.store(now, Ordering::Relaxed); - - match message_type { - MessageType::Nop => {} - - MessageType::Init => { - if init_received { - return Err(std::io::Error::new(std::io::ErrorKind::Other, "duplicate init")); - } - init_received = true; - - let msg: msg::Init = decode_msgpack(message)?; - let (anti_loopback_response, domain_challenge_response, auth_challenge_response) = { - let mut info = connection.info.lock().unwrap(); - info.name = msg.node_name.to_string(); - info.contact = msg.node_contact.to_string(); - let _ = msg.explicit_ipv4.map(|pv4| { - info.explicit_addresses - .push(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::from(pv4.ip), pv4.port))); - }); - let _ = msg.explicit_ipv6.map(|pv6| { - info.explicit_addresses - .push(SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::from(pv6.ip), pv6.port, 0, 0))); - }); - let info = info.clone(); - - let auth_challenge_response = self.host.authenticate(&info, msg.auth_challenge); - if auth_challenge_response.is_none() { - return Err(std::io::Error::new( - std::io::ErrorKind::Other, - "authenticate() returned None, connection dropped", - )); - } - let auth_challenge_response = auth_challenge_response.unwrap(); - - ( - H::hmac_sha512(&self.anti_loopback_secret, msg.anti_loopback_challenge), - H::hmac_sha512(&H::sha512(&[self.host.node_config().domain.as_bytes()]), msg.domain_challenge), - auth_challenge_response, - ) - }; - - connection - .send_obj( - &mut write_buffer, - MessageType::InitResponse, - &msg::InitResponse { - anti_loopback_response: &anti_loopback_response, - domain_response: &domain_challenge_response, - auth_response: &auth_challenge_response, 
- }, - now, - ) - .await?; - } - - MessageType::InitResponse => { - let msg: msg::InitResponse = decode_msgpack(message)?; - let mut info = connection.info.lock().unwrap(); - - if info.initialized { - return Err(std::io::Error::new(std::io::ErrorKind::Other, "duplicate init response")); - } - - if msg - .anti_loopback_response - .eq(&H::hmac_sha512(&self.anti_loopback_secret, &anti_loopback_challenge_sent)) - { - return Err(std::io::Error::new(std::io::ErrorKind::Other, "rejected connection to self")); - } - if !msg.domain_response.eq(&H::hmac_sha512( - &H::sha512(&[self.host.node_config().domain.as_bytes()]), - &domain_challenge_sent, - )) { - return Err(std::io::Error::new(std::io::ErrorKind::Other, "domain mismatch")); - } - if !self - .host - .authenticate(&info, &auth_challenge_sent) - .map_or(false, |cr| msg.auth_response.eq(&cr)) - { - return Err(std::io::Error::new( - std::io::ErrorKind::Other, - "challenge/response authentication failed", - )); - } - - initialized = true; - info.initialized = true; - - let info = info.clone(); // also releases lock since info is replaced/destroyed - self.host.on_connect(&info); - } - - // Handle messages other than INIT and INIT_RESPONSE after checking 'initialized' flag. - _ => { - if !initialized { - return Err(std::io::Error::new( - std::io::ErrorKind::Other, - "init exchange must be completed before other messages are sent", - )); - } - - match message_type { - MessageType::HaveRecords => {} - - MessageType::GetRecords => {} - - MessageType::Record => { - let key = H::sha512(&[message]); - match self.datastore.store(&key, message).await { - StoreResult::Ok => { - let mut q = self.announce_queue.lock().await; - let ql = q.entry(key).or_insert_with(|| Vec::with_capacity(2)); - if !ql.contains(&remote_address) { - ql.push(remote_address.clone()); - } - } - StoreResult::Rejected => { - return Err(std::io::Error::new( - std::io::ErrorKind::Other, - format!("record rejected by data store: {}", to_hex_string(&key)), - )); - } - _ => {} - } - } - - MessageType::SyncRequest => { - let msg: msg::SyncRequest = decode_msgpack(message)?; - } - - MessageType::Sync => { - let msg: msg::Sync = decode_msgpack(message)?; - } - - _ => {} - } - } - } - - read_buffer.copy_within(total_size..buffer_fill, 0); - buffer_fill -= total_size; - } - } -} - -impl Drop for NodeInternal { - fn drop(&mut self) { - let _ = tokio::runtime::Handle::try_current().map_or_else( - |_| { - for (_, c) in self.connections.blocking_lock().drain() { - c.read_task.lock().unwrap().as_mut().map(|c| c.abort()); - } - }, - |h| { - let _ = h.block_on(async { - for (_, c) in self.connections.lock().await.drain() { - c.read_task.lock().unwrap().as_mut().map(|c| c.abort()); - } - }); - }, - ); - } -} - -struct Connection { - writer: Mutex, - last_send_time: AtomicI64, - last_receive_time: AtomicI64, - info: std::sync::Mutex, - read_task: std::sync::Mutex>>>, - announce_new_records: AtomicBool, - closed: AtomicBool, -} - -impl Connection { - async fn send_msg(&self, message_type: MessageType, data: &[u8], now: i64) -> std::io::Result<()> { - let mut header: [u8; 16] = unsafe { MaybeUninit::uninit().assume_init() }; - header[0] = message_type as u8; - let header_size = 1 + varint::encode(&mut header[1..], data.len() as u64); - if self - .writer - .lock() - .await - .write_vectored(&[IoSlice::new(&header[0..header_size]), IoSlice::new(data)]) - .await? 
- == (data.len() + header_size) - { - self.last_send_time.store(now, Ordering::Relaxed); - Ok(()) - } else { - Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "write error")) - } - } - - async fn send_obj(&self, write_buf: &mut Vec, message_type: MessageType, obj: &O, now: i64) -> std::io::Result<()> { - write_buf.clear(); - if rmp_serde::encode::write_named(write_buf, obj).is_ok() { - self.send_msg(message_type, write_buf.as_slice(), now).await - } else { - Err(std::io::Error::new( - std::io::ErrorKind::InvalidData, - "serialize failure (internal error)", - )) - } - } -} diff --git a/attic/syncwhole/src/protocol.rs b/attic/syncwhole/src/protocol.rs deleted file mode 100644 index 30f0315a9..000000000 --- a/attic/syncwhole/src/protocol.rs +++ /dev/null @@ -1,180 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at https://mozilla.org/MPL/2.0/. - * - * (c)2022 ZeroTier, Inc. - * https://www.zerotier.com/ - */ - -#[derive(Clone, Copy, Eq, PartialEq)] -#[repr(u8)] -pub enum MessageType { - /// No operation, payload ignored. - Nop = 0_u8, - - /// msg::Init (msgpack) - Init = 1_u8, - - /// msg::InitResponse (msgpack) - InitResponse = 2_u8, - - /// [...] - HaveRecords = 3_u8, - - /// [...] - GetRecords = 4_u8, - - /// - Record = 5_u8, - - /// msg::SyncRequest (msgpack) - SyncRequest = 6_u8, - - /// msg::Sync (msgpack) - Sync = 7_u8, -} - -const MESSAGE_TYPE_MAX: u8 = 7; - -impl From for MessageType { - /// Get a type from a byte, returning the Nop type if the byte is out of range. - #[inline(always)] - fn from(b: u8) -> Self { - if b <= MESSAGE_TYPE_MAX { - unsafe { std::mem::transmute(b) } - } else { - Self::Nop - } - } -} - -impl MessageType { - #[allow(unused)] - pub fn name(&self) -> &'static str { - match *self { - Self::Nop => "NOP", - Self::Init => "INIT", - Self::InitResponse => "INIT_RESPONSE", - Self::HaveRecords => "HAVE_RECORDS", - Self::GetRecords => "GET_RECORDS", - Self::Record => "RECORD", - Self::SyncRequest => "SYNC_REQUEST", - Self::Sync => "SYNC", - } - } -} - -/// Msgpack serializable message types. -/// Some that are frequently transferred use shortened names to save bandwidth. -pub mod msg { - use serde::{Deserialize, Serialize}; - - #[derive(Serialize, Deserialize)] - pub struct IPv4 { - pub ip: [u8; 4], - pub port: u16, - } - - #[derive(Serialize, Deserialize)] - pub struct IPv6 { - pub ip: [u8; 16], - pub port: u16, - } - - #[derive(Serialize, Deserialize)] - pub struct Init<'a> { - /// A random challenge to be hashed with a secret to detect and drop connections to self. - #[serde(with = "serde_bytes")] - pub anti_loopback_challenge: &'a [u8], - - /// A random challenge for checking the data set domain. - #[serde(with = "serde_bytes")] - pub domain_challenge: &'a [u8], - - /// A random challenge for login/authentication. - #[serde(with = "serde_bytes")] - pub auth_challenge: &'a [u8], - - /// Optional name to advertise for this node. - pub node_name: &'a str, - - /// Optional contact information for this node, such as a URL or an e-mail address. - pub node_contact: &'a str, - - /// Port to which this node has locally bound. - /// This is used to try to auto-detect whether a NAT is in the way. - pub locally_bound_port: u16, - - /// An IPv4 address where this node can be reached. - /// If both explicit_ipv4 and explicit_ipv6 are omitted the physical source IP:port may be used. 
- pub explicit_ipv4: Option, - - /// An IPv6 address where this node can be reached. - /// If both explicit_ipv4 and explicit_ipv6 are omitted the physical source IP:port may be used. - pub explicit_ipv6: Option, - } - - #[derive(Serialize, Deserialize)] - pub struct InitResponse<'a> { - /// HMAC-SHA512(local secret, anti_loopback_challenge) to detect and drop loops. - #[serde(with = "serde_bytes")] - pub anti_loopback_response: &'a [u8], - - /// HMAC-SHA512(SHA512(domain), domain_challenge) to check that the data set domain matches. - #[serde(with = "serde_bytes")] - pub domain_response: &'a [u8], - - /// HMAC-SHA512(secret, challenge) for authentication. (If auth is not enabled, an all-zero secret is used.) - #[serde(with = "serde_bytes")] - pub auth_response: &'a [u8], - } - - #[derive(Serialize, Deserialize)] - pub struct SyncRequest<'a> { - /// Starting range to query, padded with zeroes if shorter than KEY_SIZE. - #[serde(with = "serde_bytes")] - #[serde(rename = "s")] - pub range_start: &'a [u8], - - /// Ending range to query, padded with 0xff if shorter than KEY_SIZE. - #[serde(with = "serde_bytes")] - #[serde(rename = "e")] - pub range_end: &'a [u8], - - /// Data-store-specific subset selector indicating what subset of items desired - #[serde(with = "serde_bytes")] - #[serde(rename = "q")] - pub subset: Option<&'a [u8]>, - } - - #[derive(Serialize, Deserialize)] - pub struct Sync<'a> { - /// Starting range summarized, padded with zeroes if shorter than KEY_SIZE. - #[serde(with = "serde_bytes")] - #[serde(rename = "s")] - pub range_start: &'a [u8], - - /// Ending range summarized, padded with 0xff if shorter than KEY_SIZE. - #[serde(with = "serde_bytes")] - #[serde(rename = "e")] - pub range_end: &'a [u8], - - /// Data-store-specific subset selector indicating what subset of items were included - #[serde(with = "serde_bytes")] - #[serde(rename = "q")] - pub subset: Option<&'a [u8]>, - - /// Number of buckets in IBLT - #[serde(rename = "b")] - pub iblt_buckets: usize, - - /// Number of bytes in each IBLT item (key prefix) - #[serde(rename = "l")] - pub iblt_item_bytes: usize, - - /// Set summary for keys under prefix within subset - #[serde(with = "serde_bytes")] - #[serde(rename = "i")] - pub iblt: &'a [u8], - } -} diff --git a/attic/syncwhole/src/utils.rs b/attic/syncwhole/src/utils.rs deleted file mode 100644 index da919dd4f..000000000 --- a/attic/syncwhole/src/utils.rs +++ /dev/null @@ -1,129 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at https://mozilla.org/MPL/2.0/. - * - * (c)2022 ZeroTier, Inc. - * https://www.zerotier.com/ - */ - -use std::collections::HashMap; -use std::future::Future; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Arc; -use std::time::SystemTime; - -use tokio::task::JoinHandle; - -/// Get the real time clock in milliseconds since Unix epoch. -pub fn ms_since_epoch() -> i64 { - std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap() - .as_millis() as i64 -} - -/// Encode a byte slice to a hexadecimal string. 
-pub fn to_hex_string(b: &[u8]) -> String { - const HEX_CHARS: [u8; 16] = [ - b'0', b'1', b'2', b'3', b'4', b'5', b'6', b'7', b'8', b'9', b'a', b'b', b'c', b'd', b'e', b'f', - ]; - let mut s = String::new(); - s.reserve(b.len() * 2); - for c in b { - let x = *c as usize; - s.push(HEX_CHARS[x >> 4] as char); - s.push(HEX_CHARS[x & 0xf] as char); - } - s -} - -#[inline(always)] -pub fn xorshift64(mut x: u64) -> u64 { - x ^= x.wrapping_shl(13); - x ^= x.wrapping_shr(7); - x ^= x.wrapping_shl(17); - x -} - -#[inline(always)] -pub fn splitmix64(mut x: u64) -> u64 { - x ^= x.wrapping_shr(30); - x = x.wrapping_mul(0xbf58476d1ce4e5b9); - x ^= x.wrapping_shr(27); - x = x.wrapping_mul(0x94d049bb133111eb); - x ^= x.wrapping_shr(31); - x -} - -/* -#[inline(always)] -pub fn splitmix64_inverse(mut x: u64) -> u64 { - x ^= x.wrapping_shr(31) ^ x.wrapping_shr(62); - x = x.wrapping_mul(0x319642b2d24d8ec3); - x ^= x.wrapping_shr(27) ^ x.wrapping_shr(54); - x = x.wrapping_mul(0x96de1b173f119089); - x ^= x.wrapping_shr(30) ^ x.wrapping_shr(60); - x -} -*/ - -static mut RANDOM_STATE_0: u64 = 0; -static mut RANDOM_STATE_1: u64 = 0; - -/// Get a non-cryptographic pseudorandom number. -pub fn random() -> u64 { - let (mut s0, mut s1) = unsafe { (RANDOM_STATE_0, RANDOM_STATE_1) }; - if s0 == 0 { - s0 = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_nanos() as u64; - } - if s1 == 0 { - s1 = splitmix64(std::process::id() as u64); - } - let s1_new = xorshift64(s1); - s0 = splitmix64(s0.wrapping_add(s1)); - s1 = s1_new; - unsafe { - RANDOM_STATE_0 = s0; - RANDOM_STATE_1 = s1; - }; - s0 -} - -/// Wrapper for tokio::spawn() that aborts tasks not yet completed when it is dropped. -pub struct AsyncTaskReaper { - ctr: AtomicUsize, - handles: Arc>>>, -} - -impl AsyncTaskReaper { - pub fn new() -> Self { - Self { - ctr: AtomicUsize::new(0), - handles: Arc::new(std::sync::Mutex::new(HashMap::new())), - } - } - - /// Spawn a new task. - /// - /// Note that currently any task output is ignored. This is for fire and forget - /// background tasks that you want to be collected on loss of scope. - pub fn spawn(&self, future: F) { - let id = self.ctr.fetch_add(1, Ordering::Relaxed); - let handles = self.handles.clone(); - self.handles.lock().unwrap().insert( - id, - tokio::spawn(async move { - let _ = future.await; - let _ = handles.lock().unwrap().remove(&id); - }), - ); - } -} - -impl Drop for AsyncTaskReaper { - fn drop(&mut self) { - for (_, h) in self.handles.lock().unwrap().iter() { - h.abort(); - } - } -} diff --git a/attic/syncwhole/src/varint.rs b/attic/syncwhole/src/varint.rs deleted file mode 100644 index 06c040e7f..000000000 --- a/attic/syncwhole/src/varint.rs +++ /dev/null @@ -1,45 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at https://mozilla.org/MPL/2.0/. - * - * (c)2022 ZeroTier, Inc. 
- * https://www.zerotier.com/ - */ - -#[allow(unused)] -pub const VARINT_MAX_SIZE_BYTES: usize = 10; - -pub fn encode(b: &mut [u8], mut v: u64) -> usize { - let mut i = 0; - loop { - if v > 0x7f { - b[i] = (v as u8) & 0x7f; - i += 1; - v = v.wrapping_shr(7); - } else { - b[i] = (v as u8) | 0x80; - i += 1; - break; - } - } - i -} - -pub fn decode(b: &[u8]) -> (u64, usize) { - let mut v = 0_u64; - let mut pos = 0; - let mut l = 0; - let bl = b.len(); - while l < bl { - let x = b[l]; - l += 1; - if x <= 0x7f { - v |= (x as u64).wrapping_shl(pos); - pos += 7; - } else { - v |= ((x & 0x7f) as u64).wrapping_shl(pos); - return (v, l); - } - } - return (0, 0); -}