From 3e713360e358fbde541ff434305bfc5d5d4e506c Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Thu, 13 Oct 2022 13:45:07 -0400 Subject: [PATCH] Ease up some template restrictions and wire up some more stuff in controller. --- controller/src/handler.rs | 77 +++++++++----- controller/src/main.rs | 3 + network-hypervisor/src/vl1/node.rs | 154 +++++++++++++-------------- network-hypervisor/src/vl1/path.rs | 4 +- network-hypervisor/src/vl1/peer.rs | 26 ++--- network-hypervisor/src/vl2/switch.rs | 6 +- vl1-service/src/vl1service.rs | 3 +- 7 files changed, 148 insertions(+), 125 deletions(-) diff --git a/controller/src/handler.rs b/controller/src/handler.rs index 47fe7d7d7..29b8d6bc0 100644 --- a/controller/src/handler.rs +++ b/controller/src/handler.rs @@ -1,7 +1,7 @@ // (c) 2020-2022 ZeroTier, Inc. -- currently propritery pending actual release and licensing. See LICENSE.md. use std::error::Error; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use tokio::time::{Duration, Instant}; @@ -13,52 +13,42 @@ use zerotier_utils::error::{InvalidParameterError, UnexpectedError}; use zerotier_utils::ms_since_epoch; use zerotier_utils::reaper::Reaper; use zerotier_utils::tokio; +use zerotier_vl1_service::VL1Service; use crate::database::*; use crate::model::{AuthorizationResult, Member, CREDENTIAL_WINDOW_SIZE_DEFAULT}; +// A netconf per-query task timeout, just a sanity limit. const REQUEST_TIMEOUT: Duration = Duration::from_secs(10); +/// ZeroTier VL2 network controller packet handler, answers VL2 netconf queries. pub struct Handler { inner: Arc>, - change_watcher: Option>, } struct Inner { reaper: Reaper, + daemons: Mutex>>, // drop() aborts these runtime: tokio::runtime::Handle, database: Arc, local_identity: Identity, } impl Handler { + /// Start an inner protocol handler answer ZeroTier VL2 network controller queries. pub async fn new(database: Arc, runtime: tokio::runtime::Handle) -> Result, Box> { if let Some(local_identity) = database.load_node_identity() { assert!(local_identity.secret.is_some()); let inner = Arc::new(Inner:: { reaper: Reaper::new(&runtime), + daemons: Mutex::new(Vec::with_capacity(1)), runtime, database: database.clone(), local_identity, }); - let h = Arc::new(Self { - inner: inner.clone(), - change_watcher: database.changes().await.map(|mut ch| { - let inner2 = inner.clone(); - inner.runtime.spawn(async move { - loop { - if let Ok(change) = ch.recv().await { - inner2.reaper.add( - inner2.runtime.spawn(inner2.clone().handle_change_notification(change)), - Instant::now().checked_add(REQUEST_TIMEOUT).unwrap(), - ); - } - } - }) - }), - }); + let h = Arc::new(Self { inner: inner.clone() }); Ok(h) } else { @@ -67,16 +57,33 @@ impl Handler { ))) } } -} -impl Drop for Handler { - fn drop(&mut self) { - let _ = self.change_watcher.take().map(|w| w.abort()); + /// Start a change watcher to respond to changes detected by the database. + /// This should only be called once, though multiple calls won't do anything but create unnecessary async tasks. + pub async fn start_change_watcher(&self, service: &Arc>) { + if let Some(cw) = self.inner.database.changes().await.map(|mut ch| { + let inner = self.inner.clone(); + let service = service.clone(); + self.inner.runtime.spawn(async move { + loop { + if let Ok(change) = ch.recv().await { + inner.reaper.add( + inner + .runtime + .spawn(inner.clone().handle_change_notification(service.clone(), change)), + Instant::now().checked_add(REQUEST_TIMEOUT).unwrap(), + ); + } + } + }) + }) { + self.inner.daemons.lock().unwrap().push(cw); + } } } impl PathFilter for Handler { - fn should_use_physical_path( + fn should_use_physical_path( &self, _id: &Identity, _endpoint: &zerotier_network_hypervisor::vl1::Endpoint, @@ -86,7 +93,7 @@ impl PathFilter for Handler { true } - fn get_path_hints( + fn get_path_hints( &self, _id: &Identity, ) -> Option< @@ -101,7 +108,7 @@ impl PathFilter for Handler { } impl InnerProtocol for Handler { - fn handle_packet( + fn handle_packet( &self, _node: &Node, source: &Arc>, @@ -177,7 +184,7 @@ impl InnerProtocol for Handler { } } - fn handle_error( + fn handle_error( &self, _node: &Node, _source: &Arc>, @@ -192,7 +199,7 @@ impl InnerProtocol for Handler { PacketHandlerResult::NotHandled } - fn handle_ok( + fn handle_ok( &self, _node: &Node, _source: &Arc>, @@ -212,11 +219,15 @@ impl InnerProtocol for Handler { } impl Inner { - async fn handle_change_notification(self: Arc, _change: Change) { + async fn handle_change_notification( + self: Arc, + service: Arc, Handler>>, + _change: Change, + ) { todo!() } - async fn handle_network_config_request( + async fn handle_network_config_request( self: Arc, source: Arc>, _source_path: Arc>, @@ -339,3 +350,11 @@ impl Inner { Ok((authorization_result, nc)) } } + +impl Drop for Inner { + fn drop(&mut self) { + for h in self.daemons.lock().unwrap().drain(..) { + h.abort(); + } + } +} diff --git a/controller/src/main.rs b/controller/src/main.rs index 750fe6814..c920238a5 100644 --- a/controller/src/main.rs +++ b/controller/src/main.rs @@ -22,15 +22,18 @@ async fn run(database: Arc, runtime: &Runt exitcode::ERR_CONFIG } else { let handler = handler.unwrap(); + let svc = VL1Service::new( database.clone(), handler.clone(), handler.clone(), zerotier_vl1_service::VL1Settings::default(), ); + if svc.is_ok() { let svc = svc.unwrap(); svc.node().init_default_roots(); + handler.start_change_watcher(&svc).await; // Wait for kill signal on Unix-like platforms. #[cfg(unix)] diff --git a/network-hypervisor/src/vl1/node.rs b/network-hypervisor/src/vl1/node.rs index e73b5ce3f..3ca60aea3 100644 --- a/network-hypervisor/src/vl1/node.rs +++ b/network-hypervisor/src/vl1/node.rs @@ -31,10 +31,10 @@ use zerotier_utils::ringbuffer::RingBuffer; /// during calls to things like wire_recieve() and do_background_tasks(). pub trait HostSystem: Sync + Send + 'static { /// Type for local system sockets. - type LocalSocket: Sync + Send + Hash + PartialEq + Eq + Clone + ToString + 'static; + type LocalSocket: Sync + Send + Hash + PartialEq + Eq + Clone + ToString + Sized + 'static; /// Type for local system interfaces. - type LocalInterface: Sync + Send + Hash + PartialEq + Eq + Clone + ToString; + type LocalInterface: Sync + Send + Hash + PartialEq + Eq + Clone + ToString + Sized; /// A VL1 level event occurred. fn event(&self, event: Event); @@ -91,7 +91,7 @@ pub trait NodeStorage: Sync + Send + 'static { /// Trait to be implemented to provide path hints and a filter to approve physical paths. pub trait PathFilter: Sync + Send + 'static { /// Called to check and see if a physical address should be used for ZeroTier traffic to a node. - fn should_use_physical_path( + fn should_use_physical_path( &self, id: &Identity, endpoint: &Endpoint, @@ -100,7 +100,7 @@ pub trait PathFilter: Sync + Send + 'static { ) -> bool; /// Called to look up any statically defined or memorized paths to known nodes. - fn get_path_hints( + fn get_path_hints( &self, id: &Identity, ) -> Option< @@ -132,7 +132,7 @@ pub trait InnerProtocol: Sync + Send + 'static { /// Handle a packet, returning true if it was handled by the next layer. /// /// Do not attempt to handle OK or ERROR. Instead implement handle_ok() and handle_error(). - fn handle_packet( + fn handle_packet( &self, node: &Node, source: &Arc>, @@ -143,7 +143,7 @@ pub trait InnerProtocol: Sync + Send + 'static { ) -> PacketHandlerResult; /// Handle errors, returning true if the error was recognized. - fn handle_error( + fn handle_error( &self, node: &Node, source: &Arc>, @@ -157,7 +157,7 @@ pub trait InnerProtocol: Sync + Send + 'static { ) -> PacketHandlerResult; /// Handle an OK, returing true if the OK was recognized. - fn handle_ok( + fn handle_ok( &self, node: &Node, source: &Arc>, @@ -176,7 +176,7 @@ pub trait InnerProtocol: Sync + Send + 'static { /// How often to check the root cluster definitions against the root list and update. const ROOT_SYNC_INTERVAL_MS: i64 = 1000; -struct RootInfo { +struct RootInfo { /// Root sets to which we are a member. sets: HashMap>, @@ -206,14 +206,14 @@ struct BackgroundTaskIntervals { } /// WHOIS requests and any packets that are waiting on them to be decrypted and authenticated. -struct WhoisQueueItem { +struct WhoisQueueItem { v1_proto_waiting_packets: RingBuffer<(Weak>, PooledPacketBuffer), WHOIS_MAX_WAITING_PACKETS>, last_retry_time: i64, retry_count: u16, } /// A ZeroTier VL1 node that can communicate securely with the ZeroTier peer-to-peer network. -pub struct Node { +pub struct Node { /// A random ID generated to identify this particular running instance. /// /// This can be used to implement multi-homing by allowing remote nodes to distinguish instances @@ -242,8 +242,8 @@ pub struct Node { whois_queue: Mutex>>, } -impl Node { - pub fn new( +impl Node { + pub fn new( host_system: &HostSystemImpl, storage: &NodeStorageImpl, auto_generate_identity: bool, @@ -301,6 +301,58 @@ impl Node { self.roots.read().unwrap().online } + /// Get the current "best" root from among this node's trusted roots. + pub fn best_root(&self) -> Option>> { + self.best_root.read().unwrap().clone() + } + + /// Check whether a peer is a root according to any root set trusted by this node. + pub fn is_peer_root(&self, peer: &Peer) -> bool { + self.roots.read().unwrap().roots.keys().any(|p| p.identity.eq(&peer.identity)) + } + + /// Returns true if this node is a member of a root set (that it knows about). + pub fn this_node_is_root(&self) -> bool { + self.roots.read().unwrap().this_root_sets.is_some() + } + + /// Add a new root set or update the existing root set if the new root set is newer and otherwise matches. + pub fn add_update_root_set(&self, rs: Verified) -> bool { + let mut roots = self.roots.write().unwrap(); + if let Some(entry) = roots.sets.get_mut(&rs.name) { + if rs.should_replace(entry) { + *entry = rs; + roots.sets_modified = true; + true + } else { + false + } + } else { + let _ = roots.sets.insert(rs.name.clone(), rs); + roots.sets_modified = true; + true + } + } + + /// Returns whether or not this node has any root sets defined. + pub fn has_roots_defined(&self) -> bool { + self.roots.read().unwrap().sets.iter().any(|rs| !rs.1.members.is_empty()) + } + + /// Initialize with default roots if there are no roots defined, otherwise do nothing. + pub fn init_default_roots(&self) -> bool { + if !self.has_roots_defined() { + self.add_update_root_set(RootSet::zerotier_default()) + } else { + false + } + } + + /// Get the root sets that this node trusts. + pub fn root_sets(&self) -> Vec { + self.roots.read().unwrap().sets.values().cloned().map(|s| s.unwrap()).collect() + } + pub fn do_background_tasks(&self, host_system: &HostSystemImpl) -> Duration { const INTERVAL_MS: i64 = 1000; const INTERVAL: Duration = Duration::from_millis(INTERVAL_MS as u64); @@ -624,7 +676,7 @@ impl Node { INTERVAL } - pub fn handle_incoming_physical_packet( + pub fn handle_incoming_physical_packet( &self, host_system: &HostSystemImpl, inner: &InnerProtocolImpl, @@ -864,7 +916,7 @@ impl Node { } /// Called by Peer when an identity is received from another node, e.g. via OK(WHOIS). - pub(crate) fn handle_incoming_identity( + pub(crate) fn handle_incoming_identity( &self, host_system: &HostSystemImpl, inner: &InnerProtocolImpl, @@ -900,26 +952,11 @@ impl Node { } } - /// Get the current "best" root from among this node's trusted roots. - pub fn best_root(&self) -> Option>> { - self.best_root.read().unwrap().clone() - } - - /// Check whether a peer is a root according to any root set trusted by this node. - pub fn is_peer_root(&self, peer: &Peer) -> bool { - self.roots.read().unwrap().roots.keys().any(|p| p.identity.eq(&peer.identity)) - } - - /// Returns true if this node is a member of a root set (that it knows about). - pub fn this_node_is_root(&self) -> bool { - self.roots.read().unwrap().this_root_sets.is_some() - } - /// Called when a remote node sends us a root set update, applying the update if it is valid and applicable. /// /// This will only replace an existing root set with a newer one. It won't add a new root set, which must be /// done by an authorized user or administrator not just by a root. - pub(crate) fn remote_update_root_set(&self, received_from: &Identity, rs: Verified) { + pub(crate) fn on_remote_update_root_set(&self, received_from: &Identity, rs: Verified) { let mut roots = self.roots.write().unwrap(); if let Some(entry) = roots.sets.get_mut(&rs.name) { if entry.members.iter().any(|m| m.identity.eq(received_from)) && rs.should_replace(entry) { @@ -929,43 +966,6 @@ impl Node { } } - /// Add a new root set or update the existing root set if the new root set is newer and otherwise matches. - pub fn add_update_root_set(&self, rs: Verified) -> bool { - let mut roots = self.roots.write().unwrap(); - if let Some(entry) = roots.sets.get_mut(&rs.name) { - if rs.should_replace(entry) { - *entry = rs; - roots.sets_modified = true; - true - } else { - false - } - } else { - let _ = roots.sets.insert(rs.name.clone(), rs); - roots.sets_modified = true; - true - } - } - - /// Returns whether or not this node has any root sets defined. - pub fn has_roots_defined(&self) -> bool { - self.roots.read().unwrap().sets.iter().any(|rs| !rs.1.members.is_empty()) - } - - /// Initialize with default roots if there are no roots defined, otherwise do nothing. - pub fn init_default_roots(&self) -> bool { - if !self.has_roots_defined() { - self.add_update_root_set(RootSet::zerotier_default()) - } else { - false - } - } - - /// Get the root sets that this node trusts. - pub fn root_sets(&self) -> Vec { - self.roots.read().unwrap().sets.values().cloned().map(|s| s.unwrap()).collect() - } - /// Get the canonical Path object corresponding to an endpoint. pub(crate) fn canonical_path( &self, @@ -991,12 +991,12 @@ impl Node { /// Key used to look up paths in a hash map /// This supports copied keys for storing and refs for fast lookup without having to copy anything. -enum PathKey<'a, 'b, HostSystemImpl: HostSystem> { +enum PathKey<'a, 'b, HostSystemImpl: HostSystem + ?Sized> { Copied(Endpoint, HostSystemImpl::LocalSocket), Ref(&'a Endpoint, &'b HostSystemImpl::LocalSocket), } -impl<'a, 'b, HostSystemImpl: HostSystem> Hash for PathKey<'a, 'b, HostSystemImpl> { +impl<'a, 'b, HostSystemImpl: HostSystem + ?Sized> Hash for PathKey<'a, 'b, HostSystemImpl> { fn hash(&self, state: &mut H) { match self { Self::Copied(ep, ls) => { @@ -1011,7 +1011,7 @@ impl<'a, 'b, HostSystemImpl: HostSystem> Hash for PathKey<'a, 'b, HostSystemImpl } } -impl PartialEq for PathKey<'_, '_, HostSystemImpl> { +impl PartialEq for PathKey<'_, '_, HostSystemImpl> { fn eq(&self, other: &Self) -> bool { match (self, other) { (Self::Copied(ep1, ls1), Self::Copied(ep2, ls2)) => ep1.eq(ep2) && ls1.eq(ls2), @@ -1022,9 +1022,9 @@ impl PartialEq for PathKey<'_, '_, HostSystemImpl> { } } -impl Eq for PathKey<'_, '_, HostSystemImpl> {} +impl Eq for PathKey<'_, '_, HostSystemImpl> {} -impl<'a, 'b, HostSystemImpl: HostSystem> PathKey<'a, 'b, HostSystemImpl> { +impl<'a, 'b, HostSystemImpl: HostSystem + ?Sized> PathKey<'a, 'b, HostSystemImpl> { #[inline(always)] fn local_socket(&self) -> &HostSystemImpl::LocalSocket { match self { @@ -1048,7 +1048,7 @@ pub struct DummyInnerProtocol; impl InnerProtocol for DummyInnerProtocol { #[inline(always)] - fn handle_packet( + fn handle_packet( &self, _node: &Node, _source: &Arc>, @@ -1061,7 +1061,7 @@ impl InnerProtocol for DummyInnerProtocol { } #[inline(always)] - fn handle_error( + fn handle_error( &self, _node: &Node, _source: &Arc>, @@ -1077,7 +1077,7 @@ impl InnerProtocol for DummyInnerProtocol { } #[inline(always)] - fn handle_ok( + fn handle_ok( &self, _node: &Node, _source: &Arc>, @@ -1103,7 +1103,7 @@ pub struct DummyPathFilter; impl PathFilter for DummyPathFilter { #[inline(always)] - fn should_use_physical_path( + fn should_use_physical_path( &self, _id: &Identity, _endpoint: &Endpoint, @@ -1114,7 +1114,7 @@ impl PathFilter for DummyPathFilter { } #[inline(always)] - fn get_path_hints( + fn get_path_hints( &self, _id: &Identity, ) -> Option< diff --git a/network-hypervisor/src/vl1/path.rs b/network-hypervisor/src/vl1/path.rs index 02efeea62..37eb2b04c 100644 --- a/network-hypervisor/src/vl1/path.rs +++ b/network-hypervisor/src/vl1/path.rs @@ -24,7 +24,7 @@ pub(crate) enum PathServiceResult { /// These are maintained in Node and canonicalized so that all unique paths have /// one and only one unique path object. That enables statistics to be tracked /// for them and uniform application of things like keepalives. -pub struct Path { +pub struct Path { pub endpoint: Endpoint, pub local_socket: HostSystemImpl::LocalSocket, pub local_interface: HostSystemImpl::LocalInterface, @@ -34,7 +34,7 @@ pub struct Path { fragmented_packets: Mutex>, } -impl Path { +impl Path { #[inline] pub fn new( endpoint: Endpoint, diff --git a/network-hypervisor/src/vl1/peer.rs b/network-hypervisor/src/vl1/peer.rs index e46668ad1..e0402649c 100644 --- a/network-hypervisor/src/vl1/peer.rs +++ b/network-hypervisor/src/vl1/peer.rs @@ -22,7 +22,7 @@ use crate::{VERSION_MAJOR, VERSION_MINOR, VERSION_REVISION}; pub(crate) const SERVICE_INTERVAL_MS: i64 = 10000; -pub struct Peer { +pub struct Peer { pub identity: Identity, v1_proto_static_secret: v1::SymmetricSecret, @@ -39,7 +39,7 @@ pub struct Peer { remote_node_info: RwLock, } -struct PeerPath { +struct PeerPath { path: Weak>, last_receive_time_ticks: i64, } @@ -51,11 +51,11 @@ struct RemoteNodeInfo { } /// Sort a list of paths by quality or priority, with best paths first. -fn prioritize_paths(paths: &mut Vec>) { +fn prioritize_paths(paths: &mut Vec>) { paths.sort_unstable_by(|a, b| a.last_receive_time_ticks.cmp(&b.last_receive_time_ticks).reverse()); } -impl Peer { +impl Peer { /// Create a new peer. /// /// This only returns None if this_node_identity does not have its secrets or if some @@ -451,7 +451,7 @@ impl Peer { /// those fragments after the main packet header and first chunk. /// /// This returns true if the packet decrypted and passed authentication. - pub(crate) fn v1_proto_receive( + pub(crate) fn v1_proto_receive( self: &Arc, node: &Node, host_system: &HostSystemImpl, @@ -560,7 +560,7 @@ impl Peer { return PacketHandlerResult::Error; } - fn handle_incoming_hello( + fn handle_incoming_hello( &self, host_system: &HostSystemImpl, inner: &InnerProtocolImpl, @@ -620,7 +620,7 @@ impl Peer { return PacketHandlerResult::Error; } - fn handle_incoming_error( + fn handle_incoming_error( self: &Arc, _: &HostSystemImpl, inner: &InnerProtocolImpl, @@ -654,7 +654,7 @@ impl Peer { return PacketHandlerResult::Error; } - fn handle_incoming_ok( + fn handle_incoming_ok( self: &Arc, host_system: &HostSystemImpl, inner: &InnerProtocolImpl, @@ -752,7 +752,7 @@ impl Peer { return PacketHandlerResult::Error; } - fn handle_incoming_whois( + fn handle_incoming_whois( self: &Arc, host_system: &HostSystemImpl, inner: &InnerProtocolImpl, @@ -812,7 +812,7 @@ impl Peer { return PacketHandlerResult::Ok; } - fn handle_incoming_echo( + fn handle_incoming_echo( &self, host_system: &HostSystemImpl, inner: &InnerProtocolImpl, @@ -866,21 +866,21 @@ impl Peer { } } -impl Hash for Peer { +impl Hash for Peer { #[inline(always)] fn hash(&self, state: &mut H) { state.write_u64(self.identity.address.into()); } } -impl PartialEq for Peer { +impl PartialEq for Peer { #[inline(always)] fn eq(&self, other: &Self) -> bool { self.identity.fingerprint.eq(&other.identity.fingerprint) } } -impl Eq for Peer {} +impl Eq for Peer {} fn v1_proto_try_aead_decrypt( secret: &v1::SymmetricSecret, diff --git a/network-hypervisor/src/vl2/switch.rs b/network-hypervisor/src/vl2/switch.rs index cd9336357..b166f1a0f 100644 --- a/network-hypervisor/src/vl2/switch.rs +++ b/network-hypervisor/src/vl2/switch.rs @@ -11,7 +11,7 @@ pub trait SwitchInterface: Sync + Send {} pub struct Switch {} impl InnerProtocol for Switch { - fn handle_packet( + fn handle_packet( &self, node: &Node, source: &Arc>, @@ -23,7 +23,7 @@ impl InnerProtocol for Switch { PacketHandlerResult::NotHandled } - fn handle_error( + fn handle_error( &self, node: &Node, source: &Arc>, @@ -38,7 +38,7 @@ impl InnerProtocol for Switch { PacketHandlerResult::NotHandled } - fn handle_ok( + fn handle_ok( &self, node: &Node, source: &Arc>, diff --git a/vl1-service/src/vl1service.rs b/vl1-service/src/vl1service.rs index 530c6e4c8..3e924a05c 100644 --- a/vl1-service/src/vl1service.rs +++ b/vl1-service/src/vl1service.rs @@ -36,7 +36,7 @@ pub struct VL1Service< inner: Arc, path_filter: Arc, buffer_pool: Arc, - node_container: Option>, + node_container: Option>, // never None, set in new() } struct VL1ServiceMutableState { @@ -71,6 +71,7 @@ impl