feat: clone works well

This commit is contained in:
fengcaiwen
2023-09-17 12:41:34 +08:00
committed by naison
parent 49e8a14118
commit f91507102e
23 changed files with 2053 additions and 1132 deletions

View File

@@ -357,8 +357,7 @@ kubevpn --headers user=naison dev deployment/authors
Example
```shell
root@27b74bde78b6:/app# kubevpn --headers user=naison dev deployment/authors
hostname 27b74bde78b6
➜ ~ kubevpn --headers user=naison dev deployment/authors
got cidr from cache
update ref count successfully
traffic manager already exist, reuse it

View File

@@ -352,8 +352,7 @@ kubevpn --headers user=naison dev deployment/authors
例如:
```shell
root@27b74bde78b6:/app# kubevpn --headers user=naison dev deployment/authors
hostname 27b74bde78b6
➜ ~ kubevpn --headers user=naison dev deployment/authors
got cidr from cache
update ref count successfully
traffic manager already exist, reuse it

146
cmd/kubevpn/cmds/clone.go Normal file
View File

@@ -0,0 +1,146 @@
package cmds
import (
"fmt"
"io"
"os"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
utilcomp "k8s.io/kubectl/pkg/util/completion"
"k8s.io/kubectl/pkg/util/i18n"
"k8s.io/kubectl/pkg/util/templates"
"github.com/wencaiwulue/kubevpn/pkg/config"
"github.com/wencaiwulue/kubevpn/pkg/daemon"
"github.com/wencaiwulue/kubevpn/pkg/daemon/rpc"
"github.com/wencaiwulue/kubevpn/pkg/handler"
"github.com/wencaiwulue/kubevpn/pkg/util"
)
// CmdClone multiple cluster operate, can start up one deployment to another cluster
// kubectl exec POD_NAME -c CONTAINER_NAME /sbin/killall5 or ephemeralcontainers
func CmdClone(f cmdutil.Factory) *cobra.Command {
var options = handler.CloneOptions{}
var sshConf = &util.SshConfig{}
var transferImage bool
cmd := &cobra.Command{
Use: "clone",
Short: i18n.T("Clone workloads to target-kubeconfig cluster with same volume、env、and network"),
Long: templates.LongDesc(i18n.T(`Clone workloads to target-kubeconfig cluster with same volume、env、and network`)),
Example: templates.Examples(i18n.T(`
# clone
- clone deployment in current cluster and current namespace
kubevpn clone deployment/productpage
- clone deployment in current cluster with different namespace
kubevpn clone deployment/productpage -n test
- clone deployment to another cluster
kubevpn clone deployment/productpage --target-kubeconfig ~/.kube/other-kubeconfig
- clone multiple workloads
kubevpn clone deployment/authors deployment/productpage
or
kubevpn clone deployment authors productpage
# clone with mesh, traffic with header a=1, will hit cloned workloads, otherwise hit origin workloads
kubevpn clone deployment/productpage --headers a=1
# clone workloads which api-server behind of bastion host or ssh jump host
kubevpn clone deployment/productpage --ssh-addr 192.168.1.100:22 --ssh-username root --ssh-keyfile ~/.ssh/ssh.pem --headers a=1
# it also support ProxyJump, like
┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ ┌────────────┐
│ pc ├────►│ ssh1 ├────►│ ssh2 ├────►│ ssh3 ├─────►... ─────► │ api-server │
└──────┘ └──────┘ └──────┘ └──────┘ └────────────┘
kubevpn clone service/productpage --ssh-alias <alias> --headers a=1
`)),
PreRunE: func(cmd *cobra.Command, args []string) (err error) {
// not support temporally
if options.Engine == config.EngineGvisor {
return fmt.Errorf(`not support type engine: %s, support ("%s"|"%s")`, config.EngineGvisor, config.EngineMix, config.EngineRaw)
}
// startup daemon process and sudo process
return daemon.StartupDaemon(cmd.Context())
},
RunE: func(cmd *cobra.Command, args []string) error {
if len(args) == 0 {
_, _ = fmt.Fprintf(os.Stdout, "You must specify the type of resource to proxy. %s\n\n", cmdutil.SuggestAPIResources("kubevpn"))
fullCmdName := cmd.Parent().CommandPath()
usageString := "Required resource not specified."
if len(fullCmdName) > 0 && cmdutil.IsSiblingCommandExists(cmd, "explain") {
usageString = fmt.Sprintf("%s\nUse \"%s explain <resource>\" for a detailed description of that resource (e.g. %[2]s explain pods).", usageString, fullCmdName)
}
return cmdutil.UsageErrorf(cmd, usageString)
}
// special empty string, eg: --target-registry ""
options.IsChangeTargetRegistry = cmd.Flags().Changed("target-registry")
bytes, ns, err := util.ConvertToKubeconfigBytes(f)
if err != nil {
return err
}
req := &rpc.CloneRequest{
KubeconfigBytes: string(bytes),
Namespace: ns,
Headers: options.Headers,
Workloads: args,
ExtraCIDR: options.ExtraCIDR,
ExtraDomain: options.ExtraDomain,
UseLocalDNS: options.UseLocalDNS,
Engine: string(options.Engine),
SshJump: sshConf.ToRPC(),
TargetKubeconfig: options.TargetKubeconfig,
TargetNamespace: options.TargetNamespace,
TargetContainer: options.TargetContainer,
TargetImage: options.TargetImage,
TargetRegistry: options.TargetRegistry,
IsChangeTargetRegistry: options.IsChangeTargetRegistry,
TransferImage: transferImage,
Image: config.Image,
Level: int32(log.DebugLevel),
}
cli := daemon.GetClient(false)
resp, err := cli.Clone(cmd.Context(), req)
if err != nil {
return err
}
for {
recv, err := resp.Recv()
if err == io.EOF {
break
} else if code := status.Code(err); code == codes.DeadlineExceeded || code == codes.Canceled {
return nil
} else if err != nil {
return err
}
log.Print(recv.GetMessage())
}
util.Print(os.Stdout, "Now clone workloads running successfully on other cluster, enjoy it :)")
return nil
},
}
cmd.Flags().StringToStringVarP(&options.Headers, "headers", "H", map[string]string{}, "Traffic with special headers with reverse it to clone workloads, you should startup your service after reverse workloads successfully, If not special, redirect all traffic to clone workloads, format is k=v, like: k1=v1,k2=v2")
cmd.Flags().BoolVar(&config.Debug, "debug", false, "Enable debug mode or not, true or false")
cmd.Flags().StringVar(&config.Image, "image", config.Image, "Use this image to startup container")
cmd.Flags().StringArrayVar(&options.ExtraCIDR, "extra-cidr", []string{}, "Extra cidr string, eg: --extra-cidr 192.168.0.159/24 --extra-cidr 192.168.1.160/32")
cmd.Flags().StringArrayVar(&options.ExtraDomain, "extra-domain", []string{}, "Extra domain string, the resolved ip will add to route table, eg: --extra-domain test.abc.com --extra-domain foo.test.com")
cmd.Flags().BoolVar(&transferImage, "transfer-image", false, "transfer image to remote registry, it will transfer image "+config.OriginImage+" to flags `--image` special image, default: "+config.Image)
cmd.Flags().StringVar((*string)(&options.Engine), "engine", string(config.EngineRaw), fmt.Sprintf(`transport engine ("%s"|"%s") %s: use gvisor and raw both (both performance and stable), %s: use raw mode (best stable)`, config.EngineMix, config.EngineRaw, config.EngineMix, config.EngineRaw))
cmd.Flags().BoolVar(&options.UseLocalDNS, "use-localdns", false, "if use-lcoaldns is true, kubevpn will start coredns listen at 53 to forward your dns queries. only support on linux now")
cmd.Flags().StringVar(&options.TargetImage, "target-image", "", "Clone container use this image to startup container, if not special, use origin origin image")
cmd.Flags().StringVar(&options.TargetContainer, "target-container", "", "Clone container use special image to startup this container, if not special, use origin origin image")
cmd.Flags().StringVar(&options.TargetNamespace, "target-namespace", "", "Clone workloads in this namespace, if not special, use origin namespace")
cmd.Flags().StringVar(&options.TargetKubeconfig, "target-kubeconfig", "", "Clone workloads will create in this cluster, if not special, use origin cluster")
cmd.Flags().StringVar(&options.TargetRegistry, "target-registry", "", "Clone workloads will create this registry domain to replace origin registry, if not special, use origin registry")
addSshFlags(cmd, sshConf)
cmd.ValidArgsFunction = utilcomp.ResourceTypeAndNameCompletionFunc(f)
return cmd
}

View File

@@ -54,21 +54,17 @@ func CmdConnect(f cmdutil.Factory) *cobra.Command {
return err
}
req := &rpc.ConnectRequest{
KubeconfigBytes: string(bytes),
Namespace: ns,
ExtraCIDR: connect.ExtraCIDR,
ExtraDomain: connect.ExtraDomain,
UseLocalDNS: connect.UseLocalDNS,
Engine: string(connect.Engine),
Addr: sshConf.Addr,
User: sshConf.User,
Password: sshConf.Password,
Keyfile: sshConf.Keyfile,
ConfigAlias: sshConf.ConfigAlias,
RemoteKubeconfig: sshConf.RemoteKubeconfig,
TransferImage: transferImage,
Image: config.Image,
Level: int32(log.DebugLevel),
KubeconfigBytes: string(bytes),
Namespace: ns,
ExtraCIDR: connect.ExtraCIDR,
ExtraDomain: connect.ExtraDomain,
UseLocalDNS: connect.UseLocalDNS,
Engine: string(connect.Engine),
SshJump: sshConf.ToRPC(),
TransferImage: transferImage,
Image: config.Image,
Level: int32(log.DebugLevel),
}
cli := daemon.GetClient(false)
resp, err := cli.Connect(cmd.Context(), req)

View File

@@ -68,7 +68,7 @@ func CmdCp(f cmdutil.Factory) *cobra.Command {
Long: i18n.T("Copy files and directories to and from containers. Different between kubectl cp is it will de-reference symbol link."),
Example: cpExample,
ValidArgsFunction: func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
cmdutil.CheckErr(handler.SshJump(cmd.Context(), sshConf, cmd.Flags()))
cmdutil.CheckErr(handler.SshJump(cmd.Context(), sshConf, cmd.Flags(), false))
var comps []string
if len(args) == 0 {

View File

@@ -81,7 +81,7 @@ Startup your kubernetes workloads in local Docker container with same volume、e
if err != nil {
return err
}
return handler.SshJump(cmd.Context(), sshConf, cmd.Flags())
return handler.SshJump(cmd.Context(), sshConf, cmd.Flags(), false)
},
RunE: func(cmd *cobra.Command, args []string) error {
devOptions.Workload = args[0]

View File

@@ -1,129 +0,0 @@
package cmds
import (
"context"
"fmt"
"os"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
utilcomp "k8s.io/kubectl/pkg/util/completion"
"k8s.io/kubectl/pkg/util/i18n"
"k8s.io/kubectl/pkg/util/templates"
"github.com/wencaiwulue/kubevpn/pkg/config"
"github.com/wencaiwulue/kubevpn/pkg/handler"
"github.com/wencaiwulue/kubevpn/pkg/util"
)
// CmdDuplicate multiple cluster operate, can start up one deployment to another cluster
// kubectl exec POD_NAME -c CONTAINER_NAME /sbin/killall5 or ephemeralcontainers
func CmdDuplicate(f cmdutil.Factory) *cobra.Command {
var duplicateOptions = handler.DuplicateOptions{}
var sshConf = &util.SshConfig{}
var transferImage bool
cmd := &cobra.Command{
Use: "duplicate",
Short: i18n.T("Duplicate workloads to target-kubeconfig cluster with same volume、env、and network"),
Long: templates.LongDesc(i18n.T(`Duplicate workloads to target-kubeconfig cluster with same volume、env、and network`)),
Example: templates.Examples(i18n.T(`
# duplicate
- duplicate deployment in current cluster and current namespace
kubevpn duplicate deployment/productpage
- duplicate deployment in current cluster with different namespace
kubevpn duplicate deployment/productpage -n test
- duplicate deployment to another cluster
kubevpn duplicate deployment/productpage --target-kubeconfig ~/.kube/other-kubeconfig
- duplicate multiple workloads
kubevpn duplicate deployment/authors deployment/productpage
or
kubevpn duplicate deployment authors productpage
# duplicate with mesh, traffic with header a=1, will hit duplicate workloads, otherwise hit origin workloads
kubevpn duplicate deployment/productpage --headers a=1
# duplicate workloads which api-server behind of bastion host or ssh jump host
kubevpn duplicate deployment/productpage --ssh-addr 192.168.1.100:22 --ssh-username root --ssh-keyfile ~/.ssh/ssh.pem --headers a=1
# it also support ProxyJump, like
┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ ┌────────────┐
│ pc ├────►│ ssh1 ├────►│ ssh2 ├────►│ ssh3 ├─────►... ─────► │ api-server │
└──────┘ └──────┘ └──────┘ └──────┘ └────────────┘
kubevpn duplicate service/productpage --ssh-alias <alias> --headers a=1
`)),
PreRunE: func(cmd *cobra.Command, args []string) (err error) {
// not support temporally
if duplicateOptions.Engine == config.EngineGvisor {
return fmt.Errorf(`not support type engine: %s, support ("%s"|"%s")`, config.EngineGvisor, config.EngineMix, config.EngineRaw)
}
return handler.SshJump(cmd.Context(), sshConf, cmd.Flags())
},
RunE: func(cmd *cobra.Command, args []string) error {
if len(args) == 0 {
_, _ = fmt.Fprintf(os.Stdout, "You must specify the type of resource to proxy. %s\n\n", cmdutil.SuggestAPIResources("kubevpn"))
fullCmdName := cmd.Parent().CommandPath()
usageString := "Required resource not specified."
if len(fullCmdName) > 0 && cmdutil.IsSiblingCommandExists(cmd, "explain") {
usageString = fmt.Sprintf("%s\nUse \"%s explain <resource>\" for a detailed description of that resource (e.g. %[2]s explain pods).", usageString, fullCmdName)
}
return cmdutil.UsageErrorf(cmd, usageString)
}
// special empty string, eg: --target-registry ""
duplicateOptions.IsChangeTargetRegistry = cmd.Flags().Changed("target-registry")
connectOptions := handler.ConnectOptions{
Namespace: duplicateOptions.Namespace,
Workloads: args,
ExtraCIDR: duplicateOptions.ExtraCIDR,
Engine: duplicateOptions.Engine,
}
if err := connectOptions.InitClient(f); err != nil {
return err
}
err := connectOptions.PreCheckResource()
if err != nil {
return err
}
duplicateOptions.Workloads = connectOptions.Workloads
connectOptions.Workloads = []string{}
// todo use rpc
if err = connectOptions.DoConnect(cmd.Context()); err != nil {
log.Errorln(err)
connectOptions.Cleanup()
} else {
err = duplicateOptions.InitClient(f)
if err != nil {
return err
}
err = duplicateOptions.DoDuplicate(context.Background())
if err != nil {
return err
}
util.Print(os.Stdout, "Now duplicate workloads running successfully on other cluster, enjoy it :)")
}
select {}
},
}
cmd.Flags().StringToStringVarP(&duplicateOptions.Headers, "headers", "H", map[string]string{}, "Traffic with special headers with reverse it to duplicate workloads, you should startup your service after reverse workloads successfully, If not special, redirect all traffic to duplicate workloads, format is k=v, like: k1=v1,k2=v2")
cmd.Flags().BoolVar(&config.Debug, "debug", false, "Enable debug mode or not, true or false")
cmd.Flags().StringVar(&config.Image, "image", config.Image, "Use this image to startup container")
cmd.Flags().StringArrayVar(&duplicateOptions.ExtraCIDR, "extra-cidr", []string{}, "Extra cidr string, eg: --extra-cidr 192.168.0.159/24 --extra-cidr 192.168.1.160/32")
cmd.Flags().StringArrayVar(&duplicateOptions.ExtraDomain, "extra-domain", []string{}, "Extra domain string, the resolved ip will add to route table, eg: --extra-domain test.abc.com --extra-domain foo.test.com")
cmd.Flags().BoolVar(&transferImage, "transfer-image", false, "transfer image to remote registry, it will transfer image "+config.OriginImage+" to flags `--image` special image, default: "+config.Image)
cmd.Flags().StringVar((*string)(&duplicateOptions.Engine), "engine", string(config.EngineRaw), fmt.Sprintf(`transport engine ("%s"|"%s") %s: use gvisor and raw both (both performance and stable), %s: use raw mode (best stable)`, config.EngineMix, config.EngineRaw, config.EngineMix, config.EngineRaw))
cmd.Flags().StringVar(&duplicateOptions.TargetImage, "target-image", "", "Duplicate container use this image to startup container, if not special, use origin origin image")
cmd.Flags().StringVar(&duplicateOptions.TargetContainer, "target-container", "", "Duplicate container use special image to startup this container, if not special, use origin origin image")
cmd.Flags().StringVar(&duplicateOptions.TargetNamespace, "target-namespace", "", "Duplicate workloads in this namespace, if not special, use origin namespace")
cmd.Flags().StringVar(&duplicateOptions.TargetKubeconfig, "target-kubeconfig", "", "Duplicate workloads will create in this cluster, if not special, use origin cluster")
cmd.Flags().StringVar(&duplicateOptions.TargetRegistry, "target-registry", "", "Duplicate workloads will create this registry domain to replace origin registry, if not special, use origin registry")
addSshFlags(cmd, sshConf)
cmd.ValidArgsFunction = utilcomp.ResourceTypeAndNameCompletionFunc(f)
return cmd
}

View File

@@ -49,23 +49,18 @@ func CmdGet(f cmdutil.Factory) *cobra.Command {
client, err := daemon.GetClient(true).Connect(
cmd.Context(),
&rpc.ConnectRequest{
KubeconfigBytes: string(bytes),
Namespace: ns,
Headers: connect.Headers,
Workloads: connect.Workloads,
ExtraCIDR: connect.ExtraCIDR,
ExtraDomain: connect.ExtraDomain,
UseLocalDNS: connect.UseLocalDNS,
Engine: string(connect.Engine),
Addr: sshConf.Addr,
User: sshConf.User,
Password: sshConf.Password,
Keyfile: sshConf.Keyfile,
ConfigAlias: sshConf.ConfigAlias,
RemoteKubeconfig: sshConf.RemoteKubeconfig,
TransferImage: transferImage,
Image: config.Image,
Level: int32(log.DebugLevel),
KubeconfigBytes: string(bytes),
Namespace: ns,
Headers: connect.Headers,
Workloads: connect.Workloads,
ExtraCIDR: connect.ExtraCIDR,
ExtraDomain: connect.ExtraDomain,
UseLocalDNS: connect.UseLocalDNS,
Engine: string(connect.Engine),
SshJump: sshConf.ToRPC(),
TransferImage: transferImage,
Image: config.Image,
Level: int32(log.DebugLevel),
},
)
if err != nil {

View File

@@ -1,9 +1,11 @@
package cmds
import (
"context"
"fmt"
"io"
"os"
"time"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
@@ -24,7 +26,7 @@ import (
func CmdProxy(f cmdutil.Factory) *cobra.Command {
var connect = handler.ConnectOptions{}
var sshConf = &util.SshConfig{}
var transferImage bool
var transferImage, foreground bool
cmd := &cobra.Command{
Use: "proxy",
Short: i18n.T("Proxy kubernetes workloads inbound traffic into local PC"),
@@ -81,26 +83,22 @@ func CmdProxy(f cmdutil.Factory) *cobra.Command {
return err
}
// todo 将 doConnect 方法封装?内部使用 client 发送到daemon
client, err := daemon.GetClient(false).Proxy(
cli := daemon.GetClient(false)
client, err := cli.Proxy(
cmd.Context(),
&rpc.ConnectRequest{
KubeconfigBytes: string(bytes),
Namespace: ns,
Headers: connect.Headers,
Workloads: args,
ExtraCIDR: connect.ExtraCIDR,
ExtraDomain: connect.ExtraDomain,
UseLocalDNS: connect.UseLocalDNS,
Engine: string(connect.Engine),
Addr: sshConf.Addr,
User: sshConf.User,
Password: sshConf.Password,
Keyfile: sshConf.Keyfile,
ConfigAlias: sshConf.ConfigAlias,
RemoteKubeconfig: sshConf.RemoteKubeconfig,
TransferImage: transferImage,
Image: config.Image,
Level: int32(log.DebugLevel),
KubeconfigBytes: string(bytes),
Namespace: ns,
Headers: connect.Headers,
Workloads: args,
ExtraCIDR: connect.ExtraCIDR,
ExtraDomain: connect.ExtraDomain,
UseLocalDNS: connect.UseLocalDNS,
Engine: string(connect.Engine),
SshJump: sshConf.ToRPC(),
TransferImage: transferImage,
Image: config.Image,
Level: int32(log.DebugLevel),
},
)
if err != nil {
@@ -119,6 +117,32 @@ func CmdProxy(f cmdutil.Factory) *cobra.Command {
return err
}
}
// hangup
if foreground {
// leave from cluster resources
<-cmd.Context().Done()
now := time.Now()
stream, err := cli.Leave(context.Background(), &rpc.LeaveRequest{
Workloads: args,
})
fmt.Printf("call api leave use %s\n", time.Now().Sub(now).String())
if err != nil {
return err
}
var resp *rpc.LeaveResponse
for {
resp, err = stream.Recv()
if err == io.EOF {
return nil
} else if code := status.Code(err); code == codes.DeadlineExceeded || code == codes.Canceled {
return nil
} else if err != nil {
return err
}
fmt.Fprint(os.Stdout, resp.Message)
}
}
return nil
},
}
@@ -129,6 +153,7 @@ func CmdProxy(f cmdutil.Factory) *cobra.Command {
cmd.Flags().StringArrayVar(&connect.ExtraDomain, "extra-domain", []string{}, "Extra domain string, the resolved ip will add to route table, eg: --extra-domain test.abc.com --extra-domain foo.test.com")
cmd.Flags().BoolVar(&transferImage, "transfer-image", false, "transfer image to remote registry, it will transfer image "+config.OriginImage+" to flags `--image` special image, default: "+config.Image)
cmd.Flags().StringVar((*string)(&connect.Engine), "engine", string(config.EngineRaw), fmt.Sprintf(`transport engine ("%s"|"%s") %s: use gvisor and raw both (both performance and stable), %s: use raw mode (best stable)`, config.EngineMix, config.EngineRaw, config.EngineMix, config.EngineRaw))
cmd.Flags().BoolVar(&foreground, "foreground", false, "foreground hang up")
addSshFlags(cmd, sshConf)
cmd.ValidArgsFunction = utilcomp.ResourceTypeAndNameCompletionFunc(f)

View File

@@ -0,0 +1,45 @@
package cmds
import (
"fmt"
"io"
"github.com/spf13/cobra"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
"github.com/wencaiwulue/kubevpn/pkg/daemon"
"github.com/wencaiwulue/kubevpn/pkg/daemon/rpc"
)
func CmdRemove(f cmdutil.Factory) *cobra.Command {
var leaveCmd = &cobra.Command{
Use: "remove",
Short: "Remove reverse remote resource traffic to local machine",
Long: `Remove remote traffic to local machine`,
PreRunE: func(cmd *cobra.Command, args []string) (err error) {
return daemon.StartupDaemon(cmd.Context())
},
RunE: func(cmd *cobra.Command, args []string) error {
leave, err := daemon.GetClient(false).Remove(cmd.Context(), &rpc.RemoveRequest{
Workloads: args,
})
if err != nil {
return err
}
for {
recv, err := leave.Recv()
if err == io.EOF {
return nil
} else if code := status.Code(err); code == codes.DeadlineExceeded || code == codes.Canceled {
return nil
} else if err != nil {
return err
}
fmt.Print(recv.GetMessage())
}
},
}
return leaveCmd
}

View File

@@ -39,7 +39,7 @@ func CmdReset(factory cmdutil.Factory) *cobra.Command {
`)),
PreRunE: func(cmd *cobra.Command, args []string) error {
return handler.SshJump(cmd.Context(), sshConf, cmd.Flags())
return handler.SshJump(cmd.Context(), sshConf, cmd.Flags(), false)
},
Run: func(cmd *cobra.Command, args []string) {
util.InitLogger(false)

View File

@@ -57,7 +57,8 @@ func NewKubeVPNCommand() *cobra.Command {
CmdList(factory),
CmdLeave(factory),
CmdDev(factory),
CmdDuplicate(factory),
CmdClone(factory),
CmdRemove(factory),
CmdCp(factory),
CmdUpgrade(factory),
CmdReset(factory),

125
pkg/daemon/action/clone.go Normal file
View File

@@ -0,0 +1,125 @@
package action
import (
"fmt"
"io"
"os"
log "github.com/sirupsen/logrus"
"github.com/spf13/pflag"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/wencaiwulue/kubevpn/pkg/config"
"github.com/wencaiwulue/kubevpn/pkg/daemon/rpc"
"github.com/wencaiwulue/kubevpn/pkg/handler"
"github.com/wencaiwulue/kubevpn/pkg/util"
)
func (svr *Server) Clone(req *rpc.CloneRequest, resp rpc.Daemon_CloneServer) error {
var sshConf = util.ParseSshFromRPC(req.SshJump)
origin := log.StandardLogger().Out
defer func() {
log.SetOutput(origin)
log.SetLevel(log.DebugLevel)
}()
out := io.MultiWriter(newCloneWarp(resp), origin)
log.SetOutput(out)
util.InitLogger(false)
connReq := &rpc.ConnectRequest{
KubeconfigBytes: req.KubeconfigBytes,
Namespace: req.Namespace,
ExtraCIDR: req.ExtraCIDR,
ExtraDomain: req.ExtraDomain,
UseLocalDNS: req.UseLocalDNS,
Engine: req.Engine,
SshJump: req.SshJump,
TransferImage: req.TransferImage,
Image: req.Image,
Level: req.Level,
}
cli := svr.GetClient(false)
connResp, err := cli.Connect(resp.Context(), connReq)
if err != nil {
return err
}
var msg *rpc.ConnectResponse
for {
msg, err = connResp.Recv()
if err == io.EOF {
break
} else if err == nil {
fmt.Fprint(os.Stdout, msg.Message)
} else if code := status.Code(err); code == codes.DeadlineExceeded || code == codes.Canceled {
return nil
} else {
return err
}
}
options := &handler.CloneOptions{
Namespace: req.Namespace,
Headers: req.Headers,
Workloads: req.Workloads,
ExtraCIDR: req.ExtraCIDR,
ExtraDomain: req.ExtraDomain,
UseLocalDNS: req.UseLocalDNS,
Engine: config.Engine(req.Engine),
TargetKubeconfig: req.TargetKubeconfig,
TargetNamespace: req.TargetNamespace,
TargetContainer: req.TargetContainer,
TargetImage: req.TargetImage,
TargetRegistry: req.TargetRegistry,
IsChangeTargetRegistry: req.IsChangeTargetRegistry,
}
file, err := util.ConvertToTempKubeconfigFile([]byte(req.KubeconfigBytes))
if err != nil {
return err
}
flags := pflag.NewFlagSet("", pflag.ContinueOnError)
flags.AddFlag(&pflag.Flag{
Name: "kubeconfig",
DefValue: file,
})
err = handler.SshJump(resp.Context(), sshConf, flags, true)
if err != nil {
return err
}
f := InitFactory(req.KubeconfigBytes, req.Namespace)
err = options.InitClient(f)
if err != nil {
return err
}
config.Image = req.Image
err = options.DoClone(resp.Context())
if err != nil {
return err
}
return nil
}
type cloneWarp struct {
server rpc.Daemon_CloneServer
}
func (r *cloneWarp) Write(p []byte) (n int, err error) {
err = r.server.Send(&rpc.CloneResponse{
Message: string(p),
})
return len(p), err
}
func newCloneWarp(server rpc.Daemon_CloneServer) io.Writer {
return &cloneWarp{server: server}
}
//type daemonConnectServer struct {
// out io.Writer
// grpc.ServerStream
//}
//
//func (d *daemonConnectServer) Send(response *rpc.ConnectResponse) error {
// _, err := d.out.Write([]byte(response.Message))
// return err
//}

View File

@@ -101,14 +101,7 @@ func (svr *Server) Connect(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectServe
UseLocalDNS: req.UseLocalDNS,
Engine: config.Engine(req.Engine),
}
var sshConf = &util.SshConfig{
Addr: req.Addr,
User: req.User,
Password: req.Password,
Keyfile: req.Keyfile,
ConfigAlias: req.ConfigAlias,
RemoteKubeconfig: req.RemoteKubeconfig,
}
var sshConf = util.ParseSshFromRPC(req.SshJump)
var transferImage = req.TransferImage
go util.StartupPProf(config.PProfPort)
@@ -131,7 +124,7 @@ func (svr *Server) Connect(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectServe
sshCtx, sshCancel := context.WithCancel(context.Background())
handler.RollbackFuncList = append(handler.RollbackFuncList, sshCancel)
err = handler.SshJump(sshCtx, sshConf, flags)
err = handler.SshJump(sshCtx, sshConf, flags, false)
if err != nil {
return err
}
@@ -172,14 +165,7 @@ func (svr *Server) redirectToSudoDaemon(req *rpc.ConnectRequest, resp rpc.Daemon
UseLocalDNS: req.UseLocalDNS,
Engine: config.Engine(req.Engine),
}
var sshConf = &util.SshConfig{
Addr: req.Addr,
User: req.User,
Password: req.Password,
Keyfile: req.Keyfile,
ConfigAlias: req.ConfigAlias,
RemoteKubeconfig: req.RemoteKubeconfig,
}
var sshConf = util.ParseSshFromRPC(req.SshJump)
file, err := util.ConvertToTempKubeconfigFile([]byte(req.KubeconfigBytes))
if err != nil {
return err
@@ -191,7 +177,7 @@ func (svr *Server) redirectToSudoDaemon(req *rpc.ConnectRequest, resp rpc.Daemon
})
sshCtx, sshCancel := context.WithCancel(context.Background())
handler.RollbackFuncList = append(handler.RollbackFuncList, sshCancel)
err = handler.SshJump(sshCtx, sshConf, flags)
err = handler.SshJump(sshCtx, sshConf, flags, true)
if err != nil {
return err
}

View File

@@ -39,14 +39,7 @@ func (svr *Server) Proxy(req *rpc.ConnectRequest, resp rpc.Daemon_ProxyServer) e
UseLocalDNS: req.UseLocalDNS,
Engine: config.Engine(req.Engine),
}
var sshConf = &util.SshConfig{
Addr: req.Addr,
User: req.User,
Password: req.Password,
Keyfile: req.Keyfile,
ConfigAlias: req.ConfigAlias,
RemoteKubeconfig: req.RemoteKubeconfig,
}
var sshConf = util.ParseSshFromRPC(req.SshJump)
file, err := util.ConvertToTempKubeconfigFile([]byte(req.KubeconfigBytes))
if err != nil {
@@ -57,7 +50,7 @@ func (svr *Server) Proxy(req *rpc.ConnectRequest, resp rpc.Daemon_ProxyServer) e
Name: "kubeconfig",
DefValue: file,
})
err = handler.SshJump(ctx, sshConf, flags)
err = handler.SshJump(ctx, sshConf, flags, false)
if err != nil {
return err
}

View File

@@ -0,0 +1,9 @@
package action
import (
"github.com/wencaiwulue/kubevpn/pkg/daemon/rpc"
)
func (svr *Server) Remove(req *rpc.RemoveRequest, resp rpc.Daemon_RemoveServer) error {
return nil
}

View File

@@ -130,7 +130,11 @@ func runDaemon(ctx context.Context, isSudo bool) error {
}
}
if isSudo {
err = util.RunCmdWithElevated([]string{"daemon", "--sudo"})
if !util.IsAdmin() {
err = util.RunCmdWithElevated([]string{"daemon", "--sudo"})
} else {
err = util.RunCmd([]string{"daemon", "--sudo"})
}
} else {
err = util.RunCmd([]string{"daemon"})
}

File diff suppressed because it is too large Load Diff

View File

@@ -6,14 +6,17 @@ package rpc;
service Daemon {
rpc Connect (ConnectRequest) returns (stream ConnectResponse) {}
rpc Proxy (ConnectRequest) returns (stream ConnectResponse) {}
rpc Disconnect (DisconnectRequest) returns (stream DisconnectResponse) {}
rpc Proxy (ConnectRequest) returns (stream ConnectResponse) {}
rpc Leave (LeaveRequest) returns (stream LeaveResponse) {}
rpc Clone (CloneRequest) returns (stream CloneResponse) {}
rpc Remove (RemoveRequest) returns (stream RemoveResponse) {}
rpc Logs (LogRequest) returns (stream LogResponse) {}
rpc List (ListRequest) returns (ListResponse) {}
rpc Upgrade (UpgradeRequest) returns (stream UpgradeResponse) {}
rpc Status (StatusRequest) returns (StatusResponse) {}
rpc Quit (QuitRequest) returns (stream QuitResponse) {}
rpc List (ListRequest) returns (ListResponse) {}
rpc Leave (LeaveRequest) returns (stream LeaveResponse) {}
rpc Upgrade (UpgradeRequest) returns (stream UpgradeResponse) {}
}
message ConnectRequest {
@@ -26,25 +29,76 @@ message ConnectRequest {
bool UseLocalDNS = 7;
string Engine = 8;
// ssh jump
string Addr = 9;
string User = 10;
string Password = 11;
string Keyfile = 12;
string ConfigAlias = 13;
string RemoteKubeconfig = 14;
SshJump SshJump = 9;
// transfer image
bool TransferImage = 15;
string Image = 16;
bool TransferImage = 10;
string Image = 11;
// log level
int32 Level = 17;
int32 Level = 12;
}
message ConnectResponse {
string message = 1;
}
message DisconnectRequest {
}
message DisconnectResponse {
string message = 1;
}
message LeaveRequest {
repeated string Workloads = 1;
}
message LeaveResponse {
string message = 1;
}
message CloneRequest {
string KubeconfigBytes = 1;
string Namespace = 2;
map<string, string> Headers = 3;
repeated string Workloads = 4;
repeated string ExtraCIDR = 5;
repeated string ExtraDomain = 6;
bool UseLocalDNS = 7;
string Engine = 8;
// ssh jump
SshJump SshJump = 9;
// target cluster info
string TargetKubeconfig = 10;
string TargetNamespace = 11;
string TargetContainer = 12;
string TargetImage = 13;
string TargetRegistry = 14;
bool IsChangeTargetRegistry = 15;
// transfer image
bool TransferImage = 16;
string Image = 17;
// log level
int32 Level = 18;
}
message CloneResponse {
string message = 1;
}
message RemoveRequest {
repeated string Workloads = 1;
}
message RemoveResponse {
string message = 1;
}
message QuitRequest {
}
@@ -68,13 +122,6 @@ message LogResponse {
string message = 1;
}
message DisconnectRequest {
}
message DisconnectResponse {
string message = 1;
}
message ListRequest {
}
@@ -82,14 +129,6 @@ message ListResponse {
string message = 1;
}
message LeaveRequest {
repeated string Workloads = 1;
}
message LeaveResponse {
string message = 1;
}
message UpgradeRequest {
string ClientVersion = 1;
string ClientCommitId = 2;
@@ -97,4 +136,13 @@ message UpgradeRequest {
message UpgradeResponse {
string message = 1;
}
message SshJump {
string Addr = 1;
string User = 2;
string Password = 3;
string Keyfile = 4;
string ConfigAlias = 5;
string RemoteKubeconfig = 6;
}

View File

@@ -20,14 +20,16 @@ const _ = grpc.SupportPackageIsVersion7
const (
Daemon_Connect_FullMethodName = "/rpc.Daemon/Connect"
Daemon_Proxy_FullMethodName = "/rpc.Daemon/Proxy"
Daemon_Disconnect_FullMethodName = "/rpc.Daemon/Disconnect"
Daemon_Proxy_FullMethodName = "/rpc.Daemon/Proxy"
Daemon_Leave_FullMethodName = "/rpc.Daemon/Leave"
Daemon_Clone_FullMethodName = "/rpc.Daemon/Clone"
Daemon_Remove_FullMethodName = "/rpc.Daemon/Remove"
Daemon_Logs_FullMethodName = "/rpc.Daemon/Logs"
Daemon_List_FullMethodName = "/rpc.Daemon/List"
Daemon_Upgrade_FullMethodName = "/rpc.Daemon/Upgrade"
Daemon_Status_FullMethodName = "/rpc.Daemon/Status"
Daemon_Quit_FullMethodName = "/rpc.Daemon/Quit"
Daemon_List_FullMethodName = "/rpc.Daemon/List"
Daemon_Leave_FullMethodName = "/rpc.Daemon/Leave"
Daemon_Upgrade_FullMethodName = "/rpc.Daemon/Upgrade"
)
// DaemonClient is the client API for Daemon service.
@@ -35,14 +37,16 @@ const (
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type DaemonClient interface {
Connect(ctx context.Context, in *ConnectRequest, opts ...grpc.CallOption) (Daemon_ConnectClient, error)
Proxy(ctx context.Context, in *ConnectRequest, opts ...grpc.CallOption) (Daemon_ProxyClient, error)
Disconnect(ctx context.Context, in *DisconnectRequest, opts ...grpc.CallOption) (Daemon_DisconnectClient, error)
Proxy(ctx context.Context, in *ConnectRequest, opts ...grpc.CallOption) (Daemon_ProxyClient, error)
Leave(ctx context.Context, in *LeaveRequest, opts ...grpc.CallOption) (Daemon_LeaveClient, error)
Clone(ctx context.Context, in *CloneRequest, opts ...grpc.CallOption) (Daemon_CloneClient, error)
Remove(ctx context.Context, in *RemoveRequest, opts ...grpc.CallOption) (Daemon_RemoveClient, error)
Logs(ctx context.Context, in *LogRequest, opts ...grpc.CallOption) (Daemon_LogsClient, error)
List(ctx context.Context, in *ListRequest, opts ...grpc.CallOption) (*ListResponse, error)
Upgrade(ctx context.Context, in *UpgradeRequest, opts ...grpc.CallOption) (Daemon_UpgradeClient, error)
Status(ctx context.Context, in *StatusRequest, opts ...grpc.CallOption) (*StatusResponse, error)
Quit(ctx context.Context, in *QuitRequest, opts ...grpc.CallOption) (Daemon_QuitClient, error)
List(ctx context.Context, in *ListRequest, opts ...grpc.CallOption) (*ListResponse, error)
Leave(ctx context.Context, in *LeaveRequest, opts ...grpc.CallOption) (Daemon_LeaveClient, error)
Upgrade(ctx context.Context, in *UpgradeRequest, opts ...grpc.CallOption) (Daemon_UpgradeClient, error)
}
type daemonClient struct {
@@ -85,40 +89,8 @@ func (x *daemonConnectClient) Recv() (*ConnectResponse, error) {
return m, nil
}
func (c *daemonClient) Proxy(ctx context.Context, in *ConnectRequest, opts ...grpc.CallOption) (Daemon_ProxyClient, error) {
stream, err := c.cc.NewStream(ctx, &Daemon_ServiceDesc.Streams[1], Daemon_Proxy_FullMethodName, opts...)
if err != nil {
return nil, err
}
x := &daemonProxyClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type Daemon_ProxyClient interface {
Recv() (*ConnectResponse, error)
grpc.ClientStream
}
type daemonProxyClient struct {
grpc.ClientStream
}
func (x *daemonProxyClient) Recv() (*ConnectResponse, error) {
m := new(ConnectResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *daemonClient) Disconnect(ctx context.Context, in *DisconnectRequest, opts ...grpc.CallOption) (Daemon_DisconnectClient, error) {
stream, err := c.cc.NewStream(ctx, &Daemon_ServiceDesc.Streams[2], Daemon_Disconnect_FullMethodName, opts...)
stream, err := c.cc.NewStream(ctx, &Daemon_ServiceDesc.Streams[1], Daemon_Disconnect_FullMethodName, opts...)
if err != nil {
return nil, err
}
@@ -149,12 +121,12 @@ func (x *daemonDisconnectClient) Recv() (*DisconnectResponse, error) {
return m, nil
}
func (c *daemonClient) Logs(ctx context.Context, in *LogRequest, opts ...grpc.CallOption) (Daemon_LogsClient, error) {
stream, err := c.cc.NewStream(ctx, &Daemon_ServiceDesc.Streams[3], Daemon_Logs_FullMethodName, opts...)
func (c *daemonClient) Proxy(ctx context.Context, in *ConnectRequest, opts ...grpc.CallOption) (Daemon_ProxyClient, error) {
stream, err := c.cc.NewStream(ctx, &Daemon_ServiceDesc.Streams[2], Daemon_Proxy_FullMethodName, opts...)
if err != nil {
return nil, err
}
x := &daemonLogsClient{stream}
x := &daemonProxyClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
@@ -164,75 +136,25 @@ func (c *daemonClient) Logs(ctx context.Context, in *LogRequest, opts ...grpc.Ca
return x, nil
}
type Daemon_LogsClient interface {
Recv() (*LogResponse, error)
type Daemon_ProxyClient interface {
Recv() (*ConnectResponse, error)
grpc.ClientStream
}
type daemonLogsClient struct {
type daemonProxyClient struct {
grpc.ClientStream
}
func (x *daemonLogsClient) Recv() (*LogResponse, error) {
m := new(LogResponse)
func (x *daemonProxyClient) Recv() (*ConnectResponse, error) {
m := new(ConnectResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *daemonClient) Status(ctx context.Context, in *StatusRequest, opts ...grpc.CallOption) (*StatusResponse, error) {
out := new(StatusResponse)
err := c.cc.Invoke(ctx, Daemon_Status_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *daemonClient) Quit(ctx context.Context, in *QuitRequest, opts ...grpc.CallOption) (Daemon_QuitClient, error) {
stream, err := c.cc.NewStream(ctx, &Daemon_ServiceDesc.Streams[4], Daemon_Quit_FullMethodName, opts...)
if err != nil {
return nil, err
}
x := &daemonQuitClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type Daemon_QuitClient interface {
Recv() (*QuitResponse, error)
grpc.ClientStream
}
type daemonQuitClient struct {
grpc.ClientStream
}
func (x *daemonQuitClient) Recv() (*QuitResponse, error) {
m := new(QuitResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *daemonClient) List(ctx context.Context, in *ListRequest, opts ...grpc.CallOption) (*ListResponse, error) {
out := new(ListResponse)
err := c.cc.Invoke(ctx, Daemon_List_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *daemonClient) Leave(ctx context.Context, in *LeaveRequest, opts ...grpc.CallOption) (Daemon_LeaveClient, error) {
stream, err := c.cc.NewStream(ctx, &Daemon_ServiceDesc.Streams[5], Daemon_Leave_FullMethodName, opts...)
stream, err := c.cc.NewStream(ctx, &Daemon_ServiceDesc.Streams[3], Daemon_Leave_FullMethodName, opts...)
if err != nil {
return nil, err
}
@@ -263,8 +185,113 @@ func (x *daemonLeaveClient) Recv() (*LeaveResponse, error) {
return m, nil
}
func (c *daemonClient) Clone(ctx context.Context, in *CloneRequest, opts ...grpc.CallOption) (Daemon_CloneClient, error) {
stream, err := c.cc.NewStream(ctx, &Daemon_ServiceDesc.Streams[4], Daemon_Clone_FullMethodName, opts...)
if err != nil {
return nil, err
}
x := &daemonCloneClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type Daemon_CloneClient interface {
Recv() (*CloneResponse, error)
grpc.ClientStream
}
type daemonCloneClient struct {
grpc.ClientStream
}
func (x *daemonCloneClient) Recv() (*CloneResponse, error) {
m := new(CloneResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *daemonClient) Remove(ctx context.Context, in *RemoveRequest, opts ...grpc.CallOption) (Daemon_RemoveClient, error) {
stream, err := c.cc.NewStream(ctx, &Daemon_ServiceDesc.Streams[5], Daemon_Remove_FullMethodName, opts...)
if err != nil {
return nil, err
}
x := &daemonRemoveClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type Daemon_RemoveClient interface {
Recv() (*RemoveResponse, error)
grpc.ClientStream
}
type daemonRemoveClient struct {
grpc.ClientStream
}
func (x *daemonRemoveClient) Recv() (*RemoveResponse, error) {
m := new(RemoveResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *daemonClient) Logs(ctx context.Context, in *LogRequest, opts ...grpc.CallOption) (Daemon_LogsClient, error) {
stream, err := c.cc.NewStream(ctx, &Daemon_ServiceDesc.Streams[6], Daemon_Logs_FullMethodName, opts...)
if err != nil {
return nil, err
}
x := &daemonLogsClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type Daemon_LogsClient interface {
Recv() (*LogResponse, error)
grpc.ClientStream
}
type daemonLogsClient struct {
grpc.ClientStream
}
func (x *daemonLogsClient) Recv() (*LogResponse, error) {
m := new(LogResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *daemonClient) List(ctx context.Context, in *ListRequest, opts ...grpc.CallOption) (*ListResponse, error) {
out := new(ListResponse)
err := c.cc.Invoke(ctx, Daemon_List_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *daemonClient) Upgrade(ctx context.Context, in *UpgradeRequest, opts ...grpc.CallOption) (Daemon_UpgradeClient, error) {
stream, err := c.cc.NewStream(ctx, &Daemon_ServiceDesc.Streams[6], Daemon_Upgrade_FullMethodName, opts...)
stream, err := c.cc.NewStream(ctx, &Daemon_ServiceDesc.Streams[7], Daemon_Upgrade_FullMethodName, opts...)
if err != nil {
return nil, err
}
@@ -295,19 +322,62 @@ func (x *daemonUpgradeClient) Recv() (*UpgradeResponse, error) {
return m, nil
}
func (c *daemonClient) Status(ctx context.Context, in *StatusRequest, opts ...grpc.CallOption) (*StatusResponse, error) {
out := new(StatusResponse)
err := c.cc.Invoke(ctx, Daemon_Status_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *daemonClient) Quit(ctx context.Context, in *QuitRequest, opts ...grpc.CallOption) (Daemon_QuitClient, error) {
stream, err := c.cc.NewStream(ctx, &Daemon_ServiceDesc.Streams[8], Daemon_Quit_FullMethodName, opts...)
if err != nil {
return nil, err
}
x := &daemonQuitClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type Daemon_QuitClient interface {
Recv() (*QuitResponse, error)
grpc.ClientStream
}
type daemonQuitClient struct {
grpc.ClientStream
}
func (x *daemonQuitClient) Recv() (*QuitResponse, error) {
m := new(QuitResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// DaemonServer is the server API for Daemon service.
// All implementations must embed UnimplementedDaemonServer
// for forward compatibility
type DaemonServer interface {
Connect(*ConnectRequest, Daemon_ConnectServer) error
Proxy(*ConnectRequest, Daemon_ProxyServer) error
Disconnect(*DisconnectRequest, Daemon_DisconnectServer) error
Proxy(*ConnectRequest, Daemon_ProxyServer) error
Leave(*LeaveRequest, Daemon_LeaveServer) error
Clone(*CloneRequest, Daemon_CloneServer) error
Remove(*RemoveRequest, Daemon_RemoveServer) error
Logs(*LogRequest, Daemon_LogsServer) error
List(context.Context, *ListRequest) (*ListResponse, error)
Upgrade(*UpgradeRequest, Daemon_UpgradeServer) error
Status(context.Context, *StatusRequest) (*StatusResponse, error)
Quit(*QuitRequest, Daemon_QuitServer) error
List(context.Context, *ListRequest) (*ListResponse, error)
Leave(*LeaveRequest, Daemon_LeaveServer) error
Upgrade(*UpgradeRequest, Daemon_UpgradeServer) error
mustEmbedUnimplementedDaemonServer()
}
@@ -318,30 +388,36 @@ type UnimplementedDaemonServer struct {
func (UnimplementedDaemonServer) Connect(*ConnectRequest, Daemon_ConnectServer) error {
return status.Errorf(codes.Unimplemented, "method Connect not implemented")
}
func (UnimplementedDaemonServer) Proxy(*ConnectRequest, Daemon_ProxyServer) error {
return status.Errorf(codes.Unimplemented, "method Proxy not implemented")
}
func (UnimplementedDaemonServer) Disconnect(*DisconnectRequest, Daemon_DisconnectServer) error {
return status.Errorf(codes.Unimplemented, "method Disconnect not implemented")
}
func (UnimplementedDaemonServer) Proxy(*ConnectRequest, Daemon_ProxyServer) error {
return status.Errorf(codes.Unimplemented, "method Proxy not implemented")
}
func (UnimplementedDaemonServer) Leave(*LeaveRequest, Daemon_LeaveServer) error {
return status.Errorf(codes.Unimplemented, "method Leave not implemented")
}
func (UnimplementedDaemonServer) Clone(*CloneRequest, Daemon_CloneServer) error {
return status.Errorf(codes.Unimplemented, "method Clone not implemented")
}
func (UnimplementedDaemonServer) Remove(*RemoveRequest, Daemon_RemoveServer) error {
return status.Errorf(codes.Unimplemented, "method Remove not implemented")
}
func (UnimplementedDaemonServer) Logs(*LogRequest, Daemon_LogsServer) error {
return status.Errorf(codes.Unimplemented, "method Logs not implemented")
}
func (UnimplementedDaemonServer) List(context.Context, *ListRequest) (*ListResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method List not implemented")
}
func (UnimplementedDaemonServer) Upgrade(*UpgradeRequest, Daemon_UpgradeServer) error {
return status.Errorf(codes.Unimplemented, "method Upgrade not implemented")
}
func (UnimplementedDaemonServer) Status(context.Context, *StatusRequest) (*StatusResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Status not implemented")
}
func (UnimplementedDaemonServer) Quit(*QuitRequest, Daemon_QuitServer) error {
return status.Errorf(codes.Unimplemented, "method Quit not implemented")
}
func (UnimplementedDaemonServer) List(context.Context, *ListRequest) (*ListResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method List not implemented")
}
func (UnimplementedDaemonServer) Leave(*LeaveRequest, Daemon_LeaveServer) error {
return status.Errorf(codes.Unimplemented, "method Leave not implemented")
}
func (UnimplementedDaemonServer) Upgrade(*UpgradeRequest, Daemon_UpgradeServer) error {
return status.Errorf(codes.Unimplemented, "method Upgrade not implemented")
}
func (UnimplementedDaemonServer) mustEmbedUnimplementedDaemonServer() {}
// UnsafeDaemonServer may be embedded to opt out of forward compatibility for this service.
@@ -376,27 +452,6 @@ func (x *daemonConnectServer) Send(m *ConnectResponse) error {
return x.ServerStream.SendMsg(m)
}
func _Daemon_Proxy_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(ConnectRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(DaemonServer).Proxy(m, &daemonProxyServer{stream})
}
type Daemon_ProxyServer interface {
Send(*ConnectResponse) error
grpc.ServerStream
}
type daemonProxyServer struct {
grpc.ServerStream
}
func (x *daemonProxyServer) Send(m *ConnectResponse) error {
return x.ServerStream.SendMsg(m)
}
func _Daemon_Disconnect_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(DisconnectRequest)
if err := stream.RecvMsg(m); err != nil {
@@ -418,6 +473,90 @@ func (x *daemonDisconnectServer) Send(m *DisconnectResponse) error {
return x.ServerStream.SendMsg(m)
}
func _Daemon_Proxy_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(ConnectRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(DaemonServer).Proxy(m, &daemonProxyServer{stream})
}
type Daemon_ProxyServer interface {
Send(*ConnectResponse) error
grpc.ServerStream
}
type daemonProxyServer struct {
grpc.ServerStream
}
func (x *daemonProxyServer) Send(m *ConnectResponse) error {
return x.ServerStream.SendMsg(m)
}
func _Daemon_Leave_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(LeaveRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(DaemonServer).Leave(m, &daemonLeaveServer{stream})
}
type Daemon_LeaveServer interface {
Send(*LeaveResponse) error
grpc.ServerStream
}
type daemonLeaveServer struct {
grpc.ServerStream
}
func (x *daemonLeaveServer) Send(m *LeaveResponse) error {
return x.ServerStream.SendMsg(m)
}
func _Daemon_Clone_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(CloneRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(DaemonServer).Clone(m, &daemonCloneServer{stream})
}
type Daemon_CloneServer interface {
Send(*CloneResponse) error
grpc.ServerStream
}
type daemonCloneServer struct {
grpc.ServerStream
}
func (x *daemonCloneServer) Send(m *CloneResponse) error {
return x.ServerStream.SendMsg(m)
}
func _Daemon_Remove_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(RemoveRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(DaemonServer).Remove(m, &daemonRemoveServer{stream})
}
type Daemon_RemoveServer interface {
Send(*RemoveResponse) error
grpc.ServerStream
}
type daemonRemoveServer struct {
grpc.ServerStream
}
func (x *daemonRemoveServer) Send(m *RemoveResponse) error {
return x.ServerStream.SendMsg(m)
}
func _Daemon_Logs_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(LogRequest)
if err := stream.RecvMsg(m); err != nil {
@@ -439,6 +578,45 @@ func (x *daemonLogsServer) Send(m *LogResponse) error {
return x.ServerStream.SendMsg(m)
}
func _Daemon_List_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ListRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(DaemonServer).List(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: Daemon_List_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(DaemonServer).List(ctx, req.(*ListRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Daemon_Upgrade_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(UpgradeRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(DaemonServer).Upgrade(m, &daemonUpgradeServer{stream})
}
type Daemon_UpgradeServer interface {
Send(*UpgradeResponse) error
grpc.ServerStream
}
type daemonUpgradeServer struct {
grpc.ServerStream
}
func (x *daemonUpgradeServer) Send(m *UpgradeResponse) error {
return x.ServerStream.SendMsg(m)
}
func _Daemon_Status_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(StatusRequest)
if err := dec(in); err != nil {
@@ -478,66 +656,6 @@ func (x *daemonQuitServer) Send(m *QuitResponse) error {
return x.ServerStream.SendMsg(m)
}
func _Daemon_List_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ListRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(DaemonServer).List(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: Daemon_List_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(DaemonServer).List(ctx, req.(*ListRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Daemon_Leave_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(LeaveRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(DaemonServer).Leave(m, &daemonLeaveServer{stream})
}
type Daemon_LeaveServer interface {
Send(*LeaveResponse) error
grpc.ServerStream
}
type daemonLeaveServer struct {
grpc.ServerStream
}
func (x *daemonLeaveServer) Send(m *LeaveResponse) error {
return x.ServerStream.SendMsg(m)
}
func _Daemon_Upgrade_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(UpgradeRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(DaemonServer).Upgrade(m, &daemonUpgradeServer{stream})
}
type Daemon_UpgradeServer interface {
Send(*UpgradeResponse) error
grpc.ServerStream
}
type daemonUpgradeServer struct {
grpc.ServerStream
}
func (x *daemonUpgradeServer) Send(m *UpgradeResponse) error {
return x.ServerStream.SendMsg(m)
}
// Daemon_ServiceDesc is the grpc.ServiceDesc for Daemon service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
@@ -545,14 +663,14 @@ var Daemon_ServiceDesc = grpc.ServiceDesc{
ServiceName: "rpc.Daemon",
HandlerType: (*DaemonServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "Status",
Handler: _Daemon_Status_Handler,
},
{
MethodName: "List",
Handler: _Daemon_List_Handler,
},
{
MethodName: "Status",
Handler: _Daemon_Status_Handler,
},
},
Streams: []grpc.StreamDesc{
{
@@ -560,24 +678,14 @@ var Daemon_ServiceDesc = grpc.ServiceDesc{
Handler: _Daemon_Connect_Handler,
ServerStreams: true,
},
{
StreamName: "Proxy",
Handler: _Daemon_Proxy_Handler,
ServerStreams: true,
},
{
StreamName: "Disconnect",
Handler: _Daemon_Disconnect_Handler,
ServerStreams: true,
},
{
StreamName: "Logs",
Handler: _Daemon_Logs_Handler,
ServerStreams: true,
},
{
StreamName: "Quit",
Handler: _Daemon_Quit_Handler,
StreamName: "Proxy",
Handler: _Daemon_Proxy_Handler,
ServerStreams: true,
},
{
@@ -585,11 +693,31 @@ var Daemon_ServiceDesc = grpc.ServiceDesc{
Handler: _Daemon_Leave_Handler,
ServerStreams: true,
},
{
StreamName: "Clone",
Handler: _Daemon_Clone_Handler,
ServerStreams: true,
},
{
StreamName: "Remove",
Handler: _Daemon_Remove_Handler,
ServerStreams: true,
},
{
StreamName: "Logs",
Handler: _Daemon_Logs_Handler,
ServerStreams: true,
},
{
StreamName: "Upgrade",
Handler: _Daemon_Upgrade_Handler,
ServerStreams: true,
},
{
StreamName: "Quit",
Handler: _Daemon_Quit_Handler,
ServerStreams: true,
},
},
Metadata: "daemon.proto",
}

View File

@@ -39,13 +39,14 @@ import (
"github.com/wencaiwulue/kubevpn/pkg/util"
)
type DuplicateOptions struct {
type CloneOptions struct {
Namespace string
Headers map[string]string
Workloads []string
ExtraCIDR []string
ExtraDomain []string
Engine config.Engine
UseLocalDNS bool
TargetKubeconfig string
TargetNamespace string
@@ -67,7 +68,7 @@ type DuplicateOptions struct {
factory cmdutil.Factory
}
func (d *DuplicateOptions) InitClient(f cmdutil.Factory) (err error) {
func (d *CloneOptions) InitClient(f cmdutil.Factory) (err error) {
d.factory = f
if d.config, err = d.factory.ToRESTConfig(); err != nil {
return
@@ -99,9 +100,8 @@ func (d *DuplicateOptions) InitClient(f cmdutil.Factory) (err error) {
configFlags.Namespace = pointer.String(d.TargetNamespace)
matchVersionFlags := cmdutil.NewMatchVersionFlags(configFlags)
d.targetFactory = cmdutil.NewFactory(matchVersionFlags)
loader := d.targetFactory.ToRawKubeConfigLoader()
var found bool
d.TargetNamespace, found, err = loader.Namespace()
d.TargetNamespace, found, err = d.targetFactory.ToRawKubeConfigLoader().Namespace()
if err != nil || !found {
d.TargetNamespace = d.Namespace
}
@@ -109,14 +109,14 @@ func (d *DuplicateOptions) InitClient(f cmdutil.Factory) (err error) {
return
}
// DoDuplicate
// DoClone
/*
* 1) download mount path use empty-dir but skip empty-dir in init-containers
* 2) get env from containers
* 3) create serviceAccount as needed
* 4) modify podTempSpec inject kubevpn container
*/
func (d *DuplicateOptions) DoDuplicate(ctx context.Context) error {
func (d *CloneOptions) DoClone(ctx context.Context) error {
rawConfig, err := d.targetFactory.ToRawKubeConfigLoader().RawConfig()
if err != nil {
return err
@@ -151,7 +151,7 @@ func (d *DuplicateOptions) DoDuplicate(ctx context.Context) error {
if err != nil {
return err
}
u.SetName(fmt.Sprintf("%s-dup-%s", u.GetName(), newUUID.String()[:5]))
u.SetName(fmt.Sprintf("%s-clone-%s", u.GetName(), newUUID.String()[:5]))
// if is another cluster, needs to set volume and set env
if !d.isSame {
@@ -263,19 +263,26 @@ func (d *DuplicateOptions) DoDuplicate(ctx context.Context) error {
return err
}
}
i := []string{
"kubevpn",
"proxy",
workload,
"--kubeconfig", "/tmp/.kube/" + config.KUBECONFIG,
"--namespace", d.Namespace,
"--headers", labels.Set(d.Headers).String(),
"--image", config.Image,
"--foreground",
}
container := &v1.Container{
Name: config.ContainerSidecarVPN,
Image: config.Image,
Command: []string{
"kubevpn",
"proxy",
workload,
"--kubeconfig", "/tmp/.kube/" + config.KUBECONFIG,
"--namespace", d.Namespace,
"--headers", labels.Set(d.Headers).String(),
"--image", config.Image,
},
Args: nil,
Args: []string{fmt.Sprintf(`
sysctl -w net.ipv4.ip_forward=1
sysctl -w net.ipv6.conf.all.disable_ipv6=0
sysctl -w net.ipv6.conf.all.forwarding=1
sysctl -w net.ipv4.conf.all.route_localnet=1
%s`, strings.Join(i, " "))},
Command: []string{"/bin/sh", "-c"},
Resources: v1.ResourceRequirements{
Requests: map[v1.ResourceName]resource.Quantity{
v1.ResourceCPU: resource.MustParse("1000m"),
@@ -317,7 +324,7 @@ func (d *DuplicateOptions) DoDuplicate(ctx context.Context) error {
return createErr
})
if retryErr != nil {
return fmt.Errorf("create duplidate for resource %s failed: %v", workload, retryErr)
return fmt.Errorf("create clone for resource %s failed: %v", workload, retryErr)
}
err = util.WaitPodToBeReady(ctx, d.targetClientset.CoreV1().Pods(d.TargetNamespace), metav1.LabelSelector{MatchLabels: labelsMap})
if err != nil {
@@ -353,7 +360,7 @@ func RemoveUselessInfo(u *unstructured.Unstructured) {
/*
1) calculate volume content, and download it into emptyDir
*/
func (d *DuplicateOptions) setVolume(u *unstructured.Unstructured) error {
func (d *CloneOptions) setVolume(u *unstructured.Unstructured) error {
const TokenVolumeMountPath = "/var/run/secrets/kubernetes.io/serviceaccount"
@@ -520,7 +527,7 @@ func (d *DuplicateOptions) setVolume(u *unstructured.Unstructured) error {
return nil
}
func (d *DuplicateOptions) setEnv(u *unstructured.Unstructured) error {
func (d *CloneOptions) setEnv(u *unstructured.Unstructured) error {
temp, path, err := util.GetPodTemplateSpecPath(u)
if err != nil {
return err
@@ -654,7 +661,7 @@ func (d *DuplicateOptions) setEnv(u *unstructured.Unstructured) error {
}
// replace origin registry with special registry for pulling image
func (d *DuplicateOptions) replaceRegistry(u *unstructured.Unstructured) error {
func (d *CloneOptions) replaceRegistry(u *unstructured.Unstructured) error {
// not pass this options, do nothing
if !d.IsChangeTargetRegistry {
return nil

View File

@@ -686,7 +686,7 @@ func (c *ConnectOptions) InitClient(f cmdutil.Factory) (err error) {
return
}
func SshJump(ctx context.Context, conf *util.SshConfig, flags *pflag.FlagSet) (err error) {
func SshJump(ctx context.Context, conf *util.SshConfig, flags *pflag.FlagSet, print bool) (err error) {
if conf.Addr == "" && conf.ConfigAlias == "" {
return
}
@@ -806,7 +806,9 @@ func SshJump(ctx context.Context, conf *util.SshConfig, flags *pflag.FlagSet) (e
}
}
}()
log.Infof("wait jump to bastion host...")
if print {
log.Infof("wait jump to bastion host...")
}
select {
case <-readyChan:
case err = <-errChan:
@@ -838,7 +840,9 @@ func SshJump(ctx context.Context, conf *util.SshConfig, flags *pflag.FlagSet) (e
if err = os.Chmod(temp.Name(), 0644); err != nil {
return err
}
log.Infof("using temp kubeconfig %s", temp.Name())
if print {
log.Infof("using temp kubeconfig %s", temp.Name())
}
err = os.Setenv(clientcmd.RecommendedConfigPathEnvVar, temp.Name())
if err != nil {
return err

View File

@@ -18,6 +18,8 @@ import (
log "github.com/sirupsen/logrus"
"golang.org/x/crypto/ssh"
"k8s.io/client-go/util/homedir"
"github.com/wencaiwulue/kubevpn/pkg/daemon/rpc"
)
type SshConfig struct {
@@ -29,6 +31,31 @@ type SshConfig struct {
RemoteKubeconfig string
}
func ParseSshFromRPC(sshJump *rpc.SshJump) *SshConfig {
if sshJump == nil {
return &SshConfig{}
}
return &SshConfig{
Addr: sshJump.Addr,
User: sshJump.User,
Password: sshJump.Password,
Keyfile: sshJump.Keyfile,
ConfigAlias: sshJump.ConfigAlias,
RemoteKubeconfig: sshJump.RemoteKubeconfig,
}
}
func (s *SshConfig) ToRPC() *rpc.SshJump {
return &rpc.SshJump{
Addr: s.Addr,
User: s.User,
Password: s.Password,
Keyfile: s.Keyfile,
ConfigAlias: s.ConfigAlias,
RemoteKubeconfig: s.RemoteKubeconfig,
}
}
func Main(pctx context.Context, remoteEndpoint, localEndpoint netip.AddrPort, conf *SshConfig, done chan struct{}) error {
ctx, cancelFunc := context.WithCancel(pctx)
defer cancelFunc()