From f6540e129a88cf003189de42eb4b1926f1cbca49 Mon Sep 17 00:00:00 2001
From: mamoniot
Date: Mon, 20 Mar 2023 15:26:15 -0400
Subject: [PATCH 1/4] added arc_pool to tetanus

---
 utils/src/arc_pool.rs | 657 ++++++++++++++++++++++++++++++++++++++++++
 utils/src/lib.rs      |   2 +
 2 files changed, 659 insertions(+)
 create mode 100644 utils/src/arc_pool.rs

diff --git a/utils/src/arc_pool.rs b/utils/src/arc_pool.rs
new file mode 100644
index 000000000..1d5e71e1a
--- /dev/null
+++ b/utils/src/arc_pool.rs
@@ -0,0 +1,657 @@
+use std::sync::{Mutex, RwLock, RwLockReadGuard, atomic::{AtomicU32, Ordering, AtomicPtr}};
+use std::mem::{self, MaybeUninit, ManuallyDrop};
+use std::ptr::{self, NonNull};
+use std::marker::PhantomData;
+use std::num::NonZeroU64;
+use std::ops::Deref;
+
+const DEFAULT_L: usize = 64;
+
+union SlotState<T> {
+    empty_next: *mut Slot<T>,
+    full_obj: ManuallyDrop<T>,
+}
+struct Slot<T> {
+    obj: SlotState<T>,
+    free_lock: RwLock<()>,
+    ref_count: AtomicU32,
+    uid: u64,
+}
+
+struct PoolMem<T, const L: usize> {
+    mem: [MaybeUninit<Slot<T>>; L],
+    pre: *mut PoolMem<T, L>,
+}
+
+/// A generic, *thread-safe*, fixed-size memory allocator for instances of `T`.
+/// New instances of `T` are packed together into arrays of size `L`, and allocated in bulk as one memory arena from the global allocator.
+/// Arenas from the global allocator are not deallocated until the pool is dropped, and are re-used as instances of `T` are allocated and freed.
+///
+/// This specific data structure also supports generational indexing, which means that an arbitrary number of non-owning references to allocated instances of `T` can be generated safely. These references can outlive the underlying `T` they reference, and will safely report upon dereference that the original underlying `T` is gone.
+///
+/// Atomic reference counting is also implemented allowing for exceedingly complex models of shared ownership. Multiple copies of both strong and weak references to the underlying `T` can be generated that are all memory safe and borrow-checked.
+///
+/// Allocating from a pool results in very little internal and external fragmentation in the global heap, thus saving significant amounts of memory from being used by one's program. Pools also allocate memory significantly faster on average than the global allocator. This specific pool implementation supports guaranteed constant time `alloc` and `free`.
+pub struct Pool<T, const L: usize = DEFAULT_L>(Mutex<(*mut Slot<T>, u64, *mut PoolMem<T, L>, usize)>);
+unsafe impl<T, const L: usize> Send for Pool<T, L> {}
+unsafe impl<T, const L: usize> Sync for Pool<T, L> {}
+
+impl<T, const L: usize> Pool<T, L> {
+    pub const DEFAULT_L: usize = DEFAULT_L;
+
+    /// Creates a new `Pool` with packing length `L`. The packing length determines the number of instances of `T` that will fit in a page before it becomes full. Once all pages in a `Pool` are full, a new page is allocated from the global allocator. Larger values of `L` are generally faster, but the returns are diminishing and vary by platform.
+    ///
+    /// A `Pool` cannot be interacted with directly; it requires an `impl StaticPool<T, L> for Pool<T, L>` implementation. See the `static_pool!` macro for an automatically generated trait implementation.
+    #[inline]
+    pub const fn new() -> Self {
+        Pool (Mutex::new((ptr::null_mut(), 1, ptr::null_mut(), usize::MAX)))
+    }
+
+
+    #[inline(always)]
+    fn create_arr() -> [MaybeUninit<Slot<T>>; L] {
+        unsafe { MaybeUninit::<[MaybeUninit<Slot<T>>; L]>::uninit().assume_init() }
+    }
+
+    /// Allocates uninitialized memory for an instance of `T`. The returned pointer points to this memory. It is undefined what will be contained in this memory; it must be initialized before being used.
This pointer must be manually freed from the pool using `Pool::free_ptr` before being dropped, otherwise its memory will be leaked. If the pool is dropped before this pointer is freed, the destructor of `T` will not be run and this pointer will point to invalid memory. + unsafe fn alloc_ptr(&self, obj: T) -> NonNull> { + let mut mutex = self.0.lock().unwrap(); + let (mut first_free, uid, mut head_arena, mut head_size) = *mutex; + + let slot_ptr = if let Some(mut slot_ptr) = NonNull::new(first_free) { + let slot = slot_ptr.as_mut(); + let _announce_free = slot.free_lock.write().unwrap(); + first_free = slot.obj.empty_next; + slot.ref_count = AtomicU32::new(1); + slot.uid = uid; + slot.obj.full_obj = ManuallyDrop::new(obj); + slot_ptr + } else { + if head_size >= L { + let new = Box::leak(Box::new(PoolMem { + pre: head_arena, + mem: Self::create_arr(), + })); + head_arena = new; + head_size = 0; + } + let slot = Slot { + obj: SlotState {full_obj: ManuallyDrop::new(obj)}, + free_lock: RwLock::new(()), + ref_count: AtomicU32::new(1), + uid, + }; + let slot_ptr = &mut (*head_arena).mem[head_size]; + let slot_ptr = NonNull::new_unchecked(slot_ptr.write(slot)); + head_size += 1; + // We do not have to hold the free lock since we know this slot has never been touched before and nothing external references it + slot_ptr + }; + + *mutex = (first_free, uid.wrapping_add(1), head_arena, head_size); + slot_ptr + } + /// Frees memory allocated from the pool by `Pool::alloc_ptr`. This must be called only once on only pointers returned by `Pool::alloc_ptr` from the same pool. Once memory is freed the content of the memory is undefined, it should not be read or written. + /// + /// `drop` will be called on the `T` pointed to, be sure it has not been called already. + unsafe fn free_ptr(&self, mut slot_ptr: NonNull>) { + let slot = slot_ptr.as_mut(); + let _announce_free = slot.free_lock.write().unwrap(); + slot.uid = 0; + ManuallyDrop::::drop(&mut slot.obj.full_obj); + //linked-list insert + let mut mutex = self.0.lock().unwrap(); + + slot.obj.empty_next = mutex.0; + mutex.0 = slot_ptr.as_ptr(); + } +} +impl Drop for Pool { + fn drop(&mut self) { + let mutex = self.0.lock().unwrap(); + let (_, _, mut head_arena, _) = *mutex; + unsafe { + while !head_arena.is_null() { + let mem = Box::from_raw(head_arena); + head_arena = mem.pre; + drop(mem); + } + } + drop(mutex); + } +} + +pub trait StaticPool { + + /// Must return a pointer to an instance of a `Pool` with a static lifetime. That pointer must be cast to a `*const ()` to make the borrow-checker happy. + /// + /// **Safety**: The returned pointer must have originally been a `&'static Pool` reference. So it must have had a matching `T` and `L` and it must have the static lifetime. + /// + /// In order to borrow-split allocations from a `Pool`, we need to force the borrow-checker to not associate the lifetime of an instance of `T` with the lifetime of the pool. Otherwise the borrow-checker would require every allocated `T` to have the `'static` lifetime, to match the pool's lifetime. + /// The simplest way I have found to do this is to return the pointer to the static pool as an anonymous, lifetimeless `*const ()`. This introduces unnecessary safety concerns surrounding pointer casting unfortunately. If there is a better way to borrow-split from a pool I will gladly implement it. + unsafe fn get_static_pool() -> *const (); + + /// Allocates memory for an instance `T` and puts its pointer behind a memory-safe Arc. 
This `PoolArc` automatically frees itself on drop, and will cause the borrow checker to complain if you attempt to drop the pool before you drop this box. + /// + /// This `PoolArc` supports the ability to generate weak, non-owning references to the allocated `T`. + #[inline(always)] + fn alloc(obj: T) -> PoolArc where Self: Sized { + unsafe { + PoolArc { + ptr: (*Self::get_static_pool().cast::>()).alloc_ptr(obj), + _p: PhantomData + } + } + } +} + + +/// A multithreading lock guard that prevents another thread from freeing the underlying `T` while it is held. It does not prevent other threads from accessing the underlying `T`. +/// +/// If the same thread that holds this guard attempts to free `T` before dropping the guard, it will deadlock. +pub struct PoolGuard<'a, T> (RwLockReadGuard<'a, ()>, &'a T); +impl<'a, T> Deref for PoolGuard<'a, T> { + type Target = T; + #[inline] + fn deref(&self) -> &Self::Target { + &*self.1 + } +} + + +/// A rust-style RAII wrapper that drops and frees memory allocated from a pool automatically, the same as an `Arc`. This will run the destructor of `T` in place within the pool before freeing it, correctly maintaining the invariants that the borrow checker and rust compiler expect of generic types. +pub struct PoolArc, const L: usize = DEFAULT_L> { + ptr: NonNull>, + _p: PhantomData<*const OriginPool>, +} + +impl, const L: usize> PoolArc { + /// Obtain a non-owning reference to the `T` contained in this `PoolArc`. This reference has the special property that the underlying `T` can be dropped from the pool while neither making this reference invalid or unsafe nor leaking the memory of `T`. Instead attempts to `grab` the reference will safely return `None`. + #[inline] + pub fn downgrade(&self) -> PoolWeakRef { + unsafe { + // Since this is a Arc we know for certain the object has not been freed, so we don't have to hold the free lock + PoolWeakRef { + ptr: self.ptr, + uid: NonZeroU64::new_unchecked(self.ptr.as_ref().uid), + } + } + } + /// Returns a number that uniquely identifies this allocated `T` within this pool. No other instance of `T` may have this uid. + pub fn uid(&self) -> NonZeroU64 { + unsafe { + NonZeroU64::new_unchecked(self.ptr.as_ref().uid) + } + } +} +unsafe impl, const L: usize> Send for PoolArc where T: Send {} +unsafe impl, const L: usize> Sync for PoolArc where T: Sync {} + +impl, const L: usize> Deref for PoolArc { + type Target = T; + #[inline] + fn deref(&self) -> &Self::Target { + unsafe { + &self.ptr.as_ref().obj.full_obj + } + } +} +impl, const L: usize> Clone for PoolArc { + fn clone(&self) -> Self { + unsafe { + self.ptr.as_ref().ref_count.fetch_add(1, Ordering::Relaxed); + } + Self { + ptr: self.ptr, + _p: PhantomData, + } + } +} +impl, const L: usize> Drop for PoolArc { + #[inline] + fn drop(&mut self) { + unsafe { + if self.ptr.as_ref().ref_count.fetch_sub(1, Ordering::AcqRel) == 1 { + (*OriginPool::get_static_pool().cast::>()).free_ptr(self.ptr); + } + } + } +} + + +/// A non-owning reference to a `T` allocated by a pool. This reference has the special property that the underlying `T` can be dropped from the pool while neither making this reference invalid nor leaking the memory of `T`. Instead attempts to `grab` this reference will safely return `None` if the underlying `T` has been freed by any thread. +/// +/// Due to there thread safety and low overhead a `PoolWeakRef` implements clone and copy. 
+/// +/// The lifetime of this reference is tied to the lifetime of the pool it came from, because if it were allowed to live longer than its origin pool, it would no longer be safe to dereference and would most likely segfault. Instead the borrow-checker will enforce that this reference has a shorter lifetime that its origin pool. +pub struct PoolWeakRef { + /// A number that uniquely identifies this allocated `T` within this pool. No other instance of `T` may have this uid. This value is read-only. + pub uid: NonZeroU64, + ptr: NonNull>, +} + +impl PoolWeakRef { + /// Obtains a lock that allows the `T` contained in this `PoolWeakRef` to be dereferenced in a thread-safe manner. This lock does not prevent other threads from accessing `T` at the same time, so `T` ought to use interior mutability if it needs to be mutated in a thread-safe way. What this lock does guarantee is that `T` cannot be destructed and freed while it is being held. + /// + /// Do not attempt from within the same thread to drop the `PoolArc` that owns this `T` before dropping this lock, or else the thread will deadlock. Rust makes this quite hard to do accidentally but it's not strictly impossible. + #[inline] + pub fn grab<'b>(&self) -> Option> { + unsafe { + let slot = self.ptr.as_ref(); + let prevent_free_lock = slot.free_lock.read().unwrap(); + if slot.uid == self.uid.get() { + Some(PoolGuard(prevent_free_lock, &slot.obj.full_obj)) + } else { + None + } + } + } +} +unsafe impl Send for PoolWeakRef where T: Send {} +unsafe impl Sync for PoolWeakRef where T: Sync {} + +impl Clone for PoolWeakRef { + fn clone(&self) -> Self { + Self { + uid: self.uid, + ptr: self.ptr, + } + } +} +impl Copy for PoolWeakRef {} + + +pub struct PoolArcSwap, const L: usize = DEFAULT_L> { + ptr: AtomicPtr>, + reads: AtomicU32, + _p: PhantomData<*const OriginPool>, +} +impl, const L: usize> PoolArcSwap { + pub fn new(mut arc: PoolArc) -> Self { + unsafe { + let ret = Self { + ptr: AtomicPtr::new(arc.ptr.as_mut()), + reads: AtomicU32::new(0), + _p: arc._p, + }; + // Suppress reference decrement on new + mem::forget(arc); + ret + } + } + + pub fn swap(&self, arc: PoolArc) -> PoolArc { + unsafe { + let pre_ptr = self.ptr.swap(arc.ptr.as_ptr(), Ordering::Relaxed); + + while self.reads.load(Ordering::Acquire) > 0 { + std::hint::spin_loop() + } + + mem::forget(arc); + PoolArc { + ptr: NonNull::new_unchecked(pre_ptr), + _p: self._p, + } + } + } + + pub fn load(&self) -> PoolArc { + unsafe { + self.reads.fetch_add(1, Ordering::Acquire); + let ptr = self.ptr.load(Ordering::Relaxed); + (*ptr).ref_count.fetch_add(1, Ordering::Relaxed); + self.reads.fetch_sub(1, Ordering::Release); + PoolArc { + ptr: NonNull::new_unchecked(ptr), + _p: self._p, + } + } + } +} +unsafe impl, const L: usize> Send for PoolArcSwap where T: Send {} +unsafe impl, const L: usize> Sync for PoolArcSwap where T: Sync {} + +impl, const L: usize> Drop for PoolArcSwap { + #[inline] + fn drop(&mut self) { + unsafe { + let pre = self.ptr.load(Ordering::SeqCst); + PoolArc { + _p: self._p, + ptr: NonNull::new_unchecked(pre), + }; + } + } +} + + +pub struct PoolArcSwapRw, const L: usize = DEFAULT_L> { + ptr: RwLock>>, + _p: PhantomData<*const OriginPool>, +} + +impl, const L: usize> PoolArcSwapRw { + pub fn new(arc: PoolArc) -> Self { + let ret = Self { + ptr: RwLock::new(arc.ptr), + _p: arc._p, + }; + mem::forget(arc); + ret + } + + pub fn swap(&self, arc: PoolArc) -> PoolArc { + let mut w = self.ptr.write().unwrap(); + let pre = PoolArc { + ptr: *w, + _p: self._p, + }; + *w = arc.ptr; + 
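+        // Forget `arc` so its strong count is not decremented on drop; that count is now owned by `self.ptr`,
+        // while `pre` takes over the count previously owned by the old pointer.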
mem::forget(arc); + pre + } + + pub fn load(&self) -> PoolArc { + let r = self.ptr.read().unwrap(); + unsafe { + r.as_ref().ref_count.fetch_add(1, Ordering::Relaxed); + } + let pre = PoolArc { + ptr: *r, + _p: self._p, + }; + pre + } +} +impl, const L: usize> Drop for PoolArcSwapRw { + #[inline] + fn drop(&mut self) { + let w = self.ptr.write().unwrap(); + PoolArc { + ptr: *w, + _p: self._p, + }; + } +} +unsafe impl, const L: usize> Send for PoolArcSwapRw where T: Send {} +unsafe impl, const L: usize> Sync for PoolArcSwapRw where T: Sync {} + +/// Automatically generates valid implementations of `StaticPool` onto a chosen identifier, allowing this module to allocate instances of `T` with `alloc`. Users have to generate implementations clientside because rust does not allow for generic globals. +/// +/// # Example +/// ``` +/// use zerotier_utils::arc_pool::{Pool, StaticPool, static_pool}; +/// +/// static_pool!(StaticPool MyPools { +/// Pool, Pool<&u32, 12> +/// }); +/// +/// let object = 1u32; +/// let arc_object = MyPools::alloc(object); +/// let arc_ref = MyPools::alloc(&object); +/// +/// assert_eq!(*arc_object, **arc_ref); +/// ``` +#[macro_export] +macro_rules! __static_pool__ { + ($m:ident $s:ident { $($($p:ident)::+<$t:ty$(, $l:tt)?>),+ $(,)?}) => { + struct $s {} + $( + impl $m<$t$(, $l)?> for $s { + #[inline(always)] + unsafe fn get_static_pool() -> *const () { + static POOL: $($p)::+<$t$(, $l)?> = $($p)::+::new(); + (&POOL as *const $($p)::+<$t$(, $l)?>).cast() + } + } + )* + }; + ($m:ident::$n:ident $s:ident { $($($p:ident)::+<$t:ty$(, $l:tt)?>),+ $(,)?}) => { + struct $s {} + $( + impl $m::$n<$t$(, $l)?> for $s { + #[inline(always)] + unsafe fn get_static_pool() -> *const () { + static POOL: $($p)::+<$t$(, $l)?> = $($p)::+::new(); + (&POOL as *const $($p)::+<$t$(, $l)?>).cast() + } + } + )* + }; +} +pub use __static_pool__ as static_pool; + +#[cfg(test)] +mod tests { + use std::{thread, sync::{Arc, atomic::AtomicU64}}; + use super::*; + + + fn rand(r: &mut u32) -> u32 { + /* Algorithm "xor" from p. 
4 of Marsaglia, "Xorshift RNGs" */ + *r ^= *r << 13; + *r ^= *r >> 17; + *r ^= *r << 5; + *r + } + const fn prob(p: u64) -> u32 { + (p*(u32::MAX as u64)/100) as u32 + } + fn rand_idx<'a, T>(v: &'a [T], r: &mut u32) -> Option<&'a T> { + if v.len() > 0 { + Some(&v[(rand(r) as usize)%v.len()]) + } else { + None + } + } + fn rand_i<'a, T>(v: &'a [T], r: &mut u32) -> Option { + if v.len() > 0 { + Some((rand(r) as usize)%v.len()) + } else { + None + } + } + + struct Item { + a: u32, + count: &'static AtomicU64, + b: u32, + } + impl Item { + fn new(r: u32, count: &'static AtomicU64) -> Item { + count.fetch_add(1, Ordering::Relaxed); + Item { + a: r, + count, + b: r, + } + } + fn check(&self, id: u32) { + assert_eq!(self.a, self.b); + assert_eq!(self.a, id); + } + } + impl Drop for Item { + fn drop(&mut self) { + let _a = self.count.fetch_sub(1, Ordering::Relaxed); + assert_eq!(self.a, self.b); + } + } + + const POOL_U32_LEN: usize = (5*12)<<2; + static_pool!(StaticPool TestPools { + Pool, Pool + }); + + #[test] + fn usage() { + + let num1 = TestPools::alloc(1u32); + let num2 = TestPools::alloc(2u32); + let num3 = TestPools::alloc(3u32); + let num4 = TestPools::alloc(4u32); + let num2_weak = num2.downgrade(); + + assert_eq!(*num2_weak.grab().unwrap(), 2); + drop(num2); + + assert_eq!(*num1, 1); + assert_eq!(*num3, 3); + assert_eq!(*num4, 4); + assert!(num2_weak.grab().is_none()); + } + #[test] + fn single_thread() { + + let mut history = Vec::new(); + + let num1 = TestPools::alloc(1u32); + let num2 = TestPools::alloc(2u32); + let num3 = TestPools::alloc(3u32); + let num4 = TestPools::alloc(4u32); + let num2_weak = num2.downgrade(); + + for i in 0..1000 { + history.push(TestPools::alloc(i as u32)); + } + for i in 0..100 { + let arc = history.remove((i*10)%history.len()); + assert!(*arc < 1000); + } + for i in 0..1000 { + history.push(TestPools::alloc(i as u32)); + } + + assert_eq!(*num2_weak.grab().unwrap(), 2); + drop(num2); + + assert_eq!(*num1, 1); + assert_eq!(*num3, 3); + assert_eq!(*num4, 4); + assert!(num2_weak.grab().is_none()); + } + + #[test] + fn multi_thread() { + const N: usize = 123456; + static COUNT: AtomicU64 = AtomicU64::new(0); + + let mut joins = Vec::new(); + for i in 0..32 { + joins.push(thread::spawn(move || { + let r = &mut (i + 1234); + + let mut items_dup = Vec::new(); + let mut items = Vec::new(); + for _ in 0..N { + let p = rand(r); + if p < prob(30) { + let id = rand(r); + let s = TestPools::alloc(Item::new(id, &COUNT)); + items.push((id, s.clone(), s.downgrade())); + s.check(id); + } else if p < prob(60) { + if let Some((id, s, w)) = rand_idx(&items, r) { + items_dup.push((*id, s.clone(), w.clone())); + s.check(*id); + } + } else if p < prob(80) { + if let Some(i) = rand_i(&items, r) { + let (id, s, w) = items.swap_remove(i); + w.grab().unwrap().check(id); + s.check(id); + } + } else if p < prob(100) { + if let Some(i) = rand_i(&items_dup, r) { + let (id, s, w) = items_dup.swap_remove(i); + w.grab().unwrap().check(id); + s.check(id); + } + } + } + for (id, s, w) in items_dup { + s.check(id); + w.grab().unwrap().check(id); + } + for (id, s, w) in items { + s.check(id); + w.grab().unwrap().check(id); + drop(s); + assert!(w.grab().is_none()) + } + })); + } + for j in joins { + j.join().unwrap(); + } + assert_eq!(COUNT.load(Ordering::Relaxed), 0); + } + + #[test] + fn multi_thread_swap() { + const N: usize = 1234; + static COUNT: AtomicU64 = AtomicU64::new(0); + + let s = Arc::new(PoolArcSwap::new(TestPools::alloc(Item::new(0, &COUNT)))); + + for _ in 0..1234 { + let mut 
joins = Vec::new(); + for _ in 0..8 { + let swaps = s.clone(); + joins.push(thread::spawn(move || { + let r = &mut 1474; + let mut new = TestPools::alloc(Item::new(rand(r), &COUNT)); + for _ in 0..N { + new = swaps.swap(new); + } + })); + } + for j in joins { + j.join().unwrap(); + } + } + drop(s); + assert_eq!(COUNT.load(Ordering::Relaxed), 0); + } + + #[test] + fn multi_thread_swap_load() { + const N: usize = 123456; + static COUNT: AtomicU64 = AtomicU64::new(0); + + let s: Arc<[_; 8]> = Arc::new(std::array::from_fn(|i| PoolArcSwap::new(TestPools::alloc(Item::new(i as u32, &COUNT))))); + + let mut joins = Vec::new(); + + for i in 0..4 { + let swaps = s.clone(); + joins.push(thread::spawn(move || { + let r = &mut (i + 2783); + for _ in 0..N { + if let Some(s) = rand_idx(&swaps[..], r) { + let new = TestPools::alloc(Item::new(rand(r), &COUNT)); + let _a = s.swap(new); + } + } + })); + } + for i in 0..28 { + let swaps = s.clone(); + joins.push(thread::spawn(move || { + let r = &mut (i + 4136); + for _ in 0..N { + if let Some(s) = rand_idx(&swaps[..], r) { + let _a = s.load(); + assert_eq!(_a.a, _a.b); + } + } + })); + } + for j in joins { + j.join().unwrap(); + } + drop(s); + assert_eq!(COUNT.load(Ordering::Relaxed), 0); + } +} diff --git a/utils/src/lib.rs b/utils/src/lib.rs index 194f7af4e..30d87df93 100644 --- a/utils/src/lib.rs +++ b/utils/src/lib.rs @@ -29,6 +29,8 @@ pub mod reaper; pub mod ringbuffer; pub mod sync; pub mod varint; +pub mod arc_pool; + #[cfg(feature = "tokio")] pub use tokio; From 7ec194a6d1a3a824020e8fa1e74569024f30248b Mon Sep 17 00:00:00 2001 From: mamoniot Date: Mon, 20 Mar 2023 15:29:02 -0400 Subject: [PATCH 2/4] ran cargo fmt --- utils/src/arc_pool.rs | 115 ++++++++++++++---------------------------- utils/src/lib.rs | 3 +- 2 files changed, 39 insertions(+), 79 deletions(-) diff --git a/utils/src/arc_pool.rs b/utils/src/arc_pool.rs index 1d5e71e1a..d9eb269e5 100644 --- a/utils/src/arc_pool.rs +++ b/utils/src/arc_pool.rs @@ -1,9 +1,12 @@ -use std::sync::{Mutex, RwLock, RwLockReadGuard, atomic::{AtomicU32, Ordering, AtomicPtr}}; -use std::mem::{self, MaybeUninit, ManuallyDrop}; -use std::ptr::{self, NonNull}; use std::marker::PhantomData; +use std::mem::{self, ManuallyDrop, MaybeUninit}; use std::num::NonZeroU64; use std::ops::Deref; +use std::ptr::{self, NonNull}; +use std::sync::{ + atomic::{AtomicPtr, AtomicU32, Ordering}, + Mutex, RwLock, RwLockReadGuard, +}; const DEFAULT_L: usize = 64; @@ -32,7 +35,7 @@ struct PoolMem { /// Atomic reference counting is also implemented allowing for exceedingly complex models of shared ownership. Multiple copies of both strong and weak references to the underlying `T` can be generated that are all memory safe and borrow-checked. /// /// Allocating from a pool results in very little internal and external fragmentation in the global heap, thus saving significant amounts of memory from being used by one's program. Pools also allocate memory significantly faster on average than the global allocator. This specific pool implementation supports guaranteed constant time `alloc` and `free`. -pub struct Pool (Mutex<(*mut Slot, u64, *mut PoolMem, usize)>); +pub struct Pool(Mutex<(*mut Slot, u64, *mut PoolMem, usize)>); unsafe impl Send for Pool {} unsafe impl Sync for Pool {} @@ -44,10 +47,9 @@ impl Pool { /// A `Pool` cannot be interacted with directly, it requires a `impl StaticPool for Pool` implementation. See the `static_pool!` macro for automatically generated trait implementation. 
#[inline] pub const fn new() -> Self { - Pool (Mutex::new((ptr::null_mut(), 1, ptr::null_mut(), usize::MAX))) + Pool(Mutex::new((ptr::null_mut(), 1, ptr::null_mut(), usize::MAX))) } - #[inline(always)] fn create_arr() -> [MaybeUninit>; L] { unsafe { MaybeUninit::<[MaybeUninit>; L]>::uninit().assume_init() } @@ -68,15 +70,12 @@ impl Pool { slot_ptr } else { if head_size >= L { - let new = Box::leak(Box::new(PoolMem { - pre: head_arena, - mem: Self::create_arr(), - })); + let new = Box::leak(Box::new(PoolMem { pre: head_arena, mem: Self::create_arr() })); head_arena = new; head_size = 0; } let slot = Slot { - obj: SlotState {full_obj: ManuallyDrop::new(obj)}, + obj: SlotState { full_obj: ManuallyDrop::new(obj) }, free_lock: RwLock::new(()), ref_count: AtomicU32::new(1), uid, @@ -122,7 +121,6 @@ impl Drop for Pool { } pub trait StaticPool { - /// Must return a pointer to an instance of a `Pool` with a static lifetime. That pointer must be cast to a `*const ()` to make the borrow-checker happy. /// /// **Safety**: The returned pointer must have originally been a `&'static Pool` reference. So it must have had a matching `T` and `L` and it must have the static lifetime. @@ -135,21 +133,23 @@ pub trait StaticPool { /// /// This `PoolArc` supports the ability to generate weak, non-owning references to the allocated `T`. #[inline(always)] - fn alloc(obj: T) -> PoolArc where Self: Sized { + fn alloc(obj: T) -> PoolArc + where + Self: Sized, + { unsafe { PoolArc { ptr: (*Self::get_static_pool().cast::>()).alloc_ptr(obj), - _p: PhantomData + _p: PhantomData, } } } } - /// A multithreading lock guard that prevents another thread from freeing the underlying `T` while it is held. It does not prevent other threads from accessing the underlying `T`. /// /// If the same thread that holds this guard attempts to free `T` before dropping the guard, it will deadlock. -pub struct PoolGuard<'a, T> (RwLockReadGuard<'a, ()>, &'a T); +pub struct PoolGuard<'a, T>(RwLockReadGuard<'a, ()>, &'a T); impl<'a, T> Deref for PoolGuard<'a, T> { type Target = T; #[inline] @@ -158,7 +158,6 @@ impl<'a, T> Deref for PoolGuard<'a, T> { } } - /// A rust-style RAII wrapper that drops and frees memory allocated from a pool automatically, the same as an `Arc`. This will run the destructor of `T` in place within the pool before freeing it, correctly maintaining the invariants that the borrow checker and rust compiler expect of generic types. pub struct PoolArc, const L: usize = DEFAULT_L> { ptr: NonNull>, @@ -179,9 +178,7 @@ impl, const L: usize> PoolArc } /// Returns a number that uniquely identifies this allocated `T` within this pool. No other instance of `T` may have this uid. 
pub fn uid(&self) -> NonZeroU64 { - unsafe { - NonZeroU64::new_unchecked(self.ptr.as_ref().uid) - } + unsafe { NonZeroU64::new_unchecked(self.ptr.as_ref().uid) } } } unsafe impl, const L: usize> Send for PoolArc where T: Send {} @@ -191,9 +188,7 @@ impl, const L: usize> Deref for PoolArc &Self::Target { - unsafe { - &self.ptr.as_ref().obj.full_obj - } + unsafe { &self.ptr.as_ref().obj.full_obj } } } impl, const L: usize> Clone for PoolArc { @@ -201,10 +196,7 @@ impl, const L: usize> Clone for PoolArc, const L: usize> Drop for PoolArc { @@ -218,7 +210,6 @@ impl, const L: usize> Drop for PoolArc Sync for PoolWeakRef where T: Sync {} impl Clone for PoolWeakRef { fn clone(&self) -> Self { - Self { - uid: self.uid, - ptr: self.ptr, - } + Self { uid: self.uid, ptr: self.ptr } } } impl Copy for PoolWeakRef {} - pub struct PoolArcSwap, const L: usize = DEFAULT_L> { ptr: AtomicPtr>, reads: AtomicU32, @@ -289,10 +276,7 @@ impl, const L: usize> PoolArcSwap, const L: usize> PoolArcSwap, const L: usize> Drop for PoolArcSwap, const L: usize = DEFAULT_L> { ptr: RwLock>>, _p: PhantomData<*const OriginPool>, @@ -333,20 +310,14 @@ pub struct PoolArcSwapRw, const L: usize = DEFAU impl, const L: usize> PoolArcSwapRw { pub fn new(arc: PoolArc) -> Self { - let ret = Self { - ptr: RwLock::new(arc.ptr), - _p: arc._p, - }; + let ret = Self { ptr: RwLock::new(arc.ptr), _p: arc._p }; mem::forget(arc); ret } pub fn swap(&self, arc: PoolArc) -> PoolArc { let mut w = self.ptr.write().unwrap(); - let pre = PoolArc { - ptr: *w, - _p: self._p, - }; + let pre = PoolArc { ptr: *w, _p: self._p }; *w = arc.ptr; mem::forget(arc); pre @@ -357,10 +328,7 @@ impl, const L: usize> PoolArcSwapRw, const L: usize> Drop for PoolArcSwapRw, const L: usize> Send for PoolArcSwapRw where T: Send {} @@ -424,9 +389,11 @@ pub use __static_pool__ as static_pool; #[cfg(test)] mod tests { - use std::{thread, sync::{Arc, atomic::AtomicU64}}; use super::*; - + use std::{ + sync::{atomic::AtomicU64, Arc}, + thread, + }; fn rand(r: &mut u32) -> u32 { /* Algorithm "xor" from p. 
4 of Marsaglia, "Xorshift RNGs" */ @@ -436,18 +403,18 @@ mod tests { *r } const fn prob(p: u64) -> u32 { - (p*(u32::MAX as u64)/100) as u32 + (p * (u32::MAX as u64) / 100) as u32 } fn rand_idx<'a, T>(v: &'a [T], r: &mut u32) -> Option<&'a T> { if v.len() > 0 { - Some(&v[(rand(r) as usize)%v.len()]) + Some(&v[(rand(r) as usize) % v.len()]) } else { None } } fn rand_i<'a, T>(v: &'a [T], r: &mut u32) -> Option { if v.len() > 0 { - Some((rand(r) as usize)%v.len()) + Some((rand(r) as usize) % v.len()) } else { None } @@ -461,11 +428,7 @@ mod tests { impl Item { fn new(r: u32, count: &'static AtomicU64) -> Item { count.fetch_add(1, Ordering::Relaxed); - Item { - a: r, - count, - b: r, - } + Item { a: r, count, b: r } } fn check(&self, id: u32) { assert_eq!(self.a, self.b); @@ -479,14 +442,13 @@ mod tests { } } - const POOL_U32_LEN: usize = (5*12)<<2; + const POOL_U32_LEN: usize = (5 * 12) << 2; static_pool!(StaticPool TestPools { Pool, Pool }); #[test] fn usage() { - let num1 = TestPools::alloc(1u32); let num2 = TestPools::alloc(2u32); let num3 = TestPools::alloc(3u32); @@ -503,7 +465,6 @@ mod tests { } #[test] fn single_thread() { - let mut history = Vec::new(); let num1 = TestPools::alloc(1u32); @@ -516,7 +477,7 @@ mod tests { history.push(TestPools::alloc(i as u32)); } for i in 0..100 { - let arc = history.remove((i*10)%history.len()); + let arc = history.remove((i * 10) % history.len()); assert!(*arc < 1000); } for i in 0..1000 { @@ -645,7 +606,7 @@ mod tests { let _a = s.load(); assert_eq!(_a.a, _a.b); } - } + } })); } for j in joins { diff --git a/utils/src/lib.rs b/utils/src/lib.rs index 30d87df93..a3be5577b 100644 --- a/utils/src/lib.rs +++ b/utils/src/lib.rs @@ -6,6 +6,7 @@ * https://www.zerotier.com/ */ +pub mod arc_pool; pub mod arrayvec; pub mod base64; pub mod blob; @@ -29,8 +30,6 @@ pub mod reaper; pub mod ringbuffer; pub mod sync; pub mod varint; -pub mod arc_pool; - #[cfg(feature = "tokio")] pub use tokio; From 984782d7798f11168ee8e0bae2e4054ed0498af8 Mon Sep 17 00:00:00 2001 From: mamoniot Date: Mon, 20 Mar 2023 17:32:43 -0400 Subject: [PATCH 3/4] made the correctness tests less aggressive --- utils/src/arc_pool.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/utils/src/arc_pool.rs b/utils/src/arc_pool.rs index d9eb269e5..b4fd8c88f 100644 --- a/utils/src/arc_pool.rs +++ b/utils/src/arc_pool.rs @@ -495,7 +495,7 @@ mod tests { #[test] fn multi_thread() { - const N: usize = 123456; + const N: usize = 12345; static COUNT: AtomicU64 = AtomicU64::new(0); let mut joins = Vec::new(); @@ -556,7 +556,7 @@ mod tests { let s = Arc::new(PoolArcSwap::new(TestPools::alloc(Item::new(0, &COUNT)))); - for _ in 0..1234 { + for _ in 0..123 { let mut joins = Vec::new(); for _ in 0..8 { let swaps = s.clone(); @@ -578,7 +578,7 @@ mod tests { #[test] fn multi_thread_swap_load() { - const N: usize = 123456; + const N: usize = 12345; static COUNT: AtomicU64 = AtomicU64::new(0); let s: Arc<[_; 8]> = Arc::new(std::array::from_fn(|i| PoolArcSwap::new(TestPools::alloc(Item::new(i as u32, &COUNT))))); From c2125db444961ed7da3cf19ab76d943d2a7bbc11 Mon Sep 17 00:00:00 2001 From: mamoniot Date: Tue, 21 Mar 2023 08:25:15 -0400 Subject: [PATCH 4/4] added pub option --- utils/src/arc_pool.rs | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/utils/src/arc_pool.rs b/utils/src/arc_pool.rs index b4fd8c88f..5fd782a1d 100644 --- a/utils/src/arc_pool.rs +++ b/utils/src/arc_pool.rs @@ -348,7 +348,7 @@ unsafe impl, const L: usize> Sync for PoolArcSwa /// 
``` /// use zerotier_utils::arc_pool::{Pool, StaticPool, static_pool}; /// -/// static_pool!(StaticPool MyPools { +/// static_pool!(pub StaticPool MyPools { /// Pool, Pool<&u32, 12> /// }); /// @@ -384,6 +384,30 @@ macro_rules! __static_pool__ { } )* }; + (pub $m:ident $s:ident { $($($p:ident)::+<$t:ty$(, $l:tt)?>),+ $(,)?}) => { + pub struct $s {} + $( + impl $m<$t$(, $l)?> for $s { + #[inline(always)] + unsafe fn get_static_pool() -> *const () { + static POOL: $($p)::+<$t$(, $l)?> = $($p)::+::new(); + (&POOL as *const $($p)::+<$t$(, $l)?>).cast() + } + } + )* + }; + (pub $m:ident::$n:ident $s:ident { $($($p:ident)::+<$t:ty$(, $l:tt)?>),+ $(,)?}) => { + pub struct $s {} + $( + impl $m::$n<$t$(, $l)?> for $s { + #[inline(always)] + unsafe fn get_static_pool() -> *const () { + static POOL: $($p)::+<$t$(, $l)?> = $($p)::+::new(); + (&POOL as *const $($p)::+<$t$(, $l)?>).cast() + } + } + )* + }; } pub use __static_pool__ as static_pool;
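
Example usage of the API added by this patch series, as a minimal sketch. It assumes the crate path shown in the doc comments above (`zerotier_utils::arc_pool`); the `Packet` type, the `PacketPool` name, and the packing length `64` are illustrative only and not part of the patches.

```rust
use zerotier_utils::arc_pool::{static_pool, Pool, PoolArcSwap, StaticPool};

// Hypothetical payload type; any `T` can be pooled.
struct Packet {
    seq: u64,
}

// Generate a `StaticPool` implementation named `PacketPool`, backed by a
// static `Pool<Packet, 64>` (uses the `pub` form added in PATCH 4/4).
static_pool!(pub StaticPool PacketPool { Pool<Packet, 64> });

fn main() {
    // Strong, reference-counted allocation out of the pool.
    let strong = PacketPool::alloc(Packet { seq: 1 });
    assert_eq!(strong.seq, 1);

    // Weak, generational reference: access goes through `grab()`, which
    // returns `None` once the underlying object has been freed.
    let weak = strong.downgrade();
    assert_eq!(weak.grab().unwrap().seq, 1);

    // Atomically swappable holder for a `PoolArc`.
    let swap = PoolArcSwap::new(strong);
    let old = swap.swap(PacketPool::alloc(Packet { seq: 2 }));
    assert_eq!(old.seq, 1);
    assert_eq!(swap.load().seq, 2);

    // Dropping the last strong reference frees the slot; the weak ref notices.
    drop(old);
    assert!(weak.grab().is_none());
}
```

The same pattern appears in the `usage` and `multi_thread_swap` tests above; the only piece the caller must supply is the `static_pool!` declaration, since Rust does not allow generic globals and the backing `Pool` static therefore has to be generated per concrete type.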