Lots more stuff, but mostly packet defragmentation.

This commit is contained in:
Adam Ierymenko 2021-08-02 23:41:05 -04:00
parent f989690785
commit 617b7c86b6
No known key found for this signature in database
GPG key ID: C8877CF2D7A5D7F3
16 changed files with 442 additions and 117 deletions

View file

@ -9,21 +9,8 @@ pub mod random;
pub mod secret;
pub use aes_gmac_siv;
use std::convert::Infallible;
static mut SALT64: u64 = 0;
pub fn init() {
unsafe {
// We always run gcrypt in "FIPS mode," but it doesn't count as fully compliant unless it's a FIPS-certified library.
let _ = gcrypt::init_fips_mode(|_| -> Result<(), Infallible> { Ok(()) });
while SALT64 == 0 {
let mut tmp = 0_u64;
gcrypt::rand::randomize(gcrypt::rand::Level::Strong, &mut *((&mut tmp as *mut u64).cast::<[u8; 8]>()));
SALT64 = tmp;
}
}
// We always run gcrypt in "FIPS mode," but it doesn't count as fully compliant unless it's a FIPS-certified library.
let _ = gcrypt::init_fips_mode(|_| -> Result<(), std::convert::Infallible> { Ok(()) });
}
pub fn salt64() -> u64 { unsafe { SALT64 } }

View file

@ -0,0 +1,67 @@
use std::sync::atomic::{AtomicI64, Ordering};
/// Boolean rate limiter with normal (non-atomic) semantics.
pub struct IntervalGate<const FREQ: i64>(i64);
impl<const FREQ: i64> Default for IntervalGate<FREQ> {
fn default() -> Self {
Self(0)
}
}
impl<const FREQ: i64> IntervalGate<FREQ> {
#[inline(always)]
pub fn new(initial_ts: i64) -> Self {
Self(initial_ts)
}
#[inline(always)]
pub fn reset(&mut self) {
self.0 = 0;
}
#[inline(always)]
pub fn gate(&mut self, time: i64) -> bool {
if (time - self.0) >= FREQ {
self.0 = time;
true
} else {
false
}
}
}
/// Boolean rate limiter with atomic semantics.
pub struct AtomicIntervalGate<const FREQ: i64>(AtomicI64);
impl<const FREQ: i64> Default for AtomicIntervalGate<FREQ> {
fn default() -> Self {
Self(AtomicI64::new(0))
}
}
impl<const FREQ: i64> AtomicIntervalGate<FREQ> {
#[inline(always)]
pub fn new(initial_ts: i64) -> Self {
Self(AtomicI64::new(initial_ts))
}
#[inline(always)]
pub fn reset(&self) {
self.0.store(0, Ordering::Relaxed);
}
#[inline(always)]
pub fn gate(&self, time: i64) -> bool {
// Note that if two or more threads are using this at once, any thread's time might
// end up being the one stored. This is okay since these times should either be the
// same or very close, and slight differences won't cause issues with the use cases
// for this. This is primarily used to rate gate operations to prevent DOS attacks.
if (time - self.0.load(Ordering::Relaxed)) >= FREQ {
self.0.store(time, Ordering::Relaxed);
true
} else {
false
}
}
}

View file

@ -12,7 +12,7 @@ pub fn to_string(b: &[u8]) -> String {
s
}
/// Encode an unsigned 64-bit value as a string.
/// Encode an unsigned 64-bit value as a hexadecimal string.
pub fn to_string_u64(mut i: u64, skip_leading_zeroes: bool) -> String {
let mut s = String::new();
s.reserve(16);
@ -26,7 +26,7 @@ pub fn to_string_u64(mut i: u64, skip_leading_zeroes: bool) -> String {
s
}
/// Encode an unsigned 64-bit value as a string.
/// Encode an unsigned 64-bit value as a hexadecimal ASCII string.
pub fn to_vec_u64(mut i: u64, skip_leading_zeroes: bool) -> Vec<u8> {
let mut s = Vec::new();
s.reserve(16);
@ -40,7 +40,7 @@ pub fn to_vec_u64(mut i: u64, skip_leading_zeroes: bool) -> Vec<u8> {
s
}
/// Decode a hex string, ignoring non-hexadecimal characters.
/// Decode a hex string, ignoring all non-hexadecimal characters.
pub fn from_string(s: &str) -> Vec<u8> {
let mut b: Vec<u8> = Vec::new();
b.reserve((s.len() / 2) + 1);

View file

@ -1,5 +1,6 @@
pub mod hex;
pub mod pool;
pub mod gate;
pub(crate) const ZEROES: [u8; 64] = [0_u8; 64];

View file

@ -1,7 +1,7 @@
use std::str::FromStr;
use std::hash::{Hash, Hasher};
use crate::vl1::constants::ADDRESS_RESERVED_PREFIX;
use crate::vl1::constants::{ADDRESS_RESERVED_PREFIX, ADDRESS_SIZE};
use crate::error::InvalidFormatError;
use crate::util::hex::HEX_CHARS;
@ -11,7 +11,7 @@ pub struct Address(u64);
impl Address {
#[inline(always)]
pub fn from_bytes(b: &[u8]) -> Result<Address, InvalidFormatError> {
if b.len() >= 5 {
if b.len() >= ADDRESS_SIZE {
Ok(Address((b[0] as u64) << 32 | (b[1] as u64) << 24 | (b[2] as u64) << 16 | (b[3] as u64) << 8 | b[4] as u64))
} else {
Err(InvalidFormatError)
@ -34,7 +34,7 @@ impl Address {
}
#[inline(always)]
pub fn to_bytes(&self) -> [u8; 5] {
pub fn to_bytes(&self) -> [u8; ADDRESS_SIZE] {
[(self.0 >> 32) as u8, (self.0 >> 24) as u8, (self.0 >> 16) as u8, (self.0 >> 8) as u8, self.0 as u8]
}
@ -48,8 +48,8 @@ impl ToString for Address {
fn to_string(&self) -> String {
let mut v = self.0 << 24;
let mut s = String::new();
s.reserve(10);
for _ in 0..10 {
s.reserve(ADDRESS_SIZE * 2);
for _ in 0..(ADDRESS_SIZE * 2) {
s.push(HEX_CHARS[(v >> 60) as usize] as char);
v <<= 4;
}
@ -79,14 +79,14 @@ impl Hash for Address {
}
}
impl From<&[u8; 5]> for Address {
impl From<&[u8; ADDRESS_SIZE]> for Address {
#[inline(always)]
fn from(b: &[u8; 5]) -> Address {
Address((b[0] as u64) << 32 | (b[1] as u64) << 24 | (b[2] as u64) << 16 | (b[3] as u64) << 8 | b[4] as u64)
}
}
impl From<[u8; 5]> for Address {
impl From<[u8; ADDRESS_SIZE]> for Address {
#[inline(always)]
fn from(b: [u8; 5]) -> Address {
Self::from(&b)

View file

@ -207,20 +207,28 @@ impl<const L: usize> Buffer<L> {
}
}
/// Get a structure at position 0.
/// Get a structure at a given position in the buffer.
#[inline(always)]
pub fn header<H: RawObject>(&self) -> &H {
debug_assert!(size_of::<H>() <= L);
debug_assert!(size_of::<H>() <= self.0);
unsafe { &*self.1.as_ptr().cast::<H>() }
pub fn struct_at<T: RawObject>(&self, ptr: usize) -> std::io::Result<&T> {
if (i + size_of::<T>()) <= self.0 {
unsafe {
Ok(&*self.1.as_ptr().cast::<u8>().offset(ptr as isize).cast::<T>())
}
} else {
std::io::Result::Err(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, OVERFLOW_ERR_MSG))
}
}
/// Get a structure at position 0 (mutable).
/// Get a structure at a given position in the buffer.
#[inline(always)]
pub fn header_mut<H: RawObject>(&mut self) -> &mut H {
debug_assert!(size_of::<H>() <= L);
debug_assert!(size_of::<H>() <= self.0);
unsafe { &mut *self.1.as_mut_ptr().cast::<H>() }
pub fn struct_mut_at<T: RawObject>(&mut self, ptr: usize) -> std::io::Result<&mut T> {
if (i + size_of::<T>()) <= self.0 {
unsafe {
Ok(&mut *self.1.as_mut_ptr().cast::<u8>().offset(ptr as isize).cast::<T>())
}
} else {
std::io::Result::Err(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, OVERFLOW_ERR_MSG))
}
}
/// Get a structure at a given position in the buffer and advance the cursor.

View file

@ -1,6 +1,9 @@
/// Length of an address in bytes.
pub const ADDRESS_SIZE: usize = 5;
/// Prefix indicating reserved addresses (that can't actually be addresses).
pub const ADDRESS_RESERVED_PREFIX: u8 = 0xff;
/// Size of packet header that lies outside the encryption envelope.
pub const PACKET_HEADER_SIZE: usize = 27;
@ -19,6 +22,9 @@ pub const PACKET_SIZE_MIN: usize = PACKET_HEADER_SIZE + 1;
/// Maximum size of an entire packet.
pub const PACKET_SIZE_MAX: usize = PACKET_HEADER_SIZE + PACKET_PAYLOAD_SIZE_MAX;
/// Index of destination in both fragment and full packet headers.
pub const PACKET_DESTINATION_INDEX: usize = 8;
/// Mask to select cipher from header flags field.
pub const HEADER_FLAGS_FIELD_MASK_CIPHER: u8 = 0x30;
@ -48,9 +54,14 @@ pub const FRAGMENT_SIZE_MIN: usize = 16;
/// Maximum allowed number of fragments.
pub const FRAGMENT_COUNT_MAX: usize = 16;
/// Maximum number of fragmented packets in flight from a peer.
/// Usually there should only be one at a time, so this is overkill.
pub const PEER_DEFRAGMENT_MAX_PACKETS_IN_FLIGHT: usize = 4;
/// Index of packet fragment indicator byte to detect fragments.
pub const FRAGMENT_INDICATOR_INDEX: usize = 13;
/// Byte found at FRAGMENT_INDICATOR_INDEX to indicate a fragment.
pub const FRAGMENT_INDICATOR: u8 = 0xff;
/// Maximum number of inbound fragments to handle at once per path.
pub const FRAGMENT_MAX_PER_PATH: usize = 64;
/// Verb (inner) flag indicating that the packet's payload (after the verb) is LZ4 compressed.
pub const VERB_FLAG_COMPRESSED: u8 = 0x80;
@ -58,11 +69,8 @@ pub const VERB_FLAG_COMPRESSED: u8 = 0x80;
/// Maximum number of packet hops allowed by the protocol.
pub const PROTOCOL_MAX_HOPS: usize = 7;
/// Index of packet fragment indicator byte to detect fragments.
pub const FRAGMENT_INDICATOR_INDEX: usize = 13;
/// Frequency for WHOIS retries
pub const WHOIS_RETRY_INTERVAL: i64 = 1000;
/// Byte found at FRAGMENT_INDICATOR_INDEX to indicate a fragment.
pub const FRAGMENT_INDICATOR: u8 = 0xff;
/// Prefix indicating reserved addresses (that can't actually be addresses).
pub const ADDRESS_RESERVED_PREFIX: u8 = 0xff;
/// Maximum number of WHOIS retries
pub const WHOIS_RETRY_MAX: u16 = 3;

View file

@ -0,0 +1,57 @@
use std::sync::Arc;
use crate::vl1::node::PacketBuffer;
use crate::vl1::constants::FRAGMENT_COUNT_MAX;
use crate::vl1::Path;
use crate::vl1::protocol::PacketID;
pub struct FragmentedPacket {
pub id: PacketID,
pub ts_ticks: i64,
frags: [Option<PacketBuffer>; FRAGMENT_COUNT_MAX],
have: u8,
expecting: u8,
}
impl Default for FragmentedPacket {
fn default() -> Self {
Self {
id: 0,
ts_ticks: -1,
frags: [None; FRAGMENT_COUNT_MAX],
have: 0,
expecting: 0,
}
}
}
impl FragmentedPacket {
/// Reset this fragmented packet for re-use.
#[inline(always)]
pub fn reset(&mut self) {
self.id = 0;
self.ts_ticks = -1;
self.frags.fill(None);
self.have = 0;
self.expecting = 0;
}
/// Initialize for a new packet.
#[inline(always)]
pub fn init(&mut self, id: PacketID, ts_ticks: i64) {
self.id = id;
self.ts_ticks = ts_ticks;
}
/// Add a fragment to this fragment set and return true if the packet appears complete.
#[inline(always)]
pub fn add(&mut self, frag: PacketBuffer, no: u8, expecting: u8) -> bool {
if self.frags[no].replace(frag).is_none() {
self.have = self.have.wrapping_add(1);
self.expecting |= expecting;
self.have == self.expecting
} else {
false
}
}
}

View file

@ -23,6 +23,8 @@ const V1_BALLOON_SPACE_COST: usize = 16384;
const V1_BALLOON_TIME_COST: usize = 3;
const V1_BALLOON_DELTA: usize = 3;
const V1_BALLOON_SALT: &'static [u8] = b"zt_id_v1";
pub const IDENTITY_TYPE_0_SIGNATURE_SIZE: usize = ED25519_SIGNATURE_SIZE + 32;
pub const IDENTITY_TYPE_1_SIGNATURE_SIZE: usize = P521_ECDSA_SIGNATURE_SIZE + ED25519_SIGNATURE_SIZE;
@ -141,7 +143,7 @@ impl Identity {
loop {
// ECDSA is a randomized signature algorithm, so each signature will be different.
let sig = p521_ecdsa.sign(&signing_buf).unwrap();
let bh = balloon::hash::<{ V1_BALLOON_SPACE_COST }, { V1_BALLOON_TIME_COST }, { V1_BALLOON_DELTA }>(&sig, b"zt_id_v1");
let bh = balloon::hash::<{ V1_BALLOON_SPACE_COST }, { V1_BALLOON_TIME_COST }, { V1_BALLOON_DELTA }>(&sig, V1_BALLOON_SALT);
if bh[0] < 7 {
let addr = Address::from_bytes(&bh[59..64]).unwrap();
if addr.is_valid() {
@ -229,7 +231,7 @@ impl Identity {
signing_buf[(C25519_PUBLIC_KEY_SIZE + ED25519_PUBLIC_KEY_SIZE)..(C25519_PUBLIC_KEY_SIZE + ED25519_PUBLIC_KEY_SIZE + P521_PUBLIC_KEY_SIZE)].copy_from_slice((*p521).0.public_key_bytes());
signing_buf[(C25519_PUBLIC_KEY_SIZE + ED25519_PUBLIC_KEY_SIZE + P521_PUBLIC_KEY_SIZE)..(C25519_PUBLIC_KEY_SIZE + ED25519_PUBLIC_KEY_SIZE + P521_PUBLIC_KEY_SIZE + P521_PUBLIC_KEY_SIZE)].copy_from_slice((*p521).1.public_key_bytes());
if (*p521).1.verify(&signing_buf, &(*p521).2) {
let bh = balloon::hash::<{ V1_BALLOON_SPACE_COST }, { V1_BALLOON_TIME_COST }, { V1_BALLOON_DELTA }>(&(*p521).2, b"zt_id_v1");
let bh = balloon::hash::<{ V1_BALLOON_SPACE_COST }, { V1_BALLOON_TIME_COST }, { V1_BALLOON_DELTA }>(&(*p521).2, V1_BALLOON_SALT);
(bh[0] < 7) && bh.eq(&(*p521).3) && Address::from_bytes(&bh[59..64]).unwrap().eq(&self.address)
} else {
false

View file

@ -238,24 +238,6 @@ impl InetAddress {
}
}
#[inline(always)]
pub(crate) fn local_lookup_key(&self) -> u128 {
unsafe {
match self.sa.sa_family as u8 {
AF_INET => {
((self.sin.sin_addr.s_addr as u64).wrapping_shl(16) | self.sin.sin_port as u64) as u128
}
AF_INET6 => {
let mut tmp: [u64; 2] = MaybeUninit::uninit().assume_init();
copy_nonoverlapping((&self.sin6.sin6_addr as *const in6_addr).cast::<u8>(), tmp.as_mut_ptr().cast::<u8>(), 16);
tmp[1] = tmp[1].wrapping_add((self.sin6.sin6_port as u64) ^ crate::crypto::salt64());
(*tmp.as_ptr().cast::<u128>()).wrapping_mul(0x0fc94e3bf4e9ab32866458cd56f5e605)
}
_ => 0
}
}
}
/// Get this IP address's scope as per RFC documents and what is advertised via BGP.
pub fn scope(&self) -> IpScope {
unsafe {
@ -488,6 +470,7 @@ impl FromStr for InetAddress {
}
impl PartialEq for InetAddress {
#[inline(always)]
fn eq(&self, other: &Self) -> bool {
unsafe {
if self.sa.sa_family == other.sa.sa_family {

View file

@ -3,6 +3,11 @@ pub(crate) mod buffer;
pub(crate) mod node;
pub(crate) mod path;
pub(crate) mod peer;
pub(crate) mod dictionary;
pub(crate) mod address;
pub(crate) mod mac;
pub(crate) mod fragmentedpacket;
mod(crate) mod whois;
pub mod constants;
pub mod identity;
@ -10,10 +15,6 @@ pub mod inetaddress;
pub mod endpoint;
pub mod locator;
mod dictionary;
mod address;
mod mac;
pub use address::Address;
pub use mac::MAC;
pub use identity::Identity;

View file

@ -1,20 +1,23 @@
use std::sync::Arc;
use std::str::FromStr;
use std::time::Duration;
use std::marker::PhantomData;
use std::hash::Hash;
use std::marker::PhantomData;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use dashmap::DashMap;
use parking_lot::Mutex;
use crate::crypto::random::SecureRandom;
use crate::error::InvalidParameterError;
use crate::util::gate::IntervalGate;
use crate::util::pool::{Pool, Pooled};
use crate::vl1::{Address, Identity, Endpoint, Locator};
use crate::vl1::{Address, Endpoint, Identity, Locator};
use crate::vl1::buffer::Buffer;
use crate::vl1::constants::{PACKET_SIZE_MAX, FRAGMENT_COUNT_MAX};
use crate::vl1::constants::PACKET_SIZE_MAX;
use crate::vl1::path::Path;
use crate::vl1::peer::Peer;
use parking_lot::Mutex;
use dashmap::DashMap;
use crate::vl1::protocol::{FragmentHeader, is_fragment, PacketHeader, PacketID};
use crate::vl1::whois::Whois;
/// Standard packet buffer type including pool container.
pub type PacketBuffer = Pooled<Buffer<{ PACKET_SIZE_MAX }>>;
@ -96,12 +99,18 @@ pub trait VL1CallerInterface {
fn time_clock(&self) -> i64;
}
#[derive(Default)]
struct BackgroundTaskIntervals {
whois: IntervalGate<{ Whois::INTERVAL }>,
}
pub struct Node {
identity: Identity,
intervals: Mutex<BackgroundTaskIntervals>,
locator: Mutex<Option<Locator>>,
paths_by_inaddr: DashMap<u128, Arc<Path>>,
paths: DashMap<Endpoint, Arc<Path>>,
peers: DashMap<Address, Arc<Peer>>,
peer_vec: Mutex<Vec<Arc<Peer>>>, // for rapid iteration through all peers
whois: Whois,
buffer_pool: Pool<Buffer<{ PACKET_SIZE_MAX }>>,
secure_prng: SecureRandom,
}
@ -136,10 +145,11 @@ impl Node {
Ok(Self {
identity: id,
intervals: Mutex::new(BackgroundTaskIntervals::default()),
locator: Mutex::new(None),
paths_by_inaddr: DashMap::new(),
paths: DashMap::new(),
peers: DashMap::new(),
peer_vec: Mutex::new(Vec::new()),
whois: Whois::new(),
buffer_pool: Pool::new(64),
secure_prng: SecureRandom::get(),
})
@ -164,15 +174,73 @@ impl Node {
self.buffer_pool.get()
}
/// Get a peer by address.
#[inline(always)]
pub fn peer(&self, a: Address) -> Option<Arc<Peer>> {
self.peers.get(&a).map(|peer| peer.clone() )
}
/// Get all peers currently in the peer cache.
pub fn peers(&self) -> Vec<Arc<Peer>> {
let mut v: Vec<Arc<Peer>> = Vec::new();
v.reserve(self.peers.len());
for p in self.peers.iter() {
v.push(p.value().clone());
}
v
}
/// Run background tasks and return desired delay until next call in milliseconds.
/// This should only be called once at a time. It technically won't hurt anything to
/// call concurrently but it will waste CPU cycles.
pub fn do_background_tasks<CI: VL1CallerInterface>(&self, ci: &CI) -> Duration {
let intervals = self.intervals.lock();
let tt = ci.time_ticks();
if intervals.whois.gate(tt) {
self.whois.on_interval(self, ci, tt);
}
Duration::from_millis(1000)
}
/// Called when a packet is received on the physical wire.
pub fn wire_receive<CI: VL1CallerInterface>(&self, ci: &CI, endpoint: &Endpoint, local_socket: i64, local_interface: i64, data: PacketBuffer) {
pub fn wire_receive<CI: VL1CallerInterface>(&self, ci: &CI, source_endpoint: &Endpoint, source_local_socket: i64, source_local_interface: i64, mut data: PacketBuffer) {
let _ = data.struct_mut_at::<FragmentHeader>(0).map(|fragment_header| {
// NOTE: destination address is located at the same index in both the fragment
// header and the full packet header, allowing us to make this decision once.
let dest = Address::from(&fragment_header.dest);
if dest == self.identity.address() {
// Packet or fragment is addressed to this node.
let path = self.path(source_endpoint, source_local_socket, source_local_interface);
if fragment_header.is_fragment() {
} else {
data.struct_mut_at::<PacketHeader>(0).map(|header| {
let source = Address::from(&header.src);
if header.is_fragmented() {
} else {
}
});
}
} else {
// Packet or fragment is addressed to another node.
}
});
}
/// Get the canonical Path object for a given endpoint and local socket information.
/// This is a canonicalizing function that returns a unique path object for every tuple
/// of endpoint, local socket, and local interface.
pub(crate) fn path(&self, ep: &Endpoint, local_socket: i64, local_interface: i64) -> Arc<Path> {
self.paths.get(ep).map_or_else(|| {
let p = Arc::new(Path::new(ep.clone(), local_socket, local_interface));
self.paths.insert(ep.clone(), p.clone()).unwrap_or(p) // if another thread added one, return that instead
}, |path| {
path.clone()
})
}
}

View file

@ -1,13 +1,25 @@
use std::sync::atomic::{AtomicI64, Ordering};
use crate::vl1::Endpoint;
use crate::vl1::constants::FRAGMENT_COUNT_MAX;
use crate::vl1::fragmentedpacket::FragmentedPacket;
use crate::vl1::protocol::{FragmentHeader, PacketID};
use crate::vl1::node::PacketBuffer;
use parking_lot::Mutex;
struct RxState {
last_receive_time_ticks: i64,
fragmented_packet_count: usize,
fragmented_packets: [FragmentedPacket; FRAGMENT_COUNT_MAX],
}
pub struct Path {
pub(crate) endpoint: Endpoint,
pub(crate) local_socket: i64,
pub(crate) local_interface: i64,
last_send_time_ticks: AtomicI64,
last_receive_time_ticks: AtomicI64,
rxs: Mutex<RxState>,
}
impl Path {
@ -18,7 +30,11 @@ impl Path {
local_socket,
local_interface,
last_send_time_ticks: AtomicI64::new(0),
last_receive_time_ticks: AtomicI64::new(0),
rxs: Mutex::new(RxState {
last_receive_time_ticks: 0,
fragmented_packet_count: 0,
fragmented_packets: [FragmentedPacket::default(); FRAGMENT_COUNT_MAX]
})
}
}
@ -29,7 +45,48 @@ impl Path {
#[inline(always)]
pub fn send_receive_time_ticks(&self) -> i64 {
self.last_receive_time_ticks.load(Ordering::Relaxed)
self.rxs.lock().last_receive_time_ticks
}
#[inline(always)]
pub(crate) fn receive_fragment<F: FnOnce(&mut FragmentedPacket)>(&self, packet_id: PacketID, fragment_no: u8, fragment_expecting_count: u8, packet: PacketBuffer, time_ticks: i64, assembled_packet_handler: F) {
if fragment_no < FRAGMENT_COUNT_MAX as u8 {
let mut rxs = self.rxs.lock();
rxs.last_receive_time_ticks = time_ticks;
let mut fpcnt = rxs.fragmented_packet_count;
let mut fidx = 0;
while fpcnt > 0 {
let mut f = &mut rxs.fragmented_packets[fidx];
if f.id == packet_id {
if f.add(packet, fragment_no, fragment_expecting_count) {
assembled_packet_handler(f);
f.reset();
rxs.fragmented_packet_count = rxs.fragmented_packet_count.wrapping_sub(1);
}
return;
} else if f.ts_ticks >= 0 {
fpcnt = fpcnt.wrapping_sub(1);
}
fidx = fidx.wrapping_add(1);
}
let mut oldest_ts = rxs.fragmented_packets[0].ts_ticks;
let mut oldest_idx = 0;
if oldest_ts >= 0 {
for fidx in 1..FRAGMENT_COUNT_MAX {
let ts = rxs.fragmented_packets[fidx].ts_ticks;
if ts < oldest_ts {
oldest_ts = ts;
oldest_idx = fidx;
}
}
}
let mut f = &mut rxs.fragmented_packets[oldest_idx];
f.init(packet_id, time_ticks);
let _ = f.add(packet, fragment_no, fragment_expecting_count);
}
}
}

View file

@ -1,41 +1,47 @@
use std::sync::Arc;
use std::sync::atomic::{AtomicI64, AtomicU64, AtomicU8};
use crate::vl1::protocol::PacketID;
use crate::vl1::node::PacketBuffer;
use crate::vl1::constants::{FRAGMENT_COUNT_MAX, PEER_DEFRAGMENT_MAX_PACKETS_IN_FLIGHT};
use crate::vl1::{Identity, Path};
use crate::vl1::fragmentedpacket::FragmentedPacket;
use crate::vl1::protocol::{PacketID, PacketHeader};
use crate::vl1::node::{VL1CallerInterface, PacketBuffer, Node};
use parking_lot::Mutex;
struct FragmentedPacket {
pub id: PacketID,
pub frag: [Option<PacketBuffer>; FRAGMENT_COUNT_MAX],
const MAX_PATHS: usize = 16;
struct TxState {
packet_iv_counter: u64,
last_send_time_ticks: i64,
paths: [Arc<Path>; MAX_PATHS],
}
struct RxState {
last_receive_time_ticks: i64,
remote_version: [u8; 4],
remote_protocol_version: u8,
}
/// A remote peer known to this node.
/// Sending-related and receiving-related fields are locked separately since concurrent
/// send/receive is not uncommon.
pub struct Peer {
// This peer's identity.
identity: Identity,
// Primary static secret resulting from key agreement with identity.
// Static shared secret computed from agreement with identity.
identity_static_secret: [u8; 48],
// Outgoing packet IV counter used to generate packet IDs to this peer.
packet_iv_counter: AtomicU64,
// State used primarily when sending to this peer.
txs: Mutex<TxState>,
// Paths sorted in ascending order of quality / preference.
paths: Mutex<Vec<Arc<Path>>>,
// Incoming fragmented packet defragment buffer.
fragmented_packets: Mutex<[FragmentedPacket; PEER_DEFRAGMENT_MAX_PACKETS_IN_FLIGHT]>,
// Last send and receive time in millisecond ticks (not wall clock).
last_send_time_ticks: AtomicI64,
last_receive_time_ticks: AtomicI64,
// Most recent remote version (most to least significant bytes: major, minor, revision, build)
remote_version: AtomicU64,
// Most recent remote protocol version
remote_protocol_version: AtomicU8,
// State used primarily when receiving from this peer.
rxs: Mutex<RxState>,
}
impl Peer {
pub(crate) fn receive_from_singular<CI: VL1CallerInterface>(&self, node: &Node, ci: &CI, header: &PacketHeader, packet: &PacketBuffer) {
}
pub(crate) fn receive_from_fragmented<CI: VL1CallerInterface>(&self, node: &Node, ci: CI, header: &PacketHeader, packet: &FragmentedPacket) {
}
}

View file

@ -54,12 +54,6 @@ impl PacketHeader {
(self.flags_cipher_hops & HEADER_FLAG_FRAGMENTED) != 0
}
/// If true, this packet is actually a fragment and its header should be interpreted as a FragmentHeader instead.
#[inline(always)]
pub fn is_fragment(&self) -> bool {
self.src[0] == FRAGMENT_INDICATOR
}
#[inline(always)]
pub fn destination(&self) -> Address {
Address::from(&self.dest)
@ -89,6 +83,11 @@ pub struct FragmentHeader {
unsafe impl crate::vl1::buffer::RawObject for FragmentHeader {}
impl FragmentHeader {
#[inline(always)]
pub fn is_fragment(&self) -> bool {
self.fragment_indicator == FRAGMENT_INDICATOR
}
#[inline(always)]
pub fn total_fragments(&self) -> u8 {
self.total_and_fragment_no >> 4
@ -104,6 +103,12 @@ impl FragmentHeader {
self.reserved_hops & HEADER_FLAGS_FIELD_MASK_HOPS
}
#[inline(always)]
pub fn increment_hops(&mut self) {
let f = self.reserved_hops;
self.reserved_hops = (f & HEADER_FLAGS_FIELD_MASK_HOPS.not()) | ((f + 1) & HEADER_FLAGS_FIELD_MASK_HOPS);
}
#[inline(always)]
pub fn destination(&self) -> Address {
Address::from(&self.dest)

View file

@ -0,0 +1,75 @@
use std::collections::HashMap;
use crate::vl1::Address;
use crate::vl1::fragmentedpacket::FragmentedPacket;
use crate::vl1::node::{VL1CallerInterface, Node, PacketBuffer};
use crate::util::gate::IntervalGate;
use parking_lot::Mutex;
use crate::vl1::constants::{WHOIS_RETRY_INTERVAL, WHOIS_RETRY_MAX};
pub enum QueuedPacket {
Singular(PacketBuffer),
Fragmented(FragmentedPacket)
}
struct WhoisQueueItem {
retry_gate: IntervalGate<{ WHOIS_RETRY_INTERVAL }>,
retry_count: u16,
packet_queue: Vec<QueuedPacket>
}
pub struct Whois {
queue: Mutex<HashMap<Address, WhoisQueueItem>>
}
impl Whois {
pub const INTERVAL: i64 = WHOIS_RETRY_INTERVAL;
pub fn new() -> Self {
Self {
queue: Mutex::new(HashMap::new())
}
}
pub fn query<CI: VL1CallerInterface>(&self, node: &Node, ci: &CI, target: Address, packet: Option<QueuedPacket>) {
let mut q = self.queue.lock();
if q.get_mut(&target).map_or_else(|| {
q.insert(target, WhoisQueueItem {
retry_gate: IntervalGate::new(ci.time_ticks()),
retry_count: 1,
packet_queue: packet.map_or_else(|| Vec::new(), |p| vec![p]),
});
true
}, |qi| {
let g = qi.retry_gate(ci.time_ticks());
qi.retry_count += g as u16;
let _ = packet.map(|p| qi.packet_queue.push(p));
g
}) {
self.send_whois(node, ci, &[target]);
}
}
pub fn on_interval<CI: VL1CallerInterface>(&self, node: &Node, ci: &CI, time_ticks: i64) {
let mut targets: Vec<Address> = Vec::new();
self.queue.lock().retain(|target, qi| {
if qi.retry_count < WHOIS_RETRY_MAX {
if qi.retry_gate.gate(time_ticks) {
qi.retry_count += 1;
targets.push(*target);
}
true
} else {
false
}
});
if !targets.is_empty() {
self.send_whois(node, ci, targets.as_slice());
}
}
fn send_whois<CI: VL1CallerInterface>(&self, node: &Node, ci: &CI, targets: &[Address]) {
todo!()
}
}