tested threading

This commit is contained in:
mamoniot 2023-03-22 00:31:40 -04:00
parent d471138eb5
commit 7fbd8350c7
No known key found for this signature in database
GPG key ID: ADCCDBBE0E3D3B3B
2 changed files with 52 additions and 37 deletions

View file

@ -1,11 +1,14 @@
use std::cell::UnsafeCell;
use std::mem::{needs_drop, size_of, zeroed, MaybeUninit}; use std::mem::{needs_drop, size_of, zeroed, MaybeUninit};
use std::ptr::slice_from_raw_parts; use std::ptr::slice_from_raw_parts;
use std::sync::RwLock;
use std::sync::atomic::{AtomicU64, Ordering};
/// Fast packet defragmenter /// Fast packet defragmenter
pub struct Fragged<Fragment, const MAX_FRAGMENTS: usize> { pub struct Fragged<Fragment, const MAX_FRAGMENTS: usize> {
have: u64, have: AtomicU64,
counter: u64, counter: RwLock<u64>,
frags: [MaybeUninit<Fragment>; MAX_FRAGMENTS], frags: UnsafeCell<[MaybeUninit<Fragment>; MAX_FRAGMENTS]>,
} }
pub struct Assembled<Fragment, const MAX_FRAGMENTS: usize>([MaybeUninit<Fragment>; MAX_FRAGMENTS], usize); pub struct Assembled<Fragment, const MAX_FRAGMENTS: usize>([MaybeUninit<Fragment>; MAX_FRAGMENTS], usize);
@ -49,42 +52,56 @@ impl<Fragment, const MAX_FRAGMENTS: usize> Fragged<Fragment, MAX_FRAGMENTS> {
/// When a fully assembled packet is returned the internal state is reset and this object can /// When a fully assembled packet is returned the internal state is reset and this object can
/// be reused to assemble another packet. /// be reused to assemble another packet.
#[inline(always)] #[inline(always)]
pub fn assemble(&mut self, counter: u64, fragment: Fragment, fragment_no: u8, fragment_count: u8) -> Option<Assembled<Fragment, MAX_FRAGMENTS>> { pub fn assemble(&self, counter: u64, fragment: Fragment, fragment_no: u8, fragment_count: u8) -> Option<Assembled<Fragment, MAX_FRAGMENTS>> {
if fragment_no < fragment_count && (fragment_count as usize) <= MAX_FRAGMENTS { if fragment_no < fragment_count && (fragment_count as usize) <= MAX_FRAGMENTS {
let mut have = self.have; let r = self.counter.read().unwrap();
let cur_counter = *r;
let mut r_guard = Some(r);
let mut w_guard = None;
// If the counter has changed, reset the structure to receive a new packet. // If the counter has changed, reset the structure to receive a new packet.
if counter != self.counter { if counter != cur_counter {
self.counter = counter; drop(r_guard.take());
if needs_drop::<Fragment>() { let mut w = self.counter.write().unwrap();
let mut i = 0; if *w != counter {
while have != 0 { *w = counter;
if (have & 1) != 0 { if needs_drop::<Fragment>() {
debug_assert!(i < MAX_FRAGMENTS); let mut have = self.have.load(Ordering::Relaxed);
unsafe { self.frags.get_unchecked_mut(i).assume_init_drop() }; let mut i = 0;
while have != 0 {
if (have & 1) != 0 {
debug_assert!(i < MAX_FRAGMENTS);
unsafe { (*self.frags.get()).get_unchecked_mut(i).assume_init_drop() };
}
have = have.wrapping_shr(1);
i += 1;
} }
have = have.wrapping_shr(1);
i += 1;
} }
} else { self.have.store(0, Ordering::Relaxed);
have = 0;
} }
} w_guard = Some(w);
unsafe {
self.frags.get_unchecked_mut(fragment_no as usize).write(fragment);
} }
let want = 0xffffffffffffffffu64.wrapping_shr((64 - fragment_count) as u32); let want = 0xffffffffffffffffu64.wrapping_shr((64 - fragment_count) as u32);
have |= 1u64.wrapping_shl(fragment_no as u32); let got = 1u64.wrapping_shl(fragment_no as u32);
if (have & want) == want { let have = self.have.fetch_or(got, Ordering::Relaxed);
self.have = 0;
// Setting 'have' to 0 resets the state of this object, and the fragments if have & got == 0 {
// are effectively moved into the Assembled<> container and returned. That unsafe {
// container will drop them when it is dropped. (*self.frags.get()).get_unchecked_mut(fragment_no as usize).write(fragment);
return Some(Assembled(unsafe { std::mem::transmute_copy(&self.frags) }, fragment_count as usize)); }
} else { if ((have | got) & want) == want {
self.have = have; drop(r_guard.take());
let mut w = w_guard.unwrap_or_else(|| self.counter.write().unwrap());
if *w == counter {
*w = 0;
self.have.store(0, Ordering::Relaxed);
// Setting 'have' to 0 resets the state of this object, and the fragments
// are effectively moved into the Assembled<> container and returned. That
// container will drop them when it is dropped.
return Some(Assembled(unsafe { std::mem::transmute_copy(&self.frags) }, fragment_count as usize));
}
}
} }
} }
return None; return None;
@ -95,12 +112,12 @@ impl<Fragment, const MAX_FRAGMENTS: usize> Drop for Fragged<Fragment, MAX_FRAGME
#[inline(always)] #[inline(always)]
fn drop(&mut self) { fn drop(&mut self) {
if needs_drop::<Fragment>() { if needs_drop::<Fragment>() {
let mut have = self.have; let mut have = self.have.load(Ordering::Relaxed);
let mut i = 0; let mut i = 0;
while have != 0 { while have != 0 {
if (have & 1) != 0 { if (have & 1) != 0 {
debug_assert!(i < MAX_FRAGMENTS); debug_assert!(i < MAX_FRAGMENTS);
unsafe { self.frags.get_unchecked_mut(i).assume_init_drop() }; unsafe { (*self.frags.get()).get_unchecked_mut(i).assume_init_drop() };
} }
have = have.wrapping_shr(1); have = have.wrapping_shr(1);
i += 1; i += 1;

View file

@ -90,7 +90,7 @@ pub struct Session<Application: ApplicationLayer> {
receive_window: [AtomicU64; COUNTER_WINDOW_MAX_OOO], receive_window: [AtomicU64; COUNTER_WINDOW_MAX_OOO],
header_protection_cipher: Aes, header_protection_cipher: Aes,
state: RwLock<State>, state: RwLock<State>,
defrag: [Mutex<Fragged<Application::IncomingPacketBuffer, MAX_FRAGMENTS>>; COUNTER_WINDOW_MAX_OOO], defrag: [Fragged<Application::IncomingPacketBuffer, MAX_FRAGMENTS>; COUNTER_WINDOW_MAX_OOO],
} }
/// Most of the mutable parts of a session state. /// Most of the mutable parts of a session state.
@ -343,7 +343,7 @@ impl<Application: ApplicationLayer> Context<Application> {
init_packet: [0u8; AliceNoiseXKInit::SIZE], init_packet: [0u8; AliceNoiseXKInit::SIZE],
})), })),
}), }),
defrag: std::array::from_fn(|_| Mutex::new(Fragged::new())), defrag: std::array::from_fn(|_| Fragged::new()),
}); });
sessions.active.insert(local_session_id, Arc::downgrade(&session)); sessions.active.insert(local_session_id, Arc::downgrade(&session));
@ -461,8 +461,6 @@ impl<Application: ApplicationLayer> Context<Application> {
let (assembled_packet, incoming_packet_buf_arr); let (assembled_packet, incoming_packet_buf_arr);
let incoming_packet = if fragment_count > 1 { let incoming_packet = if fragment_count > 1 {
assembled_packet = session.defrag[(incoming_counter as usize) % COUNTER_WINDOW_MAX_OOO] assembled_packet = session.defrag[(incoming_counter as usize) % COUNTER_WINDOW_MAX_OOO]
.lock()
.unwrap()
.assemble(incoming_counter, incoming_physical_packet_buf, fragment_no, fragment_count); .assemble(incoming_counter, incoming_physical_packet_buf, fragment_no, fragment_count);
if let Some(assembled_packet) = assembled_packet.as_ref() { if let Some(assembled_packet) = assembled_packet.as_ref() {
assembled_packet.as_ref() assembled_packet.as_ref()
@ -1102,7 +1100,7 @@ impl<Application: ApplicationLayer> Context<Application> {
current_key: 0, current_key: 0,
outgoing_offer: Offer::None, outgoing_offer: Offer::None,
}), }),
defrag: std::array::from_fn(|_| Mutex::new(Fragged::new())), defrag: std::array::from_fn(|_| Fragged::new()),
}); });
// Promote incoming session to active. // Promote incoming session to active.