diff --git a/pkg/dns/dns.go b/pkg/dns/dns.go index 4f61ff30..b2e68e81 100644 --- a/pkg/dns/dns.go +++ b/pkg/dns/dns.go @@ -17,12 +17,8 @@ import ( miekgdns "github.com/miekg/dns" "github.com/pkg/errors" v12 "k8s.io/api/core/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/apis/meta/v1" - utilnet "k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apimachinery/pkg/watch" - v13 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/cache" "tailscale.com/net/dns" "github.com/wencaiwulue/kubevpn/v2/pkg/config" @@ -31,10 +27,11 @@ import ( ) type Config struct { - Config *miekgdns.ClientConfig - Ns []string - Services []v12.Service - TunName string + Config *miekgdns.ClientConfig + Ns []string + Services []v12.Service + SvcInformer cache.SharedIndexInformer + TunName string Hosts []Entry Lock *sync.Mutex @@ -45,22 +42,8 @@ type Config struct { OSConfigurator dns.OSConfigurator } -func (c *Config) AddServiceNameToHosts(ctx context.Context, serviceInterface v13.ServiceInterface, hosts ...Entry) error { +func (c *Config) AddServiceNameToHosts(ctx context.Context, hosts ...Entry) error { var serviceList []v12.Service - //listOptions := v1.ListOptions{Limit: 100} - //for { - // services, err := serviceInterface.List(ctx, listOptions) - // if err != nil { - // break - // } - // serviceList = append(serviceList, services.Items...) - // if services.Continue != "" { - // listOptions.Continue = services.Continue - // } else { - // break - // } - //} - c.Lock.Lock() defer c.Lock.Unlock() @@ -71,100 +54,65 @@ func (c *Config) AddServiceNameToHosts(ctx context.Context, serviceInterface v13 return err } - go c.watchServiceToAddHosts(ctx, serviceInterface, hosts) + go c.watchServiceToAddHosts(ctx, hosts) return nil } -func (c *Config) watchServiceToAddHosts(ctx context.Context, serviceInterface v13.ServiceInterface, hosts []Entry) { +func (c *Config) watchServiceToAddHosts(ctx context.Context, hosts []Entry) { defer util.HandleCrash() ticker := time.NewTicker(time.Second * 15) defer ticker.Stop() - immediate := make(chan struct{}, 1) - immediate <- struct{}{} - var ErrChanDone = errors.New("watch service chan done") - for ctx.Err() == nil { - err := func() error { - w, err := serviceInterface.Watch(ctx, v1.ListOptions{Watch: true}) - if err != nil { - return err + _, err := c.SvcInformer.AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: func(obj interface{}) bool { + if svc, ok := obj.(*v12.Service); ok && svc.Namespace == c.Ns[0] { + return true + } else { + return false } - defer w.Stop() - for { - select { - case <-ctx.Done(): - return ctx.Err() - case event, ok := <-w.ResultChan(): - if !ok { - return ErrChanDone - } - svc, ok := event.Object.(*v12.Service) - if !ok { - continue - } - if ctx.Err() != nil { - return ctx.Err() - } - if event.Type == watch.Deleted { - if net.ParseIP(svc.Spec.ClusterIP) == nil { - continue - } - var list = []Entry{{ - IP: svc.Spec.ClusterIP, - Domain: svc.Name, - }} - err = c.removeHosts(list) - if err != nil { - plog.G(ctx).Errorf("Failed to remove hosts(%s) to hosts: %v", entryList2String(list), err) - } - } - if event.Type == watch.Added { - c.Lock.Lock() - appendHosts := c.generateAppendHosts([]v12.Service{*svc}, hosts) - err = c.appendHosts(appendHosts) - c.Lock.Unlock() - if err != nil { - plog.G(ctx).Errorf("Failed to add hosts(%s) to hosts: %v", entryList2String(appendHosts), err) - } - } - case <-ticker.C: - var list *v12.ServiceList - list, err = serviceInterface.List(ctx, v1.ListOptions{}) - if err != nil { - continue - } - c.Lock.Lock() - appendHosts := c.generateAppendHosts(list.Items, hosts) - err = c.appendHosts(appendHosts) - c.Lock.Unlock() - if err != nil { - plog.G(ctx).Errorf("Failed to add hosts(%s) to hosts: %v", entryList2String(appendHosts), err) - } - case <-immediate: - var list *v12.ServiceList - list, err = serviceInterface.List(ctx, v1.ListOptions{}) - if err != nil { - continue - } - c.Lock.Lock() - appendHosts := c.generateAppendHosts(list.Items, hosts) - err = c.appendHosts(appendHosts) - c.Lock.Unlock() - if err != nil { - plog.G(ctx).Errorf("Failed to add hosts(%s) to hosts: %v", entryList2String(appendHosts), err) - } - } + }, + Handler: cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + ticker.Reset(time.Second * 3) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + ticker.Reset(time.Second * 3) + }, + DeleteFunc: func(obj interface{}) { + ticker.Reset(time.Second * 3) + }, + }, + }) + if err != nil { + plog.G(ctx).Errorf("Failed to add service event handler: %v", err) + return + } + for ; ctx.Err() == nil; <-ticker.C { + ticker.Reset(time.Second * 15) + serviceList, err := c.SvcInformer.GetIndexer().ByIndex(cache.NamespaceIndex, c.Ns[0]) + if err != nil { + plog.G(ctx).Errorf("Failed to list service by namespace %s: %v", c.Ns[0], err) + continue + } + var services []v12.Service + for _, service := range serviceList { + svc, ok := service.(*v12.Service) + if !ok { + continue } - }() + services = append(services, *svc) + } + if len(services) == 0 { + continue + } if ctx.Err() != nil { return } - if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, ErrChanDone) { - plog.G(ctx).Debugf("Failed to watch service to add route table: %v", err) - } - if utilnet.IsConnectionRefused(err) || apierrors.IsTooManyRequests(err) || apierrors.IsForbidden(err) { - time.Sleep(time.Second * 1) - } else { - time.Sleep(time.Millisecond * 200) + c.Lock.Lock() + appendHosts := c.generateAppendHosts(services, hosts) + err = c.appendHosts(appendHosts) + c.Lock.Unlock() + if err != nil && !errors.Is(err, context.Canceled) { + plog.G(ctx).Errorf("Failed to add hosts(%s) to hosts: %v", entryList2String(appendHosts), err) } } } diff --git a/pkg/dns/dns_unix.go b/pkg/dns/dns_unix.go index 7eb02971..769150f6 100644 --- a/pkg/dns/dns_unix.go +++ b/pkg/dns/dns_unix.go @@ -6,10 +6,10 @@ import ( "bytes" "context" "fmt" - plog "github.com/wencaiwulue/kubevpn/v2/pkg/log" "os" "os/exec" "path/filepath" + "slices" "strings" "time" @@ -17,6 +17,10 @@ import ( miekgdns "github.com/miekg/dns" v12 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/tools/cache" + + plog "github.com/wencaiwulue/kubevpn/v2/pkg/log" + "github.com/wencaiwulue/kubevpn/v2/pkg/util" ) // https://github.com/golang/go/issues/12524 @@ -32,6 +36,59 @@ var resolv = "/etc/resolv.conf" // service.namespace.svc.cluster:port // service.namespace.svc.cluster.local:port func (c *Config) SetupDNS(ctx context.Context) error { + defer util.HandleCrash() + ticker := time.NewTicker(time.Second * 15) + _, err := c.SvcInformer.AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: func(obj interface{}) bool { + if svc, ok := obj.(*v12.Service); ok && svc.Namespace == c.Ns[0] { + return true + } else { + return false + } + }, + Handler: cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + ticker.Reset(time.Second * 3) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + ticker.Reset(time.Second * 3) + }, + DeleteFunc: func(obj interface{}) { + ticker.Reset(time.Second * 3) + }, + }, + }) + if err != nil { + plog.G(ctx).Errorf("Failed to add service event handler: %v", err) + return err + } + go func() { + defer ticker.Stop() + for ; ctx.Err() == nil; <-ticker.C { + ticker.Reset(time.Second * 15) + serviceList, err := c.SvcInformer.GetIndexer().ByIndex(cache.NamespaceIndex, c.Ns[0]) + if err != nil { + plog.G(ctx).Errorf("Failed to list service by namespace %s: %v", c.Ns[0], err) + continue + } + var services []v12.Service + for _, service := range serviceList { + svc, ok := service.(*v12.Service) + if !ok { + continue + } + services = append(services, *svc) + } + if len(services) == 0 { + continue + } + if ctx.Err() != nil { + return + } + c.Services = services + c.usingResolver(ctx) + } + }() c.usingResolver(ctx) return nil } @@ -72,6 +129,9 @@ func (c *Config) usingResolver(ctx context.Context) { plog.G(ctx).Errorf("Parse resolver %s error: %v", filename, err) continue } + if slices.Contains(conf.Servers, clientConfig.Servers[0]) { + continue + } // insert current name server to first location conf.Servers = append([]string{clientConfig.Servers[0]}, conf.Servers...) err = os.WriteFile(filename, []byte(toString(*conf)), 0644) diff --git a/pkg/handler/connect.go b/pkg/handler/connect.go index d8b3df27..8a790658 100644 --- a/pkg/handler/connect.go +++ b/pkg/handler/connect.go @@ -32,15 +32,16 @@ import ( "k8s.io/apimachinery/pkg/labels" pkgruntime "k8s.io/apimachinery/pkg/runtime" pkgtypes "k8s.io/apimachinery/pkg/types" - utilnet "k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/cli-runtime/pkg/resource" runtimeresource "k8s.io/cli-runtime/pkg/resource" + informerv1 "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes" v2 "k8s.io/client-go/kubernetes/typed/networking/v1" "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/retry" "k8s.io/kubectl/pkg/cmd/set" cmdutil "k8s.io/kubectl/pkg/cmd/util" @@ -280,13 +281,14 @@ func (c *ConnectOptions) DoConnect(ctx context.Context, isLite bool, stopChan <- return } plog.G(ctx).Infof("Adding Pod IP and Service IP to route table...") - if err = c.addRouteDynamic(c.ctx); err != nil { + var svcInformer cache.SharedIndexInformer + if svcInformer, _, err = c.addRouteDynamic(c.ctx); err != nil { plog.G(ctx).Errorf("Add route dynamic failed: %v", err) return } go c.deleteFirewallRule(c.ctx) plog.G(ctx).Infof("Configuring DNS service...") - if err = c.setupDNS(c.ctx); err != nil { + if err = c.setupDNS(c.ctx, svcInformer); err != nil { plog.G(ctx).Errorf("Configure DNS failed: %v", err) return } @@ -490,57 +492,116 @@ func (c *ConnectOptions) startLocalTunServer(ctx context.Context, forwardAddress } // Listen all pod, add route if needed -func (c *ConnectOptions) addRouteDynamic(ctx context.Context) error { - podNs, svcNs, err1 := util.GetNsForListPodAndSvc(ctx, c.clientset, []string{v1.NamespaceAll, c.OriginNamespace}) - if err1 != nil { - return err1 +func (c *ConnectOptions) addRouteDynamic(ctx context.Context) (cache.SharedIndexInformer, cache.SharedIndexInformer, error) { + podNs, svcNs, err := util.GetNsForListPodAndSvc(ctx, c.clientset, []string{v1.NamespaceAll, c.OriginNamespace}) + if err != nil { + return nil, nil, err } + conf := rest.CopyConfig(c.config) + conf.QPS = 1 + conf.Burst = 2 + clientSet, err := kubernetes.NewForConfig(conf) + if err != nil { + plog.G(ctx).Errorf("Failed to create clientset: %v", err) + return nil, nil, err + } + svcIndexers := cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc} + svcInformer := informerv1.NewServiceInformer(clientSet, svcNs, 0, svcIndexers) + svcTicker := time.NewTicker(time.Second * 15) + _, err = svcInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + svcTicker.Reset(time.Second * 3) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + svcTicker.Reset(time.Second * 3) + }, + DeleteFunc: func(obj interface{}) { + svcTicker.Reset(time.Second * 3) + }, + }) + if err != nil { + plog.G(ctx).Errorf("Failed to add service event handler: %v", err) + return nil, nil, err + } + + go svcInformer.Run(ctx.Done()) go func() { - var listDone bool - for ctx.Err() == nil { - err := func() error { - if !listDone { - err := util.ListService(ctx, c.clientset.CoreV1().Services(svcNs), c.addRoute) - if err != nil { - return err - } - listDone = true + defer svcTicker.Stop() + for ; ctx.Err() == nil; <-svcTicker.C { + svcTicker.Reset(time.Second * 15) + serviceList := svcInformer.GetIndexer().List() + var ips = sets.New[string]() + for _, service := range serviceList { + svc, ok := service.(*v1.Service) + if !ok { + continue } - err := util.WatchServiceToAddRoute(ctx, c.clientset.CoreV1().Services(svcNs), c.addRoute) - return err - }() - if utilnet.IsConnectionRefused(err) || apierrors.IsTooManyRequests(err) || apierrors.IsForbidden(err) { - time.Sleep(time.Second * 10) - } else { - time.Sleep(time.Second * 2) + ips.Insert(svc.Spec.ClusterIP) + ips.Insert(svc.Spec.ClusterIPs...) + } + if ctx.Err() != nil { + return + } + if ips.Len() == 0 { + continue + } + err := c.addRoute(ips.UnsortedList()...) + if err != nil { + plog.G(ctx).Debugf("Add service IP to route table failed: %v", err) } } }() + podIndexers := cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc} + podInformer := informerv1.NewPodInformer(clientSet, podNs, 0, podIndexers) + podTicker := time.NewTicker(time.Second * 15) + _, err = podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + podTicker.Reset(time.Second * 3) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + podTicker.Reset(time.Second * 3) + }, + DeleteFunc: func(obj interface{}) { + podTicker.Reset(time.Second * 3) + }, + }) + if err != nil { + plog.G(ctx).Errorf("Failed to add service event handler: %v", err) + return nil, nil, err + } + go podInformer.Run(ctx.Done()) go func() { - var listDone bool - for ctx.Err() == nil { - err := func() error { - if !listDone { - err := util.ListPod(ctx, c.clientset.CoreV1().Pods(podNs), c.addRoute) - if err != nil { - return err - } - listDone = true + defer podTicker.Stop() + for ; ctx.Err() == nil; <-podTicker.C { + podTicker.Reset(time.Second * 15) + podList := podInformer.GetIndexer().List() + var ips = sets.New[string]() + for _, pod := range podList { + p, ok := pod.(*v1.Pod) + if !ok { + continue } - err := util.WatchPodToAddRoute(ctx, c.clientset.CoreV1().Pods(podNs), c.addRoute) - return err - }() - if utilnet.IsConnectionRefused(err) || apierrors.IsTooManyRequests(err) || apierrors.IsForbidden(err) { - time.Sleep(time.Second * 10) - } else { - time.Sleep(time.Second * 2) + if p.Spec.HostNetwork { + continue + } + ips.Insert(util.GetPodIP(*p)...) + } + if ctx.Err() != nil { + return + } + if ips.Len() == 0 { + continue + } + err := c.addRoute(ips.UnsortedList()...) + if err != nil { + plog.G(ctx).Debugf("Add pod IP to route table failed: %v", err) } } }() - return nil + return svcInformer, podInformer, nil } func (c *ConnectOptions) addRoute(ipStrList ...string) error { @@ -548,6 +609,7 @@ func (c *ConnectOptions) addRoute(ipStrList ...string) error { return nil } var routes []types.Route + r, _ := netroute.New() for _, ipStr := range ipStrList { ip := net.ParseIP(ipStr) if ip == nil { @@ -570,7 +632,7 @@ func (c *ConnectOptions) addRoute(ipStrList ...string) error { } else { mask = net.CIDRMask(128, 128) } - if r, err := netroute.New(); err == nil { + if r != nil { ifi, _, _, err := r.Route(ip) if err == nil && ifi.Name == c.tunName { continue @@ -578,6 +640,9 @@ func (c *ConnectOptions) addRoute(ipStrList ...string) error { } routes = append(routes, types.Route{Dst: net.IPNet{IP: ip, Mask: mask}}) } + if len(routes) == 0 { + return nil + } err := tun.AddRoutes(c.tunName, routes...) return err } @@ -600,7 +665,7 @@ func (c *ConnectOptions) deleteFirewallRule(ctx context.Context) { util.DeleteBlockFirewallRule(ctx) } -func (c *ConnectOptions) setupDNS(ctx context.Context) error { +func (c *ConnectOptions) setupDNS(ctx context.Context, svcInformer cache.SharedIndexInformer) error { const portTCP = 10800 podList, err := c.GetRunningPodList(ctx) if err != nil { @@ -652,19 +717,14 @@ func (c *ConnectOptions) setupDNS(ctx context.Context) error { } plog.G(ctx).Infof("Listing namespace %s services...", c.OriginNamespace) - var serviceList []v1.Service - services, err := c.clientset.CoreV1().Services(c.OriginNamespace).List(ctx, metav1.ListOptions{}) - if err == nil { - serviceList = append(serviceList, services.Items...) - } - c.dnsConfig = &dns.Config{ - Config: relovConf, - Ns: ns, - Services: serviceList, - TunName: c.tunName, - Hosts: c.extraHost, - Lock: c.Lock, + Config: relovConf, + Ns: ns, + Services: []v1.Service{}, + SvcInformer: svcInformer, + TunName: c.tunName, + Hosts: c.extraHost, + Lock: c.Lock, HowToGetExternalName: func(domain string) (string, error) { podList, err := c.GetRunningPodList(ctx) if err != nil { @@ -688,7 +748,7 @@ func (c *ConnectOptions) setupDNS(ctx context.Context) error { } plog.G(ctx).Infof("Dump service in namespace %s into hosts...", c.OriginNamespace) // dump service in current namespace for support DNS resolve service:port - err = c.dnsConfig.AddServiceNameToHosts(ctx, c.clientset.CoreV1().Services(c.OriginNamespace), c.extraHost...) + err = c.dnsConfig.AddServiceNameToHosts(ctx, c.extraHost...) return err } diff --git a/pkg/util/ns.go b/pkg/util/ns.go index 9b477c51..25869b84 100644 --- a/pkg/util/ns.go +++ b/pkg/util/ns.go @@ -11,12 +11,15 @@ import ( "strings" "unsafe" + errors2 "github.com/pkg/errors" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/cli-runtime/pkg/genericclioptions" + "k8s.io/client-go/kubernetes" v12 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" @@ -26,6 +29,7 @@ import ( "k8s.io/utils/pointer" "github.com/wencaiwulue/kubevpn/v2/pkg/config" + "github.com/wencaiwulue/kubevpn/v2/pkg/log" ) func GetClusterID(ctx context.Context, client v12.ConfigMapInterface) (types.UID, error) { @@ -259,3 +263,48 @@ func GetKubeconfigPath(factory cmdutil.Factory) (string, error) { } return file, nil } + +func GetNsForListPodAndSvc(ctx context.Context, clientset *kubernetes.Clientset, nsList []string) (podNs string, svcNs string, err error) { + for _, ns := range nsList { + _, err = clientset.CoreV1().Pods(ns).List(ctx, metav1.ListOptions{Limit: 1}) + if errors.IsForbidden(err) { + continue + } + if err != nil { + return + } + podNs = ns + break + } + if err != nil { + err = errors2.Wrap(err, "can not list pod to add it to route table") + return + } + if podNs == "" { + log.G(ctx).Debugf("List all namepsace pods") + } else { + log.G(ctx).Debugf("List namepsace %s pods", podNs) + } + + for _, ns := range nsList { + _, err = clientset.CoreV1().Services(ns).List(ctx, metav1.ListOptions{Limit: 1}) + if errors.IsForbidden(err) { + continue + } + if err != nil { + return + } + svcNs = ns + break + } + if err != nil { + err = errors2.Wrap(err, "can not list service to add it to route table") + return + } + if svcNs == "" { + log.G(ctx).Debugf("List all namepsace services") + } else { + log.G(ctx).Debugf("List namepsace %s services", svcNs) + } + return +} diff --git a/pkg/util/route.go b/pkg/util/route.go deleted file mode 100644 index 2a4a34b5..00000000 --- a/pkg/util/route.go +++ /dev/null @@ -1,169 +0,0 @@ -package util - -import ( - "context" - "fmt" - - "github.com/pkg/errors" - v1 "k8s.io/api/core/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" - v12 "k8s.io/client-go/kubernetes/typed/core/v1" - - plog "github.com/wencaiwulue/kubevpn/v2/pkg/log" -) - -func GetNsForListPodAndSvc(ctx context.Context, clientset *kubernetes.Clientset, nsList []string) (podNs string, svcNs string, err error) { - for _, ns := range nsList { - _, err = clientset.CoreV1().Pods(ns).List(ctx, metav1.ListOptions{Limit: 1}) - if apierrors.IsForbidden(err) { - continue - } - if err != nil { - return - } - podNs = ns - break - } - if err != nil { - err = errors.Wrap(err, "can not list pod to add it to route table") - return - } - if podNs == "" { - plog.G(ctx).Debugf("List all namepsace pods") - } else { - plog.G(ctx).Debugf("List namepsace %s pods", podNs) - } - - for _, ns := range nsList { - _, err = clientset.CoreV1().Services(ns).List(ctx, metav1.ListOptions{Limit: 1}) - if apierrors.IsForbidden(err) { - continue - } - if err != nil { - return - } - svcNs = ns - break - } - if err != nil { - err = errors.Wrap(err, "can not list service to add it to route table") - return - } - if svcNs == "" { - plog.G(ctx).Debugf("List all namepsace services") - } else { - plog.G(ctx).Debugf("List namepsace %s services", svcNs) - } - return -} - -func ListService(ctx context.Context, lister v12.ServiceInterface, addRouteFunc func(ipStr ...string) error) error { - opts := metav1.ListOptions{Limit: 100, Continue: ""} - for { - serviceList, err := lister.List(ctx, opts) - if err != nil { - return err - } - var ips []string - for _, service := range serviceList.Items { - ips = append(ips, service.Spec.ClusterIP) - } - err = addRouteFunc(ips...) - if err != nil { - plog.G(ctx).Errorf("Failed to add service IP to route table: %v", err) - } - if serviceList.Continue == "" { - return nil - } - opts.Continue = serviceList.Continue - } -} - -func WatchServiceToAddRoute(ctx context.Context, watcher v12.ServiceInterface, routeFunc func(ipStr ...string) error) error { - defer func() { - if er := recover(); er != nil { - plog.G(ctx).Error(er) - } - }() - w, err := watcher.Watch(ctx, metav1.ListOptions{Watch: true}) - if err != nil { - return err - } - defer w.Stop() - for { - select { - case <-ctx.Done(): - return nil - case e, ok := <-w.ResultChan(): - if !ok { - return errors.New("watch service chan done") - } - var svc *v1.Service - svc, ok = e.Object.(*v1.Service) - if !ok { - continue - } - _ = routeFunc(svc.Spec.ClusterIP) - } - } -} - -func ListPod(ctx context.Context, lister v12.PodInterface, addRouteFunc func(ipStr ...string) error) error { - opts := metav1.ListOptions{Limit: 100, Continue: ""} - for { - podList, err := lister.List(ctx, opts) - if err != nil { - return err - } - var ips []string - for _, pod := range podList.Items { - if pod.Spec.HostNetwork { - continue - } - ips = append(ips, pod.Status.PodIP) - } - err = addRouteFunc(ips...) - if err != nil { - plog.G(ctx).Errorf("Failed to add Pod IP to route table: %v", err) - } - if podList.Continue == "" { - return nil - } - opts.Continue = podList.Continue - } -} - -func WatchPodToAddRoute(ctx context.Context, watcher v12.PodInterface, addRouteFunc func(ipStrList ...string) error) error { - defer func() { - if er := recover(); er != nil { - plog.G(ctx).Errorln(er) - } - }() - w, err := watcher.Watch(ctx, metav1.ListOptions{Watch: true}) - if err != nil { - return err - } - defer w.Stop() - for { - select { - case <-ctx.Done(): - return nil - case e, ok := <-w.ResultChan(): - if !ok { - return fmt.Errorf("watch pod chan done") - } - var pod *v1.Pod - pod, ok = e.Object.(*v1.Pod) - if !ok { - continue - } - if pod.Spec.HostNetwork { - continue - } - ip := pod.Status.PodIP - _ = addRouteFunc(ip) - } - } -}