From 5d9022f815cae30a05316d7a2b9cfcff5fed2f96 Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Fri, 11 Nov 2022 18:23:45 -0500 Subject: [PATCH] FileDB and timing fixes. --- controller/src/filedatabase.rs | 255 +++++++++++++++++---------------- utils/src/lib.rs | 4 +- 2 files changed, 131 insertions(+), 128 deletions(-) diff --git a/controller/src/filedatabase.rs b/controller/src/filedatabase.rs index 3fd782e07..01a04832e 100644 --- a/controller/src/filedatabase.rs +++ b/controller/src/filedatabase.rs @@ -6,6 +6,7 @@ use std::sync::{Arc, Mutex, Weak}; use async_trait::async_trait; use notify::{RecursiveMode, Watcher}; +use serde::de::DeserializeOwned; use zerotier_network_hypervisor::vl1::{Address, Identity, NodeStorage}; use zerotier_network_hypervisor::vl2::NetworkId; @@ -66,121 +67,137 @@ impl FileDatabase { let mut watcher = notify::recommended_watcher(move |event: notify::Result| { if let Ok(event) = event { - if let Some(db) = db_weak.lock().unwrap().upgrade() { - if let Some(controller_address) = db.get_controller_address() { - db.clone().tasks.add( - runtime.spawn(async move { - if let Some(path0) = event.paths.first() { - if let Some((record_type, network_id, node_id)) = - Self::record_type_from_path(controller_address, path0.as_path()) - { - // Paths to objects that were deleted or changed. Changed includes adding new objects. - let mut deleted = None; - let mut changed = None; + match event.kind { + notify::EventKind::Create(_) | notify::EventKind::Modify(_) | notify::EventKind::Remove(_) => { + if let Some(db) = db_weak.lock().unwrap().upgrade() { + if let Some(controller_address) = db.get_controller_address() { + db.clone().tasks.add( + runtime.spawn(async move { + if let Some(path0) = event.paths.first() { + if let Some((record_type, network_id, node_id)) = + Self::record_type_from_path(controller_address, path0.as_path()) + { + // Paths to objects that were deleted or changed. Changed includes adding new objects. + let mut deleted = None; + let mut changed = None; - match event.kind { - notify::EventKind::Create(create_kind) => match create_kind { - notify::event::CreateKind::File => { - changed = Some(path0.as_path()); - } - _ => {} - }, - notify::EventKind::Modify(modify_kind) => match modify_kind { - notify::event::ModifyKind::Data(_) => { - changed = Some(path0.as_path()); - } - notify::event::ModifyKind::Name(rename_mode) => match rename_mode { - notify::event::RenameMode::Both => { - if event.paths.len() >= 2 { - if let Some(path1) = event.paths.last() { + match event.kind { + notify::EventKind::Create(create_kind) => match create_kind { + notify::event::CreateKind::File => { + changed = Some(path0.as_path()); + } + _ => {} + }, + notify::EventKind::Modify(modify_kind) => match modify_kind { + notify::event::ModifyKind::Data(_) => { + changed = Some(path0.as_path()); + } + notify::event::ModifyKind::Name(rename_mode) => match rename_mode { + notify::event::RenameMode::Both => { + if event.paths.len() >= 2 { + if let Some(path1) = event.paths.last() { + deleted = Some(path0.as_path()); + changed = Some(path1.as_path()); + } + } + } + notify::event::RenameMode::From => { deleted = Some(path0.as_path()); - changed = Some(path1.as_path()); } + notify::event::RenameMode::To => { + changed = Some(path0.as_path()); + } + _ => {} + }, + _ => {} + }, + notify::EventKind::Remove(remove_kind) => match remove_kind { + notify::event::RemoveKind::File => { + deleted = Some(path0.as_path()); } - } - notify::event::RenameMode::From => { - deleted = Some(path0.as_path()); - } - notify::event::RenameMode::To => { - changed = Some(path0.as_path()); - } + _ => {} + }, _ => {} - }, - _ => {} - }, - notify::EventKind::Remove(remove_kind) => match remove_kind { - notify::event::RemoveKind::File => { - deleted = Some(path0.as_path()); } - _ => {} - }, - _ => {} - } - if deleted.is_some() { - println!("DELETED: {}", deleted.unwrap().as_os_str().to_string_lossy()); - match record_type { - RecordType::Network => { - if let Some((network, mut members)) = db.cache.on_network_deleted(network_id) { - for m in members.drain(..) { - let _ = db.change_sender.send(Change::MemberDeleted(m)); + if deleted.is_some() { + println!("DELETED: {}", deleted.unwrap().as_os_str().to_string_lossy()); + match record_type { + RecordType::Network => { + if let Some((network, mut members)) = + db.cache.on_network_deleted(network_id) + { + for m in members.drain(..) { + let _ = db.change_sender.send(Change::MemberDeleted(m)); + } + let _ = db.change_sender.send(Change::NetworkDeleted(network)); + } } - let _ = db.change_sender.send(Change::NetworkDeleted(network)); + RecordType::Member => { + if let Some(node_id) = node_id { + if let Some(member) = + db.cache.on_member_deleted(network_id, node_id) + { + let _ = db.change_sender.send(Change::MemberDeleted(member)); + } + } + } + _ => {} } } - RecordType::Member => { - if let Some(node_id) = node_id { - if let Some(member) = db.cache.on_member_deleted(network_id, node_id) { - let _ = db.change_sender.send(Change::MemberDeleted(member)); + + if let Some(changed) = changed { + println!("CHANGED: {}", changed.as_os_str().to_string_lossy()); + match record_type { + RecordType::Network => { + if let Ok(Some(new_network)) = + Self::load_object::(changed).await + { + match db.cache.on_network_updated(new_network.clone()) { + (true, Some(old_network)) => { + let _ = db + .change_sender + .send(Change::NetworkChanged(old_network, new_network)); + } + (true, None) => { + let _ = db + .change_sender + .send(Change::NetworkCreated(new_network)); + } + _ => {} + } + } } + RecordType::Member => { + if let Ok(Some(new_member)) = Self::load_object::(changed).await + { + match db.cache.on_member_updated(new_member.clone()) { + (true, Some(old_member)) => { + let _ = db + .change_sender + .send(Change::MemberChanged(old_member, new_member)); + } + (true, None) => { + let _ = db + .change_sender + .send(Change::MemberCreated(new_member)); + } + _ => {} + } + } + } + _ => {} } } - _ => {} } } - - if let Some(changed) = changed { - println!("CHANGED: {}", changed.as_os_str().to_string_lossy()); - match record_type { - RecordType::Network => { - if let Ok(Some(new_network)) = Self::get_network_internal(changed).await { - match db.cache.on_network_updated(new_network.clone()) { - (true, Some(old_network)) => { - let _ = db - .change_sender - .send(Change::NetworkChanged(old_network, new_network)); - } - (true, None) => { - let _ = db.change_sender.send(Change::NetworkCreated(new_network)); - } - _ => {} - } - } - } - RecordType::Member => { - if let Ok(Some(new_member)) = Self::get_member_internal(changed).await { - match db.cache.on_member_updated(new_member.clone()) { - (true, Some(old_member)) => { - let _ = db - .change_sender - .send(Change::MemberChanged(old_member, new_member)); - } - (true, None) => { - let _ = db.change_sender.send(Change::MemberCreated(new_member)); - } - _ => {} - } - } - } - _ => {} - } - } - } - } - }), - Instant::now().checked_add(EVENT_HANDLER_TASK_TIMEOUT).unwrap(), - ); + }), + Instant::now().checked_add(EVENT_HANDLER_TASK_TIMEOUT).unwrap(), + ); + } + } } + _ => {} } } }) @@ -202,7 +219,7 @@ impl FileDatabase { }); db.cache.load_all(db.as_ref()).await?; - *db_weak_tmp.lock().unwrap() = Arc::downgrade(&db); // this kicks off watcher task too + *db_weak_tmp.lock().unwrap() = Arc::downgrade(&db); // this starts the daemon tasks and starts watching for file changes Ok(db) } @@ -231,40 +248,27 @@ impl FileDatabase { .join(format!("M{}.yaml", member_id.to_string())) } - async fn get_network_internal>(path: P) -> Result, Box> { - let r = fs::read(path).await; - if let Ok(raw) = r { - return Ok(Some(serde_yaml::from_slice::(raw.as_slice())?)); + async fn load_object(path: &Path) -> Result, Box> { + if let Ok(raw) = fs::read(path).await { + return Ok(Some(serde_yaml::from_slice::(raw.as_slice())?)); } else { return Ok(None); } } - async fn get_member_internal>(path: P) -> Result, Box> { - let r = fs::read(path).await; - if let Ok(raw) = r { - Ok(Some(serde_yaml::from_slice::(raw.as_slice())?)) - } else { - Ok(None) - } - } - /// Get record type and also the number after it: network number or address. - fn record_type_from_path>(controller_address: Address, pp: P) -> Option<(RecordType, NetworkId, Option
)> { - let p: &Path = pp.as_ref(); - if p.parent().map_or(false, |p| p.starts_with("N") && p.as_os_str().len() == 7) { - let network_id = NetworkId::from_controller_and_network_no( - controller_address, - u64::from_str_radix(&p.parent().unwrap().to_str().unwrap()[1..], 16).unwrap_or(0), - )?; + fn record_type_from_path(controller_address: Address, p: &Path) -> Option<(RecordType, NetworkId, Option
)> { + let parent = p.parent()?.to_string_lossy(); + if parent.len() == 7 && (parent.starts_with("N") || parent.starts_with('n')) { + let network_id = NetworkId::from_controller_and_network_no(controller_address, u64::from_str_radix(&parent[1..], 16).ok()?)?; if let Some(file_name) = p.file_name().map(|p| p.to_string_lossy().to_lowercase()) { if file_name.eq("config.yaml") { return Some((RecordType::Network, network_id, None)); - } else if file_name.len() == 16 && file_name.starts_with("m") { + } else if file_name.len() == 16 && file_name.starts_with("m") && file_name.ends_with(".yaml") { return Some(( RecordType::Member, network_id, - Some(Address::from_u64(u64::from_str_radix(&file_name.as_str()[1..], 16).unwrap_or(0))?), + Some(Address::from_u64(u64::from_str_radix(&file_name.as_str()[1..11], 16).unwrap_or(0))?), )); } } @@ -327,9 +331,8 @@ impl Database for FileDatabase { Ok(networks) } - #[inline] async fn get_network(&self, id: NetworkId) -> Result, Box> { - let mut network = Self::get_network_internal(&self.network_path(id)).await?; + let mut network = Self::load_object::(self.network_path(id).as_path()).await?; if let Some(network) = network.as_mut() { // FileDatabase stores networks by their "network number" and automatically adapts their IDs // if the controller's identity changes. This is done to make it easy to just clone networks, @@ -354,7 +357,7 @@ impl Database for FileDatabase { async fn list_members(&self, network_id: NetworkId) -> Result, Box> { let mut members = Vec::new(); - let mut dir = fs::read_dir(self.base_path.join(network_id.to_string())).await?; + let mut dir = fs::read_dir(self.base_path.join(format!("N{:06x}", network_id.network_no()))).await?; while let Ok(Some(ent)) = dir.next_entry().await { if ent.file_type().await.map_or(false, |t| t.is_file() || t.is_symlink()) { let osname = ent.file_name(); @@ -375,7 +378,7 @@ impl Database for FileDatabase { } async fn get_member(&self, network_id: NetworkId, node_id: Address) -> Result, Box> { - let mut member = Self::get_member_internal(&self.member_path(network_id, node_id)).await?; + let mut member = Self::load_object::(self.member_path(network_id, node_id).as_path()).await?; if let Some(member) = member.as_mut() { if member.network_id != network_id { // Also auto-update member network IDs, see get_network(). diff --git a/utils/src/lib.rs b/utils/src/lib.rs index 07cc68345..4c3d54eec 100644 --- a/utils/src/lib.rs +++ b/utils/src/lib.rs @@ -27,8 +27,8 @@ pub mod reaper; #[cfg(feature = "tokio")] pub use tokio; -/// A monotonic ticks value for "never happened" that should be lower than any initial value. -pub const NEVER_HAPPENED_TICKS: i64 = i64::MIN; +/// Initial value that should be used for monotonic tick time variables. +pub const NEVER_HAPPENED_TICKS: i64 = 0; /// Get milliseconds since unix epoch. #[inline]