This commit is contained in:
snltty
2025-09-12 15:50:59 +08:00
parent f489320899
commit cad0e2e248
2 changed files with 117 additions and 119 deletions

View File

@@ -77,6 +77,114 @@ namespace linker.tunnel.connection
_ = ProcessHeart(); _ = ProcessHeart();
} }
private Pipe pipeSender;
private Pipe pipeWriter;
public void StartPacketMerge()
{
pipeSender = new Pipe(new PipeOptions(pauseWriterThreshold: 1 * 1024 * 1024));
pipeWriter = new Pipe(new PipeOptions(pauseWriterThreshold: 1 * 1024 * 1024));
_ = Sender();
_ = Writer();
}
private async Task Sender()
{
while (cancellationTokenSource.IsCancellationRequested == false)
{
ReadResult result = await pipeSender.Reader.ReadAsync();
if (result.IsCompleted && result.Buffer.IsEmpty)
{
cancellationTokenSource.Cancel();
break;
}
ReadOnlySequence<byte> buffer = result.Buffer;
while (buffer.Length > 0)
{
int chunkSize = (int)Math.Min(buffer.Length, 8192);
ReadOnlySequence<byte> chunk = buffer.Slice(0, chunkSize);
if (Stream != null) await semaphoreSlim.WaitAsync().ConfigureAwait(false);
try
{
foreach (var item in chunk)
{
if (Stream != null)
{
await Stream.WriteAsync(item).ConfigureAwait(false);
}
else
{
await Socket.SendAsync(item, SocketFlags.None).ConfigureAwait(false);
}
}
SendBytes += chunk.Length;
}
catch (Exception ex)
{
if (LoggerHelper.Instance.LoggerLevel <= LoggerTypes.DEBUG)
{
LoggerHelper.Instance.Error(ex);
}
Dispose();
}
finally
{
if (Stream != null) semaphoreSlim.Release();
}
buffer = buffer.Slice(chunkSize);
}
pipeSender.Reader.AdvanceTo(result.Buffer.End);
}
}
private async Task Writer()
{
while (cancellationTokenSource.IsCancellationRequested == false)
{
ReadResult result = await pipeWriter.Reader.ReadAsync();
if (result.IsCompleted && result.Buffer.IsEmpty)
{
cancellationTokenSource.Cancel();
break;
}
ReadOnlySequence<byte> buffer = result.Buffer;
while (buffer.Length > 0)
{
int chunkSize = (int)Math.Min(buffer.Length, 8192);
ReadOnlySequence<byte> chunk = buffer.Slice(0, chunkSize);
if (Stream != null) await semaphoreSlim.WaitAsync().ConfigureAwait(false);
try
{
foreach (var item in chunk)
{
await StickPacket(item);
}
}
catch (Exception ex)
{
if (LoggerHelper.Instance.LoggerLevel <= LoggerTypes.DEBUG)
{
LoggerHelper.Instance.Error(ex);
}
Dispose();
}
finally
{
if (Stream != null) semaphoreSlim.Release();
}
buffer = buffer.Slice(chunkSize);
}
pipeWriter.Reader.AdvanceTo(result.Buffer.End);
}
}
private async Task ProcessWrite() private async Task ProcessWrite()
{ {
byte[] buffer = new byte[16 * 1024]; byte[] buffer = new byte[16 * 1024];
@@ -124,15 +232,15 @@ namespace linker.tunnel.connection
{ {
if (pipeWriter != null) if (pipeWriter != null)
{ {
Memory<byte> memory = pipeSender.Writer.GetMemory(buffer.Length); Memory<byte> memory = pipeWriter.Writer.GetMemory(buffer.Length);
buffer.CopyTo(memory); buffer.CopyTo(memory);
pipeSender.Writer.Advance(buffer.Length); pipeWriter.Writer.Advance(buffer.Length);
await pipeSender.Writer.FlushAsync(); await pipeWriter.Writer.FlushAsync();
return; return;
} }
await ReadPacket(buffer).ConfigureAwait(false); await StickPacket(buffer).ConfigureAwait(false);
} }
private async Task ReadPacket(ReadOnlyMemory<byte> buffer) private async Task StickPacket(ReadOnlyMemory<byte> buffer)
{ {
//没有缓存,可能是一个完整的包 //没有缓存,可能是一个完整的包
if (bufferCache.Size == 0 && buffer.Length > 4) if (bufferCache.Size == 0 && buffer.Length > 4)
@@ -141,7 +249,7 @@ namespace linker.tunnel.connection
//数据足够,包长度+4那就存在一个完整包 //数据足够,包长度+4那就存在一个完整包
if (packageLen + 4 <= buffer.Length) if (packageLen + 4 <= buffer.Length)
{ {
await CallbackPacket(buffer.Slice(4, packageLen)).ConfigureAwait(false); await WritePacket(buffer.Slice(4, packageLen)).ConfigureAwait(false);
buffer = buffer.Slice(4 + packageLen); buffer = buffer.Slice(4 + packageLen);
} }
//没有剩下的数据就不继续往下了 //没有剩下的数据就不继续往下了
@@ -158,13 +266,13 @@ namespace linker.tunnel.connection
{ {
break; break;
} }
await CallbackPacket(bufferCache.Data.Slice(4, packageLen)).ConfigureAwait(false); await WritePacket(bufferCache.Data.Slice(4, packageLen)).ConfigureAwait(false);
bufferCache.RemoveRange(0, packageLen + 4); bufferCache.RemoveRange(0, packageLen + 4);
} while (bufferCache.Size > 4); } while (bufferCache.Size > 4);
} }
private async Task CallbackPacket(ReadOnlyMemory<byte> packet) private async Task WritePacket(ReadOnlyMemory<byte> packet)
{ {
ReceiveBytes += packet.Length; ReceiveBytes += packet.Length;
LastTicks.Update(); LastTicks.Update();
@@ -354,116 +462,6 @@ namespace linker.tunnel.connection
return false; return false;
} }
private Pipe pipeSender;
private Pipe pipeWriter;
public void StartPacketMerge()
{
pipeSender = new Pipe(new PipeOptions(pauseWriterThreshold: 1 * 1024 * 1024));
pipeWriter = new Pipe(new PipeOptions(pauseWriterThreshold: 1 * 1024 * 1024));
_ = Sender();
_ = Writer();
}
private async Task Sender()
{
while (cancellationTokenSource.IsCancellationRequested == false)
{
ReadResult result = await pipeSender.Reader.ReadAsync();
if (result.IsCompleted && result.Buffer.IsEmpty)
{
cancellationTokenSource.Cancel();
break;
}
ReadOnlySequence<byte> buffer = result.Buffer;
while (buffer.Length > 0)
{
int chunkSize = (int)Math.Min(buffer.Length, 8192);
ReadOnlySequence<byte> chunk = buffer.Slice(0, chunkSize);
if (Stream != null) await semaphoreSlim.WaitAsync().ConfigureAwait(false);
try
{
foreach (var item in chunk)
{
if (Stream != null)
{
await Stream.WriteAsync(item).ConfigureAwait(false);
}
else
{
await Socket.SendAsync(item, SocketFlags.None).ConfigureAwait(false);
}
}
SendBytes += chunk.Length;
}
catch (Exception ex)
{
if (LoggerHelper.Instance.LoggerLevel <= LoggerTypes.DEBUG)
{
LoggerHelper.Instance.Error(ex);
}
Dispose();
}
finally
{
if (Stream != null) semaphoreSlim.Release();
}
buffer = buffer.Slice(chunkSize);
}
pipeSender.Reader.AdvanceTo(result.Buffer.End);
}
}
private async Task Writer()
{
while (cancellationTokenSource.IsCancellationRequested == false)
{
ReadResult result = await pipeWriter.Reader.ReadAsync();
if (result.IsCompleted && result.Buffer.IsEmpty)
{
cancellationTokenSource.Cancel();
break;
}
ReadOnlySequence<byte> buffer = result.Buffer;
while (buffer.Length > 0)
{
int chunkSize = (int)Math.Min(buffer.Length, 8192);
ReadOnlySequence<byte> chunk = buffer.Slice(0, chunkSize);
if (Stream != null) await semaphoreSlim.WaitAsync().ConfigureAwait(false);
try
{
foreach (var item in chunk)
{
ReadPacket(item);
}
}
catch (Exception ex)
{
if (LoggerHelper.Instance.LoggerLevel <= LoggerTypes.DEBUG)
{
LoggerHelper.Instance.Error(ex);
}
Dispose();
}
finally
{
if (Stream != null) semaphoreSlim.Release();
}
buffer = buffer.Slice(chunkSize);
}
pipeWriter.Reader.AdvanceTo(result.Buffer.End);
}
}
public void Dispose() public void Dispose()
{ {
LastTicks.Clear(); LastTicks.Clear();

View File

@@ -1,5 +1,5 @@
v1.9.1 v1.9.1
2025-09-12 15:34:04 2025-09-12 15:50:59
1. 一些累计更新 1. 一些累计更新
2. 服务器转发多节点 2. 服务器转发多节点
3. 虚拟网卡下伪造ACK为TCP-in-TCP隧道提速 3. 虚拟网卡下伪造ACK为TCP-in-TCP隧道提速