mirror of
https://github.com/threefoldtech/mycelium.git
synced 2026-05-30 20:34:20 +00:00
first commit
This commit is contained in:
@@ -0,0 +1,2 @@
|
||||
/target
|
||||
nodeconfig.toml
|
||||
Vendored
+3
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"rust-analyzer.showUnlinkedFileNotification": false
|
||||
}
|
||||
Generated
+1032
File diff suppressed because it is too large
Load Diff
+18
@@ -0,0 +1,18 @@
|
||||
[package]
|
||||
name = "masterproef_v2"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
tokio = { version = "1.26.0", features = ["full"] }
|
||||
tokio-tun = "0.7.0"
|
||||
tokio-util = { version = "0.7.7", features = ["codec"] }
|
||||
clap = { version = "4.1.8", features = ["derive"] }
|
||||
rtnetlink = "0.12.0"
|
||||
futures = "0.3.27"
|
||||
toml = "0.7.3"
|
||||
serde = { version = "1.0.158", features = ["derive"] }
|
||||
rand = "0.8.5"
|
||||
bytes = "1.4.0"
|
||||
+95
@@ -0,0 +1,95 @@
|
||||
use std::{
|
||||
error::Error,
|
||||
net::{Ipv4Addr},
|
||||
};
|
||||
|
||||
use clap::Parser;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::net::TcpStream;
|
||||
|
||||
|
||||
mod node_setup;
|
||||
mod peer;
|
||||
mod peer_manager;
|
||||
|
||||
use peer::Peer;
|
||||
use peer_manager::PeerManager;
|
||||
|
||||
#[derive(Parser)]
|
||||
struct Cli {
|
||||
#[arg(short = 'a', long = "tun-addr")]
|
||||
tun_addr: Ipv4Addr,
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn Error>> {
|
||||
|
||||
let cli = Cli::parse();
|
||||
|
||||
// Create TUN interface and add static route
|
||||
let node_tun = match node_setup::setup_node(cli.tun_addr).await {
|
||||
Ok(tun)=> {
|
||||
println!("Node setup complete");
|
||||
tun
|
||||
},
|
||||
Err(e) => {
|
||||
panic!("Error setting up node: {}", e);
|
||||
}
|
||||
};
|
||||
|
||||
// Create an unbounded channel for this node
|
||||
let (to_tun, mut from_peers) = mpsc::unbounded_channel::<Vec<u8>>();
|
||||
|
||||
// Create the PeerManager: an interface to all peers this node is connected to
|
||||
// Each node should include itself as a peer
|
||||
// Additional static peers are obtained through the nodeconfig.toml file
|
||||
let myself = Peer{id: "0".to_string(), to_peer: to_tun.clone()};
|
||||
let peer_manager = PeerManager::new(myself);
|
||||
tokio::spawn(async move {
|
||||
peer_manager.get_peers_from_config(to_tun.clone()).await; // --> here we create peer by TcpStream connect
|
||||
|
||||
// listen for inbound request --> "to created the reverse peer object" --> here we reverse create peer be listener.accept'ing
|
||||
tokio::spawn(async move {
|
||||
match TcpListener::bind("[::]:9651").await {
|
||||
Ok(listener) => {
|
||||
// loop to accept the inbound requests
|
||||
loop {
|
||||
match listener.accept().await {
|
||||
Ok((stream, _)) => {
|
||||
println!("Got inbound request from: {}", stream.peer_addr().unwrap().to_string());
|
||||
// "reverse peer add"
|
||||
let peer_id = stream.peer_addr().unwrap().to_string();
|
||||
match Peer::new(peer_id, to_tun.clone(), stream) {
|
||||
Ok(new_peer) => {
|
||||
let mut known_peers = peer_manager.known_peers.lock().unwrap();
|
||||
known_peers.push(new_peer);
|
||||
},
|
||||
Err(e) => {
|
||||
eprintln!("Error creating 'reverse' peer: {}", e);
|
||||
}
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
eprintln!("Error accepting TCP listener: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
eprintln!("Error binding TCP listener: {}", e);
|
||||
}
|
||||
}
|
||||
})
|
||||
});
|
||||
|
||||
// TODO: Loop to read the 'from_peers' receiver and foward it toward the TUN interface
|
||||
|
||||
|
||||
// TODO: Loop to read from the TUN interface and forward it towards to correct destination peer (by selecting the correct to_peer sender)
|
||||
|
||||
|
||||
|
||||
tokio::time::sleep(std::time::Duration::from_secs(60 * 60 * 24)).await;
|
||||
Ok(())
|
||||
}
|
||||
@@ -0,0 +1,84 @@
|
||||
use tokio_tun::{Tun, TunBuilder};
|
||||
use std::{
|
||||
sync::Arc,
|
||||
net::Ipv4Addr,
|
||||
error::Error,
|
||||
};
|
||||
use rtnetlink::Handle;
|
||||
use futures::stream::TryStreamExt;
|
||||
|
||||
pub const TUN_NAME: &str = "tun0";
|
||||
pub const TUN_ROUTE_DEST: Ipv4Addr = Ipv4Addr::new(10, 0, 0, 0);
|
||||
pub const TUN_ROUTE_PREFIX: u8 = 24;
|
||||
|
||||
// Create a TUN interface
|
||||
pub fn create_tun_interface(int_addr: Ipv4Addr) -> Result<Arc<Tun>, Box<dyn Error>> {
|
||||
let tun = TunBuilder::new()
|
||||
.name(TUN_NAME)
|
||||
.tap(false)
|
||||
.mtu(1420)
|
||||
.packet_info(false)
|
||||
.address(int_addr)
|
||||
.broadcast(Ipv4Addr::new(255, 255, 255, 0))
|
||||
.up()
|
||||
.try_build()?;
|
||||
|
||||
Ok(Arc::new(tun))
|
||||
}
|
||||
|
||||
// Add a route to the TUN interface
|
||||
pub async fn add_route(handle: Handle) -> Result<(), Box<dyn Error>> {
|
||||
let mut link_request = handle
|
||||
.link()
|
||||
.get()
|
||||
.match_name(String::from(TUN_NAME))
|
||||
.execute();
|
||||
|
||||
let link_idx = if let Some(link) = link_request.try_next().await? {
|
||||
link.header.index
|
||||
} else {
|
||||
eprintln!("link not found");
|
||||
panic!("link not found");
|
||||
};
|
||||
|
||||
let route = handle.route();
|
||||
route
|
||||
.add()
|
||||
.v4()
|
||||
.destination_prefix(TUN_ROUTE_DEST, TUN_ROUTE_PREFIX)
|
||||
.output_interface(link_idx)
|
||||
.execute()
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn setup_node(tun_addr: Ipv4Addr) -> Result<Arc<Tun>, Box<dyn Error>> {
|
||||
match create_tun_interface(tun_addr) {
|
||||
Ok(tun) => {
|
||||
println!("Interface '{}' ({}) created", TUN_NAME, tun_addr);
|
||||
match rtnetlink::new_connection() {
|
||||
Ok((conn, handle, _)) => {
|
||||
tokio::spawn(conn);
|
||||
match add_route(handle.clone()).await {
|
||||
Ok(_) => {
|
||||
println!("Static route created");
|
||||
},
|
||||
Err(e) => {
|
||||
panic!("Error adding route: {}", e);
|
||||
}
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
panic!("Error creating new handle: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Ok(tun)
|
||||
},
|
||||
Err(e) => {
|
||||
panic!("Error creating TUN: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
+63
@@ -0,0 +1,63 @@
|
||||
use tokio::{
|
||||
io::{AsyncReadExt, AsyncWriteExt},
|
||||
net::TcpStream,
|
||||
sync::{mpsc, Mutex},
|
||||
select,
|
||||
};
|
||||
use std::{error::Error, sync::Arc};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Peer {
|
||||
pub id: String,
|
||||
pub to_peer: mpsc::UnboundedSender<Vec<u8>>,
|
||||
}
|
||||
|
||||
impl Peer {
|
||||
pub fn new(id: String, to_tun: mpsc::UnboundedSender<Vec<u8>>, mut stream: TcpStream) -> Result<Self, Box<dyn Error>> {
|
||||
|
||||
// Create channel for each peer
|
||||
let (to_peer, mut from_tun) = mpsc::unbounded_channel::<Vec<u8>>();
|
||||
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
let link_mtu = 1500;
|
||||
let mut read_buf = vec![0u8; link_mtu];
|
||||
|
||||
select! {
|
||||
// Read from TCP stream, write to 'to_tun'
|
||||
read_result = stream.read(&mut read_buf) => {
|
||||
match read_result {
|
||||
Ok(n) => {
|
||||
// Truncate buffer, removing any extra bytes
|
||||
read_buf.truncate(n);
|
||||
|
||||
// For testing purposes
|
||||
println!("Received bytes from peer: {:?}", read_buf);
|
||||
|
||||
// Send to TUN interface
|
||||
if let Err(error) = to_tun.send(read_buf.clone()) {
|
||||
eprintln!("Error sending to TUN: {}", error);
|
||||
}
|
||||
},
|
||||
Err(error) => {
|
||||
eprintln!("Error reading from TCP stream: {}", error);
|
||||
}
|
||||
}
|
||||
}
|
||||
// Read from 'from_tun' receiver, write to TCP stream
|
||||
Some(packet) = from_tun.recv() => {
|
||||
if let Err(error) = stream.write_all(&packet).await {
|
||||
eprintln!("Error writing to TCP stream: {}", error);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(Self {
|
||||
id,
|
||||
to_peer,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,67 @@
|
||||
use std::collections::HashMap;
|
||||
use tokio::{sync::mpsc::UnboundedSender, net::TcpStream};
|
||||
|
||||
use crate::peer::{Peer, self};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use serde::Deserialize;
|
||||
use std::net::SocketAddr;
|
||||
|
||||
pub const NODE_CONFIG_FILE_PATH: &str = "nodeconfig.toml";
|
||||
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct PeersConfig {
|
||||
peers: Vec<SocketAddr>,
|
||||
}
|
||||
|
||||
pub struct PeerManager {
|
||||
pub known_peers: Mutex<Vec<Peer>>,
|
||||
}
|
||||
|
||||
impl PeerManager {
|
||||
pub fn new(myself_peer: Peer) -> Self {
|
||||
let mut known_peers: Vec<Peer> = Vec::new();
|
||||
// add itself
|
||||
known_peers.push(myself_peer);
|
||||
|
||||
Self {
|
||||
known_peers: Mutex::new(known_peers),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_peers_from_config(&self, to_tun: UnboundedSender<Vec<u8>>) {
|
||||
// Read from the nodeconfig.toml file
|
||||
match std::fs::read_to_string(NODE_CONFIG_FILE_PATH) {
|
||||
Ok(file_content) => {
|
||||
// Create a PeersConfig based on the file content
|
||||
let config: PeersConfig = toml::from_str(&file_content).unwrap();
|
||||
for peer_addr in config.peers {
|
||||
match TcpStream::connect(peer_addr).await {
|
||||
Ok(peer_stream) => {
|
||||
println!("TCP stream connected: {}", peer_addr);
|
||||
// Create peer instance
|
||||
let peer_id = peer_addr.to_string();
|
||||
match Peer::new(peer_id, to_tun.clone(), peer_stream) {
|
||||
Ok(new_peer) => {
|
||||
// Add peer to known_peers
|
||||
let mut known_peers = self.known_peers.lock().unwrap();
|
||||
known_peers.push(new_peer);
|
||||
},
|
||||
Err(e) => {
|
||||
eprintln!("Error creating peer: {}", e);
|
||||
}
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
eprintln!("Error connecting to TCP stream for {}: {}", peer_addr.to_string(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
},
|
||||
Err(e) => {
|
||||
eprintln!("Error reading nodeconfig.toml file: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user