From b7d7e8eb40f25fdfebe41483719831e2daacb537 Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Tue, 26 Jan 2021 11:45:42 -0500 Subject: [PATCH] cleanup --- core/AES.hpp | 7 ++- core/Dictionary.cpp | 10 +--- core/Dictionary.hpp | 6 +- core/MIMC52.cpp | 12 +++- rust-zerotier-service/src/log.rs | 4 +- rust-zerotier-service/src/main.rs | 99 +++++++++++++++++++++++-------- 6 files changed, 95 insertions(+), 43 deletions(-) diff --git a/core/AES.hpp b/core/AES.hpp index dc0130adf..63beb87b8 100644 --- a/core/AES.hpp +++ b/core/AES.hpp @@ -48,13 +48,14 @@ public: */ static ZT_INLINE bool accelerated() { +#ifdef ZT_AES_NO_ACCEL + return false; +#else #ifdef ZT_AES_AESNI return Utils::CPUID.aes; -#else +#endif #ifdef ZT_AES_NEON return Utils::ARMCAP.aes; -#else - return false; #endif #endif } diff --git a/core/Dictionary.cpp b/core/Dictionary.cpp index 3cb3fa6c5..47d676e54 100644 --- a/core/Dictionary.cpp +++ b/core/Dictionary.cpp @@ -15,12 +15,6 @@ namespace ZeroTier { -Dictionary::Dictionary() -{} - -Dictionary::~Dictionary() -{} - Vector< uint8_t > &Dictionary::operator[](const char *const k) { return m_entries[k]; } @@ -91,9 +85,7 @@ char *Dictionary::getS(const char *k, char *v, const unsigned int cap) const } void Dictionary::clear() -{ - m_entries.clear(); -} +{ m_entries.clear(); } void Dictionary::encode(Vector< uint8_t > &out) const { diff --git a/core/Dictionary.hpp b/core/Dictionary.hpp index faccf4922..2900603f4 100644 --- a/core/Dictionary.hpp +++ b/core/Dictionary.hpp @@ -42,9 +42,11 @@ class Dictionary public: typedef SortedMap< String, Vector< uint8_t > >::const_iterator const_iterator; - Dictionary(); + ZT_INLINE Dictionary() + {} - ~Dictionary(); + ZT_INLINE ~Dictionary() + {} /* ZT_INLINE void dump() const diff --git a/core/MIMC52.cpp b/core/MIMC52.cpp index 7b97a3d93..70267d476 100644 --- a/core/MIMC52.cpp +++ b/core/MIMC52.cpp @@ -17,7 +17,8 @@ namespace { -// The highest 1024 primes less than 2 ^ 52 (only the least significant 32 bits are needed) of the form 6k + 5 +// Largest 1024 primes of form 6k + 5 and less than 2^52. Only the least significant 32 +// bits need to be here, as the most significant bits are all 1. const uint32_t ZT_MIMC52_PRIMES[1024] = {4294895267, 4294895477, 4294895513, 4294895519, 4294895543, 4294895567, 4294895657, 4294895711, 4294895777, 4294895861, 4294895909, 4294895921, 4294895969, 4294896011, 4294896149, 4294896227, 4294896401, 4294896473, 4294896527, 4294896563, 4294896653, 4294896731, 4294896863, 4294896899, 4294896983, 4294897037, 4294897103, 4294897331, 4294897349, 4294897451, 4294897571, 4294897661, 4294897703, 4294897757, 4294897793, 4294897811, 4294897817, 4294897829, 4294897877, 4294897919, 4294897991, 4294898027, 4294898129, 4294898153, 4294898231, 4294898273, 4294898279, 4294898291, 4294898363, 4294898369, 4294898417, 4294898423, 4294898453, 4294898489, 4294898573, 4294898579, 4294898639, 4294898693, 4294898747, 4294898759, 4294898867, 4294898879, 4294898909, 4294898921, 4294898933, 4294899011, 4294899041, 4294899047, 4294899203, 4294899221, 4294899227, 4294899287, 4294899341, 4294899431, 4294899509, 4294899533, 4294899539, 4294899551, 4294899629, 4294899791, 4294899809, 4294899971, 4294900001, 4294900007, 4294900013, 4294900307, 4294900331, 4294900427, 4294900469, 4294900481, 4294900541, 4294900583, 4294900781, 4294900853, 4294900931, 4294900991, 4294901033, 4294901087, 4294901159, 4294901267, 4294901393, 4294901411, 4294901489, 4294901657, 4294902011, 4294902071, 4294902101, 4294902107, 4294902353, 4294902377, 4294902599, 4294902647, 4294902743, 4294902869, 4294902977, 4294903067, 4294903103, 4294903259, 4294903289, 4294903397, 4294903421, 4294903493, 4294903577, 4294903631, 4294903637, 4294903733, 4294903799, 4294903823, 4294904003, 4294904033, 4294904081, 4294904129, 4294904279, 4294904297, 4294904303, 4294904333, 4294904351, 4294904381, @@ -62,8 +63,13 @@ static uint64_t mulmod64(uint64_t a, uint64_t b, const uint64_t m) #else -// 52-bit mulmod using FPU hack, which is very fast on most systems with IEEE 64-bit floats. -#define mulmod52(a, b, m, mf) ( ( ( a * b ) - ( ((uint64_t)(((double)a * (double)b) / mf) - 1) * m ) ) % m ) +ZT_INLINE uint64_t mulmod52(uint64_t a, const uint64_t b, const uint64_t m, const double mf) +{ + a = ( ( a * b ) - ( ((uint64_t)(((double)a * (double)b) / mf) - 1) * m ) ); + //a -= m * (uint64_t)(a > m); // faster on some systems, but slower on newer cores + a %= m; + return a; +} #endif diff --git a/rust-zerotier-service/src/log.rs b/rust-zerotier-service/src/log.rs index 6a6643b16..0fc273b7b 100644 --- a/rust-zerotier-service/src/log.rs +++ b/rust-zerotier-service/src/log.rs @@ -49,7 +49,7 @@ impl Log { self.max_size.store(if new_max_size < Log::MIN_MAX_SIZE { Log::MIN_MAX_SIZE } else { new_max_size },Ordering::Relaxed); } - pub fn log(&self, s: &S) { + pub fn log>(&self, s: S) { let mut fc = self.file.lock().unwrap(); let max_size = self.max_size.load(Ordering::Relaxed); @@ -81,7 +81,7 @@ impl Log { let mut f = fc.get_mut().as_mut().unwrap(); let now_str = chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string(); - let log_line = format!("{}[{}] {}\n", self.prefix.as_str(), now_str.as_str(), s); + let log_line = format!("{}[{}] {}\n", self.prefix.as_str(), now_str.as_str(), s.into()); let _ = f.write_all(log_line.as_bytes()); let _ = f.flush(); self.cur_size.fetch_add(log_line.len() as i64); diff --git a/rust-zerotier-service/src/main.rs b/rust-zerotier-service/src/main.rs index 219b8a64b..97d97ff09 100644 --- a/rust-zerotier-service/src/main.rs +++ b/rust-zerotier-service/src/main.rs @@ -30,6 +30,8 @@ use std::time::Duration; use futures::stream::{self, StreamExt}; use warp::Filter; +use warp::hyper::{HeaderMap, Method}; +use warp::hyper::body::Bytes; use zerotier_core::*; @@ -38,7 +40,6 @@ use crate::localconfig::*; use crate::log::Log; use crate::physicallink::PhysicalLink; use crate::network::Network; -use futures::TryFutureExt; pub struct ServiceEventHandler {} @@ -81,44 +82,82 @@ impl NodeEventHandler for ServiceEventHandler { } fn main() { - tokio::runtime::Builder::new_multi_thread().thread_stack_size(zerotier_core::RECOMMENDED_THREAD_STACK_SIZE).build().unwrap().block_on(async { - let inaddr_v6_any = IpAddr::from_str("::0").unwrap(); + let inaddr_v6_any = IpAddr::from_str("::0").unwrap(); + let mut process_exit_value: i32 = 0; - let mut local_config: Box = Box::new(LocalConfig::default()); + // Current active local configuration for this node. + let mut local_config: Box = Box::new(LocalConfig::default()); + + // Handler for incoming packets from FastUDPSocket and incoming events from Node. + let handler: Arc = Arc::new(ServiceEventHandler{}); + + // From this point on we are in Tokio async land... + let tokio_rt = tokio::runtime::Builder::new_multi_thread().thread_stack_size(zerotier_core::RECOMMENDED_THREAD_STACK_SIZE).build().unwrap(); + tokio_rt.block_on(async { + // Keeps track of FastUDPSocket instances by bound address. let mut udp_sockets: BTreeMap> = BTreeMap::new(); - let handler: Arc = Arc::new(ServiceEventHandler{}); - let run: AtomicBool = AtomicBool::new(true); + + // Send something to interrupt_tx to interrupt the inner loop and force it to + // detect a change or exit if run has been set to false. let (mut interrupt_tx, mut interrupt_rx) = futures::channel::mpsc::channel::(2); - // - // The inner loop periodically updates UDP socket bindings and does other housekeeping, but - // otherwise does nothing. If it detects that the primary port has changed, it breaks and - // causes the outer loop to run which reboots the HTTP server. If the 'run' flag is set - // to false this causes a break of both loops which terminates the service. - // + // Setting this to false terminates the service. It's atomic since this is multithreaded. + let run = AtomicBool::new(true); loop { let mut warp_server_port = local_config.settings.primary_port; - let root = warp::path::end().map(|| { warp::reply::with_status("not found", warp::hyper::StatusCode::NOT_FOUND) }); - let status = warp::path("status").map(|| { "status" }); - let network = warp::path!("network" / String).map(|nwid_str| { "network" }); - let peer = warp::path!("peer" / String).map(|peer_str| { "peer" }); + let root = warp::path::end().map(|| { + warp::reply::with_status("404", warp::hyper::StatusCode::NOT_FOUND) + }); - let (shutdown_tx, shutdown_rx) = futures::channel::oneshot::channel(); - let (_, warp_server) = warp::serve(warp::any().and( + let status = warp::path("status") + .and(warp::method()) + .and(warp::header::headers_cloned()) + .and(warp::body::bytes()) + .map(|method: Method, headers: HeaderMap, post_data: Bytes| { + "status" + }); + let network = warp::path!("network" / String) + .and(warp::method()) + .and(warp::header::headers_cloned()) + .and(warp::body::bytes()) + .map(|nwid_str: String, method: Method, headers: HeaderMap, post_data: Bytes| { + "network" + }); + let peer = warp::path!("peer" / String) + .and(warp::method()) + .and(warp::header::headers_cloned()) + .and(warp::body::bytes()) + .map(|peer_str: String, method: Method, headers: HeaderMap, post_data: Bytes| { + "peer" + }); + + let (mut shutdown_tx, mut shutdown_rx) = futures::channel::oneshot::channel(); + let warp_server = warp::serve(warp::any().and( root .or(status) .or(network) .or(peer) - )).bind_with_graceful_shutdown((inaddr_v6_any, warp_server_port), async { - let _ = shutdown_rx.await; - }); - let warp_server = tokio::spawn(warp_server); + )).try_bind_with_graceful_shutdown((inaddr_v6_any, warp_server_port), async { shutdown_rx.await; }); + if warp_server.is_err() { + // TODO: log unable to bind to primary port + run.store(false, Ordering::Relaxed); + } + let warp_server = tokio_rt.spawn(warp_server.unwrap().1); let mut loop_delay = 10; loop { - let _ = tokio::time::timeout(Duration::from_secs(loop_delay), interrupt_rx.next()); + tokio::select! { + _ = tokio::time::sleep(Duration::from_secs(loop_delay)) => {}, + _ = interrupt_rx.next() => {}, + _ = tokio::signal::ctrl_c() => { + // TODO: log CTRL+C received + run.store(false, Ordering::Relaxed); + let _ = shutdown_tx.send(()); + break; + } + } // Enumerate physical addresses on the system, creating a map with an entry for // the primary_port and another for the secondary_port if bound. @@ -137,6 +176,7 @@ fn main() { } }); + // Close UDP bindings that no longer apply. let mut udp_sockets_to_close: Vec = Vec::new(); for sock in udp_sockets.iter() { if !system_addrs.contains_key(sock.0) { @@ -147,26 +187,37 @@ fn main() { udp_sockets.remove(k); } + // Bind addresses that are not already bound. for addr in system_addrs.iter() { if !udp_sockets.contains_key(addr.0) { let s = FastUDPSocket::new(addr.1.device.as_str(), addr.0, &handler); if s.is_ok() { udp_sockets.insert(addr.0.clone(), s.unwrap()); + } else if addr.0.port() == local_config.settings.primary_port { + run.store(false, Ordering::Relaxed); + // TODO: log failure to bind to primary port (UDP) + break; } } } if local_config.settings.primary_port != warp_server_port || !run.load(Ordering::Relaxed) { let _ = shutdown_tx.send(()); - let _ = warp_server.await; break; } } + let _ = warp_server.await; + + if !run.load(Ordering::Relaxed) { + break; + } tokio::time::sleep(Duration::from_millis(250)).await; if !run.load(Ordering::Relaxed) { break; } } }); + + std::process::exit(process_exit_value); }