FileDB and timing fixes.

This commit is contained in:
Adam Ierymenko 2022-11-11 18:23:45 -05:00
parent 6bf978d4de
commit 5d9022f815
2 changed files with 131 additions and 128 deletions

View file

@ -6,6 +6,7 @@ use std::sync::{Arc, Mutex, Weak};
use async_trait::async_trait; use async_trait::async_trait;
use notify::{RecursiveMode, Watcher}; use notify::{RecursiveMode, Watcher};
use serde::de::DeserializeOwned;
use zerotier_network_hypervisor::vl1::{Address, Identity, NodeStorage}; use zerotier_network_hypervisor::vl1::{Address, Identity, NodeStorage};
use zerotier_network_hypervisor::vl2::NetworkId; use zerotier_network_hypervisor::vl2::NetworkId;
@ -66,6 +67,8 @@ impl FileDatabase {
let mut watcher = notify::recommended_watcher(move |event: notify::Result<notify::event::Event>| { let mut watcher = notify::recommended_watcher(move |event: notify::Result<notify::event::Event>| {
if let Ok(event) = event { if let Ok(event) = event {
match event.kind {
notify::EventKind::Create(_) | notify::EventKind::Modify(_) | notify::EventKind::Remove(_) => {
if let Some(db) = db_weak.lock().unwrap().upgrade() { if let Some(db) = db_weak.lock().unwrap().upgrade() {
if let Some(controller_address) = db.get_controller_address() { if let Some(controller_address) = db.get_controller_address() {
db.clone().tasks.add( db.clone().tasks.add(
@ -121,7 +124,9 @@ impl FileDatabase {
println!("DELETED: {}", deleted.unwrap().as_os_str().to_string_lossy()); println!("DELETED: {}", deleted.unwrap().as_os_str().to_string_lossy());
match record_type { match record_type {
RecordType::Network => { RecordType::Network => {
if let Some((network, mut members)) = db.cache.on_network_deleted(network_id) { if let Some((network, mut members)) =
db.cache.on_network_deleted(network_id)
{
for m in members.drain(..) { for m in members.drain(..) {
let _ = db.change_sender.send(Change::MemberDeleted(m)); let _ = db.change_sender.send(Change::MemberDeleted(m));
} }
@ -130,7 +135,9 @@ impl FileDatabase {
} }
RecordType::Member => { RecordType::Member => {
if let Some(node_id) = node_id { if let Some(node_id) = node_id {
if let Some(member) = db.cache.on_member_deleted(network_id, node_id) { if let Some(member) =
db.cache.on_member_deleted(network_id, node_id)
{
let _ = db.change_sender.send(Change::MemberDeleted(member)); let _ = db.change_sender.send(Change::MemberDeleted(member));
} }
} }
@ -143,7 +150,9 @@ impl FileDatabase {
println!("CHANGED: {}", changed.as_os_str().to_string_lossy()); println!("CHANGED: {}", changed.as_os_str().to_string_lossy());
match record_type { match record_type {
RecordType::Network => { RecordType::Network => {
if let Ok(Some(new_network)) = Self::get_network_internal(changed).await { if let Ok(Some(new_network)) =
Self::load_object::<Network>(changed).await
{
match db.cache.on_network_updated(new_network.clone()) { match db.cache.on_network_updated(new_network.clone()) {
(true, Some(old_network)) => { (true, Some(old_network)) => {
let _ = db let _ = db
@ -151,14 +160,17 @@ impl FileDatabase {
.send(Change::NetworkChanged(old_network, new_network)); .send(Change::NetworkChanged(old_network, new_network));
} }
(true, None) => { (true, None) => {
let _ = db.change_sender.send(Change::NetworkCreated(new_network)); let _ = db
.change_sender
.send(Change::NetworkCreated(new_network));
} }
_ => {} _ => {}
} }
} }
} }
RecordType::Member => { RecordType::Member => {
if let Ok(Some(new_member)) = Self::get_member_internal(changed).await { if let Ok(Some(new_member)) = Self::load_object::<Member>(changed).await
{
match db.cache.on_member_updated(new_member.clone()) { match db.cache.on_member_updated(new_member.clone()) {
(true, Some(old_member)) => { (true, Some(old_member)) => {
let _ = db let _ = db
@ -166,7 +178,9 @@ impl FileDatabase {
.send(Change::MemberChanged(old_member, new_member)); .send(Change::MemberChanged(old_member, new_member));
} }
(true, None) => { (true, None) => {
let _ = db.change_sender.send(Change::MemberCreated(new_member)); let _ = db
.change_sender
.send(Change::MemberCreated(new_member));
} }
_ => {} _ => {}
} }
@ -183,6 +197,9 @@ impl FileDatabase {
} }
} }
} }
_ => {}
}
}
}) })
.expect("FATAL: unable to start filesystem change listener"); .expect("FATAL: unable to start filesystem change listener");
let _ = watcher.configure( let _ = watcher.configure(
@ -202,7 +219,7 @@ impl FileDatabase {
}); });
db.cache.load_all(db.as_ref()).await?; db.cache.load_all(db.as_ref()).await?;
*db_weak_tmp.lock().unwrap() = Arc::downgrade(&db); // this kicks off watcher task too *db_weak_tmp.lock().unwrap() = Arc::downgrade(&db); // this starts the daemon tasks and starts watching for file changes
Ok(db) Ok(db)
} }
@ -231,40 +248,27 @@ impl FileDatabase {
.join(format!("M{}.yaml", member_id.to_string())) .join(format!("M{}.yaml", member_id.to_string()))
} }
async fn get_network_internal<P: AsRef<Path>>(path: P) -> Result<Option<Network>, Box<dyn Error + Send + Sync>> { async fn load_object<O: DeserializeOwned>(path: &Path) -> Result<Option<O>, Box<dyn Error + Send + Sync>> {
let r = fs::read(path).await; if let Ok(raw) = fs::read(path).await {
if let Ok(raw) = r { return Ok(Some(serde_yaml::from_slice::<O>(raw.as_slice())?));
return Ok(Some(serde_yaml::from_slice::<Network>(raw.as_slice())?));
} else { } else {
return Ok(None); return Ok(None);
} }
} }
async fn get_member_internal<P: AsRef<Path>>(path: P) -> Result<Option<Member>, Box<dyn Error + Send + Sync>> {
let r = fs::read(path).await;
if let Ok(raw) = r {
Ok(Some(serde_yaml::from_slice::<Member>(raw.as_slice())?))
} else {
Ok(None)
}
}
/// Get record type and also the number after it: network number or address. /// Get record type and also the number after it: network number or address.
fn record_type_from_path<P: AsRef<Path>>(controller_address: Address, pp: P) -> Option<(RecordType, NetworkId, Option<Address>)> { fn record_type_from_path(controller_address: Address, p: &Path) -> Option<(RecordType, NetworkId, Option<Address>)> {
let p: &Path = pp.as_ref(); let parent = p.parent()?.to_string_lossy();
if p.parent().map_or(false, |p| p.starts_with("N") && p.as_os_str().len() == 7) { if parent.len() == 7 && (parent.starts_with("N") || parent.starts_with('n')) {
let network_id = NetworkId::from_controller_and_network_no( let network_id = NetworkId::from_controller_and_network_no(controller_address, u64::from_str_radix(&parent[1..], 16).ok()?)?;
controller_address,
u64::from_str_radix(&p.parent().unwrap().to_str().unwrap()[1..], 16).unwrap_or(0),
)?;
if let Some(file_name) = p.file_name().map(|p| p.to_string_lossy().to_lowercase()) { if let Some(file_name) = p.file_name().map(|p| p.to_string_lossy().to_lowercase()) {
if file_name.eq("config.yaml") { if file_name.eq("config.yaml") {
return Some((RecordType::Network, network_id, None)); return Some((RecordType::Network, network_id, None));
} else if file_name.len() == 16 && file_name.starts_with("m") { } else if file_name.len() == 16 && file_name.starts_with("m") && file_name.ends_with(".yaml") {
return Some(( return Some((
RecordType::Member, RecordType::Member,
network_id, network_id,
Some(Address::from_u64(u64::from_str_radix(&file_name.as_str()[1..], 16).unwrap_or(0))?), Some(Address::from_u64(u64::from_str_radix(&file_name.as_str()[1..11], 16).unwrap_or(0))?),
)); ));
} }
} }
@ -327,9 +331,8 @@ impl Database for FileDatabase {
Ok(networks) Ok(networks)
} }
#[inline]
async fn get_network(&self, id: NetworkId) -> Result<Option<Network>, Box<dyn Error + Send + Sync>> { async fn get_network(&self, id: NetworkId) -> Result<Option<Network>, Box<dyn Error + Send + Sync>> {
let mut network = Self::get_network_internal(&self.network_path(id)).await?; let mut network = Self::load_object::<Network>(self.network_path(id).as_path()).await?;
if let Some(network) = network.as_mut() { if let Some(network) = network.as_mut() {
// FileDatabase stores networks by their "network number" and automatically adapts their IDs // FileDatabase stores networks by their "network number" and automatically adapts their IDs
// if the controller's identity changes. This is done to make it easy to just clone networks, // if the controller's identity changes. This is done to make it easy to just clone networks,
@ -354,7 +357,7 @@ impl Database for FileDatabase {
async fn list_members(&self, network_id: NetworkId) -> Result<Vec<Address>, Box<dyn Error + Send + Sync>> { async fn list_members(&self, network_id: NetworkId) -> Result<Vec<Address>, Box<dyn Error + Send + Sync>> {
let mut members = Vec::new(); let mut members = Vec::new();
let mut dir = fs::read_dir(self.base_path.join(network_id.to_string())).await?; let mut dir = fs::read_dir(self.base_path.join(format!("N{:06x}", network_id.network_no()))).await?;
while let Ok(Some(ent)) = dir.next_entry().await { while let Ok(Some(ent)) = dir.next_entry().await {
if ent.file_type().await.map_or(false, |t| t.is_file() || t.is_symlink()) { if ent.file_type().await.map_or(false, |t| t.is_file() || t.is_symlink()) {
let osname = ent.file_name(); let osname = ent.file_name();
@ -375,7 +378,7 @@ impl Database for FileDatabase {
} }
async fn get_member(&self, network_id: NetworkId, node_id: Address) -> Result<Option<Member>, Box<dyn Error + Send + Sync>> { async fn get_member(&self, network_id: NetworkId, node_id: Address) -> Result<Option<Member>, Box<dyn Error + Send + Sync>> {
let mut member = Self::get_member_internal(&self.member_path(network_id, node_id)).await?; let mut member = Self::load_object::<Member>(self.member_path(network_id, node_id).as_path()).await?;
if let Some(member) = member.as_mut() { if let Some(member) = member.as_mut() {
if member.network_id != network_id { if member.network_id != network_id {
// Also auto-update member network IDs, see get_network(). // Also auto-update member network IDs, see get_network().

View file

@ -27,8 +27,8 @@ pub mod reaper;
#[cfg(feature = "tokio")] #[cfg(feature = "tokio")]
pub use tokio; pub use tokio;
/// A monotonic ticks value for "never happened" that should be lower than any initial value. /// Initial value that should be used for monotonic tick time variables.
pub const NEVER_HAPPENED_TICKS: i64 = i64::MIN; pub const NEVER_HAPPENED_TICKS: i64 = 0;
/// Get milliseconds since unix epoch. /// Get milliseconds since unix epoch.
#[inline] #[inline]