mirror of
https://github.com/threefoldtech/mycelium.git
synced 2026-06-05 23:31:38 +00:00
Fix Tun Stream implementation
Signed-off-by: Lee Smet <lee.smet@hotmail.com>
This commit is contained in:
+31
-17
@@ -4,12 +4,13 @@ use std::{
|
||||
io, mem,
|
||||
net::{IpAddr, Ipv4Addr, Ipv6Addr},
|
||||
pin::Pin,
|
||||
sync::Arc,
|
||||
task::Poll,
|
||||
};
|
||||
|
||||
use futures::{Future, Sink, Stream, TryStreamExt};
|
||||
use log::{debug, error, trace};
|
||||
use rtnetlink::Handle;
|
||||
use tokio::io::{AsyncRead, ReadBuf};
|
||||
use tokio_tun::{Tun, TunBuilder};
|
||||
|
||||
use super::IpPacket;
|
||||
@@ -19,13 +20,13 @@ const LINK_MTU: i32 = 1420;
|
||||
|
||||
/// A sender half of a tun interface.
|
||||
pub struct TxHalf {
|
||||
inner: Arc<Tun>,
|
||||
inner: Tun,
|
||||
state: TxState,
|
||||
}
|
||||
|
||||
/// A receiver half of a tun interface.
|
||||
pub struct RxHalf {
|
||||
inner: Arc<Tun>,
|
||||
inner: Tun,
|
||||
buffer: Vec<u8>,
|
||||
mtu: usize,
|
||||
}
|
||||
@@ -48,7 +49,7 @@ pub async fn new(
|
||||
route_address: IpAddr,
|
||||
route_prefix_len: u8,
|
||||
) -> Result<(RxHalf, TxHalf), Box<dyn std::error::Error>> {
|
||||
let tun = Arc::new(create_tun_interface(name)?);
|
||||
let (tun_rx, tun_tx) = create_tun_interface(name)?;
|
||||
|
||||
let (conn, handle, _) = rtnetlink::new_connection()?;
|
||||
let netlink_task_handle = tokio::spawn(conn);
|
||||
@@ -66,28 +67,28 @@ pub async fn new(
|
||||
|
||||
Ok((
|
||||
RxHalf {
|
||||
inner: tun.clone(),
|
||||
inner: tun_rx,
|
||||
buffer: vec![0; LINK_MTU as usize],
|
||||
mtu: LINK_MTU as usize,
|
||||
},
|
||||
TxHalf {
|
||||
inner: tun,
|
||||
inner: tun_tx,
|
||||
state: TxState::Idle,
|
||||
},
|
||||
))
|
||||
}
|
||||
|
||||
/// Create a new TUN interface
|
||||
fn create_tun_interface(name: &str) -> Result<Tun, Box<dyn std::error::Error>> {
|
||||
let tun = TunBuilder::new()
|
||||
fn create_tun_interface(name: &str) -> Result<(Tun, Tun), Box<dyn std::error::Error>> {
|
||||
let mut tun = TunBuilder::new()
|
||||
.name(name)
|
||||
.tap(false)
|
||||
.mtu(LINK_MTU)
|
||||
.packet_info(false)
|
||||
.up()
|
||||
.try_build()?;
|
||||
.try_build_mq(2)?;
|
||||
|
||||
Ok(tun)
|
||||
Ok((tun.remove(1), tun.remove(0)))
|
||||
}
|
||||
|
||||
/// Retrieve the link index of an interface with the given name
|
||||
@@ -244,20 +245,33 @@ impl Stream for RxHalf {
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<Option<Self::Item>> {
|
||||
trace!("Poll tun receiving half, buffer size {}", self.buffer.len());
|
||||
let RxHalf {
|
||||
inner,
|
||||
ref mut inner,
|
||||
ref mut buffer,
|
||||
mtu,
|
||||
} = self.get_mut();
|
||||
|
||||
// Assign poll result to a temporary variable to appease the borrow checker. Failure to do
|
||||
// so will result in a compilation error because there are 2 borrows on buffer.
|
||||
let tmp = std::pin::pin!(inner.recv(buffer)).poll(cx);
|
||||
let mut buf = ReadBuf::new(buffer);
|
||||
let old_len = buf.filled().len();
|
||||
let tmp = Pin::new(inner).poll_read(cx, &mut buf)?;
|
||||
match tmp {
|
||||
Poll::Pending => Poll::Pending,
|
||||
Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
|
||||
Poll::Ready(Ok(0)) => Poll::Ready(None),
|
||||
Poll::Ready(Ok(n)) => {
|
||||
buffer.truncate(n);
|
||||
Poll::Pending => {
|
||||
trace!("Tun read is Poll::pending");
|
||||
Poll::Pending
|
||||
}
|
||||
Poll::Ready(()) => {
|
||||
let buf_len = buf.filled().len();
|
||||
let read = buf_len - old_len;
|
||||
trace!("Read {read} bytes packet from tun in poll method.");
|
||||
if read == 0 {
|
||||
// EOF reached
|
||||
debug!("Stream read 0 bytes, closing");
|
||||
return Poll::Ready(None);
|
||||
}
|
||||
buffer.truncate(read);
|
||||
// Create new buffer.
|
||||
let mut new_buffer = vec![0; *mtu];
|
||||
mem::swap(buffer, &mut new_buffer);
|
||||
|
||||
Reference in New Issue
Block a user