Moar rustiness... got threading in Node right I think.

This commit is contained in:
Adam Ierymenko 2020-12-23 16:33:10 -05:00
parent 36b293345b
commit 2c07a0cd75
No known key found for this signature in database
GPG key ID: C8877CF2D7A5D7F3
8 changed files with 387 additions and 8 deletions

View file

@ -6,6 +6,12 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a"
[[package]]
name = "cfg-if"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "hex"
version = "0.4.2"
@ -18,6 +24,12 @@ version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc6f3ad7b9d11a0c00842ff8de1b60ee58661048eb8049ed33c73594f359d7e6"
[[package]]
name = "libc"
version = "0.2.81"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1482821306169ec4d07f6aca392a4681f66c75c9918aa49641a2595db64053cb"
[[package]]
name = "num-derive"
version = "0.3.3"
@ -65,6 +77,7 @@ dependencies = [
"num-traits",
"serde",
"serde_json",
"socket2",
]
[[package]]
@ -104,6 +117,17 @@ dependencies = [
"serde",
]
[[package]]
name = "socket2"
version = "0.3.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97e0e9fd577458a4f61fb91fcb559ea2afecc54c934119421f9f5d3d5b1a1057"
dependencies = [
"cfg-if",
"libc",
"winapi",
]
[[package]]
name = "syn"
version = "1.0.54"
@ -120,3 +144,25 @@ name = "unicode-xid"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7fe0bb3479651439c9112f72b6c505038574c9fbb575ed1bf3b797fa39dd564"
[[package]]
name = "winapi"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419"
dependencies = [
"winapi-i686-pc-windows-gnu",
"winapi-x86_64-pc-windows-gnu",
]
[[package]]
name = "winapi-i686-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"

View file

@ -12,3 +12,4 @@ num-derive = "0.3.3"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
hex = "0.4.2"
socket2 = "0.3.18"

View file

@ -0,0 +1,51 @@
use crate::bindings::capi as ztcore;
use std::os::raw::c_void;
use std::slice::{from_raw_parts_mut};
use std::ptr::null_mut;
pub struct Buffer {
pub(crate) zt_core_buf: *mut u8,
pub(crate) data_size: usize
}
impl Buffer {
pub fn new() -> Buffer {
unsafe {
let b = ztcore::ZT_getBuffer() as *mut u8;
if b.is_null() {
panic!("out of memory calling ZT_getBuffer()");
}
return Buffer {
zt_core_buf: b,
data_size: ztcore::ZT_BUF_SIZE as usize
};
}
}
#[inline(always)]
pub fn get(&mut self) -> &mut [u8] {
unsafe {
return from_raw_parts_mut(self.zt_core_buf, ztcore::ZT_BUF_SIZE as usize);
}
}
#[inline(always)]
pub fn set_data_size(&mut self, s: usize) {
self.data_size = s & (ztcore::ZT_BUF_SIZE - 1) as usize;
}
#[inline(always)]
pub(crate) fn mark_consumed(&mut self) {
self.zt_core_buf = null_mut();
}
}
impl Drop for Buffer {
#[inline(always)]
fn drop(&mut self) {
// ZT_freeBuffer() does nothing if passed a null pointer.
unsafe {
ztcore::ZT_freeBuffer(self.zt_core_buf as *mut c_void);
}
}
}

View file

@ -8,16 +8,24 @@ mod networkid;
mod locator;
mod path;
mod peer;
mod node;
mod mac;
mod buffer;
mod portableatomici64;
pub use identity::*;
pub use address::*;
pub use fingerprint::*;
pub use endpoint::*;
pub use networkid::*;
pub use locator::*;
pub use address::Address;
pub use fingerprint::Fingerprint;
pub use endpoint::Endpoint;
pub use networkid::NetworkId;
pub use locator::Locator;
pub use certificate::*;
pub use path::*;
pub use peer::*;
pub use path::Path;
pub use peer::Peer;
pub use node::*;
pub use mac::MAC;
pub use buffer::Buffer;
pub use portableatomici64::PortableAtomicI64;
use bindings::capi as ztcore;
use num_derive::{FromPrimitive, ToPrimitive};
@ -227,6 +235,12 @@ pub fn version() -> (i32, i32, i32, i32) {
(major as i32, minor as i32, revision as i32, build as i32)
}
/// Convenience function to get the number of milliseconds since the Unix epoch.
#[inline]
pub fn now() -> i64 {
(std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_millis() & 0x7fffffffffffffff) as i64
}
#[macro_export(crate)]
macro_rules! implement_json_serializable {
($struct_name:ident) => {

View file

@ -51,7 +51,7 @@ impl Locator {
impl Drop for Locator {
fn drop(&mut self) {
if self.requires_delete && !self.capi.is_null() {
if self.requires_delete {
unsafe {
ztcore::ZT_Locator_delete(self.capi);
}

View file

@ -0,0 +1,48 @@
pub struct MAC(pub u64);
impl MAC {
#[inline]
pub fn new_from_string(s: &str) -> MAC {
return MAC(u64::from_str_radix(s.replace(":","").as_str(), 16).unwrap_or(0));
}
}
impl ToString for MAC {
#[inline]
fn to_string(&self) -> String {
let x = self.0;
format!("{:0>2x}:{:0>2x}:{:0>2x}:{:0>2x}:{:0>2x}:{:0>2x}",
(x >> 40) & 0xff,
(x >> 32) & 0xff,
(x >> 24) & 0xff,
(x >> 16) & 0xff,
(x >> 8) & 0xff,
x & 0xff)
}
}
impl serde::Serialize for MAC {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> where S: serde::Serializer {
serializer.serialize_str(self.to_string().as_str())
}
}
struct AddressVisitor;
impl<'de> serde::de::Visitor<'de> for AddressVisitor {
type Value = MAC;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
formatter.write_str("Ethernet MAC address in string format")
}
fn visit_str<E>(self, s: &str) -> Result<Self::Value, E> where E: serde::de::Error {
Ok(MAC::new_from_string(s))
}
}
impl<'de> serde::Deserialize<'de> for MAC {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> where D: serde::Deserializer<'de> {
deserializer.deserialize_str(AddressVisitor)
}
}

View file

@ -0,0 +1,161 @@
use std::cell::Cell;
use std::mem::MaybeUninit;
use std::os::raw::{c_uint, c_ulong, c_void};
use std::ptr::null_mut;
use std::sync::*;
use std::sync::atomic::*;
use std::time::Duration;
use num_traits::FromPrimitive;
use socket2::SockAddr;
use crate::*;
use crate::bindings::capi as ztcore;
pub struct Node {
capi: *mut ztcore::ZT_Node,
background_thread: Cell<Option<std::thread::JoinHandle<()>>>,
background_thread_run: Arc<AtomicBool>,
now: PortableAtomicI64,
}
impl Node {
pub fn new(base_dir: &std::path::Path) -> Arc<Node> {
let n: Arc<Node> = Arc::new(Node {
capi: null_mut(), // TODO
background_thread: Cell::new(None),
background_thread_run: Arc::new(AtomicBool::new(true)),
now: PortableAtomicI64::new(now()),
});
let wn = Arc::downgrade(&n);
let run = n.background_thread_run.clone();
n.background_thread.replace(Some(std::thread::spawn(move || {
let mut loop_delay = Duration::from_millis(500);
while run.load(Ordering::Relaxed) {
std::thread::park_timeout(loop_delay);
if run.load(Ordering::Relaxed) {
let nn = wn.upgrade();
if nn.is_some() {
loop_delay = Duration::from_millis(nn.unwrap().process_background_tasks() as u64);
} else {
break;
}
} else {
break;
}
}
})));
n
}
/// This is called periodically from internal background thread.
/// Don't call directly.
#[inline(always)]
fn process_background_tasks(&self) -> i64 {
let current_time = now();
self.now.set(current_time);
let mut next_task_deadline: i64 = current_time;
unsafe {
ztcore::ZT_Node_processBackgroundTasks(self.capi, 0 as *mut c_void, current_time, &mut next_task_deadline as *mut i64);
}
let mut next_delay = next_task_deadline - current_time;
if next_delay < 50 {
next_delay = 50;
} else if next_delay > 500 {
next_delay = 500;
}
next_delay
}
pub fn join(&self, nwid: NetworkId, controller_fingerprint: Option<Fingerprint>) -> ResultCode {
let mut cfp: MaybeUninit<ztcore::ZT_Fingerprint> = MaybeUninit::uninit();
let mut cfpp: *mut ztcore::ZT_Fingerprint = null_mut();
if controller_fingerprint.is_some() {
let cfp2 = controller_fingerprint.unwrap();
cfpp = cfp.as_mut_ptr();
unsafe {
(*cfpp).address = cfp2.address.0;
(*cfpp).hash = cfp2.hash;
}
}
unsafe {
let rc = ztcore::ZT_Node_join(self.capi, nwid.0, cfpp, null_mut(), null_mut());
return ResultCode::from_u32(rc as u32).unwrap();
}
}
pub fn leave(&self, nwid: NetworkId) -> ResultCode {
unsafe {
let rc = ztcore::ZT_Node_leave(self.capi, nwid.0, null_mut(), null_mut());
return ResultCode::from_u32(rc as u32).unwrap();
}
}
#[inline(always)]
pub fn address(&self) -> Address {
unsafe {
return Address(ztcore::ZT_Node_address(self.capi) as u64);
}
}
pub fn identity(&self) -> Identity {
unsafe {
return Identity::new_from_capi(ztcore::ZT_Node_identity(self.capi), false);
}
}
#[inline(always)]
pub fn process_wire_packet(&self, local_socket: i64, remote_address: &SockAddr, data: &mut Buffer) -> ResultCode {
let current_time = self.now.get();
let mut next_task_deadline: i64 = current_time;
unsafe {
return ResultCode::from_u32(ztcore::ZT_Node_processWirePacket(self.capi, null_mut(), current_time, local_socket, remote_address.as_ptr() as *const ztcore::sockaddr_storage, data.zt_core_buf as *const c_void, data.data_size as u32, 1, &mut next_task_deadline as *mut i64) as u32).unwrap_or(ResultCode::ErrorInternalNonFatal);
}
}
#[inline(always)]
pub fn process_virtual_network_frame(&self, nwid: &NetworkId, source_mac: &MAC, dest_mac: &MAC, ethertype: u16, vlan_id: u16, data: &mut Buffer) -> ResultCode {
let current_time = self.now.get();
let mut next_tick_deadline: i64 = current_time;
unsafe {
return ResultCode::from_u32(ztcore::ZT_Node_processVirtualNetworkFrame(self.capi, null_mut(), current_time, nwid.0, source_mac.0, dest_mac.0, ethertype as c_uint, vlan_id as c_uint, data.zt_core_buf as *const c_void, data.data_size as u32, 1, &mut next_tick_deadline as *mut i64) as u32).unwrap_or(ResultCode::ErrorInternalNonFatal);
}
}
pub fn multicast_subscribe(&self, nwid: &NetworkId, multicast_group: &MAC, multicast_adi: u32) -> ResultCode {
unsafe {
return ResultCode::from_u32(ztcore::ZT_Node_multicastSubscribe(self.capi, null_mut(), nwid.0, multicast_group.0, multicast_adi as c_ulong) as u32).unwrap_or(ResultCode::ErrorInternalNonFatal);
}
}
pub fn multicast_unsubscribe(&self, nwid: &NetworkId, multicast_group: &MAC, multicast_adi: u32) -> ResultCode {
unsafe {
return ResultCode::from_u32(ztcore::ZT_Node_multicastUnsubscribe(self.capi, nwid.0, multicast_group.0, multicast_adi as c_ulong) as u32).unwrap_or(ResultCode::ErrorInternalNonFatal);
}
}
}
unsafe impl Sync for Node {}
unsafe impl Send for Node {}
impl Drop for Node {
fn drop(&mut self) {
self.background_thread_run.store(false, Ordering::Relaxed);
std::thread::yield_now();
let bt = self.background_thread.replace(None);
if bt.is_some() {
let bt = bt.unwrap();
bt.thread().unpark();
let _ = bt.join();
}
unsafe {
ztcore::ZT_Node_delete(self.capi, 0 as *mut c_void);
}
}
}

View file

@ -0,0 +1,58 @@
#[cfg(all(target_pointer_width = "32"))]
use std::sync::Mutex;
#[cfg(all(target_pointer_width = "64"))]
use std::sync::atomic::{AtomicI64, Ordering};
// This implements a basic atomic i64 that uses a mutex on 32-bit systems,
// since you can't atomically access something larger than word size.
#[cfg(all(target_pointer_width = "32"))]
pub struct PortableAtomicI64 {
i: Mutex<i64>
}
#[cfg(all(target_pointer_width = "32"))]
impl PortableAtomicI64 {
#[inline(always)]
pub fn new(v: i64) -> PortableAtomicI64 {
PortableAtomicI64{
i: Mutex::new(v)
}
}
#[inline(always)]
pub fn get(&self) -> i64 {
*self.i.lock().unwrap()
}
#[inline(always)]
pub fn set(&self, v: i64) {
*self.i.lock().unwrap() = v;
}
}
#[cfg(all(target_pointer_width = "64"))]
pub struct PortableAtomicI64 {
i: AtomicI64
}
#[cfg(all(target_pointer_width = "64"))]
impl PortableAtomicI64 {
#[inline(always)]
pub fn new(v: i64) -> PortableAtomicI64 {
PortableAtomicI64{
i: AtomicI64::new(v)
}
}
#[inline(always)]
pub fn get(&self) -> i64 {
self.i.load(Ordering::Relaxed)
}
#[inline(always)]
pub fn set(&self, v: i64) {
self.i.store(v, Ordering::Relaxed)
}
}