From 1e32e0ad2cd8892ee8bbd11460daf89d783ee160 Mon Sep 17 00:00:00 2001 From: mamoniot Date: Wed, 22 Mar 2023 18:29:14 -0400 Subject: [PATCH 1/6] begun changes --- zssp/src/fragged.rs | 47 ++++++++++++++++++++++++--------------------- zssp/src/proto.rs | 1 + zssp/src/zssp.rs | 19 +++--------------- 3 files changed, 29 insertions(+), 38 deletions(-) diff --git a/zssp/src/fragged.rs b/zssp/src/fragged.rs index 01a0c3b20..6a9d2da33 100644 --- a/zssp/src/fragged.rs +++ b/zssp/src/fragged.rs @@ -3,6 +3,7 @@ use std::ptr::slice_from_raw_parts; /// Fast packet defragmenter pub struct Fragged { + count: u32, have: u64, counter: u64, frags: [MaybeUninit; MAX_FRAGMENTS], @@ -35,9 +36,9 @@ impl Fragged { // that the array of MaybeUninit can be freely cast into an array of // Fragment. They also check that the maximum number of fragments is not too large // for the fact that we use bits in a u64 to track which fragments are received. - assert!(MAX_FRAGMENTS <= 64); - assert_eq!(size_of::>(), size_of::()); - assert_eq!( + debug_assert!(MAX_FRAGMENTS <= 64); + debug_assert_eq!(size_of::>(), size_of::()); + debug_assert_eq!( size_of::<[MaybeUninit; MAX_FRAGMENTS]>(), size_of::<[Fragment; MAX_FRAGMENTS]>() ); @@ -51,12 +52,11 @@ impl Fragged { #[inline(always)] pub fn assemble(&mut self, counter: u64, fragment: Fragment, fragment_no: u8, fragment_count: u8) -> Option> { if fragment_no < fragment_count && (fragment_count as usize) <= MAX_FRAGMENTS { - let mut have = self.have; // If the counter has changed, reset the structure to receive a new packet. if counter != self.counter { - self.counter = counter; if needs_drop::() { + let mut have = self.have; let mut i = 0; while have != 0 { if (have & 1) != 0 { @@ -66,26 +66,29 @@ impl Fragged { have = have.wrapping_shr(1); i += 1; } - } else { - have = 0; + } + self.have = 0; + self.count = fragment_count as u32; + self.counter = counter; + } + + let got = 1u64.wrapping_shl(fragment_no as u32); + if got & self.have == 0 && self.count as u8 == fragment_count { + self.have |= got; + unsafe { + self.frags.get_unchecked_mut(fragment_no as usize).write(fragment); + } + if self.have == 1u64.wrapping_shl(self.count) - 1 { + self.have = 0; + self.count = 0; + self.counter = 0; + // Setting 'have' to 0 resets the state of this object, and the fragments + // are effectively moved into the Assembled<> container and returned. That + // container will drop them when it is dropped. + return Some(Assembled(unsafe { std::mem::transmute_copy(&self.frags) }, fragment_count as usize)); } } - unsafe { - self.frags.get_unchecked_mut(fragment_no as usize).write(fragment); - } - - let want = 0xffffffffffffffffu64.wrapping_shr((64 - fragment_count) as u32); - have |= 1u64.wrapping_shl(fragment_no as u32); - if (have & want) == want { - self.have = 0; - // Setting 'have' to 0 resets the state of this object, and the fragments - // are effectively moved into the Assembled<> container and returned. That - // container will drop them when it is dropped. 
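The reworked assemble() above tracks received fragments in the `have` bitmask alongside the new `count` field, and declares the packet complete once every low bit up to `count` is set. A minimal standalone sketch of that bookkeeping, with illustrative names (FragMask, mark) that are not part of the crate:

/// Minimal sketch of the bitmask bookkeeping in `assemble`: each received
/// fragment sets one bit in `have`; the packet is complete once all `count`
/// low bits are set.
struct FragMask {
    have: u64,
    count: u32,
}

impl FragMask {
    fn new(count: u32) -> Self {
        // The protocol's hard maximum fragment count is 63, so the
        // (1 << count) - 1 completeness mask never overflows.
        assert!(count >= 1 && count <= 63);
        Self { have: 0, count }
    }

    /// Record fragment `no` and report whether every fragment has arrived.
    fn mark(&mut self, no: u32) -> bool {
        debug_assert!(no < self.count);
        self.have |= 1u64.wrapping_shl(no);
        // All fragments are present when the low `count` bits are set.
        self.have == 1u64.wrapping_shl(self.count).wrapping_sub(1)
    }
}

fn main() {
    let mut m = FragMask::new(3);
    assert!(!m.mark(0));
    assert!(!m.mark(2));
    assert!(m.mark(1)); // the last fragment completes the packet
}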
- return Some(Assembled(unsafe { std::mem::transmute_copy(&self.frags) }, fragment_count as usize)); - } else { - self.have = have; - } } return None; } diff --git a/zssp/src/proto.rs b/zssp/src/proto.rs index 826371983..06dc1dfaa 100644 --- a/zssp/src/proto.rs +++ b/zssp/src/proto.rs @@ -63,6 +63,7 @@ pub(crate) const KBKDF_KEY_USAGE_LABEL_AES_GCM_ALICE_TO_BOB: u8 = b'A'; // AES-G pub(crate) const KBKDF_KEY_USAGE_LABEL_AES_GCM_BOB_TO_ALICE: u8 = b'B'; // AES-GCM in B->A direction pub(crate) const KBKDF_KEY_USAGE_LABEL_RATCHET: u8 = b'R'; // Key used in derivatin of next session key +pub(crate) const MAX_INCOMPLETE_SESSION_QUEUE_SIZE: usize = 32; pub(crate) const MAX_FRAGMENTS: usize = 48; // hard protocol max: 63 pub(crate) const MAX_NOISE_HANDSHAKE_FRAGMENTS: usize = 16; // enough room for p384 + ZT identity + kyber1024 + tag/hmac/etc. pub(crate) const MAX_NOISE_HANDSHAKE_SIZE: usize = MAX_NOISE_HANDSHAKE_FRAGMENTS * MIN_TRANSPORT_MTU; diff --git a/zssp/src/zssp.rs b/zssp/src/zssp.rs index 660b3e65d..c20ce1e14 100644 --- a/zssp/src/zssp.rs +++ b/zssp/src/zssp.rs @@ -36,17 +36,8 @@ const GCM_CIPHER_POOL_SIZE: usize = 4; /// Each application using ZSSP must create an instance of this to own sessions and /// defragment incoming packets that are not yet associated with a session. pub struct Context { - max_incomplete_session_queue_size: usize, default_physical_mtu: AtomicUsize, - defrag: Mutex< - HashMap< - (Application::PhysicalPath, u64), - Arc<( - Mutex>, - i64, // creation timestamp - )>, - >, - >, + defrag: Mutex<[Fragged; MAX_INCOMPLETE_SESSION_QUEUE_SIZE]>, sessions: RwLock>, } @@ -154,11 +145,10 @@ impl Context { /// Create a new session context. /// /// * `max_incomplete_session_queue_size` - Maximum number of incomplete sessions in negotiation phase - pub fn new(max_incomplete_session_queue_size: usize, default_physical_mtu: usize) -> Self { + pub fn new(default_physical_mtu: usize) -> Self { Self { - max_incomplete_session_queue_size, default_physical_mtu: AtomicUsize::new(default_physical_mtu), - defrag: Mutex::new(HashMap::new()), + defrag: Mutex::new(std::array::from_fn(|_| Fragged::new())), sessions: RwLock::new(SessionsById { active: HashMap::with_capacity(64), incoming: HashMap::with_capacity(64), @@ -262,9 +252,6 @@ impl Context { } } - // Delete any expired defragmentation queue items not associated with a session. - self.defrag.lock().unwrap().retain(|_, fragged| fragged.1 > negotiation_timeout_cutoff); - Application::INCOMING_SESSION_NEGOTIATION_TIMEOUT_MS.min(Application::RETRY_INTERVAL) } From 9e4e0998434adb617d17560efebbed7392ac4080 Mon Sep 17 00:00:00 2001 From: mamoniot Date: Wed, 22 Mar 2023 18:55:35 -0400 Subject: [PATCH 2/6] got new defragmenter working --- zssp/src/fragged.rs | 4 +++ zssp/src/main.rs | 4 +-- zssp/src/zssp.rs | 76 +++++++++++++++++---------------------------- 3 files changed, 34 insertions(+), 50 deletions(-) diff --git a/zssp/src/fragged.rs b/zssp/src/fragged.rs index 6a9d2da33..264d713b4 100644 --- a/zssp/src/fragged.rs +++ b/zssp/src/fragged.rs @@ -45,6 +45,10 @@ impl Fragged { unsafe { zeroed() } } + #[inline(always)] + pub fn counter(&self) -> u64 { + self.counter + } /// Add a fragment and return an assembled packet container if all fragments have been received. 
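The Context changes earlier in this patch replace the per-(path, counter) HashMap of defragmenters with a fixed table of MAX_INCOMPLETE_SESSION_QUEUE_SIZE slots behind one Mutex. A reduced sketch of that layout; Fragged is simplified to a stand-in and Defrag is an illustrative name, not the crate's type:

use std::sync::Mutex;

const MAX_INCOMPLETE_SESSION_QUEUE_SIZE: usize = 32;

/// Stand-in for the crate's `Fragged<Fragment, MAX_FRAGMENTS>`; only the
/// shape of the container matters here.
struct Fragged;

impl Fragged {
    fn new() -> Self {
        Fragged
    }
}

/// Reduced view of the new `Context` field: a fixed table of defragmenters
/// replaces the old `HashMap<(PhysicalPath, u64), ...>` keyed queue, so
/// fragments from unknown peers never allocate and `service()` no longer
/// needs a purge pass over expired entries.
struct Defrag {
    slots: Mutex<[Fragged; MAX_INCOMPLETE_SESSION_QUEUE_SIZE]>,
}

impl Defrag {
    fn new() -> Self {
        // `std::array::from_fn` builds the array element by element, so
        // `Fragged` does not need to be `Copy` or `Default`.
        Self {
            slots: Mutex::new(std::array::from_fn(|_| Fragged::new())),
        }
    }
}

fn main() {
    let d = Defrag::new();
    assert_eq!(d.slots.lock().unwrap().len(), MAX_INCOMPLETE_SESSION_QUEUE_SIZE);
}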
/// /// When a fully assembled packet is returned the internal state is reset and this object can diff --git a/zssp/src/main.rs b/zssp/src/main.rs index 8bf5a9684..0df766689 100644 --- a/zssp/src/main.rs +++ b/zssp/src/main.rs @@ -46,7 +46,7 @@ fn alice_main( alice_out: mpsc::SyncSender>, alice_in: mpsc::Receiver>, ) { - let context = zssp::Context::::new(16, TEST_MTU); + let context = zssp::Context::::new(TEST_MTU); let mut data_buf = [0u8; 65536]; let mut next_service = ms_monotonic() + 500; let mut last_ratchet_count = 0; @@ -157,7 +157,7 @@ fn bob_main( bob_out: mpsc::SyncSender>, bob_in: mpsc::Receiver>, ) { - let context = zssp::Context::::new(16, TEST_MTU); + let context = zssp::Context::::new(TEST_MTU); let mut data_buf = [0u8; 65536]; let mut data_buf_2 = [0u8; TEST_MTU]; let mut last_ratchet_count = 0; diff --git a/zssp/src/zssp.rs b/zssp/src/zssp.rs index c20ce1e14..f03430f64 100644 --- a/zssp/src/zssp.rs +++ b/zssp/src/zssp.rs @@ -9,7 +9,10 @@ // ZSSP: ZeroTier Secure Session Protocol // FIPS compliant Noise_XK with Jedi powers (Kyber1024) and built-in attack-resistant large payload (fragmentation) support. -use std::collections::{HashMap, HashSet}; +use std::collections::hash_map::RandomState; +//use std::collections::hash_map::DefaultHasher; +use std::hash::{BuildHasher, Hash, Hasher}; +use std::collections::HashMap; use std::num::NonZeroU64; use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering}; use std::sync::{Arc, Mutex, MutexGuard, RwLock, Weak}; @@ -37,7 +40,8 @@ const GCM_CIPHER_POOL_SIZE: usize = 4; /// defragment incoming packets that are not yet associated with a session. pub struct Context { default_physical_mtu: AtomicUsize, - defrag: Mutex<[Fragged; MAX_INCOMPLETE_SESSION_QUEUE_SIZE]>, + defrag_hasher: RandomState, + defrag: [Mutex>; MAX_INCOMPLETE_SESSION_QUEUE_SIZE], sessions: RwLock>, } @@ -148,7 +152,8 @@ impl Context { pub fn new(default_physical_mtu: usize) -> Self { Self { default_physical_mtu: AtomicUsize::new(default_physical_mtu), - defrag: Mutex::new(std::array::from_fn(|_| Fragged::new())), + defrag_hasher: RandomState::new(), + defrag: std::array::from_fn(|_| Mutex::new(Fragged::new())), sessions: RwLock::new(SessionsById { active: HashMap::with_capacity(64), incoming: HashMap::with_capacity(64), @@ -522,48 +527,34 @@ impl Context { } else { let (key_index, packet_type, fragment_count, fragment_no, incoming_counter) = parse_packet_header(&incoming_physical_packet); - let (assembled_packet, incoming_packet_buf_arr); + let assembled; let incoming_packet = if fragment_count > 1 { - assembled_packet = { - let mut defrag = self.defrag.lock().unwrap(); - let f = defrag - .entry((source.clone(), incoming_counter)) - .or_insert_with(|| Arc::new((Mutex::new(Fragged::new()), current_time))) - .clone(); + let mut hasher = self.defrag_hasher.build_hasher(); + source.hash(&mut hasher); + hasher.write_u64(incoming_counter); + let offer_id = hasher.finish(); + let idx0 = (offer_id as usize)%MAX_INCOMPLETE_SESSION_QUEUE_SIZE; + let idx1 = (offer_id as usize)/MAX_INCOMPLETE_SESSION_QUEUE_SIZE%MAX_INCOMPLETE_SESSION_QUEUE_SIZE; - // Anti-DOS overflow purge of the incoming defragmentation queue for packets not associated with known sessions. - if defrag.len() >= self.max_incomplete_session_queue_size { - // First, drop all entries that are timed out or whose physical source duplicates another entry. 
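The new lookup in receive() derives two candidate slots from a salted hash of the sender's physical path and the packet counter. A sketch of just that index derivation, assuming any Hash-able source type; candidate_slots is an illustrative helper, not part of the crate:

use std::collections::hash_map::RandomState;
use std::hash::{BuildHasher, Hash, Hasher};

const MAX_INCOMPLETE_SESSION_QUEUE_SIZE: usize = 32;

/// Derive the two candidate defrag slots for a fragment, mirroring the
/// idx0/idx1 computation in `receive()`: hash the sender's physical path and
/// the packet counter with a per-Context random salt, then take two
/// independent digits of the hash modulo the table size.
fn candidate_slots<P: Hash>(salt: &RandomState, source: &P, counter: u64) -> (usize, usize) {
    let mut hasher = salt.build_hasher();
    source.hash(&mut hasher);
    hasher.write_u64(counter);
    let h = hasher.finish() as usize;
    let idx0 = h % MAX_INCOMPLETE_SESSION_QUEUE_SIZE;
    let idx1 = h / MAX_INCOMPLETE_SESSION_QUEUE_SIZE % MAX_INCOMPLETE_SESSION_QUEUE_SIZE;
    (idx0, idx1)
}

fn main() {
    let salt = RandomState::new();
    let (i0, i1) = candidate_slots(&salt, &"198.51.100.7:9993", 0x1234_5678_9abc_def0u64);
    assert!(i0 < MAX_INCOMPLETE_SESSION_QUEUE_SIZE && i1 < MAX_INCOMPLETE_SESSION_QUEUE_SIZE);
}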
- let mut sources = HashSet::with_capacity(defrag.len()); - let negotiation_timeout_cutoff = current_time - Application::INCOMING_SESSION_NEGOTIATION_TIMEOUT_MS; - defrag - .retain(|k, fragged| (fragged.1 > negotiation_timeout_cutoff && sources.insert(k.0.clone())) || Arc::ptr_eq(fragged, &f)); - - // Then, if we are still at or over the limit, drop 10% of remaining entries at random. - if defrag.len() >= self.max_incomplete_session_queue_size { - let mut rn = random::next_u32_secure(); - defrag.retain(|_, fragged| { - rn = prng32(rn); - rn > (u32::MAX / 10) || Arc::ptr_eq(fragged, &f) - }); - } + let mut slot0 = self.defrag[idx0].lock().unwrap(); + if slot0.counter() == offer_id { + assembled = slot0.assemble(offer_id, incoming_physical_packet_buf, fragment_no, fragment_count); + } else { + let mut slot1 = self.defrag[idx1].lock().unwrap(); + if slot1.counter() == offer_id || slot1.counter() == 0 { + assembled = slot1.assemble(offer_id, incoming_physical_packet_buf, fragment_no, fragment_count); + } else { + assembled = slot0.assemble(offer_id, incoming_physical_packet_buf, fragment_no, fragment_count); } - - f } - .0 - .lock() - .unwrap() - .assemble(incoming_counter, incoming_physical_packet_buf, fragment_no, fragment_count); - if let Some(assembled_packet) = assembled_packet.as_ref() { - self.defrag.lock().unwrap().remove(&(source.clone(), incoming_counter)); + + if let Some(assembled_packet) = &assembled { assembled_packet.as_ref() } else { return Ok(ReceiveResult::Ok(None)); } } else { - incoming_packet_buf_arr = [incoming_physical_packet_buf]; - &incoming_packet_buf_arr + std::array::from_ref(&incoming_physical_packet_buf) }; return self.process_complete_incoming_packet( @@ -788,7 +779,7 @@ impl Context { // If this queue is too big, we remove the latest entry and replace it. The latest // is used because under flood conditions this is most likely to be another bogus // entry. If we find one that is actually timed out, that one is replaced instead. - if sessions.incoming.len() >= self.max_incomplete_session_queue_size { + if sessions.incoming.len() >= MAX_INCOMPLETE_SESSION_QUEUE_SIZE { let mut newest = i64::MIN; let mut replace_id = None; let cutoff_time = current_time - Application::INCOMING_SESSION_NEGOTIATION_TIMEOUT_MS; @@ -1697,14 +1688,3 @@ fn kbkdf512(key: &Secret) -> Secret(key: &Secret) -> Secret<32> { hmac_sha512_secret256(key.as_bytes(), &[1, b'Z', b'T', LABEL, 0x00, 0, 1u8, 0u8]) } - -fn prng32(mut x: u32) -> u32 { - // based on lowbias32 from https://nullprogram.com/blog/2018/07/31/ - x = x.wrapping_add(1); // don't get stuck on 0 - x ^= x.wrapping_shr(16); - x = x.wrapping_mul(0x7feb352d); - x ^= x.wrapping_shr(15); - x = x.wrapping_mul(0x846ca68b); - x ^= x.wrapping_shr(16); - x -} From 562631f18d9f6df7e03e60f70eb831dcfe8fbbe8 Mon Sep 17 00:00:00 2001 From: mamoniot Date: Wed, 22 Mar 2023 19:14:25 -0400 Subject: [PATCH 3/6] added counter randomization --- zssp/src/fragged.rs | 2 ++ zssp/src/zssp.rs | 40 ++++++++++++++++++++++------------------ 2 files changed, 24 insertions(+), 18 deletions(-) diff --git a/zssp/src/fragged.rs b/zssp/src/fragged.rs index 264d713b4..feadde5a1 100644 --- a/zssp/src/fragged.rs +++ b/zssp/src/fragged.rs @@ -45,6 +45,8 @@ impl Fragged { unsafe { zeroed() } } + /// Returns the counter value associated with the packet currently being assembled. + /// If no packet is currently being assembled it returns 0. 
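The incoming-session queue check in patch 2 keeps the replacement policy described in its comment: when the half-open queue is full, prefer to evict anything already past the negotiation timeout, otherwise evict the newest entry, since under a handshake flood the newest is the most likely to be bogus. A hedged sketch of that selection; pick_replacement and Incoming are invented names, since the surrounding implementation is not shown in the hunk:

use std::collections::HashMap;

/// Simplified stand-in for a half-open (still negotiating) session entry.
struct Incoming {
    timestamp: i64,
}

/// Choose which half-open entry to replace when the incoming queue is full:
/// anything past the timeout is replaced immediately; otherwise the newest
/// entry is chosen.
fn pick_replacement(incoming: &HashMap<u64, Incoming>, cutoff_time: i64) -> Option<u64> {
    let mut newest = i64::MIN;
    let mut replace_id = None;
    for (id, entry) in incoming {
        if entry.timestamp <= cutoff_time {
            return Some(*id); // timed out: replace this one
        }
        if entry.timestamp >= newest {
            newest = entry.timestamp;
            replace_id = Some(*id);
        }
    }
    replace_id
}

fn main() {
    let mut q = HashMap::new();
    q.insert(1u64, Incoming { timestamp: 100 });
    q.insert(2u64, Incoming { timestamp: 900 });
    // Nothing is timed out, so the newest entry (id 2) is chosen.
    assert_eq!(pick_replacement(&q, 50), Some(2));
}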
#[inline(always)] pub fn counter(&self) -> u64 { self.counter diff --git a/zssp/src/zssp.rs b/zssp/src/zssp.rs index f03430f64..3721b8868 100644 --- a/zssp/src/zssp.rs +++ b/zssp/src/zssp.rs @@ -198,7 +198,7 @@ impl Context { PACKET_TYPE_ALICE_NOISE_XK_INIT, None, 0, - 1, + random::next_u64_secure(), None, ); } @@ -378,7 +378,7 @@ impl Context { PACKET_TYPE_ALICE_NOISE_XK_INIT, None, 0, - 1, + random::next_u64_secure(), None, )?; } @@ -450,7 +450,7 @@ impl Context { let (key_index, packet_type, fragment_count, fragment_no, incoming_counter) = parse_packet_header(&incoming_physical_packet); if session.check_receive_window(incoming_counter) { - let (assembled_packet, incoming_packet_buf_arr); + let assembled_packet; let incoming_packet = if fragment_count > 1 { assembled_packet = session.defrag[(incoming_counter as usize) % COUNTER_WINDOW_MAX_OOO] .lock() @@ -462,8 +462,7 @@ impl Context { return Ok(ReceiveResult::Ok(Some(session))); } } else { - incoming_packet_buf_arr = [incoming_physical_packet_buf]; - &incoming_packet_buf_arr + std::array::from_ref(&incoming_physical_packet_buf) }; return self.process_complete_incoming_packet( @@ -491,7 +490,7 @@ impl Context { .decrypt_block_in_place(&mut incoming_physical_packet[HEADER_PROTECT_ENCRYPT_START..HEADER_PROTECT_ENCRYPT_END]); let (key_index, packet_type, fragment_count, fragment_no, incoming_counter) = parse_packet_header(&incoming_physical_packet); - let (assembled_packet, incoming_packet_buf_arr); + let assembled_packet; let incoming_packet = if fragment_count > 1 { assembled_packet = incoming.defrag[(incoming_counter as usize) % COUNTER_WINDOW_MAX_OOO] .lock() @@ -503,8 +502,7 @@ impl Context { return Ok(ReceiveResult::Ok(None)); } } else { - incoming_packet_buf_arr = [incoming_physical_packet_buf]; - &incoming_packet_buf_arr + std::array::from_ref(&incoming_physical_packet_buf) }; return self.process_complete_incoming_packet( @@ -532,19 +530,25 @@ impl Context { let mut hasher = self.defrag_hasher.build_hasher(); source.hash(&mut hasher); hasher.write_u64(incoming_counter); - let offer_id = hasher.finish(); - let idx0 = (offer_id as usize)%MAX_INCOMPLETE_SESSION_QUEUE_SIZE; - let idx1 = (offer_id as usize)/MAX_INCOMPLETE_SESSION_QUEUE_SIZE%MAX_INCOMPLETE_SESSION_QUEUE_SIZE; + let hashed_counter = hasher.finish(); + let idx0 = (hashed_counter as usize)%MAX_INCOMPLETE_SESSION_QUEUE_SIZE; + let idx1 = (hashed_counter as usize)/MAX_INCOMPLETE_SESSION_QUEUE_SIZE%MAX_INCOMPLETE_SESSION_QUEUE_SIZE; + // Open hash lookup of just 2 slots. + // By only checking 2 slots we avoid an expensive hash table lookup while also minimizing the chance that 2 offers collide. + // To DOS, an adversary would either need to volumetrically spam the defrag table to keep all slots full + // or replay Alice's packet header before Alice's packet is fully assembled. + // Since Alice's packet header has a randomly generated counter value replaying it in time is very hard. 
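Elsewhere in this patch, the single-fragment path stops copying the buffer into a temporary one-element array and instead uses std::array::from_ref, which reinterprets a &T as a &[T; 1] in place. A small illustration; the Vec<u8> here is only a placeholder for the pooled packet buffer:

fn main() {
    // Placeholder for the pooled packet buffer handled in `receive()`.
    let buf: Vec<u8> = vec![0u8; 1500];

    // `std::array::from_ref` turns `&Vec<u8>` into `&[Vec<u8>; 1]` without
    // moving or copying the buffer, so the single-fragment path can hand the
    // same slice-of-fragments shape to the packet processor as the
    // multi-fragment path does.
    let fragments: &[Vec<u8>; 1] = std::array::from_ref(&buf);
    assert_eq!(fragments.len(), 1);
    assert_eq!(fragments[0].len(), 1500);
}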
let mut slot0 = self.defrag[idx0].lock().unwrap(); - if slot0.counter() == offer_id { - assembled = slot0.assemble(offer_id, incoming_physical_packet_buf, fragment_no, fragment_count); + if slot0.counter() == hashed_counter { + assembled = slot0.assemble(hashed_counter, incoming_physical_packet_buf, fragment_no, fragment_count); } else { let mut slot1 = self.defrag[idx1].lock().unwrap(); - if slot1.counter() == offer_id || slot1.counter() == 0 { - assembled = slot1.assemble(offer_id, incoming_physical_packet_buf, fragment_no, fragment_count); + if slot1.counter() == hashed_counter || slot1.counter() == 0 { + assembled = slot1.assemble(hashed_counter, incoming_physical_packet_buf, fragment_no, fragment_count); } else { - assembled = slot0.assemble(offer_id, incoming_physical_packet_buf, fragment_no, fragment_count); + // Both slots are full so kick out the unassembled packet in slot0 to make more room. + assembled = slot0.assemble(hashed_counter, incoming_physical_packet_buf, fragment_no, fragment_count); } } @@ -563,7 +567,7 @@ impl Context { &mut check_allow_incoming_session, &mut check_accept_session, data_buf, - incoming_counter, + 1,// The incoming_counter on init packets is only meant for DOS resistant defragmentation, we do not want to use it for anything noise related. incoming_packet, packet_type, None, @@ -702,7 +706,7 @@ impl Context { * identity, then responds with his ephemeral keys. */ - if incoming_counter != 1 || session.is_some() || incoming.is_some() { + if session.is_some() || incoming.is_some() { return Err(Error::OutOfSequence); } if pkt_assembled.len() != AliceNoiseXKInit::SIZE { From adcf553f18c895bcd39c0c5e095eb30213fc9fa9 Mon Sep 17 00:00:00 2001 From: mamoniot Date: Wed, 22 Mar 2023 19:15:07 -0400 Subject: [PATCH 4/6] corrected comment --- zssp/src/zssp.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zssp/src/zssp.rs b/zssp/src/zssp.rs index 3721b8868..f9ed6d4e8 100644 --- a/zssp/src/zssp.rs +++ b/zssp/src/zssp.rs @@ -537,7 +537,7 @@ impl Context { // Open hash lookup of just 2 slots. // By only checking 2 slots we avoid an expensive hash table lookup while also minimizing the chance that 2 offers collide. // To DOS, an adversary would either need to volumetrically spam the defrag table to keep all slots full - // or replay Alice's packet header before Alice's packet is fully assembled. + // or replay Alice's packet header from a spoofed physical path before Alice's packet is fully assembled. // Since Alice's packet header has a randomly generated counter value replaying it in time is very hard. let mut slot0 = self.defrag[idx0].lock().unwrap(); if slot0.counter() == hashed_counter { From 1925d0f98eb9db43a7cc33712e8a9164b38b9a48 Mon Sep 17 00:00:00 2001 From: mamoniot Date: Wed, 22 Mar 2023 23:27:37 -0400 Subject: [PATCH 5/6] added comments and reduced path requirements --- zssp/src/applicationlayer.rs | 2 +- zssp/src/zssp.rs | 16 ++++++++++------ 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/zssp/src/applicationlayer.rs b/zssp/src/applicationlayer.rs index 1c89ff72f..ef8df25eb 100644 --- a/zssp/src/applicationlayer.rs +++ b/zssp/src/applicationlayer.rs @@ -71,7 +71,7 @@ pub trait ApplicationLayer: Sized { /// /// A physical path could be an IP address or IP plus device in the case of UDP, a socket in the /// case of TCP, etc. - type PhysicalPath: PartialEq + Eq + Hash + Clone; + type PhysicalPath: Hash; /// Get a reference to this host's static public key blob. 
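Because unassociated fragments are now addressed through a salted hash rather than stored under a HashMap key, the relaxed bound above means an application's PhysicalPath only has to implement Hash. A sketch of what an implementor could now use; UdpPath and ApplicationLayerSketch are invented for illustration and are not the crate's trait:

use std::hash::Hash;
use std::net::SocketAddr;

/// With the relaxed bound, a physical-path type only needs `Hash`; it no
/// longer has to be `PartialEq + Eq + Clone`. This type is illustrative only.
#[derive(Hash)]
struct UdpPath {
    remote: SocketAddr,
    local_interface_id: u64,
}

// Mirrors the relaxed associated-type bound from the diff:
// `type PhysicalPath: Hash;`
trait ApplicationLayerSketch {
    type PhysicalPath: Hash;
}

struct MyApp;
impl ApplicationLayerSketch for MyApp {
    type PhysicalPath = UdpPath;
}

fn main() {
    let _p = UdpPath { remote: "192.0.2.1:9993".parse().unwrap(), local_interface_id: 3 };
}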
/// diff --git a/zssp/src/zssp.rs b/zssp/src/zssp.rs index f9ed6d4e8..85b66a9e5 100644 --- a/zssp/src/zssp.rs +++ b/zssp/src/zssp.rs @@ -40,7 +40,7 @@ const GCM_CIPHER_POOL_SIZE: usize = 4; /// defragment incoming packets that are not yet associated with a session. pub struct Context { default_physical_mtu: AtomicUsize, - defrag_hasher: RandomState, + defrag_salt: RandomState, defrag: [Mutex>; MAX_INCOMPLETE_SESSION_QUEUE_SIZE], sessions: RwLock>, } @@ -152,7 +152,7 @@ impl Context { pub fn new(default_physical_mtu: usize) -> Self { Self { default_physical_mtu: AtomicUsize::new(default_physical_mtu), - defrag_hasher: RandomState::new(), + defrag_salt: RandomState::new(), defrag: std::array::from_fn(|_| Mutex::new(Fragged::new())), sessions: RwLock::new(SessionsById { active: HashMap::with_capacity(64), @@ -527,7 +527,9 @@ impl Context { let assembled; let incoming_packet = if fragment_count > 1 { - let mut hasher = self.defrag_hasher.build_hasher(); + // Using just incoming_counter unhashed would be good DOS resistant, + // but why not make it harder by mixing in a random value and the physical path in as well. + let mut hasher = self.defrag_salt.build_hasher(); source.hash(&mut hasher); hasher.write_u64(incoming_counter); let hashed_counter = hasher.finish(); @@ -535,10 +537,12 @@ impl Context { let idx1 = (hashed_counter as usize)/MAX_INCOMPLETE_SESSION_QUEUE_SIZE%MAX_INCOMPLETE_SESSION_QUEUE_SIZE; // Open hash lookup of just 2 slots. - // By only checking 2 slots we avoid an expensive hash table lookup while also minimizing the chance that 2 offers collide. + // By only checking 2 slots we avoid a full table lookup while also minimizing the chance that 2 offers collide. // To DOS, an adversary would either need to volumetrically spam the defrag table to keep all slots full // or replay Alice's packet header from a spoofed physical path before Alice's packet is fully assembled. - // Since Alice's packet header has a randomly generated counter value replaying it in time is very hard. + // Volumetric spam is quite difficult since without the `defrag_salt: RandomState` value an adversary + // cannot control which slots their fragments index to. And since Alice's packet header has a randomly + // generated counter value replaying it in time requires extreme amounts of network control. let mut slot0 = self.defrag[idx0].lock().unwrap(); if slot0.counter() == hashed_counter { assembled = slot0.assemble(hashed_counter, incoming_physical_packet_buf, fragment_no, fragment_count); @@ -547,7 +551,7 @@ impl Context { if slot1.counter() == hashed_counter || slot1.counter() == 0 { assembled = slot1.assemble(hashed_counter, incoming_physical_packet_buf, fragment_no, fragment_count); } else { - // Both slots are full so kick out the unassembled packet in slot0 to make more room. + // slot1 is full so kick out whatever is in slot0 to make more room. 
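The defrag_salt introduced here is a std RandomState, so every Context gets its own unpredictable hash function; that is what keeps an attacker from steering fragments into chosen slots. A small demonstration of the per-instance property; salted is an illustrative helper:

use std::collections::hash_map::RandomState;
use std::hash::{BuildHasher, Hasher};

/// Hash a counter value with a given salt, analogous to how `receive()`
/// feeds the physical path and counter through `defrag_salt`.
fn salted(salt: &RandomState, counter: u64) -> u64 {
    let mut h = salt.build_hasher();
    h.write_u64(counter);
    h.finish()
}

fn main() {
    // Each `RandomState` is seeded with fresh randomness, so two Context
    // instances map the same input to different slot indices. An attacker
    // who cannot observe the salt cannot choose where fragments land.
    let salt_a = RandomState::new();
    let salt_b = RandomState::new();
    let x = salted(&salt_a, 42);
    let y = salted(&salt_b, 42);
    // Overwhelmingly likely to differ; equal only by coincidence.
    println!("salt A: {x:#x}, salt B: {y:#x}, differ: {}", x != y);
}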
assembled = slot0.assemble(hashed_counter, incoming_physical_packet_buf, fragment_no, fragment_count); } } From 8ddc054cfb86899f1189efc5f9454fff5975f94a Mon Sep 17 00:00:00 2001 From: mamoniot Date: Thu, 23 Mar 2023 08:52:10 -0400 Subject: [PATCH 6/6] ran cargo fmt --- zssp/src/fragged.rs | 2 -- zssp/src/zssp.rs | 8 ++++---- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/zssp/src/fragged.rs b/zssp/src/fragged.rs index feadde5a1..f3f5f1eae 100644 --- a/zssp/src/fragged.rs +++ b/zssp/src/fragged.rs @@ -58,7 +58,6 @@ impl Fragged { #[inline(always)] pub fn assemble(&mut self, counter: u64, fragment: Fragment, fragment_no: u8, fragment_count: u8) -> Option> { if fragment_no < fragment_count && (fragment_count as usize) <= MAX_FRAGMENTS { - // If the counter has changed, reset the structure to receive a new packet. if counter != self.counter { if needs_drop::() { @@ -94,7 +93,6 @@ impl Fragged { return Some(Assembled(unsafe { std::mem::transmute_copy(&self.frags) }, fragment_count as usize)); } } - } return None; } diff --git a/zssp/src/zssp.rs b/zssp/src/zssp.rs index 85b66a9e5..f07260799 100644 --- a/zssp/src/zssp.rs +++ b/zssp/src/zssp.rs @@ -11,8 +11,8 @@ use std::collections::hash_map::RandomState; //use std::collections::hash_map::DefaultHasher; -use std::hash::{BuildHasher, Hash, Hasher}; use std::collections::HashMap; +use std::hash::{BuildHasher, Hash, Hasher}; use std::num::NonZeroU64; use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering}; use std::sync::{Arc, Mutex, MutexGuard, RwLock, Weak}; @@ -533,8 +533,8 @@ impl Context { source.hash(&mut hasher); hasher.write_u64(incoming_counter); let hashed_counter = hasher.finish(); - let idx0 = (hashed_counter as usize)%MAX_INCOMPLETE_SESSION_QUEUE_SIZE; - let idx1 = (hashed_counter as usize)/MAX_INCOMPLETE_SESSION_QUEUE_SIZE%MAX_INCOMPLETE_SESSION_QUEUE_SIZE; + let idx0 = (hashed_counter as usize) % MAX_INCOMPLETE_SESSION_QUEUE_SIZE; + let idx1 = (hashed_counter as usize) / MAX_INCOMPLETE_SESSION_QUEUE_SIZE % MAX_INCOMPLETE_SESSION_QUEUE_SIZE; // Open hash lookup of just 2 slots. // By only checking 2 slots we avoid a full table lookup while also minimizing the chance that 2 offers collide. @@ -571,7 +571,7 @@ impl Context { &mut check_allow_incoming_session, &mut check_accept_session, data_buf, - 1,// The incoming_counter on init packets is only meant for DOS resistant defragmentation, we do not want to use it for anything noise related. + 1, // The incoming_counter on init packets is only meant for DOS resistant defragmentation, we do not want to use it for anything noise related. incoming_packet, packet_type, None,
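Taken together, the series settles on the slot-selection rule spelled out in the comments above: reuse a slot already assembling the same hashed counter, fall back to the second slot if it matches or is empty, and otherwise evict the partial packet in the first slot. A compact restatement of just that branch; choose_slot is an illustrative name, not the crate's API:

/// Which of the two candidate slots should take this fragment?
/// Mirrors the branch in `receive()`: reuse a slot already assembling this
/// packet, fall back to slot 1 only if it matches or is free, and as a last
/// resort evict whatever partial packet occupies slot 0.
fn choose_slot(slot0_counter: u64, slot1_counter: u64, hashed_counter: u64) -> usize {
    if slot0_counter == hashed_counter {
        0
    } else if slot1_counter == hashed_counter || slot1_counter == 0 {
        1
    } else {
        0 // both slots busy with other packets: slot 0's partial packet is dropped
    }
}

fn main() {
    assert_eq!(choose_slot(0, 0, 0xdead_beef), 1); // empty table: slot 1 is used
    assert_eq!(choose_slot(0xdead_beef, 0, 0xdead_beef), 0); // already assembling here
    assert_eq!(choose_slot(7, 9, 0xdead_beef), 0); // table full: evict slot 0
}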