mirror of
https://github.com/kubenetworks/kubevpn.git
synced 2025-10-06 07:47:08 +08:00
feat: add mode connect-mode
This commit is contained in:
411
pkg/dev/main.go
411
pkg/dev/main.go
@@ -1,14 +1,18 @@
|
||||
package dev
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"math/rand"
|
||||
"net"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"sort"
|
||||
"strings"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/docker/cli/cli/command"
|
||||
@@ -19,9 +23,12 @@ import (
|
||||
containertypes "github.com/docker/docker/api/types/container"
|
||||
"github.com/docker/docker/api/types/mount"
|
||||
"github.com/docker/docker/api/types/network"
|
||||
"github.com/docker/docker/api/types/strslice"
|
||||
"github.com/docker/docker/client"
|
||||
"github.com/docker/docker/errdefs"
|
||||
"github.com/docker/docker/pkg/archive"
|
||||
"github.com/docker/docker/pkg/stdcopy"
|
||||
"github.com/google/uuid"
|
||||
log "github.com/sirupsen/logrus"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
@@ -34,10 +41,16 @@ import (
|
||||
"github.com/wencaiwulue/kubevpn/pkg/config"
|
||||
"github.com/wencaiwulue/kubevpn/pkg/handler"
|
||||
"github.com/wencaiwulue/kubevpn/pkg/mesh"
|
||||
"github.com/wencaiwulue/kubevpn/pkg/tun"
|
||||
"github.com/wencaiwulue/kubevpn/pkg/util"
|
||||
)
|
||||
|
||||
type ConnectMode string
|
||||
|
||||
const (
|
||||
ConnectModeContainer ConnectMode = "container"
|
||||
ConnectModeHost ConnectMode = "host"
|
||||
)
|
||||
|
||||
type Options struct {
|
||||
Headers map[string]string
|
||||
Namespace string
|
||||
@@ -47,6 +60,7 @@ type Options struct {
|
||||
NoProxy bool
|
||||
ExtraCIDR []string
|
||||
ExtraDomain []string
|
||||
ConnectMode ConnectMode
|
||||
|
||||
// docker options
|
||||
Platform string
|
||||
@@ -58,9 +72,6 @@ type Options struct {
|
||||
Expose opts.ListOpts
|
||||
ExtraHosts opts.ListOpts
|
||||
NetMode opts.NetworkOpt
|
||||
Aliases opts.ListOpts
|
||||
Links opts.ListOpts
|
||||
LinkLocalIPs opts.ListOpts
|
||||
Env opts.ListOpts
|
||||
Mounts opts.MountOpt
|
||||
Volumes opts.ListOpts
|
||||
@@ -136,7 +147,7 @@ func (d Options) Main(ctx context.Context) error {
|
||||
return fmt.Errorf("your pod resource request is bigger than docker-desktop resource, please adjust your docker-desktop resource")
|
||||
}
|
||||
mode := container.NetworkMode(d.NetMode.NetworkMode())
|
||||
if mode.IsBridge() || mode.IsHost() || mode.IsContainer() || mode.IsNone() {
|
||||
if len(d.NetMode.Value()) != 0 {
|
||||
for _, runConfig := range list[:] {
|
||||
// remove expose port
|
||||
runConfig.config.ExposedPorts = nil
|
||||
@@ -155,7 +166,7 @@ func (d Options) Main(ctx context.Context) error {
|
||||
}
|
||||
} else {
|
||||
var networkID string
|
||||
networkID, err = createNetwork(ctx, cli, list[0].containerName)
|
||||
networkID, err = createKubevpnNetwork(ctx, cli, list[0].containerName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -190,50 +201,6 @@ func (d Options) Main(ctx context.Context) error {
|
||||
return terminal(list[0].containerName, dockerCli)
|
||||
}
|
||||
|
||||
func createNetwork(ctx context.Context, cli *client.Client, networkName string) (string, error) {
|
||||
getInterface, err := tun.GetInterface()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
addrs, err := getInterface.Addrs()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
cidr, _, err := net.ParseCIDR(addrs[0].String())
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
by := map[string]string{"created_by": config.ConfigMapPodTrafficManager}
|
||||
create, err := cli.NetworkCreate(ctx, networkName, types.NetworkCreate{
|
||||
Driver: "bridge",
|
||||
Scope: "local",
|
||||
IPAM: &network.IPAM{
|
||||
Driver: "",
|
||||
Options: nil,
|
||||
Config: []network.IPAMConfig{
|
||||
{
|
||||
Subnet: config.CIDR.String(),
|
||||
Gateway: cidr.String(),
|
||||
},
|
||||
},
|
||||
},
|
||||
Internal: true,
|
||||
Labels: by,
|
||||
})
|
||||
if err != nil {
|
||||
if errdefs.IsForbidden(err) {
|
||||
list, _ := cli.NetworkList(ctx, types.NetworkListOptions{})
|
||||
for _, resource := range list {
|
||||
if reflect.DeepEqual(resource.Labels, by) {
|
||||
return resource.ID, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return "", err
|
||||
}
|
||||
return create.ID, nil
|
||||
}
|
||||
|
||||
type Run []*RunConfig
|
||||
|
||||
func (r Run) Remove(ctx context.Context) error {
|
||||
@@ -396,3 +363,345 @@ func checkOutOfMemory(spec *v1.PodTemplateSpec, cli *client.Client) (outOfMemory
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func DoDev(devOptions Options, args []string, f cmdutil.Factory) error {
|
||||
connect := handler.ConnectOptions{
|
||||
Headers: devOptions.Headers,
|
||||
Workloads: args,
|
||||
ExtraCIDR: devOptions.ExtraCIDR,
|
||||
ExtraDomain: devOptions.ExtraDomain,
|
||||
}
|
||||
|
||||
mode := container.NetworkMode(devOptions.NetMode.NetworkMode())
|
||||
if mode.IsContainer() {
|
||||
client, _, err := GetClient()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var inspect types.ContainerJSON
|
||||
inspect, err = client.ContainerInspect(context.Background(), mode.ConnectedContainer())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if inspect.State == nil {
|
||||
return fmt.Errorf("can not get container status, please make contianer name is valid")
|
||||
}
|
||||
if !inspect.State.Running {
|
||||
return fmt.Errorf("container %s status is %s, expect is running, please make sure your outer docker name is correct", mode.ConnectedContainer(), inspect.State.Status)
|
||||
}
|
||||
}
|
||||
|
||||
if err := connect.InitClient(f); err != nil {
|
||||
return err
|
||||
}
|
||||
err := connect.PreCheckResource()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(connect.Workloads) > 1 {
|
||||
return fmt.Errorf("can only dev one workloads at same time, workloads: %v", connect.Workloads)
|
||||
}
|
||||
if len(connect.Workloads) < 1 {
|
||||
return fmt.Errorf("you must provide resource to dev, workloads : %v is invaild", connect.Workloads)
|
||||
}
|
||||
|
||||
devOptions.Workload = connect.Workloads[0]
|
||||
// if no-proxy is true, not needs to intercept traffic
|
||||
if devOptions.NoProxy {
|
||||
if len(connect.Headers) != 0 {
|
||||
return fmt.Errorf("not needs to provide headers if is no-proxy mode")
|
||||
}
|
||||
connect.Workloads = []string{}
|
||||
}
|
||||
path, err := connect.GetKubeconfigPath()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
switch devOptions.ConnectMode {
|
||||
case ConnectModeHost:
|
||||
defer func() {
|
||||
handler.Cleanup(syscall.SIGQUIT)
|
||||
select {}
|
||||
}()
|
||||
if err = connect.DoConnect(); err != nil {
|
||||
log.Errorln(err)
|
||||
return err
|
||||
}
|
||||
case ConnectModeContainer:
|
||||
var dockerCli *command.DockerCli
|
||||
var cli *client.Client
|
||||
cli, dockerCli, err = GetClient()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var entrypoint []string
|
||||
if devOptions.NoProxy {
|
||||
entrypoint = []string{"kubevpn", "connect", "-n", connect.Namespace, "--kubeconfig", "/root/.kube/config", "--image", config.Image}
|
||||
for _, v := range connect.ExtraCIDR {
|
||||
entrypoint = append(entrypoint, "--extra-cidr", v)
|
||||
}
|
||||
for _, v := range connect.ExtraDomain {
|
||||
entrypoint = append(entrypoint, "--extra-domain", v)
|
||||
}
|
||||
} else {
|
||||
entrypoint = []string{"kubevpn", "proxy", connect.Workloads[0], "-n", connect.Namespace, "--kubeconfig", "/root/.kube/config", "--image", config.Image}
|
||||
for k, v := range connect.Headers {
|
||||
entrypoint = append(entrypoint, "--headers", fmt.Sprintf("%s=%s", k, v))
|
||||
}
|
||||
for _, v := range connect.ExtraCIDR {
|
||||
entrypoint = append(entrypoint, "--extra-cidr", v)
|
||||
}
|
||||
for _, v := range connect.ExtraDomain {
|
||||
entrypoint = append(entrypoint, "--extra-domain", v)
|
||||
}
|
||||
}
|
||||
|
||||
runConfig := &container.Config{
|
||||
User: "root",
|
||||
AttachStdin: false,
|
||||
AttachStdout: false,
|
||||
AttachStderr: false,
|
||||
ExposedPorts: nil,
|
||||
StdinOnce: false,
|
||||
Env: []string{fmt.Sprintf("%s=1", config.EnvStartSudoKubeVPNByKubeVPN)},
|
||||
Cmd: []string{},
|
||||
Healthcheck: nil,
|
||||
ArgsEscaped: false,
|
||||
Image: config.Image,
|
||||
Volumes: nil,
|
||||
Entrypoint: entrypoint,
|
||||
NetworkDisabled: false,
|
||||
MacAddress: "",
|
||||
OnBuild: nil,
|
||||
StopSignal: "",
|
||||
StopTimeout: nil,
|
||||
Shell: nil,
|
||||
}
|
||||
hostConfig := &container.HostConfig{
|
||||
Binds: []string{fmt.Sprintf("%s:%s", path, "/root/.kube/config")},
|
||||
LogConfig: container.LogConfig{},
|
||||
PortBindings: nil,
|
||||
RestartPolicy: container.RestartPolicy{},
|
||||
AutoRemove: true,
|
||||
VolumeDriver: "",
|
||||
VolumesFrom: nil,
|
||||
ConsoleSize: [2]uint{},
|
||||
CapAdd: strslice.StrSlice{"SYS_PTRACE", "SYS_ADMIN"}, // for dlv
|
||||
CgroupnsMode: "",
|
||||
ExtraHosts: nil,
|
||||
GroupAdd: nil,
|
||||
IpcMode: "",
|
||||
Cgroup: "",
|
||||
Links: nil,
|
||||
OomScoreAdj: 0,
|
||||
PidMode: "",
|
||||
Privileged: true,
|
||||
PublishAllPorts: false,
|
||||
ReadonlyRootfs: false,
|
||||
SecurityOpt: []string{"apparmor=unconfined", "seccomp=unconfined"},
|
||||
StorageOpt: nil,
|
||||
Tmpfs: nil,
|
||||
UTSMode: "",
|
||||
UsernsMode: "",
|
||||
ShmSize: 0,
|
||||
Sysctls: nil,
|
||||
Runtime: "",
|
||||
Isolation: "",
|
||||
Resources: container.Resources{},
|
||||
MaskedPaths: nil,
|
||||
ReadonlyPaths: nil,
|
||||
Init: nil,
|
||||
}
|
||||
var suffix string
|
||||
if newUUID, err := uuid.NewUUID(); err == nil {
|
||||
suffix = strings.ReplaceAll(newUUID.String(), "-", "")[:5]
|
||||
}
|
||||
name := fmt.Sprintf("%s_%s_%s", "kubevpn", "local", suffix)
|
||||
var kubevpnNetwork string
|
||||
kubevpnNetwork, err = createKubevpnNetwork(context.Background(), cli, name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c := &RunConfig{
|
||||
config: runConfig,
|
||||
hostConfig: hostConfig,
|
||||
networkingConfig: &network.NetworkingConfig{
|
||||
EndpointsConfig: map[string]*network.EndpointSettings{name: {
|
||||
NetworkID: kubevpnNetwork,
|
||||
}},
|
||||
},
|
||||
platform: nil,
|
||||
containerName: name,
|
||||
k8sContainerName: name,
|
||||
}
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
handler.AddCleanUpResourceHandler(connect.GetClientset(), connect.Namespace, nil)
|
||||
handler.RollbackFuncList = append(handler.RollbackFuncList, cancel)
|
||||
var id string
|
||||
if id, err = run(ctx, c, cli, dockerCli); err != nil {
|
||||
return err
|
||||
}
|
||||
handler.RollbackFuncList = append(handler.RollbackFuncList, func() {
|
||||
_ = cli.ContainerKill(context.Background(), id, "KILL")
|
||||
})
|
||||
if err = runLogs(dockerCli, id); err != nil {
|
||||
return err
|
||||
}
|
||||
if err = devOptions.NetMode.Set("container:" + id); err != nil {
|
||||
return err
|
||||
}
|
||||
default:
|
||||
return fmt.Errorf("unsupport connect mode: %s", devOptions.ConnectMode)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
handler.Cleanup(os.Kill)
|
||||
select {}
|
||||
}()
|
||||
devOptions.Namespace = connect.Namespace
|
||||
err = devOptions.Main(context.Background())
|
||||
if err != nil {
|
||||
log.Errorln(err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func runLogs(dockerCli command.Cli, container string) error {
|
||||
ctx := context.Background()
|
||||
|
||||
c, err := dockerCli.Client().ContainerInspect(ctx, container)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
options := types.ContainerLogsOptions{
|
||||
ShowStdout: true,
|
||||
ShowStderr: true,
|
||||
Follow: true,
|
||||
}
|
||||
responseBody, err := dockerCli.Client().ContainerLogs(ctx, c.ID, options)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer responseBody.Close()
|
||||
|
||||
buf := bytes.NewBuffer(nil)
|
||||
writer := io.MultiWriter(buf, dockerCli.Out())
|
||||
|
||||
var errChan = make(chan error)
|
||||
var stopChan = make(chan struct{})
|
||||
|
||||
go func() {
|
||||
ticker := time.NewTicker(time.Second)
|
||||
defer ticker.Stop()
|
||||
for range ticker.C {
|
||||
if strings.Contains(buf.String(), "enjoy it") {
|
||||
close(stopChan)
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
go func() {
|
||||
var err error
|
||||
if c.Config.Tty {
|
||||
_, err = io.Copy(writer, responseBody)
|
||||
} else {
|
||||
_, err = stdcopy.StdCopy(writer, dockerCli.Err(), responseBody)
|
||||
}
|
||||
if err != nil {
|
||||
errChan <- err
|
||||
}
|
||||
}()
|
||||
|
||||
select {
|
||||
case err = <-errChan:
|
||||
return err
|
||||
case <-stopChan:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func runKill(dockerCli command.Cli, containers ...string) error {
|
||||
var errs []string
|
||||
ctx := context.Background()
|
||||
errChan := parallelOperation(ctx, append([]string{}, containers...), func(ctx context.Context, container string) error {
|
||||
return dockerCli.Client().ContainerKill(ctx, container, "KILL")
|
||||
})
|
||||
for _, name := range containers {
|
||||
if err := <-errChan; err != nil {
|
||||
errs = append(errs, err.Error())
|
||||
} else {
|
||||
fmt.Fprintln(dockerCli.Out(), name)
|
||||
}
|
||||
}
|
||||
if len(errs) > 0 {
|
||||
return errors.New(strings.Join(errs, "\n"))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func parallelOperation(ctx context.Context, containers []string, op func(ctx context.Context, container string) error) chan error {
|
||||
if len(containers) == 0 {
|
||||
return nil
|
||||
}
|
||||
const defaultParallel int = 50
|
||||
sem := make(chan struct{}, defaultParallel)
|
||||
errChan := make(chan error)
|
||||
|
||||
// make sure result is printed in correct order
|
||||
output := map[string]chan error{}
|
||||
for _, c := range containers {
|
||||
output[c] = make(chan error, 1)
|
||||
}
|
||||
go func() {
|
||||
for _, c := range containers {
|
||||
err := <-output[c]
|
||||
errChan <- err
|
||||
}
|
||||
}()
|
||||
|
||||
go func() {
|
||||
for _, c := range containers {
|
||||
sem <- struct{}{} // Wait for active queue sem to drain.
|
||||
go func(container string) {
|
||||
output[container] <- op(ctx, container)
|
||||
<-sem
|
||||
}(c)
|
||||
}
|
||||
}()
|
||||
return errChan
|
||||
}
|
||||
|
||||
func createKubevpnNetwork(ctx context.Context, cli *client.Client, networkName string) (string, error) {
|
||||
by := map[string]string{"owner": config.ConfigMapPodTrafficManager}
|
||||
create, err := cli.NetworkCreate(ctx, networkName, types.NetworkCreate{
|
||||
Driver: "bridge",
|
||||
Scope: "local",
|
||||
IPAM: &network.IPAM{
|
||||
Driver: "",
|
||||
Options: nil,
|
||||
Config: []network.IPAMConfig{
|
||||
{
|
||||
Subnet: config.DockerCIDR.String(),
|
||||
Gateway: config.DockerRouterIP.String(),
|
||||
},
|
||||
},
|
||||
},
|
||||
//Options: map[string]string{"--icc": "", "--ip-masq": ""},
|
||||
Labels: by,
|
||||
})
|
||||
if err != nil {
|
||||
if errdefs.IsForbidden(err) {
|
||||
list, _ := cli.NetworkList(ctx, types.NetworkListOptions{})
|
||||
for _, resource := range list {
|
||||
if reflect.DeepEqual(resource.Labels, by) {
|
||||
return resource.ID, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return "", err
|
||||
}
|
||||
return create.ID, nil
|
||||
}
|
||||
|
Reference in New Issue
Block a user