diff --git a/src/main.rs b/src/main.rs index a316220..360f8b1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -52,10 +52,17 @@ async fn main() -> Result<(), Box> { let static_peers = cli.static_peers; // Creating a new Router instance - let router = Arc::new(router::Router::new( - node_tun.clone(), - vec![StaticRoute::new(cli.tun_addr.into())], - )); + let router = match router::Router::new(node_tun.clone(), vec![StaticRoute::new(cli.tun_addr.into())]) { + Ok(router) => { + println!("Router created"); + router + } + Err(e) => { + panic!("Error creating router: {}", e); + } + }; + + // Creating a new PeerManager instance let _peer_manager: peer_manager::PeerManager = peer_manager::PeerManager::new(router.clone(), static_peers); @@ -63,8 +70,8 @@ async fn main() -> Result<(), Box> { // Read packets from the TUN interface (originating from the kernel) and send them to the router // Note: we will never receive control packets from the kernel, only data packets { - let node_tun = node_tun.clone(); let router = router.clone(); + let node_tun = node_tun.clone(); tokio::spawn(async move { loop { @@ -97,7 +104,7 @@ async fn main() -> Result<(), Box> { dest_ip: dest_addr, raw_data: buf.to_vec(), }; - if router.router_data_tx.send(data_packet).is_err() { + if router.router_data_tx().send(data_packet).is_err() { eprintln!("Failed to send data_packet"); } } else { @@ -109,7 +116,6 @@ async fn main() -> Result<(), Box> { let mut reader = tokio::io::BufReader::new(tokio::io::stdin()); let mut line = String::new(); - let router = router.clone(); let read_handle = tokio::spawn(async move { loop { @@ -123,7 +129,7 @@ async fn main() -> Result<(), Box> { router.print_routes(); println!("\n----------- Current peers: -----------"); - for p in router.get_peer_interfaces() { + for p in router.peer_interfaces() { println!( "Peer: {:?}, with link cost: {}", p.overlay_ip(), diff --git a/src/node_setup.rs b/src/node_setup.rs index f308b86..6076603 100644 --- a/src/node_setup.rs +++ b/src/node_setup.rs @@ -1,5 +1,6 @@ use futures::stream::TryStreamExt; use rtnetlink::Handle; +use core::fmt; use std::{error::Error, net::Ipv4Addr, sync::Arc}; use tokio_tun::{Tun, TunBuilder}; @@ -61,4 +62,4 @@ pub async fn setup_node(tun_addr: Ipv4Addr) -> Result, Box> println!("Static route created"); Ok(tun) -} +} \ No newline at end of file diff --git a/src/peer.rs b/src/peer.rs index 3acee1b..24a380f 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -106,6 +106,14 @@ impl Peer { } } +impl PartialEq for Peer { + fn eq(&self, other: &Self) -> bool { + self.overlay_ip() == other.overlay_ip() + } +} + + + #[derive(Debug)] struct PeerInner { stream_ip: IpAddr, diff --git a/src/peer_manager.rs b/src/peer_manager.rs index 9d638d9..26e8ae8 100644 --- a/src/peer_manager.rs +++ b/src/peer_manager.rs @@ -16,11 +16,11 @@ struct PeersConfig { #[derive(Clone)] pub struct PeerManager { - pub router: Arc, + pub router: Router, } impl PeerManager { - pub fn new(router: Arc, static_peers_sockets: Vec) -> Self { + pub fn new(router: Router, static_peers_sockets: Vec) -> Self { let peer_manager = PeerManager { router }; // Start a TCP listener. When a new connection is accepted, the reverse peer exchange is performed. @@ -63,7 +63,7 @@ impl PeerManager { ); let mut buf = [0u8; 17]; - match self.router.get_node_tun_address() { + match self.router.node_tun_addr() { IpAddr::V4(tun_addr) => { buf[0] = 0; buf[1..5].copy_from_slice(&tun_addr.octets()[..]); @@ -78,12 +78,12 @@ impl PeerManager { let peer_stream_ip = peer_addr.ip(); if let Ok(new_peer) = Peer::new( peer_stream_ip, - self.router.router_data_tx.clone(), - self.router.router_control_tx.clone(), + self.router.router_data_tx(), + self.router.router_control_tx(), peer_stream, received_overlay_ip, ) { - self.router.add_directly_connected_peer(new_peer); + self.router.add_peer_interface(new_peer); } } } @@ -120,7 +120,7 @@ impl PeerManager { let mut buf = [0u8; 17]; - match self.router.get_node_tun_address() { + match self.router.node_tun_addr() { IpAddr::V4(tun_addr) => { buf[0] = 0; buf[1..5].copy_from_slice(&tun_addr.octets()[..]); @@ -136,12 +136,12 @@ impl PeerManager { let peer_stream_ip = peer_addr.ip(); if let Ok(new_peer) = Peer::new( peer_stream_ip, - self.router.router_data_tx.clone(), - self.router.router_control_tx.clone(), + self.router.router_data_tx(), + self.router.router_control_tx(), peer_stream, received_overlay_ip, ) { - self.router.add_directly_connected_peer(new_peer); + self.router.add_peer_interface(new_peer); } } } @@ -165,14 +165,14 @@ impl PeerManager { } } - async fn start_reverse_peer_exchange(mut stream: TcpStream, router: Arc) { + async fn start_reverse_peer_exchange(mut stream: TcpStream, router: Router) { // Steps: // 1. Send own TUN address over the stream // 2. Read other node's TUN address from the stream let mut buf = [0u8; 17]; - match router.get_node_tun_address() { + match router.node_tun_addr() { IpAddr::V4(tun_addr) => { buf[0] = 0; buf[1..5].copy_from_slice(&tun_addr.octets()[..]); @@ -203,14 +203,14 @@ impl PeerManager { let peer_stream_ip = stream.peer_addr().unwrap().ip(); let new_peer = Peer::new( peer_stream_ip, - router.router_data_tx.clone(), - router.router_control_tx.clone(), + router.router_data_tx(), + router.router_control_tx(), stream, received_overlay_ip, ); match new_peer { Ok(new_peer) => { - router.add_directly_connected_peer(new_peer); + router.add_peer_interface(new_peer); } Err(e) => { eprintln!("Error creating peer: {}", e); diff --git a/src/router.rs b/src/router.rs index ba0fdde..b88765d 100644 --- a/src/router.rs +++ b/src/router.rs @@ -1,18 +1,15 @@ use crate::{ - packet::{ - BabelPacketBody, BabelPacketHeader, BabelTLV, BabelTLVType, ControlPacket, ControlStruct, - DataPacket, - }, + packet::{BabelTLV, BabelTLVType, ControlPacket, ControlStruct, DataPacket}, peer::Peer, - routing_table::{self, RouteEntry, RouteKey, RoutingTable}, - source_table::{self, FeasibilityDistance, SourceKey, SourceTable}, - timers::{self, Timer}, + routing_table::{RouteEntry, RouteKey, RoutingTable}, + source_table::{FeasibilityDistance, SourceKey, SourceTable}, }; use rand::Rng; use std::{ - collections::HashMap, - net::{IpAddr, Ipv4Addr}, - sync::{Arc, Mutex}, + error::Error, + fmt::Debug, + net::IpAddr, + sync::{Arc, RwLock}, }; use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; use tokio_tun::Tun; @@ -40,440 +37,118 @@ impl StaticRoute { #[derive(Clone)] pub struct Router { - pub router_id: u64, - // The peer interfaces are the known neighbors of this node - peer_interfaces: Arc>>, - pub router_control_tx: UnboundedSender, - pub router_data_tx: UnboundedSender, - pub node_tun: Arc, - pub routing_table: Arc>, - pub source_table: Arc>, - pub router_seqno: u16, - static_routes: Arc>>, + inner: Arc>, } impl Router { - pub fn new(node_tun: Arc, static_routes: Vec) -> Self { + pub fn new( + node_tun: Arc, + static_routes: Vec, + ) -> Result> { // Tx is passed onto each new peer instance. This enables peers to send control packets to the router. let (router_control_tx, router_control_rx) = mpsc::unbounded_channel::(); // Tx is passed onto each new peer instance. This enables peers to send data packets to the router. let (router_data_tx, router_data_rx) = mpsc::unbounded_channel::(); let router = Router { - router_id: rand::thread_rng().gen(), - peer_interfaces: Arc::new(Mutex::new(Vec::new())), - router_control_tx, - router_data_tx: router_data_tx.clone(), - node_tun, - routing_table: Arc::new(Mutex::new(RoutingTable::new())), - source_table: Arc::new(Mutex::new(SourceTable::new())), - router_seqno: 0, - static_routes: Arc::new(Mutex::new(static_routes)), + inner: Arc::new(RwLock::new(RouterInner::new( + node_tun, + static_routes, + router_data_tx, + router_control_tx, + )?)), }; + tokio::spawn(Router::start_periodic_hello_sender(router.clone())); tokio::spawn(Router::handle_incoming_control_packet( router.clone(), router_control_rx, )); - tokio::spawn(Router::handle_incoming_data_packets( + tokio::spawn(Router::handle_incoming_data_packet( router.clone(), router_data_rx, - router_data_tx.clone(), )); - tokio::spawn(Router::start_periodic_hello_sender(router.clone())); - - // loops over all peers and adds routing table entries for each peer - //tokio::spawn(Router::initialize_peer_route_entries(router.clone())); - - // propagate routes tokio::spawn(Router::propagate_routes(router.clone())); - router + Ok(router) } - async fn handle_incoming_control_packet( - self, - mut router_control_rx: UnboundedReceiver, - ) { - loop { - while let Some(control_struct) = router_control_rx.recv().await { - match control_struct.control_packet.body.tlv_type { - BabelTLVType::AckReq => todo!(), - BabelTLVType::Ack => todo!(), - BabelTLVType::Hello => Self::handle_incoming_hello(control_struct), - BabelTLVType::IHU => Self::handle_incoming_ihu(&self, control_struct), - BabelTLVType::NextHop => todo!(), - BabelTLVType::Update => { - let mut source_table = self.source_table.lock().unwrap(); - let mut routing_table = self.routing_table.lock().unwrap(); - Self::handle_incoming_update( - self.clone(), - &mut source_table, - &mut routing_table, - control_struct, - ); - } - BabelTLVType::RouteReq => todo!(), - BabelTLVType::SeqnoReq => todo!(), - } - } - } + // pub fn source_table(&self) -> SourceTable { + // self.inner.read().unwrap().source_table + // } + + // pub fn routing_table(&self) -> RoutingTable { + // self.inner.read().unwrap().routing_table + // } + + pub fn router_id(&self) -> u64 { + self.inner.read().unwrap().router_id } - async fn handle_incoming_data_packets( - self, - mut router_data_rx: UnboundedReceiver, - router_data_tx: UnboundedSender, - ) { - // If the destination IP of the data packet matches with the IP address of this node's TUN interface - // we should forward the data packet towards the TUN interface. - // If the destination IP doesn't match, we need to lookup if we have a matching peer instance - // where the destination IP matches with the peer's overlay IP. If we do, we should forward the - // data packet to the peer's to_peer_data channel. - - let tun_addr = self.node_tun.address().unwrap(); - loop { - while let Some(data_packet) = router_data_rx.recv().await { - let dest_ip = data_packet.dest_ip; - println!( - "Received data packet with destination: {}", - data_packet.dest_ip - ); - - if dest_ip == tun_addr { - match self.node_tun.send(&data_packet.raw_data).await { - Ok(_) => {} - Err(e) => eprintln!("Error sending data packet to TUN interface: {:?}", e), - } - } else { - // router_data_tx.send(data_packet).unwrap(); - // DO BABEL route selection - // kijke nr next-hop en gwn sturenn naar de peer die daarmee overeenkomt - - // select the best route towards the destination - let best_route = self.select_best_route(IpAddr::V4(dest_ip)); - - // get the peer corresponding to the best the best route - let peer = self.get_peer_by_ip(best_route.unwrap().next_hop).unwrap(); - if let Err(e) = peer.send_data_packet(data_packet) { - eprintln!("Error sending data packet to peer: {:?}", e); - } - } - } - } + pub fn router_control_tx(&self) -> UnboundedSender { + self.inner.read().unwrap().router_control_tx.clone() } - async fn start_periodic_hello_sender(self) { - loop { - tokio::time::sleep(tokio::time::Duration::from_secs(HELLO_INTERVAL as u64)).await; - for peer in self.peer_interfaces.lock().unwrap().iter_mut() { - // create a new hello packet for this peer - let hello = ControlPacket::new_hello(peer, HELLO_INTERVAL); - // set the last hello timestamp for this peer - peer.set_time_last_received_hello(tokio::time::Instant::now()); - // send the hello packet to the peer - println!("Sending hello to peer: {:?}", peer.overlay_ip()); - if let Err(error) = peer.send_control_packet(hello) { - eprintln!("Error sending hello to peer: {}", error); - } - } - } + pub fn router_data_tx(&self) -> UnboundedSender { + self.inner.read().unwrap().router_data_tx.clone() } - fn handle_incoming_hello(control_struct: ControlStruct) { - let destination_ip = control_struct.src_overlay_ip; - control_struct.reply(ControlPacket::new_ihu(IHU_INTERVAL, destination_ip)); + pub fn node_tun_addr(&self) -> IpAddr { + IpAddr::V4(self.inner.read().unwrap().node_tun.address().unwrap()) } - fn handle_incoming_ihu(&self, control_struct: ControlStruct) { - let mut source_peer = self.get_source_peer_from_control_struct(control_struct); - // reset the IHU timer associated with the peer - source_peer.reset_ihu_timer(tokio::time::Duration::from_secs(IHU_INTERVAL as u64)); - // measure time between Hello and and IHU and set the link cost - let time_diff = tokio::time::Instant::now() - .duration_since(source_peer.time_last_received_hello()) - .as_millis(); - - source_peer.set_link_cost(time_diff as u16); - println!( - "Link cost for peer {:?} set to {}", - source_peer.overlay_ip(), - source_peer.link_cost() - ); - - println!("IHU timer for peer {:?} reset", source_peer.overlay_ip()); + pub fn node_tun(&self) -> Arc { + self.inner.read().unwrap().node_tun.clone() } - fn handle_incoming_update( - router: Router, - source_table: &mut SourceTable, - routing_table: &mut RoutingTable, - update: ControlStruct, - ) { - if source_table.is_feasible(&update) { - println!("incoming update is feasible"); - source_table.update(&update); - - // get routing table entry for the source of the update - match update.control_packet.body.tlv { - BabelTLV::Update { - plen, - interval: _, - seqno, - metric, - prefix, - router_id, - } => { - println!( - "Received update from {} for {:?} with seqno {} and metric {}", - router_id, prefix, seqno, metric - ); - let source_ip = update.src_overlay_ip; - - // get RouteEntry for the source of the update - let route_key = &RouteKey { - prefix, - plen, - neighbor: source_ip, - }; - if let Some(route_entry) = routing_table.clone().table.get_mut(route_key) { - let should_be_selected = Router::set_incoming_update_selected(routing_table, route_key.clone(), metric); - route_entry.update(update, should_be_selected); - } - else { - let source_key = SourceKey { - prefix, - plen, - router_id, - }; - let feas_dist = FeasibilityDistance(metric, seqno); - - // create the source table entry - source_table.insert(source_key.clone(), feas_dist); - - // now we can create the routing table entry - let route_key = RouteKey { - prefix, // prefix is peer that ANNOUNCED the route - plen, - neighbor: update.src_overlay_ip, - }; - - let peer = router.get_peer_by_ip(update.src_overlay_ip).unwrap(); - let mut route_entry = RouteEntry::new( - source_key, - peer.clone(), - metric, - seqno, - update.src_overlay_ip, - true, // for new entries the route is always selected - router, - ); - - - // create the routing table entry - routing_table.insert(route_key, route_entry); - } - } - _ => {} - } - } else { - println!("incoming update is NOT feasible"); - // received update is not feasible - // unselect to route if it was selected - } - } - - // this function shoud check if an incoming update should be selected or not - pub fn set_incoming_update_selected(routing_table: &mut RoutingTable, route_key: RouteKey, new_metric: u16) -> bool { - - // first check if the routing table has an entry for that key or not, if it has no entry return true - if !routing_table.table.contains_key(&route_key) { - println!("no entry for this route key, returning true"); - return true - } else { - let route_entry = routing_table.table.get_mut(&route_key).unwrap(); - println!("current metric: {}, new metric: {}", route_entry.metric, new_metric); - if route_entry.metric < new_metric { - return false - } - return true - } + pub fn router_seqno(&self) -> u16 { + self.inner.read().unwrap().router_seqno } - - pub fn add_directly_connected_peer(&self, peer: Peer) { - self.peer_interfaces.lock().unwrap().push(peer); + pub fn increment_router_seqno(&self) { + self.inner.write().unwrap().router_seqno += 1; } - pub fn get_node_tun_address(&self) -> IpAddr { - self.node_tun.address().unwrap().into() + pub fn peer_interfaces(&self) -> Vec { + self.inner.read().unwrap().peer_interfaces.clone() } - pub fn get_peer_interfaces(&self) -> Vec { - self.peer_interfaces.lock().unwrap().clone() + pub fn add_peer_interface(&self, peer: Peer) { + self.inner.write().unwrap().peer_interfaces.push(peer); } - pub fn get_peer_by_ip(&self, peer_ip: IpAddr) -> Option { - let peers = self.get_peer_interfaces(); + pub fn remove_peer_interface(&self, peer: Peer) { + let mut peer_interfaces = self.inner.write().unwrap().peer_interfaces.clone(); + peer_interfaces.retain(|p| p != &peer); + self.inner.write().unwrap().peer_interfaces = peer_interfaces; + } + + // pub fn static_routes(&self) -> Vec { + // self.inner.read().unwrap().static_routes.clone() + // } + + pub fn peer_by_ip(&self, peer_ip: IpAddr) -> Option { + let peers = self.peer_interfaces(); let matching_peer = peers.iter().find(|peer| peer.overlay_ip() == peer_ip); matching_peer.map(Clone::clone) } - fn get_source_peer_from_control_struct(&self, control_struct: ControlStruct) -> Peer { - let source = control_struct.src_overlay_ip; + pub fn source_peer_from_control_struct(&self, control_struct: ControlStruct) -> Option { + let peers = self.peer_interfaces(); + let matching_peer = peers + .iter() + .find(|peer| peer.overlay_ip() == control_struct.src_overlay_ip); - self.get_peer_by_ip(source).unwrap() - } - - pub fn print_routes(&self) { - let routing_table = self.routing_table.lock().unwrap(); - for route in routing_table.table.iter() { - println!("Route key: {:?}", route.0); - println!( - "Route: {:?}/{:?} (with next-hop: {:?}, metric: {}, selected: {})", - route.0.prefix, route.0.plen, route.1.next_hop, route.1.metric, route.1.selected - ); - println!("As advertised by: {:?}", route.1.source.router_id); - } - } - - // loop over the directly connected peers and create routing table entries for them - // this is done by looking at the currently set link cost. as the cost gets initialized to 65535, we can use this to check if - // the link cost has been set to a lower value, indicating that the peer is reachable and a routing table entry exits already - //pub async fn initialize_peer_route_entries(self) { - //// we wait for 10 seconds before we start initializing the routing table entries - //// possible optimization: only run this when necassary (e.g. when a new peer is added) - //loop { - //tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; - //for peer in self.peer_interfaces.lock().unwrap().iter_mut() { - //// we check for the u16::MAX - 1 value, as this is the value that the link cost is initialized to - ////if peer.link_cost() == u16::MAX - 1 { - //// before we can create a routing table entry, we need to create a source table entry - //let source_key = SourceKey { - //prefix: peer.overlay_ip(), - //plen: 32, // we set the prefix length to 32 for now, this means that each peer is a /32 network (so only route to the peer itself) - //router_id: 0, // we set all router ids to 0 temporarily - //}; - //let feas_dist = FeasibilityDistance(peer.link_cost(), 0); - - //// create the source table entry - //self.source_table - //.lock() - //.unwrap() - //.insert(source_key.clone(), feas_dist); - - //// now we can create the routing table entry - //let route_key = RouteKey { - //prefix: peer.overlay_ip(), - //plen: 32, // we set the prefix length to 32 for now, this means that each peer is a /32 network (so only route to the peer itself) - //neighbor: peer.overlay_ip(), - //}; - - //let seqno = if let Some(re) = self.routing_table.lock().unwrap().remove(&route_key) - //{ - //re.seqno - //} else { - //0 - //}; - //let route_entry = RouteEntry { - //source: source_key, - //neighbor: peer.clone(), - //metric: peer.link_cost(), - //seqno: seqno, // we set the seqno to 0 for now - //next_hop: peer.overlay_ip(), - //selected: true, // set selected always to true for now as we have manually decided the topology to only have p2p links - //}; - //// create the routing table entry - //self.routing_table - //.lock() - //.unwrap() - //.insert(route_key, route_entry); - ////} - //} - //} - //} - - // routing table updates are send periodically to all directly connected peers - // updates are used to advertise new routes or the retract existing routes (retracting when the metric is set to 0xFFFF) - // this function is run when the route_update timer expires - pub async fn propagate_routes(self) { - loop { - // routes are propagated every 3 secs - tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; - - let mut router_table = self.routing_table.lock().unwrap(); - let mut static_routes = self.static_routes.lock().unwrap(); - let peers = self.peer_interfaces.lock().unwrap(); - - for route in static_routes.iter_mut() { - route.seqno += 1; - for peer in peers.iter() { - let update = ControlPacket::new_update( - route.plen, - UPDATE_INTERVAL as u16, - route.seqno, - peer.link_cost(), - route.prefix, - self.router_id, - ); - - if peer.send_control_packet(update).is_err() { - println!("could not send static route to peer"); - } - } - } - - for (key, entry) in router_table.table.iter_mut() - .filter(|(key, _)| { // Filter out the static routes - for sr in static_routes.iter() { - if key.prefix == sr.prefix && key.plen == sr.plen { - return false; - } - } - true - }) { - entry.seqno += 1; - for peer in peers.iter() { - let link_cost = peer.link_cost(); - - - // DEBUG purposes - if entry.metric > u16::MAX - 1 - link_cost { - println!("SENDING UPDATE WITH METRIC: {}", u16::MAX - 1); - } else { - println!("SENDING UPDATE WITH METRIC: {}", entry.metric + link_cost); - } - - - - let update = ControlPacket::new_update( - key.plen, - UPDATE_INTERVAL as u16, - entry.seqno, - if entry.metric > u16::MAX - 1 - link_cost { - u16::MAX - 1 - } else { - entry.metric + link_cost - }, - key.prefix, - entry.source.router_id, - ); - - if peer.send_control_packet(update).is_err() { - println!("route update packet dropped"); - } - } - } - - - // FILTER VOOR SELECTED ROUTE NODIG --> we sturen nu alle routes maar eignelijk moeten we enkel de selected routes gaan propagaten - } + matching_peer.map(Clone::clone) } pub fn select_best_route(&self, dest_ip: IpAddr) -> Option { + + let inner = self.inner.read().unwrap(); + // first look in the routing table for all routekeys where prefix == dest_ip - let routing_table = self.routing_table.lock().unwrap(); + let routing_table = &inner.routing_table; let mut matching_routes: Vec<&RouteEntry> = Vec::new(); for route in routing_table.table.iter() { @@ -501,4 +176,318 @@ impl Router { None => None, } } + + pub fn add_static_route(&self, static_route: StaticRoute) { + let mut static_routes = self.inner.write().unwrap().static_routes.clone(); + static_routes.push(static_route); + self.inner.write().unwrap().static_routes = static_routes; + } + + pub fn print_routes(&self) { + + let inner = self.inner.read().unwrap(); + + let routing_table = &inner.routing_table; + for route in routing_table.table.iter() { + println!("Route key: {:?}", route.0); + println!( + "Route: {:?}/{:?} (with next-hop: {:?}, metric: {}, selected: {})", + route.0.prefix, route.0.plen, route.1.next_hop, route.1.metric, route.1.selected + ); + println!("As advertised by: {:?}", route.1.source.router_id); + } + } + + async fn handle_incoming_control_packet( + self, + mut router_control_rx: UnboundedReceiver, + ) { + while let Some(control_struct) = router_control_rx.recv().await { + match control_struct.control_packet.body.tlv_type { + BabelTLVType::AckReq => todo!(), + BabelTLVType::Ack => todo!(), + BabelTLVType::Hello => Self::handle_incoming_hello(control_struct), + BabelTLVType::IHU => Self::handle_incoming_ihu(&self, control_struct), + BabelTLVType::NextHop => todo!(), + BabelTLVType::Update => { + //let mut source_table = self.source_table(); + //let mut routing_table = self.routing_table.lock().unwrap(); + Self::handle_incoming_update(&self, control_struct); + } + BabelTLVType::RouteReq => todo!(), + BabelTLVType::SeqnoReq => todo!(), + } + } + } + + fn handle_incoming_hello(control_struct: ControlStruct) { + let destination_ip = control_struct.src_overlay_ip; + control_struct.reply(ControlPacket::new_ihu(IHU_INTERVAL, destination_ip)); + } + + fn handle_incoming_ihu(&self, control_struct: ControlStruct) { + if let Some(mut source_peer) = self.source_peer_from_control_struct(control_struct) { + // reset the IHU timer associated with the peer + source_peer.reset_ihu_timer(tokio::time::Duration::from_secs(IHU_INTERVAL as u64)); + // measure time between Hello and and IHU and set the link cost + let time_diff = tokio::time::Instant::now() + .duration_since(source_peer.time_last_received_hello()) + .as_millis(); + + source_peer.set_link_cost(time_diff as u16); + } + } + + fn handle_incoming_update(&self, update: ControlStruct) { + + let mut inner = self.inner.write().unwrap(); + + if inner.source_table.is_feasible(&update) { + println!("incoming update is feasible"); + inner.source_table.update(&update); + + // get routing table entry for the source of the update + match update.control_packet.body.tlv { + BabelTLV::Update { + plen, + interval: _, + seqno, + metric, + prefix, + router_id, + } => { + println!( + "Received update from {} for {:?} with seqno {} and metric {}", + router_id, prefix, seqno, metric + ); + let source_ip = update.src_overlay_ip; + + // get RouteEntry for the source of the update + let route_key = &RouteKey { + prefix, + plen, + neighbor: source_ip, + }; + if let Some(route_entry) = inner.routing_table.clone().table.get_mut(route_key) + { + // let should_be_selected = Router::set_incoming_update_selected( + // routing_table, + // route_key.clone(), + // metric, + // ); + route_entry.update(update); + } else { + let source_key = SourceKey { + prefix, + plen, + router_id, + }; + let feas_dist = FeasibilityDistance(metric, seqno); + + // create the source table entry + inner.source_table.insert(source_key.clone(), feas_dist); + + // now we can create the routing table entry + let route_key = RouteKey { + prefix, // prefix is peer that ANNOUNCED the route + plen, + neighbor: update.src_overlay_ip, + }; + + let peer = self.peer_by_ip(update.src_overlay_ip).unwrap(); + let mut route_entry = RouteEntry::new( + source_key, + peer.clone(), + metric, + seqno, + update.src_overlay_ip, + true, // for new entries the route is always selected + ); + + // create the routing table entry + inner.routing_table.insert(route_key, route_entry); + } + } + _ => {} + } + } else { + println!("incoming update is NOT feasible"); + // received update is not feasible + // unselect to route if it was selected + } + } + + // fn set_incoming_update_selected( + // routing_table: &mut RoutingTable, + // route_key: RouteKey, + // new_metric: u16, + // ) -> bool { + // // first check if the routing table has an entry for that key or not, if it has no entry return true + // if !routing_table.table.contains_key(&route_key) { + // println!("no entry for this route key, returning true"); + // return true; + // } else { + // let route_entry = routing_table.table.get_mut(&route_key).unwrap(); + // println!( + // "current metric: {}, new metric: {}", + // route_entry.metric, new_metric + // ); + // if route_entry.metric < new_metric { + // return false; + // } + // return true; + // } + // } + + async fn handle_incoming_data_packet(self, mut router_data_rx: UnboundedReceiver) { + // If destination IP of packet is same as TUN interface IP, send to TUN interface + // If destination IP of packet is not same as TUN interface IP, send to peer with matching overlay IP + let node_tun_addr = self.node_tun_addr(); + loop { + while let Some(data_packet) = router_data_rx.recv().await { + match data_packet.dest_ip { + x if x == node_tun_addr => { + match self.node_tun().send(&data_packet.raw_data).await { + Ok(_) => {} + Err(e) => { + eprintln!("Error sending data packet to TUN interface: {:?}", e) + } + } + } + _ => { + let best_route = self.select_best_route(IpAddr::V4(data_packet.dest_ip)); + let peer = self.peer_by_ip(best_route.unwrap().next_hop).unwrap(); + if let Err(e) = peer.send_data_packet(data_packet) { + eprintln!("Error sending data packet to peer: {:?}", e); + } + } + } + } + } + } + + // routing table updates are send periodically to all directly connected peers + // updates are used to advertise new routes or the retract existing routes (retracting when the metric is set to 0xFFFF) + // this function is run when the route_update timer expires + pub async fn propagate_routes(self) { + + + loop { + // routes are propagated every 3 secs + tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; + + let mut inner = self.inner.write().unwrap(); + + let peers = inner.peer_interfaces.clone(); + for route in inner.static_routes.iter_mut() { + route.seqno += 1; + for peer in peers.iter() { + let update = ControlPacket::new_update( + route.plen, + UPDATE_INTERVAL as u16, + route.seqno, + peer.link_cost(), + route.prefix, + self.router_id(), + ); + + if peer.send_control_packet(update).is_err() { + println!("could not send static route to peer"); + } + } + } + + let static_routes = inner.static_routes.clone(); + for (key, entry) in inner.routing_table.table.iter_mut().filter(|(key, _)| { + // Filter out the static routes + for sr in static_routes.iter() { + if key.prefix == sr.prefix && key.plen == sr.plen { + return false; + } + } + true + }) { + entry.seqno += 1; + for peer in peers.iter() { + let link_cost = peer.link_cost(); + + // DEBUG purposes + if entry.metric > u16::MAX - 1 - link_cost { + println!("SENDING UPDATE WITH METRIC: {}", u16::MAX - 1); + } else { + println!("SENDING UPDATE WITH METRIC: {}", entry.metric + link_cost); + } + + let update = ControlPacket::new_update( + key.plen, + UPDATE_INTERVAL as u16, + entry.seqno, + if entry.metric > u16::MAX - 1 - link_cost { + u16::MAX - 1 + } else { + entry.metric + link_cost + }, + key.prefix, + entry.source.router_id, + ); + + if peer.send_control_packet(update).is_err() { + println!("route update packet dropped"); + } + } + } + + // FILTER VOOR SELECTED ROUTE NODIG --> we sturen nu alle routes maar eignelijk moeten we enkel de selected routes gaan propagaten + } + } + + async fn start_periodic_hello_sender(self) { + loop { + tokio::time::sleep(tokio::time::Duration::from_secs(HELLO_INTERVAL as u64)).await; + + for peer in self.peer_interfaces().iter_mut() { + let hello = ControlPacket::new_hello(peer, HELLO_INTERVAL); + peer.set_time_last_received_hello(tokio::time::Instant::now()); + + if let Err(error) = peer.send_control_packet(hello) { + eprintln!("Error sending hello to peer: {}", error); + } + } + } + } +} + +pub struct RouterInner { + pub router_id: u64, + peer_interfaces: Vec, + router_control_tx: UnboundedSender, + router_data_tx: UnboundedSender, + node_tun: Arc, + routing_table: RoutingTable, + source_table: SourceTable, + router_seqno: u16, + static_routes: Vec, +} + +impl RouterInner { + pub fn new( + node_tun: Arc, + static_routes: Vec, + router_data_tx: UnboundedSender, + router_control_tx: UnboundedSender, + ) -> Result> { + let router_inner = RouterInner { + router_id: rand::thread_rng().gen(), + peer_interfaces: Vec::new(), + router_control_tx, + router_data_tx, + node_tun: node_tun, + routing_table: RoutingTable::new(), + source_table: SourceTable::new(), + router_seqno: 0, + static_routes: static_routes, + }; + + Ok(router_inner) + } } diff --git a/src/routing_table.rs b/src/routing_table.rs index becc0c3..028e6e5 100644 --- a/src/routing_table.rs +++ b/src/routing_table.rs @@ -34,7 +34,6 @@ impl RouteEntry { seqno: u16, next_hop: IpAddr, selected: bool, - router: Router, ) -> Self { Self { source, @@ -46,13 +45,12 @@ impl RouteEntry { } } - pub fn update(&mut self, update: ControlStruct, should_be_selected: bool) { + pub fn update(&mut self, update: ControlStruct) { // the update is assumed to be feasible here match update.control_packet.body.tlv { BabelTLV::Update { seqno, metric, .. } => { self.metric = metric; self.seqno = seqno; - self.selected = should_be_selected; } _ => { panic!("Received update with invalid TLV"); diff --git a/src/source_table.rs b/src/source_table.rs index 03f7455..29e9fc7 100644 --- a/src/source_table.rs +++ b/src/source_table.rs @@ -13,6 +13,8 @@ pub struct SourceKey { pub struct FeasibilityDistance(pub u16, pub u16); // (metric, seqno) // Store (prefix, plen, router_id) -> feasibility distance mapping + +#[derive(Debug)] pub struct SourceTable { pub table: HashMap, }