mirror of
https://github.com/zerotier/ZeroTierOne.git
synced 2025-04-26 17:03:43 +02:00
Fix deadlock, which turned out to be not setting O_NONBLOCK haha.
This commit is contained in:
parent
a61bcaf0f7
commit
65df5a2387
13 changed files with 99 additions and 131 deletions
|
@ -79,8 +79,8 @@ impl<I: Interface> NetworkHypervisor<I> {
|
||||||
/// calling add_update_root_set().
|
/// calling add_update_root_set().
|
||||||
pub fn add_update_default_root_set(&self) -> bool {
|
pub fn add_update_default_root_set(&self) -> bool {
|
||||||
let mut buf: Buffer<4096> = Buffer::new();
|
let mut buf: Buffer<4096> = Buffer::new();
|
||||||
buf.set_to(include_bytes!("../default-rootset/root.zerotier.com.bin"));
|
//buf.set_to(include_bytes!("../default-rootset/root.zerotier.com.bin"));
|
||||||
//buf.set_to(include_bytes!("../default-rootset/local-parallels-test.bin"));
|
buf.set_to(include_bytes!("../default-rootset/local-parallels-test.bin"));
|
||||||
let mut cursor = 0;
|
let mut cursor = 0;
|
||||||
self.add_update_root_set(RootSet::unmarshal(&buf, &mut cursor).unwrap())
|
self.add_update_root_set(RootSet::unmarshal(&buf, &mut cursor).unwrap())
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,7 +9,7 @@ pub struct IntervalGate<const FREQ: i64>(i64);
|
||||||
impl<const FREQ: i64> Default for IntervalGate<FREQ> {
|
impl<const FREQ: i64> Default for IntervalGate<FREQ> {
|
||||||
#[inline(always)]
|
#[inline(always)]
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Self(0)
|
Self(crate::util::NEVER_HAPPENED_TICKS)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -37,7 +37,7 @@ pub struct AtomicIntervalGate<const FREQ: i64>(AtomicI64);
|
||||||
impl<const FREQ: i64> Default for AtomicIntervalGate<FREQ> {
|
impl<const FREQ: i64> Default for AtomicIntervalGate<FREQ> {
|
||||||
#[inline(always)]
|
#[inline(always)]
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Self(AtomicI64::new(0))
|
Self(AtomicI64::new(crate::util::NEVER_HAPPENED_TICKS))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -10,6 +10,9 @@ pub use zerotier_core_crypto::varint;
|
||||||
|
|
||||||
pub(crate) const ZEROES: [u8; 64] = [0_u8; 64];
|
pub(crate) const ZEROES: [u8; 64] = [0_u8; 64];
|
||||||
|
|
||||||
|
/// A value for ticks that indicates that something never happened, and is thus very long before zero ticks.
|
||||||
|
pub(crate) const NEVER_HAPPENED_TICKS: i64 = -2147483648;
|
||||||
|
|
||||||
#[cfg(feature = "debug_events")]
|
#[cfg(feature = "debug_events")]
|
||||||
#[allow(unused_macros)]
|
#[allow(unused_macros)]
|
||||||
macro_rules! debug_event {
|
macro_rules! debug_event {
|
||||||
|
|
|
@ -98,15 +98,13 @@ impl<O, F: PoolFactory<O>> AsMut<O> for Pooled<O, F> {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<O, F: PoolFactory<O>> Drop for Pooled<O, F> {
|
impl<O, F: PoolFactory<O>> Drop for Pooled<O, F> {
|
||||||
#[inline(always)]
|
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
unsafe {
|
let internal = unsafe { self.0.as_mut() };
|
||||||
if let Some(p) = self.0.as_ref().return_pool.upgrade() {
|
if let Some(p) = internal.return_pool.upgrade() {
|
||||||
p.factory.reset(&mut self.0.as_mut().obj);
|
p.factory.reset(&mut internal.obj);
|
||||||
p.pool.lock().push(self.0);
|
p.pool.lock().push(self.0);
|
||||||
} else {
|
} else {
|
||||||
drop(Box::from_raw(self.0.as_ptr()))
|
drop(unsafe { Box::from_raw(self.0.as_ptr()) });
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -126,15 +124,15 @@ impl<O, F: PoolFactory<O>> Pool<O, F> {
|
||||||
|
|
||||||
/// Get a pooled object, or allocate one if the pool is empty.
|
/// Get a pooled object, or allocate one if the pool is empty.
|
||||||
pub fn get(&self) -> Pooled<O, F> {
|
pub fn get(&self) -> Pooled<O, F> {
|
||||||
Pooled::<O, F>(self.0.pool.lock().pop().unwrap_or_else(
|
if let Some(o) = self.0.pool.lock().pop() {
|
||||||
#[inline(always)]
|
return Pooled::<O, F>(o);
|
||||||
|| unsafe {
|
}
|
||||||
NonNull::new_unchecked(Box::into_raw(Box::new(PoolEntry::<O, F> {
|
return Pooled::<O, F>(unsafe {
|
||||||
obj: self.0.factory.create(),
|
NonNull::new_unchecked(Box::into_raw(Box::new(PoolEntry::<O, F> {
|
||||||
return_pool: Arc::downgrade(&self.0),
|
obj: self.0.factory.create(),
|
||||||
})))
|
return_pool: Arc::downgrade(&self.0),
|
||||||
},
|
})))
|
||||||
))
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Dispose of all pooled objects, freeing any memory they use.
|
/// Dispose of all pooled objects, freeing any memory they use.
|
||||||
|
|
|
@ -333,13 +333,13 @@ impl<SI: SystemInterface> Node<SI> {
|
||||||
};
|
};
|
||||||
|
|
||||||
// We only "spam" if we are offline.
|
// We only "spam" if we are offline.
|
||||||
if root_spam_hello && self.is_online() {
|
if root_spam_hello {
|
||||||
root_spam_hello = false;
|
root_spam_hello = !self.is_online();
|
||||||
}
|
}
|
||||||
|
|
||||||
debug_event!(
|
debug_event!(
|
||||||
si,
|
si,
|
||||||
"[vl1] do_background_tasks:{}{}{}{}{}{}",
|
"[vl1] do_background_tasks:{}{}{}{}{}{} ----",
|
||||||
if root_sync { " root_sync" } else { "" },
|
if root_sync { " root_sync" } else { "" },
|
||||||
if root_hello { " root_hello" } else { "" },
|
if root_hello { " root_hello" } else { "" },
|
||||||
if root_spam_hello { " root_spam_hello" } else { "" },
|
if root_spam_hello { " root_spam_hello" } else { "" },
|
||||||
|
@ -491,6 +491,7 @@ impl<SI: SystemInterface> Node<SI> {
|
||||||
self.whois.service(si, self, tt);
|
self.whois.service(si, self, tt);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
debug_event!(si, "[vl1] do_background_tasks DONE ----");
|
||||||
Duration::from_millis(1000)
|
Duration::from_millis(1000)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -46,8 +46,8 @@ impl<SI: SystemInterface> Path<SI> {
|
||||||
local_socket,
|
local_socket,
|
||||||
local_interface,
|
local_interface,
|
||||||
internal_instance_id: INSTANCE_ID_COUNTER.fetch_add(1, Ordering::SeqCst),
|
internal_instance_id: INSTANCE_ID_COUNTER.fetch_add(1, Ordering::SeqCst),
|
||||||
last_send_time_ticks: AtomicI64::new(0),
|
last_send_time_ticks: AtomicI64::new(crate::util::NEVER_HAPPENED_TICKS),
|
||||||
last_receive_time_ticks: AtomicI64::new(0),
|
last_receive_time_ticks: AtomicI64::new(crate::util::NEVER_HAPPENED_TICKS),
|
||||||
create_time_ticks: time_ticks,
|
create_time_ticks: time_ticks,
|
||||||
fragmented_packets: Mutex::new(HashMap::with_capacity_and_hasher(4, PacketIdHasher(zerotier_core_crypto::random::xorshift64_random()))),
|
fragmented_packets: Mutex::new(HashMap::with_capacity_and_hasher(4, PacketIdHasher(zerotier_core_crypto::random::xorshift64_random()))),
|
||||||
}
|
}
|
||||||
|
|
|
@ -149,10 +149,10 @@ impl<SI: SystemInterface> Peer<SI> {
|
||||||
identity_symmetric_key: SymmetricSecret::new(static_secret),
|
identity_symmetric_key: SymmetricSecret::new(static_secret),
|
||||||
ephemeral_symmetric_key: RwLock::new(None),
|
ephemeral_symmetric_key: RwLock::new(None),
|
||||||
paths: Mutex::new(Vec::with_capacity(4)),
|
paths: Mutex::new(Vec::with_capacity(4)),
|
||||||
last_send_time_ticks: AtomicI64::new(0),
|
last_send_time_ticks: AtomicI64::new(crate::util::NEVER_HAPPENED_TICKS),
|
||||||
last_receive_time_ticks: AtomicI64::new(0),
|
last_receive_time_ticks: AtomicI64::new(crate::util::NEVER_HAPPENED_TICKS),
|
||||||
last_forward_time_ticks: AtomicI64::new(0),
|
last_forward_time_ticks: AtomicI64::new(crate::util::NEVER_HAPPENED_TICKS),
|
||||||
last_hello_reply_time_ticks: AtomicI64::new(0),
|
last_hello_reply_time_ticks: AtomicI64::new(crate::util::NEVER_HAPPENED_TICKS),
|
||||||
create_time_ticks: time_ticks,
|
create_time_ticks: time_ticks,
|
||||||
message_id_counter: AtomicU64::new(((time_clock as u64) / 100).wrapping_shl(28) ^ next_u64_secure().wrapping_shr(36)),
|
message_id_counter: AtomicU64::new(((time_clock as u64) / 100).wrapping_shl(28) ^ next_u64_secure().wrapping_shr(36)),
|
||||||
remote_version: AtomicU64::new(0),
|
remote_version: AtomicU64::new(0),
|
||||||
|
|
48
zerotier-system-service/Cargo.lock
generated
48
zerotier-system-service/Cargo.lock
generated
|
@ -118,9 +118,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "clap"
|
name = "clap"
|
||||||
version = "3.2.3"
|
version = "3.2.6"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "1df386a2d0f35bdefc0642fd8bcb2cd28243959f028abfd22fbade6f7d30980e"
|
checksum = "9f1fe12880bae935d142c8702d500c63a4e8634b6c3c57ad72bf978fc7b6249a"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bitflags",
|
"bitflags",
|
||||||
"clap_lex",
|
"clap_lex",
|
||||||
|
@ -131,9 +131,9 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "clap_lex"
|
name = "clap_lex"
|
||||||
version = "0.2.2"
|
version = "0.2.3"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "5538cd660450ebeb4234cfecf8f2284b844ffc4c50531e66d584ad5b91293613"
|
checksum = "87eba3c8c7f42ef17f6c659fc7416d0f4758cd3e58861ee63c5fa4a4dde649e4"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"os_str_bytes",
|
"os_str_bytes",
|
||||||
]
|
]
|
||||||
|
@ -273,15 +273,15 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "hashbrown"
|
name = "hashbrown"
|
||||||
version = "0.11.2"
|
version = "0.12.1"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e"
|
checksum = "db0d4cf898abf0081f964436dc980e96670a0f36863e4b83aaacdb65c9d7ccc3"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "heapless"
|
name = "heapless"
|
||||||
version = "0.7.13"
|
version = "0.7.14"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "8a08e755adbc0ad283725b29f4a4883deee15336f372d5f61fae59efec40f983"
|
checksum = "065681e99f9ef7e0e813702a0326aedbcbbde7db5e55f097aedd1bf50b9dca43"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"atomic-polyfill",
|
"atomic-polyfill",
|
||||||
"hash32",
|
"hash32",
|
||||||
|
@ -301,9 +301,9 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "indexmap"
|
name = "indexmap"
|
||||||
version = "1.8.2"
|
version = "1.9.1"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "e6012d540c5baa3589337a98ce73408de9b5a25ec9fc2c6fd6be8f0d39e0ca5a"
|
checksum = "10a35a97730320ffe8e2d410b5d3b69279b98d2c14bdb8b70ea89ecf7888d41e"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"autocfg",
|
"autocfg",
|
||||||
"hashbrown",
|
"hashbrown",
|
||||||
|
@ -363,9 +363,9 @@ checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "mio"
|
name = "mio"
|
||||||
version = "0.8.3"
|
version = "0.8.4"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "713d550d9b44d89174e066b7a6217ae06234c10cb47819a88290d2b353c31799"
|
checksum = "57ee1c23c7c63b0c9250c339ffdc69255f110b298b901b9f6c82547b7b87caaf"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"libc",
|
"libc",
|
||||||
"log",
|
"log",
|
||||||
|
@ -518,18 +518,18 @@ checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "proc-macro2"
|
name = "proc-macro2"
|
||||||
version = "1.0.39"
|
version = "1.0.40"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "c54b25569025b7fc9651de43004ae593a75ad88543b17178aa5e1b9c4f15f56f"
|
checksum = "dd96a1e8ed2596c337f8eae5f24924ec83f5ad5ab21ea8e455d3566c69fbcaf7"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"unicode-ident",
|
"unicode-ident",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "quote"
|
name = "quote"
|
||||||
version = "1.0.18"
|
version = "1.0.20"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "a1feb54ed693b93a84e14094943b84b7c4eae204c512b7ccb95ab0c66d278ad1"
|
checksum = "3bcdf212e9776fbcb2d23ab029360416bb1706b1aea2d1a5ba002727cbcab804"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"proc-macro2",
|
"proc-macro2",
|
||||||
]
|
]
|
||||||
|
@ -783,9 +783,9 @@ checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "syn"
|
name = "syn"
|
||||||
version = "1.0.96"
|
version = "1.0.98"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "0748dd251e24453cb8717f0354206b91557e4ec8703673a4b30208f2abaf1ebf"
|
checksum = "c50aef8a904de4c23c788f104b7dddc7d6f79c647c7c8ce4cc8f73eb0ca773dd"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"proc-macro2",
|
"proc-macro2",
|
||||||
"quote",
|
"quote",
|
||||||
|
@ -826,21 +826,9 @@ dependencies = [
|
||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
"signal-hook-registry",
|
"signal-hook-registry",
|
||||||
"socket2",
|
"socket2",
|
||||||
"tokio-macros",
|
|
||||||
"winapi",
|
"winapi",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "tokio-macros"
|
|
||||||
version = "1.8.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "9724f9a975fb987ef7a3cd9be0350edcbe130698af5b8f7a631e23d42d052484"
|
|
||||||
dependencies = [
|
|
||||||
"proc-macro2",
|
|
||||||
"quote",
|
|
||||||
"syn",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "twox-hash"
|
name = "twox-hash"
|
||||||
version = "1.6.3"
|
version = "1.6.3"
|
||||||
|
|
|
@ -16,7 +16,7 @@ zerotier-network-hypervisor = { path = "../zerotier-network-hypervisor" }
|
||||||
zerotier-core-crypto = { path = "../zerotier-core-crypto" }
|
zerotier-core-crypto = { path = "../zerotier-core-crypto" }
|
||||||
async-trait = "^0"
|
async-trait = "^0"
|
||||||
num-traits = "^0"
|
num-traits = "^0"
|
||||||
tokio = { version = "^1", features = ["full"], default-features = false }
|
tokio = { version = "^1", features = ["fs", "io-util", "io-std", "net", "parking_lot", "process", "rt", "rt-multi-thread", "signal", "sync", "time"], default-features = false }
|
||||||
serde = { version = "^1", features = ["derive"], default-features = false }
|
serde = { version = "^1", features = ["derive"], default-features = false }
|
||||||
serde_json = { version = "^1", features = ["std"], default-features = false }
|
serde_json = { version = "^1", features = ["std"], default-features = false }
|
||||||
parking_lot = { version = "^0", features = [], default-features = false }
|
parking_lot = { version = "^0", features = [], default-features = false }
|
||||||
|
|
|
@ -2,11 +2,12 @@
|
||||||
|
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
use crate::localconfig;
|
use crate::localconfig::Config;
|
||||||
use crate::utils::{read_limit, DEFAULT_FILE_IO_READ_LIMIT};
|
use crate::utils::{read_limit, DEFAULT_FILE_IO_READ_LIMIT};
|
||||||
|
|
||||||
use tokio::sync::{Mutex, RwLock, RwLockReadGuard};
|
use parking_lot::{Mutex, RwLock};
|
||||||
|
|
||||||
use zerotier_core_crypto::random::next_u32_secure;
|
use zerotier_core_crypto::random::next_u32_secure;
|
||||||
use zerotier_network_hypervisor::vl1::Identity;
|
use zerotier_network_hypervisor::vl1::Identity;
|
||||||
|
@ -21,7 +22,7 @@ const CONFIG_FILENAME: &'static str = "local.conf";
|
||||||
/// Abstraction around ZeroTier's home data directory.
|
/// Abstraction around ZeroTier's home data directory.
|
||||||
pub struct DataDir {
|
pub struct DataDir {
|
||||||
pub base_path: PathBuf,
|
pub base_path: PathBuf,
|
||||||
config: RwLock<localconfig::Config>,
|
config: RwLock<Arc<Config>>,
|
||||||
authtoken: Mutex<String>,
|
authtoken: Mutex<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -37,8 +38,8 @@ impl DataDir {
|
||||||
|
|
||||||
let config_path = base_path.join(CONFIG_FILENAME);
|
let config_path = base_path.join(CONFIG_FILENAME);
|
||||||
let config_data = read_limit(&config_path, DEFAULT_FILE_IO_READ_LIMIT).await;
|
let config_data = read_limit(&config_path, DEFAULT_FILE_IO_READ_LIMIT).await;
|
||||||
let config = RwLock::new(if config_data.is_ok() {
|
let config = RwLock::new(Arc::new(if config_data.is_ok() {
|
||||||
let c = serde_json::from_slice::<localconfig::Config>(config_data.unwrap().as_slice());
|
let c = serde_json::from_slice::<Config>(config_data.unwrap().as_slice());
|
||||||
if c.is_err() {
|
if c.is_err() {
|
||||||
return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, c.err().unwrap()));
|
return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, c.err().unwrap()));
|
||||||
}
|
}
|
||||||
|
@ -47,9 +48,9 @@ impl DataDir {
|
||||||
if config_path.is_file() {
|
if config_path.is_file() {
|
||||||
return Err(std::io::Error::new(std::io::ErrorKind::PermissionDenied, "local.conf not readable"));
|
return Err(std::io::Error::new(std::io::ErrorKind::PermissionDenied, "local.conf not readable"));
|
||||||
} else {
|
} else {
|
||||||
localconfig::Config::default()
|
Config::default()
|
||||||
}
|
}
|
||||||
});
|
}));
|
||||||
|
|
||||||
return Ok(Self { base_path, config, authtoken: Mutex::new(String::new()) });
|
return Ok(Self { base_path, config, authtoken: Mutex::new(String::new()) });
|
||||||
}
|
}
|
||||||
|
@ -76,7 +77,7 @@ impl DataDir {
|
||||||
|
|
||||||
/// Get authorization token for local API, creating and saving if it does not exist.
|
/// Get authorization token for local API, creating and saving if it does not exist.
|
||||||
pub async fn authtoken(&self) -> std::io::Result<String> {
|
pub async fn authtoken(&self) -> std::io::Result<String> {
|
||||||
let mut authtoken = self.authtoken.lock().await;
|
let authtoken = self.authtoken.lock().clone();
|
||||||
if authtoken.is_empty() {
|
if authtoken.is_empty() {
|
||||||
let authtoken_path = self.base_path.join(AUTH_TOKEN_FILENAME);
|
let authtoken_path = self.base_path.join(AUTH_TOKEN_FILENAME);
|
||||||
let authtoken_bytes = read_limit(&authtoken_path, 4096).await;
|
let authtoken_bytes = read_limit(&authtoken_path, 4096).await;
|
||||||
|
@ -87,12 +88,12 @@ impl DataDir {
|
||||||
}
|
}
|
||||||
tokio::fs::write(&authtoken_path, tmp.as_bytes()).await?;
|
tokio::fs::write(&authtoken_path, tmp.as_bytes()).await?;
|
||||||
assert!(crate::utils::fs_restrict_permissions(&authtoken_path));
|
assert!(crate::utils::fs_restrict_permissions(&authtoken_path));
|
||||||
*authtoken = tmp;
|
*self.authtoken.lock() = tmp;
|
||||||
} else {
|
} else {
|
||||||
*authtoken = String::from_utf8_lossy(authtoken_bytes.unwrap().as_slice()).into();
|
*self.authtoken.lock() = String::from_utf8_lossy(authtoken_bytes.unwrap().as_slice()).into();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(authtoken.clone())
|
Ok(authtoken)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get a readable locked reference to this node's configuration.
|
/// Get a readable locked reference to this node's configuration.
|
||||||
|
@ -100,17 +101,16 @@ impl DataDir {
|
||||||
/// Use clone() to get a copy of the configuration if you want to modify it. Then use
|
/// Use clone() to get a copy of the configuration if you want to modify it. Then use
|
||||||
/// save_config() to save the modified configuration and update the internal copy in
|
/// save_config() to save the modified configuration and update the internal copy in
|
||||||
/// this structure.
|
/// this structure.
|
||||||
pub async fn config(&self) -> RwLockReadGuard<'_, localconfig::Config> {
|
pub async fn config(&self) -> Arc<Config> {
|
||||||
self.config.read().await
|
self.config.read().clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Save a modified copy of the configuration and replace the internal copy in this structure (if it's actually changed).
|
/// Save a modified copy of the configuration and replace the internal copy in this structure (if it's actually changed).
|
||||||
pub async fn save_config(&self, modified_config: localconfig::Config) -> std::io::Result<()> {
|
pub async fn save_config(&self, modified_config: Config) -> std::io::Result<()> {
|
||||||
let mut config = self.config.write().await;
|
if !modified_config.eq(&self.config.read()) {
|
||||||
if !config.eq(&modified_config) {
|
|
||||||
let config_data = crate::utils::to_json_pretty(&modified_config);
|
let config_data = crate::utils::to_json_pretty(&modified_config);
|
||||||
tokio::fs::write(self.base_path.join(CONFIG_FILENAME), config_data.as_bytes()).await?;
|
tokio::fs::write(self.base_path.join(CONFIG_FILENAME), config_data.as_bytes()).await?;
|
||||||
*config = modified_config;
|
*self.config.write() = Arc::new(modified_config);
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -145,6 +145,7 @@ async fn async_main(flags: Flags, global_args: Box<ArgMatches>) -> i32 {
|
||||||
let svc = service::Service::new(tokio::runtime::Handle::current(), &flags.base_path, true).await;
|
let svc = service::Service::new(tokio::runtime::Handle::current(), &flags.base_path, true).await;
|
||||||
if svc.is_ok() {
|
if svc.is_ok() {
|
||||||
let _ = tokio::signal::ctrl_c().await;
|
let _ = tokio::signal::ctrl_c().await;
|
||||||
|
println!("Terminate signal received, shutting down...");
|
||||||
exitcode::OK
|
exitcode::OK
|
||||||
} else {
|
} else {
|
||||||
println!("FATAL: error launching service: {}", svc.err().unwrap().to_string());
|
println!("FATAL: error launching service: {}", svc.err().unwrap().to_string());
|
||||||
|
|
|
@ -35,7 +35,7 @@ struct ServiceImpl {
|
||||||
pub rt: tokio::runtime::Handle,
|
pub rt: tokio::runtime::Handle,
|
||||||
pub data: DataDir,
|
pub data: DataDir,
|
||||||
pub local_socket_unique_id_counter: AtomicUsize,
|
pub local_socket_unique_id_counter: AtomicUsize,
|
||||||
pub udp_sockets: parking_lot::RwLock<HashMap<u16, BoundUdpPort>>,
|
pub udp_sockets: tokio::sync::RwLock<HashMap<u16, BoundUdpPort>>,
|
||||||
pub num_listeners_per_socket: usize,
|
pub num_listeners_per_socket: usize,
|
||||||
_core: Option<NetworkHypervisor<Self>>,
|
_core: Option<NetworkHypervisor<Self>>,
|
||||||
}
|
}
|
||||||
|
@ -47,7 +47,15 @@ impl Drop for Service {
|
||||||
self.core_background_service_task.abort();
|
self.core_background_service_task.abort();
|
||||||
|
|
||||||
// Drop all bound sockets since these can hold circular Arc<> references to 'internal'.
|
// Drop all bound sockets since these can hold circular Arc<> references to 'internal'.
|
||||||
self.internal.udp_sockets.write().clear();
|
// This shouldn't have to loop much if at all to acquire the lock, but it might if something
|
||||||
|
// is still completing somewhere in an aborting task.
|
||||||
|
loop {
|
||||||
|
if let Ok(mut udp_sockets) = self.internal.udp_sockets.try_write() {
|
||||||
|
udp_sockets.clear();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
std::thread::sleep(Duration::from_millis(5));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -61,8 +69,8 @@ impl Service {
|
||||||
rt,
|
rt,
|
||||||
data: DataDir::open(base_path).await.map_err(|e| Box::new(e))?,
|
data: DataDir::open(base_path).await.map_err(|e| Box::new(e))?,
|
||||||
local_socket_unique_id_counter: AtomicUsize::new(1),
|
local_socket_unique_id_counter: AtomicUsize::new(1),
|
||||||
udp_sockets: parking_lot::RwLock::new(HashMap::with_capacity(4)),
|
udp_sockets: tokio::sync::RwLock::new(HashMap::with_capacity(4)),
|
||||||
num_listeners_per_socket: std::thread::available_parallelism().unwrap().get(),
|
num_listeners_per_socket: 1, //std::thread::available_parallelism().unwrap().get(),
|
||||||
_core: None,
|
_core: None,
|
||||||
};
|
};
|
||||||
let _ = si._core.insert(NetworkHypervisor::new(&si, true, auto_upgrade_identity).await?);
|
let _ = si._core.insert(NetworkHypervisor::new(&si, true, auto_upgrade_identity).await?);
|
||||||
|
@ -86,17 +94,17 @@ impl ServiceImpl {
|
||||||
|
|
||||||
/// Called in udp_binding_task_main() to service a particular UDP port.
|
/// Called in udp_binding_task_main() to service a particular UDP port.
|
||||||
async fn update_udp_bindings_for_port(self: &Arc<Self>, port: u16, interface_prefix_blacklist: &Vec<String>, cidr_blacklist: &Vec<InetAddress>) -> Option<Vec<(LocalInterface, InetAddress, std::io::Error)>> {
|
async fn update_udp_bindings_for_port(self: &Arc<Self>, port: u16, interface_prefix_blacklist: &Vec<String>, cidr_blacklist: &Vec<InetAddress>) -> Option<Vec<(LocalInterface, InetAddress, std::io::Error)>> {
|
||||||
let new_sockets = {
|
for ns in {
|
||||||
let mut udp_sockets = self.udp_sockets.write();
|
let mut udp_sockets = self.udp_sockets.write().await;
|
||||||
let bp = udp_sockets.entry(port).or_insert_with(|| BoundUdpPort::new(port));
|
let bp = udp_sockets.entry(port).or_insert_with(|| BoundUdpPort::new(port));
|
||||||
let (errors, new_sockets) = bp.update_bindings(interface_prefix_blacklist, cidr_blacklist);
|
let (errors, new_sockets) = bp.update_bindings(interface_prefix_blacklist, cidr_blacklist);
|
||||||
if bp.sockets.is_empty() {
|
if bp.sockets.is_empty() {
|
||||||
return Some(errors);
|
return Some(errors);
|
||||||
}
|
}
|
||||||
new_sockets
|
new_sockets
|
||||||
};
|
}
|
||||||
|
.iter()
|
||||||
for ns in new_sockets.iter() {
|
{
|
||||||
/*
|
/*
|
||||||
* Start a task (not actual thread) for each CPU core.
|
* Start a task (not actual thread) for each CPU core.
|
||||||
*
|
*
|
||||||
|
@ -123,7 +131,6 @@ impl ServiceImpl {
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -185,7 +192,7 @@ impl SystemInterface for ServiceImpl {
|
||||||
// This is the fast path -- the socket is known to the core so just send it.
|
// This is the fast path -- the socket is known to the core so just send it.
|
||||||
if let Some(s) = local_socket {
|
if let Some(s) = local_socket {
|
||||||
if let Some(s) = s.0.upgrade() {
|
if let Some(s) = s.0.upgrade() {
|
||||||
return s.send_sync_nonblock(&self.rt, address, data, packet_ttl);
|
return s.send_sync_nonblock(address, data, packet_ttl);
|
||||||
} else {
|
} else {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -193,7 +200,7 @@ impl SystemInterface for ServiceImpl {
|
||||||
|
|
||||||
// Otherwise we try to send from one socket on every interface or from the specified interface.
|
// Otherwise we try to send from one socket on every interface or from the specified interface.
|
||||||
// This path only happens when the core is trying new endpoints. The fast path is for most packets.
|
// This path only happens when the core is trying new endpoints. The fast path is for most packets.
|
||||||
let sockets = self.udp_sockets.read();
|
let sockets = self.udp_sockets.read().await;
|
||||||
if !sockets.is_empty() {
|
if !sockets.is_empty() {
|
||||||
if let Some(specific_interface) = local_interface {
|
if let Some(specific_interface) = local_interface {
|
||||||
for (_, p) in sockets.iter() {
|
for (_, p) in sockets.iter() {
|
||||||
|
@ -202,7 +209,7 @@ impl SystemInterface for ServiceImpl {
|
||||||
for _ in 0..p.sockets.len() {
|
for _ in 0..p.sockets.len() {
|
||||||
let s = p.sockets.get(i).unwrap();
|
let s = p.sockets.get(i).unwrap();
|
||||||
if s.interface.eq(specific_interface) {
|
if s.interface.eq(specific_interface) {
|
||||||
if s.send_sync_nonblock(&self.rt, address, data, packet_ttl) {
|
if s.send_sync_nonblock(address, data, packet_ttl) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -221,7 +228,7 @@ impl SystemInterface for ServiceImpl {
|
||||||
for _ in 0..p.sockets.len() {
|
for _ in 0..p.sockets.len() {
|
||||||
let s = p.sockets.get(i).unwrap();
|
let s = p.sockets.get(i).unwrap();
|
||||||
if !sent_on_interfaces.contains(&s.interface) {
|
if !sent_on_interfaces.contains(&s.interface) {
|
||||||
if s.send_sync_nonblock(&self.rt, address, data, packet_ttl) {
|
if s.send_sync_nonblock(address, data, packet_ttl) {
|
||||||
sent_on_interfaces.insert(s.interface.clone());
|
sent_on_interfaces.insert(s.interface.clone());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -55,8 +55,8 @@ impl BoundUdpSocket {
|
||||||
unsafe { libc::setsockopt(self.fd.as_(), libc::IPPROTO_IP.as_(), libc::IP_TOS.as_(), (&ttl as *const libc::c_int).cast(), std::mem::size_of::<libc::c_int>().as_()) };
|
unsafe { libc::setsockopt(self.fd.as_(), libc::IPPROTO_IP.as_(), libc::IP_TOS.as_(), (&ttl as *const libc::c_int).cast(), std::mem::size_of::<libc::c_int>().as_()) };
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(any(target_os = "macos"))]
|
#[cfg(any(target_os = "macos", target_os = "freebsd"))]
|
||||||
pub fn send_sync_nonblock(&self, _: &tokio::runtime::Handle, dest: &InetAddress, b: &[&[u8]], packet_ttl: u8) -> bool {
|
pub fn send_sync_nonblock(&self, dest: &InetAddress, b: &[&[u8]], packet_ttl: u8) -> bool {
|
||||||
let mut ok = false;
|
let mut ok = false;
|
||||||
if dest.family() == self.address.family() {
|
if dest.family() == self.address.family() {
|
||||||
if packet_ttl > 0 && dest.is_ipv4() {
|
if packet_ttl > 0 && dest.is_ipv4() {
|
||||||
|
@ -65,7 +65,7 @@ impl BoundUdpSocket {
|
||||||
unsafe {
|
unsafe {
|
||||||
if b.len() == 1 {
|
if b.len() == 1 {
|
||||||
let bb = *b.get_unchecked(0);
|
let bb = *b.get_unchecked(0);
|
||||||
ok = libc::sendto(self.fd.as_(), bb.as_ptr().cast(), bb.len().as_(), 0, transmute(dest as *const InetAddress), size_of::<InetAddress>().as_()) > 0;
|
ok = libc::sendto(self.fd.as_(), bb.as_ptr().cast(), bb.len().as_(), 0, (dest as *const InetAddress).cast(), size_of::<InetAddress>().as_()) > 0;
|
||||||
} else {
|
} else {
|
||||||
let mut iov: [libc::iovec; 16] = MaybeUninit::uninit().assume_init();
|
let mut iov: [libc::iovec; 16] = MaybeUninit::uninit().assume_init();
|
||||||
assert!(b.len() <= iov.len());
|
assert!(b.len() <= iov.len());
|
||||||
|
@ -95,7 +95,7 @@ impl BoundUdpSocket {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(not(any(target_os = "macos", target_os = "freebsd")))]
|
#[cfg(not(any(target_os = "macos", target_os = "freebsd")))]
|
||||||
pub fn send_sync_nonblock(&self, _: &tokio::runtime::Handle, dest: &InetAddress, b: &[&[u8]], packet_ttl: u8) -> bool {
|
pub fn send_sync_nonblock(&self, dest: &InetAddress, b: &[&[u8]], packet_ttl: u8) -> bool {
|
||||||
let mut ok = false;
|
let mut ok = false;
|
||||||
if dest.family() == self.address.family() {
|
if dest.family() == self.address.family() {
|
||||||
let mut tmp: [u8; 16384] = unsafe { std::mem::MaybeUninit::uninit().assume_init() };
|
let mut tmp: [u8; 16384] = unsafe { std::mem::MaybeUninit::uninit().assume_init() };
|
||||||
|
@ -117,42 +117,10 @@ impl BoundUdpSocket {
|
||||||
|
|
||||||
if packet_ttl > 0 && dest.is_ipv4() {
|
if packet_ttl > 0 && dest.is_ipv4() {
|
||||||
self.set_ttl(packet_ttl);
|
self.set_ttl(packet_ttl);
|
||||||
}
|
ok = self.socket.try_send_to(data, dest.try_into().unwrap()).is_ok();
|
||||||
ok = self.socket.try_send_to(data, dest.try_into().unwrap()).is_ok();
|
|
||||||
if packet_ttl > 0 && dest.is_ipv4() {
|
|
||||||
self.set_ttl(0xff);
|
self.set_ttl(0xff);
|
||||||
}
|
|
||||||
}
|
|
||||||
ok
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn send_async(&self, _: &tokio::runtime::Handle, dest: &InetAddress, b: &[&[u8]], packet_ttl: u8) -> bool {
|
|
||||||
let mut ok = false;
|
|
||||||
if dest.family() == self.address.family() {
|
|
||||||
let mut tmp: [u8; 16384] = unsafe { std::mem::MaybeUninit::uninit().assume_init() };
|
|
||||||
let data = if b.len() == 1 {
|
|
||||||
*unsafe { b.get_unchecked(0) }
|
|
||||||
} else {
|
} else {
|
||||||
let mut p = 0;
|
ok = self.socket.try_send_to(data, dest.try_into().unwrap()).is_ok();
|
||||||
for bb in b.iter() {
|
|
||||||
let pp = p + bb.len();
|
|
||||||
if pp < 16384 {
|
|
||||||
tmp[p..pp].copy_from_slice(*bb);
|
|
||||||
p = pp;
|
|
||||||
} else {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
&tmp[..p]
|
|
||||||
};
|
|
||||||
|
|
||||||
if packet_ttl > 0 && dest.is_ipv4() {
|
|
||||||
self.set_ttl(packet_ttl);
|
|
||||||
}
|
|
||||||
let sa: SocketAddr = dest.try_into().unwrap();
|
|
||||||
ok = self.socket.send_to(data, sa).await.is_ok();
|
|
||||||
if packet_ttl > 0 && dest.is_ipv4() {
|
|
||||||
self.set_ttl(0xff);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ok
|
ok
|
||||||
|
@ -242,9 +210,11 @@ unsafe fn bind_udp_to_device(device_name: &str, address: &InetAddress) -> Result
|
||||||
|
|
||||||
let s = libc::socket(af.as_(), libc::SOCK_DGRAM, 0);
|
let s = libc::socket(af.as_(), libc::SOCK_DGRAM, 0);
|
||||||
if s <= 0 {
|
if s <= 0 {
|
||||||
return Err("unable to create socket");
|
return Err("unable to create new UDP socket");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
assert_ne!(libc::fcntl(s, libc::F_SETFL, libc::O_NONBLOCK), -1);
|
||||||
|
|
||||||
#[allow(unused_variables)]
|
#[allow(unused_variables)]
|
||||||
let mut setsockopt_results: libc::c_int = 0;
|
let mut setsockopt_results: libc::c_int = 0;
|
||||||
let mut fl: libc::c_int;
|
let mut fl: libc::c_int;
|
||||||
|
|
Loading…
Add table
Reference in a new issue