refactor: use informer to list&watch pod&service ip for adding to route table (#610)

This commit is contained in:
naison
2025-05-23 10:09:06 +08:00
committed by GitHub
parent 6d545dc5c9
commit 75c609211b
5 changed files with 280 additions and 332 deletions

View File

@@ -17,12 +17,8 @@ import (
miekgdns "github.com/miekg/dns"
"github.com/pkg/errors"
v12 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/watch"
v13 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"tailscale.com/net/dns"
"github.com/wencaiwulue/kubevpn/v2/pkg/config"
@@ -31,10 +27,11 @@ import (
)
type Config struct {
Config *miekgdns.ClientConfig
Ns []string
Services []v12.Service
TunName string
Config *miekgdns.ClientConfig
Ns []string
Services []v12.Service
SvcInformer cache.SharedIndexInformer
TunName string
Hosts []Entry
Lock *sync.Mutex
@@ -45,22 +42,8 @@ type Config struct {
OSConfigurator dns.OSConfigurator
}
func (c *Config) AddServiceNameToHosts(ctx context.Context, serviceInterface v13.ServiceInterface, hosts ...Entry) error {
func (c *Config) AddServiceNameToHosts(ctx context.Context, hosts ...Entry) error {
var serviceList []v12.Service
//listOptions := v1.ListOptions{Limit: 100}
//for {
// services, err := serviceInterface.List(ctx, listOptions)
// if err != nil {
// break
// }
// serviceList = append(serviceList, services.Items...)
// if services.Continue != "" {
// listOptions.Continue = services.Continue
// } else {
// break
// }
//}
c.Lock.Lock()
defer c.Lock.Unlock()
@@ -71,100 +54,65 @@ func (c *Config) AddServiceNameToHosts(ctx context.Context, serviceInterface v13
return err
}
go c.watchServiceToAddHosts(ctx, serviceInterface, hosts)
go c.watchServiceToAddHosts(ctx, hosts)
return nil
}
func (c *Config) watchServiceToAddHosts(ctx context.Context, serviceInterface v13.ServiceInterface, hosts []Entry) {
func (c *Config) watchServiceToAddHosts(ctx context.Context, hosts []Entry) {
defer util.HandleCrash()
ticker := time.NewTicker(time.Second * 15)
defer ticker.Stop()
immediate := make(chan struct{}, 1)
immediate <- struct{}{}
var ErrChanDone = errors.New("watch service chan done")
for ctx.Err() == nil {
err := func() error {
w, err := serviceInterface.Watch(ctx, v1.ListOptions{Watch: true})
if err != nil {
return err
_, err := c.SvcInformer.AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
if svc, ok := obj.(*v12.Service); ok && svc.Namespace == c.Ns[0] {
return true
} else {
return false
}
defer w.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case event, ok := <-w.ResultChan():
if !ok {
return ErrChanDone
}
svc, ok := event.Object.(*v12.Service)
if !ok {
continue
}
if ctx.Err() != nil {
return ctx.Err()
}
if event.Type == watch.Deleted {
if net.ParseIP(svc.Spec.ClusterIP) == nil {
continue
}
var list = []Entry{{
IP: svc.Spec.ClusterIP,
Domain: svc.Name,
}}
err = c.removeHosts(list)
if err != nil {
plog.G(ctx).Errorf("Failed to remove hosts(%s) to hosts: %v", entryList2String(list), err)
}
}
if event.Type == watch.Added {
c.Lock.Lock()
appendHosts := c.generateAppendHosts([]v12.Service{*svc}, hosts)
err = c.appendHosts(appendHosts)
c.Lock.Unlock()
if err != nil {
plog.G(ctx).Errorf("Failed to add hosts(%s) to hosts: %v", entryList2String(appendHosts), err)
}
}
case <-ticker.C:
var list *v12.ServiceList
list, err = serviceInterface.List(ctx, v1.ListOptions{})
if err != nil {
continue
}
c.Lock.Lock()
appendHosts := c.generateAppendHosts(list.Items, hosts)
err = c.appendHosts(appendHosts)
c.Lock.Unlock()
if err != nil {
plog.G(ctx).Errorf("Failed to add hosts(%s) to hosts: %v", entryList2String(appendHosts), err)
}
case <-immediate:
var list *v12.ServiceList
list, err = serviceInterface.List(ctx, v1.ListOptions{})
if err != nil {
continue
}
c.Lock.Lock()
appendHosts := c.generateAppendHosts(list.Items, hosts)
err = c.appendHosts(appendHosts)
c.Lock.Unlock()
if err != nil {
plog.G(ctx).Errorf("Failed to add hosts(%s) to hosts: %v", entryList2String(appendHosts), err)
}
}
},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
ticker.Reset(time.Second * 3)
},
UpdateFunc: func(oldObj, newObj interface{}) {
ticker.Reset(time.Second * 3)
},
DeleteFunc: func(obj interface{}) {
ticker.Reset(time.Second * 3)
},
},
})
if err != nil {
plog.G(ctx).Errorf("Failed to add service event handler: %v", err)
return
}
for ; ctx.Err() == nil; <-ticker.C {
ticker.Reset(time.Second * 15)
serviceList, err := c.SvcInformer.GetIndexer().ByIndex(cache.NamespaceIndex, c.Ns[0])
if err != nil {
plog.G(ctx).Errorf("Failed to list service by namespace %s: %v", c.Ns[0], err)
continue
}
var services []v12.Service
for _, service := range serviceList {
svc, ok := service.(*v12.Service)
if !ok {
continue
}
}()
services = append(services, *svc)
}
if len(services) == 0 {
continue
}
if ctx.Err() != nil {
return
}
if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, ErrChanDone) {
plog.G(ctx).Debugf("Failed to watch service to add route table: %v", err)
}
if utilnet.IsConnectionRefused(err) || apierrors.IsTooManyRequests(err) || apierrors.IsForbidden(err) {
time.Sleep(time.Second * 1)
} else {
time.Sleep(time.Millisecond * 200)
c.Lock.Lock()
appendHosts := c.generateAppendHosts(services, hosts)
err = c.appendHosts(appendHosts)
c.Lock.Unlock()
if err != nil && !errors.Is(err, context.Canceled) {
plog.G(ctx).Errorf("Failed to add hosts(%s) to hosts: %v", entryList2String(appendHosts), err)
}
}
}

View File

@@ -6,10 +6,10 @@ import (
"bytes"
"context"
"fmt"
plog "github.com/wencaiwulue/kubevpn/v2/pkg/log"
"os"
"os/exec"
"path/filepath"
"slices"
"strings"
"time"
@@ -17,6 +17,10 @@ import (
miekgdns "github.com/miekg/dns"
v12 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/cache"
plog "github.com/wencaiwulue/kubevpn/v2/pkg/log"
"github.com/wencaiwulue/kubevpn/v2/pkg/util"
)
// https://github.com/golang/go/issues/12524
@@ -32,6 +36,59 @@ var resolv = "/etc/resolv.conf"
// service.namespace.svc.cluster:port
// service.namespace.svc.cluster.local:port
func (c *Config) SetupDNS(ctx context.Context) error {
defer util.HandleCrash()
ticker := time.NewTicker(time.Second * 15)
_, err := c.SvcInformer.AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
if svc, ok := obj.(*v12.Service); ok && svc.Namespace == c.Ns[0] {
return true
} else {
return false
}
},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
ticker.Reset(time.Second * 3)
},
UpdateFunc: func(oldObj, newObj interface{}) {
ticker.Reset(time.Second * 3)
},
DeleteFunc: func(obj interface{}) {
ticker.Reset(time.Second * 3)
},
},
})
if err != nil {
plog.G(ctx).Errorf("Failed to add service event handler: %v", err)
return err
}
go func() {
defer ticker.Stop()
for ; ctx.Err() == nil; <-ticker.C {
ticker.Reset(time.Second * 15)
serviceList, err := c.SvcInformer.GetIndexer().ByIndex(cache.NamespaceIndex, c.Ns[0])
if err != nil {
plog.G(ctx).Errorf("Failed to list service by namespace %s: %v", c.Ns[0], err)
continue
}
var services []v12.Service
for _, service := range serviceList {
svc, ok := service.(*v12.Service)
if !ok {
continue
}
services = append(services, *svc)
}
if len(services) == 0 {
continue
}
if ctx.Err() != nil {
return
}
c.Services = services
c.usingResolver(ctx)
}
}()
c.usingResolver(ctx)
return nil
}
@@ -72,6 +129,9 @@ func (c *Config) usingResolver(ctx context.Context) {
plog.G(ctx).Errorf("Parse resolver %s error: %v", filename, err)
continue
}
if slices.Contains(conf.Servers, clientConfig.Servers[0]) {
continue
}
// insert current name server to first location
conf.Servers = append([]string{clientConfig.Servers[0]}, conf.Servers...)
err = os.WriteFile(filename, []byte(toString(*conf)), 0644)

View File

@@ -32,15 +32,16 @@ import (
"k8s.io/apimachinery/pkg/labels"
pkgruntime "k8s.io/apimachinery/pkg/runtime"
pkgtypes "k8s.io/apimachinery/pkg/types"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/cli-runtime/pkg/resource"
runtimeresource "k8s.io/cli-runtime/pkg/resource"
informerv1 "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
v2 "k8s.io/client-go/kubernetes/typed/networking/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/retry"
"k8s.io/kubectl/pkg/cmd/set"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
@@ -280,13 +281,14 @@ func (c *ConnectOptions) DoConnect(ctx context.Context, isLite bool, stopChan <-
return
}
plog.G(ctx).Infof("Adding Pod IP and Service IP to route table...")
if err = c.addRouteDynamic(c.ctx); err != nil {
var svcInformer cache.SharedIndexInformer
if svcInformer, _, err = c.addRouteDynamic(c.ctx); err != nil {
plog.G(ctx).Errorf("Add route dynamic failed: %v", err)
return
}
go c.deleteFirewallRule(c.ctx)
plog.G(ctx).Infof("Configuring DNS service...")
if err = c.setupDNS(c.ctx); err != nil {
if err = c.setupDNS(c.ctx, svcInformer); err != nil {
plog.G(ctx).Errorf("Configure DNS failed: %v", err)
return
}
@@ -490,57 +492,116 @@ func (c *ConnectOptions) startLocalTunServer(ctx context.Context, forwardAddress
}
// Listen all pod, add route if needed
func (c *ConnectOptions) addRouteDynamic(ctx context.Context) error {
podNs, svcNs, err1 := util.GetNsForListPodAndSvc(ctx, c.clientset, []string{v1.NamespaceAll, c.OriginNamespace})
if err1 != nil {
return err1
func (c *ConnectOptions) addRouteDynamic(ctx context.Context) (cache.SharedIndexInformer, cache.SharedIndexInformer, error) {
podNs, svcNs, err := util.GetNsForListPodAndSvc(ctx, c.clientset, []string{v1.NamespaceAll, c.OriginNamespace})
if err != nil {
return nil, nil, err
}
conf := rest.CopyConfig(c.config)
conf.QPS = 1
conf.Burst = 2
clientSet, err := kubernetes.NewForConfig(conf)
if err != nil {
plog.G(ctx).Errorf("Failed to create clientset: %v", err)
return nil, nil, err
}
svcIndexers := cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}
svcInformer := informerv1.NewServiceInformer(clientSet, svcNs, 0, svcIndexers)
svcTicker := time.NewTicker(time.Second * 15)
_, err = svcInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
svcTicker.Reset(time.Second * 3)
},
UpdateFunc: func(oldObj, newObj interface{}) {
svcTicker.Reset(time.Second * 3)
},
DeleteFunc: func(obj interface{}) {
svcTicker.Reset(time.Second * 3)
},
})
if err != nil {
plog.G(ctx).Errorf("Failed to add service event handler: %v", err)
return nil, nil, err
}
go svcInformer.Run(ctx.Done())
go func() {
var listDone bool
for ctx.Err() == nil {
err := func() error {
if !listDone {
err := util.ListService(ctx, c.clientset.CoreV1().Services(svcNs), c.addRoute)
if err != nil {
return err
}
listDone = true
defer svcTicker.Stop()
for ; ctx.Err() == nil; <-svcTicker.C {
svcTicker.Reset(time.Second * 15)
serviceList := svcInformer.GetIndexer().List()
var ips = sets.New[string]()
for _, service := range serviceList {
svc, ok := service.(*v1.Service)
if !ok {
continue
}
err := util.WatchServiceToAddRoute(ctx, c.clientset.CoreV1().Services(svcNs), c.addRoute)
return err
}()
if utilnet.IsConnectionRefused(err) || apierrors.IsTooManyRequests(err) || apierrors.IsForbidden(err) {
time.Sleep(time.Second * 10)
} else {
time.Sleep(time.Second * 2)
ips.Insert(svc.Spec.ClusterIP)
ips.Insert(svc.Spec.ClusterIPs...)
}
if ctx.Err() != nil {
return
}
if ips.Len() == 0 {
continue
}
err := c.addRoute(ips.UnsortedList()...)
if err != nil {
plog.G(ctx).Debugf("Add service IP to route table failed: %v", err)
}
}
}()
podIndexers := cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}
podInformer := informerv1.NewPodInformer(clientSet, podNs, 0, podIndexers)
podTicker := time.NewTicker(time.Second * 15)
_, err = podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
podTicker.Reset(time.Second * 3)
},
UpdateFunc: func(oldObj, newObj interface{}) {
podTicker.Reset(time.Second * 3)
},
DeleteFunc: func(obj interface{}) {
podTicker.Reset(time.Second * 3)
},
})
if err != nil {
plog.G(ctx).Errorf("Failed to add service event handler: %v", err)
return nil, nil, err
}
go podInformer.Run(ctx.Done())
go func() {
var listDone bool
for ctx.Err() == nil {
err := func() error {
if !listDone {
err := util.ListPod(ctx, c.clientset.CoreV1().Pods(podNs), c.addRoute)
if err != nil {
return err
}
listDone = true
defer podTicker.Stop()
for ; ctx.Err() == nil; <-podTicker.C {
podTicker.Reset(time.Second * 15)
podList := podInformer.GetIndexer().List()
var ips = sets.New[string]()
for _, pod := range podList {
p, ok := pod.(*v1.Pod)
if !ok {
continue
}
err := util.WatchPodToAddRoute(ctx, c.clientset.CoreV1().Pods(podNs), c.addRoute)
return err
}()
if utilnet.IsConnectionRefused(err) || apierrors.IsTooManyRequests(err) || apierrors.IsForbidden(err) {
time.Sleep(time.Second * 10)
} else {
time.Sleep(time.Second * 2)
if p.Spec.HostNetwork {
continue
}
ips.Insert(util.GetPodIP(*p)...)
}
if ctx.Err() != nil {
return
}
if ips.Len() == 0 {
continue
}
err := c.addRoute(ips.UnsortedList()...)
if err != nil {
plog.G(ctx).Debugf("Add pod IP to route table failed: %v", err)
}
}
}()
return nil
return svcInformer, podInformer, nil
}
func (c *ConnectOptions) addRoute(ipStrList ...string) error {
@@ -548,6 +609,7 @@ func (c *ConnectOptions) addRoute(ipStrList ...string) error {
return nil
}
var routes []types.Route
r, _ := netroute.New()
for _, ipStr := range ipStrList {
ip := net.ParseIP(ipStr)
if ip == nil {
@@ -570,7 +632,7 @@ func (c *ConnectOptions) addRoute(ipStrList ...string) error {
} else {
mask = net.CIDRMask(128, 128)
}
if r, err := netroute.New(); err == nil {
if r != nil {
ifi, _, _, err := r.Route(ip)
if err == nil && ifi.Name == c.tunName {
continue
@@ -578,6 +640,9 @@ func (c *ConnectOptions) addRoute(ipStrList ...string) error {
}
routes = append(routes, types.Route{Dst: net.IPNet{IP: ip, Mask: mask}})
}
if len(routes) == 0 {
return nil
}
err := tun.AddRoutes(c.tunName, routes...)
return err
}
@@ -600,7 +665,7 @@ func (c *ConnectOptions) deleteFirewallRule(ctx context.Context) {
util.DeleteBlockFirewallRule(ctx)
}
func (c *ConnectOptions) setupDNS(ctx context.Context) error {
func (c *ConnectOptions) setupDNS(ctx context.Context, svcInformer cache.SharedIndexInformer) error {
const portTCP = 10800
podList, err := c.GetRunningPodList(ctx)
if err != nil {
@@ -652,19 +717,14 @@ func (c *ConnectOptions) setupDNS(ctx context.Context) error {
}
plog.G(ctx).Infof("Listing namespace %s services...", c.OriginNamespace)
var serviceList []v1.Service
services, err := c.clientset.CoreV1().Services(c.OriginNamespace).List(ctx, metav1.ListOptions{})
if err == nil {
serviceList = append(serviceList, services.Items...)
}
c.dnsConfig = &dns.Config{
Config: relovConf,
Ns: ns,
Services: serviceList,
TunName: c.tunName,
Hosts: c.extraHost,
Lock: c.Lock,
Config: relovConf,
Ns: ns,
Services: []v1.Service{},
SvcInformer: svcInformer,
TunName: c.tunName,
Hosts: c.extraHost,
Lock: c.Lock,
HowToGetExternalName: func(domain string) (string, error) {
podList, err := c.GetRunningPodList(ctx)
if err != nil {
@@ -688,7 +748,7 @@ func (c *ConnectOptions) setupDNS(ctx context.Context) error {
}
plog.G(ctx).Infof("Dump service in namespace %s into hosts...", c.OriginNamespace)
// dump service in current namespace for support DNS resolve service:port
err = c.dnsConfig.AddServiceNameToHosts(ctx, c.clientset.CoreV1().Services(c.OriginNamespace), c.extraHost...)
err = c.dnsConfig.AddServiceNameToHosts(ctx, c.extraHost...)
return err
}

View File

@@ -11,12 +11,15 @@ import (
"strings"
"unsafe"
errors2 "github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/client-go/kubernetes"
v12 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
@@ -26,6 +29,7 @@ import (
"k8s.io/utils/pointer"
"github.com/wencaiwulue/kubevpn/v2/pkg/config"
"github.com/wencaiwulue/kubevpn/v2/pkg/log"
)
func GetClusterID(ctx context.Context, client v12.ConfigMapInterface) (types.UID, error) {
@@ -259,3 +263,48 @@ func GetKubeconfigPath(factory cmdutil.Factory) (string, error) {
}
return file, nil
}
func GetNsForListPodAndSvc(ctx context.Context, clientset *kubernetes.Clientset, nsList []string) (podNs string, svcNs string, err error) {
for _, ns := range nsList {
_, err = clientset.CoreV1().Pods(ns).List(ctx, metav1.ListOptions{Limit: 1})
if errors.IsForbidden(err) {
continue
}
if err != nil {
return
}
podNs = ns
break
}
if err != nil {
err = errors2.Wrap(err, "can not list pod to add it to route table")
return
}
if podNs == "" {
log.G(ctx).Debugf("List all namepsace pods")
} else {
log.G(ctx).Debugf("List namepsace %s pods", podNs)
}
for _, ns := range nsList {
_, err = clientset.CoreV1().Services(ns).List(ctx, metav1.ListOptions{Limit: 1})
if errors.IsForbidden(err) {
continue
}
if err != nil {
return
}
svcNs = ns
break
}
if err != nil {
err = errors2.Wrap(err, "can not list service to add it to route table")
return
}
if svcNs == "" {
log.G(ctx).Debugf("List all namepsace services")
} else {
log.G(ctx).Debugf("List namepsace %s services", svcNs)
}
return
}

View File

@@ -1,169 +0,0 @@
package util
import (
"context"
"fmt"
"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
v12 "k8s.io/client-go/kubernetes/typed/core/v1"
plog "github.com/wencaiwulue/kubevpn/v2/pkg/log"
)
func GetNsForListPodAndSvc(ctx context.Context, clientset *kubernetes.Clientset, nsList []string) (podNs string, svcNs string, err error) {
for _, ns := range nsList {
_, err = clientset.CoreV1().Pods(ns).List(ctx, metav1.ListOptions{Limit: 1})
if apierrors.IsForbidden(err) {
continue
}
if err != nil {
return
}
podNs = ns
break
}
if err != nil {
err = errors.Wrap(err, "can not list pod to add it to route table")
return
}
if podNs == "" {
plog.G(ctx).Debugf("List all namepsace pods")
} else {
plog.G(ctx).Debugf("List namepsace %s pods", podNs)
}
for _, ns := range nsList {
_, err = clientset.CoreV1().Services(ns).List(ctx, metav1.ListOptions{Limit: 1})
if apierrors.IsForbidden(err) {
continue
}
if err != nil {
return
}
svcNs = ns
break
}
if err != nil {
err = errors.Wrap(err, "can not list service to add it to route table")
return
}
if svcNs == "" {
plog.G(ctx).Debugf("List all namepsace services")
} else {
plog.G(ctx).Debugf("List namepsace %s services", svcNs)
}
return
}
func ListService(ctx context.Context, lister v12.ServiceInterface, addRouteFunc func(ipStr ...string) error) error {
opts := metav1.ListOptions{Limit: 100, Continue: ""}
for {
serviceList, err := lister.List(ctx, opts)
if err != nil {
return err
}
var ips []string
for _, service := range serviceList.Items {
ips = append(ips, service.Spec.ClusterIP)
}
err = addRouteFunc(ips...)
if err != nil {
plog.G(ctx).Errorf("Failed to add service IP to route table: %v", err)
}
if serviceList.Continue == "" {
return nil
}
opts.Continue = serviceList.Continue
}
}
func WatchServiceToAddRoute(ctx context.Context, watcher v12.ServiceInterface, routeFunc func(ipStr ...string) error) error {
defer func() {
if er := recover(); er != nil {
plog.G(ctx).Error(er)
}
}()
w, err := watcher.Watch(ctx, metav1.ListOptions{Watch: true})
if err != nil {
return err
}
defer w.Stop()
for {
select {
case <-ctx.Done():
return nil
case e, ok := <-w.ResultChan():
if !ok {
return errors.New("watch service chan done")
}
var svc *v1.Service
svc, ok = e.Object.(*v1.Service)
if !ok {
continue
}
_ = routeFunc(svc.Spec.ClusterIP)
}
}
}
func ListPod(ctx context.Context, lister v12.PodInterface, addRouteFunc func(ipStr ...string) error) error {
opts := metav1.ListOptions{Limit: 100, Continue: ""}
for {
podList, err := lister.List(ctx, opts)
if err != nil {
return err
}
var ips []string
for _, pod := range podList.Items {
if pod.Spec.HostNetwork {
continue
}
ips = append(ips, pod.Status.PodIP)
}
err = addRouteFunc(ips...)
if err != nil {
plog.G(ctx).Errorf("Failed to add Pod IP to route table: %v", err)
}
if podList.Continue == "" {
return nil
}
opts.Continue = podList.Continue
}
}
func WatchPodToAddRoute(ctx context.Context, watcher v12.PodInterface, addRouteFunc func(ipStrList ...string) error) error {
defer func() {
if er := recover(); er != nil {
plog.G(ctx).Errorln(er)
}
}()
w, err := watcher.Watch(ctx, metav1.ListOptions{Watch: true})
if err != nil {
return err
}
defer w.Stop()
for {
select {
case <-ctx.Done():
return nil
case e, ok := <-w.ResultChan():
if !ok {
return fmt.Errorf("watch pod chan done")
}
var pod *v1.Pod
pod, ok = e.Object.(*v1.Pod)
if !ok {
continue
}
if pod.Spec.HostNetwork {
continue
}
ip := pod.Status.PodIP
_ = addRouteFunc(ip)
}
}
}