Update On Fri Sep 20 20:34:48 CEST 2024

This commit is contained in:
github-action[bot]
2024-09-20 20:34:48 +02:00
parent 0bc2f74993
commit 94e24d57cc
91 changed files with 1699 additions and 917 deletions

View File

@@ -24,7 +24,7 @@ use std::{
))]
use futures::future;
use futures::ready;
use pin_project::pin_project;
#[cfg(any(
target_os = "linux",
target_os = "android",
@@ -86,9 +86,7 @@ fn make_mtu_error(packet_size: usize, mtu: usize) -> io::Error {
/// Wrappers for outbound `UdpSocket`
#[derive(Debug)]
#[pin_project]
pub struct UdpSocket {
#[pin]
socket: tokio::net::UdpSocket,
mtu: Option<usize>,
}

View File

@@ -0,0 +1,209 @@
use std::{
future::Future,
io,
net::SocketAddr,
ops::Deref,
pin::Pin,
task::{Context, Poll},
};
use futures::ready;
use pin_project::pin_project;
use tokio::io::ReadBuf;
use crate::net::UdpSocket;
/// A socket I/O object that can transport datagram
pub trait DatagramSocket {
/// Local binded address
fn local_addr(&self) -> io::Result<SocketAddr>;
}
/// A socket I/O object that can receive datagram
pub trait DatagramReceive {
/// `recv` data into `buf`
fn poll_recv(&self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>>;
/// `recv` data into `buf` with source address
fn poll_recv_from(&self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<SocketAddr>>;
/// Check if the underlying I/O object is ready for `recv`
fn poll_recv_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>;
}
/// A socket I/O object that can send datagram
pub trait DatagramSend {
/// `send` data with `buf`, returning the sent bytes
fn poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>>;
/// `send` data with `buf` to `target`, returning the sent bytes
fn poll_send_to(&self, cx: &mut Context<'_>, buf: &[u8], target: SocketAddr) -> Poll<io::Result<usize>>;
/// Check if the underlying I/O object is ready for `send`
fn poll_send_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>;
}
impl DatagramSocket for UdpSocket {
fn local_addr(&self) -> io::Result<SocketAddr> {
self.deref().local_addr()
}
}
impl DatagramReceive for UdpSocket {
fn poll_recv(&self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
UdpSocket::poll_recv(self, cx, buf)
}
fn poll_recv_from(&self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<SocketAddr>> {
UdpSocket::poll_recv_from(self, cx, buf)
}
fn poll_recv_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.deref().poll_recv_ready(cx)
}
}
impl DatagramSend for UdpSocket {
fn poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
UdpSocket::poll_send(self, cx, buf)
}
fn poll_send_to(&self, cx: &mut Context<'_>, buf: &[u8], target: SocketAddr) -> Poll<io::Result<usize>> {
UdpSocket::poll_send_to(self, cx, buf, target)
}
fn poll_send_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.deref().poll_send_ready(cx)
}
}
/// Future for `recv`
#[pin_project]
pub struct RecvFut<'a, S: DatagramReceive + ?Sized> {
#[pin]
io: &'a S,
buf: &'a mut [u8],
}
impl<'a, S: DatagramReceive + ?Sized> Future for RecvFut<'a, S> {
type Output = io::Result<usize>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let mut read_buf = ReadBuf::new(this.buf);
ready!(this.io.poll_recv(cx, &mut read_buf))?;
Ok(read_buf.filled().len()).into()
}
}
/// Future for `recv_from`
#[pin_project]
pub struct RecvFromFut<'a, S: DatagramReceive + ?Sized> {
#[pin]
io: &'a S,
buf: &'a mut [u8],
}
impl<'a, S: DatagramReceive + ?Sized> Future for RecvFromFut<'a, S> {
type Output = io::Result<(usize, SocketAddr)>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let mut read_buf = ReadBuf::new(this.buf);
let src_addr = ready!(this.io.poll_recv_from(cx, &mut read_buf))?;
Ok((read_buf.filled().len(), src_addr)).into()
}
}
/// Future for `recv_ready`
pub struct RecvReadyFut<'a, S: DatagramReceive + ?Sized> {
io: &'a S,
}
impl<'a, S: DatagramReceive + ?Sized> Future for RecvReadyFut<'a, S> {
type Output = io::Result<()>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.io.poll_recv_ready(cx)
}
}
/// Future for `send`
pub struct SendFut<'a, S: DatagramSend + ?Sized> {
io: &'a S,
buf: &'a [u8],
}
impl<'a, S: DatagramSend + ?Sized> Future for SendFut<'a, S> {
type Output = io::Result<usize>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.io.poll_send(cx, self.buf)
}
}
/// Future for `send_to`
pub struct SendToFut<'a, S: DatagramSend + ?Sized> {
io: &'a S,
target: SocketAddr,
buf: &'a [u8],
}
impl<'a, S: DatagramSend + ?Sized> Future for SendToFut<'a, S> {
type Output = io::Result<usize>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.io.poll_send_to(cx, self.buf, self.target)
}
}
/// Future for `recv_ready`
pub struct SendReadyFut<'a, S: DatagramSend + ?Sized> {
io: &'a S,
}
impl<'a, S: DatagramSend + ?Sized> Future for SendReadyFut<'a, S> {
type Output = io::Result<()>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.io.poll_send_ready(cx)
}
}
/// Extension methods for `DatagramReceive`
pub trait DatagramReceiveExt: DatagramReceive {
/// Async method for `poll_recv`
fn recv<'a, 'b>(&'a self, buf: &'a mut [u8]) -> RecvFut<'a, Self> {
RecvFut { io: self, buf }
}
/// Async method for `poll_recv_from`
fn recv_from<'a, 'b>(&'a self, buf: &'a mut [u8]) -> RecvFromFut<'a, Self> {
RecvFromFut { io: self, buf }
}
/// Async method for `poll_recv_ready`
fn recv_ready<'a>(&'a self) -> RecvReadyFut<'a, Self> {
RecvReadyFut { io: self }
}
}
impl<S: DatagramReceive> DatagramReceiveExt for S {}
/// Extension methods for `DatagramSend`
pub trait DatagramSendExt: DatagramSend {
/// Async method for `poll_send`
fn send<'a>(&'a self, buf: &'a [u8]) -> SendFut<'a, Self> {
SendFut { io: self, buf }
}
/// Async method for `poll_send_to`
fn send_to<'a>(&'a self, buf: &'a [u8], target: SocketAddr) -> SendToFut<'a, Self> {
SendToFut { io: self, target, buf }
}
/// Async method for `poll_send_ready`
fn send_ready<'a>(&'a self) -> SendReadyFut<'a, Self> {
SendReadyFut { io: self }
}
}
impl<S: DatagramSend> DatagramSendExt for S {}

View File

@@ -50,10 +50,12 @@
use std::time::Duration;
pub use self::proxy_socket::ProxySocket;
pub use compat::{DatagramReceive, DatagramReceiveExt, DatagramSend, DatagramSendExt, DatagramSocket};
mod aead;
#[cfg(feature = "aead-cipher-2022")]
mod aead_2022;
mod compat;
pub mod crypto_io;
pub mod options;
pub mod proxy_socket;

View File

@@ -12,7 +12,7 @@ use byte_string::ByteStr;
use bytes::{Bytes, BytesMut};
use log::{info, trace, warn};
use once_cell::sync::Lazy;
use tokio::{io::ReadBuf, net::ToSocketAddrs, time};
use tokio::{io::ReadBuf, time};
use crate::{
config::{ServerAddr, ServerConfig, ServerUserManager},
@@ -22,9 +22,12 @@ use crate::{
relay::{socks5::Address, udprelay::options::UdpSocketControlData},
};
use super::crypto_io::{
decrypt_client_payload, decrypt_server_payload, encrypt_client_payload, encrypt_server_payload, ProtocolError,
ProtocolResult,
use super::{
compat::{DatagramReceive, DatagramReceiveExt, DatagramSend, DatagramSendExt, DatagramSocket},
crypto_io::{
decrypt_client_payload, decrypt_server_payload, encrypt_client_payload, encrypt_server_payload, ProtocolError,
ProtocolResult,
},
};
#[cfg(unix)]
@@ -70,9 +73,9 @@ pub type ProxySocketResult<T> = Result<T, ProxySocketError>;
/// UDP client for communicating with ShadowSocks' server
#[derive(Debug)]
pub struct ProxySocket {
pub struct ProxySocket<S> {
socket_type: UdpSocketType,
socket: ShadowUdpSocket,
io: S,
method: CipherKind,
key: Box<[u8]>,
send_timeout: Option<Duration>,
@@ -82,9 +85,12 @@ pub struct ProxySocket {
user_manager: Option<Arc<ServerUserManager>>,
}
impl ProxySocket {
impl ProxySocket<ShadowUdpSocket> {
/// Create a client to communicate with Shadowsocks' UDP server (outbound)
pub async fn connect(context: SharedContext, svr_cfg: &ServerConfig) -> ProxySocketResult<ProxySocket> {
pub async fn connect(
context: SharedContext,
svr_cfg: &ServerConfig,
) -> ProxySocketResult<ProxySocket<ShadowUdpSocket>> {
ProxySocket::connect_with_opts(context, svr_cfg, &DEFAULT_CONNECT_OPTS)
.await
.map_err(Into::into)
@@ -95,7 +101,7 @@ impl ProxySocket {
context: SharedContext,
svr_cfg: &ServerConfig,
opts: &ConnectOpts,
) -> ProxySocketResult<ProxySocket> {
) -> ProxySocketResult<ProxySocket<ShadowUdpSocket>> {
// Note: Plugins doesn't support UDP relay
let socket = ShadowUdpSocket::connect_server_with_opts(&context, svr_cfg.udp_external_addr(), opts).await?;
@@ -115,42 +121,11 @@ impl ProxySocket {
))
}
/// Create a `ProxySocket` from a `UdpSocket`
pub fn from_socket<S>(
socket_type: UdpSocketType,
/// Create a `ProxySocket` binding to a specific address (inbound)
pub async fn bind(
context: SharedContext,
svr_cfg: &ServerConfig,
socket: S,
) -> ProxySocket
where
S: Into<ShadowUdpSocket>,
{
let key = svr_cfg.key().to_vec().into_boxed_slice();
let method = svr_cfg.method();
// NOTE: svr_cfg.timeout() is not for this socket, but for associations.
ProxySocket {
socket_type,
socket: socket.into(),
method,
key,
send_timeout: None,
recv_timeout: None,
context,
identity_keys: match socket_type {
UdpSocketType::Client => svr_cfg.clone_identity_keys(),
UdpSocketType::Server => Arc::new(Vec::new()),
},
user_manager: match socket_type {
UdpSocketType::Client => None,
UdpSocketType::Server => svr_cfg.clone_user_manager(),
},
}
}
/// Create a `ProxySocket` binding to a specific address (inbound)
pub async fn bind(context: SharedContext, svr_cfg: &ServerConfig) -> ProxySocketResult<ProxySocket> {
) -> ProxySocketResult<ProxySocket<ShadowUdpSocket>> {
ProxySocket::bind_with_opts(context, svr_cfg, AcceptOpts::default())
.await
.map_err(Into::into)
@@ -161,7 +136,7 @@ impl ProxySocket {
context: SharedContext,
svr_cfg: &ServerConfig,
opts: AcceptOpts,
) -> ProxySocketResult<ProxySocket> {
) -> ProxySocketResult<ProxySocket<ShadowUdpSocket>> {
// Plugins doesn't support UDP
let socket = match svr_cfg.udp_external_addr() {
ServerAddr::SocketAddr(sa) => ShadowUdpSocket::listen_with_opts(sa, opts).await?,
@@ -179,7 +154,54 @@ impl ProxySocket {
socket,
))
}
}
impl<S> ProxySocket<S> {
/// Create a `ProxySocket` from a I/O object that impls `DatagramTransport`
pub fn from_socket(
socket_type: UdpSocketType,
context: SharedContext,
svr_cfg: &ServerConfig,
socket: S,
) -> ProxySocket<S> {
let key = svr_cfg.key().to_vec().into_boxed_slice();
let method = svr_cfg.method();
// NOTE: svr_cfg.timeout() is not for this socket, but for associations.
ProxySocket {
socket_type,
io: socket,
method,
key,
send_timeout: None,
recv_timeout: None,
context,
identity_keys: match socket_type {
UdpSocketType::Client => svr_cfg.clone_identity_keys(),
UdpSocketType::Server => Arc::new(Vec::new()),
},
user_manager: match socket_type {
UdpSocketType::Client => None,
UdpSocketType::Server => svr_cfg.clone_user_manager(),
},
}
}
/// Set `send` timeout, `None` will clear timeout
pub fn set_send_timeout(&mut self, t: Option<Duration>) {
self.send_timeout = t;
}
/// Set `recv` timeout, `None` will clear timeout
pub fn set_recv_timeout(&mut self, t: Option<Duration>) {
self.recv_timeout = t;
}
}
impl<S> ProxySocket<S>
where
S: DatagramSend,
{
fn encrypt_send_buffer(
&self,
addr: &Address,
@@ -241,8 +263,8 @@ impl ProxySocket {
);
let send_len = match self.send_timeout {
None => self.socket.send(&send_buf).await?,
Some(d) => match time::timeout(d, self.socket.send(&send_buf)).await {
None => self.io.send(&send_buf).await?,
Some(d) => match time::timeout(d, self.io.send(&send_buf)).await {
Ok(Ok(l)) => l,
Ok(Err(err)) => return Err(err.into()),
Err(..) => return Err(io::Error::from(ErrorKind::TimedOut).into()),
@@ -295,7 +317,7 @@ impl ProxySocket {
let n_send_buf = send_buf.len();
match self.socket.poll_send(cx, &send_buf).map_err(|x| x.into()) {
match self.io.poll_send(cx, &send_buf).map_err(|x| x.into()) {
Poll::Ready(Ok(l)) => {
if l == n_send_buf {
Poll::Ready(Ok(payload.len()))
@@ -340,14 +362,14 @@ impl ProxySocket {
self.encrypt_send_buffer(addr, control, &self.identity_keys, payload, &mut send_buf)?;
info!(
"UDP server client send to {}, payload length {} bytes, packet length {} bytes",
"UDP server client poll_send_to to {}, payload length {} bytes, packet length {} bytes",
target,
payload.len(),
send_buf.len()
);
let n_send_buf = send_buf.len();
match self.socket.poll_send_to(cx, &send_buf, target).map_err(|x| x.into()) {
match self.io.poll_send_to(cx, &send_buf, target).map_err(|x| x.into()) {
Poll::Ready(Ok(l)) => {
if l == n_send_buf {
Poll::Ready(Ok(payload.len()))
@@ -363,25 +385,20 @@ impl ProxySocket {
///
/// Check if socket is ready to `send`, or writable.
pub fn poll_send_ready(&self, cx: &mut Context<'_>) -> Poll<ProxySocketResult<()>> {
self.socket.poll_send_ready(cx).map_err(|x| x.into())
self.io.poll_send_ready(cx).map_err(|x| x.into())
}
/// Send a UDP packet to target through proxy `target`
pub async fn send_to<A: ToSocketAddrs>(
&self,
target: A,
addr: &Address,
payload: &[u8],
) -> ProxySocketResult<usize> {
pub async fn send_to(&self, target: SocketAddr, addr: &Address, payload: &[u8]) -> ProxySocketResult<usize> {
self.send_to_with_ctrl(target, addr, &DEFAULT_SOCKET_CONTROL, payload)
.await
.map_err(Into::into)
}
/// Send a UDP packet to target through proxy `target`
pub async fn send_to_with_ctrl<A: ToSocketAddrs>(
pub async fn send_to_with_ctrl(
&self,
target: A,
target: SocketAddr,
addr: &Address,
control: &UdpSocketControlData,
payload: &[u8],
@@ -390,7 +407,7 @@ impl ProxySocket {
self.encrypt_send_buffer(addr, control, &self.identity_keys, payload, &mut send_buf)?;
trace!(
"UDP server client send to, addr {}, control: {:?}, payload length {} bytes, packet length {} bytes",
"UDP server client send_to to, addr {}, control: {:?}, payload length {} bytes, packet length {} bytes",
addr,
control,
payload.len(),
@@ -398,8 +415,8 @@ impl ProxySocket {
);
let send_len = match self.send_timeout {
None => self.socket.send_to(&send_buf, target).await?,
Some(d) => match time::timeout(d, self.socket.send_to(&send_buf, target)).await {
None => self.io.send_to(&send_buf, target).await?,
Some(d) => match time::timeout(d, self.io.send_to(&send_buf, target)).await {
Ok(Ok(l)) => l,
Ok(Err(err)) => return Err(err.into()),
Err(..) => return Err(io::Error::from(ErrorKind::TimedOut).into()),
@@ -408,7 +425,7 @@ impl ProxySocket {
if send_buf.len() != send_len {
warn!(
"UDP server client send {} bytes, but actually sent {} bytes",
"UDP server client send_to {} bytes, but actually sent {} bytes",
send_buf.len(),
send_len
);
@@ -416,7 +433,12 @@ impl ProxySocket {
Ok(send_len)
}
}
impl<S> ProxySocket<S>
where
S: DatagramReceive,
{
fn decrypt_recv_buffer(
&self,
recv_buf: &mut [u8],
@@ -448,10 +470,9 @@ impl ProxySocket {
&self,
recv_buf: &mut [u8],
) -> ProxySocketResult<(usize, Address, usize, Option<UdpSocketControlData>)> {
// Waiting for response from server SERVER -> CLIENT
let recv_n = match self.recv_timeout {
None => self.socket.recv(recv_buf).await?,
Some(d) => match time::timeout(d, self.socket.recv(recv_buf)).await {
None => self.io.recv(recv_buf).await?,
Some(d) => match time::timeout(d, self.io.recv(recv_buf)).await {
Ok(Ok(l)) => l,
Ok(Err(err)) => return Err(err.into()),
Err(..) => return Err(io::Error::from(ErrorKind::TimedOut).into()),
@@ -498,8 +519,8 @@ impl ProxySocket {
) -> ProxySocketResult<(usize, SocketAddr, Address, usize, Option<UdpSocketControlData>)> {
// Waiting for response from server SERVER -> CLIENT
let (recv_n, target_addr) = match self.recv_timeout {
None => self.socket.recv_from(recv_buf).await?,
Some(d) => match time::timeout(d, self.socket.recv_from(recv_buf)).await {
None => self.io.recv_from(recv_buf).await?,
Some(d) => match time::timeout(d, self.io.recv_from(recv_buf)).await {
Ok(Ok(l)) => l,
Ok(Err(err)) => return Err(err.into()),
Err(..) => return Err(io::Error::from(ErrorKind::TimedOut).into()),
@@ -542,7 +563,7 @@ impl ProxySocket {
cx: &mut Context<'_>,
recv_buf: &mut ReadBuf,
) -> Poll<ProxySocketResult<(usize, Address, usize, Option<UdpSocketControlData>)>> {
ready!(self.socket.poll_recv(cx, recv_buf))?;
ready!(self.io.poll_recv(cx, recv_buf))?;
let n_recv = recv_buf.filled().len();
@@ -570,7 +591,7 @@ impl ProxySocket {
cx: &mut Context<'_>,
recv_buf: &mut ReadBuf,
) -> Poll<ProxySocketResult<(usize, SocketAddr, Address, usize, Option<UdpSocketControlData>)>> {
let src = ready!(self.socket.poll_recv_from(cx, recv_buf))?;
let src = ready!(self.io.poll_recv_from(cx, recv_buf))?;
let n_recv = recv_buf.filled().len();
match self.decrypt_recv_buffer(recv_buf.filled_mut(), self.user_manager.as_deref()) {
@@ -581,29 +602,27 @@ impl ProxySocket {
/// poll family functions
pub fn poll_recv_ready(&self, cx: &mut Context<'_>) -> Poll<ProxySocketResult<()>> {
self.socket.poll_recv_ready(cx).map_err(|x| x.into())
self.io.poll_recv_ready(cx).map_err(|x| x.into())
}
}
impl<S> ProxySocket<S>
where
S: DatagramSocket,
{
/// Get local addr of socket
pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.socket.local_addr()
}
/// Set `send` timeout, `None` will clear timeout
pub fn set_send_timeout(&mut self, t: Option<Duration>) {
self.send_timeout = t;
}
/// Set `recv` timeout, `None` will clear timeout
pub fn set_recv_timeout(&mut self, t: Option<Duration>) {
self.recv_timeout = t;
self.io.local_addr()
}
}
#[cfg(unix)]
impl AsRawFd for ProxySocket {
impl<S> AsRawFd for ProxySocket<S>
where
S: AsRawFd,
{
/// Retrieve raw fd of the outbound socket
fn as_raw_fd(&self) -> RawFd {
self.socket.as_raw_fd()
self.io.as_raw_fd()
}
}