got new defragmenter working

This commit is contained in:
mamoniot 2023-03-22 18:55:35 -04:00
parent 1e32e0ad2c
commit 9e4e099843
No known key found for this signature in database
GPG key ID: ADCCDBBE0E3D3B3B
3 changed files with 34 additions and 50 deletions

View file

@ -45,6 +45,10 @@ impl<Fragment, const MAX_FRAGMENTS: usize> Fragged<Fragment, MAX_FRAGMENTS> {
unsafe { zeroed() }
}
#[inline(always)]
pub fn counter(&self) -> u64 {
self.counter
}
/// Add a fragment and return an assembled packet container if all fragments have been received.
///
/// When a fully assembled packet is returned the internal state is reset and this object can

View file

@ -46,7 +46,7 @@ fn alice_main(
alice_out: mpsc::SyncSender<Vec<u8>>,
alice_in: mpsc::Receiver<Vec<u8>>,
) {
let context = zssp::Context::<TestApplication>::new(16, TEST_MTU);
let context = zssp::Context::<TestApplication>::new(TEST_MTU);
let mut data_buf = [0u8; 65536];
let mut next_service = ms_monotonic() + 500;
let mut last_ratchet_count = 0;
@ -157,7 +157,7 @@ fn bob_main(
bob_out: mpsc::SyncSender<Vec<u8>>,
bob_in: mpsc::Receiver<Vec<u8>>,
) {
let context = zssp::Context::<TestApplication>::new(16, TEST_MTU);
let context = zssp::Context::<TestApplication>::new(TEST_MTU);
let mut data_buf = [0u8; 65536];
let mut data_buf_2 = [0u8; TEST_MTU];
let mut last_ratchet_count = 0;

View file

@ -9,7 +9,10 @@
// ZSSP: ZeroTier Secure Session Protocol
// FIPS compliant Noise_XK with Jedi powers (Kyber1024) and built-in attack-resistant large payload (fragmentation) support.
use std::collections::{HashMap, HashSet};
use std::collections::hash_map::RandomState;
//use std::collections::hash_map::DefaultHasher;
use std::hash::{BuildHasher, Hash, Hasher};
use std::collections::HashMap;
use std::num::NonZeroU64;
use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, MutexGuard, RwLock, Weak};
@ -37,7 +40,8 @@ const GCM_CIPHER_POOL_SIZE: usize = 4;
/// defragment incoming packets that are not yet associated with a session.
pub struct Context<Application: ApplicationLayer> {
default_physical_mtu: AtomicUsize,
defrag: Mutex<[Fragged<Application::IncomingPacketBuffer, MAX_NOISE_HANDSHAKE_FRAGMENTS>; MAX_INCOMPLETE_SESSION_QUEUE_SIZE]>,
defrag_hasher: RandomState,
defrag: [Mutex<Fragged<Application::IncomingPacketBuffer, MAX_NOISE_HANDSHAKE_FRAGMENTS>>; MAX_INCOMPLETE_SESSION_QUEUE_SIZE],
sessions: RwLock<SessionsById<Application>>,
}
@ -148,7 +152,8 @@ impl<Application: ApplicationLayer> Context<Application> {
pub fn new(default_physical_mtu: usize) -> Self {
Self {
default_physical_mtu: AtomicUsize::new(default_physical_mtu),
defrag: Mutex::new(std::array::from_fn(|_| Fragged::new())),
defrag_hasher: RandomState::new(),
defrag: std::array::from_fn(|_| Mutex::new(Fragged::new())),
sessions: RwLock::new(SessionsById {
active: HashMap::with_capacity(64),
incoming: HashMap::with_capacity(64),
@ -522,48 +527,34 @@ impl<Application: ApplicationLayer> Context<Application> {
} else {
let (key_index, packet_type, fragment_count, fragment_no, incoming_counter) = parse_packet_header(&incoming_physical_packet);
let (assembled_packet, incoming_packet_buf_arr);
let assembled;
let incoming_packet = if fragment_count > 1 {
assembled_packet = {
let mut defrag = self.defrag.lock().unwrap();
let f = defrag
.entry((source.clone(), incoming_counter))
.or_insert_with(|| Arc::new((Mutex::new(Fragged::new()), current_time)))
.clone();
let mut hasher = self.defrag_hasher.build_hasher();
source.hash(&mut hasher);
hasher.write_u64(incoming_counter);
let offer_id = hasher.finish();
let idx0 = (offer_id as usize)%MAX_INCOMPLETE_SESSION_QUEUE_SIZE;
let idx1 = (offer_id as usize)/MAX_INCOMPLETE_SESSION_QUEUE_SIZE%MAX_INCOMPLETE_SESSION_QUEUE_SIZE;
// Anti-DOS overflow purge of the incoming defragmentation queue for packets not associated with known sessions.
if defrag.len() >= self.max_incomplete_session_queue_size {
// First, drop all entries that are timed out or whose physical source duplicates another entry.
let mut sources = HashSet::with_capacity(defrag.len());
let negotiation_timeout_cutoff = current_time - Application::INCOMING_SESSION_NEGOTIATION_TIMEOUT_MS;
defrag
.retain(|k, fragged| (fragged.1 > negotiation_timeout_cutoff && sources.insert(k.0.clone())) || Arc::ptr_eq(fragged, &f));
// Then, if we are still at or over the limit, drop 10% of remaining entries at random.
if defrag.len() >= self.max_incomplete_session_queue_size {
let mut rn = random::next_u32_secure();
defrag.retain(|_, fragged| {
rn = prng32(rn);
rn > (u32::MAX / 10) || Arc::ptr_eq(fragged, &f)
});
}
let mut slot0 = self.defrag[idx0].lock().unwrap();
if slot0.counter() == offer_id {
assembled = slot0.assemble(offer_id, incoming_physical_packet_buf, fragment_no, fragment_count);
} else {
let mut slot1 = self.defrag[idx1].lock().unwrap();
if slot1.counter() == offer_id || slot1.counter() == 0 {
assembled = slot1.assemble(offer_id, incoming_physical_packet_buf, fragment_no, fragment_count);
} else {
assembled = slot0.assemble(offer_id, incoming_physical_packet_buf, fragment_no, fragment_count);
}
f
}
.0
.lock()
.unwrap()
.assemble(incoming_counter, incoming_physical_packet_buf, fragment_no, fragment_count);
if let Some(assembled_packet) = assembled_packet.as_ref() {
self.defrag.lock().unwrap().remove(&(source.clone(), incoming_counter));
if let Some(assembled_packet) = &assembled {
assembled_packet.as_ref()
} else {
return Ok(ReceiveResult::Ok(None));
}
} else {
incoming_packet_buf_arr = [incoming_physical_packet_buf];
&incoming_packet_buf_arr
std::array::from_ref(&incoming_physical_packet_buf)
};
return self.process_complete_incoming_packet(
@ -788,7 +779,7 @@ impl<Application: ApplicationLayer> Context<Application> {
// If this queue is too big, we remove the latest entry and replace it. The latest
// is used because under flood conditions this is most likely to be another bogus
// entry. If we find one that is actually timed out, that one is replaced instead.
if sessions.incoming.len() >= self.max_incomplete_session_queue_size {
if sessions.incoming.len() >= MAX_INCOMPLETE_SESSION_QUEUE_SIZE {
let mut newest = i64::MIN;
let mut replace_id = None;
let cutoff_time = current_time - Application::INCOMING_SESSION_NEGOTIATION_TIMEOUT_MS;
@ -1697,14 +1688,3 @@ fn kbkdf512<const LABEL: u8>(key: &Secret<NOISE_HASHLEN>) -> Secret<NOISE_HASHLE
fn kbkdf256<const LABEL: u8>(key: &Secret<NOISE_HASHLEN>) -> Secret<32> {
hmac_sha512_secret256(key.as_bytes(), &[1, b'Z', b'T', LABEL, 0x00, 0, 1u8, 0u8])
}
fn prng32(mut x: u32) -> u32 {
// based on lowbias32 from https://nullprogram.com/blog/2018/07/31/
x = x.wrapping_add(1); // don't get stuck on 0
x ^= x.wrapping_shr(16);
x = x.wrapping_mul(0x7feb352d);
x ^= x.wrapping_shr(15);
x = x.wrapping_mul(0x846ca68b);
x ^= x.wrapping_shr(16);
x
}