Mirror of https://github.com/zerotier/ZeroTierOne.git

Merge pull request #1925 from zerotier/tetanus-defrag-2

Added init packet expiry and added Debug impl for arc_pool

Commit 2f61e59e5c: 3 changed files with 212 additions and 58 deletions
@@ -1,3 +1,4 @@
use std::fmt::{Debug, Display};
use std::marker::PhantomData;
use std::mem::{self, ManuallyDrop, MaybeUninit};
use std::num::NonZeroU64;
@@ -63,6 +64,7 @@ impl<T, const L: usize> Pool<T, L> {
let slot_ptr = if let Some(mut slot_ptr) = NonNull::new(first_free) {
let slot = slot_ptr.as_mut();
let _announce_free = slot.free_lock.write().unwrap();
debug_assert_eq!(slot.uid, 0);
first_free = slot.obj.empty_next;
slot.ref_count = AtomicU32::new(1);
slot.uid = uid;
@@ -93,9 +95,10 @@ impl<T, const L: usize> Pool<T, L> {
/// Frees memory allocated from the pool by `Pool::alloc_ptr`. This must be called only once on only pointers returned by `Pool::alloc_ptr` from the same pool. Once memory is freed the content of the memory is undefined, it should not be read or written.
///
/// `drop` will be called on the `T` pointed to, be sure it has not been called already.
///
/// The free lock must be held by the caller.
unsafe fn free_ptr(&self, mut slot_ptr: NonNull<Slot<T>>) {
let slot = slot_ptr.as_mut();
let _announce_free = slot.free_lock.write().unwrap();
slot.uid = 0;
ManuallyDrop::<T>::drop(&mut slot.obj.full_obj);
//linked-list insert
@@ -146,18 +149,6 @@ pub trait StaticPool<T, const L: usize = DEFAULT_L> {
}
}

/// A multithreading lock guard that prevents another thread from freeing the underlying `T` while it is held. It does not prevent other threads from accessing the underlying `T`.
///
/// If the same thread that holds this guard attempts to free `T` before dropping the guard, it will deadlock.
pub struct PoolGuard<'a, T>(RwLockReadGuard<'a, ()>, &'a T);
impl<'a, T> Deref for PoolGuard<'a, T> {
type Target = T;
#[inline]
fn deref(&self) -> &Self::Target {
&*self.1
}
}

/// A rust-style RAII wrapper that drops and frees memory allocated from a pool automatically, the same as an `Arc<T>`. This will run the destructor of `T` in place within the pool before freeing it, correctly maintaining the invariants that the borrow checker and rust compiler expect of generic types.
pub struct PoolArc<T, OriginPool: StaticPool<T, L>, const L: usize = DEFAULT_L> {
ptr: NonNull<Slot<T>>,
@@ -166,13 +157,16 @@ pub struct PoolArc<T, OriginPool: StaticPool<T, L>, const L: usize = DEFAULT_L>

impl<T, OriginPool: StaticPool<T, L>, const L: usize> PoolArc<T, OriginPool, L> {
/// Obtain a non-owning reference to the `T` contained in this `PoolArc`. This reference has the special property that the underlying `T` can be dropped from the pool while neither making this reference invalid or unsafe nor leaking the memory of `T`. Instead attempts to `grab` the reference will safely return `None`.
///
/// `T` is guaranteed to be dropped when all `PoolArc<T>` are dropped, regardless of how many `PoolWeakRef<T>` still exist.
#[inline]
pub fn downgrade(&self) -> PoolWeakRef<T> {
pub fn downgrade(&self) -> PoolWeakRef<T, OriginPool, L> {
unsafe {
// Since this is a Arc we know for certain the object has not been freed, so we don't have to hold the free lock
PoolWeakRef {
ptr: self.ptr,
uid: NonZeroU64::new_unchecked(self.ptr.as_ref().uid),
_p: PhantomData,
}
}
}
@@ -181,8 +175,6 @@ impl<T, OriginPool: StaticPool<T, L>, const L: usize> PoolArc<T, OriginPool, L>
unsafe { NonZeroU64::new_unchecked(self.ptr.as_ref().uid) }
}
}
unsafe impl<T, OriginPool: StaticPool<T, L>, const L: usize> Send for PoolArc<T, OriginPool, L> where T: Send {}
unsafe impl<T, OriginPool: StaticPool<T, L>, const L: usize> Sync for PoolArc<T, OriginPool, L> where T: Sync {}

impl<T, OriginPool: StaticPool<T, L>, const L: usize> Deref for PoolArc<T, OriginPool, L> {
type Target = T;
@@ -203,25 +195,51 @@ impl<T, OriginPool: StaticPool<T, L>, const L: usize> Drop for PoolArc<T, Origin
#[inline]
fn drop(&mut self) {
unsafe {
if self.ptr.as_ref().ref_count.fetch_sub(1, Ordering::AcqRel) == 1 {
(*OriginPool::get_static_pool().cast::<Pool<T, L>>()).free_ptr(self.ptr);
let slot = self.ptr.as_ref();
if slot.ref_count.fetch_sub(1, Ordering::AcqRel) == 1 {
let _announce_free = slot.free_lock.write().unwrap();
// We have to check twice in case a weakref was upgraded before the lock was acquired
if slot.ref_count.load(Ordering::Relaxed) == 0 {
(*OriginPool::get_static_pool().cast::<Pool<T, L>>()).free_ptr(self.ptr);
}
}
}
}
}
unsafe impl<T, OriginPool: StaticPool<T, L>, const L: usize> Send for PoolArc<T, OriginPool, L> where T: Send {}
unsafe impl<T, OriginPool: StaticPool<T, L>, const L: usize> Sync for PoolArc<T, OriginPool, L> where T: Sync {}
impl<T, OriginPool: StaticPool<T, L>, const L: usize> Debug for PoolArc<T, OriginPool, L>
where
T: Debug,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("PoolArc").field(self.deref()).finish()
}
}
impl<T, OriginPool: StaticPool<T, L>, const L: usize> Display for PoolArc<T, OriginPool, L>
where
T: Display,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.deref().fmt(f)
}
}

/// A non-owning reference to a `T` allocated by a pool. This reference has the special property that the underlying `T` can be dropped from the pool while neither making this reference invalid nor leaking the memory of `T`. Instead attempts to `grab` this reference will safely return `None` if the underlying `T` has been freed by any thread.
///
/// Due to there thread safety and low overhead a `PoolWeakRef` implements clone and copy.
/// Due to their thread safety and low overhead a `PoolWeakRef` implements clone and copy.
///
/// The lifetime of this reference is tied to the lifetime of the pool it came from, because if it were allowed to live longer than its origin pool, it would no longer be safe to dereference and would most likely segfault. Instead the borrow-checker will enforce that this reference has a shorter lifetime that its origin pool.
pub struct PoolWeakRef<T> {
///
/// For technical reasons a `RwLock<PoolWeakRef<T>>` will always be the fastest implementation of a `PoolWeakRefSwap`, which is why this library does not provide a `PoolWeakRefSwap` type.
pub struct PoolWeakRef<T, OriginPool: StaticPool<T, L>, const L: usize = DEFAULT_L> {
/// A number that uniquely identifies this allocated `T` within this pool. No other instance of `T` may have this uid. This value is read-only.
pub uid: NonZeroU64,
ptr: NonNull<Slot<T>>,
_p: PhantomData<*const OriginPool>,
}

impl<T> PoolWeakRef<T> {
impl<T, OriginPool: StaticPool<T, L>, const L: usize> PoolWeakRef<T, OriginPool, L> {
/// Obtains a lock that allows the `T` contained in this `PoolWeakRef` to be dereferenced in a thread-safe manner. This lock does not prevent other threads from accessing `T` at the same time, so `T` ought to use interior mutability if it needs to be mutated in a thread-safe way. What this lock does guarantee is that `T` cannot be destructed and freed while it is being held.
///
/// Do not attempt from within the same thread to drop the `PoolArc` that owns this `T` before dropping this lock, or else the thread will deadlock. Rust makes this quite hard to do accidentally but it's not strictly impossible.
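The rewritten `Drop` above closes a race with weak-reference upgrades: the strong count can hit zero while an upgrade that already holds the free lock is about to revive the slot, so the count is re-checked under the write lock before `free_ptr` runs. Below is a minimal, self-contained sketch of that double-check pattern using only `std` primitives; the `SlotSketch` type and its fields are illustrative stand-ins, not the crate's actual `Slot`.

```rust
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::RwLock;

// Illustrative stand-in for a pool slot: a reference count plus the lock that
// serializes frees against weak-ref upgrades. Not the crate's actual Slot type.
struct SlotSketch {
    ref_count: AtomicU32,
    free_lock: RwLock<()>,
    alive: AtomicBool,
}

impl SlotSketch {
    // Mirrors the upgrade side: hold the free lock (read) so the slot cannot be
    // freed mid-upgrade, then bump the count only if the object is still live.
    fn try_upgrade(&self) -> bool {
        let _guard = self.free_lock.read().unwrap();
        if self.alive.load(Ordering::Relaxed) {
            self.ref_count.fetch_add(1, Ordering::Relaxed);
            true
        } else {
            false
        }
    }

    // Mirrors the drop side: once the count reaches zero, take the free lock
    // (write) and re-check, because an upgrade may have won the lock first.
    fn release(&self) {
        if self.ref_count.fetch_sub(1, Ordering::AcqRel) == 1 {
            let _guard = self.free_lock.write().unwrap();
            if self.ref_count.load(Ordering::Relaxed) == 0 {
                self.alive.store(false, Ordering::Relaxed); // stand-in for free_ptr
            }
        }
    }
}

fn main() {
    let slot = SlotSketch {
        ref_count: AtomicU32::new(1),
        free_lock: RwLock::new(()),
        alive: AtomicBool::new(true),
    };
    assert!(slot.try_upgrade()); // count 1 -> 2
    slot.release();              // count 2 -> 1, no free
    slot.release();              // count 1 -> 0, freed under the lock
    assert!(!slot.try_upgrade());
}
```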
@@ -237,23 +255,86 @@ impl<T> PoolWeakRef<T> {
}
}
}
}
unsafe impl<T> Send for PoolWeakRef<T> where T: Send {}
unsafe impl<T> Sync for PoolWeakRef<T> where T: Sync {}

impl<T> Clone for PoolWeakRef<T> {
fn clone(&self) -> Self {
Self { uid: self.uid, ptr: self.ptr }
/// Attempts to create an owning `PoolArc` from this `PoolWeakRef` of the underlying `T`. Will return `None` if the underlying `T` has already been dropped.
pub fn upgrade(&self) -> Option<PoolArc<T, OriginPool, L>> {
unsafe {
let slot = self.ptr.as_ref();
let _prevent_free_lock = slot.free_lock.read().unwrap();
if slot.uid == self.uid.get() {
self.ptr.as_ref().ref_count.fetch_add(1, Ordering::Relaxed);
Some(PoolArc { ptr: self.ptr, _p: PhantomData })
} else {
None
}
}
}
}
impl<T, OriginPool: StaticPool<T, L>, const L: usize> Clone for PoolWeakRef<T, OriginPool, L> {
fn clone(&self) -> Self {
Self { uid: self.uid, ptr: self.ptr, _p: PhantomData }
}
}
impl<T, OriginPool: StaticPool<T, L>, const L: usize> Copy for PoolWeakRef<T, OriginPool, L> {}
unsafe impl<T, OriginPool: StaticPool<T, L>, const L: usize> Send for PoolWeakRef<T, OriginPool, L> where T: Send {}
unsafe impl<T, OriginPool: StaticPool<T, L>, const L: usize> Sync for PoolWeakRef<T, OriginPool, L> where T: Sync {}
impl<T, OriginPool: StaticPool<T, L>, const L: usize> Debug for PoolWeakRef<T, OriginPool, L>
where
T: Debug,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let inner = self.grab();
f.debug_tuple("PoolWeakRef").field(&inner).finish()
}
}
impl<T, OriginPool: StaticPool<T, L>, const L: usize> Display for PoolWeakRef<T, OriginPool, L>
where
T: Display,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if let Some(inner) = self.grab() {
inner.fmt(f)
} else {
f.write_str("Empty")
}
}
}
impl<T> Copy for PoolWeakRef<T> {}

/// A multithreading lock guard that prevents another thread from freeing the underlying `T` while it is held. It does not prevent other threads from accessing the underlying `T`.
///
/// If the same thread that holds this guard attempts to free `T` before dropping the guard, it will deadlock.
pub struct PoolGuard<'a, T>(RwLockReadGuard<'a, ()>, &'a T);
impl<'a, T> Deref for PoolGuard<'a, T> {
type Target = T;
#[inline]
fn deref(&self) -> &Self::Target {
&*self.1
}
}
impl<'a, T> Debug for PoolGuard<'a, T>
where
T: Debug,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("PoolGuard").field(self.deref()).finish()
}
}
impl<'a, T> Display for PoolGuard<'a, T>
where
T: Display,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.deref().fmt(f)
}
}

/// Allows for the Atomic Swapping and Loading of a `PoolArc<T>`, similar to how a `RwLock<Arc<T>>` would function, but much faster and less verbose.
pub struct PoolArcSwap<T, OriginPool: StaticPool<T, L>, const L: usize = DEFAULT_L> {
ptr: AtomicPtr<Slot<T>>,
reads: AtomicU32,
_p: PhantomData<*const OriginPool>,
}
impl<T, OriginPool: StaticPool<T, L>, const L: usize> PoolArcSwap<T, OriginPool, L> {
/// Creates a new `PoolArcSwap`, consuming `arc` in the process.
pub fn new(mut arc: PoolArc<T, OriginPool, L>) -> Self {
unsafe {
let ret = Self {
@@ -266,7 +347,7 @@ impl<T, OriginPool: StaticPool<T, L>, const L: usize> PoolArcSwap<T, OriginPool,
ret
}
}

/// Atomically swaps the currently stored `PoolArc` with a new one, returning the previous one.
pub fn swap(&self, arc: PoolArc<T, OriginPool, L>) -> PoolArc<T, OriginPool, L> {
unsafe {
let pre_ptr = self.ptr.swap(arc.ptr.as_ptr(), Ordering::Relaxed);
@@ -280,6 +361,7 @@ impl<T, OriginPool: StaticPool<T, L>, const L: usize> PoolArcSwap<T, OriginPool,
}
}

/// Atomically loads and clones the currently stored `PoolArc`, guaranteeing that the underlying `T` cannot be freed while the clone is held.
pub fn load(&self) -> PoolArc<T, OriginPool, L> {
unsafe {
self.reads.fetch_add(1, Ordering::Acquire);
@@ -290,9 +372,6 @@ impl<T, OriginPool: StaticPool<T, L>, const L: usize> PoolArcSwap<T, OriginPool,
}
}
}
unsafe impl<T, OriginPool: StaticPool<T, L>, const L: usize> Send for PoolArcSwap<T, OriginPool, L> where T: Send {}
unsafe impl<T, OriginPool: StaticPool<T, L>, const L: usize> Sync for PoolArcSwap<T, OriginPool, L> where T: Sync {}

impl<T, OriginPool: StaticPool<T, L>, const L: usize> Drop for PoolArcSwap<T, OriginPool, L> {
#[inline]
fn drop(&mut self) {
@@ -302,19 +381,42 @@ impl<T, OriginPool: StaticPool<T, L>, const L: usize> Drop for PoolArcSwap<T, Or
}
}
}
unsafe impl<T, OriginPool: StaticPool<T, L>, const L: usize> Send for PoolArcSwap<T, OriginPool, L> where T: Send {}
unsafe impl<T, OriginPool: StaticPool<T, L>, const L: usize> Sync for PoolArcSwap<T, OriginPool, L> where T: Sync {}
impl<T, OriginPool: StaticPool<T, L>, const L: usize> Debug for PoolArcSwap<T, OriginPool, L>
where
T: Debug,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("PoolArcSwap").field(&self.load()).finish()
}
}
impl<T, OriginPool: StaticPool<T, L>, const L: usize> Display for PoolArcSwap<T, OriginPool, L>
where
T: Display,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
(&self.load()).fmt(f)
}
}

/// Another implementation of a `PoolArcSwap` utalizing a RwLock instead of atomics.
/// This implementation has slower a `load` but a faster `swap` than the previous implementation of `PoolArcSwap`.
/// If you plan on swapping way more often than loading, this may be a better choice.
pub struct PoolArcSwapRw<T, OriginPool: StaticPool<T, L>, const L: usize = DEFAULT_L> {
ptr: RwLock<NonNull<Slot<T>>>,
_p: PhantomData<*const OriginPool>,
}

impl<T, OriginPool: StaticPool<T, L>, const L: usize> PoolArcSwapRw<T, OriginPool, L> {
/// Creates a new `PoolArcSwap`, consuming `arc` in the process.
pub fn new(arc: PoolArc<T, OriginPool, L>) -> Self {
let ret = Self { ptr: RwLock::new(arc.ptr), _p: arc._p };
mem::forget(arc);
ret
}

/// Atomically swaps the currently stored `PoolArc` with a new one, returning the previous one.
pub fn swap(&self, arc: PoolArc<T, OriginPool, L>) -> PoolArc<T, OriginPool, L> {
let mut w = self.ptr.write().unwrap();
let pre = PoolArc { ptr: *w, _p: self._p };
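The doc comments above position `PoolArcSwap` against a plain `RwLock<Arc<T>>` and describe `PoolArcSwapRw` as the lock-based variant with a faster `swap` but slower `load`. For orientation, here is what that `std` baseline looks like; this is a comparison sketch only, not code from this crate, and the names are illustrative.

```rust
use std::sync::{Arc, RwLock};

// Baseline the pool swap types improve on: swap and load an Arc under a RwLock.
// PoolArcSwap replaces the lock with atomics (fast load); PoolArcSwapRw keeps a
// RwLock but swaps a pool slot pointer instead of a heap-allocated std Arc.
struct ArcSwapBaseline<T> {
    inner: RwLock<Arc<T>>,
}

impl<T> ArcSwapBaseline<T> {
    fn new(value: Arc<T>) -> Self {
        Self { inner: RwLock::new(value) }
    }

    // Replace the stored Arc, returning the previous one.
    fn swap(&self, value: Arc<T>) -> Arc<T> {
        std::mem::replace(&mut *self.inner.write().unwrap(), value)
    }

    // Clone the stored Arc so the value cannot be freed while the clone is held.
    fn load(&self) -> Arc<T> {
        self.inner.read().unwrap().clone()
    }
}

fn main() {
    let s = ArcSwapBaseline::new(Arc::new(1));
    let old = s.swap(Arc::new(2));
    assert_eq!(*old, 1);
    assert_eq!(*s.load(), 2);
}
```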
@@ -323,6 +425,7 @@ impl<T, OriginPool: StaticPool<T, L>, const L: usize> PoolArcSwapRw<T, OriginPoo
pre
}

/// Atomically loads and clones the currently stored `PoolArc`, guaranteeing that the underlying `T` cannot be freed while the clone is held.
pub fn load(&self) -> PoolArc<T, OriginPool, L> {
let r = self.ptr.read().unwrap();
unsafe {
@@ -341,22 +444,46 @@ impl<T, OriginPool: StaticPool<T, L>, const L: usize> Drop for PoolArcSwapRw<T,
}
unsafe impl<T, OriginPool: StaticPool<T, L>, const L: usize> Send for PoolArcSwapRw<T, OriginPool, L> where T: Send {}
unsafe impl<T, OriginPool: StaticPool<T, L>, const L: usize> Sync for PoolArcSwapRw<T, OriginPool, L> where T: Sync {}
impl<T, OriginPool: StaticPool<T, L>, const L: usize> Debug for PoolArcSwapRw<T, OriginPool, L>
where
T: Debug,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("PoolArcSwapRw").field(&self.load()).finish()
}
}
impl<T, OriginPool: StaticPool<T, L>, const L: usize> Display for PoolArcSwapRw<T, OriginPool, L>
where
T: Display,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
(&self.load()).fmt(f)
}
}

/// Automatically generates valid implementations of `StaticPool<T, L>` onto a chosen identifier, allowing this module to allocate instances of `T` with `alloc`. Users have to generate implementations clientside because rust does not allow for generic globals.
///
/// The chosen identifier is declared to be a struct with no fields, and instead contains a static global `Pool` for every implementation of `StaticPool<T, L>` requested.
///
/// # Example
/// ```
/// use zerotier_utils::arc_pool::{Pool, StaticPool, static_pool};
/// use zerotier_utils::arc_pool::{static_pool, StaticPool, Pool, PoolArc};
///
/// static_pool!(pub StaticPool MyPools {
/// Pool<u32>, Pool<&u32, 12>
/// });
///
/// struct Container {
/// item: PoolArc<u32, MyPools>
/// }
///
/// let object = 1u32;
/// let arc_object = MyPools::alloc(object);
/// let arc_ref = MyPools::alloc(&object);
/// let arc_container = Container {item: MyPools::alloc(object)};
///
/// assert_eq!(*arc_object, **arc_ref);
/// assert_eq!(*arc_object, *arc_container.item);
/// ```
#[macro_export]
macro_rules! __static_pool__ {
@@ -538,7 +665,7 @@ mod tests {
s.check(id);
} else if p < prob(60) {
if let Some((id, s, w)) = rand_idx(&items, r) {
items_dup.push((*id, s.clone(), w.clone()));
items_dup.push((*id, s.clone(), (*w).clone()));
s.check(*id);
}
} else if p < prob(80) {
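Taken together, this file's new `downgrade`/`upgrade` pair is meant to be used roughly as follows. This is a usage sketch written only against the API documented in the diff; it assumes the crate is available as `zerotier_utils`, reuses the `static_pool!` invocation from the rustdoc example above, and the exact import paths may differ.

```rust
// Sketch: assumes the crate and macro re-exports shown in the rustdoc example above.
use zerotier_utils::arc_pool::{static_pool, Pool, PoolArc, StaticPool};

static_pool!(pub StaticPool MyPools {
    Pool<u32>, Pool<&u32, 12>
});

fn main() {
    let arc: PoolArc<u32, MyPools> = MyPools::alloc(42);

    // A weak ref does not keep the value alive, but carries the slot uid so a
    // stale reference can be detected after the slot is freed or reused.
    let weak = arc.downgrade();
    assert_eq!(weak.upgrade().map(|a| *a), Some(42));

    drop(arc);
    // All strong references are gone, so the slot was freed; upgrade now fails
    // safely instead of dereferencing freed memory.
    assert!(weak.upgrade().is_none());
}
```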
@@ -55,24 +55,11 @@ impl<Fragment, const MAX_FRAGMENTS: usize> Fragged<Fragment, MAX_FRAGMENTS> {
///
/// When a fully assembled packet is returned the internal state is reset and this object can
/// be reused to assemble another packet.
#[inline(always)]
pub fn assemble(&mut self, counter: u64, fragment: Fragment, fragment_no: u8, fragment_count: u8) -> Option<Assembled<Fragment, MAX_FRAGMENTS>> {
if fragment_no < fragment_count && (fragment_count as usize) <= MAX_FRAGMENTS {
// If the counter has changed, reset the structure to receive a new packet.
if counter != self.counter {
if needs_drop::<Fragment>() {
let mut have = self.have;
let mut i = 0;
while have != 0 {
if (have & 1) != 0 {
debug_assert!(i < MAX_FRAGMENTS);
unsafe { self.frags.get_unchecked_mut(i).assume_init_drop() };
}
have = have.wrapping_shr(1);
i += 1;
}
}
self.have = 0;
self.drop_in_place();
self.count = fragment_count as u32;
self.counter = counter;
}
@@ -96,11 +83,10 @@ impl<Fragment, const MAX_FRAGMENTS: usize> Fragged<Fragment, MAX_FRAGMENTS> {
}
return None;
}
}

impl<Fragment, const MAX_FRAGMENTS: usize> Drop for Fragged<Fragment, MAX_FRAGMENTS> {
/// Drops any remaining fragments and resets this object.
#[inline(always)]
fn drop(&mut self) {
pub fn drop_in_place(&mut self) {
if needs_drop::<Fragment>() {
let mut have = self.have;
let mut i = 0;
@@ -113,5 +99,15 @@ impl<Fragment, const MAX_FRAGMENTS: usize> Drop for Fragged<Fragment, MAX_FRAGME
i += 1;
}
}
self.have = 0;
self.count = 0;
self.counter = 0;
}
}

impl<Fragment, const MAX_FRAGMENTS: usize> Drop for Fragged<Fragment, MAX_FRAGMENTS> {
#[inline(always)]
fn drop(&mut self) {
self.drop_in_place();
}
}
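The net effect of this file's changes is that the fragment-cleanup loop moves out of `Drop` into a public `drop_in_place`, which `assemble` calls when the counter changes and which the session layer's expiry sweep can call on stale slots. The following is a self-contained sketch of that refactor on a simplified fragment buffer; the type below is a stand-in, not the crate's `Fragged`, and it omits the counter and count bookkeeping.

```rust
use std::mem::{needs_drop, MaybeUninit};

// Simplified stand-in for Fragged: a bitmask of which slots are initialized
// plus an array of possibly-uninitialized fragments.
struct FraggedSketch<F, const MAX: usize> {
    have: u64,
    frags: [MaybeUninit<F>; MAX],
}

impl<F, const MAX: usize> FraggedSketch<F, MAX> {
    fn new() -> Self {
        Self { have: 0, frags: std::array::from_fn(|_| MaybeUninit::uninit()) }
    }

    // Drops every initialized fragment and resets the bookkeeping. Shared by
    // Drop, by reassembly restarts, and by external expiry sweeps.
    fn drop_in_place(&mut self) {
        if needs_drop::<F>() {
            let mut have = self.have;
            let mut i = 0;
            while have != 0 {
                if (have & 1) != 0 {
                    debug_assert!(i < MAX);
                    unsafe { self.frags.get_unchecked_mut(i).assume_init_drop() };
                }
                have = have.wrapping_shr(1);
                i += 1;
            }
        }
        self.have = 0;
    }
}

impl<F, const MAX: usize> Drop for FraggedSketch<F, MAX> {
    fn drop(&mut self) {
        self.drop_in_place();
    }
}

fn main() {
    let mut f: FraggedSketch<String, 4> = FraggedSketch::new();
    f.frags[0].write(String::from("frag 0"));
    f.have |= 1;
    f.drop_in_place(); // explicit reset, e.g. on counter change or expiry
    // Drop runs again at scope end, sees an empty bitmask, and frees nothing twice.
}
```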
@@ -14,7 +14,7 @@ use std::collections::hash_map::RandomState;
use std::collections::HashMap;
use std::hash::{BuildHasher, Hash, Hasher};
use std::num::NonZeroU64;
use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, MutexGuard, RwLock, Weak};

use zerotier_crypto::aes::{Aes, AesGcm};
@@ -41,7 +41,8 @@ const GCM_CIPHER_POOL_SIZE: usize = 4;
pub struct Context<Application: ApplicationLayer> {
default_physical_mtu: AtomicUsize,
defrag_salt: RandomState,
defrag: [Mutex<Fragged<Application::IncomingPacketBuffer, MAX_NOISE_HANDSHAKE_FRAGMENTS>>; MAX_INCOMPLETE_SESSION_QUEUE_SIZE],
defrag_has_pending: AtomicBool, // Allowed to be falsely positive
defrag: [Mutex<(Fragged<Application::IncomingPacketBuffer, MAX_NOISE_HANDSHAKE_FRAGMENTS>, i64)>; MAX_INCOMPLETE_SESSION_QUEUE_SIZE],
sessions: RwLock<SessionsById<Application>>,
}
@@ -153,7 +154,8 @@ impl<Application: ApplicationLayer> Context<Application> {
Self {
default_physical_mtu: AtomicUsize::new(default_physical_mtu),
defrag_salt: RandomState::new(),
defrag: std::array::from_fn(|_| Mutex::new(Fragged::new())),
defrag_has_pending: AtomicBool::new(false),
defrag: std::array::from_fn(|_| Mutex::new((Fragged::new(), i64::MAX))),
sessions: RwLock::new(SessionsById {
active: HashMap::with_capacity(64),
incoming: HashMap::with_capacity(64),
@@ -246,6 +248,23 @@ impl<Application: ApplicationLayer> Context<Application> {
}
}
}
// Only check for expiration if we have a pending packet.
// This check is allowed to have false positives for simplicity's sake.
if self.defrag_has_pending.swap(false, Ordering::Relaxed) {
let mut has_pending = false;
for m in &self.defrag {
let mut pending = m.lock().unwrap();
if pending.1 <= negotiation_timeout_cutoff {
pending.1 = i64::MAX;
pending.0.drop_in_place();
} else if pending.0.counter() != 0 {
has_pending = true;
}
}
if has_pending {
self.defrag_has_pending.store(true, Ordering::Relaxed);
}
}

if !dead_active.is_empty() || !dead_pending.is_empty() {
let mut sessions = self.sessions.write().unwrap();
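The sweep above leans on `defrag_has_pending` being allowed to report false positives but never false negatives: it is cleared with `swap(false)` before scanning and re-set only if a live, unexpired entry remains, so the worst case is one unnecessary pass over the array. Below is a condensed, runnable sketch of that flag discipline; the `DefragSketch` type and its slot encoding are invented for illustration, with each slot holding only (fragment counter, creation timestamp) instead of a full `Fragged`.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;

struct DefragSketch {
    // May be true even when nothing is pending; never falsely false.
    has_pending: AtomicBool,
    // (fragment counter, creation timestamp); counter 0 / i64::MAX means empty.
    slots: [Mutex<(u64, i64)>; 4],
}

impl DefragSketch {
    fn service(&self, expiry_cutoff: i64) {
        // Only pay for the scan when something may be pending.
        if self.has_pending.swap(false, Ordering::Relaxed) {
            let mut still_pending = false;
            for m in &self.slots {
                let mut slot = m.lock().unwrap();
                if slot.1 <= expiry_cutoff {
                    // Expired: reset the slot (the real code also drops its fragments).
                    *slot = (0, i64::MAX);
                } else if slot.0 != 0 {
                    still_pending = true;
                }
            }
            if still_pending {
                self.has_pending.store(true, Ordering::Relaxed);
            }
        }
    }
}

fn main() {
    let d = DefragSketch {
        has_pending: AtomicBool::new(true),
        slots: std::array::from_fn(|_| Mutex::new((0u64, i64::MAX))),
    };
    d.service(100); // scans once, finds nothing pending, leaves the flag false
    assert!(!d.has_pending.load(Ordering::Relaxed));
}
```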
@@ -543,15 +562,27 @@ impl<Application: ApplicationLayer> Context<Application> {
// Volumetric spam is quite difficult since without the `defrag_salt: RandomState` value an adversary
// cannot control which slots their fragments index to. And since Alice's packet header has a randomly
// generated counter value replaying it in time requires extreme amounts of network control.
let mut slot0 = self.defrag[idx0].lock().unwrap();
let (slot0, timestamp0) = &mut *self.defrag[idx0].lock().unwrap();
if slot0.counter() == hashed_counter {
assembled = slot0.assemble(hashed_counter, incoming_physical_packet_buf, fragment_no, fragment_count);
if assembled.is_some() {
*timestamp0 = i64::MAX;
}
} else {
let mut slot1 = self.defrag[idx1].lock().unwrap();
let (slot1, timestamp1) = &mut *self.defrag[idx1].lock().unwrap();
if slot1.counter() == hashed_counter || slot1.counter() == 0 {
if slot1.counter() == 0 {
*timestamp1 = current_time;
self.defrag_has_pending.store(true, Ordering::Relaxed);
}
assembled = slot1.assemble(hashed_counter, incoming_physical_packet_buf, fragment_no, fragment_count);
if assembled.is_some() {
*timestamp1 = i64::MAX;
}
} else {
// slot1 is full so kick out whatever is in slot0 to make more room.
// slot0 is either occupied or empty so we overwrite whatever is there to make more room.
*timestamp0 = current_time;
self.defrag_has_pending.store(true, Ordering::Relaxed);
assembled = slot0.assemble(hashed_counter, incoming_physical_packet_buf, fragment_no, fragment_count);
}
}
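For the insertion side shown above: each init fragment hashes to two candidate slots, a slot's timestamp starts the expiry clock when it begins collecting a new packet, and the timestamp is pushed back to `i64::MAX` once the packet assembles or the sweep expires it. The placement policy alone, stripped of the salted hashing, locking, and `Fragged` machinery, looks roughly like the sketch below; the types, names, and values are illustrative.

```rust
// Each slot tracks (counter of the packet being assembled, start timestamp).
// counter == 0 means the slot is empty; i64::MAX means no expiry is pending.
struct DefragSlotsSketch {
    slots: Vec<(u64, i64)>,
}

impl DefragSlotsSketch {
    // Returns the slot index this fragment should be appended to.
    fn place(&mut self, idx0: usize, idx1: usize, counter: u64, now: i64) -> usize {
        if self.slots[idx0].0 == counter {
            // Fragment of a packet already assembling in slot 0.
            idx0
        } else if self.slots[idx1].0 == counter || self.slots[idx1].0 == 0 {
            // Slot 1 matches or is free; start the expiry clock if it was empty.
            if self.slots[idx1].0 == 0 {
                self.slots[idx1] = (counter, now);
            }
            idx1
        } else {
            // Both candidates hold other packets: overwrite whatever is in slot 0.
            self.slots[idx0] = (counter, now);
            idx0
        }
    }
}

fn main() {
    let mut d = DefragSlotsSketch { slots: vec![(0, i64::MAX); 8] };
    assert_eq!(d.place(2, 5, 0xBEEF, 1_000), 5); // empty slot 1 claimed
    assert_eq!(d.place(5, 7, 0xF00D, 1_001), 7); // different packet claims another slot
    assert_eq!(d.place(5, 7, 0xCAFE, 1_002), 5); // both full: slot 0 is overwritten
}
```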