From ae54a872ce7aa18643a1a0dea02edfcd456bfd90 Mon Sep 17 00:00:00 2001 From: "sijie.sun" Date: Wed, 14 Aug 2024 23:26:15 +0800 Subject: [PATCH] support socks5 proxy usage: --socks5 12345 create an socks5 server on port 12345, can be used by socks5 client to access virtual network. --- easytier/Cargo.toml | 8 +- easytier/locales/app.yml | 5 +- easytier/src/common/config.rs | 13 + easytier/src/easytier-core.rs | 18 +- easytier/src/gateway/fast_socks5/LICENSE | 21 + easytier/src/gateway/fast_socks5/README.md | 1 + easytier/src/gateway/fast_socks5/mod.rs | 318 +++++++ easytier/src/gateway/fast_socks5/server.rs | 842 ++++++++++++++++++ easytier/src/gateway/fast_socks5/util/mod.rs | 2 + .../src/gateway/fast_socks5/util/stream.rs | 65 ++ .../gateway/fast_socks5/util/target_addr.rs | 244 +++++ easytier/src/gateway/mod.rs | 6 + easytier/src/gateway/socks5.rs | 330 +++++++ easytier/src/gateway/tokio_smoltcp/mod.rs | 11 +- easytier/src/instance/instance.rs | 15 + 15 files changed, 1889 insertions(+), 10 deletions(-) create mode 100644 easytier/src/gateway/fast_socks5/LICENSE create mode 100644 easytier/src/gateway/fast_socks5/README.md create mode 100644 easytier/src/gateway/fast_socks5/mod.rs create mode 100644 easytier/src/gateway/fast_socks5/server.rs create mode 100644 easytier/src/gateway/fast_socks5/util/mod.rs create mode 100644 easytier/src/gateway/fast_socks5/util/stream.rs create mode 100644 easytier/src/gateway/fast_socks5/util/target_addr.rs create mode 100644 easytier/src/gateway/socks5.rs diff --git a/easytier/Cargo.toml b/easytier/Cargo.toml index 861a845..fd06e6e 100644 --- a/easytier/Cargo.toml +++ b/easytier/Cargo.toml @@ -206,7 +206,7 @@ defguard_wireguard_rs = "0.4.2" [features] -default = ["wireguard", "mimalloc", "websocket", "smoltcp", "tun"] +default = ["wireguard", "mimalloc", "websocket", "smoltcp", "tun", "socks5"] full = [ "quic", "websocket", @@ -215,9 +215,10 @@ full = [ "aes-gcm", "smoltcp", "tun", + "socks5", ] -mips = ["aes-gcm", "mimalloc", "wireguard", "tun", "smoltcp"] -bsd = ["aes-gcm", "mimalloc", "smoltcp"] +mips = ["aes-gcm", "mimalloc", "wireguard", "tun", "smoltcp", "socks5"] +bsd = ["aes-gcm", "mimalloc", "smoltcp", "socks5"] wireguard = ["dep:boringtun", "dep:ring"] quic = ["dep:quinn", "dep:rustls", "dep:rcgen"] mimalloc = ["dep:mimalloc-rust"] @@ -231,3 +232,4 @@ websocket = [ "dep:rcgen", ] smoltcp = ["dep:smoltcp", "dep:parking_lot"] +socks5 = ["dep:smoltcp"] diff --git a/easytier/locales/app.yml b/easytier/locales/app.yml index baa3a2e..19cda43 100644 --- a/easytier/locales/app.yml +++ b/easytier/locales/app.yml @@ -110,4 +110,7 @@ core_clap: zh-CN: "禁用P2P通信,只通过--peers指定的节点转发数据包" relay_all_peer_rpc: en: "relay all peer rpc packets, even if the peer is not in the relay network whitelist. this can help peers not in relay network whitelist to establish p2p connection." - zh-CN: "转发所有对等节点的RPC数据包,即使对等节点不在转发网络白名单中。这可以帮助白名单外网络中的对等节点建立P2P连接。" \ No newline at end of file + zh-CN: "转发所有对等节点的RPC数据包,即使对等节点不在转发网络白名单中。这可以帮助白名单外网络中的对等节点建立P2P连接。" + socks5: + en: "enable socks5 server, allow socks5 client to access virtual network. format: , e.g.: 1080" + zh-CN: "启用 socks5 服务器,允许 socks5 客户端访问虚拟网络. 格式: <端口>,例如:1080" \ No newline at end of file diff --git a/easytier/src/common/config.rs b/easytier/src/common/config.rs index 66e5a42..fc68d8f 100644 --- a/easytier/src/common/config.rs +++ b/easytier/src/common/config.rs @@ -64,6 +64,9 @@ pub trait ConfigLoader: Send + Sync { fn get_routes(&self) -> Option>; fn set_routes(&self, routes: Option>); + fn get_socks5_portal(&self) -> Option; + fn set_socks5_portal(&self, addr: Option); + fn dump(&self) -> String; } @@ -201,6 +204,8 @@ struct Config { routes: Option>, + socks5_proxy: Option, + flags: Option, } @@ -500,6 +505,14 @@ impl ConfigLoader for TomlConfigLoader { fn set_routes(&self, routes: Option>) { self.config.lock().unwrap().routes = routes; } + + fn get_socks5_portal(&self) -> Option { + self.config.lock().unwrap().socks5_proxy.clone() + } + + fn set_socks5_portal(&self, addr: Option) { + self.config.lock().unwrap().socks5_proxy = addr; + } } #[cfg(test)] diff --git a/easytier/src/easytier-core.rs b/easytier/src/easytier-core.rs index a1d11a3..5e6e5e7 100644 --- a/easytier/src/easytier-core.rs +++ b/easytier/src/easytier-core.rs @@ -4,8 +4,6 @@ mod tests; use std::{ - backtrace, - io::Write as _, net::{Ipv4Addr, SocketAddr}, path::PathBuf, }; @@ -274,6 +272,13 @@ struct Cli { default_value = "false" )] relay_all_peer_rpc: bool, + + #[cfg(feature = "socks5")] + #[arg( + long, + help = t!("core_clap.socks5").to_string() + )] + socks5: Option, } rust_i18n::i18n!("locales"); @@ -492,6 +497,15 @@ impl From for TomlConfigLoader { )); } + #[cfg(feature = "socks5")] + if let Some(socks5_proxy) = cli.socks5 { + cfg.set_socks5_portal(Some( + format!("socks5://0.0.0.0:{}", socks5_proxy) + .parse() + .unwrap(), + )); + } + let mut f = cfg.get_flags(); if cli.default_protocol.is_some() { f.default_protocol = cli.default_protocol.as_ref().unwrap().clone(); diff --git a/easytier/src/gateway/fast_socks5/LICENSE b/easytier/src/gateway/fast_socks5/LICENSE new file mode 100644 index 0000000..943eda1 --- /dev/null +++ b/easytier/src/gateway/fast_socks5/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2021 Jonathan Dizdarevic + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/easytier/src/gateway/fast_socks5/README.md b/easytier/src/gateway/fast_socks5/README.md new file mode 100644 index 0000000..2f4f517 --- /dev/null +++ b/easytier/src/gateway/fast_socks5/README.md @@ -0,0 +1 @@ +Code is modified from https://github.com/dizda/fast-socks5 diff --git a/easytier/src/gateway/fast_socks5/mod.rs b/easytier/src/gateway/fast_socks5/mod.rs new file mode 100644 index 0000000..f701046 --- /dev/null +++ b/easytier/src/gateway/fast_socks5/mod.rs @@ -0,0 +1,318 @@ +//! Fast SOCKS5 client/server implementation written in Rust async/.await (with tokio). +//! +//! This library is maintained by [anyip.io](https://anyip.io/) a residential and mobile socks5 proxy provider. +//! +//! ## Features +//! +//! - An `async`/`.await` [SOCKS5](https://tools.ietf.org/html/rfc1928) implementation. +//! - An `async`/`.await` [SOCKS4 Client](https://www.openssh.com/txt/socks4.protocol) implementation. +//! - An `async`/`.await` [SOCKS4a Client](https://www.openssh.com/txt/socks4a.protocol) implementation. +//! - No **unsafe** code +//! - Built on-top of `tokio` library +//! - Ultra lightweight and scalable +//! - No system dependencies +//! - Cross-platform +//! - Authentication methods: +//! - No-Auth method +//! - Username/Password auth method +//! - Custom auth methods can be implemented via the Authentication Trait +//! - Credentials returned on authentication success +//! - All SOCKS5 RFC errors (replies) should be mapped +//! - `AsyncRead + AsyncWrite` traits are implemented on Socks5Stream & Socks5Socket +//! - `IPv4`, `IPv6`, and `Domains` types are supported +//! - Config helper for Socks5Server +//! - Helpers to run a Socks5Server à la *"std's TcpStream"* via `incoming.next().await` +//! - Examples come with real cases commands scenarios +//! - Can disable `DNS resolving` +//! - Can skip the authentication/handshake process, which will directly handle command's request (useful to save useless round-trips in a current authenticated environment) +//! - Can disable command execution (useful if you just want to forward the request to a different server) +//! +//! +//! ## Install +//! +//! Open in [crates.io](https://crates.io/crates/fast-socks5). +//! +//! +//! ## Examples +//! +//! Please check [`examples`](https://github.com/dizda/fast-socks5/tree/master/examples) directory. + +#![forbid(unsafe_code)] + +pub mod server; +pub mod util; + +use anyhow::Context; +use std::fmt; +use std::io; +use thiserror::Error; +use util::target_addr::read_address; +use util::target_addr::TargetAddr; +use util::target_addr::ToTargetAddr; + +use tokio::io::AsyncReadExt; + +use tracing::error; + +use crate::read_exact; + +#[rustfmt::skip] +pub mod consts { + pub const SOCKS5_VERSION: u8 = 0x05; + + pub const SOCKS5_AUTH_METHOD_NONE: u8 = 0x00; + pub const SOCKS5_AUTH_METHOD_GSSAPI: u8 = 0x01; + pub const SOCKS5_AUTH_METHOD_PASSWORD: u8 = 0x02; + pub const SOCKS5_AUTH_METHOD_NOT_ACCEPTABLE: u8 = 0xff; + + pub const SOCKS5_CMD_TCP_CONNECT: u8 = 0x01; + pub const SOCKS5_CMD_TCP_BIND: u8 = 0x02; + pub const SOCKS5_CMD_UDP_ASSOCIATE: u8 = 0x03; + + pub const SOCKS5_ADDR_TYPE_IPV4: u8 = 0x01; + pub const SOCKS5_ADDR_TYPE_DOMAIN_NAME: u8 = 0x03; + pub const SOCKS5_ADDR_TYPE_IPV6: u8 = 0x04; + + pub const SOCKS5_REPLY_SUCCEEDED: u8 = 0x00; + pub const SOCKS5_REPLY_GENERAL_FAILURE: u8 = 0x01; + pub const SOCKS5_REPLY_CONNECTION_NOT_ALLOWED: u8 = 0x02; + pub const SOCKS5_REPLY_NETWORK_UNREACHABLE: u8 = 0x03; + pub const SOCKS5_REPLY_HOST_UNREACHABLE: u8 = 0x04; + pub const SOCKS5_REPLY_CONNECTION_REFUSED: u8 = 0x05; + pub const SOCKS5_REPLY_TTL_EXPIRED: u8 = 0x06; + pub const SOCKS5_REPLY_COMMAND_NOT_SUPPORTED: u8 = 0x07; + pub const SOCKS5_REPLY_ADDRESS_TYPE_NOT_SUPPORTED: u8 = 0x08; +} + +#[derive(Debug, PartialEq)] +pub enum Socks5Command { + TCPConnect, + TCPBind, + UDPAssociate, +} + +#[allow(dead_code)] +impl Socks5Command { + #[inline] + #[rustfmt::skip] + fn as_u8(&self) -> u8 { + match self { + Socks5Command::TCPConnect => consts::SOCKS5_CMD_TCP_CONNECT, + Socks5Command::TCPBind => consts::SOCKS5_CMD_TCP_BIND, + Socks5Command::UDPAssociate => consts::SOCKS5_CMD_UDP_ASSOCIATE, + } + } + + #[inline] + #[rustfmt::skip] + fn from_u8(code: u8) -> Option { + match code { + consts::SOCKS5_CMD_TCP_CONNECT => Some(Socks5Command::TCPConnect), + consts::SOCKS5_CMD_TCP_BIND => Some(Socks5Command::TCPBind), + consts::SOCKS5_CMD_UDP_ASSOCIATE => Some(Socks5Command::UDPAssociate), + _ => None, + } + } +} + +#[derive(Debug, PartialEq)] +pub enum AuthenticationMethod { + None, + Password { username: String, password: String }, +} + +impl AuthenticationMethod { + #[inline] + #[rustfmt::skip] + fn as_u8(&self) -> u8 { + match self { + AuthenticationMethod::None => consts::SOCKS5_AUTH_METHOD_NONE, + AuthenticationMethod::Password {..} => + consts::SOCKS5_AUTH_METHOD_PASSWORD + } + } + + #[inline] + #[rustfmt::skip] + fn from_u8(code: u8) -> Option { + match code { + consts::SOCKS5_AUTH_METHOD_NONE => Some(AuthenticationMethod::None), + consts::SOCKS5_AUTH_METHOD_PASSWORD => Some(AuthenticationMethod::Password { username: "test".to_string(), password: "test".to_string()}), + _ => None, + } + } +} + +impl fmt::Display for AuthenticationMethod { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + AuthenticationMethod::None => f.write_str("AuthenticationMethod::None"), + AuthenticationMethod::Password { .. } => f.write_str("AuthenticationMethod::Password"), + } + } +} + +//impl Vec { +// pub fn as_bytes(&self) -> &[u8] { +// self.iter().map(|l| l.as_u8()).collect() +// } +//} +// +//impl From<&[AuthenticationMethod]> for &[u8] { +// fn from(_: Vec) -> Self { +// &[0x00] +// } +//} + +#[derive(Error, Debug)] +pub enum SocksError { + #[error("i/o error: {0}")] + Io(#[from] io::Error), + #[error("the data for key `{0}` is not available")] + Redaction(String), + #[error("invalid header (expected {expected:?}, found {found:?})")] + InvalidHeader { expected: String, found: String }, + + #[error("Auth method unacceptable `{0:?}`.")] + AuthMethodUnacceptable(Vec), + #[error("Unsupported SOCKS version `{0}`.")] + UnsupportedSocksVersion(u8), + #[error("Domain exceeded max sequence length")] + ExceededMaxDomainLen(usize), + #[error("Authentication failed `{0}`")] + AuthenticationFailed(String), + #[error("Authentication rejected `{0}`")] + AuthenticationRejected(String), + + #[error("Error with reply: {0}.")] + ReplyError(#[from] ReplyError), + + #[cfg(feature = "socks4")] + #[error("Error with reply: {0}.")] + ReplySocks4Error(#[from] socks4::ReplyError), + + #[error("Argument input error: `{0}`.")] + ArgumentInputError(&'static str), + + // #[error("Other: `{0}`.")] + #[error(transparent)] + Other(#[from] anyhow::Error), +} + +pub type Result = core::result::Result; + +/// SOCKS5 reply code +#[derive(Error, Debug, Copy, Clone)] +pub enum ReplyError { + #[error("Succeeded")] + Succeeded, + #[error("General failure")] + GeneralFailure, + #[error("Connection not allowed by ruleset")] + ConnectionNotAllowed, + #[error("Network unreachable")] + NetworkUnreachable, + #[error("Host unreachable")] + HostUnreachable, + #[error("Connection refused")] + ConnectionRefused, + #[error("Connection timeout")] + ConnectionTimeout, + #[error("TTL expired")] + TtlExpired, + #[error("Command not supported")] + CommandNotSupported, + #[error("Address type not supported")] + AddressTypeNotSupported, + // OtherReply(u8), +} + +impl ReplyError { + #[inline] + #[rustfmt::skip] + pub fn as_u8(self) -> u8 { + match self { + ReplyError::Succeeded => consts::SOCKS5_REPLY_SUCCEEDED, + ReplyError::GeneralFailure => consts::SOCKS5_REPLY_GENERAL_FAILURE, + ReplyError::ConnectionNotAllowed => consts::SOCKS5_REPLY_CONNECTION_NOT_ALLOWED, + ReplyError::NetworkUnreachable => consts::SOCKS5_REPLY_NETWORK_UNREACHABLE, + ReplyError::HostUnreachable => consts::SOCKS5_REPLY_HOST_UNREACHABLE, + ReplyError::ConnectionRefused => consts::SOCKS5_REPLY_CONNECTION_REFUSED, + ReplyError::ConnectionTimeout => consts::SOCKS5_REPLY_TTL_EXPIRED, + ReplyError::TtlExpired => consts::SOCKS5_REPLY_TTL_EXPIRED, + ReplyError::CommandNotSupported => consts::SOCKS5_REPLY_COMMAND_NOT_SUPPORTED, + ReplyError::AddressTypeNotSupported => consts::SOCKS5_REPLY_ADDRESS_TYPE_NOT_SUPPORTED, +// ReplyError::OtherReply(c) => c, + } + } + + #[inline] + #[rustfmt::skip] + pub fn from_u8(code: u8) -> ReplyError { + match code { + consts::SOCKS5_REPLY_SUCCEEDED => ReplyError::Succeeded, + consts::SOCKS5_REPLY_GENERAL_FAILURE => ReplyError::GeneralFailure, + consts::SOCKS5_REPLY_CONNECTION_NOT_ALLOWED => ReplyError::ConnectionNotAllowed, + consts::SOCKS5_REPLY_NETWORK_UNREACHABLE => ReplyError::NetworkUnreachable, + consts::SOCKS5_REPLY_HOST_UNREACHABLE => ReplyError::HostUnreachable, + consts::SOCKS5_REPLY_CONNECTION_REFUSED => ReplyError::ConnectionRefused, + consts::SOCKS5_REPLY_TTL_EXPIRED => ReplyError::TtlExpired, + consts::SOCKS5_REPLY_COMMAND_NOT_SUPPORTED => ReplyError::CommandNotSupported, + consts::SOCKS5_REPLY_ADDRESS_TYPE_NOT_SUPPORTED => ReplyError::AddressTypeNotSupported, +// _ => ReplyError::OtherReply(code), + _ => unreachable!("ReplyError code unsupported."), + } + } +} + +/// Generate UDP header +/// +/// # UDP Request header structure. +/// ```text +/// +----+------+------+----------+----------+----------+ +/// |RSV | FRAG | ATYP | DST.ADDR | DST.PORT | DATA | +/// +----+------+------+----------+----------+----------+ +/// | 2 | 1 | 1 | Variable | 2 | Variable | +/// +----+------+------+----------+----------+----------+ +/// +/// The fields in the UDP request header are: +/// +/// o RSV Reserved X'0000' +/// o FRAG Current fragment number +/// o ATYP address type of following addresses: +/// o IP V4 address: X'01' +/// o DOMAINNAME: X'03' +/// o IP V6 address: X'04' +/// o DST.ADDR desired destination address +/// o DST.PORT desired destination port +/// o DATA user data +/// ``` +pub fn new_udp_header(target_addr: T) -> Result> { + let mut header = vec![ + 0, 0, // RSV + 0, // FRAG + ]; + header.append(&mut target_addr.to_target_addr()?.to_be_bytes()?); + + Ok(header) +} + +/// Parse data from UDP client on raw buffer, return (frag, target_addr, payload). +pub async fn parse_udp_request<'a>(mut req: &'a [u8]) -> Result<(u8, TargetAddr, &'a [u8])> { + let rsv = read_exact!(req, [0u8; 2]).context("Malformed request")?; + + if !rsv.eq(&[0u8; 2]) { + return Err(ReplyError::GeneralFailure.into()); + } + + let [frag, atyp] = read_exact!(req, [0u8; 2]).context("Malformed request")?; + + let target_addr = read_address(&mut req, atyp).await.map_err(|e| { + // print explicit error + error!("{:#}", e); + // then convert it to a reply + ReplyError::AddressTypeNotSupported + })?; + + Ok((frag, target_addr, req)) +} diff --git a/easytier/src/gateway/fast_socks5/server.rs b/easytier/src/gateway/fast_socks5/server.rs new file mode 100644 index 0000000..44b7282 --- /dev/null +++ b/easytier/src/gateway/fast_socks5/server.rs @@ -0,0 +1,842 @@ +use super::new_udp_header; +use super::parse_udp_request; +use super::read_exact; +use super::util::stream::tcp_connect_with_timeout; +use super::util::target_addr::{read_address, TargetAddr}; +use super::Socks5Command; +use super::{consts, AuthenticationMethod, ReplyError, Result, SocksError}; +use anyhow::Context; +use std::io; +use std::net::IpAddr; +use std::net::Ipv4Addr; +use std::net::{SocketAddr, ToSocketAddrs as StdToSocketAddrs}; +use std::ops::Deref; +use std::pin::Pin; +use std::sync::Arc; +use std::task::Poll; +use tokio::io::AsyncReadExt; +use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; +use tokio::net::TcpStream; +use tokio::net::UdpSocket; +use tokio::try_join; + +use tracing::{debug, error, info, trace}; + +#[derive(Clone)] +pub struct Config { + /// Timeout of the command request + request_timeout: u64, + /// Avoid useless roundtrips if we don't need the Authentication layer + skip_auth: bool, + /// Enable dns-resolving + dns_resolve: bool, + /// Enable command execution + execute_command: bool, + /// Enable UDP support + allow_udp: bool, + /// For some complex scenarios, we may want to either accept Username/Password configuration + /// or IP Whitelisting, in case the client send only 1-2 auth methods (no auth) rather than 3 (with auth) + allow_no_auth: bool, + /// Contains the authentication trait to use the user against with + auth: Option>, +} + +impl Default for Config { + fn default() -> Self { + Config { + request_timeout: 10, + skip_auth: false, + dns_resolve: true, + execute_command: true, + allow_udp: false, + allow_no_auth: false, + auth: None, + } + } +} + +/// Use this trait to handle a custom authentication on your end. +#[async_trait::async_trait] +pub trait Authentication: Send + Sync { + type Item; + + async fn authenticate(&self, credentials: Option<(String, String)>) -> Option; +} + +/// Basic user/pass auth method provided. +pub struct SimpleUserPassword { + pub username: String, + pub password: String, +} + +/// The struct returned when the user has successfully authenticated +pub struct AuthSucceeded { + pub username: String, +} + +/// This is an example to auth via simple credentials. +/// If the auth succeed, we return the username authenticated with, for further uses. +#[async_trait::async_trait] +impl Authentication for SimpleUserPassword { + type Item = AuthSucceeded; + + async fn authenticate(&self, credentials: Option<(String, String)>) -> Option { + if let Some((username, password)) = credentials { + // Client has supplied credentials + if username == self.username && password == self.password { + // Some() will allow the authentication and the credentials + // will be forwarded to the socket + Some(AuthSucceeded { username }) + } else { + // Credentials incorrect, we deny the auth + None + } + } else { + // The client hasn't supplied any credentials, which only happens + // when `Config::allow_no_auth()` is set as `true` + None + } + } +} + +/// This will simply return Option::None, which denies the authentication +#[derive(Copy, Clone, Default)] +pub struct DenyAuthentication {} + +#[async_trait::async_trait] +impl Authentication for DenyAuthentication { + type Item = (); + + async fn authenticate(&self, _credentials: Option<(String, String)>) -> Option { + None + } +} + +/// While this one will always allow the user in. +#[derive(Copy, Clone, Default)] +pub struct AcceptAuthentication {} + +#[async_trait::async_trait] +impl Authentication for AcceptAuthentication { + type Item = (); + + async fn authenticate(&self, _credentials: Option<(String, String)>) -> Option { + Some(()) + } +} + +impl Config { + /// How much time it should wait until the request timeout. + pub fn set_request_timeout(&mut self, n: u64) -> &mut Self { + self.request_timeout = n; + self + } + + /// Skip the entire auth/handshake part, which means the server will directly wait for + /// the command request. + pub fn set_skip_auth(&mut self, value: bool) -> &mut Self { + self.skip_auth = value; + self.auth = None; + self + } + + /// Enable authentication + /// 'static lifetime for Authentication avoid us to use `dyn Authentication` + /// and set the Arc before calling the function. + pub fn with_authentication(self, authentication: T) -> Config { + Config { + request_timeout: self.request_timeout, + skip_auth: self.skip_auth, + dns_resolve: self.dns_resolve, + execute_command: self.execute_command, + allow_udp: self.allow_udp, + allow_no_auth: self.allow_no_auth, + auth: Some(Arc::new(authentication)), + } + } + + /// For some complex scenarios, we may want to either accept Username/Password configuration + /// or IP Whitelisting, in case the client send only 2 auth methods rather than 3 (with auth) + pub fn set_allow_no_auth(&mut self, value: bool) -> &mut Self { + self.allow_no_auth = value; + self + } + + /// Set whether or not to execute commands + pub fn set_execute_command(&mut self, value: bool) -> &mut Self { + self.execute_command = value; + self + } + + /// Will the server perform dns resolve + pub fn set_dns_resolve(&mut self, value: bool) -> &mut Self { + self.dns_resolve = value; + self + } + + /// Set whether or not to allow udp traffic + pub fn set_udp_support(&mut self, value: bool) -> &mut Self { + self.allow_udp = value; + self + } +} + +#[async_trait::async_trait] +pub trait AsyncTcpConnector { + type S: AsyncRead + AsyncWrite + Unpin + Send + Sync; + + async fn tcp_connect(&self, addr: SocketAddr, timeout_s: u64) -> Result; +} + +pub struct DefaultTcpConnector {} + +#[async_trait::async_trait] +impl AsyncTcpConnector for DefaultTcpConnector { + type S = TcpStream; + + async fn tcp_connect(&self, addr: SocketAddr, timeout_s: u64) -> Result { + tcp_connect_with_timeout(addr, timeout_s).await + } +} + +/// Wrap TcpStream and contains Socks5 protocol implementation. +pub struct Socks5Socket +{ + inner: T, + config: Arc>, + auth: AuthenticationMethod, + target_addr: Option, + cmd: Option, + /// Socket address which will be used in the reply message. + reply_ip: Option, + /// If the client has been authenticated, that's where we store his credentials + /// to be accessed from the socket + credentials: Option, + tcp_connector: C, +} + +impl + Socks5Socket +{ + pub fn new(socket: T, config: Arc>, tcp_connector: C) -> Self { + Socks5Socket { + inner: socket, + config, + auth: AuthenticationMethod::None, + target_addr: None, + cmd: None, + reply_ip: None, + credentials: None, + tcp_connector, + } + } + + /// Set the bind IP address in Socks5Reply. + /// + /// Only the inner socket owner knows the correct reply bind addr, so leave this field to be + /// populated. For those strict clients, users can use this function to set the correct IP + /// address. + /// + /// Most popular SOCKS5 clients [1] [2] ignore BND.ADDR and BND.PORT the reply of command + /// CONNECT, but this field could be useful in some other command, such as UDP ASSOCIATE. + /// + /// [1]: https://github.com/chromium/chromium/blob/bd2c7a8b65ec42d806277dd30f138a673dec233a/net/socket/socks5_client_socket.cc#L481 + /// [2]: https://github.com/curl/curl/blob/d15692ebbad5e9cfb871b0f7f51a73e43762cee2/lib/socks.c#L978 + pub fn set_reply_ip(&mut self, addr: IpAddr) { + self.reply_ip = Some(addr); + } + + /// Process clients SOCKS requests + /// This is the entry point where a whole request is processed. + pub async fn upgrade_to_socks5(mut self) -> Result> { + trace!("upgrading to socks5..."); + + // Handshake + if !self.config.skip_auth { + let methods = self.get_methods().await?; + + let auth_method = self.can_accept_method(methods).await?; + + if self.config.auth.is_some() { + let credentials = self.authenticate(auth_method).await?; + self.credentials = Some(credentials); + } + } else { + debug!("skipping auth"); + } + + match self.request().await { + Ok(_) => {} + Err(SocksError::ReplyError(e)) => { + // If a reply error has been returned, we send it to the client + self.reply_error(&e).await?; + return Err(e.into()); // propagate the error to end this connection's task + } + // if any other errors has been detected, we simply end connection's task + Err(d) => return Err(d), + }; + + Ok(self) + } + + /// Consumes the `Socks5Socket`, returning the wrapped stream. + pub fn into_inner(self) -> T { + self.inner + } + + /// Read the authentication method provided by the client. + /// A client send a list of methods that he supports, he could send + /// + /// - 0: Non auth + /// - 2: Auth with username/password + /// + /// Altogether, then the server choose to use of of these, + /// or deny the handshake (thus the connection). + /// + /// # Examples + /// ```text + /// {SOCKS Version, methods-length} + /// eg. (non-auth) {5, 2} + /// eg. (auth) {5, 3} + /// ``` + /// + async fn get_methods(&mut self) -> Result> { + trace!("Socks5Socket: get_methods()"); + // read the first 2 bytes which contains the SOCKS version and the methods len() + let [version, methods_len] = + read_exact!(self.inner, [0u8; 2]).context("Can't read methods")?; + debug!( + "Handshake headers: [version: {version}, methods len: {len}]", + version = version, + len = methods_len, + ); + + if version != consts::SOCKS5_VERSION { + return Err(SocksError::UnsupportedSocksVersion(version)); + } + + // {METHODS available from the client} + // eg. (non-auth) {0, 1} + // eg. (auth) {0, 1, 2} + let methods = read_exact!(self.inner, vec![0u8; methods_len as usize]) + .context("Can't get methods.")?; + debug!("methods supported sent by the client: {:?}", &methods); + + // Return methods available + Ok(methods) + } + + /// Decide to whether or not, accept the authentication method. + /// Don't forget that the methods list sent by the client, contains one or more methods. + /// + /// # Request + /// + /// Client send an array of 3 entries: [0, 1, 2] + /// ```text + /// {SOCKS Version, Authentication chosen} + /// eg. (non-auth) {5, 0} + /// eg. (GSSAPI) {5, 1} + /// eg. (auth) {5, 2} + /// ``` + /// + /// # Response + /// ```text + /// eg. (accept non-auth) {5, 0x00} + /// eg. (non-acceptable) {5, 0xff} + /// ``` + /// + async fn can_accept_method(&mut self, client_methods: Vec) -> Result { + let method_supported; + + if let Some(_auth) = self.config.auth.as_ref() { + if client_methods.contains(&consts::SOCKS5_AUTH_METHOD_PASSWORD) { + // can auth with password + method_supported = consts::SOCKS5_AUTH_METHOD_PASSWORD; + } else { + // client hasn't provided a password + if self.config.allow_no_auth { + // but we allow no auth, for ip whitelisting + method_supported = consts::SOCKS5_AUTH_METHOD_NONE; + } else { + // we don't allow no auth, so we deny the entry + debug!("Don't support this auth method, reply with (0xff)"); + self.inner + .write_all(&[ + consts::SOCKS5_VERSION, + consts::SOCKS5_AUTH_METHOD_NOT_ACCEPTABLE, + ]) + .await + .context("Can't reply with method not acceptable.")?; + + return Err(SocksError::AuthMethodUnacceptable(client_methods)); + } + } + } else { + method_supported = consts::SOCKS5_AUTH_METHOD_NONE; + } + + debug!( + "Reply with method {} ({})", + AuthenticationMethod::from_u8(method_supported).context("Method not supported")?, + method_supported + ); + self.inner + .write(&[consts::SOCKS5_VERSION, method_supported]) + .await + .context("Can't reply with method auth-none")?; + Ok(method_supported) + } + + async fn read_username_password(socket: &mut T) -> Result<(String, String)> { + trace!("Socks5Socket: authenticate()"); + let [version, user_len] = read_exact!(socket, [0u8; 2]).context("Can't read user len")?; + debug!( + "Auth: [version: {version}, user len: {len}]", + version = version, + len = user_len, + ); + + if user_len < 1 { + return Err(SocksError::AuthenticationFailed(format!( + "Username malformed ({} chars)", + user_len + ))); + } + + let username = + read_exact!(socket, vec![0u8; user_len as usize]).context("Can't get username.")?; + debug!("username bytes: {:?}", &username); + + let [pass_len] = read_exact!(socket, [0u8; 1]).context("Can't read pass len")?; + debug!("Auth: [pass len: {len}]", len = pass_len,); + + if pass_len < 1 { + return Err(SocksError::AuthenticationFailed(format!( + "Password malformed ({} chars)", + pass_len + ))); + } + + let password = + read_exact!(socket, vec![0u8; pass_len as usize]).context("Can't get password.")?; + debug!("password bytes: {:?}", &password); + + let username = String::from_utf8(username).context("Failed to convert username")?; + let password = String::from_utf8(password).context("Failed to convert password")?; + + Ok((username, password)) + } + + /// Only called if + /// - this server has `Authentication` trait implemented. + /// - and the client supports authentication via username/password + /// - or the client doesn't send authentication, but we let the trait decides if the `allow_no_auth()` set as `true` + async fn authenticate(&mut self, auth_method: u8) -> Result { + let credentials = if auth_method == consts::SOCKS5_AUTH_METHOD_PASSWORD { + let credentials = Self::read_username_password(&mut self.inner).await?; + Some(credentials) + } else { + // the client hasn't provided any credentials, the function auth.authenticate() + // will then check None, according to other parameters provided by the trait + // such as IP, etc. + None + }; + + let auth = self.config.auth.as_ref().context("No auth module")?; + + if let Some(credentials) = auth.authenticate(credentials).await { + if auth_method == consts::SOCKS5_AUTH_METHOD_PASSWORD { + // only the password way expect to write a response at this moment + self.inner + .write_all(&[1, consts::SOCKS5_REPLY_SUCCEEDED]) + .await + .context("Can't reply auth success")?; + } + + info!("User logged successfully."); + + return Ok(credentials); + } else { + self.inner + .write_all(&[1, consts::SOCKS5_AUTH_METHOD_NOT_ACCEPTABLE]) + .await + .context("Can't reply with auth method not acceptable.")?; + + return Err(SocksError::AuthenticationRejected(format!( + "Authentication, rejected." + ))); + } + } + + /// Wrapper to principally cover ReplyError types for both functions read & execute request. + async fn request(&mut self) -> Result<()> { + self.read_command().await?; + + if self.config.dns_resolve { + self.resolve_dns().await?; + } else { + debug!("Domain won't be resolved because `dns_resolve`'s config has been turned off.") + } + + if self.config.execute_command { + self.execute_command().await?; + } + + Ok(()) + } + + /// Reply error to the client with the reply code according to the RFC. + async fn reply_error(&mut self, error: &ReplyError) -> Result<()> { + let reply = new_reply(error, "0.0.0.0:0".parse().unwrap()); + debug!("reply error to be written: {:?}", &reply); + + self.inner + .write(&reply) + .await + .context("Can't write the reply!")?; + + self.inner.flush().await.context("Can't flush the reply!")?; + + Ok(()) + } + + /// Decide to whether or not, accept the authentication method. + /// Don't forget that the methods list sent by the client, contains one or more methods. + /// + /// # Request + /// ```text + /// +----+-----+-------+------+----------+----------+ + /// |VER | CMD | RSV | ATYP | DST.ADDR | DST.PORT | + /// +----+-----+-------+------+----------+----------+ + /// | 1 | 1 | 1 | 1 | Variable | 2 | + /// +----+-----+-------+------+----------+----------+ + /// ``` + /// + /// It the request is correct, it should returns a ['SocketAddr']. + /// + async fn read_command(&mut self) -> Result<()> { + let [version, cmd, rsv, address_type] = + read_exact!(self.inner, [0u8; 4]).context("Malformed request")?; + debug!( + "Request: [version: {version}, command: {cmd}, rev: {rsv}, address_type: {address_type}]", + version = version, + cmd = cmd, + rsv = rsv, + address_type = address_type, + ); + + if version != consts::SOCKS5_VERSION { + return Err(SocksError::UnsupportedSocksVersion(version)); + } + + match Socks5Command::from_u8(cmd) { + None => return Err(ReplyError::CommandNotSupported.into()), + Some(cmd) => match cmd { + Socks5Command::TCPConnect => { + self.cmd = Some(cmd); + } + Socks5Command::UDPAssociate => { + if !self.config.allow_udp { + return Err(ReplyError::CommandNotSupported.into()); + } + self.cmd = Some(cmd); + } + Socks5Command::TCPBind => return Err(ReplyError::CommandNotSupported.into()), + }, + } + + // Guess address type + let target_addr = read_address(&mut self.inner, address_type) + .await + .map_err(|e| { + // print explicit error + error!("{:#}", e); + // then convert it to a reply + ReplyError::AddressTypeNotSupported + })?; + + self.target_addr = Some(target_addr); + + debug!("Request target is {}", self.target_addr.as_ref().unwrap()); + + Ok(()) + } + + /// This function is public, it can be call manually on your own-willing + /// if config flag has been turned off: `Config::dns_resolve == false`. + pub async fn resolve_dns(&mut self) -> Result<()> { + trace!("resolving dns"); + if let Some(target_addr) = self.target_addr.take() { + // decide whether we have to resolve DNS or not + self.target_addr = match target_addr { + TargetAddr::Domain(_, _) => Some(target_addr.resolve_dns().await?), + TargetAddr::Ip(_) => Some(target_addr), + }; + } + + Ok(()) + } + + /// Execute the socks5 command that the client wants. + async fn execute_command(&mut self) -> Result<()> { + match &self.cmd { + None => Err(ReplyError::CommandNotSupported.into()), + Some(cmd) => match cmd { + Socks5Command::TCPBind => Err(ReplyError::CommandNotSupported.into()), + Socks5Command::TCPConnect => return self.execute_command_connect().await, + Socks5Command::UDPAssociate => { + if self.config.allow_udp { + return self.execute_command_udp_assoc().await; + } else { + Err(ReplyError::CommandNotSupported.into()) + } + } + }, + } + } + + /// Connect to the target address that the client wants, + /// then forward the data between them (client <=> target address). + async fn execute_command_connect(&mut self) -> Result<()> { + // async-std's ToSocketAddrs doesn't supports external trait implementation + // @see https://github.com/async-rs/async-std/issues/539 + let addr = self + .target_addr + .as_ref() + .context("target_addr empty")? + .to_socket_addrs()? + .next() + .context("unreachable")?; + + // TCP connect with timeout, to avoid memory leak for connection that takes forever + let outbound = self + .tcp_connector + .tcp_connect(addr, self.config.request_timeout) + .await?; + + debug!("Connected to remote destination"); + + self.inner + .write(&new_reply( + &ReplyError::Succeeded, + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0), + )) + .await + .context("Can't write successful reply")?; + + self.inner.flush().await.context("Can't flush the reply!")?; + + debug!("Wrote success"); + + transfer(&mut self.inner, outbound).await + } + + /// Bind to a random UDP port, wait for the traffic from + /// the client, and then forward the data to the remote addr. + async fn execute_command_udp_assoc(&mut self) -> Result<()> { + // The DST.ADDR and DST.PORT fields contain the address and port that + // the client expects to use to send UDP datagrams on for the + // association. The server MAY use this information to limit access + // to the association. + // @see Page 6, https://datatracker.ietf.org/doc/html/rfc1928. + // + // We do NOT limit the access from the client currently in this implementation. + let _not_used = self.target_addr.as_ref(); + + // Listen with UDP6 socket, so the client can connect to it with either + // IPv4 or IPv6. + let peer_sock = UdpSocket::bind("[::]:0").await?; + + // Respect the pre-populated reply IP address. + self.inner + .write(&new_reply( + &ReplyError::Succeeded, + SocketAddr::new( + self.reply_ip.context("invalid reply ip")?, + peer_sock.local_addr()?.port(), + ), + )) + .await + .context("Can't write successful reply")?; + + debug!("Wrote success"); + + transfer_udp(peer_sock).await?; + + Ok(()) + } + + pub fn target_addr(&self) -> Option<&TargetAddr> { + self.target_addr.as_ref() + } + + pub fn auth(&self) -> &AuthenticationMethod { + &self.auth + } + + pub fn cmd(&self) -> &Option { + &self.cmd + } + + /// Borrow the credentials of the user has authenticated with + pub fn get_credentials(&self) -> Option<&<::Item as Deref>::Target> + where + ::Item: Deref, + { + self.credentials.as_deref() + } + + /// Get the credentials of the user has authenticated with + pub fn take_credentials(&mut self) -> Option { + self.credentials.take() + } + + pub fn tcp_connector(&self) -> &C { + &self.tcp_connector + } +} + +/// Copy data between two peers +/// Using 2 different generators, because they could be different structs with same traits. +async fn transfer(mut inbound: I, mut outbound: O) -> Result<()> +where + I: AsyncRead + AsyncWrite + Unpin, + O: AsyncRead + AsyncWrite + Unpin, +{ + match tokio::io::copy_bidirectional(&mut inbound, &mut outbound).await { + Ok(res) => info!("transfer closed ({}, {})", res.0, res.1), + Err(err) => error!("transfer error: {:?}", err), + }; + + Ok(()) +} + +async fn handle_udp_request(inbound: &UdpSocket, outbound: &UdpSocket) -> Result<()> { + let mut buf = vec![0u8; 0x10000]; + loop { + let (size, client_addr) = inbound.recv_from(&mut buf).await?; + debug!("Server recieve udp from {}", client_addr); + inbound.connect(client_addr).await?; + + let (frag, target_addr, data) = parse_udp_request(&buf[..size]).await?; + + if frag != 0 { + debug!("Discard UDP frag packets sliently."); + return Ok(()); + } + + debug!("Server forward to packet to {}", target_addr); + let mut target_addr = target_addr + .to_socket_addrs()? + .next() + .context("unreachable")?; + + target_addr.set_ip(match target_addr.ip() { + std::net::IpAddr::V4(v4) => std::net::IpAddr::V6(v4.to_ipv6_mapped()), + v6 @ std::net::IpAddr::V6(_) => v6, + }); + outbound.send_to(data, target_addr).await?; + } +} + +async fn handle_udp_response(inbound: &UdpSocket, outbound: &UdpSocket) -> Result<()> { + let mut buf = vec![0u8; 0x10000]; + loop { + let (size, remote_addr) = outbound.recv_from(&mut buf).await?; + debug!("Recieve packet from {}", remote_addr); + + let mut data = new_udp_header(remote_addr)?; + data.extend_from_slice(&buf[..size]); + inbound.send(&data).await?; + } +} + +async fn transfer_udp(inbound: UdpSocket) -> Result<()> { + let outbound = UdpSocket::bind("[::]:0").await?; + + let req_fut = handle_udp_request(&inbound, &outbound); + let res_fut = handle_udp_response(&inbound, &outbound); + match try_join!(req_fut, res_fut) { + Ok(_) => {} + Err(error) => return Err(error), + } + + Ok(()) +} + +// Fixes the issue "cannot borrow data in dereference of `Pin<&mut >` as mutable" +// +// cf. https://users.rust-lang.org/t/take-in-impl-future-cannot-borrow-data-in-a-dereference-of-pin/52042 +impl Unpin for Socks5Socket where + T: AsyncRead + AsyncWrite + Unpin +{ +} + +/// Allow us to read directly from the struct +impl AsyncRead for Socks5Socket +where + T: AsyncRead + AsyncWrite + Unpin, +{ + fn poll_read( + mut self: Pin<&mut Self>, + context: &mut std::task::Context, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll> { + Pin::new(&mut self.inner).poll_read(context, buf) + } +} + +/// Allow us to write directly into the struct +impl AsyncWrite for Socks5Socket +where + T: AsyncRead + AsyncWrite + Unpin, +{ + fn poll_write( + mut self: Pin<&mut Self>, + context: &mut std::task::Context, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut self.inner).poll_write(context, buf) + } + + fn poll_flush( + mut self: Pin<&mut Self>, + context: &mut std::task::Context, + ) -> Poll> { + Pin::new(&mut self.inner).poll_flush(context) + } + + fn poll_shutdown( + mut self: Pin<&mut Self>, + context: &mut std::task::Context, + ) -> Poll> { + Pin::new(&mut self.inner).poll_shutdown(context) + } +} + +/// Generate reply code according to the RFC. +fn new_reply(error: &ReplyError, sock_addr: SocketAddr) -> Vec { + let (addr_type, mut ip_oct, mut port) = match sock_addr { + SocketAddr::V4(sock) => ( + consts::SOCKS5_ADDR_TYPE_IPV4, + sock.ip().octets().to_vec(), + sock.port().to_be_bytes().to_vec(), + ), + SocketAddr::V6(sock) => ( + consts::SOCKS5_ADDR_TYPE_IPV6, + sock.ip().octets().to_vec(), + sock.port().to_be_bytes().to_vec(), + ), + }; + + let mut reply = vec![ + consts::SOCKS5_VERSION, + error.as_u8(), // transform the error into byte code + 0x00, // reserved + addr_type, // address type (ipv4, v6, domain) + ]; + reply.append(&mut ip_oct); + reply.append(&mut port); + + reply +} diff --git a/easytier/src/gateway/fast_socks5/util/mod.rs b/easytier/src/gateway/fast_socks5/util/mod.rs new file mode 100644 index 0000000..e1c2f62 --- /dev/null +++ b/easytier/src/gateway/fast_socks5/util/mod.rs @@ -0,0 +1,2 @@ +pub mod stream; +pub mod target_addr; diff --git a/easytier/src/gateway/fast_socks5/util/stream.rs b/easytier/src/gateway/fast_socks5/util/stream.rs new file mode 100644 index 0000000..e76b34a --- /dev/null +++ b/easytier/src/gateway/fast_socks5/util/stream.rs @@ -0,0 +1,65 @@ +use std::time::Duration; +use tokio::io::ErrorKind as IOErrorKind; +use tokio::net::{TcpStream, ToSocketAddrs}; +use tokio::time::timeout; + +use crate::gateway::fast_socks5::{ReplyError, Result}; + +/// Easy to destructure bytes buffers by naming each fields: +/// +/// # Examples (before) +/// +/// ```ignore +/// let mut buf = [0u8; 2]; +/// stream.read_exact(&mut buf).await?; +/// let [version, method_len] = buf; +/// +/// assert_eq!(version, 0x05); +/// ``` +/// +/// # Examples (after) +/// +/// ```ignore +/// let [version, method_len] = read_exact!(stream, [0u8; 2]); +/// +/// assert_eq!(version, 0x05); +/// ``` +#[macro_export] +macro_rules! read_exact { + ($stream: expr, $array: expr) => {{ + let mut x = $array; + // $stream + // .read_exact(&mut x) + // .await + // .map_err(|_| io_err("lol"))?; + $stream.read_exact(&mut x).await.map(|_| x) + }}; +} + +pub async fn tcp_connect_with_timeout(addr: T, request_timeout_s: u64) -> Result +where + T: ToSocketAddrs, +{ + let fut = tcp_connect(addr); + match timeout(Duration::from_secs(request_timeout_s), fut).await { + Ok(result) => result, + Err(_) => Err(ReplyError::ConnectionTimeout.into()), + } +} + +pub async fn tcp_connect(addr: T) -> Result +where + T: ToSocketAddrs, +{ + match TcpStream::connect(addr).await { + Ok(o) => Ok(o), + Err(e) => match e.kind() { + // Match other TCP errors with ReplyError + IOErrorKind::ConnectionRefused => Err(ReplyError::ConnectionRefused.into()), + IOErrorKind::ConnectionAborted => Err(ReplyError::ConnectionNotAllowed.into()), + IOErrorKind::ConnectionReset => Err(ReplyError::ConnectionNotAllowed.into()), + IOErrorKind::NotConnected => Err(ReplyError::NetworkUnreachable.into()), + _ => Err(e.into()), // #[error("General failure")] ? + }, + } +} diff --git a/easytier/src/gateway/fast_socks5/util/target_addr.rs b/easytier/src/gateway/fast_socks5/util/target_addr.rs new file mode 100644 index 0000000..fa2dbbf --- /dev/null +++ b/easytier/src/gateway/fast_socks5/util/target_addr.rs @@ -0,0 +1,244 @@ +use crate::gateway::fast_socks5::consts; +use crate::gateway::fast_socks5::consts::SOCKS5_ADDR_TYPE_IPV4; +use crate::gateway::fast_socks5::SocksError; +use crate::read_exact; + +use anyhow::Context; +use std::fmt; +use std::io; +use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; +use std::vec::IntoIter; +use thiserror::Error; +use tokio::io::{AsyncRead, AsyncReadExt}; +use tokio::net::lookup_host; + +use tracing::{debug, error}; + +/// SOCKS5 reply code +#[derive(Error, Debug)] +pub enum AddrError { + #[error("DNS Resolution failed")] + DNSResolutionFailed, + #[error("Can't read IPv4")] + IPv4Unreadable, + #[error("Can't read IPv6")] + IPv6Unreadable, + #[error("Can't read port number")] + PortNumberUnreadable, + #[error("Can't read domain len")] + DomainLenUnreadable, + #[error("Can't read Domain content")] + DomainContentUnreadable, + #[error("Malformed UTF-8")] + Utf8, + #[error("Unknown address type")] + IncorrectAddressType, + #[error("{0}")] + Custom(String), +} + +/// A description of a connection target. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub enum TargetAddr { + /// Connect to an IP address. + Ip(SocketAddr), + /// Connect to a fully qualified domain name. + /// + /// The domain name will be passed along to the proxy server and DNS lookup + /// will happen there. + Domain(String, u16), +} + +impl TargetAddr { + pub async fn resolve_dns(self) -> anyhow::Result { + match self { + TargetAddr::Ip(ip) => Ok(TargetAddr::Ip(ip)), + TargetAddr::Domain(domain, port) => { + debug!("Attempt to DNS resolve the domain {}...", &domain); + + let socket_addr = lookup_host((&domain[..], port)) + .await + .context(AddrError::DNSResolutionFailed)? + .next() + .ok_or(AddrError::Custom( + "Can't fetch DNS to the domain.".to_string(), + ))?; + debug!("domain name resolved to {}", socket_addr); + + // has been converted to an ip + Ok(TargetAddr::Ip(socket_addr)) + } + } + } + + pub fn is_ip(&self) -> bool { + match self { + TargetAddr::Ip(_) => true, + _ => false, + } + } + + pub fn is_domain(&self) -> bool { + !self.is_ip() + } + + pub fn to_be_bytes(&self) -> anyhow::Result> { + let mut buf = vec![]; + match self { + TargetAddr::Ip(SocketAddr::V4(addr)) => { + debug!("TargetAddr::IpV4"); + + buf.extend_from_slice(&[SOCKS5_ADDR_TYPE_IPV4]); + + debug!("addr ip {:?}", (*addr.ip()).octets()); + buf.extend_from_slice(&(addr.ip()).octets()); // ip + buf.extend_from_slice(&addr.port().to_be_bytes()); // port + } + TargetAddr::Ip(SocketAddr::V6(addr)) => { + debug!("TargetAddr::IpV6"); + buf.extend_from_slice(&[consts::SOCKS5_ADDR_TYPE_IPV6]); + + debug!("addr ip {:?}", (*addr.ip()).octets()); + buf.extend_from_slice(&(addr.ip()).octets()); // ip + buf.extend_from_slice(&addr.port().to_be_bytes()); // port + } + TargetAddr::Domain(ref domain, port) => { + debug!("TargetAddr::Domain"); + if domain.len() > u8::max_value() as usize { + return Err(SocksError::ExceededMaxDomainLen(domain.len()).into()); + } + buf.extend_from_slice(&[consts::SOCKS5_ADDR_TYPE_DOMAIN_NAME, domain.len() as u8]); + buf.extend_from_slice(domain.as_bytes()); // domain content + buf.extend_from_slice(&port.to_be_bytes()); + // port content (.to_be_bytes() convert from u16 to u8 type) + } + } + Ok(buf) + } +} + +// async-std ToSocketAddrs doesn't supports external trait implementation +// @see https://github.com/async-rs/async-std/issues/539 +impl std::net::ToSocketAddrs for TargetAddr { + type Iter = IntoIter; + + fn to_socket_addrs(&self) -> io::Result> { + match *self { + TargetAddr::Ip(addr) => Ok(vec![addr].into_iter()), + TargetAddr::Domain(_, _) => Err(io::Error::new( + io::ErrorKind::Other, + "Domain name has to be explicitly resolved, please use TargetAddr::resolve_dns().", + )), + } + } +} + +impl fmt::Display for TargetAddr { + #[inline] + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + TargetAddr::Ip(ref addr) => write!(f, "{}", addr), + TargetAddr::Domain(ref addr, ref port) => write!(f, "{}:{}", addr, port), + } + } +} + +/// A trait for objects that can be converted to `TargetAddr`. +pub trait ToTargetAddr { + /// Converts the value of `self` to a `TargetAddr`. + fn to_target_addr(&self) -> io::Result; +} + +impl<'a> ToTargetAddr for (&'a str, u16) { + fn to_target_addr(&self) -> io::Result { + // try to parse as an IP first + if let Ok(addr) = self.0.parse::() { + return (addr, self.1).to_target_addr(); + } + + if let Ok(addr) = self.0.parse::() { + return (addr, self.1).to_target_addr(); + } + + Ok(TargetAddr::Domain(self.0.to_owned(), self.1)) + } +} + +impl ToTargetAddr for SocketAddr { + fn to_target_addr(&self) -> io::Result { + Ok(TargetAddr::Ip(*self)) + } +} + +impl ToTargetAddr for SocketAddrV4 { + fn to_target_addr(&self) -> io::Result { + SocketAddr::V4(*self).to_target_addr() + } +} + +impl ToTargetAddr for SocketAddrV6 { + fn to_target_addr(&self) -> io::Result { + SocketAddr::V6(*self).to_target_addr() + } +} + +impl ToTargetAddr for (Ipv4Addr, u16) { + fn to_target_addr(&self) -> io::Result { + SocketAddrV4::new(self.0, self.1).to_target_addr() + } +} + +impl ToTargetAddr for (Ipv6Addr, u16) { + fn to_target_addr(&self) -> io::Result { + SocketAddrV6::new(self.0, self.1, 0, 0).to_target_addr() + } +} + +#[derive(Debug)] +pub enum Addr { + V4([u8; 4]), + V6([u8; 16]), + Domain(String), // Vec<[u8]> or Box<[u8]> or String ? +} + +/// This function is used by the client & the server +pub async fn read_address( + stream: &mut T, + atyp: u8, +) -> anyhow::Result { + let addr = match atyp { + consts::SOCKS5_ADDR_TYPE_IPV4 => { + debug!("Address type `IPv4`"); + Addr::V4(read_exact!(stream, [0u8; 4]).context(AddrError::IPv4Unreadable)?) + } + consts::SOCKS5_ADDR_TYPE_IPV6 => { + debug!("Address type `IPv6`"); + Addr::V6(read_exact!(stream, [0u8; 16]).context(AddrError::IPv6Unreadable)?) + } + consts::SOCKS5_ADDR_TYPE_DOMAIN_NAME => { + debug!("Address type `domain`"); + let len = read_exact!(stream, [0]).context(AddrError::DomainLenUnreadable)?[0]; + let domain = read_exact!(stream, vec![0u8; len as usize]) + .context(AddrError::DomainContentUnreadable)?; + // make sure the bytes are correct utf8 string + let domain = String::from_utf8(domain).context(AddrError::Utf8)?; + + Addr::Domain(domain) + } + _ => return Err(anyhow::anyhow!(AddrError::IncorrectAddressType)), + }; + + // Find port number + let port = read_exact!(stream, [0u8; 2]).context(AddrError::PortNumberUnreadable)?; + // Convert (u8 * 2) into u16 + let port = (port[0] as u16) << 8 | port[1] as u16; + + // Merge ADDRESS + PORT into a TargetAddr + let addr: TargetAddr = match addr { + Addr::V4([a, b, c, d]) => (Ipv4Addr::new(a, b, c, d), port).to_target_addr()?, + Addr::V6(x) => (Ipv6Addr::from(x), port).to_target_addr()?, + Addr::Domain(domain) => TargetAddr::Domain(domain, port), + }; + + Ok(addr) +} diff --git a/easytier/src/gateway/mod.rs b/easytier/src/gateway/mod.rs index 433e0c7..72c1edd 100644 --- a/easytier/src/gateway/mod.rs +++ b/easytier/src/gateway/mod.rs @@ -9,6 +9,12 @@ pub mod tcp_proxy; #[cfg(feature = "smoltcp")] pub mod tokio_smoltcp; pub mod udp_proxy; + +#[cfg(feature = "socks5")] +pub mod fast_socks5; +#[cfg(feature = "socks5")] +pub mod socks5; + #[derive(Debug)] struct CidrSet { global_ctx: ArcGlobalCtx, diff --git a/easytier/src/gateway/socks5.rs b/easytier/src/gateway/socks5.rs new file mode 100644 index 0000000..c32c545 --- /dev/null +++ b/easytier/src/gateway/socks5.rs @@ -0,0 +1,330 @@ +use std::{ + net::{Ipv4Addr, SocketAddr}, + sync::Arc, + time::Duration, +}; + +use crate::{ + gateway::{ + fast_socks5::server::{ + AcceptAuthentication, AsyncTcpConnector, Config, SimpleUserPassword, Socks5Socket, + }, + tokio_smoltcp::TcpStream, + }, + tunnel::packet_def::PacketType, +}; +use anyhow::Context; +use dashmap::DashSet; +use pnet::packet::{ip::IpNextHeaderProtocols, ipv4::Ipv4Packet, tcp::TcpPacket, Packet}; +use tokio::select; +use tokio::{ + net::TcpListener, + sync::{mpsc, Mutex}, + task::JoinSet, + time::timeout, +}; + +use crate::{ + common::{error::Error, global_ctx::GlobalCtx}, + gateway::tokio_smoltcp::{channel_device, Net, NetConfig}, + peers::{peer_manager::PeerManager, PeerPacketFilter}, + tunnel::packet_def::ZCPacket, +}; + +#[derive(Debug, Eq, PartialEq, Hash, Clone)] +struct Socks5Entry { + src: SocketAddr, + dst: SocketAddr, +} + +type Socks5EntrySet = Arc>; + +struct Socks5ServerNet { + ipv4_addr: Ipv4Addr, + auth: Option, + + smoltcp_net: Arc, + forward_tasks: Arc>>, + + entries: Socks5EntrySet, +} + +impl Socks5ServerNet { + pub fn new( + ipv4_addr: Ipv4Addr, + auth: Option, + peer_manager: Arc, + packet_recv: Arc>>, + entries: Socks5EntrySet, + ) -> Self { + let mut forward_tasks = JoinSet::new(); + let mut cap = smoltcp::phy::DeviceCapabilities::default(); + cap.max_transmission_unit = 1280; + cap.medium = smoltcp::phy::Medium::Ip; + let (dev, stack_sink, mut stack_stream) = channel_device::ChannelDevice::new(cap); + + let packet_recv = packet_recv.clone(); + forward_tasks.spawn(async move { + let mut smoltcp_stack_receiver = packet_recv.lock().await; + while let Some(packet) = smoltcp_stack_receiver.recv().await { + tracing::trace!(?packet, "receive from peer send to smoltcp packet"); + if let Err(e) = stack_sink.send(Ok(packet.payload().to_vec())).await { + tracing::error!("send to smoltcp stack failed: {:?}", e); + } + } + tracing::error!("smoltcp stack sink exited"); + panic!("smoltcp stack sink exited"); + }); + + forward_tasks.spawn(async move { + while let Some(data) = stack_stream.recv().await { + tracing::trace!( + ?data, + "receive from smoltcp stack and send to peer mgr packet" + ); + let Some(ipv4) = Ipv4Packet::new(&data) else { + tracing::error!(?data, "smoltcp stack stream get non ipv4 packet"); + continue; + }; + + let dst = ipv4.get_destination(); + let packet = ZCPacket::new_with_payload(&data); + if let Err(e) = peer_manager.send_msg_ipv4(packet, dst).await { + tracing::error!("send to peer failed in smoltcp sender: {:?}", e); + } + } + tracing::error!("smoltcp stack stream exited"); + panic!("smoltcp stack stream exited"); + }); + + let interface_config = smoltcp::iface::Config::new(smoltcp::wire::HardwareAddress::Ip); + let net = Net::new( + dev, + NetConfig::new( + interface_config, + format!("{}/24", ipv4_addr).parse().unwrap(), + vec![format!("{}", ipv4_addr).parse().unwrap()], + ), + ); + + Self { + ipv4_addr, + auth, + + smoltcp_net: Arc::new(net), + forward_tasks: Arc::new(std::sync::Mutex::new(forward_tasks)), + + entries, + } + } + + fn handle_tcp_stream(&self, stream: tokio::net::TcpStream) { + let mut config = Config::::default(); + config.set_request_timeout(10); + config.set_skip_auth(false); + config.set_allow_no_auth(true); + + struct SmolTcpConnector( + Arc, + Socks5EntrySet, + std::sync::Mutex>, + ); + + #[async_trait::async_trait] + impl AsyncTcpConnector for SmolTcpConnector { + type S = TcpStream; + + async fn tcp_connect( + &self, + addr: SocketAddr, + timeout_s: u64, + ) -> crate::gateway::fast_socks5::Result { + let port = self.0.get_port(); + + let entry = Socks5Entry { + src: SocketAddr::new(self.0.get_address(), port), + dst: addr, + }; + *self.2.lock().unwrap() = Some(entry.clone()); + self.1.insert(entry); + + let remote_socket = timeout( + Duration::from_secs(timeout_s), + self.0.tcp_connect(addr, port), + ) + .await + .with_context(|| "connect to remote timeout")?; + + remote_socket.map_err(|e| super::fast_socks5::SocksError::Other(e.into())) + } + } + + impl Drop for SmolTcpConnector { + fn drop(&mut self) { + if let Some(entry) = self.2.lock().unwrap().take() { + self.1.remove(&entry); + } + } + } + + let socket = Socks5Socket::new( + stream, + Arc::new(config), + SmolTcpConnector( + self.smoltcp_net.clone(), + self.entries.clone(), + std::sync::Mutex::new(None), + ), + ); + + self.forward_tasks.lock().unwrap().spawn(async move { + match socket.upgrade_to_socks5().await { + Ok(_) => { + tracing::info!("socks5 handle success"); + } + Err(e) => { + tracing::error!("socks5 handshake failed: {:?}", e); + } + }; + }); + } +} + +pub struct Socks5Server { + global_ctx: Arc, + peer_manager: Arc, + auth: Option, + + tasks: Arc>>, + packet_sender: mpsc::Sender, + packet_recv: Arc>>, + + net: Arc>>, + entries: Socks5EntrySet, +} + +#[async_trait::async_trait] +impl PeerPacketFilter for Socks5Server { + async fn try_process_packet_from_peer(&self, packet: ZCPacket) -> Option { + let hdr = packet.peer_manager_header().unwrap(); + if hdr.packet_type != PacketType::Data as u8 { + return Some(packet); + }; + + let payload_bytes = packet.payload(); + + let ipv4 = Ipv4Packet::new(payload_bytes).unwrap(); + if ipv4.get_version() != 4 || ipv4.get_next_level_protocol() != IpNextHeaderProtocols::Tcp { + return Some(packet); + } + + let tcp_packet = TcpPacket::new(ipv4.payload()).unwrap(); + let entry = Socks5Entry { + dst: SocketAddr::new(ipv4.get_source().into(), tcp_packet.get_source()), + src: SocketAddr::new(ipv4.get_destination().into(), tcp_packet.get_destination()), + }; + + if !self.entries.contains(&entry) { + return Some(packet); + } + + let _ = self.packet_sender.try_send(packet).ok(); + return None; + } +} + +impl Socks5Server { + pub fn new( + global_ctx: Arc, + peer_manager: Arc, + auth: Option, + ) -> Arc { + let (packet_sender, packet_recv) = mpsc::channel(1024); + Arc::new(Self { + global_ctx, + peer_manager, + auth, + + tasks: Arc::new(Mutex::new(JoinSet::new())), + packet_recv: Arc::new(Mutex::new(packet_recv)), + packet_sender, + + net: Arc::new(Mutex::new(None)), + entries: Arc::new(DashSet::new()), + }) + } + + async fn run_net_update_task(self: &Arc) { + let net = self.net.clone(); + let global_ctx = self.global_ctx.clone(); + let peer_manager = self.peer_manager.clone(); + let packet_recv = self.packet_recv.clone(); + let entries = self.entries.clone(); + self.tasks.lock().await.spawn(async move { + let mut prev_ipv4 = None; + loop { + let mut event_recv = global_ctx.subscribe(); + + let cur_ipv4 = global_ctx.get_ipv4(); + if prev_ipv4 != cur_ipv4 { + prev_ipv4 = cur_ipv4; + entries.clear(); + + if cur_ipv4.is_none() { + let _ = net.lock().await.take(); + } else { + net.lock().await.replace(Socks5ServerNet::new( + cur_ipv4.unwrap(), + None, + peer_manager.clone(), + packet_recv.clone(), + entries.clone(), + )); + } + } + + select! { + _ = event_recv.recv() => {} + _ = tokio::time::sleep(Duration::from_secs(120)) => {} + } + } + }); + } + + pub async fn run(self: &Arc) -> Result<(), Error> { + let Some(proxy_url) = self.global_ctx.config.get_socks5_portal() else { + return Ok(()); + }; + + let bind_addr = format!( + "{}:{}", + proxy_url.host_str().unwrap(), + proxy_url.port().unwrap() + ); + + let listener = TcpListener::bind(bind_addr.parse::().unwrap()).await?; + + self.peer_manager + .add_packet_process_pipeline(Box::new(self.clone())) + .await; + + self.run_net_update_task().await; + + let net = self.net.clone(); + self.tasks.lock().await.spawn(async move { + loop { + match listener.accept().await { + Ok((socket, _addr)) => { + tracing::info!("accept a new connection, {:?}", socket); + if let Some(net) = net.lock().await.as_ref() { + net.handle_tcp_stream(socket); + } + } + Err(err) => tracing::error!("accept error = {:?}", err), + } + } + }); + + Ok(()) + } +} diff --git a/easytier/src/gateway/tokio_smoltcp/mod.rs b/easytier/src/gateway/tokio_smoltcp/mod.rs index 5de00ed..58805d0 100644 --- a/easytier/src/gateway/tokio_smoltcp/mod.rs +++ b/easytier/src/gateway/tokio_smoltcp/mod.rs @@ -4,7 +4,7 @@ use std::{ io, - net::{Ipv4Addr, Ipv6Addr, SocketAddr}, + net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, sync::{ atomic::{AtomicU16, Ordering}, Arc, @@ -134,7 +134,10 @@ impl Net { fut, ) } - fn get_port(&self) -> u16 { + pub fn get_address(&self) -> IpAddr { + self.ip_addr.address().into() + } + pub fn get_port(&self) -> u16 { self.from_port .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| { Some(if x > 60000 { 10000 } else { x + 1 }) @@ -147,10 +150,10 @@ impl Net { TcpListener::new(self.reactor.clone(), addr.into()).await } /// Opens a TCP connection to a remote host. - pub async fn tcp_connect(&self, addr: SocketAddr) -> io::Result { + pub async fn tcp_connect(&self, addr: SocketAddr, local_port: u16) -> io::Result { TcpStream::connect( self.reactor.clone(), - (self.ip_addr.address(), self.get_port()).into(), + (self.ip_addr.address(), local_port).into(), addr.into(), ) .await diff --git a/easytier/src/instance/instance.rs b/easytier/src/instance/instance.rs index e6b35bd..a17d1d2 100644 --- a/easytier/src/instance/instance.rs +++ b/easytier/src/instance/instance.rs @@ -32,6 +32,9 @@ use crate::vpn_portal::{self, VpnPortal}; use super::listeners::ListenerManager; +#[cfg(feature = "socks5")] +use crate::gateway::socks5::Socks5Server; + #[derive(Clone)] struct IpProxy { tcp_proxy: Arc, @@ -116,6 +119,9 @@ pub struct Instance { vpn_portal: Arc>>, + #[cfg(feature = "socks5")] + socks5_server: Arc, + global_ctx: ArcGlobalCtx, } @@ -161,6 +167,9 @@ impl Instance { #[cfg(not(feature = "wireguard"))] let vpn_portal_inst = vpn_portal::NullVpnPortal; + #[cfg(feature = "socks5")] + let socks5_server = Socks5Server::new(global_ctx.clone(), peer_manager.clone(), None); + Instance { inst_name: global_ctx.inst_name.clone(), id, @@ -181,6 +190,9 @@ impl Instance { vpn_portal: Arc::new(Mutex::new(Box::new(vpn_portal_inst))), + #[cfg(feature = "socks5")] + socks5_server, + global_ctx, } } @@ -387,6 +399,9 @@ impl Instance { self.run_vpn_portal().await?; } + #[cfg(feature = "socks5")] + self.socks5_server.run().await?; + Ok(()) }