From 7f1d5233a1171e1a1d37fc46cf1494d8f11415e5 Mon Sep 17 00:00:00 2001 From: VaalaCat Date: Sat, 13 Dec 2025 16:47:46 +0000 Subject: [PATCH] feat: implement ping smoothing for endpoint and virtual address pings --- services/wg/ping.go | 98 +++++++++++++++++-- services/wg/routing_planner.go | 58 +---------- services/wg/routing_planner_policy.go | 134 ++++++++++++++++++++++++++ services/wg/wireguard.go | 22 +++-- services/wg/wireguard_types.go | 4 + 5 files changed, 243 insertions(+), 73 deletions(-) create mode 100644 services/wg/routing_planner_policy.go diff --git a/services/wg/ping.go b/services/wg/ping.go index 0ce7613..2a70c0b 100644 --- a/services/wg/ping.go +++ b/services/wg/ping.go @@ -26,6 +26,14 @@ const ( endpointPingTimeout = 10 * time.Second ) +// ping 平滑参数(用于上报到 master 的 runtimeInfo.PingMap / VirtAddrPingMap) +const ( + pingSmoothAlpha = 0.30 // EWMA: new*alpha + old*(1-alpha) + pingSmoothMinMs uint32 = 1 + pingSmoothMaxMs uint32 = 1500 + pingSmoothBucketMs uint32 = 5 // 分桶:减少 1-2ms 的抖动引起的波动 +) + func (w *wireGuard) pingPeers() { log := w.svcLogger.WithField("op", "pingPeers") @@ -149,17 +157,13 @@ func (w *wireGuard) scheduleVirtualAddrPings(log *logrus.Entry, ifceConfig *defs avg, err := tcpPingAvg(tcpAddr, endpointPingCount, endpointPingTimeout) if err != nil { log.WithError(err).Errorf("failed to tcp ping virt addr %s via %s", addr, tcpAddr) - if w.virtAddrPingMap != nil { - w.virtAddrPingMap.Store(addr, math.MaxUint32) - } + w.storeVirtAddrPing(addr, math.MaxUint32) return } log.Tracef("tcp ping stats for %s via %s: avg=%s", addr, tcpAddr, avg) avgRttMs := uint32(avg.Milliseconds()) - if w.virtAddrPingMap != nil { - w.virtAddrPingMap.Store(addr, avgRttMs) - } + w.storeVirtAddrPing(addr, avgRttMs) log.Debugf("tcp ping virt addr [%s] completed via %s", addr, tcpAddr) }) } @@ -177,7 +181,87 @@ func (w *wireGuard) storeEndpointPing(peerID uint32, ms uint32) { if w.endpointPingMap == nil { return } - w.endpointPingMap.Store(peerID, ms) + w.endpointPingMap.Store(peerID, w.smoothEndpointPing(peerID, ms)) +} + +func (w *wireGuard) storeVirtAddrPing(addr string, ms uint32) { + if w.virtAddrPingMap == nil || addr == "" { + return + } + w.virtAddrPingMap.Store(addr, w.smoothVirtAddrPing(addr, ms)) +} + +func clampPingMs(ms uint32) uint32 { + if ms == 0 { + ms = 1 + } + if ms < pingSmoothMinMs { + return pingSmoothMinMs + } + if ms > pingSmoothMaxMs { + return pingSmoothMaxMs + } + return ms +} + +func bucketPingMs(ms uint32) uint32 { + if pingSmoothBucketMs == 0 { + return ms + } + b := pingSmoothBucketMs + // 四舍五入到最近桶 + return ((ms + b/2) / b) * b +} + +func (w *wireGuard) smoothEndpointPing(peerID uint32, raw uint32) uint32 { + // 不可达哨兵值:直接上报不可达,但保留历史 EWMA 以便恢复时平滑 + if raw == math.MaxUint32 { + return math.MaxUint32 + } + raw = bucketPingMs(clampPingMs(raw)) + v := float64(raw) + + w.pingAggMu.Lock() + defer w.pingAggMu.Unlock() + + if w.endpointPingEWMA == nil { + w.endpointPingEWMA = make(map[uint32]float64, 64) + } + old, ok := w.endpointPingEWMA[peerID] + if !ok || old <= 0 { + w.endpointPingEWMA[peerID] = v + return raw + } + ema := pingSmoothAlpha*v + (1.0-pingSmoothAlpha)*old + w.endpointPingEWMA[peerID] = ema + ms := uint32(math.Round(ema)) + ms = bucketPingMs(clampPingMs(ms)) + return ms +} + +func (w *wireGuard) smoothVirtAddrPing(addr string, raw uint32) uint32 { + if raw == math.MaxUint32 { + return math.MaxUint32 + } + raw = bucketPingMs(clampPingMs(raw)) + v := float64(raw) + + w.pingAggMu.Lock() + defer w.pingAggMu.Unlock() + + if w.virtAddrPingEWMA == nil { + w.virtAddrPingEWMA = make(map[string]float64, 64) + } + old, ok := w.virtAddrPingEWMA[addr] + if !ok || old <= 0 { + w.virtAddrPingEWMA[addr] = v + return raw + } + ema := pingSmoothAlpha*v + (1.0-pingSmoothAlpha)*old + w.virtAddrPingEWMA[addr] = ema + ms := uint32(math.Round(ema)) + ms = bucketPingMs(clampPingMs(ms)) + return ms } // collectEndpointPingTargets 收集所有“可能直连”的节点 endpoint(高内聚:只关注 ping 需要的目标集合)。 diff --git a/services/wg/routing_planner.go b/services/wg/routing_planner.go index fc12454..aa7fa14 100644 --- a/services/wg/routing_planner.go +++ b/services/wg/routing_planner.go @@ -10,51 +10,8 @@ import ( "github.com/VaalaCat/frp-panel/models" "github.com/VaalaCat/frp-panel/pb" - "github.com/VaalaCat/frp-panel/services/app" ) -// RoutingPolicy 决定边权重的计算方式。 -// cost = LatencyWeight*latency_ms + InverseBandwidthWeight*(1/max(up_mbps,1e-6)) + HopWeight + HandshakePenalty -type RoutingPolicy struct { - LatencyWeight float64 - InverseBandwidthWeight float64 - HopWeight float64 - MinUpMbps uint32 - DefaultEndpointUpMbps uint32 - DefaultEndpointLatencyMs uint32 - OfflineThreshold time.Duration - // HandshakeStaleThreshold/HandshakeStalePenalty 用于抑制“握手过旧”的链路被选为最短路。 - // 仅在能从 runtimeInfo 中找到对应 peer 的 last_handshake_time_sec 时生效;否则不惩罚(避免误伤)。 - HandshakeStaleThreshold time.Duration - HandshakeStalePenalty float64 - - ACL *ACL - NetworkTopologyCache app.NetworkTopologyCache - CliMgr app.ClientsManager -} - -func (p *RoutingPolicy) LoadACL(acl *ACL) *RoutingPolicy { - p.ACL = acl - return p -} - -func DefaultRoutingPolicy(acl *ACL, networkTopologyCache app.NetworkTopologyCache, cliMgr app.ClientsManager) RoutingPolicy { - return RoutingPolicy{ - LatencyWeight: 1.0, - InverseBandwidthWeight: 50.0, // 对低带宽路径给予更高惩罚 - HopWeight: 1.0, - DefaultEndpointUpMbps: 50, - DefaultEndpointLatencyMs: 30, - OfflineThreshold: 2 * time.Minute, - // 默认启用一个温和的“握手过旧惩罚”:优先选择近期有握手的链路,但不至于强制剔除路径。 - HandshakeStaleThreshold: 5 * time.Minute, - HandshakeStalePenalty: 30.0, - ACL: acl, - NetworkTopologyCache: networkTopologyCache, - CliMgr: cliMgr, - } -} - type AllowedIPsPlanner interface { // Compute 基于拓扑与链路指标,计算每个节点应配置到直连邻居的 AllowedIPs。 // 输入的 peers 应包含同一 Network 下的所有 WireGuard 节点,links 为其有向链路。 @@ -374,12 +331,10 @@ func buildAdjacency(order []uint, idToPeer map[uint]*models.WireGuard, links []* } latency := policy.DefaultEndpointLatencyMs + // GetLatencyMs 已自带“正反向兜底 + endpoint/virt ping 组合”,这里避免重复查询与覆盖,减少抖动 if latencyMs, ok := policy.NetworkTopologyCache.GetLatencyMs(from, to); ok { latency = latencyMs } - if latencyMs, ok := policy.NetworkTopologyCache.GetLatencyMs(to, from); ok { - latency = latencyMs - } if lastSeenAt, ok := policy.CliMgr.GetLastSeenAt(idToPeer[from].ClientID); !ok || time.Since(lastSeenAt) > policy.OfflineThreshold { continue @@ -480,16 +435,7 @@ func runAllPairsDijkstra(order []uint, adj map[uint][]Edge, idToPeer map[uint]*m } visited[u] = true for _, e := range adj[u] { - invBw := 1.0 / math.Max(float64(e.upMbps), 1e-6) - handshakePenalty := 0.0 - if policy.HandshakeStalePenalty > 0 && policy.HandshakeStaleThreshold > 0 { - // 握手惩罚必须是“无方向”的,否则会导致 A->B 与 B->A 权重不一致, - // 进而产生单向选路(WireGuard AllowedIPs 源地址校验下会丢包)。 - if age, ok := getHandshakeAgeBetween(u, e.to, idToPeer, policy); ok && age > policy.HandshakeStaleThreshold { - handshakePenalty = policy.HandshakeStalePenalty - } - } - w := policy.LatencyWeight*float64(e.latency) + policy.InverseBandwidthWeight*invBw + policy.HopWeight + handshakePenalty + w := policy.EdgeWeight(u, e, idToPeer) alt := dist[u] + w if alt < dist[e.to] { dist[e.to] = alt diff --git a/services/wg/routing_planner_policy.go b/services/wg/routing_planner_policy.go new file mode 100644 index 0000000..891eaaf --- /dev/null +++ b/services/wg/routing_planner_policy.go @@ -0,0 +1,134 @@ +package wg + +import ( + "math" + "time" + + "github.com/VaalaCat/frp-panel/models" + "github.com/VaalaCat/frp-panel/services/app" +) + +// RoutingPolicy 决定边权重的计算方式。 +// cost = LatencyTerm + InverseBandwidthTerm + HopWeight + HandshakePenalty +type RoutingPolicy struct { + LatencyWeight float64 + InverseBandwidthWeight float64 + HopWeight float64 + MinUpMbps uint32 + + // LatencyBucketMs 用于对 latency 做“分桶/量化”,降低抖动导致的最短路频繁切换。 + // 例如 bucket=5ms,则 31/32/33ms 都会被量化为 30/35ms 附近的同一档。 + LatencyBucketMs uint32 + // MinLatencyMs/MaxLatencyMs 用于对 latency 做限幅,避免异常值对最短路产生过强扰动。 + MinLatencyMs uint32 + MaxLatencyMs uint32 + // LatencyLogScale>0 时,对 latency 使用 log1p 变换并乘以该系数,使权重对小幅抖动更不敏感。 + // 若为 0,则回退为线性 latency。 + LatencyLogScale float64 + + DefaultEndpointUpMbps uint32 + DefaultEndpointLatencyMs uint32 + OfflineThreshold time.Duration + // HandshakeStaleThreshold/HandshakeStalePenalty 用于抑制“握手过旧”的链路被选为最短路。 + // 仅在能从 runtimeInfo 中找到对应 peer 的 last_handshake_time_sec 时生效;否则不惩罚(避免误伤)。 + HandshakeStaleThreshold time.Duration + HandshakeStalePenalty float64 + + ACL *ACL + NetworkTopologyCache app.NetworkTopologyCache + CliMgr app.ClientsManager +} + +func (p *RoutingPolicy) LoadACL(acl *ACL) *RoutingPolicy { + p.ACL = acl + return p +} + +func DefaultRoutingPolicy(acl *ACL, networkTopologyCache app.NetworkTopologyCache, cliMgr app.ClientsManager) RoutingPolicy { + return RoutingPolicy{ + LatencyWeight: 1.0, + InverseBandwidthWeight: 50.0, // 对低带宽路径给予更高惩罚 + HopWeight: 1.0, + MinUpMbps: 1, + LatencyBucketMs: 5, + MinLatencyMs: 1, + MaxLatencyMs: 1500, + LatencyLogScale: 10.0, + DefaultEndpointUpMbps: 50, + DefaultEndpointLatencyMs: 30, + OfflineThreshold: 2 * time.Minute, + // 默认启用一个温和的“握手过旧惩罚”:优先选择近期有握手的链路,但不至于强制剔除路径。 + HandshakeStaleThreshold: 5 * time.Minute, + HandshakeStalePenalty: 30.0, + ACL: acl, + NetworkTopologyCache: networkTopologyCache, + CliMgr: cliMgr, + } +} + +// EdgeWeight 计算一条“有向边”的权重(越小越优)。 +// 为了抑制延迟探测的噪声导致路由频繁抖动,这里对 latency 做了:限幅 + 分桶(可选)+ log1p(可选)。 +func (p *RoutingPolicy) EdgeWeight(fromWGID uint, e Edge, idToPeer map[uint]*models.WireGuard) float64 { + lat := float64(e.latency) + + // 1) 延迟限幅 + minLat := float64(p.MinLatencyMs) + maxLat := float64(p.MaxLatencyMs) + if minLat <= 0 { + minLat = 1 + } + if maxLat <= 0 { + maxLat = 1500 + } + if lat < minLat { + lat = minLat + } + if lat > maxLat { + lat = maxLat + } + + // 2) 延迟分桶(量化) + if p.LatencyBucketMs > 0 { + b := float64(p.LatencyBucketMs) + // 四舍五入到最近桶 + lat = math.Floor((lat+b/2)/b) * b + if lat < minLat { + lat = minLat + } + if lat > maxLat { + lat = maxLat + } + } + + // 3) 延迟项:log1p(可选)+ scale + latencyTerm := 0.0 + if p.LatencyWeight != 0 { + if p.LatencyLogScale > 0 { + latencyTerm = p.LatencyWeight * math.Log1p(lat) * p.LatencyLogScale + } else { + latencyTerm = p.LatencyWeight * lat + } + } + + // 4) 带宽项:对低带宽更敏感;使用 MinUpMbps 做下限避免极端值 + minUp := float64(p.MinUpMbps) + if minUp <= 0 { + minUp = 1 + } + up := math.Max(float64(e.upMbps), minUp) + invBw := 1.0 / math.Max(up, 1e-6) + bwTerm := p.InverseBandwidthWeight * invBw + + // 5) hop 项 + hopTerm := p.HopWeight + + // 6) 握手过旧惩罚:必须无方向 + handshakePenalty := 0.0 + if p.HandshakeStalePenalty > 0 && p.HandshakeStaleThreshold > 0 { + if age, ok := getHandshakeAgeBetween(fromWGID, e.to, idToPeer, *p); ok && age > p.HandshakeStaleThreshold { + handshakePenalty = p.HandshakeStalePenalty + } + } + + return latencyTerm + bwTerm + hopTerm + handshakePenalty +} diff --git a/services/wg/wireguard.go b/services/wg/wireguard.go index 11e9f2d..3c76933 100644 --- a/services/wg/wireguard.go +++ b/services/wg/wireguard.go @@ -37,16 +37,18 @@ func NewWireGuard(ctx *app.Context, ifce defs.WireGuardConfig, logger *logrus.En fwManager := newFirewallManager(logger.WithField("component", "iptables")) return &wireGuard{ - ifce: &cfg, - ctx: svcCtx, - cancel: cancel, - svcLogger: logger, - endpointPingMap: &utils.SyncMap[uint32, uint32]{}, - useGvisorNet: useGvisorNet, - virtAddrPingMap: &utils.SyncMap[string, uint32]{}, - fwManager: fwManager, - peerDirectory: make(map[uint32]*pb.WireGuardPeerConfig, 64), - preconnectPeers: make(map[uint32]struct{}, 64), + ifce: &cfg, + ctx: svcCtx, + cancel: cancel, + svcLogger: logger, + endpointPingMap: &utils.SyncMap[uint32, uint32]{}, + useGvisorNet: useGvisorNet, + virtAddrPingMap: &utils.SyncMap[string, uint32]{}, + endpointPingEWMA: make(map[uint32]float64, 64), + virtAddrPingEWMA: make(map[string]float64, 64), + fwManager: fwManager, + peerDirectory: make(map[uint32]*pb.WireGuardPeerConfig, 64), + preconnectPeers: make(map[uint32]struct{}, 64), }, nil } diff --git a/services/wg/wireguard_types.go b/services/wg/wireguard_types.go index 6352efa..c6be537 100644 --- a/services/wg/wireguard_types.go +++ b/services/wg/wireguard_types.go @@ -29,6 +29,10 @@ type wireGuard struct { ifce *defs.WireGuardConfig endpointPingMap *utils.SyncMap[uint32, uint32] // ms virtAddrPingMap *utils.SyncMap[string, uint32] // ms + // ping 平滑器:对“瞬时探测值”做 EWMA 聚合,降低抖动 + pingAggMu sync.Mutex + endpointPingEWMA map[uint32]float64 // peerID -> ema(ms) + virtAddrPingEWMA map[string]float64 // virtAddr -> ema(ms) peerDirectory map[uint32]*pb.WireGuardPeerConfig // 仅用于“预连接/保持连接”的 peer(AllowedIPs 为空),用于后续根据拓扑变化做增删 preconnectPeers map[uint32]struct{}