refactor: return error if get nil daemon client (#553)

This commit is contained in:
naison
2025-04-20 15:49:03 +08:00
committed by GitHub
parent 537b2940fe
commit dd80717d8d
24 changed files with 139 additions and 79 deletions

View File

@@ -143,7 +143,10 @@ func CmdClone(f cmdutil.Factory) *cobra.Command {
LocalDir: options.LocalDir,
RemoteDir: options.RemoteDir,
}
cli := daemon.GetClient(false)
cli, err := daemon.GetClient(false)
if err != nil {
return err
}
resp, err := cli.Clone(cmd.Context(), req)
if err != nil {
return err

View File

@@ -55,7 +55,10 @@ func cmdConfigAdd(f cmdutil.Factory) *cobra.Command {
Namespace: ns,
SshJump: sshConf.ToRPC(),
}
cli := daemon.GetClient(false)
cli, err := daemon.GetClient(false)
if err != nil {
return err
}
resp, err := cli.ConfigAdd(cmd.Context(), req)
if err != nil {
return err
@@ -88,8 +91,11 @@ func cmdConfigRemove(f cmdutil.Factory) *cobra.Command {
req := &rpc.ConfigRemoveRequest{
ClusterID: args[0],
}
cli := daemon.GetClient(false)
_, err := cli.ConfigRemove(cmd.Context(), req)
cli, err := daemon.GetClient(false)
if err != nil {
return err
}
_, err = cli.ConfigRemove(cmd.Context(), req)
if err != nil {
return err
}

View File

@@ -100,7 +100,10 @@ func CmdConnect(f cmdutil.Factory) *cobra.Command {
Level: int32(util.If(config.Debug, log.DebugLevel, log.InfoLevel)),
}
// if is foreground, send to sudo daemon server
cli := daemon.GetClient(false)
cli, err := daemon.GetClient(false)
if err != nil {
return err
}
var resp grpc.ClientStream
if lite {
resp, err = cli.ConnectFork(cmd.Context(), req)

View File

@@ -61,7 +61,11 @@ func CmdDisconnect(f cmdutil.Factory) *cobra.Command {
}
ids = pointer.Int32(int32(integer))
}
client, err := daemon.GetClient(false).Disconnect(
cli, err := daemon.GetClient(false)
if err != nil {
return err
}
client, err := cli.Disconnect(
cmd.Context(),
&rpc.DisconnectRequest{
ID: ids,

View File

@@ -49,7 +49,11 @@ func CmdGet(f cmdutil.Factory) *cobra.Command {
if err != nil {
return err
}
client, err := daemon.GetClient(true).Get(
cli, err := daemon.GetClient(true)
if err != nil {
return err
}
client, err := cli.Get(
cmd.Context(),
&rpc.GetRequest{
Namespace: ns,

View File

@@ -38,7 +38,11 @@ func CmdLeave(f cmdutil.Factory) *cobra.Command {
if err != nil {
return err
}
resp, err := daemon.GetClient(false).Leave(cmd.Context(), &rpc.LeaveRequest{
cli, err := daemon.GetClient(false)
if err != nil {
return err
}
resp, err := cli.Leave(cmd.Context(), &rpc.LeaveRequest{
Namespace: ns,
Workloads: args,
})

View File

@@ -25,7 +25,11 @@ func CmdList(f cmdutil.Factory) *cobra.Command {
return daemon.StartupDaemon(cmd.Context())
},
RunE: func(cmd *cobra.Command, args []string) error {
client, err := daemon.GetClient(true).List(
cli, err := daemon.GetClient(true)
if err != nil {
return err
}
client, err := cli.List(
cmd.Context(),
&rpc.ListRequest{},
)

View File

@@ -34,7 +34,11 @@ func CmdLogs(f cmdutil.Factory) *cobra.Command {
return daemon.StartupDaemon(cmd.Context())
},
RunE: func(cmd *cobra.Command, args []string) error {
client, err := daemon.GetClient(true).Logs(cmd.Context(), req)
cli, err := daemon.GetClient(true)
if err != nil {
return err
}
client, err := cli.Logs(cmd.Context(), req)
if err != nil {
return err
}

View File

@@ -122,8 +122,11 @@ func CmdProxy(f cmdutil.Factory) *cobra.Command {
}
}
// todo 将 doConnect 方法封装?内部使用 client 发送到daemon
cli := daemon.GetClient(false)
client, err := cli.Proxy(
cli, err := daemon.GetClient(false)
if err != nil {
return err
}
resp, err := cli.Proxy(
cmd.Context(),
&rpc.ProxyRequest{
KubeconfigBytes: string(bytes),
@@ -145,7 +148,7 @@ func CmdProxy(f cmdutil.Factory) *cobra.Command {
if err != nil {
return err
}
err = util.PrintGRPCStream[rpc.ConnectResponse](client)
err = util.PrintGRPCStream[rpc.ConnectResponse](resp)
if err != nil {
if status.Code(err) == codes.Canceled {
err = leave(cli, ns, args)

View File

@@ -40,9 +40,9 @@ func CmdQuit(f cmdutil.Factory) *cobra.Command {
}
func quit(ctx context.Context, isSudo bool) error {
cli := daemon.GetClient(isSudo)
if cli == nil {
return nil
cli, err := daemon.GetClient(isSudo)
if err != nil {
return err
}
client, err := cli.Quit(ctx, &rpc.QuitRequest{})
if err != nil {

View File

@@ -32,13 +32,17 @@ func CmdRemove(f cmdutil.Factory) *cobra.Command {
return daemon.StartupDaemon(cmd.Context())
},
RunE: func(cmd *cobra.Command, args []string) error {
leave, err := daemon.GetClient(false).Remove(cmd.Context(), &rpc.RemoveRequest{
cli, err := daemon.GetClient(false)
if err != nil {
return err
}
resp, err := cli.Remove(cmd.Context(), &rpc.RemoveRequest{
Workloads: args,
})
if err != nil {
return err
}
err = util.PrintGRPCStream[rpc.RemoveResponse](leave)
err = util.PrintGRPCStream[rpc.RemoveResponse](resp)
if err != nil {
if status.Code(err) == codes.Canceled {
return nil

View File

@@ -56,7 +56,10 @@ func CmdReset(f cmdutil.Factory) *cobra.Command {
if err != nil {
return err
}
cli := daemon.GetClient(false)
cli, err := daemon.GetClient(false)
if err != nil {
return err
}
req := &rpc.ResetRequest{
KubeconfigBytes: string(bytes),
Namespace: ns,

View File

@@ -31,7 +31,11 @@ func CmdSSHDaemon(cmdutil.Factory) *cobra.Command {
return err
},
RunE: func(cmd *cobra.Command, args []string) error {
client, err := daemon.GetClient(true).SshStart(
cli, err := daemon.GetClient(true)
if err != nil {
return err
}
resp, err := cli.SshStart(
cmd.Context(),
&rpc.SshStartRequest{
ClientIP: clientIP,
@@ -40,7 +44,7 @@ func CmdSSHDaemon(cmdutil.Factory) *cobra.Command {
if err != nil {
return err
}
_, err = fmt.Fprint(os.Stdout, client.ServerIP)
_, err = fmt.Fprint(os.Stdout, resp.ServerIP)
return err
},
}

View File

@@ -80,7 +80,11 @@ func CmdStatus(f cmdutil.Factory) *cobra.Command {
}
}
resp, err := daemon.GetClient(false).Status(
cli, err := daemon.GetClient(false)
if err != nil {
return err
}
resp, err := cli.Status(
cmd.Context(),
&rpc.StatusRequest{
ClusterIDs: clusterIDs,

View File

@@ -58,8 +58,11 @@ func CmdUninstall(f cmdutil.Factory) *cobra.Command {
if err != nil {
return err
}
cli := daemon.GetClient(false)
disconnect, err := cli.Disconnect(cmd.Context(), &rpc.DisconnectRequest{
cli, err := daemon.GetClient(false)
if err != nil {
return err
}
disconnectResp, err := cli.Disconnect(cmd.Context(), &rpc.DisconnectRequest{
KubeconfigBytes: ptr.To(string(bytes)),
Namespace: ptr.To(ns),
SshJump: sshConf.ToRPC(),
@@ -67,7 +70,7 @@ func CmdUninstall(f cmdutil.Factory) *cobra.Command {
if err != nil {
plog.G(cmd.Context()).Warnf("Failed to disconnect from cluter: %v", err)
} else {
_ = util.PrintGRPCStream[rpc.DisconnectResponse](disconnect)
_ = util.PrintGRPCStream[rpc.DisconnectResponse](disconnectResp)
}
req := &rpc.UninstallRequest{

View File

@@ -64,12 +64,13 @@ func init() {
}
func getDaemonVersion() string {
cli := daemon.GetClient(false)
if cli != nil {
version, err := cli.Version(context.Background(), &rpc.VersionRequest{})
if err == nil {
return version.Version
}
cli, err := daemon.GetClient(false)
if err != nil {
return "unknown"
}
return "unknown"
version, err := cli.Version(context.Background(), &rpc.VersionRequest{})
if err != nil {
return "unknown"
}
return version.Version
}

View File

@@ -30,7 +30,10 @@ func (svr *Server) Clone(req *rpc.CloneRequest, resp rpc.Daemon_CloneServer) (er
Level: req.Level,
OriginKubeconfigPath: req.OriginKubeconfigPath,
}
cli := svr.GetClient(false)
cli, err := svr.GetClient(false)
if err != nil {
return err
}
connResp, err := cli.Connect(resp.Context(), connReq)
if err != nil {
return err

View File

@@ -2,9 +2,9 @@ package action
import (
"context"
"fmt"
"io"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/spf13/pflag"
@@ -84,9 +84,9 @@ func (svr *Server) ConnectFork(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectF
}
func (svr *Server) redirectConnectForkToSudoDaemon(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectServer, logger *log.Logger) (err error) {
cli := svr.GetClient(true)
if cli == nil {
return fmt.Errorf("sudo daemon not start")
cli, err := svr.GetClient(true)
if err != nil {
return errors.Wrap(err, "sudo daemon not start")
}
var sshConf = ssh.ParseSshFromRPC(req.SshJump)
file, err := util.ConvertToTempKubeconfigFile([]byte(req.KubeconfigBytes))

View File

@@ -2,10 +2,10 @@ package action
import (
"context"
"fmt"
"io"
"time"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/spf13/pflag"
"google.golang.org/grpc/codes"
@@ -99,9 +99,9 @@ func (svr *Server) Connect(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectServe
}
func (svr *Server) redirectToSudoDaemon(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectServer, logger *log.Logger) (e error) {
cli := svr.GetClient(true)
if cli == nil {
return fmt.Errorf("sudo daemon not start")
cli, err := svr.GetClient(true)
if err != nil {
return errors.Wrap(err, "sudo daemon not start")
}
var sshConf = ssh.ParseSshFromRPC(req.SshJump)
file, err := util.ConvertToTempKubeconfigFile([]byte(req.KubeconfigBytes))

View File

@@ -2,10 +2,10 @@ package action
import (
"context"
"fmt"
"io"
"time"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/spf13/pflag"
"k8s.io/apimachinery/pkg/util/sets"
@@ -104,9 +104,9 @@ func (svr *Server) Disconnect(req *rpc.DisconnectRequest, resp rpc.Daemon_Discon
}
if !svr.IsSudo {
cli := svr.GetClient(true)
if cli == nil {
return fmt.Errorf("sudo daemon not start")
cli, err := svr.GetClient(true)
if err != nil {
return errors.Wrap(err, "sudo daemon not start")
}
connResp, err := cli.Disconnect(resp.Context(), req)
if err != nil {
@@ -149,8 +149,8 @@ func disconnectByKubeConfig(ctx context.Context, svr *Server, kubeconfigBytes st
}
func disconnect(ctx context.Context, svr *Server, connect *handler.ConnectOptions) {
client := svr.GetClient(false)
if client == nil {
_, err := svr.GetClient(false)
if err != nil {
return
}
if svr.connect != nil {

View File

@@ -2,9 +2,9 @@ package action
import (
"context"
"fmt"
"io"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/spf13/pflag"
"k8s.io/utils/ptr"
@@ -67,9 +67,9 @@ func (svr *Server) Proxy(req *rpc.ProxyRequest, resp rpc.Daemon_ProxyServer) (e
}
}()
daemonClient := svr.GetClient(false)
if daemonClient == nil {
return fmt.Errorf("daemon is not avaliable")
cli, err := svr.GetClient(false)
if err != nil {
return errors.Wrap(err, "daemon is not available")
}
connectNs, err := util.DetectConnectNamespace(ctx, connect.GetFactory(), req.ConnectNamespace)
@@ -92,7 +92,7 @@ func (svr *Server) Proxy(req *rpc.ProxyRequest, resp rpc.Daemon_ProxyServer) (e
} else {
plog.G(ctx).Infof("Disconnecting from another cluster...")
var disconnectResp rpc.Daemon_DisconnectClient
disconnectResp, err = daemonClient.Disconnect(ctx, &rpc.DisconnectRequest{
disconnectResp, err = cli.Disconnect(ctx, &rpc.DisconnectRequest{
ID: ptr.To[int32](0),
})
if err != nil {
@@ -114,7 +114,7 @@ func (svr *Server) Proxy(req *rpc.ProxyRequest, resp rpc.Daemon_ProxyServer) (e
if svr.connect == nil {
plog.G(ctx).Debugf("Connectting to cluster")
var connResp rpc.Daemon_ConnectClient
connResp, err = daemonClient.Connect(ctx, convert(req))
connResp, err = cli.Connect(ctx, convert(req))
if err != nil {
return err
}

View File

@@ -18,7 +18,7 @@ type Server struct {
rpc.UnimplementedDaemonServer
Cancel func()
GetClient func(isSudo bool) rpc.DaemonClient
GetClient func(isSudo bool) (rpc.DaemonClient, error)
IsSudo bool
LogFile *lumberjack.Logger
Lock sync.Mutex

View File

@@ -9,6 +9,7 @@ import (
"os"
"time"
pkgerr "github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/health/grpc_health_v1"
@@ -23,22 +24,22 @@ import (
var daemonClient, sudoDaemonClient rpc.DaemonClient
func GetClient(isSudo bool) (cli rpc.DaemonClient) {
func GetClient(isSudo bool) (cli rpc.DaemonClient, err error) {
sockPath := config.GetSockPath(isSudo)
if _, err := os.Stat(sockPath); errors.Is(err, os.ErrNotExist) {
return nil
if _, err = os.Stat(sockPath); errors.Is(err, os.ErrNotExist) {
return nil, err
}
if isSudo && sudoDaemonClient != nil {
return sudoDaemonClient
return sudoDaemonClient, nil
}
if !isSudo && daemonClient != nil {
return daemonClient
return daemonClient, nil
}
ctx := context.Background()
conn, err := grpc.DialContext(ctx, "unix:"+sockPath, grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := grpc.NewClient("unix:"+sockPath, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil
return nil, err
}
defer func() {
if cli == nil {
@@ -50,10 +51,10 @@ func GetClient(isSudo bool) (cli rpc.DaemonClient) {
var response *grpc_health_v1.HealthCheckResponse
response, err = healthClient.Check(ctx, &grpc_health_v1.HealthCheckRequest{})
if err != nil {
return nil
return nil, err
}
if response.Status != grpc_health_v1.HealthCheckResponse_SERVING {
return nil
return nil, fmt.Errorf("health check failed: %v", response.Status)
}
cli = rpc.NewDaemonClient(conn)
@@ -66,7 +67,7 @@ func GetClient(isSudo bool) (cli rpc.DaemonClient) {
var quitStream rpc.Daemon_QuitClient
quitStream, err = cli.Quit(ctx, &rpc.QuitRequest{})
if err != nil {
return nil
return nil, err
}
err = util.PrintGRPCStream[rpc.QuitResponse](quitStream, nil)
return
@@ -77,7 +78,7 @@ func GetClient(isSudo bool) (cli rpc.DaemonClient) {
} else {
daemonClient = cli
}
return cli
return cli, nil
}
func GetClientWithoutCache(ctx context.Context, isSudo bool) (cli rpc.DaemonClient, conn *grpc.ClientConn, err error) {
@@ -86,7 +87,7 @@ func GetClientWithoutCache(ctx context.Context, isSudo bool) (cli rpc.DaemonClie
if errors.Is(err, os.ErrNotExist) {
return
}
conn, err = grpc.DialContext(ctx, "unix:"+sockPath, grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err = grpc.NewClient("unix:"+sockPath, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return
}
@@ -102,6 +103,7 @@ func GetClientWithoutCache(ctx context.Context, isSudo bool) (cli rpc.DaemonClie
return
}
if response.Status != grpc_health_v1.HealthCheckResponse_SERVING {
err = fmt.Errorf("health check failed: %v", response.Status)
return
}
cli = rpc.NewDaemonClient(conn)
@@ -120,14 +122,14 @@ func StartupDaemon(ctx context.Context, path ...string) error {
return err
}
// normal daemon
if daemonClient = GetClient(false); daemonClient == nil {
if daemonClient, err = GetClient(false); daemonClient == nil {
if err = runDaemon(ctx, exe, false); err != nil {
return err
}
}
// sudo daemon
if sudoDaemonClient = GetClient(true); sudoDaemonClient == nil {
if sudoDaemonClient, err = GetClient(true); sudoDaemonClient == nil {
if err = runDaemon(ctx, exe, true); err != nil {
return err
}
@@ -136,13 +138,13 @@ func StartupDaemon(ctx context.Context, path ...string) error {
}
func runDaemon(ctx context.Context, exe string, isSudo bool) error {
cli := GetClient(isSudo)
if cli != nil {
_, err := GetClient(isSudo)
if err == nil {
return nil
}
pidPath := config.GetPidPath(isSudo)
err := os.Remove(pidPath)
err = os.Remove(pidPath)
if err != nil && !errors.Is(err, os.ErrNotExist) {
return err
}
@@ -166,9 +168,9 @@ func runDaemon(ctx context.Context, exe string, isSudo bool) error {
}
}
client := GetClient(isSudo)
if client == nil {
return fmt.Errorf("failed to get daemon server client")
_, err = GetClient(isSudo)
if err != nil {
return pkgerr.Wrap(err, "failed to get daemon server client")
}
return nil
}

View File

@@ -12,6 +12,7 @@ import (
typescontainer "github.com/docker/docker/api/types/container"
"github.com/docker/go-connections/nat"
"github.com/google/uuid"
pkgerr "github.com/pkg/errors"
log "github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@@ -91,9 +92,9 @@ func (option *Options) Main(ctx context.Context, sshConfig *pkgssh.SshConfig, co
// Connect to cluster network on docker container or host
func (option *Options) Connect(ctx context.Context, sshConfig *pkgssh.SshConfig, imagePullSecretName string, portBindings nat.PortMap, connectNamespace string) error {
if option.ConnectMode == ConnectModeHost {
daemonCli := daemon.GetClient(false)
if daemonCli == nil {
return fmt.Errorf("get nil daemon client")
cli, err := daemon.GetClient(false)
if err != nil {
return pkgerr.Wrap(err, "get nil daemon client")
}
kubeConfigBytes, ns, err := util.ConvertToKubeConfigBytes(option.factory)
if err != nil {
@@ -121,7 +122,7 @@ func (option *Options) Connect(ctx context.Context, sshConfig *pkgssh.SshConfig,
ConnectNamespace: connectNamespace,
}
option.AddRollbackFunc(func() error {
resp, err := daemonCli.Disconnect(ctx, &rpc.DisconnectRequest{
resp, err := cli.Disconnect(ctx, &rpc.DisconnectRequest{
KubeconfigBytes: ptr.To(string(kubeConfigBytes)),
Namespace: ptr.To(ns),
SshJump: sshConfig.ToRPC(),
@@ -133,7 +134,7 @@ func (option *Options) Connect(ctx context.Context, sshConfig *pkgssh.SshConfig,
return nil
})
var resp rpc.Daemon_ProxyClient
resp, err = daemonCli.Proxy(ctx, req)
resp, err = cli.Proxy(ctx, req)
if err != nil {
plog.G(ctx).Errorf("Connect to cluster error: %s", err.Error())
return err