From 09d7e252549b4fe6532a18e27412e092265c228e Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Thu, 30 Dec 2021 18:15:32 -0500 Subject: [PATCH] Too much to list, but mostly sync stuff. --- allthethings/Cargo.toml | 5 + allthethings/src/config.rs | 36 ++++ allthethings/src/iblt.rs | 269 ++++++++++++++++------------ allthethings/src/lib.rs | 29 ++- allthethings/src/link.rs | 251 ++++++++++++++++++++++++++ allthethings/src/memorystore.rs | 109 ++++++----- allthethings/src/protocol.rs | 160 +++++++++++++---- allthethings/src/replicator.rs | 71 +++++--- allthethings/src/store.rs | 35 ++-- allthethings/src/varint.rs | 10 +- zerotier-core-crypto/src/gmac.rs | 65 +++++++ zerotier-core-crypto/src/lib.rs | 1 + zerotier-system-service/src/main.rs | 17 +- 13 files changed, 785 insertions(+), 273 deletions(-) create mode 100644 allthethings/src/config.rs create mode 100644 allthethings/src/link.rs create mode 100644 zerotier-core-crypto/src/gmac.rs diff --git a/allthethings/Cargo.toml b/allthethings/Cargo.toml index ef0539ed5..0a9129877 100644 --- a/allthethings/Cargo.toml +++ b/allthethings/Cargo.toml @@ -6,3 +6,8 @@ edition = "2021" [dependencies] smol = { version = "^1", features = [] } zerotier-core-crypto = { path = "../zerotier-core-crypto" } +socket2 = "^0" +serde = { version = "^1", features = ["derive"], default-features = false } +rmp-serde = "^0" +serde_json = { version = "^1", features = ["std"], default-features = false } +parking_lot = "^0" diff --git a/allthethings/src/config.rs b/allthethings/src/config.rs new file mode 100644 index 000000000..ee833352c --- /dev/null +++ b/allthethings/src/config.rs @@ -0,0 +1,36 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + * + * (c)2021 ZeroTier, Inc. + * https://www.zerotier.com/ + */ + +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize, Clone, PartialEq, Eq)] +#[serde(default)] +pub struct Config { + /// Maximum allowed size of a protocol message. + pub max_message_size: usize, + + /// TCP port to which this should bind. + pub tcp_port: u16, + + /// Connection timeout in seconds. + pub io_timeout: u64, + + /// A name for this replicated data set. This is just used to prevent linking to peers replicating different data. 
+ pub domain: String, +} + +impl Default for Config { + fn default() -> Self { + Self { + max_message_size: 1024 * 1024, // 1MiB + tcp_port: 19993, + io_timeout: 300, // 5 minutes + domain: String::new(), + } + } +} diff --git a/allthethings/src/iblt.rs b/allthethings/src/iblt.rs index 61f8979a7..61f112aad 100644 --- a/allthethings/src/iblt.rs +++ b/allthethings/src/iblt.rs @@ -6,13 +6,26 @@ * https://www.zerotier.com/ */ +use std::alloc::{alloc_zeroed, dealloc, Layout}; use std::mem::size_of; -use std::ptr::{slice_from_raw_parts, write_bytes, copy_nonoverlapping}; +use std::ptr::slice_from_raw_parts; use crate::IDENTITY_HASH_SIZE; // The number of indexing sub-hashes to use, must be <= IDENTITY_HASH_SIZE / 8 -const KEY_MAPPING_ITERATIONS: usize = 3; +const KEY_MAPPING_ITERATIONS: usize = 5; + +#[cfg(not(any(target_arch = "x86", target_arch = "x86_64", target_arch = "aarch64", target_arch = "powerpc64")))] +#[inline(always)] +fn read_unaligned_u64(i: *const u64) -> u64 { + let mut tmp = 0_u64; + unsafe { copy_nonoverlapping(i.cast::(), (&mut tmp as *mut u64).cast(), 8) }; + tmp +} + +#[cfg(any(target_arch = "x86", target_arch = "x86_64", target_arch = "aarch64", target_arch = "powerpc64"))] +#[inline(always)] +fn read_unaligned_u64(i: *const u64) -> u64 { unsafe { *i } } #[inline(always)] fn xorshift64(mut x: u64) -> u64 { @@ -22,7 +35,7 @@ fn xorshift64(mut x: u64) -> u64 { x } -#[repr(packed)] +#[repr(C, packed)] struct IBLTEntry { key_sum: [u64; IDENTITY_HASH_SIZE / 8], check_hash_sum: u64, @@ -30,10 +43,18 @@ struct IBLTEntry { } /// An IBLT (invertible bloom lookup table) specialized for reconciling sets of identity hashes. -/// This skips some extra hashing that would be necessary in a universal implementation since identity -/// hashes are already randomly distributed strong hashes. +/// +/// This makes some careful use of unsafe as it's heavily optimized. It's a CPU bottleneck when +/// replicating large dynamic data sets. pub struct IBLT { - map: Vec, + map: *mut IBLTEntry, + buckets: usize +} + +impl Drop for IBLT { + fn drop(&mut self) { + unsafe { dealloc(self.map.cast(), Layout::from_size_align(size_of::() * self.buckets, 8).unwrap()) }; + } } impl IBLTEntry { @@ -50,135 +71,103 @@ impl IBLTEntry { impl IBLT { /// Construct a new IBLT with a given capacity. pub fn new(buckets: usize) -> Self { - assert!(buckets > 0); + assert!(buckets > 0 && buckets <= u32::MAX as usize); Self { - map: { - let mut tmp: Vec = Vec::with_capacity(buckets); - unsafe { - tmp.set_len(buckets); - write_bytes(tmp.as_mut_ptr().cast::(), 0, buckets * size_of::()); - } - tmp - } + map: unsafe { alloc_zeroed(Layout::from_size_align(size_of::() * buckets, 8).unwrap()).cast() }, + buckets, } } - /// Obtain IBLT from a byte array. - /// This returns None if the supplied bytes are not a valid IBLT. - pub fn from_bytes(b: &[u8]) -> Option { - if b.len() >= size_of::() && (b.len() % size_of::()) == 0 { - let buckets = b.len() / size_of::(); - Some(Self { - map: { - let mut tmp: Vec = Vec::with_capacity(buckets); - unsafe { - tmp.set_len(buckets); - copy_nonoverlapping(b.as_ptr(), tmp.as_mut_ptr().cast::(), buckets * size_of::()); - } - tmp - } - }) - } else { - None - } - } + #[inline(always)] + pub fn buckets(&self) -> usize { self.buckets } - /// Get this IBLT as a byte array that is ready to be sent over the wire. 
- pub fn as_bytes(&self) -> &[u8] { - unsafe { &*slice_from_raw_parts(self.map.as_ptr().cast(), size_of::() * self.map.len()) } - } + #[inline(always)] + pub fn as_bytes(&self) -> &[u8] { unsafe { &*slice_from_raw_parts(self.map.cast::(), size_of::() * self.buckets) } } - fn ins_rem(&mut self, key: &[u64; IDENTITY_HASH_SIZE / 8], delta: i64) { - let check_hash = u64::from_le(key[0]).wrapping_add(xorshift64(u64::from_le(key[1]))).to_le(); - let buckets = self.map.len(); + fn ins_rem(&mut self, key: &[u8; IDENTITY_HASH_SIZE], delta: i64) { + let key = key.as_ptr().cast::(); + let (k0, k1, k2, k3, k4, k5) = (read_unaligned_u64(key.wrapping_add(0)), read_unaligned_u64(key.wrapping_add(1)), read_unaligned_u64(key.wrapping_add(2)), read_unaligned_u64(key.wrapping_add(3)), read_unaligned_u64(key.wrapping_add(4)), read_unaligned_u64(key.wrapping_add(5))); + let check_hash = u64::from_le(k0).wrapping_add(xorshift64(u64::from_le(k1))).to_le(); for mapping_sub_hash in 0..KEY_MAPPING_ITERATIONS { - let b = unsafe { self.map.get_unchecked_mut((u64::from_le(key[mapping_sub_hash]) as usize) % buckets) }; - for j in 0..(IDENTITY_HASH_SIZE / 8) { - b.key_sum[j] ^= key[j]; - } + let b = unsafe { &mut *self.map.wrapping_add((u64::from_le(read_unaligned_u64(key.wrapping_add(mapping_sub_hash))) as usize) % self.buckets) }; + b.key_sum[0] ^= k0; + b.key_sum[1] ^= k1; + b.key_sum[2] ^= k2; + b.key_sum[3] ^= k3; + b.key_sum[4] ^= k4; + b.key_sum[5] ^= k5; b.check_hash_sum ^= check_hash; b.count = i64::from_le(b.count).wrapping_add(delta).to_le(); } } - #[cfg(any(target_arch = "x86_64", target_arch = "x86", target_arch = "aarch64", target_arch = "powerpc64"))] #[inline(always)] - pub fn insert(&mut self, key: &[u8; IDENTITY_HASH_SIZE]) { - self.ins_rem(unsafe { &*key.as_ptr().cast::<[u64; IDENTITY_HASH_SIZE / 8]>() }, 1); - } + pub fn insert(&mut self, key: &[u8; IDENTITY_HASH_SIZE]) { self.ins_rem(key, 1); } - #[cfg(not(any(target_arch = "x86_64", target_arch = "x86", target_arch = "aarch64", target_arch = "powerpc64")))] #[inline(always)] - pub fn insert(&mut self, key: &[u8; IDENTITY_HASH_SIZE]) { - let mut tmp = [0_u64; IDENTITY_HASH_SIZE / 8]; - unsafe { copy_nonoverlapping(key.as_ptr(), tmp.as_mut_ptr().cast(), IDENTITY_HASH_SIZE) }; - self.ins_rem(&tmp, 1); - } - - #[cfg(any(target_arch = "x86_64", target_arch = "x86", target_arch = "aarch64", target_arch = "powerpc64"))] - #[inline(always)] - pub fn remove(&mut self, key: &[u8; IDENTITY_HASH_SIZE]) { - self.ins_rem(unsafe { &*key.as_ptr().cast::<[u64; IDENTITY_HASH_SIZE / 8]>() }, -1); - } - - #[cfg(not(any(target_arch = "x86_64", target_arch = "x86", target_arch = "aarch64", target_arch = "powerpc64")))] - #[inline(always)] - pub fn remove(&mut self, key: &[u8; IDENTITY_HASH_SIZE]) { - let mut tmp = [0_u64; IDENTITY_HASH_SIZE / 8]; - unsafe { copy_nonoverlapping(key.as_ptr(), tmp.as_mut_ptr().cast(), IDENTITY_HASH_SIZE) }; - self.ins_rem(&tmp, -1); - } + pub fn remove(&mut self, key: &[u8; IDENTITY_HASH_SIZE]) { self.ins_rem(key, -1); } /// Subtract another IBLT from this one to compute set difference. - pub fn subtract(&mut self, other: &IBLT) { - if other.map.len() == self.map.len() { - for i in 0..self.map.len() { - let self_b = unsafe { self.map.get_unchecked_mut(i) }; - let other_b = unsafe { other.map.get_unchecked(i) }; - for j in 0..(IDENTITY_HASH_SIZE / 8) { - self_b.key_sum[j] ^= other_b.key_sum[j]; - } + /// The other may be in the form of a raw byte array or an IBLT, which implements + /// AsRef<[u8]>. It must have the same number of buckets. 
+ pub fn subtract>(&mut self, other: &O) { + let other_slice = other.as_ref(); + if (other_slice.len() / size_of::()) == self.buckets { + let other_map: *const IBLTEntry = other_slice.as_ptr().cast(); + for i in 0..self.buckets { + let self_b = unsafe { &mut *self.map.wrapping_add(i) }; + let other_b = unsafe { &*other_map.wrapping_add(i) }; + self_b.key_sum[0] ^= other_b.key_sum[0]; + self_b.key_sum[1] ^= other_b.key_sum[1]; + self_b.key_sum[2] ^= other_b.key_sum[2]; + self_b.key_sum[3] ^= other_b.key_sum[3]; + self_b.key_sum[4] ^= other_b.key_sum[4]; + self_b.key_sum[5] ^= other_b.key_sum[5]; self_b.check_hash_sum ^= other_b.check_hash_sum; self_b.count = i64::from_le(self_b.count).wrapping_sub(i64::from_le(other_b.count)).to_le(); } } } - /// Call a function for every value that can be extracted from this IBLT. + /// Extract every enumerable value from this IBLT. /// - /// The function is called with the key and a boolean. The boolean is meaningful - /// if this IBLT is the result of subtract(). In that case the boolean is true - /// if the "local" IBLT contained the item and false if the "remote" side contained - /// the item. - /// - /// The starting_singular_bucket parameter must be the internal index of a - /// bucket with only one entry (1 or -1). It can be obtained from the return - /// values of either subtract() or singular_bucket(). - pub fn list bool>(&mut self, mut f: F) { - let buckets = self.map.len(); - let mut singular_buckets: Vec = Vec::with_capacity(buckets); + /// This consumes the IBLT instance since listing requires destructive modification + /// of the digest data. + pub fn list bool>(self, mut f: F) { + let buckets = self.buckets; + let mut singular_buckets: Vec = Vec::with_capacity(buckets + 2); for i in 0..buckets { - if unsafe { self.map.get_unchecked(i) }.is_singular() { - singular_buckets.push(i as u32); - }; + unsafe { + if (&*self.map.wrapping_add(i)).is_singular() { + singular_buckets.push(i as u32); + } + } } let mut key = [0_u64; IDENTITY_HASH_SIZE / 8]; - while !singular_buckets.is_empty() { - let b = unsafe { self.map.get_unchecked_mut(singular_buckets.pop().unwrap() as usize) }; + let mut bucket_ptr = 0; + while bucket_ptr < singular_buckets.len() { + let b = unsafe { &*self.map.wrapping_add(*singular_buckets.get_unchecked(bucket_ptr) as usize) }; + bucket_ptr += 1; if b.is_singular() { - for j in 0..(IDENTITY_HASH_SIZE / 8) { - key[j] = b.key_sum[j]; - } - if f(unsafe { &*key.as_ptr().cast::<[u8; IDENTITY_HASH_SIZE]>() }, b.count == 1) { + key[0] = b.key_sum[0]; + key[1] = b.key_sum[1]; + key[2] = b.key_sum[2]; + key[3] = b.key_sum[3]; + key[4] = b.key_sum[4]; + key[5] = b.key_sum[5]; + if f(unsafe { &*key.as_ptr().cast::<[u8; IDENTITY_HASH_SIZE]>() }) { let check_hash = u64::from_le(key[0]).wrapping_add(xorshift64(u64::from_le(key[1]))).to_le(); for mapping_sub_hash in 0..KEY_MAPPING_ITERATIONS { let bi = (u64::from_le(unsafe { *key.get_unchecked(mapping_sub_hash) }) as usize) % buckets; - let b = unsafe { self.map.get_unchecked_mut(bi) }; - for j in 0..(IDENTITY_HASH_SIZE / 8) { - b.key_sum[j] ^= key[j]; - } + let b = unsafe { &mut *self.map.wrapping_add(bi) }; + b.key_sum[0] ^= key[0]; + b.key_sum[1] ^= key[1]; + b.key_sum[2] ^= key[2]; + b.key_sum[3] ^= key[3]; + b.key_sum[4] ^= key[4]; + b.key_sum[5] ^= key[5]; b.check_hash_sum ^= check_hash; b.count = i64::from_le(b.count).wrapping_sub(1).to_le(); if b.is_singular() { @@ -193,35 +182,77 @@ impl IBLT { } } +impl AsRef<[u8]> for IBLT { + /// Get this IBLT in raw byte array form. 
+ #[inline(always)] + fn as_ref(&self) -> &[u8] { self.as_bytes() } +} + #[cfg(test)] mod tests { use zerotier_core_crypto::hash::SHA384; use crate::iblt::*; + #[test] + fn compiler_behavior() { + // A number of things above like unrolled key XORing must be changed if this size is changed. + assert_eq!(IDENTITY_HASH_SIZE, 48); + assert!(KEY_MAPPING_ITERATIONS <= (IDENTITY_HASH_SIZE / 8) && (IDENTITY_HASH_SIZE % 8) == 0); + + // Make sure this packed struct is actually packed. + assert_eq!(size_of::(), IDENTITY_HASH_SIZE + 8 + 8); + } + #[allow(unused_variables)] #[test] fn insert_and_list() { - assert_eq!(size_of::(), IDENTITY_HASH_SIZE + 8 + 8); - assert!(KEY_MAPPING_ITERATIONS <= (IDENTITY_HASH_SIZE / 8) && (IDENTITY_HASH_SIZE % 8) == 0); - - for expected_cnt in 0..800 { - let mut t = IBLT::new(1000); - for i in 0..expected_cnt { - let k = SHA384::hash(&((i + expected_cnt) as u32).to_le_bytes()); - t.insert(&k); + for _ in 0..10 { + for expected_cnt in 0..768 { + let random_u64 = zerotier_core_crypto::random::xorshift64_random(); + let mut t = IBLT::new(2048); + for i in 0..expected_cnt { + let k = SHA384::hash(&((i + random_u64) as u64).to_le_bytes()); + t.insert(&k); + } + let mut cnt = 0; + t.list(|k| { + cnt += 1; + true + }); + assert_eq!(cnt, expected_cnt); } - let mut cnt = 0; - t.list(|k, d| { - cnt += 1; - //println!("{} {}", zerotier_core_crypto::hex::to_string(k), d); - true - }); - //println!("retrieved {} keys", cnt); - assert_eq!(cnt, expected_cnt); } } + #[allow(unused_variables)] #[test] - fn benchmark() { + fn set_reconciliation() { + for _ in 0..10 { + let random_u64 = zerotier_core_crypto::random::xorshift64_random(); + let mut alice = IBLT::new(2048); + let mut bob = IBLT::new(2048); + let mut alice_total = 0_i32; + let mut bob_total = 0_i32; + for i in 0..1500 { + let k = SHA384::hash(&((i ^ random_u64) as u64).to_le_bytes()); + if (k[0] & 1) == 1 { + alice.insert(&k); + alice_total += 1; + } + if (k[0] & 3) == 2 { + bob.insert(&k); + bob_total += 1; + } + } + alice.subtract(&bob); + let mut diff_total = 0_i32; + alice.list(|k| { + diff_total += 1; + true + }); + // This is a probabilistic process so we tolerate a little bit of failure. The idea is that each + // pass reconciles more and more differences. + assert!(((alice_total + bob_total) - diff_total).abs() <= 128); + } } } diff --git a/allthethings/src/lib.rs b/allthethings/src/lib.rs index 18e657688..6d039690d 100644 --- a/allthethings/src/lib.rs +++ b/allthethings/src/lib.rs @@ -12,20 +12,8 @@ mod protocol; mod varint; mod memorystore; mod iblt; - -pub struct Config { - /// Number of P2P connections desired. - pub target_link_count: usize, - - /// Maximum allowed size of an object. - pub max_object_size: usize, - - /// TCP port to which this should bind. - pub tcp_port: u16, - - /// A name for this replicated data set. This is just used to prevent linking to peers replicating different data. - pub domain: String, -} +mod config; +mod link; pub(crate) fn ms_since_epoch() -> u64 { std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_millis() as u64 @@ -35,8 +23,19 @@ pub(crate) fn ms_monotonic() -> u64 { std::time::Instant::now().elapsed().as_millis() as u64 } +#[inline(always)] +pub(crate) async fn io_timeout>>(d: std::time::Duration, f: F) -> smol::io::Result { + smol::future::or(f, async { + let _ = smol::Timer::after(d).await; + Err(smol::io::Error::new(smol::io::ErrorKind::TimedOut, "I/O timeout")) + }).await +} + /// SHA384 is the hash currently used. 
Others could be supported in the future. +/// If this size changes check iblt.rs for a few things that must be changed. This +/// is checked in "cargo test." pub const IDENTITY_HASH_SIZE: usize = 48; +pub use config::Config; pub use store::{Store, StorePutResult}; -pub use replicator::Replicator; +//pub use replicator::Replicator; diff --git a/allthethings/src/link.rs b/allthethings/src/link.rs new file mode 100644 index 000000000..50afe1e76 --- /dev/null +++ b/allthethings/src/link.rs @@ -0,0 +1,251 @@ +use std::mem::MaybeUninit; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::time::Duration; + +use serde::{Deserialize, Serialize}; +use smol::io::{AsyncReadExt, AsyncWriteExt, BufReader, BufWriter}; +use smol::lock::Mutex; +use smol::net::{SocketAddr, TcpStream}; + +use zerotier_core_crypto::gmac::SequentialNonceGMAC; +use zerotier_core_crypto::hash::SHA384; +use zerotier_core_crypto::kbkdf::zt_kbkdf_hmac_sha384; +use zerotier_core_crypto::p521::{P521KeyPair, P521PublicKey}; +use zerotier_core_crypto::secret::Secret; + +use crate::{Config, io_timeout, ms_monotonic, ms_since_epoch, varint}; +use crate::protocol::*; + +#[inline(always)] +fn decode_msgpack<'de, T: Deserialize<'de>>(data: &'de [u8]) -> smol::io::Result { + rmp_serde::from_read_ref(data).map_err(|_| smol::io::Error::new(smol::io::ErrorKind::InvalidData, "invalid msgpack data")) +} + +struct OutputStream { + stream: BufWriter, + gmac: Option, +} + +/// A TCP link between this node and another. +pub(crate) struct Link<'a, 'b> { + node_secret: &'a P521KeyPair, + config: &'b Config, + remote_node_id: parking_lot::Mutex>, + reader: Mutex>, + writer: Mutex, + pub remote_addr: SocketAddr, + pub connect_time: u64, + pub authenticated: AtomicBool, + keepalive_period: u64, + last_send_time: AtomicU64, + max_message_size: usize, +} + +impl<'a, 'b> Link<'a, 'b> { + pub fn new(stream: TcpStream, remote_addr: SocketAddr, connect_time: u64, node_secret: &'a P521KeyPair, config: &'b Config) -> Self { + let _ = stream.set_nodelay(false); + let max_message_size = HELLO_SIZE_MAX.max(config.max_message_size); + Self { + node_secret, + config, + remote_node_id: parking_lot::Mutex::new(None), + reader: Mutex::new(BufReader::with_capacity((max_message_size + 16).max(16384), stream.clone())), + writer: Mutex::new(OutputStream { + stream: BufWriter::with_capacity(max_message_size + 16, stream), + gmac: None + }), + remote_addr, + connect_time, + authenticated: AtomicBool::new(false), + keepalive_period: (config.io_timeout * 1000) / 2, + last_send_time: AtomicU64::new(ms_monotonic()), + max_message_size + } + } + + /// Get the remote node ID, which is SHA384(its long-term public keys). + /// Returns None if the remote node has not yet responded with HelloAck and been verified. + pub fn remote_node_id(&self) -> Option<[u8; 48]> { self.remote_node_id.lock().clone() } + + /// Send message and increment outgoing GMAC nonce. 
+ async fn write_message(&self, timeout: Duration, message_type: u8, message: &[u8]) -> smol::io::Result<()> { + let mut mac: [u8; 16] = unsafe { MaybeUninit::uninit().assume_init() }; + let mt = [message_type]; + + let mut writer = self.writer.lock().await; + + writer.gmac.as_mut().map_or_else(|| { + Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "link negotiation is not complete")) + }, |gmac| { + gmac.init_for_next_message(); + gmac.update(&mt); + gmac.update(message); + gmac.finish(&mut mac); + Ok(()) + })?; + + io_timeout(timeout, writer.stream.write_all(&mt)).await?; + io_timeout(timeout, varint::async_write(&mut writer.stream, message.len() as u64)).await?; + io_timeout(timeout, writer.stream.write_all(message)).await?; + io_timeout(timeout, writer.stream.write_all(&mac)).await?; + io_timeout(timeout, writer.stream.flush()).await + } + + /// Serialize object with msgpack and send, increment outgoing GMAC nonce. + async fn write_message_msgpack(&self, timeout: Duration, serialize_buf: &mut Vec, message_type: u8, message: &T) -> smol::io::Result<()> { + serialize_buf.clear(); + rmp_serde::encode::write(serialize_buf, message).map_err(|_| smol::io::Error::new(smol::io::ErrorKind::InvalidData, "msgpack encode failure"))?; + self.write_message(timeout, message_type, serialize_buf.as_slice()).await + } + + /// Send a keepalive if necessary. + pub async fn send_keepalive_if_needed(&self, now_monotonic: u64) { + if now_monotonic.saturating_sub(self.last_send_time.load(Ordering::Relaxed)) >= self.keepalive_period && self.authenticated.load(Ordering::Relaxed) { + self.last_send_time.store(now_monotonic, Ordering::Relaxed); + let timeout = Duration::from_secs(1); + let mut writer = self.writer.lock().await; + io_timeout(timeout, writer.stream.write_all(&[MESSAGE_TYPE_KEEPALIVE])).await; + io_timeout(timeout, writer.stream.flush()).await; + } + } + + /// Launched as an async task for each new link. + pub async fn io_main(&self) -> smol::io::Result<()> { + // Reader is held here for the duration of the link's I/O loop. + let mut reader_mg = self.reader.lock().await; + let reader = &mut *reader_mg; + + let mut read_buf: Vec = Vec::new(); + read_buf.resize(self.max_message_size, 0); + let mut serialize_buf: Vec = Vec::with_capacity(4096); + let timeout = Duration::from_secs(self.config.io_timeout); + + // (1) Send Hello and save the nonce and the hash of the raw Hello message for later HelloAck HMAC check. + let mut gmac_send_nonce_initial = [0_u8; 16]; + zerotier_core_crypto::random::fill_bytes_secure(&mut gmac_send_nonce_initial); + let ephemeral_secret = P521KeyPair::generate(true).unwrap(); + let sent_hello_hash = { + serialize_buf.clear(); + let _ = rmp_serde::encode::write(&mut serialize_buf, &Hello { + protocol_version: PROTOCOL_VERSION, + flags: 0, + clock: ms_since_epoch(), + domain: self.config.domain.as_str(), + nonce: &gmac_send_nonce_initial, + p521_ecdh_ephemeral_key: ephemeral_secret.public_key_bytes(), + p521_ecdh_node_key: self.node_secret.public_key_bytes(), + }).unwrap(); + + let mut writer = self.writer.lock().await; + io_timeout(timeout, varint::async_write(&mut writer.stream, serialize_buf.len() as u64)).await?; + io_timeout(timeout, writer.stream.write_all(serialize_buf.as_slice())).await?; + io_timeout(timeout, writer.stream.flush()).await?; + drop(writer); + + SHA384::hash(serialize_buf.as_slice()) + }; + + // (2) Read other side's HELLO and send ACK. Also do key agreement, initialize GMAC, etc. 
+ let message_size = io_timeout(timeout, varint::async_read(reader)).await? as usize; + if message_size > HELLO_SIZE_MAX { + return Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "message too large")); + } + let (mut gmac_receive, ack_key, remote_node_id) = { + let hello_buf = &mut read_buf.as_mut_slice()[0..message_size]; + io_timeout(timeout, reader.read_exact(hello_buf)).await?; + let received_hello_hash = SHA384::hash(hello_buf); // for ACK generation + let hello: Hello = decode_msgpack(hello_buf)?; + + if hello.nonce.len() < 16 || hello.protocol_version != PROTOCOL_VERSION { + return Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "invalid HELLO parameters")); + } + + let remote_node_public_key = P521PublicKey::from_bytes(hello.p521_ecdh_node_key); + let remote_ephemeral_public_key = P521PublicKey::from_bytes(hello.p521_ecdh_ephemeral_key); + if remote_node_public_key.is_none() || remote_ephemeral_public_key.is_none() { + return Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "invalid public key in HELLO")); + } + let node_shared_key = self.node_secret.agree(&remote_node_public_key.unwrap()); + let ephemeral_shared_key = ephemeral_secret.agree(&remote_ephemeral_public_key.unwrap()); + if node_shared_key.is_none() || ephemeral_shared_key.is_none() { + return Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "key agreement failed")); + } + let shared_key = Secret(SHA384::hmac(&SHA384::hash(ephemeral_shared_key.unwrap().as_bytes()), node_shared_key.unwrap().as_bytes())); + + let gmac_key = zt_kbkdf_hmac_sha384(shared_key.as_bytes(), KBKDF_LABEL_GMAC, 0, 0); + let ack_key = zt_kbkdf_hmac_sha384(shared_key.as_bytes(), KBKDF_LABEL_HELLO_ACK_HMAC, 0, 0); + + let gmac_receive = SequentialNonceGMAC::new(&gmac_key.0[0..32], &hello.nonce[0..16]); + self.writer.lock().await.gmac.replace(SequentialNonceGMAC::new(&gmac_key.0[0..32], &gmac_send_nonce_initial)); + + let ack_hmac = SHA384::hmac(ack_key.as_bytes(), &received_hello_hash); + self.write_message_msgpack(timeout, &mut serialize_buf, MESSAGE_TYPE_HELLO_ACK, &HelloAck { + ack: &ack_hmac, + clock_echo: hello.clock + }).await?; + + (gmac_receive, ack_key, SHA384::hash(hello.p521_ecdh_node_key)) + }; + + self.last_send_time.store(ms_monotonic(), Ordering::Relaxed); + + // Done with ephemeral secret key, so forget it. + drop(ephemeral_secret); + + // (3) Start primary I/O loop and initially listen for HelloAck to confirm the other side's node identity. + let mut received_mac_buf = [0_u8; 16]; + let mut expected_mac_buf = [0_u8; 16]; + let mut message_type_buf = [0_u8; 1]; + let mut authenticated = false; + loop { + io_timeout(timeout, reader.read_exact(&mut message_type_buf)).await?; + + // NOP is a single byte keepalive, so skip. Otherwise handle actual messages. + if message_type_buf[0] != MESSAGE_TYPE_KEEPALIVE { + let message_size = io_timeout(timeout, varint::async_read(reader)).await? 
as usize; + if message_size > self.max_message_size { + return Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "message too large")); + } + let message_buf = &mut read_buf.as_mut_slice()[0..message_size]; + io_timeout(timeout, reader.read_exact(message_buf)).await?; + io_timeout(timeout, reader.read_exact(&mut received_mac_buf)).await?; + + gmac_receive.init_for_next_message(); + gmac_receive.update(&message_type_buf); + gmac_receive.update(message_buf); + gmac_receive.finish(&mut expected_mac_buf); + if !received_mac_buf.eq(&expected_mac_buf) { + return Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "message authentication failed")); + } + + if authenticated { + match message_type_buf[0] { + MESSAGE_TYPE_HELLO_ACK => { + // Multiple HelloAck messages don't make sense. + }, + MESSAGE_TYPE_OBJECTS => {}, + MESSAGE_TYPE_HAVE_OBJECTS => {}, + MESSAGE_TYPE_WANT_OBJECTS => {}, + MESSAGE_TYPE_IBLT_SYNC_REQUEST => {}, + MESSAGE_TYPE_IBLT_SYNC_DIGEST => {}, + _ => {}, + } + } else { + if message_type_buf[0] == MESSAGE_TYPE_HELLO_ACK { + let hello_ack: HelloAck = decode_msgpack(message_buf)?; + let expected_ack_hmac = SHA384::hmac(ack_key.as_bytes(), &sent_hello_hash); + if !hello_ack.ack.eq(&expected_ack_hmac) { + return Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "session authentication failed")); + } + + authenticated = true; + let _ = self.remote_node_id.lock().replace(remote_node_id.clone()); + self.authenticated.store(true, Ordering::Relaxed); + } else { + return Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "message received before session authenticated")); + } + } + } + } + } +} diff --git a/allthethings/src/memorystore.rs b/allthethings/src/memorystore.rs index 95466a964..ed0af475d 100644 --- a/allthethings/src/memorystore.rs +++ b/allthethings/src/memorystore.rs @@ -1,79 +1,98 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + * + * (c)2021 ZeroTier, Inc. + * https://www.zerotier.com/ + */ + use std::collections::Bound::Included; use std::collections::BTreeMap; -use std::io::Write; -use std::sync::atomic::{AtomicU64, Ordering}; -use std::sync::Mutex; +use std::sync::Arc; -use smol::net::SocketAddr; - -use zerotier_core_crypto::random::xorshift64_random; +use parking_lot::Mutex; use crate::{IDENTITY_HASH_SIZE, ms_since_epoch, Store, StorePutResult}; /// A Store that stores all objects in memory, mostly for testing. 
-pub struct MemoryStore(Mutex>>, Mutex>, AtomicU64); +pub struct MemoryStore(Mutex, u64)>>); impl MemoryStore { - pub fn new() -> Self { Self(Mutex::new(BTreeMap::new()), Mutex::new(Vec::new()), AtomicU64::new(u64::MAX)) } + pub fn new() -> Self { Self(Mutex::new(BTreeMap::new())) } } impl Store for MemoryStore { - fn get(&self, _reference_time: u64, identity_hash: &[u8; IDENTITY_HASH_SIZE], buffer: &mut Vec) -> bool { - buffer.clear(); - self.0.lock().unwrap().get(identity_hash).map_or(false, |value| { - let _ = buffer.write_all(value.as_slice()); - true + type Object = Arc<[u8]>; + + #[inline(always)] + fn local_time(&self) -> u64 { + ms_since_epoch() + } + + fn get(&self, reference_time: u64, identity_hash: &[u8; IDENTITY_HASH_SIZE]) -> Option { + self.0.lock().get(identity_hash).and_then(|o| { + if (*o).1 <= reference_time { + Some((*o).0.clone()) + } else { + None + } }) } - fn put(&self, _reference_time: u64, identity_hash: &[u8; IDENTITY_HASH_SIZE], object: &[u8]) -> StorePutResult { + fn put(&self, reference_time: u64, identity_hash: &[u8; IDENTITY_HASH_SIZE], object: &[u8]) -> StorePutResult { let mut result = StorePutResult::Duplicate; - let _ = self.0.lock().unwrap().entry(identity_hash.clone()).or_insert_with(|| { - self.2.store(ms_since_epoch(), Ordering::Relaxed); + let _ = self.0.lock().entry(identity_hash.clone()).or_insert_with(|| { result = StorePutResult::Ok; - object.to_vec() + (object.to_vec().into(), reference_time) }); result } - fn have(&self, _reference_time: u64, identity_hash: &[u8; IDENTITY_HASH_SIZE]) -> bool { - self.0.lock().unwrap().contains_key(identity_hash) + fn have(&self, reference_time: u64, identity_hash: &[u8; IDENTITY_HASH_SIZE]) -> bool { + self.0.lock().get(identity_hash).map_or(false, |o| (*o).1 <= reference_time) } - fn total_count(&self, _reference_time: u64) -> Option { - Some(self.0.lock().unwrap().len() as u64) + fn total_count(&self, reference_time: u64) -> Option { + let mut tc = 0_u64; + for e in self.0.lock().iter() { + tc += ((*e.1).1 <= reference_time) as u64; + } + Some(tc) } - fn last_object_receive_time(&self) -> Option { - let rt = self.2.load(Ordering::Relaxed); - if rt == u64::MAX { - None - } else { - Some(rt) + fn for_each) -> bool>(&self, reference_time: u64, start: &[u8; IDENTITY_HASH_SIZE], end: &[u8; IDENTITY_HASH_SIZE], mut f: F) { + let mut tmp: Vec<([u8; IDENTITY_HASH_SIZE], Arc<[u8]>)> = Vec::with_capacity(1024); + for e in self.0.lock().range((Included(*start), Included(*end))).into_iter() { + if (*e.1).1 <= reference_time { + tmp.push((e.0.clone(), (*e.1).0.clone())); + } + } + for e in tmp.iter() { + if !f(&(*e).0, &(*e).1) { + break; + } } } - fn count(&self, _reference_time: u64, start: &[u8; IDENTITY_HASH_SIZE], end: &[u8; IDENTITY_HASH_SIZE]) -> Option { - if start.le(end) { - Some(self.0.lock().unwrap().range((Included(*start), Included(*end))).count() as u64) - } else { - None + fn for_each_identity_hash bool>(&self, reference_time: u64, start: &[u8; IDENTITY_HASH_SIZE], end: &[u8; IDENTITY_HASH_SIZE], mut f: F) { + let mut tmp: Vec<[u8; IDENTITY_HASH_SIZE]> = Vec::with_capacity(1024); + for e in self.0.lock().range((Included(*start), Included(*end))).into_iter() { + if (*e.1).1 <= reference_time { + tmp.push(e.0.clone()); + } + } + for e in tmp.iter() { + if !f(e) { + break; + } } } - fn save_remote_endpoint(&self, to_address: &SocketAddr) { - let mut sv = self.1.lock().unwrap(); - if !sv.contains(to_address) { - sv.push(to_address.clone()); - } - } - - fn get_remote_endpoint(&self) -> Option { - let sv = 
self.1.lock().unwrap(); - if sv.is_empty() { - None - } else { - sv.get((xorshift64_random() as usize) % sv.len()).cloned() + fn count(&self, reference_time: u64, start: &[u8; IDENTITY_HASH_SIZE], end: &[u8; IDENTITY_HASH_SIZE]) -> Option { + let mut tc = 0_u64; + for e in self.0.lock().range((Included(*start), Included(*end))).into_iter() { + tc += ((*e.1).1 <= reference_time) as u64; } + Some(tc) } } diff --git a/allthethings/src/protocol.rs b/allthethings/src/protocol.rs index 357c10a21..b128cf389 100644 --- a/allthethings/src/protocol.rs +++ b/allthethings/src/protocol.rs @@ -6,38 +6,138 @@ * https://www.zerotier.com/ */ -pub const PROTOCOL_VERSION: u8 = 1; -pub const HASH_ALGORITHM_SHA384: u8 = 1; +/* + * Wire protocol notes: + * + * Messages are prefixed by a type byte followed by a message size in the form + * of a variable length integer (varint). Each message is followed by a + * 16-byte GMAC message authentication code. + * + * HELLO is an exception. It's sent on connect, is prefixed only by a varint + * size, and is not followed by a MAC. Instead HelloAck is sent after it + * containing a full HMAC ACK for key negotiation. + * + * GMAC is keyed using a KBKDF-derived key from a shared key currently made + * with HKDF as HMAC(SHA384(ephemeral key), node key). The first 32 bytes of + * the key are the GMAC key while the nonce is the first 96 bytes of + * the nonce where this 96-bit integer is incremented (as little-endian) + * for each message sent. Increment should wrap at 2^96. The connection should + * close after no more than 2^96 messages, but that's a crazy long session + * anyway. + * + * The wire protocol is only authenticated to prevent network level attacks. + * Data is not encrypted since this is typically used to replicate a public + * "well known" data set and encryption would add needless overhead. + */ -pub const MESSAGE_TYPE_NOP: u8 = 0; -pub const MESSAGE_TYPE_HAVE_NEW_OBJECT: u8 = 1; -pub const MESSAGE_TYPE_OBJECT: u8 = 2; -pub const MESSAGE_TYPE_GET_OBJECTS: u8 = 3; +use serde::{Deserialize, Serialize}; -/// HELLO message, which is all u8's and is packed and so can be parsed directly in place. -/// This message is sent at the start of any connection by both sides. -#[repr(packed)] -pub struct Hello { - pub hello_size: u8, // technically a varint but below 0x80 - pub protocol_version: u8, - pub hash_algorithm: u8, - pub flags: [u8; 4], // u32, little endian - pub clock: [u8; 8], // u64, little endian - pub last_object_receive_time: [u8; 8], // u64, little endian, u64::MAX if unspecified - pub domain_hash: [u8; 48], - pub instance_id: [u8; 16], - pub loopback_check_code_salt: [u8; 16], - pub loopback_check_code: [u8; 16], +/// KBKDF label for the HMAC in HelloAck. +pub const KBKDF_LABEL_HELLO_ACK_HMAC: u8 = b'A'; + +/// KBKDF label for GMAC key derived from main key. +pub const KBKDF_LABEL_GMAC: u8 = b'G'; + +/// Sanity limit on the size of HELLO. +pub const HELLO_SIZE_MAX: usize = 4096; + +/// Overall protocol version. +pub const PROTOCOL_VERSION: u16 = 1; + +/// No operation, no payload, sent without size or MAC. +/// This is a special single byte message used for connection keepalive. +pub const MESSAGE_TYPE_KEEPALIVE: u8 = 0; + +/// Acknowledgement of HELLO. +pub const MESSAGE_TYPE_HELLO_ACK: u8 = 1; + +/// A series of objects with each prefixed by a varint size. +pub const MESSAGE_TYPE_OBJECTS: u8 = 2; + +/// A series of identity hashes concatenated together advertising objects we have. 
+pub const MESSAGE_TYPE_HAVE_OBJECTS: u8 = 3; + +/// A series of identity hashes concatenated together of objects being requested. +pub const MESSAGE_TYPE_WANT_OBJECTS: u8 = 4; + +/// Request IBLT synchronization, payload is IBLTSyncRequest. +pub const MESSAGE_TYPE_IBLT_SYNC_REQUEST: u8 = 5; + +/// IBLT sync digest, payload is IBLTSyncDigest. +pub const MESSAGE_TYPE_IBLT_SYNC_DIGEST: u8 = 6; + +/// Initial message sent by both sides on TCP connection establishment. +/// This is sent with no type or message authentication code and is only +/// sent on connect. It is prefixed by a varint size. +#[derive(Deserialize, Serialize)] +pub struct Hello<'a> { + /// Local value of PROTOCOL_VERSION. + pub protocol_version: u16, + /// Flags, currently unused and always zero. + pub flags: u64, + /// Local clock in milliseconds since Unix epoch. + pub clock: u64, + /// The data set name ("domain") to which this node belongs. + pub domain: &'a str, + /// Random nonce, must be at least 16 bytes in length. + pub nonce: &'a [u8], + /// Random ephemeral ECDH session key. + pub p521_ecdh_ephemeral_key: &'a [u8], + /// Long-lived node-identifying ECDH public key. + pub p521_ecdh_node_key: &'a [u8], } -#[cfg(test)] -mod tests { - use std::mem::size_of; - use crate::protocol::*; - - #[test] - fn check_sizing() { - // Make sure packed structures are really packed. - assert_eq!(size_of::(), 1 + 1 + 1 + 4 + 8 + 8 + 48 + 16 + 16 + 16); - } +/// Sent in response to Hello. +#[derive(Deserialize, Serialize)] +pub struct HelloAck<'a> { + /// HMAC-SHA384(KBKDF(key, KBKDF_LABEL_HELLO_ACK_HMAC), SHA384(original raw Hello)) + pub ack: &'a [u8], + /// Value of clock in original hello, for measuring latency. + pub clock_echo: u64, +} + +/// Request an IBLT set digest to assist with synchronization. +/// +/// The peer may respond in one of three ways: +/// +/// (1) It may send an IBLTSyncDigest over a range of identity hashes of its +/// choice so that the requesting node may compute a difference and request +/// objects it does not have. +/// +/// (2) It may send HAVE_OBJECTS with a simple list of objects. +/// +/// (3) It may simply send a batch of objects. +/// +/// (4) It may not respond at all. +/// +/// Which option is chosen is up to the responding node and should be chosen +/// via a heuristic to maximize sync efficiency and minimize sync time. +/// +/// A central assumption is that identity hashes are uniformly distributed +/// since they are cryptographic hashes (currently SHA-384). This allows a +/// simple calculation to be made with the sending node's total count to +/// estimate set difference density across the entire hash range. +#[derive(Deserialize, Serialize)] +pub struct IBLTSyncRequest { + /// Total number of hashes in entire data set (on our side). + pub total_count: u64, + /// Our clock to use as a reference time for filtering the data set (if applicable). + pub reference_time: u64, +} + +/// An IBLT digest of identity hashes over a range. +#[derive(Deserialize, Serialize)] +pub struct IBLTSyncDigest<'a> { + /// Start of range. Right-pad with zeroes if too short. + pub range_start: &'a [u8], + /// End of range. Right-pad with zeroes if too short. + pub range_end: &'a [u8], + /// IBLT digest of hashes in this range. + pub iblt: &'a [u8], + /// Number of hashes in this range. + pub count: u64, + /// Total number of hashes in entire data set. + pub total_count: u64, + /// Reference time from IBLTSyncRequest or 0 if this is being sent synthetically. 
+ pub reference_time: u64, } diff --git a/allthethings/src/replicator.rs b/allthethings/src/replicator.rs index b27b15bb3..18ca92fa3 100644 --- a/allthethings/src/replicator.rs +++ b/allthethings/src/replicator.rs @@ -6,6 +6,9 @@ * https://www.zerotier.com/ */ + + +/* use std::collections::HashMap; use std::convert::TryInto; use std::error::Error; @@ -18,7 +21,7 @@ use std::time::Duration; use smol::{Executor, Task, Timer}; use smol::io::{AsyncReadExt, AsyncWriteExt, BufReader}; use smol::lock::Mutex; -use smol::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, TcpListener, TcpStream}; +use smol::net::*; use smol::stream::StreamExt; use zerotier_core_crypto::hash::SHA384; @@ -51,36 +54,47 @@ struct Connection { task: Task<()>, } -struct ReplicatorImpl<'ex> { +struct ReplicatorImpl<'ex, S: 'static + Store> { executor: Arc>, instance_id: [u8; 16], loopback_check_code_secret: [u8; 48], domain_hash: [u8; 48], - store: Arc, + store: Arc, config: Config, connections: Mutex>, connections_in_progress: Mutex>>, announced_objects_requested: Mutex>, } -pub struct Replicator<'ex> { +pub struct Replicator<'ex, S: 'static + Store> { v4_listener_task: Option>, v6_listener_task: Option>, background_cleanup_task: Task<()>, - _impl: Arc> + _impl: Arc>, } -impl<'ex> Replicator<'ex> { +impl<'ex, S: 'static + Store> Replicator<'ex, S> { /// Create a new replicator to replicate the contents of the provided store. /// All async tasks, sockets, and connections will be dropped if the replicator is dropped. - pub async fn start(executor: &Arc>, store: Arc, config: Config) -> Result, Box> { - let listener_v4 = TcpListener::bind(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, config.tcp_port)).await; - let listener_v6 = TcpListener::bind(SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, config.tcp_port, 0, 0)).await; + pub async fn start(executor: &Arc>, store: Arc, config: Config) -> Result, Box> { + let listener_v4 = socket2::Socket::new(socket2::Domain::IPV4, socket2::Type::STREAM, Some(socket2::Protocol::TCP)).and_then(|v4| { + let _ = v4.set_reuse_address(true); + let _ = v4.bind(&socket2::SockAddr::from(std::net::SocketAddrV4::new(std::net::Ipv4Addr::UNSPECIFIED, config.tcp_port)))?; + let _ = v4.listen(64); + Ok(v4) + }); + let listener_v6 = socket2::Socket::new(socket2::Domain::IPV6, socket2::Type::STREAM, Some(socket2::Protocol::TCP)).and_then(|v6| { + let _ = v6.set_only_v6(true); + let _ = v6.set_reuse_address(true); + let _ = v6.bind(&socket2::SockAddr::from(std::net::SocketAddrV6::new(std::net::Ipv6Addr::UNSPECIFIED, config.tcp_port, 0, 0)))?; + let _ = v6.listen(64); + Ok(v6) + }); if listener_v4.is_err() && listener_v6.is_err() { return Err(Box::new(listener_v4.unwrap_err())); } - let r = Arc::new(ReplicatorImpl::<'ex> { + let r = Arc::new(ReplicatorImpl::<'ex, S> { executor: executor.clone(), instance_id: { let mut tmp = [0_u8; 16]; @@ -101,20 +115,24 @@ impl<'ex> Replicator<'ex> { }); Ok(Self { - v4_listener_task: listener_v4.map_or(None, |listener_v4| Some(executor.spawn(r.clone().tcp_listener_task(listener_v4)))), - v6_listener_task: listener_v6.map_or(None, |listener_v6| Some(executor.spawn(r.clone().tcp_listener_task(listener_v6)))), + v4_listener_task: listener_v4.map_or(None, |listener_v4| { + Some(executor.spawn(r.clone().tcp_listener_task(smol::net::TcpListener::try_from(std::net::TcpListener::from(listener_v4)).unwrap()))) + }), + v6_listener_task: listener_v6.map_or(None, |listener_v6| { + 
Some(executor.spawn(r.clone().tcp_listener_task(smol::net::TcpListener::try_from(std::net::TcpListener::from(listener_v6)).unwrap()))) + }), background_cleanup_task: executor.spawn(r.clone().background_cleanup_task()), - _impl: r + _impl: r, }) } } -unsafe impl<'ex> Send for Replicator<'ex> {} +unsafe impl<'ex, S: 'static + Store> Send for Replicator<'ex, S> {} -unsafe impl<'ex> Sync for Replicator<'ex> {} +unsafe impl<'ex, S: 'static + Store> Sync for Replicator<'ex, S> {} -impl<'ex> ReplicatorImpl<'ex> { - async fn background_cleanup_task(self: Arc>) { +impl<'ex, S: 'static + Store> ReplicatorImpl<'ex, S> { + async fn background_cleanup_task(self: Arc>) { let mut timer = smol::Timer::interval(Duration::from_secs(CONNECTION_TIMEOUT_SECONDS / 10)); loop { timer.next().await; @@ -152,7 +170,7 @@ impl<'ex> ReplicatorImpl<'ex> { } } - async fn tcp_listener_task(self: Arc>, listener: TcpListener) { + async fn tcp_listener_task(self: Arc>, listener: TcpListener) { loop { let stream = listener.accept().await; if stream.is_ok() { @@ -166,7 +184,7 @@ impl<'ex> ReplicatorImpl<'ex> { } } - async fn handle_new_connection(self: Arc>, mut stream: TcpStream, remote_address: SocketAddr, outgoing: bool) { + async fn handle_new_connection(self: Arc>, mut stream: TcpStream, remote_address: SocketAddr, outgoing: bool) { let _ = stream.set_nodelay(true); let mut loopback_check_code_salt = [0_u8; 16]; @@ -175,9 +193,8 @@ impl<'ex> ReplicatorImpl<'ex> { hello_size: size_of::() as u8, protocol_version: protocol::PROTOCOL_VERSION, hash_algorithm: protocol::HASH_ALGORITHM_SHA384, - flags: [0_u8; 4], + flags: if outgoing { protocol::HELLO_FLAG_OUTGOING } else { 0 }, clock: ms_since_epoch().to_le_bytes(), - last_object_receive_time: self.store.last_object_receive_time().unwrap_or(u64::MAX).to_le_bytes(), domain_hash: self.domain_hash.clone(), instance_id: self.instance_id.clone(), loopback_check_code_salt, @@ -201,11 +218,6 @@ impl<'ex> ReplicatorImpl<'ex> { let s2 = self.clone(); let _ = connections.entry(k).or_insert_with(move || { let _ = stream.set_nodelay(false); - - if outgoing { - s2.store.save_remote_endpoint(&remote_address); - } - let last_receive = Arc::new(AtomicU64::new(ms_monotonic())); Connection { remote_address, @@ -221,7 +233,7 @@ impl<'ex> ReplicatorImpl<'ex> { let _task = self.connections_in_progress.lock().await.remove(&remote_address); } - async fn connection_io_task(self: Arc>, stream: TcpStream, remote_instance_id: [u8; 16], last_receive: Arc) { + async fn connection_io_task(self: Arc>, stream: TcpStream, remote_instance_id: [u8; 16], last_receive: Arc) { let mut reader = BufReader::with_capacity(65536, stream.clone()); let writer = Arc::new(Mutex::new(stream)); @@ -354,6 +366,7 @@ impl<'ex> ReplicatorImpl<'ex> { } } -unsafe impl<'ex> Send for ReplicatorImpl<'ex> {} +unsafe impl<'ex, S: 'static + Store> Send for ReplicatorImpl<'ex, S> {} -unsafe impl<'ex> Sync for ReplicatorImpl<'ex> {} +unsafe impl<'ex, S: 'static + Store> Sync for ReplicatorImpl<'ex, S> {} + */ diff --git a/allthethings/src/store.rs b/allthethings/src/store.rs index b65a79ddf..93d113e0b 100644 --- a/allthethings/src/store.rs +++ b/allthethings/src/store.rs @@ -6,13 +6,9 @@ * https://www.zerotier.com/ */ -use smol::net::SocketAddr; - use crate::IDENTITY_HASH_SIZE; -pub const MIN_IDENTITY_HASH: [u8; 48] = [0_u8; 48]; -pub const MAX_IDENTITY_HASH: [u8; 48] = [0xff_u8; 48]; - +/// Result returned by Store::put(). pub enum StorePutResult { /// Datum stored successfully. 
Ok, @@ -26,10 +22,14 @@ pub enum StorePutResult { /// Trait that must be implemented by the data store that is to be replicated. pub trait Store: Sync + Send { - /// Get an object from the database, storing it in the supplied buffer. - /// A return of 'false' leaves the buffer state undefined. If the return is true any previous - /// data in the supplied buffer will have been cleared and replaced with the retrieved object. - fn get(&self, reference_time: u64, identity_hash: &[u8; IDENTITY_HASH_SIZE], buffer: &mut Vec) -> bool; + /// Type returned by get(), which can be anything that contains a byte slice. + type Object: AsRef<[u8]>; + + /// Get the local time in milliseconds since Unix epoch. + fn local_time(&self) -> u64; + + /// Get an object from the database. + fn get(&self, reference_time: u64, identity_hash: &[u8; IDENTITY_HASH_SIZE]) -> Option; /// Store an entry in the database. fn put(&self, reference_time: u64, identity_hash: &[u8; IDENTITY_HASH_SIZE], object: &[u8]) -> StorePutResult; @@ -40,18 +40,15 @@ pub trait Store: Sync + Send { /// Get the total count of objects. fn total_count(&self, reference_time: u64) -> Option; - /// Get the time the last object was received in milliseconds since epoch. - fn last_object_receive_time(&self) -> Option; + /// Iterate over a range of identity hashes and values. + /// This calls the supplied function for each object. If the function returns false iteration stops. + fn for_each bool>(&self, reference_time: u64, start: &[u8; IDENTITY_HASH_SIZE], end: &[u8; IDENTITY_HASH_SIZE], f: F); + + /// Iterate over a range of identity hashes. + /// This calls the supplied function for each hash. If the function returns false iteration stops. + fn for_each_identity_hash bool>(&self, reference_time: u64, start: &[u8; IDENTITY_HASH_SIZE], end: &[u8; IDENTITY_HASH_SIZE], f: F); /// Count the number of identity hash keys in this range (inclusive) of identity hashes. /// This may return None if an error occurs, but should return 0 if the set is empty. fn count(&self, reference_time: u64, start: &[u8; IDENTITY_HASH_SIZE], end: &[u8; IDENTITY_HASH_SIZE]) -> Option; - - /// Called when a connection to a remote node was successful. - /// This is always called on successful outbound connect. - fn save_remote_endpoint(&self, to_address: &SocketAddr); - - /// Get a remote endpoint to try. - /// This can return endpoints in any order and is used to try to establish outbound links. - fn get_remote_endpoint(&self) -> Option; } diff --git a/allthethings/src/varint.rs b/allthethings/src/varint.rs index c96500754..9c83b51ea 100644 --- a/allthethings/src/varint.rs +++ b/allthethings/src/varint.rs @@ -8,13 +8,7 @@ use smol::io::{AsyncReadExt, AsyncWrite, AsyncRead, AsyncWriteExt}; -/// Byte that can be written for a zero varint. -pub const ZERO: u8 = 0x80; - -/// Byte that can be written for a varint of 1. 
-pub const ONE: u8 = 0x81; - -pub async fn async_write(w: &mut W, mut v: u64) -> std::io::Result<()> { +pub async fn async_write(w: &mut W, mut v: u64) -> smol::io::Result<()> { let mut b = [0_u8; 10]; let mut i = 0; loop { @@ -31,7 +25,7 @@ pub async fn async_write(w: &mut W, mut v: u64) -> std::i w.write_all(&b[0..i]).await } -pub async fn async_read(r: &mut R) -> std::io::Result { +pub async fn async_read(r: &mut R) -> smol::io::Result { let mut v = 0_u64; let mut buf = [0_u8; 1]; let mut pos = 0; diff --git a/zerotier-core-crypto/src/gmac.rs b/zerotier-core-crypto/src/gmac.rs new file mode 100644 index 000000000..f49fe9900 --- /dev/null +++ b/zerotier-core-crypto/src/gmac.rs @@ -0,0 +1,65 @@ +use std::io::Write; + +/// GMAC portion of AES-GCM for use as a fast plain vanilla MAC. +pub struct GMAC(gcrypt::mac::Mac); + +impl GMAC { + /// Create a new keyed GMAC instance. + /// The key may be 16, 24, or 32 bytes in length. This will panic otherwise. + pub fn new(key: &[u8]) -> GMAC { + if key.len() != 32 && key.len() != 24 && key.len() != 16 { + panic!("AES supports 128, 192, or 256 bits keys"); + } + let mut m = GMAC(gcrypt::mac::Mac::new(gcrypt::mac::Algorithm::GmacAes).unwrap()); + m.0.set_key(key).expect("GMAC set_key failed"); + m + } + + /// Reset GMAC and set nonce. + /// The nonce may be anywhere from 8 to 16 bytes in length but 12 bytes is strongly recommended. + /// It may be sequential. + #[inline(always)] + pub fn init(&mut self, nonce: &[u8]) { + let _ = self.0.reset(); + self.0.set_iv(nonce).expect("GMAC set_iv failed"); + } + + #[inline(always)] + pub fn update(&mut self, data: &[u8]) { + let _ = self.0.update(data); + } + + /// Flush GMAC and filll 'mac' with the final authentication code. + #[inline(always)] + pub fn finish(&mut self, mac: &mut [u8; 16]) { + let _ = self.0.flush(); + let _ = self.0.get_mac(mac).expect("GMAC get_mac failed"); + } +} + +/// A wrapper for GMAC with an incrementing 96-bit nonce. +/// The nonce here is incremented as a little-endian value. +/// This is for use with TCP streams. A maximum of 2^96 messages +/// should be sent or received with this, which is probably a large +/// enough limit to be safely ignored. 
+pub struct SequentialNonceGMAC(GMAC, u128); + +impl SequentialNonceGMAC { + #[inline(always)] + pub fn new(key: &[u8], initial_nonce: &[u8]) -> Self { + assert_eq!(initial_nonce.len(), 16); + Self(GMAC::new(key), u128::from_ne_bytes(initial_nonce.try_into().unwrap())) + } + + #[inline(always)] + pub fn init_for_next_message(&mut self) { + self.0.init(unsafe { &*(&self.1 as *const u128).cast::<[u8; 12]>() }); + self.1 = u128::from_le(self.1).wrapping_add(1).to_le(); + } + + #[inline(always)] + pub fn update(&mut self, data: &[u8]) { self.0.update(data); } + + #[inline(always)] + pub fn finish(&mut self, mac: &mut [u8; 16]) { self.0.finish(mac); } +} diff --git a/zerotier-core-crypto/src/lib.rs b/zerotier-core-crypto/src/lib.rs index f0afd3e38..7b4cdb8d0 100644 --- a/zerotier-core-crypto/src/lib.rs +++ b/zerotier-core-crypto/src/lib.rs @@ -18,6 +18,7 @@ pub mod secret; pub mod hex; pub mod varint; pub mod sidhp751; +pub mod gmac; pub use aes_gmac_siv; pub use rand_core; diff --git a/zerotier-system-service/src/main.rs b/zerotier-system-service/src/main.rs index b8ebee8c6..cda13d92d 100644 --- a/zerotier-system-service/src/main.rs +++ b/zerotier-system-service/src/main.rs @@ -6,6 +6,15 @@ * https://www.zerotier.com/ */ +use std::io::Write; +use std::str::FromStr; + +use clap::{App, Arg, ArgMatches, ErrorKind}; + +use zerotier_network_hypervisor::{VERSION_MAJOR, VERSION_MINOR, VERSION_REVISION}; + +use crate::store::platform_default_home_path; + mod fastudpsocket; mod localconfig; mod getifaddrs; @@ -16,14 +25,6 @@ mod vnic; mod service; mod utils; -use std::io::Write; -use std::str::FromStr; - -use clap::{App, Arg, ArgMatches, ErrorKind}; - -use zerotier_network_hypervisor::{VERSION_MAJOR, VERSION_MINOR, VERSION_REVISION}; -use crate::store::platform_default_home_path; - pub const HTTP_API_OBJECT_SIZE_LIMIT: usize = 131072; fn make_help(long_help: bool) -> String {
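
Note on the IBLT flow exercised by the set_reconciliation test in iblt.rs: both peers build digests with the same bucket count, one side applies the other's raw digest bytes with subtract(), and list() then enumerates hashes present on only one side. A rough usage sketch along those lines; local_hashes and remote_iblt_bytes are hypothetical inputs, and this would have to live inside the crate since iblt is a private module:

    fn set_difference(local_hashes: &[[u8; 48]], remote_iblt_bytes: &[u8], buckets: usize) -> Vec<[u8; 48]> {
        // Both sides must agree on the bucket count, otherwise subtract() silently does nothing.
        let mut local = crate::iblt::IBLT::new(buckets);
        for h in local_hashes {
            local.insert(h);
        }
        // The peer's digest is applied directly from its wire form (anything AsRef<[u8]>).
        local.subtract(&remote_iblt_bytes);
        // list() consumes the IBLT and is probabilistic: a dense difference can leave
        // some keys unrecovered, in which case a narrower hash range would be re-queried.
        let mut diff: Vec<[u8; 48]> = Vec::new();
        local.list(|k| {
            diff.push(*k);
            true // keep enumerating
        });
        diff
    }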
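
Note on the io_timeout helper added to lib.rs: it races the supplied I/O future against a smol Timer using smol::future::or, so every read and write in link.rs fails with TimedOut instead of hanging on a stalled peer. A standalone sketch of the same pattern; the generic bounds spelled out below are an assumption, not copied from the patch:

    use std::time::Duration;

    // Race `f` against a timer; whichever completes first decides the result.
    async fn io_timeout<T, F>(d: Duration, f: F) -> smol::io::Result<T>
    where
        F: std::future::Future<Output = smol::io::Result<T>>,
    {
        smol::future::or(f, async {
            let _ = smol::Timer::after(d).await;
            Err(smol::io::Error::new(smol::io::ErrorKind::TimedOut, "I/O timeout"))
        })
        .await
    }

The timeout used for the handshake and steady-state I/O comes from Config::io_timeout (seconds), with keepalives sent at half that period so healthy links never trip it.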
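
Note on the wire framing described at the top of protocol.rs and implemented by Link::write_message: each authenticated message is a type byte, a varint payload length, the payload, and a trailing 16-byte GMAC tag computed over the type byte followed by the payload (HELLO is length-prefixed only with no tag, and the keepalive is a bare single byte). A synchronous sketch of that layout; write_varint and mac_for are hypothetical stand-ins for the varint writer and the per-message GMAC, not functions defined in this patch:

    use std::io::{Result, Write};

    fn write_framed<W: Write>(
        w: &mut W,
        message_type: u8,
        payload: &[u8],
        write_varint: impl Fn(&mut W, u64) -> Result<()>,
        mac_for: impl FnOnce(&[u8], &[u8]) -> [u8; 16],
    ) -> Result<()> {
        let mt = [message_type];
        let mac = mac_for(&mt, payload); // GMAC covers the type byte, then the payload
        w.write_all(&mt)?;
        write_varint(w, payload.len() as u64)?; // length prefix covers the payload only
        w.write_all(payload)?;
        w.write_all(&mac)?; // 16-byte tag trails the payload
        w.flush()
    }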
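
Note on SequentialNonceGMAC in gmac.rs: the 128-bit counter is incremented as a little-endian integer and only its first 12 bytes are fed to GMAC as the IV, so the effective 96-bit nonce is sequential and wraps at 2^96, matching the session limit stated in the protocol notes. A safe-Rust approximation of that counter (the patch uses a native-endian u128 and a pointer cast, so this is illustrative rather than a drop-in replacement):

    struct SequentialNonce(u128);

    impl SequentialNonce {
        // Interpret the 16-byte initial nonce as a little-endian counter value.
        fn new(initial: &[u8; 16]) -> Self {
            Self(u128::from_le_bytes(*initial))
        }

        // Return the 12-byte IV for the next message, then advance the counter.
        fn next(&mut self) -> [u8; 12] {
            let le = self.0.to_le_bytes();
            let mut iv = [0u8; 12];
            iv.copy_from_slice(&le[..12]);
            // A carry out of the low 96 bits lands in the unused upper bytes,
            // so the IV itself wraps cleanly at 2^96.
            self.0 = self.0.wrapping_add(1);
            iv
        }
    }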