From 88181c491ce74987bdaca34447bae12772de64d5 Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Mon, 1 Mar 2021 16:26:21 -0500 Subject: [PATCH] Pin memory in ZT core CAPI wrapper. --- rust-zerotier-core/src/node.rs | 338 ++++++++++----------- rust-zerotier-service/Cargo.lock | 13 +- rust-zerotier-service/Cargo.toml | 2 +- rust-zerotier-service/src/fastudpsocket.rs | 20 +- rust-zerotier-service/src/service.rs | 79 ++--- rust-zerotier-service/src/weblistener.rs | 61 ++-- 6 files changed, 258 insertions(+), 255 deletions(-) diff --git a/rust-zerotier-core/src/node.rs b/rust-zerotier-core/src/node.rs index b354df3b7..e96800e5d 100644 --- a/rust-zerotier-core/src/node.rs +++ b/rust-zerotier-core/src/node.rs @@ -15,8 +15,9 @@ use std::collections::hash_map::HashMap; use std::intrinsics::copy_nonoverlapping; use std::mem::{MaybeUninit, transmute}; use std::os::raw::{c_int, c_uint, c_ulong, c_void}; +use std::pin::Pin; use std::ptr::{null_mut, slice_from_raw_parts}; -use std::sync::*; +use std::sync::Mutex; use num_traits::FromPrimitive; use serde::{Deserialize, Serialize}; @@ -24,10 +25,11 @@ use serde::{Deserialize, Serialize}; use crate::*; use crate::capi as ztcore; -/// Maximum delay between calls to run_background_tasks() -pub const NODE_BACKGROUND_TASKS_MAX_INTERVAL: u64 = 200; +pub const NODE_BACKGROUND_TASKS_MAX_INTERVAL: i64 = 200; -#[derive(FromPrimitive, ToPrimitive, PartialEq, Eq)] +const EMPTY_BYTE_ARRAY: [u8; 0] = []; + +#[derive(FromPrimitive, ToPrimitive, PartialEq, Eq, Clone, Copy)] pub enum Event { Up = ztcore::ZT_Event_ZT_EVENT_UP as isize, Offline = ztcore::ZT_Event_ZT_EVENT_OFFLINE as isize, @@ -37,7 +39,7 @@ pub enum Event { UserMessage = ztcore::ZT_Event_ZT_EVENT_USER_MESSAGE as isize, } -#[derive(FromPrimitive, ToPrimitive, PartialEq, Eq)] +#[derive(FromPrimitive, ToPrimitive, PartialEq, Eq, Clone, Copy)] pub enum StateObjectType { IdentityPublic = ztcore::ZT_StateObjectType_ZT_STATE_OBJECT_IDENTITY_PUBLIC as isize, IdentitySecret = ztcore::ZT_StateObjectType_ZT_STATE_OBJECT_IDENTITY_SECRET as isize, @@ -49,7 +51,7 @@ pub enum StateObjectType { } /// The status of a ZeroTier node. -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, PartialEq, Eq, Clone)] pub struct NodeStatus { pub address: Address, pub identity: Identity, @@ -63,12 +65,12 @@ pub struct NodeStatus { /// An event handler that receives events, frames, and packets from the core. /// Note that these handlers can be called concurrently from any thread and /// must be thread safe. -pub trait NodeEventHandler { +pub trait NodeEventHandler { /// Called when a configuration change or update should be applied to a network. - fn virtual_network_config(&self, network_id: NetworkId, network_obj: &Arc, config_op: VirtualNetworkConfigOperation, config: Option<&VirtualNetworkConfig>); + fn virtual_network_config(&self, network_id: NetworkId, network_obj: &N, config_op: VirtualNetworkConfigOperation, config: Option<&VirtualNetworkConfig>); /// Called when a frame should be injected into the virtual network (physical -> virtual). - fn virtual_network_frame(&self, network_id: NetworkId, network_obj: &Arc, source_mac: MAC, dest_mac: MAC, ethertype: u16, vlan_id: u16, data: &[u8]); + fn virtual_network_frame(&self, network_id: NetworkId, network_obj: &N, source_mac: MAC, dest_mac: MAC, ethertype: u16, vlan_id: u16, data: &[u8]); /// Called when a core ZeroTier event occurs. 
fn event(&self, event: Event, event_data: &[u8]); @@ -89,30 +91,24 @@ pub trait NodeEventHandler { fn path_lookup(&self, address: Address, id: &Identity, desired_family: InetAddressFamily) -> Option; } -/// An instance of the ZeroTier core. -/// This is templated on the actual implementation of NodeEventHandler for performance reasons, -/// as it avoids an extra indirect function call. -#[allow(non_snake_case)] -pub struct Node + Sync + Send + Clone + 'static, N: 'static> { +pub struct NodeIntl + Sync + Send + Clone + 'static, N: Sync + Send + 'static> { event_handler: T, capi: *mut ztcore::ZT_Node, now: PortableAtomicI64, - networks_by_id: Mutex>> // pointer to an Arc<> is a raw value created from Box> + networks_by_id: Mutex>>>, } -////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - -macro_rules! node_from_raw_ptr { - ($uptr:ident) => { - unsafe { - let ntmp: *const Node = $uptr.cast::>(); - let ntmp: &Node = &*ntmp; - ntmp - } - } +/// An instance of the ZeroTier core. +/// This is templated on the actual implementations of things to avoid "dyn" indirect +/// call overhead. This is a high performance networking thingy so we care about cycles +/// where possible. +pub struct Node + Sync + Send + Clone + 'static, N: Sync + Send + 'static> { + intl: Pin>>, } -extern "C" fn zt_virtual_network_config_function + Sync + Send + Clone + 'static, N: 'static>( +/********************************************************************************************************************/ + +extern "C" fn zt_virtual_network_config_function + Sync + Send + Clone + 'static, N: Sync + Send + 'static>( _: *mut ztcore::ZT_Node, uptr: *mut c_void, _: *mut c_void, @@ -121,21 +117,18 @@ extern "C" fn zt_virtual_network_config_function + Sync + op: ztcore::ZT_VirtualNetworkConfigOperation, conf: *const ztcore::ZT_VirtualNetworkConfig, ) { - let op2 = VirtualNetworkConfigOperation::from_i32(op as i32); - if op2.is_some() { - let op2 = op2.unwrap(); - let n = node_from_raw_ptr!(uptr); - let network_obj = unsafe { &*((*nptr).cast::>()) }; + let _ = VirtualNetworkConfigOperation::from_i32(op as i32).map(|op| { + let n = unsafe { &*(uptr.cast::>()) }; if conf.is_null() { - n.event_handler.virtual_network_config(NetworkId(nwid), network_obj, op2, None); + n.event_handler.virtual_network_config(NetworkId(nwid), unsafe { &*(nptr.cast::()) }, op, None); } else { let conf2 = unsafe { VirtualNetworkConfig::new_from_capi(&*conf) }; - n.event_handler.virtual_network_config(NetworkId(nwid), network_obj, op2, Some(&conf2)); + n.event_handler.virtual_network_config(NetworkId(nwid), unsafe { &*(nptr.cast::()) }, op, Some(&conf2)); } - } + }); } -extern "C" fn zt_virtual_network_frame_function + Sync + Send + Clone + 'static, N: 'static>( +extern "C" fn zt_virtual_network_frame_function + Sync + Send + Clone + 'static, N: Sync + Send + 'static>( _: *mut ztcore::ZT_Node, uptr: *mut c_void, _: *mut c_void, @@ -149,10 +142,9 @@ extern "C" fn zt_virtual_network_frame_function + Sync + data_size: c_uint, ) { if !nptr.is_null() { - let n = node_from_raw_ptr!(uptr); - n.event_handler.virtual_network_frame( + unsafe { &*(uptr.cast::>()) }.event_handler.virtual_network_frame( NetworkId(nwid), - unsafe { &*((*nptr).cast::>()) }, + unsafe { &*(nptr.cast::()) }, MAC(source_mac), MAC(dest_mac), ethertype as u16, @@ -161,7 +153,7 @@ extern "C" fn zt_virtual_network_frame_function + Sync + } } -extern "C" fn zt_event_callback + Sync + Send + Clone + 'static, N: 'static>( +extern 
"C" fn zt_event_callback + Sync + Send + Clone + 'static, N: Sync + Send + 'static>( _: *mut ztcore::ZT_Node, uptr: *mut c_void, _: *mut c_void, @@ -169,20 +161,17 @@ extern "C" fn zt_event_callback + Sync + Send + Clone + ' data: *const c_void, data_size: c_uint ) { - let ev2 = Event::from_i32(ev as i32); - if ev2.is_some() { - let ev2 = ev2.unwrap(); - let n = node_from_raw_ptr!(uptr); + let _ = Event::from_i32(ev as i32).map(|ev: Event| { + let n = unsafe { &*(uptr.cast::>()) }; if data.is_null() { - n.event_handler.event(ev2, &[0_u8; 0]); + n.event_handler.event(ev, &EMPTY_BYTE_ARRAY); } else { - let data2 = unsafe { &*slice_from_raw_parts(data.cast::(), data_size as usize) }; - n.event_handler.event(ev2, data2); + n.event_handler.event(ev, unsafe { &*slice_from_raw_parts(data.cast::(), data_size as usize) }); } - } + }); } -extern "C" fn zt_state_put_function + Sync + Send + Clone + 'static, N: 'static>( +extern "C" fn zt_state_put_function + Sync + Send + Clone + 'static, N: Sync + Send + 'static>( _: *mut ztcore::ZT_Node, uptr: *mut c_void, _: *mut c_void, @@ -192,17 +181,13 @@ extern "C" fn zt_state_put_function + Sync + Send + Clone obj_data: *const c_void, obj_data_len: c_int, ) { - let obj_type2 = StateObjectType::from_i32(obj_type as i32); - if obj_type2.is_some() { - let obj_type2 = obj_type2.unwrap(); - let n = node_from_raw_ptr!(uptr); - let obj_id2 = unsafe { &*slice_from_raw_parts(obj_id, obj_id_len as usize) }; - let obj_data2 = unsafe { &*slice_from_raw_parts(obj_data.cast::(), obj_data_len as usize) }; - let _ = n.event_handler.state_put(obj_type2, obj_id2, obj_data2); - } + let _ = StateObjectType::from_i32(obj_type as i32).map(|obj_type| { + let n = unsafe { &*(uptr.cast::>()) }; + let _ = n.event_handler.state_put(obj_type, unsafe { &*slice_from_raw_parts(obj_id, obj_id_len as usize) }, unsafe { &*slice_from_raw_parts(obj_data.cast::(), obj_data_len as usize) }); + }); } -extern "C" fn zt_state_get_function + Sync + Send + Clone + 'static, N: 'static>( +extern "C" fn zt_state_get_function + Sync + Send + Clone + 'static, N: Sync + Send + 'static>( _: *mut ztcore::ZT_Node, uptr: *mut c_void, _: *mut c_void, @@ -213,38 +198,39 @@ extern "C" fn zt_state_get_function + Sync + Send + Clone obj_data_free_function: *mut *mut c_void, ) -> c_int { if obj_data.is_null() || obj_data_free_function.is_null() { - return -1; - } - unsafe { - *obj_data = null_mut(); - *obj_data_free_function = transmute(ztcore::free as *const ()); - } - - let obj_type2 = StateObjectType::from_i32(obj_type as i32); - if obj_type2.is_some() { - let obj_type2 = obj_type2.unwrap(); - let n = node_from_raw_ptr!(uptr); - let obj_id2 = unsafe { &*slice_from_raw_parts(obj_id, obj_id_len as usize) }; - let obj_data_result = n.event_handler.state_get(obj_type2, obj_id2); - if obj_data_result.is_ok() { - let obj_data_result = obj_data_result.unwrap(); - if obj_data_result.len() > 0 { - unsafe { - let obj_data_len: c_int = obj_data_result.len() as c_int; - let obj_data_raw = ztcore::malloc(obj_data_len as c_ulong); - if !obj_data_raw.is_null() { - copy_nonoverlapping(obj_data_result.as_ptr(), obj_data_raw.cast::(), obj_data_len as usize); - *obj_data = obj_data_raw; - return obj_data_len; - } - } - } + -1 as c_int + } else { + unsafe { + *obj_data = null_mut(); + *obj_data_free_function = transmute(ztcore::free as *const ()); } + StateObjectType::from_i32(obj_type as i32).map_or_else(|| { + -1 as c_int + }, |obj_type| { + unsafe { &*(uptr.cast::>()) }.event_handler.state_get(obj_type, unsafe { 
&*slice_from_raw_parts(obj_id, obj_id_len as usize) }).map_or_else(|_| { + -1 as c_int + }, |obj_data_result| { + let obj_data_len = obj_data_result.len() as c_int; + if obj_data_len > 0 { + unsafe { + let obj_data_raw = ztcore::malloc(obj_data_len as c_ulong); + if obj_data_raw.is_null() { + -1 as c_int + } else { + copy_nonoverlapping(obj_data_result.as_ptr(), obj_data_raw.cast::(), obj_data_len as usize); + *obj_data = obj_data_raw; + obj_data_len + } + } + } else { + -1 as c_int + } + }) + }) } - return -1; } -extern "C" fn zt_wire_packet_send_function + Sync + Send + Clone + 'static, N: 'static>( +extern "C" fn zt_wire_packet_send_function + Sync + Send + Clone + 'static, N: Sync + Send + 'static>( _: *mut ztcore::ZT_Node, uptr: *mut c_void, _: *mut c_void, @@ -254,11 +240,10 @@ extern "C" fn zt_wire_packet_send_function + Sync + Send data_size: c_uint, packet_ttl: c_uint, ) -> c_int { - let n = node_from_raw_ptr!(uptr); - return n.event_handler.wire_packet_send(local_socket, InetAddress::transmute_capi(unsafe { &*sock_addr }), unsafe { &*slice_from_raw_parts(data.cast::(), data_size as usize) }, packet_ttl as u32) as c_int; + unsafe { &*(uptr.cast::>()) }.event_handler.wire_packet_send(local_socket, InetAddress::transmute_capi(unsafe { &*sock_addr }), unsafe { &*slice_from_raw_parts(data.cast::(), data_size as usize) }, packet_ttl as u32) as c_int } -extern "C" fn zt_path_check_function + Sync + Send + Clone + 'static, N: 'static>( +extern "C" fn zt_path_check_function + Sync + Send + Clone + 'static, N: Sync + Send + 'static>( _: *mut ztcore::ZT_Node, uptr: *mut c_void, _: *mut c_void, @@ -267,15 +252,11 @@ extern "C" fn zt_path_check_function + Sync + Send + Clon local_socket: i64, sock_addr: *const ztcore::ZT_InetAddress, ) -> c_int { - let n = node_from_raw_ptr!(uptr); let id = Identity::new_from_capi(identity, false); - if n.event_handler.path_check(Address(address), &id, local_socket, InetAddress::transmute_capi(unsafe{ &*sock_addr })) { - return 1; - } - return 0; + unsafe { &*(uptr.cast::>()) }.event_handler.path_check(Address(address), &id, local_socket, InetAddress::transmute_capi(unsafe{ &*sock_addr })) as c_int } -extern "C" fn zt_path_lookup_function + Sync + Send + Clone + 'static, N: 'static>( +extern "C" fn zt_path_lookup_function + Sync + Send + Clone + 'static, N: Sync + Send + 'static>( _: *mut ztcore::ZT_Node, uptr: *mut c_void, _: *mut c_void, @@ -298,34 +279,34 @@ extern "C" fn zt_path_lookup_function + Sync + Send + Clo } } - let n = node_from_raw_ptr!(uptr); let id = Identity::new_from_capi(identity, false); - let result = n.event_handler.path_lookup(Address(address), &id, sock_family2); - if result.is_some() { - let result = result.unwrap(); + unsafe { &*(uptr.cast::>()) }.event_handler.path_lookup(Address(address), &id, sock_family2).map_or_else(|| { + 0 as c_int + }, |result| { let result_ptr = &result as *const InetAddress; unsafe { copy_nonoverlapping(result_ptr.cast::(), sock_addr, 1); } - return 1; - } - return 0; + 1 as c_int + }) } -////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +/********************************************************************************************************************/ -impl + Sync + Send + Clone + 'static, N: 'static> Node { +impl + Sync + Send + Clone + 'static, N: Sync + Send + 'static> Node { /// Create a new Node with a given event handler. 
+ #[allow(unused_mut)] pub fn new(event_handler: T, now: i64) -> Result, ResultCode> { let mut n = Node { - event_handler: event_handler.clone(), - capi: null_mut(), - now: PortableAtomicI64::new(now), - networks_by_id: Mutex::new(HashMap::new()) + intl: Box::pin(NodeIntl { + event_handler: event_handler.clone(), + capi: null_mut(), + now: PortableAtomicI64::new(now), + networks_by_id: Mutex::new(HashMap::new()) + }), }; - let mut capi: *mut ztcore::ZT_Node = null_mut(); - unsafe { + let rc = unsafe { let callbacks = ztcore::ZT_Node_Callbacks { statePutFunction: transmute(zt_state_put_function:: as *const ()), stateGetFunction: transmute(zt_state_get_function:: as *const ()), @@ -336,49 +317,34 @@ impl + Sync + Send + Clone + 'static, N: 'static> Node as *const ()), pathLookupFunction: transmute(zt_path_lookup_function:: as *const ()), }; - let rc = ztcore::ZT_Node_new(&mut capi as *mut *mut ztcore::ZT_Node, transmute(&n as *const Node), null_mut(), &callbacks as *const ztcore::ZT_Node_Callbacks, now); - if rc != 0 { - return Err(ResultCode::from_i32(rc as i32).unwrap_or(ResultCode::FatalErrorInternal)); - } else if capi.is_null() { - return Err(ResultCode::FatalErrorInternal); - } - } - n.capi = capi; + ztcore::ZT_Node_new(transmute(&(n.intl.capi) as *const *mut ztcore::ZT_Node), transmute(&*n.intl as *const NodeIntl), null_mut(), &callbacks, now) + }; - Ok(n) + if rc == 0 { + assert!(!n.intl.capi.is_null()); + Ok(n) + } else { + Err(ResultCode::from_i32(rc as i32).unwrap_or(ResultCode::FatalErrorInternal)) + } } /// Perform periodic background tasks. /// The first call should happen no more than NODE_BACKGROUND_TASKS_MAX_INTERVAL milliseconds /// since the node was created, and after this runs it returns the amount of time the caller /// should wait before calling it again. - pub fn process_background_tasks(&self, now: i64) -> u64 { - self.now.set(now); - + pub fn process_background_tasks(&self, now: i64) -> i64 { + self.intl.now.set(now); let mut next_task_deadline: i64 = now; unsafe { - ztcore::ZT_Node_processBackgroundTasks(self.capi, null_mut(), now, (&mut next_task_deadline as *mut i64).cast()); + ztcore::ZT_Node_processBackgroundTasks(self.intl.capi, null_mut(), now, (&mut next_task_deadline as *mut i64).cast()); } - let mut next_delay = next_task_deadline - now; - - if next_delay < 1 { - next_delay = 1; - } else if next_delay > NODE_BACKGROUND_TASKS_MAX_INTERVAL as i64 { - next_delay = NODE_BACKGROUND_TASKS_MAX_INTERVAL as i64; - } - next_delay as u64 + (next_task_deadline - now).clamp(1_i64, NODE_BACKGROUND_TASKS_MAX_INTERVAL) } - fn delete_network_uptr(&self, nwid: u64) { - let nptr = self.networks_by_id.lock().unwrap().remove(&nwid).unwrap_or(null_mut()); - if !nptr.is_null() { - unsafe { - Box::from_raw(nptr); - } - } - } - - pub fn join(&self, nwid: NetworkId, controller_fingerprint: Option, network_obj: &Arc) -> ResultCode { + /// Join a network, associating network_obj with it. + /// If a fingerprint is supplied it will be used as a full sha384 fingerprint of the + /// network's controller. 
+ pub fn join(&self, nwid: NetworkId, controller_fingerprint: Option, network_obj: N) -> ResultCode { let mut cfp: MaybeUninit = MaybeUninit::uninit(); let mut cfpp: *mut ztcore::ZT_Fingerprint = null_mut(); if controller_fingerprint.is_some() { @@ -390,59 +356,74 @@ impl + Sync + Send + Clone + 'static, N: 'static> Node(), null_mut()) }; - if rc != ztcore::ZT_ResultCode_ZT_RESULT_OK { - self.delete_network_uptr(nwid.0); + let network_obj = Box::pin(network_obj); + let rc = unsafe { ztcore::ZT_Node_join(self.intl.capi, nwid.0, cfpp, transmute((&*network_obj) as *const N), null_mut()) }; + if rc == ztcore::ZT_ResultCode_ZT_RESULT_OK { + self.intl.networks_by_id.lock().unwrap().insert(nwid.0, network_obj); + ResultCode::Ok + } else { + ResultCode::from_i32(rc as i32).unwrap_or(ResultCode::ErrorInternalNonFatal) } - return ResultCode::from_i32(rc as i32).unwrap_or(ResultCode::ErrorInternalNonFatal); } + /// Leave a network. pub fn leave(&self, nwid: NetworkId) -> ResultCode { - self.delete_network_uptr(nwid.0); - unsafe { - return ResultCode::from_i32(ztcore::ZT_Node_leave(self.capi, nwid.0, null_mut(), null_mut()) as i32).unwrap_or(ResultCode::ErrorInternalNonFatal); - } + self.intl.networks_by_id.lock().unwrap().remove(&nwid.0).map_or_else(|| { + ResultCode::ErrorNetworkNotFound + }, |_| { + unsafe { ResultCode::from_i32(ztcore::ZT_Node_leave(self.intl.capi, nwid.0, null_mut(), null_mut()) as i32).unwrap_or(ResultCode::ErrorInternalNonFatal) } + }) + } + + /// Access a network's associated network object. + /// This executes the supplied function or closure if we are joined to + /// this network, providing its associated network object as a parameter. + /// This happens while the internal data structure is locked, so do not + /// do anything time consuming while inside this function. The return value + /// (if any) of this function is returned, or None if we are not joined to + /// this network. 
+ #[inline(always)] + pub fn network R, R>(&self, nwid: NetworkId, f: F) -> Option { + self.intl.networks_by_id.lock().unwrap().get(&nwid.0).map_or_else(|| { + None + }, |nw| { + Some(f(&*nw)) + }) } #[inline(always)] pub fn address(&self) -> Address { - unsafe { - return Address(ztcore::ZT_Node_address(self.capi) as u64); - } + unsafe { Address(ztcore::ZT_Node_address(self.intl.capi) as u64) } } #[inline(always)] pub fn process_wire_packet(&self, local_socket: i64, remote_address: &InetAddress, data: Buffer) -> ResultCode { - let current_time = self.now.get(); + let intl = &*self.intl; + let current_time = intl.now.get(); let mut next_task_deadline: i64 = current_time; - let rc = unsafe { ResultCode::from_i32(ztcore::ZT_Node_processWirePacket(self.capi, null_mut(), current_time, local_socket, remote_address.as_capi_ptr(), data.zt_core_buf as *const c_void, data.data_size as u32, 1, &mut next_task_deadline as *mut i64) as i32).unwrap_or(ResultCode::ErrorInternalNonFatal) }; + let rc = unsafe { ResultCode::from_i32(ztcore::ZT_Node_processWirePacket(intl.capi, null_mut(), current_time, local_socket, remote_address.as_capi_ptr(), data.zt_core_buf as *const c_void, data.data_size as u32, 1, &mut next_task_deadline as *mut i64) as i32).unwrap_or(ResultCode::ErrorInternalNonFatal) }; std::mem::forget(data); // prevent Buffer from being returned to ZT core twice, see comment in drop() in buffer.rs rc } #[inline(always)] pub fn process_virtual_network_frame(&self, nwid: &NetworkId, source_mac: &MAC, dest_mac: &MAC, ethertype: u16, vlan_id: u16, data: Buffer) -> ResultCode { - let current_time = self.now.get(); + let intl = &*self.intl; + let current_time = intl.now.get(); let mut next_tick_deadline: i64 = current_time; - let rc = unsafe { ResultCode::from_i32(ztcore::ZT_Node_processVirtualNetworkFrame(self.capi, null_mut(), current_time, nwid.0, source_mac.0, dest_mac.0, ethertype as c_uint, vlan_id as c_uint, data.zt_core_buf as *const c_void, data.data_size as u32, 1, &mut next_tick_deadline as *mut i64) as i32).unwrap_or(ResultCode::ErrorInternalNonFatal) }; + let rc = unsafe { ResultCode::from_i32(ztcore::ZT_Node_processVirtualNetworkFrame(intl.capi, null_mut(), current_time, nwid.0, source_mac.0, dest_mac.0, ethertype as c_uint, vlan_id as c_uint, data.zt_core_buf as *const c_void, data.data_size as u32, 1, &mut next_tick_deadline as *mut i64) as i32).unwrap_or(ResultCode::ErrorInternalNonFatal) }; std::mem::forget(data); // prevent Buffer from being returned to ZT core twice, see comment in drop() in buffer.rs rc } #[inline(always)] pub fn multicast_subscribe(&self, nwid: &NetworkId, multicast_group: &MAC, multicast_adi: u32) -> ResultCode { - unsafe { - return ResultCode::from_i32(ztcore::ZT_Node_multicastSubscribe(self.capi, null_mut(), nwid.0, multicast_group.0, multicast_adi as c_ulong) as i32).unwrap_or(ResultCode::ErrorInternalNonFatal); - } + unsafe { ResultCode::from_i32(ztcore::ZT_Node_multicastSubscribe(self.intl.capi, null_mut(), nwid.0, multicast_group.0, multicast_adi as c_ulong) as i32).unwrap_or(ResultCode::ErrorInternalNonFatal) } } #[inline(always)] pub fn multicast_unsubscribe(&self, nwid: &NetworkId, multicast_group: &MAC, multicast_adi: u32) -> ResultCode { - unsafe { - return ResultCode::from_i32(ztcore::ZT_Node_multicastUnsubscribe(self.capi, nwid.0, multicast_group.0, multicast_adi as c_ulong) as i32).unwrap_or(ResultCode::ErrorInternalNonFatal); - } + unsafe { ResultCode::from_i32(ztcore::ZT_Node_multicastUnsubscribe(self.intl.capi, nwid.0, multicast_group.0, 
multicast_adi as c_ulong) as i32).unwrap_or(ResultCode::ErrorInternalNonFatal) } } /// Get a copy of this node's identity. @@ -456,31 +437,31 @@ impl + Sync + Send + Clone + 'static, N: 'static> Node Identity { - Identity::new_from_capi(ztcore::ZT_Node_identity(self.capi), false) + Identity::new_from_capi(ztcore::ZT_Node_identity(self.intl.capi), false) } pub fn status(&self) -> NodeStatus { let mut ns: MaybeUninit = MaybeUninit::zeroed(); unsafe { - ztcore::ZT_Node_status(self.capi, ns.as_mut_ptr()); + ztcore::ZT_Node_status(self.intl.capi, ns.as_mut_ptr()); let ns = ns.assume_init(); if ns.identity.is_null() { panic!("ZT_Node_status() returned null identity"); } - return NodeStatus { + NodeStatus { address: Address(ns.address), identity: Identity::new_from_capi(&*ns.identity, false).clone(), public_identity: cstr_to_string(ns.publicIdentity, -1), secret_identity: cstr_to_string(ns.secretIdentity, -1), online: ns.online != 0, - }; + } } } pub fn peers(&self) -> Vec { let mut p: Vec = Vec::new(); unsafe { - let pl = ztcore::ZT_Node_peers(self.capi); + let pl = ztcore::ZT_Node_peers(self.intl.capi); if !pl.is_null() { let peer_count = (*pl).peerCount as usize; p.reserve(peer_count); @@ -496,7 +477,7 @@ impl + Sync + Send + Clone + 'static, N: 'static> Node Vec { let mut n: Vec = Vec::new(); unsafe { - let nl = ztcore::ZT_Node_networks(self.capi); + let nl = ztcore::ZT_Node_networks(self.intl.capi); if !nl.is_null() { let net_count = (*nl).networkCount as usize; n.reserve(net_count); @@ -512,7 +493,7 @@ impl + Sync + Send + Clone + 'static, N: 'static> Node Vec<(Certificate, u32)> { let mut c: Vec<(Certificate, u32)> = Vec::new(); unsafe { - let cl = ztcore::ZT_Node_listCertificates(self.capi); + let cl = ztcore::ZT_Node_listCertificates(self.intl.capi); if !cl.is_null() { let cert_count = (*cl).certCount as usize; c.reserve(cert_count); @@ -526,23 +507,14 @@ impl + Sync + Send + Clone + 'static, N: 'static> Node + Sync + Send + Clone + 'static, N: 'static> Sync for Node {} +unsafe impl + Sync + Send + Clone + 'static, N: Sync + Send + 'static> Sync for Node {} -unsafe impl + Sync + Send + Clone + 'static, N: 'static> Send for Node {} +unsafe impl + Sync + Send + Clone + 'static, N: Sync + Send + 'static> Send for Node {} -impl + Sync + Send + Clone + 'static,N: 'static> Drop for Node { +impl + Sync + Send + Clone + 'static, N: Sync + Send + 'static> Drop for Node { fn drop(&mut self) { - // Manually take care of the unboxed Boxes in networks_by_id - let mut nwids: Vec = Vec::new(); - for n in self.networks_by_id.lock().unwrap().iter() { - nwids.push(*n.0); - } - for nwid in nwids.iter() { - self.delete_network_uptr(*nwid); - } - unsafe { - ztcore::ZT_Node_delete(self.capi, null_mut()); + ztcore::ZT_Node_delete(self.intl.capi, null_mut()); } } } diff --git a/rust-zerotier-service/Cargo.lock b/rust-zerotier-service/Cargo.lock index 6fc7fc233..206604253 100644 --- a/rust-zerotier-service/Cargo.lock +++ b/rust-zerotier-service/Cargo.lock @@ -307,17 +307,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "net2" -version = "0.2.37" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "391630d12b68002ae1e25e8f974306474966550ad82dac6886fb8910c19568ae" -dependencies = [ - "cfg-if 0.1.10", - "libc", - "winapi", -] - [[package]] name = "ntapi" version = "0.3.6" @@ -675,12 +664,12 @@ dependencies = [ "hex", "hyper", "lazy_static", - "net2", "num-derive", "num-traits", "num_cpus", "serde", "serde_json", + "socket2", "tokio", "winapi", "zerotier-core", diff --git 
a/rust-zerotier-service/Cargo.toml b/rust-zerotier-service/Cargo.toml index e2c209c31..dfb466187 100644 --- a/rust-zerotier-service/Cargo.toml +++ b/rust-zerotier-service/Cargo.toml @@ -21,7 +21,7 @@ lazy_static = "1" num-traits = "0" num-derive = "0" hyper = { version = "0", features = ["http1", "runtime", "server", "client", "tcp", "stream"] } -net2 = "0.2" +socket2 = { version = "0", features = ["reuseport", "unix", "pair"] } [target."cfg(windows)".dependencies] winapi = { version = "0.3.9", features = ["handleapi", "ws2ipdef", "ws2tcpip"] } diff --git a/rust-zerotier-service/src/fastudpsocket.rs b/rust-zerotier-service/src/fastudpsocket.rs index 730edd5e4..a0a5ea331 100644 --- a/rust-zerotier-service/src/fastudpsocket.rs +++ b/rust-zerotier-service/src/fastudpsocket.rs @@ -18,6 +18,8 @@ use num_traits::cast::AsPrimitive; use std::os::raw::c_int; use crate::osdep as osdep; +#[cfg(target_os = "linux")] + /* * This is a threaded UDP socket listener for high performance. The fastest way to receive UDP * (without heroic efforts like kernel bypass) on most platforms is to create a separate socket @@ -49,7 +51,7 @@ pub(crate) fn test_bind_udp(port: u16) -> (bool, bool) { } #[cfg(unix)] -fn bind_udp_socket(_: &str, address: &InetAddress) -> Result { +fn bind_udp_socket(_device_name: &str, address: &InetAddress) -> Result { unsafe { let af; let sa_len; @@ -95,6 +97,22 @@ fn bind_udp_socket(_: &str, address: &InetAddress) -> Result>, local_config: Mutex>, + store: Arc, run: AtomicBool, online: AtomicBool, } @@ -47,28 +48,27 @@ struct ServiceIntl { #[derive(Clone)] pub(crate) struct Service { pub(crate) log: Arc, - pub(crate) store: Arc, _node: Weak>, intl: Arc, } impl NodeEventHandler for Service { #[inline(always)] - fn virtual_network_config(&self, network_id: NetworkId, network_obj: &Arc, config_op: VirtualNetworkConfigOperation, config: Option<&VirtualNetworkConfig>) {} + fn virtual_network_config(&self, network_id: NetworkId, network_obj: &Network, config_op: VirtualNetworkConfigOperation, config: Option<&VirtualNetworkConfig>) {} #[inline(always)] - fn virtual_network_frame(&self, network_id: NetworkId, network_obj: &Arc, source_mac: MAC, dest_mac: MAC, ethertype: u16, vlan_id: u16, data: &[u8]) {} + fn virtual_network_frame(&self, network_id: NetworkId, network_obj: &Network, source_mac: MAC, dest_mac: MAC, ethertype: u16, vlan_id: u16, data: &[u8]) {} #[inline(always)] fn event(&self, event: Event, event_data: &[u8]) { match event { Event::Up => { - d!(self.log, "node started up in data store '{}'", self.store.base_path.to_str().unwrap()); + d!(self.log, "node startup event received."); } Event::Down => { - d!(self.log, "node shutting down."); - self.intl.run.store(false, Ordering::Relaxed); + d!(self.log, "node shutdown event received."); + self.intl.online.store(false, Ordering::Relaxed); } Event::Online => { @@ -78,7 +78,7 @@ impl NodeEventHandler for Service { Event::Offline => { d!(self.log, "node is offline."); - self.intl.online.store(true, Ordering::Relaxed); + self.intl.online.store(false, Ordering::Relaxed); } Event::Trace => { @@ -108,16 +108,16 @@ impl NodeEventHandler for Service { #[inline(always)] fn state_put(&self, obj_type: StateObjectType, obj_id: &[u64], obj_data: &[u8]) -> std::io::Result<()> { if !obj_data.is_empty() { - self.store.store_object(&obj_type, obj_id, obj_data) + self.intl.store.store_object(&obj_type, obj_id, obj_data) } else { - self.store.erase_object(&obj_type, obj_id); + self.intl.store.erase_object(&obj_type, obj_id); Ok(()) } } #[inline(always)] 
fn state_get(&self, obj_type: StateObjectType, obj_id: &[u64]) -> std::io::Result> { - self.store.load_object(&obj_type, obj_id) + self.intl.store.load_object(&obj_type, obj_id) } #[inline(always)] @@ -160,14 +160,19 @@ impl Service { } /// Get the node running with this service. - /// This should never return None, but technically it could if Service - /// persisted beyond the life span of Node. Check this to be technically - /// pure "safe" Rust. + /// This can return None if we are in the midst of shutdown. In this case + /// whatever operation is in progress should abort. None will never be + /// returned during normal operation. #[inline(always)] pub fn node(&self) -> Option>> { self._node.upgrade() } + #[inline(always)] + pub fn store(&self) -> &Arc { + &self.intl.store + } + #[inline(always)] pub fn online(&self) -> bool { self.intl.online.load(Ordering::Relaxed) @@ -183,7 +188,13 @@ unsafe impl Send for Service {} unsafe impl Sync for Service {} -async fn run_async(store: &Arc, auth_token: String, log: Arc, local_config: Arc) -> i32 { +impl Drop for Service { + fn drop(&mut self) { + self.log.debug("Service::drop()"); + } +} + +async fn run_async(store: &Arc, auth_token: String, log: &Arc, local_config: Arc) -> i32 { let mut process_exit_value: i32 = 0; let mut udp_sockets: BTreeMap = BTreeMap::new(); @@ -193,12 +204,12 @@ async fn run_async(store: &Arc, auth_token: String, log: Arc, local_ let (interrupt_tx, mut interrupt_rx) = futures::channel::mpsc::channel::<()>(1); let mut service = Service { log: log.clone(), - store: store.clone(), _node: Weak::new(), intl: Arc::new(ServiceIntl { auth_token, interrupt: Mutex::new(interrupt_tx), local_config: Mutex::new(local_config), + store: store.clone(), run: AtomicBool::new(true), online: AtomicBool::new(false), }), @@ -214,20 +225,17 @@ async fn run_async(store: &Arc, auth_token: String, log: Arc, local_ service._node = Arc::downgrade(&node); let service = service; // make immutable after setting node + let mut local_config = service.local_config(); + store.write_port(local_config.settings.primary_port); + let mut now: i64 = ms_since_epoch(); let mut loop_delay = zerotier_core::NODE_BACKGROUND_TASKS_MAX_INTERVAL; let mut last_checked_config: i64 = 0; - let mut local_config = service.local_config(); while service.intl.run.load(Ordering::Relaxed) { let loop_start = ms_since_epoch(); - // Write zerotier.port which is used by the CLI to know how to reach the HTTP API. - //let _ = store.write_port(local_config.settings.primary_port); - - // Wait for (1) loop delay elapsed, (2) a signal to interrupt delay now, or - // (3) an external signal to exit. tokio::select! { - _ = tokio::time::sleep(Duration::from_millis(loop_delay)) => { + _ = tokio::time::sleep(Duration::from_millis(loop_delay as u64)) => { now = ms_since_epoch(); let actual_delay = now - loop_start; if actual_delay > ((loop_delay as i64) * 4_i64) { @@ -249,23 +257,20 @@ async fn run_async(store: &Arc, auth_token: String, log: Arc, local_ }, } - // Check every CONFIG_CHECK_INTERVAL for changes to either the system configuration - // or the node's local configuration and take actions as needed. if (now - last_checked_config) >= CONFIG_CHECK_INTERVAL { last_checked_config = now; - // Check for changes to local.conf. let new_config = store.read_local_conf(true); if new_config.is_ok() { d!(log, "local.conf changed on disk, reloading."); service.set_local_config(new_config.unwrap()); } - // Check for and handle configuration changes, some of which require inner loop restart. 
let next_local_config = service.local_config(); if local_config.settings.primary_port != next_local_config.settings.primary_port { local_web_listeners.0 = None; local_web_listeners.1 = None; + store.write_port(next_local_config.settings.primary_port); } if local_config.settings.log.max_size != next_local_config.settings.log.max_size { log.set_max_size(next_local_config.settings.log.max_size); @@ -278,6 +283,7 @@ async fn run_async(store: &Arc, auth_token: String, log: Arc, local_ } local_config = next_local_config; + let mut loopback_dev_name = String::new(); let mut system_addrs: BTreeMap = BTreeMap::new(); getifaddrs::for_each_address(|addr: &InetAddress, dev: &str| { match addr.ip_scope() { @@ -292,8 +298,13 @@ async fn run_async(store: &Arc, auth_token: String, log: Arc, local_ system_addrs.insert(a, String::from(dev)); } } - } - _ => {} + }, + IpScope::Loopback => { + if loopback_dev_name.is_empty() { + loopback_dev_name.push_str(dev); + } + }, + _ => {}, } }); @@ -378,12 +389,12 @@ async fn run_async(store: &Arc, auth_token: String, log: Arc, local_ } if local_web_listeners.0.is_none() { - let _ = WebListener::new("", SocketAddr::new(IpAddr::from(Ipv4Addr::LOCALHOST), local_config.settings.primary_port), &service).map(|wl| { + let _ = WebListener::new(loopback_dev_name.as_str(), SocketAddr::new(IpAddr::from(Ipv4Addr::LOCALHOST), local_config.settings.primary_port), &service).map(|wl| { local_web_listeners.0 = Some(wl); }); } if local_web_listeners.1.is_none() { - let _ = WebListener::new("", SocketAddr::new(IpAddr::from(Ipv6Addr::LOCALHOST), local_config.settings.primary_port), &service).map(|wl| { + let _ = WebListener::new(loopback_dev_name.as_str(), SocketAddr::new(IpAddr::from(Ipv6Addr::LOCALHOST), local_config.settings.primary_port), &service).map(|wl| { local_web_listeners.1 = Some(wl); }); } @@ -396,13 +407,13 @@ async fn run_async(store: &Arc, auth_token: String, log: Arc, local_ loop_delay = node.process_background_tasks(now); } - l!(log, "shutting down normally..."); + l!(log, "shutting down normally."); drop(udp_sockets); drop(web_listeners); drop(local_web_listeners); - drop(service); drop(node); + drop(service); d!(log, "shutdown complete."); @@ -410,8 +421,6 @@ async fn run_async(store: &Arc, auth_token: String, log: Arc, local_ } pub(crate) fn run(store: &Arc, auth_token: Option) -> i32 { - let mut process_exit_value: i32 = 0; - let local_config = Arc::new(store.read_local_conf(false).unwrap_or_else(|_| { LocalConfig::default() })); let log = Arc::new(Log::new( @@ -451,7 +460,7 @@ pub(crate) fn run(store: &Arc, auth_token: Option) -> i32 { } let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap(); - process_exit_value = rt.block_on(async move { run_async(store, auth_token, log, local_config).await }); + let process_exit_value = rt.block_on(async move { run_async(store, auth_token, &log, local_config).await }); rt.shutdown_timeout(Duration::from_millis(500)); process_exit_value diff --git a/rust-zerotier-service/src/weblistener.rs b/rust-zerotier-service/src/weblistener.rs index 7b3df9852..79181677a 100644 --- a/rust-zerotier-service/src/weblistener.rs +++ b/rust-zerotier-service/src/weblistener.rs @@ -11,31 +11,31 @@ */ /****/ -use std::any::Any; use std::cell::RefCell; use std::convert::Infallible; use std::net::{SocketAddr, TcpListener}; -use std::sync::Arc; -use std::sync::atomic::{AtomicBool, Ordering}; -use futures::TryFutureExt; use hyper::{Body, Request, Response}; use hyper::server::Server; use hyper::service::{make_service_fn, 
service_fn}; -use net2::TcpBuilder; -#[cfg(unix)] use net2::unix::UnixTcpBuilderExt; use tokio::task::JoinHandle; use zerotier_core::InetAddress; use crate::service::Service; +#[cfg(target_os = "linux")] +use std::os::unix::io::AsRawFd; + +/// Handles API dispatch and other HTTP handler stuff. #[inline(always)] async fn web_handler(service: Service, req: Request) -> Result, Infallible> { Ok(Response::new("Hello, World".into())) } /// Listener for http connections to the API or for TCP P2P. +/// Dropping a listener initiates shutdown of the background hyper Server instance, +/// but it might not shut down instantly as this occurs asynchronously. pub(crate) struct WebListener { shutdown_tx: RefCell>>, server: JoinHandle>, @@ -45,37 +45,50 @@ impl WebListener { /// Create a new "background" TCP WebListener using the current tokio reactor async runtime. pub fn new(_device_name: &str, addr: SocketAddr, service: &Service) -> Result> { let listener = if addr.is_ipv4() { - let l = TcpBuilder::new_v4(); + let l = socket2::Socket::new(socket2::Domain::ipv4(), socket2::Type::stream(), Some(socket2::Protocol::tcp())); if l.is_err() { return Err(Box::new(l.err().unwrap())); } let l = l.unwrap(); #[cfg(unix)] { - let _ = l.reuse_port(true); + let _ = l.set_reuse_port(true); } l } else { - let l = TcpBuilder::new_v6(); + let l = socket2::Socket::new(socket2::Domain::ipv6(), socket2::Type::stream(), Some(socket2::Protocol::tcp())); if l.is_err() { return Err(Box::new(l.err().unwrap())); } let l = l.unwrap(); - let _ = l.only_v6(true); - #[cfg(unix)] { - let _ = l.reuse_port(true); - } + let _ = l.set_only_v6(true); l }; - // TODO: bind to device on Linux? - let listener = listener.bind(addr); - if listener.is_err() { - return Err(Box::new(listener.err().unwrap())); + + #[cfg(target_os = "linux")] { + if !_device_name.is_empty() { + let sock = listener.as_raw_fd(); + unsafe { + let _ = std::ffi::CString::new(_device_name).map(|dn| { + let dnb = dn.as_bytes_with_nul(); + let _ = crate::osdep::setsockopt( + sock as std::os::raw::c_int, + crate::osdep::SOL_SOCKET as std::os::raw::c_int, + crate::osdep::SO_BINDTODEVICE as std::os::raw::c_int, + dnb.as_ptr().cast(), + (dnb.len() - 1) as crate::osdep::socklen_t); + }); + } + } } - let listener = listener.unwrap().listen(128); - if listener.is_err() { - return Err(Box::new(listener.err().unwrap())); + + let addr = socket2::SockAddr::from(addr); + if let Err(e) = listener.bind(&addr) { + return Err(Box::new(e)); } - let listener = listener.unwrap(); + if let Err(e) = listener.listen(128) { + return Err(Box::new(e)); + } + let listener = listener.into_tcp_listener(); let builder = Server::from_tcp(listener); if builder.is_err() { @@ -109,7 +122,9 @@ impl WebListener { impl Drop for WebListener { fn drop(&mut self) { - let _ = self.shutdown_tx.take().map(|tx| { tx.send(()); }); - self.server.abort(); + let _ = self.shutdown_tx.take().map(|tx| { + let _ = tx.send(()); + self.server.abort(); + }); } }
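
A note on the central change in node.rs above: `Node` now owns its internal state through a `Pin<Box<NodeIntl<T, N>>>` and registers a raw pointer to that pinned allocation as the `uptr` the C core passes back into every callback, so each callback can cast `uptr` straight back to a `&NodeIntl` without caring whether the outer `Node` value has since been moved. A minimal sketch of that ownership pattern, with a stand-in `callback` in place of the real `ZT_Node_*` functions (the `Intl` and `callback` names are illustrative only, not part of the patch):

    use std::os::raw::c_void;
    use std::pin::Pin;

    // Stand-in for the state the C core must reach from its callbacks
    // (NodeIntl<T, N> in the patch).
    struct Intl {
        counter: u64,
    }

    // Stand-in for an extern "C" callback invoked with the user pointer
    // supplied at registration time.
    extern "C" fn callback(uptr: *mut c_void) {
        // Sound only because the Intl allocation is pinned (never moves)
        // and outlives every callback invocation.
        let intl = unsafe { &*(uptr.cast::<Intl>()) };
        println!("counter = {}", intl.counter);
    }

    struct Node {
        intl: Pin<Box<Intl>>,
    }

    impl Node {
        fn new() -> Node {
            let n = Node { intl: Box::pin(Intl { counter: 42 }) };
            // The pointer handed to C is derived from the pinned heap
            // allocation, not from `n`, so moving `n` later is harmless.
            let uptr = (&*n.intl as *const Intl) as *mut c_void;
            callback(uptr); // in the patch: passed to ZT_Node_new()
            n
        }
    }

    fn main() {
        let node = Node::new();
        let moved = node; // moving Node does not move the pinned Intl
        callback((&*moved.intl as *const Intl) as *mut c_void);
    }

The same reasoning drives the new `join()`: each caller-supplied network object is pinned in a `Box`, its address is handed to the core as `nptr`, and the pinned box is kept in `networks_by_id` so that pointer stays valid until the network is left.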
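The wrapper's `process_background_tasks()` now takes and returns `i64` milliseconds, clamped to `1..=NODE_BACKGROUND_TASKS_MAX_INTERVAL`, and service.rs simply sleeps for whatever it returns. A hedged sketch of a standalone driver loop built on that contract; the generic bounds are inferred from the patch, the items are assumed to be re-exported at the `zerotier_core` crate root, and `ms_since_epoch()` here is a local helper rather than the one in the service crate:

    use std::sync::atomic::{AtomicBool, Ordering};
    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    fn ms_since_epoch() -> i64 {
        SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as i64
    }

    // Drive background tasks on a dedicated thread until told to stop.
    fn run_background<T, N>(node: &zerotier_core::Node<T, N>, keep_running: &AtomicBool)
    where
        T: zerotier_core::NodeEventHandler<N> + Sync + Send + Clone + 'static,
        N: Sync + Send + 'static,
    {
        let mut delay = zerotier_core::NODE_BACKGROUND_TASKS_MAX_INTERVAL;
        while keep_running.load(Ordering::Relaxed) {
            std::thread::sleep(Duration::from_millis(delay as u64));
            // Returns how long to wait before the next call, already clamped
            // by the wrapper to at most NODE_BACKGROUND_TASKS_MAX_INTERVAL ms.
            delay = node.process_background_tasks(ms_since_epoch());
        }
    }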
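On Linux, the reworked `WebListener::new()` binds its socket to a specific interface with `SO_BINDTODEVICE` before calling `bind()`, going through the crate's own `osdep` bindings. A rough equivalent using the `libc` crate instead; the `libc` dependency and the helper name are assumptions of this sketch, not part of the patch:

    #[cfg(target_os = "linux")]
    fn bind_to_device(sock: &socket2::Socket, device: &str) -> std::io::Result<()> {
        use std::os::unix::io::AsRawFd;
        let name = std::ffi::CString::new(device)?;
        let bytes = name.as_bytes_with_nul();
        let rc = unsafe {
            libc::setsockopt(
                sock.as_raw_fd(),
                libc::SOL_SOCKET,
                libc::SO_BINDTODEVICE,
                bytes.as_ptr().cast(),
                (bytes.len() - 1) as libc::socklen_t, // length excludes the trailing NUL, as in the patch
            )
        };
        if rc == 0 { Ok(()) } else { Err(std::io::Error::last_os_error()) }
    }

As in the patch, the result of the setsockopt call can be ignored: a failed device bind is treated as non-fatal, and the subsequent bind()/listen() calls still decide whether the listener comes up.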