This commit is contained in:
Adam Ierymenko 2021-01-26 11:45:42 -05:00
parent 23ee8c3b01
commit b7d7e8eb40
No known key found for this signature in database
GPG key ID: C8877CF2D7A5D7F3
6 changed files with 95 additions and 43 deletions

View file

@ -48,13 +48,14 @@ public:
*/
static ZT_INLINE bool accelerated()
{
#ifdef ZT_AES_NO_ACCEL
return false;
#else
#ifdef ZT_AES_AESNI
return Utils::CPUID.aes;
#else
#endif
#ifdef ZT_AES_NEON
return Utils::ARMCAP.aes;
#else
return false;
#endif
#endif
}

View file

@ -15,12 +15,6 @@
namespace ZeroTier {
Dictionary::Dictionary()
{}
Dictionary::~Dictionary()
{}
Vector< uint8_t > &Dictionary::operator[](const char *const k)
{ return m_entries[k]; }
@ -91,9 +85,7 @@ char *Dictionary::getS(const char *k, char *v, const unsigned int cap) const
}
void Dictionary::clear()
{
m_entries.clear();
}
{ m_entries.clear(); }
void Dictionary::encode(Vector< uint8_t > &out) const
{

View file

@ -42,9 +42,11 @@ class Dictionary
public:
typedef SortedMap< String, Vector< uint8_t > >::const_iterator const_iterator;
Dictionary();
ZT_INLINE Dictionary()
{}
~Dictionary();
ZT_INLINE ~Dictionary()
{}
/*
ZT_INLINE void dump() const

View file

@ -17,7 +17,8 @@
namespace {
// The highest 1024 primes less than 2 ^ 52 (only the least significant 32 bits are needed) of the form 6k + 5
// Largest 1024 primes of form 6k + 5 and less than 2^52. Only the least significant 32
// bits need to be here, as the most significant bits are all 1.
const uint32_t ZT_MIMC52_PRIMES[1024] = {4294895267, 4294895477, 4294895513, 4294895519, 4294895543, 4294895567, 4294895657, 4294895711, 4294895777, 4294895861, 4294895909, 4294895921, 4294895969, 4294896011, 4294896149, 4294896227, 4294896401, 4294896473, 4294896527, 4294896563, 4294896653, 4294896731, 4294896863, 4294896899, 4294896983, 4294897037, 4294897103, 4294897331, 4294897349, 4294897451, 4294897571, 4294897661, 4294897703, 4294897757, 4294897793, 4294897811, 4294897817, 4294897829, 4294897877, 4294897919, 4294897991, 4294898027, 4294898129, 4294898153, 4294898231, 4294898273,
4294898279, 4294898291, 4294898363, 4294898369, 4294898417, 4294898423, 4294898453, 4294898489, 4294898573, 4294898579, 4294898639, 4294898693, 4294898747, 4294898759, 4294898867, 4294898879, 4294898909, 4294898921, 4294898933, 4294899011, 4294899041, 4294899047, 4294899203, 4294899221, 4294899227, 4294899287, 4294899341, 4294899431, 4294899509, 4294899533, 4294899539, 4294899551, 4294899629, 4294899791, 4294899809, 4294899971, 4294900001, 4294900007, 4294900013, 4294900307, 4294900331, 4294900427, 4294900469, 4294900481, 4294900541, 4294900583,
4294900781, 4294900853, 4294900931, 4294900991, 4294901033, 4294901087, 4294901159, 4294901267, 4294901393, 4294901411, 4294901489, 4294901657, 4294902011, 4294902071, 4294902101, 4294902107, 4294902353, 4294902377, 4294902599, 4294902647, 4294902743, 4294902869, 4294902977, 4294903067, 4294903103, 4294903259, 4294903289, 4294903397, 4294903421, 4294903493, 4294903577, 4294903631, 4294903637, 4294903733, 4294903799, 4294903823, 4294904003, 4294904033, 4294904081, 4294904129, 4294904279, 4294904297, 4294904303, 4294904333, 4294904351, 4294904381,
@ -62,8 +63,13 @@ static uint64_t mulmod64(uint64_t a, uint64_t b, const uint64_t m)
#else
// 52-bit mulmod using FPU hack, which is very fast on most systems with IEEE 64-bit floats.
#define mulmod52(a, b, m, mf) ( ( ( a * b ) - ( ((uint64_t)(((double)a * (double)b) / mf) - 1) * m ) ) % m )
ZT_INLINE uint64_t mulmod52(uint64_t a, const uint64_t b, const uint64_t m, const double mf)
{
a = ( ( a * b ) - ( ((uint64_t)(((double)a * (double)b) / mf) - 1) * m ) );
//a -= m * (uint64_t)(a > m); // faster on some systems, but slower on newer cores
a %= m;
return a;
}
#endif

View file

@ -49,7 +49,7 @@ impl Log {
self.max_size.store(if new_max_size < Log::MIN_MAX_SIZE { Log::MIN_MAX_SIZE } else { new_max_size },Ordering::Relaxed);
}
pub fn log<S: Display>(&self, s: &S) {
pub fn log<S: Into<String>>(&self, s: S) {
let mut fc = self.file.lock().unwrap();
let max_size = self.max_size.load(Ordering::Relaxed);
@ -81,7 +81,7 @@ impl Log {
let mut f = fc.get_mut().as_mut().unwrap();
let now_str = chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string();
let log_line = format!("{}[{}] {}\n", self.prefix.as_str(), now_str.as_str(), s);
let log_line = format!("{}[{}] {}\n", self.prefix.as_str(), now_str.as_str(), s.into());
let _ = f.write_all(log_line.as_bytes());
let _ = f.flush();
self.cur_size.fetch_add(log_line.len() as i64);

View file

@ -30,6 +30,8 @@ use std::time::Duration;
use futures::stream::{self, StreamExt};
use warp::Filter;
use warp::hyper::{HeaderMap, Method};
use warp::hyper::body::Bytes;
use zerotier_core::*;
@ -38,7 +40,6 @@ use crate::localconfig::*;
use crate::log::Log;
use crate::physicallink::PhysicalLink;
use crate::network::Network;
use futures::TryFutureExt;
pub struct ServiceEventHandler {}
@ -81,44 +82,82 @@ impl NodeEventHandler<Network> for ServiceEventHandler {
}
fn main() {
tokio::runtime::Builder::new_multi_thread().thread_stack_size(zerotier_core::RECOMMENDED_THREAD_STACK_SIZE).build().unwrap().block_on(async {
let inaddr_v6_any = IpAddr::from_str("::0").unwrap();
let inaddr_v6_any = IpAddr::from_str("::0").unwrap();
let mut process_exit_value: i32 = 0;
let mut local_config: Box<LocalConfig> = Box::new(LocalConfig::default());
// Current active local configuration for this node.
let mut local_config: Box<LocalConfig> = Box::new(LocalConfig::default());
// Handler for incoming packets from FastUDPSocket and incoming events from Node.
let handler: Arc<ServiceEventHandler> = Arc::new(ServiceEventHandler{});
// From this point on we are in Tokio async land...
let tokio_rt = tokio::runtime::Builder::new_multi_thread().thread_stack_size(zerotier_core::RECOMMENDED_THREAD_STACK_SIZE).build().unwrap();
tokio_rt.block_on(async {
// Keeps track of FastUDPSocket instances by bound address.
let mut udp_sockets: BTreeMap<InetAddress, FastUDPSocket<ServiceEventHandler>> = BTreeMap::new();
let handler: Arc<ServiceEventHandler> = Arc::new(ServiceEventHandler{});
let run: AtomicBool = AtomicBool::new(true);
// Send something to interrupt_tx to interrupt the inner loop and force it to
// detect a change or exit if run has been set to false.
let (mut interrupt_tx, mut interrupt_rx) = futures::channel::mpsc::channel::<u8>(2);
//
// The inner loop periodically updates UDP socket bindings and does other housekeeping, but
// otherwise does nothing. If it detects that the primary port has changed, it breaks and
// causes the outer loop to run which reboots the HTTP server. If the 'run' flag is set
// to false this causes a break of both loops which terminates the service.
//
// Setting this to false terminates the service. It's atomic since this is multithreaded.
let run = AtomicBool::new(true);
loop {
let mut warp_server_port = local_config.settings.primary_port;
let root = warp::path::end().map(|| { warp::reply::with_status("not found", warp::hyper::StatusCode::NOT_FOUND) });
let status = warp::path("status").map(|| { "status" });
let network = warp::path!("network" / String).map(|nwid_str| { "network" });
let peer = warp::path!("peer" / String).map(|peer_str| { "peer" });
let root = warp::path::end().map(|| {
warp::reply::with_status("404", warp::hyper::StatusCode::NOT_FOUND)
});
let (shutdown_tx, shutdown_rx) = futures::channel::oneshot::channel();
let (_, warp_server) = warp::serve(warp::any().and(
let status = warp::path("status")
.and(warp::method())
.and(warp::header::headers_cloned())
.and(warp::body::bytes())
.map(|method: Method, headers: HeaderMap, post_data: Bytes| {
"status"
});
let network = warp::path!("network" / String)
.and(warp::method())
.and(warp::header::headers_cloned())
.and(warp::body::bytes())
.map(|nwid_str: String, method: Method, headers: HeaderMap, post_data: Bytes| {
"network"
});
let peer = warp::path!("peer" / String)
.and(warp::method())
.and(warp::header::headers_cloned())
.and(warp::body::bytes())
.map(|peer_str: String, method: Method, headers: HeaderMap, post_data: Bytes| {
"peer"
});
let (mut shutdown_tx, mut shutdown_rx) = futures::channel::oneshot::channel();
let warp_server = warp::serve(warp::any().and(
root
.or(status)
.or(network)
.or(peer)
)).bind_with_graceful_shutdown((inaddr_v6_any, warp_server_port), async {
let _ = shutdown_rx.await;
});
let warp_server = tokio::spawn(warp_server);
)).try_bind_with_graceful_shutdown((inaddr_v6_any, warp_server_port), async { shutdown_rx.await; });
if warp_server.is_err() {
// TODO: log unable to bind to primary port
run.store(false, Ordering::Relaxed);
}
let warp_server = tokio_rt.spawn(warp_server.unwrap().1);
let mut loop_delay = 10;
loop {
let _ = tokio::time::timeout(Duration::from_secs(loop_delay), interrupt_rx.next());
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(loop_delay)) => {},
_ = interrupt_rx.next() => {},
_ = tokio::signal::ctrl_c() => {
// TODO: log CTRL+C received
run.store(false, Ordering::Relaxed);
let _ = shutdown_tx.send(());
break;
}
}
// Enumerate physical addresses on the system, creating a map with an entry for
// the primary_port and another for the secondary_port if bound.
@ -137,6 +176,7 @@ fn main() {
}
});
// Close UDP bindings that no longer apply.
let mut udp_sockets_to_close: Vec<InetAddress> = Vec::new();
for sock in udp_sockets.iter() {
if !system_addrs.contains_key(sock.0) {
@ -147,26 +187,37 @@ fn main() {
udp_sockets.remove(k);
}
// Bind addresses that are not already bound.
for addr in system_addrs.iter() {
if !udp_sockets.contains_key(addr.0) {
let s = FastUDPSocket::new(addr.1.device.as_str(), addr.0, &handler);
if s.is_ok() {
udp_sockets.insert(addr.0.clone(), s.unwrap());
} else if addr.0.port() == local_config.settings.primary_port {
run.store(false, Ordering::Relaxed);
// TODO: log failure to bind to primary port (UDP)
break;
}
}
}
if local_config.settings.primary_port != warp_server_port || !run.load(Ordering::Relaxed) {
let _ = shutdown_tx.send(());
let _ = warp_server.await;
break;
}
}
let _ = warp_server.await;
if !run.load(Ordering::Relaxed) {
break;
}
tokio::time::sleep(Duration::from_millis(250)).await;
if !run.load(Ordering::Relaxed) {
break;
}
}
});
std::process::exit(process_exit_value);
}