From 3f7a8f07ee01a3ea3dd6e5b73c00e0dc8f2224b7 Mon Sep 17 00:00:00 2001 From: fengcaiwen Date: Tue, 21 Mar 2023 10:36:13 +0800 Subject: [PATCH] feat: use one clientset --- pkg/dns/dns.go | 18 +++- pkg/handler/connect.go | 132 +++++++++++++++++++------- pkg/webhook/dhcp.go | 20 +--- pkg/webhook/mutateadmissionwebhook.go | 15 ++- pkg/webhook/pods.go | 17 +--- 5 files changed, 135 insertions(+), 67 deletions(-) diff --git a/pkg/dns/dns.go b/pkg/dns/dns.go index f9d461f1..80e7143f 100644 --- a/pkg/dns/dns.go +++ b/pkg/dns/dns.go @@ -9,17 +9,22 @@ import ( "sort" "strings" "text/tabwriter" + "time" miekgdns "github.com/miekg/dns" "github.com/pkg/errors" v12 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" + utilnet "k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" v13 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/rest" + "k8s.io/client-go/util/flowcontrol" + "k8s.io/utils/pointer" "github.com/wencaiwulue/kubevpn/pkg/util" ) @@ -78,6 +83,9 @@ func GetDNSIPFromDnsPod(clientset *kubernetes.Clientset) (ips []string, err erro } func AddServiceNameToHosts(ctx context.Context, serviceInterface v13.ServiceInterface) { + rateLimiter := flowcontrol.NewTokenBucketRateLimiter(0.2, 1) + defer rateLimiter.Stop() + var last string for { select { @@ -85,8 +93,13 @@ func AddServiceNameToHosts(ctx context.Context, serviceInterface v13.ServiceInte return default: func() { - w, err := serviceInterface.Watch(ctx, v1.ListOptions{}) + w, err := serviceInterface.Watch(ctx, v1.ListOptions{ + Watch: true, TimeoutSeconds: pointer.Int64(30), + }) if err != nil { + if utilnet.IsConnectionRefused(err) || apierrors.IsTooManyRequests(err) { + time.Sleep(time.Second * 5) + } return } defer w.Stop() @@ -99,6 +112,9 @@ func AddServiceNameToHosts(ctx context.Context, serviceInterface v13.ServiceInte if watch.Deleted == c.Type || watch.Error == c.Type { continue } + if !rateLimiter.TryAccept() { + return + } list, err := serviceInterface.List(ctx, v1.ListOptions{}) if err != nil { return diff --git a/pkg/handler/connect.go b/pkg/handler/connect.go index 6c11fc07..5098b184 100644 --- a/pkg/handler/connect.go +++ b/pkg/handler/connect.go @@ -20,11 +20,14 @@ import ( log "github.com/sirupsen/logrus" "github.com/spf13/pflag" v1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime/schema" + utilnet "k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/cli-runtime/pkg/resource" @@ -35,9 +38,11 @@ import ( "k8s.io/client-go/tools/clientcmd/api" "k8s.io/client-go/tools/clientcmd/api/latest" clientcmdlatest "k8s.io/client-go/tools/clientcmd/api/latest" + "k8s.io/client-go/util/retry" cmdutil "k8s.io/kubectl/pkg/cmd/util" "k8s.io/kubectl/pkg/polymorphichelpers" "k8s.io/kubectl/pkg/scheme" + "k8s.io/utils/clock" "k8s.io/utils/pointer" "github.com/wencaiwulue/kubevpn/pkg/config" @@ -145,7 +150,10 @@ func (c *ConnectOptions) DoConnect() (err error) { if err != nil { return err } - c.addRouteDynamic(ctx) + err = c.addRouteDynamic(ctx) + if err != nil { + return err + } c.deleteFirewallRule(ctx) err = c.setupDNS() if err != nil { @@ -289,13 +297,15 @@ func (c *ConnectOptions) startLocalTunServe(ctx context.Context, forwardAddress } // Listen all pod, add route if needed -func (c *ConnectOptions) addRouteDynamic(ctx context.Context) { - r, err := netroute.New() +func (c *ConnectOptions) addRouteDynamic(ctx context.Context) (err error) { + var r routing.Router + r, err = netroute.New() if err != nil { return } - tunIface, err := tun.GetInterface() + var tunIface *net.Interface + tunIface, err = tun.GetInterface() if err != nil { return } @@ -305,16 +315,35 @@ func (c *ConnectOptions) addRouteDynamic(ctx context.Context) { return } // if route is right, not need add route - iface, _, _, err := r.Route(net.ParseIP(ip)) - if err == nil && tunIface.Name == iface.Name { + iface, _, _, errs := r.Route(net.ParseIP(ip)) + if errs == nil && tunIface.Name == iface.Name { return } - err = tun.AddRoutes(types.Route{Dst: net.IPNet{IP: net.ParseIP(ip), Mask: net.CIDRMask(32, 32)}}) - if err != nil { - log.Debugf("[route] add route failed, pod: %s, ip: %s,err: %v", resource, ip, err) + errs = tun.AddRoutes(types.Route{Dst: net.IPNet{IP: net.ParseIP(ip), Mask: net.CIDRMask(32, 32)}}) + if errs != nil { + log.Debugf("[route] add route failed, resource: %s, ip: %s,err: %v", resource, ip, err) } } + manager := wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, clock.RealClock{}) + + var podList *v1.PodList + podList, err = c.clientset.CoreV1().Pods(v1.NamespaceAll).List(ctx, metav1.ListOptions{TimeoutSeconds: pointer.Int64(30)}) + if err != nil { + log.Debugf("list pod failed, err: %v", err) + return + } + + for _, pod := range podList.Items { + if pod.Spec.HostNetwork { + continue + } + if pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed { + continue + } + addRouteFunc(pod.Name, pod.Status.PodIP) + } + // add pod route go func() { for { @@ -328,10 +357,16 @@ func (c *ConnectOptions) addRouteDynamic(ctx context.Context) { log.Errorln(er) } }() - w, err := c.clientset.CoreV1().Pods(v1.NamespaceAll).Watch(ctx, metav1.ListOptions{Watch: true, TimeoutSeconds: pointer.Int64(30)}) - if err != nil { + w, errs := c.clientset.CoreV1().Pods(v1.NamespaceAll).Watch(ctx, metav1.ListOptions{ + Watch: true, TimeoutSeconds: pointer.Int64(30), ResourceVersion: podList.ResourceVersion, + }) + if errs != nil { + if utilnet.IsConnectionRefused(errs) || apierrors.IsTooManyRequests(errs) { + <-manager.Backoff().C() + return + } time.Sleep(time.Second * 5) - log.Debugf("wait pod failed, err: %v", err) + log.Debugf("wait pod failed, err: %v", errs) return } defer w.Stop() @@ -365,6 +400,18 @@ func (c *ConnectOptions) addRouteDynamic(ctx context.Context) { } }() + var serviceList *v1.ServiceList + serviceList, err = c.clientset.CoreV1().Services(v1.NamespaceAll).List(ctx, metav1.ListOptions{ + TimeoutSeconds: pointer.Int64(30), + }) + if err != nil { + err = fmt.Errorf("can not list service to add it to route table, err: %v", err) + return + } + for _, item := range serviceList.Items { + addRouteFunc(item.Name, item.Spec.ClusterIP) + } + // add service route go func() { for { @@ -378,9 +425,15 @@ func (c *ConnectOptions) addRouteDynamic(ctx context.Context) { log.Errorln(er) } }() - w, err := c.clientset.CoreV1().Services(v1.NamespaceAll).Watch(ctx, metav1.ListOptions{Watch: true, TimeoutSeconds: pointer.Int64(30)}) - if err != nil { - log.Debugf("wait service failed, err: %v", err) + w, errs := c.clientset.CoreV1().Services(v1.NamespaceAll).Watch(ctx, metav1.ListOptions{ + Watch: true, TimeoutSeconds: pointer.Int64(30), ResourceVersion: serviceList.ResourceVersion, + }) + if errs != nil { + if utilnet.IsConnectionRefused(errs) || apierrors.IsTooManyRequests(errs) { + <-manager.Backoff().C() + return + } + log.Debugf("wait service failed, err: %v", errs) time.Sleep(time.Second * 5) return } @@ -396,19 +449,21 @@ func (c *ConnectOptions) addRouteDynamic(ctx context.Context) { if e.Type != watch.Added { continue } - var pod *v1.Service - pod, ok = e.Object.(*v1.Service) + var svc *v1.Service + svc, ok = e.Object.(*v1.Service) if !ok { continue } - ip := pod.Spec.ClusterIP - addRouteFunc(pod.Name, ip) + ip := svc.Spec.ClusterIP + addRouteFunc(svc.Name, ip) } } }() } } }() + + return } func (c *ConnectOptions) deleteFirewallRule(ctx context.Context) { @@ -759,7 +814,7 @@ func (c *ConnectOptions) addExtraRoute(ctx context.Context) (err error) { } addRouteFunc := func(resource, ip string) { - if ip == "" || net.ParseIP(ip) == nil { + if net.ParseIP(ip) == nil { return } // if route is right, not need add route @@ -775,20 +830,31 @@ func (c *ConnectOptions) addExtraRoute(ctx context.Context) (err error) { client := &miekgdns.Client{Net: "udp", SingleInflight: true, DialTimeout: time.Second * 30} for _, domain := range c.ExtraDomain { - var answer *miekgdns.Msg - answer, _, err = client.ExchangeContext(ctx, &miekgdns.Msg{ - Question: []miekgdns.Question{{ - Name: domain + ".", - Qtype: miekgdns.TypeA, - }}, - }, fmt.Sprintf("%s:%d", ips[0], 53)) + err = retry.OnError( + retry.DefaultRetry, + func(err error) bool { + return err != nil + }, + func() error { + var answer *miekgdns.Msg + answer, _, err = client.ExchangeContext(ctx, &miekgdns.Msg{ + Question: []miekgdns.Question{{ + Name: domain + ".", + Qtype: miekgdns.TypeA, + }}, + }, fmt.Sprintf("%s:%d", ips[0], 53)) + if err != nil { + return err + } + for _, rr := range answer.Answer { + if a, ok := rr.(*miekgdns.A); ok && a.A != nil { + addRouteFunc(domain, a.A.String()) + } + } + return nil + }) if err != nil { - return - } - for _, rr := range answer.Answer { - if a, ok := rr.(*miekgdns.A); ok && a.A != nil { - addRouteFunc(domain, a.A.String()) - } + return err } } return diff --git a/pkg/webhook/dhcp.go b/pkg/webhook/dhcp.go index 6216b72f..dfecfdd9 100644 --- a/pkg/webhook/dhcp.go +++ b/pkg/webhook/dhcp.go @@ -6,6 +6,7 @@ import ( "net/http" log "github.com/sirupsen/logrus" + "k8s.io/client-go/kubernetes" "k8s.io/kubectl/pkg/cmd/util" "github.com/wencaiwulue/kubevpn/pkg/config" @@ -13,7 +14,8 @@ import ( ) type dhcpServer struct { - f util.Factory + f util.Factory + clientset *kubernetes.Clientset } func (d *dhcpServer) rentIP(w http.ResponseWriter, r *http.Request) { @@ -21,13 +23,7 @@ func (d *dhcpServer) rentIP(w http.ResponseWriter, r *http.Request) { namespace := r.Header.Get("POD_NAMESPACE") log.Infof("handling rent ip request, pod name: %s, ns: %s", podName, namespace) - clientset, err := d.f.KubernetesClientSet() - if err != nil { - log.Error(err) - w.WriteHeader(http.StatusBadRequest) - return - } - cmi := clientset.CoreV1().ConfigMaps(namespace) + cmi := d.clientset.CoreV1().ConfigMaps(namespace) dhcp := handler.NewDHCPManager(cmi, namespace, &net.IPNet{IP: config.RouterIP, Mask: config.CIDR.Mask}) random, err := dhcp.RentIPRandom() if err != nil { @@ -56,13 +52,7 @@ func (d *dhcpServer) releaseIP(w http.ResponseWriter, r *http.Request) { } log.Infof("handling release ip request, pod name: %s, ns: %s", podName, namespace) - clientset, err := d.f.KubernetesClientSet() - if err != nil { - log.Error(err) - w.WriteHeader(http.StatusBadRequest) - return - } - cmi := clientset.CoreV1().ConfigMaps(namespace) + cmi := d.clientset.CoreV1().ConfigMaps(namespace) dhcp := handler.NewDHCPManager(cmi, namespace, &net.IPNet{IP: config.RouterIP, Mask: config.CIDR.Mask}) err = dhcp.ReleaseIpToDHCP(ipNet) if err != nil { diff --git a/pkg/webhook/mutateadmissionwebhook.go b/pkg/webhook/mutateadmissionwebhook.go index f97243a2..24fc4670 100644 --- a/pkg/webhook/mutateadmissionwebhook.go +++ b/pkg/webhook/mutateadmissionwebhook.go @@ -12,6 +12,7 @@ import ( v1 "k8s.io/api/admission/v1" "k8s.io/api/admission/v1beta1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes" cmdutil "k8s.io/kubectl/pkg/cmd/util" "github.com/wencaiwulue/kubevpn/pkg/config" @@ -19,7 +20,8 @@ import ( // admissionReviewHandler is a handler to handle business logic, holding an util.Factory type admissionReviewHandler struct { - f cmdutil.Factory + f cmdutil.Factory + clientset *kubernetes.Clientset } // admitv1beta1Func handles a v1beta1 admission @@ -122,12 +124,19 @@ func serve(w http.ResponseWriter, r *http.Request, admit admitHandler) { } func Main(f cmdutil.Factory) error { - h := &admissionReviewHandler{f: f} + clientset, err2 := f.KubernetesClientSet() + if err2 != nil { + return err2 + } + h := &admissionReviewHandler{f: f, clientset: clientset} + http.HandleFunc("/pods", func(w http.ResponseWriter, r *http.Request) { serve(w, r, newDelegateToV1AdmitHandler(h.admitPods)) }) http.HandleFunc("/readyz", func(w http.ResponseWriter, req *http.Request) { w.Write([]byte("ok")) }) - s := dhcpServer{f: f} + + s := &dhcpServer{f: f, clientset: clientset} http.HandleFunc(config.APIRentIP, s.rentIP) http.HandleFunc(config.APIReleaseIP, s.releaseIP) + cert, ok := os.LookupEnv(config.TLSCertKey) if !ok { return fmt.Errorf("can not get %s from env", config.TLSCertKey) diff --git a/pkg/webhook/pods.go b/pkg/webhook/pods.go index 7b2d99d2..a30e3998 100644 --- a/pkg/webhook/pods.go +++ b/pkg/webhook/pods.go @@ -11,7 +11,6 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" "k8s.io/kubectl/pkg/cmd/util/podcmd" "github.com/wencaiwulue/kubevpn/pkg/config" @@ -51,13 +50,7 @@ func (h *admissionReviewHandler) admitPods(ar v1.AdmissionReview) *v1.AdmissionR pair := pod.Spec.Containers[i].Env[j] if pair.Name == config.EnvInboundPodTunIP && pair.Value == "" { found = true - var clientset *kubernetes.Clientset - clientset, err = h.f.KubernetesClientSet() - if err != nil { - log.Errorf("can not get clientset, err: %v", err) - return toV1AdmissionResponse(err) - } - cmi := clientset.CoreV1().ConfigMaps(ar.Request.Namespace) + cmi := h.clientset.CoreV1().ConfigMaps(ar.Request.Namespace) dhcp := handler.NewDHCPManager(cmi, ar.Request.Namespace, &net.IPNet{IP: config.RouterIP, Mask: config.CIDR.Mask}) var random *net.IPNet random, err = dhcp.RentIPRandom() @@ -123,13 +116,7 @@ func (h *admissionReviewHandler) admitPods(ar v1.AdmissionReview) *v1.AdmissionR if envVar.Name == config.EnvInboundPodTunIP && envVar.Value != "" { ip, cidr, err := net.ParseCIDR(envVar.Value) if err == nil { - var clientset *kubernetes.Clientset - clientset, err = h.f.KubernetesClientSet() - if err != nil { - log.Errorf("can not get clientset, err: %v", err) - return toV1AdmissionResponse(err) - } - cmi := clientset.CoreV1().ConfigMaps(ar.Request.Namespace) + cmi := h.clientset.CoreV1().ConfigMaps(ar.Request.Namespace) ipnet := &net.IPNet{ IP: ip, Mask: cidr.Mask,