Too much to list, but mostly sync stuff.

This commit is contained in:
Adam Ierymenko 2021-12-30 18:15:32 -05:00
parent b0f7cc1238
commit 09d7e25254
No known key found for this signature in database
GPG key ID: C8877CF2D7A5D7F3
13 changed files with 785 additions and 273 deletions

View file

@ -6,3 +6,8 @@ edition = "2021"
[dependencies]
smol = { version = "^1", features = [] }
zerotier-core-crypto = { path = "../zerotier-core-crypto" }
socket2 = "^0"
serde = { version = "^1", features = ["derive"], default-features = false }
rmp-serde = "^0"
serde_json = { version = "^1", features = ["std"], default-features = false }
parking_lot = "^0"

View file

@ -0,0 +1,36 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*
* (c)2021 ZeroTier, Inc.
* https://www.zerotier.com/
*/
use serde::{Deserialize, Serialize};
/// Runtime configuration for a replication node.
///
/// `#[serde(default)]` allows a serialized configuration to omit any subset of fields; omitted
/// fields take the values produced by `Config::default()`.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
#[serde(default)]
pub struct Config {
    /// Maximum allowed size of a protocol message.
    pub max_message_size: usize,

    /// TCP port to which this should bind.
    pub tcp_port: u16,

    /// I/O timeout in seconds. Links apply it to individual network reads and writes, and
    /// send keepalives at half this interval.
    pub io_timeout: u64,

    /// A name for this replicated data set. This is just used to prevent linking to peers replicating different data.
    pub domain: String,
}
impl Default for Config {
fn default() -> Self {
Self {
max_message_size: 1024 * 1024, // 1MiB
tcp_port: 19993,
io_timeout: 300, // 5 minutes
domain: String::new(),
}
}
}

View file

@ -6,13 +6,26 @@
* https://www.zerotier.com/
*/
use std::alloc::{alloc_zeroed, dealloc, Layout};
use std::mem::size_of;
use std::ptr::{slice_from_raw_parts, write_bytes, copy_nonoverlapping};
use std::ptr::slice_from_raw_parts;
use crate::IDENTITY_HASH_SIZE;
// The number of indexing sub-hashes to use, must be <= IDENTITY_HASH_SIZE / 8
const KEY_MAPPING_ITERATIONS: usize = 3;
const KEY_MAPPING_ITERATIONS: usize = 5;
/// Read a `u64` in native byte order from a pointer that may not be 8-byte aligned.
///
/// A single portable implementation replaces the former per-architecture variants. Dereferencing
/// a misaligned `*const u64` with `*i` is undefined behavior in Rust on every architecture (and
/// trips the misaligned-pointer check in debug builds), even where the hardware tolerates it.
/// `read_unaligned` expresses the same load soundly and still compiles to a plain load on
/// architectures with cheap unaligned access.
///
/// The caller must guarantee that `i` points to at least 8 readable bytes.
#[inline(always)]
fn read_unaligned_u64(i: *const u64) -> u64 {
    // SAFETY: the caller guarantees 8 readable bytes at `i`; no alignment is required here.
    unsafe { i.read_unaligned() }
}
#[inline(always)]
fn xorshift64(mut x: u64) -> u64 {
@ -22,7 +35,7 @@ fn xorshift64(mut x: u64) -> u64 {
x
}
#[repr(packed)]
#[repr(C, packed)]
struct IBLTEntry {
key_sum: [u64; IDENTITY_HASH_SIZE / 8],
check_hash_sum: u64,
@ -30,10 +43,18 @@ struct IBLTEntry {
}
/// An IBLT (invertible bloom lookup table) specialized for reconciling sets of identity hashes.
/// This skips some extra hashing that would be necessary in a universal implementation since identity
/// hashes are already randomly distributed strong hashes.
///
/// This makes some careful use of unsafe as it's heavily optimized. It's a CPU bottleneck when
/// replicating large dynamic data sets.
pub struct IBLT {
map: Vec<IBLTEntry>,
map: *mut IBLTEntry,
buckets: usize
}
impl Drop for IBLT {
    /// Free the raw bucket array owned by this IBLT.
    fn drop(&mut self) {
        // The layout is rebuilt from the bucket count using the same size and alignment
        // expression that the allocation in new() uses.
        let table_bytes = size_of::<IBLTEntry>() * self.buckets;
        let layout = Layout::from_size_align(table_bytes, 8).unwrap();
        unsafe { dealloc(self.map.cast(), layout) };
    }
}
impl IBLTEntry {
@ -50,135 +71,103 @@ impl IBLTEntry {
impl IBLT {
/// Construct a new IBLT with a given capacity.
pub fn new(buckets: usize) -> Self {
assert!(buckets > 0);
assert!(buckets > 0 && buckets <= u32::MAX as usize);
Self {
map: {
let mut tmp: Vec<IBLTEntry> = Vec::with_capacity(buckets);
unsafe {
tmp.set_len(buckets);
write_bytes(tmp.as_mut_ptr().cast::<u8>(), 0, buckets * size_of::<IBLTEntry>());
}
tmp
}
map: unsafe { alloc_zeroed(Layout::from_size_align(size_of::<IBLTEntry>() * buckets, 8).unwrap()).cast() },
buckets,
}
}
/// Obtain IBLT from a byte array.
/// This returns None if the supplied bytes are not a valid IBLT.
pub fn from_bytes(b: &[u8]) -> Option<Self> {
if b.len() >= size_of::<IBLTEntry>() && (b.len() % size_of::<IBLTEntry>()) == 0 {
let buckets = b.len() / size_of::<IBLTEntry>();
Some(Self {
map: {
let mut tmp: Vec<IBLTEntry> = Vec::with_capacity(buckets);
unsafe {
tmp.set_len(buckets);
copy_nonoverlapping(b.as_ptr(), tmp.as_mut_ptr().cast::<u8>(), buckets * size_of::<IBLTEntry>());
}
tmp
}
})
} else {
None
}
}
#[inline(always)]
pub fn buckets(&self) -> usize { self.buckets }
/// Get this IBLT as a byte array that is ready to be sent over the wire.
pub fn as_bytes(&self) -> &[u8] {
unsafe { &*slice_from_raw_parts(self.map.as_ptr().cast(), size_of::<IBLTEntry>() * self.map.len()) }
}
#[inline(always)]
pub fn as_bytes(&self) -> &[u8] { unsafe { &*slice_from_raw_parts(self.map.cast::<u8>(), size_of::<IBLTEntry>() * self.buckets) } }
fn ins_rem(&mut self, key: &[u64; IDENTITY_HASH_SIZE / 8], delta: i64) {
let check_hash = u64::from_le(key[0]).wrapping_add(xorshift64(u64::from_le(key[1]))).to_le();
let buckets = self.map.len();
fn ins_rem(&mut self, key: &[u8; IDENTITY_HASH_SIZE], delta: i64) {
let key = key.as_ptr().cast::<u64>();
let (k0, k1, k2, k3, k4, k5) = (read_unaligned_u64(key.wrapping_add(0)), read_unaligned_u64(key.wrapping_add(1)), read_unaligned_u64(key.wrapping_add(2)), read_unaligned_u64(key.wrapping_add(3)), read_unaligned_u64(key.wrapping_add(4)), read_unaligned_u64(key.wrapping_add(5)));
let check_hash = u64::from_le(k0).wrapping_add(xorshift64(u64::from_le(k1))).to_le();
for mapping_sub_hash in 0..KEY_MAPPING_ITERATIONS {
let b = unsafe { self.map.get_unchecked_mut((u64::from_le(key[mapping_sub_hash]) as usize) % buckets) };
for j in 0..(IDENTITY_HASH_SIZE / 8) {
b.key_sum[j] ^= key[j];
}
let b = unsafe { &mut *self.map.wrapping_add((u64::from_le(read_unaligned_u64(key.wrapping_add(mapping_sub_hash))) as usize) % self.buckets) };
b.key_sum[0] ^= k0;
b.key_sum[1] ^= k1;
b.key_sum[2] ^= k2;
b.key_sum[3] ^= k3;
b.key_sum[4] ^= k4;
b.key_sum[5] ^= k5;
b.check_hash_sum ^= check_hash;
b.count = i64::from_le(b.count).wrapping_add(delta).to_le();
}
}
#[cfg(any(target_arch = "x86_64", target_arch = "x86", target_arch = "aarch64", target_arch = "powerpc64"))]
#[inline(always)]
pub fn insert(&mut self, key: &[u8; IDENTITY_HASH_SIZE]) {
self.ins_rem(unsafe { &*key.as_ptr().cast::<[u64; IDENTITY_HASH_SIZE / 8]>() }, 1);
}
pub fn insert(&mut self, key: &[u8; IDENTITY_HASH_SIZE]) { self.ins_rem(key, 1); }
#[cfg(not(any(target_arch = "x86_64", target_arch = "x86", target_arch = "aarch64", target_arch = "powerpc64")))]
#[inline(always)]
pub fn insert(&mut self, key: &[u8; IDENTITY_HASH_SIZE]) {
let mut tmp = [0_u64; IDENTITY_HASH_SIZE / 8];
unsafe { copy_nonoverlapping(key.as_ptr(), tmp.as_mut_ptr().cast(), IDENTITY_HASH_SIZE) };
self.ins_rem(&tmp, 1);
}
#[cfg(any(target_arch = "x86_64", target_arch = "x86", target_arch = "aarch64", target_arch = "powerpc64"))]
#[inline(always)]
pub fn remove(&mut self, key: &[u8; IDENTITY_HASH_SIZE]) {
self.ins_rem(unsafe { &*key.as_ptr().cast::<[u64; IDENTITY_HASH_SIZE / 8]>() }, -1);
}
#[cfg(not(any(target_arch = "x86_64", target_arch = "x86", target_arch = "aarch64", target_arch = "powerpc64")))]
#[inline(always)]
pub fn remove(&mut self, key: &[u8; IDENTITY_HASH_SIZE]) {
let mut tmp = [0_u64; IDENTITY_HASH_SIZE / 8];
unsafe { copy_nonoverlapping(key.as_ptr(), tmp.as_mut_ptr().cast(), IDENTITY_HASH_SIZE) };
self.ins_rem(&tmp, -1);
}
pub fn remove(&mut self, key: &[u8; IDENTITY_HASH_SIZE]) { self.ins_rem(key, -1); }
/// Subtract another IBLT from this one to compute set difference.
pub fn subtract(&mut self, other: &IBLT) {
if other.map.len() == self.map.len() {
for i in 0..self.map.len() {
let self_b = unsafe { self.map.get_unchecked_mut(i) };
let other_b = unsafe { other.map.get_unchecked(i) };
for j in 0..(IDENTITY_HASH_SIZE / 8) {
self_b.key_sum[j] ^= other_b.key_sum[j];
}
/// The other may be in the form of a raw byte array or an IBLT, which implements
/// AsRef<[u8]>. It must have the same number of buckets.
pub fn subtract<O: AsRef<[u8]>>(&mut self, other: &O) {
let other_slice = other.as_ref();
if (other_slice.len() / size_of::<IBLTEntry>()) == self.buckets {
let other_map: *const IBLTEntry = other_slice.as_ptr().cast();
for i in 0..self.buckets {
let self_b = unsafe { &mut *self.map.wrapping_add(i) };
let other_b = unsafe { &*other_map.wrapping_add(i) };
self_b.key_sum[0] ^= other_b.key_sum[0];
self_b.key_sum[1] ^= other_b.key_sum[1];
self_b.key_sum[2] ^= other_b.key_sum[2];
self_b.key_sum[3] ^= other_b.key_sum[3];
self_b.key_sum[4] ^= other_b.key_sum[4];
self_b.key_sum[5] ^= other_b.key_sum[5];
self_b.check_hash_sum ^= other_b.check_hash_sum;
self_b.count = i64::from_le(self_b.count).wrapping_sub(i64::from_le(other_b.count)).to_le();
}
}
}
/// Call a function for every value that can be extracted from this IBLT.
/// Extract every enumerable value from this IBLT.
///
/// The function is called with the key and a boolean. The boolean is meaningful
/// if this IBLT is the result of subtract(). In that case the boolean is true
/// if the "local" IBLT contained the item and false if the "remote" side contained
/// the item.
///
/// The starting_singular_bucket parameter must be the internal index of a
/// bucket with only one entry (1 or -1). It can be obtained from the return
/// values of either subtract() or singular_bucket().
pub fn list<F: FnMut(&[u8; IDENTITY_HASH_SIZE], bool) -> bool>(&mut self, mut f: F) {
let buckets = self.map.len();
let mut singular_buckets: Vec<u32> = Vec::with_capacity(buckets);
/// This consumes the IBLT instance since listing requires destructive modification
/// of the digest data.
pub fn list<F: FnMut(&[u8; IDENTITY_HASH_SIZE]) -> bool>(self, mut f: F) {
let buckets = self.buckets;
let mut singular_buckets: Vec<u32> = Vec::with_capacity(buckets + 2);
for i in 0..buckets {
if unsafe { self.map.get_unchecked(i) }.is_singular() {
singular_buckets.push(i as u32);
};
unsafe {
if (&*self.map.wrapping_add(i)).is_singular() {
singular_buckets.push(i as u32);
}
}
}
let mut key = [0_u64; IDENTITY_HASH_SIZE / 8];
while !singular_buckets.is_empty() {
let b = unsafe { self.map.get_unchecked_mut(singular_buckets.pop().unwrap() as usize) };
let mut bucket_ptr = 0;
while bucket_ptr < singular_buckets.len() {
let b = unsafe { &*self.map.wrapping_add(*singular_buckets.get_unchecked(bucket_ptr) as usize) };
bucket_ptr += 1;
if b.is_singular() {
for j in 0..(IDENTITY_HASH_SIZE / 8) {
key[j] = b.key_sum[j];
}
if f(unsafe { &*key.as_ptr().cast::<[u8; IDENTITY_HASH_SIZE]>() }, b.count == 1) {
key[0] = b.key_sum[0];
key[1] = b.key_sum[1];
key[2] = b.key_sum[2];
key[3] = b.key_sum[3];
key[4] = b.key_sum[4];
key[5] = b.key_sum[5];
if f(unsafe { &*key.as_ptr().cast::<[u8; IDENTITY_HASH_SIZE]>() }) {
let check_hash = u64::from_le(key[0]).wrapping_add(xorshift64(u64::from_le(key[1]))).to_le();
for mapping_sub_hash in 0..KEY_MAPPING_ITERATIONS {
let bi = (u64::from_le(unsafe { *key.get_unchecked(mapping_sub_hash) }) as usize) % buckets;
let b = unsafe { self.map.get_unchecked_mut(bi) };
for j in 0..(IDENTITY_HASH_SIZE / 8) {
b.key_sum[j] ^= key[j];
}
let b = unsafe { &mut *self.map.wrapping_add(bi) };
b.key_sum[0] ^= key[0];
b.key_sum[1] ^= key[1];
b.key_sum[2] ^= key[2];
b.key_sum[3] ^= key[3];
b.key_sum[4] ^= key[4];
b.key_sum[5] ^= key[5];
b.check_hash_sum ^= check_hash;
b.count = i64::from_le(b.count).wrapping_sub(1).to_le();
if b.is_singular() {
@ -193,35 +182,77 @@ impl IBLT {
}
}
impl AsRef<[u8]> for IBLT {
/// Get this IBLT in raw byte array form.
#[inline(always)]
fn as_ref(&self) -> &[u8] { self.as_bytes() }
}
#[cfg(test)]
mod tests {
use zerotier_core_crypto::hash::SHA384;
use crate::iblt::*;
#[test]
fn compiler_behavior() {
// A number of things above like unrolled key XORing must be changed if this size is changed.
assert_eq!(IDENTITY_HASH_SIZE, 48);
assert!(KEY_MAPPING_ITERATIONS <= (IDENTITY_HASH_SIZE / 8) && (IDENTITY_HASH_SIZE % 8) == 0);
// Make sure this packed struct is actually packed.
assert_eq!(size_of::<IBLTEntry>(), IDENTITY_HASH_SIZE + 8 + 8);
}
#[allow(unused_variables)]
#[test]
fn insert_and_list() {
assert_eq!(size_of::<IBLTEntry>(), IDENTITY_HASH_SIZE + 8 + 8);
assert!(KEY_MAPPING_ITERATIONS <= (IDENTITY_HASH_SIZE / 8) && (IDENTITY_HASH_SIZE % 8) == 0);
for expected_cnt in 0..800 {
let mut t = IBLT::new(1000);
for i in 0..expected_cnt {
let k = SHA384::hash(&((i + expected_cnt) as u32).to_le_bytes());
t.insert(&k);
for _ in 0..10 {
for expected_cnt in 0..768 {
let random_u64 = zerotier_core_crypto::random::xorshift64_random();
let mut t = IBLT::new(2048);
for i in 0..expected_cnt {
let k = SHA384::hash(&((i + random_u64) as u64).to_le_bytes());
t.insert(&k);
}
let mut cnt = 0;
t.list(|k| {
cnt += 1;
true
});
assert_eq!(cnt, expected_cnt);
}
let mut cnt = 0;
t.list(|k, d| {
cnt += 1;
//println!("{} {}", zerotier_core_crypto::hex::to_string(k), d);
true
});
//println!("retrieved {} keys", cnt);
assert_eq!(cnt, expected_cnt);
}
}
#[allow(unused_variables)]
#[test]
fn benchmark() {
fn set_reconciliation() {
for _ in 0..10 {
let random_u64 = zerotier_core_crypto::random::xorshift64_random();
let mut alice = IBLT::new(2048);
let mut bob = IBLT::new(2048);
let mut alice_total = 0_i32;
let mut bob_total = 0_i32;
for i in 0..1500 {
let k = SHA384::hash(&((i ^ random_u64) as u64).to_le_bytes());
if (k[0] & 1) == 1 {
alice.insert(&k);
alice_total += 1;
}
if (k[0] & 3) == 2 {
bob.insert(&k);
bob_total += 1;
}
}
alice.subtract(&bob);
let mut diff_total = 0_i32;
alice.list(|k| {
diff_total += 1;
true
});
// This is a probabilistic process so we tolerate a little bit of failure. The idea is that each
// pass reconciles more and more differences.
assert!(((alice_total + bob_total) - diff_total).abs() <= 128);
}
}
}

View file

@ -12,20 +12,8 @@ mod protocol;
mod varint;
mod memorystore;
mod iblt;
pub struct Config {
/// Number of P2P connections desired.
pub target_link_count: usize,
/// Maximum allowed size of an object.
pub max_object_size: usize,
/// TCP port to which this should bind.
pub tcp_port: u16,
/// A name for this replicated data set. This is just used to prevent linking to peers replicating different data.
pub domain: String,
}
mod config;
mod link;
pub(crate) fn ms_since_epoch() -> u64 {
std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_millis() as u64
@ -35,8 +23,19 @@ pub(crate) fn ms_monotonic() -> u64 {
std::time::Instant::now().elapsed().as_millis() as u64
}
#[inline(always)]
pub(crate) async fn io_timeout<T, F: smol::future::Future<Output = smol::io::Result<T>>>(d: std::time::Duration, f: F) -> smol::io::Result<T> {
smol::future::or(f, async {
let _ = smol::Timer::after(d).await;
Err(smol::io::Error::new(smol::io::ErrorKind::TimedOut, "I/O timeout"))
}).await
}
/// SHA384 is the hash currently used. Others could be supported in the future.
/// If this size changes check iblt.rs for a few things that must be changed. This
/// is checked in "cargo test."
pub const IDENTITY_HASH_SIZE: usize = 48;
pub use config::Config;
pub use store::{Store, StorePutResult};
pub use replicator::Replicator;
//pub use replicator::Replicator;

251
allthethings/src/link.rs Normal file
View file

@ -0,0 +1,251 @@
use std::mem::MaybeUninit;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::Duration;
use serde::{Deserialize, Serialize};
use smol::io::{AsyncReadExt, AsyncWriteExt, BufReader, BufWriter};
use smol::lock::Mutex;
use smol::net::{SocketAddr, TcpStream};
use zerotier_core_crypto::gmac::SequentialNonceGMAC;
use zerotier_core_crypto::hash::SHA384;
use zerotier_core_crypto::kbkdf::zt_kbkdf_hmac_sha384;
use zerotier_core_crypto::p521::{P521KeyPair, P521PublicKey};
use zerotier_core_crypto::secret::Secret;
use crate::{Config, io_timeout, ms_monotonic, ms_since_epoch, varint};
use crate::protocol::*;
#[inline(always)]
fn decode_msgpack<'de, T: Deserialize<'de>>(data: &'de [u8]) -> smol::io::Result<T> {
rmp_serde::from_read_ref(data).map_err(|_| smol::io::Error::new(smol::io::ErrorKind::InvalidData, "invalid msgpack data"))
}
/// Write half of a link: the buffered TCP stream plus the GMAC state used to authenticate
/// outgoing messages. Both live behind one mutex so a message and its MAC are written together.
struct OutputStream {
// Buffered writer over the link's TCP stream.
stream: BufWriter<TcpStream>,
// Outgoing message authenticator. None until key agreement completes during the HELLO exchange.
gmac: Option<SequentialNonceGMAC>,
}
/// A TCP link between this node and another.
///
/// A link is driven by io_main(), which performs the HELLO / HelloAck handshake and then
/// reads authenticated messages until an error occurs.
pub(crate) struct Link<'a, 'b> {
// This node's long-lived P-521 key pair, used for key agreement with the remote node.
node_secret: &'a P521KeyPair,
// Shared configuration (domain, I/O timeout, message size limit).
config: &'b Config,
// SHA384 of the remote node's public key; None until its HelloAck has been verified.
remote_node_id: parking_lot::Mutex<Option<[u8; 48]>>,
// Read half of the stream; locked by io_main() for the lifetime of its I/O loop.
reader: Mutex<BufReader<TcpStream>>,
// Write half of the stream and the outgoing GMAC state.
writer: Mutex<OutputStream>,
// Address of the remote peer as supplied to new().
pub remote_addr: SocketAddr,
// Connection time as supplied to new(). NOTE(review): units/clock are not visible here -- confirm with caller.
pub connect_time: u64,
// Set to true once the remote node's HelloAck has been verified.
pub authenticated: AtomicBool,
// Idle time in milliseconds after which a keepalive is sent (half of the configured I/O timeout).
keepalive_period: u64,
// ms_monotonic() value recorded at construction, after the HELLO exchange and on each keepalive.
last_send_time: AtomicU64,
// Largest accepted message size: the configured maximum, but never less than HELLO_SIZE_MAX.
max_message_size: usize,
}
impl<'a, 'b> Link<'a, 'b> {
/// Create a link over an already-connected TCP stream.
///
/// No I/O happens here; the handshake is performed by `io_main()`. The effective message size
/// limit is the configured maximum but never less than `HELLO_SIZE_MAX`, and both stream
/// buffers are sized to hold a full message plus its 16-byte MAC.
pub fn new(stream: TcpStream, remote_addr: SocketAddr, connect_time: u64, node_secret: &'a P521KeyPair, config: &'b Config) -> Self {
    // Best effort: failing to set the socket option is not fatal.
    let _ = stream.set_nodelay(false);
    let max_message_size = HELLO_SIZE_MAX.max(config.max_message_size);
    Self {
        node_secret,
        config,
        remote_node_id: parking_lot::Mutex::new(None),
        reader: Mutex::new(BufReader::with_capacity((max_message_size + 16).max(16384), stream.clone())),
        writer: Mutex::new(OutputStream {
            stream: BufWriter::with_capacity(max_message_size + 16, stream),
            gmac: None,
        }),
        remote_addr,
        connect_time,
        authenticated: AtomicBool::new(false),
        // Half the I/O timeout, in milliseconds. Saturate instead of overflowing (a panic in
        // debug builds, a wrapped bogus period in release) on an absurdly large configured value.
        keepalive_period: config.io_timeout.saturating_mul(1000) / 2,
        last_send_time: AtomicU64::new(ms_monotonic()),
        max_message_size,
    }
}
/// Return the remote node's ID: SHA384 over its long-term public key.
/// This is None until the remote node's HelloAck has arrived and been verified.
pub fn remote_node_id(&self) -> Option<[u8; 48]> {
    let current = self.remote_node_id.lock();
    *current
}
/// Send message and increment outgoing GMAC nonce.
async fn write_message(&self, timeout: Duration, message_type: u8, message: &[u8]) -> smol::io::Result<()> {
let mut mac: [u8; 16] = unsafe { MaybeUninit::uninit().assume_init() };
let mt = [message_type];
let mut writer = self.writer.lock().await;
writer.gmac.as_mut().map_or_else(|| {
Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "link negotiation is not complete"))
}, |gmac| {
gmac.init_for_next_message();
gmac.update(&mt);
gmac.update(message);
gmac.finish(&mut mac);
Ok(())
})?;
io_timeout(timeout, writer.stream.write_all(&mt)).await?;
io_timeout(timeout, varint::async_write(&mut writer.stream, message.len() as u64)).await?;
io_timeout(timeout, writer.stream.write_all(message)).await?;
io_timeout(timeout, writer.stream.write_all(&mac)).await?;
io_timeout(timeout, writer.stream.flush()).await
}
/// Serialize object with msgpack and send, increment outgoing GMAC nonce.
async fn write_message_msgpack<T: Serialize>(&self, timeout: Duration, serialize_buf: &mut Vec<u8>, message_type: u8, message: &T) -> smol::io::Result<()> {
serialize_buf.clear();
rmp_serde::encode::write(serialize_buf, message).map_err(|_| smol::io::Error::new(smol::io::ErrorKind::InvalidData, "msgpack encode failure"))?;
self.write_message(timeout, message_type, serialize_buf.as_slice()).await
}
/// Send a keepalive if necessary.
///
/// A keepalive is the single byte `MESSAGE_TYPE_KEEPALIVE`, sent with no size and no MAC. It is
/// sent only on authenticated links that have been idle for at least `keepalive_period`
/// milliseconds according to `now_monotonic`.
///
/// Keepalives are best-effort, so failures are deliberately not reported here.
pub async fn send_keepalive_if_needed(&self, now_monotonic: u64) {
    let idle_ms = now_monotonic.saturating_sub(self.last_send_time.load(Ordering::Relaxed));
    if idle_ms >= self.keepalive_period && self.authenticated.load(Ordering::Relaxed) {
        self.last_send_time.store(now_monotonic, Ordering::Relaxed);
        let timeout = Duration::from_secs(1);
        let mut writer = self.writer.lock().await;
        // Only flush if the byte was actually queued; errors are explicitly discarded.
        if io_timeout(timeout, writer.stream.write_all(&[MESSAGE_TYPE_KEEPALIVE])).await.is_ok() {
            let _ = io_timeout(timeout, writer.stream.flush()).await;
        }
    }
}
/// Launched as an async task for each new link.
pub async fn io_main(&self) -> smol::io::Result<()> {
// Reader is held here for the duration of the link's I/O loop.
let mut reader_mg = self.reader.lock().await;
let reader = &mut *reader_mg;
let mut read_buf: Vec<u8> = Vec::new();
read_buf.resize(self.max_message_size, 0);
let mut serialize_buf: Vec<u8> = Vec::with_capacity(4096);
let timeout = Duration::from_secs(self.config.io_timeout);
// (1) Send Hello and save the nonce and the hash of the raw Hello message for later HelloAck HMAC check.
let mut gmac_send_nonce_initial = [0_u8; 16];
zerotier_core_crypto::random::fill_bytes_secure(&mut gmac_send_nonce_initial);
let ephemeral_secret = P521KeyPair::generate(true).unwrap();
let sent_hello_hash = {
serialize_buf.clear();
let _ = rmp_serde::encode::write(&mut serialize_buf, &Hello {
protocol_version: PROTOCOL_VERSION,
flags: 0,
clock: ms_since_epoch(),
domain: self.config.domain.as_str(),
nonce: &gmac_send_nonce_initial,
p521_ecdh_ephemeral_key: ephemeral_secret.public_key_bytes(),
p521_ecdh_node_key: self.node_secret.public_key_bytes(),
}).unwrap();
let mut writer = self.writer.lock().await;
io_timeout(timeout, varint::async_write(&mut writer.stream, serialize_buf.len() as u64)).await?;
io_timeout(timeout, writer.stream.write_all(serialize_buf.as_slice())).await?;
io_timeout(timeout, writer.stream.flush()).await?;
drop(writer);
SHA384::hash(serialize_buf.as_slice())
};
// (2) Read other side's HELLO and send ACK. Also do key agreement, initialize GMAC, etc.
let message_size = io_timeout(timeout, varint::async_read(reader)).await? as usize;
if message_size > HELLO_SIZE_MAX {
return Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "message too large"));
}
let (mut gmac_receive, ack_key, remote_node_id) = {
let hello_buf = &mut read_buf.as_mut_slice()[0..message_size];
io_timeout(timeout, reader.read_exact(hello_buf)).await?;
let received_hello_hash = SHA384::hash(hello_buf); // for ACK generation
let hello: Hello = decode_msgpack(hello_buf)?;
if hello.nonce.len() < 16 || hello.protocol_version != PROTOCOL_VERSION {
return Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "invalid HELLO parameters"));
}
let remote_node_public_key = P521PublicKey::from_bytes(hello.p521_ecdh_node_key);
let remote_ephemeral_public_key = P521PublicKey::from_bytes(hello.p521_ecdh_ephemeral_key);
if remote_node_public_key.is_none() || remote_ephemeral_public_key.is_none() {
return Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "invalid public key in HELLO"));
}
let node_shared_key = self.node_secret.agree(&remote_node_public_key.unwrap());
let ephemeral_shared_key = ephemeral_secret.agree(&remote_ephemeral_public_key.unwrap());
if node_shared_key.is_none() || ephemeral_shared_key.is_none() {
return Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "key agreement failed"));
}
let shared_key = Secret(SHA384::hmac(&SHA384::hash(ephemeral_shared_key.unwrap().as_bytes()), node_shared_key.unwrap().as_bytes()));
let gmac_key = zt_kbkdf_hmac_sha384(shared_key.as_bytes(), KBKDF_LABEL_GMAC, 0, 0);
let ack_key = zt_kbkdf_hmac_sha384(shared_key.as_bytes(), KBKDF_LABEL_HELLO_ACK_HMAC, 0, 0);
let gmac_receive = SequentialNonceGMAC::new(&gmac_key.0[0..32], &hello.nonce[0..16]);
self.writer.lock().await.gmac.replace(SequentialNonceGMAC::new(&gmac_key.0[0..32], &gmac_send_nonce_initial));
let ack_hmac = SHA384::hmac(ack_key.as_bytes(), &received_hello_hash);
self.write_message_msgpack(timeout, &mut serialize_buf, MESSAGE_TYPE_HELLO_ACK, &HelloAck {
ack: &ack_hmac,
clock_echo: hello.clock
}).await?;
(gmac_receive, ack_key, SHA384::hash(hello.p521_ecdh_node_key))
};
self.last_send_time.store(ms_monotonic(), Ordering::Relaxed);
// Done with ephemeral secret key, so forget it.
drop(ephemeral_secret);
// (3) Start primary I/O loop and initially listen for HelloAck to confirm the other side's node identity.
let mut received_mac_buf = [0_u8; 16];
let mut expected_mac_buf = [0_u8; 16];
let mut message_type_buf = [0_u8; 1];
let mut authenticated = false;
loop {
io_timeout(timeout, reader.read_exact(&mut message_type_buf)).await?;
// NOP is a single byte keepalive, so skip. Otherwise handle actual messages.
if message_type_buf[0] != MESSAGE_TYPE_KEEPALIVE {
let message_size = io_timeout(timeout, varint::async_read(reader)).await? as usize;
if message_size > self.max_message_size {
return Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "message too large"));
}
let message_buf = &mut read_buf.as_mut_slice()[0..message_size];
io_timeout(timeout, reader.read_exact(message_buf)).await?;
io_timeout(timeout, reader.read_exact(&mut received_mac_buf)).await?;
gmac_receive.init_for_next_message();
gmac_receive.update(&message_type_buf);
gmac_receive.update(message_buf);
gmac_receive.finish(&mut expected_mac_buf);
if !received_mac_buf.eq(&expected_mac_buf) {
return Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "message authentication failed"));
}
if authenticated {
match message_type_buf[0] {
MESSAGE_TYPE_HELLO_ACK => {
// Multiple HelloAck messages don't make sense.
},
MESSAGE_TYPE_OBJECTS => {},
MESSAGE_TYPE_HAVE_OBJECTS => {},
MESSAGE_TYPE_WANT_OBJECTS => {},
MESSAGE_TYPE_IBLT_SYNC_REQUEST => {},
MESSAGE_TYPE_IBLT_SYNC_DIGEST => {},
_ => {},
}
} else {
if message_type_buf[0] == MESSAGE_TYPE_HELLO_ACK {
let hello_ack: HelloAck = decode_msgpack(message_buf)?;
let expected_ack_hmac = SHA384::hmac(ack_key.as_bytes(), &sent_hello_hash);
if !hello_ack.ack.eq(&expected_ack_hmac) {
return Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "session authentication failed"));
}
authenticated = true;
let _ = self.remote_node_id.lock().replace(remote_node_id.clone());
self.authenticated.store(true, Ordering::Relaxed);
} else {
return Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "message received before session authenticated"));
}
}
}
}
}
}

View file

@ -1,79 +1,98 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*
* (c)2021 ZeroTier, Inc.
* https://www.zerotier.com/
*/
use std::collections::Bound::Included;
use std::collections::BTreeMap;
use std::io::Write;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Mutex;
use std::sync::Arc;
use smol::net::SocketAddr;
use zerotier_core_crypto::random::xorshift64_random;
use parking_lot::Mutex;
use crate::{IDENTITY_HASH_SIZE, ms_since_epoch, Store, StorePutResult};
/// A Store that stores all objects in memory, mostly for testing.
pub struct MemoryStore(Mutex<BTreeMap<[u8; IDENTITY_HASH_SIZE], Vec<u8>>>, Mutex<Vec<SocketAddr>>, AtomicU64);
pub struct MemoryStore(Mutex<BTreeMap<[u8; IDENTITY_HASH_SIZE], (Arc<[u8]>, u64)>>);
impl MemoryStore {
pub fn new() -> Self { Self(Mutex::new(BTreeMap::new()), Mutex::new(Vec::new()), AtomicU64::new(u64::MAX)) }
pub fn new() -> Self { Self(Mutex::new(BTreeMap::new())) }
}
impl Store for MemoryStore {
fn get(&self, _reference_time: u64, identity_hash: &[u8; IDENTITY_HASH_SIZE], buffer: &mut Vec<u8>) -> bool {
buffer.clear();
self.0.lock().unwrap().get(identity_hash).map_or(false, |value| {
let _ = buffer.write_all(value.as_slice());
true
type Object = Arc<[u8]>;
#[inline(always)]
fn local_time(&self) -> u64 {
ms_since_epoch()
}
fn get(&self, reference_time: u64, identity_hash: &[u8; IDENTITY_HASH_SIZE]) -> Option<Self::Object> {
self.0.lock().get(identity_hash).and_then(|o| {
if (*o).1 <= reference_time {
Some((*o).0.clone())
} else {
None
}
})
}
fn put(&self, _reference_time: u64, identity_hash: &[u8; IDENTITY_HASH_SIZE], object: &[u8]) -> StorePutResult {
fn put(&self, reference_time: u64, identity_hash: &[u8; IDENTITY_HASH_SIZE], object: &[u8]) -> StorePutResult {
let mut result = StorePutResult::Duplicate;
let _ = self.0.lock().unwrap().entry(identity_hash.clone()).or_insert_with(|| {
self.2.store(ms_since_epoch(), Ordering::Relaxed);
let _ = self.0.lock().entry(identity_hash.clone()).or_insert_with(|| {
result = StorePutResult::Ok;
object.to_vec()
(object.to_vec().into(), reference_time)
});
result
}
fn have(&self, _reference_time: u64, identity_hash: &[u8; IDENTITY_HASH_SIZE]) -> bool {
self.0.lock().unwrap().contains_key(identity_hash)
fn have(&self, reference_time: u64, identity_hash: &[u8; IDENTITY_HASH_SIZE]) -> bool {
self.0.lock().get(identity_hash).map_or(false, |o| (*o).1 <= reference_time)
}
fn total_count(&self, _reference_time: u64) -> Option<u64> {
Some(self.0.lock().unwrap().len() as u64)
fn total_count(&self, reference_time: u64) -> Option<u64> {
let mut tc = 0_u64;
for e in self.0.lock().iter() {
tc += ((*e.1).1 <= reference_time) as u64;
}
Some(tc)
}
fn last_object_receive_time(&self) -> Option<u64> {
let rt = self.2.load(Ordering::Relaxed);
if rt == u64::MAX {
None
} else {
Some(rt)
fn for_each<F: FnMut(&[u8], &Arc<[u8]>) -> bool>(&self, reference_time: u64, start: &[u8; IDENTITY_HASH_SIZE], end: &[u8; IDENTITY_HASH_SIZE], mut f: F) {
let mut tmp: Vec<([u8; IDENTITY_HASH_SIZE], Arc<[u8]>)> = Vec::with_capacity(1024);
for e in self.0.lock().range((Included(*start), Included(*end))).into_iter() {
if (*e.1).1 <= reference_time {
tmp.push((e.0.clone(), (*e.1).0.clone()));
}
}
for e in tmp.iter() {
if !f(&(*e).0, &(*e).1) {
break;
}
}
}
fn count(&self, _reference_time: u64, start: &[u8; IDENTITY_HASH_SIZE], end: &[u8; IDENTITY_HASH_SIZE]) -> Option<u64> {
if start.le(end) {
Some(self.0.lock().unwrap().range((Included(*start), Included(*end))).count() as u64)
} else {
None
fn for_each_identity_hash<F: FnMut(&[u8]) -> bool>(&self, reference_time: u64, start: &[u8; IDENTITY_HASH_SIZE], end: &[u8; IDENTITY_HASH_SIZE], mut f: F) {
let mut tmp: Vec<[u8; IDENTITY_HASH_SIZE]> = Vec::with_capacity(1024);
for e in self.0.lock().range((Included(*start), Included(*end))).into_iter() {
if (*e.1).1 <= reference_time {
tmp.push(e.0.clone());
}
}
for e in tmp.iter() {
if !f(e) {
break;
}
}
}
fn save_remote_endpoint(&self, to_address: &SocketAddr) {
let mut sv = self.1.lock().unwrap();
if !sv.contains(to_address) {
sv.push(to_address.clone());
}
}
fn get_remote_endpoint(&self) -> Option<SocketAddr> {
let sv = self.1.lock().unwrap();
if sv.is_empty() {
None
} else {
sv.get((xorshift64_random() as usize) % sv.len()).cloned()
fn count(&self, reference_time: u64, start: &[u8; IDENTITY_HASH_SIZE], end: &[u8; IDENTITY_HASH_SIZE]) -> Option<u64> {
let mut tc = 0_u64;
for e in self.0.lock().range((Included(*start), Included(*end))).into_iter() {
tc += ((*e.1).1 <= reference_time) as u64;
}
Some(tc)
}
}

View file

@ -6,38 +6,138 @@
* https://www.zerotier.com/
*/
pub const PROTOCOL_VERSION: u8 = 1;
pub const HASH_ALGORITHM_SHA384: u8 = 1;
/*
* Wire protocol notes:
*
* Messages are prefixed by a type byte followed by a message size in the form
* of a variable length integer (varint). Each message is followed by a
* 16-byte GMAC message authentication code.
*
* HELLO is an exception. It's sent on connect, is prefixed only by a varint
* size, and is not followed by a MAC. Instead HelloAck is sent after it
* containing a full HMAC ACK for key negotiation.
*
* GMAC is keyed using a KBKDF-derived key from a shared key currently made
* with HKDF as HMAC(SHA384(ephemeral key), node key). The first 32 bytes of
* the key are the GMAC key while the nonce is the first 96 bits of
* the nonce where this 96-bit integer is incremented (as little-endian)
* for each message sent. Increment should wrap at 2^96. The connection should
* close after no more than 2^96 messages, but that's a crazy long session
* anyway.
*
* The wire protocol is only authenticated to prevent network level attacks.
* Data is not encrypted since this is typically used to replicate a public
* "well known" data set and encryption would add needless overhead.
*/
pub const MESSAGE_TYPE_NOP: u8 = 0;
pub const MESSAGE_TYPE_HAVE_NEW_OBJECT: u8 = 1;
pub const MESSAGE_TYPE_OBJECT: u8 = 2;
pub const MESSAGE_TYPE_GET_OBJECTS: u8 = 3;
use serde::{Deserialize, Serialize};
/// HELLO message, which is all u8's and is packed and so can be parsed directly in place.
/// This message is sent at the start of any connection by both sides.
#[repr(packed)]
pub struct Hello {
pub hello_size: u8, // technically a varint but below 0x80
pub protocol_version: u8,
pub hash_algorithm: u8,
pub flags: [u8; 4], // u32, little endian
pub clock: [u8; 8], // u64, little endian
pub last_object_receive_time: [u8; 8], // u64, little endian, u64::MAX if unspecified
pub domain_hash: [u8; 48],
pub instance_id: [u8; 16],
pub loopback_check_code_salt: [u8; 16],
pub loopback_check_code: [u8; 16],
/// KBKDF label for the HMAC in HelloAck (see the `ack` field of HelloAck).
pub const KBKDF_LABEL_HELLO_ACK_HMAC: u8 = b'A';
/// KBKDF label for GMAC key derived from main key.
pub const KBKDF_LABEL_GMAC: u8 = b'G';
/// Sanity limit on the size of HELLO.
pub const HELLO_SIZE_MAX: usize = 4096;
/// Overall protocol version, carried in the `protocol_version` field of Hello.
pub const PROTOCOL_VERSION: u16 = 1;
// Message type bytes. Each MESSAGE_TYPE_* value below is a distinct u8.
/// No operation, no payload, sent without size or MAC.
/// This is a special single byte message used for connection keepalive.
pub const MESSAGE_TYPE_KEEPALIVE: u8 = 0;
/// Acknowledgement of HELLO.
pub const MESSAGE_TYPE_HELLO_ACK: u8 = 1;
/// A series of objects with each prefixed by a varint size.
pub const MESSAGE_TYPE_OBJECTS: u8 = 2;
/// A series of identity hashes concatenated together advertising objects we have.
pub const MESSAGE_TYPE_HAVE_OBJECTS: u8 = 3;
/// A series of identity hashes concatenated together of objects being requested.
pub const MESSAGE_TYPE_WANT_OBJECTS: u8 = 4;
/// Request IBLT synchronization, payload is IBLTSyncRequest.
pub const MESSAGE_TYPE_IBLT_SYNC_REQUEST: u8 = 5;
/// IBLT sync digest, payload is IBLTSyncDigest.
pub const MESSAGE_TYPE_IBLT_SYNC_DIGEST: u8 = 6;
/// Initial message sent by both sides on TCP connection establishment.
/// This is sent with no type or message authentication code and is only
/// sent on connect. It is prefixed by a varint size.
///
/// The string and byte-slice fields are borrowed for the lifetime `'a`
/// rather than owned by this struct.
#[derive(Deserialize, Serialize)]
pub struct Hello<'a> {
/// Local value of PROTOCOL_VERSION.
pub protocol_version: u16,
/// Flags, currently unused and always zero.
pub flags: u64,
/// Local clock in milliseconds since Unix epoch.
/// HelloAck carries this value back in its `clock_echo` field.
pub clock: u64,
/// The data set name ("domain") to which this node belongs.
pub domain: &'a str,
/// Random nonce, must be at least 16 bytes in length.
pub nonce: &'a [u8],
/// Random ephemeral ECDH session key.
pub p521_ecdh_ephemeral_key: &'a [u8],
/// Long-lived node-identifying ECDH public key.
pub p521_ecdh_node_key: &'a [u8],
}
#[cfg(test)]
mod tests {
use std::mem::size_of;
use crate::protocol::*;
#[test]
fn check_sizing() {
// Make sure packed structures are really packed.
assert_eq!(size_of::<Hello>(), 1 + 1 + 1 + 4 + 8 + 8 + 48 + 16 + 16 + 16);
}
/// Sent in response to Hello.
///
/// The `ack` bytes are borrowed for the lifetime `'a` rather than owned.
#[derive(Deserialize, Serialize)]
pub struct HelloAck<'a> {
/// HMAC-SHA384(KBKDF(key, KBKDF_LABEL_HELLO_ACK_HMAC), SHA384(original raw Hello))
pub ack: &'a [u8],
/// Value of clock in original hello, for measuring latency.
pub clock_echo: u64,
}
/// Request an IBLT set digest to assist with synchronization.
///
/// The peer may respond in one of four ways:
///
/// (1) It may send an IBLTSyncDigest over a range of identity hashes of its
/// choice so that the requesting node may compute a difference and request
/// objects it does not have.
///
/// (2) It may send HAVE_OBJECTS with a simple list of objects.
///
/// (3) It may simply send a batch of objects.
///
/// (4) It may not respond at all.
///
/// Which option is chosen is up to the responding node and should be chosen
/// via a heuristic to maximize sync efficiency and minimize sync time.
///
/// A central assumption is that identity hashes are uniformly distributed
/// since they are cryptographic hashes (currently SHA-384). This allows a
/// simple calculation to be made with the sending node's total count to
/// estimate set difference density across the entire hash range.
#[derive(Deserialize, Serialize)]
pub struct IBLTSyncRequest {
/// Total number of hashes in entire data set (on our side).
pub total_count: u64,
/// Our clock to use as a reference time for filtering the data set (if applicable).
pub reference_time: u64,
}
/// An IBLT digest of identity hashes over a range.
///
/// The range bounds and the digest bytes are borrowed for the lifetime `'a`
/// rather than owned by this struct.
#[derive(Deserialize, Serialize)]
pub struct IBLTSyncDigest<'a> {
/// Start of range. Right-pad with zeroes if too short.
pub range_start: &'a [u8],
/// End of range. Right-pad with zeroes if too short.
pub range_end: &'a [u8],
/// IBLT digest of hashes in this range.
pub iblt: &'a [u8],
/// Number of hashes in this range.
pub count: u64,
/// Total number of hashes in entire data set.
pub total_count: u64,
/// Reference time from IBLTSyncRequest or 0 if this is being sent synthetically.
pub reference_time: u64,
}

View file

@ -6,6 +6,9 @@
* https://www.zerotier.com/
*/
/*
use std::collections::HashMap;
use std::convert::TryInto;
use std::error::Error;
@ -18,7 +21,7 @@ use std::time::Duration;
use smol::{Executor, Task, Timer};
use smol::io::{AsyncReadExt, AsyncWriteExt, BufReader};
use smol::lock::Mutex;
use smol::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, TcpListener, TcpStream};
use smol::net::*;
use smol::stream::StreamExt;
use zerotier_core_crypto::hash::SHA384;
@ -51,36 +54,47 @@ struct Connection {
task: Task<()>,
}
struct ReplicatorImpl<'ex> {
struct ReplicatorImpl<'ex, S: 'static + Store> {
executor: Arc<Executor<'ex>>,
instance_id: [u8; 16],
loopback_check_code_secret: [u8; 48],
domain_hash: [u8; 48],
store: Arc<dyn Store>,
store: Arc<S>,
config: Config,
connections: Mutex<HashMap<ConnectionKey, Connection>>,
connections_in_progress: Mutex<HashMap<SocketAddr, Task<()>>>,
announced_objects_requested: Mutex<HashMap<[u8; IDENTITY_HASH_SIZE], u64>>,
}
pub struct Replicator<'ex> {
pub struct Replicator<'ex, S: 'static + Store> {
v4_listener_task: Option<Task<()>>,
v6_listener_task: Option<Task<()>>,
background_cleanup_task: Task<()>,
_impl: Arc<ReplicatorImpl<'ex>>
_impl: Arc<ReplicatorImpl<'ex, S>>,
}
impl<'ex> Replicator<'ex> {
impl<'ex, S: 'static + Store> Replicator<'ex, S> {
/// Create a new replicator to replicate the contents of the provided store.
/// All async tasks, sockets, and connections will be dropped if the replicator is dropped.
pub async fn start(executor: &Arc<Executor<'ex>>, store: Arc<dyn Store>, config: Config) -> Result<Replicator<'ex>, Box<dyn Error>> {
let listener_v4 = TcpListener::bind(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, config.tcp_port)).await;
let listener_v6 = TcpListener::bind(SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, config.tcp_port, 0, 0)).await;
pub async fn start(executor: &Arc<Executor<'ex>>, store: Arc<S>, config: Config) -> Result<Replicator<'ex, S>, Box<dyn Error>> {
let listener_v4 = socket2::Socket::new(socket2::Domain::IPV4, socket2::Type::STREAM, Some(socket2::Protocol::TCP)).and_then(|v4| {
let _ = v4.set_reuse_address(true);
let _ = v4.bind(&socket2::SockAddr::from(std::net::SocketAddrV4::new(std::net::Ipv4Addr::UNSPECIFIED, config.tcp_port)))?;
let _ = v4.listen(64);
Ok(v4)
});
let listener_v6 = socket2::Socket::new(socket2::Domain::IPV6, socket2::Type::STREAM, Some(socket2::Protocol::TCP)).and_then(|v6| {
let _ = v6.set_only_v6(true);
let _ = v6.set_reuse_address(true);
let _ = v6.bind(&socket2::SockAddr::from(std::net::SocketAddrV6::new(std::net::Ipv6Addr::UNSPECIFIED, config.tcp_port, 0, 0)))?;
let _ = v6.listen(64);
Ok(v6)
});
if listener_v4.is_err() && listener_v6.is_err() {
return Err(Box::new(listener_v4.unwrap_err()));
}
let r = Arc::new(ReplicatorImpl::<'ex> {
let r = Arc::new(ReplicatorImpl::<'ex, S> {
executor: executor.clone(),
instance_id: {
let mut tmp = [0_u8; 16];
@ -101,20 +115,24 @@ impl<'ex> Replicator<'ex> {
});
Ok(Self {
v4_listener_task: listener_v4.map_or(None, |listener_v4| Some(executor.spawn(r.clone().tcp_listener_task(listener_v4)))),
v6_listener_task: listener_v6.map_or(None, |listener_v6| Some(executor.spawn(r.clone().tcp_listener_task(listener_v6)))),
v4_listener_task: listener_v4.map_or(None, |listener_v4| {
Some(executor.spawn(r.clone().tcp_listener_task(smol::net::TcpListener::try_from(std::net::TcpListener::from(listener_v4)).unwrap())))
}),
v6_listener_task: listener_v6.map_or(None, |listener_v6| {
Some(executor.spawn(r.clone().tcp_listener_task(smol::net::TcpListener::try_from(std::net::TcpListener::from(listener_v6)).unwrap())))
}),
background_cleanup_task: executor.spawn(r.clone().background_cleanup_task()),
_impl: r
_impl: r,
})
}
}
unsafe impl<'ex> Send for Replicator<'ex> {}
unsafe impl<'ex, S: 'static + Store> Send for Replicator<'ex, S> {}
unsafe impl<'ex> Sync for Replicator<'ex> {}
unsafe impl<'ex, S: 'static + Store> Sync for Replicator<'ex, S> {}
impl<'ex> ReplicatorImpl<'ex> {
async fn background_cleanup_task(self: Arc<ReplicatorImpl<'ex>>) {
impl<'ex, S: 'static + Store> ReplicatorImpl<'ex, S> {
async fn background_cleanup_task(self: Arc<ReplicatorImpl<'ex, S>>) {
let mut timer = smol::Timer::interval(Duration::from_secs(CONNECTION_TIMEOUT_SECONDS / 10));
loop {
timer.next().await;
@ -152,7 +170,7 @@ impl<'ex> ReplicatorImpl<'ex> {
}
}
async fn tcp_listener_task(self: Arc<ReplicatorImpl<'ex>>, listener: TcpListener) {
async fn tcp_listener_task(self: Arc<ReplicatorImpl<'ex, S>>, listener: TcpListener) {
loop {
let stream = listener.accept().await;
if stream.is_ok() {
@ -166,7 +184,7 @@ impl<'ex> ReplicatorImpl<'ex> {
}
}
async fn handle_new_connection(self: Arc<ReplicatorImpl<'ex>>, mut stream: TcpStream, remote_address: SocketAddr, outgoing: bool) {
async fn handle_new_connection(self: Arc<ReplicatorImpl<'ex, S>>, mut stream: TcpStream, remote_address: SocketAddr, outgoing: bool) {
let _ = stream.set_nodelay(true);
let mut loopback_check_code_salt = [0_u8; 16];
@ -175,9 +193,8 @@ impl<'ex> ReplicatorImpl<'ex> {
hello_size: size_of::<protocol::Hello>() as u8,
protocol_version: protocol::PROTOCOL_VERSION,
hash_algorithm: protocol::HASH_ALGORITHM_SHA384,
flags: [0_u8; 4],
flags: if outgoing { protocol::HELLO_FLAG_OUTGOING } else { 0 },
clock: ms_since_epoch().to_le_bytes(),
last_object_receive_time: self.store.last_object_receive_time().unwrap_or(u64::MAX).to_le_bytes(),
domain_hash: self.domain_hash.clone(),
instance_id: self.instance_id.clone(),
loopback_check_code_salt,
@ -201,11 +218,6 @@ impl<'ex> ReplicatorImpl<'ex> {
let s2 = self.clone();
let _ = connections.entry(k).or_insert_with(move || {
let _ = stream.set_nodelay(false);
if outgoing {
s2.store.save_remote_endpoint(&remote_address);
}
let last_receive = Arc::new(AtomicU64::new(ms_monotonic()));
Connection {
remote_address,
@ -221,7 +233,7 @@ impl<'ex> ReplicatorImpl<'ex> {
let _task = self.connections_in_progress.lock().await.remove(&remote_address);
}
async fn connection_io_task(self: Arc<ReplicatorImpl<'ex>>, stream: TcpStream, remote_instance_id: [u8; 16], last_receive: Arc<AtomicU64>) {
async fn connection_io_task(self: Arc<ReplicatorImpl<'ex, S>>, stream: TcpStream, remote_instance_id: [u8; 16], last_receive: Arc<AtomicU64>) {
let mut reader = BufReader::with_capacity(65536, stream.clone());
let writer = Arc::new(Mutex::new(stream));
@ -354,6 +366,7 @@ impl<'ex> ReplicatorImpl<'ex> {
}
}
unsafe impl<'ex> Send for ReplicatorImpl<'ex> {}
unsafe impl<'ex, S: 'static + Store> Send for ReplicatorImpl<'ex, S> {}
unsafe impl<'ex> Sync for ReplicatorImpl<'ex> {}
unsafe impl<'ex, S: 'static + Store> Sync for ReplicatorImpl<'ex, S> {}
*/

View file

@ -6,13 +6,9 @@
* https://www.zerotier.com/
*/
use smol::net::SocketAddr;
use crate::IDENTITY_HASH_SIZE;
pub const MIN_IDENTITY_HASH: [u8; 48] = [0_u8; 48];
pub const MAX_IDENTITY_HASH: [u8; 48] = [0xff_u8; 48];
/// Result returned by Store::put().
pub enum StorePutResult {
/// Datum stored successfully.
Ok,
@ -26,10 +22,14 @@ pub enum StorePutResult {
/// Trait that must be implemented by the data store that is to be replicated.
pub trait Store: Sync + Send {
/// Get an object from the database, storing it in the supplied buffer.
/// A return of 'false' leaves the buffer state undefined. If the return is true any previous
/// data in the supplied buffer will have been cleared and replaced with the retrieved object.
fn get(&self, reference_time: u64, identity_hash: &[u8; IDENTITY_HASH_SIZE], buffer: &mut Vec<u8>) -> bool;
/// Type returned by get(), which can be anything that contains a byte slice.
type Object: AsRef<[u8]>;
/// Get the local time in milliseconds since Unix epoch.
fn local_time(&self) -> u64;
/// Get an object from the database.
fn get(&self, reference_time: u64, identity_hash: &[u8; IDENTITY_HASH_SIZE]) -> Option<Self::Object>;
/// Store an entry in the database.
fn put(&self, reference_time: u64, identity_hash: &[u8; IDENTITY_HASH_SIZE], object: &[u8]) -> StorePutResult;
@ -40,18 +40,15 @@ pub trait Store: Sync + Send {
/// Get the total count of objects.
fn total_count(&self, reference_time: u64) -> Option<u64>;
/// Get the time the last object was received in milliseconds since epoch.
fn last_object_receive_time(&self) -> Option<u64>;
/// Iterate over a range of identity hashes and values.
/// This calls the supplied function for each object. If the function returns false iteration stops.
fn for_each<F: FnMut(&[u8], &Self::Object) -> bool>(&self, reference_time: u64, start: &[u8; IDENTITY_HASH_SIZE], end: &[u8; IDENTITY_HASH_SIZE], f: F);
/// Iterate over a range of identity hashes.
/// This calls the supplied function for each hash. If the function returns false iteration stops.
fn for_each_identity_hash<F: FnMut(&[u8]) -> bool>(&self, reference_time: u64, start: &[u8; IDENTITY_HASH_SIZE], end: &[u8; IDENTITY_HASH_SIZE], f: F);
/// Count the number of identity hash keys in this range (inclusive) of identity hashes.
/// This may return None if an error occurs, but should return 0 if the set is empty.
fn count(&self, reference_time: u64, start: &[u8; IDENTITY_HASH_SIZE], end: &[u8; IDENTITY_HASH_SIZE]) -> Option<u64>;
/// Called when a connection to a remote node was successful.
/// This is always called on successful outbound connect.
fn save_remote_endpoint(&self, to_address: &SocketAddr);
/// Get a remote endpoint to try.
/// This can return endpoints in any order and is used to try to establish outbound links.
fn get_remote_endpoint(&self) -> Option<SocketAddr>;
}

View file

@ -8,13 +8,7 @@
use smol::io::{AsyncReadExt, AsyncWrite, AsyncRead, AsyncWriteExt};
/// Byte that can be written for a zero varint.
pub const ZERO: u8 = 0x80;
/// Byte that can be written for a varint of 1.
pub const ONE: u8 = 0x81;
pub async fn async_write<W: AsyncWrite + Unpin>(w: &mut W, mut v: u64) -> std::io::Result<()> {
pub async fn async_write<W: AsyncWrite + Unpin>(w: &mut W, mut v: u64) -> smol::io::Result<()> {
let mut b = [0_u8; 10];
let mut i = 0;
loop {
@ -31,7 +25,7 @@ pub async fn async_write<W: AsyncWrite + Unpin>(w: &mut W, mut v: u64) -> std::i
w.write_all(&b[0..i]).await
}
pub async fn async_read<R: AsyncRead + Unpin>(r: &mut R) -> std::io::Result<u64> {
pub async fn async_read<R: AsyncRead + Unpin>(r: &mut R) -> smol::io::Result<u64> {
let mut v = 0_u64;
let mut buf = [0_u8; 1];
let mut pos = 0;

View file

@ -0,0 +1,65 @@
use std::io::Write;
/// The GMAC half of AES-GCM, exposed on its own as a fast plain MAC.
///
/// Wraps a `gcrypt::mac::Mac` configured for the `GmacAes` algorithm.
pub struct GMAC(gcrypt::mac::Mac);

impl GMAC {
    /// Build a GMAC instance keyed with `key`.
    ///
    /// # Panics
    ///
    /// Panics if `key` is not 16, 24, or 32 bytes long, if the underlying MAC
    /// object cannot be created, or if setting the key fails.
    pub fn new(key: &[u8]) -> GMAC {
        match key.len() {
            16 | 24 | 32 => {}
            _ => panic!("AES supports 128, 192, or 256 bits keys"),
        }
        let mut inner = gcrypt::mac::Mac::new(gcrypt::mac::Algorithm::GmacAes).unwrap();
        inner.set_key(key).expect("GMAC set_key failed");
        GMAC(inner)
    }

    /// Reset the MAC state and install `nonce` as the IV for the next message.
    ///
    /// The nonce may be anywhere from 8 to 16 bytes in length, but 12 bytes is
    /// strongly recommended. Sequential nonces are acceptable.
    ///
    /// # Panics
    ///
    /// Panics if setting the IV fails. A failed reset is ignored.
    #[inline(always)]
    pub fn init(&mut self, nonce: &[u8]) {
        let _ = self.0.reset();
        self.0.set_iv(nonce).expect("GMAC set_iv failed");
    }

    /// Feed `data` into the MAC. Errors from the underlying update are ignored.
    #[inline(always)]
    pub fn update(&mut self, data: &[u8]) {
        let _ = self.0.update(data);
    }

    /// Flush the MAC and fill `mac` with the final 16-byte authentication code.
    ///
    /// # Panics
    ///
    /// Panics if reading the MAC fails. A failed flush is ignored.
    #[inline(always)]
    pub fn finish(&mut self, mac: &mut [u8; 16]) {
        let _ = self.0.flush();
        self.0.get_mac(mac).expect("GMAC get_mac failed");
    }
}
/// A wrapper for GMAC with an incrementing 96-bit nonce.
///
/// The nonce is treated as a little-endian integer and is incremented by one
/// after every message. Only the low 96 bits (the first 12 bytes of the
/// little-endian encoding) are handed to GMAC, so the effective nonce wraps at
/// 2^96. This is for use with TCP streams. A maximum of 2^96 messages should be
/// sent or received with this, which is probably a large enough limit to be
/// safely ignored.
///
/// The second field is the counter as a native integer; its little-endian
/// byte encoding starts with the 12 nonce bytes.
pub struct SequentialNonceGMAC(GMAC, u128);

impl SequentialNonceGMAC {
    /// Create a sequential-nonce GMAC keyed with `key`, starting at `initial_nonce`.
    ///
    /// `initial_nonce` must be exactly 16 bytes; it is read as a little-endian
    /// integer and its first 12 bytes form the nonce of the first message.
    ///
    /// # Panics
    ///
    /// Panics if `initial_nonce` is not 16 bytes long, or if `GMAC::new`
    /// rejects `key`.
    #[inline(always)]
    pub fn new(key: &[u8], initial_nonce: &[u8]) -> Self {
        assert_eq!(initial_nonce.len(), 16);
        Self(GMAC::new(key), u128::from_le_bytes(initial_nonce.try_into().unwrap()))
    }

    /// Start a new message: initialize GMAC with the current nonce, then
    /// advance the counter for the message after it.
    #[inline(always)]
    pub fn init_for_next_message(&mut self) {
        // Encoding through to_le_bytes gives the same 12 bytes on every host
        // endianness without reinterpreting the integer's memory via `unsafe`.
        let nonce = self.1.to_le_bytes();
        self.0.init(&nonce[..12]);
        // A carry out of the low 96 bits only touches bytes GMAC never sees.
        self.1 = self.1.wrapping_add(1);
    }

    /// Feed `data` into the MAC for the current message.
    #[inline(always)]
    pub fn update(&mut self, data: &[u8]) { self.0.update(data); }

    /// Finish the current message and write its 16-byte authentication code to `mac`.
    #[inline(always)]
    pub fn finish(&mut self, mac: &mut [u8; 16]) { self.0.finish(mac); }
}

View file

@ -18,6 +18,7 @@ pub mod secret;
pub mod hex;
pub mod varint;
pub mod sidhp751;
pub mod gmac;
pub use aes_gmac_siv;
pub use rand_core;

View file

@ -6,6 +6,15 @@
* https://www.zerotier.com/
*/
use std::io::Write;
use std::str::FromStr;
use clap::{App, Arg, ArgMatches, ErrorKind};
use zerotier_network_hypervisor::{VERSION_MAJOR, VERSION_MINOR, VERSION_REVISION};
use crate::store::platform_default_home_path;
mod fastudpsocket;
mod localconfig;
mod getifaddrs;
@ -16,14 +25,6 @@ mod vnic;
mod service;
mod utils;
use std::io::Write;
use std::str::FromStr;
use clap::{App, Arg, ArgMatches, ErrorKind};
use zerotier_network_hypervisor::{VERSION_MAJOR, VERSION_MINOR, VERSION_REVISION};
use crate::store::platform_default_home_path;
pub const HTTP_API_OBJECT_SIZE_LIMIT: usize = 131072;
fn make_help(long_help: bool) -> String {