feat: implement ping smoothing for endpoint and virtual address pings

This commit is contained in:
VaalaCat
2025-12-13 16:47:46 +00:00
parent 15fa2a2a83
commit 7f1d5233a1
5 changed files with 243 additions and 73 deletions

View File

@@ -26,6 +26,14 @@ const (
endpointPingTimeout = 10 * time.Second
)
// ping 平滑参数(用于上报到 master 的 runtimeInfo.PingMap / VirtAddrPingMap
const (
pingSmoothAlpha = 0.30 // EWMA: new*alpha + old*(1-alpha)
pingSmoothMinMs uint32 = 1
pingSmoothMaxMs uint32 = 1500
pingSmoothBucketMs uint32 = 5 // 分桶:减少 1-2ms 的抖动引起的波动
)
func (w *wireGuard) pingPeers() {
log := w.svcLogger.WithField("op", "pingPeers")
@@ -149,17 +157,13 @@ func (w *wireGuard) scheduleVirtualAddrPings(log *logrus.Entry, ifceConfig *defs
avg, err := tcpPingAvg(tcpAddr, endpointPingCount, endpointPingTimeout)
if err != nil {
log.WithError(err).Errorf("failed to tcp ping virt addr %s via %s", addr, tcpAddr)
if w.virtAddrPingMap != nil {
w.virtAddrPingMap.Store(addr, math.MaxUint32)
}
w.storeVirtAddrPing(addr, math.MaxUint32)
return
}
log.Tracef("tcp ping stats for %s via %s: avg=%s", addr, tcpAddr, avg)
avgRttMs := uint32(avg.Milliseconds())
if w.virtAddrPingMap != nil {
w.virtAddrPingMap.Store(addr, avgRttMs)
}
w.storeVirtAddrPing(addr, avgRttMs)
log.Debugf("tcp ping virt addr [%s] completed via %s", addr, tcpAddr)
})
}
@@ -177,7 +181,87 @@ func (w *wireGuard) storeEndpointPing(peerID uint32, ms uint32) {
if w.endpointPingMap == nil {
return
}
w.endpointPingMap.Store(peerID, ms)
w.endpointPingMap.Store(peerID, w.smoothEndpointPing(peerID, ms))
}
func (w *wireGuard) storeVirtAddrPing(addr string, ms uint32) {
if w.virtAddrPingMap == nil || addr == "" {
return
}
w.virtAddrPingMap.Store(addr, w.smoothVirtAddrPing(addr, ms))
}
func clampPingMs(ms uint32) uint32 {
if ms == 0 {
ms = 1
}
if ms < pingSmoothMinMs {
return pingSmoothMinMs
}
if ms > pingSmoothMaxMs {
return pingSmoothMaxMs
}
return ms
}
func bucketPingMs(ms uint32) uint32 {
if pingSmoothBucketMs == 0 {
return ms
}
b := pingSmoothBucketMs
// 四舍五入到最近桶
return ((ms + b/2) / b) * b
}
func (w *wireGuard) smoothEndpointPing(peerID uint32, raw uint32) uint32 {
// 不可达哨兵值:直接上报不可达,但保留历史 EWMA 以便恢复时平滑
if raw == math.MaxUint32 {
return math.MaxUint32
}
raw = bucketPingMs(clampPingMs(raw))
v := float64(raw)
w.pingAggMu.Lock()
defer w.pingAggMu.Unlock()
if w.endpointPingEWMA == nil {
w.endpointPingEWMA = make(map[uint32]float64, 64)
}
old, ok := w.endpointPingEWMA[peerID]
if !ok || old <= 0 {
w.endpointPingEWMA[peerID] = v
return raw
}
ema := pingSmoothAlpha*v + (1.0-pingSmoothAlpha)*old
w.endpointPingEWMA[peerID] = ema
ms := uint32(math.Round(ema))
ms = bucketPingMs(clampPingMs(ms))
return ms
}
func (w *wireGuard) smoothVirtAddrPing(addr string, raw uint32) uint32 {
if raw == math.MaxUint32 {
return math.MaxUint32
}
raw = bucketPingMs(clampPingMs(raw))
v := float64(raw)
w.pingAggMu.Lock()
defer w.pingAggMu.Unlock()
if w.virtAddrPingEWMA == nil {
w.virtAddrPingEWMA = make(map[string]float64, 64)
}
old, ok := w.virtAddrPingEWMA[addr]
if !ok || old <= 0 {
w.virtAddrPingEWMA[addr] = v
return raw
}
ema := pingSmoothAlpha*v + (1.0-pingSmoothAlpha)*old
w.virtAddrPingEWMA[addr] = ema
ms := uint32(math.Round(ema))
ms = bucketPingMs(clampPingMs(ms))
return ms
}
// collectEndpointPingTargets 收集所有“可能直连”的节点 endpoint高内聚只关注 ping 需要的目标集合)。

View File

@@ -10,51 +10,8 @@ import (
"github.com/VaalaCat/frp-panel/models"
"github.com/VaalaCat/frp-panel/pb"
"github.com/VaalaCat/frp-panel/services/app"
)
// RoutingPolicy 决定边权重的计算方式。
// cost = LatencyWeight*latency_ms + InverseBandwidthWeight*(1/max(up_mbps,1e-6)) + HopWeight + HandshakePenalty
type RoutingPolicy struct {
LatencyWeight float64
InverseBandwidthWeight float64
HopWeight float64
MinUpMbps uint32
DefaultEndpointUpMbps uint32
DefaultEndpointLatencyMs uint32
OfflineThreshold time.Duration
// HandshakeStaleThreshold/HandshakeStalePenalty 用于抑制“握手过旧”的链路被选为最短路。
// 仅在能从 runtimeInfo 中找到对应 peer 的 last_handshake_time_sec 时生效;否则不惩罚(避免误伤)。
HandshakeStaleThreshold time.Duration
HandshakeStalePenalty float64
ACL *ACL
NetworkTopologyCache app.NetworkTopologyCache
CliMgr app.ClientsManager
}
func (p *RoutingPolicy) LoadACL(acl *ACL) *RoutingPolicy {
p.ACL = acl
return p
}
func DefaultRoutingPolicy(acl *ACL, networkTopologyCache app.NetworkTopologyCache, cliMgr app.ClientsManager) RoutingPolicy {
return RoutingPolicy{
LatencyWeight: 1.0,
InverseBandwidthWeight: 50.0, // 对低带宽路径给予更高惩罚
HopWeight: 1.0,
DefaultEndpointUpMbps: 50,
DefaultEndpointLatencyMs: 30,
OfflineThreshold: 2 * time.Minute,
// 默认启用一个温和的“握手过旧惩罚”:优先选择近期有握手的链路,但不至于强制剔除路径。
HandshakeStaleThreshold: 5 * time.Minute,
HandshakeStalePenalty: 30.0,
ACL: acl,
NetworkTopologyCache: networkTopologyCache,
CliMgr: cliMgr,
}
}
type AllowedIPsPlanner interface {
// Compute 基于拓扑与链路指标,计算每个节点应配置到直连邻居的 AllowedIPs。
// 输入的 peers 应包含同一 Network 下的所有 WireGuard 节点links 为其有向链路。
@@ -374,12 +331,10 @@ func buildAdjacency(order []uint, idToPeer map[uint]*models.WireGuard, links []*
}
latency := policy.DefaultEndpointLatencyMs
// GetLatencyMs 已自带“正反向兜底 + endpoint/virt ping 组合”,这里避免重复查询与覆盖,减少抖动
if latencyMs, ok := policy.NetworkTopologyCache.GetLatencyMs(from, to); ok {
latency = latencyMs
}
if latencyMs, ok := policy.NetworkTopologyCache.GetLatencyMs(to, from); ok {
latency = latencyMs
}
if lastSeenAt, ok := policy.CliMgr.GetLastSeenAt(idToPeer[from].ClientID); !ok || time.Since(lastSeenAt) > policy.OfflineThreshold {
continue
@@ -480,16 +435,7 @@ func runAllPairsDijkstra(order []uint, adj map[uint][]Edge, idToPeer map[uint]*m
}
visited[u] = true
for _, e := range adj[u] {
invBw := 1.0 / math.Max(float64(e.upMbps), 1e-6)
handshakePenalty := 0.0
if policy.HandshakeStalePenalty > 0 && policy.HandshakeStaleThreshold > 0 {
// 握手惩罚必须是“无方向”的,否则会导致 A->B 与 B->A 权重不一致,
// 进而产生单向选路WireGuard AllowedIPs 源地址校验下会丢包)。
if age, ok := getHandshakeAgeBetween(u, e.to, idToPeer, policy); ok && age > policy.HandshakeStaleThreshold {
handshakePenalty = policy.HandshakeStalePenalty
}
}
w := policy.LatencyWeight*float64(e.latency) + policy.InverseBandwidthWeight*invBw + policy.HopWeight + handshakePenalty
w := policy.EdgeWeight(u, e, idToPeer)
alt := dist[u] + w
if alt < dist[e.to] {
dist[e.to] = alt

View File

@@ -0,0 +1,134 @@
package wg
import (
"math"
"time"
"github.com/VaalaCat/frp-panel/models"
"github.com/VaalaCat/frp-panel/services/app"
)
// RoutingPolicy 决定边权重的计算方式。
// cost = LatencyTerm + InverseBandwidthTerm + HopWeight + HandshakePenalty
type RoutingPolicy struct {
LatencyWeight float64
InverseBandwidthWeight float64
HopWeight float64
MinUpMbps uint32
// LatencyBucketMs 用于对 latency 做“分桶/量化”,降低抖动导致的最短路频繁切换。
// 例如 bucket=5ms则 31/32/33ms 都会被量化为 30/35ms 附近的同一档。
LatencyBucketMs uint32
// MinLatencyMs/MaxLatencyMs 用于对 latency 做限幅,避免异常值对最短路产生过强扰动。
MinLatencyMs uint32
MaxLatencyMs uint32
// LatencyLogScale>0 时,对 latency 使用 log1p 变换并乘以该系数,使权重对小幅抖动更不敏感。
// 若为 0则回退为线性 latency。
LatencyLogScale float64
DefaultEndpointUpMbps uint32
DefaultEndpointLatencyMs uint32
OfflineThreshold time.Duration
// HandshakeStaleThreshold/HandshakeStalePenalty 用于抑制“握手过旧”的链路被选为最短路。
// 仅在能从 runtimeInfo 中找到对应 peer 的 last_handshake_time_sec 时生效;否则不惩罚(避免误伤)。
HandshakeStaleThreshold time.Duration
HandshakeStalePenalty float64
ACL *ACL
NetworkTopologyCache app.NetworkTopologyCache
CliMgr app.ClientsManager
}
func (p *RoutingPolicy) LoadACL(acl *ACL) *RoutingPolicy {
p.ACL = acl
return p
}
func DefaultRoutingPolicy(acl *ACL, networkTopologyCache app.NetworkTopologyCache, cliMgr app.ClientsManager) RoutingPolicy {
return RoutingPolicy{
LatencyWeight: 1.0,
InverseBandwidthWeight: 50.0, // 对低带宽路径给予更高惩罚
HopWeight: 1.0,
MinUpMbps: 1,
LatencyBucketMs: 5,
MinLatencyMs: 1,
MaxLatencyMs: 1500,
LatencyLogScale: 10.0,
DefaultEndpointUpMbps: 50,
DefaultEndpointLatencyMs: 30,
OfflineThreshold: 2 * time.Minute,
// 默认启用一个温和的“握手过旧惩罚”:优先选择近期有握手的链路,但不至于强制剔除路径。
HandshakeStaleThreshold: 5 * time.Minute,
HandshakeStalePenalty: 30.0,
ACL: acl,
NetworkTopologyCache: networkTopologyCache,
CliMgr: cliMgr,
}
}
// EdgeWeight 计算一条“有向边”的权重(越小越优)。
// 为了抑制延迟探测的噪声导致路由频繁抖动,这里对 latency 做了:限幅 + 分桶(可选)+ log1p可选
func (p *RoutingPolicy) EdgeWeight(fromWGID uint, e Edge, idToPeer map[uint]*models.WireGuard) float64 {
lat := float64(e.latency)
// 1) 延迟限幅
minLat := float64(p.MinLatencyMs)
maxLat := float64(p.MaxLatencyMs)
if minLat <= 0 {
minLat = 1
}
if maxLat <= 0 {
maxLat = 1500
}
if lat < minLat {
lat = minLat
}
if lat > maxLat {
lat = maxLat
}
// 2) 延迟分桶(量化)
if p.LatencyBucketMs > 0 {
b := float64(p.LatencyBucketMs)
// 四舍五入到最近桶
lat = math.Floor((lat+b/2)/b) * b
if lat < minLat {
lat = minLat
}
if lat > maxLat {
lat = maxLat
}
}
// 3) 延迟项log1p可选+ scale
latencyTerm := 0.0
if p.LatencyWeight != 0 {
if p.LatencyLogScale > 0 {
latencyTerm = p.LatencyWeight * math.Log1p(lat) * p.LatencyLogScale
} else {
latencyTerm = p.LatencyWeight * lat
}
}
// 4) 带宽项:对低带宽更敏感;使用 MinUpMbps 做下限避免极端值
minUp := float64(p.MinUpMbps)
if minUp <= 0 {
minUp = 1
}
up := math.Max(float64(e.upMbps), minUp)
invBw := 1.0 / math.Max(up, 1e-6)
bwTerm := p.InverseBandwidthWeight * invBw
// 5) hop 项
hopTerm := p.HopWeight
// 6) 握手过旧惩罚:必须无方向
handshakePenalty := 0.0
if p.HandshakeStalePenalty > 0 && p.HandshakeStaleThreshold > 0 {
if age, ok := getHandshakeAgeBetween(fromWGID, e.to, idToPeer, *p); ok && age > p.HandshakeStaleThreshold {
handshakePenalty = p.HandshakeStalePenalty
}
}
return latencyTerm + bwTerm + hopTerm + handshakePenalty
}

View File

@@ -37,16 +37,18 @@ func NewWireGuard(ctx *app.Context, ifce defs.WireGuardConfig, logger *logrus.En
fwManager := newFirewallManager(logger.WithField("component", "iptables"))
return &wireGuard{
ifce: &cfg,
ctx: svcCtx,
cancel: cancel,
svcLogger: logger,
endpointPingMap: &utils.SyncMap[uint32, uint32]{},
useGvisorNet: useGvisorNet,
virtAddrPingMap: &utils.SyncMap[string, uint32]{},
fwManager: fwManager,
peerDirectory: make(map[uint32]*pb.WireGuardPeerConfig, 64),
preconnectPeers: make(map[uint32]struct{}, 64),
ifce: &cfg,
ctx: svcCtx,
cancel: cancel,
svcLogger: logger,
endpointPingMap: &utils.SyncMap[uint32, uint32]{},
useGvisorNet: useGvisorNet,
virtAddrPingMap: &utils.SyncMap[string, uint32]{},
endpointPingEWMA: make(map[uint32]float64, 64),
virtAddrPingEWMA: make(map[string]float64, 64),
fwManager: fwManager,
peerDirectory: make(map[uint32]*pb.WireGuardPeerConfig, 64),
preconnectPeers: make(map[uint32]struct{}, 64),
}, nil
}

View File

@@ -29,6 +29,10 @@ type wireGuard struct {
ifce *defs.WireGuardConfig
endpointPingMap *utils.SyncMap[uint32, uint32] // ms
virtAddrPingMap *utils.SyncMap[string, uint32] // ms
// ping 平滑器:对“瞬时探测值”做 EWMA 聚合,降低抖动
pingAggMu sync.Mutex
endpointPingEWMA map[uint32]float64 // peerID -> ema(ms)
virtAddrPingEWMA map[string]float64 // virtAddr -> ema(ms)
peerDirectory map[uint32]*pb.WireGuardPeerConfig
// 仅用于“预连接/保持连接”的 peerAllowedIPs 为空),用于后续根据拓扑变化做增删
preconnectPeers map[uint32]struct{}