A bunch of simplification of logic.

This commit is contained in:
Adam Ierymenko 2022-01-14 17:14:35 -05:00
parent 07cfd12620
commit 99611f8781
No known key found for this signature in database
GPG key ID: C8877CF2D7A5D7F3
11 changed files with 180 additions and 184 deletions

View file

@ -11,11 +11,11 @@ use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use crate::error::InvalidParameterError; use crate::error::InvalidParameterError;
use crate::vl1::{Address, Identity, Endpoint, VL1SystemInterface, Node}; use crate::vl1::{Address, Identity, Endpoint, SystemInterface, Node};
use crate::vl2::{Switch, SwitchInterface}; use crate::vl2::{Switch, SwitchInterface};
use crate::{PacketBuffer, PacketBufferPool}; use crate::{PacketBuffer, PacketBufferPool};
pub trait Interface: VL1SystemInterface + SwitchInterface {} pub trait Interface: SystemInterface + SwitchInterface {}
pub struct NetworkHypervisor { pub struct NetworkHypervisor {
vl1: Node, vl1: Node,
@ -36,22 +36,18 @@ impl NetworkHypervisor {
#[inline(always)] #[inline(always)]
pub fn get_packet_buffer(&self) -> PacketBuffer { self.vl1.get_packet_buffer() } pub fn get_packet_buffer(&self) -> PacketBuffer { self.vl1.get_packet_buffer() }
/// Get a direct reference to the packet buffer pool.
#[inline(always)] #[inline(always)]
pub fn packet_buffer_pool(&self) -> &Arc<PacketBufferPool> { self.vl1.packet_buffer_pool() } pub fn address(&self) -> Address { self.vl1.identity.address }
#[inline(always)] #[inline(always)]
pub fn address(&self) -> Address { self.vl1.address() } pub fn identity(&self) -> &Identity { &self.vl1.identity }
#[inline(always)]
pub fn identity(&self) -> &Identity { self.vl1.identity() }
pub fn do_background_tasks<CI: Interface>(&self, ci: &CI) -> Duration { pub fn do_background_tasks<CI: Interface>(&self, ci: &CI) -> Duration {
self.vl1.do_background_tasks(ci) self.vl1.do_background_tasks(ci)
} }
#[inline(always)] #[inline(always)]
pub fn wire_receive<CI: VL1SystemInterface>(&self, ci: &CI, source_endpoint: &Endpoint, source_local_socket: Option<NonZeroI64>, source_local_interface: Option<NonZeroI64>, mut data: PacketBuffer) { pub fn wire_receive<CI: SystemInterface>(&self, ci: &CI, source_endpoint: &Endpoint, source_local_socket: Option<NonZeroI64>, source_local_interface: Option<NonZeroI64>, mut data: PacketBuffer) {
self.vl1.wire_receive(ci, &self.vl2, source_endpoint, source_local_socket, source_local_interface, data) self.vl1.wire_receive(ci, &self.vl2, source_endpoint, source_local_socket, source_local_interface, data)
} }
} }

View file

@ -42,6 +42,9 @@ lazy_static! {
#[inline(always)] #[inline(always)]
pub(crate) fn highwayhasher() -> highway::HighwayHasher { highway::HighwayHasher::new(highway::Key(HIGHWAYHASHER_KEY.clone())) } pub(crate) fn highwayhasher() -> highway::HighwayHasher { highway::HighwayHasher::new(highway::Key(HIGHWAYHASHER_KEY.clone())) }
#[inline(always)]
pub(crate) fn u128_from_2xu64_ne(x: [u64; 2]) -> u128 { unsafe { std::mem::transmute(x) } }
/// Non-cryptographic 64-bit bit mixer for things like local hashing. /// Non-cryptographic 64-bit bit mixer for things like local hashing.
#[inline(always)] #[inline(always)]
pub(crate) fn hash64_noncrypt(mut x: u64) -> u64 { pub(crate) fn hash64_noncrypt(mut x: u64) -> u64 {

View file

@ -32,7 +32,7 @@ pub const ALGORITHM_NISTP521ECDH: u8 = 0x02;
pub const ALGORITHM_SIDHP751: u8 = 0x04; pub const ALGORITHM_SIDHP751: u8 = 0x04;
pub enum EphemeralKeyAgreementError { pub enum EphemeralKeyAgreementError {
OldPublic, OutdatedPublic,
StateMismatch, StateMismatch,
InvalidData, InvalidData,
NoCompatibleAlgorithms NoCompatibleAlgorithms
@ -41,7 +41,7 @@ pub enum EphemeralKeyAgreementError {
impl Display for EphemeralKeyAgreementError { impl Display for EphemeralKeyAgreementError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self { match self {
EphemeralKeyAgreementError::OldPublic => f.write_str("old (replayed?) public key data from remote"), EphemeralKeyAgreementError::OutdatedPublic => f.write_str("old (replayed?) public key data from remote"),
EphemeralKeyAgreementError::StateMismatch => f.write_str("ratchet state mismatch"), EphemeralKeyAgreementError::StateMismatch => f.write_str("ratchet state mismatch"),
EphemeralKeyAgreementError::InvalidData => f.write_str("invalid public key data"), EphemeralKeyAgreementError::InvalidData => f.write_str("invalid public key data"),
EphemeralKeyAgreementError::NoCompatibleAlgorithms => f.write_str("no compatible algorithms in public key data") EphemeralKeyAgreementError::NoCompatibleAlgorithms => f.write_str("no compatible algorithms in public key data")
@ -110,9 +110,9 @@ impl EphemeralKeyPairSet {
let _ = varint::write(&mut b, self.previous_ratchet_count); let _ = varint::write(&mut b, self.previous_ratchet_count);
if self.state_hmac.is_some() { if let Some(state_hmac) = self.state_hmac.as_ref() {
b.push(EPHEMERAL_PUBLIC_FLAG_HAVE_RATCHET_STATE_HMAC); b.push(EPHEMERAL_PUBLIC_FLAG_HAVE_RATCHET_STATE_HMAC);
let _ = b.write_all(self.state_hmac.as_ref().unwrap()); let _ = b.write_all(state_hmac);
} else { } else {
b.push(0); b.push(0);
} }
@ -121,7 +121,7 @@ impl EphemeralKeyPairSet {
let _ = varint::write(&mut b, C25519_PUBLIC_KEY_SIZE as u64); let _ = varint::write(&mut b, C25519_PUBLIC_KEY_SIZE as u64);
let _ = b.write_all(&self.c25519.public_bytes()); let _ = b.write_all(&self.c25519.public_bytes());
let _ = self.sidhp751.as_ref().map(|sidhp751| { if let Some(sidhp751) = self.sidhp751.as_ref() {
b.push(ALGORITHM_SIDHP751); b.push(ALGORITHM_SIDHP751);
let _ = varint::write(&mut b, (SIDH_P751_PUBLIC_KEY_SIZE + 1) as u64); let _ = varint::write(&mut b, (SIDH_P751_PUBLIC_KEY_SIZE + 1) as u64);
b.push(sidhp751.role()); b.push(sidhp751.role());
@ -130,7 +130,7 @@ impl EphemeralKeyPairSet {
SIDHEphemeralKeyPair::Bob(b, _) => b.to_bytes() SIDHEphemeralKeyPair::Bob(b, _) => b.to_bytes()
}; };
let _ = b.write_all(&pk); let _ = b.write_all(&pk);
}); }
// FIPS note: any FIPS compliant ciphers must be last or the exchange will not be FIPS compliant. That's // FIPS note: any FIPS compliant ciphers must be last or the exchange will not be FIPS compliant. That's
// because we chain/ratchet using KHDF and non-FIPS ciphers are considered "salt" inputs for HKDF from a // because we chain/ratchet using KHDF and non-FIPS ciphers are considered "salt" inputs for HKDF from a
@ -152,6 +152,7 @@ impl EphemeralKeyPairSet {
/// the ratchet sequence, or rather a key derived from it for this purpose. /// the ratchet sequence, or rather a key derived from it for this purpose.
/// ///
/// Since ephemeral secrets should only be used once, this consumes the object. /// Since ephemeral secrets should only be used once, this consumes the object.
#[allow(non_snake_case)]
pub fn agree(self, time_ticks: i64, static_secret: &SymmetricSecret, previous_ephemeral_secret: Option<&EphemeralSymmetricSecret>, other_public_bytes: &[u8]) -> Result<EphemeralSymmetricSecret, EphemeralKeyAgreementError> { pub fn agree(self, time_ticks: i64, static_secret: &SymmetricSecret, previous_ephemeral_secret: Option<&EphemeralSymmetricSecret>, other_public_bytes: &[u8]) -> Result<EphemeralSymmetricSecret, EphemeralKeyAgreementError> {
let (mut key, mut ratchet_count, mut c25519_ratchet_count, mut sidhp751_ratchet_count, mut nistp521_ratchet_count) = previous_ephemeral_secret.map_or_else(|| { let (mut key, mut ratchet_count, mut c25519_ratchet_count, mut sidhp751_ratchet_count, mut nistp521_ratchet_count) = previous_ephemeral_secret.map_or_else(|| {
( (
@ -184,7 +185,7 @@ impl EphemeralKeyPairSet {
} }
let other_ratchet_count = other_ratchet_count.unwrap().0; let other_ratchet_count = other_ratchet_count.unwrap().0;
if other_ratchet_count < ratchet_count { if other_ratchet_count < ratchet_count {
return Err(EphemeralKeyAgreementError::OldPublic); return Err(EphemeralKeyAgreementError::OutdatedPublic);
} else if other_ratchet_count > ratchet_count { } else if other_ratchet_count > ratchet_count {
return Err(EphemeralKeyAgreementError::StateMismatch); return Err(EphemeralKeyAgreementError::StateMismatch);
} }
@ -204,7 +205,7 @@ impl EphemeralKeyPairSet {
other_public_bytes = &other_public_bytes[49..]; other_public_bytes = &other_public_bytes[49..];
} else { } else {
if self.state_hmac.is_some() { if self.state_hmac.is_some() {
return Err(EphemeralKeyAgreementError::OldPublic); return Err(EphemeralKeyAgreementError::OutdatedPublic);
} }
other_public_bytes = &other_public_bytes[1..]; other_public_bytes = &other_public_bytes[1..];
} }
@ -321,38 +322,26 @@ pub(crate) struct EphemeralSymmetricSecret {
/// Current ephemeral secret key. /// Current ephemeral secret key.
pub secret: SymmetricSecret, pub secret: SymmetricSecret,
/// Total number of ratchets that has occurred. /// Total number of ratchets that has occurred.
ratchet_count: u64, pub ratchet_count: u64,
/// Time at or after which we should start trying to re-key. /// Time at or after which we should start trying to re-key.
rekey_time: i64, pub rekey_time: i64,
/// Time after which this key is no longer valid. /// Time after which this key is no longer valid.
expire_time: i64, pub expire_time: i64,
/// Number of C25519 agreements so far in ratchet. /// Number of C25519 agreements so far in ratchet.
c25519_ratchet_count: u64, pub c25519_ratchet_count: u64,
/// Number of SIDH P-751 agreements so far in ratchet. /// Number of SIDH P-751 agreements so far in ratchet.
sidhp751_ratchet_count: u64, pub sidhp751_ratchet_count: u64,
/// Number of NIST P-521 ECDH agreements so far in ratchet. /// Number of NIST P-521 ECDH agreements so far in ratchet.
nistp521_ratchet_count: u64, pub nistp521_ratchet_count: u64,
/// Number of times this secret has been used to encrypt. /// Number of times this secret has been used to encrypt.
encrypt_uses: AtomicU32, pub encrypt_uses: AtomicU32,
/// Number of times this secret has been used to decrypt. /// Number of times this secret has been used to decrypt.
decrypt_uses: AtomicU32, pub decrypt_uses: AtomicU32,
/// True if most recent key exchange was NIST/FIPS compliant. /// True if most recent key exchange was NIST/FIPS compliant.
pub fips_compliant_exchange: bool, pub fips_compliant_exchange: bool,
} }
impl EphemeralSymmetricSecret { impl EphemeralSymmetricSecret {
#[inline(always)]
pub fn use_secret_to_encrypt(&self) -> &SymmetricSecret {
let _ = self.encrypt_uses.fetch_add(1, Ordering::Relaxed);
&self.secret
}
#[inline(always)]
pub fn use_secret_to_decrypt(&self) -> &SymmetricSecret {
let _ = self.decrypt_uses.fetch_add(1, Ordering::Relaxed);
&self.secret
}
#[inline(always)] #[inline(always)]
pub fn should_rekey(&self, time_ticks: i64) -> bool { pub fn should_rekey(&self, time_ticks: i64) -> bool {
time_ticks >= self.rekey_time || self.encrypt_uses.load(Ordering::Relaxed).max(self.decrypt_uses.load(Ordering::Relaxed)) >= EPHEMERAL_SECRET_REKEY_AFTER_USES time_ticks >= self.rekey_time || self.encrypt_uses.load(Ordering::Relaxed).max(self.decrypt_uses.load(Ordering::Relaxed)) >= EPHEMERAL_SECRET_REKEY_AFTER_USES

View file

@ -32,16 +32,21 @@ impl FragmentedPacket {
} }
} }
/// Add a fragment to this fragment set and return true if the packet appears complete. /// Add a fragment to this fragment set and return true if all fragments are present.
#[inline(always)] #[inline(always)]
pub fn add_fragment(&mut self, frag: PacketBuffer, no: u8, expecting: u8) -> bool { pub fn add_fragment(&mut self, frag: PacketBuffer, no: u8, expecting: u8) -> bool {
if no < PACKET_FRAGMENT_COUNT_MAX as u8 { self.frags.get_mut(no as usize).map_or(false, |entry| {
if self.frags[no as usize].replace(frag).is_none() { // Note that a duplicate fragment just gets silently replaced. This shouldn't happen
self.have = self.have.wrapping_add(1); // unless a dupe occurred at the network level, in which case this is usually a
self.expecting |= expecting; // in valid streams expecting is either 0 or the (same) total // no-op event. There is no security implication since the whole packet gets MAC'd
return self.have == self.expecting && self.have < PACKET_FRAGMENT_COUNT_MAX as u8; // after assembly.
if entry.replace(frag).is_none() {
self.have += 1;
self.expecting |= expecting; // expecting is either 0 or the expected total
self.have == self.expecting
} else {
false
} }
} })
false
} }
} }

View file

@ -27,7 +27,7 @@ use zerotier_core_crypto::salsa::Salsa;
use zerotier_core_crypto::secret::Secret; use zerotier_core_crypto::secret::Secret;
use crate::error::InvalidFormatError; use crate::error::InvalidFormatError;
use crate::util::{array_range, highwayhasher}; use crate::util::{array_range, highwayhasher, u128_from_2xu64_ne};
use crate::util::buffer::Buffer; use crate::util::buffer::Buffer;
use crate::util::pool::{Pool, Pooled, PoolFactory}; use crate::util::pool::{Pool, Pooled, PoolFactory};
use crate::vl1::Address; use crate::vl1::Address;
@ -150,7 +150,7 @@ impl Identity {
Self { Self {
address, address,
fast_eq_hash: u128::from_ne_bytes(unsafe { *hh.finalize128().as_ptr().cast() }), fast_eq_hash: u128_from_2xu64_ne(hh.finalize128()),
c25519: c25519_pub, c25519: c25519_pub,
ed25519: ed25519_pub, ed25519: ed25519_pub,
p521: Some(IdentityP521Public { p521: Some(IdentityP521Public {
@ -471,7 +471,7 @@ impl Identity {
Ok(Identity { Ok(Identity {
address, address,
fast_eq_hash: u128::from_ne_bytes(unsafe { *hh.finalize128().as_ptr().cast() }), fast_eq_hash: u128_from_2xu64_ne(hh.finalize128()),
c25519: x25519_public.0.clone(), c25519: x25519_public.0.clone(),
ed25519: x25519_public.1.clone(), ed25519: x25519_public.1.clone(),
p521: if p521_ecdh_ecdsa_public.is_some() { p521: if p521_ecdh_ecdsa_public.is_some() {
@ -617,7 +617,7 @@ impl FromStr for Identity {
Ok(Identity { Ok(Identity {
address, address,
fast_eq_hash: u128::from_ne_bytes(unsafe { *hh.finalize128().as_ptr().cast() }), fast_eq_hash: u128_from_2xu64_ne(hh.finalize128()),
c25519: keys[0].as_slice()[0..32].try_into().unwrap(), c25519: keys[0].as_slice()[0..32].try_into().unwrap(),
ed25519: keys[0].as_slice()[32..64].try_into().unwrap(), ed25519: keys[0].as_slice()[32..64].try_into().unwrap(),
p521: if keys[2].is_empty() { p521: if keys[2].is_empty() {

View file

@ -31,6 +31,6 @@ pub use dictionary::Dictionary;
pub use inetaddress::InetAddress; pub use inetaddress::InetAddress;
pub use peer::Peer; pub use peer::Peer;
pub use path::Path; pub use path::Path;
pub use node::{Node, VL1SystemInterface}; pub use node::{Node, SystemInterface};
pub use protocol::{PACKET_SIZE_MAX, PACKET_FRAGMENT_COUNT_MAX}; pub use protocol::{PACKET_SIZE_MAX, PACKET_FRAGMENT_COUNT_MAX};

View file

@ -8,14 +8,12 @@
use std::num::NonZeroI64; use std::num::NonZeroI64;
use std::str::FromStr; use std::str::FromStr;
use std::sync::Arc; use std::sync::{Arc, Weak};
use std::time::Duration; use std::time::Duration;
use dashmap::DashMap; use dashmap::DashMap;
use parking_lot::Mutex; use parking_lot::Mutex;
use zerotier_core_crypto::random::{next_u64_secure, SecureRandom};
use crate::{PacketBuffer, PacketBufferFactory, PacketBufferPool}; use crate::{PacketBuffer, PacketBufferFactory, PacketBufferPool};
use crate::error::InvalidParameterError; use crate::error::InvalidParameterError;
use crate::util::buffer::Buffer; use crate::util::buffer::Buffer;
@ -28,7 +26,10 @@ use crate::vl1::protocol::*;
use crate::vl1::whoisqueue::{QueuedPacket, WhoisQueue}; use crate::vl1::whoisqueue::{QueuedPacket, WhoisQueue};
/// Trait implemented by external code to handle events and provide an interface to the system or application. /// Trait implemented by external code to handle events and provide an interface to the system or application.
pub trait VL1SystemInterface { ///
/// These methods are basically callbacks that the core calls to request or transmit things. They are called
/// during calls to things like wire_recieve() and do_background_tasks().
pub trait SystemInterface: Sync + Send {
/// Node is up and ready for operation. /// Node is up and ready for operation.
fn event_node_is_up(&self); fn event_node_is_up(&self);
@ -84,21 +85,12 @@ pub trait VL1SystemInterface {
fn time_clock(&self) -> i64; fn time_clock(&self) -> i64;
} }
/// Trait implemented by VL2 to handle messages after they are unwrapped by VL1. /// Interface between VL1 and higher/inner protocol layers.
/// ///
/// This normally isn't used from outside this crate except for testing or if you want to harness VL1 /// This is implemented by Switch in VL2. It's usually not used outside of VL2 in the core but
/// for some entirely unrelated purpose. /// it could also be implemented for testing or "off label" use of VL1.
pub trait VL1VirtualInterface { pub trait VL1VirtualInterface: Sync + Send {
/// Handle a packet, returning true if it belonged to VL2. /// Handle a packet, returning true if it was handled by the next layer.
///
/// If this is a VL2 packet, this must return true. True must be returned even if subsequent
/// logic determines that the VL2 packet is not valid or if it is rejected due to lack of
/// security credentials.
///
/// That's because VL1 calls this before matching the packet's verb against VL1 verbs. This
/// is done to reduce the number of CPU branches between packet receive and the performance
/// critical handling of virtual network frames. A return value of true here indicates that
/// the packet was handled, and false means it may be a VL1 packet.
/// ///
/// Do not attempt to handle OK or ERROR. Instead implement handle_ok() and handle_error(). /// Do not attempt to handle OK or ERROR. Instead implement handle_ok() and handle_error().
/// The return values of these must follow the same semantic of returning true if the message /// The return values of these must follow the same semantic of returning true if the message
@ -126,20 +118,34 @@ struct BackgroundTaskIntervals {
} }
pub struct Node { pub struct Node {
pub(crate) instance_id: u64, /// A random ID generated to identify this particular running instance.
identity: Identity, pub instance_id: u64,
/// This node's identity and permanent keys.
pub identity: Identity,
/// Interval latches for periodic background tasks.
intervals: Mutex<BackgroundTaskIntervals>, intervals: Mutex<BackgroundTaskIntervals>,
paths: DashMap<u128, Arc<Path>>,
/// Canonicalized network paths, held as Weak<> to be automatically cleaned when no longer in use.
paths: DashMap<u128, Weak<Path>>,
/// Peers with which we are currently communicating.
peers: DashMap<Address, Arc<Peer>>, peers: DashMap<Address, Arc<Peer>>,
/// This node's trusted roots, sorted in descending order of preference.
roots: Mutex<Vec<Arc<Peer>>>, roots: Mutex<Vec<Arc<Peer>>>,
/// Identity lookup queue, also holds packets waiting on a lookup.
whois: WhoisQueue, whois: WhoisQueue,
/// Reusable network buffer pool.
buffer_pool: Arc<PacketBufferPool>, buffer_pool: Arc<PacketBufferPool>,
secure_prng: SecureRandom,
} }
impl Node { impl Node {
/// Create a new Node. /// Create a new Node.
pub fn new<I: VL1SystemInterface>(ci: &I, auto_generate_identity: bool) -> Result<Self, InvalidParameterError> { pub fn new<I: SystemInterface>(ci: &I, auto_generate_identity: bool) -> Result<Self, InvalidParameterError> {
let id = { let id = {
let id_str = ci.load_node_identity(); let id_str = ci.load_node_identity();
if id_str.is_none() { if id_str.is_none() {
@ -162,30 +168,21 @@ impl Node {
}; };
Ok(Self { Ok(Self {
instance_id: next_u64_secure(), instance_id: zerotier_core_crypto::random::next_u64_secure(),
identity: id, identity: id,
intervals: Mutex::new(BackgroundTaskIntervals::default()), intervals: Mutex::new(BackgroundTaskIntervals::default()),
paths: DashMap::new(), paths: DashMap::with_capacity(128),
peers: DashMap::new(), peers: DashMap::with_capacity(128),
roots: Mutex::new(Vec::new()), roots: Mutex::new(Vec::new()),
whois: WhoisQueue::new(), whois: WhoisQueue::new(),
buffer_pool: Arc::new(PacketBufferPool::new(64, PacketBufferFactory::new())), buffer_pool: Arc::new(PacketBufferPool::new(64, PacketBufferFactory::new())),
secure_prng: SecureRandom::get(),
}) })
} }
/// Get a packet buffer that will automatically check itself back into the pool on drop.
#[inline(always)] #[inline(always)]
pub fn get_packet_buffer(&self) -> PacketBuffer { self.buffer_pool.get() } pub fn get_packet_buffer(&self) -> PacketBuffer { self.buffer_pool.get() }
#[inline(always)]
pub fn packet_buffer_pool(&self) -> &Arc<PacketBufferPool> { &self.buffer_pool }
#[inline(always)]
pub fn address(&self) -> Address { self.identity.address }
#[inline(always)]
pub fn identity(&self) -> &Identity { &self.identity }
/// Get a peer by address. /// Get a peer by address.
pub fn peer(&self, a: Address) -> Option<Arc<Peer>> { self.peers.get(&a).map(|peer| peer.value().clone()) } pub fn peer(&self, a: Address) -> Option<Arc<Peer>> { self.peers.get(&a).map(|peer| peer.value().clone()) }
@ -200,19 +197,19 @@ impl Node {
} }
/// Run background tasks and return desired delay until next call in milliseconds. /// Run background tasks and return desired delay until next call in milliseconds.
pub fn do_background_tasks<I: VL1SystemInterface>(&self, ci: &I) -> Duration { ///
/// This should only be called periodically from a single thread, but that thread can be
/// different each time. Calling it concurrently won't crash but won't accomplish anything.
pub fn do_background_tasks<I: SystemInterface>(&self, ci: &I) -> Duration {
let mut intervals = self.intervals.lock(); let mut intervals = self.intervals.lock();
let tt = ci.time_ticks(); let tt = ci.time_ticks();
if intervals.whois.gate(tt) {
self.whois.call_every_interval(self, ci, tt);
}
if intervals.paths.gate(tt) { if intervals.paths.gate(tt) {
self.paths.retain(|_, path| { self.paths.retain(|_, path| {
path.call_every_interval(ci, tt); path.upgrade().map_or(false, |p| {
todo!(); p.call_every_interval(ci, tt);
true true
})
}); });
} }
@ -224,18 +221,18 @@ impl Node {
}); });
} }
Duration::from_millis(1000) if intervals.whois.gate(tt) {
self.whois.call_every_interval(self, ci, tt);
}
Duration::from_millis(WhoisQueue::INTERVAL.min(Path::CALL_EVERY_INTERVAL_MS).min(Peer::CALL_EVERY_INTERVAL_MS) as u64 / 4)
} }
/// Called when a packet is received on the physical wire. /// Called when a packet is received on the physical wire.
pub fn wire_receive<I: VL1SystemInterface, PH: VL1VirtualInterface>(&self, ci: &I, ph: &PH, source_endpoint: &Endpoint, source_local_socket: Option<NonZeroI64>, source_local_interface: Option<NonZeroI64>, mut data: PacketBuffer) { pub fn wire_receive<I: SystemInterface, PH: VL1VirtualInterface>(&self, ci: &I, ph: &PH, source_endpoint: &Endpoint, source_local_socket: Option<NonZeroI64>, source_local_interface: Option<NonZeroI64>, mut data: PacketBuffer) {
let fragment_header = data.struct_mut_at::<FragmentHeader>(0); if let Ok(fragment_header) = data.struct_mut_at::<FragmentHeader>(0) {
if fragment_header.is_ok() { if let Some(dest) = Address::from_bytes(&fragment_header.dest) {
let fragment_header = fragment_header.unwrap();
let dest = Address::from_bytes(&fragment_header.dest);
if dest.is_some() {
let time_ticks = ci.time_ticks(); let time_ticks = ci.time_ticks();
let dest = dest.unwrap();
if dest == self.identity.address { if dest == self.identity.address {
// Handle packets addressed to this node. // Handle packets addressed to this node.
@ -244,37 +241,28 @@ impl Node {
if fragment_header.is_fragment() { if fragment_header.is_fragment() {
let _ = path.receive_fragment(u64::from_ne_bytes(fragment_header.id), fragment_header.fragment_no(), fragment_header.total_fragments(), data, time_ticks).map(|assembled_packet| { if let Some(assembled_packet) = path.receive_fragment(u64::from_ne_bytes(fragment_header.id), fragment_header.fragment_no(), fragment_header.total_fragments(), data, time_ticks) {
if assembled_packet.frags[0].is_some() { if let Some(frag0) = assembled_packet.frags[0].as_ref() {
let frag0 = assembled_packet.frags[0].as_ref().unwrap();
let packet_header = frag0.struct_at::<PacketHeader>(0); let packet_header = frag0.struct_at::<PacketHeader>(0);
if packet_header.is_ok() { if packet_header.is_ok() {
let packet_header = packet_header.unwrap(); let packet_header = packet_header.unwrap();
let source = Address::from_bytes(&packet_header.src); if let Some(source) = Address::from_bytes(&packet_header.src) {
if source.is_some() { if let Some(peer) = self.peer(source) {
let source = source.unwrap(); peer.receive(self, ci, ph, time_ticks, source_endpoint, &path, &packet_header, frag0, &assembled_packet.frags[1..(assembled_packet.have as usize)]);
let peer = self.peer(source);
if peer.is_some() {
peer.unwrap().receive(self, ci, ph, time_ticks, source_endpoint, &path, &packet_header, frag0, &assembled_packet.frags[1..(assembled_packet.have as usize)]);
} else { } else {
self.whois.query(self, ci, source, Some(QueuedPacket::Fragmented(assembled_packet))); self.whois.query(self, ci, source, Some(QueuedPacket::Fragmented(assembled_packet)));
} }
} }
} }
} }
}); }
} else { } else {
let packet_header = data.struct_at::<PacketHeader>(0); if let Ok(packet_header) = data.struct_at::<PacketHeader>(0) {
if packet_header.is_ok() { if let Some(source) = Address::from_bytes(&packet_header.src) {
let packet_header = packet_header.unwrap(); if let Some(peer) = self.peer(source) {
let source = Address::from_bytes(&packet_header.src); peer.receive(self, ci, ph, time_ticks, source_endpoint, &path, &packet_header, data.as_ref(), &[]);
if source.is_some() {
let source = source.unwrap();
let peer = self.peer(source);
if peer.is_some() {
peer.unwrap().receive(self, ci, ph, time_ticks, source_endpoint, &path, &packet_header, data.as_ref(), &[]);
} else { } else {
self.whois.query(self, ci, source, Some(QueuedPacket::Unfragmented(data))); self.whois.query(self, ci, source, Some(QueuedPacket::Unfragmented(data)));
} }
@ -292,24 +280,24 @@ impl Node {
return; return;
} }
} else { } else {
let packet_header = data.struct_mut_at::<PacketHeader>(0); if let Ok(packet_header) = data.struct_mut_at::<PacketHeader>(0) {
if packet_header.is_ok() { if packet_header.increment_hops() > FORWARD_MAX_HOPS {
if packet_header.unwrap().increment_hops() > FORWARD_MAX_HOPS {
return; return;
} }
} else { } else {
return; return;
} }
} }
let _ = self.peer(dest).map(|peer| peer.forward(ci, time_ticks, data.as_ref())); if let Some(peer) = self.peer(dest) {
peer.forward(ci, time_ticks, data.as_ref());
}
} }
}; };
} }
} }
/// Get the current best root peer that we should use for WHOIS, relaying, etc. /// Get the current best root peer that we should use for WHOIS, relaying, etc.
pub fn root(&self) -> Option<Arc<Peer>> { self.roots.lock().first().map(|p| p.clone()) } pub fn root(&self) -> Option<Arc<Peer>> { self.roots.lock().first().cloned() }
/// Return true if a peer is a root. /// Return true if a peer is a root.
pub fn is_peer_root(&self, peer: &Peer) -> bool { self.roots.lock().iter().any(|p| Arc::as_ptr(p) == (peer as *const Peer)) } pub fn is_peer_root(&self, peer: &Peer) -> bool { self.roots.lock().iter().any(|p| Arc::as_ptr(p) == (peer as *const Peer)) }
@ -320,10 +308,14 @@ impl Node {
/// of endpoint, local socket, and local interface. /// of endpoint, local socket, and local interface.
pub fn path(&self, ep: &Endpoint, local_socket: Option<NonZeroI64>, local_interface: Option<NonZeroI64>) -> Arc<Path> { pub fn path(&self, ep: &Endpoint, local_socket: Option<NonZeroI64>, local_interface: Option<NonZeroI64>) -> Arc<Path> {
let key = Path::local_lookup_key(ep, local_socket, local_interface); let key = Path::local_lookup_key(ep, local_socket, local_interface);
self.paths.get(&key).map_or_else(|| { let mut path_entry = self.paths.entry(key).or_insert_with(|| Weak::new());
if let Some(path) = path_entry.value().upgrade() {
path
} else {
let p = Arc::new(Path::new(ep.clone(), local_socket, local_interface)); let p = Arc::new(Path::new(ep.clone(), local_socket, local_interface));
self.paths.insert(key, p.clone()).unwrap_or(p) // if another thread added one, return that instead *path_entry.value_mut() = Arc::downgrade(&p);
}, |path| path.value().clone()) p
}
} }
} }

View file

@ -22,7 +22,7 @@ use crate::PacketBuffer;
use crate::util::{array_range, highwayhasher, U64NoOpHasher}; use crate::util::{array_range, highwayhasher, U64NoOpHasher};
use crate::vl1::Endpoint; use crate::vl1::Endpoint;
use crate::vl1::fragmentedpacket::FragmentedPacket; use crate::vl1::fragmentedpacket::FragmentedPacket;
use crate::vl1::node::VL1SystemInterface; use crate::vl1::node::SystemInterface;
use crate::vl1::protocol::*; use crate::vl1::protocol::*;
/// Keepalive interval for paths in milliseconds. /// Keepalive interval for paths in milliseconds.
@ -129,7 +129,6 @@ impl Path {
} }
} }
// This is optimized for the fragmented case because that's the most common when transferring data.
if fp.entry(packet_id).or_insert_with(|| FragmentedPacket::new(time_ticks)).add_fragment(packet, fragment_no, fragment_expecting_count) { if fp.entry(packet_id).or_insert_with(|| FragmentedPacket::new(time_ticks)).add_fragment(packet, fragment_no, fragment_expecting_count) {
fp.remove(&packet_id) fp.remove(&packet_id)
} else { } else {
@ -173,7 +172,7 @@ impl Path {
pub(crate) const CALL_EVERY_INTERVAL_MS: i64 = PATH_KEEPALIVE_INTERVAL; pub(crate) const CALL_EVERY_INTERVAL_MS: i64 = PATH_KEEPALIVE_INTERVAL;
#[inline(always)] #[inline(always)]
pub(crate) fn call_every_interval<CI: VL1SystemInterface>(&self, ct: &CI, time_ticks: i64) { pub(crate) fn call_every_interval<CI: SystemInterface>(&self, ct: &CI, time_ticks: i64) {
self.fragmented_packets.lock().retain(|packet_id, frag| (time_ticks - frag.ts_ticks) < PACKET_FRAGMENT_EXPIRATION); self.fragmented_packets.lock().retain(|_, frag| (time_ticks - frag.ts_ticks) < PACKET_FRAGMENT_EXPIRATION);
} }
} }

View file

@ -13,6 +13,7 @@ use std::num::NonZeroI64;
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::{AtomicI64, AtomicU64, AtomicU8, Ordering}; use std::sync::atomic::{AtomicI64, AtomicU64, AtomicU8, Ordering};
use arc_swap::ArcSwapOption;
use parking_lot::Mutex; use parking_lot::Mutex;
use zerotier_core_crypto::hash::{SHA384, SHA384_HASH_SIZE}; use zerotier_core_crypto::hash::{SHA384, SHA384_HASH_SIZE};
@ -24,7 +25,7 @@ use zerotier_core_crypto::secret::Secret;
use crate::{PacketBuffer, VERSION_MAJOR, VERSION_MINOR, VERSION_PROTO, VERSION_REVISION}; use crate::{PacketBuffer, VERSION_MAJOR, VERSION_MINOR, VERSION_PROTO, VERSION_REVISION};
use crate::util::{array_range, u64_as_bytes}; use crate::util::{array_range, u64_as_bytes};
use crate::util::buffer::Buffer; use crate::util::buffer::Buffer;
use crate::vl1::{Endpoint, Identity, InetAddress, Path}; use crate::vl1::{Endpoint, Identity, InetAddress, Path, ephemeral};
use crate::vl1::ephemeral::EphemeralSymmetricSecret; use crate::vl1::ephemeral::EphemeralSymmetricSecret;
use crate::vl1::identity::{IDENTITY_ALGORITHM_ALL, IDENTITY_ALGORITHM_X25519}; use crate::vl1::identity::{IDENTITY_ALGORITHM_ALL, IDENTITY_ALGORITHM_X25519};
use crate::vl1::node::*; use crate::vl1::node::*;
@ -41,8 +42,8 @@ pub struct Peer {
// Static shared secret computed from agreement with identity. // Static shared secret computed from agreement with identity.
static_secret: SymmetricSecret, static_secret: SymmetricSecret,
// Latest ephemeral secret acknowledged with OK(HELLO). // Latest ephemeral secret or None if not yet negotiated.
ephemeral_secret: Mutex<Option<Arc<EphemeralSymmetricSecret>>>, ephemeral_secret: ArcSwapOption<EphemeralSymmetricSecret>,
// Paths sorted in descending order of quality / preference. // Paths sorted in descending order of quality / preference.
paths: Mutex<Vec<Arc<Path>>>, paths: Mutex<Vec<Arc<Path>>>,
@ -100,8 +101,8 @@ fn salsa_poly_create(secret: &SymmetricSecret, header: &PacketHeader, packet_siz
} }
/// Attempt AEAD packet encryption and MAC validation. /// Attempt AEAD packet encryption and MAC validation.
fn try_aead_decrypt(secret: &SymmetricSecret, packet_frag0_payload_bytes: &[u8], header: &PacketHeader, fragments: &[Option<PacketBuffer>], payload: &mut Buffer<PACKET_SIZE_MAX>, message_id: &mut u64) -> bool { fn try_aead_decrypt(secret: &SymmetricSecret, packet_frag0_payload_bytes: &[u8], header: &PacketHeader, fragments: &[Option<PacketBuffer>], payload: &mut Buffer<PACKET_SIZE_MAX>) -> Option<u64> {
packet_frag0_payload_bytes.get(0).map_or(false, |verb| { packet_frag0_payload_bytes.get(0).map_or(None, |verb| {
match header.cipher() { match header.cipher() {
CIPHER_NOCRYPT_POLY1305 => { CIPHER_NOCRYPT_POLY1305 => {
if (verb & VERB_MASK) == VERB_VL1_HELLO { if (verb & VERB_MASK) == VERB_VL1_HELLO {
@ -116,14 +117,13 @@ fn try_aead_decrypt(secret: &SymmetricSecret, packet_frag0_payload_bytes: &[u8],
let (_, mut poly) = salsa_poly_create(secret, header, total_packet_len); let (_, mut poly) = salsa_poly_create(secret, header, total_packet_len);
poly.update(payload.as_bytes()); poly.update(payload.as_bytes());
if poly.finish()[0..8].eq(&header.mac) { if poly.finish()[0..8].eq(&header.mac) {
*message_id = u64::from_ne_bytes(header.id); Some(u64::from_ne_bytes(header.id))
true
} else { } else {
false None
} }
} else { } else {
// Only HELLO is permitted without payload encryption. Drop other packet types if sent this way. // Only HELLO is permitted without payload encryption. Drop other packet types if sent this way.
false None
} }
} }
@ -142,10 +142,9 @@ fn try_aead_decrypt(secret: &SymmetricSecret, packet_frag0_payload_bytes: &[u8],
})); }));
} }
if poly.finish()[0..8].eq(&header.mac) { if poly.finish()[0..8].eq(&header.mac) {
*message_id = u64::from_ne_bytes(header.id); Some(u64::from_ne_bytes(header.id))
true
} else { } else {
false None
} }
} }
@ -163,16 +162,15 @@ fn try_aead_decrypt(secret: &SymmetricSecret, packet_frag0_payload_bytes: &[u8],
}) })
}); });
} }
aes.decrypt_finish().map_or(false, |tag| { aes.decrypt_finish().map_or(None, |tag| {
// AES-GMAC-SIV encrypts the packet ID too as part of its computation of a single // AES-GMAC-SIV encrypts the packet ID too as part of its computation of a single
// opaque 128-bit tag, so to get the original packet ID we have to grab it from the // opaque 128-bit tag, so to get the original packet ID we have to grab it from the
// decrypted tag. // decrypted tag.
*message_id = u64::from_ne_bytes(*array_range::<u8, 16, 0, 8>(tag)); Some(u64::from_ne_bytes(*array_range::<u8, 16, 0, 8>(tag)))
true
}) })
} }
_ => false, _ => None,
} }
}) })
} }
@ -182,11 +180,11 @@ impl Peer {
/// This only returns None if this_node_identity does not have its secrets or if some /// This only returns None if this_node_identity does not have its secrets or if some
/// fatal error occurs performing key agreement between the two identities. /// fatal error occurs performing key agreement between the two identities.
pub(crate) fn new(this_node_identity: &Identity, id: Identity) -> Option<Peer> { pub(crate) fn new(this_node_identity: &Identity, id: Identity) -> Option<Peer> {
this_node_identity.agree(&id).map(|static_secret| { this_node_identity.agree(&id).map(|static_secret| -> Peer {
Peer { Peer {
identity: id, identity: id,
static_secret: SymmetricSecret::new(static_secret), static_secret: SymmetricSecret::new(static_secret),
ephemeral_secret: Mutex::new(None), ephemeral_secret: ArcSwapOption::const_empty(),
paths: Mutex::new(Vec::new()), paths: Mutex::new(Vec::new()),
reported_local_ip: Mutex::new(None), reported_local_ip: Mutex::new(None),
last_send_time_ticks: AtomicI64::new(0), last_send_time_ticks: AtomicI64::new(0),
@ -211,23 +209,29 @@ impl Peer {
/// Receive, decrypt, authenticate, and process an incoming packet from this peer. /// Receive, decrypt, authenticate, and process an incoming packet from this peer.
/// If the packet comes in multiple fragments, the fragments slice should contain all /// If the packet comes in multiple fragments, the fragments slice should contain all
/// those fragments after the main packet header and first chunk. /// those fragments after the main packet header and first chunk.
pub(crate) fn receive<CI: VL1SystemInterface, PH: VL1VirtualInterface>(&self, node: &Node, ci: &CI, ph: &PH, time_ticks: i64, source_endpoint: &Endpoint, source_path: &Arc<Path>, header: &PacketHeader, packet: &Buffer<{ PACKET_SIZE_MAX }>, fragments: &[Option<PacketBuffer>]) { pub(crate) fn receive<CI: SystemInterface, PH: VL1VirtualInterface>(&self, node: &Node, ci: &CI, ph: &PH, time_ticks: i64, source_endpoint: &Endpoint, source_path: &Arc<Path>, header: &PacketHeader, packet: &Buffer<{ PACKET_SIZE_MAX }>, fragments: &[Option<PacketBuffer>]) {
let _ = packet.as_bytes_starting_at(PACKET_VERB_INDEX).map(|packet_frag0_payload_bytes| { let _ = packet.as_bytes_starting_at(PACKET_VERB_INDEX).map(|packet_frag0_payload_bytes| {
let mut payload: Buffer<PACKET_SIZE_MAX> = unsafe { Buffer::new_without_memzero() }; let mut payload: Buffer<PACKET_SIZE_MAX> = unsafe { Buffer::new_without_memzero() };
let mut message_id = 0_u64;
let ephemeral_secret: Option<Arc<EphemeralSymmetricSecret>> = self.ephemeral_secret.lock().clone(); let (forward_secrecy, mut message_id) = if let Some(ephemeral_secret) = self.ephemeral_secret.load_full() {
let forward_secrecy = if ephemeral_secret.map_or(false, |ephemeral_secret| try_aead_decrypt(&ephemeral_secret.secret, packet_frag0_payload_bytes, header, fragments, &mut payload, &mut message_id)) { if let Some(message_id) = try_aead_decrypt(&ephemeral_secret.secret, packet_frag0_payload_bytes, header, fragments, &mut payload) {
// Decrypted and authenticated by the ephemeral secret. ephemeral_secret.decrypt_uses.fetch_add(1, Ordering::Relaxed);
true (true, message_id)
} else {
(false, 0)
}
} else { } else {
// There is no ephemeral secret, or authentication with it failed. (false, 0)
unsafe { payload.set_size_unchecked(0); } };
if !try_aead_decrypt(&self.static_secret, packet_frag0_payload_bytes, header, fragments, &mut payload, &mut message_id) { if !forward_secrecy {
// Static secret also failed, reject packet. if let Some(message_id2) = try_aead_decrypt(&self.static_secret, packet_frag0_payload_bytes, header, fragments, &mut payload) {
message_id = message_id2;
} else {
// Packet failed to decrypt using either ephemeral or permament key, reject.
return; return;
} }
false }
}; debug_assert!(!payload.is_empty());
// --------------------------------------------------------------- // ---------------------------------------------------------------
// If we made it here it decrypted and passed authentication. // If we made it here it decrypted and passed authentication.
@ -237,7 +241,6 @@ impl Peer {
self.total_bytes_received.fetch_add((payload.len() + PACKET_HEADER_SIZE) as u64, Ordering::Relaxed); self.total_bytes_received.fetch_add((payload.len() + PACKET_HEADER_SIZE) as u64, Ordering::Relaxed);
source_path.log_receive_authenticated_packet(payload.len() + PACKET_HEADER_SIZE, source_endpoint); source_path.log_receive_authenticated_packet(payload.len() + PACKET_HEADER_SIZE, source_endpoint);
debug_assert!(!payload.is_empty()); // should be impossible since this fails in try_aead_decrypt()
let mut verb = payload.as_bytes()[0]; let mut verb = payload.as_bytes()[0];
// If this flag is set, the end of the payload is a full HMAC-SHA384 authentication // If this flag is set, the end of the payload is a full HMAC-SHA384 authentication
@ -285,11 +288,20 @@ impl Peer {
VERB_VL1_USER_MESSAGE => self.receive_user_message(ci, node, time_ticks, source_path, &payload), VERB_VL1_USER_MESSAGE => self.receive_user_message(ci, node, time_ticks, source_path, &payload),
_ => {} _ => {}
} }
} else {
#[cfg(debug)] {
if match verb {
VERB_VL1_NOP | VERB_VL1_HELLO | VERB_VL1_ERROR | VERB_VL1_OK | VERB_VL1_WHOIS | VERB_VL1_RENDEZVOUS | VERB_VL1_ECHO | VERB_VL1_PUSH_DIRECT_PATHS | VERB_VL1_USER_MESSAGE => true,
_ => false
} {
panic!("The next layer handled a VL1 packet! It should not do this.");
}
}
} }
}); });
} }
fn send_to_endpoint<CI: VL1SystemInterface>(&self, ci: &CI, endpoint: &Endpoint, local_socket: Option<NonZeroI64>, local_interface: Option<NonZeroI64>, packet: &Buffer<{ PACKET_SIZE_MAX }>) -> bool { fn send_to_endpoint<CI: SystemInterface>(&self, ci: &CI, endpoint: &Endpoint, local_socket: Option<NonZeroI64>, local_interface: Option<NonZeroI64>, packet: &Buffer<{ PACKET_SIZE_MAX }>) -> bool {
debug_assert!(packet.len() <= PACKET_SIZE_MAX); debug_assert!(packet.len() <= PACKET_SIZE_MAX);
debug_assert!(packet.len() >= PACKET_SIZE_MIN); debug_assert!(packet.len() >= PACKET_SIZE_MIN);
match endpoint { match endpoint {
@ -343,7 +355,7 @@ impl Peer {
/// ///
/// This will go directly if there is an active path, or otherwise indirectly /// This will go directly if there is an active path, or otherwise indirectly
/// via a root or some other route. /// via a root or some other route.
pub(crate) fn send<CI: VL1SystemInterface>(&self, ci: &CI, node: &Node, time_ticks: i64, packet: &Buffer<{ PACKET_SIZE_MAX }>) -> bool { pub(crate) fn send<CI: SystemInterface>(&self, ci: &CI, node: &Node, time_ticks: i64, packet: &Buffer<{ PACKET_SIZE_MAX }>) -> bool {
self.path(node).map_or(false, |path| { self.path(node).map_or(false, |path| {
if self.send_to_endpoint(ci, path.endpoint().as_ref(), path.local_socket(), path.local_interface(), packet) { if self.send_to_endpoint(ci, path.endpoint().as_ref(), path.local_socket(), path.local_interface(), packet) {
self.last_send_time_ticks.store(time_ticks, Ordering::Relaxed); self.last_send_time_ticks.store(time_ticks, Ordering::Relaxed);
@ -362,7 +374,7 @@ impl Peer {
/// ///
/// This doesn't fragment large packets since fragments are forwarded individually. /// This doesn't fragment large packets since fragments are forwarded individually.
/// Intermediates don't need to adjust fragmentation. /// Intermediates don't need to adjust fragmentation.
pub(crate) fn forward<CI: VL1SystemInterface>(&self, ci: &CI, time_ticks: i64, packet: &Buffer<{ PACKET_SIZE_MAX }>) -> bool { pub(crate) fn forward<CI: SystemInterface>(&self, ci: &CI, time_ticks: i64, packet: &Buffer<{ PACKET_SIZE_MAX }>) -> bool {
self.direct_path().map_or(false, |path| { self.direct_path().map_or(false, |path| {
if ci.wire_send(path.endpoint().as_ref(), path.local_socket(), path.local_interface(), &[packet.as_bytes()], 0) { if ci.wire_send(path.endpoint().as_ref(), path.local_socket(), path.local_interface(), &[packet.as_bytes()], 0) {
self.last_forward_time_ticks.store(time_ticks, Ordering::Relaxed); self.last_forward_time_ticks.store(time_ticks, Ordering::Relaxed);
@ -378,7 +390,7 @@ impl Peer {
/// ///
/// If explicit_endpoint is not None the packet will be sent directly to this endpoint. /// If explicit_endpoint is not None the packet will be sent directly to this endpoint.
/// Otherwise it will be sent via the best direct or indirect path known. /// Otherwise it will be sent via the best direct or indirect path known.
pub(crate) fn send_hello<CI: VL1SystemInterface>(&self, ci: &CI, node: &Node, explicit_endpoint: Option<&Endpoint>) -> bool { pub(crate) fn send_hello<CI: SystemInterface>(&self, ci: &CI, node: &Node, explicit_endpoint: Option<&Endpoint>) -> bool {
let mut packet: Buffer<{ PACKET_SIZE_MAX }> = Buffer::new(); let mut packet: Buffer<{ PACKET_SIZE_MAX }> = Buffer::new();
let time_ticks = ci.time_ticks(); let time_ticks = ci.time_ticks();
@ -387,7 +399,7 @@ impl Peer {
let packet_header: &mut PacketHeader = packet.append_struct_get_mut().unwrap(); let packet_header: &mut PacketHeader = packet.append_struct_get_mut().unwrap();
packet_header.id = message_id.to_ne_bytes(); // packet ID and message ID are the same when Poly1305 MAC is used packet_header.id = message_id.to_ne_bytes(); // packet ID and message ID are the same when Poly1305 MAC is used
packet_header.dest = self.identity.address.to_bytes(); packet_header.dest = self.identity.address.to_bytes();
packet_header.src = node.address().to_bytes(); packet_header.src = node.identity.address.to_bytes();
packet_header.flags_cipher_hops = CIPHER_NOCRYPT_POLY1305; packet_header.flags_cipher_hops = CIPHER_NOCRYPT_POLY1305;
} }
{ {
@ -450,13 +462,13 @@ impl Peer {
/// Called every INTERVAL during background tasks. /// Called every INTERVAL during background tasks.
#[inline(always)] #[inline(always)]
pub(crate) fn call_every_interval<CI: VL1SystemInterface>(&self, ct: &CI, time_ticks: i64) {} pub(crate) fn call_every_interval<CI: SystemInterface>(&self, ct: &CI, time_ticks: i64) {}
#[inline(always)] #[inline(always)]
fn receive_hello<CI: VL1SystemInterface>(&self, ci: &CI, node: &Node, time_ticks: i64, source_path: &Arc<Path>, payload: &Buffer<{ PACKET_SIZE_MAX }>) {} fn receive_hello<CI: SystemInterface>(&self, ci: &CI, node: &Node, time_ticks: i64, source_path: &Arc<Path>, payload: &Buffer<{ PACKET_SIZE_MAX }>) {}
#[inline(always)] #[inline(always)]
fn receive_error<CI: VL1SystemInterface, PH: VL1VirtualInterface>(&self, ci: &CI, ph: &PH, node: &Node, time_ticks: i64, source_path: &Arc<Path>, forward_secrecy: bool, extended_authentication: bool, payload: &Buffer<{ PACKET_SIZE_MAX }>) { fn receive_error<CI: SystemInterface, PH: VL1VirtualInterface>(&self, ci: &CI, ph: &PH, node: &Node, time_ticks: i64, source_path: &Arc<Path>, forward_secrecy: bool, extended_authentication: bool, payload: &Buffer<{ PACKET_SIZE_MAX }>) {
let mut cursor: usize = 0; let mut cursor: usize = 0;
let _ = payload.read_struct::<message_component_structs::ErrorHeader>(&mut cursor).map(|error_header| { let _ = payload.read_struct::<message_component_structs::ErrorHeader>(&mut cursor).map(|error_header| {
let in_re_message_id = u64::from_ne_bytes(error_header.in_re_message_id); let in_re_message_id = u64::from_ne_bytes(error_header.in_re_message_id);
@ -472,7 +484,7 @@ impl Peer {
} }
#[inline(always)] #[inline(always)]
fn receive_ok<CI: VL1SystemInterface, PH: VL1VirtualInterface>(&self, ci: &CI, ph: &PH, node: &Node, time_ticks: i64, source_path: &Arc<Path>, forward_secrecy: bool, extended_authentication: bool, payload: &Buffer<{ PACKET_SIZE_MAX }>) { fn receive_ok<CI: SystemInterface, PH: VL1VirtualInterface>(&self, ci: &CI, ph: &PH, node: &Node, time_ticks: i64, source_path: &Arc<Path>, forward_secrecy: bool, extended_authentication: bool, payload: &Buffer<{ PACKET_SIZE_MAX }>) {
let mut cursor: usize = 0; let mut cursor: usize = 0;
let _ = payload.read_struct::<message_component_structs::OkHeader>(&mut cursor).map(|ok_header| { let _ = payload.read_struct::<message_component_structs::OkHeader>(&mut cursor).map(|ok_header| {
let in_re_message_id = u64::from_ne_bytes(ok_header.in_re_message_id); let in_re_message_id = u64::from_ne_bytes(ok_header.in_re_message_id);
@ -492,19 +504,19 @@ impl Peer {
} }
#[inline(always)] #[inline(always)]
fn receive_whois<CI: VL1SystemInterface>(&self, ci: &CI, node: &Node, time_ticks: i64, source_path: &Arc<Path>, payload: &Buffer<{ PACKET_SIZE_MAX }>) {} fn receive_whois<CI: SystemInterface>(&self, ci: &CI, node: &Node, time_ticks: i64, source_path: &Arc<Path>, payload: &Buffer<{ PACKET_SIZE_MAX }>) {}
#[inline(always)] #[inline(always)]
fn receive_rendezvous<CI: VL1SystemInterface>(&self, ci: &CI, node: &Node, time_ticks: i64, source_path: &Arc<Path>, payload: &Buffer<{ PACKET_SIZE_MAX }>) {} fn receive_rendezvous<CI: SystemInterface>(&self, ci: &CI, node: &Node, time_ticks: i64, source_path: &Arc<Path>, payload: &Buffer<{ PACKET_SIZE_MAX }>) {}
#[inline(always)] #[inline(always)]
fn receive_echo<CI: VL1SystemInterface>(&self, ci: &CI, node: &Node, time_ticks: i64, source_path: &Arc<Path>, payload: &Buffer<{ PACKET_SIZE_MAX }>) {} fn receive_echo<CI: SystemInterface>(&self, ci: &CI, node: &Node, time_ticks: i64, source_path: &Arc<Path>, payload: &Buffer<{ PACKET_SIZE_MAX }>) {}
#[inline(always)] #[inline(always)]
fn receive_push_direct_paths<CI: VL1SystemInterface>(&self, ci: &CI, node: &Node, time_ticks: i64, source_path: &Arc<Path>, payload: &Buffer<{ PACKET_SIZE_MAX }>) {} fn receive_push_direct_paths<CI: SystemInterface>(&self, ci: &CI, node: &Node, time_ticks: i64, source_path: &Arc<Path>, payload: &Buffer<{ PACKET_SIZE_MAX }>) {}
#[inline(always)] #[inline(always)]
fn receive_user_message<CI: VL1SystemInterface>(&self, ci: &CI, node: &Node, time_ticks: i64, source_path: &Arc<Path>, payload: &Buffer<{ PACKET_SIZE_MAX }>) {} fn receive_user_message<CI: SystemInterface>(&self, ci: &CI, node: &Node, time_ticks: i64, source_path: &Arc<Path>, payload: &Buffer<{ PACKET_SIZE_MAX }>) {}
/// Get current best path or None if there are no direct paths to this peer. /// Get current best path or None if there are no direct paths to this peer.
#[inline(always)] #[inline(always)]

View file

@ -13,7 +13,7 @@ use parking_lot::Mutex;
use crate::util::gate::IntervalGate; use crate::util::gate::IntervalGate;
use crate::vl1::Address; use crate::vl1::Address;
use crate::vl1::fragmentedpacket::FragmentedPacket; use crate::vl1::fragmentedpacket::FragmentedPacket;
use crate::vl1::node::{Node, VL1SystemInterface}; use crate::vl1::node::{Node, SystemInterface};
use crate::vl1::protocol::{WHOIS_RETRY_INTERVAL, WHOIS_MAX_WAITING_PACKETS, WHOIS_RETRY_MAX}; use crate::vl1::protocol::{WHOIS_RETRY_INTERVAL, WHOIS_MAX_WAITING_PACKETS, WHOIS_RETRY_MAX};
use crate::PacketBuffer; use crate::PacketBuffer;
@ -36,7 +36,7 @@ impl WhoisQueue {
pub fn new() -> Self { Self(Mutex::new(HashMap::new())) } pub fn new() -> Self { Self(Mutex::new(HashMap::new())) }
/// Launch or renew a WHOIS query and enqueue a packet to be processed when (if) it is received. /// Launch or renew a WHOIS query and enqueue a packet to be processed when (if) it is received.
pub fn query<CI: VL1SystemInterface>(&self, node: &Node, ci: &CI, target: Address, packet: Option<QueuedPacket>) { pub fn query<CI: SystemInterface>(&self, node: &Node, ci: &CI, target: Address, packet: Option<QueuedPacket>) {
let mut q = self.0.lock(); let mut q = self.0.lock();
let qi = q.entry(target).or_insert_with(|| WhoisQueueItem { let qi = q.entry(target).or_insert_with(|| WhoisQueueItem {
@ -64,7 +64,7 @@ impl WhoisQueue {
} }
/// Called every INTERVAL during background tasks. /// Called every INTERVAL during background tasks.
pub fn call_every_interval<CI: VL1SystemInterface>(&self, node: &Node, ci: &CI, time_ticks: i64) { pub fn call_every_interval<CI: SystemInterface>(&self, node: &Node, ci: &CI, time_ticks: i64) {
let mut targets: Vec<Address> = Vec::new(); let mut targets: Vec<Address> = Vec::new();
self.0.lock().retain(|target, qi| { self.0.lock().retain(|target, qi| {
if qi.retry_count < WHOIS_RETRY_MAX { if qi.retry_count < WHOIS_RETRY_MAX {
@ -82,7 +82,7 @@ impl WhoisQueue {
} }
} }
fn send_whois<CI: VL1SystemInterface>(&self, node: &Node, ci: &CI, targets: &[Address]) { fn send_whois<CI: SystemInterface>(&self, node: &Node, ci: &CI, targets: &[Address]) {
todo!() todo!()
} }
} }

View file

@ -13,7 +13,7 @@ use crate::vl1::node::VL1VirtualInterface;
use crate::vl1::{Peer, Path, Identity}; use crate::vl1::{Peer, Path, Identity};
use crate::vl1::protocol::*; use crate::vl1::protocol::*;
pub trait SwitchInterface { pub trait SwitchInterface: Sync + Send {
} }
pub struct Switch { pub struct Switch {