diff --git a/controller/src/handler.rs b/controller/src/handler.rs index 3f9c18a25..fc625b5e7 100644 --- a/controller/src/handler.rs +++ b/controller/src/handler.rs @@ -26,11 +26,8 @@ const REQUEST_TIMEOUT: Duration = Duration::from_secs(10); /// ZeroTier VL2 network controller packet handler, answers VL2 netconf queries. pub struct Handler { - inner: Arc, -} - -struct Inner { - service: RwLock>>, + self_ref: Weak, + service: RwLock>>, reaper: Reaper, daemons: Mutex>>, // drop() aborts these runtime: tokio::runtime::Handle, @@ -43,19 +40,15 @@ impl Handler { pub async fn new(database: Arc, runtime: tokio::runtime::Handle) -> Result, Box> { if let Some(local_identity) = database.load_node_identity() { assert!(local_identity.secret.is_some()); - - let inner = Arc::new(Inner { + Ok(Arc::new_cyclic(|r| Self { + self_ref: r.clone(), service: RwLock::new(Weak::default()), reaper: Reaper::new(&runtime), daemons: Mutex::new(Vec::with_capacity(1)), runtime, database: database.clone(), local_identity, - }); - - let h = Arc::new(Self { inner: inner.clone() }); - - Ok(h) + })) } else { Err(Box::new(InvalidParameterError( "local controller's identity not readable by database", @@ -69,8 +62,8 @@ impl Handler { /// won't actually do anything. The reference the handler holds is weak to prevent /// a circular reference, so if the VL1Service is dropped this must be called again to /// tell the controller handler about a new instance. - pub fn set_service(&self, service: &Arc>) { - *self.inner.service.write().unwrap() = Arc::downgrade(service); + pub fn set_service(&self, service: &Arc>) { + *self.service.write().unwrap() = Arc::downgrade(service); } /// Start a change watcher to respond to changes detected by the database. @@ -79,20 +72,20 @@ impl Handler { /// unnecessary async tasks. If the database being used does not support changes, this /// does nothing. pub async fn start_change_watcher(&self) { - if let Some(cw) = self.inner.database.changes().await.map(|mut ch| { - let inner = self.inner.clone(); - self.inner.runtime.spawn(async move { + if let Some(cw) = self.database.changes().await.map(|mut ch| { + let self2 = self.self_ref.upgrade().unwrap(); + self.runtime.spawn(async move { loop { if let Ok(change) = ch.recv().await { - inner.reaper.add( - inner.runtime.spawn(inner.clone().handle_change_notification(change)), + self2.reaper.add( + self2.runtime.spawn(self2.clone().handle_change_notification(change)), Instant::now().checked_add(REQUEST_TIMEOUT).unwrap(), ); } } }) }) { - self.inner.daemons.lock().unwrap().push(cw); + self.daemons.lock().unwrap().push(cw); } } } @@ -103,8 +96,8 @@ impl PathFilter for Handler {} impl InnerProtocol for Handler { fn handle_packet( &self, - _host_system: &HostSystemImpl, - _node: &Node, + _: &HostSystemImpl, + _: &Node, source: &Arc, source_path: &Arc, source_hops: u8, @@ -157,19 +150,17 @@ impl InnerProtocol for Handler { */ // Launch handler as an async background task. - let (inner, peer, source_remote_endpoint) = (self.inner.clone(), source.clone(), source_path.endpoint.clone()); - self.inner.reaper.add( - self.inner.runtime.spawn(async move { + let (self2, peer, source_remote_endpoint) = + (self.self_ref.upgrade().unwrap(), source.clone(), source_path.endpoint.clone()); + self.reaper.add( + self.runtime.spawn(async move { let node_id = peer.identity.address; let node_fingerprint = Blob::from(peer.identity.fingerprint); let now = ms_since_epoch(); - let result = match inner - .handle_network_config_request::(&peer.identity, network_id, now) - .await - { + let result = match self2.handle_network_config_request(&peer.identity, network_id, now).await { Result::Ok((result, Some(config))) => { - inner.send_network_config(peer.as_ref(), &config, Some(message_id)); + self2.send_network_config(peer.as_ref(), &config, Some(message_id)); result } Result::Ok((result, None)) => result, @@ -179,13 +170,13 @@ impl InnerProtocol for Handler { } }; - let _ = inner + let _ = self2 .database .log_request(RequestLogItem { network_id, node_id, node_fingerprint, - controller_node_id: inner.local_identity.address, + controller_node_id: self2.local_identity.address, metadata: if meta_data.is_empty() { Vec::new() } else { @@ -245,7 +236,7 @@ impl InnerProtocol for Handler { } } -impl Inner { +impl Handler { fn send_network_config( &self, peer: &Peer, @@ -301,7 +292,7 @@ impl Inner { async fn handle_change_notification(self: Arc, _change: Change) {} - async fn handle_network_config_request( + async fn handle_network_config_request( self: &Arc, source_identity: &Identity, network_id: NetworkId, @@ -424,7 +415,7 @@ impl Inner { } } -impl Drop for Inner { +impl Drop for Handler { fn drop(&mut self) { for h in self.daemons.lock().unwrap().drain(..) { h.abort();