refactor: optimize code (#650)

This commit is contained in:
naison
2025-06-18 10:16:30 +08:00
committed by GitHub
parent de9bbcd1b0
commit 9010d05198
199 changed files with 1247 additions and 31867 deletions

View File

@@ -6,7 +6,6 @@ import (
"context"
"errors"
"fmt"
plog "github.com/wencaiwulue/kubevpn/v2/pkg/log"
"io"
"os"
"runtime"
@@ -18,6 +17,8 @@ import (
restclient "k8s.io/client-go/rest"
"k8s.io/kubectl/pkg/cmd/exec"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
plog "github.com/wencaiwulue/kubevpn/v2/pkg/log"
)
// CopyOptions have the data required to perform the copy operation

View File

@@ -4,7 +4,6 @@ import (
"context"
"io"
"os"
"time"
"github.com/golang/protobuf/proto"
"github.com/pkg/errors"
@@ -41,7 +40,7 @@ func (svr *Server) Connect(resp rpc.Daemon_ConnectServer) (err error) {
}
ctx := resp.Context()
if !svr.t.IsZero() {
if svr.connect != nil {
s := "Only support one cluster connect with full mode, you can use options `--lite` to connect to another cluster"
return status.Error(codes.AlreadyExists, s)
}
@@ -51,10 +50,8 @@ func (svr *Server) Connect(resp rpc.Daemon_ConnectServer) (err error) {
svr.connect.Cleanup(plog.WithLogger(context.Background(), logger))
svr.connect = nil
}
svr.t = time.Time{}
}
}()
svr.t = time.Now()
svr.connect = &handler.ConnectOptions{
Namespace: req.ManagerNamespace,
ExtraRouteInfo: *handler.ParseExtraRouteFromRPC(req.ExtraRoute),
@@ -84,7 +81,6 @@ func (svr *Server) Connect(resp rpc.Daemon_ConnectServer) (err error) {
if err != nil {
svr.connect.Cleanup(sshCtx)
svr.connect = nil
svr.t = time.Time{}
sshCancel()
}
}()
@@ -224,7 +220,6 @@ func (svr *Server) redirectToSudoDaemon(req *rpc.ConnectRequest, resp rpc.Daemon
if resp.Context().Err() != nil {
return resp.Context().Err()
}
svr.t = time.Now()
svr.connect = connect
return nil

View File

@@ -4,7 +4,6 @@ import (
"context"
"io"
"os"
"time"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
@@ -72,13 +71,11 @@ func (svr *Server) Disconnect(resp rpc.Daemon_DisconnectServer) (err error) {
}
svr.secondaryConnect = nil
svr.connect = nil
svr.t = time.Time{}
case req.ID != nil && req.GetID() == 0:
if svr.connect != nil {
svr.connect.Cleanup(ctx)
}
svr.connect = nil
svr.t = time.Time{}
if svr.clone != nil {
_ = svr.clone.Cleanup(ctx)
@@ -125,7 +122,6 @@ func (svr *Server) Disconnect(resp rpc.Daemon_DisconnectServer) (err error) {
}
if foundModeFull {
svr.connect = nil
svr.t = time.Time{}
if svr.clone != nil {
_ = svr.clone.Cleanup(ctx)
}
@@ -174,7 +170,6 @@ func disconnect(ctx context.Context, svr *Server, connect *handler.ConnectOption
plog.G(ctx).Infof("Disconnecting from the cluster...")
svr.connect.Cleanup(ctx)
svr.connect = nil
svr.t = time.Time{}
}
}
for i := 0; i < len(svr.secondaryConnect); i++ {

View File

@@ -1,123 +0,0 @@
package action
import (
"context"
"encoding/json"
"errors"
"time"
"k8s.io/apimachinery/pkg/api/meta"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/discovery"
"k8s.io/client-go/informers"
"k8s.io/client-go/metadata"
"k8s.io/client-go/metadata/metadatainformer"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/utils/ptr"
"github.com/wencaiwulue/kubevpn/v2/pkg/daemon/rpc"
)
// Get returns the metadata of every cached object matching the resource named
// in req, each entry JSON-encoded as a *metav1.PartialObjectMetadata.
//
// On first use it lazily discovers the cluster's preferred resources, builds a
// metadata-only shared informer factory covering all of them, starts the
// informers bound to the connection's lifetime, and waits for the caches to
// sync; later calls serve straight from the cache.
//
// Returns an error when no cluster connection is active or when the requested
// resource cannot be resolved.
//
// NOTE(review): the lazy init is not guarded here — presumably svr.Lock or
// gRPC serialization prevents concurrent first calls; confirm against callers.
func (svr *Server) Get(ctx context.Context, req *rpc.GetRequest) (*rpc.GetResponse, error) {
	// The informer factory is tied to the connection context; without a
	// connection there is nothing to serve from.
	if svr.connect == nil || svr.connect.Context() == nil {
		return nil, errors.New("not connected")
	}
	// One-time setup: discovery + metadata informers for every resource.
	if svr.resourceLists == nil {
		restConfig, err := svr.connect.GetFactory().ToRESTConfig()
		if err != nil {
			return nil, err
		}
		// Suppress API deprecation warnings in the daemon log.
		restConfig.WarningHandler = rest.NoWarnings{}
		config, err := discovery.NewDiscoveryClientForConfig(restConfig)
		if err != nil {
			return nil, err
		}
		svr.resourceLists, err = discovery.ServerPreferredResources(config)
		if err != nil {
			return nil, err
		}
		forConfig, err := metadata.NewForConfig(restConfig)
		if err != nil {
			return nil, err
		}
		mapper, err := svr.connect.GetFactory().ToRESTMapper()
		if err != nil {
			return nil, err
		}
		// Metadata-only informers: cheap to cache, resynced every 5s.
		svr.informer = metadatainformer.NewSharedInformerFactory(forConfig, time.Second*5)
		for _, resourceList := range svr.resourceLists {
			for _, resource := range resourceList.APIResources {
				var groupVersion schema.GroupVersion
				groupVersion, err = schema.ParseGroupVersion(resourceList.GroupVersion)
				if err != nil {
					// Unparsable group/version: skip this list entry.
					continue
				}
				var mapping schema.GroupVersionResource
				mapping, err = mapper.ResourceFor(groupVersion.WithResource(resource.Name))
				if err != nil {
					// Resources with no REST mapping are simply not watchable.
					if meta.IsNoMatchError(err) {
						continue
					}
					return nil, err
				}
				// Route watch errors to the daemon log file instead of stderr.
				_ = svr.informer.ForResource(mapping).Informer().SetWatchErrorHandler(func(r *cache.Reflector, err error) {
					_, _ = svr.LogFile.Write([]byte(err.Error()))
				})
			}
		}
		// Informers stop when the connection context is done.
		svr.informer.Start(svr.connect.Context().Done())
		svr.informer.WaitForCacheSync(ctx.Done())
	}
	informer, gvk, err := svr.getInformer(req)
	if err != nil {
		return nil, err
	}
	var result []string
	for _, m := range informer.Informer().GetStore().List() {
		objectMetadata, ok := m.(*v1.PartialObjectMetadata)
		if ok {
			// Deep-copy before mutating: the store owns the cached object.
			deepCopy := objectMetadata.DeepCopy()
			deepCopy.SetGroupVersionKind(*gvk)
			// Managed fields are noise for display purposes.
			deepCopy.ManagedFields = nil
			marshal, err := json.Marshal(deepCopy)
			if err != nil {
				continue
			}
			result = append(result, string(marshal))
		}
	}
	return &rpc.GetResponse{Metadata: result}, nil
}
// getInformer resolves the resource named in req.Resource (matched against
// kind, plural name, singular name, and short names) using the cached
// discovery lists, and returns the matching metadata informer along with the
// resource's GroupVersionKind.
//
// Returns an error when no cached resource matches or the REST mapping fails.
func (svr *Server) getInformer(req *rpc.GetRequest) (informers.GenericInformer, *schema.GroupVersionKind, error) {
	mapper, err := svr.connect.GetFactory().ToRESTMapper()
	if err != nil {
		return nil, nil, err
	}
	for _, resources := range svr.resourceLists {
		for _, resource := range resources.APIResources {
			// Accept any of the names a user might type for this resource.
			have := sets.New[string](resource.Kind, resource.Name, resource.SingularName).Insert(resource.ShortNames...).Has(req.Resource)
			if have {
				var groupVersion schema.GroupVersion
				groupVersion, err = schema.ParseGroupVersion(resources.GroupVersion)
				if err != nil {
					// Unparsable entry: keep searching other lists.
					continue
				}
				var mapping schema.GroupVersionResource
				mapping, err = mapper.ResourceFor(groupVersion.WithResource(resource.Name))
				if err != nil {
					if meta.IsNoMatchError(err) {
						continue
					}
					return nil, nil, err
				}
				return svr.informer.ForResource(mapping), ptr.To(groupVersion.WithKind(resource.Kind)), nil
			}
		}
	}
	return nil, nil, errors.New("ErrResourceNotFound")
}

View File

@@ -8,8 +8,6 @@ import (
"time"
"gopkg.in/natefinch/lumberjack.v2"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/metadata/metadatainformer"
"sigs.k8s.io/yaml"
"github.com/wencaiwulue/kubevpn/v2/pkg/config"
@@ -27,14 +25,10 @@ type Server struct {
LogFile *lumberjack.Logger
Lock sync.Mutex
t time.Time
connect *handler.ConnectOptions
clone *handler.CloneOptions
secondaryConnect []*handler.ConnectOptions
resourceLists []*metav1.APIResourceList
informer metadatainformer.SharedInformerFactory
ID string
}
@@ -84,7 +78,7 @@ func (svr *Server) LoadFromConfig() error {
_ = util.PrintGRPCStream[rpc.ConnectResponse](nil, resp, svr.LogFile)
}
}
for _, c := range append(conf.SecondaryConnect) {
for _, c := range conf.SecondaryConnect {
if c != nil {
var resp rpc.Daemon_ConnectClient
resp, err = client.ConnectFork(context.Background())
@@ -97,9 +91,6 @@ func (svr *Server) LoadFromConfig() error {
_ = util.PrintGRPCStream[rpc.ConnectResponse](nil, resp, svr.LogFile)
}
}
// because connect/connect-fork will also offload to disk,
// but it maybe failed, so we need to write it back to disk
err = os.WriteFile(config.GetDBPath(), content, 0644)
return nil
}

View File

@@ -2,7 +2,6 @@ package action
import (
"io"
"time"
log "github.com/sirupsen/logrus"
@@ -23,7 +22,6 @@ func (svr *Server) Stop(resp rpc.Daemon_QuitServer) error {
}
svr.connect.Cleanup(ctx)
svr.t = time.Time{}
svr.connect = nil
return nil
}

View File

@@ -1,13 +1,17 @@
package action
import (
"context"
"io"
"os"
log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/utils/pointer"
"github.com/wencaiwulue/kubevpn/v2/pkg/config"
"github.com/wencaiwulue/kubevpn/v2/pkg/daemon/rpc"
"github.com/wencaiwulue/kubevpn/v2/pkg/handler"
plog "github.com/wencaiwulue/kubevpn/v2/pkg/log"
"github.com/wencaiwulue/kubevpn/v2/pkg/ssh"
"github.com/wencaiwulue/kubevpn/v2/pkg/util"
@@ -20,11 +24,6 @@ func (svr *Server) Uninstall(resp rpc.Daemon_UninstallServer) (err error) {
}
logger := plog.GetLoggerForClient(int32(log.InfoLevel), io.MultiWriter(newUninstallWarp(resp), svr.LogFile))
connect := &handler.ConnectOptions{
Namespace: req.Namespace,
Lock: &svr.Lock,
}
file, err := util.ConvertToTempKubeconfigFile([]byte(req.KubeconfigBytes))
if err != nil {
return err
@@ -38,17 +37,53 @@ func (svr *Server) Uninstall(resp rpc.Daemon_UninstallServer) (err error) {
return err
}
}
err = connect.InitClient(util.InitFactoryByPath(file, req.Namespace))
factory := util.InitFactoryByPath(file, req.Namespace)
clientset, err := factory.KubernetesClientSet()
if err != nil {
return err
}
err = connect.Uninstall(ctx)
err = Uninstall(ctx, clientset, req.Namespace)
if err != nil {
return err
}
return nil
}
// Uninstall removes every cluster-side resource created by kubevpn in the
// given namespace, then cleans up the local docker network. Steps:
// 1) quit daemon
// 2) get all proxy-resources from configmap
// 3) cleanup all containers
// 4) cleanup hosts
//
// All deletions are deliberately best-effort: each error is discarded so that
// one missing resource does not abort the rest of the teardown. The function
// therefore always returns nil under the current implementation.
func Uninstall(ctx context.Context, clientset *kubernetes.Clientset, ns string) error {
	plog.G(ctx).Infof("Cleaning up resources")
	name := config.ConfigMapPodTrafficManager
	// GracePeriodSeconds=0 forces immediate deletion.
	options := metav1.DeleteOptions{GracePeriodSeconds: pointer.Int64(0)}
	_ = clientset.CoreV1().ConfigMaps(ns).Delete(ctx, name, options)
	_ = clientset.CoreV1().Pods(ns).Delete(ctx, config.CniNetName, options)
	_ = clientset.CoreV1().Secrets(ns).Delete(ctx, name, options)
	// Webhook configuration is cluster-scoped, so its name embeds the namespace.
	_ = clientset.AdmissionregistrationV1().MutatingWebhookConfigurations().Delete(ctx, name+"."+ns, options)
	_ = clientset.RbacV1().RoleBindings(ns).Delete(ctx, name, options)
	_ = clientset.CoreV1().ServiceAccounts(ns).Delete(ctx, name, options)
	_ = clientset.RbacV1().Roles(ns).Delete(ctx, name, options)
	_ = clientset.CoreV1().Services(ns).Delete(ctx, name, options)
	_ = clientset.AppsV1().Deployments(ns).Delete(ctx, name, options)
	_ = clientset.BatchV1().Jobs(ns).Delete(ctx, name, options)
	// Also best-effort: remove the local docker network if it is unused.
	_ = CleanupLocalContainer(ctx)
	plog.G(ctx).Info("Done")
	return nil
}
func CleanupLocalContainer(ctx context.Context) error {
inspect, err := util.NetworkInspect(ctx, config.ConfigMapPodTrafficManager)
if err != nil {
return err
}
if len(inspect.Containers) == 0 {
err = util.NetworkRemove(ctx, config.ConfigMapPodTrafficManager)
}
return err
}
type uninstallWarp struct {
server rpc.Daemon_UninstallServer
}

File diff suppressed because it is too large Load Diff

View File

@@ -21,7 +21,6 @@ service Daemon {
rpc Logs (stream LogRequest) returns (stream LogResponse) {}
rpc List (ListRequest) returns (ListResponse) {}
rpc Get (GetRequest) returns (GetResponse) {}
rpc Upgrade (UpgradeRequest) returns (UpgradeResponse) {}
rpc Status (StatusRequest) returns (StatusResponse) {}
rpc Version (VersionRequest) returns (VersionResponse) {}
@@ -88,7 +87,7 @@ message DisconnectRequest {
}
message DisconnectResponse {
string message = 1;
string Message = 1;
}
@@ -129,7 +128,7 @@ message ProxyRequest {
}
message ProxyResponse {
string message = 1;
string Message = 1;
}
message LeaveRequest {
@@ -138,7 +137,7 @@ message LeaveRequest {
}
message LeaveResponse {
string message = 1;
string Message = 1;
}
message CloneRequest {
@@ -173,7 +172,7 @@ message CloneRequest {
}
message CloneResponse {
string message = 1;
string Message = 1;
}
message RemoveRequest {
@@ -181,14 +180,14 @@ message RemoveRequest {
}
message RemoveResponse {
string message = 1;
string Message = 1;
}
message QuitRequest {
}
message QuitResponse {
string message = 1;
string Message = 1;
}
message StatusRequest {
@@ -252,7 +251,7 @@ message VersionRequest {
}
message VersionResponse {
string version = 1;
string Version = 1;
}
message ConfigAddRequest {
@@ -305,23 +304,14 @@ message LogRequest {
}
message LogResponse {
string message = 1;
string Message = 1;
}
message ListRequest {
}
message ListResponse {
string message = 1;
}
message GetRequest {
string Namespace = 2;
string resource = 4;
}
message GetResponse {
repeated string metadata = 1;
string Message = 1;
}
message UpgradeRequest {
@@ -340,7 +330,7 @@ message UninstallRequest {
}
message UninstallResponse {
string message = 1;
string Message = 1;
}
message ResetRequest {
@@ -352,7 +342,7 @@ message ResetRequest {
}
message ResetResponse {
string message = 1;
string Message = 1;
}
message SshJump {

File diff suppressed because it is too large Load Diff

View File

@@ -1,23 +1,9 @@
package dev
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"os/exec"
"strings"
"time"
"github.com/docker/docker/api/types"
"github.com/docker/docker/pkg/stdcopy"
corev1 "k8s.io/api/core/v1"
"github.com/wencaiwulue/kubevpn/v2/pkg/config"
plog "github.com/wencaiwulue/kubevpn/v2/pkg/log"
)
// Pull constants
@@ -38,91 +24,8 @@ func ConvertK8sImagePullPolicyToDocker(policy corev1.PullPolicy) string {
}
}
func RunLogsWaitRunning(ctx context.Context, name string) error {
buf := bytes.NewBuffer(nil)
w := io.MultiWriter(buf, os.Stdout)
args := []string{"logs", name, "--since", "0m", "--details", "--follow"}
cmd := exec.Command("docker", args...)
cmd.Stdout = w
go cmd.Start()
cancel, cancelFunc := context.WithCancel(ctx)
defer cancelFunc()
go func() {
t := time.NewTicker(time.Second)
defer t.Stop()
for range t.C {
// keyword, maybe can find another way more elegant
if strings.Contains(buf.String(), config.Slogan) {
cancelFunc()
return
}
}
}()
var errChan = make(chan error)
go func() {
var err error
_, err = stdcopy.StdCopy(w, os.Stdout, buf)
if err != nil {
errChan <- err
}
}()
select {
case err := <-errChan:
return err
case <-cancel.Done():
return nil
}
}
// RunLogsSinceNow fetches the container's recent docker logs (optionally
// following them) and demultiplexes the captured stream onto this process's
// stdout/stderr.
//
// Fix: the previous revision overwrote the CombinedOutput error with the
// stdcopy error without ever checking it, silently hiding docker failures.
func RunLogsSinceNow(name string, follow bool) error {
	args := []string{"logs", name, "--since", "0m", "--details"}
	if follow {
		args = append(args, "--follow")
	}
	output, err := exec.Command("docker", args...).CombinedOutput()
	if err != nil {
		// Surface the docker invocation failure instead of dropping it.
		return err
	}
	// docker log output is a multiplexed stream; split stdout/stderr frames.
	_, err = stdcopy.StdCopy(os.Stdout, os.Stderr, bytes.NewReader(output))
	return err
}
// CreateNetwork ensures the named docker bridge network exists, equivalent to:
//
//	docker network create <name> --label owner=<name> --subnet 198.18.0.0/16 --gateway 198.18.0.100
//
// If the network already exists (inspect succeeds) it returns the name; on
// creation it returns docker's raw output (the new network ID, which may carry
// a trailing newline).
func CreateNetwork(ctx context.Context, name string) (string, error) {
	args := []string{
		"network",
		"inspect",
		name,
	}
	// A successful inspect means the network already exists — nothing to do.
	_, err := exec.CommandContext(ctx, "docker", args...).CombinedOutput()
	if err == nil {
		return name, nil
	}
	args = []string{
		"network",
		"create",
		name,
		"--label", "owner=" + name,
		"--subnet", config.DockerCIDR.String(),
		"--gateway", config.DockerRouterIP.String(),
		"--driver", "bridge",
		"--scope", "local",
	}
	id, err := exec.CommandContext(ctx, "docker", args...).CombinedOutput()
	if err != nil {
		return "", err
	}
	return string(id), nil
}
func RunContainer(ctx context.Context, runConfig *RunConfig) error {
var result []string
func convertToDockerArgs(runConfig *RunConfig) []string {
var result = []string{"docker"}
result = append(result, "run")
result = append(result, runConfig.options...)
if len(runConfig.command) != 0 {
@@ -130,116 +33,5 @@ func RunContainer(ctx context.Context, runConfig *RunConfig) error {
}
result = append(result, runConfig.image)
result = append(result, runConfig.args...)
cmd := exec.CommandContext(ctx, "docker", result...)
cmd.Stdin = os.Stdin
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
plog.G(ctx).Debugf("Run container with cmd: %v", cmd.Args)
err := cmd.Start()
if err != nil {
plog.G(ctx).Errorf("Failed to run container with cmd: %v: %v", cmd.Args, err)
return err
}
return cmd.Wait()
}
// WaitDockerContainerRunning polls `docker inspect` once per second until the
// named container reports a running state, it terminates, or ctx ends.
//
// Fixes over the previous revision:
//   - the "container exited/dead" error was assigned and then silently
//     dropped by a bare `return nil` after the loop; it is now returned.
//   - context expiry fell through to a success log and nil; it now returns
//     ctx.Err().
func WaitDockerContainerRunning(ctx context.Context, name string) error {
	plog.G(ctx).Infof("Wait container %s to be running...", name)
	for {
		if err := ctx.Err(); err != nil {
			return err
		}
		time.Sleep(time.Second * 1)
		inspect, err := ContainerInspect(ctx, name)
		if err != nil {
			return err
		}
		// A terminated container will never become running — fail fast.
		if inspect.State != nil && (inspect.State.Status == "exited" || inspect.State.Status == "dead" || inspect.State.Dead) {
			return fmt.Errorf("container status: %s", inspect.State.Status)
		}
		if inspect.State != nil && inspect.State.Running {
			break
		}
	}
	plog.G(ctx).Infof("Container %s is running now", name)
	return nil
}
// ContainerInspect runs `docker inspect <name>` and decodes the first result.
// On command failure it dumps the container's recent logs to aid debugging.
//
// Fix: when docker returned an empty inspect array the previous revision
// returned a zero-value ContainerJSON with a NIL error, so callers could not
// tell success from "no such container"; it now returns an explicit error.
func ContainerInspect(ctx context.Context, name string) (types.ContainerJSON, error) {
	output, err := exec.CommandContext(ctx, "docker", "inspect", name).CombinedOutput()
	if err != nil {
		plog.G(ctx).Errorf("Failed to wait container to be ready output: %s: %v", string(output), err)
		// Best-effort: show recent logs for diagnosis.
		_ = RunLogsSinceNow(name, false)
		return types.ContainerJSON{}, err
	}
	var inspect []types.ContainerJSON
	if err = json.NewDecoder(bytes.NewReader(output)).Decode(&inspect); err != nil {
		return types.ContainerJSON{}, err
	}
	if len(inspect) == 0 {
		return types.ContainerJSON{}, fmt.Errorf("container %s not found", name)
	}
	return inspect[0], nil
}
func NetworkInspect(ctx context.Context, name string) (types.NetworkResource, error) {
//var cli *client.Client
//var dockerCli *command.DockerCli
//cli.NetworkInspect()
output, err := exec.CommandContext(ctx, "docker", "network", "inspect", name).CombinedOutput()
if err != nil {
plog.G(ctx).Errorf("Failed to wait container to be ready: %v", err)
_ = RunLogsSinceNow(name, false)
return types.NetworkResource{}, err
}
var inspect []types.NetworkResource
rdr := bytes.NewReader(output)
err = json.NewDecoder(rdr).Decode(&inspect)
if err != nil {
return types.NetworkResource{}, err
}
if len(inspect) == 0 {
return types.NetworkResource{}, err
}
return inspect[0], nil
}
// NetworkRemove deletes the named docker network. Removal is idempotent: a
// "not found" response from docker is treated as success.
func NetworkRemove(ctx context.Context, name string) error {
	out, err := exec.CommandContext(ctx, "docker", "network", "remove", name).CombinedOutput()
	if err == nil {
		return nil
	}
	if strings.Contains(string(out), "not found") {
		// Already gone — nothing to do.
		return nil
	}
	return err
}
// NetworkDisconnect forcefully detaches the given container from the kubevpn
// traffic-manager network, equivalent to:
//
//	docker network disconnect --force <network> <container>
//
// A "not found" response from docker is treated as success; the raw command
// output is returned in all cases.
func NetworkDisconnect(ctx context.Context, containerName string) ([]byte, error) {
	cmd := exec.CommandContext(ctx, "docker", "network", "disconnect", "--force", config.ConfigMapPodTrafficManager, containerName)
	out, err := cmd.CombinedOutput()
	if err != nil && strings.Contains(string(out), "not found") {
		// Container or network already gone — treat as success.
		err = nil
	}
	return out, err
}
// ContainerRemove force-removes the named container, equivalent to:
//
//	docker remove --force <container>
//
// A "not found" response from docker is treated as success; the raw command
// output is returned in all cases.
func ContainerRemove(ctx context.Context, containerName string) ([]byte, error) {
	cmd := exec.CommandContext(ctx, "docker", "remove", "--force", containerName)
	out, err := cmd.CombinedOutput()
	if err != nil && strings.Contains(string(out), "not found") {
		// Already removed — treat as success.
		err = nil
	}
	return out, err
}
func ContainerKill(ctx context.Context, name *string) ([]byte, error) {
output, err := exec.CommandContext(ctx, "docker", "kill", *name, "--signal", "SIGTERM").CombinedOutput()
if err != nil && strings.Contains(string(output), "not found") {
return output, nil
}
return output, err
return result
}

View File

@@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"os"
"os/exec"
"strconv"
"strings"
@@ -158,17 +157,17 @@ func (option *Options) Connect(ctx context.Context, sshConfig *pkgssh.SshConfig,
return err
}
plog.G(ctx).Infof("Starting connect to cluster in container")
err = WaitDockerContainerRunning(ctx, *name)
err = util.WaitDockerContainerRunning(ctx, *name)
if err != nil {
return err
}
option.AddRollbackFunc(func() error {
// docker kill --signal
_, _ = ContainerKill(context.Background(), name)
_ = RunLogsSinceNow(*name, true)
_, _ = util.ContainerKill(context.Background(), name)
_ = util.RunLogsSinceNow(*name, true)
return nil
})
err = RunLogsWaitRunning(ctx, *name)
err = util.RunLogsWaitRunning(ctx, *name)
if err != nil {
// interrupt by signal KILL
if errors.Is(err, context.Canceled) {
@@ -291,7 +290,7 @@ func (option *Options) CreateConnectContainer(ctx context.Context, portBindings
suffix := strings.ReplaceAll(uuid.New().String(), "-", "")[:5]
name := util.Join(option.Namespace, "kubevpn", suffix)
_, err = CreateNetwork(ctx, config.ConfigMapPodTrafficManager)
_, err = util.CreateNetwork(ctx, config.ConfigMapPodTrafficManager)
if err != nil {
return nil, err
}
@@ -318,22 +317,17 @@ func (option *Options) CreateConnectContainer(ctx context.Context, portBindings
args = append(args, "--publish", fmt.Sprintf("%s:%s", port.Port(), bindings[0].HostPort))
}
var result []string
var result = []string{"docker"}
result = append(result, args...)
result = append(result, config.Image)
result = append(result, entrypoint...)
err = ContainerRun(ctx, result...)
err = util.RunContainer(ctx, result)
if err != nil {
return nil, err
}
return &name, nil
}
func ContainerRun(ctx context.Context, args ...string) error {
err := exec.CommandContext(ctx, "docker", args...).Run()
return err
}
func (option *Options) AddRollbackFunc(f func() error) {
option.rollbackFuncList = append(option.rollbackFuncList, f)
}

View File

@@ -40,23 +40,23 @@ type ConfigList []*RunConfig
func (l ConfigList) Remove(ctx context.Context, userAnotherContainerNet bool) error {
for index, runConfig := range l {
if !userAnotherContainerNet && index == len(l)-1 {
output, err := NetworkDisconnect(ctx, runConfig.name)
output, err := util.NetworkDisconnect(ctx, runConfig.name)
if err != nil {
plog.G(ctx).Warnf("Failed to disconnect container network: %s: %v", string(output), err)
}
}
output, err := ContainerRemove(ctx, runConfig.name)
output, err := util.ContainerRemove(ctx, runConfig.name)
if err != nil {
plog.G(ctx).Warnf("Failed to remove container: %s: %v", string(output), err)
}
}
name := config.ConfigMapPodTrafficManager
inspect, err := NetworkInspect(ctx, name)
inspect, err := util.NetworkInspect(ctx, name)
if err != nil {
return err
}
if len(inspect.Containers) == 0 {
return NetworkRemove(ctx, name)
return util.NetworkRemove(ctx, name)
}
return nil
}
@@ -65,13 +65,13 @@ func (l ConfigList) Run(ctx context.Context) error {
for index := len(l) - 1; index >= 0; index-- {
conf := l[index]
err := RunContainer(ctx, conf)
err := util.RunContainer(ctx, convertToDockerArgs(conf))
if err != nil {
return err
}
if index != 0 {
err = WaitDockerContainerRunning(ctx, conf.name)
err = util.WaitDockerContainerRunning(ctx, conf.name)
if err != nil {
return err
}
@@ -190,7 +190,7 @@ func (option *Options) ConvertPodToContainerConfigList(
if hostConfig.PublishAllPorts {
options = append(options, "--publish-all")
}
_, err = CreateNetwork(ctx, config.ConfigMapPodTrafficManager)
_, err = util.CreateNetwork(ctx, config.ConfigMapPodTrafficManager)
if err != nil {
plog.G(ctx).Errorf("Failed to create network: %v", err)
return nil, err

View File

@@ -1,392 +0,0 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.33.0
// protoc v5.29.3
// source: dhcpserver.proto
package rpc
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type RentIPRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
PodName string `protobuf:"bytes,1,opt,name=PodName,proto3" json:"PodName,omitempty"`
PodNamespace string `protobuf:"bytes,2,opt,name=PodNamespace,proto3" json:"PodNamespace,omitempty"`
}
func (x *RentIPRequest) Reset() {
*x = RentIPRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_dhcpserver_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *RentIPRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*RentIPRequest) ProtoMessage() {}
func (x *RentIPRequest) ProtoReflect() protoreflect.Message {
mi := &file_dhcpserver_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use RentIPRequest.ProtoReflect.Descriptor instead.
func (*RentIPRequest) Descriptor() ([]byte, []int) {
return file_dhcpserver_proto_rawDescGZIP(), []int{0}
}
func (x *RentIPRequest) GetPodName() string {
if x != nil {
return x.PodName
}
return ""
}
func (x *RentIPRequest) GetPodNamespace() string {
if x != nil {
return x.PodNamespace
}
return ""
}
type RentIPResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
IPv4CIDR string `protobuf:"bytes,1,opt,name=IPv4CIDR,proto3" json:"IPv4CIDR,omitempty"`
IPv6CIDR string `protobuf:"bytes,2,opt,name=IPv6CIDR,proto3" json:"IPv6CIDR,omitempty"`
}
func (x *RentIPResponse) Reset() {
*x = RentIPResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_dhcpserver_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *RentIPResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*RentIPResponse) ProtoMessage() {}
func (x *RentIPResponse) ProtoReflect() protoreflect.Message {
mi := &file_dhcpserver_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use RentIPResponse.ProtoReflect.Descriptor instead.
func (*RentIPResponse) Descriptor() ([]byte, []int) {
return file_dhcpserver_proto_rawDescGZIP(), []int{1}
}
func (x *RentIPResponse) GetIPv4CIDR() string {
if x != nil {
return x.IPv4CIDR
}
return ""
}
func (x *RentIPResponse) GetIPv6CIDR() string {
if x != nil {
return x.IPv6CIDR
}
return ""
}
type ReleaseIPRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
PodName string `protobuf:"bytes,1,opt,name=PodName,proto3" json:"PodName,omitempty"`
PodNamespace string `protobuf:"bytes,2,opt,name=PodNamespace,proto3" json:"PodNamespace,omitempty"`
IPv4CIDR string `protobuf:"bytes,3,opt,name=IPv4CIDR,proto3" json:"IPv4CIDR,omitempty"`
IPv6CIDR string `protobuf:"bytes,4,opt,name=IPv6CIDR,proto3" json:"IPv6CIDR,omitempty"`
}
func (x *ReleaseIPRequest) Reset() {
*x = ReleaseIPRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_dhcpserver_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *ReleaseIPRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ReleaseIPRequest) ProtoMessage() {}
func (x *ReleaseIPRequest) ProtoReflect() protoreflect.Message {
mi := &file_dhcpserver_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ReleaseIPRequest.ProtoReflect.Descriptor instead.
func (*ReleaseIPRequest) Descriptor() ([]byte, []int) {
return file_dhcpserver_proto_rawDescGZIP(), []int{2}
}
func (x *ReleaseIPRequest) GetPodName() string {
if x != nil {
return x.PodName
}
return ""
}
func (x *ReleaseIPRequest) GetPodNamespace() string {
if x != nil {
return x.PodNamespace
}
return ""
}
func (x *ReleaseIPRequest) GetIPv4CIDR() string {
if x != nil {
return x.IPv4CIDR
}
return ""
}
func (x *ReleaseIPRequest) GetIPv6CIDR() string {
if x != nil {
return x.IPv6CIDR
}
return ""
}
type ReleaseIPResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Message string `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"`
}
func (x *ReleaseIPResponse) Reset() {
*x = ReleaseIPResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_dhcpserver_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *ReleaseIPResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ReleaseIPResponse) ProtoMessage() {}
func (x *ReleaseIPResponse) ProtoReflect() protoreflect.Message {
mi := &file_dhcpserver_proto_msgTypes[3]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ReleaseIPResponse.ProtoReflect.Descriptor instead.
func (*ReleaseIPResponse) Descriptor() ([]byte, []int) {
return file_dhcpserver_proto_rawDescGZIP(), []int{3}
}
func (x *ReleaseIPResponse) GetMessage() string {
if x != nil {
return x.Message
}
return ""
}
var File_dhcpserver_proto protoreflect.FileDescriptor
var file_dhcpserver_proto_rawDesc = []byte{
0x0a, 0x10, 0x64, 0x68, 0x63, 0x70, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2e, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x12, 0x03, 0x72, 0x70, 0x63, 0x22, 0x4d, 0x0a, 0x0d, 0x52, 0x65, 0x6e, 0x74, 0x49,
0x50, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x50, 0x6f, 0x64, 0x4e,
0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x50, 0x6f, 0x64, 0x4e, 0x61,
0x6d, 0x65, 0x12, 0x22, 0x0a, 0x0c, 0x50, 0x6f, 0x64, 0x4e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61,
0x63, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x50, 0x6f, 0x64, 0x4e, 0x61, 0x6d,
0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x22, 0x48, 0x0a, 0x0e, 0x52, 0x65, 0x6e, 0x74, 0x49, 0x50,
0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x49, 0x50, 0x76, 0x34,
0x43, 0x49, 0x44, 0x52, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x49, 0x50, 0x76, 0x34,
0x43, 0x49, 0x44, 0x52, 0x12, 0x1a, 0x0a, 0x08, 0x49, 0x50, 0x76, 0x36, 0x43, 0x49, 0x44, 0x52,
0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x49, 0x50, 0x76, 0x36, 0x43, 0x49, 0x44, 0x52,
0x22, 0x88, 0x01, 0x0a, 0x10, 0x52, 0x65, 0x6c, 0x65, 0x61, 0x73, 0x65, 0x49, 0x50, 0x52, 0x65,
0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x50, 0x6f, 0x64, 0x4e, 0x61, 0x6d, 0x65,
0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x50, 0x6f, 0x64, 0x4e, 0x61, 0x6d, 0x65, 0x12,
0x22, 0x0a, 0x0c, 0x50, 0x6f, 0x64, 0x4e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x18,
0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x50, 0x6f, 0x64, 0x4e, 0x61, 0x6d, 0x65, 0x73, 0x70,
0x61, 0x63, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x49, 0x50, 0x76, 0x34, 0x43, 0x49, 0x44, 0x52, 0x18,
0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x49, 0x50, 0x76, 0x34, 0x43, 0x49, 0x44, 0x52, 0x12,
0x1a, 0x0a, 0x08, 0x49, 0x50, 0x76, 0x36, 0x43, 0x49, 0x44, 0x52, 0x18, 0x04, 0x20, 0x01, 0x28,
0x09, 0x52, 0x08, 0x49, 0x50, 0x76, 0x36, 0x43, 0x49, 0x44, 0x52, 0x22, 0x2d, 0x0a, 0x11, 0x52,
0x65, 0x6c, 0x65, 0x61, 0x73, 0x65, 0x49, 0x50, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65,
0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28,
0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x32, 0x79, 0x0a, 0x04, 0x44, 0x48,
0x43, 0x50, 0x12, 0x33, 0x0a, 0x06, 0x52, 0x65, 0x6e, 0x74, 0x49, 0x50, 0x12, 0x12, 0x2e, 0x72,
0x70, 0x63, 0x2e, 0x52, 0x65, 0x6e, 0x74, 0x49, 0x50, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
0x1a, 0x13, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x6e, 0x74, 0x49, 0x50, 0x52, 0x65, 0x73,
0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x3c, 0x0a, 0x09, 0x52, 0x65, 0x6c, 0x65, 0x61,
0x73, 0x65, 0x49, 0x50, 0x12, 0x15, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x6c, 0x65, 0x61,
0x73, 0x65, 0x49, 0x50, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x72, 0x70,
0x63, 0x2e, 0x52, 0x65, 0x6c, 0x65, 0x61, 0x73, 0x65, 0x49, 0x50, 0x52, 0x65, 0x73, 0x70, 0x6f,
0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x07, 0x5a, 0x05, 0x2e, 0x3b, 0x72, 0x70, 0x63, 0x62, 0x06,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_dhcpserver_proto_rawDescOnce sync.Once
file_dhcpserver_proto_rawDescData = file_dhcpserver_proto_rawDesc
)
func file_dhcpserver_proto_rawDescGZIP() []byte {
file_dhcpserver_proto_rawDescOnce.Do(func() {
file_dhcpserver_proto_rawDescData = protoimpl.X.CompressGZIP(file_dhcpserver_proto_rawDescData)
})
return file_dhcpserver_proto_rawDescData
}
var file_dhcpserver_proto_msgTypes = make([]protoimpl.MessageInfo, 4)
var file_dhcpserver_proto_goTypes = []interface{}{
(*RentIPRequest)(nil), // 0: rpc.RentIPRequest
(*RentIPResponse)(nil), // 1: rpc.RentIPResponse
(*ReleaseIPRequest)(nil), // 2: rpc.ReleaseIPRequest
(*ReleaseIPResponse)(nil), // 3: rpc.ReleaseIPResponse
}
var file_dhcpserver_proto_depIdxs = []int32{
0, // 0: rpc.DHCP.RentIP:input_type -> rpc.RentIPRequest
2, // 1: rpc.DHCP.ReleaseIP:input_type -> rpc.ReleaseIPRequest
1, // 2: rpc.DHCP.RentIP:output_type -> rpc.RentIPResponse
3, // 3: rpc.DHCP.ReleaseIP:output_type -> rpc.ReleaseIPResponse
2, // [2:4] is the sub-list for method output_type
0, // [0:2] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
}
func init() { file_dhcpserver_proto_init() }
func file_dhcpserver_proto_init() {
if File_dhcpserver_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_dhcpserver_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*RentIPRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_dhcpserver_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*RentIPResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_dhcpserver_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ReleaseIPRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_dhcpserver_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ReleaseIPResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_dhcpserver_proto_rawDesc,
NumEnums: 0,
NumMessages: 4,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_dhcpserver_proto_goTypes,
DependencyIndexes: file_dhcpserver_proto_depIdxs,
MessageInfos: file_dhcpserver_proto_msgTypes,
}.Build()
File_dhcpserver_proto = out.File
file_dhcpserver_proto_rawDesc = nil
file_dhcpserver_proto_goTypes = nil
file_dhcpserver_proto_depIdxs = nil
}

View File

@@ -1,30 +0,0 @@
syntax = "proto3";
option go_package = ".;rpc";
package rpc;
service DHCP {
rpc RentIP (RentIPRequest) returns (RentIPResponse) {}
rpc ReleaseIP (ReleaseIPRequest) returns (ReleaseIPResponse) {}
}
message RentIPRequest {
string PodName = 1;
string PodNamespace = 2;
}
message RentIPResponse {
string IPv4CIDR = 1;
string IPv6CIDR = 2;
}
message ReleaseIPRequest {
string PodName = 1;
string PodNamespace = 2;
string IPv4CIDR = 3;
string IPv6CIDR = 4;
}
message ReleaseIPResponse {
string message = 1;
}

View File

@@ -1,146 +0,0 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.3.0
// - protoc v5.29.3
// source: dhcpserver.proto
package rpc
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
const (
DHCP_RentIP_FullMethodName = "/rpc.DHCP/RentIP"
DHCP_ReleaseIP_FullMethodName = "/rpc.DHCP/ReleaseIP"
)
// DHCPClient is the client API for DHCP service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type DHCPClient interface {
RentIP(ctx context.Context, in *RentIPRequest, opts ...grpc.CallOption) (*RentIPResponse, error)
ReleaseIP(ctx context.Context, in *ReleaseIPRequest, opts ...grpc.CallOption) (*ReleaseIPResponse, error)
}
type dHCPClient struct {
cc grpc.ClientConnInterface
}
func NewDHCPClient(cc grpc.ClientConnInterface) DHCPClient {
return &dHCPClient{cc}
}
func (c *dHCPClient) RentIP(ctx context.Context, in *RentIPRequest, opts ...grpc.CallOption) (*RentIPResponse, error) {
out := new(RentIPResponse)
err := c.cc.Invoke(ctx, DHCP_RentIP_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *dHCPClient) ReleaseIP(ctx context.Context, in *ReleaseIPRequest, opts ...grpc.CallOption) (*ReleaseIPResponse, error) {
out := new(ReleaseIPResponse)
err := c.cc.Invoke(ctx, DHCP_ReleaseIP_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// DHCPServer is the server API for DHCP service.
// All implementations must embed UnimplementedDHCPServer
// for forward compatibility
type DHCPServer interface {
RentIP(context.Context, *RentIPRequest) (*RentIPResponse, error)
ReleaseIP(context.Context, *ReleaseIPRequest) (*ReleaseIPResponse, error)
mustEmbedUnimplementedDHCPServer()
}
// UnimplementedDHCPServer must be embedded to have forward compatible implementations.
type UnimplementedDHCPServer struct {
}
func (UnimplementedDHCPServer) RentIP(context.Context, *RentIPRequest) (*RentIPResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method RentIP not implemented")
}
func (UnimplementedDHCPServer) ReleaseIP(context.Context, *ReleaseIPRequest) (*ReleaseIPResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method ReleaseIP not implemented")
}
func (UnimplementedDHCPServer) mustEmbedUnimplementedDHCPServer() {}
// UnsafeDHCPServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to DHCPServer will
// result in compilation errors.
type UnsafeDHCPServer interface {
mustEmbedUnimplementedDHCPServer()
}
func RegisterDHCPServer(s grpc.ServiceRegistrar, srv DHCPServer) {
s.RegisterService(&DHCP_ServiceDesc, srv)
}
func _DHCP_RentIP_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(RentIPRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(DHCPServer).RentIP(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: DHCP_RentIP_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(DHCPServer).RentIP(ctx, req.(*RentIPRequest))
}
return interceptor(ctx, in, info, handler)
}
func _DHCP_ReleaseIP_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ReleaseIPRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(DHCPServer).ReleaseIP(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: DHCP_ReleaseIP_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(DHCPServer).ReleaseIP(ctx, req.(*ReleaseIPRequest))
}
return interceptor(ctx, in, info, handler)
}
// DHCP_ServiceDesc is the grpc.ServiceDesc for DHCP service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var DHCP_ServiceDesc = grpc.ServiceDesc{
ServiceName: "rpc.DHCP",
HandlerType: (*DHCPServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "RentIP",
Handler: _DHCP_RentIP_Handler,
},
{
MethodName: "ReleaseIP",
Handler: _DHCP_ReleaseIP_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "dhcpserver.proto",
}

View File

@@ -1,3 +0,0 @@
package rpc
//go:generate protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative dhcpserver.proto

View File

@@ -1,68 +0,0 @@
package dhcp
import (
"context"
"net"
"sync"
"k8s.io/client-go/kubernetes"
"github.com/wencaiwulue/kubevpn/v2/pkg/dhcp/rpc"
plog "github.com/wencaiwulue/kubevpn/v2/pkg/log"
)
// Server implements the DHCP gRPC service (rpc.DHCPServer).
// The embedded mutex serializes RentIP/ReleaseIP so concurrent requests
// never race on the shared IP-allocation state kept in the cluster.
type Server struct {
	rpc.UnimplementedDHCPServer
	sync.Mutex
	// clientset is used to build a per-namespace DHCPManager for each request.
	clientset *kubernetes.Clientset
}
// NewServer returns a DHCP gRPC server backed by the given Kubernetes clientset.
func NewServer(clientset *kubernetes.Clientset) *Server {
	srv := &Server{}
	srv.clientset = clientset
	return srv
}
// RentIP leases one IPv4 and one IPv6 CIDR for the pod named in the request.
// Calls are serialized by the server mutex so allocations never overlap.
func (s *Server) RentIP(ctx context.Context, req *rpc.RentIPRequest) (*rpc.RentIPResponse, error) {
	s.Lock()
	defer s.Unlock()

	plog.G(ctx).Infof("Handling rent IP request, pod name: %s, ns: %s", req.PodName, req.PodNamespace)
	v4, v6, err := NewDHCPManager(s.clientset, req.PodNamespace).RentIP(ctx)
	if err != nil {
		plog.G(ctx).Errorf("Failed to rent IP: %v", err)
		return nil, err
	}
	// todo patch annotation
	return &rpc.RentIPResponse{
		IPv4CIDR: v4.String(),
		IPv6CIDR: v6.String(),
	}, nil
}
// ReleaseIP returns the IPv4/IPv6 CIDRs leased to a pod back to the pool.
// Parse failures are logged but not fatal: the release proceeds with a nil
// IP so one malformed field does not prevent freeing the other address.
// Calls are serialized by the server mutex.
func (s *Server) ReleaseIP(ctx context.Context, req *rpc.ReleaseIPRequest) (*rpc.ReleaseIPResponse, error) {
	s.Lock()
	defer s.Unlock()
	plog.G(ctx).Infof("Handling release IP request, pod name: %s, ns: %s, IPv4: %s, IPv6: %s", req.PodName, req.PodNamespace, req.IPv4CIDR, req.IPv6CIDR)
	ipv4, _, err := net.ParseCIDR(req.IPv4CIDR)
	if err != nil {
		// fix: log message typo "invailed" -> "invalid"
		plog.G(ctx).Errorf("IP %s is invalid: %v", req.IPv4CIDR, err)
	}
	ipv6, _, err := net.ParseCIDR(req.IPv6CIDR)
	if err != nil {
		plog.G(ctx).Errorf("IP %s is invalid: %v", req.IPv6CIDR, err)
	}
	manager := NewDHCPManager(s.clientset, req.PodNamespace)
	if err = manager.ReleaseIP(ctx, ipv4, ipv6); err != nil {
		plog.G(ctx).Errorf("Failed to release IP: %v", err)
		return nil, err
	}
	return &rpc.ReleaseIPResponse{}, nil
}

View File

@@ -1,51 +0,0 @@
package dhcp
import (
"encoding/base64"
"net"
"testing"
"github.com/cilium/ipam/service/allocator"
"github.com/cilium/ipam/service/ipallocator"
"github.com/wencaiwulue/kubevpn/v2/pkg/config"
)
// TestName restores an allocator from a serialized snapshot and verifies the
// next address can still be allocated from the restored state.
func TestName(t *testing.T) {
	// Range covers the router IP's CIDR; addresses are handed out contiguously.
	cidr := &net.IPNet{IP: config.RouterIP, Mask: config.CIDR.Mask}
	dhcp, err := ipallocator.NewAllocatorCIDRRange(cidr, func(max int, rangeSpec string) (allocator.Interface, error) {
		return allocator.NewContiguousAllocationMap(max, rangeSpec), nil
	})
	if err != nil {
		t.Fatal(err)
	}
	// "Aw==" is base64 for the single byte 0x03: a snapshot bitmap with the
	// two lowest bits set, i.e. the first two addresses already allocated.
	s := "Aw=="
	var str []byte
	str, err = base64.StdEncoding.DecodeString(s)
	if err != nil {
		t.Fatal(err)
	}
	// Rehydrate the allocator from the decoded snapshot.
	err = dhcp.Restore(cidr, str)
	if err != nil {
		t.Fatal(err)
	}
	// Allocate the next free address after the restored ones.
	next, err := dhcp.AllocateNext()
	if err != nil {
		t.Fatal(err)
	}
	t.Log(next.String())
	// Log the post-allocation snapshot for inspection.
	_, bytes, _ := dhcp.Snapshot()
	t.Log(string(bytes))
}
// TestInit creates a fresh allocator over the router CIDR and logs its
// initial (empty) snapshot.
func TestInit(t *testing.T) {
	cidr := &net.IPNet{IP: config.RouterIP, Mask: config.CIDR.Mask}
	newMap := func(max int, rangeSpec string) (allocator.Interface, error) {
		return allocator.NewContiguousAllocationMap(max, rangeSpec), nil
	}
	dhcp, err := ipallocator.NewAllocatorCIDRRange(cidr, newMap)
	if err != nil {
		t.Fatal(err)
	}
	snapshot, bytes, err := dhcp.Snapshot()
	t.Log(string(snapshot), string(bytes), err)
}

View File

@@ -4,13 +4,10 @@ import (
"context"
"fmt"
"github.com/docker/docker/api/types"
"github.com/docker/docker/client"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/errors"
"k8s.io/utils/pointer"
"sigs.k8s.io/yaml"
"github.com/wencaiwulue/kubevpn/v2/pkg/config"
@@ -20,47 +17,6 @@ import (
"github.com/wencaiwulue/kubevpn/v2/pkg/util"
)
// Uninstall
// 1) quit daemon
// 2) get all proxy-resources from configmap
// 3) cleanup all containers
// 4) cleanup hosts
func (c *ConnectOptions) Uninstall(ctx context.Context) error {
	plog.G(ctx).Infof("Cleaning up resources")
	ns := c.Namespace
	name := config.ConfigMapPodTrafficManager
	options := metav1.DeleteOptions{GracePeriodSeconds: pointer.Int64(0)}
	// Best-effort cleanup: every resource is deleted independently and each
	// error is deliberately ignored so the sweep always runs to completion.
	cleanups := []func() error{
		func() error { return c.clientset.CoreV1().ConfigMaps(ns).Delete(ctx, name, options) },
		func() error { return c.clientset.CoreV1().Pods(ns).Delete(ctx, config.CniNetName, options) },
		func() error { return c.clientset.CoreV1().Secrets(ns).Delete(ctx, name, options) },
		func() error {
			return c.clientset.AdmissionregistrationV1().MutatingWebhookConfigurations().Delete(ctx, name+"."+ns, options)
		},
		func() error { return c.clientset.RbacV1().RoleBindings(ns).Delete(ctx, name, options) },
		func() error { return c.clientset.CoreV1().ServiceAccounts(ns).Delete(ctx, name, options) },
		func() error { return c.clientset.RbacV1().Roles(ns).Delete(ctx, name, options) },
		func() error { return c.clientset.CoreV1().Services(ns).Delete(ctx, name, options) },
		func() error { return c.clientset.AppsV1().Deployments(ns).Delete(ctx, name, options) },
		func() error { return c.clientset.BatchV1().Jobs(ns).Delete(ctx, name, options) },
		func() error { return c.CleanupLocalContainer(ctx) },
	}
	for _, cleanup := range cleanups {
		_ = cleanup()
	}
	plog.G(ctx).Info("Done")
	return nil
}
// CleanupLocalContainer removes the local docker network used by the traffic
// manager, but only once no container is still attached to it.
// The docker client is created from the environment and closed on return.
func (c *ConnectOptions) CleanupLocalContainer(ctx context.Context) error {
	cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
	if err != nil {
		return err
	}
	// fix: close the docker client so the underlying connection is released
	defer cli.Close()
	var networkResource types.NetworkResource
	networkResource, err = cli.NetworkInspect(ctx, config.ConfigMapPodTrafficManager, types.NetworkInspectOptions{})
	if err != nil {
		return err
	}
	// Only remove the network when it has no attached containers.
	if len(networkResource.Containers) == 0 {
		err = cli.NetworkRemove(ctx, config.ConfigMapPodTrafficManager)
	}
	return err
}
func (c *ConnectOptions) LeaveAllProxyResources(ctx context.Context) (err error) {
if c == nil || c.clientset == nil {
return

View File

@@ -1,202 +0,0 @@
package ssh
import (
"context"
"fmt"
"io"
"os"
"path/filepath"
"github.com/distribution/reference"
"github.com/docker/cli/cli/command"
"github.com/docker/cli/cli/command/image"
"github.com/docker/cli/cli/flags"
"github.com/docker/cli/cli/streams"
"github.com/docker/cli/cli/trust"
typesimage "github.com/docker/docker/api/types/image"
registrytypes "github.com/docker/docker/api/types/registry"
"github.com/docker/docker/client"
"github.com/docker/docker/pkg/jsonmessage"
"github.com/moby/term"
"github.com/opencontainers/image-spec/specs-go/v1"
"golang.org/x/crypto/ssh"
plog "github.com/wencaiwulue/kubevpn/v2/pkg/log"
)
// GetClient builds a docker API client from the environment plus an
// initialized docker CLI wrapper around it.
// fix: on the error paths after the API client is created, the client is now
// closed before returning so its connection is not leaked.
func GetClient() (*client.Client, *command.DockerCli, error) {
	cli, err := client.NewClientWithOpts(
		client.FromEnv,
		client.WithAPIVersionNegotiation(),
	)
	if err != nil {
		return nil, nil, err
	}
	var dockerCli *command.DockerCli
	dockerCli, err = command.NewDockerCli(command.WithAPIClient(cli))
	if err != nil {
		_ = cli.Close()
		return nil, nil, err
	}
	err = dockerCli.Initialize(flags.NewClientOptions())
	if err != nil {
		_ = cli.Close()
		return nil, nil, err
	}
	return cli, dockerCli, nil
}
// TransferImage makes imageSource available as imageTarget in the target registry:
//  1. with no special ssh config: pull imageSource, tag it as imageTarget, push directly;
//  2. with an ssh config: pull, tag, save the image to a tarball, scp it to the
//     remote host, then load and push it from there (docker with nerdctl fallback).
func TransferImage(ctx context.Context, conf *SshConfig, imageSource, imageTarget string, out io.Writer) error {
	// NOTE(review): the local variable `client` shadows the imported docker
	// `client` package inside this function — confusing but harmless here.
	client, cli, err := GetClient()
	if err != nil {
		plog.G(ctx).Errorf("Failed to get docker client: %v", err)
		return err
	}
	// todo add flags? or detect k8s node runtime ?
	// Platform is hard-coded to linux/amd64 for the pulled image.
	platform := &v1.Platform{Architecture: "amd64", OS: "linux"}
	err = PullImage(ctx, platform, client, cli, imageSource, out)
	if err != nil {
		plog.G(ctx).Errorf("Failed to pull image: %v", err)
		return err
	}
	// Re-tag the pulled image under the target name before pushing.
	err = client.ImageTag(ctx, imageSource, imageTarget)
	if err != nil {
		plog.G(ctx).Errorf("Failed to tag image %s to %s: %v", imageSource, imageTarget, err)
		return err
	}
	// use it if sshConfig is not empty
	// Case 1: no ssh target configured — resolve registry auth and push locally.
	if conf.ConfigAlias == "" && conf.Addr == "" {
		var distributionRef reference.Named
		distributionRef, err = reference.ParseNormalizedNamed(imageTarget)
		if err != nil {
			plog.G(ctx).Errorf("Failed to parse image name %s: %v", imageTarget, err)
			return err
		}
		// Resolve registry credentials for the target reference.
		var imgRefAndAuth trust.ImageRefAndAuth
		imgRefAndAuth, err = trust.GetImageReferencesAndAuth(ctx, image.AuthResolver(cli), distributionRef.String())
		if err != nil {
			plog.G(ctx).Errorf("Failed to get image auth: %v", err)
			return err
		}
		var encodedAuth string
		encodedAuth, err = registrytypes.EncodeAuthConfig(*imgRefAndAuth.AuthConfig())
		if err != nil {
			plog.G(ctx).Errorf("Failed to encode auth config to base64: %v", err)
			return err
		}
		// Allows an interactive re-login if the push is rejected for auth reasons.
		requestPrivilege := command.RegistryAuthenticationPrivilegedFunc(cli, imgRefAndAuth.RepoInfo().Index, "push")
		var readCloser io.ReadCloser
		readCloser, err = client.ImagePush(ctx, imageTarget, typesimage.PushOptions{
			RegistryAuth: encodedAuth,
			PrivilegeFunc: requestPrivilege,
		})
		if err != nil {
			plog.G(ctx).Errorf("Failed to push image %s, err: %v", imageTarget, err)
			return err
		}
		defer readCloser.Close()
		if out == nil {
			_, out, _ = term.StdStreams()
		}
		// Stream the push progress messages to the output writer.
		outWarp := streams.NewOut(out)
		err = jsonmessage.DisplayJSONMessagesToStream(readCloser, outWarp, nil)
		if err != nil {
			plog.G(ctx).Errorf("Failed to display message, err: %v", err)
			return err
		}
		return nil
	}
	// transfer image to remote
	// Case 2: ssh target configured — save the image and push from the remote host.
	var sshClient *ssh.Client
	sshClient, err = DialSshRemote(ctx, conf, ctx.Done())
	if err != nil {
		return err
	}
	defer sshClient.Close()
	var responseReader io.ReadCloser
	responseReader, err = client.ImageSave(ctx, []string{imageTarget})
	if err != nil {
		plog.G(ctx).Errorf("Failed to save image %s: %v", imageTarget, err)
		return err
	}
	defer responseReader.Close()
	// Spool the image tarball to a temp file before transferring.
	file, err := os.CreateTemp("", "*.tar")
	if err != nil {
		return err
	}
	plog.G(ctx).Infof("Saving image %s to temp file %s", imageTarget, file.Name())
	// NOTE(review): if Copy or Close fails below, the temp file is returned
	// without being removed (os.Remove is only deferred afterwards) — confirm
	// whether that leak is acceptable.
	if _, err = io.Copy(file, responseReader); err != nil {
		return err
	}
	if err = file.Close(); err != nil {
		return err
	}
	defer os.Remove(file.Name())
	plog.G(ctx).Infof("Transferring image %s", imageTarget)
	filename := filepath.Base(file.Name())
	// Remote command: prefer docker, fall back to nerdctl for containerd hosts.
	cmd := fmt.Sprintf(
		"(docker load -i ~/.kubevpn/%s && docker push %s) || (nerdctl image load -i ~/.kubevpn/%s && nerdctl image push %s)",
		filename, imageTarget,
		filename, imageTarget,
	)
	stdout := plog.G(ctx).Out
	// Copy the tarball over and run the load+push command in one step.
	err = SCPAndExec(ctx, stdout, stdout, sshClient, file.Name(), filename, []string{cmd}...)
	if err != nil {
		return err
	}
	plog.G(ctx).Infof("Loaded image: %s", imageTarget)
	return nil
}
// PullImage pulls img with registry authentication resolved via the docker
// CLI's credential store, streaming progress to out (or the terminal when out
// is nil). An optional platform restricts the pull to one OS/architecture.
// Equivalent in spirit to image.RunPull(ctx, c, image.PullOptions{}).
func PullImage(ctx context.Context, platform *v1.Platform, cli *client.Client, dockerCli *command.DockerCli, img string, out io.Writer) error {
	var readCloser io.ReadCloser
	var plat string
	// Only pass a platform string when both OS and architecture are set.
	if platform != nil && platform.Architecture != "" && platform.OS != "" {
		plat = fmt.Sprintf("%s/%s", platform.OS, platform.Architecture)
	}
	distributionRef, err := reference.ParseNormalizedNamed(img)
	if err != nil {
		plog.G(ctx).Errorf("Failed to parse image name %s: %v", img, err)
		return err
	}
	// Resolve registry credentials for the image reference.
	var imgRefAndAuth trust.ImageRefAndAuth
	imgRefAndAuth, err = trust.GetImageReferencesAndAuth(ctx, image.AuthResolver(dockerCli), distributionRef.String())
	if err != nil {
		plog.G(ctx).Errorf("Failed to get image auth: %v", err)
		return err
	}
	var encodedAuth string
	encodedAuth, err = registrytypes.EncodeAuthConfig(*imgRefAndAuth.AuthConfig())
	if err != nil {
		plog.G(ctx).Errorf("Failed to encode auth config to base64: %v", err)
		return err
	}
	// Allows an interactive re-login if the pull is rejected for auth reasons.
	requestPrivilege := command.RegistryAuthenticationPrivilegedFunc(dockerCli, imgRefAndAuth.RepoInfo().Index, "pull")
	readCloser, err = cli.ImagePull(ctx, img, typesimage.PullOptions{
		All: false,
		RegistryAuth: encodedAuth,
		PrivilegeFunc: requestPrivilege,
		Platform: plat,
	})
	if err != nil {
		plog.G(ctx).Errorf("Failed to pull image %s: %v", img, err)
		return err
	}
	defer readCloser.Close()
	if out == nil {
		_, out, _ = term.StdStreams()
	}
	// Stream the pull progress messages to the output writer.
	outWarp := streams.NewOut(out)
	err = jsonmessage.DisplayJSONMessagesToStream(readCloser, outWarp, nil)
	if err != nil {
		plog.G(ctx).Errorf("Failed to display message, err: %v", err)
		return err
	}
	return nil
}

215
pkg/util/docker.go Normal file
View File

@@ -0,0 +1,215 @@
package util
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"os/exec"
"strings"
"time"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/network"
"github.com/docker/docker/pkg/stdcopy"
"github.com/wencaiwulue/kubevpn/v2/pkg/config"
"github.com/wencaiwulue/kubevpn/v2/pkg/log"
)
// RunLogsWaitRunning follows the docker logs of the named container and
// blocks until the kubevpn slogan appears in the output (taken as the
// "running" signal), an output-copy error occurs, or ctx is cancelled.
func RunLogsWaitRunning(ctx context.Context, name string) error {
	buf := bytes.NewBuffer(nil)
	w := io.MultiWriter(buf, os.Stdout)
	args := []string{"logs", name, "--since", "0m", "--details", "--follow"}
	cmd := exec.Command("docker", args...)
	cmd.Stdout = w
	// fix: cmd.Start is already non-blocking; the previous `go cmd.Start()`
	// discarded the start error. Start synchronously and surface failures.
	if err := cmd.Start(); err != nil {
		return err
	}
	cancel, cancelFunc := context.WithCancel(ctx)
	defer cancelFunc()
	// Poll the captured output once per second for the readiness keyword.
	// NOTE(review): buf is written by cmd and read by the goroutines below
	// without locking — pre-existing data race, confirm and fix separately.
	go func() {
		t := time.NewTicker(time.Second)
		defer t.Stop()
		for range t.C {
			// keyword, maybe can find another way more elegant
			if strings.Contains(buf.String(), config.Slogan) {
				cancelFunc()
				return
			}
		}
	}()
	// Demultiplex the docker stream format onto stdout/stderr.
	var errChan = make(chan error)
	go func() {
		var err error
		_, err = stdcopy.StdCopy(w, os.Stdout, buf)
		if err != nil {
			errChan <- err
		}
	}()
	select {
	case err := <-errChan:
		return err
	case <-cancel.Done():
		return nil
	}
}
// RunLogsSinceNow prints the recent docker logs of the named container to
// stdout/stderr, demultiplexing the docker stream format. With follow set,
// the logs are streamed until the command exits.
// fix: the error from running `docker logs` was previously overwritten by
// the StdCopy error and silently lost; both are now reported.
func RunLogsSinceNow(name string, follow bool) error {
	args := []string{"logs", name, "--since", "0m", "--details"}
	if follow {
		args = append(args, "--follow")
	}
	output, runErr := exec.Command("docker", args...).CombinedOutput()
	// Still print whatever output we captured, even if the command failed.
	if _, err := stdcopy.StdCopy(os.Stdout, os.Stderr, bytes.NewReader(output)); err != nil {
		return err
	}
	return runErr
}
// CreateNetwork ensures the named docker bridge network exists and returns
// its identifier, roughly:
//
//	docker network create <name> --label owner=<name> --subnet <config.DockerCIDR> --gateway <config.DockerRouterIP>
//
// If the network already exists its name is returned unchanged.
func CreateNetwork(ctx context.Context, name string) (string, error) {
	args := []string{
		"network",
		"inspect",
		name,
	}
	// A successful inspect means the network already exists.
	_, err := exec.CommandContext(ctx, "docker", args...).CombinedOutput()
	if err == nil {
		return name, nil
	}
	args = []string{
		"network",
		"create",
		name,
		"--label", "owner=" + name,
		"--subnet", config.DockerCIDR.String(),
		"--gateway", config.DockerRouterIP.String(),
		"--driver", "bridge",
		"--scope", "local",
	}
	id, err := exec.CommandContext(ctx, "docker", args...).CombinedOutput()
	if err != nil {
		return "", err
	}
	// fix: `docker network create` prints the id followed by a newline;
	// trim surrounding whitespace so callers get the bare identifier.
	return strings.TrimSpace(string(id)), nil
}
// RunContainer executes the given command line (args[0] plus its arguments)
// with stdio wired to the current process, and waits for it to finish.
func RunContainer(ctx context.Context, args []string) error {
	cmd := exec.CommandContext(ctx, args[0], args[1:]...)
	cmd.Stdin, cmd.Stdout, cmd.Stderr = os.Stdin, os.Stdout, os.Stderr
	log.G(ctx).Debugf("Run container with cmd: %v", cmd.Args)
	if err := cmd.Start(); err != nil {
		log.G(ctx).Errorf("Failed to run container with cmd: %v: %v", cmd.Args, err)
		return err
	}
	return cmd.Wait()
}
// WaitDockerContainerRunning polls `docker inspect` once per second until the
// named container is running. It returns an error when the container has
// exited or died, when inspection fails, or when ctx is cancelled.
// fix: the previous version built the "container exited" error but then
// unconditionally returned nil, and dropped ctx cancellation the same way.
func WaitDockerContainerRunning(ctx context.Context, name string) error {
	log.G(ctx).Infof("Wait container %s to be running...", name)
	for {
		if err := ctx.Err(); err != nil {
			return err
		}
		time.Sleep(time.Second * 1)
		inspect, err := ContainerInspect(ctx, name)
		if err != nil {
			return err
		}
		// A terminal state will never transition to running; fail fast.
		if inspect.State != nil && (inspect.State.Status == "exited" || inspect.State.Status == "dead" || inspect.State.Dead) {
			return fmt.Errorf("container status: %s", inspect.State.Status)
		}
		if inspect.State != nil && inspect.State.Running {
			break
		}
	}
	log.G(ctx).Infof("Container %s is running now", name)
	return nil
}
// ContainerInspect runs `docker inspect <name>` and decodes the first result.
// On command failure the recent container logs are dumped to aid debugging.
// fix: an empty inspect result previously returned a zero value with a nil
// error; it now reports an explicit "not found" error.
func ContainerInspect(ctx context.Context, name string) (types.ContainerJSON, error) {
	output, err := exec.CommandContext(ctx, "docker", "inspect", name).CombinedOutput()
	if err != nil {
		log.G(ctx).Errorf("Failed to wait container to be ready output: %s: %v", string(output), err)
		_ = RunLogsSinceNow(name, false)
		return types.ContainerJSON{}, err
	}
	// `docker inspect` emits a JSON array even for a single container.
	var inspect []types.ContainerJSON
	if err = json.NewDecoder(bytes.NewReader(output)).Decode(&inspect); err != nil {
		return types.ContainerJSON{}, err
	}
	if len(inspect) == 0 {
		return types.ContainerJSON{}, fmt.Errorf("container %s not found", name)
	}
	return inspect[0], nil
}
// NetworkInspect runs `docker network inspect <name>` and decodes the first
// result. On command failure the recent logs for name are dumped to aid debugging.
// fix: an empty inspect result previously returned a zero value with a nil
// error; it now reports an explicit "not found" error.
func NetworkInspect(ctx context.Context, name string) (network.Inspect, error) {
	output, err := exec.CommandContext(ctx, "docker", "network", "inspect", name).CombinedOutput()
	if err != nil {
		log.G(ctx).Errorf("Failed to wait container to be ready: %v", err)
		_ = RunLogsSinceNow(name, false)
		return network.Inspect{}, err
	}
	// `docker network inspect` emits a JSON array even for a single network.
	var inspect []network.Inspect
	if err = json.NewDecoder(bytes.NewReader(output)).Decode(&inspect); err != nil {
		return network.Inspect{}, err
	}
	if len(inspect) == 0 {
		return network.Inspect{}, fmt.Errorf("network %s not found", name)
	}
	return inspect[0], nil
}
// NetworkRemove deletes the named docker network; a "not found" result is
// treated as success.
func NetworkRemove(ctx context.Context, name string) error {
	out, err := exec.CommandContext(ctx, "docker", "network", "remove", name).CombinedOutput()
	if err == nil || strings.Contains(string(out), "not found") {
		return nil
	}
	return err
}
// NetworkDisconnect force-disconnects the container from the traffic-manager
// network:
//
//	docker network disconnect --force <network> <container>
//
// A "not found" result is treated as success; the raw output is returned
// either way so callers can log it.
func NetworkDisconnect(ctx context.Context, containerName string) ([]byte, error) {
	output, err := exec.CommandContext(ctx, "docker", "network", "disconnect", "--force", config.ConfigMapPodTrafficManager, containerName).CombinedOutput()
	if err == nil || !strings.Contains(string(output), "not found") {
		return output, err
	}
	return output, nil
}
// ContainerRemove force-removes the named container:
//
//	docker remove --force <container>
//
// A "not found" result is treated as success; the raw output is returned
// either way so callers can log it.
func ContainerRemove(ctx context.Context, containerName string) ([]byte, error) {
	output, err := exec.CommandContext(ctx, "docker", "remove", "--force", containerName).CombinedOutput()
	if err == nil || !strings.Contains(string(output), "not found") {
		return output, err
	}
	return output, nil
}
// ContainerKill sends SIGTERM to the named container via `docker kill`;
// a "not found" result is treated as success. The raw output is returned
// either way so callers can log it.
func ContainerKill(ctx context.Context, name *string) ([]byte, error) {
	output, err := exec.CommandContext(ctx, "docker", "kill", *name, "--signal", "SIGTERM").CombinedOutput()
	if err == nil || !strings.Contains(string(output), "not found") {
		return output, err
	}
	return output, nil
}

View File

@@ -1,28 +1,15 @@
package webhook
import (
"context"
"crypto/tls"
"fmt"
"net/http"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
"google.golang.org/grpc"
"google.golang.org/grpc/admin"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/reflection"
"k8s.io/client-go/kubernetes"
"github.com/wencaiwulue/kubevpn/v2/pkg/daemon"
"github.com/wencaiwulue/kubevpn/v2/pkg/dhcp"
"github.com/wencaiwulue/kubevpn/v2/pkg/dhcp/rpc"
plog "github.com/wencaiwulue/kubevpn/v2/pkg/log"
putil "github.com/wencaiwulue/kubevpn/v2/pkg/util"
)
func Main(manager *dhcp.Manager, clientset *kubernetes.Clientset) error {
func Main(manager *dhcp.Manager) error {
h := &admissionReviewHandler{dhcp: manager}
http.HandleFunc("/pods", func(w http.ResponseWriter, r *http.Request) {
serve(w, r, newDelegateToV1AdmitHandler(h.admitPods))
@@ -36,32 +23,10 @@ func Main(manager *dhcp.Manager, clientset *kubernetes.Clientset) error {
return fmt.Errorf("failed to load tls certificate: %v", err)
}
grpcServer := grpc.NewServer()
cleanup, err := admin.Register(grpcServer)
if err != nil {
plog.G(context.Background()).Errorf("Failed to register admin: %v", err)
return err
}
grpc_health_v1.RegisterHealthServer(grpcServer, health.NewServer())
defer cleanup()
reflection.Register(grpcServer)
http.DefaultTransport.(*http.Transport).MaxIdleConnsPerHost = 100
// startup a http server
// With downgrading-capable gRPC server, which can also handle HTTP.
downgradingServer := &http.Server{
server := &http.Server{
Addr: fmt.Sprintf(":%d", 80),
TLSConfig: &tls.Config{Certificates: tlsConfig.Certificates},
}
defer downgradingServer.Close()
var h2Server http2.Server
err = http2.ConfigureServer(downgradingServer, &h2Server)
if err != nil {
plog.G(context.Background()).Errorf("Failed to configure http2 server: %v", err)
return err
}
handler := daemon.CreateDowngradingHandler(grpcServer, http.HandlerFunc(http.DefaultServeMux.ServeHTTP))
downgradingServer.Handler = h2c.NewHandler(handler, &h2Server)
defer downgradingServer.Close()
rpc.RegisterDHCPServer(grpcServer, dhcp.NewServer(clientset))
return downgradingServer.ListenAndServeTLS("", "")
return server.ListenAndServeTLS("", "")
}