feat: multiple connect works well

Author: fengcaiwen
Date: 2023-10-25 20:56:36 +08:00
Committed by: naison
Parent: 1f4ea3ba87
Commit: d3640ec2d1
12 changed files with 176 additions and 86 deletions

View File

@@ -110,7 +110,7 @@ Startup your kubernetes workloads in local Docker container with same volume、e
}
err = dev.DoDev(cmd.Context(), devOptions, sshConf, cmd.Flags(), f, transferImage)
for _, fun := range handler.RollbackFuncList {
for _, fun := range devOptions.RollbackFuncList {
if fun != nil {
fun()
}

View File

@@ -26,7 +26,7 @@ func CmdStatus(f cmdutil.Factory) *cobra.Command {
return daemon.StartupDaemon(cmd.Context())
},
RunE: func(cmd *cobra.Command, args []string) error {
client, err := daemon.GetClient(false).Status(
client, err := daemon.GetClient(true).Status(
cmd.Context(),
&rpc.StatusRequest{},
)

View File

@@ -4,15 +4,14 @@ import (
"context"
"fmt"
"io"
"os"
"os/exec"
"strings"
defaultlog "log"
log "github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/sets"
"github.com/spf13/pflag"
"github.com/wencaiwulue/kubevpn/pkg/config"
"github.com/wencaiwulue/kubevpn/pkg/daemon/rpc"
"github.com/wencaiwulue/kubevpn/pkg/handler"
"github.com/wencaiwulue/kubevpn/pkg/util"
)
@@ -21,85 +20,165 @@ func (svr *Server) ConnectFork(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectF
log.SetOutput(svr.LogFile)
log.SetLevel(log.DebugLevel)
}()
out := io.MultiWriter(newWarp(resp), svr.LogFile)
out := io.MultiWriter(newConnectForkWarp(resp), svr.LogFile)
log.SetOutput(out)
log.SetLevel(log.InfoLevel)
if !svr.IsSudo {
return fmt.Errorf("connect-fork should not send to sudo daemon server")
return svr.redirectConnectForkToSudoDaemon(req, resp)
}
ctx := resp.Context()
return fork(ctx, req, out)
}
connect := &handler.ConnectOptions{
Namespace: req.Namespace,
Headers: req.Headers,
Workloads: req.Workloads,
ExtraCIDR: req.ExtraCIDR,
ExtraDomain: req.ExtraDomain,
UseLocalDNS: req.UseLocalDNS,
Engine: config.Engine(req.Engine),
}
var sshConf = util.ParseSshFromRPC(req.SshJump)
var transferImage = req.TransferImage
func fork(ctx context.Context, req *rpc.ConnectRequest, out io.Writer) error {
exe, err := os.Executable()
if err != nil {
return fmt.Errorf("get executable error: %s", err.Error())
}
var args = []string{"connect-fork"}
if req.SshJump != nil {
if req.SshJump.Addr != "" {
args = append(args, "--ssh-addr", req.SshJump.Addr)
}
if req.SshJump.User != "" {
args = append(args, "--ssh-username", req.SshJump.User)
}
if req.SshJump.Password != "" {
args = append(args, "--ssh-password", req.SshJump.Password)
}
if req.SshJump.Keyfile != "" {
args = append(args, "--ssh-keyfile", req.SshJump.Keyfile)
}
if req.SshJump.ConfigAlias != "" { // alias in ~/.ssh/config
args = append(args, "--ssh-alias", req.SshJump.ConfigAlias)
}
if req.SshJump.RemoteKubeconfig != "" { // remote path in ssh server
args = append(args, "--remote-kubeconfig", req.SshJump.RemoteKubeconfig)
}
}
if req.KubeconfigBytes != "" {
var path string
path, err = util.ConvertToTempKubeconfigFile([]byte(req.KubeconfigBytes))
go util.StartupPProf(config.PProfPort)
defaultlog.Default().SetOutput(io.Discard)
if transferImage {
err := util.TransferImage(ctx, sshConf, config.OriginImage, req.Image, out)
if err != nil {
return err
}
args = append(args, "--kubeconfig", path)
}
if req.Namespace != "" {
args = append(args, "-n", req.Namespace)
file, err := util.ConvertToTempKubeconfigFile([]byte(req.KubeconfigBytes))
if err != nil {
return err
}
if req.Image != "" {
args = append(args, "--image", req.Image)
flags := pflag.NewFlagSet("", pflag.ContinueOnError)
flags.AddFlag(&pflag.Flag{
Name: "kubeconfig",
DefValue: file,
})
sshCtx, sshCancel := context.WithCancel(context.Background())
connect.RollbackFuncList = append(connect.RollbackFuncList, sshCancel)
var path string
path, err = handler.SshJump(sshCtx, sshConf, flags, false)
if err != nil {
return err
}
if req.TransferImage {
args = append(args, "--transfer-image")
err = connect.InitClient(InitFactoryByPath(path, req.Namespace))
if err != nil {
return err
}
for _, v := range req.ExtraCIDR {
args = append(args, "--extra-cidr", v)
err = connect.PreCheckResource()
if err != nil {
return err
}
for _, v := range req.ExtraDomain {
args = append(args, "--extra-domain", v)
_, err = connect.RentInnerIP(ctx)
if err != nil {
return err
}
env := os.Environ()
envKeys := sets.New[string](config.EnvInboundPodTunIPv4, config.EnvInboundPodTunIPv6)
for i := 0; i < len(env); i++ {
index := strings.Index(env[i], "=")
envKey := env[i][:index]
if envKeys.HasAny(envKey) {
env = append(env[:i], env[i+1:]...)
i--
continue
}
}
cmd := exec.CommandContext(ctx, exe, args...)
cmd.Env = env
cmd.Stdout = out
cmd.Stderr = out
err = cmd.Run()
config.Image = req.Image
err = connect.DoConnect(sshCtx)
if err != nil {
return fmt.Errorf("fork to exec connect error: %s", err.Error())
log.Errorf("do connect error: %v", err)
connect.Cleanup()
return err
}
svr.secondaryConnect = append(svr.secondaryConnect, connect)
return nil
}
func (svr *Server) redirectConnectForkToSudoDaemon(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectServer) error {
cli := svr.GetClient(true)
if cli == nil {
return fmt.Errorf("sudo daemon not start")
}
connect := &handler.ConnectOptions{
Namespace: req.Namespace,
Headers: req.Headers,
Workloads: req.Workloads,
ExtraCIDR: req.ExtraCIDR,
ExtraDomain: req.ExtraDomain,
UseLocalDNS: req.UseLocalDNS,
Engine: config.Engine(req.Engine),
}
var sshConf = util.ParseSshFromRPC(req.SshJump)
file, err := util.ConvertToTempKubeconfigFile([]byte(req.KubeconfigBytes))
if err != nil {
return err
}
flags := pflag.NewFlagSet("", pflag.ContinueOnError)
flags.AddFlag(&pflag.Flag{
Name: "kubeconfig",
DefValue: file,
})
sshCtx, sshCancel := context.WithCancel(context.Background())
connect.RollbackFuncList = append(connect.RollbackFuncList, sshCancel)
var path string
path, err = handler.SshJump(sshCtx, sshConf, flags, true)
if err != nil {
return err
}
err = connect.InitClient(InitFactoryByPath(path, req.Namespace))
if err != nil {
return err
}
err = connect.PreCheckResource()
if err != nil {
return err
}
for _, options := range svr.secondaryConnect {
var isSameCluster bool
isSameCluster, err = util.IsSameCluster(
options.GetClientset().CoreV1().ConfigMaps(options.Namespace), options.Namespace,
connect.GetClientset().CoreV1().ConfigMaps(connect.Namespace), connect.Namespace,
)
if err == nil && isSameCluster && options.Equal(connect) {
// same cluster, do nothing
log.Infof("already connect to cluster")
return nil
}
}
ctx, err := connect.RentInnerIP(resp.Context())
if err != nil {
return err
}
connResp, err := cli.Connect(ctx, req)
if err != nil {
return err
}
for {
recv, err := connResp.Recv()
if err == io.EOF {
break
} else if err != nil {
return err
}
err = resp.Send(recv)
if err != nil {
return err
}
}
svr.secondaryConnect = append(svr.secondaryConnect, connect)
return nil
}
type connectForkWarp struct {
server rpc.Daemon_ConnectServer
}
func (r *connectForkWarp) Write(p []byte) (n int, err error) {
err = r.server.Send(&rpc.ConnectResponse{
Message: string(p),
})
return len(p), err
}
func newConnectForkWarp(server rpc.Daemon_ConnectForkServer) io.Writer {
return &connectForkWarp{server: server}
}
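
The connectForkWarp type above is a small io.Writer adapter: every chunk written to the shared log output is also forwarded to the caller over the gRPC response stream, so the CLI sees the daemon's progress live. A minimal standalone sketch of the same pattern, with an illustrative streamWriter type and send callback standing in for the gRPC stream (not part of this commit):

package main

import (
	"fmt"
	"io"
	"log"
	"os"
)

// streamWriter forwards every Write to a send callback (e.g. a gRPC
// stream's Send method) while still satisfying io.Writer.
type streamWriter struct {
	send func(msg string) error
}

func (w *streamWriter) Write(p []byte) (int, error) {
	// Report the whole chunk as written, mirroring the warp types above.
	return len(p), w.send(string(p))
}

func main() {
	remote := &streamWriter{send: func(msg string) error {
		fmt.Print("sent to client: " + msg)
		return nil
	}}
	// Fan log output to both the local writer and the remote stream,
	// just as ConnectFork does with io.MultiWriter.
	log.SetOutput(io.MultiWriter(os.Stdout, remote))
	log.Println("connecting ...")
}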

View File

@@ -75,7 +75,7 @@ func (svr *Server) Connect(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectServe
})
sshCtx, sshCancel := context.WithCancel(context.Background())
handler.RollbackFuncList = append(handler.RollbackFuncList, sshCancel)
svr.connect.RollbackFuncList = append(svr.connect.RollbackFuncList, sshCancel)
var path string
path, err = handler.SshJump(sshCtx, sshConf, flags, false)
if err != nil {
@@ -129,7 +129,7 @@ func (svr *Server) redirectToSudoDaemon(req *rpc.ConnectRequest, resp rpc.Daemon
DefValue: file,
})
sshCtx, sshCancel := context.WithCancel(context.Background())
handler.RollbackFuncList = append(handler.RollbackFuncList, sshCancel)
connect.RollbackFuncList = append(connect.RollbackFuncList, sshCancel)
var path string
path, err = handler.SshJump(sshCtx, sshConf, flags, true)
if err != nil {

View File

@@ -57,6 +57,7 @@ func (svr *Server) Disconnect(req *rpc.DisconnectRequest, resp rpc.Daemon_Discon
for _, options := range svr.secondaryConnect {
options.Cleanup()
}
svr.secondaryConnect = nil
} else if req.ID != nil && req.GetID() == 0 {
if svr.connect != nil {
svr.connect.Cleanup()
@@ -71,6 +72,7 @@ func (svr *Server) Disconnect(req *rpc.DisconnectRequest, resp rpc.Daemon_Discon
index := req.GetID() - 1
if index < int32(len(svr.secondaryConnect)) {
svr.secondaryConnect[index].Cleanup()
svr.secondaryConnect = append(svr.secondaryConnect[:index], svr.secondaryConnect[index+1:]...)
} else {
log.Errorf("index %d out of range", req.GetID())
}
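
Disconnecting a single secondary connection above uses the standard Go idiom for deleting a slice element by index while preserving the order of the remaining entries. A tiny self-contained sketch of that idiom (names are illustrative):

package main

import "fmt"

// removeAt drops the element at index i by appending the tail onto the head.
func removeAt(conns []string, i int) []string {
	return append(conns[:i], conns[i+1:]...)
}

func main() {
	conns := []string{"cluster-a", "cluster-b", "cluster-c"}
	conns = removeAt(conns, 1)
	fmt.Println(conns) // [cluster-a cluster-c]
}

One side effect worth noting: since Status derives the ID of each secondary connection from its slice position, the IDs of later connections shift down by one after such a removal.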

View File

@@ -29,6 +29,10 @@ func (svr *Server) Quit(req *rpc.QuitRequest, resp rpc.Daemon_QuitServer) error
log.Errorf("quit: cleanup clone failed: %v", err)
}
}
for _, options := range svr.secondaryConnect {
log.Info("quit: cleanup connection")
options.Cleanup()
}
return nil
}

View File

@@ -12,17 +12,19 @@ import (
func (svr *Server) Status(ctx context.Context, request *rpc.StatusRequest) (*rpc.StatusResponse, error) {
var sb = new(bytes.Buffer)
w := tabwriter.NewWriter(sb, 1, 1, 1, ' ', 0)
_, _ = fmt.Fprintf(w, "%s\t%s\t%s\t%s\n", "ID", "Priority", "Context", "Status")
_, _ = fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\n", "ID", "Priority", "Context", "Namespace", "Status")
status := "None"
kubeContext := ""
namespace := ""
if svr.connect != nil {
status = "Connected"
kubeContext = svr.connect.GetKubeconfigContext()
namespace = svr.connect.Namespace
}
_, _ = fmt.Fprintf(w, "%d\t%s\t%s\t%s\n", 0, "Main", kubeContext, status)
_, _ = fmt.Fprintf(w, "%d\t%s\t%s\t%s\t%s\n", 0, "Main", kubeContext, namespace, status)
for i, options := range svr.secondaryConnect {
_, _ = fmt.Fprintf(w, "%d\t%s\t%s\t%s\n", i+1, "Minor", options.GetKubeconfigContext(), "Connected")
_, _ = fmt.Fprintf(w, "%d\t%s\t%s\t%s\t%s\n", i+1, "Minor", options.GetKubeconfigContext(), options.Namespace, "Connected")
}
_ = w.Flush()
return &rpc.StatusResponse{Message: sb.String()}, nil
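
The Status handler renders its report with text/tabwriter, and this commit widens the table with a Namespace column in both the header row and every connection row. A minimal sketch of the resulting layout, using made-up context and namespace values:

package main

import (
	"fmt"
	"os"
	"text/tabwriter"
)

func main() {
	// Same tabwriter settings as the Status handler: min width 1, padding 1.
	w := tabwriter.NewWriter(os.Stdout, 1, 1, 1, ' ', 0)
	fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\n", "ID", "Priority", "Context", "Namespace", "Status")
	fmt.Fprintf(w, "%d\t%s\t%s\t%s\t%s\n", 0, "Main", "kind-kind", "default", "Connected")
	fmt.Fprintf(w, "%d\t%s\t%s\t%s\t%s\n", 1, "Minor", "prod-cluster", "payments", "Connected")
	_ = w.Flush()
	// Output (columns aligned by tabwriter):
	// ID Priority Context      Namespace Status
	// 0  Main     kind-kind    default   Connected
	// 1  Minor    prod-cluster payments  Connected
}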

View File

@@ -29,7 +29,6 @@ import (
"github.com/wencaiwulue/kubevpn/pkg/config"
"github.com/wencaiwulue/kubevpn/pkg/cp"
"github.com/wencaiwulue/kubevpn/pkg/dns"
"github.com/wencaiwulue/kubevpn/pkg/handler"
)
type RunConfig struct {
@@ -195,7 +194,7 @@ func GetDNS(ctx context.Context, f util.Factory, ns, pod string) (*miekgdns.Clie
}
// GetVolume key format: [container name]-[volume mount name]
func GetVolume(ctx context.Context, f util.Factory, ns, pod string) (map[string][]mount.Mount, error) {
func GetVolume(ctx context.Context, f util.Factory, ns, pod string, d *Options) (map[string][]mount.Mount, error) {
clientSet, err := f.KubernetesClientSet()
if err != nil {
return nil, err
@@ -224,7 +223,7 @@ func GetVolume(ctx context.Context, f util.Factory, ns, pod string) (map[string]
if volumeMount.SubPath != "" {
join = filepath.Join(join, volumeMount.SubPath)
}
handler.RollbackFuncList = append(handler.RollbackFuncList, func() {
d.RollbackFuncList = append(d.RollbackFuncList, func() {
_ = os.RemoveAll(join)
})
// pod-namespace/pod-name:path

View File

@@ -78,6 +78,9 @@ type Options struct {
// inner
Cli *client.Client
DockerCli *command.DockerCli
// rollback
RollbackFuncList []func()
}
func (d *Options) Main(ctx context.Context, tempContainerConfig *containerConfig) error {
@@ -124,7 +127,7 @@ func (d *Options) Main(ctx context.Context, tempContainerConfig *containerConfig
log.Errorf("get env from k8s: %v", err)
return err
}
volume, err := GetVolume(ctx, d.Factory, d.Namespace, pod)
volume, err := GetVolume(ctx, d.Factory, d.Namespace, pod, d)
if err != nil {
log.Errorf("get volume from k8s: %v", err)
return err
@@ -213,7 +216,7 @@ func (d *Options) Main(ctx context.Context, tempContainerConfig *containerConfig
}
}
handler.RollbackFuncList = append(handler.RollbackFuncList, func() {
d.RollbackFuncList = append(d.RollbackFuncList, func() {
_ = runConfigList.Remove(ctx, d.Cli)
})
err = runConfigList.Run(ctx, volume, d.Cli, d.DockerCli)
@@ -570,7 +573,7 @@ func (d *Options) doConnect(ctx context.Context, f cmdutil.Factory, conf *util.S
},
)
go h.Run(func() error { select {} })
handler.RollbackFuncList = append(handler.RollbackFuncList, func() {
d.RollbackFuncList = append(d.RollbackFuncList, func() {
h.Close()
})
err = runLogsWaitRunning(cancelCtx, d.DockerCli, id)

View File

@@ -25,8 +25,6 @@ import (
"github.com/wencaiwulue/kubevpn/pkg/util"
)
var RollbackFuncList = make([]func(), 2)
func (c *ConnectOptions) addCleanUpResourceHandler() {
var stopChan = make(chan os.Signal)
signal.Notify(stopChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGKILL)
@@ -67,7 +65,7 @@ func (c *ConnectOptions) Cleanup() {
log.Errorf("can not update ref-count: %v", err)
}
}
for _, function := range RollbackFuncList {
for _, function := range c.RollbackFuncList {
if function != nil {
function()
}
@@ -80,7 +78,7 @@ func (c *ConnectOptions) Cleanup() {
if c.cancel != nil {
c.cancel()
}
RollbackFuncList = RollbackFuncList[:]
c.RollbackFuncList = c.RollbackFuncList[:]
name, err := c.GetTunDeviceName()
if err == nil {
log.Errorf("get tun device error: %v", err)

View File

@@ -67,6 +67,8 @@ type CloneOptions struct {
restclient *rest.RESTClient
config *rest.Config
factory cmdutil.Factory
RollbackFuncList []func()
}
func (d *CloneOptions) InitClient(f cmdutil.Factory) (err error) {
@@ -187,7 +189,7 @@ func (d *CloneOptions) DoClone(ctx context.Context) error {
if err != nil {
return err
}
RollbackFuncList = append(RollbackFuncList, func() {
d.RollbackFuncList = append(d.RollbackFuncList, func() {
_ = client.Resource(object.Mapping.Resource).Namespace(d.TargetNamespace).Delete(context.Background(), u.GetName(), metav1.DeleteOptions{})
})
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {

View File

@@ -91,6 +91,7 @@ type ConnectOptions struct {
// needs to give it back to dhcp
localTunIPv4 *net.IPNet
localTunIPv6 *net.IPNet
RollbackFuncList []func()
apiServerIPs []net.IP
extraHost []dns.Entry
@@ -608,7 +609,7 @@ func (c *ConnectOptions) deleteFirewallRule(ctx context.Context) {
if !util.FindAllowFirewallRule() {
util.AddAllowFirewallRule()
}
RollbackFuncList = append(RollbackFuncList, util.DeleteAllowFirewallRule)
c.RollbackFuncList = append(c.RollbackFuncList, util.DeleteAllowFirewallRule)
go util.DeleteBlockFirewallRule(ctx)
}