Centralize import of tokio to control its version and features.

This commit is contained in:
Adam Ierymenko 2022-09-22 09:30:37 -04:00
parent 1828163219
commit be000c2046
No known key found for this signature in database
GPG key ID: C8877CF2D7A5D7F3
6 changed files with 24 additions and 12 deletions

View file

@ -9,11 +9,10 @@ path = "src/main.rs"
[dependencies]
zerotier-crypto = { path = "../crypto" }
zerotier-utils = { path = "../utils" }
zerotier-utils = { path = "../utils", features = ["tokio"] }
zerotier-network-hypervisor = { path = "../network-hypervisor" }
zerotier-vl1-service = { path = "../vl1-service" }
async-trait = "^0"
tokio = { version = "^1", features = ["fs", "io-util", "io-std", "net", "parking_lot", "process", "rt", "rt-multi-thread", "signal", "sync", "time"], default-features = false }
parking_lot = { version = "^0", features = [], default-features = false }
serde = { version = "^1", features = ["derive"], default-features = false }
serde_json = { version = "^1", features = ["std"], default-features = false }

View file

@ -1,8 +1,9 @@
use crate::database::Database;
// (c) 2020-2022 ZeroTier, Inc. -- currently propritery pending actual release and licensing. See LICENSE.md.
use std::sync::Arc;
use tokio::time::{Duration, Instant};
use zerotier_utils::tokio;
use zerotier_network_hypervisor::protocol::{verbs, PacketBuffer};
use zerotier_network_hypervisor::util::dictionary::Dictionary;
@ -11,16 +12,19 @@ use zerotier_network_hypervisor::vl2::NetworkId;
use zerotier_utils::reaper::Reaper;
use crate::database::Database;
const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
pub struct Controller<DatabaseImpl: Database> {
database: Arc<DatabaseImpl>,
reaper: Reaper,
runtime: tokio::runtime::Handle,
}
impl<DatabaseImpl: Database> Controller<DatabaseImpl> {
pub async fn new(database: Arc<DatabaseImpl>) -> Arc<Self> {
Arc::new(Self { database, reaper: Reaper::new() })
pub async fn new(database: Arc<DatabaseImpl>, runtime: tokio::runtime::Handle) -> Arc<Self> {
Arc::new(Self { database, reaper: Reaper::new(&runtime), runtime })
}
async fn handle_network_config_request<HostSystemImpl: HostSystem>(
@ -86,7 +90,7 @@ impl<DatabaseImpl: Database> InnerProtocol for Controller<DatabaseImpl> {
if let Some(deadline) = Instant::now().checked_add(REQUEST_TIMEOUT) {
self.reaper.add(
tokio::spawn(Self::handle_network_config_request(
self.runtime.spawn(Self::handle_network_config_request(
self.database.clone(),
source.clone(),
source_path.clone(),
@ -98,7 +102,7 @@ impl<DatabaseImpl: Database> InnerProtocol for Controller<DatabaseImpl> {
deadline,
);
} else {
eprintln!("WARNING: instant + REQUEST_TIMEOUT overflowed! should be impossible.");
eprintln!("WARNING: Instant::now() + REQUEST_TIMEOUT overflowed! should be impossible.");
}
PacketHandlerResult::Ok

View file

@ -5,8 +5,12 @@ license = "MPL-2.0"
name = "zerotier-utils"
version = "0.1.0"
[features]
default = []
tokio = ["dep:tokio"]
[dependencies]
parking_lot = { version = "^0", features = [], default-features = false }
serde = { version = "^1", features = ["derive"], default-features = false }
serde_json = { version = "^1", features = ["std"], default-features = false }
tokio = { version = "^1", features = ["fs", "io-util", "io-std", "net", "parking_lot", "process", "rt", "rt-multi-thread", "signal", "sync", "time"], default-features = false }
tokio = { version = "^1", default-features = false, features = ["fs", "io-util", "io-std", "net", "parking_lot", "process", "rt", "rt-multi-thread", "signal", "sync", "time"], optional = true }

View file

@ -10,11 +10,16 @@ pub mod hex;
pub mod json;
pub mod memory;
pub mod pool;
pub mod reaper;
pub mod ringbuffer;
pub mod ringbuffermap;
pub mod varint;
#[cfg(feature = "tokio")]
pub mod reaper;
#[cfg(feature = "tokio")]
pub use tokio;
/// Get milliseconds since unix epoch.
pub fn ms_since_epoch() -> i64 {
std::time::SystemTime::now()

View file

@ -12,11 +12,11 @@ pub struct Reaper {
}
impl Reaper {
pub fn new() -> Self {
pub fn new(runtime: &tokio::runtime::Handle) -> Self {
let q = Arc::new((parking_lot::Mutex::new(VecDeque::with_capacity(16)), Notify::new()));
Self {
q: q.clone(),
finisher: tokio::spawn(async move {
finisher: runtime.spawn(async move {
loop {
q.1.notified().await;
loop {

View file

@ -315,7 +315,7 @@ impl<NodeStorageImpl: NodeStorage, PathFilterImpl: PathFilter, InnerProtocolImpl
let mut daemons: Vec<JoinHandle<()>> = state.daemons.drain(..).collect();
drop(state);
for d in daemons.drain(..) {
d.join();
let _ = d.join();
}
}
}