From cbd10a0d5baf1d406a98cbbb70e07a6df7fde71e Mon Sep 17 00:00:00 2001 From: Lee Smet Date: Thu, 16 Nov 2023 15:50:19 +0100 Subject: [PATCH] Close #59: Improve dead peer detection Signed-off-by: Lee Smet --- src/peer.rs | 23 +++++++++++++++++++++-- src/peer_manager.rs | 5 +++++ src/router.rs | 20 ++++++++++++++++++++ 3 files changed, 46 insertions(+), 2 deletions(-) diff --git a/src/peer.rs b/src/peer.rs index e5067e6..cdbf655 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -1,5 +1,5 @@ use futures::{SinkExt, StreamExt}; -use log::{error, info, trace}; +use log::{debug, error, info, trace}; use std::{ error::Error, net::IpAddr, @@ -40,6 +40,7 @@ impl Peer { router_control_tx: mpsc::UnboundedSender<(ControlPacket, Peer)>, stream: TcpStream, overlay_ip: IpAddr, + dead_peer_sink: mpsc::Sender, ) -> Result> { // Data channel for peer let (to_peer_data, mut from_routing_data) = mpsc::unbounded_channel::(); @@ -88,10 +89,11 @@ impl Peer { } Some(Err(e)) => { error!("Error from framed: {}", e); + break; }, None => { info!("Stream is closed."); - return + break; } } } @@ -116,6 +118,7 @@ impl Peer { .filter_map(|item| item.map(|item| Ok(Packet::DataPacket(item))))); if let Err(e) = framed.send_all(&mut packet_stream).await { error!("Error writing to stream: {}", e); + break; } } @@ -123,10 +126,18 @@ impl Peer { // Send it over the TCP stream if let Err(e) = framed.send(Packet::ControlPacket(packet)).await { error!("Error writing to stream: {}", e); + break; } } } } + + // Notify router we are dead + let remote_id = peer.underlay_ip(); + debug!("Notifying router peer {remote_id} is dead"); + if let Err(e) = dead_peer_sink.send(peer).await { + error!("Peer {remote_id} could not notify router of termination: {e}"); + } }); } @@ -189,6 +200,12 @@ impl Peer { pub fn set_time_last_received_ihu(&self, time: tokio::time::Instant) { self.inner.state.write().unwrap().time_last_received_ihu = time } + + /// Checks if the connection to this `Peer` is alive and useable. 
If it is not, this `Peer` + /// instance is dead and should be disposed of. + pub fn connection_alive(&self) -> bool { + self.inner.state.read().unwrap().connection_alive + } } impl PartialEq for Peer { @@ -214,6 +231,7 @@ struct PeerState { time_last_received_hello: tokio::time::Instant, link_cost: u16, time_last_received_ihu: tokio::time::Instant, + connection_alive: bool, } impl PeerState { @@ -233,6 +251,7 @@ impl PeerState { link_cost, time_last_received_ihu, time_last_received_hello, + connection_alive: true, } } } diff --git a/src/peer_manager.rs b/src/peer_manager.rs index 081d4f4..0c7607b 100644 --- a/src/peer_manager.rs +++ b/src/peer_manager.rs @@ -78,6 +78,7 @@ impl PeerManager { self.router.router_control_tx(), peer_stream, received_overlay_ip, + self.router.dead_peer_sink().clone(), ) { info!("Connected to new peer {}", new_peer.underlay_ip()); self.router.add_peer_interface(new_peer); @@ -98,6 +99,7 @@ impl PeerManager { }; let router_data_tx = self.router.router_data_tx(); let router_control_tx = self.router.router_control_tx(); + let dead_peer_sink = self.router.dead_peer_sink(); match TcpListener::bind(("::", port)).await { Ok(listener) => loop { @@ -108,6 +110,7 @@ impl PeerManager { node_tun_addr, router_data_tx.clone(), router_control_tx.clone(), + dead_peer_sink.clone(), ) .await; match new_peer { @@ -136,6 +139,7 @@ impl PeerManager { node_tun_addr: Ipv6Addr, router_data_tx: Sender, router_control_tx: UnboundedSender<(ControlPacket, Peer)>, + dead_peer_sink: Sender, ) -> Result> { // Steps: // 1. Send own TUN address over the stream @@ -171,6 +175,7 @@ impl PeerManager { router_control_tx, stream, received_overlay_ip, + dead_peer_sink, ) } } diff --git a/src/router.rs b/src/router.rs index d93a2af..407f8c6 100644 --- a/src/router.rs +++ b/src/router.rs @@ -56,6 +56,8 @@ pub struct Router { node_tun: UnboundedSender, node_tun_subnet: Subnet, update_filters: Arc>>, + /// Channel injected into peers, so they can notify the router if they exit. 
+ dead_peer_sink: mpsc::Sender, } impl Router { @@ -72,6 +74,7 @@ impl Router { let (router_data_tx, router_data_rx) = mpsc::channel::(1000); let (expired_source_key_sink, expired_source_key_stream) = mpsc::channel(1); let (expired_route_entry_sink, expired_route_entry_stream) = mpsc::channel(1); + let (dead_peer_sink, dead_peer_stream) = mpsc::channel(1); let router_inner = RouterInner::new(expired_source_key_sink, expired_route_entry_sink)?; let (mut inner_w, inner_r) = left_right::new_from_empty(router_inner); @@ -89,6 +92,7 @@ impl Router { router_control_tx, node_tun, node_tun_subnet, + dead_peer_sink, update_filters: Arc::new(update_filters), }; @@ -116,6 +120,8 @@ impl Router { expired_route_entry_stream, )); + tokio::spawn(Router::process_dead_peers(router.clone(), dead_peer_stream)); + Ok(router) } @@ -216,6 +222,11 @@ impl Router { .map(|(_, ss)| ss.clone()) } + /// Get a reference to this `Router`'s dead peer sink. + pub fn dead_peer_sink(&self) -> &mpsc::Sender { + &self.dead_peer_sink + } + pub fn print_selected_routes(&self) { let inner = self .inner_r @@ -468,6 +479,15 @@ impl Router { warn!("Expired route key processing halted"); } + /// Process notifications about peers who are dead. This allows peers who can self-diagnose + /// connection states to notify us, and allows for more efficient cleanup. + async fn process_dead_peers(self, mut dead_peer_stream: mpsc::Receiver) { + while let Some(dead_peer) = dead_peer_stream.recv().await { + self.handle_dead_peer(dead_peer); + } + warn!("Processing of dead peers halted"); + } + async fn handle_incoming_control_packet( self, mut router_control_rx: UnboundedReceiver<(ControlPacket, Peer)>,