Implement most of /status

This commit is contained in:
Adam Ierymenko 2021-03-23 17:53:39 -04:00
parent 1b9ec2d9c3
commit 5a0de399d7
No known key found for this signature in database
GPG key ID: C8877CF2D7A5D7F3
27 changed files with 414 additions and 194 deletions

View file

@ -38,7 +38,14 @@ static clock_serv_t _machGetRealtimeClock() noexcept
host_get_clock_service(mach_host_self(),CALENDAR_CLOCK,&c);
return c;
}
static clock_serv_t _machGetMonotonicClock() noexcept
{
clock_serv_t c;
host_get_clock_service(mach_host_self(),REALTIME_CLOCK,&c);
return c;
}
clock_serv_t OSUtils::s_machRealtimeClock = _machGetRealtimeClock();
clock_serv_t OSUtils::s_machMonotonicClock = _machGetMonotonicClock();
#endif
unsigned int OSUtils::ztsnprintf(char *buf,unsigned int len,const char *fmt,...)

View file

@ -43,6 +43,7 @@ class OSUtils
private:
#ifdef __APPLE__
static clock_serv_t s_machRealtimeClock;
static clock_serv_t s_machMonotonicClock;
#endif
public:
@ -179,6 +180,42 @@ public:
#endif
};
/**
* Get monotonic time since some point in the past
*
* On some systems this may fall back to the same return value as now(), but
* if a monotonic (not affected by time changes) source is available it will
* be used.
*
* @return Current monotonic time in milliseconds (usually since system boot, but origin point is undefined)
*/
static ZT_INLINE int64_t now_monotonic()
{
#ifdef __WINDOWS__
return (int64_t)GetTickCount64();
#else
#ifdef __LINUX__
timespec ts;
#ifdef CLOCK_MONOTONIC_COARSE
clock_gettime(CLOCK_MONOTONIC_COARSE,&ts);
#else
clock_gettime(CLOCK_MONOTONIC,&ts);
#endif
return ( (1000LL * (int64_t)ts.tv_sec) + ((int64_t)(ts.tv_nsec / 1000000)) );
#else
#ifdef __APPLE__
mach_timespec_t mts;
clock_get_time(s_machMonotonicClock,&mts);
return ( (1000LL * (int64_t)mts.tv_sec) + ((int64_t)(mts.tv_nsec / 1000000)) );
#else
timeval tv;
gettimeofday(&tv,(struct timezone *)0);
return ( (1000LL * (int64_t)tv.tv_sec) + (int64_t)(tv.tv_usec / 1000) );
#endif
#endif
#endif
};
/**
* Read the full contents of a file into a string buffer
*

View file

@ -128,6 +128,9 @@ const char *platformDefaultHomePath()
int64_t msSinceEpoch()
{ return ZeroTier::OSUtils::now(); }
int64_t msMonotonic()
{ return ZeroTier::OSUtils::now_monotonic(); }
void lockDownFile(const char *path, int isDir)
{ ZeroTier::OSUtils::lockDownFile(path, isDir != 0); }

View file

@ -108,6 +108,9 @@ extern const char *platformDefaultHomePath();
// This ms-since-epoch function may be faster than the one in Rust's stdlib.
extern int64_t msSinceEpoch();
// This is the number of milliseconds since some time in the past, unaffected by the clock (or msSinceEpoch() if not supported by host).
extern int64_t msMonotonic();
// Rust glue to C code to lock down a file, which is simple on Unix-like OSes
// and horrible on Windows.
extern void lockDownFile(const char *path, int isDir);

View file

@ -28,12 +28,6 @@ version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "644f9158b2f133fd50f5fb3242878846d9eb792e445c893805ff0e3824006e35"
[[package]]
name = "itoa"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc6f3ad7b9d11a0c00842ff8de1b60ee58661048eb8049ed33c73594f359d7e6"
[[package]]
name = "num-derive"
version = "0.3.3"
@ -72,12 +66,6 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "ryu"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e"
[[package]]
name = "serde"
version = "1.0.118"
@ -98,17 +86,6 @@ dependencies = [
"syn",
]
[[package]]
name = "serde_json"
version = "1.0.60"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1500e84d27fe482ed1dc791a56eddc2f230046a040fa908c08bda1d9fb615779"
dependencies = [
"itoa",
"ryu",
"serde",
]
[[package]]
name = "syn"
version = "1.0.54"
@ -136,5 +113,4 @@ dependencies = [
"num-derive",
"num-traits",
"serde",
"serde_json",
]

View file

@ -9,7 +9,6 @@ build = "build.rs"
num-traits = "0.2"
num-derive = "0.3"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
hex = "0.4"
base64-serde = "0"
base64 = "0"

View file

@ -156,7 +156,6 @@ impl CertificateSubjectUniqueIdSecret {
}
}
implement_to_from_json!(CertificateSubjectUniqueIdSecret);
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@ -313,8 +312,6 @@ impl CertificateName {
}
}
implement_to_from_json!(CertificateName);
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone)]
@ -362,8 +359,6 @@ impl CertificateNetwork {
}
}
implement_to_from_json!(CertificateNetwork);
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone)]
@ -391,8 +386,6 @@ impl CertificateIdentity {
}
}
implement_to_from_json!(CertificateIdentity);
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone)]
@ -574,8 +567,6 @@ impl CertificateSubject {
}
}
implement_to_from_json!(CertificateSubject);
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone)]
@ -711,8 +702,6 @@ impl Certificate {
}
}
implement_to_from_json!(Certificate);
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
#[cfg(test)]

View file

@ -16,10 +16,9 @@ use std::ffi::c_void;
use std::os::raw::{c_char, c_uint};
use crate::{cstr_to_string, ResultCode};
use crate::capi::ZT_Dictionary_parse;
/// Rust interface to the Dictionary data structure.
#[derive(Clone, Eq, PartialEq)]
#[derive(Clone)]
pub struct Dictionary {
data: HashMap<String, Vec<u8>>,
}
@ -51,7 +50,7 @@ impl Dictionary {
pub fn new_from_bytes(dict: &[u8]) -> Result<Dictionary, ResultCode> {
let mut d = Dictionary{ data: HashMap::new() };
if unsafe { ZT_Dictionary_parse(dict.as_ptr().cast(), dict.len() as c_uint, (&mut d as *mut Dictionary).cast(), Some(populate_dict_callback)) != 0 } {
if unsafe { crate::capi::ZT_Dictionary_parse(dict.as_ptr().cast(), dict.len() as c_uint, (&mut d as *mut Dictionary).cast(), Some(populate_dict_callback)) != 0 } {
Ok(d)
} else {
Err(ResultCode::ErrorBadParameter)

View file

@ -76,6 +76,8 @@ impl InetAddressFamily {
pub const IPV4_INADDR_ANY: [u8; 4] = [0; 4];
pub const IPV6_INADDR_ANY: [u8; 16] = [0; 16];
pub const IPV4_LOOPBACK: [u8; 4] = [127, 0, 0, 1];
pub const IPV6_LOOPBACK: [u8; 16] = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1];
/// Opaque structure that can hold an IPv4 or IPv6 address.
pub struct InetAddress {
@ -110,6 +112,24 @@ impl InetAddress {
ia
}
/// Create 127.0.0.1/port
pub fn new_ipv4_loopback(port: u16) -> InetAddress {
let mut ia = InetAddress::new();
unsafe {
ztcore::ZT_InetAddress_setIpBytes(ia.as_capi_mut_ptr(), IPV4_LOOPBACK.as_ptr().cast(), 4, port as c_uint);
}
ia
}
/// Create ::1/port
pub fn new_ipv6_loopback(port: u16) -> InetAddress {
let mut ia = InetAddress::new();
unsafe {
ztcore::ZT_InetAddress_setIpBytes(ia.as_capi_mut_ptr(), IPV6_LOOPBACK.as_ptr().cast(), 16, port as c_uint);
}
ia
}
/// Create from a 4-byte IPv4 IP or a 16-byte IPv6 IP.
/// Returns None if ip is not 4 or 16 bytes.
pub fn new_from_ip_bytes(ip: &[u8], port: u16) -> Option<InetAddress> {

View file

@ -204,29 +204,6 @@ pub unsafe fn cstr_to_string(cstr: *const c_char, max_len: isize) -> String {
String::new()
}
/// Macro to implement to_json and new_from_json on types that are Serializable.
#[macro_export]
macro_rules! implement_to_from_json {
($struct_name:ident) => {
impl $struct_name {
pub fn new_from_json(json: &str) -> Result<$struct_name, String> {
let r: serde_json::error::Result<$struct_name> = serde_json::from_str(json);
if r.is_err() {
let e = r.err();
if e.is_none() {
return Err(String::from("unknown error"));
}
return Err(e.unwrap().to_string());
}
Ok(r.unwrap())
}
pub fn to_json(&self) -> String {
serde_json::to_string_pretty(self).unwrap()
}
}
};
}
/*
#[macro_export(crate)]
macro_rules! enum_str {

View file

@ -17,13 +17,7 @@ pub struct MAC(pub u64);
impl ToString for MAC {
fn to_string(&self) -> String {
let x = self.0;
format!("{:0>2x}:{:0>2x}:{:0>2x}:{:0>2x}:{:0>2x}:{:0>2x}",
(x >> 40) & 0xff,
(x >> 32) & 0xff,
(x >> 24) & 0xff,
(x >> 16) & 0xff,
(x >> 8) & 0xff,
x & 0xff)
format!("{:0>2x}:{:0>2x}:{:0>2x}:{:0>2x}:{:0>2x}:{:0>2x}", (x >> 40) & 0xff, (x >> 32) & 0xff, (x >> 24) & 0xff, (x >> 16) & 0xff, (x >> 8) & 0xff, x & 0xff)
}
}

View file

@ -13,6 +13,7 @@
use crate::MAC;
use std::cmp::Ordering;
use std::hash::{Hash, Hasher};
#[derive(Clone, PartialEq, Eq)]
pub struct MulticastGroup {
@ -20,13 +21,34 @@ pub struct MulticastGroup {
pub adi: u32,
}
impl ToString for MulticastGroup {
fn to_string(&self) -> String {
format!("{}/{}", self.mac.to_string(), self.adi)
}
}
impl Hash for MulticastGroup {
#[inline(always)]
fn hash<H: Hasher>(&self, state: &mut H) {
self.mac.0.hash(state);
self.adi.hash(state);
}
}
impl Ord for MulticastGroup {
fn cmp(&self, other: &Self) -> Ordering {
let o1 = self.mac.0.cmp(&other.mac.0);
if o1 == Ordering::Equal {
self.adi.cmp(&other.adi)
if self.mac.0 < other.mac.0 {
Ordering::Less
} else if self.mac.0 > other.mac.0 {
Ordering::Greater
} else {
o1
if self.adi < other.adi {
Ordering::Less
} else if self.adi > other.adi {
Ordering::Greater
} else {
Ordering::Equal
}
}
}
}

13
service/Cargo.lock generated
View file

@ -104,6 +104,17 @@ dependencies = [
"vec_map",
]
[[package]]
name = "colored"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b3616f750b84d8f0de8a58bda93e08e2a81ad3f523089b05f1dffecab48c6cbd"
dependencies = [
"atty",
"lazy_static",
"winapi",
]
[[package]]
name = "console"
version = "0.13.0"
@ -927,7 +938,6 @@ dependencies = [
"num-derive",
"num-traits",
"serde",
"serde_json",
]
[[package]]
@ -936,6 +946,7 @@ version = "0.1.0"
dependencies = [
"chrono",
"clap",
"colored",
"dialoguer",
"digest_auth",
"futures",

View file

@ -24,6 +24,7 @@ hyper = { version = "0", features = ["http1", "runtime", "server", "client", "tc
socket2 = { version = "0", features = ["reuseport", "unix", "pair"] }
dialoguer = "0"
digest_auth = "0"
colored = "2"
[target."cfg(windows)".dependencies]
winapi = { version = "0.3.9", features = ["handleapi", "ws2ipdef", "ws2tcpip"] }

View file

View file

@ -31,7 +31,7 @@ fn show<'a>(store: &Arc<Store>, cli_args: &ArgMatches<'a>) -> i32 {
fn newsid(cli_args: Option<&ArgMatches>) -> i32 {
let sid = CertificateSubjectUniqueIdSecret::new(CertificateUniqueIdType::NistP384); // right now there's only one type
let sid = sid.to_json();
let sid = serde_json::to_string(&sid).unwrap();
let path = cli_args.map_or("", |cli_args| { cli_args.value_of("path").unwrap_or("") });
if path.is_empty() {
let _ = std::io::stdout().write_all(sid.as_bytes());
@ -67,7 +67,7 @@ fn newcsr(cli_args: &ArgMatches) -> i32 {
println!("ERROR: invalid subject unique ID secret: {}", json.err().unwrap().to_string());
return 1;
}
let sid = CertificateSubjectUniqueIdSecret::new_from_json(json.unwrap().as_str());
let sid = serde_json::from_str::<CertificateSubjectUniqueIdSecret>(json.unwrap().as_str());
if sid.is_err() {
println!("ERROR: invalid subject unique ID secret: {}", sid.err().unwrap());
return 1;

View file

@ -11,17 +11,43 @@
*/
/****/
use std::error::Error;
use std::rc::Rc;
use std::str::FromStr;
use std::sync::Arc;
use hyper::Uri;
use hyper::{Uri, Method, StatusCode};
use colored::*;
use crate::store::Store;
use crate::webclient::HttpClient;
use crate::webclient::*;
use crate::service::ServiceStatus;
use crate::{GlobalFlags, HTTP_API_OBJECT_SIZE_LIMIT};
pub(crate) async fn run(store: Arc<Store>, client: HttpClient, api_base_uri: Uri, auth_token: String) -> hyper::Result<i32> {
let mut res = client.get(api_base_uri).await?;
let body = hyper::body::to_bytes(res.body_mut()).await?;
Ok(0)
pub(crate) async fn run(store: Arc<Store>, global_flags: GlobalFlags, client: HttpClient, api_base_uri: Uri, auth_token: String) -> Result<i32, Box<dyn Error>> {
let uri = append_uri_path(api_base_uri, "/status").unwrap();
let mut res = request(&client, Method::GET, uri, None, auth_token.as_str()).await?;
match res.status() {
StatusCode::OK => {
let status = read_object_limited::<ServiceStatus>(res.body_mut(), HTTP_API_OBJECT_SIZE_LIMIT).await?;
if global_flags.json_output {
println!("{}", serde_json::to_string_pretty(&status).unwrap())
} else {
println!("address {} version {} status {}",
status.address.to_string().as_str().bright_white(),
status.version.as_str().bright_white(),
if status.online {
"ONLINE".bright_green()
} else {
"OFFLINE".bright_red()
});
// TODO: print more detailed status information
}
Ok(0)
},
_ => Err(Box::new(UnexpectedStatusCodeError(res.status())))
}
}

View file

@ -275,12 +275,6 @@ impl FastUDPSocket {
pub fn raw_socket(&self) -> FastUDPRawOsSocket {
*self.sockets.get(0).unwrap()
}
/// Get the number of threads this socket is currently running.
#[inline(always)]
pub fn thread_count(&self) -> usize {
self.threads.len()
}
}
impl Drop for FastUDPSocket {

View file

@ -228,30 +228,3 @@ impl Default for LocalConfig {
}
}
}
zerotier_core::implement_to_from_json!(LocalConfig);
impl LocalConfig {
pub fn load(path: &String) -> std::io::Result<LocalConfig> {
let md = std::path::Path::new(path).metadata();
if md.is_err() {
return Err(md.err().unwrap());
}
if md.unwrap().len() > 1048576 { // anti-memory-overflow sanity limit
return Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, "local config file too large (sanity limit: 1MiB)"))
}
let json = std::fs::read_to_string(path);
if json.is_err() {
return Err(json.err().unwrap());
}
let json = Self::new_from_json(json.unwrap().as_str());
if json.is_err() {
return Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, json.err().unwrap().to_string()));
}
Ok(json.unwrap())
}
pub fn save(&self, path: &String) -> std::io::Result<()> {
std::fs::write(path, self.to_json())
}
}

View file

@ -11,7 +11,6 @@
*/
/****/
mod api;
mod commands;
mod fastudpsocket;
mod localconfig;
@ -36,6 +35,8 @@ use clap::{App, Arg, ArgMatches, ErrorKind};
use crate::store::Store;
pub const HTTP_API_OBJECT_SIZE_LIMIT: usize = 131072;
fn make_help() -> String {
let ver = zerotier_core::version();
format!(r###"ZeroTier Network Hypervisor Service Version {}.{}.{}
@ -139,7 +140,7 @@ filesystem or verbatim objects in string format. This is auto-detected.
pub(crate) fn print_help() {
let h = make_help();
std::io::stdout().write_all(h.as_bytes());
let _ = std::io::stdout().write_all(h.as_bytes());
}
pub(crate) fn parse_bool(v: &str) -> Result<bool, String> {
@ -166,7 +167,6 @@ fn is_valid_port(v: String) -> Result<(), String> {
}
fn make_store(cli_args: &ArgMatches) -> Arc<Store> {
//let json_output = cli_args.is_present("json"); // TODO
let zerotier_path = cli_args.value_of("path").map_or_else(|| unsafe { zerotier_core::cstr_to_string(osdep::platformDefaultHomePath(), -1) }, |ztp| ztp.to_string());
let store = Store::new(zerotier_path.as_str(), cli_args.value_of("token_path").map_or(None, |tp| Some(tp.to_string())), cli_args.value_of("token").map_or(None, |tok| Some(tok.trim().to_string())));
if store.is_err() {
@ -176,6 +176,17 @@ fn make_store(cli_args: &ArgMatches) -> Arc<Store> {
Arc::new(store.unwrap())
}
#[derive(Clone)]
pub struct GlobalFlags {
pub json_output: bool,
}
fn get_global_flags(cli_args: &ArgMatches) -> GlobalFlags {
GlobalFlags {
json_output: cli_args.is_present("json")
}
}
fn main() {
let cli_args = {
let help = make_help();
@ -313,7 +324,7 @@ fn main() {
println!("{}.{}.{}", ver.0, ver.1, ver.2);
0
}
("status", _) => crate::webclient::run_command(make_store(&cli_args), crate::commands::status::run),
("status", _) => crate::webclient::run_command(make_store(&cli_args), get_global_flags(&cli_args), crate::commands::status::run),
("set", Some(sub_cli_args)) => { 0 }
("peer", Some(sub_cli_args)) => { 0 }
("network", Some(sub_cli_args)) => { 0 }

View file

@ -11,6 +11,7 @@
*/
/****/
use std::cell::Cell;
use std::collections::BTreeMap;
use std::net::{SocketAddr, Ipv4Addr, IpAddr, Ipv6Addr};
use std::str::FromStr;
@ -28,17 +29,45 @@ use crate::localconfig::*;
use crate::log::Log;
use crate::network::Network;
use crate::store::Store;
use crate::utils::ms_since_epoch;
use crate::utils::{ms_since_epoch, ms_monotonic};
use crate::weblistener::WebListener;
/// How often to check for major configuration changes. This shouldn't happen
/// too often since it uses a bit of CPU.
const CONFIG_CHECK_INTERVAL: i64 = 5000;
/// ServiceStatus is the object returned by the API /status endpoint
#[derive(Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct ServiceStatus {
pub address: Address,
pub clock: i64,
pub uptime: i64,
pub config: Arc<LocalConfig>,
pub online: bool,
#[serde(rename = "publicIdentity")]
pub public_identity: Identity,
pub version: String,
#[serde(rename = "versionMajor")]
pub version_major: i32,
#[serde(rename = "versionMinor")]
pub version_minor: i32,
#[serde(rename = "versionRev")]
pub version_revision: i32,
#[serde(rename = "versionBuild")]
pub version_build: i32,
#[serde(rename = "udpLocalEndpoints")]
pub udp_local_endpoints: Vec<InetAddress>,
#[serde(rename = "httpLocalEndpoints")]
pub http_local_endpoints: Vec<InetAddress>,
}
struct ServiceIntl {
udp_local_endpoints: Mutex<Vec<InetAddress>>,
http_local_endpoints: Mutex<Vec<InetAddress>>,
interrupt: Mutex<futures::channel::mpsc::Sender<()>>,
local_config: Mutex<Arc<LocalConfig>>,
store: Arc<Store>,
starup_time: i64,
run: AtomicBool,
online: AtomicBool,
}
@ -151,12 +180,10 @@ impl NodeEventHandler<Network> for Service {
}
impl Service {
#[inline(always)]
pub fn local_config(&self) -> Arc<LocalConfig> {
self.intl.local_config.lock().unwrap().clone()
}
#[inline(always)]
pub fn set_local_config(&self, new_lc: LocalConfig) {
*(self.intl.local_config.lock().unwrap()) = Arc::new(new_lc);
}
@ -165,17 +192,14 @@ impl Service {
/// This can return None if we are in the midst of shutdown. In this case
/// whatever operation is in progress should abort. None will never be
/// returned during normal operation.
#[inline(always)]
pub fn node(&self) -> Option<Arc<Node<Service, Network>>> {
self._node.upgrade()
}
#[inline(always)]
pub fn store(&self) -> &Arc<Store> {
&self.intl.store
}
#[inline(always)]
pub fn online(&self) -> bool {
self.intl.online.load(Ordering::Relaxed)
}
@ -184,6 +208,32 @@ impl Service {
self.intl.run.store(false, Ordering::Relaxed);
let _ = self.intl.interrupt.lock().unwrap().try_send(());
}
pub fn uptime(&self) -> i64 {
ms_since_epoch() - self.intl.starup_time
}
/// Get service status for API, or None if a shutdown is in progress.
pub fn status(&self) -> Option<ServiceStatus> {
let ver = zerotier_core::version();
self.node().map(|node| {
ServiceStatus {
address: node.address(),
clock: ms_since_epoch(),
uptime: self.uptime(),
config: self.local_config(),
online: self.online(),
public_identity: node.identity(),
version: format!("{}.{}.{}", ver.0, ver.1, ver.2),
version_major: ver.0,
version_minor: ver.1,
version_revision: ver.2,
version_build: ver.3,
udp_local_endpoints: self.intl.udp_local_endpoints.lock().unwrap().clone(),
http_local_endpoints: self.intl.http_local_endpoints.lock().unwrap().clone(),
}
})
}
}
unsafe impl Send for Service {}
@ -202,9 +252,12 @@ async fn run_async(store: Arc<Store>, log: Arc<Log>, local_config: Arc<LocalConf
log: log.clone(),
_node: Weak::new(),
intl: Arc::new(ServiceIntl {
udp_local_endpoints: Mutex::new(Vec::new()),
http_local_endpoints: Mutex::new(Vec::new()),
interrupt: Mutex::new(interrupt_tx),
local_config: Mutex::new(local_config),
store: store.clone(),
starup_time: ms_monotonic(),
run: AtomicBool::new(true),
online: AtomicBool::new(false),
}),
@ -254,16 +307,18 @@ async fn run_async(store: Arc<Store>, log: Arc<Log>, local_config: Arc<LocalConf
if (now - last_checked_config) >= CONFIG_CHECK_INTERVAL {
last_checked_config = now;
let new_config = store.read_local_conf(true);
if new_config.is_ok() {
let mut bindings_changed = false;
let _ = store.read_local_conf(true).map(|new_config| new_config.map(|new_config| {
d!(log, "local.conf changed on disk, reloading.");
service.set_local_config(new_config.unwrap());
}
service.set_local_config(new_config);
}));
let next_local_config = service.local_config();
if local_config.settings.primary_port != next_local_config.settings.primary_port {
local_web_listeners.0 = None;
local_web_listeners.1 = None;
bindings_changed = true;
}
if local_config.settings.log.max_size != next_local_config.settings.log.max_size {
log.set_max_size(next_local_config.settings.log.max_size);
@ -311,16 +366,18 @@ async fn run_async(store: Arc<Store>, log: Arc<Log>, local_config: Arc<LocalConf
l!(log, "unbinding UDP socket at {}", k.to_string());
udp_sockets.remove(k);
}
drop(udp_sockets_to_close);
for addr in system_addrs.iter() {
if !udp_sockets.contains_key(addr.0) {
let _ = FastUDPSocket::new(addr.1.as_str(), addr.0, move |raw_socket: &FastUDPRawOsSocket, from_address: &InetAddress, data: Buffer| {
let _ = FastUDPSocket::new(addr.1.as_str(), addr.0, |raw_socket: &FastUDPRawOsSocket, from_address: &InetAddress, data: Buffer| {
// TODO: incoming packet handler
}).map_or_else(|e| {
l!(log, "error binding UDP socket to {}: {}", addr.0.to_string(), e.to_string());
}, |s| {
l!(log, "bound UDP socket at {}", addr.0.to_string());
udp_sockets.insert(addr.0.clone(), s);
bindings_changed = true;
});
}
}
@ -360,12 +417,14 @@ async fn run_async(store: Arc<Store>, log: Arc<Log>, local_config: Arc<LocalConf
for l in web_listeners.iter() {
if !system_addrs.contains_key(l.0) {
web_listeners_to_close.push(l.0.clone());
bindings_changed = true;
}
}
for k in web_listeners_to_close.iter() {
l!(log, "closing HTTP listener at {}", k.to_string());
web_listeners.remove(k);
}
drop(web_listeners_to_close);
for addr in system_addrs.iter() {
if addr.0.port() == local_config.settings.primary_port && !web_listeners.contains_key(addr.0) {
@ -376,6 +435,7 @@ async fn run_async(store: Arc<Store>, log: Arc<Log>, local_config: Arc<LocalConf
}, |l| {
l!(log, "created HTTP listener at {}", addr.0.to_string());
web_listeners.insert(addr.0.clone(), l);
bindings_changed = true;
});
}
}
@ -385,6 +445,7 @@ async fn run_async(store: Arc<Store>, log: Arc<Log>, local_config: Arc<LocalConf
let _ = WebListener::new(loopback_dev_name.as_str(), SocketAddr::new(IpAddr::from(Ipv4Addr::LOCALHOST), local_config.settings.primary_port), &service).await.map(|wl| {
local_web_listeners.0 = Some(wl);
let _ = store.write_uri(format!("http://127.0.0.1:{}/", local_config.settings.primary_port).as_str());
bindings_changed = true;
});
}
if local_web_listeners.1.is_none() {
@ -393,11 +454,35 @@ async fn run_async(store: Arc<Store>, log: Arc<Log>, local_config: Arc<LocalConf
if local_web_listeners.0.is_none() {
let _ = store.write_uri(format!("http://[::1]:{}/", local_config.settings.primary_port).as_str());
}
bindings_changed = true;
});
}
if local_web_listeners.0.is_none() && local_web_listeners.1.is_none() {
l!(log, "error creating HTTP listener on 127.0.0.1/{} or ::1/{}", local_config.settings.primary_port, local_config.settings.primary_port);
}
if bindings_changed {
service.intl.udp_local_endpoints.lock().as_mut().map(|udp_local_endpoints| {
udp_local_endpoints.clear();
for ep in udp_sockets.iter() {
udp_local_endpoints.push(ep.0.clone());
}
udp_local_endpoints.sort();
});
service.intl.http_local_endpoints.lock().as_mut().map(|http_local_endpoints| {
http_local_endpoints.clear();
for ep in web_listeners.iter() {
http_local_endpoints.push(ep.0.clone());
}
if local_web_listeners.0.is_some() {
http_local_endpoints.push(InetAddress::new_ipv4_loopback(local_web_listeners.0.unwrap().address.port()));
}
if local_web_listeners.1.is_some() {
http_local_endpoints.push(InetAddress::new_ipv6_loopback(local_web_listeners.1.unwrap().address.port()));
}
http_local_endpoints.sort();
});
}
}
// Run background task handler in ZeroTier core.
@ -418,7 +503,7 @@ async fn run_async(store: Arc<Store>, log: Arc<Log>, local_config: Arc<LocalConf
}
pub(crate) fn run(store: Arc<Store>) -> i32 {
let local_config = Arc::new(store.read_local_conf(false).unwrap_or_else(|_| { LocalConfig::default() }));
let local_config = Arc::new(store.read_local_conf_or_default());
let log = Arc::new(Log::new(
if local_config.settings.log.path.as_ref().is_some() {

View file

@ -21,6 +21,12 @@ use zerotier_core::{StateObjectType, NetworkId};
use crate::localconfig::LocalConfig;
const ZEROTIER_PID: &'static str = "zerotier.pid";
const ZEROTIER_URI: &'static str = "zerotier.uri";
const LOCAL_CONF: &'static str = "local.conf";
const AUTHTOKEN_SECRET: &'static str = "authtoken.secret";
const SERVICE_LOG: &'static str = "service.log";
/// In-filesystem data store for configuration and objects.
pub(crate) struct Store {
pub base_path: Box<Path>,
@ -58,14 +64,14 @@ impl Store {
let s = Store {
base_path: bp.to_path_buf().into_boxed_path(),
default_log_path: bp.join("service.log").into_boxed_path(),
default_log_path: bp.join(SERVICE_LOG).into_boxed_path(),
prev_local_config: Mutex::new(String::new()),
peers_path: bp.join("peers.d").into_boxed_path(),
controller_path: bp.join("controller.d").into_boxed_path(),
networks_path: bp.join("networks.d").into_boxed_path(),
certs_path: bp.join("certs.d").into_boxed_path(),
auth_token_path: Mutex::new(auth_token_path_override.map_or_else(|| {
bp.join("authtoken.secret").into_boxed_path()
bp.join(AUTHTOKEN_SECRET).into_boxed_path()
}, |auth_token_path_override| {
PathBuf::from(auth_token_path_override).into_boxed_path()
})),
@ -100,7 +106,7 @@ impl Store {
}
},
StateObjectType::TrustStore => {
Some(self.base_path.join("truststore"))
Some(self.base_path.join("trust"))
},
StateObjectType::Locator => {
Some(self.base_path.join("locator"))
@ -122,7 +128,6 @@ impl Store {
}
}
/// Class-internal read function
fn read_internal(&self, path: PathBuf) -> std::io::Result<Vec<u8>> {
let fmd = path.metadata()?;
if fmd.is_file() {
@ -139,7 +144,6 @@ impl Store {
Err(std::io::Error::new(std::io::ErrorKind::NotFound, "does not exist or is not readable"))
}
/// Get content of authtoken.secret or optionally generate and save if missing.
pub fn auth_token(&self, generate_if_missing: bool) -> std::io::Result<String> {
let mut token = self.auth_token.lock().unwrap();
if token.is_empty() {
@ -179,10 +183,7 @@ impl Store {
}
}
/// Get a list of the network IDs to which this node is joined.
/// This is used to recall networks on startup by enumerating networks.d
/// and telling the core to (re)join them all.
pub fn networks(&self) -> Vec<NetworkId> {
pub fn list_joined_networks(&self) -> Vec<NetworkId> {
let mut list: Vec<NetworkId> = Vec::new();
let d = std::fs::read_dir(self.networks_path.as_ref());
if d.is_ok() {
@ -202,12 +203,10 @@ impl Store {
list
}
/// Read a file located in the base ZeroTier home directory.
pub fn read_file(&self, fname: &str) -> std::io::Result<Vec<u8>> {
self.read_internal(self.base_path.join(fname))
}
/// Like read_file but also converts into a string.
pub fn read_file_str(&self, fname: &str) -> std::io::Result<String> {
let data = self.read_file(fname)?;
let data = String::from_utf8(data);
@ -217,58 +216,63 @@ impl Store {
Ok(data.unwrap())
}
/// Write a file to the base ZeroTier home directory.
/// Error code std::io::ErrorKind::Other is returned if skip_if_unchanged is true
/// and there has been no change from the last read.
pub fn write_file(&self, fname: &str, data: &[u8]) -> std::io::Result<()> {
std::fs::OpenOptions::new().write(true).truncate(true).create(true).open(self.base_path.join(fname))?.write_all(data)
}
/// Reads local.conf and deserializes into a LocalConfig object.
pub fn read_local_conf(&self, skip_if_unchanged: bool) -> std::io::Result<LocalConfig> {
let data = self.read_file_str("local.conf")?;
pub fn read_local_conf(&self, skip_if_unchanged: bool) -> Option<std::io::Result<LocalConfig>> {
let data = self.read_file_str(LOCAL_CONF);
if data.is_err() {
return Some(Err(data.err().unwrap()));
}
let data = data.unwrap();
if skip_if_unchanged {
let mut prev = self.prev_local_config.lock().unwrap();
if prev.eq(&data) {
return Err(std::io::Error::new(std::io::ErrorKind::Other, "unchangd"));
return None;
}
*prev = data.clone();
} else {
*(self.prev_local_config.lock().unwrap()) = data.clone();
}
let lc = LocalConfig::new_from_json(data.as_str());
let lc = serde_json::from_str::<LocalConfig>(data.as_str());
if lc.is_err() {
return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, lc.err().unwrap()));
return Some(Err(std::io::Error::new(std::io::ErrorKind::InvalidData, lc.err().unwrap())));
}
Ok(lc.unwrap())
Some(Ok(lc.unwrap()))
}
pub fn read_local_conf_or_default(&self) -> LocalConfig {
let lc = self.read_local_conf(false);
if lc.is_some() {
let lc = lc.unwrap();
if lc.is_ok() {
return lc.unwrap();
}
}
LocalConfig::default()
}
/// Writes a LocalConfig object in JSON format to local.conf.
pub fn write_local_conf(&self, lc: &LocalConfig) -> std::io::Result<()> {
let json = lc.to_json();
self.write_file("local.conf", json.as_bytes())
let json = serde_json::to_string(lc).unwrap();
self.write_file(LOCAL_CONF, json.as_bytes())
}
/// Write zerotier.pid file with current process's PID.
#[cfg(unix)]
pub fn write_pid(&self) -> std::io::Result<()> {
let pid = unsafe { crate::osdep::getpid() }.to_string();
self.write_file("zerotier.pid", pid.as_bytes())
self.write_file(ZEROTIER_PID, pid.as_bytes())
}
/// Erase zerotier.pid if present.
pub fn erase_pid(&self) {
let _ = std::fs::remove_file(self.base_path.join("zerotier.pid"));
let _ = std::fs::remove_file(self.base_path.join(ZEROTIER_PID));
}
/// Write a string to zerotier.uri
pub fn write_uri(&self, uri: &str) -> std::io::Result<()> {
self.write_file("zerotier.uri", uri.as_bytes())
self.write_file(ZEROTIER_URI, uri.as_bytes())
}
/// Load zerotier.uri if present
pub fn load_uri(&self) -> std::io::Result<hyper::Uri> {
let uri = String::from_utf8(self.read_file("zerotier.uri")?);
let uri = String::from_utf8(self.read_file(ZEROTIER_URI)?);
uri.map_or_else(|e| {
Err(std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))
}, |uri| {
@ -281,7 +285,6 @@ impl Store {
})
}
/// Load a ZeroTier core object.
pub fn load_object(&self, obj_type: &StateObjectType, obj_id: &[u64]) -> std::io::Result<Vec<u8>> {
let obj_path = self.make_obj_path_internal(&obj_type, obj_id);
if obj_path.is_some() {
@ -290,7 +293,6 @@ impl Store {
Err(std::io::Error::new(std::io::ErrorKind::NotFound, "does not exist or is not readable"))
}
/// Erase a ZeroTier core object.
pub fn erase_object(&self, obj_type: &StateObjectType, obj_id: &[u64]) {
let obj_path = self.make_obj_path_internal(obj_type, obj_id);
if obj_path.is_some() {
@ -298,8 +300,6 @@ impl Store {
}
}
/// Store a ZeroTier core object.
/// Permissions will also be restricted for some object types.
pub fn store_object(&self, obj_type: &StateObjectType, obj_id: &[u64], obj_data: &[u8]) -> std::io::Result<()> {
let obj_path = self.make_obj_path_internal(obj_type, obj_id);
if obj_path.is_some() {

View file

@ -21,14 +21,17 @@ use std::path::Path;
use zerotier_core::{Identity, Locator};
use crate::osdep;
use crate::osdep::time;
#[inline(always)]
pub(crate) fn ms_since_epoch() -> i64 {
// This is easy to do in the Rust stdlib, but the version in OSUtils is probably faster.
unsafe { osdep::msSinceEpoch() }
}
#[inline(always)]
pub(crate) fn ms_monotonic() -> i64 {
unsafe { osdep::msMonotonic() }
}
/// Convenience function to read up to limit bytes from a file.
/// If the file is larger than limit, the excess is not read.
pub(crate) fn read_limit<P: AsRef<Path>>(path: P, limit: usize) -> std::io::Result<Vec<u8>> {

View file

@ -20,8 +20,8 @@ use num_traits::cast::AsPrimitive;
use crate::osdep as osdep;
/// BSD based OSes support getifmaddrs().
#[cfg(any(target_os = "macos", target_os = "freebsd", target_os = "netbsd", target_os = "openbsd", target_os = "dragonfly", target_os = "ios", target_os = "bsd", target_os = "darwin"))]
pub(crate) fn bsd_get_multicast_groups(dev: &str) -> BTreeSet<MulticastGroup> {
#[cfg(any(target_os = "macos", target_os = "ios", target_os = "netbsd", target_os = "openbsd", target_os = "dragonfly", target_os = "freebsd", target_os = "darwin"))]
pub(crate) fn get_l2_multicast_subscriptions(dev: &str) -> BTreeSet<MulticastGroup> {
let mut groups: BTreeSet<MulticastGroup> = BTreeSet::new();
let dev = dev.as_bytes();
unsafe {
@ -50,7 +50,7 @@ pub(crate) fn bsd_get_multicast_groups(dev: &str) -> BTreeSet<MulticastGroup> {
/// Linux stores this stuff in /proc and it needs to be fetched from there.
#[cfg(target_os = "linux")]
pub(crate) fn linux_get_multicast_groups(dev: &str) -> BTreeSet<MulticastGroup> {
pub(crate) fn get_l2_multicast_subscriptions(dev: &str) -> BTreeSet<MulticastGroup> {
let mut groups: BTreeSet<MulticastGroup> = BTreeSet::new();
groups
}

View file

@ -424,7 +424,7 @@ impl VNIC for MacFethTap {
#[inline(always)]
fn get_multicast_groups(&self) -> BTreeSet<MulticastGroup> {
crate::vnic::common::bsd_get_multicast_groups(self.device.name.as_str())
crate::vnic::common::get_l2_multicast_subscriptions(self.device.name.as_str())
}
#[inline(always)]

View file

@ -13,24 +13,51 @@
use std::error::Error;
use std::future::Future;
use std::str::FromStr;
use std::time::Duration;
use std::rc::Rc;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use hyper::{Uri, Response, Body, Method, Request, StatusCode};
use futures::stream::StreamExt;
use hyper::{Body, Method, Request, Response, StatusCode, Uri};
use hyper::http::uri::{Authority, PathAndQuery, Scheme};
use serde::de::DeserializeOwned;
use crate::GlobalFlags;
use crate::store::Store;
pub(crate) type HttpClient = Rc<hyper::Client<hyper::client::HttpConnector, Body>>;
#[derive(Debug)]
pub(crate) struct IncorrectAuthTokenError;
impl Error for IncorrectAuthTokenError {}
impl std::fmt::Display for IncorrectAuthTokenError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "401 UNAUTHORIZED (incorrect authorization token or not allowed to read token)")
}
}
#[derive(Debug)]
pub(crate) struct UnexpectedStatusCodeError(pub StatusCode);
impl Error for UnexpectedStatusCodeError {}
impl std::fmt::Display for UnexpectedStatusCodeError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "unexpected status code: {} {}", self.0.as_str(), self.0.canonical_reason().unwrap_or("???"))
}
}
/// Launch the supplied function with a ready to go HTTP client, the auth token, and the API URI.
/// This is boilerplate code for CLI commands that invoke the HTTP API. Since it instantiates and
/// then kills a tokio runtime, it's not for use in the service code that runs in a long-running
/// tokio runtime.
pub(crate) fn run_command<
R: Future<Output = hyper::Result<i32>>,
F: FnOnce(Arc<Store>, HttpClient, Uri, String) -> R
>(store: Arc<Store>, func: F) -> i32 {
R: Future<Output = Result<i32, Box<dyn Error>>>,
F: FnOnce(Arc<Store>, GlobalFlags, HttpClient, Uri, String) -> R
>(store: Arc<Store>, global_flags: GlobalFlags, func: F) -> i32 {
let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
let code = rt.block_on(async move {
let uri = store.load_uri();
@ -45,9 +72,11 @@ pub(crate) fn run_command<
} else {
let uri = uri.unwrap();
let uri_str = uri.to_string();
func(store, Rc::new(hyper::Client::new()), uri, auth_token.unwrap()).await.map_or_else(|e| {
println!("ERROR: service API HTTP request failed: {}", e.to_string());
println!("ZeroTier service may not be running or '{}' may be unreachable.", uri_str);
func(store, global_flags, Rc::new(hyper::Client::builder().http1_max_buf_size(65536).build_http()), uri, auth_token.unwrap()).await.map_or_else(|e| {
println!("ERROR: service API HTTP request ({}) failed: {}", uri_str, e);
println!();
println!("Common causes: service is not running, authorization token incorrect");
println!("or not readable, or a local firewall is blocking loopback connections.");
1
}, |code| {
code
@ -61,10 +90,11 @@ pub(crate) fn run_command<
/// Send a request to the API with support for HTTP digest authentication.
/// The data option is for PUT and POST requests. For GET it is ignored. Errors indicate total
/// failure such as connection refused. A returned result must still have its status checked. If
/// it's 401 (unauthorized) it likely means the auth_token is wrong.
pub(crate) async fn request<D: AsRef<[u8]>>(client: &HttpClient, method: Method, uri: Uri, data: D, auth_token: String) -> Result<Response<Body>, Box<dyn Error>> {
let body = data.as_ref().to_vec();
/// failure such as connection refused. A returned result must still have its status checked.
/// Note that if authorization is required and the auth token doesn't work, IncorrectAuthTokenError
/// is returned as an error instead of a 401 response object.
pub(crate) async fn request(client: &HttpClient, method: Method, uri: Uri, data: Option<&[u8]>, auth_token: &str) -> Result<Response<Body>, Box<dyn Error>> {
let body: Vec<u8> = data.map_or_else(|| Vec::new(), |data| data.to_vec());
let req = Request::builder().method(&method).version(hyper::Version::HTTP_11).uri(&uri).body(Body::from(body.clone()));
if req.is_err() {
@ -85,7 +115,7 @@ pub(crate) async fn request<D: AsRef<[u8]>>(client: &HttpClient, method: Method,
if auth.is_err() {
return Err(Box::new(auth.err().unwrap()));
}
let mut auth = digest_auth::parse(auth.unwrap());
let auth = digest_auth::parse(auth.unwrap());
if auth.is_err() {
return Err(Box::new(auth.err().unwrap()));
}
@ -103,8 +133,68 @@ pub(crate) async fn request<D: AsRef<[u8]>>(client: &HttpClient, method: Method,
if res.is_err() {
return Err(Box::new(res.err().unwrap()));
}
return Ok(res.unwrap());
let res = res.unwrap();
if res.status() == StatusCode::UNAUTHORIZED {
return Err(Box::new(IncorrectAuthTokenError));
}
return Ok(res);
}
return Ok(res);
}
/// Append to a URI path, returning None on error or a new Uri.
pub(crate) fn append_uri_path(uri: Uri, new_path: &str) -> Option<Uri> {
let parts = uri.into_parts();
let mut path = parts.path_and_query.map_or_else(|| String::new(), |pq| pq.to_string());
while path.ends_with("/") {
let _ = path.pop();
}
path.push_str(new_path);
let path = PathAndQuery::from_str(path.as_str());
if path.is_err() {
None
} else {
Uri::builder()
.scheme(parts.scheme.unwrap_or(Scheme::HTTP))
.authority(parts.authority.unwrap_or(Authority::from_static("127.0.0.1")))
.path_and_query(path.unwrap())
.build()
.map_or_else(|_| None, |uri| Some(uri))
}
}
/// Read HTTP body with a size limit.
pub(crate) async fn read_body_limited(body: &mut Body, max_size: usize) -> Result<Vec<u8>, Box<dyn Error>> {
let mut data: Vec<u8> = Vec::new();
loop {
let blk = body.next().await;
if blk.is_some() {
let blk = blk.unwrap();
if blk.is_err() {
return Err(Box::new(blk.err().unwrap()));
}
for b in blk.unwrap().iter() {
data.push(*b);
if data.len() >= max_size {
return Ok(data);
}
}
} else {
break;
}
}
Ok(data)
}
pub(crate) async fn read_object_limited<O: DeserializeOwned>(body: &mut Body, max_size: usize) -> Result<O, Box<dyn Error>> {
let data = read_body_limited(body, max_size).await?;
let obj = serde_json::from_slice(data.as_slice());
if obj.is_err() {
Err(Box::new(obj.err().unwrap()))
} else {
Ok(obj.unwrap())
}
}

View file

@ -13,15 +13,13 @@
use std::cell::RefCell;
use std::convert::Infallible;
use std::net::{SocketAddr, TcpListener};
use std::net::SocketAddr;
use hyper::{Body, Request, Response};
use hyper::server::Server;
use hyper::service::{make_service_fn, service_fn};
use tokio::task::JoinHandle;
use zerotier_core::InetAddress;
use crate::service::Service;
#[cfg(target_os = "linux")]
@ -36,13 +34,14 @@ async fn web_handler(service: Service, req: Request<Body>) -> Result<Response<Bo
/// Dropping a listener initiates shutdown of the background hyper Server instance,
/// but it might not shut down instantly as this occurs asynchronously.
pub(crate) struct WebListener {
pub address: SocketAddr,
shutdown_tx: RefCell<Option<tokio::sync::oneshot::Sender<()>>>,
server: JoinHandle<hyper::Result<()>>,
}
impl WebListener {
/// Create a new "background" TCP WebListener using the current tokio reactor async runtime.
pub async fn new(_device_name: &str, addr: SocketAddr, service: &Service) -> Result<WebListener, Box<dyn std::error::Error>> {
pub async fn new(_device_name: &str, address: SocketAddr, service: &Service) -> Result<WebListener, Box<dyn std::error::Error>> {
let listener = if addr.is_ipv4() {
let listener = socket2::Socket::new(socket2::Domain::ipv4(), socket2::Type::stream(), Some(socket2::Protocol::tcp()));
if listener.is_err() {
@ -78,7 +77,7 @@ impl WebListener {
}
}
let addr = socket2::SockAddr::from(addr);
let addr = socket2::SockAddr::from(address);
if let Err(e) = listener.bind(&addr) {
return Err(Box::new(e));
}
@ -108,6 +107,7 @@ impl WebListener {
})).with_graceful_shutdown(async { let _ = shutdown_rx.await; }));
Ok(WebListener {
address,
shutdown_tx: RefCell::new(Some(shutdown_tx)),
server,
})