feat: optimize check pod status logic for redo port-forward

This commit is contained in:
wencaiwulue
2022-12-27 11:52:25 +08:00
parent 6816c02933
commit f826357bae

View File

@@ -7,7 +7,6 @@ import (
"os"
"strconv"
"strings"
"sync/atomic"
"time"
"github.com/pkg/errors"
@@ -22,10 +21,12 @@ import (
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/cli-runtime/pkg/resource"
"k8s.io/client-go/kubernetes"
v12 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
"k8s.io/kubectl/pkg/polymorphichelpers"
"k8s.io/kubectl/pkg/scheme"
"k8s.io/utils/pointer"
"github.com/wencaiwulue/kubevpn/pkg/config"
"github.com/wencaiwulue/kubevpn/pkg/core"
@@ -140,35 +141,33 @@ func (c *ConnectOptions) DoConnect() (err error) {
// detect pod is delete event, if pod is deleted, needs to redo port-forward immediately
func (c *ConnectOptions) portForward(ctx context.Context, port string) error {
var childCtx context.Context
var cancelFunc context.CancelFunc
var curPodName = &atomic.Value{}
var readyChan = make(chan struct{}, 1)
var errChan = make(chan error, 1)
var first = true
podInterface := c.clientset.CoreV1().Pods(c.Namespace)
go func() {
var first = pointer.Bool(true)
for ctx.Err() == nil {
func() {
podList, err := c.GetRunningPodList()
if err != nil {
time.Sleep(time.Second * 1)
time.Sleep(time.Second * 3)
return
}
childCtx, cancelFunc = context.WithCancel(ctx)
childCtx, cancelFunc := context.WithCancel(ctx)
defer cancelFunc()
if !first {
if !*first {
readyChan = nil
}
podName := podList[0].GetName()
// if port-forward occurs error, check pod is deleted or not, speed up fail
runtime.ErrorHandlers = []func(error){func(err error) {
pod, err := podInterface.Get(childCtx, podName, metav1.GetOptions{})
if apierrors.IsNotFound(err) || pod.GetDeletionTimestamp() != nil {
if apierrors.IsNotFound(err) || (pod != nil && pod.GetDeletionTimestamp() != nil) {
cancelFunc()
}
}}
curPodName.Store(podName)
// try to detect pod is delete event, if pod is deleted, needs to redo port-forward
go checkPodStatus(childCtx, cancelFunc, podName, podInterface)
err = util.PortForwardPod(
c.config,
c.restclient,
@@ -178,10 +177,10 @@ func (c *ConnectOptions) portForward(ctx context.Context, port string) error {
readyChan,
childCtx.Done(),
)
if first {
if *first {
errChan <- err
}
first = false
first = pointer.Bool(false)
// exit normal, let context.err to judge to exit or not
if err == nil {
return
@@ -197,39 +196,6 @@ func (c *ConnectOptions) portForward(ctx context.Context, port string) error {
}()
}
}()
// try to detect pod is delete event, if pod is deleted, needs to redo port-forward
go func() {
for ctx.Err() == nil {
func() {
podName := curPodName.Load()
if podName == nil || childCtx == nil || cancelFunc == nil {
time.Sleep(2 * time.Second)
return
}
stream, err := podInterface.Watch(childCtx, metav1.ListOptions{
FieldSelector: fields.OneTermEqualSelector("metadata.name", podName.(string)).String(),
})
if apierrors.IsForbidden(err) {
time.Sleep(30 * time.Second)
return
}
if err != nil {
return
}
defer stream.Stop()
for childCtx.Err() == nil {
select {
case e := <-stream.ResultChan():
if e.Type == watch.Deleted {
cancelFunc()
return
}
}
}
}()
}
}()
select {
case <-time.Tick(time.Second * 60):
return errors.New("port forward timeout")
@@ -241,6 +207,34 @@ func (c *ConnectOptions) portForward(ctx context.Context, port string) error {
}
}
func checkPodStatus(cCtx context.Context, cFunc context.CancelFunc, podName string, podInterface v12.PodInterface) {
w, err := podInterface.Watch(cCtx, metav1.ListOptions{
FieldSelector: fields.OneTermEqualSelector("metadata.name", podName).String(),
})
if err != nil {
return
}
defer w.Stop()
for {
select {
case e := <-w.ResultChan():
switch e.Type {
case watch.Deleted:
cFunc()
return
case watch.Error:
return
case watch.Added, watch.Modified, watch.Bookmark:
// do nothing
default:
return
}
case <-cCtx.Done():
return
}
}
}
func (c *ConnectOptions) startLocalTunServe(ctx context.Context, forwardAddress string) (err error) {
// todo figure it out why
if util.IsWindows() {