diff --git a/zssp/Cargo.toml b/zssp/Cargo.toml index 29cc28958..d4888b865 100644 --- a/zssp/Cargo.toml +++ b/zssp/Cargo.toml @@ -5,11 +5,11 @@ license = "MPL-2.0" name = "zssp" version = "0.1.0" -[profile.release] -opt-level = 3 -lto = true -codegen-units = 1 -panic = 'abort' +#[profile.release] +#opt-level = 3 +#lto = true +#codegen-units = 1 +#panic = 'abort' [lib] name = "zssp" diff --git a/zssp/src/fragged.rs b/zssp/src/fragged.rs index 77ac96d44..92ab6cb83 100644 --- a/zssp/src/fragged.rs +++ b/zssp/src/fragged.rs @@ -29,6 +29,7 @@ impl Drop for Assembled Fragged { + #[inline(always)] pub fn new() -> Self { debug_assert!(MAX_FRAGMENTS <= 64); debug_assert_eq!(size_of::>(), size_of::()); @@ -39,6 +40,11 @@ impl Fragged { unsafe { zeroed() } } + /// Add a fragment and return an assembled packet container if all fragments have been received. + /// + /// When a fully assembled packet is returned the internal state is reset and this object can + /// be reused to assemble another packet. + #[inline(always)] pub fn assemble(&mut self, counter: u64, fragment: Fragment, fragment_no: u8, fragment_count: u8) -> Option> { if fragment_no < fragment_count && (fragment_count as usize) <= MAX_FRAGMENTS { debug_assert!((fragment_count as usize) <= MAX_FRAGMENTS); @@ -70,6 +76,9 @@ impl Fragged { have |= 1u64.wrapping_shl(fragment_no as u32); if (have & want) == want { self.have = 0; + // Setting 'have' to 0 resets the state of this object, and the fragments + // are effectively moved into the Assembled<> container and returned. That + // container will drop them when it is dropped. return Some(Assembled(unsafe { std::mem::transmute_copy(&self.frags) }, fragment_count as usize)); } else { self.have = have; @@ -80,6 +89,7 @@ impl Fragged { } impl Drop for Fragged { + #[inline(always)] fn drop(&mut self) { if needs_drop::() { let mut have = self.have; diff --git a/zssp/src/main.rs b/zssp/src/main.rs index 3d223c5a1..f9db5a977 100644 --- a/zssp/src/main.rs +++ b/zssp/src/main.rs @@ -272,7 +272,7 @@ fn main() { let alice_thread = ts.spawn(|| alice_main(&run, packet_success_rate, &alice_app, &bob_app, alice_out, alice_in)); let bob_thread = ts.spawn(|| bob_main(&run, packet_success_rate, &alice_app, &bob_app, bob_out, bob_in)); - thread::sleep(Duration::from_secs(60 * 10)); + thread::sleep(Duration::from_secs(60 * 60)); run.store(false, Ordering::SeqCst); let _ = alice_thread.join(); diff --git a/zssp/src/zssp.rs b/zssp/src/zssp.rs index 406cdaba4..a8a8f76b0 100644 --- a/zssp/src/zssp.rs +++ b/zssp/src/zssp.rs @@ -171,6 +171,7 @@ impl Context { let retry_cutoff = current_time - Application::RETRY_INTERVAL; let negotiation_timeout_cutoff = current_time - Application::INCOMING_SESSION_NEGOTIATION_TIMEOUT_MS; + // Scan sessions in read lock mode, then lock more briefly in write mode to delete any dead entries that we found. { let sessions = self.sessions.read().unwrap(); for (id, s) in sessions.active.iter() { @@ -251,6 +252,7 @@ impl Context { } } + // Delete any expired defragmentation queue items not associated with a session. self.defrag.lock().unwrap().retain(|_, fragged| fragged.1 > negotiation_timeout_cutoff); Application::INCOMING_SESSION_NEGOTIATION_TIMEOUT_MS.min(Application::RETRY_INTERVAL) @@ -502,8 +504,7 @@ impl Context { .or_insert_with(|| Arc::new((Mutex::new(Fragged::new()), current_time))) .clone(); - // Anti-DOS emergency cleaning of the incoming defragmentation queue for packets not - // associated with known sessions. + // Anti-DOS overflow purge of the incoming defragmentation queue for packets not associated with known sessions. if defrag.len() >= self.max_incomplete_session_queue_size { // First, drop all entries that are timed out or whose physical source duplicates another entry. let mut sources = HashSet::with_capacity(defrag.len()); @@ -700,15 +701,9 @@ impl Context { * * Bob authenticates the message and confirms that Alice indeed knows Bob's * identity, then responds with his ephemeral keys. - * - * Bob also sends an opaque sealed object called Bob's "note to self." It contains - * Bob's state for the connection as of this first exchange, allowing Bob to be - * stateless until he knows and has confirmed Alice's identity. It's encrypted, - * authenticated, subject to a short TTL, and contains only information relevant - * to the current exchange. */ - if incoming_counter != 1 || session.is_some() { + if incoming_counter != 1 || session.is_some() || incoming.is_some() { return Err(Error::OutOfSequence); } if pkt_assembled.len() != AliceNoiseXKInit::SIZE { @@ -770,10 +765,10 @@ impl Context { } } + // If this queue is too big, we remove the latest entry and replace it. The latest + // is used because under flood conditions this is most likely to be another bogus + // entry. If we find one that is actually timed out, that one is replaced instead. if sessions.incoming.len() >= self.max_incomplete_session_queue_size { - // If this queue is too big, we remove the latest entry and replace it. The latest - // is used because under flood conditions this is most likely to be another bogus - // entry. If we find one that is actually timed out, that one is replaced instead. let mut newest = i64::MIN; let mut replace_id = None; let cutoff_time = current_time - Application::INCOMING_SESSION_NEGOTIATION_TIMEOUT_MS; @@ -1390,7 +1385,7 @@ impl Session { /// Check whether this session is established. pub fn established(&self) -> bool { let state = self.state.read().unwrap(); - state.keys[state.current_key].is_some() + state.keys[state.current_key].as_ref().map_or(false, |k| k.confirmed) } /// Get the ratchet count and a hash fingerprint of the current active key.