diff --git a/controller/Cargo.toml b/controller/Cargo.toml index 5284120fd..1915f3cb8 100644 --- a/controller/Cargo.toml +++ b/controller/Cargo.toml @@ -15,6 +15,7 @@ zerotier-vl1-service = { path = "../vl1-service" } async-trait = "^0" serde = { version = "^1", features = ["derive"], default-features = false } serde_json = { version = "^1", features = ["std"], default-features = false } +serde_yaml = "^0" clap = { version = "^3", features = ["std", "suggestions"], default-features = false } [target."cfg(not(windows))".dependencies] diff --git a/controller/src/database.rs b/controller/src/database.rs index d4fd2ee6f..abc032436 100644 --- a/controller/src/database.rs +++ b/controller/src/database.rs @@ -5,8 +5,20 @@ use async_trait::async_trait; use zerotier_network_hypervisor::vl1::{Address, InetAddress, NodeStorage}; use zerotier_network_hypervisor::vl2::NetworkId; +use zerotier_utils::tokio::sync::broadcast::Receiver; + use crate::model::*; +#[derive(Clone)] +pub enum Change { + NetworkCreated(Network), + NetworkDeleted(Network), + NetworkChanged(Network, Network), + MemberCreated(Member), + MemberDeleted(Member), + MemberChanged(Member, Member), +} + #[async_trait] pub trait Database: Sync + Send + NodeStorage + 'static { async fn get_network(&self, id: NetworkId) -> Result, Box>; @@ -16,6 +28,21 @@ pub trait Database: Sync + Send + NodeStorage + 'static { async fn get_member(&self, network_id: NetworkId, node_id: Address) -> Result, Box>; async fn save_member(&self, obj: Member) -> Result<(), Box>; + /// Get a receiver that can be used to receive changes made to networks and members, if supported. + /// + /// The receiver returned is a broadcast receiver. This can be called more than once if there are + /// multiple parts of the controller that listen. + /// + /// Changes should NOT be broadcast on call to save_network() or save_member(). They should only + /// be broadcast when externally generated changes occur. + /// + /// The default implementation returns None indicating that change following is not supported. + /// Change following is required for instant deauthorization with revocations and other instant + /// changes in response to modifications to network and member configuration. + async fn changes(&self) -> Option> { + None + } + /// List members deauthorized after a given time (milliseconds since epoch). /// /// The default trait implementation uses a brute force method. This should be reimplemented if a diff --git a/controller/src/filedatabase.rs b/controller/src/filedatabase.rs index be5d4eb65..298516960 100644 --- a/controller/src/filedatabase.rs +++ b/controller/src/filedatabase.rs @@ -1,6 +1,7 @@ use std::error::Error; use std::path::{Path, PathBuf}; use std::str::FromStr; +use std::sync::atomic::{AtomicU64, Ordering}; use async_trait::async_trait; @@ -8,10 +9,7 @@ use zerotier_network_hypervisor::vl1::{Address, Identity, NodeStorage}; use zerotier_network_hypervisor::vl2::NetworkId; use zerotier_utils::io::{fs_restrict_permissions, read_limit}; -use zerotier_utils::json::to_json_pretty; use zerotier_utils::tokio::fs; -use zerotier_utils::tokio::io::AsyncWriteExt; -use zerotier_utils::tokio::sync::Mutex; use crate::database::Database; use crate::model::*; @@ -25,32 +23,46 @@ const IDENTITY_SECRET_FILENAME: &'static str = "identity.secret"; /// the cache. The cache will also contain any ephemeral data, generated data, etc. pub struct FileDatabase { base_path: PathBuf, - old_log: Mutex, + controller_address: AtomicU64, } -fn network_path(base: &PathBuf, network_id: NetworkId) -> PathBuf { - base.join(network_id.to_string()).join(format!("n{}.json", network_id.to_string())) -} - -fn member_path(base: &PathBuf, network_id: NetworkId, member_id: Address) -> PathBuf { - base.join(network_id.to_string()).join(format!("m{}.json", member_id.to_string())) -} +// TODO: should cache at least hashes and detect changes in the filesystem live. impl FileDatabase { pub async fn new>(base_path: P) -> Result> { let base: PathBuf = base_path.as_ref().into(); - let changelog = base.join("_history"); let _ = fs::create_dir_all(&base).await?; - Ok(Self { - base_path: base, - old_log: Mutex::new(fs::OpenOptions::new().append(true).create(true).open(changelog).await?), - }) + Ok(Self { base_path: base, controller_address: AtomicU64::new(0) }) + } + + fn get_controller_address(&self) -> Option
{ + let a = self.controller_address.load(Ordering::Relaxed); + if a == 0 { + if let Some(id) = self.load_node_identity() { + self.controller_address.store(id.address.into(), Ordering::Relaxed); + Some(id.address) + } else { + None + } + } else { + Address::from_u64(a) + } + } + + fn network_path(&self, network_id: NetworkId) -> PathBuf { + self.base_path.join(format!("n{:06x}", network_id.network_no())).join("config.yaml") + } + + fn member_path(&self, network_id: NetworkId, member_id: Address) -> PathBuf { + self.base_path + .join(format!("n{:06x}", network_id.network_no())) + .join(format!("m{}.yaml", member_id.to_string())) } } impl NodeStorage for FileDatabase { fn load_node_identity(&self) -> Option { - let id_data = read_limit(self.base_path.join(IDENTITY_SECRET_FILENAME), 4096); + let id_data = read_limit(self.base_path.join(IDENTITY_SECRET_FILENAME), 16384); if id_data.is_err() { return None; } @@ -73,29 +85,23 @@ impl NodeStorage for FileDatabase { #[async_trait] impl Database for FileDatabase { async fn get_network(&self, id: NetworkId) -> Result, Box> { - let r = fs::read(network_path(&self.base_path, id)).await; + let r = fs::read(self.network_path(id)).await; if let Ok(raw) = r { - Ok(Some(serde_json::from_slice::(raw.as_slice())?)) + let mut network = serde_yaml::from_slice::(raw.as_slice())?; + self.get_controller_address() + .map(|a| network.id = network.id.change_network_controller(a)); + Ok(Some(network)) + //Ok(Some(serde_json::from_slice::(raw.as_slice())?)) } else { Ok(None) } } async fn save_network(&self, obj: Network) -> Result<(), Box> { - let base_network_path = network_path(&self.base_path, obj.id); + let base_network_path = self.network_path(obj.id); let _ = fs::create_dir_all(base_network_path.parent().unwrap()).await; - - let prev = self.get_network(obj.id).await?; - if let Some(prev) = prev { - if obj == prev { - return Ok(()); - } - let mut j = zerotier_utils::json::to_json(&prev); - j.push('\n'); - let _ = self.old_log.lock().await.write_all(j.as_bytes()).await?; - } - - let _ = fs::write(base_network_path, to_json_pretty(&obj).as_bytes()).await?; + //let _ = fs::write(base_network_path, to_json_pretty(&obj).as_bytes()).await?; + let _ = fs::write(base_network_path, serde_yaml::to_string(&obj)?.as_bytes()).await?; return Ok(()); } @@ -107,7 +113,7 @@ impl Database for FileDatabase { let name = osname.to_string_lossy(); if name.len() == (zerotier_network_hypervisor::protocol::ADDRESS_SIZE_STRING + 6) && name.starts_with("m") - && name.ends_with(".json") + && name.ends_with(".yaml") { if let Ok(member_address) = u64::from_str_radix(&name[1..11], 16) { if let Some(member_address) = Address::from_u64(member_address) { @@ -120,29 +126,23 @@ impl Database for FileDatabase { } async fn get_member(&self, network_id: NetworkId, node_id: Address) -> Result, Box> { - let r = fs::read(member_path(&self.base_path, network_id, node_id)).await; + let r = fs::read(self.member_path(network_id, node_id)).await; if let Ok(raw) = r { - Ok(Some(serde_json::from_slice::(raw.as_slice())?)) + let mut member = serde_yaml::from_slice::(raw.as_slice())?; + self.get_controller_address() + .map(|a| member.network_id = member.network_id.change_network_controller(a)); + Ok(Some(member)) + //Ok(Some(serde_json::from_slice::(raw.as_slice())?)) } else { Ok(None) } } async fn save_member(&self, obj: Member) -> Result<(), Box> { - let base_member_path = member_path(&self.base_path, obj.network_id, obj.node_id); + let base_member_path = self.member_path(obj.network_id, obj.node_id); let _ = fs::create_dir_all(base_member_path.parent().unwrap()).await; - - let prev = self.get_member(obj.network_id, obj.node_id).await?; - if let Some(prev) = prev { - if obj == prev { - return Ok(()); - } - let mut j = zerotier_utils::json::to_json(&prev); - j.push('\n'); - let _ = self.old_log.lock().await.write_all(j.as_bytes()).await?; - } - - let _ = fs::write(base_member_path, to_json_pretty(&obj).as_bytes()).await?; + //let _ = fs::write(base_member_path, to_json_pretty(&obj).as_bytes()).await?; + let _ = fs::write(base_member_path, serde_yaml::to_string(&obj)?.as_bytes()).await?; Ok(()) } diff --git a/controller/src/handler.rs b/controller/src/handler.rs index 506af62ac..47fe7d7d7 100644 --- a/controller/src/handler.rs +++ b/controller/src/handler.rs @@ -9,30 +9,69 @@ use zerotier_network_hypervisor::protocol::{verbs, PacketBuffer, DEFAULT_MULTICA use zerotier_network_hypervisor::vl1::{HostSystem, Identity, InnerProtocol, Node, PacketHandlerResult, Path, PathFilter, Peer}; use zerotier_network_hypervisor::vl2::{CertificateOfMembership, CertificateOfOwnership, NetworkConfig, NetworkId, Tag}; use zerotier_utils::dictionary::Dictionary; -use zerotier_utils::error::UnexpectedError; +use zerotier_utils::error::{InvalidParameterError, UnexpectedError}; use zerotier_utils::ms_since_epoch; use zerotier_utils::reaper::Reaper; use zerotier_utils::tokio; -use crate::database::Database; +use crate::database::*; use crate::model::{AuthorizationResult, Member, CREDENTIAL_WINDOW_SIZE_DEFAULT}; const REQUEST_TIMEOUT: Duration = Duration::from_secs(10); pub struct Handler { + inner: Arc>, + change_watcher: Option>, +} + +struct Inner { reaper: Reaper, runtime: tokio::runtime::Handle, - inner: Arc>, + database: Arc, + local_identity: Identity, } impl Handler { - pub fn new(database: Arc, runtime: tokio::runtime::Handle, local_identity: Identity) -> Arc { - assert!(local_identity.secret.is_some()); - Arc::new(Self { - reaper: Reaper::new(&runtime), - runtime, - inner: Arc::new(Inner:: { database, local_identity }), - }) + pub async fn new(database: Arc, runtime: tokio::runtime::Handle) -> Result, Box> { + if let Some(local_identity) = database.load_node_identity() { + assert!(local_identity.secret.is_some()); + + let inner = Arc::new(Inner:: { + reaper: Reaper::new(&runtime), + runtime, + database: database.clone(), + local_identity, + }); + + let h = Arc::new(Self { + inner: inner.clone(), + change_watcher: database.changes().await.map(|mut ch| { + let inner2 = inner.clone(); + inner.runtime.spawn(async move { + loop { + if let Ok(change) = ch.recv().await { + inner2.reaper.add( + inner2.runtime.spawn(inner2.clone().handle_change_notification(change)), + Instant::now().checked_add(REQUEST_TIMEOUT).unwrap(), + ); + } + } + }) + }), + }); + + Ok(h) + } else { + Err(Box::new(InvalidParameterError( + "local controller's identity not readable by database", + ))) + } + } +} + +impl Drop for Handler { + fn drop(&mut self) { + let _ = self.change_watcher.take().map(|w| w.abort()); } } @@ -116,10 +155,10 @@ impl InnerProtocol for Handler { // Launch handler as an async background task. let (inner, source2, source_path2) = (self.inner.clone(), source.clone(), source_path.clone()); - self.reaper.add( - self.runtime.spawn(async move { + self.inner.reaper.add( + self.inner.runtime.spawn(async move { // TODO: log errors - let _ = inner.handle_network_config_request( + let result = inner.handle_network_config_request( source2, source_path2, message_id, @@ -172,12 +211,11 @@ impl InnerProtocol for Handler { } } -struct Inner { - database: Arc, - local_identity: Identity, -} - impl Inner { + async fn handle_change_notification(self: Arc, _change: Change) { + todo!() + } + async fn handle_network_config_request( self: Arc, source: Arc>, @@ -214,7 +252,7 @@ impl Inner { let mut authorized = member.as_ref().map_or(false, |m| m.authorized()); if !authorized { if member.is_none() { - if network.learn_members { + if network.learn_members.unwrap_or(true) { let _ = member.insert(Member::new_with_identity(source.identity.clone(), network_id)); member_changed = true; } else { diff --git a/controller/src/main.rs b/controller/src/main.rs index 2aae68638..750fe6814 100644 --- a/controller/src/main.rs +++ b/controller/src/main.rs @@ -16,35 +16,40 @@ use zerotier_utils::tokio::runtime::Runtime; use zerotier_vl1_service::VL1Service; async fn run(database: Arc, runtime: &Runtime) -> i32 { - let handler = Handler::new(database.clone(), runtime.handle().clone(), todo!()); - - let svc = VL1Service::new( - database.clone(), - handler.clone(), - handler.clone(), - zerotier_vl1_service::VL1Settings::default(), - ); - if svc.is_ok() { - let svc = svc.unwrap(); - svc.node().init_default_roots(); - - // Wait for kill signal on Unix-like platforms. - #[cfg(unix)] - { - let term = Arc::new(AtomicBool::new(false)); - let _ = signal_hook::flag::register(libc::SIGINT, term.clone()); - let _ = signal_hook::flag::register(libc::SIGTERM, term.clone()); - let _ = signal_hook::flag::register(libc::SIGQUIT, term.clone()); - while !term.load(Ordering::Relaxed) { - std::thread::sleep(Duration::from_secs(1)); - } - } - - println!("Terminate signal received, shutting down..."); - exitcode::OK + let handler = Handler::new(database.clone(), runtime.handle().clone()).await; + if handler.is_err() { + eprintln!("FATAL: error initializing handler: {}", handler.err().unwrap().to_string()); + exitcode::ERR_CONFIG } else { - eprintln!("FATAL: error launching service: {}", svc.err().unwrap().to_string()); - exitcode::ERR_IOERR + let handler = handler.unwrap(); + let svc = VL1Service::new( + database.clone(), + handler.clone(), + handler.clone(), + zerotier_vl1_service::VL1Settings::default(), + ); + if svc.is_ok() { + let svc = svc.unwrap(); + svc.node().init_default_roots(); + + // Wait for kill signal on Unix-like platforms. + #[cfg(unix)] + { + let term = Arc::new(AtomicBool::new(false)); + let _ = signal_hook::flag::register(libc::SIGINT, term.clone()); + let _ = signal_hook::flag::register(libc::SIGTERM, term.clone()); + let _ = signal_hook::flag::register(libc::SIGQUIT, term.clone()); + while !term.load(Ordering::Relaxed) { + std::thread::sleep(Duration::from_secs(1)); + } + } + + println!("Terminate signal received, shutting down..."); + exitcode::OK + } else { + eprintln!("FATAL: error launching service: {}", svc.err().unwrap().to_string()); + exitcode::ERR_IOERR + } } } @@ -82,14 +87,23 @@ fn main() { if let Ok(tokio_runtime) = zerotier_utils::tokio::runtime::Builder::new_multi_thread().enable_all().build() { tokio_runtime.block_on(async { if let Some(filedb_base_path) = global_args.value_of("filedb") { - std::process::exit(run(Arc::new(FileDatabase::new(filedb_base_path).await.unwrap()), &tokio_runtime).await); + let file_db = FileDatabase::new(filedb_base_path).await; + if file_db.is_err() { + eprintln!( + "FATAL: unable to open filesystem database at {}: {}", + filedb_base_path, + file_db.as_ref().err().unwrap().to_string() + ); + std::process::exit(exitcode::ERR_IOERR) + } + std::process::exit(run(Arc::new(file_db.unwrap()), &tokio_runtime).await); } else { eprintln!("FATAL: no database type selected."); std::process::exit(exitcode::ERR_USAGE); }; }); } else { - eprintln!("FATAL: error launching service: can't start async runtime"); + eprintln!("FATAL: can't start async runtime"); std::process::exit(exitcode::ERR_IOERR) } } diff --git a/controller/src/model/member.rs b/controller/src/model/member.rs index b2d54dc7d..393c5ba6c 100644 --- a/controller/src/model/member.rs +++ b/controller/src/model/member.rs @@ -18,51 +18,61 @@ pub struct Member { pub network_id: NetworkId, /// Pinned full member identity, if known. + #[serde(skip_serializing_if = "Option::is_none")] pub identity: Option, /// A short name that can also be used for DNS, etc. + #[serde(skip_serializing_if = "String::is_empty")] #[serde(default)] pub name: String, /// Time member was most recently authorized, None for 'never'. + #[serde(skip_serializing_if = "Option::is_none")] #[serde(rename = "lastAuthorizedTime")] pub last_authorized_time: Option, /// Time member was most recently deauthorized, None for 'never'. + #[serde(skip_serializing_if = "Option::is_none")] #[serde(rename = "lastDeauthorizedTime")] pub last_deauthorized_time: Option, /// ZeroTier-managed IP assignments. + #[serde(skip_serializing_if = "HashSet::is_empty")] #[serde(rename = "ipAssignments")] #[serde(default)] pub ip_assignments: HashSet, /// If true, do not auto-assign IPs in the controller. + #[serde(skip_serializing_if = "Option::is_none")] #[serde(rename = "noAutoAssignIps")] #[serde(default)] - pub no_auto_assign_ips: bool, + pub no_auto_assign_ips: Option, /// If true this member is a full Ethernet bridge. + #[serde(skip_serializing_if = "Option::is_none")] #[serde(rename = "activeBridge")] #[serde(default)] - pub bridge: bool, + pub bridge: Option, /// Tags that can be used in rule evaluation for ACL-like behavior. + #[serde(skip_serializing_if = "HashMap::is_empty")] #[serde(default)] pub tags: HashMap, /// Member is exempt from SSO, authorization managed conventionally. + #[serde(skip_serializing_if = "Option::is_none")] #[serde(rename = "ssoExempt")] #[serde(default)] - pub sso_exempt: bool, + pub sso_exempt: Option, /// If true this node is explicitly listed in every member's network configuration. /// This is only supported for V2 nodes. - #[serde(rename = "advertised")] + #[serde(skip_serializing_if = "Option::is_none")] #[serde(default)] - pub advertised: bool, + pub advertised: Option, /// API object type documentation field, not actually edited/used. + #[serde(skip_deserializing)] #[serde(default = "ObjectType::member")] pub objtype: ObjectType, } @@ -77,11 +87,11 @@ impl Member { last_authorized_time: None, last_deauthorized_time: None, ip_assignments: HashSet::new(), - no_auto_assign_ips: false, - bridge: false, + no_auto_assign_ips: None, + bridge: None, tags: HashMap::new(), - sso_exempt: false, - advertised: false, + sso_exempt: None, + advertised: None, objtype: ObjectType::Member, } } diff --git a/controller/src/model/network.rs b/controller/src/model/network.rs index f29e3e5f5..b9148ebb0 100644 --- a/controller/src/model/network.rs +++ b/controller/src/model/network.rs @@ -45,18 +45,21 @@ pub struct Network { pub id: NetworkId, /// Network name that's sent to network members + #[serde(skip_serializing_if = "String::is_empty")] #[serde(default)] pub name: String, /// Guideline for the maximum number of multicast recipients on a network (not a hard limit). /// Setting to zero disables multicast entirely. The default is used if this is not set. + #[serde(skip_serializing_if = "Option::is_none")] #[serde(rename = "multicastLimit")] pub multicast_limit: Option, /// If true, this network supports ff:ff:ff:ff:ff:ff Ethernet broadcast. + #[serde(skip_serializing_if = "Option::is_none")] #[serde(rename = "enableBroadcast")] - #[serde(default = "troo")] - pub enable_broadcast: bool, + #[serde(default)] + pub enable_broadcast: Option, /// Auto IP assignment mode(s) for IPv4 addresses. #[serde(rename = "v4AssignMode")] @@ -69,18 +72,22 @@ pub struct Network { pub v6_assign_mode: Ipv6AssignMode, /// IPv4 or IPv6 auto-assignment pools available, must be present to use 'zt' mode. + #[serde(skip_serializing_if = "HashSet::is_empty")] #[serde(rename = "ipAssignmentPools")] #[serde(default)] pub ip_assignment_pools: HashSet, /// IPv4 or IPv6 routes to advertise. + #[serde(skip_serializing_if = "HashSet::is_empty")] #[serde(default)] pub ip_routes: HashSet, /// DNS records to push to members. + #[serde(skip_serializing_if = "HashMap::is_empty")] pub dns: HashMap>, /// Network rule set. + #[serde(skip_serializing_if = "Vec::is_empty")] #[serde(default)] pub rules: Vec, @@ -92,6 +99,7 @@ pub struct Network { /// promptly, so nodes will still deauthorize quickly even if the window is long. /// /// Usually this does not need to be changed. + #[serde(skip_serializing_if = "Option::is_none")] #[serde(rename = "credentialWindowSize")] pub credential_window_size: Option, @@ -103,10 +111,12 @@ pub struct Network { pub private: bool, /// If true this network will add not-authorized members for anyone who requests a config. - #[serde(default = "troo")] - pub learn_members: bool, + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(default)] + pub learn_members: Option, /// Static object type field for use with API. + #[serde(skip_deserializing)] #[serde(default = "ObjectType::network")] pub objtype: ObjectType, } diff --git a/network-hypervisor/src/protocol.rs b/network-hypervisor/src/protocol.rs index bc43d5830..c74e46896 100644 --- a/network-hypervisor/src/protocol.rs +++ b/network-hypervisor/src/protocol.rs @@ -126,6 +126,9 @@ pub const ADDRESS_SIZE_STRING: usize = 10; /// Prefix indicating reserved addresses (that can't actually be addresses). pub const ADDRESS_RESERVED_PREFIX: u8 = 0xff; +/// Bit mask for address bits in a u64. +pub const ADDRESS_MASK: u64 = 0xffffffffff; + pub(crate) mod v1 { use super::*; diff --git a/network-hypervisor/src/vl2/networkid.rs b/network-hypervisor/src/vl2/networkid.rs index 90a78d3aa..8325d7b66 100644 --- a/network-hypervisor/src/vl2/networkid.rs +++ b/network-hypervisor/src/vl2/networkid.rs @@ -10,6 +10,9 @@ use zerotier_utils::error::InvalidFormatError; use zerotier_utils::hex; use zerotier_utils::hex::HEX_CHARS; +use crate::protocol::ADDRESS_MASK; +use crate::vl1::Address; + #[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] #[repr(transparent)] pub struct NetworkId(NonZeroU64); @@ -17,7 +20,13 @@ pub struct NetworkId(NonZeroU64); impl NetworkId { #[inline(always)] pub fn from_u64(i: u64) -> Option { - NonZeroU64::new(i).map(|i| Self(i)) + // Note that we check both that 'i' is non-zero and that the address of the controller is valid. + if let Some(ii) = NonZeroU64::new(i) { + if Address::from_u64(i & ADDRESS_MASK).is_some() { + return Some(Self(ii)); + } + } + return None; } #[inline(always)] @@ -38,6 +47,24 @@ impl NetworkId { pub fn to_bytes(&self) -> [u8; 8] { self.0.get().to_be_bytes() } + + /// Get the network controller ID for this network, which is the most significant 40 bits. + #[inline] + pub fn network_controller(&self) -> Address { + Address::from_u64(self.0.get()).unwrap() + } + + /// Consume this network ID and return one with the same network number but a different controller ID. + pub fn change_network_controller(self, new_controller: Address) -> NetworkId { + let new_controller: u64 = new_controller.into(); + Self(NonZeroU64::new((self.network_no() as u64) | new_controller.wrapping_shr(24)).unwrap()) + } + + /// Get the 24-bit local network identifier minus the 40-bit controller address portion. + #[inline] + pub fn network_no(&self) -> u32 { + (self.0.get() & 0xffffff) as u32 + } } impl From for u64 {