Remove parking_lot since as of 1.63 Rust std::mutex is usually as good or better.

This commit is contained in:
Adam Ierymenko 2022-09-29 15:34:03 -04:00
parent ceacc932de
commit 5d17c37b65
No known key found for this signature in database
GPG key ID: C8877CF2D7A5D7F3
21 changed files with 136 additions and 143 deletions

View file

@ -13,7 +13,6 @@ zerotier-utils = { path = "../utils", features = ["tokio"] }
zerotier-network-hypervisor = { path = "../network-hypervisor" }
zerotier-vl1-service = { path = "../vl1-service" }
async-trait = "^0"
parking_lot = { version = "^0", features = [], default-features = false }
serde = { version = "^1", features = ["derive"], default-features = false }
serde_json = { version = "^1", features = ["std"], default-features = false }
clap = { version = "^3", features = ["std", "suggestions"], default-features = false }

View file

@ -3,14 +3,13 @@
use std::sync::Arc;
use tokio::time::{Duration, Instant};
use zerotier_utils::tokio;
use zerotier_network_hypervisor::protocol::{verbs, PacketBuffer};
use zerotier_network_hypervisor::vl1::{HostSystem, Identity, InnerProtocol, PacketHandlerResult, Path, PathFilter, Peer};
use zerotier_network_hypervisor::vl2::NetworkId;
use zerotier_utils::dictionary::Dictionary;
use zerotier_utils::reaper::Reaper;
use zerotier_utils::tokio;
use crate::database::Database;

View file

@ -11,7 +11,6 @@ ed25519-dalek = { version = "1.0.1", features = ["std", "u64_backend"], default-
foreign-types = "0.3.1"
lazy_static = "^1"
openssl = { version = "^0", features = [], default-features = false }
parking_lot = { version = "^0", features = [], default-features = false }
poly1305 = { version = "0.7.2", features = [], default-features = false }
pqc_kyber = { path = "../third_party/kyber", features = ["kyber1024", "reference"], default-features = false }
#pqc_kyber = { version = "^0", features = ["kyber1024", "reference"], default-features = false }

View file

@ -6,6 +6,7 @@
use std::io::{Read, Write};
use std::ops::Deref;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Mutex, RwLock};
use crate::aes::{Aes, AesGcm};
use crate::hash::{hmac_sha512, HMACSHA384, SHA384};
@ -19,8 +20,6 @@ use zerotier_utils::ringbuffermap::RingBufferMap;
use zerotier_utils::unlikely_branch;
use zerotier_utils::varint;
use parking_lot::{Mutex, RwLock, RwLockUpgradableReadGuard};
/// Minimum size of a valid packet.
pub const MIN_PACKET_SIZE: usize = HEADER_SIZE + AES_GCM_TAG_SIZE;
@ -410,7 +409,7 @@ impl<H: Host> Session<H> {
mut data: &[u8],
) -> Result<(), Error> {
debug_assert!(mtu_buffer.len() >= MIN_MTU);
let state = self.state.read();
let state = self.state.read().unwrap();
if let Some(remote_session_id) = state.remote_session_id {
if let Some(key) = state.keys[state.key_ptr].as_ref() {
let mut packet_len = data.len() + HEADER_SIZE + AES_GCM_TAG_SIZE;
@ -472,7 +471,7 @@ impl<H: Host> Session<H> {
/// Check whether this session is established.
pub fn established(&self) -> bool {
let state = self.state.read();
let state = self.state.read().unwrap();
state.remote_session_id.is_some() && state.keys[state.key_ptr].is_some()
}
@ -481,7 +480,7 @@ impl<H: Host> Session<H> {
/// This returns a tuple of: the key fingerprint, the time it was established, the length of its ratchet chain,
/// and whether Kyber1024 was used. None is returned if the session isn't established.
pub fn security_info(&self) -> Option<([u8; 16], i64, u64, bool)> {
let state = self.state.read();
let state = self.state.read().unwrap();
if let Some(key) = state.keys[state.key_ptr].as_ref() {
Some((key.fingerprint, key.establish_time, key.ratchet_count, key.jedi))
} else {
@ -504,7 +503,7 @@ impl<H: Host> Session<H> {
current_time: i64,
force_rekey: bool,
) {
let state = self.state.upgradable_read();
let state = self.state.read().unwrap();
if (force_rekey
|| state.keys[state.key_ptr]
.as_ref()
@ -538,7 +537,8 @@ impl<H: Host> Session<H> {
mtu,
current_time,
) {
let _ = RwLockUpgradableReadGuard::upgrade(state).offer.replace(offer);
drop(state);
let _ = self.state.write().unwrap().offer.replace(offer);
}
}
}
@ -590,7 +590,7 @@ impl<H: Host> ReceiveContext<H> {
let pseudoheader = Pseudoheader::make(u64::from(local_session_id), packet_type, counter);
if fragment_count > 1 {
if fragment_count <= (MAX_FRAGMENTS as u8) && fragment_no < fragment_count {
let mut defrag = session.defrag.lock();
let mut defrag = session.defrag.lock().unwrap();
let fragment_gather_array = defrag.get_or_create_mut(&counter, || GatherArray::new(fragment_count));
if let Some(assembled_packet) = fragment_gather_array.add(fragment_no, incoming_packet_buf) {
drop(defrag); // release lock
@ -638,7 +638,7 @@ impl<H: Host> ReceiveContext<H> {
if check_header_mac(incoming_packet, &self.incoming_init_header_check_cipher) {
let pseudoheader = Pseudoheader::make(SessionId::NIL.0, packet_type, counter);
if fragment_count > 1 {
let mut defrag = self.initial_offer_defrag.lock();
let mut defrag = self.initial_offer_defrag.lock().unwrap();
let fragment_gather_array = defrag.get_or_create_mut(&counter, || GatherArray::new(fragment_count));
if let Some(assembled_packet) = fragment_gather_array.add(fragment_no, incoming_packet_buf) {
drop(defrag); // release lock
@ -697,7 +697,7 @@ impl<H: Host> ReceiveContext<H> {
debug_assert_eq!(PACKET_TYPE_NOP, 1);
if packet_type <= PACKET_TYPE_NOP {
if let Some(session) = session {
let state = session.state.read();
let state = session.state.read().unwrap();
for p in 0..KEY_HISTORY_SIZE {
let key_ptr = (state.key_ptr + p) % KEY_HISTORY_SIZE;
if let Some(key) = state.keys[key_ptr].as_ref() {
@ -747,14 +747,14 @@ impl<H: Host> ReceiveContext<H> {
.map_or(true, |old| old.establish_counter < key.establish_counter)
{
drop(state);
let mut state = session.state.write();
let mut state = session.state.write().unwrap();
state.key_ptr = key_ptr;
for i in 0..KEY_HISTORY_SIZE {
if i != key_ptr {
if let Some(old_key) = state.keys[key_ptr].as_ref() {
// Release pooled cipher memory from old keys.
old_key.receive_cipher_pool.lock().clear();
old_key.send_cipher_pool.lock().clear();
old_key.receive_cipher_pool.lock().unwrap().clear();
old_key.send_cipher_pool.lock().unwrap().clear();
}
}
}
@ -822,7 +822,7 @@ impl<H: Host> ReceiveContext<H> {
// Check rate limits.
if let Some(session) = session.as_ref() {
if (current_time - session.state.read().last_remote_offer) < H::REKEY_RATE_LIMIT_MS {
if (current_time - session.state.read().unwrap().last_remote_offer) < H::REKEY_RATE_LIMIT_MS {
return Err(Error::RateLimited);
}
} else {
@ -899,7 +899,7 @@ impl<H: Host> ReceiveContext<H> {
let alice_ratchet_key_fingerprint = alice_ratchet_key_fingerprint.as_ref().unwrap();
let mut ratchet_key = None;
let mut ratchet_count = 0;
let state = session.state.read();
let state = session.state.read().unwrap();
for k in state.keys.iter() {
if let Some(k) = k.as_ref() {
if key_fingerprint(k.ratchet_key.as_bytes())[..16].eq(alice_ratchet_key_fingerprint) {
@ -1060,7 +1060,7 @@ impl<H: Host> ReceiveContext<H> {
let key = SessionKey::new(key, Role::Bob, current_time, reply_counter, ratchet_count + 1, e1e1.is_some());
let mut state = session.state.write();
let mut state = session.state.write().unwrap();
let _ = state.remote_session_id.replace(alice_session_id);
let next_key_ptr = (state.key_ptr + 1) % KEY_HISTORY_SIZE;
let _ = state.keys[next_key_ptr].replace(key);
@ -1087,7 +1087,7 @@ impl<H: Host> ReceiveContext<H> {
let aes_gcm_tag_end = incoming_packet_len - HMAC_SIZE;
if let Some(session) = session {
let state = session.state.upgradable_read();
let state = session.state.read().unwrap();
if let Some(offer) = state.offer.as_ref() {
let (bob_e0_public, e0e0) =
P384PublicKey::from_bytes(&incoming_packet[(HEADER_SIZE + 1)..(HEADER_SIZE + 1 + P384_PUBLIC_KEY_SIZE)])
@ -1181,7 +1181,8 @@ impl<H: Host> ReceiveContext<H> {
set_header_mac(&mut reply_buf, &session.header_check_cipher);
send(&mut reply_buf);
let mut state = RwLockUpgradableReadGuard::upgrade(state);
drop(state);
let mut state = session.state.write().unwrap();
let _ = state.remote_session_id.replace(bob_session_id);
let next_key_ptr = (state.key_ptr + 1) % KEY_HISTORY_SIZE;
let _ = state.keys[next_key_ptr].replace(key);
@ -1603,11 +1604,12 @@ impl SessionKey {
Ok(self
.send_cipher_pool
.lock()
.unwrap()
.pop()
.unwrap_or_else(|| Box::new(AesGcm::new(self.send_key.as_bytes(), true))))
} else {
// Not only do we return an error, but we also destroy the key.
let mut scp = self.send_cipher_pool.lock();
let mut scp = self.send_cipher_pool.lock().unwrap();
scp.clear();
self.send_key.nuke();
@ -1617,20 +1619,21 @@ impl SessionKey {
#[inline(always)]
fn return_send_cipher(&self, c: Box<AesGcm>) {
self.send_cipher_pool.lock().push(c);
self.send_cipher_pool.lock().unwrap().push(c);
}
#[inline(always)]
fn get_receive_cipher(&self) -> Box<AesGcm> {
self.receive_cipher_pool
.lock()
.unwrap()
.pop()
.unwrap_or_else(|| Box::new(AesGcm::new(self.receive_key.as_bytes(), false)))
}
#[inline(always)]
fn return_receive_cipher(&self, c: Box<AesGcm>) {
self.receive_cipher_pool.lock().push(c);
self.receive_cipher_pool.lock().unwrap().push(c);
}
}
@ -1659,9 +1662,8 @@ fn key_fingerprint(key: &[u8]) -> [u8; 48] {
#[cfg(test)]
mod tests {
use parking_lot::Mutex;
use std::collections::LinkedList;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use zerotier_utils::hex;
#[allow(unused_imports)]
@ -1722,7 +1724,7 @@ mod tests {
}
fn session_lookup(&self, local_session_id: SessionId) -> Option<Self::SessionRef> {
self.session.lock().as_ref().and_then(|s| {
self.session.lock().unwrap().as_ref().and_then(|s| {
if s.id == local_session_id {
Some(s.clone())
} else {
@ -1743,7 +1745,7 @@ mod tests {
_: &[u8],
) -> Option<(SessionId, Secret<64>, Self::AssociatedObject)> {
loop {
let mut new_id = self.session_id_counter.lock();
let mut new_id = self.session_id_counter.lock().unwrap();
*new_id += 1;
return Some((SessionId::new_from_u64(*new_id).unwrap(), self.psk.clone(), 0));
}
@ -1765,10 +1767,10 @@ mod tests {
//println!("zssp: size of session (bytes): {}", std::mem::size_of::<Session<Box<TestHost>>>());
let _ = alice_host.session.lock().insert(Arc::new(
let _ = alice_host.session.lock().unwrap().insert(Arc::new(
Session::new(
&alice_host,
|data| bob_host.queue.lock().push_front(data.to_vec()),
|data| bob_host.queue.lock().unwrap().push_front(data.to_vec()),
SessionId::new_random(),
bob_host.local_s.public_key_bytes(),
&[],
@ -1785,9 +1787,9 @@ mod tests {
for host in [&alice_host, &bob_host] {
let send_to_other = |data: &mut [u8]| {
if std::ptr::eq(host, &alice_host) {
bob_host.queue.lock().push_front(data.to_vec());
bob_host.queue.lock().unwrap().push_front(data.to_vec());
} else {
alice_host.queue.lock().push_front(data.to_vec());
alice_host.queue.lock().unwrap().push_front(data.to_vec());
}
};
@ -1798,7 +1800,7 @@ mod tests {
};
loop {
if let Some(qi) = host.queue.lock().pop_back() {
if let Some(qi) = host.queue.lock().unwrap().pop_back() {
let qi_len = qi.len();
ts += 1;
let r = rc.receive(host, &0, send_to_other, &mut data_buf, qi, mtu_buffer.len(), ts);
@ -1820,7 +1822,7 @@ mod tests {
qi_len,
u64::from(new_session.id)
);
let mut hs = host.session.lock();
let mut hs = host.session.lock().unwrap();
assert!(hs.is_none());
let _ = hs.insert(Arc::new(new_session));
}
@ -1844,10 +1846,10 @@ mod tests {
}
data_buf.fill(0x12);
if let Some(session) = host.session.lock().as_ref().cloned() {
if let Some(session) = host.session.lock().unwrap().as_ref().cloned() {
if session.established() {
{
let mut key_id = host.key_id.lock();
let mut key_id = host.key_id.lock().unwrap();
let security_info = session.security_info().unwrap();
if !security_info.0.eq(key_id.as_ref()) {
*key_id = security_info.0;

View file

@ -14,7 +14,6 @@ zerotier-crypto = { path = "../crypto" }
zerotier-utils = { path = "../utils" }
base64 = "^0"
lz4_flex = { version = "^0", features = ["safe-encode", "safe-decode", "checked-decode"] }
parking_lot = { version = "^0", features = [], default-features = false }
serde = { version = "^1", features = ["derive"], default-features = false }
[dev-dependencies]

View file

@ -4,11 +4,9 @@ use std::collections::HashMap;
use std::hash::Hash;
use std::io::Write;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Weak};
use std::sync::{Arc, Mutex, RwLock, Weak};
use std::time::Duration;
use parking_lot::{Mutex, RwLock, RwLockUpgradableReadGuard};
use crate::protocol::*;
use crate::vl1::address::Address;
use crate::vl1::debug_event;
@ -290,11 +288,11 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
}
pub fn peer(&self, a: Address) -> Option<Arc<Peer<HostSystemImpl>>> {
self.peers.read().get(&a).cloned()
self.peers.read().unwrap().get(&a).cloned()
}
pub fn is_online(&self) -> bool {
self.roots.read().online
self.roots.read().unwrap().online
}
pub fn do_background_tasks(&self, host_system: &HostSystemImpl) -> Duration {
@ -303,7 +301,7 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
let time_ticks = host_system.time_ticks();
let (root_sync, root_hello, mut root_spam_hello, peer_service, path_service, whois_queue_retry) = {
let mut intervals = self.intervals.lock();
let mut intervals = self.intervals.lock().unwrap();
(
intervals.root_sync.gate(time_ticks),
intervals.root_hello.gate(time_ticks),
@ -356,7 +354,7 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
if root_sync {
if {
let mut roots = self.roots.write();
let mut roots = self.roots.write().unwrap();
if roots.sets_modified {
roots.sets_modified = false;
true
@ -367,7 +365,7 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
debug_event!(host_system, "[vl1] root sets modified, synchronizing internal data structures");
let (mut old_root_identities, address_collisions, new_roots, bad_identities, my_root_sets) = {
let roots = self.roots.read();
let roots = self.roots.read().unwrap();
let old_root_identities: Vec<Identity> = roots.roots.iter().map(|(p, _)| p.identity.clone()).collect();
let mut new_roots = HashMap::new();
@ -387,6 +385,7 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
} else if self
.peers
.read()
.unwrap()
.get(&m.identity.address)
.map_or(false, |p| !p.identity.eq(&m.identity))
|| address_collision_check
@ -409,13 +408,16 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
m.identity.address.to_string(),
m.endpoints.as_ref().map_or(0, |e| e.len())
);
let peers = self.peers.upgradable_read();
let peers = self.peers.read().unwrap();
if let Some(peer) = peers.get(&m.identity.address) {
new_roots.insert(peer.clone(), m.endpoints.as_ref().unwrap().iter().cloned().collect());
} else {
if let Some(peer) = Peer::<HostSystemImpl>::new(&self.identity, m.identity.clone(), time_ticks) {
drop(peers);
new_roots.insert(
RwLockUpgradableReadGuard::upgrade(peers)
self.peers
.write()
.unwrap()
.entry(m.identity.address)
.or_insert_with(|| Arc::new(peer))
.clone(),
@ -450,7 +452,7 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
new_root_identities.sort_unstable();
if !old_root_identities.eq(&new_root_identities) {
let mut roots = self.roots.write();
let mut roots = self.roots.write().unwrap();
roots.roots = new_roots;
roots.this_root_sets = my_root_sets;
host_system.event(Event::UpdatedRoots(old_root_identities, new_root_identities));
@ -458,7 +460,7 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
}
{
let roots = self.roots.read();
let roots = self.roots.read().unwrap();
// The best root is the one that has replied to a HELLO most recently. Since we send HELLOs in unison
// this is a proxy for latency and also causes roots that fail to reply to drop out quickly.
@ -473,9 +475,10 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
}
if let Some(best) = best {
let best_root = self.best_root.upgradable_read();
let best_root = self.best_root.read().unwrap();
if best_root.as_ref().map_or(true, |br| !Arc::ptr_eq(&br, best)) {
let mut best_root = RwLockUpgradableReadGuard::upgrade(best_root);
drop(best_root);
let mut best_root = self.best_root.write().unwrap();
if let Some(best_root) = best_root.as_mut() {
debug_event!(
host_system,
@ -494,7 +497,7 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
}
}
} else {
if let Some(old_best) = self.best_root.write().take() {
if let Some(old_best) = self.best_root.write().unwrap().take() {
debug_event!(
host_system,
"[vl1] selected new best root: NONE (replaced {})",
@ -507,12 +510,12 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
if (time_ticks - latest_hello_reply) < (ROOT_HELLO_INTERVAL * 2) && best.is_some() {
if !roots.online {
drop(roots);
self.roots.write().online = true;
self.roots.write().unwrap().online = true;
host_system.event(Event::Online(true));
}
} else if roots.online {
drop(roots);
self.roots.write().online = false;
self.roots.write().unwrap().online = false;
host_system.event(Event::Online(false));
}
}
@ -524,7 +527,7 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
// external addresses from them.
if root_hello || root_spam_hello {
let roots = {
let roots = self.roots.read();
let roots = self.roots.read().unwrap();
let mut roots_copy = Vec::with_capacity(roots.roots.len());
for (root, endpoints) in roots.roots.iter() {
roots_copy.push((root.clone(), endpoints.clone()));
@ -551,15 +554,15 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
// roots. Roots on the other hand remain in the peer list as long as they are roots.
let mut dead_peers = Vec::new();
{
let roots = self.roots.read();
for (a, peer) in self.peers.read().iter() {
let roots = self.roots.read().unwrap();
for (a, peer) in self.peers.read().unwrap().iter() {
if !peer.service(host_system, self, time_ticks) && !roots.roots.contains_key(peer) {
dead_peers.push(*a);
}
}
}
for dp in dead_peers.iter() {
self.peers.write().remove(dp);
self.peers.write().unwrap().remove(dp);
}
}
@ -568,7 +571,7 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
let mut need_keepalive = Vec::new();
// First check all paths in read mode to avoid blocking the entire node.
for (k, path) in self.paths.read().iter() {
for (k, path) in self.paths.read().unwrap().iter() {
if host_system.local_socket_is_valid(k.local_socket()) {
match path.service(time_ticks) {
PathServiceResult::Ok => {}
@ -582,7 +585,7 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
// Lock in write mode and remove dead paths, doing so piecemeal to again avoid blocking.
for dp in dead_paths.iter() {
self.paths.write().remove(dp);
self.paths.write().unwrap().remove(dp);
}
// Finally run keepalive sends as a batch.
@ -595,7 +598,7 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
if whois_queue_retry {
let need_whois = {
let mut need_whois = Vec::new();
let mut whois_queue = self.whois_queue.lock();
let mut whois_queue = self.whois_queue.lock().unwrap();
whois_queue.retain(|_, qi| qi.retry_count <= WHOIS_RETRY_COUNT_MAX);
for (address, qi) in whois_queue.iter_mut() {
if (time_ticks - qi.last_retry_time) >= WHOIS_RETRY_INTERVAL {
@ -811,7 +814,7 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
) {
debug_event!(host_system, "[vl1] [v1] WHOIS {}", address.to_string());
{
let mut whois_queue = self.whois_queue.lock();
let mut whois_queue = self.whois_queue.lock().unwrap();
let qi = whois_queue.entry(address).or_insert_with(|| WhoisQueueItem::<HostSystemImpl> {
v1_proto_waiting_packets: RingBuffer::new(),
last_retry_time: 0,
@ -865,11 +868,11 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
) {
if authoritative {
if received_identity.validate_identity() {
let mut whois_queue = self.whois_queue.lock();
let mut whois_queue = self.whois_queue.lock().unwrap();
if let Some(qi) = whois_queue.get_mut(&received_identity.address) {
let address = received_identity.address;
if inner.should_communicate_with(&received_identity) {
let mut peers = self.peers.write();
let mut peers = self.peers.write().unwrap();
if let Some(peer) = peers.get(&address).cloned().or_else(|| {
Peer::new(&self.identity, received_identity, time_ticks)
.map(|p| Arc::new(p))
@ -893,17 +896,17 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
/// Get the current "best" root from among this node's trusted roots.
pub fn best_root(&self) -> Option<Arc<Peer<HostSystemImpl>>> {
self.best_root.read().clone()
self.best_root.read().unwrap().clone()
}
/// Check whether a peer is a root according to any root set trusted by this node.
pub fn is_peer_root(&self, peer: &Peer<HostSystemImpl>) -> bool {
self.roots.read().roots.keys().any(|p| p.identity.eq(&peer.identity))
self.roots.read().unwrap().roots.keys().any(|p| p.identity.eq(&peer.identity))
}
/// Returns true if this node is a member of a root set (that it knows about).
pub fn this_node_is_root(&self) -> bool {
self.roots.read().this_root_sets.is_some()
self.roots.read().unwrap().this_root_sets.is_some()
}
/// Called when a remote node sends us a root set update, applying the update if it is valid and applicable.
@ -911,7 +914,7 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
/// This will only replace an existing root set with a newer one. It won't add a new root set, which must be
/// done by an authorized user or administrator not just by a root.
pub(crate) fn remote_update_root_set(&self, received_from: &Identity, rs: Verified<RootSet>) {
let mut roots = self.roots.write();
let mut roots = self.roots.write().unwrap();
if let Some(entry) = roots.sets.get_mut(&rs.name) {
if entry.members.iter().any(|m| m.identity.eq(received_from)) && rs.should_replace(entry) {
*entry = rs;
@ -922,7 +925,7 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
/// Add a new root set or update the existing root set if the new root set is newer and otherwise matches.
pub fn add_update_root_set(&self, rs: Verified<RootSet>) -> bool {
let mut roots = self.roots.write();
let mut roots = self.roots.write().unwrap();
if let Some(entry) = roots.sets.get_mut(&rs.name) {
if rs.should_replace(entry) {
*entry = rs;
@ -940,7 +943,7 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
/// Returns whether or not this node has any root sets defined.
pub fn has_roots_defined(&self) -> bool {
self.roots.read().sets.iter().any(|rs| !rs.1.members.is_empty())
self.roots.read().unwrap().sets.iter().any(|rs| !rs.1.members.is_empty())
}
/// Initialize with default roots if there are no roots defined, otherwise do nothing.
@ -954,7 +957,7 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
/// Get the root sets that this node trusts.
pub fn root_sets(&self) -> Vec<RootSet> {
self.roots.read().sets.values().cloned().map(|s| s.unwrap()).collect()
self.roots.read().unwrap().sets.values().cloned().map(|s| s.unwrap()).collect()
}
/// Get the canonical Path object corresponding to an endpoint.
@ -965,13 +968,14 @@ impl<HostSystemImpl: HostSystem> Node<HostSystemImpl> {
local_interface: &HostSystemImpl::LocalInterface,
time_ticks: i64,
) -> Arc<Path<HostSystemImpl>> {
let paths = self.paths.read();
let paths = self.paths.read().unwrap();
if let Some(path) = paths.get(&PathKey::Ref(ep, local_socket)) {
path.clone()
} else {
drop(paths);
self.paths
.write()
.unwrap()
.entry(PathKey::Copied(ep.clone(), local_socket.clone()))
.or_insert_with(|| Arc::new(Path::new(ep.clone(), local_socket.clone(), local_interface.clone(), time_ticks)))
.clone()

View file

@ -3,8 +3,7 @@
use std::collections::HashMap;
use std::hash::{BuildHasher, Hasher};
use std::sync::atomic::{AtomicI64, Ordering};
use parking_lot::Mutex;
use std::sync::Mutex;
use crate::protocol::*;
use crate::vl1::endpoint::Endpoint;
@ -65,7 +64,7 @@ impl<HostSystemImpl: HostSystem> Path<HostSystemImpl> {
packet: PooledPacketBuffer,
time_ticks: i64,
) -> Option<v1::FragmentedPacket> {
let mut fp = self.fragmented_packets.lock();
let mut fp = self.fragmented_packets.lock().unwrap();
// Discard some old waiting packets if the total incoming fragments for a path exceeds a
// sanity limit. This is to prevent memory exhaustion DOS attacks.
@ -107,6 +106,7 @@ impl<HostSystemImpl: HostSystem> Path<HostSystemImpl> {
pub(crate) fn service(&self, time_ticks: i64) -> PathServiceResult {
self.fragmented_packets
.lock()
.unwrap()
.retain(|_, frag| (time_ticks - frag.ts_ticks) < v1::FRAGMENT_EXPIRATION);
if (time_ticks - self.last_receive_time_ticks.load(Ordering::Relaxed)) < PATH_EXPIRATION_TIME {
if (time_ticks - self.last_send_time_ticks.load(Ordering::Relaxed)) >= PATH_KEEPALIVE_INTERVAL {

View file

@ -3,9 +3,7 @@
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::sync::{Arc, Weak};
use parking_lot::{Mutex, RwLock};
use std::sync::{Arc, Mutex, RwLock, Weak};
use zerotier_crypto::poly1305;
use zerotier_crypto::random;
@ -87,7 +85,7 @@ impl<HostSystemImpl: HostSystem> Peer<HostSystemImpl> {
/// Get the remote version of this peer: major, minor, revision.
/// Returns None if it's not yet known.
pub fn version(&self) -> Option<(u8, u8, u16)> {
let rv = self.remote_node_info.read().remote_version;
let rv = self.remote_node_info.read().unwrap().remote_version;
if rv.0 != 0 || rv.1 != 0 || rv.2 != 0 {
Some(rv)
} else {
@ -97,7 +95,7 @@ impl<HostSystemImpl: HostSystem> Peer<HostSystemImpl> {
/// Get the remote protocol version of this peer or None if not yet known.
pub fn protocol_version(&self) -> Option<u8> {
let pv = self.remote_node_info.read().remote_protocol_version;
let pv = self.remote_node_info.read().unwrap().remote_protocol_version;
if pv != 0 {
Some(pv)
} else {
@ -107,7 +105,7 @@ impl<HostSystemImpl: HostSystem> Peer<HostSystemImpl> {
/// Get current best path or None if there are no direct paths to this peer.
pub fn direct_path(&self) -> Option<Arc<Path<HostSystemImpl>>> {
for p in self.paths.lock().iter() {
for p in self.paths.lock().unwrap().iter() {
let pp = p.path.upgrade();
if pp.is_some() {
return pp;
@ -129,7 +127,7 @@ impl<HostSystemImpl: HostSystem> Peer<HostSystemImpl> {
}
fn learn_path(&self, host_system: &HostSystemImpl, new_path: &Arc<Path<HostSystemImpl>>, time_ticks: i64) {
let mut paths = self.paths.lock();
let mut paths = self.paths.lock().unwrap();
match &new_path.endpoint {
Endpoint::IpUdp(new_ip) => {
@ -194,7 +192,7 @@ impl<HostSystemImpl: HostSystem> Peer<HostSystemImpl> {
pub(crate) fn service(&self, _: &HostSystemImpl, _: &Node<HostSystemImpl>, time_ticks: i64) -> bool {
// Prune dead paths and sort in descending order of quality.
{
let mut paths = self.paths.lock();
let mut paths = self.paths.lock().unwrap();
paths.retain(|p| ((time_ticks - p.last_receive_time_ticks) < PEER_EXPIRATION_TIME) && (p.path.strong_count() > 0));
if paths.capacity() > 16 {
paths.shrink_to_fit();
@ -205,6 +203,7 @@ impl<HostSystemImpl: HostSystem> Peer<HostSystemImpl> {
// Prune dead entries from the map of reported local endpoints (e.g. externally visible IPs).
self.remote_node_info
.write()
.unwrap()
.reported_local_endpoints
.retain(|_, ts| (time_ticks - *ts) < PEER_EXPIRATION_TIME);
(time_ticks - self.last_receive_time_ticks.load(Ordering::Relaxed).max(self.create_time_ticks)) < PEER_EXPIRATION_TIME
@ -286,7 +285,7 @@ impl<HostSystemImpl: HostSystem> Peer<HostSystemImpl> {
let max_fragment_size = path.endpoint.max_fragment_size();
if self.remote_node_info.read().remote_protocol_version >= 12 {
if self.remote_node_info.read().unwrap().remote_protocol_version >= 12 {
let flags_cipher_hops = if packet.len() > max_fragment_size {
v1::HEADER_FLAG_FRAGMENTED | v1::CIPHER_AES_GMAC_SIV
} else {
@ -503,7 +502,7 @@ impl<HostSystemImpl: HostSystem> Peer<HostSystemImpl> {
self.last_receive_time_ticks.store(time_ticks, Ordering::Relaxed);
let mut path_is_known = false;
for p in self.paths.lock().iter_mut() {
for p in self.paths.lock().unwrap().iter_mut() {
if std::ptr::eq(p.path.as_ptr(), source_path.as_ref()) {
p.last_receive_time_ticks = time_ticks;
path_is_known = true;
@ -585,7 +584,7 @@ impl<HostSystemImpl: HostSystem> Peer<HostSystemImpl> {
if let Ok(identity) = Identity::unmarshal(payload, &mut cursor) {
if identity.eq(&self.identity) {
{
let mut remote_node_info = self.remote_node_info.write();
let mut remote_node_info = self.remote_node_info.write().unwrap();
remote_node_info.remote_protocol_version = hello_fixed_headers.version_proto;
remote_node_info.remote_version = (
hello_fixed_headers.version_major,
@ -679,6 +678,7 @@ impl<HostSystemImpl: HostSystem> Peer<HostSystemImpl> {
if self
.remote_node_info
.write()
.unwrap()
.reported_local_endpoints
.insert(reported_endpoint, time_ticks)
.is_none()

View file

@ -6,12 +6,11 @@ use std::io::Write;
use crate::vl1::identity::{Identity, IDENTITY_MAX_SIGNATURE_SIZE};
use crate::vl1::Endpoint;
use zerotier_crypto::verified::Verified;
use zerotier_utils::arrayvec::ArrayVec;
use zerotier_utils::buffer::Buffer;
use zerotier_utils::marshalable::{Marshalable, UnmarshalError};
use zerotier_crypto::verified::Verified;
use serde::{Deserialize, Serialize};
/// Description of a member of a root cluster.

View file

@ -16,7 +16,6 @@ zerotier-utils = { path = "../utils", features = ["tokio"] }
zerotier-vl1-service = { path = "../vl1-service" }
serde = { version = "^1", features = ["derive"], default-features = false }
serde_json = { version = "^1", features = ["std"], default-features = false }
parking_lot = { version = "^0", features = [], default-features = false }
clap = { version = "^3", features = ["std", "suggestions"], default-features = false }
[target."cfg(windows)".dependencies]

View file

@ -10,10 +10,9 @@ default = []
tokio = ["dep:tokio"]
[dependencies]
parking_lot = { version = "^0", features = [], default-features = false }
serde = { version = "^1", features = ["derive"], default-features = false }
serde_json = { version = "^1", features = ["std"], default-features = false }
tokio = { version = "^1", default-features = false, features = ["fs", "io-util", "io-std", "net", "parking_lot", "process", "rt", "rt-multi-thread", "signal", "sync", "time"], optional = true }
tokio = { version = "^1", default-features = false, features = ["fs", "io-util", "io-std", "net", "process", "rt", "rt-multi-thread", "signal", "sync", "time"], optional = true }
[target."cfg(windows)".dependencies]
winapi = { version = "^0", features = ["handleapi", "ws2ipdef", "ws2tcpip"] }

View file

@ -40,12 +40,12 @@ pub fn ms_since_epoch() -> i64 {
/// Get milliseconds since an arbitrary time in the past, guaranteed to monotonically increase.
#[inline]
pub fn ms_monotonic() -> i64 {
static STARTUP_INSTANT: parking_lot::RwLock<Option<std::time::Instant>> = parking_lot::RwLock::new(None);
let si = *STARTUP_INSTANT.read();
static STARTUP_INSTANT: std::sync::RwLock<Option<std::time::Instant>> = std::sync::RwLock::new(None);
let si = *STARTUP_INSTANT.read().unwrap();
let instant_zero = if let Some(si) = si {
si
} else {
*STARTUP_INSTANT.write().get_or_insert(std::time::Instant::now())
*STARTUP_INSTANT.write().unwrap().get_or_insert(std::time::Instant::now())
};
std::time::Instant::now().duration_since(instant_zero).as_millis() as i64
}

View file

@ -2,9 +2,7 @@
use std::ops::{Deref, DerefMut};
use std::ptr::NonNull;
use std::sync::{Arc, Weak};
use parking_lot::Mutex;
use std::sync::{Arc, Mutex, Weak};
/// Each pool requires a factory that creates and resets (for re-use) pooled objects.
pub trait PoolFactory<O> {
@ -78,7 +76,7 @@ where
fn clone(&self) -> Self {
let internal = unsafe { &mut *self.0.as_ptr() };
if let Some(p) = internal.return_pool.upgrade() {
if let Some(o) = p.pool.lock().pop() {
if let Some(o) = p.pool.lock().unwrap().pop() {
let mut o = Self(o);
*o.as_mut() = self.as_ref().clone();
o
@ -135,7 +133,7 @@ impl<O, F: PoolFactory<O>> Drop for Pooled<O, F> {
let internal = unsafe { &mut *self.0.as_ptr() };
if let Some(p) = internal.return_pool.upgrade() {
p.factory.reset(&mut internal.obj);
p.pool.lock().push(self.0);
p.pool.lock().unwrap().push(self.0);
} else {
drop(unsafe { Box::from_raw(self.0.as_ptr()) });
}
@ -164,7 +162,7 @@ impl<O, F: PoolFactory<O>> Pool<O, F> {
/// Get a pooled object, or allocate one if the pool is empty.
#[inline]
pub fn get(&self) -> Pooled<O, F> {
if let Some(o) = self.0.pool.lock().pop() {
if let Some(o) = self.0.pool.lock().unwrap().pop() {
return Pooled::<O, F>(o);
}
return Pooled::<O, F>(unsafe {
@ -182,7 +180,7 @@ impl<O, F: PoolFactory<O>> Pool<O, F> {
/// be done to free some memory if there has been a spike in memory use.
#[inline]
pub fn purge(&self) {
for o in self.0.pool.lock().drain(..) {
for o in self.0.pool.lock().unwrap().drain(..) {
drop(unsafe { Box::from_raw(o.as_ptr()) })
}
}

View file

@ -7,20 +7,20 @@ use tokio::time::Instant;
/// Watches tokio jobs and times them out if they run past a deadline or aborts them all if the reaper is dropped.
pub struct Reaper {
q: Arc<(parking_lot::Mutex<VecDeque<(JoinHandle<()>, Instant)>>, Notify)>,
q: Arc<(std::sync::Mutex<VecDeque<(JoinHandle<()>, Instant)>>, Notify)>,
finisher: JoinHandle<()>,
}
impl Reaper {
pub fn new(runtime: &tokio::runtime::Handle) -> Self {
let q = Arc::new((parking_lot::Mutex::new(VecDeque::with_capacity(16)), Notify::new()));
let q = Arc::new((std::sync::Mutex::new(VecDeque::with_capacity(16)), Notify::new()));
Self {
q: q.clone(),
finisher: runtime.spawn(async move {
loop {
q.1.notified().await;
loop {
let j = q.0.lock().pop_front();
let j = q.0.lock().unwrap().pop_front();
if let Some(j) = j {
let _ = tokio::time::timeout_at(j.1, j.0).await;
} else {
@ -35,7 +35,7 @@ impl Reaper {
/// Add a job to be executed with timeout at a given instant.
#[inline]
pub fn add(&self, job: JoinHandle<()>, deadline: Instant) {
self.q.0.lock().push_back((job, deadline));
self.q.0.lock().unwrap().push_back((job, deadline));
self.q.1.notify_waiters();
}
}
@ -44,6 +44,6 @@ impl Drop for Reaper {
#[inline]
fn drop(&mut self) {
self.finisher.abort();
self.q.0.lock().drain(..).for_each(|j| j.0.abort());
self.q.0.lock().unwrap().drain(..).for_each(|j| j.0.abort());
}
}

View file

@ -1,7 +1,6 @@
// (c) 2020-2022 ZeroTier, Inc. -- currently propritery pending actual release and licensing. See LICENSE.md.
use std::hash::{Hash, Hasher};
use std::mem::MaybeUninit;
const EMPTY: u16 = 0xffff;

View file

@ -10,7 +10,6 @@ zerotier-network-hypervisor = { path = "../network-hypervisor" }
zerotier-crypto = { path = "../crypto" }
zerotier-utils = { path = "../utils" }
num-traits = "^0"
parking_lot = { version = "^0", features = [], default-features = false }
serde = { version = "^1", features = ["derive"], default-features = false }
serde_json = { version = "^1", features = ["std"], default-features = false }

View file

@ -2,9 +2,8 @@
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::Arc;
use std::sync::{Arc, Mutex, RwLock};
use parking_lot::{Mutex, RwLock};
use serde::de::DeserializeOwned;
use serde::Serialize;
@ -86,7 +85,7 @@ impl<Config: PartialEq + Eq + Clone + Send + Sync + Default + Serialize + Deseri
/// Get authorization token for local API, creating and saving if it does not exist.
pub fn authtoken(&self) -> std::io::Result<String> {
let authtoken = self.authtoken.lock().clone();
let authtoken = self.authtoken.lock().unwrap().clone();
if authtoken.is_empty() {
let authtoken_path = self.base_path.join(AUTH_TOKEN_FILENAME);
let authtoken_bytes = read_limit(&authtoken_path, 4096);
@ -97,9 +96,9 @@ impl<Config: PartialEq + Eq + Clone + Send + Sync + Default + Serialize + Deseri
}
std::fs::write(&authtoken_path, tmp.as_bytes())?;
assert!(fs_restrict_permissions(&authtoken_path));
*self.authtoken.lock() = tmp;
*self.authtoken.lock().unwrap() = tmp;
} else {
*self.authtoken.lock() = String::from_utf8_lossy(authtoken_bytes.unwrap().as_slice()).into();
*self.authtoken.lock().unwrap() = String::from_utf8_lossy(authtoken_bytes.unwrap().as_slice()).into();
}
}
Ok(authtoken)
@ -111,15 +110,15 @@ impl<Config: PartialEq + Eq + Clone + Send + Sync + Default + Serialize + Deseri
/// save_config() to save the modified configuration and update the internal copy in
/// this structure.
pub fn config(&self) -> Arc<Config> {
self.config.read().clone()
self.config.read().unwrap().clone()
}
/// Save a modified copy of the configuration and replace the internal copy in this structure (if it's actually changed).
pub fn save_config(&self, modified_config: Config) -> std::io::Result<()> {
if !modified_config.eq(&self.config.read()) {
if !modified_config.eq(&self.config.read().unwrap()) {
let config_data = to_json_pretty(&modified_config);
std::fs::write(self.base_path.join(CONFIG_FILENAME), config_data.as_bytes())?;
*self.config.write() = Arc::new(modified_config);
*self.config.write().unwrap() = Arc::new(modified_config);
}
Ok(())
}

View file

@ -6,8 +6,8 @@ use zerotier_network_hypervisor::vl1::InetAddress;
#[cfg(any(target_os = "macos", target_os = "ios", target_os = "freebsd", target_os = "darwin"))]
mod freebsd_like {
use num_traits::AsPrimitive;
use parking_lot::Mutex;
use std::mem::size_of;
use std::sync::Mutex;
use zerotier_network_hypervisor::vl1::InetAddress;
static INFO_SOCKET: Mutex<i32> = Mutex::new(-1);
@ -41,7 +41,7 @@ mod freebsd_like {
pub fn is_ipv6_temporary(device_name: &str, address: &InetAddress) -> bool {
if address.is_ipv6() {
unsafe {
let mut info_socket = INFO_SOCKET.lock();
let mut info_socket = INFO_SOCKET.lock().unwrap();
if *info_socket < 0 {
*info_socket = libc::socket(libc::AF_INET6.as_(), libc::SOCK_DGRAM.as_(), 0) as i32;
if *info_socket < 0 {

View file

@ -8,7 +8,7 @@ use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
#[allow(unused_imports)]
use std::ptr::{null, null_mut};
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use crate::localinterface::LocalInterface;
@ -67,7 +67,7 @@ pub struct BoundUdpSocket {
pub interface: LocalInterface,
last_receive_time: AtomicI64,
fd: i32,
lock: parking_lot::RwLock<()>,
lock: RwLock<()>,
open: AtomicBool,
}
@ -222,7 +222,7 @@ impl BoundUdpPort {
interface: interface.clone(),
last_receive_time: AtomicI64::new(i64::MIN),
fd,
lock: parking_lot::RwLock::new(()),
lock: RwLock::new(()),
open: AtomicBool::new(true),
});

View file

@ -3,6 +3,7 @@
use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::sync::Arc;
use std::sync::RwLock;
use std::thread::JoinHandle;
use std::time::Duration;
@ -30,7 +31,7 @@ pub struct VL1Service<
PathFilterImpl: PathFilter + 'static,
InnerProtocolImpl: InnerProtocol + 'static,
> {
state: parking_lot::RwLock<VL1ServiceMutableState>,
state: RwLock<VL1ServiceMutableState>,
storage: Arc<NodeStorageImpl>,
inner: Arc<InnerProtocolImpl>,
path_filter: Arc<PathFilterImpl>,
@ -40,7 +41,7 @@ pub struct VL1Service<
struct VL1ServiceMutableState {
daemons: Vec<JoinHandle<()>>,
udp_sockets: HashMap<u16, parking_lot::RwLock<BoundUdpPort>>,
udp_sockets: HashMap<u16, RwLock<BoundUdpPort>>,
settings: VL1Settings,
running: bool,
}
@ -55,7 +56,7 @@ impl<NodeStorageImpl: NodeStorage + 'static, PathFilterImpl: PathFilter + 'stati
settings: VL1Settings,
) -> Result<Arc<Self>, Box<dyn Error>> {
let mut service = VL1Service {
state: parking_lot::RwLock::new(VL1ServiceMutableState {
state: RwLock::new(VL1ServiceMutableState {
daemons: Vec::with_capacity(2),
udp_sockets: HashMap::with_capacity(8),
settings,
@ -78,7 +79,7 @@ impl<NodeStorageImpl: NodeStorage + 'static, PathFilterImpl: PathFilter + 'stati
daemons.push(std::thread::spawn(move || {
s.background_task_daemon();
}));
service.state.write().daemons = daemons;
service.state.write().unwrap().daemons = daemons;
Ok(service)
}
@ -90,11 +91,11 @@ impl<NodeStorageImpl: NodeStorage + 'static, PathFilterImpl: PathFilter + 'stati
}
pub fn bound_udp_ports(&self) -> Vec<u16> {
self.state.read().udp_sockets.keys().cloned().collect()
self.state.read().unwrap().udp_sockets.keys().cloned().collect()
}
fn update_udp_bindings(self: &Arc<Self>) {
let state = self.state.read();
let state = self.state.read().unwrap();
let mut need_fixed_ports: HashSet<u16> = HashSet::from_iter(state.settings.fixed_ports.iter().cloned());
let mut have_random_port_count = 0;
for (p, _) in state.udp_sockets.iter() {
@ -105,10 +106,10 @@ impl<NodeStorageImpl: NodeStorage + 'static, PathFilterImpl: PathFilter + 'stati
let state = if !need_fixed_ports.is_empty() || have_random_port_count != desired_random_port_count {
drop(state);
let mut state = self.state.write();
let mut state = self.state.write().unwrap();
for p in need_fixed_ports.iter() {
state.udp_sockets.insert(*p, parking_lot::RwLock::new(BoundUdpPort::new(*p)));
state.udp_sockets.insert(*p, RwLock::new(BoundUdpPort::new(*p)));
}
while have_random_port_count > desired_random_port_count {
@ -116,7 +117,7 @@ impl<NodeStorageImpl: NodeStorage + 'static, PathFilterImpl: PathFilter + 'stati
let mut most_stale_binding_port = 0;
for (p, s) in state.udp_sockets.iter() {
if !state.settings.fixed_ports.contains(p) {
let (total_smart_ptr_handles, most_recent_receive) = s.read().liveness();
let (total_smart_ptr_handles, most_recent_receive) = s.read().unwrap().liveness();
if total_smart_ptr_handles < most_stale_binding_liveness.0
|| (total_smart_ptr_handles == most_stale_binding_liveness.0
&& most_recent_receive <= most_stale_binding_liveness.1)
@ -139,28 +140,25 @@ impl<NodeStorageImpl: NodeStorage + 'static, PathFilterImpl: PathFilter + 'stati
for i in 0..UNASSIGNED_PRIVILEGED_PORTS.len() {
let p = UNASSIGNED_PRIVILEGED_PORTS[rn.wrapping_add(i) % UNASSIGNED_PRIVILEGED_PORTS.len()];
if !state.udp_sockets.contains_key(&p) && udp_test_bind(p) {
let _ = state.udp_sockets.insert(p, parking_lot::RwLock::new(BoundUdpPort::new(p)));
let _ = state.udp_sockets.insert(p, RwLock::new(BoundUdpPort::new(p)));
continue 'outer_add_port_loop;
}
}
let p = 50000 + ((random::xorshift64_random() as u16) % 15535);
if !state.udp_sockets.contains_key(&p) && udp_test_bind(p) {
have_random_port_count += state
.udp_sockets
.insert(p, parking_lot::RwLock::new(BoundUdpPort::new(p)))
.is_none() as usize;
have_random_port_count += state.udp_sockets.insert(p, RwLock::new(BoundUdpPort::new(p))).is_none() as usize;
}
}
drop(state);
self.state.read()
self.state.read().unwrap()
} else {
state
};
for (_, binding) in state.udp_sockets.iter() {
let mut binding = binding.write();
let mut binding = binding.write().unwrap();
let _ = binding.update_bindings(
&state.settings.interface_prefix_blacklist,
&state.settings.cidr_blacklist,
@ -175,7 +173,7 @@ impl<NodeStorageImpl: NodeStorage + 'static, PathFilterImpl: PathFilter + 'stati
std::thread::sleep(Duration::from_millis(500));
let mut udp_binding_check_every: usize = 0;
loop {
if !self.state.read().running {
if !self.state.read().unwrap().running {
break;
}
if (udp_binding_check_every % UPDATE_UDP_BINDINGS_EVERY_SECS) == 0 {
@ -252,12 +250,12 @@ impl<NodeStorageImpl: NodeStorage, PathFilterImpl: PathFilter, InnerProtocolImpl
}
}
let state = self.state.read();
let state = self.state.read().unwrap();
if !state.udp_sockets.is_empty() {
if let Some(specific_interface) = local_interface {
// Send from a specific interface if that interface is specified.
'socket_search: for (_, p) in state.udp_sockets.iter() {
let p = p.read();
let p = p.read().unwrap();
if !p.sockets.is_empty() {
let mut i = (random::next_u32_secure() as usize) % p.sockets.len();
for _ in 0..p.sockets.len() {
@ -275,7 +273,7 @@ impl<NodeStorageImpl: NodeStorage, PathFilterImpl: PathFilter, InnerProtocolImpl
// Otherwise send from one socket on every interface.
let mut sent_on_interfaces = HashSet::with_capacity(4);
for p in state.udp_sockets.values() {
let p = p.read();
let p = p.read().unwrap();
if !p.sockets.is_empty() {
let mut i = (random::next_u32_secure() as usize) % p.sockets.len();
for _ in 0..p.sockets.len() {
@ -311,7 +309,7 @@ impl<NodeStorageImpl: NodeStorage, PathFilterImpl: PathFilter, InnerProtocolImpl
for VL1Service<NodeStorageImpl, PathFilterImpl, InnerProtocolImpl>
{
fn drop(&mut self) {
let mut state = self.state.write();
let mut state = self.state.write().unwrap();
state.running = false;
state.udp_sockets.clear();
let mut daemons: Vec<JoinHandle<()>> = state.daemons.drain(..).collect();

View file

@ -1,6 +1,7 @@
// (c) 2020-2022 ZeroTier, Inc. -- currently propritery pending actual release and licensing. See LICENSE.md.
use serde::{Deserialize, Serialize};
use zerotier_network_hypervisor::vl1::InetAddress;
#[derive(Serialize, Deserialize, Clone, PartialEq, Eq)]