Web server...

This commit is contained in:
Adam Ierymenko 2021-02-26 01:16:24 -05:00
parent 0f49c7510b
commit e1795bacef
No known key found for this signature in database
GPG key ID: C8877CF2D7A5D7F3
7 changed files with 349 additions and 384 deletions

View file

@ -113,14 +113,11 @@ bool Endpoint::fromString(const char *s) noexcept
return false;
} break;
}
} else {
} else if (strchr(s, '/') != nullptr) {
// IP/port is parsed as an IP_UDP endpoint for backward compatibility.
if (strchr(s, '/')) {
return asInetAddress(this->value.ss).fromString(s);
} else {
return false;
}
return asInetAddress(this->value.ss).fromString(s);
}
return false;
}
int Endpoint::marshal(uint8_t data[ZT_ENDPOINT_MARSHAL_SIZE_MAX]) const noexcept

View file

@ -25,7 +25,7 @@
#include <algorithm>
#include <utility>
#ifdef __GCC__
#if defined(__GCC__) || defined(__GNUC__)
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
#endif

View file

@ -85,7 +85,7 @@ impl Endpoint {
/// Get a reference to the InetAddress in this endpoint or None if this is not of a relevant type.
pub fn as_inetaddress(&self) -> Option<&InetAddress> {
match self.type_ {
EndpointType::Ip | EndpointType::IpUdp | EndpointType::IpTcp | EndpointType::IpHttp => {
EndpointType::Ip | EndpointType::IpUdp | EndpointType::IpTcp | EndpointType::IpTcpWs => {
unsafe {
Some(InetAddress::transmute_capi(&self.capi.value.ia))
}
@ -109,7 +109,11 @@ impl ToString for Endpoint {
impl PartialEq for Endpoint {
fn eq(&self, other: &Endpoint) -> bool {
self.to_string() == other.to_string()
if self.type_ == other.type_ {
self.to_string() == other.to_string()
} else {
false
}
}
}

View file

@ -180,32 +180,6 @@ dependencies = [
"slab",
]
[[package]]
name = "h2"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6b67e66362108efccd8ac053abafc8b7a8d86a37e6e48fc4f6f7485eb5e9e6a5"
dependencies = [
"bytes",
"fnv",
"futures-core",
"futures-sink",
"futures-util",
"http",
"indexmap",
"slab",
"tokio",
"tokio-util",
"tracing",
"tracing-futures",
]
[[package]]
name = "hashbrown"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7afe4a420e3fe79967a00898cc1f4db7c8a49a9333a29f8a4bd76a253d5cd04"
[[package]]
name = "hermit-abi"
version = "0.1.17"
@ -264,7 +238,6 @@ dependencies = [
"futures-channel",
"futures-core",
"futures-util",
"h2",
"http",
"http-body",
"httparse",
@ -278,16 +251,6 @@ dependencies = [
"want",
]
[[package]]
name = "indexmap"
version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4fb1fa934250de4de8aef298d81c729a7d33d8c239daa3a7575e6b92bfc7313b"
dependencies = [
"autocfg",
"hashbrown",
]
[[package]]
name = "itoa"
version = "0.4.7"
@ -589,9 +552,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ca04cec6ff2474c638057b65798f60ac183e5e79d3448bb7163d36a39cff6ec"
dependencies = [
"autocfg",
"bytes",
"libc",
"memchr",
"mio",
"once_cell",
"pin-project-lite",
@ -611,20 +572,6 @@ dependencies = [
"syn",
]
[[package]]
name = "tokio-util"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ebb7cb2f00c5ae8df755b252306272cd1790d39728363936e01827e11f0b017b"
dependencies = [
"bytes",
"futures-core",
"futures-sink",
"log",
"pin-project-lite",
"tokio",
]
[[package]]
name = "tower-service"
version = "0.3.0"
@ -651,16 +598,6 @@ dependencies = [
"lazy_static",
]
[[package]]
name = "tracing-futures"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2"
dependencies = [
"pin-project",
"tracing",
]
[[package]]
name = "try-lock"
version = "0.2.3"

View file

@ -20,8 +20,8 @@ hex = "0"
lazy_static = "1"
num-traits = "0"
num-derive = "0"
hyper = { version = "0", features = ["http1", "http2", "runtime", "server", "client", "tcp", "stream"] }
net2 = "0"
hyper = { version = "0", features = ["http1", "runtime", "server", "client", "tcp", "stream"] }
net2 = "0.2"
[target."cfg(windows)".dependencies]
winapi = { version = "0.3.9", features = ["handleapi", "ws2ipdef", "ws2tcpip"] }

View file

@ -12,18 +12,15 @@
/****/
use std::collections::BTreeMap;
use std::net::{SocketAddr, Ipv4Addr, IpAddr, Ipv6Addr};
use std::str::FromStr;
use std::sync::{Arc, Mutex, Weak};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::atomic::{AtomicBool, Ordering, AtomicPtr};
use std::time::Duration;
//use futures::stream::StreamExt;
//use warp::{Filter, Reply};
//use warp::http::{HeaderMap, Method, StatusCode};
//use warp::hyper::body::Bytes;
use zerotier_core::*;
use zerotier_core::trace::{TraceEvent, TraceEventLayer};
use futures::StreamExt;
use crate::fastudpsocket::*;
use crate::getifaddrs;
@ -32,23 +29,27 @@ use crate::log::Log;
use crate::network::Network;
use crate::store::Store;
use crate::utils::ms_since_epoch;
use crate::weblistener::WebListener;
/// How often to check for major configuration changes. This shouldn't happen
/// too often since it uses a bit of CPU.
const CONFIG_CHECK_INTERVAL: i64 = 5000;
/// Core ZeroTier service.
/// This object must be clonable across threads, so all its innards are in
/// Arc containers. It's probably faster to clone all these Arcs when new
/// threads are created (a rare event) so we only have to dereference each
/// Arc once for common events like packet receipt.
struct ServiceIntl {
auth_token: String,
interrupt: Mutex<futures::channel::mpsc::Sender<()>>,
local_config: Mutex<Arc<LocalConfig>>,
run: AtomicBool,
online: AtomicBool,
}
/// Core ZeroTier service, which is sort of just a container for all the things.
#[derive(Clone)]
pub(crate) struct Service {
pub auth_token: Arc<String>,
pub log: Arc<Log>,
_local_config: Arc<Mutex<Arc<LocalConfig>>>, // Arc -> shared Mutex container so it can be changed globally
run: Arc<AtomicBool>,
pub online: Arc<AtomicBool>,
pub store: Arc<Store>,
pub node: Weak<Node<Service, Network>>, // weak since Node itself may hold a reference to this
pub(crate) log: Arc<Log>,
pub(crate) store: Arc<Store>,
_node: Weak<Node<Service, Network>>,
intl: Arc<ServiceIntl>,
}
impl NodeEventHandler<Network> for Service {
@ -62,25 +63,23 @@ impl NodeEventHandler<Network> for Service {
fn event(&self, event: Event, event_data: &[u8]) {
match event {
Event::Up => {
let _ = self.node.upgrade().map(|n: Arc<Node<Service, Network>>| {
d!(self.log, "node {} started up in data store '{}'", n.address().to_string(), self.store.base_path.to_str().unwrap());
});
},
d!(self.log, "node started up in data store '{}'", self.store.base_path.to_str().unwrap());
}
Event::Down => {
d!(self.log, "node shutting down.");
self.run.store(false, Ordering::Relaxed);
},
self.intl.run.store(false, Ordering::Relaxed);
}
Event::Online => {
d!(self.log, "node is online.");
self.online.store(true, Ordering::Relaxed);
},
self.intl.online.store(true, Ordering::Relaxed);
}
Event::Offline => {
d!(self.log, "node is offline.");
self.online.store(true, Ordering::Relaxed);
},
self.intl.online.store(true, Ordering::Relaxed);
}
Event::Trace => {
if !event_data.is_empty() {
@ -100,9 +99,9 @@ impl NodeEventHandler<Network> for Service {
});
});
}
},
}
Event::UserMessage => {},
Event::UserMessage => {}
}
}
@ -151,14 +150,263 @@ impl NodeEventHandler<Network> for Service {
impl Service {
#[inline(always)]
fn local_config(&self) -> Arc<LocalConfig> {
self._local_config.lock().unwrap().clone()
pub fn local_config(&self) -> Arc<LocalConfig> {
self.intl.local_config.lock().unwrap().clone()
}
#[inline(always)]
fn set_local_config(&self, new_lc: LocalConfig) {
*(self._local_config.lock().unwrap()) = Arc::new(new_lc);
pub fn set_local_config(&self, new_lc: LocalConfig) {
*(self.intl.local_config.lock().unwrap()) = Arc::new(new_lc);
}
/// Get the node running with this service.
/// This should never return None, but technically it could if Service
/// persisted beyond the life span of Node. Check this to be technically
/// pure "safe" Rust.
#[inline(always)]
pub fn node(&self) -> Option<Arc<Node<Service, Network>>> {
self._node.upgrade()
}
#[inline(always)]
pub fn online(&self) -> bool {
self.intl.online.load(Ordering::Relaxed)
}
pub fn shutdown(&self) {
self.intl.run.store(false, Ordering::Relaxed);
let _ = self.intl.interrupt.lock().unwrap().try_send(());
}
}
unsafe impl Send for Service {}
unsafe impl Sync for Service {}
async fn run_async(store: &Arc<Store>, auth_token: String, log: Arc<Log>, local_config: Arc<LocalConfig>) -> i32 {
let mut process_exit_value: i32 = 0;
let mut udp_sockets: BTreeMap<InetAddress, FastUDPSocket> = BTreeMap::new();
let mut web_listeners: BTreeMap<InetAddress, WebListener> = BTreeMap::new();
let mut local_web_listeners: (Option<WebListener>, Option<WebListener>) = (None, None); // IPv4, IPv6
let (interrupt_tx, mut interrupt_rx) = futures::channel::mpsc::channel::<()>(1);
let mut service = Service {
log: log.clone(),
store: store.clone(),
_node: Weak::new(),
intl: Arc::new(ServiceIntl {
auth_token,
interrupt: Mutex::new(interrupt_tx),
local_config: Mutex::new(local_config),
run: AtomicBool::new(true),
online: AtomicBool::new(false),
}),
};
let node = Node::new(service.clone(), ms_since_epoch());
if node.is_err() {
log.fatal(format!("error initializing node: {}", node.err().unwrap().to_str()));
return 1;
}
let node = Arc::new(node.ok().unwrap());
service._node = Arc::downgrade(&node);
let service = service; // make immutable after setting node
let mut now: i64 = ms_since_epoch();
let mut loop_delay = zerotier_core::NODE_BACKGROUND_TASKS_MAX_INTERVAL;
let mut last_checked_config: i64 = 0;
let mut local_config = service.local_config();
while service.intl.run.load(Ordering::Relaxed) {
let loop_start = ms_since_epoch();
// Write zerotier.port which is used by the CLI to know how to reach the HTTP API.
//let _ = store.write_port(local_config.settings.primary_port);
// Wait for (1) loop delay elapsed, (2) a signal to interrupt delay now, or
// (3) an external signal to exit.
tokio::select! {
_ = tokio::time::sleep(Duration::from_millis(loop_delay)) => {
now = ms_since_epoch();
let actual_delay = now - loop_start;
if actual_delay > ((loop_delay as i64) * 4_i64) {
l!(log, "likely sleep/wake detected due to excess delay, reestablishing links...");
// TODO: handle likely sleep/wake or other system interruption
}
},
_ = interrupt_rx.next() => {
d!(log, "inner loop delay interrupted!");
if !service.intl.run.load(Ordering::Relaxed) {
break;
}
now = ms_since_epoch();
},
_ = tokio::signal::ctrl_c() => {
l!(log, "exit signal received, shutting down...");
service.intl.run.store(false, Ordering::Relaxed);
break;
},
}
// Check every CONFIG_CHECK_INTERVAL for changes to either the system configuration
// or the node's local configuration and take actions as needed.
if (now - last_checked_config) >= CONFIG_CHECK_INTERVAL {
last_checked_config = now;
// Check for changes to local.conf.
let new_config = store.read_local_conf(true);
if new_config.is_ok() {
d!(log, "local.conf changed on disk, reloading.");
service.set_local_config(new_config.unwrap());
}
// Check for and handle configuration changes, some of which require inner loop restart.
let next_local_config = service.local_config();
if local_config.settings.primary_port != next_local_config.settings.primary_port {
local_web_listeners.0 = None;
local_web_listeners.1 = None;
}
if local_config.settings.log.max_size != next_local_config.settings.log.max_size {
log.set_max_size(next_local_config.settings.log.max_size);
}
if local_config.settings.log.stderr != next_local_config.settings.log.stderr {
log.set_log_to_stderr(next_local_config.settings.log.stderr);
}
if local_config.settings.log.debug != next_local_config.settings.log.debug {
log.set_debug(next_local_config.settings.log.debug);
}
local_config = next_local_config;
let mut system_addrs: BTreeMap<InetAddress, String> = BTreeMap::new();
getifaddrs::for_each_address(|addr: &InetAddress, dev: &str| {
match addr.ip_scope() {
IpScope::Global | IpScope::Private | IpScope::PseudoPrivate | IpScope::Shared => {
if !local_config.settings.is_interface_blacklisted(dev) {
let mut a = addr.clone();
a.set_port(local_config.settings.primary_port);
system_addrs.insert(a, String::from(dev));
if local_config.settings.secondary_port.is_some() {
let mut a = addr.clone();
a.set_port(local_config.settings.secondary_port.unwrap());
system_addrs.insert(a, String::from(dev));
}
}
}
_ => {}
}
});
let mut udp_sockets_to_close: Vec<InetAddress> = Vec::new();
for sock in udp_sockets.iter() {
if !system_addrs.contains_key(sock.0) {
udp_sockets_to_close.push(sock.0.clone());
}
}
for k in udp_sockets_to_close.iter() {
l!(log, "unbinding UDP socket at {}", k.to_string());
udp_sockets.remove(k);
}
for addr in system_addrs.iter() {
if !udp_sockets.contains_key(addr.0) {
let _ = FastUDPSocket::new(addr.1.as_str(), addr.0, move |raw_socket: &FastUDPRawOsSocket, from_address: &InetAddress, data: Buffer| {
// TODO: incoming packet handler
}).map_or_else(|e| {
l!(log, "error binding UDP socket to {}: {}", addr.0.to_string(), e.to_string());
}, |s| {
l!(log, "bound UDP socket at {}", addr.0.to_string());
udp_sockets.insert(addr.0.clone(), s);
});
}
}
let mut udp_primary_port_bind_failure = true;
let mut udp_secondary_port_bind_failure = local_config.settings.secondary_port.is_some();
for s in udp_sockets.iter() {
if s.0.port() == local_config.settings.primary_port {
udp_primary_port_bind_failure = false;
if !udp_secondary_port_bind_failure {
break;
}
}
if s.0.port() == local_config.settings.secondary_port.unwrap() {
udp_secondary_port_bind_failure = false;
if !udp_primary_port_bind_failure {
break;
}
}
}
if udp_primary_port_bind_failure {
if local_config.settings.auto_port_search {
// TODO: port hunting
} else {
l!(log, "WARNING: failed to bind to any address at primary port {}", local_config.settings.primary_port);
}
}
if udp_secondary_port_bind_failure {
if local_config.settings.auto_port_search {
// TODO: port hunting
} else {
l!(log, "WARNING: failed to bind to any address at secondary port {}", local_config.settings.secondary_port.unwrap_or(0));
}
}
let mut web_listeners_to_close: Vec<InetAddress> = Vec::new();
for l in web_listeners.iter() {
if !system_addrs.contains_key(l.0) {
web_listeners_to_close.push(l.0.clone());
}
}
for k in web_listeners_to_close.iter() {
l!(log, "closing HTTP listener at {}", k.to_string());
web_listeners.remove(k);
}
for addr in system_addrs.iter() {
if addr.0.port() == local_config.settings.primary_port && !web_listeners.contains_key(addr.0) {
let sa = addr.0.to_socketaddr();
if sa.is_some() {
let wl = WebListener::new(addr.1.as_str(), sa.unwrap(), &service).map_or_else(|e| {
l!(log, "error creating HTTP listener at {}: {}", addr.0.to_string(), e.to_string());
}, |l| {
l!(log, "created HTTP listener at {}", addr.0.to_string());
web_listeners.insert(addr.0.clone(), l);
});
}
}
}
if local_web_listeners.0.is_none() {
let _ = WebListener::new("", SocketAddr::new(IpAddr::from(Ipv4Addr::LOCALHOST), local_config.settings.primary_port), &service).map(|wl| {
local_web_listeners.0 = Some(wl);
});
}
if local_web_listeners.1.is_none() {
let _ = WebListener::new("", SocketAddr::new(IpAddr::from(Ipv6Addr::LOCALHOST), local_config.settings.primary_port), &service).map(|wl| {
local_web_listeners.1 = Some(wl);
});
}
if local_web_listeners.0.is_none() && local_web_listeners.1.is_none() {
l!(log, "error creating HTTP listener on 127.0.0.1/{} or ::1/{}", local_config.settings.primary_port, local_config.settings.primary_port);
}
}
// Run background task handler in ZeroTier core.
loop_delay = node.process_background_tasks(now);
}
l!(log, "shutting down normally...");
drop(udp_sockets);
drop(web_listeners);
drop(local_web_listeners);
drop(service);
drop(node);
d!(log, "shutdown complete.");
process_exit_value
}
pub(crate) fn run(store: &Arc<Store>, auth_token: Option<String>) -> i32 {
@ -201,243 +449,10 @@ pub(crate) fn run(store: &Arc<Store>, auth_token: Option<String>) -> i32 {
log.fatal(format!("unable to write authtoken.secret to '{}'", store.base_path.to_str().unwrap()));
return 1;
}
let auth_token = Arc::new(auth_token);
let _ = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
let mut udp_sockets: BTreeMap<InetAddress, FastUDPSocket> = BTreeMap::new();
let (mut interrupt_tx, mut interrupt_rx) = futures::channel::mpsc::channel::<()>(1);
let mut service = Service {
auth_token: auth_token.clone(),
log: log.clone(),
_local_config: Arc::new(Mutex::new(local_config)),
run: Arc::new(AtomicBool::new(true)),
online: Arc::new(AtomicBool::new(false)),
store: store.clone(),
node: Weak::new(),
};
let node = Node::new(service.clone(), ms_since_epoch());
if node.is_err() {
log.fatal(format!("error initializing node: {}", node.err().unwrap().to_str()));
process_exit_value = 1;
return;
}
let node = Arc::new(node.ok().unwrap());
service.node = Arc::downgrade(&node);
let service = service; // make immutable after setting node
let mut loop_delay = zerotier_core::NODE_BACKGROUND_TASKS_MAX_INTERVAL;
loop {
let mut local_config = service.local_config();
/*
let web_service = warp::service(warp::any()
.and(warp::path::end().map(|| { warp::reply::with_status("404", StatusCode::NOT_FOUND) })
.or(warp::path("status")
.and(warp::addr::remote())
.and(warp::method())
.and(warp::header::headers_cloned())
.and(warp::body::content_length_limit(1048576))
.and(warp::body::bytes())
.map(move |remote: Option<SocketAddr>, method: Method, headers: HeaderMap, post_data: Bytes| { s0.web_api_status(remote, method, headers, post_data) }))
.or(warp::path!("network" / String)
.and(warp::addr::remote())
.and(warp::method())
.and(warp::header::headers_cloned())
.and(warp::body::content_length_limit(1048576))
.and(warp::body::bytes())
.map(move |network_str: String, remote: Option<SocketAddr>, method: Method, headers: HeaderMap, post_data: Bytes| { s1.web_api_network(network_str, remote, method, headers, post_data) }))
.or(warp::path!("peer" / String)
.and(warp::addr::remote())
.and(warp::method())
.and(warp::header::headers_cloned())
.and(warp::body::content_length_limit(1048576))
.and(warp::body::bytes())
.map(move |peer_str: String, remote: Option<SocketAddr>, method: Method, headers: HeaderMap, post_data: Bytes| { s2.web_api_peer(peer_str, remote, method, headers, post_data) }))
)
);
l!(log, "starting local HTTP API server on 127.0.0.1/{}", local_config.settings.primary_port);
let (mut shutdown_tx, mut shutdown_rx) = futures::channel::oneshot::channel();
let (s0, s1, s2) = (service.clone(), service.clone(), service.clone()); // clones to move to closures
let warp_server = warp::serve(web_service).try_bind_with_graceful_shutdown((IpAddr::from([127_u8, 0_u8, 0_u8, 1_u8]), local_config.settings.primary_port), async { let _ = shutdown_rx.await; });
if warp_server.is_err() {
l!(log, "ERROR: local API http server failed to bind to port {} or failed to start: {}, restarting inner loop...", local_config.settings.primary_port, warp_server.err().unwrap().to_string());
break;
}
let warp_server = tokio::spawn(warp_server.unwrap().1);
*/
// Write zerotier.port which is used by the CLI to know how to reach the HTTP API.
let _ = store.write_port(local_config.settings.primary_port);
// The inner loop runs the web server in the "background" (async) while periodically
// scanning for significant configuration changes. Some major changes may require
// the inner loop to exit and be restarted.
let mut last_checked_config: i64 = 0;
d!(log, "local HTTP API server running, inner loop starting.");
loop {
let loop_start = ms_since_epoch();
let mut now: i64 = 0;
// Wait for (1) loop delay elapsed, (2) a signal to interrupt delay now, or
// (3) an external signal to exit.
tokio::select! {
_ = tokio::time::sleep(Duration::from_millis(loop_delay)) => {
now = ms_since_epoch();
let actual_delay = now - loop_start;
if actual_delay > ((loop_delay as i64) * 4_i64) {
l!(log, "likely sleep/wake detected due to excess delay, reestablishing links...");
// TODO: handle likely sleep/wake or other system interruption
}
},
_ = interrupt_rx.next() => {
d!(log, "inner loop delay interrupted!");
now = ms_since_epoch();
},
_ = tokio::signal::ctrl_c() => {
l!(log, "exit signal received, shutting down...");
service.run.store(false, Ordering::Relaxed);
break;
}
}
// Check every CONFIG_CHECK_INTERVAL for changes to either the system configuration
// or the node's local configuration and take actions as needed.
if (now - last_checked_config) >= CONFIG_CHECK_INTERVAL {
last_checked_config = now;
// Check for changes to local.conf.
let new_config = store.read_local_conf(true);
if new_config.is_ok() {
d!(log, "local.conf changed on disk, reloading.");
service.set_local_config(new_config.unwrap());
}
// Check for and handle configuration changes, some of which require inner loop restart.
let next_local_config = service.local_config();
if local_config.settings.primary_port != next_local_config.settings.primary_port {
break;
}
if local_config.settings.log.max_size != next_local_config.settings.log.max_size {
log.set_max_size(next_local_config.settings.log.max_size);
}
if local_config.settings.log.stderr != next_local_config.settings.log.stderr {
log.set_log_to_stderr(next_local_config.settings.log.stderr);
}
if local_config.settings.log.debug != next_local_config.settings.log.debug {
log.set_debug(next_local_config.settings.log.debug);
}
local_config = next_local_config;
// Enumerate all useful addresses bound to interfaces on the system.
let mut system_addrs: BTreeMap<InetAddress, String> = BTreeMap::new();
getifaddrs::for_each_address(|addr: &InetAddress, dev: &str| {
match addr.ip_scope() {
IpScope::Global | IpScope::Private | IpScope::PseudoPrivate | IpScope::Shared => {
if !local_config.settings.is_interface_blacklisted(dev) {
let mut a = addr.clone();
a.set_port(local_config.settings.primary_port);
system_addrs.insert(a, String::from(dev));
if local_config.settings.secondary_port.is_some() {
let mut a = addr.clone();
a.set_port(local_config.settings.secondary_port.unwrap());
system_addrs.insert(a, String::from(dev));
}
}
}
_ => {}
}
});
// Drop bound sockets that are no longer valid or are now blacklisted.
let mut udp_sockets_to_close: Vec<InetAddress> = Vec::new();
for sock in udp_sockets.iter() {
if !system_addrs.contains_key(sock.0) {
udp_sockets_to_close.push(sock.0.clone());
}
}
for k in udp_sockets_to_close.iter() {
l!(log, "unbinding UDP socket at {} (no longer appears to be present or port has changed)", k.to_string());
udp_sockets.remove(k);
}
// Create sockets for unbound addresses.
for addr in system_addrs.iter() {
if !udp_sockets.contains_key(addr.0) {
let _ = FastUDPSocket::new(addr.1.as_str(), addr.0, move |raw_socket: &FastUDPRawOsSocket, from_address: &InetAddress, data: Buffer| {
// TODO: incoming packet handler
}).map_or_else(|e| {
l!(log, "error binding UDP socket to {}: {}", addr.0.to_string(), e.to_string());
}, |s| {
l!(log, "bound UDP socket at {}", addr.0.to_string());
udp_sockets.insert(addr.0.clone(), s);
});
}
}
// Determine if primary and secondary port (if secondary enabled) failed to
// bind to any interface.
let mut primary_port_bind_failure = true;
let mut secondary_port_bind_failure = local_config.settings.secondary_port.is_some();
for s in udp_sockets.iter() {
if s.0.port() == local_config.settings.primary_port {
primary_port_bind_failure = false;
if !secondary_port_bind_failure {
break;
}
}
if s.0.port() == local_config.settings.secondary_port.unwrap() {
secondary_port_bind_failure = false;
if !primary_port_bind_failure {
break;
}
}
}
if primary_port_bind_failure {
if local_config.settings.auto_port_search {
// TODO: port hunting
} else {
l!(log, "WARNING: failed to bind to any address at primary port {} (will try again)", local_config.settings.primary_port);
}
}
if secondary_port_bind_failure {
if local_config.settings.auto_port_search {
// TODO: port hunting
} else {
l!(log, "WARNING: failed to bind to any address at secondary port {} (will try again)", local_config.settings.secondary_port.unwrap_or(0));
}
}
}
if !service.run.load(Ordering::Relaxed) {
break;
}
// Run background task handler in ZeroTier core.
loop_delay = node.process_background_tasks(now);
}
d!(log, "inner loop exited, shutting down local API HTTP server...");
// Gracefully shut down the local web server.
//let _ = shutdown_tx.send(());
//let _ = warp_server.await;
// Sleep for a brief period of time to prevent thrashing if some invalid
// state is hit that causes the inner loop to keep breaking.
if !service.run.load(Ordering::Relaxed) {
d!(log, "exiting.");
break;
}
let _ = tokio::time::sleep(Duration::from_secs(1)).await;
if !service.run.load(Ordering::Relaxed) {
d!(log, "exiting.");
break;
}
}
});
let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
process_exit_value = rt.block_on(async move { run_async(store, auth_token, log, local_config).await });
rt.shutdown_timeout(Duration::from_millis(500));
process_exit_value
}

View file

@ -11,91 +11,103 @@
*/
/****/
use std::net::TcpListener;
use std::time::Duration;
use std::any::Any;
use std::cell::RefCell;
use std::convert::Infallible;
use std::net::{SocketAddr, TcpListener};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use futures::TryFutureExt;
use hyper::{Body, Request, Response};
use hyper::server::Server;
use hyper::service::{make_service_fn, service_fn};
use net2::TcpBuilder;
#[cfg(unix)] use net2::unix::UnixTcpBuilderExt;
use zerotier_core::InetAddress;
use hyper::{Request, Response, Body};
use hyper::service::{make_service_fn, service_fn};
use futures::Future;
use futures::future::{AbortHandle, abortable};
use net2::TcpBuilder;
use crate::service::Service;
#[inline(always)]
async fn web_handler(service: Service, req: Request<Body>) -> Result<Response<Body>, Infallible> {
Ok(Response::new("Hello, World".into()))
}
/// Listener for http connections to the API or for TCP P2P.
pub(crate) struct WebListener {
server: dyn Future,
abort_handle: AbortHandle,
shutdown_tx: RefCell<Option<tokio::sync::oneshot::Sender<()>>>,
server: Box<dyn Any>,
}
impl WebListener {
/// Create a new "background" TCP WebListener using the current tokio reactor async runtime.
pub fn new(_device_name: &str, addr: &InetAddress, service: Arc<Service>) -> Result<WebListener, dyn std::error::Error> {
let addr = addr.to_socketaddr();
if addr.is_none() {
return Err(std::io::Error::new(std::io::ErrorKind::AddrNotAvailable, "invalid address"));
}
let addr = addr.unwrap();
pub fn new(_device_name: &str, addr: SocketAddr, service: &Service) -> Result<WebListener, Box<dyn std::error::Error>> {
let listener = if addr.is_ipv4() {
let l = TcpBuilder::new_v4();
if l.is_err() {
return Err(l.err().unwrap());
return Err(Box::new(l.err().unwrap()));
}
l.unwrap()
let l = l.unwrap();
#[cfg(unix)] {
let _ = l.reuse_port(true);
}
l
} else {
let l = TcpBuilder::new_v6();
if l.is_err() {
return Err(l.err().unwrap());
return Err(Box::new(l.err().unwrap()));
}
let l = l.unwrap();
l.only_v6(true);
let _ = l.only_v6(true);
#[cfg(unix)] {
let _ = l.reuse_port(true);
}
l
};
// TODO: bind to device on Linux?
let listener = listener.bind(addr);
if listener.is_err() {
return Err(listener.err().unwrap());
return Err(Box::new(listener.err().unwrap()));
}
let listener = listener.unwrap().listen(128);
if listener.is_err() {
return Err(listener.err().unwrap());
return Err(Box::new(listener.err().unwrap()));
}
let listener = listener.unwrap();
let builder = hyper::server::Server::from_tcp(listener);
let builder = Server::from_tcp(listener);
if builder.is_err() {
return Err(builder.err().unwrap());
return Err(Box::new(builder.err().unwrap()));
}
let builder = builder.unwrap()
.executor(tokio::spawn)
.http1_half_close(false)
.http1_keepalive(true)
.http1_max_buf_size(131072)
.http2_keep_alive_interval(Duration::from_secs(30))
.http2_keep_alive_timeout(Duration::from_secs(90))
.http2_adaptive_window(true)
.http2_max_frame_size(131072)
.http2_max_concurrent_streams(16);
.http1_max_buf_size(131072);
let (server, abort_handle) = abortable(builder.serve(make_service_fn(|_| async move {
Ok::<_, Infallible>(service_fn(|req: Request<Body>| -> Result<Response<Body>, Infallible> async move {
Ok(Response::new("Hello, World".into()))
}))
})));
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
let service = service.clone();
let server = builder.serve(make_service_fn(move |_| {
let service = service.clone();
async move {
Ok::<_, Infallible>(service_fn(move |req: Request<Body>| {
let service = service.clone();
async move {
web_handler(service, req).await
}
}))
}
})).with_graceful_shutdown(async { let _ = shutdown_rx.await; });
Ok(WebListener {
server,
abort_handle,
shutdown_tx: RefCell::new(Some(shutdown_tx)),
server: Box::new(server),
})
}
}
impl Drop for WebListener {
fn drop(&mut self) {
self.abort_handle.abort();
self.server.await;
let _ = self.shutdown_tx.take().map(|tx| { tx.send(()); });
}
}