From 2c07a0cd7554cd12b3202da17e3ab652a657c30d Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Wed, 23 Dec 2020 16:33:10 -0500 Subject: [PATCH] Moar rustiness... got threading in Node right I think. --- rust-zerotier-core/Cargo.lock | 46 ++++++ rust-zerotier-core/Cargo.toml | 1 + rust-zerotier-core/src/buffer.rs | 51 +++++++ rust-zerotier-core/src/lib.rs | 28 +++- rust-zerotier-core/src/locator.rs | 2 +- rust-zerotier-core/src/mac.rs | 48 ++++++ rust-zerotier-core/src/node.rs | 161 ++++++++++++++++++++ rust-zerotier-core/src/portableatomici64.rs | 58 +++++++ 8 files changed, 387 insertions(+), 8 deletions(-) create mode 100644 rust-zerotier-core/src/buffer.rs create mode 100644 rust-zerotier-core/src/mac.rs create mode 100644 rust-zerotier-core/src/node.rs create mode 100644 rust-zerotier-core/src/portableatomici64.rs diff --git a/rust-zerotier-core/Cargo.lock b/rust-zerotier-core/Cargo.lock index 52c15d043..8b2048737 100644 --- a/rust-zerotier-core/Cargo.lock +++ b/rust-zerotier-core/Cargo.lock @@ -6,6 +6,12 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + [[package]] name = "hex" version = "0.4.2" @@ -18,6 +24,12 @@ version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc6f3ad7b9d11a0c00842ff8de1b60ee58661048eb8049ed33c73594f359d7e6" +[[package]] +name = "libc" +version = "0.2.81" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1482821306169ec4d07f6aca392a4681f66c75c9918aa49641a2595db64053cb" + [[package]] name = "num-derive" version = "0.3.3" @@ -65,6 +77,7 @@ dependencies = [ "num-traits", "serde", "serde_json", + "socket2", ] [[package]] @@ -104,6 +117,17 @@ dependencies = [ "serde", ] +[[package]] +name = "socket2" +version = "0.3.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97e0e9fd577458a4f61fb91fcb559ea2afecc54c934119421f9f5d3d5b1a1057" +dependencies = [ + "cfg-if", + "libc", + "winapi", +] + [[package]] name = "syn" version = "1.0.54" @@ -120,3 +144,25 @@ name = "unicode-xid" version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f7fe0bb3479651439c9112f72b6c505038574c9fbb575ed1bf3b797fa39dd564" + +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" diff --git a/rust-zerotier-core/Cargo.toml b/rust-zerotier-core/Cargo.toml index 39964dd2e..433654a24 100644 --- a/rust-zerotier-core/Cargo.toml +++ b/rust-zerotier-core/Cargo.toml @@ -12,3 +12,4 @@ num-derive = "0.3.3" serde = { version = "1", features = ["derive"] } serde_json = "1" hex = "0.4.2" +socket2 = "0.3.18" diff --git a/rust-zerotier-core/src/buffer.rs b/rust-zerotier-core/src/buffer.rs new file mode 100644 index 000000000..3349c2a38 --- /dev/null +++ b/rust-zerotier-core/src/buffer.rs @@ -0,0 +1,51 @@ +use crate::bindings::capi as ztcore; +use std::os::raw::c_void; +use std::slice::{from_raw_parts_mut}; +use std::ptr::null_mut; + +pub struct Buffer { + pub(crate) zt_core_buf: *mut u8, + pub(crate) data_size: usize +} + +impl Buffer { + pub fn new() -> Buffer { + unsafe { + let b = ztcore::ZT_getBuffer() as *mut u8; + if b.is_null() { + panic!("out of memory calling ZT_getBuffer()"); + } + return Buffer { + zt_core_buf: b, + data_size: ztcore::ZT_BUF_SIZE as usize + }; + } + } + + #[inline(always)] + pub fn get(&mut self) -> &mut [u8] { + unsafe { + return from_raw_parts_mut(self.zt_core_buf, ztcore::ZT_BUF_SIZE as usize); + } + } + + #[inline(always)] + pub fn set_data_size(&mut self, s: usize) { + self.data_size = s & (ztcore::ZT_BUF_SIZE - 1) as usize; + } + + #[inline(always)] + pub(crate) fn mark_consumed(&mut self) { + self.zt_core_buf = null_mut(); + } +} + +impl Drop for Buffer { + #[inline(always)] + fn drop(&mut self) { + // ZT_freeBuffer() does nothing if passed a null pointer. + unsafe { + ztcore::ZT_freeBuffer(self.zt_core_buf as *mut c_void); + } + } +} diff --git a/rust-zerotier-core/src/lib.rs b/rust-zerotier-core/src/lib.rs index eb013c6eb..062a0cc6e 100644 --- a/rust-zerotier-core/src/lib.rs +++ b/rust-zerotier-core/src/lib.rs @@ -8,16 +8,24 @@ mod networkid; mod locator; mod path; mod peer; +mod node; +mod mac; +mod buffer; +mod portableatomici64; pub use identity::*; -pub use address::*; -pub use fingerprint::*; -pub use endpoint::*; -pub use networkid::*; -pub use locator::*; +pub use address::Address; +pub use fingerprint::Fingerprint; +pub use endpoint::Endpoint; +pub use networkid::NetworkId; +pub use locator::Locator; pub use certificate::*; -pub use path::*; -pub use peer::*; +pub use path::Path; +pub use peer::Peer; +pub use node::*; +pub use mac::MAC; +pub use buffer::Buffer; +pub use portableatomici64::PortableAtomicI64; use bindings::capi as ztcore; use num_derive::{FromPrimitive, ToPrimitive}; @@ -227,6 +235,12 @@ pub fn version() -> (i32, i32, i32, i32) { (major as i32, minor as i32, revision as i32, build as i32) } +/// Convenience function to get the number of milliseconds since the Unix epoch. +#[inline] +pub fn now() -> i64 { + (std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_millis() & 0x7fffffffffffffff) as i64 +} + #[macro_export(crate)] macro_rules! implement_json_serializable { ($struct_name:ident) => { diff --git a/rust-zerotier-core/src/locator.rs b/rust-zerotier-core/src/locator.rs index af4c375d7..ea3b2aff5 100644 --- a/rust-zerotier-core/src/locator.rs +++ b/rust-zerotier-core/src/locator.rs @@ -51,7 +51,7 @@ impl Locator { impl Drop for Locator { fn drop(&mut self) { - if self.requires_delete && !self.capi.is_null() { + if self.requires_delete { unsafe { ztcore::ZT_Locator_delete(self.capi); } diff --git a/rust-zerotier-core/src/mac.rs b/rust-zerotier-core/src/mac.rs new file mode 100644 index 000000000..9b2f510f3 --- /dev/null +++ b/rust-zerotier-core/src/mac.rs @@ -0,0 +1,48 @@ +pub struct MAC(pub u64); + +impl MAC { + #[inline] + pub fn new_from_string(s: &str) -> MAC { + return MAC(u64::from_str_radix(s.replace(":","").as_str(), 16).unwrap_or(0)); + } +} + +impl ToString for MAC { + #[inline] + fn to_string(&self) -> String { + let x = self.0; + format!("{:0>2x}:{:0>2x}:{:0>2x}:{:0>2x}:{:0>2x}:{:0>2x}", + (x >> 40) & 0xff, + (x >> 32) & 0xff, + (x >> 24) & 0xff, + (x >> 16) & 0xff, + (x >> 8) & 0xff, + x & 0xff) + } +} + +impl serde::Serialize for MAC { + fn serialize(&self, serializer: S) -> Result where S: serde::Serializer { + serializer.serialize_str(self.to_string().as_str()) + } +} + +struct AddressVisitor; + +impl<'de> serde::de::Visitor<'de> for AddressVisitor { + type Value = MAC; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("Ethernet MAC address in string format") + } + + fn visit_str(self, s: &str) -> Result where E: serde::de::Error { + Ok(MAC::new_from_string(s)) + } +} + +impl<'de> serde::Deserialize<'de> for MAC { + fn deserialize(deserializer: D) -> Result where D: serde::Deserializer<'de> { + deserializer.deserialize_str(AddressVisitor) + } +} diff --git a/rust-zerotier-core/src/node.rs b/rust-zerotier-core/src/node.rs new file mode 100644 index 000000000..9d2b8348a --- /dev/null +++ b/rust-zerotier-core/src/node.rs @@ -0,0 +1,161 @@ +use std::cell::Cell; +use std::mem::MaybeUninit; +use std::os::raw::{c_uint, c_ulong, c_void}; +use std::ptr::null_mut; +use std::sync::*; +use std::sync::atomic::*; +use std::time::Duration; + +use num_traits::FromPrimitive; +use socket2::SockAddr; + +use crate::*; +use crate::bindings::capi as ztcore; + +pub struct Node { + capi: *mut ztcore::ZT_Node, + background_thread: Cell>>, + background_thread_run: Arc, + now: PortableAtomicI64, +} + +impl Node { + pub fn new(base_dir: &std::path::Path) -> Arc { + let n: Arc = Arc::new(Node { + capi: null_mut(), // TODO + background_thread: Cell::new(None), + background_thread_run: Arc::new(AtomicBool::new(true)), + now: PortableAtomicI64::new(now()), + }); + + let wn = Arc::downgrade(&n); + let run = n.background_thread_run.clone(); + n.background_thread.replace(Some(std::thread::spawn(move || { + let mut loop_delay = Duration::from_millis(500); + while run.load(Ordering::Relaxed) { + std::thread::park_timeout(loop_delay); + if run.load(Ordering::Relaxed) { + let nn = wn.upgrade(); + if nn.is_some() { + loop_delay = Duration::from_millis(nn.unwrap().process_background_tasks() as u64); + } else { + break; + } + } else { + break; + } + } + }))); + + n + } + + /// This is called periodically from internal background thread. + /// Don't call directly. + #[inline(always)] + fn process_background_tasks(&self) -> i64 { + let current_time = now(); + self.now.set(current_time); + + let mut next_task_deadline: i64 = current_time; + unsafe { + ztcore::ZT_Node_processBackgroundTasks(self.capi, 0 as *mut c_void, current_time, &mut next_task_deadline as *mut i64); + } + let mut next_delay = next_task_deadline - current_time; + + if next_delay < 50 { + next_delay = 50; + } else if next_delay > 500 { + next_delay = 500; + } + next_delay + } + + pub fn join(&self, nwid: NetworkId, controller_fingerprint: Option) -> ResultCode { + let mut cfp: MaybeUninit = MaybeUninit::uninit(); + let mut cfpp: *mut ztcore::ZT_Fingerprint = null_mut(); + if controller_fingerprint.is_some() { + let cfp2 = controller_fingerprint.unwrap(); + cfpp = cfp.as_mut_ptr(); + unsafe { + (*cfpp).address = cfp2.address.0; + (*cfpp).hash = cfp2.hash; + } + } + unsafe { + let rc = ztcore::ZT_Node_join(self.capi, nwid.0, cfpp, null_mut(), null_mut()); + return ResultCode::from_u32(rc as u32).unwrap(); + } + } + + pub fn leave(&self, nwid: NetworkId) -> ResultCode { + unsafe { + let rc = ztcore::ZT_Node_leave(self.capi, nwid.0, null_mut(), null_mut()); + return ResultCode::from_u32(rc as u32).unwrap(); + } + } + + #[inline(always)] + pub fn address(&self) -> Address { + unsafe { + return Address(ztcore::ZT_Node_address(self.capi) as u64); + } + } + + pub fn identity(&self) -> Identity { + unsafe { + return Identity::new_from_capi(ztcore::ZT_Node_identity(self.capi), false); + } + } + + #[inline(always)] + pub fn process_wire_packet(&self, local_socket: i64, remote_address: &SockAddr, data: &mut Buffer) -> ResultCode { + let current_time = self.now.get(); + let mut next_task_deadline: i64 = current_time; + unsafe { + return ResultCode::from_u32(ztcore::ZT_Node_processWirePacket(self.capi, null_mut(), current_time, local_socket, remote_address.as_ptr() as *const ztcore::sockaddr_storage, data.zt_core_buf as *const c_void, data.data_size as u32, 1, &mut next_task_deadline as *mut i64) as u32).unwrap_or(ResultCode::ErrorInternalNonFatal); + } + } + + #[inline(always)] + pub fn process_virtual_network_frame(&self, nwid: &NetworkId, source_mac: &MAC, dest_mac: &MAC, ethertype: u16, vlan_id: u16, data: &mut Buffer) -> ResultCode { + let current_time = self.now.get(); + let mut next_tick_deadline: i64 = current_time; + unsafe { + return ResultCode::from_u32(ztcore::ZT_Node_processVirtualNetworkFrame(self.capi, null_mut(), current_time, nwid.0, source_mac.0, dest_mac.0, ethertype as c_uint, vlan_id as c_uint, data.zt_core_buf as *const c_void, data.data_size as u32, 1, &mut next_tick_deadline as *mut i64) as u32).unwrap_or(ResultCode::ErrorInternalNonFatal); + } + } + + pub fn multicast_subscribe(&self, nwid: &NetworkId, multicast_group: &MAC, multicast_adi: u32) -> ResultCode { + unsafe { + return ResultCode::from_u32(ztcore::ZT_Node_multicastSubscribe(self.capi, null_mut(), nwid.0, multicast_group.0, multicast_adi as c_ulong) as u32).unwrap_or(ResultCode::ErrorInternalNonFatal); + } + } + + pub fn multicast_unsubscribe(&self, nwid: &NetworkId, multicast_group: &MAC, multicast_adi: u32) -> ResultCode { + unsafe { + return ResultCode::from_u32(ztcore::ZT_Node_multicastUnsubscribe(self.capi, nwid.0, multicast_group.0, multicast_adi as c_ulong) as u32).unwrap_or(ResultCode::ErrorInternalNonFatal); + } + } +} + +unsafe impl Sync for Node {} + +unsafe impl Send for Node {} + +impl Drop for Node { + fn drop(&mut self) { + self.background_thread_run.store(false, Ordering::Relaxed); + std::thread::yield_now(); + let bt = self.background_thread.replace(None); + if bt.is_some() { + let bt = bt.unwrap(); + bt.thread().unpark(); + let _ = bt.join(); + } + + unsafe { + ztcore::ZT_Node_delete(self.capi, 0 as *mut c_void); + } + } +} diff --git a/rust-zerotier-core/src/portableatomici64.rs b/rust-zerotier-core/src/portableatomici64.rs new file mode 100644 index 000000000..f4763a4d0 --- /dev/null +++ b/rust-zerotier-core/src/portableatomici64.rs @@ -0,0 +1,58 @@ +#[cfg(all(target_pointer_width = "32"))] +use std::sync::Mutex; + +#[cfg(all(target_pointer_width = "64"))] +use std::sync::atomic::{AtomicI64, Ordering}; + +// This implements a basic atomic i64 that uses a mutex on 32-bit systems, +// since you can't atomically access something larger than word size. + +#[cfg(all(target_pointer_width = "32"))] +pub struct PortableAtomicI64 { + i: Mutex +} + +#[cfg(all(target_pointer_width = "32"))] +impl PortableAtomicI64 { + #[inline(always)] + pub fn new(v: i64) -> PortableAtomicI64 { + PortableAtomicI64{ + i: Mutex::new(v) + } + } + + #[inline(always)] + pub fn get(&self) -> i64 { + *self.i.lock().unwrap() + } + + #[inline(always)] + pub fn set(&self, v: i64) { + *self.i.lock().unwrap() = v; + } +} + +#[cfg(all(target_pointer_width = "64"))] +pub struct PortableAtomicI64 { + i: AtomicI64 +} + +#[cfg(all(target_pointer_width = "64"))] +impl PortableAtomicI64 { + #[inline(always)] + pub fn new(v: i64) -> PortableAtomicI64 { + PortableAtomicI64{ + i: AtomicI64::new(v) + } + } + + #[inline(always)] + pub fn get(&self) -> i64 { + self.i.load(Ordering::Relaxed) + } + + #[inline(always)] + pub fn set(&self, v: i64) { + self.i.store(v, Ordering::Relaxed) + } +}