mirror of
https://github.com/kubenetworks/kubevpn.git
synced 2025-12-24 11:51:13 +08:00
hotfix: fix port-forward retry bug (#366)
This commit is contained in:
@@ -276,7 +276,7 @@ func (c *ConnectOptions) portForward(ctx context.Context, portPair []string) err
|
||||
var readyChan = make(chan struct{})
|
||||
podName := podList[0].GetName()
|
||||
// try to detect pod is delete event, if pod is deleted, needs to redo port-forward
|
||||
//go util.CheckPodStatus(childCtx, cancelFunc, podName, c.clientset.CoreV1().Pods(c.Namespace))
|
||||
go util.CheckPodStatus(childCtx, cancelFunc, podName, c.clientset.CoreV1().Pods(c.Namespace))
|
||||
go util.CheckPortStatus(childCtx, cancelFunc, readyChan, strings.Split(portPair[1], ":")[0])
|
||||
if *first {
|
||||
go func() {
|
||||
|
||||
@@ -18,14 +18,12 @@ import (
|
||||
"github.com/pkg/errors"
|
||||
log "github.com/sirupsen/logrus"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
k8serrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/util/httpstream"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/cli-runtime/pkg/genericiooptions"
|
||||
"k8s.io/cli-runtime/pkg/resource"
|
||||
@@ -171,11 +169,20 @@ func PortForwardPod(config *rest.Config, clientset *rest.RESTClient, podName, na
|
||||
return err
|
||||
}
|
||||
|
||||
if err = forwarder.ForwardPorts(); err != nil {
|
||||
log.Debugf("Forward port error: %s", err.Error())
|
||||
defer forwarder.Close()
|
||||
|
||||
var errChan = make(chan error, 1)
|
||||
go func() {
|
||||
errChan <- forwarder.ForwardPorts()
|
||||
}()
|
||||
|
||||
select {
|
||||
case err = <-errChan:
|
||||
log.Debugf("Forward port error: %v", err)
|
||||
return err
|
||||
case <-stopChan:
|
||||
return nil
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func GetTopOwnerReference(factory util.Factory, ns, workload string) (*resource.Info, error) {
|
||||
@@ -330,36 +337,42 @@ func FindContainerByName(pod *corev1.Pod, name string) (*corev1.Container, int)
|
||||
}
|
||||
|
||||
func CheckPodStatus(ctx context.Context, cancelFunc context.CancelFunc, podName string, podInterface v12.PodInterface) {
|
||||
var verifyAPIServerConnection = func() {
|
||||
err := retry.OnError(
|
||||
retry.DefaultBackoff,
|
||||
func(err error) bool {
|
||||
return err != nil
|
||||
},
|
||||
func() error {
|
||||
ctx1, cancelFunc1 := context.WithTimeout(ctx, time.Second*10)
|
||||
defer cancelFunc1()
|
||||
_, err := podInterface.Get(ctx1, podName, v1.GetOptions{})
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
log.Debugf("Failed to get Pod %s: %v", podName, err)
|
||||
cancelFunc()
|
||||
}
|
||||
}
|
||||
|
||||
for ctx.Err() == nil {
|
||||
func() {
|
||||
defer time.Sleep(time.Millisecond * 200)
|
||||
defer time.Sleep(time.Second * 5)
|
||||
|
||||
w, err := podInterface.Watch(ctx, v1.ListOptions{
|
||||
FieldSelector: fields.OneTermEqualSelector("metadata.name", podName).String(),
|
||||
})
|
||||
if err != nil {
|
||||
if !k8serrors.IsForbidden(err) && !errors.Is(err, context.Canceled) {
|
||||
log.Debugf("Failed to watch Pod %s: %v", podName, err)
|
||||
}
|
||||
log.Debugf("Failed to watch Pod %s: %v", podName, err)
|
||||
return
|
||||
}
|
||||
defer w.Stop()
|
||||
|
||||
_, err = podInterface.Get(ctx, podName, v1.GetOptions{})
|
||||
if err != nil {
|
||||
if !k8serrors.IsForbidden(err) && !errors.Is(err, context.Canceled) {
|
||||
log.Debugf("Failed to get Pod %s: %v", podName, err)
|
||||
}
|
||||
return
|
||||
}
|
||||
verifyAPIServerConnection()
|
||||
select {
|
||||
case e, ok := <-w.ResultChan():
|
||||
if !ok {
|
||||
_, err = podInterface.Get(ctx, podName, v1.GetOptions{})
|
||||
if err != nil && !errors.Is(err, context.Canceled) {
|
||||
log.Debugf("Failed to get Pod %s: %v", podName, err)
|
||||
cancelFunc()
|
||||
}
|
||||
verifyAPIServerConnection()
|
||||
return
|
||||
}
|
||||
switch e.Type {
|
||||
@@ -368,11 +381,7 @@ func CheckPodStatus(ctx context.Context, cancelFunc context.CancelFunc, podName
|
||||
cancelFunc()
|
||||
return
|
||||
case watch.Error:
|
||||
_, err = podInterface.Get(ctx, podName, v1.GetOptions{})
|
||||
if err != nil && !errors.Is(err, context.Canceled) {
|
||||
log.Debugf("Failed to get Pod %s: %v", podName, err)
|
||||
cancelFunc()
|
||||
}
|
||||
verifyAPIServerConnection()
|
||||
return
|
||||
case watch.Added, watch.Modified, watch.Bookmark:
|
||||
// do nothing
|
||||
@@ -397,25 +406,14 @@ func CheckPortStatus(ctx context.Context, cancelFunc context.CancelFunc, readyCh
|
||||
}
|
||||
|
||||
for ctx.Err() == nil {
|
||||
err := retry.OnError(wait.Backoff{
|
||||
Steps: 6,
|
||||
Duration: time.Second,
|
||||
}, func(err error) bool {
|
||||
return err != nil
|
||||
}, func() error {
|
||||
var lc net.ListenConfig
|
||||
conn, err := lc.Listen(ctx, "tcp", net.JoinHostPort("127.0.0.1", localGvisorTCPPort))
|
||||
if err == nil {
|
||||
_ = conn.Close()
|
||||
return errors.New("port is free")
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
log.Debugf("Can not dial local port: %s: %v", localGvisorTCPPort, err)
|
||||
var lc net.ListenConfig
|
||||
conn, err := lc.Listen(ctx, "tcp", net.JoinHostPort("127.0.0.1", localGvisorTCPPort))
|
||||
if err == nil {
|
||||
_ = conn.Close()
|
||||
log.Debugf("Local port: %s is free", localGvisorTCPPort)
|
||||
return
|
||||
}
|
||||
time.Sleep(time.Second * 5)
|
||||
time.Sleep(time.Second * 1)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user