mirror of
https://github.com/threefoldtech/mycelium.git
synced 2026-03-29 07:39:51 +00:00
Return error in API when binding local socks5 socket fails
Signed-off-by: Lee Smet <lee.smet@hotmail.com>
This commit is contained in:
@@ -24,6 +24,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
a topic filter is set.
|
||||
- When we stop probing for socks proxies, replace the cancellation token so future
|
||||
probes behave as expected.
|
||||
- Properly return an error from the API when trying to connect a proxy if we can't
|
||||
bind the local socket.
|
||||
|
||||
### Deprecated
|
||||
|
||||
|
||||
@@ -6,7 +6,7 @@ use std::time::Duration;
|
||||
|
||||
use crate::cdn::Cdn;
|
||||
use crate::packet_queue::IncomingPacketQueue;
|
||||
use crate::proxy::{ConnectionError, Proxy};
|
||||
use crate::proxy::{Proxy, ProxyError};
|
||||
use crate::tun::TunConfig;
|
||||
use bytes::BytesMut;
|
||||
use data::DataPlane;
|
||||
@@ -397,7 +397,7 @@ where
|
||||
pub fn connect_proxy(
|
||||
&self,
|
||||
remote: Option<SocketAddr>,
|
||||
) -> impl Future<Output = Result<SocketAddr, ConnectionError>> + Send {
|
||||
) -> impl Future<Output = Result<SocketAddr, ProxyError>> + Send {
|
||||
let proxy = self.proxy.clone();
|
||||
async move { proxy.connect(remote).await }
|
||||
}
|
||||
|
||||
@@ -229,7 +229,7 @@ where
|
||||
}
|
||||
|
||||
/// Connect to a remote Socks5 proxy. If a proxy address is given, connect to that one. If not, connect to the best (fastest) known proxy.
|
||||
pub async fn connect(&self, remote: Option<SocketAddr>) -> Result<SocketAddr, ConnectionError> {
|
||||
pub async fn connect(&self, remote: Option<SocketAddr>) -> Result<SocketAddr, ProxyError> {
|
||||
let target = match remote {
|
||||
Some(remote) => remote,
|
||||
None => {
|
||||
@@ -267,7 +267,7 @@ where
|
||||
|
||||
let target: Option<SocketAddr> = futs.filter_map(|o| o).next().await;
|
||||
if target.is_none() {
|
||||
return Err(ConnectionError { _private: () });
|
||||
return Err(ConnectionError { _private: () }.into());
|
||||
}
|
||||
// Safe since we just checked the None case above
|
||||
target.unwrap()
|
||||
@@ -281,7 +281,7 @@ where
|
||||
.lock()
|
||||
.expect("Can lock chosen remote; qed") = Some(target);
|
||||
|
||||
self.start_proxy();
|
||||
self.start_proxy().await?;
|
||||
|
||||
Ok(target)
|
||||
}
|
||||
@@ -300,30 +300,34 @@ where
|
||||
|
||||
/// Starts a background task for proxying connections.
|
||||
/// This spawns a listener, and proxies all connections to the chosen target.
|
||||
fn start_proxy(&self) {
|
||||
async fn start_proxy(&self) -> Result<(), ProxyError> {
|
||||
let target = *self
|
||||
.chosen_remote
|
||||
.lock()
|
||||
.expect("Can lock chosen remote; qed");
|
||||
if target.is_none() {
|
||||
warn!("Can't start proxy if target is none, this should not happen");
|
||||
return;
|
||||
return Ok(());
|
||||
}
|
||||
let target = target.unwrap();
|
||||
info!(%target, "Starting Socks5 proxy forwarding");
|
||||
// First cancel the old token, then set a new token
|
||||
let mut old_token = self.proxy_token.lock().expect("Can lock proxy token; qed");
|
||||
old_token.cancel();
|
||||
let proxy_token = CancellationToken::new();
|
||||
*old_token = proxy_token.clone();
|
||||
// We scope here to drop the lock on old_token as we can't cary that over an await point
|
||||
let proxy_token = {
|
||||
let mut old_token = self.proxy_token.lock().expect("Can lock proxy token; qed");
|
||||
old_token.cancel();
|
||||
let proxy_token = CancellationToken::new();
|
||||
*old_token = proxy_token.clone();
|
||||
proxy_token
|
||||
};
|
||||
|
||||
let listener = TcpListener::bind(("::", DEFAULT_SOCKS5_PORT))
|
||||
.await
|
||||
.inspect_err(|err| {
|
||||
error!(%err, "Could not bind TCP listener");
|
||||
})?;
|
||||
|
||||
tokio::spawn(async move {
|
||||
let listener = TcpListener::bind(("::", DEFAULT_SOCKS5_PORT))
|
||||
.await
|
||||
.inspect_err(|err| {
|
||||
error!(%err, "Could not bind TCP listener");
|
||||
})?;
|
||||
|
||||
loop {
|
||||
select! {
|
||||
_ = proxy_token.cancelled() => {
|
||||
@@ -369,6 +373,39 @@ where
|
||||
|
||||
Ok(())
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum ProxyError {
|
||||
/// No valid proxies found while scanning to auto connect.
|
||||
Connection(ConnectionError),
|
||||
/// IO error while binding local Socks5 listening socket.
|
||||
IO(std::io::Error),
|
||||
}
|
||||
|
||||
impl std::fmt::Display for ProxyError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
Self::Connection(e) => e.fmt(f),
|
||||
Self::IO(e) => e.fmt(f),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for ProxyError {}
|
||||
|
||||
impl From<ConnectionError> for ProxyError {
|
||||
fn from(value: ConnectionError) -> Self {
|
||||
Self::Connection(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<std::io::Error> for ProxyError {
|
||||
fn from(value: std::io::Error) -> Self {
|
||||
Self::IO(value)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user