A bit more reorg.

This commit is contained in:
Adam Ierymenko 2022-09-29 09:45:54 -04:00
parent 2e469e282a
commit b5e1c4f546
No known key found for this signature in database
GPG key ID: C8877CF2D7A5D7F3
10 changed files with 83 additions and 74 deletions

View file

@ -419,6 +419,69 @@ pub(crate) mod v1 {
pub version_revision: [u8; 2], // u16 pub version_revision: [u8; 2], // u16
} }
} }
/// Packet fragment re-assembler and container.
///
/// Performance note: PacketBuffer is Pooled<Buffer>, which wraps a NonNull pointer.
/// That means Option<PacketBuffer> is just a pointer, since NonNull's niche permits the
/// compiler to optimize out any additional state in Option.
pub(crate) struct FragmentedPacket {
/// Tick timestamp when this fragment set was created (used to expire stale sets).
pub ts_ticks: i64,
/// Fragment slots indexed by fragment number; None until that fragment arrives.
pub frags: [Option<PooledPacketBuffer>; FRAGMENT_COUNT_MAX],
/// Bit mask of fragment numbers received so far (bit N set == fragment N present).
pub have: u8,
/// Bit mask of fragments expected; assembly is complete when have == expecting.
pub expecting: u8,
}
impl FragmentedPacket {
#[inline]
pub fn new(ts: i64) -> Self {
// 'have' and 'expecting' must be expanded if this is >8
debug_assert!(v1::FRAGMENT_COUNT_MAX <= 8);
Self {
ts_ticks: ts,
frags: Default::default(),
have: 0,
expecting: 0,
}
}
/// Add a fragment to this fragment set and return true if all fragments are present.
#[inline(always)]
pub fn add_fragment(&mut self, frag: PooledPacketBuffer, no: u8, expecting: u8) -> bool {
if let Some(entry) = self.frags.get_mut(no as usize) {
/*
* This works by setting bit N in the 'have' bit mask and then setting X bits
* in 'expecting' if the 'expecting' field is non-zero. Since the packet head
* does not carry the expecting fragment count (it will be provided as zero) and
* all subsequent fragments should have the same fragment count, this will yield
* a 'have' of 1 and an 'expecting' of 0 after the head arrives. Then 'expecting'
* will be set to the right bit pattern by the first fragment and 'true' will get
* returned once all fragments have arrived and therefore all flags in 'have' are
* set.
*
* Receipt of a four-fragment packet would look like:
*
* after head : have == 0x01, expecting == 0x00 -> false
* after fragment 1: have == 0x03, expecting == 0x0f -> false
* after fragment 2: have == 0x07, expecting == 0x0f -> false
* after fragment 3: have == 0x0f, expecting == 0x0f -> true (done!)
*
* This algorithm is just a few instructions in ASM and also correctly handles
* duplicated packet fragments. If all fragments never arrive receipt eventually
* times out and this is discarded.
*/
let _ = entry.insert(frag);
self.have |= 1_u8.wrapping_shl(no as u32);
self.expecting |= 0xff_u8.wrapping_shr(8 - (expecting as u32));
self.have == self.expecting
} else {
false
}
}
}
} }
/// Maximum delta between the message ID of a sent packet and its response. /// Maximum delta between the message ID of a sent packet and its response.

View file

@ -1,65 +0,0 @@
// (c) 2020-2022 ZeroTier, Inc. -- currently proprietary pending actual release and licensing. See LICENSE.md.
use crate::protocol::*;
/// Packet fragment re-assembler and container.
///
/// Performance note: PacketBuffer is Pooled<Buffer>, which wraps a NonNull pointer.
/// That means Option<PacketBuffer> is just a pointer, since NonNull's niche permits the
/// compiler to optimize out any additional state in Option.
pub(crate) struct FragmentedPacket {
/// Tick timestamp when this fragment set was created (used to expire stale sets).
pub ts_ticks: i64,
/// Fragment slots indexed by fragment number; None until that fragment arrives.
pub frags: [Option<PooledPacketBuffer>; v1::FRAGMENT_COUNT_MAX],
/// Bit mask of fragment numbers received so far (bit N set == fragment N present).
pub have: u8,
/// Bit mask of fragments expected; assembly is complete when have == expecting.
pub expecting: u8,
}
impl FragmentedPacket {
    /// Create a new empty fragment set with the given creation tick timestamp.
    pub fn new(ts: i64) -> Self {
        // 'have' and 'expecting' are u8 bit masks and must be widened if this is >8.
        debug_assert!(v1::FRAGMENT_COUNT_MAX <= 8);
        Self {
            ts_ticks: ts,
            frags: Default::default(),
            have: 0,
            expecting: 0,
        }
    }

    /// Add a fragment to this fragment set and return true if all fragments are present.
    #[inline(always)]
    pub fn add_fragment(&mut self, frag: PooledPacketBuffer, no: u8, expecting: u8) -> bool {
        if let Some(entry) = self.frags.get_mut(no as usize) {
            /*
             * This works by setting bit N in the 'have' bit mask and then setting X bits
             * in 'expecting' if the 'expecting' field is non-zero. Since the packet head
             * does not carry the expecting fragment count (it will be provided as zero) and
             * all subsequent fragments should have the same fragment count, this will yield
             * a 'have' of 1 and an 'expecting' of 0 after the head arrives. Then 'expecting'
             * will be set to the right bit pattern by the first fragment and 'true' will get
             * returned once all fragments have arrived and therefore all flags in 'have' are
             * set.
             *
             * Receipt of a four-fragment packet would look like:
             *
             * after head      : have == 0x01, expecting == 0x00 -> false
             * after fragment 1: have == 0x03, expecting == 0x0f -> false
             * after fragment 2: have == 0x07, expecting == 0x0f -> false
             * after fragment 3: have == 0x0f, expecting == 0x0f -> true (done!)
             *
             * This algorithm is just a few instructions in ASM and also correctly handles
             * duplicated packet fragments. If all fragments never arrive receipt eventually
             * times out and this is discarded.
             */
            let _ = entry.insert(frag);
            self.have |= 1_u8.wrapping_shl(no as u32);
            // The head supplies expecting == 0, which per the contract above must leave
            // 'expecting' unchanged. The previous unconditional
            // '0xff_u8.wrapping_shr(8 - 0)' masked the shift amount to 0 and set
            // 'expecting' to 0xff, which could never equal 'have' for packets of fewer
            // than 8 fragments, so reassembly would never report completion.
            if expecting != 0 {
                debug_assert!((expecting as usize) <= v1::FRAGMENT_COUNT_MAX);
                self.expecting |= 0xff_u8.wrapping_shr(8 - (expecting as u32));
            }
            self.have == self.expecting
        } else {
            // Fragment number out of range: ignore rather than panic on hostile input.
            false
        }
    }
}

View file

@ -3,7 +3,6 @@
mod address; mod address;
mod endpoint; mod endpoint;
mod event; mod event;
mod fragmentedpacket;
mod mac; mod mac;
mod path; mod path;
mod peer; mod peer;

View file

@ -8,7 +8,6 @@ use parking_lot::Mutex;
use crate::protocol::*; use crate::protocol::*;
use crate::vl1::endpoint::Endpoint; use crate::vl1::endpoint::Endpoint;
use crate::vl1::fragmentedpacket::FragmentedPacket;
use crate::vl1::node::*; use crate::vl1::node::*;
use zerotier_crypto::random; use zerotier_crypto::random;
@ -33,10 +32,11 @@ pub struct Path<HostSystemImpl: HostSystem> {
last_send_time_ticks: AtomicI64, last_send_time_ticks: AtomicI64,
last_receive_time_ticks: AtomicI64, last_receive_time_ticks: AtomicI64,
create_time_ticks: i64, create_time_ticks: i64,
fragmented_packets: Mutex<HashMap<u64, FragmentedPacket, PacketIdHasher>>, fragmented_packets: Mutex<HashMap<u64, v1::FragmentedPacket, PacketIdHasher>>,
} }
impl<HostSystemImpl: HostSystem> Path<HostSystemImpl> { impl<HostSystemImpl: HostSystem> Path<HostSystemImpl> {
#[inline]
pub fn new( pub fn new(
endpoint: Endpoint, endpoint: Endpoint,
local_socket: HostSystemImpl::LocalSocket, local_socket: HostSystemImpl::LocalSocket,
@ -56,6 +56,7 @@ impl<HostSystemImpl: HostSystem> Path<HostSystemImpl> {
/// Receive a fragment and return a FragmentedPacket if the entire packet was assembled. /// Receive a fragment and return a FragmentedPacket if the entire packet was assembled.
/// This returns None if more fragments are needed to assemble the packet. /// This returns None if more fragments are needed to assemble the packet.
#[inline]
pub(crate) fn receive_fragment( pub(crate) fn receive_fragment(
&self, &self,
packet_id: u64, packet_id: u64,
@ -63,7 +64,7 @@ impl<HostSystemImpl: HostSystem> Path<HostSystemImpl> {
fragment_expecting_count: u8, fragment_expecting_count: u8,
packet: PooledPacketBuffer, packet: PooledPacketBuffer,
time_ticks: i64, time_ticks: i64,
) -> Option<FragmentedPacket> { ) -> Option<v1::FragmentedPacket> {
let mut fp = self.fragmented_packets.lock(); let mut fp = self.fragmented_packets.lock();
// Discard some old waiting packets if the total incoming fragments for a path exceeds a // Discard some old waiting packets if the total incoming fragments for a path exceeds a
@ -83,7 +84,7 @@ impl<HostSystemImpl: HostSystem> Path<HostSystemImpl> {
if fp if fp
.entry(packet_id) .entry(packet_id)
.or_insert_with(|| FragmentedPacket::new(time_ticks)) .or_insert_with(|| v1::FragmentedPacket::new(time_ticks))
.add_fragment(packet, fragment_no, fragment_expecting_count) .add_fragment(packet, fragment_no, fragment_expecting_count)
{ {
fp.remove(&packet_id) fp.remove(&packet_id)
@ -102,6 +103,7 @@ impl<HostSystemImpl: HostSystem> Path<HostSystemImpl> {
self.last_send_time_ticks.store(time_ticks, Ordering::Relaxed); self.last_send_time_ticks.store(time_ticks, Ordering::Relaxed);
} }
#[inline]
pub(crate) fn service(&self, time_ticks: i64) -> PathServiceResult { pub(crate) fn service(&self, time_ticks: i64) -> PathServiceResult {
self.fragmented_packets self.fragmented_packets
.lock() .lock()

View file

@ -191,8 +191,12 @@ impl<T, const C: usize> ArrayVec<T, C> {
} }
} }
impl<T: Copy, const C: usize> ArrayVec<T, C> { impl<T, const C: usize> ArrayVec<T, C>
where
T: Copy,
{
/// Push a slice of copyable objects, panic if capacity exceeded. /// Push a slice of copyable objects, panic if capacity exceeded.
#[inline]
pub fn push_slice(&mut self, v: &[T]) { pub fn push_slice(&mut self, v: &[T]) {
let start = self.s; let start = self.s;
let end = self.s + v.len(); let end = self.s + v.len();

View file

@ -29,6 +29,7 @@ pub use tokio;
pub const NEVER_HAPPENED_TICKS: i64 = i64::MIN / 2; pub const NEVER_HAPPENED_TICKS: i64 = i64::MIN / 2;
/// Get milliseconds since unix epoch. /// Get milliseconds since unix epoch.
#[inline]
pub fn ms_since_epoch() -> i64 { pub fn ms_since_epoch() -> i64 {
std::time::SystemTime::now() std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH) .duration_since(std::time::UNIX_EPOCH)
@ -37,6 +38,7 @@ pub fn ms_since_epoch() -> i64 {
} }
/// Get milliseconds since an arbitrary time in the past, guaranteed to monotonically increase. /// Get milliseconds since an arbitrary time in the past, guaranteed to monotonically increase.
#[inline]
pub fn ms_monotonic() -> i64 { pub fn ms_monotonic() -> i64 {
static STARTUP_INSTANT: parking_lot::RwLock<Option<std::time::Instant>> = parking_lot::RwLock::new(None); static STARTUP_INSTANT: parking_lot::RwLock<Option<std::time::Instant>> = parking_lot::RwLock::new(None);
let si = *STARTUP_INSTANT.read(); let si = *STARTUP_INSTANT.read();

View file

@ -27,6 +27,7 @@ pub trait Marshalable: Sized {
/// ///
/// This will return an Err if the buffer is too small or some other error occurs. It's just /// This will return an Err if the buffer is too small or some other error occurs. It's just
/// a shortcut to creating a buffer and marshaling into it. /// a shortcut to creating a buffer and marshaling into it.
#[inline]
fn to_buffer<const BL: usize>(&self) -> Result<Buffer<BL>, UnmarshalError> { fn to_buffer<const BL: usize>(&self) -> Result<Buffer<BL>, UnmarshalError> {
let mut tmp = Buffer::new(); let mut tmp = Buffer::new();
self.marshal(&mut tmp)?; self.marshal(&mut tmp)?;
@ -36,12 +37,14 @@ pub trait Marshalable: Sized {
/// Unmarshal this object from a buffer. /// Unmarshal this object from a buffer.
/// ///
/// This is just a shortcut to calling unmarshal() with a zero cursor and then discarding the cursor. /// This is just a shortcut to calling unmarshal() with a zero cursor and then discarding the cursor.
#[inline]
fn from_buffer<const BL: usize>(buf: &Buffer<BL>) -> Result<Self, UnmarshalError> { fn from_buffer<const BL: usize>(buf: &Buffer<BL>) -> Result<Self, UnmarshalError> {
let mut tmp = 0; let mut tmp = 0;
Self::unmarshal(buf, &mut tmp) Self::unmarshal(buf, &mut tmp)
} }
/// Marshal and convert to a Rust vector. /// Marshal and convert to a Rust vector.
#[inline]
fn to_bytes(&self) -> Vec<u8> { fn to_bytes(&self) -> Vec<u8> {
let mut tmp = Buffer::<TEMP_BUF_SIZE>::new(); let mut tmp = Buffer::<TEMP_BUF_SIZE>::new();
assert!(self.marshal(&mut tmp).is_ok()); // panics if TEMP_BUF_SIZE is too small assert!(self.marshal(&mut tmp).is_ok()); // panics if TEMP_BUF_SIZE is too small
@ -49,6 +52,7 @@ pub trait Marshalable: Sized {
} }
/// Unmarshal from a raw slice. /// Unmarshal from a raw slice.
#[inline]
fn from_bytes(b: &[u8]) -> Result<Self, UnmarshalError> { fn from_bytes(b: &[u8]) -> Result<Self, UnmarshalError> {
if b.len() <= TEMP_BUF_SIZE { if b.len() <= TEMP_BUF_SIZE {
let mut tmp = Buffer::<TEMP_BUF_SIZE>::new_boxed(); let mut tmp = Buffer::<TEMP_BUF_SIZE>::new_boxed();

View file

@ -2,8 +2,8 @@
mod localinterface; mod localinterface;
mod localsocket; mod localsocket;
mod settings;
mod vl1service; mod vl1service;
mod vl1settings;
pub mod constants; pub mod constants;
pub mod datadir; pub mod datadir;
@ -11,5 +11,5 @@ pub mod sys;
pub use localinterface::LocalInterface; pub use localinterface::LocalInterface;
pub use localsocket::LocalSocket; pub use localsocket::LocalSocket;
pub use settings::VL1Settings;
pub use vl1service::*; pub use vl1service::*;
pub use vl1settings::VL1Settings;

View file

@ -12,8 +12,8 @@ use zerotier_network_hypervisor::vl1::*;
use zerotier_utils::{ms_monotonic, ms_since_epoch}; use zerotier_utils::{ms_monotonic, ms_since_epoch};
use crate::constants::UNASSIGNED_PRIVILEGED_PORTS; use crate::constants::UNASSIGNED_PRIVILEGED_PORTS;
use crate::settings::VL1Settings;
use crate::sys::udp::{udp_test_bind, BoundUdpPort, UdpPacketHandler}; use crate::sys::udp::{udp_test_bind, BoundUdpPort, UdpPacketHandler};
use crate::vl1settings::VL1Settings;
use crate::LocalSocket; use crate::LocalSocket;
/// Update UDP bindings every this many seconds. /// Update UDP bindings every this many seconds.