From b2a6596405478b898b1d28b1593f73b94c97f880 Mon Sep 17 00:00:00 2001 From: fengcaiwen Date: Sun, 9 Apr 2023 17:07:54 +0800 Subject: [PATCH] feat: set cleanup timeout to 5 seconds --- cmd/kubevpn/cmds/reset.go | 5 +++- pkg/handler/cleaner.go | 54 +++++++++++++++++++-------------------- pkg/handler/connect.go | 8 +++--- pkg/handler/dhcp.go | 27 +++++++++++--------- pkg/handler/remote.go | 4 +-- pkg/handler/reset.go | 6 ++--- pkg/webhook/dhcp.go | 6 +++-- pkg/webhook/pods.go | 5 ++-- 8 files changed, 62 insertions(+), 53 deletions(-) diff --git a/cmd/kubevpn/cmds/reset.go b/cmd/kubevpn/cmds/reset.go index 871853ed..5b15b066 100644 --- a/cmd/kubevpn/cmds/reset.go +++ b/cmd/kubevpn/cmds/reset.go @@ -1,6 +1,9 @@ package cmds import ( + "fmt" + "os" + log "github.com/sirupsen/logrus" "github.com/spf13/cobra" cmdutil "k8s.io/kubectl/pkg/cmd/util" @@ -46,7 +49,7 @@ func CmdReset(factory cmdutil.Factory) *cobra.Command { if err != nil { log.Fatal(err) } - log.Println("done") + fmt.Fprint(os.Stdout, "Done") }, } diff --git a/pkg/handler/cleaner.go b/pkg/handler/cleaner.go index 7f12c778..2a05558a 100644 --- a/pkg/handler/cleaner.go +++ b/pkg/handler/cleaner.go @@ -7,6 +7,7 @@ import ( "os/signal" "strconv" "syscall" + "time" log "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" @@ -32,7 +33,9 @@ func (c *ConnectOptions) addCleanUpResourceHandler() { go func() { <-stopChan log.Info("prepare to exit, cleaning up") - err := c.dhcp.ReleaseIP(c.localTunIPv4.IP, c.localTunIPv6.IP) + cleanupCtx, cancelFunc := context.WithTimeout(context.Background(), time.Second*5) + defer cancelFunc() + err := c.dhcp.ReleaseIP(cleanupCtx, c.localTunIPv4.IP, c.localTunIPv6.IP) if err != nil { log.Errorf("failed to release ip to dhcp, err: %v", err) } @@ -41,16 +44,14 @@ func (c *ConnectOptions) addCleanUpResourceHandler() { function() } } - _ = c.clientset.CoreV1().Pods(c.Namespace).Delete(context.Background(), config.CniNetName, v1.DeleteOptions{GracePeriodSeconds: pointer.Int64(0)}) + _ = c.clientset.CoreV1().Pods(c.Namespace).Delete(cleanupCtx, config.CniNetName, v1.DeleteOptions{GracePeriodSeconds: pointer.Int64(0)}) var count int - count, err = updateRefCount(c.clientset.CoreV1().ConfigMaps(c.Namespace), config.ConfigMapPodTrafficManager, -1) - if err == nil { - // only if ref is zero and deployment is not ready, needs to clean up - if count <= 0 { - deployment, errs := c.clientset.AppsV1().Deployments(c.Namespace).Get(context.Background(), config.ConfigMapPodTrafficManager, v1.GetOptions{}) - if errs == nil && deployment.Status.UnavailableReplicas != 0 { - cleanup(c.clientset, c.Namespace, config.ConfigMapPodTrafficManager, true) - } + count, err = updateRefCount(cleanupCtx, c.clientset.CoreV1().ConfigMaps(c.Namespace), config.ConfigMapPodTrafficManager, -1) + // only if ref is zero and deployment is not ready, needs to clean up + if err == nil && count <= 0 { + deployment, errs := c.clientset.AppsV1().Deployments(c.Namespace).Get(cleanupCtx, config.ConfigMapPodTrafficManager, v1.GetOptions{}) + if errs == nil && deployment.Status.UnavailableReplicas != 0 { + cleanup(cleanupCtx, c.clientset, c.Namespace, config.ConfigMapPodTrafficManager, true) } } if err != nil { @@ -72,7 +73,7 @@ func Cleanup(s os.Signal) { } // vendor/k8s.io/kubectl/pkg/polymorphichelpers/rollback.go:99 -func updateRefCount(configMapInterface v12.ConfigMapInterface, name string, increment int) (current int, err error) { +func updateRefCount(ctx context.Context, configMapInterface v12.ConfigMapInterface, name string, increment int) (current int, err error) { err = retry.OnError( retry.DefaultRetry, func(err error) bool { @@ -88,7 +89,7 @@ func updateRefCount(configMapInterface v12.ConfigMapInterface, name string, incr }, func() (err error) { var cm *corev1.ConfigMap - cm, err = configMapInterface.Get(context.Background(), name, v1.GetOptions{}) + cm, err = configMapInterface.Get(ctx, name, v1.GetOptions{}) if err != nil { if k8serrors.IsNotFound(err) { return err @@ -102,7 +103,7 @@ func updateRefCount(configMapInterface v12.ConfigMapInterface, name string, incr newVal = 0 } p := []byte(fmt.Sprintf(`{"data":{"%s":"%s"}}`, config.KeyRefCount, strconv.Itoa(newVal))) - _, err = configMapInterface.Patch(context.Background(), name, types.MergePatchType, p, v1.PatchOptions{}) + _, err = configMapInterface.Patch(ctx, name, types.MergePatchType, p, v1.PatchOptions{}) if err != nil { if k8serrors.IsNotFound(err) { return err @@ -117,7 +118,7 @@ func updateRefCount(configMapInterface v12.ConfigMapInterface, name string, incr } log.Info("update ref count successfully") var cm *corev1.ConfigMap - cm, err = configMapInterface.Get(context.Background(), name, v1.GetOptions{}) + cm, err = configMapInterface.Get(ctx, name, v1.GetOptions{}) if err != nil { err = fmt.Errorf("failed to get cm: %s, err: %v", name, err) return @@ -129,26 +130,25 @@ func updateRefCount(configMapInterface v12.ConfigMapInterface, name string, incr return } -func cleanup(clientset *kubernetes.Clientset, namespace, name string, keepCIDR bool) { +func cleanup(ctx context.Context, clientset *kubernetes.Clientset, namespace, name string, keepCIDR bool) { options := v1.DeleteOptions{GracePeriodSeconds: pointer.Int64(0)} - ctx1 := context.Background() if keepCIDR { // keep configmap p := []byte(fmt.Sprintf(`[{"op": "remove", "path": "/data/%s"},{"op": "remove", "path": "/data/%s"}]`, config.KeyDHCP, config.KeyDHCP6)) - _, _ = clientset.CoreV1().ConfigMaps(namespace).Patch(ctx1, name, types.JSONPatchType, p, v1.PatchOptions{}) + _, _ = clientset.CoreV1().ConfigMaps(namespace).Patch(ctx, name, types.JSONPatchType, p, v1.PatchOptions{}) p = []byte(fmt.Sprintf(`{"data":{"%s":"%s"}}`, config.KeyRefCount, strconv.Itoa(0))) - _, _ = clientset.CoreV1().ConfigMaps(namespace).Patch(ctx1, name, types.MergePatchType, p, v1.PatchOptions{}) + _, _ = clientset.CoreV1().ConfigMaps(namespace).Patch(ctx, name, types.MergePatchType, p, v1.PatchOptions{}) } else { - _ = clientset.CoreV1().ConfigMaps(namespace).Delete(ctx1, name, options) + _ = clientset.CoreV1().ConfigMaps(namespace).Delete(ctx, name, options) } - _ = clientset.CoreV1().Pods(namespace).Delete(ctx1, config.CniNetName, options) - _ = clientset.CoreV1().Secrets(namespace).Delete(ctx1, name, options) - _ = clientset.AdmissionregistrationV1().MutatingWebhookConfigurations().Delete(ctx1, name+"."+namespace, options) - _ = clientset.RbacV1().RoleBindings(namespace).Delete(ctx1, name, options) - _ = clientset.CoreV1().ServiceAccounts(namespace).Delete(ctx1, name, options) - _ = clientset.RbacV1().Roles(namespace).Delete(ctx1, name, options) - _ = clientset.CoreV1().Services(namespace).Delete(ctx1, name, options) - _ = clientset.AppsV1().Deployments(namespace).Delete(ctx1, name, options) + _ = clientset.CoreV1().Pods(namespace).Delete(ctx, config.CniNetName, options) + _ = clientset.CoreV1().Secrets(namespace).Delete(ctx, name, options) + _ = clientset.AdmissionregistrationV1().MutatingWebhookConfigurations().Delete(ctx, name+"."+namespace, options) + _ = clientset.RbacV1().RoleBindings(namespace).Delete(ctx, name, options) + _ = clientset.CoreV1().ServiceAccounts(namespace).Delete(ctx, name, options) + _ = clientset.RbacV1().Roles(namespace).Delete(ctx, name, options) + _ = clientset.CoreV1().Services(namespace).Delete(ctx, name, options) + _ = clientset.AppsV1().Deployments(namespace).Delete(ctx, name, options) } diff --git a/pkg/handler/connect.go b/pkg/handler/connect.go index 79ab9830..8a6a7bb5 100644 --- a/pkg/handler/connect.go +++ b/pkg/handler/connect.go @@ -79,8 +79,8 @@ type ConnectOptions struct { localTunIPv6 *net.IPNet } -func (c *ConnectOptions) createRemoteInboundPod(ctx1 context.Context) (err error) { - c.localTunIPv4, c.localTunIPv6, err = c.dhcp.RentIPBaseNICAddress() +func (c *ConnectOptions) createRemoteInboundPod(ctx context.Context) (err error) { + c.localTunIPv4, c.localTunIPv6, err = c.dhcp.RentIPBaseNICAddress(ctx) if err != nil { return } @@ -92,9 +92,9 @@ func (c *ConnectOptions) createRemoteInboundPod(ctx1 context.Context) (err error } // means mesh mode if len(c.Headers) != 0 { - err = InjectVPNAndEnvoySidecar(ctx1, c.factory, c.clientset.CoreV1().ConfigMaps(c.Namespace), c.Namespace, workload, configInfo, c.Headers) + err = InjectVPNAndEnvoySidecar(ctx, c.factory, c.clientset.CoreV1().ConfigMaps(c.Namespace), c.Namespace, workload, configInfo, c.Headers) } else { - err = InjectVPNSidecar(ctx1, c.factory, c.Namespace, workload, configInfo) + err = InjectVPNSidecar(ctx, c.factory, c.Namespace, workload, configInfo) } if err != nil { return err diff --git a/pkg/handler/dhcp.go b/pkg/handler/dhcp.go index 85fd1b94..a4e78177 100644 --- a/pkg/handler/dhcp.go +++ b/pkg/handler/dhcp.go @@ -73,9 +73,9 @@ func (d *DHCPManager) initDHCP(ctx context.Context) error { return nil } -func (d *DHCPManager) RentIPBaseNICAddress() (*net.IPNet, *net.IPNet, error) { +func (d *DHCPManager) RentIPBaseNICAddress(ctx context.Context) (*net.IPNet, *net.IPNet, error) { var v4, v6 net.IP - err := d.updateDHCPConfigMap(func(ipv4 *ipallocator.Range, ipv6 *ipallocator.Range) (err error) { + err := d.updateDHCPConfigMap(ctx, func(ipv4 *ipallocator.Range, ipv6 *ipallocator.Range) (err error) { if v4, err = ipv4.AllocateNext(); err != nil { return err } @@ -90,9 +90,9 @@ func (d *DHCPManager) RentIPBaseNICAddress() (*net.IPNet, *net.IPNet, error) { return &net.IPNet{IP: v4, Mask: d.cidr.Mask}, &net.IPNet{IP: v6, Mask: d.cidr6.Mask}, nil } -func (d *DHCPManager) RentIPRandom() (*net.IPNet, *net.IPNet, error) { +func (d *DHCPManager) RentIPRandom(ctx context.Context) (*net.IPNet, *net.IPNet, error) { var v4, v6 net.IP - err := d.updateDHCPConfigMap(func(ipv4 *ipallocator.Range, ipv6 *ipallocator.Range) (err error) { + err := d.updateDHCPConfigMap(ctx, func(ipv4 *ipallocator.Range, ipv6 *ipallocator.Range) (err error) { if v4, err = ipv4.AllocateNext(); err != nil { return err } @@ -108,8 +108,8 @@ func (d *DHCPManager) RentIPRandom() (*net.IPNet, *net.IPNet, error) { return &net.IPNet{IP: v4, Mask: d.cidr.Mask}, &net.IPNet{IP: v6, Mask: d.cidr6.Mask}, nil } -func (d *DHCPManager) ReleaseIP(ips ...net.IP) error { - return d.updateDHCPConfigMap(func(ipv4 *ipallocator.Range, ipv6 *ipallocator.Range) error { +func (d *DHCPManager) ReleaseIP(ctx context.Context, ips ...net.IP) error { + return d.updateDHCPConfigMap(ctx, func(ipv4 *ipallocator.Range, ipv6 *ipallocator.Range) error { for _, ip := range ips { var use *ipallocator.Range if ip.To4() != nil { @@ -125,21 +125,23 @@ func (d *DHCPManager) ReleaseIP(ips ...net.IP) error { }) } -func (d *DHCPManager) updateDHCPConfigMap(f func(ipv4 *ipallocator.Range, ipv6 *ipallocator.Range) error) error { - cm, err := d.client.Get(context.Background(), config.ConfigMapPodTrafficManager, metav1.GetOptions{}) +func (d *DHCPManager) updateDHCPConfigMap(ctx context.Context, f func(ipv4 *ipallocator.Range, ipv6 *ipallocator.Range) error) error { + cm, err := d.client.Get(ctx, config.ConfigMapPodTrafficManager, metav1.GetOptions{}) if err != nil { return fmt.Errorf("failed to get cm DHCP server, err: %v", err) } if cm.Data == nil { cm.Data = make(map[string]string) } - dhcp, err := ipallocator.NewAllocatorCIDRRange(d.cidr, func(max int, rangeSpec string) (allocator.Interface, error) { + var dhcp *ipallocator.Range + dhcp, err = ipallocator.NewAllocatorCIDRRange(d.cidr, func(max int, rangeSpec string) (allocator.Interface, error) { return allocator.NewContiguousAllocationMap(max, rangeSpec), nil }) if err != nil { return err } - str, err := base64.StdEncoding.DecodeString(cm.Data[config.KeyDHCP]) + var str []byte + str, err = base64.StdEncoding.DecodeString(cm.Data[config.KeyDHCP]) if err == nil { err = dhcp.Restore(d.cidr, str) if err != nil { @@ -147,7 +149,8 @@ func (d *DHCPManager) updateDHCPConfigMap(f func(ipv4 *ipallocator.Range, ipv6 * } } - dhcp6, err := ipallocator.NewAllocatorCIDRRange(d.cidr6, func(max int, rangeSpec string) (allocator.Interface, error) { + var dhcp6 *ipallocator.Range + dhcp6, err = ipallocator.NewAllocatorCIDRRange(d.cidr6, func(max int, rangeSpec string) (allocator.Interface, error) { return allocator.NewContiguousAllocationMap(max, rangeSpec), nil }) if err != nil { @@ -177,7 +180,7 @@ func (d *DHCPManager) updateDHCPConfigMap(f func(ipv4 *ipallocator.Range, ipv6 * } cm.Data[key] = base64.StdEncoding.EncodeToString(bytes) } - _, err = d.client.Update(context.Background(), cm, metav1.UpdateOptions{}) + _, err = d.client.Update(ctx, cm, metav1.UpdateOptions{}) if err != nil { return fmt.Errorf("update dhcp failed, err: %v", err) } diff --git a/pkg/handler/remote.go b/pkg/handler/remote.go index 84a79fc8..56214a79 100644 --- a/pkg/handler/remote.go +++ b/pkg/handler/remote.go @@ -46,7 +46,7 @@ func createOutboundPod(ctx context.Context, factory cmdutil.Factory, clientset * if err == nil { _, err = polymorphichelpers.AttachablePodForObjectFn(factory, service, 2*time.Second) if err == nil { - _, err = updateRefCount(clientset.CoreV1().ConfigMaps(namespace), config.ConfigMapPodTrafficManager, 1) + _, err = updateRefCount(ctx, clientset.CoreV1().ConfigMaps(namespace), config.ConfigMapPodTrafficManager, 1) if err != nil { return } @@ -442,7 +442,7 @@ out: if err != nil && !k8serrors.IsForbidden(err) && !k8serrors.IsAlreadyExists(err) { return fmt.Errorf("failed to create MutatingWebhookConfigurations, err: %v", err) } - _, err = updateRefCount(clientset.CoreV1().ConfigMaps(namespace), config.ConfigMapPodTrafficManager, 1) + _, err = updateRefCount(ctx, clientset.CoreV1().ConfigMaps(namespace), config.ConfigMapPodTrafficManager, 1) if err != nil { return } diff --git a/pkg/handler/reset.go b/pkg/handler/reset.go index c6b98983..4c59c68b 100644 --- a/pkg/handler/reset.go +++ b/pkg/handler/reset.go @@ -18,8 +18,8 @@ import ( // Reset // 1, get all proxy-resources from configmap // 2, cleanup all containers -func (c *ConnectOptions) Reset(ctx2 context.Context) error { - cm, err := c.clientset.CoreV1().ConfigMaps(c.Namespace).Get(ctx2, config.ConfigMapPodTrafficManager, metav1.GetOptions{}) +func (c *ConnectOptions) Reset(ctx context.Context) error { + cm, err := c.clientset.CoreV1().ConfigMaps(c.Namespace).Get(ctx, config.ConfigMapPodTrafficManager, metav1.GetOptions{}) if err != nil && !apierrors.IsNotFound(err) { return err } @@ -44,7 +44,7 @@ func (c *ConnectOptions) Reset(ctx2 context.Context) error { } } } - cleanup(c.clientset, c.Namespace, config.ConfigMapPodTrafficManager, false) + cleanup(ctx, c.clientset, c.Namespace, config.ConfigMapPodTrafficManager, false) var cli *client.Client if cli, err = client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()); err != nil { return nil diff --git a/pkg/webhook/dhcp.go b/pkg/webhook/dhcp.go index db611161..80f51acb 100644 --- a/pkg/webhook/dhcp.go +++ b/pkg/webhook/dhcp.go @@ -1,6 +1,7 @@ package webhook import ( + "context" "fmt" "net" "net/http" @@ -21,11 +22,12 @@ type dhcpServer struct { func (d *dhcpServer) rentIP(w http.ResponseWriter, r *http.Request) { podName := r.Header.Get(config.HeaderPodName) namespace := r.Header.Get(config.HeaderPodNamespace) + ctx := context.Background() log.Infof("handling rent ip request, pod name: %s, ns: %s", podName, namespace) cmi := d.clientset.CoreV1().ConfigMaps(namespace) dhcp := handler.NewDHCPManager(cmi, namespace) - v4, v6, err := dhcp.RentIPRandom() + v4, v6, err := dhcp.RentIPRandom(ctx) if err != nil { log.Error(err) w.WriteHeader(http.StatusBadRequest) @@ -56,7 +58,7 @@ func (d *dhcpServer) releaseIP(w http.ResponseWriter, r *http.Request) { log.Infof("handling release ip request, pod name: %s, ns: %s", podName, namespace) cmi := d.clientset.CoreV1().ConfigMaps(namespace) dhcp := handler.NewDHCPManager(cmi, namespace) - if err := dhcp.ReleaseIP(ips...); err != nil { + if err := dhcp.ReleaseIP(context.Background(), ips...); err != nil { log.Error(err) w.WriteHeader(http.StatusBadRequest) return diff --git a/pkg/webhook/pods.go b/pkg/webhook/pods.go index 78f80093..89c0de79 100644 --- a/pkg/webhook/pods.go +++ b/pkg/webhook/pods.go @@ -1,6 +1,7 @@ package webhook import ( + "context" "encoding/json" "fmt" "net" @@ -53,7 +54,7 @@ func (h *admissionReviewHandler) admitPods(ar v1.AdmissionReview) *v1.AdmissionR found = true cmi := h.clientset.CoreV1().ConfigMaps(ar.Request.Namespace) dhcp := handler.NewDHCPManager(cmi, ar.Request.Namespace) - v4, v6, err = dhcp.RentIPRandom() + v4, v6, err = dhcp.RentIPRandom(context.Background()) if err != nil { log.Errorf("rent ip random failed, err: %v", err) return toV1AdmissionResponse(err) @@ -128,7 +129,7 @@ func (h *admissionReviewHandler) admitPods(ar v1.AdmissionReview) *v1.AdmissionR } } cmi := h.clientset.CoreV1().ConfigMaps(ar.Request.Namespace) - err := handler.NewDHCPManager(cmi, ar.Request.Namespace).ReleaseIP(ips...) + err := handler.NewDHCPManager(cmi, ar.Request.Namespace).ReleaseIP(context.Background(), ips...) if err != nil { log.Errorf("release ip to dhcp err: %v, ips: %v", err, ips) } else {