minor cleanup

This commit is contained in:
mamoniot 2023-03-25 12:45:58 -04:00
parent 951273f51c
commit 77f6f34dbe
No known key found for this signature in database
GPG key ID: ADCCDBBE0E3D3B3B
3 changed files with 39 additions and 66 deletions

View file

@ -9,7 +9,6 @@
mod applicationlayer;
mod error;
mod fragged;
mod priority_queue;
mod proto;
mod sessionid;
mod zssp;

View file

@ -1,52 +0,0 @@
/// Is a Donald Knuth minheap, which is extremely fast and memory efficient.
pub struct EventQueue {
heap: Vec<(i64, u64)>
}
impl EventQueue {
pub fn new() -> Self {
Self {
heap: Vec::new(),
}
}
/// Pops a single event from the queue if one exists to be run past the current time
pub fn pump(&mut self, current_time: i64) -> Option<(i64, u64)> {
if self.heap.len() > 0 {
if self.heap[0].0 <= current_time {
let ret = self.heap.swap_remove(0);
let mut parent = 0;
while 2*parent < self.heap.len() {
let child0 = 2*parent;
let child1 = child0 + 1;
let child_min = if child1 < self.heap.len() && self.heap[child1].0 < self.heap[child0].0 {
child1
} else {
child0
};
if self.heap[child_min].0 < self.heap[parent].0 {
self.heap.swap(parent, child_min);
parent = child_min;
} else {
break;
}
}
return Some(ret);
}
}
None
}
/// Pushes an event onto the queue with the given timestamp
pub fn push(&mut self, timestamp: i64, id: u64) {
let mut idx = self.heap.len();
self.heap.push((timestamp, id));
while idx > 0 {
let parent = idx/2;
if self.heap[parent].0 > self.heap[idx].0 {
self.heap.swap(parent, idx);
}
idx = parent;
}
}
}

View file

@ -40,7 +40,7 @@ const GCM_CIPHER_POOL_SIZE: usize = 4;
pub struct Context<'a, Application: ApplicationLayer> {
default_physical_mtu: AtomicUsize,
dos_salt: RandomState,
defrag_has_pending: AtomicBool, // Allowed to be falsely positive
defrag_has_pending: AtomicBool, // Allowed to be falsely positive
incoming_has_pending: AtomicBool, // Allowed to be falsely positive
defrag: Mutex<[(i64, u64, Fragged<Application::IncomingPacketBuffer, MAX_NOISE_HANDSHAKE_FRAGMENTS>); MAX_INCOMPLETE_SESSION_QUEUE_SIZE]>,
active_sessions: RwLock<HashMap<SessionId, Weak<Session<'a, Application>>>>,
@ -163,13 +163,12 @@ impl<'a, Application: ApplicationLayer> Context<'a, Application> {
/// * `send` - Function to send packets to remote sessions
/// * `current_time` - Current monotonic time in milliseconds
pub fn service<SendFunction: FnMut(&Arc<Session<Application>>, &mut [u8])>(&self, mut send: SendFunction, current_time: i64) -> i64 {
let mut dead_active = Vec::new();
let retry_cutoff = current_time - Application::RETRY_INTERVAL;
let negotiation_timeout_cutoff = current_time - Application::INCOMING_SESSION_NEGOTIATION_TIMEOUT_MS;
// Scan sessions in read lock mode, then lock more briefly in write mode to delete any dead entries that we found.
let active_sessions = self.active_sessions.read().unwrap();
for (id, s) in active_sessions.iter() {
for (_id, s) in active_sessions.iter() {
if let Some(session) = s.upgrade() {
let state = session.state.read().unwrap();
if match &state.outgoing_offer {
@ -226,8 +225,6 @@ impl<'a, Application: ApplicationLayer> Context<'a, Application> {
}
}
}
} else {
dead_active.push(*id);
}
}
drop(active_sessions);
@ -318,7 +315,11 @@ impl<'a, Application: ApplicationLayer> Context<'a, Application> {
let mut hasher = self.dos_salt.build_hasher();
hasher.write_u64(local_session_id.into());
hashed_id = hasher.finish();
let (_, is_used) = lookup(&*incoming_sessions, hashed_id, current_time - Application::INCOMING_SESSION_NEGOTIATION_TIMEOUT_MS);
let (_, is_used) = lookup(
&*incoming_sessions,
hashed_id,
current_time - Application::INCOMING_SESSION_NEGOTIATION_TIMEOUT_MS,
);
if !is_used && !active_sessions.contains_key(&local_session_id) {
break;
}
@ -500,7 +501,11 @@ impl<'a, Application: ApplicationLayer> Context<'a, Application> {
hasher.write_u64(local_session_id.into());
let hashed_id = hasher.finish();
let (idx, is_old) = lookup(&*incoming_sessions, hashed_id, current_time - Application::INCOMING_SESSION_NEGOTIATION_TIMEOUT_MS);
let (idx, is_old) = lookup(
&*incoming_sessions,
hashed_id,
current_time - Application::INCOMING_SESSION_NEGOTIATION_TIMEOUT_MS,
);
if is_old {
incoming_sessions[idx].2.clone()
} else {
@ -557,8 +562,14 @@ impl<'a, Application: ApplicationLayer> Context<'a, Application> {
let hashed_counter = hasher.finish();
let mut defrag = self.defrag.lock().unwrap();
let (idx, is_old) = lookup(&*defrag, hashed_counter, current_time - Application::INCOMING_SESSION_NEGOTIATION_TIMEOUT_MS);
assembled = defrag[idx].2.assemble(hashed_counter, incoming_physical_packet_buf, fragment_no, fragment_count);
let (idx, is_old) = lookup(
&*defrag,
hashed_counter,
current_time - Application::INCOMING_SESSION_NEGOTIATION_TIMEOUT_MS,
);
assembled = defrag[idx]
.2
.assemble(hashed_counter, incoming_physical_packet_buf, fragment_no, fragment_count);
if assembled.is_some() {
defrag[idx].0 = i64::MAX;
} else if !is_old {
@ -783,7 +794,11 @@ impl<'a, Application: ApplicationLayer> Context<'a, Application> {
hasher.write_u64(bob_session_id.into());
hashed_id = hasher.finish();
let is_used;
(bob_incoming_idx, is_used) = lookup(&*incoming_sessions, hashed_id, current_time - Application::INCOMING_SESSION_NEGOTIATION_TIMEOUT_MS);
(bob_incoming_idx, is_used) = lookup(
&*incoming_sessions,
hashed_id,
current_time - Application::INCOMING_SESSION_NEGOTIATION_TIMEOUT_MS,
);
if !is_used && !active_sessions.contains_key(&bob_session_id) {
break;
}
@ -1040,7 +1055,11 @@ impl<'a, Application: ApplicationLayer> Context<'a, Application> {
hasher.write_u64(incoming.bob_session_id.into());
let hashed_id = hasher.finish();
let mut incoming_sessions = self.incoming_sessions.write().unwrap();
let (bob_incoming_idx, is_old) = lookup(&*incoming_sessions, hashed_id, current_time - Application::INCOMING_SESSION_NEGOTIATION_TIMEOUT_MS);
let (bob_incoming_idx, is_old) = lookup(
&*incoming_sessions,
hashed_id,
current_time - Application::INCOMING_SESSION_NEGOTIATION_TIMEOUT_MS,
);
// Might have been removed already
if is_old {
incoming_sessions[bob_incoming_idx].0 = i64::MAX;
@ -1108,13 +1127,20 @@ impl<'a, Application: ApplicationLayer> Context<'a, Application> {
hasher.write_u64(incoming.bob_session_id.into());
let hashed_id = hasher.finish();
let mut incoming_sessions = self.incoming_sessions.write().unwrap();
let (bob_incoming_idx, is_present) = lookup(&*incoming_sessions, hashed_id, current_time - Application::INCOMING_SESSION_NEGOTIATION_TIMEOUT_MS);
let (bob_incoming_idx, is_present) = lookup(
&*incoming_sessions,
hashed_id,
current_time - Application::INCOMING_SESSION_NEGOTIATION_TIMEOUT_MS,
);
if is_present {
incoming_sessions[bob_incoming_idx].0 = i64::MAX;
incoming_sessions[bob_incoming_idx].1 = 0;
incoming_sessions[bob_incoming_idx].2 = None;
}
self.active_sessions.write().unwrap().insert(incoming.bob_session_id, Arc::downgrade(&session));
self.active_sessions
.write()
.unwrap()
.insert(incoming.bob_session_id, Arc::downgrade(&session));
}
let _ = session.send_nop(|b| send(Some(&session), b));