diff --git a/core/zerotier.h b/core/zerotier.h index e0e960291..95ae318ec 100644 --- a/core/zerotier.h +++ b/core/zerotier.h @@ -2020,7 +2020,7 @@ struct ZT_Node_Callbacks * * @return Pointer to I/O buffer */ -ZT_SDK_API [[maybe_unused]] void *ZT_getBuffer(); +ZT_SDK_API void *ZT_getBuffer(); /** * Free an unused buffer obtained via getBuffer diff --git a/rust-zerotier-core/src/node.rs b/rust-zerotier-core/src/node.rs index 3fe68818c..0198e76be 100644 --- a/rust-zerotier-core/src/node.rs +++ b/rust-zerotier-core/src/node.rs @@ -102,8 +102,9 @@ pub trait NodeEventHandler { pub struct NodeIntl + Sync + Send + Clone + 'static, N: Sync + Send + 'static, H: NodeEventHandler> { event_handler: T, capi: *mut ztcore::ZT_Node, - now: PortableAtomicI64, networks_by_id: Mutex>>>, + recent_clock: PortableAtomicI64, + recent_ticks: PortableAtomicI64, event_handler_placeholder: PhantomData, } @@ -312,14 +313,15 @@ extern "C" fn zt_path_lookup_function + Sync + Send + Clone + 'stati impl + Sync + Send + Clone + 'static, N: Sync + Send + 'static, H: NodeEventHandler> Node { /// Create a new Node with a given event handler. 
#[allow(unused_mut)] - pub fn new(event_handler: T, now: i64) -> Result, ResultCode> { + pub fn new(event_handler: T, clock: i64, ticks: i64) -> Result, ResultCode> { let mut n = Node { intl: Box::pin(NodeIntl { event_handler: event_handler.clone(), capi: null_mut(), - now: PortableAtomicI64::new(now), networks_by_id: Mutex::new(HashMap::new()), event_handler_placeholder: PhantomData::default(), + recent_clock: PortableAtomicI64::new(clock), + recent_ticks: PortableAtomicI64::new(ticks), }), event_handler_placeholder: PhantomData::default(), }; @@ -335,7 +337,7 @@ impl + Sync + Send + Clone + 'static, N: Sync + Send + 'static, H: N pathCheckFunction: transmute(zt_path_check_function:: as *const ()), pathLookupFunction: transmute(zt_path_lookup_function:: as *const ()), }; - ztcore::ZT_Node_new(transmute(&(n.intl.capi) as *const *mut ztcore::ZT_Node), transmute(&*n.intl as *const NodeIntl), null_mut(), &callbacks, now) + ztcore::ZT_Node_new(transmute(&(n.intl.capi) as *const *mut ztcore::ZT_Node), clock, ticks, null_mut(), transmute(&*n.intl as *const NodeIntl), &callbacks) }; if rc == 0 { @@ -350,19 +352,20 @@ impl + Sync + Send + Clone + 'static, N: Sync + Send + 'static, H: N /// The first call should happen no more than NODE_BACKGROUND_TASKS_MAX_INTERVAL milliseconds /// since the node was created, and after this runs it returns the amount of time the caller /// should wait before calling it again. 
- pub fn process_background_tasks(&self, now: i64) -> i64 { - self.intl.now.set(now); - let mut next_task_deadline: i64 = now; + pub fn process_background_tasks(&self, clock: i64, ticks: i64) -> i64 { + self.intl.recent_clock.set(clock); + self.intl.recent_ticks.set(ticks); + let mut next_task_deadline: i64 = ticks; unsafe { - ztcore::ZT_Node_processBackgroundTasks(self.intl.capi, null_mut(), now, (&mut next_task_deadline as *mut i64).cast()); + ztcore::ZT_Node_processBackgroundTasks(self.intl.capi, clock, ticks, null_mut(), (&mut next_task_deadline as *mut i64).cast()); } - (next_task_deadline - now).clamp(1_i64, NODE_BACKGROUND_TASKS_MAX_INTERVAL) + (next_task_deadline - ticks).clamp(1_i64, NODE_BACKGROUND_TASKS_MAX_INTERVAL) } /// Join a network, associating network_obj with it. /// If a fingerprint is supplied it will be used as a full sha384 fingerprint of the /// network's controller. - pub fn join(&self, nwid: NetworkId, controller_fingerprint: Option, network_obj: N) -> ResultCode { + pub fn join(&self, clock: i64, ticks: i64, nwid: NetworkId, controller_fingerprint: Option, network_obj: N) -> ResultCode { let mut cfp: MaybeUninit = MaybeUninit::uninit(); let mut cfpp: *mut ztcore::ZT_Fingerprint = null_mut(); if controller_fingerprint.is_some() { @@ -375,7 +378,7 @@ impl + Sync + Send + Clone + 'static, N: Sync + Send + 'static, H: N } let network_obj = Box::pin(network_obj); - let rc = unsafe { ztcore::ZT_Node_join(self.intl.capi, nwid.0, cfpp, transmute((&*network_obj) as *const N), null_mut()) }; + let rc = unsafe { ztcore::ZT_Node_join(self.intl.capi, clock, ticks, null_mut(), transmute((&*network_obj) as *const N), nwid.0, cfpp) }; if rc == ztcore::ZT_ResultCode_ZT_RESULT_OK { self.intl.networks_by_id.lock().unwrap().insert(nwid.0, network_obj); ResultCode::Ok @@ -385,11 +388,11 @@ impl + Sync + Send + Clone + 'static, N: Sync + Send + 'static, H: N } /// Leave a network. 
- pub fn leave(&self, nwid: NetworkId) -> ResultCode { + pub fn leave(&self, clock: i64, ticks: i64, nwid: NetworkId) -> ResultCode { self.intl.networks_by_id.lock().unwrap().remove(&nwid.0).map_or_else(|| { ResultCode::ErrorNetworkNotFound }, |_| { - unsafe { ResultCode::from_i32(ztcore::ZT_Node_leave(self.intl.capi, nwid.0, null_mut(), null_mut()) as i32).unwrap_or(ResultCode::ErrorInternalNonFatal) } + unsafe { ResultCode::from_i32(ztcore::ZT_Node_leave(self.intl.capi, clock, ticks, null_mut(), null_mut(), nwid.0) as i32).unwrap_or(ResultCode::ErrorInternalNonFatal) } }) } @@ -413,33 +416,29 @@ impl + Sync + Send + Clone + 'static, N: Sync + Send + 'static, H: N } #[inline(always)] - pub fn process_wire_packet(&self, local_socket: i64, remote_address: &InetAddress, data: Buffer) -> ResultCode { + pub fn process_wire_packet(&self, clock: i64, ticks: i64, local_socket: i64, remote_address: &InetAddress, data: Buffer, next_task_deadline: &mut i64) -> ResultCode { let intl = &*self.intl; - let current_time = intl.now.get(); - let mut next_task_deadline: i64 = current_time; - let rc = unsafe { ResultCode::from_i32(ztcore::ZT_Node_processWirePacket(intl.capi, null_mut(), current_time, local_socket, remote_address.as_capi_ptr(), data.zt_core_buf as *const c_void, data.data_size as u32, 1, &mut next_task_deadline as *mut i64) as i32).unwrap_or(ResultCode::ErrorInternalNonFatal) }; + let rc = unsafe { ResultCode::from_i32(ztcore::ZT_Node_processWirePacket(intl.capi, clock, ticks, null_mut(), local_socket, remote_address.as_capi_ptr(), data.zt_core_buf as *const c_void, data.data_size as u32, 1, (next_task_deadline as *mut i64).cast()) as i32).unwrap_or(ResultCode::ErrorInternalNonFatal) }; std::mem::forget(data); // prevent Buffer from being returned to ZT core twice, see comment in drop() in buffer.rs rc } #[inline(always)] - pub fn process_virtual_network_frame(&self, nwid: &NetworkId, source_mac: &MAC, dest_mac: &MAC, ethertype: u16, vlan_id: u16, data: Buffer) -> 
ResultCode { + pub fn process_virtual_network_frame(&self, clock: i64, ticks: i64, nwid: &NetworkId, source_mac: &MAC, dest_mac: &MAC, ethertype: u16, vlan_id: u16, data: Buffer, next_task_deadline: &mut i64) -> ResultCode { let intl = &*self.intl; - let current_time = intl.now.get(); - let mut next_tick_deadline: i64 = current_time; - let rc = unsafe { ResultCode::from_i32(ztcore::ZT_Node_processVirtualNetworkFrame(intl.capi, null_mut(), current_time, nwid.0, source_mac.0, dest_mac.0, ethertype as c_uint, vlan_id as c_uint, data.zt_core_buf as *const c_void, data.data_size as u32, 1, &mut next_tick_deadline as *mut i64) as i32).unwrap_or(ResultCode::ErrorInternalNonFatal) }; + let rc = unsafe { ResultCode::from_i32(ztcore::ZT_Node_processVirtualNetworkFrame(intl.capi, clock, ticks, null_mut(), nwid.0, source_mac.0, dest_mac.0, ethertype as c_uint, vlan_id as c_uint, data.zt_core_buf as *const c_void, data.data_size as u32, 1, (next_task_deadline as *mut i64).cast()) as i32).unwrap_or(ResultCode::ErrorInternalNonFatal) }; std::mem::forget(data); // prevent Buffer from being returned to ZT core twice, see comment in drop() in buffer.rs rc } #[inline(always)] - pub fn multicast_subscribe(&self, nwid: &NetworkId, multicast_group: &MAC, multicast_adi: u32) -> ResultCode { - unsafe { ResultCode::from_i32(ztcore::ZT_Node_multicastSubscribe(self.intl.capi, null_mut(), nwid.0, multicast_group.0, multicast_adi as c_ulong) as i32).unwrap_or(ResultCode::ErrorInternalNonFatal) } + pub fn multicast_subscribe(&self, clock: i64, ticks: i64, nwid: &NetworkId, multicast_group: &MAC, multicast_adi: u32) -> ResultCode { + unsafe { ResultCode::from_i32(ztcore::ZT_Node_multicastSubscribe(self.intl.capi, clock, ticks, null_mut(), nwid.0, multicast_group.0, multicast_adi as c_ulong) as i32).unwrap_or(ResultCode::ErrorInternalNonFatal) } } #[inline(always)] - pub fn multicast_unsubscribe(&self, nwid: &NetworkId, multicast_group: &MAC, multicast_adi: u32) -> ResultCode { - unsafe { 
ResultCode::from_i32(ztcore::ZT_Node_multicastUnsubscribe(self.intl.capi, nwid.0, multicast_group.0, multicast_adi as c_ulong) as i32).unwrap_or(ResultCode::ErrorInternalNonFatal) } + pub fn multicast_unsubscribe(&self, clock: i64, ticks: i64, nwid: &NetworkId, multicast_group: &MAC, multicast_adi: u32) -> ResultCode { + unsafe { ResultCode::from_i32(ztcore::ZT_Node_multicastUnsubscribe(self.intl.capi, clock, ticks, null_mut(), nwid.0, multicast_group.0, multicast_adi as c_ulong) as i32).unwrap_or(ResultCode::ErrorInternalNonFatal) } } /// Get a copy of this node's identity. @@ -456,10 +455,10 @@ impl + Sync + Send + Clone + 'static, N: Sync + Send + 'static, H: N Identity::new_from_capi(ztcore::ZT_Node_identity(self.intl.capi), false) } - pub fn status(&self) -> NodeStatus { + pub fn status(&self, clock: i64, ticks: i64) -> NodeStatus { let mut ns: MaybeUninit = MaybeUninit::zeroed(); unsafe { - ztcore::ZT_Node_status(self.intl.capi, ns.as_mut_ptr()); + ztcore::ZT_Node_status(self.intl.capi, clock, ticks, null_mut(), ns.as_mut_ptr()); let ns = ns.assume_init(); if ns.identity.is_null() { panic!("ZT_Node_status() returned null identity"); @@ -474,10 +473,10 @@ impl + Sync + Send + Clone + 'static, N: Sync + Send + 'static, H: N } } - pub fn peers(&self) -> Vec { + pub fn peers(&self, clock: i64, ticks: i64) -> Vec { let mut p: Vec = Vec::new(); unsafe { - let pl = ztcore::ZT_Node_peers(self.intl.capi); + let pl = ztcore::ZT_Node_peers(self.intl.capi, clock, ticks, null_mut()); if !pl.is_null() { let peer_count = (*pl).peerCount as usize; p.reserve(peer_count); @@ -506,10 +505,10 @@ impl + Sync + Send + Clone + 'static, N: Sync + Send + 'static, H: N n } - pub fn certificates(&self) -> Vec<(Certificate, u32)> { + pub fn certificates(&self, clock: i64, ticks: i64) -> Vec<(Certificate, u32)> { let mut c: Vec<(Certificate, u32)> = Vec::new(); unsafe { - let cl = ztcore::ZT_Node_listCertificates(self.intl.capi); + let cl = 
ztcore::ZT_Node_listCertificates(self.intl.capi, clock, ticks, null_mut()); if !cl.is_null() { let cert_count = (*cl).certCount as usize; c.reserve(cert_count); @@ -538,7 +537,7 @@ unsafe impl + Sync + Send + Clone + 'static, N: Sync + Send + 'stati impl + Sync + Send + Clone + 'static, N: Sync + Send + 'static, H: NodeEventHandler> Drop for Node { fn drop(&mut self) { unsafe { - ztcore::ZT_Node_delete(self.intl.capi, null_mut()); + ztcore::ZT_Node_delete(self.intl.capi, self.intl.recent_clock.get(), self.intl.recent_ticks.get(), null_mut()); } } } diff --git a/service/src/commands/locator.rs b/service/src/commands/locator.rs index 6a5e8b341..faec4979f 100644 --- a/service/src/commands/locator.rs +++ b/service/src/commands/locator.rs @@ -16,14 +16,14 @@ use clap::ArgMatches; use zerotier_core::*; fn new_(cli_args: &ArgMatches) -> i32 { - let timestamp = cli_args.value_of("timestamp").map_or(crate::utils::ms_since_epoch(), |ts| { + let revision = cli_args.value_of("revision").map_or(crate::utils::ms_since_epoch(), |ts| { if ts.is_empty() { 0_i64 } else { i64::from_str_radix(ts, 10).unwrap_or(0_i64) * 1000_i64 // internally uses ms since epoch } }); - if timestamp <= 0 { + if revision <= 0 { - println!("ERROR: invalid or empty timestamp specified."); + println!("ERROR: invalid or empty revision specified."); return 1; } @@ -58,7 +58,7 @@ fn new_(cli_args: &ArgMatches) -> i32 { return 1; } - Locator::new(&identity, timestamp, &endpoints).map_or_else(|e| { + Locator::new(&identity, revision, &endpoints).map_or_else(|e| { println!("ERROR: failure creating locator: {}", e.to_str()); 1 }, |loc| { @@ -95,7 +95,7 @@ fn show(cli_args: &ArgMatches) -> i32 { return 1; } let locator = locator.unwrap(); - println!("{} timestamp {}", locator.signer().to_string(), (locator.timestamp() as f64) / 1000.0); + println!("{} revision {}", locator.signer().to_string(), locator.revision()); let endpoints = locator.endpoints(); for ep in endpoints.iter() { println!(" {}", (*ep).to_string()) @@ -103,7 +103,7 @@
0 } -pub(crate) fn run<'a>(cli_args: &ArgMatches<'a>) -> i32 { +pub(crate) fn run(cli_args: &ArgMatches) -> i32 { match cli_args.subcommand() { ("new", Some(sub_cli_args)) => new_(sub_cli_args), ("verify", Some(sub_cli_args)) => verify(sub_cli_args), diff --git a/service/src/main.rs b/service/src/main.rs index ed91d7d22..207f526b9 100644 --- a/service/src/main.rs +++ b/service/src/main.rs @@ -111,7 +111,7 @@ Advanced Operations: locator [args] new [-...] [...] Create new signed locator - -t Timestamp (default: system time) + -r Revision number verify Verify locator signature show Show contents of a locator @@ -265,7 +265,7 @@ fn main() { .arg(Arg::with_name("signature").index(3).required(true)))) .subcommand(App::new("locator") .subcommand(App::new("new") - .arg(Arg::with_name("timestamp").short("t").required(false)) + .arg(Arg::with_name("revision").short("r").required(false)) .arg(Arg::with_name("identity").index(1).required(true)) .arg(Arg::with_name("endpoint").index(2).multiple(true).required(true))) .subcommand(App::new("verify") diff --git a/service/src/service.rs b/service/src/service.rs index 417d2b92c..318e2a7ab 100644 --- a/service/src/service.rs +++ b/service/src/service.rs @@ -268,7 +268,7 @@ async fn run_async(store: Arc, local_config: Arc) -> i32 { online: AtomicBool::new(false), }); - let node = Node::new(service.clone(), ms_monotonic()); + let node = Node::new(service.clone(), ms_since_epoch(), ms_monotonic()); if node.is_err() { service.log.fatal(format!("error initializing node: {}", node.err().unwrap().to_str())); return 1; @@ -278,15 +278,15 @@ async fn run_async(store: Arc, local_config: Arc) -> i32 { let mut local_config = service.local_config(); - let mut now: i64 = ms_monotonic(); + let mut ticks: i64 = ms_monotonic(); let mut loop_delay = zerotier_core::NODE_BACKGROUND_TASKS_MAX_INTERVAL; let mut last_checked_config: i64 = 0; while service.run.load(Ordering::Relaxed) { let loop_delay_start = ms_monotonic(); tokio::select! 
{ _ = tokio::time::sleep(Duration::from_millis(loop_delay as u64)) => { - now = ms_monotonic(); - let actual_delay = now - loop_delay_start; + ticks = ms_monotonic(); + let actual_delay = ticks - loop_delay_start; if actual_delay > ((loop_delay as i64) * 4_i64) { l!(service.log, "likely sleep/wake detected due to excessive loop delay, cycling links..."); // TODO: handle likely sleep/wake or other system interruption @@ -297,7 +297,7 @@ async fn run_async(store: Arc, local_config: Arc) -> i32 { if !service.run.load(Ordering::Relaxed) { break; } - now = ms_monotonic(); + ticks = ms_monotonic(); }, _ = tokio::signal::ctrl_c() => { l!(service.log, "exit signal received, shutting down..."); @@ -306,8 +306,8 @@ async fn run_async(store: Arc, local_config: Arc) -> i32 { }, } - if (now - last_checked_config) >= CONFIG_CHECK_INTERVAL { - last_checked_config = now; + if (ticks - last_checked_config) >= CONFIG_CHECK_INTERVAL { + last_checked_config = ticks; let mut bindings_changed = false; @@ -478,7 +478,7 @@ async fn run_async(store: Arc, local_config: Arc) -> i32 { } // Run background task handler in ZeroTier core. - loop_delay = node.process_background_tasks(now); + loop_delay = node.process_background_tasks(ms_since_epoch(), ticks); } l!(service.log, "shutting down normally.");