router refactor

This commit is contained in:
Maxime Van Hees
2023-05-15 14:58:14 +00:00
parent 0bce73c7de
commit e03b35a956
7 changed files with 425 additions and 421 deletions
+14 -8
View File
@@ -52,10 +52,17 @@ async fn main() -> Result<(), Box<dyn Error>> {
let static_peers = cli.static_peers;
// Creating a new Router instance
let router = Arc::new(router::Router::new(
node_tun.clone(),
vec![StaticRoute::new(cli.tun_addr.into())],
));
let router = match router::Router::new(node_tun.clone(), vec![StaticRoute::new(cli.tun_addr.into())]) {
Ok(router) => {
println!("Router created");
router
}
Err(e) => {
panic!("Error creating router: {}", e);
}
};
// Creating a new PeerManager instance
let _peer_manager: peer_manager::PeerManager =
peer_manager::PeerManager::new(router.clone(), static_peers);
@@ -63,8 +70,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
// Read packets from the TUN interface (originating from the kernel) and send them to the router
// Note: we will never receive control packets from the kernel, only data packets
{
let node_tun = node_tun.clone();
let router = router.clone();
let node_tun = node_tun.clone();
tokio::spawn(async move {
loop {
@@ -97,7 +104,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
dest_ip: dest_addr,
raw_data: buf.to_vec(),
};
if router.router_data_tx.send(data_packet).is_err() {
if router.router_data_tx().send(data_packet).is_err() {
eprintln!("Failed to send data_packet");
}
} else {
@@ -109,7 +116,6 @@ async fn main() -> Result<(), Box<dyn Error>> {
let mut reader = tokio::io::BufReader::new(tokio::io::stdin());
let mut line = String::new();
let router = router.clone();
let read_handle = tokio::spawn(async move {
loop {
@@ -123,7 +129,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
router.print_routes();
println!("\n----------- Current peers: -----------");
for p in router.get_peer_interfaces() {
for p in router.peer_interfaces() {
println!(
"Peer: {:?}, with link cost: {}",
p.overlay_ip(),
+2 -1
View File
@@ -1,5 +1,6 @@
use futures::stream::TryStreamExt;
use rtnetlink::Handle;
use core::fmt;
use std::{error::Error, net::Ipv4Addr, sync::Arc};
use tokio_tun::{Tun, TunBuilder};
@@ -61,4 +62,4 @@ pub async fn setup_node(tun_addr: Ipv4Addr) -> Result<Arc<Tun>, Box<dyn Error>>
println!("Static route created");
Ok(tun)
}
}
+8
View File
@@ -106,6 +106,14 @@ impl Peer {
}
}
impl PartialEq for Peer {
fn eq(&self, other: &Self) -> bool {
self.overlay_ip() == other.overlay_ip()
}
}
#[derive(Debug)]
struct PeerInner {
stream_ip: IpAddr,
+15 -15
View File
@@ -16,11 +16,11 @@ struct PeersConfig {
#[derive(Clone)]
pub struct PeerManager {
pub router: Arc<Router>,
pub router: Router,
}
impl PeerManager {
pub fn new(router: Arc<Router>, static_peers_sockets: Vec<SocketAddr>) -> Self {
pub fn new(router: Router, static_peers_sockets: Vec<SocketAddr>) -> Self {
let peer_manager = PeerManager { router };
// Start a TCP listener. When a new connection is accepted, the reverse peer exchange is performed.
@@ -63,7 +63,7 @@ impl PeerManager {
);
let mut buf = [0u8; 17];
match self.router.get_node_tun_address() {
match self.router.node_tun_addr() {
IpAddr::V4(tun_addr) => {
buf[0] = 0;
buf[1..5].copy_from_slice(&tun_addr.octets()[..]);
@@ -78,12 +78,12 @@ impl PeerManager {
let peer_stream_ip = peer_addr.ip();
if let Ok(new_peer) = Peer::new(
peer_stream_ip,
self.router.router_data_tx.clone(),
self.router.router_control_tx.clone(),
self.router.router_data_tx(),
self.router.router_control_tx(),
peer_stream,
received_overlay_ip,
) {
self.router.add_directly_connected_peer(new_peer);
self.router.add_peer_interface(new_peer);
}
}
}
@@ -120,7 +120,7 @@ impl PeerManager {
let mut buf = [0u8; 17];
match self.router.get_node_tun_address() {
match self.router.node_tun_addr() {
IpAddr::V4(tun_addr) => {
buf[0] = 0;
buf[1..5].copy_from_slice(&tun_addr.octets()[..]);
@@ -136,12 +136,12 @@ impl PeerManager {
let peer_stream_ip = peer_addr.ip();
if let Ok(new_peer) = Peer::new(
peer_stream_ip,
self.router.router_data_tx.clone(),
self.router.router_control_tx.clone(),
self.router.router_data_tx(),
self.router.router_control_tx(),
peer_stream,
received_overlay_ip,
) {
self.router.add_directly_connected_peer(new_peer);
self.router.add_peer_interface(new_peer);
}
}
}
@@ -165,14 +165,14 @@ impl PeerManager {
}
}
async fn start_reverse_peer_exchange(mut stream: TcpStream, router: Arc<Router>) {
async fn start_reverse_peer_exchange(mut stream: TcpStream, router: Router) {
// Steps:
// 1. Send own TUN address over the stream
// 2. Read other node's TUN address from the stream
let mut buf = [0u8; 17];
match router.get_node_tun_address() {
match router.node_tun_addr() {
IpAddr::V4(tun_addr) => {
buf[0] = 0;
buf[1..5].copy_from_slice(&tun_addr.octets()[..]);
@@ -203,14 +203,14 @@ impl PeerManager {
let peer_stream_ip = stream.peer_addr().unwrap().ip();
let new_peer = Peer::new(
peer_stream_ip,
router.router_data_tx.clone(),
router.router_control_tx.clone(),
router.router_data_tx(),
router.router_control_tx(),
stream,
received_overlay_ip,
);
match new_peer {
Ok(new_peer) => {
router.add_directly_connected_peer(new_peer);
router.add_peer_interface(new_peer);
}
Err(e) => {
eprintln!("Error creating peer: {}", e);
+383 -394
View File
@@ -1,18 +1,15 @@
use crate::{
packet::{
BabelPacketBody, BabelPacketHeader, BabelTLV, BabelTLVType, ControlPacket, ControlStruct,
DataPacket,
},
packet::{BabelTLV, BabelTLVType, ControlPacket, ControlStruct, DataPacket},
peer::Peer,
routing_table::{self, RouteEntry, RouteKey, RoutingTable},
source_table::{self, FeasibilityDistance, SourceKey, SourceTable},
timers::{self, Timer},
routing_table::{RouteEntry, RouteKey, RoutingTable},
source_table::{FeasibilityDistance, SourceKey, SourceTable},
};
use rand::Rng;
use std::{
collections::HashMap,
net::{IpAddr, Ipv4Addr},
sync::{Arc, Mutex},
error::Error,
fmt::Debug,
net::IpAddr,
sync::{Arc, RwLock},
};
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use tokio_tun::Tun;
@@ -40,440 +37,118 @@ impl StaticRoute {
#[derive(Clone)]
pub struct Router {
pub router_id: u64,
// The peer interfaces are the known neighbors of this node
peer_interfaces: Arc<Mutex<Vec<Peer>>>,
pub router_control_tx: UnboundedSender<ControlStruct>,
pub router_data_tx: UnboundedSender<DataPacket>,
pub node_tun: Arc<Tun>,
pub routing_table: Arc<Mutex<RoutingTable>>,
pub source_table: Arc<Mutex<SourceTable>>,
pub router_seqno: u16,
static_routes: Arc<Mutex<Vec<StaticRoute>>>,
inner: Arc<RwLock<RouterInner>>,
}
impl Router {
pub fn new(node_tun: Arc<Tun>, static_routes: Vec<StaticRoute>) -> Self {
pub fn new(
node_tun: Arc<Tun>,
static_routes: Vec<StaticRoute>,
) -> Result<Self, Box<dyn Error>> {
// Tx is passed onto each new peer instance. This enables peers to send control packets to the router.
let (router_control_tx, router_control_rx) = mpsc::unbounded_channel::<ControlStruct>();
// Tx is passed onto each new peer instance. This enables peers to send data packets to the router.
let (router_data_tx, router_data_rx) = mpsc::unbounded_channel::<DataPacket>();
let router = Router {
router_id: rand::thread_rng().gen(),
peer_interfaces: Arc::new(Mutex::new(Vec::new())),
router_control_tx,
router_data_tx: router_data_tx.clone(),
node_tun,
routing_table: Arc::new(Mutex::new(RoutingTable::new())),
source_table: Arc::new(Mutex::new(SourceTable::new())),
router_seqno: 0,
static_routes: Arc::new(Mutex::new(static_routes)),
inner: Arc::new(RwLock::new(RouterInner::new(
node_tun,
static_routes,
router_data_tx,
router_control_tx,
)?)),
};
tokio::spawn(Router::start_periodic_hello_sender(router.clone()));
tokio::spawn(Router::handle_incoming_control_packet(
router.clone(),
router_control_rx,
));
tokio::spawn(Router::handle_incoming_data_packets(
tokio::spawn(Router::handle_incoming_data_packet(
router.clone(),
router_data_rx,
router_data_tx.clone(),
));
tokio::spawn(Router::start_periodic_hello_sender(router.clone()));
// loops over all peers and adds routing table entries for each peer
//tokio::spawn(Router::initialize_peer_route_entries(router.clone()));
// propagate routes
tokio::spawn(Router::propagate_routes(router.clone()));
router
Ok(router)
}
async fn handle_incoming_control_packet(
self,
mut router_control_rx: UnboundedReceiver<ControlStruct>,
) {
loop {
while let Some(control_struct) = router_control_rx.recv().await {
match control_struct.control_packet.body.tlv_type {
BabelTLVType::AckReq => todo!(),
BabelTLVType::Ack => todo!(),
BabelTLVType::Hello => Self::handle_incoming_hello(control_struct),
BabelTLVType::IHU => Self::handle_incoming_ihu(&self, control_struct),
BabelTLVType::NextHop => todo!(),
BabelTLVType::Update => {
let mut source_table = self.source_table.lock().unwrap();
let mut routing_table = self.routing_table.lock().unwrap();
Self::handle_incoming_update(
self.clone(),
&mut source_table,
&mut routing_table,
control_struct,
);
}
BabelTLVType::RouteReq => todo!(),
BabelTLVType::SeqnoReq => todo!(),
}
}
}
// pub fn source_table(&self) -> SourceTable {
// self.inner.read().unwrap().source_table
// }
// pub fn routing_table(&self) -> RoutingTable {
// self.inner.read().unwrap().routing_table
// }
pub fn router_id(&self) -> u64 {
self.inner.read().unwrap().router_id
}
async fn handle_incoming_data_packets(
self,
mut router_data_rx: UnboundedReceiver<DataPacket>,
router_data_tx: UnboundedSender<DataPacket>,
) {
// If the destination IP of the data packet matches with the IP address of this node's TUN interface
// we should forward the data packet towards the TUN interface.
// If the destination IP doesn't match, we need to lookup if we have a matching peer instance
// where the destination IP matches with the peer's overlay IP. If we do, we should forward the
// data packet to the peer's to_peer_data channel.
let tun_addr = self.node_tun.address().unwrap();
loop {
while let Some(data_packet) = router_data_rx.recv().await {
let dest_ip = data_packet.dest_ip;
println!(
"Received data packet with destination: {}",
data_packet.dest_ip
);
if dest_ip == tun_addr {
match self.node_tun.send(&data_packet.raw_data).await {
Ok(_) => {}
Err(e) => eprintln!("Error sending data packet to TUN interface: {:?}", e),
}
} else {
// router_data_tx.send(data_packet).unwrap();
// DO BABEL route selection
// kijke nr next-hop en gwn sturenn naar de peer die daarmee overeenkomt
// select the best route towards the destination
let best_route = self.select_best_route(IpAddr::V4(dest_ip));
// get the peer corresponding to the best the best route
let peer = self.get_peer_by_ip(best_route.unwrap().next_hop).unwrap();
if let Err(e) = peer.send_data_packet(data_packet) {
eprintln!("Error sending data packet to peer: {:?}", e);
}
}
}
}
pub fn router_control_tx(&self) -> UnboundedSender<ControlStruct> {
self.inner.read().unwrap().router_control_tx.clone()
}
async fn start_periodic_hello_sender(self) {
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(HELLO_INTERVAL as u64)).await;
for peer in self.peer_interfaces.lock().unwrap().iter_mut() {
// create a new hello packet for this peer
let hello = ControlPacket::new_hello(peer, HELLO_INTERVAL);
// set the last hello timestamp for this peer
peer.set_time_last_received_hello(tokio::time::Instant::now());
// send the hello packet to the peer
println!("Sending hello to peer: {:?}", peer.overlay_ip());
if let Err(error) = peer.send_control_packet(hello) {
eprintln!("Error sending hello to peer: {}", error);
}
}
}
pub fn router_data_tx(&self) -> UnboundedSender<DataPacket> {
self.inner.read().unwrap().router_data_tx.clone()
}
fn handle_incoming_hello(control_struct: ControlStruct) {
let destination_ip = control_struct.src_overlay_ip;
control_struct.reply(ControlPacket::new_ihu(IHU_INTERVAL, destination_ip));
pub fn node_tun_addr(&self) -> IpAddr {
IpAddr::V4(self.inner.read().unwrap().node_tun.address().unwrap())
}
fn handle_incoming_ihu(&self, control_struct: ControlStruct) {
let mut source_peer = self.get_source_peer_from_control_struct(control_struct);
// reset the IHU timer associated with the peer
source_peer.reset_ihu_timer(tokio::time::Duration::from_secs(IHU_INTERVAL as u64));
// measure time between Hello and and IHU and set the link cost
let time_diff = tokio::time::Instant::now()
.duration_since(source_peer.time_last_received_hello())
.as_millis();
source_peer.set_link_cost(time_diff as u16);
println!(
"Link cost for peer {:?} set to {}",
source_peer.overlay_ip(),
source_peer.link_cost()
);
println!("IHU timer for peer {:?} reset", source_peer.overlay_ip());
pub fn node_tun(&self) -> Arc<Tun> {
self.inner.read().unwrap().node_tun.clone()
}
fn handle_incoming_update(
router: Router,
source_table: &mut SourceTable,
routing_table: &mut RoutingTable,
update: ControlStruct,
) {
if source_table.is_feasible(&update) {
println!("incoming update is feasible");
source_table.update(&update);
// get routing table entry for the source of the update
match update.control_packet.body.tlv {
BabelTLV::Update {
plen,
interval: _,
seqno,
metric,
prefix,
router_id,
} => {
println!(
"Received update from {} for {:?} with seqno {} and metric {}",
router_id, prefix, seqno, metric
);
let source_ip = update.src_overlay_ip;
// get RouteEntry for the source of the update
let route_key = &RouteKey {
prefix,
plen,
neighbor: source_ip,
};
if let Some(route_entry) = routing_table.clone().table.get_mut(route_key) {
let should_be_selected = Router::set_incoming_update_selected(routing_table, route_key.clone(), metric);
route_entry.update(update, should_be_selected);
}
else {
let source_key = SourceKey {
prefix,
plen,
router_id,
};
let feas_dist = FeasibilityDistance(metric, seqno);
// create the source table entry
source_table.insert(source_key.clone(), feas_dist);
// now we can create the routing table entry
let route_key = RouteKey {
prefix, // prefix is peer that ANNOUNCED the route
plen,
neighbor: update.src_overlay_ip,
};
let peer = router.get_peer_by_ip(update.src_overlay_ip).unwrap();
let mut route_entry = RouteEntry::new(
source_key,
peer.clone(),
metric,
seqno,
update.src_overlay_ip,
true, // for new entries the route is always selected
router,
);
// create the routing table entry
routing_table.insert(route_key, route_entry);
}
}
_ => {}
}
} else {
println!("incoming update is NOT feasible");
// received update is not feasible
// unselect to route if it was selected
}
}
// this function shoud check if an incoming update should be selected or not
pub fn set_incoming_update_selected(routing_table: &mut RoutingTable, route_key: RouteKey, new_metric: u16) -> bool {
// first check if the routing table has an entry for that key or not, if it has no entry return true
if !routing_table.table.contains_key(&route_key) {
println!("no entry for this route key, returning true");
return true
} else {
let route_entry = routing_table.table.get_mut(&route_key).unwrap();
println!("current metric: {}, new metric: {}", route_entry.metric, new_metric);
if route_entry.metric < new_metric {
return false
}
return true
}
pub fn router_seqno(&self) -> u16 {
self.inner.read().unwrap().router_seqno
}
pub fn add_directly_connected_peer(&self, peer: Peer) {
self.peer_interfaces.lock().unwrap().push(peer);
pub fn increment_router_seqno(&self) {
self.inner.write().unwrap().router_seqno += 1;
}
pub fn get_node_tun_address(&self) -> IpAddr {
self.node_tun.address().unwrap().into()
pub fn peer_interfaces(&self) -> Vec<Peer> {
self.inner.read().unwrap().peer_interfaces.clone()
}
pub fn get_peer_interfaces(&self) -> Vec<Peer> {
self.peer_interfaces.lock().unwrap().clone()
pub fn add_peer_interface(&self, peer: Peer) {
self.inner.write().unwrap().peer_interfaces.push(peer);
}
pub fn get_peer_by_ip(&self, peer_ip: IpAddr) -> Option<Peer> {
let peers = self.get_peer_interfaces();
pub fn remove_peer_interface(&self, peer: Peer) {
let mut peer_interfaces = self.inner.write().unwrap().peer_interfaces.clone();
peer_interfaces.retain(|p| p != &peer);
self.inner.write().unwrap().peer_interfaces = peer_interfaces;
}
// pub fn static_routes(&self) -> Vec<StaticRoute> {
// self.inner.read().unwrap().static_routes.clone()
// }
pub fn peer_by_ip(&self, peer_ip: IpAddr) -> Option<Peer> {
let peers = self.peer_interfaces();
let matching_peer = peers.iter().find(|peer| peer.overlay_ip() == peer_ip);
matching_peer.map(Clone::clone)
}
fn get_source_peer_from_control_struct(&self, control_struct: ControlStruct) -> Peer {
let source = control_struct.src_overlay_ip;
pub fn source_peer_from_control_struct(&self, control_struct: ControlStruct) -> Option<Peer> {
let peers = self.peer_interfaces();
let matching_peer = peers
.iter()
.find(|peer| peer.overlay_ip() == control_struct.src_overlay_ip);
self.get_peer_by_ip(source).unwrap()
}
pub fn print_routes(&self) {
let routing_table = self.routing_table.lock().unwrap();
for route in routing_table.table.iter() {
println!("Route key: {:?}", route.0);
println!(
"Route: {:?}/{:?} (with next-hop: {:?}, metric: {}, selected: {})",
route.0.prefix, route.0.plen, route.1.next_hop, route.1.metric, route.1.selected
);
println!("As advertised by: {:?}", route.1.source.router_id);
}
}
// loop over the directly connected peers and create routing table entries for them
// this is done by looking at the currently set link cost. as the cost gets initialized to 65535, we can use this to check if
// the link cost has been set to a lower value, indicating that the peer is reachable and a routing table entry exits already
//pub async fn initialize_peer_route_entries(self) {
//// we wait for 10 seconds before we start initializing the routing table entries
//// possible optimization: only run this when necassary (e.g. when a new peer is added)
//loop {
//tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
//for peer in self.peer_interfaces.lock().unwrap().iter_mut() {
//// we check for the u16::MAX - 1 value, as this is the value that the link cost is initialized to
////if peer.link_cost() == u16::MAX - 1 {
//// before we can create a routing table entry, we need to create a source table entry
//let source_key = SourceKey {
//prefix: peer.overlay_ip(),
//plen: 32, // we set the prefix length to 32 for now, this means that each peer is a /32 network (so only route to the peer itself)
//router_id: 0, // we set all router ids to 0 temporarily
//};
//let feas_dist = FeasibilityDistance(peer.link_cost(), 0);
//// create the source table entry
//self.source_table
//.lock()
//.unwrap()
//.insert(source_key.clone(), feas_dist);
//// now we can create the routing table entry
//let route_key = RouteKey {
//prefix: peer.overlay_ip(),
//plen: 32, // we set the prefix length to 32 for now, this means that each peer is a /32 network (so only route to the peer itself)
//neighbor: peer.overlay_ip(),
//};
//let seqno = if let Some(re) = self.routing_table.lock().unwrap().remove(&route_key)
//{
//re.seqno
//} else {
//0
//};
//let route_entry = RouteEntry {
//source: source_key,
//neighbor: peer.clone(),
//metric: peer.link_cost(),
//seqno: seqno, // we set the seqno to 0 for now
//next_hop: peer.overlay_ip(),
//selected: true, // set selected always to true for now as we have manually decided the topology to only have p2p links
//};
//// create the routing table entry
//self.routing_table
//.lock()
//.unwrap()
//.insert(route_key, route_entry);
////}
//}
//}
//}
// routing table updates are send periodically to all directly connected peers
// updates are used to advertise new routes or the retract existing routes (retracting when the metric is set to 0xFFFF)
// this function is run when the route_update timer expires
pub async fn propagate_routes(self) {
loop {
// routes are propagated every 3 secs
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
let mut router_table = self.routing_table.lock().unwrap();
let mut static_routes = self.static_routes.lock().unwrap();
let peers = self.peer_interfaces.lock().unwrap();
for route in static_routes.iter_mut() {
route.seqno += 1;
for peer in peers.iter() {
let update = ControlPacket::new_update(
route.plen,
UPDATE_INTERVAL as u16,
route.seqno,
peer.link_cost(),
route.prefix,
self.router_id,
);
if peer.send_control_packet(update).is_err() {
println!("could not send static route to peer");
}
}
}
for (key, entry) in router_table.table.iter_mut()
.filter(|(key, _)| { // Filter out the static routes
for sr in static_routes.iter() {
if key.prefix == sr.prefix && key.plen == sr.plen {
return false;
}
}
true
}) {
entry.seqno += 1;
for peer in peers.iter() {
let link_cost = peer.link_cost();
// DEBUG purposes
if entry.metric > u16::MAX - 1 - link_cost {
println!("SENDING UPDATE WITH METRIC: {}", u16::MAX - 1);
} else {
println!("SENDING UPDATE WITH METRIC: {}", entry.metric + link_cost);
}
let update = ControlPacket::new_update(
key.plen,
UPDATE_INTERVAL as u16,
entry.seqno,
if entry.metric > u16::MAX - 1 - link_cost {
u16::MAX - 1
} else {
entry.metric + link_cost
},
key.prefix,
entry.source.router_id,
);
if peer.send_control_packet(update).is_err() {
println!("route update packet dropped");
}
}
}
// FILTER VOOR SELECTED ROUTE NODIG --> we sturen nu alle routes maar eignelijk moeten we enkel de selected routes gaan propagaten
}
matching_peer.map(Clone::clone)
}
pub fn select_best_route(&self, dest_ip: IpAddr) -> Option<RouteEntry> {
let inner = self.inner.read().unwrap();
// first look in the routing table for all routekeys where prefix == dest_ip
let routing_table = self.routing_table.lock().unwrap();
let routing_table = &inner.routing_table;
let mut matching_routes: Vec<&RouteEntry> = Vec::new();
for route in routing_table.table.iter() {
@@ -501,4 +176,318 @@ impl Router {
None => None,
}
}
pub fn add_static_route(&self, static_route: StaticRoute) {
let mut static_routes = self.inner.write().unwrap().static_routes.clone();
static_routes.push(static_route);
self.inner.write().unwrap().static_routes = static_routes;
}
pub fn print_routes(&self) {
let inner = self.inner.read().unwrap();
let routing_table = &inner.routing_table;
for route in routing_table.table.iter() {
println!("Route key: {:?}", route.0);
println!(
"Route: {:?}/{:?} (with next-hop: {:?}, metric: {}, selected: {})",
route.0.prefix, route.0.plen, route.1.next_hop, route.1.metric, route.1.selected
);
println!("As advertised by: {:?}", route.1.source.router_id);
}
}
async fn handle_incoming_control_packet(
self,
mut router_control_rx: UnboundedReceiver<ControlStruct>,
) {
while let Some(control_struct) = router_control_rx.recv().await {
match control_struct.control_packet.body.tlv_type {
BabelTLVType::AckReq => todo!(),
BabelTLVType::Ack => todo!(),
BabelTLVType::Hello => Self::handle_incoming_hello(control_struct),
BabelTLVType::IHU => Self::handle_incoming_ihu(&self, control_struct),
BabelTLVType::NextHop => todo!(),
BabelTLVType::Update => {
//let mut source_table = self.source_table();
//let mut routing_table = self.routing_table.lock().unwrap();
Self::handle_incoming_update(&self, control_struct);
}
BabelTLVType::RouteReq => todo!(),
BabelTLVType::SeqnoReq => todo!(),
}
}
}
fn handle_incoming_hello(control_struct: ControlStruct) {
let destination_ip = control_struct.src_overlay_ip;
control_struct.reply(ControlPacket::new_ihu(IHU_INTERVAL, destination_ip));
}
fn handle_incoming_ihu(&self, control_struct: ControlStruct) {
if let Some(mut source_peer) = self.source_peer_from_control_struct(control_struct) {
// reset the IHU timer associated with the peer
source_peer.reset_ihu_timer(tokio::time::Duration::from_secs(IHU_INTERVAL as u64));
// measure time between Hello and and IHU and set the link cost
let time_diff = tokio::time::Instant::now()
.duration_since(source_peer.time_last_received_hello())
.as_millis();
source_peer.set_link_cost(time_diff as u16);
}
}
fn handle_incoming_update(&self, update: ControlStruct) {
let mut inner = self.inner.write().unwrap();
if inner.source_table.is_feasible(&update) {
println!("incoming update is feasible");
inner.source_table.update(&update);
// get routing table entry for the source of the update
match update.control_packet.body.tlv {
BabelTLV::Update {
plen,
interval: _,
seqno,
metric,
prefix,
router_id,
} => {
println!(
"Received update from {} for {:?} with seqno {} and metric {}",
router_id, prefix, seqno, metric
);
let source_ip = update.src_overlay_ip;
// get RouteEntry for the source of the update
let route_key = &RouteKey {
prefix,
plen,
neighbor: source_ip,
};
if let Some(route_entry) = inner.routing_table.clone().table.get_mut(route_key)
{
// let should_be_selected = Router::set_incoming_update_selected(
// routing_table,
// route_key.clone(),
// metric,
// );
route_entry.update(update);
} else {
let source_key = SourceKey {
prefix,
plen,
router_id,
};
let feas_dist = FeasibilityDistance(metric, seqno);
// create the source table entry
inner.source_table.insert(source_key.clone(), feas_dist);
// now we can create the routing table entry
let route_key = RouteKey {
prefix, // prefix is peer that ANNOUNCED the route
plen,
neighbor: update.src_overlay_ip,
};
let peer = self.peer_by_ip(update.src_overlay_ip).unwrap();
let mut route_entry = RouteEntry::new(
source_key,
peer.clone(),
metric,
seqno,
update.src_overlay_ip,
true, // for new entries the route is always selected
);
// create the routing table entry
inner.routing_table.insert(route_key, route_entry);
}
}
_ => {}
}
} else {
println!("incoming update is NOT feasible");
// received update is not feasible
// unselect to route if it was selected
}
}
// fn set_incoming_update_selected(
// routing_table: &mut RoutingTable,
// route_key: RouteKey,
// new_metric: u16,
// ) -> bool {
// // first check if the routing table has an entry for that key or not, if it has no entry return true
// if !routing_table.table.contains_key(&route_key) {
// println!("no entry for this route key, returning true");
// return true;
// } else {
// let route_entry = routing_table.table.get_mut(&route_key).unwrap();
// println!(
// "current metric: {}, new metric: {}",
// route_entry.metric, new_metric
// );
// if route_entry.metric < new_metric {
// return false;
// }
// return true;
// }
// }
async fn handle_incoming_data_packet(self, mut router_data_rx: UnboundedReceiver<DataPacket>) {
// If destination IP of packet is same as TUN interface IP, send to TUN interface
// If destination IP of packet is not same as TUN interface IP, send to peer with matching overlay IP
let node_tun_addr = self.node_tun_addr();
loop {
while let Some(data_packet) = router_data_rx.recv().await {
match data_packet.dest_ip {
x if x == node_tun_addr => {
match self.node_tun().send(&data_packet.raw_data).await {
Ok(_) => {}
Err(e) => {
eprintln!("Error sending data packet to TUN interface: {:?}", e)
}
}
}
_ => {
let best_route = self.select_best_route(IpAddr::V4(data_packet.dest_ip));
let peer = self.peer_by_ip(best_route.unwrap().next_hop).unwrap();
if let Err(e) = peer.send_data_packet(data_packet) {
eprintln!("Error sending data packet to peer: {:?}", e);
}
}
}
}
}
}
// routing table updates are send periodically to all directly connected peers
// updates are used to advertise new routes or the retract existing routes (retracting when the metric is set to 0xFFFF)
// this function is run when the route_update timer expires
pub async fn propagate_routes(self) {
loop {
// routes are propagated every 3 secs
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
let mut inner = self.inner.write().unwrap();
let peers = inner.peer_interfaces.clone();
for route in inner.static_routes.iter_mut() {
route.seqno += 1;
for peer in peers.iter() {
let update = ControlPacket::new_update(
route.plen,
UPDATE_INTERVAL as u16,
route.seqno,
peer.link_cost(),
route.prefix,
self.router_id(),
);
if peer.send_control_packet(update).is_err() {
println!("could not send static route to peer");
}
}
}
let static_routes = inner.static_routes.clone();
for (key, entry) in inner.routing_table.table.iter_mut().filter(|(key, _)| {
// Filter out the static routes
for sr in static_routes.iter() {
if key.prefix == sr.prefix && key.plen == sr.plen {
return false;
}
}
true
}) {
entry.seqno += 1;
for peer in peers.iter() {
let link_cost = peer.link_cost();
// DEBUG purposes
if entry.metric > u16::MAX - 1 - link_cost {
println!("SENDING UPDATE WITH METRIC: {}", u16::MAX - 1);
} else {
println!("SENDING UPDATE WITH METRIC: {}", entry.metric + link_cost);
}
let update = ControlPacket::new_update(
key.plen,
UPDATE_INTERVAL as u16,
entry.seqno,
if entry.metric > u16::MAX - 1 - link_cost {
u16::MAX - 1
} else {
entry.metric + link_cost
},
key.prefix,
entry.source.router_id,
);
if peer.send_control_packet(update).is_err() {
println!("route update packet dropped");
}
}
}
// FILTER VOOR SELECTED ROUTE NODIG --> we sturen nu alle routes maar eignelijk moeten we enkel de selected routes gaan propagaten
}
}
async fn start_periodic_hello_sender(self) {
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(HELLO_INTERVAL as u64)).await;
for peer in self.peer_interfaces().iter_mut() {
let hello = ControlPacket::new_hello(peer, HELLO_INTERVAL);
peer.set_time_last_received_hello(tokio::time::Instant::now());
if let Err(error) = peer.send_control_packet(hello) {
eprintln!("Error sending hello to peer: {}", error);
}
}
}
}
}
pub struct RouterInner {
pub router_id: u64,
peer_interfaces: Vec<Peer>,
router_control_tx: UnboundedSender<ControlStruct>,
router_data_tx: UnboundedSender<DataPacket>,
node_tun: Arc<Tun>,
routing_table: RoutingTable,
source_table: SourceTable,
router_seqno: u16,
static_routes: Vec<StaticRoute>,
}
impl RouterInner {
pub fn new(
node_tun: Arc<Tun>,
static_routes: Vec<StaticRoute>,
router_data_tx: UnboundedSender<DataPacket>,
router_control_tx: UnboundedSender<ControlStruct>,
) -> Result<Self, Box<dyn Error>> {
let router_inner = RouterInner {
router_id: rand::thread_rng().gen(),
peer_interfaces: Vec::new(),
router_control_tx,
router_data_tx,
node_tun: node_tun,
routing_table: RoutingTable::new(),
source_table: SourceTable::new(),
router_seqno: 0,
static_routes: static_routes,
};
Ok(router_inner)
}
}
+1 -3
View File
@@ -34,7 +34,6 @@ impl RouteEntry {
seqno: u16,
next_hop: IpAddr,
selected: bool,
router: Router,
) -> Self {
Self {
source,
@@ -46,13 +45,12 @@ impl RouteEntry {
}
}
pub fn update(&mut self, update: ControlStruct, should_be_selected: bool) {
pub fn update(&mut self, update: ControlStruct) {
// the update is assumed to be feasible here
match update.control_packet.body.tlv {
BabelTLV::Update { seqno, metric, .. } => {
self.metric = metric;
self.seqno = seqno;
self.selected = should_be_selected;
}
_ => {
panic!("Received update with invalid TLV");
+2
View File
@@ -13,6 +13,8 @@ pub struct SourceKey {
pub struct FeasibilityDistance(pub u16, pub u16); // (metric, seqno)
// Store (prefix, plen, router_id) -> feasibility distance mapping
#[derive(Debug)]
pub struct SourceTable {
pub table: HashMap<SourceKey, FeasibilityDistance>,
}