diff --git a/cmd/kubevpn/cmds/dev.go b/cmd/kubevpn/cmds/dev.go index a2ab77da..aff8b045 100644 --- a/cmd/kubevpn/cmds/dev.go +++ b/cmd/kubevpn/cmds/dev.go @@ -110,7 +110,7 @@ Startup your kubernetes workloads in local Docker container with same volume态e } err = dev.DoDev(cmd.Context(), devOptions, sshConf, cmd.Flags(), f, transferImage) - for _, fun := range handler.RollbackFuncList { + for _, fun := range devOptions.RollbackFuncList { if fun != nil { fun() } diff --git a/cmd/kubevpn/cmds/status.go b/cmd/kubevpn/cmds/status.go index 0c4d3a66..0646ee7e 100644 --- a/cmd/kubevpn/cmds/status.go +++ b/cmd/kubevpn/cmds/status.go @@ -26,7 +26,7 @@ func CmdStatus(f cmdutil.Factory) *cobra.Command { return daemon.StartupDaemon(cmd.Context()) }, RunE: func(cmd *cobra.Command, args []string) error { - client, err := daemon.GetClient(false).Status( + client, err := daemon.GetClient(true).Status( cmd.Context(), &rpc.StatusRequest{}, ) diff --git a/pkg/daemon/action/connect-fork.go b/pkg/daemon/action/connect-fork.go index 8c6564bc..8265344a 100644 --- a/pkg/daemon/action/connect-fork.go +++ b/pkg/daemon/action/connect-fork.go @@ -4,15 +4,14 @@ import ( "context" "fmt" "io" - "os" - "os/exec" - "strings" + defaultlog "log" log "github.com/sirupsen/logrus" - "k8s.io/apimachinery/pkg/util/sets" + "github.com/spf13/pflag" "github.com/wencaiwulue/kubevpn/pkg/config" "github.com/wencaiwulue/kubevpn/pkg/daemon/rpc" + "github.com/wencaiwulue/kubevpn/pkg/handler" "github.com/wencaiwulue/kubevpn/pkg/util" ) @@ -21,85 +20,165 @@ func (svr *Server) ConnectFork(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectF log.SetOutput(svr.LogFile) log.SetLevel(log.DebugLevel) }() - out := io.MultiWriter(newWarp(resp), svr.LogFile) + out := io.MultiWriter(newConnectForkWarp(resp), svr.LogFile) log.SetOutput(out) log.SetLevel(log.InfoLevel) if !svr.IsSudo { - return fmt.Errorf("connect-fork should not send to sudo daemon server") + return svr.redirectConnectForkToSudoDaemon(req, resp) } ctx := resp.Context() - return fork(ctx, req, out) -} + connect := &handler.ConnectOptions{ + Namespace: req.Namespace, + Headers: req.Headers, + Workloads: req.Workloads, + ExtraCIDR: req.ExtraCIDR, + ExtraDomain: req.ExtraDomain, + UseLocalDNS: req.UseLocalDNS, + Engine: config.Engine(req.Engine), + } + var sshConf = util.ParseSshFromRPC(req.SshJump) + var transferImage = req.TransferImage -func fork(ctx context.Context, req *rpc.ConnectRequest, out io.Writer) error { - exe, err := os.Executable() - if err != nil { - return fmt.Errorf("get executable error: %s", err.Error()) - } - var args = []string{"connect-fork"} - if req.SshJump != nil { - if req.SshJump.Addr != "" { - args = append(args, "--ssh-addr", req.SshJump.Addr) - } - if req.SshJump.User != "" { - args = append(args, "--ssh-username", req.SshJump.User) - } - if req.SshJump.Password != "" { - args = append(args, "--ssh-password", req.SshJump.Password) - } - if req.SshJump.Keyfile != "" { - args = append(args, "--ssh-keyfile", req.SshJump.Keyfile) - } - if req.SshJump.ConfigAlias != "" { // alias in ~/.ssh/config - args = append(args, "--ssh-alias", req.SshJump.ConfigAlias) - } - if req.SshJump.RemoteKubeconfig != "" { // remote path in ssh server - args = append(args, "--remote-kubeconfig", req.SshJump.RemoteKubeconfig) - } - } - if req.KubeconfigBytes != "" { - var path string - path, err = util.ConvertToTempKubeconfigFile([]byte(req.KubeconfigBytes)) + go util.StartupPProf(config.PProfPort) + defaultlog.Default().SetOutput(io.Discard) + if transferImage { + err := util.TransferImage(ctx, sshConf, config.OriginImage, req.Image, out) if err != nil { return err } - args = append(args, "--kubeconfig", path) } - if req.Namespace != "" { - args = append(args, "-n", req.Namespace) + file, err := util.ConvertToTempKubeconfigFile([]byte(req.KubeconfigBytes)) + if err != nil { + return err } - if req.Image != "" { - args = append(args, "--image", req.Image) + flags := pflag.NewFlagSet("", pflag.ContinueOnError) + flags.AddFlag(&pflag.Flag{ + Name: "kubeconfig", + DefValue: file, + }) + + sshCtx, sshCancel := context.WithCancel(context.Background()) + connect.RollbackFuncList = append(connect.RollbackFuncList, sshCancel) + var path string + path, err = handler.SshJump(sshCtx, sshConf, flags, false) + if err != nil { + return err } - if req.TransferImage { - args = append(args, "--transfer-image") + err = connect.InitClient(InitFactoryByPath(path, req.Namespace)) + if err != nil { + return err } - for _, v := range req.ExtraCIDR { - args = append(args, "--extra-cidr", v) + err = connect.PreCheckResource() + if err != nil { + return err } - for _, v := range req.ExtraDomain { - args = append(args, "--extra-domain", v) + _, err = connect.RentInnerIP(ctx) + if err != nil { + return err } - env := os.Environ() - envKeys := sets.New[string](config.EnvInboundPodTunIPv4, config.EnvInboundPodTunIPv6) - for i := 0; i < len(env); i++ { - index := strings.Index(env[i], "=") - envKey := env[i][:index] - if envKeys.HasAny(envKey) { - env = append(env[:i], env[i+1:]...) - i-- - continue - } - } - cmd := exec.CommandContext(ctx, exe, args...) - cmd.Env = env - cmd.Stdout = out - cmd.Stderr = out - err = cmd.Run() + config.Image = req.Image + err = connect.DoConnect(sshCtx) if err != nil { - return fmt.Errorf("fork to exec connect error: %s", err.Error()) + log.Errorf("do connect error: %v", err) + connect.Cleanup() + return err } + svr.secondaryConnect = append(svr.secondaryConnect, connect) return nil } + +func (svr *Server) redirectConnectForkToSudoDaemon(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectServer) error { + cli := svr.GetClient(true) + if cli == nil { + return fmt.Errorf("sudo daemon not start") + } + connect := &handler.ConnectOptions{ + Namespace: req.Namespace, + Headers: req.Headers, + Workloads: req.Workloads, + ExtraCIDR: req.ExtraCIDR, + ExtraDomain: req.ExtraDomain, + UseLocalDNS: req.UseLocalDNS, + Engine: config.Engine(req.Engine), + } + var sshConf = util.ParseSshFromRPC(req.SshJump) + file, err := util.ConvertToTempKubeconfigFile([]byte(req.KubeconfigBytes)) + if err != nil { + return err + } + flags := pflag.NewFlagSet("", pflag.ContinueOnError) + flags.AddFlag(&pflag.Flag{ + Name: "kubeconfig", + DefValue: file, + }) + sshCtx, sshCancel := context.WithCancel(context.Background()) + connect.RollbackFuncList = append(connect.RollbackFuncList, sshCancel) + var path string + path, err = handler.SshJump(sshCtx, sshConf, flags, true) + if err != nil { + return err + } + err = connect.InitClient(InitFactoryByPath(path, req.Namespace)) + if err != nil { + return err + } + err = connect.PreCheckResource() + if err != nil { + return err + } + + for _, options := range svr.secondaryConnect { + var isSameCluster bool + isSameCluster, err = util.IsSameCluster( + options.GetClientset().CoreV1().ConfigMaps(options.Namespace), options.Namespace, + connect.GetClientset().CoreV1().ConfigMaps(connect.Namespace), connect.Namespace, + ) + if err == nil && isSameCluster && options.Equal(connect) { + // same cluster, do nothing + log.Infof("already connect to cluster") + return nil + } + } + + ctx, err := connect.RentInnerIP(resp.Context()) + if err != nil { + return err + } + + connResp, err := cli.Connect(ctx, req) + if err != nil { + return err + } + for { + recv, err := connResp.Recv() + if err == io.EOF { + break + } else if err != nil { + return err + } + err = resp.Send(recv) + if err != nil { + return err + } + } + + svr.secondaryConnect = append(svr.secondaryConnect, connect) + return nil +} + +type connectForkWarp struct { + server rpc.Daemon_ConnectServer +} + +func (r *connectForkWarp) Write(p []byte) (n int, err error) { + err = r.server.Send(&rpc.ConnectResponse{ + Message: string(p), + }) + return len(p), err +} + +func newConnectForkWarp(server rpc.Daemon_ConnectForkServer) io.Writer { + return &connectForkWarp{server: server} +} diff --git a/pkg/daemon/action/connect.go b/pkg/daemon/action/connect.go index ca65aace..7a96b528 100644 --- a/pkg/daemon/action/connect.go +++ b/pkg/daemon/action/connect.go @@ -75,7 +75,7 @@ func (svr *Server) Connect(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectServe }) sshCtx, sshCancel := context.WithCancel(context.Background()) - handler.RollbackFuncList = append(handler.RollbackFuncList, sshCancel) + svr.connect.RollbackFuncList = append(svr.connect.RollbackFuncList, sshCancel) var path string path, err = handler.SshJump(sshCtx, sshConf, flags, false) if err != nil { @@ -129,7 +129,7 @@ func (svr *Server) redirectToSudoDaemon(req *rpc.ConnectRequest, resp rpc.Daemon DefValue: file, }) sshCtx, sshCancel := context.WithCancel(context.Background()) - handler.RollbackFuncList = append(handler.RollbackFuncList, sshCancel) + connect.RollbackFuncList = append(connect.RollbackFuncList, sshCancel) var path string path, err = handler.SshJump(sshCtx, sshConf, flags, true) if err != nil { diff --git a/pkg/daemon/action/disconnect.go b/pkg/daemon/action/disconnect.go index ec35af0b..fe331747 100644 --- a/pkg/daemon/action/disconnect.go +++ b/pkg/daemon/action/disconnect.go @@ -57,6 +57,7 @@ func (svr *Server) Disconnect(req *rpc.DisconnectRequest, resp rpc.Daemon_Discon for _, options := range svr.secondaryConnect { options.Cleanup() } + svr.secondaryConnect = nil } else if req.ID != nil && req.GetID() == 0 { if svr.connect != nil { svr.connect.Cleanup() @@ -71,6 +72,7 @@ func (svr *Server) Disconnect(req *rpc.DisconnectRequest, resp rpc.Daemon_Discon index := req.GetID() - 1 if index < int32(len(svr.secondaryConnect)) { svr.secondaryConnect[index].Cleanup() + svr.secondaryConnect = append(svr.secondaryConnect[:index], svr.secondaryConnect[index+1:]...) } else { log.Errorf("index %d out of range", req.GetID()) } diff --git a/pkg/daemon/action/quit.go b/pkg/daemon/action/quit.go index 93ad5eb0..e54f4300 100644 --- a/pkg/daemon/action/quit.go +++ b/pkg/daemon/action/quit.go @@ -29,6 +29,10 @@ func (svr *Server) Quit(req *rpc.QuitRequest, resp rpc.Daemon_QuitServer) error log.Errorf("quit: cleanup clone failed: %v", err) } } + for _, options := range svr.secondaryConnect { + log.Info("quit: cleanup connection") + options.Cleanup() + } return nil } diff --git a/pkg/daemon/action/status.go b/pkg/daemon/action/status.go index 615f8689..216ad1f5 100644 --- a/pkg/daemon/action/status.go +++ b/pkg/daemon/action/status.go @@ -12,17 +12,19 @@ import ( func (svr *Server) Status(ctx context.Context, request *rpc.StatusRequest) (*rpc.StatusResponse, error) { var sb = new(bytes.Buffer) w := tabwriter.NewWriter(sb, 1, 1, 1, ' ', 0) - _, _ = fmt.Fprintf(w, "%s\t%s\t%s\t%s\n", "ID", "Priority", "Context", "Status") + _, _ = fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\n", "ID", "Priority", "Context", "Namespace", "Status") status := "None" kubeContext := "" + namespace := "" if svr.connect != nil { status = "Connected" kubeContext = svr.connect.GetKubeconfigContext() + namespace = svr.connect.Namespace } - _, _ = fmt.Fprintf(w, "%d\t%s\t%s\t%s\n", 0, "Main", kubeContext, status) + _, _ = fmt.Fprintf(w, "%d\t%s\t%s\t%s\t%s\n", 0, "Main", kubeContext, namespace, status) for i, options := range svr.secondaryConnect { - _, _ = fmt.Fprintf(w, "%d\t%s\t%s\t%s\n", i+1, "Minor", options.GetKubeconfigContext(), "Connected") + _, _ = fmt.Fprintf(w, "%d\t%s\t%s\t%s\t%s\n", i+1, "Minor", options.GetKubeconfigContext(), options.Namespace, "Connected") } _ = w.Flush() return &rpc.StatusResponse{Message: sb.String()}, nil diff --git a/pkg/dev/convert.go b/pkg/dev/convert.go index dde7a568..d40a3025 100644 --- a/pkg/dev/convert.go +++ b/pkg/dev/convert.go @@ -29,7 +29,6 @@ import ( "github.com/wencaiwulue/kubevpn/pkg/config" "github.com/wencaiwulue/kubevpn/pkg/cp" "github.com/wencaiwulue/kubevpn/pkg/dns" - "github.com/wencaiwulue/kubevpn/pkg/handler" ) type RunConfig struct { @@ -195,7 +194,7 @@ func GetDNS(ctx context.Context, f util.Factory, ns, pod string) (*miekgdns.Clie } // GetVolume key format: [container name]-[volume mount name] -func GetVolume(ctx context.Context, f util.Factory, ns, pod string) (map[string][]mount.Mount, error) { +func GetVolume(ctx context.Context, f util.Factory, ns, pod string, d *Options) (map[string][]mount.Mount, error) { clientSet, err := f.KubernetesClientSet() if err != nil { return nil, err @@ -224,7 +223,7 @@ func GetVolume(ctx context.Context, f util.Factory, ns, pod string) (map[string] if volumeMount.SubPath != "" { join = filepath.Join(join, volumeMount.SubPath) } - handler.RollbackFuncList = append(handler.RollbackFuncList, func() { + d.RollbackFuncList = append(d.RollbackFuncList, func() { _ = os.RemoveAll(join) }) // pod-namespace/pod-name:path diff --git a/pkg/dev/main.go b/pkg/dev/main.go index 157e1b0b..c54bc222 100644 --- a/pkg/dev/main.go +++ b/pkg/dev/main.go @@ -78,6 +78,9 @@ type Options struct { // inner Cli *client.Client DockerCli *command.DockerCli + + // rollback + RollbackFuncList []func() } func (d *Options) Main(ctx context.Context, tempContainerConfig *containerConfig) error { @@ -124,7 +127,7 @@ func (d *Options) Main(ctx context.Context, tempContainerConfig *containerConfig log.Errorf("get env from k8s: %v", err) return err } - volume, err := GetVolume(ctx, d.Factory, d.Namespace, pod) + volume, err := GetVolume(ctx, d.Factory, d.Namespace, pod, d) if err != nil { log.Errorf("get volume from k8s: %v", err) return err @@ -213,7 +216,7 @@ func (d *Options) Main(ctx context.Context, tempContainerConfig *containerConfig } } - handler.RollbackFuncList = append(handler.RollbackFuncList, func() { + d.RollbackFuncList = append(d.RollbackFuncList, func() { _ = runConfigList.Remove(ctx, d.Cli) }) err = runConfigList.Run(ctx, volume, d.Cli, d.DockerCli) @@ -570,7 +573,7 @@ func (d *Options) doConnect(ctx context.Context, f cmdutil.Factory, conf *util.S }, ) go h.Run(func() error { select {} }) - handler.RollbackFuncList = append(handler.RollbackFuncList, func() { + d.RollbackFuncList = append(d.RollbackFuncList, func() { h.Close() }) err = runLogsWaitRunning(cancelCtx, d.DockerCli, id) diff --git a/pkg/handler/cleaner.go b/pkg/handler/cleaner.go index 00b8904d..636297ea 100644 --- a/pkg/handler/cleaner.go +++ b/pkg/handler/cleaner.go @@ -25,8 +25,6 @@ import ( "github.com/wencaiwulue/kubevpn/pkg/util" ) -var RollbackFuncList = make([]func(), 2) - func (c *ConnectOptions) addCleanUpResourceHandler() { var stopChan = make(chan os.Signal) signal.Notify(stopChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGKILL) @@ -67,7 +65,7 @@ func (c *ConnectOptions) Cleanup() { log.Errorf("can not update ref-count: %v", err) } } - for _, function := range RollbackFuncList { + for _, function := range c.RollbackFuncList { if function != nil { function() } @@ -80,7 +78,7 @@ func (c *ConnectOptions) Cleanup() { if c.cancel != nil { c.cancel() } - RollbackFuncList = RollbackFuncList[:] + c.RollbackFuncList = c.RollbackFuncList[:] name, err := c.GetTunDeviceName() if err == nil { log.Errorf("get tun device error: %v", err) diff --git a/pkg/handler/clone.go b/pkg/handler/clone.go index 085be4fb..4c6264a0 100644 --- a/pkg/handler/clone.go +++ b/pkg/handler/clone.go @@ -67,6 +67,8 @@ type CloneOptions struct { restclient *rest.RESTClient config *rest.Config factory cmdutil.Factory + + RollbackFuncList []func() } func (d *CloneOptions) InitClient(f cmdutil.Factory) (err error) { @@ -187,7 +189,7 @@ func (d *CloneOptions) DoClone(ctx context.Context) error { if err != nil { return err } - RollbackFuncList = append(RollbackFuncList, func() { + d.RollbackFuncList = append(d.RollbackFuncList, func() { _ = client.Resource(object.Mapping.Resource).Namespace(d.TargetNamespace).Delete(context.Background(), u.GetName(), metav1.DeleteOptions{}) }) retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { diff --git a/pkg/handler/connect.go b/pkg/handler/connect.go index f4acc75b..c19644e8 100644 --- a/pkg/handler/connect.go +++ b/pkg/handler/connect.go @@ -89,8 +89,9 @@ type ConnectOptions struct { cidrs []*net.IPNet dhcp *DHCPManager // needs to give it back to dhcp - localTunIPv4 *net.IPNet - localTunIPv6 *net.IPNet + localTunIPv4 *net.IPNet + localTunIPv6 *net.IPNet + RollbackFuncList []func() apiServerIPs []net.IP extraHost []dns.Entry @@ -608,7 +609,7 @@ func (c *ConnectOptions) deleteFirewallRule(ctx context.Context) { if !util.FindAllowFirewallRule() { util.AddAllowFirewallRule() } - RollbackFuncList = append(RollbackFuncList, util.DeleteAllowFirewallRule) + c.RollbackFuncList = append(c.RollbackFuncList, util.DeleteAllowFirewallRule) go util.DeleteBlockFirewallRule(ctx) }