diff --git a/cmd/kubevpn/cmds/clone.go b/cmd/kubevpn/cmds/clone.go index e6234b3b..bfa0a1b8 100644 --- a/cmd/kubevpn/cmds/clone.go +++ b/cmd/kubevpn/cmds/clone.go @@ -19,6 +19,7 @@ import ( "github.com/wencaiwulue/kubevpn/v2/pkg/daemon" "github.com/wencaiwulue/kubevpn/v2/pkg/daemon/rpc" "github.com/wencaiwulue/kubevpn/v2/pkg/handler" + pkgssh "github.com/wencaiwulue/kubevpn/v2/pkg/ssh" "github.com/wencaiwulue/kubevpn/v2/pkg/util" ) @@ -26,7 +27,7 @@ import ( // kubectl exec POD_NAME -c CONTAINER_NAME /sbin/killall5 or ephemeralcontainers func CmdClone(f cmdutil.Factory) *cobra.Command { var options = handler.CloneOptions{} - var sshConf = &util.SshConfig{} + var sshConf = &pkgssh.SshConfig{} var extraRoute = &handler.ExtraRouteInfo{} var transferImage bool var syncDir string @@ -167,7 +168,7 @@ func CmdClone(f cmdutil.Factory) *cobra.Command { cmd.Flags().StringVar(&syncDir, "sync", "", "Sync local dir to remote pod dir. format: LOCAL_DIR:REMOTE_DIR, eg: ~/code:/app/code") handler.AddExtraRoute(cmd.Flags(), extraRoute) - util.AddSshFlags(cmd.Flags(), sshConf) + pkgssh.AddSshFlags(cmd.Flags(), sshConf) cmd.ValidArgsFunction = utilcomp.ResourceTypeAndNameCompletionFunc(f) return cmd } diff --git a/cmd/kubevpn/cmds/config.go b/cmd/kubevpn/cmds/config.go index a24ce454..ea940467 100644 --- a/cmd/kubevpn/cmds/config.go +++ b/cmd/kubevpn/cmds/config.go @@ -11,6 +11,7 @@ import ( "github.com/wencaiwulue/kubevpn/v2/pkg/daemon" "github.com/wencaiwulue/kubevpn/v2/pkg/daemon/rpc" + pkgssh "github.com/wencaiwulue/kubevpn/v2/pkg/ssh" "github.com/wencaiwulue/kubevpn/v2/pkg/util" ) @@ -25,7 +26,7 @@ func CmdConfig(f cmdutil.Factory) *cobra.Command { } func cmdConfigAdd(f cmdutil.Factory) *cobra.Command { - var sshConf = &util.SshConfig{} + var sshConf = &pkgssh.SshConfig{} cmd := &cobra.Command{ Use: "add", Short: i18n.T("Proxy kubeconfig"), @@ -63,7 +64,7 @@ func cmdConfigAdd(f cmdutil.Factory) *cobra.Command { return nil }, } - util.AddSshFlags(cmd.Flags(), sshConf) + pkgssh.AddSshFlags(cmd.Flags(), sshConf) return cmd } diff --git a/cmd/kubevpn/cmds/connect.go b/cmd/kubevpn/cmds/connect.go index 25a53de6..c5c73806 100644 --- a/cmd/kubevpn/cmds/connect.go +++ b/cmd/kubevpn/cmds/connect.go @@ -19,13 +19,14 @@ import ( "github.com/wencaiwulue/kubevpn/v2/pkg/daemon" "github.com/wencaiwulue/kubevpn/v2/pkg/daemon/rpc" "github.com/wencaiwulue/kubevpn/v2/pkg/handler" + pkgssh "github.com/wencaiwulue/kubevpn/v2/pkg/ssh" "github.com/wencaiwulue/kubevpn/v2/pkg/util" ) func CmdConnect(f cmdutil.Factory) *cobra.Command { var connect = &handler.ConnectOptions{} var extraRoute = &handler.ExtraRouteInfo{} - var sshConf = &util.SshConfig{} + var sshConf = &pkgssh.SshConfig{} var transferImage, foreground, lite bool cmd := &cobra.Command{ Use: "connect", @@ -164,6 +165,6 @@ func CmdConnect(f cmdutil.Factory) *cobra.Command { cmd.Flags().BoolVar(&lite, "lite", false, "connect to multiple cluster in lite mode, you needs to special this options") handler.AddExtraRoute(cmd.Flags(), extraRoute) - util.AddSshFlags(cmd.Flags(), sshConf) + pkgssh.AddSshFlags(cmd.Flags(), sshConf) return cmd } diff --git a/cmd/kubevpn/cmds/cp.go b/cmd/kubevpn/cmds/cp.go index 95b1eb62..9a8c0535 100644 --- a/cmd/kubevpn/cmds/cp.go +++ b/cmd/kubevpn/cmds/cp.go @@ -13,7 +13,7 @@ import ( "k8s.io/kubectl/pkg/util/templates" "github.com/wencaiwulue/kubevpn/v2/pkg/cp" - "github.com/wencaiwulue/kubevpn/v2/pkg/util" + pkgssh "github.com/wencaiwulue/kubevpn/v2/pkg/ssh" ) var cpExample = templates.Examples(i18n.T(` @@ -64,7 +64,7 @@ func CmdCp(f cmdutil.Factory) *cobra.Command { Out: os.Stdout, ErrOut: os.Stderr, }) - var sshConf = &util.SshConfig{} + var sshConf = &pkgssh.SshConfig{} cmd := &cobra.Command{ Use: "cp ", DisableFlagsInUseLine: true, @@ -73,7 +73,7 @@ func CmdCp(f cmdutil.Factory) *cobra.Command { Long: i18n.T("Copy files and directories to and from containers. Different between kubectl cp is it will de-reference symbol link."), Example: cpExample, ValidArgsFunction: func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) { - cmdutil.CheckErr(util.SshJumpAndSetEnv(cmd.Context(), sshConf, cmd.Flags(), false)) + cmdutil.CheckErr(pkgssh.SshJumpAndSetEnv(cmd.Context(), sshConf, cmd.Flags(), false)) var comps []string if len(args) == 0 { @@ -135,6 +135,6 @@ func CmdCp(f cmdutil.Factory) *cobra.Command { cmd.Flags().BoolVarP(&o.NoPreserve, "no-preserve", "", false, "The copied file/directory's ownership and permissions will not be preserved in the container") cmd.Flags().IntVarP(&o.MaxTries, "retries", "", 0, "Set number of retries to complete a copy operation from a container. Specify 0 to disable or any negative value for infinite retrying. The default is 0 (no retry).") - util.AddSshFlags(cmd.Flags(), sshConf) + pkgssh.AddSshFlags(cmd.Flags(), sshConf) return cmd } diff --git a/cmd/kubevpn/cmds/dev.go b/cmd/kubevpn/cmds/dev.go index 78a3c383..4c2fa2d1 100644 --- a/cmd/kubevpn/cmds/dev.go +++ b/cmd/kubevpn/cmds/dev.go @@ -16,6 +16,7 @@ import ( "github.com/wencaiwulue/kubevpn/v2/pkg/daemon" "github.com/wencaiwulue/kubevpn/v2/pkg/dev" "github.com/wencaiwulue/kubevpn/v2/pkg/handler" + pkgssh "github.com/wencaiwulue/kubevpn/v2/pkg/ssh" "github.com/wencaiwulue/kubevpn/v2/pkg/util" ) @@ -24,7 +25,7 @@ func CmdDev(f cmdutil.Factory) *cobra.Command { NoProxy: false, ExtraRouteInfo: handler.ExtraRouteInfo{}, } - var sshConf = &util.SshConfig{} + var sshConf = &pkgssh.SshConfig{} var transferImage bool cmd := &cobra.Command{ Use: "dev TYPE/NAME [-c CONTAINER] [flags] -- [args...]", @@ -105,7 +106,7 @@ func CmdDev(f cmdutil.Factory) *cobra.Command { if err != nil { return err } - return util.SshJumpAndSetEnv(cmd.Context(), sshConf, cmd.Flags(), false) + return pkgssh.SshJumpAndSetEnv(cmd.Context(), sshConf, cmd.Flags(), false) }, RunE: func(cmd *cobra.Command, args []string) error { options.Workload = args[0] @@ -151,7 +152,7 @@ func CmdDev(f cmdutil.Factory) *cobra.Command { dev.AddDockerFlags(options, cmd.Flags()) handler.AddExtraRoute(cmd.Flags(), &options.ExtraRouteInfo) - util.AddSshFlags(cmd.Flags(), sshConf) + pkgssh.AddSshFlags(cmd.Flags(), sshConf) return cmd } diff --git a/cmd/kubevpn/cmds/proxy.go b/cmd/kubevpn/cmds/proxy.go index 7910d8ac..49d2451c 100644 --- a/cmd/kubevpn/cmds/proxy.go +++ b/cmd/kubevpn/cmds/proxy.go @@ -19,13 +19,14 @@ import ( "github.com/wencaiwulue/kubevpn/v2/pkg/daemon" "github.com/wencaiwulue/kubevpn/v2/pkg/daemon/rpc" "github.com/wencaiwulue/kubevpn/v2/pkg/handler" + pkgssh "github.com/wencaiwulue/kubevpn/v2/pkg/ssh" "github.com/wencaiwulue/kubevpn/v2/pkg/util" ) func CmdProxy(f cmdutil.Factory) *cobra.Command { var connect = handler.ConnectOptions{} var extraRoute = &handler.ExtraRouteInfo{} - var sshConf = &util.SshConfig{} + var sshConf = &pkgssh.SshConfig{} var transferImage, foreground bool cmd := &cobra.Command{ Use: "proxy", @@ -183,7 +184,7 @@ func CmdProxy(f cmdutil.Factory) *cobra.Command { cmd.Flags().BoolVar(&foreground, "foreground", false, "foreground hang up") handler.AddExtraRoute(cmd.Flags(), extraRoute) - util.AddSshFlags(cmd.Flags(), sshConf) + pkgssh.AddSshFlags(cmd.Flags(), sshConf) cmd.ValidArgsFunction = utilcomp.ResourceTypeAndNameCompletionFunc(f) return cmd } diff --git a/cmd/kubevpn/cmds/reset.go b/cmd/kubevpn/cmds/reset.go index c769fa5d..02276b5f 100644 --- a/cmd/kubevpn/cmds/reset.go +++ b/cmd/kubevpn/cmds/reset.go @@ -16,11 +16,12 @@ import ( "github.com/wencaiwulue/kubevpn/v2/pkg/daemon" "github.com/wencaiwulue/kubevpn/v2/pkg/daemon/rpc" + pkgssh "github.com/wencaiwulue/kubevpn/v2/pkg/ssh" "github.com/wencaiwulue/kubevpn/v2/pkg/util" ) func CmdReset(f cmdutil.Factory) *cobra.Command { - var sshConf = &util.SshConfig{} + var sshConf = &pkgssh.SshConfig{} cmd := &cobra.Command{ Use: "reset", Short: "Reset all resource create by kubevpn in k8s cluster", @@ -86,7 +87,7 @@ func CmdReset(f cmdutil.Factory) *cobra.Command { }, } - util.AddSshFlags(cmd.Flags(), sshConf) + pkgssh.AddSshFlags(cmd.Flags(), sshConf) return cmd } diff --git a/cmd/kubevpn/cmds/ssh.go b/cmd/kubevpn/cmds/ssh.go index dae1ad32..38ad48ce 100644 --- a/cmd/kubevpn/cmds/ssh.go +++ b/cmd/kubevpn/cmds/ssh.go @@ -20,13 +20,13 @@ import ( "k8s.io/kubectl/pkg/util/term" "github.com/wencaiwulue/kubevpn/v2/pkg/daemon" - "github.com/wencaiwulue/kubevpn/v2/pkg/util" + pkgssh "github.com/wencaiwulue/kubevpn/v2/pkg/ssh" ) // CmdSSH // Remember to use network mask 32, because ssh using unique network cidr 223.255.0.0/16 func CmdSSH(_ cmdutil.Factory) *cobra.Command { - var sshConf = &util.SshConfig{} + var sshConf = &pkgssh.SshConfig{} var ExtraCIDR []string cmd := &cobra.Command{ Use: "ssh", @@ -108,7 +108,7 @@ func CmdSSH(_ cmdutil.Factory) *cobra.Command { } }, } - util.AddSshFlags(cmd.Flags(), sshConf) + pkgssh.AddSshFlags(cmd.Flags(), sshConf) cmd.Flags().StringArrayVar(&ExtraCIDR, "extra-cidr", []string{}, "Extra cidr string, eg: --extra-cidr 192.168.0.159/24 --extra-cidr 192.168.1.160/32") return cmd } diff --git a/cmd/kubevpn/cmds/status.go b/cmd/kubevpn/cmds/status.go index ed11c587..5b53f20d 100644 --- a/cmd/kubevpn/cmds/status.go +++ b/cmd/kubevpn/cmds/status.go @@ -21,6 +21,7 @@ import ( "github.com/wencaiwulue/kubevpn/v2/pkg/daemon" "github.com/wencaiwulue/kubevpn/v2/pkg/daemon/rpc" "github.com/wencaiwulue/kubevpn/v2/pkg/handler" + pkgssh "github.com/wencaiwulue/kubevpn/v2/pkg/ssh" "github.com/wencaiwulue/kubevpn/v2/pkg/util" ) @@ -224,8 +225,8 @@ func genCloneMsg(w *tabwriter.Writer, list []*rpc.Status) { func GetClusterIDByConfig(cmd *cobra.Command, config Config) (string, error) { flags := flag.NewFlagSet("", flag.ContinueOnError) - var sshConf = &util.SshConfig{} - util.AddSshFlags(flags, sshConf) + var sshConf = &pkgssh.SshConfig{} + pkgssh.AddSshFlags(flags, sshConf) handler.AddExtraRoute(flags, &handler.ExtraRouteInfo{}) configFlags := genericclioptions.NewConfigFlags(false).WithDeprecatedPasswordFlag() configFlags.AddFlags(flags) @@ -263,7 +264,7 @@ func GetClusterIDByConfig(cmd *cobra.Command, config Config) (string, error) { DefValue: ns, }) var path string - path, err = util.SshJump(cmd.Context(), sshConf, flags, false) + path, err = pkgssh.SshJump(cmd.Context(), sshConf, flags, false) if err != nil { return "", err } diff --git a/pkg/daemon/action/clone.go b/pkg/daemon/action/clone.go index bc023c57..0648706e 100644 --- a/pkg/daemon/action/clone.go +++ b/pkg/daemon/action/clone.go @@ -13,6 +13,7 @@ import ( "github.com/wencaiwulue/kubevpn/v2/pkg/config" "github.com/wencaiwulue/kubevpn/v2/pkg/daemon/rpc" "github.com/wencaiwulue/kubevpn/v2/pkg/handler" + "github.com/wencaiwulue/kubevpn/v2/pkg/ssh" "github.com/wencaiwulue/kubevpn/v2/pkg/util" ) @@ -25,7 +26,7 @@ func (svr *Server) Clone(req *rpc.CloneRequest, resp rpc.Daemon_CloneServer) (er out := io.MultiWriter(newCloneWarp(resp), svr.LogFile) log.SetOutput(out) log.SetLevel(log.InfoLevel) - var sshConf = util.ParseSshFromRPC(req.SshJump) + var sshConf = ssh.ParseSshFromRPC(req.SshJump) connReq := &rpc.ConnectRequest{ KubeconfigBytes: req.KubeconfigBytes, Namespace: req.Namespace, @@ -98,7 +99,7 @@ func (svr *Server) Clone(req *rpc.CloneRequest, resp rpc.Daemon_CloneServer) (er return nil }) var path string - path, err = util.SshJump(sshCtx, sshConf, flags, false) + path, err = ssh.SshJump(sshCtx, sshConf, flags, false) if err != nil { return err } diff --git a/pkg/daemon/action/config.go b/pkg/daemon/action/config.go index 415fa133..da2fcffd 100644 --- a/pkg/daemon/action/config.go +++ b/pkg/daemon/action/config.go @@ -6,6 +6,7 @@ import ( "github.com/spf13/pflag" "github.com/wencaiwulue/kubevpn/v2/pkg/daemon/rpc" + "github.com/wencaiwulue/kubevpn/v2/pkg/ssh" "github.com/wencaiwulue/kubevpn/v2/pkg/util" ) @@ -29,8 +30,8 @@ func (svr *Server) ConfigAdd(ctx context.Context, req *rpc.ConfigAddRequest) (re } }() var path string - var sshConf = util.ParseSshFromRPC(req.SshJump) - path, err = util.SshJump(sshCtx, sshConf, flags, true) + var sshConf = ssh.ParseSshFromRPC(req.SshJump) + path, err = ssh.SshJump(sshCtx, sshConf, flags, true) if err != nil { return nil, err } diff --git a/pkg/daemon/action/connect-fork.go b/pkg/daemon/action/connect-fork.go index 30cb561d..5f79156e 100644 --- a/pkg/daemon/action/connect-fork.go +++ b/pkg/daemon/action/connect-fork.go @@ -12,6 +12,7 @@ import ( "github.com/wencaiwulue/kubevpn/v2/pkg/config" "github.com/wencaiwulue/kubevpn/v2/pkg/daemon/rpc" "github.com/wencaiwulue/kubevpn/v2/pkg/handler" + "github.com/wencaiwulue/kubevpn/v2/pkg/ssh" "github.com/wencaiwulue/kubevpn/v2/pkg/util" ) @@ -38,12 +39,12 @@ func (svr *Server) ConnectFork(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectF OriginKubeconfigPath: req.OriginKubeconfigPath, Lock: &svr.Lock, } - var sshConf = util.ParseSshFromRPC(req.SshJump) + var sshConf = ssh.ParseSshFromRPC(req.SshJump) var transferImage = req.TransferImage defaultlog.Default().SetOutput(io.Discard) if transferImage { - err = util.TransferImage(ctx, sshConf, config.OriginImage, req.Image, out) + err = ssh.TransferImage(ctx, sshConf, config.OriginImage, req.Image, out) if err != nil { return err } @@ -71,7 +72,7 @@ func (svr *Server) ConnectFork(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectF }() var path string - path, err = util.SshJump(sshCtx, sshConf, flags, false) + path, err = ssh.SshJump(sshCtx, sshConf, flags, false) if err != nil { return err } @@ -115,7 +116,7 @@ func (svr *Server) redirectConnectForkToSudoDaemon(req *rpc.ConnectRequest, resp Engine: config.Engine(req.Engine), OriginKubeconfigPath: req.OriginKubeconfigPath, } - var sshConf = util.ParseSshFromRPC(req.SshJump) + var sshConf = ssh.ParseSshFromRPC(req.SshJump) file, err := util.ConvertToTempKubeconfigFile([]byte(req.KubeconfigBytes)) if err != nil { return err @@ -136,7 +137,7 @@ func (svr *Server) redirectConnectForkToSudoDaemon(req *rpc.ConnectRequest, resp } }() var path string - path, err = util.SshJump(sshCtx, sshConf, flags, true) + path, err = ssh.SshJump(sshCtx, sshConf, flags, true) if err != nil { return err } diff --git a/pkg/daemon/action/connect.go b/pkg/daemon/action/connect.go index af20face..f783e2ce 100644 --- a/pkg/daemon/action/connect.go +++ b/pkg/daemon/action/connect.go @@ -15,6 +15,7 @@ import ( "github.com/wencaiwulue/kubevpn/v2/pkg/config" "github.com/wencaiwulue/kubevpn/v2/pkg/daemon/rpc" "github.com/wencaiwulue/kubevpn/v2/pkg/handler" + "github.com/wencaiwulue/kubevpn/v2/pkg/ssh" "github.com/wencaiwulue/kubevpn/v2/pkg/util" ) @@ -58,12 +59,12 @@ func (svr *Server) Connect(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectServe OriginKubeconfigPath: req.OriginKubeconfigPath, Lock: &svr.Lock, } - var sshConf = util.ParseSshFromRPC(req.SshJump) + var sshConf = ssh.ParseSshFromRPC(req.SshJump) var transferImage = req.TransferImage defaultlog.Default().SetOutput(io.Discard) if transferImage { - err := util.TransferImage(ctx, sshConf, config.OriginImage, req.Image, out) + err := ssh.TransferImage(ctx, sshConf, config.OriginImage, req.Image, out) if err != nil { return err } @@ -89,7 +90,7 @@ func (svr *Server) Connect(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectServe } }() var path string - path, err = util.SshJump(sshCtx, sshConf, flags, false) + path, err = ssh.SshJump(sshCtx, sshConf, flags, false) if err != nil { return err } @@ -132,7 +133,7 @@ func (svr *Server) redirectToSudoDaemon(req *rpc.ConnectRequest, resp rpc.Daemon Engine: config.Engine(req.Engine), OriginKubeconfigPath: req.OriginKubeconfigPath, } - var sshConf = util.ParseSshFromRPC(req.SshJump) + var sshConf = ssh.ParseSshFromRPC(req.SshJump) file, err := util.ConvertToTempKubeconfigFile([]byte(req.KubeconfigBytes)) if err != nil { return err @@ -153,7 +154,7 @@ func (svr *Server) redirectToSudoDaemon(req *rpc.ConnectRequest, resp rpc.Daemon } }() var path string - path, err = util.SshJump(sshCtx, sshConf, flags, true) + path, err = ssh.SshJump(sshCtx, sshConf, flags, true) if err != nil { return err } diff --git a/pkg/daemon/action/disconnect.go b/pkg/daemon/action/disconnect.go index 0af16858..a6986de2 100644 --- a/pkg/daemon/action/disconnect.go +++ b/pkg/daemon/action/disconnect.go @@ -13,6 +13,7 @@ import ( "github.com/wencaiwulue/kubevpn/v2/pkg/daemon/rpc" "github.com/wencaiwulue/kubevpn/v2/pkg/dns" "github.com/wencaiwulue/kubevpn/v2/pkg/handler" + "github.com/wencaiwulue/kubevpn/v2/pkg/ssh" "github.com/wencaiwulue/kubevpn/v2/pkg/util" ) @@ -143,9 +144,9 @@ func disconnectByKubeConfig(ctx context.Context, svr *Server, kubeconfigBytes st Name: "kubeconfig", DefValue: file, }) - var sshConf = util.ParseSshFromRPC(jump) + var sshConf = ssh.ParseSshFromRPC(jump) var path string - path, err = util.SshJump(ctx, sshConf, flags, false) + path, err = ssh.SshJump(ctx, sshConf, flags, false) if err != nil { return err } diff --git a/pkg/daemon/action/proxy.go b/pkg/daemon/action/proxy.go index 7cb37217..0394785d 100644 --- a/pkg/daemon/action/proxy.go +++ b/pkg/daemon/action/proxy.go @@ -12,6 +12,7 @@ import ( "github.com/wencaiwulue/kubevpn/v2/pkg/config" "github.com/wencaiwulue/kubevpn/v2/pkg/daemon/rpc" "github.com/wencaiwulue/kubevpn/v2/pkg/handler" + "github.com/wencaiwulue/kubevpn/v2/pkg/ssh" "github.com/wencaiwulue/kubevpn/v2/pkg/util" ) @@ -42,7 +43,7 @@ func (svr *Server) Proxy(req *rpc.ConnectRequest, resp rpc.Daemon_ProxyServer) ( Engine: config.Engine(req.Engine), OriginKubeconfigPath: req.OriginKubeconfigPath, } - var sshConf = util.ParseSshFromRPC(req.SshJump) + var sshConf = ssh.ParseSshFromRPC(req.SshJump) file, err := util.ConvertToTempKubeconfigFile([]byte(req.KubeconfigBytes)) if err != nil { @@ -54,7 +55,7 @@ func (svr *Server) Proxy(req *rpc.ConnectRequest, resp rpc.Daemon_ProxyServer) ( DefValue: file, }) var path string - path, err = util.SshJump(ctx, sshConf, flags, false) + path, err = ssh.SshJump(ctx, sshConf, flags, false) if err != nil { return err } diff --git a/pkg/daemon/action/reset.go b/pkg/daemon/action/reset.go index 08639185..9fe85d98 100644 --- a/pkg/daemon/action/reset.go +++ b/pkg/daemon/action/reset.go @@ -8,6 +8,7 @@ import ( "github.com/wencaiwulue/kubevpn/v2/pkg/daemon/rpc" "github.com/wencaiwulue/kubevpn/v2/pkg/handler" + "github.com/wencaiwulue/kubevpn/v2/pkg/ssh" "github.com/wencaiwulue/kubevpn/v2/pkg/util" ) @@ -34,10 +35,10 @@ func (svr *Server) Reset(req *rpc.ResetRequest, resp rpc.Daemon_ResetServer) err Name: "kubeconfig", DefValue: file, }) - var sshConf = util.ParseSshFromRPC(req.SshJump) + var sshConf = ssh.ParseSshFromRPC(req.SshJump) var ctx = resp.Context() var path string - path, err = util.SshJump(ctx, sshConf, flags, false) + path, err = ssh.SshJump(ctx, sshConf, flags, false) if err != nil { return err } diff --git a/pkg/daemon/client.go b/pkg/daemon/client.go index aa3fc46f..1f1c5b29 100644 --- a/pkg/daemon/client.go +++ b/pkg/daemon/client.go @@ -16,8 +16,8 @@ import ( _ "google.golang.org/grpc/resolver/passthrough" "github.com/wencaiwulue/kubevpn/v2/pkg/config" + "github.com/wencaiwulue/kubevpn/v2/pkg/daemon/elevate" "github.com/wencaiwulue/kubevpn/v2/pkg/daemon/rpc" - "github.com/wencaiwulue/kubevpn/v2/pkg/util" ) var daemonClient, sudoDaemonClient rpc.DaemonClient @@ -149,13 +149,13 @@ func runDaemon(ctx context.Context, exe string, isSudo bool) error { return err } if isSudo { - if !util.IsAdmin() { - err = util.RunCmdWithElevated(exe, []string{"daemon", "--sudo"}) + if !elevate.IsAdmin() { + err = elevate.RunCmdWithElevated(exe, []string{"daemon", "--sudo"}) } else { - err = util.RunCmd(exe, []string{"daemon", "--sudo"}) + err = elevate.RunCmd(exe, []string{"daemon", "--sudo"}) } } else { - err = util.RunCmd(exe, []string{"daemon"}) + err = elevate.RunCmd(exe, []string{"daemon"}) } if err != nil { return err diff --git a/pkg/util/elevate_others.go b/pkg/daemon/elevate/elevate_others.go similarity index 99% rename from pkg/util/elevate_others.go rename to pkg/daemon/elevate/elevate_others.go index 1a1e13e4..23d00f8b 100644 --- a/pkg/util/elevate_others.go +++ b/pkg/daemon/elevate/elevate_others.go @@ -1,6 +1,6 @@ //go:build !windows -package util +package elevate import ( "flag" diff --git a/pkg/util/elevate_windows.go b/pkg/daemon/elevate/elevate_windows.go similarity index 99% rename from pkg/util/elevate_windows.go rename to pkg/daemon/elevate/elevate_windows.go index f1d22dee..e186593c 100644 --- a/pkg/util/elevate_windows.go +++ b/pkg/daemon/elevate/elevate_windows.go @@ -1,6 +1,6 @@ //go:build windows -package util +package elevate import ( "os" diff --git a/pkg/util/elevatecheck_others.go b/pkg/daemon/elevate/elevatecheck_others.go similarity index 99% rename from pkg/util/elevatecheck_others.go rename to pkg/daemon/elevate/elevatecheck_others.go index 1d09f9e5..88527b16 100644 --- a/pkg/util/elevatecheck_others.go +++ b/pkg/daemon/elevate/elevatecheck_others.go @@ -1,6 +1,6 @@ //go:build !windows -package util +package elevate import ( "flag" diff --git a/pkg/util/elevatecheck_windows.go b/pkg/daemon/elevate/elevatecheck_windows.go similarity index 99% rename from pkg/util/elevatecheck_windows.go rename to pkg/daemon/elevate/elevatecheck_windows.go index 4f99c967..820593f2 100644 --- a/pkg/util/elevatecheck_windows.go +++ b/pkg/daemon/elevate/elevatecheck_windows.go @@ -1,6 +1,6 @@ //go:build windows -package util +package elevate import ( "fmt" diff --git a/pkg/daemon/handler/ssh.go b/pkg/daemon/handler/ssh.go index 53d545c8..533caf92 100644 --- a/pkg/daemon/handler/ssh.go +++ b/pkg/daemon/handler/ssh.go @@ -27,12 +27,13 @@ import ( "github.com/wencaiwulue/kubevpn/v2/pkg/config" "github.com/wencaiwulue/kubevpn/v2/pkg/core" "github.com/wencaiwulue/kubevpn/v2/pkg/handler" + pkgssh "github.com/wencaiwulue/kubevpn/v2/pkg/ssh" "github.com/wencaiwulue/kubevpn/v2/pkg/util" ) type wsHandler struct { conn *websocket.Conn - sshConfig *util.SshConfig + sshConfig *pkgssh.SshConfig cidr []string width int height int @@ -49,7 +50,7 @@ func (w *wsHandler) handle(c context.Context) { ctx, f := context.WithCancel(c) defer f() - cli, err := util.DialSshRemote(ctx, w.sshConfig) + cli, err := pkgssh.DialSshRemote(ctx, w.sshConfig) if err != nil { w.Log("Dial ssh remote error: %v", err) return @@ -84,13 +85,13 @@ func (w *wsHandler) handle(c context.Context) { if err != nil { return } - err = util.PortMapUntil(ctx, w.sshConfig, remote, local) + err = pkgssh.PortMapUntil(ctx, w.sshConfig, remote, local) if err != nil { w.Log("Port map error: %v", err) return } cmd := fmt.Sprintf(`kubevpn ssh-daemon --client-ip %s`, clientIP.String()) - serverIP, stderr, err := util.RemoteRun(cli, cmd, nil) + serverIP, stderr, err := pkgssh.RemoteRun(cli, cmd, nil) if err != nil { log.Errorf("run error: %v", err) log.Errorf("run stdout: %v", string(serverIP)) @@ -146,8 +147,8 @@ func (w *wsHandler) handle(c context.Context) { // startup daemon process if daemon process not start func startDaemonProcess(cli *ssh.Client) string { startDaemonCmd := fmt.Sprintf(`kubevpn status > /dev/null 2>&1 &`) - _, _, _ = util.RemoteRun(cli, startDaemonCmd, nil) - output, _, err := util.RemoteRun(cli, "kubevpn version", nil) + _, _, _ = pkgssh.RemoteRun(cli, startDaemonCmd, nil) + output, _, err := pkgssh.RemoteRun(cli, "kubevpn version", nil) if err != nil { return "" } @@ -260,7 +261,7 @@ func (w *wsHandler) installKubevpnOnRemote(ctx context.Context, sshClient *ssh.C }() cmd := "kubevpn version" - _, _, err = util.RemoteRun(sshClient, cmd, nil) + _, _, err = pkgssh.RemoteRun(sshClient, cmd, nil) if err == nil { w.Log("Found command kubevpn command on remote") return nil @@ -319,7 +320,7 @@ func (w *wsHandler) installKubevpnOnRemote(ctx context.Context, sshClient *ssh.C "chmod +x ~/.kubevpn/kubevpn", "sudo mv ~/.kubevpn/kubevpn /usr/local/bin/kubevpn", } - err = util.SCPAndExec(w.conn, w.conn, sshClient, tempBin.Name(), "kubevpn", cmds...) + err = pkgssh.SCPAndExec(w.conn, w.conn, sshClient, tempBin.Name(), "kubevpn", cmds...) return err } @@ -344,7 +345,7 @@ var CondReady = make(map[string]context.Context) func init() { http.Handle("/ws", websocket.Handler(func(conn *websocket.Conn) { - var sshConfig util.SshConfig + var sshConfig pkgssh.SshConfig b := conn.Request().Header.Get("ssh") if err := json.Unmarshal([]byte(b), &sshConfig); err != nil { _, _ = conn.Write([]byte(err.Error())) diff --git a/pkg/dev/docker_utils.go b/pkg/dev/docker_utils.go index 9d92ee95..f8ca9074 100644 --- a/pkg/dev/docker_utils.go +++ b/pkg/dev/docker_utils.go @@ -35,7 +35,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "github.com/wencaiwulue/kubevpn/v2/pkg/config" - "github.com/wencaiwulue/kubevpn/v2/pkg/util" + pkgssh "github.com/wencaiwulue/kubevpn/v2/pkg/ssh" ) func waitExitOrRemoved(ctx context.Context, apiClient client.APIClient, containerID string, waitRemove bool) <-chan int { @@ -576,7 +576,7 @@ func run(ctx context.Context, cli *client.Client, dockerCli *command.DockerCli, } } if needPull { - err = util.PullImage(ctx, runConfig.platform, cli, dockerCli, config.Image, nil) + err = pkgssh.PullImage(ctx, runConfig.platform, cli, dockerCli, config.Image, nil) if err != nil { logrus.Errorf("Failed to pull image: %s, err: %s", config.Image, err) return diff --git a/pkg/dev/options.go b/pkg/dev/options.go index ef163478..87aa6ba7 100644 --- a/pkg/dev/options.go +++ b/pkg/dev/options.go @@ -38,6 +38,7 @@ import ( "github.com/wencaiwulue/kubevpn/v2/pkg/daemon/rpc" "github.com/wencaiwulue/kubevpn/v2/pkg/handler" "github.com/wencaiwulue/kubevpn/v2/pkg/inject" + pkgssh "github.com/wencaiwulue/kubevpn/v2/pkg/ssh" "github.com/wencaiwulue/kubevpn/v2/pkg/util" ) @@ -77,7 +78,7 @@ type Options struct { rollbackFuncList []func() error } -func (option *Options) Main(ctx context.Context, sshConfig *util.SshConfig, flags *pflag.FlagSet, transferImage bool) error { +func (option *Options) Main(ctx context.Context, sshConfig *pkgssh.SshConfig, flags *pflag.FlagSet, transferImage bool) error { mode := typescontainer.NetworkMode(option.ContainerOptions.netMode.NetworkMode()) if mode.IsContainer() { log.Infof("network mode container is %s", mode.ConnectedContainer()) @@ -122,7 +123,7 @@ func (option *Options) Main(ctx context.Context, sshConfig *util.SshConfig, flag } // Connect to cluster network on docker container or host -func (option *Options) Connect(ctx context.Context, sshConfig *util.SshConfig, transferImage bool, portBindings nat.PortMap) error { +func (option *Options) Connect(ctx context.Context, sshConfig *pkgssh.SshConfig, transferImage bool, portBindings nat.PortMap) error { switch option.ConnectMode { case ConnectModeHost: daemonCli := daemon.GetClient(false) @@ -507,7 +508,7 @@ func (option *Options) InitClient(f cmdutil.Factory) (err error) { if option.Namespace, _, err = option.factory.ToRawKubeConfigLoader().Namespace(); err != nil { return } - if option.cli, option.dockerCli, err = util.GetClient(); err != nil { + if option.cli, option.dockerCli, err = pkgssh.GetClient(); err != nil { return err } return diff --git a/pkg/inject/envoy.go b/pkg/inject/envoy.go deleted file mode 100644 index 8b6d6eaa..00000000 --- a/pkg/inject/envoy.go +++ /dev/null @@ -1,356 +0,0 @@ -package inject - -import ( - "context" - "encoding/json" - "fmt" - "reflect" - "strings" - "time" - - "github.com/pkg/errors" - log "github.com/sirupsen/logrus" - v1 "k8s.io/api/core/v1" - k8serrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/types" - k8sjson "k8s.io/apimachinery/pkg/util/json" - "k8s.io/apimachinery/pkg/util/sets" - pkgresource "k8s.io/cli-runtime/pkg/resource" - runtimeresource "k8s.io/cli-runtime/pkg/resource" - v12 "k8s.io/client-go/kubernetes/typed/core/v1" - cmdutil "k8s.io/kubectl/pkg/cmd/util" - "sigs.k8s.io/yaml" - - "github.com/wencaiwulue/kubevpn/v2/pkg/config" - "github.com/wencaiwulue/kubevpn/v2/pkg/controlplane" - "github.com/wencaiwulue/kubevpn/v2/pkg/util" -) - -// https://istio.io/latest/docs/ops/deployment/requirements/#ports-used-by-istio - -// InjectVPNAndEnvoySidecar patch a sidecar, using iptables to do port-forward let this pod decide should go to 233.254.254.100 or request to 127.0.0.1 -func InjectVPNAndEnvoySidecar(ctx1 context.Context, factory cmdutil.Factory, clientset v12.ConfigMapInterface, namespace, workload string, c util.PodRouteConfig, headers map[string]string, portMaps []string) (err error) { - var object *runtimeresource.Info - object, err = util.GetUnstructuredObject(factory, namespace, workload) - if err != nil { - return err - } - - u := object.Object.(*unstructured.Unstructured) - var templateSpec *v1.PodTemplateSpec - var path []string - templateSpec, path, err = util.GetPodTemplateSpecPath(u) - if err != nil { - return err - } - - origin := templateSpec.DeepCopy() - - var ports []v1.ContainerPort - for _, container := range templateSpec.Spec.Containers { - ports = append(ports, container.Ports...) - } - for _, portMap := range portMaps { - var found = func(containerPort int32) bool { - for _, port := range ports { - if port.ContainerPort == containerPort { - return true - } - } - return false - } - port := util.ParsePort(portMap) - port.HostPort = 0 - if port.ContainerPort != 0 && !found(port.ContainerPort) { - ports = append(ports, port) - } - } - var portmap = make(map[int32]int32) - for _, port := range ports { - portmap[port.ContainerPort] = port.ContainerPort - } - for _, portMap := range portMaps { - port := util.ParsePort(portMap) - if port.ContainerPort != 0 { - portmap[port.ContainerPort] = port.HostPort - } - } - - nodeID := fmt.Sprintf("%s.%s", object.Mapping.Resource.GroupResource().String(), object.Name) - - err = addEnvoyConfig(clientset, nodeID, c, headers, ports, portmap) - if err != nil { - log.Errorf("add envoy config error: %v", err) - return err - } - - // already inject container vpn and envoy-proxy, do nothing - containerNames := sets.New[string]() - for _, container := range templateSpec.Spec.Containers { - containerNames.Insert(container.Name) - } - if containerNames.HasAll(config.ContainerSidecarVPN, config.ContainerSidecarEnvoyProxy) { - // add rollback func to remove envoy config - //rollbackFuncList = append(rollbackFuncList, func() { - // err := UnPatchContainer(factory, clientset, namespace, workload, c.LocalTunIPv4) - // if err != nil { - // log.Error(err) - // } - //}) - log.Infof("workload %s/%s has already been injected with sidecar", namespace, workload) - return nil - } - // (1) add mesh container - removePatch, restorePatch := patch(*origin, path) - var b []byte - b, err = k8sjson.Marshal(restorePatch) - if err != nil { - log.Errorf("marshal patch error: %v", err) - return err - } - - AddMeshContainer(templateSpec, nodeID, c) - helper := pkgresource.NewHelper(object.Client, object.Mapping) - ps := []P{ - { - Op: "replace", - Path: "/" + strings.Join(append(path, "spec"), "/"), - Value: templateSpec.Spec, - }, - { - Op: "replace", - Path: "/metadata/annotations/" + config.KubeVPNRestorePatchKey, - Value: string(b), - }, - } - var bytes []byte - bytes, err = k8sjson.Marshal(append(ps, removePatch...)) - if err != nil { - return err - } - _, err = helper.Patch(object.Namespace, object.Name, types.JSONPatchType, bytes, &metav1.PatchOptions{}) - if err != nil { - log.Errorf("error while path resource: %s %s, err: %v", object.Mapping.GroupVersionKind.GroupKind().String(), object.Name, err) - return err - } - log.Infof("patch workload %s/%s with sidecar", namespace, workload) - err = util.RolloutStatus(ctx1, factory, namespace, workload, time.Minute*60) - return err -} - -func UnPatchContainer(factory cmdutil.Factory, mapInterface v12.ConfigMapInterface, namespace, workload string, localTunIPv4 string) error { - object, err := util.GetUnstructuredObject(factory, namespace, workload) - if err != nil { - log.Errorf("get unstructured object error: %v", err) - return err - } - - u := object.Object.(*unstructured.Unstructured) - templateSpec, depth, err := util.GetPodTemplateSpecPath(u) - if err != nil { - log.Errorf("get template spec path error: %v", err) - return err - } - - nodeID := fmt.Sprintf("%s.%s", object.Mapping.Resource.GroupResource().String(), object.Name) - - var empty, found bool - empty, found, err = removeEnvoyConfig(mapInterface, nodeID, localTunIPv4) - if err != nil { - log.Errorf("remove envoy config error: %v", err) - return err - } - if !found { - log.Infof("not proxy resource %s", workload) - return nil - } - - log.Infof("leave workload %s", workload) - - RemoveContainers(templateSpec) - if u.GetAnnotations() != nil && u.GetAnnotations()[config.KubeVPNRestorePatchKey] != "" { - patchStr := u.GetAnnotations()[config.KubeVPNRestorePatchKey] - var ps []P - err = json.Unmarshal([]byte(patchStr), &ps) - if err != nil { - return fmt.Errorf("unmarshal json patch: %s failed, err: %v", patchStr, err) - } - fromPatchToProbe(templateSpec, depth, ps) - } - - if empty { - helper := pkgresource.NewHelper(object.Client, object.Mapping) - // pod without controller - if len(depth) == 0 { - log.Infof("workload %s/%s is not controlled by any controller", namespace, workload) - delete(templateSpec.ObjectMeta.GetAnnotations(), config.KubeVPNRestorePatchKey) - pod := &v1.Pod{ObjectMeta: templateSpec.ObjectMeta, Spec: templateSpec.Spec} - CleanupUselessInfo(pod) - err = CreateAfterDeletePod(factory, pod, helper) - return err - } - - log.Infof("workload %s/%s is controlled by a controller", namespace, workload) - // resource with controller, like deployment,statefulset - var bytes []byte - bytes, err = json.Marshal([]P{ - { - Op: "replace", - Path: "/" + strings.Join(append(depth, "spec"), "/"), - Value: templateSpec.Spec, - }, - { - Op: "replace", - Path: "/metadata/annotations/" + config.KubeVPNRestorePatchKey, - Value: "", - }, - }) - if err != nil { - log.Errorf("error while generating json patch: %v", err) - return err - } - _, err = helper.Patch(object.Namespace, object.Name, types.JSONPatchType, bytes, &metav1.PatchOptions{}) - if err != nil { - log.Errorf("error while patching resource: %s %s, err: %v", object.Mapping.GroupVersionKind.GroupKind().String(), object.Name, err) - return err - } - } - return err -} - -func addEnvoyConfig(mapInterface v12.ConfigMapInterface, nodeID string, tunIP util.PodRouteConfig, headers map[string]string, port []v1.ContainerPort, portmap map[int32]int32) error { - configMap, err := mapInterface.Get(context.Background(), config.ConfigMapPodTrafficManager, metav1.GetOptions{}) - if err != nil { - return err - } - var v = make([]*controlplane.Virtual, 0) - if str, ok := configMap.Data[config.KeyEnvoy]; ok { - if err = yaml.Unmarshal([]byte(str), &v); err != nil { - return err - } - } - - v = addVirtualRule(v, nodeID, port, headers, tunIP, portmap) - marshal, err := yaml.Marshal(v) - if err != nil { - return err - } - configMap.Data[config.KeyEnvoy] = string(marshal) - _, err = mapInterface.Update(context.Background(), configMap, metav1.UpdateOptions{}) - return err -} - -func addVirtualRule(v []*controlplane.Virtual, nodeID string, port []v1.ContainerPort, headers map[string]string, tunIP util.PodRouteConfig, portmap map[int32]int32) []*controlplane.Virtual { - var index = -1 - for i, virtual := range v { - if nodeID == virtual.Uid { - index = i - break - } - } - // 1) if not found uid, means nobody proxying it, just add it - if index < 0 { - return append(v, &controlplane.Virtual{ - Uid: nodeID, - Ports: port, - Rules: []*controlplane.Rule{{ - Headers: headers, - LocalTunIPv4: tunIP.LocalTunIPv4, - LocalTunIPv6: tunIP.LocalTunIPv6, - PortMap: portmap, - }}, - }) - } - - // 2) if already proxy deployment/xxx with header a=1. also want to add b=2 - for j, rule := range v[index].Rules { - if rule.LocalTunIPv4 == tunIP.LocalTunIPv4 && - rule.LocalTunIPv6 == tunIP.LocalTunIPv6 { - v[index].Rules[j].Headers = util.Merge[string, string](v[index].Rules[j].Headers, headers) - v[index].Rules[j].PortMap = util.Merge[int32, int32](v[index].Rules[j].PortMap, portmap) - return v - } - } - - // 3) if already proxy deployment/xxx with header a=1, other user can replace it to self - for j, rule := range v[index].Rules { - if reflect.DeepEqual(rule.Headers, headers) { - v[index].Rules[j].LocalTunIPv6 = tunIP.LocalTunIPv6 - v[index].Rules[j].LocalTunIPv4 = tunIP.LocalTunIPv4 - v[index].Rules[j].PortMap = portmap - return v - } - } - - // 4) if header is not same and tunIP is not same, means another users, just add it - v[index].Rules = append(v[index].Rules, &controlplane.Rule{ - Headers: headers, - LocalTunIPv4: tunIP.LocalTunIPv4, - LocalTunIPv6: tunIP.LocalTunIPv6, - PortMap: portmap, - }) - if v[index].Ports == nil { - v[index].Ports = port - } - return v -} - -func removeEnvoyConfig(mapInterface v12.ConfigMapInterface, nodeID string, localTunIPv4 string) (empty bool, found bool, err error) { - configMap, err := mapInterface.Get(context.Background(), config.ConfigMapPodTrafficManager, metav1.GetOptions{}) - if k8serrors.IsNotFound(err) { - return true, false, nil - } - if err != nil { - return false, false, err - } - str, ok := configMap.Data[config.KeyEnvoy] - if !ok { - return false, false, errors.New("can not found value for key: envoy-config.yaml") - } - var v []*controlplane.Virtual - if err = yaml.Unmarshal([]byte(str), &v); err != nil { - return false, false, err - } - for _, virtual := range v { - if nodeID == virtual.Uid { - for i := 0; i < len(virtual.Rules); i++ { - if virtual.Rules[i].LocalTunIPv4 == localTunIPv4 { - found = true - virtual.Rules = append(virtual.Rules[:i], virtual.Rules[i+1:]...) - i-- - } - } - } - } - if !found { - return false, false, nil - } - - // remove default - for i := 0; i < len(v); i++ { - if nodeID == v[i].Uid && len(v[i].Rules) == 0 { - v = append(v[:i], v[i+1:]...) - i-- - empty = true - } - } - var bytes []byte - bytes, err = yaml.Marshal(v) - if err != nil { - return false, found, err - } - configMap.Data[config.KeyEnvoy] = string(bytes) - _, err = mapInterface.Update(context.Background(), configMap, metav1.UpdateOptions{}) - return empty, found, err -} - -func contains(a map[string]string, sub map[string]string) bool { - for k, v := range sub { - if a[k] != v { - return false - } - } - return true -} diff --git a/pkg/inject/mesh.go b/pkg/inject/mesh.go index 5bf63692..8b6d6eaa 100644 --- a/pkg/inject/mesh.go +++ b/pkg/inject/mesh.go @@ -3,299 +3,354 @@ package inject import ( "context" "encoding/json" - errors2 "errors" "fmt" - "strconv" + "reflect" "strings" "time" - "github.com/sirupsen/logrus" - "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" - v12 "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" + v1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/types" - json2 "k8s.io/apimachinery/pkg/util/json" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/cli-runtime/pkg/resource" - "k8s.io/client-go/util/retry" - "k8s.io/kubectl/pkg/cmd/util" - "k8s.io/utils/pointer" + k8sjson "k8s.io/apimachinery/pkg/util/json" + "k8s.io/apimachinery/pkg/util/sets" + pkgresource "k8s.io/cli-runtime/pkg/resource" + runtimeresource "k8s.io/cli-runtime/pkg/resource" + v12 "k8s.io/client-go/kubernetes/typed/core/v1" + cmdutil "k8s.io/kubectl/pkg/cmd/util" + "sigs.k8s.io/yaml" "github.com/wencaiwulue/kubevpn/v2/pkg/config" - util2 "github.com/wencaiwulue/kubevpn/v2/pkg/util" + "github.com/wencaiwulue/kubevpn/v2/pkg/controlplane" + "github.com/wencaiwulue/kubevpn/v2/pkg/util" ) -func InjectVPNSidecar(ctx1 context.Context, factory util.Factory, namespace, workload string, c util2.PodRouteConfig) error { - object, err := util2.GetUnstructuredObject(factory, namespace, workload) +// https://istio.io/latest/docs/ops/deployment/requirements/#ports-used-by-istio + +// InjectVPNAndEnvoySidecar patch a sidecar, using iptables to do port-forward let this pod decide should go to 233.254.254.100 or request to 127.0.0.1 +func InjectVPNAndEnvoySidecar(ctx1 context.Context, factory cmdutil.Factory, clientset v12.ConfigMapInterface, namespace, workload string, c util.PodRouteConfig, headers map[string]string, portMaps []string) (err error) { + var object *runtimeresource.Info + object, err = util.GetUnstructuredObject(factory, namespace, workload) if err != nil { return err } u := object.Object.(*unstructured.Unstructured) - - podTempSpec, path, err := util2.GetPodTemplateSpecPath(u) + var templateSpec *v1.PodTemplateSpec + var path []string + templateSpec, path, err = util.GetPodTemplateSpecPath(u) if err != nil { return err } - clientset, err := factory.KubernetesClientSet() - if err != nil { - return err - } - nodeID := fmt.Sprintf("%s.%s", object.Mapping.Resource.GroupResource().String(), object.Name) + origin := templateSpec.DeepCopy() + var ports []v1.ContainerPort - for _, container := range podTempSpec.Spec.Containers { + for _, container := range templateSpec.Spec.Containers { ports = append(ports, container.Ports...) } + for _, portMap := range portMaps { + var found = func(containerPort int32) bool { + for _, port := range ports { + if port.ContainerPort == containerPort { + return true + } + } + return false + } + port := util.ParsePort(portMap) + port.HostPort = 0 + if port.ContainerPort != 0 && !found(port.ContainerPort) { + ports = append(ports, port) + } + } var portmap = make(map[int32]int32) for _, port := range ports { portmap[port.ContainerPort] = port.ContainerPort } - err = addEnvoyConfig(clientset.CoreV1().ConfigMaps(namespace), nodeID, c, nil, ports, portmap) + for _, portMap := range portMaps { + port := util.ParsePort(portMap) + if port.ContainerPort != 0 { + portmap[port.ContainerPort] = port.HostPort + } + } + + nodeID := fmt.Sprintf("%s.%s", object.Mapping.Resource.GroupResource().String(), object.Name) + + err = addEnvoyConfig(clientset, nodeID, c, headers, ports, portmap) if err != nil { - logrus.Errorf("add envoy config error: %v", err) + log.Errorf("add envoy config error: %v", err) return err } - origin := *podTempSpec - AddContainer(&podTempSpec.Spec, c) - - helper := resource.NewHelper(object.Client, object.Mapping) - // pods without controller - if len(path) == 0 { - logrus.Infof("workload %s/%s is not controlled by any controller", namespace, workload) - for _, container := range podTempSpec.Spec.Containers { - container.LivenessProbe = nil - container.StartupProbe = nil - container.ReadinessProbe = nil - } - p := &v1.Pod{ObjectMeta: podTempSpec.ObjectMeta, Spec: podTempSpec.Spec} - CleanupUselessInfo(p) - if err = CreateAfterDeletePod(factory, p, helper); err != nil { - return err - } - } else - // controllers - { - logrus.Infof("workload %s/%s is controlled by a controller", namespace, workload) - // remove probe - removePatch, restorePatch := patch(origin, path) - b, _ := json.Marshal(restorePatch) - p := []P{ - { - Op: "replace", - Path: "/" + strings.Join(append(path, "spec"), "/"), - Value: podTempSpec.Spec, - }, - { - Op: "replace", - Path: "/metadata/annotations/" + config.KubeVPNRestorePatchKey, - Value: string(b), - }, - } - marshal, _ := json.Marshal(append(p, removePatch...)) - _, err = helper.Patch(object.Namespace, object.Name, types.JSONPatchType, marshal, &v12.PatchOptions{}) - if err != nil { - logrus.Errorf("error while inject proxy container, err: %v, exiting...", err) - return err - } + // already inject container vpn and envoy-proxy, do nothing + containerNames := sets.New[string]() + for _, container := range templateSpec.Spec.Containers { + containerNames.Insert(container.Name) } - err = util2.RolloutStatus(ctx1, factory, namespace, workload, time.Minute*60) + if containerNames.HasAll(config.ContainerSidecarVPN, config.ContainerSidecarEnvoyProxy) { + // add rollback func to remove envoy config + //rollbackFuncList = append(rollbackFuncList, func() { + // err := UnPatchContainer(factory, clientset, namespace, workload, c.LocalTunIPv4) + // if err != nil { + // log.Error(err) + // } + //}) + log.Infof("workload %s/%s has already been injected with sidecar", namespace, workload) + return nil + } + // (1) add mesh container + removePatch, restorePatch := patch(*origin, path) + var b []byte + b, err = k8sjson.Marshal(restorePatch) + if err != nil { + log.Errorf("marshal patch error: %v", err) + return err + } + + AddMeshContainer(templateSpec, nodeID, c) + helper := pkgresource.NewHelper(object.Client, object.Mapping) + ps := []P{ + { + Op: "replace", + Path: "/" + strings.Join(append(path, "spec"), "/"), + Value: templateSpec.Spec, + }, + { + Op: "replace", + Path: "/metadata/annotations/" + config.KubeVPNRestorePatchKey, + Value: string(b), + }, + } + var bytes []byte + bytes, err = k8sjson.Marshal(append(ps, removePatch...)) + if err != nil { + return err + } + _, err = helper.Patch(object.Namespace, object.Name, types.JSONPatchType, bytes, &metav1.PatchOptions{}) + if err != nil { + log.Errorf("error while path resource: %s %s, err: %v", object.Mapping.GroupVersionKind.GroupKind().String(), object.Name, err) + return err + } + log.Infof("patch workload %s/%s with sidecar", namespace, workload) + err = util.RolloutStatus(ctx1, factory, namespace, workload, time.Minute*60) return err } -func CreateAfterDeletePod(factory util.Factory, p *v1.Pod, helper *resource.Helper) error { - _, err := helper.DeleteWithOptions(p.Namespace, p.Name, &v12.DeleteOptions{ - GracePeriodSeconds: pointer.Int64(0), - }) - if err != nil { - logrus.Errorf("error while delete resource: %s %s, ignore, err: %v", p.Namespace, p.Name, err) - } - err = retry.OnError(wait.Backoff{ - Steps: 10, - Duration: 50 * time.Millisecond, - Factor: 5.0, - Jitter: 1, - }, func(err error) bool { - if !errors.IsAlreadyExists(err) { - return true - } - clientset, err := factory.KubernetesClientSet() - get, err := clientset.CoreV1().Pods(p.Namespace).Get(context.Background(), p.Name, v12.GetOptions{}) - if err != nil || get.Status.Phase != v1.PodRunning { - return true - } - return false - }, func() error { - if _, err := helper.Create(p.Namespace, true, p); err != nil { - return err - } - return errors2.New("") - }) - if err != nil { - if errors.IsAlreadyExists(err) { - return nil - } - logrus.Errorf("error while create resource: %s %s, err: %v", p.Namespace, p.Name, err) - return err - } - return nil -} - -func removeInboundContainer(factory util.Factory, namespace, workloads string) error { - object, err := util2.GetUnstructuredObject(factory, namespace, workloads) +func UnPatchContainer(factory cmdutil.Factory, mapInterface v12.ConfigMapInterface, namespace, workload string, localTunIPv4 string) error { + object, err := util.GetUnstructuredObject(factory, namespace, workload) if err != nil { + log.Errorf("get unstructured object error: %v", err) return err } u := object.Object.(*unstructured.Unstructured) - - podTempSpec, path, err := util2.GetPodTemplateSpecPath(u) + templateSpec, depth, err := util.GetPodTemplateSpecPath(u) if err != nil { + log.Errorf("get template spec path error: %v", err) return err } - helper := resource.NewHelper(object.Client, object.Mapping) + nodeID := fmt.Sprintf("%s.%s", object.Mapping.Resource.GroupResource().String(), object.Name) - // pods - if len(path) == 0 { - _, err = helper.DeleteWithOptions(object.Namespace, object.Name, &v12.DeleteOptions{ - GracePeriodSeconds: pointer.Int64(0), + var empty, found bool + empty, found, err = removeEnvoyConfig(mapInterface, nodeID, localTunIPv4) + if err != nil { + log.Errorf("remove envoy config error: %v", err) + return err + } + if !found { + log.Infof("not proxy resource %s", workload) + return nil + } + + log.Infof("leave workload %s", workload) + + RemoveContainers(templateSpec) + if u.GetAnnotations() != nil && u.GetAnnotations()[config.KubeVPNRestorePatchKey] != "" { + patchStr := u.GetAnnotations()[config.KubeVPNRestorePatchKey] + var ps []P + err = json.Unmarshal([]byte(patchStr), &ps) + if err != nil { + return fmt.Errorf("unmarshal json patch: %s failed, err: %v", patchStr, err) + } + fromPatchToProbe(templateSpec, depth, ps) + } + + if empty { + helper := pkgresource.NewHelper(object.Client, object.Mapping) + // pod without controller + if len(depth) == 0 { + log.Infof("workload %s/%s is not controlled by any controller", namespace, workload) + delete(templateSpec.ObjectMeta.GetAnnotations(), config.KubeVPNRestorePatchKey) + pod := &v1.Pod{ObjectMeta: templateSpec.ObjectMeta, Spec: templateSpec.Spec} + CleanupUselessInfo(pod) + err = CreateAfterDeletePod(factory, pod, helper) + return err + } + + log.Infof("workload %s/%s is controlled by a controller", namespace, workload) + // resource with controller, like deployment,statefulset + var bytes []byte + bytes, err = json.Marshal([]P{ + { + Op: "replace", + Path: "/" + strings.Join(append(depth, "spec"), "/"), + Value: templateSpec.Spec, + }, + { + Op: "replace", + Path: "/metadata/annotations/" + config.KubeVPNRestorePatchKey, + Value: "", + }, }) if err != nil { + log.Errorf("error while generating json patch: %v", err) + return err + } + _, err = helper.Patch(object.Namespace, object.Name, types.JSONPatchType, bytes, &metav1.PatchOptions{}) + if err != nil { + log.Errorf("error while patching resource: %s %s, err: %v", object.Mapping.GroupVersionKind.GroupKind().String(), object.Name, err) return err } } - // how to scale to one - RemoveContainer(&podTempSpec.Spec) - - bytes, err := json.Marshal([]struct { - Op string `json:"op"` - Path string `json:"path"` - Value interface{} `json:"value"` - }{{ - Op: "replace", - Path: "/" + strings.Join(append(path, "spec"), "/"), - Value: podTempSpec.Spec, - }}) - if err != nil { - return err - } - _, err = helper.Patch(object.Namespace, object.Name, types.JSONPatchType, bytes, &v12.PatchOptions{ - //Force: &t, - }) return err } -func CleanupUselessInfo(pod *v1.Pod) { - pod.SetSelfLink("") - pod.SetGeneration(0) - pod.SetResourceVersion("") - pod.SetUID("") - pod.SetDeletionTimestamp(nil) - pod.SetSelfLink("") - pod.SetManagedFields(nil) - pod.SetOwnerReferences(nil) -} - -type P struct { - Op string `json:"op,omitempty"` - Path string `json:"path,omitempty"` - Value interface{} `json:"value,omitempty"` -} - -func patch(spec v1.PodTemplateSpec, path []string) (remove []P, restore []P) { - for i := range spec.Spec.Containers { - index := strconv.Itoa(i) - readinessPath := "/" + strings.Join(append(path, "spec", "containers", index, "readinessProbe"), "/") - livenessPath := "/" + strings.Join(append(path, "spec", "containers", index, "livenessProbe"), "/") - startupPath := "/" + strings.Join(append(path, "spec", "containers", index, "startupProbe"), "/") - f := func(p *v1.Probe) string { - if p == nil { - return "" - } - marshal, err := json2.Marshal(p) - if err != nil { - logrus.Errorf("error while json marshal: %v", err) - return "" - } - return string(marshal) +func addEnvoyConfig(mapInterface v12.ConfigMapInterface, nodeID string, tunIP util.PodRouteConfig, headers map[string]string, port []v1.ContainerPort, portmap map[int32]int32) error { + configMap, err := mapInterface.Get(context.Background(), config.ConfigMapPodTrafficManager, metav1.GetOptions{}) + if err != nil { + return err + } + var v = make([]*controlplane.Virtual, 0) + if str, ok := configMap.Data[config.KeyEnvoy]; ok { + if err = yaml.Unmarshal([]byte(str), &v); err != nil { + return err } - remove = append(remove, P{ - Op: "replace", - Path: readinessPath, - Value: nil, - }, P{ - Op: "replace", - Path: livenessPath, - Value: nil, - }, P{ - Op: "replace", - Path: startupPath, - Value: nil, - }) - restore = append(restore, P{ - Op: "replace", - Path: readinessPath, - Value: f(spec.Spec.Containers[i].ReadinessProbe), - }, P{ - Op: "replace", - Path: livenessPath, - Value: f(spec.Spec.Containers[i].LivenessProbe), - }, P{ - Op: "replace", - Path: startupPath, - Value: f(spec.Spec.Containers[i].StartupProbe), - }) } - return + + v = addVirtualRule(v, nodeID, port, headers, tunIP, portmap) + marshal, err := yaml.Marshal(v) + if err != nil { + return err + } + configMap.Data[config.KeyEnvoy] = string(marshal) + _, err = mapInterface.Update(context.Background(), configMap, metav1.UpdateOptions{}) + return err } -func fromPatchToProbe(spec *v1.PodTemplateSpec, path []string, patch []P) { - // 3 = readiness + liveness + startup - if len(patch) != 3*len(spec.Spec.Containers) { - logrus.Debugf("patch not match container num, not restore") - return +func addVirtualRule(v []*controlplane.Virtual, nodeID string, port []v1.ContainerPort, headers map[string]string, tunIP util.PodRouteConfig, portmap map[int32]int32) []*controlplane.Virtual { + var index = -1 + for i, virtual := range v { + if nodeID == virtual.Uid { + index = i + break + } } - for i := range spec.Spec.Containers { - index := strconv.Itoa(i) - readinessPath := "/" + strings.Join(append(path, "spec", "containers", index, "readinessProbe"), "/") - livenessPath := "/" + strings.Join(append(path, "spec", "containers", index, "livenessProbe"), "/") - startupPath := "/" + strings.Join(append(path, "spec", "containers", index, "startupProbe"), "/") - var f = func(value any) *v1.Probe { - if value == nil { - return nil - } - str, ok := value.(string) - if ok && str == "" { - return nil - } - if !ok { - marshal, err := json2.Marshal(value) - if err != nil { - logrus.Errorf("error while json marshal: %v", err) - return nil + // 1) if not found uid, means nobody proxying it, just add it + if index < 0 { + return append(v, &controlplane.Virtual{ + Uid: nodeID, + Ports: port, + Rules: []*controlplane.Rule{{ + Headers: headers, + LocalTunIPv4: tunIP.LocalTunIPv4, + LocalTunIPv6: tunIP.LocalTunIPv6, + PortMap: portmap, + }}, + }) + } + + // 2) if already proxy deployment/xxx with header a=1. also want to add b=2 + for j, rule := range v[index].Rules { + if rule.LocalTunIPv4 == tunIP.LocalTunIPv4 && + rule.LocalTunIPv6 == tunIP.LocalTunIPv6 { + v[index].Rules[j].Headers = util.Merge[string, string](v[index].Rules[j].Headers, headers) + v[index].Rules[j].PortMap = util.Merge[int32, int32](v[index].Rules[j].PortMap, portmap) + return v + } + } + + // 3) if already proxy deployment/xxx with header a=1, other user can replace it to self + for j, rule := range v[index].Rules { + if reflect.DeepEqual(rule.Headers, headers) { + v[index].Rules[j].LocalTunIPv6 = tunIP.LocalTunIPv6 + v[index].Rules[j].LocalTunIPv4 = tunIP.LocalTunIPv4 + v[index].Rules[j].PortMap = portmap + return v + } + } + + // 4) if header is not same and tunIP is not same, means another users, just add it + v[index].Rules = append(v[index].Rules, &controlplane.Rule{ + Headers: headers, + LocalTunIPv4: tunIP.LocalTunIPv4, + LocalTunIPv6: tunIP.LocalTunIPv6, + PortMap: portmap, + }) + if v[index].Ports == nil { + v[index].Ports = port + } + return v +} + +func removeEnvoyConfig(mapInterface v12.ConfigMapInterface, nodeID string, localTunIPv4 string) (empty bool, found bool, err error) { + configMap, err := mapInterface.Get(context.Background(), config.ConfigMapPodTrafficManager, metav1.GetOptions{}) + if k8serrors.IsNotFound(err) { + return true, false, nil + } + if err != nil { + return false, false, err + } + str, ok := configMap.Data[config.KeyEnvoy] + if !ok { + return false, false, errors.New("can not found value for key: envoy-config.yaml") + } + var v []*controlplane.Virtual + if err = yaml.Unmarshal([]byte(str), &v); err != nil { + return false, false, err + } + for _, virtual := range v { + if nodeID == virtual.Uid { + for i := 0; i < len(virtual.Rules); i++ { + if virtual.Rules[i].LocalTunIPv4 == localTunIPv4 { + found = true + virtual.Rules = append(virtual.Rules[:i], virtual.Rules[i+1:]...) + i-- } - str = string(marshal) - } - var probe v1.Probe - err := json2.Unmarshal([]byte(str), &probe) - if err != nil { - logrus.Errorf("error while json unmarsh: %v", err) - return nil - } - return &probe - } - for _, p := range patch { - switch p.Path { - case readinessPath: - spec.Spec.Containers[i].ReadinessProbe = f(p.Value) - case livenessPath: - spec.Spec.Containers[i].LivenessProbe = f(p.Value) - case startupPath: - spec.Spec.Containers[i].StartupProbe = f(p.Value) } } } + if !found { + return false, false, nil + } + + // remove default + for i := 0; i < len(v); i++ { + if nodeID == v[i].Uid && len(v[i].Rules) == 0 { + v = append(v[:i], v[i+1:]...) + i-- + empty = true + } + } + var bytes []byte + bytes, err = yaml.Marshal(v) + if err != nil { + return false, found, err + } + configMap.Data[config.KeyEnvoy] = string(bytes) + _, err = mapInterface.Update(context.Background(), configMap, metav1.UpdateOptions{}) + return empty, found, err +} + +func contains(a map[string]string, sub map[string]string) bool { + for k, v := range sub { + if a[k] != v { + return false + } + } + return true } diff --git a/pkg/inject/proxy.go b/pkg/inject/proxy.go new file mode 100644 index 00000000..5bf63692 --- /dev/null +++ b/pkg/inject/proxy.go @@ -0,0 +1,301 @@ +package inject + +import ( + "context" + "encoding/json" + errors2 "errors" + "fmt" + "strconv" + "strings" + "time" + + "github.com/sirupsen/logrus" + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + v12 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/types" + json2 "k8s.io/apimachinery/pkg/util/json" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/cli-runtime/pkg/resource" + "k8s.io/client-go/util/retry" + "k8s.io/kubectl/pkg/cmd/util" + "k8s.io/utils/pointer" + + "github.com/wencaiwulue/kubevpn/v2/pkg/config" + util2 "github.com/wencaiwulue/kubevpn/v2/pkg/util" +) + +func InjectVPNSidecar(ctx1 context.Context, factory util.Factory, namespace, workload string, c util2.PodRouteConfig) error { + object, err := util2.GetUnstructuredObject(factory, namespace, workload) + if err != nil { + return err + } + + u := object.Object.(*unstructured.Unstructured) + + podTempSpec, path, err := util2.GetPodTemplateSpecPath(u) + if err != nil { + return err + } + + clientset, err := factory.KubernetesClientSet() + if err != nil { + return err + } + nodeID := fmt.Sprintf("%s.%s", object.Mapping.Resource.GroupResource().String(), object.Name) + var ports []v1.ContainerPort + for _, container := range podTempSpec.Spec.Containers { + ports = append(ports, container.Ports...) + } + var portmap = make(map[int32]int32) + for _, port := range ports { + portmap[port.ContainerPort] = port.ContainerPort + } + err = addEnvoyConfig(clientset.CoreV1().ConfigMaps(namespace), nodeID, c, nil, ports, portmap) + if err != nil { + logrus.Errorf("add envoy config error: %v", err) + return err + } + + origin := *podTempSpec + AddContainer(&podTempSpec.Spec, c) + + helper := resource.NewHelper(object.Client, object.Mapping) + // pods without controller + if len(path) == 0 { + logrus.Infof("workload %s/%s is not controlled by any controller", namespace, workload) + for _, container := range podTempSpec.Spec.Containers { + container.LivenessProbe = nil + container.StartupProbe = nil + container.ReadinessProbe = nil + } + p := &v1.Pod{ObjectMeta: podTempSpec.ObjectMeta, Spec: podTempSpec.Spec} + CleanupUselessInfo(p) + if err = CreateAfterDeletePod(factory, p, helper); err != nil { + return err + } + } else + // controllers + { + logrus.Infof("workload %s/%s is controlled by a controller", namespace, workload) + // remove probe + removePatch, restorePatch := patch(origin, path) + b, _ := json.Marshal(restorePatch) + p := []P{ + { + Op: "replace", + Path: "/" + strings.Join(append(path, "spec"), "/"), + Value: podTempSpec.Spec, + }, + { + Op: "replace", + Path: "/metadata/annotations/" + config.KubeVPNRestorePatchKey, + Value: string(b), + }, + } + marshal, _ := json.Marshal(append(p, removePatch...)) + _, err = helper.Patch(object.Namespace, object.Name, types.JSONPatchType, marshal, &v12.PatchOptions{}) + if err != nil { + logrus.Errorf("error while inject proxy container, err: %v, exiting...", err) + return err + } + } + err = util2.RolloutStatus(ctx1, factory, namespace, workload, time.Minute*60) + return err +} + +func CreateAfterDeletePod(factory util.Factory, p *v1.Pod, helper *resource.Helper) error { + _, err := helper.DeleteWithOptions(p.Namespace, p.Name, &v12.DeleteOptions{ + GracePeriodSeconds: pointer.Int64(0), + }) + if err != nil { + logrus.Errorf("error while delete resource: %s %s, ignore, err: %v", p.Namespace, p.Name, err) + } + err = retry.OnError(wait.Backoff{ + Steps: 10, + Duration: 50 * time.Millisecond, + Factor: 5.0, + Jitter: 1, + }, func(err error) bool { + if !errors.IsAlreadyExists(err) { + return true + } + clientset, err := factory.KubernetesClientSet() + get, err := clientset.CoreV1().Pods(p.Namespace).Get(context.Background(), p.Name, v12.GetOptions{}) + if err != nil || get.Status.Phase != v1.PodRunning { + return true + } + return false + }, func() error { + if _, err := helper.Create(p.Namespace, true, p); err != nil { + return err + } + return errors2.New("") + }) + if err != nil { + if errors.IsAlreadyExists(err) { + return nil + } + logrus.Errorf("error while create resource: %s %s, err: %v", p.Namespace, p.Name, err) + return err + } + return nil +} + +func removeInboundContainer(factory util.Factory, namespace, workloads string) error { + object, err := util2.GetUnstructuredObject(factory, namespace, workloads) + if err != nil { + return err + } + + u := object.Object.(*unstructured.Unstructured) + + podTempSpec, path, err := util2.GetPodTemplateSpecPath(u) + if err != nil { + return err + } + + helper := resource.NewHelper(object.Client, object.Mapping) + + // pods + if len(path) == 0 { + _, err = helper.DeleteWithOptions(object.Namespace, object.Name, &v12.DeleteOptions{ + GracePeriodSeconds: pointer.Int64(0), + }) + if err != nil { + return err + } + } + // how to scale to one + RemoveContainer(&podTempSpec.Spec) + + bytes, err := json.Marshal([]struct { + Op string `json:"op"` + Path string `json:"path"` + Value interface{} `json:"value"` + }{{ + Op: "replace", + Path: "/" + strings.Join(append(path, "spec"), "/"), + Value: podTempSpec.Spec, + }}) + if err != nil { + return err + } + _, err = helper.Patch(object.Namespace, object.Name, types.JSONPatchType, bytes, &v12.PatchOptions{ + //Force: &t, + }) + return err +} + +func CleanupUselessInfo(pod *v1.Pod) { + pod.SetSelfLink("") + pod.SetGeneration(0) + pod.SetResourceVersion("") + pod.SetUID("") + pod.SetDeletionTimestamp(nil) + pod.SetSelfLink("") + pod.SetManagedFields(nil) + pod.SetOwnerReferences(nil) +} + +type P struct { + Op string `json:"op,omitempty"` + Path string `json:"path,omitempty"` + Value interface{} `json:"value,omitempty"` +} + +func patch(spec v1.PodTemplateSpec, path []string) (remove []P, restore []P) { + for i := range spec.Spec.Containers { + index := strconv.Itoa(i) + readinessPath := "/" + strings.Join(append(path, "spec", "containers", index, "readinessProbe"), "/") + livenessPath := "/" + strings.Join(append(path, "spec", "containers", index, "livenessProbe"), "/") + startupPath := "/" + strings.Join(append(path, "spec", "containers", index, "startupProbe"), "/") + f := func(p *v1.Probe) string { + if p == nil { + return "" + } + marshal, err := json2.Marshal(p) + if err != nil { + logrus.Errorf("error while json marshal: %v", err) + return "" + } + return string(marshal) + } + remove = append(remove, P{ + Op: "replace", + Path: readinessPath, + Value: nil, + }, P{ + Op: "replace", + Path: livenessPath, + Value: nil, + }, P{ + Op: "replace", + Path: startupPath, + Value: nil, + }) + restore = append(restore, P{ + Op: "replace", + Path: readinessPath, + Value: f(spec.Spec.Containers[i].ReadinessProbe), + }, P{ + Op: "replace", + Path: livenessPath, + Value: f(spec.Spec.Containers[i].LivenessProbe), + }, P{ + Op: "replace", + Path: startupPath, + Value: f(spec.Spec.Containers[i].StartupProbe), + }) + } + return +} + +func fromPatchToProbe(spec *v1.PodTemplateSpec, path []string, patch []P) { + // 3 = readiness + liveness + startup + if len(patch) != 3*len(spec.Spec.Containers) { + logrus.Debugf("patch not match container num, not restore") + return + } + for i := range spec.Spec.Containers { + index := strconv.Itoa(i) + readinessPath := "/" + strings.Join(append(path, "spec", "containers", index, "readinessProbe"), "/") + livenessPath := "/" + strings.Join(append(path, "spec", "containers", index, "livenessProbe"), "/") + startupPath := "/" + strings.Join(append(path, "spec", "containers", index, "startupProbe"), "/") + var f = func(value any) *v1.Probe { + if value == nil { + return nil + } + str, ok := value.(string) + if ok && str == "" { + return nil + } + if !ok { + marshal, err := json2.Marshal(value) + if err != nil { + logrus.Errorf("error while json marshal: %v", err) + return nil + } + str = string(marshal) + } + var probe v1.Probe + err := json2.Unmarshal([]byte(str), &probe) + if err != nil { + logrus.Errorf("error while json unmarsh: %v", err) + return nil + } + return &probe + } + for _, p := range patch { + switch p.Path { + case readinessPath: + spec.Spec.Containers[i].ReadinessProbe = f(p.Value) + case livenessPath: + spec.Spec.Containers[i].LivenessProbe = f(p.Value) + case startupPath: + spec.Spec.Containers[i].StartupProbe = f(p.Value) + } + } + } +} diff --git a/pkg/util/gssapi.go b/pkg/ssh/gssapi.go similarity index 99% rename from pkg/util/gssapi.go rename to pkg/ssh/gssapi.go index 83e741c4..a109aad2 100644 --- a/pkg/util/gssapi.go +++ b/pkg/ssh/gssapi.go @@ -1,4 +1,4 @@ -package util +package ssh import ( "encoding/binary" diff --git a/pkg/util/gssapi_ccache.go b/pkg/ssh/gssapi_ccache.go similarity index 99% rename from pkg/util/gssapi_ccache.go rename to pkg/ssh/gssapi_ccache.go index 8194b4a8..82889f82 100644 --- a/pkg/util/gssapi_ccache.go +++ b/pkg/ssh/gssapi_ccache.go @@ -1,4 +1,4 @@ -package util +package ssh import ( "bytes" diff --git a/pkg/util/gssapi_kinit_test.go b/pkg/ssh/gssapi_kinit_test.go similarity index 99% rename from pkg/util/gssapi_kinit_test.go rename to pkg/ssh/gssapi_kinit_test.go index c2838083..e8eec7b2 100644 --- a/pkg/util/gssapi_kinit_test.go +++ b/pkg/ssh/gssapi_kinit_test.go @@ -1,4 +1,4 @@ -package util +package ssh import ( "os" diff --git a/pkg/util/gssapi_other.go b/pkg/ssh/gssapi_other.go similarity index 85% rename from pkg/util/gssapi_other.go rename to pkg/ssh/gssapi_other.go index 025a0161..62fd72cf 100644 --- a/pkg/util/gssapi_other.go +++ b/pkg/ssh/gssapi_other.go @@ -1,6 +1,6 @@ //go:build !windows -package util +package ssh func GetKrb5Path() string { return "/etc/krb5.conf" diff --git a/pkg/util/gssapi_windows.go b/pkg/ssh/gssapi_windows.go similarity index 88% rename from pkg/util/gssapi_windows.go rename to pkg/ssh/gssapi_windows.go index 851b133b..3dad4f23 100644 --- a/pkg/util/gssapi_windows.go +++ b/pkg/ssh/gssapi_windows.go @@ -1,6 +1,6 @@ //go:build windows -package util +package ssh func GetKrb5Path() string { return "C:\\ProgramData\\MIT\\Kerberos5\\krb5.ini" diff --git a/pkg/util/image.go b/pkg/ssh/image.go similarity index 76% rename from pkg/util/image.go rename to pkg/ssh/image.go index fafe6dd9..5ad85150 100644 --- a/pkg/util/image.go +++ b/pkg/ssh/image.go @@ -1,4 +1,4 @@ -package util +package ssh import ( "context" @@ -6,7 +6,6 @@ import ( "io" "os" "path/filepath" - "time" "github.com/distribution/reference" "github.com/docker/cli/cli/command" @@ -19,15 +18,11 @@ import ( registrytypes "github.com/docker/docker/api/types/registry" "github.com/docker/docker/client" "github.com/docker/docker/pkg/jsonmessage" - "github.com/hashicorp/go-version" "github.com/moby/term" "github.com/opencontainers/image-spec/specs-go/v1" "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus" "golang.org/x/crypto/ssh" - v12 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/kubectl/pkg/cmd/util" - client2 "sigs.k8s.io/controller-runtime/pkg/client" ) func GetClient() (*client.Client, *command.DockerCli, error) { @@ -206,64 +201,3 @@ func PullImage(ctx context.Context, platform *v1.Platform, cli *client.Client, d } return nil } - -// UpdateImage update to newer image -func UpdateImage(ctx context.Context, factory util.Factory, ns string, deployName string, image string) error { - clientSet, err2 := factory.KubernetesClientSet() - if err2 != nil { - return err2 - } - deployment, err := clientSet.AppsV1().Deployments(ns).Get(ctx, deployName, v12.GetOptions{}) - if err != nil { - return err - } - origin := deployment.DeepCopy() - newImg, err := reference.ParseNormalizedNamed(image) - if err != nil { - return err - } - newTag, ok := newImg.(reference.NamedTagged) - if !ok { - return nil - } - oldImg, err := reference.ParseNormalizedNamed(deployment.Spec.Template.Spec.Containers[0].Image) - if err != nil { - return err - } - var oldTag reference.NamedTagged - oldTag, ok = oldImg.(reference.NamedTagged) - if !ok { - return nil - } - if reference.Domain(newImg) != reference.Domain(oldImg) { - return nil - } - var oldVersion, newVersion *version.Version - oldVersion, err = version.NewVersion(oldTag.Tag()) - if err != nil { - return nil - } - newVersion, err = version.NewVersion(newTag.Tag()) - if err != nil { - return nil - } - if oldVersion.GreaterThanOrEqual(newVersion) { - return nil - } - - log.Infof("found newer image %s, set image from %s to it...", image, deployment.Spec.Template.Spec.Containers[0].Image) - for i := range deployment.Spec.Template.Spec.Containers { - deployment.Spec.Template.Spec.Containers[i].Image = image - } - p := client2.MergeFrom(deployment) - data, err := client2.MergeFrom(origin).Data(deployment) - if err != nil { - return err - } - _, err = clientSet.AppsV1().Deployments(ns).Patch(ctx, deployName, p.Type(), data, v12.PatchOptions{}) - if err != nil { - return err - } - err = RolloutStatus(ctx, factory, ns, fmt.Sprintf("deployments/%s", deployName), time.Minute*60) - return err -} diff --git a/pkg/util/scp.go b/pkg/ssh/scp.go similarity index 99% rename from pkg/util/scp.go rename to pkg/ssh/scp.go index 27087c2a..bb5750f0 100644 --- a/pkg/util/scp.go +++ b/pkg/ssh/scp.go @@ -1,4 +1,4 @@ -package util +package ssh import ( "fmt" diff --git a/pkg/util/ssh.go b/pkg/ssh/ssh.go similarity index 99% rename from pkg/util/ssh.go rename to pkg/ssh/ssh.go index 33687be4..e15d6aa7 100644 --- a/pkg/util/ssh.go +++ b/pkg/ssh/ssh.go @@ -1,4 +1,4 @@ -package util +package ssh import ( "bytes" @@ -33,6 +33,7 @@ import ( "github.com/wencaiwulue/kubevpn/v2/pkg/config" "github.com/wencaiwulue/kubevpn/v2/pkg/daemon/rpc" + pkgutil "github.com/wencaiwulue/kubevpn/v2/pkg/util" ) type SshConfig struct { @@ -713,7 +714,7 @@ func SshJump(ctx context.Context, conf *SshConfig, flags *pflag.FlagSet, print b return } var port int - port, err = GetAvailableTCPPortOrDie() + port, err = pkgutil.GetAvailableTCPPortOrDie() if err != nil { return } diff --git a/pkg/upgrade/upgrade.go b/pkg/upgrade/upgrade.go index 20840682..f394334a 100644 --- a/pkg/upgrade/upgrade.go +++ b/pkg/upgrade/upgrade.go @@ -12,6 +12,7 @@ import ( goversion "github.com/hashicorp/go-version" "github.com/wencaiwulue/kubevpn/v2/pkg/daemon" + "github.com/wencaiwulue/kubevpn/v2/pkg/daemon/elevate" "github.com/wencaiwulue/kubevpn/v2/pkg/util" ) @@ -24,7 +25,7 @@ import ( // 6) check permission of putting new kubevpn back // 7) chmod +x, move old to /temp, move new to CURRENT_FOLDER func Main(ctx context.Context, client *http.Client, url string) error { - err := elevate() + err := elevatePermission() if err != nil { return err } @@ -92,7 +93,7 @@ func downloadAndInstall(client *http.Client, url string) error { return err } -func elevate() error { +func elevatePermission() error { executable, err := os.Executable() if err != nil { return err @@ -104,12 +105,12 @@ func elevate() error { _ = os.Remove(tem.Name()) } if os.IsPermission(err) { - util.RunWithElevated() + elevate.RunWithElevated() os.Exit(0) } else if err != nil { return err - } else if !util.IsAdmin() { - util.RunWithElevated() + } else if !elevate.IsAdmin() { + elevate.RunWithElevated() os.Exit(0) } return nil diff --git a/pkg/util/pod.go b/pkg/util/pod.go index dd0a1209..ddca75ca 100644 --- a/pkg/util/pod.go +++ b/pkg/util/pod.go @@ -11,6 +11,8 @@ import ( "text/tabwriter" "time" + "github.com/distribution/reference" + "github.com/hashicorp/go-version" "github.com/moby/term" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -35,6 +37,7 @@ import ( "k8s.io/kubectl/pkg/polymorphichelpers" scheme2 "k8s.io/kubectl/pkg/scheme" "k8s.io/kubectl/pkg/util/podutils" + pkgclient "sigs.k8s.io/controller-runtime/pkg/client" ) type PodRouteConfig struct { @@ -403,3 +406,64 @@ func GetRunningPodList(ctx context.Context, clientset *kubernetes.Clientset, ns } return list.Items, nil } + +// UpdateImage update to newer image +func UpdateImage(ctx context.Context, factory util.Factory, ns string, deployName string, image string) error { + clientSet, err2 := factory.KubernetesClientSet() + if err2 != nil { + return err2 + } + deployment, err := clientSet.AppsV1().Deployments(ns).Get(ctx, deployName, v1.GetOptions{}) + if err != nil { + return err + } + origin := deployment.DeepCopy() + newImg, err := reference.ParseNormalizedNamed(image) + if err != nil { + return err + } + newTag, ok := newImg.(reference.NamedTagged) + if !ok { + return nil + } + oldImg, err := reference.ParseNormalizedNamed(deployment.Spec.Template.Spec.Containers[0].Image) + if err != nil { + return err + } + var oldTag reference.NamedTagged + oldTag, ok = oldImg.(reference.NamedTagged) + if !ok { + return nil + } + if reference.Domain(newImg) != reference.Domain(oldImg) { + return nil + } + var oldVersion, newVersion *version.Version + oldVersion, err = version.NewVersion(oldTag.Tag()) + if err != nil { + return nil + } + newVersion, err = version.NewVersion(newTag.Tag()) + if err != nil { + return nil + } + if oldVersion.GreaterThanOrEqual(newVersion) { + return nil + } + + logrus.Infof("found newer image %s, set image from %s to it...", image, deployment.Spec.Template.Spec.Containers[0].Image) + for i := range deployment.Spec.Template.Spec.Containers { + deployment.Spec.Template.Spec.Containers[i].Image = image + } + p := pkgclient.MergeFrom(deployment) + data, err := pkgclient.MergeFrom(origin).Data(deployment) + if err != nil { + return err + } + _, err = clientSet.AppsV1().Deployments(ns).Patch(ctx, deployName, p.Type(), data, v1.PatchOptions{}) + if err != nil { + return err + } + err = RolloutStatus(ctx, factory, ns, fmt.Sprintf("deployments/%s", deployName), time.Minute*60) + return err +} diff --git a/pkg/util/upgarde.go b/pkg/util/upgrade.go similarity index 100% rename from pkg/util/upgarde.go rename to pkg/util/upgrade.go