hotfix: fix port forward and ssh (#357)

* hotfix(portfoward): fix port-forward bug and ssh reconnect bug

* hotfix: remove larger overlapping cidrs for adding routes

* feat: retry port-forward if get pod err is not forbidden

* hotfix: fix ssh and port-forward

* feat: add more log

* hotfix: set go default revolver perfergo options to true
This commit is contained in:
naison
2024-10-20 11:23:49 +08:00
committed by GitHub
parent 9238e9914a
commit a64eaf66da
10 changed files with 261 additions and 138 deletions

View File

@@ -50,6 +50,12 @@ func (o *SvrOption) Start(ctx context.Context) error {
LocalTime: true,
Compress: false,
}
// for gssapi to lookup KDCs in DNS
// c.LibDefaults.DNSLookupKDC = true
// c.LibDefaults.DNSLookupRealm = true
net.DefaultResolver.PreferGo = true
util.InitLoggerForServer(true)
log.SetOutput(l)
klog.SetOutput(l)

View File

@@ -42,16 +42,26 @@ type Config struct {
}
func (c *Config) AddServiceNameToHosts(ctx context.Context, serviceInterface v13.ServiceInterface, hosts ...Entry) error {
list, err := serviceInterface.List(ctx, v1.ListOptions{})
if err != nil {
return err
}
var serviceList []v12.Service
//listOptions := v1.ListOptions{Limit: 100}
//for {
// services, err := serviceInterface.List(ctx, listOptions)
// if err != nil {
// break
// }
// serviceList = append(serviceList, services.Items...)
// if services.Continue != "" {
// listOptions.Continue = services.Continue
// } else {
// break
// }
//}
c.Lock.Lock()
defer c.Lock.Unlock()
appendHosts := c.generateAppendHosts(list.Items, hosts)
err = c.appendHosts(appendHosts)
appendHosts := c.generateAppendHosts(serviceList, hosts)
err := c.appendHosts(appendHosts)
if err != nil {
log.Errorf("Failed to add hosts(%s): %v", entryList2String(appendHosts), err)
return err
@@ -64,6 +74,8 @@ func (c *Config) AddServiceNameToHosts(ctx context.Context, serviceInterface v13
func (c *Config) watchServiceToAddHosts(ctx context.Context, serviceInterface v13.ServiceInterface, hosts []Entry) {
ticker := time.NewTicker(time.Second * 15)
defer ticker.Stop()
immediate := make(chan struct{}, 1)
immediate <- struct{}{}
for ctx.Err() == nil {
err := func() error {
@@ -122,19 +134,32 @@ func (c *Config) watchServiceToAddHosts(ctx context.Context, serviceInterface v1
if err != nil {
log.Errorf("Failed to add hosts(%s) to hosts: %v", entryList2String(appendHosts), err)
}
case <-immediate:
var list *v12.ServiceList
list, err = serviceInterface.List(ctx, v1.ListOptions{})
if err != nil {
continue
}
c.Lock.Lock()
appendHosts := c.generateAppendHosts(list.Items, hosts)
err = c.appendHosts(appendHosts)
c.Lock.Unlock()
if err != nil {
log.Errorf("Failed to add hosts(%s) to hosts: %v", entryList2String(appendHosts), err)
}
}
}
}()
if ctx.Err() != nil {
return
}
if err != nil {
if err != nil && !errors.Is(err, context.Canceled) {
log.Error(err)
}
if utilnet.IsConnectionRefused(err) || apierrors.IsTooManyRequests(err) || apierrors.IsForbidden(err) {
time.Sleep(time.Second * 5)
time.Sleep(time.Second * 1)
} else {
time.Sleep(time.Second * 2)
time.Sleep(time.Millisecond * 200)
}
}
}

View File

@@ -443,7 +443,6 @@ func (d *CloneOptions) SyncDir(ctx context.Context, labels string) error {
func() {
defer time.Sleep(time.Second * 2)
util.CheckPodStatus(d.ctx, func() {}, podName, d.targetClientset.CoreV1().Pods(d.TargetNamespace))
sortBy := func(pods []*v1.Pod) sort.Interface { return sort.Reverse(podutils.ActivePods(pods)) }
_, _, _ = polymorphichelpers.GetFirstPod(d.targetClientset.CoreV1(), d.TargetNamespace, labels, time.Second*30, sortBy)
list, err := util.GetRunningPodList(d.ctx, d.targetClientset, d.TargetNamespace, labels)

View File

@@ -2,13 +2,13 @@ package handler
import (
"context"
"encoding/json"
"fmt"
"io"
"math"
"math/rand"
"net"
"net/url"
"os"
"os/exec"
"reflect"
"sort"
@@ -31,7 +31,6 @@ import (
pkgruntime "k8s.io/apimachinery/pkg/runtime"
pkgtypes "k8s.io/apimachinery/pkg/types"
utilnet "k8s.io/apimachinery/pkg/util/net"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/cli-runtime/pkg/resource"
@@ -236,7 +235,7 @@ func (c *ConnectOptions) DoConnect(ctx context.Context, isLite bool) (err error)
return
}
go c.deleteFirewallRule(c.ctx)
log.Debug("Configuring DNS service...")
log.Infof("Configuring DNS service...")
if err = c.setupDNS(c.ctx); err != nil {
log.Errorf("Configure DNS failed: %v", err)
return
@@ -250,18 +249,23 @@ func (c *ConnectOptions) portForward(ctx context.Context, portPair []string) err
var readyChan = make(chan struct{}, 1)
var errChan = make(chan error, 1)
podInterface := c.clientset.CoreV1().Pods(c.Namespace)
var out = log.StandardLogger().WriterLevel(log.DebugLevel)
go func() {
defer out.Close()
var first = pointer.Bool(true)
for c.ctx.Err() == nil {
func() {
defer time.Sleep(time.Second * 2)
defer time.Sleep(time.Millisecond * 200)
sortBy := func(pods []*v1.Pod) sort.Interface { return sort.Reverse(podutils.ActivePods(pods)) }
label := fields.OneTermEqualSelector("app", config.ConfigMapPodTrafficManager).String()
_, _, _ = polymorphichelpers.GetFirstPod(c.clientset.CoreV1(), c.Namespace, label, time.Second*30, sortBy)
_, _, _ = polymorphichelpers.GetFirstPod(c.clientset.CoreV1(), c.Namespace, label, time.Second*5, sortBy)
podList, err := c.GetRunningPodList(ctx)
if err != nil {
time.Sleep(time.Second * 2)
log.Errorf("Failed to get running pod: %v", err)
if *first {
errChan <- err
}
return
}
childCtx, cancelFunc := context.WithCancel(ctx)
@@ -270,13 +274,6 @@ func (c *ConnectOptions) portForward(ctx context.Context, portPair []string) err
readyChan = nil
}
podName := podList[0].GetName()
// if port-forward occurs error, check pod is deleted or not, speed up fail
utilruntime.ErrorHandlers = []func(error){func(err error) {
if !strings.Contains(err.Error(), "an error occurred forwarding") {
log.Debugf("Port-forward occurs error, err: %v, retrying", err)
cancelFunc()
}
}}
// try to detect pod is delete event, if pod is deleted, needs to redo port-forward
go util.CheckPodStatus(childCtx, cancelFunc, podName, podInterface)
err = util.PortForwardPod(
@@ -287,6 +284,8 @@ func (c *ConnectOptions) portForward(ctx context.Context, portPair []string) err
portPair,
readyChan,
childCtx.Done(),
out,
out,
)
if *first {
errChan <- err
@@ -294,15 +293,14 @@ func (c *ConnectOptions) portForward(ctx context.Context, portPair []string) err
first = pointer.Bool(false)
// exit normal, let context.err to judge to exit or not
if err == nil {
log.Errorf("Port forward retrying")
return
}
if strings.Contains(err.Error(), "unable to listen on any of the requested ports") ||
strings.Contains(err.Error(), "address already in use") {
log.Errorf("Port %s already in use, needs to release it manually", portPair)
time.Sleep(time.Second * 1)
} else {
log.Debugf("Port-forward occurs error, err: %v, retrying", err)
time.Sleep(time.Millisecond * 500)
log.Errorf("Port-forward occurs error: %v", err)
}
}()
}
@@ -311,7 +309,7 @@ func (c *ConnectOptions) portForward(ctx context.Context, portPair []string) err
defer ticker.Stop()
select {
case <-ticker.C:
return errors.New("port forward timeout")
return errors.New("wait port forward to be ready timeout")
case err := <-errChan:
return err
case <-readyChan:
@@ -320,50 +318,85 @@ func (c *ConnectOptions) portForward(ctx context.Context, portPair []string) err
}
func (c *ConnectOptions) startLocalTunServe(ctx context.Context, forwardAddress string, lite bool) (err error) {
var list = sets.New[string]()
log.Debugf("IPv4: %s, IPv6: %s", c.localTunIPv4.IP.String(), c.localTunIPv6.IP.String())
var cidrList []*net.IPNet
if !lite {
list.Insert(config.CIDR.String())
cidrList = append(cidrList, config.CIDR)
}
for _, ipNet := range c.cidrs {
list.Insert(ipNet.String())
cidrList = append(cidrList, ipNet)
}
// add extra-cidr
for _, s := range c.ExtraRouteInfo.ExtraCIDR {
_, _, err = net.ParseCIDR(s)
var ipnet *net.IPNet
_, ipnet, err = net.ParseCIDR(s)
if err != nil {
return fmt.Errorf("invalid extra-cidr %s, err: %v", s, err)
}
list.Insert(s)
cidrList = append(cidrList, ipnet)
}
var routes []types.Route
for _, ipNet := range util.RemoveLargerOverlappingCIDRs(cidrList) {
routes = append(routes, types.Route{Dst: *ipNet})
}
tunConfig := tun.Config{
Addr: c.localTunIPv4.String(),
Routes: routes,
}
if enable, _ := util.IsIPv6Enabled(); enable {
if err = os.Setenv(config.EnvInboundPodTunIPv6, c.localTunIPv6.String()); err != nil {
return err
}
tunConfig.Addr6 = c.localTunIPv6.String()
}
r := core.Route{
ServeNodes: []string{
fmt.Sprintf("tun:/127.0.0.1:8422?net=%s&route=%s&%s=%s",
c.localTunIPv4.String(),
strings.Join(list.UnsortedList(), ","),
config.ConfigKubeVPNTransportEngine,
string(c.Engine),
),
},
ChainNode: forwardAddress,
Retries: 5,
}
log.Debugf("IPv4: %s, IPv6: %s", c.localTunIPv4.IP.String(), c.localTunIPv6.IP.String())
servers, err := Parse(r)
localNode := fmt.Sprintf("tun:/127.0.0.1:8422")
node, err := core.ParseNode(localNode)
if err != nil {
log.Errorf("Parse route error: %v", err)
log.Errorf("Failed to parse local node %s: %v", localNode, err)
return err
}
node.Values.Add(config.ConfigKubeVPNTransportEngine, string(c.Engine))
chainNode, err := core.ParseNode(forwardAddress)
if err != nil {
log.Errorf("Failed to parse forward node %s: %v", forwardAddress, err)
return err
}
chainNode.Client = &core.Client{
Connector: core.UDPOverTCPTunnelConnector(),
Transporter: core.TCPTransporter(),
}
chain := core.NewChain(5, chainNode)
handler := core.TunHandler(chain, node)
listener, err := tun.Listener(tunConfig)
if err != nil {
log.Errorf("Failed to create tun listener: %v", err)
return err
}
server := core.Server{
Listener: listener,
Handler: handler,
}
go func() {
err = Run(ctx, servers)
if err != nil && !errors.Is(err, context.Canceled) {
log.Errorf("Failed to run local tun service: %v", err)
defer server.Listener.Close()
go func() {
<-ctx.Done()
server.Listener.Close()
}()
for ctx.Err() == nil {
conn, err := server.Listener.Accept()
if err != nil {
if !errors.Is(err, tun.ClosedErr) {
log.Errorf("Failed to accept local tun conn: %v", err)
}
return
}
go server.Handler.Handle(ctx, conn)
}
}()
log.Info("Connected tunnel")
@@ -421,9 +454,9 @@ func (c *ConnectOptions) addRouteDynamic(ctx context.Context) error {
return err
}()
if utilnet.IsConnectionRefused(err) || apierrors.IsTooManyRequests(err) || apierrors.IsForbidden(err) {
time.Sleep(time.Second * 5)
time.Sleep(time.Second * 1)
} else {
time.Sleep(time.Second * 2)
time.Sleep(time.Millisecond * 200)
}
}
}()
@@ -443,9 +476,9 @@ func (c *ConnectOptions) addRouteDynamic(ctx context.Context) error {
return err
}()
if utilnet.IsConnectionRefused(err) || apierrors.IsTooManyRequests(err) || apierrors.IsForbidden(err) {
time.Sleep(time.Second * 5)
time.Sleep(time.Second * 1)
} else {
time.Sleep(time.Second * 2)
time.Sleep(time.Millisecond * 200)
}
}
}()
@@ -469,6 +502,7 @@ func (c *ConnectOptions) setupDNS(ctx context.Context) error {
log.Errorf("Get running pod list failed, err: %v", err)
return err
}
log.Debugf("Get DNS service IP from pod...")
relovConf, err := util.GetDNSServiceIPFromPod(ctx, c.clientset, c.config, pod[0].GetName(), c.Namespace)
if err != nil {
log.Errorln(err)
@@ -477,6 +511,9 @@ func (c *ConnectOptions) setupDNS(ctx context.Context) error {
if relovConf.Port == "" {
relovConf.Port = strconv.Itoa(port)
}
marshal, _ := json.Marshal(relovConf)
log.Debugf("Get DNS service config: %v", string(marshal))
svc, err := c.clientset.CoreV1().Services(c.Namespace).Get(ctx, config.ConfigMapPodTrafficManager, metav1.GetOptions{})
if err != nil {
return err
@@ -521,9 +558,11 @@ func (c *ConnectOptions) setupDNS(ctx context.Context) error {
Hosts: c.extraHost,
Lock: c.Lock,
}
log.Debugf("Setup DNS...")
if err = c.dnsConfig.SetupDNS(ctx); err != nil {
return err
}
log.Debugf("Dump service in namespace %s into hosts...", c.Namespace)
// dump service in current namespace for support DNS resolve service:port
err = c.dnsConfig.AddServiceNameToHosts(ctx, c.clientset.CoreV1().Services(c.Namespace), c.extraHost...)
return err
@@ -714,7 +753,7 @@ func (c *ConnectOptions) getCIDR(ctx context.Context, m *dhcp.Manager) (err erro
for _, s := range strings.Split(value, " ") {
_, cidr, _ := net.ParseCIDR(s)
if cidr != nil {
c.cidrs = util.Deduplicate(append(c.cidrs, cidr))
c.cidrs = util.RemoveLargerOverlappingCIDRs(append(c.cidrs, cidr))
}
}
if len(c.cidrs) != 0 {
@@ -734,7 +773,7 @@ func (c *ConnectOptions) getCIDR(ctx context.Context, m *dhcp.Manager) (err erro
for _, cidr := range cidrs {
s.Insert(cidr.String())
}
c.cidrs = util.Deduplicate(append(c.cidrs, cidrs...))
c.cidrs = util.RemoveLargerOverlappingCIDRs(append(c.cidrs, cidrs...))
_ = m.Set(ctx, config.KeyClusterIPv4POOLS, strings.Join(s.UnsortedList(), " "))
return
}

View File

@@ -30,6 +30,7 @@ import (
"k8s.io/client-go/util/homedir"
"k8s.io/kubectl/pkg/cmd/util"
"k8s.io/utils/pointer"
"k8s.io/utils/ptr"
"github.com/wencaiwulue/kubevpn/v2/pkg/config"
"github.com/wencaiwulue/kubevpn/v2/pkg/daemon/rpc"
@@ -134,24 +135,33 @@ func DialSshRemote(ctx context.Context, conf *SshConfig) (remote *ssh.Client, er
// ref: https://github.com/golang/go/issues/21478
if err == nil {
go func() {
defer remote.Close()
for ctx.Err() == nil {
time.Sleep(time.Second * 15)
_, _, err := remote.SendRequest("keepalive@golang.org", true, nil)
if err == nil || err.Error() == "request failed" {
// Any response is a success.
continue
}
if err != nil {
return
}
}
}()
//go func() {
// err2 := keepAlive(remote, conn, ctx.Done())
// if err2 != nil {
// log.Debugf("Failed to send keep-alive request: %v", err2)
// }
//}()
}
return remote, err
}
func keepAlive(cl *ssh.Client, conn net.Conn, done <-chan struct{}) error {
const keepAliveInterval = time.Second * 10
t := time.NewTicker(keepAliveInterval)
defer t.Stop()
for {
select {
case <-t.C:
_, _, err := cl.SendRequest("keepalive@golang.org", true, nil)
if err != nil && err != io.EOF {
return errors.Wrap(err, "failed to send keep alive")
}
case <-done:
return nil
}
}
}
func (config SshConfig) GetAuth() ([]ssh.AuthMethod, error) {
host, _, _ := net.SplitHostPort(config.Addr)
var auth []ssh.AuthMethod
@@ -384,9 +394,10 @@ func GetBastion(name string, defaultValue SshConfig) SshConfig {
}
func (config SshConfig) Dial(ctx context.Context) (client *ssh.Client, err error) {
if strings.Index(config.Addr, ":") < 0 {
if _, _, err = net.SplitHostPort(config.Addr); err != nil {
// use default ssh port 22
config.Addr = net.JoinHostPort(config.Addr, "22")
err = nil
}
// connect to the bastion host
authMethod, err := config.GetAuth()
@@ -418,9 +429,10 @@ func (config SshConfig) Dial(ctx context.Context) (client *ssh.Client, err error
}
func JumpTo(ctx context.Context, bClient *ssh.Client, to SshConfig) (client *ssh.Client, err error) {
if strings.Index(to.Addr, ":") < 0 {
if _, _, err = net.SplitHostPort(to.Addr); err != nil {
// use default ssh port 22
to.Addr = net.JoinHostPort(to.Addr, "22")
err = nil
}
var authMethod []ssh.AuthMethod
@@ -508,43 +520,40 @@ func init() {
func PortMapUntil(ctx context.Context, conf *SshConfig, remote, local netip.AddrPort) error {
// Listen on remote server port
var lc net.ListenConfig
localListen, err := lc.Listen(ctx, "tcp", local.String())
if err != nil {
return err
localListen, e := lc.Listen(ctx, "tcp", local.String())
if e != nil {
return e
}
log.Debugf("SSH listening on local %s forward to %s", local.String(), remote.String())
go func() {
defer localListen.Close()
go func() {
<-ctx.Done()
localListen.Close()
}()
for ctx.Err() == nil {
localConn, err := localListen.Accept()
if err != nil {
if !errors.Is(err, net.ErrClosed) {
log.Errorf("Failed to accept conn: %v", err)
}
return
localConn, err1 := localListen.Accept()
if err1 != nil {
log.Debugf("Failed to accept ssh conn: %v", err1)
continue
}
go func() {
defer localConn.Close()
cCtx, cancelFunc := context.WithCancel(ctx)
defer cancelFunc()
sshClient, err := DialSshRemote(ctx, conf)
sshClient, err := DialSshRemote(cCtx, conf)
if err != nil {
marshal, _ := json.Marshal(conf)
log.Debugf("Failed to dial remote ssh server %v : %v", string(marshal), err)
log.Debugf("Failed to dial remote ssh server %v: %v", string(marshal), err)
return
}
defer sshClient.Close()
remoteConn, err := sshClient.DialContext(ctx, "tcp", remote.String())
remoteConn, err := sshClient.DialContext(cCtx, "tcp", remote.String())
if err != nil {
log.Debugf("Failed to dial %s: %s", remote.String(), err)
return
}
defer remoteConn.Close()
copyStream(ctx, localConn, remoteConn)
copyStream(cCtx, localConn, remoteConn)
}()
}
}()
@@ -706,6 +715,9 @@ func SshJump(ctx context.Context, conf *SshConfig, flags *pflag.FlagSet, print b
if print {
log.Infof("Waiting jump to bastion host...")
log.Debugf("Root daemon jumping to ssh host for kubeconfig %s ...", ptr.Deref(configFlags.KubeConfig, ""))
} else {
log.Debugf("User daemon jumping to ssh host for kubeconfig %s ...", ptr.Deref(configFlags.KubeConfig, ""))
}
err = PortMapUntil(ctx, conf, remote, local)
if err != nil {
@@ -747,6 +759,9 @@ func SshJump(ctx context.Context, conf *SshConfig, flags *pflag.FlagSet, print b
if print {
msg := fmt.Sprintf("To use: export KUBECONFIG=%s", temp.Name())
PrintLine(log.Info, msg)
log.Debugf("Root daemon jump ssh bastion host with kubeconfig: %s", temp.Name())
} else {
log.Debugf("User daemon jump ssh bastion host with kubeconfig: %s", temp.Name())
}
path = temp.Name()
return

View File

@@ -13,6 +13,8 @@ import (
"github.com/wencaiwulue/kubevpn/v2/pkg/config"
)
var ClosedErr = errors.New("accept on closed listener")
// Config is the config for TUN device.
type Config struct {
Name string
@@ -56,9 +58,8 @@ func (l *tunListener) Accept() (net.Conn, error) {
case conn := <-l.conns:
return conn, nil
case <-l.closed:
return nil, ClosedErr
}
return nil, errors.New("accept on closed listener")
}
func (l *tunListener) Addr() net.Addr {

View File

@@ -68,7 +68,7 @@ func GetCIDRElegant(ctx context.Context, clientset *kubernetes.Clientset, restco
result = append(result, pod...)
}
result = Deduplicate(result)
result = RemoveLargerOverlappingCIDRs(result)
if len(result) == 0 {
err = fmt.Errorf("failed to get any network CIDR, please verify that you have the necessary permissions")
return nil, err
@@ -201,7 +201,7 @@ func GetCIDRByDumpClusterInfo(ctx context.Context, clientset *kubernetes.Clients
for _, s := range list {
result = append(result, ParseCIDRFromString(s)...)
}
return Deduplicate(result), nil
return RemoveLargerOverlappingCIDRs(result), nil
}
// GetCIDRFromCNI kube-controller-manager--allocate-node-cidrs=true--authentication-kubeconfig=/etc/kubernetes/controller-manager.conf--authorization-kubeconfig=/etc/kubernetes/controller-manager.conf--bind-address=0.0.0.0--client-ca-file=/etc/kubernetes/ssl/ca.crt--cluster-cidr=10.233.64.0/18--cluster-name=cluster.local--cluster-signing-cert-file=/etc/kubernetes/ssl/ca.crt--cluster-signing-key-file=/etc/kubernetes/ssl/ca.key--configure-cloud-routes=false--controllers=*,bootstrapsigner,tokencleaner--kubeconfig=/etc/kubernetes/controller-manager.conf--leader-elect=true--leader-elect-lease-duration=15s--leader-elect-renew-deadline=10s--node-cidr-mask-size=24--node-monitor-grace-period=40s--node-monitor-period=5s--port=0--profiling=False--requestheader-client-ca-file=/etc/kubernetes/ssl/front-proxy-ca.crt--root-ca-file=/etc/kubernetes/ssl/ca.crt--service-account-private-key-file=/etc/kubernetes/ssl/sa.key--service-cluster-ip-range=10.233.0.0/18--terminated-pod-gc-threshold=12500--use-service-account-credentials=true
@@ -221,7 +221,7 @@ func GetCIDRFromCNI(ctx context.Context, clientset *kubernetes.Clientset, restco
var result []*net.IPNet
for _, s := range strings.Split(content, "\n") {
result = Deduplicate(append(result, ParseCIDRFromString(s)...))
result = RemoveLargerOverlappingCIDRs(append(result, ParseCIDRFromString(s)...))
}
return result, nil

View File

@@ -3,8 +3,10 @@ package util
import (
"fmt"
"path/filepath"
"runtime"
log "github.com/sirupsen/logrus"
"k8s.io/utils/ptr"
)
func InitLoggerForClient(debug bool) {
@@ -45,11 +47,12 @@ type serverFormat struct {
// same like log.SetFlags(log.LstdFlags | log.Lshortfile)
// 2009/01/23 01:23:23 d.go:23: message
func (*serverFormat) Format(e *log.Entry) ([]byte, error) {
// e.Caller maybe is nil, because pkg/handler/connect.go:252
return []byte(
fmt.Sprintf("%s %s:%d %s: %s\n",
e.Time.Format("2006-01-02 15:04:05"),
filepath.Base(e.Caller.File),
e.Caller.Line,
filepath.Base(ptr.Deref(e.Caller, runtime.Frame{}).File),
ptr.Deref(e.Caller, runtime.Frame{}).Line,
e.Level.String(),
e.Message,
)), nil

View File

@@ -17,6 +17,7 @@ import (
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/fields"
@@ -133,7 +134,7 @@ func WaitPod(ctx context.Context, podInterface v12.PodInterface, list v1.ListOpt
}
}
func PortForwardPod(config *rest.Config, clientset *rest.RESTClient, podName, namespace string, portPair []string, readyChan chan struct{}, stopChan <-chan struct{}) error {
func PortForwardPod(config *rest.Config, clientset *rest.RESTClient, podName, namespace string, portPair []string, readyChan chan struct{}, stopChan <-chan struct{}, out, errOut io.Writer) error {
err := os.Setenv(string(util.RemoteCommandWebsockets), "true")
if err != nil {
return err
@@ -161,7 +162,7 @@ func PortForwardPod(config *rest.Config, clientset *rest.RESTClient, podName, na
}
dialer = portforward.NewFallbackDialer(websocketDialer, dialer, httpstream.IsUpgradeFailure)
}
forwarder, err := portforward.New(dialer, portPair, stopChan, readyChan, nil, os.Stderr)
forwarder, err := portforward.New(dialer, portPair, stopChan, readyChan, out, errOut)
if err != nil {
log.Errorf("Create port forward error: %s", err.Error())
return err
@@ -325,39 +326,58 @@ func FindContainerByName(pod *corev1.Pod, name string) (*corev1.Container, int)
return nil, -1
}
func CheckPodStatus(cCtx context.Context, cFunc context.CancelFunc, podName string, podInterface v12.PodInterface) {
w, err := podInterface.Watch(cCtx, v1.ListOptions{
FieldSelector: fields.OneTermEqualSelector("metadata.name", podName).String(),
})
if err != nil {
return
}
defer w.Stop()
func CheckPodStatus(ctx context.Context, cancelFunc context.CancelFunc, podName string, podInterface v12.PodInterface) {
for ctx.Err() == nil {
func() {
defer time.Sleep(time.Millisecond * 200)
_, err = podInterface.Get(cCtx, podName, v1.GetOptions{})
if err != nil {
return
}
for {
select {
case e, ok := <-w.ResultChan():
if !ok {
w, err := podInterface.Watch(ctx, v1.ListOptions{
FieldSelector: fields.OneTermEqualSelector("metadata.name", podName).String(),
})
if err != nil {
if !k8serrors.IsForbidden(err) && !errors.Is(err, context.Canceled) {
log.Errorf("Failed to watch Pod %s: %v", podName, err)
cancelFunc()
}
return
}
switch e.Type {
case watch.Deleted:
cFunc()
return
case watch.Error:
return
case watch.Added, watch.Modified, watch.Bookmark:
// do nothing
default:
defer w.Stop()
_, err = podInterface.Get(ctx, podName, v1.GetOptions{})
if err != nil {
if !k8serrors.IsForbidden(err) && !errors.Is(err, context.Canceled) {
log.Errorf("Failed to get Pod %s: %v", podName, err)
cancelFunc()
}
return
}
case <-cCtx.Done():
return
}
select {
case e, ok := <-w.ResultChan():
if !ok {
_, err = podInterface.Get(ctx, podName, v1.GetOptions{})
if err != nil && !errors.Is(err, context.Canceled) {
log.Errorf("Failed to get Pod %s: %v", podName, err)
cancelFunc()
}
return
}
switch e.Type {
case watch.Deleted:
log.Errorf("Pod %s is deleted", podName)
cancelFunc()
return
case watch.Error:
_, err = podInterface.Get(ctx, podName, v1.GetOptions{})
if err != nil && !errors.Is(err, context.Canceled) {
log.Errorf("Failed to get Pod %s: %v", podName, err)
cancelFunc()
}
return
case watch.Added, watch.Modified, watch.Bookmark:
// do nothing
}
}
}()
}
}

View File

@@ -12,6 +12,7 @@ import (
osexec "os/exec"
"path/filepath"
"runtime"
"sort"
"strings"
"syscall"
"time"
@@ -219,18 +220,32 @@ func GetTlsDomain(ns string) string {
return config.ConfigMapPodTrafficManager + "." + ns + "." + "svc"
}
func Deduplicate(cidr []*net.IPNet) (result []*net.IPNet) {
var set = sets.New[string]()
for _, ipNet := range cidr {
if ipNet == nil {
func RemoveLargerOverlappingCIDRs(cidrNets []*net.IPNet) []*net.IPNet {
sort.Slice(cidrNets, func(i, j int) bool {
onesI, _ := cidrNets[i].Mask.Size()
onesJ, _ := cidrNets[j].Mask.Size()
return onesI > onesJ
})
var cidrsOverlap = func(cidr1, cidr2 *net.IPNet) bool {
return cidr1.Contains(cidr2.IP) || cidr2.Contains(cidr1.IP)
}
var result []*net.IPNet
skipped := make(map[int]bool)
for i := range cidrNets {
if skipped[i] {
continue
}
if !set.Has(ipNet.String()) {
result = append(result, ipNet)
for j := i + 1; j < len(cidrNets); j++ {
if cidrsOverlap(cidrNets[i], cidrNets[j]) {
skipped[j] = true
}
}
set.Insert(ipNet.String())
result = append(result, cidrNets[i])
}
return
return result
}
func CleanExtensionLib() {