diff --git a/cmonitor/client/ClientSignInTransfer.cs b/cmonitor/client/ClientSignInTransfer.cs index de0da6d8..587b9a53 100644 --- a/cmonitor/client/ClientSignInTransfer.cs +++ b/cmonitor/client/ClientSignInTransfer.cs @@ -137,14 +137,8 @@ namespace cmonitor.client { Socket socket = new Socket(remote.Address.AddressFamily, SocketType.Stream, ProtocolType.Tcp); socket.KeepAlive(); - IAsyncResult result = socket.BeginConnect(remote, null, null); - await Task.Delay(500); - if (result.IsCompleted == false) - { - socket.SafeClose(); - return false; - } - clientSignInState.Connection = await tcpServer.BindReceive(socket); + await socket.ConnectAsync(remote).WaitAsync(TimeSpan.FromMilliseconds(5000)).ConfigureAwait(false); + clientSignInState.Connection = await tcpServer.BeginReceive(socket); return true; } private async Task SignIn2Server() diff --git a/cmonitor/client/tunnel/ITunnelConnection.cs b/cmonitor/client/tunnel/ITunnelConnection.cs index bc48e0b9..6474e572 100644 --- a/cmonitor/client/tunnel/ITunnelConnection.cs +++ b/cmonitor/client/tunnel/ITunnelConnection.cs @@ -1,21 +1,24 @@ using common.libs; using common.libs.extends; +using System.Buffers; +using System.IO.Pipelines; using System.Net; using System.Net.Security; -using System.Net.Sockets; using System.Text.Json.Serialization; namespace cmonitor.client.tunnel { - public delegate Task TunnelReceivceCallback(ITunnelConnection connection, Memory data, object state); - public delegate Task TunnelCloseCallback(ITunnelConnection connection, object state); - public enum TunnelProtocolType : byte { Tcp = 1, Udp = 2, Quic = 4, } + public enum TunnelMode : byte + { + Client = 0, + Server = 1, + } public enum TunnelType : byte { P2P = 0, @@ -27,9 +30,9 @@ namespace cmonitor.client.tunnel Reverse = 1 } - public interface ITunnelConnectionCallback + public interface ITunnelConnectionReceiveCallback { - public Task Receive(ITunnelConnection connection, Memory data, object state); + public Task Receive(ITunnelConnection connection, ReadOnlyMemory data, object state); public Task Closed(ITunnelConnection connection, object state); } @@ -39,6 +42,7 @@ namespace cmonitor.client.tunnel public string TransactionId { get; } public string TransportName { get; } public string Label { get; } + public TunnelMode Mode { get; } public TunnelType Type { get; } public TunnelProtocolType ProtocolType { get; } public TunnelDirection Direction { get; } @@ -46,8 +50,8 @@ namespace cmonitor.client.tunnel public bool Connected { get; } - public Task SendAsync(Memory data, CancellationToken cancellationToken = default); - public void BeginReceive(ITunnelConnectionCallback callback, object userToken); + public Task SendAsync(ReadOnlyMemory data, CancellationToken cancellationToken = default); + public void BeginReceive(ITunnelConnectionReceiveCallback callback, object userToken, bool byFrame = true); public void Close(); @@ -63,136 +67,167 @@ namespace cmonitor.client.tunnel public string TransportName { get; init; } public string Label { get; init; } - + public TunnelMode Mode { get; init; } public TunnelProtocolType ProtocolType { get; init; } public TunnelType Type { get; init; } public TunnelDirection Direction { get; init; } - public IPEndPoint IPEndPoint => (Socket?.RemoteEndPoint ?? new IPEndPoint(IPAddress.Any, 0)) as IPEndPoint; + public IPEndPoint IPEndPoint { get; init; } - public bool Connected => Socket != null && Socket.Connected; + public bool Connected => Socket != null && Socket.CanWrite; [JsonIgnore] - public Socket Socket { get; init; } + public SslStream Socket { get; init; } - private ITunnelConnectionCallback callback; + private ITunnelConnectionReceiveCallback callback; + private CancellationTokenSource cancellationTokenSource; + private object userToken; + private bool byFrame; + private Pipe pipe; + private ReceiveDataBuffer bufferCache = new ReceiveDataBuffer(); - public void BeginReceive(ITunnelConnectionCallback callback, object userToken) + /// + /// 开始接收数据 + /// + /// 数据回调 + /// 自定义数据 + /// 是否处理粘包,true时,请在首部4字节标注数据长度 + public void BeginReceive(ITunnelConnectionReceiveCallback callback, object userToken, bool byFrame = true) { if (this.callback != null) return; this.callback = callback; - SocketAsyncEventArgs readEventArgs = new SocketAsyncEventArgs - { - UserToken = userToken, - SocketFlags = SocketFlags.None, - }; - readEventArgs.SetBuffer(new byte[8 * 1024], 0, 8 * 1024); - readEventArgs.Completed += IO_Completed; - if (Socket.ReceiveAsync(readEventArgs) == false) - { - ProcessReceiveTarget(readEventArgs); - } + this.userToken = userToken; + this.byFrame = byFrame; + cancellationTokenSource = new CancellationTokenSource(); + pipe = new Pipe(); + + _ = ProcessWrite(); + _ = ProcessReader(); } - private void IO_Completed(object sender, SocketAsyncEventArgs e) - { - switch (e.LastOperation) - { - case SocketAsyncOperation.Receive: - ProcessReceiveTarget(e); - break; - default: - break; - } - } - private async void ProcessReceiveTarget(SocketAsyncEventArgs e) + private async Task ProcessWrite() { + var writer = pipe.Writer; try { - if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success) + while (cancellationTokenSource.IsCancellationRequested == false) { - int offset = e.Offset; - int length = e.BytesTransferred; - - try + Memory buffer = writer.GetMemory(8 * 1024); + int length = await Socket.ReadAsync(buffer, cancellationTokenSource.Token); + if (length == 0) { - await callback.Receive(this, e.Buffer.AsMemory(offset, length), e.UserToken).ConfigureAwait(false); + Cancel(); + break; } - catch (Exception) - { - } - - if (Socket.Available > 0) - { - while (Socket.Available > 0) - { - length = await Socket.ReceiveAsync(e.Buffer.AsMemory(), SocketFlags.None); - if (length > 0) - { - try - { - await callback.Receive(this, e.Buffer.AsMemory(offset, length), e.UserToken).ConfigureAwait(false); - } - catch (Exception) - { - } - } - else - { - CloseClientSocket(e); - return; - } - } - } - - if (Socket.Connected == false) - { - CloseClientSocket(e); - return; - } - - if (Socket.ReceiveAsync(e) == false) - { - ProcessReceiveTarget(e); - } - } - else - { - CloseClientSocket(e); + writer.Advance(length); + await writer.FlushAsync(); } } catch (Exception ex) { if (Logger.Instance.LoggerLevel <= LoggerTypes.DEBUG) { - Logger.Instance.Error(this.ToString()); Logger.Instance.Error(ex); } - - CloseClientSocket(e); } - } - private void CloseClientSocket(SocketAsyncEventArgs e) - { - if (callback != null) + finally { - callback.Closed(this, e.UserToken); - e.Dispose(); - Close(); + writer.Complete(); } } - - public async Task SendAsync(Memory data, CancellationToken cancellationToken = default) + private async Task ProcessReader() { - await Socket.SendAsync(data, SocketFlags.None, cancellationToken); + PipeReader reader = pipe.Reader; + try + { + while (cancellationTokenSource.IsCancellationRequested == false) + { + ReadResult readResult = await reader.ReadAsync().ConfigureAwait(false); + ReadOnlySequence buffer = readResult.Buffer; + if (buffer.Length == 0) + { + Cancel(); + break; + } + SequencePosition end = await ReadPacket(buffer).ConfigureAwait(false); + reader.AdvanceTo(end); + } + } + catch (Exception ex) + { + if (Logger.Instance.LoggerLevel <= LoggerTypes.DEBUG) + { + Logger.Instance.Error(ex); + } + } + finally + { + bufferCache.Clear(true); + reader.Complete(); + } + } + private unsafe int ReaderHead(ReadOnlySequence buffer) + { + Span span = stackalloc byte[4]; + buffer.Slice(0, 4).CopyTo(span); + return span.ToInt32(); + } + private async Task ReadPacket(ReadOnlySequence buffer) + { + if (byFrame == false) + { + SequencePosition position = buffer.Start; + if (buffer.TryGet(ref position, out ReadOnlyMemory memory)) + { + await callback.Receive(this, memory, this.userToken).ConfigureAwait(false); + } + return buffer.End; + } + //粘包 + while (buffer.Length > 4) + { + int length = ReaderHead(buffer); + if (buffer.Length < length + 4) + { + break; + } + + ReadOnlySequence cache = buffer.Slice(4, length); + SequencePosition position = cache.Start; + while (cache.TryGet(ref position, out ReadOnlyMemory memory)) + { + bufferCache.AddRange(memory); + } + await callback.Receive(this, bufferCache.Data.Slice(0, bufferCache.Size), this.userToken).ConfigureAwait(false); + bufferCache.Clear(); + + SequencePosition endPosition = buffer.GetPosition(4 + length); + buffer = buffer.Slice(endPosition); + } + return buffer.Start; } - public void Close() + public async Task SendAsync(ReadOnlyMemory data, CancellationToken cancellationToken = default) + { + await Socket.WriteAsync(data, cancellationToken); + } + + private void Cancel() { callback = null; - Socket?.SafeClose(); + userToken = null; + cancellationTokenSource?.Cancel(); + pipe = null; + bufferCache.Clear(true); + } + public void Close() + { + + Cancel(); + Socket?.Close(); + Socket?.Dispose(); } public override string ToString() @@ -202,7 +237,4 @@ namespace cmonitor.client.tunnel } - - - } diff --git a/cmonitor/client/tunnel/TunnelProxy.cs b/cmonitor/client/tunnel/TunnelProxy.cs index 8116f2ee..02bae7f7 100644 --- a/cmonitor/client/tunnel/TunnelProxy.cs +++ b/cmonitor/client/tunnel/TunnelProxy.cs @@ -4,10 +4,11 @@ using System.Buffers; using System.Collections.Concurrent; using System.Net; using System.Net.Sockets; +using System.Text; namespace cmonitor.client.tunnel { - public class TunnelProxy : ITunnelConnectionCallback + public class TunnelProxy : ITunnelConnectionReceiveCallback { private ConcurrentDictionary userTokens = new ConcurrentDictionary(); private ConcurrentDictionary udpClients = new ConcurrentDictionary(); @@ -313,39 +314,14 @@ namespace cmonitor.client.tunnel connection.BeginReceive(this, new AsyncUserTunnelToken { Connection = connection, - Buffer = new ReceiveDataBuffer(), Proxy = new ProxyInfo { } }); } - public async Task Receive(ITunnelConnection connection, Memory memory, object userToken) + public async Task Receive(ITunnelConnection connection, ReadOnlyMemory memory, object userToken) { AsyncUserTunnelToken token = userToken as AsyncUserTunnelToken; - //是一个完整的包 - if (token.Buffer.Size == 0 && memory.Length > 4) - { - int packageLen = memory.ToInt32(); - if (packageLen == memory.Length - 4) - { - token.Proxy.DeBytes(memory.Slice(0, packageLen + 4)); - await ReadConnectionPack(token).ConfigureAwait(false); - return; - } - } - - //不是完整包 - token.Buffer.AddRange(memory); - do - { - int packageLen = token.Buffer.Data.ToInt32(); - if (packageLen > token.Buffer.Size - 4) - { - break; - } - token.Proxy.DeBytes(token.Buffer.Data.Slice(0, packageLen + 4)); - await ReadConnectionPack(token).ConfigureAwait(false); - - token.Buffer.RemoveRange(0, packageLen + 4); - } while (token.Buffer.Size > 4); + token.Proxy.DeBytes(memory); + await ReadConnectionPack(token).ConfigureAwait(false); } public async Task Closed(ITunnelConnection connection, object userToken) { @@ -404,12 +380,12 @@ namespace cmonitor.client.tunnel await SendToConnection(token); token.Proxy.Step = ProxyStep.Forward; - BindReceiveTarget(token); if (state.Data.Length > 0) { - await socket.SendAsync(state.Data.AsMemory(), SocketFlags.None); + await state.Socket.SendAsync(state.Data.AsMemory(0, state.Length), SocketFlags.None); } + BindReceiveTarget(token); } catch (Exception ex) { @@ -433,10 +409,13 @@ namespace cmonitor.client.tunnel ConnectId connectId = new ConnectId(tunnelToken.Proxy.ConnectId, tunnelToken.Connection.GetHashCode()); if (dic.TryGetValue(connectId, out AsyncUserToken token)) { - token.Received = true; - if (token.Socket.ReceiveAsync(token.Saea) == false) + if(token.Received == false) { - ProcessReceive(token.Saea); + token.Received = true; + if (token.Socket.ReceiveAsync(token.Saea) == false) + { + ProcessReceive(token.Saea); + } } } } @@ -479,10 +458,11 @@ namespace cmonitor.client.tunnel { try { - await token1.Socket.SendAsync(token1.Proxy.Data); + await token1.Socket.SendAsync(tunnelToken.Proxy.Data); } - catch (Exception) + catch (Exception ex) { + Logger.Instance.Error(ex); CloseClientSocket(token1); } } @@ -587,7 +567,7 @@ namespace cmonitor.client.tunnel { if (Logger.Instance.LoggerLevel <= LoggerTypes.DEBUG) { - Logger.Instance.Error($"socks5 forward udp -> receive" + ex); + Logger.Instance.Error($"forward udp -> receive" + ex); } CloseClientSocket(token); } @@ -816,7 +796,7 @@ namespace cmonitor.client.tunnel public byte Rsv { get; set; } - public Memory Data { get; set; } + public ReadOnlyMemory Data { get; set; } public byte[] ToBytes(out int length) { @@ -889,10 +869,10 @@ namespace cmonitor.client.tunnel ArrayPool.Shared.Return(bytes); } - public void DeBytes(Memory memory) + public void DeBytes(ReadOnlyMemory memory) { - int index = 4; - Span span = memory.Span; + int index = 0; + ReadOnlySpan span = memory.Span; ConnectId = memory.Slice(index).ToUInt64(); index += 8; @@ -938,9 +918,6 @@ namespace cmonitor.client.tunnel public ProxyInfo Proxy { get; set; } - public ReceiveDataBuffer Buffer { get; set; } - - public void Clear() { GC.Collect(); @@ -966,8 +943,6 @@ namespace cmonitor.client.tunnel public ProxyInfo Proxy { get; set; } - public ReceiveDataBuffer Buffer { get; set; } - public SocketAsyncEventArgs Saea { get; set; } public bool Received { get; set; } @@ -976,8 +951,6 @@ namespace cmonitor.client.tunnel { Socket?.SafeClose(); - //Buffer?.Clear(); - Saea?.Dispose(); GC.Collect(); @@ -993,7 +966,7 @@ namespace cmonitor.client.tunnel public byte[] Data { get; set; } = Helper.EmptyArray; public int Length { get; set; } - public void CopyData(Memory data) + public void CopyData(ReadOnlyMemory data) { if (data.Length > 0) { diff --git a/cmonitor/plugins/forward/proxy/ForwardProxy.cs b/cmonitor/plugins/forward/proxy/ForwardProxy.cs index 4e06eb24..85ab595d 100644 --- a/cmonitor/plugins/forward/proxy/ForwardProxy.cs +++ b/cmonitor/plugins/forward/proxy/ForwardProxy.cs @@ -76,26 +76,26 @@ namespace cmonitor.plugins.forward.proxy } if (Logger.Instance.LoggerLevel <= LoggerTypes.DEBUG) Logger.Instance.Debug($"forward tunnel to {machineName}"); - connection = await tunnelTransfer.ConnectAsync(machineName, "forward"); + connection = null;//await tunnelTransfer.ConnectAsync(machineName, "forward"); if (connection != null) { if (Logger.Instance.LoggerLevel <= LoggerTypes.DEBUG) Logger.Instance.Debug($"forward tunnel to {machineName} success"); } if (connection == null) { - //if (Logger.Instance.LoggerLevel <= LoggerTypes.DEBUG) Logger.Instance.Debug($"forward relay to {machineName}"); + if (Logger.Instance.LoggerLevel <= LoggerTypes.DEBUG) Logger.Instance.Debug($"forward relay to {machineName}"); - //connection = await relayTransfer.ConnectAsync(machineName, "forward"); + connection = await relayTransfer.ConnectAsync(machineName, "forward"); if (connection != null) { - // if (Logger.Instance.LoggerLevel <= LoggerTypes.DEBUG) Logger.Instance.Debug($"forward relay to {machineName} success"); + if (Logger.Instance.LoggerLevel <= LoggerTypes.DEBUG) Logger.Instance.Debug($"forward relay to {machineName} success"); } } if (connection != null) { - connections.AddOrUpdate(machineName, connection,(a,b)=> connection); + connections.AddOrUpdate(machineName, connection, (a, b) => connection); } - + } catch (Exception) { diff --git a/cmonitor/plugins/relay/RelayTransfer.cs b/cmonitor/plugins/relay/RelayTransfer.cs index fad2f9e2..8a8fe8b2 100644 --- a/cmonitor/plugins/relay/RelayTransfer.cs +++ b/cmonitor/plugins/relay/RelayTransfer.cs @@ -88,6 +88,13 @@ namespace cmonitor.plugins.relay ITunnelConnection connection = await transport.RelayAsync(relayInfo); if (connection != null) { + if (OnConnected.TryGetValue(connection.TransactionId, out List> callbacks)) + { + foreach (var callabck in callbacks) + { + callabck(connection); + } + } return connection; } else @@ -112,9 +119,9 @@ namespace cmonitor.plugins.relay { if (OnConnected.TryGetValue(connection.TransactionId, out List> callbacks)) { - foreach (var item in callbacks) + foreach (var callabck in callbacks) { - item(connection); + callabck(connection); } } return true; diff --git a/cmonitor/plugins/relay/transport/TransportSelfHost.cs b/cmonitor/plugins/relay/transport/TransportSelfHost.cs index 68f2c0cb..d1ac7fe5 100644 --- a/cmonitor/plugins/relay/transport/TransportSelfHost.cs +++ b/cmonitor/plugins/relay/transport/TransportSelfHost.cs @@ -5,8 +5,8 @@ using cmonitor.server; using common.libs; using common.libs.extends; using MemoryPack; +using System.Net; using System.Net.Sockets; -using System.Text; namespace cmonitor.plugins.relay.transport { @@ -18,7 +18,6 @@ namespace cmonitor.plugins.relay.transport private readonly TcpServer tcpServer; private readonly MessengerSender messengerSender; - private readonly Memory relayFlagData = Encoding.UTF8.GetBytes("snltty.relay").AsMemory(); public TransportSelfHost(TcpServer tcpServer, MessengerSender messengerSender) { @@ -33,7 +32,7 @@ namespace cmonitor.plugins.relay.transport socket.IPv6Only(relayInfo.Server.AddressFamily, false); await socket.ConnectAsync(relayInfo.Server).WaitAsync(TimeSpan.FromMilliseconds(500)); - IConnection connection = await tcpServer.BindReceive(socket); + IConnection connection = await tcpServer.BeginReceive(socket); MessageResponeInfo resp = await messengerSender.SendReply(new MessageRequestWrap { Connection = connection, @@ -46,14 +45,16 @@ namespace cmonitor.plugins.relay.transport connection.Disponse(); return null; } - await socket.SendAsync(relayFlagData); + connection.Cancel(); await Task.Delay(10); return new TunnelConnectionTcp { Direction = TunnelDirection.Forward, ProtocolType = TunnelProtocolType.Tcp, RemoteMachineName = relayInfo.RemoteMachineName, - Socket = socket, + Socket = connection.TcpSourceSocket, + Mode = TunnelMode.Client, + IPEndPoint = socket.RemoteEndPoint as IPEndPoint, TransactionId = relayInfo.TransactionId, TransportName = Name, Type = TunnelType.Relay @@ -67,27 +68,23 @@ namespace cmonitor.plugins.relay.transport socket.IPv6Only(relayInfo.Server.AddressFamily, false); await socket.ConnectAsync(relayInfo.Server).WaitAsync(TimeSpan.FromMilliseconds(500)); - IConnection connection = await tcpServer.BindReceive(socket); - MessageResponeInfo resp = await messengerSender.SendReply(new MessageRequestWrap + IConnection connection = await tcpServer.BeginReceive(socket); + await messengerSender.SendOnly(new MessageRequestWrap { Connection = connection, MessengerId = (ushort)RelayMessengerIds.RelayForward, - Payload = MemoryPackSerializer.Serialize(relayInfo), - Timeout = 2000 + Payload = MemoryPackSerializer.Serialize(relayInfo) }); - if (resp.Code != MessageResponeCodes.OK || resp.Data.Span.SequenceEqual(Helper.TrueArray) == false) - { - connection.Disponse(); - return null; - } - await socket.SendAsync(relayFlagData); + connection.Cancel(); await Task.Delay(10); return new TunnelConnectionTcp { Direction = TunnelDirection.Reverse, ProtocolType = TunnelProtocolType.Tcp, RemoteMachineName = relayInfo.RemoteMachineName, - Socket = socket, + Socket = connection.TcpSourceSocket, + Mode = TunnelMode.Server, + IPEndPoint = socket.RemoteEndPoint as IPEndPoint, TransactionId = relayInfo.TransactionId, TransportName = Name, Type = TunnelType.Relay diff --git a/cmonitor/plugins/tunnel/compact/TunnelCompactTransfer.cs b/cmonitor/plugins/tunnel/compact/TunnelCompactTransfer.cs index 6bc39e97..4051363a 100644 --- a/cmonitor/plugins/tunnel/compact/TunnelCompactTransfer.cs +++ b/cmonitor/plugins/tunnel/compact/TunnelCompactTransfer.cs @@ -1,5 +1,7 @@ -using cmonitor.config; +using cmonitor.client; +using cmonitor.config; using common.libs; +using common.libs.extends; using Microsoft.Extensions.DependencyInjection; using System.Diagnostics; using System.Net; @@ -13,6 +15,7 @@ namespace cmonitor.plugins.tunnel.compact private readonly Config config; private readonly ServiceProvider serviceProvider; + public TunnelCompactTransfer(Config config, ServiceProvider serviceProvider) { this.config = config; diff --git a/cmonitor/plugins/tunnel/config/Config.cs b/cmonitor/plugins/tunnel/config/Config.cs index 2865117b..a953f5b3 100644 --- a/cmonitor/plugins/tunnel/config/Config.cs +++ b/cmonitor/plugins/tunnel/config/Config.cs @@ -14,6 +14,9 @@ namespace cmonitor.config public TunnelCompactInfo[] Servers { get; set; } = Array.Empty(); public int RouteLevelPlus { get; set; } = 0; + public string Certificate { get; set; } = "./snltty.pfx"; + public string Password { get; set; } = "snltty"; + public List TunnelTransports { get; set; } = new List(); [JsonIgnore] diff --git a/cmonitor/plugins/tunnel/server/TunnelBindServer.cs b/cmonitor/plugins/tunnel/server/TunnelBindServer.cs index 53778ff2..de8d56fb 100644 --- a/cmonitor/plugins/tunnel/server/TunnelBindServer.cs +++ b/cmonitor/plugins/tunnel/server/TunnelBindServer.cs @@ -1,6 +1,5 @@ using common.libs; using common.libs.extends; -using NAudio.CoreAudioApi; using System.Collections.Concurrent; using System.Net; using System.Net.Sockets; @@ -9,10 +8,8 @@ namespace cmonitor.plugins.tunnel.server { public sealed class TunnelBindServer { - private UdpClient socketUdp; - - public Action OnTcpConnected { get; set; } = (state, socket) => { }; - public Action OnUdpConnected { get; set; } = (state, udpClient) => { }; + public Func OnTcpConnected { get; set; } = async (state, socket) => { await Task.CompletedTask; }; + public Func OnUdpConnected { get; set; } = async (state, udpClient) => { await Task.CompletedTask; }; private ConcurrentDictionary acceptBinds = new ConcurrentDictionary(); @@ -45,22 +42,22 @@ namespace cmonitor.plugins.tunnel.server StartAccept(acceptEventArg); - socketUdp = new UdpClient(localIP.AddressFamily); - socketUdp.Client.ReuseBind(new IPEndPoint(localIP, local.Port)); + token.UdpClient = new UdpClient(localIP.AddressFamily); + token.UdpClient.Client.ReuseBind(new IPEndPoint(localIP, local.Port)); //socketUdp.Client.EnableBroadcast = true; - socketUdp.Client.WindowsUdpBug(); - IAsyncResult result = socketUdp.BeginReceive(ReceiveCallbackUdp, state); + token.UdpClient.Client.WindowsUdpBug(); + IAsyncResult result = token.UdpClient.BeginReceive(ReceiveCallbackUdp, token); } catch (Exception ex) { Logger.Instance.Error(ex); } } - public void RemoveBind(int localPort) + public void RemoveBind(int localPort, bool closeUdp) { if (acceptBinds.TryRemove(localPort, out AsyncUserToken saea)) { - CloseClientSocket(saea); + CloseClientSocket(saea, closeUdp); } } @@ -77,7 +74,7 @@ namespace cmonitor.plugins.tunnel.server } catch (Exception) { - token.Clear(); + token.Clear(true); } } private void IO_Completed(object sender, SocketAsyncEventArgs e) @@ -106,29 +103,30 @@ namespace cmonitor.plugins.tunnel.server } private void ReceiveCallbackUdp(IAsyncResult result) { + AsyncUserToken token = result.AsyncState as AsyncUserToken; try { IPEndPoint ep = new IPEndPoint(IPAddress.Any, IPEndPoint.MinPort); - byte[] _ = socketUdp.EndReceive(result, ref ep); + byte[] _ = token.UdpClient.EndReceive(result, ref ep); - OnUdpConnected(result.AsyncState, socketUdp); + OnUdpConnected(token.State, token.UdpClient); } catch (Exception) { } } - private void CloseClientSocket(AsyncUserToken token) + private void CloseClientSocket(AsyncUserToken token, bool closeUdp) { if (token == null) return; Socket socket = token.SourceSocket; if (socket != null) { - token.Clear(); + token.Clear(closeUdp); if (acceptBinds.TryRemove(token.LocalPort, out AsyncUserToken tk)) { - CloseClientSocket(tk); + CloseClientSocket(tk, closeUdp); } } } @@ -140,11 +138,16 @@ namespace cmonitor.plugins.tunnel.server public object State { get; set; } public int LocalPort { get; set; } - public void Clear() + public UdpClient UdpClient { get; set; } + + public void Clear(bool closeUdp) { SourceSocket?.SafeClose(); SourceSocket = null; + if (closeUdp) + UdpClient?.Close(); + Saea?.Dispose(); GC.Collect(); diff --git a/cmonitor/plugins/tunnel/transport/TransportTcpNutssb.cs b/cmonitor/plugins/tunnel/transport/TransportTcpNutssb.cs index 17508a80..21bddc35 100644 --- a/cmonitor/plugins/tunnel/transport/TransportTcpNutssb.cs +++ b/cmonitor/plugins/tunnel/transport/TransportTcpNutssb.cs @@ -1,10 +1,14 @@ using cmonitor.client.tunnel; +using cmonitor.config; using cmonitor.plugins.tunnel.server; using common.libs; using common.libs.extends; using System.Collections.Concurrent; using System.Net; +using System.Net.Security; using System.Net.Sockets; +using System.Security.Authentication; +using System.Security.Cryptography.X509Certificates; namespace cmonitor.plugins.tunnel.transport { @@ -14,18 +18,29 @@ namespace cmonitor.plugins.tunnel.transport public string Label => "基于低TTL的TCP打洞"; public TunnelProtocolType ProtocolType => TunnelProtocolType.Tcp; + private X509Certificate serverCertificate; + public Func> OnSendConnectBegin { get; set; } = async (info) => { return await Task.FromResult(false); }; public Func OnSendConnectFail { get; set; } = async (info) => { await Task.CompletedTask; }; public Func OnSendConnectSuccess { get; set; } = async (info) => { await Task.CompletedTask; }; public Action OnConnected { get; set; } = (state) => { }; - - private readonly TunnelBindServer tunnelBindServer; - public TunnelTransportTcpNutssb(TunnelBindServer tunnelBindServer) + public TunnelTransportTcpNutssb(TunnelBindServer tunnelBindServer, Config config) { this.tunnelBindServer = tunnelBindServer; tunnelBindServer.OnTcpConnected += OnTcpConnected; + + string path = Path.GetFullPath(config.Data.Client.Tunnel.Certificate); + if (File.Exists(path)) + { + serverCertificate = new X509Certificate(path, config.Data.Client.Tunnel.Password); + } + else + { + Logger.Instance.Error($"file {path} not found"); + Environment.Exit(0); + } } public async Task ConnectAsync(TunnelTransportInfo tunnelTransportInfo) @@ -65,7 +80,6 @@ namespace cmonitor.plugins.tunnel.transport await OnSendConnectFail(tunnelTransportInfo); return null; } - public void OnBegin(TunnelTransportInfo tunnelTransportInfo) { if (tunnelTransportInfo.Direction == TunnelDirection.Forward) @@ -96,7 +110,7 @@ namespace cmonitor.plugins.tunnel.transport public void OnFail(TunnelTransportInfo tunnelTransportInfo) { - tunnelBindServer.RemoveBind(tunnelTransportInfo.Local.Local.Port); + tunnelBindServer.RemoveBind(tunnelTransportInfo.Local.Local.Port, true); if (reverseDic.TryRemove(tunnelTransportInfo.Remote.MachineName, out TaskCompletionSource tcs)) { tcs.SetResult(null); @@ -104,7 +118,7 @@ namespace cmonitor.plugins.tunnel.transport } public void OnSuccess(TunnelTransportInfo tunnelTransportInfo) { - tunnelBindServer.RemoveBind(tunnelTransportInfo.Local.Local.Port); + tunnelBindServer.RemoveBind(tunnelTransportInfo.Local.Local.Port, true); if (reverseDic.TryRemove(tunnelTransportInfo.Remote.MachineName, out TaskCompletionSource tcs)) { tcs.SetResult(null); @@ -173,25 +187,35 @@ namespace cmonitor.plugins.tunnel.transport } targetSocket.EndConnect(result); + + SslStream sslStream = new SslStream(new NetworkStream(targetSocket), true, new RemoteCertificateValidationCallback(ValidateServerCertificate), null); + await sslStream.AuthenticateAsClientAsync(new SslClientAuthenticationOptions { EnabledSslProtocols = SslProtocols.Tls | SslProtocols.Tls11 | SslProtocols.Tls12 | SslProtocols.Tls13 }); + return new TunnelConnectionTcp { - Socket = targetSocket, + Socket = sslStream, + IPEndPoint = targetSocket.RemoteEndPoint as IPEndPoint, TransactionId = tunnelTransportInfo.TransactionId, RemoteMachineName = tunnelTransportInfo.Remote.MachineName, TransportName = Name, Direction = tunnelTransportInfo.Direction, ProtocolType = ProtocolType, Type = TunnelType.P2P, + Mode = TunnelMode.Client, Label = string.Empty }; } - catch (Exception ex) + catch (Exception) { targetSocket.SafeClose(); } } return null; } + public bool ValidateServerCertificate(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors) + { + return true; + } private void BindAndTTL(TunnelTransportInfo tunnelTransportInfo) { //给对方发送TTL消息 @@ -252,28 +276,43 @@ namespace cmonitor.plugins.tunnel.transport return null; } - private void OnTcpConnected(object state, Socket socket) + private async Task OnTcpConnected(object state, Socket socket) { if (state is TunnelTransportInfo _state && _state.TransportName == Name) { - TunnelConnectionTcp result = new TunnelConnectionTcp + try { - RemoteMachineName = _state.Remote.MachineName, - Direction = _state.Direction, - ProtocolType = TunnelProtocolType.Tcp, - Socket = socket, - Type = TunnelType.P2P, - TransactionId = _state.TransactionId, - TransportName = _state.TransportName, - Label = string.Empty, - }; - if (reverseDic.TryRemove(_state.Remote.MachineName, out TaskCompletionSource tcs)) - { - tcs.SetResult(result); - return; - } + SslStream sslStream = new SslStream(new NetworkStream(socket), true); + await sslStream.AuthenticateAsServerAsync(serverCertificate, false, SslProtocols.Tls | SslProtocols.Tls11 | SslProtocols.Tls12 | SslProtocols.Tls13, false); - OnConnected(result); + TunnelConnectionTcp result = new TunnelConnectionTcp + { + RemoteMachineName = _state.Remote.MachineName, + Direction = _state.Direction, + ProtocolType = TunnelProtocolType.Tcp, + Socket = sslStream, + Type = TunnelType.P2P, + Mode = TunnelMode.Server, + TransactionId = _state.TransactionId, + TransportName = _state.TransportName, + IPEndPoint = socket.RemoteEndPoint as IPEndPoint, + Label = string.Empty, + }; + if (reverseDic.TryRemove(_state.Remote.MachineName, out TaskCompletionSource tcs)) + { + tcs.SetResult(result); + return; + } + + OnConnected(result); + } + catch (Exception ex) + { + if(Logger.Instance.LoggerLevel <= LoggerTypes.DEBUG) + { + Logger.Instance.Error(ex); + } + } } } diff --git a/cmonitor/plugins/tuntap/proxy/TuntapProxy.cs b/cmonitor/plugins/tuntap/proxy/TuntapProxy.cs index 1b23178f..3be58546 100644 --- a/cmonitor/plugins/tuntap/proxy/TuntapProxy.cs +++ b/cmonitor/plugins/tuntap/proxy/TuntapProxy.cs @@ -79,7 +79,7 @@ namespace cmonitor.plugins.tuntap.proxy Socks5EnumRequestCommand command = (Socks5EnumRequestCommand)token.Proxy.Data.Span[1]; //获取远端地址 - Memory ipArray = Socks5Parser.GetRemoteEndPoint(token.Proxy.Data, out Socks5EnumAddressType addressType, out ushort port, out int index); + ReadOnlyMemory ipArray = Socks5Parser.GetRemoteEndPoint(token.Proxy.Data, out Socks5EnumAddressType addressType, out ushort port, out int index); token.Proxy.TargetEP = new IPEndPoint(new IPAddress(ipArray.Span), port); token.Proxy.Data = token.Proxy.Data.Slice(index); //不支持域名 @@ -109,7 +109,7 @@ namespace cmonitor.plugins.tuntap.proxy } protected override async Task ConnectUdp(AsyncUserUdpToken token) { - Memory ipArray = Socks5Parser.GetRemoteEndPoint(token.Proxy.Data, out Socks5EnumAddressType addressType, out ushort port, out int index); + ReadOnlyMemory ipArray = Socks5Parser.GetRemoteEndPoint(token.Proxy.Data, out Socks5EnumAddressType addressType, out ushort port, out int index); token.Proxy.TargetEP = new IPEndPoint(new IPAddress(ipArray.Span), port); //解析出udp包的数据部分 token.Proxy.Data = Socks5Parser.GetUdpData(token.Proxy.Data); @@ -134,7 +134,7 @@ namespace cmonitor.plugins.tuntap.proxy SemaphoreSlim slimGlobal = new SemaphoreSlim(1); - private async ValueTask ConnectTunnel(Memory ipArray) + private async ValueTask ConnectTunnel(ReadOnlyMemory ipArray) { uint ip = BinaryPrimitives.ReadUInt32BigEndian(ipArray.Span); uint network = ip & maskValue; diff --git a/cmonitor/server/IConnection.cs b/cmonitor/server/IConnection.cs index 5ff4d36b..3016a484 100644 --- a/cmonitor/server/IConnection.cs +++ b/cmonitor/server/IConnection.cs @@ -1,22 +1,21 @@ using common.libs; using common.libs.extends; using System.Buffers; +using System.IO.Pipelines; using System.Net; using System.Net.Security; using System.Net.Sockets; namespace cmonitor.server { - /// - /// 连接对象 - /// + public interface IConnectionReceiveCallback + { + public Task Receive(IConnection connection, ReadOnlyMemory data, object state); + } + public interface IConnection { public string Name { get; set; } - /// - /// - /// 已连接 - /// public bool Connected { get; } public IPEndPoint Address { get; } @@ -26,37 +25,17 @@ namespace cmonitor.server public SslStream TcpTargetSocket { get; set; } #region 接收数据 - /// - /// 请求数据包装对象 - /// public MessageRequestWrap ReceiveRequestWrap { get; } - /// - /// 回复数据包装对象 - /// public MessageResponseWrap ReceiveResponseWrap { get; } - /// - /// 接收到的原始数据 - /// public ReadOnlyMemory ReceiveData { get; set; } #endregion - /// - /// 发送 - /// - /// - /// - public Task Send(ReadOnlyMemory data); - /// - /// 发送 - /// - /// - /// - /// - public Task Send(byte[] data, int length); + public void BeginReceive(IConnectionReceiveCallback callback, object userToken, bool byFrame = true); - /// - /// 销毁 - /// + public Task SendAsync(ReadOnlyMemory data); + public Task SendAsync(byte[] data, int length); + + public void Cancel(); public void Disponse(); #region 回复消息相关 @@ -66,19 +45,8 @@ namespace cmonitor.server public void Write(ulong num); public void Write(ushort num); public void Write(ushort[] nums); - /// - /// 英文多用这个 - /// - /// public void WriteUTF8(string str); - /// - /// 中文多用这个 - /// - /// public void WriteUTF16(string str); - /// - /// 归还池 - /// public void Return(); #endregion @@ -91,32 +59,17 @@ namespace cmonitor.server } public string Name { get; set; } - /// - /// 已连接 - /// public virtual bool Connected => false; - /// - /// 地址 - /// public IPEndPoint Address { get; protected set; } public IPEndPoint LocalAddress { get; protected set; } public SslStream TcpSourceSocket { get; protected set; } public SslStream TcpTargetSocket { get; set; } - public bool Relayed { get; set; } + #region 接收数据 - /// - /// 接收请求数据 - /// public MessageRequestWrap ReceiveRequestWrap { get; set; } - /// - /// 接收回执数据 - /// public MessageResponseWrap ReceiveResponseWrap { get; set; } - /// - /// 接收数据 - /// public ReadOnlyMemory ReceiveData { get; set; } #endregion @@ -150,10 +103,7 @@ namespace cmonitor.server nums.ToBytes(responseData); ResponseData = responseData.AsMemory(0, length); } - /// - /// 英文多用这个 - /// - /// + public void WriteUTF8(string str) { var span = str.AsSpan(); @@ -167,10 +117,7 @@ namespace cmonitor.server ResponseData = responseData.AsMemory(0, length); } - /// - /// 中文多用这个 - /// - /// + public void WriteUTF16(string str) { var span = str.GetUTF16Bytes(); @@ -181,9 +128,7 @@ namespace cmonitor.server ResponseData = responseData.AsMemory(0, length); } - /// - /// 归还池 - /// + public void Return() { if (length > 0 && ResponseData.Length > 0) @@ -196,34 +141,23 @@ namespace cmonitor.server } #endregion - /// - /// 发送 - /// - /// - /// - public abstract Task Send(ReadOnlyMemory data); - /// - /// 发送 - /// - /// - /// - /// - public abstract Task Send(byte[] data, int length); - /// - /// 销毁 - /// + public abstract void BeginReceive(IConnectionReceiveCallback callback, object userToken, bool byFrame = true); + + public abstract Task SendAsync(ReadOnlyMemory data); + public abstract Task SendAsync(byte[] data, int length); + + public virtual void Cancel() + { + } public virtual void Disponse() { } - - } - public sealed class TcpConnection : Connection { - public TcpConnection(SslStream stream,IPEndPoint local, IPEndPoint remote) : base() + public TcpConnection(SslStream stream, IPEndPoint local, IPEndPoint remote) : base() { TcpSourceSocket = stream; @@ -240,25 +174,137 @@ namespace cmonitor.server LocalAddress = local; } - /// - /// 已连接 - /// public override bool Connected => TcpSourceSocket != null && TcpSourceSocket.CanWrite; - /// - /// 发送 - /// - /// - /// - public override async Task Send(ReadOnlyMemory data) + private IConnectionReceiveCallback callback; + private CancellationTokenSource cancellationTokenSource; + private object userToken; + private bool byFrame; + private Pipe pipe; + private ReceiveDataBuffer bufferCache = new ReceiveDataBuffer(); + public override void BeginReceive(IConnectionReceiveCallback callback, object userToken, bool byFrame = true) + { + if (this.callback != null) return; + + this.callback = callback; + this.userToken = userToken; + this.byFrame = byFrame; + cancellationTokenSource = new CancellationTokenSource(); + pipe = new Pipe(); + + _ = ProcessWrite(); + _ = ProcessReader(); + } + private async Task ProcessWrite() + { + var writer = pipe.Writer; + try + { + while (cancellationTokenSource.IsCancellationRequested == false) + { + Memory buffer = writer.GetMemory(8 * 1024); + int length = await TcpSourceSocket.ReadAsync(buffer, cancellationTokenSource.Token); + writer.Advance(length); + await writer.FlushAsync(); + } + } + catch (Exception ex) + { + if (Logger.Instance.LoggerLevel <= LoggerTypes.DEBUG) + { + Logger.Instance.Error(ex); + } + } + finally + { + writer.Complete(); + } + } + private async Task ProcessReader() + { + PipeReader reader = pipe.Reader; + try + { + while (cancellationTokenSource.IsCancellationRequested == false) + { + ReadResult readResult = await reader.ReadAsync().ConfigureAwait(false); + ReadOnlySequence buffer = readResult.Buffer; + SequencePosition end = await ReadPacket(buffer).ConfigureAwait(false); + reader.AdvanceTo(end); + } + } + catch (Exception ex) + { + if (Logger.Instance.LoggerLevel <= LoggerTypes.DEBUG) + { + Logger.Instance.Error(ex); + } + } + finally + { + bufferCache.Clear(true); + reader.Complete(); + } + } + private unsafe int ReaderHead(ReadOnlySequence buffer) + { + Span span = stackalloc byte[4]; + buffer.Slice(0, 4).CopyTo(span); + return span.ToInt32(); + } + private async Task ReadPacket(ReadOnlySequence buffer) + { + if (TcpTargetSocket != null) + { + SequencePosition position = buffer.Start; + while (buffer.TryGet(ref position, out ReadOnlyMemory memory)) + { + await TcpTargetSocket.WriteAsync(memory); + } + return buffer.End; + } + + if (byFrame == false) + { + SequencePosition position = buffer.Start; + if (buffer.TryGet(ref position, out ReadOnlyMemory memory)) + { + await callback.Receive(this, memory, this.userToken).ConfigureAwait(false); + } + return buffer.End; + } + + //粘包 + while (buffer.Length > 4) + { + int length = ReaderHead(buffer); + if (buffer.Length < length + 4) + { + break; + } + + ReadOnlySequence cache = buffer.Slice(4, length); + SequencePosition position = cache.Start; + while (cache.TryGet(ref position, out ReadOnlyMemory memory)) + { + bufferCache.AddRange(memory); + } + await callback.Receive(this, bufferCache.Data.Slice(0, bufferCache.Size), this.userToken).ConfigureAwait(false); + bufferCache.Clear(); + + SequencePosition endPosition = buffer.GetPosition(4 + length); + buffer = buffer.Slice(endPosition); + } + return buffer.Start; + } + + public override async Task SendAsync(ReadOnlyMemory data) { if (Connected) { try { await TcpSourceSocket.WriteAsync(data); - await TcpSourceSocket.FlushAsync(); - //SentBytes += (ulong)data.Length; return true; } catch (Exception ex) @@ -270,21 +316,26 @@ namespace cmonitor.server } return false; } - /// - /// 发送 - /// - /// - /// - /// - public override async Task Send(byte[] data, int length) + public override async Task SendAsync(byte[] data, int length) { - return await Send(data.AsMemory(0, length)); + return await SendAsync(data.AsMemory(0, length)); } - /// - /// 销毁 - /// + + + public override void Cancel() + { + callback = null; + userToken = null; + cancellationTokenSource?.Cancel(); + + pipe = null; + + bufferCache.Clear(true); + } + public override void Disponse() { + Cancel(); base.Disponse(); try { diff --git a/cmonitor/server/IMessenger.cs b/cmonitor/server/IMessenger.cs index 37247530..6faf6c87 100644 --- a/cmonitor/server/IMessenger.cs +++ b/cmonitor/server/IMessenger.cs @@ -7,31 +7,6 @@ { } - /// - /// 消息id范围 - /// - [AttributeUsage(AttributeTargets.Class)] - public sealed class MessengerIdRangeAttribute : Attribute - { - /// - /// 最小 - /// - public ushort Min { get; set; } - /// - /// 最大 - /// - public ushort Max { get; set; } - /// - /// - /// - /// - /// - public MessengerIdRangeAttribute(ushort min, ushort max) - { - Min = min; - Max = max; - } - } /// /// 消息id /// @@ -51,13 +26,4 @@ Id = id; } } - - /// - /// 消息 - /// - - [AttributeUsage(AttributeTargets.Enum)] - public sealed class MessengerIdEnumAttribute : Attribute - { - } } diff --git a/cmonitor/server/MessengerResolver.cs b/cmonitor/server/MessengerResolver.cs index 0ead828e..feaa30c5 100644 --- a/cmonitor/server/MessengerResolver.cs +++ b/cmonitor/server/MessengerResolver.cs @@ -7,24 +7,19 @@ namespace cmonitor.server /// /// 消息处理总线 /// - public sealed class MessengerResolver + public sealed class MessengerResolver : IConnectionReceiveCallback { delegate void VoidDelegate(IConnection connection); delegate Task TaskDelegate(IConnection connection); private readonly Dictionary messengers = new(); - private readonly TcpServer tcpserver; private readonly MessengerSender messengerSender; private readonly ServiceProvider serviceProvider; - - public MessengerResolver(TcpServer tcpserver, MessengerSender messengerSender, ServiceProvider serviceProvider) + public MessengerResolver(MessengerSender messengerSender, ServiceProvider serviceProvider) { - this.tcpserver = tcpserver; this.messengerSender = messengerSender; - - this.tcpserver.OnPacket = InputData; this.serviceProvider = serviceProvider; } @@ -37,7 +32,7 @@ namespace cmonitor.server foreach (Type type in types) { object obj = serviceProvider.GetService(type); - if(obj == null) + if (obj == null) { continue; } @@ -75,31 +70,23 @@ namespace cmonitor.server } - /// - /// 收到消息 - /// - /// - /// - public async Task InputData(IConnection connection) + public async Task Receive(IConnection connection, ReadOnlyMemory data, object state) { - ReadOnlyMemory receive = connection.ReceiveData; - //去掉表示数据长度的4字节 - ReadOnlyMemory readReceive = receive.Slice(4); MessageResponseWrap responseWrap = connection.ReceiveResponseWrap; MessageRequestWrap requestWrap = connection.ReceiveRequestWrap; try { //回复的消息 - if ((MessageTypes)(readReceive.Span[0] & 0b0000_1111) == MessageTypes.RESPONSE) + if ((MessageTypes)(data.Span[0] & 0b0000_1111) == MessageTypes.RESPONSE) { - responseWrap.FromArray(readReceive); + responseWrap.FromArray(data); messengerSender.Response(responseWrap); return; } //新的请求 - requestWrap.FromArray(readReceive); + requestWrap.FromArray(data); //404,没这个插件 if (messengers.TryGetValue(requestWrap.MessengerId, out MessengerCacheInfo plugin) == false) { @@ -144,7 +131,6 @@ namespace cmonitor.server } } - /// /// 消息插件缓存 /// diff --git a/cmonitor/server/MessengerSender.cs b/cmonitor/server/MessengerSender.cs index 62cbde50..f6b72624 100644 --- a/cmonitor/server/MessengerSender.cs +++ b/cmonitor/server/MessengerSender.cs @@ -69,7 +69,7 @@ namespace cmonitor.server } byte[] bytes = msg.ToArray(out int length); - bool res = await msg.Connection.Send(bytes.AsMemory(0, length)).ConfigureAwait(false); + bool res = await msg.Connection.SendAsync(bytes.AsMemory(0, length)).ConfigureAwait(false); msg.Return(bytes); return res; } @@ -95,7 +95,7 @@ namespace cmonitor.server try { byte[] bytes = msg.ToArray(out int length); - bool res = await msg.Connection.Send(bytes.AsMemory(0, length)).ConfigureAwait(false); + bool res = await msg.Connection.SendAsync(bytes.AsMemory(0, length)).ConfigureAwait(false); msg.Return(bytes); return res; } diff --git a/cmonitor/server/ServerStartup.cs b/cmonitor/server/ServerStartup.cs index 0a5efdff..3b7a8023 100644 --- a/cmonitor/server/ServerStartup.cs +++ b/cmonitor/server/ServerStartup.cs @@ -29,9 +29,6 @@ namespace cmonitor.server public void AddServer(ServiceCollection serviceCollection, Config config, Assembly[] assemblies) { serviceCollection.AddSingleton(); - // if (OperatingSystem.IsWindows()) serviceCollection.AddSingleton(); - // else if (OperatingSystem.IsLinux()) serviceCollection.AddSingleton(); - // else if (OperatingSystem.IsMacOS()) serviceCollection.AddSingleton(); serviceCollection.AddSingleton(); serviceCollection.AddSingleton(); @@ -65,6 +62,7 @@ namespace cmonitor.server { //服务 TcpServer tcpServer = serviceProvider.GetService(); + tcpServer.Init(config.Data.Server.Certificate, config.Data.Server.Password); tcpServer.Start(config.Data.Server.ServicePort); } catch (Exception ex) diff --git a/cmonitor/server/TcpServer.cs b/cmonitor/server/TcpServer.cs index 7dd1c5f3..86b5d9c8 100644 --- a/cmonitor/server/TcpServer.cs +++ b/cmonitor/server/TcpServer.cs @@ -1,13 +1,11 @@ using common.libs; using common.libs.extends; using System.Buffers; -using System.IO.Pipelines; using System.Net; using System.Net.Security; using System.Net.Sockets; using System.Security.Authentication; using System.Security.Cryptography.X509Certificates; -using System.Text; namespace cmonitor.server { @@ -16,34 +14,36 @@ namespace cmonitor.server private Socket socket; private UdpClient socketUdp; private CancellationTokenSource cancellationTokenSource; - private Memory relayFLagCData = Encoding.UTF8.GetBytes("snltty.relay"); - private readonly X509Certificate serverCertificate; + private X509Certificate serverCertificate; - public Func OnPacket { get; set; } = async (connection) => { await Task.CompletedTask; }; - - public TcpServer() + private readonly IConnectionReceiveCallback connectionReceiveCallback; + public TcpServer(MessengerResolver connectionReceiveCallback) { - string path = Path.GetFullPath("./snltty.pfx"); + cancellationTokenSource = new CancellationTokenSource(); + this.connectionReceiveCallback = connectionReceiveCallback; + } + + public void Init(string certificate, string password) + { + string path = Path.GetFullPath(certificate); if (File.Exists(path)) { - serverCertificate = new X509Certificate(path, "snltty"); + serverCertificate = new X509Certificate(path, password); } else { Logger.Instance.Error($"file {path} not found"); Environment.Exit(0); } - } + public void Start(int port) { if (socket == null) { - cancellationTokenSource = new CancellationTokenSource(); socket = BindAccept(port); } } - private Socket BindAccept(int port) { IPEndPoint localEndPoint = new IPEndPoint(NetworkHelper.IPv6Support ? IPAddress.IPv6Any : IPAddress.Any, port); @@ -66,9 +66,7 @@ namespace cmonitor.server socketUdp.Client.WindowsUdpBug(); IAsyncResult result = socketUdp.BeginReceive(ReceiveCallbackUdp, null); - return socket; - } byte[] sendData = ArrayPool.Shared.Rent(20); @@ -135,12 +133,11 @@ namespace cmonitor.server { if (e.AcceptSocket != null) { - _ = BindReceiveServer(e.AcceptSocket); + _ = BeginReceiveServer(e.AcceptSocket); StartAccept(e); } } - - private async Task BindReceiveServer(Socket socket) + private async Task BeginReceiveServer(Socket socket) { try { @@ -150,10 +147,11 @@ namespace cmonitor.server } socket.KeepAlive(); SslStream sslStream = new SslStream(new NetworkStream(socket), true); - await sslStream.AuthenticateAsServerAsync(serverCertificate, false, SslProtocols.Tls13, false); + await sslStream.AuthenticateAsServerAsync(serverCertificate, false, SslProtocols.Tls | SslProtocols.Tls11 | SslProtocols.Tls12 | SslProtocols.Tls13, false); IConnection connection = CreateConnection(sslStream, socket.LocalEndPoint as IPEndPoint, socket.RemoteEndPoint as IPEndPoint); - _ = ProcessReceive(connection, sslStream); + + connection.BeginReceive(connectionReceiveCallback,null,true); return connection; } catch (Exception ex) @@ -164,12 +162,11 @@ namespace cmonitor.server return null; } - public bool ValidateServerCertificate(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors) { return true; } - public async Task BindReceive(Socket socket) + public async Task BeginReceive(Socket socket) { try { @@ -179,9 +176,10 @@ namespace cmonitor.server } socket.KeepAlive(); SslStream sslStream = new SslStream(new NetworkStream(socket), true, new RemoteCertificateValidationCallback(ValidateServerCertificate), null); - await sslStream.AuthenticateAsClientAsync("snltty.com"); + await sslStream.AuthenticateAsClientAsync(new SslClientAuthenticationOptions { EnabledSslProtocols = SslProtocols.Tls | SslProtocols.Tls11 | SslProtocols.Tls12 | SslProtocols.Tls13 }); IConnection connection = CreateConnection(sslStream, socket.LocalEndPoint as IPEndPoint, socket.RemoteEndPoint as IPEndPoint); - _ = ProcessReceive(connection, sslStream); + + connection.BeginReceive(connectionReceiveCallback, null, true); return connection; } @@ -192,82 +190,6 @@ namespace cmonitor.server } return null; } - private async Task ProcessReceive(IConnection connection, SslStream sslStream) - { - PipeReader reader = PipeReader.Create(sslStream); - - try - { - while (cancellationTokenSource.IsCancellationRequested == false) - { - ReadResult readResult = await reader.ReadAsync().ConfigureAwait(false); - ReadOnlySequence buffer = readResult.Buffer; - - SequencePosition end = await ReadPacket(connection, buffer).ConfigureAwait(false); - reader.AdvanceTo(buffer.Start, end); - } - } - catch (Exception ex) - { - if (Logger.Instance.LoggerLevel <= LoggerTypes.DEBUG) - { - Logger.Instance.Error(ex); - } - } - finally - { - reader.Complete(); - } - } - private unsafe int ReaderHead(ReadOnlySequence buffer) - { - Span span = stackalloc byte[4]; - buffer.Slice(0, 4).CopyTo(span); - return span.ToInt32(); - } - - private async Task ReadPacket(IConnection connection, ReadOnlySequence buffer) - { - //已中继 - if (connection.TcpTargetSocket != null) - { - SequencePosition position = buffer.Start; - if (buffer.TryGet(ref position, out ReadOnlyMemory data)) - { - await connection.TcpTargetSocket.WriteAsync(data).ConfigureAwait(false); - await connection.TcpTargetSocket.FlushAsync(); - } - return buffer.End; - } - //中继标识 - else if (buffer.Length == relayFLagCData.Length) - { - SequencePosition position = buffer.Start; - if (buffer.TryGet(ref position, out ReadOnlyMemory data) && data.Span.SequenceEqual(relayFLagCData.Span)) - { - return buffer.End; - } - } - //正常处理 - while (buffer.Length > 4) - { - int length = ReaderHead(buffer); - if (buffer.Length < length + 4) - { - break; - } - SequencePosition position = buffer.GetPosition(4); - if (buffer.TryGet(ref position, out ReadOnlyMemory memory)) - { - connection.ReceiveData = memory; - await OnPacket(connection).ConfigureAwait(false); - } - - SequencePosition endPosition = buffer.GetPosition(4 + length); - buffer = buffer.Slice(endPosition); - } - return buffer.Start; - } public IConnection CreateConnection(SslStream stream, IPEndPoint local, IPEndPoint remote) { @@ -287,29 +209,6 @@ namespace cmonitor.server public void Disponse() { Stop(); - OnPacket = null; - } - } - - - public sealed class AsyncUserToken - { - public IConnection Connection { get; set; } - public Socket Socket { get; set; } - public ReceiveDataBuffer DataBuffer { get; set; } = new ReceiveDataBuffer(); - public byte[] PoolBuffer { get; set; } - - public void Clear() - { - Connection?.Disponse(); - Socket = null; - - PoolBuffer = Helper.EmptyArray; - - DataBuffer.Clear(true); - - GC.Collect(); - // GC.SuppressFinalize(this); } } } diff --git a/cmonitor/server/config/Config.cs b/cmonitor/server/config/Config.cs index f7bc715c..ced13f8a 100644 --- a/cmonitor/server/config/Config.cs +++ b/cmonitor/server/config/Config.cs @@ -7,5 +7,8 @@ public sealed partial class ConfigServerInfo { public int ServicePort { get; set; } = 1802; + + public string Certificate { get; set; } = "./snltty.pfx"; + public string Password { get; set; } = "snltty"; } } diff --git a/common.libs/ReceiveDataBuffer.cs b/common.libs/ReceiveDataBuffer.cs index b9784bcc..a1eb4200 100644 --- a/common.libs/ReceiveDataBuffer.cs +++ b/common.libs/ReceiveDataBuffer.cs @@ -33,6 +33,17 @@ namespace common.libs return items; } } + + public void AddRange(ReadOnlyMemory data) + { + if (data.Length > 0) + { + BeResize(data.Length); + + data.CopyTo(items.Slice(size, data.Length)); + size += data.Length; + } + } public void AddRange(Memory data) { if(data.Length > 0) diff --git a/common.libs/socks5/Socks5Parser.cs b/common.libs/socks5/Socks5Parser.cs index e80c1ea1..d5dad2db 100644 --- a/common.libs/socks5/Socks5Parser.cs +++ b/common.libs/socks5/Socks5Parser.cs @@ -52,15 +52,15 @@ namespace common.libs.socks5 /// /// /// - public static Memory GetRemoteEndPoint(Memory data, out Socks5EnumAddressType addressType, out ushort port, out int index) + public static ReadOnlyMemory GetRemoteEndPoint(ReadOnlyMemory data, out Socks5EnumAddressType addressType, out ushort port, out int index) { //VERSION COMMAND RSV ATYPE DST.ADDR DST.PORT //去掉 VERSION COMMAND RSV - Memory memory = data.Slice(3); - Span span = memory.Span; + ReadOnlyMemory memory = data.Slice(3); + ReadOnlySpan span = memory.Span; addressType = (Socks5EnumAddressType)span[0]; index = 0; - Memory result = Helper.EmptyArray; + ReadOnlyMemory result = Helper.EmptyArray; switch (addressType) { @@ -98,7 +98,7 @@ namespace common.libs.socks5 /// /// /// - public static Memory GetUdpData(Memory span) + public static ReadOnlyMemory GetUdpData(ReadOnlyMemory span) { //RSV FRAG ATYPE DST.ADDR DST.PORT DATA //去掉 RSV FRAG RSV占俩字节 @@ -150,7 +150,7 @@ namespace common.libs.socks5 /// /// /// - public static unsafe byte[] MakeUdpResponse(IPEndPoint remoteEndPoint, Memory data, out int length) + public static unsafe byte[] MakeUdpResponse(IPEndPoint remoteEndPoint, ReadOnlyMemory data, out int length) { //RSV FRAG ATYPE DST.ADDR DST.PORT DATA //RSV占俩字节 @@ -208,7 +208,7 @@ namespace common.libs.socks5 /// /// /// - public static EnumProxyValidateDataResult ValidateRequestData(Memory data) + public static EnumProxyValidateDataResult ValidateRequestData(ReadOnlyMemory data) { /* * VERSION METHODS_COUNT METHODS @@ -234,7 +234,7 @@ namespace common.libs.socks5 /// /// /// - public static EnumProxyValidateDataResult ValidateCommandData(Memory data) + public static EnumProxyValidateDataResult ValidateCommandData(ReadOnlyMemory data) { /* * VERSION COMMAND RSV ADDRESS_TYPE DST.ADDR DST.PORT @@ -269,7 +269,7 @@ namespace common.libs.socks5 /// /// /// - public static EnumProxyValidateDataResult ValidateAuthData(Memory data, Socks5EnumAuthType authType) + public static EnumProxyValidateDataResult ValidateAuthData(ReadOnlyMemory data, Socks5EnumAuthType authType) { return authType switch { @@ -282,7 +282,7 @@ namespace common.libs.socks5 _ => EnumProxyValidateDataResult.Bad, }; } - private static EnumProxyValidateDataResult ValidateAuthPasswordData(Memory data) + private static EnumProxyValidateDataResult ValidateAuthPasswordData(ReadOnlyMemory data) { /* VERSION USERNAME_LENGTH USERNAME PASSWORD_LENGTH PASSWORD diff --git a/common.libs/winapis/Wininet.cs b/common.libs/winapis/Wininet.cs index f0a599f6..644c86dd 100644 --- a/common.libs/winapis/Wininet.cs +++ b/common.libs/winapis/Wininet.cs @@ -66,7 +66,6 @@ namespace common.libs.winapis Address = addressBytes, InterfaceIndex = (uint)adapter }; - Console.WriteLine($"delete connection {adapter}->{ip}"); int result = DeleteIpNetEntry2(ref row); } }