mirror of
https://github.com/threefoldtech/mycelium.git
synced 2026-05-12 10:34:49 +00:00
Apply formatting to src/proxy.rs
Signed-off-by: Lee Smet <lee.smet@hotmail.com>
This commit is contained in:
+44
-23
@@ -92,7 +92,11 @@ where
|
||||
info!("Start Socks5 proxy probing");
|
||||
let router = self.router.clone();
|
||||
let proxy_cache = self.proxy_cache.clone();
|
||||
let cancel_token = self.scan_token.lock().expect("Can lock cancel_token; qed").clone();
|
||||
let cancel_token = self
|
||||
.scan_token
|
||||
.lock()
|
||||
.expect("Can lock cancel_token; qed")
|
||||
.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
@@ -115,8 +119,7 @@ where
|
||||
|
||||
debug!(%address, "Probing Socks5 proxy");
|
||||
|
||||
if
|
||||
timeout(PROBE_TIMEOUT, async {
|
||||
if timeout(PROBE_TIMEOUT, async {
|
||||
let mut stream =
|
||||
match TcpStream::connect((address, DEFAULT_SOCKS5_PORT)).await {
|
||||
Ok(stream) => stream,
|
||||
@@ -192,10 +195,13 @@ where
|
||||
});
|
||||
}
|
||||
|
||||
/// Stops any ongoing probes.
|
||||
/// Stops any ongoing probes.
|
||||
pub fn stop_scanning(&self) {
|
||||
info!("Stopping Socks5 proxy probing");
|
||||
self.scan_token.lock().expect("Can lock cancel token; qed").cancel();
|
||||
self.scan_token
|
||||
.lock()
|
||||
.expect("Can lock cancel token; qed")
|
||||
.cancel();
|
||||
}
|
||||
|
||||
/// Connect to a remote Socks5 proxy. If a proxy address is given, connect to that one. If not, connect to the best (fastest) known proxy.
|
||||
@@ -207,31 +213,37 @@ where
|
||||
// Find best proxy of our internal list by racing all proxies and finding the first
|
||||
// one which gives a valid response.
|
||||
let futs = FuturesUnordered::new();
|
||||
for ip in self.proxy_cache.read().expect("Can read lock proxy cache; qed").iter().filter_map(|(address, ps)| {
|
||||
if matches!(ps, ProxyProbeStatus::Valid) {
|
||||
Some(*address)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}) {
|
||||
for ip in self
|
||||
.proxy_cache
|
||||
.read()
|
||||
.expect("Can read lock proxy cache; qed")
|
||||
.iter()
|
||||
.filter_map(|(address, ps)| {
|
||||
if matches!(ps, ProxyProbeStatus::Valid) {
|
||||
Some(*address)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
{
|
||||
futs.push(async move {
|
||||
// It's fine to swallow errors here, we are just sanity checking
|
||||
let addr: SocketAddr = (ip, DEFAULT_SOCKS5_PORT).into();
|
||||
trace!(%addr, "Checking proxy availability and latency");
|
||||
let mut stream = TcpStream::connect(addr).await.ok()?;
|
||||
stream.write_all(&SOCKS5_CLIENT_GREETING).await.ok()?;
|
||||
let mut recv_buf = [0;2];
|
||||
let mut recv_buf = [0; 2];
|
||||
stream.read_exact(&mut recv_buf).await.ok()?;
|
||||
match recv_buf {
|
||||
SOCKS5_EXPECTED_SERVER_CHOICE => Some(addr),
|
||||
_ => None
|
||||
_ => None,
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
let target: Option<SocketAddr> = futs.filter_map(|o| o).next().await;
|
||||
if target.is_none() {
|
||||
return Err(ConnectionError{_private: ()})
|
||||
return Err(ConnectionError { _private: () });
|
||||
}
|
||||
// Safe since we just checked the None case above
|
||||
target.unwrap()
|
||||
@@ -240,23 +252,33 @@ where
|
||||
|
||||
// Now that we have a target, "connect" to it, i.e. set it as proxy destination.
|
||||
debug!(%target, "Setting remote Socks5 proxy");
|
||||
*self.chosen_remote.lock().expect("Can lock chosen remote; qed") = Some(target);
|
||||
|
||||
*self
|
||||
.chosen_remote
|
||||
.lock()
|
||||
.expect("Can lock chosen remote; qed") = Some(target);
|
||||
|
||||
Ok(target)
|
||||
}
|
||||
|
||||
/// Disconnects from the proxy, if any is connected
|
||||
pub async fn disconnect(&self) {
|
||||
self.proxy_token.lock().expect("Can lock proxy token; qed").cancel();
|
||||
*self.chosen_remote.lock().expect("Can lock chosen remote; qed") = None;
|
||||
|
||||
self.proxy_token
|
||||
.lock()
|
||||
.expect("Can lock proxy token; qed")
|
||||
.cancel();
|
||||
*self
|
||||
.chosen_remote
|
||||
.lock()
|
||||
.expect("Can lock chosen remote; qed") = None;
|
||||
}
|
||||
|
||||
/// Starts a background task for proxying connections.
|
||||
/// This spawns a listener, and proxies all connections to the chosen target.
|
||||
fn start_proxy(&self) {
|
||||
let target = *self.chosen_remote.lock().expect("Can lock chosen remote; qed");
|
||||
let target = *self
|
||||
.chosen_remote
|
||||
.lock()
|
||||
.expect("Can lock chosen remote; qed");
|
||||
// First cancel the old token, then set a new token
|
||||
let mut old_token = self.proxy_token.lock().expect("Can lock proxy token; qed");
|
||||
old_token.cancel();
|
||||
@@ -303,7 +325,7 @@ where
|
||||
}
|
||||
Ok((_, _)) => {
|
||||
trace!(%source, %target, "Proxy stream finished normally");
|
||||
Ok(())
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -324,7 +346,6 @@ where
|
||||
#[derive(Debug)]
|
||||
pub struct ConnectionError {
|
||||
_private: (),
|
||||
|
||||
}
|
||||
|
||||
impl std::fmt::Display for ConnectionError {
|
||||
|
||||
Reference in New Issue
Block a user