diff --git a/network-hypervisor/src/crypto/mod.rs b/network-hypervisor/src/crypto/mod.rs
index 2504ecc73..98aa7cfc1 100644
--- a/network-hypervisor/src/crypto/mod.rs
+++ b/network-hypervisor/src/crypto/mod.rs
@@ -9,21 +9,8 @@ pub mod random;
 pub mod secret;
 
 pub use aes_gmac_siv;
 
-use std::convert::Infallible;
-
-static mut SALT64: u64 = 0;
-
 pub fn init() {
-    unsafe {
-        // We always run gcrypt in "FIPS mode," but it doesn't count as fully compliant unless it's a FIPS-certified library.
-        let _ = gcrypt::init_fips_mode(|_| -> Result<(), Infallible> { Ok(()) });
-
-        while SALT64 == 0 {
-            let mut tmp = 0_u64;
-            gcrypt::rand::randomize(gcrypt::rand::Level::Strong, &mut *((&mut tmp as *mut u64).cast::<[u8; 8]>()));
-            SALT64 = tmp;
-        }
-    }
+    // We always run gcrypt in "FIPS mode," but it doesn't count as fully compliant unless it's a FIPS-certified library.
+    let _ = gcrypt::init_fips_mode(|_| -> Result<(), std::convert::Infallible> { Ok(()) });
 }
-
-pub fn salt64() -> u64 { unsafe { SALT64 } }
diff --git a/network-hypervisor/src/util/gate.rs b/network-hypervisor/src/util/gate.rs
new file mode 100644
index 000000000..eca35262f
--- /dev/null
+++ b/network-hypervisor/src/util/gate.rs
@@ -0,0 +1,67 @@
+use std::sync::atomic::{AtomicI64, Ordering};
+
+/// Boolean rate limiter with normal (non-atomic) semantics.
+pub struct IntervalGate<const FREQ: i64>(i64);
+
+impl<const FREQ: i64> Default for IntervalGate<FREQ> {
+    fn default() -> Self {
+        Self(0)
+    }
+}
+
+impl<const FREQ: i64> IntervalGate<FREQ> {
+    #[inline(always)]
+    pub fn new(initial_ts: i64) -> Self {
+        Self(initial_ts)
+    }
+
+    #[inline(always)]
+    pub fn reset(&mut self) {
+        self.0 = 0;
+    }
+
+    #[inline(always)]
+    pub fn gate(&mut self, time: i64) -> bool {
+        if (time - self.0) >= FREQ {
+            self.0 = time;
+            true
+        } else {
+            false
+        }
+    }
+}
+
+/// Boolean rate limiter with atomic semantics.
+pub struct AtomicIntervalGate<const FREQ: i64>(AtomicI64);
+
+impl<const FREQ: i64> Default for AtomicIntervalGate<FREQ> {
+    fn default() -> Self {
+        Self(AtomicI64::new(0))
+    }
+}
+
+impl<const FREQ: i64> AtomicIntervalGate<FREQ> {
+    #[inline(always)]
+    pub fn new(initial_ts: i64) -> Self {
+        Self(AtomicI64::new(initial_ts))
+    }
+
+    #[inline(always)]
+    pub fn reset(&self) {
+        self.0.store(0, Ordering::Relaxed);
+    }
+
+    #[inline(always)]
+    pub fn gate(&self, time: i64) -> bool {
+        // Note that if two or more threads are using this at once, any thread's time might
+        // end up being the one stored. This is okay since these times should either be the
+        // same or very close, and slight differences won't cause issues with the use cases
+        // for this. This is primarily used to rate-gate operations to prevent DoS attacks.
+        if (time - self.0.load(Ordering::Relaxed)) >= FREQ {
+            self.0.store(time, Ordering::Relaxed);
+            true
+        } else {
+            false
+        }
+    }
+}
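A minimal usage sketch for the new gate, assuming the const-generic form shown above; the `Announcer` type and the 1000 ms interval are hypothetical and not part of this diff:

```rust
use crate::util::gate::IntervalGate; // module path added by this diff

// Hypothetical caller: allow an expensive announce operation at most once per second.
struct Announcer {
    gate: IntervalGate<1000>, // FREQ = 1000 ms, supplied as the const generic parameter
}

impl Announcer {
    fn maybe_announce(&mut self, now_ms: i64) -> bool {
        // gate() returns true only when at least FREQ ms have elapsed since the last true.
        self.gate.gate(now_ms)
    }
}
```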
diff --git a/network-hypervisor/src/util/hex.rs b/network-hypervisor/src/util/hex.rs
index b61c2dcf3..8ea7526ca 100644
--- a/network-hypervisor/src/util/hex.rs
+++ b/network-hypervisor/src/util/hex.rs
@@ -12,7 +12,7 @@ pub fn to_string(b: &[u8]) -> String {
     s
 }
 
-/// Encode an unsigned 64-bit value as a string.
+/// Encode an unsigned 64-bit value as a hexadecimal string.
 pub fn to_string_u64(mut i: u64, skip_leading_zeroes: bool) -> String {
     let mut s = String::new();
     s.reserve(16);
@@ -26,7 +26,7 @@ pub fn to_string_u64(mut i: u64, skip_leading_zeroes: bool) -> String {
     s
 }
 
-/// Encode an unsigned 64-bit value as a string.
+/// Encode an unsigned 64-bit value as a hexadecimal ASCII string.
 pub fn to_vec_u64(mut i: u64, skip_leading_zeroes: bool) -> Vec<u8> {
     let mut s = Vec::new();
     s.reserve(16);
@@ -40,7 +40,7 @@ pub fn to_vec_u64(mut i: u64, skip_leading_zeroes: bool) -> Vec<u8> {
     s
 }
 
-/// Decode a hex string, ignoring non-hexadecimal characters.
+/// Decode a hex string, ignoring all non-hexadecimal characters.
 pub fn from_string(s: &str) -> Vec<u8> {
     let mut b: Vec<u8> = Vec::new();
     b.reserve((s.len() / 2) + 1);
diff --git a/network-hypervisor/src/util/mod.rs b/network-hypervisor/src/util/mod.rs
index 61620111e..4e1f51f5b 100644
--- a/network-hypervisor/src/util/mod.rs
+++ b/network-hypervisor/src/util/mod.rs
@@ -1,5 +1,6 @@
 pub mod hex;
 pub mod pool;
+pub mod gate;
 
 pub(crate) const ZEROES: [u8; 64] = [0_u8; 64];
diff --git a/network-hypervisor/src/vl1/address.rs b/network-hypervisor/src/vl1/address.rs
index 250ae3b2e..f13716689 100644
--- a/network-hypervisor/src/vl1/address.rs
+++ b/network-hypervisor/src/vl1/address.rs
@@ -1,7 +1,7 @@
 use std::str::FromStr;
 use std::hash::{Hash, Hasher};
 
-use crate::vl1::constants::ADDRESS_RESERVED_PREFIX;
+use crate::vl1::constants::{ADDRESS_RESERVED_PREFIX, ADDRESS_SIZE};
 use crate::error::InvalidFormatError;
 use crate::util::hex::HEX_CHARS;
 
@@ -11,7 +11,7 @@ pub struct Address(u64);
 impl Address {
     #[inline(always)]
     pub fn from_bytes(b: &[u8]) -> Result<Address, InvalidFormatError> {
-        if b.len() >= 5 {
+        if b.len() >= ADDRESS_SIZE {
             Ok(Address((b[0] as u64) << 32 | (b[1] as u64) << 24 | (b[2] as u64) << 16 | (b[3] as u64) << 8 | b[4] as u64))
         } else {
             Err(InvalidFormatError)
         }
@@ -34,7 +34,7 @@ impl Address {
     }
 
     #[inline(always)]
-    pub fn to_bytes(&self) -> [u8; 5] {
+    pub fn to_bytes(&self) -> [u8; ADDRESS_SIZE] {
         [(self.0 >> 32) as u8, (self.0 >> 24) as u8, (self.0 >> 16) as u8, (self.0 >> 8) as u8, self.0 as u8]
     }
 
@@ -48,8 +48,8 @@ impl ToString for Address {
     fn to_string(&self) -> String {
         let mut v = self.0 << 24;
         let mut s = String::new();
-        s.reserve(10);
-        for _ in 0..10 {
+        s.reserve(ADDRESS_SIZE * 2);
+        for _ in 0..(ADDRESS_SIZE * 2) {
             s.push(HEX_CHARS[(v >> 60) as usize] as char);
             v <<= 4;
         }
@@ -79,14 +79,14 @@ impl Hash for Address {
     }
 }
 
-impl From<&[u8; 5]> for Address {
+impl From<&[u8; ADDRESS_SIZE]> for Address {
     #[inline(always)]
     fn from(b: &[u8; 5]) -> Address {
         Address((b[0] as u64) << 32 | (b[1] as u64) << 24 | (b[2] as u64) << 16 | (b[3] as u64) << 8 | b[4] as u64)
     }
 }
 
-impl From<[u8; 5]> for Address {
+impl From<[u8; ADDRESS_SIZE]> for Address {
     #[inline(always)]
     fn from(b: [u8; 5]) -> Address {
         Self::from(&b)
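For reference, a round-trip sketch of the 40-bit address packing above; the byte values are illustrative only and the snippet assumes it lives inside the crate:

```rust
use crate::vl1::Address;
use crate::vl1::constants::ADDRESS_SIZE;

fn address_round_trip() {
    let raw: [u8; ADDRESS_SIZE] = [0x01, 0x23, 0x45, 0x67, 0x89];
    let addr = Address::from(&raw);              // packs the 5 bytes into the low 40 bits of a u64
    assert_eq!(addr.to_bytes(), raw);            // unpacks back to the same 5 bytes
    assert_eq!(addr.to_string(), "0123456789");  // ADDRESS_SIZE * 2 = 10 hex characters
}
```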
diff --git a/network-hypervisor/src/vl1/buffer.rs b/network-hypervisor/src/vl1/buffer.rs
index e847bc797..2f85e2294 100644
--- a/network-hypervisor/src/vl1/buffer.rs
+++ b/network-hypervisor/src/vl1/buffer.rs
@@ -207,20 +207,28 @@ impl<const L: usize> Buffer<L> {
     }
 
-    /// Get a structure at position 0.
+    /// Get a structure at a given position in the buffer.
     #[inline(always)]
-    pub fn header<H: RawObject>(&self) -> &H {
-        debug_assert!(size_of::<H>() <= L);
-        debug_assert!(size_of::<H>() <= self.0);
-        unsafe { &*self.1.as_ptr().cast::<H>() }
+    pub fn struct_at<T: RawObject>(&self, ptr: usize) -> std::io::Result<&T> {
+        if (ptr + size_of::<T>()) <= self.0 {
+            unsafe {
+                Ok(&*self.1.as_ptr().cast::<u8>().offset(ptr as isize).cast::<T>())
+            }
+        } else {
+            std::io::Result::Err(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, OVERFLOW_ERR_MSG))
+        }
     }
 
-    /// Get a structure at position 0 (mutable).
+    /// Get a structure at a given position in the buffer (mutable).
     #[inline(always)]
-    pub fn header_mut<H: RawObject>(&mut self) -> &mut H {
-        debug_assert!(size_of::<H>() <= L);
-        debug_assert!(size_of::<H>() <= self.0);
-        unsafe { &mut *self.1.as_mut_ptr().cast::<H>() }
+    pub fn struct_mut_at<T: RawObject>(&mut self, ptr: usize) -> std::io::Result<&mut T> {
+        if (ptr + size_of::<T>()) <= self.0 {
+            unsafe {
+                Ok(&mut *self.1.as_mut_ptr().cast::<u8>().offset(ptr as isize).cast::<T>())
+            }
+        } else {
+            std::io::Result::Err(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, OVERFLOW_ERR_MSG))
+        }
     }
 
     /// Get a structure at a given position in the buffer and advance the cursor.
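A hedged sketch of how the new bounds-checked accessor might be used by crate-internal code; it assumes `Buffer`, `FragmentHeader: RawObject`, and `PACKET_SIZE_MAX` from elsewhere in this tree are in scope:

```rust
use crate::vl1::buffer::Buffer;
use crate::vl1::constants::PACKET_SIZE_MAX;
use crate::vl1::protocol::FragmentHeader;

// Returns whether the buffer starts with a fragment header, or an UnexpectedEof
// error if fewer than size_of::<FragmentHeader>() bytes are present.
fn starts_with_fragment(buf: &Buffer<{ PACKET_SIZE_MAX }>) -> std::io::Result<bool> {
    let fh = buf.struct_at::<FragmentHeader>(0)?;
    Ok(fh.is_fragment())
}
```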
diff --git a/network-hypervisor/src/vl1/constants.rs b/network-hypervisor/src/vl1/constants.rs
index 2aecccd5e..b532d96a9 100644
--- a/network-hypervisor/src/vl1/constants.rs
+++ b/network-hypervisor/src/vl1/constants.rs
@@ -1,6 +1,9 @@
 /// Length of an address in bytes.
 pub const ADDRESS_SIZE: usize = 5;
 
+/// Prefix indicating reserved addresses (that can't actually be addresses).
+pub const ADDRESS_RESERVED_PREFIX: u8 = 0xff;
+
 /// Size of packet header that lies outside the encryption envelope.
 pub const PACKET_HEADER_SIZE: usize = 27;
 
@@ -19,6 +22,9 @@ pub const PACKET_SIZE_MIN: usize = PACKET_HEADER_SIZE + 1;
 /// Maximum size of an entire packet.
 pub const PACKET_SIZE_MAX: usize = PACKET_HEADER_SIZE + PACKET_PAYLOAD_SIZE_MAX;
 
+/// Index of destination in both fragment and full packet headers.
+pub const PACKET_DESTINATION_INDEX: usize = 8;
+
 /// Mask to select cipher from header flags field.
 pub const HEADER_FLAGS_FIELD_MASK_CIPHER: u8 = 0x30;
 
@@ -48,9 +54,14 @@ pub const FRAGMENT_SIZE_MIN: usize = 16;
 /// Maximum allowed number of fragments.
 pub const FRAGMENT_COUNT_MAX: usize = 16;
 
-/// Maximum number of fragmented packets in flight from a peer.
-/// Usually there should only be one at a time, so this is overkill.
-pub const PEER_DEFRAGMENT_MAX_PACKETS_IN_FLIGHT: usize = 4;
+/// Index of packet fragment indicator byte to detect fragments.
+pub const FRAGMENT_INDICATOR_INDEX: usize = 13;
+
+/// Byte found at FRAGMENT_INDICATOR_INDEX to indicate a fragment.
+pub const FRAGMENT_INDICATOR: u8 = 0xff;
+
+/// Maximum number of inbound fragments to handle at once per path.
+pub const FRAGMENT_MAX_PER_PATH: usize = 64;
 
 /// Verb (inner) flag indicating that the packet's payload (after the verb) is LZ4 compressed.
 pub const VERB_FLAG_COMPRESSED: u8 = 0x80;
@@ -58,11 +69,8 @@ pub const VERB_FLAG_COMPRESSED: u8 = 0x80;
 /// Maximum number of packet hops allowed by the protocol.
 pub const PROTOCOL_MAX_HOPS: usize = 7;
 
-/// Index of packet fragment indicator byte to detect fragments.
-pub const FRAGMENT_INDICATOR_INDEX: usize = 13;
+/// Interval between WHOIS retries in milliseconds.
+pub const WHOIS_RETRY_INTERVAL: i64 = 1000;
 
-/// Byte found at FRAGMENT_INDICATOR_INDEX to indicate a fragment.
-pub const FRAGMENT_INDICATOR: u8 = 0xff;
-
-/// Prefix indicating reserved addresses (that can't actually be addresses).
-pub const ADDRESS_RESERVED_PREFIX: u8 = 0xff;
+/// Maximum number of WHOIS retries before a queued query is abandoned.
+pub const WHOIS_RETRY_MAX: u16 = 3;
diff --git a/network-hypervisor/src/vl1/fragmentedpacket.rs b/network-hypervisor/src/vl1/fragmentedpacket.rs
new file mode 100644
index 000000000..3a9668397
--- /dev/null
+++ b/network-hypervisor/src/vl1/fragmentedpacket.rs
@@ -0,0 +1,57 @@
+use std::sync::Arc;
+
+use crate::vl1::node::PacketBuffer;
+use crate::vl1::constants::FRAGMENT_COUNT_MAX;
+use crate::vl1::Path;
+use crate::vl1::protocol::PacketID;
+
+pub struct FragmentedPacket {
+    pub id: PacketID,
+    pub ts_ticks: i64,
+    frags: [Option<PacketBuffer>; FRAGMENT_COUNT_MAX],
+    have: u8,
+    expecting: u8,
+}
+
+impl Default for FragmentedPacket {
+    fn default() -> Self {
+        Self {
+            id: 0,
+            ts_ticks: -1,
+            frags: [None; FRAGMENT_COUNT_MAX],
+            have: 0,
+            expecting: 0,
+        }
+    }
+}
+
+impl FragmentedPacket {
+    /// Reset this fragmented packet for re-use.
+    #[inline(always)]
+    pub fn reset(&mut self) {
+        self.id = 0;
+        self.ts_ticks = -1;
+        self.frags.fill(None);
+        self.have = 0;
+        self.expecting = 0;
+    }
+
+    /// Initialize for a new packet.
+    #[inline(always)]
+    pub fn init(&mut self, id: PacketID, ts_ticks: i64) {
+        self.id = id;
+        self.ts_ticks = ts_ticks;
+    }
+
+    /// Add a fragment to this fragment set and return true if the packet appears complete.
+    #[inline(always)]
+    pub fn add(&mut self, frag: PacketBuffer, no: u8, expecting: u8) -> bool {
+        if self.frags[no as usize].replace(frag).is_none() {
+            self.have = self.have.wrapping_add(1);
+            self.expecting |= expecting;
+            self.have == self.expecting
+        } else {
+            false
+        }
+    }
+}
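An illustrative walk-through of the have/expecting accounting in FragmentedPacket::add; the packet ID, tick value, and fragment buffers are placeholders supplied by the hypothetical caller:

```rust
use crate::vl1::fragmentedpacket::FragmentedPacket;
use crate::vl1::node::PacketBuffer;
use crate::vl1::protocol::PacketID;

// Three fragments of one packet arriving out of order; add() returns true only
// once the number of fragments received equals the expected total.
fn assemble_three(id: PacketID, now_ticks: i64, f0: PacketBuffer, f1: PacketBuffer, f2: PacketBuffer) {
    let mut fp = FragmentedPacket::default();
    fp.init(id, now_ticks);
    assert!(!fp.add(f2, 2, 3)); // 1 of 3 present
    assert!(!fp.add(f0, 0, 3)); // 2 of 3 present
    assert!(fp.add(f1, 1, 3));  // all 3 present -> ready to reassemble
}
```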
diff --git a/network-hypervisor/src/vl1/identity.rs b/network-hypervisor/src/vl1/identity.rs
index c6b7bcade..7d9312f60 100644
--- a/network-hypervisor/src/vl1/identity.rs
+++ b/network-hypervisor/src/vl1/identity.rs
@@ -23,6 +23,8 @@ const V1_BALLOON_SPACE_COST: usize = 16384;
 const V1_BALLOON_TIME_COST: usize = 3;
 const V1_BALLOON_DELTA: usize = 3;
 
+const V1_BALLOON_SALT: &'static [u8] = b"zt_id_v1";
+
 pub const IDENTITY_TYPE_0_SIGNATURE_SIZE: usize = ED25519_SIGNATURE_SIZE + 32;
 pub const IDENTITY_TYPE_1_SIGNATURE_SIZE: usize = P521_ECDSA_SIGNATURE_SIZE + ED25519_SIGNATURE_SIZE;
 
@@ -141,7 +143,7 @@ impl Identity {
         loop {
             // ECDSA is a randomized signature algorithm, so each signature will be different.
             let sig = p521_ecdsa.sign(&signing_buf).unwrap();
-            let bh = balloon::hash::<{ V1_BALLOON_SPACE_COST }, { V1_BALLOON_TIME_COST }, { V1_BALLOON_DELTA }>(&sig, b"zt_id_v1");
+            let bh = balloon::hash::<{ V1_BALLOON_SPACE_COST }, { V1_BALLOON_TIME_COST }, { V1_BALLOON_DELTA }>(&sig, V1_BALLOON_SALT);
             if bh[0] < 7 {
                 let addr = Address::from_bytes(&bh[59..64]).unwrap();
                 if addr.is_valid() {
@@ -229,7 +231,7 @@ impl Identity {
             signing_buf[(C25519_PUBLIC_KEY_SIZE + ED25519_PUBLIC_KEY_SIZE)..(C25519_PUBLIC_KEY_SIZE + ED25519_PUBLIC_KEY_SIZE + P521_PUBLIC_KEY_SIZE)].copy_from_slice((*p521).0.public_key_bytes());
             signing_buf[(C25519_PUBLIC_KEY_SIZE + ED25519_PUBLIC_KEY_SIZE + P521_PUBLIC_KEY_SIZE)..(C25519_PUBLIC_KEY_SIZE + ED25519_PUBLIC_KEY_SIZE + P521_PUBLIC_KEY_SIZE + P521_PUBLIC_KEY_SIZE)].copy_from_slice((*p521).1.public_key_bytes());
             if (*p521).1.verify(&signing_buf, &(*p521).2) {
-                let bh = balloon::hash::<{ V1_BALLOON_SPACE_COST }, { V1_BALLOON_TIME_COST }, { V1_BALLOON_DELTA }>(&(*p521).2, b"zt_id_v1");
+                let bh = balloon::hash::<{ V1_BALLOON_SPACE_COST }, { V1_BALLOON_TIME_COST }, { V1_BALLOON_DELTA }>(&(*p521).2, V1_BALLOON_SALT);
                 (bh[0] < 7) && bh.eq(&(*p521).3) && Address::from_bytes(&bh[59..64]).unwrap().eq(&self.address)
             } else {
                 false
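Restated for clarity, the acceptance test that both the generation loop and the verification path apply to the balloon hash output; this is a condensed sketch that would live inside identity.rs (it assumes the balloon::hash call signature used above and the private V1_* constants):

```rust
use crate::vl1::Address;

// A V1 work-function output is acceptable when the first byte bounds the PoW
// difficulty and the last five bytes decode to a valid (non-zero, non-reserved) address.
fn v1_hash_is_acceptable(sig: &[u8]) -> bool {
    let bh = balloon::hash::<{ V1_BALLOON_SPACE_COST }, { V1_BALLOON_TIME_COST }, { V1_BALLOON_DELTA }>(sig, V1_BALLOON_SALT);
    bh[0] < 7 && Address::from_bytes(&bh[59..64]).map_or(false, |a| a.is_valid())
}
```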
diff --git a/network-hypervisor/src/vl1/inetaddress.rs b/network-hypervisor/src/vl1/inetaddress.rs
index 5dc302057..2137e678f 100644
--- a/network-hypervisor/src/vl1/inetaddress.rs
+++ b/network-hypervisor/src/vl1/inetaddress.rs
@@ -238,24 +238,6 @@ impl InetAddress {
         }
     }
 
-    #[inline(always)]
-    pub(crate) fn local_lookup_key(&self) -> u128 {
-        unsafe {
-            match self.sa.sa_family as u8 {
-                AF_INET => {
-                    ((self.sin.sin_addr.s_addr as u64).wrapping_shl(16) | self.sin.sin_port as u64) as u128
-                }
-                AF_INET6 => {
-                    let mut tmp: [u64; 2] = MaybeUninit::uninit().assume_init();
-                    copy_nonoverlapping((&self.sin6.sin6_addr as *const in6_addr).cast::<u8>(), tmp.as_mut_ptr().cast::<u8>(), 16);
-                    tmp[1] = tmp[1].wrapping_add((self.sin6.sin6_port as u64) ^ crate::crypto::salt64());
-                    (*tmp.as_ptr().cast::<u128>()).wrapping_mul(0x0fc94e3bf4e9ab32866458cd56f5e605)
-                }
-                _ => 0
-            }
-        }
-    }
-
     /// Get this IP address's scope as per RFC documents and what is advertised via BGP.
     pub fn scope(&self) -> IpScope {
         unsafe {
@@ -488,6 +470,7 @@ impl FromStr for InetAddress {
 }
 
 impl PartialEq for InetAddress {
+    #[inline(always)]
     fn eq(&self, other: &Self) -> bool {
         unsafe {
             if self.sa.sa_family == other.sa.sa_family {
diff --git a/network-hypervisor/src/vl1/mod.rs b/network-hypervisor/src/vl1/mod.rs
index d8f9d1b41..9afc095c6 100644
--- a/network-hypervisor/src/vl1/mod.rs
+++ b/network-hypervisor/src/vl1/mod.rs
@@ -3,6 +3,11 @@ pub(crate) mod buffer;
 pub(crate) mod node;
 pub(crate) mod path;
 pub(crate) mod peer;
+pub(crate) mod dictionary;
+pub(crate) mod address;
+pub(crate) mod mac;
+pub(crate) mod fragmentedpacket;
+pub(crate) mod whois;
 
 pub mod constants;
 pub mod identity;
@@ -10,10 +15,6 @@ pub mod inetaddress;
 pub mod endpoint;
 pub mod locator;
 
-mod dictionary;
-mod address;
-mod mac;
-
 pub use address::Address;
 pub use mac::MAC;
 pub use identity::Identity;
diff --git a/network-hypervisor/src/vl1/node.rs b/network-hypervisor/src/vl1/node.rs
index e4e59413c..1a0b0c7f6 100644
--- a/network-hypervisor/src/vl1/node.rs
+++ b/network-hypervisor/src/vl1/node.rs
@@ -1,20 +1,23 @@
-use std::sync::Arc;
-use std::str::FromStr;
-use std::time::Duration;
-use std::marker::PhantomData;
 use std::hash::Hash;
+use std::marker::PhantomData;
+use std::str::FromStr;
+use std::sync::Arc;
+use std::time::Duration;
+
+use dashmap::DashMap;
+use parking_lot::Mutex;
 
 use crate::crypto::random::SecureRandom;
 use crate::error::InvalidParameterError;
+use crate::util::gate::IntervalGate;
 use crate::util::pool::{Pool, Pooled};
-use crate::vl1::{Address, Identity, Endpoint, Locator};
+use crate::vl1::{Address, Endpoint, Identity, Locator};
 use crate::vl1::buffer::Buffer;
-use crate::vl1::constants::{PACKET_SIZE_MAX, FRAGMENT_COUNT_MAX};
+use crate::vl1::constants::PACKET_SIZE_MAX;
 use crate::vl1::path::Path;
 use crate::vl1::peer::Peer;
-
-use parking_lot::Mutex;
-use dashmap::DashMap;
+use crate::vl1::protocol::{FragmentHeader, PacketHeader, PacketID};
+use crate::vl1::whois::Whois;
 
 /// Standard packet buffer type including pool container.
 pub type PacketBuffer = Pooled<Buffer<{ PACKET_SIZE_MAX }>>;
@@ -96,12 +99,18 @@ pub trait VL1CallerInterface {
     fn time_clock(&self) -> i64;
 }
 
+#[derive(Default)]
+struct BackgroundTaskIntervals {
+    whois: IntervalGate<{ Whois::INTERVAL }>,
+}
+
 pub struct Node {
     identity: Identity,
+    intervals: Mutex<BackgroundTaskIntervals>,
     locator: Mutex<Option<Locator>>,
-    paths_by_inaddr: DashMap<u128, Arc<Path>>,
+    paths: DashMap<Endpoint, Arc<Path>>,
     peers: DashMap<Address, Arc<Peer>>,
-    peer_vec: Mutex<Vec<Arc<Peer>>>, // for rapid iteration through all peers
+    whois: Whois,
     buffer_pool: Pool<Buffer<{ PACKET_SIZE_MAX }>>,
     secure_prng: SecureRandom,
 }
@@ -136,10 +145,11 @@ impl Node {
 
         Ok(Self {
             identity: id,
+            intervals: Mutex::new(BackgroundTaskIntervals::default()),
             locator: Mutex::new(None),
-            paths_by_inaddr: DashMap::new(),
+            paths: DashMap::new(),
             peers: DashMap::new(),
-            peer_vec: Mutex::new(Vec::new()),
+            whois: Whois::new(),
             buffer_pool: Pool::new(64),
             secure_prng: SecureRandom::get(),
         })
@@ -164,15 +174,73 @@ impl Node {
         self.buffer_pool.get()
     }
 
+    /// Get a peer by address.
+    #[inline(always)]
+    pub fn peer(&self, a: Address) -> Option<Arc<Peer>> {
+        self.peers.get(&a).map(|peer| peer.clone())
+    }
+
+    /// Get all peers currently in the peer cache.
+    pub fn peers(&self) -> Vec<Arc<Peer>> {
+        let mut v: Vec<Arc<Peer>> = Vec::new();
+        v.reserve(self.peers.len());
+        for p in self.peers.iter() {
+            v.push(p.value().clone());
+        }
+        v
+    }
+
     /// Run background tasks and return desired delay until next call in milliseconds.
     /// This should only be called once at a time. It technically won't hurt anything to
     /// call concurrently, but it will waste CPU cycles.
     pub fn do_background_tasks<CI: VL1CallerInterface>(&self, ci: &CI) -> Duration {
+        let mut intervals = self.intervals.lock();
+        let tt = ci.time_ticks();
+
+        if intervals.whois.gate(tt) {
+            self.whois.on_interval(self, ci, tt);
+        }
+
         Duration::from_millis(1000)
     }
 
     /// Called when a packet is received on the physical wire.
-    pub fn wire_receive<CI: VL1CallerInterface>(&self, ci: &CI, endpoint: &Endpoint, local_socket: i64, local_interface: i64, data: PacketBuffer) {
+    pub fn wire_receive<CI: VL1CallerInterface>(&self, ci: &CI, source_endpoint: &Endpoint, source_local_socket: i64, source_local_interface: i64, mut data: PacketBuffer) {
+        let _ = data.struct_mut_at::<FragmentHeader>(0).map(|fragment_header| {
+            // NOTE: the destination address is located at the same index in both the fragment
+            // header and the full packet header, allowing us to make this decision once.
+            let dest = Address::from(&fragment_header.dest);
+            if dest == self.identity.address() {
+                // Packet or fragment is addressed to this node.
+
+                let path = self.path(source_endpoint, source_local_socket, source_local_interface);
+                if fragment_header.is_fragment() {
+                } else {
+                    data.struct_mut_at::<PacketHeader>(0).map(|header| {
+                        let source = Address::from(&header.src);
+
+                        if header.is_fragmented() {
+                        } else {
+                        }
+                    });
+                }
+
+            } else {
+                // Packet or fragment is addressed to another node.
+            }
+        });
+    }
+
+    /// Get the canonical Path object for a given endpoint and local socket information.
+    /// This is a canonicalizing function that returns a unique path object for every tuple
+    /// of endpoint, local socket, and local interface.
+    pub(crate) fn path(&self, ep: &Endpoint, local_socket: i64, local_interface: i64) -> Arc<Path> {
+        self.paths.get(ep).map_or_else(|| {
+            let p = Arc::new(Path::new(ep.clone(), local_socket, local_interface));
+            self.paths.insert(ep.clone(), p.clone()).unwrap_or(p) // if another thread added one, return that instead
+        }, |path| {
+            path.clone()
+        })
     }
 }
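Because path() is canonicalizing, callers never construct Path objects directly. A hedged, crate-internal sketch of the property this guarantees (the endpoint and socket/interface values are placeholders):

```rust
use std::sync::Arc;

use crate::vl1::Endpoint;
use crate::vl1::node::Node;

// Two lookups with the same (endpoint, local socket, local interface) tuple
// should yield the same shared Arc<Path>.
fn same_path_is_shared(node: &Node, ep: &Endpoint) {
    let a = node.path(ep, 0, 0);
    let b = node.path(ep, 0, 0);
    assert!(Arc::ptr_eq(&a, &b));
}
```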
diff --git a/network-hypervisor/src/vl1/path.rs b/network-hypervisor/src/vl1/path.rs
index a972c4bac..14f230a40 100644
--- a/network-hypervisor/src/vl1/path.rs
+++ b/network-hypervisor/src/vl1/path.rs
@@ -1,13 +1,25 @@
 use std::sync::atomic::{AtomicI64, Ordering};
 
 use crate::vl1::Endpoint;
+use crate::vl1::constants::FRAGMENT_COUNT_MAX;
+use crate::vl1::fragmentedpacket::FragmentedPacket;
+use crate::vl1::protocol::{FragmentHeader, PacketID};
+use crate::vl1::node::PacketBuffer;
+
+use parking_lot::Mutex;
+
+struct RxState {
+    last_receive_time_ticks: i64,
+    fragmented_packet_count: usize,
+    fragmented_packets: [FragmentedPacket; FRAGMENT_COUNT_MAX],
+}
 
 pub struct Path {
     pub(crate) endpoint: Endpoint,
     pub(crate) local_socket: i64,
     pub(crate) local_interface: i64,
     last_send_time_ticks: AtomicI64,
-    last_receive_time_ticks: AtomicI64,
+    rxs: Mutex<RxState>,
 }
 
 impl Path {
@@ -18,7 +30,11 @@ impl Path {
             local_socket,
             local_interface,
             last_send_time_ticks: AtomicI64::new(0),
-            last_receive_time_ticks: AtomicI64::new(0),
+            rxs: Mutex::new(RxState {
+                last_receive_time_ticks: 0,
+                fragmented_packet_count: 0,
+                fragmented_packets: [FragmentedPacket::default(); FRAGMENT_COUNT_MAX]
+            })
         }
     }
 
@@ -29,7 +45,48 @@ impl Path {
 
     #[inline(always)]
     pub fn send_receive_time_ticks(&self) -> i64 {
-        self.last_receive_time_ticks.load(Ordering::Relaxed)
+        self.rxs.lock().last_receive_time_ticks
     }
+
+    #[inline(always)]
+    pub(crate) fn receive_fragment<F: FnOnce(&mut FragmentedPacket)>(&self, packet_id: PacketID, fragment_no: u8, fragment_expecting_count: u8, packet: PacketBuffer, time_ticks: i64, assembled_packet_handler: F) {
+        if fragment_no < FRAGMENT_COUNT_MAX as u8 {
+            let mut rxs = self.rxs.lock();
+            rxs.last_receive_time_ticks = time_ticks;
+
+            let mut fpcnt = rxs.fragmented_packet_count;
+            let mut fidx = 0;
+            while fpcnt > 0 {
+                let f = &mut rxs.fragmented_packets[fidx];
+                if f.id == packet_id {
+                    if f.add(packet, fragment_no, fragment_expecting_count) {
+                        assembled_packet_handler(f);
+                        f.reset();
+                        rxs.fragmented_packet_count = rxs.fragmented_packet_count.wrapping_sub(1);
+                    }
+                    return;
+                } else if f.ts_ticks >= 0 {
+                    fpcnt = fpcnt.wrapping_sub(1);
+                }
+                fidx = fidx.wrapping_add(1);
+            }
+
+            let mut oldest_ts = rxs.fragmented_packets[0].ts_ticks;
+            let mut oldest_idx = 0;
+            if oldest_ts >= 0 {
+                for fidx in 1..FRAGMENT_COUNT_MAX {
+                    let ts = rxs.fragmented_packets[fidx].ts_ticks;
+                    if ts < oldest_ts {
+                        oldest_ts = ts;
+                        oldest_idx = fidx;
+                    }
+                }
+            }
+
+            if rxs.fragmented_packets[oldest_idx].ts_ticks < 0 {
+                // Claiming a previously unused slot, so the in-use count increases; otherwise
+                // the oldest in-flight packet is evicted and the count stays the same.
+                rxs.fragmented_packet_count = rxs.fragmented_packet_count.wrapping_add(1);
+            }
+            let f = &mut rxs.fragmented_packets[oldest_idx];
+            f.init(packet_id, time_ticks);
+            let _ = f.add(packet, fragment_no, fragment_expecting_count);
+        }
     }
 }
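A hedged, crate-internal sketch of how a caller might feed fragments into Path::receive_fragment and receive the completed set through the closure; the packet ID, fragment counts, and tick value are placeholders:

```rust
use crate::vl1::path::Path;
use crate::vl1::node::PacketBuffer;
use crate::vl1::protocol::PacketID;

fn on_fragment(path: &Path, id: PacketID, frag: PacketBuffer, no: u8, total: u8, now_ticks: i64) {
    path.receive_fragment(id, no, total, frag, now_ticks, |assembled| {
        // Invoked only once every expected fragment of `id` has been seen;
        // `assembled` is the completed FragmentedPacket, which is reset after this returns.
        let _ = assembled;
    });
}
```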
diff --git a/network-hypervisor/src/vl1/peer.rs b/network-hypervisor/src/vl1/peer.rs
index ed0149515..454f6cc0c 100644
--- a/network-hypervisor/src/vl1/peer.rs
+++ b/network-hypervisor/src/vl1/peer.rs
@@ -1,41 +1,47 @@
 use std::sync::Arc;
 use std::sync::atomic::{AtomicI64, AtomicU64, AtomicU8};
 
-use crate::vl1::protocol::PacketID;
-use crate::vl1::node::PacketBuffer;
-use crate::vl1::constants::{FRAGMENT_COUNT_MAX, PEER_DEFRAGMENT_MAX_PACKETS_IN_FLIGHT};
 use crate::vl1::{Identity, Path};
+use crate::vl1::fragmentedpacket::FragmentedPacket;
+use crate::vl1::protocol::{PacketID, PacketHeader};
+use crate::vl1::node::{VL1CallerInterface, PacketBuffer, Node};
 
 use parking_lot::Mutex;
 
-struct FragmentedPacket {
-    pub id: PacketID,
-    pub frag: [Option<PacketBuffer>; FRAGMENT_COUNT_MAX],
+const MAX_PATHS: usize = 16;
+
+struct TxState {
+    packet_iv_counter: u64,
+    last_send_time_ticks: i64,
+    paths: [Arc<Path>; MAX_PATHS],
 }
 
+struct RxState {
+    last_receive_time_ticks: i64,
+    remote_version: [u8; 4],
+    remote_protocol_version: u8,
+}
+
+/// A remote peer known to this node.
+/// Sending-related and receiving-related fields are locked separately since concurrent
+/// send/receive is not uncommon.
 pub struct Peer {
-    // This peer's identity.
     identity: Identity,
 
-    // Primary static secret resulting from key agreement with identity.
+    // Static shared secret computed from agreement with identity.
     identity_static_secret: [u8; 48],
 
-    // Outgoing packet IV counter used to generate packet IDs to this peer.
-    packet_iv_counter: AtomicU64,
+    // State used primarily when sending to this peer.
+    txs: Mutex<TxState>,
 
-    // Paths sorted in ascending order of quality / preference.
-    paths: Mutex<Vec<Arc<Path>>>,
-
-    // Incoming fragmented packet defragment buffer.
-    fragmented_packets: Mutex<[FragmentedPacket; PEER_DEFRAGMENT_MAX_PACKETS_IN_FLIGHT]>,
-
-    // Last send and receive time in millisecond ticks (not wall clock).
-    last_send_time_ticks: AtomicI64,
-    last_receive_time_ticks: AtomicI64,
-
-    // Most recent remote version (most to least significant bytes: major, minor, revision, build)
-    remote_version: AtomicU64,
-
-    // Most recent remote protocol version
-    remote_protocol_version: AtomicU8,
+    // State used primarily when receiving from this peer.
+    rxs: Mutex<RxState>,
+}
+
+impl Peer {
+    pub(crate) fn receive_from_singular<CI: VL1CallerInterface>(&self, node: &Node, ci: &CI, header: &PacketHeader, packet: &PacketBuffer) {
+    }
+
+    pub(crate) fn receive_from_fragmented<CI: VL1CallerInterface>(&self, node: &Node, ci: &CI, header: &PacketHeader, packet: &FragmentedPacket) {
+    }
 }
diff --git a/network-hypervisor/src/vl1/protocol.rs b/network-hypervisor/src/vl1/protocol.rs
index 875835488..5bb19cbc5 100644
--- a/network-hypervisor/src/vl1/protocol.rs
+++ b/network-hypervisor/src/vl1/protocol.rs
@@ -54,12 +54,6 @@ impl PacketHeader {
         (self.flags_cipher_hops & HEADER_FLAG_FRAGMENTED) != 0
     }
 
-    /// If true, this packet is actually a fragment and its header should be interpreted as a FragmentHeader instead.
-    #[inline(always)]
-    pub fn is_fragment(&self) -> bool {
-        self.src[0] == FRAGMENT_INDICATOR
-    }
-
     #[inline(always)]
     pub fn destination(&self) -> Address {
         Address::from(&self.dest)
@@ -89,6 +83,11 @@ pub struct FragmentHeader {
 unsafe impl crate::vl1::buffer::RawObject for FragmentHeader {}
 
 impl FragmentHeader {
+    #[inline(always)]
+    pub fn is_fragment(&self) -> bool {
+        self.fragment_indicator == FRAGMENT_INDICATOR
+    }
+
     #[inline(always)]
     pub fn total_fragments(&self) -> u8 {
         self.total_and_fragment_no >> 4
@@ -104,6 +103,12 @@ impl FragmentHeader {
         self.reserved_hops & HEADER_FLAGS_FIELD_MASK_HOPS
     }
 
+    #[inline(always)]
+    pub fn increment_hops(&mut self) {
+        let f = self.reserved_hops;
+        self.reserved_hops = (f & HEADER_FLAGS_FIELD_MASK_HOPS.not()) | ((f + 1) & HEADER_FLAGS_FIELD_MASK_HOPS);
+    }
+
     #[inline(always)]
     pub fn destination(&self) -> Address {
         Address::from(&self.dest)
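To make the fragment-relay case concrete: a hedged sketch combining the new FragmentHeader methods with the buffer accessors introduced earlier. The forwarding decision itself is not part of this diff; only the header inspection and hop bump are shown:

```rust
use crate::vl1::buffer::Buffer;
use crate::vl1::constants::PACKET_SIZE_MAX;
use crate::vl1::protocol::FragmentHeader;

// If the datagram is a fragment, bump its hop count before any relay attempt;
// returns Ok(false) for full packets and Err on a truncated datagram.
fn prepare_fragment_relay(buf: &mut Buffer<{ PACKET_SIZE_MAX }>) -> std::io::Result<bool> {
    let fh = buf.struct_mut_at::<FragmentHeader>(0)?;
    if fh.is_fragment() {
        fh.increment_hops();
        return Ok(true);
    }
    Ok(false)
}
```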
diff --git a/network-hypervisor/src/vl1/whois.rs b/network-hypervisor/src/vl1/whois.rs
new file mode 100644
index 000000000..b7da6df98
--- /dev/null
+++ b/network-hypervisor/src/vl1/whois.rs
@@ -0,0 +1,75 @@
+use std::collections::HashMap;
+
+use crate::vl1::Address;
+use crate::vl1::fragmentedpacket::FragmentedPacket;
+use crate::vl1::node::{VL1CallerInterface, Node, PacketBuffer};
+use crate::vl1::constants::{WHOIS_RETRY_INTERVAL, WHOIS_RETRY_MAX};
+use crate::util::gate::IntervalGate;
+
+use parking_lot::Mutex;
+
+pub enum QueuedPacket {
+    Singular(PacketBuffer),
+    Fragmented(FragmentedPacket)
+}
+
+struct WhoisQueueItem {
+    retry_gate: IntervalGate<{ WHOIS_RETRY_INTERVAL }>,
+    retry_count: u16,
+    packet_queue: Vec<QueuedPacket>
+}
+
+pub struct Whois {
+    queue: Mutex<HashMap<Address, WhoisQueueItem>>
+}
+
+impl Whois {
+    pub const INTERVAL: i64 = WHOIS_RETRY_INTERVAL;
+
+    pub fn new() -> Self {
+        Self {
+            queue: Mutex::new(HashMap::new())
+        }
+    }
+
+    pub fn query<CI: VL1CallerInterface>(&self, node: &Node, ci: &CI, target: Address, packet: Option<QueuedPacket>) {
+        let mut q = self.queue.lock();
+        let send = if let Some(qi) = q.get_mut(&target) {
+            let g = qi.retry_gate.gate(ci.time_ticks());
+            qi.retry_count += g as u16;
+            let _ = packet.map(|p| qi.packet_queue.push(p));
+            g
+        } else {
+            q.insert(target, WhoisQueueItem {
+                retry_gate: IntervalGate::new(ci.time_ticks()),
+                retry_count: 1,
+                packet_queue: packet.map_or_else(|| Vec::new(), |p| vec![p]),
+            });
+            true
+        };
+        if send {
+            self.send_whois(node, ci, &[target]);
+        }
+    }
+
+    pub fn on_interval<CI: VL1CallerInterface>(&self, node: &Node, ci: &CI, time_ticks: i64) {
+        let mut targets: Vec<Address> = Vec::new();
+        self.queue.lock().retain(|target, qi| {
+            if qi.retry_count < WHOIS_RETRY_MAX {
+                if qi.retry_gate.gate(time_ticks) {
+                    qi.retry_count += 1;
+                    targets.push(*target);
+                }
+                true
+            } else {
+                false
+            }
+        });
+        if !targets.is_empty() {
+            self.send_whois(node, ci, targets.as_slice());
+        }
+    }
+
+    fn send_whois<CI: VL1CallerInterface>(&self, node: &Node, ci: &CI, targets: &[Address]) {
+        todo!()
+    }
+}
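Finally, a sketch of how the WHOIS queue is intended to be driven from the receive path. The helper method below is hypothetical (not part of this diff) and would live in node.rs, since the `whois` field is private to Node; the background task wiring added above then retries up to WHOIS_RETRY_MAX times at WHOIS_RETRY_INTERVAL ms before the entry is dropped:

```rust
use crate::vl1::Address;
use crate::vl1::node::{Node, PacketBuffer, VL1CallerInterface};
use crate::vl1::whois::QueuedPacket;

impl Node {
    // Hypothetical helper: when a packet arrives from an address whose identity is not
    // yet known, queue it behind a WHOIS query so it can be processed once the
    // identity response comes back.
    fn query_unknown_source<CI: VL1CallerInterface>(&self, ci: &CI, source: Address, packet: PacketBuffer) {
        self.whois.query(self, ci, source, Some(QueuedPacket::Singular(packet)));
    }
}
```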