speed up fail if port-forward occurs error

This commit is contained in:
p_caiwfeng
2022-04-08 14:54:07 +08:00
parent e34b1c99ab
commit 2043ab3f1b
3 changed files with 21 additions and 17 deletions

View File

@@ -49,7 +49,6 @@ func (s *server) ServeDNS(w miekgdns.ResponseWriter, r *miekgdns.Msg) {
answer, _, err := client.Exchange(r, s.forwardDNS.Servers[0]+":53")
//answer, err := miekgdns.Exchange(r, s.forwardDNS.Servers[0]+":53")
if err != nil {
log.Warnln(err)
err = w.WriteMsg(r)
if err != nil {
log.Warnln(err)

View File

@@ -14,6 +14,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/cli-runtime/pkg/genericclioptions"
@@ -138,6 +139,7 @@ func (c *ConnectOptions) portForward(ctx context.Context, port int) error {
var readyChan = make(chan struct{}, 1)
var errChan = make(chan error, 1)
var first = true
podInterface := c.clientset.CoreV1().Pods(c.Namespace)
go func() {
for ctx.Err() == nil {
func() {
@@ -151,11 +153,19 @@ func (c *ConnectOptions) portForward(ctx context.Context, port int) error {
if !first {
readyChan = nil
}
curPodName.Store(podList[0].GetName())
podName := podList[0].GetName()
// if port-forward occurs error, check pod is deleted or not, speed up fail
runtime.ErrorHandlers = []func(error){func(err error) {
pod, err := podInterface.Get(childCtx, podName, metav1.GetOptions{})
if apierrors.IsNotFound(err) || pod.GetDeletionTimestamp() != nil {
cancelFunc()
}
}}
curPodName.Store(podName)
err = util.PortForwardPod(
c.config,
c.restclient,
podList[0].GetName(),
podName,
c.Namespace,
strconv.Itoa(port),
readyChan,
@@ -169,20 +179,14 @@ func (c *ConnectOptions) portForward(ctx context.Context, port int) error {
if err == nil {
return
}
if apierrors.IsNotFound(err) {
log.Errorln("can not found outbound pod, try to create one")
tm := net.IPNet{IP: config.RouterIP, Mask: config.CIDR.Mask}
if _, err = CreateOutboundPod(c.clientset, c.Namespace, tm.String(), c.cidrs); err != nil {
log.Errorf("error while create traffic manager, will retry after a snap, err: %v", err)
}
} else if strings.Contains(err.Error(), "unable to listen on any of the requested ports") ||
if strings.Contains(err.Error(), "unable to listen on any of the requested ports") ||
strings.Contains(err.Error(), "address already in use") {
log.Errorf("port 10800 already in use, needs to release it manually")
log.Errorf("port %d already in use, needs to release it manually", port)
time.Sleep(time.Second * 5)
} else {
log.Errorf("port-forward occurs error, err: %v, retrying", err)
time.Sleep(time.Second * 2)
}
time.Sleep(time.Second * 2)
}()
}
}()
@@ -196,22 +200,23 @@ func (c *ConnectOptions) portForward(ctx context.Context, port int) error {
time.Sleep(2 * time.Second)
return
}
watchStream, err := c.clientset.CoreV1().Pods(c.Namespace).Watch(childCtx, metav1.ListOptions{
stream, err := podInterface.Watch(childCtx, metav1.ListOptions{
FieldSelector: fields.OneTermEqualSelector("metadata.name", podName.(string)).String(),
})
if apierrors.IsForbidden(err) {
time.Sleep(5 * time.Second)
time.Sleep(30 * time.Second)
return
}
if err != nil {
return
}
defer watchStream.Stop()
defer stream.Stop()
for childCtx.Err() == nil {
select {
case e := <-watchStream.ResultChan():
case e := <-stream.ResultChan():
if e.Type == watch.Deleted {
cancelFunc()
return
}
}
}

View File

@@ -96,7 +96,7 @@ func PortForwardPod(config *rest.Config, clientset *rest.RESTClient, podName, na
}
dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", url)
p := []string{port}
forwarder, err := portforward.NewOnAddresses(dialer, []string{"0.0.0.0"}, p, stopChan, readyChan, os.Stdout, os.Stderr)
forwarder, err := portforward.NewOnAddresses(dialer, []string{"0.0.0.0"}, p, stopChan, readyChan, nil, os.Stderr)
if err != nil {
log.Error(err)
return err