refactor: use grpc stream send cancel operation (#629)

* refactor: use grpc stream send cancel operation

* refactor: ssh jump remove flags
This commit is contained in:
naison
2025-06-07 12:24:28 +08:00
committed by GitHub
parent 4f3e443bca
commit bb991fc1d7
47 changed files with 1565 additions and 2760 deletions

View File

@@ -2,7 +2,6 @@ package cmds
import (
"context"
"fmt"
"os"
pkgerr "github.com/pkg/errors"
@@ -76,6 +75,7 @@ func CmdClone(f cmdutil.Factory) *cobra.Command {
kubevpn clone service/productpage --ssh-addr <HOST:PORT> --ssh-username <USERNAME> --gssapi-cache /path/to/cache
kubevpn clone service/productpage --ssh-addr <HOST:PORT> --ssh-username <USERNAME> --gssapi-password <PASSWORD>
`)),
Args: cobra.MatchAll(cobra.OnlyValidArgs, cobra.MinimumNArgs(1)),
PreRunE: func(cmd *cobra.Command, args []string) (err error) {
plog.InitLoggerForClient()
// startup daemon process and sudo process
@@ -89,16 +89,6 @@ func CmdClone(f cmdutil.Factory) *cobra.Command {
return err
},
RunE: func(cmd *cobra.Command, args []string) error {
if len(args) == 0 {
_, _ = fmt.Fprintf(os.Stdout, "You must specify the type of resource to proxy. %s\n\n", cmdutil.SuggestAPIResources("kubevpn"))
fullCmdName := cmd.Parent().CommandPath()
usageString := "Required resource not specified."
if len(fullCmdName) > 0 && cmdutil.IsSiblingCommandExists(cmd, "explain") {
usageString = fmt.Sprintf("%s\nUse \"%s explain <resource>\" for a detailed description of that resource (e.g. %[2]s explain pods).", usageString, fullCmdName)
}
return cmdutil.UsageErrorf(cmd, usageString)
}
if syncDir != "" {
local, remote, err := util.ParseDirMapping(syncDir)
if err != nil {
@@ -141,15 +131,18 @@ func CmdClone(f cmdutil.Factory) *cobra.Command {
if err != nil {
return err
}
resp, err := cli.Clone(cmd.Context(), req)
resp, err := cli.Clone(context.Background())
if err != nil {
return err
}
err = util.PrintGRPCStream[rpc.CloneResponse](resp)
err = resp.Send(req)
if err != nil {
return err
}
err = util.PrintGRPCStream[rpc.CloneResponse](cmd.Context(), resp)
if err != nil {
if status.Code(err) == codes.Canceled {
err = remove(cli, args)
return err
return nil
}
return err
}
@@ -169,20 +162,3 @@ func CmdClone(f cmdutil.Factory) *cobra.Command {
cmd.ValidArgsFunction = utilcomp.ResourceTypeAndNameCompletionFunc(f)
return cmd
}
func remove(cli rpc.DaemonClient, args []string) error {
resp, err := cli.Remove(context.Background(), &rpc.RemoveRequest{
Workloads: args,
})
if err != nil {
return err
}
err = util.PrintGRPCStream[rpc.DisconnectResponse](resp)
if err != nil {
if status.Code(err) == codes.Canceled {
return nil
}
return err
}
return nil
}

View File

@@ -106,20 +106,26 @@ func CmdConnect(f cmdutil.Factory) *cobra.Command {
if err != nil {
return err
}
var resp grpc.ClientStream
var resp grpc.BidiStreamingClient[rpc.ConnectRequest, rpc.ConnectResponse]
if lite {
resp, err = cli.ConnectFork(cmd.Context(), req)
resp, err = cli.ConnectFork(context.Background())
} else {
resp, err = cli.Connect(cmd.Context(), req)
resp, err = cli.Connect(context.Background())
}
if err != nil {
return err
}
err = util.PrintGRPCStream[rpc.ConnectResponse](resp)
err = resp.Send(req)
if err != nil {
return err
}
if err != nil {
return err
}
err = util.PrintGRPCStream[rpc.ConnectResponse](cmd.Context(), resp)
if err != nil {
if status.Code(err) == codes.Canceled {
err = disconnect(cli, bytes, ns, sshConf)
return err
return nil
}
return err
}
@@ -147,7 +153,12 @@ func CmdConnect(f cmdutil.Factory) *cobra.Command {
}
func disconnect(cli rpc.DaemonClient, bytes []byte, ns string, sshConf *pkgssh.SshConfig) error {
resp, err := cli.Disconnect(context.Background(), &rpc.DisconnectRequest{
resp, err := cli.Disconnect(context.Background())
if err != nil {
plog.G(context.Background()).Errorf("Disconnect error: %v", err)
return err
}
err = resp.Send(&rpc.DisconnectRequest{
KubeconfigBytes: ptr.To(string(bytes)),
Namespace: ptr.To(ns),
SshJump: sshConf.ToRPC(),
@@ -156,7 +167,7 @@ func disconnect(cli rpc.DaemonClient, bytes []byte, ns string, sshConf *pkgssh.S
plog.G(context.Background()).Errorf("Disconnect error: %v", err)
return err
}
err = util.PrintGRPCStream[rpc.DisconnectResponse](resp)
err = util.PrintGRPCStream[rpc.DisconnectResponse](nil, resp)
if err != nil {
if status.Code(err) == codes.Canceled {
return nil

View File

@@ -14,6 +14,7 @@ import (
"github.com/wencaiwulue/kubevpn/v2/pkg/cp"
pkgssh "github.com/wencaiwulue/kubevpn/v2/pkg/ssh"
"github.com/wencaiwulue/kubevpn/v2/pkg/util"
)
var cpExample = templates.Examples(i18n.T(`
@@ -73,7 +74,11 @@ func CmdCp(f cmdutil.Factory) *cobra.Command {
Long: i18n.T("Copy files and directories to and from containers. Different between kubectl cp is it will de-reference symbol link."),
Example: cpExample,
ValidArgsFunction: func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
cmdutil.CheckErr(pkgssh.SshJumpAndSetEnv(cmd.Context(), sshConf, cmd.Flags(), false))
bytes, _, err := util.ConvertToKubeConfigBytes(f)
cmdutil.CheckErr(err)
file, err := util.ConvertToTempKubeconfigFile(bytes)
cmdutil.CheckErr(err)
cmdutil.CheckErr(pkgssh.SshJumpAndSetEnv(cmd.Context(), sshConf, file, false))
var comps []string
if len(args) == 0 {

View File

@@ -18,6 +18,7 @@ import (
"github.com/wencaiwulue/kubevpn/v2/pkg/handler"
plog "github.com/wencaiwulue/kubevpn/v2/pkg/log"
pkgssh "github.com/wencaiwulue/kubevpn/v2/pkg/ssh"
"github.com/wencaiwulue/kubevpn/v2/pkg/util"
"github.com/wencaiwulue/kubevpn/v2/pkg/util/regctl"
)
@@ -101,7 +102,15 @@ func CmdDev(f cmdutil.Factory) *cobra.Command {
if err != nil {
return err
}
return pkgssh.SshJumpAndSetEnv(cmd.Context(), sshConf, cmd.Flags(), false)
bytes, _, err := util.ConvertToKubeConfigBytes(f)
if err != nil {
return err
}
file, err := util.ConvertToTempKubeconfigFile(bytes)
if err != nil {
return err
}
return pkgssh.SshJumpAndSetEnv(cmd.Context(), sshConf, file, false)
},
RunE: func(cmd *cobra.Command, args []string) error {
options.Workload = args[0]

View File

@@ -1,6 +1,7 @@
package cmds
import (
"context"
"fmt"
"os"
"strconv"
@@ -65,18 +66,20 @@ func CmdDisconnect(f cmdutil.Factory) *cobra.Command {
if err != nil {
return err
}
client, err := cli.Disconnect(
cmd.Context(),
&rpc.DisconnectRequest{
ID: ids,
ClusterIDs: clusterIDs,
All: pointer.Bool(all),
},
)
req := &rpc.DisconnectRequest{
ID: ids,
ClusterIDs: clusterIDs,
All: pointer.Bool(all),
}
resp, err := cli.Disconnect(context.Background())
if err != nil {
return err
}
err = util.PrintGRPCStream[rpc.DisconnectResponse](client)
err = resp.Send(req)
if err != nil {
return err
}
err = util.PrintGRPCStream[rpc.DisconnectResponse](cmd.Context(), resp)
if err != nil {
if status.Code(err) == codes.Canceled {
return nil

View File

@@ -53,13 +53,7 @@ func CmdGet(f cmdutil.Factory) *cobra.Command {
if err != nil {
return err
}
client, err := cli.Get(
cmd.Context(),
&rpc.GetRequest{
Namespace: ns,
Resource: args[0],
},
)
client, err := cli.Get(cmd.Context(), &rpc.GetRequest{Namespace: ns, Resource: args[0]})
if err != nil {
return err
}

View File

@@ -1,6 +1,8 @@
package cmds
import (
"context"
"github.com/spf13/cobra"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -43,14 +45,19 @@ func CmdLeave(f cmdutil.Factory) *cobra.Command {
if err != nil {
return err
}
resp, err := cli.Leave(cmd.Context(), &rpc.LeaveRequest{
req := &rpc.LeaveRequest{
Namespace: ns,
Workloads: args,
})
}
resp, err := cli.Leave(context.Background())
if err != nil {
return err
}
err = util.PrintGRPCStream[rpc.LeaveResponse](resp)
err = resp.Send(req)
if err != nil {
return err
}
err = util.PrintGRPCStream[rpc.LeaveResponse](cmd.Context(), resp)
if err != nil {
if status.Code(err) == codes.Canceled {
return nil

View File

@@ -29,10 +29,7 @@ func CmdList(f cmdutil.Factory) *cobra.Command {
if err != nil {
return err
}
client, err := cli.List(
cmd.Context(),
&rpc.ListRequest{},
)
client, err := cli.List(cmd.Context(), &rpc.ListRequest{})
if err != nil {
return err
}

View File

@@ -38,11 +38,15 @@ func CmdLogs(f cmdutil.Factory) *cobra.Command {
if err != nil {
return err
}
client, err := cli.Logs(cmd.Context(), req)
resp, err := cli.Logs(cmd.Context())
if err != nil {
return err
}
err = util.PrintGRPCStream[rpc.LogResponse](client)
err = resp.Send(req)
if err != nil {
return err
}
err = util.PrintGRPCStream[rpc.LogResponse](cmd.Context(), resp)
if err != nil {
if status.Code(err) == codes.Canceled {
return nil

View File

@@ -116,33 +116,34 @@ func CmdProxy(f cmdutil.Factory) *cobra.Command {
if err != nil {
return err
}
resp, err := cli.Proxy(
cmd.Context(),
&rpc.ProxyRequest{
KubeconfigBytes: string(bytes),
Namespace: ns,
Headers: headers,
PortMap: portmap,
Workloads: args,
ExtraRoute: extraRoute.ToRPC(),
Engine: string(connect.Engine),
SshJump: sshConf.ToRPC(),
TransferImage: transferImage,
Image: config.Image,
ImagePullSecretName: imagePullSecretName,
Level: int32(util.If(config.Debug, log.DebugLevel, log.InfoLevel)),
OriginKubeconfigPath: util.GetKubeConfigPath(f),
ManagerNamespace: managerNamespace,
},
)
req := &rpc.ProxyRequest{
KubeconfigBytes: string(bytes),
Namespace: ns,
Headers: headers,
PortMap: portmap,
Workloads: args,
ExtraRoute: extraRoute.ToRPC(),
Engine: string(connect.Engine),
SshJump: sshConf.ToRPC(),
TransferImage: transferImage,
Image: config.Image,
ImagePullSecretName: imagePullSecretName,
Level: int32(util.If(config.Debug, log.DebugLevel, log.InfoLevel)),
OriginKubeconfigPath: util.GetKubeConfigPath(f),
ManagerNamespace: managerNamespace,
}
resp, err := cli.Proxy(context.Background())
if err != nil {
return err
}
err = util.PrintGRPCStream[rpc.ConnectResponse](resp)
err = resp.Send(req)
if err != nil {
return err
}
err = util.PrintGRPCStream[rpc.ConnectResponse](cmd.Context(), resp)
if err != nil {
if status.Code(err) == codes.Canceled {
err = leave(cli, ns, args)
return err
return nil
}
return err
}
@@ -173,14 +174,19 @@ func CmdProxy(f cmdutil.Factory) *cobra.Command {
}
func leave(cli rpc.DaemonClient, ns string, args []string) error {
stream, err := cli.Leave(context.Background(), &rpc.LeaveRequest{
req := &rpc.LeaveRequest{
Namespace: ns,
Workloads: args,
})
}
resp, err := cli.Leave(context.Background())
if err != nil {
return err
}
err = util.PrintGRPCStream[rpc.LeaveResponse](stream)
err = resp.Send(req)
if err != nil {
return err
}
err = util.PrintGRPCStream[rpc.LeaveResponse](nil, resp)
if err != nil {
if status.Code(err) == codes.Canceled {
return nil

View File

@@ -44,11 +44,15 @@ func quit(ctx context.Context, isSudo bool) error {
if err != nil {
return err
}
client, err := cli.Quit(ctx, &rpc.QuitRequest{})
resp, err := cli.Quit(context.Background())
if err != nil {
return err
}
err = util.PrintGRPCStream[rpc.QuitResponse](client)
err = resp.Send(&rpc.QuitRequest{})
if err != nil {
return err
}
err = util.PrintGRPCStream[rpc.QuitResponse](ctx, resp)
if err != nil {
if status.Code(err) == codes.Canceled {
return nil

View File

@@ -1,6 +1,8 @@
package cmds
import (
"context"
"github.com/spf13/cobra"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -36,13 +38,18 @@ func CmdRemove(f cmdutil.Factory) *cobra.Command {
if err != nil {
return err
}
resp, err := cli.Remove(cmd.Context(), &rpc.RemoveRequest{
req := &rpc.RemoveRequest{
Workloads: args,
})
}
resp, err := cli.Remove(context.Background())
if err != nil {
return err
}
err = util.PrintGRPCStream[rpc.RemoveResponse](resp)
err = resp.Send(req)
if err != nil {
return err
}
err = util.PrintGRPCStream[rpc.RemoveResponse](cmd.Context(), resp)
if err != nil {
if status.Code(err) == codes.Canceled {
return nil

View File

@@ -1,6 +1,8 @@
package cmds
import (
"context"
"github.com/spf13/cobra"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -66,11 +68,15 @@ func CmdReset(f cmdutil.Factory) *cobra.Command {
Workloads: args,
SshJump: sshConf.ToRPC(),
}
resp, err := cli.Reset(cmd.Context(), req)
resp, err := cli.Reset(context.Background())
if err != nil {
return err
}
err = util.PrintGRPCStream[rpc.ResetResponse](resp)
err = resp.Send(req)
if err != nil {
return err
}
err = util.PrintGRPCStream[rpc.ResetResponse](cmd.Context(), resp)
if err != nil {
if status.Code(err) == codes.Canceled {
return nil

View File

@@ -35,12 +35,7 @@ func CmdSSHDaemon(cmdutil.Factory) *cobra.Command {
if err != nil {
return err
}
resp, err := cli.SshStart(
cmd.Context(),
&rpc.SshStartRequest{
ClientIP: clientIP,
},
)
resp, err := cli.SshStart(cmd.Context(), &rpc.SshStartRequest{ClientIP: clientIP})
if err != nil {
return err
}

View File

@@ -84,12 +84,7 @@ func CmdStatus(f cmdutil.Factory) *cobra.Command {
if err != nil {
return err
}
resp, err := cli.Status(
cmd.Context(),
&rpc.StatusRequest{
ClusterIDs: clusterIDs,
},
)
resp, err := cli.Status(cmd.Context(), &rpc.StatusRequest{ClusterIDs: clusterIDs})
if err != nil {
return err
}
@@ -260,22 +255,15 @@ func GetClusterIDByConfig(cmd *cobra.Command, config Config) (string, error) {
if err != nil {
return "", err
}
flags = flag.NewFlagSet("", flag.ContinueOnError)
flags.AddFlag(&flag.Flag{
Name: "kubeconfig",
DefValue: file,
})
flags.AddFlag(&flag.Flag{
Name: "namespace",
DefValue: ns,
})
var path string
path, err = pkgssh.SshJump(cmd.Context(), sshConf, flags, false)
if err != nil {
return "", err
defer os.Remove(file)
if !sshConf.IsEmpty() {
file, err = pkgssh.SshJump(cmd.Context(), sshConf, file, false)
if err != nil {
return "", err
}
}
var c = &handler.ConnectOptions{}
err = c.InitClient(util.InitFactoryByPath(path, ns))
err = c.InitClient(util.InitFactoryByPath(file, ns))
if err != nil {
return "", err
}

View File

@@ -17,7 +17,7 @@ func TestPrintProxyAndClone(t *testing.T) {
Mode: "full",
Kubeconfig: "/Users/bytedance/.kube/test-feiyan-config-private-new",
Namespace: "vke-system",
Status: "Connected",
Status: "connected",
Netif: "utun4",
ProxyList: []*rpc.Proxy{
{
@@ -62,7 +62,7 @@ func TestPrintProxyAndClone(t *testing.T) {
Mode: "full",
Kubeconfig: "/Users/bytedance/.kube/dev_fy_config_new",
Namespace: "vke-system",
Status: "Connected",
Status: "connected",
Netif: "utun5",
ProxyList: []*rpc.Proxy{},
CloneList: []*rpc.Clone{},
@@ -86,7 +86,7 @@ func TestPrintProxy(t *testing.T) {
Mode: "full",
Kubeconfig: "/Users/bytedance/.kube/test-feiyan-config-private-new",
Namespace: "vke-system",
Status: "Connected",
Status: "connected",
Netif: "utun4",
ProxyList: []*rpc.Proxy{
{
@@ -115,7 +115,7 @@ func TestPrintProxy(t *testing.T) {
Mode: "full",
Kubeconfig: "/Users/bytedance/.kube/dev_fy_config_new",
Namespace: "vke-system",
Status: "Connected",
Status: "connected",
Netif: "utun5",
ProxyList: []*rpc.Proxy{},
CloneList: []*rpc.Clone{},
@@ -139,7 +139,7 @@ func TestPrintClone(t *testing.T) {
Mode: "full",
Kubeconfig: "/Users/bytedance/.kube/test-feiyan-config-private-new",
Namespace: "vke-system",
Status: "Connected",
Status: "connected",
Netif: "utun4",
ProxyList: []*rpc.Proxy{},
CloneList: []*rpc.Clone{
@@ -167,7 +167,7 @@ func TestPrintClone(t *testing.T) {
Mode: "full",
Kubeconfig: "/Users/bytedance/.kube/dev_fy_config_new",
Namespace: "vke-system",
Status: "Connected",
Status: "connected",
Netif: "utun5",
ProxyList: []*rpc.Proxy{},
CloneList: []*rpc.Clone{},
@@ -191,7 +191,7 @@ func TestPrint(t *testing.T) {
Mode: "full",
Kubeconfig: "/Users/bytedance/.kube/test-feiyan-config-private-new",
Namespace: "vke-system",
Status: "Connected",
Status: "connected",
Netif: "utun4",
ProxyList: []*rpc.Proxy{},
CloneList: []*rpc.Clone{},
@@ -203,7 +203,7 @@ func TestPrint(t *testing.T) {
Mode: "full",
Kubeconfig: "/Users/bytedance/.kube/dev_fy_config_new",
Namespace: "vke-system",
Status: "Connected",
Status: "connected",
Netif: "utun5",
ProxyList: []*rpc.Proxy{},
CloneList: []*rpc.Clone{},

View File

@@ -1,6 +1,8 @@
package cmds
import (
"context"
"github.com/spf13/cobra"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -62,15 +64,19 @@ func CmdUninstall(f cmdutil.Factory) *cobra.Command {
if err != nil {
return err
}
disconnectResp, err := cli.Disconnect(cmd.Context(), &rpc.DisconnectRequest{
KubeconfigBytes: ptr.To(string(bytes)),
Namespace: ptr.To(ns),
SshJump: sshConf.ToRPC(),
})
disconnectResp, err := cli.Disconnect(context.Background())
if err != nil {
plog.G(cmd.Context()).Warnf("Failed to disconnect from cluter: %v", err)
} else {
_ = util.PrintGRPCStream[rpc.DisconnectResponse](disconnectResp)
err = disconnectResp.Send(&rpc.DisconnectRequest{
KubeconfigBytes: ptr.To(string(bytes)),
Namespace: ptr.To(ns),
SshJump: sshConf.ToRPC(),
})
if err != nil {
plog.G(cmd.Context()).Warnf("Failed to disconnect from cluter: %v", err)
}
_ = util.PrintGRPCStream[rpc.DisconnectResponse](cmd.Context(), disconnectResp)
}
req := &rpc.UninstallRequest{
@@ -78,11 +84,15 @@ func CmdUninstall(f cmdutil.Factory) *cobra.Command {
Namespace: ns,
SshJump: sshConf.ToRPC(),
}
resp, err := cli.Uninstall(cmd.Context(), req)
resp, err := cli.Uninstall(context.Background())
if err != nil {
return err
}
err = util.PrintGRPCStream[rpc.UninstallResponse](resp)
err = resp.Send(req)
if err != nil {
return err
}
err = util.PrintGRPCStream[rpc.UninstallResponse](cmd.Context(), resp)
if err != nil {
if status.Code(err) == codes.Canceled {
return nil

View File

@@ -1,10 +1,11 @@
package main
import (
_ "net/http/pprof"
ctrl "sigs.k8s.io/controller-runtime"
_ "k8s.io/client-go/plugin/pkg/client/auth"
_ "net/http/pprof"
"github.com/wencaiwulue/kubevpn/v2/cmd/kubevpn/cmds"
)

View File

@@ -3,8 +3,9 @@ package action
import (
"context"
"io"
"os"
"github.com/spf13/pflag"
"google.golang.org/grpc"
"github.com/wencaiwulue/kubevpn/v2/pkg/config"
"github.com/wencaiwulue/kubevpn/v2/pkg/daemon/rpc"
@@ -14,7 +15,11 @@ import (
"github.com/wencaiwulue/kubevpn/v2/pkg/util"
)
func (svr *Server) Clone(req *rpc.CloneRequest, resp rpc.Daemon_CloneServer) (err error) {
func (svr *Server) Clone(resp rpc.Daemon_CloneServer) (err error) {
req, err := resp.Recv()
if err != nil {
return err
}
logger := plog.GetLoggerForClient(req.Level, io.MultiWriter(newCloneWarp(resp), svr.LogFile))
var sshConf = ssh.ParseSshFromRPC(req.SshJump)
@@ -34,11 +39,40 @@ func (svr *Server) Clone(req *rpc.CloneRequest, resp rpc.Daemon_CloneServer) (er
if err != nil {
return err
}
connResp, err := cli.Connect(resp.Context(), connReq)
var connResp grpc.BidiStreamingClient[rpc.ConnectRequest, rpc.ConnectResponse]
var disconnectResp rpc.Daemon_DisconnectClient
sshCtx, sshFunc := context.WithCancel(context.Background())
go func() {
var s rpc.Cancel
err = resp.RecvMsg(&s)
if err != nil {
return
}
if connResp != nil {
_ = connResp.SendMsg(&s)
}
if disconnectResp != nil {
_ = disconnectResp.SendMsg(&s)
}
sshFunc()
}()
connResp, err = cli.Connect(context.Background())
if err != nil {
return err
}
err = util.PrintGRPCStream[rpc.ConnectResponse](connResp, io.MultiWriter(newCloneWarp(resp), svr.LogFile))
err = connResp.SendMsg(&connReq)
if err != nil {
return err
}
err = util.CopyAndConvertGRPCStream[rpc.ConnectResponse, rpc.CloneResponse](
connResp,
resp,
func(r *rpc.ConnectResponse) *rpc.CloneResponse {
_, _ = svr.LogFile.Write([]byte(r.Message))
return &rpc.CloneResponse{Message: r.Message}
})
if err != nil {
return err
}
@@ -61,12 +95,6 @@ func (svr *Server) Clone(req *rpc.CloneRequest, resp rpc.Daemon_CloneServer) (er
if err != nil {
return err
}
flags := pflag.NewFlagSet("", pflag.ContinueOnError)
flags.AddFlag(&pflag.Flag{
Name: "kubeconfig",
DefValue: file,
})
sshCtx, sshFunc := context.WithCancel(context.Background())
defer func() {
if err != nil {
_ = options.Cleanup(sshCtx)
@@ -75,14 +103,16 @@ func (svr *Server) Clone(req *rpc.CloneRequest, resp rpc.Daemon_CloneServer) (er
}()
options.AddRollbackFunc(func() error {
sshFunc()
_ = os.Remove(file)
return nil
})
var path string
path, err = ssh.SshJump(sshCtx, sshConf, flags, false)
if err != nil {
return err
if !sshConf.IsEmpty() {
file, err = ssh.SshJump(sshCtx, sshConf, file, false)
if err != nil {
return err
}
}
f := util.InitFactoryByPath(path, req.Namespace)
f := util.InitFactoryByPath(file, req.Namespace)
err = options.InitClient(f)
if err != nil {
plog.G(context.Background()).Errorf("Failed to init client: %v", err)
@@ -91,7 +121,7 @@ func (svr *Server) Clone(req *rpc.CloneRequest, resp rpc.Daemon_CloneServer) (er
config.Image = req.Image
logger.Infof("Clone workloads...")
options.SetContext(sshCtx)
err = options.DoClone(plog.WithLogger(resp.Context(), logger), []byte(req.KubeconfigBytes))
err = options.DoClone(plog.WithLogger(sshCtx, logger), []byte(req.KubeconfigBytes))
if err != nil {
plog.G(context.Background()).Errorf("Clone workloads failed: %v", err)
return err

View File

@@ -3,8 +3,6 @@ package action
import (
"context"
"github.com/spf13/pflag"
"github.com/wencaiwulue/kubevpn/v2/pkg/daemon/rpc"
"github.com/wencaiwulue/kubevpn/v2/pkg/ssh"
"github.com/wencaiwulue/kubevpn/v2/pkg/util"
@@ -18,26 +16,21 @@ func (svr *Server) ConfigAdd(ctx context.Context, req *rpc.ConfigAddRequest) (re
if err != nil {
return nil, err
}
flags := pflag.NewFlagSet("", pflag.ContinueOnError)
flags.AddFlag(&pflag.Flag{
Name: "kubeconfig",
DefValue: file,
})
sshCtx, sshCancel := context.WithCancel(context.Background())
defer func() {
if err != nil {
sshCancel()
}
}()
var path string
var sshConf = ssh.ParseSshFromRPC(req.SshJump)
path, err = ssh.SshJump(sshCtx, sshConf, flags, true)
if err != nil {
return nil, err
if !sshConf.IsEmpty() {
file, err = ssh.SshJump(sshCtx, sshConf, file, true)
if err != nil {
return nil, err
}
}
CancelFunc[path] = sshCancel
return &rpc.ConfigAddResponse{ClusterID: path}, nil
CancelFunc[file] = sshCancel
return &rpc.ConfigAddResponse{ClusterID: file}, nil
}
func (svr *Server) ConfigRemove(ctx context.Context, req *rpc.ConfigRemoveRequest) (*rpc.ConfigRemoveResponse, error) {

View File

@@ -7,7 +7,7 @@ import (
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/spf13/pflag"
"google.golang.org/grpc"
"github.com/wencaiwulue/kubevpn/v2/pkg/config"
"github.com/wencaiwulue/kubevpn/v2/pkg/daemon/rpc"
@@ -17,7 +17,12 @@ import (
"github.com/wencaiwulue/kubevpn/v2/pkg/util"
)
func (svr *Server) ConnectFork(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectForkServer) (err error) {
func (svr *Server) ConnectFork(resp rpc.Daemon_ConnectForkServer) (err error) {
req, err := resp.Recv()
if err != nil {
return err
}
logger := plog.GetLoggerForClient(req.Level, io.MultiWriter(newConnectForkWarp(resp), svr.LogFile))
if !svr.IsSudo {
return svr.redirectConnectForkToSudoDaemon(req, resp, logger)
@@ -40,9 +45,10 @@ func (svr *Server) ConnectFork(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectF
sshCtx, sshCancel := context.WithCancel(context.Background())
connect.AddRolloutFunc(func() error {
sshCancel()
os.Remove(file)
_ = os.Remove(file)
return nil
})
go util.ListenCancel(resp, sshCancel)
sshCtx = plog.WithLogger(sshCtx, logger)
defer plog.WithoutLogger(sshCtx)
defer func() {
@@ -62,7 +68,7 @@ func (svr *Server) ConnectFork(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectF
}
config.Image = req.Image
err = connect.DoConnect(sshCtx, true, ctx.Done())
err = connect.DoConnect(sshCtx, true)
if err != nil {
logger.Errorf("Failed to connect...")
return err
@@ -75,7 +81,7 @@ func (svr *Server) ConnectFork(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectF
return nil
}
func (svr *Server) redirectConnectForkToSudoDaemon(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectServer, logger *log.Logger) (err error) {
func (svr *Server) redirectConnectForkToSudoDaemon(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectForkServer, logger *log.Logger) (err error) {
cli, err := svr.GetClient(true)
if err != nil {
return errors.Wrap(err, "sudo daemon not start")
@@ -85,11 +91,6 @@ func (svr *Server) redirectConnectForkToSudoDaemon(req *rpc.ConnectRequest, resp
if err != nil {
return err
}
flags := pflag.NewFlagSet("", pflag.ContinueOnError)
flags.AddFlag(&pflag.Flag{
Name: "kubeconfig",
DefValue: file,
})
sshCtx, sshCancel := context.WithCancel(context.Background())
sshCtx = plog.WithLogger(sshCtx, logger)
defer plog.WithoutLogger(sshCtx)
@@ -102,7 +103,7 @@ func (svr *Server) redirectConnectForkToSudoDaemon(req *rpc.ConnectRequest, resp
}
connect.AddRolloutFunc(func() error {
sshCancel()
os.Remove(file)
_ = os.Remove(file)
return nil
})
defer func() {
@@ -111,16 +112,28 @@ func (svr *Server) redirectConnectForkToSudoDaemon(req *rpc.ConnectRequest, resp
sshCancel()
}
}()
var path string
path, err = ssh.SshJump(sshCtx, sshConf, flags, true)
if err != nil {
return err
var connResp grpc.BidiStreamingClient[rpc.ConnectRequest, rpc.ConnectResponse]
go func() {
var s rpc.Cancel
err = resp.RecvMsg(&s)
if err != nil {
return
}
if connResp != nil {
_ = connResp.SendMsg(&s)
} else {
sshCancel()
}
}()
if !sshConf.IsEmpty() {
file, err = ssh.SshJump(sshCtx, sshConf, file, true)
if err != nil {
return err
}
}
connect.AddRolloutFunc(func() error {
os.Remove(path)
return nil
})
err = connect.InitClient(util.InitFactoryByPath(path, req.Namespace))
err = connect.InitClient(util.InitFactoryByPath(file, req.Namespace))
if err != nil {
return err
}
@@ -147,8 +160,6 @@ func (svr *Server) redirectConnectForkToSudoDaemon(req *rpc.ConnectRequest, resp
)
if isSameCluster {
sshCancel()
os.Remove(file)
os.Remove(path)
// same cluster, do nothing
logger.Infof("Connected with cluster")
return nil
@@ -161,13 +172,17 @@ func (svr *Server) redirectConnectForkToSudoDaemon(req *rpc.ConnectRequest, resp
}
// only ssh jump in user daemon
content, err := os.ReadFile(path)
content, err := os.ReadFile(file)
if err != nil {
return err
}
req.KubeconfigBytes = string(content)
req.SshJump = ssh.SshConfig{}.ToRPC()
connResp, err := cli.ConnectFork(ctx, req)
connResp, err = cli.ConnectFork(ctx)
if err != nil {
return err
}
err = connResp.Send(req)
if err != nil {
return err
}

View File

@@ -8,7 +8,7 @@ import (
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/spf13/pflag"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -20,7 +20,12 @@ import (
"github.com/wencaiwulue/kubevpn/v2/pkg/util"
)
func (svr *Server) Connect(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectServer) (e error) {
func (svr *Server) Connect(resp rpc.Daemon_ConnectServer) (err error) {
req, err := resp.Recv()
if err != nil {
return err
}
logger := plog.GetLoggerForClient(req.Level, io.MultiWriter(newWarp(resp), svr.LogFile))
if !svr.IsSudo {
return svr.redirectToSudoDaemon(req, resp, logger)
@@ -32,7 +37,7 @@ func (svr *Server) Connect(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectServe
return status.Error(codes.AlreadyExists, s)
}
defer func() {
if e != nil || ctx.Err() != nil {
if err != nil || ctx.Err() != nil {
if svr.connect != nil {
svr.connect.Cleanup(plog.WithLogger(context.Background(), logger))
svr.connect = nil
@@ -50,24 +55,25 @@ func (svr *Server) Connect(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectServe
Lock: &svr.Lock,
ImagePullSecretName: req.ImagePullSecretName,
}
file, err := util.ConvertToTempKubeconfigFile([]byte(req.KubeconfigBytes))
var file string
file, err = util.ConvertToTempKubeconfigFile([]byte(req.KubeconfigBytes))
if err != nil {
return err
}
sshCtx, sshCancel := context.WithCancel(context.Background())
svr.connect.AddRolloutFunc(func() error {
sshCancel()
os.Remove(file)
_ = os.Remove(file)
return nil
})
go util.ListenCancel(resp, sshCancel)
sshCtx = plog.WithLogger(sshCtx, logger)
defer plog.WithoutLogger(sshCtx)
defer func() {
if e != nil {
if err != nil {
svr.connect.Cleanup(sshCtx)
svr.connect = nil
svr.t = time.Time{}
os.Remove(file)
sshCancel()
}
}()
@@ -75,13 +81,13 @@ func (svr *Server) Connect(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectServe
if err != nil {
return err
}
err = svr.connect.GetIPFromContext(ctx, nil)
err = svr.connect.GetIPFromContext(ctx, logger)
if err != nil {
return err
}
config.Image = req.Image
err = svr.connect.DoConnect(sshCtx, false, ctx.Done())
err = svr.connect.DoConnect(sshCtx, false)
if err != nil {
logger.Errorf("Failed to connect...")
return err
@@ -99,11 +105,6 @@ func (svr *Server) redirectToSudoDaemon(req *rpc.ConnectRequest, resp rpc.Daemon
if err != nil {
return err
}
flags := pflag.NewFlagSet("", pflag.ContinueOnError)
flags.AddFlag(&pflag.Flag{
Name: "kubeconfig",
DefValue: file,
})
sshCtx, sshCancel := context.WithCancel(context.Background())
sshCtx = plog.WithLogger(sshCtx, logger)
defer plog.WithoutLogger(sshCtx)
@@ -116,26 +117,37 @@ func (svr *Server) redirectToSudoDaemon(req *rpc.ConnectRequest, resp rpc.Daemon
}
connect.AddRolloutFunc(func() error {
sshCancel()
os.Remove(file)
_ = os.Remove(file)
return nil
})
defer func() {
if e != nil {
connect.Cleanup(plog.WithLogger(context.Background(), logger))
sshCancel()
os.Remove(file)
}
}()
var path string
path, err = ssh.SshJump(sshCtx, sshConf, flags, true)
if err != nil {
return err
var connResp grpc.BidiStreamingClient[rpc.ConnectRequest, rpc.ConnectResponse]
go func() {
var s rpc.Cancel
err = resp.RecvMsg(&s)
if err != nil {
return
}
if connResp != nil {
_ = connResp.SendMsg(&s)
} else {
sshCancel()
}
}()
if !sshConf.IsEmpty() {
file, err = ssh.SshJump(sshCtx, sshConf, file, true)
if err != nil {
return err
}
}
connect.AddRolloutFunc(func() error {
os.Remove(path)
return nil
})
err = connect.InitClient(util.InitFactoryByPath(path, req.Namespace))
err = connect.InitClient(util.InitFactoryByPath(file, req.Namespace))
if err != nil {
return err
}
@@ -162,8 +174,7 @@ func (svr *Server) redirectToSudoDaemon(req *rpc.ConnectRequest, resp rpc.Daemon
)
if isSameCluster {
sshCancel()
os.Remove(path)
os.Remove(file)
_ = os.Remove(file)
// same cluster, do nothing
logger.Infof("Connected to cluster")
return nil
@@ -179,13 +190,17 @@ func (svr *Server) redirectToSudoDaemon(req *rpc.ConnectRequest, resp rpc.Daemon
}
// only ssh jump in user daemon
content, err := os.ReadFile(path)
content, err := os.ReadFile(file)
if err != nil {
return err
}
req.KubeconfigBytes = string(content)
req.SshJump = ssh.SshConfig{}.ToRPC()
connResp, err := cli.Connect(ctx, req)
connResp, err = cli.Connect(ctx)
if err != nil {
return err
}
err = connResp.Send(req)
if err != nil {
return err
}

View File

@@ -3,11 +3,11 @@ package action
import (
"context"
"io"
"os"
"time"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/spf13/pflag"
"k8s.io/apimachinery/pkg/util/sets"
"github.com/wencaiwulue/kubevpn/v2/pkg/daemon/rpc"
@@ -18,7 +18,12 @@ import (
"github.com/wencaiwulue/kubevpn/v2/pkg/util"
)
func (svr *Server) Disconnect(req *rpc.DisconnectRequest, resp rpc.Daemon_DisconnectServer) error {
func (svr *Server) Disconnect(resp rpc.Daemon_DisconnectServer) error {
req, err := resp.Recv()
if err != nil {
return err
}
logger := plog.GetLoggerForClient(int32(log.InfoLevel), io.MultiWriter(newDisconnectWarp(resp), svr.LogFile))
ctx := plog.WithLogger(resp.Context(), logger)
@@ -30,7 +35,11 @@ func (svr *Server) Disconnect(req *rpc.DisconnectRequest, resp rpc.Daemon_Discon
if err != nil {
return errors.Wrap(err, "sudo daemon not start")
}
connResp, err := cli.Disconnect(resp.Context(), req)
connResp, err := cli.Disconnect(resp.Context())
if err != nil {
return err
}
err = connResp.Send(req)
if err != nil {
return err
}
@@ -76,7 +85,7 @@ func (svr *Server) Disconnect(req *rpc.DisconnectRequest, resp rpc.Daemon_Discon
plog.G(ctx).Errorf("Index %d out of range", req.GetID())
}
case req.KubeconfigBytes != nil && req.Namespace != nil:
err := disconnectByKubeConfig(
err = disconnectByKubeConfig(
resp.Context(),
svr,
req.GetKubeconfigBytes(),
@@ -129,21 +138,16 @@ func disconnectByKubeConfig(ctx context.Context, svr *Server, kubeconfigBytes st
if err != nil {
return err
}
flags := pflag.NewFlagSet("", pflag.ContinueOnError)
flags.AddFlag(&pflag.Flag{
Name: "kubeconfig",
DefValue: file,
})
defer os.Remove(file)
var sshConf = ssh.ParseSshFromRPC(jump)
var path string
path, err = ssh.SshJump(ctx, sshConf, flags, false)
if err != nil {
return err
if !sshConf.IsEmpty() {
file, err = ssh.SshJump(ctx, sshConf, file, false)
if err != nil {
return err
}
}
connect := &handler.ConnectOptions{
Namespace: ns,
}
err = connect.InitClient(util.InitFactoryByPath(path, ns))
connect := &handler.ConnectOptions{}
err = connect.InitClient(util.InitFactoryByPath(file, ns))
if err != nil {
return err
}

View File

@@ -3,55 +3,36 @@ package action
import (
"fmt"
"io"
"time"
log "github.com/sirupsen/logrus"
"github.com/wencaiwulue/kubevpn/v2/pkg/controlplane"
"github.com/wencaiwulue/kubevpn/v2/pkg/daemon/rpc"
"github.com/wencaiwulue/kubevpn/v2/pkg/inject"
"github.com/wencaiwulue/kubevpn/v2/pkg/handler"
plog "github.com/wencaiwulue/kubevpn/v2/pkg/log"
"github.com/wencaiwulue/kubevpn/v2/pkg/util"
)
func (svr *Server) Leave(req *rpc.LeaveRequest, resp rpc.Daemon_LeaveServer) error {
func (svr *Server) Leave(resp rpc.Daemon_LeaveServer) error {
req, err := resp.Recv()
if err != nil {
return err
}
logger := plog.GetLoggerForClient(int32(log.InfoLevel), io.MultiWriter(newLeaveWarp(resp), svr.LogFile))
if svr.connect == nil {
logger.Infof("Not proxy any resource in cluster")
return fmt.Errorf("not proxy any resource in cluster")
logger.Infof("No proxy resource found")
return fmt.Errorf("no proxy resource found")
}
ctx := plog.WithLogger(resp.Context(), logger)
factory := svr.connect.GetFactory()
namespace := svr.connect.Namespace
mapInterface := svr.connect.GetClientset().CoreV1().ConfigMaps(namespace)
v4, _ := svr.connect.GetLocalTunIP()
for _, workload := range req.GetWorkloads() {
object, controller, err := util.GetTopOwnerObject(ctx, factory, req.Namespace, workload)
if err != nil {
logger.Errorf("Failed to get unstructured controller: %v", err)
return err
}
nodeID := fmt.Sprintf("%s.%s", object.Mapping.Resource.GroupResource().String(), object.Name)
// add rollback func to remove envoy config
var empty bool
empty, err = inject.UnPatchContainer(ctx, nodeID, factory, mapInterface, controller, func(isFargateMode bool, rule *controlplane.Rule) bool {
if isFargateMode {
return svr.connect.IsMe(req.Namespace, util.ConvertWorkloadToUid(workload), rule.Headers)
}
return rule.LocalTunIPv4 == v4
var resources []handler.Resources
for _, resource := range req.GetWorkloads() {
resources = append(resources, handler.Resources{
Namespace: req.Namespace,
Workload: resource,
})
if err != nil {
plog.G(ctx).Errorf("Leaving workload %s failed: %v", workload, err)
continue
}
if empty && util.IsK8sService(object) {
err = inject.ModifyServiceTargetPort(ctx, svr.connect.GetClientset(), req.Namespace, object.Name, map[int32]int32{})
}
svr.connect.LeavePortMap(req.Namespace, workload)
err = util.RolloutStatus(ctx, factory, req.Namespace, workload, time.Minute*60)
}
return nil
return svr.connect.LeaveResource(ctx, resources, v4)
}
type leaveWarp struct {

View File

@@ -11,7 +11,12 @@ import (
"github.com/wencaiwulue/kubevpn/v2/pkg/daemon/rpc"
)
func (svr *Server) Logs(req *rpc.LogRequest, resp rpc.Daemon_LogsServer) error {
func (svr *Server) Logs(resp rpc.Daemon_LogsServer) error {
req, err := resp.Recv()
if err != nil {
return err
}
// only show latest N lines
line := int64(max(req.Lines, -req.Lines))
sudoLine, sudoSize, err := seekToLastLine(config.GetDaemonLogPath(true), line)

View File

@@ -3,10 +3,11 @@ package action
import (
"context"
"io"
"os"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/spf13/pflag"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/cli-runtime/pkg/resource"
@@ -27,7 +28,12 @@ import (
// 2. if already connect to cluster
// 2.1 disconnect from cluster
// 2.2 same as step 1
func (svr *Server) Proxy(req *rpc.ProxyRequest, resp rpc.Daemon_ProxyServer) (e error) {
func (svr *Server) Proxy(resp rpc.Daemon_ProxyServer) (e error) {
req, err := resp.Recv()
if err != nil {
return err
}
logger := plog.GetLoggerForClient(int32(log.InfoLevel), io.MultiWriter(newProxyWarp(resp), svr.LogFile))
config.Image = req.Image
ctx := plog.WithLogger(resp.Context(), logger)
@@ -37,24 +43,20 @@ func (svr *Server) Proxy(req *rpc.ProxyRequest, resp rpc.Daemon_ProxyServer) (e
if err != nil {
return err
}
flags := pflag.NewFlagSet("", pflag.ContinueOnError)
flags.AddFlag(&pflag.Flag{
Name: "kubeconfig",
DefValue: file,
})
var path string
path, err = ssh.SshJump(ctx, sshConf, flags, false)
if err != nil {
return err
defer os.Remove(file)
if !sshConf.IsEmpty() {
file, err = ssh.SshJump(ctx, sshConf, file, false)
if err != nil {
return err
}
}
connect := &handler.ConnectOptions{
Namespace: req.Namespace,
ExtraRouteInfo: *handler.ParseExtraRouteFromRPC(req.ExtraRoute),
Engine: config.Engine(req.Engine),
OriginKubeconfigPath: req.OriginKubeconfigPath,
ImagePullSecretName: req.ImagePullSecretName,
}
err = connect.InitClient(util.InitFactoryByPath(path, req.Namespace))
err = connect.InitClient(util.InitFactoryByPath(file, req.Namespace))
if err != nil {
return err
}
@@ -84,9 +86,34 @@ func (svr *Server) Proxy(req *rpc.ProxyRequest, resp rpc.Daemon_ProxyServer) (e
return errors.Wrap(err, "daemon is not available")
}
var connResp, reConnResp grpc.BidiStreamingClient[rpc.ConnectRequest, rpc.ConnectResponse]
var disconnectResp rpc.Daemon_DisconnectClient
cancel, cancelFunc := context.WithCancel(ctx)
defer cancelFunc()
go func() {
var s rpc.Cancel
err = resp.RecvMsg(&s)
if err != nil {
return
}
if connResp != nil {
_ = connResp.SendMsg(&s)
}
if disconnectResp != nil {
_ = disconnectResp.SendMsg(&s)
}
if reConnResp != nil {
_ = reConnResp.SendMsg(&s)
}
cancelFunc()
}()
plog.G(ctx).Debugf("Connecting to cluster")
var connResp rpc.Daemon_ConnectClient
connResp, err = cli.Connect(ctx, convert(req))
connResp, err = cli.Connect(context.Background())
if err != nil {
return err
}
err = connResp.Send(convert(req))
if err != nil {
return err
}
@@ -96,8 +123,11 @@ func (svr *Server) Proxy(req *rpc.ProxyRequest, resp rpc.Daemon_ProxyServer) (e
return err
}
plog.G(ctx).Infof("Disconnecting from another cluster...")
var disconnectResp rpc.Daemon_DisconnectClient
disconnectResp, err = cli.Disconnect(ctx, &rpc.DisconnectRequest{ID: ptr.To[int32](0)})
disconnectResp, err = cli.Disconnect(context.Background())
if err != nil {
return err
}
err = disconnectResp.Send(&rpc.DisconnectRequest{ID: ptr.To[int32](0)})
if err != nil {
return err
}
@@ -105,23 +135,28 @@ func (svr *Server) Proxy(req *rpc.ProxyRequest, resp rpc.Daemon_ProxyServer) (e
disconnectResp,
resp,
func(response *rpc.DisconnectResponse) *rpc.ConnectResponse {
_, _ = svr.LogFile.Write([]byte(response.Message))
return &rpc.ConnectResponse{Message: response.Message}
},
)
if err != nil {
return err
}
connResp, err = cli.Connect(ctx, convert(req))
reConnResp, err = cli.Connect(context.Background())
if err != nil {
return err
}
err = util.CopyGRPCStream[rpc.ConnectResponse](connResp, resp)
err = reConnResp.Send(convert(req))
if err != nil {
return err
}
err = util.CopyGRPCStream[rpc.ConnectResponse](reConnResp, resp)
if err != nil {
return err
}
}
err = svr.connect.CreateRemoteInboundPod(ctx, req.Namespace, workloads, req.Headers, req.PortMap)
err = svr.connect.CreateRemoteInboundPod(plog.WithLogger(cancel, logger), req.Namespace, workloads, req.Headers, req.PortMap)
if err != nil {
plog.G(ctx).Errorf("Failed to inject inbound sidecar: %v", err)
return err

View File

@@ -14,7 +14,7 @@ import (
"github.com/wencaiwulue/kubevpn/v2/pkg/util"
)
func (svr *Server) Quit(req *rpc.QuitRequest, resp rpc.Daemon_QuitServer) error {
func (svr *Server) Quit(resp rpc.Daemon_QuitServer) error {
logger := plog.GetLoggerForClient(int32(log.InfoLevel), io.MultiWriter(newQuitWarp(resp), svr.LogFile))
ctx := context.Background()
if resp != nil {

View File

@@ -9,11 +9,15 @@ import (
plog "github.com/wencaiwulue/kubevpn/v2/pkg/log"
)
func (svr *Server) Remove(req *rpc.RemoveRequest, resp rpc.Daemon_RemoveServer) error {
func (svr *Server) Remove(resp rpc.Daemon_RemoveServer) error {
req, err := resp.Recv()
if err != nil {
return err
}
logger := plog.GetLoggerForClient(int32(log.InfoLevel), io.MultiWriter(newRemoveWarp(resp), svr.LogFile))
ctx := plog.WithLogger(resp.Context(), logger)
if svr.clone != nil {
err := svr.clone.Cleanup(ctx, req.Workloads...)
err = svr.clone.Cleanup(ctx, req.Workloads...)
svr.clone = nil
return err
} else {

View File

@@ -2,9 +2,9 @@ package action
import (
"io"
"os"
log "github.com/sirupsen/logrus"
"github.com/spf13/pflag"
"github.com/wencaiwulue/kubevpn/v2/pkg/daemon/rpc"
"github.com/wencaiwulue/kubevpn/v2/pkg/handler"
@@ -13,27 +13,28 @@ import (
"github.com/wencaiwulue/kubevpn/v2/pkg/util"
)
func (svr *Server) Reset(req *rpc.ResetRequest, resp rpc.Daemon_ResetServer) error {
func (svr *Server) Reset(resp rpc.Daemon_ResetServer) error {
req, err := resp.Recv()
if err != nil {
return err
}
logger := plog.GetLoggerForClient(int32(log.InfoLevel), io.MultiWriter(newResetWarp(resp), svr.LogFile))
file, err := util.ConvertToTempKubeconfigFile([]byte(req.KubeconfigBytes))
if err != nil {
return err
}
flags := pflag.NewFlagSet("", pflag.ContinueOnError)
flags.AddFlag(&pflag.Flag{
Name: "kubeconfig",
DefValue: file,
})
defer os.Remove(file)
var sshConf = ssh.ParseSshFromRPC(req.SshJump)
var ctx = plog.WithLogger(resp.Context(), logger)
var path string
path, err = ssh.SshJump(ctx, sshConf, flags, false)
if err != nil {
return err
if !sshConf.IsEmpty() {
file, err = ssh.SshJump(ctx, sshConf, file, false)
if err != nil {
return err
}
}
connect := &handler.ConnectOptions{}
err = connect.InitClient(util.InitFactoryByPath(path, req.Namespace))
err = connect.InitClient(util.InitFactoryByPath(file, req.Namespace))
if err != nil {
return err
}

View File

@@ -16,8 +16,8 @@ import (
)
const (
StatusOk = "Connected"
StatusFailed = "Disconnected"
StatusOk = "connected"
StatusFailed = "disconnected"
ModeFull = "full"
ModeLite = "lite"

View File

@@ -10,7 +10,11 @@ import (
plog "github.com/wencaiwulue/kubevpn/v2/pkg/log"
)
func (svr *Server) Stop(req *rpc.QuitRequest, resp rpc.Daemon_QuitServer) error {
func (svr *Server) Stop(resp rpc.Daemon_QuitServer) error {
_, err := resp.Recv()
if err != nil {
return err
}
logger := plog.GetLoggerForClient(int32(log.InfoLevel), io.MultiWriter(newStopWarp(resp), svr.LogFile))
ctx := plog.WithLogger(resp.Context(), logger)
if svr.connect == nil {

View File

@@ -2,9 +2,9 @@ package action
import (
"io"
"os"
log "github.com/sirupsen/logrus"
"github.com/spf13/pflag"
"github.com/wencaiwulue/kubevpn/v2/pkg/daemon/rpc"
"github.com/wencaiwulue/kubevpn/v2/pkg/handler"
@@ -13,7 +13,11 @@ import (
"github.com/wencaiwulue/kubevpn/v2/pkg/util"
)
func (svr *Server) Uninstall(req *rpc.UninstallRequest, resp rpc.Daemon_UninstallServer) error {
func (svr *Server) Uninstall(resp rpc.Daemon_UninstallServer) error {
req, err := resp.Recv()
if err != nil {
return err
}
logger := plog.GetLoggerForClient(int32(log.InfoLevel), io.MultiWriter(newUninstallWarp(resp), svr.LogFile))
connect := &handler.ConnectOptions{
@@ -25,19 +29,16 @@ func (svr *Server) Uninstall(req *rpc.UninstallRequest, resp rpc.Daemon_Uninstal
if err != nil {
return err
}
flags := pflag.NewFlagSet("", pflag.ContinueOnError)
flags.AddFlag(&pflag.Flag{
Name: "kubeconfig",
DefValue: file,
})
defer os.Remove(file)
var sshConf = ssh.ParseSshFromRPC(req.SshJump)
var ctx = plog.WithLogger(resp.Context(), logger)
var path string
path, err = ssh.SshJump(ctx, sshConf, flags, false)
if err != nil {
return err
if !sshConf.IsEmpty() {
file, err = ssh.SshJump(ctx, sshConf, file, false)
if err != nil {
return err
}
}
err = connect.InitClient(util.InitFactoryByPath(path, req.Namespace))
err = connect.InitClient(util.InitFactoryByPath(file, req.Namespace))
if err != nil {
return err
}

View File

@@ -65,11 +65,15 @@ func GetClient(isSudo bool) (cli rpc.DaemonClient, err error) {
if err == nil && resp.NeedUpgrade {
// quit daemon
var quitStream rpc.Daemon_QuitClient
quitStream, err = cli.Quit(ctx, &rpc.QuitRequest{})
quitStream, err = cli.Quit(ctx)
if err != nil {
return nil, err
}
err = util.PrintGRPCStream[rpc.QuitResponse](quitStream, nil)
err = quitStream.Send(&rpc.QuitRequest{})
if err != nil {
return nil, err
}
err = util.PrintGRPCStream[rpc.QuitResponse](nil, quitStream, nil)
return
}

View File

@@ -119,7 +119,7 @@ func (o *SvrOption) Start(ctx context.Context) error {
// remember to close http server, otherwise daemon will not quit successfully
cancel := func() {
o.svr.Cancel = nil
_ = o.svr.Quit(&rpc.QuitRequest{}, nil)
_ = o.svr.Quit(nil)
_ = downgradingServer.Close()
_ = l.Close()
}

File diff suppressed because it is too large Load Diff

View File

@@ -5,13 +5,13 @@ option go_package = ".;rpc";
package rpc;
service Daemon {
rpc Connect (ConnectRequest) returns (stream ConnectResponse) {}
rpc ConnectFork (ConnectRequest) returns (stream ConnectResponse) {}
rpc Disconnect (DisconnectRequest) returns (stream DisconnectResponse) {}
rpc Proxy (ProxyRequest) returns (stream ProxyResponse) {}
rpc Leave (LeaveRequest) returns (stream LeaveResponse) {}
rpc Clone (CloneRequest) returns (stream CloneResponse) {}
rpc Remove (RemoveRequest) returns (stream RemoveResponse) {}
rpc Connect (stream ConnectRequest) returns (stream ConnectResponse) {}
rpc ConnectFork (stream ConnectRequest) returns (stream ConnectResponse) {}
rpc Disconnect (stream DisconnectRequest) returns (stream DisconnectResponse) {}
rpc Proxy (stream ProxyRequest) returns (stream ProxyResponse) {}
rpc Leave (stream LeaveRequest) returns (stream LeaveResponse) {}
rpc Clone (stream CloneRequest) returns (stream CloneResponse) {}
rpc Remove (stream RemoveRequest) returns (stream RemoveResponse) {}
rpc ConfigAdd (ConfigAddRequest) returns (ConfigAddResponse) {}
rpc ConfigRemove (ConfigRemoveRequest) returns (ConfigRemoveResponse) {}
@@ -19,15 +19,15 @@ service Daemon {
rpc SshStop (SshStopRequest) returns (SshStopResponse) {}
rpc SshConnect (stream SshConnectRequest) returns (stream SshConnectResponse) {}
rpc Logs (LogRequest) returns (stream LogResponse) {}
rpc Logs (stream LogRequest) returns (stream LogResponse) {}
rpc List (ListRequest) returns (ListResponse) {}
rpc Get (GetRequest) returns (GetResponse) {}
rpc Upgrade (UpgradeRequest) returns (UpgradeResponse) {}
rpc Status (StatusRequest) returns (StatusResponse) {}
rpc Version (VersionRequest) returns (VersionResponse) {}
rpc Reset (ResetRequest) returns (stream ResetResponse) {}
rpc Uninstall (UninstallRequest) returns (stream UninstallResponse) {}
rpc Quit (QuitRequest) returns (stream QuitResponse) {}
rpc Reset (stream ResetRequest) returns (stream ResetResponse) {}
rpc Uninstall (stream UninstallRequest) returns (stream UninstallResponse) {}
rpc Quit (stream QuitRequest) returns (stream QuitResponse) {}
rpc Identify (IdentifyRequest) returns (IdentifyResponse) {}
}
@@ -373,4 +373,6 @@ message IdentifyRequest {}
message IdentifyResponse {
string ID = 1;
}
}
message Cancel {}

File diff suppressed because it is too large Load Diff

View File

@@ -122,7 +122,11 @@ func (option *Options) Connect(ctx context.Context, sshConfig *pkgssh.SshConfig,
ManagerNamespace: managerNamespace,
}
option.AddRollbackFunc(func() error {
resp, err := cli.Disconnect(ctx, &rpc.DisconnectRequest{
resp, err := cli.Disconnect(context.Background())
if err != nil {
return err
}
err = resp.Send(&rpc.DisconnectRequest{
KubeconfigBytes: ptr.To(string(kubeConfigBytes)),
Namespace: ptr.To(ns),
SshJump: sshConfig.ToRPC(),
@@ -130,16 +134,21 @@ func (option *Options) Connect(ctx context.Context, sshConfig *pkgssh.SshConfig,
if err != nil {
return err
}
_ = util.PrintGRPCStream[rpc.DisconnectResponse](resp)
_ = util.PrintGRPCStream[rpc.DisconnectResponse](ctx, resp)
return nil
})
var resp rpc.Daemon_ProxyClient
resp, err = cli.Proxy(ctx, req)
resp, err = cli.Proxy(context.Background())
if err != nil {
plog.G(ctx).Errorf("Connect to cluster error: %s", err.Error())
return err
}
err = util.PrintGRPCStream[rpc.CloneResponse](resp)
err = resp.Send(req)
if err != nil {
plog.G(ctx).Errorf("Connect to cluster error: %s", err.Error())
return err
}
err = util.PrintGRPCStream[rpc.CloneResponse](ctx, resp)
return err
}

View File

@@ -1,6 +1,6 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.33.0
// protoc-gen-go v1.36.6
// protoc v5.29.3
// source: dhcpserver.proto
@@ -11,6 +11,7 @@ import (
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
unsafe "unsafe"
)
const (
@@ -21,21 +22,18 @@ const (
)
type RentIPRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
state protoimpl.MessageState `protogen:"open.v1"`
PodName string `protobuf:"bytes,1,opt,name=PodName,proto3" json:"PodName,omitempty"`
PodNamespace string `protobuf:"bytes,2,opt,name=PodNamespace,proto3" json:"PodNamespace,omitempty"`
unknownFields protoimpl.UnknownFields
PodName string `protobuf:"bytes,1,opt,name=PodName,proto3" json:"PodName,omitempty"`
PodNamespace string `protobuf:"bytes,2,opt,name=PodNamespace,proto3" json:"PodNamespace,omitempty"`
sizeCache protoimpl.SizeCache
}
func (x *RentIPRequest) Reset() {
*x = RentIPRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_dhcpserver_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
mi := &file_dhcpserver_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *RentIPRequest) String() string {
@@ -46,7 +44,7 @@ func (*RentIPRequest) ProtoMessage() {}
func (x *RentIPRequest) ProtoReflect() protoreflect.Message {
mi := &file_dhcpserver_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
@@ -76,21 +74,18 @@ func (x *RentIPRequest) GetPodNamespace() string {
}
type RentIPResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
state protoimpl.MessageState `protogen:"open.v1"`
IPv4CIDR string `protobuf:"bytes,1,opt,name=IPv4CIDR,proto3" json:"IPv4CIDR,omitempty"`
IPv6CIDR string `protobuf:"bytes,2,opt,name=IPv6CIDR,proto3" json:"IPv6CIDR,omitempty"`
unknownFields protoimpl.UnknownFields
IPv4CIDR string `protobuf:"bytes,1,opt,name=IPv4CIDR,proto3" json:"IPv4CIDR,omitempty"`
IPv6CIDR string `protobuf:"bytes,2,opt,name=IPv6CIDR,proto3" json:"IPv6CIDR,omitempty"`
sizeCache protoimpl.SizeCache
}
func (x *RentIPResponse) Reset() {
*x = RentIPResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_dhcpserver_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
mi := &file_dhcpserver_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *RentIPResponse) String() string {
@@ -101,7 +96,7 @@ func (*RentIPResponse) ProtoMessage() {}
func (x *RentIPResponse) ProtoReflect() protoreflect.Message {
mi := &file_dhcpserver_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
@@ -131,23 +126,20 @@ func (x *RentIPResponse) GetIPv6CIDR() string {
}
type ReleaseIPRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
state protoimpl.MessageState `protogen:"open.v1"`
PodName string `protobuf:"bytes,1,opt,name=PodName,proto3" json:"PodName,omitempty"`
PodNamespace string `protobuf:"bytes,2,opt,name=PodNamespace,proto3" json:"PodNamespace,omitempty"`
IPv4CIDR string `protobuf:"bytes,3,opt,name=IPv4CIDR,proto3" json:"IPv4CIDR,omitempty"`
IPv6CIDR string `protobuf:"bytes,4,opt,name=IPv6CIDR,proto3" json:"IPv6CIDR,omitempty"`
unknownFields protoimpl.UnknownFields
PodName string `protobuf:"bytes,1,opt,name=PodName,proto3" json:"PodName,omitempty"`
PodNamespace string `protobuf:"bytes,2,opt,name=PodNamespace,proto3" json:"PodNamespace,omitempty"`
IPv4CIDR string `protobuf:"bytes,3,opt,name=IPv4CIDR,proto3" json:"IPv4CIDR,omitempty"`
IPv6CIDR string `protobuf:"bytes,4,opt,name=IPv6CIDR,proto3" json:"IPv6CIDR,omitempty"`
sizeCache protoimpl.SizeCache
}
func (x *ReleaseIPRequest) Reset() {
*x = ReleaseIPRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_dhcpserver_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
mi := &file_dhcpserver_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *ReleaseIPRequest) String() string {
@@ -158,7 +150,7 @@ func (*ReleaseIPRequest) ProtoMessage() {}
func (x *ReleaseIPRequest) ProtoReflect() protoreflect.Message {
mi := &file_dhcpserver_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil {
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
@@ -202,20 +194,17 @@ func (x *ReleaseIPRequest) GetIPv6CIDR() string {
}
type ReleaseIPResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
state protoimpl.MessageState `protogen:"open.v1"`
Message string `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"`
unknownFields protoimpl.UnknownFields
Message string `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"`
sizeCache protoimpl.SizeCache
}
func (x *ReleaseIPResponse) Reset() {
*x = ReleaseIPResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_dhcpserver_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
mi := &file_dhcpserver_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *ReleaseIPResponse) String() string {
@@ -226,7 +215,7 @@ func (*ReleaseIPResponse) ProtoMessage() {}
func (x *ReleaseIPResponse) ProtoReflect() protoreflect.Message {
mi := &file_dhcpserver_proto_msgTypes[3]
if protoimpl.UnsafeEnabled && x != nil {
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
@@ -250,55 +239,40 @@ func (x *ReleaseIPResponse) GetMessage() string {
var File_dhcpserver_proto protoreflect.FileDescriptor
var file_dhcpserver_proto_rawDesc = []byte{
0x0a, 0x10, 0x64, 0x68, 0x63, 0x70, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2e, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x12, 0x03, 0x72, 0x70, 0x63, 0x22, 0x4d, 0x0a, 0x0d, 0x52, 0x65, 0x6e, 0x74, 0x49,
0x50, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x50, 0x6f, 0x64, 0x4e,
0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x50, 0x6f, 0x64, 0x4e, 0x61,
0x6d, 0x65, 0x12, 0x22, 0x0a, 0x0c, 0x50, 0x6f, 0x64, 0x4e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61,
0x63, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x50, 0x6f, 0x64, 0x4e, 0x61, 0x6d,
0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x22, 0x48, 0x0a, 0x0e, 0x52, 0x65, 0x6e, 0x74, 0x49, 0x50,
0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x49, 0x50, 0x76, 0x34,
0x43, 0x49, 0x44, 0x52, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x49, 0x50, 0x76, 0x34,
0x43, 0x49, 0x44, 0x52, 0x12, 0x1a, 0x0a, 0x08, 0x49, 0x50, 0x76, 0x36, 0x43, 0x49, 0x44, 0x52,
0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x49, 0x50, 0x76, 0x36, 0x43, 0x49, 0x44, 0x52,
0x22, 0x88, 0x01, 0x0a, 0x10, 0x52, 0x65, 0x6c, 0x65, 0x61, 0x73, 0x65, 0x49, 0x50, 0x52, 0x65,
0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x50, 0x6f, 0x64, 0x4e, 0x61, 0x6d, 0x65,
0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x50, 0x6f, 0x64, 0x4e, 0x61, 0x6d, 0x65, 0x12,
0x22, 0x0a, 0x0c, 0x50, 0x6f, 0x64, 0x4e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x18,
0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x50, 0x6f, 0x64, 0x4e, 0x61, 0x6d, 0x65, 0x73, 0x70,
0x61, 0x63, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x49, 0x50, 0x76, 0x34, 0x43, 0x49, 0x44, 0x52, 0x18,
0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x49, 0x50, 0x76, 0x34, 0x43, 0x49, 0x44, 0x52, 0x12,
0x1a, 0x0a, 0x08, 0x49, 0x50, 0x76, 0x36, 0x43, 0x49, 0x44, 0x52, 0x18, 0x04, 0x20, 0x01, 0x28,
0x09, 0x52, 0x08, 0x49, 0x50, 0x76, 0x36, 0x43, 0x49, 0x44, 0x52, 0x22, 0x2d, 0x0a, 0x11, 0x52,
0x65, 0x6c, 0x65, 0x61, 0x73, 0x65, 0x49, 0x50, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65,
0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28,
0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x32, 0x79, 0x0a, 0x04, 0x44, 0x48,
0x43, 0x50, 0x12, 0x33, 0x0a, 0x06, 0x52, 0x65, 0x6e, 0x74, 0x49, 0x50, 0x12, 0x12, 0x2e, 0x72,
0x70, 0x63, 0x2e, 0x52, 0x65, 0x6e, 0x74, 0x49, 0x50, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
0x1a, 0x13, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x6e, 0x74, 0x49, 0x50, 0x52, 0x65, 0x73,
0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x3c, 0x0a, 0x09, 0x52, 0x65, 0x6c, 0x65, 0x61,
0x73, 0x65, 0x49, 0x50, 0x12, 0x15, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x6c, 0x65, 0x61,
0x73, 0x65, 0x49, 0x50, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x72, 0x70,
0x63, 0x2e, 0x52, 0x65, 0x6c, 0x65, 0x61, 0x73, 0x65, 0x49, 0x50, 0x52, 0x65, 0x73, 0x70, 0x6f,
0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x07, 0x5a, 0x05, 0x2e, 0x3b, 0x72, 0x70, 0x63, 0x62, 0x06,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
const file_dhcpserver_proto_rawDesc = "" +
"\n" +
"\x10dhcpserver.proto\x12\x03rpc\"M\n" +
"\rRentIPRequest\x12\x18\n" +
"\aPodName\x18\x01 \x01(\tR\aPodName\x12\"\n" +
"\fPodNamespace\x18\x02 \x01(\tR\fPodNamespace\"H\n" +
"\x0eRentIPResponse\x12\x1a\n" +
"\bIPv4CIDR\x18\x01 \x01(\tR\bIPv4CIDR\x12\x1a\n" +
"\bIPv6CIDR\x18\x02 \x01(\tR\bIPv6CIDR\"\x88\x01\n" +
"\x10ReleaseIPRequest\x12\x18\n" +
"\aPodName\x18\x01 \x01(\tR\aPodName\x12\"\n" +
"\fPodNamespace\x18\x02 \x01(\tR\fPodNamespace\x12\x1a\n" +
"\bIPv4CIDR\x18\x03 \x01(\tR\bIPv4CIDR\x12\x1a\n" +
"\bIPv6CIDR\x18\x04 \x01(\tR\bIPv6CIDR\"-\n" +
"\x11ReleaseIPResponse\x12\x18\n" +
"\amessage\x18\x01 \x01(\tR\amessage2y\n" +
"\x04DHCP\x123\n" +
"\x06RentIP\x12\x12.rpc.RentIPRequest\x1a\x13.rpc.RentIPResponse\"\x00\x12<\n" +
"\tReleaseIP\x12\x15.rpc.ReleaseIPRequest\x1a\x16.rpc.ReleaseIPResponse\"\x00B\aZ\x05.;rpcb\x06proto3"
var (
file_dhcpserver_proto_rawDescOnce sync.Once
file_dhcpserver_proto_rawDescData = file_dhcpserver_proto_rawDesc
file_dhcpserver_proto_rawDescData []byte
)
func file_dhcpserver_proto_rawDescGZIP() []byte {
file_dhcpserver_proto_rawDescOnce.Do(func() {
file_dhcpserver_proto_rawDescData = protoimpl.X.CompressGZIP(file_dhcpserver_proto_rawDescData)
file_dhcpserver_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_dhcpserver_proto_rawDesc), len(file_dhcpserver_proto_rawDesc)))
})
return file_dhcpserver_proto_rawDescData
}
var file_dhcpserver_proto_msgTypes = make([]protoimpl.MessageInfo, 4)
var file_dhcpserver_proto_goTypes = []interface{}{
var file_dhcpserver_proto_goTypes = []any{
(*RentIPRequest)(nil), // 0: rpc.RentIPRequest
(*RentIPResponse)(nil), // 1: rpc.RentIPResponse
(*ReleaseIPRequest)(nil), // 2: rpc.ReleaseIPRequest
@@ -321,61 +295,11 @@ func file_dhcpserver_proto_init() {
if File_dhcpserver_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_dhcpserver_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*RentIPRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_dhcpserver_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*RentIPResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_dhcpserver_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ReleaseIPRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_dhcpserver_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ReleaseIPResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_dhcpserver_proto_rawDesc,
RawDescriptor: unsafe.Slice(unsafe.StringData(file_dhcpserver_proto_rawDesc), len(file_dhcpserver_proto_rawDesc)),
NumEnums: 0,
NumMessages: 4,
NumExtensions: 0,
@@ -386,7 +310,6 @@ func file_dhcpserver_proto_init() {
MessageInfos: file_dhcpserver_proto_msgTypes,
}.Build()
File_dhcpserver_proto = out.File
file_dhcpserver_proto_rawDesc = nil
file_dhcpserver_proto_goTypes = nil
file_dhcpserver_proto_depIdxs = nil
}

View File

@@ -1,6 +1,6 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.3.0
// - protoc-gen-go-grpc v1.5.1
// - protoc v5.29.3
// source: dhcpserver.proto
@@ -15,8 +15,8 @@ import (
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// Requires gRPC-Go v1.64.0 or later.
const _ = grpc.SupportPackageIsVersion9
const (
DHCP_RentIP_FullMethodName = "/rpc.DHCP/RentIP"
@@ -40,8 +40,9 @@ func NewDHCPClient(cc grpc.ClientConnInterface) DHCPClient {
}
func (c *dHCPClient) RentIP(ctx context.Context, in *RentIPRequest, opts ...grpc.CallOption) (*RentIPResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(RentIPResponse)
err := c.cc.Invoke(ctx, DHCP_RentIP_FullMethodName, in, out, opts...)
err := c.cc.Invoke(ctx, DHCP_RentIP_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
@@ -49,8 +50,9 @@ func (c *dHCPClient) RentIP(ctx context.Context, in *RentIPRequest, opts ...grpc
}
func (c *dHCPClient) ReleaseIP(ctx context.Context, in *ReleaseIPRequest, opts ...grpc.CallOption) (*ReleaseIPResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(ReleaseIPResponse)
err := c.cc.Invoke(ctx, DHCP_ReleaseIP_FullMethodName, in, out, opts...)
err := c.cc.Invoke(ctx, DHCP_ReleaseIP_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
@@ -59,16 +61,19 @@ func (c *dHCPClient) ReleaseIP(ctx context.Context, in *ReleaseIPRequest, opts .
// DHCPServer is the server API for DHCP service.
// All implementations must embed UnimplementedDHCPServer
// for forward compatibility
// for forward compatibility.
type DHCPServer interface {
RentIP(context.Context, *RentIPRequest) (*RentIPResponse, error)
ReleaseIP(context.Context, *ReleaseIPRequest) (*ReleaseIPResponse, error)
mustEmbedUnimplementedDHCPServer()
}
// UnimplementedDHCPServer must be embedded to have forward compatible implementations.
type UnimplementedDHCPServer struct {
}
// UnimplementedDHCPServer must be embedded to have
// forward compatible implementations.
//
// NOTE: this should be embedded by value instead of pointer to avoid a nil
// pointer dereference when methods are called.
type UnimplementedDHCPServer struct{}
func (UnimplementedDHCPServer) RentIP(context.Context, *RentIPRequest) (*RentIPResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method RentIP not implemented")
@@ -77,6 +82,7 @@ func (UnimplementedDHCPServer) ReleaseIP(context.Context, *ReleaseIPRequest) (*R
return nil, status.Errorf(codes.Unimplemented, "method ReleaseIP not implemented")
}
func (UnimplementedDHCPServer) mustEmbedUnimplementedDHCPServer() {}
func (UnimplementedDHCPServer) testEmbeddedByValue() {}
// UnsafeDHCPServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to DHCPServer will
@@ -86,6 +92,13 @@ type UnsafeDHCPServer interface {
}
func RegisterDHCPServer(s grpc.ServiceRegistrar, srv DHCPServer) {
// If the following call pancis, it indicates UnimplementedDHCPServer was
// embedded by pointer and is nil. This will cause panics if an
// unimplemented method is ever invoked, so we test this at initialization
// time to prevent it from happening at runtime later due to I/O.
if t, ok := srv.(interface{ testEmbeddedByValue() }); ok {
t.testEmbeddedByValue()
}
s.RegisterService(&DHCP_ServiceDesc, srv)
}

View File

@@ -11,7 +11,6 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/containernetworking/cni/pkg/types"
@@ -116,7 +115,7 @@ func (c *ConnectOptions) RentIP(ctx context.Context) (context.Context, error) {
return nil, err
}
ctx1 := metadata.AppendToOutgoingContext(
ctx,
context.Background(),
config.HeaderIPv4, c.localTunIPv4.String(),
config.HeaderIPv6, c.localTunIPv6.String(),
)
@@ -138,7 +137,7 @@ func (c *ConnectOptions) GetIPFromContext(ctx context.Context, logger *log.Logge
return fmt.Errorf("cat not convert IPv4 string: %s: %v", ipv4[0], err)
}
c.localTunIPv4 = &net.IPNet{IP: ip, Mask: ipNet.Mask}
plog.G(ctx).Debugf("Get IPv4 %s from context", c.localTunIPv4.String())
logger.Debugf("Get IPv4 %s from context", c.localTunIPv4.String())
ipv6 := md.Get(config.HeaderIPv6)
if len(ipv6) == 0 {
@@ -149,7 +148,7 @@ func (c *ConnectOptions) GetIPFromContext(ctx context.Context, logger *log.Logge
return fmt.Errorf("cat not convert IPv6 string: %s: %v", ipv6[0], err)
}
c.localTunIPv6 = &net.IPNet{IP: ip, Mask: ipNet.Mask}
plog.G(ctx).Debugf("Get IPv6 %s from context", c.localTunIPv6.String())
logger.Debugf("Get IPv6 %s from context", c.localTunIPv6.String())
return nil
}
@@ -208,17 +207,8 @@ func (c *ConnectOptions) CreateRemoteInboundPod(ctx context.Context, namespace s
return
}
func (c *ConnectOptions) DoConnect(ctx context.Context, isLite bool, stopChan <-chan struct{}) (err error) {
func (c *ConnectOptions) DoConnect(ctx context.Context, isLite bool) (err error) {
c.ctx, c.cancel = context.WithCancel(ctx)
var success atomic.Bool
go func() {
// if stop chan done before current function finished, means client ctrl+c to cancel operation
<-stopChan
if !success.Load() {
c.cancel()
}
}()
plog.G(ctx).Info("Starting connect to cluster")
m := dhcp.NewDHCPManager(c.clientset.CoreV1().ConfigMaps(c.Namespace), c.Namespace)
if err = m.InitDHCP(c.ctx); err != nil {
@@ -293,7 +283,6 @@ func (c *ConnectOptions) DoConnect(ctx context.Context, isLite bool, stopChan <-
plog.G(ctx).Errorf("Configure DNS failed: %v", err)
return
}
success.Store(true)
return
}

View File

@@ -521,7 +521,7 @@ func checkConnectStatus(t *testing.T) {
ID: 0,
Mode: "full",
Namespace: namespace,
Status: "Connected",
Status: "connected",
ProxyList: nil,
}}
@@ -545,7 +545,7 @@ func centerCheckConnectStatus(t *testing.T) {
ID: 0,
Mode: "full",
Namespace: "default",
Status: "Connected",
Status: "connected",
ProxyList: nil,
}}
@@ -588,7 +588,7 @@ func checkProxyStatus(t *testing.T) {
ID: 0,
Mode: "full",
Namespace: namespace,
Status: "Connected",
Status: "connected",
ProxyList: []*proxy{{
Namespace: namespace,
Workload: "deployments.apps/reviews",
@@ -621,7 +621,7 @@ func centerCheckProxyStatus(t *testing.T) {
ID: 0,
Mode: "full",
Namespace: "default",
Status: "Connected",
Status: "connected",
ProxyList: []*proxy{{
Namespace: "default",
Workload: "deployments.apps/reviews",
@@ -654,7 +654,7 @@ func checkProxyWithServiceMeshStatus(t *testing.T) {
ID: 0,
Mode: "full",
Namespace: namespace,
Status: "Connected",
Status: "connected",
ProxyList: []*proxy{{
Namespace: namespace,
Workload: "deployments.apps/reviews",
@@ -687,7 +687,7 @@ func centerCheckProxyWithServiceMeshStatus(t *testing.T) {
ID: 0,
Mode: "full",
Namespace: "default",
Status: "Connected",
Status: "connected",
ProxyList: []*proxy{{
Namespace: "default",
Workload: "deployments.apps/reviews",
@@ -720,7 +720,7 @@ func checkProxyWithServiceMeshAndGvisorStatus(t *testing.T) {
ID: 0,
Mode: "full",
Namespace: namespace,
Status: "Connected",
Status: "connected",
ProxyList: []*proxy{{
Namespace: namespace,
Workload: "services/reviews",
@@ -760,7 +760,7 @@ func centerCheckProxyWithServiceMeshAndGvisorStatus(t *testing.T) {
ID: 0,
Mode: "full",
Namespace: "default",
Status: "Connected",
Status: "connected",
ProxyList: []*proxy{{
Namespace: "default",
Workload: "services/reviews",

View File

@@ -66,6 +66,22 @@ func (l *ProxyList) IsMe(ns, uid string, headers map[string]string) bool {
return false
}
type Resources struct {
Namespace string
Workload string
}
func (l ProxyList) ToResources() []Resources {
var resources []Resources
for _, proxy := range l {
resources = append(resources, Resources{
Namespace: proxy.namespace,
Workload: proxy.workload,
})
}
return resources
}
func NewMapper(clientset *kubernetes.Clientset, ns string, labels string, headers map[string]string, workload string) *Mapper {
ctx, cancelFunc := context.WithCancel(context.Background())
return &Mapper{

View File

@@ -9,6 +9,7 @@ import (
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/errors"
"k8s.io/utils/pointer"
"sigs.k8s.io/yaml"
@@ -25,13 +26,6 @@ import (
// 3) cleanup all containers
// 4) cleanup hosts
func (c *ConnectOptions) Uninstall(ctx context.Context) error {
err := c.LeaveAllProxyResources(ctx)
if err != nil {
plog.G(ctx).Errorf("Leave proxy resources error: %v", err)
} else {
plog.G(ctx).Debugf("Leave proxy resources successfully")
}
plog.G(ctx).Infof("Cleaning up resources")
ns := c.Namespace
name := config.ConfigMapPodTrafficManager
@@ -89,29 +83,37 @@ func (c *ConnectOptions) LeaveAllProxyResources(ctx context.Context) (err error)
return
}
v4, _ := c.GetLocalTunIP()
for _, workload := range c.ProxyResources() {
return c.LeaveResource(ctx, c.ProxyResources().ToResources(), v4)
}
func (c *ConnectOptions) LeaveResource(ctx context.Context, resources []Resources, v4 string) error {
var errs []error
for _, workload := range resources {
// deployments.apps.ry-server --> deployments.apps/ry-server
object, controller, err := util.GetTopOwnerObject(ctx, c.factory, workload.namespace, workload.workload)
object, controller, err := util.GetTopOwnerObject(ctx, c.factory, workload.Namespace, workload.Workload)
if err != nil {
plog.G(ctx).Errorf("Failed to get unstructured object: %v", err)
return err
errs = append(errs, err)
continue
}
nodeID := fmt.Sprintf("%s.%s", object.Mapping.Resource.GroupResource().String(), object.Name)
var empty bool
empty, err = inject.UnPatchContainer(ctx, nodeID, c.factory, c.clientset.CoreV1().ConfigMaps(c.Namespace), controller, func(isFargateMode bool, rule *controlplane.Rule) bool {
if isFargateMode {
return c.IsMe(workload.namespace, util.ConvertWorkloadToUid(workload.workload), rule.Headers)
return c.IsMe(workload.Namespace, util.ConvertWorkloadToUid(workload.Workload), rule.Headers)
}
return rule.LocalTunIPv4 == v4
})
if err != nil {
plog.G(ctx).Errorf("Failed to leave workload %s in namespace %s: %v", workload.workload, workload.namespace, err)
plog.G(ctx).Errorf("Failed to leave workload %s in namespace %s: %v", workload.Workload, workload.Namespace, err)
errs = append(errs, err)
continue
}
if empty && util.IsK8sService(object) {
err = inject.ModifyServiceTargetPort(ctx, c.clientset, workload.namespace, object.Name, map[int32]int32{})
err = inject.ModifyServiceTargetPort(ctx, c.clientset, workload.Namespace, object.Name, map[int32]int32{})
errs = append(errs, err)
}
c.LeavePortMap(workload.namespace, workload.workload)
c.LeavePortMap(workload.Namespace, workload.Workload)
}
return err
return errors.NewAggregate(errs)
}

View File

@@ -368,9 +368,7 @@ func AddSshFlags(flags *pflag.FlagSet, sshConf *SshConfig) {
flags.StringVar(&sshConf.GSSAPIPassword, "gssapi-password", "", "GSSAPI password")
flags.StringVar(&sshConf.GSSAPIKeytabConf, "gssapi-keytab", "", "GSSAPI keytab file path")
flags.StringVar(&sshConf.GSSAPICacheFile, "gssapi-cache", "", "GSSAPI cache file path, use command `kinit -c /path/to/cache USERNAME@RELAM` to generate")
flags.StringVar(&sshConf.RemoteKubeconfig, "remote-kubeconfig", "", "Remote kubeconfig abstract path of ssh server, default is /home/$USERNAME/.kube/config")
lookup := flags.Lookup("remote-kubeconfig")
lookup.NoOptDefVal = "~/.kube/config"
flags.StringVar(&sshConf.RemoteKubeconfig, "remote-kubeconfig", "", "Abstract path of kubeconfig on ssh remote server")
}
func keepAlive(cl *ssh.Client, conn net.Conn, done <-chan struct{}) error {

View File

@@ -17,16 +17,12 @@ import (
"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/spf13/pflag"
gossh "golang.org/x/crypto/ssh"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/clientcmd/api"
"k8s.io/client-go/tools/clientcmd/api/latest"
"k8s.io/kubectl/pkg/cmd/util"
"k8s.io/utils/pointer"
"github.com/wencaiwulue/kubevpn/v2/pkg/config"
plog "github.com/wencaiwulue/kubevpn/v2/pkg/log"
@@ -143,40 +139,14 @@ func PortMapUntil(ctx context.Context, conf *SshConfig, remote, local netip.Addr
return nil
}
func SshJump(ctx context.Context, conf *SshConfig, flags *pflag.FlagSet, print bool) (path string, err error) {
if conf.Addr == "" && conf.ConfigAlias == "" {
if flags != nil {
lookup := flags.Lookup("kubeconfig")
if lookup != nil {
if lookup.Value != nil && lookup.Value.String() != "" {
path = lookup.Value.String()
} else if lookup.DefValue != "" {
path = lookup.DefValue
} else {
path = lookup.NoOptDefVal
}
}
}
return
}
defer func() {
if er := recover(); er != nil {
err = er.(error)
}
}()
configFlags := genericclioptions.NewConfigFlags(true)
if conf.RemoteKubeconfig != "" || (flags != nil && flags.Changed("remote-kubeconfig")) {
func SshJump(ctx context.Context, conf *SshConfig, kubeconfig string, print bool) (path string, err error) {
var kubeconfigBytes []byte
if len(conf.RemoteKubeconfig) != 0 {
var stdout []byte
var stderr []byte
if len(conf.RemoteKubeconfig) != 0 && conf.RemoteKubeconfig[0] == '~' {
if conf.RemoteKubeconfig[0] == '~' {
conf.RemoteKubeconfig = filepath.Join("/home", conf.User, conf.RemoteKubeconfig[1:])
}
if conf.RemoteKubeconfig == "" {
// if `--remote-kubeconfig` is parsed then Entrypoint is reset
conf.RemoteKubeconfig = filepath.Join("/home", conf.User, clientcmd.RecommendedHomeDir, clientcmd.RecommendedFileName)
}
// pre-check network ip connect
var cli *gossh.Client
cli, err = DialSshRemote(ctx, conf, ctx.Done())
@@ -198,28 +168,20 @@ func SshJump(ctx context.Context, conf *SshConfig, flags *pflag.FlagSet, print b
err = errors.Errorf("can not get kubeconfig %s from remote ssh server: %s", conf.RemoteKubeconfig, string(stderr))
return
}
var file string
file, err = pkgutil.ConvertToTempKubeconfigFile(bytes.TrimSpace(stdout))
kubeconfigBytes = bytes.TrimSpace(stdout)
} else {
kubeconfigBytes, err = os.ReadFile(kubeconfig)
if err != nil {
return
}
configFlags.KubeConfig = pointer.String(file)
} else {
if flags != nil {
lookup := flags.Lookup("kubeconfig")
if lookup != nil {
if lookup.Value != nil && lookup.Value.String() != "" {
configFlags.KubeConfig = pointer.String(lookup.Value.String())
} else if lookup.DefValue != "" {
configFlags.KubeConfig = pointer.String(lookup.DefValue)
}
}
}
}
matchVersionFlags := util.NewMatchVersionFlags(configFlags)
var clientConfig clientcmd.ClientConfig
clientConfig, err = clientcmd.NewClientConfigFromBytes(kubeconfigBytes)
if err != nil {
return
}
var rawConfig api.Config
rawConfig, err = matchVersionFlags.ToRawKubeConfigLoader().RawConfig()
rawConfig, err = clientConfig.RawConfig()
if err != nil {
plog.G(ctx).WithError(err).Errorf("failed to build config: %v", err)
return
@@ -332,6 +294,11 @@ func SshJump(ctx context.Context, conf *SshConfig, flags *pflag.FlagSet, print b
plog.G(ctx).Errorf("failed to write kubeconfig: %v", err)
return
}
go func() {
<-ctx.Done()
_ = os.Remove(path)
_ = os.Remove(kubeconfig)
}()
if print {
plog.G(ctx).Infof("Use temp kubeconfig: %s", path)
} else {
@@ -340,11 +307,11 @@ func SshJump(ctx context.Context, conf *SshConfig, flags *pflag.FlagSet, print b
return
}
func SshJumpAndSetEnv(ctx context.Context, conf *SshConfig, flags *pflag.FlagSet, print bool) error {
if conf.Addr == "" && conf.ConfigAlias == "" {
func SshJumpAndSetEnv(ctx context.Context, sshConf *SshConfig, file string, print bool) error {
if sshConf.IsEmpty() {
return nil
}
path, err := SshJump(ctx, conf, flags, print)
path, err := SshJump(ctx, sshConf, file, print)
if err != nil {
return err
}

View File

@@ -11,6 +11,7 @@ import (
"google.golang.org/grpc"
"github.com/wencaiwulue/kubevpn/v2/pkg/daemon/rpc"
plog "github.com/wencaiwulue/kubevpn/v2/pkg/log"
)
@@ -18,13 +19,20 @@ type Printable interface {
GetMessage() string
}
func PrintGRPCStream[T any](clientStream grpc.ClientStream, writers ...io.Writer) error {
func PrintGRPCStream[T any](ctx context.Context, clientStream grpc.ClientStream, writers ...io.Writer) error {
var out io.Writer = os.Stdout
for _, writer := range writers {
out = writer
break
}
go func() {
if ctx != nil {
<-ctx.Done()
_ = clientStream.SendMsg(&rpc.Cancel{})
}
}()
for {
var t = new(T)
err := clientStream.RecvMsg(t)
@@ -88,3 +96,10 @@ func HandleCrash() {
panic(r)
}
}
func ListenCancel(resp grpc.ServerStream, cancelFunc context.CancelFunc) {
var s rpc.Cancel
if resp.RecvMsg(&s) == nil {
cancelFunc()
}
}