feat: log more info

This commit is contained in:
fengcaiwen
2023-09-22 21:15:42 +08:00
committed by naison
parent 5c818af126
commit 76f1b74076
39 changed files with 168 additions and 124 deletions

View File

@@ -93,7 +93,7 @@ container-local: kubevpn-linux-amd64
.PHONY: container-test .PHONY: container-test
container-test: kubevpn-linux-amd64 container-test: kubevpn-linux-amd64
docker buildx build --platform linux/amd64,linux/arm64 -t docker.io/naison/kubevpn:test -f $(BUILD_DIR)/test.Dockerfile --push . docker buildx build --platform linux/amd64,linux/arm64 -t docker.io/naison/kubevpn:test2309172253 -f $(BUILD_DIR)/test.Dockerfile --push .
.PHONY: version .PHONY: version
version: version:

View File

@@ -119,7 +119,7 @@ func CmdClone(f cmdutil.Factory) *cobra.Command {
} else if err != nil { } else if err != nil {
return err return err
} }
log.Print(recv.GetMessage()) fmt.Fprint(os.Stdout, recv.GetMessage())
} }
util.Print(os.Stdout, "Now clone workloads running successfully on other cluster, enjoy it :)") util.Print(os.Stdout, "Now clone workloads running successfully on other cluster, enjoy it :)")
return nil return nil

View File

@@ -80,7 +80,7 @@ func CmdConnect(f cmdutil.Factory) *cobra.Command {
} else if err != nil { } else if err != nil {
return err return err
} }
log.Print(recv.GetMessage()) fmt.Fprint(os.Stdout, recv.GetMessage())
} }
util.Print(os.Stdout, "Now you can access resources in the kubernetes cluster, enjoy it :)") util.Print(os.Stdout, "Now you can access resources in the kubernetes cluster, enjoy it :)")
// hangup // hangup

View File

@@ -72,6 +72,7 @@ Startup your kubernetes workloads in local Docker container with same volume、e
Args: dockercli.RequiresMinArgs(1), Args: dockercli.RequiresMinArgs(1),
DisableFlagsInUseLine: true, DisableFlagsInUseLine: true,
PreRunE: func(cmd *cobra.Command, args []string) error { PreRunE: func(cmd *cobra.Command, args []string) error {
util.InitLogger(false)
// not support temporally // not support temporally
if devOptions.Engine == config.EngineGvisor { if devOptions.Engine == config.EngineGvisor {
return fmt.Errorf(`not support type engine: %s, support ("%s"|"%s")`, config.EngineGvisor, config.EngineMix, config.EngineRaw) return fmt.Errorf(`not support type engine: %s, support ("%s"|"%s")`, config.EngineGvisor, config.EngineMix, config.EngineRaw)

View File

@@ -4,7 +4,6 @@ import (
"fmt" "fmt"
"io" "io"
"os" "os"
"time"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
@@ -24,34 +23,29 @@ func CmdDisconnect(f cmdutil.Factory) *cobra.Command {
Long: templates.LongDesc(i18n.T(`Disconnect from kubernetes cluster network`)), Long: templates.LongDesc(i18n.T(`Disconnect from kubernetes cluster network`)),
Example: templates.Examples(i18n.T(``)), Example: templates.Examples(i18n.T(``)),
PreRunE: func(cmd *cobra.Command, args []string) (err error) { PreRunE: func(cmd *cobra.Command, args []string) (err error) {
t := time.Now()
err = daemon.StartupDaemon(cmd.Context()) err = daemon.StartupDaemon(cmd.Context())
fmt.Printf("exec prerun use %s\n", time.Now().Sub(t).String())
return err return err
}, },
RunE: func(cmd *cobra.Command, args []string) error { RunE: func(cmd *cobra.Command, args []string) error {
now := time.Now()
client, err := daemon.GetClient(false).Disconnect( client, err := daemon.GetClient(false).Disconnect(
cmd.Context(), cmd.Context(),
&rpc.DisconnectRequest{}, &rpc.DisconnectRequest{},
) )
fmt.Printf("call api disconnect use %s\n", time.Now().Sub(now).String())
if err != nil {
return err
}
var resp *rpc.DisconnectResponse var resp *rpc.DisconnectResponse
for { for {
resp, err = client.Recv() resp, err = client.Recv()
if err == io.EOF { if err == io.EOF {
return nil break
} else if err == nil { } else if err == nil {
fmt.Fprint(os.Stdout, resp.Message) fmt.Fprint(os.Stdout, resp.Message)
} else if code := status.Code(err); code == codes.DeadlineExceeded || code == codes.Canceled { } else if code := status.Code(err); code == codes.DeadlineExceeded || code == codes.Canceled {
return nil break
} else { } else {
return err return err
} }
} }
fmt.Fprint(os.Stdout, "disconnect successfully")
return nil
}, },
} }
return cmd return cmd

View File

@@ -5,7 +5,6 @@ import (
"fmt" "fmt"
"io" "io"
"os" "os"
"time"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/spf13/cobra" "github.com/spf13/cobra"
@@ -122,14 +121,9 @@ func CmdProxy(f cmdutil.Factory) *cobra.Command {
// leave from cluster resources // leave from cluster resources
<-cmd.Context().Done() <-cmd.Context().Done()
now := time.Now()
stream, err := cli.Leave(context.Background(), &rpc.LeaveRequest{ stream, err := cli.Leave(context.Background(), &rpc.LeaveRequest{
Workloads: args, Workloads: args,
}) })
fmt.Printf("call api leave use %s\n", time.Now().Sub(now).String())
if err != nil {
return err
}
var resp *rpc.LeaveResponse var resp *rpc.LeaveResponse
for { for {
resp, err = stream.Recv() resp, err = stream.Recv()

View File

@@ -29,6 +29,7 @@ func CmdQuit(f cmdutil.Factory) *cobra.Command {
RunE: func(cmd *cobra.Command, args []string) error { RunE: func(cmd *cobra.Command, args []string) error {
_ = quit(cmd.Context(), true) _ = quit(cmd.Context(), true)
_ = quit(cmd.Context(), false) _ = quit(cmd.Context(), false)
fmt.Fprint(os.Stdout, "quit success")
return nil return nil
}, },
} }

View File

@@ -50,7 +50,7 @@ func CmdReset(factory cmdutil.Factory) *cobra.Command {
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
fmt.Fprint(os.Stdout, "Done") fmt.Fprint(os.Stdout, "done")
}, },
} }

View File

@@ -38,7 +38,7 @@ func CmdServe(_ cmdutil.Factory) *cobra.Command {
defer func() { defer func() {
err := handler.ReleaseIPIfNeeded() err := handler.ReleaseIPIfNeeded()
if err != nil { if err != nil {
log.Error(err) log.Errorf("release ip failed: %v", err)
} }
}() }()
servers, err := handler.Parse(*route) servers, err := handler.Parse(*route)

View File

@@ -34,7 +34,7 @@ func CmdUpgrade(_ cmdutil.Factory) *cobra.Command {
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
fmt.Fprint(os.Stdout, "Done") fmt.Fprint(os.Stdout, "done")
}, },
} }
return cmd return cmd

View File

@@ -104,7 +104,12 @@ func (e *tunEndpoint) Attach(dispatcher stack.NetworkDispatcher) {
bytes := config.LPool.Get().([]byte)[:] bytes := config.LPool.Get().([]byte)[:]
read, err := e.tun.Read(bytes[:]) read, err := e.tun.Read(bytes[:])
if err != nil { if err != nil {
log.Warningln(err) // if context is still going
if e.ctx.Err() == nil {
log.Errorf("[TUN]: read from tun failed: %s", err.Error())
} else {
log.Info("tun device closed")
}
return return
} }
if read == 0 { if read == 0 {
@@ -122,7 +127,7 @@ func (e *tunEndpoint) Attach(dispatcher stack.NetworkDispatcher) {
protocol = header.IPv4ProtocolNumber protocol = header.IPv4ProtocolNumber
ipHeader, err := ipv4.ParseHeader(bytes[:read]) ipHeader, err := ipv4.ParseHeader(bytes[:read])
if err != nil { if err != nil {
log.Error(err) log.Errorf("parse ipv4 header failed: %s", err.Error())
continue continue
} }
ipProtocol = ipHeader.Protocol ipProtocol = ipHeader.Protocol
@@ -132,7 +137,7 @@ func (e *tunEndpoint) Attach(dispatcher stack.NetworkDispatcher) {
protocol = header.IPv6ProtocolNumber protocol = header.IPv6ProtocolNumber
ipHeader, err := ipv6.ParseHeader(bytes[:read]) ipHeader, err := ipv6.ParseHeader(bytes[:read])
if err != nil { if err != nil {
log.Error(err) log.Errorf("parse ipv6 header failed: %s", err.Error())
continue continue
} }
ipProtocol = ipHeader.NextHeader ipProtocol = ipHeader.NextHeader

View File

@@ -260,14 +260,14 @@ func heartbeats(in chan<- *DataElem) {
if bytes == nil { if bytes == nil {
bytes, err = genICMPPacket(srcIPv4, config.RouterIP) bytes, err = genICMPPacket(srcIPv4, config.RouterIP)
if err != nil { if err != nil {
log.Error(err) log.Errorf("generate ipv4 packet error: %s", err.Error())
continue continue
} }
} }
if bytes6 == nil { if bytes6 == nil {
bytes6, err = genICMPPacketIPv6(srcIPv6, config.RouterIP6) bytes6, err = genICMPPacketIPv6(srcIPv6, config.RouterIP6)
if err != nil { if err != nil {
log.Error(err) log.Errorf("generate ipv6 packet error: %s", err.Error())
continue continue
} }
} }
@@ -353,7 +353,7 @@ func (d *Device) Start(ctx context.Context) {
select { select {
case err := <-d.chExit: case err := <-d.chExit:
log.Error(err) log.Errorf("device exit: %s", err.Error())
return return
case <-ctx.Done(): case <-ctx.Done():
return return

View File

@@ -133,7 +133,7 @@ func (d *ClientDevice) Start(ctx context.Context) {
select { select {
case err := <-d.chExit: case err := <-d.chExit:
log.Error(err) log.Errorf("[tun-client]: %v", err)
return return
case <-ctx.Done(): case <-ctx.Done():
return return

View File

@@ -183,7 +183,7 @@ func (o *CopyOptions) copyToPod(src, dest fileSpec, options *exec.ExecOptions) e
go func(src localPath, dest remotePath, writer io.WriteCloser) { go func(src localPath, dest remotePath, writer io.WriteCloser) {
defer writer.Close() defer writer.Close()
if err := makeTar(src, dest, writer); err != nil { if err := makeTar(src, dest, writer); err != nil {
log.Error(err) log.Errorf("Error making tar: %v", err)
} }
}(srcFile, destFile, writer) }(srcFile, destFile, writer)
var cmdArr []string var cmdArr []string
@@ -266,7 +266,7 @@ func (t *TarPipe) initReadFrom(n uint64) {
go func() { go func() {
defer t.outStream.Close() defer t.outStream.Close()
if err := t.o.execute(options); err != nil { if err := t.o.execute(options); err != nil {
log.Error(err) log.Errorf("Error executing command: %v", err)
} }
}() }()
} }

View File

@@ -87,11 +87,13 @@ func (svr *Server) Clone(req *rpc.CloneRequest, resp rpc.Daemon_CloneServer) err
f := InitFactory(req.KubeconfigBytes, req.Namespace) f := InitFactory(req.KubeconfigBytes, req.Namespace)
err = options.InitClient(f) err = options.InitClient(f)
if err != nil { if err != nil {
log.Errorf("init client failed: %v", err)
return err return err
} }
config.Image = req.Image config.Image = req.Image
err = options.DoClone(resp.Context()) err = options.DoClone(resp.Context())
if err != nil { if err != nil {
log.Errorf("clone workloads failed: %v", err)
return err return err
} }
svr.clone = options svr.clone = options
@@ -112,13 +114,3 @@ func (r *cloneWarp) Write(p []byte) (n int, err error) {
func newCloneWarp(server rpc.Daemon_CloneServer) io.Writer { func newCloneWarp(server rpc.Daemon_CloneServer) io.Writer {
return &cloneWarp{server: server} return &cloneWarp{server: server}
} }
//type daemonConnectServer struct {
// out io.Writer
// grpc.ServerStream
//}
//
//func (d *daemonConnectServer) Send(response *rpc.ConnectResponse) error {
// _, err := d.out.Write([]byte(response.Message))
// return err
//}

View File

@@ -96,7 +96,7 @@ func (svr *Server) Connect(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectServe
config.Image = req.Image config.Image = req.Image
err = svr.connect.DoConnect(sshCtx) err = svr.connect.DoConnect(sshCtx)
if err != nil { if err != nil {
log.Error(err) log.Infof("do connect error: %v", err)
svr.connect.Cleanup() svr.connect.Cleanup()
return err return err
} }
@@ -150,7 +150,7 @@ func (svr *Server) redirectToSudoDaemon(req *rpc.ConnectRequest, resp rpc.Daemon
) )
if err == nil && isSameCluster && svr.connect.Equal(connect) { if err == nil && isSameCluster && svr.connect.Equal(connect) {
// same cluster, do nothing // same cluster, do nothing
log.Debugf("already connect to cluster") log.Infof("already connect to cluster")
return nil return nil
} }
} }

View File

@@ -24,9 +24,7 @@ func (svr *Server) Disconnect(req *rpc.DisconnectRequest, resp rpc.Daemon_Discon
for { for {
recv, err = connResp.Recv() recv, err = connResp.Recv()
if err == io.EOF { if err == io.EOF {
svr.t = time.Time{} break
svr.connect = nil
return nil
} else if err != nil { } else if err != nil {
return err return err
} }

View File

@@ -19,6 +19,7 @@ func (svr *Server) Leave(req *rpc.LeaveRequest, resp rpc.Daemon_LeaveServer) err
log.SetOutput(out) log.SetOutput(out)
log.SetLevel(log.InfoLevel) log.SetLevel(log.InfoLevel)
if svr.connect == nil { if svr.connect == nil {
log.Infof("not proxy any resource in cluster")
return fmt.Errorf("not proxy any resource in cluster") return fmt.Errorf("not proxy any resource in cluster")
} }
@@ -27,10 +28,13 @@ func (svr *Server) Leave(req *rpc.LeaveRequest, resp rpc.Daemon_LeaveServer) err
maps := svr.connect.GetClientset().CoreV1().ConfigMaps(namespace) maps := svr.connect.GetClientset().CoreV1().ConfigMaps(namespace)
for _, workload := range req.GetWorkloads() { for _, workload := range req.GetWorkloads() {
// add rollback func to remove envoy config // add rollback func to remove envoy config
log.Infof("leave workload %s", workload)
err := handler.UnPatchContainer(factory, maps, namespace, workload, svr.connect.GetLocalTunIPv4()) err := handler.UnPatchContainer(factory, maps, namespace, workload, svr.connect.GetLocalTunIPv4())
if err != nil { if err != nil {
log.Error(err) log.Errorf("leave workload %s failed: %v", workload, err)
continue
} }
log.Infof("leave workload %s success", workload)
} }
return nil return nil
} }

View File

@@ -15,7 +15,7 @@ import (
) )
func (svr *Server) List(ctx context.Context, req *rpc.ListRequest) (*rpc.ListResponse, error) { func (svr *Server) List(ctx context.Context, req *rpc.ListRequest) (*rpc.ListResponse, error) {
if svr.connect == nil { if svr.connect == nil || svr.connect.GetClientset() == nil {
return nil, fmt.Errorf("not connect to any cluster") return nil, fmt.Errorf("not connect to any cluster")
} }
mapInterface := svr.connect.GetClientset().CoreV1().ConfigMaps(svr.connect.Namespace) mapInterface := svr.connect.GetClientset().CoreV1().ConfigMaps(svr.connect.Namespace)

View File

@@ -74,9 +74,9 @@ func (svr *Server) Proxy(req *rpc.ConnectRequest, resp rpc.Daemon_ProxyServer) e
) )
if err == nil && isSameCluster && svr.connect.Equal(connect) { if err == nil && isSameCluster && svr.connect.Equal(connect) {
// same cluster, do nothing // same cluster, do nothing
log.Debugf("already connect to cluster") log.Infof("already connect to cluster")
} else { } else {
log.Debugf("try to disconnect from another cluster") log.Infof("try to disconnect from another cluster")
var disconnect rpc.Daemon_DisconnectClient var disconnect rpc.Daemon_DisconnectClient
disconnect, err = daemonClient.Disconnect(ctx, &rpc.DisconnectRequest{}) disconnect, err = daemonClient.Disconnect(ctx, &rpc.DisconnectRequest{})
if err != nil { if err != nil {
@@ -95,11 +95,12 @@ func (svr *Server) Proxy(req *rpc.ConnectRequest, resp rpc.Daemon_ProxyServer) e
return err return err
} }
} }
log.SetOutput(out)
} }
} }
if svr.connect == nil { if svr.connect == nil {
log.Debugf("connectting to cluster") log.Infof("connectting to cluster")
var connResp rpc.Daemon_ConnectClient var connResp rpc.Daemon_ConnectClient
connResp, err = daemonClient.Connect(ctx, req) connResp, err = daemonClient.Connect(ctx, req)
if err != nil { if err != nil {
@@ -118,14 +119,14 @@ func (svr *Server) Proxy(req *rpc.ConnectRequest, resp rpc.Daemon_ProxyServer) e
return err return err
} }
} }
log.SetOutput(out)
} }
log.Debugf("proxy resource...")
err = svr.connect.CreateRemoteInboundPod(ctx) err = svr.connect.CreateRemoteInboundPod(ctx)
if err != nil { if err != nil {
log.Errorf("create remote inbound pod failed: %s", err.Error())
return err return err
} }
log.Debugf("proxy resource done")
util.Print(out, "Now you can access resources in the kubernetes cluster, enjoy it :)") util.Print(out, "Now you can access resources in the kubernetes cluster, enjoy it :)")
return nil return nil
} }

View File

@@ -16,12 +16,14 @@ func (svr *Server) Quit(req *rpc.QuitRequest, resp rpc.Daemon_QuitServer) error
log.SetOutput(io.MultiWriter(newQuitWarp(resp), svr.LogFile)) log.SetOutput(io.MultiWriter(newQuitWarp(resp), svr.LogFile))
log.SetLevel(log.InfoLevel) log.SetLevel(log.InfoLevel)
if svr.connect != nil { if svr.connect != nil {
log.Info("quit: cleanup connection")
svr.connect.Cleanup() svr.connect.Cleanup()
} }
if svr.Cancel != nil { if svr.Cancel != nil {
svr.Cancel() svr.Cancel()
} }
if svr.clone != nil { if svr.clone != nil {
log.Info("quit: cleanup clone")
err := svr.clone.Cleanup(nil) err := svr.clone.Cleanup(nil)
if err != nil { if err != nil {
log.Errorf("quit: cleanup clone failed: %v", err) log.Errorf("quit: cleanup clone failed: %v", err)

View File

@@ -10,9 +10,6 @@ import (
) )
func (svr *Server) Stop(req *rpc.QuitRequest, resp rpc.Daemon_QuitServer) error { func (svr *Server) Stop(req *rpc.QuitRequest, resp rpc.Daemon_QuitServer) error {
if svr.connect == nil {
return nil
}
defer func() { defer func() {
log.SetOutput(svr.LogFile) log.SetOutput(svr.LogFile)
log.SetLevel(log.DebugLevel) log.SetLevel(log.DebugLevel)
@@ -21,6 +18,11 @@ func (svr *Server) Stop(req *rpc.QuitRequest, resp rpc.Daemon_QuitServer) error
log.SetOutput(out) log.SetOutput(out)
log.SetLevel(log.InfoLevel) log.SetLevel(log.InfoLevel)
if svr.connect == nil {
log.Info("stop: no connection")
return nil
}
svr.connect.Cleanup() svr.connect.Cleanup()
svr.t = time.Time{} svr.t = time.Time{}
svr.connect = nil svr.connect = nil

View File

@@ -35,41 +35,39 @@ func GetClient(isSudo bool) rpc.DaemonClient {
return daemonClient return daemonClient
} }
sudo := "" name := "daemon"
if isSudo { if isSudo {
sudo = "sudo" name = "sudo daemon"
} }
ctx := context.Background() ctx := context.Background()
conn, err := grpc.DialContext(ctx, "unix:"+GetSockPath(isSudo), grpc.WithTransportCredentials(insecure.NewCredentials())) conn, err := grpc.DialContext(ctx, "unix:"+GetSockPath(isSudo), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil { if err != nil {
log.Errorf("cannot connect to %s server: %v", sudo, err) log.Errorf("cannot connect to %s: %v", name, err)
fmt.Println(fmt.Errorf("cannot connect to %s server: %v", sudo, err))
return nil return nil
} }
c := rpc.NewDaemonClient(conn) cli := rpc.NewDaemonClient(conn)
now := time.Now()
healthClient := grpc_health_v1.NewHealthClient(conn) healthClient := grpc_health_v1.NewHealthClient(conn)
var response *grpc_health_v1.HealthCheckResponse var response *grpc_health_v1.HealthCheckResponse
response, err = healthClient.Check(ctx, &grpc_health_v1.HealthCheckRequest{}) response, err = healthClient.Check(ctx, &grpc_health_v1.HealthCheckRequest{})
if err != nil { if err != nil {
log.Printf("%v", err) log.Errorf("%v", err)
return nil return nil
} }
fmt.Println(response.Status, sudo, time.Now().Sub(now).String()) if response.Status != grpc_health_v1.HealthCheckResponse_SERVING {
now = time.Now() log.Error(fmt.Sprintf("%s is not health", name), "status", response.Status)
_, err = c.Status(ctx, &rpc.StatusRequest{}) return nil
fmt.Printf("call %s api status use %s\n", sudo, time.Now().Sub(now)) }
_, err = cli.Status(ctx, &rpc.StatusRequest{})
if err != nil { if err != nil {
fmt.Println(fmt.Errorf("cannot call %s api status: %v", sudo, err)) log.Error("cannot call api status", "err", err)
log.Error(err)
return nil return nil
} }
if isSudo { if isSudo {
sudoDaemonClient = c sudoDaemonClient = cli
} else { } else {
daemonClient = c daemonClient = cli
} }
return c return cli
} }
func GetSockPath(isSudo bool) string { func GetSockPath(isSudo bool) string {
@@ -119,7 +117,7 @@ func runDaemon(ctx context.Context, isSudo bool) error {
var p *os.Process var p *os.Process
if p, err = os.FindProcess(pid); err == nil { if p, err = os.FindProcess(pid); err == nil {
if err = p.Kill(); err != nil && err != os.ErrProcessDone { if err = p.Kill(); err != nil && err != os.ErrProcessDone {
log.Error(err) log.Error("kill process", "err", err)
} }
_, _ = p.Wait() _, _ = p.Wait()
} }

View File

@@ -32,7 +32,7 @@ type SvrOption struct {
func (o *SvrOption) Start(ctx context.Context) error { func (o *SvrOption) Start(ctx context.Context) error {
file, err := os.OpenFile(action.GetDaemonLogPath(), os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666) file, err := os.OpenFile(action.GetDaemonLogPath(), os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666)
if err != nil { if err != nil {
log.Error(err) log.Errorf("open log file error: %v", err)
return err return err
} }
defer file.Close() defer file.Close()

View File

@@ -494,7 +494,7 @@ func (d *Options) doConnect(ctx context.Context, f cmdutil.Factory, transferImag
} else if err != nil { } else if err != nil {
return cancel, err return cancel, err
} }
log.Print(recv.Message) log.Infof(recv.Message)
} }
case ConnectModeContainer: case ConnectModeContainer:
@@ -553,7 +553,7 @@ func disconnect(ctx context.Context, daemonClient rpc.DaemonClient) func() {
return func() { return func() {
resp, err := daemonClient.Disconnect(ctx, &rpc.DisconnectRequest{}) resp, err := daemonClient.Disconnect(ctx, &rpc.DisconnectRequest{})
if err != nil { if err != nil {
log.Error(err) log.Errorf("disconnect error: %v", err)
return return
} }
for { for {
@@ -561,10 +561,10 @@ func disconnect(ctx context.Context, daemonClient rpc.DaemonClient) func() {
if err == io.EOF { if err == io.EOF {
return return
} else if err != nil { } else if err != nil {
log.Error(err) log.Errorf("disconnect error: %v", err)
return return
} }
log.Print(msg.Message) log.Info(msg.Message)
} }
} }
} }

View File

@@ -43,7 +43,7 @@ func usingResolver(clientConfig *miekgdns.ClientConfig, ns []string) {
var err error var err error
_ = os.RemoveAll(filepath.Join("/", "etc", "resolver")) _ = os.RemoveAll(filepath.Join("/", "etc", "resolver"))
if err = os.MkdirAll(filepath.Join("/", "etc", "resolver"), fs.ModePerm); err != nil { if err = os.MkdirAll(filepath.Join("/", "etc", "resolver"), fs.ModePerm); err != nil {
log.Error(err) log.Errorf("create resolver error: %v", err)
} }
config := miekgdns.ClientConfig{ config := miekgdns.ClientConfig{
Servers: clientConfig.Servers, Servers: clientConfig.Servers,
@@ -59,7 +59,7 @@ func usingResolver(clientConfig *miekgdns.ClientConfig, ns []string) {
var port int var port int
port, err = util.GetAvailableUDPPortOrDie() port, err = util.GetAvailableUDPPortOrDie()
if err != nil { if err != nil {
log.Error(err) log.Errorf("get available port error: %v", err)
return return
} }
go func(port int, clientConfig *miekgdns.ClientConfig) { go func(port int, clientConfig *miekgdns.ClientConfig) {

View File

@@ -75,7 +75,7 @@ func (c *ConnectOptions) Cleanup() {
// leave proxy resources // leave proxy resources
err := c.LeaveProxyResources(context.Background()) err := c.LeaveProxyResources(context.Background())
if err != nil { if err != nil {
log.Error(err) log.Errorf("leave proxy resources error: %v", err)
} }
if c.cancel != nil { if c.cancel != nil {
c.cancel() c.cancel()

View File

@@ -139,6 +139,7 @@ func (d *CloneOptions) DoClone(ctx context.Context) error {
} }
for _, workload := range d.Workloads { for _, workload := range d.Workloads {
log.Infof("clone workload %s", workload)
var object *runtimeresource.Info var object *runtimeresource.Info
object, err = util.GetUnstructuredObject(d.factory, d.Namespace, workload) object, err = util.GetUnstructuredObject(d.factory, d.Namespace, workload)
if err != nil { if err != nil {
@@ -723,8 +724,10 @@ func (d *CloneOptions) Cleanup(workloads []string) error {
workloads = d.Workloads workloads = d.Workloads
} }
for _, workload := range workloads { for _, workload := range workloads {
log.Infof("start to clean up clone workload: %s", workload)
object, err := util.GetUnstructuredObject(d.factory, d.Namespace, workload) object, err := util.GetUnstructuredObject(d.factory, d.Namespace, workload)
if err != nil { if err != nil {
log.Errorf("get unstructured object error: %s", err.Error())
return err return err
} }
labelsMap := map[string]string{ labelsMap := map[string]string{
@@ -740,15 +743,18 @@ func (d *CloneOptions) Cleanup(workloads []string) error {
var client dynamic.Interface var client dynamic.Interface
client, err = d.targetFactory.DynamicClient() client, err = d.targetFactory.DynamicClient()
if err != nil { if err != nil {
log.Errorf("get dynamic client error: %s", err.Error())
return err return err
} }
for _, cloneName := range controller.UnsortedList() { for _, cloneName := range controller.UnsortedList() {
err = client.Resource(object.Mapping.Resource).Namespace(d.TargetNamespace).Delete(context.Background(), cloneName, metav1.DeleteOptions{}) err = client.Resource(object.Mapping.Resource).Namespace(d.TargetNamespace).Delete(context.Background(), cloneName, metav1.DeleteOptions{})
if !apierrors.IsNotFound(err) { if !apierrors.IsNotFound(err) {
log.Errorf("delete clone object error: %s", err.Error())
return err return err
} }
log.Infof("delete clone object: %s", cloneName) log.Infof("delete clone object: %s", cloneName)
} }
log.Infof("clean up clone workload: %s successfully", workload)
} }
return nil return nil
} }

View File

@@ -145,6 +145,7 @@ func (c *ConnectOptions) CreateRemoteInboundPod(ctx context.Context) (err error)
} }
for _, workload := range c.Workloads { for _, workload := range c.Workloads {
log.Infof("start to create remote inbound pod for %s", workload)
configInfo := util.PodRouteConfig{ configInfo := util.PodRouteConfig{
LocalTunIPv4: c.localTunIPv4.IP.String(), LocalTunIPv4: c.localTunIPv4.IP.String(),
LocalTunIPv6: c.localTunIPv6.IP.String(), LocalTunIPv6: c.localTunIPv6.IP.String(),
@@ -156,8 +157,10 @@ func (c *ConnectOptions) CreateRemoteInboundPod(ctx context.Context) (err error)
err = InjectVPNSidecar(ctx, c.factory, c.Namespace, workload, configInfo) err = InjectVPNSidecar(ctx, c.factory, c.Namespace, workload, configInfo)
} }
if err != nil { if err != nil {
log.Errorf("create remote inbound pod for %s failed: %s", workload, err.Error())
return err return err
} }
log.Infof("create remote inbound pod for %s successfully", workload)
} }
return return
} }
@@ -190,13 +193,17 @@ func (c *ConnectOptions) DoConnect(ctx context.Context) (err error) {
c.ctx, c.cancel = context.WithCancel(ctx) c.ctx, c.cancel = context.WithCancel(ctx)
_ = os.Setenv(config.EnvKubeVPNTransportEngine, string(c.Engine)) _ = os.Setenv(config.EnvKubeVPNTransportEngine, string(c.Engine))
log.Info("start to connect")
if err = c.InitDHCP(c.ctx); err != nil { if err = c.InitDHCP(c.ctx); err != nil {
log.Errorf("init dhcp failed: %s", err.Error())
return return
} }
c.addCleanUpResourceHandler() c.addCleanUpResourceHandler()
if err = c.getCIDR(c.ctx); err != nil { if err = c.getCIDR(c.ctx); err != nil {
log.Errorf("get cidr failed: %s", err.Error())
return return
} }
log.Info("get cidr successfully")
if err = createOutboundPod(c.ctx, c.factory, c.clientset, c.Namespace); err != nil { if err = createOutboundPod(c.ctx, c.factory, c.clientset, c.Namespace); err != nil {
return return
} }
@@ -233,16 +240,20 @@ func (c *ConnectOptions) DoConnect(ctx context.Context) (err error) {
core.GvisorTCPForwardAddr = fmt.Sprintf("tcp://127.0.0.1:%d", gvisorTCPForwardPort) core.GvisorTCPForwardAddr = fmt.Sprintf("tcp://127.0.0.1:%d", gvisorTCPForwardPort)
core.GvisorUDPForwardAddr = fmt.Sprintf("tcp://127.0.0.1:%d", gvisorUDPForwardPort) core.GvisorUDPForwardAddr = fmt.Sprintf("tcp://127.0.0.1:%d", gvisorUDPForwardPort)
if err = c.startLocalTunServe(c.ctx, forward); err != nil { if err = c.startLocalTunServe(c.ctx, forward); err != nil {
log.Errorf("start local tun service failed: %s", err.Error())
return return
} }
if err = c.addRouteDynamic(c.ctx); err != nil { if err = c.addRouteDynamic(c.ctx); err != nil {
log.Errorf("add route dynamic failed: %s", err.Error())
return return
} }
c.deleteFirewallRule(c.ctx) c.deleteFirewallRule(c.ctx)
if err = c.addExtraRoute(c.ctx); err != nil { if err = c.addExtraRoute(c.ctx); err != nil {
log.Errorf("add extra route failed: %s", err.Error())
return return
} }
if err = c.setupDNS(c.ctx); err != nil { if err = c.setupDNS(c.ctx); err != nil {
log.Errorf("set up dns failed: %s", err.Error())
return return
} }
go c.heartbeats(c.ctx) go c.heartbeats(c.ctx)
@@ -600,7 +611,7 @@ func (c *ConnectOptions) setupDNS(ctx context.Context) error {
const port = 53 const port = 53
pod, err := c.GetRunningPodList(ctx) pod, err := c.GetRunningPodList(ctx)
if err != nil { if err != nil {
log.Error(err) log.Errorf("get running pod list failed, err: %v", err)
return err return err
} }
relovConf, err := dns.GetDNSServiceIPFromPod(c.clientset, c.restclient, c.config, pod[0].GetName(), c.Namespace) relovConf, err := dns.GetDNSServiceIPFromPod(c.clientset, c.restclient, c.config, pod[0].GetName(), c.Namespace)

View File

@@ -109,6 +109,9 @@ func (d *DHCPManager) RentIPRandom(ctx context.Context) (*net.IPNet, *net.IPNet,
} }
func (d *DHCPManager) ReleaseIP(ctx context.Context, ips ...net.IP) error { func (d *DHCPManager) ReleaseIP(ctx context.Context, ips ...net.IP) error {
if len(ips) == 0 {
return nil
}
return d.updateDHCPConfigMap(ctx, func(ipv4 *ipallocator.Range, ipv6 *ipallocator.Range) error { return d.updateDHCPConfigMap(ctx, func(ipv4 *ipallocator.Range, ipv6 *ipallocator.Range) error {
for _, ip := range ips { for _, ip := range ips {
var use *ipallocator.Range var use *ipallocator.Range

View File

@@ -56,7 +56,7 @@ func InjectVPNAndEnvoySidecar(ctx1 context.Context, factory cmdutil.Factory, cli
err = addEnvoyConfig(clientset, nodeID, c, headers, port) err = addEnvoyConfig(clientset, nodeID, c, headers, port)
if err != nil { if err != nil {
log.Warnln(err) log.Errorf("add envoy config error: %v", err)
return err return err
} }
@@ -73,6 +73,7 @@ func InjectVPNAndEnvoySidecar(ctx1 context.Context, factory cmdutil.Factory, cli
// log.Error(err) // log.Error(err)
// } // }
//}) //})
log.Infof("workload %s/%s has already been injected with sidecar", namespace, workload)
return nil return nil
} }
// (1) add mesh container // (1) add mesh container
@@ -80,6 +81,7 @@ func InjectVPNAndEnvoySidecar(ctx1 context.Context, factory cmdutil.Factory, cli
var b []byte var b []byte
b, err = k8sjson.Marshal(restorePatch) b, err = k8sjson.Marshal(restorePatch)
if err != nil { if err != nil {
log.Error("marshal patch error: %v", err)
return err return err
} }
@@ -104,18 +106,10 @@ func InjectVPNAndEnvoySidecar(ctx1 context.Context, factory cmdutil.Factory, cli
} }
_, err = helper.Patch(object.Namespace, object.Name, types.JSONPatchType, bytes, &metav1.PatchOptions{}) _, err = helper.Patch(object.Namespace, object.Name, types.JSONPatchType, bytes, &metav1.PatchOptions{})
if err != nil { if err != nil {
log.Warnf("error while path resource: %s %s, err: %v", object.Mapping.GroupVersionKind.GroupKind().String(), object.Name, err) log.Errorf("error while path resource: %s %s, err: %v", object.Mapping.GroupVersionKind.GroupKind().String(), object.Name, err)
return err
}
//RollbackFuncList = append(RollbackFuncList, func() {
// if err := UnPatchContainer(factory, clientset, namespace, workload, c.LocalTunIPv4); err != nil {
// log.Error(err)
// }
//})
if err != nil {
return err return err
} }
log.Infof("patch workload %s/%s with sidecar", namespace, workload)
err = util.RolloutStatus(ctx1, factory, namespace, workload, time.Minute*60) err = util.RolloutStatus(ctx1, factory, namespace, workload, time.Minute*60)
return err return err
} }
@@ -123,12 +117,14 @@ func InjectVPNAndEnvoySidecar(ctx1 context.Context, factory cmdutil.Factory, cli
func UnPatchContainer(factory cmdutil.Factory, mapInterface v12.ConfigMapInterface, namespace, workload string, localTunIPv4 string) error { func UnPatchContainer(factory cmdutil.Factory, mapInterface v12.ConfigMapInterface, namespace, workload string, localTunIPv4 string) error {
object, err := util.GetUnstructuredObject(factory, namespace, workload) object, err := util.GetUnstructuredObject(factory, namespace, workload)
if err != nil { if err != nil {
log.Errorf("get unstructured object error: %v", err)
return err return err
} }
u := object.Object.(*unstructured.Unstructured) u := object.Object.(*unstructured.Unstructured)
templateSpec, depth, err := util.GetPodTemplateSpecPath(u) templateSpec, depth, err := util.GetPodTemplateSpecPath(u)
if err != nil { if err != nil {
log.Errorf("get template spec path error: %v", err)
return err return err
} }
@@ -137,7 +133,7 @@ func UnPatchContainer(factory cmdutil.Factory, mapInterface v12.ConfigMapInterfa
var empty bool var empty bool
empty, err = removeEnvoyConfig(mapInterface, nodeID, localTunIPv4) empty, err = removeEnvoyConfig(mapInterface, nodeID, localTunIPv4)
if err != nil { if err != nil {
log.Warnln(err) log.Errorf("remove envoy config error: %v", err)
return err return err
} }
@@ -156,6 +152,7 @@ func UnPatchContainer(factory cmdutil.Factory, mapInterface v12.ConfigMapInterfa
helper := pkgresource.NewHelper(object.Client, object.Mapping) helper := pkgresource.NewHelper(object.Client, object.Mapping)
// pod without controller // pod without controller
if len(depth) == 0 { if len(depth) == 0 {
log.Infof("workload %s/%s is not controlled by any controller", namespace, workload)
delete(templateSpec.ObjectMeta.GetAnnotations(), config.KubeVPNRestorePatchKey) delete(templateSpec.ObjectMeta.GetAnnotations(), config.KubeVPNRestorePatchKey)
pod := &v1.Pod{ObjectMeta: templateSpec.ObjectMeta, Spec: templateSpec.Spec} pod := &v1.Pod{ObjectMeta: templateSpec.ObjectMeta, Spec: templateSpec.Spec}
CleanupUselessInfo(pod) CleanupUselessInfo(pod)
@@ -163,6 +160,7 @@ func UnPatchContainer(factory cmdutil.Factory, mapInterface v12.ConfigMapInterfa
return err return err
} }
log.Infof("workload %s/%s is controlled by a controller", namespace, workload)
// resource with controller, like deployment,statefulset // resource with controller, like deployment,statefulset
var bytes []byte var bytes []byte
bytes, err = json.Marshal([]P{ bytes, err = json.Marshal([]P{

View File

@@ -208,7 +208,7 @@ func fullDomain(t *testing.T) {
} }
func dialUDP(t *testing.T) { func dialUDP(t *testing.T) {
port := util.GetAvailableUDPPortOrDie() port, _ := util.GetAvailableUDPPortOrDie()
go server(port) go server(port)
list, err := clientset.CoreV1().Pods(namespace).List(context.Background(), v1.ListOptions{ list, err := clientset.CoreV1().Pods(namespace).List(context.Background(), v1.ListOptions{

View File

@@ -73,8 +73,10 @@ func createOutboundPod(ctx context.Context, factory cmdutil.Factory, clientset *
log.Infoln("traffic manager not exist, try to create it...") log.Infoln("traffic manager not exist, try to create it...")
// 1) label namespace // 1) label namespace
log.Infof("label namespace %s", namespace)
ns, err := clientset.CoreV1().Namespaces().Get(ctx, namespace, metav1.GetOptions{}) ns, err := clientset.CoreV1().Namespaces().Get(ctx, namespace, metav1.GetOptions{})
if err != nil { if err != nil {
log.Errorf("get namespace error: %s", err.Error())
return err return err
} }
if ns.Labels == nil { if ns.Labels == nil {
@@ -83,10 +85,12 @@ func createOutboundPod(ctx context.Context, factory cmdutil.Factory, clientset *
ns.Labels["ns"] = namespace ns.Labels["ns"] = namespace
_, err = clientset.CoreV1().Namespaces().Update(ctx, ns, metav1.UpdateOptions{}) _, err = clientset.CoreV1().Namespaces().Update(ctx, ns, metav1.UpdateOptions{})
if err != nil { if err != nil {
log.Infof("label namespace error: %s", err.Error())
return err return err
} }
// 2) create serviceAccount // 2) create serviceAccount
log.Infof("create serviceAccount %s", config.ConfigMapPodTrafficManager)
_, err = clientset.CoreV1().ServiceAccounts(namespace).Create(ctx, &v1.ServiceAccount{ _, err = clientset.CoreV1().ServiceAccounts(namespace).Create(ctx, &v1.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: config.ConfigMapPodTrafficManager, Name: config.ConfigMapPodTrafficManager,
@@ -95,10 +99,12 @@ func createOutboundPod(ctx context.Context, factory cmdutil.Factory, clientset *
AutomountServiceAccountToken: pointer.Bool(true), AutomountServiceAccountToken: pointer.Bool(true),
}, metav1.CreateOptions{}) }, metav1.CreateOptions{})
if err != nil { if err != nil {
log.Infof("create serviceAccount error: %s", err.Error())
return err return err
} }
// 3) create roles // 3) create roles
log.Infof("create roles %s", config.ConfigMapPodTrafficManager)
_, err = clientset.RbacV1().Roles(namespace).Create(ctx, &rbacv1.Role{ _, err = clientset.RbacV1().Roles(namespace).Create(ctx, &rbacv1.Role{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: config.ConfigMapPodTrafficManager, Name: config.ConfigMapPodTrafficManager,
@@ -112,10 +118,12 @@ func createOutboundPod(ctx context.Context, factory cmdutil.Factory, clientset *
}}, }},
}, metav1.CreateOptions{}) }, metav1.CreateOptions{})
if err != nil { if err != nil {
log.Errorf("create roles error: %s", err.Error())
return err return err
} }
// 4) create roleBinding // 4) create roleBinding
log.Infof("create roleBinding %s", config.ConfigMapPodTrafficManager)
_, err = clientset.RbacV1().RoleBindings(namespace).Create(ctx, &rbacv1.RoleBinding{ _, err = clientset.RbacV1().RoleBindings(namespace).Create(ctx, &rbacv1.RoleBinding{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: config.ConfigMapPodTrafficManager, Name: config.ConfigMapPodTrafficManager,
@@ -134,9 +142,12 @@ func createOutboundPod(ctx context.Context, factory cmdutil.Factory, clientset *
}, },
}, metav1.CreateOptions{}) }, metav1.CreateOptions{})
if err != nil { if err != nil {
log.Errorf("create roleBinding error: %s", err.Error())
return err return err
} }
// 5) create service
log.Infof("create service %s", config.ConfigMapPodTrafficManager)
udp8422 := "8422-for-udp" udp8422 := "8422-for-udp"
tcp10800 := "10800-for-tcp" tcp10800 := "10800-for-tcp"
tcp9002 := "9002-for-envoy" tcp9002 := "9002-for-envoy"
@@ -173,6 +184,7 @@ func createOutboundPod(ctx context.Context, factory cmdutil.Factory, clientset *
}, },
}, metav1.CreateOptions{}) }, metav1.CreateOptions{})
if err != nil { if err != nil {
log.Errorf("create service error: %s", err.Error())
return err return err
} }
@@ -201,6 +213,7 @@ func createOutboundPod(ctx context.Context, factory cmdutil.Factory, clientset *
var crt, key []byte var crt, key []byte
crt, key, err = cert.GenerateSelfSignedCertKey(domain, nil, nil) crt, key, err = cert.GenerateSelfSignedCertKey(domain, nil, nil)
if err != nil { if err != nil {
log.Errorf("generate self signed cert and key error: %s", err.Error())
return err return err
} }
@@ -221,9 +234,12 @@ func createOutboundPod(ctx context.Context, factory cmdutil.Factory, clientset *
_, err = clientset.CoreV1().Secrets(namespace).Create(ctx, secret, metav1.CreateOptions{}) _, err = clientset.CoreV1().Secrets(namespace).Create(ctx, secret, metav1.CreateOptions{})
if err != nil && !k8serrors.IsAlreadyExists(err) { if err != nil && !k8serrors.IsAlreadyExists(err) {
log.Errorf("create secret error: %s", err.Error())
return err return err
} }
// 6) create deployment
log.Infof("create deployment %s", config.ConfigMapPodTrafficManager)
deployment := &appsv1.Deployment{ deployment := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: config.ConfigMapPodTrafficManager, Name: config.ConfigMapPodTrafficManager,
@@ -376,10 +392,12 @@ kubevpn serve -L "tcp://:10800" -L "tun://:8422?net=${TunIPv4}" -L "gtcp://:1080
LabelSelector: fields.OneTermEqualSelector("app", config.ConfigMapPodTrafficManager).String(), LabelSelector: fields.OneTermEqualSelector("app", config.ConfigMapPodTrafficManager).String(),
}) })
if err != nil { if err != nil {
log.Errorf("Failed to create watch for %s: %v", config.ConfigMapPodTrafficManager, err)
return err return err
} }
defer watchStream.Stop() defer watchStream.Stop()
if _, err = clientset.AppsV1().Deployments(namespace).Create(ctx, deployment, metav1.CreateOptions{}); err != nil { if _, err = clientset.AppsV1().Deployments(namespace).Create(ctx, deployment, metav1.CreateOptions{}); err != nil {
log.Errorf("Failed to create deployment for %s: %v", config.ConfigMapPodTrafficManager, err)
return err return err
} }
var ok bool var ok bool
@@ -390,6 +408,7 @@ kubevpn serve -L "tcp://:10800" -L "tun://:8422?net=${TunIPv4}" -L "gtcp://:1080
LabelSelector: fields.OneTermEqualSelector("app", config.ConfigMapPodTrafficManager).String(), LabelSelector: fields.OneTermEqualSelector("app", config.ConfigMapPodTrafficManager).String(),
}) })
if err != nil { if err != nil {
log.Errorf("Failed to list pods for %s: %v", config.ConfigMapPodTrafficManager, err)
return return
} }
@@ -423,9 +442,12 @@ kubevpn serve -L "tcp://:10800" -L "tun://:8422?net=${TunIPv4}" -L "gtcp://:1080
} }
}, time.Second*3) }, time.Second*3)
if !ok { if !ok {
log.Errorf("wait pod %s to be ready timeout", config.ConfigMapPodTrafficManager)
return errors.New(fmt.Sprintf("wait pod %s to be ready timeout", config.ConfigMapPodTrafficManager)) return errors.New(fmt.Sprintf("wait pod %s to be ready timeout", config.ConfigMapPodTrafficManager))
} }
// 7) create mutatingWebhookConfigurations
log.Infof("Creating mutatingWebhook_configuration for %s", config.ConfigMapPodTrafficManager)
_, err = clientset.AdmissionregistrationV1().MutatingWebhookConfigurations().Create(ctx, &admissionv1.MutatingWebhookConfiguration{ _, err = clientset.AdmissionregistrationV1().MutatingWebhookConfigurations().Create(ctx, &admissionv1.MutatingWebhookConfiguration{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: config.ConfigMapPodTrafficManager + "." + namespace, Name: config.ConfigMapPodTrafficManager + "." + namespace,
@@ -465,13 +487,14 @@ kubevpn serve -L "tcp://:10800" -L "tun://:8422?net=${TunIPv4}" -L "gtcp://:1080
} }
_, err = updateRefCount(ctx, clientset.CoreV1().ConfigMaps(namespace), config.ConfigMapPodTrafficManager, 1) _, err = updateRefCount(ctx, clientset.CoreV1().ConfigMaps(namespace), config.ConfigMapPodTrafficManager, 1)
if err != nil { if err != nil {
log.Errorf("Failed to update ref count for %s: %v", config.ConfigMapPodTrafficManager, err)
return return
} }
return return
} }
func InjectVPNSidecar(ctx1 context.Context, factory cmdutil.Factory, namespace, workloads string, c util.PodRouteConfig) error { func InjectVPNSidecar(ctx1 context.Context, factory cmdutil.Factory, namespace, workload string, c util.PodRouteConfig) error {
object, err := util.GetUnstructuredObject(factory, namespace, workloads) object, err := util.GetUnstructuredObject(factory, namespace, workload)
if err != nil { if err != nil {
return err return err
} }
@@ -491,6 +514,7 @@ func InjectVPNSidecar(ctx1 context.Context, factory cmdutil.Factory, namespace,
// pods without controller // pods without controller
if len(path) == 0 { if len(path) == 0 {
log.Infof("workload %s/%s is not controlled by any controller", namespace, workload)
podTempSpec.Spec.PriorityClassName = "" podTempSpec.Spec.PriorityClassName = ""
for _, container := range podTempSpec.Spec.Containers { for _, container := range podTempSpec.Spec.Containers {
container.LivenessProbe = nil container.LivenessProbe = nil
@@ -513,6 +537,7 @@ func InjectVPNSidecar(ctx1 context.Context, factory cmdutil.Factory, namespace,
} else } else
// controllers // controllers
{ {
log.Infof("workload %s/%s is controlled by a controller", namespace, workload)
// remove probe // remove probe
removePatch, restorePatch := patch(origin, path) removePatch, restorePatch := patch(origin, path)
b, _ := json.Marshal(restorePatch) b, _ := json.Marshal(restorePatch)
@@ -536,7 +561,7 @@ func InjectVPNSidecar(ctx1 context.Context, factory cmdutil.Factory, namespace,
} }
//RollbackFuncList = append(RollbackFuncList, func() { //RollbackFuncList = append(RollbackFuncList, func() {
// if err = removeInboundContainer(factory, namespace, workloads); err != nil { // if err = removeInboundContainer(factory, namespace, workload); err != nil {
// log.Error(err) // log.Error(err)
// } // }
// //b, _ := json.Marshal(restorePatch) // //b, _ := json.Marshal(restorePatch)
@@ -550,7 +575,7 @@ func InjectVPNSidecar(ctx1 context.Context, factory cmdutil.Factory, namespace,
return err return err
} }
// todo not work? // todo not work?
err = util.RolloutStatus(ctx1, factory, namespace, workloads, time.Minute*60) err = util.RolloutStatus(ctx1, factory, namespace, workload, time.Minute*60)
return err return err
} }
@@ -558,7 +583,7 @@ func CreateAfterDeletePod(factory cmdutil.Factory, p *v1.Pod, helper *pkgresourc
if _, err := helper.DeleteWithOptions(p.Namespace, p.Name, &metav1.DeleteOptions{ if _, err := helper.DeleteWithOptions(p.Namespace, p.Name, &metav1.DeleteOptions{
GracePeriodSeconds: pointer.Int64(0), GracePeriodSeconds: pointer.Int64(0),
}); err != nil { }); err != nil {
log.Error(err) log.Errorf("error while delete resource: %s %s, ignore, err: %v", p.Namespace, p.Name, err)
} }
if err := retry.OnError(wait.Backoff{ if err := retry.OnError(wait.Backoff{
Steps: 10, Steps: 10,
@@ -663,7 +688,7 @@ func patch(spec v1.PodTemplateSpec, path []string) (remove []P, restore []P) {
} }
marshal, err := k8sjson.Marshal(p) marshal, err := k8sjson.Marshal(p)
if err != nil { if err != nil {
log.Error(err) log.Errorf("error while json marshal: %v", err)
return "" return ""
} }
return string(marshal) return string(marshal)
@@ -720,7 +745,7 @@ func fromPatchToProbe(spec *v1.PodTemplateSpec, path []string, patch []P) {
if !ok { if !ok {
marshal, err := k8sjson.Marshal(value) marshal, err := k8sjson.Marshal(value)
if err != nil { if err != nil {
log.Error(err) log.Errorf("error while json marshal: %v", err)
return nil return nil
} }
str = string(marshal) str = string(marshal)
@@ -728,7 +753,7 @@ func fromPatchToProbe(spec *v1.PodTemplateSpec, path []string, patch []P) {
var probe v1.Probe var probe v1.Probe
err := k8sjson.Unmarshal([]byte(str), &probe) err := k8sjson.Unmarshal([]byte(str), &probe)
if err != nil { if err != nil {
log.Error(err) log.Errorf("error while json unmarsh: %v", err)
return nil return nil
} }
return &probe return &probe

View File

@@ -2,13 +2,12 @@ package handler
import ( import (
"context" "context"
"fmt"
corev1 "k8s.io/api/core/v1"
"strings" "strings"
"github.com/docker/docker/api/types" "github.com/docker/docker/api/types"
"github.com/docker/docker/client" "github.com/docker/docker/client"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/yaml" "sigs.k8s.io/yaml"
@@ -23,7 +22,7 @@ import (
func (c *ConnectOptions) Reset(ctx context.Context) error { func (c *ConnectOptions) Reset(ctx context.Context) error {
err := c.LeaveProxyResources(ctx) err := c.LeaveProxyResources(ctx)
if err != nil { if err != nil {
log.Error(err) log.Errorf("leave proxy resources error: %v", err)
} }
cleanup(ctx, c.clientset, c.Namespace, config.ConfigMapPodTrafficManager, false) cleanup(ctx, c.clientset, c.Namespace, config.ConfigMapPodTrafficManager, false)
@@ -55,13 +54,13 @@ func (c *ConnectOptions) LeaveProxyResources(ctx context.Context) (err error) {
return return
} }
if cm == nil || cm.Data == nil || len(cm.Data[config.KeyEnvoy]) == 0 { if cm == nil || cm.Data == nil || len(cm.Data[config.KeyEnvoy]) == 0 {
err = fmt.Errorf("can not found proxy resources") log.Infof("no proxy resources found")
return return
} }
var v = make([]*controlplane.Virtual, 0) var v = make([]*controlplane.Virtual, 0)
str := cm.Data[config.KeyEnvoy] str := cm.Data[config.KeyEnvoy]
if err = yaml.Unmarshal([]byte(str), &v); err != nil { if err = yaml.Unmarshal([]byte(str), &v); err != nil {
log.Error(err) log.Errorf("unmarshal envoy config error: %v", err)
return return
} }
localTunIPv4 := c.GetLocalTunIPv4() localTunIPv4 := c.GetLocalTunIPv4()
@@ -69,11 +68,13 @@ func (c *ConnectOptions) LeaveProxyResources(ctx context.Context) (err error) {
// deployments.apps.ry-server --> deployments.apps/ry-server // deployments.apps.ry-server --> deployments.apps/ry-server
lastIndex := strings.LastIndex(virtual.Uid, ".") lastIndex := strings.LastIndex(virtual.Uid, ".")
uid := virtual.Uid[:lastIndex] + "/" + virtual.Uid[lastIndex+1:] uid := virtual.Uid[:lastIndex] + "/" + virtual.Uid[lastIndex+1:]
log.Infof("leave resource: %s", uid)
err = UnPatchContainer(c.factory, c.clientset.CoreV1().ConfigMaps(c.Namespace), c.Namespace, uid, localTunIPv4) err = UnPatchContainer(c.factory, c.clientset.CoreV1().ConfigMaps(c.Namespace), c.Namespace, uid, localTunIPv4)
if err != nil { if err != nil {
log.Error(err) log.Errorf("unpatch container error: %v", err)
continue continue
} }
log.Infof("leave resource: %s success", uid)
} }
return err return err
} }

View File

@@ -29,7 +29,7 @@ func RentIPIfNeeded(route *core.Route) error {
var ip []byte var ip []byte
ip, err = util.DoReq(req) ip, err = util.DoReq(req)
if err != nil { if err != nil {
log.Error(err) log.Errorf("can not get ip, err: %v", err)
return err return err
} }
log.Infof("rent an ip %s", strings.TrimSpace(string(ip))) log.Infof("rent an ip %s", strings.TrimSpace(string(ip)))
@@ -38,11 +38,11 @@ func RentIPIfNeeded(route *core.Route) error {
return fmt.Errorf("can not get ip from %s", string(ip)) return fmt.Errorf("can not get ip from %s", string(ip))
} }
if err = os.Setenv(config.EnvInboundPodTunIPv4, ips[0]); err != nil { if err = os.Setenv(config.EnvInboundPodTunIPv4, ips[0]); err != nil {
log.Error(err) log.Errorf("can not set ip, err: %v", err)
return err return err
} }
if err = os.Setenv(config.EnvInboundPodTunIPv6, ips[1]); err != nil { if err = os.Setenv(config.EnvInboundPodTunIPv6, ips[1]); err != nil {
log.Error(err) log.Errorf("can not set ip, err: %v", err)
return err return err
} }
for i := 0; i < len(route.ServeNodes); i++ { for i := 0; i < len(route.ServeNodes); i++ {

View File

@@ -153,7 +153,7 @@ kubevpn serve -L "tun:/localhost:8422?net=${TunIPv4}&route=${CIDR4}" -F "tcp://$
func init() { func init() {
json, err := yaml.ToJSON(envoyConfig) json, err := yaml.ToJSON(envoyConfig)
if err != nil { if err != nil {
log.Error(err) log.Errorf("Error converting json to bytes: %v", err)
return return
} }
envoyConfig = json envoyConfig = json

View File

@@ -112,18 +112,18 @@ func PortForwardPod(config *rest.Config, clientset *rest.RESTClient, podName, na
URL() URL()
transport, upgrader, err := spdy.RoundTripperFor(config) transport, upgrader, err := spdy.RoundTripperFor(config)
if err != nil { if err != nil {
log.Error(err) log.Errorf("create spdy roundtripper error: %s", err.Error())
return err return err
} }
dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", url) dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", url)
forwarder, err := portforward.NewOnAddresses(dialer, []string{"localhost"}, portPair, stopChan, readyChan, nil, os.Stderr) forwarder, err := portforward.NewOnAddresses(dialer, []string{"localhost"}, portPair, stopChan, readyChan, nil, os.Stderr)
if err != nil { if err != nil {
log.Error(err) log.Errorf("create port forward error: %s", err.Error())
return err return err
} }
if err = forwarder.ForwardPorts(); err != nil { if err = forwarder.ForwardPorts(); err != nil {
log.Error(err) log.Errorf("forward port error: %s", err.Error())
return err return err
} }
return nil return nil
@@ -354,7 +354,15 @@ func Ping(targetIP string) (bool, error) {
return stat.PacketsRecv == stat.PacketsSent, err return stat.PacketsRecv == stat.PacketsSent, err
} }
func RolloutStatus(ctx1 context.Context, factory cmdutil.Factory, namespace, workloads string, timeout time.Duration) error { func RolloutStatus(ctx1 context.Context, factory cmdutil.Factory, namespace, workloads string, timeout time.Duration) (err error) {
log.Infof("rollout status for %s", workloads)
defer func() {
if err != nil {
log.Errorf("rollout status for %s failed: %s", workloads, err.Error())
} else {
log.Infof("rollout status for %s success", workloads)
}
}()
client, _ := factory.DynamicClient() client, _ := factory.DynamicClient()
r := factory.NewBuilder(). r := factory.NewBuilder().
WithScheme(scheme.Scheme, scheme.Scheme.PrioritizedVersionsAllGroups()...). WithScheme(scheme.Scheme, scheme.Scheme.PrioritizedVersionsAllGroups()...).
@@ -363,7 +371,7 @@ func RolloutStatus(ctx1 context.Context, factory cmdutil.Factory, namespace, wor
SingleResourceType(). SingleResourceType().
Latest(). Latest().
Do() Do()
err := r.Err() err = r.Err()
if err != nil { if err != nil {
return err return err
} }
@@ -406,7 +414,7 @@ func RolloutStatus(ctx1 context.Context, factory cmdutil.Factory, namespace, wor
if err != nil { if err != nil {
return false, err return false, err
} }
log.Infof("%s", status) log.Info(strings.TrimSpace(status))
// Quit waiting if the rollout is done // Quit waiting if the rollout is done
if done { if done {
return true, nil return true, nil

View File

@@ -29,7 +29,7 @@ func (d *dhcpServer) rentIP(w http.ResponseWriter, r *http.Request) {
dhcp := handler.NewDHCPManager(cmi, namespace) dhcp := handler.NewDHCPManager(cmi, namespace)
v4, v6, err := dhcp.RentIPRandom(ctx) v4, v6, err := dhcp.RentIPRandom(ctx)
if err != nil { if err != nil {
log.Error(err) log.Errorf("rent ip failed, err: %v", err)
w.WriteHeader(http.StatusBadRequest) w.WriteHeader(http.StatusBadRequest)
return return
} }
@@ -37,7 +37,7 @@ func (d *dhcpServer) rentIP(w http.ResponseWriter, r *http.Request) {
// todo patch annotation // todo patch annotation
_, err = w.Write([]byte(fmt.Sprintf("%s,%s", v4.String(), v6.String()))) _, err = w.Write([]byte(fmt.Sprintf("%s,%s", v4.String(), v6.String())))
if err != nil { if err != nil {
log.Error(err) log.Errorf("write response failed, err: %v", err)
} }
} }
@@ -59,7 +59,7 @@ func (d *dhcpServer) releaseIP(w http.ResponseWriter, r *http.Request) {
cmi := d.clientset.CoreV1().ConfigMaps(namespace) cmi := d.clientset.CoreV1().ConfigMaps(namespace)
dhcp := handler.NewDHCPManager(cmi, namespace) dhcp := handler.NewDHCPManager(cmi, namespace)
if err := dhcp.ReleaseIP(context.Background(), ips...); err != nil { if err := dhcp.ReleaseIP(context.Background(), ips...); err != nil {
log.Error(err) log.Errorf("release ip failed, err: %v", err)
w.WriteHeader(http.StatusBadRequest) w.WriteHeader(http.StatusBadRequest)
return return
} }

View File

@@ -113,13 +113,13 @@ func serve(w http.ResponseWriter, r *http.Request, admit admitHandler) {
log.Infof("sending response: %v", responseObj) log.Infof("sending response: %v", responseObj)
respBytes, err := json.Marshal(responseObj) respBytes, err := json.Marshal(responseObj)
if err != nil { if err != nil {
log.Error(err) log.Errorf("Unable to encode response: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
} }
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
if _, err := w.Write(respBytes); err != nil { if _, err := w.Write(respBytes); err != nil {
log.Error(err) log.Errorf("Unable to write response: %v", err)
} }
} }