diff --git a/allthethings/src/lib.rs b/allthethings/src/lib.rs index 6d039690d..be5b65182 100644 --- a/allthethings/src/lib.rs +++ b/allthethings/src/lib.rs @@ -15,11 +15,11 @@ mod iblt; mod config; mod link; -pub(crate) fn ms_since_epoch() -> u64 { +pub fn ms_since_epoch() -> u64 { std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_millis() as u64 } -pub(crate) fn ms_monotonic() -> u64 { +pub fn ms_monotonic() -> u64 { std::time::Instant::now().elapsed().as_millis() as u64 } @@ -38,4 +38,5 @@ pub const IDENTITY_HASH_SIZE: usize = 48; pub use config::Config; pub use store::{Store, StorePutResult}; +pub use memorystore::MemoryStore; //pub use replicator::Replicator; diff --git a/allthethings/src/link.rs b/allthethings/src/link.rs index 50afe1e76..f81574b8c 100644 --- a/allthethings/src/link.rs +++ b/allthethings/src/link.rs @@ -1,5 +1,6 @@ use std::mem::MaybeUninit; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::task::Poll; use std::time::Duration; use serde::{Deserialize, Serialize}; @@ -7,59 +8,72 @@ use smol::io::{AsyncReadExt, AsyncWriteExt, BufReader, BufWriter}; use smol::lock::Mutex; use smol::net::{SocketAddr, TcpStream}; -use zerotier_core_crypto::gmac::SequentialNonceGMAC; +use zerotier_core_crypto::gmac::GMACStream; use zerotier_core_crypto::hash::SHA384; use zerotier_core_crypto::kbkdf::zt_kbkdf_hmac_sha384; use zerotier_core_crypto::p521::{P521KeyPair, P521PublicKey}; use zerotier_core_crypto::secret::Secret; -use crate::{Config, io_timeout, ms_monotonic, ms_since_epoch, varint}; +use crate::{Config, IDENTITY_HASH_SIZE, io_timeout, Store, varint}; use crate::protocol::*; +struct Output { + stream: BufWriter, + gmac: Option, +} + +pub(crate) struct Link<'a, 'b, 'c, S: Store> { + pub remote_addr: SocketAddr, + pub connect_time: u64, + io_timeout: Duration, + node_secret: &'a P521KeyPair, + config: &'b Config, + store: &'c S, + remote_node_id: parking_lot::Mutex>, + reader: Mutex>, + writer: Mutex, + keepalive_period: u64, + last_send_time: AtomicU64, + max_message_size: usize, + authenticated: AtomicBool, +} + #[inline(always)] fn decode_msgpack<'de, T: Deserialize<'de>>(data: &'de [u8]) -> smol::io::Result { rmp_serde::from_read_ref(data).map_err(|_| smol::io::Error::new(smol::io::ErrorKind::InvalidData, "invalid msgpack data")) } -struct OutputStream { - stream: BufWriter, - gmac: Option, +#[inline(always)] +fn read_id_hash(bytes: &[u8]) -> smol::io::Result<&[u8; IDENTITY_HASH_SIZE]> { + if bytes.len() >= IDENTITY_HASH_SIZE { + Ok(unsafe { &*bytes.as_ptr().cast::<[u8; IDENTITY_HASH_SIZE]>() }) + } else { + Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "invalid identity hash")) + } } -/// A TCP link between this node and another. -pub(crate) struct Link<'a, 'b> { - node_secret: &'a P521KeyPair, - config: &'b Config, - remote_node_id: parking_lot::Mutex>, - reader: Mutex>, - writer: Mutex, - pub remote_addr: SocketAddr, - pub connect_time: u64, - pub authenticated: AtomicBool, - keepalive_period: u64, - last_send_time: AtomicU64, - max_message_size: usize, -} - -impl<'a, 'b> Link<'a, 'b> { - pub fn new(stream: TcpStream, remote_addr: SocketAddr, connect_time: u64, node_secret: &'a P521KeyPair, config: &'b Config) -> Self { +impl<'a, 'b, 'c, S: Store> Link<'a, 'b, 'c, S> { + pub fn new(stream: TcpStream, remote_addr: SocketAddr, connect_time: u64, node_secret: &'a P521KeyPair, config: &'b Config, store: &'c S) -> Self { let _ = stream.set_nodelay(false); let max_message_size = HELLO_SIZE_MAX.max(config.max_message_size); + let now_monotonic = store.monotonic_clock(); Self { + remote_addr, + connect_time, + io_timeout: Duration::from_secs(config.io_timeout), node_secret, config, + store, remote_node_id: parking_lot::Mutex::new(None), - reader: Mutex::new(BufReader::with_capacity((max_message_size + 16).max(16384), stream.clone())), - writer: Mutex::new(OutputStream { + reader: Mutex::new(BufReader::with_capacity(65536, stream.clone())), + writer: Mutex::new(Output { stream: BufWriter::with_capacity(max_message_size + 16, stream), gmac: None }), - remote_addr, - connect_time, - authenticated: AtomicBool::new(false), keepalive_period: (config.io_timeout * 1000) / 2, - last_send_time: AtomicU64::new(ms_monotonic()), - max_message_size + last_send_time: AtomicU64::new(now_monotonic), + max_message_size, + authenticated: AtomicBool::new(false), } } @@ -67,37 +81,6 @@ impl<'a, 'b> Link<'a, 'b> { /// Returns None if the remote node has not yet responded with HelloAck and been verified. pub fn remote_node_id(&self) -> Option<[u8; 48]> { self.remote_node_id.lock().clone() } - /// Send message and increment outgoing GMAC nonce. - async fn write_message(&self, timeout: Duration, message_type: u8, message: &[u8]) -> smol::io::Result<()> { - let mut mac: [u8; 16] = unsafe { MaybeUninit::uninit().assume_init() }; - let mt = [message_type]; - - let mut writer = self.writer.lock().await; - - writer.gmac.as_mut().map_or_else(|| { - Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "link negotiation is not complete")) - }, |gmac| { - gmac.init_for_next_message(); - gmac.update(&mt); - gmac.update(message); - gmac.finish(&mut mac); - Ok(()) - })?; - - io_timeout(timeout, writer.stream.write_all(&mt)).await?; - io_timeout(timeout, varint::async_write(&mut writer.stream, message.len() as u64)).await?; - io_timeout(timeout, writer.stream.write_all(message)).await?; - io_timeout(timeout, writer.stream.write_all(&mac)).await?; - io_timeout(timeout, writer.stream.flush()).await - } - - /// Serialize object with msgpack and send, increment outgoing GMAC nonce. - async fn write_message_msgpack(&self, timeout: Duration, serialize_buf: &mut Vec, message_type: u8, message: &T) -> smol::io::Result<()> { - serialize_buf.clear(); - rmp_serde::encode::write(serialize_buf, message).map_err(|_| smol::io::Error::new(smol::io::ErrorKind::InvalidData, "msgpack encode failure"))?; - self.write_message(timeout, message_type, serialize_buf.as_slice()).await - } - /// Send a keepalive if necessary. pub async fn send_keepalive_if_needed(&self, now_monotonic: u64) { if now_monotonic.saturating_sub(self.last_send_time.load(Ordering::Relaxed)) >= self.keepalive_period && self.authenticated.load(Ordering::Relaxed) { @@ -109,54 +92,88 @@ impl<'a, 'b> Link<'a, 'b> { } } - /// Launched as an async task for each new link. - pub async fn io_main(&self) -> smol::io::Result<()> { + async fn write_message(&self, message_type: u8, message: &[&[u8]]) -> smol::io::Result<()> { + let mut writer = self.writer.lock().await; + if writer.gmac.is_some() { + let mut mac: [u8; 16] = unsafe { MaybeUninit::uninit().assume_init() }; + let mt = [message_type]; + + let gmac = writer.gmac.as_mut().unwrap(); + gmac.init_for_next_message(); + gmac.update(&mt); + let mut total_length = 0_usize; + for m in message.iter() { + total_length += (*m).len(); + gmac.update(*m); + } + gmac.finish(&mut mac); + + io_timeout(self.io_timeout, writer.stream.write_all(&mt)).await?; + io_timeout(self.io_timeout, varint::async_write(&mut writer.stream, total_length as u64)).await?; + for m in message.iter() { + io_timeout(self.io_timeout, writer.stream.write_all(*m)).await?; + } + io_timeout(self.io_timeout, writer.stream.write_all(&mac)).await?; + io_timeout(self.io_timeout, writer.stream.flush()).await + } else { + Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "link negotiation is not complete")) + } + } + + async fn write_message_msgpack(&self, serialize_buf: &mut Vec, message_type: u8, message: &T) -> smol::io::Result<()> { + serialize_buf.clear(); + rmp_serde::encode::write(serialize_buf, message).map_err(|_| smol::io::Error::new(smol::io::ErrorKind::InvalidData, "msgpack encode failure"))?; + self.write_message(message_type, &[serialize_buf.as_slice()]).await + } + + pub(crate) async fn io_main(&self) -> smol::io::Result<()> { // Reader is held here for the duration of the link's I/O loop. let mut reader_mg = self.reader.lock().await; let reader = &mut *reader_mg; let mut read_buf: Vec = Vec::new(); read_buf.resize(self.max_message_size, 0); - let mut serialize_buf: Vec = Vec::with_capacity(4096); - let timeout = Duration::from_secs(self.config.io_timeout); + let mut tmp_buf: Vec = Vec::with_capacity(4096); // (1) Send Hello and save the nonce and the hash of the raw Hello message for later HelloAck HMAC check. - let mut gmac_send_nonce_initial = [0_u8; 16]; - zerotier_core_crypto::random::fill_bytes_secure(&mut gmac_send_nonce_initial); + let mut outgoing_nonce = [0_u8; HELLO_NONCE_SIZE]; + zerotier_core_crypto::random::fill_bytes_secure(&mut outgoing_nonce); let ephemeral_secret = P521KeyPair::generate(true).unwrap(); let sent_hello_hash = { - serialize_buf.clear(); - let _ = rmp_serde::encode::write(&mut serialize_buf, &Hello { + tmp_buf.clear(); + let _ = rmp_serde::encode::write(&mut tmp_buf, &Hello { protocol_version: PROTOCOL_VERSION, flags: 0, - clock: ms_since_epoch(), + clock: self.store.clock(), domain: self.config.domain.as_str(), - nonce: &gmac_send_nonce_initial, + nonce: &outgoing_nonce, p521_ecdh_ephemeral_key: ephemeral_secret.public_key_bytes(), p521_ecdh_node_key: self.node_secret.public_key_bytes(), }).unwrap(); let mut writer = self.writer.lock().await; - io_timeout(timeout, varint::async_write(&mut writer.stream, serialize_buf.len() as u64)).await?; - io_timeout(timeout, writer.stream.write_all(serialize_buf.as_slice())).await?; - io_timeout(timeout, writer.stream.flush()).await?; + io_timeout(self.io_timeout, varint::async_write(&mut writer.stream, tmp_buf.len() as u64)).await?; + io_timeout(self.io_timeout, writer.stream.write_all(tmp_buf.as_slice())).await?; + io_timeout(self.io_timeout, writer.stream.flush()).await?; drop(writer); - SHA384::hash(serialize_buf.as_slice()) + SHA384::hash(tmp_buf.as_slice()) }; + self.last_send_time.store(self.store.monotonic_clock(), Ordering::Relaxed); + // (2) Read other side's HELLO and send ACK. Also do key agreement, initialize GMAC, etc. - let message_size = io_timeout(timeout, varint::async_read(reader)).await? as usize; + let message_size = io_timeout(self.io_timeout, varint::async_read(reader)).await? as usize; if message_size > HELLO_SIZE_MAX { return Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "message too large")); } let (mut gmac_receive, ack_key, remote_node_id) = { let hello_buf = &mut read_buf.as_mut_slice()[0..message_size]; - io_timeout(timeout, reader.read_exact(hello_buf)).await?; + io_timeout(self.io_timeout, reader.read_exact(hello_buf)).await?; let received_hello_hash = SHA384::hash(hello_buf); // for ACK generation let hello: Hello = decode_msgpack(hello_buf)?; - if hello.nonce.len() < 16 || hello.protocol_version != PROTOCOL_VERSION { + if hello.nonce.len() < HELLO_NONCE_SIZE || hello.protocol_version != PROTOCOL_VERSION { return Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "invalid HELLO parameters")); } @@ -170,24 +187,25 @@ impl<'a, 'b> Link<'a, 'b> { if node_shared_key.is_none() || ephemeral_shared_key.is_none() { return Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "key agreement failed")); } - let shared_key = Secret(SHA384::hmac(&SHA384::hash(ephemeral_shared_key.unwrap().as_bytes()), node_shared_key.unwrap().as_bytes())); + let shared_base_key = Secret(SHA384::hmac(&SHA384::hash(ephemeral_shared_key.unwrap().as_bytes()), node_shared_key.unwrap().as_bytes())); - let gmac_key = zt_kbkdf_hmac_sha384(shared_key.as_bytes(), KBKDF_LABEL_GMAC, 0, 0); - let ack_key = zt_kbkdf_hmac_sha384(shared_key.as_bytes(), KBKDF_LABEL_HELLO_ACK_HMAC, 0, 0); + let shared_gmac_base_key = zt_kbkdf_hmac_sha384(shared_base_key.as_bytes(), KBKDF_LABEL_GMAC, 0, 0); + let gmac_receive_key = Secret(SHA384::hmac(&hello.nonce[0..48], &shared_gmac_base_key.0)); + let gmac_send_key = Secret(SHA384::hmac(&outgoing_nonce[0..48], &shared_gmac_base_key.0)); + let gmac_receive = GMACStream::new(&gmac_receive_key.0[0..32], &hello.nonce[48..64]); + self.writer.lock().await.gmac.replace(GMACStream::new(&gmac_send_key.0[0..32], &outgoing_nonce[48..64])); - let gmac_receive = SequentialNonceGMAC::new(&gmac_key.0[0..32], &hello.nonce[0..16]); - self.writer.lock().await.gmac.replace(SequentialNonceGMAC::new(&gmac_key.0[0..32], &gmac_send_nonce_initial)); - - let ack_hmac = SHA384::hmac(ack_key.as_bytes(), &received_hello_hash); - self.write_message_msgpack(timeout, &mut serialize_buf, MESSAGE_TYPE_HELLO_ACK, &HelloAck { + let shared_ack_key = zt_kbkdf_hmac_sha384(shared_base_key.as_bytes(), KBKDF_LABEL_HELLO_ACK_HMAC, 0, 0); + let ack_hmac = SHA384::hmac(shared_ack_key.as_bytes(), &received_hello_hash); + self.write_message_msgpack(&mut tmp_buf, MESSAGE_TYPE_HELLO_ACK, &HelloAck { ack: &ack_hmac, clock_echo: hello.clock }).await?; - (gmac_receive, ack_key, SHA384::hash(hello.p521_ecdh_node_key)) + (gmac_receive, shared_ack_key, SHA384::hash(hello.p521_ecdh_node_key)) }; - self.last_send_time.store(ms_monotonic(), Ordering::Relaxed); + self.last_send_time.store(self.store.monotonic_clock(), Ordering::Relaxed); // Done with ephemeral secret key, so forget it. drop(ephemeral_secret); @@ -198,17 +216,15 @@ impl<'a, 'b> Link<'a, 'b> { let mut message_type_buf = [0_u8; 1]; let mut authenticated = false; loop { - io_timeout(timeout, reader.read_exact(&mut message_type_buf)).await?; - - // NOP is a single byte keepalive, so skip. Otherwise handle actual messages. + io_timeout(self.io_timeout, reader.read_exact(&mut message_type_buf)).await?; if message_type_buf[0] != MESSAGE_TYPE_KEEPALIVE { - let message_size = io_timeout(timeout, varint::async_read(reader)).await? as usize; + let message_size = io_timeout(self.io_timeout, varint::async_read(reader)).await? as usize; if message_size > self.max_message_size { return Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "message too large")); } let message_buf = &mut read_buf.as_mut_slice()[0..message_size]; - io_timeout(timeout, reader.read_exact(message_buf)).await?; - io_timeout(timeout, reader.read_exact(&mut received_mac_buf)).await?; + io_timeout(self.io_timeout, reader.read_exact(message_buf)).await?; + io_timeout(self.io_timeout, reader.read_exact(&mut received_mac_buf)).await?; gmac_receive.init_for_next_message(); gmac_receive.update(&message_type_buf); @@ -220,14 +236,11 @@ impl<'a, 'b> Link<'a, 'b> { if authenticated { match message_type_buf[0] { - MESSAGE_TYPE_HELLO_ACK => { - // Multiple HelloAck messages don't make sense. - }, - MESSAGE_TYPE_OBJECTS => {}, - MESSAGE_TYPE_HAVE_OBJECTS => {}, - MESSAGE_TYPE_WANT_OBJECTS => {}, - MESSAGE_TYPE_IBLT_SYNC_REQUEST => {}, - MESSAGE_TYPE_IBLT_SYNC_DIGEST => {}, + MESSAGE_TYPE_OBJECTS => self.do_objects(message_buf).await?, + MESSAGE_TYPE_HAVE_OBJECTS => self.do_have_objects(&mut tmp_buf, message_buf).await?, + MESSAGE_TYPE_WANT_OBJECTS => self.do_want_objects(message_buf).await?, + MESSAGE_TYPE_SYNC_REQUEST => self.do_sync_request(decode_msgpack(message_buf)?).await?, + MESSAGE_TYPE_IBLT_SYNC_DIGEST => self.do_iblt_sync_digest(decode_msgpack(message_buf)?).await?, _ => {}, } } else { @@ -248,4 +261,81 @@ impl<'a, 'b> Link<'a, 'b> { } } } + + async fn do_objects(&self, mut msg: &[u8]) -> smol::io::Result<()> { + while !msg.is_empty() { + let obj_size = varint::async_read(&mut msg).await? as usize; + if obj_size >= msg.len() { + return Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "object incomplete")); + } + let obj = &msg[0..obj_size]; + msg = &msg[obj_size..]; + match self.store.put(&SHA384::hash(obj), obj) { + crate::StorePutResult::Invalid => { + return Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "invalid object in stream")); + }, + _ => {} + } + } + Ok(()) + } + + async fn do_have_objects(&self, tmp_buf: &mut Vec, mut msg: &[u8]) -> smol::io::Result<()> { + tmp_buf.clear(); + varint::write(tmp_buf, self.store.clock()); + let empty_tmp_buf_size = tmp_buf.len(); + + while !msg.is_empty() { + let id_hash = read_id_hash(msg)?; + if !self.store.have(id_hash) { + let _ = tmp_buf.write_all(id_hash); + } + msg = &msg[IDENTITY_HASH_SIZE..]; + } + + if tmp_buf.len() != empty_tmp_buf_size { + self.write_message(MESSAGE_TYPE_WANT_OBJECTS, &[tmp_buf.as_slice()]).await + } else { + Ok(()) + } + } + + async fn do_want_objects(&self, mut msg: &[u8]) -> smol::io::Result<()> { + let ref_time = varint::read(&mut msg); + if !ref_time.is_err() { + return Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "object incomplete")); + } + let ref_time = ref_time.unwrap().0; + + let cap = (msg.len() / IDENTITY_HASH_SIZE) + 1; + let mut objects: Vec = Vec::with_capacity(cap); + while !msg.is_empty() { + let id_hash = read_id_hash(msg)?; + let _ = self.store.get(ref_time, id_hash).map(|obj| objects.push(obj)); + msg = &msg[IDENTITY_HASH_SIZE..]; + } + + if !objects.is_empty() { + let mut sizes: Vec = Vec::with_capacity(objects.len()); + let mut slices: Vec<&[u8]> = Vec::with_capacity(objects.len() * 2); + for o in objects.iter() { + sizes.push(varint::Encoded::from(o.as_ref().len() as u64)); + } + for i in 0..objects.len() { + slices.push(sizes.get(i).unwrap().as_ref()); + slices.push(objects.get(i).unwrap().as_ref()); + } + self.write_message(MESSAGE_TYPE_OBJECTS, slices.as_slice()).await + } else { + Ok(()) + } + } + + async fn do_sync_request(&self, sr: SyncRequest<'_>) -> smol::io::Result<()> { + Ok(()) + } + + async fn do_iblt_sync_digest(&self, sd: IBLTSyncDigest<'_>) -> smol::io::Result<()> { + Ok(()) + } } diff --git a/allthethings/src/memorystore.rs b/allthethings/src/memorystore.rs index ed0af475d..088269651 100644 --- a/allthethings/src/memorystore.rs +++ b/allthethings/src/memorystore.rs @@ -12,7 +12,7 @@ use std::sync::Arc; use parking_lot::Mutex; -use crate::{IDENTITY_HASH_SIZE, ms_since_epoch, Store, StorePutResult}; +use crate::{IDENTITY_HASH_SIZE, ms_monotonic, ms_since_epoch, Store, StorePutResult}; /// A Store that stores all objects in memory, mostly for testing. pub struct MemoryStore(Mutex, u64)>>); @@ -25,9 +25,10 @@ impl Store for MemoryStore { type Object = Arc<[u8]>; #[inline(always)] - fn local_time(&self) -> u64 { - ms_since_epoch() - } + fn clock(&self) -> u64 { ms_since_epoch() } + + #[inline(always)] + fn monotonic_clock(&self) -> u64 { ms_monotonic() } fn get(&self, reference_time: u64, identity_hash: &[u8; IDENTITY_HASH_SIZE]) -> Option { self.0.lock().get(identity_hash).and_then(|o| { @@ -39,17 +40,17 @@ impl Store for MemoryStore { }) } - fn put(&self, reference_time: u64, identity_hash: &[u8; IDENTITY_HASH_SIZE], object: &[u8]) -> StorePutResult { + fn put(&self, identity_hash: &[u8; IDENTITY_HASH_SIZE], object: &[u8]) -> StorePutResult { let mut result = StorePutResult::Duplicate; let _ = self.0.lock().entry(identity_hash.clone()).or_insert_with(|| { result = StorePutResult::Ok; - (object.to_vec().into(), reference_time) + (object.to_vec().into(), ms_since_epoch()) }); result } - fn have(&self, reference_time: u64, identity_hash: &[u8; IDENTITY_HASH_SIZE]) -> bool { - self.0.lock().get(identity_hash).map_or(false, |o| (*o).1 <= reference_time) + fn have(&self, identity_hash: &[u8; IDENTITY_HASH_SIZE]) -> bool { + self.0.lock().contains_key(identity_hash) } fn total_count(&self, reference_time: u64) -> Option { diff --git a/allthethings/src/protocol.rs b/allthethings/src/protocol.rs index b128cf389..2b4ed5fac 100644 --- a/allthethings/src/protocol.rs +++ b/allthethings/src/protocol.rs @@ -41,6 +41,9 @@ pub const KBKDF_LABEL_GMAC: u8 = b'G'; /// Sanity limit on the size of HELLO. pub const HELLO_SIZE_MAX: usize = 4096; +/// Size of nonce sent with HELLO. +pub const HELLO_NONCE_SIZE: usize = 64; + /// Overall protocol version. pub const PROTOCOL_VERSION: u16 = 1; @@ -61,7 +64,7 @@ pub const MESSAGE_TYPE_HAVE_OBJECTS: u8 = 3; pub const MESSAGE_TYPE_WANT_OBJECTS: u8 = 4; /// Request IBLT synchronization, payload is IBLTSyncRequest. -pub const MESSAGE_TYPE_IBLT_SYNC_REQUEST: u8 = 5; +pub const MESSAGE_TYPE_SYNC_REQUEST: u8 = 5; /// IBLT sync digest, payload is IBLTSyncDigest. pub const MESSAGE_TYPE_IBLT_SYNC_DIGEST: u8 = 6; @@ -79,7 +82,7 @@ pub struct Hello<'a> { pub clock: u64, /// The data set name ("domain") to which this node belongs. pub domain: &'a str, - /// Random nonce, must be at least 16 bytes in length. + /// Random nonce, must be at least 64 bytes in length. pub nonce: &'a [u8], /// Random ephemeral ECDH session key. pub p521_ecdh_ephemeral_key: &'a [u8], @@ -87,7 +90,7 @@ pub struct Hello<'a> { pub p521_ecdh_node_key: &'a [u8], } -/// Sent in response to Hello. +/// Sent in response to Hello and contains an acknowledgement HMAC for the shared key. #[derive(Deserialize, Serialize)] pub struct HelloAck<'a> { /// HMAC-SHA384(KBKDF(key, KBKDF_LABEL_HELLO_ACK_HMAC), SHA384(original raw Hello)) @@ -118,8 +121,14 @@ pub struct HelloAck<'a> { /// simple calculation to be made with the sending node's total count to /// estimate set difference density across the entire hash range. #[derive(Deserialize, Serialize)] -pub struct IBLTSyncRequest { - /// Total number of hashes in entire data set (on our side). +pub struct SyncRequest<'a> { + /// Start of range. Right-pad with zeroes if too short. + pub range_start: &'a [u8], + /// End of range. Right-pad with zeroes if too short. + pub range_end: &'a [u8], + /// Total number of hashes in range. + pub count: u64, + /// Total number of hashes in entire data set. pub total_count: u64, /// Our clock to use as a reference time for filtering the data set (if applicable). pub reference_time: u64, @@ -138,6 +147,6 @@ pub struct IBLTSyncDigest<'a> { pub count: u64, /// Total number of hashes in entire data set. pub total_count: u64, - /// Reference time from IBLTSyncRequest or 0 if this is being sent synthetically. + /// Reference time from SyncRequest or 0 if this is being sent synthetically. pub reference_time: u64, } diff --git a/allthethings/src/store.rs b/allthethings/src/store.rs index 93d113e0b..bb6b73e92 100644 --- a/allthethings/src/store.rs +++ b/allthethings/src/store.rs @@ -25,17 +25,20 @@ pub trait Store: Sync + Send { /// Type returned by get(), which can be anything that contains a byte slice. type Object: AsRef<[u8]>; - /// Get the local time in milliseconds since Unix epoch. - fn local_time(&self) -> u64; + /// Get the current wall time in milliseconds since Unix epoch. + fn clock(&self) -> u64; + + /// Get the number of milliseconds that have elapsed since some arbitrary event in the past (like system boot). + fn monotonic_clock(&self) -> u64; /// Get an object from the database. fn get(&self, reference_time: u64, identity_hash: &[u8; IDENTITY_HASH_SIZE]) -> Option; /// Store an entry in the database. - fn put(&self, reference_time: u64, identity_hash: &[u8; IDENTITY_HASH_SIZE], object: &[u8]) -> StorePutResult; + fn put(&self, identity_hash: &[u8; IDENTITY_HASH_SIZE], object: &[u8]) -> StorePutResult; /// Check if we have an object by its identity hash. - fn have(&self, reference_time: u64, identity_hash: &[u8; IDENTITY_HASH_SIZE]) -> bool; + fn have(&self, identity_hash: &[u8; IDENTITY_HASH_SIZE]) -> bool; /// Get the total count of objects. fn total_count(&self, reference_time: u64) -> Option; diff --git a/allthethings/src/varint.rs b/allthethings/src/varint.rs index 9c83b51ea..ff48c9fc8 100644 --- a/allthethings/src/varint.rs +++ b/allthethings/src/varint.rs @@ -8,6 +8,8 @@ use smol::io::{AsyncReadExt, AsyncWrite, AsyncRead, AsyncWriteExt}; +pub use zerotier_core_crypto::varint::*; + pub async fn async_write(w: &mut W, mut v: u64) -> smol::io::Result<()> { let mut b = [0_u8; 10]; let mut i = 0; diff --git a/zerotier-core-crypto/src/gmac.rs b/zerotier-core-crypto/src/gmac.rs index f49fe9900..24ac472a9 100644 --- a/zerotier-core-crypto/src/gmac.rs +++ b/zerotier-core-crypto/src/gmac.rs @@ -38,13 +38,18 @@ impl GMAC { } /// A wrapper for GMAC with an incrementing 96-bit nonce. -/// The nonce here is incremented as a little-endian value. -/// This is for use with TCP streams. A maximum of 2^96 messages -/// should be sent or received with this, which is probably a large -/// enough limit to be safely ignored. -pub struct SequentialNonceGMAC(GMAC, u128); +/// +/// This is designed for use to authenticate messages on an otherwise unencrypted +/// TCP connection. The nonce is treated as a 96-bit little-endian integer that +/// is incremented for each message. It should not be used beyond 2^96 messages +/// but that's a ludicrously large message count. +pub struct GMACStream(GMAC, u128); -impl SequentialNonceGMAC { +impl GMACStream { + /// Create a new streaming GMAC instance. + /// Key must be 16, 24, or 32 bytes in length. Initial nonce must be 16 bytes + /// in length, though only the first 12 are used. If either of these are not + /// sized properly this will panic. #[inline(always)] pub fn new(key: &[u8], initial_nonce: &[u8]) -> Self { assert_eq!(initial_nonce.len(), 16); diff --git a/zerotier-core-crypto/src/varint.rs b/zerotier-core-crypto/src/varint.rs index e8500c871..d1076c33d 100644 --- a/zerotier-core-crypto/src/varint.rs +++ b/zerotier-core-crypto/src/varint.rs @@ -8,9 +8,14 @@ use std::io::{Read, Write}; -/// Write a variable length integer, which can consume up to 10 bytes. -pub fn write(w: &mut W, mut v: u64) -> std::io::Result<()> { - let mut b = [0_u8; 10]; +pub const VARINT_MAX_SIZE_BYTES: usize = 10; + +/// Encode an integer as a varint. +/// +/// WARNING: if the supplied byte slice does not have at least 10 bytes available this may panic. +/// This is checked in debug mode by an assertion. +pub fn encode(b: &mut [u8], mut v: u64) -> usize { + debug_assert!(b.len() >= VARINT_MAX_SIZE_BYTES); let mut i = 0; loop { if v > 0x7f { @@ -23,6 +28,14 @@ pub fn write(w: &mut W, mut v: u64) -> std::io::Result<()> { break; } } + i +} + +/// Write a variable length integer, which can consume up to 10 bytes. +#[inline(always)] +pub fn write(w: &mut W, v: u64) -> std::io::Result<()> { + let mut b = [0_u8; VARINT_MAX_SIZE_BYTES]; + let i = encode(&mut b, v); w.write_all(&b[0..i]) } @@ -46,6 +59,23 @@ pub fn read(r: &mut R) -> std::io::Result<(u64, usize)> { } } +/// A container for an encoded varint. Use as_ref() to get bytes. +pub struct Encoded([u8; VARINT_MAX_SIZE_BYTES], u8); + +impl Encoded { + #[inline(always)] + pub fn from(v: u64) -> Encoded { + let mut e = Encoded([0_u8; VARINT_MAX_SIZE_BYTES], 0); + e.1 = encode(&mut e.0, v) as u8; + e + } +} + +impl AsRef<[u8]> for Encoded { + #[inline(always)] + fn as_ref(&self) -> &[u8] { &self.0[0..(self.1 as usize)] } +} + #[cfg(test)] mod tests { use crate::varint::*; diff --git a/zerotier-network-hypervisor/src/networkhypervisor.rs b/zerotier-network-hypervisor/src/networkhypervisor.rs index 14a2839ec..cd98f73bb 100644 --- a/zerotier-network-hypervisor/src/networkhypervisor.rs +++ b/zerotier-network-hypervisor/src/networkhypervisor.rs @@ -11,11 +11,11 @@ use std::sync::Arc; use std::time::Duration; use crate::error::InvalidParameterError; -use crate::vl1::{Address, Identity, Endpoint, NodeInterface, Node}; +use crate::vl1::{Address, Identity, Endpoint, VL1SystemInterface, Node}; use crate::vl2::{Switch, SwitchInterface}; use crate::{PacketBuffer, PacketBufferPool}; -pub trait Interface: NodeInterface + SwitchInterface {} +pub trait Interface: VL1SystemInterface + SwitchInterface {} pub struct NetworkHypervisor { vl1: Node, @@ -51,7 +51,7 @@ impl NetworkHypervisor { } #[inline(always)] - pub fn wire_receive(&self, ci: &CI, source_endpoint: &Endpoint, source_local_socket: Option, source_local_interface: Option, mut data: PacketBuffer) { + pub fn wire_receive(&self, ci: &CI, source_endpoint: &Endpoint, source_local_socket: Option, source_local_interface: Option, mut data: PacketBuffer) { self.vl1.wire_receive(ci, &self.vl2, source_endpoint, source_local_socket, source_local_interface, data) } } diff --git a/zerotier-network-hypervisor/src/vl1/ephemeral.rs b/zerotier-network-hypervisor/src/vl1/ephemeral.rs index 9217802e9..d7e4d0618 100644 --- a/zerotier-network-hypervisor/src/vl1/ephemeral.rs +++ b/zerotier-network-hypervisor/src/vl1/ephemeral.rs @@ -12,6 +12,7 @@ use std::convert::TryInto; use zerotier_core_crypto::c25519::{C25519KeyPair, C25519_PUBLIC_KEY_SIZE}; use zerotier_core_crypto::hash::{SHA384_HASH_SIZE, SHA384}; +use zerotier_core_crypto::kbkdf::zt_kbkdf_hmac_sha384; use zerotier_core_crypto::p521::{P521KeyPair, P521_PUBLIC_KEY_SIZE, P521PublicKey}; use zerotier_core_crypto::random::SecureRandom; use zerotier_core_crypto::secret::Secret; @@ -22,12 +23,14 @@ use crate::vl1::Address; use crate::vl1::protocol::*; use crate::vl1::symmetricsecret::SymmetricSecret; +const EPHEMERAL_PUBLIC_FLAG_HAVE_RATCHET_STATE: u8 = 0x01; + /// A set of ephemeral secret key pairs. Multiple algorithms are used. -pub struct EphemeralKeyPairSet { - previous_ratchet_state: Option<[u8; 16]>, // First 128 bits of SHA384(previous ratchet secret) +pub(crate) struct EphemeralKeyPairSet { + previous_ratchet_state: Option<[u8; 16]>, // Previous state of ratchet on which this agreement should build c25519: C25519KeyPair, // Hipster DJB cryptography - p521: P521KeyPair, // US federal government cryptography - sidhp751: Option, // Post-quantum moon math cryptography + p521: P521KeyPair, // US Federal Government cryptography + sidhp751: Option, // Post-quantum moon math cryptography (not used in every ratchet tick) } impl EphemeralKeyPairSet { @@ -35,11 +38,6 @@ impl EphemeralKeyPairSet { /// /// This contains key pairs for the asymmetric key agreement algorithms used and a /// timestamp used to enforce TTL. - /// - /// SIDH is only used once per ratchet sequence because it's much more CPU intensive - /// than ECDH. The threat model for SIDH is forward secrecy on the order of 5-15 years - /// from now when a quantum computer capable of attacking elliptic curve may exist, - /// it's incredibly unlikely that a p2p link would ever persist that long. pub fn new(local_address: Address, remote_address: Address, previous_ephemeral_secret: Option<&EphemeralSymmetricSecret>) -> Self { let (sidhp751, previous_ratchet_state) = previous_ephemeral_secret.map_or_else(|| { ( @@ -48,7 +46,15 @@ impl EphemeralKeyPairSet { ) }, |previous_ephemeral_secret| { ( - None, + if previous_ephemeral_secret.ratchet_state[0] == 0 { + // We include SIDH with a probability of 1/256, which for a 5 minute re-key interval + // means SIDH will be included about every 24 hours. SIDH is slower and is intended + // to guard against long term warehousing for eventual cracking with a QC, so this + // should be good enough for that threat model. + Some(SIDHEphemeralKeyPair::generate(local_address, remote_address)) + } else { + None + }, Some(previous_ephemeral_secret.ratchet_state.clone()) ) }); @@ -67,11 +73,11 @@ impl EphemeralKeyPairSet { pub fn public_bytes(&self) -> Vec { let mut b: Vec = Vec::with_capacity(SHA384_HASH_SIZE + 8 + C25519_PUBLIC_KEY_SIZE + P521_PUBLIC_KEY_SIZE + SIDH_P751_PUBLIC_KEY_SIZE); - if self.previous_ratchet_state.is_none() { - b.push(0); // no flags - } else { - b.push(1); // flag 0x01: previous ephemeral secret hash included + if self.previous_ratchet_state.is_some() { + b.push(EPHEMERAL_PUBLIC_FLAG_HAVE_RATCHET_STATE); let _ = b.write_all(self.previous_ratchet_state.as_ref().unwrap()); + } else { + b.push(0); } b.push(EphemeralKeyAgreementAlgorithm::C25519 as u8); @@ -130,17 +136,17 @@ impl EphemeralKeyPairSet { let mut fips_compliant_exchange = false; // ends up true if last algorithm was FIPS compliant let mut other_public_bytes = other_public_bytes; - // Make sure the state of the ratchet matches on both ends. Otherwise it must restart. + // Check that the other side's ratchet state matches ours. If not the ratchet must restart. if other_public_bytes.is_empty() { return None; } - if (other_public_bytes[0] & 1) == 0 { + if (other_public_bytes[0] & EPHEMERAL_PUBLIC_FLAG_HAVE_RATCHET_STATE) == 0 { if previous_ephemeral_secret.is_some() { return None; } other_public_bytes = &other_public_bytes[1..]; } else { - if other_public_bytes.len() < 17 || previous_ephemeral_secret.map_or(false, |previous_ephemeral_secret| other_public_bytes[1..17].ne(&previous_ephemeral_secret.ratchet_state)) { + if other_public_bytes.len() < 17 || previous_ephemeral_secret.map_or(false, |previous_ephemeral_secret| other_public_bytes[1..17] != previous_ephemeral_secret.ratchet_state) { return None; } other_public_bytes = &other_public_bytes[17..]; @@ -228,10 +234,9 @@ impl EphemeralKeyPairSet { } return if it_happened { - let ratchet_state = SHA384::hash(&key.0)[0..16].try_into().unwrap(); Some(EphemeralSymmetricSecret { secret: SymmetricSecret::new(key), - ratchet_state, + ratchet_state: (&zt_kbkdf_hmac_sha384(&key.0, KBKDF_KEY_USAGE_LABEL_EPHEMERAL_RATCHET_STATE_ID, 0, 0).0[0..16]).try_into().unwrap(), rekey_time: time_ticks + EPHEMERAL_SECRET_REKEY_AFTER_TIME, expire_time: time_ticks + EPHEMERAL_SECRET_REJECT_AFTER_TIME, c25519_ratchet_count, @@ -248,16 +253,26 @@ impl EphemeralKeyPairSet { } /// Symmetric secret representing a step in the ephemeral keying ratchet. -pub struct EphemeralSymmetricSecret { +pub(crate) struct EphemeralSymmetricSecret { + /// Current ephemeral secret key. pub secret: SymmetricSecret, + /// First 16 bytes of SHA384(current ephemeral secret). ratchet_state: [u8; 16], + /// Time at or after which we should start trying to re-key. rekey_time: i64, + /// Time after which this key is no longer valid. expire_time: i64, + /// Number of C25519 agreements so far in ratchet. c25519_ratchet_count: u64, + /// Number of SIDH P-751 agreements so far in ratchet. sidhp751_ratchet_count: u64, + /// Number of NIST P-521 ECDH agreements so far in ratchet. nistp521_ratchet_count: u64, + /// Number of times this secret has been used to encrypt. encrypt_uses: AtomicU32, + /// Number of times this secret has been used to decrypt. decrypt_uses: AtomicU32, + /// True if most recent key exchange was NIST/FIPS compliant. fips_compliant_exchange: bool, } diff --git a/zerotier-network-hypervisor/src/vl1/mod.rs b/zerotier-network-hypervisor/src/vl1/mod.rs index 9b0036396..92ccd3432 100644 --- a/zerotier-network-hypervisor/src/vl1/mod.rs +++ b/zerotier-network-hypervisor/src/vl1/mod.rs @@ -32,6 +32,6 @@ pub use dictionary::Dictionary; pub use inetaddress::InetAddress; pub use peer::Peer; pub use path::Path; -pub use node::{Node, NodeInterface}; +pub use node::{Node, VL1SystemInterface}; pub use protocol::{PACKET_SIZE_MAX, PACKET_FRAGMENT_COUNT_MAX}; diff --git a/zerotier-network-hypervisor/src/vl1/node.rs b/zerotier-network-hypervisor/src/vl1/node.rs index 4aa3b50c8..d70b3a90a 100644 --- a/zerotier-network-hypervisor/src/vl1/node.rs +++ b/zerotier-network-hypervisor/src/vl1/node.rs @@ -18,9 +18,9 @@ use zerotier_core_crypto::random::{next_u64_secure, SecureRandom}; use crate::{PacketBuffer, PacketBufferFactory, PacketBufferPool}; use crate::error::InvalidParameterError; +use crate::util::buffer::Buffer; use crate::util::gate::IntervalGate; use crate::util::pool::{Pool, Pooled}; -use crate::util::buffer::Buffer; use crate::vl1::{Address, Endpoint, Identity}; use crate::vl1::path::Path; use crate::vl1::peer::Peer; @@ -28,7 +28,8 @@ use crate::vl1::protocol::*; use crate::vl1::rootset::RootSet; use crate::vl1::whoisqueue::{QueuedPacket, WhoisQueue}; -pub trait NodeInterface { +/// Trait implemented by external code to handle events and provide an interface to the system or application. +pub trait VL1SystemInterface { /// Node is up and ready for operation. fn event_node_is_up(&self); @@ -88,7 +89,7 @@ pub trait NodeInterface { /// /// This normally isn't used from outside this crate except for testing or if you want to harness VL1 /// for some entirely unrelated purpose. -pub trait VL1PacketHandler { +pub trait VL1VirtualInterface { /// Handle a packet, returning true if it belonged to VL2. /// /// If this is a VL2 packet, this must return true. True must be returned even if subsequent @@ -110,6 +111,12 @@ pub trait VL1PacketHandler { /// Handle an OK, returing true if the OK was recognized. fn handle_ok(&self, peer: &Peer, source_path: &Arc, forward_secrecy: bool, extended_authentication: bool, in_re_verb: u8, in_re_message_id: u64, payload: &Buffer<{ PACKET_SIZE_MAX }>, cursor: &mut usize) -> bool; + + /// Check if this remote peer has a trust relationship with this node. + /// + /// This is checked to determine if we should do things like make direct links ore respond to + /// various other VL1 messages. + fn has_trust_relationship(&self, id: &Identity) -> bool; } #[derive(Default)] @@ -120,7 +127,6 @@ struct BackgroundTaskIntervals { } pub struct Node { - pub(crate) instance_id: u64, identity: Identity, intervals: Mutex, paths: DashMap>, @@ -134,7 +140,7 @@ pub struct Node { impl Node { /// Create a new Node. - pub fn new(ci: &I, auto_generate_identity: bool) -> Result { + pub fn new(ci: &I, auto_generate_identity: bool) -> Result { let id = { let id_str = ci.load_node_identity(); if id_str.is_none() { @@ -157,7 +163,6 @@ impl Node { }; Ok(Self { - instance_id: next_u64_secure(), identity: id, intervals: Mutex::new(BackgroundTaskIntervals::default()), paths: DashMap::new(), @@ -196,7 +201,7 @@ impl Node { } /// Run background tasks and return desired delay until next call in milliseconds. - pub fn do_background_tasks(&self, ci: &I) -> Duration { + pub fn do_background_tasks(&self, ci: &I) -> Duration { let mut intervals = self.intervals.lock(); let tt = ci.time_ticks(); @@ -224,7 +229,7 @@ impl Node { } /// Called when a packet is received on the physical wire. - pub fn wire_receive(&self, ci: &I, ph: &PH, source_endpoint: &Endpoint, source_local_socket: Option, source_local_interface: Option, mut data: PacketBuffer) { + pub fn wire_receive(&self, ci: &I, ph: &PH, source_endpoint: &Endpoint, source_local_socket: Option, source_local_interface: Option, mut data: PacketBuffer) { let fragment_header = data.struct_mut_at::(0); if fragment_header.is_ok() { let fragment_header = fragment_header.unwrap(); @@ -315,7 +320,7 @@ impl Node { /// This is a canonicalizing function that returns a unique path object for every tuple /// of endpoint, local socket, and local interface. pub fn path(&self, ep: &Endpoint, local_socket: Option, local_interface: Option) -> Arc { - let key = Path::local_lookup_key(ep); + let key = Path::local_lookup_key(ep, local_socket, local_interface); self.paths.get(&key).map_or_else(|| { let p = Arc::new(Path::new(ep.clone(), local_socket, local_interface)); self.paths.insert(key, p.clone()).unwrap_or(p) // if another thread added one, return that instead diff --git a/zerotier-network-hypervisor/src/vl1/path.rs b/zerotier-network-hypervisor/src/vl1/path.rs index fd2849902..53bf0a25e 100644 --- a/zerotier-network-hypervisor/src/vl1/path.rs +++ b/zerotier-network-hypervisor/src/vl1/path.rs @@ -21,7 +21,7 @@ use crate::PacketBuffer; use crate::util::{highwayhasher, U64NoOpHasher}; use crate::vl1::Endpoint; use crate::vl1::fragmentedpacket::FragmentedPacket; -use crate::vl1::node::NodeInterface; +use crate::vl1::node::VL1SystemInterface; use crate::vl1::protocol::*; /// Keepalive interval for paths in milliseconds. @@ -171,7 +171,7 @@ impl Path { pub(crate) const CALL_EVERY_INTERVAL_MS: i64 = PATH_KEEPALIVE_INTERVAL; #[inline(always)] - pub(crate) fn call_every_interval(&self, ct: &CI, time_ticks: i64) { + pub(crate) fn call_every_interval(&self, ct: &CI, time_ticks: i64) { self.fragmented_packets.lock().retain(|packet_id, frag| (time_ticks - frag.ts_ticks) < PACKET_FRAGMENT_EXPIRATION); } } diff --git a/zerotier-network-hypervisor/src/vl1/peer.rs b/zerotier-network-hypervisor/src/vl1/peer.rs index ff7474447..23c3b957f 100644 --- a/zerotier-network-hypervisor/src/vl1/peer.rs +++ b/zerotier-network-hypervisor/src/vl1/peer.rs @@ -243,7 +243,7 @@ impl Peer { /// Receive, decrypt, authenticate, and process an incoming packet from this peer. /// If the packet comes in multiple fragments, the fragments slice should contain all /// those fragments after the main packet header and first chunk. - pub(crate) fn receive(&self, node: &Node, ci: &CI, ph: &PH, time_ticks: i64, source_endpoint: &Endpoint, source_path: &Arc, header: &PacketHeader, packet: &Buffer<{ PACKET_SIZE_MAX }>, fragments: &[Option]) { + pub(crate) fn receive(&self, node: &Node, ci: &CI, ph: &PH, time_ticks: i64, source_endpoint: &Endpoint, source_path: &Arc, header: &PacketHeader, packet: &Buffer<{ PACKET_SIZE_MAX }>, fragments: &[Option]) { let _ = packet.as_bytes_starting_at(PACKET_VERB_INDEX).map(|packet_frag0_payload_bytes| { let mut payload: Buffer = unsafe { Buffer::new_without_memzero() }; let mut message_id = 0_u64; @@ -321,7 +321,7 @@ impl Peer { }); } - fn send_to_endpoint(&self, ci: &CI, endpoint: &Endpoint, local_socket: Option, local_interface: Option, packet: &Buffer<{ PACKET_SIZE_MAX }>) -> bool { + fn send_to_endpoint(&self, ci: &CI, endpoint: &Endpoint, local_socket: Option, local_interface: Option, packet: &Buffer<{ PACKET_SIZE_MAX }>) -> bool { debug_assert!(packet.len() <= PACKET_SIZE_MAX); debug_assert!(packet.len() >= PACKET_SIZE_MIN); match endpoint { @@ -375,7 +375,7 @@ impl Peer { /// /// This will go directly if there is an active path, or otherwise indirectly /// via a root or some other route. - pub(crate) fn send(&self, ci: &CI, node: &Node, time_ticks: i64, packet: &Buffer<{ PACKET_SIZE_MAX }>) -> bool { + pub(crate) fn send(&self, ci: &CI, node: &Node, time_ticks: i64, packet: &Buffer<{ PACKET_SIZE_MAX }>) -> bool { self.path(node).map_or(false, |path| { if self.send_to_endpoint(ci, path.endpoint(), path.local_socket(), path.local_interface(), packet) { self.last_send_time_ticks.store(time_ticks, Ordering::Relaxed); @@ -394,7 +394,7 @@ impl Peer { /// /// This doesn't fragment large packets since fragments are forwarded individually. /// Intermediates don't need to adjust fragmentation. - pub(crate) fn forward(&self, ci: &CI, time_ticks: i64, packet: &Buffer<{ PACKET_SIZE_MAX }>) -> bool { + pub(crate) fn forward(&self, ci: &CI, time_ticks: i64, packet: &Buffer<{ PACKET_SIZE_MAX }>) -> bool { self.direct_path().map_or(false, |path| { if ci.wire_send(path.endpoint(), path.local_socket(), path.local_interface(), &[packet.as_bytes()], 0) { self.last_forward_time_ticks.store(time_ticks, Ordering::Relaxed); @@ -410,7 +410,7 @@ impl Peer { /// /// If explicit_endpoint is not None the packet will be sent directly to this endpoint. /// Otherwise it will be sent via the best direct or indirect path known. - pub(crate) fn send_hello(&self, ci: &CI, node: &Node, explicit_endpoint: Option<&Endpoint>) -> bool { + pub(crate) fn send_hello(&self, ci: &CI, node: &Node, explicit_endpoint: Option<&Endpoint>) -> bool { let mut packet: Buffer<{ PACKET_SIZE_MAX }> = Buffer::new(); let time_ticks = ci.time_ticks(); @@ -481,13 +481,13 @@ impl Peer { /// Called every INTERVAL during background tasks. #[inline(always)] - pub(crate) fn call_every_interval(&self, ct: &CI, time_ticks: i64) {} + pub(crate) fn call_every_interval(&self, ct: &CI, time_ticks: i64) {} #[inline(always)] - fn receive_hello(&self, ci: &CI, node: &Node, time_ticks: i64, source_path: &Arc, payload: &Buffer<{ PACKET_SIZE_MAX }>) {} + fn receive_hello(&self, ci: &CI, node: &Node, time_ticks: i64, source_path: &Arc, payload: &Buffer<{ PACKET_SIZE_MAX }>) {} #[inline(always)] - fn receive_error(&self, ci: &CI, ph: &PH, node: &Node, time_ticks: i64, source_path: &Arc, forward_secrecy: bool, extended_authentication: bool, payload: &Buffer<{ PACKET_SIZE_MAX }>) { + fn receive_error(&self, ci: &CI, ph: &PH, node: &Node, time_ticks: i64, source_path: &Arc, forward_secrecy: bool, extended_authentication: bool, payload: &Buffer<{ PACKET_SIZE_MAX }>) { let mut cursor: usize = 0; let _ = payload.read_struct::(&mut cursor).map(|error_header| { let in_re_message_id = u64::from_ne_bytes(error_header.in_re_message_id); @@ -503,7 +503,7 @@ impl Peer { } #[inline(always)] - fn receive_ok(&self, ci: &CI, ph: &PH, node: &Node, time_ticks: i64, source_path: &Arc, forward_secrecy: bool, extended_authentication: bool, payload: &Buffer<{ PACKET_SIZE_MAX }>) { + fn receive_ok(&self, ci: &CI, ph: &PH, node: &Node, time_ticks: i64, source_path: &Arc, forward_secrecy: bool, extended_authentication: bool, payload: &Buffer<{ PACKET_SIZE_MAX }>) { let mut cursor: usize = 0; let _ = payload.read_struct::(&mut cursor).map(|ok_header| { let in_re_message_id = u64::from_ne_bytes(ok_header.in_re_message_id); @@ -523,19 +523,19 @@ impl Peer { } #[inline(always)] - fn receive_whois(&self, ci: &CI, node: &Node, time_ticks: i64, source_path: &Arc, payload: &Buffer<{ PACKET_SIZE_MAX }>) {} + fn receive_whois(&self, ci: &CI, node: &Node, time_ticks: i64, source_path: &Arc, payload: &Buffer<{ PACKET_SIZE_MAX }>) {} #[inline(always)] - fn receive_rendezvous(&self, ci: &CI, node: &Node, time_ticks: i64, source_path: &Arc, payload: &Buffer<{ PACKET_SIZE_MAX }>) {} + fn receive_rendezvous(&self, ci: &CI, node: &Node, time_ticks: i64, source_path: &Arc, payload: &Buffer<{ PACKET_SIZE_MAX }>) {} #[inline(always)] - fn receive_echo(&self, ci: &CI, node: &Node, time_ticks: i64, source_path: &Arc, payload: &Buffer<{ PACKET_SIZE_MAX }>) {} + fn receive_echo(&self, ci: &CI, node: &Node, time_ticks: i64, source_path: &Arc, payload: &Buffer<{ PACKET_SIZE_MAX }>) {} #[inline(always)] - fn receive_push_direct_paths(&self, ci: &CI, node: &Node, time_ticks: i64, source_path: &Arc, payload: &Buffer<{ PACKET_SIZE_MAX }>) {} + fn receive_push_direct_paths(&self, ci: &CI, node: &Node, time_ticks: i64, source_path: &Arc, payload: &Buffer<{ PACKET_SIZE_MAX }>) {} #[inline(always)] - fn receive_user_message(&self, ci: &CI, node: &Node, time_ticks: i64, source_path: &Arc, payload: &Buffer<{ PACKET_SIZE_MAX }>) {} + fn receive_user_message(&self, ci: &CI, node: &Node, time_ticks: i64, source_path: &Arc, payload: &Buffer<{ PACKET_SIZE_MAX }>) {} /// Get current best path or None if there are no direct paths to this peer. #[inline(always)] diff --git a/zerotier-network-hypervisor/src/vl1/protocol.rs b/zerotier-network-hypervisor/src/vl1/protocol.rs index 6da39813d..b1969f8bf 100644 --- a/zerotier-network-hypervisor/src/vl1/protocol.rs +++ b/zerotier-network-hypervisor/src/vl1/protocol.rs @@ -39,7 +39,10 @@ pub const KBKDF_KEY_USAGE_LABEL_AES_GMAC_SIV_K0: u8 = b'0'; pub const KBKDF_KEY_USAGE_LABEL_AES_GMAC_SIV_K1: u8 = b'1'; /// KBKDF usage label for acknowledgement of a shared secret. -pub const KBKDF_KEY_USAGE_LABEL_EPHEMERAL_RATCHET: u8 = b'E'; +pub const KBKDF_KEY_USAGE_LABEL_EPHEMERAL_RATCHET: u8 = b'e'; + +/// KBKDF usage label for generating the ratchet state ID (which is not actually a key). +pub const KBKDF_KEY_USAGE_LABEL_EPHEMERAL_RATCHET_STATE_ID: u8 = b'E'; /// Try to re-key ephemeral keys after this time. pub const EPHEMERAL_SECRET_REKEY_AFTER_TIME: i64 = 300000; // 5 minutes diff --git a/zerotier-network-hypervisor/src/vl1/symmetricsecret.rs b/zerotier-network-hypervisor/src/vl1/symmetricsecret.rs index 9e91da95e..f5a325607 100644 --- a/zerotier-network-hypervisor/src/vl1/symmetricsecret.rs +++ b/zerotier-network-hypervisor/src/vl1/symmetricsecret.rs @@ -16,7 +16,8 @@ use zerotier_core_crypto::secret::Secret; use crate::util::pool::{Pool, PoolFactory}; use crate::vl1::protocol::*; -pub struct AesGmacSivPoolFactory(Secret, Secret); +/// Pool of reusable AES-GMAC-SIV instances. +pub(crate) struct AesGmacSivPoolFactory(Secret, Secret); impl PoolFactory for AesGmacSivPoolFactory { #[inline(always)] @@ -29,7 +30,7 @@ impl PoolFactory for AesGmacSivPoolFactory { /// A symmetric secret key negotiated between peers. /// /// This contains the key and several sub-keys and ciphers keyed with sub-keys. -pub struct SymmetricSecret { +pub(crate) struct SymmetricSecret { /// The root shared symmetric secret from which other keys are derived. pub key: Secret, diff --git a/zerotier-network-hypervisor/src/vl1/whoisqueue.rs b/zerotier-network-hypervisor/src/vl1/whoisqueue.rs index 3ef843edd..24ce83cc9 100644 --- a/zerotier-network-hypervisor/src/vl1/whoisqueue.rs +++ b/zerotier-network-hypervisor/src/vl1/whoisqueue.rs @@ -13,7 +13,7 @@ use parking_lot::Mutex; use crate::util::gate::IntervalGate; use crate::vl1::Address; use crate::vl1::fragmentedpacket::FragmentedPacket; -use crate::vl1::node::{Node, NodeInterface}; +use crate::vl1::node::{Node, VL1SystemInterface}; use crate::vl1::protocol::{WHOIS_RETRY_INTERVAL, WHOIS_MAX_WAITING_PACKETS, WHOIS_RETRY_MAX}; use crate::PacketBuffer; @@ -36,7 +36,7 @@ impl WhoisQueue { pub fn new() -> Self { Self(Mutex::new(HashMap::new())) } /// Launch or renew a WHOIS query and enqueue a packet to be processed when (if) it is received. - pub fn query(&self, node: &Node, ci: &CI, target: Address, packet: Option) { + pub fn query(&self, node: &Node, ci: &CI, target: Address, packet: Option) { let mut q = self.0.lock(); let qi = q.entry(target).or_insert_with(|| WhoisQueueItem { @@ -64,7 +64,7 @@ impl WhoisQueue { } /// Called every INTERVAL during background tasks. - pub fn call_every_interval(&self, node: &Node, ci: &CI, time_ticks: i64) { + pub fn call_every_interval(&self, node: &Node, ci: &CI, time_ticks: i64) { let mut targets: Vec
= Vec::new(); self.0.lock().retain(|target, qi| { if qi.retry_count < WHOIS_RETRY_MAX { @@ -82,7 +82,7 @@ impl WhoisQueue { } } - fn send_whois(&self, node: &Node, ci: &CI, targets: &[Address]) { + fn send_whois(&self, node: &Node, ci: &CI, targets: &[Address]) { todo!() } } diff --git a/zerotier-network-hypervisor/src/vl2/switch.rs b/zerotier-network-hypervisor/src/vl2/switch.rs index f2d29fe84..fbcfcf928 100644 --- a/zerotier-network-hypervisor/src/vl2/switch.rs +++ b/zerotier-network-hypervisor/src/vl2/switch.rs @@ -9,8 +9,8 @@ use std::sync::Arc; use crate::util::buffer::Buffer; -use crate::vl1::node::VL1PacketHandler; -use crate::vl1::{Peer, Path}; +use crate::vl1::node::VL1VirtualInterface; +use crate::vl1::{Peer, Path, Identity}; use crate::vl1::protocol::*; pub trait SwitchInterface { @@ -19,7 +19,7 @@ pub trait SwitchInterface { pub struct Switch { } -impl VL1PacketHandler for Switch { +impl VL1VirtualInterface for Switch { fn handle_packet(&self, peer: &Peer, source_path: &Arc, forward_secrecy: bool, extended_authentication: bool, verb: u8, payload: &Buffer<{ PACKET_SIZE_MAX }>) -> bool { false } @@ -31,6 +31,10 @@ impl VL1PacketHandler for Switch { fn handle_ok(&self, peer: &Peer, source_path: &Arc, forward_secrecy: bool, extended_authentication: bool, in_re_verb: u8, in_re_message_id: u64, payload: &Buffer<{ PACKET_SIZE_MAX }>, cursor: &mut usize) -> bool { false } + + fn has_trust_relationship(&self, id: &Identity) -> bool { + true + } } impl Switch { diff --git a/zerotier-system-service/src/service.rs b/zerotier-system-service/src/service.rs index 76668f089..ade02d8d5 100644 --- a/zerotier-system-service/src/service.rs +++ b/zerotier-system-service/src/service.rs @@ -13,7 +13,7 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use parking_lot::Mutex; use zerotier_network_hypervisor::{Interface, NetworkHypervisor}; -use zerotier_network_hypervisor::vl1::{Endpoint, Identity, IdentityType, Node, NodeInterface}; +use zerotier_network_hypervisor::vl1::{Endpoint, Identity, IdentityType, Node, VL1SystemInterface}; use zerotier_network_hypervisor::vl2::SwitchInterface; use crate::fastudpsocket::{fast_udp_socket_sendto, FastUDPRawOsSocket}; @@ -31,7 +31,7 @@ struct ServiceInterface { pub all_sockets_spin_ptr: AtomicUsize, } -impl NodeInterface for ServiceInterface { +impl VL1SystemInterface for ServiceInterface { fn event_node_is_up(&self) {} fn event_node_is_down(&self) {}