From db6070db7b9dc4bb6a3d7cf4a637faaeca225745 Mon Sep 17 00:00:00 2001 From: VaalaCat Date: Wed, 12 Nov 2025 15:43:28 +0000 Subject: [PATCH] feat: add virt ping map to route --- services/wg/topology_cache.go | 24 +++++++++++++++++++----- services/wg/wireguard.go | 11 +++++++++++ 2 files changed, 30 insertions(+), 5 deletions(-) diff --git a/services/wg/topology_cache.go b/services/wg/topology_cache.go index 0b36e28..f20150e 100644 --- a/services/wg/topology_cache.go +++ b/services/wg/topology_cache.go @@ -16,7 +16,7 @@ var ( type networkTopologyCache struct { wireguardRuntimeInfoMap *utils.SyncMap[uint, *pb.WGDeviceRuntimeInfo] // wireguardId -> peerRuntimeInfo fromToLatencyMap *utils.SyncMap[string, uint32] // fromWGID::toWGID -> latencyMs - virtAddrPingMap *utils.SyncMap[string, uint32] // virtAddr -> pingMs + virtAddrPingMap *utils.SyncMap[string, uint32] // fromWGID::toWGID -> pingMs } func NewNetworkTopologyCache() *networkTopologyCache { @@ -36,8 +36,9 @@ func (c *networkTopologyCache) SetRuntimeInfo(wireguardId uint, runtimeInfo *pb. for toWireGuardId, latencyMs := range runtimeInfo.GetPingMap() { c.fromToLatencyMap.Store(parseFromToLatencyKey(wireguardId, uint(toWireGuardId)), latencyMs) } + for virtAddr, pingMs := range runtimeInfo.GetVirtAddrPingMap() { - c.virtAddrPingMap.Store(virtAddr, pingMs) + c.virtAddrPingMap.Store(parseFromToLatencyKey(wireguardId, uint(runtimeInfo.PeerVirtAddrMap[virtAddr])), pingMs) } } @@ -46,11 +47,24 @@ func (c *networkTopologyCache) DeleteRuntimeInfo(wireguardId uint) { } func (c *networkTopologyCache) GetLatencyMs(fromWGID, toWGID uint) (uint32, bool) { - v1, ok := c.fromToLatencyMap.Load(parseFromToLatencyKey(fromWGID, toWGID)) + + endpointLatency, ok := c.fromToLatencyMap.Load(parseFromToLatencyKey(fromWGID, toWGID)) if !ok { - return c.fromToLatencyMap.Load(parseFromToLatencyKey(toWGID, fromWGID)) + endpointLatency, ok = c.fromToLatencyMap.Load(parseFromToLatencyKey(toWGID, fromWGID)) + if !ok { + return 0, false + } } - return v1, true + + virtAddrLatency, ok := c.virtAddrPingMap.Load(parseFromToLatencyKey(fromWGID, toWGID)) + if !ok { + virtAddrLatency, ok = c.virtAddrPingMap.Load(parseFromToLatencyKey(toWGID, fromWGID)) + if !ok { + return endpointLatency, false + } + } + + return (endpointLatency + virtAddrLatency) / 2, true } func parseFromToLatencyKey(fromWGID, toWGID uint) string { diff --git a/services/wg/wireguard.go b/services/wg/wireguard.go index 010db38..a5b3207 100644 --- a/services/wg/wireguard.go +++ b/services/wg/wireguard.go @@ -676,6 +676,8 @@ func (w *wireGuard) pingPeers() { return } + log.Debugf("start to ping peers, len: %d", len(ifceConfig.Peers)) + peers := ifceConfig.Peers var waitGroup conc.WaitGroup @@ -699,6 +701,7 @@ func (w *wireGuard) pingPeers() { } epPinger.Count = 5 + epPinger.Timeout = 10 * time.Second epPinger.OnFinish = func(stats *probing.Statistics) { // stats.PacketsSent, stats.PacketsRecv, stats.PacketLoss @@ -725,8 +728,10 @@ func (w *wireGuard) pingPeers() { waitGroup.Go(func() { if err := epPinger.Run(); err != nil { log.WithError(err).Errorf("failed to run pinger for %s", addr) + w.endpointPingMap.Store(peer.Id, math.MaxUint32) return } + log.Debugf("ping endpoint [%s] completed", addr) }) } @@ -744,6 +749,8 @@ func (w *wireGuard) pingPeers() { } virtAddrPinger.Count = 5 + virtAddrPinger.InterfaceName = w.ifce.GetInterfaceName() + virtAddrPinger.Timeout = 10 * time.Second virtAddrPinger.OnFinish = func(stats *probing.Statistics) { log.Tracef("ping stats for %s: %v", addr, stats) avgRttMs := uint32(stats.AvgRtt.Milliseconds()) @@ -764,11 +771,15 @@ func (w *wireGuard) pingPeers() { waitGroup.Go(func() { if err := virtAddrPinger.Run(); err != nil { log.WithError(err).Errorf("failed to run pinger for %s", addr) + w.virtAddrPingMap.Store(addr, math.MaxUint32) return } + log.Debugf("ping virt addr [%s] completed", addr) }) } + log.Debugf("wait for pingers to complete") + rcs := waitGroup.WaitAndRecover() if rcs != nil { log.WithError(rcs.AsError()).Errorf("failed to wait for pingers")