From 862710f553d182aaa0fac908c1294d255940620a Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Sun, 23 Oct 2022 14:52:22 -0700 Subject: [PATCH] Stub out change watcher and simplify it. --- controller/Cargo.toml | 1 + controller/src/database.rs | 10 +++---- controller/src/filedatabase.rs | 51 +++++++++++++++++++++++++++++---- controller/src/handler.rs | 5 ++-- controller/src/model/network.rs | 1 - 5 files changed, 53 insertions(+), 15 deletions(-) diff --git a/controller/Cargo.toml b/controller/Cargo.toml index 1915f3cb8..3406a81ff 100644 --- a/controller/Cargo.toml +++ b/controller/Cargo.toml @@ -17,6 +17,7 @@ serde = { version = "^1", features = ["derive"], default-features = false } serde_json = { version = "^1", features = ["std"], default-features = false } serde_yaml = "^0" clap = { version = "^3", features = ["std", "suggestions"], default-features = false } +notify = { version = "^5", features = ["macos_fsevent"], default-features = false } [target."cfg(not(windows))".dependencies] signal-hook = "^0" diff --git a/controller/src/database.rs b/controller/src/database.rs index f5ee9dd6f..b12237800 100644 --- a/controller/src/database.rs +++ b/controller/src/database.rs @@ -7,14 +7,12 @@ use zerotier_utils::tokio::sync::broadcast::Receiver; use crate::model::*; +/// Database change relevant to the controller and that was NOT initiated by the controller. #[derive(Clone)] pub enum Change { - NetworkCreated(Network), - NetworkDeleted(Network), - NetworkChanged(Network, Network), - MemberCreated(Member), - MemberDeleted(Member), - MemberChanged(Member, Member), + NetworkDeleted(NetworkId), + MemberAuthorized(NetworkId, Address), + MemberDeauthorized(NetworkId, Address), } #[async_trait] diff --git a/controller/src/filedatabase.rs b/controller/src/filedatabase.rs index 9d1860512..fae0ea12c 100644 --- a/controller/src/filedatabase.rs +++ b/controller/src/filedatabase.rs @@ -3,16 +3,20 @@ use std::fmt::Display; use std::path::{Path, PathBuf}; use std::str::FromStr; use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::{Arc, Mutex}; use async_trait::async_trait; +use notify::{RecursiveMode, Watcher}; use zerotier_network_hypervisor::vl1::{Address, Identity, NodeStorage}; use zerotier_network_hypervisor::vl2::NetworkId; use zerotier_utils::io::{fs_restrict_permissions, read_limit}; -use zerotier_utils::tokio::fs; -use crate::database::Database; +use zerotier_utils::tokio::fs; +use zerotier_utils::tokio::sync::broadcast::{channel, Receiver, Sender}; + +use crate::database::{Change, Database}; use crate::model::*; const IDENTITY_SECRET_FILENAME: &'static str = "identity.secret"; @@ -54,15 +58,42 @@ impl From for FileDatabaseError { pub struct FileDatabase { base_path: PathBuf, controller_address: AtomicU64, + change_sender: Arc>, + watcher: Mutex>, } // TODO: should cache at least hashes and detect changes in the filesystem live. impl FileDatabase { pub async fn new>(base_path: P) -> Result> { - let base: PathBuf = base_path.as_ref().into(); - let _ = fs::create_dir_all(&base).await?; - Ok(Self { base_path: base, controller_address: AtomicU64::new(0) }) + let base_path: PathBuf = base_path.as_ref().into(); + let _ = fs::create_dir_all(&base_path).await?; + + let (change_sender, _) = channel(256); + let change_sender = Arc::new(change_sender); + + let sender = Arc::downgrade(&change_sender); + let mut watcher: Box = + Box::new(notify::recommended_watcher(move |event: notify::Result| { + if let Ok(event) = event { + if let Some(_sender) = sender.upgrade() { + // TODO + match event.kind { + notify::EventKind::Modify(_) => {} + notify::EventKind::Remove(_) => {} + _ => {} + } + } + } + })?); + watcher.watch(&base_path, RecursiveMode::Recursive)?; + + Ok(Self { + base_path: base_path.clone(), + controller_address: AtomicU64::new(0), + change_sender, + watcher: Mutex::new(watcher), + }) } fn get_controller_address(&self) -> Option
{ @@ -90,6 +121,12 @@ impl FileDatabase { } } +impl Drop for FileDatabase { + fn drop(&mut self) { + let _ = self.watcher.lock().unwrap().unwatch(&self.base_path); + } +} + impl NodeStorage for FileDatabase { fn load_node_identity(&self) -> Option { let id_data = read_limit(self.base_path.join(IDENTITY_SECRET_FILENAME), 16384); @@ -178,6 +215,10 @@ impl Database for FileDatabase { Ok(()) } + async fn changes(&self) -> Option> { + Some(self.change_sender.subscribe()) + } + async fn log_request(&self, obj: RequestLogItem) -> Result<(), Self::Error> { println!("{}", obj.to_string()); Ok(()) diff --git a/controller/src/handler.rs b/controller/src/handler.rs index 53a19ef5f..3578e0f09 100644 --- a/controller/src/handler.rs +++ b/controller/src/handler.rs @@ -295,12 +295,11 @@ impl Inner { Ok(()) }, ); + // TODO: log errors } } - async fn handle_change_notification(self: Arc, _change: Change) { - todo!() - } + async fn handle_change_notification(self: Arc, change: Change) {} async fn handle_network_config_request( self: &Arc, diff --git a/controller/src/model/network.rs b/controller/src/model/network.rs index eb03048d1..9ef3e6cb1 100644 --- a/controller/src/model/network.rs +++ b/controller/src/model/network.rs @@ -1,7 +1,6 @@ // (c) 2020-2022 ZeroTier, Inc. -- currently propritery pending actual release and licensing. See LICENSE.md. use std::collections::{HashMap, HashSet}; -use std::error::Error; use std::hash::Hash; use serde::{Deserialize, Serialize};