Stub out change watcher and simplify it.

This commit is contained in:
Adam Ierymenko 2022-10-23 14:52:22 -07:00
parent d07d146260
commit 862710f553
No known key found for this signature in database
GPG key ID: C8877CF2D7A5D7F3
5 changed files with 53 additions and 15 deletions

View file

@ -17,6 +17,7 @@ serde = { version = "^1", features = ["derive"], default-features = false }
serde_json = { version = "^1", features = ["std"], default-features = false } serde_json = { version = "^1", features = ["std"], default-features = false }
serde_yaml = "^0" serde_yaml = "^0"
clap = { version = "^3", features = ["std", "suggestions"], default-features = false } clap = { version = "^3", features = ["std", "suggestions"], default-features = false }
notify = { version = "^5", features = ["macos_fsevent"], default-features = false }
[target."cfg(not(windows))".dependencies] [target."cfg(not(windows))".dependencies]
signal-hook = "^0" signal-hook = "^0"

View file

@ -7,14 +7,12 @@ use zerotier_utils::tokio::sync::broadcast::Receiver;
use crate::model::*; use crate::model::*;
/// Database change relevant to the controller and that was NOT initiated by the controller.
#[derive(Clone)] #[derive(Clone)]
pub enum Change { pub enum Change {
NetworkCreated(Network), NetworkDeleted(NetworkId),
NetworkDeleted(Network), MemberAuthorized(NetworkId, Address),
NetworkChanged(Network, Network), MemberDeauthorized(NetworkId, Address),
MemberCreated(Member),
MemberDeleted(Member),
MemberChanged(Member, Member),
} }
#[async_trait] #[async_trait]

View file

@ -3,16 +3,20 @@ use std::fmt::Display;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::str::FromStr; use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use async_trait::async_trait; use async_trait::async_trait;
use notify::{RecursiveMode, Watcher};
use zerotier_network_hypervisor::vl1::{Address, Identity, NodeStorage}; use zerotier_network_hypervisor::vl1::{Address, Identity, NodeStorage};
use zerotier_network_hypervisor::vl2::NetworkId; use zerotier_network_hypervisor::vl2::NetworkId;
use zerotier_utils::io::{fs_restrict_permissions, read_limit}; use zerotier_utils::io::{fs_restrict_permissions, read_limit};
use zerotier_utils::tokio::fs;
use crate::database::Database; use zerotier_utils::tokio::fs;
use zerotier_utils::tokio::sync::broadcast::{channel, Receiver, Sender};
use crate::database::{Change, Database};
use crate::model::*; use crate::model::*;
const IDENTITY_SECRET_FILENAME: &'static str = "identity.secret"; const IDENTITY_SECRET_FILENAME: &'static str = "identity.secret";
@ -54,15 +58,42 @@ impl From<zerotier_utils::tokio::io::Error> for FileDatabaseError {
pub struct FileDatabase { pub struct FileDatabase {
base_path: PathBuf, base_path: PathBuf,
controller_address: AtomicU64, controller_address: AtomicU64,
change_sender: Arc<Sender<Change>>,
watcher: Mutex<Box<dyn Watcher + Send>>,
} }
// TODO: should cache at least hashes and detect changes in the filesystem live. // TODO: should cache at least hashes and detect changes in the filesystem live.
impl FileDatabase { impl FileDatabase {
pub async fn new<P: AsRef<Path>>(base_path: P) -> Result<Self, Box<dyn Error>> { pub async fn new<P: AsRef<Path>>(base_path: P) -> Result<Self, Box<dyn Error>> {
let base: PathBuf = base_path.as_ref().into(); let base_path: PathBuf = base_path.as_ref().into();
let _ = fs::create_dir_all(&base).await?; let _ = fs::create_dir_all(&base_path).await?;
Ok(Self { base_path: base, controller_address: AtomicU64::new(0) })
let (change_sender, _) = channel(256);
let change_sender = Arc::new(change_sender);
let sender = Arc::downgrade(&change_sender);
let mut watcher: Box<dyn Watcher + Send> =
Box::new(notify::recommended_watcher(move |event: notify::Result<notify::event::Event>| {
if let Ok(event) = event {
if let Some(_sender) = sender.upgrade() {
// TODO
match event.kind {
notify::EventKind::Modify(_) => {}
notify::EventKind::Remove(_) => {}
_ => {}
}
}
}
})?);
watcher.watch(&base_path, RecursiveMode::Recursive)?;
Ok(Self {
base_path: base_path.clone(),
controller_address: AtomicU64::new(0),
change_sender,
watcher: Mutex::new(watcher),
})
} }
fn get_controller_address(&self) -> Option<Address> { fn get_controller_address(&self) -> Option<Address> {
@ -90,6 +121,12 @@ impl FileDatabase {
} }
} }
impl Drop for FileDatabase {
fn drop(&mut self) {
let _ = self.watcher.lock().unwrap().unwatch(&self.base_path);
}
}
impl NodeStorage for FileDatabase { impl NodeStorage for FileDatabase {
fn load_node_identity(&self) -> Option<Identity> { fn load_node_identity(&self) -> Option<Identity> {
let id_data = read_limit(self.base_path.join(IDENTITY_SECRET_FILENAME), 16384); let id_data = read_limit(self.base_path.join(IDENTITY_SECRET_FILENAME), 16384);
@ -178,6 +215,10 @@ impl Database for FileDatabase {
Ok(()) Ok(())
} }
async fn changes(&self) -> Option<Receiver<Change>> {
Some(self.change_sender.subscribe())
}
async fn log_request(&self, obj: RequestLogItem) -> Result<(), Self::Error> { async fn log_request(&self, obj: RequestLogItem) -> Result<(), Self::Error> {
println!("{}", obj.to_string()); println!("{}", obj.to_string());
Ok(()) Ok(())

View file

@ -295,12 +295,11 @@ impl<DatabaseImpl: Database> Inner<DatabaseImpl> {
Ok(()) Ok(())
}, },
); );
// TODO: log errors
} }
} }
async fn handle_change_notification(self: Arc<Self>, _change: Change) { async fn handle_change_notification(self: Arc<Self>, change: Change) {}
todo!()
}
async fn handle_network_config_request<HostSystemImpl: HostSystem + ?Sized>( async fn handle_network_config_request<HostSystemImpl: HostSystem + ?Sized>(
self: &Arc<Self>, self: &Arc<Self>,

View file

@ -1,7 +1,6 @@
// (c) 2020-2022 ZeroTier, Inc. -- currently propritery pending actual release and licensing. See LICENSE.md. // (c) 2020-2022 ZeroTier, Inc. -- currently propritery pending actual release and licensing. See LICENSE.md.
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::hash::Hash; use std::hash::Hash;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};