diff --git a/controller/src/controller.rs b/controller/src/controller.rs index 26079a107..32ef810d0 100644 --- a/controller/src/controller.rs +++ b/controller/src/controller.rs @@ -31,11 +31,11 @@ const REQUEST_TIMEOUT: Duration = Duration::from_secs(10); /// ZeroTier VL2 network controller packet handler, answers VL2 netconf queries. pub struct Controller { self_ref: Weak, - service: RwLock>>, + service: RwLock>>, reaper: Reaper, runtime: tokio::runtime::Handle, database: Arc, - local_identity: Verified, + local_identity: Valid, /// Handler for MULTICAST_LIKE and MULTICAST_GATHER messages. multicast_authority: MulticastAuthority, @@ -78,7 +78,7 @@ impl Controller { /// This must be called once the service that uses this handler is up or the controller /// won't actually do anything. The controller holds a weak reference to VL1Service so /// be sure it's not dropped. - pub async fn start(&self, service: &Arc>) { + pub async fn start(&self, service: &Arc>) { *self.service.write().unwrap() = Arc::downgrade(service); // Create database change listener. @@ -256,7 +256,7 @@ impl Controller { /// reason is returned with None or an acceptance reason with a network configuration is returned. async fn authorize( self: &Arc, - source_identity: &Verified, + source_identity: &Valid, network_id: NetworkId, time_clock: i64, ) -> Result<(AuthenticationResult, Option), Box> { @@ -371,13 +371,10 @@ impl Controller { // Make sure these agree. It should be impossible to end up with a member that's authorized and // whose identity and identity fingerprint don't match. - if !secure_eq(&member - .identity - .as_ref() - .unwrap() - .fingerprint, - member.identity_fingerprint.as_ref().unwrap().as_bytes()) - { + if !secure_eq( + &member.identity.as_ref().unwrap().fingerprint, + member.identity_fingerprint.as_ref().unwrap().as_bytes(), + ) { debug_assert!(false); return Ok((AuthenticationResult::RejectedDueToError, None)); } @@ -500,8 +497,24 @@ impl Controller { } } -impl InnerProtocol for Controller { - fn handle_packet( +impl InnerProtocolLayer for Controller { + #[inline(always)] + fn should_respond_to(&self, _: &Valid) -> bool { + // Controllers always have to establish sessions to process requests. We don't really know if + // a member is relevant until we have looked up both the network and the member, since whether + // or not to "learn" unknown members is a network level option. + true + } + + fn has_trust_relationship(&self, id: &Valid) -> bool { + self.recently_authorized + .read() + .unwrap() + .get(&id.fingerprint) + .map_or(false, |by_network| by_network.values().any(|t| *t > ms_monotonic())) + } + + fn handle_packet( &self, host_system: &HostSystemImpl, _: &Node, @@ -515,7 +528,7 @@ impl InnerProtocol for Controller { ) -> PacketHandlerResult { match verb { protocol::message_type::VL2_NETWORK_CONFIG_REQUEST => { - if !host_system.should_respond_to(&source.identity) { + if !self.should_respond_to(&source.identity) { return PacketHandlerResult::Ok; // handled and ignored } @@ -641,24 +654,6 @@ impl InnerProtocol for Controller { } } -impl VL1AuthProvider for Controller { - #[inline(always)] - fn should_respond_to(&self, _: &Verified) -> bool { - // Controllers always have to establish sessions to process requests. We don't really know if - // a member is relevant until we have looked up both the network and the member, since whether - // or not to "learn" unknown members is a network level option. - true - } - - fn has_trust_relationship(&self, id: &Verified) -> bool { - self.recently_authorized - .read() - .unwrap() - .get(&id.fingerprint) - .map_or(false, |by_network| by_network.values().any(|t| *t > ms_monotonic())) - } -} - impl Drop for Controller { fn drop(&mut self) { for h in self.daemons.lock().unwrap().drain(..) { diff --git a/controller/src/database.rs b/controller/src/database.rs index 74fde0118..44c873c62 100644 --- a/controller/src/database.rs +++ b/controller/src/database.rs @@ -1,10 +1,10 @@ use async_trait::async_trait; use zerotier_crypto::secure_eq; -use zerotier_network_hypervisor::vl1::{Address, InetAddress, NodeStorage}; +use zerotier_network_hypervisor::vl1::{Address, InetAddress}; use zerotier_network_hypervisor::vl2::NetworkId; - use zerotier_utils::tokio::sync::broadcast::Receiver; +use zerotier_vl1_service::VL1DataStorage; use crate::model::*; @@ -22,7 +22,7 @@ pub enum Change { } #[async_trait] -pub trait Database: Sync + Send + NodeStorage + 'static { +pub trait Database: Sync + Send + VL1DataStorage + 'static { async fn list_networks(&self) -> Result, Error>; async fn get_network(&self, id: NetworkId) -> Result, Error>; async fn save_network(&self, obj: Network, generate_change_notification: bool) -> Result<(), Error>; diff --git a/controller/src/filedatabase.rs b/controller/src/filedatabase.rs index 9625c2a8b..ca1234d2e 100644 --- a/controller/src/filedatabase.rs +++ b/controller/src/filedatabase.rs @@ -1,28 +1,26 @@ use std::path::{Path, PathBuf}; -use std::str::FromStr; -use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex, Weak}; use async_trait::async_trait; use notify::{RecursiveMode, Watcher}; use serde::de::DeserializeOwned; -use zerotier_network_hypervisor::vl1::{Address, Identity, NodeStorage, Verified}; +use zerotier_network_hypervisor::vl1::{Address, Identity, Valid}; use zerotier_network_hypervisor::vl2::NetworkId; -use zerotier_utils::io::{fs_restrict_permissions, read_limit}; use zerotier_utils::reaper::Reaper; use zerotier_utils::tokio::fs; use zerotier_utils::tokio::runtime::Handle; use zerotier_utils::tokio::sync::broadcast::{channel, Receiver, Sender}; use zerotier_utils::tokio::task::JoinHandle; use zerotier_utils::tokio::time::{sleep, Duration, Instant}; +use zerotier_vl1_service::datadir::{load_node_identity, save_node_identity}; +use zerotier_vl1_service::VL1DataStorage; use crate::cache::Cache; use crate::database::{Change, Database, Error}; use crate::model::*; -const IDENTITY_SECRET_FILENAME: &'static str = "identity.secret"; -const EVENT_HANDLER_TASK_TIMEOUT: Duration = Duration::from_secs(5); +const EVENT_HANDLER_TASK_TIMEOUT: Duration = Duration::from_secs(10); /// An in-filesystem database that permits live editing. /// @@ -34,7 +32,7 @@ const EVENT_HANDLER_TASK_TIMEOUT: Duration = Duration::from_secs(5); /// is different from V1 so it'll need a converter to use with V1 FileDb controller data. pub struct FileDatabase { base_path: PathBuf, - controller_address: AtomicU64, + local_identity: Valid, change_sender: Sender, tasks: Reaper, cache: Cache, @@ -53,9 +51,13 @@ impl FileDatabase { let db_weak = db_weak_tmp.clone(); let runtime2 = runtime.clone(); + let local_identity = load_node_identity(base_path.as_path()) + .ok_or(std::io::Error::new(std::io::ErrorKind::NotFound, "identity.secret not found"))?; + let controller_address = local_identity.address; + let db = Arc::new(Self { base_path: base_path.clone(), - controller_address: AtomicU64::new(0), + local_identity, change_sender, tasks: Reaper::new(&runtime2), cache: Cache::new(), @@ -65,127 +67,115 @@ impl FileDatabase { match event.kind { notify::EventKind::Create(_) | notify::EventKind::Modify(_) | notify::EventKind::Remove(_) => { if let Some(db) = db_weak.lock().unwrap().upgrade() { - if let Some(controller_address) = db.get_controller_address() { - db.clone().tasks.add( - runtime.spawn(async move { - if let Some(path0) = event.paths.first() { - if let Some((record_type, network_id, node_id)) = - Self::record_type_from_path(controller_address, path0.as_path()) - { - // Paths to objects that were deleted or changed. Changed includes adding new objects. - let mut deleted = None; - let mut changed = None; + db.clone().tasks.add( + runtime.spawn(async move { + if let Some(path0) = event.paths.first() { + if let Some((record_type, network_id, node_id)) = + Self::record_type_from_path(controller_address, path0.as_path()) + { + // Paths to objects that were deleted or changed. Changed includes adding new objects. + let mut deleted = None; + let mut changed = None; - match event.kind { - notify::EventKind::Create(create_kind) => match create_kind { - notify::event::CreateKind::File => { - changed = Some(path0.as_path()); - } - _ => {} - }, - notify::EventKind::Modify(modify_kind) => match modify_kind { - notify::event::ModifyKind::Data(_) => { - changed = Some(path0.as_path()); - } - notify::event::ModifyKind::Name(rename_mode) => match rename_mode { - notify::event::RenameMode::Both => { - if event.paths.len() >= 2 { - if let Some(path1) = event.paths.last() { - deleted = Some(path0.as_path()); - changed = Some(path1.as_path()); - } + match event.kind { + notify::EventKind::Create(create_kind) => match create_kind { + notify::event::CreateKind::File => { + changed = Some(path0.as_path()); + } + _ => {} + }, + notify::EventKind::Modify(modify_kind) => match modify_kind { + notify::event::ModifyKind::Data(_) => { + changed = Some(path0.as_path()); + } + notify::event::ModifyKind::Name(rename_mode) => match rename_mode { + notify::event::RenameMode::Both => { + if event.paths.len() >= 2 { + if let Some(path1) = event.paths.last() { + deleted = Some(path0.as_path()); + changed = Some(path1.as_path()); } } - notify::event::RenameMode::From => { - deleted = Some(path0.as_path()); - } - notify::event::RenameMode::To => { - changed = Some(path0.as_path()); - } - _ => {} - }, - _ => {} - }, - notify::EventKind::Remove(remove_kind) => match remove_kind { - notify::event::RemoveKind::File => { + } + notify::event::RenameMode::From => { deleted = Some(path0.as_path()); } + notify::event::RenameMode::To => { + changed = Some(path0.as_path()); + } _ => {} }, _ => {} - } - - if deleted.is_some() { - match record_type { - RecordType::Network => { - if let Some((network, members)) = - db.cache.on_network_deleted(network_id) - { - let _ = - db.change_sender.send(Change::NetworkDeleted(network, members)); - } - } - RecordType::Member => { - if let Some(node_id) = node_id { - if let Some(member) = - db.cache.on_member_deleted(network_id, node_id) - { - let _ = db.change_sender.send(Change::MemberDeleted(member)); - } - } - } - _ => {} + }, + notify::EventKind::Remove(remove_kind) => match remove_kind { + notify::event::RemoveKind::File => { + deleted = Some(path0.as_path()); } - } + _ => {} + }, + _ => {} + } - if let Some(changed) = changed { - match record_type { - RecordType::Network => { - if let Ok(Some(new_network)) = - Self::load_object::(changed).await - { - match db.cache.on_network_updated(new_network.clone()) { - (true, Some(old_network)) => { - let _ = db - .change_sender - .send(Change::NetworkChanged(old_network, new_network)); - } - (true, None) => { - let _ = db - .change_sender - .send(Change::NetworkCreated(new_network)); - } - _ => {} - } - } + if deleted.is_some() { + match record_type { + RecordType::Network => { + if let Some((network, members)) = db.cache.on_network_deleted(network_id) { + let _ = db.change_sender.send(Change::NetworkDeleted(network, members)); } - RecordType::Member => { - if let Ok(Some(new_member)) = Self::load_object::(changed).await - { - match db.cache.on_member_updated(new_member.clone()) { - (true, Some(old_member)) => { - let _ = db - .change_sender - .send(Change::MemberChanged(old_member, new_member)); - } - (true, None) => { - let _ = db - .change_sender - .send(Change::MemberCreated(new_member)); - } - _ => {} - } - } - } - _ => {} } + RecordType::Member => { + if let Some(node_id) = node_id { + if let Some(member) = db.cache.on_member_deleted(network_id, node_id) { + let _ = db.change_sender.send(Change::MemberDeleted(member)); + } + } + } + _ => {} + } + } + + if let Some(changed) = changed { + match record_type { + RecordType::Network => { + if let Ok(Some(new_network)) = Self::load_object::(changed).await { + match db.cache.on_network_updated(new_network.clone()) { + (true, Some(old_network)) => { + let _ = db + .change_sender + .send(Change::NetworkChanged(old_network, new_network)); + } + (true, None) => { + let _ = + db.change_sender.send(Change::NetworkCreated(new_network)); + } + _ => {} + } + } + } + RecordType::Member => { + if let Ok(Some(new_member)) = Self::load_object::(changed).await { + match db.cache.on_member_updated(new_member.clone()) { + (true, Some(old_member)) => { + let _ = db + .change_sender + .send(Change::MemberChanged(old_member, new_member)); + } + (true, None) => { + let _ = + db.change_sender.send(Change::MemberCreated(new_member)); + } + _ => {} + } + } + } + _ => {} } } } - }), - Instant::now().checked_add(EVENT_HANDLER_TASK_TIMEOUT).unwrap(), - ); - } + } + }), + Instant::now().checked_add(EVENT_HANDLER_TASK_TIMEOUT).unwrap(), + ); } } _ => {} @@ -215,20 +205,6 @@ impl FileDatabase { Ok(db) } - fn get_controller_address(&self) -> Option
{ - let a = self.controller_address.load(Ordering::Relaxed); - if a == 0 { - if let Some(id) = self.load_node_identity() { - self.controller_address.store(id.address.into(), Ordering::Relaxed); - Some(id.address) - } else { - None - } - } else { - Address::from_u64(a) - } - } - fn network_path(&self, network_id: NetworkId) -> PathBuf { self.base_path.join(format!("N{:06x}", network_id.network_no())).join("config.yaml") } @@ -274,25 +250,13 @@ impl Drop for FileDatabase { } } -impl NodeStorage for FileDatabase { - fn load_node_identity(&self) -> Option> { - let id_data = read_limit(self.base_path.join(IDENTITY_SECRET_FILENAME), 16384); - if id_data.is_err() { - return None; - } - let id_data = Identity::from_str(String::from_utf8_lossy(id_data.unwrap().as_slice()).as_ref()); - if id_data.is_err() { - return None; - } - Some(Verified::assume_verified(id_data.unwrap())) +impl VL1DataStorage for FileDatabase { + fn load_node_identity(&self) -> Option> { + load_node_identity(self.base_path.as_path()) } - fn save_node_identity(&self, id: &Verified) { - assert!(id.secret.is_some()); - let id_secret_str = id.to_secret_string(); - let secret_path = self.base_path.join(IDENTITY_SECRET_FILENAME); - assert!(std::fs::write(&secret_path, id_secret_str.as_bytes()).is_ok()); - assert!(fs_restrict_permissions(&secret_path)); + fn save_node_identity(&self, id: &Valid) -> bool { + save_node_identity(self.base_path.as_path(), id) } } @@ -300,19 +264,17 @@ impl NodeStorage for FileDatabase { impl Database for FileDatabase { async fn list_networks(&self) -> Result, Error> { let mut networks = Vec::new(); - if let Some(controller_address) = self.get_controller_address() { - let controller_address_shift24 = u64::from(controller_address).wrapping_shl(24); - let mut dir = fs::read_dir(&self.base_path).await?; - while let Ok(Some(ent)) = dir.next_entry().await { - if ent.file_type().await.map_or(false, |t| t.is_dir()) { - let osname = ent.file_name(); - let name = osname.to_string_lossy(); - if name.len() == 7 && name.starts_with("N") { - if fs::metadata(ent.path().join("config.yaml")).await.is_ok() { - if let Ok(nwid_last24bits) = u64::from_str_radix(&name[1..], 16) { - if let Some(nwid) = NetworkId::from_u64(controller_address_shift24 | nwid_last24bits) { - networks.push(nwid); - } + let controller_address_shift24 = u64::from(self.local_identity.address).wrapping_shl(24); + let mut dir = fs::read_dir(&self.base_path).await?; + while let Ok(Some(ent)) = dir.next_entry().await { + if ent.file_type().await.map_or(false, |t| t.is_dir()) { + let osname = ent.file_name(); + let name = osname.to_string_lossy(); + if name.len() == 7 && name.starts_with("N") { + if fs::metadata(ent.path().join("config.yaml")).await.is_ok() { + if let Ok(nwid_last24bits) = u64::from_str_radix(&name[1..], 16) { + if let Some(nwid) = NetworkId::from_u64(controller_address_shift24 | nwid_last24bits) { + networks.push(nwid); } } } @@ -328,12 +290,10 @@ impl Database for FileDatabase { // FileDatabase stores networks by their "network number" and automatically adapts their IDs // if the controller's identity changes. This is done to make it easy to just clone networks, // including storing them in "git." - if let Some(controller_address) = self.get_controller_address() { - let network_id_should_be = network.id.change_network_controller(controller_address); - if network.id != network_id_should_be { - network.id = network_id_should_be; - let _ = self.save_network(network.clone(), false).await?; - } + let network_id_should_be = network.id.change_network_controller(self.local_identity.address); + if network.id != network_id_should_be { + network.id = network_id_should_be; + let _ = self.save_network(network.clone(), false).await?; } } Ok(network) diff --git a/controller/src/main.rs b/controller/src/main.rs index 624b0bd19..6c9d29f3c 100644 --- a/controller/src/main.rs +++ b/controller/src/main.rs @@ -2,53 +2,41 @@ use std::sync::Arc; -use clap::{Arg, Command}; - use zerotier_network_controller::database::Database; use zerotier_network_controller::filedatabase::FileDatabase; use zerotier_network_controller::Controller; - use zerotier_network_hypervisor::{VERSION_MAJOR, VERSION_MINOR, VERSION_REVISION}; use zerotier_utils::exitcode; use zerotier_utils::tokio::runtime::Runtime; use zerotier_vl1_service::VL1Service; -async fn run(database: Arc, runtime: &Runtime) -> i32 { - let handler = Controller::new(database.clone(), runtime.handle().clone()).await; - if handler.is_err() { - eprintln!("FATAL: error initializing handler: {}", handler.err().unwrap().to_string()); - exitcode::ERR_CONFIG - } else { - let handler = handler.unwrap(); - - let svc = VL1Service::new( - database.clone(), - handler.clone(), - handler.clone(), - zerotier_vl1_service::VL1Settings::default(), - ); - - if svc.is_ok() { - let svc = svc.unwrap(); - svc.node().init_default_roots(); - - handler.start(&svc).await; - - zerotier_utils::wait_for_process_abort(); - println!("Terminate signal received, shutting down..."); - exitcode::OK - } else { - eprintln!("FATAL: error launching service: {}", svc.err().unwrap().to_string()); - exitcode::ERR_IOERR +async fn run(database: Arc, runtime: &Runtime) -> i32 { + match Controller::new(database.clone(), runtime.handle().clone()).await { + Err(err) => { + eprintln!("FATAL: error initializing handler: {}", err.to_string()); + exitcode::ERR_CONFIG } + Ok(handler) => match VL1Service::new(database.clone(), handler.clone(), zerotier_vl1_service::VL1Settings::default()) { + Err(err) => { + eprintln!("FATAL: error launching service: {}", err.to_string()); + exitcode::ERR_IOERR + } + Ok(svc) => { + svc.node().init_default_roots(); + handler.start(&svc).await; + zerotier_utils::wait_for_process_abort(); + println!("Terminate signal received, shutting down..."); + exitcode::OK + } + }, } } fn main() { const REQUIRE_ONE_OF_ARGS: [&'static str; 2] = ["postgres", "filedb"]; - let global_args = Command::new("zerotier-controller") + let global_args = clap::Command::new("zerotier-controller") .arg( - Arg::new("filedb") + clap::Arg::new("filedb") .short('f') .long("filedb") .takes_value(true) @@ -58,7 +46,7 @@ fn main() { .required_unless_present_any(&REQUIRE_ONE_OF_ARGS), ) .arg( - Arg::new("postgres") + clap::Arg::new("postgres") .short('p') .long("postgres") .takes_value(true) diff --git a/controller/src/postgresdatabase.rs b/controller/src/postgresdatabase.rs index 94f208b08..cc7198ed9 100644 --- a/controller/src/postgresdatabase.rs +++ b/controller/src/postgresdatabase.rs @@ -10,9 +10,9 @@ use tokio_postgres::types::Type; use tokio_postgres::{Client, Statement}; use zerotier_crypto::secure_eq; -use zerotier_crypto::verified::Verified; +use zerotier_crypto::typestate::Valid; -use zerotier_network_hypervisor::vl1::{Address, Identity, InetAddress, NodeStorage}; +use zerotier_network_hypervisor::vl1::{Address, Identity, InetAddress}; use zerotier_network_hypervisor::vl2::networkconfig::IpRoute; use zerotier_network_hypervisor::vl2::rule::Rule; use zerotier_network_hypervisor::vl2::NetworkId; @@ -22,6 +22,7 @@ use zerotier_utils::tokio; use zerotier_utils::tokio::runtime::Handle; use zerotier_utils::tokio::sync::broadcast::{channel, Receiver, Sender}; use zerotier_utils::tokio::task::JoinHandle; +use zerotier_vl1_service::VL1DataStorage; use crate::database::*; use crate::model::{IpAssignmentPool, Member, Network, RequestLogItem}; @@ -136,7 +137,7 @@ impl<'a> Drop for ConnectionHolder<'a> { pub struct PostgresDatabase { local_controller_id_str: String, - local_identity: Verified, + local_identity: Valid, connections: Mutex<(Vec>, Sender<()>)>, postgres_path: String, runtime: Handle, @@ -147,7 +148,7 @@ impl PostgresDatabase { runtime: Handle, postgres_path: String, num_connections: usize, - local_identity: Verified, + local_identity: Valid, ) -> Result, Error> { assert!(num_connections > 0); let (sender, _) = channel(4096); @@ -187,14 +188,13 @@ impl PostgresDatabase { } } -impl NodeStorage for PostgresDatabase { - fn load_node_identity(&self) -> Option> { +impl VL1DataStorage for PostgresDatabase { + fn load_node_identity(&self) -> Option> { Some(self.local_identity.clone()) } - fn save_node_identity(&self, _: &Verified) { - eprintln!("FATAL: NodeStorage::save_node_identity() not implemented in PostgresDatabase, identity must be pregenerated"); - panic!(); + fn save_node_identity(&self, id: &Valid) -> bool { + panic!("local identity saving not supported by PostgresDatabase") } } diff --git a/crypto/src/lib.rs b/crypto/src/lib.rs index 7e28a999b..94ab7ff7f 100644 --- a/crypto/src/lib.rs +++ b/crypto/src/lib.rs @@ -8,7 +8,7 @@ pub mod poly1305; pub mod random; pub mod salsa; pub mod secret; -pub mod verified; +pub mod typestate; pub mod x25519; pub const ZEROES: [u8; 64] = [0_u8; 64]; diff --git a/crypto/src/verified.rs b/crypto/src/typestate.rs similarity index 68% rename from crypto/src/verified.rs rename to crypto/src/typestate.rs index 73cba50d8..f8aa47f4c 100644 --- a/crypto/src/verified.rs +++ b/crypto/src/typestate.rs @@ -4,31 +4,25 @@ use std::fmt::Debug; use std::hash::Hash; use std::ops::{Deref, DerefMut}; -/// A zero-overhead typestate indicating that a credential has been verified as valid. -/// -/// What this means is obviously specific to the credential. -/// -/// The purpose of this is to make code more self-documenting and make it harder to accidentally -/// use an unverified/unvalidated credential (or other security critical object) where a verified -/// one is required. +/// Typestate indicating that a credential or other object has been internally validated. #[repr(transparent)] -pub struct Verified(T); +pub struct Valid(T); -impl AsRef for Verified { +impl AsRef for Valid { #[inline(always)] fn as_ref(&self) -> &T { &self.0 } } -impl AsMut for Verified { +impl AsMut for Valid { #[inline(always)] fn as_mut(&mut self) -> &mut T { &mut self.0 } } -impl Deref for Verified { +impl Deref for Valid { type Target = T; #[inline(always)] @@ -37,14 +31,14 @@ impl Deref for Verified { } } -impl DerefMut for Verified { +impl DerefMut for Valid { #[inline(always)] fn deref_mut(&mut self) -> &mut Self::Target { &mut self.0 } } -impl Clone for Verified +impl Clone for Valid where T: Clone, { @@ -54,7 +48,7 @@ where } } -impl PartialEq for Verified +impl PartialEq for Valid where T: PartialEq, { @@ -64,9 +58,9 @@ where } } -impl Eq for Verified where T: Eq {} +impl Eq for Valid where T: Eq {} -impl Ord for Verified +impl Ord for Valid where T: Ord, { @@ -76,7 +70,7 @@ where } } -impl PartialOrd for Verified +impl PartialOrd for Valid where T: PartialOrd, { @@ -86,7 +80,7 @@ where } } -impl Hash for Verified +impl Hash for Valid where T: Hash, { @@ -96,7 +90,7 @@ where } } -impl Debug for Verified +impl Debug for Valid where T: Debug, { @@ -106,7 +100,7 @@ where } } -impl Verified { +impl Valid { /// Strip the Verified typestate off this object. #[inline(always)] pub fn unwrap(self) -> T { diff --git a/network-hypervisor/src/vl1/identity.rs b/network-hypervisor/src/vl1/identity.rs index c28033b2b..fa70071e9 100644 --- a/network-hypervisor/src/vl1/identity.rs +++ b/network-hypervisor/src/vl1/identity.rs @@ -9,11 +9,11 @@ use std::str::FromStr; use serde::{Deserialize, Deserializer, Serialize, Serializer}; -use zerotier_crypto::{hash::*, secure_eq}; use zerotier_crypto::p384::*; use zerotier_crypto::salsa::Salsa; use zerotier_crypto::secret::Secret; use zerotier_crypto::x25519::*; +use zerotier_crypto::{hash::*, secure_eq}; use zerotier_utils::arrayvec::ArrayVec; use zerotier_utils::buffer::Buffer; @@ -23,7 +23,7 @@ use zerotier_utils::{base64_decode_url_nopad, base64_encode_url_nopad, hex}; use crate::protocol::{ADDRESS_SIZE, ADDRESS_SIZE_STRING, IDENTITY_POW_THRESHOLD}; use crate::vl1::Address; -use crate::vl1::Verified; +use crate::vl1::Valid; /// Current maximum size for an identity signature. pub const IDENTITY_MAX_SIGNATURE_SIZE: usize = P384_ECDSA_SIGNATURE_SIZE + 1; @@ -166,7 +166,7 @@ impl Identity { const FLAG_INCLUDES_SECRETS: u8 = 0x80; /// Generate a new identity. - pub fn generate() -> Verified { + pub fn generate() -> Valid { // First generate an identity with just x25519 keys and derive its address. let mut sha = SHA512::new(); let ed25519 = Ed25519KeyPair::generate(); @@ -206,7 +206,7 @@ impl Identity { assert!(id.upgrade().is_ok()); assert!(id.p384.is_some() && id.secret.as_ref().unwrap().p384.is_some()); - Verified::assume_verified(id) + Valid::assume_verified(id) } /// Upgrade older x25519-only identities to hybrid identities with both x25519 and NIST P-384 curves. @@ -290,7 +290,7 @@ impl Identity { /// Locally check the validity of this identity. /// /// This is somewhat time consuming due to the memory-intensive work algorithm. - pub fn validate(self) -> Option> { + pub fn validate(self) -> Option> { if let Some(p384) = self.p384.as_ref() { let mut self_sign_buf: Vec = Vec::with_capacity( ADDRESS_SIZE + 4 + C25519_PUBLIC_KEY_SIZE + ED25519_PUBLIC_KEY_SIZE + P384_PUBLIC_KEY_SIZE + P384_PUBLIC_KEY_SIZE, @@ -321,7 +321,7 @@ impl Identity { zt_address_derivation_work_function(&mut digest); return if digest[0] < IDENTITY_POW_THRESHOLD && Address::from_bytes(&digest[59..64]).map_or(false, |a| a == self.address) { - Some(Verified::assume_verified(self)) + Some(Valid::assume_verified(self)) } else { None }; @@ -345,7 +345,7 @@ impl Identity { /// For new identities with P-384 keys a hybrid agreement is performed using both X25519 and NIST P-384 ECDH. /// The final key is derived as HMAC(x25519 secret, p-384 secret) to yield a FIPS-compliant key agreement with /// the X25519 secret being used as a "salt" as far as FIPS is concerned. - pub fn agree(&self, other: &Verified) -> Option> { + pub fn agree(&self, other: &Valid) -> Option> { if let Some(secret) = self.secret.as_ref() { let c25519_secret: Secret<64> = Secret(SHA512::hash(&secret.x25519.agree(&other.x25519).0)); diff --git a/network-hypervisor/src/vl1/mod.rs b/network-hypervisor/src/vl1/mod.rs index 63cd1ece4..5c7e71dc2 100644 --- a/network-hypervisor/src/vl1/mod.rs +++ b/network-hypervisor/src/vl1/mod.rs @@ -18,12 +18,12 @@ pub use event::Event; pub use identity::Identity; pub use inetaddress::InetAddress; pub use mac::MAC; -pub use node::{DummyInnerProtocol, HostSystem, InnerProtocol, Node, NodeStorage, PacketHandlerResult, VL1AuthProvider}; +pub use node::{ApplicationLayer, InnerProtocolLayer, Node, PacketHandlerResult}; pub use path::Path; pub use peer::Peer; pub use rootset::{Root, RootSet}; -pub use zerotier_crypto::verified::Verified; +pub use zerotier_crypto::typestate::Valid; #[cfg(feature = "debug_events")] #[allow(unused_macros)] diff --git a/network-hypervisor/src/vl1/node.rs b/network-hypervisor/src/vl1/node.rs index 3c96c889a..c2408f7fe 100644 --- a/network-hypervisor/src/vl1/node.rs +++ b/network-hypervisor/src/vl1/node.rs @@ -17,7 +17,7 @@ use crate::vl1::identity::Identity; use crate::vl1::path::{Path, PathServiceResult}; use crate::vl1::peer::Peer; use crate::vl1::rootset::RootSet; -use crate::vl1::Verified; +use crate::vl1::Valid; use zerotier_crypto::random; use zerotier_utils::error::InvalidParameterError; @@ -27,42 +27,11 @@ use zerotier_utils::marshalable::Marshalable; use zerotier_utils::ringbuffer::RingBuffer; use zerotier_utils::thing::Thing; -/// Trait providing VL1 authentication functions to determine which nodes we should talk to. +/// Interface trait to be implemented by code that's using the ZeroTier network hypervisor. /// -/// This is included in HostSystem but is provided as a separate trait to make it easy for -/// implementers of HostSystem to break this out and allow a user to specify it. -pub trait VL1AuthProvider: Sync + Send { - /// Check if this node should respond to messages from a given peer at all. - /// - /// If this returns false, the node simply drops messages on the floor and refuses - /// to init V2 sessions. - fn should_respond_to(&self, id: &Verified) -> bool; - - /// Check if this node has any trust relationship with the provided identity. - /// - /// This should return true if there is any special trust relationship such as mutual - /// membership in a network or for controllers the peer's membership in any network - /// they control. - fn has_trust_relationship(&self, id: &Verified) -> bool; -} - -/// Trait to be implemented by outside code to provide object storage to VL1 -/// -/// This is included in HostSystem but is provided as a separate trait to make it easy for -/// implementers of HostSystem to break this out and allow a user to specify it. -pub trait NodeStorage: Sync + Send { - /// Load this node's identity from the data store. - fn load_node_identity(&self) -> Option>; - - /// Save this node's identity to the data store. - fn save_node_identity(&self, id: &Verified); -} - -/// Trait implemented by external code to handle events and provide an interface to the system or application. -pub trait HostSystem: VL1AuthProvider + NodeStorage + 'static { - /// Type for implementation of NodeStorage. - type Storage: NodeStorage + ?Sized; - +/// This is analogous to a C struct full of function pointers to callbacks along with some +/// associated type definitions. +pub trait ApplicationLayer: Sync + Send { /// Type for local system sockets. type LocalSocket: Sync + Send + Hash + PartialEq + Eq + Clone + ToString + Sized + 'static; @@ -72,8 +41,11 @@ pub trait HostSystem: VL1AuthProvider + NodeStorage + 'static { /// A VL1 level event occurred. fn event(&self, event: Event); - /// Get a reference to the local storage implementation at this host. - fn storage(&self) -> &Self::Storage; + /// Load this node's identity from the data store. + fn load_node_identity(&self) -> Option>; + + /// Save this node's identity to the data store, returning true on success. + fn save_node_identity(&self, id: &Valid) -> bool; /// Get a pooled packet buffer for internal use. fn get_buffer(&self) -> PooledPacketBuffer; @@ -110,12 +82,12 @@ pub trait HostSystem: VL1AuthProvider + NodeStorage + 'static { /// /// The default implementation always returns true. #[allow(unused_variables)] - fn should_use_physical_path( + fn should_use_physical_path( &self, - id: &Verified, + id: &Valid, endpoint: &Endpoint, - local_socket: Option<&HostSystemImpl::LocalSocket>, - local_interface: Option<&HostSystemImpl::LocalInterface>, + local_socket: Option<&Application::LocalSocket>, + local_interface: Option<&Application::LocalInterface>, ) -> bool { true } @@ -124,16 +96,10 @@ pub trait HostSystem: VL1AuthProvider + NodeStorage + 'static { /// /// The default implementation always returns None. #[allow(unused_variables)] - fn get_path_hints( + fn get_path_hints( &self, - id: &Verified, - ) -> Option< - Vec<( - Endpoint, - Option, - Option, - )>, - > { + id: &Valid, + ) -> Option, Option)>> { None } @@ -163,14 +129,32 @@ pub enum PacketHandlerResult { /// This is implemented by Switch in VL2. It's usually not used outside of VL2 in the core but /// it could also be implemented for testing or "off label" use of VL1 to carry different protocols. #[allow(unused)] -pub trait InnerProtocol: Sync + Send { +pub trait InnerProtocolLayer: Sync + Send { + /// Check if this node should respond to messages from a given peer at all. + /// + /// The default implementation always returns true. + fn should_respond_to(&self, id: &Valid) -> bool { + true + } + + /// Check if this node has any trust relationship with the provided identity. + /// + /// This should return true if there is any special trust relationship. It controls things + /// like sharing of detailed P2P connectivity data, which should be limited to peers with + /// some privileged relationship like mutual membership in a network. + /// + /// The default implementation always returns true. + fn has_trust_relationship(&self, id: &Valid) -> bool { + true + } + /// Handle a packet, returning true if it was handled by the next layer. /// /// Do not attempt to handle OK or ERROR. Instead implement handle_ok() and handle_error(). /// The default version returns NotHandled. - fn handle_packet( + fn handle_packet( &self, - host_system: &HostSystemImpl, + app: &Application, node: &Node, source: &Arc, source_path: &Arc, @@ -185,9 +169,9 @@ pub trait InnerProtocol: Sync + Send { /// Handle errors, returning true if the error was recognized. /// The default version returns NotHandled. - fn handle_error( + fn handle_error( &self, - host_system: &HostSystemImpl, + app: &Application, node: &Node, source: &Arc, source_path: &Arc, @@ -204,9 +188,9 @@ pub trait InnerProtocol: Sync + Send { /// Handle an OK, returning true if the OK was recognized. /// The default version returns NotHandled. - fn handle_ok( + fn handle_ok( &self, - host_system: &HostSystemImpl, + app: &Application, node: &Node, source: &Arc, source_path: &Arc, @@ -221,12 +205,9 @@ pub trait InnerProtocol: Sync + Send { } } -/// How often to check the root cluster definitions against the root list and update. -const ROOT_SYNC_INTERVAL_MS: i64 = 1000; - struct RootInfo { /// Root sets to which we are a member. - sets: HashMap>, + sets: HashMap>, /// Root peers and their statically defined endpoints (from root sets). roots: HashMap, Vec>, @@ -242,7 +223,9 @@ struct RootInfo { online: bool, } -/// Interval gate objects used ot fire off background tasks, see do_background_tasks(). +/// How often to check the root cluster definitions against the root list and update. +const ROOT_SYNC_INTERVAL_MS: i64 = 1000; + #[derive(Default)] struct BackgroundTaskIntervals { root_sync: IntervalGate<{ ROOT_SYNC_INTERVAL_MS }>, @@ -253,7 +236,6 @@ struct BackgroundTaskIntervals { whois_queue_retry: IntervalGate<{ WHOIS_RETRY_INTERVAL }>, } -/// WHOIS requests and any packets that are waiting on them to be decrypted and authenticated. struct WhoisQueueItem { v1_proto_waiting_packets: RingBuffer<(Weak, PooledPacketBuffer), WHOIS_MAX_WAITING_PACKETS>, last_retry_time: i64, @@ -261,24 +243,21 @@ struct WhoisQueueItem { } const PATH_MAP_SIZE: usize = std::mem::size_of::() + 128], Arc>>(); -type PathMap = HashMap, Arc>; +type PathMap = HashMap, Arc>; /// A ZeroTier VL1 node that can communicate securely with the ZeroTier peer-to-peer network. pub struct Node { /// A random ID generated to identify this particular running instance. - /// - /// This can be used to implement multi-homing by allowing remote nodes to distinguish instances - /// that share an identity. pub instance_id: [u8; 16], /// This node's identity and permanent keys. - pub identity: Verified, + pub identity: Valid, /// Interval latches for periodic background tasks. intervals: Mutex, /// Canonicalized network paths, held as Weak<> to be automatically cleaned when no longer in use. - paths: RwLock>, // holds a PathMap<> but as a Thing<> to hide HostSystemImpl template parameter + paths: RwLock>, // holds a PathMap<> but as a Thing<> to hide ApplicationLayer template parameter /// Peers with which we are currently communicating. peers: RwLock>>, @@ -294,20 +273,20 @@ pub struct Node { } impl Node { - pub fn new( - host_system: &HostSystemImpl, + pub fn new( + app: &Application, auto_generate_identity: bool, auto_upgrade_identity: bool, ) -> Result { let mut id = { - let id = host_system.storage().load_node_identity(); + let id = app.load_node_identity(); if id.is_none() { if !auto_generate_identity { return Err(InvalidParameterError("no identity found and auto-generate not enabled")); } else { let id = Identity::generate(); - host_system.event(Event::IdentityAutoGenerated(id.as_ref().clone())); - host_system.storage().save_node_identity(&id); + app.event(Event::IdentityAutoGenerated(id.as_ref().clone())); + app.save_node_identity(&id); id } } else { @@ -318,18 +297,18 @@ impl Node { if auto_upgrade_identity { let old = id.clone(); if id.upgrade()? { - host_system.storage().save_node_identity(&id); - host_system.event(Event::IdentityAutoUpgraded(old.unwrap(), id.as_ref().clone())); + app.save_node_identity(&id); + app.event(Event::IdentityAutoUpgraded(old.unwrap(), id.as_ref().clone())); } } - debug_event!(host_system, "[vl1] loaded identity {}", id.to_string()); + debug_event!(app, "[vl1] loaded identity {}", id.to_string()); Ok(Self { instance_id: random::get_bytes_secure(), identity: id, intervals: Mutex::new(BackgroundTaskIntervals::default()), - paths: RwLock::new(Thing::new(PathMap::::new())), + paths: RwLock::new(Thing::new(PathMap::::new())), peers: RwLock::new(HashMap::new()), roots: RwLock::new(RootInfo { sets: HashMap::new(), @@ -343,31 +322,37 @@ impl Node { }) } + #[inline] pub fn peer(&self, a: Address) -> Option> { self.peers.read().unwrap().get(&a).cloned() } + #[inline] pub fn is_online(&self) -> bool { self.roots.read().unwrap().online } /// Get the current "best" root from among this node's trusted roots. + #[inline] pub fn best_root(&self) -> Option> { self.best_root.read().unwrap().clone() } /// Check whether a peer is a root according to any root set trusted by this node. + #[inline] pub fn is_peer_root(&self, peer: &Peer) -> bool { self.roots.read().unwrap().roots.keys().any(|p| p.identity.eq(&peer.identity)) } /// Returns true if this node is a member of a root set (that it knows about). + #[inline] pub fn this_node_is_root(&self) -> bool { self.roots.read().unwrap().this_root_sets.is_some() } /// Add a new root set or update the existing root set if the new root set is newer and otherwise matches. - pub fn add_update_root_set(&self, rs: Verified) -> bool { + #[inline] + pub fn add_update_root_set(&self, rs: Valid) -> bool { let mut roots = self.roots.write().unwrap(); if let Some(entry) = roots.sets.get_mut(&rs.name) { if rs.should_replace(entry) { @@ -385,11 +370,13 @@ impl Node { } /// Returns whether or not this node has any root sets defined. + #[inline] pub fn has_roots_defined(&self) -> bool { self.roots.read().unwrap().sets.iter().any(|rs| !rs.1.members.is_empty()) } /// Initialize with default roots if there are no roots defined, otherwise do nothing. + #[inline] pub fn init_default_roots(&self) -> bool { if !self.has_roots_defined() { self.add_update_root_set(RootSet::zerotier_default()) @@ -399,69 +386,28 @@ impl Node { } /// Get the root sets that this node trusts. + #[inline] pub fn root_sets(&self) -> Vec { self.roots.read().unwrap().sets.values().cloned().map(|s| s.unwrap()).collect() } - pub fn do_background_tasks(&self, host_system: &HostSystemImpl) -> Duration { + pub fn do_background_tasks(&self, app: &Application) -> Duration { const INTERVAL_MS: i64 = 1000; const INTERVAL: Duration = Duration::from_millis(INTERVAL_MS as u64); - let time_ticks = host_system.time_ticks(); + let time_ticks = app.time_ticks(); - let (root_sync, root_hello, mut root_spam_hello, peer_service, path_service, whois_queue_retry) = { + let (root_sync, root_hello, root_spam_hello, peer_service, path_service, whois_queue_retry) = { let mut intervals = self.intervals.lock().unwrap(); ( intervals.root_sync.gate(time_ticks), intervals.root_hello.gate(time_ticks), - intervals.root_spam_hello.gate(time_ticks), + intervals.root_spam_hello.gate(time_ticks) && !self.is_online(), intervals.peer_service.gate(time_ticks), intervals.path_service.gate(time_ticks), intervals.whois_queue_retry.gate(time_ticks), ) }; - // We only "spam" (try to contact roots more often) if we are offline. - if root_spam_hello { - root_spam_hello = !self.is_online(); - } - - /* - debug_event!( - host_system, - "[vl1] do_background_tasks:{}{}{}{}{}{} ----", - if root_sync { - " root_sync" - } else { - "" - }, - if root_hello { - " root_hello" - } else { - "" - }, - if root_spam_hello { - " root_spam_hello" - } else { - "" - }, - if peer_service { - " peer_service" - } else { - "" - }, - if path_service { - " path_service" - } else { - "" - }, - if whois_queue_retry { - " whois_queue_retry" - } else { - "" - } - ); - */ - if root_sync { if { let mut roots = self.roots.write().unwrap(); @@ -472,7 +418,7 @@ impl Node { false } } { - debug_event!(host_system, "[vl1] root sets modified, synchronizing internal data structures"); + debug_event!(app, "[vl1] root sets modified, synchronizing internal data structures"); let (mut old_root_identities, address_collisions, new_roots, bad_identities, my_root_sets) = { let roots = self.roots.read().unwrap(); @@ -513,7 +459,7 @@ impl Node { if m.endpoints.is_some() && !address_collisions.contains(&m.identity.address) && !m.identity.eq(&self.identity) { debug_event!( - host_system, + app, "[vl1] examining root {} with {} endpoints", m.identity.address.to_string(), m.endpoints.as_ref().map_or(0, |e| e.len()) @@ -522,8 +468,7 @@ impl Node { if let Some(peer) = peers.get(&m.identity.address) { new_roots.insert(peer.clone(), m.endpoints.as_ref().unwrap().iter().cloned().collect()); } else { - if let Some(peer) = Peer::new(&self.identity, Verified::assume_verified(m.identity.clone()), time_ticks) - { + if let Some(peer) = Peer::new(&self.identity, Valid::assume_verified(m.identity.clone()), time_ticks) { drop(peers); new_roots.insert( self.peers @@ -546,13 +491,13 @@ impl Node { }; for c in address_collisions.iter() { - host_system.event(Event::SecurityWarning(format!( + app.event(Event::SecurityWarning(format!( "address/identity collision in root sets! address {} collides across root sets or with an existing peer and is being ignored as a root!", c.to_string() ))); } for i in bad_identities.iter() { - host_system.event(Event::SecurityWarning(format!( + app.event(Event::SecurityWarning(format!( "bad identity detected for address {} in at least one root set, ignoring (error creating peer object)", i.address.to_string() ))); @@ -566,7 +511,7 @@ impl Node { let mut roots = self.roots.write().unwrap(); roots.roots = new_roots; roots.this_root_sets = my_root_sets; - host_system.event(Event::UpdatedRoots(old_root_identities, new_root_identities)); + app.event(Event::UpdatedRoots(old_root_identities, new_root_identities)); } } @@ -592,7 +537,7 @@ impl Node { let mut best_root = self.best_root.write().unwrap(); if let Some(best_root) = best_root.as_mut() { debug_event!( - host_system, + app, "[vl1] selected new best root: {} (replaced {})", best.identity.address.to_string(), best_root.identity.address.to_string() @@ -600,7 +545,7 @@ impl Node { *best_root = best.clone(); } else { debug_event!( - host_system, + app, "[vl1] selected new best root: {} (was empty)", best.identity.address.to_string() ); @@ -610,7 +555,7 @@ impl Node { } else { if let Some(old_best) = self.best_root.write().unwrap().take() { debug_event!( - host_system, + app, "[vl1] selected new best root: NONE (replaced {})", old_best.identity.address.to_string() ); @@ -622,12 +567,12 @@ impl Node { if !roots.online { drop(roots); self.roots.write().unwrap().online = true; - host_system.event(Event::Online(true)); + app.event(Event::Online(true)); } } else if roots.online { drop(roots); self.roots.write().unwrap().online = false; - host_system.event(Event::Online(false)); + app.event(Event::Online(false)); } } } @@ -648,14 +593,14 @@ impl Node { for (root, endpoints) in roots.iter() { for ep in endpoints.iter() { debug_event!( - host_system, + app, "sending HELLO to root {} (root interval: {})", root.identity.address.to_string(), ROOT_HELLO_INTERVAL ); let root = root.clone(); let ep = ep.clone(); - root.send_hello(host_system, self, Some(&ep)); + root.send_hello(app, self, Some(&ep)); } } } @@ -667,7 +612,7 @@ impl Node { { let roots = self.roots.read().unwrap(); for (a, peer) in self.peers.read().unwrap().iter() { - if !peer.service(host_system, self, time_ticks) && !roots.roots.contains_key(peer) { + if !peer.service(app, self, time_ticks) && !roots.roots.contains_key(peer) { dead_peers.push(*a); } } @@ -682,8 +627,8 @@ impl Node { let mut need_keepalive = Vec::new(); // First check all paths in read mode to avoid blocking the entire node. - for (k, path) in self.paths.read().unwrap().get::>().iter() { - if host_system.local_socket_is_valid(k.local_socket()) { + for (k, path) in self.paths.read().unwrap().get::>().iter() { + if app.local_socket_is_valid(k.local_socket()) { match path.service(time_ticks) { PathServiceResult::Ok => {} PathServiceResult::Dead => dead_paths.push(k.to_copied()), @@ -696,16 +641,20 @@ impl Node { // Lock in write mode and remove dead paths, doing so piecemeal to again avoid blocking. for dp in dead_paths.iter() { - self.paths.write().unwrap().get_mut::>().remove(dp); + self.paths + .write() + .unwrap() + .get_mut::>() + .remove(dp); } // Finally run keepalive sends as a batch. let keepalive_buf = [time_ticks as u8]; // just an arbitrary byte, no significance for p in need_keepalive.iter() { - host_system.wire_send( + app.wire_send( &p.endpoint, - Some(p.local_socket::()), - Some(p.local_interface::()), + Some(p.local_socket::()), + Some(p.local_interface::()), &keepalive_buf, 0, ); @@ -727,26 +676,25 @@ impl Node { need_whois }; if !need_whois.is_empty() { - self.send_whois(host_system, need_whois.as_slice(), time_ticks); + self.send_whois(app, need_whois.as_slice(), time_ticks); } } - //debug_event!(host_system, "[vl1] do_background_tasks DONE ----"); INTERVAL } - pub fn handle_incoming_physical_packet( + pub fn handle_incoming_physical_packet( &self, - host_system: &HostSystemImpl, - inner: &InnerProtocolImpl, + app: &Application, + inner: &Inner, source_endpoint: &Endpoint, - source_local_socket: &HostSystemImpl::LocalSocket, - source_local_interface: &HostSystemImpl::LocalInterface, + source_local_socket: &Application::LocalSocket, + source_local_interface: &Application::LocalInterface, time_ticks: i64, mut packet: PooledPacketBuffer, ) { debug_event!( - host_system, + app, "[vl1] {} -> #{} {}->{} length {} (on socket {}@{})", source_endpoint.to_string(), packet @@ -779,15 +727,14 @@ impl Node { if dest == self.identity.address { let fragment_header = &*fragment_header; // discard mut - let path = - self.canonical_path::(source_endpoint, source_local_socket, source_local_interface, time_ticks); + let path = self.canonical_path::(source_endpoint, source_local_socket, source_local_interface, time_ticks); path.log_receive_anything(time_ticks); if fragment_header.is_fragment() { #[cfg(debug_assertions)] let fragment_header_id = u64::from_be_bytes(fragment_header.id); debug_event!( - host_system, + app, "[vl1] [v1] #{:0>16x} fragment {} of {} received", u64::from_be_bytes(fragment_header.id), fragment_header.fragment_no(), @@ -803,14 +750,14 @@ impl Node { ) { if let Some(frag0) = assembled_packet.frags[0].as_ref() { #[cfg(debug_assertions)] - debug_event!(host_system, "[vl1] [v1] #{:0>16x} packet fully assembled!", fragment_header_id); + debug_event!(app, "[vl1] [v1] #{:0>16x} packet fully assembled!", fragment_header_id); if let Ok(packet_header) = frag0.struct_at::(0) { if let Some(source) = Address::from_bytes(&packet_header.src) { if let Some(peer) = self.peer(source) { peer.v1_proto_receive( self, - host_system, + app, inner, time_ticks, &path, @@ -821,7 +768,7 @@ impl Node { } else { // If WHOIS is needed we need to go ahead and combine the packet so it can be cached // for later processing when a WHOIS reply comes back. - let mut combined_packet = host_system.get_buffer(); + let mut combined_packet = app.get_buffer(); let mut ok = combined_packet.append_bytes(frag0.as_bytes()).is_ok(); for i in 1..assembled_packet.have { if let Some(f) = assembled_packet.frags[i as usize].as_ref() { @@ -832,7 +779,7 @@ impl Node { } } if ok { - self.whois(host_system, source, Some((Arc::downgrade(&path), combined_packet)), time_ticks); + self.whois(app, source, Some((Arc::downgrade(&path), combined_packet)), time_ticks); } } } // else source address invalid @@ -840,17 +787,13 @@ impl Node { } // else reassembly failed (in a way that shouldn't be possible) } // else packet not fully assembled yet } else if let Ok(packet_header) = packet.struct_at::(0) { - debug_event!( - host_system, - "[vl1] [v1] #{:0>16x} is unfragmented", - u64::from_be_bytes(packet_header.id) - ); + debug_event!(app, "[vl1] [v1] #{:0>16x} is unfragmented", u64::from_be_bytes(packet_header.id)); if let Some(source) = Address::from_bytes(&packet_header.src) { if let Some(peer) = self.peer(source) { - peer.v1_proto_receive(self, host_system, inner, time_ticks, &path, packet_header, packet.as_ref(), &[]); + peer.v1_proto_receive(self, app, inner, time_ticks, &path, packet_header, packet.as_ref(), &[]); } else { - self.whois(host_system, source, Some((Arc::downgrade(&path), packet)), time_ticks); + self.whois(app, source, Some((Arc::downgrade(&path), packet)), time_ticks); } } } // else not fragment and header incomplete @@ -866,7 +809,7 @@ impl Node { { debug_packet_id = u64::from_be_bytes(fragment_header.id); debug_event!( - host_system, + app, "[vl1] [v1] #{:0>16x} forwarding packet fragment to {}", debug_packet_id, dest.to_string() @@ -874,7 +817,7 @@ impl Node { } if fragment_header.increment_hops() > v1::FORWARD_MAX_HOPS { #[cfg(debug_assertions)] - debug_event!(host_system, "[vl1] [v1] #{:0>16x} discarded: max hops exceeded!", debug_packet_id); + debug_event!(app, "[vl1] [v1] #{:0>16x} discarded: max hops exceeded!", debug_packet_id); return; } } else if let Ok(packet_header) = packet.struct_mut_at::(0) { @@ -882,7 +825,7 @@ impl Node { { debug_packet_id = u64::from_be_bytes(packet_header.id); debug_event!( - host_system, + app, "[vl1] [v1] #{:0>16x} forwarding packet to {}", debug_packet_id, dest.to_string() @@ -891,7 +834,7 @@ impl Node { if packet_header.increment_hops() > v1::FORWARD_MAX_HOPS { #[cfg(debug_assertions)] debug_event!( - host_system, + app, "[vl1] [v1] #{:0>16x} discarded: max hops exceeded!", u64::from_be_bytes(packet_header.id) ); @@ -903,10 +846,10 @@ impl Node { if let Some(peer) = self.peer(dest) { if let Some(forward_path) = peer.direct_path() { - host_system.wire_send( + app.wire_send( &forward_path.endpoint, - Some(forward_path.local_socket::()), - Some(forward_path.local_interface::()), + Some(forward_path.local_socket::()), + Some(forward_path.local_interface::()), packet.as_bytes(), 0, ); @@ -914,7 +857,7 @@ impl Node { peer.last_forward_time_ticks.store(time_ticks, Ordering::Relaxed); #[cfg(debug_assertions)] - debug_event!(host_system, "[vl1] [v1] #{:0>16x} forwarded successfully", debug_packet_id); + debug_event!(app, "[vl1] [v1] #{:0>16x} forwarded successfully", debug_packet_id); } } } // else not for this node and shouldn't be forwarded @@ -923,9 +866,9 @@ impl Node { } /// Enqueue and send a WHOIS query for a given address, adding the supplied packet (if any) to the list to be processed on reply. - fn whois( + fn whois( &self, - host_system: &HostSystemImpl, + app: &Application, address: Address, waiting_packet: Option<(Weak, PooledPacketBuffer)>, time_ticks: i64, @@ -947,13 +890,13 @@ impl Node { qi.retry_count += 1; } } - self.send_whois(host_system, &[address], time_ticks); + self.send_whois(app, &[address], time_ticks); } /// Send a WHOIS query to the current best root. - fn send_whois(&self, host_system: &HostSystemImpl, mut addresses: &[Address], time_ticks: i64) { + fn send_whois(&self, app: &Application, mut addresses: &[Address], time_ticks: i64) { debug_assert!(!addresses.is_empty()); - debug_event!(host_system, "[vl1] [v1] sending WHOIS for {}", { + debug_event!(app, "[vl1] [v1] sending WHOIS for {}", { let mut tmp = String::new(); for a in addresses.iter() { if !tmp.is_empty() { @@ -966,7 +909,7 @@ impl Node { if let Some(root) = self.best_root() { while !addresses.is_empty() { if !root - .send(host_system, self, None, time_ticks, |packet| -> Result<(), Infallible> { + .send(app, self, None, time_ticks, |packet| -> Result<(), Infallible> { assert!(packet.append_u8(message_type::VL1_WHOIS).is_ok()); while !addresses.is_empty() && (packet.len() + ADDRESS_SIZE) <= UDP_DEFAULT_MTU { assert!(packet.append_bytes_fixed(&addresses[0].to_bytes()).is_ok()); @@ -983,10 +926,10 @@ impl Node { } /// Called by Peer when an identity is received from another node, e.g. via OK(WHOIS). - pub(crate) fn handle_incoming_identity( + pub(crate) fn handle_incoming_identity( &self, - host_system: &HostSystemImpl, - inner: &InnerProtocolImpl, + app: &Application, + inner: &Inner, received_identity: Identity, time_ticks: i64, authoritative: bool, @@ -996,7 +939,7 @@ impl Node { let mut whois_queue = self.whois_queue.lock().unwrap(); if let Some(qi) = whois_queue.get_mut(&received_identity.address) { let address = received_identity.address; - if host_system.should_respond_to(&received_identity) { + if inner.should_respond_to(&received_identity) { let mut peers = self.peers.write().unwrap(); if let Some(peer) = peers.get(&address).cloned().or_else(|| { Peer::new(&self.identity, received_identity, time_ticks) @@ -1007,7 +950,7 @@ impl Node { for p in qi.v1_proto_waiting_packets.iter() { if let Some(path) = p.0.upgrade() { if let Ok(packet_header) = p.1.struct_at::(0) { - peer.v1_proto_receive(self, host_system, inner, time_ticks, &path, packet_header, &p.1, &[]); + peer.v1_proto_receive(self, app, inner, time_ticks, &path, packet_header, &p.1, &[]); } } } @@ -1023,7 +966,8 @@ impl Node { /// /// This will only replace an existing root set with a newer one. It won't add a new root set, which must be /// done by an authorized user or administrator not just by a root. - pub(crate) fn on_remote_update_root_set(&self, received_from: &Identity, rs: Verified) { + #[allow(unused)] + pub(crate) fn on_remote_update_root_set(&self, received_from: &Identity, rs: Valid) { let mut roots = self.roots.write().unwrap(); if let Some(entry) = roots.sets.get_mut(&rs.name) { if entry.members.iter().any(|m| m.identity.eq(received_from)) && rs.should_replace(entry) { @@ -1034,25 +978,28 @@ impl Node { } /// Get the canonical Path object corresponding to an endpoint. - pub(crate) fn canonical_path( + pub(crate) fn canonical_path( &self, ep: &Endpoint, - local_socket: &HostSystemImpl::LocalSocket, - local_interface: &HostSystemImpl::LocalInterface, + local_socket: &Application::LocalSocket, + local_interface: &Application::LocalInterface, time_ticks: i64, ) -> Arc { let paths = self.paths.read().unwrap(); - if let Some(path) = paths.get::>().get(&PathKey::Ref(ep, local_socket)) { + if let Some(path) = paths + .get::>() + .get(&PathKey::Ref(ep, local_socket)) + { path.clone() } else { drop(paths); self.paths .write() .unwrap() - .get_mut::>() + .get_mut::>() .entry(PathKey::Copied(ep.clone(), local_socket.clone())) .or_insert_with(|| { - Arc::new(Path::new::( + Arc::new(Path::new::( ep.clone(), local_socket.clone(), local_interface.clone(), @@ -1064,14 +1011,14 @@ impl Node { } } -/// Key used to look up paths in a hash map -/// This supports copied keys for storing and refs for fast lookup without having to copy anything. -enum PathKey<'a, 'b, HostSystemImpl: HostSystem + ?Sized> { - Copied(Endpoint, HostSystemImpl::LocalSocket), - Ref(&'a Endpoint, &'b HostSystemImpl::LocalSocket), +/// Key used to look up paths in a hash map efficiently. +enum PathKey<'a, 'b, LocalSocket: Hash + PartialEq + Eq + Clone> { + Copied(Endpoint, LocalSocket), + Ref(&'a Endpoint, &'b LocalSocket), } -impl Hash for PathKey<'_, '_, HostSystemImpl> { +impl Hash for PathKey<'_, '_, LocalSocket> { + #[inline(always)] fn hash(&self, state: &mut H) { match self { Self::Copied(ep, ls) => { @@ -1086,7 +1033,8 @@ impl Hash for PathKey<'_, '_, HostSystemImp } } -impl PartialEq for PathKey<'_, '_, HostSystemImpl> { +impl PartialEq for PathKey<'_, '_, LocalSocket> { + #[inline(always)] fn eq(&self, other: &Self) -> bool { match (self, other) { (Self::Copied(ep1, ls1), Self::Copied(ep2, ls2)) => ep1.eq(ep2) && ls1.eq(ls2), @@ -1097,11 +1045,11 @@ impl PartialEq for PathKey<'_, '_, HostSyst } } -impl Eq for PathKey<'_, '_, HostSystemImpl> {} +impl Eq for PathKey<'_, '_, LocalSocket> {} -impl PathKey<'_, '_, HostSystemImpl> { +impl PathKey<'_, '_, LocalSocket> { #[inline(always)] - fn local_socket(&self) -> &HostSystemImpl::LocalSocket { + fn local_socket(&self) -> &LocalSocket { match self { Self::Copied(_, ls) => ls, Self::Ref(_, ls) => *ls, @@ -1109,28 +1057,10 @@ impl PathKey<'_, '_, HostSystemImpl> { } #[inline(always)] - fn to_copied(&self) -> PathKey<'static, 'static, HostSystemImpl> { + fn to_copied(&self) -> PathKey<'static, 'static, LocalSocket> { match self { - Self::Copied(ep, ls) => PathKey::<'static, 'static, HostSystemImpl>::Copied(ep.clone(), ls.clone()), - Self::Ref(ep, ls) => PathKey::<'static, 'static, HostSystemImpl>::Copied((*ep).clone(), (*ls).clone()), + Self::Copied(ep, ls) => PathKey::<'static, 'static, LocalSocket>::Copied(ep.clone(), ls.clone()), + Self::Ref(ep, ls) => PathKey::<'static, 'static, LocalSocket>::Copied((*ep).clone(), (*ls).clone()), } } } - -/// Dummy no-op inner protocol for debugging and testing. -#[derive(Default)] -pub struct DummyInnerProtocol; - -impl InnerProtocol for DummyInnerProtocol {} - -impl VL1AuthProvider for DummyInnerProtocol { - #[inline(always)] - fn should_respond_to(&self, _: &Verified) -> bool { - true - } - - #[inline(always)] - fn has_trust_relationship(&self, _: &Verified) -> bool { - true - } -} diff --git a/network-hypervisor/src/vl1/path.rs b/network-hypervisor/src/vl1/path.rs index 11068a6eb..5a59364ec 100644 --- a/network-hypervisor/src/vl1/path.rs +++ b/network-hypervisor/src/vl1/path.rs @@ -37,10 +37,10 @@ pub struct Path { } impl Path { - pub(crate) fn new( + pub(crate) fn new( endpoint: Endpoint, - local_socket: HostSystemImpl::LocalSocket, - local_interface: HostSystemImpl::LocalInterface, + local_socket: Application::LocalSocket, + local_interface: Application::LocalInterface, time_ticks: i64, ) -> Self { Self { @@ -55,12 +55,12 @@ impl Path { } #[inline(always)] - pub(crate) fn local_socket(&self) -> &HostSystemImpl::LocalSocket { + pub(crate) fn local_socket(&self) -> &Application::LocalSocket { self.local_socket.get() } #[inline(always)] - pub(crate) fn local_interface(&self) -> &HostSystemImpl::LocalInterface { + pub(crate) fn local_interface(&self) -> &Application::LocalInterface { self.local_interface.get() } diff --git a/network-hypervisor/src/vl1/peer.rs b/network-hypervisor/src/vl1/peer.rs index d74ac3851..3178b213b 100644 --- a/network-hypervisor/src/vl1/peer.rs +++ b/network-hypervisor/src/vl1/peer.rs @@ -18,14 +18,14 @@ use crate::protocol::*; use crate::vl1::address::Address; use crate::vl1::debug_event; use crate::vl1::node::*; -use crate::vl1::Verified; +use crate::vl1::Valid; use crate::vl1::{Endpoint, Identity, Path}; use crate::{VERSION_MAJOR, VERSION_MINOR, VERSION_REVISION}; pub(crate) const SERVICE_INTERVAL_MS: i64 = 10000; pub struct Peer { - pub identity: Verified, + pub identity: Valid, v1_proto_static_secret: v1::SymmetricSecret, paths: Mutex>, @@ -53,7 +53,7 @@ struct RemoteNodeInfo { } /// Sort a list of paths by quality or priority, with best paths first. -fn prioritize_paths(paths: &mut Vec) { +fn prioritize_paths(paths: &mut Vec) { paths.sort_unstable_by(|a, b| a.last_receive_time_ticks.cmp(&b.last_receive_time_ticks).reverse()); } @@ -62,7 +62,7 @@ impl Peer { /// /// This only returns None if this_node_identity does not have its secrets or if some /// fatal error occurs performing key agreement between the two identities. - pub(crate) fn new(this_node_identity: &Verified, id: Verified, time_ticks: i64) -> Option { + pub(crate) fn new(this_node_identity: &Valid, id: Valid, time_ticks: i64) -> Option { this_node_identity.agree(&id).map(|static_secret| -> Self { Self { identity: id, @@ -136,7 +136,7 @@ impl Peer { return None; } - fn learn_path(&self, host_system: &HostSystemImpl, new_path: &Arc, time_ticks: i64) { + fn learn_path(&self, app: &Application, new_path: &Arc, time_ticks: i64) { let mut paths = self.paths.lock().unwrap(); // TODO: check path filter @@ -155,7 +155,7 @@ impl Peer { Endpoint::IpUdp(existing_ip) => { if existing_ip.ip_bytes().eq(new_ip.ip_bytes()) { debug_event!( - host_system, + app, "[vl1] {} replacing path {} with {} (same IP, different port)", self.identity.address.to_string(), p.endpoint.to_string(), @@ -163,7 +163,7 @@ impl Peer { ); pi.path = Arc::downgrade(new_path); pi.last_receive_time_ticks = time_ticks; - prioritize_paths::(&mut paths); + prioritize_paths::(&mut paths); return; } } @@ -183,7 +183,7 @@ impl Peer { // Learn new path if it's not a duplicate or should not replace an existing path. debug_event!( - host_system, + app, "[vl1] {} learned new path: {}", self.identity.address.to_string(), new_path.endpoint.to_string() @@ -192,7 +192,7 @@ impl Peer { path: Arc::downgrade(new_path), last_receive_time_ticks: time_ticks, }); - prioritize_paths::(&mut paths); + prioritize_paths::(&mut paths); } /// Get the next sequential message ID for use with the V1 transport protocol. @@ -202,7 +202,7 @@ impl Peer { } /// Called every SERVICE_INTERVAL_MS by the background service loop in Node. - pub(crate) fn service(&self, _: &HostSystemImpl, _: &Node, time_ticks: i64) -> bool { + pub(crate) fn service(&self, _: &Application, _: &Node, time_ticks: i64) -> bool { // Prune dead paths and sort in descending order of quality. { let mut paths = self.paths.lock().unwrap(); @@ -210,7 +210,7 @@ impl Peer { if paths.capacity() > 16 { paths.shrink_to_fit(); } - prioritize_paths::(&mut paths); + prioritize_paths::(&mut paths); } // Prune dead entries from the map of reported local endpoints (e.g. externally visible IPs). @@ -223,19 +223,19 @@ impl Peer { } /// Send a prepared and encrypted packet using the V1 protocol with fragmentation if needed. - fn v1_proto_internal_send( + fn v1_proto_internal_send( &self, - host_system: &HostSystemImpl, + app: &Application, endpoint: &Endpoint, - local_socket: Option<&HostSystemImpl::LocalSocket>, - local_interface: Option<&HostSystemImpl::LocalInterface>, + local_socket: Option<&Application::LocalSocket>, + local_interface: Option<&Application::LocalInterface>, max_fragment_size: usize, packet: PooledPacketBuffer, ) { let packet_size = packet.len(); if packet_size > max_fragment_size { let bytes = packet.as_bytes(); - host_system.wire_send(endpoint, local_socket, local_interface, &bytes[0..UDP_DEFAULT_MTU], 0); + app.wire_send(endpoint, local_socket, local_interface, &bytes[0..UDP_DEFAULT_MTU], 0); let mut pos = UDP_DEFAULT_MTU; let overrun_size = (packet_size - UDP_DEFAULT_MTU) as u32; @@ -259,7 +259,7 @@ impl Peer { let fragment_size = v1::FRAGMENT_HEADER_SIZE + chunk_size; tmp_buf[..v1::FRAGMENT_HEADER_SIZE].copy_from_slice(header.as_bytes()); tmp_buf[v1::FRAGMENT_HEADER_SIZE..fragment_size].copy_from_slice(&bytes[pos..next_pos]); - host_system.wire_send(endpoint, local_socket, local_interface, &tmp_buf[..fragment_size], 0); + app.wire_send(endpoint, local_socket, local_interface, &tmp_buf[..fragment_size], 0); pos = next_pos; if pos < packet_size { chunk_size = (packet_size - pos).min(UDP_DEFAULT_MTU - v1::HEADER_SIZE); @@ -268,7 +268,7 @@ impl Peer { } } } else { - host_system.wire_send(endpoint, local_socket, local_interface, packet.as_bytes(), 0); + app.wire_send(endpoint, local_socket, local_interface, packet.as_bytes(), 0); } } @@ -281,9 +281,9 @@ impl Peer { /// The builder function must append the verb (with any verb flags) and packet payload. If it returns /// an error, the error is returned immediately and the send is aborted. None is returned if the send /// function itself fails for some reason such as no paths being available. - pub fn send Result>( + pub fn send Result>( &self, - host_system: &HostSystemImpl, + app: &Application, node: &Node, path: Option<&Arc>, time_ticks: i64, @@ -303,7 +303,7 @@ impl Peer { let max_fragment_size = path.endpoint.max_fragment_size(); - let mut packet = host_system.get_buffer(); + let mut packet = app.get_buffer(); if !self.is_v2() { // For the V1 protocol, leave room for for the header in the buffer. packet.set_size(v1::HEADER_SIZE); @@ -371,10 +371,10 @@ impl Peer { } self.v1_proto_internal_send( - host_system, + app, &path.endpoint, - Some(path.local_socket::()), - Some(path.local_interface::()), + Some(path.local_socket::()), + Some(path.local_interface::()), max_fragment_size, packet, ); @@ -393,9 +393,9 @@ impl Peer { /// Unlike other messages HELLO is sent partially in the clear and always with the long-lived /// static identity key. Authentication in old versions is via Poly1305 and in new versions /// via HMAC-SHA512. - pub(crate) fn send_hello( + pub(crate) fn send_hello( &self, - host_system: &HostSystemImpl, + app: &Application, node: &Node, explicit_endpoint: Option<&Endpoint>, ) -> bool { @@ -412,9 +412,9 @@ impl Peer { }; let max_fragment_size = destination.max_fragment_size(); - let time_ticks = host_system.time_ticks(); + let time_ticks = app.time_ticks(); - let mut packet = host_system.get_buffer(); + let mut packet = app.get_buffer(); { let message_id = self.v1_proto_next_message_id(); @@ -447,7 +447,7 @@ impl Peer { self.last_send_time_ticks.store(time_ticks, Ordering::Relaxed); debug_event!( - host_system, + app, "HELLO -> {} @ {} ({} bytes)", self.identity.address.to_string(), destination.to_string(), @@ -457,16 +457,16 @@ impl Peer { if let Some(p) = path.as_ref() { self.v1_proto_internal_send( - host_system, + app, destination, - Some(p.local_socket::()), - Some(p.local_interface::()), + Some(p.local_socket::()), + Some(p.local_interface::()), max_fragment_size, packet, ); p.log_send_anything(time_ticks); } else { - self.v1_proto_internal_send(host_system, destination, None, None, max_fragment_size, packet); + self.v1_proto_internal_send(app, destination, None, None, max_fragment_size, packet); } return true; @@ -478,11 +478,11 @@ impl Peer { /// those fragments after the main packet header and first chunk. /// /// This returns true if the packet decrypted and passed authentication. - pub(crate) fn v1_proto_receive( + pub(crate) fn v1_proto_receive( self: &Arc, node: &Node, - host_system: &HostSystemImpl, - inner: &InnerProtocolImpl, + app: &Application, + inner: &Inner, time_ticks: i64, source_path: &Arc, packet_header: &v1::PacketHeader, @@ -503,11 +503,7 @@ impl Peer { message_id2 } else { // Packet failed to decrypt using either ephemeral or permanent key, reject. - debug_event!( - host_system, - "[vl1] #{:0>16x} failed authentication", - u64::from_be_bytes(packet_header.id) - ); + debug_event!(app, "[vl1] #{:0>16x} failed authentication", u64::from_be_bytes(packet_header.id)); return PacketHandlerResult::Error; }; @@ -539,7 +535,7 @@ impl Peer { verb &= v1::VERB_MASK; // mask off flags debug_event!( - host_system, + app, "[vl1] #{:0>16x} decrypted and authenticated, verb: {} ({:0>2x})", u64::from_be_bytes(packet_header.id), message_type::name(verb), @@ -548,11 +544,9 @@ impl Peer { return match verb { message_type::VL1_NOP => PacketHandlerResult::Ok, - message_type::VL1_HELLO => { - self.handle_incoming_hello(host_system, inner, node, time_ticks, message_id, source_path, &payload) - } + message_type::VL1_HELLO => self.handle_incoming_hello(app, inner, node, time_ticks, message_id, source_path, &payload), message_type::VL1_ERROR => self.handle_incoming_error( - host_system, + app, inner, node, time_ticks, @@ -562,7 +556,7 @@ impl Peer { &payload, ), message_type::VL1_OK => self.handle_incoming_ok( - host_system, + app, inner, node, time_ticks, @@ -572,28 +566,16 @@ impl Peer { path_is_known, &payload, ), - message_type::VL1_WHOIS => self.handle_incoming_whois(host_system, inner, node, time_ticks, message_id, &payload), + message_type::VL1_WHOIS => self.handle_incoming_whois(app, inner, node, time_ticks, message_id, &payload), message_type::VL1_RENDEZVOUS => { - self.handle_incoming_rendezvous(host_system, node, time_ticks, message_id, source_path, &payload) + self.handle_incoming_rendezvous(app, node, time_ticks, message_id, source_path, &payload) } - message_type::VL1_ECHO => self.handle_incoming_echo(host_system, inner, node, time_ticks, message_id, &payload), + message_type::VL1_ECHO => self.handle_incoming_echo(app, inner, node, time_ticks, message_id, &payload), message_type::VL1_PUSH_DIRECT_PATHS => { - self.handle_incoming_push_direct_paths(host_system, node, time_ticks, source_path, &payload) + self.handle_incoming_push_direct_paths(app, node, time_ticks, source_path, &payload) } - message_type::VL1_USER_MESSAGE => { - self.handle_incoming_user_message(host_system, node, time_ticks, source_path, &payload) - } - _ => inner.handle_packet( - host_system, - node, - self, - &source_path, - packet_header.hops(), - message_id, - verb, - &payload, - 1, - ), + message_type::VL1_USER_MESSAGE => self.handle_incoming_user_message(app, node, time_ticks, source_path, &payload), + _ => inner.handle_packet(app, node, self, &source_path, packet_header.hops(), message_id, verb, &payload, 1), }; } } @@ -601,19 +583,19 @@ impl Peer { return PacketHandlerResult::Error; } - fn handle_incoming_hello( + fn handle_incoming_hello( &self, - host_system: &HostSystemImpl, - inner: &InnerProtocolImpl, + app: &Application, + inner: &Inner, node: &Node, time_ticks: i64, message_id: MessageId, source_path: &Arc, payload: &PacketBuffer, ) -> PacketHandlerResult { - if !(host_system.should_respond_to(&self.identity) || node.this_node_is_root() || node.is_peer_root(self)) { + if !(inner.should_respond_to(&self.identity) || node.this_node_is_root() || node.is_peer_root(self)) { debug_event!( - host_system, + app, "[vl1] dropping HELLO from {} due to lack of trust relationship", self.identity.address.to_string() ); @@ -634,25 +616,19 @@ impl Peer { ); } - self.send( - host_system, - node, - Some(source_path), - time_ticks, - |packet| -> Result<(), Infallible> { - let f: &mut (OkHeader, v1::message_component_structs::OkHelloFixedHeaderFields) = - packet.append_struct_get_mut().unwrap(); - f.0.verb = message_type::VL1_OK; - f.0.in_re_verb = message_type::VL1_HELLO; - f.0.in_re_message_id = message_id.to_ne_bytes(); - f.1.timestamp_echo = hello_fixed_headers.timestamp; - f.1.version_proto = PROTOCOL_VERSION; - f.1.version_major = VERSION_MAJOR; - f.1.version_minor = VERSION_MINOR; - f.1.version_revision = VERSION_REVISION.to_be_bytes(); - Ok(()) - }, - ); + self.send(app, node, Some(source_path), time_ticks, |packet| -> Result<(), Infallible> { + let f: &mut (OkHeader, v1::message_component_structs::OkHelloFixedHeaderFields) = + packet.append_struct_get_mut().unwrap(); + f.0.verb = message_type::VL1_OK; + f.0.in_re_verb = message_type::VL1_HELLO; + f.0.in_re_message_id = message_id.to_ne_bytes(); + f.1.timestamp_echo = hello_fixed_headers.timestamp; + f.1.version_proto = PROTOCOL_VERSION; + f.1.version_major = VERSION_MAJOR; + f.1.version_minor = VERSION_MINOR; + f.1.version_revision = VERSION_REVISION.to_be_bytes(); + Ok(()) + }); return PacketHandlerResult::Ok; } @@ -662,10 +638,10 @@ impl Peer { return PacketHandlerResult::Error; } - fn handle_incoming_error( + fn handle_incoming_error( self: &Arc, - host_system: &HostSystemImpl, - inner: &InnerProtocolImpl, + app: &Application, + inner: &Inner, node: &Node, _time_ticks: i64, source_path: &Arc, @@ -682,7 +658,7 @@ impl Peer { match error_header.in_re_verb { _ => { return inner.handle_error( - host_system, + app, node, self, &source_path, @@ -700,10 +676,10 @@ impl Peer { return PacketHandlerResult::Error; } - fn handle_incoming_ok( + fn handle_incoming_ok( self: &Arc, - host_system: &HostSystemImpl, - inner: &InnerProtocolImpl, + app: &Application, + inner: &Inner, node: &Node, time_ticks: i64, source_path: &Arc, @@ -724,7 +700,7 @@ impl Peer { payload.read_struct::(&mut cursor) { if source_hops == 0 { - debug_event!(host_system, "[vl1] {} OK(HELLO)", self.identity.address.to_string(),); + debug_event!(app, "[vl1] {} OK(HELLO)", self.identity.address.to_string(),); if let Ok(reported_endpoint) = Endpoint::unmarshal(&payload, &mut cursor) { #[cfg(debug_assertions)] let reported_endpoint2 = reported_endpoint.clone(); @@ -738,7 +714,7 @@ impl Peer { { #[cfg(debug_assertions)] debug_event!( - host_system, + app, "[vl1] {} reported new remote perspective, local endpoint: {}", self.identity.address.to_string(), reported_endpoint2.to_string() @@ -748,7 +724,7 @@ impl Peer { } if source_hops == 0 && !path_is_known { - self.learn_path(host_system, source_path, time_ticks); + self.learn_path(app, source_path, time_ticks); } self.last_hello_reply_time_ticks.store(time_ticks, Ordering::Relaxed); @@ -756,21 +732,21 @@ impl Peer { } message_type::VL1_WHOIS => { - debug_event!(host_system, "[vl1] OK(WHOIS)"); + debug_event!(app, "[vl1] OK(WHOIS)"); if node.is_peer_root(self) { while cursor < payload.len() { let r = Identity::unmarshal(payload, &mut cursor); if let Ok(received_identity) = r { debug_event!( - host_system, + app, "[vl1] {} OK(WHOIS): received identity: {}", self.identity.address.to_string(), received_identity.to_string() ); - node.handle_incoming_identity(host_system, inner, received_identity, time_ticks, true); + node.handle_incoming_identity(app, inner, received_identity, time_ticks, true); } else { debug_event!( - host_system, + app, "[vl1] {} OK(WHOIS): received bad identity: {}", self.identity.address.to_string(), r.err().unwrap().to_string() @@ -785,7 +761,7 @@ impl Peer { _ => { return inner.handle_ok( - host_system, + app, node, self, &source_path, @@ -802,20 +778,20 @@ impl Peer { return PacketHandlerResult::Error; } - fn handle_incoming_whois( + fn handle_incoming_whois( self: &Arc, - host_system: &HostSystemImpl, - inner: &InnerProtocolImpl, + app: &Application, + inner: &Inner, node: &Node, time_ticks: i64, message_id: MessageId, payload: &PacketBuffer, ) -> PacketHandlerResult { - if node.this_node_is_root() || host_system.should_respond_to(&self.identity) { + if node.this_node_is_root() || inner.should_respond_to(&self.identity) { let mut addresses = payload.as_bytes(); while addresses.len() >= ADDRESS_SIZE { if !self - .send(host_system, node, None, time_ticks, |packet| { + .send(app, node, None, time_ticks, |packet| { while addresses.len() >= ADDRESS_SIZE && (packet.len() + Identity::MAX_MARSHAL_SIZE) <= UDP_DEFAULT_MTU { if let Some(zt_address) = Address::from_bytes(&addresses[..ADDRESS_SIZE]) { if let Some(peer) = node.peer(zt_address) { @@ -835,9 +811,9 @@ impl Peer { return PacketHandlerResult::Ok; } - fn handle_incoming_rendezvous( + fn handle_incoming_rendezvous( self: &Arc, - host_system: &HostSystemImpl, + app: &Application, node: &Node, time_ticks: i64, message_id: MessageId, @@ -848,17 +824,17 @@ impl Peer { return PacketHandlerResult::Ok; } - fn handle_incoming_echo( + fn handle_incoming_echo( &self, - host_system: &HostSystemImpl, - inner: &InnerProtocolImpl, + app: &Application, + inner: &Inner, node: &Node, time_ticks: i64, message_id: MessageId, payload: &PacketBuffer, ) -> PacketHandlerResult { - if host_system.should_respond_to(&self.identity) || node.is_peer_root(self) { - self.send(host_system, node, None, time_ticks, |packet| { + if inner.should_respond_to(&self.identity) || node.is_peer_root(self) { + self.send(app, node, None, time_ticks, |packet| { let mut f: &mut OkHeader = packet.append_struct_get_mut().unwrap(); f.verb = message_type::VL1_OK; f.in_re_verb = message_type::VL1_ECHO; @@ -867,7 +843,7 @@ impl Peer { }); } else { debug_event!( - host_system, + app, "[vl1] dropping ECHO from {} due to lack of trust relationship", self.identity.address.to_string() ); @@ -875,9 +851,9 @@ impl Peer { return PacketHandlerResult::Ok; } - fn handle_incoming_push_direct_paths( + fn handle_incoming_push_direct_paths( self: &Arc, - host_system: &HostSystemImpl, + app: &Application, node: &Node, time_ticks: i64, source_path: &Arc, @@ -886,9 +862,9 @@ impl Peer { PacketHandlerResult::Ok } - fn handle_incoming_user_message( + fn handle_incoming_user_message( self: &Arc, - host_system: &HostSystemImpl, + app: &Application, node: &Node, time_ticks: i64, source_path: &Arc, diff --git a/network-hypervisor/src/vl1/rootset.rs b/network-hypervisor/src/vl1/rootset.rs index 0647f2c37..56d60c0bb 100644 --- a/network-hypervisor/src/vl1/rootset.rs +++ b/network-hypervisor/src/vl1/rootset.rs @@ -6,7 +6,7 @@ use std::io::Write; use crate::vl1::identity::{Identity, IDENTITY_MAX_SIGNATURE_SIZE}; use crate::vl1::Endpoint; -use zerotier_crypto::verified::Verified; +use zerotier_crypto::typestate::Valid; use zerotier_utils::arrayvec::ArrayVec; use zerotier_utils::buffer::Buffer; use zerotier_utils::marshalable::{Marshalable, UnmarshalError}; @@ -91,7 +91,7 @@ impl RootSet { } /// Get the ZeroTier default root set, which contains roots run by ZeroTier Inc. - pub fn zerotier_default() -> Verified { + pub fn zerotier_default() -> Valid { let mut cursor = 0; let rs = include_bytes!("../../default-rootset/root.zerotier.com.bin"); //let rs = include_bytes!("../../default-rootset/test-root.bin"); @@ -107,7 +107,7 @@ impl RootSet { } /// Verify signatures present in this root cluster definition. - pub fn verify(self) -> Option> { + pub fn verify(self) -> Option> { if self.members.is_empty() { return None; } @@ -119,7 +119,7 @@ impl RootSet { } } - return Some(Verified::assume_verified(self)); + return Some(Valid::assume_verified(self)); } /// Add a member to this definition, replacing any current entry with this address. diff --git a/network-hypervisor/src/vl2/multicastauthority.rs b/network-hypervisor/src/vl2/multicastauthority.rs index 8100d9ced..c253ca4cc 100644 --- a/network-hypervisor/src/vl2/multicastauthority.rs +++ b/network-hypervisor/src/vl2/multicastauthority.rs @@ -3,7 +3,7 @@ use std::sync::{Arc, Mutex, RwLock}; use crate::protocol; use crate::protocol::PacketBuffer; -use crate::vl1::{Address, HostSystem, Identity, Node, PacketHandlerResult, Peer, MAC}; +use crate::vl1::{Address, ApplicationLayer, Identity, Node, PacketHandlerResult, Peer, MAC}; use crate::vl2::{MulticastGroup, NetworkId}; use zerotier_utils::buffer::OutOfBoundsError; @@ -84,7 +84,7 @@ impl MulticastAuthority { } /// Call for VL2_MULTICAST_GATHER packets. - pub fn handle_vl2_multicast_gather bool>( + pub fn handle_vl2_multicast_gather bool>( &self, auth: Authenticator, time_ticks: i64, diff --git a/network-hypervisor/src/vl2/revocation.rs b/network-hypervisor/src/vl2/revocation.rs index 484de1f6b..21c7595f9 100644 --- a/network-hypervisor/src/vl2/revocation.rs +++ b/network-hypervisor/src/vl2/revocation.rs @@ -1,6 +1,6 @@ use std::io::Write; -use zerotier_crypto::verified::Verified; +use zerotier_crypto::typestate::Valid; use zerotier_utils::arrayvec::ArrayVec; use serde::{Deserialize, Serialize}; @@ -26,7 +26,7 @@ impl Revocation { threshold: i64, target: Address, issued_to: Address, - signer: &Verified, + signer: &Valid, fast_propagate: bool, ) -> Option { let mut r = Self { diff --git a/network-hypervisor/src/vl2/switch.rs b/network-hypervisor/src/vl2/switch.rs index bfc98101d..8b66ee4c2 100644 --- a/network-hypervisor/src/vl2/switch.rs +++ b/network-hypervisor/src/vl2/switch.rs @@ -3,17 +3,25 @@ use std::sync::Arc; use crate::protocol::PacketBuffer; -use crate::vl1::{HostSystem, InnerProtocol, Node, PacketHandlerResult, Path, Peer}; +use crate::vl1::{ApplicationLayer, InnerProtocolLayer, Node, PacketHandlerResult, Path, Peer}; pub trait SwitchInterface: Sync + Send {} pub struct Switch {} #[allow(unused_variables)] -impl InnerProtocol for Switch { - fn handle_packet( +impl InnerProtocolLayer for Switch { + fn should_respond_to(&self, id: &zerotier_crypto::typestate::Valid) -> bool { + true + } + + fn has_trust_relationship(&self, id: &zerotier_crypto::typestate::Valid) -> bool { + true + } + + fn handle_packet( &self, - host_system: &HostSystemImpl, + app: &Application, node: &Node, source: &Arc, source_path: &Arc, @@ -26,9 +34,9 @@ impl InnerProtocol for Switch { PacketHandlerResult::NotHandled } - fn handle_error( + fn handle_error( &self, - host_system: &HostSystemImpl, + app: &Application, node: &Node, source: &Arc, source_path: &Arc, @@ -43,9 +51,9 @@ impl InnerProtocol for Switch { PacketHandlerResult::NotHandled } - fn handle_ok( + fn handle_ok( &self, - host_system: &HostSystemImpl, + app: &Application, node: &Node, source: &Arc, source_path: &Arc, diff --git a/network-hypervisor/src/vl2/v1/certificateofmembership.rs b/network-hypervisor/src/vl2/v1/certificateofmembership.rs index 7c0261088..36aa705cb 100644 --- a/network-hypervisor/src/vl2/v1/certificateofmembership.rs +++ b/network-hypervisor/src/vl2/v1/certificateofmembership.rs @@ -8,7 +8,7 @@ use serde::{Deserialize, Serialize}; use zerotier_crypto::hash::SHA384; use zerotier_crypto::secure_eq; -use zerotier_crypto::verified::Verified; +use zerotier_crypto::typestate::Valid; use zerotier_utils::arrayvec::ArrayVec; use zerotier_utils::blob::Blob; use zerotier_utils::error::InvalidParameterError; @@ -171,10 +171,13 @@ impl CertificateOfMembership { } /// Verify this certificate of membership. - pub fn verify(self, issuer: &Identity, expect_issued_to: &Identity) -> Option> { - if secure_eq(&Self::v1_proto_issued_to_fingerprint(expect_issued_to), &self.issued_to_fingerprint.as_bytes()[..32]) { + pub fn verify(self, issuer: &Identity, expect_issued_to: &Identity) -> Option> { + if secure_eq( + &Self::v1_proto_issued_to_fingerprint(expect_issued_to), + &self.issued_to_fingerprint.as_bytes()[..32], + ) { if issuer.verify(&self.v1_proto_get_qualifier_bytes(), self.signature.as_bytes()) { - return Some(Verified::assume_verified(self)); + return Some(Valid::assume_verified(self)); } } return None; diff --git a/service/src/main.rs b/service/src/main.rs index b9221c9d9..52fe57242 100644 --- a/service/src/main.rs +++ b/service/src/main.rs @@ -15,6 +15,7 @@ use clap::error::{ContextKind, ContextValue}; #[allow(unused_imports)] use clap::{Arg, ArgMatches, Command}; +use zerotier_network_hypervisor::vl1::InnerProtocolLayer; use zerotier_network_hypervisor::{VERSION_MAJOR, VERSION_MINOR, VERSION_REVISION}; use zerotier_utils::exitcode; use zerotier_vl1_service::datadir::DataDir; @@ -209,14 +210,9 @@ fn main() { Some(("service", _)) => { drop(global_args); // free unnecessary heap before starting service as we're done with CLI args if let Ok(_tokio_runtime) = zerotier_utils::tokio::runtime::Builder::new_multi_thread().enable_all().build() { - let test_inner = Arc::new(zerotier_network_hypervisor::vl1::DummyInnerProtocol::default()); + let test_inner = Arc::new(DummyInnerLayer); let datadir = open_datadir(&flags); - let svc = VL1Service::new( - datadir, - test_inner.clone(), - test_inner, - zerotier_vl1_service::VL1Settings::default(), - ); + let svc = VL1Service::new(datadir, test_inner, zerotier_vl1_service::VL1Settings::default()); if svc.is_ok() { let svc = svc.unwrap(); svc.node().init_default_roots(); @@ -254,3 +250,7 @@ fn main() { std::process::exit(exit_code); } + +struct DummyInnerLayer; + +impl InnerProtocolLayer for DummyInnerLayer {} diff --git a/vl1-service/src/datadir.rs b/vl1-service/src/datadir.rs index 27fc0cc33..6bce2a160 100644 --- a/vl1-service/src/datadir.rs +++ b/vl1-service/src/datadir.rs @@ -8,10 +8,12 @@ use serde::de::DeserializeOwned; use serde::Serialize; use zerotier_crypto::random::next_u32_secure; -use zerotier_network_hypervisor::vl1::{Identity, NodeStorage, Verified}; +use zerotier_network_hypervisor::vl1::{Identity, Valid}; use zerotier_utils::io::{fs_restrict_permissions, read_limit, DEFAULT_FILE_IO_READ_LIMIT}; use zerotier_utils::json::to_json_pretty; +use crate::vl1service::VL1DataStorage; + pub const AUTH_TOKEN_FILENAME: &'static str = "authtoken.secret"; pub const IDENTITY_PUBLIC_FILENAME: &'static str = "identity.public"; pub const IDENTITY_SECRET_FILENAME: &'static str = "identity.secret"; @@ -20,34 +22,43 @@ pub const CONFIG_FILENAME: &'static str = "local.conf"; const AUTH_TOKEN_DEFAULT_LENGTH: usize = 48; const AUTH_TOKEN_POSSIBLE_CHARS: &'static str = "0123456789abcdefghijklmnopqrstuvwxyz"; +pub fn load_node_identity(base_path: &Path) -> Option> { + let id_data = read_limit(base_path.join(IDENTITY_SECRET_FILENAME), 4096); + if id_data.is_err() { + return None; + } + let id_data = Identity::from_str(String::from_utf8_lossy(id_data.unwrap().as_slice()).as_ref()); + if id_data.is_err() { + return None; + } + Some(Valid::assume_verified(id_data.unwrap())) +} + +pub fn save_node_identity(base_path: &Path, id: &Valid) -> bool { + assert!(id.secret.is_some()); + let id_secret_str = id.to_secret_string(); + let id_public_str = id.to_string(); + let secret_path = base_path.join(IDENTITY_SECRET_FILENAME); + if std::fs::write(&secret_path, id_secret_str.as_bytes()).is_err() { + return false; + } + assert!(fs_restrict_permissions(&secret_path)); + return std::fs::write(base_path.join(IDENTITY_PUBLIC_FILENAME), id_public_str.as_bytes()).is_ok(); +} + pub struct DataDir { pub base_path: PathBuf, config: RwLock>, authtoken: Mutex, } -impl NodeStorage for DataDir { - fn load_node_identity(&self) -> Option> { - let id_data = read_limit(self.base_path.join(IDENTITY_SECRET_FILENAME), 4096); - if id_data.is_err() { - return None; - } - let id_data = Identity::from_str(String::from_utf8_lossy(id_data.unwrap().as_slice()).as_ref()); - if id_data.is_err() { - return None; - } - Some(Verified::assume_verified(id_data.unwrap())) +impl VL1DataStorage for DataDir { + fn load_node_identity(&self) -> Option> { + load_node_identity(self.base_path.as_path()) } - fn save_node_identity(&self, id: &Verified) { - assert!(id.secret.is_some()); - let id_secret_str = id.to_secret_string(); - let id_public_str = id.to_string(); - let secret_path = self.base_path.join(IDENTITY_SECRET_FILENAME); - // TODO: handle errors - let _ = std::fs::write(&secret_path, id_secret_str.as_bytes()); - assert!(fs_restrict_permissions(&secret_path)); - let _ = std::fs::write(self.base_path.join(IDENTITY_PUBLIC_FILENAME), id_public_str.as_bytes()); + fn save_node_identity(&self, id: &Valid) -> bool { + save_node_identity(self.base_path.as_path(), id) } } diff --git a/vl1-service/src/localinterface.rs b/vl1-service/src/localinterface.rs index 758f1bd57..00277c1e1 100644 --- a/vl1-service/src/localinterface.rs +++ b/vl1-service/src/localinterface.rs @@ -13,21 +13,22 @@ use std::hash::Hash; pub struct LocalInterface(u128); impl LocalInterface { + #[inline] #[cfg(unix)] pub fn from_unix_interface_name(name: &str) -> Self { - let mut tmp = [0_u8; 16]; + let mut tmp = 0u128.to_ne_bytes(); let nb = name.as_bytes(); let l = nb.len(); assert!(l <= 16); // do any *nix OSes have device names longer than 16 bytes? tmp[..l].copy_from_slice(&nb[..l]); - Self(u128::from_be_bytes(tmp)) + Self(u128::from_ne_bytes(tmp)) } } impl ToString for LocalInterface { #[cfg(unix)] fn to_string(&self) -> String { - let b = self.0.to_be_bytes(); + let b = self.0.to_ne_bytes(); let mut l = 0; for bb in b.iter() { if *bb > 0 { @@ -41,6 +42,6 @@ impl ToString for LocalInterface { #[cfg(windows)] fn to_string(&self) -> String { - zerotier_core_crypto::hex::to_string(&self.0.to_be_bytes()) + zerotier_core_crypto::hex::to_string(&self.0.to_ne_bytes()) } } diff --git a/vl1-service/src/localsocket.rs b/vl1-service/src/localsocket.rs index 0b17fe8b0..4d178c02c 100644 --- a/vl1-service/src/localsocket.rs +++ b/vl1-service/src/localsocket.rs @@ -1,33 +1,30 @@ // (c) 2020-2022 ZeroTier, Inc. -- currently proprietary pending actual release and licensing. See LICENSE.md. use std::hash::Hash; -use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Weak}; use crate::sys::udp::BoundUdpSocket; -static LOCAL_SOCKET_UNIQUE_ID_COUNTER: AtomicUsize = AtomicUsize::new(0); - /// Local socket wrapper to provide to the core. /// -/// This implements very fast hash and equality in terms of an arbitrary unique ID assigned at -/// construction and holds a weak reference to the bound socket so dead sockets will silently -/// cease to exist or work. This also means that this code can check the weak count to determine -/// if the core is currently holding/using a socket for any reason. +/// This wraps a bound UDP socket in weak form so sockets that are released by the UDP +/// binding engine can be "garbage collected" by the core. +#[repr(transparent)] #[derive(Clone)] -pub struct LocalSocket(pub(crate) Weak, usize); +pub struct LocalSocket(Weak); impl LocalSocket { + #[inline] pub fn new(s: &Arc) -> Self { - Self(Arc::downgrade(s), LOCAL_SOCKET_UNIQUE_ID_COUNTER.fetch_add(1, Ordering::SeqCst)) + Self(Arc::downgrade(s)) } - #[inline(always)] + #[inline] pub fn is_valid(&self) -> bool { self.0.strong_count() > 0 } - #[inline(always)] + #[inline] pub fn socket(&self) -> Option> { self.0.upgrade() } @@ -36,7 +33,7 @@ impl LocalSocket { impl PartialEq for LocalSocket { #[inline(always)] fn eq(&self, other: &Self) -> bool { - self.1 == other.1 + self.0.ptr_eq(&other.0) } } @@ -45,7 +42,7 @@ impl Eq for LocalSocket {} impl Hash for LocalSocket { #[inline(always)] fn hash(&self, state: &mut H) { - self.1.hash(state) + self.0.as_ptr().hash(state) } } diff --git a/vl1-service/src/sys/udp.rs b/vl1-service/src/sys/udp.rs index 5543af64a..ba8d3a949 100644 --- a/vl1-service/src/sys/udp.rs +++ b/vl1-service/src/sys/udp.rs @@ -42,7 +42,7 @@ fn socket_read_concurrency() -> usize { } } -pub trait UdpPacketHandler: Send + Sync + 'static { +pub trait UdpPacketHandler: Send + Sync { fn incoming_udp_packet( self: &Arc, time_ticks: i64, @@ -189,7 +189,7 @@ impl BoundUdpPort { /// The caller can check the 'sockets' member variable after calling to determine which if any bindings were /// successful. Any errors that occurred are returned as tuples of (interface, address, error). The second vector /// returned contains newly bound sockets. - pub fn update_bindings( + pub fn update_bindings( &mut self, interface_prefix_blacklist: &HashSet, cidr_blacklist: &HashSet, diff --git a/vl1-service/src/vl1service.rs b/vl1-service/src/vl1service.rs index cca43e675..e6ed37682 100644 --- a/vl1-service/src/vl1service.rs +++ b/vl1-service/src/vl1service.rs @@ -20,21 +20,22 @@ use crate::LocalSocket; /// Update UDP bindings every this many seconds. const UPDATE_UDP_BINDINGS_EVERY_SECS: usize = 10; +/// Trait to implement to provide storage for VL1-related state information. +pub trait VL1DataStorage: Sync + Send { + fn load_node_identity(&self) -> Option>; + fn save_node_identity(&self, id: &Valid) -> bool; +} + /// VL1 service that connects to the physical network and hosts an inner protocol like ZeroTier VL2. /// /// This is the "outward facing" half of a full ZeroTier stack on a normal system. It binds sockets, /// talks to the physical network, manages the vl1 node, and presents a templated interface for /// whatever inner protocol implementation is using it. This would typically be VL2 but could be /// a test harness or just the controller for a controller that runs stand-alone. -pub struct VL1Service< - NodeStorageImpl: NodeStorage + ?Sized + 'static, - VL1AuthProviderImpl: VL1AuthProvider + ?Sized + 'static, - InnerProtocolImpl: InnerProtocol + ?Sized + 'static, -> { +pub struct VL1Service { state: RwLock, - storage: Arc, - vl1_auth_provider: Arc, - inner: Arc, + vl1_data_storage: Arc, + inner: Arc, buffer_pool: Arc, node_container: Option, // never None, set in new() } @@ -46,18 +47,8 @@ struct VL1ServiceMutableState { running: bool, } -impl< - NodeStorageImpl: NodeStorage + ?Sized + 'static, - VL1AuthProviderImpl: VL1AuthProvider + ?Sized + 'static, - InnerProtocolImpl: InnerProtocol + ?Sized + 'static, - > VL1Service -{ - pub fn new( - storage: Arc, - vl1_auth_provider: Arc, - inner: Arc, - settings: VL1Settings, - ) -> Result, Box> { +impl VL1Service { + pub fn new(vl1_data_storage: Arc, inner: Arc, settings: VL1Settings) -> Result, Box> { let mut service = Self { state: RwLock::new(VL1ServiceMutableState { daemons: Vec::with_capacity(2), @@ -65,8 +56,7 @@ impl< settings, running: true, }), - storage, - vl1_auth_provider, + vl1_data_storage, inner, buffer_pool: Arc::new(PacketBufferPool::new( std::thread::available_parallelism().map_or(2, |c| c.get() + 2), @@ -189,12 +179,7 @@ impl< } } -impl< - NodeStorageImpl: NodeStorage + ?Sized + 'static, - VL1AuthProviderImpl: VL1AuthProvider + ?Sized + 'static, - InnerProtocolImpl: InnerProtocol + ?Sized + 'static, - > UdpPacketHandler for VL1Service -{ +impl UdpPacketHandler for VL1Service { #[inline(always)] fn incoming_udp_packet( self: &Arc, @@ -215,16 +200,11 @@ impl< } } -impl< - NodeStorageImpl: NodeStorage + ?Sized + 'static, - VL1AuthProviderImpl: VL1AuthProvider + ?Sized + 'static, - InnerProtocolImpl: InnerProtocol + ?Sized + 'static, - > HostSystem for VL1Service -{ - type Storage = NodeStorageImpl; +impl ApplicationLayer for VL1Service { type LocalSocket = crate::LocalSocket; type LocalInterface = crate::LocalInterface; + #[inline] fn event(&self, event: Event) { println!("{}", event.to_string()); match event { @@ -237,9 +217,14 @@ impl< socket.is_valid() } - #[inline(always)] - fn storage(&self) -> &Self::Storage { - self.storage.as_ref() + #[inline] + fn load_node_identity(&self) -> Option> { + self.vl1_data_storage.load_node_identity() + } + + #[inline] + fn save_node_identity(&self, id: &Valid) -> bool { + self.vl1_data_storage.save_node_identity(id) } #[inline] @@ -247,6 +232,7 @@ impl< self.buffer_pool.get() } + #[inline] fn wire_send( &self, endpoint: &Endpoint, @@ -259,7 +245,7 @@ impl< Endpoint::IpUdp(address) => { // This is the fast path -- the socket is known to the core so just send it. if let Some(s) = local_socket { - if let Some(s) = s.0.upgrade() { + if let Some(s) = s.socket() { s.send(address, data, packet_ttl); } else { return; @@ -321,46 +307,7 @@ impl< } } -impl< - NodeStorageImpl: NodeStorage + ?Sized + 'static, - VL1AuthProviderImpl: VL1AuthProvider + ?Sized + 'static, - InnerProtocolImpl: InnerProtocol + ?Sized + 'static, - > NodeStorage for VL1Service -{ - #[inline(always)] - fn load_node_identity(&self) -> Option> { - self.storage.load_node_identity() - } - - #[inline(always)] - fn save_node_identity(&self, id: &Verified) { - self.storage.save_node_identity(id) - } -} - -impl< - NodeStorageImpl: NodeStorage + ?Sized + 'static, - VL1AuthProviderImpl: VL1AuthProvider + ?Sized + 'static, - InnerProtocolImpl: InnerProtocol + ?Sized + 'static, - > VL1AuthProvider for VL1Service -{ - #[inline(always)] - fn should_respond_to(&self, id: &Verified) -> bool { - self.vl1_auth_provider.should_respond_to(id) - } - - #[inline(always)] - fn has_trust_relationship(&self, id: &Verified) -> bool { - self.vl1_auth_provider.has_trust_relationship(id) - } -} - -impl< - NodeStorageImpl: NodeStorage + ?Sized + 'static, - VL1AuthProviderImpl: VL1AuthProvider + ?Sized + 'static, - InnerProtocolImpl: InnerProtocol + ?Sized + 'static, - > Drop for VL1Service -{ +impl Drop for VL1Service { fn drop(&mut self) { let mut state = self.state.write().unwrap(); state.running = false; diff --git a/zssp/src/zssp.rs b/zssp/src/zssp.rs index a788733a9..b34184dd0 100644 --- a/zssp/src/zssp.rs +++ b/zssp/src/zssp.rs @@ -83,7 +83,7 @@ pub enum ReceiveResult<'a, H: ApplicationLayer> { /// Packet is valid and a new session was created. /// - /// The session will have already been gated by the `accept_new_session()` method in the Host trait. + /// The session will have already been gated by the accept_new_session() method in ApplicationLayer. OkNewSession(Session), /// Packet superficially appears valid but was ignored e.g. as a duplicate.