Close #58: Properly handle dead peers

Signed-off-by: Lee Smet <lee.smet@hotmail.com>
This commit is contained in:
Lee Smet
2023-11-16 14:48:31 +01:00
parent b91fbfb37c
commit 9be41e1305
2 changed files with 102 additions and 85 deletions
+96 -61
View File
@@ -104,7 +104,7 @@ impl Router {
tokio::spawn(Router::propagate_static_route(router.clone()));
tokio::spawn(Router::propagate_selected_routes(router.clone()));
tokio::spawn(Router::check_for_dead_peers(router.clone(), router_id));
tokio::spawn(Router::check_for_dead_peers(router.clone()));
tokio::spawn(Router::process_expired_source_keys(
router.clone(),
@@ -271,7 +271,7 @@ impl Router {
}
}
async fn check_for_dead_peers(self, router_id: RouterId) {
async fn check_for_dead_peers(self) {
let ihu_threshold = tokio::time::Duration::from_secs(DEAD_PEER_TRESHOLD);
loop {
@@ -280,65 +280,109 @@ impl Router {
trace!("Checking for dead peers");
let mut inner_w = self.inner_w.lock().expect("Mutex is not poisoned");
let dead_peers = {
// a peer is assumed dead when the peer's last sent ihu exceeds a threshold
let mut dead_peers = Vec::new();
for peer in inner_w
.enter()
.expect("We deref through a write handle so this enter never fails")
.peer_interfaces
.iter()
{
for peer in self.inner_r.enter().expect("Write handle is saved on router so it can never go out of scope before read handle").peer_interfaces.iter() {
// check if the peer's last_received_ihu is greater than the threshold
if peer.time_last_received_ihu().elapsed() > ihu_threshold {
// peer is dead
info!("Peer {:?} is dead", peer.overlay_ip());
info!("Peer {} is dead", peer.underlay_ip());
dead_peers.push(peer.clone());
}
}
dead_peers
};
// vec to store retraction update that need to be sent
let mut retraction_updates = Vec::<ControlPacket>::with_capacity(dead_peers.len());
// remove the peer from the peer_interfaces and the routes
for dead_peer in dead_peers {
inner_w.append(RouterOpLogEntry::RemovePeer(dead_peer.clone()));
info!("Sending retraction for {} to peers", dead_peer.overlay_ip());
// create retraction update for each dead peer
let retraction_update = ControlPacket::new_update(
UPDATE_INTERVAL,
inner_w.enter().expect("Write handle is saved on router so it is not dropped before the read handles").router_seqno,
Metric::infinite(),
Subnet::new(dead_peer.overlay_ip(), 64).expect("TODO: fix this"),
router_id,
);
retraction_updates.push(retraction_update);
self.handle_dead_peer(dead_peer);
}
}
}
// Flush now, so when we aquire the next read handle the dead peers will have been
// removed.
inner_w.publish();
/// Remove a dead peer from the router.
fn handle_dead_peer(&self, dead_peer: Peer) {
debug!(
"Cleaning up peer {} which is reportedly dead",
dead_peer.underlay_ip()
);
// send retraction update for the dead peer
// when other nodes receive this update (with metric 0XFFFF), they should also remove the routing tables entries with that peer as neighbor
for peer in inner_w
.enter()
.expect("We deref through a write handle so this enter never fails")
.peer_interfaces
.iter()
{
for ru in retraction_updates.iter() {
if let Err(e) = peer.send_control_packet(ru.clone()) {
error!("Error sending retraction update to peer: {e}");
}
// Scope for the mutex lock
let subnets_to_select = {
let inner = self.inner_r.enter().expect(
"Write handle is saved on the router so it is not dropped before the read handles",
);
let mut inner_w = self.inner_w.lock().unwrap();
let mut subnets_to_select = Vec::new();
for (rk, re) in inner.routing_table.iter() {
if rk.neighbour() == &dead_peer {
subnets_to_select.push(rk.subnet());
inner_w.append(RouterOpLogEntry::UpdateRouteEntry(
rk,
re.seqno(),
Metric::infinite(),
re.source().router_id(),
));
}
}
// Make sure we release the read handle, so a publish on the write handle eventually
// succeeds.
drop(inner);
inner_w.append(RouterOpLogEntry::RemovePeer(dead_peer));
inner_w.publish();
subnets_to_select
};
// And run required route selection
for subnet in subnets_to_select {
self.route_selection(subnet);
}
}
/// Run route selection for a given subnet
fn route_selection(&self, subnet: Subnet) {
debug!("Running route selection for {subnet}");
let mut inner_w = self.inner_w.lock().unwrap();
let routes = inner_w
.enter()
.expect("Deref through write handle so there always is a write handle in scope here")
.routing_table
.entries(subnet);
// No routes for subnet, nothing to do here.
if routes.is_empty() {
return;
}
// If there is no selected route there is nothing to do here. We keep expired routes in the
// table for a while so updates of those should already have propagated to peers.
if let Some(new_selected) = self.find_best_route(&routes) {
if new_selected.neighbour() == routes[0].neighbour() && routes[0].selected() {
debug!(
"New selected route for {subnet} is the same as the route alreayd installed"
);
return;
}
if new_selected.metric().is_infinite()
&& routes[0].metric().is_infinite()
&& routes[0].selected()
{
debug!("New selected route for {subnet} is retracted, like the previously selected route");
return;
}
inner_w.append(RouterOpLogEntry::SelectRoute(RouteKey::new(
subnet,
new_selected.neighbour().clone(),
)));
inner_w.publish();
Router::trigger_update(&mut inner_w, subnet, self.router_id);
}
}
@@ -992,24 +1036,23 @@ impl RouterInner {
Ok(router_inner)
}
fn remove_peer_interface(&mut self, peer: Peer) {
self.peer_interfaces.retain(|p| p != &peer);
fn remove_peer_interface(&mut self, peer: &Peer) {
self.peer_interfaces.retain(|p| p != peer);
}
fn send_update(&self, peer: &Peer, update: babel::Update) -> Option<RouterOpLogEntry> {
// before sending an update, the source table might need to be updated
let metric = update.metric();
// Nothing to do on route retraction.
if metric.is_infinite() {
return None;
}
let seqno = update.seqno();
let router_id = update.router_id();
let subnet = update.subnet();
let source_key = SourceKey::new(subnet, router_id);
let op = if let Some(source_entry) = self.source_table.get(&source_key) {
// Nothing to do on route retraction.
let op = if metric.is_infinite() {
None
} else if let Some(source_entry) = self.source_table.get(&source_key) {
// if seqno of the update is greater than the seqno in the source table, update the source table
if !seqno.lt(&source_entry.seqno()) {
Some(RouterOpLogEntry::InsertSourceEntry(
@@ -1094,13 +1137,7 @@ impl RouterInner {
continue;
}
}
let update = babel::Update::new(
UPDATE_INTERVAL,
seqno, // updates receive the seqno of the router
cost,
subnet,
router_id,
);
let update = babel::Update::new(UPDATE_INTERVAL, seqno, cost, subnet, router_id);
debug!(
"Propagating route update for {} to {}",
subnet,
@@ -1128,7 +1165,7 @@ impl RouterInner {
}
let update = babel::Update::new(
UPDATE_INTERVAL,
sre.seqno(), // updates receive the seqno of the router
sre.seqno(),
// the cost of the route is the cost of the route + the cost of the link to the next-hop
sre.metric() + neigh_link_cost,
srk.subnet(),
@@ -1194,9 +1231,7 @@ impl left_right::Absorb<RouterOpLogEntry> for RouterInner {
}
RouterOpLogEntry::AddPeer(peer) => self.peer_interfaces.push(peer.clone()),
RouterOpLogEntry::RemovePeer(peer) => {
self.remove_peer_interface(peer.clone());
// remove the peer's routes from all routing tables (= all the peers that use the peer as next-hop)
self.routing_table.remove_peer(peer.clone());
self.remove_peer_interface(peer);
}
RouterOpLogEntry::InsertSourceEntry(sk, fd) => {
self.source_table
+6 -24
View File
@@ -62,6 +62,12 @@ impl RouteKey {
pub const fn subnet(&self) -> Subnet {
self.subnet
}
/// Returns the [`neighbour`](Peer) associated with this `RouteKey`.
#[inline]
pub fn neighbour(&self) -> &Peer {
&self.neighbor
}
}
impl RouteEntry {
@@ -297,30 +303,6 @@ impl RoutingTable {
})
}
/// Remove all [`RouteKey`] and [`RouteEntry`] pairs where the [`RouteEntry`]'s neighbour value
/// is the given [`Peer`].
pub fn remove_peer(&mut self, peer: Peer) {
let mut to_remove = Vec::new();
for (ip, mask, entries) in self.table.iter_mut() {
entries.retain_mut(|(entry, expiration)| {
if entry.neighbor == peer {
expiration.abort();
false
} else {
true
}
});
if entries.is_empty() {
to_remove.push((ip, mask));
}
}
// Clear empty lists
for (ip, mask) in to_remove {
self.table.remove(ip, mask);
}
}
/// Look up a selected route for an [`IpAddr`] in the `RoutingTable`.
///
/// Currently only IPv6 is supported, looking up an IPv4 address always returns [`Option::None`].