diff --git a/core/CAPI.cpp b/core/CAPI.cpp index 059c20822..72917a412 100644 --- a/core/CAPI.cpp +++ b/core/CAPI.cpp @@ -834,6 +834,18 @@ enum ZT_InetAddress_IpScope ZT_InetAddress_ipScope(const ZT_InetAddress *ia) return ZT_IP_SCOPE_NONE; } +int ZT_InetAddress_lessThan(const ZT_InetAddress *a, const ZT_InetAddress *b) +{ + if ((a)&&(b)) { + return (int)(*reinterpret_cast(a) < *reinterpret_cast(b)); + } else if (a) { + return 0; + } else if (b) { + return 1; + } + return 0; +} + /********************************************************************************************************************/ uint64_t ZT_random() diff --git a/core/zerotier.h b/core/zerotier.h index 7266aa392..b36367251 100644 --- a/core/zerotier.h +++ b/core/zerotier.h @@ -3018,6 +3018,11 @@ ZT_SDK_API int ZT_InetAddress_isV6(const ZT_InetAddress *ia); */ ZT_SDK_API enum ZT_InetAddress_IpScope ZT_InetAddress_ipScope(const ZT_InetAddress *ia); +/** + * Compare a and b, return non-zero if a < b + */ +ZT_SDK_API int ZT_InetAddress_lessThan(const ZT_InetAddress *a, const ZT_InetAddress *b); + /* These mirror the values of AF_INET and AF_INET6 for use by Rust and other things that need it. */ ZT_SDK_API const int ZT_AF_INET,ZT_AF_INET6; diff --git a/rust-zerotier-core/src/address.rs b/rust-zerotier-core/src/address.rs index dc0cdac71..f6a09ea40 100644 --- a/rust-zerotier-core/src/address.rs +++ b/rust-zerotier-core/src/address.rs @@ -11,6 +11,8 @@ */ /****/ +use std::cmp::Ordering; + pub struct Address(pub u64); impl ToString for Address { @@ -41,6 +43,27 @@ impl PartialEq for Address { impl Eq for Address {} +impl Ord for Address { + #[inline(always)] + fn cmp(&self, other: &Self) -> Ordering { + self.0.cmp(&other.0) + } +} + +impl PartialOrd for Address { + #[inline(always)] + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.0.cmp(&other.0)) + } +} + +impl Clone for Address { + #[inline(always)] + fn clone(&self) -> Self { + Address(self.0) + } +} + impl serde::Serialize for Address { fn serialize(&self, serializer: S) -> Result where S: serde::Serializer { serializer.serialize_str(self.to_string().as_str()) diff --git a/rust-zerotier-core/src/inetaddress.rs b/rust-zerotier-core/src/inetaddress.rs index e64797a2c..5319e34e3 100644 --- a/rust-zerotier-core/src/inetaddress.rs +++ b/rust-zerotier-core/src/inetaddress.rs @@ -20,6 +20,7 @@ use num_traits::FromPrimitive; use crate::*; use crate::bindings::capi as ztcore; use std::os::raw::{c_void, c_uint}; +use std::cmp::Ordering; // WARNING: here be dragons! This defines an opaque blob in Rust that shadows // and is of the exact size as an opaque blob in C that shadows and is the @@ -148,6 +149,18 @@ impl InetAddress { } } + pub fn port(&self) -> u16 { + unsafe { + ztcore::ZT_InetAddress_port(self.as_capi_ptr()) as u16 + } + } + + pub fn set_port(&mut self, port: u16) { + unsafe { + ztcore::ZT_InetAddress_setPort(self.as_capi_mut_ptr(), port as c_uint); + } + } + /// Get the network scope of the IP in this object. pub fn ip_scope(&self) -> IpScope { unsafe { @@ -199,6 +212,26 @@ impl Clone for InetAddress { } } +impl Ord for InetAddress { + fn cmp(&self, other: &Self) -> Ordering { + unsafe { + if ztcore::ZT_InetAddress_lessThan(self.as_capi_ptr(), other.as_capi_ptr()) != 0 { + return Ordering::Less; + } else if ztcore::ZT_InetAddress_lessThan(other.as_capi_ptr(), self.as_capi_ptr()) != 0 { + return Ordering::Greater; + } + return Ordering::Equal; + } + } +} + +impl PartialOrd for InetAddress { + #[inline(always)] + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + impl PartialEq for InetAddress { #[inline(always)] fn eq(&self, other: &Self) -> bool { diff --git a/rust-zerotier-core/src/lib.rs b/rust-zerotier-core/src/lib.rs index 607021d98..8bc79ae29 100644 --- a/rust-zerotier-core/src/lib.rs +++ b/rust-zerotier-core/src/lib.rs @@ -50,6 +50,8 @@ pub use buffer::Buffer; pub use portableatomici64::PortableAtomicI64; pub use virtualnetworkconfig::*; +pub const RECOMMENDED_THREAD_STACK_SIZE: usize = 262144; + pub const DEFAULT_PORT: u16 = ztcore::ZT_DEFAULT_PORT as u16; pub const BUF_SIZE: u32 = ztcore::ZT_BUF_SIZE; @@ -192,7 +194,7 @@ pub unsafe fn cstr_to_string(cstr: *const c_char, max_len: isize) -> String { } /// Macro to implement to_json and new_from_json on types that are Serializable. -#[macro_export(crate)] +#[macro_export] macro_rules! implement_to_from_json { ($struct_name:ident) => { impl $struct_name { diff --git a/rust-zerotier-core/src/networkid.rs b/rust-zerotier-core/src/networkid.rs index e66c20397..06e5cf14b 100644 --- a/rust-zerotier-core/src/networkid.rs +++ b/rust-zerotier-core/src/networkid.rs @@ -11,6 +11,8 @@ */ /****/ +use std::cmp::Ordering; + pub struct NetworkId(pub u64); impl NetworkId { @@ -41,6 +43,20 @@ impl From<&str> for NetworkId { } } +impl Ord for NetworkId { + #[inline(always)] + fn cmp(&self, other: &Self) -> Ordering { + self.0.cmp(&other.0) + } +} + +impl PartialOrd for NetworkId { + #[inline(always)] + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.0.cmp(&other.0)) + } +} + impl PartialEq for NetworkId { #[inline(always)] fn eq(&self, other: &Self) -> bool { @@ -48,6 +64,13 @@ impl PartialEq for NetworkId { } } +impl Clone for NetworkId { + #[inline(always)] + fn clone(&self) -> Self { + NetworkId(self.0) + } +} + impl Eq for NetworkId {} impl serde::Serialize for NetworkId { diff --git a/rust-zerotier-core/src/node.rs b/rust-zerotier-core/src/node.rs index 55d28893c..19ded2938 100644 --- a/rust-zerotier-core/src/node.rs +++ b/rust-zerotier-core/src/node.rs @@ -29,7 +29,7 @@ use serde::{Deserialize, Serialize}; use crate::*; use crate::bindings::capi as ztcore; -const NODE_BACKGROUND_MAX_DELAY: i64 = 250; +const NODE_BACKGROUND_MAX_DELAY: i64 = 500; #[derive(FromPrimitive, ToPrimitive, PartialEq, Eq)] pub enum Event { @@ -52,21 +52,6 @@ pub enum StateObjectType { Certificate = ztcore::ZT_StateObjectType_ZT_STATE_OBJECT_CERT as isize } -impl StateObjectType { - /// Get the canonical file extension for this object type. - pub fn to_file_ext(&self) -> &str { - match *self { - StateObjectType::IdentityPublic => "public", - StateObjectType::IdentitySecret => "secret", - StateObjectType::Locator => "locator", - StateObjectType::Peer => "peer", - StateObjectType::NetworkConfig => "network", - StateObjectType::TrustStore => "trust", - StateObjectType::Certificate => "cert" - } - } -} - /// The status of a ZeroTier node. #[derive(Serialize, Deserialize)] pub struct NodeStatus { @@ -96,7 +81,7 @@ pub trait NodeEventHandler { fn state_put(&self, obj_type: StateObjectType, obj_id: &[u64], obj_data: &[u8]); /// Called to retrieve an object from the object store. - fn state_get(&self, obj_type: StateObjectType, obj_id: &[u64]) -> Box<[u8]>; + fn state_get(&self, obj_type: StateObjectType, obj_id: &[u64]) -> Option>; /// Called to send a packet over the physical network (virtual -> physical). fn wire_packet_send(&self, local_socket: i64, sock_addr: &InetAddress, data: &[u8], packet_ttl: u32) -> i32; @@ -246,14 +231,17 @@ extern "C" fn zt_state_get_function( let n = node_from_raw_ptr!(uptr); let obj_id2 = unsafe { &*slice_from_raw_parts(obj_id, obj_id_len as usize) }; let obj_data_result = n.event_handler.state_get(obj_type2, obj_id2); - if obj_data_result.len() > 0 { - unsafe { - let obj_data_len: c_int = obj_data_result.len() as c_int; - let obj_data_raw = ztcore::malloc(obj_data_len as c_ulong); - if !obj_data_raw.is_null() { - copy_nonoverlapping(obj_data_result.as_ptr(), obj_data_raw.cast::(), obj_data_len as usize); - *obj_data = obj_data_raw; - return obj_data_len; + if obj_data_result.is_some() { + let obj_data_result = obj_data_result.unwrap(); + if obj_data_result.len() > 0 { + unsafe { + let obj_data_len: c_int = obj_data_result.len() as c_int; + let obj_data_raw = ztcore::malloc(obj_data_len as c_ulong); + if !obj_data_raw.is_null() { + copy_nonoverlapping(obj_data_result.as_ptr(), obj_data_raw.cast::(), obj_data_len as usize); + *obj_data = obj_data_raw; + return obj_data_len; + } } } } @@ -369,7 +357,7 @@ impl Node { let wn = Arc::downgrade(&n); let run = n.background_thread_run.clone(); - n.background_thread.replace(Some(std::thread::spawn(move || { + n.background_thread.replace(Some(std::thread::Builder::new().stack_size(RECOMMENDED_THREAD_STACK_SIZE).spawn(move || { let mut loop_delay = Duration::from_millis(NODE_BACKGROUND_MAX_DELAY as u64); while run.load(Ordering::Relaxed) { std::thread::park_timeout(loop_delay); @@ -384,12 +372,12 @@ impl Node { break; } } - }))); + }).unwrap())); Ok(n) } - /// This is called periodically from the background service thread. + /// This is called periodically from the background service thread. It's not called from anywhere else. fn process_background_tasks(&self) -> i64 { let current_time = now(); self.now.set(current_time); diff --git a/rust-zerotier-core/src/portableatomici64.rs b/rust-zerotier-core/src/portableatomici64.rs index e57d36cd1..0e0fa6c58 100644 --- a/rust-zerotier-core/src/portableatomici64.rs +++ b/rust-zerotier-core/src/portableatomici64.rs @@ -44,6 +44,14 @@ impl PortableAtomicI64 { pub fn set(&self, v: i64) { *self.i.lock().unwrap() = v; } + + #[inline(always)] + pub fn fetch_add(&self, v: i64) -> i64 { + let i = self.i.lock().unwrap(); + let j = *i; + *i += v; + j + } } #[cfg(all(target_pointer_width = "64"))] @@ -69,4 +77,9 @@ impl PortableAtomicI64 { pub fn set(&self, v: i64) { self.i.store(v, Ordering::Relaxed) } + + #[inline(always)] + pub fn fetch_add(&self, v: i64) -> i64 { + self.i.fetch_add(v, Ordering::Relaxed) + } } diff --git a/rust-zerotier-service/Cargo.lock b/rust-zerotier-service/Cargo.lock index 4fc5d42ee..1fde2d064 100644 --- a/rust-zerotier-service/Cargo.lock +++ b/rust-zerotier-service/Cargo.lock @@ -1,5 +1,25 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. +[[package]] +name = "ansi_term" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee49baf6cb617b853aa8d93bf420db2383fab46d314482ca2803b40d5fde979b" +dependencies = [ + "winapi", +] + +[[package]] +name = "atty" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" +dependencies = [ + "hermit-abi", + "libc", + "winapi", +] + [[package]] name = "autocfg" version = "1.0.1" @@ -88,6 +108,34 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "chrono" +version = "0.4.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "670ad68c9088c2a963aaa298cb369688cf3f9465ce5e2d4ca10e6e0098a1ce73" +dependencies = [ + "libc", + "num-integer", + "num-traits", + "time", + "winapi", +] + +[[package]] +name = "clap" +version = "2.33.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37e58ac78573c40708d45522f0d80fa2f01cc4f9b4e2bf749807255454312002" +dependencies = [ + "ansi_term", + "atty", + "bitflags", + "strsim", + "textwrap", + "unicode-width", + "vec_map", +] + [[package]] name = "cpuid-bool" version = "0.1.2" @@ -142,6 +190,7 @@ checksum = "da9052a1a50244d8d5aa9bf55cbc2fb6f357c86cc52e46c62ed390a7180cf150" dependencies = [ "futures-channel", "futures-core", + "futures-executor", "futures-io", "futures-sink", "futures-task", @@ -164,12 +213,35 @@ version = "0.3.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "79e5145dde8da7d1b3892dad07a9c98fc04bc39892b1ecc9692cf53e2b780a65" +[[package]] +name = "futures-executor" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9e59fdc009a4b3096bf94f740a0f2424c082521f20a9b08c5c07c48d90fd9b9" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + [[package]] name = "futures-io" version = "0.3.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28be053525281ad8259d47e4de5de657b25e7bac113458555bb4b70bc6870500" +[[package]] +name = "futures-macro" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c287d25add322d9f9abdcdc5927ca398917996600182178774032e9f8258fedd" +dependencies = [ + "proc-macro-hack", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "futures-sink" version = "0.3.12" @@ -191,11 +263,17 @@ version = "0.3.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "632a8cd0f2a4b3fdea1657f08bde063848c3bd00f9bbf6e256b8be78802e624b" dependencies = [ + "futures-channel", "futures-core", + "futures-io", + "futures-macro", "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", + "proc-macro-hack", + "proc-macro-nested", "slab", ] @@ -527,6 +605,16 @@ dependencies = [ "syn", ] +[[package]] +name = "num-integer" +version = "0.1.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2cc698a63b549a70bc047073d2949cce27cd1c7b0a4a862d08a8031bc2801db" +dependencies = [ + "autocfg", + "num-traits", +] + [[package]] name = "num-traits" version = "0.2.14" @@ -653,6 +741,18 @@ version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857" +[[package]] +name = "proc-macro-hack" +version = "0.5.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5" + +[[package]] +name = "proc-macro-nested" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc881b2c22681370c6a780e47af9840ef841837bc98118431d4e1868bd0c1086" + [[package]] name = "proc-macro2" version = "1.0.24" @@ -906,6 +1006,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "strsim" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a" + [[package]] name = "syn" version = "1.0.58" @@ -931,6 +1037,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "textwrap" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060" +dependencies = [ + "unicode-width", +] + [[package]] name = "time" version = "0.1.43" @@ -1130,6 +1245,12 @@ dependencies = [ "tinyvec", ] +[[package]] +name = "unicode-width" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9337591893a19b88d8d87f2cec1e73fad5cdfd10e5a6f349f498ad6ea2ffb1e3" + [[package]] name = "unicode-xid" version = "0.2.1" @@ -1154,6 +1275,12 @@ version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05e42f7c18b8f902290b009cde6d651262f956c98bc51bca4cd1d511c9cd85c7" +[[package]] +name = "vec_map" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191" + [[package]] name = "version_check" version = "0.9.2" @@ -1249,6 +1376,10 @@ dependencies = [ name = "zerotier-service" version = "0.1.0" dependencies = [ + "chrono", + "clap", + "futures", + "hex", "libc", "num_cpus", "serde", diff --git a/rust-zerotier-service/Cargo.toml b/rust-zerotier-service/Cargo.toml index acdb45878..c2d319a18 100644 --- a/rust-zerotier-service/Cargo.toml +++ b/rust-zerotier-service/Cargo.toml @@ -3,6 +3,7 @@ name = "zerotier-service" version = "0.1.0" authors = ["Adam Ierymenko "] edition = "2018" +build = "build.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html @@ -13,6 +14,10 @@ tokio = { version = "1", features = ["full"] } warp = "0.3" serde = { version = "1", features = ["derive"] } serde_json = "1" +futures = "0.3" +clap = { version = "2", features = ["default"] } +chrono = "0.4" +hex = "0.4" [target."cfg(unix)".dependencies] libc = "0.2.82" diff --git a/rust-zerotier-service/build.rs b/rust-zerotier-service/build.rs new file mode 100644 index 000000000..788b9dafc --- /dev/null +++ b/rust-zerotier-service/build.rs @@ -0,0 +1,6 @@ +fn main() { + let d = env!("CARGO_MANIFEST_DIR"); + println!("cargo:rustc-link-search=native={}/../build/core", d); + println!("cargo:rustc-link-lib=static=zt_core"); + println!("cargo:rustc-link-lib=c++"); +} diff --git a/rust-zerotier-service/src/fastudp/mod.rs b/rust-zerotier-service/src/fastudp/mod.rs deleted file mode 100644 index a426f0532..000000000 --- a/rust-zerotier-service/src/fastudp/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub mod fastudpsocket; -pub use fastudpsocket::*; diff --git a/rust-zerotier-service/src/fastudp/fastudpsocket.rs b/rust-zerotier-service/src/fastudpsocket.rs similarity index 95% rename from rust-zerotier-service/src/fastudp/fastudpsocket.rs rename to rust-zerotier-service/src/fastudpsocket.rs index 905eb0eb5..b91e29e59 100644 --- a/rust-zerotier-service/src/fastudp/fastudpsocket.rs +++ b/rust-zerotier-service/src/fastudpsocket.rs @@ -121,7 +121,7 @@ pub struct FastUDPSocket threads: Vec>, thread_run: Arc, sockets: Vec, - bind_address: InetAddress, + pub bind_address: InetAddress, } #[cfg(unix)] @@ -154,12 +154,6 @@ fn fast_udp_socket_recvfrom(socket: &FastUDPRawOsSocket, buf: &mut Buffer, from_ } } -// Integer incremented to select sockets on a mostly round robin basis. This -// isn't synchronized since if all cores don't see it the same there is no -// significant impact. It's just a faster way to pick a socket for sending -// than a random number generator. -static mut SOCKET_SPIN_INT: usize = 0; - impl FastUDPSocket { pub fn new(device_name: &str, address: &InetAddress, handler: &Arc) -> Result, String> { let thread_count = num_cpus::get(); @@ -181,7 +175,7 @@ impl FastUDPSocket { let thread_run = s.thread_run.clone(); let handler_weak = Arc::downgrade(&s.handler); - s.threads.push(std::thread::spawn(move || { + s.threads.push(std::thread::Builder::new().stack_size(zerotier_core::RECOMMENDED_THREAD_STACK_SIZE).spawn(move || { let mut from_address = InetAddress::new(); while thread_run.load(Ordering::Relaxed) { let mut buf = Buffer::new(); @@ -189,7 +183,7 @@ impl FastUDPSocket { if read_length > 0 { let handler = handler_weak.upgrade(); if handler.is_some() { - unsafe { buf.set_len(read_length as u32); } + unsafe { buf.set_len(read_length as usize); } handler.unwrap().incoming_udp_packet(&thread_socket, &from_address, buf); } else { break; @@ -198,7 +192,7 @@ impl FastUDPSocket { break; } } - })); + }).unwrap()); } else { bind_failed_reason = thread_socket.err().unwrap(); } @@ -217,13 +211,13 @@ impl FastUDPSocket { /// Sockets are thread safe. #[inline(always)] pub fn send(&self, to_address: &InetAddress, data: *const u8, len: usize, packet_ttl: i32) { - let mut i; - unsafe { - i = SOCKET_SPIN_INT; - SOCKET_SPIN_INT = i + 1; - } - i %= self.sockets.len(); - fast_udp_socket_sendto(self.sockets.get(i).unwrap(), to_address, data, len, packet_ttl); + fast_udp_socket_sendto(self.sockets.get(0).unwrap(), to_address, data, len, packet_ttl); + } + + /// Get a raw socket that can be used to send UDP packets. + #[inline(always)] + pub fn raw_socket(&self) -> FastUDPRawOsSocket { + *self.sockets.get(0).unwrap() } /// Get the number of threads this socket is currently running. diff --git a/rust-zerotier-service/src/localconfig.rs b/rust-zerotier-service/src/localconfig.rs index cee3f88eb..fe01c3fdf 100644 --- a/rust-zerotier-service/src/localconfig.rs +++ b/rust-zerotier-service/src/localconfig.rs @@ -48,61 +48,71 @@ pub const UNASSIGNED_PRIVILEGED_PORTS: [u16; 299] = [ 1023, ]; -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Clone)] #[serde(default)] pub struct LocalConfigPhysicalPathConfig { - blacklist: bool + pub blacklist: bool } -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Clone)] #[serde(default)] pub struct LocalConfigVirtualConfig { #[serde(rename = "try")] - try_: Vec + pub try_: Vec } -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Clone)] #[serde(default)] pub struct LocalConfigNetworkSettings { #[serde(rename = "allowManagedIPs")] - allow_managed_ips: bool, + pub allow_managed_ips: bool, #[serde(rename = "allowGlobalIPs")] - allow_global_ips: bool, + pub allow_global_ips: bool, #[serde(rename = "allowManagedRoutes")] - allow_managed_routes: bool, + pub allow_managed_routes: bool, #[serde(rename = "allowGlobalRoutes")] - allow_global_routes: bool, + pub allow_global_routes: bool, #[serde(rename = "allowDefaultRouteOverride")] - allow_default_route_override: bool, + pub allow_default_route_override: bool, } -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Clone)] #[serde(default)] pub struct LocalConfigSettings { #[serde(rename = "primaryPort")] - primary_port: u16, + pub primary_port: u16, #[serde(rename = "secondaryPort")] - secondary_port: Option, + pub secondary_port: Option, #[serde(rename = "autoPortSearch")] - auto_port_search: bool, + pub auto_port_search: bool, #[serde(rename = "portMapping")] - port_mapping: bool, + pub port_mapping: bool, #[serde(rename = "logSizeMax")] - log_size_max: usize, + pub log_size_max: usize, + #[serde(rename = "logVL1Events")] + pub log_vl1_events: bool, + #[serde(rename = "logVL2Events")] + pub log_vl2_events: bool, + #[serde(rename = "logFilterEvents")] + pub log_filter_events: bool, + #[serde(rename = "logMulticastEvents")] + pub log_multicast_events: bool, + #[serde(rename = "logDebug")] + pub log_debug: bool, #[serde(rename = "interfacePrefixBlacklist")] - interface_prefix_blacklist: Vec, + pub interface_prefix_blacklist: Vec, #[serde(rename = "explicitAddresses")] - explicit_addresses: Vec, + pub explicit_addresses: Vec, } -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Clone)] #[serde(default)] pub struct LocalConfig { - physical: BTreeMap, + pub physical: BTreeMap, #[serde(rename = "virtual")] - virtual_: BTreeMap, - network: BTreeMap, - settings: LocalConfigSettings, + pub virtual_: BTreeMap, + pub network: BTreeMap, + pub settings: LocalConfigSettings, } impl Default for LocalConfigPhysicalPathConfig { @@ -135,10 +145,23 @@ impl Default for LocalConfigNetworkSettings { impl LocalConfigSettings { #[cfg(target_os = "macos")] - const DEFAULT_PREFIX_BLACKLIST: [&str; 7] = ["lo", "utun", "gif", "stf", "iptap", "pktap", "feth"]; + const DEFAULT_PREFIX_BLACKLIST: [&'static str; 7] = ["lo", "utun", "gif", "stf", "iptap", "pktap", "feth"]; #[cfg(target_os = "linux")] - const DEFAULT_PREFIX_BLACKLIST: [&str; 5] = ["lo", "tun", "tap", "ipsec", "zt"]; + const DEFAULT_PREFIX_BLACKLIST: [&'static str; 5] = ["lo", "tun", "tap", "ipsec", "zt"]; + + // This is not applicable on Windows as it doesn't use the same device name semantics. + #[cfg(windows)] + const DEFAULT_PREFIX_BLACKLIST: [&'static str; 0] = []; + + pub fn is_interface_blacklisted(&self, ifname: &str) -> bool { + for p in self.interface_prefix_blacklist.iter() { + if ifname.starts_with(p.as_str()) { + return true; + } + } + false + } } impl Default for LocalConfigSettings { @@ -154,7 +177,12 @@ impl Default for LocalConfigSettings { secondary_port: Some(293), // this is one of UNASSIGNED_PRIVILEGED_PORTS that we will default to auto_port_search: true, port_mapping: true, - log_size_max: 16777216, + log_size_max: 1048576, + log_vl1_events: false, + log_vl2_events: false, + log_filter_events: false, + log_multicast_events: false, + log_debug: false, interface_prefix_blacklist: bl, explicit_addresses: Vec::new() } diff --git a/rust-zerotier-service/src/log.rs b/rust-zerotier-service/src/log.rs new file mode 100644 index 000000000..0ca661849 --- /dev/null +++ b/rust-zerotier-service/src/log.rs @@ -0,0 +1,76 @@ +use std::fs::{File, OpenOptions}; +use std::sync::Mutex; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::io::{Write, Seek, SeekFrom}; +use std::cell::Cell; +use zerotier_core::PortableAtomicI64; +use chrono::Datelike; + +pub struct Log { + prefix: String, + path: String, + file: Mutex>>, + cur_size: PortableAtomicI64, + max_size: AtomicUsize, +} + +impl Log { + const MIN_MAX_SIZE: usize = 4096; + + pub fn new(path: &str, max_size: usize, prefix: &str) -> Log { + let mut p = String::from(prefix); + if !p.is_empty() { + p.push(' '); + } + Log{ + prefix: p, + path: String::from(path), + file: Mutex::new(Cell::new(None)), + cur_size: PortableAtomicI64::new(0), + max_size: AtomicUsize::new(if max_size < MIN_MAX_SIZE { MIN_MAX_SIZE } else { max_size }), + } + } + + pub fn set_max_size(&self, new_max_size: usize) { + self.max_size.store(if new_max_size < MIN_MAX_SIZE { MIN_MAX_SIZE } else { new_max_size },Ordering::Relaxed); + } + + pub fn log(&self, s: &S) { + let mut fc = self.file.lock().unwrap(); + + let max_size = self.max_size.load(Ordering::Relaxed); + if max_size > 0 && fc.get_mut().is_some() { + if self.cur_size.get() >= max_size as i64 { + fc.replace(None); // close and dispose of old File + let mut old_path = self.path.clone(); + old_path.push_str(".old"); + let _ = std::fs::remove_file(old_path.as_str()); + let _ = std::fs::rename(self.path.as_str(), old_path.as_str()); + let _ = std::fs::remove_file(self.path.as_str()); // should fail + self.cur_size.set(0); + } + } + + if fc.get_mut().is_none() { + let mut f = OpenOptions::new().read(true).write(true).create(true).open(self.path.as_str()); + if f.is_err() { + return; + } + let mut f = f.unwrap(); + let eof = f.seek(SeekFrom::End(0)); + if eof.is_err() { + return; + } + cur_size = eof.unwrap() as i64; + fc.replace(Some(f)); + } + + let mut f = fc.get_mut().as_mut().unwrap(); + let now_str = chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string(); + writeln!("{}[{}] {}", self.prefix.as_str(), now_str.as_str(), s); + let _ = f.flush(); + self.cur_size.fetch_add((now_str_b.len() + sb.len() + 1) as i64); + } +} + +unsafe impl Sync for Log {} diff --git a/rust-zerotier-service/src/main.rs b/rust-zerotier-service/src/main.rs index 91c58e2a3..a6d7f58dc 100644 --- a/rust-zerotier-service/src/main.rs +++ b/rust-zerotier-service/src/main.rs @@ -1,11 +1,144 @@ -mod fastudp; +mod fastudpsocket; mod localconfig; +mod physicallink; +mod log; +mod store; + +use std::any::Any; +use std::cell::Cell; +use std::collections::BTreeMap; +use std::net::IpAddr; +use std::rc::Rc; +use std::str::FromStr; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::time::Duration; + +use warp::Filter; + +use zerotier_core::{Address, Buffer, Event, Identity, InetAddress, InetAddressFamily, MAC, NetworkId, Node, NodeEventHandler, StateObjectType, VirtualNetworkConfig, VirtualNetworkConfigOperation}; + +use crate::fastudpsocket::*; +use crate::localconfig::*; +use crate::physicallink::PhysicalLink; +use crate::log::Log; + +////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +pub struct ServiceEventHandler {} + +impl FastUDPSocketPacketHandler for ServiceEventHandler { + fn incoming_udp_packet(&self, raw_socket: &FastUDPRawOsSocket, from_adddress: &InetAddress, data: Buffer) {} +} + +impl NodeEventHandler for ServiceEventHandler { + fn virtual_network_config(&self, network_id: NetworkId, network_obj: &Arc, config_op: VirtualNetworkConfigOperation, config: Option<&VirtualNetworkConfig>) { + } + + fn virtual_network_frame(&self, network_id: NetworkId, network_obj: &Arc, source_mac: MAC, dest_mac: MAC, ethertype: u16, vlan_id: u16, data: &[u8]) { + } + + fn event(&self, event: Event, event_data: &[u8]) { + } + + fn state_put(&self, obj_type: StateObjectType, obj_id: &[u64], obj_data: &[u8]) { + } + + fn state_get(&self, obj_type: StateObjectType, obj_id: &[u64]) -> Option> { + None + } + + fn wire_packet_send(&self, local_socket: i64, sock_addr: &InetAddress, data: &[u8], packet_ttl: u32) -> i32 { + 0 + } + + fn path_check(&self, address: Address, id: &Identity, local_socket: i64, sock_addr: &InetAddress) -> bool { + true + } + + fn path_lookup(&self, address: Address, id: &Identity, desired_family: InetAddressFamily) -> Option { + None + } +} + +////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// fn main() { - println!("Hello, world!"); + tokio::runtime::Builder::new_multi_thread().thread_stack_size(zerotier_core::RECOMMENDED_THREAD_STACK_SIZE).build().unwrap().block_on(async { + let inaddr_v6_any = IpAddr::from_str("::0").unwrap(); - let tokio_rt = tokio::runtime::Runtime::new().unwrap(); - tokio_rt.block_on(async { - // TODO: init warp http server and anything else using tokio + let mut local_config: Box = Box::new(LocalConfig::default()); + let mut udp_sockets: BTreeMap> = BTreeMap::new(); + let handler: Arc = Arc::new(ServiceEventHandler{}); + let run: AtomicBool = AtomicBool::new(true); + + loop { + let mut warp_server_port = local_config.settings.primary_port; + loop { + let root = warp::path::end().map(|| { warp::reply::with_status("not found", warp::hyper::StatusCode::NOT_FOUND) }); + let status = warp::path("status").map(|| { "status" }); + let network = warp::path!("network" / String).map(|nwid_str| { "network" }); + let peer = warp::path!("peer" / String).map(|peer_str| { "peer" }); + + let (shutdown_tx, shutdown_rx) = futures::channel::oneshot::channel(); + let (_, warp_server) = warp::serve(warp::any().and( + root + .or(status) + .or(network) + .or(peer) + )).bind_with_graceful_shutdown((inaddr_v6_any, warp_server_port), async { + let _ = shutdown_rx.await; + }); + let warp_server = tokio::spawn(warp_server); + + tokio::time::sleep(Duration::from_secs(10)).await; + + // Diff system addresses against currently bound UDP sockets and update as needed. + // Also check interface prefix blacklists. + let mut system_addrs: BTreeMap> = BTreeMap::new(); + PhysicalLink::map(|link: PhysicalLink| { + if !local_config.settings.is_interface_blacklisted(link.device.as_str()) { + // Add two entries to system_addrs: one for primary port, and one for secondary if enabled. + let l = Rc::new(link); + let mut a = l.address.clone(); + a.set_port(local_config.settings.primary_port); + system_addrs.insert(a, l.clone()); + if local_config.settings.secondary_port.is_some() { + let mut a = l.address.clone(); + a.set_port(local_config.settings.secondary_port.unwrap()); + system_addrs.insert(a, l.clone()); + } + } + }); + let mut udp_sockets_to_close: Vec = Vec::new(); + for sock in udp_sockets.iter() { + if !system_addrs.contains_key(sock.0) { + udp_sockets_to_close.push(sock.0.clone()); + } + } + for k in udp_sockets_to_close.iter() { + udp_sockets.remove(k); + } + for addr in system_addrs.iter() { + if !udp_sockets.contains_key(addr.0) { + let s = FastUDPSocket::new(addr.1.device.as_str(), addr.0, &handler); + if s.is_ok() { + udp_sockets.insert(addr.0.clone(), s.unwrap()); + } + } + } + + // Breaking the inner loop causes the HTTP server to recycle, or may exit entirely if run is false. + if local_config.settings.primary_port != warp_server_port || !run.load(Ordering::Relaxed) { + let _ = shutdown_tx.send(()); + let _ = warp_server.await; + break; + } + } + tokio::time::sleep(Duration::from_millis(250)).await; + if !run.load(Ordering::Relaxed) { + break; + } + } }); } diff --git a/rust-zerotier-service/src/physicallink.rs b/rust-zerotier-service/src/physicallink.rs new file mode 100644 index 000000000..7a4700f58 --- /dev/null +++ b/rust-zerotier-service/src/physicallink.rs @@ -0,0 +1,41 @@ +use zerotier_core::InetAddress; +use std::ffi::CStr; +use std::ptr::{null_mut, copy_nonoverlapping}; +use std::mem::size_of; + +#[derive(Clone)] +pub struct PhysicalLink { + pub address: InetAddress, + pub device: String +} + +impl PhysicalLink { + #[cfg(unix)] + pub fn map(mut f: F) { + unsafe { + let mut ifap: *mut libc::ifaddrs = null_mut(); + if libc::getifaddrs((&mut ifap as *mut *mut libc::ifaddrs).cast()) == 0 { + let mut i = ifap; + while !i.is_null() { + if !(*i).ifa_addr.is_null() { + let mut a = InetAddress::new(); + if (*(*i).ifa_addr).sa_family == libc::AF_INET as u8 { + copy_nonoverlapping((*i).ifa_addr.cast::(), (&mut a as *mut InetAddress).cast::(), size_of::()); + } else if (*(*i).ifa_addr).sa_family == libc::AF_INET6 as u8 { + copy_nonoverlapping((*i).ifa_addr.cast::(), (&mut a as *mut InetAddress).cast::(), size_of::()); + } else { + continue; + } + a.set_port(0); + f(PhysicalLink{ + address: a, + device: if (*i).ifa_name.is_null() { String::new() } else { String::from(CStr::from_ptr((*i).ifa_name).to_str().unwrap()) } + }); + } + i = (*i).ifa_next; + } + libc::freeifaddrs(ifap.cast()); + } + } + } +} diff --git a/rust-zerotier-service/src/store.rs b/rust-zerotier-service/src/store.rs new file mode 100644 index 000000000..d8f5dafc8 --- /dev/null +++ b/rust-zerotier-service/src/store.rs @@ -0,0 +1,86 @@ +use std::error::Error; +use std::path::{Path, PathBuf}; +use zerotier_core::StateObjectType; +use std::io::{Read, Write}; + +pub struct Store { + pub base_path: Box, + pub peers_path: Box, + pub controller_path: Box, + pub networks_path: Box, + pub certs_path: Box, +} + +impl Store { + const MAX_OBJECT_SIZE: usize = 131072; // sanity limit + + pub fn new(base_path: &str) -> Result { + let bp = Path::new(base_path); + let md = bp.metadata()?; + if !md.is_dir() || md.permissions().readonly() { + return Err(std::io::Error::new(std::io::ErrorKind::PermissionDenied, "base path does not exist or is not writable")); + } + Ok(Store{ + base_path: bp.into_path_buf().into_boxed_path(), + peers_path: bp.join("peers.d").into_boxed_path(), + controller_path: bp.join("controller.d").into_boxed_path(), + networks_path: bp.join("networks.d").into_boxed_path(), + certs_path: bp.join("certs.d").into_boxed_path(), + }) + } + + pub fn make_obj_path(&self, obj_type: StateObjectType, obj_id: &[u64]) -> Result { + Ok(match obj_type { + StateObjectType::IdentityPublic => self.base_path.join("identity.public"), + StateObjectType::IdentitySecret => self.base_path.join("identity.secret"), + StateObjectType::Certificate => { + if obj_id.len() < 6 { + return Err(std::io::Error(std::io::ErrorKind::NotFound)); + } + self.certs_path.join(format!("{:0>16x}{:0>16x}{:0>16x}{:0>16x}{:0>16x}{:0>16x}.cert",obj_id[0],obj_id[1],obj_id[2],obj_id[3],obj_id[4],obj_id[5])) + }, + StateObjectType::TrustStore => self.base_path.join("truststore"), + StateObjectType::Locator => self.base_path.join("locator"), + StateObjectType::NetworkConfig => { + if obj_id.len() < 1 { + return Err(std::io::Error(std::io::ErrorKind::NotFound)); + } + self.networks_path.join(format!("{:0>16x}.conf", obj_id[0])) + }, + StateObjectType::Peer => { + if obj_id.len() < 1 { + return Err(std::io::Error(std::io::ErrorKind::NotFound)); + } + self.peers_path.join(format!("{:0>10x}.peer", obj_id[0])) + } + }) + } + + pub fn load(&self, obj_type: StateObjectType, obj_id: &[u64]) -> Result, std::io::Error> { + let obj_path = self.make_obj_path(obj_type, obj_id)?; + let fmd = obj_path.metadata()?; + if fmd.is_file() { + let flen = fmd.len(); + if flen <= Store::MAX_OBJECT_SIZE as u64 { + let mut f = std::fs::File::open(obj_path)?; + let mut buf: Vec = Vec::new(); + buf.reserve(flen as usize); + let rs = f.read_to_end(&mut buf)?; + buf.resize(rs as usize, 0); + return Ok(buf.into_boxed_slice()); + } + } + Err(std::io::Error(std::io::ErrorKind::NotFound)) + } + + pub fn erase(&self, obj_type: StateObjectType, obj_id: &[u64]) { + let obj_path = self.make_obj_path(obj_type, obj_id); + if obj_path.is_ok() { + let _ = std::fs::remove_file(obj_path.unwrap()); + } + } + + pub fn store(&self, obj_type: StateObjectType, obj_id: &[u64], obj_data: &[u8]) -> Result<(), std::io::Error> { + std::fs::OpenOptions::new().write(true).truncate(true).create(true).open(self.make_obj_path(obj_type, obj_id)?)?.write_all(obj_data) + } +}