From 36a105ecbf37e96766531fe0d559b8f8766451a2 Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Mon, 20 Jun 2022 15:11:01 -0400 Subject: [PATCH] It now binds and sends packets. They are not correct but they are sent. --- zerotier-core-crypto/src/poly1305.rs | 2 +- zerotier-network-hypervisor/src/event.rs | 2 +- .../src/networkhypervisor.rs | 8 +- zerotier-network-hypervisor/src/util/mod.rs | 6 +- zerotier-network-hypervisor/src/vl1/node.rs | 166 +++++++++++++----- zerotier-network-hypervisor/src/vl1/peer.rs | 17 +- .../src/vl1/protocol.rs | 3 + zerotier-system-service/src/service.rs | 2 +- 8 files changed, 140 insertions(+), 66 deletions(-) diff --git a/zerotier-core-crypto/src/poly1305.rs b/zerotier-core-crypto/src/poly1305.rs index bf6ca7d8e..2ec9f6ea3 100644 --- a/zerotier-core-crypto/src/poly1305.rs +++ b/zerotier-core-crypto/src/poly1305.rs @@ -18,7 +18,7 @@ impl Poly1305 { #[inline(always)] pub fn update(&mut self, data: &[u8]) { - self.0.update(poly1305::Block::from_slice(data)); + self.0.update_padded(data); } #[inline(always)] diff --git a/zerotier-network-hypervisor/src/event.rs b/zerotier-network-hypervisor/src/event.rs index 9dd50217f..1b5a22614 100644 --- a/zerotier-network-hypervisor/src/event.rs +++ b/zerotier-network-hypervisor/src/event.rs @@ -30,7 +30,7 @@ impl ToString for Event { fn to_string(&self) -> String { match self { Event::Online(online) => format!("[vl1] online == {}", online), - Event::Debug(l, f, m) => format!("[debug] {}:{} {}", l, f, m), + Event::Debug(l, f, m) => format!("[debug] {}:{} {}", l.split("/").last().unwrap(), f, m), Event::SecurityWarning(w) => format!("[global] security warning: {}", w), Event::FatalError(e) => format!("[global] FATAL: {}", e), Event::IdentityAutoGenerated(id) => format!("[vl1] identity auto-generated: {}", id.to_string()), diff --git a/zerotier-network-hypervisor/src/networkhypervisor.rs b/zerotier-network-hypervisor/src/networkhypervisor.rs index 3f7b40bfa..98f58afa4 100644 --- a/zerotier-network-hypervisor/src/networkhypervisor.rs +++ b/zerotier-network-hypervisor/src/networkhypervisor.rs @@ -85,11 +85,7 @@ impl NetworkHypervisor { } /// Call add_update_default_root_set if there are no roots defined, otherwise do nothing and return false. - pub fn add_update_default_root_set_if_none(&self) -> bool { - if self.vl1.has_roots_defined() { - false - } else { - self.add_update_default_root_set() - } + pub fn add_update_default_root_set_if_none(&self) { + assert!(self.add_update_default_root_set()); } } diff --git a/zerotier-network-hypervisor/src/util/mod.rs b/zerotier-network-hypervisor/src/util/mod.rs index 50ed13e78..83482fe35 100644 --- a/zerotier-network-hypervisor/src/util/mod.rs +++ b/zerotier-network-hypervisor/src/util/mod.rs @@ -10,15 +10,15 @@ pub use zerotier_core_crypto::varint; pub(crate) const ZEROES: [u8; 64] = [0_u8; 64]; -#[cfg(target_feature = "debug_events")] +#[cfg(feature = "debug_events")] #[allow(unused_macros)] macro_rules! debug_event { ($si:expr, $fmt:expr $(, $($arg:tt)*)?) => { - $si.event(crate::Event::Debug(file!(), line!(), format!($fmt, $($($arg)*)?))).await; + $si.event(crate::Event::Debug(file!(), line!(), format!($fmt, $($($arg)*)?))); } } -#[cfg(not(target_feature = "debug_events"))] +#[cfg(not(feature = "debug_events"))] #[allow(unused_macros)] macro_rules! debug_event { ($si:expr, $fmt:expr $(, $($arg:tt)*)?) => {}; diff --git a/zerotier-network-hypervisor/src/vl1/node.rs b/zerotier-network-hypervisor/src/vl1/node.rs index 454404a3e..dc78f0e36 100644 --- a/zerotier-network-hypervisor/src/vl1/node.rs +++ b/zerotier-network-hypervisor/src/vl1/node.rs @@ -32,7 +32,10 @@ pub trait SystemInterface: Sync + Send + 'static { type LocalInterface: Sync + Send + Sized + Hash + PartialEq + Eq + Clone + ToString; /// An event occurred. - async fn event(&self, event: Event); + /// + /// This isn't async to avoid all kinds of issues in code that deals with locks. If you need + /// it to be async use a channel or something. + fn event(&self, event: Event); /// A USER_MESSAGE packet was received. async fn user_message(&self, source: &Identity, message_type: u64, message: &[u8]); @@ -123,15 +126,17 @@ const ROOT_SYNC_INTERVAL_MS: i64 = 1000; struct BackgroundTaskIntervals { root_sync: IntervalGate<{ ROOT_SYNC_INTERVAL_MS }>, root_hello: IntervalGate<{ ROOT_HELLO_INTERVAL }>, - peers: IntervalGate<{ crate::vl1::peer::SERVICE_INTERVAL_MS }>, - paths: IntervalGate<{ crate::vl1::path::SERVICE_INTERVAL_MS }>, - whois: IntervalGate<{ crate::vl1::whoisqueue::SERVICE_INTERVAL_MS }>, + root_spam_hello: IntervalGate<{ ROOT_HELLO_SPAM_INTERVAL }>, + peer_service: IntervalGate<{ crate::vl1::peer::SERVICE_INTERVAL_MS }>, + path_service: IntervalGate<{ crate::vl1::path::SERVICE_INTERVAL_MS }>, + whois_service: IntervalGate<{ crate::vl1::whoisqueue::SERVICE_INTERVAL_MS }>, } struct RootInfo { roots: HashMap>, Vec>, sets: HashMap, sets_modified: bool, + online: bool, } enum PathKey<'a, SI: SystemInterface> { @@ -224,7 +229,7 @@ impl Node { return Err(InvalidParameterError("no identity found and auto-generate not enabled")); } else { let id = Identity::generate(); - si.event(Event::IdentityAutoGenerated(id.clone())).await; + si.event(Event::IdentityAutoGenerated(id.clone())); si.save_node_identity(&id).await; id } @@ -237,10 +242,12 @@ impl Node { let old = id.clone(); if id.upgrade()? { si.save_node_identity(&id).await; - si.event(Event::IdentityAutoUpgraded(old, id.clone())).await; + si.event(Event::IdentityAutoUpgraded(old, id.clone())); } } + debug_event!(si, "[vl1] loaded identity {}", id.to_string()); + Ok(Self { instance_id: zerotier_core_crypto::random::get_bytes_secure(), identity: id, @@ -251,6 +258,7 @@ impl Node { roots: HashMap::new(), sets: HashMap::new(), sets_modified: false, + online: false, }), best_root: RwLock::new(None), whois: WhoisQueue::new(), @@ -267,13 +275,79 @@ impl Node { self.peers.read().get(&a).cloned() } + pub fn is_online(&self) -> bool { + self.roots.read().online + } + + fn update_best_root(&self, si: &SI, time_ticks: i64) { + let roots = self.roots.read(); + + // The best root is the one that has replied to a HELLO most recently. Since we send HELLOs in unison + // this is a proxy for latency and also causes roots that fail to reply to drop out quickly. + let mut best: Option<&Arc>> = None; + let mut latest_hello_reply = 0; + for (r, _) in roots.roots.iter() { + let t = r.last_hello_reply_time_ticks.load(Ordering::Relaxed); + if t > latest_hello_reply { + latest_hello_reply = t; + let _ = best.insert(r); + } + } + + if let Some(best) = best { + let mut best_root = self.best_root.write(); + if let Some(best_root) = best_root.as_mut() { + if !Arc::ptr_eq(best_root, best) { + debug_event!(si, "[vl1] new best root: {} (replaced {})", best.identity.address.to_string(), best_root.identity.address.to_string()); + *best_root = best.clone(); + } + } else { + debug_event!(si, "[vl1] new best root: {} (was empty)", best.identity.address.to_string()); + let _ = best_root.insert(best.clone()); + } + } else { + if let Some(old_best) = self.best_root.write().take() { + debug_event!(si, "[vl1] new best root: NONE (replaced {})", old_best.identity.address.to_string()); + } + } + + // Determine if the node is online by whether there is a currently reachable root. + if (time_ticks - latest_hello_reply) < (ROOT_HELLO_INTERVAL * 2) && best.is_some() { + if !roots.online { + drop(roots); + self.roots.write().online = true; + si.event(Event::Online(true)); + } + } else if roots.online { + drop(roots); + self.roots.write().online = false; + si.event(Event::Online(false)); + } + } + pub async fn do_background_tasks(&self, si: &SI) -> Duration { let tt = si.time_ticks(); - let (root_sync, root_hello, peer_check, path_check, whois_check) = { + let (root_sync, root_hello, mut root_spam_hello, peer_service, path_service, whois_service) = { let mut intervals = self.intervals.lock(); - (intervals.root_sync.gate(tt), intervals.root_hello.gate(tt), intervals.peers.gate(tt), intervals.paths.gate(tt), intervals.whois.gate(tt)) + (intervals.root_sync.gate(tt), intervals.root_hello.gate(tt), intervals.root_spam_hello.gate(tt), intervals.peer_service.gate(tt), intervals.path_service.gate(tt), intervals.whois_service.gate(tt)) }; + // We only "spam" if we are offline. + if root_spam_hello && self.is_online() { + root_spam_hello = false; + } + + debug_event!( + si, + "[vl1] do_background_tasks:{}{}{}{}{}{}", + if root_sync { " root_sync" } else { "" }, + if root_hello { " root_hello" } else { "" }, + if root_spam_hello { " root_spam_hello" } else { "" }, + if peer_service { " peer_service" } else { "" }, + if path_service { " path_service" } else { "" }, + if whois_service { " whois_service" } else { "" }, + ); + if root_sync { if { let mut roots = self.roots.write(); @@ -284,6 +358,8 @@ impl Node { false } } { + debug_event!(si, "[vl1] root sets modified, synchronizing internal data structures"); + let (mut old_root_identities, address_collisions, new_roots, bad_identities) = { let roots = self.roots.read(); @@ -309,11 +385,12 @@ impl Node { for (_, rc) in roots.sets.iter() { for m in rc.members.iter() { if m.endpoints.is_some() && !address_collisions.contains(&m.identity.address) { + debug_event!(si, "[vl1] examining root {} with {} endpoints", m.identity.address.to_string(), m.endpoints.as_ref().map_or(0, |e| e.len())); let peers = self.peers.upgradable_read(); if let Some(peer) = peers.get(&m.identity.address) { new_roots.insert(peer.clone(), m.endpoints.as_ref().unwrap().iter().cloned().collect()); } else { - if let Some(peer) = Peer::::new(&self.identity, m.identity.clone(), si.time_clock()) { + if let Some(peer) = Peer::::new(&self.identity, m.identity.clone(), si.time_clock(), tt) { new_roots.insert(parking_lot::RwLockUpgradableReadGuard::upgrade(peers).entry(m.identity.address).or_insert_with(|| Arc::new(peer)).clone(), m.endpoints.as_ref().unwrap().iter().cloned().collect()); } else { bad_identities.push(m.identity.clone()); @@ -327,10 +404,10 @@ impl Node { }; for c in address_collisions.iter() { - si.event(Event::SecurityWarning(format!("address/identity collision in root sets! address {} collides across root sets or with an existing peer and is being ignored as a root!", c.to_string()))).await; + si.event(Event::SecurityWarning(format!("address/identity collision in root sets! address {} collides across root sets or with an existing peer and is being ignored as a root!", c.to_string()))); } for i in bad_identities.iter() { - si.event(Event::SecurityWarning(format!("bad identity detected for address {} in at least one root set, ignoring (error creating peer object)", i.address.to_string()))).await; + si.event(Event::SecurityWarning(format!("bad identity detected for address {} in at least one root set, ignoring (error creating peer object)", i.address.to_string()))); } let mut new_root_identities: Vec = new_roots.iter().map(|(p, _)| p.identity.clone()).collect(); @@ -338,37 +415,19 @@ impl Node { old_root_identities.sort_unstable(); new_root_identities.sort_unstable(); if !old_root_identities.eq(&new_root_identities) { - let mut best: Option>> = None; - - { - let mut roots = self.roots.write(); - roots.roots = new_roots; - - // The best root is the one that has replied to a HELLO most recently. Since we send HELLOs in unison - // this is a proxy for latency and also causes roots that fail to reply to drop out quickly. - let mut latest_hello_reply = 0; - for (r, _) in roots.roots.iter() { - let t = r.last_hello_reply_time_ticks.load(Ordering::Relaxed); - if t >= latest_hello_reply { - latest_hello_reply = t; - let _ = best.insert(r.clone()); - } - } - } - - *(self.best_root.write()) = best; - - //debug_event!(si, "new best root: {}", best.as_ref().map_or("none".into(), |p| p.identity.address.to_string())); - //si.event(Event::UpdatedRoots(old_root_identities, new_root_identities)).await; + self.roots.write().roots = new_roots; + si.event(Event::UpdatedRoots(old_root_identities, new_root_identities)); } } + + self.update_best_root(si, tt); } // Say HELLO to all roots periodically. For roots we send HELLO to every single endpoint // they have, which is a behavior that differs from normal peers. This allows roots to // e.g. see our IPv4 and our IPv6 address which can be important for us to learn our // external addresses from them. - if root_hello { + if root_hello || root_spam_hello { let roots = { let roots = self.roots.read(); let mut roots_copy = Vec::with_capacity(roots.roots.len()); @@ -385,7 +444,7 @@ impl Node { } } - if peer_check { + if peer_service { // Service all peers, removing any whose service() method returns false AND that are not // roots. Roots on the other hand remain in the peer list as long as they are roots. let mut dead_peers = Vec::new(); @@ -402,7 +461,7 @@ impl Node { } } - if path_check { + if path_service { // Service all paths, removing expired or invalid ones. This is done in two passes to // avoid introducing latency into a flow. let mut dead_paths = Vec::new(); @@ -428,18 +487,19 @@ impl Node { } } - if whois_check { + if whois_service { self.whois.service(si, self, tt); } - Duration::from_millis((ROOT_SYNC_INTERVAL_MS.min(crate::vl1::whoisqueue::SERVICE_INTERVAL_MS).min(crate::vl1::path::SERVICE_INTERVAL_MS).min(crate::vl1::peer::SERVICE_INTERVAL_MS) as u64) / 2) + Duration::from_millis(1000) } pub async fn handle_incoming_physical_packet(&self, si: &SI, ph: &PH, source_endpoint: &Endpoint, source_local_socket: &SI::LocalSocket, source_local_interface: &SI::LocalInterface, mut data: PooledPacketBuffer) { debug_event!( si, - "<< #{} ->{} from {} length {} via socket {}@{}", + "[vl1] #{} {}->{} via {} length {} via socket {}@{}", data.bytes_fixed_at::<8>(0).map_or("????????????????".into(), |pid| zerotier_core_crypto::hex::to_string(pid)), + data.bytes_fixed_at::<5>(13).map_or("??????????".into(), |src| zerotier_core_crypto::hex::to_string(src)), data.bytes_fixed_at::<5>(8).map_or("??????????".into(), |dest| zerotier_core_crypto::hex::to_string(dest)), source_endpoint.to_string(), data.len(), @@ -455,11 +515,13 @@ impl Node { path.log_receive_anything(time_ticks); if fragment_header.is_fragment() { + #[cfg(debug_assertions)] + let fragment_header_id = u64::from_be_bytes(fragment_header.id); debug_event!(si, "-- #{:0>16x} fragment {} of {} received", u64::from_be_bytes(fragment_header.id), fragment_header.fragment_no(), fragment_header.total_fragments()); if let Some(assembled_packet) = path.receive_fragment(fragment_header.packet_id(), fragment_header.fragment_no(), fragment_header.total_fragments(), data, time_ticks) { if let Some(frag0) = assembled_packet.frags[0].as_ref() { - debug_event!(si, "-- #{:0>16x} packet fully assembled!", u64::from_be_bytes(fragment_header.id)); + debug_event!(si, "-- #{:0>16x} packet fully assembled!", fragment_header_id); if let Ok(packet_header) = frag0.struct_at::(0) { if let Some(source) = Address::from_bytes(&packet_header.src) { @@ -473,8 +535,9 @@ impl Node { } } } else { + #[cfg(debug_assertions)] if let Ok(packet_header) = data.struct_at::(0) { - debug_event!(si, "-- #{:0>16x} is unfragmented", u64::from_be_bytes(fragment_header.id)); + debug_event!(si, "-- #{:0>16x} is unfragmented", u64::from_be_bytes(packet_header.id)); if let Some(source) = Address::from_bytes(&packet_header.src) { if let Some(peer) = self.peer(source) { @@ -486,17 +549,28 @@ impl Node { } } } else { + #[cfg(debug_assertions)] + let debug_packet_id; + if fragment_header.is_fragment() { - debug_event!(si, "-- #{:0>16x} forwarding packet fragment to {}", u64::from_be_bytes(fragment_header.id), dest.to_string()); + #[cfg(debug_assertions)] + { + debug_packet_id = u64::from_be_bytes(fragment_header.id); + } + debug_event!(si, "-- #{:0>16x} forwarding packet fragment to {}", debug_packet_id, dest.to_string()); if fragment_header.increment_hops() > FORWARD_MAX_HOPS { - debug_event!(si, "-- #{:0>16x} discarded: max hops exceeded!", u64::from_be_bytes(fragment_header.id)); + debug_event!(si, "-- #{:0>16x} discarded: max hops exceeded!", debug_packet_id); return; } } else { if let Ok(packet_header) = data.struct_mut_at::(0) { - debug_event!(si, "-- #{:0>16x} forwarding packet to {}", u64::from_be_bytes(fragment_header.id), dest.to_string()); + #[cfg(debug_assertions)] + { + debug_packet_id = u64::from_be_bytes(packet_header.id); + } + debug_event!(si, "-- #{:0>16x} forwarding packet to {}", debug_packet_id, dest.to_string()); if packet_header.increment_hops() > FORWARD_MAX_HOPS { - debug_event!(si, "-- #{:0>16x} discarded: max hops exceeded!", u64::from_be_bytes(fragment_header.id)); + debug_event!(si, "-- #{:0>16x} discarded: max hops exceeded!", u64::from_be_bytes(packet_header.id)); return; } } else { @@ -507,7 +581,7 @@ impl Node { if let Some(peer) = self.peer(dest) { // TODO: SHOULD we forward? Need a way to check. peer.forward(si, time_ticks, data.as_ref()).await; - debug_event!(si, "-- #{:0>16x} forwarded successfully", u64::from_be_bytes(fragment_header.id)); + debug_event!(si, "-- #{:0>16x} forwarded successfully", debug_packet_id); } } } diff --git a/zerotier-network-hypervisor/src/vl1/peer.rs b/zerotier-network-hypervisor/src/vl1/peer.rs index 30f8d0622..a4ef3ecab 100644 --- a/zerotier-network-hypervisor/src/vl1/peer.rs +++ b/zerotier-network-hypervisor/src/vl1/peer.rs @@ -52,6 +52,7 @@ pub struct Peer { last_receive_time_ticks: AtomicI64, pub(crate) last_hello_reply_time_ticks: AtomicI64, last_forward_time_ticks: AtomicI64, + create_time_ticks: i64, // Counter for assigning sequential message IDs. message_id_counter: AtomicU64, @@ -164,7 +165,7 @@ impl Peer { /// /// This only returns None if this_node_identity does not have its secrets or if some /// fatal error occurs performing key agreement between the two identities. - pub(crate) fn new(this_node_identity: &Identity, id: Identity, time_clock: i64) -> Option> { + pub(crate) fn new(this_node_identity: &Identity, id: Identity, time_clock: i64, time_ticks: i64) -> Option> { this_node_identity.agree(&id).map(|static_secret| -> Self { Self { identity: id, @@ -175,6 +176,7 @@ impl Peer { last_receive_time_ticks: AtomicI64::new(0), last_forward_time_ticks: AtomicI64::new(0), last_hello_reply_time_ticks: AtomicI64::new(0), + create_time_ticks: time_ticks, message_id_counter: AtomicU64::new(((time_clock as u64) / 100).wrapping_shl(28) ^ next_u64_secure().wrapping_shr(36)), remote_version: AtomicU64::new(0), remote_protocol_version: AtomicU8::new(0), @@ -244,17 +246,15 @@ impl Peer { paths.retain(|p| ((time_ticks - p.last_receive_time_ticks) < PEER_EXPIRATION_TIME) && (p.path.strong_count() > 0)); Self::prioritize_paths(&mut paths); } - if (time_ticks - self.last_receive_time_ticks.load(Ordering::Relaxed)) < PEER_EXPIRATION_TIME { - true - } else { - false - } + (time_ticks - self.last_receive_time_ticks.load(Ordering::Relaxed).max(self.create_time_ticks)) < PEER_EXPIRATION_TIME } /// Receive, decrypt, authenticate, and process an incoming packet from this peer. /// /// If the packet comes in multiple fragments, the fragments slice should contain all /// those fragments after the main packet header and first chunk. + /// + /// This returns true if the packet decrypted and passed authentication. pub(crate) async fn receive(&self, node: &Node, si: &SI, ph: &PH, time_ticks: i64, source_path: &Arc>, header: &PacketHeader, frag0: &PacketBuffer, fragments: &[Option]) { if let Ok(packet_frag0_payload_bytes) = frag0.as_bytes_starting_at(packet_constants::VERB_INDEX) { //let mut payload = unsafe { PacketBuffer::new_without_memzero() }; @@ -286,8 +286,6 @@ impl Peer { } if let Ok(mut verb) = payload.u8_at(0) { - self.last_receive_time_ticks.store(time_ticks, Ordering::Relaxed); - let extended_authentication = (verb & packet_constants::VERB_FLAG_EXTENDED_AUTHENTICATION) != 0; if extended_authentication { if payload.len() >= SHA512_HASH_SIZE { @@ -320,6 +318,8 @@ impl Peer { } } + self.last_receive_time_ticks.store(time_ticks, Ordering::Relaxed); + let mut path_is_known = false; for p in self.paths.lock().iter_mut() { if p.path_internal_instance_id == source_path.internal_instance_id { @@ -609,6 +609,7 @@ impl Peer { match ok_header.in_re_verb { verbs::VL1_HELLO => { // TODO + self.last_hello_reply_time_ticks.store(time_ticks, Ordering::Relaxed); } verbs::VL1_WHOIS => {} _ => { diff --git a/zerotier-network-hypervisor/src/vl1/protocol.rs b/zerotier-network-hypervisor/src/vl1/protocol.rs index 54269f31a..758a853e3 100644 --- a/zerotier-network-hypervisor/src/vl1/protocol.rs +++ b/zerotier-network-hypervisor/src/vl1/protocol.rs @@ -247,6 +247,9 @@ pub const PATH_EXPIRATION_TIME: i64 = (PATH_KEEPALIVE_INTERVAL * 2) + 10000; /// How often to send HELLOs to roots, which is more often than normal peers. pub const ROOT_HELLO_INTERVAL: i64 = PATH_KEEPALIVE_INTERVAL * 2; +/// How often to send HELLOs to roots when we are offline. +pub const ROOT_HELLO_SPAM_INTERVAL: i64 = 5000; + /// How often to send HELLOs to regular peers. pub const PEER_HELLO_INTERVAL_MAX: i64 = 300000; diff --git a/zerotier-system-service/src/service.rs b/zerotier-system-service/src/service.rs index 2612bb836..bd3949fae 100644 --- a/zerotier-system-service/src/service.rs +++ b/zerotier-system-service/src/service.rs @@ -157,7 +157,7 @@ impl SystemInterface for ServiceImpl { type LocalSocket = crate::service::LocalSocket; type LocalInterface = crate::localinterface::LocalInterface; - async fn event(&self, event: Event) { + fn event(&self, event: Event) { println!("{}", event.to_string()); match event { _ => {}