diff --git a/core/Endpoint.cpp b/core/Endpoint.cpp
index 357a8fd89..f902247de 100644
--- a/core/Endpoint.cpp
+++ b/core/Endpoint.cpp
@@ -113,14 +113,11 @@ bool Endpoint::fromString(const char *s) noexcept
                 return false;
             }
             break;
         }
-    } else {
+    } else if (strchr(s, '/') != nullptr) {
         // IP/port is parsed as an IP_UDP endpoint for backward compatibility.
-        if (strchr(s, '/')) {
-            return asInetAddress(this->value.ss).fromString(s);
-        } else {
-            return false;
-        }
+        return asInetAddress(this->value.ss).fromString(s);
     }
+    return false;
 }
 int Endpoint::marshal(uint8_t data[ZT_ENDPOINT_MARSHAL_SIZE_MAX]) const noexcept
diff --git a/osdep/OSUtils.cpp b/osdep/OSUtils.cpp
index 97f24f80a..d11b0d303 100644
--- a/osdep/OSUtils.cpp
+++ b/osdep/OSUtils.cpp
@@ -25,7 +25,7 @@
 #include
 #include
 
-#ifdef __GCC__
+#if defined(__GCC__) || defined(__GNUC__)
 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
 #endif
diff --git a/rust-zerotier-core/src/endpoint.rs b/rust-zerotier-core/src/endpoint.rs
index c09f8f054..db7ffaed1 100644
--- a/rust-zerotier-core/src/endpoint.rs
+++ b/rust-zerotier-core/src/endpoint.rs
@@ -85,7 +85,7 @@ impl Endpoint {
     /// Get a reference to the InetAddress in this endpoint or None if this is not of a relevant type.
     pub fn as_inetaddress(&self) -> Option<&InetAddress> {
         match self.type_ {
-            EndpointType::Ip | EndpointType::IpUdp | EndpointType::IpTcp | EndpointType::IpHttp => {
+            EndpointType::Ip | EndpointType::IpUdp | EndpointType::IpTcp | EndpointType::IpTcpWs => {
                 unsafe {
                     Some(InetAddress::transmute_capi(&self.capi.value.ia))
                 }
@@ -109,7 +109,11 @@ impl ToString for Endpoint {
 impl PartialEq for Endpoint {
     fn eq(&self, other: &Endpoint) -> bool {
-        self.to_string() == other.to_string()
+        if self.type_ == other.type_ {
+            self.to_string() == other.to_string()
+        } else {
+            false
+        }
     }
 }
diff --git a/rust-zerotier-service/Cargo.lock b/rust-zerotier-service/Cargo.lock
index 22d704588..6fc7fc233 100644
--- a/rust-zerotier-service/Cargo.lock
+++ b/rust-zerotier-service/Cargo.lock
@@ -180,32 +180,6 @@ dependencies = [
  "slab",
 ]
 
-[[package]]
-name = "h2"
-version = "0.3.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6b67e66362108efccd8ac053abafc8b7a8d86a37e6e48fc4f6f7485eb5e9e6a5"
-dependencies = [
- "bytes",
- "fnv",
- "futures-core",
- "futures-sink",
- "futures-util",
- "http",
- "indexmap",
- "slab",
- "tokio",
- "tokio-util",
- "tracing",
- "tracing-futures",
-]
-
-[[package]]
-name = "hashbrown"
-version = "0.9.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d7afe4a420e3fe79967a00898cc1f4db7c8a49a9333a29f8a4bd76a253d5cd04"
-
 [[package]]
 name = "hermit-abi"
 version = "0.1.17"
@@ -264,7 +238,6 @@ dependencies = [
  "futures-channel",
  "futures-core",
  "futures-util",
- "h2",
  "http",
  "http-body",
  "httparse",
@@ -278,16 +251,6 @@ dependencies = [
  "want",
 ]
 
-[[package]]
-name = "indexmap"
-version = "1.6.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4fb1fa934250de4de8aef298d81c729a7d33d8c239daa3a7575e6b92bfc7313b"
-dependencies = [
- "autocfg",
- "hashbrown",
-]
-
 [[package]]
 name = "itoa"
 version = "0.4.7"
@@ -589,9 +552,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "0ca04cec6ff2474c638057b65798f60ac183e5e79d3448bb7163d36a39cff6ec"
 dependencies = [
  "autocfg",
- "bytes",
 "libc",
- "memchr",
 "mio",
 "once_cell",
 "pin-project-lite",
@@ -611,20 +572,6 @@ dependencies = [
 "syn",
 ]
 
-[[package]]
-name = "tokio-util"
-version = "0.6.3"
-source =
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "ebb7cb2f00c5ae8df755b252306272cd1790d39728363936e01827e11f0b017b" -dependencies = [ - "bytes", - "futures-core", - "futures-sink", - "log", - "pin-project-lite", - "tokio", -] - [[package]] name = "tower-service" version = "0.3.0" @@ -651,16 +598,6 @@ dependencies = [ "lazy_static", ] -[[package]] -name = "tracing-futures" -version = "0.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" -dependencies = [ - "pin-project", - "tracing", -] - [[package]] name = "try-lock" version = "0.2.3" diff --git a/rust-zerotier-service/Cargo.toml b/rust-zerotier-service/Cargo.toml index feb514188..e2c209c31 100644 --- a/rust-zerotier-service/Cargo.toml +++ b/rust-zerotier-service/Cargo.toml @@ -20,8 +20,8 @@ hex = "0" lazy_static = "1" num-traits = "0" num-derive = "0" -hyper = { version = "0", features = ["http1", "http2", "runtime", "server", "client", "tcp", "stream"] } -net2 = "0" +hyper = { version = "0", features = ["http1", "runtime", "server", "client", "tcp", "stream"] } +net2 = "0.2" [target."cfg(windows)".dependencies] winapi = { version = "0.3.9", features = ["handleapi", "ws2ipdef", "ws2tcpip"] } diff --git a/rust-zerotier-service/src/service.rs b/rust-zerotier-service/src/service.rs index 9c9460a28..66ecfb8a3 100644 --- a/rust-zerotier-service/src/service.rs +++ b/rust-zerotier-service/src/service.rs @@ -12,18 +12,15 @@ /****/ use std::collections::BTreeMap; +use std::net::{SocketAddr, Ipv4Addr, IpAddr, Ipv6Addr}; use std::str::FromStr; use std::sync::{Arc, Mutex, Weak}; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::{AtomicBool, Ordering, AtomicPtr}; use std::time::Duration; -//use futures::stream::StreamExt; -//use warp::{Filter, Reply}; -//use warp::http::{HeaderMap, Method, StatusCode}; -//use warp::hyper::body::Bytes; - use zerotier_core::*; use zerotier_core::trace::{TraceEvent, TraceEventLayer}; +use futures::StreamExt; use crate::fastudpsocket::*; use crate::getifaddrs; @@ -32,23 +29,27 @@ use crate::log::Log; use crate::network::Network; use crate::store::Store; use crate::utils::ms_since_epoch; +use crate::weblistener::WebListener; +/// How often to check for major configuration changes. This shouldn't happen +/// too often since it uses a bit of CPU. const CONFIG_CHECK_INTERVAL: i64 = 5000; -/// Core ZeroTier service. -/// This object must be clonable across threads, so all its innards are in -/// Arc containers. It's probably faster to clone all these Arcs when new -/// threads are created (a rare event) so we only have to dereference each -/// Arc once for common events like packet receipt. +struct ServiceIntl { + auth_token: String, + interrupt: Mutex>, + local_config: Mutex>, + run: AtomicBool, + online: AtomicBool, +} + +/// Core ZeroTier service, which is sort of just a container for all the things. 
#[derive(Clone)] pub(crate) struct Service { - pub auth_token: Arc, - pub log: Arc, - _local_config: Arc>>, // Arc -> shared Mutex container so it can be changed globally - run: Arc, - pub online: Arc, - pub store: Arc, - pub node: Weak>, // weak since Node itself may hold a reference to this + pub(crate) log: Arc, + pub(crate) store: Arc, + _node: Weak>, + intl: Arc, } impl NodeEventHandler for Service { @@ -62,25 +63,23 @@ impl NodeEventHandler for Service { fn event(&self, event: Event, event_data: &[u8]) { match event { Event::Up => { - let _ = self.node.upgrade().map(|n: Arc>| { - d!(self.log, "node {} started up in data store '{}'", n.address().to_string(), self.store.base_path.to_str().unwrap()); - }); - }, + d!(self.log, "node started up in data store '{}'", self.store.base_path.to_str().unwrap()); + } Event::Down => { d!(self.log, "node shutting down."); - self.run.store(false, Ordering::Relaxed); - }, + self.intl.run.store(false, Ordering::Relaxed); + } Event::Online => { d!(self.log, "node is online."); - self.online.store(true, Ordering::Relaxed); - }, + self.intl.online.store(true, Ordering::Relaxed); + } Event::Offline => { d!(self.log, "node is offline."); - self.online.store(true, Ordering::Relaxed); - }, + self.intl.online.store(true, Ordering::Relaxed); + } Event::Trace => { if !event_data.is_empty() { @@ -100,9 +99,9 @@ impl NodeEventHandler for Service { }); }); } - }, + } - Event::UserMessage => {}, + Event::UserMessage => {} } } @@ -151,14 +150,263 @@ impl NodeEventHandler for Service { impl Service { #[inline(always)] - fn local_config(&self) -> Arc { - self._local_config.lock().unwrap().clone() + pub fn local_config(&self) -> Arc { + self.intl.local_config.lock().unwrap().clone() } #[inline(always)] - fn set_local_config(&self, new_lc: LocalConfig) { - *(self._local_config.lock().unwrap()) = Arc::new(new_lc); + pub fn set_local_config(&self, new_lc: LocalConfig) { + *(self.intl.local_config.lock().unwrap()) = Arc::new(new_lc); } + + /// Get the node running with this service. + /// This should never return None, but technically it could if Service + /// persisted beyond the life span of Node. Check this to be technically + /// pure "safe" Rust. 
+ #[inline(always)] + pub fn node(&self) -> Option>> { + self._node.upgrade() + } + + #[inline(always)] + pub fn online(&self) -> bool { + self.intl.online.load(Ordering::Relaxed) + } + + pub fn shutdown(&self) { + self.intl.run.store(false, Ordering::Relaxed); + let _ = self.intl.interrupt.lock().unwrap().try_send(()); + } +} + +unsafe impl Send for Service {} + +unsafe impl Sync for Service {} + +async fn run_async(store: &Arc, auth_token: String, log: Arc, local_config: Arc) -> i32 { + let mut process_exit_value: i32 = 0; + + let mut udp_sockets: BTreeMap = BTreeMap::new(); + let mut web_listeners: BTreeMap = BTreeMap::new(); + let mut local_web_listeners: (Option, Option) = (None, None); // IPv4, IPv6 + + let (interrupt_tx, mut interrupt_rx) = futures::channel::mpsc::channel::<()>(1); + let mut service = Service { + log: log.clone(), + store: store.clone(), + _node: Weak::new(), + intl: Arc::new(ServiceIntl { + auth_token, + interrupt: Mutex::new(interrupt_tx), + local_config: Mutex::new(local_config), + run: AtomicBool::new(true), + online: AtomicBool::new(false), + }), + }; + + let node = Node::new(service.clone(), ms_since_epoch()); + if node.is_err() { + log.fatal(format!("error initializing node: {}", node.err().unwrap().to_str())); + return 1; + } + let node = Arc::new(node.ok().unwrap()); + + service._node = Arc::downgrade(&node); + let service = service; // make immutable after setting node + + let mut now: i64 = ms_since_epoch(); + let mut loop_delay = zerotier_core::NODE_BACKGROUND_TASKS_MAX_INTERVAL; + let mut last_checked_config: i64 = 0; + let mut local_config = service.local_config(); + while service.intl.run.load(Ordering::Relaxed) { + let loop_start = ms_since_epoch(); + + // Write zerotier.port which is used by the CLI to know how to reach the HTTP API. + //let _ = store.write_port(local_config.settings.primary_port); + + // Wait for (1) loop delay elapsed, (2) a signal to interrupt delay now, or + // (3) an external signal to exit. + tokio::select! { + _ = tokio::time::sleep(Duration::from_millis(loop_delay)) => { + now = ms_since_epoch(); + let actual_delay = now - loop_start; + if actual_delay > ((loop_delay as i64) * 4_i64) { + l!(log, "likely sleep/wake detected due to excess delay, reestablishing links..."); + // TODO: handle likely sleep/wake or other system interruption + } + }, + _ = interrupt_rx.next() => { + d!(log, "inner loop delay interrupted!"); + if !service.intl.run.load(Ordering::Relaxed) { + break; + } + now = ms_since_epoch(); + }, + _ = tokio::signal::ctrl_c() => { + l!(log, "exit signal received, shutting down..."); + service.intl.run.store(false, Ordering::Relaxed); + break; + }, + } + + // Check every CONFIG_CHECK_INTERVAL for changes to either the system configuration + // or the node's local configuration and take actions as needed. + if (now - last_checked_config) >= CONFIG_CHECK_INTERVAL { + last_checked_config = now; + + // Check for changes to local.conf. + let new_config = store.read_local_conf(true); + if new_config.is_ok() { + d!(log, "local.conf changed on disk, reloading."); + service.set_local_config(new_config.unwrap()); + } + + // Check for and handle configuration changes, some of which require inner loop restart. 
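As an aside, the wait at the top of this loop combines a timed sleep, a manual interrupt channel, and Ctrl-C handling. A minimal, self-contained sketch of that select! pattern, assuming tokio 1.x with the "full" feature set and the futures 0.3 crate (versions are not pinned by this patch), looks like this:

use std::time::Duration;
use futures::StreamExt;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    // The real service keeps the Sender half inside ServiceIntl so that
    // shutdown() can wake the loop instead of waiting out the delay.
    let (mut interrupt_tx, mut interrupt_rx) = futures::channel::mpsc::channel::<()>(1);
    let _ = interrupt_tx.try_send(()); // pretend another thread requested a wake-up

    tokio::select! {
        // (1) the normal loop delay elapsed
        _ = tokio::time::sleep(Duration::from_millis(500)) => println!("delay elapsed"),
        // (2) an interrupt was requested, wake up immediately
        _ = interrupt_rx.next() => println!("interrupted"),
        // (3) an external exit signal (Ctrl-C) arrived
        _ = tokio::signal::ctrl_c() => println!("exit signal"),
    }
}

Keeping the Sender side inside ServiceIntl is what lets shutdown() interrupt the delay from any thread holding a cloned Service.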
+ let next_local_config = service.local_config(); + if local_config.settings.primary_port != next_local_config.settings.primary_port { + local_web_listeners.0 = None; + local_web_listeners.1 = None; + } + if local_config.settings.log.max_size != next_local_config.settings.log.max_size { + log.set_max_size(next_local_config.settings.log.max_size); + } + if local_config.settings.log.stderr != next_local_config.settings.log.stderr { + log.set_log_to_stderr(next_local_config.settings.log.stderr); + } + if local_config.settings.log.debug != next_local_config.settings.log.debug { + log.set_debug(next_local_config.settings.log.debug); + } + local_config = next_local_config; + + let mut system_addrs: BTreeMap = BTreeMap::new(); + getifaddrs::for_each_address(|addr: &InetAddress, dev: &str| { + match addr.ip_scope() { + IpScope::Global | IpScope::Private | IpScope::PseudoPrivate | IpScope::Shared => { + if !local_config.settings.is_interface_blacklisted(dev) { + let mut a = addr.clone(); + a.set_port(local_config.settings.primary_port); + system_addrs.insert(a, String::from(dev)); + if local_config.settings.secondary_port.is_some() { + let mut a = addr.clone(); + a.set_port(local_config.settings.secondary_port.unwrap()); + system_addrs.insert(a, String::from(dev)); + } + } + } + _ => {} + } + }); + + let mut udp_sockets_to_close: Vec = Vec::new(); + for sock in udp_sockets.iter() { + if !system_addrs.contains_key(sock.0) { + udp_sockets_to_close.push(sock.0.clone()); + } + } + for k in udp_sockets_to_close.iter() { + l!(log, "unbinding UDP socket at {}", k.to_string()); + udp_sockets.remove(k); + } + + for addr in system_addrs.iter() { + if !udp_sockets.contains_key(addr.0) { + let _ = FastUDPSocket::new(addr.1.as_str(), addr.0, move |raw_socket: &FastUDPRawOsSocket, from_address: &InetAddress, data: Buffer| { + // TODO: incoming packet handler + }).map_or_else(|e| { + l!(log, "error binding UDP socket to {}: {}", addr.0.to_string(), e.to_string()); + }, |s| { + l!(log, "bound UDP socket at {}", addr.0.to_string()); + udp_sockets.insert(addr.0.clone(), s); + }); + } + } + + let mut udp_primary_port_bind_failure = true; + let mut udp_secondary_port_bind_failure = local_config.settings.secondary_port.is_some(); + for s in udp_sockets.iter() { + if s.0.port() == local_config.settings.primary_port { + udp_primary_port_bind_failure = false; + if !udp_secondary_port_bind_failure { + break; + } + } + if s.0.port() == local_config.settings.secondary_port.unwrap() { + udp_secondary_port_bind_failure = false; + if !udp_primary_port_bind_failure { + break; + } + } + } + if udp_primary_port_bind_failure { + if local_config.settings.auto_port_search { + // TODO: port hunting + } else { + l!(log, "WARNING: failed to bind to any address at primary port {}", local_config.settings.primary_port); + } + } + if udp_secondary_port_bind_failure { + if local_config.settings.auto_port_search { + // TODO: port hunting + } else { + l!(log, "WARNING: failed to bind to any address at secondary port {}", local_config.settings.secondary_port.unwrap_or(0)); + } + } + + let mut web_listeners_to_close: Vec = Vec::new(); + for l in web_listeners.iter() { + if !system_addrs.contains_key(l.0) { + web_listeners_to_close.push(l.0.clone()); + } + } + for k in web_listeners_to_close.iter() { + l!(log, "closing HTTP listener at {}", k.to_string()); + web_listeners.remove(k); + } + + for addr in system_addrs.iter() { + if addr.0.port() == local_config.settings.primary_port && !web_listeners.contains_key(addr.0) { + let sa = 
addr.0.to_socketaddr(); + if sa.is_some() { + let wl = WebListener::new(addr.1.as_str(), sa.unwrap(), &service).map_or_else(|e| { + l!(log, "error creating HTTP listener at {}: {}", addr.0.to_string(), e.to_string()); + }, |l| { + l!(log, "created HTTP listener at {}", addr.0.to_string()); + web_listeners.insert(addr.0.clone(), l); + }); + } + } + } + + if local_web_listeners.0.is_none() { + let _ = WebListener::new("", SocketAddr::new(IpAddr::from(Ipv4Addr::LOCALHOST), local_config.settings.primary_port), &service).map(|wl| { + local_web_listeners.0 = Some(wl); + }); + } + if local_web_listeners.1.is_none() { + let _ = WebListener::new("", SocketAddr::new(IpAddr::from(Ipv6Addr::LOCALHOST), local_config.settings.primary_port), &service).map(|wl| { + local_web_listeners.1 = Some(wl); + }); + } + if local_web_listeners.0.is_none() && local_web_listeners.1.is_none() { + l!(log, "error creating HTTP listener on 127.0.0.1/{} or ::1/{}", local_config.settings.primary_port, local_config.settings.primary_port); + } + } + + // Run background task handler in ZeroTier core. + loop_delay = node.process_background_tasks(now); + } + + l!(log, "shutting down normally..."); + + drop(udp_sockets); + drop(web_listeners); + drop(local_web_listeners); + drop(service); + drop(node); + + d!(log, "shutdown complete."); + + process_exit_value } pub(crate) fn run(store: &Arc, auth_token: Option) -> i32 { @@ -201,243 +449,10 @@ pub(crate) fn run(store: &Arc, auth_token: Option) -> i32 { log.fatal(format!("unable to write authtoken.secret to '{}'", store.base_path.to_str().unwrap())); return 1; } - let auth_token = Arc::new(auth_token); - let _ = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async { - let mut udp_sockets: BTreeMap = BTreeMap::new(); - let (mut interrupt_tx, mut interrupt_rx) = futures::channel::mpsc::channel::<()>(1); - - let mut service = Service { - auth_token: auth_token.clone(), - log: log.clone(), - _local_config: Arc::new(Mutex::new(local_config)), - run: Arc::new(AtomicBool::new(true)), - online: Arc::new(AtomicBool::new(false)), - store: store.clone(), - node: Weak::new(), - }; - - let node = Node::new(service.clone(), ms_since_epoch()); - if node.is_err() { - log.fatal(format!("error initializing node: {}", node.err().unwrap().to_str())); - process_exit_value = 1; - return; - } - let node = Arc::new(node.ok().unwrap()); - - service.node = Arc::downgrade(&node); - let service = service; // make immutable after setting node - - let mut loop_delay = zerotier_core::NODE_BACKGROUND_TASKS_MAX_INTERVAL; - loop { - let mut local_config = service.local_config(); - - /* - let web_service = warp::service(warp::any() - .and(warp::path::end().map(|| { warp::reply::with_status("404", StatusCode::NOT_FOUND) }) - .or(warp::path("status") - .and(warp::addr::remote()) - .and(warp::method()) - .and(warp::header::headers_cloned()) - .and(warp::body::content_length_limit(1048576)) - .and(warp::body::bytes()) - .map(move |remote: Option, method: Method, headers: HeaderMap, post_data: Bytes| { s0.web_api_status(remote, method, headers, post_data) })) - .or(warp::path!("network" / String) - .and(warp::addr::remote()) - .and(warp::method()) - .and(warp::header::headers_cloned()) - .and(warp::body::content_length_limit(1048576)) - .and(warp::body::bytes()) - .map(move |network_str: String, remote: Option, method: Method, headers: HeaderMap, post_data: Bytes| { s1.web_api_network(network_str, remote, method, headers, post_data) })) - .or(warp::path!("peer" / String) - 
.and(warp::addr::remote()) - .and(warp::method()) - .and(warp::header::headers_cloned()) - .and(warp::body::content_length_limit(1048576)) - .and(warp::body::bytes()) - .map(move |peer_str: String, remote: Option, method: Method, headers: HeaderMap, post_data: Bytes| { s2.web_api_peer(peer_str, remote, method, headers, post_data) })) - ) - ); - l!(log, "starting local HTTP API server on 127.0.0.1/{}", local_config.settings.primary_port); - let (mut shutdown_tx, mut shutdown_rx) = futures::channel::oneshot::channel(); - let (s0, s1, s2) = (service.clone(), service.clone(), service.clone()); // clones to move to closures - let warp_server = warp::serve(web_service).try_bind_with_graceful_shutdown((IpAddr::from([127_u8, 0_u8, 0_u8, 1_u8]), local_config.settings.primary_port), async { let _ = shutdown_rx.await; }); - if warp_server.is_err() { - l!(log, "ERROR: local API http server failed to bind to port {} or failed to start: {}, restarting inner loop...", local_config.settings.primary_port, warp_server.err().unwrap().to_string()); - break; - } - let warp_server = tokio::spawn(warp_server.unwrap().1); - */ - - // Write zerotier.port which is used by the CLI to know how to reach the HTTP API. - let _ = store.write_port(local_config.settings.primary_port); - - // The inner loop runs the web server in the "background" (async) while periodically - // scanning for significant configuration changes. Some major changes may require - // the inner loop to exit and be restarted. - let mut last_checked_config: i64 = 0; - d!(log, "local HTTP API server running, inner loop starting."); - loop { - let loop_start = ms_since_epoch(); - let mut now: i64 = 0; - - // Wait for (1) loop delay elapsed, (2) a signal to interrupt delay now, or - // (3) an external signal to exit. - tokio::select! { - _ = tokio::time::sleep(Duration::from_millis(loop_delay)) => { - now = ms_since_epoch(); - let actual_delay = now - loop_start; - if actual_delay > ((loop_delay as i64) * 4_i64) { - l!(log, "likely sleep/wake detected due to excess delay, reestablishing links..."); - // TODO: handle likely sleep/wake or other system interruption - } - }, - _ = interrupt_rx.next() => { - d!(log, "inner loop delay interrupted!"); - now = ms_since_epoch(); - }, - _ = tokio::signal::ctrl_c() => { - l!(log, "exit signal received, shutting down..."); - service.run.store(false, Ordering::Relaxed); - break; - } - } - - // Check every CONFIG_CHECK_INTERVAL for changes to either the system configuration - // or the node's local configuration and take actions as needed. - if (now - last_checked_config) >= CONFIG_CHECK_INTERVAL { - last_checked_config = now; - - // Check for changes to local.conf. - let new_config = store.read_local_conf(true); - if new_config.is_ok() { - d!(log, "local.conf changed on disk, reloading."); - service.set_local_config(new_config.unwrap()); - } - - // Check for and handle configuration changes, some of which require inner loop restart. 
- let next_local_config = service.local_config(); - if local_config.settings.primary_port != next_local_config.settings.primary_port { - break; - } - if local_config.settings.log.max_size != next_local_config.settings.log.max_size { - log.set_max_size(next_local_config.settings.log.max_size); - } - if local_config.settings.log.stderr != next_local_config.settings.log.stderr { - log.set_log_to_stderr(next_local_config.settings.log.stderr); - } - if local_config.settings.log.debug != next_local_config.settings.log.debug { - log.set_debug(next_local_config.settings.log.debug); - } - local_config = next_local_config; - - // Enumerate all useful addresses bound to interfaces on the system. - let mut system_addrs: BTreeMap = BTreeMap::new(); - getifaddrs::for_each_address(|addr: &InetAddress, dev: &str| { - match addr.ip_scope() { - IpScope::Global | IpScope::Private | IpScope::PseudoPrivate | IpScope::Shared => { - if !local_config.settings.is_interface_blacklisted(dev) { - let mut a = addr.clone(); - a.set_port(local_config.settings.primary_port); - system_addrs.insert(a, String::from(dev)); - if local_config.settings.secondary_port.is_some() { - let mut a = addr.clone(); - a.set_port(local_config.settings.secondary_port.unwrap()); - system_addrs.insert(a, String::from(dev)); - } - } - } - _ => {} - } - }); - - // Drop bound sockets that are no longer valid or are now blacklisted. - let mut udp_sockets_to_close: Vec = Vec::new(); - for sock in udp_sockets.iter() { - if !system_addrs.contains_key(sock.0) { - udp_sockets_to_close.push(sock.0.clone()); - } - } - for k in udp_sockets_to_close.iter() { - l!(log, "unbinding UDP socket at {} (no longer appears to be present or port has changed)", k.to_string()); - udp_sockets.remove(k); - } - - // Create sockets for unbound addresses. - for addr in system_addrs.iter() { - if !udp_sockets.contains_key(addr.0) { - let _ = FastUDPSocket::new(addr.1.as_str(), addr.0, move |raw_socket: &FastUDPRawOsSocket, from_address: &InetAddress, data: Buffer| { - // TODO: incoming packet handler - }).map_or_else(|e| { - l!(log, "error binding UDP socket to {}: {}", addr.0.to_string(), e.to_string()); - }, |s| { - l!(log, "bound UDP socket at {}", addr.0.to_string()); - udp_sockets.insert(addr.0.clone(), s); - }); - } - } - - // Determine if primary and secondary port (if secondary enabled) failed to - // bind to any interface. - let mut primary_port_bind_failure = true; - let mut secondary_port_bind_failure = local_config.settings.secondary_port.is_some(); - for s in udp_sockets.iter() { - if s.0.port() == local_config.settings.primary_port { - primary_port_bind_failure = false; - if !secondary_port_bind_failure { - break; - } - } - if s.0.port() == local_config.settings.secondary_port.unwrap() { - secondary_port_bind_failure = false; - if !primary_port_bind_failure { - break; - } - } - } - if primary_port_bind_failure { - if local_config.settings.auto_port_search { - // TODO: port hunting - } else { - l!(log, "WARNING: failed to bind to any address at primary port {} (will try again)", local_config.settings.primary_port); - } - } - if secondary_port_bind_failure { - if local_config.settings.auto_port_search { - // TODO: port hunting - } else { - l!(log, "WARNING: failed to bind to any address at secondary port {} (will try again)", local_config.settings.secondary_port.unwrap_or(0)); - } - } - } - - if !service.run.load(Ordering::Relaxed) { - break; - } - - // Run background task handler in ZeroTier core. 
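For reference, the primary/secondary bind check that appears in both the old loop above and the new run_async can be sketched on its own with std types only. SocketAddr stands in for zerotier_core::InetAddress here (purely an assumption of this example), and map_or avoids unwrapping a secondary port that was never configured:

use std::collections::BTreeMap;
use std::net::SocketAddr;

// Returns (primary_failed, secondary_failed) given the map of bound sockets.
fn port_bind_failures(
    sockets: &BTreeMap<SocketAddr, ()>,
    primary_port: u16,
    secondary_port: Option<u16>,
) -> (bool, bool) {
    let mut primary_failed = true;
    let mut secondary_failed = secondary_port.is_some();
    for (addr, _) in sockets.iter() {
        if addr.port() == primary_port {
            primary_failed = false;
        }
        if secondary_port.map_or(false, |p| addr.port() == p) {
            secondary_failed = false;
        }
    }
    (primary_failed, secondary_failed)
}

fn main() {
    let mut sockets = BTreeMap::new();
    sockets.insert("127.0.0.1:9993".parse::<SocketAddr>().unwrap(), ());
    // Primary 9993 is bound, secondary 9994 is not.
    assert_eq!(port_bind_failures(&sockets, 9993, Some(9994)), (false, true));
    println!("ok");
}

Returning a pair of flags keeps the scan separate from the port-hunting decision that follows it.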
- loop_delay = node.process_background_tasks(now); - } - - d!(log, "inner loop exited, shutting down local API HTTP server..."); - - // Gracefully shut down the local web server. - //let _ = shutdown_tx.send(()); - //let _ = warp_server.await; - - // Sleep for a brief period of time to prevent thrashing if some invalid - // state is hit that causes the inner loop to keep breaking. - if !service.run.load(Ordering::Relaxed) { - d!(log, "exiting."); - break; - } - let _ = tokio::time::sleep(Duration::from_secs(1)).await; - if !service.run.load(Ordering::Relaxed) { - d!(log, "exiting."); - break; - } - } - }); + let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap(); + process_exit_value = rt.block_on(async move { run_async(store, auth_token, log, local_config).await }); + rt.shutdown_timeout(Duration::from_millis(500)); process_exit_value } diff --git a/rust-zerotier-service/src/weblistener.rs b/rust-zerotier-service/src/weblistener.rs index 6a7acc43b..070057798 100644 --- a/rust-zerotier-service/src/weblistener.rs +++ b/rust-zerotier-service/src/weblistener.rs @@ -11,91 +11,103 @@ */ /****/ -use std::net::TcpListener; -use std::time::Duration; +use std::any::Any; +use std::cell::RefCell; use std::convert::Infallible; +use std::net::{SocketAddr, TcpListener}; use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; + +use futures::TryFutureExt; +use hyper::{Body, Request, Response}; +use hyper::server::Server; +use hyper::service::{make_service_fn, service_fn}; +use net2::TcpBuilder; +#[cfg(unix)] use net2::unix::UnixTcpBuilderExt; use zerotier_core::InetAddress; -use hyper::{Request, Response, Body}; -use hyper::service::{make_service_fn, service_fn}; -use futures::Future; -use futures::future::{AbortHandle, abortable}; -use net2::TcpBuilder; use crate::service::Service; +#[inline(always)] +async fn web_handler(service: Service, req: Request) -> Result, Infallible> { + Ok(Response::new("Hello, World".into())) +} + /// Listener for http connections to the API or for TCP P2P. pub(crate) struct WebListener { - server: dyn Future, - abort_handle: AbortHandle, + shutdown_tx: RefCell>>, + server: Box, } impl WebListener { /// Create a new "background" TCP WebListener using the current tokio reactor async runtime. - pub fn new(_device_name: &str, addr: &InetAddress, service: Arc) -> Result { - let addr = addr.to_socketaddr(); - if addr.is_none() { - return Err(std::io::Error::new(std::io::ErrorKind::AddrNotAvailable, "invalid address")); - } - let addr = addr.unwrap(); - + pub fn new(_device_name: &str, addr: SocketAddr, service: &Service) -> Result> { let listener = if addr.is_ipv4() { let l = TcpBuilder::new_v4(); if l.is_err() { - return Err(l.err().unwrap()); + return Err(Box::new(l.err().unwrap())); } - l.unwrap() + let l = l.unwrap(); + #[cfg(unix)] { + let _ = l.reuse_port(true); + } + l } else { let l = TcpBuilder::new_v6(); if l.is_err() { - return Err(l.err().unwrap()); + return Err(Box::new(l.err().unwrap())); } let l = l.unwrap(); - l.only_v6(true); + let _ = l.only_v6(true); + #[cfg(unix)] { + let _ = l.reuse_port(true); + } l }; + // TODO: bind to device on Linux? 
         let listener = listener.bind(addr);
         if listener.is_err() {
-            return Err(listener.err().unwrap());
+            return Err(Box::new(listener.err().unwrap()));
         }
         let listener = listener.unwrap().listen(128);
         if listener.is_err() {
-            return Err(listener.err().unwrap());
+            return Err(Box::new(listener.err().unwrap()));
         }
         let listener = listener.unwrap();
 
-        let builder = hyper::server::Server::from_tcp(listener);
+        let builder = Server::from_tcp(listener);
         if builder.is_err() {
-            return Err(builder.err().unwrap());
+            return Err(Box::new(builder.err().unwrap()));
         }
         let builder = builder.unwrap()
-            .executor(tokio::spawn)
             .http1_half_close(false)
             .http1_keepalive(true)
-            .http1_max_buf_size(131072)
-            .http2_keep_alive_interval(Duration::from_secs(30))
-            .http2_keep_alive_timeout(Duration::from_secs(90))
-            .http2_adaptive_window(true)
-            .http2_max_frame_size(131072)
-            .http2_max_concurrent_streams(16);
+            .http1_max_buf_size(131072);
 
-        let (server, abort_handle) = abortable(builder.serve(make_service_fn(|_| async move {
-            Ok::<_, Infallible>(service_fn(|req: Request| -> Result, Infallible> async move {
-                Ok(Response::new("Hello, World".into()))
-            }))
-        })));
+        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
+        let service = service.clone();
+        let server = builder.serve(make_service_fn(move |_| {
+            let service = service.clone();
+            async move {
+                Ok::<_, Infallible>(service_fn(move |req: Request<Body>| {
+                    let service = service.clone();
+                    async move {
+                        web_handler(service, req).await
+                    }
+                }))
+            }
+        })).with_graceful_shutdown(async { let _ = shutdown_rx.await; });
 
         Ok(WebListener {
-            server,
-            abort_handle,
+            shutdown_tx: RefCell::new(Some(shutdown_tx)),
+            server: Box::new(server),
         })
     }
 }
 
 impl Drop for WebListener {
     fn drop(&mut self) {
-        self.abort_handle.abort();
-        self.server.await;
+        let _ = self.shutdown_tx.take().map(|tx| { tx.send(()); });
     }
 }
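Finally, the PartialEq change in rust-zerotier-core/src/endpoint.rs compares the endpoint type tag before falling back to the string form. A tiny, self-contained illustration of that pattern follows; the Ep and EpType types are invented for the example and are not part of the crate:

#[derive(PartialEq, Eq, Clone, Copy)]
enum EpType { IpUdp, IpTcp }

struct Ep { type_: EpType, s: &'static str }

impl ToString for Ep {
    fn to_string(&self) -> String { self.s.to_string() }
}

impl PartialEq for Ep {
    fn eq(&self, other: &Ep) -> bool {
        // Cheap type-tag check first; only endpoints of the same type
        // fall through to the (more expensive) string comparison.
        if self.type_ == other.type_ {
            self.to_string() == other.to_string()
        } else {
            false
        }
    }
}

fn main() {
    let a = Ep { type_: EpType::IpUdp, s: "10.0.0.1/9993" };
    let b = Ep { type_: EpType::IpTcp, s: "10.0.0.1/9993" };
    assert!(a != b); // same string form, different endpoint types
    println!("ok");
}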