diff --git a/rust-zerotier-service/src/log.rs b/rust-zerotier-service/src/log.rs index 7fffc3c7c..6a6643b16 100644 --- a/rust-zerotier-service/src/log.rs +++ b/rust-zerotier-service/src/log.rs @@ -18,6 +18,7 @@ use std::io::{Write, Seek, SeekFrom}; use std::cell::Cell; use zerotier_core::PortableAtomicI64; use chrono::Datelike; +use std::fmt::Display; pub struct Log { prefix: String, @@ -40,15 +41,15 @@ impl Log { path: String::from(path), file: Mutex::new(Cell::new(None)), cur_size: PortableAtomicI64::new(0), - max_size: AtomicUsize::new(if max_size < MIN_MAX_SIZE { MIN_MAX_SIZE } else { max_size }), + max_size: AtomicUsize::new(if max_size < Log::MIN_MAX_SIZE { Log::MIN_MAX_SIZE } else { max_size }), } } pub fn set_max_size(&self, new_max_size: usize) { - self.max_size.store(if new_max_size < MIN_MAX_SIZE { MIN_MAX_SIZE } else { new_max_size },Ordering::Relaxed); + self.max_size.store(if new_max_size < Log::MIN_MAX_SIZE { Log::MIN_MAX_SIZE } else { new_max_size },Ordering::Relaxed); } - pub fn log(&self, s: &S) { + pub fn log(&self, s: &S) { let mut fc = self.file.lock().unwrap(); let max_size = self.max_size.load(Ordering::Relaxed); @@ -74,15 +75,16 @@ impl Log { if eof.is_err() { return; } - cur_size = eof.unwrap() as i64; + self.cur_size.set(eof.unwrap() as i64); fc.replace(Some(f)); } let mut f = fc.get_mut().as_mut().unwrap(); let now_str = chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string(); - writeln!("{}[{}] {}", self.prefix.as_str(), now_str.as_str(), s); + let log_line = format!("{}[{}] {}\n", self.prefix.as_str(), now_str.as_str(), s); + let _ = f.write_all(log_line.as_bytes()); let _ = f.flush(); - self.cur_size.fetch_add((now_str_b.len() + sb.len() + 1) as i64); + self.cur_size.fetch_add(log_line.len() as i64); } } diff --git a/rust-zerotier-service/src/main.rs b/rust-zerotier-service/src/main.rs index b3ebe514b..219b8a64b 100644 --- a/rust-zerotier-service/src/main.rs +++ b/rust-zerotier-service/src/main.rs @@ -27,6 +27,8 @@ use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use std::time::Duration; +use futures::stream::{self, StreamExt}; + use warp::Filter; use zerotier_core::*; @@ -36,6 +38,7 @@ use crate::localconfig::*; use crate::log::Log; use crate::physicallink::PhysicalLink; use crate::network::Network; +use futures::TryFutureExt; pub struct ServiceEventHandler {} @@ -85,34 +88,43 @@ fn main() { let mut udp_sockets: BTreeMap> = BTreeMap::new(); let handler: Arc = Arc::new(ServiceEventHandler{}); let run: AtomicBool = AtomicBool::new(true); + let (mut interrupt_tx, mut interrupt_rx) = futures::channel::mpsc::channel::(2); + + // + // The inner loop periodically updates UDP socket bindings and does other housekeeping, but + // otherwise does nothing. If it detects that the primary port has changed, it breaks and + // causes the outer loop to run which reboots the HTTP server. If the 'run' flag is set + // to false this causes a break of both loops which terminates the service. + // loop { let mut warp_server_port = local_config.settings.primary_port; + + let root = warp::path::end().map(|| { warp::reply::with_status("not found", warp::hyper::StatusCode::NOT_FOUND) }); + let status = warp::path("status").map(|| { "status" }); + let network = warp::path!("network" / String).map(|nwid_str| { "network" }); + let peer = warp::path!("peer" / String).map(|peer_str| { "peer" }); + + let (shutdown_tx, shutdown_rx) = futures::channel::oneshot::channel(); + let (_, warp_server) = warp::serve(warp::any().and( + root + .or(status) + .or(network) + .or(peer) + )).bind_with_graceful_shutdown((inaddr_v6_any, warp_server_port), async { + let _ = shutdown_rx.await; + }); + let warp_server = tokio::spawn(warp_server); + + let mut loop_delay = 10; loop { - let root = warp::path::end().map(|| { warp::reply::with_status("not found", warp::hyper::StatusCode::NOT_FOUND) }); - let status = warp::path("status").map(|| { "status" }); - let network = warp::path!("network" / String).map(|nwid_str| { "network" }); - let peer = warp::path!("peer" / String).map(|peer_str| { "peer" }); + let _ = tokio::time::timeout(Duration::from_secs(loop_delay), interrupt_rx.next()); - let (shutdown_tx, shutdown_rx) = futures::channel::oneshot::channel(); - let (_, warp_server) = warp::serve(warp::any().and( - root - .or(status) - .or(network) - .or(peer) - )).bind_with_graceful_shutdown((inaddr_v6_any, warp_server_port), async { - let _ = shutdown_rx.await; - }); - let warp_server = tokio::spawn(warp_server); - - tokio::time::sleep(Duration::from_secs(10)).await; - - // Diff system addresses against currently bound UDP sockets and update as needed. - // Also check interface prefix blacklists. + // Enumerate physical addresses on the system, creating a map with an entry for + // the primary_port and another for the secondary_port if bound. let mut system_addrs: BTreeMap> = BTreeMap::new(); PhysicalLink::map(|link: PhysicalLink| { if !local_config.settings.is_interface_blacklisted(link.device.as_str()) { - // Add two entries to system_addrs: one for primary port, and one for secondary if enabled. let l = Rc::new(link); let mut a = l.address.clone(); a.set_port(local_config.settings.primary_port); @@ -124,6 +136,7 @@ fn main() { } } }); + let mut udp_sockets_to_close: Vec = Vec::new(); for sock in udp_sockets.iter() { if !system_addrs.contains_key(sock.0) { @@ -133,6 +146,7 @@ fn main() { for k in udp_sockets_to_close.iter() { udp_sockets.remove(k); } + for addr in system_addrs.iter() { if !udp_sockets.contains_key(addr.0) { let s = FastUDPSocket::new(addr.1.device.as_str(), addr.0, &handler); @@ -142,13 +156,13 @@ fn main() { } } - // Breaking the inner loop causes the HTTP server to recycle, or may exit entirely if run is false. if local_config.settings.primary_port != warp_server_port || !run.load(Ordering::Relaxed) { let _ = shutdown_tx.send(()); let _ = warp_server.await; break; } } + tokio::time::sleep(Duration::from_millis(250)).await; if !run.load(Ordering::Relaxed) { break; diff --git a/rust-zerotier-service/src/physicallink.rs b/rust-zerotier-service/src/physicallink.rs index 0a0108bd3..96914ae52 100644 --- a/rust-zerotier-service/src/physicallink.rs +++ b/rust-zerotier-service/src/physicallink.rs @@ -16,7 +16,6 @@ use std::ffi::CStr; use std::ptr::{null_mut, copy_nonoverlapping}; use std::mem::size_of; -#[derive(Clone)] pub struct PhysicalLink { pub address: InetAddress, pub device: String diff --git a/rust-zerotier-service/src/store.rs b/rust-zerotier-service/src/store.rs index 780ee7d34..d8f7c47da 100644 --- a/rust-zerotier-service/src/store.rs +++ b/rust-zerotier-service/src/store.rs @@ -34,7 +34,7 @@ impl Store { return Err(std::io::Error::new(std::io::ErrorKind::PermissionDenied, "base path does not exist or is not writable")); } Ok(Store{ - base_path: bp.into_path_buf().into_boxed_path(), + base_path: bp.to_path_buf().into_boxed_path(), peers_path: bp.join("peers.d").into_boxed_path(), controller_path: bp.join("controller.d").into_boxed_path(), networks_path: bp.join("networks.d").into_boxed_path(), @@ -42,13 +42,13 @@ impl Store { }) } - pub fn make_obj_path(&self, obj_type: StateObjectType, obj_id: &[u64]) -> Result { - Ok(match obj_type { + fn make_obj_path(&self, obj_type: StateObjectType, obj_id: &[u64]) -> Option { + Some(match obj_type { StateObjectType::IdentityPublic => self.base_path.join("identity.public"), StateObjectType::IdentitySecret => self.base_path.join("identity.secret"), StateObjectType::Certificate => { if obj_id.len() < 6 { - return Err(std::io::Error(std::io::ErrorKind::NotFound)); + return None; } self.certs_path.join(format!("{:0>16x}{:0>16x}{:0>16x}{:0>16x}{:0>16x}{:0>16x}.cert",obj_id[0],obj_id[1],obj_id[2],obj_id[3],obj_id[4],obj_id[5])) }, @@ -56,44 +56,52 @@ impl Store { StateObjectType::Locator => self.base_path.join("locator"), StateObjectType::NetworkConfig => { if obj_id.len() < 1 { - return Err(std::io::Error(std::io::ErrorKind::NotFound)); + return None; } self.networks_path.join(format!("{:0>16x}.conf", obj_id[0])) }, StateObjectType::Peer => { if obj_id.len() < 1 { - return Err(std::io::Error(std::io::ErrorKind::NotFound)); + return None; } self.peers_path.join(format!("{:0>10x}.peer", obj_id[0])) } }) } - pub fn load(&self, obj_type: StateObjectType, obj_id: &[u64]) -> Result, std::io::Error> { - let obj_path = self.make_obj_path(obj_type, obj_id)?; - let fmd = obj_path.metadata()?; - if fmd.is_file() { - let flen = fmd.len(); - if flen <= Store::MAX_OBJECT_SIZE as u64 { - let mut f = std::fs::File::open(obj_path)?; - let mut buf: Vec = Vec::new(); - buf.reserve(flen as usize); - let rs = f.read_to_end(&mut buf)?; - buf.resize(rs as usize, 0); - return Ok(buf.into_boxed_slice()); + pub fn load(&self, obj_type: StateObjectType, obj_id: &[u64]) -> std::io::Result> { + let obj_path = self.make_obj_path(obj_type, obj_id); + if obj_path.is_some() { + let obj_path = obj_path.unwrap(); + let fmd = obj_path.metadata()?; + if fmd.is_file() { + let flen = fmd.len(); + if flen <= Store::MAX_OBJECT_SIZE as u64 { + let mut f = std::fs::File::open(obj_path)?; + let mut buf: Vec = Vec::new(); + buf.reserve(flen as usize); + let rs = f.read_to_end(&mut buf)?; + buf.resize(rs as usize, 0); + return Ok(buf.into_boxed_slice()); + } } } - Err(std::io::Error(std::io::ErrorKind::NotFound)) + Err(std::io::Error::new(std::io::ErrorKind::NotFound, "does not exist or is not readable")) } pub fn erase(&self, obj_type: StateObjectType, obj_id: &[u64]) { let obj_path = self.make_obj_path(obj_type, obj_id); - if obj_path.is_ok() { + if obj_path.is_some() { let _ = std::fs::remove_file(obj_path.unwrap()); } } - pub fn store(&self, obj_type: StateObjectType, obj_id: &[u64], obj_data: &[u8]) -> Result<(), std::io::Error> { - std::fs::OpenOptions::new().write(true).truncate(true).create(true).open(self.make_obj_path(obj_type, obj_id)?)?.write_all(obj_data) + pub fn store(&self, obj_type: StateObjectType, obj_id: &[u64], obj_data: &[u8]) -> std::io::Result<()> { + let obj_path = self.make_obj_path(obj_type, obj_id); + if obj_path.is_some() { + std::fs::OpenOptions::new().write(true).truncate(true).create(true).open(obj_path.unwrap())?.write_all(obj_data) + } else { + Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, "object ID not valid")) + } } }