diff --git a/src/linker.tunnel/connection/TunnelConnectionTcp.cs b/src/linker.tunnel/connection/TunnelConnectionTcp.cs index e380f295..f593f3ee 100644 --- a/src/linker.tunnel/connection/TunnelConnectionTcp.cs +++ b/src/linker.tunnel/connection/TunnelConnectionTcp.cs @@ -77,6 +77,114 @@ namespace linker.tunnel.connection _ = ProcessHeart(); } + private Pipe pipeSender; + private Pipe pipeWriter; + public void StartPacketMerge() + { + pipeSender = new Pipe(new PipeOptions(pauseWriterThreshold: 1 * 1024 * 1024)); + pipeWriter = new Pipe(new PipeOptions(pauseWriterThreshold: 1 * 1024 * 1024)); + _ = Sender(); + _ = Writer(); + } + private async Task Sender() + { + while (cancellationTokenSource.IsCancellationRequested == false) + { + ReadResult result = await pipeSender.Reader.ReadAsync(); + if (result.IsCompleted && result.Buffer.IsEmpty) + { + cancellationTokenSource.Cancel(); + break; + } + + ReadOnlySequence buffer = result.Buffer; + while (buffer.Length > 0) + { + int chunkSize = (int)Math.Min(buffer.Length, 8192); + ReadOnlySequence chunk = buffer.Slice(0, chunkSize); + + if (Stream != null) await semaphoreSlim.WaitAsync().ConfigureAwait(false); + try + { + foreach (var item in chunk) + { + if (Stream != null) + { + await Stream.WriteAsync(item).ConfigureAwait(false); + } + else + { + await Socket.SendAsync(item, SocketFlags.None).ConfigureAwait(false); + } + } + SendBytes += chunk.Length; + } + catch (Exception ex) + { + if (LoggerHelper.Instance.LoggerLevel <= LoggerTypes.DEBUG) + { + LoggerHelper.Instance.Error(ex); + } + Dispose(); + } + finally + { + if (Stream != null) semaphoreSlim.Release(); + } + + buffer = buffer.Slice(chunkSize); + + } + pipeSender.Reader.AdvanceTo(result.Buffer.End); + + } + } + private async Task Writer() + { + while (cancellationTokenSource.IsCancellationRequested == false) + { + ReadResult result = await pipeWriter.Reader.ReadAsync(); + if (result.IsCompleted && result.Buffer.IsEmpty) + { + cancellationTokenSource.Cancel(); + break; + } + + ReadOnlySequence buffer = result.Buffer; + while (buffer.Length > 0) + { + int chunkSize = (int)Math.Min(buffer.Length, 8192); + ReadOnlySequence chunk = buffer.Slice(0, chunkSize); + + if (Stream != null) await semaphoreSlim.WaitAsync().ConfigureAwait(false); + try + { + foreach (var item in chunk) + { + await StickPacket(item); + } + } + catch (Exception ex) + { + if (LoggerHelper.Instance.LoggerLevel <= LoggerTypes.DEBUG) + { + LoggerHelper.Instance.Error(ex); + } + Dispose(); + } + finally + { + if (Stream != null) semaphoreSlim.Release(); + } + + buffer = buffer.Slice(chunkSize); + + } + pipeWriter.Reader.AdvanceTo(result.Buffer.End); + + } + } + private async Task ProcessWrite() { byte[] buffer = new byte[16 * 1024]; @@ -124,15 +232,15 @@ namespace linker.tunnel.connection { if (pipeWriter != null) { - Memory memory = pipeSender.Writer.GetMemory(buffer.Length); + Memory memory = pipeWriter.Writer.GetMemory(buffer.Length); buffer.CopyTo(memory); - pipeSender.Writer.Advance(buffer.Length); - await pipeSender.Writer.FlushAsync(); + pipeWriter.Writer.Advance(buffer.Length); + await pipeWriter.Writer.FlushAsync(); return; } - await ReadPacket(buffer).ConfigureAwait(false); + await StickPacket(buffer).ConfigureAwait(false); } - private async Task ReadPacket(ReadOnlyMemory buffer) + private async Task StickPacket(ReadOnlyMemory buffer) { //没有缓存,可能是一个完整的包 if (bufferCache.Size == 0 && buffer.Length > 4) @@ -141,7 +249,7 @@ namespace linker.tunnel.connection //数据足够,包长度+4,那就存在一个完整包 if (packageLen + 4 <= buffer.Length) { - await CallbackPacket(buffer.Slice(4, packageLen)).ConfigureAwait(false); + await WritePacket(buffer.Slice(4, packageLen)).ConfigureAwait(false); buffer = buffer.Slice(4 + packageLen); } //没有剩下的数据就不继续往下了 @@ -158,13 +266,13 @@ namespace linker.tunnel.connection { break; } - await CallbackPacket(bufferCache.Data.Slice(4, packageLen)).ConfigureAwait(false); + await WritePacket(bufferCache.Data.Slice(4, packageLen)).ConfigureAwait(false); bufferCache.RemoveRange(0, packageLen + 4); } while (bufferCache.Size > 4); } - private async Task CallbackPacket(ReadOnlyMemory packet) + private async Task WritePacket(ReadOnlyMemory packet) { ReceiveBytes += packet.Length; LastTicks.Update(); @@ -354,116 +462,6 @@ namespace linker.tunnel.connection return false; } - - private Pipe pipeSender; - private Pipe pipeWriter; - public void StartPacketMerge() - { - pipeSender = new Pipe(new PipeOptions(pauseWriterThreshold: 1 * 1024 * 1024)); - pipeWriter = new Pipe(new PipeOptions(pauseWriterThreshold: 1 * 1024 * 1024)); - _ = Sender(); - _ = Writer(); - } - private async Task Sender() - { - while (cancellationTokenSource.IsCancellationRequested == false) - { - ReadResult result = await pipeSender.Reader.ReadAsync(); - if (result.IsCompleted && result.Buffer.IsEmpty) - { - cancellationTokenSource.Cancel(); - break; - } - - ReadOnlySequence buffer = result.Buffer; - while (buffer.Length > 0) - { - int chunkSize = (int)Math.Min(buffer.Length, 8192); - ReadOnlySequence chunk = buffer.Slice(0, chunkSize); - - if (Stream != null) await semaphoreSlim.WaitAsync().ConfigureAwait(false); - try - { - foreach (var item in chunk) - { - if (Stream != null) - { - await Stream.WriteAsync(item).ConfigureAwait(false); - } - else - { - await Socket.SendAsync(item, SocketFlags.None).ConfigureAwait(false); - } - } - SendBytes += chunk.Length; - } - catch (Exception ex) - { - if (LoggerHelper.Instance.LoggerLevel <= LoggerTypes.DEBUG) - { - LoggerHelper.Instance.Error(ex); - } - Dispose(); - } - finally - { - if (Stream != null) semaphoreSlim.Release(); - } - - buffer = buffer.Slice(chunkSize); - - } - pipeSender.Reader.AdvanceTo(result.Buffer.End); - - } - } - private async Task Writer() - { - while (cancellationTokenSource.IsCancellationRequested == false) - { - ReadResult result = await pipeWriter.Reader.ReadAsync(); - if (result.IsCompleted && result.Buffer.IsEmpty) - { - cancellationTokenSource.Cancel(); - break; - } - - ReadOnlySequence buffer = result.Buffer; - while (buffer.Length > 0) - { - int chunkSize = (int)Math.Min(buffer.Length, 8192); - ReadOnlySequence chunk = buffer.Slice(0, chunkSize); - - if (Stream != null) await semaphoreSlim.WaitAsync().ConfigureAwait(false); - try - { - foreach (var item in chunk) - { - ReadPacket(item); - } - } - catch (Exception ex) - { - if (LoggerHelper.Instance.LoggerLevel <= LoggerTypes.DEBUG) - { - LoggerHelper.Instance.Error(ex); - } - Dispose(); - } - finally - { - if (Stream != null) semaphoreSlim.Release(); - } - - buffer = buffer.Slice(chunkSize); - - } - pipeWriter.Reader.AdvanceTo(result.Buffer.End); - - } - } - - public void Dispose() { LastTicks.Clear(); diff --git a/version.txt b/version.txt index 99c3301d..5f49d696 100644 --- a/version.txt +++ b/version.txt @@ -1,5 +1,5 @@ v1.9.1 -2025-09-12 15:34:04 +2025-09-12 15:50:59 1. 一些累计更新 2. 服务器转发多节点 3. 虚拟网卡下伪造ACK为TCP-in-TCP隧道提速