From 31186fc1d9698837d6ee8853b458cbb9220d30ea Mon Sep 17 00:00:00 2001 From: naison <895703375@qq.com> Date: Thu, 24 Apr 2025 22:39:03 +0800 Subject: [PATCH] refactor: only ssh jump in user daemon (#558) --- cmd/kubevpn/cmds/quit.go | 2 +- pkg/daemon/action/connect-fork.go | 26 +++++++++++------------ pkg/daemon/action/connect.go | 24 ++++++++++----------- pkg/daemon/action/disconnect.go | 35 +++++++++++++++++-------------- 4 files changed, 45 insertions(+), 42 deletions(-) diff --git a/cmd/kubevpn/cmds/quit.go b/cmd/kubevpn/cmds/quit.go index f2d08233..9cfac298 100644 --- a/cmd/kubevpn/cmds/quit.go +++ b/cmd/kubevpn/cmds/quit.go @@ -29,8 +29,8 @@ func CmdQuit(f cmdutil.Factory) *cobra.Command { kubevpn quit `)), RunE: func(cmd *cobra.Command, args []string) error { - _ = quit(cmd.Context(), false) _ = quit(cmd.Context(), true) + _ = quit(cmd.Context(), false) util.CleanExtensionLib() _, _ = fmt.Fprint(os.Stdout, "Exited") return nil diff --git a/pkg/daemon/action/connect-fork.go b/pkg/daemon/action/connect-fork.go index d3ddff2b..cdd7272e 100644 --- a/pkg/daemon/action/connect-fork.go +++ b/pkg/daemon/action/connect-fork.go @@ -3,6 +3,7 @@ package action import ( "context" "io" + "os" "github.com/pkg/errors" log "github.com/sirupsen/logrus" @@ -35,12 +36,6 @@ func (svr *Server) ConnectFork(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectF if err != nil { return err } - flags := pflag.NewFlagSet("", pflag.ContinueOnError) - flags.AddFlag(&pflag.Flag{ - Name: "kubeconfig", - DefValue: file, - }) - sshCtx, sshCancel := context.WithCancel(context.Background()) connect.AddRolloutFunc(func() error { sshCancel() @@ -55,12 +50,7 @@ func (svr *Server) ConnectFork(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectF } }() - var path string - path, err = ssh.SshJump(sshCtx, ssh.ParseSshFromRPC(req.SshJump), flags, false) - if err != nil { - return err - } - err = connect.InitClient(util.InitFactoryByPath(path, req.Namespace)) + err = connect.InitClient(util.InitFactoryByPath(file, req.Namespace)) if err != nil { return err } @@ -99,6 +89,8 @@ func (svr *Server) redirectConnectForkToSudoDaemon(req *rpc.ConnectRequest, resp DefValue: file, }) sshCtx, sshCancel := context.WithCancel(context.Background()) + sshCtx = plog.WithLogger(sshCtx, logger) + defer plog.WithoutLogger(sshCtx) connect := &handler.ConnectOptions{ Namespace: req.Namespace, ExtraRouteInfo: *handler.ParseExtraRouteFromRPC(req.ExtraRoute), @@ -125,7 +117,7 @@ func (svr *Server) redirectConnectForkToSudoDaemon(req *rpc.ConnectRequest, resp return err } - connectNs, err := util.DetectConnectNamespace(plog.WithLogger(sshCtx, logger), connect.GetFactory(), req.Namespace) + connectNs, err := util.DetectConnectNamespace(sshCtx, connect.GetFactory(), req.Namespace) if err != nil { return err } @@ -144,6 +136,7 @@ func (svr *Server) redirectConnectForkToSudoDaemon(req *rpc.ConnectRequest, resp connect.GetClientset().CoreV1(), connect.Namespace, ) if isSameCluster { + sshCancel() // same cluster, do nothing logger.Infof("Connected with cluster") return nil @@ -155,6 +148,13 @@ func (svr *Server) redirectConnectForkToSudoDaemon(req *rpc.ConnectRequest, resp return err } + // only ssh jump in user daemon + content, err := os.ReadFile(path) + if err != nil { + return err + } + req.KubeconfigBytes = string(content) + req.SshJump = ssh.SshConfig{}.ToRPC() connResp, err := cli.ConnectFork(ctx, req) if err != nil { return err diff --git a/pkg/daemon/action/connect.go b/pkg/daemon/action/connect.go index 3cb839de..2d37c873 100644 --- a/pkg/daemon/action/connect.go +++ b/pkg/daemon/action/connect.go @@ -3,6 +3,7 @@ package action import ( "context" "io" + "os" "time" "github.com/pkg/errors" @@ -54,12 +55,6 @@ func (svr *Server) Connect(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectServe if err != nil { return err } - flags := pflag.NewFlagSet("", pflag.ContinueOnError) - flags.AddFlag(&pflag.Flag{ - Name: "kubeconfig", - DefValue: file, - }) - sshCtx, sshCancel := context.WithCancel(context.Background()) svr.connect.AddRolloutFunc(func() error { sshCancel() @@ -75,12 +70,7 @@ func (svr *Server) Connect(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectServe sshCancel() } }() - var path string - path, err = ssh.SshJump(sshCtx, ssh.ParseSshFromRPC(req.SshJump), flags, false) - if err != nil { - return err - } - err = svr.connect.InitClient(util.InitFactoryByPath(path, req.Namespace)) + err = svr.connect.InitClient(util.InitFactoryByPath(file, req.Namespace)) if err != nil { return err } @@ -114,6 +104,8 @@ func (svr *Server) redirectToSudoDaemon(req *rpc.ConnectRequest, resp rpc.Daemon DefValue: file, }) sshCtx, sshCancel := context.WithCancel(context.Background()) + sshCtx = plog.WithLogger(sshCtx, logger) + defer plog.WithoutLogger(sshCtx) connect := &handler.ConnectOptions{ Namespace: req.Namespace, ExtraRouteInfo: *handler.ParseExtraRouteFromRPC(req.ExtraRoute), @@ -159,6 +151,7 @@ func (svr *Server) redirectToSudoDaemon(req *rpc.ConnectRequest, resp rpc.Daemon connect.GetClientset().CoreV1(), connect.Namespace, ) if isSameCluster { + sshCancel() // same cluster, do nothing logger.Infof("Connected to cluster") return nil @@ -170,6 +163,13 @@ func (svr *Server) redirectToSudoDaemon(req *rpc.ConnectRequest, resp rpc.Daemon return err } + // only ssh jump in user daemon + content, err := os.ReadFile(path) + if err != nil { + return err + } + req.KubeconfigBytes = string(content) + req.SshJump = ssh.SshConfig{}.ToRPC() connResp, err := cli.Connect(ctx, req) if err != nil { return err diff --git a/pkg/daemon/action/disconnect.go b/pkg/daemon/action/disconnect.go index a800ed5c..98b2f048 100644 --- a/pkg/daemon/action/disconnect.go +++ b/pkg/daemon/action/disconnect.go @@ -21,6 +21,25 @@ import ( func (svr *Server) Disconnect(req *rpc.DisconnectRequest, resp rpc.Daemon_DisconnectServer) error { logger := plog.GetLoggerForClient(int32(log.InfoLevel), io.MultiWriter(newDisconnectWarp(resp), svr.LogFile)) ctx := plog.WithLogger(resp.Context(), logger) + + // disconnect sudo daemon first + // then disconnect from user daemon + // because only ssh jump in user daemon + if !svr.IsSudo { + cli, err := svr.GetClient(true) + if err != nil { + return errors.Wrap(err, "sudo daemon not start") + } + connResp, err := cli.Disconnect(resp.Context(), req) + if err != nil { + return err + } + err = util.CopyGRPCStream[rpc.DisconnectResponse](connResp, resp) + if err != nil { + return err + } + } + switch { case req.GetAll(): if svr.clone != nil { @@ -102,22 +121,6 @@ func (svr *Server) Disconnect(req *rpc.DisconnectRequest, resp rpc.Daemon_Discon _ = dns.CleanupHosts() } } - - if !svr.IsSudo { - cli, err := svr.GetClient(true) - if err != nil { - return errors.Wrap(err, "sudo daemon not start") - } - connResp, err := cli.Disconnect(resp.Context(), req) - if err != nil { - return err - } - err = util.CopyGRPCStream[rpc.DisconnectResponse](connResp, resp) - if err != nil { - return err - } - } - return nil }