mirror of
https://github.com/threefoldtech/mycelium.git
synced 2026-06-04 14:51:39 +00:00
Stream for data packets in router
Signed-off-by: Lee Smet <lee.smet@hotmail.com>
This commit is contained in:
Generated
+2
@@ -975,6 +975,7 @@ dependencies = [
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tokio-tun",
|
||||
"tokio-util",
|
||||
"toml",
|
||||
@@ -1679,6 +1680,7 @@ dependencies = [
|
||||
"futures-core",
|
||||
"pin-project-lite",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
||||
@@ -34,3 +34,4 @@ opentelemetry = { version = "0.19.0", features = [
|
||||
] }
|
||||
faster-hex = "0.8.0"
|
||||
console-subscriber = "0.1.10"
|
||||
tokio-stream = { version = "0.1.14", features = ["sync"] }
|
||||
|
||||
+51
-44
@@ -8,6 +8,7 @@ use crate::{
|
||||
sequence_number::SeqNo,
|
||||
source_table::{FeasibilityDistance, SourceKey, SourceTable},
|
||||
};
|
||||
use futures::StreamExt;
|
||||
use log::{debug, error, info, trace, warn};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
@@ -17,6 +18,7 @@ use std::{
|
||||
sync::{Arc, RwLock},
|
||||
};
|
||||
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
|
||||
use tokio_stream::wrappers::UnboundedReceiverStream;
|
||||
|
||||
const HELLO_INTERVAL: u16 = 4;
|
||||
const IHU_INTERVAL: u16 = HELLO_INTERVAL * 3;
|
||||
@@ -674,61 +676,66 @@ impl Router {
|
||||
false
|
||||
}
|
||||
|
||||
async fn handle_incoming_data_packet(self, mut router_data_rx: UnboundedReceiver<DataPacket>) {
|
||||
async fn handle_incoming_data_packet(self, router_data_rx: UnboundedReceiver<DataPacket>) {
|
||||
// If destination IP of packet is same as TUN interface IP, send to TUN interface
|
||||
// If destination IP of packet is not same as TUN interface IP, send to peer with matching overlay IP
|
||||
let node_tun = self.node_tun();
|
||||
let node_tun_addr = self.node_tun_addr();
|
||||
loop {
|
||||
while let Some(data_packet) = router_data_rx.recv().await {
|
||||
// let node_tun = self.node_tun();
|
||||
// let node_tun_addr = self.node_tun_addr();
|
||||
UnboundedReceiverStream::new(router_data_rx)
|
||||
.for_each(|data_packet| {
|
||||
let router = self.clone();
|
||||
let node_tun = self.node_tun();
|
||||
let node_tun_addr = self.node_tun_addr();
|
||||
|
||||
trace!("Incoming data packet, with dest_ip: {} (side node, this node's tun addr is: {})", data_packet.dest_ip, node_tun_addr);
|
||||
|
||||
if data_packet.dest_ip == node_tun_addr {
|
||||
// decrypt & send to TUN interface
|
||||
let pubkey_sender = data_packet.pubkey;
|
||||
let shared_secret = match self.get_shared_secret_by_pubkey(&pubkey_sender) {
|
||||
Some(ss) => ss,
|
||||
None => self.node_secret_key().shared_secret(&pubkey_sender),
|
||||
};
|
||||
let decrypted_raw_data = match shared_secret.decrypt(&data_packet.raw_data) {
|
||||
Ok(data) => data,
|
||||
Err(_) => {
|
||||
log::debug!("Dropping data packet with invalid encrypted content");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
match node_tun.send(decrypted_raw_data) {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
error!("Error sending data packet to TUN interface: {:?}", e)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// send to peer with matching overlay IP
|
||||
let best_route = self.select_best_route(IpAddr::V6(data_packet.dest_ip));
|
||||
match best_route {
|
||||
Some(route_entry) => {
|
||||
let peer = self.peer_by_ip(route_entry.next_hop());
|
||||
// drop the packet if the peer is couldn't be found
|
||||
match peer {
|
||||
Some(peer) => {
|
||||
if let Err(e) = peer.send_data_packet(data_packet) {
|
||||
error!("Error sending data packet to peer: {:?}", e);
|
||||
async move {
|
||||
if data_packet.dest_ip == node_tun_addr {
|
||||
// decrypt & send to TUN interface
|
||||
let pubkey_sender = data_packet.pubkey;
|
||||
let shared_secret = match router.get_shared_secret_by_pubkey(&pubkey_sender) {
|
||||
Some(ss) => ss,
|
||||
None => router.node_secret_key().shared_secret(&pubkey_sender),
|
||||
};
|
||||
let decrypted_raw_data = match shared_secret.decrypt(&data_packet.raw_data) {
|
||||
Ok(data) => data,
|
||||
Err(_) => {
|
||||
log::debug!("Dropping data packet with invalid encrypted content");
|
||||
return;
|
||||
}
|
||||
};
|
||||
match node_tun.send(decrypted_raw_data) {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
error!("Error sending data packet to TUN interface: {:?}", e)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// send to peer with matching overlay IP
|
||||
let best_route = router.select_best_route(IpAddr::V6(data_packet.dest_ip));
|
||||
match best_route {
|
||||
Some(route_entry) => {
|
||||
let peer = router.peer_by_ip(route_entry.next_hop());
|
||||
// drop the packet if the peer is couldn't be found
|
||||
match peer {
|
||||
Some(peer) => {
|
||||
if let Err(e) = peer.send_data_packet(data_packet) {
|
||||
error!("Error sending data packet to peer: {:?}", e);
|
||||
}
|
||||
}
|
||||
None => {
|
||||
// route but no peer
|
||||
warn!("Dropping data packet, no peer found");
|
||||
}
|
||||
}
|
||||
}
|
||||
None => {
|
||||
// route but no peer
|
||||
warn!("Dropping data packet, no peer found");
|
||||
trace!("Error sending data packet, no route found");
|
||||
}
|
||||
}
|
||||
}
|
||||
None => {
|
||||
trace!("Error sending data packet, no route found");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}).await;
|
||||
}
|
||||
|
||||
pub fn select_best_route(&self, dest_ip: IpAddr) -> Option<RouteEntry> {
|
||||
|
||||
Reference in New Issue
Block a user