Make unique IDs for local sockets globally unique.

This commit is contained in:
Adam Ierymenko 2022-06-23 12:36:17 -04:00
parent e3906b3269
commit 37ccc44117
No known key found for this signature in database
GPG key ID: C8877CF2D7A5D7F3
2 changed files with 14 additions and 6 deletions

View file

@ -1,10 +1,17 @@
// (c) 2020-2022 ZeroTier, Inc. -- currently propritery pending actual release and licensing. See LICENSE.md.
use std::hash::Hash;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Weak};
use crate::udp::BoundUdpSocket;
use lazy_static::lazy_static;
lazy_static! {
static ref LOCAL_SOCKET_UNIQUE_ID_COUNTER: AtomicUsize = AtomicUsize::new(0);
}
/// Local socket wrapper to provide to the core.
///
/// This implements very fast hash and equality in terms of an arbitrary unique ID assigned at
@ -12,9 +19,13 @@ use crate::udp::BoundUdpSocket;
/// cease to exist or work. This also means that this code can check the weak count to determine
/// if the core is currently holding/using a socket for any reason.
#[derive(Clone)]
pub struct LocalSocket(pub Weak<BoundUdpSocket>, pub usize);
pub struct LocalSocket(pub(crate) Weak<BoundUdpSocket>, usize);
impl LocalSocket {
pub fn new(s: &Arc<BoundUdpSocket>) -> Self {
Self(Arc::downgrade(s), LOCAL_SOCKET_UNIQUE_ID_COUNTER.fetch_add(1, Ordering::SeqCst))
}
/// Returns true if the wrapped socket appears to be in use by the core.
#[inline(always)]
pub fn in_use(&self) -> bool {

View file

@ -3,7 +3,6 @@
use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use async_trait::async_trait;
@ -34,7 +33,6 @@ pub struct Service {
struct ServiceImpl {
pub rt: tokio::runtime::Handle,
pub data: DataDir,
pub local_socket_unique_id_counter: AtomicUsize,
pub udp_sockets: tokio::sync::RwLock<HashMap<u16, BoundUdpPort>>,
pub num_listeners_per_socket: usize,
_core: Option<NetworkHypervisor<Self>>,
@ -68,9 +66,8 @@ impl Service {
let mut si = ServiceImpl {
rt,
data: DataDir::open(base_path).await.map_err(|e| Box::new(e))?,
local_socket_unique_id_counter: AtomicUsize::new(1),
udp_sockets: tokio::sync::RwLock::new(HashMap::with_capacity(4)),
num_listeners_per_socket: 1, //std::thread::available_parallelism().unwrap().get(),
num_listeners_per_socket: std::thread::available_parallelism().unwrap().get(),
_core: None,
};
let _ = si._core.insert(NetworkHypervisor::new(&si, true, auto_upgrade_identity).await?);
@ -116,7 +113,7 @@ impl ServiceImpl {
let self2 = self.clone();
let socket = ns.socket.clone();
let interface = ns.interface.clone();
let local_socket = LocalSocket(Arc::downgrade(ns), self.local_socket_unique_id_counter.fetch_add(1, Ordering::SeqCst));
let local_socket = LocalSocket::new(ns);
ns.socket_associated_tasks.lock().push(self.rt.spawn(async move {
let core = self2.core();
loop {