diff --git a/controller/Cargo.toml b/controller/Cargo.toml index 572780ba2..5284120fd 100644 --- a/controller/Cargo.toml +++ b/controller/Cargo.toml @@ -13,7 +13,6 @@ zerotier-utils = { path = "../utils", features = ["tokio"] } zerotier-network-hypervisor = { path = "../network-hypervisor" } zerotier-vl1-service = { path = "../vl1-service" } async-trait = "^0" -parking_lot = { version = "^0", features = [], default-features = false } serde = { version = "^1", features = ["derive"], default-features = false } serde_json = { version = "^1", features = ["std"], default-features = false } clap = { version = "^3", features = ["std", "suggestions"], default-features = false } diff --git a/controller/src/controller.rs b/controller/src/controller.rs index 13ea8f821..e6bcad714 100644 --- a/controller/src/controller.rs +++ b/controller/src/controller.rs @@ -3,14 +3,13 @@ use std::sync::Arc; use tokio::time::{Duration, Instant}; -use zerotier_utils::tokio; use zerotier_network_hypervisor::protocol::{verbs, PacketBuffer}; use zerotier_network_hypervisor::vl1::{HostSystem, Identity, InnerProtocol, PacketHandlerResult, Path, PathFilter, Peer}; use zerotier_network_hypervisor::vl2::NetworkId; - use zerotier_utils::dictionary::Dictionary; use zerotier_utils::reaper::Reaper; +use zerotier_utils::tokio; use crate::database::Database; diff --git a/crypto/Cargo.toml b/crypto/Cargo.toml index 13132c0b0..d2af1d188 100644 --- a/crypto/Cargo.toml +++ b/crypto/Cargo.toml @@ -11,7 +11,6 @@ ed25519-dalek = { version = "1.0.1", features = ["std", "u64_backend"], default- foreign-types = "0.3.1" lazy_static = "^1" openssl = { version = "^0", features = [], default-features = false } -parking_lot = { version = "^0", features = [], default-features = false } poly1305 = { version = "0.7.2", features = [], default-features = false } pqc_kyber = { path = "../third_party/kyber", features = ["kyber1024", "reference"], default-features = false } #pqc_kyber = { version = "^0", features = ["kyber1024", 
"reference"], default-features = false } diff --git a/crypto/src/zssp.rs b/crypto/src/zssp.rs index bbd358bae..2b6b8f7af 100644 --- a/crypto/src/zssp.rs +++ b/crypto/src/zssp.rs @@ -6,6 +6,7 @@ use std::io::{Read, Write}; use std::ops::Deref; use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::{Mutex, RwLock}; use crate::aes::{Aes, AesGcm}; use crate::hash::{hmac_sha512, HMACSHA384, SHA384}; @@ -19,8 +20,6 @@ use zerotier_utils::ringbuffermap::RingBufferMap; use zerotier_utils::unlikely_branch; use zerotier_utils::varint; -use parking_lot::{Mutex, RwLock, RwLockUpgradableReadGuard}; - /// Minimum size of a valid packet. pub const MIN_PACKET_SIZE: usize = HEADER_SIZE + AES_GCM_TAG_SIZE; @@ -410,7 +409,7 @@ impl Session { mut data: &[u8], ) -> Result<(), Error> { debug_assert!(mtu_buffer.len() >= MIN_MTU); - let state = self.state.read(); + let state = self.state.read().unwrap(); if let Some(remote_session_id) = state.remote_session_id { if let Some(key) = state.keys[state.key_ptr].as_ref() { let mut packet_len = data.len() + HEADER_SIZE + AES_GCM_TAG_SIZE; @@ -472,7 +471,7 @@ impl Session { /// Check whether this session is established. pub fn established(&self) -> bool { - let state = self.state.read(); + let state = self.state.read().unwrap(); state.remote_session_id.is_some() && state.keys[state.key_ptr].is_some() } @@ -481,7 +480,7 @@ impl Session { /// This returns a tuple of: the key fingerprint, the time it was established, the length of its ratchet chain, /// and whether Kyber1024 was used. None is returned if the session isn't established. 
pub fn security_info(&self) -> Option<([u8; 16], i64, u64, bool)> { - let state = self.state.read(); + let state = self.state.read().unwrap(); if let Some(key) = state.keys[state.key_ptr].as_ref() { Some((key.fingerprint, key.establish_time, key.ratchet_count, key.jedi)) } else { @@ -504,7 +503,7 @@ impl Session { current_time: i64, force_rekey: bool, ) { - let state = self.state.upgradable_read(); + let state = self.state.read().unwrap(); if (force_rekey || state.keys[state.key_ptr] .as_ref() @@ -538,7 +537,8 @@ impl Session { mtu, current_time, ) { - let _ = RwLockUpgradableReadGuard::upgrade(state).offer.replace(offer); + drop(state); + let _ = self.state.write().unwrap().offer.replace(offer); } } } @@ -590,7 +590,7 @@ impl ReceiveContext { let pseudoheader = Pseudoheader::make(u64::from(local_session_id), packet_type, counter); if fragment_count > 1 { if fragment_count <= (MAX_FRAGMENTS as u8) && fragment_no < fragment_count { - let mut defrag = session.defrag.lock(); + let mut defrag = session.defrag.lock().unwrap(); let fragment_gather_array = defrag.get_or_create_mut(&counter, || GatherArray::new(fragment_count)); if let Some(assembled_packet) = fragment_gather_array.add(fragment_no, incoming_packet_buf) { drop(defrag); // release lock @@ -638,7 +638,7 @@ impl ReceiveContext { if check_header_mac(incoming_packet, &self.incoming_init_header_check_cipher) { let pseudoheader = Pseudoheader::make(SessionId::NIL.0, packet_type, counter); if fragment_count > 1 { - let mut defrag = self.initial_offer_defrag.lock(); + let mut defrag = self.initial_offer_defrag.lock().unwrap(); let fragment_gather_array = defrag.get_or_create_mut(&counter, || GatherArray::new(fragment_count)); if let Some(assembled_packet) = fragment_gather_array.add(fragment_no, incoming_packet_buf) { drop(defrag); // release lock @@ -697,7 +697,7 @@ impl ReceiveContext { debug_assert_eq!(PACKET_TYPE_NOP, 1); if packet_type <= PACKET_TYPE_NOP { if let Some(session) = session { - let state = 
session.state.read(); + let state = session.state.read().unwrap(); for p in 0..KEY_HISTORY_SIZE { let key_ptr = (state.key_ptr + p) % KEY_HISTORY_SIZE; if let Some(key) = state.keys[key_ptr].as_ref() { @@ -747,14 +747,14 @@ impl ReceiveContext { .map_or(true, |old| old.establish_counter < key.establish_counter) { drop(state); - let mut state = session.state.write(); + let mut state = session.state.write().unwrap(); state.key_ptr = key_ptr; for i in 0..KEY_HISTORY_SIZE { if i != key_ptr { if let Some(old_key) = state.keys[key_ptr].as_ref() { // Release pooled cipher memory from old keys. - old_key.receive_cipher_pool.lock().clear(); - old_key.send_cipher_pool.lock().clear(); + old_key.receive_cipher_pool.lock().unwrap().clear(); + old_key.send_cipher_pool.lock().unwrap().clear(); } } } @@ -822,7 +822,7 @@ impl ReceiveContext { // Check rate limits. if let Some(session) = session.as_ref() { - if (current_time - session.state.read().last_remote_offer) < H::REKEY_RATE_LIMIT_MS { + if (current_time - session.state.read().unwrap().last_remote_offer) < H::REKEY_RATE_LIMIT_MS { return Err(Error::RateLimited); } } else { @@ -899,7 +899,7 @@ impl ReceiveContext { let alice_ratchet_key_fingerprint = alice_ratchet_key_fingerprint.as_ref().unwrap(); let mut ratchet_key = None; let mut ratchet_count = 0; - let state = session.state.read(); + let state = session.state.read().unwrap(); for k in state.keys.iter() { if let Some(k) = k.as_ref() { if key_fingerprint(k.ratchet_key.as_bytes())[..16].eq(alice_ratchet_key_fingerprint) { @@ -1060,7 +1060,7 @@ impl ReceiveContext { let key = SessionKey::new(key, Role::Bob, current_time, reply_counter, ratchet_count + 1, e1e1.is_some()); - let mut state = session.state.write(); + let mut state = session.state.write().unwrap(); let _ = state.remote_session_id.replace(alice_session_id); let next_key_ptr = (state.key_ptr + 1) % KEY_HISTORY_SIZE; let _ = state.keys[next_key_ptr].replace(key); @@ -1087,7 +1087,7 @@ impl ReceiveContext { let 
aes_gcm_tag_end = incoming_packet_len - HMAC_SIZE; if let Some(session) = session { - let state = session.state.upgradable_read(); + let state = session.state.read().unwrap(); if let Some(offer) = state.offer.as_ref() { let (bob_e0_public, e0e0) = P384PublicKey::from_bytes(&incoming_packet[(HEADER_SIZE + 1)..(HEADER_SIZE + 1 + P384_PUBLIC_KEY_SIZE)]) @@ -1181,7 +1181,8 @@ impl ReceiveContext { set_header_mac(&mut reply_buf, &session.header_check_cipher); send(&mut reply_buf); - let mut state = RwLockUpgradableReadGuard::upgrade(state); + drop(state); + let mut state = session.state.write().unwrap(); let _ = state.remote_session_id.replace(bob_session_id); let next_key_ptr = (state.key_ptr + 1) % KEY_HISTORY_SIZE; let _ = state.keys[next_key_ptr].replace(key); @@ -1603,11 +1604,12 @@ impl SessionKey { Ok(self .send_cipher_pool .lock() + .unwrap() .pop() .unwrap_or_else(|| Box::new(AesGcm::new(self.send_key.as_bytes(), true)))) } else { // Not only do we return an error, but we also destroy the key. 
- let mut scp = self.send_cipher_pool.lock(); + let mut scp = self.send_cipher_pool.lock().unwrap(); scp.clear(); self.send_key.nuke(); @@ -1617,20 +1619,21 @@ impl SessionKey { #[inline(always)] fn return_send_cipher(&self, c: Box) { - self.send_cipher_pool.lock().push(c); + self.send_cipher_pool.lock().unwrap().push(c); } #[inline(always)] fn get_receive_cipher(&self) -> Box { self.receive_cipher_pool .lock() + .unwrap() .pop() .unwrap_or_else(|| Box::new(AesGcm::new(self.receive_key.as_bytes(), false))) } #[inline(always)] fn return_receive_cipher(&self, c: Box) { - self.receive_cipher_pool.lock().push(c); + self.receive_cipher_pool.lock().unwrap().push(c); } } @@ -1659,9 +1662,8 @@ fn key_fingerprint(key: &[u8]) -> [u8; 48] { #[cfg(test)] mod tests { - use parking_lot::Mutex; use std::collections::LinkedList; - use std::sync::Arc; + use std::sync::{Arc, Mutex}; use zerotier_utils::hex; #[allow(unused_imports)] @@ -1722,7 +1724,7 @@ mod tests { } fn session_lookup(&self, local_session_id: SessionId) -> Option { - self.session.lock().as_ref().and_then(|s| { + self.session.lock().unwrap().as_ref().and_then(|s| { if s.id == local_session_id { Some(s.clone()) } else { @@ -1743,7 +1745,7 @@ mod tests { _: &[u8], ) -> Option<(SessionId, Secret<64>, Self::AssociatedObject)> { loop { - let mut new_id = self.session_id_counter.lock(); + let mut new_id = self.session_id_counter.lock().unwrap(); *new_id += 1; return Some((SessionId::new_from_u64(*new_id).unwrap(), self.psk.clone(), 0)); } @@ -1765,10 +1767,10 @@ mod tests { //println!("zssp: size of session (bytes): {}", std::mem::size_of::>>()); - let _ = alice_host.session.lock().insert(Arc::new( + let _ = alice_host.session.lock().unwrap().insert(Arc::new( Session::new( &alice_host, - |data| bob_host.queue.lock().push_front(data.to_vec()), + |data| bob_host.queue.lock().unwrap().push_front(data.to_vec()), SessionId::new_random(), bob_host.local_s.public_key_bytes(), &[], @@ -1785,9 +1787,9 @@ mod tests { for host in 
[&alice_host, &bob_host] { let send_to_other = |data: &mut [u8]| { if std::ptr::eq(host, &alice_host) { - bob_host.queue.lock().push_front(data.to_vec()); + bob_host.queue.lock().unwrap().push_front(data.to_vec()); } else { - alice_host.queue.lock().push_front(data.to_vec()); + alice_host.queue.lock().unwrap().push_front(data.to_vec()); } }; @@ -1798,7 +1800,7 @@ mod tests { }; loop { - if let Some(qi) = host.queue.lock().pop_back() { + if let Some(qi) = host.queue.lock().unwrap().pop_back() { let qi_len = qi.len(); ts += 1; let r = rc.receive(host, &0, send_to_other, &mut data_buf, qi, mtu_buffer.len(), ts); @@ -1820,7 +1822,7 @@ mod tests { qi_len, u64::from(new_session.id) ); - let mut hs = host.session.lock(); + let mut hs = host.session.lock().unwrap(); assert!(hs.is_none()); let _ = hs.insert(Arc::new(new_session)); } @@ -1844,10 +1846,10 @@ mod tests { } data_buf.fill(0x12); - if let Some(session) = host.session.lock().as_ref().cloned() { + if let Some(session) = host.session.lock().unwrap().as_ref().cloned() { if session.established() { { - let mut key_id = host.key_id.lock(); + let mut key_id = host.key_id.lock().unwrap(); let security_info = session.security_info().unwrap(); if !security_info.0.eq(key_id.as_ref()) { *key_id = security_info.0; diff --git a/network-hypervisor/Cargo.toml b/network-hypervisor/Cargo.toml index 1d88632c5..590259c67 100644 --- a/network-hypervisor/Cargo.toml +++ b/network-hypervisor/Cargo.toml @@ -14,7 +14,6 @@ zerotier-crypto = { path = "../crypto" } zerotier-utils = { path = "../utils" } base64 = "^0" lz4_flex = { version = "^0", features = ["safe-encode", "safe-decode", "checked-decode"] } -parking_lot = { version = "^0", features = [], default-features = false } serde = { version = "^1", features = ["derive"], default-features = false } [dev-dependencies] diff --git a/network-hypervisor/src/vl1/node.rs b/network-hypervisor/src/vl1/node.rs index 83f2e669c..747320a88 100644 --- a/network-hypervisor/src/vl1/node.rs +++ 
b/network-hypervisor/src/vl1/node.rs @@ -4,11 +4,9 @@ use std::collections::HashMap; use std::hash::Hash; use std::io::Write; use std::sync::atomic::Ordering; -use std::sync::{Arc, Weak}; +use std::sync::{Arc, Mutex, RwLock, Weak}; use std::time::Duration; -use parking_lot::{Mutex, RwLock, RwLockUpgradableReadGuard}; - use crate::protocol::*; use crate::vl1::address::Address; use crate::vl1::debug_event; @@ -290,11 +288,11 @@ impl Node { } pub fn peer(&self, a: Address) -> Option>> { - self.peers.read().get(&a).cloned() + self.peers.read().unwrap().get(&a).cloned() } pub fn is_online(&self) -> bool { - self.roots.read().online + self.roots.read().unwrap().online } pub fn do_background_tasks(&self, host_system: &HostSystemImpl) -> Duration { @@ -303,7 +301,7 @@ impl Node { let time_ticks = host_system.time_ticks(); let (root_sync, root_hello, mut root_spam_hello, peer_service, path_service, whois_queue_retry) = { - let mut intervals = self.intervals.lock(); + let mut intervals = self.intervals.lock().unwrap(); ( intervals.root_sync.gate(time_ticks), intervals.root_hello.gate(time_ticks), @@ -356,7 +354,7 @@ impl Node { if root_sync { if { - let mut roots = self.roots.write(); + let mut roots = self.roots.write().unwrap(); if roots.sets_modified { roots.sets_modified = false; true @@ -367,7 +365,7 @@ impl Node { debug_event!(host_system, "[vl1] root sets modified, synchronizing internal data structures"); let (mut old_root_identities, address_collisions, new_roots, bad_identities, my_root_sets) = { - let roots = self.roots.read(); + let roots = self.roots.read().unwrap(); let old_root_identities: Vec = roots.roots.iter().map(|(p, _)| p.identity.clone()).collect(); let mut new_roots = HashMap::new(); @@ -387,6 +385,7 @@ impl Node { } else if self .peers .read() + .unwrap() .get(&m.identity.address) .map_or(false, |p| !p.identity.eq(&m.identity)) || address_collision_check @@ -409,13 +408,16 @@ impl Node { m.identity.address.to_string(), m.endpoints.as_ref().map_or(0, 
|e| e.len()) ); - let peers = self.peers.upgradable_read(); + let peers = self.peers.read().unwrap(); if let Some(peer) = peers.get(&m.identity.address) { new_roots.insert(peer.clone(), m.endpoints.as_ref().unwrap().iter().cloned().collect()); } else { if let Some(peer) = Peer::::new(&self.identity, m.identity.clone(), time_ticks) { + drop(peers); new_roots.insert( - RwLockUpgradableReadGuard::upgrade(peers) + self.peers + .write() + .unwrap() .entry(m.identity.address) .or_insert_with(|| Arc::new(peer)) .clone(), @@ -450,7 +452,7 @@ impl Node { new_root_identities.sort_unstable(); if !old_root_identities.eq(&new_root_identities) { - let mut roots = self.roots.write(); + let mut roots = self.roots.write().unwrap(); roots.roots = new_roots; roots.this_root_sets = my_root_sets; host_system.event(Event::UpdatedRoots(old_root_identities, new_root_identities)); @@ -458,7 +460,7 @@ impl Node { } { - let roots = self.roots.read(); + let roots = self.roots.read().unwrap(); // The best root is the one that has replied to a HELLO most recently. Since we send HELLOs in unison // this is a proxy for latency and also causes roots that fail to reply to drop out quickly. 
@@ -473,9 +475,10 @@ impl Node { } if let Some(best) = best { - let best_root = self.best_root.upgradable_read(); + let best_root = self.best_root.read().unwrap(); if best_root.as_ref().map_or(true, |br| !Arc::ptr_eq(&br, best)) { - let mut best_root = RwLockUpgradableReadGuard::upgrade(best_root); + drop(best_root); + let mut best_root = self.best_root.write().unwrap(); if let Some(best_root) = best_root.as_mut() { debug_event!( host_system, @@ -494,7 +497,7 @@ impl Node { } } } else { - if let Some(old_best) = self.best_root.write().take() { + if let Some(old_best) = self.best_root.write().unwrap().take() { debug_event!( host_system, "[vl1] selected new best root: NONE (replaced {})", @@ -507,12 +510,12 @@ impl Node { if (time_ticks - latest_hello_reply) < (ROOT_HELLO_INTERVAL * 2) && best.is_some() { if !roots.online { drop(roots); - self.roots.write().online = true; + self.roots.write().unwrap().online = true; host_system.event(Event::Online(true)); } } else if roots.online { drop(roots); - self.roots.write().online = false; + self.roots.write().unwrap().online = false; host_system.event(Event::Online(false)); } } @@ -524,7 +527,7 @@ impl Node { // external addresses from them. if root_hello || root_spam_hello { let roots = { - let roots = self.roots.read(); + let roots = self.roots.read().unwrap(); let mut roots_copy = Vec::with_capacity(roots.roots.len()); for (root, endpoints) in roots.roots.iter() { roots_copy.push((root.clone(), endpoints.clone())); @@ -551,15 +554,15 @@ impl Node { // roots. Roots on the other hand remain in the peer list as long as they are roots. 
let mut dead_peers = Vec::new(); { - let roots = self.roots.read(); - for (a, peer) in self.peers.read().iter() { + let roots = self.roots.read().unwrap(); + for (a, peer) in self.peers.read().unwrap().iter() { if !peer.service(host_system, self, time_ticks) && !roots.roots.contains_key(peer) { dead_peers.push(*a); } } } for dp in dead_peers.iter() { - self.peers.write().remove(dp); + self.peers.write().unwrap().remove(dp); } } @@ -568,7 +571,7 @@ impl Node { let mut need_keepalive = Vec::new(); // First check all paths in read mode to avoid blocking the entire node. - for (k, path) in self.paths.read().iter() { + for (k, path) in self.paths.read().unwrap().iter() { if host_system.local_socket_is_valid(k.local_socket()) { match path.service(time_ticks) { PathServiceResult::Ok => {} @@ -582,7 +585,7 @@ impl Node { // Lock in write mode and remove dead paths, doing so piecemeal to again avoid blocking. for dp in dead_paths.iter() { - self.paths.write().remove(dp); + self.paths.write().unwrap().remove(dp); } // Finally run keepalive sends as a batch. 
@@ -595,7 +598,7 @@ impl Node { if whois_queue_retry { let need_whois = { let mut need_whois = Vec::new(); - let mut whois_queue = self.whois_queue.lock(); + let mut whois_queue = self.whois_queue.lock().unwrap(); whois_queue.retain(|_, qi| qi.retry_count <= WHOIS_RETRY_COUNT_MAX); for (address, qi) in whois_queue.iter_mut() { if (time_ticks - qi.last_retry_time) >= WHOIS_RETRY_INTERVAL { @@ -811,7 +814,7 @@ impl Node { ) { debug_event!(host_system, "[vl1] [v1] WHOIS {}", address.to_string()); { - let mut whois_queue = self.whois_queue.lock(); + let mut whois_queue = self.whois_queue.lock().unwrap(); let qi = whois_queue.entry(address).or_insert_with(|| WhoisQueueItem:: { v1_proto_waiting_packets: RingBuffer::new(), last_retry_time: 0, @@ -865,11 +868,11 @@ impl Node { ) { if authoritative { if received_identity.validate_identity() { - let mut whois_queue = self.whois_queue.lock(); + let mut whois_queue = self.whois_queue.lock().unwrap(); if let Some(qi) = whois_queue.get_mut(&received_identity.address) { let address = received_identity.address; if inner.should_communicate_with(&received_identity) { - let mut peers = self.peers.write(); + let mut peers = self.peers.write().unwrap(); if let Some(peer) = peers.get(&address).cloned().or_else(|| { Peer::new(&self.identity, received_identity, time_ticks) .map(|p| Arc::new(p)) @@ -893,17 +896,17 @@ impl Node { /// Get the current "best" root from among this node's trusted roots. pub fn best_root(&self) -> Option>> { - self.best_root.read().clone() + self.best_root.read().unwrap().clone() } /// Check whether a peer is a root according to any root set trusted by this node. pub fn is_peer_root(&self, peer: &Peer) -> bool { - self.roots.read().roots.keys().any(|p| p.identity.eq(&peer.identity)) + self.roots.read().unwrap().roots.keys().any(|p| p.identity.eq(&peer.identity)) } /// Returns true if this node is a member of a root set (that it knows about). 
pub fn this_node_is_root(&self) -> bool { - self.roots.read().this_root_sets.is_some() + self.roots.read().unwrap().this_root_sets.is_some() } /// Called when a remote node sends us a root set update, applying the update if it is valid and applicable. @@ -911,7 +914,7 @@ impl Node { /// This will only replace an existing root set with a newer one. It won't add a new root set, which must be /// done by an authorized user or administrator not just by a root. pub(crate) fn remote_update_root_set(&self, received_from: &Identity, rs: Verified) { - let mut roots = self.roots.write(); + let mut roots = self.roots.write().unwrap(); if let Some(entry) = roots.sets.get_mut(&rs.name) { if entry.members.iter().any(|m| m.identity.eq(received_from)) && rs.should_replace(entry) { *entry = rs; @@ -922,7 +925,7 @@ impl Node { /// Add a new root set or update the existing root set if the new root set is newer and otherwise matches. pub fn add_update_root_set(&self, rs: Verified) -> bool { - let mut roots = self.roots.write(); + let mut roots = self.roots.write().unwrap(); if let Some(entry) = roots.sets.get_mut(&rs.name) { if rs.should_replace(entry) { *entry = rs; @@ -940,7 +943,7 @@ impl Node { /// Returns whether or not this node has any root sets defined. pub fn has_roots_defined(&self) -> bool { - self.roots.read().sets.iter().any(|rs| !rs.1.members.is_empty()) + self.roots.read().unwrap().sets.iter().any(|rs| !rs.1.members.is_empty()) } /// Initialize with default roots if there are no roots defined, otherwise do nothing. @@ -954,7 +957,7 @@ impl Node { /// Get the root sets that this node trusts. pub fn root_sets(&self) -> Vec { - self.roots.read().sets.values().cloned().map(|s| s.unwrap()).collect() + self.roots.read().unwrap().sets.values().cloned().map(|s| s.unwrap()).collect() } /// Get the canonical Path object corresponding to an endpoint. 
@@ -965,13 +968,14 @@ impl Node { local_interface: &HostSystemImpl::LocalInterface, time_ticks: i64, ) -> Arc> { - let paths = self.paths.read(); + let paths = self.paths.read().unwrap(); if let Some(path) = paths.get(&PathKey::Ref(ep, local_socket)) { path.clone() } else { drop(paths); self.paths .write() + .unwrap() .entry(PathKey::Copied(ep.clone(), local_socket.clone())) .or_insert_with(|| Arc::new(Path::new(ep.clone(), local_socket.clone(), local_interface.clone(), time_ticks))) .clone() diff --git a/network-hypervisor/src/vl1/path.rs b/network-hypervisor/src/vl1/path.rs index 90659fb00..02efeea62 100644 --- a/network-hypervisor/src/vl1/path.rs +++ b/network-hypervisor/src/vl1/path.rs @@ -3,8 +3,7 @@ use std::collections::HashMap; use std::hash::{BuildHasher, Hasher}; use std::sync::atomic::{AtomicI64, Ordering}; - -use parking_lot::Mutex; +use std::sync::Mutex; use crate::protocol::*; use crate::vl1::endpoint::Endpoint; @@ -65,7 +64,7 @@ impl Path { packet: PooledPacketBuffer, time_ticks: i64, ) -> Option { - let mut fp = self.fragmented_packets.lock(); + let mut fp = self.fragmented_packets.lock().unwrap(); // Discard some old waiting packets if the total incoming fragments for a path exceeds a // sanity limit. This is to prevent memory exhaustion DOS attacks. 
@@ -107,6 +106,7 @@ impl Path { pub(crate) fn service(&self, time_ticks: i64) -> PathServiceResult { self.fragmented_packets .lock() + .unwrap() .retain(|_, frag| (time_ticks - frag.ts_ticks) < v1::FRAGMENT_EXPIRATION); if (time_ticks - self.last_receive_time_ticks.load(Ordering::Relaxed)) < PATH_EXPIRATION_TIME { if (time_ticks - self.last_send_time_ticks.load(Ordering::Relaxed)) >= PATH_KEEPALIVE_INTERVAL { diff --git a/network-hypervisor/src/vl1/peer.rs b/network-hypervisor/src/vl1/peer.rs index 1ebbbd8df..52ee75419 100644 --- a/network-hypervisor/src/vl1/peer.rs +++ b/network-hypervisor/src/vl1/peer.rs @@ -3,9 +3,7 @@ use std::collections::HashMap; use std::hash::Hash; use std::sync::atomic::{AtomicI64, AtomicU64, Ordering}; -use std::sync::{Arc, Weak}; - -use parking_lot::{Mutex, RwLock}; +use std::sync::{Arc, Mutex, RwLock, Weak}; use zerotier_crypto::poly1305; use zerotier_crypto::random; @@ -87,7 +85,7 @@ impl Peer { /// Get the remote version of this peer: major, minor, revision. /// Returns None if it's not yet known. pub fn version(&self) -> Option<(u8, u8, u16)> { - let rv = self.remote_node_info.read().remote_version; + let rv = self.remote_node_info.read().unwrap().remote_version; if rv.0 != 0 || rv.1 != 0 || rv.2 != 0 { Some(rv) } else { @@ -97,7 +95,7 @@ impl Peer { /// Get the remote protocol version of this peer or None if not yet known. pub fn protocol_version(&self) -> Option { - let pv = self.remote_node_info.read().remote_protocol_version; + let pv = self.remote_node_info.read().unwrap().remote_protocol_version; if pv != 0 { Some(pv) } else { @@ -107,7 +105,7 @@ impl Peer { /// Get current best path or None if there are no direct paths to this peer. 
pub fn direct_path(&self) -> Option>> { - for p in self.paths.lock().iter() { + for p in self.paths.lock().unwrap().iter() { let pp = p.path.upgrade(); if pp.is_some() { return pp; @@ -129,7 +127,7 @@ impl Peer { } fn learn_path(&self, host_system: &HostSystemImpl, new_path: &Arc>, time_ticks: i64) { - let mut paths = self.paths.lock(); + let mut paths = self.paths.lock().unwrap(); match &new_path.endpoint { Endpoint::IpUdp(new_ip) => { @@ -194,7 +192,7 @@ impl Peer { pub(crate) fn service(&self, _: &HostSystemImpl, _: &Node, time_ticks: i64) -> bool { // Prune dead paths and sort in descending order of quality. { - let mut paths = self.paths.lock(); + let mut paths = self.paths.lock().unwrap(); paths.retain(|p| ((time_ticks - p.last_receive_time_ticks) < PEER_EXPIRATION_TIME) && (p.path.strong_count() > 0)); if paths.capacity() > 16 { paths.shrink_to_fit(); @@ -205,6 +203,7 @@ impl Peer { // Prune dead entries from the map of reported local endpoints (e.g. externally visible IPs). self.remote_node_info .write() + .unwrap() .reported_local_endpoints .retain(|_, ts| (time_ticks - *ts) < PEER_EXPIRATION_TIME); (time_ticks - self.last_receive_time_ticks.load(Ordering::Relaxed).max(self.create_time_ticks)) < PEER_EXPIRATION_TIME @@ -286,7 +285,7 @@ impl Peer { let max_fragment_size = path.endpoint.max_fragment_size(); - if self.remote_node_info.read().remote_protocol_version >= 12 { + if self.remote_node_info.read().unwrap().remote_protocol_version >= 12 { let flags_cipher_hops = if packet.len() > max_fragment_size { v1::HEADER_FLAG_FRAGMENTED | v1::CIPHER_AES_GMAC_SIV } else { @@ -503,7 +502,7 @@ impl Peer { self.last_receive_time_ticks.store(time_ticks, Ordering::Relaxed); let mut path_is_known = false; - for p in self.paths.lock().iter_mut() { + for p in self.paths.lock().unwrap().iter_mut() { if std::ptr::eq(p.path.as_ptr(), source_path.as_ref()) { p.last_receive_time_ticks = time_ticks; path_is_known = true; @@ -585,7 +584,7 @@ impl Peer { if let Ok(identity) = 
Identity::unmarshal(payload, &mut cursor) { if identity.eq(&self.identity) { { - let mut remote_node_info = self.remote_node_info.write(); + let mut remote_node_info = self.remote_node_info.write().unwrap(); remote_node_info.remote_protocol_version = hello_fixed_headers.version_proto; remote_node_info.remote_version = ( hello_fixed_headers.version_major, @@ -679,6 +678,7 @@ impl Peer { if self .remote_node_info .write() + .unwrap() .reported_local_endpoints .insert(reported_endpoint, time_ticks) .is_none() diff --git a/network-hypervisor/src/vl1/rootset.rs b/network-hypervisor/src/vl1/rootset.rs index f5de2d98a..6c78ad279 100644 --- a/network-hypervisor/src/vl1/rootset.rs +++ b/network-hypervisor/src/vl1/rootset.rs @@ -6,12 +6,11 @@ use std::io::Write; use crate::vl1::identity::{Identity, IDENTITY_MAX_SIGNATURE_SIZE}; use crate::vl1::Endpoint; +use zerotier_crypto::verified::Verified; use zerotier_utils::arrayvec::ArrayVec; use zerotier_utils::buffer::Buffer; use zerotier_utils::marshalable::{Marshalable, UnmarshalError}; -use zerotier_crypto::verified::Verified; - use serde::{Deserialize, Serialize}; /// Description of a member of a root cluster. 
diff --git a/service/Cargo.toml b/service/Cargo.toml index 48cac2007..2e9be8981 100644 --- a/service/Cargo.toml +++ b/service/Cargo.toml @@ -16,7 +16,6 @@ zerotier-utils = { path = "../utils", features = ["tokio"] } zerotier-vl1-service = { path = "../vl1-service" } serde = { version = "^1", features = ["derive"], default-features = false } serde_json = { version = "^1", features = ["std"], default-features = false } -parking_lot = { version = "^0", features = [], default-features = false } clap = { version = "^3", features = ["std", "suggestions"], default-features = false } [target."cfg(windows)".dependencies] diff --git a/utils/Cargo.toml b/utils/Cargo.toml index a66eaa8ae..758b799c8 100644 --- a/utils/Cargo.toml +++ b/utils/Cargo.toml @@ -10,10 +10,9 @@ default = [] tokio = ["dep:tokio"] [dependencies] -parking_lot = { version = "^0", features = [], default-features = false } serde = { version = "^1", features = ["derive"], default-features = false } serde_json = { version = "^1", features = ["std"], default-features = false } -tokio = { version = "^1", default-features = false, features = ["fs", "io-util", "io-std", "net", "parking_lot", "process", "rt", "rt-multi-thread", "signal", "sync", "time"], optional = true } +tokio = { version = "^1", default-features = false, features = ["fs", "io-util", "io-std", "net", "process", "rt", "rt-multi-thread", "signal", "sync", "time"], optional = true } [target."cfg(windows)".dependencies] winapi = { version = "^0", features = ["handleapi", "ws2ipdef", "ws2tcpip"] } diff --git a/utils/src/lib.rs b/utils/src/lib.rs index d515814b2..dd14641b2 100644 --- a/utils/src/lib.rs +++ b/utils/src/lib.rs @@ -40,12 +40,12 @@ pub fn ms_since_epoch() -> i64 { /// Get milliseconds since an arbitrary time in the past, guaranteed to monotonically increase. 
#[inline] pub fn ms_monotonic() -> i64 { - static STARTUP_INSTANT: parking_lot::RwLock> = parking_lot::RwLock::new(None); - let si = *STARTUP_INSTANT.read(); + static STARTUP_INSTANT: std::sync::RwLock> = std::sync::RwLock::new(None); + let si = *STARTUP_INSTANT.read().unwrap(); let instant_zero = if let Some(si) = si { si } else { - *STARTUP_INSTANT.write().get_or_insert(std::time::Instant::now()) + *STARTUP_INSTANT.write().unwrap().get_or_insert(std::time::Instant::now()) }; std::time::Instant::now().duration_since(instant_zero).as_millis() as i64 } diff --git a/utils/src/pool.rs b/utils/src/pool.rs index 583db344b..532d6f3b3 100644 --- a/utils/src/pool.rs +++ b/utils/src/pool.rs @@ -2,9 +2,7 @@ use std::ops::{Deref, DerefMut}; use std::ptr::NonNull; -use std::sync::{Arc, Weak}; - -use parking_lot::Mutex; +use std::sync::{Arc, Mutex, Weak}; /// Each pool requires a factory that creates and resets (for re-use) pooled objects. pub trait PoolFactory { @@ -78,7 +76,7 @@ where fn clone(&self) -> Self { let internal = unsafe { &mut *self.0.as_ptr() }; if let Some(p) = internal.return_pool.upgrade() { - if let Some(o) = p.pool.lock().pop() { + if let Some(o) = p.pool.lock().unwrap().pop() { let mut o = Self(o); *o.as_mut() = self.as_ref().clone(); o @@ -135,7 +133,7 @@ impl> Drop for Pooled { let internal = unsafe { &mut *self.0.as_ptr() }; if let Some(p) = internal.return_pool.upgrade() { p.factory.reset(&mut internal.obj); - p.pool.lock().push(self.0); + p.pool.lock().unwrap().push(self.0); } else { drop(unsafe { Box::from_raw(self.0.as_ptr()) }); } @@ -164,7 +162,7 @@ impl> Pool { /// Get a pooled object, or allocate one if the pool is empty. #[inline] pub fn get(&self) -> Pooled { - if let Some(o) = self.0.pool.lock().pop() { + if let Some(o) = self.0.pool.lock().unwrap().pop() { return Pooled::(o); } return Pooled::(unsafe { @@ -182,7 +180,7 @@ impl> Pool { /// be done to free some memory if there has been a spike in memory use. 
#[inline] pub fn purge(&self) { - for o in self.0.pool.lock().drain(..) { + for o in self.0.pool.lock().unwrap().drain(..) { drop(unsafe { Box::from_raw(o.as_ptr()) }) } } diff --git a/utils/src/reaper.rs b/utils/src/reaper.rs index a197cb5c0..ecfbe0b44 100644 --- a/utils/src/reaper.rs +++ b/utils/src/reaper.rs @@ -7,20 +7,20 @@ use tokio::time::Instant; /// Watches tokio jobs and times them out if they run past a deadline or aborts them all if the reaper is dropped. pub struct Reaper { - q: Arc<(parking_lot::Mutex, Instant)>>, Notify)>, + q: Arc<(std::sync::Mutex, Instant)>>, Notify)>, finisher: JoinHandle<()>, } impl Reaper { pub fn new(runtime: &tokio::runtime::Handle) -> Self { - let q = Arc::new((parking_lot::Mutex::new(VecDeque::with_capacity(16)), Notify::new())); + let q = Arc::new((std::sync::Mutex::new(VecDeque::with_capacity(16)), Notify::new())); Self { q: q.clone(), finisher: runtime.spawn(async move { loop { q.1.notified().await; loop { - let j = q.0.lock().pop_front(); + let j = q.0.lock().unwrap().pop_front(); if let Some(j) = j { let _ = tokio::time::timeout_at(j.1, j.0).await; } else { @@ -35,7 +35,7 @@ impl Reaper { /// Add a job to be executed with timeout at a given instant. #[inline] pub fn add(&self, job: JoinHandle<()>, deadline: Instant) { - self.q.0.lock().push_back((job, deadline)); + self.q.0.lock().unwrap().push_back((job, deadline)); self.q.1.notify_waiters(); } } @@ -44,6 +44,6 @@ impl Drop for Reaper { #[inline] fn drop(&mut self) { self.finisher.abort(); - self.q.0.lock().drain(..).for_each(|j| j.0.abort()); + self.q.0.lock().unwrap().drain(..).for_each(|j| j.0.abort()); } } diff --git a/utils/src/ringbuffermap.rs b/utils/src/ringbuffermap.rs index 9da5c3a31..e9038d181 100644 --- a/utils/src/ringbuffermap.rs +++ b/utils/src/ringbuffermap.rs @@ -1,7 +1,6 @@ // (c) 2020-2022 ZeroTier, Inc. -- currently propritery pending actual release and licensing. See LICENSE.md. 
use std::hash::{Hash, Hasher}; - use std::mem::MaybeUninit; const EMPTY: u16 = 0xffff; diff --git a/vl1-service/Cargo.toml b/vl1-service/Cargo.toml index e0340de82..aec9dca5e 100644 --- a/vl1-service/Cargo.toml +++ b/vl1-service/Cargo.toml @@ -10,7 +10,6 @@ zerotier-network-hypervisor = { path = "../network-hypervisor" } zerotier-crypto = { path = "../crypto" } zerotier-utils = { path = "../utils" } num-traits = "^0" -parking_lot = { version = "^0", features = [], default-features = false } serde = { version = "^1", features = ["derive"], default-features = false } serde_json = { version = "^1", features = ["std"], default-features = false } diff --git a/vl1-service/src/datadir.rs b/vl1-service/src/datadir.rs index 92b8d26d9..cac09d468 100644 --- a/vl1-service/src/datadir.rs +++ b/vl1-service/src/datadir.rs @@ -2,9 +2,8 @@ use std::path::{Path, PathBuf}; use std::str::FromStr; -use std::sync::Arc; +use std::sync::{Arc, Mutex, RwLock}; -use parking_lot::{Mutex, RwLock}; use serde::de::DeserializeOwned; use serde::Serialize; @@ -86,7 +85,7 @@ impl std::io::Result { - let authtoken = self.authtoken.lock().clone(); + let authtoken = self.authtoken.lock().unwrap().clone(); if authtoken.is_empty() { let authtoken_path = self.base_path.join(AUTH_TOKEN_FILENAME); let authtoken_bytes = read_limit(&authtoken_path, 4096); @@ -97,9 +96,9 @@ impl Arc { - self.config.read().clone() + self.config.read().unwrap().clone() } /// Save a modified copy of the configuration and replace the internal copy in this structure (if it's actually changed). 
pub fn save_config(&self, modified_config: Config) -> std::io::Result<()> { - if !modified_config.eq(&self.config.read()) { + if !modified_config.eq(&self.config.read().unwrap()) { let config_data = to_json_pretty(&modified_config); std::fs::write(self.base_path.join(CONFIG_FILENAME), config_data.as_bytes())?; - *self.config.write() = Arc::new(modified_config); + *self.config.write().unwrap() = Arc::new(modified_config); } Ok(()) } diff --git a/vl1-service/src/sys/ipv6.rs b/vl1-service/src/sys/ipv6.rs index b0542f78c..ddf3ba64b 100644 --- a/vl1-service/src/sys/ipv6.rs +++ b/vl1-service/src/sys/ipv6.rs @@ -6,8 +6,8 @@ use zerotier_network_hypervisor::vl1::InetAddress; #[cfg(any(target_os = "macos", target_os = "ios", target_os = "freebsd", target_os = "darwin"))] mod freebsd_like { use num_traits::AsPrimitive; - use parking_lot::Mutex; use std::mem::size_of; + use std::sync::Mutex; use zerotier_network_hypervisor::vl1::InetAddress; static INFO_SOCKET: Mutex<i32> = Mutex::new(-1); @@ -41,7 +41,7 @@ mod freebsd_like { pub fn is_ipv6_temporary(device_name: &str, address: &InetAddress) -> bool { if address.is_ipv6() { unsafe { - let mut info_socket = INFO_SOCKET.lock(); + let mut info_socket = INFO_SOCKET.lock().unwrap(); if *info_socket < 0 { *info_socket = libc::socket(libc::AF_INET6.as_(), libc::SOCK_DGRAM.as_(), 0) as i32; if *info_socket < 0 { diff --git a/vl1-service/src/sys/udp.rs b/vl1-service/src/sys/udp.rs index de9c8d3d2..d457abc92 100644 --- a/vl1-service/src/sys/udp.rs +++ b/vl1-service/src/sys/udp.rs @@ -8,7 +8,7 @@ use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; #[allow(unused_imports)] use std::ptr::{null, null_mut}; use std::sync::atomic::{AtomicBool, AtomicI64, Ordering}; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use crate::localinterface::LocalInterface; @@ -67,7 +67,7 @@ pub struct BoundUdpSocket { pub interface: LocalInterface, last_receive_time: AtomicI64, fd: i32, - lock: parking_lot::RwLock<()>, + lock: RwLock<()>, open: AtomicBool,
} @@ -222,7 +222,7 @@ impl BoundUdpPort { interface: interface.clone(), last_receive_time: AtomicI64::new(i64::MIN), fd, - lock: parking_lot::RwLock::new(()), + lock: RwLock::new(()), open: AtomicBool::new(true), }); diff --git a/vl1-service/src/vl1service.rs b/vl1-service/src/vl1service.rs index 21c66caf3..530c6e4c8 100644 --- a/vl1-service/src/vl1service.rs +++ b/vl1-service/src/vl1service.rs @@ -3,6 +3,7 @@ use std::collections::{HashMap, HashSet}; use std::error::Error; use std::sync::Arc; +use std::sync::RwLock; use std::thread::JoinHandle; use std::time::Duration; @@ -30,7 +31,7 @@ pub struct VL1Service< PathFilterImpl: PathFilter + 'static, InnerProtocolImpl: InnerProtocol + 'static, > { - state: parking_lot::RwLock<VL1ServiceMutableState>, + state: RwLock<VL1ServiceMutableState>, storage: Arc<NodeStorageImpl>, inner: Arc<InnerProtocolImpl>, path_filter: Arc<PathFilterImpl>, @@ -40,7 +41,7 @@ pub struct VL1Service< struct VL1ServiceMutableState { daemons: Vec<JoinHandle<()>>, - udp_sockets: HashMap<u16, parking_lot::RwLock<BoundUdpPort>>, + udp_sockets: HashMap<u16, RwLock<BoundUdpPort>>, settings: VL1Settings, running: bool, } @@ -55,7 +56,7 @@ impl Result<Arc<Self>, Box<dyn Error>> { let mut service = VL1Service { - state: parking_lot::RwLock::new(VL1ServiceMutableState { + state: RwLock::new(VL1ServiceMutableState { daemons: Vec::with_capacity(2), udp_sockets: HashMap::with_capacity(8), settings, @@ -78,7 +79,7 @@ impl Vec<u16> { - self.state.read().udp_sockets.keys().cloned().collect() + self.state.read().unwrap().udp_sockets.keys().cloned().collect() } fn update_udp_bindings(self: &Arc<Self>) { - let state = self.state.read(); + let state = self.state.read().unwrap(); let mut need_fixed_ports: HashSet<u16> = HashSet::from_iter(state.settings.fixed_ports.iter().cloned()); let mut have_random_port_count = 0; for (p, _) in state.udp_sockets.iter() { @@ -105,10 +106,10 @@ impl desired_random_port_count { @@ -116,7 +117,7 @@ impl { fn drop(&mut self) { - let mut state = self.state.write(); + let mut state = self.state.write().unwrap(); state.running = false; state.udp_sockets.clear(); let mut daemons: Vec<JoinHandle<()>> = state.daemons.drain(..).collect(); diff --git
a/vl1-service/src/vl1settings.rs b/vl1-service/src/vl1settings.rs index 363e96a13..0af3e96d0 100644 --- a/vl1-service/src/vl1settings.rs +++ b/vl1-service/src/vl1settings.rs @@ -1,6 +1,7 @@ // (c) 2020-2022 ZeroTier, Inc. -- currently propritery pending actual release and licensing. See LICENSE.md. use serde::{Deserialize, Serialize}; + use zerotier_network_hypervisor::vl1::InetAddress; #[derive(Serialize, Deserialize, Clone, PartialEq, Eq)]