diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go index 21a1192a..2d957af0 100644 --- a/pkg/daemon/daemon.go +++ b/pkg/daemon/daemon.go @@ -50,6 +50,12 @@ func (o *SvrOption) Start(ctx context.Context) error { LocalTime: true, Compress: false, } + + // for gssapi to lookup KDCs in DNS + // c.LibDefaults.DNSLookupKDC = true + // c.LibDefaults.DNSLookupRealm = true + net.DefaultResolver.PreferGo = true + util.InitLoggerForServer(true) log.SetOutput(l) klog.SetOutput(l) diff --git a/pkg/dns/dns.go b/pkg/dns/dns.go index f26aa7ff..f0d1f2c1 100644 --- a/pkg/dns/dns.go +++ b/pkg/dns/dns.go @@ -42,16 +42,26 @@ type Config struct { } func (c *Config) AddServiceNameToHosts(ctx context.Context, serviceInterface v13.ServiceInterface, hosts ...Entry) error { - list, err := serviceInterface.List(ctx, v1.ListOptions{}) - if err != nil { - return err - } + var serviceList []v12.Service + //listOptions := v1.ListOptions{Limit: 100} + //for { + // services, err := serviceInterface.List(ctx, listOptions) + // if err != nil { + // break + // } + // serviceList = append(serviceList, services.Items...) + // if services.Continue != "" { + // listOptions.Continue = services.Continue + // } else { + // break + // } + //} c.Lock.Lock() defer c.Lock.Unlock() - appendHosts := c.generateAppendHosts(list.Items, hosts) - err = c.appendHosts(appendHosts) + appendHosts := c.generateAppendHosts(serviceList, hosts) + err := c.appendHosts(appendHosts) if err != nil { log.Errorf("Failed to add hosts(%s): %v", entryList2String(appendHosts), err) return err @@ -64,6 +74,8 @@ func (c *Config) AddServiceNameToHosts(ctx context.Context, serviceInterface v13 func (c *Config) watchServiceToAddHosts(ctx context.Context, serviceInterface v13.ServiceInterface, hosts []Entry) { ticker := time.NewTicker(time.Second * 15) defer ticker.Stop() + immediate := make(chan struct{}, 1) + immediate <- struct{}{} for ctx.Err() == nil { err := func() error { @@ -122,19 +134,32 @@ func (c *Config) watchServiceToAddHosts(ctx context.Context, serviceInterface v1 if err != nil { log.Errorf("Failed to add hosts(%s) to hosts: %v", entryList2String(appendHosts), err) } + case <-immediate: + var list *v12.ServiceList + list, err = serviceInterface.List(ctx, v1.ListOptions{}) + if err != nil { + continue + } + c.Lock.Lock() + appendHosts := c.generateAppendHosts(list.Items, hosts) + err = c.appendHosts(appendHosts) + c.Lock.Unlock() + if err != nil { + log.Errorf("Failed to add hosts(%s) to hosts: %v", entryList2String(appendHosts), err) + } } } }() if ctx.Err() != nil { return } - if err != nil { + if err != nil && !errors.Is(err, context.Canceled) { log.Error(err) } if utilnet.IsConnectionRefused(err) || apierrors.IsTooManyRequests(err) || apierrors.IsForbidden(err) { - time.Sleep(time.Second * 5) + time.Sleep(time.Second * 1) } else { - time.Sleep(time.Second * 2) + time.Sleep(time.Millisecond * 200) } } } diff --git a/pkg/handler/clone.go b/pkg/handler/clone.go index dadd3e5f..0cd2fada 100644 --- a/pkg/handler/clone.go +++ b/pkg/handler/clone.go @@ -443,7 +443,6 @@ func (d *CloneOptions) SyncDir(ctx context.Context, labels string) error { func() { defer time.Sleep(time.Second * 2) - util.CheckPodStatus(d.ctx, func() {}, podName, d.targetClientset.CoreV1().Pods(d.TargetNamespace)) sortBy := func(pods []*v1.Pod) sort.Interface { return sort.Reverse(podutils.ActivePods(pods)) } _, _, _ = polymorphichelpers.GetFirstPod(d.targetClientset.CoreV1(), d.TargetNamespace, labels, time.Second*30, sortBy) list, err := util.GetRunningPodList(d.ctx, d.targetClientset, d.TargetNamespace, labels) diff --git a/pkg/handler/connect.go b/pkg/handler/connect.go index 965c838b..b420ec53 100644 --- a/pkg/handler/connect.go +++ b/pkg/handler/connect.go @@ -2,13 +2,13 @@ package handler import ( "context" + "encoding/json" "fmt" "io" "math" "math/rand" "net" "net/url" - "os" "os/exec" "reflect" "sort" @@ -31,7 +31,6 @@ import ( pkgruntime "k8s.io/apimachinery/pkg/runtime" pkgtypes "k8s.io/apimachinery/pkg/types" utilnet "k8s.io/apimachinery/pkg/util/net" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/cli-runtime/pkg/resource" @@ -236,7 +235,7 @@ func (c *ConnectOptions) DoConnect(ctx context.Context, isLite bool) (err error) return } go c.deleteFirewallRule(c.ctx) - log.Debug("Configuring DNS service...") + log.Infof("Configuring DNS service...") if err = c.setupDNS(c.ctx); err != nil { log.Errorf("Configure DNS failed: %v", err) return @@ -250,18 +249,23 @@ func (c *ConnectOptions) portForward(ctx context.Context, portPair []string) err var readyChan = make(chan struct{}, 1) var errChan = make(chan error, 1) podInterface := c.clientset.CoreV1().Pods(c.Namespace) + var out = log.StandardLogger().WriterLevel(log.DebugLevel) go func() { + defer out.Close() var first = pointer.Bool(true) for c.ctx.Err() == nil { func() { - defer time.Sleep(time.Second * 2) + defer time.Sleep(time.Millisecond * 200) sortBy := func(pods []*v1.Pod) sort.Interface { return sort.Reverse(podutils.ActivePods(pods)) } label := fields.OneTermEqualSelector("app", config.ConfigMapPodTrafficManager).String() - _, _, _ = polymorphichelpers.GetFirstPod(c.clientset.CoreV1(), c.Namespace, label, time.Second*30, sortBy) + _, _, _ = polymorphichelpers.GetFirstPod(c.clientset.CoreV1(), c.Namespace, label, time.Second*5, sortBy) podList, err := c.GetRunningPodList(ctx) if err != nil { - time.Sleep(time.Second * 2) + log.Errorf("Failed to get running pod: %v", err) + if *first { + errChan <- err + } return } childCtx, cancelFunc := context.WithCancel(ctx) @@ -270,13 +274,6 @@ func (c *ConnectOptions) portForward(ctx context.Context, portPair []string) err readyChan = nil } podName := podList[0].GetName() - // if port-forward occurs error, check pod is deleted or not, speed up fail - utilruntime.ErrorHandlers = []func(error){func(err error) { - if !strings.Contains(err.Error(), "an error occurred forwarding") { - log.Debugf("Port-forward occurs error, err: %v, retrying", err) - cancelFunc() - } - }} // try to detect pod is delete event, if pod is deleted, needs to redo port-forward go util.CheckPodStatus(childCtx, cancelFunc, podName, podInterface) err = util.PortForwardPod( @@ -287,6 +284,8 @@ func (c *ConnectOptions) portForward(ctx context.Context, portPair []string) err portPair, readyChan, childCtx.Done(), + out, + out, ) if *first { errChan <- err @@ -294,15 +293,14 @@ func (c *ConnectOptions) portForward(ctx context.Context, portPair []string) err first = pointer.Bool(false) // exit normal, let context.err to judge to exit or not if err == nil { + log.Errorf("Port forward retrying") return } if strings.Contains(err.Error(), "unable to listen on any of the requested ports") || strings.Contains(err.Error(), "address already in use") { log.Errorf("Port %s already in use, needs to release it manually", portPair) - time.Sleep(time.Second * 1) } else { - log.Debugf("Port-forward occurs error, err: %v, retrying", err) - time.Sleep(time.Millisecond * 500) + log.Errorf("Port-forward occurs error: %v", err) } }() } @@ -311,7 +309,7 @@ func (c *ConnectOptions) portForward(ctx context.Context, portPair []string) err defer ticker.Stop() select { case <-ticker.C: - return errors.New("port forward timeout") + return errors.New("wait port forward to be ready timeout") case err := <-errChan: return err case <-readyChan: @@ -320,50 +318,85 @@ func (c *ConnectOptions) portForward(ctx context.Context, portPair []string) err } func (c *ConnectOptions) startLocalTunServe(ctx context.Context, forwardAddress string, lite bool) (err error) { - var list = sets.New[string]() + log.Debugf("IPv4: %s, IPv6: %s", c.localTunIPv4.IP.String(), c.localTunIPv6.IP.String()) + + var cidrList []*net.IPNet if !lite { - list.Insert(config.CIDR.String()) + cidrList = append(cidrList, config.CIDR) } for _, ipNet := range c.cidrs { - list.Insert(ipNet.String()) + cidrList = append(cidrList, ipNet) } // add extra-cidr for _, s := range c.ExtraRouteInfo.ExtraCIDR { - _, _, err = net.ParseCIDR(s) + var ipnet *net.IPNet + _, ipnet, err = net.ParseCIDR(s) if err != nil { return fmt.Errorf("invalid extra-cidr %s, err: %v", s, err) } - list.Insert(s) + cidrList = append(cidrList, ipnet) + } + + var routes []types.Route + for _, ipNet := range util.RemoveLargerOverlappingCIDRs(cidrList) { + routes = append(routes, types.Route{Dst: *ipNet}) + } + + tunConfig := tun.Config{ + Addr: c.localTunIPv4.String(), + Routes: routes, } if enable, _ := util.IsIPv6Enabled(); enable { - if err = os.Setenv(config.EnvInboundPodTunIPv6, c.localTunIPv6.String()); err != nil { - return err - } + tunConfig.Addr6 = c.localTunIPv6.String() } - r := core.Route{ - ServeNodes: []string{ - fmt.Sprintf("tun:/127.0.0.1:8422?net=%s&route=%s&%s=%s", - c.localTunIPv4.String(), - strings.Join(list.UnsortedList(), ","), - config.ConfigKubeVPNTransportEngine, - string(c.Engine), - ), - }, - ChainNode: forwardAddress, - Retries: 5, - } - - log.Debugf("IPv4: %s, IPv6: %s", c.localTunIPv4.IP.String(), c.localTunIPv6.IP.String()) - servers, err := Parse(r) + localNode := fmt.Sprintf("tun:/127.0.0.1:8422") + node, err := core.ParseNode(localNode) if err != nil { - log.Errorf("Parse route error: %v", err) + log.Errorf("Failed to parse local node %s: %v", localNode, err) return err } + node.Values.Add(config.ConfigKubeVPNTransportEngine, string(c.Engine)) + + chainNode, err := core.ParseNode(forwardAddress) + if err != nil { + log.Errorf("Failed to parse forward node %s: %v", forwardAddress, err) + return err + } + chainNode.Client = &core.Client{ + Connector: core.UDPOverTCPTunnelConnector(), + Transporter: core.TCPTransporter(), + } + chain := core.NewChain(5, chainNode) + + handler := core.TunHandler(chain, node) + listener, err := tun.Listener(tunConfig) + if err != nil { + log.Errorf("Failed to create tun listener: %v", err) + return err + } + + server := core.Server{ + Listener: listener, + Handler: handler, + } + go func() { - err = Run(ctx, servers) - if err != nil && !errors.Is(err, context.Canceled) { - log.Errorf("Failed to run local tun service: %v", err) + defer server.Listener.Close() + go func() { + <-ctx.Done() + server.Listener.Close() + }() + + for ctx.Err() == nil { + conn, err := server.Listener.Accept() + if err != nil { + if !errors.Is(err, tun.ClosedErr) { + log.Errorf("Failed to accept local tun conn: %v", err) + } + return + } + go server.Handler.Handle(ctx, conn) } }() log.Info("Connected tunnel") @@ -421,9 +454,9 @@ func (c *ConnectOptions) addRouteDynamic(ctx context.Context) error { return err }() if utilnet.IsConnectionRefused(err) || apierrors.IsTooManyRequests(err) || apierrors.IsForbidden(err) { - time.Sleep(time.Second * 5) + time.Sleep(time.Second * 1) } else { - time.Sleep(time.Second * 2) + time.Sleep(time.Millisecond * 200) } } }() @@ -443,9 +476,9 @@ func (c *ConnectOptions) addRouteDynamic(ctx context.Context) error { return err }() if utilnet.IsConnectionRefused(err) || apierrors.IsTooManyRequests(err) || apierrors.IsForbidden(err) { - time.Sleep(time.Second * 5) + time.Sleep(time.Second * 1) } else { - time.Sleep(time.Second * 2) + time.Sleep(time.Millisecond * 200) } } }() @@ -469,6 +502,7 @@ func (c *ConnectOptions) setupDNS(ctx context.Context) error { log.Errorf("Get running pod list failed, err: %v", err) return err } + log.Debugf("Get DNS service IP from pod...") relovConf, err := util.GetDNSServiceIPFromPod(ctx, c.clientset, c.config, pod[0].GetName(), c.Namespace) if err != nil { log.Errorln(err) @@ -477,6 +511,9 @@ func (c *ConnectOptions) setupDNS(ctx context.Context) error { if relovConf.Port == "" { relovConf.Port = strconv.Itoa(port) } + + marshal, _ := json.Marshal(relovConf) + log.Debugf("Get DNS service config: %v", string(marshal)) svc, err := c.clientset.CoreV1().Services(c.Namespace).Get(ctx, config.ConfigMapPodTrafficManager, metav1.GetOptions{}) if err != nil { return err @@ -521,9 +558,11 @@ func (c *ConnectOptions) setupDNS(ctx context.Context) error { Hosts: c.extraHost, Lock: c.Lock, } + log.Debugf("Setup DNS...") if err = c.dnsConfig.SetupDNS(ctx); err != nil { return err } + log.Debugf("Dump service in namespace %s into hosts...", c.Namespace) // dump service in current namespace for support DNS resolve service:port err = c.dnsConfig.AddServiceNameToHosts(ctx, c.clientset.CoreV1().Services(c.Namespace), c.extraHost...) return err @@ -714,7 +753,7 @@ func (c *ConnectOptions) getCIDR(ctx context.Context, m *dhcp.Manager) (err erro for _, s := range strings.Split(value, " ") { _, cidr, _ := net.ParseCIDR(s) if cidr != nil { - c.cidrs = util.Deduplicate(append(c.cidrs, cidr)) + c.cidrs = util.RemoveLargerOverlappingCIDRs(append(c.cidrs, cidr)) } } if len(c.cidrs) != 0 { @@ -734,7 +773,7 @@ func (c *ConnectOptions) getCIDR(ctx context.Context, m *dhcp.Manager) (err erro for _, cidr := range cidrs { s.Insert(cidr.String()) } - c.cidrs = util.Deduplicate(append(c.cidrs, cidrs...)) + c.cidrs = util.RemoveLargerOverlappingCIDRs(append(c.cidrs, cidrs...)) _ = m.Set(ctx, config.KeyClusterIPv4POOLS, strings.Join(s.UnsortedList(), " ")) return } diff --git a/pkg/ssh/ssh.go b/pkg/ssh/ssh.go index d132fe6e..4a0c4088 100644 --- a/pkg/ssh/ssh.go +++ b/pkg/ssh/ssh.go @@ -30,6 +30,7 @@ import ( "k8s.io/client-go/util/homedir" "k8s.io/kubectl/pkg/cmd/util" "k8s.io/utils/pointer" + "k8s.io/utils/ptr" "github.com/wencaiwulue/kubevpn/v2/pkg/config" "github.com/wencaiwulue/kubevpn/v2/pkg/daemon/rpc" @@ -134,24 +135,33 @@ func DialSshRemote(ctx context.Context, conf *SshConfig) (remote *ssh.Client, er // ref: https://github.com/golang/go/issues/21478 if err == nil { - go func() { - defer remote.Close() - for ctx.Err() == nil { - time.Sleep(time.Second * 15) - _, _, err := remote.SendRequest("keepalive@golang.org", true, nil) - if err == nil || err.Error() == "request failed" { - // Any response is a success. - continue - } - if err != nil { - return - } - } - }() + //go func() { + // err2 := keepAlive(remote, conn, ctx.Done()) + // if err2 != nil { + // log.Debugf("Failed to send keep-alive request: %v", err2) + // } + //}() } return remote, err } +func keepAlive(cl *ssh.Client, conn net.Conn, done <-chan struct{}) error { + const keepAliveInterval = time.Second * 10 + t := time.NewTicker(keepAliveInterval) + defer t.Stop() + for { + select { + case <-t.C: + _, _, err := cl.SendRequest("keepalive@golang.org", true, nil) + if err != nil && err != io.EOF { + return errors.Wrap(err, "failed to send keep alive") + } + case <-done: + return nil + } + } +} + func (config SshConfig) GetAuth() ([]ssh.AuthMethod, error) { host, _, _ := net.SplitHostPort(config.Addr) var auth []ssh.AuthMethod @@ -384,9 +394,10 @@ func GetBastion(name string, defaultValue SshConfig) SshConfig { } func (config SshConfig) Dial(ctx context.Context) (client *ssh.Client, err error) { - if strings.Index(config.Addr, ":") < 0 { + if _, _, err = net.SplitHostPort(config.Addr); err != nil { // use default ssh port 22 config.Addr = net.JoinHostPort(config.Addr, "22") + err = nil } // connect to the bastion host authMethod, err := config.GetAuth() @@ -418,9 +429,10 @@ func (config SshConfig) Dial(ctx context.Context) (client *ssh.Client, err error } func JumpTo(ctx context.Context, bClient *ssh.Client, to SshConfig) (client *ssh.Client, err error) { - if strings.Index(to.Addr, ":") < 0 { + if _, _, err = net.SplitHostPort(to.Addr); err != nil { // use default ssh port 22 to.Addr = net.JoinHostPort(to.Addr, "22") + err = nil } var authMethod []ssh.AuthMethod @@ -508,43 +520,40 @@ func init() { func PortMapUntil(ctx context.Context, conf *SshConfig, remote, local netip.AddrPort) error { // Listen on remote server port var lc net.ListenConfig - localListen, err := lc.Listen(ctx, "tcp", local.String()) - if err != nil { - return err + localListen, e := lc.Listen(ctx, "tcp", local.String()) + if e != nil { + return e } + log.Debugf("SSH listening on local %s forward to %s", local.String(), remote.String()) go func() { defer localListen.Close() - go func() { - <-ctx.Done() - localListen.Close() - }() for ctx.Err() == nil { - localConn, err := localListen.Accept() - if err != nil { - if !errors.Is(err, net.ErrClosed) { - log.Errorf("Failed to accept conn: %v", err) - } - return + localConn, err1 := localListen.Accept() + if err1 != nil { + log.Debugf("Failed to accept ssh conn: %v", err1) + continue } go func() { defer localConn.Close() + cCtx, cancelFunc := context.WithCancel(ctx) + defer cancelFunc() - sshClient, err := DialSshRemote(ctx, conf) + sshClient, err := DialSshRemote(cCtx, conf) if err != nil { marshal, _ := json.Marshal(conf) - log.Debugf("Failed to dial remote ssh server %v : %v", string(marshal), err) + log.Debugf("Failed to dial remote ssh server %v: %v", string(marshal), err) return } defer sshClient.Close() - remoteConn, err := sshClient.DialContext(ctx, "tcp", remote.String()) + remoteConn, err := sshClient.DialContext(cCtx, "tcp", remote.String()) if err != nil { log.Debugf("Failed to dial %s: %s", remote.String(), err) return } defer remoteConn.Close() - copyStream(ctx, localConn, remoteConn) + copyStream(cCtx, localConn, remoteConn) }() } }() @@ -706,6 +715,9 @@ func SshJump(ctx context.Context, conf *SshConfig, flags *pflag.FlagSet, print b if print { log.Infof("Waiting jump to bastion host...") + log.Debugf("Root daemon jumping to ssh host for kubeconfig %s ...", ptr.Deref(configFlags.KubeConfig, "")) + } else { + log.Debugf("User daemon jumping to ssh host for kubeconfig %s ...", ptr.Deref(configFlags.KubeConfig, "")) } err = PortMapUntil(ctx, conf, remote, local) if err != nil { @@ -747,6 +759,9 @@ func SshJump(ctx context.Context, conf *SshConfig, flags *pflag.FlagSet, print b if print { msg := fmt.Sprintf("To use: export KUBECONFIG=%s", temp.Name()) PrintLine(log.Info, msg) + log.Debugf("Root daemon jump ssh bastion host with kubeconfig: %s", temp.Name()) + } else { + log.Debugf("User daemon jump ssh bastion host with kubeconfig: %s", temp.Name()) } path = temp.Name() return diff --git a/pkg/tun/tun.go b/pkg/tun/tun.go index b81a8da0..9a6788b1 100644 --- a/pkg/tun/tun.go +++ b/pkg/tun/tun.go @@ -13,6 +13,8 @@ import ( "github.com/wencaiwulue/kubevpn/v2/pkg/config" ) +var ClosedErr = errors.New("accept on closed listener") + // Config is the config for TUN device. type Config struct { Name string @@ -56,9 +58,8 @@ func (l *tunListener) Accept() (net.Conn, error) { case conn := <-l.conns: return conn, nil case <-l.closed: + return nil, ClosedErr } - - return nil, errors.New("accept on closed listener") } func (l *tunListener) Addr() net.Addr { diff --git a/pkg/util/cidr.go b/pkg/util/cidr.go index da7312ea..c10f8f84 100644 --- a/pkg/util/cidr.go +++ b/pkg/util/cidr.go @@ -68,7 +68,7 @@ func GetCIDRElegant(ctx context.Context, clientset *kubernetes.Clientset, restco result = append(result, pod...) } - result = Deduplicate(result) + result = RemoveLargerOverlappingCIDRs(result) if len(result) == 0 { err = fmt.Errorf("failed to get any network CIDR, please verify that you have the necessary permissions") return nil, err @@ -201,7 +201,7 @@ func GetCIDRByDumpClusterInfo(ctx context.Context, clientset *kubernetes.Clients for _, s := range list { result = append(result, ParseCIDRFromString(s)...) } - return Deduplicate(result), nil + return RemoveLargerOverlappingCIDRs(result), nil } // GetCIDRFromCNI kube-controller-manager--allocate-node-cidrs=true--authentication-kubeconfig=/etc/kubernetes/controller-manager.conf--authorization-kubeconfig=/etc/kubernetes/controller-manager.conf--bind-address=0.0.0.0--client-ca-file=/etc/kubernetes/ssl/ca.crt--cluster-cidr=10.233.64.0/18--cluster-name=cluster.local--cluster-signing-cert-file=/etc/kubernetes/ssl/ca.crt--cluster-signing-key-file=/etc/kubernetes/ssl/ca.key--configure-cloud-routes=false--controllers=*,bootstrapsigner,tokencleaner--kubeconfig=/etc/kubernetes/controller-manager.conf--leader-elect=true--leader-elect-lease-duration=15s--leader-elect-renew-deadline=10s--node-cidr-mask-size=24--node-monitor-grace-period=40s--node-monitor-period=5s--port=0--profiling=False--requestheader-client-ca-file=/etc/kubernetes/ssl/front-proxy-ca.crt--root-ca-file=/etc/kubernetes/ssl/ca.crt--service-account-private-key-file=/etc/kubernetes/ssl/sa.key--service-cluster-ip-range=10.233.0.0/18--terminated-pod-gc-threshold=12500--use-service-account-credentials=true @@ -221,7 +221,7 @@ func GetCIDRFromCNI(ctx context.Context, clientset *kubernetes.Clientset, restco var result []*net.IPNet for _, s := range strings.Split(content, "\n") { - result = Deduplicate(append(result, ParseCIDRFromString(s)...)) + result = RemoveLargerOverlappingCIDRs(append(result, ParseCIDRFromString(s)...)) } return result, nil diff --git a/pkg/util/logger.go b/pkg/util/logger.go index 9956b272..398c4bc3 100644 --- a/pkg/util/logger.go +++ b/pkg/util/logger.go @@ -3,8 +3,10 @@ package util import ( "fmt" "path/filepath" + "runtime" log "github.com/sirupsen/logrus" + "k8s.io/utils/ptr" ) func InitLoggerForClient(debug bool) { @@ -45,11 +47,12 @@ type serverFormat struct { // same like log.SetFlags(log.LstdFlags | log.Lshortfile) // 2009/01/23 01:23:23 d.go:23: message func (*serverFormat) Format(e *log.Entry) ([]byte, error) { + // e.Caller maybe is nil, because pkg/handler/connect.go:252 return []byte( fmt.Sprintf("%s %s:%d %s: %s\n", e.Time.Format("2006-01-02 15:04:05"), - filepath.Base(e.Caller.File), - e.Caller.Line, + filepath.Base(ptr.Deref(e.Caller, runtime.Frame{}).File), + ptr.Deref(e.Caller, runtime.Frame{}).Line, e.Level.String(), e.Message, )), nil diff --git a/pkg/util/pod.go b/pkg/util/pod.go index bccfd7a7..058b6f2c 100644 --- a/pkg/util/pod.go +++ b/pkg/util/pod.go @@ -17,6 +17,7 @@ import ( "github.com/pkg/errors" log "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/fields" @@ -133,7 +134,7 @@ func WaitPod(ctx context.Context, podInterface v12.PodInterface, list v1.ListOpt } } -func PortForwardPod(config *rest.Config, clientset *rest.RESTClient, podName, namespace string, portPair []string, readyChan chan struct{}, stopChan <-chan struct{}) error { +func PortForwardPod(config *rest.Config, clientset *rest.RESTClient, podName, namespace string, portPair []string, readyChan chan struct{}, stopChan <-chan struct{}, out, errOut io.Writer) error { err := os.Setenv(string(util.RemoteCommandWebsockets), "true") if err != nil { return err @@ -161,7 +162,7 @@ func PortForwardPod(config *rest.Config, clientset *rest.RESTClient, podName, na } dialer = portforward.NewFallbackDialer(websocketDialer, dialer, httpstream.IsUpgradeFailure) } - forwarder, err := portforward.New(dialer, portPair, stopChan, readyChan, nil, os.Stderr) + forwarder, err := portforward.New(dialer, portPair, stopChan, readyChan, out, errOut) if err != nil { log.Errorf("Create port forward error: %s", err.Error()) return err @@ -325,39 +326,58 @@ func FindContainerByName(pod *corev1.Pod, name string) (*corev1.Container, int) return nil, -1 } -func CheckPodStatus(cCtx context.Context, cFunc context.CancelFunc, podName string, podInterface v12.PodInterface) { - w, err := podInterface.Watch(cCtx, v1.ListOptions{ - FieldSelector: fields.OneTermEqualSelector("metadata.name", podName).String(), - }) - if err != nil { - return - } - defer w.Stop() +func CheckPodStatus(ctx context.Context, cancelFunc context.CancelFunc, podName string, podInterface v12.PodInterface) { + for ctx.Err() == nil { + func() { + defer time.Sleep(time.Millisecond * 200) - _, err = podInterface.Get(cCtx, podName, v1.GetOptions{}) - if err != nil { - return - } - for { - select { - case e, ok := <-w.ResultChan(): - if !ok { + w, err := podInterface.Watch(ctx, v1.ListOptions{ + FieldSelector: fields.OneTermEqualSelector("metadata.name", podName).String(), + }) + if err != nil { + if !k8serrors.IsForbidden(err) && !errors.Is(err, context.Canceled) { + log.Errorf("Failed to watch Pod %s: %v", podName, err) + cancelFunc() + } return } - switch e.Type { - case watch.Deleted: - cFunc() - return - case watch.Error: - return - case watch.Added, watch.Modified, watch.Bookmark: - // do nothing - default: + defer w.Stop() + + _, err = podInterface.Get(ctx, podName, v1.GetOptions{}) + if err != nil { + if !k8serrors.IsForbidden(err) && !errors.Is(err, context.Canceled) { + log.Errorf("Failed to get Pod %s: %v", podName, err) + cancelFunc() + } return } - case <-cCtx.Done(): - return - } + select { + case e, ok := <-w.ResultChan(): + if !ok { + _, err = podInterface.Get(ctx, podName, v1.GetOptions{}) + if err != nil && !errors.Is(err, context.Canceled) { + log.Errorf("Failed to get Pod %s: %v", podName, err) + cancelFunc() + } + return + } + switch e.Type { + case watch.Deleted: + log.Errorf("Pod %s is deleted", podName) + cancelFunc() + return + case watch.Error: + _, err = podInterface.Get(ctx, podName, v1.GetOptions{}) + if err != nil && !errors.Is(err, context.Canceled) { + log.Errorf("Failed to get Pod %s: %v", podName, err) + cancelFunc() + } + return + case watch.Added, watch.Modified, watch.Bookmark: + // do nothing + } + } + }() } } diff --git a/pkg/util/util.go b/pkg/util/util.go index 948fa7d2..257d18ea 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -12,6 +12,7 @@ import ( osexec "os/exec" "path/filepath" "runtime" + "sort" "strings" "syscall" "time" @@ -219,18 +220,32 @@ func GetTlsDomain(ns string) string { return config.ConfigMapPodTrafficManager + "." + ns + "." + "svc" } -func Deduplicate(cidr []*net.IPNet) (result []*net.IPNet) { - var set = sets.New[string]() - for _, ipNet := range cidr { - if ipNet == nil { +func RemoveLargerOverlappingCIDRs(cidrNets []*net.IPNet) []*net.IPNet { + sort.Slice(cidrNets, func(i, j int) bool { + onesI, _ := cidrNets[i].Mask.Size() + onesJ, _ := cidrNets[j].Mask.Size() + return onesI > onesJ + }) + + var cidrsOverlap = func(cidr1, cidr2 *net.IPNet) bool { + return cidr1.Contains(cidr2.IP) || cidr2.Contains(cidr1.IP) + } + + var result []*net.IPNet + skipped := make(map[int]bool) + + for i := range cidrNets { + if skipped[i] { continue } - if !set.Has(ipNet.String()) { - result = append(result, ipNet) + for j := i + 1; j < len(cidrNets); j++ { + if cidrsOverlap(cidrNets[i], cidrNets[j]) { + skipped[j] = true + } } - set.Insert(ipNet.String()) + result = append(result, cidrNets[i]) } - return + return result } func CleanExtensionLib() {