mirror of
https://github.com/threefoldtech/mycelium.git
synced 2026-06-03 14:24:10 +00:00
Close #59: Improve dead peer detection
Signed-off-by: Lee Smet <lee.smet@hotmail.com>
This commit is contained in:
+21
-2
@@ -1,5 +1,5 @@
|
||||
use futures::{SinkExt, StreamExt};
|
||||
use log::{error, info, trace};
|
||||
use log::{debug, error, info, trace};
|
||||
use std::{
|
||||
error::Error,
|
||||
net::IpAddr,
|
||||
@@ -40,6 +40,7 @@ impl Peer {
|
||||
router_control_tx: mpsc::UnboundedSender<(ControlPacket, Peer)>,
|
||||
stream: TcpStream,
|
||||
overlay_ip: IpAddr,
|
||||
dead_peer_sink: mpsc::Sender<Peer>,
|
||||
) -> Result<Self, Box<dyn Error>> {
|
||||
// Data channel for peer
|
||||
let (to_peer_data, mut from_routing_data) = mpsc::unbounded_channel::<DataPacket>();
|
||||
@@ -88,10 +89,11 @@ impl Peer {
|
||||
}
|
||||
Some(Err(e)) => {
|
||||
error!("Error from framed: {}", e);
|
||||
break;
|
||||
},
|
||||
None => {
|
||||
info!("Stream is closed.");
|
||||
return
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -116,6 +118,7 @@ impl Peer {
|
||||
.filter_map(|item| item.map(|item| Ok(Packet::DataPacket(item)))));
|
||||
if let Err(e) = framed.send_all(&mut packet_stream).await {
|
||||
error!("Error writing to stream: {}", e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -123,10 +126,18 @@ impl Peer {
|
||||
// Send it over the TCP stream
|
||||
if let Err(e) = framed.send(Packet::ControlPacket(packet)).await {
|
||||
error!("Error writing to stream: {}", e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Notify router we are dead
|
||||
let remote_id = peer.underlay_ip();
|
||||
debug!("Notifying router peer {remote_id} is dead");
|
||||
if let Err(e) = dead_peer_sink.send(peer).await {
|
||||
error!("Peer {remote_id} could not notify router of termination: {e}");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -189,6 +200,12 @@ impl Peer {
|
||||
pub fn set_time_last_received_ihu(&self, time: tokio::time::Instant) {
|
||||
self.inner.state.write().unwrap().time_last_received_ihu = time
|
||||
}
|
||||
|
||||
/// Checks if the connection to this `Peer` is alive and useable. If it is not, this `Peer`
|
||||
/// instance is dead and should be disposed of.
|
||||
pub fn connection_alive(&self) -> bool {
|
||||
self.inner.state.read().unwrap().connection_alive
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialEq for Peer {
|
||||
@@ -214,6 +231,7 @@ struct PeerState {
|
||||
time_last_received_hello: tokio::time::Instant,
|
||||
link_cost: u16,
|
||||
time_last_received_ihu: tokio::time::Instant,
|
||||
connection_alive: bool,
|
||||
}
|
||||
|
||||
impl PeerState {
|
||||
@@ -233,6 +251,7 @@ impl PeerState {
|
||||
link_cost,
|
||||
time_last_received_ihu,
|
||||
time_last_received_hello,
|
||||
connection_alive: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -78,6 +78,7 @@ impl PeerManager {
|
||||
self.router.router_control_tx(),
|
||||
peer_stream,
|
||||
received_overlay_ip,
|
||||
self.router.dead_peer_sink().clone(),
|
||||
) {
|
||||
info!("Connected to new peer {}", new_peer.underlay_ip());
|
||||
self.router.add_peer_interface(new_peer);
|
||||
@@ -98,6 +99,7 @@ impl PeerManager {
|
||||
};
|
||||
let router_data_tx = self.router.router_data_tx();
|
||||
let router_control_tx = self.router.router_control_tx();
|
||||
let dead_peer_sink = self.router.dead_peer_sink();
|
||||
|
||||
match TcpListener::bind(("::", port)).await {
|
||||
Ok(listener) => loop {
|
||||
@@ -108,6 +110,7 @@ impl PeerManager {
|
||||
node_tun_addr,
|
||||
router_data_tx.clone(),
|
||||
router_control_tx.clone(),
|
||||
dead_peer_sink.clone(),
|
||||
)
|
||||
.await;
|
||||
match new_peer {
|
||||
@@ -136,6 +139,7 @@ impl PeerManager {
|
||||
node_tun_addr: Ipv6Addr,
|
||||
router_data_tx: Sender<DataPacket>,
|
||||
router_control_tx: UnboundedSender<(ControlPacket, Peer)>,
|
||||
dead_peer_sink: Sender<Peer>,
|
||||
) -> Result<Peer, Box<dyn std::error::Error>> {
|
||||
// Steps:
|
||||
// 1. Send own TUN address over the stream
|
||||
@@ -171,6 +175,7 @@ impl PeerManager {
|
||||
router_control_tx,
|
||||
stream,
|
||||
received_overlay_ip,
|
||||
dead_peer_sink,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -56,6 +56,8 @@ pub struct Router {
|
||||
node_tun: UnboundedSender<DataPacket>,
|
||||
node_tun_subnet: Subnet,
|
||||
update_filters: Arc<Vec<Box<dyn RouteUpdateFilter + Send + Sync>>>,
|
||||
/// Channel injected into peers, so they can notify the router if they exit.
|
||||
dead_peer_sink: mpsc::Sender<Peer>,
|
||||
}
|
||||
|
||||
impl Router {
|
||||
@@ -72,6 +74,7 @@ impl Router {
|
||||
let (router_data_tx, router_data_rx) = mpsc::channel::<DataPacket>(1000);
|
||||
let (expired_source_key_sink, expired_source_key_stream) = mpsc::channel(1);
|
||||
let (expired_route_entry_sink, expired_route_entry_stream) = mpsc::channel(1);
|
||||
let (dead_peer_sink, dead_peer_stream) = mpsc::channel(1);
|
||||
|
||||
let router_inner = RouterInner::new(expired_source_key_sink, expired_route_entry_sink)?;
|
||||
let (mut inner_w, inner_r) = left_right::new_from_empty(router_inner);
|
||||
@@ -89,6 +92,7 @@ impl Router {
|
||||
router_control_tx,
|
||||
node_tun,
|
||||
node_tun_subnet,
|
||||
dead_peer_sink,
|
||||
update_filters: Arc::new(update_filters),
|
||||
};
|
||||
|
||||
@@ -116,6 +120,8 @@ impl Router {
|
||||
expired_route_entry_stream,
|
||||
));
|
||||
|
||||
tokio::spawn(Router::process_dead_peers(router.clone(), dead_peer_stream));
|
||||
|
||||
Ok(router)
|
||||
}
|
||||
|
||||
@@ -216,6 +222,11 @@ impl Router {
|
||||
.map(|(_, ss)| ss.clone())
|
||||
}
|
||||
|
||||
/// Get a reference to this `Router`s' dead peer sink.
|
||||
pub fn dead_peer_sink(&self) -> &mpsc::Sender<Peer> {
|
||||
&self.dead_peer_sink
|
||||
}
|
||||
|
||||
pub fn print_selected_routes(&self) {
|
||||
let inner = self
|
||||
.inner_r
|
||||
@@ -468,6 +479,15 @@ impl Router {
|
||||
warn!("Expired route key processing halted");
|
||||
}
|
||||
|
||||
/// Process notifications about peers who are dead. This allows peers who can self-diagnose
|
||||
/// connection states to notify us, and allow for more efficient cleanup.
|
||||
async fn process_dead_peers(self, mut dead_peer_stream: mpsc::Receiver<Peer>) {
|
||||
while let Some(dead_peer) = dead_peer_stream.recv().await {
|
||||
self.handle_dead_peer(dead_peer);
|
||||
}
|
||||
warn!("Processing of dead peers halted");
|
||||
}
|
||||
|
||||
async fn handle_incoming_control_packet(
|
||||
self,
|
||||
mut router_control_rx: UnboundedReceiver<(ControlPacket, Peer)>,
|
||||
|
||||
Reference in New Issue
Block a user