Much more Rust work on sync and hypervisor.

This commit is contained in:
Adam Ierymenko 2022-01-07 11:26:12 -05:00
parent 09d7e25254
commit 86c366f1bd
No known key found for this signature in database
GPG key ID: C8877CF2D7A5D7F3
19 changed files with 360 additions and 191 deletions

View file

@ -15,11 +15,11 @@ mod iblt;
mod config;
mod link;
pub(crate) fn ms_since_epoch() -> u64 {
pub fn ms_since_epoch() -> u64 {
std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_millis() as u64
}
pub(crate) fn ms_monotonic() -> u64 {
pub fn ms_monotonic() -> u64 {
std::time::Instant::now().elapsed().as_millis() as u64
}
@ -38,4 +38,5 @@ pub const IDENTITY_HASH_SIZE: usize = 48;
pub use config::Config;
pub use store::{Store, StorePutResult};
pub use memorystore::MemoryStore;
//pub use replicator::Replicator;

View file

@ -1,5 +1,6 @@
use std::mem::MaybeUninit;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::task::Poll;
use std::time::Duration;
use serde::{Deserialize, Serialize};
@ -7,59 +8,72 @@ use smol::io::{AsyncReadExt, AsyncWriteExt, BufReader, BufWriter};
use smol::lock::Mutex;
use smol::net::{SocketAddr, TcpStream};
use zerotier_core_crypto::gmac::SequentialNonceGMAC;
use zerotier_core_crypto::gmac::GMACStream;
use zerotier_core_crypto::hash::SHA384;
use zerotier_core_crypto::kbkdf::zt_kbkdf_hmac_sha384;
use zerotier_core_crypto::p521::{P521KeyPair, P521PublicKey};
use zerotier_core_crypto::secret::Secret;
use crate::{Config, io_timeout, ms_monotonic, ms_since_epoch, varint};
use crate::{Config, IDENTITY_HASH_SIZE, io_timeout, Store, varint};
use crate::protocol::*;
struct Output {
stream: BufWriter<TcpStream>,
gmac: Option<GMACStream>,
}
pub(crate) struct Link<'a, 'b, 'c, S: Store> {
pub remote_addr: SocketAddr,
pub connect_time: u64,
io_timeout: Duration,
node_secret: &'a P521KeyPair,
config: &'b Config,
store: &'c S,
remote_node_id: parking_lot::Mutex<Option<[u8; 48]>>,
reader: Mutex<BufReader<TcpStream>>,
writer: Mutex<Output>,
keepalive_period: u64,
last_send_time: AtomicU64,
max_message_size: usize,
authenticated: AtomicBool,
}
#[inline(always)]
fn decode_msgpack<'de, T: Deserialize<'de>>(data: &'de [u8]) -> smol::io::Result<T> {
rmp_serde::from_read_ref(data).map_err(|_| smol::io::Error::new(smol::io::ErrorKind::InvalidData, "invalid msgpack data"))
}
struct OutputStream {
stream: BufWriter<TcpStream>,
gmac: Option<SequentialNonceGMAC>,
#[inline(always)]
fn read_id_hash(bytes: &[u8]) -> smol::io::Result<&[u8; IDENTITY_HASH_SIZE]> {
if bytes.len() >= IDENTITY_HASH_SIZE {
Ok(unsafe { &*bytes.as_ptr().cast::<[u8; IDENTITY_HASH_SIZE]>() })
} else {
Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "invalid identity hash"))
}
}
/// A TCP link between this node and another.
pub(crate) struct Link<'a, 'b> {
node_secret: &'a P521KeyPair,
config: &'b Config,
remote_node_id: parking_lot::Mutex<Option<[u8; 48]>>,
reader: Mutex<BufReader<TcpStream>>,
writer: Mutex<OutputStream>,
pub remote_addr: SocketAddr,
pub connect_time: u64,
pub authenticated: AtomicBool,
keepalive_period: u64,
last_send_time: AtomicU64,
max_message_size: usize,
}
impl<'a, 'b> Link<'a, 'b> {
pub fn new(stream: TcpStream, remote_addr: SocketAddr, connect_time: u64, node_secret: &'a P521KeyPair, config: &'b Config) -> Self {
impl<'a, 'b, 'c, S: Store> Link<'a, 'b, 'c, S> {
pub fn new(stream: TcpStream, remote_addr: SocketAddr, connect_time: u64, node_secret: &'a P521KeyPair, config: &'b Config, store: &'c S) -> Self {
let _ = stream.set_nodelay(false);
let max_message_size = HELLO_SIZE_MAX.max(config.max_message_size);
let now_monotonic = store.monotonic_clock();
Self {
remote_addr,
connect_time,
io_timeout: Duration::from_secs(config.io_timeout),
node_secret,
config,
store,
remote_node_id: parking_lot::Mutex::new(None),
reader: Mutex::new(BufReader::with_capacity((max_message_size + 16).max(16384), stream.clone())),
writer: Mutex::new(OutputStream {
reader: Mutex::new(BufReader::with_capacity(65536, stream.clone())),
writer: Mutex::new(Output {
stream: BufWriter::with_capacity(max_message_size + 16, stream),
gmac: None
}),
remote_addr,
connect_time,
authenticated: AtomicBool::new(false),
keepalive_period: (config.io_timeout * 1000) / 2,
last_send_time: AtomicU64::new(ms_monotonic()),
max_message_size
last_send_time: AtomicU64::new(now_monotonic),
max_message_size,
authenticated: AtomicBool::new(false),
}
}
@ -67,37 +81,6 @@ impl<'a, 'b> Link<'a, 'b> {
/// Returns None if the remote node has not yet responded with HelloAck and been verified.
pub fn remote_node_id(&self) -> Option<[u8; 48]> { self.remote_node_id.lock().clone() }
/// Send message and increment outgoing GMAC nonce.
async fn write_message(&self, timeout: Duration, message_type: u8, message: &[u8]) -> smol::io::Result<()> {
let mut mac: [u8; 16] = unsafe { MaybeUninit::uninit().assume_init() };
let mt = [message_type];
let mut writer = self.writer.lock().await;
writer.gmac.as_mut().map_or_else(|| {
Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "link negotiation is not complete"))
}, |gmac| {
gmac.init_for_next_message();
gmac.update(&mt);
gmac.update(message);
gmac.finish(&mut mac);
Ok(())
})?;
io_timeout(timeout, writer.stream.write_all(&mt)).await?;
io_timeout(timeout, varint::async_write(&mut writer.stream, message.len() as u64)).await?;
io_timeout(timeout, writer.stream.write_all(message)).await?;
io_timeout(timeout, writer.stream.write_all(&mac)).await?;
io_timeout(timeout, writer.stream.flush()).await
}
/// Serialize object with msgpack and send, increment outgoing GMAC nonce.
async fn write_message_msgpack<T: Serialize>(&self, timeout: Duration, serialize_buf: &mut Vec<u8>, message_type: u8, message: &T) -> smol::io::Result<()> {
serialize_buf.clear();
rmp_serde::encode::write(serialize_buf, message).map_err(|_| smol::io::Error::new(smol::io::ErrorKind::InvalidData, "msgpack encode failure"))?;
self.write_message(timeout, message_type, serialize_buf.as_slice()).await
}
/// Send a keepalive if necessary.
pub async fn send_keepalive_if_needed(&self, now_monotonic: u64) {
if now_monotonic.saturating_sub(self.last_send_time.load(Ordering::Relaxed)) >= self.keepalive_period && self.authenticated.load(Ordering::Relaxed) {
@ -109,54 +92,88 @@ impl<'a, 'b> Link<'a, 'b> {
}
}
/// Launched as an async task for each new link.
pub async fn io_main(&self) -> smol::io::Result<()> {
async fn write_message(&self, message_type: u8, message: &[&[u8]]) -> smol::io::Result<()> {
let mut writer = self.writer.lock().await;
if writer.gmac.is_some() {
let mut mac: [u8; 16] = unsafe { MaybeUninit::uninit().assume_init() };
let mt = [message_type];
let gmac = writer.gmac.as_mut().unwrap();
gmac.init_for_next_message();
gmac.update(&mt);
let mut total_length = 0_usize;
for m in message.iter() {
total_length += (*m).len();
gmac.update(*m);
}
gmac.finish(&mut mac);
io_timeout(self.io_timeout, writer.stream.write_all(&mt)).await?;
io_timeout(self.io_timeout, varint::async_write(&mut writer.stream, total_length as u64)).await?;
for m in message.iter() {
io_timeout(self.io_timeout, writer.stream.write_all(*m)).await?;
}
io_timeout(self.io_timeout, writer.stream.write_all(&mac)).await?;
io_timeout(self.io_timeout, writer.stream.flush()).await
} else {
Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "link negotiation is not complete"))
}
}
async fn write_message_msgpack<T: Serialize>(&self, serialize_buf: &mut Vec<u8>, message_type: u8, message: &T) -> smol::io::Result<()> {
serialize_buf.clear();
rmp_serde::encode::write(serialize_buf, message).map_err(|_| smol::io::Error::new(smol::io::ErrorKind::InvalidData, "msgpack encode failure"))?;
self.write_message(message_type, &[serialize_buf.as_slice()]).await
}
pub(crate) async fn io_main(&self) -> smol::io::Result<()> {
// Reader is held here for the duration of the link's I/O loop.
let mut reader_mg = self.reader.lock().await;
let reader = &mut *reader_mg;
let mut read_buf: Vec<u8> = Vec::new();
read_buf.resize(self.max_message_size, 0);
let mut serialize_buf: Vec<u8> = Vec::with_capacity(4096);
let timeout = Duration::from_secs(self.config.io_timeout);
let mut tmp_buf: Vec<u8> = Vec::with_capacity(4096);
// (1) Send Hello and save the nonce and the hash of the raw Hello message for later HelloAck HMAC check.
let mut gmac_send_nonce_initial = [0_u8; 16];
zerotier_core_crypto::random::fill_bytes_secure(&mut gmac_send_nonce_initial);
let mut outgoing_nonce = [0_u8; HELLO_NONCE_SIZE];
zerotier_core_crypto::random::fill_bytes_secure(&mut outgoing_nonce);
let ephemeral_secret = P521KeyPair::generate(true).unwrap();
let sent_hello_hash = {
serialize_buf.clear();
let _ = rmp_serde::encode::write(&mut serialize_buf, &Hello {
tmp_buf.clear();
let _ = rmp_serde::encode::write(&mut tmp_buf, &Hello {
protocol_version: PROTOCOL_VERSION,
flags: 0,
clock: ms_since_epoch(),
clock: self.store.clock(),
domain: self.config.domain.as_str(),
nonce: &gmac_send_nonce_initial,
nonce: &outgoing_nonce,
p521_ecdh_ephemeral_key: ephemeral_secret.public_key_bytes(),
p521_ecdh_node_key: self.node_secret.public_key_bytes(),
}).unwrap();
let mut writer = self.writer.lock().await;
io_timeout(timeout, varint::async_write(&mut writer.stream, serialize_buf.len() as u64)).await?;
io_timeout(timeout, writer.stream.write_all(serialize_buf.as_slice())).await?;
io_timeout(timeout, writer.stream.flush()).await?;
io_timeout(self.io_timeout, varint::async_write(&mut writer.stream, tmp_buf.len() as u64)).await?;
io_timeout(self.io_timeout, writer.stream.write_all(tmp_buf.as_slice())).await?;
io_timeout(self.io_timeout, writer.stream.flush()).await?;
drop(writer);
SHA384::hash(serialize_buf.as_slice())
SHA384::hash(tmp_buf.as_slice())
};
self.last_send_time.store(self.store.monotonic_clock(), Ordering::Relaxed);
// (2) Read other side's HELLO and send ACK. Also do key agreement, initialize GMAC, etc.
let message_size = io_timeout(timeout, varint::async_read(reader)).await? as usize;
let message_size = io_timeout(self.io_timeout, varint::async_read(reader)).await? as usize;
if message_size > HELLO_SIZE_MAX {
return Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "message too large"));
}
let (mut gmac_receive, ack_key, remote_node_id) = {
let hello_buf = &mut read_buf.as_mut_slice()[0..message_size];
io_timeout(timeout, reader.read_exact(hello_buf)).await?;
io_timeout(self.io_timeout, reader.read_exact(hello_buf)).await?;
let received_hello_hash = SHA384::hash(hello_buf); // for ACK generation
let hello: Hello = decode_msgpack(hello_buf)?;
if hello.nonce.len() < 16 || hello.protocol_version != PROTOCOL_VERSION {
if hello.nonce.len() < HELLO_NONCE_SIZE || hello.protocol_version != PROTOCOL_VERSION {
return Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "invalid HELLO parameters"));
}
@ -170,24 +187,25 @@ impl<'a, 'b> Link<'a, 'b> {
if node_shared_key.is_none() || ephemeral_shared_key.is_none() {
return Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "key agreement failed"));
}
let shared_key = Secret(SHA384::hmac(&SHA384::hash(ephemeral_shared_key.unwrap().as_bytes()), node_shared_key.unwrap().as_bytes()));
let shared_base_key = Secret(SHA384::hmac(&SHA384::hash(ephemeral_shared_key.unwrap().as_bytes()), node_shared_key.unwrap().as_bytes()));
let gmac_key = zt_kbkdf_hmac_sha384(shared_key.as_bytes(), KBKDF_LABEL_GMAC, 0, 0);
let ack_key = zt_kbkdf_hmac_sha384(shared_key.as_bytes(), KBKDF_LABEL_HELLO_ACK_HMAC, 0, 0);
let shared_gmac_base_key = zt_kbkdf_hmac_sha384(shared_base_key.as_bytes(), KBKDF_LABEL_GMAC, 0, 0);
let gmac_receive_key = Secret(SHA384::hmac(&hello.nonce[0..48], &shared_gmac_base_key.0));
let gmac_send_key = Secret(SHA384::hmac(&outgoing_nonce[0..48], &shared_gmac_base_key.0));
let gmac_receive = GMACStream::new(&gmac_receive_key.0[0..32], &hello.nonce[48..64]);
self.writer.lock().await.gmac.replace(GMACStream::new(&gmac_send_key.0[0..32], &outgoing_nonce[48..64]));
let gmac_receive = SequentialNonceGMAC::new(&gmac_key.0[0..32], &hello.nonce[0..16]);
self.writer.lock().await.gmac.replace(SequentialNonceGMAC::new(&gmac_key.0[0..32], &gmac_send_nonce_initial));
let ack_hmac = SHA384::hmac(ack_key.as_bytes(), &received_hello_hash);
self.write_message_msgpack(timeout, &mut serialize_buf, MESSAGE_TYPE_HELLO_ACK, &HelloAck {
let shared_ack_key = zt_kbkdf_hmac_sha384(shared_base_key.as_bytes(), KBKDF_LABEL_HELLO_ACK_HMAC, 0, 0);
let ack_hmac = SHA384::hmac(shared_ack_key.as_bytes(), &received_hello_hash);
self.write_message_msgpack(&mut tmp_buf, MESSAGE_TYPE_HELLO_ACK, &HelloAck {
ack: &ack_hmac,
clock_echo: hello.clock
}).await?;
(gmac_receive, ack_key, SHA384::hash(hello.p521_ecdh_node_key))
(gmac_receive, shared_ack_key, SHA384::hash(hello.p521_ecdh_node_key))
};
self.last_send_time.store(ms_monotonic(), Ordering::Relaxed);
self.last_send_time.store(self.store.monotonic_clock(), Ordering::Relaxed);
// Done with ephemeral secret key, so forget it.
drop(ephemeral_secret);
@ -198,17 +216,15 @@ impl<'a, 'b> Link<'a, 'b> {
let mut message_type_buf = [0_u8; 1];
let mut authenticated = false;
loop {
io_timeout(timeout, reader.read_exact(&mut message_type_buf)).await?;
// NOP is a single byte keepalive, so skip. Otherwise handle actual messages.
io_timeout(self.io_timeout, reader.read_exact(&mut message_type_buf)).await?;
if message_type_buf[0] != MESSAGE_TYPE_KEEPALIVE {
let message_size = io_timeout(timeout, varint::async_read(reader)).await? as usize;
let message_size = io_timeout(self.io_timeout, varint::async_read(reader)).await? as usize;
if message_size > self.max_message_size {
return Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "message too large"));
}
let message_buf = &mut read_buf.as_mut_slice()[0..message_size];
io_timeout(timeout, reader.read_exact(message_buf)).await?;
io_timeout(timeout, reader.read_exact(&mut received_mac_buf)).await?;
io_timeout(self.io_timeout, reader.read_exact(message_buf)).await?;
io_timeout(self.io_timeout, reader.read_exact(&mut received_mac_buf)).await?;
gmac_receive.init_for_next_message();
gmac_receive.update(&message_type_buf);
@ -220,14 +236,11 @@ impl<'a, 'b> Link<'a, 'b> {
if authenticated {
match message_type_buf[0] {
MESSAGE_TYPE_HELLO_ACK => {
// Multiple HelloAck messages don't make sense.
},
MESSAGE_TYPE_OBJECTS => {},
MESSAGE_TYPE_HAVE_OBJECTS => {},
MESSAGE_TYPE_WANT_OBJECTS => {},
MESSAGE_TYPE_IBLT_SYNC_REQUEST => {},
MESSAGE_TYPE_IBLT_SYNC_DIGEST => {},
MESSAGE_TYPE_OBJECTS => self.do_objects(message_buf).await?,
MESSAGE_TYPE_HAVE_OBJECTS => self.do_have_objects(&mut tmp_buf, message_buf).await?,
MESSAGE_TYPE_WANT_OBJECTS => self.do_want_objects(message_buf).await?,
MESSAGE_TYPE_SYNC_REQUEST => self.do_sync_request(decode_msgpack(message_buf)?).await?,
MESSAGE_TYPE_IBLT_SYNC_DIGEST => self.do_iblt_sync_digest(decode_msgpack(message_buf)?).await?,
_ => {},
}
} else {
@ -248,4 +261,81 @@ impl<'a, 'b> Link<'a, 'b> {
}
}
}
async fn do_objects(&self, mut msg: &[u8]) -> smol::io::Result<()> {
while !msg.is_empty() {
let obj_size = varint::async_read(&mut msg).await? as usize;
if obj_size >= msg.len() {
return Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "object incomplete"));
}
let obj = &msg[0..obj_size];
msg = &msg[obj_size..];
match self.store.put(&SHA384::hash(obj), obj) {
crate::StorePutResult::Invalid => {
return Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "invalid object in stream"));
},
_ => {}
}
}
Ok(())
}
async fn do_have_objects(&self, tmp_buf: &mut Vec<u8>, mut msg: &[u8]) -> smol::io::Result<()> {
tmp_buf.clear();
varint::write(tmp_buf, self.store.clock());
let empty_tmp_buf_size = tmp_buf.len();
while !msg.is_empty() {
let id_hash = read_id_hash(msg)?;
if !self.store.have(id_hash) {
let _ = tmp_buf.write_all(id_hash);
}
msg = &msg[IDENTITY_HASH_SIZE..];
}
if tmp_buf.len() != empty_tmp_buf_size {
self.write_message(MESSAGE_TYPE_WANT_OBJECTS, &[tmp_buf.as_slice()]).await
} else {
Ok(())
}
}
async fn do_want_objects(&self, mut msg: &[u8]) -> smol::io::Result<()> {
let ref_time = varint::read(&mut msg);
if !ref_time.is_err() {
return Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "object incomplete"));
}
let ref_time = ref_time.unwrap().0;
let cap = (msg.len() / IDENTITY_HASH_SIZE) + 1;
let mut objects: Vec<S::Object> = Vec::with_capacity(cap);
while !msg.is_empty() {
let id_hash = read_id_hash(msg)?;
let _ = self.store.get(ref_time, id_hash).map(|obj| objects.push(obj));
msg = &msg[IDENTITY_HASH_SIZE..];
}
if !objects.is_empty() {
let mut sizes: Vec<varint::Encoded> = Vec::with_capacity(objects.len());
let mut slices: Vec<&[u8]> = Vec::with_capacity(objects.len() * 2);
for o in objects.iter() {
sizes.push(varint::Encoded::from(o.as_ref().len() as u64));
}
for i in 0..objects.len() {
slices.push(sizes.get(i).unwrap().as_ref());
slices.push(objects.get(i).unwrap().as_ref());
}
self.write_message(MESSAGE_TYPE_OBJECTS, slices.as_slice()).await
} else {
Ok(())
}
}
async fn do_sync_request(&self, sr: SyncRequest<'_>) -> smol::io::Result<()> {
Ok(())
}
async fn do_iblt_sync_digest(&self, sd: IBLTSyncDigest<'_>) -> smol::io::Result<()> {
Ok(())
}
}

View file

@ -12,7 +12,7 @@ use std::sync::Arc;
use parking_lot::Mutex;
use crate::{IDENTITY_HASH_SIZE, ms_since_epoch, Store, StorePutResult};
use crate::{IDENTITY_HASH_SIZE, ms_monotonic, ms_since_epoch, Store, StorePutResult};
/// A Store that stores all objects in memory, mostly for testing.
pub struct MemoryStore(Mutex<BTreeMap<[u8; IDENTITY_HASH_SIZE], (Arc<[u8]>, u64)>>);
@ -25,9 +25,10 @@ impl Store for MemoryStore {
type Object = Arc<[u8]>;
#[inline(always)]
fn local_time(&self) -> u64 {
ms_since_epoch()
}
fn clock(&self) -> u64 { ms_since_epoch() }
#[inline(always)]
fn monotonic_clock(&self) -> u64 { ms_monotonic() }
fn get(&self, reference_time: u64, identity_hash: &[u8; IDENTITY_HASH_SIZE]) -> Option<Self::Object> {
self.0.lock().get(identity_hash).and_then(|o| {
@ -39,17 +40,17 @@ impl Store for MemoryStore {
})
}
fn put(&self, reference_time: u64, identity_hash: &[u8; IDENTITY_HASH_SIZE], object: &[u8]) -> StorePutResult {
fn put(&self, identity_hash: &[u8; IDENTITY_HASH_SIZE], object: &[u8]) -> StorePutResult {
let mut result = StorePutResult::Duplicate;
let _ = self.0.lock().entry(identity_hash.clone()).or_insert_with(|| {
result = StorePutResult::Ok;
(object.to_vec().into(), reference_time)
(object.to_vec().into(), ms_since_epoch())
});
result
}
fn have(&self, reference_time: u64, identity_hash: &[u8; IDENTITY_HASH_SIZE]) -> bool {
self.0.lock().get(identity_hash).map_or(false, |o| (*o).1 <= reference_time)
fn have(&self, identity_hash: &[u8; IDENTITY_HASH_SIZE]) -> bool {
self.0.lock().contains_key(identity_hash)
}
fn total_count(&self, reference_time: u64) -> Option<u64> {

View file

@ -41,6 +41,9 @@ pub const KBKDF_LABEL_GMAC: u8 = b'G';
/// Sanity limit on the size of HELLO.
pub const HELLO_SIZE_MAX: usize = 4096;
/// Size of nonce sent with HELLO.
pub const HELLO_NONCE_SIZE: usize = 64;
/// Overall protocol version.
pub const PROTOCOL_VERSION: u16 = 1;
@ -61,7 +64,7 @@ pub const MESSAGE_TYPE_HAVE_OBJECTS: u8 = 3;
pub const MESSAGE_TYPE_WANT_OBJECTS: u8 = 4;
/// Request IBLT synchronization, payload is IBLTSyncRequest.
pub const MESSAGE_TYPE_IBLT_SYNC_REQUEST: u8 = 5;
pub const MESSAGE_TYPE_SYNC_REQUEST: u8 = 5;
/// IBLT sync digest, payload is IBLTSyncDigest.
pub const MESSAGE_TYPE_IBLT_SYNC_DIGEST: u8 = 6;
@ -79,7 +82,7 @@ pub struct Hello<'a> {
pub clock: u64,
/// The data set name ("domain") to which this node belongs.
pub domain: &'a str,
/// Random nonce, must be at least 16 bytes in length.
/// Random nonce, must be at least 64 bytes in length.
pub nonce: &'a [u8],
/// Random ephemeral ECDH session key.
pub p521_ecdh_ephemeral_key: &'a [u8],
@ -87,7 +90,7 @@ pub struct Hello<'a> {
pub p521_ecdh_node_key: &'a [u8],
}
/// Sent in response to Hello.
/// Sent in response to Hello and contains an acknowledgement HMAC for the shared key.
#[derive(Deserialize, Serialize)]
pub struct HelloAck<'a> {
/// HMAC-SHA384(KBKDF(key, KBKDF_LABEL_HELLO_ACK_HMAC), SHA384(original raw Hello))
@ -118,8 +121,14 @@ pub struct HelloAck<'a> {
/// simple calculation to be made with the sending node's total count to
/// estimate set difference density across the entire hash range.
#[derive(Deserialize, Serialize)]
pub struct IBLTSyncRequest {
/// Total number of hashes in entire data set (on our side).
pub struct SyncRequest<'a> {
/// Start of range. Right-pad with zeroes if too short.
pub range_start: &'a [u8],
/// End of range. Right-pad with zeroes if too short.
pub range_end: &'a [u8],
/// Total number of hashes in range.
pub count: u64,
/// Total number of hashes in entire data set.
pub total_count: u64,
/// Our clock to use as a reference time for filtering the data set (if applicable).
pub reference_time: u64,
@ -138,6 +147,6 @@ pub struct IBLTSyncDigest<'a> {
pub count: u64,
/// Total number of hashes in entire data set.
pub total_count: u64,
/// Reference time from IBLTSyncRequest or 0 if this is being sent synthetically.
/// Reference time from SyncRequest or 0 if this is being sent synthetically.
pub reference_time: u64,
}

View file

@ -25,17 +25,20 @@ pub trait Store: Sync + Send {
/// Type returned by get(), which can be anything that contains a byte slice.
type Object: AsRef<[u8]>;
/// Get the local time in milliseconds since Unix epoch.
fn local_time(&self) -> u64;
/// Get the current wall time in milliseconds since Unix epoch.
fn clock(&self) -> u64;
/// Get the number of milliseconds that have elapsed since some arbitrary event in the past (like system boot).
fn monotonic_clock(&self) -> u64;
/// Get an object from the database.
fn get(&self, reference_time: u64, identity_hash: &[u8; IDENTITY_HASH_SIZE]) -> Option<Self::Object>;
/// Store an entry in the database.
fn put(&self, reference_time: u64, identity_hash: &[u8; IDENTITY_HASH_SIZE], object: &[u8]) -> StorePutResult;
fn put(&self, identity_hash: &[u8; IDENTITY_HASH_SIZE], object: &[u8]) -> StorePutResult;
/// Check if we have an object by its identity hash.
fn have(&self, reference_time: u64, identity_hash: &[u8; IDENTITY_HASH_SIZE]) -> bool;
fn have(&self, identity_hash: &[u8; IDENTITY_HASH_SIZE]) -> bool;
/// Get the total count of objects.
fn total_count(&self, reference_time: u64) -> Option<u64>;

View file

@ -8,6 +8,8 @@
use smol::io::{AsyncReadExt, AsyncWrite, AsyncRead, AsyncWriteExt};
pub use zerotier_core_crypto::varint::*;
pub async fn async_write<W: AsyncWrite + Unpin>(w: &mut W, mut v: u64) -> smol::io::Result<()> {
let mut b = [0_u8; 10];
let mut i = 0;

View file

@ -38,13 +38,18 @@ impl GMAC {
}
/// A wrapper for GMAC with an incrementing 96-bit nonce.
/// The nonce here is incremented as a little-endian value.
/// This is for use with TCP streams. A maximum of 2^96 messages
/// should be sent or received with this, which is probably a large
/// enough limit to be safely ignored.
pub struct SequentialNonceGMAC(GMAC, u128);
///
/// This is designed for use to authenticate messages on an otherwise unencrypted
/// TCP connection. The nonce is treated as a 96-bit little-endian integer that
/// is incremented for each message. It should not be used beyond 2^96 messages
/// but that's a ludicrously large message count.
pub struct GMACStream(GMAC, u128);
impl SequentialNonceGMAC {
impl GMACStream {
/// Create a new streaming GMAC instance.
/// Key must be 16, 24, or 32 bytes in length. Initial nonce must be 16 bytes
/// in length, though only the first 12 are used. If either of these are not
/// sized properly this will panic.
#[inline(always)]
pub fn new(key: &[u8], initial_nonce: &[u8]) -> Self {
assert_eq!(initial_nonce.len(), 16);

View file

@ -8,9 +8,14 @@
use std::io::{Read, Write};
/// Write a variable length integer, which can consume up to 10 bytes.
pub fn write<W: Write>(w: &mut W, mut v: u64) -> std::io::Result<()> {
let mut b = [0_u8; 10];
pub const VARINT_MAX_SIZE_BYTES: usize = 10;
/// Encode an integer as a varint.
///
/// WARNING: if the supplied byte slice does not have at least 10 bytes available this may panic.
/// This is checked in debug mode by an assertion.
pub fn encode(b: &mut [u8], mut v: u64) -> usize {
debug_assert!(b.len() >= VARINT_MAX_SIZE_BYTES);
let mut i = 0;
loop {
if v > 0x7f {
@ -23,6 +28,14 @@ pub fn write<W: Write>(w: &mut W, mut v: u64) -> std::io::Result<()> {
break;
}
}
i
}
/// Write a variable length integer, which can consume up to 10 bytes.
#[inline(always)]
pub fn write<W: Write>(w: &mut W, v: u64) -> std::io::Result<()> {
let mut b = [0_u8; VARINT_MAX_SIZE_BYTES];
let i = encode(&mut b, v);
w.write_all(&b[0..i])
}
@ -46,6 +59,23 @@ pub fn read<R: Read>(r: &mut R) -> std::io::Result<(u64, usize)> {
}
}
/// A container for an encoded varint. Use as_ref() to get bytes.
pub struct Encoded([u8; VARINT_MAX_SIZE_BYTES], u8);
impl Encoded {
#[inline(always)]
pub fn from(v: u64) -> Encoded {
let mut e = Encoded([0_u8; VARINT_MAX_SIZE_BYTES], 0);
e.1 = encode(&mut e.0, v) as u8;
e
}
}
impl AsRef<[u8]> for Encoded {
#[inline(always)]
fn as_ref(&self) -> &[u8] { &self.0[0..(self.1 as usize)] }
}
#[cfg(test)]
mod tests {
use crate::varint::*;

View file

@ -11,11 +11,11 @@ use std::sync::Arc;
use std::time::Duration;
use crate::error::InvalidParameterError;
use crate::vl1::{Address, Identity, Endpoint, NodeInterface, Node};
use crate::vl1::{Address, Identity, Endpoint, VL1SystemInterface, Node};
use crate::vl2::{Switch, SwitchInterface};
use crate::{PacketBuffer, PacketBufferPool};
pub trait Interface: NodeInterface + SwitchInterface {}
pub trait Interface: VL1SystemInterface + SwitchInterface {}
pub struct NetworkHypervisor {
vl1: Node,
@ -51,7 +51,7 @@ impl NetworkHypervisor {
}
#[inline(always)]
pub fn wire_receive<CI: NodeInterface>(&self, ci: &CI, source_endpoint: &Endpoint, source_local_socket: Option<NonZeroI64>, source_local_interface: Option<NonZeroI64>, mut data: PacketBuffer) {
pub fn wire_receive<CI: VL1SystemInterface>(&self, ci: &CI, source_endpoint: &Endpoint, source_local_socket: Option<NonZeroI64>, source_local_interface: Option<NonZeroI64>, mut data: PacketBuffer) {
self.vl1.wire_receive(ci, &self.vl2, source_endpoint, source_local_socket, source_local_interface, data)
}
}

View file

@ -12,6 +12,7 @@ use std::convert::TryInto;
use zerotier_core_crypto::c25519::{C25519KeyPair, C25519_PUBLIC_KEY_SIZE};
use zerotier_core_crypto::hash::{SHA384_HASH_SIZE, SHA384};
use zerotier_core_crypto::kbkdf::zt_kbkdf_hmac_sha384;
use zerotier_core_crypto::p521::{P521KeyPair, P521_PUBLIC_KEY_SIZE, P521PublicKey};
use zerotier_core_crypto::random::SecureRandom;
use zerotier_core_crypto::secret::Secret;
@ -22,12 +23,14 @@ use crate::vl1::Address;
use crate::vl1::protocol::*;
use crate::vl1::symmetricsecret::SymmetricSecret;
const EPHEMERAL_PUBLIC_FLAG_HAVE_RATCHET_STATE: u8 = 0x01;
/// A set of ephemeral secret key pairs. Multiple algorithms are used.
pub struct EphemeralKeyPairSet {
previous_ratchet_state: Option<[u8; 16]>, // First 128 bits of SHA384(previous ratchet secret)
pub(crate) struct EphemeralKeyPairSet {
previous_ratchet_state: Option<[u8; 16]>, // Previous state of ratchet on which this agreement should build
c25519: C25519KeyPair, // Hipster DJB cryptography
p521: P521KeyPair, // US federal government cryptography
sidhp751: Option<SIDHEphemeralKeyPair>, // Post-quantum moon math cryptography
p521: P521KeyPair, // US Federal Government cryptography
sidhp751: Option<SIDHEphemeralKeyPair>, // Post-quantum moon math cryptography (not used in every ratchet tick)
}
impl EphemeralKeyPairSet {
@ -35,11 +38,6 @@ impl EphemeralKeyPairSet {
///
/// This contains key pairs for the asymmetric key agreement algorithms used and a
/// timestamp used to enforce TTL.
///
/// SIDH is only used once per ratchet sequence because it's much more CPU intensive
/// than ECDH. The threat model for SIDH is forward secrecy on the order of 5-15 years
/// from now when a quantum computer capable of attacking elliptic curve may exist,
/// it's incredibly unlikely that a p2p link would ever persist that long.
pub fn new(local_address: Address, remote_address: Address, previous_ephemeral_secret: Option<&EphemeralSymmetricSecret>) -> Self {
let (sidhp751, previous_ratchet_state) = previous_ephemeral_secret.map_or_else(|| {
(
@ -48,7 +46,15 @@ impl EphemeralKeyPairSet {
)
}, |previous_ephemeral_secret| {
(
None,
if previous_ephemeral_secret.ratchet_state[0] == 0 {
// We include SIDH with a probability of 1/256, which for a 5 minute re-key interval
// means SIDH will be included about every 24 hours. SIDH is slower and is intended
// to guard against long term warehousing for eventual cracking with a QC, so this
// should be good enough for that threat model.
Some(SIDHEphemeralKeyPair::generate(local_address, remote_address))
} else {
None
},
Some(previous_ephemeral_secret.ratchet_state.clone())
)
});
@ -67,11 +73,11 @@ impl EphemeralKeyPairSet {
pub fn public_bytes(&self) -> Vec<u8> {
let mut b: Vec<u8> = Vec::with_capacity(SHA384_HASH_SIZE + 8 + C25519_PUBLIC_KEY_SIZE + P521_PUBLIC_KEY_SIZE + SIDH_P751_PUBLIC_KEY_SIZE);
if self.previous_ratchet_state.is_none() {
b.push(0); // no flags
} else {
b.push(1); // flag 0x01: previous ephemeral secret hash included
if self.previous_ratchet_state.is_some() {
b.push(EPHEMERAL_PUBLIC_FLAG_HAVE_RATCHET_STATE);
let _ = b.write_all(self.previous_ratchet_state.as_ref().unwrap());
} else {
b.push(0);
}
b.push(EphemeralKeyAgreementAlgorithm::C25519 as u8);
@ -130,17 +136,17 @@ impl EphemeralKeyPairSet {
let mut fips_compliant_exchange = false; // ends up true if last algorithm was FIPS compliant
let mut other_public_bytes = other_public_bytes;
// Make sure the state of the ratchet matches on both ends. Otherwise it must restart.
// Check that the other side's ratchet state matches ours. If not the ratchet must restart.
if other_public_bytes.is_empty() {
return None;
}
if (other_public_bytes[0] & 1) == 0 {
if (other_public_bytes[0] & EPHEMERAL_PUBLIC_FLAG_HAVE_RATCHET_STATE) == 0 {
if previous_ephemeral_secret.is_some() {
return None;
}
other_public_bytes = &other_public_bytes[1..];
} else {
if other_public_bytes.len() < 17 || previous_ephemeral_secret.map_or(false, |previous_ephemeral_secret| other_public_bytes[1..17].ne(&previous_ephemeral_secret.ratchet_state)) {
if other_public_bytes.len() < 17 || previous_ephemeral_secret.map_or(false, |previous_ephemeral_secret| other_public_bytes[1..17] != previous_ephemeral_secret.ratchet_state) {
return None;
}
other_public_bytes = &other_public_bytes[17..];
@ -228,10 +234,9 @@ impl EphemeralKeyPairSet {
}
return if it_happened {
let ratchet_state = SHA384::hash(&key.0)[0..16].try_into().unwrap();
Some(EphemeralSymmetricSecret {
secret: SymmetricSecret::new(key),
ratchet_state,
ratchet_state: (&zt_kbkdf_hmac_sha384(&key.0, KBKDF_KEY_USAGE_LABEL_EPHEMERAL_RATCHET_STATE_ID, 0, 0).0[0..16]).try_into().unwrap(),
rekey_time: time_ticks + EPHEMERAL_SECRET_REKEY_AFTER_TIME,
expire_time: time_ticks + EPHEMERAL_SECRET_REJECT_AFTER_TIME,
c25519_ratchet_count,
@ -248,16 +253,26 @@ impl EphemeralKeyPairSet {
}
/// Symmetric secret representing a step in the ephemeral keying ratchet.
pub struct EphemeralSymmetricSecret {
pub(crate) struct EphemeralSymmetricSecret {
/// Current ephemeral secret key.
pub secret: SymmetricSecret,
/// First 16 bytes of SHA384(current ephemeral secret).
ratchet_state: [u8; 16],
/// Time at or after which we should start trying to re-key.
rekey_time: i64,
/// Time after which this key is no longer valid.
expire_time: i64,
/// Number of C25519 agreements so far in ratchet.
c25519_ratchet_count: u64,
/// Number of SIDH P-751 agreements so far in ratchet.
sidhp751_ratchet_count: u64,
/// Number of NIST P-521 ECDH agreements so far in ratchet.
nistp521_ratchet_count: u64,
/// Number of times this secret has been used to encrypt.
encrypt_uses: AtomicU32,
/// Number of times this secret has been used to decrypt.
decrypt_uses: AtomicU32,
/// True if most recent key exchange was NIST/FIPS compliant.
fips_compliant_exchange: bool,
}

View file

@ -32,6 +32,6 @@ pub use dictionary::Dictionary;
pub use inetaddress::InetAddress;
pub use peer::Peer;
pub use path::Path;
pub use node::{Node, NodeInterface};
pub use node::{Node, VL1SystemInterface};
pub use protocol::{PACKET_SIZE_MAX, PACKET_FRAGMENT_COUNT_MAX};

View file

@ -18,9 +18,9 @@ use zerotier_core_crypto::random::{next_u64_secure, SecureRandom};
use crate::{PacketBuffer, PacketBufferFactory, PacketBufferPool};
use crate::error::InvalidParameterError;
use crate::util::buffer::Buffer;
use crate::util::gate::IntervalGate;
use crate::util::pool::{Pool, Pooled};
use crate::util::buffer::Buffer;
use crate::vl1::{Address, Endpoint, Identity};
use crate::vl1::path::Path;
use crate::vl1::peer::Peer;
@ -28,7 +28,8 @@ use crate::vl1::protocol::*;
use crate::vl1::rootset::RootSet;
use crate::vl1::whoisqueue::{QueuedPacket, WhoisQueue};
pub trait NodeInterface {
/// Trait implemented by external code to handle events and provide an interface to the system or application.
pub trait VL1SystemInterface {
/// Node is up and ready for operation.
fn event_node_is_up(&self);
@ -88,7 +89,7 @@ pub trait NodeInterface {
///
/// This normally isn't used from outside this crate except for testing or if you want to harness VL1
/// for some entirely unrelated purpose.
pub trait VL1PacketHandler {
pub trait VL1VirtualInterface {
/// Handle a packet, returning true if it belonged to VL2.
///
/// If this is a VL2 packet, this must return true. True must be returned even if subsequent
@ -110,6 +111,12 @@ pub trait VL1PacketHandler {
/// Handle an OK, returing true if the OK was recognized.
fn handle_ok(&self, peer: &Peer, source_path: &Arc<Path>, forward_secrecy: bool, extended_authentication: bool, in_re_verb: u8, in_re_message_id: u64, payload: &Buffer<{ PACKET_SIZE_MAX }>, cursor: &mut usize) -> bool;
/// Check if this remote peer has a trust relationship with this node.
///
/// This is checked to determine if we should do things like make direct links ore respond to
/// various other VL1 messages.
fn has_trust_relationship(&self, id: &Identity) -> bool;
}
#[derive(Default)]
@ -120,7 +127,6 @@ struct BackgroundTaskIntervals {
}
pub struct Node {
pub(crate) instance_id: u64,
identity: Identity,
intervals: Mutex<BackgroundTaskIntervals>,
paths: DashMap<u128, Arc<Path>>,
@ -134,7 +140,7 @@ pub struct Node {
impl Node {
/// Create a new Node.
pub fn new<I: NodeInterface>(ci: &I, auto_generate_identity: bool) -> Result<Self, InvalidParameterError> {
pub fn new<I: VL1SystemInterface>(ci: &I, auto_generate_identity: bool) -> Result<Self, InvalidParameterError> {
let id = {
let id_str = ci.load_node_identity();
if id_str.is_none() {
@ -157,7 +163,6 @@ impl Node {
};
Ok(Self {
instance_id: next_u64_secure(),
identity: id,
intervals: Mutex::new(BackgroundTaskIntervals::default()),
paths: DashMap::new(),
@ -196,7 +201,7 @@ impl Node {
}
/// Run background tasks and return desired delay until next call in milliseconds.
pub fn do_background_tasks<I: NodeInterface>(&self, ci: &I) -> Duration {
pub fn do_background_tasks<I: VL1SystemInterface>(&self, ci: &I) -> Duration {
let mut intervals = self.intervals.lock();
let tt = ci.time_ticks();
@ -224,7 +229,7 @@ impl Node {
}
/// Called when a packet is received on the physical wire.
pub fn wire_receive<I: NodeInterface, PH: VL1PacketHandler>(&self, ci: &I, ph: &PH, source_endpoint: &Endpoint, source_local_socket: Option<NonZeroI64>, source_local_interface: Option<NonZeroI64>, mut data: PacketBuffer) {
pub fn wire_receive<I: VL1SystemInterface, PH: VL1VirtualInterface>(&self, ci: &I, ph: &PH, source_endpoint: &Endpoint, source_local_socket: Option<NonZeroI64>, source_local_interface: Option<NonZeroI64>, mut data: PacketBuffer) {
let fragment_header = data.struct_mut_at::<FragmentHeader>(0);
if fragment_header.is_ok() {
let fragment_header = fragment_header.unwrap();
@ -315,7 +320,7 @@ impl Node {
/// This is a canonicalizing function that returns a unique path object for every tuple
/// of endpoint, local socket, and local interface.
pub fn path(&self, ep: &Endpoint, local_socket: Option<NonZeroI64>, local_interface: Option<NonZeroI64>) -> Arc<Path> {
let key = Path::local_lookup_key(ep);
let key = Path::local_lookup_key(ep, local_socket, local_interface);
self.paths.get(&key).map_or_else(|| {
let p = Arc::new(Path::new(ep.clone(), local_socket, local_interface));
self.paths.insert(key, p.clone()).unwrap_or(p) // if another thread added one, return that instead

View file

@ -21,7 +21,7 @@ use crate::PacketBuffer;
use crate::util::{highwayhasher, U64NoOpHasher};
use crate::vl1::Endpoint;
use crate::vl1::fragmentedpacket::FragmentedPacket;
use crate::vl1::node::NodeInterface;
use crate::vl1::node::VL1SystemInterface;
use crate::vl1::protocol::*;
/// Keepalive interval for paths in milliseconds.
@ -171,7 +171,7 @@ impl Path {
pub(crate) const CALL_EVERY_INTERVAL_MS: i64 = PATH_KEEPALIVE_INTERVAL;
#[inline(always)]
pub(crate) fn call_every_interval<CI: NodeInterface>(&self, ct: &CI, time_ticks: i64) {
pub(crate) fn call_every_interval<CI: VL1SystemInterface>(&self, ct: &CI, time_ticks: i64) {
self.fragmented_packets.lock().retain(|packet_id, frag| (time_ticks - frag.ts_ticks) < PACKET_FRAGMENT_EXPIRATION);
}
}

View file

@ -243,7 +243,7 @@ impl Peer {
/// Receive, decrypt, authenticate, and process an incoming packet from this peer.
/// If the packet comes in multiple fragments, the fragments slice should contain all
/// those fragments after the main packet header and first chunk.
pub(crate) fn receive<CI: NodeInterface, PH: VL1PacketHandler>(&self, node: &Node, ci: &CI, ph: &PH, time_ticks: i64, source_endpoint: &Endpoint, source_path: &Arc<Path>, header: &PacketHeader, packet: &Buffer<{ PACKET_SIZE_MAX }>, fragments: &[Option<PacketBuffer>]) {
pub(crate) fn receive<CI: VL1SystemInterface, PH: VL1VirtualInterface>(&self, node: &Node, ci: &CI, ph: &PH, time_ticks: i64, source_endpoint: &Endpoint, source_path: &Arc<Path>, header: &PacketHeader, packet: &Buffer<{ PACKET_SIZE_MAX }>, fragments: &[Option<PacketBuffer>]) {
let _ = packet.as_bytes_starting_at(PACKET_VERB_INDEX).map(|packet_frag0_payload_bytes| {
let mut payload: Buffer<PACKET_SIZE_MAX> = unsafe { Buffer::new_without_memzero() };
let mut message_id = 0_u64;
@ -321,7 +321,7 @@ impl Peer {
});
}
fn send_to_endpoint<CI: NodeInterface>(&self, ci: &CI, endpoint: &Endpoint, local_socket: Option<NonZeroI64>, local_interface: Option<NonZeroI64>, packet: &Buffer<{ PACKET_SIZE_MAX }>) -> bool {
fn send_to_endpoint<CI: VL1SystemInterface>(&self, ci: &CI, endpoint: &Endpoint, local_socket: Option<NonZeroI64>, local_interface: Option<NonZeroI64>, packet: &Buffer<{ PACKET_SIZE_MAX }>) -> bool {
debug_assert!(packet.len() <= PACKET_SIZE_MAX);
debug_assert!(packet.len() >= PACKET_SIZE_MIN);
match endpoint {
@ -375,7 +375,7 @@ impl Peer {
///
/// This will go directly if there is an active path, or otherwise indirectly
/// via a root or some other route.
pub(crate) fn send<CI: NodeInterface>(&self, ci: &CI, node: &Node, time_ticks: i64, packet: &Buffer<{ PACKET_SIZE_MAX }>) -> bool {
pub(crate) fn send<CI: VL1SystemInterface>(&self, ci: &CI, node: &Node, time_ticks: i64, packet: &Buffer<{ PACKET_SIZE_MAX }>) -> bool {
self.path(node).map_or(false, |path| {
if self.send_to_endpoint(ci, path.endpoint(), path.local_socket(), path.local_interface(), packet) {
self.last_send_time_ticks.store(time_ticks, Ordering::Relaxed);
@ -394,7 +394,7 @@ impl Peer {
///
/// This doesn't fragment large packets since fragments are forwarded individually.
/// Intermediates don't need to adjust fragmentation.
pub(crate) fn forward<CI: NodeInterface>(&self, ci: &CI, time_ticks: i64, packet: &Buffer<{ PACKET_SIZE_MAX }>) -> bool {
pub(crate) fn forward<CI: VL1SystemInterface>(&self, ci: &CI, time_ticks: i64, packet: &Buffer<{ PACKET_SIZE_MAX }>) -> bool {
self.direct_path().map_or(false, |path| {
if ci.wire_send(path.endpoint(), path.local_socket(), path.local_interface(), &[packet.as_bytes()], 0) {
self.last_forward_time_ticks.store(time_ticks, Ordering::Relaxed);
@ -410,7 +410,7 @@ impl Peer {
///
/// If explicit_endpoint is not None the packet will be sent directly to this endpoint.
/// Otherwise it will be sent via the best direct or indirect path known.
pub(crate) fn send_hello<CI: NodeInterface>(&self, ci: &CI, node: &Node, explicit_endpoint: Option<&Endpoint>) -> bool {
pub(crate) fn send_hello<CI: VL1SystemInterface>(&self, ci: &CI, node: &Node, explicit_endpoint: Option<&Endpoint>) -> bool {
let mut packet: Buffer<{ PACKET_SIZE_MAX }> = Buffer::new();
let time_ticks = ci.time_ticks();
@ -481,13 +481,13 @@ impl Peer {
/// Called every INTERVAL during background tasks.
#[inline(always)]
pub(crate) fn call_every_interval<CI: NodeInterface>(&self, ct: &CI, time_ticks: i64) {}
pub(crate) fn call_every_interval<CI: VL1SystemInterface>(&self, ct: &CI, time_ticks: i64) {}
#[inline(always)]
fn receive_hello<CI: NodeInterface>(&self, ci: &CI, node: &Node, time_ticks: i64, source_path: &Arc<Path>, payload: &Buffer<{ PACKET_SIZE_MAX }>) {}
fn receive_hello<CI: VL1SystemInterface>(&self, ci: &CI, node: &Node, time_ticks: i64, source_path: &Arc<Path>, payload: &Buffer<{ PACKET_SIZE_MAX }>) {}
#[inline(always)]
fn receive_error<CI: NodeInterface, PH: VL1PacketHandler>(&self, ci: &CI, ph: &PH, node: &Node, time_ticks: i64, source_path: &Arc<Path>, forward_secrecy: bool, extended_authentication: bool, payload: &Buffer<{ PACKET_SIZE_MAX }>) {
fn receive_error<CI: VL1SystemInterface, PH: VL1VirtualInterface>(&self, ci: &CI, ph: &PH, node: &Node, time_ticks: i64, source_path: &Arc<Path>, forward_secrecy: bool, extended_authentication: bool, payload: &Buffer<{ PACKET_SIZE_MAX }>) {
let mut cursor: usize = 0;
let _ = payload.read_struct::<message_component_structs::ErrorHeader>(&mut cursor).map(|error_header| {
let in_re_message_id = u64::from_ne_bytes(error_header.in_re_message_id);
@ -503,7 +503,7 @@ impl Peer {
}
#[inline(always)]
fn receive_ok<CI: NodeInterface, PH: VL1PacketHandler>(&self, ci: &CI, ph: &PH, node: &Node, time_ticks: i64, source_path: &Arc<Path>, forward_secrecy: bool, extended_authentication: bool, payload: &Buffer<{ PACKET_SIZE_MAX }>) {
fn receive_ok<CI: VL1SystemInterface, PH: VL1VirtualInterface>(&self, ci: &CI, ph: &PH, node: &Node, time_ticks: i64, source_path: &Arc<Path>, forward_secrecy: bool, extended_authentication: bool, payload: &Buffer<{ PACKET_SIZE_MAX }>) {
let mut cursor: usize = 0;
let _ = payload.read_struct::<message_component_structs::OkHeader>(&mut cursor).map(|ok_header| {
let in_re_message_id = u64::from_ne_bytes(ok_header.in_re_message_id);
@ -523,19 +523,19 @@ impl Peer {
}
#[inline(always)]
fn receive_whois<CI: NodeInterface>(&self, ci: &CI, node: &Node, time_ticks: i64, source_path: &Arc<Path>, payload: &Buffer<{ PACKET_SIZE_MAX }>) {}
fn receive_whois<CI: VL1SystemInterface>(&self, ci: &CI, node: &Node, time_ticks: i64, source_path: &Arc<Path>, payload: &Buffer<{ PACKET_SIZE_MAX }>) {}
#[inline(always)]
fn receive_rendezvous<CI: NodeInterface>(&self, ci: &CI, node: &Node, time_ticks: i64, source_path: &Arc<Path>, payload: &Buffer<{ PACKET_SIZE_MAX }>) {}
fn receive_rendezvous<CI: VL1SystemInterface>(&self, ci: &CI, node: &Node, time_ticks: i64, source_path: &Arc<Path>, payload: &Buffer<{ PACKET_SIZE_MAX }>) {}
#[inline(always)]
fn receive_echo<CI: NodeInterface>(&self, ci: &CI, node: &Node, time_ticks: i64, source_path: &Arc<Path>, payload: &Buffer<{ PACKET_SIZE_MAX }>) {}
fn receive_echo<CI: VL1SystemInterface>(&self, ci: &CI, node: &Node, time_ticks: i64, source_path: &Arc<Path>, payload: &Buffer<{ PACKET_SIZE_MAX }>) {}
#[inline(always)]
fn receive_push_direct_paths<CI: NodeInterface>(&self, ci: &CI, node: &Node, time_ticks: i64, source_path: &Arc<Path>, payload: &Buffer<{ PACKET_SIZE_MAX }>) {}
fn receive_push_direct_paths<CI: VL1SystemInterface>(&self, ci: &CI, node: &Node, time_ticks: i64, source_path: &Arc<Path>, payload: &Buffer<{ PACKET_SIZE_MAX }>) {}
#[inline(always)]
fn receive_user_message<CI: NodeInterface>(&self, ci: &CI, node: &Node, time_ticks: i64, source_path: &Arc<Path>, payload: &Buffer<{ PACKET_SIZE_MAX }>) {}
fn receive_user_message<CI: VL1SystemInterface>(&self, ci: &CI, node: &Node, time_ticks: i64, source_path: &Arc<Path>, payload: &Buffer<{ PACKET_SIZE_MAX }>) {}
/// Get current best path or None if there are no direct paths to this peer.
#[inline(always)]

View file

@ -39,7 +39,10 @@ pub const KBKDF_KEY_USAGE_LABEL_AES_GMAC_SIV_K0: u8 = b'0';
pub const KBKDF_KEY_USAGE_LABEL_AES_GMAC_SIV_K1: u8 = b'1';
/// KBKDF usage label for acknowledgement of a shared secret.
pub const KBKDF_KEY_USAGE_LABEL_EPHEMERAL_RATCHET: u8 = b'E';
pub const KBKDF_KEY_USAGE_LABEL_EPHEMERAL_RATCHET: u8 = b'e';
/// KBKDF usage label for generating the ratchet state ID (which is not actually a key).
pub const KBKDF_KEY_USAGE_LABEL_EPHEMERAL_RATCHET_STATE_ID: u8 = b'E';
/// Try to re-key ephemeral keys after this time.
pub const EPHEMERAL_SECRET_REKEY_AFTER_TIME: i64 = 300000; // 5 minutes

View file

@ -16,7 +16,8 @@ use zerotier_core_crypto::secret::Secret;
use crate::util::pool::{Pool, PoolFactory};
use crate::vl1::protocol::*;
pub struct AesGmacSivPoolFactory(Secret<SHA384_HASH_SIZE>, Secret<SHA384_HASH_SIZE>);
/// Pool of reusable AES-GMAC-SIV instances.
pub(crate) struct AesGmacSivPoolFactory(Secret<SHA384_HASH_SIZE>, Secret<SHA384_HASH_SIZE>);
impl PoolFactory<AesGmacSiv> for AesGmacSivPoolFactory {
#[inline(always)]
@ -29,7 +30,7 @@ impl PoolFactory<AesGmacSiv> for AesGmacSivPoolFactory {
/// A symmetric secret key negotiated between peers.
///
/// This contains the key and several sub-keys and ciphers keyed with sub-keys.
pub struct SymmetricSecret {
pub(crate) struct SymmetricSecret {
/// The root shared symmetric secret from which other keys are derived.
pub key: Secret<SHA384_HASH_SIZE>,

View file

@ -13,7 +13,7 @@ use parking_lot::Mutex;
use crate::util::gate::IntervalGate;
use crate::vl1::Address;
use crate::vl1::fragmentedpacket::FragmentedPacket;
use crate::vl1::node::{Node, NodeInterface};
use crate::vl1::node::{Node, VL1SystemInterface};
use crate::vl1::protocol::{WHOIS_RETRY_INTERVAL, WHOIS_MAX_WAITING_PACKETS, WHOIS_RETRY_MAX};
use crate::PacketBuffer;
@ -36,7 +36,7 @@ impl WhoisQueue {
pub fn new() -> Self { Self(Mutex::new(HashMap::new())) }
/// Launch or renew a WHOIS query and enqueue a packet to be processed when (if) it is received.
pub fn query<CI: NodeInterface>(&self, node: &Node, ci: &CI, target: Address, packet: Option<QueuedPacket>) {
pub fn query<CI: VL1SystemInterface>(&self, node: &Node, ci: &CI, target: Address, packet: Option<QueuedPacket>) {
let mut q = self.0.lock();
let qi = q.entry(target).or_insert_with(|| WhoisQueueItem {
@ -64,7 +64,7 @@ impl WhoisQueue {
}
/// Called every INTERVAL during background tasks.
pub fn call_every_interval<CI: NodeInterface>(&self, node: &Node, ci: &CI, time_ticks: i64) {
pub fn call_every_interval<CI: VL1SystemInterface>(&self, node: &Node, ci: &CI, time_ticks: i64) {
let mut targets: Vec<Address> = Vec::new();
self.0.lock().retain(|target, qi| {
if qi.retry_count < WHOIS_RETRY_MAX {
@ -82,7 +82,7 @@ impl WhoisQueue {
}
}
fn send_whois<CI: NodeInterface>(&self, node: &Node, ci: &CI, targets: &[Address]) {
fn send_whois<CI: VL1SystemInterface>(&self, node: &Node, ci: &CI, targets: &[Address]) {
todo!()
}
}

View file

@ -9,8 +9,8 @@
use std::sync::Arc;
use crate::util::buffer::Buffer;
use crate::vl1::node::VL1PacketHandler;
use crate::vl1::{Peer, Path};
use crate::vl1::node::VL1VirtualInterface;
use crate::vl1::{Peer, Path, Identity};
use crate::vl1::protocol::*;
pub trait SwitchInterface {
@ -19,7 +19,7 @@ pub trait SwitchInterface {
pub struct Switch {
}
impl VL1PacketHandler for Switch {
impl VL1VirtualInterface for Switch {
fn handle_packet(&self, peer: &Peer, source_path: &Arc<Path>, forward_secrecy: bool, extended_authentication: bool, verb: u8, payload: &Buffer<{ PACKET_SIZE_MAX }>) -> bool {
false
}
@ -31,6 +31,10 @@ impl VL1PacketHandler for Switch {
fn handle_ok(&self, peer: &Peer, source_path: &Arc<Path>, forward_secrecy: bool, extended_authentication: bool, in_re_verb: u8, in_re_message_id: u64, payload: &Buffer<{ PACKET_SIZE_MAX }>, cursor: &mut usize) -> bool {
false
}
fn has_trust_relationship(&self, id: &Identity) -> bool {
true
}
}
impl Switch {

View file

@ -13,7 +13,7 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use parking_lot::Mutex;
use zerotier_network_hypervisor::{Interface, NetworkHypervisor};
use zerotier_network_hypervisor::vl1::{Endpoint, Identity, IdentityType, Node, NodeInterface};
use zerotier_network_hypervisor::vl1::{Endpoint, Identity, IdentityType, Node, VL1SystemInterface};
use zerotier_network_hypervisor::vl2::SwitchInterface;
use crate::fastudpsocket::{fast_udp_socket_sendto, FastUDPRawOsSocket};
@ -31,7 +31,7 @@ struct ServiceInterface {
pub all_sockets_spin_ptr: AtomicUsize,
}
impl NodeInterface for ServiceInterface {
impl VL1SystemInterface for ServiceInterface {
fn event_node_is_up(&self) {}
fn event_node_is_down(&self) {}