A ton of work in progress on service: logging, physical link enumeration, data store, local config, etc.

This commit is contained in:
Adam Ierymenko 2021-01-21 21:10:11 -05:00
parent 896d75fe86
commit deec3b807b
No known key found for this signature in database
GPG key ID: C8877CF2D7A5D7F3
18 changed files with 676 additions and 79 deletions

View file

@ -834,6 +834,18 @@ enum ZT_InetAddress_IpScope ZT_InetAddress_ipScope(const ZT_InetAddress *ia)
return ZT_IP_SCOPE_NONE;
}
int ZT_InetAddress_lessThan(const ZT_InetAddress *a, const ZT_InetAddress *b)
{
if ((a)&&(b)) {
return (int)(*reinterpret_cast<const ZeroTier::InetAddress *>(a) < *reinterpret_cast<const ZeroTier::InetAddress *>(b));
} else if (a) {
return 0;
} else if (b) {
return 1;
}
return 0;
}
/********************************************************************************************************************/
uint64_t ZT_random()

View file

@ -3018,6 +3018,11 @@ ZT_SDK_API int ZT_InetAddress_isV6(const ZT_InetAddress *ia);
*/
ZT_SDK_API enum ZT_InetAddress_IpScope ZT_InetAddress_ipScope(const ZT_InetAddress *ia);
/**
* Compare a and b, return non-zero if a < b
*/
ZT_SDK_API int ZT_InetAddress_lessThan(const ZT_InetAddress *a, const ZT_InetAddress *b);
/* These mirror the values of AF_INET and AF_INET6 for use by Rust and other things that need it. */
ZT_SDK_API const int ZT_AF_INET,ZT_AF_INET6;

View file

@ -11,6 +11,8 @@
*/
/****/
use std::cmp::Ordering;
pub struct Address(pub u64);
impl ToString for Address {
@ -41,6 +43,27 @@ impl PartialEq for Address {
impl Eq for Address {}
impl Ord for Address {
#[inline(always)]
fn cmp(&self, other: &Self) -> Ordering {
self.0.cmp(&other.0)
}
}
impl PartialOrd for Address {
#[inline(always)]
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.0.cmp(&other.0))
}
}
impl Clone for Address {
#[inline(always)]
fn clone(&self) -> Self {
Address(self.0)
}
}
impl serde::Serialize for Address {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> where S: serde::Serializer {
serializer.serialize_str(self.to_string().as_str())

View file

@ -20,6 +20,7 @@ use num_traits::FromPrimitive;
use crate::*;
use crate::bindings::capi as ztcore;
use std::os::raw::{c_void, c_uint};
use std::cmp::Ordering;
// WARNING: here be dragons! This defines an opaque blob in Rust that shadows
// and is of the exact size as an opaque blob in C that shadows and is the
@ -148,6 +149,18 @@ impl InetAddress {
}
}
pub fn port(&self) -> u16 {
unsafe {
ztcore::ZT_InetAddress_port(self.as_capi_ptr()) as u16
}
}
pub fn set_port(&mut self, port: u16) {
unsafe {
ztcore::ZT_InetAddress_setPort(self.as_capi_mut_ptr(), port as c_uint);
}
}
/// Get the network scope of the IP in this object.
pub fn ip_scope(&self) -> IpScope {
unsafe {
@ -199,6 +212,26 @@ impl Clone for InetAddress {
}
}
impl Ord for InetAddress {
fn cmp(&self, other: &Self) -> Ordering {
unsafe {
if ztcore::ZT_InetAddress_lessThan(self.as_capi_ptr(), other.as_capi_ptr()) != 0 {
return Ordering::Less;
} else if ztcore::ZT_InetAddress_lessThan(other.as_capi_ptr(), self.as_capi_ptr()) != 0 {
return Ordering::Greater;
}
return Ordering::Equal;
}
}
}
impl PartialOrd for InetAddress {
#[inline(always)]
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl PartialEq for InetAddress {
#[inline(always)]
fn eq(&self, other: &Self) -> bool {

View file

@ -50,6 +50,8 @@ pub use buffer::Buffer;
pub use portableatomici64::PortableAtomicI64;
pub use virtualnetworkconfig::*;
pub const RECOMMENDED_THREAD_STACK_SIZE: usize = 262144;
pub const DEFAULT_PORT: u16 = ztcore::ZT_DEFAULT_PORT as u16;
pub const BUF_SIZE: u32 = ztcore::ZT_BUF_SIZE;
@ -192,7 +194,7 @@ pub unsafe fn cstr_to_string(cstr: *const c_char, max_len: isize) -> String {
}
/// Macro to implement to_json and new_from_json on types that are Serializable.
#[macro_export(crate)]
#[macro_export]
macro_rules! implement_to_from_json {
($struct_name:ident) => {
impl $struct_name {

View file

@ -11,6 +11,8 @@
*/
/****/
use std::cmp::Ordering;
pub struct NetworkId(pub u64);
impl NetworkId {
@ -41,6 +43,20 @@ impl From<&str> for NetworkId {
}
}
impl Ord for NetworkId {
#[inline(always)]
fn cmp(&self, other: &Self) -> Ordering {
self.0.cmp(&other.0)
}
}
impl PartialOrd for NetworkId {
#[inline(always)]
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.0.cmp(&other.0))
}
}
impl PartialEq for NetworkId {
#[inline(always)]
fn eq(&self, other: &Self) -> bool {
@ -48,6 +64,13 @@ impl PartialEq for NetworkId {
}
}
impl Clone for NetworkId {
#[inline(always)]
fn clone(&self) -> Self {
NetworkId(self.0)
}
}
impl Eq for NetworkId {}
impl serde::Serialize for NetworkId {

View file

@ -29,7 +29,7 @@ use serde::{Deserialize, Serialize};
use crate::*;
use crate::bindings::capi as ztcore;
const NODE_BACKGROUND_MAX_DELAY: i64 = 250;
const NODE_BACKGROUND_MAX_DELAY: i64 = 500;
#[derive(FromPrimitive, ToPrimitive, PartialEq, Eq)]
pub enum Event {
@ -52,21 +52,6 @@ pub enum StateObjectType {
Certificate = ztcore::ZT_StateObjectType_ZT_STATE_OBJECT_CERT as isize
}
impl StateObjectType {
/// Get the canonical file extension for this object type.
pub fn to_file_ext(&self) -> &str {
match *self {
StateObjectType::IdentityPublic => "public",
StateObjectType::IdentitySecret => "secret",
StateObjectType::Locator => "locator",
StateObjectType::Peer => "peer",
StateObjectType::NetworkConfig => "network",
StateObjectType::TrustStore => "trust",
StateObjectType::Certificate => "cert"
}
}
}
/// The status of a ZeroTier node.
#[derive(Serialize, Deserialize)]
pub struct NodeStatus {
@ -96,7 +81,7 @@ pub trait NodeEventHandler {
fn state_put(&self, obj_type: StateObjectType, obj_id: &[u64], obj_data: &[u8]);
/// Called to retrieve an object from the object store.
fn state_get(&self, obj_type: StateObjectType, obj_id: &[u64]) -> Box<[u8]>;
fn state_get(&self, obj_type: StateObjectType, obj_id: &[u64]) -> Option<Box<[u8]>>;
/// Called to send a packet over the physical network (virtual -> physical).
fn wire_packet_send(&self, local_socket: i64, sock_addr: &InetAddress, data: &[u8], packet_ttl: u32) -> i32;
@ -246,14 +231,17 @@ extern "C" fn zt_state_get_function<T: NodeEventHandler + 'static>(
let n = node_from_raw_ptr!(uptr);
let obj_id2 = unsafe { &*slice_from_raw_parts(obj_id, obj_id_len as usize) };
let obj_data_result = n.event_handler.state_get(obj_type2, obj_id2);
if obj_data_result.len() > 0 {
unsafe {
let obj_data_len: c_int = obj_data_result.len() as c_int;
let obj_data_raw = ztcore::malloc(obj_data_len as c_ulong);
if !obj_data_raw.is_null() {
copy_nonoverlapping(obj_data_result.as_ptr(), obj_data_raw.cast::<u8>(), obj_data_len as usize);
*obj_data = obj_data_raw;
return obj_data_len;
if obj_data_result.is_some() {
let obj_data_result = obj_data_result.unwrap();
if obj_data_result.len() > 0 {
unsafe {
let obj_data_len: c_int = obj_data_result.len() as c_int;
let obj_data_raw = ztcore::malloc(obj_data_len as c_ulong);
if !obj_data_raw.is_null() {
copy_nonoverlapping(obj_data_result.as_ptr(), obj_data_raw.cast::<u8>(), obj_data_len as usize);
*obj_data = obj_data_raw;
return obj_data_len;
}
}
}
}
@ -369,7 +357,7 @@ impl<T: NodeEventHandler + 'static> Node<T> {
let wn = Arc::downgrade(&n);
let run = n.background_thread_run.clone();
n.background_thread.replace(Some(std::thread::spawn(move || {
n.background_thread.replace(Some(std::thread::Builder::new().stack_size(RECOMMENDED_THREAD_STACK_SIZE).spawn(move || {
let mut loop_delay = Duration::from_millis(NODE_BACKGROUND_MAX_DELAY as u64);
while run.load(Ordering::Relaxed) {
std::thread::park_timeout(loop_delay);
@ -384,12 +372,12 @@ impl<T: NodeEventHandler + 'static> Node<T> {
break;
}
}
})));
}).unwrap()));
Ok(n)
}
/// This is called periodically from the background service thread.
/// This is called periodically from the background service thread. It's not called from anywhere else.
fn process_background_tasks(&self) -> i64 {
let current_time = now();
self.now.set(current_time);

View file

@ -44,6 +44,14 @@ impl PortableAtomicI64 {
pub fn set(&self, v: i64) {
*self.i.lock().unwrap() = v;
}
#[inline(always)]
pub fn fetch_add(&self, v: i64) -> i64 {
let i = self.i.lock().unwrap();
let j = *i;
*i += v;
j
}
}
#[cfg(all(target_pointer_width = "64"))]
@ -69,4 +77,9 @@ impl PortableAtomicI64 {
pub fn set(&self, v: i64) {
self.i.store(v, Ordering::Relaxed)
}
#[inline(always)]
pub fn fetch_add(&self, v: i64) -> i64 {
self.i.fetch_add(v, Ordering::Relaxed)
}
}

View file

@ -1,5 +1,25 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
[[package]]
name = "ansi_term"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee49baf6cb617b853aa8d93bf420db2383fab46d314482ca2803b40d5fde979b"
dependencies = [
"winapi",
]
[[package]]
name = "atty"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8"
dependencies = [
"hermit-abi",
"libc",
"winapi",
]
[[package]]
name = "autocfg"
version = "1.0.1"
@ -88,6 +108,34 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chrono"
version = "0.4.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "670ad68c9088c2a963aaa298cb369688cf3f9465ce5e2d4ca10e6e0098a1ce73"
dependencies = [
"libc",
"num-integer",
"num-traits",
"time",
"winapi",
]
[[package]]
name = "clap"
version = "2.33.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37e58ac78573c40708d45522f0d80fa2f01cc4f9b4e2bf749807255454312002"
dependencies = [
"ansi_term",
"atty",
"bitflags",
"strsim",
"textwrap",
"unicode-width",
"vec_map",
]
[[package]]
name = "cpuid-bool"
version = "0.1.2"
@ -142,6 +190,7 @@ checksum = "da9052a1a50244d8d5aa9bf55cbc2fb6f357c86cc52e46c62ed390a7180cf150"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
@ -164,12 +213,35 @@ version = "0.3.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79e5145dde8da7d1b3892dad07a9c98fc04bc39892b1ecc9692cf53e2b780a65"
[[package]]
name = "futures-executor"
version = "0.3.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e9e59fdc009a4b3096bf94f740a0f2424c082521f20a9b08c5c07c48d90fd9b9"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
version = "0.3.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28be053525281ad8259d47e4de5de657b25e7bac113458555bb4b70bc6870500"
[[package]]
name = "futures-macro"
version = "0.3.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c287d25add322d9f9abdcdc5927ca398917996600182178774032e9f8258fedd"
dependencies = [
"proc-macro-hack",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "futures-sink"
version = "0.3.12"
@ -191,11 +263,17 @@ version = "0.3.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "632a8cd0f2a4b3fdea1657f08bde063848c3bd00f9bbf6e256b8be78802e624b"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
"pin-project-lite",
"pin-utils",
"proc-macro-hack",
"proc-macro-nested",
"slab",
]
@ -527,6 +605,16 @@ dependencies = [
"syn",
]
[[package]]
name = "num-integer"
version = "0.1.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2cc698a63b549a70bc047073d2949cce27cd1c7b0a4a862d08a8031bc2801db"
dependencies = [
"autocfg",
"num-traits",
]
[[package]]
name = "num-traits"
version = "0.2.14"
@ -653,6 +741,18 @@ version = "0.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857"
[[package]]
name = "proc-macro-hack"
version = "0.5.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5"
[[package]]
name = "proc-macro-nested"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc881b2c22681370c6a780e47af9840ef841837bc98118431d4e1868bd0c1086"
[[package]]
name = "proc-macro2"
version = "1.0.24"
@ -906,6 +1006,12 @@ dependencies = [
"winapi",
]
[[package]]
name = "strsim"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a"
[[package]]
name = "syn"
version = "1.0.58"
@ -931,6 +1037,15 @@ dependencies = [
"winapi",
]
[[package]]
name = "textwrap"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060"
dependencies = [
"unicode-width",
]
[[package]]
name = "time"
version = "0.1.43"
@ -1130,6 +1245,12 @@ dependencies = [
"tinyvec",
]
[[package]]
name = "unicode-width"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9337591893a19b88d8d87f2cec1e73fad5cdfd10e5a6f349f498ad6ea2ffb1e3"
[[package]]
name = "unicode-xid"
version = "0.2.1"
@ -1154,6 +1275,12 @@ version = "0.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05e42f7c18b8f902290b009cde6d651262f956c98bc51bca4cd1d511c9cd85c7"
[[package]]
name = "vec_map"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191"
[[package]]
name = "version_check"
version = "0.9.2"
@ -1249,6 +1376,10 @@ dependencies = [
name = "zerotier-service"
version = "0.1.0"
dependencies = [
"chrono",
"clap",
"futures",
"hex",
"libc",
"num_cpus",
"serde",

View file

@ -3,6 +3,7 @@ name = "zerotier-service"
version = "0.1.0"
authors = ["Adam Ierymenko <adam.ierymenko@zerotier.com>"]
edition = "2018"
build = "build.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@ -13,6 +14,10 @@ tokio = { version = "1", features = ["full"] }
warp = "0.3"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
futures = "0.3"
clap = { version = "2", features = ["default"] }
chrono = "0.4"
hex = "0.4"
[target."cfg(unix)".dependencies]
libc = "0.2.82"

View file

@ -0,0 +1,6 @@
fn main() {
let d = env!("CARGO_MANIFEST_DIR");
println!("cargo:rustc-link-search=native={}/../build/core", d);
println!("cargo:rustc-link-lib=static=zt_core");
println!("cargo:rustc-link-lib=c++");
}

View file

@ -1,2 +0,0 @@
pub mod fastudpsocket;
pub use fastudpsocket::*;

View file

@ -121,7 +121,7 @@ pub struct FastUDPSocket<H: FastUDPSocketPacketHandler + Send + Sync + 'static>
threads: Vec<std::thread::JoinHandle<()>>,
thread_run: Arc<AtomicBool>,
sockets: Vec<FastUDPRawOsSocket>,
bind_address: InetAddress,
pub bind_address: InetAddress,
}
#[cfg(unix)]
@ -154,12 +154,6 @@ fn fast_udp_socket_recvfrom(socket: &FastUDPRawOsSocket, buf: &mut Buffer, from_
}
}
// Integer incremented to select sockets on a mostly round robin basis. This
// isn't synchronized since if all cores don't see it the same there is no
// significant impact. It's just a faster way to pick a socket for sending
// than a random number generator.
static mut SOCKET_SPIN_INT: usize = 0;
impl<H: FastUDPSocketPacketHandler + Send + Sync + 'static> FastUDPSocket<H> {
pub fn new(device_name: &str, address: &InetAddress, handler: &Arc<H>) -> Result<FastUDPSocket<H>, String> {
let thread_count = num_cpus::get();
@ -181,7 +175,7 @@ impl<H: FastUDPSocketPacketHandler + Send + Sync + 'static> FastUDPSocket<H> {
let thread_run = s.thread_run.clone();
let handler_weak = Arc::downgrade(&s.handler);
s.threads.push(std::thread::spawn(move || {
s.threads.push(std::thread::Builder::new().stack_size(zerotier_core::RECOMMENDED_THREAD_STACK_SIZE).spawn(move || {
let mut from_address = InetAddress::new();
while thread_run.load(Ordering::Relaxed) {
let mut buf = Buffer::new();
@ -189,7 +183,7 @@ impl<H: FastUDPSocketPacketHandler + Send + Sync + 'static> FastUDPSocket<H> {
if read_length > 0 {
let handler = handler_weak.upgrade();
if handler.is_some() {
unsafe { buf.set_len(read_length as u32); }
unsafe { buf.set_len(read_length as usize); }
handler.unwrap().incoming_udp_packet(&thread_socket, &from_address, buf);
} else {
break;
@ -198,7 +192,7 @@ impl<H: FastUDPSocketPacketHandler + Send + Sync + 'static> FastUDPSocket<H> {
break;
}
}
}));
}).unwrap());
} else {
bind_failed_reason = thread_socket.err().unwrap();
}
@ -217,13 +211,13 @@ impl<H: FastUDPSocketPacketHandler + Send + Sync + 'static> FastUDPSocket<H> {
/// Sockets are thread safe.
#[inline(always)]
pub fn send(&self, to_address: &InetAddress, data: *const u8, len: usize, packet_ttl: i32) {
let mut i;
unsafe {
i = SOCKET_SPIN_INT;
SOCKET_SPIN_INT = i + 1;
}
i %= self.sockets.len();
fast_udp_socket_sendto(self.sockets.get(i).unwrap(), to_address, data, len, packet_ttl);
fast_udp_socket_sendto(self.sockets.get(0).unwrap(), to_address, data, len, packet_ttl);
}
/// Get a raw socket that can be used to send UDP packets.
#[inline(always)]
pub fn raw_socket(&self) -> FastUDPRawOsSocket {
*self.sockets.get(0).unwrap()
}
/// Get the number of threads this socket is currently running.

View file

@ -48,61 +48,71 @@ pub const UNASSIGNED_PRIVILEGED_PORTS: [u16; 299] = [
1023,
];
#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Clone)]
#[serde(default)]
pub struct LocalConfigPhysicalPathConfig {
blacklist: bool
pub blacklist: bool
}
#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Clone)]
#[serde(default)]
pub struct LocalConfigVirtualConfig {
#[serde(rename = "try")]
try_: Vec<InetAddress>
pub try_: Vec<InetAddress>
}
#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Clone)]
#[serde(default)]
pub struct LocalConfigNetworkSettings {
#[serde(rename = "allowManagedIPs")]
allow_managed_ips: bool,
pub allow_managed_ips: bool,
#[serde(rename = "allowGlobalIPs")]
allow_global_ips: bool,
pub allow_global_ips: bool,
#[serde(rename = "allowManagedRoutes")]
allow_managed_routes: bool,
pub allow_managed_routes: bool,
#[serde(rename = "allowGlobalRoutes")]
allow_global_routes: bool,
pub allow_global_routes: bool,
#[serde(rename = "allowDefaultRouteOverride")]
allow_default_route_override: bool,
pub allow_default_route_override: bool,
}
#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Clone)]
#[serde(default)]
pub struct LocalConfigSettings {
#[serde(rename = "primaryPort")]
primary_port: u16,
pub primary_port: u16,
#[serde(rename = "secondaryPort")]
secondary_port: Option<u16>,
pub secondary_port: Option<u16>,
#[serde(rename = "autoPortSearch")]
auto_port_search: bool,
pub auto_port_search: bool,
#[serde(rename = "portMapping")]
port_mapping: bool,
pub port_mapping: bool,
#[serde(rename = "logSizeMax")]
log_size_max: usize,
pub log_size_max: usize,
#[serde(rename = "logVL1Events")]
pub log_vl1_events: bool,
#[serde(rename = "logVL2Events")]
pub log_vl2_events: bool,
#[serde(rename = "logFilterEvents")]
pub log_filter_events: bool,
#[serde(rename = "logMulticastEvents")]
pub log_multicast_events: bool,
#[serde(rename = "logDebug")]
pub log_debug: bool,
#[serde(rename = "interfacePrefixBlacklist")]
interface_prefix_blacklist: Vec<String>,
pub interface_prefix_blacklist: Vec<String>,
#[serde(rename = "explicitAddresses")]
explicit_addresses: Vec<InetAddress>,
pub explicit_addresses: Vec<InetAddress>,
}
#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Clone)]
#[serde(default)]
pub struct LocalConfig {
physical: BTreeMap<InetAddress, LocalConfigPhysicalPathConfig>,
pub physical: BTreeMap<InetAddress, LocalConfigPhysicalPathConfig>,
#[serde(rename = "virtual")]
virtual_: BTreeMap<Address, LocalConfigVirtualConfig>,
network: BTreeMap<NetworkId, LocalConfigNetworkSettings>,
settings: LocalConfigSettings,
pub virtual_: BTreeMap<Address, LocalConfigVirtualConfig>,
pub network: BTreeMap<NetworkId, LocalConfigNetworkSettings>,
pub settings: LocalConfigSettings,
}
impl Default for LocalConfigPhysicalPathConfig {
@ -135,10 +145,23 @@ impl Default for LocalConfigNetworkSettings {
impl LocalConfigSettings {
#[cfg(target_os = "macos")]
const DEFAULT_PREFIX_BLACKLIST: [&str; 7] = ["lo", "utun", "gif", "stf", "iptap", "pktap", "feth"];
const DEFAULT_PREFIX_BLACKLIST: [&'static str; 7] = ["lo", "utun", "gif", "stf", "iptap", "pktap", "feth"];
#[cfg(target_os = "linux")]
const DEFAULT_PREFIX_BLACKLIST: [&str; 5] = ["lo", "tun", "tap", "ipsec", "zt"];
const DEFAULT_PREFIX_BLACKLIST: [&'static str; 5] = ["lo", "tun", "tap", "ipsec", "zt"];
// This is not applicable on Windows as it doesn't use the same device name semantics.
#[cfg(windows)]
const DEFAULT_PREFIX_BLACKLIST: [&'static str; 0] = [];
pub fn is_interface_blacklisted(&self, ifname: &str) -> bool {
for p in self.interface_prefix_blacklist.iter() {
if ifname.starts_with(p.as_str()) {
return true;
}
}
false
}
}
impl Default for LocalConfigSettings {
@ -154,7 +177,12 @@ impl Default for LocalConfigSettings {
secondary_port: Some(293), // this is one of UNASSIGNED_PRIVILEGED_PORTS that we will default to
auto_port_search: true,
port_mapping: true,
log_size_max: 16777216,
log_size_max: 1048576,
log_vl1_events: false,
log_vl2_events: false,
log_filter_events: false,
log_multicast_events: false,
log_debug: false,
interface_prefix_blacklist: bl,
explicit_addresses: Vec::new()
}

View file

@ -0,0 +1,76 @@
use std::fs::{File, OpenOptions};
use std::sync::Mutex;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::io::{Write, Seek, SeekFrom};
use std::cell::Cell;
use zerotier_core::PortableAtomicI64;
use chrono::Datelike;
pub struct Log {
prefix: String,
path: String,
file: Mutex<Cell<Option<File>>>,
cur_size: PortableAtomicI64,
max_size: AtomicUsize,
}
impl Log {
const MIN_MAX_SIZE: usize = 4096;
pub fn new(path: &str, max_size: usize, prefix: &str) -> Log {
let mut p = String::from(prefix);
if !p.is_empty() {
p.push(' ');
}
Log{
prefix: p,
path: String::from(path),
file: Mutex::new(Cell::new(None)),
cur_size: PortableAtomicI64::new(0),
max_size: AtomicUsize::new(if max_size < MIN_MAX_SIZE { MIN_MAX_SIZE } else { max_size }),
}
}
pub fn set_max_size(&self, new_max_size: usize) {
self.max_size.store(if new_max_size < MIN_MAX_SIZE { MIN_MAX_SIZE } else { new_max_size },Ordering::Relaxed);
}
pub fn log<S>(&self, s: &S) {
let mut fc = self.file.lock().unwrap();
let max_size = self.max_size.load(Ordering::Relaxed);
if max_size > 0 && fc.get_mut().is_some() {
if self.cur_size.get() >= max_size as i64 {
fc.replace(None); // close and dispose of old File
let mut old_path = self.path.clone();
old_path.push_str(".old");
let _ = std::fs::remove_file(old_path.as_str());
let _ = std::fs::rename(self.path.as_str(), old_path.as_str());
let _ = std::fs::remove_file(self.path.as_str()); // should fail
self.cur_size.set(0);
}
}
if fc.get_mut().is_none() {
let mut f = OpenOptions::new().read(true).write(true).create(true).open(self.path.as_str());
if f.is_err() {
return;
}
let mut f = f.unwrap();
let eof = f.seek(SeekFrom::End(0));
if eof.is_err() {
return;
}
cur_size = eof.unwrap() as i64;
fc.replace(Some(f));
}
let mut f = fc.get_mut().as_mut().unwrap();
let now_str = chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string();
writeln!("{}[{}] {}", self.prefix.as_str(), now_str.as_str(), s);
let _ = f.flush();
self.cur_size.fetch_add((now_str_b.len() + sb.len() + 1) as i64);
}
}
unsafe impl Sync for Log {}

View file

@ -1,11 +1,144 @@
mod fastudp;
mod fastudpsocket;
mod localconfig;
mod physicallink;
mod log;
mod store;
use std::any::Any;
use std::cell::Cell;
use std::collections::BTreeMap;
use std::net::IpAddr;
use std::rc::Rc;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use warp::Filter;
use zerotier_core::{Address, Buffer, Event, Identity, InetAddress, InetAddressFamily, MAC, NetworkId, Node, NodeEventHandler, StateObjectType, VirtualNetworkConfig, VirtualNetworkConfigOperation};
use crate::fastudpsocket::*;
use crate::localconfig::*;
use crate::physicallink::PhysicalLink;
use crate::log::Log;
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
pub struct ServiceEventHandler {}
impl FastUDPSocketPacketHandler for ServiceEventHandler {
fn incoming_udp_packet(&self, raw_socket: &FastUDPRawOsSocket, from_adddress: &InetAddress, data: Buffer) {}
}
impl NodeEventHandler for ServiceEventHandler {
fn virtual_network_config(&self, network_id: NetworkId, network_obj: &Arc<dyn Any>, config_op: VirtualNetworkConfigOperation, config: Option<&VirtualNetworkConfig>) {
}
fn virtual_network_frame(&self, network_id: NetworkId, network_obj: &Arc<dyn Any>, source_mac: MAC, dest_mac: MAC, ethertype: u16, vlan_id: u16, data: &[u8]) {
}
fn event(&self, event: Event, event_data: &[u8]) {
}
fn state_put(&self, obj_type: StateObjectType, obj_id: &[u64], obj_data: &[u8]) {
}
fn state_get(&self, obj_type: StateObjectType, obj_id: &[u64]) -> Option<Box<[u8]>> {
None
}
fn wire_packet_send(&self, local_socket: i64, sock_addr: &InetAddress, data: &[u8], packet_ttl: u32) -> i32 {
0
}
fn path_check(&self, address: Address, id: &Identity, local_socket: i64, sock_addr: &InetAddress) -> bool {
true
}
fn path_lookup(&self, address: Address, id: &Identity, desired_family: InetAddressFamily) -> Option<InetAddress> {
None
}
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
fn main() {
println!("Hello, world!");
tokio::runtime::Builder::new_multi_thread().thread_stack_size(zerotier_core::RECOMMENDED_THREAD_STACK_SIZE).build().unwrap().block_on(async {
let inaddr_v6_any = IpAddr::from_str("::0").unwrap();
let tokio_rt = tokio::runtime::Runtime::new().unwrap();
tokio_rt.block_on(async {
// TODO: init warp http server and anything else using tokio
let mut local_config: Box<LocalConfig> = Box::new(LocalConfig::default());
let mut udp_sockets: BTreeMap<InetAddress, FastUDPSocket<ServiceEventHandler>> = BTreeMap::new();
let handler: Arc<ServiceEventHandler> = Arc::new(ServiceEventHandler{});
let run: AtomicBool = AtomicBool::new(true);
loop {
let mut warp_server_port = local_config.settings.primary_port;
loop {
let root = warp::path::end().map(|| { warp::reply::with_status("not found", warp::hyper::StatusCode::NOT_FOUND) });
let status = warp::path("status").map(|| { "status" });
let network = warp::path!("network" / String).map(|nwid_str| { "network" });
let peer = warp::path!("peer" / String).map(|peer_str| { "peer" });
let (shutdown_tx, shutdown_rx) = futures::channel::oneshot::channel();
let (_, warp_server) = warp::serve(warp::any().and(
root
.or(status)
.or(network)
.or(peer)
)).bind_with_graceful_shutdown((inaddr_v6_any, warp_server_port), async {
let _ = shutdown_rx.await;
});
let warp_server = tokio::spawn(warp_server);
tokio::time::sleep(Duration::from_secs(10)).await;
// Diff system addresses against currently bound UDP sockets and update as needed.
// Also check interface prefix blacklists.
let mut system_addrs: BTreeMap<InetAddress, Rc<PhysicalLink>> = BTreeMap::new();
PhysicalLink::map(|link: PhysicalLink| {
if !local_config.settings.is_interface_blacklisted(link.device.as_str()) {
// Add two entries to system_addrs: one for primary port, and one for secondary if enabled.
let l = Rc::new(link);
let mut a = l.address.clone();
a.set_port(local_config.settings.primary_port);
system_addrs.insert(a, l.clone());
if local_config.settings.secondary_port.is_some() {
let mut a = l.address.clone();
a.set_port(local_config.settings.secondary_port.unwrap());
system_addrs.insert(a, l.clone());
}
}
});
let mut udp_sockets_to_close: Vec<InetAddress> = Vec::new();
for sock in udp_sockets.iter() {
if !system_addrs.contains_key(sock.0) {
udp_sockets_to_close.push(sock.0.clone());
}
}
for k in udp_sockets_to_close.iter() {
udp_sockets.remove(k);
}
for addr in system_addrs.iter() {
if !udp_sockets.contains_key(addr.0) {
let s = FastUDPSocket::new(addr.1.device.as_str(), addr.0, &handler);
if s.is_ok() {
udp_sockets.insert(addr.0.clone(), s.unwrap());
}
}
}
// Breaking the inner loop causes the HTTP server to recycle, or may exit entirely if run is false.
if local_config.settings.primary_port != warp_server_port || !run.load(Ordering::Relaxed) {
let _ = shutdown_tx.send(());
let _ = warp_server.await;
break;
}
}
tokio::time::sleep(Duration::from_millis(250)).await;
if !run.load(Ordering::Relaxed) {
break;
}
}
});
}

View file

@ -0,0 +1,41 @@
use zerotier_core::InetAddress;
use std::ffi::CStr;
use std::ptr::{null_mut, copy_nonoverlapping};
use std::mem::size_of;
#[derive(Clone)]
pub struct PhysicalLink {
pub address: InetAddress,
pub device: String
}
impl PhysicalLink {
#[cfg(unix)]
pub fn map<F: FnMut(PhysicalLink)>(mut f: F) {
unsafe {
let mut ifap: *mut libc::ifaddrs = null_mut();
if libc::getifaddrs((&mut ifap as *mut *mut libc::ifaddrs).cast()) == 0 {
let mut i = ifap;
while !i.is_null() {
if !(*i).ifa_addr.is_null() {
let mut a = InetAddress::new();
if (*(*i).ifa_addr).sa_family == libc::AF_INET as u8 {
copy_nonoverlapping((*i).ifa_addr.cast::<u8>(), (&mut a as *mut InetAddress).cast::<u8>(), size_of::<libc::sockaddr_in>());
} else if (*(*i).ifa_addr).sa_family == libc::AF_INET6 as u8 {
copy_nonoverlapping((*i).ifa_addr.cast::<u8>(), (&mut a as *mut InetAddress).cast::<u8>(), size_of::<libc::sockaddr_in6>());
} else {
continue;
}
a.set_port(0);
f(PhysicalLink{
address: a,
device: if (*i).ifa_name.is_null() { String::new() } else { String::from(CStr::from_ptr((*i).ifa_name).to_str().unwrap()) }
});
}
i = (*i).ifa_next;
}
libc::freeifaddrs(ifap.cast());
}
}
}
}

View file

@ -0,0 +1,86 @@
use std::error::Error;
use std::path::{Path, PathBuf};
use zerotier_core::StateObjectType;
use std::io::{Read, Write};
pub struct Store {
pub base_path: Box<Path>,
pub peers_path: Box<Path>,
pub controller_path: Box<Path>,
pub networks_path: Box<Path>,
pub certs_path: Box<Path>,
}
impl Store {
const MAX_OBJECT_SIZE: usize = 131072; // sanity limit
pub fn new(base_path: &str) -> Result<Store, std::io::Error> {
let bp = Path::new(base_path);
let md = bp.metadata()?;
if !md.is_dir() || md.permissions().readonly() {
return Err(std::io::Error::new(std::io::ErrorKind::PermissionDenied, "base path does not exist or is not writable"));
}
Ok(Store{
base_path: bp.into_path_buf().into_boxed_path(),
peers_path: bp.join("peers.d").into_boxed_path(),
controller_path: bp.join("controller.d").into_boxed_path(),
networks_path: bp.join("networks.d").into_boxed_path(),
certs_path: bp.join("certs.d").into_boxed_path(),
})
}
pub fn make_obj_path(&self, obj_type: StateObjectType, obj_id: &[u64]) -> Result<PathBuf, std::io::Error> {
Ok(match obj_type {
StateObjectType::IdentityPublic => self.base_path.join("identity.public"),
StateObjectType::IdentitySecret => self.base_path.join("identity.secret"),
StateObjectType::Certificate => {
if obj_id.len() < 6 {
return Err(std::io::Error(std::io::ErrorKind::NotFound));
}
self.certs_path.join(format!("{:0>16x}{:0>16x}{:0>16x}{:0>16x}{:0>16x}{:0>16x}.cert",obj_id[0],obj_id[1],obj_id[2],obj_id[3],obj_id[4],obj_id[5]))
},
StateObjectType::TrustStore => self.base_path.join("truststore"),
StateObjectType::Locator => self.base_path.join("locator"),
StateObjectType::NetworkConfig => {
if obj_id.len() < 1 {
return Err(std::io::Error(std::io::ErrorKind::NotFound));
}
self.networks_path.join(format!("{:0>16x}.conf", obj_id[0]))
},
StateObjectType::Peer => {
if obj_id.len() < 1 {
return Err(std::io::Error(std::io::ErrorKind::NotFound));
}
self.peers_path.join(format!("{:0>10x}.peer", obj_id[0]))
}
})
}
pub fn load(&self, obj_type: StateObjectType, obj_id: &[u64]) -> Result<Box<[u8]>, std::io::Error> {
let obj_path = self.make_obj_path(obj_type, obj_id)?;
let fmd = obj_path.metadata()?;
if fmd.is_file() {
let flen = fmd.len();
if flen <= Store::MAX_OBJECT_SIZE as u64 {
let mut f = std::fs::File::open(obj_path)?;
let mut buf: Vec<u8> = Vec::new();
buf.reserve(flen as usize);
let rs = f.read_to_end(&mut buf)?;
buf.resize(rs as usize, 0);
return Ok(buf.into_boxed_slice());
}
}
Err(std::io::Error(std::io::ErrorKind::NotFound))
}
pub fn erase(&self, obj_type: StateObjectType, obj_id: &[u64]) {
let obj_path = self.make_obj_path(obj_type, obj_id);
if obj_path.is_ok() {
let _ = std::fs::remove_file(obj_path.unwrap());
}
}
pub fn store(&self, obj_type: StateObjectType, obj_id: &[u64], obj_data: &[u8]) -> Result<(), std::io::Error> {
std::fs::OpenOptions::new().write(true).truncate(true).create(true).open(self.make_obj_path(obj_type, obj_id)?)?.write_all(obj_data)
}
}