diff --git a/src/router.rs b/src/router.rs index 965d07a..d93a2af 100644 --- a/src/router.rs +++ b/src/router.rs @@ -104,7 +104,7 @@ impl Router { tokio::spawn(Router::propagate_static_route(router.clone())); tokio::spawn(Router::propagate_selected_routes(router.clone())); - tokio::spawn(Router::check_for_dead_peers(router.clone(), router_id)); + tokio::spawn(Router::check_for_dead_peers(router.clone())); tokio::spawn(Router::process_expired_source_keys( router.clone(), @@ -271,7 +271,7 @@ impl Router { } } - async fn check_for_dead_peers(self, router_id: RouterId) { + async fn check_for_dead_peers(self) { let ihu_threshold = tokio::time::Duration::from_secs(DEAD_PEER_TRESHOLD); loop { @@ -280,65 +280,109 @@ impl Router { trace!("Checking for dead peers"); - let mut inner_w = self.inner_w.lock().expect("Mutex is not poisoned"); - let dead_peers = { // a peer is assumed dead when the peer's last sent ihu exceeds a threshold let mut dead_peers = Vec::new(); - for peer in inner_w - .enter() - .expect("We deref through a write handle so this enter never fails") - .peer_interfaces - .iter() - { + for peer in self.inner_r.enter().expect("Write handle is saved on router so it can never go out of scope before read handle").peer_interfaces.iter() { // check if the peer's last_received_ihu is greater than the threshold if peer.time_last_received_ihu().elapsed() > ihu_threshold { // peer is dead - info!("Peer {:?} is dead", peer.overlay_ip()); + info!("Peer {} is dead", peer.underlay_ip()); dead_peers.push(peer.clone()); } } dead_peers }; - // vec to store retraction update that need to be sent - let mut retraction_updates = Vec::::with_capacity(dead_peers.len()); - - // remove the peer from the peer_interfaces and the routes for dead_peer in dead_peers { - inner_w.append(RouterOpLogEntry::RemovePeer(dead_peer.clone())); - - info!("Sending retraction for {} to peers", dead_peer.overlay_ip()); - - // create retraction update for each dead peer - let retraction_update = ControlPacket::new_update( - UPDATE_INTERVAL, - inner_w.enter().expect("Write handle is saved on router so it is not dropped before the read handles").router_seqno, - Metric::infinite(), - Subnet::new(dead_peer.overlay_ip(), 64).expect("TODO: fix this"), - router_id, - ); - retraction_updates.push(retraction_update); + self.handle_dead_peer(dead_peer); } + } + } - // Flush now, so when we aquire the next read handle the dead peers will have been - // removed. - inner_w.publish(); + /// Remove a dead peer from the router. + fn handle_dead_peer(&self, dead_peer: Peer) { + debug!( + "Cleaning up peer {} which is reportedly dead", + dead_peer.underlay_ip() + ); - // send retraction update for the dead peer - // when other nodes receive this update (with metric 0XFFFF), they should also remove the routing tables entries with that peer as neighbor - for peer in inner_w - .enter() - .expect("We deref through a write handle so this enter never fails") - .peer_interfaces - .iter() - { - for ru in retraction_updates.iter() { - if let Err(e) = peer.send_control_packet(ru.clone()) { - error!("Error sending retraction update to peer: {e}"); - } + // Scope for the mutex lock + let subnets_to_select = { + let inner = self.inner_r.enter().expect( + "Write handle is saved on the router so it is not dropped before the read handles", + ); + let mut inner_w = self.inner_w.lock().unwrap(); + + let mut subnets_to_select = Vec::new(); + + for (rk, re) in inner.routing_table.iter() { + if rk.neighbour() == &dead_peer { + subnets_to_select.push(rk.subnet()); + inner_w.append(RouterOpLogEntry::UpdateRouteEntry( + rk, + re.seqno(), + Metric::infinite(), + re.source().router_id(), + )); } } + // Make sure we release the read handle, so a publish on the write handle eventually + // succeeds. + drop(inner); + inner_w.append(RouterOpLogEntry::RemovePeer(dead_peer)); + inner_w.publish(); + + subnets_to_select + }; + + // And run required route selection + for subnet in subnets_to_select { + self.route_selection(subnet); + } + } + + /// Run route selection for a given subnet + fn route_selection(&self, subnet: Subnet) { + debug!("Running route selection for {subnet}"); + let mut inner_w = self.inner_w.lock().unwrap(); + + let routes = inner_w + .enter() + .expect("Deref through write handle so there always is a write handle in scope here") + .routing_table + .entries(subnet); + + // No routes for subnet, nothing to do here. + if routes.is_empty() { + return; + } + + // If there is no selected route there is nothing to do here. We keep expired routes in the + // table for a while so updates of those should already have propagated to peers. + if let Some(new_selected) = self.find_best_route(&routes) { + if new_selected.neighbour() == routes[0].neighbour() && routes[0].selected() { + debug!( + "New selected route for {subnet} is the same as the route alreayd installed" + ); + return; + } + + if new_selected.metric().is_infinite() + && routes[0].metric().is_infinite() + && routes[0].selected() + { + debug!("New selected route for {subnet} is retracted, like the previously selected route"); + return; + } + + inner_w.append(RouterOpLogEntry::SelectRoute(RouteKey::new( + subnet, + new_selected.neighbour().clone(), + ))); + inner_w.publish(); + + Router::trigger_update(&mut inner_w, subnet, self.router_id); } } @@ -992,24 +1036,23 @@ impl RouterInner { Ok(router_inner) } - fn remove_peer_interface(&mut self, peer: Peer) { - self.peer_interfaces.retain(|p| p != &peer); + fn remove_peer_interface(&mut self, peer: &Peer) { + self.peer_interfaces.retain(|p| p != peer); } fn send_update(&self, peer: &Peer, update: babel::Update) -> Option { // before sending an update, the source table might need to be updated let metric = update.metric(); - // Nothing to do on route retraction. - if metric.is_infinite() { - return None; - } let seqno = update.seqno(); let router_id = update.router_id(); let subnet = update.subnet(); let source_key = SourceKey::new(subnet, router_id); - let op = if let Some(source_entry) = self.source_table.get(&source_key) { + // Nothing to do on route retraction. + let op = if metric.is_infinite() { + None + } else if let Some(source_entry) = self.source_table.get(&source_key) { // if seqno of the update is greater than the seqno in the source table, update the source table if !seqno.lt(&source_entry.seqno()) { Some(RouterOpLogEntry::InsertSourceEntry( @@ -1094,13 +1137,7 @@ impl RouterInner { continue; } } - let update = babel::Update::new( - UPDATE_INTERVAL, - seqno, // updates receive the seqno of the router - cost, - subnet, - router_id, - ); + let update = babel::Update::new(UPDATE_INTERVAL, seqno, cost, subnet, router_id); debug!( "Propagating route update for {} to {}", subnet, @@ -1128,7 +1165,7 @@ impl RouterInner { } let update = babel::Update::new( UPDATE_INTERVAL, - sre.seqno(), // updates receive the seqno of the router + sre.seqno(), // the cost of the route is the cost of the route + the cost of the link to the next-hop sre.metric() + neigh_link_cost, srk.subnet(), @@ -1194,9 +1231,7 @@ impl left_right::Absorb for RouterInner { } RouterOpLogEntry::AddPeer(peer) => self.peer_interfaces.push(peer.clone()), RouterOpLogEntry::RemovePeer(peer) => { - self.remove_peer_interface(peer.clone()); - // remove the peer's routes from all routing tables (= all the peers that use the peer as next-hop) - self.routing_table.remove_peer(peer.clone()); + self.remove_peer_interface(peer); } RouterOpLogEntry::InsertSourceEntry(sk, fd) => { self.source_table diff --git a/src/routing_table.rs b/src/routing_table.rs index 39bdb02..7f839d4 100644 --- a/src/routing_table.rs +++ b/src/routing_table.rs @@ -62,6 +62,12 @@ impl RouteKey { pub const fn subnet(&self) -> Subnet { self.subnet } + + /// Returns the [`neighbour`](Peer) associated with this `RouteKey`. + #[inline] + pub fn neighbour(&self) -> &Peer { + &self.neighbor + } } impl RouteEntry { @@ -297,30 +303,6 @@ impl RoutingTable { }) } - /// Remove all [`RouteKey`] and [`RouteEntry`] pairs where the [`RouteEntry`]'s neighbour value - /// is the given [`Peer`]. - pub fn remove_peer(&mut self, peer: Peer) { - let mut to_remove = Vec::new(); - for (ip, mask, entries) in self.table.iter_mut() { - entries.retain_mut(|(entry, expiration)| { - if entry.neighbor == peer { - expiration.abort(); - false - } else { - true - } - }); - if entries.is_empty() { - to_remove.push((ip, mask)); - } - } - - // Clear empty lists - for (ip, mask) in to_remove { - self.table.remove(ip, mask); - } - } - /// Look up a selected route for an [`IpAddr`] in the `RoutingTable`. /// /// Currently only IPv6 is supported, looking up an IPv4 address always returns [`Option::None`].