From 9e4e0998434adb617d17560efebbed7392ac4080 Mon Sep 17 00:00:00 2001 From: mamoniot Date: Wed, 22 Mar 2023 18:55:35 -0400 Subject: [PATCH] got new defragmenter working --- zssp/src/fragged.rs | 4 +++ zssp/src/main.rs | 4 +-- zssp/src/zssp.rs | 76 +++++++++++++++++---------------------------- 3 files changed, 34 insertions(+), 50 deletions(-) diff --git a/zssp/src/fragged.rs b/zssp/src/fragged.rs index 6a9d2da33..264d713b4 100644 --- a/zssp/src/fragged.rs +++ b/zssp/src/fragged.rs @@ -45,6 +45,10 @@ impl Fragged { unsafe { zeroed() } } + #[inline(always)] + pub fn counter(&self) -> u64 { + self.counter + } /// Add a fragment and return an assembled packet container if all fragments have been received. /// /// When a fully assembled packet is returned the internal state is reset and this object can diff --git a/zssp/src/main.rs b/zssp/src/main.rs index 8bf5a9684..0df766689 100644 --- a/zssp/src/main.rs +++ b/zssp/src/main.rs @@ -46,7 +46,7 @@ fn alice_main( alice_out: mpsc::SyncSender>, alice_in: mpsc::Receiver>, ) { - let context = zssp::Context::::new(16, TEST_MTU); + let context = zssp::Context::::new(TEST_MTU); let mut data_buf = [0u8; 65536]; let mut next_service = ms_monotonic() + 500; let mut last_ratchet_count = 0; @@ -157,7 +157,7 @@ fn bob_main( bob_out: mpsc::SyncSender>, bob_in: mpsc::Receiver>, ) { - let context = zssp::Context::::new(16, TEST_MTU); + let context = zssp::Context::::new(TEST_MTU); let mut data_buf = [0u8; 65536]; let mut data_buf_2 = [0u8; TEST_MTU]; let mut last_ratchet_count = 0; diff --git a/zssp/src/zssp.rs b/zssp/src/zssp.rs index c20ce1e14..f03430f64 100644 --- a/zssp/src/zssp.rs +++ b/zssp/src/zssp.rs @@ -9,7 +9,10 @@ // ZSSP: ZeroTier Secure Session Protocol // FIPS compliant Noise_XK with Jedi powers (Kyber1024) and built-in attack-resistant large payload (fragmentation) support. -use std::collections::{HashMap, HashSet}; +use std::collections::hash_map::RandomState; +//use std::collections::hash_map::DefaultHasher; +use std::hash::{BuildHasher, Hash, Hasher}; +use std::collections::HashMap; use std::num::NonZeroU64; use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering}; use std::sync::{Arc, Mutex, MutexGuard, RwLock, Weak}; @@ -37,7 +40,8 @@ const GCM_CIPHER_POOL_SIZE: usize = 4; /// defragment incoming packets that are not yet associated with a session. pub struct Context { default_physical_mtu: AtomicUsize, - defrag: Mutex<[Fragged; MAX_INCOMPLETE_SESSION_QUEUE_SIZE]>, + defrag_hasher: RandomState, + defrag: [Mutex>; MAX_INCOMPLETE_SESSION_QUEUE_SIZE], sessions: RwLock>, } @@ -148,7 +152,8 @@ impl Context { pub fn new(default_physical_mtu: usize) -> Self { Self { default_physical_mtu: AtomicUsize::new(default_physical_mtu), - defrag: Mutex::new(std::array::from_fn(|_| Fragged::new())), + defrag_hasher: RandomState::new(), + defrag: std::array::from_fn(|_| Mutex::new(Fragged::new())), sessions: RwLock::new(SessionsById { active: HashMap::with_capacity(64), incoming: HashMap::with_capacity(64), @@ -522,48 +527,34 @@ impl Context { } else { let (key_index, packet_type, fragment_count, fragment_no, incoming_counter) = parse_packet_header(&incoming_physical_packet); - let (assembled_packet, incoming_packet_buf_arr); + let assembled; let incoming_packet = if fragment_count > 1 { - assembled_packet = { - let mut defrag = self.defrag.lock().unwrap(); - let f = defrag - .entry((source.clone(), incoming_counter)) - .or_insert_with(|| Arc::new((Mutex::new(Fragged::new()), current_time))) - .clone(); + let mut hasher = self.defrag_hasher.build_hasher(); + source.hash(&mut hasher); + hasher.write_u64(incoming_counter); + let offer_id = hasher.finish(); + let idx0 = (offer_id as usize)%MAX_INCOMPLETE_SESSION_QUEUE_SIZE; + let idx1 = (offer_id as usize)/MAX_INCOMPLETE_SESSION_QUEUE_SIZE%MAX_INCOMPLETE_SESSION_QUEUE_SIZE; - // Anti-DOS overflow purge of the incoming defragmentation queue for packets not associated with known sessions. - if defrag.len() >= self.max_incomplete_session_queue_size { - // First, drop all entries that are timed out or whose physical source duplicates another entry. - let mut sources = HashSet::with_capacity(defrag.len()); - let negotiation_timeout_cutoff = current_time - Application::INCOMING_SESSION_NEGOTIATION_TIMEOUT_MS; - defrag - .retain(|k, fragged| (fragged.1 > negotiation_timeout_cutoff && sources.insert(k.0.clone())) || Arc::ptr_eq(fragged, &f)); - - // Then, if we are still at or over the limit, drop 10% of remaining entries at random. - if defrag.len() >= self.max_incomplete_session_queue_size { - let mut rn = random::next_u32_secure(); - defrag.retain(|_, fragged| { - rn = prng32(rn); - rn > (u32::MAX / 10) || Arc::ptr_eq(fragged, &f) - }); - } + let mut slot0 = self.defrag[idx0].lock().unwrap(); + if slot0.counter() == offer_id { + assembled = slot0.assemble(offer_id, incoming_physical_packet_buf, fragment_no, fragment_count); + } else { + let mut slot1 = self.defrag[idx1].lock().unwrap(); + if slot1.counter() == offer_id || slot1.counter() == 0 { + assembled = slot1.assemble(offer_id, incoming_physical_packet_buf, fragment_no, fragment_count); + } else { + assembled = slot0.assemble(offer_id, incoming_physical_packet_buf, fragment_no, fragment_count); } - - f } - .0 - .lock() - .unwrap() - .assemble(incoming_counter, incoming_physical_packet_buf, fragment_no, fragment_count); - if let Some(assembled_packet) = assembled_packet.as_ref() { - self.defrag.lock().unwrap().remove(&(source.clone(), incoming_counter)); + + if let Some(assembled_packet) = &assembled { assembled_packet.as_ref() } else { return Ok(ReceiveResult::Ok(None)); } } else { - incoming_packet_buf_arr = [incoming_physical_packet_buf]; - &incoming_packet_buf_arr + std::array::from_ref(&incoming_physical_packet_buf) }; return self.process_complete_incoming_packet( @@ -788,7 +779,7 @@ impl Context { // If this queue is too big, we remove the latest entry and replace it. The latest // is used because under flood conditions this is most likely to be another bogus // entry. If we find one that is actually timed out, that one is replaced instead. - if sessions.incoming.len() >= self.max_incomplete_session_queue_size { + if sessions.incoming.len() >= MAX_INCOMPLETE_SESSION_QUEUE_SIZE { let mut newest = i64::MIN; let mut replace_id = None; let cutoff_time = current_time - Application::INCOMING_SESSION_NEGOTIATION_TIMEOUT_MS; @@ -1697,14 +1688,3 @@ fn kbkdf512(key: &Secret) -> Secret(key: &Secret) -> Secret<32> { hmac_sha512_secret256(key.as_bytes(), &[1, b'Z', b'T', LABEL, 0x00, 0, 1u8, 0u8]) } - -fn prng32(mut x: u32) -> u32 { - // based on lowbias32 from https://nullprogram.com/blog/2018/07/31/ - x = x.wrapping_add(1); // don't get stuck on 0 - x ^= x.wrapping_shr(16); - x = x.wrapping_mul(0x7feb352d); - x ^= x.wrapping_shr(15); - x = x.wrapping_mul(0x846ca68b); - x ^= x.wrapping_shr(16); - x -}