Flesh out HTTP API, break into functions to make code more readable.

This commit is contained in:
Adam Ierymenko 2021-03-24 00:23:25 -04:00
parent ce5edba44d
commit 5be66eda2b
No known key found for this signature in database
GPG key ID: C8877CF2D7A5D7F3
4 changed files with 122 additions and 85 deletions

36
service/src/api.rs Normal file
View file

@ -0,0 +1,36 @@
/*
* Copyright (c)2013-2021 ZeroTier, Inc.
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file in the project's root directory.
*
* Change Date: 2026-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2.0 of the Apache License.
*/
/****/
use crate::service::Service;
use hyper::{Request, Body, StatusCode, Response, Method};
pub(crate) fn status(service: Service, req: Request<Body>) -> (StatusCode, Body) {
if req.method() == Method::GET {
let status = service.status();
if status.is_none() {
(StatusCode::SERVICE_UNAVAILABLE, Body::from("node shutdown in progress"))
} else {
(StatusCode::OK, Body::from(serde_json::to_string(status.as_ref().unwrap()).unwrap()))
}
} else {
(StatusCode::METHOD_NOT_ALLOWED, Body::from("/status allows method(s): GET"))
}
}
pub(crate) fn peer(service: Service, req: Request<Body>) -> (StatusCode, Body) {
(StatusCode::NOT_IMPLEMENTED, Body::from(""))
}
pub(crate) fn network(service: Service, req: Request<Body>) -> (StatusCode, Body) {
(StatusCode::NOT_IMPLEMENTED, Body::from(""))
}

View file

@ -15,21 +15,17 @@ use std::cell::RefCell;
use std::convert::Infallible;
use std::net::SocketAddr;
use hyper::{Body, Request, Response};
use hyper::{Body, Request, Response, StatusCode};
use hyper::server::Server;
use hyper::service::{make_service_fn, service_fn};
use tokio::task::JoinHandle;
use crate::service::Service;
use crate::api;
#[cfg(target_os = "linux")]
use std::os::unix::io::AsRawFd;
/// Handles API dispatch and other HTTP handler stuff.
async fn http_handler(service: Service, req: Request<Body>) -> Result<Response<Body>, Infallible> {
Ok(Response::new("Hello, World".into()))
}
/// Listener for http connections to the API or for TCP P2P.
/// Dropping a listener initiates shutdown of the background hyper Server instance,
/// but it might not shut down instantly as this occurs asynchronously.
@ -42,7 +38,7 @@ pub(crate) struct HttpListener {
impl HttpListener {
/// Create a new "background" TCP WebListener using the current tokio reactor async runtime.
pub async fn new(_device_name: &str, address: SocketAddr, service: &Service) -> Result<HttpListener, Box<dyn std::error::Error>> {
let listener = if addr.is_ipv4() {
let listener = if address.is_ipv4() {
let listener = socket2::Socket::new(socket2::Domain::ipv4(), socket2::Type::stream(), Some(socket2::Protocol::tcp()));
if listener.is_err() {
return Err(Box::new(listener.err().unwrap()));
@ -100,7 +96,17 @@ impl HttpListener {
Ok::<_, Infallible>(service_fn(move |req: Request<Body>| {
let service = service.clone();
async move {
http_handler(service, req).await
let req_path = req.uri().path();
let (status, body) = if req_path == "/status" {
api::status(service, req)
} else if req_path.starts_with("/peer") {
api::peer(service, req)
} else if req_path.starts_with("/network") {
api::network(service, req)
} else {
(StatusCode::NOT_FOUND, "not found")
};
Ok(Response::builder().header("Content-Type", "application/json").status(status).body(body).unwrap())
}
}))
}

View file

@ -11,6 +11,7 @@
*/
/****/
mod api;
mod commands;
mod fastudpsocket;
mod localconfig;

View file

@ -22,6 +22,7 @@ use std::time::Duration;
use zerotier_core::*;
use zerotier_core::trace::{TraceEvent, TraceEventLayer};
use futures::StreamExt;
use serde::{Serialize, Deserialize};
use crate::fastudpsocket::*;
use crate::getifaddrs;
@ -39,10 +40,14 @@ const CONFIG_CHECK_INTERVAL: i64 = 5000;
/// ServiceStatus is the object returned by the API /status endpoint
#[derive(Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct ServiceStatus {
#[serde(rename = "objectType")]
pub object_type: &'static str,
pub address: Address,
pub clock: i64,
#[serde(rename = "startTime")]
pub start_time: i64,
pub uptime: i64,
pub config: Arc<LocalConfig>,
pub config: LocalConfig,
pub online: bool,
#[serde(rename = "publicIdentity")]
pub public_identity: Identity,
@ -67,7 +72,8 @@ struct ServiceIntl {
interrupt: Mutex<futures::channel::mpsc::Sender<()>>,
local_config: Mutex<Arc<LocalConfig>>,
store: Arc<Store>,
starup_time: i64,
startup_time: i64,
startup_time_monotonic: i64,
run: AtomicBool,
online: AtomicBool,
}
@ -196,8 +202,8 @@ impl Service {
self._node.upgrade()
}
pub fn store(&self) -> &Arc<Store> {
&self.intl.store
pub fn store(&self) -> Arc<Store> {
self.intl.store.clone()
}
pub fn online(&self) -> bool {
@ -209,19 +215,17 @@ impl Service {
let _ = self.intl.interrupt.lock().unwrap().try_send(());
}
pub fn uptime(&self) -> i64 {
ms_since_epoch() - self.intl.starup_time
}
/// Get service status for API, or None if a shutdown is in progress.
pub fn status(&self) -> Option<ServiceStatus> {
let ver = zerotier_core::version();
self.node().map(|node| {
ServiceStatus {
object_type: "status",
address: node.address(),
clock: ms_since_epoch(),
uptime: self.uptime(),
config: self.local_config(),
start_time: self.intl.startup_time,
uptime: ms_monotonic() - self.intl.startup_time_monotonic,
config: (*self.local_config()).clone(),
online: self.online(),
public_identity: node.identity(),
version: format!("{}.{}.{}", ver.0, ver.1, ver.2),
@ -244,8 +248,8 @@ async fn run_async(store: Arc<Store>, log: Arc<Log>, local_config: Arc<LocalConf
let mut process_exit_value: i32 = 0;
let mut udp_sockets: BTreeMap<InetAddress, FastUDPSocket> = BTreeMap::new();
let mut web_listeners: BTreeMap<InetAddress, HttpListener> = BTreeMap::new();
let mut local_web_listeners: (Option<HttpListener>, Option<HttpListener>) = (None, None); // IPv4, IPv6
let mut http_listeners: BTreeMap<InetAddress, HttpListener> = BTreeMap::new();
let mut loopback_http_listeners: (Option<HttpListener>, Option<HttpListener>) = (None, None); // 127.0.0.1, ::1
let (interrupt_tx, mut interrupt_rx) = futures::channel::mpsc::channel::<()>(1);
let mut service = Service {
@ -257,13 +261,14 @@ async fn run_async(store: Arc<Store>, log: Arc<Log>, local_config: Arc<LocalConf
interrupt: Mutex::new(interrupt_tx),
local_config: Mutex::new(local_config),
store: store.clone(),
starup_time: ms_monotonic(),
startup_time: ms_since_epoch(),
startup_time_monotonic: ms_monotonic(),
run: AtomicBool::new(true),
online: AtomicBool::new(false),
}),
};
let node = Node::new(service.clone(), ms_since_epoch());
let node = Node::new(service.clone(), ms_monotonic());
if node.is_err() {
log.fatal(format!("error initializing node: {}", node.err().unwrap().to_str()));
return 1;
@ -275,18 +280,17 @@ async fn run_async(store: Arc<Store>, log: Arc<Log>, local_config: Arc<LocalConf
let mut local_config = service.local_config();
let mut now: i64 = ms_since_epoch();
let mut now: i64 = ms_monotonic();
let mut loop_delay = zerotier_core::NODE_BACKGROUND_TASKS_MAX_INTERVAL;
let mut last_checked_config: i64 = 0;
while service.intl.run.load(Ordering::Relaxed) {
let loop_start = ms_since_epoch();
let loop_delay_start = ms_monotonic();
tokio::select! {
_ = tokio::time::sleep(Duration::from_millis(loop_delay as u64)) => {
now = ms_since_epoch();
let actual_delay = now - loop_start;
now = ms_monotonic();
let actual_delay = now - loop_delay_start;
if actual_delay > ((loop_delay as i64) * 4_i64) {
l!(log, "likely sleep/wake detected due to excess delay, reestablishing links...");
l!(log, "likely sleep/wake detected due to excessive loop delay, cycling links...");
// TODO: handle likely sleep/wake or other system interruption
}
},
@ -295,7 +299,7 @@ async fn run_async(store: Arc<Store>, log: Arc<Log>, local_config: Arc<LocalConf
if !service.intl.run.load(Ordering::Relaxed) {
break;
}
now = ms_since_epoch();
now = ms_monotonic();
},
_ = tokio::signal::ctrl_c() => {
l!(log, "exit signal received, shutting down...");
@ -316,8 +320,8 @@ async fn run_async(store: Arc<Store>, log: Arc<Log>, local_config: Arc<LocalConf
let next_local_config = service.local_config();
if local_config.settings.primary_port != next_local_config.settings.primary_port {
local_web_listeners.0 = None;
local_web_listeners.1 = None;
loopback_http_listeners.0 = None;
loopback_http_listeners.1 = None;
bindings_changed = true;
}
if local_config.settings.log.max_size != next_local_config.settings.log.max_size {
@ -356,27 +360,22 @@ async fn run_async(store: Arc<Store>, log: Arc<Log>, local_config: Arc<LocalConf
}
});
let mut udp_sockets_to_close: Vec<InetAddress> = Vec::new();
for sock in udp_sockets.iter() {
if !system_addrs.contains_key(sock.0) {
udp_sockets_to_close.push(sock.0.clone());
}
}
for k in udp_sockets_to_close.iter() {
l!(log, "unbinding UDP socket at {}", k.to_string());
udp_sockets.remove(k);
}
drop(udp_sockets_to_close);
// TODO: need to also inform the core about these IPs...
for addr in system_addrs.iter() {
if !udp_sockets.contains_key(addr.0) {
let _ = FastUDPSocket::new(addr.1.as_str(), addr.0, |raw_socket: &FastUDPRawOsSocket, from_address: &InetAddress, data: Buffer| {
for k in udp_sockets.keys().filter_map(|a| if system_addrs.contains_key(a) { None } else { Some(a.clone()) }).collect::<Vec<InetAddress>>().iter() {
l!(log, "unbinding UDP socket at {} (address no longer exists on system or port has changed)", k.to_string());
udp_sockets.remove(k);
bindings_changed = true;
}
for a in system_addrs.iter() {
if !udp_sockets.contains_key(a.0) {
let _ = FastUDPSocket::new(a.1.as_str(), a.0, |raw_socket: &FastUDPRawOsSocket, from_address: &InetAddress, data: Buffer| {
// TODO: incoming packet handler
}).map_or_else(|e| {
l!(log, "error binding UDP socket to {}: {}", addr.0.to_string(), e.to_string());
l!(log, "error binding UDP socket to {}: {}", a.0.to_string(), e.to_string());
}, |s| {
l!(log, "bound UDP socket at {}", addr.0.to_string());
udp_sockets.insert(addr.0.clone(), s);
l!(log, "bound UDP socket at {}", a.0.to_string());
udp_sockets.insert(a.0.clone(), s);
bindings_changed = true;
});
}
@ -413,75 +412,70 @@ async fn run_async(store: Arc<Store>, log: Arc<Log>, local_config: Arc<LocalConf
}
}
let mut web_listeners_to_close: Vec<InetAddress> = Vec::new();
for l in web_listeners.iter() {
if !system_addrs.contains_key(l.0) {
web_listeners_to_close.push(l.0.clone());
bindings_changed = true;
}
for k in http_listeners.keys().filter_map(|a| if system_addrs.contains_key(a) { None } else { Some(a.clone()) }).collect::<Vec<InetAddress>>().iter() {
l!(log, "closing HTTP listener at {} (address no longer exists on system or port has changed)", k.to_string());
http_listeners.remove(k);
bindings_changed = true;
}
for k in web_listeners_to_close.iter() {
l!(log, "closing HTTP listener at {}", k.to_string());
web_listeners.remove(k);
}
drop(web_listeners_to_close);
for addr in system_addrs.iter() {
if addr.0.port() == local_config.settings.primary_port && !web_listeners.contains_key(addr.0) {
let sa = addr.0.to_socketaddr();
for a in system_addrs.iter() {
if !http_listeners.contains_key(a.0) {
let sa = a.0.to_socketaddr();
if sa.is_some() {
let wl = HttpListener::new(addr.1.as_str(), sa.unwrap(), &service).await.map_or_else(|e| {
l!(log, "error creating HTTP listener at {}: {}", addr.0.to_string(), e.to_string());
let wl = HttpListener::new(a.1.as_str(), sa.unwrap(), &service).await.map_or_else(|e| {
l!(log, "error creating HTTP listener at {}: {}", a.0.to_string(), e.to_string());
}, |l| {
l!(log, "created HTTP listener at {}", addr.0.to_string());
web_listeners.insert(addr.0.clone(), l);
l!(log, "created HTTP listener at {}", a.0.to_string());
http_listeners.insert(a.0.clone(), l);
bindings_changed = true;
});
}
}
}
if local_web_listeners.0.is_none() {
if loopback_http_listeners.0.is_none() {
let _ = HttpListener::new(loopback_dev_name.as_str(), SocketAddr::new(IpAddr::from(Ipv4Addr::LOCALHOST), local_config.settings.primary_port), &service).await.map(|wl| {
local_web_listeners.0 = Some(wl);
loopback_http_listeners.0 = Some(wl);
let _ = store.write_uri(format!("http://127.0.0.1:{}/", local_config.settings.primary_port).as_str());
bindings_changed = true;
});
}
if local_web_listeners.1.is_none() {
if loopback_http_listeners.1.is_none() {
let _ = HttpListener::new(loopback_dev_name.as_str(), SocketAddr::new(IpAddr::from(Ipv6Addr::LOCALHOST), local_config.settings.primary_port), &service).await.map(|wl| {
local_web_listeners.1 = Some(wl);
if local_web_listeners.0.is_none() {
loopback_http_listeners.1 = Some(wl);
if loopback_http_listeners.0.is_none() {
let _ = store.write_uri(format!("http://[::1]:{}/", local_config.settings.primary_port).as_str());
}
bindings_changed = true;
});
}
if local_web_listeners.0.is_none() && local_web_listeners.1.is_none() {
l!(log, "error creating HTTP listener on 127.0.0.1/{} or ::1/{}", local_config.settings.primary_port, local_config.settings.primary_port);
if loopback_http_listeners.0.is_none() && loopback_http_listeners.1.is_none() {
// TODO: port hunting
l!(log, "CRITICAL: unable to create HTTP endpoint on 127.0.0.1/{} or ::1/{}, service control API will not work!", local_config.settings.primary_port, local_config.settings.primary_port);
}
if bindings_changed {
service.intl.udp_local_endpoints.lock().as_mut().map(|udp_local_endpoints| {
{
let mut udp_local_endpoints = service.intl.udp_local_endpoints.lock().unwrap();
udp_local_endpoints.clear();
for ep in udp_sockets.iter() {
udp_local_endpoints.push(ep.0.clone());
}
udp_local_endpoints.sort();
});
service.intl.http_local_endpoints.lock().as_mut().map(|http_local_endpoints| {
}
{
let mut http_local_endpoints = service.intl.http_local_endpoints.lock().unwrap();
http_local_endpoints.clear();
for ep in web_listeners.iter() {
for ep in http_listeners.iter() {
http_local_endpoints.push(ep.0.clone());
}
if local_web_listeners.0.is_some() {
http_local_endpoints.push(InetAddress::new_ipv4_loopback(local_web_listeners.0.unwrap().address.port()));
if loopback_http_listeners.0.is_some() {
http_local_endpoints.push(InetAddress::new_ipv4_loopback(loopback_http_listeners.0.as_ref().unwrap().address.port()));
}
if local_web_listeners.1.is_some() {
http_local_endpoints.push(InetAddress::new_ipv6_loopback(local_web_listeners.1.unwrap().address.port()));
if loopback_http_listeners.1.is_some() {
http_local_endpoints.push(InetAddress::new_ipv6_loopback(loopback_http_listeners.1.as_ref().unwrap().address.port()));
}
http_local_endpoints.sort();
});
}
}
}
@ -492,8 +486,8 @@ async fn run_async(store: Arc<Store>, log: Arc<Log>, local_config: Arc<LocalConf
l!(log, "shutting down normally.");
drop(udp_sockets);
drop(web_listeners);
drop(local_web_listeners);
drop(http_listeners);
drop(loopback_http_listeners);
drop(node);
drop(service);