diff --git a/cmd/kubevpn/cmds/connect.go b/cmd/kubevpn/cmds/connect.go index dca0df5c..be48a2e3 100644 --- a/cmd/kubevpn/cmds/connect.go +++ b/cmd/kubevpn/cmds/connect.go @@ -46,13 +46,13 @@ func CmdConnect(f cmdutil.Factory) *cobra.Command { return daemon.StartupDaemon(cmd.Context()) }, RunE: func(cmd *cobra.Command, args []string) error { - bytes, err := util.ConvertToKubeconfigBytes(f) + bytes, ns, err := util.ConvertToKubeconfigBytes(f) if err != nil { return err } req := &rpc.ConnectRequest{ KubeconfigBytes: string(bytes), - Namespace: connect.Namespace, + Namespace: ns, ExtraCIDR: connect.ExtraCIDR, ExtraDomain: connect.ExtraDomain, UseLocalDNS: connect.UseLocalDNS, diff --git a/cmd/kubevpn/cmds/cp.go b/cmd/kubevpn/cmds/cp.go index adb4bb3e..eaa8dd50 100644 --- a/cmd/kubevpn/cmds/cp.go +++ b/cmd/kubevpn/cmds/cp.go @@ -68,7 +68,7 @@ func CmdCp(f cmdutil.Factory) *cobra.Command { Long: i18n.T("Copy files and directories to and from containers. Different between kubectl cp is it will de-reference symbol link."), Example: cpExample, ValidArgsFunction: func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) { - cmdutil.CheckErr(handler.SshJump(sshConf, cmd.Flags())) + cmdutil.CheckErr(handler.SshJump(cmd.Context(), sshConf, cmd.Flags())) var comps []string if len(args) == 0 { diff --git a/cmd/kubevpn/cmds/daemon.go b/cmd/kubevpn/cmds/daemon.go index f2a99158..fbd4e87b 100644 --- a/cmd/kubevpn/cmds/daemon.go +++ b/cmd/kubevpn/cmds/daemon.go @@ -1,9 +1,11 @@ package cmds import ( + "errors" "github.com/spf13/cobra" cmdutil "k8s.io/kubectl/pkg/cmd/util" "k8s.io/kubectl/pkg/util/i18n" + "os" "github.com/wencaiwulue/kubevpn/pkg/daemon" ) @@ -14,6 +16,14 @@ func CmdDaemon(_ cmdutil.Factory) *cobra.Command { Use: "daemon", Short: i18n.T("Startup GRPC server"), Long: i18n.T(`Startup GRPC server`), + PreRunE: func(cmd *cobra.Command, args []string) error { + portPath := daemon.GetSockPath(opt.IsSudo) + err := os.Remove(portPath) + if err != nil && !errors.Is(err, os.ErrNotExist) { + return err + } + return nil + }, RunE: func(cmd *cobra.Command, args []string) error { defer opt.Stop() return opt.Start(cmd.Context()) diff --git a/cmd/kubevpn/cmds/dev.go b/cmd/kubevpn/cmds/dev.go index dc6c7e2b..a77c141d 100644 --- a/cmd/kubevpn/cmds/dev.go +++ b/cmd/kubevpn/cmds/dev.go @@ -80,7 +80,7 @@ Startup your kubernetes workloads in local Docker container with same volume态e if devOptions.Engine == config.EngineGvisor { return fmt.Errorf(`not support type engine: %s, support ("%s"|"%s")`, config.EngineGvisor, config.EngineMix, config.EngineRaw) } - return handler.SshJump(sshConf, cmd.Flags()) + return handler.SshJump(cmd.Context(), sshConf, cmd.Flags()) }, RunE: func(cmd *cobra.Command, args []string) error { devOptions.Workload = args[0] diff --git a/cmd/kubevpn/cmds/duplicate.go b/cmd/kubevpn/cmds/duplicate.go index e5549783..32223699 100644 --- a/cmd/kubevpn/cmds/duplicate.go +++ b/cmd/kubevpn/cmds/duplicate.go @@ -60,7 +60,7 @@ func CmdDuplicate(f cmdutil.Factory) *cobra.Command { if duplicateOptions.Engine == config.EngineGvisor { return fmt.Errorf(`not support type engine: %s, support ("%s"|"%s")`, config.EngineGvisor, config.EngineMix, config.EngineRaw) } - return handler.SshJump(sshConf, cmd.Flags()) + return handler.SshJump(cmd.Context(), sshConf, cmd.Flags()) }, RunE: func(cmd *cobra.Command, args []string) error { if len(args) == 0 { diff --git a/cmd/kubevpn/cmds/get.go b/cmd/kubevpn/cmds/get.go index ddd73ce4..f9896ea5 100644 --- a/cmd/kubevpn/cmds/get.go +++ b/cmd/kubevpn/cmds/get.go @@ -42,7 +42,7 @@ func CmdGet(f cmdutil.Factory) *cobra.Command { return daemon.StartupDaemon(cmd.Context()) }, RunE: func(cmd *cobra.Command, args []string) error { - bytes, err := util.ConvertToKubeconfigBytes(f) + bytes, ns, err := util.ConvertToKubeconfigBytes(f) if err != nil { return err } @@ -50,7 +50,7 @@ func CmdGet(f cmdutil.Factory) *cobra.Command { cmd.Context(), &rpc.ConnectRequest{ KubeconfigBytes: string(bytes), - Namespace: connect.Namespace, + Namespace: ns, Headers: connect.Headers, Workloads: connect.Workloads, ExtraCIDR: connect.ExtraCIDR, diff --git a/cmd/kubevpn/cmds/proxy.go b/cmd/kubevpn/cmds/proxy.go index 06eb7673..a7d12959 100644 --- a/cmd/kubevpn/cmds/proxy.go +++ b/cmd/kubevpn/cmds/proxy.go @@ -74,7 +74,7 @@ func CmdProxy(f cmdutil.Factory) *cobra.Command { return cmdutil.UsageErrorf(cmd, usageString) } - bytes, err := util.ConvertToKubeconfigBytes(f) + bytes, ns, err := util.ConvertToKubeconfigBytes(f) if err != nil { return err } @@ -83,7 +83,7 @@ func CmdProxy(f cmdutil.Factory) *cobra.Command { cmd.Context(), &rpc.ConnectRequest{ KubeconfigBytes: string(bytes), - Namespace: connect.Namespace, + Namespace: ns, Headers: connect.Headers, Workloads: args, ExtraCIDR: connect.ExtraCIDR, diff --git a/cmd/kubevpn/cmds/reset.go b/cmd/kubevpn/cmds/reset.go index efb42647..0a5841ee 100644 --- a/cmd/kubevpn/cmds/reset.go +++ b/cmd/kubevpn/cmds/reset.go @@ -39,7 +39,7 @@ func CmdReset(factory cmdutil.Factory) *cobra.Command { `)), PreRunE: func(cmd *cobra.Command, args []string) error { - return handler.SshJump(sshConf, cmd.Flags()) + return handler.SshJump(cmd.Context(), sshConf, cmd.Flags()) }, Run: func(cmd *cobra.Command, args []string) { if err := connect.InitClient(factory); err != nil { diff --git a/pkg/daemon/action/connect.go b/pkg/daemon/action/connect.go index e46f26b1..20899bae 100644 --- a/pkg/daemon/action/connect.go +++ b/pkg/daemon/action/connect.go @@ -11,6 +11,7 @@ import ( "time" log "github.com/sirupsen/logrus" + "github.com/spf13/pflag" "k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/client-go/rest" restclient "k8s.io/client-go/rest" @@ -122,7 +123,16 @@ func (svr *Server) Connect(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectServe return err } } - err = handler.SshJump(sshConf, nil) + tempFile, err := util.ConvertToTempFile([]byte(req.KubeconfigBytes)) + if err != nil { + return err + } + flags := pflag.NewFlagSet("", pflag.ContinueOnError) + flags.AddFlag(&pflag.Flag{ + Name: "kubeconfig", + DefValue: tempFile, + }) + err = handler.SshJump(context.Background(), sshConf, flags) if err != nil { return err } @@ -139,6 +149,7 @@ func (svr *Server) Connect(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectServe return err } + config.Image = req.Image err = svr.connect.DoConnect(context.Background()) if err != nil { log.Errorln(err) @@ -171,7 +182,16 @@ func (svr *Server) redirectToSudoDaemon(req *rpc.ConnectRequest, resp rpc.Daemon ConfigAlias: req.ConfigAlias, RemoteKubeconfig: req.RemoteKubeconfig, } - err := handler.SshJump(sshConf, nil) + tempFile, err := util.ConvertToTempFile([]byte(req.KubeconfigBytes)) + if err != nil { + return err + } + flags := pflag.NewFlagSet("", pflag.ContinueOnError) + flags.AddFlag(&pflag.Flag{ + Name: "kubeconfig", + DefValue: tempFile, + }) + err = handler.SshJump(context.Background(), sshConf, flags) if err != nil { return err } diff --git a/pkg/daemon/action/proxy.go b/pkg/daemon/action/proxy.go index 86b36da9..e642ff3d 100644 --- a/pkg/daemon/action/proxy.go +++ b/pkg/daemon/action/proxy.go @@ -63,7 +63,7 @@ func (svr *Server) Proxy(req *rpc.ConnectRequest, resp rpc.Daemon_ProxyServer) e ConfigAlias: req.ConfigAlias, RemoteKubeconfig: req.RemoteKubeconfig, } - err := handler.SshJump(sshConf, nil) + err := handler.SshJump(ctx, sshConf, nil) if err != nil { return err } @@ -175,7 +175,7 @@ func (svr *Server) redirectToSudoDaemon1(req *rpc.ConnectRequest, resp rpc.Daemo ConfigAlias: req.ConfigAlias, RemoteKubeconfig: req.RemoteKubeconfig, } - err = handler.SshJump(sshConf, nil) + err = handler.SshJump(context.Background(), sshConf, nil) if err != nil { return err } diff --git a/pkg/dev/main.go b/pkg/dev/main.go index 8be550e8..dd6d89eb 100644 --- a/pkg/dev/main.go +++ b/pkg/dev/main.go @@ -476,13 +476,14 @@ func (d *Options) doConnect(ctx context.Context, f cmdutil.Factory, transferImag } } var kubeconfig []byte - kubeconfig, err = util.ConvertToKubeconfigBytes(f) + var ns string + kubeconfig, ns, err = util.ConvertToKubeconfigBytes(f) if err != nil { return } req := &rpc.ConnectRequest{ KubeconfigBytes: string(kubeconfig), - Namespace: connect.Namespace, + Namespace: ns, Headers: connect.Headers, Workloads: connect.Workloads, ExtraCIDR: connect.ExtraCIDR, diff --git a/pkg/handler/connect.go b/pkg/handler/connect.go index def85ead..08a6aa51 100644 --- a/pkg/handler/connect.go +++ b/pkg/handler/connect.go @@ -104,15 +104,17 @@ func (c *ConnectOptions) RentInnerIP(ctx context.Context) (context.Context, erro if ok { ipv4s := md.Get(config.HeaderIPv4) if len(ipv4s) != 0 { - _, c.localTunIPv4, _ = net.ParseCIDR(ipv4s[0]) - if c.localTunIPv4 != nil { + ip, ipNet, err := net.ParseCIDR(ipv4s[0]) + if err == nil { + c.localTunIPv4 = &net.IPNet{IP: ip, Mask: ipNet.Mask} log.Debugf("get ipv4 %s from context", c.localTunIPv4.String()) } } ipv6s := md.Get(config.HeaderIPv6) if len(ipv6s) != 0 { - _, c.localTunIPv6, _ = net.ParseCIDR(ipv6s[0]) - if c.localTunIPv6 != nil { + ip, ipNet, err := net.ParseCIDR(ipv6s[0]) + if err == nil { + c.localTunIPv6 = &net.IPNet{IP: ip, Mask: ipNet.Mask} log.Debugf("get ipv6 %s from context", c.localTunIPv6.String()) } } @@ -677,7 +679,7 @@ func (c *ConnectOptions) InitClient(f cmdutil.Factory) (err error) { return } -func SshJump(conf *util.SshConfig, flags *pflag.FlagSet) (err error) { +func SshJump(ctx context.Context, conf *util.SshConfig, flags *pflag.FlagSet) (err error) { if conf.Addr == "" && conf.ConfigAlias == "" { return } @@ -729,8 +731,12 @@ func SshJump(conf *util.SshConfig, flags *pflag.FlagSet) (err error) { } else { if flags != nil { lookup := flags.Lookup("kubeconfig") - if lookup != nil && lookup.Value != nil && lookup.Value.String() != "" { - configFlags.KubeConfig = pointer.String(lookup.Value.String()) + if lookup != nil { + if lookup.Value != nil && lookup.Value.String() != "" { + configFlags.KubeConfig = pointer.String(lookup.Value.String()) + } else if lookup.DefValue != "" { + configFlags.KubeConfig = pointer.String(lookup.DefValue) + } } } } @@ -766,8 +772,9 @@ func SshJump(conf *util.SshConfig, flags *pflag.FlagSet) (err error) { errChan := make(chan error, 1) readyChan := make(chan struct{}, 1) go func() { - err := util.Main(&remote, local, conf, readyChan) + err := util.Main(ctx, &remote, local, conf, readyChan) if err != nil { + log.Errorf("ssh forward failed err: %v", err) errChan <- err return } @@ -776,6 +783,7 @@ func SshJump(conf *util.SshConfig, flags *pflag.FlagSet) (err error) { select { case <-readyChan: case err = <-errChan: + log.Errorf("ssh proxy err: %v", err) return err } diff --git a/pkg/util/log.go b/pkg/util/log.go index 3e53638e..ac6d52b9 100644 --- a/pkg/util/log.go +++ b/pkg/util/log.go @@ -2,6 +2,7 @@ package util import ( "fmt" + log "github.com/sirupsen/logrus" ) diff --git a/pkg/util/ns.go b/pkg/util/ns.go index 13a70b6a..6f625de1 100644 --- a/pkg/util/ns.go +++ b/pkg/util/ns.go @@ -3,6 +3,7 @@ package util import ( "context" "encoding/json" + "os" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -39,11 +40,36 @@ func IsSameCluster(client v12.ConfigMapInterface, namespace string, clientB v12. return a.UID == b.UID, nil } -func ConvertToKubeconfigBytes(factory cmdutil.Factory) ([]byte, error) { - rawConfig, err := factory.ToRawKubeConfigLoader().RawConfig() +func ConvertToKubeconfigBytes(factory cmdutil.Factory) ([]byte, string, error) { + loader := factory.ToRawKubeConfigLoader() + namespace, _, err2 := loader.Namespace() + if err2 != nil { + return nil, "", err2 + } + rawConfig, err := loader.RawConfig() convertedObj, err := latest.Scheme.ConvertToVersion(&rawConfig, latest.ExternalVersion) if err != nil { - return nil, err + return nil, "", err } - return json.Marshal(convertedObj) + marshal, err2 := json.Marshal(convertedObj) + if err2 != nil { + return nil, "", err2 + } + return marshal, namespace, nil +} + +func ConvertToTempFile(kubeconfigBytes []byte) (string, error) { + temp, err := os.CreateTemp("", "") + if err != nil { + return "", err + } + err = temp.Close() + if err != nil { + return "", err + } + err = os.WriteFile(temp.Name(), kubeconfigBytes, os.ModePerm) + if err != nil { + return "", err + } + return temp.Name(), nil } diff --git a/pkg/util/ssh.go b/pkg/util/ssh.go index 6125a041..ea2b44db 100644 --- a/pkg/util/ssh.go +++ b/pkg/util/ssh.go @@ -2,6 +2,7 @@ package util import ( "bytes" + "context" "errors" "fmt" "io" @@ -28,7 +29,7 @@ type SshConfig struct { RemoteKubeconfig string } -func Main(remoteEndpoint, localEndpoint *netip.AddrPort, conf *SshConfig, done chan struct{}) error { +func Main(ctx context.Context, remoteEndpoint, localEndpoint *netip.AddrPort, conf *SshConfig, done chan struct{}) error { var remote *ssh.Client var err error if conf.ConfigAlias != "" { @@ -57,7 +58,8 @@ func Main(remoteEndpoint, localEndpoint *netip.AddrPort, conf *SshConfig, done c } // Listen on remote server port - listen, err := net.Listen("tcp", "localhost:0") + var lc net.ListenConfig + listen, err := lc.Listen(ctx, "tcp", "localhost:0") if err != nil { return err } @@ -70,6 +72,12 @@ func Main(remoteEndpoint, localEndpoint *netip.AddrPort, conf *SshConfig, done c done <- struct{}{} // handle incoming connections on reverse forwarded tunnel for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + local, err := listen.Accept() if err != nil { log.Error(err) @@ -89,6 +97,7 @@ func Main(remoteEndpoint, localEndpoint *netip.AddrPort, conf *SshConfig, done c if conn == nil { return } + defer conn.Close() handleClient(local, conn) }() }