Files
openlan/pkg/switch/findhop_linux.go
2025-04-13 15:33:35 +08:00

517 lines
11 KiB
Go

package cswitch
import (
"fmt"
"net"
"os/exec"
"regexp"
"sort"
"strconv"
"strings"
"sync"
"time"
co "github.com/luscis/openlan/pkg/config"
"github.com/luscis/openlan/pkg/libol"
"github.com/luscis/openlan/pkg/models"
"github.com/luscis/openlan/pkg/schema"
nl "github.com/vishvananda/netlink"
)
// FindHop groups the next-hop checkers ("drivers") of one network and
// dispatches route load/unload requests to them by findhop name.
type FindHop struct {
	// name is the network name passed to every driver that is created.
	name string
	// cfg is the network configuration the findhop entries come from.
	cfg *co.Network
	// drivers maps a findhop entry name to its checker.
	drivers map[string]FindHopDriver
	// out is the logger used by the FindHop methods.
	out *libol.SubLogger
	// lock is taken by the exported hop methods around access to drivers.
	lock sync.RWMutex
}
// NewFindHop builds the FindHop of the network called name. Every entry in
// cfg.FindHop has its Vrf overwritten with cfg.Namespace and is handed to
// newCheckDriver; entries for which no driver is returned are skipped.
func NewFindHop(name string, cfg *co.Network) *FindHop {
	checkers := make(map[string]FindHopDriver, 32)
	for key, entry := range cfg.FindHop {
		entry.Vrf = cfg.Namespace
		if drv := newCheckDriver(key, name, entry); drv != nil {
			checkers[key] = drv
		}
	}

	hop := &FindHop{name: name, cfg: cfg, drivers: checkers}
	hop.out = libol.NewSubLogger("findhop")
	return hop
}
// newCheckDriver returns the driver matching cfg.Check, or nil when the check
// type is not supported. "ping" is the only supported check.
func newCheckDriver(name string, network string, cfg *co.FindHop) FindHopDriver {
	if cfg.Check == "ping" {
		return NewPingDriver(name, network, cfg)
	}
	return nil
}
// Start starts every registered driver.
//
// The read lock is held while ranging over h.drivers because AddHop and
// DelHop mutate that map under the same lock; iterating a map while it is
// being written is a fatal runtime error in Go.
func (h *FindHop) Start() {
	h.lock.RLock()
	defer h.lock.RUnlock()

	h.out.Info("FindHop.Start: drivers size: %d", len(h.drivers))
	for _, checker := range h.drivers {
		checker.Start()
	}
}
// Stop stops every registered driver.
//
// The read lock is held while ranging over h.drivers because AddHop and
// DelHop mutate that map under the same lock.
func (h *FindHop) Stop() {
	h.lock.RLock()
	defer h.lock.RUnlock()

	for _, driver := range h.drivers {
		driver.Stop()
	}
}
// addHop creates, registers and starts the driver of a findhop entry that is
// added at runtime. It does no locking of its own; AddHop calls it while
// holding h.lock.
//
// An already registered name is only logged and reported as success. An
// error is returned when cfg.Check names a check type without a driver.
func (h *FindHop) addHop(name string, cfg *co.FindHop) error {
	if _, ok := h.drivers[name]; ok {
		h.out.Error("FindHop.addHop: checker already exists %s", name)
		return nil
	}
	cfg.Vrf = h.cfg.Namespace
	driver := newCheckDriver(name, h.name, cfg)
	if driver == nil {
		// Report the check type: that is what is unsupported, not the
		// entry name.
		return libol.NewErr("FindHop.addHop: unsupported check %q for %s", cfg.Check, name)
	}
	h.drivers[name] = driver
	driver.Start()
	return nil
}
// removeHop stops and unregisters the driver of a findhop entry that is
// removed at runtime. It does no locking of its own; DelHop calls it while
// holding h.lock.
//
// An unknown name is only logged and reported as success. A driver that still
// has routes loaded is left untouched and an error is returned.
func (h *FindHop) removeHop(name string) error {
	driver, ok := h.drivers[name]
	if !ok {
		h.out.Error("FindHop.removeHop: checker not exists %s", name)
		return nil
	}
	if driver.HasRoute() {
		return libol.NewErr("FindHop.delHop: checker has route %s", name)
	}
	driver.Stop()
	delete(h.drivers, name)
	return nil
}
// LoadHop hands the route nlr to the driver registered as findhop. When no
// such driver exists the request is logged as an error and dropped.
func (h *FindHop) LoadHop(findhop string, nlr *nl.Route) {
	h.lock.RLock()
	defer h.lock.RUnlock()

	driver, found := h.drivers[findhop]
	if !found {
		h.out.Error("FindHop.LoadHop: checker not found %s", findhop)
		return
	}
	h.out.Info("FindHop.LoadHop: %s via %s", nlr.String(), findhop)
	driver.LoadRoute(nlr)
}
// UnloadHop asks the driver registered as findhop to drop the route nlr.
// When no such driver exists the request is logged as an error and dropped.
func (h *FindHop) UnloadHop(findhop string, nlr *nl.Route) {
	h.lock.RLock()
	defer h.lock.RUnlock()

	driver, found := h.drivers[findhop]
	if !found {
		h.out.Error("FindHop.UnloadHop: checker not found %s", findhop)
		return
	}
	h.out.Info("FindHop.UnloadHop: %s via %s", nlr.String(), findhop)
	driver.UnloadRoute(nlr)
}
// AddHop registers a findhop entry described by data and starts its driver.
//
// data.NextHop is a comma-separated list of next-hop addresses. When
// h.cfg.AddFindHop reports false nothing else is done and nil is returned
// (presumably the entry already exists — confirm against co.Network).
//
// If the driver cannot be created, the configuration entry that was just
// added is removed again so that no entry without a driver is left behind,
// and the error is returned.
func (h *FindHop) AddHop(data schema.FindHop) error {
	h.lock.Lock()
	defer h.lock.Unlock()

	cc := &co.FindHop{
		Name:    data.Name,
		Mode:    data.Mode,
		NextHop: strings.Split(data.NextHop, ","),
		Check:   data.Check,
	}
	cc.Correct()
	if !h.cfg.AddFindHop(cc) {
		return nil
	}
	if err := h.addHop(data.Name, cc); err != nil {
		// Roll back the configuration change made above.
		h.cfg.DelFindHop(cc)
		return err
	}
	return nil
}
// DelHop removes the findhop entry named data.Name. The driver is removed
// first; only when that succeeds is the entry deleted from the network
// configuration. The error from removeHop is returned unchanged.
func (h *FindHop) DelHop(data schema.FindHop) error {
	h.lock.Lock()
	defer h.lock.Unlock()

	if err := h.removeHop(data.Name); err != nil {
		return err
	}
	h.cfg.DelFindHop(&co.FindHop{Name: data.Name})
	return nil
}
// ListHop invokes call once per registered driver with a schema view of its
// configuration. NextHop and Available are rendered as comma-separated
// lists. call runs while the read lock is held.
func (h *FindHop) ListHop(call func(obj schema.FindHop)) {
	h.lock.RLock()
	defer h.lock.RUnlock()

	for name, drv := range h.drivers {
		cc := drv.Config()
		available := make([]string, len(cc.Available))
		for i := range cc.Available {
			available[i] = cc.Available[i].NextHop
		}
		item := schema.FindHop{
			Name:      name,
			Mode:      cc.Mode,
			NextHop:   strings.Join(cc.NextHop, ","),
			Check:     cc.Check,
			Available: strings.Join(available, ","),
		}
		call(item)
	}
}
// SaveHop asks the network configuration to save its findhop entries while
// holding the read lock.
func (h *FindHop) SaveHop() {
	h.lock.RLock()
	h.cfg.SaveFindHop()
	h.lock.RUnlock()
}
// FindHopDriver is the contract every next-hop checker implements.
type FindHopDriver interface {
	// Name returns the driver kind.
	Name() string
	// Check probes the given addresses and returns the usable next hops.
	Check([]string) []co.MultiPath
	// Start begins checking.
	Start()
	// Stop ends checking.
	Stop()
	// ReloadRoute re-applies every loaded route.
	ReloadRoute()
	// LoadRoute puts route under the driver's control.
	LoadRoute(route *nl.Route)
	// UnloadRoute removes route from the driver's control.
	UnloadRoute(route *nl.Route)
	// HasRoute reports whether any route is currently loaded.
	HasRoute() bool
	// Config returns the findhop configuration of the driver.
	Config() *co.FindHop
}
// FindHopImpl carries the state and route handling shared by findhop
// drivers; concrete drivers embed it.
type FindHopImpl struct {
	// Network is the name of the network the driver was created for.
	Network string
	// routes holds the routes loaded through LoadRoute.
	routes []*nl.Route
	// cfg is the findhop entry, including the currently available hops.
	cfg *co.FindHop
	// out is the driver's logger.
	out *libol.SubLogger
}
// Name returns the driver kind of the shared base implementation.
func (c *FindHopImpl) Name() string {
	const kind = "common"
	return kind
}
// Start does nothing in the base implementation.
func (c *FindHopImpl) Start() {}
// Stop does nothing in the base implementation.
func (c *FindHopImpl) Stop() {}
// HasRoute reports whether at least one route is loaded.
func (c *FindHopImpl) HasRoute() bool {
	return len(c.routes) != 0
}
// Check is the base implementation: it ignores ipList and returns nil.
func (c *FindHopImpl) Check(ipList []string) []co.MultiPath {
	var none []co.MultiPath
	return none
}
// UpdateAvailable stores the latest check result in cfg.Available and
// reports whether the stored value changed.
//
// In "load-balance" mode the whole list mp is kept. In any other mode only
// the first entry of mp is kept.
//
// NOTE(review): outside load-balance mode an empty mp is stored as a
// one-element list holding the zero MultiPath, not as an empty list — confirm
// this is intended, since LoadRoute only skips routes when the list is empty.
func (c *FindHopImpl) UpdateAvailable(mp []co.MultiPath) bool {
	if c.cfg.Mode == "load-balance" {
		if compareMultiPaths(mp, c.cfg.Available) {
			return false
		}
		c.cfg.Available = mp
	} else {
		var newPath, oldPath co.MultiPath
		if len(mp) > 0 {
			newPath = mp[0]
		}
		if len(c.cfg.Available) > 0 {
			oldPath = c.cfg.Available[0]
		}
		if newPath.CompareEqual(oldPath) {
			return false
		}
		c.cfg.Available = []co.MultiPath{newPath}
	}
	c.out.Info("FindHopImpl.UpdateAvailable: available %v", c.cfg.Available)
	return true
}
// ReloadRoute re-applies every loaded route with the current next hops.
func (c *FindHopImpl) ReloadRoute() {
	c.out.Debug("FindHopImpl.ReloadRoute: route reload %d", len(c.routes))
	for i := range c.routes {
		c.updateRoute(c.routes[i])
	}
}
// modelMultiPath converts the available hops in cfg.Available to their
// models.MultiPath form. The result is nil when no hop is available.
func (c *FindHopImpl) modelMultiPath() []models.MultiPath {
	var paths []models.MultiPath
	for i := range c.cfg.Available {
		hop := c.cfg.Available[i]
		paths = append(paths, models.MultiPath{NextHop: hop.NextHop, Weight: hop.Weight})
	}
	return paths
}
// buildNexthopInfos converts the available hops in cfg.Available to netlink
// next-hop entries: Weight becomes Hops and NextHop is parsed into Gw. The
// result is an empty, non-nil slice when no hop is available.
func (c *FindHopImpl) buildNexthopInfos() []*nl.NexthopInfo {
	available := c.cfg.Available
	hops := make([]*nl.NexthopInfo, 0, len(available))
	for i := range available {
		hops = append(hops, &nl.NexthopInfo{
			Hops: available[i].Weight,
			Gw:   net.ParseIP(available[i].NextHop),
		})
	}
	return hops
}
// updateRoute refreshes the multipath next hops of nlr from the current
// availability and submits a netlink route replace through a libol promise.
// The outcome is only logged.
func (c *FindHopImpl) updateRoute(nlr *nl.Route) {
	c.out.Debug("FindHopImpl.updateRoute: %v ", nlr)
	nlr.MultiPath = c.buildNexthopInfos()

	replace := func() error {
		err := nl.RouteReplace(nlr)
		if err == nil {
			c.out.Info("FindHopImpl.updateRoute: %s success", nlr.String())
			return nil
		}
		c.out.Warn("FindHopImpl.updateRoute: %s %s", nlr.String(), err)
		return err
	}
	task := libol.NewPromise()
	task.Go(replace)
}
// LoadRoute puts nlr under the driver's control. The route is remembered,
// its single gateway is cleared and its multipath next hops are set from the
// current availability. It is only installed when at least one next hop is
// available.
func (c *FindHopImpl) LoadRoute(nlr *nl.Route) {
	c.out.Debug("FindHopImpl.LoadRoute: %v", nlr)
	c.routes = append(c.routes, nlr)

	nlr.MultiPath = c.buildNexthopInfos()
	nlr.Gw = nil
	if len(nlr.MultiPath) != 0 {
		c.updateRoute(nlr)
		return
	}
	c.out.Debug("ignored if no nexthop")
}
// UnloadRoute forgets the first loaded route whose destination and table
// match rt and deletes it through netlink. Nothing happens when no loaded
// route matches; a failed delete is only logged.
func (c *FindHopImpl) UnloadRoute(rt *nl.Route) {
	c.out.Debug("FindHopImpl.UnLoadRoute: %v", rt)

	// Locate the matching route among the loaded ones.
	idx := -1
	for i, r := range c.routes {
		if r.Table == rt.Table && r.Dst.String() == rt.Dst.String() {
			idx = i
			break
		}
	}
	if idx < 0 {
		return
	}

	found := c.routes[idx]
	c.routes = append(c.routes[:idx], c.routes[idx+1:]...)
	if err := nl.RouteDel(found); err != nil {
		c.out.Warn("FindHopImpl.UnLoadRoute: %s", err)
	}
}
// Config returns the findhop entry this driver works on. The pointer is the
// driver's own; it is not a copy.
func (c *FindHopImpl) Config() *co.FindHop {
	cfg := c.cfg
	return cfg
}
// PingResult is the outcome of pinging one address.
type PingResult struct {
	// Ip is the address that was pinged.
	Ip string
	// Latency is the average round-trip time parsed from the ping output.
	Latency float64
	// Loss is the number of lost packets.
	Loss int
}
// PingDriver is the findhop driver that checks next hops with the system
// ping command. It embeds FindHopImpl for route handling.
type PingDriver struct {
	*FindHopImpl
	// CfgName is the name of the findhop entry the driver belongs to.
	CfgName string
	// Running is true while the polling loop should keep going.
	Running bool
	// PingParams points at the ping parameters of the findhop entry.
	PingParams *co.PingParams
}
// NewPingDriver creates the ping driver of the findhop entry name on the
// given network. The logger is named "<cfg.Check>_<name>" and PingParams
// points into cfg.
func NewPingDriver(name string, network string, cfg *co.FindHop) *PingDriver {
	base := &FindHopImpl{
		Network: network,
		cfg:     cfg,
		out:     libol.NewSubLogger(cfg.Check + "_" + name),
	}
	return &PingDriver{
		FindHopImpl: base,
		CfgName:     name,
		PingParams:  &cfg.Params,
	}
}
// Name returns the driver kind of the ping driver.
func (pc *PingDriver) Name() string {
	const kind = "ping"
	return kind
}
// filter returns, in their original order, the results for which condition
// is true. The returned slice is nil when nothing is kept.
func filter(results []PingResult, condition func(PingResult) bool) []PingResult {
	var kept []PingResult
	for i := range results {
		if !condition(results[i]) {
			continue
		}
		kept = append(kept, results[i])
	}
	return kept
}
// compareMultiPaths reports whether a and b have the same length and every
// pair of entries at the same index compares equal.
func compareMultiPaths(a []co.MultiPath, b []co.MultiPath) bool {
	if len(a) != len(b) {
		return false
	}
	for i := 0; i < len(a); i++ {
		if equal := a[i].CompareEqual(b[i]); !equal {
			return false
		}
	}
	return true
}
// Check pings every address in ipList and returns the usable next hops.
//
// Addresses whose ping fails, or whose lost-packet count is above
// PingParams.Loss, are dropped. The remaining ones are ordered by ascending
// loss and then by address, and each is given weight 1. The result is nil
// when no address qualifies.
func (pc *PingDriver) Check(ipList []string) []co.MultiPath {
	count := pc.PingParams.Count
	maxLoss := pc.PingParams.Loss
	pc.out.Debug("PingDriver.Check: start check ips")

	// Probe every address first; the loss filter is applied afterwards.
	var probed []PingResult
	for _, ip := range ipList {
		latency, lost, err := pc.ping(ip, count)
		if err == nil {
			probed = append(probed, PingResult{Ip: ip, Latency: latency, Loss: lost})
		}
	}

	var usable []PingResult
	for _, res := range probed {
		if res.Loss <= maxLoss {
			usable = append(usable, res)
		}
	}

	sort.Slice(usable, func(i, j int) bool {
		if usable[i].Loss != usable[j].Loss {
			return usable[i].Loss < usable[j].Loss
		}
		return usable[i].Ip < usable[j].Ip
	})

	var hops []co.MultiPath
	for _, res := range usable {
		hops = append(hops, co.MultiPath{NextHop: res.Ip, Weight: 1})
		pc.out.Debug("PingDriver.Check: available %s loss: %d ", res.Ip, res.Loss)
	}
	return hops
}
// updateNextHop runs one check over the configured next hops and reloads
// the routes when the set of available hops changed.
func (pc *PingDriver) updateNextHop() {
	available := pc.Check(pc.cfg.NextHop)
	if changed := pc.UpdateAvailable(available); !changed {
		return
	}
	pc.ReloadRoute()
}
// update is the polling loop run in the driver's goroutine. After a short
// warm-up it checks the next hops every cfg.Params.Interval seconds (5 when
// the interval is not positive) until Running is cleared.
//
// Running is set by Start, not here: setting it after the warm-up sleep
// would overwrite a Stop issued during that sleep and the loop would never
// end.
//
// NOTE(review): Running is a plain bool shared between goroutines without
// synchronization; a stop is only noticed after the current sleep.
func (pc *PingDriver) update() {
	frequency := pc.cfg.Params.Interval
	if frequency <= 0 {
		frequency = 5
	}
	// wait tun device start
	time.Sleep(2 * time.Second)
	for pc.Running {
		pc.updateNextHop()
		time.Sleep(time.Second * time.Duration(frequency))
	}
}

// Start marks the driver as running and launches the polling loop in its
// own goroutine.
func (pc *PingDriver) Start() {
	pc.Running = true
	libol.Go(pc.update)
}

// Stop asks the polling loop to end; the loop exits the next time it checks
// Running.
func (pc *PingDriver) Stop() {
	pc.Running = false
}
// ping sends count echo requests to ip with the system ping command and
// returns the average latency, the number of lost packets and an error.
//
// When cfg.Vrf is set the command is run through "ip vrf exec <vrf>". An
// error is returned when a required command cannot be found, when the
// command fails, or when the loss rate cannot be parsed from its output.
//
// The lost-packet count is derived from the reported loss percentage and
// rounded to the nearest integer, so 33% of 3 packets counts as 1, not 0.
func (pc *PingDriver) ping(ip string, count int) (float64, int, error) {
	ping, err := exec.LookPath("ping")
	if err != nil {
		pc.out.Warn("PingDriver.Ping: ping cmd not found: %s", err)
		return 0, 0, err
	}

	vrf := pc.cfg.Vrf
	cstr := strconv.Itoa(count)
	var output string
	if vrf == "" {
		output, err = libol.Exec(ping, ip, "-c", cstr)
	} else {
		// Assign to the outer err with "=": ":=" here would shadow it and
		// silently drop the error of the exec below.
		var ipcmd string
		ipcmd, err = exec.LookPath("ip")
		if err != nil {
			pc.out.Warn("PingDriver.Ping: ip cmd not found: %s", err)
			return 0, 0, err
		}
		output, err = libol.Exec(ipcmd, "vrf", "exec", vrf, ping, ip, "-c", cstr)
	}
	pc.out.Debug("PingDriver.Ping: exec %s ping %s %s", vrf, ip, output)
	if err != nil {
		return 0, 0, err
	}

	avgLatency := pc.extractLatency(output)
	lossRate, err := pc.extractLoss(output)
	if err != nil {
		pc.out.Error("PingDriver.Ping:parse loss error : %s", err)
		return 0, 0, err
	}
	// Round to nearest; plain truncation would turn 0.99 lost packets into 0.
	packetLoss := int(lossRate*float64(count)/100 + 0.5)
	pc.out.Debug("PingDriver.Ping: ping ip[%s] latency:%.3fms loss:%.f%%", ip, avgLatency, lossRate)
	return avgLatency, packetLoss, nil
}
// extractLatency returns the average round-trip time from the
// "rtt min/avg/max/mdev = a/b/c/d ms" summary line of ping output. It logs
// an error and returns 0 when the line is missing or the value does not
// parse.
func (pc *PingDriver) extractLatency(outputStr string) float64 {
	rttRe := regexp.MustCompile(`rtt min/avg/max/mdev = (\d+\.*\d*)/(\d+\.*\d*)/(\d+\.*\d*)/(\d+\.*\d*) ms`)
	fields := rttRe.FindStringSubmatch(outputStr)
	if len(fields) != 5 {
		pc.out.Error("PingDriver.Ping: Cannot extract average delay.")
		return 0
	}

	// fields[0] is the whole match; the average is the second group.
	avg, err := strconv.ParseFloat(fields[2], 64)
	if err == nil {
		return avg
	}
	pc.out.Error("PingDriver.Ping: parse float error : %s", err)
	return 0
}
// pingLossPattern matches the loss percentage of the ping summary line. The
// fractional part is optional: newer iputils print values such as
// "33.3333% packet loss", which a digits-only pattern would read as 3333.
var pingLossPattern = regexp.MustCompile(`(\d+(?:\.\d+)?)% packet loss`)

// extractLoss returns the packet loss percentage (0-100) found in ping
// output. An error is returned when the output has no "% packet loss"
// figure or the figure does not parse.
func (pc *PingDriver) extractLoss(outputStr string) (float64, error) {
	match := pingLossPattern.FindStringSubmatch(outputStr)
	if len(match) < 2 {
		return 0, fmt.Errorf("PingDriver.Ping: packet loss parse error")
	}
	lossRate, err := strconv.ParseFloat(match[1], 64)
	if err != nil {
		return 0, err
	}
	return lossRate, nil
}