This commit is contained in:
snltty
2025-09-14 16:04:01 +08:00
parent 368ce5fd05
commit 40987c2ea7
11 changed files with 151 additions and 282 deletions

View File

@@ -77,12 +77,14 @@ namespace linker.messenger.relay.client.transport
if (relayInfo.SSL)
{
sslStream = new SslStream(new NetworkStream(socket, false), false, ValidateServerCertificate, null);
#pragma warning disable SYSLIB0039 // 类型或成员已过时
await sslStream.AuthenticateAsClientAsync(new SslClientAuthenticationOptions
{
EnabledSslProtocols = SslProtocols.Tls13 | SslProtocols.Tls12 | SslProtocols.Tls11 | SslProtocols.Tls,
CertificateRevocationCheckMode = X509RevocationMode.NoCheck,
ClientCertificates = new X509CertificateCollection { messengerStore.Certificate }
}).ConfigureAwait(false);
#pragma warning restore SYSLIB0039 // 类型或成员已过时
}
return new TunnelConnectionTcp

View File

@@ -46,18 +46,6 @@ namespace linker.messenger.tuntap
{
Add(connection);
connection.BeginReceive(this, null);
if (tuntapConfigTransfer.Info.TcpMerge)
{
connection.StartPacketMerge();
}
/*
if (connection.ProtocolType == TunnelProtocolType.Tcp && tuntapConfigTransfer.Info.FakeAck && tuntapDecenter.HasSwitchFlag(connection.RemoteMachineId, TuntapSwitch.FakeAck))
{
connection.SendBuffer = new byte[4 * 1024];
if (LoggerHelper.Instance.LoggerLevel <= LoggerTypes.DEBUG)
LoggerHelper.Instance.Debug($"[{connection.RemoteMachineId}][{connection.RemoteMachineName}] use fake ack");
}
*/
//有哪些目标IP用了相同目标隧道更新一下
tuntapCidrConnectionManager.Update(connection);
}

View File

@@ -74,7 +74,9 @@ namespace linker.messenger
SslStream sslStream = new SslStream(networkStream, true, ValidateServerCertificate,null);
try
{
#pragma warning disable SYSLIB0039 // 类型或成员已过时
await sslStream.AuthenticateAsServerAsync(messengerStore.Certificate, OperatingSystem.IsAndroid(), SslProtocols.Tls13 | SslProtocols.Tls12 | SslProtocols.Tls11 | SslProtocols.Tls, false).WaitAsync(TimeSpan.FromMilliseconds(5000)).ConfigureAwait(false);
#pragma warning restore SYSLIB0039 // 类型或成员已过时
IConnection connection = CreateConnection(sslStream, networkStream, socket, socket.LocalEndPoint as IPEndPoint, socket.RemoteEndPoint as IPEndPoint);
connection.BeginReceive(this, null, true);
@@ -139,6 +141,7 @@ namespace linker.messenger
SslStream sslStream = new SslStream(networkStream, true, ValidateServerCertificate, null);
try
{
#pragma warning disable SYSLIB0039 // 类型或成员已过时
await sslStream.AuthenticateAsClientAsync(new SslClientAuthenticationOptions
{
AllowRenegotiation = true,
@@ -146,6 +149,7 @@ namespace linker.messenger
CertificateRevocationCheckMode = X509RevocationMode.NoCheck,
ClientCertificates = new X509CertificateCollection { messengerStore.Certificate }
}).WaitAsync(TimeSpan.FromMilliseconds(5000)).ConfigureAwait(false);
#pragma warning restore SYSLIB0039 // 类型或成员已过时
IConnection connection = CreateConnection(sslStream, networkStream, socket, socket.LocalEndPoint as IPEndPoint, socket.RemoteEndPoint as IPEndPoint);
connection.BeginReceive(this, null, true);

View File

@@ -180,10 +180,6 @@ namespace linker.tunnel.connection
/// <param name="userToken">自定义数据,回调带上</param>
public void BeginReceive(ITunnelConnectionReceiveCallback callback, object userToken);
public void StartPacketMerge()
{
}
public string ToString();
public bool Equals(ITunnelConnection connection);
}

View File

@@ -60,6 +60,10 @@ namespace linker.tunnel.connection
private readonly byte[] pingBytes = Encoding.UTF8.GetBytes($"{Helper.GlobalString}.tcp.ping");
private readonly byte[] pongBytes = Encoding.UTF8.GetBytes($"{Helper.GlobalString}.tcp.pong");
private Pipe pipeSender;
private Pipe pipeWriter;
private byte[] packetBuffer = new byte[4096];
/// <summary>
/// 开始接收数据
/// </summary>
@@ -73,146 +77,40 @@ namespace linker.tunnel.connection
this.userToken = userToken;
cancellationTokenSource = new CancellationTokenSource();
pipeSender = new Pipe(new PipeOptions(pauseWriterThreshold: 1 * 1024 * 1024, resumeWriterThreshold: 512 * 1024, useSynchronizationContext: false));
pipeWriter = new Pipe(new PipeOptions(pauseWriterThreshold: 1 * 1024 * 1024, resumeWriterThreshold: 512 * 1024, useSynchronizationContext: false));
_ = ProcessWrite();
_ = ProcessHeart();
}
private Pipe pipeSender;
private Pipe pipeWriter;
public void StartPacketMerge()
{
pipeSender = new Pipe(new PipeOptions(pauseWriterThreshold: 1 * 1024 * 1024));
pipeWriter = new Pipe(new PipeOptions(pauseWriterThreshold: 1 * 1024 * 1024));
_ = Sender();
_ = Writer();
}
private async Task Sender()
{
while (cancellationTokenSource.IsCancellationRequested == false)
{
ReadResult result = await pipeSender.Reader.ReadAsync();
if (result.IsCompleted && result.Buffer.IsEmpty)
{
cancellationTokenSource.Cancel();
break;
}
ReadOnlySequence<byte> buffer = result.Buffer;
while (buffer.Length > 0)
{
int chunkSize = (int)Math.Min(buffer.Length, 8192);
ReadOnlySequence<byte> chunk = buffer.Slice(0, chunkSize);
if (Stream != null) await semaphoreSlim.WaitAsync().ConfigureAwait(false);
try
{
foreach (var item in chunk)
{
if (Stream != null)
{
await Stream.WriteAsync(item).ConfigureAwait(false);
}
else
{
await Socket.SendAsync(item, SocketFlags.None).ConfigureAwait(false);
}
}
SendBytes += chunk.Length;
}
catch (Exception ex)
{
if (LoggerHelper.Instance.LoggerLevel <= LoggerTypes.DEBUG)
{
LoggerHelper.Instance.Error(ex);
}
Dispose();
}
finally
{
if (Stream != null) semaphoreSlim.Release();
}
buffer = buffer.Slice(chunkSize);
}
pipeSender.Reader.AdvanceTo(result.Buffer.End);
}
}
private async Task Writer()
{
while (cancellationTokenSource.IsCancellationRequested == false)
{
ReadResult result = await pipeWriter.Reader.ReadAsync();
if (result.IsCompleted && result.Buffer.IsEmpty)
{
cancellationTokenSource.Cancel();
break;
}
ReadOnlySequence<byte> buffer = result.Buffer;
while (buffer.Length > 0)
{
int chunkSize = (int)Math.Min(buffer.Length, 8192);
ReadOnlySequence<byte> chunk = buffer.Slice(0, chunkSize);
if (Stream != null) await semaphoreSlim.WaitAsync().ConfigureAwait(false);
try
{
foreach (var item in chunk)
{
await StickPacket(item);
}
}
catch (Exception ex)
{
if (LoggerHelper.Instance.LoggerLevel <= LoggerTypes.DEBUG)
{
LoggerHelper.Instance.Error(ex);
}
Dispose();
}
finally
{
if (Stream != null) semaphoreSlim.Release();
}
buffer = buffer.Slice(chunkSize);
}
pipeWriter.Reader.AdvanceTo(result.Buffer.End);
}
_ = Recver();
_ = ProcessHeart();
}
private async Task ProcessWrite()
{
byte[] buffer = new byte[16 * 1024];
try
{
int length = 0;
while (cancellationTokenSource.IsCancellationRequested == false)
{
if (Stream != null)
{
length = await Stream.ReadAsync(buffer).ConfigureAwait(false);
if (length == 0) break;
await RecvPacket(buffer.AsMemory(0, length)).ConfigureAwait(false);
Memory<byte> memory = pipeWriter.Writer.GetMemory(8 * 1024);
length = await Stream.ReadAsync(memory).ConfigureAwait(false);
if (length == 0)
{
break;
}
pipeWriter.Writer.Advance(length);
await pipeWriter.Writer.FlushAsync().ConfigureAwait(false);
}
else
{
length = await Socket.ReceiveAsync(buffer.AsMemory(), SocketFlags.None).ConfigureAwait(false);
Memory<byte> memory = pipeWriter.Writer.GetMemory(8 * 1024);
length = await Socket.ReceiveAsync(memory, SocketFlags.None).ConfigureAwait(false);
if (length == 0) break;
await RecvPacket(buffer.AsMemory(0, length)).ConfigureAwait(false);
while (Socket.Available > 0)
{
length = Socket.Receive(buffer);
if (length == 0) break;
await RecvPacket(buffer.AsMemory(0, length)).ConfigureAwait(false);
}
pipeWriter.Writer.Advance(length);
await pipeWriter.Writer.FlushAsync().ConfigureAwait(false);
}
}
}
@@ -228,53 +126,61 @@ namespace linker.tunnel.connection
Dispose();
}
}
private async Task RecvPacket(ReadOnlyMemory<byte> buffer)
private async Task Recver()
{
if (pipeWriter != null)
while (cancellationTokenSource.IsCancellationRequested == false)
{
Memory<byte> memory = pipeWriter.Writer.GetMemory(buffer.Length);
buffer.CopyTo(memory);
pipeWriter.Writer.Advance(buffer.Length);
await pipeWriter.Writer.FlushAsync();
return;
}
await StickPacket(buffer).ConfigureAwait(false);
}
private async Task StickPacket(ReadOnlyMemory<byte> buffer)
{
//没有缓存,可能是一个完整的包
if (bufferCache.Size == 0 && buffer.Length > 4)
{
int packageLen = buffer.Span.ToInt32();
//数据足够,包长度+4那就存在一个完整包
if (packageLen + 4 <= buffer.Length)
try
{
await WritePacket(buffer.Slice(4, packageLen)).ConfigureAwait(false);
buffer = buffer.Slice(4 + packageLen);
ReadResult result = await pipeWriter.Reader.ReadAsync().ConfigureAwait(false);
if (result.IsCompleted && result.Buffer.IsEmpty)
{
cancellationTokenSource.Cancel();
break;
}
ReadOnlySequence<byte> buffer = result.Buffer;
ReceiveBytes += buffer.Length;
long offset = 0;
do
{
int packageLen = 0;
if (buffer.First.Length >= 4) packageLen = buffer.First.ToInt32();
else
{
buffer.Slice(0, 4).CopyTo(packetBuffer);
packageLen = packetBuffer.ToInt32();
}
if (packageLen + 4 > buffer.Length) break;
ReadOnlySequence<byte> temp = buffer.Slice(4, packageLen);
if (packetBuffer.Length < temp.Length) packetBuffer = new byte[temp.Length];
temp.CopyTo(packetBuffer);
await WritePacket(packetBuffer.AsMemory(0, packageLen)).ConfigureAwait(false);
offset += 4 + packageLen;
buffer = buffer.Slice(4 + packageLen);
} while (buffer.Length > 4);
pipeWriter.Reader.AdvanceTo(result.Buffer.GetPosition(offset), result.Buffer.End);
}
//没有剩下的数据就不继续往下了
if (buffer.Length == 0)
return;
}
//添加到缓存
bufferCache.AddRange(buffer);
do
{
//取出一个一个包
int packageLen = bufferCache.Data.Span.ToInt32();
if (packageLen + 4 > bufferCache.Size)
catch (Exception ex)
{
break;
if (LoggerHelper.Instance.LoggerLevel <= LoggerTypes.DEBUG)
{
LoggerHelper.Instance.Error(ex);
}
Dispose();
}
await WritePacket(bufferCache.Data.Slice(4, packageLen)).ConfigureAwait(false);
bufferCache.RemoveRange(0, packageLen + 4);
} while (bufferCache.Size > 4);
}
}
private async Task WritePacket(ReadOnlyMemory<byte> packet)
{
ReceiveBytes += packet.Length;
LastTicks.Update();
if (packet.Length == pingBytes.Length && packet.Span.Slice(0, pingBytes.Length - 4).SequenceEqual(pingBytes.AsSpan(0, pingBytes.Length - 4)))
{
@@ -338,130 +244,85 @@ namespace linker.tunnel.connection
data.Length.ToBytes(heartData.AsSpan());
data.AsMemory().CopyTo(heartData.AsMemory(4));
await semaphoreSlim.WaitAsync().ConfigureAwait(false);
try
{
if (Stream != null)
{
await Stream.WriteAsync(heartData.AsMemory(0, length)).ConfigureAwait(false);
}
else
{
await Socket.SendAsync(heartData.AsMemory(0, length)).ConfigureAwait(false);
}
SendBytes += data.Length;
}
catch (Exception ex)
{
if (LoggerHelper.Instance.LoggerLevel <= LoggerTypes.DEBUG)
{
LoggerHelper.Instance.Error(ex);
}
Dispose();
}
finally
{
semaphoreSlim.Release();
}
await SendAsync(heartData.AsMemory(0, length));
ArrayPool<byte>.Shared.Return(heartData);
}
private readonly SemaphoreSlim semaphoreSlim = new SemaphoreSlim(1);
private async Task Sender()
{
while (cancellationTokenSource.IsCancellationRequested == false)
{
try
{
ReadResult result = await pipeSender.Reader.ReadAsync().ConfigureAwait(false);
if (result.IsCompleted && result.Buffer.IsEmpty)
{
cancellationTokenSource.Cancel();
break;
}
if (result.Buffer.IsEmpty)
{
continue;
}
ReadOnlySequence<byte> buffer = result.Buffer;
foreach (ReadOnlyMemory<byte> memoryBlock in result.Buffer)
{
if (Stream != null)
{
await Stream.WriteAsync(memoryBlock).ConfigureAwait(false);
}
else
{
int sendt = 0;
do
{
ReadOnlyMemory<byte> sendBlock = memoryBlock.Slice(sendt);
int remaining = await Socket.SendAsync(sendBlock, SocketFlags.None).ConfigureAwait(false);
if (remaining == 0) break;
sendt += remaining;
} while (sendt < memoryBlock.Length);
}
}
pipeSender.Reader.AdvanceTo(buffer.End);
LastTicks.Update();
}
catch (Exception ex)
{
if (LoggerHelper.Instance.LoggerLevel <= LoggerTypes.DEBUG)
{
LoggerHelper.Instance.Error(ex);
}
Dispose();
}
}
}
private readonly object _writeLock = new object();
public async Task<bool> SendAsync(ReadOnlyMemory<byte> data)
{
if (callback == null) return false;
if (pipeSender != null)
lock (_writeLock)
{
Memory<byte> memory = pipeSender.Writer.GetMemory(data.Length);
data.CopyTo(memory);
pipeSender.Writer.Advance(data.Length);
await pipeSender.Writer.FlushAsync();
return true;
}
if (Stream != null)
{
await semaphoreSlim.WaitAsync().ConfigureAwait(false);
}
try
{
if (Stream != null)
{
await Stream.WriteAsync(data).ConfigureAwait(false);
}
else
{
await Socket.SendAsync(data, SocketFlags.None).ConfigureAwait(false);
}
SendBytes += data.Length;
return true;
}
catch (Exception ex)
{
if (LoggerHelper.Instance.LoggerLevel <= LoggerTypes.DEBUG)
{
LoggerHelper.Instance.Error(ex);
}
Dispose();
}
finally
{
if (Stream != null)
semaphoreSlim.Release();
}
await pipeSender.Writer.FlushAsync().ConfigureAwait(false);
return false;
}
public async Task<bool> SendAsync(byte[] buffer, int offset, int length)
{
if (callback == null) return false;
if (pipeSender != null)
{
ReadOnlyMemory<byte> data = buffer.AsMemory(offset, length);
Memory<byte> memory = pipeSender.Writer.GetMemory(data.Length);
data.CopyTo(memory);
pipeSender.Writer.Advance(data.Length);
await pipeSender.Writer.FlushAsync();
return true;
}
if (Stream != null)
{
await semaphoreSlim.WaitAsync().ConfigureAwait(false);
}
try
{
if (Stream != null)
{
await Stream.WriteAsync(buffer.AsMemory(offset, length)).ConfigureAwait(false);
}
else
{
await Socket.SendAsync(buffer.AsMemory(offset, length), SocketFlags.None).ConfigureAwait(false);
}
SendBytes += length;
LastTicks.Update();
return true;
}
catch (Exception ex)
{
if (LoggerHelper.Instance.LoggerLevel <= LoggerTypes.DEBUG)
{
LoggerHelper.Instance.Error(ex);
}
Dispose();
}
finally
{
if (Stream != null)
semaphoreSlim.Release();
}
return false;
return await SendAsync(buffer.AsMemory(offset, length)).ConfigureAwait(false);
}
public void Dispose()
{
LastTicks.Clear();
@@ -480,6 +341,8 @@ namespace linker.tunnel.connection
Socket?.SafeClose();
packetBuffer = null;
try
{
pipeSender?.Writer.Complete();

View File

@@ -208,6 +208,7 @@ namespace linker.tunnel.transport
IPEndPoint remoteEP = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromMilliseconds(500)).ConfigureAwait(false);
//绑定一个udp用来给QUIC链接
Socket quicUdp = ListenQuicConnect(tunnelTransportInfo.BufferSize, remoteUdp, remoteEP);
#pragma warning disable SYSLIB0039 // 类型或成员已过时
QuicConnection connection = connection = await QuicConnection.ConnectAsync(new QuicClientConnectionOptions
{
RemoteEndPoint = new IPEndPoint(IPAddress.Loopback, (quicUdp.LocalEndPoint as IPEndPoint).Port),
@@ -225,6 +226,7 @@ namespace linker.tunnel.transport
}
}
}).AsTask().WaitAsync(TimeSpan.FromMilliseconds(1000)).ConfigureAwait(false);
#pragma warning restore SYSLIB0039 // 类型或成员已过时
QuicStream quicStream = await connection.OpenOutboundStreamAsync(QuicStreamType.Bidirectional).ConfigureAwait(false);
return new TunnelConnectionMsQuic
{
@@ -693,6 +695,7 @@ namespace linker.tunnel.transport
ListenEndPoint = new IPEndPoint(IPAddress.Any, 0),
ConnectionOptionsCallback = (connection, hello, token) =>
{
#pragma warning disable SYSLIB0039 // 类型或成员已过时
return ValueTask.FromResult(new QuicServerConnectionOptions
{
MaxInboundBidirectionalStreams = 65535,
@@ -707,6 +710,7 @@ namespace linker.tunnel.transport
ApplicationProtocols = new List<SslApplicationProtocol> { SslApplicationProtocol.Http3 }
}
});
#pragma warning restore SYSLIB0039 // 类型或成员已过时
}
}).ConfigureAwait(false);
quicListenEP = new IPEndPoint(IPAddress.Loopback, listener.LocalEndPoint.Port);

View File

@@ -178,11 +178,13 @@ namespace linker.tunnel.transport
if (tunnelTransportInfo.SSL)
{
sslStream = new SslStream(new NetworkStream(targetSocket, false), false, ValidateServerCertificate, null);
#pragma warning disable SYSLIB0039 // 类型或成员已过时
await sslStream.AuthenticateAsClientAsync(new SslClientAuthenticationOptions {
EnabledSslProtocols = SslProtocols.Tls13 | SslProtocols.Tls12 | SslProtocols.Tls11 | SslProtocols.Tls,
CertificateRevocationCheckMode = X509RevocationMode.NoCheck,
ClientCertificates = new X509CertificateCollection { certificate }
}).ConfigureAwait(false);
#pragma warning restore SYSLIB0039 // 类型或成员已过时
}
return new TunnelConnectionTcp
@@ -280,7 +282,9 @@ namespace linker.tunnel.transport
}
sslStream = new SslStream(new NetworkStream(socket, false), false, ValidateServerCertificate,null);
#pragma warning disable SYSLIB0039 // 类型或成员已过时
await sslStream.AuthenticateAsServerAsync(certificate, OperatingSystem.IsAndroid(), SslProtocols.Tls13 | SslProtocols.Tls12 | SslProtocols.Tls11 | SslProtocols.Tls, false).ConfigureAwait(false);
#pragma warning restore SYSLIB0039 // 类型或成员已过时
}
TunnelConnectionTcp result = new TunnelConnectionTcp

View File

@@ -176,12 +176,14 @@ namespace linker.tunnel.transport
if (state.SSL)
{
sslStream = new SslStream(new NetworkStream(socket, false), false, ValidateServerCertificate, null);
#pragma warning disable SYSLIB0039 // 类型或成员已过时
await sslStream.AuthenticateAsClientAsync(new SslClientAuthenticationOptions
{
EnabledSslProtocols = SslProtocols.Tls13 | SslProtocols.Tls12 | SslProtocols.Tls11 | SslProtocols.Tls,
CertificateRevocationCheckMode = X509RevocationMode.NoCheck,
ClientCertificates = new X509CertificateCollection { certificate }
}).ConfigureAwait(false);
#pragma warning restore SYSLIB0039 // 类型或成员已过时
}
return new TunnelConnectionTcp
@@ -237,7 +239,9 @@ namespace linker.tunnel.transport
}
sslStream = new SslStream(new NetworkStream(socket, false), false, ValidateServerCertificate, null);
#pragma warning disable SYSLIB0039 // 类型或成员已过时
await sslStream.AuthenticateAsServerAsync(certificate, OperatingSystem.IsAndroid(), SslProtocols.Tls13 | SslProtocols.Tls12 | SslProtocols.Tls11 | SslProtocols.Tls, false).ConfigureAwait(false);
#pragma warning restore SYSLIB0039 // 类型或成员已过时
}
return new TunnelConnectionTcp

View File

@@ -262,7 +262,9 @@ namespace linker.tunnel.transport
}
sslStream = new SslStream(new NetworkStream(socket, false), false, ValidateServerCertificate, null);
#pragma warning disable SYSLIB0039 // 类型或成员已过时
await sslStream.AuthenticateAsServerAsync(certificate, OperatingSystem.IsAndroid(), SslProtocols.Tls13 | SslProtocols.Tls12 | SslProtocols.Tls11 | SslProtocols.Tls, false).ConfigureAwait(false);
#pragma warning restore SYSLIB0039 // 类型或成员已过时
}
TunnelConnectionTcp result = new TunnelConnectionTcp
@@ -327,11 +329,13 @@ namespace linker.tunnel.transport
if (tunnelTransportInfo.SSL)
{
sslStream = new SslStream(new NetworkStream(targetSocket, false), false, ValidateServerCertificate, null);
#pragma warning disable SYSLIB0039 // 类型或成员已过时
await sslStream.AuthenticateAsClientAsync(new SslClientAuthenticationOptions {
EnabledSslProtocols = SslProtocols.Tls13 | SslProtocols.Tls12 | SslProtocols.Tls11 | SslProtocols.Tls,
CertificateRevocationCheckMode = X509RevocationMode.NoCheck,
ClientCertificates = new X509CertificateCollection { certificate }
}).ConfigureAwait(false);
#pragma warning restore SYSLIB0039 // 类型或成员已过时
}
return new TunnelConnectionTcp

View File

@@ -19,7 +19,7 @@
<el-checkbox class="mgr-1" v-model="state.ruleForm.AutoConnect" label="自动连接" size="large" />
<el-checkbox class="mgr-1" v-model="state.ruleForm.Multicast" label="禁用广播" size="large" />
<el-checkbox class="mgr-1" v-model="state.ruleForm.DisableNat" label="禁用NAT" size="large" />
<el-checkbox class="mgr-1" v-model="state.ruleForm.TcpMerge" label="TCP包合并" size="large" />
<!-- <el-checkbox class="mgr-1" v-model="state.ruleForm.TcpMerge" label="TCP包合并" size="large" /> -->
<el-checkbox class="mgr-1" v-model="state.ruleForm.InterfaceOrder" label="网卡顺序" size="large" />
<!-- <el-checkbox v-model="state.ruleForm.FakeAck" label="伪ACK" size="large" /> -->
</el-form-item>

View File

@@ -1,5 +1,5 @@
v1.9.1
2025-09-12 17:21:56
2025-09-14 16:04:01
1. 一些累计更新
2. 服务器转发多节点
3. 虚拟网卡下伪造ACK为TCP-in-TCP隧道提速