From d40842fa6fb02cdb53bda163e57f40b01e4dca75 Mon Sep 17 00:00:00 2001 From: Lee Smet Date: Fri, 25 Aug 2023 17:01:43 +0200 Subject: [PATCH] Stream for data packets in router Signed-off-by: Lee Smet --- Cargo.lock | 2 ++ Cargo.toml | 1 + src/router.rs | 95 +++++++++++++++++++++++++++------------------------ 3 files changed, 54 insertions(+), 44 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cc23903..a6bbe18 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -975,6 +975,7 @@ dependencies = [ "serde", "serde_json", "tokio", + "tokio-stream", "tokio-tun", "tokio-util", "toml", @@ -1679,6 +1680,7 @@ dependencies = [ "futures-core", "pin-project-lite", "tokio", + "tokio-util", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index fbeb5a9..2a30c64 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,3 +34,4 @@ opentelemetry = { version = "0.19.0", features = [ ] } faster-hex = "0.8.0" console-subscriber = "0.1.10" +tokio-stream = { version = "0.1.14", features = ["sync"] } diff --git a/src/router.rs b/src/router.rs index 506f336..76c9a69 100644 --- a/src/router.rs +++ b/src/router.rs @@ -8,6 +8,7 @@ use crate::{ sequence_number::SeqNo, source_table::{FeasibilityDistance, SourceKey, SourceTable}, }; +use futures::StreamExt; use log::{debug, error, info, trace, warn}; use std::{ collections::HashMap, @@ -17,6 +18,7 @@ use std::{ sync::{Arc, RwLock}, }; use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; +use tokio_stream::wrappers::UnboundedReceiverStream; const HELLO_INTERVAL: u16 = 4; const IHU_INTERVAL: u16 = HELLO_INTERVAL * 3; @@ -674,61 +676,66 @@ impl Router { false } - async fn handle_incoming_data_packet(self, mut router_data_rx: UnboundedReceiver) { + async fn handle_incoming_data_packet(self, router_data_rx: UnboundedReceiver) { // If destination IP of packet is same as TUN interface IP, send to TUN interface // If destination IP of packet is not same as TUN interface IP, send to peer with matching overlay IP - let node_tun = self.node_tun(); - let node_tun_addr = self.node_tun_addr(); - loop { - while let Some(data_packet) = router_data_rx.recv().await { + // let node_tun = self.node_tun(); + // let node_tun_addr = self.node_tun_addr(); + UnboundedReceiverStream::new(router_data_rx) + .for_each(|data_packet| { + let router = self.clone(); + let node_tun = self.node_tun(); + let node_tun_addr = self.node_tun_addr(); + trace!("Incoming data packet, with dest_ip: {} (side node, this node's tun addr is: {})", data_packet.dest_ip, node_tun_addr); - if data_packet.dest_ip == node_tun_addr { - // decrypt & send to TUN interface - let pubkey_sender = data_packet.pubkey; - let shared_secret = match self.get_shared_secret_by_pubkey(&pubkey_sender) { - Some(ss) => ss, - None => self.node_secret_key().shared_secret(&pubkey_sender), - }; - let decrypted_raw_data = match shared_secret.decrypt(&data_packet.raw_data) { - Ok(data) => data, - Err(_) => { - log::debug!("Dropping data packet with invalid encrypted content"); - continue; - } - }; - match node_tun.send(decrypted_raw_data) { - Ok(_) => {} - Err(e) => { - error!("Error sending data packet to TUN interface: {:?}", e) - } - } - } else { - // send to peer with matching overlay IP - let best_route = self.select_best_route(IpAddr::V6(data_packet.dest_ip)); - match best_route { - Some(route_entry) => { - let peer = self.peer_by_ip(route_entry.next_hop()); - // drop the packet if the peer is couldn't be found - match peer { - Some(peer) => { - if let Err(e) = peer.send_data_packet(data_packet) { - error!("Error sending data packet to peer: {:?}", e); + async move { + if data_packet.dest_ip == node_tun_addr { + // decrypt & send to TUN interface + let pubkey_sender = data_packet.pubkey; + let shared_secret = match router.get_shared_secret_by_pubkey(&pubkey_sender) { + Some(ss) => ss, + None => router.node_secret_key().shared_secret(&pubkey_sender), + }; + let decrypted_raw_data = match shared_secret.decrypt(&data_packet.raw_data) { + Ok(data) => data, + Err(_) => { + log::debug!("Dropping data packet with invalid encrypted content"); + return; + } + }; + match node_tun.send(decrypted_raw_data) { + Ok(_) => {} + Err(e) => { + error!("Error sending data packet to TUN interface: {:?}", e) + } + } + } else { + // send to peer with matching overlay IP + let best_route = router.select_best_route(IpAddr::V6(data_packet.dest_ip)); + match best_route { + Some(route_entry) => { + let peer = router.peer_by_ip(route_entry.next_hop()); + // drop the packet if the peer is couldn't be found + match peer { + Some(peer) => { + if let Err(e) = peer.send_data_packet(data_packet) { + error!("Error sending data packet to peer: {:?}", e); + } + } + None => { + // route but no peer + warn!("Dropping data packet, no peer found"); + } } } None => { - // route but no peer - warn!("Dropping data packet, no peer found"); + trace!("Error sending data packet, no route found"); } } } - None => { - trace!("Error sending data packet, no route found"); - } - } } - } - } + }).await; } pub fn select_best_route(&self, dest_ip: IpAddr) -> Option {