using linker.libs;
using linker.libs.extends;
using linker.messenger.relay.messenger;
using System.Buffers;
using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
namespace linker.messenger.relay.server
{
///
/// 中继节点操作
///
public class RelayServerNodeTransfer
{
private uint connectionNum = 0;
private IConnection localConnection;
private IConnection remoteConnection;
private long bytes = 0;
private long lastBytes = 0;
private RelaySpeedLimit limitTotal = new RelaySpeedLimit();
private readonly ConcurrentDictionary trafficDict = new ConcurrentDictionary();
private readonly ConcurrentDictionary cdkeyLastBytes = new ConcurrentDictionary();
private readonly ISerializer serializer;
private readonly IRelayServerNodeStore relayServerNodeStore;
private readonly IRelayServerMasterStore relayServerMasterStore;
private readonly IMessengerResolver messengerResolver;
private readonly IMessengerSender messengerSender;
public RelayServerNodeTransfer(ISerializer serializer, IRelayServerNodeStore relayServerNodeStore, IRelayServerMasterStore relayServerMasterStore, IMessengerResolver messengerResolver, IMessengerSender messengerSender)
{
this.serializer = serializer;
this.relayServerNodeStore = relayServerNodeStore;
this.relayServerMasterStore = relayServerMasterStore;
this.messengerResolver = messengerResolver;
this.messengerSender = messengerSender;
limitTotal.SetLimit((uint)Math.Ceiling((relayServerNodeStore.Node.MaxBandwidthTotal * 1024 * 1024) / 8.0));
TrafficTask();
ReportTask();
SignInTask();
}
public async ValueTask TryGetRelayCache(string key, string nodeid)
{
try
{
IConnection connection = relayServerNodeStore.Node.Id == RelayServerNodeInfo.MASTER_NODE_ID ? localConnection : remoteConnection;
MessageResponeInfo resp = await messengerSender.SendReply(new MessageRequestWrap
{
Connection = connection,
MessengerId = (ushort)RelayMessengerIds.NodeGetCache,
Payload = serializer.Serialize(key)
});
if (resp.Code == MessageResponeCodes.OK && resp.Data.Length > 0)
{
return serializer.Deserialize(resp.Data.Span);
}
}
catch (Exception ex)
{
if (LoggerHelper.Instance.LoggerLevel <= LoggerTypes.DEBUG)
LoggerHelper.Instance.Error($"{ex}");
}
return null;
}
///
/// 无效请求
///
///
public bool Validate(RelayCacheInfo relayCache)
{
//已认证的没有流量限制
if (relayCache.Validated) return true;
//流量卡有的,就能继续用
if (relayCache.Cdkey.Any(c => c.LastBytes > 0)) return true;
return ValidateConnection(relayCache) && ValidateBytes(relayCache);
}
///
/// 连接数是否够
///
///
private bool ValidateConnection(RelayCacheInfo relayCache)
{
bool res = relayServerNodeStore.Node.MaxConnection == 0 || relayServerNodeStore.Node.MaxConnection * 2 > connectionNum;
if (res == false && LoggerHelper.Instance.LoggerLevel <= LoggerTypes.DEBUG)
LoggerHelper.Instance.Debug($"relay ValidateConnection false,{connectionNum}/{relayServerNodeStore.Node.MaxConnection * 2}");
return res;
}
///
/// 流量是否够
///
///
private bool ValidateBytes(RelayCacheInfo relayCache)
{
bool res = relayServerNodeStore.Node.MaxGbTotal == 0
|| (relayServerNodeStore.Node.MaxGbTotal > 0 && relayServerNodeStore.Node.MaxGbTotalLastBytes > 0);
if (res == false && LoggerHelper.Instance.LoggerLevel <= LoggerTypes.DEBUG)
LoggerHelper.Instance.Debug($"relay ValidateBytes false,{relayServerNodeStore.Node.MaxGbTotalLastBytes}bytes/{relayServerNodeStore.Node.MaxGbTotal}gb");
return res;
}
///
/// 增加连接数
///
public void IncrementConnectionNum()
{
Interlocked.Increment(ref connectionNum);
}
///
/// 减少连接数
///
public void DecrementConnectionNum()
{
Interlocked.Decrement(ref connectionNum);
}
///
/// 是否需要总限速
///
///
public bool NeedLimit(RelayTrafficCacheInfo relayCache)
{
if (relayCache.Cache.Validated) return false;
//if (relayCache.CurrentCdkey != null) return false;
return limitTotal.NeedLimit();
}
///
/// 总限速
///
///
///
public bool TryLimit(ref int length)
{
return limitTotal.TryLimit(ref length);
}
///
/// 开始计算流量
///
///
public void AddTrafficCache(RelayTrafficCacheInfo relayCache)
{
SetLimit(relayCache);
trafficDict.TryAdd(relayCache.Cache.FlowId, relayCache);
}
///
/// 取消计算流量
///
///
public void RemoveTrafficCache(RelayTrafficCacheInfo relayCache)
{
trafficDict.TryRemove(relayCache.Cache.FlowId, out _);
foreach (var item in relayCache.Cache.Cdkey)
{
cdkeyLastBytes.TryRemove(item.CdkeyId, out _);
}
}
///
/// 消耗流量
///
///
///
public bool AddBytes(RelayTrafficCacheInfo cache, long length)
{
Interlocked.Add(ref bytes, length);
//验证过的,不消耗流量
if (cache.Cache.Validated) return true;
//节点无流量限制的,不消耗流量
if (relayServerNodeStore.Node.MaxGbTotal == 0) return true;
Interlocked.Add(ref cache.Sendt, length);
var current = cache.CurrentCdkey;
if (current != null) return current.LastBytes > 0;
return relayServerNodeStore.Node.MaxGbTotalLastBytes > 0;
}
///
/// 设置限速
///
///
private void SetLimit(RelayTrafficCacheInfo relayCache)
{
//无限制
if (relayCache.Cache.Validated || relayServerNodeStore.Node.MaxBandwidth == 0)
{
relayCache.Limit.SetLimit(0);
return;
}
RelayServerCdkeyInfo currentCdkey = relayCache.Cache.Cdkey.Where(c => c.LastBytes > 0).OrderByDescending(c => c.Bandwidth).FirstOrDefault();
//有cdkey,且带宽大于节点带宽,就用cdkey的带宽
if (currentCdkey != null && (currentCdkey.Bandwidth == 0 || currentCdkey.Bandwidth > relayServerNodeStore.Node.MaxBandwidth))
{
relayCache.CurrentCdkey = currentCdkey;
relayCache.Limit.SetLimit((uint)Math.Ceiling((relayCache.CurrentCdkey.Bandwidth * 1024 * 1024) / 8.0));
return;
}
relayCache.CurrentCdkey = null;
relayCache.Limit.SetLimit((uint)Math.Ceiling((relayServerNodeStore.Node.MaxBandwidth * 1024 * 1024) / 8.0));
}
private void ResetNodeBytes()
{
foreach (var cache in trafficDict.Values.Where(c => c.CurrentCdkey == null))
{
long length = cache.Sendt;
Interlocked.Exchange(ref cache.Sendt, 0);
if (relayServerNodeStore.Node.MaxGbTotalLastBytes >= length)
relayServerNodeStore.SetMaxGbTotalLastBytes(relayServerNodeStore.Node.MaxGbTotalLastBytes - length);
else relayServerNodeStore.SetMaxGbTotalLastBytes(0);
}
if (relayServerNodeStore.Node.MaxGbTotalMonth != DateTime.Now.Month)
{
relayServerNodeStore.SetMaxGbTotalMonth(DateTime.Now.Month);
relayServerNodeStore.SetMaxGbTotalLastBytes((long)(relayServerNodeStore.Node.MaxGbTotal * 1024 * 1024 * 1024));
}
relayServerNodeStore.Confirm();
}
private void DownloadBytes()
{
TimerHelper.Async(async () =>
{
List ids = trafficDict.Values.SelectMany(c => c.Cache.Cdkey).Select(c => c.CdkeyId).Distinct().ToList();
while (ids.Count > 0)
{
//分批更新,可能数量很大
List id = ids.Take(100).ToList();
ids.RemoveRange(0, id.Count);
MessageResponeInfo resp = await messengerSender.SendReply(new MessageRequestWrap
{
Connection = relayServerNodeStore.Node.Id == RelayServerNodeInfo.MASTER_NODE_ID ? localConnection : remoteConnection,
MessengerId = (ushort)RelayMessengerIds.TrafficReport,
Payload = serializer.Serialize(new RelayTrafficUpdateInfo
{
Dic = [],
Ids = id,
SecretKey = relayServerNodeStore.Node.Id == RelayServerNodeInfo.MASTER_NODE_ID
? relayServerMasterStore.Master.SecretKey
: relayServerNodeStore.Node.MasterSecretKey
}),
Timeout = 4000
});
if (resp.Code == MessageResponeCodes.OK && resp.Data.Length > 0)
{
Dictionary dic = serializer.Deserialize>(resp.Data.Span);
//更新剩余流量
foreach (KeyValuePair item in dic)
{
cdkeyLastBytes.AddOrUpdate(item.Key, item.Value, (a, b) => item.Value);
}
//查不到的,归零
foreach (long item in id.Except(dic.Keys))
{
cdkeyLastBytes.AddOrUpdate(item, 0, (a, b) => 0);
}
}
}
});
}
private void UploadBytes()
{
TimerHelper.Async(async () =>
{
MessageResponeInfo resp = await messengerSender.SendReply(new MessageRequestWrap
{
Connection = relayServerNodeStore.Node.Id == RelayServerNodeInfo.MASTER_NODE_ID ? localConnection : remoteConnection,
MessengerId = (ushort)RelayMessengerIds.TrafficReport,
Payload = serializer.Serialize(new RelayTrafficUpdateInfo
{
Dic = trafficDict.Values.Where(c => c.CurrentCdkey != null && c.Sendt > 0).GroupBy(c => c.CurrentCdkey.CdkeyId).ToDictionary(c => c.Key, d => d.Sum(d => d.Sendt)),
Ids = [],
SecretKey = relayServerNodeStore.Node.Id == RelayServerNodeInfo.MASTER_NODE_ID
? relayServerMasterStore.Master.SecretKey
: relayServerNodeStore.Node.MasterSecretKey
}),
Timeout = 4000
});
if (resp.Code == MessageResponeCodes.OK)
{
try
{
serializer.Deserialize>(resp.Data.Span);
//成功报告了流量,就重新计数
foreach (var cache in trafficDict.Values.Where(c => c.CurrentCdkey != null))
{
Interlocked.Exchange(ref cache.Sendt, 0);
//检查一下是不是需要更新剩余流量
if (cdkeyLastBytes.TryGetValue(cache.CurrentCdkey.CdkeyId, out long value))
{
cache.CurrentCdkey.LastBytes = value;
}
//当前cdkey流量用完了,就重新找找新的cdkey
if (cache.CurrentCdkey.LastBytes <= 0)
{
SetLimit(cache);
}
}
}
catch (Exception)
{
}
}
});
}
private void TrafficTask()
{
TimerHelper.SetIntervalLong(() =>
{
UploadBytes();
DownloadBytes();
ResetNodeBytes();
return true;
}, 5000);
}
private void ReportTask()
{
TimerHelper.SetIntervalLong(async () =>
{
IEnumerable nodes = new List
{
//默认报告给自己,作为本服务器的一个默认中继节点
new RelayServerNodeInfo{
Id = RelayServerNodeInfo.MASTER_NODE_ID,
Host = new IPEndPoint(IPAddress.Any, relayServerNodeStore.ServicePort).ToString(),
MasterHost = new IPEndPoint(IPAddress.Loopback, relayServerNodeStore.ServicePort).ToString(),
MasterSecretKey = relayServerMasterStore.Master.SecretKey,
MaxBandwidth = 0,
MaxConnection = 0,
MaxBandwidthTotal=0,
MaxGbTotal=0,
MaxGbTotalLastBytes=0,
MaxGbTotalMonth=0,
Name = "default",
Public = false
},
//配置的中继节点
relayServerNodeStore.Node
}.Where(c => string.IsNullOrWhiteSpace(c.MasterHost) == false && string.IsNullOrWhiteSpace(c.MasterSecretKey) == false)
.Where(c => string.IsNullOrWhiteSpace(c.Name) == false && string.IsNullOrWhiteSpace(c.Id) == false);
double diff = (bytes - lastBytes) * 8 / 1024.0 / 1024.0;
lastBytes = bytes;
foreach (var node in nodes)
{
try
{
IConnection connection = node.Id == RelayServerNodeInfo.MASTER_NODE_ID ? localConnection : remoteConnection;
IPEndPoint endPoint = await NetworkHelper.GetEndPointAsync(node.Host, relayServerNodeStore.ServicePort) ?? new IPEndPoint(IPAddress.Any, relayServerNodeStore.ServicePort);
RelayServerNodeReportInfo relayNodeReportInfo = new RelayServerNodeReportInfo
{
Id = node.Id,
Name = node.Name,
Public = node.Public,
MaxBandwidth = node.MaxBandwidth,
BandwidthRatio = Math.Round(node.MaxBandwidthTotal == 0 ? 0 : diff / 5 / node.MaxBandwidthTotal, 2),
MaxBandwidthTotal = node.MaxBandwidthTotal,
MaxGbTotal = node.MaxGbTotal,
MaxGbTotalLastBytes = node.MaxGbTotalLastBytes,
MaxConnection = node.MaxConnection,
ConnectionRatio = Math.Round(node.MaxConnection == 0 ? 0 : connectionNum / 2.0 / node.MaxConnection, 2),
EndPoint = endPoint
};
await messengerSender.SendOnly(new MessageRequestWrap
{
Connection = connection,
MessengerId = (ushort)RelayMessengerIds.NodeReport,
Payload = serializer.Serialize(relayNodeReportInfo)
});
}
catch (Exception ex)
{
if (LoggerHelper.Instance.LoggerLevel <= LoggerTypes.DEBUG)
{
LoggerHelper.Instance.Error($"relay report : {ex}");
}
}
}
return true;
}, 5000);
}
private void SignInTask()
{
TimerHelper.SetIntervalLong(async () =>
{
if ((remoteConnection == null || remoteConnection.Connected == false) && string.IsNullOrWhiteSpace(relayServerNodeStore.Node.MasterHost) == false)
{
remoteConnection = await SignIn(relayServerNodeStore.Node.MasterHost, relayServerNodeStore.Node.MasterSecretKey).ConfigureAwait(false);
}
if (localConnection == null || localConnection.Connected == false)
{
localConnection = await SignIn(new IPEndPoint(IPAddress.Loopback, relayServerNodeStore.ServicePort).ToString(), relayServerMasterStore.Master.SecretKey).ConfigureAwait(false);
}
return true;
}, 3000);
}
private async Task SignIn(string host, string secretKey)
{
byte[] bytes = ArrayPool.Shared.Rent(1024);
try
{
byte[] secretKeyBytes = secretKey.Md5().ToBytes();
bytes[0] = (byte)secretKeyBytes.Length;
secretKeyBytes.AsSpan().CopyTo(bytes.AsSpan(1));
IPEndPoint remote = await NetworkHelper.GetEndPointAsync(host, 1802);
Socket socket = new Socket(remote.Address.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
socket.KeepAlive();
await socket.ConnectAsync(remote).WaitAsync(TimeSpan.FromMilliseconds(5000)).ConfigureAwait(false);
return await messengerResolver.BeginReceiveClient(socket, true, (byte)ResolverType.RelayReport, bytes.AsMemory(0, secretKeyBytes.Length + 1)).ConfigureAwait(false);
}
catch (Exception ex)
{
if (LoggerHelper.Instance.LoggerLevel <= LoggerTypes.DEBUG)
{
LoggerHelper.Instance.Error(ex);
}
}
finally
{
ArrayPool.Shared.Return(bytes);
}
return null;
}
}
public sealed partial class RelayTrafficUpdateInfo
{
///
/// cdkey id 和 流量
///
public Dictionary Dic { get; set; }
///
/// 需要知道哪些cdkey的剩余流量
///
public List Ids { get; set; }
public string SecretKey { get; set; }
}
}