diff --git a/cmonitor.web.client/src/views/devices/List.vue b/cmonitor.web.client/src/views/devices/List.vue index ee3de97a..681e11f5 100644 --- a/cmonitor.web.client/src/views/devices/List.vue +++ b/cmonitor.web.client/src/views/devices/List.vue @@ -176,7 +176,6 @@ export default { _getSignList(); } const handleForwardRefresh = ()=>{ - refreshForward(); ElMessage.success('刷新成功'); } @@ -227,6 +226,8 @@ export default { } const handlePageRefresh = ()=>{ handlePageChange(); + refreshTunnel(); + refreshTuntap(); ElMessage.success('刷新成功'); } const handlePageChange = () => { @@ -239,7 +240,12 @@ export default { } onMounted(() => { - subWebsocketState((state) => { if (state) _getSignList(); }); + subWebsocketState((state) => { + if (state){ + handlePageChange(); + _getSignList(); + } + }); _getSignList(); _getSignList1(); _getTuntapInfo(); diff --git a/cmonitor/client/tunnel/ITunnelConnection.cs b/cmonitor/client/tunnel/ITunnelConnection.cs index 8da8cd83..3586582e 100644 --- a/cmonitor/client/tunnel/ITunnelConnection.cs +++ b/cmonitor/client/tunnel/ITunnelConnection.cs @@ -50,8 +50,8 @@ namespace cmonitor.client.tunnel public bool Connected { get; } - public Task SendAsync(ReadOnlyMemory data, CancellationToken cancellationToken = default); - public void BeginReceive(ITunnelConnectionReceiveCallback callback, object userToken, bool byFrame = true); + public Task SendAsync(ReadOnlyMemory data); + public void BeginReceive(ITunnelConnectionReceiveCallback callback, object userToken, bool framing = true); public void Close(); @@ -60,6 +60,13 @@ namespace cmonitor.client.tunnel public sealed class TunnelConnectionTcp : ITunnelConnection { + public TunnelConnectionTcp() + { + sendCancellationTokenSource = new CancellationTokenSource(); + senderPipe = new Pipe(new PipeOptions(pauseWriterThreshold: 1 * 1024 * 1024, resumeWriterThreshold: 128 * 1024)); + _ = ProcessSender(); + } + public string RemoteMachineName { get; init; } public string TransactionId { get; init; } @@ -212,9 +219,59 @@ namespace cmonitor.client.tunnel return buffer.Start; } - public async Task SendAsync(ReadOnlyMemory data, CancellationToken cancellationToken = default) + + private CancellationTokenSource sendCancellationTokenSource; + private Pipe senderPipe; + private async Task ProcessSender() { - await Socket.WriteAsync(data, cancellationToken); + var reader = senderPipe.Reader; + try + { + while (sendCancellationTokenSource.IsCancellationRequested == false) + { + ReadResult readResult = await reader.ReadAsync().ConfigureAwait(false); + ReadOnlySequence buffer = readResult.Buffer; + if (buffer.Length == 0) + { + break; + } + + SequencePosition position = buffer.Start; + while (buffer.TryGet(ref position, out ReadOnlyMemory memory)) + { + await Socket.WriteAsync(memory); + } + + reader.AdvanceTo(buffer.End); + } + } + catch (Exception ex) + { + if (Logger.Instance.LoggerLevel <= LoggerTypes.DEBUG) + { + Logger.Instance.Error(ex); + } + } + finally + { + Close(); + await reader.CompleteAsync(); + } + } + public async Task SendAsync(ReadOnlyMemory data) + { + try + { + await senderPipe.Writer.WriteAsync(data, sendCancellationTokenSource.Token); + await senderPipe.Writer.FlushAsync(sendCancellationTokenSource.Token); + } + catch (Exception ex) + { + if (Logger.Instance.LoggerLevel <= LoggerTypes.DEBUG) + { + Logger.Instance.Error(ex); + } + } } private void Cancel() @@ -231,6 +288,16 @@ namespace cmonitor.client.tunnel Cancel(); Socket?.Close(); Socket?.Dispose(); + + try + { + sendCancellationTokenSource?.Cancel(); + senderPipe.Writer.Complete(); + senderPipe.Reader.Complete(); + } + catch (Exception) + { + } } public override string ToString() @@ -239,5 +306,4 @@ namespace cmonitor.client.tunnel } } - } diff --git a/cmonitor/plugins/tunnel/transport/TransportTcpNutssb.cs b/cmonitor/plugins/tunnel/transport/TransportTcpNutssb.cs index 220507c7..976d7ed0 100644 --- a/cmonitor/plugins/tunnel/transport/TransportTcpNutssb.cs +++ b/cmonitor/plugins/tunnel/transport/TransportTcpNutssb.cs @@ -52,6 +52,7 @@ namespace cmonitor.plugins.tunnel.transport { return null; } + await Task.Delay(500); ITunnelConnection connection = await ConnectForward(tunnelTransportInfo); if (connection != null) { @@ -263,7 +264,7 @@ namespace cmonitor.plugins.tunnel.transport try { - ITunnelConnection connection = await tcs.Task.WaitAsync(TimeSpan.FromMilliseconds(3000)); + ITunnelConnection connection = await tcs.Task.WaitAsync(TimeSpan.FromMilliseconds(5000)); return connection; } catch (Exception) diff --git a/cmonitor/server/IConnection.cs b/cmonitor/server/IConnection.cs index 25a945a1..2c50abcd 100644 --- a/cmonitor/server/IConnection.cs +++ b/cmonitor/server/IConnection.cs @@ -5,6 +5,7 @@ using System.IO.Pipelines; using System.Net; using System.Net.Security; using System.Net.Sockets; +using System.Reflection.PortableExecutable; namespace cmonitor.server { @@ -172,6 +173,10 @@ namespace cmonitor.server local = new IPEndPoint(new IPAddress(local.Address.GetAddressBytes()[^4..]), local.Port); } LocalAddress = local; + + sendCancellationTokenSource = new CancellationTokenSource(); + senderPipe = new Pipe(new PipeOptions(pauseWriterThreshold: 1 * 1024 * 1024, resumeWriterThreshold: 128 * 1024)); + _ = ProcessSender(); } public override bool Connected => TcpSourceSocket != null && TcpSourceSocket.CanWrite; @@ -308,13 +313,53 @@ namespace cmonitor.server return buffer.Start; } + + private CancellationTokenSource sendCancellationTokenSource; + private Pipe senderPipe; + private async Task ProcessSender() + { + var reader = senderPipe.Reader; + try + { + while (sendCancellationTokenSource.IsCancellationRequested == false) + { + ReadResult readResult = await reader.ReadAsync().ConfigureAwait(false); + ReadOnlySequence buffer = readResult.Buffer; + if (buffer.Length == 0) + { + break; + } + + SequencePosition position = buffer.Start; + while (buffer.TryGet(ref position, out ReadOnlyMemory memory)) + { + await TcpSourceSocket.WriteAsync(memory); + } + + reader.AdvanceTo(buffer.End); + } + } + catch (Exception ex) + { + if (Logger.Instance.LoggerLevel <= LoggerTypes.DEBUG) + { + Logger.Instance.Error(ex); + } + } + finally + { + Disponse(); + await reader.CompleteAsync(); + } + } public override async Task SendAsync(ReadOnlyMemory data) { if (Connected) { try { - await TcpSourceSocket.WriteAsync(data); + await senderPipe.Writer.WriteAsync(data, sendCancellationTokenSource.Token); + await senderPipe.Writer.FlushAsync(sendCancellationTokenSource.Token); return true; } catch (Exception ex) @@ -331,7 +376,6 @@ namespace cmonitor.server return await SendAsync(data.AsMemory(0, length)); } - public override void Cancel() { callback = null; @@ -342,13 +386,13 @@ namespace cmonitor.server bufferCache.Clear(true); } - public override void Disponse() { Cancel(); base.Disponse(); try { + sendCancellationTokenSource?.Cancel(); if (TcpSourceSocket != null) { TcpSourceSocket.ShutdownAsync(); @@ -357,6 +401,8 @@ namespace cmonitor.server TcpTargetSocket?.ShutdownAsync(); TcpTargetSocket?.Dispose(); } + senderPipe.Writer.Complete(); + senderPipe.Reader.Complete(); } catch (Exception) {