feat: cleanup works fine

This commit is contained in:
fengcaiwen
2023-09-10 19:46:20 +08:00
committed by naison
parent e30e59fc50
commit baedcb114c
18 changed files with 209 additions and 218 deletions

View File

@@ -82,6 +82,7 @@ func CmdConnect(f cmdutil.Factory) *cobra.Command {
} }
log.Print(recv.GetMessage()) log.Print(recv.GetMessage())
} }
util.Print(os.Stdout, "Now you can access resources in the kubernetes cluster, enjoy it :)")
// hangup // hangup
if foreground { if foreground {
// disconnect from cluster network // disconnect from cluster network

View File

@@ -1,9 +1,12 @@
package cmds package cmds
import ( import (
"context"
"errors"
"fmt"
"io" "io"
"os"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra" "github.com/spf13/cobra"
cmdutil "k8s.io/kubectl/pkg/cmd/util" cmdutil "k8s.io/kubectl/pkg/cmd/util"
"k8s.io/kubectl/pkg/util/i18n" "k8s.io/kubectl/pkg/util/i18n"
@@ -35,7 +38,9 @@ func CmdLogs(f cmdutil.Factory) *cobra.Command {
if err == io.EOF { if err == io.EOF {
break break
} else if err == nil { } else if err == nil {
log.Print(resp.Message) fmt.Fprintln(os.Stdout, resp.Message)
} else if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return nil
} else { } else {
return err return err
} }

View File

@@ -42,6 +42,7 @@ func CmdReset(factory cmdutil.Factory) *cobra.Command {
return handler.SshJump(cmd.Context(), sshConf, cmd.Flags()) return handler.SshJump(cmd.Context(), sshConf, cmd.Flags())
}, },
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
util.InitLogger(false)
if err := connect.InitClient(factory); err != nil { if err := connect.InitClient(factory); err != nil {
log.Fatal(err) log.Fatal(err)
} }

View File

@@ -2,7 +2,6 @@ package action
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"io" "io"
defaultlog "log" defaultlog "log"
@@ -11,6 +10,8 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/spf13/pflag" "github.com/spf13/pflag"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/client-go/rest" "k8s.io/client-go/rest"
restclient "k8s.io/client-go/rest" restclient "k8s.io/client-go/rest"
@@ -88,7 +89,7 @@ func (svr *Server) Connect(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectServe
if !svr.t.IsZero() { if !svr.t.IsZero() {
log.Debugf("already connect to another cluster, you can disconnect this connect by command `kubevpn disconnect`") log.Debugf("already connect to another cluster, you can disconnect this connect by command `kubevpn disconnect`")
// todo define already connect error? // todo define already connect error?
return errors.New("already connected") return status.Error(codes.AlreadyExists, "")
} }
svr.t = time.Now() svr.t = time.Now()
svr.connect = &handler.ConnectOptions{ svr.connect = &handler.ConnectOptions{
@@ -118,14 +119,14 @@ func (svr *Server) Connect(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectServe
return err return err
} }
} }
tempFile, err := util.ConvertToTempFile([]byte(req.KubeconfigBytes)) file, err := util.ConvertToTempKubeconfigFile([]byte(req.KubeconfigBytes))
if err != nil { if err != nil {
return err return err
} }
flags := pflag.NewFlagSet("", pflag.ContinueOnError) flags := pflag.NewFlagSet("", pflag.ContinueOnError)
flags.AddFlag(&pflag.Flag{ flags.AddFlag(&pflag.Flag{
Name: "kubeconfig", Name: "kubeconfig",
DefValue: tempFile, DefValue: file,
}) })
sshCtx, sshCancel := context.WithCancel(context.Background()) sshCtx, sshCancel := context.WithCancel(context.Background())
@@ -154,7 +155,6 @@ func (svr *Server) Connect(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectServe
svr.connect.Cleanup() svr.connect.Cleanup()
return err return err
} }
util.Print(out, "Now you can access resources in the kubernetes cluster, enjoy it :)")
return nil return nil
} }
@@ -180,14 +180,14 @@ func (svr *Server) redirectToSudoDaemon(req *rpc.ConnectRequest, resp rpc.Daemon
ConfigAlias: req.ConfigAlias, ConfigAlias: req.ConfigAlias,
RemoteKubeconfig: req.RemoteKubeconfig, RemoteKubeconfig: req.RemoteKubeconfig,
} }
tempFile, err := util.ConvertToTempFile([]byte(req.KubeconfigBytes)) file, err := util.ConvertToTempKubeconfigFile([]byte(req.KubeconfigBytes))
if err != nil { if err != nil {
return err return err
} }
flags := pflag.NewFlagSet("", pflag.ContinueOnError) flags := pflag.NewFlagSet("", pflag.ContinueOnError)
flags.AddFlag(&pflag.Flag{ flags.AddFlag(&pflag.Flag{
Name: "kubeconfig", Name: "kubeconfig",
DefValue: tempFile, DefValue: file,
}) })
sshCtx, sshCancel := context.WithCancel(context.Background()) sshCtx, sshCancel := context.WithCancel(context.Background())
handler.RollbackFuncList = append(handler.RollbackFuncList, sshCancel) handler.RollbackFuncList = append(handler.RollbackFuncList, sshCancel)

View File

@@ -10,21 +10,6 @@ import (
"github.com/wencaiwulue/kubevpn/pkg/daemon/rpc" "github.com/wencaiwulue/kubevpn/pkg/daemon/rpc"
) )
type disconnectWarp struct {
server rpc.Daemon_DisconnectServer
}
func (r *disconnectWarp) Write(p []byte) (n int, err error) {
err = r.server.Send(&rpc.DisconnectResponse{
Message: string(p),
})
return len(p), err
}
func newDisconnectWarp(server rpc.Daemon_DisconnectServer) io.Writer {
return &disconnectWarp{server: server}
}
func (svr *Server) Disconnect(req *rpc.DisconnectRequest, resp rpc.Daemon_DisconnectServer) error { func (svr *Server) Disconnect(req *rpc.DisconnectRequest, resp rpc.Daemon_DisconnectServer) error {
if !svr.IsSudo { if !svr.IsSudo {
cli := svr.GetClient(true) cli := svr.GetClient(true)
@@ -35,8 +20,9 @@ func (svr *Server) Disconnect(req *rpc.DisconnectRequest, resp rpc.Daemon_Discon
if err != nil { if err != nil {
return err return err
} }
var recv *rpc.DisconnectResponse
for { for {
recv, err := connResp.Recv() recv, err = connResp.Recv()
if err == io.EOF { if err == io.EOF {
svr.t = time.Time{} svr.t = time.Time{}
svr.connect = nil svr.connect = nil
@@ -66,3 +52,18 @@ func (svr *Server) Disconnect(req *rpc.DisconnectRequest, resp rpc.Daemon_Discon
svr.connect = nil svr.connect = nil
return nil return nil
} }
type disconnectWarp struct {
server rpc.Daemon_DisconnectServer
}
func (r *disconnectWarp) Write(p []byte) (n int, err error) {
err = r.server.Send(&rpc.DisconnectResponse{
Message: string(p),
})
return len(p), err
}
func newDisconnectWarp(server rpc.Daemon_DisconnectServer) io.Writer {
return &disconnectWarp{server: server}
}

View File

@@ -10,21 +10,6 @@ import (
"github.com/wencaiwulue/kubevpn/pkg/handler" "github.com/wencaiwulue/kubevpn/pkg/handler"
) )
type leaveWarp struct {
server rpc.Daemon_LeaveServer
}
func (r *leaveWarp) Write(p []byte) (n int, err error) {
err = r.server.Send(&rpc.LeaveResponse{
Message: string(p),
})
return len(p), err
}
func newLeaveWarp(server rpc.Daemon_LeaveServer) io.Writer {
return &leaveWarp{server: server}
}
func (svr *Server) Leave(req *rpc.LeaveRequest, resp rpc.Daemon_LeaveServer) error { func (svr *Server) Leave(req *rpc.LeaveRequest, resp rpc.Daemon_LeaveServer) error {
out := newLeaveWarp(resp) out := newLeaveWarp(resp)
origin := log.StandardLogger().Out origin := log.StandardLogger().Out
@@ -49,3 +34,18 @@ func (svr *Server) Leave(req *rpc.LeaveRequest, resp rpc.Daemon_LeaveServer) err
} }
return nil return nil
} }
type leaveWarp struct {
server rpc.Daemon_LeaveServer
}
func (r *leaveWarp) Write(p []byte) (n int, err error) {
err = r.server.Send(&rpc.LeaveResponse{
Message: string(p),
})
return len(p), err
}
func newLeaveWarp(server rpc.Daemon_LeaveServer) io.Writer {
return &leaveWarp{server: server}
}

View File

@@ -16,7 +16,7 @@ import (
func (svr *Server) List(ctx context.Context, req *rpc.ListRequest) (*rpc.ListResponse, error) { func (svr *Server) List(ctx context.Context, req *rpc.ListRequest) (*rpc.ListResponse, error) {
if svr.connect == nil { if svr.connect == nil {
return nil, fmt.Errorf("no proxy workloads found") return nil, fmt.Errorf("not connect to any cluster")
} }
mapInterface := svr.connect.GetClientset().CoreV1().ConfigMaps(svr.connect.Namespace) mapInterface := svr.connect.GetClientset().CoreV1().ConfigMaps(svr.connect.Namespace)
configMap, err := mapInterface.Get(ctx, config.ConfigMapPodTrafficManager, metav1.GetOptions{}) configMap, err := mapInterface.Get(ctx, config.ConfigMapPodTrafficManager, metav1.GetOptions{})

View File

@@ -2,12 +2,17 @@ package action
import ( import (
"github.com/hpcloud/tail" "github.com/hpcloud/tail"
"github.com/wencaiwulue/kubevpn/pkg/daemon/rpc" "github.com/wencaiwulue/kubevpn/pkg/daemon/rpc"
) )
func (svr *Server) Logs(req *rpc.LogRequest, resp rpc.Daemon_LogsServer) error { func (svr *Server) Logs(req *rpc.LogRequest, resp rpc.Daemon_LogsServer) error {
path := GetDaemonLogPath() path := GetDaemonLogPath()
config := tail.Config{Follow: true, ReOpen: true, MustExist: true} config := tail.Config{Follow: req.Follow, ReOpen: true, MustExist: true}
if !req.Follow {
// FATAL -- cannot set ReOpen without Follow.
config.ReOpen = false
}
file, err := tail.TailFile(path, config) file, err := tail.TailFile(path, config)
if err != nil { if err != nil {
return err return err
@@ -16,7 +21,10 @@ func (svr *Server) Logs(req *rpc.LogRequest, resp rpc.Daemon_LogsServer) error {
select { select {
case <-resp.Context().Done(): case <-resp.Context().Done():
return nil return nil
case line := <-file.Lines: case line, ok := <-file.Lines:
if !ok {
return nil
}
if line.Err != nil { if line.Err != nil {
return err return err
} }

View File

@@ -1,10 +1,8 @@
package action package action
import ( import (
"context"
"fmt" "fmt"
"io" "io"
"time"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/spf13/pflag" "github.com/spf13/pflag"
@@ -15,32 +13,17 @@ import (
"github.com/wencaiwulue/kubevpn/pkg/util" "github.com/wencaiwulue/kubevpn/pkg/util"
) )
type proxyWarp struct { // Proxy
server rpc.Daemon_ProxyServer
}
func (r *proxyWarp) Write(p []byte) (n int, err error) {
err = r.server.Send(&rpc.ConnectResponse{
Message: string(p),
})
return len(p), err
}
func newProxyWarp(server rpc.Daemon_ProxyServer) io.Writer {
return &proxyWarp{server: server}
}
// 1. if not connect to cluster // 1. if not connect to cluster
// 1.1 connect to cluster // 1.1 connect to cluster
// 1.2 proxy workloads // 1.2 proxy workloads
// 2. if already connect to cluster // 2. if already connect to cluster
// 2.1 disconnect from cluster // 2.1 disconnect from cluster
// 2.2 same as step 1 // 2.2 same as step 1
func (svr *Server) Proxy(req *rpc.ConnectRequest, resp rpc.Daemon_ProxyServer) error { func (svr *Server) Proxy(req *rpc.ConnectRequest, resp rpc.Daemon_ProxyServer) error {
out := newProxyWarp(resp)
origin := log.StandardLogger().Out origin := log.StandardLogger().Out
log.SetOutput(io.MultiWriter(out, origin)) out := io.MultiWriter(newProxyWarp(resp), origin)
log.SetOutput(out)
defer func() { defer func() {
log.SetOutput(origin) log.SetOutput(origin)
log.SetLevel(log.DebugLevel) log.SetLevel(log.DebugLevel)
@@ -65,7 +48,7 @@ func (svr *Server) Proxy(req *rpc.ConnectRequest, resp rpc.Daemon_ProxyServer) e
RemoteKubeconfig: req.RemoteKubeconfig, RemoteKubeconfig: req.RemoteKubeconfig,
} }
file, err := util.ConvertToTempFile([]byte(req.KubeconfigBytes)) file, err := util.ConvertToTempKubeconfigFile([]byte(req.KubeconfigBytes))
if err != nil { if err != nil {
return err return err
} }
@@ -92,7 +75,8 @@ func (svr *Server) Proxy(req *rpc.ConnectRequest, resp rpc.Daemon_ProxyServer) e
return fmt.Errorf("daemon is not avaliable") return fmt.Errorf("daemon is not avaliable")
} }
if svr.connect != nil { if svr.connect != nil {
isSameCluster, err := util.IsSameCluster( var isSameCluster bool
isSameCluster, err = util.IsSameCluster(
svr.connect.GetClientset().CoreV1().ConfigMaps(svr.connect.Namespace), svr.connect.Namespace, svr.connect.GetClientset().CoreV1().ConfigMaps(svr.connect.Namespace), svr.connect.Namespace,
connect.GetClientset().CoreV1().ConfigMaps(connect.Namespace), connect.Namespace, connect.GetClientset().CoreV1().ConfigMaps(connect.Namespace), connect.Namespace,
) )
@@ -101,12 +85,14 @@ func (svr *Server) Proxy(req *rpc.ConnectRequest, resp rpc.Daemon_ProxyServer) e
log.Debugf("already connect to cluster") log.Debugf("already connect to cluster")
} else { } else {
log.Debugf("try to disconnect from another cluster") log.Debugf("try to disconnect from another cluster")
disconnect, err := daemonClient.Disconnect(ctx, &rpc.DisconnectRequest{}) var disconnect rpc.Daemon_DisconnectClient
disconnect, err = daemonClient.Disconnect(ctx, &rpc.DisconnectRequest{})
if err != nil { if err != nil {
return err return err
} }
var recv *rpc.DisconnectResponse
for { for {
recv, err := disconnect.Recv() recv, err = disconnect.Recv()
if err == io.EOF { if err == io.EOF {
break break
} else if err != nil { } else if err != nil {
@@ -122,12 +108,14 @@ func (svr *Server) Proxy(req *rpc.ConnectRequest, resp rpc.Daemon_ProxyServer) e
if svr.connect == nil { if svr.connect == nil {
log.Debugf("connectting to cluster") log.Debugf("connectting to cluster")
connResp, err := daemonClient.Connect(ctx, req) var connResp rpc.Daemon_ConnectClient
connResp, err = daemonClient.Connect(ctx, req)
if err != nil { if err != nil {
return err return err
} }
var recv *rpc.ConnectResponse
for { for {
recv, err := connResp.Recv() recv, err = connResp.Recv()
if err == io.EOF { if err == io.EOF {
break break
} else if err != nil { } else if err != nil {
@@ -142,70 +130,25 @@ func (svr *Server) Proxy(req *rpc.ConnectRequest, resp rpc.Daemon_ProxyServer) e
log.Debugf("proxy resource...") log.Debugf("proxy resource...")
err = svr.connect.CreateRemoteInboundPod(ctx) err = svr.connect.CreateRemoteInboundPod(ctx)
if err != nil {
return err
}
log.Debugf("proxy resource done") log.Debugf("proxy resource done")
return err util.Print(out, "Now you can access resources in the kubernetes cluster, enjoy it :)")
}
func (svr *Server) redirectToSudoDaemon1(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectServer) error {
cli := svr.GetClient(true)
if cli == nil {
return fmt.Errorf("sudo daemon not start")
}
connResp, err := cli.Connect(resp.Context(), req)
if err != nil {
return err
}
for {
recv, err := connResp.Recv()
if err == io.EOF {
break
} else if err != nil {
return err
}
err = resp.Send(recv)
if err != nil {
return err
}
}
svr.t = time.Now()
svr.connect = &handler.ConnectOptions{
Namespace: req.Namespace,
Headers: req.Headers,
Workloads: req.Workloads,
ExtraCIDR: req.ExtraCIDR,
ExtraDomain: req.ExtraDomain,
UseLocalDNS: req.UseLocalDNS,
Engine: config.Engine(req.Engine),
}
var sshConf = &util.SshConfig{
Addr: req.Addr,
User: req.User,
Password: req.Password,
Keyfile: req.Keyfile,
ConfigAlias: req.ConfigAlias,
RemoteKubeconfig: req.RemoteKubeconfig,
}
file, err := util.ConvertToTempFile([]byte(req.KubeconfigBytes))
if err != nil {
return err
}
flags := pflag.NewFlagSet("", pflag.ContinueOnError)
flags.AddFlag(&pflag.Flag{
Name: "kubeconfig",
DefValue: file,
})
err = handler.SshJump(context.Background(), sshConf, flags)
if err != nil {
return err
}
err = svr.connect.InitClient(InitFactory(req.KubeconfigBytes, req.Namespace))
if err != nil {
return err
}
err = svr.connect.PreCheckResource()
if err != nil {
return err
}
return nil return nil
} }
type proxyWarp struct {
server rpc.Daemon_ProxyServer
}
func (r *proxyWarp) Write(p []byte) (n int, err error) {
err = r.server.Send(&rpc.ConnectResponse{
Message: string(p),
})
return len(p), err
}
func newProxyWarp(server rpc.Daemon_ProxyServer) io.Writer {
return &proxyWarp{server: server}
}

View File

@@ -8,21 +8,6 @@ import (
"github.com/wencaiwulue/kubevpn/pkg/daemon/rpc" "github.com/wencaiwulue/kubevpn/pkg/daemon/rpc"
) )
type QuitWarp struct {
server rpc.Daemon_QuitServer
}
func (r *QuitWarp) Write(p []byte) (n int, err error) {
err = r.server.Send(&rpc.QuitResponse{
Message: string(p),
})
return len(p), err
}
func newQuitWarp(server rpc.Daemon_QuitServer) io.Writer {
return &QuitWarp{server: server}
}
func (svr *Server) Quit(req *rpc.QuitRequest, resp rpc.Daemon_QuitServer) error { func (svr *Server) Quit(req *rpc.QuitRequest, resp rpc.Daemon_QuitServer) error {
origin := log.StandardLogger().Out origin := log.StandardLogger().Out
defer func() { defer func() {
@@ -39,3 +24,18 @@ func (svr *Server) Quit(req *rpc.QuitRequest, resp rpc.Daemon_QuitServer) error
} }
return nil return nil
} }
type quitWarp struct {
server rpc.Daemon_QuitServer
}
func (r *quitWarp) Write(p []byte) (n int, err error) {
err = r.server.Send(&rpc.QuitResponse{
Message: string(p),
})
return len(p), err
}
func newQuitWarp(server rpc.Daemon_QuitServer) io.Writer {
return &quitWarp{server: server}
}

View File

@@ -9,27 +9,12 @@ import (
"github.com/wencaiwulue/kubevpn/pkg/daemon/rpc" "github.com/wencaiwulue/kubevpn/pkg/daemon/rpc"
) )
type StopWarp struct {
server rpc.Daemon_QuitServer
}
func (r *StopWarp) Write(p []byte) (n int, err error) {
err = r.server.Send(&rpc.QuitResponse{
Message: string(p),
})
return len(p), err
}
func newStopWarp(server rpc.Daemon_QuitServer) io.Writer {
return &StopWarp{server: server}
}
func (svr *Server) Stop(req *rpc.QuitRequest, resp rpc.Daemon_QuitServer) error { func (svr *Server) Stop(req *rpc.QuitRequest, resp rpc.Daemon_QuitServer) error {
if svr.connect == nil { if svr.connect == nil {
return nil return nil
} }
out := newQuitWarp(resp) out := newStopWarp(resp)
origin := log.StandardLogger().Out origin := log.StandardLogger().Out
defer func() { defer func() {
log.SetOutput(origin) log.SetOutput(origin)
@@ -42,3 +27,18 @@ func (svr *Server) Stop(req *rpc.QuitRequest, resp rpc.Daemon_QuitServer) error
svr.connect = nil svr.connect = nil
return nil return nil
} }
type stopWarp struct {
server rpc.Daemon_QuitServer
}
func (r *stopWarp) Write(p []byte) (n int, err error) {
err = r.server.Send(&rpc.QuitResponse{
Message: string(p),
})
return len(p), err
}
func newStopWarp(server rpc.Daemon_QuitServer) io.Writer {
return &stopWarp{server: server}
}

View File

@@ -40,9 +40,6 @@ func (c *ConnectOptions) Cleanup() {
log.Info("prepare to exit, cleaning up") log.Info("prepare to exit, cleaning up")
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel() defer cancel()
if c.cancel != nil {
c.cancel()
}
var ips []net.IP var ips []net.IP
if c.localTunIPv4 != nil && c.localTunIPv4.IP != nil { if c.localTunIPv4 != nil && c.localTunIPv4.IP != nil {
ips = append(ips, c.localTunIPv4.IP) ips = append(ips, c.localTunIPv4.IP)
@@ -75,6 +72,14 @@ func (c *ConnectOptions) Cleanup() {
function() function()
} }
} }
// leave proxy resources
err := c.LeaveProxyResources(context.Background())
if err != nil {
log.Error(err)
}
if c.cancel != nil {
c.cancel()
}
RollbackFuncList = RollbackFuncList[:] RollbackFuncList = RollbackFuncList[:]
dns.CancelDNS() dns.CancelDNS()
log.Info("clean up successful") log.Info("clean up successful")

View File

@@ -765,7 +765,12 @@ func SshJump(ctx context.Context, conf *util.SshConfig, flags *pflag.FlagSet) (e
return err return err
} }
var local = &netip.AddrPort{} port := util.GetAvailableTCPPortOrDie()
var local netip.AddrPort
local, err = netip.ParseAddrPort(net.JoinHostPort("127.0.0.1", strconv.Itoa(port)))
if err != nil {
return err
}
errChan := make(chan error, 1) errChan := make(chan error, 1)
readyChan := make(chan struct{}, 1) readyChan := make(chan struct{}, 1)
go func() { go func() {
@@ -776,7 +781,7 @@ func SshJump(ctx context.Context, conf *util.SshConfig, flags *pflag.FlagSet) (e
default: default:
} }
err := util.Main(ctx, &remote, local, conf, readyChan) err := util.Main(ctx, remote, local, conf, readyChan)
if err != nil { if err != nil {
if !errors.Is(err, context.Canceled) { if !errors.Is(err, context.Canceled) {
log.Errorf("ssh forward failed err: %v", err) log.Errorf("ssh forward failed err: %v", err)

View File

@@ -2,6 +2,8 @@ package handler
import ( import (
"context" "context"
"fmt"
corev1 "k8s.io/api/core/v1"
"strings" "strings"
"github.com/docker/docker/api/types" "github.com/docker/docker/api/types"
@@ -19,18 +21,50 @@ import (
// 1, get all proxy-resources from configmap // 1, get all proxy-resources from configmap
// 2, cleanup all containers // 2, cleanup all containers
func (c *ConnectOptions) Reset(ctx context.Context) error { func (c *ConnectOptions) Reset(ctx context.Context) error {
cm, err := c.clientset.CoreV1().ConfigMaps(c.Namespace).Get(ctx, config.ConfigMapPodTrafficManager, metav1.GetOptions{}) err := c.LeaveProxyResources(ctx)
if err != nil {
log.Error(err)
}
cleanup(ctx, c.clientset, c.Namespace, config.ConfigMapPodTrafficManager, false)
var cli *client.Client
cli, err = client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
if err != nil {
return nil
}
var networkResource types.NetworkResource
networkResource, err = cli.NetworkInspect(ctx, config.ConfigMapPodTrafficManager, types.NetworkInspectOptions{})
if err != nil {
return nil
}
if len(networkResource.Containers) == 0 {
return cli.NetworkRemove(ctx, config.ConfigMapPodTrafficManager)
}
return nil
}
func (c *ConnectOptions) LeaveProxyResources(ctx context.Context) (err error) {
if c == nil || c.clientset == nil {
return
}
mapInterface := c.clientset.CoreV1().ConfigMaps(c.Namespace)
var cm *corev1.ConfigMap
cm, err = mapInterface.Get(ctx, config.ConfigMapPodTrafficManager, metav1.GetOptions{})
if err != nil && !apierrors.IsNotFound(err) { if err != nil && !apierrors.IsNotFound(err) {
return err return
}
if cm == nil || cm.Data == nil || len(cm.Data[config.KeyEnvoy]) == 0 {
err = fmt.Errorf("can not found proxy resources")
return
} }
var v = make([]*controlplane.Virtual, 0) var v = make([]*controlplane.Virtual, 0)
localTunIPv4 := c.GetLocalTunIPv4() str := cm.Data[config.KeyEnvoy]
if cm != nil && cm.Data != nil {
if str, ok := cm.Data[config.KeyEnvoy]; ok && len(str) != 0 {
if err = yaml.Unmarshal([]byte(str), &v); err != nil { if err = yaml.Unmarshal([]byte(str), &v); err != nil {
log.Error(err) log.Error(err)
return err return
} }
localTunIPv4 := c.GetLocalTunIPv4()
for _, virtual := range v { for _, virtual := range v {
// deployments.apps.ry-server --> deployments.apps/ry-server // deployments.apps.ry-server --> deployments.apps/ry-server
lastIndex := strings.LastIndex(virtual.Uid, ".") lastIndex := strings.LastIndex(virtual.Uid, ".")
@@ -41,19 +75,5 @@ func (c *ConnectOptions) Reset(ctx context.Context) error {
continue continue
} }
} }
} return err
}
cleanup(ctx, c.clientset, c.Namespace, config.ConfigMapPodTrafficManager, false)
var cli *client.Client
if cli, err = client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()); err != nil {
return nil
}
var i types.NetworkResource
if i, err = cli.NetworkInspect(ctx, config.ConfigMapPodTrafficManager, types.NetworkInspectOptions{}); err != nil {
return nil
}
if len(i.Containers) == 0 {
return cli.NetworkRemove(ctx, config.ConfigMapPodTrafficManager)
}
return nil
} }

View File

@@ -63,8 +63,8 @@ func ConvertToKubeconfigBytes(factory cmdutil.Factory) ([]byte, string, error) {
return marshal, namespace, nil return marshal, namespace, nil
} }
func ConvertToTempFile(kubeconfigBytes []byte) (string, error) { func ConvertToTempKubeconfigFile(kubeconfigBytes []byte) (string, error) {
temp, err := os.CreateTemp("", "") temp, err := os.CreateTemp("", "*.tmp.kubeconfig")
if err != nil { if err != nil {
return "", err return "", err
} }

View File

@@ -29,7 +29,10 @@ type SshConfig struct {
RemoteKubeconfig string RemoteKubeconfig string
} }
func Main(ctx context.Context, remoteEndpoint, localEndpoint *netip.AddrPort, conf *SshConfig, done chan struct{}) error { func Main(pctx context.Context, remoteEndpoint, localEndpoint netip.AddrPort, conf *SshConfig, done chan struct{}) error {
ctx, cancelFunc := context.WithCancel(pctx)
defer cancelFunc()
var remote *ssh.Client var remote *ssh.Client
var err error var err error
if conf.ConfigAlias != "" { if conf.ConfigAlias != "" {
@@ -60,16 +63,12 @@ func Main(ctx context.Context, remoteEndpoint, localEndpoint *netip.AddrPort, co
// Listen on remote server port // Listen on remote server port
var lc net.ListenConfig var lc net.ListenConfig
listen, err := lc.Listen(ctx, "tcp", "localhost:0") listen, err := lc.Listen(ctx, "tcp", localEndpoint.String())
if err != nil { if err != nil {
return err return err
} }
defer listen.Close() defer listen.Close()
*localEndpoint, err = netip.ParseAddrPort(listen.Addr().String())
if err != nil {
return err
}
select { select {
case done <- struct{}{}: case done <- struct{}{}:
} }
@@ -94,9 +93,10 @@ func Main(ctx context.Context, remoteEndpoint, localEndpoint *netip.AddrPort, co
if err == nil { if err == nil {
break break
} }
time.Sleep(time.Second) time.Sleep(time.Millisecond * 200)
} }
if conn == nil { if conn == nil {
cancelFunc()
return return
} }
defer conn.Close() defer conn.Close()
@@ -180,12 +180,12 @@ func publicKeyFile(file string) ssh.AuthMethod {
} }
func handleClient(client net.Conn, remote net.Conn) { func handleClient(client net.Conn, remote net.Conn) {
chDone := make(chan bool) chDone := make(chan bool, 2)
// start remote -> local data transfer // start remote -> local data transfer
go func() { go func() {
_, err := io.Copy(client, remote) _, err := io.Copy(client, remote)
if err != nil { if err != nil && !errors.Is(err, net.ErrClosed) {
log.Debugf("error while copy remote->local: %s", err) log.Debugf("error while copy remote->local: %s", err)
} }
chDone <- true chDone <- true
@@ -194,7 +194,7 @@ func handleClient(client net.Conn, remote net.Conn) {
// start local -> remote data transfer // start local -> remote data transfer
go func() { go func() {
_, err := io.Copy(remote, client) _, err := io.Copy(remote, client)
if err != nil { if err != nil && !errors.Is(err, net.ErrClosed) {
log.Debugf("error while copy local->remote: %s", err) log.Debugf("error while copy local->remote: %s", err)
} }
chDone <- true chDone <- true
@@ -267,6 +267,7 @@ func dial(from *SshConfig) (*ssh.Client, error) {
User: from.User, User: from.User,
Auth: []ssh.AuthMethod{publicKeyFile(from.Keyfile)}, Auth: []ssh.AuthMethod{publicKeyFile(from.Keyfile)},
HostKeyCallback: ssh.InsecureIgnoreHostKey(), HostKeyCallback: ssh.InsecureIgnoreHostKey(),
Timeout: time.Second * 10,
}) })
} }
@@ -281,6 +282,7 @@ func jump(bClient *ssh.Client, to *SshConfig) (*ssh.Client, error) {
User: to.User, User: to.User,
Auth: []ssh.AuthMethod{publicKeyFile(to.Keyfile)}, Auth: []ssh.AuthMethod{publicKeyFile(to.Keyfile)},
HostKeyCallback: ssh.InsecureIgnoreHostKey(), HostKeyCallback: ssh.InsecureIgnoreHostKey(),
Timeout: time.Second * 10,
}) })
if err != nil { if err != nil {
return nil, err return nil, err

View File

@@ -55,7 +55,7 @@ import (
) )
func GetAvailableUDPPortOrDie() int { func GetAvailableUDPPortOrDie() int {
address, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:0", "0.0.0.0")) address, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:0", "localhost"))
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
@@ -68,7 +68,7 @@ func GetAvailableUDPPortOrDie() int {
} }
func GetAvailableTCPPortOrDie() int { func GetAvailableTCPPortOrDie() int {
address, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:0", "0.0.0.0")) address, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:0", "localhost"))
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
@@ -489,7 +489,7 @@ func WaitPortToBeFree(ctx context.Context, port int) error {
} }
func IsPortListening(port int) bool { func IsPortListening(port int) bool {
listener, err := net.Listen("tcp4", net.JoinHostPort("0.0.0.0", strconv.Itoa(port))) listener, err := net.Listen("tcp4", net.JoinHostPort("localhost", strconv.Itoa(port)))
if err != nil { if err != nil {
return true return true
} else { } else {

View File

@@ -93,7 +93,7 @@ func TestPing(t *testing.T) {
log.Infof("failed to serialize icmp packet, err: %v", err) log.Infof("failed to serialize icmp packet, err: %v", err)
return return
} }
ipConn, err := net.ListenPacket("ip4:icmp", "0.0.0.0") ipConn, err := net.ListenPacket("ip4:icmp", "localhost")
if err != nil { if err != nil {
if strings.Contains(err.Error(), "operation not permitted") { if strings.Contains(err.Error(), "operation not permitted") {
return return