diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 4575d24..9faa67b 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -47,6 +47,7 @@ jobs: - name: Setup system for test run: | + sudo modprobe br_netfilter sudo sysctl net.bridge.bridge-nf-call-iptables=0 sudo sysctl net.bridge.bridge-nf-call-ip6tables=0 sudo sysctl net.ipv6.conf.lo.disable_ipv6=0 diff --git a/Cargo.lock b/Cargo.lock index 23ccdd0..4d8dc3b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2029,6 +2029,7 @@ dependencies = [ "tauri-plugin-shell", "tauri-plugin-single-instance", "tauri-plugin-vpnservice", + "thunk-rs", "tokio", ] @@ -2079,6 +2080,7 @@ dependencies = [ "sqlx", "sys-locale", "thiserror 1.0.63", + "thunk-rs", "tokio", "tower-http", "tower-sessions", @@ -8155,8 +8157,7 @@ dependencies = [ [[package]] name = "thunk-rs" version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7cbc000e786a7ea2cfa3a85ef77cf86bfdadeaa2b215ec4751df66442fa4632a" +source = "git+https://github.com/easytier/thunk.git#5e8371a3100dbc18dda952a2036c6bd6fb0504db" [[package]] name = "tiff" diff --git a/easytier/src/gateway/kcp_proxy.rs b/easytier/src/gateway/kcp_proxy.rs index 9bd52f4..5a9c82a 100644 --- a/easytier/src/gateway/kcp_proxy.rs +++ b/easytier/src/gateway/kcp_proxy.rs @@ -106,8 +106,8 @@ async fn handle_kcp_output( #[derive(Debug, Clone)] pub struct NatDstKcpConnector { - kcp_endpoint: Arc, - peer_mgr: Arc, + pub(crate) kcp_endpoint: Arc, + pub(crate) peer_mgr: Arc, } #[async_trait::async_trait] @@ -299,6 +299,10 @@ impl KcpProxySrc { pub fn get_tcp_proxy(&self) -> Arc> { self.tcp_proxy.0.clone() } + + pub fn get_kcp_endpoint(&self) -> Arc { + self.kcp_endpoint.clone() + } } pub struct KcpProxyDst { diff --git a/easytier/src/gateway/socks5.rs b/easytier/src/gateway/socks5.rs index c979a53..0d8c99c 100644 --- a/easytier/src/gateway/socks5.rs +++ b/easytier/src/gateway/socks5.rs @@ -1,10 +1,11 @@ use std::{ net::{IpAddr, Ipv4Addr, SocketAddr}, - sync::Arc, + sync::{Arc, Weak}, time::{Duration, Instant}, }; use crossbeam::atomic::AtomicCell; +use kcp_sys::{endpoint::KcpEndpoint, stream::KcpStream}; use crate::{ common::{ @@ -19,6 +20,7 @@ use crate::{ util::stream::tcp_connect_with_timeout, }, ip_reassembler::IpReassembler, + kcp_proxy::NatDstKcpConnector, tokio_smoltcp::{channel_device, Net, NetConfig}, }, tunnel::packet_def::{PacketType, ZCPacket}, @@ -43,6 +45,8 @@ use crate::{ peers::{peer_manager::PeerManager, PeerPacketFilter}, }; +use super::tcp_proxy::NatDstConnector as _; + enum SocksUdpSocket { UdpSocket(Arc), SmolUdpSocket(super::tokio_smoltcp::UdpSocket), @@ -67,6 +71,7 @@ impl SocksUdpSocket { enum SocksTcpStream { TcpStream(tokio::net::TcpStream), SmolTcpStream(super::tokio_smoltcp::TcpStream), + KcpStream(KcpStream), } impl AsyncRead for SocksTcpStream { @@ -82,6 +87,9 @@ impl AsyncRead for SocksTcpStream { SocksTcpStream::SmolTcpStream(ref mut stream) => { std::pin::Pin::new(stream).poll_read(cx, buf) } + SocksTcpStream::KcpStream(ref mut stream) => { + std::pin::Pin::new(stream).poll_read(cx, buf) + } } } } @@ -99,6 +107,9 @@ impl AsyncWrite for SocksTcpStream { SocksTcpStream::SmolTcpStream(ref mut stream) => { std::pin::Pin::new(stream).poll_write(cx, buf) } + SocksTcpStream::KcpStream(ref mut stream) => { + std::pin::Pin::new(stream).poll_write(cx, buf) + } } } @@ -111,6 +122,7 @@ impl AsyncWrite for SocksTcpStream { SocksTcpStream::SmolTcpStream(ref mut stream) => { std::pin::Pin::new(stream).poll_flush(cx) } + SocksTcpStream::KcpStream(ref mut stream) => std::pin::Pin::new(stream).poll_flush(cx), } } @@ -125,6 +137,9 @@ impl AsyncWrite for SocksTcpStream { SocksTcpStream::SmolTcpStream(ref mut stream) => { std::pin::Pin::new(stream).poll_shutdown(cx) } + SocksTcpStream::KcpStream(ref mut stream) => { + std::pin::Pin::new(stream).poll_shutdown(cx) + } } } } @@ -204,6 +219,40 @@ impl Drop for SmolTcpConnector { } } +struct Socks5KcpConnector { + kcp_endpoint: Weak, + peer_mgr: Weak, + src_addr: SocketAddr, +} + +#[async_trait::async_trait] +impl AsyncTcpConnector for Socks5KcpConnector { + type S = SocksTcpStream; + + async fn tcp_connect( + &self, + addr: SocketAddr, + _timeout_s: u64, + ) -> crate::gateway::fast_socks5::Result { + let Some(kcp_endpoint) = self.kcp_endpoint.upgrade() else { + return Err(anyhow::anyhow!("kcp endpoint is not ready").into()); + }; + let Some(peer_mgr) = self.peer_mgr.upgrade() else { + return Err(anyhow::anyhow!("peer mgr is not ready").into()); + }; + let c = NatDstKcpConnector { + kcp_endpoint, + peer_mgr, + }; + println!("connect to kcp endpoint, addr = {:?}", addr); + let ret = c + .connect(self.src_addr, addr) + .await + .map_err(|e| super::fast_socks5::SocksError::Other(e.into()))?; + Ok(SocksTcpStream::KcpStream(ret)) + } +} + struct Socks5ServerNet { ipv4_addr: cidr::Ipv4Inet, auth: Option, @@ -345,6 +394,8 @@ pub struct Socks5Server { tcp_forward_task: Arc>>, udp_client_map: Arc>>, udp_forward_task: Arc>>, + + kcp_endpoint: Mutex>>, } #[async_trait::async_trait] @@ -442,6 +493,8 @@ impl Socks5Server { tcp_forward_task: Arc::new(std::sync::Mutex::new(JoinSet::new())), udp_client_map: Arc::new(DashMap::new()), udp_forward_task: Arc::new(DashMap::new()), + + kcp_endpoint: Mutex::new(None), }) } @@ -487,7 +540,11 @@ impl Socks5Server { }); } - pub async fn run(self: &Arc) -> Result<(), Error> { + pub async fn run( + self: &Arc, + kcp_endpoint: Option>, + ) -> Result<(), Error> { + *self.kcp_endpoint.lock().await = kcp_endpoint; let mut need_start = false; if let Some(proxy_url) = self.global_ctx.config.get_socks5_portal() { let bind_addr = format!( @@ -539,7 +596,7 @@ impl Socks5Server { async fn handle_port_forward_connection( mut incoming_socket: tokio::net::TcpStream, - connector: SmolTcpConnector, + connector: Box + Send>, dst_addr: SocketAddr, ) { let outgoing_socket = match connector.tcp_connect(dst_addr, 10).await { @@ -601,10 +658,12 @@ impl Socks5Server { let entries = self.entries.clone(); let tasks = Arc::new(std::sync::Mutex::new(JoinSet::new())); let forward_tasks = tasks.clone(); + let kcp_endpoint = self.kcp_endpoint.lock().await.clone(); + let peer_mgr = Arc::downgrade(&self.peer_manager.clone()); self.tasks.lock().unwrap().spawn(async move { loop { - let (incoming_socket, _addr) = match listener.accept().await { + let (incoming_socket, addr) = match listener.accept().await { Ok(result) => result, Err(err) => { tracing::error!("port forward accept error = {:?}", err); @@ -624,11 +683,21 @@ impl Socks5Server { continue; }; - let connector = SmolTcpConnector { - net: net.smoltcp_net.clone(), - entries: entries.clone(), - current_entry: std::sync::Mutex::new(None), - }; + let connector: Box + Send> = + if kcp_endpoint.is_none() { + Box::new(SmolTcpConnector { + net: net.smoltcp_net.clone(), + entries: entries.clone(), + current_entry: std::sync::Mutex::new(None), + }) + } else { + let kcp_endpoint = kcp_endpoint.as_ref().unwrap().clone(); + Box::new(Socks5KcpConnector { + kcp_endpoint, + peer_mgr: peer_mgr.clone(), + src_addr: addr, + }) + }; forward_tasks .lock() diff --git a/easytier/src/instance/instance.rs b/easytier/src/instance/instance.rs index 9460f14..ebdb2f1 100644 --- a/easytier/src/instance/instance.rs +++ b/easytier/src/instance/instance.rs @@ -417,7 +417,13 @@ impl Instance { } #[cfg(feature = "socks5")] - self.socks5_server.run().await?; + self.socks5_server + .run( + self.kcp_proxy_src + .as_ref() + .map(|x| Arc::downgrade(&x.get_kcp_endpoint())), + ) + .await?; self.run_rpc_server().await?; diff --git a/easytier/src/tests/three_node.rs b/easytier/src/tests/three_node.rs index 2eae161..2f364f3 100644 --- a/easytier/src/tests/three_node.rs +++ b/easytier/src/tests/three_node.rs @@ -175,6 +175,16 @@ pub async fn init_three_node_ex TomlConfigLoader>( ) .await; + wait_for_condition( + || async { + let routes = inst3.get_peer_manager().list_routes().await; + println!("routes: {:?}", routes); + routes.len() == 2 + }, + Duration::from_secs(5), + ) + .await; + vec![inst1, inst2, inst3] } @@ -898,6 +908,7 @@ pub async fn manual_reconnector(#[values(true, false)] is_foreign: bool) { pub async fn port_forward_test( #[values(true, false)] no_tun: bool, #[values(64, 1900)] buf_size: u64, + #[values(true, false)] enable_kcp: bool, ) { prepare_linux_namespaces(); @@ -936,6 +947,7 @@ pub async fn port_forward_test( } let mut flags = cfg.get_flags(); flags.no_tun = no_tun; + flags.enable_kcp_proxy = enable_kcp; cfg.set_flags(flags); cfg },