diff --git a/cmd/kubevpn/cmds/connect.go b/cmd/kubevpn/cmds/connect.go index be48a2e3..2511c4aa 100644 --- a/cmd/kubevpn/cmds/connect.go +++ b/cmd/kubevpn/cmds/connect.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "os" "time" log "github.com/sirupsen/logrus" @@ -68,18 +69,18 @@ func CmdConnect(f cmdutil.Factory) *cobra.Command { Level: int32(log.DebugLevel), } cli := daemon.GetClient(false) - stream, err := cli.Connect(cmd.Context(), req) + resp, err := cli.Connect(cmd.Context(), req) if err != nil { return err } for { - resp, err := stream.Recv() + recv, err := resp.Recv() if err == io.EOF { break } else if err != nil { return err } - log.Print(resp.GetMessage()) + log.Print(recv.GetMessage()) } // hangup if foreground { @@ -97,11 +98,10 @@ func CmdConnect(f cmdutil.Factory) *cobra.Command { resp, err = stream.Recv() if err == io.EOF { return nil - } else if err == nil { - fmt.Print(resp.Message) - } else { + } else if err != nil { return err } + fmt.Fprint(os.Stdout, resp.Message) } } return nil diff --git a/cmd/kubevpn/cmds/daemon.go b/cmd/kubevpn/cmds/daemon.go index 22a4719b..a8e61f89 100644 --- a/cmd/kubevpn/cmds/daemon.go +++ b/cmd/kubevpn/cmds/daemon.go @@ -2,11 +2,12 @@ package cmds import ( "errors" + "os" + "strconv" + "github.com/spf13/cobra" cmdutil "k8s.io/kubectl/pkg/cmd/util" "k8s.io/kubectl/pkg/util/i18n" - "os" - "strconv" "github.com/wencaiwulue/kubevpn/pkg/daemon" ) diff --git a/cmd/kubevpn/cmds/dev.go b/cmd/kubevpn/cmds/dev.go index a77c141d..5e4a8939 100644 --- a/cmd/kubevpn/cmds/dev.go +++ b/cmd/kubevpn/cmds/dev.go @@ -72,14 +72,15 @@ Startup your kubernetes workloads in local Docker container with same volume态e Args: dockercli.RequiresMinArgs(1), DisableFlagsInUseLine: true, PreRunE: func(cmd *cobra.Command, args []string) error { - err = daemon.StartupDaemon(cmd.Context()) - if err != nil { - return err - } // not support temporally if devOptions.Engine == config.EngineGvisor { return fmt.Errorf(`not support type engine: %s, support ("%s"|"%s")`, config.EngineGvisor, config.EngineMix, config.EngineRaw) } + + err = daemon.StartupDaemon(cmd.Context()) + if err != nil { + return err + } return handler.SshJump(cmd.Context(), sshConf, cmd.Flags()) }, RunE: func(cmd *cobra.Command, args []string) error { diff --git a/cmd/kubevpn/cmds/disconnect.go b/cmd/kubevpn/cmds/disconnect.go index f44da4dd..bf6e1a80 100644 --- a/cmd/kubevpn/cmds/disconnect.go +++ b/cmd/kubevpn/cmds/disconnect.go @@ -3,6 +3,7 @@ package cmds import ( "fmt" "io" + "os" "time" "github.com/spf13/cobra" @@ -42,7 +43,7 @@ func CmdDisconnect(f cmdutil.Factory) *cobra.Command { if err == io.EOF { return nil } else if err == nil { - fmt.Print(resp.Message) + fmt.Fprint(os.Stdout, resp.Message) } else { return err } diff --git a/cmd/kubevpn/cmds/logs.go b/cmd/kubevpn/cmds/logs.go index 341902fd..7d0af0a0 100644 --- a/cmd/kubevpn/cmds/logs.go +++ b/cmd/kubevpn/cmds/logs.go @@ -14,6 +14,7 @@ import ( ) func CmdLogs(f cmdutil.Factory) *cobra.Command { + req := &rpc.LogRequest{} cmd := &cobra.Command{ Use: "logs", Short: i18n.T("Logs to kubernetes cluster network"), @@ -24,10 +25,7 @@ func CmdLogs(f cmdutil.Factory) *cobra.Command { return daemon.StartupDaemon(cmd.Context()) }, RunE: func(cmd *cobra.Command, args []string) error { - client, err := daemon.GetClient(true).Logs( - cmd.Context(), - &rpc.LogRequest{}, - ) + client, err := daemon.GetClient(true).Logs(cmd.Context(), req) if err != nil { return err } @@ -45,5 +43,6 @@ func CmdLogs(f cmdutil.Factory) *cobra.Command { return nil }, } + cmd.Flags().BoolVarP(&req.Follow, "follow", "f", false, "Specify if the logs should be streamed.") return cmd } diff --git a/cmd/kubevpn/cmds/proxy.go b/cmd/kubevpn/cmds/proxy.go index a7d12959..e88d6aba 100644 --- a/cmd/kubevpn/cmds/proxy.go +++ b/cmd/kubevpn/cmds/proxy.go @@ -110,7 +110,7 @@ func CmdProxy(f cmdutil.Factory) *cobra.Command { if err == io.EOF { break } else if err == nil { - log.Print(resp.Message) + fmt.Fprint(os.Stdout, resp.Message) } else { return err } diff --git a/cmd/kubevpn/cmds/quit.go b/cmd/kubevpn/cmds/quit.go index d17772f9..ea9be665 100644 --- a/cmd/kubevpn/cmds/quit.go +++ b/cmd/kubevpn/cmds/quit.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "os" "github.com/spf13/cobra" cmdutil "k8s.io/kubectl/pkg/cmd/util" @@ -47,7 +48,7 @@ func quit(ctx context.Context, isSudo bool) error { if err == io.EOF { break } else if err == nil { - fmt.Print(resp.Message) + fmt.Fprint(os.Stdout, resp.Message) } else { return err } diff --git a/cmd/kubevpn/cmds/upgrade.go b/cmd/kubevpn/cmds/upgrade.go index c1a755c5..8e811f81 100644 --- a/cmd/kubevpn/cmds/upgrade.go +++ b/cmd/kubevpn/cmds/upgrade.go @@ -1,7 +1,9 @@ package cmds import ( + "fmt" "net/http" + "os" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -32,7 +34,7 @@ func CmdUpgrade(_ cmdutil.Factory) *cobra.Command { if err != nil { log.Fatal(err) } - println("Done") + fmt.Fprint(os.Stdout, "Done") }, } return cmd diff --git a/go.mod b/go.mod index 8bdf726e..7b299523 100644 --- a/go.mod +++ b/go.mod @@ -42,6 +42,7 @@ require ( github.com/google/gopacket v1.1.19 github.com/google/uuid v1.3.0 github.com/hashicorp/go-version v1.6.0 + github.com/hpcloud/tail v1.0.0 github.com/kevinburke/ssh_config v1.2.0 github.com/libp2p/go-netroute v0.2.1 github.com/mattbaird/jsonpatch v0.0.0-20200820163806-098863c1fc24 @@ -200,7 +201,9 @@ require ( google.golang.org/api v0.109.0 // indirect google.golang.org/genproto v0.0.0-20230113154510-dbe35b8444a5 // indirect gopkg.in/DataDog/dd-trace-go.v1 v1.47.0 // indirect + gopkg.in/fsnotify.v1 v1.4.7 // indirect gopkg.in/inf.v0 v0.9.1 // indirect + gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect inet.af/netaddr v0.0.0-20220617031823-097006376321 // indirect k8s.io/apiextensions-apiserver v0.26.3 // indirect diff --git a/go.sum b/go.sum index eaaa3d5a..a608227b 100644 --- a/go.sum +++ b/go.sum @@ -613,6 +613,7 @@ github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ github.com/hashicorp/golang-lru v0.5.3/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= +github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/iancoleman/strcase v0.2.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= @@ -1538,6 +1539,7 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntN gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/cheggaaa/pb.v1 v1.0.25/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qStrOgw= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= +gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2/go.mod h1:Xk6kEKp8OKb+X14hQBKWaSkCsqBpgog8nAV2xsGOxlo= gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= @@ -1549,6 +1551,7 @@ gopkg.in/rethinkdb/rethinkdb-go.v6 v6.2.1/go.mod h1:WbjuEoo1oadwzQ4apSDU+JTvmllE gopkg.in/square/go-jose.v2 v2.2.2/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI= gopkg.in/square/go-jose.v2 v2.3.1/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI= gopkg.in/square/go-jose.v2 v2.5.1/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/pkg/daemon/action/connect.go b/pkg/daemon/action/connect.go index dad1847f..61cba6d1 100644 --- a/pkg/daemon/action/connect.go +++ b/pkg/daemon/action/connect.go @@ -76,8 +76,8 @@ func (svr *Server) Connect(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectServe if !svr.IsSudo { return svr.redirectToSudoDaemon(req, resp) } - ctx := resp.Context() + ctx := resp.Context() out := newWarp(resp) file, err := os.OpenFile(GetDaemonLog(), os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666) if err != nil { @@ -118,7 +118,7 @@ func (svr *Server) Connect(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectServe go util.StartupPProf(config.PProfPort) defaultlog.Default().SetOutput(io.Discard) if transferImage { - err = util.TransferImage(ctx, sshConf, config.OriginImage, req.Image) + err = util.TransferImage(ctx, sshConf, config.OriginImage, req.Image, io.MultiWriter(out, file)) if err != nil { return err } @@ -155,7 +155,7 @@ func (svr *Server) Connect(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectServe config.Image = req.Image err = svr.connect.DoConnect(sshCtx) if err != nil { - log.Errorln(err) + log.Error(err) svr.connect.Cleanup() return err } diff --git a/pkg/daemon/action/disconnect.go b/pkg/daemon/action/disconnect.go index 8d1e6a05..51d207f8 100644 --- a/pkg/daemon/action/disconnect.go +++ b/pkg/daemon/action/disconnect.go @@ -10,11 +10,11 @@ import ( "github.com/wencaiwulue/kubevpn/pkg/daemon/rpc" ) -type DisconnectWarp struct { +type disconnectWarp struct { server rpc.Daemon_DisconnectServer } -func (r *DisconnectWarp) Write(p []byte) (n int, err error) { +func (r *disconnectWarp) Write(p []byte) (n int, err error) { err = r.server.Send(&rpc.DisconnectResponse{ Message: string(p), }) @@ -22,7 +22,7 @@ func (r *DisconnectWarp) Write(p []byte) (n int, err error) { } func newDisconnectWarp(server rpc.Daemon_DisconnectServer) io.Writer { - return &DisconnectWarp{server: server} + return &disconnectWarp{server: server} } func (svr *Server) Disconnect(req *rpc.DisconnectRequest, resp rpc.Daemon_DisconnectServer) error { diff --git a/pkg/daemon/action/leave.go b/pkg/daemon/action/leave.go index 3437b249..2a840017 100644 --- a/pkg/daemon/action/leave.go +++ b/pkg/daemon/action/leave.go @@ -2,10 +2,12 @@ package action import ( "fmt" + "io" + log "github.com/sirupsen/logrus" + "github.com/wencaiwulue/kubevpn/pkg/daemon/rpc" "github.com/wencaiwulue/kubevpn/pkg/handler" - "io" ) type leaveWarp struct { diff --git a/pkg/daemon/action/list.go b/pkg/daemon/action/list.go index 1f328924..6751b9a5 100644 --- a/pkg/daemon/action/list.go +++ b/pkg/daemon/action/list.go @@ -7,7 +7,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/yaml" - yaml2 "sigs.k8s.io/yaml" + k8syaml "sigs.k8s.io/yaml" "github.com/wencaiwulue/kubevpn/pkg/config" "github.com/wencaiwulue/kubevpn/pkg/controlplane" @@ -34,7 +34,7 @@ func (svr *Server) List(ctx context.Context, req *rpc.ListRequest) (*rpc.ListRes lastIndex := strings.LastIndex(virtual.Uid, ".") virtual.Uid = virtual.Uid[:lastIndex] + "/" + virtual.Uid[lastIndex+1:] } - bytes, err := yaml2.Marshal(v) + bytes, err := k8syaml.Marshal(v) if err != nil { return nil, err } diff --git a/pkg/daemon/action/logs.go b/pkg/daemon/action/logs.go index ddf1fd0f..8154fa1d 100644 --- a/pkg/daemon/action/logs.go +++ b/pkg/daemon/action/logs.go @@ -1,36 +1,30 @@ package action import ( - "io" - - log "github.com/sirupsen/logrus" + "github.com/hpcloud/tail" "github.com/wencaiwulue/kubevpn/pkg/daemon/rpc" ) -type logWarp struct { - server rpc.Daemon_LogsServer -} - -func (r *logWarp) Write(p []byte) (n int, err error) { - err = r.server.Send(&rpc.LogResponse{ - Message: string(p), - }) - return len(p), err -} - -func newLogWarp(server rpc.Daemon_LogsServer) io.Writer { - return &logWarp{server: server} -} - func (svr *Server) Logs(req *rpc.LogRequest, resp rpc.Daemon_LogsServer) error { - out := newLogWarp(resp) - origin := log.StandardLogger().Out - defer func() { - log.SetOutput(origin) - }() - multiWriter := io.MultiWriter(origin, out) - log.SetOutput(multiWriter) - <-resp.Context().Done() - return nil + path := GetDaemonLog() + config := tail.Config{Follow: true, ReOpen: true, MustExist: true} + file, err := tail.TailFile(path, config) + if err != nil { + return err + } + for { + select { + case <-resp.Context().Done(): + return nil + case line := <-file.Lines: + if line.Err != nil { + return err + } + err = resp.Send(&rpc.LogResponse{Message: line.Text}) + if err != nil { + return err + } + } + } } diff --git a/pkg/daemon/action/proxy.go b/pkg/daemon/action/proxy.go index e799fcec..b2b37a19 100644 --- a/pkg/daemon/action/proxy.go +++ b/pkg/daemon/action/proxy.go @@ -3,12 +3,12 @@ package action import ( "context" "fmt" - "github.com/spf13/pflag" "io" "os" "time" log "github.com/sirupsen/logrus" + "github.com/spf13/pflag" "github.com/wencaiwulue/kubevpn/pkg/config" "github.com/wencaiwulue/kubevpn/pkg/daemon/rpc" diff --git a/pkg/daemon/rpc/daemon.pb.go b/pkg/daemon/rpc/daemon.pb.go index 2eb460b9..be1c12d7 100644 --- a/pkg/daemon/rpc/daemon.pb.go +++ b/pkg/daemon/rpc/daemon.pb.go @@ -428,6 +428,8 @@ type LogRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields + + Follow bool `protobuf:"varint,1,opt,name=Follow,proto3" json:"Follow,omitempty"` } func (x *LogRequest) Reset() { @@ -462,6 +464,13 @@ func (*LogRequest) Descriptor() ([]byte, []int) { return file_daemon_proto_rawDescGZIP(), []int{6} } +func (x *LogRequest) GetFollow() bool { + if x != nil { + return x.Follow + } + return false +} + type LogResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -827,52 +836,54 @@ var file_daemon_proto_rawDesc = []byte{ 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x22, 0x2a, 0x0a, 0x0e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, - 0x61, 0x67, 0x65, 0x22, 0x0c, 0x0a, 0x0a, 0x4c, 0x6f, 0x67, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x22, 0x27, 0x0a, 0x0b, 0x4c, 0x6f, 0x67, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x13, 0x0a, 0x11, 0x44, 0x69, - 0x73, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, - 0x2e, 0x0a, 0x12, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, - 0x0d, 0x0a, 0x0b, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x28, - 0x0a, 0x0c, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, - 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x2c, 0x0a, 0x0c, 0x4c, 0x65, 0x61, 0x76, - 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x57, 0x6f, 0x72, 0x6b, - 0x6c, 0x6f, 0x61, 0x64, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x09, 0x57, 0x6f, 0x72, - 0x6b, 0x6c, 0x6f, 0x61, 0x64, 0x73, 0x22, 0x29, 0x0a, 0x0d, 0x4c, 0x65, 0x61, 0x76, 0x65, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, - 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, - 0x65, 0x32, 0xb5, 0x03, 0x0a, 0x06, 0x44, 0x61, 0x65, 0x6d, 0x6f, 0x6e, 0x12, 0x38, 0x0a, 0x07, - 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x12, 0x13, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x43, 0x6f, - 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x72, - 0x70, 0x63, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x12, 0x36, 0x0a, 0x05, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x12, + 0x61, 0x67, 0x65, 0x22, 0x24, 0x0a, 0x0a, 0x4c, 0x6f, 0x67, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x12, 0x16, 0x0a, 0x06, 0x46, 0x6f, 0x6c, 0x6c, 0x6f, 0x77, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x08, 0x52, 0x06, 0x46, 0x6f, 0x6c, 0x6c, 0x6f, 0x77, 0x22, 0x27, 0x0a, 0x0b, 0x4c, 0x6f, 0x67, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, + 0x67, 0x65, 0x22, 0x13, 0x0a, 0x11, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x2e, 0x0a, 0x12, 0x44, 0x69, 0x73, 0x63, 0x6f, + 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, 0x0a, + 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, + 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x0d, 0x0a, 0x0b, 0x4c, 0x69, 0x73, 0x74, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x28, 0x0a, 0x0c, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, + 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, + 0x22, 0x2c, 0x0a, 0x0c, 0x4c, 0x65, 0x61, 0x76, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x12, 0x1c, 0x0a, 0x09, 0x57, 0x6f, 0x72, 0x6b, 0x6c, 0x6f, 0x61, 0x64, 0x73, 0x18, 0x01, 0x20, + 0x03, 0x28, 0x09, 0x52, 0x09, 0x57, 0x6f, 0x72, 0x6b, 0x6c, 0x6f, 0x61, 0x64, 0x73, 0x22, 0x29, + 0x0a, 0x0d, 0x4c, 0x65, 0x61, 0x76, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, + 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x32, 0xb5, 0x03, 0x0a, 0x06, 0x44, 0x61, + 0x65, 0x6d, 0x6f, 0x6e, 0x12, 0x38, 0x0a, 0x07, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x12, 0x13, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, - 0x63, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x12, 0x41, - 0x0a, 0x0a, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x12, 0x16, 0x2e, 0x72, - 0x70, 0x63, 0x2e, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x44, 0x69, 0x73, 0x63, 0x6f, - 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, - 0x01, 0x12, 0x2d, 0x0a, 0x04, 0x4c, 0x6f, 0x67, 0x73, 0x12, 0x0f, 0x2e, 0x72, 0x70, 0x63, 0x2e, - 0x4c, 0x6f, 0x67, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x10, 0x2e, 0x72, 0x70, 0x63, - 0x2e, 0x4c, 0x6f, 0x67, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, - 0x12, 0x33, 0x0a, 0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x12, 0x2e, 0x72, 0x70, 0x63, - 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x13, - 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x2f, 0x0a, 0x04, 0x51, 0x75, 0x69, 0x74, 0x12, 0x10, 0x2e, - 0x72, 0x70, 0x63, 0x2e, 0x51, 0x75, 0x69, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, - 0x11, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x51, 0x75, 0x69, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x12, 0x2d, 0x0a, 0x04, 0x4c, 0x69, 0x73, 0x74, 0x12, 0x10, - 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x1a, 0x11, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x32, 0x0a, 0x05, 0x4c, 0x65, 0x61, 0x76, 0x65, 0x12, 0x11, - 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x4c, 0x65, 0x61, 0x76, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x1a, 0x12, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x4c, 0x65, 0x61, 0x76, 0x65, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x42, 0x07, 0x5a, 0x05, 0x2e, 0x3b, 0x72, - 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x63, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x12, 0x36, + 0x0a, 0x05, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x12, 0x13, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x43, 0x6f, + 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x72, + 0x70, 0x63, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x12, 0x41, 0x0a, 0x0a, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x6e, + 0x6e, 0x65, 0x63, 0x74, 0x12, 0x16, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x44, 0x69, 0x73, 0x63, 0x6f, + 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x72, + 0x70, 0x63, 0x2e, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x12, 0x2d, 0x0a, 0x04, 0x4c, 0x6f, 0x67, + 0x73, 0x12, 0x0f, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x4c, 0x6f, 0x67, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x1a, 0x10, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x4c, 0x6f, 0x67, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x12, 0x33, 0x0a, 0x06, 0x53, 0x74, 0x61, 0x74, + 0x75, 0x73, 0x12, 0x12, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x13, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x53, 0x74, 0x61, + 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x2f, 0x0a, + 0x04, 0x51, 0x75, 0x69, 0x74, 0x12, 0x10, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x51, 0x75, 0x69, 0x74, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x11, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x51, 0x75, + 0x69, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x12, 0x2d, + 0x0a, 0x04, 0x4c, 0x69, 0x73, 0x74, 0x12, 0x10, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x4c, 0x69, 0x73, + 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x11, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x4c, + 0x69, 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x32, 0x0a, + 0x05, 0x4c, 0x65, 0x61, 0x76, 0x65, 0x12, 0x11, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x4c, 0x65, 0x61, + 0x76, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x12, 0x2e, 0x72, 0x70, 0x63, 0x2e, + 0x4c, 0x65, 0x61, 0x76, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, + 0x01, 0x42, 0x07, 0x5a, 0x05, 0x2e, 0x3b, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, } var ( diff --git a/pkg/daemon/rpc/daemon.proto b/pkg/daemon/rpc/daemon.proto index 0f5af02d..7fe7b71f 100644 --- a/pkg/daemon/rpc/daemon.proto +++ b/pkg/daemon/rpc/daemon.proto @@ -60,6 +60,7 @@ message StatusResponse { } message LogRequest { + bool Follow = 1; } message LogResponse { diff --git a/pkg/dev/main.go b/pkg/dev/main.go index dd6d89eb..9a5a6688 100644 --- a/pkg/dev/main.go +++ b/pkg/dev/main.go @@ -386,13 +386,13 @@ func DoDev(ctx context.Context, devOption *Options, flags *pflag.FlagSet, f cmdu } // connect to cluster, in container or host - finalFunc, err := devOption.doConnect(ctx, f, transferImage) + cancel, err := devOption.doConnect(ctx, f, transferImage) if err != nil { return err } defer func() { - if finalFunc != nil { - finalFunc() + if cancel != nil { + cancel() } }() @@ -416,14 +416,16 @@ func DoDev(ctx context.Context, devOption *Options, flags *pflag.FlagSet, f cmdu if err != nil { return err } - if err = validateAPIVersion(tempContainerConfig, dockerCli.Client().ClientVersion()); err != nil { + err = validateAPIVersion(tempContainerConfig, dockerCli.Client().ClientVersion()) + if err != nil { return err } return devOption.Main(ctx, tempContainerConfig) } -func (d *Options) doConnect(ctx context.Context, f cmdutil.Factory, transferImage bool) (ff func(), err error) { +// connect to cluster network on docker container or host +func (d *Options) doConnect(ctx context.Context, f cmdutil.Factory, transferImage bool) (cancel func(), err error) { connect := &handler.ConnectOptions{ Headers: d.Headers, Workloads: []string{d.Workload}, @@ -457,30 +459,15 @@ func (d *Options) doConnect(ctx context.Context, f cmdutil.Factory, transferImag switch d.ConnectMode { case ConnectModeHost: - daemonClient := daemon.GetClient(true) - ff = func() { - connectClient, err := daemonClient.Disconnect(ctx, &rpc.DisconnectRequest{}) - if err != nil { - log.Error(err) - return - } - for { - recv, err := connectClient.Recv() - if err == io.EOF { - return - } else if err != nil { - log.Error(err) - return - } - log.Print(recv.Message) - } - } + daemonCli := daemon.GetClient(false) var kubeconfig []byte var ns string kubeconfig, ns, err = util.ConvertToKubeconfigBytes(f) if err != nil { return } + // not needs to ssh jump in daemon, because dev mode will hang up until user exit, + // so just ssh jump in client is enough req := &rpc.ConnectRequest{ KubeconfigBytes: string(kubeconfig), Namespace: ns, @@ -494,71 +481,92 @@ func (d *Options) doConnect(ctx context.Context, f cmdutil.Factory, transferImag Image: config.Image, Level: int32(log.DebugLevel), } - var connectClient rpc.Daemon_ConnectClient - connectClient, err = daemonClient.Connect(ctx, req) + cancel = disconnect(ctx, daemonCli) + var resp rpc.Daemon_ConnectClient + resp, err = daemonCli.Connect(ctx, req) if err != nil { return } for { - recv, err := connectClient.Recv() + recv, err := resp.Recv() if err == io.EOF { - return ff, nil + return cancel, nil } else if err != nil { - return ff, err + return cancel, err } log.Print(recv.Message) } case ConnectModeContainer: - var connectContainer *RunConfig var path string path, err = connect.GetKubeconfigPath() if err != nil { return } - var platform *specs.Platform + var platform specs.Platform if d.Options.Platform != "" { - var p specs.Platform - p, err = platforms.Parse(d.Options.Platform) + platform, err = platforms.Parse(d.Options.Platform) if err != nil { return nil, pkgerr.Wrap(err, "error parsing specified platform") } - platform = &p } - connectContainer, err = createConnectContainer(d.NoProxy, *connect, path, d.Cli, platform) + + var connectContainer *RunConfig + connectContainer, err = createConnectContainer(d.NoProxy, *connect, path, d.Cli, &platform) if err != nil { return } - ctx, cancel := context.WithCancel(ctx) - defer cancel() + cancelCtx, cancelFunc := context.WithCancel(ctx) + defer cancelFunc() var id string - id, err = run(ctx, connectContainer, d.Cli, d.DockerCli) + id, err = run(cancelCtx, connectContainer, d.Cli, d.DockerCli) if err != nil { return } - h := interrupt.New(func(signal os.Signal) { - os.Exit(0) - }, func() { - cancel() - _ = d.Cli.ContainerKill(ctx, id, "SIGTERM") - _ = runLogsSinceNow(d.DockerCli, id) - }) + h := interrupt.New( + func(signal os.Signal) { return }, + func() { + cancelFunc() + _ = d.Cli.ContainerKill(context.Background(), id, "SIGTERM") + _ = runLogsSinceNow(d.DockerCli, id) + }, + ) go h.Run(func() error { select {} }) defer h.Close() - if err = runLogsWaitRunning(ctx, d.DockerCli, id); err != nil { + err = runLogsWaitRunning(cancelCtx, d.DockerCli, id) + if err != nil { // interrupt by signal KILL - if ctx.Err() == context.Canceled { - return nil, nil + if errors.Is(err, context.Canceled) { + err = nil + return } - return nil, err - } - if err = d.Copts.netMode.Set("container:" + id); err != nil { - return nil, err + return } + err = d.Copts.netMode.Set(fmt.Sprintf("container:%s", id)) + return default: return nil, fmt.Errorf("unsupport connect mode: %s", d.ConnectMode) } - return nil, nil +} + +func disconnect(ctx context.Context, daemonClient rpc.DaemonClient) func() { + return func() { + resp, err := daemonClient.Disconnect(ctx, &rpc.DisconnectRequest{}) + if err != nil { + log.Error(err) + return + } + for { + msg, err := resp.Recv() + if err == io.EOF { + return + } else if err != nil { + log.Error(err) + return + } + log.Print(msg.Message) + } + } } func createConnectContainer(devOptions bool, connect handler.ConnectOptions, path string, cli *client.Client, platform *specs.Platform) (*RunConfig, error) { @@ -675,35 +683,37 @@ func runLogsWaitRunning(ctx context.Context, dockerCli command.Cli, container st ShowStderr: true, Follow: true, } - responseBody, err := dockerCli.Client().ContainerLogs(ctx, c.ID, options) + logStream, err := dockerCli.Client().ContainerLogs(ctx, c.ID, options) if err != nil { return err } - defer responseBody.Close() + defer logStream.Close() buf := bytes.NewBuffer(nil) - writer := io.MultiWriter(buf, dockerCli.Out()) + w := io.MultiWriter(buf, dockerCli.Out()) - var errChan = make(chan error) - var stopChan = make(chan struct{}) + cancel, cancelFunc := context.WithCancel(ctx) + defer cancelFunc() go func() { - ticker := time.NewTicker(time.Second) - defer ticker.Stop() - for range ticker.C { + t := time.NewTicker(time.Second) + defer t.Stop() + for range t.C { + // keyword, maybe can find another way more elegant if strings.Contains(buf.String(), "enjoy it") { - close(stopChan) + cancelFunc() return } } }() + var errChan = make(chan error) go func() { var err error if c.Config.Tty { - _, err = io.Copy(writer, responseBody) + _, err = io.Copy(w, logStream) } else { - _, err = stdcopy.StdCopy(writer, dockerCli.Err(), responseBody) + _, err = stdcopy.StdCopy(w, dockerCli.Err(), logStream) } if err != nil { errChan <- err @@ -713,7 +723,7 @@ func runLogsWaitRunning(ctx context.Context, dockerCli command.Cli, container st select { case err = <-errChan: return err - case <-stopChan: + case <-cancel.Done(): return nil } } diff --git a/pkg/dev/run.go b/pkg/dev/run.go index ac763db7..c7214d7e 100644 --- a/pkg/dev/run.go +++ b/pkg/dev/run.go @@ -47,7 +47,8 @@ func run(ctx context.Context, runConfig *RunConfig, cli *client.Client, c *comma } } if needPull { - if err = util.PullImage(ctx, runConfig.platform, cli, c, config.Image); err != nil { + err = util.PullImage(ctx, runConfig.platform, cli, c, config.Image, nil) + if err != nil { return } } diff --git a/pkg/handler/cleaner.go b/pkg/handler/cleaner.go index 917700d6..37ed71fa 100644 --- a/pkg/handler/cleaner.go +++ b/pkg/handler/cleaner.go @@ -50,22 +50,25 @@ func (c *ConnectOptions) Cleanup() { if c.localTunIPv6 != nil && c.localTunIPv6.IP != nil { ips = append(ips, c.localTunIPv6.IP) } - err := c.dhcp.ReleaseIP(ctx, ips...) - if err != nil { - log.Errorf("failed to release ip to dhcp, err: %v", err) - } - _ = c.clientset.CoreV1().Pods(c.Namespace).Delete(ctx, config.CniNetName, v1.DeleteOptions{GracePeriodSeconds: pointer.Int64(0)}) - var count int - count, err = updateRefCount(ctx, c.clientset.CoreV1().ConfigMaps(c.Namespace), config.ConfigMapPodTrafficManager, -1) - // only if ref is zero and deployment is not ready, needs to clean up - if err == nil && count <= 0 { - deployment, errs := c.clientset.AppsV1().Deployments(c.Namespace).Get(ctx, config.ConfigMapPodTrafficManager, v1.GetOptions{}) - if errs == nil && deployment.Status.UnavailableReplicas != 0 { - cleanup(ctx, c.clientset, c.Namespace, config.ConfigMapPodTrafficManager, true) + if c.dhcp != nil { + err := c.dhcp.ReleaseIP(ctx, ips...) + if err != nil { + log.Errorf("failed to release ip to dhcp, err: %v", err) } } - if err != nil { - log.Errorf("can not update ref-count: %v", err) + if c.clientset != nil { + _ = c.clientset.CoreV1().Pods(c.Namespace).Delete(ctx, config.CniNetName, v1.DeleteOptions{GracePeriodSeconds: pointer.Int64(0)}) + count, err := updateRefCount(ctx, c.clientset.CoreV1().ConfigMaps(c.Namespace), config.ConfigMapPodTrafficManager, -1) + // only if ref is zero and deployment is not ready, needs to clean up + if err == nil && count <= 0 { + deployment, errs := c.clientset.AppsV1().Deployments(c.Namespace).Get(ctx, config.ConfigMapPodTrafficManager, v1.GetOptions{}) + if errs == nil && deployment.Status.UnavailableReplicas != 0 { + cleanup(ctx, c.clientset, c.Namespace, config.ConfigMapPodTrafficManager, true) + } + } + if err != nil { + log.Errorf("can not update ref-count: %v", err) + } } for _, function := range RollbackFuncList { if function != nil { diff --git a/pkg/handler/connect.go b/pkg/handler/connect.go index 7464bd86..7a2c5325 100644 --- a/pkg/handler/connect.go +++ b/pkg/handler/connect.go @@ -458,7 +458,7 @@ func (c *ConnectOptions) addRouteDynamic(ctx context.Context) (err error) { func() { defer func() { if er := recover(); er != nil { - log.Errorln(er) + log.Error(er) } }() w, errs := c.clientset.CoreV1().Pods(podNs).Watch(ctx, metav1.ListOptions{ @@ -590,7 +590,7 @@ func (c *ConnectOptions) setupDNS(ctx context.Context) error { const port = 53 pod, err := c.GetRunningPodList(ctx) if err != nil { - log.Errorln(err) + log.Error(err) return err } relovConf, err := dns.GetDNSServiceIPFromPod(c.clientset, c.restclient, c.config, pod[0].GetName(), c.Namespace) diff --git a/pkg/util/image.go b/pkg/util/image.go index a32270a1..11135c54 100644 --- a/pkg/util/image.go +++ b/pkg/util/image.go @@ -44,16 +44,14 @@ func GetClient() (*client.Client, *command.DockerCli, error) { // TransferImage // 1) if not special ssh config, just pull image and tag and push // 2) if special ssh config, pull image, tag image, save image and scp image to remote, load image and push -func TransferImage(ctx context.Context, conf *SshConfig, from, to string) error { +func TransferImage(ctx context.Context, conf *SshConfig, from, to string, out io.Writer) error { cli, c, err := GetClient() if err != nil { return fmt.Errorf("failed to get docker client: %v", err) } // todo add flags? or detect k8s node runtime ? - err = PullImage(ctx, &v1.Platform{ - Architecture: "amd64", - OS: "linux", - }, cli, c, from) + platform := &v1.Platform{Architecture: "amd64", OS: "linux"} + err = PullImage(ctx, platform, cli, c, from, out) if err != nil { return fmt.Errorf("failed to pull image: %v", err) } @@ -91,9 +89,11 @@ func TransferImage(ctx context.Context, conf *SshConfig, from, to string) error return err } defer readCloser.Close() - _, stdout, _ := term.StdStreams() - out := streams.NewOut(stdout) - err = jsonmessage.DisplayJSONMessagesToStream(readCloser, out, nil) + if out == nil { + _, out, _ = term.StdStreams() + } + outWarp := streams.NewOut(out) + err = jsonmessage.DisplayJSONMessagesToStream(readCloser, outWarp, nil) if err != nil { err = fmt.Errorf("can not display message, err: %v", err) return err @@ -122,13 +122,12 @@ func TransferImage(ctx context.Context, conf *SshConfig, from, to string) error defer os.Remove(file.Name()) logrus.Infof("Transfering image %s", to) - err = SCP(conf, file.Name(), []string{ - fmt.Sprintf( - "(docker load image -i kubevpndir/%s && docker push %s) || (nerdctl image load -i kubevpndir/%s && nerdctl image push %s)", - filepath.Base(file.Name()), to, - filepath.Base(file.Name()), to, - ), - }...) + cmd := fmt.Sprintf( + "(docker load image -i kubevpndir/%s && docker push %s) || (nerdctl image load -i kubevpndir/%s && nerdctl image push %s)", + filepath.Base(file.Name()), to, + filepath.Base(file.Name()), to, + ) + err = SCP(conf, file.Name(), []string{cmd}...) if err != nil { return err } @@ -136,7 +135,7 @@ func TransferImage(ctx context.Context, conf *SshConfig, from, to string) error return nil } -func PullImage(ctx context.Context, platform *v1.Platform, cli *client.Client, c *command.DockerCli, img string) error { +func PullImage(ctx context.Context, platform *v1.Platform, cli *client.Client, c *command.DockerCli, img string, out io.Writer) error { var readCloser io.ReadCloser var plat string if platform != nil && platform.Architecture != "" && platform.OS != "" { @@ -168,9 +167,11 @@ func PullImage(ctx context.Context, platform *v1.Platform, cli *client.Client, c return err } defer readCloser.Close() - _, stdout, _ := term.StdStreams() - out := streams.NewOut(stdout) - err = jsonmessage.DisplayJSONMessagesToStream(readCloser, out, nil) + if out == nil { + _, out, _ = term.StdStreams() + } + outWarp := streams.NewOut(out) + err = jsonmessage.DisplayJSONMessagesToStream(readCloser, outWarp, nil) if err != nil { err = fmt.Errorf("can not display message, err: %v", err) return err diff --git a/pkg/util/ns.go b/pkg/util/ns.go index 6f625de1..40527072 100644 --- a/pkg/util/ns.go +++ b/pkg/util/ns.go @@ -9,6 +9,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" v12 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/clientcmd/api" "k8s.io/client-go/tools/clientcmd/api/latest" cmdutil "k8s.io/kubectl/pkg/cmd/util" @@ -42,18 +43,22 @@ func IsSameCluster(client v12.ConfigMapInterface, namespace string, clientB v12. func ConvertToKubeconfigBytes(factory cmdutil.Factory) ([]byte, string, error) { loader := factory.ToRawKubeConfigLoader() - namespace, _, err2 := loader.Namespace() - if err2 != nil { - return nil, "", err2 + namespace, _, err := loader.Namespace() + if err != nil { + return nil, "", err } rawConfig, err := loader.RawConfig() + err = api.FlattenConfig(&rawConfig) + if err != nil { + return nil, "", err + } convertedObj, err := latest.Scheme.ConvertToVersion(&rawConfig, latest.ExternalVersion) if err != nil { return nil, "", err } - marshal, err2 := json.Marshal(convertedObj) - if err2 != nil { - return nil, "", err2 + marshal, err := json.Marshal(convertedObj) + if err != nil { + return nil, "", err } return marshal, namespace, nil } diff --git a/pkg/util/scp.go b/pkg/util/scp.go index 2ac85337..9667a38c 100644 --- a/pkg/util/scp.go +++ b/pkg/util/scp.go @@ -5,6 +5,7 @@ import ( "io" "os" "path/filepath" + "time" "github.com/schollz/progressbar/v3" log "github.com/sirupsen/logrus" @@ -29,6 +30,7 @@ func SCP(conf *SshConfig, filename string, commands ...string) error { User: conf.User, Auth: auth, HostKeyCallback: ssh.InsecureIgnoreHostKey(), + Timeout: time.Second * 10, } remote, err = ssh.Dial("tcp", conf.Addr, sshConfig) } @@ -52,10 +54,10 @@ func SCP(conf *SshConfig, filename string, commands ...string) error { for _, command := range commands { output, err := sess.CombinedOutput(command) if err != nil { - fmt.Fprint(os.Stderr, string(output)) + log.Error(string(output)) return err } else { - fmt.Fprint(os.Stdout, string(output)) + log.Info(string(output)) } } return nil diff --git a/pkg/util/ssh.go b/pkg/util/ssh.go index 8430d684..5af5fd69 100644 --- a/pkg/util/ssh.go +++ b/pkg/util/ssh.go @@ -48,6 +48,7 @@ func Main(ctx context.Context, remoteEndpoint, localEndpoint *netip.AddrPort, co User: conf.User, Auth: auth, HostKeyCallback: ssh.InsecureIgnoreHostKey(), + Timeout: time.Second * 10, } // Connect to SSH remote server using serverEndpoint remote, err = ssh.Dial("tcp", conf.Addr, sshConfig) @@ -122,6 +123,7 @@ func Run(conf *SshConfig, cmd string, env []string) (output []byte, errOut []byt User: conf.User, Auth: auth, HostKeyCallback: ssh.InsecureIgnoreHostKey(), + Timeout: time.Second * 10, } // Connect to SSH remote server using serverEndpoint remote, err = ssh.Dial("tcp", conf.Addr, sshConfig)