diff --git a/zssp/src/fragged.rs b/zssp/src/fragged.rs
index 01a0c3b20..6a9d2da33 100644
--- a/zssp/src/fragged.rs
+++ b/zssp/src/fragged.rs
@@ -3,6 +3,7 @@ use std::ptr::slice_from_raw_parts;
 
 /// Fast packet defragmenter
 pub struct Fragged<Fragment, const MAX_FRAGMENTS: usize> {
+    count: u32,
     have: u64,
     counter: u64,
     frags: [MaybeUninit<Fragment>; MAX_FRAGMENTS],
@@ -35,9 +36,9 @@ impl<Fragment, const MAX_FRAGMENTS: usize> Fragged<Fragment, MAX_FRAGMENTS> {
         // that the array of MaybeUninit<Fragment> can be freely cast into an array of
         // Fragment. They also check that the maximum number of fragments is not too large
         // for the fact that we use bits in a u64 to track which fragments are received.
-        assert!(MAX_FRAGMENTS <= 64);
-        assert_eq!(size_of::<MaybeUninit<Fragment>>(), size_of::<Fragment>());
-        assert_eq!(
+        debug_assert!(MAX_FRAGMENTS <= 64);
+        debug_assert_eq!(size_of::<MaybeUninit<Fragment>>(), size_of::<Fragment>());
+        debug_assert_eq!(
             size_of::<[MaybeUninit<Fragment>; MAX_FRAGMENTS]>(),
             size_of::<[Fragment; MAX_FRAGMENTS]>()
         );
@@ -51,12 +52,11 @@ impl<Fragment, const MAX_FRAGMENTS: usize> Fragged<Fragment, MAX_FRAGMENTS> {
     #[inline(always)]
     pub fn assemble(&mut self, counter: u64, fragment: Fragment, fragment_no: u8, fragment_count: u8) -> Option<Assembled<Fragment, MAX_FRAGMENTS>> {
         if fragment_no < fragment_count && (fragment_count as usize) <= MAX_FRAGMENTS {
-            let mut have = self.have;
 
             // If the counter has changed, reset the structure to receive a new packet.
             if counter != self.counter {
-                self.counter = counter;
                 if needs_drop::<Fragment>() {
+                    let mut have = self.have;
                     let mut i = 0;
                     while have != 0 {
                         if (have & 1) != 0 {
@@ -66,26 +66,29 @@ impl<Fragment, const MAX_FRAGMENTS: usize> Fragged<Fragment, MAX_FRAGMENTS> {
                         have = have.wrapping_shr(1);
                         i += 1;
                     }
-                } else {
-                    have = 0;
+                }
+                self.have = 0;
+                self.count = fragment_count as u32;
+                self.counter = counter;
+            }
+
+            let got = 1u64.wrapping_shl(fragment_no as u32);
+            if got & self.have == 0 && self.count as u8 == fragment_count {
+                self.have |= got;
+                unsafe {
+                    self.frags.get_unchecked_mut(fragment_no as usize).write(fragment);
+                }
+                if self.have == 1u64.wrapping_shl(self.count) - 1 {
+                    self.have = 0;
+                    self.count = 0;
+                    self.counter = 0;
+                    // Setting 'have' to 0 resets the state of this object, and the fragments
+                    // are effectively moved into the Assembled<> container and returned. That
+                    // container will drop them when it is dropped.
+                    return Some(Assembled(unsafe { std::mem::transmute_copy(&self.frags) }, fragment_count as usize));
                 }
             }
-
-            unsafe {
-                self.frags.get_unchecked_mut(fragment_no as usize).write(fragment);
-            }
-
-            let want = 0xffffffffffffffffu64.wrapping_shr((64 - fragment_count) as u32);
-            have |= 1u64.wrapping_shl(fragment_no as u32);
-            if (have & want) == want {
-                self.have = 0;
-                // Setting 'have' to 0 resets the state of this object, and the fragments
-                // are effectively moved into the Assembled<> container and returned. That
-                // container will drop them when it is dropped.
-                return Some(Assembled(unsafe { std::mem::transmute_copy(&self.frags) }, fragment_count as usize));
-            } else {
-                self.have = have;
-            }
         }
         return None;
     }
diff --git a/zssp/src/proto.rs b/zssp/src/proto.rs
index 826371983..06dc1dfaa 100644
--- a/zssp/src/proto.rs
+++ b/zssp/src/proto.rs
@@ -63,6 +63,7 @@ pub(crate) const KBKDF_KEY_USAGE_LABEL_AES_GCM_ALICE_TO_BOB: u8 = b'A'; // AES-G
 pub(crate) const KBKDF_KEY_USAGE_LABEL_AES_GCM_BOB_TO_ALICE: u8 = b'B'; // AES-GCM in B->A direction
 pub(crate) const KBKDF_KEY_USAGE_LABEL_RATCHET: u8 = b'R'; // Key used in derivatin of next session key
 
+pub(crate) const MAX_INCOMPLETE_SESSION_QUEUE_SIZE: usize = 32;
 pub(crate) const MAX_FRAGMENTS: usize = 48; // hard protocol max: 63
 pub(crate) const MAX_NOISE_HANDSHAKE_FRAGMENTS: usize = 16; // enough room for p384 + ZT identity + kyber1024 + tag/hmac/etc.
 pub(crate) const MAX_NOISE_HANDSHAKE_SIZE: usize = MAX_NOISE_HANDSHAKE_FRAGMENTS * MIN_TRANSPORT_MTU;
diff --git a/zssp/src/zssp.rs b/zssp/src/zssp.rs
index 660b3e65d..c20ce1e14 100644
--- a/zssp/src/zssp.rs
+++ b/zssp/src/zssp.rs
@@ -36,17 +36,8 @@ const GCM_CIPHER_POOL_SIZE: usize = 4;
 /// Each application using ZSSP must create an instance of this to own sessions and
 /// defragment incoming packets that are not yet associated with a session.
 pub struct Context<Application: ApplicationLayer> {
-    max_incomplete_session_queue_size: usize,
     default_physical_mtu: AtomicUsize,
-    defrag: Mutex<
-        HashMap<
-            (Application::PhysicalPath, u64),
-            Arc<(
-                Mutex<Fragged<Application::IncomingPacketBuffer, MAX_NOISE_HANDSHAKE_FRAGMENTS>>,
-                i64, // creation timestamp
-            )>,
-        >,
-    >,
+    defrag: Mutex<[Fragged<Application::IncomingPacketBuffer, MAX_NOISE_HANDSHAKE_FRAGMENTS>; MAX_INCOMPLETE_SESSION_QUEUE_SIZE]>,
     sessions: RwLock<SessionsById<Application>>,
 }
 
@@ -154,11 +145,10 @@ impl<Application: ApplicationLayer> Context<Application> {
     /// Create a new session context.
     ///
    /// * `max_incomplete_session_queue_size` - Maximum number of incomplete sessions in negotiation phase
-    pub fn new(max_incomplete_session_queue_size: usize, default_physical_mtu: usize) -> Self {
+    pub fn new(default_physical_mtu: usize) -> Self {
         Self {
-            max_incomplete_session_queue_size,
             default_physical_mtu: AtomicUsize::new(default_physical_mtu),
-            defrag: Mutex::new(HashMap::new()),
+            defrag: Mutex::new(std::array::from_fn(|_| Fragged::new())),
             sessions: RwLock::new(SessionsById {
                 active: HashMap::with_capacity(64),
                 incoming: HashMap::with_capacity(64),
@@ -262,9 +252,6 @@ impl<Application: ApplicationLayer> Context<Application> {
             }
         }
 
-        // Delete any expired defragmentation queue items not associated with a session.
-        self.defrag.lock().unwrap().retain(|_, fragged| fragged.1 > negotiation_timeout_cutoff);
-
         Application::INCOMING_SESSION_NEGOTIATION_TIMEOUT_MS.min(Application::RETRY_INTERVAL)
     }
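Below is a minimal, illustrative sketch (not part of the patch) of how the reworked Fragged::assemble() behaves after this change; the Vec<u8> fragment type, the counter value, and the fragment counts are arbitrary choices made only for the example.

    // Hedged example only: a crate-internal Fragged over Vec<u8> with room for 48 fragments.
    let mut defrag: Fragged<Vec<u8>, 48> = Fragged::new();
    // First fragment of a two-fragment packet (counter 1): nothing to return yet.
    assert!(defrag.assemble(1, vec![0u8; 16], 0, 2).is_none());
    // Re-delivering the same fragment number is ignored thanks to the `have`/`count` check.
    assert!(defrag.assemble(1, vec![0u8; 16], 0, 2).is_none());
    // The second fragment completes the packet and resets the Fragged for reuse.
    assert!(defrag.assemble(1, vec![1u8; 16], 1, 2).is_some());

Because the expected fragment count is now stored in the new `count` field, duplicate fragments and fragments announcing a different fragment_count for the same counter are rejected up front rather than being folded into the old `want` bit mask, and Context::new() now sizes the session-less defragmentation table from the compile-time MAX_INCOMPLETE_SESSION_QUEUE_SIZE constant instead of a runtime argument.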