diff --git a/controller/src/database.rs b/controller/src/database.rs index abc032436..f5ee9dd6f 100644 --- a/controller/src/database.rs +++ b/controller/src/database.rs @@ -1,5 +1,3 @@ -use std::error::Error; - use async_trait::async_trait; use zerotier_network_hypervisor::vl1::{Address, InetAddress, NodeStorage}; @@ -21,12 +19,14 @@ pub enum Change { #[async_trait] pub trait Database: Sync + Send + NodeStorage + 'static { - async fn get_network(&self, id: NetworkId) -> Result, Box>; - async fn save_network(&self, obj: Network) -> Result<(), Box>; + type Error: std::error::Error + Send + 'static; - async fn list_members(&self, network_id: NetworkId) -> Result, Box>; - async fn get_member(&self, network_id: NetworkId, node_id: Address) -> Result, Box>; - async fn save_member(&self, obj: Member) -> Result<(), Box>; + async fn get_network(&self, id: NetworkId) -> Result, Self::Error>; + async fn save_network(&self, obj: Network) -> Result<(), Self::Error>; + + async fn list_members(&self, network_id: NetworkId) -> Result, Self::Error>; + async fn get_member(&self, network_id: NetworkId, node_id: Address) -> Result, Self::Error>; + async fn save_member(&self, obj: Member) -> Result<(), Self::Error>; /// Get a receiver that can be used to receive changes made to networks and members, if supported. /// @@ -47,7 +47,7 @@ pub trait Database: Sync + Send + NodeStorage + 'static { /// /// The default trait implementation uses a brute force method. This should be reimplemented if a /// more efficient way is available. - async fn list_members_deauthorized_after(&self, network_id: NetworkId, cutoff: i64) -> Result, Box> { + async fn list_members_deauthorized_after(&self, network_id: NetworkId, cutoff: i64) -> Result, Self::Error> { let mut v = Vec::new(); let members = self.list_members(network_id).await?; for a in members.iter() { @@ -64,7 +64,7 @@ pub trait Database: Sync + Send + NodeStorage + 'static { /// /// The default trait implementation uses a brute force method. This should be reimplemented if a /// more efficient way is available. - async fn is_ip_assigned(&self, network_id: NetworkId, ip: &InetAddress) -> Result> { + async fn is_ip_assigned(&self, network_id: NetworkId, ip: &InetAddress) -> Result { let members = self.list_members(network_id).await?; for a in members.iter() { if let Some(m) = self.get_member(network_id, *a).await? { @@ -76,5 +76,5 @@ pub trait Database: Sync + Send + NodeStorage + 'static { return Ok(false); } - async fn log_request(&self, obj: &RequestLogItem) -> Result<(), Box>; + async fn log_request(&self, obj: RequestLogItem) -> Result<(), Self::Error>; } diff --git a/controller/src/filedatabase.rs b/controller/src/filedatabase.rs index 298516960..9d1860512 100644 --- a/controller/src/filedatabase.rs +++ b/controller/src/filedatabase.rs @@ -1,4 +1,5 @@ use std::error::Error; +use std::fmt::Display; use std::path::{Path, PathBuf}; use std::str::FromStr; use std::sync::atomic::{AtomicU64, Ordering}; @@ -16,6 +17,35 @@ use crate::model::*; const IDENTITY_SECRET_FILENAME: &'static str = "identity.secret"; +#[derive(Debug)] +pub enum FileDatabaseError { + InvalidYaml(String), + IoError(String), +} + +impl Display for FileDatabaseError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::InvalidYaml(e) => f.write_str(format!("invalid YAML ({})", e).as_str()), + Self::IoError(e) => f.write_str(format!("I/O error ({})", e).as_str()), + } + } +} + +impl Error for FileDatabaseError {} + +impl From for FileDatabaseError { + fn from(e: serde_yaml::Error) -> Self { + Self::InvalidYaml(e.to_string()) + } +} + +impl From for FileDatabaseError { + fn from(e: zerotier_utils::tokio::io::Error) -> Self { + Self::IoError(e.to_string()) + } +} + /// An in-filesystem database that permits live editing. /// /// A cache is maintained that contains the actual objects. When an object is live edited, @@ -84,7 +114,9 @@ impl NodeStorage for FileDatabase { #[async_trait] impl Database for FileDatabase { - async fn get_network(&self, id: NetworkId) -> Result, Box> { + type Error = FileDatabaseError; + + async fn get_network(&self, id: NetworkId) -> Result, Self::Error> { let r = fs::read(self.network_path(id)).await; if let Ok(raw) = r { let mut network = serde_yaml::from_slice::(raw.as_slice())?; @@ -97,7 +129,7 @@ impl Database for FileDatabase { } } - async fn save_network(&self, obj: Network) -> Result<(), Box> { + async fn save_network(&self, obj: Network) -> Result<(), Self::Error> { let base_network_path = self.network_path(obj.id); let _ = fs::create_dir_all(base_network_path.parent().unwrap()).await; //let _ = fs::write(base_network_path, to_json_pretty(&obj).as_bytes()).await?; @@ -105,7 +137,7 @@ impl Database for FileDatabase { return Ok(()); } - async fn list_members(&self, network_id: NetworkId) -> Result, Box> { + async fn list_members(&self, network_id: NetworkId) -> Result, Self::Error> { let mut members = Vec::new(); let mut dir = fs::read_dir(self.base_path.join(network_id.to_string())).await?; while let Ok(Some(ent)) = dir.next_entry().await { @@ -125,7 +157,7 @@ impl Database for FileDatabase { Ok(members) } - async fn get_member(&self, network_id: NetworkId, node_id: Address) -> Result, Box> { + async fn get_member(&self, network_id: NetworkId, node_id: Address) -> Result, Self::Error> { let r = fs::read(self.member_path(network_id, node_id)).await; if let Ok(raw) = r { let mut member = serde_yaml::from_slice::(raw.as_slice())?; @@ -138,7 +170,7 @@ impl Database for FileDatabase { } } - async fn save_member(&self, obj: Member) -> Result<(), Box> { + async fn save_member(&self, obj: Member) -> Result<(), Self::Error> { let base_member_path = self.member_path(obj.network_id, obj.node_id); let _ = fs::create_dir_all(base_member_path.parent().unwrap()).await; //let _ = fs::write(base_member_path, to_json_pretty(&obj).as_bytes()).await?; @@ -146,7 +178,7 @@ impl Database for FileDatabase { Ok(()) } - async fn log_request(&self, obj: &RequestLogItem) -> Result<(), Box> { + async fn log_request(&self, obj: RequestLogItem) -> Result<(), Self::Error> { println!("{}", obj.to_string()); Ok(()) } diff --git a/controller/src/handler.rs b/controller/src/handler.rs index 29b8d6bc0..4aed8161c 100644 --- a/controller/src/handler.rs +++ b/controller/src/handler.rs @@ -1,22 +1,26 @@ // (c) 2020-2022 ZeroTier, Inc. -- currently propritery pending actual release and licensing. See LICENSE.md. +use std::any::Any; use std::error::Error; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, RwLock, Weak}; use tokio::time::{Duration, Instant}; -use zerotier_network_hypervisor::protocol::{verbs, PacketBuffer, DEFAULT_MULTICAST_LIMIT, ZEROTIER_VIRTUAL_NETWORK_DEFAULT_MTU}; +use zerotier_network_hypervisor::protocol; +use zerotier_network_hypervisor::protocol::{PacketBuffer, DEFAULT_MULTICAST_LIMIT, ZEROTIER_VIRTUAL_NETWORK_DEFAULT_MTU}; use zerotier_network_hypervisor::vl1::{HostSystem, Identity, InnerProtocol, Node, PacketHandlerResult, Path, PathFilter, Peer}; use zerotier_network_hypervisor::vl2::{CertificateOfMembership, CertificateOfOwnership, NetworkConfig, NetworkId, Tag}; +use zerotier_utils::blob::Blob; +use zerotier_utils::buffer::OutOfBoundsError; use zerotier_utils::dictionary::Dictionary; -use zerotier_utils::error::{InvalidParameterError, UnexpectedError}; -use zerotier_utils::ms_since_epoch; +use zerotier_utils::error::InvalidParameterError; use zerotier_utils::reaper::Reaper; use zerotier_utils::tokio; +use zerotier_utils::{ms_monotonic, ms_since_epoch}; use zerotier_vl1_service::VL1Service; use crate::database::*; -use crate::model::{AuthorizationResult, Member, CREDENTIAL_WINDOW_SIZE_DEFAULT}; +use crate::model::{AuthorizationResult, Member, RequestLogItem, CREDENTIAL_WINDOW_SIZE_DEFAULT}; // A netconf per-query task timeout, just a sanity limit. const REQUEST_TIMEOUT: Duration = Duration::from_secs(10); @@ -27,6 +31,7 @@ pub struct Handler { } struct Inner { + service: RwLock, Handler>>>, reaper: Reaper, daemons: Mutex>>, // drop() aborts these runtime: tokio::runtime::Handle, @@ -41,6 +46,7 @@ impl Handler { assert!(local_identity.secret.is_some()); let inner = Arc::new(Inner:: { + service: RwLock::new(Weak::default()), reaper: Reaper::new(&runtime), daemons: Mutex::new(Vec::with_capacity(1)), runtime, @@ -58,19 +64,29 @@ impl Handler { } } + /// Set the service and HostSystem implementation for this controller. + /// + /// This must be called once the service that uses this handler is up or the controller + /// won't actually do anything. The reference the handler holds is weak to prevent + /// a circular reference, so if the VL1Service is dropped this must be called again to + /// tell the controller handler about a new instance. + pub fn set_service(&self, service: &Arc>) { + *self.inner.service.write().unwrap() = Arc::downgrade(service); + } + /// Start a change watcher to respond to changes detected by the database. - /// This should only be called once, though multiple calls won't do anything but create unnecessary async tasks. - pub async fn start_change_watcher(&self, service: &Arc>) { + /// + /// This should only be called once, though multiple calls won't do anything but create + /// unnecessary async tasks. If the database being used does not support changes, this + /// does nothing. + pub async fn start_change_watcher(&self) { if let Some(cw) = self.inner.database.changes().await.map(|mut ch| { let inner = self.inner.clone(); - let service = service.clone(); self.inner.runtime.spawn(async move { loop { if let Ok(change) = ch.recv().await { inner.reaper.add( - inner - .runtime - .spawn(inner.clone().handle_change_notification(service.clone(), change)), + inner.runtime.spawn(inner.clone().handle_change_notification(change)), Instant::now().checked_add(REQUEST_TIMEOUT).unwrap(), ); } @@ -110,15 +126,17 @@ impl PathFilter for Handler { impl InnerProtocol for Handler { fn handle_packet( &self, + _host_system: &HostSystemImpl, _node: &Node, source: &Arc>, source_path: &Arc>, + source_hops: u8, message_id: u64, verb: u8, payload: &PacketBuffer, ) -> PacketHandlerResult { match verb { - verbs::VL2_VERB_NETWORK_CONFIG_REQUEST => { + protocol::verbs::VL2_VERB_NETWORK_CONFIG_REQUEST => { let mut cursor = 1; let network_id = payload.read_u64(&mut cursor); @@ -149,6 +167,7 @@ impl InnerProtocol for Handler { Dictionary::new() }; + /* let (have_revision, have_timestamp) = if (cursor + 16) <= payload.len() { let r = payload.read_u64(&mut cursor); let t = payload.read_u64(&mut cursor); @@ -159,21 +178,49 @@ impl InnerProtocol for Handler { } else { (None, None) }; + */ // Launch handler as an async background task. - let (inner, source2, source_path2) = (self.inner.clone(), source.clone(), source_path.clone()); + let (inner, peer, source_remote_endpoint) = (self.inner.clone(), source.clone(), source_path.endpoint.clone()); self.inner.reaper.add( self.inner.runtime.spawn(async move { - // TODO: log errors - let result = inner.handle_network_config_request( - source2, - source_path2, - message_id, - network_id, - meta_data, - have_revision, - have_timestamp, - ); + let node_id = peer.identity.address; + let node_fingerprint = Blob::from(peer.identity.fingerprint); + let now = ms_since_epoch(); + + let result = match inner + .handle_network_config_request::(&peer.identity, network_id, &meta_data, now) + .await + { + Result::Ok((result, Some(config))) => { + inner.send_network_config(peer, &config, Some(message_id)); + result + } + Result::Ok((result, None)) => result, + Result::Err(_) => { + // TODO: log invalid request or internal error + return; + } + }; + + let _ = inner + .database + .log_request(RequestLogItem { + network_id, + node_id, + node_fingerprint, + controller_node_id: inner.local_identity.address, + metadata: if meta_data.is_empty() { + Vec::new() + } else { + meta_data.to_bytes() + }, + timestamp: now, + source_remote_endpoint, + source_hops, + result, + }) + .await; }), Instant::now().checked_add(REQUEST_TIMEOUT).unwrap(), ); @@ -186,9 +233,11 @@ impl InnerProtocol for Handler { fn handle_error( &self, + _host_system: &HostSystemImpl, _node: &Node, _source: &Arc>, _source_path: &Arc>, + _source_hops: u8, _message_id: u64, _in_re_verb: u8, _in_re_message_id: u64, @@ -201,9 +250,11 @@ impl InnerProtocol for Handler { fn handle_ok( &self, + _host_system: &HostSystemImpl, _node: &Node, _source: &Arc>, _source_path: &Arc>, + _source_hops: u8, _message_id: u64, _in_re_verb: u8, _in_re_message_id: u64, @@ -219,24 +270,73 @@ impl InnerProtocol for Handler { } impl Inner { - async fn handle_change_notification( - self: Arc, - service: Arc, Handler>>, - _change: Change, + fn send_network_config( + &self, + peer: Arc, // hack can go away when Rust has specialization + config: &NetworkConfig, + in_re_message_id: Option, // None for unsolicited push ) { + if let Some(host_system) = self.service.read().unwrap().upgrade() { + if let Some(peer) = peer.downcast_ref::, Handler>>>() { + peer.send( + host_system.as_ref(), + host_system.node(), + None, + ms_monotonic(), + |packet| -> Result<(), OutOfBoundsError> { + if let Some(in_re_message_id) = in_re_message_id { + let ok_header = packet.append_struct_get_mut::()?; + ok_header.verb = protocol::verbs::VL1_OK; + ok_header.in_re_verb = protocol::verbs::VL2_VERB_NETWORK_CONFIG_REQUEST; + ok_header.in_re_message_id = in_re_message_id.to_be_bytes(); + } else { + packet.append_u8(protocol::verbs::VL2_VERB_NETWORK_CONFIG)?; + } + + if peer.is_v2() { + todo!() + } else { + let config_data = if let Some(config_dict) = config.v1_proto_to_dictionary(&self.local_identity) { + config_dict.to_bytes() + } else { + eprintln!("WARNING: unexpected error serializing network config into V1 format dictionary"); + return Err(OutOfBoundsError); // abort + }; + if config_data.len() > (u16::MAX as usize) { + eprintln!("WARNING: network config is larger than 65536 bytes!"); + return Err(OutOfBoundsError); // abort + } + + packet.append_u64(config.network_id.into())?; + packet.append_u16(config_data.len() as u16)?; + packet.append_bytes(config_data.as_slice())?; + + // TODO: compress + + // NOTE: V1 supports a bunch of other things like chunking but it was never truly used and is optional. + // Omit it here as it adds overhead. + } + + Ok(()) + }, + ); + } else { + panic!("HostSystem implementation mismatch with service to which controller is harnessed"); + } + } + } + + async fn handle_change_notification(self: Arc, _change: Change) { todo!() } async fn handle_network_config_request( - self: Arc, - source: Arc>, - _source_path: Arc>, - _message_id: u64, + self: &Arc, + source_identity: &Identity, network_id: NetworkId, - _meta_data: Dictionary, - _have_revision: Option, - _have_timestamp: Option, - ) -> Result<(AuthorizationResult, Option), Box> { + _meta_data: &Dictionary, + now: i64, + ) -> Result<(AuthorizationResult, Option), DatabaseImpl::Error> { let network = self.database.get_network(network_id).await?; if network.is_none() { // TODO: send error @@ -244,27 +344,25 @@ impl Inner { } let network = network.unwrap(); - let mut member = self.database.get_member(network_id, source.identity.address).await?; + let mut member = self.database.get_member(network_id, source_identity.address).await?; let mut member_changed = false; - let legacy_v1 = source.identity.p384.is_none(); + let legacy_v1 = source_identity.p384.is_none(); // If we have a member object and a pinned identity, check to make sure it matches. if let Some(member) = member.as_ref() { if let Some(pinned_identity) = member.identity.as_ref() { - if !pinned_identity.eq(&source.identity) { + if !pinned_identity.eq(&source_identity) { return Ok((AuthorizationResult::RejectedIdentityMismatch, None)); } } } - let now = ms_since_epoch(); - let mut authorization_result = AuthorizationResult::Rejected; let mut authorized = member.as_ref().map_or(false, |m| m.authorized()); if !authorized { if member.is_none() { if network.learn_members.unwrap_or(true) { - let _ = member.insert(Member::new_with_identity(source.identity.clone(), network_id)); + let _ = member.insert(Member::new_with_identity(source_identity.clone(), network_id)); member_changed = true; } else { return Ok((AuthorizationResult::Rejected, None)); @@ -296,9 +394,9 @@ impl Inner { let deauthed_members_still_in_window = self.database.list_members_deauthorized_after(network.id, now - max_delta).await; // Check and if necessary auto-assign static IPs for this member. - member_changed |= network.check_zt_ip_assignments(self.database.as_ref(), &mut member).await?; + member_changed |= network.check_zt_ip_assignments(self.database.as_ref(), &mut member).await; - let mut nc = NetworkConfig::new(network_id, source.identity.address); + let mut nc = NetworkConfig::new(network_id, source_identity.address); nc.name = member.name.clone(); nc.private = network.private; @@ -312,25 +410,27 @@ impl Inner { nc.rules = network.rules; nc.dns = network.dns; - nc.certificate_of_membership = Some( - CertificateOfMembership::new(&self.local_identity, network_id, &source.identity, now, max_delta, legacy_v1) - .ok_or(UnexpectedError)?, - ); + nc.certificate_of_membership = + CertificateOfMembership::new(&self.local_identity, network_id, &source_identity, now, max_delta, legacy_v1); + if nc.certificate_of_membership.is_none() { + return Ok((AuthorizationResult::RejectedDueToError, None)); + } - let mut coo = CertificateOfOwnership::new(network_id, now, source.identity.address, legacy_v1); + let mut coo = CertificateOfOwnership::new(network_id, now, source_identity.address, legacy_v1); for ip in nc.static_ips.iter() { coo.add_ip(ip); } - if !coo.sign(&self.local_identity, &source.identity) { - return Err(Box::new(UnexpectedError)); + if !coo.sign(&self.local_identity, &source_identity) { + return Ok((AuthorizationResult::RejectedDueToError, None)); } nc.certificates_of_ownership.push(coo); for (id, value) in member.tags.iter() { - let _ = nc.tags.insert( - *id, - Tag::new(*id, *value, &self.local_identity, network_id, &source.identity, now, legacy_v1).ok_or(UnexpectedError)?, - ); + let tag = Tag::new(*id, *value, &self.local_identity, network_id, &source_identity, now, legacy_v1); + if tag.is_none() { + return Ok((AuthorizationResult::RejectedDueToError, None)); + } + let _ = nc.tags.insert(*id, tag.unwrap()); } // TODO: node info, which isn't supported in v1 so not needed yet diff --git a/controller/src/main.rs b/controller/src/main.rs index c920238a5..937241214 100644 --- a/controller/src/main.rs +++ b/controller/src/main.rs @@ -33,7 +33,9 @@ async fn run(database: Arc, runtime: &Runt if svc.is_ok() { let svc = svc.unwrap(); svc.node().init_default_roots(); - handler.start_change_watcher(&svc).await; + + handler.set_service(&svc); + handler.start_change_watcher().await; // Wait for kill signal on Unix-like platforms. #[cfg(unix)] diff --git a/controller/src/model/mod.rs b/controller/src/model/mod.rs index 117272f5d..d869c7b8d 100644 --- a/controller/src/model/mod.rs +++ b/controller/src/model/mod.rs @@ -12,6 +12,7 @@ use serde::{Deserialize, Serialize}; use zerotier_network_hypervisor::vl1::{Address, Endpoint}; use zerotier_network_hypervisor::vl2::NetworkId; +use zerotier_utils::blob::Blob; /// A complete network with all member configuration information for import/export or blob storage. #[derive(Clone, PartialEq, Eq, Serialize, Deserialize)] @@ -94,14 +95,16 @@ pub struct RequestLogItem { pub network_id: NetworkId, #[serde(rename = "nid")] pub node_id: Address, + #[serde(rename = "nf")] + pub node_fingerprint: Blob<48>, #[serde(rename = "cid")] pub controller_node_id: Address, + #[serde(skip_serializing_if = "Vec::is_empty")] + #[serde(default)] #[serde(rename = "md")] pub metadata: Vec, #[serde(rename = "ts")] pub timestamp: i64, - #[serde(rename = "v")] - pub version: (u16, u16, u16, u16), #[serde(rename = "s")] pub source_remote_endpoint: Endpoint, #[serde(rename = "sh")] @@ -113,15 +116,11 @@ pub struct RequestLogItem { impl ToString for RequestLogItem { fn to_string(&self) -> String { format!( - "{} {} {} ts={} v={}.{}.{},{} s={},{} {}", + "{} {} {} ts={} s={},{} {}", self.controller_node_id.to_string(), self.network_id.to_string(), self.node_id.to_string(), self.timestamp, - self.version.0, - self.version.1, - self.version.2, - self.version.3, self.source_remote_endpoint.to_string(), self.source_hops, self.result.to_string() diff --git a/controller/src/model/network.rs b/controller/src/model/network.rs index b9148ebb0..9f38300aa 100644 --- a/controller/src/model/network.rs +++ b/controller/src/model/network.rs @@ -142,11 +142,7 @@ fn troo() -> bool { impl Network { /// Check member IP assignments and return 'true' if IP assignments were created or modified. - pub async fn check_zt_ip_assignments( - &self, - database: &DatabaseImpl, - member: &mut Member, - ) -> Result> { + pub async fn check_zt_ip_assignments(&self, database: &DatabaseImpl, member: &mut Member) -> bool { let mut modified = false; if self.v4_assign_mode.zt { @@ -159,10 +155,14 @@ impl Network { for route in self.ip_routes.iter() { let ip = InetAddress::from_ip_port(&ip_ptr.to_be_bytes(), route.target.port()); // IP/bits if ip.is_within(&route.target) { - if !database.is_ip_assigned(self.id, &ip).await? { - modified = true; - let _ = member.ip_assignments.insert(ip); - break 'ip_search; + if let Ok(is_ip_assigned) = database.is_ip_assigned(self.id, &ip).await { + if !is_ip_assigned { + modified = true; + let _ = member.ip_assignments.insert(ip); + break 'ip_search; + } + } else { + return false; } } } @@ -183,10 +183,14 @@ impl Network { for route in self.ip_routes.iter() { let ip = InetAddress::from_ip_port(&ip_ptr.to_be_bytes(), route.target.port()); // IP/bits if ip.is_within(&route.target) { - if !database.is_ip_assigned(self.id, &ip).await? { - modified = true; - let _ = member.ip_assignments.insert(ip); - break 'ip_search; + if let Ok(is_ip_assigned) = database.is_ip_assigned(self.id, &ip).await { + if !is_ip_assigned { + modified = true; + let _ = member.ip_assignments.insert(ip); + break 'ip_search; + } + } else { + return false; } } } @@ -197,6 +201,6 @@ impl Network { } } - Ok(modified) + return modified; } } diff --git a/network-hypervisor/src/protocol.rs b/network-hypervisor/src/protocol.rs index c74e46896..d6f4a1ea3 100644 --- a/network-hypervisor/src/protocol.rs +++ b/network-hypervisor/src/protocol.rs @@ -129,7 +129,7 @@ pub const ADDRESS_RESERVED_PREFIX: u8 = 0xff; /// Bit mask for address bits in a u64. pub const ADDRESS_MASK: u64 = 0xffffffffff; -pub(crate) mod v1 { +pub mod v1 { use super::*; /// Size of packet header that lies outside the encryption envelope. @@ -266,7 +266,7 @@ pub(crate) mod v1 { /// /// This will panic if the buffer provided doesn't contain a proper header. #[inline(always)] - pub fn set_packet_fragment_flag(pkt: &mut Buffer) { + pub(crate) fn set_packet_fragment_flag(pkt: &mut Buffer) { pkt.as_bytes_mut()[FLAGS_FIELD_INDEX] |= HEADER_FLAG_FRAGMENTED; } @@ -393,27 +393,10 @@ pub(crate) mod v1 { } /// Flat packed structs for fixed length header blocks in messages. - pub(crate) mod message_component_structs { + pub mod message_component_structs { #[derive(Clone, Copy)] #[repr(C, packed)] - pub struct OkHeader { - pub verb: u8, - pub in_re_verb: u8, - pub in_re_message_id: [u8; 8], - } - - #[derive(Clone, Copy)] - #[repr(C, packed)] - pub struct ErrorHeader { - pub verb: u8, - pub in_re_verb: u8, - pub in_re_message_id: [u8; 8], - pub error_code: u8, - } - - #[derive(Clone, Copy)] - #[repr(C, packed)] - pub struct HelloFixedHeaderFields { + pub(crate) struct HelloFixedHeaderFields { pub verb: u8, pub version_proto: u8, pub version_major: u8, @@ -424,7 +407,7 @@ pub(crate) mod v1 { #[derive(Clone, Copy)] #[repr(C, packed)] - pub struct OkHelloFixedHeaderFields { + pub(crate) struct OkHelloFixedHeaderFields { pub timestamp_echo: [u8; 8], // u64 pub version_proto: u8, pub version_major: u8, @@ -532,6 +515,23 @@ pub(crate) mod v1 { } } +#[derive(Clone, Copy)] +#[repr(C, packed)] +pub struct OkHeader { + pub verb: u8, + pub in_re_verb: u8, + pub in_re_message_id: [u8; 8], +} + +#[derive(Clone, Copy)] +#[repr(C, packed)] +pub struct ErrorHeader { + pub verb: u8, + pub in_re_verb: u8, + pub in_re_message_id: [u8; 8], + pub error_code: u8, +} + /// Maximum delta between the message ID of a sent packet and its response. pub(crate) const PACKET_RESPONSE_COUNTER_DELTA_MAX: u64 = 256; @@ -573,8 +573,8 @@ mod tests { #[test] fn representation() { - assert_eq!(size_of::(), 10); - assert_eq!(size_of::(), 11); + assert_eq!(size_of::(), 10); + assert_eq!(size_of::(), 11); assert_eq!(size_of::(), v1::HEADER_SIZE); assert_eq!(size_of::(), v1::FRAGMENT_HEADER_SIZE); diff --git a/network-hypervisor/src/vl1/node.rs b/network-hypervisor/src/vl1/node.rs index f0cb4db53..0ad67c97a 100644 --- a/network-hypervisor/src/vl1/node.rs +++ b/network-hypervisor/src/vl1/node.rs @@ -135,9 +135,11 @@ pub trait InnerProtocol: Sync + Send { /// Do not attempt to handle OK or ERROR. Instead implement handle_ok() and handle_error(). fn handle_packet( &self, + host_system: &HostSystemImpl, node: &Node, source: &Arc>, source_path: &Arc>, + source_hops: u8, message_id: u64, verb: u8, payload: &PacketBuffer, @@ -146,9 +148,11 @@ pub trait InnerProtocol: Sync + Send { /// Handle errors, returning true if the error was recognized. fn handle_error( &self, + host_system: &HostSystemImpl, node: &Node, source: &Arc>, source_path: &Arc>, + source_hops: u8, message_id: u64, in_re_verb: u8, in_re_message_id: u64, @@ -160,9 +164,11 @@ pub trait InnerProtocol: Sync + Send { /// Handle an OK, returing true if the OK was recognized. fn handle_ok( &self, + host_system: &HostSystemImpl, node: &Node, source: &Arc>, source_path: &Arc>, + source_hops: u8, message_id: u64, in_re_verb: u8, in_re_message_id: u64, @@ -1049,9 +1055,11 @@ impl InnerProtocol for DummyInnerProtocol { #[inline(always)] fn handle_packet( &self, + _host_system: &HostSystemImpl, _node: &Node, _source: &Arc>, _source_path: &Arc>, + _source_hops: u8, _message_id: u64, _verb: u8, _payload: &PacketBuffer, @@ -1062,9 +1070,11 @@ impl InnerProtocol for DummyInnerProtocol { #[inline(always)] fn handle_error( &self, + _host_system: &HostSystemImpl, _node: &Node, _source: &Arc>, _source_path: &Arc>, + _source_hops: u8, _message_id: u64, _in_re_verb: u8, _in_re_message_id: u64, @@ -1078,9 +1088,11 @@ impl InnerProtocol for DummyInnerProtocol { #[inline(always)] fn handle_ok( &self, + _host_system: &HostSystemImpl, _node: &Node, _source: &Arc>, _source_path: &Arc>, + _source_hops: u8, _message_id: u64, _in_re_verb: u8, _in_re_message_id: u64, diff --git a/network-hypervisor/src/vl1/peer.rs b/network-hypervisor/src/vl1/peer.rs index 8cf2d8ea8..2ee7664c7 100644 --- a/network-hypervisor/src/vl1/peer.rs +++ b/network-hypervisor/src/vl1/peer.rs @@ -83,6 +83,12 @@ impl Peer { }) } + /// Returns true if this peer supports the ZeroTier V2 protocol stack and features. + #[inline(always)] + pub fn is_v2(&self) -> bool { + self.identity.p384.is_some() + } + /// Get the remote version of this peer: major, minor, revision. /// Returns None if it's not yet known. pub fn version(&self) -> Option<(u8, u8, u16)> { @@ -105,6 +111,7 @@ impl Peer { } /// Get current best path or None if there are no direct paths to this peer. + #[inline] pub fn direct_path(&self) -> Option>> { for p in self.paths.lock().unwrap().iter() { let pp = p.path.upgrade(); @@ -116,6 +123,7 @@ impl Peer { } /// Get either the current best direct path or an indirect path via e.g. a root. + #[inline] pub fn path(&self, node: &Node) -> Option>> { let direct_path = self.direct_path(); if direct_path.is_some() { @@ -184,6 +192,7 @@ impl Peer { prioritize_paths(&mut paths); } + /// Get the next sequential message ID for use with the V1 transport protocol. #[inline(always)] pub(crate) fn v1_proto_next_message_id(&self) -> MessageId { self.message_id_counter.fetch_add(1, Ordering::SeqCst) @@ -292,7 +301,7 @@ impl Peer { let max_fragment_size = path.endpoint.max_fragment_size(); let mut packet = host_system.get_buffer(); - if !self.identity.p384.is_some() { + if !self.is_v2() { // For the V1 protocol, leave room for for the header in the buffer. packet.set_size(v1::HEADER_SIZE); } @@ -300,7 +309,7 @@ impl Peer { let r = builder_function(packet.as_mut()); if r.is_ok() { - if self.identity.p384.is_some() { + if self.is_v2() { todo!() // TODO: ZSSP / V2 protocol } else { if self.remote_node_info.read().unwrap().remote_protocol_version >= 11 { @@ -422,7 +431,7 @@ impl Peer { } debug_assert_eq!(packet.len(), 41); - assert!(node.identity.write_public(packet.as_mut(), self.identity.p384.is_none()).is_ok()); + assert!(node.identity.write_public(packet.as_mut(), !self.is_v2()).is_ok()); let (_, poly1305_key) = v1_proto_salsa_poly_create( &self.v1_proto_static_secret, @@ -536,25 +545,25 @@ impl Peer { return match verb { verbs::VL1_NOP => PacketHandlerResult::Ok, - verbs::VL1_HELLO => self.handle_incoming_hello( + verbs::VL1_HELLO => self.handle_incoming_hello(host_system, inner, node, time_ticks, message_id, source_path, &payload), + verbs::VL1_ERROR => self.handle_incoming_error( host_system, inner, node, time_ticks, - message_id, source_path, packet_header.hops(), + message_id, &payload, ), - verbs::VL1_ERROR => self.handle_incoming_error(host_system, inner, node, time_ticks, source_path, message_id, &payload), verbs::VL1_OK => self.handle_incoming_ok( host_system, inner, node, time_ticks, source_path, - message_id, packet_header.hops(), + message_id, path_is_known, &payload, ), @@ -567,7 +576,16 @@ impl Peer { self.handle_incoming_push_direct_paths(host_system, node, time_ticks, source_path, &payload) } verbs::VL1_USER_MESSAGE => self.handle_incoming_user_message(host_system, node, time_ticks, source_path, &payload), - _ => inner.handle_packet(node, self, &source_path, message_id, verb, &payload), + _ => inner.handle_packet( + host_system, + node, + self, + &source_path, + packet_header.hops(), + message_id, + verb, + &payload, + ), }; } } @@ -583,7 +601,6 @@ impl Peer { time_ticks: i64, message_id: MessageId, source_path: &Arc>, - _hops: u8, payload: &PacketBuffer, ) -> PacketHandlerResult { if !(inner.should_communicate_with(&self.identity) || node.this_node_is_root() || node.is_peer_root(self)) { @@ -615,10 +632,8 @@ impl Peer { Some(source_path), time_ticks, |packet| -> Result<(), Infallible> { - let f: &mut ( - v1::message_component_structs::OkHeader, - v1::message_component_structs::OkHelloFixedHeaderFields, - ) = packet.append_struct_get_mut().unwrap(); + let f: &mut (OkHeader, v1::message_component_structs::OkHelloFixedHeaderFields) = + packet.append_struct_get_mut().unwrap(); f.0.verb = verbs::VL1_OK; f.0.in_re_verb = verbs::VL1_HELLO; f.0.in_re_message_id = message_id.to_ne_bytes(); @@ -641,24 +656,27 @@ impl Peer { fn handle_incoming_error( self: &Arc, - _: &HostSystemImpl, + host_system: &HostSystemImpl, inner: &InnerProtocolImpl, node: &Node, _: i64, source_path: &Arc>, + source_hops: u8, message_id: u64, payload: &PacketBuffer, ) -> PacketHandlerResult { let mut cursor = 0; - if let Ok(error_header) = payload.read_struct::(&mut cursor) { + if let Ok(error_header) = payload.read_struct::(&mut cursor) { let in_re_message_id: MessageId = u64::from_be_bytes(error_header.in_re_message_id); if self.message_id_counter.load(Ordering::Relaxed).wrapping_sub(in_re_message_id) <= PACKET_RESPONSE_COUNTER_DELTA_MAX { match error_header.in_re_verb { _ => { return inner.handle_error( + host_system, node, self, &source_path, + source_hops, message_id, error_header.in_re_verb, in_re_message_id, @@ -680,13 +698,13 @@ impl Peer { node: &Node, time_ticks: i64, source_path: &Arc>, + source_hops: u8, message_id: u64, - hops: u8, path_is_known: bool, payload: &PacketBuffer, ) -> PacketHandlerResult { let mut cursor = 0; - if let Ok(ok_header) = payload.read_struct::(&mut cursor) { + if let Ok(ok_header) = payload.read_struct::(&mut cursor) { let in_re_message_id: MessageId = u64::from_ne_bytes(ok_header.in_re_message_id); if self.message_id_counter.load(Ordering::Relaxed).wrapping_sub(in_re_message_id) <= PACKET_RESPONSE_COUNTER_DELTA_MAX { match ok_header.in_re_verb { @@ -694,7 +712,7 @@ impl Peer { if let Ok(_ok_hello_fixed_header_fields) = payload.read_struct::(&mut cursor) { - if hops == 0 { + if source_hops == 0 { debug_event!(host_system, "[vl1] {} OK(HELLO)", self.identity.address.to_string(),); if let Ok(reported_endpoint) = Endpoint::unmarshal(&payload, &mut cursor) { #[cfg(debug_assertions)] @@ -718,7 +736,7 @@ impl Peer { } } - if hops == 0 && !path_is_known { + if source_hops == 0 && !path_is_known { self.learn_path(host_system, source_path, time_ticks); } @@ -755,9 +773,11 @@ impl Peer { _ => { return inner.handle_ok( + host_system, node, self, &source_path, + source_hops, message_id, ok_header.in_re_verb, in_re_message_id, @@ -788,7 +808,7 @@ impl Peer { while addresses.len() >= ADDRESS_SIZE && (packet.len() + Identity::MAX_MARSHAL_SIZE) <= UDP_DEFAULT_MTU { if let Some(zt_address) = Address::from_bytes(&addresses[..ADDRESS_SIZE]) { if let Some(peer) = node.peer(zt_address) { - peer.identity.write_public(packet, self.identity.p384.is_none())?; + peer.identity.write_public(packet, !self.is_v2())?; } } addresses = &addresses[ADDRESS_SIZE..]; @@ -828,7 +848,7 @@ impl Peer { ) -> PacketHandlerResult { if inner.should_communicate_with(&self.identity) || node.is_peer_root(self) { self.send(host_system, node, None, time_ticks, |packet| { - let mut f: &mut v1::message_component_structs::OkHeader = packet.append_struct_get_mut().unwrap(); + let mut f: &mut OkHeader = packet.append_struct_get_mut().unwrap(); f.verb = verbs::VL1_OK; f.in_re_verb = verbs::VL1_ECHO; f.in_re_message_id = message_id.to_ne_bytes(); diff --git a/network-hypervisor/src/vl2/networkconfig.rs b/network-hypervisor/src/vl2/networkconfig.rs index dba24159a..ea8add846 100644 --- a/network-hypervisor/src/vl2/networkconfig.rs +++ b/network-hypervisor/src/vl2/networkconfig.rs @@ -410,13 +410,15 @@ pub struct SSOAuthConfiguration { /// Information about nodes on the network that can be included in a network config. #[derive(Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct NodeInfo { - pub flags: u64, + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(default)] + pub flags: Option, #[serde(skip_serializing_if = "Option::is_none")] #[serde(default)] pub ip: Option, - #[serde(skip_serializing_if = "Option::is_none")] + #[serde(skip_serializing_if = "String::is_empty")] #[serde(default)] - pub name: Option, + pub name: String, #[serde(skip_serializing_if = "HashMap::is_empty")] #[serde(default)] pub services: HashMap>, @@ -429,8 +431,12 @@ pub struct IpRoute { #[serde(skip_serializing_if = "Option::is_none")] #[serde(default)] pub via: Option, - pub flags: u16, - pub metric: u16, + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(default)] + pub flags: Option, + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(default)] + pub metric: Option, } impl Marshalable for IpRoute { @@ -446,8 +452,8 @@ impl Marshalable for IpRoute { } else { buf.append_u8(0)?; // "nil" InetAddress } - buf.append_u16(self.flags)?; - buf.append_u16(self.metric)?; + buf.append_u16(self.flags.unwrap_or(0))?; + buf.append_u16(self.metric.unwrap_or(0))?; Ok(()) } @@ -465,8 +471,20 @@ impl Marshalable for IpRoute { Some(via) } }, - flags: buf.read_u16(cursor)?, - metric: buf.read_u16(cursor)?, + flags: buf.read_u16(cursor).map(|f| { + if f == 0 { + None + } else { + Some(f) + } + })?, + metric: buf.read_u16(cursor).map(|f| { + if f == 0 { + None + } else { + Some(f) + } + })?, }) } } diff --git a/network-hypervisor/src/vl2/switch.rs b/network-hypervisor/src/vl2/switch.rs index b166f1a0f..4331b055c 100644 --- a/network-hypervisor/src/vl2/switch.rs +++ b/network-hypervisor/src/vl2/switch.rs @@ -13,9 +13,11 @@ pub struct Switch {} impl InnerProtocol for Switch { fn handle_packet( &self, + host_system: &HostSystemImpl, node: &Node, source: &Arc>, source_path: &Arc>, + source_hops: u8, message_id: u64, verb: u8, payload: &PacketBuffer, @@ -25,9 +27,11 @@ impl InnerProtocol for Switch { fn handle_error( &self, + host_system: &HostSystemImpl, node: &Node, source: &Arc>, source_path: &Arc>, + source_hops: u8, message_id: u64, in_re_verb: u8, in_re_message_id: u64, @@ -40,9 +44,11 @@ impl InnerProtocol for Switch { fn handle_ok( &self, + host_system: &HostSystemImpl, node: &Node, source: &Arc>, source_path: &Arc>, + source_hops: u8, message_id: u64, in_re_verb: u8, in_re_message_id: u64,