diff --git a/zssp/src/lib.rs b/zssp/src/lib.rs index 3865ef771..d4b41ca62 100644 --- a/zssp/src/lib.rs +++ b/zssp/src/lib.rs @@ -9,7 +9,6 @@ mod applicationlayer; mod error; mod fragged; -mod priority_queue; mod proto; mod sessionid; mod zssp; diff --git a/zssp/src/priority_queue.rs b/zssp/src/priority_queue.rs deleted file mode 100644 index 1f877828e..000000000 --- a/zssp/src/priority_queue.rs +++ /dev/null @@ -1,52 +0,0 @@ - -/// Is a Donald Knuth minheap, which is extremely fast and memory efficient. -pub struct EventQueue { - heap: Vec<(i64, u64)> -} - -impl EventQueue { - pub fn new() -> Self { - Self { - heap: Vec::new(), - } - } - /// Pops a single event from the queue if one exists to be run past the current time - pub fn pump(&mut self, current_time: i64) -> Option<(i64, u64)> { - if self.heap.len() > 0 { - if self.heap[0].0 <= current_time { - let ret = self.heap.swap_remove(0); - let mut parent = 0; - while 2*parent < self.heap.len() { - let child0 = 2*parent; - let child1 = child0 + 1; - let child_min = if child1 < self.heap.len() && self.heap[child1].0 < self.heap[child0].0 { - child1 - } else { - child0 - }; - if self.heap[child_min].0 < self.heap[parent].0 { - self.heap.swap(parent, child_min); - parent = child_min; - } else { - break; - } - } - return Some(ret); - } - } - None - } - - /// Pushes an event onto the queue with the given timestamp - pub fn push(&mut self, timestamp: i64, id: u64) { - let mut idx = self.heap.len(); - self.heap.push((timestamp, id)); - while idx > 0 { - let parent = idx/2; - if self.heap[parent].0 > self.heap[idx].0 { - self.heap.swap(parent, idx); - } - idx = parent; - } - } -} diff --git a/zssp/src/zssp.rs b/zssp/src/zssp.rs index b35018550..465eb034c 100644 --- a/zssp/src/zssp.rs +++ b/zssp/src/zssp.rs @@ -40,7 +40,7 @@ const GCM_CIPHER_POOL_SIZE: usize = 4; pub struct Context<'a, Application: ApplicationLayer> { default_physical_mtu: AtomicUsize, dos_salt: RandomState, - defrag_has_pending: AtomicBool, // Allowed to be falsely positive + defrag_has_pending: AtomicBool, // Allowed to be falsely positive incoming_has_pending: AtomicBool, // Allowed to be falsely positive defrag: Mutex<[(i64, u64, Fragged); MAX_INCOMPLETE_SESSION_QUEUE_SIZE]>, active_sessions: RwLock>>>, @@ -163,13 +163,12 @@ impl<'a, Application: ApplicationLayer> Context<'a, Application> { /// * `send` - Function to send packets to remote sessions /// * `current_time` - Current monotonic time in milliseconds pub fn service>, &mut [u8])>(&self, mut send: SendFunction, current_time: i64) -> i64 { - let mut dead_active = Vec::new(); let retry_cutoff = current_time - Application::RETRY_INTERVAL; let negotiation_timeout_cutoff = current_time - Application::INCOMING_SESSION_NEGOTIATION_TIMEOUT_MS; // Scan sessions in read lock mode, then lock more briefly in write mode to delete any dead entries that we found. let active_sessions = self.active_sessions.read().unwrap(); - for (id, s) in active_sessions.iter() { + for (_id, s) in active_sessions.iter() { if let Some(session) = s.upgrade() { let state = session.state.read().unwrap(); if match &state.outgoing_offer { @@ -226,8 +225,6 @@ impl<'a, Application: ApplicationLayer> Context<'a, Application> { } } } - } else { - dead_active.push(*id); } } drop(active_sessions); @@ -318,7 +315,11 @@ impl<'a, Application: ApplicationLayer> Context<'a, Application> { let mut hasher = self.dos_salt.build_hasher(); hasher.write_u64(local_session_id.into()); hashed_id = hasher.finish(); - let (_, is_used) = lookup(&*incoming_sessions, hashed_id, current_time - Application::INCOMING_SESSION_NEGOTIATION_TIMEOUT_MS); + let (_, is_used) = lookup( + &*incoming_sessions, + hashed_id, + current_time - Application::INCOMING_SESSION_NEGOTIATION_TIMEOUT_MS, + ); if !is_used && !active_sessions.contains_key(&local_session_id) { break; } @@ -500,7 +501,11 @@ impl<'a, Application: ApplicationLayer> Context<'a, Application> { hasher.write_u64(local_session_id.into()); let hashed_id = hasher.finish(); - let (idx, is_old) = lookup(&*incoming_sessions, hashed_id, current_time - Application::INCOMING_SESSION_NEGOTIATION_TIMEOUT_MS); + let (idx, is_old) = lookup( + &*incoming_sessions, + hashed_id, + current_time - Application::INCOMING_SESSION_NEGOTIATION_TIMEOUT_MS, + ); if is_old { incoming_sessions[idx].2.clone() } else { @@ -557,8 +562,14 @@ impl<'a, Application: ApplicationLayer> Context<'a, Application> { let hashed_counter = hasher.finish(); let mut defrag = self.defrag.lock().unwrap(); - let (idx, is_old) = lookup(&*defrag, hashed_counter, current_time - Application::INCOMING_SESSION_NEGOTIATION_TIMEOUT_MS); - assembled = defrag[idx].2.assemble(hashed_counter, incoming_physical_packet_buf, fragment_no, fragment_count); + let (idx, is_old) = lookup( + &*defrag, + hashed_counter, + current_time - Application::INCOMING_SESSION_NEGOTIATION_TIMEOUT_MS, + ); + assembled = defrag[idx] + .2 + .assemble(hashed_counter, incoming_physical_packet_buf, fragment_no, fragment_count); if assembled.is_some() { defrag[idx].0 = i64::MAX; } else if !is_old { @@ -783,7 +794,11 @@ impl<'a, Application: ApplicationLayer> Context<'a, Application> { hasher.write_u64(bob_session_id.into()); hashed_id = hasher.finish(); let is_used; - (bob_incoming_idx, is_used) = lookup(&*incoming_sessions, hashed_id, current_time - Application::INCOMING_SESSION_NEGOTIATION_TIMEOUT_MS); + (bob_incoming_idx, is_used) = lookup( + &*incoming_sessions, + hashed_id, + current_time - Application::INCOMING_SESSION_NEGOTIATION_TIMEOUT_MS, + ); if !is_used && !active_sessions.contains_key(&bob_session_id) { break; } @@ -1040,7 +1055,11 @@ impl<'a, Application: ApplicationLayer> Context<'a, Application> { hasher.write_u64(incoming.bob_session_id.into()); let hashed_id = hasher.finish(); let mut incoming_sessions = self.incoming_sessions.write().unwrap(); - let (bob_incoming_idx, is_old) = lookup(&*incoming_sessions, hashed_id, current_time - Application::INCOMING_SESSION_NEGOTIATION_TIMEOUT_MS); + let (bob_incoming_idx, is_old) = lookup( + &*incoming_sessions, + hashed_id, + current_time - Application::INCOMING_SESSION_NEGOTIATION_TIMEOUT_MS, + ); // Might have been removed already if is_old { incoming_sessions[bob_incoming_idx].0 = i64::MAX; @@ -1108,13 +1127,20 @@ impl<'a, Application: ApplicationLayer> Context<'a, Application> { hasher.write_u64(incoming.bob_session_id.into()); let hashed_id = hasher.finish(); let mut incoming_sessions = self.incoming_sessions.write().unwrap(); - let (bob_incoming_idx, is_present) = lookup(&*incoming_sessions, hashed_id, current_time - Application::INCOMING_SESSION_NEGOTIATION_TIMEOUT_MS); + let (bob_incoming_idx, is_present) = lookup( + &*incoming_sessions, + hashed_id, + current_time - Application::INCOMING_SESSION_NEGOTIATION_TIMEOUT_MS, + ); if is_present { incoming_sessions[bob_incoming_idx].0 = i64::MAX; incoming_sessions[bob_incoming_idx].1 = 0; incoming_sessions[bob_incoming_idx].2 = None; } - self.active_sessions.write().unwrap().insert(incoming.bob_session_id, Arc::downgrade(&session)); + self.active_sessions + .write() + .unwrap() + .insert(incoming.bob_session_id, Arc::downgrade(&session)); } let _ = session.send_nop(|b| send(Some(&session), b));