diff --git a/controller/src/controller.rs b/controller/src/controller.rs index 26079a107..17fed8abb 100644 --- a/controller/src/controller.rs +++ b/controller/src/controller.rs @@ -371,13 +371,10 @@ impl Controller { // Make sure these agree. It should be impossible to end up with a member that's authorized and // whose identity and identity fingerprint don't match. - if !secure_eq(&member - .identity - .as_ref() - .unwrap() - .fingerprint, - member.identity_fingerprint.as_ref().unwrap().as_bytes()) - { + if !secure_eq( + &member.identity.as_ref().unwrap().fingerprint, + member.identity_fingerprint.as_ref().unwrap().as_bytes(), + ) { debug_assert!(false); return Ok((AuthenticationResult::RejectedDueToError, None)); } @@ -501,7 +498,7 @@ impl Controller { } impl InnerProtocol for Controller { - fn handle_packet( + fn handle_packet( &self, host_system: &HostSystemImpl, _: &Node, @@ -641,7 +638,7 @@ impl InnerProtocol for Controller { } } -impl VL1AuthProvider for Controller { +impl PeerFilter for Controller { #[inline(always)] fn should_respond_to(&self, _: &Verified) -> bool { // Controllers always have to establish sessions to process requests. We don't really know if diff --git a/controller/src/database.rs b/controller/src/database.rs index 74fde0118..54983bbc8 100644 --- a/controller/src/database.rs +++ b/controller/src/database.rs @@ -1,7 +1,7 @@ use async_trait::async_trait; use zerotier_crypto::secure_eq; -use zerotier_network_hypervisor::vl1::{Address, InetAddress, NodeStorage}; +use zerotier_network_hypervisor::vl1::{Address, InetAddress, NodeStorageProvider}; use zerotier_network_hypervisor::vl2::NetworkId; use zerotier_utils::tokio::sync::broadcast::Receiver; @@ -22,7 +22,7 @@ pub enum Change { } #[async_trait] -pub trait Database: Sync + Send + NodeStorage + 'static { +pub trait Database: Sync + Send + NodeStorageProvider + 'static { async fn list_networks(&self) -> Result, Error>; async fn get_network(&self, id: NetworkId) -> Result, Error>; async fn save_network(&self, obj: Network, generate_change_notification: bool) -> Result<(), Error>; diff --git a/controller/src/filedatabase.rs b/controller/src/filedatabase.rs index 9625c2a8b..5706cddf4 100644 --- a/controller/src/filedatabase.rs +++ b/controller/src/filedatabase.rs @@ -7,7 +7,7 @@ use async_trait::async_trait; use notify::{RecursiveMode, Watcher}; use serde::de::DeserializeOwned; -use zerotier_network_hypervisor::vl1::{Address, Identity, NodeStorage, Verified}; +use zerotier_network_hypervisor::vl1::{Address, Identity, NodeStorageProvider, Verified}; use zerotier_network_hypervisor::vl2::NetworkId; use zerotier_utils::io::{fs_restrict_permissions, read_limit}; use zerotier_utils::reaper::Reaper; @@ -274,7 +274,7 @@ impl Drop for FileDatabase { } } -impl NodeStorage for FileDatabase { +impl NodeStorageProvider for FileDatabase { fn load_node_identity(&self) -> Option> { let id_data = read_limit(self.base_path.join(IDENTITY_SECRET_FILENAME), 16384); if id_data.is_err() { diff --git a/controller/src/postgresdatabase.rs b/controller/src/postgresdatabase.rs index 94f208b08..16a01064b 100644 --- a/controller/src/postgresdatabase.rs +++ b/controller/src/postgresdatabase.rs @@ -12,7 +12,7 @@ use tokio_postgres::{Client, Statement}; use zerotier_crypto::secure_eq; use zerotier_crypto::verified::Verified; -use zerotier_network_hypervisor::vl1::{Address, Identity, InetAddress, NodeStorage}; +use zerotier_network_hypervisor::vl1::{Address, Identity, InetAddress, NodeStorageProvider}; use zerotier_network_hypervisor::vl2::networkconfig::IpRoute; use zerotier_network_hypervisor::vl2::rule::Rule; use zerotier_network_hypervisor::vl2::NetworkId; @@ -187,7 +187,7 @@ impl PostgresDatabase { } } -impl NodeStorage for PostgresDatabase { +impl NodeStorageProvider for PostgresDatabase { fn load_node_identity(&self) -> Option> { Some(self.local_identity.clone()) } diff --git a/network-hypervisor/src/vl1/mod.rs b/network-hypervisor/src/vl1/mod.rs index 63cd1ece4..8bab15241 100644 --- a/network-hypervisor/src/vl1/mod.rs +++ b/network-hypervisor/src/vl1/mod.rs @@ -18,7 +18,7 @@ pub use event::Event; pub use identity::Identity; pub use inetaddress::InetAddress; pub use mac::MAC; -pub use node::{DummyInnerProtocol, HostSystem, InnerProtocol, Node, NodeStorage, PacketHandlerResult, VL1AuthProvider}; +pub use node::{ApplicationLayer, DummyInnerLayer, InnerLayer, Node, NodeStorageProvider, PacketHandlerResult, PeerFilter}; pub use path::Path; pub use peer::Peer; pub use rootset::{Root, RootSet}; diff --git a/network-hypervisor/src/vl1/node.rs b/network-hypervisor/src/vl1/node.rs index 3c96c889a..1cbf4227f 100644 --- a/network-hypervisor/src/vl1/node.rs +++ b/network-hypervisor/src/vl1/node.rs @@ -27,11 +27,11 @@ use zerotier_utils::marshalable::Marshalable; use zerotier_utils::ringbuffer::RingBuffer; use zerotier_utils::thing::Thing; -/// Trait providing VL1 authentication functions to determine which nodes we should talk to. +/// Trait providing functions to determine what peers we should talk to. /// -/// This is included in HostSystem but is provided as a separate trait to make it easy for -/// implementers of HostSystem to break this out and allow a user to specify it. -pub trait VL1AuthProvider: Sync + Send { +/// This is included in ApplicationLayer but is provided as a separate trait to make it easy for +/// implementers of ApplicationLayer to break this out and allow a user to specify it. +pub trait PeerFilter: Sync + Send { /// Check if this node should respond to messages from a given peer at all. /// /// If this returns false, the node simply drops messages on the floor and refuses @@ -48,9 +48,9 @@ pub trait VL1AuthProvider: Sync + Send { /// Trait to be implemented by outside code to provide object storage to VL1 /// -/// This is included in HostSystem but is provided as a separate trait to make it easy for -/// implementers of HostSystem to break this out and allow a user to specify it. -pub trait NodeStorage: Sync + Send { +/// This is included in ApplicationLayer but is provided as a separate trait to make it easy for +/// implementers of ApplicationLayer to break this out and allow a user to specify it. +pub trait NodeStorageProvider: Sync + Send { /// Load this node's identity from the data store. fn load_node_identity(&self) -> Option>; @@ -59,9 +59,9 @@ pub trait NodeStorage: Sync + Send { } /// Trait implemented by external code to handle events and provide an interface to the system or application. -pub trait HostSystem: VL1AuthProvider + NodeStorage + 'static { +pub trait ApplicationLayer: PeerFilter + NodeStorageProvider + 'static { /// Type for implementation of NodeStorage. - type Storage: NodeStorage + ?Sized; + type Storage: NodeStorageProvider + ?Sized; /// Type for local system sockets. type LocalSocket: Sync + Send + Hash + PartialEq + Eq + Clone + ToString + Sized + 'static; @@ -110,12 +110,12 @@ pub trait HostSystem: VL1AuthProvider + NodeStorage + 'static { /// /// The default implementation always returns true. #[allow(unused_variables)] - fn should_use_physical_path( + fn should_use_physical_path( &self, id: &Verified, endpoint: &Endpoint, - local_socket: Option<&HostSystemImpl::LocalSocket>, - local_interface: Option<&HostSystemImpl::LocalInterface>, + local_socket: Option<&Application::LocalSocket>, + local_interface: Option<&Application::LocalInterface>, ) -> bool { true } @@ -124,16 +124,10 @@ pub trait HostSystem: VL1AuthProvider + NodeStorage + 'static { /// /// The default implementation always returns None. #[allow(unused_variables)] - fn get_path_hints( + fn get_path_hints( &self, id: &Verified, - ) -> Option< - Vec<( - Endpoint, - Option, - Option, - )>, - > { + ) -> Option, Option)>> { None } @@ -163,14 +157,14 @@ pub enum PacketHandlerResult { /// This is implemented by Switch in VL2. It's usually not used outside of VL2 in the core but /// it could also be implemented for testing or "off label" use of VL1 to carry different protocols. #[allow(unused)] -pub trait InnerProtocol: Sync + Send { +pub trait InnerLayer: Sync + Send { /// Handle a packet, returning true if it was handled by the next layer. /// /// Do not attempt to handle OK or ERROR. Instead implement handle_ok() and handle_error(). /// The default version returns NotHandled. - fn handle_packet( + fn handle_packet( &self, - host_system: &HostSystemImpl, + app: &Application, node: &Node, source: &Arc, source_path: &Arc, @@ -185,9 +179,9 @@ pub trait InnerProtocol: Sync + Send { /// Handle errors, returning true if the error was recognized. /// The default version returns NotHandled. - fn handle_error( + fn handle_error( &self, - host_system: &HostSystemImpl, + app: &Application, node: &Node, source: &Arc, source_path: &Arc, @@ -204,9 +198,9 @@ pub trait InnerProtocol: Sync + Send { /// Handle an OK, returning true if the OK was recognized. /// The default version returns NotHandled. - fn handle_ok( + fn handle_ok( &self, - host_system: &HostSystemImpl, + app: &Application, node: &Node, source: &Arc, source_path: &Arc, @@ -261,7 +255,7 @@ struct WhoisQueueItem { } const PATH_MAP_SIZE: usize = std::mem::size_of::() + 128], Arc>>(); -type PathMap = HashMap, Arc>; +type PathMap = HashMap, Arc>; /// A ZeroTier VL1 node that can communicate securely with the ZeroTier peer-to-peer network. pub struct Node { @@ -278,7 +272,7 @@ pub struct Node { intervals: Mutex, /// Canonicalized network paths, held as Weak<> to be automatically cleaned when no longer in use. - paths: RwLock>, // holds a PathMap<> but as a Thing<> to hide HostSystemImpl template parameter + paths: RwLock>, // holds a PathMap<> but as a Thing<> to hide ApplicationLayer template parameter /// Peers with which we are currently communicating. peers: RwLock>>, @@ -294,20 +288,20 @@ pub struct Node { } impl Node { - pub fn new( - host_system: &HostSystemImpl, + pub fn new( + app: &Application, auto_generate_identity: bool, auto_upgrade_identity: bool, ) -> Result { let mut id = { - let id = host_system.storage().load_node_identity(); + let id = app.storage().load_node_identity(); if id.is_none() { if !auto_generate_identity { return Err(InvalidParameterError("no identity found and auto-generate not enabled")); } else { let id = Identity::generate(); - host_system.event(Event::IdentityAutoGenerated(id.as_ref().clone())); - host_system.storage().save_node_identity(&id); + app.event(Event::IdentityAutoGenerated(id.as_ref().clone())); + app.storage().save_node_identity(&id); id } } else { @@ -318,18 +312,18 @@ impl Node { if auto_upgrade_identity { let old = id.clone(); if id.upgrade()? { - host_system.storage().save_node_identity(&id); - host_system.event(Event::IdentityAutoUpgraded(old.unwrap(), id.as_ref().clone())); + app.storage().save_node_identity(&id); + app.event(Event::IdentityAutoUpgraded(old.unwrap(), id.as_ref().clone())); } } - debug_event!(host_system, "[vl1] loaded identity {}", id.to_string()); + debug_event!(app, "[vl1] loaded identity {}", id.to_string()); Ok(Self { instance_id: random::get_bytes_secure(), identity: id, intervals: Mutex::new(BackgroundTaskIntervals::default()), - paths: RwLock::new(Thing::new(PathMap::::new())), + paths: RwLock::new(Thing::new(PathMap::::new())), peers: RwLock::new(HashMap::new()), roots: RwLock::new(RootInfo { sets: HashMap::new(), @@ -403,10 +397,10 @@ impl Node { self.roots.read().unwrap().sets.values().cloned().map(|s| s.unwrap()).collect() } - pub fn do_background_tasks(&self, host_system: &HostSystemImpl) -> Duration { + pub fn do_background_tasks(&self, app: &Application) -> Duration { const INTERVAL_MS: i64 = 1000; const INTERVAL: Duration = Duration::from_millis(INTERVAL_MS as u64); - let time_ticks = host_system.time_ticks(); + let time_ticks = app.time_ticks(); let (root_sync, root_hello, mut root_spam_hello, peer_service, path_service, whois_queue_retry) = { let mut intervals = self.intervals.lock().unwrap(); @@ -472,7 +466,7 @@ impl Node { false } } { - debug_event!(host_system, "[vl1] root sets modified, synchronizing internal data structures"); + debug_event!(app, "[vl1] root sets modified, synchronizing internal data structures"); let (mut old_root_identities, address_collisions, new_roots, bad_identities, my_root_sets) = { let roots = self.roots.read().unwrap(); @@ -513,7 +507,7 @@ impl Node { if m.endpoints.is_some() && !address_collisions.contains(&m.identity.address) && !m.identity.eq(&self.identity) { debug_event!( - host_system, + app, "[vl1] examining root {} with {} endpoints", m.identity.address.to_string(), m.endpoints.as_ref().map_or(0, |e| e.len()) @@ -546,13 +540,13 @@ impl Node { }; for c in address_collisions.iter() { - host_system.event(Event::SecurityWarning(format!( + app.event(Event::SecurityWarning(format!( "address/identity collision in root sets! address {} collides across root sets or with an existing peer and is being ignored as a root!", c.to_string() ))); } for i in bad_identities.iter() { - host_system.event(Event::SecurityWarning(format!( + app.event(Event::SecurityWarning(format!( "bad identity detected for address {} in at least one root set, ignoring (error creating peer object)", i.address.to_string() ))); @@ -566,7 +560,7 @@ impl Node { let mut roots = self.roots.write().unwrap(); roots.roots = new_roots; roots.this_root_sets = my_root_sets; - host_system.event(Event::UpdatedRoots(old_root_identities, new_root_identities)); + app.event(Event::UpdatedRoots(old_root_identities, new_root_identities)); } } @@ -592,7 +586,7 @@ impl Node { let mut best_root = self.best_root.write().unwrap(); if let Some(best_root) = best_root.as_mut() { debug_event!( - host_system, + app, "[vl1] selected new best root: {} (replaced {})", best.identity.address.to_string(), best_root.identity.address.to_string() @@ -600,7 +594,7 @@ impl Node { *best_root = best.clone(); } else { debug_event!( - host_system, + app, "[vl1] selected new best root: {} (was empty)", best.identity.address.to_string() ); @@ -610,7 +604,7 @@ impl Node { } else { if let Some(old_best) = self.best_root.write().unwrap().take() { debug_event!( - host_system, + app, "[vl1] selected new best root: NONE (replaced {})", old_best.identity.address.to_string() ); @@ -622,12 +616,12 @@ impl Node { if !roots.online { drop(roots); self.roots.write().unwrap().online = true; - host_system.event(Event::Online(true)); + app.event(Event::Online(true)); } } else if roots.online { drop(roots); self.roots.write().unwrap().online = false; - host_system.event(Event::Online(false)); + app.event(Event::Online(false)); } } } @@ -648,14 +642,14 @@ impl Node { for (root, endpoints) in roots.iter() { for ep in endpoints.iter() { debug_event!( - host_system, + app, "sending HELLO to root {} (root interval: {})", root.identity.address.to_string(), ROOT_HELLO_INTERVAL ); let root = root.clone(); let ep = ep.clone(); - root.send_hello(host_system, self, Some(&ep)); + root.send_hello(app, self, Some(&ep)); } } } @@ -667,7 +661,7 @@ impl Node { { let roots = self.roots.read().unwrap(); for (a, peer) in self.peers.read().unwrap().iter() { - if !peer.service(host_system, self, time_ticks) && !roots.roots.contains_key(peer) { + if !peer.service(app, self, time_ticks) && !roots.roots.contains_key(peer) { dead_peers.push(*a); } } @@ -682,8 +676,8 @@ impl Node { let mut need_keepalive = Vec::new(); // First check all paths in read mode to avoid blocking the entire node. - for (k, path) in self.paths.read().unwrap().get::>().iter() { - if host_system.local_socket_is_valid(k.local_socket()) { + for (k, path) in self.paths.read().unwrap().get::>().iter() { + if app.local_socket_is_valid(k.local_socket()) { match path.service(time_ticks) { PathServiceResult::Ok => {} PathServiceResult::Dead => dead_paths.push(k.to_copied()), @@ -696,16 +690,16 @@ impl Node { // Lock in write mode and remove dead paths, doing so piecemeal to again avoid blocking. for dp in dead_paths.iter() { - self.paths.write().unwrap().get_mut::>().remove(dp); + self.paths.write().unwrap().get_mut::>().remove(dp); } // Finally run keepalive sends as a batch. let keepalive_buf = [time_ticks as u8]; // just an arbitrary byte, no significance for p in need_keepalive.iter() { - host_system.wire_send( + app.wire_send( &p.endpoint, - Some(p.local_socket::()), - Some(p.local_interface::()), + Some(p.local_socket::()), + Some(p.local_interface::()), &keepalive_buf, 0, ); @@ -727,7 +721,7 @@ impl Node { need_whois }; if !need_whois.is_empty() { - self.send_whois(host_system, need_whois.as_slice(), time_ticks); + self.send_whois(app, need_whois.as_slice(), time_ticks); } } @@ -735,18 +729,18 @@ impl Node { INTERVAL } - pub fn handle_incoming_physical_packet( + pub fn handle_incoming_physical_packet( &self, - host_system: &HostSystemImpl, - inner: &InnerProtocolImpl, + app: &Application, + inner: &Inner, source_endpoint: &Endpoint, - source_local_socket: &HostSystemImpl::LocalSocket, - source_local_interface: &HostSystemImpl::LocalInterface, + source_local_socket: &Application::LocalSocket, + source_local_interface: &Application::LocalInterface, time_ticks: i64, mut packet: PooledPacketBuffer, ) { debug_event!( - host_system, + app, "[vl1] {} -> #{} {}->{} length {} (on socket {}@{})", source_endpoint.to_string(), packet @@ -779,15 +773,14 @@ impl Node { if dest == self.identity.address { let fragment_header = &*fragment_header; // discard mut - let path = - self.canonical_path::(source_endpoint, source_local_socket, source_local_interface, time_ticks); + let path = self.canonical_path::(source_endpoint, source_local_socket, source_local_interface, time_ticks); path.log_receive_anything(time_ticks); if fragment_header.is_fragment() { #[cfg(debug_assertions)] let fragment_header_id = u64::from_be_bytes(fragment_header.id); debug_event!( - host_system, + app, "[vl1] [v1] #{:0>16x} fragment {} of {} received", u64::from_be_bytes(fragment_header.id), fragment_header.fragment_no(), @@ -803,14 +796,14 @@ impl Node { ) { if let Some(frag0) = assembled_packet.frags[0].as_ref() { #[cfg(debug_assertions)] - debug_event!(host_system, "[vl1] [v1] #{:0>16x} packet fully assembled!", fragment_header_id); + debug_event!(app, "[vl1] [v1] #{:0>16x} packet fully assembled!", fragment_header_id); if let Ok(packet_header) = frag0.struct_at::(0) { if let Some(source) = Address::from_bytes(&packet_header.src) { if let Some(peer) = self.peer(source) { peer.v1_proto_receive( self, - host_system, + app, inner, time_ticks, &path, @@ -821,7 +814,7 @@ impl Node { } else { // If WHOIS is needed we need to go ahead and combine the packet so it can be cached // for later processing when a WHOIS reply comes back. - let mut combined_packet = host_system.get_buffer(); + let mut combined_packet = app.get_buffer(); let mut ok = combined_packet.append_bytes(frag0.as_bytes()).is_ok(); for i in 1..assembled_packet.have { if let Some(f) = assembled_packet.frags[i as usize].as_ref() { @@ -832,7 +825,7 @@ impl Node { } } if ok { - self.whois(host_system, source, Some((Arc::downgrade(&path), combined_packet)), time_ticks); + self.whois(app, source, Some((Arc::downgrade(&path), combined_packet)), time_ticks); } } } // else source address invalid @@ -840,17 +833,13 @@ impl Node { } // else reassembly failed (in a way that shouldn't be possible) } // else packet not fully assembled yet } else if let Ok(packet_header) = packet.struct_at::(0) { - debug_event!( - host_system, - "[vl1] [v1] #{:0>16x} is unfragmented", - u64::from_be_bytes(packet_header.id) - ); + debug_event!(app, "[vl1] [v1] #{:0>16x} is unfragmented", u64::from_be_bytes(packet_header.id)); if let Some(source) = Address::from_bytes(&packet_header.src) { if let Some(peer) = self.peer(source) { - peer.v1_proto_receive(self, host_system, inner, time_ticks, &path, packet_header, packet.as_ref(), &[]); + peer.v1_proto_receive(self, app, inner, time_ticks, &path, packet_header, packet.as_ref(), &[]); } else { - self.whois(host_system, source, Some((Arc::downgrade(&path), packet)), time_ticks); + self.whois(app, source, Some((Arc::downgrade(&path), packet)), time_ticks); } } } // else not fragment and header incomplete @@ -866,7 +855,7 @@ impl Node { { debug_packet_id = u64::from_be_bytes(fragment_header.id); debug_event!( - host_system, + app, "[vl1] [v1] #{:0>16x} forwarding packet fragment to {}", debug_packet_id, dest.to_string() @@ -874,7 +863,7 @@ impl Node { } if fragment_header.increment_hops() > v1::FORWARD_MAX_HOPS { #[cfg(debug_assertions)] - debug_event!(host_system, "[vl1] [v1] #{:0>16x} discarded: max hops exceeded!", debug_packet_id); + debug_event!(app, "[vl1] [v1] #{:0>16x} discarded: max hops exceeded!", debug_packet_id); return; } } else if let Ok(packet_header) = packet.struct_mut_at::(0) { @@ -882,7 +871,7 @@ impl Node { { debug_packet_id = u64::from_be_bytes(packet_header.id); debug_event!( - host_system, + app, "[vl1] [v1] #{:0>16x} forwarding packet to {}", debug_packet_id, dest.to_string() @@ -891,7 +880,7 @@ impl Node { if packet_header.increment_hops() > v1::FORWARD_MAX_HOPS { #[cfg(debug_assertions)] debug_event!( - host_system, + app, "[vl1] [v1] #{:0>16x} discarded: max hops exceeded!", u64::from_be_bytes(packet_header.id) ); @@ -903,10 +892,10 @@ impl Node { if let Some(peer) = self.peer(dest) { if let Some(forward_path) = peer.direct_path() { - host_system.wire_send( + app.wire_send( &forward_path.endpoint, - Some(forward_path.local_socket::()), - Some(forward_path.local_interface::()), + Some(forward_path.local_socket::()), + Some(forward_path.local_interface::()), packet.as_bytes(), 0, ); @@ -914,7 +903,7 @@ impl Node { peer.last_forward_time_ticks.store(time_ticks, Ordering::Relaxed); #[cfg(debug_assertions)] - debug_event!(host_system, "[vl1] [v1] #{:0>16x} forwarded successfully", debug_packet_id); + debug_event!(app, "[vl1] [v1] #{:0>16x} forwarded successfully", debug_packet_id); } } } // else not for this node and shouldn't be forwarded @@ -923,9 +912,9 @@ impl Node { } /// Enqueue and send a WHOIS query for a given address, adding the supplied packet (if any) to the list to be processed on reply. - fn whois( + fn whois( &self, - host_system: &HostSystemImpl, + app: &Application, address: Address, waiting_packet: Option<(Weak, PooledPacketBuffer)>, time_ticks: i64, @@ -947,13 +936,13 @@ impl Node { qi.retry_count += 1; } } - self.send_whois(host_system, &[address], time_ticks); + self.send_whois(app, &[address], time_ticks); } /// Send a WHOIS query to the current best root. - fn send_whois(&self, host_system: &HostSystemImpl, mut addresses: &[Address], time_ticks: i64) { + fn send_whois(&self, app: &Application, mut addresses: &[Address], time_ticks: i64) { debug_assert!(!addresses.is_empty()); - debug_event!(host_system, "[vl1] [v1] sending WHOIS for {}", { + debug_event!(app, "[vl1] [v1] sending WHOIS for {}", { let mut tmp = String::new(); for a in addresses.iter() { if !tmp.is_empty() { @@ -966,7 +955,7 @@ impl Node { if let Some(root) = self.best_root() { while !addresses.is_empty() { if !root - .send(host_system, self, None, time_ticks, |packet| -> Result<(), Infallible> { + .send(app, self, None, time_ticks, |packet| -> Result<(), Infallible> { assert!(packet.append_u8(message_type::VL1_WHOIS).is_ok()); while !addresses.is_empty() && (packet.len() + ADDRESS_SIZE) <= UDP_DEFAULT_MTU { assert!(packet.append_bytes_fixed(&addresses[0].to_bytes()).is_ok()); @@ -983,10 +972,10 @@ impl Node { } /// Called by Peer when an identity is received from another node, e.g. via OK(WHOIS). - pub(crate) fn handle_incoming_identity( + pub(crate) fn handle_incoming_identity( &self, - host_system: &HostSystemImpl, - inner: &InnerProtocolImpl, + app: &Application, + inner: &Inner, received_identity: Identity, time_ticks: i64, authoritative: bool, @@ -996,7 +985,7 @@ impl Node { let mut whois_queue = self.whois_queue.lock().unwrap(); if let Some(qi) = whois_queue.get_mut(&received_identity.address) { let address = received_identity.address; - if host_system.should_respond_to(&received_identity) { + if app.should_respond_to(&received_identity) { let mut peers = self.peers.write().unwrap(); if let Some(peer) = peers.get(&address).cloned().or_else(|| { Peer::new(&self.identity, received_identity, time_ticks) @@ -1007,7 +996,7 @@ impl Node { for p in qi.v1_proto_waiting_packets.iter() { if let Some(path) = p.0.upgrade() { if let Ok(packet_header) = p.1.struct_at::(0) { - peer.v1_proto_receive(self, host_system, inner, time_ticks, &path, packet_header, &p.1, &[]); + peer.v1_proto_receive(self, app, inner, time_ticks, &path, packet_header, &p.1, &[]); } } } @@ -1034,25 +1023,25 @@ impl Node { } /// Get the canonical Path object corresponding to an endpoint. - pub(crate) fn canonical_path( + pub(crate) fn canonical_path( &self, ep: &Endpoint, - local_socket: &HostSystemImpl::LocalSocket, - local_interface: &HostSystemImpl::LocalInterface, + local_socket: &Application::LocalSocket, + local_interface: &Application::LocalInterface, time_ticks: i64, ) -> Arc { let paths = self.paths.read().unwrap(); - if let Some(path) = paths.get::>().get(&PathKey::Ref(ep, local_socket)) { + if let Some(path) = paths.get::>().get(&PathKey::Ref(ep, local_socket)) { path.clone() } else { drop(paths); self.paths .write() .unwrap() - .get_mut::>() + .get_mut::>() .entry(PathKey::Copied(ep.clone(), local_socket.clone())) .or_insert_with(|| { - Arc::new(Path::new::( + Arc::new(Path::new::( ep.clone(), local_socket.clone(), local_interface.clone(), @@ -1066,12 +1055,12 @@ impl Node { /// Key used to look up paths in a hash map /// This supports copied keys for storing and refs for fast lookup without having to copy anything. -enum PathKey<'a, 'b, HostSystemImpl: HostSystem + ?Sized> { - Copied(Endpoint, HostSystemImpl::LocalSocket), - Ref(&'a Endpoint, &'b HostSystemImpl::LocalSocket), +enum PathKey<'a, 'b, Application: ApplicationLayer + ?Sized> { + Copied(Endpoint, Application::LocalSocket), + Ref(&'a Endpoint, &'b Application::LocalSocket), } -impl Hash for PathKey<'_, '_, HostSystemImpl> { +impl Hash for PathKey<'_, '_, Application> { fn hash(&self, state: &mut H) { match self { Self::Copied(ep, ls) => { @@ -1086,7 +1075,7 @@ impl Hash for PathKey<'_, '_, HostSystemImp } } -impl PartialEq for PathKey<'_, '_, HostSystemImpl> { +impl PartialEq for PathKey<'_, '_, Application> { fn eq(&self, other: &Self) -> bool { match (self, other) { (Self::Copied(ep1, ls1), Self::Copied(ep2, ls2)) => ep1.eq(ep2) && ls1.eq(ls2), @@ -1097,11 +1086,11 @@ impl PartialEq for PathKey<'_, '_, HostSyst } } -impl Eq for PathKey<'_, '_, HostSystemImpl> {} +impl Eq for PathKey<'_, '_, Application> {} -impl PathKey<'_, '_, HostSystemImpl> { +impl PathKey<'_, '_, Application> { #[inline(always)] - fn local_socket(&self) -> &HostSystemImpl::LocalSocket { + fn local_socket(&self) -> &Application::LocalSocket { match self { Self::Copied(_, ls) => ls, Self::Ref(_, ls) => *ls, @@ -1109,21 +1098,21 @@ impl PathKey<'_, '_, HostSystemImpl> { } #[inline(always)] - fn to_copied(&self) -> PathKey<'static, 'static, HostSystemImpl> { + fn to_copied(&self) -> PathKey<'static, 'static, Application> { match self { - Self::Copied(ep, ls) => PathKey::<'static, 'static, HostSystemImpl>::Copied(ep.clone(), ls.clone()), - Self::Ref(ep, ls) => PathKey::<'static, 'static, HostSystemImpl>::Copied((*ep).clone(), (*ls).clone()), + Self::Copied(ep, ls) => PathKey::<'static, 'static, Application>::Copied(ep.clone(), ls.clone()), + Self::Ref(ep, ls) => PathKey::<'static, 'static, Application>::Copied((*ep).clone(), (*ls).clone()), } } } /// Dummy no-op inner protocol for debugging and testing. #[derive(Default)] -pub struct DummyInnerProtocol; +pub struct DummyInnerLayer; -impl InnerProtocol for DummyInnerProtocol {} +impl InnerLayer for DummyInnerLayer {} -impl VL1AuthProvider for DummyInnerProtocol { +impl PeerFilter for DummyInnerLayer { #[inline(always)] fn should_respond_to(&self, _: &Verified) -> bool { true diff --git a/network-hypervisor/src/vl1/path.rs b/network-hypervisor/src/vl1/path.rs index 11068a6eb..5a59364ec 100644 --- a/network-hypervisor/src/vl1/path.rs +++ b/network-hypervisor/src/vl1/path.rs @@ -37,10 +37,10 @@ pub struct Path { } impl Path { - pub(crate) fn new( + pub(crate) fn new( endpoint: Endpoint, - local_socket: HostSystemImpl::LocalSocket, - local_interface: HostSystemImpl::LocalInterface, + local_socket: Application::LocalSocket, + local_interface: Application::LocalInterface, time_ticks: i64, ) -> Self { Self { @@ -55,12 +55,12 @@ impl Path { } #[inline(always)] - pub(crate) fn local_socket(&self) -> &HostSystemImpl::LocalSocket { + pub(crate) fn local_socket(&self) -> &Application::LocalSocket { self.local_socket.get() } #[inline(always)] - pub(crate) fn local_interface(&self) -> &HostSystemImpl::LocalInterface { + pub(crate) fn local_interface(&self) -> &Application::LocalInterface { self.local_interface.get() } diff --git a/network-hypervisor/src/vl1/peer.rs b/network-hypervisor/src/vl1/peer.rs index d74ac3851..697af63d7 100644 --- a/network-hypervisor/src/vl1/peer.rs +++ b/network-hypervisor/src/vl1/peer.rs @@ -53,7 +53,7 @@ struct RemoteNodeInfo { } /// Sort a list of paths by quality or priority, with best paths first. -fn prioritize_paths(paths: &mut Vec) { +fn prioritize_paths(paths: &mut Vec) { paths.sort_unstable_by(|a, b| a.last_receive_time_ticks.cmp(&b.last_receive_time_ticks).reverse()); } @@ -136,7 +136,7 @@ impl Peer { return None; } - fn learn_path(&self, host_system: &HostSystemImpl, new_path: &Arc, time_ticks: i64) { + fn learn_path(&self, app: &Application, new_path: &Arc, time_ticks: i64) { let mut paths = self.paths.lock().unwrap(); // TODO: check path filter @@ -155,7 +155,7 @@ impl Peer { Endpoint::IpUdp(existing_ip) => { if existing_ip.ip_bytes().eq(new_ip.ip_bytes()) { debug_event!( - host_system, + app, "[vl1] {} replacing path {} with {} (same IP, different port)", self.identity.address.to_string(), p.endpoint.to_string(), @@ -163,7 +163,7 @@ impl Peer { ); pi.path = Arc::downgrade(new_path); pi.last_receive_time_ticks = time_ticks; - prioritize_paths::(&mut paths); + prioritize_paths::(&mut paths); return; } } @@ -183,7 +183,7 @@ impl Peer { // Learn new path if it's not a duplicate or should not replace an existing path. debug_event!( - host_system, + app, "[vl1] {} learned new path: {}", self.identity.address.to_string(), new_path.endpoint.to_string() @@ -192,7 +192,7 @@ impl Peer { path: Arc::downgrade(new_path), last_receive_time_ticks: time_ticks, }); - prioritize_paths::(&mut paths); + prioritize_paths::(&mut paths); } /// Get the next sequential message ID for use with the V1 transport protocol. @@ -202,7 +202,7 @@ impl Peer { } /// Called every SERVICE_INTERVAL_MS by the background service loop in Node. - pub(crate) fn service(&self, _: &HostSystemImpl, _: &Node, time_ticks: i64) -> bool { + pub(crate) fn service(&self, _: &Application, _: &Node, time_ticks: i64) -> bool { // Prune dead paths and sort in descending order of quality. { let mut paths = self.paths.lock().unwrap(); @@ -210,7 +210,7 @@ impl Peer { if paths.capacity() > 16 { paths.shrink_to_fit(); } - prioritize_paths::(&mut paths); + prioritize_paths::(&mut paths); } // Prune dead entries from the map of reported local endpoints (e.g. externally visible IPs). @@ -223,19 +223,19 @@ impl Peer { } /// Send a prepared and encrypted packet using the V1 protocol with fragmentation if needed. - fn v1_proto_internal_send( + fn v1_proto_internal_send( &self, - host_system: &HostSystemImpl, + app: &Application, endpoint: &Endpoint, - local_socket: Option<&HostSystemImpl::LocalSocket>, - local_interface: Option<&HostSystemImpl::LocalInterface>, + local_socket: Option<&Application::LocalSocket>, + local_interface: Option<&Application::LocalInterface>, max_fragment_size: usize, packet: PooledPacketBuffer, ) { let packet_size = packet.len(); if packet_size > max_fragment_size { let bytes = packet.as_bytes(); - host_system.wire_send(endpoint, local_socket, local_interface, &bytes[0..UDP_DEFAULT_MTU], 0); + app.wire_send(endpoint, local_socket, local_interface, &bytes[0..UDP_DEFAULT_MTU], 0); let mut pos = UDP_DEFAULT_MTU; let overrun_size = (packet_size - UDP_DEFAULT_MTU) as u32; @@ -259,7 +259,7 @@ impl Peer { let fragment_size = v1::FRAGMENT_HEADER_SIZE + chunk_size; tmp_buf[..v1::FRAGMENT_HEADER_SIZE].copy_from_slice(header.as_bytes()); tmp_buf[v1::FRAGMENT_HEADER_SIZE..fragment_size].copy_from_slice(&bytes[pos..next_pos]); - host_system.wire_send(endpoint, local_socket, local_interface, &tmp_buf[..fragment_size], 0); + app.wire_send(endpoint, local_socket, local_interface, &tmp_buf[..fragment_size], 0); pos = next_pos; if pos < packet_size { chunk_size = (packet_size - pos).min(UDP_DEFAULT_MTU - v1::HEADER_SIZE); @@ -268,7 +268,7 @@ impl Peer { } } } else { - host_system.wire_send(endpoint, local_socket, local_interface, packet.as_bytes(), 0); + app.wire_send(endpoint, local_socket, local_interface, packet.as_bytes(), 0); } } @@ -281,9 +281,9 @@ impl Peer { /// The builder function must append the verb (with any verb flags) and packet payload. If it returns /// an error, the error is returned immediately and the send is aborted. None is returned if the send /// function itself fails for some reason such as no paths being available. - pub fn send Result>( + pub fn send Result>( &self, - host_system: &HostSystemImpl, + app: &Application, node: &Node, path: Option<&Arc>, time_ticks: i64, @@ -303,7 +303,7 @@ impl Peer { let max_fragment_size = path.endpoint.max_fragment_size(); - let mut packet = host_system.get_buffer(); + let mut packet = app.get_buffer(); if !self.is_v2() { // For the V1 protocol, leave room for for the header in the buffer. packet.set_size(v1::HEADER_SIZE); @@ -371,10 +371,10 @@ impl Peer { } self.v1_proto_internal_send( - host_system, + app, &path.endpoint, - Some(path.local_socket::()), - Some(path.local_interface::()), + Some(path.local_socket::()), + Some(path.local_interface::()), max_fragment_size, packet, ); @@ -393,9 +393,9 @@ impl Peer { /// Unlike other messages HELLO is sent partially in the clear and always with the long-lived /// static identity key. Authentication in old versions is via Poly1305 and in new versions /// via HMAC-SHA512. - pub(crate) fn send_hello( + pub(crate) fn send_hello( &self, - host_system: &HostSystemImpl, + app: &Application, node: &Node, explicit_endpoint: Option<&Endpoint>, ) -> bool { @@ -412,9 +412,9 @@ impl Peer { }; let max_fragment_size = destination.max_fragment_size(); - let time_ticks = host_system.time_ticks(); + let time_ticks = app.time_ticks(); - let mut packet = host_system.get_buffer(); + let mut packet = app.get_buffer(); { let message_id = self.v1_proto_next_message_id(); @@ -447,7 +447,7 @@ impl Peer { self.last_send_time_ticks.store(time_ticks, Ordering::Relaxed); debug_event!( - host_system, + app, "HELLO -> {} @ {} ({} bytes)", self.identity.address.to_string(), destination.to_string(), @@ -457,16 +457,16 @@ impl Peer { if let Some(p) = path.as_ref() { self.v1_proto_internal_send( - host_system, + app, destination, - Some(p.local_socket::()), - Some(p.local_interface::()), + Some(p.local_socket::()), + Some(p.local_interface::()), max_fragment_size, packet, ); p.log_send_anything(time_ticks); } else { - self.v1_proto_internal_send(host_system, destination, None, None, max_fragment_size, packet); + self.v1_proto_internal_send(app, destination, None, None, max_fragment_size, packet); } return true; @@ -478,11 +478,11 @@ impl Peer { /// those fragments after the main packet header and first chunk. /// /// This returns true if the packet decrypted and passed authentication. - pub(crate) fn v1_proto_receive( + pub(crate) fn v1_proto_receive( self: &Arc, node: &Node, - host_system: &HostSystemImpl, - inner: &InnerProtocolImpl, + app: &Application, + inner: &Inner, time_ticks: i64, source_path: &Arc, packet_header: &v1::PacketHeader, @@ -503,11 +503,7 @@ impl Peer { message_id2 } else { // Packet failed to decrypt using either ephemeral or permanent key, reject. - debug_event!( - host_system, - "[vl1] #{:0>16x} failed authentication", - u64::from_be_bytes(packet_header.id) - ); + debug_event!(app, "[vl1] #{:0>16x} failed authentication", u64::from_be_bytes(packet_header.id)); return PacketHandlerResult::Error; }; @@ -539,7 +535,7 @@ impl Peer { verb &= v1::VERB_MASK; // mask off flags debug_event!( - host_system, + app, "[vl1] #{:0>16x} decrypted and authenticated, verb: {} ({:0>2x})", u64::from_be_bytes(packet_header.id), message_type::name(verb), @@ -548,11 +544,9 @@ impl Peer { return match verb { message_type::VL1_NOP => PacketHandlerResult::Ok, - message_type::VL1_HELLO => { - self.handle_incoming_hello(host_system, inner, node, time_ticks, message_id, source_path, &payload) - } + message_type::VL1_HELLO => self.handle_incoming_hello(app, inner, node, time_ticks, message_id, source_path, &payload), message_type::VL1_ERROR => self.handle_incoming_error( - host_system, + app, inner, node, time_ticks, @@ -562,7 +556,7 @@ impl Peer { &payload, ), message_type::VL1_OK => self.handle_incoming_ok( - host_system, + app, inner, node, time_ticks, @@ -572,28 +566,16 @@ impl Peer { path_is_known, &payload, ), - message_type::VL1_WHOIS => self.handle_incoming_whois(host_system, inner, node, time_ticks, message_id, &payload), + message_type::VL1_WHOIS => self.handle_incoming_whois(app, inner, node, time_ticks, message_id, &payload), message_type::VL1_RENDEZVOUS => { - self.handle_incoming_rendezvous(host_system, node, time_ticks, message_id, source_path, &payload) + self.handle_incoming_rendezvous(app, node, time_ticks, message_id, source_path, &payload) } - message_type::VL1_ECHO => self.handle_incoming_echo(host_system, inner, node, time_ticks, message_id, &payload), + message_type::VL1_ECHO => self.handle_incoming_echo(app, inner, node, time_ticks, message_id, &payload), message_type::VL1_PUSH_DIRECT_PATHS => { - self.handle_incoming_push_direct_paths(host_system, node, time_ticks, source_path, &payload) + self.handle_incoming_push_direct_paths(app, node, time_ticks, source_path, &payload) } - message_type::VL1_USER_MESSAGE => { - self.handle_incoming_user_message(host_system, node, time_ticks, source_path, &payload) - } - _ => inner.handle_packet( - host_system, - node, - self, - &source_path, - packet_header.hops(), - message_id, - verb, - &payload, - 1, - ), + message_type::VL1_USER_MESSAGE => self.handle_incoming_user_message(app, node, time_ticks, source_path, &payload), + _ => inner.handle_packet(app, node, self, &source_path, packet_header.hops(), message_id, verb, &payload, 1), }; } } @@ -601,19 +583,19 @@ impl Peer { return PacketHandlerResult::Error; } - fn handle_incoming_hello( + fn handle_incoming_hello( &self, - host_system: &HostSystemImpl, - inner: &InnerProtocolImpl, + app: &Application, + inner: &Inner, node: &Node, time_ticks: i64, message_id: MessageId, source_path: &Arc, payload: &PacketBuffer, ) -> PacketHandlerResult { - if !(host_system.should_respond_to(&self.identity) || node.this_node_is_root() || node.is_peer_root(self)) { + if !(app.should_respond_to(&self.identity) || node.this_node_is_root() || node.is_peer_root(self)) { debug_event!( - host_system, + app, "[vl1] dropping HELLO from {} due to lack of trust relationship", self.identity.address.to_string() ); @@ -634,25 +616,19 @@ impl Peer { ); } - self.send( - host_system, - node, - Some(source_path), - time_ticks, - |packet| -> Result<(), Infallible> { - let f: &mut (OkHeader, v1::message_component_structs::OkHelloFixedHeaderFields) = - packet.append_struct_get_mut().unwrap(); - f.0.verb = message_type::VL1_OK; - f.0.in_re_verb = message_type::VL1_HELLO; - f.0.in_re_message_id = message_id.to_ne_bytes(); - f.1.timestamp_echo = hello_fixed_headers.timestamp; - f.1.version_proto = PROTOCOL_VERSION; - f.1.version_major = VERSION_MAJOR; - f.1.version_minor = VERSION_MINOR; - f.1.version_revision = VERSION_REVISION.to_be_bytes(); - Ok(()) - }, - ); + self.send(app, node, Some(source_path), time_ticks, |packet| -> Result<(), Infallible> { + let f: &mut (OkHeader, v1::message_component_structs::OkHelloFixedHeaderFields) = + packet.append_struct_get_mut().unwrap(); + f.0.verb = message_type::VL1_OK; + f.0.in_re_verb = message_type::VL1_HELLO; + f.0.in_re_message_id = message_id.to_ne_bytes(); + f.1.timestamp_echo = hello_fixed_headers.timestamp; + f.1.version_proto = PROTOCOL_VERSION; + f.1.version_major = VERSION_MAJOR; + f.1.version_minor = VERSION_MINOR; + f.1.version_revision = VERSION_REVISION.to_be_bytes(); + Ok(()) + }); return PacketHandlerResult::Ok; } @@ -662,10 +638,10 @@ impl Peer { return PacketHandlerResult::Error; } - fn handle_incoming_error( + fn handle_incoming_error( self: &Arc, - host_system: &HostSystemImpl, - inner: &InnerProtocolImpl, + app: &Application, + inner: &Inner, node: &Node, _time_ticks: i64, source_path: &Arc, @@ -682,7 +658,7 @@ impl Peer { match error_header.in_re_verb { _ => { return inner.handle_error( - host_system, + app, node, self, &source_path, @@ -700,10 +676,10 @@ impl Peer { return PacketHandlerResult::Error; } - fn handle_incoming_ok( + fn handle_incoming_ok( self: &Arc, - host_system: &HostSystemImpl, - inner: &InnerProtocolImpl, + app: &Application, + inner: &Inner, node: &Node, time_ticks: i64, source_path: &Arc, @@ -724,7 +700,7 @@ impl Peer { payload.read_struct::(&mut cursor) { if source_hops == 0 { - debug_event!(host_system, "[vl1] {} OK(HELLO)", self.identity.address.to_string(),); + debug_event!(app, "[vl1] {} OK(HELLO)", self.identity.address.to_string(),); if let Ok(reported_endpoint) = Endpoint::unmarshal(&payload, &mut cursor) { #[cfg(debug_assertions)] let reported_endpoint2 = reported_endpoint.clone(); @@ -738,7 +714,7 @@ impl Peer { { #[cfg(debug_assertions)] debug_event!( - host_system, + app, "[vl1] {} reported new remote perspective, local endpoint: {}", self.identity.address.to_string(), reported_endpoint2.to_string() @@ -748,7 +724,7 @@ impl Peer { } if source_hops == 0 && !path_is_known { - self.learn_path(host_system, source_path, time_ticks); + self.learn_path(app, source_path, time_ticks); } self.last_hello_reply_time_ticks.store(time_ticks, Ordering::Relaxed); @@ -756,21 +732,21 @@ impl Peer { } message_type::VL1_WHOIS => { - debug_event!(host_system, "[vl1] OK(WHOIS)"); + debug_event!(app, "[vl1] OK(WHOIS)"); if node.is_peer_root(self) { while cursor < payload.len() { let r = Identity::unmarshal(payload, &mut cursor); if let Ok(received_identity) = r { debug_event!( - host_system, + app, "[vl1] {} OK(WHOIS): received identity: {}", self.identity.address.to_string(), received_identity.to_string() ); - node.handle_incoming_identity(host_system, inner, received_identity, time_ticks, true); + node.handle_incoming_identity(app, inner, received_identity, time_ticks, true); } else { debug_event!( - host_system, + app, "[vl1] {} OK(WHOIS): received bad identity: {}", self.identity.address.to_string(), r.err().unwrap().to_string() @@ -785,7 +761,7 @@ impl Peer { _ => { return inner.handle_ok( - host_system, + app, node, self, &source_path, @@ -802,20 +778,20 @@ impl Peer { return PacketHandlerResult::Error; } - fn handle_incoming_whois( + fn handle_incoming_whois( self: &Arc, - host_system: &HostSystemImpl, - inner: &InnerProtocolImpl, + app: &Application, + inner: &Inner, node: &Node, time_ticks: i64, message_id: MessageId, payload: &PacketBuffer, ) -> PacketHandlerResult { - if node.this_node_is_root() || host_system.should_respond_to(&self.identity) { + if node.this_node_is_root() || app.should_respond_to(&self.identity) { let mut addresses = payload.as_bytes(); while addresses.len() >= ADDRESS_SIZE { if !self - .send(host_system, node, None, time_ticks, |packet| { + .send(app, node, None, time_ticks, |packet| { while addresses.len() >= ADDRESS_SIZE && (packet.len() + Identity::MAX_MARSHAL_SIZE) <= UDP_DEFAULT_MTU { if let Some(zt_address) = Address::from_bytes(&addresses[..ADDRESS_SIZE]) { if let Some(peer) = node.peer(zt_address) { @@ -835,9 +811,9 @@ impl Peer { return PacketHandlerResult::Ok; } - fn handle_incoming_rendezvous( + fn handle_incoming_rendezvous( self: &Arc, - host_system: &HostSystemImpl, + app: &Application, node: &Node, time_ticks: i64, message_id: MessageId, @@ -848,17 +824,17 @@ impl Peer { return PacketHandlerResult::Ok; } - fn handle_incoming_echo( + fn handle_incoming_echo( &self, - host_system: &HostSystemImpl, - inner: &InnerProtocolImpl, + app: &Application, + inner: &Inner, node: &Node, time_ticks: i64, message_id: MessageId, payload: &PacketBuffer, ) -> PacketHandlerResult { - if host_system.should_respond_to(&self.identity) || node.is_peer_root(self) { - self.send(host_system, node, None, time_ticks, |packet| { + if app.should_respond_to(&self.identity) || node.is_peer_root(self) { + self.send(app, node, None, time_ticks, |packet| { let mut f: &mut OkHeader = packet.append_struct_get_mut().unwrap(); f.verb = message_type::VL1_OK; f.in_re_verb = message_type::VL1_ECHO; @@ -867,7 +843,7 @@ impl Peer { }); } else { debug_event!( - host_system, + app, "[vl1] dropping ECHO from {} due to lack of trust relationship", self.identity.address.to_string() ); @@ -875,9 +851,9 @@ impl Peer { return PacketHandlerResult::Ok; } - fn handle_incoming_push_direct_paths( + fn handle_incoming_push_direct_paths( self: &Arc, - host_system: &HostSystemImpl, + app: &Application, node: &Node, time_ticks: i64, source_path: &Arc, @@ -886,9 +862,9 @@ impl Peer { PacketHandlerResult::Ok } - fn handle_incoming_user_message( + fn handle_incoming_user_message( self: &Arc, - host_system: &HostSystemImpl, + app: &Application, node: &Node, time_ticks: i64, source_path: &Arc, diff --git a/network-hypervisor/src/vl2/multicastauthority.rs b/network-hypervisor/src/vl2/multicastauthority.rs index 8100d9ced..c253ca4cc 100644 --- a/network-hypervisor/src/vl2/multicastauthority.rs +++ b/network-hypervisor/src/vl2/multicastauthority.rs @@ -3,7 +3,7 @@ use std::sync::{Arc, Mutex, RwLock}; use crate::protocol; use crate::protocol::PacketBuffer; -use crate::vl1::{Address, HostSystem, Identity, Node, PacketHandlerResult, Peer, MAC}; +use crate::vl1::{Address, ApplicationLayer, Identity, Node, PacketHandlerResult, Peer, MAC}; use crate::vl2::{MulticastGroup, NetworkId}; use zerotier_utils::buffer::OutOfBoundsError; @@ -84,7 +84,7 @@ impl MulticastAuthority { } /// Call for VL2_MULTICAST_GATHER packets. - pub fn handle_vl2_multicast_gather bool>( + pub fn handle_vl2_multicast_gather bool>( &self, auth: Authenticator, time_ticks: i64, diff --git a/network-hypervisor/src/vl2/switch.rs b/network-hypervisor/src/vl2/switch.rs index bfc98101d..8dde12e2b 100644 --- a/network-hypervisor/src/vl2/switch.rs +++ b/network-hypervisor/src/vl2/switch.rs @@ -3,17 +3,17 @@ use std::sync::Arc; use crate::protocol::PacketBuffer; -use crate::vl1::{HostSystem, InnerProtocol, Node, PacketHandlerResult, Path, Peer}; +use crate::vl1::{ApplicationLayer, InnerLayer, Node, PacketHandlerResult, Path, Peer}; pub trait SwitchInterface: Sync + Send {} pub struct Switch {} #[allow(unused_variables)] -impl InnerProtocol for Switch { - fn handle_packet( +impl InnerLayer for Switch { + fn handle_packet( &self, - host_system: &HostSystemImpl, + app: &Application, node: &Node, source: &Arc, source_path: &Arc, @@ -26,9 +26,9 @@ impl InnerProtocol for Switch { PacketHandlerResult::NotHandled } - fn handle_error( + fn handle_error( &self, - host_system: &HostSystemImpl, + app: &Application, node: &Node, source: &Arc, source_path: &Arc, @@ -43,9 +43,9 @@ impl InnerProtocol for Switch { PacketHandlerResult::NotHandled } - fn handle_ok( + fn handle_ok( &self, - host_system: &HostSystemImpl, + app: &Application, node: &Node, source: &Arc, source_path: &Arc, diff --git a/service/src/main.rs b/service/src/main.rs index b9221c9d9..8cdb89c26 100644 --- a/service/src/main.rs +++ b/service/src/main.rs @@ -209,7 +209,7 @@ fn main() { Some(("service", _)) => { drop(global_args); // free unnecessary heap before starting service as we're done with CLI args if let Ok(_tokio_runtime) = zerotier_utils::tokio::runtime::Builder::new_multi_thread().enable_all().build() { - let test_inner = Arc::new(zerotier_network_hypervisor::vl1::DummyInnerProtocol::default()); + let test_inner = Arc::new(zerotier_network_hypervisor::vl1::DummyInnerLayer::default()); let datadir = open_datadir(&flags); let svc = VL1Service::new( datadir, diff --git a/vl1-service/src/datadir.rs b/vl1-service/src/datadir.rs index 27fc0cc33..de7853266 100644 --- a/vl1-service/src/datadir.rs +++ b/vl1-service/src/datadir.rs @@ -8,7 +8,7 @@ use serde::de::DeserializeOwned; use serde::Serialize; use zerotier_crypto::random::next_u32_secure; -use zerotier_network_hypervisor::vl1::{Identity, NodeStorage, Verified}; +use zerotier_network_hypervisor::vl1::{Identity, NodeStorageProvider, Verified}; use zerotier_utils::io::{fs_restrict_permissions, read_limit, DEFAULT_FILE_IO_READ_LIMIT}; use zerotier_utils::json::to_json_pretty; @@ -26,7 +26,9 @@ pub struct DataDir, } -impl NodeStorage for DataDir { +impl NodeStorageProvider + for DataDir +{ fn load_node_identity(&self) -> Option> { let id_data = read_limit(self.base_path.join(IDENTITY_SECRET_FILENAME), 4096); if id_data.is_err() { diff --git a/vl1-service/src/vl1service.rs b/vl1-service/src/vl1service.rs index cca43e675..548a47cac 100644 --- a/vl1-service/src/vl1service.rs +++ b/vl1-service/src/vl1service.rs @@ -27,14 +27,14 @@ const UPDATE_UDP_BINDINGS_EVERY_SECS: usize = 10; /// whatever inner protocol implementation is using it. This would typically be VL2 but could be /// a test harness or just the controller for a controller that runs stand-alone. pub struct VL1Service< - NodeStorageImpl: NodeStorage + ?Sized + 'static, - VL1AuthProviderImpl: VL1AuthProvider + ?Sized + 'static, - InnerProtocolImpl: InnerProtocol + ?Sized + 'static, + NodeStorageImpl: NodeStorageProvider + ?Sized + 'static, + PeerToPeerAuthentication: PeerFilter + ?Sized + 'static, + Inner: InnerLayer + ?Sized + 'static, > { state: RwLock, storage: Arc, - vl1_auth_provider: Arc, - inner: Arc, + vl1_auth_provider: Arc, + inner: Arc, buffer_pool: Arc, node_container: Option, // never None, set in new() } @@ -47,15 +47,15 @@ struct VL1ServiceMutableState { } impl< - NodeStorageImpl: NodeStorage + ?Sized + 'static, - VL1AuthProviderImpl: VL1AuthProvider + ?Sized + 'static, - InnerProtocolImpl: InnerProtocol + ?Sized + 'static, - > VL1Service + NodeStorageImpl: NodeStorageProvider + ?Sized + 'static, + PeerToPeerAuthentication: PeerFilter + ?Sized + 'static, + Inner: InnerLayer + ?Sized + 'static, + > VL1Service { pub fn new( storage: Arc, - vl1_auth_provider: Arc, - inner: Arc, + vl1_auth_provider: Arc, + inner: Arc, settings: VL1Settings, ) -> Result, Box> { let mut service = Self { @@ -190,10 +190,10 @@ impl< } impl< - NodeStorageImpl: NodeStorage + ?Sized + 'static, - VL1AuthProviderImpl: VL1AuthProvider + ?Sized + 'static, - InnerProtocolImpl: InnerProtocol + ?Sized + 'static, - > UdpPacketHandler for VL1Service + NodeStorageImpl: NodeStorageProvider + ?Sized + 'static, + PeerToPeerAuthentication: PeerFilter + ?Sized + 'static, + Inner: InnerLayer + ?Sized + 'static, + > UdpPacketHandler for VL1Service { #[inline(always)] fn incoming_udp_packet( @@ -216,10 +216,10 @@ impl< } impl< - NodeStorageImpl: NodeStorage + ?Sized + 'static, - VL1AuthProviderImpl: VL1AuthProvider + ?Sized + 'static, - InnerProtocolImpl: InnerProtocol + ?Sized + 'static, - > HostSystem for VL1Service + NodeStorageImpl: NodeStorageProvider + ?Sized + 'static, + PeerToPeerAuthentication: PeerFilter + ?Sized + 'static, + Inner: InnerLayer + ?Sized + 'static, + > ApplicationLayer for VL1Service { type Storage = NodeStorageImpl; type LocalSocket = crate::LocalSocket; @@ -322,10 +322,10 @@ impl< } impl< - NodeStorageImpl: NodeStorage + ?Sized + 'static, - VL1AuthProviderImpl: VL1AuthProvider + ?Sized + 'static, - InnerProtocolImpl: InnerProtocol + ?Sized + 'static, - > NodeStorage for VL1Service + NodeStorageImpl: NodeStorageProvider + ?Sized + 'static, + PeerToPeerAuthentication: PeerFilter + ?Sized + 'static, + Inner: InnerLayer + ?Sized + 'static, + > NodeStorageProvider for VL1Service { #[inline(always)] fn load_node_identity(&self) -> Option> { @@ -339,10 +339,10 @@ impl< } impl< - NodeStorageImpl: NodeStorage + ?Sized + 'static, - VL1AuthProviderImpl: VL1AuthProvider + ?Sized + 'static, - InnerProtocolImpl: InnerProtocol + ?Sized + 'static, - > VL1AuthProvider for VL1Service + NodeStorageImpl: NodeStorageProvider + ?Sized + 'static, + PeerToPeerAuthentication: PeerFilter + ?Sized + 'static, + Inner: InnerLayer + ?Sized + 'static, + > PeerFilter for VL1Service { #[inline(always)] fn should_respond_to(&self, id: &Verified) -> bool { @@ -356,10 +356,10 @@ impl< } impl< - NodeStorageImpl: NodeStorage + ?Sized + 'static, - VL1AuthProviderImpl: VL1AuthProvider + ?Sized + 'static, - InnerProtocolImpl: InnerProtocol + ?Sized + 'static, - > Drop for VL1Service + NodeStorageImpl: NodeStorageProvider + ?Sized + 'static, + PeerToPeerAuthentication: PeerFilter + ?Sized + 'static, + Inner: InnerLayer + ?Sized + 'static, + > Drop for VL1Service { fn drop(&mut self) { let mut state = self.state.write().unwrap();