Mirror of https://github.com/zerotier/ZeroTierOne.git

commit cada04545e
parent 4c26109a09

added init packet expiry

2 changed files with 50 additions and 26 deletions
File 1 of 2: the Fragged fragment-reassembly buffer

@@ -55,24 +55,11 @@ impl<Fragment, const MAX_FRAGMENTS: usize> Fragged<Fragment, MAX_FRAGMENTS> {
     ///
     /// When a fully assembled packet is returned the internal state is reset and this object can
     /// be reused to assemble another packet.
-    #[inline(always)]
     pub fn assemble(&mut self, counter: u64, fragment: Fragment, fragment_no: u8, fragment_count: u8) -> Option<Assembled<Fragment, MAX_FRAGMENTS>> {
         if fragment_no < fragment_count && (fragment_count as usize) <= MAX_FRAGMENTS {
             // If the counter has changed, reset the structure to receive a new packet.
             if counter != self.counter {
-                if needs_drop::<Fragment>() {
-                    let mut have = self.have;
-                    let mut i = 0;
-                    while have != 0 {
-                        if (have & 1) != 0 {
-                            debug_assert!(i < MAX_FRAGMENTS);
-                            unsafe { self.frags.get_unchecked_mut(i).assume_init_drop() };
-                        }
-                        have = have.wrapping_shr(1);
-                        i += 1;
-                    }
-                }
-                self.have = 0;
+                self.drop_in_place();
                 self.count = fragment_count as u32;
                 self.counter = counter;
             }
@@ -96,11 +83,10 @@ impl<Fragment, const MAX_FRAGMENTS: usize> Fragged<Fragment, MAX_FRAGMENTS> {
             }
             return None;
         }
     }
-}

-impl<Fragment, const MAX_FRAGMENTS: usize> Drop for Fragged<Fragment, MAX_FRAGMENTS> {
+    /// Drops any remaining fragments and resets this object.
     #[inline(always)]
-    fn drop(&mut self) {
+    pub fn drop_in_place(&mut self) {
         if needs_drop::<Fragment>() {
             let mut have = self.have;
             let mut i = 0;
@@ -113,5 +99,15 @@ impl<Fragment, const MAX_FRAGMENTS: usize> Drop for Fragged<Fragment, MAX_FRAGMENTS> {
                 i += 1;
             }
         }
+        self.have = 0;
+        self.count = 0;
+        self.counter = 0;
+    }
+}
+
+impl<Fragment, const MAX_FRAGMENTS: usize> Drop for Fragged<Fragment, MAX_FRAGMENTS> {
+    #[inline(always)]
+    fn drop(&mut self) {
+        self.drop_in_place();
     }
 }
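Taken together, these three hunks pull the manual slot-drop loop out of Drop::drop into a public drop_in_place method that also resets have, count, and counter, so assemble can reuse it when the counter changes and the new expiry sweep (in the second file) can reset a slot in place. A condensed, compilable sketch of the resulting shape, with the assemble body and the Assembled type omitted for brevity:

use std::mem::{needs_drop, MaybeUninit};

pub struct Fragged<Fragment, const MAX_FRAGMENTS: usize> {
    frags: [MaybeUninit<Fragment>; MAX_FRAGMENTS],
    have: u64, // bitmask of which fragment slots are initialized
    count: u32,
    counter: u64,
}

impl<Fragment, const MAX_FRAGMENTS: usize> Fragged<Fragment, MAX_FRAGMENTS> {
    pub fn new() -> Self {
        Self { frags: std::array::from_fn(|_| MaybeUninit::uninit()), have: 0, count: 0, counter: 0 }
    }

    /// Returns the counter of the packet being assembled, or 0 if the slot is empty.
    pub fn counter(&self) -> u64 {
        self.counter
    }

    /// Drops any remaining fragments and resets this object.
    pub fn drop_in_place(&mut self) {
        if needs_drop::<Fragment>() {
            // Walk the bitmask and drop only the slots that were initialized.
            let mut have = self.have;
            let mut i = 0;
            while have != 0 {
                if (have & 1) != 0 {
                    debug_assert!(i < MAX_FRAGMENTS);
                    unsafe { self.frags.get_unchecked_mut(i).assume_init_drop() };
                }
                have = have.wrapping_shr(1);
                i += 1;
            }
        }
        self.have = 0;
        self.count = 0;
        self.counter = 0;
    }
}

impl<Fragment, const MAX_FRAGMENTS: usize> Drop for Fragged<Fragment, MAX_FRAGMENTS> {
    fn drop(&mut self) {
        // Drop now simply delegates to the shared reset path.
        self.drop_in_place();
    }
}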
File 2 of 2: the ZSSP Context

@@ -14,7 +14,7 @@ use std::collections::hash_map::RandomState;
 use std::collections::HashMap;
 use std::hash::{BuildHasher, Hash, Hasher};
 use std::num::NonZeroU64;
-use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
+use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering, AtomicBool};
 use std::sync::{Arc, Mutex, MutexGuard, RwLock, Weak};
 
 use zerotier_crypto::aes::{Aes, AesGcm};
@@ -41,7 +41,8 @@ const GCM_CIPHER_POOL_SIZE: usize = 4;
 pub struct Context<Application: ApplicationLayer> {
     default_physical_mtu: AtomicUsize,
     defrag_salt: RandomState,
-    defrag: [Mutex<Fragged<Application::IncomingPacketBuffer, MAX_NOISE_HANDSHAKE_FRAGMENTS>>; MAX_INCOMPLETE_SESSION_QUEUE_SIZE],
+    defrag_has_pending: AtomicBool, // Allowed to be falsely positive
+    defrag: [Mutex<(Fragged<Application::IncomingPacketBuffer, MAX_NOISE_HANDSHAKE_FRAGMENTS>, i64)>; MAX_INCOMPLETE_SESSION_QUEUE_SIZE],
     sessions: RwLock<SessionsById<Application>>,
 }
 
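This struct change is the heart of the commit: each defrag slot becomes a (Fragged, i64) pair, where the i64 records when the slot started assembling a packet and i64::MAX is the "nothing pending" sentinel (as the constructor hunk below initializes it), while defrag_has_pending is a cheap hint that some slot may need an expiry check. A minimal sketch of that layout, with placeholder names where this page does not show the real ones:

use std::sync::atomic::AtomicBool;
use std::sync::Mutex;

struct Fragged; // placeholder for the real reassembly buffer above

const QUEUE_SIZE: usize = 32; // stand-in for MAX_INCOMPLETE_SESSION_QUEUE_SIZE

struct DefragState {
    // Hint that at least one slot may hold a half-assembled packet.
    // Allowed to be falsely positive; the sweep re-derives the truth.
    has_pending: AtomicBool,
    // Each slot: (reassembly state, creation time). i64::MAX means no packet
    // is awaiting expiry in this slot; any other value is when its first
    // fragment arrived, later compared against the negotiation cutoff.
    slots: [Mutex<(Fragged, i64)>; QUEUE_SIZE],
}

impl DefragState {
    fn new() -> Self {
        Self {
            has_pending: AtomicBool::new(false),
            slots: std::array::from_fn(|_| Mutex::new((Fragged, i64::MAX))),
        }
    }
}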
@@ -153,7 +154,8 @@ impl<Application: ApplicationLayer> Context<Application> {
         Self {
             default_physical_mtu: AtomicUsize::new(default_physical_mtu),
             defrag_salt: RandomState::new(),
-            defrag: std::array::from_fn(|_| Mutex::new(Fragged::new())),
+            defrag_has_pending: AtomicBool::new(false),
+            defrag: std::array::from_fn(|_| Mutex::new((Fragged::new(), i64::MAX))),
             sessions: RwLock::new(SessionsById {
                 active: HashMap::with_capacity(64),
                 incoming: HashMap::with_capacity(64),
@@ -245,6 +247,24 @@ impl<Application: ApplicationLayer> Context<Application> {
                     dead_pending.push(*id);
                 }
             }
+
+        }
+        // Only check for expiration if we have a pending packet.
+        // This check is allowed to have false positives for simplicity's sake.
+        if self.defrag_has_pending.swap(false, Ordering::Relaxed) {
+            let mut has_pending = false;
+            for m in &self.defrag {
+                let mut pending = m.lock().unwrap();
+                if pending.1 <= negotiation_timeout_cutoff {
+                    pending.1 = i64::MAX;
+                    pending.0.drop_in_place();
+                } else if pending.0.counter() != 0 {
+                    has_pending = true;
+                }
+            }
+            if has_pending {
+                self.defrag_has_pending.store(true, Ordering::Relaxed);
+            }
         }
 
         if !dead_active.is_empty() || !dead_pending.is_empty() {
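The sweep added to service() runs only when the hint flag was set, clears it up front with swap, and re-arms it only if a live slot remains, so a false positive costs at most one extra scan and nothing is ever silently skipped. A small runnable sketch of just that gate-and-rescan pattern, where Sweeper and its fields are illustrative stand-ins and each slot is reduced to its creation time:

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;

struct Sweeper {
    has_pending: AtomicBool,
    slots: [Mutex<i64>; 4], // creation times; i64::MAX = nothing pending
}

impl Sweeper {
    fn service(&self, cutoff: i64) {
        // Clear the hint first; if it was false, skip the scan entirely.
        if self.has_pending.swap(false, Ordering::Relaxed) {
            let mut still_pending = false;
            for m in &self.slots {
                let mut created_at = m.lock().unwrap();
                if *created_at <= cutoff {
                    *created_at = i64::MAX; // expired: reset the slot
                } else if *created_at != i64::MAX {
                    still_pending = true; // a live, unexpired packet remains
                }
            }
            // Re-arm the hint only if something is still pending; like the
            // real flag it may be falsely positive but not falsely negative.
            if still_pending {
                self.has_pending.store(true, Ordering::Relaxed);
            }
        }
    }
}

fn main() {
    let s = Sweeper {
        has_pending: AtomicBool::new(true),
        slots: std::array::from_fn(|_| Mutex::new(i64::MAX)),
    };
    s.service(1_000); // expire anything created at or before t = 1000
}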
@@ -544,15 +564,23 @@ impl<Application: ApplicationLayer> Context<Application> {
             // cannot control which slots their fragments index to. And since Alice's packet header has a randomly
             // generated counter value replaying it in time requires extreme amounts of network control.
             let mut slot0 = self.defrag[idx0].lock().unwrap();
-            if slot0.counter() == hashed_counter {
-                assembled = slot0.assemble(hashed_counter, incoming_physical_packet_buf, fragment_no, fragment_count);
+            if slot0.0.counter() == hashed_counter {
+                assembled = slot0.0.assemble(hashed_counter, incoming_physical_packet_buf, fragment_no, fragment_count);
+                if assembled.is_some() { slot0.1 = i64::MAX }
             } else {
                 let mut slot1 = self.defrag[idx1].lock().unwrap();
-                if slot1.counter() == hashed_counter || slot1.counter() == 0 {
-                    assembled = slot1.assemble(hashed_counter, incoming_physical_packet_buf, fragment_no, fragment_count);
+                if slot1.0.counter() == hashed_counter || slot1.0.counter() == 0 {
+                    if slot1.0.counter() == 0 {
+                        slot1.1 = current_time;
+                        self.defrag_has_pending.store(true, Ordering::Relaxed);
+                    }
+                    assembled = slot1.0.assemble(hashed_counter, incoming_physical_packet_buf, fragment_no, fragment_count);
+                    if assembled.is_some() { slot1.1 = i64::MAX }
                 } else {
-                    // slot1 is full so kick out whatever is in slot0 to make more room.
-                    assembled = slot0.assemble(hashed_counter, incoming_physical_packet_buf, fragment_no, fragment_count);
+                    // slot0 is either occupied or empty so we overwrite whatever is there to make more room.
+                    slot0.1 = current_time;
+                    self.defrag_has_pending.store(true, Ordering::Relaxed);
+                    assembled = slot0.0.assemble(hashed_counter, incoming_physical_packet_buf, fragment_no, fragment_count);
                }
             }
 
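Reading the control flow above: a fragment hashes to two candidate slots; slot0 wins if it already holds this packet's counter, slot1 is used if it matches or is empty, and otherwise slot0 is overwritten. What the commit adds is the bookkeeping: a slot's timestamp is stamped with current_time only when it starts a new packet (raising the sweep hint), and reset to i64::MAX the moment assembly completes, so only packets stuck half-assembled ever reach the expiry sweep. A condensed sketch of that bookkeeping, where insert_fragment and Slot are illustrative and the reassembly buffer is reduced to its counter:

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;

// Each slot: (counter, 0 = empty; creation time, i64::MAX = none pending).
type Slot = Mutex<(u64, i64)>;

fn insert_fragment(slot0: &Slot, slot1: &Slot, has_pending: &AtomicBool, hashed_counter: u64, now: i64) {
    let mut s0 = slot0.lock().unwrap();
    if s0.0 == hashed_counter {
        // Assembly of this packet continues in slot0; nothing to stamp.
    } else {
        let mut s1 = slot1.lock().unwrap();
        if s1.0 == hashed_counter || s1.0 == 0 {
            if s1.0 == 0 {
                // New packet: stamp creation time and raise the sweep hint.
                s1.1 = now;
                has_pending.store(true, Ordering::Relaxed);
            }
            s1.0 = hashed_counter;
        } else {
            // Both candidates are taken by other packets: overwrite slot0
            // and restart its expiry clock.
            s0.0 = hashed_counter;
            s0.1 = now;
            has_pending.store(true, Ordering::Relaxed);
        }
    }
    // On successful assembly the real code resets the slot's time to i64::MAX.
}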