This commit is contained in:
snltty
2024-05-26 17:20:04 +08:00
parent 2c18d1d347
commit baccb07b09
4 changed files with 130 additions and 11 deletions

View File

@@ -176,7 +176,6 @@ export default {
_getSignList();
}
const handleForwardRefresh = ()=>{
refreshForward();
ElMessage.success('刷新成功');
}
@@ -227,6 +226,8 @@ export default {
}
const handlePageRefresh = ()=>{
handlePageChange();
refreshTunnel();
refreshTuntap();
ElMessage.success('刷新成功');
}
const handlePageChange = () => {
@@ -239,7 +240,12 @@ export default {
}
onMounted(() => {
subWebsocketState((state) => { if (state) _getSignList(); });
subWebsocketState((state) => {
if (state){
handlePageChange();
_getSignList();
}
});
_getSignList();
_getSignList1();
_getTuntapInfo();

View File

@@ -50,8 +50,8 @@ namespace cmonitor.client.tunnel
public bool Connected { get; }
public Task SendAsync(ReadOnlyMemory<byte> data, CancellationToken cancellationToken = default);
public void BeginReceive(ITunnelConnectionReceiveCallback callback, object userToken, bool byFrame = true);
public Task SendAsync(ReadOnlyMemory<byte> data);
public void BeginReceive(ITunnelConnectionReceiveCallback callback, object userToken, bool framing = true);
public void Close();
@@ -60,6 +60,13 @@ namespace cmonitor.client.tunnel
public sealed class TunnelConnectionTcp : ITunnelConnection
{
public TunnelConnectionTcp()
{
sendCancellationTokenSource = new CancellationTokenSource();
senderPipe = new Pipe(new PipeOptions(pauseWriterThreshold: 1 * 1024 * 1024, resumeWriterThreshold: 128 * 1024));
_ = ProcessSender();
}
public string RemoteMachineName { get; init; }
public string TransactionId { get; init; }
@@ -212,9 +219,59 @@ namespace cmonitor.client.tunnel
return buffer.Start;
}
public async Task SendAsync(ReadOnlyMemory<byte> data, CancellationToken cancellationToken = default)
private CancellationTokenSource sendCancellationTokenSource;
private Pipe senderPipe;
private async Task ProcessSender()
{
await Socket.WriteAsync(data, cancellationToken);
var reader = senderPipe.Reader;
try
{
while (sendCancellationTokenSource.IsCancellationRequested == false)
{
ReadResult readResult = await reader.ReadAsync().ConfigureAwait(false);
ReadOnlySequence<byte> buffer = readResult.Buffer;
if (buffer.Length == 0)
{
break;
}
SequencePosition position = buffer.Start;
while (buffer.TryGet(ref position, out ReadOnlyMemory<byte> memory))
{
await Socket.WriteAsync(memory);
}
reader.AdvanceTo(buffer.End);
}
}
catch (Exception ex)
{
if (Logger.Instance.LoggerLevel <= LoggerTypes.DEBUG)
{
Logger.Instance.Error(ex);
}
}
finally
{
Close();
await reader.CompleteAsync();
}
}
public async Task SendAsync(ReadOnlyMemory<byte> data)
{
try
{
await senderPipe.Writer.WriteAsync(data, sendCancellationTokenSource.Token);
await senderPipe.Writer.FlushAsync(sendCancellationTokenSource.Token);
}
catch (Exception ex)
{
if (Logger.Instance.LoggerLevel <= LoggerTypes.DEBUG)
{
Logger.Instance.Error(ex);
}
}
}
private void Cancel()
@@ -231,6 +288,16 @@ namespace cmonitor.client.tunnel
Cancel();
Socket?.Close();
Socket?.Dispose();
try
{
sendCancellationTokenSource?.Cancel();
senderPipe.Writer.Complete();
senderPipe.Reader.Complete();
}
catch (Exception)
{
}
}
public override string ToString()
@@ -239,5 +306,4 @@ namespace cmonitor.client.tunnel
}
}
}

View File

@@ -52,6 +52,7 @@ namespace cmonitor.plugins.tunnel.transport
{
return null;
}
await Task.Delay(500);
ITunnelConnection connection = await ConnectForward(tunnelTransportInfo);
if (connection != null)
{
@@ -263,7 +264,7 @@ namespace cmonitor.plugins.tunnel.transport
try
{
ITunnelConnection connection = await tcs.Task.WaitAsync(TimeSpan.FromMilliseconds(3000));
ITunnelConnection connection = await tcs.Task.WaitAsync(TimeSpan.FromMilliseconds(5000));
return connection;
}
catch (Exception)

View File

@@ -5,6 +5,7 @@ using System.IO.Pipelines;
using System.Net;
using System.Net.Security;
using System.Net.Sockets;
using System.Reflection.PortableExecutable;
namespace cmonitor.server
{
@@ -172,6 +173,10 @@ namespace cmonitor.server
local = new IPEndPoint(new IPAddress(local.Address.GetAddressBytes()[^4..]), local.Port);
}
LocalAddress = local;
sendCancellationTokenSource = new CancellationTokenSource();
senderPipe = new Pipe(new PipeOptions(pauseWriterThreshold: 1 * 1024 * 1024, resumeWriterThreshold: 128 * 1024));
_ = ProcessSender();
}
public override bool Connected => TcpSourceSocket != null && TcpSourceSocket.CanWrite;
@@ -308,13 +313,53 @@ namespace cmonitor.server
return buffer.Start;
}
private CancellationTokenSource sendCancellationTokenSource;
private Pipe senderPipe;
private async Task ProcessSender()
{
var reader = senderPipe.Reader;
try
{
while (sendCancellationTokenSource.IsCancellationRequested == false)
{
ReadResult readResult = await reader.ReadAsync().ConfigureAwait(false);
ReadOnlySequence<byte> buffer = readResult.Buffer;
if (buffer.Length == 0)
{
break;
}
SequencePosition position = buffer.Start;
while (buffer.TryGet(ref position, out ReadOnlyMemory<byte> memory))
{
await TcpSourceSocket.WriteAsync(memory);
}
reader.AdvanceTo(buffer.End);
}
}
catch (Exception ex)
{
if (Logger.Instance.LoggerLevel <= LoggerTypes.DEBUG)
{
Logger.Instance.Error(ex);
}
}
finally
{
Disponse();
await reader.CompleteAsync();
}
}
public override async Task<bool> SendAsync(ReadOnlyMemory<byte> data)
{
if (Connected)
{
try
{
await TcpSourceSocket.WriteAsync(data);
await senderPipe.Writer.WriteAsync(data, sendCancellationTokenSource.Token);
await senderPipe.Writer.FlushAsync(sendCancellationTokenSource.Token);
return true;
}
catch (Exception ex)
@@ -331,7 +376,6 @@ namespace cmonitor.server
return await SendAsync(data.AsMemory(0, length));
}
public override void Cancel()
{
callback = null;
@@ -342,13 +386,13 @@ namespace cmonitor.server
bufferCache.Clear(true);
}
public override void Disponse()
{
Cancel();
base.Disponse();
try
{
sendCancellationTokenSource?.Cancel();
if (TcpSourceSocket != null)
{
TcpSourceSocket.ShutdownAsync();
@@ -357,6 +401,8 @@ namespace cmonitor.server
TcpTargetSocket?.ShutdownAsync();
TcpTargetSocket?.Dispose();
}
senderPipe.Writer.Complete();
senderPipe.Reader.Complete();
}
catch (Exception)
{