mirror of
https://github.com/zerotier/ZeroTierOne.git
synced 2025-06-06 12:33:44 +02:00
Just a bit of final ZSSP cleanup before moving to another thing.
This commit is contained in:
parent
4360e0b487
commit
3f6c7f27a1
4 changed files with 24 additions and 19 deletions
|
@ -5,11 +5,11 @@ license = "MPL-2.0"
|
||||||
name = "zssp"
|
name = "zssp"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
|
|
||||||
[profile.release]
|
#[profile.release]
|
||||||
opt-level = 3
|
#opt-level = 3
|
||||||
lto = true
|
#lto = true
|
||||||
codegen-units = 1
|
#codegen-units = 1
|
||||||
panic = 'abort'
|
#panic = 'abort'
|
||||||
|
|
||||||
[lib]
|
[lib]
|
||||||
name = "zssp"
|
name = "zssp"
|
||||||
|
|
|
@ -29,6 +29,7 @@ impl<Fragment, const MAX_FRAGMENTS: usize> Drop for Assembled<Fragment, MAX_FRAG
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<Fragment, const MAX_FRAGMENTS: usize> Fragged<Fragment, MAX_FRAGMENTS> {
|
impl<Fragment, const MAX_FRAGMENTS: usize> Fragged<Fragment, MAX_FRAGMENTS> {
|
||||||
|
#[inline(always)]
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
debug_assert!(MAX_FRAGMENTS <= 64);
|
debug_assert!(MAX_FRAGMENTS <= 64);
|
||||||
debug_assert_eq!(size_of::<MaybeUninit<Fragment>>(), size_of::<Fragment>());
|
debug_assert_eq!(size_of::<MaybeUninit<Fragment>>(), size_of::<Fragment>());
|
||||||
|
@ -39,6 +40,11 @@ impl<Fragment, const MAX_FRAGMENTS: usize> Fragged<Fragment, MAX_FRAGMENTS> {
|
||||||
unsafe { zeroed() }
|
unsafe { zeroed() }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Add a fragment and return an assembled packet container if all fragments have been received.
|
||||||
|
///
|
||||||
|
/// When a fully assembled packet is returned the internal state is reset and this object can
|
||||||
|
/// be reused to assemble another packet.
|
||||||
|
#[inline(always)]
|
||||||
pub fn assemble(&mut self, counter: u64, fragment: Fragment, fragment_no: u8, fragment_count: u8) -> Option<Assembled<Fragment, MAX_FRAGMENTS>> {
|
pub fn assemble(&mut self, counter: u64, fragment: Fragment, fragment_no: u8, fragment_count: u8) -> Option<Assembled<Fragment, MAX_FRAGMENTS>> {
|
||||||
if fragment_no < fragment_count && (fragment_count as usize) <= MAX_FRAGMENTS {
|
if fragment_no < fragment_count && (fragment_count as usize) <= MAX_FRAGMENTS {
|
||||||
debug_assert!((fragment_count as usize) <= MAX_FRAGMENTS);
|
debug_assert!((fragment_count as usize) <= MAX_FRAGMENTS);
|
||||||
|
@ -70,6 +76,9 @@ impl<Fragment, const MAX_FRAGMENTS: usize> Fragged<Fragment, MAX_FRAGMENTS> {
|
||||||
have |= 1u64.wrapping_shl(fragment_no as u32);
|
have |= 1u64.wrapping_shl(fragment_no as u32);
|
||||||
if (have & want) == want {
|
if (have & want) == want {
|
||||||
self.have = 0;
|
self.have = 0;
|
||||||
|
// Setting 'have' to 0 resets the state of this object, and the fragments
|
||||||
|
// are effectively moved into the Assembled<> container and returned. That
|
||||||
|
// container will drop them when it is dropped.
|
||||||
return Some(Assembled(unsafe { std::mem::transmute_copy(&self.frags) }, fragment_count as usize));
|
return Some(Assembled(unsafe { std::mem::transmute_copy(&self.frags) }, fragment_count as usize));
|
||||||
} else {
|
} else {
|
||||||
self.have = have;
|
self.have = have;
|
||||||
|
@ -80,6 +89,7 @@ impl<Fragment, const MAX_FRAGMENTS: usize> Fragged<Fragment, MAX_FRAGMENTS> {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<Fragment, const MAX_FRAGMENTS: usize> Drop for Fragged<Fragment, MAX_FRAGMENTS> {
|
impl<Fragment, const MAX_FRAGMENTS: usize> Drop for Fragged<Fragment, MAX_FRAGMENTS> {
|
||||||
|
#[inline(always)]
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
if needs_drop::<Fragment>() {
|
if needs_drop::<Fragment>() {
|
||||||
let mut have = self.have;
|
let mut have = self.have;
|
||||||
|
|
|
@ -272,7 +272,7 @@ fn main() {
|
||||||
let alice_thread = ts.spawn(|| alice_main(&run, packet_success_rate, &alice_app, &bob_app, alice_out, alice_in));
|
let alice_thread = ts.spawn(|| alice_main(&run, packet_success_rate, &alice_app, &bob_app, alice_out, alice_in));
|
||||||
let bob_thread = ts.spawn(|| bob_main(&run, packet_success_rate, &alice_app, &bob_app, bob_out, bob_in));
|
let bob_thread = ts.spawn(|| bob_main(&run, packet_success_rate, &alice_app, &bob_app, bob_out, bob_in));
|
||||||
|
|
||||||
thread::sleep(Duration::from_secs(60 * 10));
|
thread::sleep(Duration::from_secs(60 * 60));
|
||||||
|
|
||||||
run.store(false, Ordering::SeqCst);
|
run.store(false, Ordering::SeqCst);
|
||||||
let _ = alice_thread.join();
|
let _ = alice_thread.join();
|
||||||
|
|
|
@ -171,6 +171,7 @@ impl<Application: ApplicationLayer> Context<Application> {
|
||||||
let retry_cutoff = current_time - Application::RETRY_INTERVAL;
|
let retry_cutoff = current_time - Application::RETRY_INTERVAL;
|
||||||
let negotiation_timeout_cutoff = current_time - Application::INCOMING_SESSION_NEGOTIATION_TIMEOUT_MS;
|
let negotiation_timeout_cutoff = current_time - Application::INCOMING_SESSION_NEGOTIATION_TIMEOUT_MS;
|
||||||
|
|
||||||
|
// Scan sessions in read lock mode, then lock more briefly in write mode to delete any dead entries that we found.
|
||||||
{
|
{
|
||||||
let sessions = self.sessions.read().unwrap();
|
let sessions = self.sessions.read().unwrap();
|
||||||
for (id, s) in sessions.active.iter() {
|
for (id, s) in sessions.active.iter() {
|
||||||
|
@ -251,6 +252,7 @@ impl<Application: ApplicationLayer> Context<Application> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Delete any expired defragmentation queue items not associated with a session.
|
||||||
self.defrag.lock().unwrap().retain(|_, fragged| fragged.1 > negotiation_timeout_cutoff);
|
self.defrag.lock().unwrap().retain(|_, fragged| fragged.1 > negotiation_timeout_cutoff);
|
||||||
|
|
||||||
Application::INCOMING_SESSION_NEGOTIATION_TIMEOUT_MS.min(Application::RETRY_INTERVAL)
|
Application::INCOMING_SESSION_NEGOTIATION_TIMEOUT_MS.min(Application::RETRY_INTERVAL)
|
||||||
|
@ -502,8 +504,7 @@ impl<Application: ApplicationLayer> Context<Application> {
|
||||||
.or_insert_with(|| Arc::new((Mutex::new(Fragged::new()), current_time)))
|
.or_insert_with(|| Arc::new((Mutex::new(Fragged::new()), current_time)))
|
||||||
.clone();
|
.clone();
|
||||||
|
|
||||||
// Anti-DOS emergency cleaning of the incoming defragmentation queue for packets not
|
// Anti-DOS overflow purge of the incoming defragmentation queue for packets not associated with known sessions.
|
||||||
// associated with known sessions.
|
|
||||||
if defrag.len() >= self.max_incomplete_session_queue_size {
|
if defrag.len() >= self.max_incomplete_session_queue_size {
|
||||||
// First, drop all entries that are timed out or whose physical source duplicates another entry.
|
// First, drop all entries that are timed out or whose physical source duplicates another entry.
|
||||||
let mut sources = HashSet::with_capacity(defrag.len());
|
let mut sources = HashSet::with_capacity(defrag.len());
|
||||||
|
@ -700,15 +701,9 @@ impl<Application: ApplicationLayer> Context<Application> {
|
||||||
*
|
*
|
||||||
* Bob authenticates the message and confirms that Alice indeed knows Bob's
|
* Bob authenticates the message and confirms that Alice indeed knows Bob's
|
||||||
* identity, then responds with his ephemeral keys.
|
* identity, then responds with his ephemeral keys.
|
||||||
*
|
|
||||||
* Bob also sends an opaque sealed object called Bob's "note to self." It contains
|
|
||||||
* Bob's state for the connection as of this first exchange, allowing Bob to be
|
|
||||||
* stateless until he knows and has confirmed Alice's identity. It's encrypted,
|
|
||||||
* authenticated, subject to a short TTL, and contains only information relevant
|
|
||||||
* to the current exchange.
|
|
||||||
*/
|
*/
|
||||||
|
|
||||||
if incoming_counter != 1 || session.is_some() {
|
if incoming_counter != 1 || session.is_some() || incoming.is_some() {
|
||||||
return Err(Error::OutOfSequence);
|
return Err(Error::OutOfSequence);
|
||||||
}
|
}
|
||||||
if pkt_assembled.len() != AliceNoiseXKInit::SIZE {
|
if pkt_assembled.len() != AliceNoiseXKInit::SIZE {
|
||||||
|
@ -770,10 +765,10 @@ impl<Application: ApplicationLayer> Context<Application> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If this queue is too big, we remove the latest entry and replace it. The latest
|
||||||
|
// is used because under flood conditions this is most likely to be another bogus
|
||||||
|
// entry. If we find one that is actually timed out, that one is replaced instead.
|
||||||
if sessions.incoming.len() >= self.max_incomplete_session_queue_size {
|
if sessions.incoming.len() >= self.max_incomplete_session_queue_size {
|
||||||
// If this queue is too big, we remove the latest entry and replace it. The latest
|
|
||||||
// is used because under flood conditions this is most likely to be another bogus
|
|
||||||
// entry. If we find one that is actually timed out, that one is replaced instead.
|
|
||||||
let mut newest = i64::MIN;
|
let mut newest = i64::MIN;
|
||||||
let mut replace_id = None;
|
let mut replace_id = None;
|
||||||
let cutoff_time = current_time - Application::INCOMING_SESSION_NEGOTIATION_TIMEOUT_MS;
|
let cutoff_time = current_time - Application::INCOMING_SESSION_NEGOTIATION_TIMEOUT_MS;
|
||||||
|
@ -1390,7 +1385,7 @@ impl<Application: ApplicationLayer> Session<Application> {
|
||||||
/// Check whether this session is established.
|
/// Check whether this session is established.
|
||||||
pub fn established(&self) -> bool {
|
pub fn established(&self) -> bool {
|
||||||
let state = self.state.read().unwrap();
|
let state = self.state.read().unwrap();
|
||||||
state.keys[state.current_key].is_some()
|
state.keys[state.current_key].as_ref().map_or(false, |k| k.confirmed)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get the ratchet count and a hash fingerprint of the current active key.
|
/// Get the ratchet count and a hash fingerprint of the current active key.
|
||||||
|
|
Loading…
Add table
Reference in a new issue