feat: use one clientset

This commit is contained in:
fengcaiwen
2023-03-21 10:36:13 +08:00
parent feabc95ee8
commit 3f7a8f07ee
5 changed files with 135 additions and 67 deletions

View File

@@ -9,17 +9,22 @@ import (
"sort" "sort"
"strings" "strings"
"text/tabwriter" "text/tabwriter"
"time"
miekgdns "github.com/miekg/dns" miekgdns "github.com/miekg/dns"
"github.com/pkg/errors" "github.com/pkg/errors"
v12 "k8s.io/api/core/v1" v12 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/fields"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
v13 "k8s.io/client-go/kubernetes/typed/core/v1" v13 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest" "k8s.io/client-go/rest"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/utils/pointer"
"github.com/wencaiwulue/kubevpn/pkg/util" "github.com/wencaiwulue/kubevpn/pkg/util"
) )
@@ -78,6 +83,9 @@ func GetDNSIPFromDnsPod(clientset *kubernetes.Clientset) (ips []string, err erro
} }
func AddServiceNameToHosts(ctx context.Context, serviceInterface v13.ServiceInterface) { func AddServiceNameToHosts(ctx context.Context, serviceInterface v13.ServiceInterface) {
rateLimiter := flowcontrol.NewTokenBucketRateLimiter(0.2, 1)
defer rateLimiter.Stop()
var last string var last string
for { for {
select { select {
@@ -85,8 +93,13 @@ func AddServiceNameToHosts(ctx context.Context, serviceInterface v13.ServiceInte
return return
default: default:
func() { func() {
w, err := serviceInterface.Watch(ctx, v1.ListOptions{}) w, err := serviceInterface.Watch(ctx, v1.ListOptions{
Watch: true, TimeoutSeconds: pointer.Int64(30),
})
if err != nil { if err != nil {
if utilnet.IsConnectionRefused(err) || apierrors.IsTooManyRequests(err) {
time.Sleep(time.Second * 5)
}
return return
} }
defer w.Stop() defer w.Stop()
@@ -99,6 +112,9 @@ func AddServiceNameToHosts(ctx context.Context, serviceInterface v13.ServiceInte
if watch.Deleted == c.Type || watch.Error == c.Type { if watch.Deleted == c.Type || watch.Error == c.Type {
continue continue
} }
if !rateLimiter.TryAccept() {
return
}
list, err := serviceInterface.List(ctx, v1.ListOptions{}) list, err := serviceInterface.List(ctx, v1.ListOptions{})
if err != nil { if err != nil {
return return

View File

@@ -20,11 +20,14 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/spf13/pflag" "github.com/spf13/pflag"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
"k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/cli-runtime/pkg/resource" "k8s.io/cli-runtime/pkg/resource"
@@ -35,9 +38,11 @@ import (
"k8s.io/client-go/tools/clientcmd/api" "k8s.io/client-go/tools/clientcmd/api"
"k8s.io/client-go/tools/clientcmd/api/latest" "k8s.io/client-go/tools/clientcmd/api/latest"
clientcmdlatest "k8s.io/client-go/tools/clientcmd/api/latest" clientcmdlatest "k8s.io/client-go/tools/clientcmd/api/latest"
"k8s.io/client-go/util/retry"
cmdutil "k8s.io/kubectl/pkg/cmd/util" cmdutil "k8s.io/kubectl/pkg/cmd/util"
"k8s.io/kubectl/pkg/polymorphichelpers" "k8s.io/kubectl/pkg/polymorphichelpers"
"k8s.io/kubectl/pkg/scheme" "k8s.io/kubectl/pkg/scheme"
"k8s.io/utils/clock"
"k8s.io/utils/pointer" "k8s.io/utils/pointer"
"github.com/wencaiwulue/kubevpn/pkg/config" "github.com/wencaiwulue/kubevpn/pkg/config"
@@ -145,7 +150,10 @@ func (c *ConnectOptions) DoConnect() (err error) {
if err != nil { if err != nil {
return err return err
} }
c.addRouteDynamic(ctx) err = c.addRouteDynamic(ctx)
if err != nil {
return err
}
c.deleteFirewallRule(ctx) c.deleteFirewallRule(ctx)
err = c.setupDNS() err = c.setupDNS()
if err != nil { if err != nil {
@@ -289,13 +297,15 @@ func (c *ConnectOptions) startLocalTunServe(ctx context.Context, forwardAddress
} }
// Listen all pod, add route if needed // Listen all pod, add route if needed
func (c *ConnectOptions) addRouteDynamic(ctx context.Context) { func (c *ConnectOptions) addRouteDynamic(ctx context.Context) (err error) {
r, err := netroute.New() var r routing.Router
r, err = netroute.New()
if err != nil { if err != nil {
return return
} }
tunIface, err := tun.GetInterface() var tunIface *net.Interface
tunIface, err = tun.GetInterface()
if err != nil { if err != nil {
return return
} }
@@ -305,16 +315,35 @@ func (c *ConnectOptions) addRouteDynamic(ctx context.Context) {
return return
} }
// if route is right, not need add route // if route is right, not need add route
iface, _, _, err := r.Route(net.ParseIP(ip)) iface, _, _, errs := r.Route(net.ParseIP(ip))
if err == nil && tunIface.Name == iface.Name { if errs == nil && tunIface.Name == iface.Name {
return return
} }
err = tun.AddRoutes(types.Route{Dst: net.IPNet{IP: net.ParseIP(ip), Mask: net.CIDRMask(32, 32)}}) errs = tun.AddRoutes(types.Route{Dst: net.IPNet{IP: net.ParseIP(ip), Mask: net.CIDRMask(32, 32)}})
if err != nil { if errs != nil {
log.Debugf("[route] add route failed, pod: %s, ip: %s,err: %v", resource, ip, err) log.Debugf("[route] add route failed, resource: %s, ip: %s,err: %v", resource, ip, err)
} }
} }
manager := wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, clock.RealClock{})
var podList *v1.PodList
podList, err = c.clientset.CoreV1().Pods(v1.NamespaceAll).List(ctx, metav1.ListOptions{TimeoutSeconds: pointer.Int64(30)})
if err != nil {
log.Debugf("list pod failed, err: %v", err)
return
}
for _, pod := range podList.Items {
if pod.Spec.HostNetwork {
continue
}
if pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed {
continue
}
addRouteFunc(pod.Name, pod.Status.PodIP)
}
// add pod route // add pod route
go func() { go func() {
for { for {
@@ -328,10 +357,16 @@ func (c *ConnectOptions) addRouteDynamic(ctx context.Context) {
log.Errorln(er) log.Errorln(er)
} }
}() }()
w, err := c.clientset.CoreV1().Pods(v1.NamespaceAll).Watch(ctx, metav1.ListOptions{Watch: true, TimeoutSeconds: pointer.Int64(30)}) w, errs := c.clientset.CoreV1().Pods(v1.NamespaceAll).Watch(ctx, metav1.ListOptions{
if err != nil { Watch: true, TimeoutSeconds: pointer.Int64(30), ResourceVersion: podList.ResourceVersion,
})
if errs != nil {
if utilnet.IsConnectionRefused(errs) || apierrors.IsTooManyRequests(errs) {
<-manager.Backoff().C()
return
}
time.Sleep(time.Second * 5) time.Sleep(time.Second * 5)
log.Debugf("wait pod failed, err: %v", err) log.Debugf("wait pod failed, err: %v", errs)
return return
} }
defer w.Stop() defer w.Stop()
@@ -365,6 +400,18 @@ func (c *ConnectOptions) addRouteDynamic(ctx context.Context) {
} }
}() }()
var serviceList *v1.ServiceList
serviceList, err = c.clientset.CoreV1().Services(v1.NamespaceAll).List(ctx, metav1.ListOptions{
TimeoutSeconds: pointer.Int64(30),
})
if err != nil {
err = fmt.Errorf("can not list service to add it to route table, err: %v", err)
return
}
for _, item := range serviceList.Items {
addRouteFunc(item.Name, item.Spec.ClusterIP)
}
// add service route // add service route
go func() { go func() {
for { for {
@@ -378,9 +425,15 @@ func (c *ConnectOptions) addRouteDynamic(ctx context.Context) {
log.Errorln(er) log.Errorln(er)
} }
}() }()
w, err := c.clientset.CoreV1().Services(v1.NamespaceAll).Watch(ctx, metav1.ListOptions{Watch: true, TimeoutSeconds: pointer.Int64(30)}) w, errs := c.clientset.CoreV1().Services(v1.NamespaceAll).Watch(ctx, metav1.ListOptions{
if err != nil { Watch: true, TimeoutSeconds: pointer.Int64(30), ResourceVersion: serviceList.ResourceVersion,
log.Debugf("wait service failed, err: %v", err) })
if errs != nil {
if utilnet.IsConnectionRefused(errs) || apierrors.IsTooManyRequests(errs) {
<-manager.Backoff().C()
return
}
log.Debugf("wait service failed, err: %v", errs)
time.Sleep(time.Second * 5) time.Sleep(time.Second * 5)
return return
} }
@@ -396,19 +449,21 @@ func (c *ConnectOptions) addRouteDynamic(ctx context.Context) {
if e.Type != watch.Added { if e.Type != watch.Added {
continue continue
} }
var pod *v1.Service var svc *v1.Service
pod, ok = e.Object.(*v1.Service) svc, ok = e.Object.(*v1.Service)
if !ok { if !ok {
continue continue
} }
ip := pod.Spec.ClusterIP ip := svc.Spec.ClusterIP
addRouteFunc(pod.Name, ip) addRouteFunc(svc.Name, ip)
} }
} }
}() }()
} }
} }
}() }()
return
} }
func (c *ConnectOptions) deleteFirewallRule(ctx context.Context) { func (c *ConnectOptions) deleteFirewallRule(ctx context.Context) {
@@ -759,7 +814,7 @@ func (c *ConnectOptions) addExtraRoute(ctx context.Context) (err error) {
} }
addRouteFunc := func(resource, ip string) { addRouteFunc := func(resource, ip string) {
if ip == "" || net.ParseIP(ip) == nil { if net.ParseIP(ip) == nil {
return return
} }
// if route is right, not need add route // if route is right, not need add route
@@ -775,6 +830,12 @@ func (c *ConnectOptions) addExtraRoute(ctx context.Context) (err error) {
client := &miekgdns.Client{Net: "udp", SingleInflight: true, DialTimeout: time.Second * 30} client := &miekgdns.Client{Net: "udp", SingleInflight: true, DialTimeout: time.Second * 30}
for _, domain := range c.ExtraDomain { for _, domain := range c.ExtraDomain {
err = retry.OnError(
retry.DefaultRetry,
func(err error) bool {
return err != nil
},
func() error {
var answer *miekgdns.Msg var answer *miekgdns.Msg
answer, _, err = client.ExchangeContext(ctx, &miekgdns.Msg{ answer, _, err = client.ExchangeContext(ctx, &miekgdns.Msg{
Question: []miekgdns.Question{{ Question: []miekgdns.Question{{
@@ -783,13 +844,18 @@ func (c *ConnectOptions) addExtraRoute(ctx context.Context) (err error) {
}}, }},
}, fmt.Sprintf("%s:%d", ips[0], 53)) }, fmt.Sprintf("%s:%d", ips[0], 53))
if err != nil { if err != nil {
return return err
} }
for _, rr := range answer.Answer { for _, rr := range answer.Answer {
if a, ok := rr.(*miekgdns.A); ok && a.A != nil { if a, ok := rr.(*miekgdns.A); ok && a.A != nil {
addRouteFunc(domain, a.A.String()) addRouteFunc(domain, a.A.String())
} }
} }
return nil
})
if err != nil {
return err
}
} }
return return
} }

View File

@@ -6,6 +6,7 @@ import (
"net/http" "net/http"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"k8s.io/client-go/kubernetes"
"k8s.io/kubectl/pkg/cmd/util" "k8s.io/kubectl/pkg/cmd/util"
"github.com/wencaiwulue/kubevpn/pkg/config" "github.com/wencaiwulue/kubevpn/pkg/config"
@@ -14,6 +15,7 @@ import (
type dhcpServer struct { type dhcpServer struct {
f util.Factory f util.Factory
clientset *kubernetes.Clientset
} }
func (d *dhcpServer) rentIP(w http.ResponseWriter, r *http.Request) { func (d *dhcpServer) rentIP(w http.ResponseWriter, r *http.Request) {
@@ -21,13 +23,7 @@ func (d *dhcpServer) rentIP(w http.ResponseWriter, r *http.Request) {
namespace := r.Header.Get("POD_NAMESPACE") namespace := r.Header.Get("POD_NAMESPACE")
log.Infof("handling rent ip request, pod name: %s, ns: %s", podName, namespace) log.Infof("handling rent ip request, pod name: %s, ns: %s", podName, namespace)
clientset, err := d.f.KubernetesClientSet() cmi := d.clientset.CoreV1().ConfigMaps(namespace)
if err != nil {
log.Error(err)
w.WriteHeader(http.StatusBadRequest)
return
}
cmi := clientset.CoreV1().ConfigMaps(namespace)
dhcp := handler.NewDHCPManager(cmi, namespace, &net.IPNet{IP: config.RouterIP, Mask: config.CIDR.Mask}) dhcp := handler.NewDHCPManager(cmi, namespace, &net.IPNet{IP: config.RouterIP, Mask: config.CIDR.Mask})
random, err := dhcp.RentIPRandom() random, err := dhcp.RentIPRandom()
if err != nil { if err != nil {
@@ -56,13 +52,7 @@ func (d *dhcpServer) releaseIP(w http.ResponseWriter, r *http.Request) {
} }
log.Infof("handling release ip request, pod name: %s, ns: %s", podName, namespace) log.Infof("handling release ip request, pod name: %s, ns: %s", podName, namespace)
clientset, err := d.f.KubernetesClientSet() cmi := d.clientset.CoreV1().ConfigMaps(namespace)
if err != nil {
log.Error(err)
w.WriteHeader(http.StatusBadRequest)
return
}
cmi := clientset.CoreV1().ConfigMaps(namespace)
dhcp := handler.NewDHCPManager(cmi, namespace, &net.IPNet{IP: config.RouterIP, Mask: config.CIDR.Mask}) dhcp := handler.NewDHCPManager(cmi, namespace, &net.IPNet{IP: config.RouterIP, Mask: config.CIDR.Mask})
err = dhcp.ReleaseIpToDHCP(ipNet) err = dhcp.ReleaseIpToDHCP(ipNet)
if err != nil { if err != nil {

View File

@@ -12,6 +12,7 @@ import (
v1 "k8s.io/api/admission/v1" v1 "k8s.io/api/admission/v1"
"k8s.io/api/admission/v1beta1" "k8s.io/api/admission/v1beta1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
cmdutil "k8s.io/kubectl/pkg/cmd/util" cmdutil "k8s.io/kubectl/pkg/cmd/util"
"github.com/wencaiwulue/kubevpn/pkg/config" "github.com/wencaiwulue/kubevpn/pkg/config"
@@ -20,6 +21,7 @@ import (
// admissionReviewHandler is a handler to handle business logic, holding an util.Factory // admissionReviewHandler is a handler to handle business logic, holding an util.Factory
type admissionReviewHandler struct { type admissionReviewHandler struct {
f cmdutil.Factory f cmdutil.Factory
clientset *kubernetes.Clientset
} }
// admitv1beta1Func handles a v1beta1 admission // admitv1beta1Func handles a v1beta1 admission
@@ -122,12 +124,19 @@ func serve(w http.ResponseWriter, r *http.Request, admit admitHandler) {
} }
func Main(f cmdutil.Factory) error { func Main(f cmdutil.Factory) error {
h := &admissionReviewHandler{f: f} clientset, err2 := f.KubernetesClientSet()
if err2 != nil {
return err2
}
h := &admissionReviewHandler{f: f, clientset: clientset}
http.HandleFunc("/pods", func(w http.ResponseWriter, r *http.Request) { serve(w, r, newDelegateToV1AdmitHandler(h.admitPods)) }) http.HandleFunc("/pods", func(w http.ResponseWriter, r *http.Request) { serve(w, r, newDelegateToV1AdmitHandler(h.admitPods)) })
http.HandleFunc("/readyz", func(w http.ResponseWriter, req *http.Request) { w.Write([]byte("ok")) }) http.HandleFunc("/readyz", func(w http.ResponseWriter, req *http.Request) { w.Write([]byte("ok")) })
s := dhcpServer{f: f}
s := &dhcpServer{f: f, clientset: clientset}
http.HandleFunc(config.APIRentIP, s.rentIP) http.HandleFunc(config.APIRentIP, s.rentIP)
http.HandleFunc(config.APIReleaseIP, s.releaseIP) http.HandleFunc(config.APIReleaseIP, s.releaseIP)
cert, ok := os.LookupEnv(config.TLSCertKey) cert, ok := os.LookupEnv(config.TLSCertKey)
if !ok { if !ok {
return fmt.Errorf("can not get %s from env", config.TLSCertKey) return fmt.Errorf("can not get %s from env", config.TLSCertKey)

View File

@@ -11,7 +11,6 @@ import (
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/kubectl/pkg/cmd/util/podcmd" "k8s.io/kubectl/pkg/cmd/util/podcmd"
"github.com/wencaiwulue/kubevpn/pkg/config" "github.com/wencaiwulue/kubevpn/pkg/config"
@@ -51,13 +50,7 @@ func (h *admissionReviewHandler) admitPods(ar v1.AdmissionReview) *v1.AdmissionR
pair := pod.Spec.Containers[i].Env[j] pair := pod.Spec.Containers[i].Env[j]
if pair.Name == config.EnvInboundPodTunIP && pair.Value == "" { if pair.Name == config.EnvInboundPodTunIP && pair.Value == "" {
found = true found = true
var clientset *kubernetes.Clientset cmi := h.clientset.CoreV1().ConfigMaps(ar.Request.Namespace)
clientset, err = h.f.KubernetesClientSet()
if err != nil {
log.Errorf("can not get clientset, err: %v", err)
return toV1AdmissionResponse(err)
}
cmi := clientset.CoreV1().ConfigMaps(ar.Request.Namespace)
dhcp := handler.NewDHCPManager(cmi, ar.Request.Namespace, &net.IPNet{IP: config.RouterIP, Mask: config.CIDR.Mask}) dhcp := handler.NewDHCPManager(cmi, ar.Request.Namespace, &net.IPNet{IP: config.RouterIP, Mask: config.CIDR.Mask})
var random *net.IPNet var random *net.IPNet
random, err = dhcp.RentIPRandom() random, err = dhcp.RentIPRandom()
@@ -123,13 +116,7 @@ func (h *admissionReviewHandler) admitPods(ar v1.AdmissionReview) *v1.AdmissionR
if envVar.Name == config.EnvInboundPodTunIP && envVar.Value != "" { if envVar.Name == config.EnvInboundPodTunIP && envVar.Value != "" {
ip, cidr, err := net.ParseCIDR(envVar.Value) ip, cidr, err := net.ParseCIDR(envVar.Value)
if err == nil { if err == nil {
var clientset *kubernetes.Clientset cmi := h.clientset.CoreV1().ConfigMaps(ar.Request.Namespace)
clientset, err = h.f.KubernetesClientSet()
if err != nil {
log.Errorf("can not get clientset, err: %v", err)
return toV1AdmissionResponse(err)
}
cmi := clientset.CoreV1().ConfigMaps(ar.Request.Namespace)
ipnet := &net.IPNet{ ipnet := &net.IPNet{
IP: ip, IP: ip,
Mask: cidr.Mask, Mask: cidr.Mask,