diff --git a/osdep/rust-osdep.cpp b/osdep/rust-osdep.cpp index 96c2943a3..dcc7f80ca 100644 --- a/osdep/rust-osdep.cpp +++ b/osdep/rust-osdep.cpp @@ -10,6 +10,7 @@ struct prf_ra { #include "../core/Constants.hpp" #include "../core/Mutex.hpp" #include "../core/Containers.hpp" +#include "../core/SHA512.hpp" #include "OSUtils.hpp" #include "rust-osdep.h" @@ -116,4 +117,13 @@ int64_t msSinceEpoch() void lockDownFile(const char *path, int isDir) { ZeroTier::OSUtils::lockDownFile(path, isDir != 0); } +void getSecureRandom(void *buf, unsigned int len) +{ ZeroTier::Utils::getSecureRandom(buf, len); } + +void sha384(const void *in, unsigned int len, void *out) +{ ZeroTier::SHA384(out, in, len); } + +void sha512(const void *in, unsigned int len, void *out) +{ ZeroTier::SHA512(out, in, len); } + } diff --git a/osdep/rust-osdep.h b/osdep/rust-osdep.h index c0722acd4..7d8eeaae7 100644 --- a/osdep/rust-osdep.h +++ b/osdep/rust-osdep.h @@ -58,6 +58,9 @@ extern "C" { extern const char *platformDefaultHomePath(); extern int64_t msSinceEpoch(); extern void lockDownFile(const char *path, int isDir); +extern void getSecureRandom(void *buf, unsigned int len); +extern void sha384(const void *in, unsigned int len, void *out); +extern void sha512(const void *in, unsigned int len, void *out); #ifdef __cplusplus } diff --git a/rust-zerotier-core/src/lib.rs b/rust-zerotier-core/src/lib.rs index 1cd272d18..6fb00c7c5 100644 --- a/rust-zerotier-core/src/lib.rs +++ b/rust-zerotier-core/src/lib.rs @@ -165,6 +165,23 @@ pub enum ResultCode { ErrorInternalNonFatal = ztcore::ZT_ResultCode_ZT_RESULT_ERROR_INTERNAL as isize, } +impl ToString for ResultCode { + fn to_string(&self) -> String { + match *self { + ResultCode::Ok => "Ok", + ResultCode::FatalErrorOutOfMemory => "FatalErrorOutOfMemory", + ResultCode::FatalErrorDataStoreFailed => "FatalErrorDataStoreFailed", + ResultCode::FatalErrorInternal => "FatalErrorInternal", + ResultCode::ErrorNetworkNotFound => "ErrorNetworkNotFound", + ResultCode::ErrorUnsupportedOperation => "ErrorUnsupportedOperation", + ResultCode::ErrorBadParameter => "ErrorBadParameter", + ResultCode::ErrorInvalidCredential => "ErrorInvalidCredential", + ResultCode::ErrorCollidingObject => "ErrorCollidingObject", + ResultCode::ErrorInternalNonFatal => "ErrorInternalNonFatal", + }.to_string() + } +} + /// Returns a tuple of major, minor, revision, and build version numbers from the ZeroTier core. pub fn version() -> (i32, i32, i32, i32) { let mut major: c_int = 0; diff --git a/rust-zerotier-core/src/node.rs b/rust-zerotier-core/src/node.rs index b9c0c0724..ffbb047bd 100644 --- a/rust-zerotier-core/src/node.rs +++ b/rust-zerotier-core/src/node.rs @@ -11,15 +11,12 @@ */ /****/ -use std::cell::Cell; use std::collections::hash_map::HashMap; use std::intrinsics::copy_nonoverlapping; use std::mem::{MaybeUninit, transmute}; use std::os::raw::{c_int, c_uint, c_ulong, c_void}; use std::ptr::{null_mut, slice_from_raw_parts}; use std::sync::*; -use std::sync::atomic::*; -use std::time::Duration; use num_traits::FromPrimitive; use serde::{Deserialize, Serialize}; diff --git a/rust-zerotier-service/src/commands/mod.rs b/rust-zerotier-service/src/commands/mod.rs index 8116efd57..d48f3d1cb 100644 --- a/rust-zerotier-service/src/commands/mod.rs +++ b/rust-zerotier-service/src/commands/mod.rs @@ -11,7 +11,6 @@ */ /****/ -pub(crate) mod service; pub(crate) mod status; pub(crate) mod set; pub(crate) mod peer; diff --git a/rust-zerotier-service/src/commands/service.rs b/rust-zerotier-service/src/commands/service.rs deleted file mode 100644 index 49ad55ba7..000000000 --- a/rust-zerotier-service/src/commands/service.rs +++ /dev/null @@ -1,284 +0,0 @@ -/* - * Copyright (c)2013-2020 ZeroTier, Inc. - * - * Use of this software is governed by the Business Source License included - * in the LICENSE.TXT file in the project's root directory. - * - * Change Date: 2025-01-01 - * - * On the date above, in accordance with the Business Source License, use - * of this software will be governed by version 2.0 of the Apache License. - */ -/****/ - -use std::cell::Cell; -use std::collections::BTreeMap; -use std::net::IpAddr; -use std::rc::Rc; -use std::str::FromStr; -use std::sync::{Arc, Mutex}; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::time::Duration; - -use futures::stream::{self, StreamExt}; -use warp::{Filter, Rejection, Reply}; -use warp::http::{HeaderMap, Method, StatusCode}; -use warp::hyper::body::Bytes; - -use zerotier_core::{Buffer, Address, IpScope, Node, NodeEventHandler, NetworkId, VirtualNetworkConfigOperation, VirtualNetworkConfig, StateObjectType, MAC, Event, InetAddress, InetAddressFamily, Identity}; - -use crate::fastudpsocket::*; -use crate::getifaddrs; -use crate::localconfig::*; -use crate::log::Log; -use crate::network::Network; -use crate::store::Store; - -// Check local addresses and bindings every (this) milliseconds. -const BINDING_CHECK_INTERVAL: i64 = 5000; - -struct Service { - local_config: Mutex>, - run: Arc, - store: Arc, -} - -impl Clone for Service { - fn clone(&self) -> Self { - Service { - local_config: Mutex::new(self.local_config.lock().unwrap().clone()), - run: self.run.clone(), - store: self.store.clone(), - } - } -} - -impl NodeEventHandler for Service { - fn virtual_network_config(&self, network_id: NetworkId, network_obj: &Arc, config_op: VirtualNetworkConfigOperation, config: Option<&VirtualNetworkConfig>) { - } - - #[inline(always)] - fn virtual_network_frame(&self, network_id: NetworkId, network_obj: &Arc, source_mac: MAC, dest_mac: MAC, ethertype: u16, vlan_id: u16, data: &[u8]) { - } - - fn event(&self, event: Event, event_data: &[u8]) { - match event { - Event::Up => {}, - Event::Down => {}, - Event::Online => {}, - Event::Offline => {}, - Event::Trace => {}, - Event::UserMessage => {}, - } - } - - #[inline(always)] - fn state_put(&self, obj_type: StateObjectType, obj_id: &[u64], obj_data: &[u8]) -> std::io::Result<()> { - self.store.store_object(&obj_type, obj_id, obj_data) - } - - #[inline(always)] - fn state_get(&self, obj_type: StateObjectType, obj_id: &[u64]) -> std::io::Result> { - self.store.load_object(&obj_type, obj_id) - } - - #[inline(always)] - fn wire_packet_send(&self, local_socket: i64, sock_addr: &InetAddress, data: &[u8], packet_ttl: u32) -> i32 { - 0 - } - - fn path_check(&self, address: Address, id: &Identity, local_socket: i64, sock_addr: &InetAddress) -> bool { - true - } - - fn path_lookup(&self, address: Address, id: &Identity, desired_family: InetAddressFamily) -> Option { - let lc = self.get_local_config(); - let vc = lc.virtual_.get(&address); - vc.map_or(None, |c: &LocalConfigVirtualConfig| { - if c.try_.is_empty() { - None - } else { - let t = c.try_.get((zerotier_core::random() as usize) % c.try_.len()); - t.map_or(None, |v: &InetAddress| { - Some(v.clone()) - }) - } - }) - } -} - -impl Service { - #[inline(always)] - fn web_api_status(&self, method: Method, headers: HeaderMap, post_data: Bytes) -> Box { - Box::new(warp::http::StatusCode::BAD_REQUEST) - } - - #[inline(always)] - fn web_api_network(&self, network_str: String, method: Method, headers: HeaderMap, post_data: Bytes) -> Box { - Box::new(warp::http::StatusCode::BAD_REQUEST) - } - - #[inline(always)] - fn web_api_peer(&self, peer_str: String, method: Method, headers: HeaderMap, post_data: Bytes) -> Box { - Box::new(warp::http::StatusCode::BAD_REQUEST) - } - - #[inline(always)] - fn get_local_config(&self) -> Arc { - self.local_config.lock().unwrap().clone() - } - - #[inline(always)] - fn set_local_config(&self, new_lc: &Arc) { - let mut lc = self.local_config.lock().unwrap(); - *lc = new_lc.clone(); - } -} - -pub(crate) fn run(store: &Arc) -> i32 { - let mut process_exit_value: i32 = 0; - - let tokio_rt = tokio::runtime::Builder::new_current_thread().build().unwrap(); - tokio_rt.block_on(async { - let mut udp_sockets: BTreeMap = BTreeMap::new(); - let (mut interrupt_tx, mut interrupt_rx) = futures::channel::mpsc::channel::(2); - - let service = Service { - local_config: Mutex::new(Arc::new(LocalConfig::default())), - run: Arc::new(AtomicBool::new(true)), - store: store.clone(), - }; - - let node = Node::new(service.clone()); - if node.is_err() { - // TODO: log and handle error - return; - } - let node = Arc::new(node.ok().unwrap()); - - let mut primary_port_bind_failure = false; - let mut last_checked_bindings: i64 = 0; - let mut loop_delay = zerotier_core::NODE_BACKGROUND_TASKS_MAX_INTERVAL; - loop { - let mut current_local_config = service.get_local_config(); - - let (mut shutdown_tx, mut shutdown_rx) = futures::channel::oneshot::channel(); - let warp_server; - { - let s0 = service.clone(); - let s1 = service.clone(); - let s2 = service.clone(); - warp_server = warp::serve(warp::any().and(warp::path::end().map(|| { warp::reply::with_status("404", warp::hyper::StatusCode::NOT_FOUND) }) - .or(warp::path("status").and(warp::method()).and(warp::header::headers_cloned()).and(warp::body::bytes()) - .map(move |method: Method, headers: HeaderMap, post_data: Bytes| { s0.web_api_status(method, headers, post_data) })) - .or(warp::path!("network" / String).and(warp::method()).and(warp::header::headers_cloned()).and(warp::body::bytes()) - .map(move |network_str: String, method: Method, headers: HeaderMap, post_data: Bytes| { s1.web_api_network(network_str, method, headers, post_data) })) - .or(warp::path!("peer" / String).and(warp::method()).and(warp::header::headers_cloned()).and(warp::body::bytes()) - .map(move |peer_str: String, method: Method, headers: HeaderMap, post_data: Bytes| { s2.web_api_peer(peer_str, method, headers, post_data) })) - )).try_bind_with_graceful_shutdown((IpAddr::from([127_u8, 0_u8, 0_u8, 1_u8]), current_local_config.settings.primary_port), async { let _ = shutdown_rx.await; }); - } - if warp_server.is_err() { - primary_port_bind_failure = true; - break; - } - let warp_server = tokio_rt.spawn(warp_server.unwrap().1); - - loop { - tokio::select! { - _ = tokio::time::sleep(Duration::from_millis(loop_delay)) => {}, - _ = interrupt_rx.next() => {}, - _ = tokio::signal::ctrl_c() => { - // TODO: log CTRL+C received - service.run.store(false, Ordering::Relaxed); - let _ = shutdown_tx.send(()); - break; - } - } - - loop_delay = node.process_background_tasks(); - - let now = zerotier_core::now(); - if (now - last_checked_bindings) >= BINDING_CHECK_INTERVAL { - last_checked_bindings = now; - - let mut system_addrs: BTreeMap = BTreeMap::new(); - getifaddrs::for_each_address(|addr: &InetAddress, dev: &str| { - match addr.ip_scope() { - IpScope::Global | IpScope::Private | IpScope::PseudoPrivate | IpScope::Shared => { - if !current_local_config.settings.is_interface_blacklisted(dev) { - let mut a = addr.clone(); - a.set_port(current_local_config.settings.primary_port); - system_addrs.insert(a, String::from(dev)); - if current_local_config.settings.secondary_port.is_some() { - let mut a = addr.clone(); - a.set_port(current_local_config.settings.secondary_port.unwrap()); - system_addrs.insert(a, String::from(dev)); - } - } - }, - _ => {} - } - }); - - let mut udp_sockets_to_close: Vec = Vec::new(); - for sock in udp_sockets.iter() { - if !system_addrs.contains_key(sock.0) { - udp_sockets_to_close.push(sock.0.clone()); - } - } - for k in udp_sockets_to_close.iter() { - udp_sockets.remove(k); - } - - for addr in system_addrs.iter() { - if !udp_sockets.contains_key(addr.0) { - let s = FastUDPSocket::new(addr.1.as_str(), addr.0, |raw_socket: &FastUDPRawOsSocket, from_address: &InetAddress, data: Buffer| { - // TODO: incoming packet handler - }); - if s.is_ok() { - udp_sockets.insert(addr.0.clone(), s.unwrap()); - } - } - } - - primary_port_bind_failure = true; - for s in udp_sockets.iter() { - if s.0.port() == current_local_config.settings.primary_port { - primary_port_bind_failure = false; - break; - } - } - if primary_port_bind_failure { - break; - } - } - - let next_local_config = service.get_local_config(); - if !service.run.load(Ordering::Relaxed) || current_local_config.settings.primary_port != next_local_config.settings.primary_port { - let _ = shutdown_tx.send(()); - break; - } - current_local_config = next_local_config; - } - - let _ = warp_server.await; - - if !service.run.load(Ordering::Relaxed) { - break; - } - tokio::time::sleep(Duration::from_millis(250)).await; - if !service.run.load(Ordering::Relaxed) { - break; - } - - if primary_port_bind_failure { - let local_config = service.get_local_config(); - if local_config.settings.auto_port_search { - // TODO: port hunting if enabled - } - } - } - }); - - process_exit_value -} diff --git a/rust-zerotier-service/src/fastudpsocket.rs b/rust-zerotier-service/src/fastudpsocket.rs index e038ffbaf..0f7b8aa72 100644 --- a/rust-zerotier-service/src/fastudpsocket.rs +++ b/rust-zerotier-service/src/fastudpsocket.rs @@ -33,6 +33,21 @@ pub(crate) type FastUDPRawOsSocket = winsock2::SOCKET; #[cfg(unix)] pub(crate) type FastUDPRawOsSocket = c_int; +/// Test bind UDP to a port at 0.0.0.0 and ::0, returning whether IPv4 and/or IPv6 succeeded (respectively). +pub(crate) fn test_bind_udp(port: u16) -> (bool, bool) { + let v4 = InetAddress::new_ipv4_any(port); + let v6 = InetAddress::new_ipv6_any(port); + let v4b = bind_udp_socket("", &v4); + if v4b.is_ok() { + fast_udp_socket_close(v4b.as_ref().unwrap()); + } + let v6b = bind_udp_socket("", &v6); + if v6b.is_ok() { + fast_udp_socket_close(v6b.as_ref().unwrap()); + } + (v4b.is_ok(), v6b.is_ok()) +} + ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // bind_udp_socket() implementations for each platform @@ -122,21 +137,6 @@ fn bind_udp_socket(_: &str, address: &InetAddress) -> Result (bool, bool) { - let v4 = InetAddress::new_ipv4_any(port); - let v6 = InetAddress::new_ipv6_any(port); - let v4b = bind_udp_socket("", &v4); - if v4b.is_ok() { - fast_udp_socket_close(v4b.as_ref().unwrap()); - } - let v6b = bind_udp_socket("", &v6); - if v6b.is_ok() { - fast_udp_socket_close(v6b.as_ref().unwrap()); - } - (v4b.is_ok(), v6b.is_ok()) -} - /// A multi-threaded (or otherwise fast) UDP socket that binds to both IPv4 and IPv6 addresses. pub(crate) struct FastUDPSocket { threads: Vec>, diff --git a/rust-zerotier-service/src/localconfig.rs b/rust-zerotier-service/src/localconfig.rs index 3dc9bbd97..8e43745df 100644 --- a/rust-zerotier-service/src/localconfig.rs +++ b/rust-zerotier-service/src/localconfig.rs @@ -160,7 +160,7 @@ impl Default for LocalConfigNetworkSettings { impl LocalConfigSettings { #[cfg(target_os = "macos")] - const DEFAULT_PREFIX_BLACKLIST: [&'static str; 7] = ["lo", "utun", "gif", "stf", "iptap", "pktap", "feth"]; + const DEFAULT_PREFIX_BLACKLIST: [&'static str; 8] = ["lo", "utun", "gif", "stf", "iptap", "pktap", "feth", "zt"]; #[cfg(target_os = "linux")] const DEFAULT_PREFIX_BLACKLIST: [&'static str; 5] = ["lo", "tun", "tap", "ipsec", "zt"]; diff --git a/rust-zerotier-service/src/log.rs b/rust-zerotier-service/src/log.rs index 3f2fe2a98..a0199aeed 100644 --- a/rust-zerotier-service/src/log.rs +++ b/rust-zerotier-service/src/log.rs @@ -15,24 +15,20 @@ use std::cell::Cell; use std::fmt::Display; use std::fs::{File, OpenOptions}; use std::io::{Seek, SeekFrom, Write}; -use std::sync::atomic::{AtomicUsize, Ordering, AtomicBool}; use std::sync::Mutex; use chrono::Datelike; -use zerotier_core::PortableAtomicI64; - struct LogIntl { file: Option, cur_size: u64, max_size: usize, - enabled: bool } pub(crate) struct Log { prefix: String, path: String, - intl: Mutex + intl: Mutex, } impl Log { @@ -50,7 +46,6 @@ impl Log { file: None, cur_size: 0, max_size: if max_size < Log::MIN_MAX_SIZE { Log::MIN_MAX_SIZE } else { max_size }, - enabled: true, }), } } @@ -59,55 +54,60 @@ impl Log { self.intl.lock().unwrap().max_size = if new_max_size < Log::MIN_MAX_SIZE { Log::MIN_MAX_SIZE } else { new_max_size }; } - pub fn set_enabled(&self, enabled: bool) { - self.intl.lock().unwrap().enabled = enabled; - } - pub fn log>(&self, s: S) { - let mut l = self.intl.lock().unwrap(); - if l.enabled { - if l.file.is_none() { - let mut f = OpenOptions::new().read(true).write(true).create(true).open(self.path.as_str()); - if f.is_err() { - return; - } - let mut f = f.unwrap(); - let eof = f.seek(SeekFrom::End(0)); - if eof.is_err() { - return; - } - l.cur_size = eof.unwrap(); - l.file = Some(f); - } - - if l.max_size > 0 && l.cur_size > l.max_size as u64 { - l.file = None; - l.cur_size = 0; - - let mut old_path = self.path.clone(); - old_path.push_str(".old"); - let _ = std::fs::remove_file(old_path.as_str()); - let _ = std::fs::rename(self.path.as_str(), old_path.as_str()); - let _ = std::fs::remove_file(self.path.as_str()); // should fail - - let mut f = OpenOptions::new().read(true).write(true).create(true).open(self.path.as_str()); - if f.is_err() { - return; - } - l.file = Some(f.unwrap()); - } - - let f = l.file.as_mut().unwrap(); - let now_str = chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string(); - let ss: &str = s.as_ref(); - let log_line = format!("{}[{}] {}\n", self.prefix.as_str(), now_str.as_str(), ss); - let _ = f.write_all(log_line.as_bytes()); - let _ = f.flush(); - l.cur_size += log_line.len() as u64; + let ss: &str = s.as_ref(); + if ss.starts_with("FATAL") { + eprintln!("{}", ss); } + + let mut l = self.intl.lock().unwrap(); + if l.file.is_none() { + let mut f = OpenOptions::new().read(true).write(true).create(true).open(self.path.as_str()); + if f.is_err() { + return; + } + let mut f = f.unwrap(); + let eof = f.seek(SeekFrom::End(0)); + if eof.is_err() { + return; + } + l.cur_size = eof.unwrap(); + l.file = Some(f); + } + + if l.max_size > 0 && l.cur_size > l.max_size as u64 { + l.file = None; + l.cur_size = 0; + + let mut old_path = self.path.clone(); + old_path.push_str(".old"); + let _ = std::fs::remove_file(old_path.as_str()); + let _ = std::fs::rename(self.path.as_str(), old_path.as_str()); + let _ = std::fs::remove_file(self.path.as_str()); // should fail + + let mut f = OpenOptions::new().read(true).write(true).create(true).open(self.path.as_str()); + if f.is_err() { + return; + } + l.file = Some(f.unwrap()); + } + + let f = l.file.as_mut().unwrap(); + let now_str = chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string(); + let log_line = format!("{}[{}] {}\n", self.prefix.as_str(), now_str.as_str(), ss); + let _ = f.write_all(log_line.as_bytes()); + let _ = f.flush(); + l.cur_size += log_line.len() as u64; } } +#[macro_export] +macro_rules! l( + ($logger:ident, $($arg:tt)*) => { + $logger.log(format!($($arg)*)) + } +); + unsafe impl Sync for Log {} /* diff --git a/rust-zerotier-service/src/main.rs b/rust-zerotier-service/src/main.rs index 03a220654..b29d50bae 100644 --- a/rust-zerotier-service/src/main.rs +++ b/rust-zerotier-service/src/main.rs @@ -17,10 +17,11 @@ mod commands; mod fastudpsocket; mod localconfig; mod getifaddrs; -mod log; +#[macro_use] mod log; mod store; mod network; mod vnic; +mod service; #[allow(non_snake_case,non_upper_case_globals,non_camel_case_types,dead_code,improper_ctypes)] mod osdep; // bindgen generated @@ -28,18 +29,19 @@ mod osdep; // bindgen generated use std::boxed::Box; use std::ffi::CStr; use std::path::Path; +use std::sync::Arc; use crate::store::Store; -use std::sync::Arc; fn main() { let mut process_exit_value: i32 = 0; let mut cli_args = Some(Box::new(cli::parse_cli_args())); - let mut zerotier_path = unsafe { zerotier_core::cstr_to_string(osdep::platformDefaultHomePath(), 256) }; - let json_output; - let mut token: Option = None; - let mut token_path = Path::new(&zerotier_path).join("authtoken.secret"); + let mut zerotier_path = unsafe { zerotier_core::cstr_to_string(osdep::platformDefaultHomePath(), -1) }; + + let json_output: bool; + let mut auth_token: Option = None; + let mut auth_token_path: Option = None; { let a = cli_args.as_ref().unwrap(); json_output = a.is_present("json"); @@ -49,21 +51,35 @@ fn main() { } let v = a.value_of("token"); if v.is_some() { - token = Some(String::from(v.unwrap().trim())); + auth_token = Some(v.unwrap().trim().to_string()); } let v = a.value_of("token_path"); if v.is_some() { - token_path = Path::new(v.unwrap().trim()).to_path_buf(); + auth_token_path = Some(v.unwrap().to_string()); } } let store = Store::new(zerotier_path.as_str()); if store.is_err() { - println!("FATAL: error accessing directory '{}': {}", zerotier_path, store.err().unwrap().to_string()); + eprintln!("FATAL: error accessing directory '{}': {}", zerotier_path, store.err().unwrap().to_string()); std::process::exit(1); } let store = Arc::new(store.unwrap()); + if auth_token.is_none() { + let t; + if auth_token_path.is_some() { + t = store.read_file_str(auth_token_path.unwrap().trim()); + } else { + t = store.read_authtoken_secret(); + } + if t.is_ok() { + auth_token = Some(t.unwrap().trim().to_string()); + } + } else { + auth_token = Some(auth_token.unwrap().trim().to_string()); + } + match cli_args.as_ref().unwrap().subcommand_name().unwrap() { "version" => { let ver = zerotier_core::version(); @@ -71,7 +87,7 @@ fn main() { }, "service" => { cli_args = None; // free any memory we can when launching service - process_exit_value = commands::service::run(&store); + process_exit_value = service::run(&store, auth_token); }, _ => cli::print_help(), // includes "help" } diff --git a/rust-zerotier-service/src/service.rs b/rust-zerotier-service/src/service.rs new file mode 100644 index 000000000..9c8bbc913 --- /dev/null +++ b/rust-zerotier-service/src/service.rs @@ -0,0 +1,363 @@ +/* + * Copyright (c)2013-2020 ZeroTier, Inc. + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file in the project's root directory. + * + * Change Date: 2025-01-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2.0 of the Apache License. + */ +/****/ + +use std::collections::BTreeMap; +use std::net::IpAddr; +use std::str::FromStr; +use std::sync::{Arc, Mutex, Weak}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::time::Duration; + +use futures::stream::StreamExt; +use warp::{Filter, Reply}; +use warp::http::{HeaderMap, Method, StatusCode}; +use warp::hyper::body::Bytes; + +use zerotier_core::{Buffer, Address, IpScope, Node, NodeEventHandler, NetworkId, VirtualNetworkConfigOperation, VirtualNetworkConfig, StateObjectType, MAC, Event, InetAddress, InetAddressFamily, Identity}; + +use crate::fastudpsocket::*; +use crate::getifaddrs; +use crate::localconfig::*; +use crate::log::Log; +use crate::network::Network; +use crate::store::Store; + +const CONFIG_CHECK_INTERVAL: i64 = 5000; + +#[derive(Clone)] +struct Service { + auth_token: Arc, + log: Arc, + _local_config: Arc>>, + run: Arc, + store: Arc, + node: Weak>, // weak since Node can hold a reference to this +} + +impl NodeEventHandler for Service { + fn virtual_network_config(&self, network_id: NetworkId, network_obj: &Arc, config_op: VirtualNetworkConfigOperation, config: Option<&VirtualNetworkConfig>) {} + + #[inline(always)] + fn virtual_network_frame(&self, network_id: NetworkId, network_obj: &Arc, source_mac: MAC, dest_mac: MAC, ethertype: u16, vlan_id: u16, data: &[u8]) {} + + fn event(&self, event: Event, event_data: &[u8]) { + match event { + Event::Up => {} + Event::Down => {} + Event::Online => {} + Event::Offline => {} + Event::Trace => {} + Event::UserMessage => {} + } + } + + #[inline(always)] + fn state_put(&self, obj_type: StateObjectType, obj_id: &[u64], obj_data: &[u8]) -> std::io::Result<()> { + self.store.store_object(&obj_type, obj_id, obj_data) + } + + #[inline(always)] + fn state_get(&self, obj_type: StateObjectType, obj_id: &[u64]) -> std::io::Result> { + self.store.load_object(&obj_type, obj_id) + } + + #[inline(always)] + fn wire_packet_send(&self, local_socket: i64, sock_addr: &InetAddress, data: &[u8], packet_ttl: u32) -> i32 { + 0 + } + + fn path_check(&self, address: Address, id: &Identity, local_socket: i64, sock_addr: &InetAddress) -> bool { + true + } + + fn path_lookup(&self, address: Address, id: &Identity, desired_family: InetAddressFamily) -> Option { + let lc = self.local_config(); + let vc = lc.virtual_.get(&address); + vc.map_or(None, |c: &LocalConfigVirtualConfig| { + if c.try_.is_empty() { + None + } else { + let t = c.try_.get((zerotier_core::random() as usize) % c.try_.len()); + t.map_or(None, |v: &InetAddress| { + Some(v.clone()) + }) + } + }) + } +} + +impl Service { + #[inline(always)] + fn web_api_status(&self, method: Method, headers: HeaderMap, post_data: Bytes) -> Box { + Box::new(StatusCode::BAD_REQUEST) + } + + #[inline(always)] + fn web_api_network(&self, network_str: String, method: Method, headers: HeaderMap, post_data: Bytes) -> Box { + Box::new(StatusCode::BAD_REQUEST) + } + + #[inline(always)] + fn web_api_peer(&self, peer_str: String, method: Method, headers: HeaderMap, post_data: Bytes) -> Box { + Box::new(StatusCode::BAD_REQUEST) + } + + #[inline(always)] + fn local_config(&self) -> Arc { + self._local_config.lock().unwrap().clone() + } + + #[inline(always)] + fn set_local_config(&self, new_lc: LocalConfig) { + *(self._local_config.lock().unwrap()) = Arc::new(new_lc); + } +} + +pub(crate) fn run(store: &Arc, auth_token: Option) -> i32 { + let mut process_exit_value: i32 = 0; + + let init_local_config = Arc::new(store.read_local_conf(false).unwrap_or(LocalConfig::default())); + + // Open log in store. + let log = Arc::new(Log::new( + if init_local_config.settings.log_path.as_ref().is_some() { init_local_config.settings.log_path.as_ref().unwrap().as_str() } else { store.default_log_path.to_str().unwrap() }, + init_local_config.settings.log_size_max, + "", + )); + + // Generate authtoken.secret from secure random bytes if not already set. + let auth_token = auth_token.unwrap_or_else(|| { + let mut rb = [0_u8; 64]; + unsafe { + crate::osdep::getSecureRandom(rb.as_mut_ptr().cast(), 64); + } + let mut t = String::new(); + t.reserve(64); + for b in rb.iter() { + if *b > 127_u8 { + t.push((65 + (*b % 26)) as char); // A..Z + } else { + t.push((97 + (*b % 26)) as char); // a..z + } + } + if store.write_authtoken_secret(t.as_str()).is_err() { + t.clear(); + } + t + }); + if auth_token.is_empty() { + l!(log, "FATAL: unable to write authtoken.secret to '{}'", store.base_path.to_str().unwrap()); + return 1; + } + let auth_token = Arc::new(auth_token); + + let tokio_rt = tokio::runtime::Builder::new_current_thread().build().unwrap(); + tokio_rt.block_on(async { + let mut udp_sockets: BTreeMap = BTreeMap::new(); + let (mut interrupt_tx, mut interrupt_rx) = futures::channel::mpsc::channel::<()>(1); + + // Create clonable implementation of NodeEventHandler and local web API endpoints. + let mut service = Service { + auth_token: auth_token.clone(), + log: log.clone(), + _local_config: Arc::new(Mutex::new(init_local_config)), + run: Arc::new(AtomicBool::new(true)), + store: store.clone(), + node: Weak::new(), + }; + + // Create instance of Node which will call Service on events. + let node = Node::new(service.clone()); + if node.is_err() { + process_exit_value = 1; + l!(log, "FATAL: error initializing node: {}", node.err().unwrap().to_string()); + return; + } + let node = Arc::new(node.ok().unwrap()); + + service.node = Arc::downgrade(&node); + let service = service; // make immutable after setting node + + let mut last_checked_config: i64 = 0; + let mut loop_delay = zerotier_core::NODE_BACKGROUND_TASKS_MAX_INTERVAL; + loop { + let mut local_config = service.local_config(); + + let (mut shutdown_tx, mut shutdown_rx) = futures::channel::oneshot::channel(); + let warp_server; + { + let s0 = service.clone(); + let s1 = service.clone(); + let s2 = service.clone(); + warp_server = warp::serve( + warp::any().and(warp::path::end().map(|| { + warp::reply::with_status("404", StatusCode::NOT_FOUND) + }) + .or(warp::path("status").and(warp::method()).and(warp::header::headers_cloned()).and(warp::body::bytes()) + .map(move |method: Method, headers: HeaderMap, post_data: Bytes| { + s0.web_api_status(method, headers, post_data) + })) + .or(warp::path!("network" / String).and(warp::method()).and(warp::header::headers_cloned()).and(warp::body::bytes()) + .map(move |network_str: String, method: Method, headers: HeaderMap, post_data: Bytes| { + s1.web_api_network(network_str, method, headers, post_data) + })) + .or(warp::path!("peer" / String).and(warp::method()).and(warp::header::headers_cloned()).and(warp::body::bytes()) + .map(move |peer_str: String, method: Method, headers: HeaderMap, post_data: Bytes| { + s2.web_api_peer(peer_str, method, headers, post_data) + })) + )).try_bind_with_graceful_shutdown( + (IpAddr::from([127_u8, 0_u8, 0_u8, 1_u8]), local_config.settings.primary_port), + async { let _ = shutdown_rx.await; }, + ); + } + if warp_server.is_err() { + l!(log, "ERROR: local API http server failed to bind to port {}: {}", local_config.settings.primary_port, warp_server.err().unwrap().to_string()); + break; + } + let warp_server = tokio_rt.spawn(warp_server.unwrap().1); + + loop { + // Wait for (1) loop delay elapsed, (2) a signal to interrupt delay now, or + // (3) an external signal to exit. + tokio::select! { + _ = tokio::time::sleep(Duration::from_millis(loop_delay)) => {}, + _ = interrupt_rx.next() => {}, + _ = tokio::signal::ctrl_c() => { + l!(log, "exit signal received, shutting down..."); + service.run.store(false, Ordering::Relaxed); + break; + } + } + + // Check every CONFIG_CHECK_INTERVAL for changes to either the system configuration + // or the node's local configuration and take actions as needed. + let now = zerotier_core::now(); + if (now - last_checked_config) >= CONFIG_CHECK_INTERVAL { + last_checked_config = now; + + // Check for changes to local.conf. + let new_config = store.read_local_conf(true); + if new_config.is_ok() { + service.set_local_config(new_config.unwrap()); + } + + // Check for configuration changes that require a reboot of the inner loop + // or other actions to be taken. + let next_local_config = service.local_config(); + if local_config.settings.primary_port != next_local_config.settings.primary_port { + break; + } + local_config = next_local_config; + + // Enumerate all useful addresses bound to interfaces on the system. + let mut system_addrs: BTreeMap = BTreeMap::new(); + getifaddrs::for_each_address(|addr: &InetAddress, dev: &str| { + match addr.ip_scope() { + IpScope::Global | IpScope::Private | IpScope::PseudoPrivate | IpScope::Shared => { + if !local_config.settings.is_interface_blacklisted(dev) { + let mut a = addr.clone(); + a.set_port(local_config.settings.primary_port); + system_addrs.insert(a, String::from(dev)); + if local_config.settings.secondary_port.is_some() { + let mut a = addr.clone(); + a.set_port(local_config.settings.secondary_port.unwrap()); + system_addrs.insert(a, String::from(dev)); + } + } + } + _ => {} + } + }); + + // Drop bound sockets that are no longer valid or are now blacklisted. + let mut udp_sockets_to_close: Vec = Vec::new(); + for sock in udp_sockets.iter() { + if !system_addrs.contains_key(sock.0) { + udp_sockets_to_close.push(sock.0.clone()); + } + } + for k in udp_sockets_to_close.iter() { + udp_sockets.remove(k); + } + + // Create sockets for unbound addresses. + for addr in system_addrs.iter() { + if !udp_sockets.contains_key(addr.0) { + let s = FastUDPSocket::new(addr.1.as_str(), addr.0, |raw_socket: &FastUDPRawOsSocket, from_address: &InetAddress, data: Buffer| { + // TODO: incoming packet handler + }); + if s.is_ok() { + udp_sockets.insert(addr.0.clone(), s.unwrap()); + } + } + } + + // Determine if primary and secondary port (if secondary enabled) failed to + // bind to any interface. + let mut primary_port_bind_failure = true; + let mut secondary_port_bind_failure = local_config.settings.secondary_port.is_some(); + for s in udp_sockets.iter() { + if s.0.port() == local_config.settings.primary_port { + primary_port_bind_failure = false; + if !secondary_port_bind_failure { + break; + } + } + if s.0.port() == local_config.settings.secondary_port.unwrap() { + secondary_port_bind_failure = false; + if !primary_port_bind_failure { + break; + } + } + } + if primary_port_bind_failure { + if local_config.settings.auto_port_search { + // TODO: port hunting if enabled + } else { + l!(log, "primary port {} failed to bind, waiting and trying again...", local_config.settings.primary_port); + break; + } + } + if secondary_port_bind_failure { + l!(log, "secondary port {} failed to bind (non-fatal, will try again)", local_config.settings.secondary_port.unwrap_or(0)); + // hunt for a secondary port. + } + } + + // Check to make sure nothing outside this code turned off the run flag. + if !service.run.load(Ordering::Relaxed) { + break; + } + + // Run background task handler in ZeroTier core. + loop_delay = node.process_background_tasks(); + } + + // Gracefully shut down the local web server. + let _ = shutdown_tx.send(()); + let _ = warp_server.await; + + // Sleep for a brief period of time to prevent thrashing if some invalid + // state is hit that causes the inner loop to keep breaking. + if !service.run.load(Ordering::Relaxed) { + break; + } + tokio::time::sleep(Duration::from_secs(1)).await; + if !service.run.load(Ordering::Relaxed) { + break; + } + } + }); + + process_exit_value +} diff --git a/rust-zerotier-service/src/store.rs b/rust-zerotier-service/src/store.rs index 52968eda6..61636271a 100644 --- a/rust-zerotier-service/src/store.rs +++ b/rust-zerotier-service/src/store.rs @@ -14,6 +14,7 @@ use std::error::Error; use std::io::{Read, Write}; use std::path::{Path, PathBuf}; +use std::sync::Mutex; use std::ffi::CString; use zerotier_core::{StateObjectType, NetworkId}; @@ -23,6 +24,7 @@ use crate::localconfig::LocalConfig; pub(crate) struct Store { pub base_path: Box, pub default_log_path: Box, + prev_local_config: Mutex, peers_path: Box, controller_path: Box, networks_path: Box, @@ -53,6 +55,7 @@ impl Store { let s = Store { base_path: bp.to_path_buf().into_boxed_path(), default_log_path: bp.join("service.log").into_boxed_path(), + prev_local_config: Mutex::new(String::new()), peers_path: bp.join("peers.d").into_boxed_path(), controller_path: bp.join("controller.d").into_boxed_path(), networks_path: bp.join("networks.d").into_boxed_path(), @@ -161,13 +164,24 @@ impl Store { } /// Write a file to the base ZeroTier home directory. + /// Error code std::io::ErrorKind::Other is returned if skip_if_unchanged is true + /// and there has been no change from the last read. pub fn write_file(&self, fname: &str, data: &[u8]) -> std::io::Result<()> { std::fs::OpenOptions::new().write(true).truncate(true).create(true).open(self.base_path.join(fname))?.write_all(data) } /// Reads local.conf and deserializes into a LocalConfig object. - pub fn read_local_conf(&self) -> std::io::Result { + pub fn read_local_conf(&self, skip_if_unchanged: bool) -> std::io::Result { let data = self.read_file_str("local.conf")?; + if skip_if_unchanged { + let mut prev = self.prev_local_config.lock().unwrap(); + if prev.eq(&data) { + return Err(std::io::Error::new(std::io::ErrorKind::Other, "unchangd")); + } + *prev = data.clone(); + } else { + *(self.prev_local_config.lock().unwrap()) = data.clone(); + } let lc = LocalConfig::new_from_json(data.as_str()); if lc.is_err() { return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, lc.err().unwrap())); @@ -182,9 +196,9 @@ impl Store { } /// Reads the authtoken.secret file in the home directory. + #[inline(always)] pub fn read_authtoken_secret(&self) -> std::io::Result { - let data = self.read_file_str("authtoken.secret")?; - Ok(data.trim().to_string()) + Ok(self.read_file_str("authtoken.secret")?) } /// Write authtoken.secret and lock down file permissions. @@ -219,12 +233,14 @@ impl Store { if obj_path.is_some() { let obj_path = obj_path.unwrap(); std::fs::OpenOptions::new().write(true).truncate(true).create(true).open(&obj_path)?.write_all(obj_data)?; + if obj_type.eq(&StateObjectType::IdentitySecret) || obj_type.eq(&StateObjectType::TrustStore) { lock_down_file(obj_path.to_str().unwrap()); } + Ok(()) } else { - Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, "object ID not valid")) + Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, "object type or ID not valid")) } } }