feat: set cleanup timeout to 5 seconds

This commit is contained in:
fengcaiwen
2023-04-09 17:07:54 +08:00
parent 2b97dd3038
commit b2a6596405
8 changed files with 62 additions and 53 deletions

View File

@@ -1,6 +1,9 @@
package cmds
import (
"fmt"
"os"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
@@ -46,7 +49,7 @@ func CmdReset(factory cmdutil.Factory) *cobra.Command {
if err != nil {
log.Fatal(err)
}
log.Println("done")
fmt.Fprint(os.Stdout, "Done")
},
}

View File

@@ -7,6 +7,7 @@ import (
"os/signal"
"strconv"
"syscall"
"time"
log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
@@ -32,7 +33,9 @@ func (c *ConnectOptions) addCleanUpResourceHandler() {
go func() {
<-stopChan
log.Info("prepare to exit, cleaning up")
err := c.dhcp.ReleaseIP(c.localTunIPv4.IP, c.localTunIPv6.IP)
cleanupCtx, cancelFunc := context.WithTimeout(context.Background(), time.Second*5)
defer cancelFunc()
err := c.dhcp.ReleaseIP(cleanupCtx, c.localTunIPv4.IP, c.localTunIPv6.IP)
if err != nil {
log.Errorf("failed to release ip to dhcp, err: %v", err)
}
@@ -41,16 +44,14 @@ func (c *ConnectOptions) addCleanUpResourceHandler() {
function()
}
}
_ = c.clientset.CoreV1().Pods(c.Namespace).Delete(context.Background(), config.CniNetName, v1.DeleteOptions{GracePeriodSeconds: pointer.Int64(0)})
_ = c.clientset.CoreV1().Pods(c.Namespace).Delete(cleanupCtx, config.CniNetName, v1.DeleteOptions{GracePeriodSeconds: pointer.Int64(0)})
var count int
count, err = updateRefCount(c.clientset.CoreV1().ConfigMaps(c.Namespace), config.ConfigMapPodTrafficManager, -1)
if err == nil {
// only if ref is zero and deployment is not ready, needs to clean up
if count <= 0 {
deployment, errs := c.clientset.AppsV1().Deployments(c.Namespace).Get(context.Background(), config.ConfigMapPodTrafficManager, v1.GetOptions{})
if errs == nil && deployment.Status.UnavailableReplicas != 0 {
cleanup(c.clientset, c.Namespace, config.ConfigMapPodTrafficManager, true)
}
count, err = updateRefCount(cleanupCtx, c.clientset.CoreV1().ConfigMaps(c.Namespace), config.ConfigMapPodTrafficManager, -1)
// only if ref is zero and deployment is not ready, needs to clean up
if err == nil && count <= 0 {
deployment, errs := c.clientset.AppsV1().Deployments(c.Namespace).Get(cleanupCtx, config.ConfigMapPodTrafficManager, v1.GetOptions{})
if errs == nil && deployment.Status.UnavailableReplicas != 0 {
cleanup(cleanupCtx, c.clientset, c.Namespace, config.ConfigMapPodTrafficManager, true)
}
}
if err != nil {
@@ -72,7 +73,7 @@ func Cleanup(s os.Signal) {
}
// vendor/k8s.io/kubectl/pkg/polymorphichelpers/rollback.go:99
func updateRefCount(configMapInterface v12.ConfigMapInterface, name string, increment int) (current int, err error) {
func updateRefCount(ctx context.Context, configMapInterface v12.ConfigMapInterface, name string, increment int) (current int, err error) {
err = retry.OnError(
retry.DefaultRetry,
func(err error) bool {
@@ -88,7 +89,7 @@ func updateRefCount(configMapInterface v12.ConfigMapInterface, name string, incr
},
func() (err error) {
var cm *corev1.ConfigMap
cm, err = configMapInterface.Get(context.Background(), name, v1.GetOptions{})
cm, err = configMapInterface.Get(ctx, name, v1.GetOptions{})
if err != nil {
if k8serrors.IsNotFound(err) {
return err
@@ -102,7 +103,7 @@ func updateRefCount(configMapInterface v12.ConfigMapInterface, name string, incr
newVal = 0
}
p := []byte(fmt.Sprintf(`{"data":{"%s":"%s"}}`, config.KeyRefCount, strconv.Itoa(newVal)))
_, err = configMapInterface.Patch(context.Background(), name, types.MergePatchType, p, v1.PatchOptions{})
_, err = configMapInterface.Patch(ctx, name, types.MergePatchType, p, v1.PatchOptions{})
if err != nil {
if k8serrors.IsNotFound(err) {
return err
@@ -117,7 +118,7 @@ func updateRefCount(configMapInterface v12.ConfigMapInterface, name string, incr
}
log.Info("update ref count successfully")
var cm *corev1.ConfigMap
cm, err = configMapInterface.Get(context.Background(), name, v1.GetOptions{})
cm, err = configMapInterface.Get(ctx, name, v1.GetOptions{})
if err != nil {
err = fmt.Errorf("failed to get cm: %s, err: %v", name, err)
return
@@ -129,26 +130,25 @@ func updateRefCount(configMapInterface v12.ConfigMapInterface, name string, incr
return
}
func cleanup(clientset *kubernetes.Clientset, namespace, name string, keepCIDR bool) {
func cleanup(ctx context.Context, clientset *kubernetes.Clientset, namespace, name string, keepCIDR bool) {
options := v1.DeleteOptions{GracePeriodSeconds: pointer.Int64(0)}
ctx1 := context.Background()
if keepCIDR {
// keep configmap
p := []byte(fmt.Sprintf(`[{"op": "remove", "path": "/data/%s"},{"op": "remove", "path": "/data/%s"}]`, config.KeyDHCP, config.KeyDHCP6))
_, _ = clientset.CoreV1().ConfigMaps(namespace).Patch(ctx1, name, types.JSONPatchType, p, v1.PatchOptions{})
_, _ = clientset.CoreV1().ConfigMaps(namespace).Patch(ctx, name, types.JSONPatchType, p, v1.PatchOptions{})
p = []byte(fmt.Sprintf(`{"data":{"%s":"%s"}}`, config.KeyRefCount, strconv.Itoa(0)))
_, _ = clientset.CoreV1().ConfigMaps(namespace).Patch(ctx1, name, types.MergePatchType, p, v1.PatchOptions{})
_, _ = clientset.CoreV1().ConfigMaps(namespace).Patch(ctx, name, types.MergePatchType, p, v1.PatchOptions{})
} else {
_ = clientset.CoreV1().ConfigMaps(namespace).Delete(ctx1, name, options)
_ = clientset.CoreV1().ConfigMaps(namespace).Delete(ctx, name, options)
}
_ = clientset.CoreV1().Pods(namespace).Delete(ctx1, config.CniNetName, options)
_ = clientset.CoreV1().Secrets(namespace).Delete(ctx1, name, options)
_ = clientset.AdmissionregistrationV1().MutatingWebhookConfigurations().Delete(ctx1, name+"."+namespace, options)
_ = clientset.RbacV1().RoleBindings(namespace).Delete(ctx1, name, options)
_ = clientset.CoreV1().ServiceAccounts(namespace).Delete(ctx1, name, options)
_ = clientset.RbacV1().Roles(namespace).Delete(ctx1, name, options)
_ = clientset.CoreV1().Services(namespace).Delete(ctx1, name, options)
_ = clientset.AppsV1().Deployments(namespace).Delete(ctx1, name, options)
_ = clientset.CoreV1().Pods(namespace).Delete(ctx, config.CniNetName, options)
_ = clientset.CoreV1().Secrets(namespace).Delete(ctx, name, options)
_ = clientset.AdmissionregistrationV1().MutatingWebhookConfigurations().Delete(ctx, name+"."+namespace, options)
_ = clientset.RbacV1().RoleBindings(namespace).Delete(ctx, name, options)
_ = clientset.CoreV1().ServiceAccounts(namespace).Delete(ctx, name, options)
_ = clientset.RbacV1().Roles(namespace).Delete(ctx, name, options)
_ = clientset.CoreV1().Services(namespace).Delete(ctx, name, options)
_ = clientset.AppsV1().Deployments(namespace).Delete(ctx, name, options)
}

View File

@@ -79,8 +79,8 @@ type ConnectOptions struct {
localTunIPv6 *net.IPNet
}
func (c *ConnectOptions) createRemoteInboundPod(ctx1 context.Context) (err error) {
c.localTunIPv4, c.localTunIPv6, err = c.dhcp.RentIPBaseNICAddress()
func (c *ConnectOptions) createRemoteInboundPod(ctx context.Context) (err error) {
c.localTunIPv4, c.localTunIPv6, err = c.dhcp.RentIPBaseNICAddress(ctx)
if err != nil {
return
}
@@ -92,9 +92,9 @@ func (c *ConnectOptions) createRemoteInboundPod(ctx1 context.Context) (err error
}
// means mesh mode
if len(c.Headers) != 0 {
err = InjectVPNAndEnvoySidecar(ctx1, c.factory, c.clientset.CoreV1().ConfigMaps(c.Namespace), c.Namespace, workload, configInfo, c.Headers)
err = InjectVPNAndEnvoySidecar(ctx, c.factory, c.clientset.CoreV1().ConfigMaps(c.Namespace), c.Namespace, workload, configInfo, c.Headers)
} else {
err = InjectVPNSidecar(ctx1, c.factory, c.Namespace, workload, configInfo)
err = InjectVPNSidecar(ctx, c.factory, c.Namespace, workload, configInfo)
}
if err != nil {
return err

View File

@@ -73,9 +73,9 @@ func (d *DHCPManager) initDHCP(ctx context.Context) error {
return nil
}
func (d *DHCPManager) RentIPBaseNICAddress() (*net.IPNet, *net.IPNet, error) {
func (d *DHCPManager) RentIPBaseNICAddress(ctx context.Context) (*net.IPNet, *net.IPNet, error) {
var v4, v6 net.IP
err := d.updateDHCPConfigMap(func(ipv4 *ipallocator.Range, ipv6 *ipallocator.Range) (err error) {
err := d.updateDHCPConfigMap(ctx, func(ipv4 *ipallocator.Range, ipv6 *ipallocator.Range) (err error) {
if v4, err = ipv4.AllocateNext(); err != nil {
return err
}
@@ -90,9 +90,9 @@ func (d *DHCPManager) RentIPBaseNICAddress() (*net.IPNet, *net.IPNet, error) {
return &net.IPNet{IP: v4, Mask: d.cidr.Mask}, &net.IPNet{IP: v6, Mask: d.cidr6.Mask}, nil
}
func (d *DHCPManager) RentIPRandom() (*net.IPNet, *net.IPNet, error) {
func (d *DHCPManager) RentIPRandom(ctx context.Context) (*net.IPNet, *net.IPNet, error) {
var v4, v6 net.IP
err := d.updateDHCPConfigMap(func(ipv4 *ipallocator.Range, ipv6 *ipallocator.Range) (err error) {
err := d.updateDHCPConfigMap(ctx, func(ipv4 *ipallocator.Range, ipv6 *ipallocator.Range) (err error) {
if v4, err = ipv4.AllocateNext(); err != nil {
return err
}
@@ -108,8 +108,8 @@ func (d *DHCPManager) RentIPRandom() (*net.IPNet, *net.IPNet, error) {
return &net.IPNet{IP: v4, Mask: d.cidr.Mask}, &net.IPNet{IP: v6, Mask: d.cidr6.Mask}, nil
}
func (d *DHCPManager) ReleaseIP(ips ...net.IP) error {
return d.updateDHCPConfigMap(func(ipv4 *ipallocator.Range, ipv6 *ipallocator.Range) error {
func (d *DHCPManager) ReleaseIP(ctx context.Context, ips ...net.IP) error {
return d.updateDHCPConfigMap(ctx, func(ipv4 *ipallocator.Range, ipv6 *ipallocator.Range) error {
for _, ip := range ips {
var use *ipallocator.Range
if ip.To4() != nil {
@@ -125,21 +125,23 @@ func (d *DHCPManager) ReleaseIP(ips ...net.IP) error {
})
}
func (d *DHCPManager) updateDHCPConfigMap(f func(ipv4 *ipallocator.Range, ipv6 *ipallocator.Range) error) error {
cm, err := d.client.Get(context.Background(), config.ConfigMapPodTrafficManager, metav1.GetOptions{})
func (d *DHCPManager) updateDHCPConfigMap(ctx context.Context, f func(ipv4 *ipallocator.Range, ipv6 *ipallocator.Range) error) error {
cm, err := d.client.Get(ctx, config.ConfigMapPodTrafficManager, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get cm DHCP server, err: %v", err)
}
if cm.Data == nil {
cm.Data = make(map[string]string)
}
dhcp, err := ipallocator.NewAllocatorCIDRRange(d.cidr, func(max int, rangeSpec string) (allocator.Interface, error) {
var dhcp *ipallocator.Range
dhcp, err = ipallocator.NewAllocatorCIDRRange(d.cidr, func(max int, rangeSpec string) (allocator.Interface, error) {
return allocator.NewContiguousAllocationMap(max, rangeSpec), nil
})
if err != nil {
return err
}
str, err := base64.StdEncoding.DecodeString(cm.Data[config.KeyDHCP])
var str []byte
str, err = base64.StdEncoding.DecodeString(cm.Data[config.KeyDHCP])
if err == nil {
err = dhcp.Restore(d.cidr, str)
if err != nil {
@@ -147,7 +149,8 @@ func (d *DHCPManager) updateDHCPConfigMap(f func(ipv4 *ipallocator.Range, ipv6 *
}
}
dhcp6, err := ipallocator.NewAllocatorCIDRRange(d.cidr6, func(max int, rangeSpec string) (allocator.Interface, error) {
var dhcp6 *ipallocator.Range
dhcp6, err = ipallocator.NewAllocatorCIDRRange(d.cidr6, func(max int, rangeSpec string) (allocator.Interface, error) {
return allocator.NewContiguousAllocationMap(max, rangeSpec), nil
})
if err != nil {
@@ -177,7 +180,7 @@ func (d *DHCPManager) updateDHCPConfigMap(f func(ipv4 *ipallocator.Range, ipv6 *
}
cm.Data[key] = base64.StdEncoding.EncodeToString(bytes)
}
_, err = d.client.Update(context.Background(), cm, metav1.UpdateOptions{})
_, err = d.client.Update(ctx, cm, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("update dhcp failed, err: %v", err)
}

View File

@@ -46,7 +46,7 @@ func createOutboundPod(ctx context.Context, factory cmdutil.Factory, clientset *
if err == nil {
_, err = polymorphichelpers.AttachablePodForObjectFn(factory, service, 2*time.Second)
if err == nil {
_, err = updateRefCount(clientset.CoreV1().ConfigMaps(namespace), config.ConfigMapPodTrafficManager, 1)
_, err = updateRefCount(ctx, clientset.CoreV1().ConfigMaps(namespace), config.ConfigMapPodTrafficManager, 1)
if err != nil {
return
}
@@ -442,7 +442,7 @@ out:
if err != nil && !k8serrors.IsForbidden(err) && !k8serrors.IsAlreadyExists(err) {
return fmt.Errorf("failed to create MutatingWebhookConfigurations, err: %v", err)
}
_, err = updateRefCount(clientset.CoreV1().ConfigMaps(namespace), config.ConfigMapPodTrafficManager, 1)
_, err = updateRefCount(ctx, clientset.CoreV1().ConfigMaps(namespace), config.ConfigMapPodTrafficManager, 1)
if err != nil {
return
}

View File

@@ -18,8 +18,8 @@ import (
// Reset
// 1, get all proxy-resources from configmap
// 2, cleanup all containers
func (c *ConnectOptions) Reset(ctx2 context.Context) error {
cm, err := c.clientset.CoreV1().ConfigMaps(c.Namespace).Get(ctx2, config.ConfigMapPodTrafficManager, metav1.GetOptions{})
func (c *ConnectOptions) Reset(ctx context.Context) error {
cm, err := c.clientset.CoreV1().ConfigMaps(c.Namespace).Get(ctx, config.ConfigMapPodTrafficManager, metav1.GetOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return err
}
@@ -44,7 +44,7 @@ func (c *ConnectOptions) Reset(ctx2 context.Context) error {
}
}
}
cleanup(c.clientset, c.Namespace, config.ConfigMapPodTrafficManager, false)
cleanup(ctx, c.clientset, c.Namespace, config.ConfigMapPodTrafficManager, false)
var cli *client.Client
if cli, err = client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()); err != nil {
return nil

View File

@@ -1,6 +1,7 @@
package webhook
import (
"context"
"fmt"
"net"
"net/http"
@@ -21,11 +22,12 @@ type dhcpServer struct {
func (d *dhcpServer) rentIP(w http.ResponseWriter, r *http.Request) {
podName := r.Header.Get(config.HeaderPodName)
namespace := r.Header.Get(config.HeaderPodNamespace)
ctx := context.Background()
log.Infof("handling rent ip request, pod name: %s, ns: %s", podName, namespace)
cmi := d.clientset.CoreV1().ConfigMaps(namespace)
dhcp := handler.NewDHCPManager(cmi, namespace)
v4, v6, err := dhcp.RentIPRandom()
v4, v6, err := dhcp.RentIPRandom(ctx)
if err != nil {
log.Error(err)
w.WriteHeader(http.StatusBadRequest)
@@ -56,7 +58,7 @@ func (d *dhcpServer) releaseIP(w http.ResponseWriter, r *http.Request) {
log.Infof("handling release ip request, pod name: %s, ns: %s", podName, namespace)
cmi := d.clientset.CoreV1().ConfigMaps(namespace)
dhcp := handler.NewDHCPManager(cmi, namespace)
if err := dhcp.ReleaseIP(ips...); err != nil {
if err := dhcp.ReleaseIP(context.Background(), ips...); err != nil {
log.Error(err)
w.WriteHeader(http.StatusBadRequest)
return

View File

@@ -1,6 +1,7 @@
package webhook
import (
"context"
"encoding/json"
"fmt"
"net"
@@ -53,7 +54,7 @@ func (h *admissionReviewHandler) admitPods(ar v1.AdmissionReview) *v1.AdmissionR
found = true
cmi := h.clientset.CoreV1().ConfigMaps(ar.Request.Namespace)
dhcp := handler.NewDHCPManager(cmi, ar.Request.Namespace)
v4, v6, err = dhcp.RentIPRandom()
v4, v6, err = dhcp.RentIPRandom(context.Background())
if err != nil {
log.Errorf("rent ip random failed, err: %v", err)
return toV1AdmissionResponse(err)
@@ -128,7 +129,7 @@ func (h *admissionReviewHandler) admitPods(ar v1.AdmissionReview) *v1.AdmissionR
}
}
cmi := h.clientset.CoreV1().ConfigMaps(ar.Request.Namespace)
err := handler.NewDHCPManager(cmi, ar.Request.Namespace).ReleaseIP(ips...)
err := handler.NewDHCPManager(cmi, ar.Request.Namespace).ReleaseIP(context.Background(), ips...)
if err != nil {
log.Errorf("release ip to dhcp err: %v, ips: %v", err, ips)
} else {