Some efficiency improvements.

This commit is contained in:
Adam Ierymenko 2021-03-26 18:43:50 -04:00
parent 2d072de515
commit 4a60ae5736
No known key found for this signature in database
GPG key ID: C8877CF2D7A5D7F3
6 changed files with 165 additions and 163 deletions

View file

@ -24,6 +24,7 @@ use serde::{Deserialize, Serialize};
use crate::*;
use crate::capi as ztcore;
use std::marker::PhantomData;
pub const NODE_BACKGROUND_TASKS_MAX_INTERVAL: i64 = 200;
@ -47,7 +48,7 @@ pub enum StateObjectType {
Peer = ztcore::ZT_StateObjectType_ZT_STATE_OBJECT_PEER as isize,
NetworkConfig = ztcore::ZT_StateObjectType_ZT_STATE_OBJECT_NETWORK_CONFIG as isize,
TrustStore = ztcore::ZT_StateObjectType_ZT_STATE_OBJECT_TRUST_STORE as isize,
Certificate = ztcore::ZT_StateObjectType_ZT_STATE_OBJECT_CERT as isize
Certificate = ztcore::ZT_StateObjectType_ZT_STATE_OBJECT_CERT as isize,
}
/// The status of a ZeroTier node.
@ -91,24 +92,32 @@ pub trait NodeEventHandler<N: Sync + Send + 'static> {
fn path_lookup(&self, address: Address, id: &Identity, desired_family: InetAddressFamily) -> Option<InetAddress>;
}
pub struct NodeIntl<T: NodeEventHandler<N> + Sync + Send + Clone + 'static, N: Sync + Send + 'static> {
pub struct NodeIntl<T: AsRef<H> + Sync + Send + Clone + 'static, N: Sync + Send + 'static, H: NodeEventHandler<N>> {
event_handler: T,
capi: *mut ztcore::ZT_Node,
now: PortableAtomicI64,
networks_by_id: Mutex<HashMap<u64, Pin<Box<N>>>>,
event_handler_placeholder: PhantomData<H>,
}
/// An instance of the ZeroTier core.
/// This is templated on the actual implementations of things to avoid "dyn" indirect
/// call overhead. This is a high performance networking thingy so we care about cycles
/// where possible.
pub struct Node<T: NodeEventHandler<N> + Sync + Send + Clone + 'static, N: Sync + Send + 'static> {
intl: Pin<Box<NodeIntl<T, N>>>,
///
/// The event handler is templated as AsRef<H> where H is the concrete type of the actual
/// handler. This allows the handler to be an Arc<>, Box<>, or similar. We do this instead
/// of templating it on "dyn NodeEventHandler" because we want the types to all be concrete
/// to avoid dynamic call overhead. Unfortunately it makes the types here a tad more
/// verbose.
///
/// In most cases you will want the handler to be an Arc<> anyway since most uses will be
/// multithreaded or async.
pub struct Node<T: AsRef<H> + Sync + Send + Clone + 'static, N: Sync + Send + 'static, H: NodeEventHandler<N>> {
intl: Pin<Box<NodeIntl<T, N, H>>>,
event_handler_placeholder: PhantomData<H>,
}
/********************************************************************************************************************/
extern "C" fn zt_virtual_network_config_function<T: NodeEventHandler<N> + Sync + Send + Clone + 'static, N: Sync + Send + 'static>(
extern "C" fn zt_virtual_network_config_function<T: AsRef<H> + Sync + Send + Clone + 'static, N: Sync + Send + 'static, H: NodeEventHandler<N>>(
_: *mut ztcore::ZT_Node,
uptr: *mut c_void,
_: *mut c_void,
@ -118,17 +127,17 @@ extern "C" fn zt_virtual_network_config_function<T: NodeEventHandler<N> + Sync +
conf: *const ztcore::ZT_VirtualNetworkConfig,
) {
let _ = VirtualNetworkConfigOperation::from_i32(op as i32).map(|op| {
let n = unsafe { &*(uptr.cast::<NodeIntl<T, N>>()) };
let n = unsafe { &*(uptr.cast::<NodeIntl<T, N, H>>()) };
if conf.is_null() {
n.event_handler.virtual_network_config(NetworkId(nwid), unsafe { &*(nptr.cast::<N>()) }, op, None);
n.event_handler.as_ref().virtual_network_config(NetworkId(nwid), unsafe { &*(nptr.cast::<N>()) }, op, None);
} else {
let conf2 = unsafe { VirtualNetworkConfig::new_from_capi(&*conf) };
n.event_handler.virtual_network_config(NetworkId(nwid), unsafe { &*(nptr.cast::<N>()) }, op, Some(&conf2));
n.event_handler.as_ref().virtual_network_config(NetworkId(nwid), unsafe { &*(nptr.cast::<N>()) }, op, Some(&conf2));
}
});
}
extern "C" fn zt_virtual_network_frame_function<T: NodeEventHandler<N> + Sync + Send + Clone + 'static, N: Sync + Send + 'static>(
extern "C" fn zt_virtual_network_frame_function<T: AsRef<H> + Sync + Send + Clone + 'static, N: Sync + Send + 'static, H: NodeEventHandler<N>>(
_: *mut ztcore::ZT_Node,
uptr: *mut c_void,
_: *mut c_void,
@ -142,7 +151,7 @@ extern "C" fn zt_virtual_network_frame_function<T: NodeEventHandler<N> + Sync +
data_size: c_uint,
) {
if !nptr.is_null() {
unsafe { &*(uptr.cast::<NodeIntl<T, N>>()) }.event_handler.virtual_network_frame(
unsafe { &*(uptr.cast::<NodeIntl<T, N, H>>()) }.event_handler.as_ref().virtual_network_frame(
NetworkId(nwid),
unsafe { &*(nptr.cast::<N>()) },
MAC(source_mac),
@ -153,25 +162,25 @@ extern "C" fn zt_virtual_network_frame_function<T: NodeEventHandler<N> + Sync +
}
}
extern "C" fn zt_event_callback<T: NodeEventHandler<N> + Sync + Send + Clone + 'static, N: Sync + Send + 'static>(
extern "C" fn zt_event_callback<T: AsRef<H> + Sync + Send + Clone + 'static, N: Sync + Send + 'static, H: NodeEventHandler<N>>(
_: *mut ztcore::ZT_Node,
uptr: *mut c_void,
_: *mut c_void,
ev: ztcore::ZT_Event,
data: *const c_void,
data_size: c_uint
data_size: c_uint,
) {
let _ = Event::from_i32(ev as i32).map(|ev: Event| {
let n = unsafe { &*(uptr.cast::<NodeIntl<T, N>>()) };
let n = unsafe { &*(uptr.cast::<NodeIntl<T, N, H>>()) };
if data.is_null() {
n.event_handler.event(ev, &EMPTY_BYTE_ARRAY);
n.event_handler.as_ref().event(ev, &EMPTY_BYTE_ARRAY);
} else {
n.event_handler.event(ev, unsafe { &*slice_from_raw_parts(data.cast::<u8>(), data_size as usize) });
n.event_handler.as_ref().event(ev, unsafe { &*slice_from_raw_parts(data.cast::<u8>(), data_size as usize) });
}
});
}
extern "C" fn zt_state_put_function<T: NodeEventHandler<N> + Sync + Send + Clone + 'static, N: Sync + Send + 'static>(
extern "C" fn zt_state_put_function<T: AsRef<H> + Sync + Send + Clone + 'static, N: Sync + Send + 'static, H: NodeEventHandler<N>>(
_: *mut ztcore::ZT_Node,
uptr: *mut c_void,
_: *mut c_void,
@ -182,12 +191,12 @@ extern "C" fn zt_state_put_function<T: NodeEventHandler<N> + Sync + Send + Clone
obj_data_len: c_int,
) {
let _ = StateObjectType::from_i32(obj_type as i32).map(|obj_type| {
let n = unsafe { &*(uptr.cast::<NodeIntl<T, N>>()) };
let _ = n.event_handler.state_put(obj_type, unsafe { &*slice_from_raw_parts(obj_id, obj_id_len as usize) }, unsafe { &*slice_from_raw_parts(obj_data.cast::<u8>(), obj_data_len as usize) });
let n = unsafe { &*(uptr.cast::<NodeIntl<T, N, H>>()) };
let _ = n.event_handler.as_ref().state_put(obj_type, unsafe { &*slice_from_raw_parts(obj_id, obj_id_len as usize) }, unsafe { &*slice_from_raw_parts(obj_data.cast::<u8>(), obj_data_len as usize) });
});
}
extern "C" fn zt_state_get_function<T: NodeEventHandler<N> + Sync + Send + Clone + 'static, N: Sync + Send + 'static>(
extern "C" fn zt_state_get_function<T: AsRef<H> + Sync + Send + Clone + 'static, N: Sync + Send + 'static, H: NodeEventHandler<N>>(
_: *mut ztcore::ZT_Node,
uptr: *mut c_void,
_: *mut c_void,
@ -207,7 +216,7 @@ extern "C" fn zt_state_get_function<T: NodeEventHandler<N> + Sync + Send + Clone
StateObjectType::from_i32(obj_type as i32).map_or_else(|| {
-1 as c_int
}, |obj_type| {
unsafe { &*(uptr.cast::<NodeIntl<T, N>>()) }.event_handler.state_get(obj_type, unsafe { &*slice_from_raw_parts(obj_id, obj_id_len as usize) }).map_or_else(|_| {
unsafe { &*(uptr.cast::<NodeIntl<T, N, H>>()) }.event_handler.as_ref().state_get(obj_type, unsafe { &*slice_from_raw_parts(obj_id, obj_id_len as usize) }).map_or_else(|_| {
-1 as c_int
}, |obj_data_result| {
let obj_data_len = obj_data_result.len() as c_int;
@ -230,7 +239,7 @@ extern "C" fn zt_state_get_function<T: NodeEventHandler<N> + Sync + Send + Clone
}
}
extern "C" fn zt_wire_packet_send_function<T: NodeEventHandler<N> + Sync + Send + Clone + 'static, N: Sync + Send + 'static>(
extern "C" fn zt_wire_packet_send_function<T: AsRef<H> + Sync + Send + Clone + 'static, N: Sync + Send + 'static, H: NodeEventHandler<N>>(
_: *mut ztcore::ZT_Node,
uptr: *mut c_void,
_: *mut c_void,
@ -240,10 +249,10 @@ extern "C" fn zt_wire_packet_send_function<T: NodeEventHandler<N> + Sync + Send
data_size: c_uint,
packet_ttl: c_uint,
) -> c_int {
unsafe { &*(uptr.cast::<NodeIntl<T, N>>()) }.event_handler.wire_packet_send(local_socket, InetAddress::transmute_capi(unsafe { &*sock_addr }), unsafe { &*slice_from_raw_parts(data.cast::<u8>(), data_size as usize) }, packet_ttl as u32) as c_int
unsafe { &*(uptr.cast::<NodeIntl<T, N, H>>()) }.event_handler.as_ref().wire_packet_send(local_socket, InetAddress::transmute_capi(unsafe { &*sock_addr }), unsafe { &*slice_from_raw_parts(data.cast::<u8>(), data_size as usize) }, packet_ttl as u32) as c_int
}
extern "C" fn zt_path_check_function<T: NodeEventHandler<N> + Sync + Send + Clone + 'static, N: Sync + Send + 'static>(
extern "C" fn zt_path_check_function<T: AsRef<H> + Sync + Send + Clone + 'static, N: Sync + Send + 'static, H: NodeEventHandler<N>>(
_: *mut ztcore::ZT_Node,
uptr: *mut c_void,
_: *mut c_void,
@ -253,10 +262,10 @@ extern "C" fn zt_path_check_function<T: NodeEventHandler<N> + Sync + Send + Clon
sock_addr: *const ztcore::ZT_InetAddress,
) -> c_int {
let id = Identity::new_from_capi(identity, false);
unsafe { &*(uptr.cast::<NodeIntl<T, N>>()) }.event_handler.path_check(Address(address), &id, local_socket, InetAddress::transmute_capi(unsafe{ &*sock_addr })) as c_int
unsafe { &*(uptr.cast::<NodeIntl<T, N, H>>()) }.event_handler.as_ref().path_check(Address(address), &id, local_socket, InetAddress::transmute_capi(unsafe { &*sock_addr })) as c_int
}
extern "C" fn zt_path_lookup_function<T: NodeEventHandler<N> + Sync + Send + Clone + 'static, N: Sync + Send + 'static>(
extern "C" fn zt_path_lookup_function<T: AsRef<H> + Sync + Send + Clone + 'static, N: Sync + Send + 'static, H: NodeEventHandler<N>>(
_: *mut ztcore::ZT_Node,
uptr: *mut c_void,
_: *mut c_void,
@ -280,7 +289,7 @@ extern "C" fn zt_path_lookup_function<T: NodeEventHandler<N> + Sync + Send + Clo
}
let id = Identity::new_from_capi(identity, false);
unsafe { &*(uptr.cast::<NodeIntl<T, N>>()) }.event_handler.path_lookup(Address(address), &id, sock_family2).map_or_else(|| {
unsafe { &*(uptr.cast::<NodeIntl<T, N, H>>()) }.event_handler.as_ref().path_lookup(Address(address), &id, sock_family2).map_or_else(|| {
0 as c_int
}, |result| {
let result_ptr = &result as *const InetAddress;
@ -293,31 +302,33 @@ extern "C" fn zt_path_lookup_function<T: NodeEventHandler<N> + Sync + Send + Clo
/********************************************************************************************************************/
impl<T: NodeEventHandler<N> + Sync + Send + Clone + 'static, N: Sync + Send + 'static> Node<T, N> {
impl<T: AsRef<H> + Sync + Send + Clone + 'static, N: Sync + Send + 'static, H: NodeEventHandler<N>> Node<T, N, H> {
/// Create a new Node with a given event handler.
#[allow(unused_mut)]
pub fn new(event_handler: T, now: i64) -> Result<Node<T, N>, ResultCode> {
pub fn new(event_handler: T, now: i64) -> Result<Node<T, N, H>, ResultCode> {
let mut n = Node {
intl: Box::pin(NodeIntl {
event_handler: event_handler.clone(),
capi: null_mut(),
now: PortableAtomicI64::new(now),
networks_by_id: Mutex::new(HashMap::new())
networks_by_id: Mutex::new(HashMap::new()),
event_handler_placeholder: PhantomData::default(),
}),
event_handler_placeholder: PhantomData::default(),
};
let rc = unsafe {
let callbacks = ztcore::ZT_Node_Callbacks {
statePutFunction: transmute(zt_state_put_function::<T, N> as *const ()),
stateGetFunction: transmute(zt_state_get_function::<T, N> as *const ()),
wirePacketSendFunction: transmute(zt_wire_packet_send_function::<T, N> as *const ()),
virtualNetworkFrameFunction: transmute(zt_virtual_network_frame_function::<T, N> as *const ()),
virtualNetworkConfigFunction: transmute(zt_virtual_network_config_function::<T, N> as *const ()),
eventCallback: transmute(zt_event_callback::<T, N> as *const ()),
pathCheckFunction: transmute(zt_path_check_function::<T, N> as *const ()),
pathLookupFunction: transmute(zt_path_lookup_function::<T, N> as *const ()),
statePutFunction: transmute(zt_state_put_function::<T, N, H> as *const ()),
stateGetFunction: transmute(zt_state_get_function::<T, N, H> as *const ()),
wirePacketSendFunction: transmute(zt_wire_packet_send_function::<T, N, H> as *const ()),
virtualNetworkFrameFunction: transmute(zt_virtual_network_frame_function::<T, N, H> as *const ()),
virtualNetworkConfigFunction: transmute(zt_virtual_network_config_function::<T, N, H> as *const ()),
eventCallback: transmute(zt_event_callback::<T, N, H> as *const ()),
pathCheckFunction: transmute(zt_path_check_function::<T, N, H> as *const ()),
pathLookupFunction: transmute(zt_path_lookup_function::<T, N, H> as *const ()),
};
ztcore::ZT_Node_new(transmute(&(n.intl.capi) as *const *mut ztcore::ZT_Node), transmute(&*n.intl as *const NodeIntl<T, N>), null_mut(), &callbacks, now)
ztcore::ZT_Node_new(transmute(&(n.intl.capi) as *const *mut ztcore::ZT_Node), transmute(&*n.intl as *const NodeIntl<T, N, H>), null_mut(), &callbacks, now)
};
if rc == 0 {
@ -505,19 +516,19 @@ impl<T: NodeEventHandler<N> + Sync + Send + Clone + 'static, N: Sync + Send + 's
}
}
impl<T: NodeEventHandler<N> + Sync + Send + Clone + 'static, N: Sync + Send + Clone + 'static> Node<T, N> {
impl<T: AsRef<H> + Sync + Send + Clone + 'static, N: Sync + Send + Clone + 'static, H: NodeEventHandler<N>> Node<T, N, H> {
/// Get a copy of this network's associated object.
/// This is only available if N implements Clone.
pub fn network(&self, nwid: NetworkId) -> Option<N> {
self.intl.networks_by_id.lock().unwrap().get(&nwid.0).map_or(None, |nw| { Some(nw.as_ref().get_ref().clone()) })
self.intl.networks_by_id.lock().unwrap().get(&nwid.0).map_or(None, |nw| Some((**nw).clone()))
}
}
unsafe impl<T: NodeEventHandler<N> + Sync + Send + Clone + 'static, N: Sync + Send + 'static> Sync for Node<T, N> {}
unsafe impl<T: AsRef<H> + Sync + Send + Clone + 'static, N: Sync + Send + 'static, H: NodeEventHandler<N>> Sync for Node<T, N, H> {}
unsafe impl<T: NodeEventHandler<N> + Sync + Send + Clone + 'static, N: Sync + Send + 'static> Send for Node<T, N> {}
unsafe impl<T: AsRef<H> + Sync + Send + Clone + 'static, N: Sync + Send + 'static, H: NodeEventHandler<N>> Send for Node<T, N, H> {}
impl<T: NodeEventHandler<N> + Sync + Send + Clone + 'static, N: Sync + Send + 'static> Drop for Node<T, N> {
impl<T: AsRef<H> + Sync + Send + Clone + 'static, N: Sync + Send + 'static, H: NodeEventHandler<N>> Drop for Node<T, N, H> {
fn drop(&mut self) {
unsafe {
ztcore::ZT_Node_delete(self.intl.capi, null_mut());

View file

@ -23,7 +23,7 @@ num-derive = "0"
hyper = { version = "0", features = ["http1", "runtime", "server", "client", "tcp", "stream"] }
socket2 = { version = "0", features = ["reuseport", "unix", "pair"] }
dialoguer = "0"
digest_auth = "0"
digest_auth = "0.2.4"
colored = "2"
[target."cfg(windows)".dependencies]

View file

@ -11,10 +11,13 @@
*/
/****/
use crate::service::Service;
use std::sync::Arc;
use hyper::{Request, Body, StatusCode, Method};
pub(crate) fn status(service: Service, req: Request<Body>) -> (StatusCode, Body) {
use crate::service::Service;
pub(crate) fn status(service: Arc<Service>, req: Request<Body>) -> (StatusCode, Body) {
if req.method() == Method::GET {
service.status().map_or_else(|| {
(StatusCode::SERVICE_UNAVAILABLE, Body::from("node shutdown in progress"))
@ -26,7 +29,7 @@ pub(crate) fn status(service: Service, req: Request<Body>) -> (StatusCode, Body)
}
}
pub(crate) fn config(service: Service, req: Request<Body>) -> (StatusCode, Body) {
pub(crate) fn config(service: Arc<Service>, req: Request<Body>) -> (StatusCode, Body) {
let config = service.local_config();
if req.method() == Method::POST || req.method() == Method::PUT {
// TODO: diff config
@ -34,10 +37,10 @@ pub(crate) fn config(service: Service, req: Request<Body>) -> (StatusCode, Body)
(StatusCode::OK, Body::from(serde_json::to_string(config.as_ref()).unwrap()))
}
pub(crate) fn peer(service: Service, req: Request<Body>) -> (StatusCode, Body) {
pub(crate) fn peer(service: Arc<Service>, req: Request<Body>) -> (StatusCode, Body) {
(StatusCode::NOT_IMPLEMENTED, Body::from(""))
}
pub(crate) fn network(service: Service, req: Request<Body>) -> (StatusCode, Body) {
pub(crate) fn network(service: Arc<Service>, req: Request<Body>) -> (StatusCode, Body) {
(StatusCode::NOT_IMPLEMENTED, Body::from(""))
}

View file

@ -93,10 +93,11 @@ pub(crate) fn run_command<
}
/// Send a request to the API with support for HTTP digest authentication.
/// The data option is for PUT and POST requests. For GET it is ignored. Errors indicate total
/// failure such as connection refused. A returned result must still have its status checked.
/// Note that if authorization is required and the auth token doesn't work, IncorrectAuthTokenError
/// is returned as an error instead of a 401 response object.
/// The data option is for PUT and POST requests. For GET it is ignored. This will try to
/// authenticate if a WWW-Authorized header is sent in an unauthorized response. If authentication
/// with auth_token fails, IncorrectAuthTokenError is returned as an error. If the request is
/// unauthorizred and no WWW-Authorired header is present, a normal response is returned. The
/// caller must always check the response status code.
pub(crate) async fn request(client: &HttpClient, method: Method, uri: Uri, data: Option<&[u8]>, auth_token: &str) -> Result<Response<Body>, Box<dyn Error>> {
let body: Vec<u8> = data.map_or_else(|| Vec::new(), |data| data.to_vec());
@ -113,7 +114,7 @@ pub(crate) async fn request(client: &HttpClient, method: Method, uri: Uri, data:
if res.status() == StatusCode::UNAUTHORIZED {
let auth = res.headers().get(hyper::header::WWW_AUTHENTICATE);
if auth.is_none() {
return Err(Box::new(UnexpectedStatusCodeError(StatusCode::UNAUTHORIZED, "host returned 401 but no WWW-Authenticate header found")))
return Ok(res);
}
let auth = auth.unwrap().to_str();
if auth.is_err() {

View file

@ -11,14 +11,16 @@
*/
/****/
use std::cell::RefCell;
use std::cell::Cell;
use std::convert::Infallible;
use std::sync::Arc;
use std::net::SocketAddr;
use hyper::{Body, Request, Response, StatusCode, Method};
use hyper::server::Server;
use hyper::service::{make_service_fn, service_fn};
use tokio::task::JoinHandle;
use digest_auth::{AuthContext, AuthorizationHeader, Charset, WwwAuthenticateHeader};
use crate::service::Service;
use crate::api;
@ -26,7 +28,6 @@ use crate::utils::{decrypt_http_auth_nonce, ms_since_epoch, create_http_auth_non
#[cfg(target_os = "linux")]
use std::os::unix::io::AsRawFd;
use digest_auth::{AuthContext, AuthorizationHeader, Charset, WwwAuthenticateHeader};
const HTTP_MAX_NONCE_AGE_MS: i64 = 30000;
@ -35,17 +36,19 @@ const HTTP_MAX_NONCE_AGE_MS: i64 = 30000;
/// but it might not shut down instantly as this occurs asynchronously.
pub(crate) struct HttpListener {
pub address: SocketAddr,
shutdown_tx: RefCell<Option<tokio::sync::oneshot::Sender<()>>>,
shutdown_tx: Cell<Option<tokio::sync::oneshot::Sender<()>>>,
server: JoinHandle<hyper::Result<()>>,
}
async fn http_handler(service: Service, req: Request<Body>) -> Result<Response<Body>, Infallible> {
async fn http_handler(service: Arc<Service>, req: Request<Body>) -> Result<Response<Body>, Infallible> {
let req_path = req.uri().path();
let mut authorized = false;
let mut stale = false;
let auth_token = service.store().auth_token(false);
if auth_token.is_err() {
return Ok::<Response<Body>, Infallible>(Response::builder().status(StatusCode::INTERNAL_SERVER_ERROR).body(Body::from("authorization token unreadable")).unwrap());
return Ok(Response::builder().status(StatusCode::INTERNAL_SERVER_ERROR).body(Body::from("authorization token unreadable")).unwrap());
}
let auth_context = AuthContext::new_with_method("", auth_token.unwrap(), req.uri().to_string(), None::<&[u8]>, match *req.method() {
Method::GET => digest_auth::HttpMethod::GET,
@ -53,14 +56,16 @@ async fn http_handler(service: Service, req: Request<Body>) -> Result<Response<B
Method::HEAD => digest_auth::HttpMethod::HEAD,
Method::PUT => digest_auth::HttpMethod::OTHER("PUT"),
Method::DELETE => digest_auth::HttpMethod::OTHER("DELETE"),
_ => digest_auth::HttpMethod::OTHER(""),
_ => {
return Ok(Response::builder().status(StatusCode::METHOD_NOT_ALLOWED).body(Body::from("unrecognized method")).unwrap());
}
});
let auth_header = req.headers().get(hyper::header::AUTHORIZATION);
if auth_header.is_some() {
let auth_header = AuthorizationHeader::parse(auth_header.unwrap().to_str().unwrap_or(""));
if auth_header.is_err() {
return Ok::<Response<Body>, Infallible>(Response::builder().status(StatusCode::BAD_REQUEST).body(Body::from(format!("invalid authorization header: {}", auth_header.err().unwrap().to_string()))).unwrap());
return Ok(Response::builder().status(StatusCode::BAD_REQUEST).body(Body::from(format!("invalid authorization header: {}", auth_header.err().unwrap().to_string()))).unwrap());
}
let auth_header = auth_header.unwrap();
@ -88,11 +93,8 @@ async fn http_handler(service: Service, req: Request<Body>) -> Result<Response<B
}
if authorized {
let req_path = req.uri().path();
let (status, body) =
if req_path == "/_zt" {
(StatusCode::NOT_IMPLEMENTED, Body::from("not implemented yet"))
} else if req_path == "/status" {
if req_path == "/status" {
api::status(service, req)
} else if req_path == "/config" {
api::config(service, req)
@ -107,9 +109,9 @@ async fn http_handler(service: Service, req: Request<Body>) -> Result<Response<B
} else {
(StatusCode::NOT_FOUND, Body::from("not found"))
};
Ok::<Response<Body>, Infallible>(Response::builder().header("Content-Type", "application/json").status(status).body(body).unwrap())
Ok(Response::builder().header("Content-Type", "application/json").status(status).body(body).unwrap())
} else {
Ok::<Response<Body>, Infallible>(Response::builder().header(hyper::header::WWW_AUTHENTICATE, WwwAuthenticateHeader {
Ok(Response::builder().header(hyper::header::WWW_AUTHENTICATE, WwwAuthenticateHeader {
domain: None,
realm: "zerotier-service-api".to_owned(),
nonce: create_http_auth_nonce(ms_since_epoch()),
@ -126,7 +128,7 @@ async fn http_handler(service: Service, req: Request<Body>) -> Result<Response<B
impl HttpListener {
/// Create a new "background" TCP WebListener using the current tokio reactor async runtime.
pub async fn new(_device_name: &str, address: SocketAddr, service: &Service) -> Result<HttpListener, Box<dyn std::error::Error>> {
pub async fn new(_device_name: &str, address: SocketAddr, service: &Arc<Service>) -> Result<HttpListener, Box<dyn std::error::Error>> {
let listener = if address.is_ipv4() {
let listener = socket2::Socket::new(socket2::Domain::ipv4(), socket2::Type::stream(), Some(socket2::Protocol::tcp()));
if listener.is_err() {
@ -188,7 +190,7 @@ impl HttpListener {
Ok(HttpListener {
address,
shutdown_tx: RefCell::new(Some(shutdown_tx)),
shutdown_tx: Cell::new(Some(shutdown_tx)),
server,
})
}

View file

@ -11,6 +11,7 @@
*/
/****/
use std::cell::Cell;
use std::collections::BTreeMap;
use std::net::{SocketAddr, Ipv4Addr, IpAddr, Ipv6Addr};
use std::sync::{Arc, Mutex, Weak};
@ -64,7 +65,10 @@ pub struct ServiceStatus {
pub http_local_endpoints: Vec<InetAddress>,
}
struct ServiceIntl {
/// Core ZeroTier service, which is sort of just a container for all the things.
pub(crate) struct Service {
pub(crate) log: Log,
_node: Cell<Weak<Node<Arc<Service>, Network, Service>>>, // never modified after node is created
udp_local_endpoints: Mutex<Vec<InetAddress>>,
http_local_endpoints: Mutex<Vec<InetAddress>>,
interrupt: Mutex<futures::channel::mpsc::Sender<()>>,
@ -76,18 +80,6 @@ struct ServiceIntl {
online: AtomicBool,
}
unsafe impl Send for ServiceIntl {}
unsafe impl Sync for ServiceIntl {}
/// Core ZeroTier service, which is sort of just a container for all the things.
#[derive(Clone)]
pub(crate) struct Service {
pub(crate) log: Arc<Log>,
_node: Weak<Node<Service, Network>>,
intl: Arc<ServiceIntl>,
}
impl NodeEventHandler<Network> for Service {
#[inline(always)]
fn virtual_network_config(&self, network_id: NetworkId, network_obj: &Network, config_op: VirtualNetworkConfigOperation, config: Option<&VirtualNetworkConfig>) {}
@ -104,17 +96,17 @@ impl NodeEventHandler<Network> for Service {
Event::Down => {
d!(self.log, "node shutdown event received.");
self.intl.online.store(false, Ordering::Relaxed);
self.online.store(false, Ordering::Relaxed);
}
Event::Online => {
d!(self.log, "node is online.");
self.intl.online.store(true, Ordering::Relaxed);
self.online.store(true, Ordering::Relaxed);
}
Event::Offline => {
d!(self.log, "node is offline.");
self.intl.online.store(false, Ordering::Relaxed);
self.online.store(false, Ordering::Relaxed);
}
Event::Trace => {
@ -144,16 +136,16 @@ impl NodeEventHandler<Network> for Service {
#[inline(always)]
fn state_put(&self, obj_type: StateObjectType, obj_id: &[u64], obj_data: &[u8]) -> std::io::Result<()> {
if !obj_data.is_empty() {
self.intl.store.store_object(&obj_type, obj_id, obj_data)
self.store.store_object(&obj_type, obj_id, obj_data)
} else {
self.intl.store.erase_object(&obj_type, obj_id);
self.store.erase_object(&obj_type, obj_id);
Ok(())
}
}
#[inline(always)]
fn state_get(&self, obj_type: StateObjectType, obj_id: &[u64]) -> std::io::Result<Vec<u8>> {
self.intl.store.load_object(&obj_type, obj_id)
self.store.load_object(&obj_type, obj_id)
}
#[inline(always)]
@ -185,33 +177,34 @@ impl NodeEventHandler<Network> for Service {
impl Service {
pub fn local_config(&self) -> Arc<LocalConfig> {
self.intl.local_config.lock().unwrap().clone()
self.local_config.lock().unwrap().clone()
}
pub fn set_local_config(&self, new_lc: LocalConfig) {
*(self.intl.local_config.lock().unwrap()) = Arc::new(new_lc);
*(self.local_config.lock().unwrap()) = Arc::new(new_lc);
}
/// Get the node running with this service.
/// This can return None if we are in the midst of shutdown. In this case
/// whatever operation is in progress should abort. None will never be
/// returned during normal operation.
pub fn node(&self) -> Option<Arc<Node<Service, Network>>> {
self._node.upgrade()
/// This can return None during shutdown because Service holds a weak
/// reference to Node to avoid circular Arc<> pointers. This will only
/// return None during shutdown, in which case whatever is happening
/// should abort as quietly as possible.
pub fn node(&self) -> Option<Arc<Node<Arc<Service>, Network, Service>>> {
unsafe { &*self._node.as_ptr() }.upgrade()
}
#[inline(always)]
pub fn store(&self) -> &Arc<Store> {
&self.intl.store
&self.store
}
pub fn online(&self) -> bool {
self.intl.online.load(Ordering::Relaxed)
self.online.load(Ordering::Relaxed)
}
pub fn shutdown(&self) {
self.intl.run.store(false, Ordering::Relaxed);
let _ = self.intl.interrupt.lock().unwrap().try_send(());
self.run.store(false, Ordering::Relaxed);
let _ = self.interrupt.lock().unwrap().try_send(());
}
/// Get service status for API, or None if a shutdown is in progress.
@ -222,8 +215,8 @@ impl Service {
object_type: "status".to_owned(),
address: node.address(),
clock: ms_since_epoch(),
start_time: self.intl.startup_time,
uptime: ms_monotonic() - self.intl.startup_time_monotonic,
start_time: self.startup_time,
uptime: ms_monotonic() - self.startup_time_monotonic,
config: (*self.local_config()).clone(),
online: self.online(),
public_identity: node.identity(),
@ -232,8 +225,8 @@ impl Service {
version_minor: ver.1,
version_revision: ver.2,
version_build: ver.3,
udp_local_endpoints: self.intl.udp_local_endpoints.lock().unwrap().clone(),
http_local_endpoints: self.intl.http_local_endpoints.lock().unwrap().clone(),
udp_local_endpoints: self.udp_local_endpoints.lock().unwrap().clone(),
http_local_endpoints: self.http_local_endpoints.lock().unwrap().clone(),
}
})
}
@ -243,7 +236,7 @@ unsafe impl Send for Service {}
unsafe impl Sync for Service {}
async fn run_async(store: Arc<Store>, log: Arc<Log>, local_config: Arc<LocalConfig>) -> i32 {
async fn run_async(store: Arc<Store>, local_config: Arc<LocalConfig>) -> i32 {
let mut process_exit_value: i32 = 0;
let mut udp_sockets: BTreeMap<InetAddress, FastUDPSocket> = BTreeMap::new();
@ -251,58 +244,64 @@ async fn run_async(store: Arc<Store>, log: Arc<Log>, local_config: Arc<LocalConf
let mut loopback_http_listeners: (Option<HttpListener>, Option<HttpListener>) = (None, None); // 127.0.0.1, ::1
let (interrupt_tx, mut interrupt_rx) = futures::channel::mpsc::channel::<()>(1);
let mut service = Service {
log: log.clone(),
_node: Weak::new(),
intl: Arc::new(ServiceIntl {
udp_local_endpoints: Mutex::new(Vec::new()),
http_local_endpoints: Mutex::new(Vec::new()),
interrupt: Mutex::new(interrupt_tx),
local_config: Mutex::new(local_config),
store: store.clone(),
startup_time: ms_since_epoch(),
startup_time_monotonic: ms_monotonic(),
run: AtomicBool::new(true),
online: AtomicBool::new(false),
}),
};
let service = Arc::new(Service {
log: Log::new(
if local_config.settings.log.path.as_ref().is_some() {
local_config.settings.log.path.as_ref().unwrap().as_str()
} else {
store.default_log_path.to_str().unwrap()
},
local_config.settings.log.max_size,
local_config.settings.log.stderr,
local_config.settings.log.debug,
"",
),
_node: Cell::new(Weak::new()),
udp_local_endpoints: Mutex::new(Vec::new()),
http_local_endpoints: Mutex::new(Vec::new()),
interrupt: Mutex::new(interrupt_tx),
local_config: Mutex::new(local_config),
store: store.clone(),
startup_time: ms_since_epoch(),
startup_time_monotonic: ms_monotonic(),
run: AtomicBool::new(true),
online: AtomicBool::new(false),
});
let node = Node::new(service.clone(), ms_monotonic());
if node.is_err() {
log.fatal(format!("error initializing node: {}", node.err().unwrap().to_str()));
service.log.fatal(format!("error initializing node: {}", node.err().unwrap().to_str()));
return 1;
}
let node = Arc::new(node.ok().unwrap());
service._node = Arc::downgrade(&node);
let service = service; // make immutable after setting node
service._node.replace(Arc::downgrade(&node));
let mut local_config = service.local_config();
let mut now: i64 = ms_monotonic();
let mut loop_delay = zerotier_core::NODE_BACKGROUND_TASKS_MAX_INTERVAL;
let mut last_checked_config: i64 = 0;
while service.intl.run.load(Ordering::Relaxed) {
while service.run.load(Ordering::Relaxed) {
let loop_delay_start = ms_monotonic();
tokio::select! {
_ = tokio::time::sleep(Duration::from_millis(loop_delay as u64)) => {
now = ms_monotonic();
let actual_delay = now - loop_delay_start;
if actual_delay > ((loop_delay as i64) * 4_i64) {
l!(log, "likely sleep/wake detected due to excessive loop delay, cycling links...");
l!(service.log, "likely sleep/wake detected due to excessive loop delay, cycling links...");
// TODO: handle likely sleep/wake or other system interruption
}
},
_ = interrupt_rx.next() => {
d!(log, "inner loop delay interrupted!");
if !service.intl.run.load(Ordering::Relaxed) {
d!(service.log, "inner loop delay interrupted!");
if !service.run.load(Ordering::Relaxed) {
break;
}
now = ms_monotonic();
},
_ = tokio::signal::ctrl_c() => {
l!(log, "exit signal received, shutting down...");
service.intl.run.store(false, Ordering::Relaxed);
l!(service.log, "exit signal received, shutting down...");
service.run.store(false, Ordering::Relaxed);
break;
},
}
@ -313,7 +312,7 @@ async fn run_async(store: Arc<Store>, log: Arc<Log>, local_config: Arc<LocalConf
let mut bindings_changed = false;
let _ = store.read_local_conf(true).map(|new_config| new_config.map(|new_config| {
d!(log, "local.conf changed on disk, reloading.");
d!(service.log, "local.conf changed on disk, reloading.");
service.set_local_config(new_config);
}));
@ -324,13 +323,13 @@ async fn run_async(store: Arc<Store>, log: Arc<Log>, local_config: Arc<LocalConf
bindings_changed = true;
}
if local_config.settings.log.max_size != next_local_config.settings.log.max_size {
log.set_max_size(next_local_config.settings.log.max_size);
service.log.set_max_size(next_local_config.settings.log.max_size);
}
if local_config.settings.log.stderr != next_local_config.settings.log.stderr {
log.set_log_to_stderr(next_local_config.settings.log.stderr);
service.log.set_log_to_stderr(next_local_config.settings.log.stderr);
}
if local_config.settings.log.debug != next_local_config.settings.log.debug {
log.set_debug(next_local_config.settings.log.debug);
service.log.set_debug(next_local_config.settings.log.debug);
}
local_config = next_local_config;
@ -362,7 +361,7 @@ async fn run_async(store: Arc<Store>, log: Arc<Log>, local_config: Arc<LocalConf
// TODO: need to also inform the core about these IPs...
for k in udp_sockets.keys().filter_map(|a| if system_addrs.contains_key(a) { None } else { Some(a.clone()) }).collect::<Vec<InetAddress>>().iter() {
l!(log, "unbinding UDP socket at {} (address no longer exists on system or port has changed)", k.to_string());
l!(service.log, "unbinding UDP socket at {} (address no longer exists on system or port has changed)", k.to_string());
udp_sockets.remove(k);
bindings_changed = true;
}
@ -371,9 +370,9 @@ async fn run_async(store: Arc<Store>, log: Arc<Log>, local_config: Arc<LocalConf
let _ = FastUDPSocket::new(a.1.as_str(), a.0, |raw_socket: &FastUDPRawOsSocket, from_address: &InetAddress, data: Buffer| {
// TODO: incoming packet handler
}).map_or_else(|e| {
l!(log, "error binding UDP socket to {}: {}", a.0.to_string(), e.to_string());
l!(service.log, "error binding UDP socket to {}: {}", a.0.to_string(), e.to_string());
}, |s| {
l!(log, "bound UDP socket at {}", a.0.to_string());
l!(service.log, "bound UDP socket at {}", a.0.to_string());
udp_sockets.insert(a.0.clone(), s);
bindings_changed = true;
});
@ -400,19 +399,19 @@ async fn run_async(store: Arc<Store>, log: Arc<Log>, local_config: Arc<LocalConf
if local_config.settings.auto_port_search {
// TODO: port hunting
} else {
l!(log, "WARNING: failed to bind to any address at primary port {}", local_config.settings.primary_port);
l!(service.log, "WARNING: failed to bind to any address at primary port {}", local_config.settings.primary_port);
}
}
if udp_secondary_port_bind_failure {
if local_config.settings.auto_port_search {
// TODO: port hunting
} else {
l!(log, "WARNING: failed to bind to any address at secondary port {}", local_config.settings.secondary_port.unwrap_or(0));
l!(service.log, "WARNING: failed to bind to any address at secondary port {}", local_config.settings.secondary_port.unwrap_or(0));
}
}
for k in http_listeners.keys().filter_map(|a| if system_addrs.contains_key(a) { None } else { Some(a.clone()) }).collect::<Vec<InetAddress>>().iter() {
l!(log, "closing HTTP listener at {} (address no longer exists on system or port has changed)", k.to_string());
l!(service.log, "closing HTTP listener at {} (address no longer exists on system or port has changed)", k.to_string());
http_listeners.remove(k);
bindings_changed = true;
}
@ -421,9 +420,9 @@ async fn run_async(store: Arc<Store>, log: Arc<Log>, local_config: Arc<LocalConf
let sa = a.0.to_socketaddr();
if sa.is_some() {
let wl = HttpListener::new(a.1.as_str(), sa.unwrap(), &service).await.map_or_else(|e| {
l!(log, "error creating HTTP listener at {}: {}", a.0.to_string(), e.to_string());
l!(service.log, "error creating HTTP listener at {}: {}", a.0.to_string(), e.to_string());
}, |l| {
l!(log, "created HTTP listener at {}", a.0.to_string());
l!(service.log, "created HTTP listener at {}", a.0.to_string());
http_listeners.insert(a.0.clone(), l);
bindings_changed = true;
});
@ -449,12 +448,12 @@ async fn run_async(store: Arc<Store>, log: Arc<Log>, local_config: Arc<LocalConf
}
if loopback_http_listeners.0.is_none() && loopback_http_listeners.1.is_none() {
// TODO: port hunting
l!(log, "CRITICAL: unable to create HTTP endpoint on 127.0.0.1/{} or ::1/{}, service control API will not work!", local_config.settings.primary_port, local_config.settings.primary_port);
l!(service.log, "CRITICAL: unable to create HTTP endpoint on 127.0.0.1/{} or ::1/{}, service control API will not work!", local_config.settings.primary_port, local_config.settings.primary_port);
}
if bindings_changed {
{
let mut udp_local_endpoints = service.intl.udp_local_endpoints.lock().unwrap();
let mut udp_local_endpoints = service.udp_local_endpoints.lock().unwrap();
udp_local_endpoints.clear();
for ep in udp_sockets.iter() {
udp_local_endpoints.push(ep.0.clone());
@ -462,7 +461,7 @@ async fn run_async(store: Arc<Store>, log: Arc<Log>, local_config: Arc<LocalConf
udp_local_endpoints.sort();
}
{
let mut http_local_endpoints = service.intl.http_local_endpoints.lock().unwrap();
let mut http_local_endpoints = service.http_local_endpoints.lock().unwrap();
http_local_endpoints.clear();
for ep in http_listeners.iter() {
http_local_endpoints.push(ep.0.clone());
@ -482,7 +481,7 @@ async fn run_async(store: Arc<Store>, log: Arc<Log>, local_config: Arc<LocalConf
loop_delay = node.process_background_tasks(now);
}
l!(log, "shutting down normally.");
l!(service.log, "shutting down normally.");
drop(udp_sockets);
drop(http_listeners);
@ -490,26 +489,12 @@ async fn run_async(store: Arc<Store>, log: Arc<Log>, local_config: Arc<LocalConf
drop(node);
drop(service);
d!(log, "shutdown complete.");
process_exit_value
}
pub(crate) fn run(store: Arc<Store>) -> i32 {
let local_config = Arc::new(store.read_local_conf_or_default());
let log = Arc::new(Log::new(
if local_config.settings.log.path.as_ref().is_some() {
local_config.settings.log.path.as_ref().unwrap().as_str()
} else {
store.default_log_path.to_str().unwrap()
},
local_config.settings.log.max_size,
local_config.settings.log.stderr,
local_config.settings.log.debug,
"",
));
if store.auth_token(true).is_err() {
eprintln!("FATAL: error writing new web API authorization token (likely permission problem).");
return 1;
@ -521,7 +506,7 @@ pub(crate) fn run(store: Arc<Store>) -> i32 {
let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
let store2 = store.clone();
let process_exit_value = rt.block_on(async move { run_async(store2, log, local_config).await });
let process_exit_value = rt.block_on(async move { run_async(store2, local_config).await });
rt.shutdown_timeout(Duration::from_millis(500));
store.erase_pid();