Wire up logging, service, etc.

This commit is contained in:
Adam Ierymenko 2021-02-12 23:10:41 -05:00
parent f9649217fb
commit 8d6d457dd6
No known key found for this signature in database
GPG key ID: C8877CF2D7A5D7F3
12 changed files with 505 additions and 368 deletions

View file

@ -10,6 +10,7 @@ struct prf_ra {
#include "../core/Constants.hpp"
#include "../core/Mutex.hpp"
#include "../core/Containers.hpp"
#include "../core/SHA512.hpp"
#include "OSUtils.hpp"
#include "rust-osdep.h"
@ -116,4 +117,13 @@ int64_t msSinceEpoch()
void lockDownFile(const char *path, int isDir)
{ ZeroTier::OSUtils::lockDownFile(path, isDir != 0); }
void getSecureRandom(void *buf, unsigned int len)
{ ZeroTier::Utils::getSecureRandom(buf, len); }
void sha384(const void *in, unsigned int len, void *out)
{ ZeroTier::SHA384(out, in, len); }
void sha512(const void *in, unsigned int len, void *out)
{ ZeroTier::SHA512(out, in, len); }
}

View file

@ -58,6 +58,9 @@ extern "C" {
extern const char *platformDefaultHomePath();
extern int64_t msSinceEpoch();
extern void lockDownFile(const char *path, int isDir);
extern void getSecureRandom(void *buf, unsigned int len);
extern void sha384(const void *in, unsigned int len, void *out);
extern void sha512(const void *in, unsigned int len, void *out);
#ifdef __cplusplus
}

View file

@ -165,6 +165,23 @@ pub enum ResultCode {
ErrorInternalNonFatal = ztcore::ZT_ResultCode_ZT_RESULT_ERROR_INTERNAL as isize,
}
impl ToString for ResultCode {
fn to_string(&self) -> String {
match *self {
ResultCode::Ok => "Ok",
ResultCode::FatalErrorOutOfMemory => "FatalErrorOutOfMemory",
ResultCode::FatalErrorDataStoreFailed => "FatalErrorDataStoreFailed",
ResultCode::FatalErrorInternal => "FatalErrorInternal",
ResultCode::ErrorNetworkNotFound => "ErrorNetworkNotFound",
ResultCode::ErrorUnsupportedOperation => "ErrorUnsupportedOperation",
ResultCode::ErrorBadParameter => "ErrorBadParameter",
ResultCode::ErrorInvalidCredential => "ErrorInvalidCredential",
ResultCode::ErrorCollidingObject => "ErrorCollidingObject",
ResultCode::ErrorInternalNonFatal => "ErrorInternalNonFatal",
}.to_string()
}
}
/// Returns a tuple of major, minor, revision, and build version numbers from the ZeroTier core.
pub fn version() -> (i32, i32, i32, i32) {
let mut major: c_int = 0;

View file

@ -11,15 +11,12 @@
*/
/****/
use std::cell::Cell;
use std::collections::hash_map::HashMap;
use std::intrinsics::copy_nonoverlapping;
use std::mem::{MaybeUninit, transmute};
use std::os::raw::{c_int, c_uint, c_ulong, c_void};
use std::ptr::{null_mut, slice_from_raw_parts};
use std::sync::*;
use std::sync::atomic::*;
use std::time::Duration;
use num_traits::FromPrimitive;
use serde::{Deserialize, Serialize};

View file

@ -11,7 +11,6 @@
*/
/****/
pub(crate) mod service;
pub(crate) mod status;
pub(crate) mod set;
pub(crate) mod peer;

View file

@ -1,284 +0,0 @@
/*
* Copyright (c)2013-2020 ZeroTier, Inc.
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file in the project's root directory.
*
* Change Date: 2025-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2.0 of the Apache License.
*/
/****/
use std::cell::Cell;
use std::collections::BTreeMap;
use std::net::IpAddr;
use std::rc::Rc;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use futures::stream::{self, StreamExt};
use warp::{Filter, Rejection, Reply};
use warp::http::{HeaderMap, Method, StatusCode};
use warp::hyper::body::Bytes;
use zerotier_core::{Buffer, Address, IpScope, Node, NodeEventHandler, NetworkId, VirtualNetworkConfigOperation, VirtualNetworkConfig, StateObjectType, MAC, Event, InetAddress, InetAddressFamily, Identity};
use crate::fastudpsocket::*;
use crate::getifaddrs;
use crate::localconfig::*;
use crate::log::Log;
use crate::network::Network;
use crate::store::Store;
// Check local addresses and bindings every (this) milliseconds.
const BINDING_CHECK_INTERVAL: i64 = 5000;
struct Service {
local_config: Mutex<Arc<LocalConfig>>,
run: Arc<AtomicBool>,
store: Arc<Store>,
}
impl Clone for Service {
fn clone(&self) -> Self {
Service {
local_config: Mutex::new(self.local_config.lock().unwrap().clone()),
run: self.run.clone(),
store: self.store.clone(),
}
}
}
impl NodeEventHandler<Network> for Service {
fn virtual_network_config(&self, network_id: NetworkId, network_obj: &Arc<Network>, config_op: VirtualNetworkConfigOperation, config: Option<&VirtualNetworkConfig>) {
}
#[inline(always)]
fn virtual_network_frame(&self, network_id: NetworkId, network_obj: &Arc<Network>, source_mac: MAC, dest_mac: MAC, ethertype: u16, vlan_id: u16, data: &[u8]) {
}
fn event(&self, event: Event, event_data: &[u8]) {
match event {
Event::Up => {},
Event::Down => {},
Event::Online => {},
Event::Offline => {},
Event::Trace => {},
Event::UserMessage => {},
}
}
#[inline(always)]
fn state_put(&self, obj_type: StateObjectType, obj_id: &[u64], obj_data: &[u8]) -> std::io::Result<()> {
self.store.store_object(&obj_type, obj_id, obj_data)
}
#[inline(always)]
fn state_get(&self, obj_type: StateObjectType, obj_id: &[u64]) -> std::io::Result<Vec<u8>> {
self.store.load_object(&obj_type, obj_id)
}
#[inline(always)]
fn wire_packet_send(&self, local_socket: i64, sock_addr: &InetAddress, data: &[u8], packet_ttl: u32) -> i32 {
0
}
fn path_check(&self, address: Address, id: &Identity, local_socket: i64, sock_addr: &InetAddress) -> bool {
true
}
fn path_lookup(&self, address: Address, id: &Identity, desired_family: InetAddressFamily) -> Option<InetAddress> {
let lc = self.get_local_config();
let vc = lc.virtual_.get(&address);
vc.map_or(None, |c: &LocalConfigVirtualConfig| {
if c.try_.is_empty() {
None
} else {
let t = c.try_.get((zerotier_core::random() as usize) % c.try_.len());
t.map_or(None, |v: &InetAddress| {
Some(v.clone())
})
}
})
}
}
impl Service {
#[inline(always)]
fn web_api_status(&self, method: Method, headers: HeaderMap, post_data: Bytes) -> Box<dyn Reply> {
Box::new(warp::http::StatusCode::BAD_REQUEST)
}
#[inline(always)]
fn web_api_network(&self, network_str: String, method: Method, headers: HeaderMap, post_data: Bytes) -> Box<dyn Reply> {
Box::new(warp::http::StatusCode::BAD_REQUEST)
}
#[inline(always)]
fn web_api_peer(&self, peer_str: String, method: Method, headers: HeaderMap, post_data: Bytes) -> Box<dyn Reply> {
Box::new(warp::http::StatusCode::BAD_REQUEST)
}
#[inline(always)]
fn get_local_config(&self) -> Arc<LocalConfig> {
self.local_config.lock().unwrap().clone()
}
#[inline(always)]
fn set_local_config(&self, new_lc: &Arc<LocalConfig>) {
let mut lc = self.local_config.lock().unwrap();
*lc = new_lc.clone();
}
}
pub(crate) fn run(store: &Arc<Store>) -> i32 {
let mut process_exit_value: i32 = 0;
let tokio_rt = tokio::runtime::Builder::new_current_thread().build().unwrap();
tokio_rt.block_on(async {
let mut udp_sockets: BTreeMap<InetAddress, FastUDPSocket> = BTreeMap::new();
let (mut interrupt_tx, mut interrupt_rx) = futures::channel::mpsc::channel::<u8>(2);
let service = Service {
local_config: Mutex::new(Arc::new(LocalConfig::default())),
run: Arc::new(AtomicBool::new(true)),
store: store.clone(),
};
let node = Node::new(service.clone());
if node.is_err() {
// TODO: log and handle error
return;
}
let node = Arc::new(node.ok().unwrap());
let mut primary_port_bind_failure = false;
let mut last_checked_bindings: i64 = 0;
let mut loop_delay = zerotier_core::NODE_BACKGROUND_TASKS_MAX_INTERVAL;
loop {
let mut current_local_config = service.get_local_config();
let (mut shutdown_tx, mut shutdown_rx) = futures::channel::oneshot::channel();
let warp_server;
{
let s0 = service.clone();
let s1 = service.clone();
let s2 = service.clone();
warp_server = warp::serve(warp::any().and(warp::path::end().map(|| { warp::reply::with_status("404", warp::hyper::StatusCode::NOT_FOUND) })
.or(warp::path("status").and(warp::method()).and(warp::header::headers_cloned()).and(warp::body::bytes())
.map(move |method: Method, headers: HeaderMap, post_data: Bytes| { s0.web_api_status(method, headers, post_data) }))
.or(warp::path!("network" / String).and(warp::method()).and(warp::header::headers_cloned()).and(warp::body::bytes())
.map(move |network_str: String, method: Method, headers: HeaderMap, post_data: Bytes| { s1.web_api_network(network_str, method, headers, post_data) }))
.or(warp::path!("peer" / String).and(warp::method()).and(warp::header::headers_cloned()).and(warp::body::bytes())
.map(move |peer_str: String, method: Method, headers: HeaderMap, post_data: Bytes| { s2.web_api_peer(peer_str, method, headers, post_data) }))
)).try_bind_with_graceful_shutdown((IpAddr::from([127_u8, 0_u8, 0_u8, 1_u8]), current_local_config.settings.primary_port), async { let _ = shutdown_rx.await; });
}
if warp_server.is_err() {
primary_port_bind_failure = true;
break;
}
let warp_server = tokio_rt.spawn(warp_server.unwrap().1);
loop {
tokio::select! {
_ = tokio::time::sleep(Duration::from_millis(loop_delay)) => {},
_ = interrupt_rx.next() => {},
_ = tokio::signal::ctrl_c() => {
// TODO: log CTRL+C received
service.run.store(false, Ordering::Relaxed);
let _ = shutdown_tx.send(());
break;
}
}
loop_delay = node.process_background_tasks();
let now = zerotier_core::now();
if (now - last_checked_bindings) >= BINDING_CHECK_INTERVAL {
last_checked_bindings = now;
let mut system_addrs: BTreeMap<InetAddress, String> = BTreeMap::new();
getifaddrs::for_each_address(|addr: &InetAddress, dev: &str| {
match addr.ip_scope() {
IpScope::Global | IpScope::Private | IpScope::PseudoPrivate | IpScope::Shared => {
if !current_local_config.settings.is_interface_blacklisted(dev) {
let mut a = addr.clone();
a.set_port(current_local_config.settings.primary_port);
system_addrs.insert(a, String::from(dev));
if current_local_config.settings.secondary_port.is_some() {
let mut a = addr.clone();
a.set_port(current_local_config.settings.secondary_port.unwrap());
system_addrs.insert(a, String::from(dev));
}
}
},
_ => {}
}
});
let mut udp_sockets_to_close: Vec<InetAddress> = Vec::new();
for sock in udp_sockets.iter() {
if !system_addrs.contains_key(sock.0) {
udp_sockets_to_close.push(sock.0.clone());
}
}
for k in udp_sockets_to_close.iter() {
udp_sockets.remove(k);
}
for addr in system_addrs.iter() {
if !udp_sockets.contains_key(addr.0) {
let s = FastUDPSocket::new(addr.1.as_str(), addr.0, |raw_socket: &FastUDPRawOsSocket, from_address: &InetAddress, data: Buffer| {
// TODO: incoming packet handler
});
if s.is_ok() {
udp_sockets.insert(addr.0.clone(), s.unwrap());
}
}
}
primary_port_bind_failure = true;
for s in udp_sockets.iter() {
if s.0.port() == current_local_config.settings.primary_port {
primary_port_bind_failure = false;
break;
}
}
if primary_port_bind_failure {
break;
}
}
let next_local_config = service.get_local_config();
if !service.run.load(Ordering::Relaxed) || current_local_config.settings.primary_port != next_local_config.settings.primary_port {
let _ = shutdown_tx.send(());
break;
}
current_local_config = next_local_config;
}
let _ = warp_server.await;
if !service.run.load(Ordering::Relaxed) {
break;
}
tokio::time::sleep(Duration::from_millis(250)).await;
if !service.run.load(Ordering::Relaxed) {
break;
}
if primary_port_bind_failure {
let local_config = service.get_local_config();
if local_config.settings.auto_port_search {
// TODO: port hunting if enabled
}
}
}
});
process_exit_value
}

View file

@ -33,6 +33,21 @@ pub(crate) type FastUDPRawOsSocket = winsock2::SOCKET;
#[cfg(unix)]
pub(crate) type FastUDPRawOsSocket = c_int;
/// Test bind UDP to a port at 0.0.0.0 and ::0, returning whether IPv4 and/or IPv6 succeeded (respectively).
pub(crate) fn test_bind_udp(port: u16) -> (bool, bool) {
let v4 = InetAddress::new_ipv4_any(port);
let v6 = InetAddress::new_ipv6_any(port);
let v4b = bind_udp_socket("", &v4);
if v4b.is_ok() {
fast_udp_socket_close(v4b.as_ref().unwrap());
}
let v6b = bind_udp_socket("", &v6);
if v6b.is_ok() {
fast_udp_socket_close(v6b.as_ref().unwrap());
}
(v4b.is_ok(), v6b.is_ok())
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// bind_udp_socket() implementations for each platform
@ -122,21 +137,6 @@ fn bind_udp_socket(_: &str, address: &InetAddress) -> Result<FastUDPRawOsSocket,
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
/// Test bind UDP to a port at 0.0.0.0 and ::0, returning whether IPv4 and/or IPv6 succeeded (respectively).
pub(crate) fn test_bind_udp(port: u16) -> (bool, bool) {
let v4 = InetAddress::new_ipv4_any(port);
let v6 = InetAddress::new_ipv6_any(port);
let v4b = bind_udp_socket("", &v4);
if v4b.is_ok() {
fast_udp_socket_close(v4b.as_ref().unwrap());
}
let v6b = bind_udp_socket("", &v6);
if v6b.is_ok() {
fast_udp_socket_close(v6b.as_ref().unwrap());
}
(v4b.is_ok(), v6b.is_ok())
}
/// A multi-threaded (or otherwise fast) UDP socket that binds to both IPv4 and IPv6 addresses.
pub(crate) struct FastUDPSocket {
threads: Vec<std::thread::JoinHandle<()>>,

View file

@ -160,7 +160,7 @@ impl Default for LocalConfigNetworkSettings {
impl LocalConfigSettings {
#[cfg(target_os = "macos")]
const DEFAULT_PREFIX_BLACKLIST: [&'static str; 7] = ["lo", "utun", "gif", "stf", "iptap", "pktap", "feth"];
const DEFAULT_PREFIX_BLACKLIST: [&'static str; 8] = ["lo", "utun", "gif", "stf", "iptap", "pktap", "feth", "zt"];
#[cfg(target_os = "linux")]
const DEFAULT_PREFIX_BLACKLIST: [&'static str; 5] = ["lo", "tun", "tap", "ipsec", "zt"];

View file

@ -15,24 +15,20 @@ use std::cell::Cell;
use std::fmt::Display;
use std::fs::{File, OpenOptions};
use std::io::{Seek, SeekFrom, Write};
use std::sync::atomic::{AtomicUsize, Ordering, AtomicBool};
use std::sync::Mutex;
use chrono::Datelike;
use zerotier_core::PortableAtomicI64;
struct LogIntl {
file: Option<File>,
cur_size: u64,
max_size: usize,
enabled: bool
}
pub(crate) struct Log {
prefix: String,
path: String,
intl: Mutex<LogIntl>
intl: Mutex<LogIntl>,
}
impl Log {
@ -50,7 +46,6 @@ impl Log {
file: None,
cur_size: 0,
max_size: if max_size < Log::MIN_MAX_SIZE { Log::MIN_MAX_SIZE } else { max_size },
enabled: true,
}),
}
}
@ -59,55 +54,60 @@ impl Log {
self.intl.lock().unwrap().max_size = if new_max_size < Log::MIN_MAX_SIZE { Log::MIN_MAX_SIZE } else { new_max_size };
}
pub fn set_enabled(&self, enabled: bool) {
self.intl.lock().unwrap().enabled = enabled;
}
pub fn log<S: AsRef<str>>(&self, s: S) {
let mut l = self.intl.lock().unwrap();
if l.enabled {
if l.file.is_none() {
let mut f = OpenOptions::new().read(true).write(true).create(true).open(self.path.as_str());
if f.is_err() {
return;
}
let mut f = f.unwrap();
let eof = f.seek(SeekFrom::End(0));
if eof.is_err() {
return;
}
l.cur_size = eof.unwrap();
l.file = Some(f);
}
if l.max_size > 0 && l.cur_size > l.max_size as u64 {
l.file = None;
l.cur_size = 0;
let mut old_path = self.path.clone();
old_path.push_str(".old");
let _ = std::fs::remove_file(old_path.as_str());
let _ = std::fs::rename(self.path.as_str(), old_path.as_str());
let _ = std::fs::remove_file(self.path.as_str()); // should fail
let mut f = OpenOptions::new().read(true).write(true).create(true).open(self.path.as_str());
if f.is_err() {
return;
}
l.file = Some(f.unwrap());
}
let f = l.file.as_mut().unwrap();
let now_str = chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string();
let ss: &str = s.as_ref();
let log_line = format!("{}[{}] {}\n", self.prefix.as_str(), now_str.as_str(), ss);
let _ = f.write_all(log_line.as_bytes());
let _ = f.flush();
l.cur_size += log_line.len() as u64;
let ss: &str = s.as_ref();
if ss.starts_with("FATAL") {
eprintln!("{}", ss);
}
let mut l = self.intl.lock().unwrap();
if l.file.is_none() {
let mut f = OpenOptions::new().read(true).write(true).create(true).open(self.path.as_str());
if f.is_err() {
return;
}
let mut f = f.unwrap();
let eof = f.seek(SeekFrom::End(0));
if eof.is_err() {
return;
}
l.cur_size = eof.unwrap();
l.file = Some(f);
}
if l.max_size > 0 && l.cur_size > l.max_size as u64 {
l.file = None;
l.cur_size = 0;
let mut old_path = self.path.clone();
old_path.push_str(".old");
let _ = std::fs::remove_file(old_path.as_str());
let _ = std::fs::rename(self.path.as_str(), old_path.as_str());
let _ = std::fs::remove_file(self.path.as_str()); // should fail
let mut f = OpenOptions::new().read(true).write(true).create(true).open(self.path.as_str());
if f.is_err() {
return;
}
l.file = Some(f.unwrap());
}
let f = l.file.as_mut().unwrap();
let now_str = chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string();
let log_line = format!("{}[{}] {}\n", self.prefix.as_str(), now_str.as_str(), ss);
let _ = f.write_all(log_line.as_bytes());
let _ = f.flush();
l.cur_size += log_line.len() as u64;
}
}
#[macro_export]
macro_rules! l(
($logger:ident, $($arg:tt)*) => {
$logger.log(format!($($arg)*))
}
);
unsafe impl Sync for Log {}
/*

View file

@ -17,10 +17,11 @@ mod commands;
mod fastudpsocket;
mod localconfig;
mod getifaddrs;
mod log;
#[macro_use] mod log;
mod store;
mod network;
mod vnic;
mod service;
#[allow(non_snake_case,non_upper_case_globals,non_camel_case_types,dead_code,improper_ctypes)]
mod osdep; // bindgen generated
@ -28,18 +29,19 @@ mod osdep; // bindgen generated
use std::boxed::Box;
use std::ffi::CStr;
use std::path::Path;
use std::sync::Arc;
use crate::store::Store;
use std::sync::Arc;
fn main() {
let mut process_exit_value: i32 = 0;
let mut cli_args = Some(Box::new(cli::parse_cli_args()));
let mut zerotier_path = unsafe { zerotier_core::cstr_to_string(osdep::platformDefaultHomePath(), 256) };
let json_output;
let mut token: Option<String> = None;
let mut token_path = Path::new(&zerotier_path).join("authtoken.secret");
let mut zerotier_path = unsafe { zerotier_core::cstr_to_string(osdep::platformDefaultHomePath(), -1) };
let json_output: bool;
let mut auth_token: Option<String> = None;
let mut auth_token_path: Option<String> = None;
{
let a = cli_args.as_ref().unwrap();
json_output = a.is_present("json");
@ -49,21 +51,35 @@ fn main() {
}
let v = a.value_of("token");
if v.is_some() {
token = Some(String::from(v.unwrap().trim()));
auth_token = Some(v.unwrap().trim().to_string());
}
let v = a.value_of("token_path");
if v.is_some() {
token_path = Path::new(v.unwrap().trim()).to_path_buf();
auth_token_path = Some(v.unwrap().to_string());
}
}
let store = Store::new(zerotier_path.as_str());
if store.is_err() {
println!("FATAL: error accessing directory '{}': {}", zerotier_path, store.err().unwrap().to_string());
eprintln!("FATAL: error accessing directory '{}': {}", zerotier_path, store.err().unwrap().to_string());
std::process::exit(1);
}
let store = Arc::new(store.unwrap());
if auth_token.is_none() {
let t;
if auth_token_path.is_some() {
t = store.read_file_str(auth_token_path.unwrap().trim());
} else {
t = store.read_authtoken_secret();
}
if t.is_ok() {
auth_token = Some(t.unwrap().trim().to_string());
}
} else {
auth_token = Some(auth_token.unwrap().trim().to_string());
}
match cli_args.as_ref().unwrap().subcommand_name().unwrap() {
"version" => {
let ver = zerotier_core::version();
@ -71,7 +87,7 @@ fn main() {
},
"service" => {
cli_args = None; // free any memory we can when launching service
process_exit_value = commands::service::run(&store);
process_exit_value = service::run(&store, auth_token);
},
_ => cli::print_help(), // includes "help"
}

View file

@ -0,0 +1,363 @@
/*
* Copyright (c)2013-2020 ZeroTier, Inc.
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file in the project's root directory.
*
* Change Date: 2025-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2.0 of the Apache License.
*/
/****/
use std::collections::BTreeMap;
use std::net::IpAddr;
use std::str::FromStr;
use std::sync::{Arc, Mutex, Weak};
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use futures::stream::StreamExt;
use warp::{Filter, Reply};
use warp::http::{HeaderMap, Method, StatusCode};
use warp::hyper::body::Bytes;
use zerotier_core::{Buffer, Address, IpScope, Node, NodeEventHandler, NetworkId, VirtualNetworkConfigOperation, VirtualNetworkConfig, StateObjectType, MAC, Event, InetAddress, InetAddressFamily, Identity};
use crate::fastudpsocket::*;
use crate::getifaddrs;
use crate::localconfig::*;
use crate::log::Log;
use crate::network::Network;
use crate::store::Store;
const CONFIG_CHECK_INTERVAL: i64 = 5000;
#[derive(Clone)]
struct Service {
auth_token: Arc<String>,
log: Arc<Log>,
_local_config: Arc<Mutex<Arc<LocalConfig>>>,
run: Arc<AtomicBool>,
store: Arc<Store>,
node: Weak<Node<Service, Network>>, // weak since Node can hold a reference to this
}
impl NodeEventHandler<Network> for Service {
fn virtual_network_config(&self, network_id: NetworkId, network_obj: &Arc<Network>, config_op: VirtualNetworkConfigOperation, config: Option<&VirtualNetworkConfig>) {}
#[inline(always)]
fn virtual_network_frame(&self, network_id: NetworkId, network_obj: &Arc<Network>, source_mac: MAC, dest_mac: MAC, ethertype: u16, vlan_id: u16, data: &[u8]) {}
fn event(&self, event: Event, event_data: &[u8]) {
match event {
Event::Up => {}
Event::Down => {}
Event::Online => {}
Event::Offline => {}
Event::Trace => {}
Event::UserMessage => {}
}
}
#[inline(always)]
fn state_put(&self, obj_type: StateObjectType, obj_id: &[u64], obj_data: &[u8]) -> std::io::Result<()> {
self.store.store_object(&obj_type, obj_id, obj_data)
}
#[inline(always)]
fn state_get(&self, obj_type: StateObjectType, obj_id: &[u64]) -> std::io::Result<Vec<u8>> {
self.store.load_object(&obj_type, obj_id)
}
#[inline(always)]
fn wire_packet_send(&self, local_socket: i64, sock_addr: &InetAddress, data: &[u8], packet_ttl: u32) -> i32 {
0
}
fn path_check(&self, address: Address, id: &Identity, local_socket: i64, sock_addr: &InetAddress) -> bool {
true
}
fn path_lookup(&self, address: Address, id: &Identity, desired_family: InetAddressFamily) -> Option<InetAddress> {
let lc = self.local_config();
let vc = lc.virtual_.get(&address);
vc.map_or(None, |c: &LocalConfigVirtualConfig| {
if c.try_.is_empty() {
None
} else {
let t = c.try_.get((zerotier_core::random() as usize) % c.try_.len());
t.map_or(None, |v: &InetAddress| {
Some(v.clone())
})
}
})
}
}
impl Service {
#[inline(always)]
fn web_api_status(&self, method: Method, headers: HeaderMap, post_data: Bytes) -> Box<dyn Reply> {
Box::new(StatusCode::BAD_REQUEST)
}
#[inline(always)]
fn web_api_network(&self, network_str: String, method: Method, headers: HeaderMap, post_data: Bytes) -> Box<dyn Reply> {
Box::new(StatusCode::BAD_REQUEST)
}
#[inline(always)]
fn web_api_peer(&self, peer_str: String, method: Method, headers: HeaderMap, post_data: Bytes) -> Box<dyn Reply> {
Box::new(StatusCode::BAD_REQUEST)
}
#[inline(always)]
fn local_config(&self) -> Arc<LocalConfig> {
self._local_config.lock().unwrap().clone()
}
#[inline(always)]
fn set_local_config(&self, new_lc: LocalConfig) {
*(self._local_config.lock().unwrap()) = Arc::new(new_lc);
}
}
pub(crate) fn run(store: &Arc<Store>, auth_token: Option<String>) -> i32 {
let mut process_exit_value: i32 = 0;
let init_local_config = Arc::new(store.read_local_conf(false).unwrap_or(LocalConfig::default()));
// Open log in store.
let log = Arc::new(Log::new(
if init_local_config.settings.log_path.as_ref().is_some() { init_local_config.settings.log_path.as_ref().unwrap().as_str() } else { store.default_log_path.to_str().unwrap() },
init_local_config.settings.log_size_max,
"",
));
// Generate authtoken.secret from secure random bytes if not already set.
let auth_token = auth_token.unwrap_or_else(|| {
let mut rb = [0_u8; 64];
unsafe {
crate::osdep::getSecureRandom(rb.as_mut_ptr().cast(), 64);
}
let mut t = String::new();
t.reserve(64);
for b in rb.iter() {
if *b > 127_u8 {
t.push((65 + (*b % 26)) as char); // A..Z
} else {
t.push((97 + (*b % 26)) as char); // a..z
}
}
if store.write_authtoken_secret(t.as_str()).is_err() {
t.clear();
}
t
});
if auth_token.is_empty() {
l!(log, "FATAL: unable to write authtoken.secret to '{}'", store.base_path.to_str().unwrap());
return 1;
}
let auth_token = Arc::new(auth_token);
let tokio_rt = tokio::runtime::Builder::new_current_thread().build().unwrap();
tokio_rt.block_on(async {
let mut udp_sockets: BTreeMap<InetAddress, FastUDPSocket> = BTreeMap::new();
let (mut interrupt_tx, mut interrupt_rx) = futures::channel::mpsc::channel::<()>(1);
// Create clonable implementation of NodeEventHandler and local web API endpoints.
let mut service = Service {
auth_token: auth_token.clone(),
log: log.clone(),
_local_config: Arc::new(Mutex::new(init_local_config)),
run: Arc::new(AtomicBool::new(true)),
store: store.clone(),
node: Weak::new(),
};
// Create instance of Node which will call Service on events.
let node = Node::new(service.clone());
if node.is_err() {
process_exit_value = 1;
l!(log, "FATAL: error initializing node: {}", node.err().unwrap().to_string());
return;
}
let node = Arc::new(node.ok().unwrap());
service.node = Arc::downgrade(&node);
let service = service; // make immutable after setting node
let mut last_checked_config: i64 = 0;
let mut loop_delay = zerotier_core::NODE_BACKGROUND_TASKS_MAX_INTERVAL;
loop {
let mut local_config = service.local_config();
let (mut shutdown_tx, mut shutdown_rx) = futures::channel::oneshot::channel();
let warp_server;
{
let s0 = service.clone();
let s1 = service.clone();
let s2 = service.clone();
warp_server = warp::serve(
warp::any().and(warp::path::end().map(|| {
warp::reply::with_status("404", StatusCode::NOT_FOUND)
})
.or(warp::path("status").and(warp::method()).and(warp::header::headers_cloned()).and(warp::body::bytes())
.map(move |method: Method, headers: HeaderMap, post_data: Bytes| {
s0.web_api_status(method, headers, post_data)
}))
.or(warp::path!("network" / String).and(warp::method()).and(warp::header::headers_cloned()).and(warp::body::bytes())
.map(move |network_str: String, method: Method, headers: HeaderMap, post_data: Bytes| {
s1.web_api_network(network_str, method, headers, post_data)
}))
.or(warp::path!("peer" / String).and(warp::method()).and(warp::header::headers_cloned()).and(warp::body::bytes())
.map(move |peer_str: String, method: Method, headers: HeaderMap, post_data: Bytes| {
s2.web_api_peer(peer_str, method, headers, post_data)
}))
)).try_bind_with_graceful_shutdown(
(IpAddr::from([127_u8, 0_u8, 0_u8, 1_u8]), local_config.settings.primary_port),
async { let _ = shutdown_rx.await; },
);
}
if warp_server.is_err() {
l!(log, "ERROR: local API http server failed to bind to port {}: {}", local_config.settings.primary_port, warp_server.err().unwrap().to_string());
break;
}
let warp_server = tokio_rt.spawn(warp_server.unwrap().1);
loop {
// Wait for (1) loop delay elapsed, (2) a signal to interrupt delay now, or
// (3) an external signal to exit.
tokio::select! {
_ = tokio::time::sleep(Duration::from_millis(loop_delay)) => {},
_ = interrupt_rx.next() => {},
_ = tokio::signal::ctrl_c() => {
l!(log, "exit signal received, shutting down...");
service.run.store(false, Ordering::Relaxed);
break;
}
}
// Check every CONFIG_CHECK_INTERVAL for changes to either the system configuration
// or the node's local configuration and take actions as needed.
let now = zerotier_core::now();
if (now - last_checked_config) >= CONFIG_CHECK_INTERVAL {
last_checked_config = now;
// Check for changes to local.conf.
let new_config = store.read_local_conf(true);
if new_config.is_ok() {
service.set_local_config(new_config.unwrap());
}
// Check for configuration changes that require a reboot of the inner loop
// or other actions to be taken.
let next_local_config = service.local_config();
if local_config.settings.primary_port != next_local_config.settings.primary_port {
break;
}
local_config = next_local_config;
// Enumerate all useful addresses bound to interfaces on the system.
let mut system_addrs: BTreeMap<InetAddress, String> = BTreeMap::new();
getifaddrs::for_each_address(|addr: &InetAddress, dev: &str| {
match addr.ip_scope() {
IpScope::Global | IpScope::Private | IpScope::PseudoPrivate | IpScope::Shared => {
if !local_config.settings.is_interface_blacklisted(dev) {
let mut a = addr.clone();
a.set_port(local_config.settings.primary_port);
system_addrs.insert(a, String::from(dev));
if local_config.settings.secondary_port.is_some() {
let mut a = addr.clone();
a.set_port(local_config.settings.secondary_port.unwrap());
system_addrs.insert(a, String::from(dev));
}
}
}
_ => {}
}
});
// Drop bound sockets that are no longer valid or are now blacklisted.
let mut udp_sockets_to_close: Vec<InetAddress> = Vec::new();
for sock in udp_sockets.iter() {
if !system_addrs.contains_key(sock.0) {
udp_sockets_to_close.push(sock.0.clone());
}
}
for k in udp_sockets_to_close.iter() {
udp_sockets.remove(k);
}
// Create sockets for unbound addresses.
for addr in system_addrs.iter() {
if !udp_sockets.contains_key(addr.0) {
let s = FastUDPSocket::new(addr.1.as_str(), addr.0, |raw_socket: &FastUDPRawOsSocket, from_address: &InetAddress, data: Buffer| {
// TODO: incoming packet handler
});
if s.is_ok() {
udp_sockets.insert(addr.0.clone(), s.unwrap());
}
}
}
// Determine if primary and secondary port (if secondary enabled) failed to
// bind to any interface.
let mut primary_port_bind_failure = true;
let mut secondary_port_bind_failure = local_config.settings.secondary_port.is_some();
for s in udp_sockets.iter() {
if s.0.port() == local_config.settings.primary_port {
primary_port_bind_failure = false;
if !secondary_port_bind_failure {
break;
}
}
if s.0.port() == local_config.settings.secondary_port.unwrap() {
secondary_port_bind_failure = false;
if !primary_port_bind_failure {
break;
}
}
}
if primary_port_bind_failure {
if local_config.settings.auto_port_search {
// TODO: port hunting if enabled
} else {
l!(log, "primary port {} failed to bind, waiting and trying again...", local_config.settings.primary_port);
break;
}
}
if secondary_port_bind_failure {
l!(log, "secondary port {} failed to bind (non-fatal, will try again)", local_config.settings.secondary_port.unwrap_or(0));
// hunt for a secondary port.
}
}
// Check to make sure nothing outside this code turned off the run flag.
if !service.run.load(Ordering::Relaxed) {
break;
}
// Run background task handler in ZeroTier core.
loop_delay = node.process_background_tasks();
}
// Gracefully shut down the local web server.
let _ = shutdown_tx.send(());
let _ = warp_server.await;
// Sleep for a brief period of time to prevent thrashing if some invalid
// state is hit that causes the inner loop to keep breaking.
if !service.run.load(Ordering::Relaxed) {
break;
}
tokio::time::sleep(Duration::from_secs(1)).await;
if !service.run.load(Ordering::Relaxed) {
break;
}
}
});
process_exit_value
}

View file

@ -14,6 +14,7 @@
use std::error::Error;
use std::io::{Read, Write};
use std::path::{Path, PathBuf};
use std::sync::Mutex;
use std::ffi::CString;
use zerotier_core::{StateObjectType, NetworkId};
@ -23,6 +24,7 @@ use crate::localconfig::LocalConfig;
pub(crate) struct Store {
pub base_path: Box<Path>,
pub default_log_path: Box<Path>,
prev_local_config: Mutex<String>,
peers_path: Box<Path>,
controller_path: Box<Path>,
networks_path: Box<Path>,
@ -53,6 +55,7 @@ impl Store {
let s = Store {
base_path: bp.to_path_buf().into_boxed_path(),
default_log_path: bp.join("service.log").into_boxed_path(),
prev_local_config: Mutex::new(String::new()),
peers_path: bp.join("peers.d").into_boxed_path(),
controller_path: bp.join("controller.d").into_boxed_path(),
networks_path: bp.join("networks.d").into_boxed_path(),
@ -161,13 +164,24 @@ impl Store {
}
/// Write a file to the base ZeroTier home directory.
/// Error code std::io::ErrorKind::Other is returned if skip_if_unchanged is true
/// and there has been no change from the last read.
pub fn write_file(&self, fname: &str, data: &[u8]) -> std::io::Result<()> {
std::fs::OpenOptions::new().write(true).truncate(true).create(true).open(self.base_path.join(fname))?.write_all(data)
}
/// Reads local.conf and deserializes into a LocalConfig object.
pub fn read_local_conf(&self) -> std::io::Result<LocalConfig> {
pub fn read_local_conf(&self, skip_if_unchanged: bool) -> std::io::Result<LocalConfig> {
let data = self.read_file_str("local.conf")?;
if skip_if_unchanged {
let mut prev = self.prev_local_config.lock().unwrap();
if prev.eq(&data) {
return Err(std::io::Error::new(std::io::ErrorKind::Other, "unchangd"));
}
*prev = data.clone();
} else {
*(self.prev_local_config.lock().unwrap()) = data.clone();
}
let lc = LocalConfig::new_from_json(data.as_str());
if lc.is_err() {
return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, lc.err().unwrap()));
@ -182,9 +196,9 @@ impl Store {
}
/// Reads the authtoken.secret file in the home directory.
#[inline(always)]
pub fn read_authtoken_secret(&self) -> std::io::Result<String> {
let data = self.read_file_str("authtoken.secret")?;
Ok(data.trim().to_string())
Ok(self.read_file_str("authtoken.secret")?)
}
/// Write authtoken.secret and lock down file permissions.
@ -219,12 +233,14 @@ impl Store {
if obj_path.is_some() {
let obj_path = obj_path.unwrap();
std::fs::OpenOptions::new().write(true).truncate(true).create(true).open(&obj_path)?.write_all(obj_data)?;
if obj_type.eq(&StateObjectType::IdentitySecret) || obj_type.eq(&StateObjectType::TrustStore) {
lock_down_file(obj_path.to_str().unwrap());
}
Ok(())
} else {
Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, "object ID not valid"))
Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, "object type or ID not valid"))
}
}
}