From baedcb114c56e294f3a9647645e236a58c0173ee Mon Sep 17 00:00:00 2001 From: fengcaiwen Date: Sun, 10 Sep 2023 19:46:20 +0800 Subject: [PATCH] feat: cleanup works fine --- cmd/kubevpn/cmds/connect.go | 1 + cmd/kubevpn/cmds/logs.go | 9 ++- cmd/kubevpn/cmds/reset.go | 1 + pkg/daemon/action/connect.go | 14 ++-- pkg/daemon/action/disconnect.go | 33 ++++---- pkg/daemon/action/leave.go | 30 +++---- pkg/daemon/action/list.go | 2 +- pkg/daemon/action/logs.go | 12 ++- pkg/daemon/action/proxy.go | 135 +++++++++----------------------- pkg/daemon/action/quit.go | 30 +++---- pkg/daemon/action/stop.go | 32 ++++---- pkg/handler/cleaner.go | 11 ++- pkg/handler/connect.go | 9 ++- pkg/handler/reset.go | 74 ++++++++++------- pkg/util/ns.go | 4 +- pkg/util/ssh.go | 22 +++--- pkg/util/util.go | 6 +- pkg/util/util_test.go | 2 +- 18 files changed, 209 insertions(+), 218 deletions(-) diff --git a/cmd/kubevpn/cmds/connect.go b/cmd/kubevpn/cmds/connect.go index 2511c4aa..111ac54a 100644 --- a/cmd/kubevpn/cmds/connect.go +++ b/cmd/kubevpn/cmds/connect.go @@ -82,6 +82,7 @@ func CmdConnect(f cmdutil.Factory) *cobra.Command { } log.Print(recv.GetMessage()) } + util.Print(os.Stdout, "Now you can access resources in the kubernetes cluster, enjoy it :)") // hangup if foreground { // disconnect from cluster network diff --git a/cmd/kubevpn/cmds/logs.go b/cmd/kubevpn/cmds/logs.go index 7d0af0a0..2be1183b 100644 --- a/cmd/kubevpn/cmds/logs.go +++ b/cmd/kubevpn/cmds/logs.go @@ -1,9 +1,12 @@ package cmds import ( + "context" + "errors" + "fmt" "io" + "os" - log "github.com/sirupsen/logrus" "github.com/spf13/cobra" cmdutil "k8s.io/kubectl/pkg/cmd/util" "k8s.io/kubectl/pkg/util/i18n" @@ -35,7 +38,9 @@ func CmdLogs(f cmdutil.Factory) *cobra.Command { if err == io.EOF { break } else if err == nil { - log.Print(resp.Message) + fmt.Fprintln(os.Stdout, resp.Message) + } else if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return nil } else { return err } diff --git a/cmd/kubevpn/cmds/reset.go b/cmd/kubevpn/cmds/reset.go index 0a5841ee..00b9b96b 100644 --- a/cmd/kubevpn/cmds/reset.go +++ b/cmd/kubevpn/cmds/reset.go @@ -42,6 +42,7 @@ func CmdReset(factory cmdutil.Factory) *cobra.Command { return handler.SshJump(cmd.Context(), sshConf, cmd.Flags()) }, Run: func(cmd *cobra.Command, args []string) { + util.InitLogger(false) if err := connect.InitClient(factory); err != nil { log.Fatal(err) } diff --git a/pkg/daemon/action/connect.go b/pkg/daemon/action/connect.go index b9f1b3e5..a7c1208f 100644 --- a/pkg/daemon/action/connect.go +++ b/pkg/daemon/action/connect.go @@ -2,7 +2,6 @@ package action import ( "context" - "errors" "fmt" "io" defaultlog "log" @@ -11,6 +10,8 @@ import ( log "github.com/sirupsen/logrus" "github.com/spf13/pflag" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/client-go/rest" restclient "k8s.io/client-go/rest" @@ -88,7 +89,7 @@ func (svr *Server) Connect(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectServe if !svr.t.IsZero() { log.Debugf("already connect to another cluster, you can disconnect this connect by command `kubevpn disconnect`") // todo define already connect error? - return errors.New("already connected") + return status.Error(codes.AlreadyExists, "") } svr.t = time.Now() svr.connect = &handler.ConnectOptions{ @@ -118,14 +119,14 @@ func (svr *Server) Connect(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectServe return err } } - tempFile, err := util.ConvertToTempFile([]byte(req.KubeconfigBytes)) + file, err := util.ConvertToTempKubeconfigFile([]byte(req.KubeconfigBytes)) if err != nil { return err } flags := pflag.NewFlagSet("", pflag.ContinueOnError) flags.AddFlag(&pflag.Flag{ Name: "kubeconfig", - DefValue: tempFile, + DefValue: file, }) sshCtx, sshCancel := context.WithCancel(context.Background()) @@ -154,7 +155,6 @@ func (svr *Server) Connect(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectServe svr.connect.Cleanup() return err } - util.Print(out, "Now you can access resources in the kubernetes cluster, enjoy it :)") return nil } @@ -180,14 +180,14 @@ func (svr *Server) redirectToSudoDaemon(req *rpc.ConnectRequest, resp rpc.Daemon ConfigAlias: req.ConfigAlias, RemoteKubeconfig: req.RemoteKubeconfig, } - tempFile, err := util.ConvertToTempFile([]byte(req.KubeconfigBytes)) + file, err := util.ConvertToTempKubeconfigFile([]byte(req.KubeconfigBytes)) if err != nil { return err } flags := pflag.NewFlagSet("", pflag.ContinueOnError) flags.AddFlag(&pflag.Flag{ Name: "kubeconfig", - DefValue: tempFile, + DefValue: file, }) sshCtx, sshCancel := context.WithCancel(context.Background()) handler.RollbackFuncList = append(handler.RollbackFuncList, sshCancel) diff --git a/pkg/daemon/action/disconnect.go b/pkg/daemon/action/disconnect.go index 51d207f8..c677ca2f 100644 --- a/pkg/daemon/action/disconnect.go +++ b/pkg/daemon/action/disconnect.go @@ -10,21 +10,6 @@ import ( "github.com/wencaiwulue/kubevpn/pkg/daemon/rpc" ) -type disconnectWarp struct { - server rpc.Daemon_DisconnectServer -} - -func (r *disconnectWarp) Write(p []byte) (n int, err error) { - err = r.server.Send(&rpc.DisconnectResponse{ - Message: string(p), - }) - return len(p), err -} - -func newDisconnectWarp(server rpc.Daemon_DisconnectServer) io.Writer { - return &disconnectWarp{server: server} -} - func (svr *Server) Disconnect(req *rpc.DisconnectRequest, resp rpc.Daemon_DisconnectServer) error { if !svr.IsSudo { cli := svr.GetClient(true) @@ -35,8 +20,9 @@ func (svr *Server) Disconnect(req *rpc.DisconnectRequest, resp rpc.Daemon_Discon if err != nil { return err } + var recv *rpc.DisconnectResponse for { - recv, err := connResp.Recv() + recv, err = connResp.Recv() if err == io.EOF { svr.t = time.Time{} svr.connect = nil @@ -66,3 +52,18 @@ func (svr *Server) Disconnect(req *rpc.DisconnectRequest, resp rpc.Daemon_Discon svr.connect = nil return nil } + +type disconnectWarp struct { + server rpc.Daemon_DisconnectServer +} + +func (r *disconnectWarp) Write(p []byte) (n int, err error) { + err = r.server.Send(&rpc.DisconnectResponse{ + Message: string(p), + }) + return len(p), err +} + +func newDisconnectWarp(server rpc.Daemon_DisconnectServer) io.Writer { + return &disconnectWarp{server: server} +} diff --git a/pkg/daemon/action/leave.go b/pkg/daemon/action/leave.go index 2a840017..75346760 100644 --- a/pkg/daemon/action/leave.go +++ b/pkg/daemon/action/leave.go @@ -10,21 +10,6 @@ import ( "github.com/wencaiwulue/kubevpn/pkg/handler" ) -type leaveWarp struct { - server rpc.Daemon_LeaveServer -} - -func (r *leaveWarp) Write(p []byte) (n int, err error) { - err = r.server.Send(&rpc.LeaveResponse{ - Message: string(p), - }) - return len(p), err -} - -func newLeaveWarp(server rpc.Daemon_LeaveServer) io.Writer { - return &leaveWarp{server: server} -} - func (svr *Server) Leave(req *rpc.LeaveRequest, resp rpc.Daemon_LeaveServer) error { out := newLeaveWarp(resp) origin := log.StandardLogger().Out @@ -49,3 +34,18 @@ func (svr *Server) Leave(req *rpc.LeaveRequest, resp rpc.Daemon_LeaveServer) err } return nil } + +type leaveWarp struct { + server rpc.Daemon_LeaveServer +} + +func (r *leaveWarp) Write(p []byte) (n int, err error) { + err = r.server.Send(&rpc.LeaveResponse{ + Message: string(p), + }) + return len(p), err +} + +func newLeaveWarp(server rpc.Daemon_LeaveServer) io.Writer { + return &leaveWarp{server: server} +} diff --git a/pkg/daemon/action/list.go b/pkg/daemon/action/list.go index 6751b9a5..493d0257 100644 --- a/pkg/daemon/action/list.go +++ b/pkg/daemon/action/list.go @@ -16,7 +16,7 @@ import ( func (svr *Server) List(ctx context.Context, req *rpc.ListRequest) (*rpc.ListResponse, error) { if svr.connect == nil { - return nil, fmt.Errorf("no proxy workloads found") + return nil, fmt.Errorf("not connect to any cluster") } mapInterface := svr.connect.GetClientset().CoreV1().ConfigMaps(svr.connect.Namespace) configMap, err := mapInterface.Get(ctx, config.ConfigMapPodTrafficManager, metav1.GetOptions{}) diff --git a/pkg/daemon/action/logs.go b/pkg/daemon/action/logs.go index e9b08e7e..80350721 100644 --- a/pkg/daemon/action/logs.go +++ b/pkg/daemon/action/logs.go @@ -2,12 +2,17 @@ package action import ( "github.com/hpcloud/tail" + "github.com/wencaiwulue/kubevpn/pkg/daemon/rpc" ) func (svr *Server) Logs(req *rpc.LogRequest, resp rpc.Daemon_LogsServer) error { path := GetDaemonLogPath() - config := tail.Config{Follow: true, ReOpen: true, MustExist: true} + config := tail.Config{Follow: req.Follow, ReOpen: true, MustExist: true} + if !req.Follow { + // FATAL -- cannot set ReOpen without Follow. + config.ReOpen = false + } file, err := tail.TailFile(path, config) if err != nil { return err @@ -16,7 +21,10 @@ func (svr *Server) Logs(req *rpc.LogRequest, resp rpc.Daemon_LogsServer) error { select { case <-resp.Context().Done(): return nil - case line := <-file.Lines: + case line, ok := <-file.Lines: + if !ok { + return nil + } if line.Err != nil { return err } diff --git a/pkg/daemon/action/proxy.go b/pkg/daemon/action/proxy.go index 81c8cb34..06144eab 100644 --- a/pkg/daemon/action/proxy.go +++ b/pkg/daemon/action/proxy.go @@ -1,10 +1,8 @@ package action import ( - "context" "fmt" "io" - "time" log "github.com/sirupsen/logrus" "github.com/spf13/pflag" @@ -15,32 +13,17 @@ import ( "github.com/wencaiwulue/kubevpn/pkg/util" ) -type proxyWarp struct { - server rpc.Daemon_ProxyServer -} - -func (r *proxyWarp) Write(p []byte) (n int, err error) { - err = r.server.Send(&rpc.ConnectResponse{ - Message: string(p), - }) - return len(p), err -} - -func newProxyWarp(server rpc.Daemon_ProxyServer) io.Writer { - return &proxyWarp{server: server} -} - -// 1. if not connect to cluster -// 1.1 connect to cluster -// 1.2 proxy workloads -// 2. if already connect to cluster -// 2.1 disconnect from cluster -// 2.2 same as step 1 - +// Proxy +// 1. if not connect to cluster +// 1.1 connect to cluster +// 1.2 proxy workloads +// 2. if already connect to cluster +// 2.1 disconnect from cluster +// 2.2 same as step 1 func (svr *Server) Proxy(req *rpc.ConnectRequest, resp rpc.Daemon_ProxyServer) error { - out := newProxyWarp(resp) origin := log.StandardLogger().Out - log.SetOutput(io.MultiWriter(out, origin)) + out := io.MultiWriter(newProxyWarp(resp), origin) + log.SetOutput(out) defer func() { log.SetOutput(origin) log.SetLevel(log.DebugLevel) @@ -65,7 +48,7 @@ func (svr *Server) Proxy(req *rpc.ConnectRequest, resp rpc.Daemon_ProxyServer) e RemoteKubeconfig: req.RemoteKubeconfig, } - file, err := util.ConvertToTempFile([]byte(req.KubeconfigBytes)) + file, err := util.ConvertToTempKubeconfigFile([]byte(req.KubeconfigBytes)) if err != nil { return err } @@ -92,7 +75,8 @@ func (svr *Server) Proxy(req *rpc.ConnectRequest, resp rpc.Daemon_ProxyServer) e return fmt.Errorf("daemon is not avaliable") } if svr.connect != nil { - isSameCluster, err := util.IsSameCluster( + var isSameCluster bool + isSameCluster, err = util.IsSameCluster( svr.connect.GetClientset().CoreV1().ConfigMaps(svr.connect.Namespace), svr.connect.Namespace, connect.GetClientset().CoreV1().ConfigMaps(connect.Namespace), connect.Namespace, ) @@ -101,12 +85,14 @@ func (svr *Server) Proxy(req *rpc.ConnectRequest, resp rpc.Daemon_ProxyServer) e log.Debugf("already connect to cluster") } else { log.Debugf("try to disconnect from another cluster") - disconnect, err := daemonClient.Disconnect(ctx, &rpc.DisconnectRequest{}) + var disconnect rpc.Daemon_DisconnectClient + disconnect, err = daemonClient.Disconnect(ctx, &rpc.DisconnectRequest{}) if err != nil { return err } + var recv *rpc.DisconnectResponse for { - recv, err := disconnect.Recv() + recv, err = disconnect.Recv() if err == io.EOF { break } else if err != nil { @@ -122,12 +108,14 @@ func (svr *Server) Proxy(req *rpc.ConnectRequest, resp rpc.Daemon_ProxyServer) e if svr.connect == nil { log.Debugf("connectting to cluster") - connResp, err := daemonClient.Connect(ctx, req) + var connResp rpc.Daemon_ConnectClient + connResp, err = daemonClient.Connect(ctx, req) if err != nil { return err } + var recv *rpc.ConnectResponse for { - recv, err := connResp.Recv() + recv, err = connResp.Recv() if err == io.EOF { break } else if err != nil { @@ -142,70 +130,25 @@ func (svr *Server) Proxy(req *rpc.ConnectRequest, resp rpc.Daemon_ProxyServer) e log.Debugf("proxy resource...") err = svr.connect.CreateRemoteInboundPod(ctx) + if err != nil { + return err + } log.Debugf("proxy resource done") - return err -} - -func (svr *Server) redirectToSudoDaemon1(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectServer) error { - cli := svr.GetClient(true) - if cli == nil { - return fmt.Errorf("sudo daemon not start") - } - connResp, err := cli.Connect(resp.Context(), req) - if err != nil { - return err - } - for { - recv, err := connResp.Recv() - if err == io.EOF { - break - } else if err != nil { - return err - } - err = resp.Send(recv) - if err != nil { - return err - } - } - - svr.t = time.Now() - svr.connect = &handler.ConnectOptions{ - Namespace: req.Namespace, - Headers: req.Headers, - Workloads: req.Workloads, - ExtraCIDR: req.ExtraCIDR, - ExtraDomain: req.ExtraDomain, - UseLocalDNS: req.UseLocalDNS, - Engine: config.Engine(req.Engine), - } - var sshConf = &util.SshConfig{ - Addr: req.Addr, - User: req.User, - Password: req.Password, - Keyfile: req.Keyfile, - ConfigAlias: req.ConfigAlias, - RemoteKubeconfig: req.RemoteKubeconfig, - } - file, err := util.ConvertToTempFile([]byte(req.KubeconfigBytes)) - if err != nil { - return err - } - flags := pflag.NewFlagSet("", pflag.ContinueOnError) - flags.AddFlag(&pflag.Flag{ - Name: "kubeconfig", - DefValue: file, - }) - err = handler.SshJump(context.Background(), sshConf, flags) - if err != nil { - return err - } - err = svr.connect.InitClient(InitFactory(req.KubeconfigBytes, req.Namespace)) - if err != nil { - return err - } - err = svr.connect.PreCheckResource() - if err != nil { - return err - } + util.Print(out, "Now you can access resources in the kubernetes cluster, enjoy it :)") return nil } + +type proxyWarp struct { + server rpc.Daemon_ProxyServer +} + +func (r *proxyWarp) Write(p []byte) (n int, err error) { + err = r.server.Send(&rpc.ConnectResponse{ + Message: string(p), + }) + return len(p), err +} + +func newProxyWarp(server rpc.Daemon_ProxyServer) io.Writer { + return &proxyWarp{server: server} +} diff --git a/pkg/daemon/action/quit.go b/pkg/daemon/action/quit.go index f48b787c..0b7be508 100644 --- a/pkg/daemon/action/quit.go +++ b/pkg/daemon/action/quit.go @@ -8,21 +8,6 @@ import ( "github.com/wencaiwulue/kubevpn/pkg/daemon/rpc" ) -type QuitWarp struct { - server rpc.Daemon_QuitServer -} - -func (r *QuitWarp) Write(p []byte) (n int, err error) { - err = r.server.Send(&rpc.QuitResponse{ - Message: string(p), - }) - return len(p), err -} - -func newQuitWarp(server rpc.Daemon_QuitServer) io.Writer { - return &QuitWarp{server: server} -} - func (svr *Server) Quit(req *rpc.QuitRequest, resp rpc.Daemon_QuitServer) error { origin := log.StandardLogger().Out defer func() { @@ -39,3 +24,18 @@ func (svr *Server) Quit(req *rpc.QuitRequest, resp rpc.Daemon_QuitServer) error } return nil } + +type quitWarp struct { + server rpc.Daemon_QuitServer +} + +func (r *quitWarp) Write(p []byte) (n int, err error) { + err = r.server.Send(&rpc.QuitResponse{ + Message: string(p), + }) + return len(p), err +} + +func newQuitWarp(server rpc.Daemon_QuitServer) io.Writer { + return &quitWarp{server: server} +} diff --git a/pkg/daemon/action/stop.go b/pkg/daemon/action/stop.go index 2437c22f..7f87c1a5 100644 --- a/pkg/daemon/action/stop.go +++ b/pkg/daemon/action/stop.go @@ -9,27 +9,12 @@ import ( "github.com/wencaiwulue/kubevpn/pkg/daemon/rpc" ) -type StopWarp struct { - server rpc.Daemon_QuitServer -} - -func (r *StopWarp) Write(p []byte) (n int, err error) { - err = r.server.Send(&rpc.QuitResponse{ - Message: string(p), - }) - return len(p), err -} - -func newStopWarp(server rpc.Daemon_QuitServer) io.Writer { - return &StopWarp{server: server} -} - func (svr *Server) Stop(req *rpc.QuitRequest, resp rpc.Daemon_QuitServer) error { if svr.connect == nil { return nil } - out := newQuitWarp(resp) + out := newStopWarp(resp) origin := log.StandardLogger().Out defer func() { log.SetOutput(origin) @@ -42,3 +27,18 @@ func (svr *Server) Stop(req *rpc.QuitRequest, resp rpc.Daemon_QuitServer) error svr.connect = nil return nil } + +type stopWarp struct { + server rpc.Daemon_QuitServer +} + +func (r *stopWarp) Write(p []byte) (n int, err error) { + err = r.server.Send(&rpc.QuitResponse{ + Message: string(p), + }) + return len(p), err +} + +func newStopWarp(server rpc.Daemon_QuitServer) io.Writer { + return &stopWarp{server: server} +} diff --git a/pkg/handler/cleaner.go b/pkg/handler/cleaner.go index 37ed71fa..d44785c1 100644 --- a/pkg/handler/cleaner.go +++ b/pkg/handler/cleaner.go @@ -40,9 +40,6 @@ func (c *ConnectOptions) Cleanup() { log.Info("prepare to exit, cleaning up") ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() - if c.cancel != nil { - c.cancel() - } var ips []net.IP if c.localTunIPv4 != nil && c.localTunIPv4.IP != nil { ips = append(ips, c.localTunIPv4.IP) @@ -75,6 +72,14 @@ func (c *ConnectOptions) Cleanup() { function() } } + // leave proxy resources + err := c.LeaveProxyResources(context.Background()) + if err != nil { + log.Error(err) + } + if c.cancel != nil { + c.cancel() + } RollbackFuncList = RollbackFuncList[:] dns.CancelDNS() log.Info("clean up successful") diff --git a/pkg/handler/connect.go b/pkg/handler/connect.go index 7a2c5325..1136700c 100644 --- a/pkg/handler/connect.go +++ b/pkg/handler/connect.go @@ -765,7 +765,12 @@ func SshJump(ctx context.Context, conf *util.SshConfig, flags *pflag.FlagSet) (e return err } - var local = &netip.AddrPort{} + port := util.GetAvailableTCPPortOrDie() + var local netip.AddrPort + local, err = netip.ParseAddrPort(net.JoinHostPort("127.0.0.1", strconv.Itoa(port))) + if err != nil { + return err + } errChan := make(chan error, 1) readyChan := make(chan struct{}, 1) go func() { @@ -776,7 +781,7 @@ func SshJump(ctx context.Context, conf *util.SshConfig, flags *pflag.FlagSet) (e default: } - err := util.Main(ctx, &remote, local, conf, readyChan) + err := util.Main(ctx, remote, local, conf, readyChan) if err != nil { if !errors.Is(err, context.Canceled) { log.Errorf("ssh forward failed err: %v", err) diff --git a/pkg/handler/reset.go b/pkg/handler/reset.go index e1b0037c..49cfcb3f 100644 --- a/pkg/handler/reset.go +++ b/pkg/handler/reset.go @@ -2,6 +2,8 @@ package handler import ( "context" + "fmt" + corev1 "k8s.io/api/core/v1" "strings" "github.com/docker/docker/api/types" @@ -19,41 +21,59 @@ import ( // 1, get all proxy-resources from configmap // 2, cleanup all containers func (c *ConnectOptions) Reset(ctx context.Context) error { - cm, err := c.clientset.CoreV1().ConfigMaps(c.Namespace).Get(ctx, config.ConfigMapPodTrafficManager, metav1.GetOptions{}) - if err != nil && !apierrors.IsNotFound(err) { - return err - } - var v = make([]*controlplane.Virtual, 0) - localTunIPv4 := c.GetLocalTunIPv4() - if cm != nil && cm.Data != nil { - if str, ok := cm.Data[config.KeyEnvoy]; ok && len(str) != 0 { - if err = yaml.Unmarshal([]byte(str), &v); err != nil { - log.Error(err) - return err - } - for _, virtual := range v { - // deployments.apps.ry-server --> deployments.apps/ry-server - lastIndex := strings.LastIndex(virtual.Uid, ".") - uid := virtual.Uid[:lastIndex] + "/" + virtual.Uid[lastIndex+1:] - err = UnPatchContainer(c.factory, c.clientset.CoreV1().ConfigMaps(c.Namespace), c.Namespace, uid, localTunIPv4) - if err != nil { - log.Error(err) - continue - } - } - } + err := c.LeaveProxyResources(ctx) + if err != nil { + log.Error(err) } + cleanup(ctx, c.clientset, c.Namespace, config.ConfigMapPodTrafficManager, false) var cli *client.Client - if cli, err = client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()); err != nil { + cli, err = client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) + if err != nil { return nil } - var i types.NetworkResource - if i, err = cli.NetworkInspect(ctx, config.ConfigMapPodTrafficManager, types.NetworkInspectOptions{}); err != nil { + var networkResource types.NetworkResource + networkResource, err = cli.NetworkInspect(ctx, config.ConfigMapPodTrafficManager, types.NetworkInspectOptions{}) + if err != nil { return nil } - if len(i.Containers) == 0 { + if len(networkResource.Containers) == 0 { return cli.NetworkRemove(ctx, config.ConfigMapPodTrafficManager) } return nil } + +func (c *ConnectOptions) LeaveProxyResources(ctx context.Context) (err error) { + if c == nil || c.clientset == nil { + return + } + + mapInterface := c.clientset.CoreV1().ConfigMaps(c.Namespace) + var cm *corev1.ConfigMap + cm, err = mapInterface.Get(ctx, config.ConfigMapPodTrafficManager, metav1.GetOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + return + } + if cm == nil || cm.Data == nil || len(cm.Data[config.KeyEnvoy]) == 0 { + err = fmt.Errorf("can not found proxy resources") + return + } + var v = make([]*controlplane.Virtual, 0) + str := cm.Data[config.KeyEnvoy] + if err = yaml.Unmarshal([]byte(str), &v); err != nil { + log.Error(err) + return + } + localTunIPv4 := c.GetLocalTunIPv4() + for _, virtual := range v { + // deployments.apps.ry-server --> deployments.apps/ry-server + lastIndex := strings.LastIndex(virtual.Uid, ".") + uid := virtual.Uid[:lastIndex] + "/" + virtual.Uid[lastIndex+1:] + err = UnPatchContainer(c.factory, c.clientset.CoreV1().ConfigMaps(c.Namespace), c.Namespace, uid, localTunIPv4) + if err != nil { + log.Error(err) + continue + } + } + return err +} diff --git a/pkg/util/ns.go b/pkg/util/ns.go index 40527072..84120d7c 100644 --- a/pkg/util/ns.go +++ b/pkg/util/ns.go @@ -63,8 +63,8 @@ func ConvertToKubeconfigBytes(factory cmdutil.Factory) ([]byte, string, error) { return marshal, namespace, nil } -func ConvertToTempFile(kubeconfigBytes []byte) (string, error) { - temp, err := os.CreateTemp("", "") +func ConvertToTempKubeconfigFile(kubeconfigBytes []byte) (string, error) { + temp, err := os.CreateTemp("", "*.tmp.kubeconfig") if err != nil { return "", err } diff --git a/pkg/util/ssh.go b/pkg/util/ssh.go index 5af5fd69..71c36b7d 100644 --- a/pkg/util/ssh.go +++ b/pkg/util/ssh.go @@ -29,7 +29,10 @@ type SshConfig struct { RemoteKubeconfig string } -func Main(ctx context.Context, remoteEndpoint, localEndpoint *netip.AddrPort, conf *SshConfig, done chan struct{}) error { +func Main(pctx context.Context, remoteEndpoint, localEndpoint netip.AddrPort, conf *SshConfig, done chan struct{}) error { + ctx, cancelFunc := context.WithCancel(pctx) + defer cancelFunc() + var remote *ssh.Client var err error if conf.ConfigAlias != "" { @@ -60,16 +63,12 @@ func Main(ctx context.Context, remoteEndpoint, localEndpoint *netip.AddrPort, co // Listen on remote server port var lc net.ListenConfig - listen, err := lc.Listen(ctx, "tcp", "localhost:0") + listen, err := lc.Listen(ctx, "tcp", localEndpoint.String()) if err != nil { return err } defer listen.Close() - *localEndpoint, err = netip.ParseAddrPort(listen.Addr().String()) - if err != nil { - return err - } select { case done <- struct{}{}: } @@ -94,9 +93,10 @@ func Main(ctx context.Context, remoteEndpoint, localEndpoint *netip.AddrPort, co if err == nil { break } - time.Sleep(time.Second) + time.Sleep(time.Millisecond * 200) } if conn == nil { + cancelFunc() return } defer conn.Close() @@ -180,12 +180,12 @@ func publicKeyFile(file string) ssh.AuthMethod { } func handleClient(client net.Conn, remote net.Conn) { - chDone := make(chan bool) + chDone := make(chan bool, 2) // start remote -> local data transfer go func() { _, err := io.Copy(client, remote) - if err != nil { + if err != nil && !errors.Is(err, net.ErrClosed) { log.Debugf("error while copy remote->local: %s", err) } chDone <- true @@ -194,7 +194,7 @@ func handleClient(client net.Conn, remote net.Conn) { // start local -> remote data transfer go func() { _, err := io.Copy(remote, client) - if err != nil { + if err != nil && !errors.Is(err, net.ErrClosed) { log.Debugf("error while copy local->remote: %s", err) } chDone <- true @@ -267,6 +267,7 @@ func dial(from *SshConfig) (*ssh.Client, error) { User: from.User, Auth: []ssh.AuthMethod{publicKeyFile(from.Keyfile)}, HostKeyCallback: ssh.InsecureIgnoreHostKey(), + Timeout: time.Second * 10, }) } @@ -281,6 +282,7 @@ func jump(bClient *ssh.Client, to *SshConfig) (*ssh.Client, error) { User: to.User, Auth: []ssh.AuthMethod{publicKeyFile(to.Keyfile)}, HostKeyCallback: ssh.InsecureIgnoreHostKey(), + Timeout: time.Second * 10, }) if err != nil { return nil, err diff --git a/pkg/util/util.go b/pkg/util/util.go index 11c3596a..55159be9 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -55,7 +55,7 @@ import ( ) func GetAvailableUDPPortOrDie() int { - address, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:0", "0.0.0.0")) + address, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:0", "localhost")) if err != nil { log.Fatal(err) } @@ -68,7 +68,7 @@ func GetAvailableUDPPortOrDie() int { } func GetAvailableTCPPortOrDie() int { - address, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:0", "0.0.0.0")) + address, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:0", "localhost")) if err != nil { log.Fatal(err) } @@ -489,7 +489,7 @@ func WaitPortToBeFree(ctx context.Context, port int) error { } func IsPortListening(port int) bool { - listener, err := net.Listen("tcp4", net.JoinHostPort("0.0.0.0", strconv.Itoa(port))) + listener, err := net.Listen("tcp4", net.JoinHostPort("localhost", strconv.Itoa(port))) if err != nil { return true } else { diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go index c51b3434..8e2ec85c 100644 --- a/pkg/util/util_test.go +++ b/pkg/util/util_test.go @@ -93,7 +93,7 @@ func TestPing(t *testing.T) { log.Infof("failed to serialize icmp packet, err: %v", err) return } - ipConn, err := net.ListenPacket("ip4:icmp", "0.0.0.0") + ipConn, err := net.ListenPacket("ip4:icmp", "localhost") if err != nil { if strings.Contains(err.Error(), "operation not permitted") { return