From 5772a135f58a5bd1cb1aad53846c7691bc3261c7 Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Thu, 10 Nov 2022 18:25:36 -0500 Subject: [PATCH] More work on controller: FileDB change detection, etc. --- controller/src/cache.rs | 109 ++++++ controller/src/controller.rs | 105 ++++-- controller/src/database.rs | 9 +- controller/src/filedatabase.rs | 316 ++++++++++++++---- controller/src/lib.rs | 2 + controller/src/main.rs | 4 +- controller/src/model/member.rs | 18 +- controller/src/model/mod.rs | 7 + controller/src/model/network.rs | 5 +- network-hypervisor/src/protocol.rs | 1 + network-hypervisor/src/vl1/identity.rs | 7 + network-hypervisor/src/vl2/networkid.rs | 13 +- .../src/vl2/v1/certificateofmembership.rs | 34 +- network-hypervisor/src/vl2/v1/revocation.rs | 5 + utils/src/memory.rs | 8 + 15 files changed, 513 insertions(+), 130 deletions(-) create mode 100644 controller/src/cache.rs diff --git a/controller/src/cache.rs b/controller/src/cache.rs new file mode 100644 index 000000000..b8e2feb70 --- /dev/null +++ b/controller/src/cache.rs @@ -0,0 +1,109 @@ +// (c) 2020-2022 ZeroTier, Inc. -- currently propritery pending actual release and licensing. See LICENSE.md. + +use std::collections::HashMap; +use std::error::Error; +use std::mem::replace; +use std::sync::{Mutex, RwLock}; + +use crate::database::Database; +use crate::model::{Member, Network}; + +use zerotier_network_hypervisor::vl1::Address; +use zerotier_network_hypervisor::vl2::NetworkId; + +/// Network and member cache used by database implementations to implement change detection. +pub struct Cache { + by_nwid: RwLock>)>>, +} + +impl Cache { + pub fn new() -> Self { + Self { by_nwid: RwLock::new(HashMap::new()) } + } + + /// Load (or reload) the entire cache from a database. + pub async fn load_all(&self, db: &DatabaseImpl) -> Result<(), Box> { + let mut by_nwid = self.by_nwid.write().unwrap(); + by_nwid.clear(); + + let networks = db.list_networks().await?; + for network_id in networks { + if let Some(network) = db.get_network(network_id).await? { + let network_entry = by_nwid.entry(network_id).or_insert_with(|| (network, Mutex::new(HashMap::new()))); + let mut by_node_id = network_entry.1.lock().unwrap(); + let members = db.list_members(network_id).await?; + for node_id in members { + if let Some(member) = db.get_member(network_id, node_id).await? { + let _ = by_node_id.insert(node_id, member); + } + } + } + } + + Ok(()) + } + + pub fn list_cached_networks(&self) -> Vec { + self.by_nwid.read().unwrap().keys().cloned().collect() + } + + /// Update a network if changed, returning whether or not any update was made and the old version if any. + /// A value of (true, None) indicates that there was no network by that ID in which case it is added. + pub fn on_network_updated(&self, network: Network) -> (bool, Option) { + let mut by_nwid = self.by_nwid.write().unwrap(); + if let Some(prev_network) = by_nwid.get_mut(&network.id) { + if !prev_network.0.eq(&network) { + (true, Some(replace(&mut prev_network.0, network))) + } else { + (false, None) + } + } else { + let _ = by_nwid.insert(network.id, (network.clone(), Mutex::new(HashMap::new()))); + (true, None) + } + } + + /// Update a member if changed, returning whether or not any update was made and the old version if any. + /// A value of (true, None) indicates that there was no member with that ID. If there is no network with + /// the member's network ID (false, None) is returned and no action is taken. + pub fn on_member_updated(&self, member: Member) -> (bool, Option) { + let by_nwid = self.by_nwid.read().unwrap(); + if let Some(network) = by_nwid.get(&member.network_id) { + let mut by_node_id = network.1.lock().unwrap(); + if let Some(prev_member) = by_node_id.get_mut(&member.node_id) { + if !member.eq(prev_member) { + (true, Some(replace(prev_member, member))) + } else { + (false, None) + } + } else { + let _ = by_node_id.insert(member.node_id, member); + (true, None) + } + } else { + (false, None) + } + } + + /// Delete a network, returning it if it existed. + pub fn on_network_deleted(&self, network_id: NetworkId) -> Option<(Network, Vec)> { + let mut by_nwid = self.by_nwid.write().unwrap(); + if let Some(network) = by_nwid.remove(&network_id) { + let mut members = network.1.lock().unwrap(); + Some((network.0, members.drain().map(|(_, v)| v).collect())) + } else { + None + } + } + + /// Delete a member, returning it if it existed. + pub fn on_member_deleted(&self, network_id: NetworkId, node_id: Address) -> Option { + let by_nwid = self.by_nwid.read().unwrap(); + if let Some(network) = by_nwid.get(&network_id) { + let mut members = network.1.lock().unwrap(); + members.remove(&node_id) + } else { + None + } + } +} diff --git a/controller/src/controller.rs b/controller/src/controller.rs index ab161cca6..171f95db2 100644 --- a/controller/src/controller.rs +++ b/controller/src/controller.rs @@ -16,7 +16,6 @@ use zerotier_network_hypervisor::vl2::v1::Revocation; use zerotier_network_hypervisor::vl2::NetworkId; use zerotier_utils::blob::Blob; use zerotier_utils::buffer::OutOfBoundsError; -use zerotier_utils::dictionary::Dictionary; use zerotier_utils::error::InvalidParameterError; use zerotier_utils::reaper::Reaper; use zerotier_utils::tokio; @@ -125,7 +124,7 @@ impl Controller { })); } - /// Compose and send network configuration packet. + /// Compose and send network configuration packet (either V1 or V2) fn send_network_config( &self, peer: &Peer, @@ -184,25 +183,45 @@ impl Controller { } } - /// Send one or more revocation object(s) to a peer. - fn send_revocations(&self, peer: &Peer, revocations: Vec) { - if let Some(host_system) = self.service.read().unwrap().upgrade() {} - } + /// Send one or more revocation object(s) to a peer (V1 protocol only). + fn v1_proto_send_revocations(&self, peer: &Peer, mut revocations: Vec) { + if let Some(host_system) = self.service.read().unwrap().upgrade() { + let time_ticks = ms_monotonic(); + while !revocations.is_empty() { + let send_count = revocations.len().min(protocol::UDP_DEFAULT_MTU / 256); + debug_assert!(send_count <= (u16::MAX as usize)); + peer.send( + host_system.as_ref(), + host_system.node(), + None, + time_ticks, + |packet| -> Result<(), OutOfBoundsError> { + let payload_start = packet.len(); - /// Called when the DB informs us of a change. - async fn handle_change_notification(self: Arc, change: Change) { - match change { - Change::MemberAuthorized(_, _) => {} - Change::MemberDeauthorized(network_id, node_id) => { - if let Ok(Some(member)) = self.database.get_member(network_id, node_id).await { - if !member.authorized() { - // TODO - } - } + packet.append_u8(protocol::message_type::VL2_NETWORK_CREDENTIALS)?; + packet.append_u8(0)?; + packet.append_u16(0)?; + packet.append_u16(0)?; + packet.append_u16(send_count as u16)?; + for _ in 0..send_count { + let r = revocations.pop().unwrap(); + packet.append_bytes(r.to_bytes(self.local_identity.address).as_bytes())?; + } + packet.append_u16(0)?; + + let new_payload_len = protocol::compress(&mut packet.as_bytes_mut()[payload_start..]); + packet.set_size(payload_start + new_payload_len); + + Ok(()) + }, + ); } } } + /// Called when the DB informs us of a change. + async fn handle_change_notification(self: Arc, change: Change) {} + /// Attempt to create a network configuration and return the result. /// /// This is the central function of the controller that looks up members, checks their @@ -228,20 +247,34 @@ impl Controller { let mut member = self.database.get_member(network_id, source_identity.address).await?; let mut member_changed = false; + // WARNING: this is where members are verified before they get admitted to a network. Read and edit + // very carefully! + // If we have a member object and a pinned identity, check to make sure it matches. Also accept // upgraded identities to replace old versions if they are properly formed and inherit. if let Some(member) = member.as_mut() { if let Some(pinned_identity) = member.identity.as_ref() { if !pinned_identity.eq(&source_identity) { - return Ok((AuthorizationResult::RejectedIdentityMismatch, None, None)); - } else if source_identity.is_upgraded_from(pinned_identity) { + if source_identity.is_upgraded_from(pinned_identity) { + // Upgrade identity types if we have a V2 identity upgraded from a V1 identity. + let _ = member.identity.replace(source_identity.clone_without_secret()); + member_changed = true; + } else { + return Ok((AuthorizationResult::RejectedIdentityMismatch, None, None)); + } + } + } else if let Some(pinned_fingerprint) = member.identity_fingerprint.as_ref() { + if pinned_fingerprint.as_bytes().eq(&source_identity.fingerprint) { + // Learn the FULL identity if the fingerprint is pinned and they match. let _ = member.identity.replace(source_identity.clone_without_secret()); member_changed = true; + } else { + return Ok((AuthorizationResult::RejectedIdentityMismatch, None, None)); } } } - // This is the final verdict after everything has been checked. + // This will be the final verdict after everything has been checked. let mut authorization_result = AuthorizationResult::Rejected; // This is the main "authorized" flag on the member record. If it is true then @@ -296,7 +329,7 @@ impl Controller { let credential_ttl = network.credential_ttl.unwrap_or(CREDENTIAL_WINDOW_SIZE_DEFAULT); // Check and if necessary auto-assign static IPs for this member. - member_changed |= network.check_zt_ip_assignments(self.database.as_ref(), &mut member).await; + member_changed |= network.assign_ip_addresses(self.database.as_ref(), &mut member).await; let mut nc = NetworkConfig::new(network_id, source_identity.address); @@ -314,6 +347,9 @@ impl Controller { nc.dns = network.dns; if network.min_supported_version.unwrap_or(0) < (protocol::PROTOCOL_VERSION_V2 as u32) { + // If this network supports V1 nodes we have to include V1 credentials. Otherwise we can skip + // the overhead (bandwidth and CPU) of generating these. + if let Some(com) = vl2::v1::CertificateOfMembership::new(&self.local_identity, network_id, &source_identity, now, credential_ttl) { @@ -344,7 +380,7 @@ impl Controller { nc.v1_credentials = Some(v1cred); - // Staple a bunch of revocations for anyone deauthed that still might be in the window. + // For anyone who has been deauthorized but is still in the window, send revocations. if let Ok(deauthed_members_still_in_window) = self .database .list_members_deauthorized_after(network.id, now - credential_ttl) @@ -373,9 +409,10 @@ impl Controller { } } else { // TODO: create V2 type credential for V2-only networks - // TODO: populate node info for V2 networks } + // Log this member in the recently authorized cache, which is currently just used to filter whether we should + // handle multicast subscription traffic. let _ = self .recently_authorized .write() @@ -432,22 +469,18 @@ impl InnerProtocol for Controller { u64::from(network_id) ); - let meta_data = if (cursor + 2) < payload.len() { + let metadata = if (cursor + 2) < payload.len() { let meta_data_len = payload.read_u16(&mut cursor); if meta_data_len.is_err() { return PacketHandlerResult::Error; } if let Ok(d) = payload.read_bytes(meta_data_len.unwrap() as usize, &mut cursor) { - let d = Dictionary::from_bytes(d); - if d.is_none() { - return PacketHandlerResult::Error; - } - d.unwrap() + d.to_vec() } else { return PacketHandlerResult::Error; } } else { - Dictionary::new() + Vec::new() }; // Launch handler as an async background task. @@ -464,7 +497,7 @@ impl InnerProtocol for Controller { //println!("{}", serde_yaml::to_string(&config).unwrap()); self2.send_network_config(source.as_ref(), &config, Some(message_id)); if let Some(revocations) = revocations { - self2.send_revocations(source.as_ref(), revocations); + self2.v1_proto_send_revocations(source.as_ref(), revocations); } (result, Some(config)) } @@ -484,11 +517,7 @@ impl InnerProtocol for Controller { node_id, node_fingerprint, controller_node_id: self2.local_identity.address, - metadata: if meta_data.is_empty() { - Vec::new() - } else { - meta_data.to_bytes() - }, + metadata, peer_version: source.version(), peer_protocol_version: source.protocol_version(), timestamp: now, @@ -550,16 +579,18 @@ impl InnerProtocol for Controller { impl VL1AuthProvider for Controller { #[inline(always)] fn should_respond_to(&self, _: &Identity) -> bool { + // Controllers always have to establish sessions to process requests. We don't really know if + // a member is relevant until we have looked up both the network and the member, since whether + // or not to "learn" unknown members is a network level option. true } fn has_trust_relationship(&self, id: &Identity) -> bool { - let time_ticks = ms_monotonic(); self.recently_authorized .read() .unwrap() .get(&id.fingerprint) - .map_or(false, |by_network| by_network.values().any(|t| *t > time_ticks)) + .map_or(false, |by_network| by_network.values().any(|t| *t > ms_monotonic())) } } diff --git a/controller/src/database.rs b/controller/src/database.rs index b2700bf05..3390197c0 100644 --- a/controller/src/database.rs +++ b/controller/src/database.rs @@ -11,12 +11,17 @@ use crate::model::*; /// Database change relevant to the controller and that was NOT initiated by the controller. #[derive(Clone)] pub enum Change { - MemberAuthorized(NetworkId, Address), - MemberDeauthorized(NetworkId, Address), + NetworkCreated(Network), + NetworkChanged(Network, Network), + NetworkDeleted(Network), + MemberCreated(Member), + MemberChanged(Member, Member), + MemberDeleted(Member), } #[async_trait] pub trait Database: Sync + Send + NodeStorage + 'static { + async fn list_networks(&self) -> Result, Box>; async fn get_network(&self, id: NetworkId) -> Result, Box>; async fn save_network(&self, obj: Network) -> Result<(), Box>; diff --git a/controller/src/filedatabase.rs b/controller/src/filedatabase.rs index fb28daf13..a014d6d19 100644 --- a/controller/src/filedatabase.rs +++ b/controller/src/filedatabase.rs @@ -2,7 +2,7 @@ use std::error::Error; use std::path::{Path, PathBuf}; use std::str::FromStr; use std::sync::atomic::{AtomicU64, Ordering}; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, Weak}; use async_trait::async_trait; use notify::{RecursiveMode, Watcher}; @@ -12,13 +12,19 @@ use zerotier_network_hypervisor::vl2::NetworkId; use zerotier_utils::io::{fs_restrict_permissions, read_limit}; +use zerotier_utils::reaper::Reaper; use zerotier_utils::tokio::fs; +use zerotier_utils::tokio::runtime::Handle; use zerotier_utils::tokio::sync::broadcast::{channel, Receiver, Sender}; +use zerotier_utils::tokio::task::JoinHandle; +use zerotier_utils::tokio::time::{sleep, Duration, Instant}; +use crate::cache::Cache; use crate::database::{Change, Database}; use crate::model::*; const IDENTITY_SECRET_FILENAME: &'static str = "identity.secret"; +const EVENT_HANDLER_TASK_TIMEOUT: Duration = Duration::from_secs(5); /// An in-filesystem database that permits live editing. /// @@ -28,47 +34,174 @@ const IDENTITY_SECRET_FILENAME: &'static str = "identity.secret"; pub struct FileDatabase { base_path: PathBuf, controller_address: AtomicU64, - change_sender: Arc>, - watcher: Mutex>, + change_sender: Sender, + tasks: Reaper, + cache: Cache, + daemon: JoinHandle<()>, } // TODO: should cache at least hashes and detect changes in the filesystem live. impl FileDatabase { - pub async fn new>(base_path: P) -> Result> { + pub async fn new>(runtime: Handle, base_path: P) -> Result, Box> { let base_path: PathBuf = base_path.as_ref().into(); let _ = fs::create_dir_all(&base_path).await?; let (change_sender, _) = channel(256); - let change_sender = Arc::new(change_sender); + let db_weak_tmp: Arc>> = Arc::new(Mutex::new(Weak::default())); + let db_weak = db_weak_tmp.clone(); + let runtime2 = runtime.clone(); - let sender = Arc::downgrade(&change_sender); - let mut watcher: Box = - Box::new(notify::recommended_watcher(move |event: notify::Result| { - if let Ok(event) = event { - if let Some(_sender) = sender.upgrade() { - // TODO - match event.kind { - notify::EventKind::Modify(_) => {} - notify::EventKind::Remove(_) => {} - _ => {} - } - } - } - })?); - let _ = watcher.configure( - notify::Config::default() - .with_compare_contents(true) - .with_poll_interval(std::time::Duration::from_secs(2)), - ); - watcher.watch(&base_path, RecursiveMode::Recursive)?; - - Ok(Self { + let db = Arc::new(Self { base_path: base_path.clone(), controller_address: AtomicU64::new(0), change_sender, - watcher: Mutex::new(watcher), - }) + tasks: Reaper::new(&runtime2), + cache: Cache::new(), + daemon: runtime2.spawn(async move { + while db_weak.lock().unwrap().upgrade().is_none() { + // Wait for parent to finish constructing and start up, then create watcher. + sleep(Duration::from_millis(10)).await; + } + + let mut watcher = notify::recommended_watcher(move |event: notify::Result| { + if let Ok(event) = event { + if let Some(db) = db_weak.lock().unwrap().upgrade() { + if let Some(controller_address) = db.get_controller_address() { + db.clone().tasks.add( + runtime.spawn(async move { + if let Some(path0) = event.paths.first() { + if let Some((record_type, network_id, node_id)) = + Self::record_type_from_path(controller_address, path0.as_path()) + { + let mut deleted = None; + let mut changed = None; + + match event.kind { + notify::EventKind::Create(create_kind) => match create_kind { + notify::event::CreateKind::File => { + changed = Some(path0.as_path()); + } + _ => {} + }, + notify::EventKind::Modify(modify_kind) => match modify_kind { + notify::event::ModifyKind::Data(_) => { + changed = Some(path0.as_path()); + } + notify::event::ModifyKind::Name(rename_mode) => match rename_mode { + notify::event::RenameMode::Both => { + if event.paths.len() >= 2 { + if let Some(path1) = event.paths.last() { + deleted = Some(path0.as_path()); + changed = Some(path1.as_path()); + } + } + } + notify::event::RenameMode::From => { + deleted = Some(path0.as_path()); + } + notify::event::RenameMode::To => { + changed = Some(path0.as_path()); + } + _ => {} + }, + _ => {} + }, + notify::EventKind::Remove(remove_kind) => match remove_kind { + notify::event::RemoveKind::File => { + deleted = Some(path0.as_path()); + } + _ => {} + }, + _ => {} + } + + if deleted.is_some() { + match record_type { + RecordType::Network => { + if let Some((network, mut members)) = db.cache.on_network_deleted(network_id) { + for m in members.drain(..) { + let _ = db.change_sender.send(Change::MemberDeleted(m)); + } + let _ = db.change_sender.send(Change::NetworkDeleted(network)); + } + } + RecordType::Member => { + if let Some(node_id) = node_id { + if let Some(member) = db.cache.on_member_deleted(network_id, node_id) { + let _ = db.change_sender.send(Change::MemberDeleted(member)); + } + } + } + _ => {} + } + } + + if let Some(changed) = changed { + match record_type { + RecordType::Network => { + if let Ok(Some(new_network)) = Self::get_network_internal(changed).await { + match db.cache.on_network_updated(new_network.clone()) { + (true, Some(old_network)) => { + let _ = db + .change_sender + .send(Change::NetworkChanged(old_network, new_network)); + } + (true, None) => { + let _ = db.change_sender.send(Change::NetworkCreated(new_network)); + } + _ => {} + } + } + } + RecordType::Member => { + if let Ok(Some(new_member)) = Self::get_member_internal(changed).await { + match db.cache.on_member_updated(new_member.clone()) { + (true, Some(old_member)) => { + let _ = db + .change_sender + .send(Change::MemberChanged(old_member, new_member)); + } + (true, None) => { + let _ = db.change_sender.send(Change::MemberCreated(new_member)); + } + _ => {} + } + } + } + _ => {} + } + } + } + } + }), + Instant::now().checked_add(EVENT_HANDLER_TASK_TIMEOUT).unwrap(), + ); + } + } + } + }) + .expect("FATAL: unable to start filesystem change listener"); + let _ = watcher.configure( + notify::Config::default() + .with_compare_contents(true) + .with_poll_interval(std::time::Duration::from_secs(2)), + ); + watcher + .watch(&base_path, RecursiveMode::Recursive) + .expect("FATAL: unable to watch base path"); + + loop { + // Any periodic background stuff can be put here. Adjust timing as needed. + sleep(Duration::from_secs(10)).await; + } + }), + }); + + db.cache.load_all(db.as_ref()).await?; + *db_weak_tmp.lock().unwrap() = Arc::downgrade(&db); // this kicks off watcher task too + + Ok(db) } fn get_controller_address(&self) -> Option
{ @@ -86,7 +219,7 @@ impl FileDatabase { } fn network_path(&self, network_id: NetworkId) -> PathBuf { - self.base_path.join(format!("N{:06x}.yaml", network_id.network_no())) + self.base_path.join(format!("N{:06x}", network_id.network_no())).join("config.yaml") } fn member_path(&self, network_id: NetworkId, member_id: Address) -> PathBuf { @@ -94,11 +227,52 @@ impl FileDatabase { .join(format!("N{:06x}", network_id.network_no())) .join(format!("M{}.yaml", member_id.to_string())) } + + async fn get_network_internal>(path: P) -> Result, Box> { + let r = fs::read(path).await; + if let Ok(raw) = r { + return Ok(Some(serde_yaml::from_slice::(raw.as_slice())?)); + } else { + return Ok(None); + } + } + + async fn get_member_internal>(path: P) -> Result, Box> { + let r = fs::read(path).await; + if let Ok(raw) = r { + Ok(Some(serde_yaml::from_slice::(raw.as_slice())?)) + } else { + Ok(None) + } + } + + /// Get record type and also the number after it: network number or address. + fn record_type_from_path>(controller_address: Address, pp: P) -> Option<(RecordType, NetworkId, Option
)> { + let p: &Path = pp.as_ref(); + if p.parent().map_or(false, |p| p.starts_with("N") && p.as_os_str().len() == 7) { + let network_id = NetworkId::from_controller_and_network_no( + controller_address, + u64::from_str_radix(&p.parent().unwrap().to_str().unwrap()[1..], 16).unwrap_or(0), + )?; + if let Some(file_name) = p.file_name().map(|p| p.to_string_lossy().to_lowercase()) { + if file_name.eq("config.yaml") { + return Some((RecordType::Network, network_id, None)); + } else if file_name.len() == 16 && file_name.starts_with("m") { + return Some(( + RecordType::Member, + network_id, + Some(Address::from_u64(u64::from_str_radix(&file_name.as_str()[1..], 16).unwrap_or(0))?), + )); + } + } + } + return None; + } } impl Drop for FileDatabase { fn drop(&mut self) { - let _ = self.watcher.lock().unwrap().unwatch(&self.base_path); + self.daemon.abort(); } } @@ -126,35 +300,51 @@ impl NodeStorage for FileDatabase { #[async_trait] impl Database for FileDatabase { - async fn get_network(&self, id: NetworkId) -> Result, Box> { - let r = fs::read(self.network_path(id)).await; - if let Ok(raw) = r { - let mut network = serde_yaml::from_slice::(raw.as_slice())?; + async fn list_networks(&self) -> Result, Box> { + let mut networks = Vec::new(); + if let Some(controller_address) = self.get_controller_address() { + let controller_address_shift24 = u64::from(controller_address).wrapping_shl(24); + let mut dir = fs::read_dir(&self.base_path).await?; + while let Ok(Some(ent)) = dir.next_entry().await { + if ent.file_type().await.map_or(false, |t| t.is_dir()) { + let osname = ent.file_name(); + let name = osname.to_string_lossy(); + if name.len() == 7 && name.starts_with("N") { + if fs::metadata(ent.path().join("config.yaml")).await.is_ok() { + if let Ok(nwid_last24bits) = u64::from_str_radix(&name[1..], 16) { + if let Some(nwid) = NetworkId::from_u64(controller_address_shift24 | nwid_last24bits) { + networks.push(nwid); + } + } + } + } + } + } + } + Ok(networks) + } + #[inline] + async fn get_network(&self, id: NetworkId) -> Result, Box> { + let mut network = Self::get_network_internal(&self.network_path(id)).await?; + if let Some(network) = network.as_mut() { // FileDatabase stores networks by their "network number" and automatically adapts their IDs // if the controller's identity changes. This is done to make it easy to just clone networks, // including storing them in "git." if let Some(controller_address) = self.get_controller_address() { let network_id_should_be = network.id.change_network_controller(controller_address); - if id != network_id_should_be { - return Ok(None); - } if network.id != network_id_should_be { network.id = network_id_should_be; - let _ = self.save_network(network.clone()).await; + let _ = self.save_network(network.clone()).await?; } } - - return Ok(Some(network)); - } else { - return Ok(None); } + Ok(network) } async fn save_network(&self, obj: Network) -> Result<(), Box> { let base_network_path = self.network_path(obj.id); let _ = fs::create_dir_all(base_network_path.parent().unwrap()).await; - //let _ = fs::write(base_network_path, to_json_pretty(&obj).as_bytes()).await?; let _ = fs::write(base_network_path, serde_yaml::to_string(&obj)?.as_bytes()).await?; return Ok(()); } @@ -163,15 +353,17 @@ impl Database for FileDatabase { let mut members = Vec::new(); let mut dir = fs::read_dir(self.base_path.join(network_id.to_string())).await?; while let Ok(Some(ent)) = dir.next_entry().await { - let osname = ent.file_name(); - let name = osname.to_string_lossy(); - if name.len() == (zerotier_network_hypervisor::protocol::ADDRESS_SIZE_STRING + 6) - && name.starts_with("M") - && name.ends_with(".yaml") - { - if let Ok(member_address) = u64::from_str_radix(&name[1..11], 16) { - if let Some(member_address) = Address::from_u64(member_address) { - members.push(member_address); + if ent.file_type().await.map_or(false, |t| t.is_file() || t.is_symlink()) { + let osname = ent.file_name(); + let name = osname.to_string_lossy(); + if name.len() == (zerotier_network_hypervisor::protocol::ADDRESS_SIZE_STRING + 6) + && name.starts_with("M") + && name.ends_with(".yaml") + { + if let Ok(member_address) = u64::from_str_radix(&name[1..11], 16) { + if let Some(member_address) = Address::from_u64(member_address) { + members.push(member_address); + } } } } @@ -180,16 +372,14 @@ impl Database for FileDatabase { } async fn get_member(&self, network_id: NetworkId, node_id: Address) -> Result, Box> { - let r = fs::read(self.member_path(network_id, node_id)).await; - if let Ok(raw) = r { - let mut member = serde_yaml::from_slice::(raw.as_slice())?; - self.get_controller_address() - .map(|a| member.network_id = member.network_id.change_network_controller(a)); - Ok(Some(member)) - //Ok(Some(serde_json::from_slice::(raw.as_slice())?)) - } else { - Ok(None) + let mut member = Self::get_member_internal(&self.member_path(network_id, node_id)).await?; + if let Some(member) = member.as_mut() { + if member.network_id != network_id { + member.network_id = network_id; + self.save_member(member.clone()).await?; + } } + Ok(member) } async fn save_member(&self, obj: Member) -> Result<(), Box> { @@ -227,7 +417,7 @@ mod tests { let _ = std::fs::remove_dir_all(&test_dir); - let db = FileDatabase::new(test_dir).await.expect("new db"); + let db = FileDatabase::new(tokio_runtime.handle().clone(), test_dir).await.expect("new db"); let mut test_member = Member::new_without_identity(node_id, network_id); for x in 0..3 { diff --git a/controller/src/lib.rs b/controller/src/lib.rs index f217fad1d..4327dfbb8 100644 --- a/controller/src/lib.rs +++ b/controller/src/lib.rs @@ -2,6 +2,8 @@ mod controller; +pub(crate) mod cache; + pub mod database; pub mod filedatabase; pub mod model; diff --git a/controller/src/main.rs b/controller/src/main.rs index e7f44c75f..db1fc638c 100644 --- a/controller/src/main.rs +++ b/controller/src/main.rs @@ -78,7 +78,7 @@ fn main() { if let Ok(tokio_runtime) = zerotier_utils::tokio::runtime::Builder::new_multi_thread().enable_all().build() { tokio_runtime.block_on(async { if let Some(filedb_base_path) = global_args.value_of("filedb") { - let file_db = FileDatabase::new(filedb_base_path).await; + let file_db = FileDatabase::new(tokio_runtime.handle().clone(), filedb_base_path).await; if file_db.is_err() { eprintln!( "FATAL: unable to open filesystem database at {}: {}", @@ -87,7 +87,7 @@ fn main() { ); std::process::exit(exitcode::ERR_IOERR) } - std::process::exit(run(Arc::new(file_db.unwrap()), &tokio_runtime).await); + std::process::exit(run(file_db.unwrap(), &tokio_runtime).await); } else { eprintln!("FATAL: no database type selected."); std::process::exit(exitcode::ERR_USAGE); diff --git a/controller/src/model/member.rs b/controller/src/model/member.rs index 9890ed530..63483a39e 100644 --- a/controller/src/model/member.rs +++ b/controller/src/model/member.rs @@ -7,6 +7,7 @@ use serde::{Deserialize, Serialize}; use zerotier_network_hypervisor::vl1::{Address, Identity, InetAddress}; use zerotier_network_hypervisor::vl2::NetworkId; +use zerotier_utils::blob::Blob; #[derive(Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct Member { @@ -15,6 +16,13 @@ pub struct Member { #[serde(rename = "networkId")] pub network_id: NetworkId, + /// Pinned full member identity fingerprint, if known. + /// If this is set but 'identity' is not, the 'identity' field will be set on first request + /// but an identity not matching this fingerprint will not be accepted. This allows a member + /// to be created with an address and a fingerprint for full SHA384 identity specification. + #[serde(skip_serializing_if = "Option::is_none")] + pub identity_fingerprint: Option>, + /// Pinned full member identity, if known. #[serde(skip_serializing_if = "Option::is_none")] pub identity: Option, @@ -71,11 +79,13 @@ pub struct Member { } impl Member { + /// Create a new network member without specifying a "pinned" identity. pub fn new_without_identity(node_id: Address, network_id: NetworkId) -> Self { Self { node_id, network_id, identity: None, + identity_fingerprint: None, name: String::new(), last_authorized_time: None, last_deauthorized_time: None, @@ -90,6 +100,7 @@ impl Member { pub fn new_with_identity(identity: Identity, network_id: NetworkId) -> Self { let mut tmp = Self::new_without_identity(identity.address, network_id); + tmp.identity_fingerprint = Some(Blob::from(identity.fingerprint)); tmp.identity = Some(identity); tmp } @@ -110,14 +121,7 @@ impl Hash for Member { } impl ToString for Member { - #[inline(always)] fn to_string(&self) -> String { zerotier_utils::json::to_json_pretty(self) } } - -#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct Tag { - pub id: u32, - pub value: u32, -} diff --git a/controller/src/model/mod.rs b/controller/src/model/mod.rs index 3e2c6e5fc..c3320c8cf 100644 --- a/controller/src/model/mod.rs +++ b/controller/src/model/mod.rs @@ -15,6 +15,13 @@ use zerotier_network_hypervisor::vl2::networkconfig::NetworkConfig; use zerotier_network_hypervisor::vl2::NetworkId; use zerotier_utils::blob::Blob; +#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)] +pub enum RecordType { + Network, + Member, + RequestLogItem, +} + /// A complete network with all member configuration information for import/export or blob storage. #[derive(Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct NetworkExport { diff --git a/controller/src/model/network.rs b/controller/src/model/network.rs index cc90b9400..b040e11a9 100644 --- a/controller/src/model/network.rs +++ b/controller/src/model/network.rs @@ -116,7 +116,7 @@ pub struct Network { #[serde(skip_serializing_if = "Option::is_none")] pub mtu: Option, - /// If true the network has access control, which is usually what you want. + /// If true the network has access control, which is usually what you want and is the default if not specified. #[serde(default = "troo")] pub private: bool, @@ -135,7 +135,6 @@ impl Hash for Network { } impl ToString for Network { - #[inline(always)] fn to_string(&self) -> String { zerotier_utils::json::to_json_pretty(self) } @@ -148,7 +147,7 @@ fn troo() -> bool { impl Network { /// Check member IP assignments and return 'true' if IP assignments were created or modified. - pub async fn check_zt_ip_assignments(&self, database: &DatabaseImpl, member: &mut Member) -> bool { + pub async fn assign_ip_addresses(&self, database: &DatabaseImpl, member: &mut Member) -> bool { let mut modified = false; if self.v4_assign_mode.as_ref().map_or(false, |m| m.zt) { diff --git a/network-hypervisor/src/protocol.rs b/network-hypervisor/src/protocol.rs index 2dfa781f0..5628f7a3e 100644 --- a/network-hypervisor/src/protocol.rs +++ b/network-hypervisor/src/protocol.rs @@ -86,6 +86,7 @@ pub mod message_type { pub const VL1_USER_MESSAGE: u8 = 0x14; pub const VL2_MULTICAST_LIKE: u8 = 0x09; + pub const VL2_NETWORK_CREDENTIALS: u8 = 0x0a; pub const VL2_NETWORK_CONFIG_REQUEST: u8 = 0x0b; pub const VL2_NETWORK_CONFIG: u8 = 0x0c; pub const VL2_MULTICAST_GATHER: u8 = 0x0d; diff --git a/network-hypervisor/src/vl1/identity.rs b/network-hypervisor/src/vl1/identity.rs index 2098790cc..2e22e57ae 100644 --- a/network-hypervisor/src/vl1/identity.rs +++ b/network-hypervisor/src/vl1/identity.rs @@ -514,6 +514,13 @@ impl Identity { let mut h = SHA384::new(); assert!(self.write_public(&mut h, false).is_ok()); self.fingerprint = h.finish(); + + // NIST guidelines specify that the left-most N bits of a hash should be taken if it's truncated. + // We want to start the fingerprint with the address, so move the hash over and discard 40 bits. + // We're not even really losing security here since the address is a hash, but NIST would not + // consider it such since it's not a NIST-approved algorithm. + self.fingerprint.copy_within(ADDRESS_SIZE..48, ADDRESS_SIZE); + self.fingerprint[..ADDRESS_SIZE].copy_from_slice(&self.address.to_bytes()); } #[inline(always)] diff --git a/network-hypervisor/src/vl2/networkid.rs b/network-hypervisor/src/vl2/networkid.rs index ab0872d93..38c4a3948 100644 --- a/network-hypervisor/src/vl2/networkid.rs +++ b/network-hypervisor/src/vl2/networkid.rs @@ -18,7 +18,7 @@ use crate::vl1::Address; pub struct NetworkId(NonZeroU64); impl NetworkId { - #[inline(always)] + #[inline] pub fn from_u64(i: u64) -> Option { // Note that we check both that 'i' is non-zero and that the address of the controller is valid. if let Some(ii) = NonZeroU64::new(i) { @@ -29,7 +29,12 @@ impl NetworkId { return None; } - #[inline(always)] + #[inline] + pub fn from_controller_and_network_no(controller: Address, network_no: u64) -> Option { + Self::from_u64(u64::from(controller).wrapping_shl(24) | (network_no & 0xffffff)) + } + + #[inline] pub fn from_bytes(b: &[u8]) -> Option { if b.len() >= 8 { Self::from_bytes_fixed(b[0..8].try_into().unwrap()) @@ -38,12 +43,12 @@ impl NetworkId { } } - #[inline(always)] + #[inline] pub fn from_bytes_fixed(b: &[u8; 8]) -> Option { Self::from_u64(u64::from_be_bytes(*b)) } - #[inline(always)] + #[inline] pub fn to_bytes(&self) -> [u8; 8] { self.0.get().to_be_bytes() } diff --git a/network-hypervisor/src/vl2/v1/certificateofmembership.rs b/network-hypervisor/src/vl2/v1/certificateofmembership.rs index eebedf740..b5a0cd89a 100644 --- a/network-hypervisor/src/vl2/v1/certificateofmembership.rs +++ b/network-hypervisor/src/vl2/v1/certificateofmembership.rs @@ -13,6 +13,14 @@ use zerotier_utils::blob::Blob; use zerotier_utils::error::InvalidParameterError; use zerotier_utils::memory; +/// ZeroTier V1 certificate of membership. +/// +/// The somewhat odd encoding of this is an artifact of an old V1 design choice: certificates are +/// tuples of arbitrary values coupled by how different they are permitted to be (max delta). +/// +/// This was done to permit some things such as geo-fencing that were never implemented, so it's +/// a bit of a case of YAGNI. In V2 this is deprecated in favor of a more standard sort of +/// certificate. #[derive(Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct CertificateOfMembership { pub network_id: NetworkId, @@ -50,31 +58,33 @@ impl CertificateOfMembership { q[0] = 0; q[1] = self.timestamp.to_be() as u64; - q[2] = self.max_delta.to_be() as u64; + q[2] = self.max_delta.to_be() as u64; // TTL / "window" in V1 q[3] = 1u64.to_be(); - let nwid: u64 = self.network_id.into(); - q[4] = nwid.to_be(); - q[5] = 0; + q[4] = u64::from(self.network_id).to_be(); + q[5] = 0; // no disagreement permitted q[6] = 2u64.to_be(); - let a: u64 = self.issued_to.into(); - q[7] = a.to_be(); - q[8] = 0xffffffffffffffffu64; // no to_be needed + q[7] = u64::from(self.issued_to).to_be(); + q[8] = u64::MAX; // no to_be needed for all-1s + // This is a fix for a security issue in V1 in which an attacker could (with much CPU use) + // duplciate an identity and insert themselves in place of one after 30-60 days when local + // identity caches expire. The full hash should have been included from the beginning, and + // V2 only ever uses the full hash of the identity to verify credentials. let fp = self.issued_to_fingerprint.as_bytes(); q[9] = 3; q[10] = u64::from_ne_bytes(fp[0..8].try_into().unwrap()); - q[11] = 0xffffffffffffffffu64; + q[11] = u64::MAX; // these will never agree; they're explicitly checked in V1 q[12] = 4; q[13] = u64::from_ne_bytes(fp[8..16].try_into().unwrap()); - q[14] = 0xffffffffffffffffu64; + q[14] = u64::MAX; q[15] = 5; q[16] = u64::from_ne_bytes(fp[16..24].try_into().unwrap()); - q[17] = 0xffffffffffffffffu64; + q[17] = u64::MAX; q[18] = 6; q[19] = u64::from_ne_bytes(fp[24..32].try_into().unwrap()); - q[20] = 0xffffffffffffffffu64; + q[20] = u64::MAX; - *memory::as_byte_array(&q) + memory::to_byte_array(q) } /// Get the identity fingerprint used in V1, which only covers the curve25519 keys. diff --git a/network-hypervisor/src/vl2/v1/revocation.rs b/network-hypervisor/src/vl2/v1/revocation.rs index be7f4f33e..d9c16c535 100644 --- a/network-hypervisor/src/vl2/v1/revocation.rs +++ b/network-hypervisor/src/vl2/v1/revocation.rs @@ -76,4 +76,9 @@ impl Revocation { v } + + #[inline(always)] + pub fn to_bytes(&self, controller_address: Address) -> ArrayVec { + self.internal_to_bytes(false, controller_address) + } } diff --git a/utils/src/memory.rs b/utils/src/memory.rs index 0132ad64c..166d1204e 100644 --- a/utils/src/memory.rs +++ b/utils/src/memory.rs @@ -66,6 +66,14 @@ pub fn as_byte_array(o: &T) -> &[u8; S] { unsafe { &*(o as *const T).cast() } } +/// Transmute an object to a byte array. +/// The template parameter S must equal the size of the object in bytes or this will panic. +#[inline(always)] +pub fn to_byte_array(o: T) -> [u8; S] { + assert_eq!(S, size_of::()); + unsafe { *(&o as *const T).cast() } +} + /// Get a byte array as a flat object. /// /// WARNING: while this is technically safe, care must be taken if the object requires aligned access.