Pin memory in ZT core CAPI wrapper.

This commit is contained in:
Adam Ierymenko 2021-03-01 16:26:21 -05:00
parent 6fe78e2003
commit 88181c491c
No known key found for this signature in database
GPG key ID: C8877CF2D7A5D7F3
6 changed files with 258 additions and 255 deletions

View file

@ -15,8 +15,9 @@ use std::collections::hash_map::HashMap;
use std::intrinsics::copy_nonoverlapping;
use std::mem::{MaybeUninit, transmute};
use std::os::raw::{c_int, c_uint, c_ulong, c_void};
use std::pin::Pin;
use std::ptr::{null_mut, slice_from_raw_parts};
use std::sync::*;
use std::sync::Mutex;
use num_traits::FromPrimitive;
use serde::{Deserialize, Serialize};
@ -24,10 +25,11 @@ use serde::{Deserialize, Serialize};
use crate::*;
use crate::capi as ztcore;
/// Maximum delay between calls to run_background_tasks()
pub const NODE_BACKGROUND_TASKS_MAX_INTERVAL: u64 = 200;
pub const NODE_BACKGROUND_TASKS_MAX_INTERVAL: i64 = 200;
#[derive(FromPrimitive, ToPrimitive, PartialEq, Eq)]
const EMPTY_BYTE_ARRAY: [u8; 0] = [];
#[derive(FromPrimitive, ToPrimitive, PartialEq, Eq, Clone, Copy)]
pub enum Event {
Up = ztcore::ZT_Event_ZT_EVENT_UP as isize,
Offline = ztcore::ZT_Event_ZT_EVENT_OFFLINE as isize,
@ -37,7 +39,7 @@ pub enum Event {
UserMessage = ztcore::ZT_Event_ZT_EVENT_USER_MESSAGE as isize,
}
#[derive(FromPrimitive, ToPrimitive, PartialEq, Eq)]
#[derive(FromPrimitive, ToPrimitive, PartialEq, Eq, Clone, Copy)]
pub enum StateObjectType {
IdentityPublic = ztcore::ZT_StateObjectType_ZT_STATE_OBJECT_IDENTITY_PUBLIC as isize,
IdentitySecret = ztcore::ZT_StateObjectType_ZT_STATE_OBJECT_IDENTITY_SECRET as isize,
@ -49,7 +51,7 @@ pub enum StateObjectType {
}
/// The status of a ZeroTier node.
#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct NodeStatus {
pub address: Address,
pub identity: Identity,
@ -63,12 +65,12 @@ pub struct NodeStatus {
/// An event handler that receives events, frames, and packets from the core.
/// Note that these handlers can be called concurrently from any thread and
/// must be thread safe.
pub trait NodeEventHandler<N: 'static> {
pub trait NodeEventHandler<N: Sync + Send + 'static> {
/// Called when a configuration change or update should be applied to a network.
fn virtual_network_config(&self, network_id: NetworkId, network_obj: &Arc<N>, config_op: VirtualNetworkConfigOperation, config: Option<&VirtualNetworkConfig>);
fn virtual_network_config(&self, network_id: NetworkId, network_obj: &N, config_op: VirtualNetworkConfigOperation, config: Option<&VirtualNetworkConfig>);
/// Called when a frame should be injected into the virtual network (physical -> virtual).
fn virtual_network_frame(&self, network_id: NetworkId, network_obj: &Arc<N>, source_mac: MAC, dest_mac: MAC, ethertype: u16, vlan_id: u16, data: &[u8]);
fn virtual_network_frame(&self, network_id: NetworkId, network_obj: &N, source_mac: MAC, dest_mac: MAC, ethertype: u16, vlan_id: u16, data: &[u8]);
/// Called when a core ZeroTier event occurs.
fn event(&self, event: Event, event_data: &[u8]);
@ -89,30 +91,24 @@ pub trait NodeEventHandler<N: 'static> {
fn path_lookup(&self, address: Address, id: &Identity, desired_family: InetAddressFamily) -> Option<InetAddress>;
}
/// An instance of the ZeroTier core.
/// This is templated on the actual implementation of NodeEventHandler for performance reasons,
/// as it avoids an extra indirect function call.
#[allow(non_snake_case)]
pub struct Node<T: NodeEventHandler<N> + Sync + Send + Clone + 'static, N: 'static> {
pub struct NodeIntl<T: NodeEventHandler<N> + Sync + Send + Clone + 'static, N: Sync + Send + 'static> {
event_handler: T,
capi: *mut ztcore::ZT_Node,
now: PortableAtomicI64,
networks_by_id: Mutex<HashMap<u64, *mut Arc<N>>> // pointer to an Arc<> is a raw value created from Box<Arc<N>>
networks_by_id: Mutex<HashMap<u64, Pin<Box<N>>>>,
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
macro_rules! node_from_raw_ptr {
($uptr:ident) => {
unsafe {
let ntmp: *const Node<T, N> = $uptr.cast::<Node<T, N>>();
let ntmp: &Node<T, N> = &*ntmp;
ntmp
}
}
/// An instance of the ZeroTier core.
/// This is templated on the actual implementations of things to avoid "dyn" indirect
/// call overhead. This is a high performance networking thingy so we care about cycles
/// where possible.
pub struct Node<T: NodeEventHandler<N> + Sync + Send + Clone + 'static, N: Sync + Send + 'static> {
intl: Pin<Box<NodeIntl<T, N>>>,
}
extern "C" fn zt_virtual_network_config_function<T: NodeEventHandler<N> + Sync + Send + Clone + 'static, N: 'static>(
/********************************************************************************************************************/
extern "C" fn zt_virtual_network_config_function<T: NodeEventHandler<N> + Sync + Send + Clone + 'static, N: Sync + Send + 'static>(
_: *mut ztcore::ZT_Node,
uptr: *mut c_void,
_: *mut c_void,
@ -121,21 +117,18 @@ extern "C" fn zt_virtual_network_config_function<T: NodeEventHandler<N> + Sync +
op: ztcore::ZT_VirtualNetworkConfigOperation,
conf: *const ztcore::ZT_VirtualNetworkConfig,
) {
let op2 = VirtualNetworkConfigOperation::from_i32(op as i32);
if op2.is_some() {
let op2 = op2.unwrap();
let n = node_from_raw_ptr!(uptr);
let network_obj = unsafe { &*((*nptr).cast::<Arc<N>>()) };
let _ = VirtualNetworkConfigOperation::from_i32(op as i32).map(|op| {
let n = unsafe { &*(uptr.cast::<NodeIntl<T, N>>()) };
if conf.is_null() {
n.event_handler.virtual_network_config(NetworkId(nwid), network_obj, op2, None);
n.event_handler.virtual_network_config(NetworkId(nwid), unsafe { &*(nptr.cast::<N>()) }, op, None);
} else {
let conf2 = unsafe { VirtualNetworkConfig::new_from_capi(&*conf) };
n.event_handler.virtual_network_config(NetworkId(nwid), network_obj, op2, Some(&conf2));
n.event_handler.virtual_network_config(NetworkId(nwid), unsafe { &*(nptr.cast::<N>()) }, op, Some(&conf2));
}
}
});
}
extern "C" fn zt_virtual_network_frame_function<T: NodeEventHandler<N> + Sync + Send + Clone + 'static, N: 'static>(
extern "C" fn zt_virtual_network_frame_function<T: NodeEventHandler<N> + Sync + Send + Clone + 'static, N: Sync + Send + 'static>(
_: *mut ztcore::ZT_Node,
uptr: *mut c_void,
_: *mut c_void,
@ -149,10 +142,9 @@ extern "C" fn zt_virtual_network_frame_function<T: NodeEventHandler<N> + Sync +
data_size: c_uint,
) {
if !nptr.is_null() {
let n = node_from_raw_ptr!(uptr);
n.event_handler.virtual_network_frame(
unsafe { &*(uptr.cast::<NodeIntl<T, N>>()) }.event_handler.virtual_network_frame(
NetworkId(nwid),
unsafe { &*((*nptr).cast::<Arc<N>>()) },
unsafe { &*(nptr.cast::<N>()) },
MAC(source_mac),
MAC(dest_mac),
ethertype as u16,
@ -161,7 +153,7 @@ extern "C" fn zt_virtual_network_frame_function<T: NodeEventHandler<N> + Sync +
}
}
extern "C" fn zt_event_callback<T: NodeEventHandler<N> + Sync + Send + Clone + 'static, N: 'static>(
extern "C" fn zt_event_callback<T: NodeEventHandler<N> + Sync + Send + Clone + 'static, N: Sync + Send + 'static>(
_: *mut ztcore::ZT_Node,
uptr: *mut c_void,
_: *mut c_void,
@ -169,20 +161,17 @@ extern "C" fn zt_event_callback<T: NodeEventHandler<N> + Sync + Send + Clone + '
data: *const c_void,
data_size: c_uint
) {
let ev2 = Event::from_i32(ev as i32);
if ev2.is_some() {
let ev2 = ev2.unwrap();
let n = node_from_raw_ptr!(uptr);
let _ = Event::from_i32(ev as i32).map(|ev: Event| {
let n = unsafe { &*(uptr.cast::<NodeIntl<T, N>>()) };
if data.is_null() {
n.event_handler.event(ev2, &[0_u8; 0]);
n.event_handler.event(ev, &EMPTY_BYTE_ARRAY);
} else {
let data2 = unsafe { &*slice_from_raw_parts(data.cast::<u8>(), data_size as usize) };
n.event_handler.event(ev2, data2);
n.event_handler.event(ev, unsafe { &*slice_from_raw_parts(data.cast::<u8>(), data_size as usize) });
}
}
});
}
extern "C" fn zt_state_put_function<T: NodeEventHandler<N> + Sync + Send + Clone + 'static, N: 'static>(
extern "C" fn zt_state_put_function<T: NodeEventHandler<N> + Sync + Send + Clone + 'static, N: Sync + Send + 'static>(
_: *mut ztcore::ZT_Node,
uptr: *mut c_void,
_: *mut c_void,
@ -192,17 +181,13 @@ extern "C" fn zt_state_put_function<T: NodeEventHandler<N> + Sync + Send + Clone
obj_data: *const c_void,
obj_data_len: c_int,
) {
let obj_type2 = StateObjectType::from_i32(obj_type as i32);
if obj_type2.is_some() {
let obj_type2 = obj_type2.unwrap();
let n = node_from_raw_ptr!(uptr);
let obj_id2 = unsafe { &*slice_from_raw_parts(obj_id, obj_id_len as usize) };
let obj_data2 = unsafe { &*slice_from_raw_parts(obj_data.cast::<u8>(), obj_data_len as usize) };
let _ = n.event_handler.state_put(obj_type2, obj_id2, obj_data2);
}
let _ = StateObjectType::from_i32(obj_type as i32).map(|obj_type| {
let n = unsafe { &*(uptr.cast::<NodeIntl<T, N>>()) };
let _ = n.event_handler.state_put(obj_type, unsafe { &*slice_from_raw_parts(obj_id, obj_id_len as usize) }, unsafe { &*slice_from_raw_parts(obj_data.cast::<u8>(), obj_data_len as usize) });
});
}
extern "C" fn zt_state_get_function<T: NodeEventHandler<N> + Sync + Send + Clone + 'static, N: 'static>(
extern "C" fn zt_state_get_function<T: NodeEventHandler<N> + Sync + Send + Clone + 'static, N: Sync + Send + 'static>(
_: *mut ztcore::ZT_Node,
uptr: *mut c_void,
_: *mut c_void,
@ -213,38 +198,39 @@ extern "C" fn zt_state_get_function<T: NodeEventHandler<N> + Sync + Send + Clone
obj_data_free_function: *mut *mut c_void,
) -> c_int {
if obj_data.is_null() || obj_data_free_function.is_null() {
return -1;
}
unsafe {
*obj_data = null_mut();
*obj_data_free_function = transmute(ztcore::free as *const ());
}
let obj_type2 = StateObjectType::from_i32(obj_type as i32);
if obj_type2.is_some() {
let obj_type2 = obj_type2.unwrap();
let n = node_from_raw_ptr!(uptr);
let obj_id2 = unsafe { &*slice_from_raw_parts(obj_id, obj_id_len as usize) };
let obj_data_result = n.event_handler.state_get(obj_type2, obj_id2);
if obj_data_result.is_ok() {
let obj_data_result = obj_data_result.unwrap();
if obj_data_result.len() > 0 {
unsafe {
let obj_data_len: c_int = obj_data_result.len() as c_int;
let obj_data_raw = ztcore::malloc(obj_data_len as c_ulong);
if !obj_data_raw.is_null() {
copy_nonoverlapping(obj_data_result.as_ptr(), obj_data_raw.cast::<u8>(), obj_data_len as usize);
*obj_data = obj_data_raw;
return obj_data_len;
}
}
}
-1 as c_int
} else {
unsafe {
*obj_data = null_mut();
*obj_data_free_function = transmute(ztcore::free as *const ());
}
StateObjectType::from_i32(obj_type as i32).map_or_else(|| {
-1 as c_int
}, |obj_type| {
unsafe { &*(uptr.cast::<NodeIntl<T, N>>()) }.event_handler.state_get(obj_type, unsafe { &*slice_from_raw_parts(obj_id, obj_id_len as usize) }).map_or_else(|_| {
-1 as c_int
}, |obj_data_result| {
let obj_data_len = obj_data_result.len() as c_int;
if obj_data_len > 0 {
unsafe {
let obj_data_raw = ztcore::malloc(obj_data_len as c_ulong);
if obj_data_raw.is_null() {
-1 as c_int
} else {
copy_nonoverlapping(obj_data_result.as_ptr(), obj_data_raw.cast::<u8>(), obj_data_len as usize);
*obj_data = obj_data_raw;
obj_data_len
}
}
} else {
-1 as c_int
}
})
})
}
return -1;
}
extern "C" fn zt_wire_packet_send_function<T: NodeEventHandler<N> + Sync + Send + Clone + 'static, N: 'static>(
extern "C" fn zt_wire_packet_send_function<T: NodeEventHandler<N> + Sync + Send + Clone + 'static, N: Sync + Send + 'static>(
_: *mut ztcore::ZT_Node,
uptr: *mut c_void,
_: *mut c_void,
@ -254,11 +240,10 @@ extern "C" fn zt_wire_packet_send_function<T: NodeEventHandler<N> + Sync + Send
data_size: c_uint,
packet_ttl: c_uint,
) -> c_int {
let n = node_from_raw_ptr!(uptr);
return n.event_handler.wire_packet_send(local_socket, InetAddress::transmute_capi(unsafe { &*sock_addr }), unsafe { &*slice_from_raw_parts(data.cast::<u8>(), data_size as usize) }, packet_ttl as u32) as c_int;
unsafe { &*(uptr.cast::<NodeIntl<T, N>>()) }.event_handler.wire_packet_send(local_socket, InetAddress::transmute_capi(unsafe { &*sock_addr }), unsafe { &*slice_from_raw_parts(data.cast::<u8>(), data_size as usize) }, packet_ttl as u32) as c_int
}
extern "C" fn zt_path_check_function<T: NodeEventHandler<N> + Sync + Send + Clone + 'static, N: 'static>(
extern "C" fn zt_path_check_function<T: NodeEventHandler<N> + Sync + Send + Clone + 'static, N: Sync + Send + 'static>(
_: *mut ztcore::ZT_Node,
uptr: *mut c_void,
_: *mut c_void,
@ -267,15 +252,11 @@ extern "C" fn zt_path_check_function<T: NodeEventHandler<N> + Sync + Send + Clon
local_socket: i64,
sock_addr: *const ztcore::ZT_InetAddress,
) -> c_int {
let n = node_from_raw_ptr!(uptr);
let id = Identity::new_from_capi(identity, false);
if n.event_handler.path_check(Address(address), &id, local_socket, InetAddress::transmute_capi(unsafe{ &*sock_addr })) {
return 1;
}
return 0;
unsafe { &*(uptr.cast::<NodeIntl<T, N>>()) }.event_handler.path_check(Address(address), &id, local_socket, InetAddress::transmute_capi(unsafe{ &*sock_addr })) as c_int
}
extern "C" fn zt_path_lookup_function<T: NodeEventHandler<N> + Sync + Send + Clone + 'static, N: 'static>(
extern "C" fn zt_path_lookup_function<T: NodeEventHandler<N> + Sync + Send + Clone + 'static, N: Sync + Send + 'static>(
_: *mut ztcore::ZT_Node,
uptr: *mut c_void,
_: *mut c_void,
@ -298,34 +279,34 @@ extern "C" fn zt_path_lookup_function<T: NodeEventHandler<N> + Sync + Send + Clo
}
}
let n = node_from_raw_ptr!(uptr);
let id = Identity::new_from_capi(identity, false);
let result = n.event_handler.path_lookup(Address(address), &id, sock_family2);
if result.is_some() {
let result = result.unwrap();
unsafe { &*(uptr.cast::<NodeIntl<T, N>>()) }.event_handler.path_lookup(Address(address), &id, sock_family2).map_or_else(|| {
0 as c_int
}, |result| {
let result_ptr = &result as *const InetAddress;
unsafe {
copy_nonoverlapping(result_ptr.cast::<ztcore::ZT_InetAddress>(), sock_addr, 1);
}
return 1;
}
return 0;
1 as c_int
})
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
/********************************************************************************************************************/
impl<T: NodeEventHandler<N> + Sync + Send + Clone + 'static, N: 'static> Node<T, N> {
impl<T: NodeEventHandler<N> + Sync + Send + Clone + 'static, N: Sync + Send + 'static> Node<T, N> {
/// Create a new Node with a given event handler.
#[allow(unused_mut)]
pub fn new(event_handler: T, now: i64) -> Result<Node<T, N>, ResultCode> {
let mut n = Node {
event_handler: event_handler.clone(),
capi: null_mut(),
now: PortableAtomicI64::new(now),
networks_by_id: Mutex::new(HashMap::new())
intl: Box::pin(NodeIntl {
event_handler: event_handler.clone(),
capi: null_mut(),
now: PortableAtomicI64::new(now),
networks_by_id: Mutex::new(HashMap::new())
}),
};
let mut capi: *mut ztcore::ZT_Node = null_mut();
unsafe {
let rc = unsafe {
let callbacks = ztcore::ZT_Node_Callbacks {
statePutFunction: transmute(zt_state_put_function::<T, N> as *const ()),
stateGetFunction: transmute(zt_state_get_function::<T, N> as *const ()),
@ -336,49 +317,34 @@ impl<T: NodeEventHandler<N> + Sync + Send + Clone + 'static, N: 'static> Node<T,
pathCheckFunction: transmute(zt_path_check_function::<T, N> as *const ()),
pathLookupFunction: transmute(zt_path_lookup_function::<T, N> as *const ()),
};
let rc = ztcore::ZT_Node_new(&mut capi as *mut *mut ztcore::ZT_Node, transmute(&n as *const Node<T, N>), null_mut(), &callbacks as *const ztcore::ZT_Node_Callbacks, now);
if rc != 0 {
return Err(ResultCode::from_i32(rc as i32).unwrap_or(ResultCode::FatalErrorInternal));
} else if capi.is_null() {
return Err(ResultCode::FatalErrorInternal);
}
}
n.capi = capi;
ztcore::ZT_Node_new(transmute(&(n.intl.capi) as *const *mut ztcore::ZT_Node), transmute(&*n.intl as *const NodeIntl<T, N>), null_mut(), &callbacks, now)
};
Ok(n)
if rc == 0 {
assert!(!n.intl.capi.is_null());
Ok(n)
} else {
Err(ResultCode::from_i32(rc as i32).unwrap_or(ResultCode::FatalErrorInternal))
}
}
/// Perform periodic background tasks.
/// The first call should happen no more than NODE_BACKGROUND_TASKS_MAX_INTERVAL milliseconds
/// since the node was created, and after this runs it returns the amount of time the caller
/// should wait before calling it again.
pub fn process_background_tasks(&self, now: i64) -> u64 {
self.now.set(now);
pub fn process_background_tasks(&self, now: i64) -> i64 {
self.intl.now.set(now);
let mut next_task_deadline: i64 = now;
unsafe {
ztcore::ZT_Node_processBackgroundTasks(self.capi, null_mut(), now, (&mut next_task_deadline as *mut i64).cast());
ztcore::ZT_Node_processBackgroundTasks(self.intl.capi, null_mut(), now, (&mut next_task_deadline as *mut i64).cast());
}
let mut next_delay = next_task_deadline - now;
if next_delay < 1 {
next_delay = 1;
} else if next_delay > NODE_BACKGROUND_TASKS_MAX_INTERVAL as i64 {
next_delay = NODE_BACKGROUND_TASKS_MAX_INTERVAL as i64;
}
next_delay as u64
(next_task_deadline - now).clamp(1_i64, NODE_BACKGROUND_TASKS_MAX_INTERVAL)
}
fn delete_network_uptr(&self, nwid: u64) {
let nptr = self.networks_by_id.lock().unwrap().remove(&nwid).unwrap_or(null_mut());
if !nptr.is_null() {
unsafe {
Box::from_raw(nptr);
}
}
}
pub fn join(&self, nwid: NetworkId, controller_fingerprint: Option<Fingerprint>, network_obj: &Arc<N>) -> ResultCode {
/// Join a network, associating network_obj with it.
/// If a fingerprint is supplied it will be used as a full sha384 fingerprint of the
/// network's controller.
pub fn join(&self, nwid: NetworkId, controller_fingerprint: Option<Fingerprint>, network_obj: N) -> ResultCode {
let mut cfp: MaybeUninit<ztcore::ZT_Fingerprint> = MaybeUninit::uninit();
let mut cfpp: *mut ztcore::ZT_Fingerprint = null_mut();
if controller_fingerprint.is_some() {
@ -390,59 +356,74 @@ impl<T: NodeEventHandler<N> + Sync + Send + Clone + 'static, N: 'static> Node<T,
}
}
let nptr = Box::into_raw(Box::new(network_obj.clone()));
self.networks_by_id.lock().as_deref_mut().unwrap().insert(nwid.0, nptr);
let rc = unsafe { ztcore::ZT_Node_join(self.capi, nwid.0, cfpp, nptr.cast::<c_void>(), null_mut()) };
if rc != ztcore::ZT_ResultCode_ZT_RESULT_OK {
self.delete_network_uptr(nwid.0);
let network_obj = Box::pin(network_obj);
let rc = unsafe { ztcore::ZT_Node_join(self.intl.capi, nwid.0, cfpp, transmute((&*network_obj) as *const N), null_mut()) };
if rc == ztcore::ZT_ResultCode_ZT_RESULT_OK {
self.intl.networks_by_id.lock().unwrap().insert(nwid.0, network_obj);
ResultCode::Ok
} else {
ResultCode::from_i32(rc as i32).unwrap_or(ResultCode::ErrorInternalNonFatal)
}
return ResultCode::from_i32(rc as i32).unwrap_or(ResultCode::ErrorInternalNonFatal);
}
/// Leave a network.
pub fn leave(&self, nwid: NetworkId) -> ResultCode {
self.delete_network_uptr(nwid.0);
unsafe {
return ResultCode::from_i32(ztcore::ZT_Node_leave(self.capi, nwid.0, null_mut(), null_mut()) as i32).unwrap_or(ResultCode::ErrorInternalNonFatal);
}
self.intl.networks_by_id.lock().unwrap().remove(&nwid.0).map_or_else(|| {
ResultCode::ErrorNetworkNotFound
}, |_| {
unsafe { ResultCode::from_i32(ztcore::ZT_Node_leave(self.intl.capi, nwid.0, null_mut(), null_mut()) as i32).unwrap_or(ResultCode::ErrorInternalNonFatal) }
})
}
/// Access a network's associated network object.
/// This executes the supplied function or closure if we are joined to
/// this network, providing its associated network object as a parameter.
/// This happens while the internal data structure is locked, so do not
/// do anything time consuming while inside this function. The return value
/// (if any) of this function is returned, or None if we are not joined to
/// this network.
#[inline(always)]
pub fn network<F: FnOnce(&N) -> R, R>(&self, nwid: NetworkId, f: F) -> Option<R> {
self.intl.networks_by_id.lock().unwrap().get(&nwid.0).map_or_else(|| {
None
}, |nw| {
Some(f(&*nw))
})
}
#[inline(always)]
pub fn address(&self) -> Address {
unsafe {
return Address(ztcore::ZT_Node_address(self.capi) as u64);
}
unsafe { Address(ztcore::ZT_Node_address(self.intl.capi) as u64) }
}
#[inline(always)]
pub fn process_wire_packet(&self, local_socket: i64, remote_address: &InetAddress, data: Buffer) -> ResultCode {
let current_time = self.now.get();
let intl = &*self.intl;
let current_time = intl.now.get();
let mut next_task_deadline: i64 = current_time;
let rc = unsafe { ResultCode::from_i32(ztcore::ZT_Node_processWirePacket(self.capi, null_mut(), current_time, local_socket, remote_address.as_capi_ptr(), data.zt_core_buf as *const c_void, data.data_size as u32, 1, &mut next_task_deadline as *mut i64) as i32).unwrap_or(ResultCode::ErrorInternalNonFatal) };
let rc = unsafe { ResultCode::from_i32(ztcore::ZT_Node_processWirePacket(intl.capi, null_mut(), current_time, local_socket, remote_address.as_capi_ptr(), data.zt_core_buf as *const c_void, data.data_size as u32, 1, &mut next_task_deadline as *mut i64) as i32).unwrap_or(ResultCode::ErrorInternalNonFatal) };
std::mem::forget(data); // prevent Buffer from being returned to ZT core twice, see comment in drop() in buffer.rs
rc
}
#[inline(always)]
pub fn process_virtual_network_frame(&self, nwid: &NetworkId, source_mac: &MAC, dest_mac: &MAC, ethertype: u16, vlan_id: u16, data: Buffer) -> ResultCode {
let current_time = self.now.get();
let intl = &*self.intl;
let current_time = intl.now.get();
let mut next_tick_deadline: i64 = current_time;
let rc = unsafe { ResultCode::from_i32(ztcore::ZT_Node_processVirtualNetworkFrame(self.capi, null_mut(), current_time, nwid.0, source_mac.0, dest_mac.0, ethertype as c_uint, vlan_id as c_uint, data.zt_core_buf as *const c_void, data.data_size as u32, 1, &mut next_tick_deadline as *mut i64) as i32).unwrap_or(ResultCode::ErrorInternalNonFatal) };
let rc = unsafe { ResultCode::from_i32(ztcore::ZT_Node_processVirtualNetworkFrame(intl.capi, null_mut(), current_time, nwid.0, source_mac.0, dest_mac.0, ethertype as c_uint, vlan_id as c_uint, data.zt_core_buf as *const c_void, data.data_size as u32, 1, &mut next_tick_deadline as *mut i64) as i32).unwrap_or(ResultCode::ErrorInternalNonFatal) };
std::mem::forget(data); // prevent Buffer from being returned to ZT core twice, see comment in drop() in buffer.rs
rc
}
#[inline(always)]
pub fn multicast_subscribe(&self, nwid: &NetworkId, multicast_group: &MAC, multicast_adi: u32) -> ResultCode {
unsafe {
return ResultCode::from_i32(ztcore::ZT_Node_multicastSubscribe(self.capi, null_mut(), nwid.0, multicast_group.0, multicast_adi as c_ulong) as i32).unwrap_or(ResultCode::ErrorInternalNonFatal);
}
unsafe { ResultCode::from_i32(ztcore::ZT_Node_multicastSubscribe(self.intl.capi, null_mut(), nwid.0, multicast_group.0, multicast_adi as c_ulong) as i32).unwrap_or(ResultCode::ErrorInternalNonFatal) }
}
#[inline(always)]
pub fn multicast_unsubscribe(&self, nwid: &NetworkId, multicast_group: &MAC, multicast_adi: u32) -> ResultCode {
unsafe {
return ResultCode::from_i32(ztcore::ZT_Node_multicastUnsubscribe(self.capi, nwid.0, multicast_group.0, multicast_adi as c_ulong) as i32).unwrap_or(ResultCode::ErrorInternalNonFatal);
}
unsafe { ResultCode::from_i32(ztcore::ZT_Node_multicastUnsubscribe(self.intl.capi, nwid.0, multicast_group.0, multicast_adi as c_ulong) as i32).unwrap_or(ResultCode::ErrorInternalNonFatal) }
}
/// Get a copy of this node's identity.
@ -456,31 +437,31 @@ impl<T: NodeEventHandler<N> + Sync + Send + Clone + 'static, N: 'static> Node<T,
/// to exist the Identity becomes invalid. Use clone() on it to get a copy.
#[inline(always)]
pub(crate) unsafe fn identity_fast(&self) -> Identity {
Identity::new_from_capi(ztcore::ZT_Node_identity(self.capi), false)
Identity::new_from_capi(ztcore::ZT_Node_identity(self.intl.capi), false)
}
pub fn status(&self) -> NodeStatus {
let mut ns: MaybeUninit<ztcore::ZT_NodeStatus> = MaybeUninit::zeroed();
unsafe {
ztcore::ZT_Node_status(self.capi, ns.as_mut_ptr());
ztcore::ZT_Node_status(self.intl.capi, ns.as_mut_ptr());
let ns = ns.assume_init();
if ns.identity.is_null() {
panic!("ZT_Node_status() returned null identity");
}
return NodeStatus {
NodeStatus {
address: Address(ns.address),
identity: Identity::new_from_capi(&*ns.identity, false).clone(),
public_identity: cstr_to_string(ns.publicIdentity, -1),
secret_identity: cstr_to_string(ns.secretIdentity, -1),
online: ns.online != 0,
};
}
}
}
pub fn peers(&self) -> Vec<Peer> {
let mut p: Vec<Peer> = Vec::new();
unsafe {
let pl = ztcore::ZT_Node_peers(self.capi);
let pl = ztcore::ZT_Node_peers(self.intl.capi);
if !pl.is_null() {
let peer_count = (*pl).peerCount as usize;
p.reserve(peer_count);
@ -496,7 +477,7 @@ impl<T: NodeEventHandler<N> + Sync + Send + Clone + 'static, N: 'static> Node<T,
pub fn networks(&self) -> Vec<VirtualNetworkConfig> {
let mut n: Vec<VirtualNetworkConfig> = Vec::new();
unsafe {
let nl = ztcore::ZT_Node_networks(self.capi);
let nl = ztcore::ZT_Node_networks(self.intl.capi);
if !nl.is_null() {
let net_count = (*nl).networkCount as usize;
n.reserve(net_count);
@ -512,7 +493,7 @@ impl<T: NodeEventHandler<N> + Sync + Send + Clone + 'static, N: 'static> Node<T,
pub fn certificates(&self) -> Vec<(Certificate, u32)> {
let mut c: Vec<(Certificate, u32)> = Vec::new();
unsafe {
let cl = ztcore::ZT_Node_listCertificates(self.capi);
let cl = ztcore::ZT_Node_listCertificates(self.intl.capi);
if !cl.is_null() {
let cert_count = (*cl).certCount as usize;
c.reserve(cert_count);
@ -526,23 +507,14 @@ impl<T: NodeEventHandler<N> + Sync + Send + Clone + 'static, N: 'static> Node<T,
}
}
unsafe impl<T: NodeEventHandler<N> + Sync + Send + Clone + 'static, N: 'static> Sync for Node<T, N> {}
unsafe impl<T: NodeEventHandler<N> + Sync + Send + Clone + 'static, N: Sync + Send + 'static> Sync for Node<T, N> {}
unsafe impl<T: NodeEventHandler<N> + Sync + Send + Clone + 'static, N: 'static> Send for Node<T, N> {}
unsafe impl<T: NodeEventHandler<N> + Sync + Send + Clone + 'static, N: Sync + Send + 'static> Send for Node<T, N> {}
impl<T: NodeEventHandler<N> + Sync + Send + Clone + 'static,N: 'static> Drop for Node<T, N> {
impl<T: NodeEventHandler<N> + Sync + Send + Clone + 'static, N: Sync + Send + 'static> Drop for Node<T, N> {
fn drop(&mut self) {
// Manually take care of the unboxed Boxes in networks_by_id
let mut nwids: Vec<u64> = Vec::new();
for n in self.networks_by_id.lock().unwrap().iter() {
nwids.push(*n.0);
}
for nwid in nwids.iter() {
self.delete_network_uptr(*nwid);
}
unsafe {
ztcore::ZT_Node_delete(self.capi, null_mut());
ztcore::ZT_Node_delete(self.intl.capi, null_mut());
}
}
}

View file

@ -307,17 +307,6 @@ dependencies = [
"winapi",
]
[[package]]
name = "net2"
version = "0.2.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "391630d12b68002ae1e25e8f974306474966550ad82dac6886fb8910c19568ae"
dependencies = [
"cfg-if 0.1.10",
"libc",
"winapi",
]
[[package]]
name = "ntapi"
version = "0.3.6"
@ -675,12 +664,12 @@ dependencies = [
"hex",
"hyper",
"lazy_static",
"net2",
"num-derive",
"num-traits",
"num_cpus",
"serde",
"serde_json",
"socket2",
"tokio",
"winapi",
"zerotier-core",

View file

@ -21,7 +21,7 @@ lazy_static = "1"
num-traits = "0"
num-derive = "0"
hyper = { version = "0", features = ["http1", "runtime", "server", "client", "tcp", "stream"] }
net2 = "0.2"
socket2 = { version = "0", features = ["reuseport", "unix", "pair"] }
[target."cfg(windows)".dependencies]
winapi = { version = "0.3.9", features = ["handleapi", "ws2ipdef", "ws2tcpip"] }

View file

@ -18,6 +18,8 @@ use num_traits::cast::AsPrimitive;
use std::os::raw::c_int;
use crate::osdep as osdep;
#[cfg(target_os = "linux")]
/*
* This is a threaded UDP socket listener for high performance. The fastest way to receive UDP
* (without heroic efforts like kernel bypass) on most platforms is to create a separate socket
@ -49,7 +51,7 @@ pub(crate) fn test_bind_udp(port: u16) -> (bool, bool) {
}
#[cfg(unix)]
fn bind_udp_socket(_: &str, address: &InetAddress) -> Result<FastUDPRawOsSocket, &'static str> {
fn bind_udp_socket(_device_name: &str, address: &InetAddress) -> Result<FastUDPRawOsSocket, &'static str> {
unsafe {
let af;
let sa_len;
@ -95,6 +97,22 @@ fn bind_udp_socket(_: &str, address: &InetAddress) -> Result<FastUDPRawOsSocket,
setsockopt_results |= osdep::setsockopt(s, osdep::SOL_SOCKET.as_(), osdep::SO_NOSIGPIPE.as_(), (&mut fl as *mut c_int).cast(), fl_size)
}
#[cfg(target_os = "linux")] {
if !_device_name.is_empty() {
unsafe {
let _ = std::ffi::CString::new(_device_name).map(|dn| {
let dnb = dn.as_bytes_with_nul();
let _ = osdep::setsockopt(
s.as_(),
osdep::SOL_SOCKET.as_(),
osdep::SO_BINDTODEVICE.as_(),
dnb.as_ptr().cast(),
(dnb.len() - 1).as_());
});
}
}
}
if setsockopt_results != 0 {
osdep::close(s);
return Err("setsockopt() failed");

View file

@ -39,6 +39,7 @@ struct ServiceIntl {
auth_token: String,
interrupt: Mutex<futures::channel::mpsc::Sender<()>>,
local_config: Mutex<Arc<LocalConfig>>,
store: Arc<Store>,
run: AtomicBool,
online: AtomicBool,
}
@ -47,28 +48,27 @@ struct ServiceIntl {
#[derive(Clone)]
pub(crate) struct Service {
pub(crate) log: Arc<Log>,
pub(crate) store: Arc<Store>,
_node: Weak<Node<Service, Network>>,
intl: Arc<ServiceIntl>,
}
impl NodeEventHandler<Network> for Service {
#[inline(always)]
fn virtual_network_config(&self, network_id: NetworkId, network_obj: &Arc<Network>, config_op: VirtualNetworkConfigOperation, config: Option<&VirtualNetworkConfig>) {}
fn virtual_network_config(&self, network_id: NetworkId, network_obj: &Network, config_op: VirtualNetworkConfigOperation, config: Option<&VirtualNetworkConfig>) {}
#[inline(always)]
fn virtual_network_frame(&self, network_id: NetworkId, network_obj: &Arc<Network>, source_mac: MAC, dest_mac: MAC, ethertype: u16, vlan_id: u16, data: &[u8]) {}
fn virtual_network_frame(&self, network_id: NetworkId, network_obj: &Network, source_mac: MAC, dest_mac: MAC, ethertype: u16, vlan_id: u16, data: &[u8]) {}
#[inline(always)]
fn event(&self, event: Event, event_data: &[u8]) {
match event {
Event::Up => {
d!(self.log, "node started up in data store '{}'", self.store.base_path.to_str().unwrap());
d!(self.log, "node startup event received.");
}
Event::Down => {
d!(self.log, "node shutting down.");
self.intl.run.store(false, Ordering::Relaxed);
d!(self.log, "node shutdown event received.");
self.intl.online.store(false, Ordering::Relaxed);
}
Event::Online => {
@ -78,7 +78,7 @@ impl NodeEventHandler<Network> for Service {
Event::Offline => {
d!(self.log, "node is offline.");
self.intl.online.store(true, Ordering::Relaxed);
self.intl.online.store(false, Ordering::Relaxed);
}
Event::Trace => {
@ -108,16 +108,16 @@ impl NodeEventHandler<Network> for Service {
#[inline(always)]
fn state_put(&self, obj_type: StateObjectType, obj_id: &[u64], obj_data: &[u8]) -> std::io::Result<()> {
if !obj_data.is_empty() {
self.store.store_object(&obj_type, obj_id, obj_data)
self.intl.store.store_object(&obj_type, obj_id, obj_data)
} else {
self.store.erase_object(&obj_type, obj_id);
self.intl.store.erase_object(&obj_type, obj_id);
Ok(())
}
}
#[inline(always)]
fn state_get(&self, obj_type: StateObjectType, obj_id: &[u64]) -> std::io::Result<Vec<u8>> {
self.store.load_object(&obj_type, obj_id)
self.intl.store.load_object(&obj_type, obj_id)
}
#[inline(always)]
@ -160,14 +160,19 @@ impl Service {
}
/// Get the node running with this service.
/// This should never return None, but technically it could if Service
/// persisted beyond the life span of Node. Check this to be technically
/// pure "safe" Rust.
/// This can return None if we are in the midst of shutdown. In this case
/// whatever operation is in progress should abort. None will never be
/// returned during normal operation.
#[inline(always)]
pub fn node(&self) -> Option<Arc<Node<Service, Network>>> {
self._node.upgrade()
}
#[inline(always)]
pub fn store(&self) -> &Arc<Store> {
&self.intl.store
}
#[inline(always)]
pub fn online(&self) -> bool {
self.intl.online.load(Ordering::Relaxed)
@ -183,7 +188,13 @@ unsafe impl Send for Service {}
unsafe impl Sync for Service {}
async fn run_async(store: &Arc<Store>, auth_token: String, log: Arc<Log>, local_config: Arc<LocalConfig>) -> i32 {
impl Drop for Service {
fn drop(&mut self) {
self.log.debug("Service::drop()");
}
}
async fn run_async(store: &Arc<Store>, auth_token: String, log: &Arc<Log>, local_config: Arc<LocalConfig>) -> i32 {
let mut process_exit_value: i32 = 0;
let mut udp_sockets: BTreeMap<InetAddress, FastUDPSocket> = BTreeMap::new();
@ -193,12 +204,12 @@ async fn run_async(store: &Arc<Store>, auth_token: String, log: Arc<Log>, local_
let (interrupt_tx, mut interrupt_rx) = futures::channel::mpsc::channel::<()>(1);
let mut service = Service {
log: log.clone(),
store: store.clone(),
_node: Weak::new(),
intl: Arc::new(ServiceIntl {
auth_token,
interrupt: Mutex::new(interrupt_tx),
local_config: Mutex::new(local_config),
store: store.clone(),
run: AtomicBool::new(true),
online: AtomicBool::new(false),
}),
@ -214,20 +225,17 @@ async fn run_async(store: &Arc<Store>, auth_token: String, log: Arc<Log>, local_
service._node = Arc::downgrade(&node);
let service = service; // make immutable after setting node
let mut local_config = service.local_config();
store.write_port(local_config.settings.primary_port);
let mut now: i64 = ms_since_epoch();
let mut loop_delay = zerotier_core::NODE_BACKGROUND_TASKS_MAX_INTERVAL;
let mut last_checked_config: i64 = 0;
let mut local_config = service.local_config();
while service.intl.run.load(Ordering::Relaxed) {
let loop_start = ms_since_epoch();
// Write zerotier.port which is used by the CLI to know how to reach the HTTP API.
//let _ = store.write_port(local_config.settings.primary_port);
// Wait for (1) loop delay elapsed, (2) a signal to interrupt delay now, or
// (3) an external signal to exit.
tokio::select! {
_ = tokio::time::sleep(Duration::from_millis(loop_delay)) => {
_ = tokio::time::sleep(Duration::from_millis(loop_delay as u64)) => {
now = ms_since_epoch();
let actual_delay = now - loop_start;
if actual_delay > ((loop_delay as i64) * 4_i64) {
@ -249,23 +257,20 @@ async fn run_async(store: &Arc<Store>, auth_token: String, log: Arc<Log>, local_
},
}
// Check every CONFIG_CHECK_INTERVAL for changes to either the system configuration
// or the node's local configuration and take actions as needed.
if (now - last_checked_config) >= CONFIG_CHECK_INTERVAL {
last_checked_config = now;
// Check for changes to local.conf.
let new_config = store.read_local_conf(true);
if new_config.is_ok() {
d!(log, "local.conf changed on disk, reloading.");
service.set_local_config(new_config.unwrap());
}
// Check for and handle configuration changes, some of which require inner loop restart.
let next_local_config = service.local_config();
if local_config.settings.primary_port != next_local_config.settings.primary_port {
local_web_listeners.0 = None;
local_web_listeners.1 = None;
store.write_port(next_local_config.settings.primary_port);
}
if local_config.settings.log.max_size != next_local_config.settings.log.max_size {
log.set_max_size(next_local_config.settings.log.max_size);
@ -278,6 +283,7 @@ async fn run_async(store: &Arc<Store>, auth_token: String, log: Arc<Log>, local_
}
local_config = next_local_config;
let mut loopback_dev_name = String::new();
let mut system_addrs: BTreeMap<InetAddress, String> = BTreeMap::new();
getifaddrs::for_each_address(|addr: &InetAddress, dev: &str| {
match addr.ip_scope() {
@ -292,8 +298,13 @@ async fn run_async(store: &Arc<Store>, auth_token: String, log: Arc<Log>, local_
system_addrs.insert(a, String::from(dev));
}
}
}
_ => {}
},
IpScope::Loopback => {
if loopback_dev_name.is_empty() {
loopback_dev_name.push_str(dev);
}
},
_ => {},
}
});
@ -378,12 +389,12 @@ async fn run_async(store: &Arc<Store>, auth_token: String, log: Arc<Log>, local_
}
if local_web_listeners.0.is_none() {
let _ = WebListener::new("", SocketAddr::new(IpAddr::from(Ipv4Addr::LOCALHOST), local_config.settings.primary_port), &service).map(|wl| {
let _ = WebListener::new(loopback_dev_name.as_str(), SocketAddr::new(IpAddr::from(Ipv4Addr::LOCALHOST), local_config.settings.primary_port), &service).map(|wl| {
local_web_listeners.0 = Some(wl);
});
}
if local_web_listeners.1.is_none() {
let _ = WebListener::new("", SocketAddr::new(IpAddr::from(Ipv6Addr::LOCALHOST), local_config.settings.primary_port), &service).map(|wl| {
let _ = WebListener::new(loopback_dev_name.as_str(), SocketAddr::new(IpAddr::from(Ipv6Addr::LOCALHOST), local_config.settings.primary_port), &service).map(|wl| {
local_web_listeners.1 = Some(wl);
});
}
@ -396,13 +407,13 @@ async fn run_async(store: &Arc<Store>, auth_token: String, log: Arc<Log>, local_
loop_delay = node.process_background_tasks(now);
}
l!(log, "shutting down normally...");
l!(log, "shutting down normally.");
drop(udp_sockets);
drop(web_listeners);
drop(local_web_listeners);
drop(service);
drop(node);
drop(service);
d!(log, "shutdown complete.");
@ -410,8 +421,6 @@ async fn run_async(store: &Arc<Store>, auth_token: String, log: Arc<Log>, local_
}
pub(crate) fn run(store: &Arc<Store>, auth_token: Option<String>) -> i32 {
let mut process_exit_value: i32 = 0;
let local_config = Arc::new(store.read_local_conf(false).unwrap_or_else(|_| { LocalConfig::default() }));
let log = Arc::new(Log::new(
@ -451,7 +460,7 @@ pub(crate) fn run(store: &Arc<Store>, auth_token: Option<String>) -> i32 {
}
let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
process_exit_value = rt.block_on(async move { run_async(store, auth_token, log, local_config).await });
let process_exit_value = rt.block_on(async move { run_async(store, auth_token, &log, local_config).await });
rt.shutdown_timeout(Duration::from_millis(500));
process_exit_value

View file

@ -11,31 +11,31 @@
*/
/****/
use std::any::Any;
use std::cell::RefCell;
use std::convert::Infallible;
use std::net::{SocketAddr, TcpListener};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use futures::TryFutureExt;
use hyper::{Body, Request, Response};
use hyper::server::Server;
use hyper::service::{make_service_fn, service_fn};
use net2::TcpBuilder;
#[cfg(unix)] use net2::unix::UnixTcpBuilderExt;
use tokio::task::JoinHandle;
use zerotier_core::InetAddress;
use crate::service::Service;
#[cfg(target_os = "linux")]
use std::os::unix::io::AsRawFd;
/// Handles API dispatch and other HTTP handler stuff.
#[inline(always)]
async fn web_handler(service: Service, req: Request<Body>) -> Result<Response<Body>, Infallible> {
Ok(Response::new("Hello, World".into()))
}
/// Listener for http connections to the API or for TCP P2P.
/// Dropping a listener initiates shutdown of the background hyper Server instance,
/// but it might not shut down instantly as this occurs asynchronously.
pub(crate) struct WebListener {
shutdown_tx: RefCell<Option<tokio::sync::oneshot::Sender<()>>>,
server: JoinHandle<hyper::Result<()>>,
@ -45,37 +45,50 @@ impl WebListener {
/// Create a new "background" TCP WebListener using the current tokio reactor async runtime.
pub fn new(_device_name: &str, addr: SocketAddr, service: &Service) -> Result<WebListener, Box<dyn std::error::Error>> {
let listener = if addr.is_ipv4() {
let l = TcpBuilder::new_v4();
let l = socket2::Socket::new(socket2::Domain::ipv4(), socket2::Type::stream(), Some(socket2::Protocol::tcp()));
if l.is_err() {
return Err(Box::new(l.err().unwrap()));
}
let l = l.unwrap();
#[cfg(unix)] {
let _ = l.reuse_port(true);
let _ = l.set_reuse_port(true);
}
l
} else {
let l = TcpBuilder::new_v6();
let l = socket2::Socket::new(socket2::Domain::ipv6(), socket2::Type::stream(), Some(socket2::Protocol::tcp()));
if l.is_err() {
return Err(Box::new(l.err().unwrap()));
}
let l = l.unwrap();
let _ = l.only_v6(true);
#[cfg(unix)] {
let _ = l.reuse_port(true);
}
let _ = l.set_only_v6(true);
l
};
// TODO: bind to device on Linux?
let listener = listener.bind(addr);
if listener.is_err() {
return Err(Box::new(listener.err().unwrap()));
#[cfg(target_os = "linux")] {
if !_device_name.is_empty() {
let sock = listener.as_raw_fd();
unsafe {
let _ = std::ffi::CString::new(_device_name).map(|dn| {
let dnb = dn.as_bytes_with_nul();
let _ = crate::osdep::setsockopt(
sock as std::os::raw::c_int,
crate::osdep::SOL_SOCKET as std::os::raw::c_int,
crate::osdep::SO_BINDTODEVICE as std::os::raw::c_int,
dnb.as_ptr().cast(),
(dnb.len() - 1) as crate::osdep::socklen_t);
});
}
}
}
let listener = listener.unwrap().listen(128);
if listener.is_err() {
return Err(Box::new(listener.err().unwrap()));
let addr = socket2::SockAddr::from(addr);
if let Err(e) = listener.bind(&addr) {
return Err(Box::new(e));
}
let listener = listener.unwrap();
if let Err(e) = listener.listen(128) {
return Err(Box::new(e));
}
let listener = listener.into_tcp_listener();
let builder = Server::from_tcp(listener);
if builder.is_err() {
@ -109,7 +122,9 @@ impl WebListener {
impl Drop for WebListener {
fn drop(&mut self) {
let _ = self.shutdown_tx.take().map(|tx| { tx.send(()); });
self.server.abort();
let _ = self.shutdown_tx.take().map(|tx| {
let _ = tx.send(());
self.server.abort();
});
}
}