diff --git a/mycelium/src/peer_manager.rs b/mycelium/src/peer_manager.rs index 0794c7e..299ba48 100644 --- a/mycelium/src/peer_manager.rs +++ b/mycelium/src/peer_manager.rs @@ -25,6 +25,7 @@ use std::sync::{Arc, Mutex}; use std::time::Duration; use tokio::net::TcpStream; use tokio::net::{TcpListener, UdpSocket}; +use tokio::task::AbortHandle; use tokio::time::MissedTickBehavior; /// Magic bytes to identify a multicast UDP packet used in link local peer discovery. @@ -46,6 +47,8 @@ const MAX_FAILED_LOCAL_PEER_CONNECTION_ATTEMPTS: usize = 3; /// [`Router`]. pub struct PeerManager { inner: Arc>, + /// Handles to background tasks so we can abort them when the PeerManager is dropped. + abort_handles: Vec, } /// Details how the PeerManager learned about a remote. @@ -187,7 +190,7 @@ where // Set the initially configured peer count in metrics. metrics.peer_manager_known_peers(static_peers_sockets.len()); - let peer_manager = PeerManager { + let mut peer_manager = PeerManager { inner: Arc::new(Inner { router: Mutex::new(router), peers: Mutex::new( @@ -219,31 +222,36 @@ where metrics, firewall_mark, }), + abort_handles: vec![], }; // Start listeners for inbound connections. // Start the tcp listener, in case we are running a private network the tcp listener will // actually be a tls listener. - tokio::spawn(peer_manager.inner.clone().tcp_listener()); + let handle = tokio::spawn(peer_manager.inner.clone().tcp_listener()); + peer_manager.abort_handles.push(handle.abort_handle()); if is_private_net { info!("Enabled private network mode"); } else { // Currently quic is not supported in private network mode. - tokio::spawn(peer_manager.inner.clone().quic_listener()); + let handle = tokio::spawn(peer_manager.inner.clone().quic_listener()); + peer_manager.abort_handles.push(handle.abort_handle()); }; // Start (re)connecting to outbound/local peers - tokio::spawn(peer_manager.inner.clone().connect_to_peers()); + let handle = tokio::spawn(peer_manager.inner.clone().connect_to_peers()); + peer_manager.abort_handles.push(handle.abort_handle()); // Discover local peers, this does not actually connect to them. That is handle by the // connect_to_peers task. if !disable_peer_discovery { - tokio::spawn( + let handle = tokio::spawn( peer_manager .inner .clone() .local_discovery(peer_discovery_port), ); + peer_manager.abort_handles.push(handle.abort_handle()); } Ok(peer_manager) @@ -319,6 +327,15 @@ where } } +impl Drop for PeerManager { + fn drop(&mut self) { + // Cancel all background tasks + for ah in &self.abort_handles { + ah.abort(); + } + } +} + impl Inner where M: Metrics + Clone + Send + 'static,