This commit is contained in:
Adam Ierymenko 2022-10-21 17:36:34 -07:00
parent c7b6a997be
commit 7cb52b112a
No known key found for this signature in database
GPG key ID: C8877CF2D7A5D7F3

View file

@ -5,7 +5,7 @@ use std::hash::{BuildHasher, Hasher};
use std::sync::atomic::{AtomicI64, Ordering}; use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Mutex; use std::sync::Mutex;
use crate::protocol::*; use crate::protocol;
use crate::vl1::endpoint::Endpoint; use crate::vl1::endpoint::Endpoint;
use crate::vl1::node::*; use crate::vl1::node::*;
@ -13,7 +13,7 @@ use zerotier_crypto::random;
use zerotier_utils::pocket::Pocket; use zerotier_utils::pocket::Pocket;
use zerotier_utils::NEVER_HAPPENED_TICKS; use zerotier_utils::NEVER_HAPPENED_TICKS;
pub(crate) const SERVICE_INTERVAL_MS: i64 = PATH_KEEPALIVE_INTERVAL; pub(crate) const SERVICE_INTERVAL_MS: i64 = protocol::PATH_KEEPALIVE_INTERVAL;
pub(crate) enum PathServiceResult { pub(crate) enum PathServiceResult {
Ok, Ok,
@ -32,7 +32,7 @@ pub struct Path {
last_send_time_ticks: AtomicI64, last_send_time_ticks: AtomicI64,
last_receive_time_ticks: AtomicI64, last_receive_time_ticks: AtomicI64,
create_time_ticks: i64, create_time_ticks: i64,
fragmented_packets: Mutex<HashMap<u64, v1::FragmentedPacket, PacketIdHasher>>, fragmented_packets: Mutex<HashMap<u64, protocol::v1::FragmentedPacket, PacketIdHasher>>,
} }
impl Path { impl Path {
@ -70,15 +70,15 @@ impl Path {
packet_id: u64, packet_id: u64,
fragment_no: u8, fragment_no: u8,
fragment_expecting_count: u8, fragment_expecting_count: u8,
packet: PooledPacketBuffer, packet: protocol::PooledPacketBuffer,
time_ticks: i64, time_ticks: i64,
) -> Option<v1::FragmentedPacket> { ) -> Option<protocol::v1::FragmentedPacket> {
let mut fp = self.fragmented_packets.lock().unwrap(); let mut fp = self.fragmented_packets.lock().unwrap();
// Discard some old waiting packets if the total incoming fragments for a path exceeds a // Discard some old waiting packets if the total incoming fragments for a path exceeds a
// sanity limit. This is to prevent memory exhaustion DOS attacks. // sanity limit. This is to prevent memory exhaustion DOS attacks.
let fps = fp.len(); let fps = fp.len();
if fps > v1::FRAGMENT_MAX_INBOUND_PACKETS_PER_PATH { if fps > protocol::v1::FRAGMENT_MAX_INBOUND_PACKETS_PER_PATH {
let mut entries: Vec<(i64, u64)> = Vec::new(); let mut entries: Vec<(i64, u64)> = Vec::new();
entries.reserve(fps); entries.reserve(fps);
for f in fp.iter() { for f in fp.iter() {
@ -92,7 +92,7 @@ impl Path {
if fp if fp
.entry(packet_id) .entry(packet_id)
.or_insert_with(|| v1::FragmentedPacket::new(time_ticks)) .or_insert_with(|| protocol::v1::FragmentedPacket::new(time_ticks))
.add_fragment(packet, fragment_no, fragment_expecting_count) .add_fragment(packet, fragment_no, fragment_expecting_count)
{ {
fp.remove(&packet_id) fp.remove(&packet_id)
@ -115,15 +115,15 @@ impl Path {
self.fragmented_packets self.fragmented_packets
.lock() .lock()
.unwrap() .unwrap()
.retain(|_, frag| (time_ticks - frag.ts_ticks) < v1::FRAGMENT_EXPIRATION); .retain(|_, frag| (time_ticks - frag.ts_ticks) < protocol::v1::FRAGMENT_EXPIRATION);
if (time_ticks - self.last_receive_time_ticks.load(Ordering::Relaxed)) < PATH_EXPIRATION_TIME { if (time_ticks - self.last_receive_time_ticks.load(Ordering::Relaxed)) < protocol::PATH_EXPIRATION_TIME {
if (time_ticks - self.last_send_time_ticks.load(Ordering::Relaxed)) >= PATH_KEEPALIVE_INTERVAL { if (time_ticks - self.last_send_time_ticks.load(Ordering::Relaxed)) >= protocol::PATH_KEEPALIVE_INTERVAL {
self.last_send_time_ticks.store(time_ticks, Ordering::Relaxed); self.last_send_time_ticks.store(time_ticks, Ordering::Relaxed);
PathServiceResult::NeedsKeepalive PathServiceResult::NeedsKeepalive
} else { } else {
PathServiceResult::Ok PathServiceResult::Ok
} }
} else if (time_ticks - self.create_time_ticks) < PATH_EXPIRATION_TIME { } else if (time_ticks - self.create_time_ticks) < protocol::PATH_EXPIRATION_TIME {
PathServiceResult::Ok PathServiceResult::Ok
} else { } else {
PathServiceResult::Dead PathServiceResult::Dead