diff --git a/.github/workflows/dotnet.yml b/.github/workflows/dotnet.yml index 8a62e609..01763490 100644 --- a/.github/workflows/dotnet.yml +++ b/.github/workflows/dotnet.yml @@ -37,7 +37,7 @@ jobs: release_name: v1.4.4.${{ steps.date.outputs.today }} draft: false prerelease: false - body: 1. 增加流量统计(暂时显示服务端流量) + body: "1. 总览,和详细流量统计,一眼知道服务器流量花在哪里\r\n2. 优化信标。减少流量,没有操作时尽量不产生流量" - name: upload-win-x86-oss id: upload-win-x86-oss uses: tvrcgo/oss-action@v0.1.1 diff --git a/linker.libs/LastTicksManager.cs b/linker.libs/LastTicksManager.cs new file mode 100644 index 00000000..6b6d2175 --- /dev/null +++ b/linker.libs/LastTicksManager.cs @@ -0,0 +1,45 @@ +using System; + +namespace linker.libs +{ + public sealed class LastTicksManager + { + private long ticks = Environment.TickCount64; + + public void Update() + { + ticks = Environment.TickCount64; + } + public bool Less(long ms) + { + return Environment.TickCount64 - ticks <= ms; + } + public bool Greater(long ms) + { + return Environment.TickCount64 - ticks > ms; + } + public bool Equal(long ms) + { + return ticks == ms; + } + public bool NotEqual(long ms) + { + return ticks != ms; + } + + public long Diff() + { + return Environment.TickCount64 - ticks; + } + public bool Timeout(long ms) + { + return ticks == 0 || Environment.TickCount64 - ticks > ms; + } + public void Clear() + { + ticks = 0; + } + + } + +} diff --git a/linker.tunnel/connection/ITunnelConnection.cs b/linker.tunnel/connection/ITunnelConnection.cs index 5727da87..43beada9 100644 --- a/linker.tunnel/connection/ITunnelConnection.cs +++ b/linker.tunnel/connection/ITunnelConnection.cs @@ -1,4 +1,5 @@ -using System.Net; +using linker.libs; +using System.Net; namespace linker.tunnel.connection { @@ -139,7 +140,7 @@ namespace linker.tunnel.connection /// /// 最后通信时间 /// - public long LastTicks { get; } + public LastTicksManager LastTicks { get; } /// /// 发送ping diff --git a/linker.tunnel/connection/TunnelConnectionMsQuic.cs b/linker.tunnel/connection/TunnelConnectionMsQuic.cs index 08c0d1c1..62a9134d 100644 --- a/linker.tunnel/connection/TunnelConnectionMsQuic.cs +++ b/linker.tunnel/connection/TunnelConnectionMsQuic.cs @@ -32,12 +32,12 @@ namespace linker.tunnel.connection public byte BufferSize { get; init; } = 3; - public bool Connected => Stream != null && Stream.CanWrite && LastTicks > 0; + public bool Connected => Stream != null && Stream.CanWrite && LastTicks.NotEqual(0); public int Delay { get; private set; } public long SendBytes { get; private set; } public long ReceiveBytes { get; private set; } - public long LastTicks { get; private set; } = Environment.TickCount64; + public LastTicksManager LastTicks { get; private set; } = new LastTicksManager(); [JsonIgnore] public QuicStream Stream { get; init; } @@ -56,7 +56,7 @@ namespace linker.tunnel.connection private bool framing; private ReceiveDataBuffer bufferCache = new ReceiveDataBuffer(); - private long pingStart = Environment.TickCount64; + private LastTicksManager pingTicks = new(); private byte[] pingBytes = Encoding.UTF8.GetBytes($"{Helper.GlobalString}.tcp.ping"); private byte[] pongBytes = Encoding.UTF8.GetBytes($"{Helper.GlobalString}.tcp.pong"); private bool pong = true; @@ -148,7 +148,7 @@ namespace linker.tunnel.connection private async Task CallbackPacket(Memory packet) { ReceiveBytes += packet.Length; - LastTicks = Environment.TickCount64; + LastTicks.Update(); if (packet.Length == pingBytes.Length && (packet.Span.SequenceEqual(pingBytes) || packet.Span.SequenceEqual(pongBytes))) { if (packet.Span.SequenceEqual(pingBytes)) @@ -157,7 +157,7 @@ namespace linker.tunnel.connection } else if (packet.Span.SequenceEqual(pongBytes)) { - Delay = (int)(Environment.TickCount64 - pingStart); + Delay = (int)pingTicks.Diff(); pong = true; } } @@ -179,9 +179,9 @@ namespace linker.tunnel.connection { while (cancellationTokenSource.IsCancellationRequested == false) { - if (Environment.TickCount64 - LastTicks > 3000) + if (LastTicks.Greater(3000)) { - pingStart = Environment.TickCount64; + pingTicks.Update(); await SendPingPong(pingBytes).ConfigureAwait(false); } await Task.Delay(3000).ConfigureAwait(false); @@ -223,7 +223,7 @@ namespace linker.tunnel.connection { if (pong == false) return; pong = false; - pingStart = Environment.TickCount64; + pingTicks.Update(); await SendPingPong(pingBytes).ConfigureAwait(false); } private SemaphoreSlim semaphoreSlim = new SemaphoreSlim(1); @@ -254,7 +254,7 @@ namespace linker.tunnel.connection public void Dispose() { - LastTicks = 0; + LastTicks.Clear(); if (LoggerHelper.Instance.LoggerLevel <= LoggerTypes.DEBUG) LoggerHelper.Instance.Error($"tunnel connection writer offline {ToString()}"); diff --git a/linker.tunnel/connection/TunnelConnectionTcp.cs b/linker.tunnel/connection/TunnelConnectionTcp.cs index 08e5db4b..77d3cd82 100644 --- a/linker.tunnel/connection/TunnelConnectionTcp.cs +++ b/linker.tunnel/connection/TunnelConnectionTcp.cs @@ -27,12 +27,12 @@ namespace linker.tunnel.connection public IPEndPoint IPEndPoint { get; init; } public bool SSL { get; init; } public byte BufferSize { get; init; } = 3; - public bool Connected => Socket != null && LastTicks > 0 && Environment.TickCount64 - LastTicks < 15000; + public bool Connected => Socket != null && LastTicks.Timeout(15000) == false; public int Delay { get; private set; } public long SendBytes { get; private set; } public long ReceiveBytes { get; private set; } - public long LastTicks { get; private set; } = Environment.TickCount64; + public LastTicksManager LastTicks { get; private set; } = new LastTicksManager(); [JsonIgnore] public SslStream Stream { get; init; } @@ -47,7 +47,7 @@ namespace linker.tunnel.connection private bool framing; private ReceiveDataBuffer bufferCache = new ReceiveDataBuffer(); - private long pingStart = Environment.TickCount64; + private LastTicksManager pingTicks = new LastTicksManager(); private byte[] pingBytes = Encoding.UTF8.GetBytes($"{Helper.GlobalString}.tcp.ping"); private byte[] pongBytes = Encoding.UTF8.GetBytes($"{Helper.GlobalString}.tcp.pong"); private bool pong = true; @@ -157,7 +157,7 @@ namespace linker.tunnel.connection private async Task CallbackPacket(Memory packet) { ReceiveBytes += packet.Length; - LastTicks = Environment.TickCount64; + LastTicks.Update(); if (packet.Length == pingBytes.Length) { if (packet.Span.SequenceEqual(pingBytes)) @@ -166,7 +166,7 @@ namespace linker.tunnel.connection } else if (packet.Span.SequenceEqual(pongBytes)) { - Delay = (int)(Environment.TickCount64 - pingStart); + Delay = (int)pingTicks.Diff(); pong = true; } return; @@ -192,9 +192,9 @@ namespace linker.tunnel.connection break; } - if (Environment.TickCount64 - LastTicks > 3000) + if (LastTicks.Greater(3000)) { - pingStart = Environment.TickCount64; + pingTicks.Update(); await SendPingPong(pingBytes).ConfigureAwait(false); } @@ -246,7 +246,7 @@ namespace linker.tunnel.connection { if (pong == false) return; pong = false; - pingStart = Environment.TickCount64; + pingTicks.Update(); await SendPingPong(pingBytes).ConfigureAwait(false); } public async Task SendAsync(ReadOnlyMemory data) @@ -286,7 +286,7 @@ namespace linker.tunnel.connection public void Dispose() { - LastTicks = 0; + LastTicks.Clear(); if (LoggerHelper.Instance.LoggerLevel <= LoggerTypes.DEBUG) LoggerHelper.Instance.Error($"tunnel connection {this.GetHashCode()} writer offline {ToString()}"); diff --git a/linker.tunnel/connection/TunnelConnectionUdp.cs b/linker.tunnel/connection/TunnelConnectionUdp.cs index a9de34b9..df13518b 100644 --- a/linker.tunnel/connection/TunnelConnectionUdp.cs +++ b/linker.tunnel/connection/TunnelConnectionUdp.cs @@ -27,11 +27,11 @@ namespace linker.tunnel.connection public bool SSL { get; init; } public byte BufferSize { get; init; } = 3; - public bool Connected => UdpClient != null && LastTicks > 0 && Environment.TickCount64 - LastTicks < 15000; + public bool Connected => UdpClient != null && LastTicks.Timeout(15000) == false; public int Delay { get; private set; } public long SendBytes { get; private set; } public long ReceiveBytes { get; private set; } - public long LastTicks { get; private set; } = Environment.TickCount64; + public LastTicksManager LastTicks { get; private set; } = new LastTicksManager(); public bool Receive { get; init; } @@ -56,7 +56,7 @@ namespace linker.tunnel.connection private CancellationTokenSource cancellationTokenSource; private object userToken; - private long pingStart = Environment.TickCount64; + private LastTicksManager pingTicks = new LastTicksManager(); private byte[] pingBytes = Encoding.UTF8.GetBytes($"{Helper.GlobalString}.udp.ping"); private byte[] pongBytes = Encoding.UTF8.GetBytes($"{Helper.GlobalString}.udp.pong"); private byte[] finBytes = Encoding.UTF8.GetBytes($"{Helper.GlobalString}.udp.fing"); @@ -129,7 +129,7 @@ namespace linker.tunnel.connection private async Task CallbackPacket(Memory packet) { ReceiveBytes += packet.Length; - LastTicks = Environment.TickCount64; + LastTicks.Update(); Memory memory = packet.Slice(4); if (memory.Length == pingBytes.Length && memory.Span.Slice(0, pingBytes.Length - 4).SequenceEqual(pingBytes.AsSpan(0, pingBytes.Length - 4))) @@ -140,7 +140,7 @@ namespace linker.tunnel.connection } else if (memory.Span.SequenceEqual(pongBytes)) { - Delay = (int)(Environment.TickCount64 - pingStart); + Delay = (int)pingTicks.Diff(); pong = true; } else if (memory.Span.SequenceEqual(finBytes)) @@ -178,9 +178,9 @@ namespace linker.tunnel.connection break; } - if (Environment.TickCount64 - LastTicks > 3000) + if (LastTicks.Greater(3000)) { - pingStart = Environment.TickCount64; + pingTicks.Update(); await SendPingPong(pingBytes).ConfigureAwait(false); } await Task.Delay(3000).ConfigureAwait(false); @@ -218,7 +218,7 @@ namespace linker.tunnel.connection { if (pong == false) return; pong = false; - pingStart = Environment.TickCount64; + pingTicks.Update(); await SendPingPong(pingBytes).ConfigureAwait(false); } @@ -258,7 +258,7 @@ namespace linker.tunnel.connection SendPingPong(finBytes).ContinueWith((result) => { - LastTicks = 0; + LastTicks.Clear(); if (Receive == true) UdpClient?.SafeClose(); uUdpClient = null; diff --git a/linker.tunnel/transport/TransportUdpPortMap.cs b/linker.tunnel/transport/TransportUdpPortMap.cs index 543424e7..0e97e243 100644 --- a/linker.tunnel/transport/TransportUdpPortMap.cs +++ b/linker.tunnel/transport/TransportUdpPortMap.cs @@ -351,8 +351,7 @@ namespace linker.tunnel.transport { TimerHelper.SetInterval(() => { - long ticks = Environment.TickCount64; - var keys = connectionsDic.Where(c => (c.Value.Connection == null && ticks - c.Value.LastTicks > 5000) || (c.Value.Connection != null && c.Value.Connection.Connected == false)).Select(c => c.Key).ToList(); + var keys = connectionsDic.Where(c => (c.Value.Connection == null && c.Value.LastTicks.Greater(5000)) || (c.Value.Connection != null && c.Value.Connection.Connected == false)).Select(c => c.Key).ToList(); foreach (var item in keys) { connectionsDic.TryRemove(item, out _); @@ -370,7 +369,7 @@ namespace linker.tunnel.transport public sealed class ConnectionCacheInfo { - public long LastTicks { get; set; } = Environment.TickCount64; + public LastTicksManager LastTicks { get; set; } = new LastTicksManager(); public TunnelConnectionUdp Connection { get; set; } } diff --git a/linker.web/src/apis/flow.js b/linker.web/src/apis/flow.js index b9bcf3b1..e1a1156b 100644 --- a/linker.web/src/apis/flow.js +++ b/linker.web/src/apis/flow.js @@ -2,4 +2,10 @@ import { sendWebsocketMsg } from './request' export const getFlows = () => { return sendWebsocketMsg('flowClient/GetFlows'); +} +export const getMessengerFlows = () => { + return sendWebsocketMsg('flowClient/GetMessengerFlows'); +} +export const getSForwardFlows = (data) => { + return sendWebsocketMsg('flowClient/GetSForwardFlows', data); } \ No newline at end of file diff --git a/linker.web/src/apis/tuntap.js b/linker.web/src/apis/tuntap.js index d61237f7..0b027393 100644 --- a/linker.web/src/apis/tuntap.js +++ b/linker.web/src/apis/tuntap.js @@ -22,4 +22,8 @@ export const updateTuntap = (name) => { } export const refreshTuntap = () => { return sendWebsocketMsg('tuntapclient/refresh'); -} \ No newline at end of file +} +export const subscribePing = () => { + return sendWebsocketMsg('tuntapclient/SubscribePing'); +} + diff --git a/linker.web/src/apis/updater.js b/linker.web/src/apis/updater.js index de78af3c..195a5325 100644 --- a/linker.web/src/apis/updater.js +++ b/linker.web/src/apis/updater.js @@ -30,4 +30,7 @@ export const confirmServer = (version) => { } export const exitServer = () => { return sendWebsocketMsg('updaterclient/exitserver'); +} +export const subscribeUpdater = () => { + return sendWebsocketMsg('updaterclient/Subscribe'); } \ No newline at end of file diff --git a/linker.web/src/views/full/devices/List.vue b/linker.web/src/views/full/devices/List.vue index 720f83ec..943c1b7c 100644 --- a/linker.web/src/views/full/devices/List.vue +++ b/linker.web/src/views/full/devices/List.vue @@ -81,7 +81,7 @@ export default { handleTunnelConnections,clearConnectionsTimeout } = provideConnections(); - const {_getUpdater,clearUpdaterTimeout} = provideUpdater(); + const {_getUpdater,_subscribeUpdater,clearUpdaterTimeout} = provideUpdater(); const {_getAccessInfo,clearAccessTimeout} = provideAccess(); @@ -158,6 +158,7 @@ export default { _getSForwardInfo(); _getUpdater(); + _subscribeUpdater(); _getAccessInfo(); diff --git a/linker.web/src/views/full/devices/TuntapEdit.vue b/linker.web/src/views/full/devices/TuntapEdit.vue index 38730269..f9100ddf 100644 --- a/linker.web/src/views/full/devices/TuntapEdit.vue +++ b/linker.web/src/views/full/devices/TuntapEdit.vue @@ -9,17 +9,14 @@ / - + - + + @@ -104,6 +101,7 @@ export default { ShowDelay: tuntap.value.current.ShowDelay, AutoConnect: tuntap.value.current.AutoConnect, Upgrade: tuntap.value.current.Upgrade, + Multicast: tuntap.value.current.Multicast, Forwards:tuntap.value.current.Forwards.length == 0 ? [ {ListenAddr:'0.0.0.0',ListenPort:0,ConnectAddr:'0.0.0.0',ConnectPort:0} @@ -165,6 +163,7 @@ export default { json.ShowDelay = state.ruleForm.ShowDelay; json.AutoConnect = state.ruleForm.AutoConnect; json.Upgrade = state.ruleForm.Upgrade; + json.Multicast = state.ruleForm.Multicast; json.Forwards = state.ruleForm.Forwards; json.Forwards.forEach(c=>{ c.ListenPort=+c.ListenPort; diff --git a/linker.web/src/views/full/devices/tuntap.js b/linker.web/src/views/full/devices/tuntap.js index 9967c792..1c7f03c4 100644 --- a/linker.web/src/views/full/devices/tuntap.js +++ b/linker.web/src/views/full/devices/tuntap.js @@ -1,7 +1,7 @@ import { injectGlobalData } from "@/provide"; import { ElMessage } from "element-plus"; import { inject, provide, ref } from "vue" -import { getTuntapInfo, refreshTuntap } from "@/apis/tuntap"; +import { getTuntapInfo, refreshTuntap, subscribePing } from "@/apis/tuntap"; const tuntapSymbol = Symbol(); export const provideTuntap = () => { @@ -57,6 +57,7 @@ export const provideTuntap = () => { tuntap.value.list = res.List; } tuntap.value.timer = setTimeout(_getTuntapInfo, 1100); + subscribePing(); }).catch((e) => { tuntap.value.timer = setTimeout(_getTuntapInfo, 1100); }); diff --git a/linker.web/src/views/full/devices/updater.js b/linker.web/src/views/full/devices/updater.js index 779bb9c3..e1478d9c 100644 --- a/linker.web/src/views/full/devices/updater.js +++ b/linker.web/src/views/full/devices/updater.js @@ -1,4 +1,4 @@ -import { getUpdater } from "@/apis/updater"; +import { getUpdater, subscribeUpdater } from "@/apis/updater"; import { injectGlobalData } from "@/provide"; import { inject, provide, ref } from "vue"; @@ -9,7 +9,9 @@ export const provideUpdater = () => { timer: 0, list: {}, hashcode: 0, - current: { Version: '', Msg: [], DateTime: '', Status: 0, Length: 0, Current: 0 } + current: { Version: '', Msg: [], DateTime: '', Status: 0, Length: 0, Current: 0 }, + + subscribeTimer: 0 }); provide(updaterSymbol, updater); const _getUpdater = () => { @@ -36,15 +38,23 @@ export const provideUpdater = () => { updater.value.timer = setTimeout(_getUpdater, 800); }); } + const _subscribeUpdater = () => { + subscribeUpdater().then(() => { + updater.value.subscribeTimer = setTimeout(_subscribeUpdater, 5000); + }).catch(() => { + updater.value.subscribeTimer = setTimeout(_subscribeUpdater, 5000); + }); + } const clearUpdaterTimeout = () => { clearTimeout(updater.value.timer); + clearTimeout(updater.value.subscribeTimer); } return { - updater, _getUpdater, clearUpdaterTimeout + updater, _getUpdater, _subscribeUpdater, clearUpdaterTimeout } } export const useUpdater = () => { diff --git a/linker.web/src/views/full/status/ServerFlow.vue b/linker.web/src/views/full/status/ServerFlow.vue index c0744e9e..1ae6380b 100644 --- a/linker.web/src/views/full/status/ServerFlow.vue +++ b/linker.web/src/views/full/status/ServerFlow.vue @@ -6,7 +6,7 @@
- + + + \ No newline at end of file diff --git a/linker.web/src/views/full/status/ServerFlowSForward.vue b/linker.web/src/views/full/status/ServerFlowSForward.vue new file mode 100644 index 00000000..99533206 --- /dev/null +++ b/linker.web/src/views/full/status/ServerFlowSForward.vue @@ -0,0 +1,158 @@ + + + + + \ No newline at end of file diff --git a/linker.web/src/views/net/List.vue b/linker.web/src/views/net/List.vue index 886fa7b3..08513569 100644 --- a/linker.web/src/views/net/List.vue +++ b/linker.web/src/views/net/List.vue @@ -52,7 +52,7 @@ export default { const {devices, machineId, _getSignList, _getSignList1,handleDeviceEdit, handlePageChange, handlePageSizeChange, handleDel,clearDevicesTimeout} = provideDevices(); const {tuntap,_getTuntapInfo,handleTuntapRefresh,clearTuntapTimeout,handleTuntapEdit,sortTuntapIP} = provideTuntap(); - const {_getUpdater,clearUpdaterTimeout} = provideUpdater(); + const {_getUpdater,_subscribeUpdater,clearUpdaterTimeout} = provideUpdater(); onMounted(() => { handlePageChange(); @@ -63,6 +63,7 @@ export default { _getTuntapInfo(); _getUpdater(); + _subscribeUpdater(); }); onUnmounted(() => { clearDevicesTimeout(); diff --git a/linker/linker.csproj b/linker/linker.csproj index d4dea817..2136c1c4 100644 --- a/linker/linker.csproj +++ b/linker/linker.csproj @@ -19,7 +19,8 @@ linker snltty snltty - 1. 增加流量统计(暂时显示服务端流量) + 1. 总览,和详细流量统计,一眼知道服务器流量花在哪里 +2. 优化信标。减少流量,没有操作时尽量不产生流量 snltty https://github.com/snltty/linker https://github.com/snltty/linker diff --git a/linker/plugins/flow/FlowClientApiController.cs b/linker/plugins/flow/FlowClientApiController.cs index 1c023c26..e9385537 100644 --- a/linker/plugins/flow/FlowClientApiController.cs +++ b/linker/plugins/flow/FlowClientApiController.cs @@ -6,6 +6,8 @@ using linker.plugins.client; using linker.plugins.capi; using linker.plugins.messenger; using linker.plugins.flow.messenger; +using linker.libs.extends; +using linker.plugins.sforward.proxy; namespace linker.plugins.flow { @@ -37,7 +39,34 @@ namespace linker.plugins.flow } return new FlowInfo(); } + public async Task> GetMessengerFlows(ApiControllerParamsInfo param) + { + MessageResponeInfo resp = await messengerSender.SendReply(new MessageRequestWrap + { + Connection = clientSignInState.Connection, + MessengerId = (ushort)FlowMessengerIds.Messenger, + }); + if (resp.Code == MessageResponeCodes.OK && resp.Data.Length > 0) + { + return MemoryPackSerializer.Deserialize>(resp.Data.Span); + } + return new Dictionary(); + } + public async Task GetSForwardFlows(ApiControllerParamsInfo param) + { + MessageResponeInfo resp = await messengerSender.SendReply(new MessageRequestWrap + { + Connection = clientSignInState.Connection, + MessengerId = (ushort)FlowMessengerIds.SForward, + Payload = MemoryPackSerializer.Serialize(param.Content.DeJson()) + }); + if (resp.Code == MessageResponeCodes.OK && resp.Data.Length > 0) + { + return MemoryPackSerializer.Deserialize(resp.Data.Span); + } + return new SForwardFlowResponseInfo(); + } } } diff --git a/linker/plugins/flow/FlowTransfer.cs b/linker/plugins/flow/FlowTransfer.cs index 1292e793..d2619386 100644 --- a/linker/plugins/flow/FlowTransfer.cs +++ b/linker/plugins/flow/FlowTransfer.cs @@ -7,6 +7,7 @@ namespace linker.plugins.flow { private List flows = new List(); + private readonly ServiceProvider serviceProvider; public FlowTransfer(ServiceProvider serviceProvider) { diff --git a/linker/plugins/flow/IFlow.cs b/linker/plugins/flow/IFlow.cs index b0e667df..ccd0a32b 100644 --- a/linker/plugins/flow/IFlow.cs +++ b/linker/plugins/flow/IFlow.cs @@ -1,4 +1,5 @@ -using MemoryPack; +using linker.libs; +using MemoryPack; using System.Text.Json.Serialization; namespace linker.plugins.flow @@ -11,7 +12,7 @@ namespace linker.plugins.flow } [MemoryPackable] - public sealed partial class FlowItemInfo + public partial class FlowItemInfo { public ulong ReceiveBytes { get; set; } public ulong SendtBytes { get; set; } @@ -23,11 +24,10 @@ namespace linker.plugins.flow [MemoryPackable] public sealed partial class FlowInfo { - public Dictionary Resolvers { get; set; } - public Dictionary Messangers { get; set; } - + public Dictionary Items { get; set; } public DateTime Start { get; set; } public DateTime Now { get; set; } } + } diff --git a/linker/plugins/flow/messenger/FlowMessenger.cs b/linker/plugins/flow/messenger/FlowMessenger.cs index edaba1d9..3f402fb5 100644 --- a/linker/plugins/flow/messenger/FlowMessenger.cs +++ b/linker/plugins/flow/messenger/FlowMessenger.cs @@ -1,4 +1,5 @@ using linker.plugins.messenger; +using linker.plugins.sforward.proxy; using MemoryPack; namespace linker.plugins.flow.messenger @@ -8,14 +9,16 @@ namespace linker.plugins.flow.messenger private readonly MessengerResolver messengerResolver; private readonly FlowTransfer flowTransfer; private readonly MessengerFlow messengerFlow; + private readonly SForwardFlow sForwardFlow; private DateTime start = DateTime.Now; - public FlowMessenger(MessengerResolver messengerResolver, FlowTransfer flowTransfer, MessengerFlow messengerFlow) + public FlowMessenger(MessengerResolver messengerResolver, FlowTransfer flowTransfer, MessengerFlow messengerFlow, SForwardFlow sForwardFlow) { this.messengerResolver = messengerResolver; this.flowTransfer = flowTransfer; this.messengerFlow = messengerFlow; + this.sForwardFlow = sForwardFlow; } [MessengerId((ushort)FlowMessengerIds.List)] @@ -23,14 +26,26 @@ namespace linker.plugins.flow.messenger { FlowInfo serverFlowInfo = new FlowInfo { - Messangers = messengerFlow.GetFlows(), - Resolvers = flowTransfer.GetFlows(), + Items = flowTransfer.GetFlows(), Start = start, Now = DateTime.Now, }; connection.Write(MemoryPackSerializer.Serialize(serverFlowInfo)); } + [MessengerId((ushort)FlowMessengerIds.Messenger)] + public void Messenger(IConnection connection) + { + connection.Write(MemoryPackSerializer.Serialize(messengerFlow.GetFlows())); + } + + [MessengerId((ushort)FlowMessengerIds.SForward)] + public void SForward(IConnection connection) + { + sForwardFlow.Update(); + SForwardFlowRequestInfo info = MemoryPackSerializer.Deserialize(connection.ReceiveRequestWrap.Payload.Span); + connection.Write(MemoryPackSerializer.Serialize(sForwardFlow.GetFlows(info))); + } } } diff --git a/linker/plugins/flow/messenger/FlowMessengerIds.cs b/linker/plugins/flow/messenger/FlowMessengerIds.cs index 54d07612..36ae15f1 100644 --- a/linker/plugins/flow/messenger/FlowMessengerIds.cs +++ b/linker/plugins/flow/messenger/FlowMessengerIds.cs @@ -5,6 +5,9 @@ Min = 2700, List = 2701, + Messenger = 2702, + Relay = 2703, + SForward = 2704, Max = 2799 } diff --git a/linker/plugins/forward/proxy/ForwardProxyUdp.cs b/linker/plugins/forward/proxy/ForwardProxyUdp.cs index 7aecb91a..5e447f35 100644 --- a/linker/plugins/forward/proxy/ForwardProxyUdp.cs +++ b/linker/plugins/forward/proxy/ForwardProxyUdp.cs @@ -149,7 +149,7 @@ namespace linker.plugins.forward.proxy { token.Connection = tunnelToken.Connection; await token.TargetSocket.SendToAsync(tunnelToken.Proxy.Data, target).ConfigureAwait(false); - token.Update(); + token.LastTicks.Update(); return; } @@ -221,7 +221,7 @@ namespace linker.plugins.forward.proxy { SocketReceiveFromResult result = await socket.ReceiveFromAsync(udpToken.Buffer, SocketFlags.None, target).ConfigureAwait(false); udpToken.Proxy.Data = udpToken.Buffer.AsMemory(0, result.ReceivedBytes); - udpToken.Update(); + udpToken.LastTicks.Update(); await SendToConnection(udpToken).ConfigureAwait(false); } } @@ -384,8 +384,8 @@ namespace linker.plugins.forward.proxy public byte[] Buffer { get; set; } - public long LastTime { get; set; } = Environment.TickCount64; - public bool Timeout => Environment.TickCount64 - LastTime > 60*60*1000; + public LastTicksManager LastTicks { get; set; } = new LastTicksManager(); + public bool Timeout => LastTicks.Timeout(60 * 60 * 1000); public void Clear() { TargetSocket?.SafeClose(); @@ -393,10 +393,6 @@ namespace linker.plugins.forward.proxy GC.Collect(); GC.SuppressFinalize(this); } - public void Update() - { - LastTime = Environment.TickCount64; - } } public sealed class ConnectIdUdpComparer : IEqualityComparer { diff --git a/linker/plugins/messenger/IConnection.cs b/linker/plugins/messenger/IConnection.cs index 5878c290..35884353 100644 --- a/linker/plugins/messenger/IConnection.cs +++ b/linker/plugins/messenger/IConnection.cs @@ -246,7 +246,7 @@ namespace linker.plugins.messenger } - public override bool Connected => SourceSocket != null && ticks > 0 && Environment.TickCount64 - ticks < 15000; + public override bool Connected => SourceSocket != null && lastTicks.Timeout(15000) == false; private IConnectionReceiveCallback callback; @@ -256,8 +256,8 @@ namespace linker.plugins.messenger private bool framing; private ReceiveDataBuffer bufferCache = new ReceiveDataBuffer(); - private long ticks = Environment.TickCount64; - private long pingStart = Environment.TickCount64; + private LastTicksManager lastTicks = new LastTicksManager(); + private LastTicksManager pingTicks =new LastTicksManager(); private static byte[] pingBytes = Encoding.UTF8.GetBytes($"{Helper.GlobalString}.tcp.ping"); private static byte[] pongBytes = Encoding.UTF8.GetBytes($"{Helper.GlobalString}.tcp.pong"); private bool pong = true; @@ -297,7 +297,7 @@ namespace linker.plugins.messenger break; } ReceiveBytes += length; - ticks = Environment.TickCount64; + lastTicks.Update(); await ReadPacket(buffer.AsMemory(0, length)).ConfigureAwait(false); } } @@ -310,7 +310,7 @@ namespace linker.plugins.messenger { LoggerHelper.Instance.Error(ex); } - + } finally { @@ -362,7 +362,7 @@ namespace linker.plugins.messenger } else if (packet.Span.SequenceEqual(pongBytes)) { - Delay = (int)(Environment.TickCount64 - pingStart); + Delay = (int)pingTicks.Diff(); pong = true; } } @@ -384,9 +384,9 @@ namespace linker.plugins.messenger { while (cancellationTokenSource.IsCancellationRequested == false) { - if (Environment.TickCount64 - ticks > 3000) + if (lastTicks.Greater(3000)) { - pingStart = Environment.TickCount64; + pingTicks .Update(); await SendPingPong(pingBytes).ConfigureAwait(false); } @@ -435,7 +435,7 @@ namespace linker.plugins.messenger { if (pong == false) return; pong = false; - pingStart = Environment.TickCount64; + pingTicks.Update(); await SendPingPong(pingBytes); } public override async Task SendAsync(ReadOnlyMemory data) @@ -448,7 +448,7 @@ namespace linker.plugins.messenger else await SourceSocket.SendAsync(data, cancellationTokenSourceWrite.Token).ConfigureAwait(false); SendBytes += data.Length; - ticks = Environment.TickCount64; + lastTicks.Update(); } catch (OperationCanceledException) { @@ -496,7 +496,7 @@ namespace linker.plugins.messenger SourceSocket?.SafeClose(); TargetSocket?.SafeClose(); - ticks = 0; + lastTicks.Clear(); } } diff --git a/linker/plugins/messenger/MessengerFlow.cs b/linker/plugins/messenger/MessengerFlow.cs index 1b832725..fdf087b1 100644 --- a/linker/plugins/messenger/MessengerFlow.cs +++ b/linker/plugins/messenger/MessengerFlow.cs @@ -1,15 +1,15 @@ using linker.plugins.flow; -using static System.Reflection.Metadata.BlobBuilder; namespace linker.plugins.messenger { - public sealed class MessengerFlow : IFlow + public sealed class MessengerFlow : IFlow { public ulong ReceiveBytes { get; private set; } public ulong SendtBytes { get; private set; } public string FlowName => "Messenger"; private Dictionary flows { get; } = new Dictionary(); + public MessengerFlow() { } diff --git a/linker/plugins/sforward/SForwardStartup.cs b/linker/plugins/sforward/SForwardStartup.cs index 6b513fda..d3ff7e59 100644 --- a/linker/plugins/sforward/SForwardStartup.cs +++ b/linker/plugins/sforward/SForwardStartup.cs @@ -28,6 +28,10 @@ namespace linker.plugins.sforward serviceCollection.AddSingleton(); serviceCollection.AddSingleton(); serviceCollection.AddSingleton(); + + serviceCollection.AddSingleton(); + + } public void AddServer(ServiceCollection serviceCollection, FileConfig config, Assembly[] assemblies) @@ -37,6 +41,7 @@ namespace linker.plugins.sforward serviceCollection.AddSingleton(); serviceCollection.AddSingleton(); + serviceCollection.AddSingleton(); } bool added = false; diff --git a/linker/plugins/sforward/proxy/SForwardFlow.cs b/linker/plugins/sforward/proxy/SForwardFlow.cs new file mode 100644 index 00000000..3cd3cdeb --- /dev/null +++ b/linker/plugins/sforward/proxy/SForwardFlow.cs @@ -0,0 +1,153 @@ +using linker.libs; +using linker.libs.extends; +using linker.plugins.flow; +using MemoryPack; +using System.Text.Json.Serialization; + +namespace linker.plugins.sforward.proxy +{ + public sealed class SForwardFlow : IFlow + { + public ulong ReceiveBytes { get; private set; } + public ulong SendtBytes { get; private set; } + public string FlowName => "SForward"; + + private readonly LastTicksManager lastTicksManager = new LastTicksManager(); + + private Dictionary flows { get; } = new Dictionary(); + + public SForwardFlow() + { + TimerHelper.SetInterval(() => + { + if (lastTicksManager.Less(5000)) + { + foreach (var item in flows.Values) + { + item.DiffReceiveBytes = item.SendtBytes - item.OldSendtBytes; + item.DiffSendtBytes = item.ReceiveBytes - item.OldReceiveBytes; + + item.OldSendtBytes = item.SendtBytes; + item.OldReceiveBytes = item.ReceiveBytes; + } + } + return true; + }, 1000); + + AddSendt("snltty", 0); + } + + + public void Update() + { + lastTicksManager.Update(); + } + + public void AddReceive(string key, ulong bytes) + { + if (flows.TryGetValue(key, out SForwardFlowItemInfo messengerFlowItemInfo) == false) + { + messengerFlowItemInfo = new SForwardFlowItemInfo { }; + flows.TryAdd(key, messengerFlowItemInfo); + } + ReceiveBytes += bytes; + messengerFlowItemInfo.ReceiveBytes += bytes; + } + public void AddSendt(string key, ulong bytes) + { + if (flows.TryGetValue(key, out SForwardFlowItemInfo messengerFlowItemInfo) == false) + { + messengerFlowItemInfo = new SForwardFlowItemInfo { }; + flows.TryAdd(key, messengerFlowItemInfo); + } + SendtBytes += bytes; + messengerFlowItemInfo.SendtBytes += bytes; + } + public SForwardFlowResponseInfo GetFlows(SForwardFlowRequestInfo info) + { + var items = flows.Where(c => string.IsNullOrWhiteSpace(info.Key) || c.Key.Contains(info.Key)); + switch (info.Order) + { + case SForwardFlowOrder.Sendt: + if (info.OrderType == SForwardFlowOrderType.Desc) + items = items.OrderByDescending(x => x.Value.SendtBytes); + else + items = items.OrderBy(x => x.Value.SendtBytes); + break; + case SForwardFlowOrder.DiffSendt: + if (info.OrderType == SForwardFlowOrderType.Desc) + items = items.OrderByDescending(x => x.Value.DiffSendtBytes); + else + items = items.OrderBy(x => x.Value.DiffSendtBytes); + break; + case SForwardFlowOrder.Receive: + if (info.OrderType == SForwardFlowOrderType.Desc) + items = items.OrderByDescending(x => x.Value.ReceiveBytes); + else + items = items.OrderBy(x => x.Value.ReceiveBytes); + break; + case SForwardFlowOrder.DiffRecive: + if (info.OrderType == SForwardFlowOrderType.Desc) + items = items.OrderByDescending(x => x.Value.DiffReceiveBytes); + else + items = items.OrderBy(x => x.Value.DiffReceiveBytes); + break; + default: + break; + } + SForwardFlowResponseInfo resp = new SForwardFlowResponseInfo + { + Page = info.Page, + PageSize = info.PageSize, + Count = flows.Count, + Data = items.Skip((info.Page - 1) * info.PageSize).Take(info.PageSize).ToDictionary() + }; + + return resp; + } + } + + [MemoryPackable] + public sealed partial class SForwardFlowItemInfo : FlowItemInfo + { + public ulong DiffReceiveBytes { get; set; } + public ulong DiffSendtBytes { get; set; } + + [MemoryPackIgnore, JsonIgnore] + public ulong OldReceiveBytes { get; set; } + [MemoryPackIgnore, JsonIgnore] + public ulong OldSendtBytes { get; set; } + } + + [MemoryPackable] + public sealed partial class SForwardFlowRequestInfo + { + public string Key { get; set; } = string.Empty; + public int Page { get; set; } = 1; + public int PageSize { get; set; } = 15; + public SForwardFlowOrder Order { get; set; } + public SForwardFlowOrderType OrderType { get; set; } + } + + public enum SForwardFlowOrder : byte + { + Sendt = 1, + DiffSendt = 2, + Receive = 3, + DiffRecive = 4 + } + public enum SForwardFlowOrderType : byte + { + Desc = 0, + Asc = 1, + } + + [MemoryPackable] + public sealed partial class SForwardFlowResponseInfo + { + public int Page { get; set; } + public int PageSize { get; set; } + public int Count { get; set; } + public Dictionary Data { get; set; } + } +} \ No newline at end of file diff --git a/linker/plugins/sforward/proxy/SForwardProxy.cs b/linker/plugins/sforward/proxy/SForwardProxy.cs index e997e744..a03b2815 100644 --- a/linker/plugins/sforward/proxy/SForwardProxy.cs +++ b/linker/plugins/sforward/proxy/SForwardProxy.cs @@ -4,19 +4,18 @@ using System.Text; namespace linker.plugins.sforward.proxy { - public partial class SForwardProxy : IFlow + public partial class SForwardProxy { - public ulong ReceiveBytes { get; private set; } - public ulong SendtBytes { get; private set; } - public string FlowName => "SForward"; - private readonly NumberSpace ns = new NumberSpace(); private byte[] flagBytes = Encoding.UTF8.GetBytes($"snltty.sforward"); - public SForwardProxy() + private readonly SForwardFlow sForwardFlow; + public SForwardProxy(SForwardFlow sForwardFlow) { + this.sForwardFlow = sForwardFlow; UdpTask(); + } public string Start(int port, bool isweb, byte bufferSize) diff --git a/linker/plugins/sforward/proxy/SForwardProxyTcp.cs b/linker/plugins/sforward/proxy/SForwardProxyTcp.cs index 49a99658..5fd20b29 100644 --- a/linker/plugins/sforward/proxy/SForwardProxyTcp.cs +++ b/linker/plugins/sforward/proxy/SForwardProxyTcp.cs @@ -18,7 +18,7 @@ namespace linker.plugins.sforward.proxy #region 服务端 - + private void StartTcp(int port, bool isweb, byte bufferSize) { IPEndPoint localEndPoint = new IPEndPoint(IPAddress.Any, port); @@ -154,7 +154,7 @@ namespace linker.plugins.sforward.proxy await token.TargetSocket.SendAsync(buffer1.AsMemory(0, length)).ConfigureAwait(false); //两端交换数据 - await Task.WhenAll(CopyToAsync(buffer1, token.SourceSocket, token.TargetSocket), CopyToAsync(buffer2, token.TargetSocket, token.SourceSocket)).ConfigureAwait(false); + await Task.WhenAll(CopyToAsync(token.Host, token.ListenPort, buffer1, token.SourceSocket, token.TargetSocket), CopyToAsync(token.Host, token.ListenPort, buffer2, token.TargetSocket, token.SourceSocket)).ConfigureAwait(false); CloseClientSocket(token); } @@ -248,7 +248,7 @@ namespace linker.plugins.sforward.proxy await sourceSocket.SendAsync(buffer1.AsMemory(0, flagBytes.Length + 8)).ConfigureAwait(false); //交换数据即可 - await Task.WhenAll(CopyToAsync(buffer1, sourceSocket, targetSocket), CopyToAsync(buffer2, targetSocket, sourceSocket)).ConfigureAwait(false); + await Task.WhenAll(CopyToAsync(string.Empty, service.Port, buffer1, sourceSocket, targetSocket), CopyToAsync(string.Empty, service.Port, buffer2, targetSocket, sourceSocket)).ConfigureAwait(false); } catch (Exception) @@ -265,20 +265,32 @@ namespace linker.plugins.sforward.proxy /// /// 读取数据,然后发送给对方,用户两端交换数据 /// + /// + /// /// /// /// - /// /// - private async Task CopyToAsync(Memory buffer, Socket source, Socket target) + private async Task CopyToAsync(string domain, int port, Memory buffer, Socket source, Socket target) { + bool isDomain = string.IsNullOrWhiteSpace(domain) == false; + string portStr = port.ToString(); + try { int bytesRead; while ((bytesRead = await source.ReceiveAsync(buffer, SocketFlags.None).ConfigureAwait(false)) != 0) { - ReceiveBytes += (ulong)bytesRead; - SendtBytes += (ulong)bytesRead; + if (isDomain) + { + sForwardFlow.AddReceive(domain, (ulong)bytesRead); + sForwardFlow.AddSendt(domain, (ulong)bytesRead); + } + else + { + sForwardFlow.AddReceive(portStr, (ulong)bytesRead); + sForwardFlow.AddSendt(portStr, (ulong)bytesRead); + } await target.SendAsync(buffer.Slice(0, bytesRead), SocketFlags.None).ConfigureAwait(false); } } @@ -296,7 +308,7 @@ namespace linker.plugins.sforward.proxy } } - + } public sealed class AsyncUserToken diff --git a/linker/plugins/sforward/proxy/SForwardProxyUdp.cs b/linker/plugins/sforward/proxy/SForwardProxyUdp.cs index 878c2401..86ea58cd 100644 --- a/linker/plugins/sforward/proxy/SForwardProxyUdp.cs +++ b/linker/plugins/sforward/proxy/SForwardProxyUdp.cs @@ -40,6 +40,9 @@ namespace linker.plugins.sforward.proxy { byte[] buffer = new byte[(1 << bufferSize) * 1024]; IPEndPoint tempRemoteEP = new IPEndPoint(IPAddress.Any, IPEndPoint.MinPort); + + string portStr = token.ListenPort.ToString(); + while (true) { SocketReceiveFromResult result = await token.SourceSocket.ReceiveFromAsync(buffer, tempRemoteEP).ConfigureAwait(false); @@ -49,14 +52,15 @@ namespace linker.plugins.sforward.proxy } Memory memory = buffer.AsMemory(0, result.ReceivedBytes); - ReceiveBytes += (ulong)memory.Length; + + sForwardFlow.AddReceive(portStr, (ulong)memory.Length); IPEndPoint source = result.RemoteEndPoint as IPEndPoint; //已经连接 if (udpConnections.TryGetValue(source, out UdpTargetCache cache) && cache != null) { - SendtBytes += (ulong)memory.Length; - cache.Update(); + sForwardFlow.AddSendt(portStr, (ulong)memory.Length); + cache.LastTicks.Update(); await token.SourceSocket.SendToAsync(memory, cache.IPEndPoint).ConfigureAwait(false); } else @@ -171,6 +175,8 @@ namespace linker.plugins.sforward.proxy IPEndPoint tempEp = new IPEndPoint(IPAddress.Any, IPEndPoint.MinPort); UdpConnectedCache cache = new UdpConnectedCache { SourceSocket = socketUdp, TargetSocket = serviceUdp }; + + string portStr = service.Port.ToString(); while (true) { try @@ -183,11 +189,12 @@ namespace linker.plugins.sforward.proxy socketUdp?.Dispose(); break; } - cache.Update(); + cache.LastTicks.Update(); Memory memory = buffer.AsMemory(0, result.ReceivedBytes); - ReceiveBytes += (ulong)memory.Length; - SendtBytes += (ulong)memory.Length; + + sForwardFlow.AddReceive(portStr, (ulong)memory.Length); + sForwardFlow.AddSendt(portStr, (ulong)memory.Length); if (serviceUdp == null) { @@ -215,11 +222,11 @@ namespace linker.plugins.sforward.proxy break; } Memory memory = buffer.AsMemory(0, result.ReceivedBytes); - ReceiveBytes += (ulong)memory.Length; - SendtBytes += (ulong)memory.Length; + sForwardFlow.AddReceive(portStr, (ulong)memory.Length); + sForwardFlow.AddSendt(portStr, (ulong)memory.Length); await socketUdp.SendToAsync(memory, server).ConfigureAwait(false); - cache.Update(); + cache.LastTicks.Update(); } catch (Exception ex) { @@ -284,24 +291,16 @@ namespace linker.plugins.sforward.proxy public sealed class UdpTargetCache { public IPEndPoint IPEndPoint { get; set; } - public long LastTime { get; set; } = Environment.TickCount64; - public void Update() - { - LastTime = Environment.TickCount64; - } - public bool Timeout => Environment.TickCount64 - LastTime > 5 * 60 * 1000; + public LastTicksManager LastTicks { get; set; } = new LastTicksManager(); + public bool Timeout => LastTicks.Greater(5 * 60 * 1000); } public sealed class UdpConnectedCache { public Socket SourceSocket { get; set; } public Socket TargetSocket { get; set; } - public long LastTime { get; set; } = Environment.TickCount64; - public void Update() - { - LastTime = Environment.TickCount64; - } - public bool Timeout => Environment.TickCount64 - LastTime > 5 * 60 * 1000; + public LastTicksManager LastTicks { get; set; } = new LastTicksManager(); + public bool Timeout => LastTicks.Greater(5 * 60 * 1000); public void Clear() { diff --git a/linker/plugins/tunnel/TunnelApiController.cs b/linker/plugins/tunnel/TunnelApiController.cs index ca78afa8..fbc45c1c 100644 --- a/linker/plugins/tunnel/TunnelApiController.cs +++ b/linker/plugins/tunnel/TunnelApiController.cs @@ -180,23 +180,6 @@ namespace linker.plugins.tunnel return true; } - - - public async Task> Records(ApiControllerParamsInfo param) - { - MessageResponeInfo resp = await messengerSender.SendReply(new MessageRequestWrap - { - Connection = clientSignInState.Connection, - MessengerId = (ushort)TunnelMessengerIds.Records - }).ConfigureAwait(false); - - if(resp.Code == MessageResponeCodes.OK) - { - return MemoryPackSerializer.Deserialize>(resp.Data.Span); - } - - return new ConcurrentDictionary(); - } } } diff --git a/linker/plugins/tunnel/messenger/TunnelMessenger.cs b/linker/plugins/tunnel/messenger/TunnelMessenger.cs index e81d0e63..127135b5 100644 --- a/linker/plugins/tunnel/messenger/TunnelMessenger.cs +++ b/linker/plugins/tunnel/messenger/TunnelMessenger.cs @@ -156,22 +156,6 @@ namespace linker.plugins.tunnel.messenger tunnelTransportInfo.Local.MachineName = cacheFrom.MachineName; tunnelTransportInfo.Remote.MachineName = cacheTo.MachineName; - /* - TunnelRecordInfo tunnelRecordInfo = new TunnelRecordInfo - { - MachineName = cacheFrom.MachineName, - Times = 1 - }; - records.AddOrUpdate(cacheFrom.MachineName, tunnelRecordInfo, (a, b) => - { - TunnelRecordItemInfo item = new TunnelRecordItemInfo { MachineName = cacheTo.MachineName, Times = 1 }; - b.To.AddOrUpdate(cacheTo.MachineId, item, (a, b) => { b.Times++; return b; }); - - b.Times++; - return b; - }); - */ - await messengerSender.SendOnly(new MessageRequestWrap { Connection = cacheTo.Connection, @@ -269,13 +253,6 @@ namespace linker.plugins.tunnel.messenger } } - - [MessengerId((ushort)TunnelMessengerIds.Records)] - public void Records(IConnection connection) - { - connection.Write(MemoryPackSerializer.Serialize(records)); - } - } [MemoryPackable] diff --git a/linker/plugins/tunnel/messenger/TunnelMessengerIds.cs b/linker/plugins/tunnel/messenger/TunnelMessengerIds.cs index 202bb27f..4f31aec8 100644 --- a/linker/plugins/tunnel/messenger/TunnelMessengerIds.cs +++ b/linker/plugins/tunnel/messenger/TunnelMessengerIds.cs @@ -22,8 +22,6 @@ RouteLevel = 2011, RouteLevelForward = 2012, - Records = 2013, - None = 2099 } } diff --git a/linker/plugins/tuntap/TuntapApiController.cs b/linker/plugins/tuntap/TuntapApiController.cs index f3489a9e..02c03428 100644 --- a/linker/plugins/tuntap/TuntapApiController.cs +++ b/linker/plugins/tuntap/TuntapApiController.cs @@ -202,6 +202,11 @@ namespace linker.plugins.tuntap } + public void SubscribePing(ApiControllerParamsInfo param) + { + tuntapTransfer.SubscribePing(); + } + public sealed class RouteItemListInfo { public object[] List { get; set; } diff --git a/linker/plugins/tuntap/TuntapTransfer.cs b/linker/plugins/tuntap/TuntapTransfer.cs index 44bf5ac3..2ab3d1f5 100644 --- a/linker/plugins/tuntap/TuntapTransfer.cs +++ b/linker/plugins/tuntap/TuntapTransfer.cs @@ -190,6 +190,9 @@ namespace linker.plugins.tuntap TimerHelper.Async(() => { DeleteForward(); + + bool needReboot = info.IP.Equals(runningConfig.Data.Tuntap.IP) == false || info.PrefixLength != runningConfig.Data.Tuntap.PrefixLength; + runningConfig.Data.Tuntap.IP = info.IP; runningConfig.Data.Tuntap.LanIPs = info.LanIPs; runningConfig.Data.Tuntap.Masks = info.Masks; @@ -197,7 +200,7 @@ namespace linker.plugins.tuntap runningConfig.Data.Tuntap.Switch = info.Switch; runningConfig.Data.Tuntap.Forwards = info.Forwards; runningConfig.Data.Update(); - if (Status == TuntapStatus.Running) + if (Status == TuntapStatus.Running && needReboot) { Shutdown(); Setup(); @@ -254,7 +257,7 @@ namespace linker.plugins.tuntap foreach (var item in list) { tuntapInfos.AddOrUpdate(item.MachineId, item, (a, b) => item); - item.LastTicks = Environment.TickCount64; + item.LastTicks.Update(); } var removes = tuntapInfos.Keys.Except(list.Select(c => c.MachineId)).ToList(); foreach (var item in removes) @@ -262,7 +265,7 @@ namespace linker.plugins.tuntap if (tuntapInfos.TryGetValue(item, out TuntapInfo tuntapInfo)) { tuntapInfo.Status = TuntapStatus.Normal; - tuntapInfo.LastTicks = 0; + tuntapInfo.LastTicks.Clear(); } } Version.Add(); @@ -428,7 +431,7 @@ namespace linker.plugins.tuntap return infos //自己的ip不要 - .Where(c => c.IP.Equals(runningConfig.Data.Tuntap.IP) == false && c.LastTicks > 0) + .Where(c => c.IP.Equals(runningConfig.Data.Tuntap.IP) == false && c.LastTicks.Greater(0)) .OrderBy(c => c.LastTicks) .Select(c => { @@ -511,30 +514,51 @@ namespace linker.plugins.tuntap } + + private readonly LastTicksManager lastTicksManager = new LastTicksManager(); + public void SubscribePing() + { + lastTicksManager.Update(); + } private void PingTask() { TimerHelper.SetInterval(async () => { - if (Status == TuntapStatus.Running && (runningConfig.Data.Tuntap.Switch & TuntapSwitch.ShowDelay) == TuntapSwitch.ShowDelay) + if (lastTicksManager.Less(5000)) { - var items = tuntapInfos.Values.Where(c => c.IP != null && c.IP.Equals(IPAddress.Any) == false && (c.Status & TuntapStatus.Running) == TuntapStatus.Running); - if ((runningConfig.Data.Tuntap.Switch & TuntapSwitch.AutoConnect) != TuntapSwitch.AutoConnect) - { - var connections = tuntapProxy.GetConnections(); - items = items.Where(c => (connections.TryGetValue(c.MachineId, out ITunnelConnection connection) && connection.Connected) || c.MachineId == config.Data.Client.Id); - } - - foreach (var item in items) - { - using Ping ping = new Ping(); - PingReply pingReply = await ping.SendPingAsync(item.IP, 500); - item.Delay = pingReply.Status == IPStatus.Success ? (int)pingReply.RoundtripTime : -1; - - Version.Add(); - } + await Ping(); } return true; }, 3000); + TimerHelper.SetInterval(async () => + { + if (lastTicksManager.Greater(15000)) + { + await Ping(); + } + return true; + }, 30000); + } + private async Task Ping() + { + if (Status == TuntapStatus.Running && (runningConfig.Data.Tuntap.Switch & TuntapSwitch.ShowDelay) == TuntapSwitch.ShowDelay) + { + var items = tuntapInfos.Values.Where(c => c.IP != null && c.IP.Equals(IPAddress.Any) == false && (c.Status & TuntapStatus.Running) == TuntapStatus.Running); + if ((runningConfig.Data.Tuntap.Switch & TuntapSwitch.AutoConnect) != TuntapSwitch.AutoConnect) + { + var connections = tuntapProxy.GetConnections(); + items = items.Where(c => (connections.TryGetValue(c.MachineId, out ITunnelConnection connection) && connection.Connected) || c.MachineId == config.Data.Client.Id); + } + + foreach (var item in items) + { + using Ping ping = new Ping(); + PingReply pingReply = await ping.SendPingAsync(item.IP, 500); + item.Delay = pingReply.Status == IPStatus.Success ? (int)pingReply.RoundtripTime : -1; + + Version.Add(); + } + } } } } diff --git a/linker/plugins/tuntap/config/config.cs b/linker/plugins/tuntap/config/config.cs index d3e1ba41..4f1dd65a 100644 --- a/linker/plugins/tuntap/config/config.cs +++ b/linker/plugins/tuntap/config/config.cs @@ -1,4 +1,5 @@ -using linker.plugins.tuntap.config; +using linker.libs; +using linker.plugins.tuntap.config; using MemoryPack; using System.Net; @@ -149,7 +150,7 @@ namespace linker.plugins.tuntap.config public TuntapSwitch Switch { get; set; } [MemoryPackIgnore] - public long LastTicks { get; set; } = Environment.TickCount64; + public LastTicksManager LastTicks { get; set; } = new LastTicksManager(); /// /// 延迟ms @@ -246,6 +247,29 @@ namespace linker.plugins.tuntap.config } } + /// + /// 使用广播 + /// + [MemoryPackIgnore] + public bool Multicast + { + get + { + return (Switch & TuntapSwitch.Multicast) == TuntapSwitch.Multicast; + } + set + { + if (value) + { + Switch |= TuntapSwitch.Multicast; + } + else + { + Switch &= ~TuntapSwitch.Multicast; + } + } + } + } [Flags] @@ -255,6 +279,7 @@ namespace linker.plugins.tuntap.config ShowDelay = 2, Upgrade = 4, AutoConnect = 8, + Multicast = 16, } diff --git a/linker/plugins/updater/UpdaterClientApiController.cs b/linker/plugins/updater/UpdaterClientApiController.cs index 85ccae26..030c9718 100644 --- a/linker/plugins/updater/UpdaterClientApiController.cs +++ b/linker/plugins/updater/UpdaterClientApiController.cs @@ -41,15 +41,6 @@ namespace linker.plugins.updater updaterTransfer.SetSecretKey(param.Content); } - public UpdateInfo GetCurrent(ApiControllerParamsInfo param) - { - var updaters = updaterTransfer.Get(); - if (updaters.TryGetValue(config.Data.Client.Id, out UpdateInfo info)) - { - return info; - } - return new UpdateInfo { }; - } public async Task GetServer(ApiControllerParamsInfo param) { MessageResponeInfo resp = await messengerSender.SendReply(new MessageRequestWrap @@ -85,7 +76,15 @@ namespace linker.plugins.updater }); } - + public UpdateInfo GetCurrent(ApiControllerParamsInfo param) + { + var updaters = updaterTransfer.Get(); + if (updaters.TryGetValue(config.Data.Client.Id, out UpdateInfo info)) + { + return info; + } + return new UpdateInfo { }; + } public UpdaterListInfo Get(ApiControllerParamsInfo param) { ulong hashCode = ulong.Parse(param.Content); @@ -144,6 +143,16 @@ namespace linker.plugins.updater } return true; } + + + public async Task Subscribe(ApiControllerParamsInfo param) + { + await messengerSender.SendOnly(new MessageRequestWrap + { + Connection = clientSignInState.Connection, + MessengerId = (ushort)UpdaterMessengerIds.SubscribeForward + }); + } } public sealed class UpdaterListInfo diff --git a/linker/plugins/updater/UpdaterClientTransfer.cs b/linker/plugins/updater/UpdaterClientTransfer.cs index e93fb138..048fc87b 100644 --- a/linker/plugins/updater/UpdaterClientTransfer.cs +++ b/linker/plugins/updater/UpdaterClientTransfer.cs @@ -13,6 +13,7 @@ namespace linker.plugins.updater { private UpdateInfo updateInfo = new UpdateInfo(); private ConcurrentDictionary updateInfos = new ConcurrentDictionary(); + private ConcurrentDictionary subscribes = new ConcurrentDictionary(); private readonly FileConfig fileConfig; private readonly MessengerSender messengerSender; @@ -76,19 +77,43 @@ namespace linker.plugins.updater } } + + public void Subscribe(string machineId) + { + if (subscribes.TryGetValue(machineId, out LastTicksManager lastTicksManager) == false) + { + lastTicksManager = new LastTicksManager(); + subscribes.TryAdd(machineId, lastTicksManager); + } + + //距离上次订阅超过一分钟,需要立即更新一次 + bool needUpdate = lastTicksManager.Greater(60 * 1000); + + lastTicksManager.Update(); + + if (needUpdate) + { + updateInfo.Update(); + } + } + private void UpdateTask() { TimerHelper.SetInterval(async () => { if (updateInfo.Updated) { - updateInfo.MachineId = fileConfig.Data.Client.Id; - await messengerSender.SendOnly(new MessageRequestWrap + string[] machines = subscribes.Where(c => c.Value.Less(15000)).Select(c => c.Key).ToArray(); + if (machines.Length > 0) { - Connection = clientSignInState.Connection, - MessengerId = (ushort)UpdaterMessengerIds.UpdateForward, - Payload = MemoryPackSerializer.Serialize(updateInfo), - }); + updateInfo.MachineId = fileConfig.Data.Client.Id; + await messengerSender.SendOnly(new MessageRequestWrap + { + Connection = clientSignInState.Connection, + MessengerId = (ushort)UpdaterMessengerIds.UpdateForward, + Payload = MemoryPackSerializer.Serialize(new UpdateClientInfo { ToMachines = machines, Info = updateInfo }), + }); + } Update(updateInfo); } return true; @@ -105,5 +130,11 @@ namespace linker.plugins.updater } } + [MemoryPackable] + public sealed partial class UpdateClientInfo + { + public string[] ToMachines { get; set; } + public UpdateInfo Info { get; set; } + } } diff --git a/linker/plugins/updater/messenger/UpdaterMessenger.cs b/linker/plugins/updater/messenger/UpdaterMessenger.cs index c9c53750..7062f7a8 100644 --- a/linker/plugins/updater/messenger/UpdaterMessenger.cs +++ b/linker/plugins/updater/messenger/UpdaterMessenger.cs @@ -44,6 +44,14 @@ namespace linker.plugins.updater.messenger { Environment.Exit(1); } + + + [MessengerId((ushort)UpdaterMessengerIds.Subscribe)] + public void Subscribe(IConnection connection) + { + string machineId = MemoryPackSerializer.Deserialize(connection.ReceiveRequestWrap.Payload.Span); + updaterTransfer.Subscribe(machineId); + } } @@ -144,16 +152,17 @@ namespace linker.plugins.updater.messenger [MessengerId((ushort)UpdaterMessengerIds.UpdateForward)] public void UpdateForward(IConnection connection) { - UpdateInfo info = MemoryPackSerializer.Deserialize(connection.ReceiveRequestWrap.Payload.Span); + UpdateClientInfo info = MemoryPackSerializer.Deserialize(connection.ReceiveRequestWrap.Payload.Span); if (signCaching.TryGet(connection.Id, out SignCacheInfo cache)) { - foreach (var item in signCaching.Get(cache.GroupId).Where(c => c.Connected && c.MachineId != connection.Id)) + byte[] payload = MemoryPackSerializer.Serialize(info.Info); + foreach (var item in signCaching.Get(cache.GroupId).Where(c => info.ToMachines.Contains(c.MachineId)).Where(c => c.Connected && c.MachineId != connection.Id)) { _ = messengerSender.SendOnly(new MessageRequestWrap { Connection = item.Connection, MessengerId = (ushort)UpdaterMessengerIds.Update, - Payload = connection.ReceiveRequestWrap.Payload + Payload = payload }); } @@ -177,5 +186,30 @@ namespace linker.plugins.updater.messenger }); } } + + + + /// + /// 订阅更新信息 + /// + /// + [MessengerId((ushort)UpdaterMessengerIds.SubscribeForward)] + public void SubscribeForward(IConnection connection) + { + if (signCaching.TryGet(connection.Id, out SignCacheInfo cache)) + { + byte[] mechineId = MemoryPackSerializer.Serialize(connection.Id); + foreach (var item in signCaching.Get(cache.GroupId).Where(c => c.Connected && c.MachineId != connection.Id)) + { + _ = messengerSender.SendOnly(new MessageRequestWrap + { + Connection = item.Connection, + MessengerId = (ushort)UpdaterMessengerIds.Subscribe, + Payload = mechineId + }); + } + + } + } } } diff --git a/linker/plugins/updater/messenger/UpdaterMessengerIds.cs b/linker/plugins/updater/messenger/UpdaterMessengerIds.cs index e2d17b51..af2460e5 100644 --- a/linker/plugins/updater/messenger/UpdaterMessengerIds.cs +++ b/linker/plugins/updater/messenger/UpdaterMessengerIds.cs @@ -17,6 +17,9 @@ ConfirmServer = 2608, ExitServer = 2609, + Subscribe = 2610, + SubscribeForward = 2611, + Max = 2299 } } diff --git a/version.txt b/version.txt index eeae3f71..d1d2b68b 100644 --- a/version.txt +++ b/version.txt @@ -1,3 +1,4 @@ v1.4.4 -2024-09-26 23:52:00 -1. 增加流量统计(暂时显示服务端流量) \ No newline at end of file +2024-09-27 17:23:47 +1. 总览,和详细流量统计,一眼知道服务器流量花在哪里 +2. 优化信标。减少流量,没有操作时尽量不产生流量 \ No newline at end of file