From 2043ab3f1bb25b408ac4870963754f5656dacbee Mon Sep 17 00:00:00 2001 From: p_caiwfeng Date: Fri, 8 Apr 2022 14:54:07 +0800 Subject: [PATCH] speed up fail if port-forward occurs error --- dns/dns_server.go | 1 - pkg/connect.go | 35 ++++++++++++++++++++--------------- util/util.go | 2 +- 3 files changed, 21 insertions(+), 17 deletions(-) diff --git a/dns/dns_server.go b/dns/dns_server.go index 6a3f9723..18a2090b 100644 --- a/dns/dns_server.go +++ b/dns/dns_server.go @@ -49,7 +49,6 @@ func (s *server) ServeDNS(w miekgdns.ResponseWriter, r *miekgdns.Msg) { answer, _, err := client.Exchange(r, s.forwardDNS.Servers[0]+":53") //answer, err := miekgdns.Exchange(r, s.forwardDNS.Servers[0]+":53") if err != nil { - log.Warnln(err) err = w.WriteMsg(r) if err != nil { log.Warnln(err) diff --git a/pkg/connect.go b/pkg/connect.go index 5c4ff8d5..2059e58f 100644 --- a/pkg/connect.go +++ b/pkg/connect.go @@ -14,6 +14,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/watch" "k8s.io/cli-runtime/pkg/genericclioptions" @@ -138,6 +139,7 @@ func (c *ConnectOptions) portForward(ctx context.Context, port int) error { var readyChan = make(chan struct{}, 1) var errChan = make(chan error, 1) var first = true + podInterface := c.clientset.CoreV1().Pods(c.Namespace) go func() { for ctx.Err() == nil { func() { @@ -151,11 +153,19 @@ func (c *ConnectOptions) portForward(ctx context.Context, port int) error { if !first { readyChan = nil } - curPodName.Store(podList[0].GetName()) + podName := podList[0].GetName() + // if port-forward occurs error, check pod is deleted or not, speed up fail + runtime.ErrorHandlers = []func(error){func(err error) { + pod, err := podInterface.Get(childCtx, podName, metav1.GetOptions{}) + if apierrors.IsNotFound(err) || pod.GetDeletionTimestamp() != nil { + cancelFunc() + } + }} + curPodName.Store(podName) err = util.PortForwardPod( c.config, c.restclient, - podList[0].GetName(), + podName, c.Namespace, strconv.Itoa(port), readyChan, @@ -169,20 +179,14 @@ func (c *ConnectOptions) portForward(ctx context.Context, port int) error { if err == nil { return } - if apierrors.IsNotFound(err) { - log.Errorln("can not found outbound pod, try to create one") - tm := net.IPNet{IP: config.RouterIP, Mask: config.CIDR.Mask} - if _, err = CreateOutboundPod(c.clientset, c.Namespace, tm.String(), c.cidrs); err != nil { - log.Errorf("error while create traffic manager, will retry after a snap, err: %v", err) - } - } else if strings.Contains(err.Error(), "unable to listen on any of the requested ports") || + if strings.Contains(err.Error(), "unable to listen on any of the requested ports") || strings.Contains(err.Error(), "address already in use") { - log.Errorf("port 10800 already in use, needs to release it manually") + log.Errorf("port %d already in use, needs to release it manually", port) time.Sleep(time.Second * 5) } else { log.Errorf("port-forward occurs error, err: %v, retrying", err) + time.Sleep(time.Second * 2) } - time.Sleep(time.Second * 2) }() } }() @@ -196,22 +200,23 @@ func (c *ConnectOptions) portForward(ctx context.Context, port int) error { time.Sleep(2 * time.Second) return } - watchStream, err := c.clientset.CoreV1().Pods(c.Namespace).Watch(childCtx, metav1.ListOptions{ + stream, err := podInterface.Watch(childCtx, metav1.ListOptions{ FieldSelector: fields.OneTermEqualSelector("metadata.name", podName.(string)).String(), }) if apierrors.IsForbidden(err) { - time.Sleep(5 * time.Second) + time.Sleep(30 * time.Second) return } if err != nil { return } - defer watchStream.Stop() + defer stream.Stop() for childCtx.Err() == nil { select { - case e := <-watchStream.ResultChan(): + case e := <-stream.ResultChan(): if e.Type == watch.Deleted { cancelFunc() + return } } } diff --git a/util/util.go b/util/util.go index 9926a8a1..3a513121 100644 --- a/util/util.go +++ b/util/util.go @@ -96,7 +96,7 @@ func PortForwardPod(config *rest.Config, clientset *rest.RESTClient, podName, na } dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", url) p := []string{port} - forwarder, err := portforward.NewOnAddresses(dialer, []string{"0.0.0.0"}, p, stopChan, readyChan, os.Stdout, os.Stderr) + forwarder, err := portforward.NewOnAddresses(dialer, []string{"0.0.0.0"}, p, stopChan, readyChan, nil, os.Stderr) if err != nil { log.Error(err) return err