From 7f3fe758408dfd61899791f88e67c1bc2330665d Mon Sep 17 00:00:00 2001 From: Lee Smet Date: Fri, 25 Aug 2023 11:59:24 +0200 Subject: [PATCH] Fix Tun Stream implementation Signed-off-by: Lee Smet --- src/tun/linux.rs | 48 +++++++++++++++++++++++++++++++----------------- 1 file changed, 31 insertions(+), 17 deletions(-) diff --git a/src/tun/linux.rs b/src/tun/linux.rs index d4ffa88..672181c 100644 --- a/src/tun/linux.rs +++ b/src/tun/linux.rs @@ -4,12 +4,13 @@ use std::{ io, mem, net::{IpAddr, Ipv4Addr, Ipv6Addr}, pin::Pin, - sync::Arc, task::Poll, }; use futures::{Future, Sink, Stream, TryStreamExt}; +use log::{debug, error, trace}; use rtnetlink::Handle; +use tokio::io::{AsyncRead, ReadBuf}; use tokio_tun::{Tun, TunBuilder}; use super::IpPacket; @@ -19,13 +20,13 @@ const LINK_MTU: i32 = 1420; /// A sender half of a tun interface. pub struct TxHalf { - inner: Arc, + inner: Tun, state: TxState, } /// A receiver half of a tun interface. pub struct RxHalf { - inner: Arc, + inner: Tun, buffer: Vec, mtu: usize, } @@ -48,7 +49,7 @@ pub async fn new( route_address: IpAddr, route_prefix_len: u8, ) -> Result<(RxHalf, TxHalf), Box> { - let tun = Arc::new(create_tun_interface(name)?); + let (tun_rx, tun_tx) = create_tun_interface(name)?; let (conn, handle, _) = rtnetlink::new_connection()?; let netlink_task_handle = tokio::spawn(conn); @@ -66,28 +67,28 @@ pub async fn new( Ok(( RxHalf { - inner: tun.clone(), + inner: tun_rx, buffer: vec![0; LINK_MTU as usize], mtu: LINK_MTU as usize, }, TxHalf { - inner: tun, + inner: tun_tx, state: TxState::Idle, }, )) } /// Create a new TUN interface -fn create_tun_interface(name: &str) -> Result> { - let tun = TunBuilder::new() +fn create_tun_interface(name: &str) -> Result<(Tun, Tun), Box> { + let mut tun = TunBuilder::new() .name(name) .tap(false) .mtu(LINK_MTU) .packet_info(false) .up() - .try_build()?; + .try_build_mq(2)?; - Ok(tun) + Ok((tun.remove(1), tun.remove(0))) } /// Retrieve the link index of an interface with the given name @@ -244,20 +245,33 @@ impl Stream for RxHalf { self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { + trace!("Poll tun receiving half, buffer size {}", self.buffer.len()); let RxHalf { - inner, + ref mut inner, ref mut buffer, mtu, } = self.get_mut(); + // Assign poll result to a temporary variable to appease the borrow checker. Failure to do // so will result in a compilation error because there are 2 borrows on buffer. - let tmp = std::pin::pin!(inner.recv(buffer)).poll(cx); + let mut buf = ReadBuf::new(buffer); + let old_len = buf.filled().len(); + let tmp = Pin::new(inner).poll_read(cx, &mut buf)?; match tmp { - Poll::Pending => Poll::Pending, - Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))), - Poll::Ready(Ok(0)) => Poll::Ready(None), - Poll::Ready(Ok(n)) => { - buffer.truncate(n); + Poll::Pending => { + trace!("Tun read is Poll::pending"); + Poll::Pending + } + Poll::Ready(()) => { + let buf_len = buf.filled().len(); + let read = buf_len - old_len; + trace!("Read {read} bytes packet from tun in poll method."); + if read == 0 { + // EOF reached + debug!("Stream read 0 bytes, closing"); + return Poll::Ready(None); + } + buffer.truncate(read); // Create new buffer. let mut new_buffer = vec![0; *mtu]; mem::swap(buffer, &mut new_buffer);