Rust builds again.

This commit is contained in:
Adam Ierymenko 2021-01-22 13:25:20 -05:00
parent 1e555ef24d
commit 23ee8c3b01
No known key found for this signature in database
GPG key ID: C8877CF2D7A5D7F3
4 changed files with 73 additions and 50 deletions

View file

@ -18,6 +18,7 @@ use std::io::{Write, Seek, SeekFrom};
use std::cell::Cell; use std::cell::Cell;
use zerotier_core::PortableAtomicI64; use zerotier_core::PortableAtomicI64;
use chrono::Datelike; use chrono::Datelike;
use std::fmt::Display;
pub struct Log { pub struct Log {
prefix: String, prefix: String,
@ -40,15 +41,15 @@ impl Log {
path: String::from(path), path: String::from(path),
file: Mutex::new(Cell::new(None)), file: Mutex::new(Cell::new(None)),
cur_size: PortableAtomicI64::new(0), cur_size: PortableAtomicI64::new(0),
max_size: AtomicUsize::new(if max_size < MIN_MAX_SIZE { MIN_MAX_SIZE } else { max_size }), max_size: AtomicUsize::new(if max_size < Log::MIN_MAX_SIZE { Log::MIN_MAX_SIZE } else { max_size }),
} }
} }
pub fn set_max_size(&self, new_max_size: usize) { pub fn set_max_size(&self, new_max_size: usize) {
self.max_size.store(if new_max_size < MIN_MAX_SIZE { MIN_MAX_SIZE } else { new_max_size },Ordering::Relaxed); self.max_size.store(if new_max_size < Log::MIN_MAX_SIZE { Log::MIN_MAX_SIZE } else { new_max_size },Ordering::Relaxed);
} }
pub fn log<S>(&self, s: &S) { pub fn log<S: Display>(&self, s: &S) {
let mut fc = self.file.lock().unwrap(); let mut fc = self.file.lock().unwrap();
let max_size = self.max_size.load(Ordering::Relaxed); let max_size = self.max_size.load(Ordering::Relaxed);
@ -74,15 +75,16 @@ impl Log {
if eof.is_err() { if eof.is_err() {
return; return;
} }
cur_size = eof.unwrap() as i64; self.cur_size.set(eof.unwrap() as i64);
fc.replace(Some(f)); fc.replace(Some(f));
} }
let mut f = fc.get_mut().as_mut().unwrap(); let mut f = fc.get_mut().as_mut().unwrap();
let now_str = chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string(); let now_str = chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string();
writeln!("{}[{}] {}", self.prefix.as_str(), now_str.as_str(), s); let log_line = format!("{}[{}] {}\n", self.prefix.as_str(), now_str.as_str(), s);
let _ = f.write_all(log_line.as_bytes());
let _ = f.flush(); let _ = f.flush();
self.cur_size.fetch_add((now_str_b.len() + sb.len() + 1) as i64); self.cur_size.fetch_add(log_line.len() as i64);
} }
} }

View file

@ -27,6 +27,8 @@ use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration; use std::time::Duration;
use futures::stream::{self, StreamExt};
use warp::Filter; use warp::Filter;
use zerotier_core::*; use zerotier_core::*;
@ -36,6 +38,7 @@ use crate::localconfig::*;
use crate::log::Log; use crate::log::Log;
use crate::physicallink::PhysicalLink; use crate::physicallink::PhysicalLink;
use crate::network::Network; use crate::network::Network;
use futures::TryFutureExt;
pub struct ServiceEventHandler {} pub struct ServiceEventHandler {}
@ -85,34 +88,43 @@ fn main() {
let mut udp_sockets: BTreeMap<InetAddress, FastUDPSocket<ServiceEventHandler>> = BTreeMap::new(); let mut udp_sockets: BTreeMap<InetAddress, FastUDPSocket<ServiceEventHandler>> = BTreeMap::new();
let handler: Arc<ServiceEventHandler> = Arc::new(ServiceEventHandler{}); let handler: Arc<ServiceEventHandler> = Arc::new(ServiceEventHandler{});
let run: AtomicBool = AtomicBool::new(true); let run: AtomicBool = AtomicBool::new(true);
let (mut interrupt_tx, mut interrupt_rx) = futures::channel::mpsc::channel::<u8>(2);
//
// The inner loop periodically updates UDP socket bindings and does other housekeeping, but
// otherwise does nothing. If it detects that the primary port has changed, it breaks and
// causes the outer loop to run which reboots the HTTP server. If the 'run' flag is set
// to false this causes a break of both loops which terminates the service.
//
loop { loop {
let mut warp_server_port = local_config.settings.primary_port; let mut warp_server_port = local_config.settings.primary_port;
let root = warp::path::end().map(|| { warp::reply::with_status("not found", warp::hyper::StatusCode::NOT_FOUND) });
let status = warp::path("status").map(|| { "status" });
let network = warp::path!("network" / String).map(|nwid_str| { "network" });
let peer = warp::path!("peer" / String).map(|peer_str| { "peer" });
let (shutdown_tx, shutdown_rx) = futures::channel::oneshot::channel();
let (_, warp_server) = warp::serve(warp::any().and(
root
.or(status)
.or(network)
.or(peer)
)).bind_with_graceful_shutdown((inaddr_v6_any, warp_server_port), async {
let _ = shutdown_rx.await;
});
let warp_server = tokio::spawn(warp_server);
let mut loop_delay = 10;
loop { loop {
let root = warp::path::end().map(|| { warp::reply::with_status("not found", warp::hyper::StatusCode::NOT_FOUND) }); let _ = tokio::time::timeout(Duration::from_secs(loop_delay), interrupt_rx.next());
let status = warp::path("status").map(|| { "status" });
let network = warp::path!("network" / String).map(|nwid_str| { "network" });
let peer = warp::path!("peer" / String).map(|peer_str| { "peer" });
let (shutdown_tx, shutdown_rx) = futures::channel::oneshot::channel(); // Enumerate physical addresses on the system, creating a map with an entry for
let (_, warp_server) = warp::serve(warp::any().and( // the primary_port and another for the secondary_port if bound.
root
.or(status)
.or(network)
.or(peer)
)).bind_with_graceful_shutdown((inaddr_v6_any, warp_server_port), async {
let _ = shutdown_rx.await;
});
let warp_server = tokio::spawn(warp_server);
tokio::time::sleep(Duration::from_secs(10)).await;
// Diff system addresses against currently bound UDP sockets and update as needed.
// Also check interface prefix blacklists.
let mut system_addrs: BTreeMap<InetAddress, Rc<PhysicalLink>> = BTreeMap::new(); let mut system_addrs: BTreeMap<InetAddress, Rc<PhysicalLink>> = BTreeMap::new();
PhysicalLink::map(|link: PhysicalLink| { PhysicalLink::map(|link: PhysicalLink| {
if !local_config.settings.is_interface_blacklisted(link.device.as_str()) { if !local_config.settings.is_interface_blacklisted(link.device.as_str()) {
// Add two entries to system_addrs: one for primary port, and one for secondary if enabled.
let l = Rc::new(link); let l = Rc::new(link);
let mut a = l.address.clone(); let mut a = l.address.clone();
a.set_port(local_config.settings.primary_port); a.set_port(local_config.settings.primary_port);
@ -124,6 +136,7 @@ fn main() {
} }
} }
}); });
let mut udp_sockets_to_close: Vec<InetAddress> = Vec::new(); let mut udp_sockets_to_close: Vec<InetAddress> = Vec::new();
for sock in udp_sockets.iter() { for sock in udp_sockets.iter() {
if !system_addrs.contains_key(sock.0) { if !system_addrs.contains_key(sock.0) {
@ -133,6 +146,7 @@ fn main() {
for k in udp_sockets_to_close.iter() { for k in udp_sockets_to_close.iter() {
udp_sockets.remove(k); udp_sockets.remove(k);
} }
for addr in system_addrs.iter() { for addr in system_addrs.iter() {
if !udp_sockets.contains_key(addr.0) { if !udp_sockets.contains_key(addr.0) {
let s = FastUDPSocket::new(addr.1.device.as_str(), addr.0, &handler); let s = FastUDPSocket::new(addr.1.device.as_str(), addr.0, &handler);
@ -142,13 +156,13 @@ fn main() {
} }
} }
// Breaking the inner loop causes the HTTP server to recycle, or may exit entirely if run is false.
if local_config.settings.primary_port != warp_server_port || !run.load(Ordering::Relaxed) { if local_config.settings.primary_port != warp_server_port || !run.load(Ordering::Relaxed) {
let _ = shutdown_tx.send(()); let _ = shutdown_tx.send(());
let _ = warp_server.await; let _ = warp_server.await;
break; break;
} }
} }
tokio::time::sleep(Duration::from_millis(250)).await; tokio::time::sleep(Duration::from_millis(250)).await;
if !run.load(Ordering::Relaxed) { if !run.load(Ordering::Relaxed) {
break; break;

View file

@ -16,7 +16,6 @@ use std::ffi::CStr;
use std::ptr::{null_mut, copy_nonoverlapping}; use std::ptr::{null_mut, copy_nonoverlapping};
use std::mem::size_of; use std::mem::size_of;
#[derive(Clone)]
pub struct PhysicalLink { pub struct PhysicalLink {
pub address: InetAddress, pub address: InetAddress,
pub device: String pub device: String

View file

@ -34,7 +34,7 @@ impl Store {
return Err(std::io::Error::new(std::io::ErrorKind::PermissionDenied, "base path does not exist or is not writable")); return Err(std::io::Error::new(std::io::ErrorKind::PermissionDenied, "base path does not exist or is not writable"));
} }
Ok(Store{ Ok(Store{
base_path: bp.into_path_buf().into_boxed_path(), base_path: bp.to_path_buf().into_boxed_path(),
peers_path: bp.join("peers.d").into_boxed_path(), peers_path: bp.join("peers.d").into_boxed_path(),
controller_path: bp.join("controller.d").into_boxed_path(), controller_path: bp.join("controller.d").into_boxed_path(),
networks_path: bp.join("networks.d").into_boxed_path(), networks_path: bp.join("networks.d").into_boxed_path(),
@ -42,13 +42,13 @@ impl Store {
}) })
} }
pub fn make_obj_path(&self, obj_type: StateObjectType, obj_id: &[u64]) -> Result<PathBuf, std::io::Error> { fn make_obj_path(&self, obj_type: StateObjectType, obj_id: &[u64]) -> Option<PathBuf> {
Ok(match obj_type { Some(match obj_type {
StateObjectType::IdentityPublic => self.base_path.join("identity.public"), StateObjectType::IdentityPublic => self.base_path.join("identity.public"),
StateObjectType::IdentitySecret => self.base_path.join("identity.secret"), StateObjectType::IdentitySecret => self.base_path.join("identity.secret"),
StateObjectType::Certificate => { StateObjectType::Certificate => {
if obj_id.len() < 6 { if obj_id.len() < 6 {
return Err(std::io::Error(std::io::ErrorKind::NotFound)); return None;
} }
self.certs_path.join(format!("{:0>16x}{:0>16x}{:0>16x}{:0>16x}{:0>16x}{:0>16x}.cert",obj_id[0],obj_id[1],obj_id[2],obj_id[3],obj_id[4],obj_id[5])) self.certs_path.join(format!("{:0>16x}{:0>16x}{:0>16x}{:0>16x}{:0>16x}{:0>16x}.cert",obj_id[0],obj_id[1],obj_id[2],obj_id[3],obj_id[4],obj_id[5]))
}, },
@ -56,44 +56,52 @@ impl Store {
StateObjectType::Locator => self.base_path.join("locator"), StateObjectType::Locator => self.base_path.join("locator"),
StateObjectType::NetworkConfig => { StateObjectType::NetworkConfig => {
if obj_id.len() < 1 { if obj_id.len() < 1 {
return Err(std::io::Error(std::io::ErrorKind::NotFound)); return None;
} }
self.networks_path.join(format!("{:0>16x}.conf", obj_id[0])) self.networks_path.join(format!("{:0>16x}.conf", obj_id[0]))
}, },
StateObjectType::Peer => { StateObjectType::Peer => {
if obj_id.len() < 1 { if obj_id.len() < 1 {
return Err(std::io::Error(std::io::ErrorKind::NotFound)); return None;
} }
self.peers_path.join(format!("{:0>10x}.peer", obj_id[0])) self.peers_path.join(format!("{:0>10x}.peer", obj_id[0]))
} }
}) })
} }
pub fn load(&self, obj_type: StateObjectType, obj_id: &[u64]) -> Result<Box<[u8]>, std::io::Error> { pub fn load(&self, obj_type: StateObjectType, obj_id: &[u64]) -> std::io::Result<Box<[u8]>> {
let obj_path = self.make_obj_path(obj_type, obj_id)?; let obj_path = self.make_obj_path(obj_type, obj_id);
let fmd = obj_path.metadata()?; if obj_path.is_some() {
if fmd.is_file() { let obj_path = obj_path.unwrap();
let flen = fmd.len(); let fmd = obj_path.metadata()?;
if flen <= Store::MAX_OBJECT_SIZE as u64 { if fmd.is_file() {
let mut f = std::fs::File::open(obj_path)?; let flen = fmd.len();
let mut buf: Vec<u8> = Vec::new(); if flen <= Store::MAX_OBJECT_SIZE as u64 {
buf.reserve(flen as usize); let mut f = std::fs::File::open(obj_path)?;
let rs = f.read_to_end(&mut buf)?; let mut buf: Vec<u8> = Vec::new();
buf.resize(rs as usize, 0); buf.reserve(flen as usize);
return Ok(buf.into_boxed_slice()); let rs = f.read_to_end(&mut buf)?;
buf.resize(rs as usize, 0);
return Ok(buf.into_boxed_slice());
}
} }
} }
Err(std::io::Error(std::io::ErrorKind::NotFound)) Err(std::io::Error::new(std::io::ErrorKind::NotFound, "does not exist or is not readable"))
} }
pub fn erase(&self, obj_type: StateObjectType, obj_id: &[u64]) { pub fn erase(&self, obj_type: StateObjectType, obj_id: &[u64]) {
let obj_path = self.make_obj_path(obj_type, obj_id); let obj_path = self.make_obj_path(obj_type, obj_id);
if obj_path.is_ok() { if obj_path.is_some() {
let _ = std::fs::remove_file(obj_path.unwrap()); let _ = std::fs::remove_file(obj_path.unwrap());
} }
} }
pub fn store(&self, obj_type: StateObjectType, obj_id: &[u64], obj_data: &[u8]) -> Result<(), std::io::Error> { pub fn store(&self, obj_type: StateObjectType, obj_id: &[u64], obj_data: &[u8]) -> std::io::Result<()> {
std::fs::OpenOptions::new().write(true).truncate(true).create(true).open(self.make_obj_path(obj_type, obj_id)?)?.write_all(obj_data) let obj_path = self.make_obj_path(obj_type, obj_id);
if obj_path.is_some() {
std::fs::OpenOptions::new().write(true).truncate(true).create(true).open(obj_path.unwrap())?.write_all(obj_data)
} else {
Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, "object ID not valid"))
}
} }
} }