From f826357baefa4cabb25f2371a02e2cacc8d604fc Mon Sep 17 00:00:00 2001 From: wencaiwulue <895703375@qq.com> Date: Tue, 27 Dec 2022 11:52:25 +0800 Subject: [PATCH] feat: optimize check pod status logic for redo port-forward --- pkg/handler/connect.go | 84 ++++++++++++++++++++---------------------- 1 file changed, 39 insertions(+), 45 deletions(-) diff --git a/pkg/handler/connect.go b/pkg/handler/connect.go index 309fb844..48146060 100644 --- a/pkg/handler/connect.go +++ b/pkg/handler/connect.go @@ -7,7 +7,6 @@ import ( "os" "strconv" "strings" - "sync/atomic" "time" "github.com/pkg/errors" @@ -22,10 +21,12 @@ import ( "k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/cli-runtime/pkg/resource" "k8s.io/client-go/kubernetes" + v12 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/rest" cmdutil "k8s.io/kubectl/pkg/cmd/util" "k8s.io/kubectl/pkg/polymorphichelpers" "k8s.io/kubectl/pkg/scheme" + "k8s.io/utils/pointer" "github.com/wencaiwulue/kubevpn/pkg/config" "github.com/wencaiwulue/kubevpn/pkg/core" @@ -140,35 +141,33 @@ func (c *ConnectOptions) DoConnect() (err error) { // detect pod is delete event, if pod is deleted, needs to redo port-forward immediately func (c *ConnectOptions) portForward(ctx context.Context, port string) error { - var childCtx context.Context - var cancelFunc context.CancelFunc - var curPodName = &atomic.Value{} var readyChan = make(chan struct{}, 1) var errChan = make(chan error, 1) - var first = true podInterface := c.clientset.CoreV1().Pods(c.Namespace) go func() { + var first = pointer.Bool(true) for ctx.Err() == nil { func() { podList, err := c.GetRunningPodList() if err != nil { - time.Sleep(time.Second * 1) + time.Sleep(time.Second * 3) return } - childCtx, cancelFunc = context.WithCancel(ctx) + childCtx, cancelFunc := context.WithCancel(ctx) defer cancelFunc() - if !first { + if !*first { readyChan = nil } podName := podList[0].GetName() // if port-forward occurs error, check pod is deleted or not, speed up fail runtime.ErrorHandlers = []func(error){func(err error) { pod, err := podInterface.Get(childCtx, podName, metav1.GetOptions{}) - if apierrors.IsNotFound(err) || pod.GetDeletionTimestamp() != nil { + if apierrors.IsNotFound(err) || (pod != nil && pod.GetDeletionTimestamp() != nil) { cancelFunc() } }} - curPodName.Store(podName) + // try to detect pod is delete event, if pod is deleted, needs to redo port-forward + go checkPodStatus(childCtx, cancelFunc, podName, podInterface) err = util.PortForwardPod( c.config, c.restclient, @@ -178,10 +177,10 @@ func (c *ConnectOptions) portForward(ctx context.Context, port string) error { readyChan, childCtx.Done(), ) - if first { + if *first { errChan <- err } - first = false + first = pointer.Bool(false) // exit normal, let context.err to judge to exit or not if err == nil { return @@ -197,39 +196,6 @@ func (c *ConnectOptions) portForward(ctx context.Context, port string) error { }() } }() - - // try to detect pod is delete event, if pod is deleted, needs to redo port-forward - go func() { - for ctx.Err() == nil { - func() { - podName := curPodName.Load() - if podName == nil || childCtx == nil || cancelFunc == nil { - time.Sleep(2 * time.Second) - return - } - stream, err := podInterface.Watch(childCtx, metav1.ListOptions{ - FieldSelector: fields.OneTermEqualSelector("metadata.name", podName.(string)).String(), - }) - if apierrors.IsForbidden(err) { - time.Sleep(30 * time.Second) - return - } - if err != nil { - return - } - defer stream.Stop() - for childCtx.Err() == nil { - select { - case e := <-stream.ResultChan(): - if e.Type == watch.Deleted { - cancelFunc() - return - } - } - } - }() - } - }() select { case <-time.Tick(time.Second * 60): return errors.New("port forward timeout") @@ -241,6 +207,34 @@ func (c *ConnectOptions) portForward(ctx context.Context, port string) error { } } +func checkPodStatus(cCtx context.Context, cFunc context.CancelFunc, podName string, podInterface v12.PodInterface) { + w, err := podInterface.Watch(cCtx, metav1.ListOptions{ + FieldSelector: fields.OneTermEqualSelector("metadata.name", podName).String(), + }) + if err != nil { + return + } + defer w.Stop() + for { + select { + case e := <-w.ResultChan(): + switch e.Type { + case watch.Deleted: + cFunc() + return + case watch.Error: + return + case watch.Added, watch.Modified, watch.Bookmark: + // do nothing + default: + return + } + case <-cCtx.Done(): + return + } + } +} + func (c *ConnectOptions) startLocalTunServe(ctx context.Context, forwardAddress string) (err error) { // todo figure it out why if util.IsWindows() {