Simplification of some code in controller.

This commit is contained in:
Adam Ierymenko 2022-10-23 20:47:05 -04:00
parent bc5861c539
commit 88e613e043
No known key found for this signature in database
GPG key ID: C8877CF2D7A5D7F3

View file

@ -26,11 +26,8 @@ const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
/// ZeroTier VL2 network controller packet handler, answers VL2 netconf queries.
pub struct Handler {
inner: Arc<Inner>,
}
struct Inner {
service: RwLock<Weak<VL1Service<dyn Database, Handler, Handler>>>,
self_ref: Weak<Self>,
service: RwLock<Weak<VL1Service<dyn Database, Self, Self>>>,
reaper: Reaper,
daemons: Mutex<Vec<tokio::task::JoinHandle<()>>>, // drop() aborts these
runtime: tokio::runtime::Handle,
@ -43,19 +40,15 @@ impl Handler {
pub async fn new(database: Arc<dyn Database>, runtime: tokio::runtime::Handle) -> Result<Arc<Self>, Box<dyn Error>> {
if let Some(local_identity) = database.load_node_identity() {
assert!(local_identity.secret.is_some());
let inner = Arc::new(Inner {
Ok(Arc::new_cyclic(|r| Self {
self_ref: r.clone(),
service: RwLock::new(Weak::default()),
reaper: Reaper::new(&runtime),
daemons: Mutex::new(Vec::with_capacity(1)),
runtime,
database: database.clone(),
local_identity,
});
let h = Arc::new(Self { inner: inner.clone() });
Ok(h)
}))
} else {
Err(Box::new(InvalidParameterError(
"local controller's identity not readable by database",
@ -69,8 +62,8 @@ impl Handler {
/// won't actually do anything. The reference the handler holds is weak to prevent
/// a circular reference, so if the VL1Service is dropped this must be called again to
/// tell the controller handler about a new instance.
pub fn set_service(&self, service: &Arc<VL1Service<dyn Database, Handler, Handler>>) {
*self.inner.service.write().unwrap() = Arc::downgrade(service);
pub fn set_service(&self, service: &Arc<VL1Service<dyn Database, Self, Self>>) {
*self.service.write().unwrap() = Arc::downgrade(service);
}
/// Start a change watcher to respond to changes detected by the database.
@ -79,20 +72,20 @@ impl Handler {
/// unnecessary async tasks. If the database being used does not support changes, this
/// does nothing.
pub async fn start_change_watcher(&self) {
if let Some(cw) = self.inner.database.changes().await.map(|mut ch| {
let inner = self.inner.clone();
self.inner.runtime.spawn(async move {
if let Some(cw) = self.database.changes().await.map(|mut ch| {
let self2 = self.self_ref.upgrade().unwrap();
self.runtime.spawn(async move {
loop {
if let Ok(change) = ch.recv().await {
inner.reaper.add(
inner.runtime.spawn(inner.clone().handle_change_notification(change)),
self2.reaper.add(
self2.runtime.spawn(self2.clone().handle_change_notification(change)),
Instant::now().checked_add(REQUEST_TIMEOUT).unwrap(),
);
}
}
})
}) {
self.inner.daemons.lock().unwrap().push(cw);
self.daemons.lock().unwrap().push(cw);
}
}
}
@ -103,8 +96,8 @@ impl PathFilter for Handler {}
impl InnerProtocol for Handler {
fn handle_packet<HostSystemImpl: HostSystem + ?Sized>(
&self,
_host_system: &HostSystemImpl,
_node: &Node,
_: &HostSystemImpl,
_: &Node,
source: &Arc<Peer>,
source_path: &Arc<Path>,
source_hops: u8,
@ -157,19 +150,17 @@ impl InnerProtocol for Handler {
*/
// Launch handler as an async background task.
let (inner, peer, source_remote_endpoint) = (self.inner.clone(), source.clone(), source_path.endpoint.clone());
self.inner.reaper.add(
self.inner.runtime.spawn(async move {
let (self2, peer, source_remote_endpoint) =
(self.self_ref.upgrade().unwrap(), source.clone(), source_path.endpoint.clone());
self.reaper.add(
self.runtime.spawn(async move {
let node_id = peer.identity.address;
let node_fingerprint = Blob::from(peer.identity.fingerprint);
let now = ms_since_epoch();
let result = match inner
.handle_network_config_request::<HostSystemImpl>(&peer.identity, network_id, now)
.await
{
let result = match self2.handle_network_config_request(&peer.identity, network_id, now).await {
Result::Ok((result, Some(config))) => {
inner.send_network_config(peer.as_ref(), &config, Some(message_id));
self2.send_network_config(peer.as_ref(), &config, Some(message_id));
result
}
Result::Ok((result, None)) => result,
@ -179,13 +170,13 @@ impl InnerProtocol for Handler {
}
};
let _ = inner
let _ = self2
.database
.log_request(RequestLogItem {
network_id,
node_id,
node_fingerprint,
controller_node_id: inner.local_identity.address,
controller_node_id: self2.local_identity.address,
metadata: if meta_data.is_empty() {
Vec::new()
} else {
@ -245,7 +236,7 @@ impl InnerProtocol for Handler {
}
}
impl Inner {
impl Handler {
fn send_network_config(
&self,
peer: &Peer,
@ -301,7 +292,7 @@ impl Inner {
async fn handle_change_notification(self: Arc<Self>, _change: Change) {}
async fn handle_network_config_request<HostSystemImpl: HostSystem + ?Sized>(
async fn handle_network_config_request(
self: &Arc<Self>,
source_identity: &Identity,
network_id: NetworkId,
@ -424,7 +415,7 @@ impl Inner {
}
}
impl Drop for Inner {
impl Drop for Handler {
fn drop(&mut self) {
for h in self.daemons.lock().unwrap().drain(..) {
h.abort();