From a956dbd4be6cb9d925aa829ec6ac9daba0b29c03 Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Fri, 14 Jan 2022 17:15:01 -0500 Subject: [PATCH] Move sync library out of this repo. --- allthethings/.gitignore | 2 - allthethings/Cargo.toml | 13 -- allthethings/src/config.rs | 36 ---- allthethings/src/iblt.rs | 258 ------------------------ allthethings/src/lib.rs | 42 ---- allthethings/src/link.rs | 346 -------------------------------- allthethings/src/memorystore.rs | 99 --------- allthethings/src/node.rs | 170 ---------------- allthethings/src/protocol.rs | 146 -------------- allthethings/src/store.rs | 57 ------ allthethings/src/varint.rs | 45 ----- 11 files changed, 1214 deletions(-) delete mode 100644 allthethings/.gitignore delete mode 100644 allthethings/Cargo.toml delete mode 100644 allthethings/src/config.rs delete mode 100644 allthethings/src/iblt.rs delete mode 100644 allthethings/src/lib.rs delete mode 100644 allthethings/src/link.rs delete mode 100644 allthethings/src/memorystore.rs delete mode 100644 allthethings/src/node.rs delete mode 100644 allthethings/src/protocol.rs delete mode 100644 allthethings/src/store.rs delete mode 100644 allthethings/src/varint.rs diff --git a/allthethings/.gitignore b/allthethings/.gitignore deleted file mode 100644 index 96ef6c0b9..000000000 --- a/allthethings/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -/target -Cargo.lock diff --git a/allthethings/Cargo.toml b/allthethings/Cargo.toml deleted file mode 100644 index 0a9129877..000000000 --- a/allthethings/Cargo.toml +++ /dev/null @@ -1,13 +0,0 @@ -[package] -name = "allthethings" -version = "0.1.0" -edition = "2021" - -[dependencies] -smol = { version = "^1", features = [] } -zerotier-core-crypto = { path = "../zerotier-core-crypto" } -socket2 = "^0" -serde = { version = "^1", features = ["derive"], default-features = false } -rmp-serde = "^0" -serde_json = { version = "^1", features = ["std"], default-features = false } -parking_lot = "^0" diff --git a/allthethings/src/config.rs b/allthethings/src/config.rs deleted file mode 100644 index bb7ae73bf..000000000 --- a/allthethings/src/config.rs +++ /dev/null @@ -1,36 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at https://mozilla.org/MPL/2.0/. - * - * (c)2021 ZeroTier, Inc. - * https://www.zerotier.com/ - */ - -use serde::{Deserialize, Serialize}; - -#[derive(Serialize, Deserialize, Clone, PartialEq, Eq)] -#[serde(default)] -pub struct Config { - /// Maximum allowed size of a protocol message. - pub max_message_size: usize, - - /// TCP port to which this should bind. - pub tcp_port: u16, - - /// Connection timeout in seconds. - pub io_timeout: u64, - - /// A name for this replicated data set. This is just used to prevent linking to peers replicating different data. - pub domain: String, -} - -impl Default for Config { - fn default() -> Self { - Self { - max_message_size: 1024 * 256, // 256KiB - tcp_port: 19993, - io_timeout: 300, // 5 minutes - domain: String::new(), - } - } -} diff --git a/allthethings/src/iblt.rs b/allthethings/src/iblt.rs deleted file mode 100644 index 61f112aad..000000000 --- a/allthethings/src/iblt.rs +++ /dev/null @@ -1,258 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at https://mozilla.org/MPL/2.0/. - * - * (c)2021 ZeroTier, Inc. - * https://www.zerotier.com/ - */ - -use std::alloc::{alloc_zeroed, dealloc, Layout}; -use std::mem::size_of; -use std::ptr::slice_from_raw_parts; - -use crate::IDENTITY_HASH_SIZE; - -// The number of indexing sub-hashes to use, must be <= IDENTITY_HASH_SIZE / 8 -const KEY_MAPPING_ITERATIONS: usize = 5; - -#[cfg(not(any(target_arch = "x86", target_arch = "x86_64", target_arch = "aarch64", target_arch = "powerpc64")))] -#[inline(always)] -fn read_unaligned_u64(i: *const u64) -> u64 { - let mut tmp = 0_u64; - unsafe { copy_nonoverlapping(i.cast::(), (&mut tmp as *mut u64).cast(), 8) }; - tmp -} - -#[cfg(any(target_arch = "x86", target_arch = "x86_64", target_arch = "aarch64", target_arch = "powerpc64"))] -#[inline(always)] -fn read_unaligned_u64(i: *const u64) -> u64 { unsafe { *i } } - -#[inline(always)] -fn xorshift64(mut x: u64) -> u64 { - x ^= x.wrapping_shl(13); - x ^= x.wrapping_shr(7); - x ^= x.wrapping_shl(17); - x -} - -#[repr(C, packed)] -struct IBLTEntry { - key_sum: [u64; IDENTITY_HASH_SIZE / 8], - check_hash_sum: u64, - count: i64, -} - -/// An IBLT (invertible bloom lookup table) specialized for reconciling sets of identity hashes. -/// -/// This makes some careful use of unsafe as it's heavily optimized. It's a CPU bottleneck when -/// replicating large dynamic data sets. -pub struct IBLT { - map: *mut IBLTEntry, - buckets: usize -} - -impl Drop for IBLT { - fn drop(&mut self) { - unsafe { dealloc(self.map.cast(), Layout::from_size_align(size_of::() * self.buckets, 8).unwrap()) }; - } -} - -impl IBLTEntry { - #[inline(always)] - fn is_singular(&self) -> bool { - if self.count == 1 || self.count == -1 { - u64::from_le(self.key_sum[0]).wrapping_add(xorshift64(u64::from_le(self.key_sum[1]))) == u64::from_le(self.check_hash_sum) - } else { - false - } - } -} - -impl IBLT { - /// Construct a new IBLT with a given capacity. - pub fn new(buckets: usize) -> Self { - assert!(buckets > 0 && buckets <= u32::MAX as usize); - Self { - map: unsafe { alloc_zeroed(Layout::from_size_align(size_of::() * buckets, 8).unwrap()).cast() }, - buckets, - } - } - - #[inline(always)] - pub fn buckets(&self) -> usize { self.buckets } - - #[inline(always)] - pub fn as_bytes(&self) -> &[u8] { unsafe { &*slice_from_raw_parts(self.map.cast::(), size_of::() * self.buckets) } } - - fn ins_rem(&mut self, key: &[u8; IDENTITY_HASH_SIZE], delta: i64) { - let key = key.as_ptr().cast::(); - let (k0, k1, k2, k3, k4, k5) = (read_unaligned_u64(key.wrapping_add(0)), read_unaligned_u64(key.wrapping_add(1)), read_unaligned_u64(key.wrapping_add(2)), read_unaligned_u64(key.wrapping_add(3)), read_unaligned_u64(key.wrapping_add(4)), read_unaligned_u64(key.wrapping_add(5))); - let check_hash = u64::from_le(k0).wrapping_add(xorshift64(u64::from_le(k1))).to_le(); - for mapping_sub_hash in 0..KEY_MAPPING_ITERATIONS { - let b = unsafe { &mut *self.map.wrapping_add((u64::from_le(read_unaligned_u64(key.wrapping_add(mapping_sub_hash))) as usize) % self.buckets) }; - b.key_sum[0] ^= k0; - b.key_sum[1] ^= k1; - b.key_sum[2] ^= k2; - b.key_sum[3] ^= k3; - b.key_sum[4] ^= k4; - b.key_sum[5] ^= k5; - b.check_hash_sum ^= check_hash; - b.count = i64::from_le(b.count).wrapping_add(delta).to_le(); - } - } - - #[inline(always)] - pub fn insert(&mut self, key: &[u8; IDENTITY_HASH_SIZE]) { self.ins_rem(key, 1); } - - #[inline(always)] - pub fn remove(&mut self, key: &[u8; IDENTITY_HASH_SIZE]) { self.ins_rem(key, -1); } - - /// Subtract another IBLT from this one to compute set difference. - /// The other may be in the form of a raw byte array or an IBLT, which implements - /// AsRef<[u8]>. It must have the same number of buckets. - pub fn subtract>(&mut self, other: &O) { - let other_slice = other.as_ref(); - if (other_slice.len() / size_of::()) == self.buckets { - let other_map: *const IBLTEntry = other_slice.as_ptr().cast(); - for i in 0..self.buckets { - let self_b = unsafe { &mut *self.map.wrapping_add(i) }; - let other_b = unsafe { &*other_map.wrapping_add(i) }; - self_b.key_sum[0] ^= other_b.key_sum[0]; - self_b.key_sum[1] ^= other_b.key_sum[1]; - self_b.key_sum[2] ^= other_b.key_sum[2]; - self_b.key_sum[3] ^= other_b.key_sum[3]; - self_b.key_sum[4] ^= other_b.key_sum[4]; - self_b.key_sum[5] ^= other_b.key_sum[5]; - self_b.check_hash_sum ^= other_b.check_hash_sum; - self_b.count = i64::from_le(self_b.count).wrapping_sub(i64::from_le(other_b.count)).to_le(); - } - } - } - - /// Extract every enumerable value from this IBLT. - /// - /// This consumes the IBLT instance since listing requires destructive modification - /// of the digest data. - pub fn list bool>(self, mut f: F) { - let buckets = self.buckets; - let mut singular_buckets: Vec = Vec::with_capacity(buckets + 2); - - for i in 0..buckets { - unsafe { - if (&*self.map.wrapping_add(i)).is_singular() { - singular_buckets.push(i as u32); - } - } - } - - let mut key = [0_u64; IDENTITY_HASH_SIZE / 8]; - let mut bucket_ptr = 0; - while bucket_ptr < singular_buckets.len() { - let b = unsafe { &*self.map.wrapping_add(*singular_buckets.get_unchecked(bucket_ptr) as usize) }; - bucket_ptr += 1; - if b.is_singular() { - key[0] = b.key_sum[0]; - key[1] = b.key_sum[1]; - key[2] = b.key_sum[2]; - key[3] = b.key_sum[3]; - key[4] = b.key_sum[4]; - key[5] = b.key_sum[5]; - if f(unsafe { &*key.as_ptr().cast::<[u8; IDENTITY_HASH_SIZE]>() }) { - let check_hash = u64::from_le(key[0]).wrapping_add(xorshift64(u64::from_le(key[1]))).to_le(); - for mapping_sub_hash in 0..KEY_MAPPING_ITERATIONS { - let bi = (u64::from_le(unsafe { *key.get_unchecked(mapping_sub_hash) }) as usize) % buckets; - let b = unsafe { &mut *self.map.wrapping_add(bi) }; - b.key_sum[0] ^= key[0]; - b.key_sum[1] ^= key[1]; - b.key_sum[2] ^= key[2]; - b.key_sum[3] ^= key[3]; - b.key_sum[4] ^= key[4]; - b.key_sum[5] ^= key[5]; - b.check_hash_sum ^= check_hash; - b.count = i64::from_le(b.count).wrapping_sub(1).to_le(); - if b.is_singular() { - singular_buckets.push(bi as u32); - } - } - } else { - break; - } - } - } - } -} - -impl AsRef<[u8]> for IBLT { - /// Get this IBLT in raw byte array form. - #[inline(always)] - fn as_ref(&self) -> &[u8] { self.as_bytes() } -} - -#[cfg(test)] -mod tests { - use zerotier_core_crypto::hash::SHA384; - use crate::iblt::*; - - #[test] - fn compiler_behavior() { - // A number of things above like unrolled key XORing must be changed if this size is changed. - assert_eq!(IDENTITY_HASH_SIZE, 48); - assert!(KEY_MAPPING_ITERATIONS <= (IDENTITY_HASH_SIZE / 8) && (IDENTITY_HASH_SIZE % 8) == 0); - - // Make sure this packed struct is actually packed. - assert_eq!(size_of::(), IDENTITY_HASH_SIZE + 8 + 8); - } - - #[allow(unused_variables)] - #[test] - fn insert_and_list() { - for _ in 0..10 { - for expected_cnt in 0..768 { - let random_u64 = zerotier_core_crypto::random::xorshift64_random(); - let mut t = IBLT::new(2048); - for i in 0..expected_cnt { - let k = SHA384::hash(&((i + random_u64) as u64).to_le_bytes()); - t.insert(&k); - } - let mut cnt = 0; - t.list(|k| { - cnt += 1; - true - }); - assert_eq!(cnt, expected_cnt); - } - } - } - - #[allow(unused_variables)] - #[test] - fn set_reconciliation() { - for _ in 0..10 { - let random_u64 = zerotier_core_crypto::random::xorshift64_random(); - let mut alice = IBLT::new(2048); - let mut bob = IBLT::new(2048); - let mut alice_total = 0_i32; - let mut bob_total = 0_i32; - for i in 0..1500 { - let k = SHA384::hash(&((i ^ random_u64) as u64).to_le_bytes()); - if (k[0] & 1) == 1 { - alice.insert(&k); - alice_total += 1; - } - if (k[0] & 3) == 2 { - bob.insert(&k); - bob_total += 1; - } - } - alice.subtract(&bob); - let mut diff_total = 0_i32; - alice.list(|k| { - diff_total += 1; - true - }); - // This is a probabilistic process so we tolerate a little bit of failure. The idea is that each - // pass reconciles more and more differences. - assert!(((alice_total + bob_total) - diff_total).abs() <= 128); - } - } -} diff --git a/allthethings/src/lib.rs b/allthethings/src/lib.rs deleted file mode 100644 index eede2da9a..000000000 --- a/allthethings/src/lib.rs +++ /dev/null @@ -1,42 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at https://mozilla.org/MPL/2.0/. - * - * (c)2021 ZeroTier, Inc. - * https://www.zerotier.com/ - */ - -mod store; -mod protocol; -mod varint; -mod memorystore; -mod iblt; -mod config; -mod link; -mod node; - -pub fn ms_since_epoch() -> u64 { - std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_millis() as u64 -} - -pub fn ms_monotonic() -> u64 { - std::time::Instant::now().elapsed().as_millis() as u64 -} - -#[inline(always)] -pub(crate) async fn io_timeout>>(d: std::time::Duration, f: F) -> smol::io::Result { - smol::future::or(f, async { - let _ = smol::Timer::after(d).await; - Err(smol::io::Error::new(smol::io::ErrorKind::TimedOut, "I/O timeout")) - }).await -} - -/// SHA384 is the hash currently used. Others could be supported in the future. -/// If this size changes check iblt.rs for a few things that must be changed. This -/// is checked in "cargo test." -pub const IDENTITY_HASH_SIZE: usize = 48; - -pub use config::Config; -pub use store::{Store, StorePutResult}; -pub use memorystore::MemoryStore; -pub use node::Node; diff --git a/allthethings/src/link.rs b/allthethings/src/link.rs deleted file mode 100644 index 023f37161..000000000 --- a/allthethings/src/link.rs +++ /dev/null @@ -1,346 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at https://mozilla.org/MPL/2.0/. - * - * (c)2021 ZeroTier, Inc. - * https://www.zerotier.com/ - */ - -use std::mem::MaybeUninit; -use std::sync::Arc; -use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; -use std::time::Duration; - -use serde::{Deserialize, Serialize}; -use smol::io::{AsyncReadExt, AsyncWriteExt, BufReader, BufWriter}; -use smol::lock::Mutex; -use smol::net::TcpStream; - -use zerotier_core_crypto::gmac::GMACStream; -use zerotier_core_crypto::hash::SHA384; -use zerotier_core_crypto::kbkdf::zt_kbkdf_hmac_sha384; -use zerotier_core_crypto::p521::{P521KeyPair, P521PublicKey}; -use zerotier_core_crypto::secret::Secret; - -use crate::{Config, IDENTITY_HASH_SIZE, io_timeout, Store, varint}; -use crate::protocol::*; - -struct Output { - stream: BufWriter, - gmac: Option, -} - -pub(crate) struct Link<'e, S: Store + 'static> { - pub connect_time: u64, - io_timeout: Duration, - node_secret: &'e P521KeyPair, - config: &'e Config, - store: &'e S, - remote_node_id: parking_lot::Mutex>, - reader: Mutex>, - writer: Mutex, - keepalive_period: u64, - last_send_time: AtomicU64, - max_message_size: usize, - authenticated: AtomicBool, -} - -#[inline(always)] -fn decode_msgpack<'de, T: Deserialize<'de>>(data: &'de [u8]) -> smol::io::Result { - rmp_serde::from_read_ref(data).map_err(|_| smol::io::Error::new(smol::io::ErrorKind::InvalidData, "invalid msgpack data")) -} - -#[inline(always)] -fn next_id_hash_in_slice(bytes: &[u8]) -> smol::io::Result<&[u8; IDENTITY_HASH_SIZE]> { - if bytes.len() >= IDENTITY_HASH_SIZE { - Ok(unsafe { &*bytes.as_ptr().cast::<[u8; IDENTITY_HASH_SIZE]>() }) - } else { - Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "invalid identity hash")) - } -} - -impl<'e, S: Store + 'static> Link<'e, S> { - pub fn new(stream: TcpStream, node_secret: &'e P521KeyPair, config: &'e Config, store: &'e S) -> Self { - let _ = stream.set_nodelay(false); - let max_message_size = HELLO_SIZE_MAX.max(config.max_message_size); - let now_monotonic = store.monotonic_clock(); - Self { - connect_time: now_monotonic, - io_timeout: Duration::from_secs(config.io_timeout), - node_secret, - config, - store, - remote_node_id: parking_lot::Mutex::new(None), - reader: Mutex::new(BufReader::with_capacity(65536, stream.clone())), - writer: Mutex::new(Output { - stream: BufWriter::with_capacity(max_message_size + 16, stream), - gmac: None - }), - keepalive_period: (config.io_timeout * 1000) / 2, - last_send_time: AtomicU64::new(now_monotonic), - max_message_size, - authenticated: AtomicBool::new(false), - } - } - - /// Get the remote node ID, which is SHA384(its long-term public keys). - /// Returns None if the remote node has not yet responded with HelloAck and been verified. - pub fn remote_node_id(&self) -> Option<[u8; 48]> { self.remote_node_id.lock().clone() } - - pub(crate) async fn do_periodic_tasks(&self, now_monotonic: u64) -> smol::io::Result<()> { - if now_monotonic.saturating_sub(self.last_send_time.load(Ordering::Relaxed)) >= self.keepalive_period && self.authenticated.load(Ordering::Relaxed) { - let timeout = Duration::from_secs(1); - let mut writer = self.writer.lock().await; - io_timeout(timeout, writer.stream.write_all(&[MESSAGE_TYPE_KEEPALIVE])).await?; - io_timeout(timeout, writer.stream.flush()).await?; - self.last_send_time.store(now_monotonic, Ordering::Relaxed); - } - Ok(()) - } - - async fn write_message(&self, message_type: u8, message: &[&[u8]]) -> smol::io::Result<()> { - let mut writer = self.writer.lock().await; - if writer.gmac.is_some() { - let mut mac: [u8; 16] = unsafe { MaybeUninit::uninit().assume_init() }; - let mt = [message_type]; - - let gmac = writer.gmac.as_mut().unwrap(); - gmac.init_for_next_message(); - gmac.update(&mt); - let mut total_length = 0_usize; - for m in message.iter() { - total_length += (*m).len(); - gmac.update(*m); - } - gmac.finish(&mut mac); - - io_timeout(self.io_timeout, writer.stream.write_all(&mt)).await?; - io_timeout(self.io_timeout, varint::async_write(&mut writer.stream, total_length as u64)).await?; - for m in message.iter() { - io_timeout(self.io_timeout, writer.stream.write_all(*m)).await?; - } - io_timeout(self.io_timeout, writer.stream.write_all(&mac)).await?; - io_timeout(self.io_timeout, writer.stream.flush()).await - } else { - Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "link negotiation is not complete")) - } - } - - async fn write_message_msgpack(&self, serialize_buf: &mut Vec, message_type: u8, message: &T) -> smol::io::Result<()> { - serialize_buf.clear(); - rmp_serde::encode::write(serialize_buf, message).map_err(|_| smol::io::Error::new(smol::io::ErrorKind::InvalidData, "msgpack encode failure"))?; - self.write_message(message_type, &[serialize_buf.as_slice()]).await - } - - pub(crate) async fn io_main(&self) -> smol::io::Result<()> { - // Reader is held here for the duration of the link's I/O loop. - let mut reader_mg = self.reader.lock().await; - let reader = &mut *reader_mg; - - let mut read_buf: Vec = Vec::new(); - read_buf.resize(self.max_message_size, 0); - let mut tmp_buf: Vec = Vec::with_capacity(4096); - - // (1) Send Hello and save the nonce and the hash of the raw Hello message for later HelloAck HMAC check. - let mut outgoing_nonce = [0_u8; HELLO_NONCE_SIZE]; - zerotier_core_crypto::random::fill_bytes_secure(&mut outgoing_nonce); - let ephemeral_secret = P521KeyPair::generate(true).unwrap(); - let sent_hello_hash = { - tmp_buf.clear(); - let _ = rmp_serde::encode::write(&mut tmp_buf, &Hello { - protocol_version: PROTOCOL_VERSION, - flags: 0, - clock: self.store.clock(), - domain: self.config.domain.as_str(), - nonce: &outgoing_nonce, - p521_ecdh_ephemeral_key: ephemeral_secret.public_key_bytes(), - p521_ecdh_node_key: self.node_secret.public_key_bytes(), - }).unwrap(); - - let mut writer = self.writer.lock().await; - io_timeout(self.io_timeout, varint::async_write(&mut writer.stream, tmp_buf.len() as u64)).await?; - io_timeout(self.io_timeout, writer.stream.write_all(tmp_buf.as_slice())).await?; - io_timeout(self.io_timeout, writer.stream.flush()).await?; - drop(writer); - - SHA384::hash(tmp_buf.as_slice()) - }; - - self.last_send_time.store(self.store.monotonic_clock(), Ordering::Relaxed); - - // (2) Read other side's HELLO and send ACK. Also do key agreement, initialize GMAC, etc. - let message_size = io_timeout(self.io_timeout, varint::async_read(reader)).await? as usize; - if message_size > HELLO_SIZE_MAX { - return Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "message too large")); - } - let (mut gmac_receive, ack_key, remote_node_id) = { - let hello_buf = &mut read_buf.as_mut_slice()[0..message_size]; - io_timeout(self.io_timeout, reader.read_exact(hello_buf)).await?; - let received_hello_hash = SHA384::hash(hello_buf); // for ACK generation - let hello: Hello = decode_msgpack(hello_buf)?; - - if hello.nonce.len() < HELLO_NONCE_SIZE || hello.protocol_version != PROTOCOL_VERSION { - return Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "invalid HELLO parameters")); - } - - let remote_node_public_key = P521PublicKey::from_bytes(hello.p521_ecdh_node_key); - let remote_ephemeral_public_key = P521PublicKey::from_bytes(hello.p521_ecdh_ephemeral_key); - if remote_node_public_key.is_none() || remote_ephemeral_public_key.is_none() { - return Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "invalid public key in HELLO")); - } - let node_shared_key = self.node_secret.agree(&remote_node_public_key.unwrap()); - let ephemeral_shared_key = ephemeral_secret.agree(&remote_ephemeral_public_key.unwrap()); - if node_shared_key.is_none() || ephemeral_shared_key.is_none() { - return Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "key agreement failed")); - } - let shared_base_key = Secret(SHA384::hmac(&SHA384::hash(ephemeral_shared_key.unwrap().as_bytes()), node_shared_key.unwrap().as_bytes())); - - let shared_gmac_base_key = zt_kbkdf_hmac_sha384(shared_base_key.as_bytes(), KBKDF_LABEL_GMAC, 0, 0); - let gmac_receive_key = Secret(SHA384::hmac(&hello.nonce[0..48], &shared_gmac_base_key.0)); - let gmac_send_key = Secret(SHA384::hmac(&outgoing_nonce[0..48], &shared_gmac_base_key.0)); - let gmac_receive = GMACStream::new(&gmac_receive_key.0[0..32], &hello.nonce[48..64]); - self.writer.lock().await.gmac.replace(GMACStream::new(&gmac_send_key.0[0..32], &outgoing_nonce[48..64])); - - let shared_ack_key = zt_kbkdf_hmac_sha384(shared_base_key.as_bytes(), KBKDF_LABEL_HELLO_ACK_HMAC, 0, 0); - let ack_hmac = SHA384::hmac(shared_ack_key.as_bytes(), &received_hello_hash); - self.write_message_msgpack(&mut tmp_buf, MESSAGE_TYPE_HELLO_ACK, &HelloAck { - ack: &ack_hmac, - clock_echo: hello.clock - }).await?; - - (gmac_receive, shared_ack_key, SHA384::hash(hello.p521_ecdh_node_key)) - }; - - self.last_send_time.store(self.store.monotonic_clock(), Ordering::Relaxed); - - // Done with ephemeral secret key, so forget it. - drop(ephemeral_secret); - - // (3) Start primary I/O loop and initially listen for HelloAck to confirm the other side's node identity. - let mut received_mac_buf = [0_u8; 16]; - let mut expected_mac_buf = [0_u8; 16]; - let mut message_type_buf = [0_u8; 1]; - let mut authenticated = false; - loop { - io_timeout(self.io_timeout, reader.read_exact(&mut message_type_buf)).await?; - if message_type_buf[0] != MESSAGE_TYPE_KEEPALIVE { - let message_size = io_timeout(self.io_timeout, varint::async_read(reader)).await? as usize; - if message_size > self.max_message_size { - return Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "message too large")); - } - let message_buf = &mut read_buf.as_mut_slice()[0..message_size]; - io_timeout(self.io_timeout, reader.read_exact(message_buf)).await?; - io_timeout(self.io_timeout, reader.read_exact(&mut received_mac_buf)).await?; - - gmac_receive.init_for_next_message(); - gmac_receive.update(&message_type_buf); - gmac_receive.update(message_buf); - gmac_receive.finish(&mut expected_mac_buf); - if !received_mac_buf.eq(&expected_mac_buf) { - return Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "message authentication failed")); - } - - if authenticated { - match message_type_buf[0] { - MESSAGE_TYPE_OBJECTS => self.do_objects(message_buf).await?, - MESSAGE_TYPE_HAVE_OBJECTS => self.do_have_objects(&mut tmp_buf, message_buf).await?, - MESSAGE_TYPE_WANT_OBJECTS => self.do_want_objects(message_buf).await?, - MESSAGE_TYPE_STATE => self.do_sync_request(decode_msgpack(message_buf)?).await?, - MESSAGE_TYPE_IBLT_SYNC_DIGEST => self.do_iblt_sync_digest(decode_msgpack(message_buf)?).await?, - _ => {}, - } - } else { - if message_type_buf[0] == MESSAGE_TYPE_HELLO_ACK { - let hello_ack: HelloAck = decode_msgpack(message_buf)?; - let expected_ack_hmac = SHA384::hmac(ack_key.as_bytes(), &sent_hello_hash); - if !hello_ack.ack.eq(&expected_ack_hmac) { - return Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "session authentication failed")); - } - - authenticated = true; - let _ = self.remote_node_id.lock().replace(remote_node_id.clone()); - self.authenticated.store(true, Ordering::Relaxed); - } else { - return Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "message received before session authenticated")); - } - } - } - } - } - - async fn do_objects(&self, mut msg: &[u8]) -> smol::io::Result<()> { - while !msg.is_empty() { - let obj_size = varint::async_read(&mut msg).await? as usize; - if obj_size >= msg.len() { - return Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "object incomplete")); - } - let obj = &msg[0..obj_size]; - msg = &msg[obj_size..]; - match self.store.put(&SHA384::hash(obj), obj) { - crate::StorePutResult::Invalid => { - return Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "invalid object in stream")); - }, - _ => {} - } - } - Ok(()) - } - - async fn do_have_objects(&self, tmp_buf: &mut Vec, mut msg: &[u8]) -> smol::io::Result<()> { - tmp_buf.clear(); - varint::write(tmp_buf, self.store.clock()); - let empty_tmp_buf_size = tmp_buf.len(); - - while !msg.is_empty() { - let id_hash = next_id_hash_in_slice(msg)?; - msg = &msg[IDENTITY_HASH_SIZE..]; - if !self.store.have(id_hash) { - let _ = tmp_buf.write_all(id_hash); - } - } - - if tmp_buf.len() != empty_tmp_buf_size { - self.write_message(MESSAGE_TYPE_WANT_OBJECTS, &[tmp_buf.as_slice()]).await - } else { - Ok(()) - } - } - - async fn do_want_objects(&self, mut msg: &[u8]) -> smol::io::Result<()> { - let ref_time = varint::read(&mut msg); - if !ref_time.is_err() { - return Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "object incomplete")); - } - let ref_time = ref_time.unwrap().0; - - let mut objects: Vec = Vec::with_capacity((msg.len() / IDENTITY_HASH_SIZE) + 1); - while !msg.is_empty() { - let id_hash = next_id_hash_in_slice(msg)?; - msg = &msg[IDENTITY_HASH_SIZE..]; - let _ = self.store.get(ref_time, id_hash).map(|obj| objects.push(obj)); - } - - if !objects.is_empty() { - let mut sizes: Vec = Vec::with_capacity(objects.len()); - let mut slices: Vec<&[u8]> = Vec::with_capacity(objects.len() * 2); - for o in objects.iter() { - sizes.push(varint::Encoded::from(o.as_ref().len() as u64)); - } - for i in 0..objects.len() { - slices.push(sizes.get(i).unwrap().as_ref()); - slices.push(objects.get(i).unwrap().as_ref()); - } - self.write_message(MESSAGE_TYPE_OBJECTS, slices.as_slice()).await - } else { - Ok(()) - } - } - - async fn do_sync_request(&self, sr: State<'_>) -> smol::io::Result<()> { - Ok(()) - } - - async fn do_iblt_sync_digest(&self, sd: IBLTSyncDigest<'_>) -> smol::io::Result<()> { - Ok(()) - } -} diff --git a/allthethings/src/memorystore.rs b/allthethings/src/memorystore.rs deleted file mode 100644 index 088269651..000000000 --- a/allthethings/src/memorystore.rs +++ /dev/null @@ -1,99 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at https://mozilla.org/MPL/2.0/. - * - * (c)2021 ZeroTier, Inc. - * https://www.zerotier.com/ - */ - -use std::collections::Bound::Included; -use std::collections::BTreeMap; -use std::sync::Arc; - -use parking_lot::Mutex; - -use crate::{IDENTITY_HASH_SIZE, ms_monotonic, ms_since_epoch, Store, StorePutResult}; - -/// A Store that stores all objects in memory, mostly for testing. -pub struct MemoryStore(Mutex, u64)>>); - -impl MemoryStore { - pub fn new() -> Self { Self(Mutex::new(BTreeMap::new())) } -} - -impl Store for MemoryStore { - type Object = Arc<[u8]>; - - #[inline(always)] - fn clock(&self) -> u64 { ms_since_epoch() } - - #[inline(always)] - fn monotonic_clock(&self) -> u64 { ms_monotonic() } - - fn get(&self, reference_time: u64, identity_hash: &[u8; IDENTITY_HASH_SIZE]) -> Option { - self.0.lock().get(identity_hash).and_then(|o| { - if (*o).1 <= reference_time { - Some((*o).0.clone()) - } else { - None - } - }) - } - - fn put(&self, identity_hash: &[u8; IDENTITY_HASH_SIZE], object: &[u8]) -> StorePutResult { - let mut result = StorePutResult::Duplicate; - let _ = self.0.lock().entry(identity_hash.clone()).or_insert_with(|| { - result = StorePutResult::Ok; - (object.to_vec().into(), ms_since_epoch()) - }); - result - } - - fn have(&self, identity_hash: &[u8; IDENTITY_HASH_SIZE]) -> bool { - self.0.lock().contains_key(identity_hash) - } - - fn total_count(&self, reference_time: u64) -> Option { - let mut tc = 0_u64; - for e in self.0.lock().iter() { - tc += ((*e.1).1 <= reference_time) as u64; - } - Some(tc) - } - - fn for_each) -> bool>(&self, reference_time: u64, start: &[u8; IDENTITY_HASH_SIZE], end: &[u8; IDENTITY_HASH_SIZE], mut f: F) { - let mut tmp: Vec<([u8; IDENTITY_HASH_SIZE], Arc<[u8]>)> = Vec::with_capacity(1024); - for e in self.0.lock().range((Included(*start), Included(*end))).into_iter() { - if (*e.1).1 <= reference_time { - tmp.push((e.0.clone(), (*e.1).0.clone())); - } - } - for e in tmp.iter() { - if !f(&(*e).0, &(*e).1) { - break; - } - } - } - - fn for_each_identity_hash bool>(&self, reference_time: u64, start: &[u8; IDENTITY_HASH_SIZE], end: &[u8; IDENTITY_HASH_SIZE], mut f: F) { - let mut tmp: Vec<[u8; IDENTITY_HASH_SIZE]> = Vec::with_capacity(1024); - for e in self.0.lock().range((Included(*start), Included(*end))).into_iter() { - if (*e.1).1 <= reference_time { - tmp.push(e.0.clone()); - } - } - for e in tmp.iter() { - if !f(e) { - break; - } - } - } - - fn count(&self, reference_time: u64, start: &[u8; IDENTITY_HASH_SIZE], end: &[u8; IDENTITY_HASH_SIZE]) -> Option { - let mut tc = 0_u64; - for e in self.0.lock().range((Included(*start), Included(*end))).into_iter() { - tc += ((*e.1).1 <= reference_time) as u64; - } - Some(tc) - } -} diff --git a/allthethings/src/node.rs b/allthethings/src/node.rs deleted file mode 100644 index 150fbee78..000000000 --- a/allthethings/src/node.rs +++ /dev/null @@ -1,170 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at https://mozilla.org/MPL/2.0/. - * - * (c)2021 ZeroTier, Inc. - * https://www.zerotier.com/ - */ - -use std::collections::{HashMap, HashSet}; -use std::error::Error; -use std::sync::{Arc, Weak}; -use std::time::Duration; - -use smol::{Executor, Task, Timer}; -use smol::lock::Mutex; -use smol::net::SocketAddr; - -use zerotier_core_crypto::p521::P521KeyPair; - -use crate::{Config, Store}; -use crate::link::Link; - -struct NodeIntl<'e, S: Store + 'static> { - config: &'e Config, - secret: &'e P521KeyPair, - store: &'e S, - executor: &'e Executor<'e>, - connections: Mutex>, Task<()>)>> -} - -pub struct Node<'e, S: Store + 'static> { - daemon_tasks: Vec>, - intl: Weak> -} - -impl<'e, S: Store + 'static> Node<'e, S> { - pub fn new(config: &'e Config, secret: &'e P521KeyPair, store: &'e S, executor: &'e Executor<'e>) -> Result> { - let listener_v4 = socket2::Socket::new(socket2::Domain::IPV4, socket2::Type::STREAM, Some(socket2::Protocol::TCP)).and_then(|v4| { - let _ = v4.set_reuse_address(true); - let _ = v4.bind(&socket2::SockAddr::from(std::net::SocketAddrV4::new(std::net::Ipv4Addr::UNSPECIFIED, config.tcp_port)))?; - let _ = v4.listen(64); - Ok(v4) - }); - let listener_v6 = socket2::Socket::new(socket2::Domain::IPV6, socket2::Type::STREAM, Some(socket2::Protocol::TCP)).and_then(|v6| { - let _ = v6.set_only_v6(true); - let _ = v6.set_reuse_address(true); - let _ = v6.bind(&socket2::SockAddr::from(std::net::SocketAddrV6::new(std::net::Ipv6Addr::UNSPECIFIED, config.tcp_port, 0, 0)))?; - let _ = v6.listen(64); - Ok(v6) - }); - if listener_v4.is_err() && listener_v6.is_err() { - return Err(Box::new(listener_v4.unwrap_err())); - } - - let ni = Arc::new(NodeIntl { - config, - secret, - store, - executor, - connections: Mutex::new(HashMap::with_capacity(64)), - }); - - let mut n = Self { - daemon_tasks: Vec::with_capacity(3), - intl: Arc::downgrade(&ni) - }; - - if listener_v4.is_ok() { - let listener_v4 = listener_v4.unwrap(); - let ni2 = ni.clone(); - n.daemon_tasks.push(executor.spawn(async move { ni2.tcp_listener_main(smol::net::TcpListener::try_from(std::net::TcpListener::from(listener_v4)).unwrap()).await })); - } - if listener_v6.is_ok() { - let listener_v6 = listener_v6.unwrap(); - let ni2 = ni.clone(); - n.daemon_tasks.push(executor.spawn(async move { ni2.tcp_listener_main(smol::net::TcpListener::try_from(std::net::TcpListener::from(listener_v6)).unwrap()).await })); - } - - let ni2 = ni.clone(); - n.daemon_tasks.push(executor.spawn(async move { ni2.background_task_main().await })); - - Ok(n) - } -} - -impl<'e, S: Store + 'static> NodeIntl<'e, S> { - async fn background_task_main(&self) { - let io_timeout_ms = self.config.io_timeout * 1000; - let delay = Duration::from_secs(10); - loop { - Timer::after(delay).await; - - let mut connections = self.connections.lock().await; - let (done_sender, done_receiver) = smol::channel::bounded::<()>(16); - let done_sender = Arc::new(done_sender); - - let to_erase: Arc>> = Arc::new(Mutex::new(Vec::new())); - let mut tasks: Vec> = Vec::with_capacity(connections.len()); - - // Search for connections that are dead, have timed out during negotiation, or - // that are duplicates of another connection to the same remote node. - let have_node_ids: Arc>> = Arc::new(Mutex::new(HashSet::with_capacity(connections.len()))); - let now_monotonic = self.store.monotonic_clock(); - for c in connections.iter() { - let l = c.1.0.upgrade(); - if l.is_some() { - let l = l.unwrap(); - let remote_node_id = l.remote_node_id(); - if remote_node_id.is_some() { - let remote_node_id = remote_node_id.unwrap(); - if !have_node_ids.lock().await.contains(&remote_node_id) { - let a = c.0.clone(); - let hn = have_node_ids.clone(); - let te = to_erase.clone(); - let ds = done_sender.clone(); - tasks.push(self.executor.spawn(async move { - if l.do_periodic_tasks(now_monotonic).await.is_ok() { - if !hn.lock().await.insert(remote_node_id) { - // This is a redudant link to the same remote node. - te.lock().await.push(a); - } - } else { - // A fatal error occurred while servicing the connection. - te.lock().await.push(a); - } - let _ = ds.send(()).await; - })); - } else { - // This is a redudant link to the same remote node. - to_erase.lock().await.push(c.0.clone()); - } - } else if (now_monotonic - l.connect_time) > io_timeout_ms { - // Link negotiation timed out if we aren't connected yet. - to_erase.lock().await.push(c.0.clone()); - } - } else { - // Connection is closed and has released its internally held Arc<>. - to_erase.lock().await.push(c.0.clone()); - } - } - - // Wait for a message on the channel from each task indicating that it is complete. - for _ in 0..tasks.len() { - let _ = done_receiver.recv().await; - } - - // Close and erase all connections slated for cleanup. - for e in to_erase.lock().await.iter() { - let _ = connections.remove(e); - } - } - } - - async fn tcp_listener_main(&self, listener: smol::net::TcpListener) { - loop { - let c = listener.accept().await; - if c.is_ok() { - let (connection, remote_address) = c.unwrap(); - let l = Arc::new(Link::<'e, S>::new(connection, self.secret, self.config, self.store)); - self.connections.lock().await.insert(remote_address.clone(), (Arc::downgrade(&l), self.executor.spawn(async move { - let _ = l.io_main().await; - // Arc is now released, causing Weak to go null and then causing this - // entry to be removed from the connection map on the next background task sweep. - }))); - } else { - break; - } - } - } -} diff --git a/allthethings/src/protocol.rs b/allthethings/src/protocol.rs deleted file mode 100644 index dfc718ad9..000000000 --- a/allthethings/src/protocol.rs +++ /dev/null @@ -1,146 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at https://mozilla.org/MPL/2.0/. - * - * (c)2021 ZeroTier, Inc. - * https://www.zerotier.com/ - */ - -/* - * Wire protocol notes: - * - * Messages are prefixed by a type byte followed by a message size in the form - * of a variable length integer (varint). Each message is followed by a - * 16-byte GMAC message authentication code. - * - * HELLO is an exception. It's sent on connect, is prefixed only by a varint - * size, and is not followed by a MAC. Instead HelloAck is sent after it - * containing a full HMAC ACK for key negotiation. - * - * GMAC is keyed using a KBKDF-derived key from a shared key currently made - * with HKDF as HMAC(SHA384(ephemeral key), node key). The first 32 bytes of - * the key are the GMAC key while the nonce is the first 96 bytes of - * the nonce where this 96-bit integer is incremented (as little-endian) - * for each message sent. Increment should wrap at 2^96. The connection should - * close after no more than 2^96 messages, but that's a crazy long session - * anyway. - * - * The wire protocol is only authenticated to prevent network level attacks. - * Data is not encrypted since this is typically used to replicate a public - * "well known" data set and encryption would add needless overhead. - */ - -use serde::{Deserialize, Serialize}; - -/// KBKDF label for the HMAC in HelloAck. -pub const KBKDF_LABEL_HELLO_ACK_HMAC: u8 = b'A'; - -/// KBKDF label for GMAC key derived from main key. -pub const KBKDF_LABEL_GMAC: u8 = b'G'; - -/// Sanity limit on the size of HELLO. -pub const HELLO_SIZE_MAX: usize = 4096; - -/// Size of nonce sent with HELLO. -pub const HELLO_NONCE_SIZE: usize = 64; - -/// Overall protocol version. -pub const PROTOCOL_VERSION: u16 = 1; - -/// No operation, no payload, sent without size or MAC. -/// This is a special single byte message used for connection keepalive. -pub const MESSAGE_TYPE_KEEPALIVE: u8 = 0; - -/// Acknowledgement of HELLO. -pub const MESSAGE_TYPE_HELLO_ACK: u8 = 1; - -/// A series of objects with each prefixed by a varint size. -pub const MESSAGE_TYPE_OBJECTS: u8 = 2; - -/// A series of identity hashes concatenated together advertising objects we have. -pub const MESSAGE_TYPE_HAVE_OBJECTS: u8 = 3; - -/// A series of identity hashes concatenated together of objects being requested. -pub const MESSAGE_TYPE_WANT_OBJECTS: u8 = 4; - -/// Report state, requesting possible sync response. -pub const MESSAGE_TYPE_STATE: u8 = 5; - -/// IBLT sync digest, payload is IBLTSyncDigest. -pub const MESSAGE_TYPE_IBLT_SYNC_DIGEST: u8 = 6; - -/// Initial message sent by both sides on TCP connection establishment. -/// This is sent with no type or message authentication code and is only -/// sent on connect. It is prefixed by a varint size. -#[derive(Deserialize, Serialize)] -pub struct Hello<'a> { - /// Local value of PROTOCOL_VERSION. - pub protocol_version: u16, - /// Flags, currently unused and always zero. - pub flags: u64, - /// Local clock in milliseconds since Unix epoch. - pub clock: u64, - /// The data set name ("domain") to which this node belongs. - pub domain: &'a str, - /// Random nonce, must be at least 64 bytes in length. - pub nonce: &'a [u8], - /// Random ephemeral ECDH session key. - pub p521_ecdh_ephemeral_key: &'a [u8], - /// Long-lived node-identifying ECDH public key. - pub p521_ecdh_node_key: &'a [u8], -} - -/// Sent in response to Hello and contains an acknowledgement HMAC for the shared key. -#[derive(Deserialize, Serialize)] -pub struct HelloAck<'a> { - /// HMAC-SHA384(KBKDF(ack key, KBKDF_LABEL_HELLO_ACK_HMAC), SHA384(original raw Hello)) - pub ack: &'a [u8], - /// Value of clock in original hello, for measuring latency. - pub clock_echo: u64, -} - -/// Report the state of the sender's data set. -/// -/// The peer may respond in one of three ways: -/// -/// (1) It may send an IBLTSyncDigest over a range of identity hashes of its -/// choice so that the requesting node may compute a difference and request -/// objects it does not have. -/// -/// (2) It may send HAVE_OBJECTS with a simple list of objects. -/// -/// (3) It may simply send a batch of objects. -/// -/// (4) It may not respond at all. -/// -/// Which option is chosen is up to the responding node and should be chosen -/// via a heuristic to maximize sync efficiency and minimize sync time. -/// -/// A central assumption is that identity hashes are uniformly distributed -/// since they are cryptographic hashes (currently SHA-384). This allows a -/// simple calculation to be made with the sending node's total count to -/// estimate set difference density across the entire hash range. -#[derive(Deserialize, Serialize)] -pub struct State<'a> { - /// Total number of hashes in the entire data set. - pub total_count: u64, - /// Our clock to use as a reference time for filtering the data set (if applicable). - pub reference_time: u64, -} - -/// An IBLT digest of identity hashes over a range. -#[derive(Deserialize, Serialize)] -pub struct IBLTSyncDigest<'a> { - /// Start of range. Right-pad with zeroes if too short. - pub range_start: &'a [u8], - /// End of range. Right-pad with zeroes if too short. - pub range_end: &'a [u8], - /// IBLT digest of hashes in this range. - pub iblt: &'a [u8], - /// Number of hashes in this range. - pub count: u64, - /// Total number of hashes in entire data set. - pub total_count: u64, - /// Reference time from SyncRequest or 0 if this is being sent synthetically. - pub reference_time: u64, -} diff --git a/allthethings/src/store.rs b/allthethings/src/store.rs deleted file mode 100644 index 7881fbb72..000000000 --- a/allthethings/src/store.rs +++ /dev/null @@ -1,57 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at https://mozilla.org/MPL/2.0/. - * - * (c)2021 ZeroTier, Inc. - * https://www.zerotier.com/ - */ - -use crate::IDENTITY_HASH_SIZE; - -/// Result returned by Store::put(). -pub enum StorePutResult { - /// Datum stored successfully. - Ok, - /// Datum is one we already have. - Duplicate, - /// Value is invalid. (this may result in dropping connections to peers, etc.) - Invalid, - /// Value is not invalid but it was not added to the data store for some neutral reason. - Ignored, -} - -/// Trait that must be implemented by the data store that is to be replicated. -pub trait Store: Sync + Send { - /// Type returned by get(), which can be anything that contains a byte slice. - type Object: AsRef<[u8]> + Send; - - /// Get the current wall time in milliseconds since Unix epoch. - fn clock(&self) -> u64; - - /// Get the number of milliseconds that have elapsed since some arbitrary event in the past (like system boot). - fn monotonic_clock(&self) -> u64; - - /// Get an object from the database. - fn get(&self, reference_time: u64, identity_hash: &[u8; IDENTITY_HASH_SIZE]) -> Option; - - /// Store an entry in the database. - fn put(&self, identity_hash: &[u8; IDENTITY_HASH_SIZE], object: &[u8]) -> StorePutResult; - - /// Check if we have an object by its identity hash. - fn have(&self, identity_hash: &[u8; IDENTITY_HASH_SIZE]) -> bool; - - /// Get the total count of objects. - fn total_count(&self, reference_time: u64) -> Option; - - /// Iterate over a range of identity hashes and values. - /// This calls the supplied function for each object. If the function returns false iteration stops. - fn for_each bool>(&self, reference_time: u64, start: &[u8; IDENTITY_HASH_SIZE], end: &[u8; IDENTITY_HASH_SIZE], f: F); - - /// Iterate over a range of identity hashes. - /// This calls the supplied function for each hash. If the function returns false iteration stops. - fn for_each_identity_hash bool>(&self, reference_time: u64, start: &[u8; IDENTITY_HASH_SIZE], end: &[u8; IDENTITY_HASH_SIZE], f: F); - - /// Count the number of identity hash keys in this range (inclusive) of identity hashes. - /// This may return None if an error occurs, but should return 0 if the set is empty. - fn count(&self, reference_time: u64, start: &[u8; IDENTITY_HASH_SIZE], end: &[u8; IDENTITY_HASH_SIZE]) -> Option; -} diff --git a/allthethings/src/varint.rs b/allthethings/src/varint.rs deleted file mode 100644 index ff48c9fc8..000000000 --- a/allthethings/src/varint.rs +++ /dev/null @@ -1,45 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at https://mozilla.org/MPL/2.0/. - * - * (c)2021 ZeroTier, Inc. - * https://www.zerotier.com/ - */ - -use smol::io::{AsyncReadExt, AsyncWrite, AsyncRead, AsyncWriteExt}; - -pub use zerotier_core_crypto::varint::*; - -pub async fn async_write(w: &mut W, mut v: u64) -> smol::io::Result<()> { - let mut b = [0_u8; 10]; - let mut i = 0; - loop { - if v > 0x7f { - b[i] = (v as u8) & 0x7f; - i += 1; - v = v.wrapping_shr(7); - } else { - b[i] = (v as u8) | 0x80; - i += 1; - break; - } - } - w.write_all(&b[0..i]).await -} - -pub async fn async_read(r: &mut R) -> smol::io::Result { - let mut v = 0_u64; - let mut buf = [0_u8; 1]; - let mut pos = 0; - loop { - let _ = r.read_exact(&mut buf).await?; - let b = buf[0]; - if b <= 0x7f { - v |= (b as u64).wrapping_shl(pos); - pos += 7; - } else { - v |= ((b & 0x7f) as u64).wrapping_shl(pos); - return Ok(v); - } - } -}