refactor: use origin workload of proxy mode (#621)

This commit is contained in:
naison
2025-06-04 14:30:24 +08:00
committed by GitHub
parent a750327d9e
commit fd59ed242c
14 changed files with 467 additions and 222 deletions

View File

@@ -6,7 +6,6 @@ import (
"time"
log "github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"github.com/wencaiwulue/kubevpn/v2/pkg/controlplane"
"github.com/wencaiwulue/kubevpn/v2/pkg/daemon/rpc"
@@ -28,20 +27,15 @@ func (svr *Server) Leave(req *rpc.LeaveRequest, resp rpc.Daemon_LeaveServer) err
mapInterface := svr.connect.GetClientset().CoreV1().ConfigMaps(namespace)
v4, _ := svr.connect.GetLocalTunIP()
for _, workload := range req.GetWorkloads() {
object, err := util.GetUnstructuredObject(factory, req.Namespace, workload)
object, controller, err := util.GetTopOwnerObject(ctx, factory, req.Namespace, workload)
if err != nil {
logger.Errorf("Failed to get unstructured object: %v", err)
return err
}
u := object.Object.(*unstructured.Unstructured)
templateSpec, _, err := util.GetPodTemplateSpecPath(u)
if err != nil {
logger.Errorf("Failed to get template spec path: %v", err)
logger.Errorf("Failed to get unstructured controller: %v", err)
return err
}
nodeID := fmt.Sprintf("%s.%s", object.Mapping.Resource.GroupResource().String(), object.Name)
// add rollback func to remove envoy config
var empty bool
empty, err = inject.UnPatchContainer(ctx, factory, mapInterface, object, func(isFargateMode bool, rule *controlplane.Rule) bool {
empty, err = inject.UnPatchContainer(ctx, nodeID, factory, mapInterface, controller, func(isFargateMode bool, rule *controlplane.Rule) bool {
if isFargateMode {
return svr.connect.IsMe(req.Namespace, util.ConvertWorkloadToUid(workload), rule.Headers)
}
@@ -51,8 +45,8 @@ func (svr *Server) Leave(req *rpc.LeaveRequest, resp rpc.Daemon_LeaveServer) err
plog.G(ctx).Errorf("Leaving workload %s failed: %v", workload, err)
continue
}
if empty {
err = inject.ModifyServiceTargetPort(ctx, svr.connect.GetClientset(), req.Namespace, templateSpec.Labels, map[int32]int32{})
if empty && util.IsK8sService(object) {
err = inject.ModifyServiceTargetPort(ctx, svr.connect.GetClientset(), req.Namespace, object.Name, map[int32]int32{})
}
svr.connect.LeavePortMap(req.Namespace, workload)
err = util.RolloutStatus(ctx, factory, req.Namespace, workload, time.Minute*60)

View File

@@ -9,6 +9,7 @@ import (
"github.com/spf13/pflag"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/cli-runtime/pkg/resource"
"k8s.io/utils/ptr"
"github.com/wencaiwulue/kubevpn/v2/pkg/config"
@@ -58,10 +59,19 @@ func (svr *Server) Proxy(req *rpc.ProxyRequest, resp rpc.Daemon_ProxyServer) (e
return err
}
var workloads []string
workloads, err = util.NormalizedResource(ctx, connect.GetFactory(), connect.GetClientset(), req.Namespace, req.Workloads)
var objectList []*resource.Info
workloads, objectList, err = util.NormalizedResource(connect.GetFactory(), req.Namespace, req.Workloads)
if err != nil {
return err
}
// netstack gvisor only support k8s service
if config.Engine(req.Engine) == config.EngineGvisor {
for _, info := range objectList {
if !util.IsK8sService(info) {
return errors.Errorf("netstack gvisor mode only support k8s services, but got %s", info.Object.GetObjectKind().GroupVersionKind().Kind)
}
}
}
defer func() {
if e != nil && svr.connect != nil {

View File

@@ -172,25 +172,26 @@ func (c *ConnectOptions) CreateRemoteInboundPod(ctx context.Context, namespace s
LocalTunIPv4: c.localTunIPv4.IP.String(),
LocalTunIPv6: c.localTunIPv6.IP.String(),
}
var object *runtimeresource.Info
object, err = util.GetUnstructuredObject(c.factory, namespace, workload)
var object, controller *runtimeresource.Info
object, controller, err = util.GetTopOwnerObject(ctx, c.factory, namespace, workload)
if err != nil {
return err
}
var templateSpec *v1.PodTemplateSpec
templateSpec, _, err = util.GetPodTemplateSpecPath(object.Object.(*unstructured.Unstructured))
templateSpec, _, err = util.GetPodTemplateSpecPath(controller.Object.(*unstructured.Unstructured))
if err != nil {
return
}
nodeID := fmt.Sprintf("%s.%s", object.Mapping.Resource.GroupResource().String(), object.Name)
// todo consider to use ephemeral container
// https://kubernetes.io/docs/concepts/workloads/pods/ephemeral-containers/
// means mesh mode
if c.Engine == config.EngineGvisor {
err = inject.InjectEnvoySidecar(ctx, c.factory, c.clientset, c.Namespace, object, headers, portMap, tlsSecret)
err = inject.InjectEnvoySidecar(ctx, nodeID, c.factory, c.clientset, c.Namespace, object, controller, headers, portMap, tlsSecret)
} else if len(headers) != 0 || len(portMap) != 0 {
err = inject.InjectVPNAndEnvoySidecar(ctx, c.factory, c.clientset.CoreV1().ConfigMaps(c.Namespace), c.Namespace, object, configInfo, headers, portMap, tlsSecret)
err = inject.InjectVPNAndEnvoySidecar(ctx, nodeID, c.factory, c.clientset.CoreV1().ConfigMaps(c.Namespace), c.Namespace, controller, configInfo, headers, portMap, tlsSecret)
} else {
err = inject.InjectVPNSidecar(ctx, c.factory, c.Namespace, object, configInfo, tlsSecret)
err = inject.InjectVPNSidecar(ctx, nodeID, c.factory, c.Namespace, controller, configInfo, tlsSecret)
}
if err != nil {
plog.G(ctx).Errorf("Injecting inbound sidecar for %s in namespace %s failed: %s", workload, namespace, err.Error())

View File

@@ -2,16 +2,19 @@ package handler
import (
"context"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"os"
"os/exec"
"reflect"
"sync"
"testing"
"time"
"github.com/google/go-cmp/cmp"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -34,8 +37,9 @@ var (
)
const (
local = `{"status": "Reviews is healthy on local pc"}`
remote = `{"status": "Reviews is healthy"}`
local = `{"status": "Reviews is healthy on local pc"}`
local8080 = `{"status": "Reviews is healthy on local pc 8080"}`
remote = `{"status": "Reviews is healthy"}`
)
func TestFunctions(t *testing.T) {
@@ -43,6 +47,7 @@ func TestFunctions(t *testing.T) {
t.Run("init", Init)
t.Run("kubevpnConnect", kubevpnConnect)
t.Run("commonTest", commonTest)
t.Run("checkConnectStatus", checkConnectStatus)
// 2) test proxy mode
t.Run("kubevpnProxy", kubevpnProxy)
@@ -50,6 +55,7 @@ func TestFunctions(t *testing.T) {
t.Run("testUDP", testUDP)
t.Run("proxyServiceReviewsServiceIP", proxyServiceReviewsServiceIP)
t.Run("proxyServiceReviewsPodIP", proxyServiceReviewsPodIP)
t.Run("checkProxyStatus", checkProxyStatus)
// 3) test proxy mode with service mesh
t.Run("kubevpnLeave", kubevpnLeave)
@@ -57,6 +63,7 @@ func TestFunctions(t *testing.T) {
t.Run("commonTest", commonTest)
t.Run("serviceMeshReviewsServiceIP", serviceMeshReviewsServiceIP)
t.Run("serviceMeshReviewsPodIP", serviceMeshReviewsPodIP)
t.Run("checkProxyWithServiceMeshStatus", checkProxyWithServiceMeshStatus)
// 4) test proxy mode with service mesh and gvisor
t.Run("kubevpnLeave", kubevpnLeave)
@@ -64,6 +71,8 @@ func TestFunctions(t *testing.T) {
t.Run("kubevpnProxyWithServiceMeshAndGvisorMode", kubevpnProxyWithServiceMeshAndGvisorMode)
t.Run("commonTest", commonTest)
t.Run("serviceMeshReviewsServiceIP", serviceMeshReviewsServiceIP)
t.Run("checkProxyWithServiceMeshAndGvisorStatus", checkProxyWithServiceMeshAndGvisorStatus)
t.Run("kubevpnLeaveService", kubevpnLeaveService)
t.Run("kubevpnQuit", kubevpnQuit)
// 5) install centrally in ns test -- connect mode
@@ -71,6 +80,7 @@ func TestFunctions(t *testing.T) {
t.Run("centerKubevpnInstallInNsKubevpn", kubevpnConnectToNsKubevpn)
t.Run("centerKubevpnConnect", kubevpnConnect)
t.Run("checkServiceShouldNotInNsDefault", checkServiceShouldNotInNsDefault)
t.Run("centerCheckConnectStatus", centerCheckConnectStatus)
t.Run("centerCommonTest", commonTest)
// 6) install centrally in ns test -- proxy mode
@@ -80,6 +90,7 @@ func TestFunctions(t *testing.T) {
t.Run("centerTestUDP", testUDP)
t.Run("centerProxyServiceReviewsServiceIP", proxyServiceReviewsServiceIP)
t.Run("centerProxyServiceReviewsPodIP", proxyServiceReviewsPodIP)
t.Run("centerCheckProxyStatus", centerCheckProxyStatus)
// 7) install centrally in ns test -- proxy mode with service mesh
t.Run("kubevpnLeave", kubevpnLeave)
@@ -88,13 +99,17 @@ func TestFunctions(t *testing.T) {
t.Run("commonTest", commonTest)
t.Run("serviceMeshReviewsServiceIP", serviceMeshReviewsServiceIP)
t.Run("serviceMeshReviewsPodIP", serviceMeshReviewsPodIP)
t.Run("centerCheckProxyWithServiceMeshStatus", centerCheckProxyWithServiceMeshStatus)
// 8) install centrally in ns test -- proxy mode with service mesh and gvisor
t.Run("kubevpnQuit", kubevpnQuit)
t.Run("kubevpnProxyWithServiceMeshAndGvisorMode", kubevpnProxyWithServiceMeshAndGvisorMode)
t.Run("kubevpnProxyWithServiceMeshAndGvisorModePortMap", kubevpnProxyWithServiceMeshAndGvisorModePortMap)
t.Run("checkServiceShouldNotInNsDefault", checkServiceShouldNotInNsDefault)
t.Run("commonTest", commonTest)
t.Run("serviceMeshReviewsServiceIP", serviceMeshReviewsServiceIP)
t.Run("serviceMeshReviewsServiceIPPortMap", serviceMeshReviewsServiceIPPortMap)
t.Run("kubevpnLeave", kubevpnLeave)
t.Run("centerCheckProxyWithServiceMeshAndGvisorStatus", centerCheckProxyWithServiceMeshAndGvisorStatus)
t.Run("kubevpnLeaveService", kubevpnLeaveService)
t.Run("kubevpnQuit", kubevpnQuit)
}
@@ -249,6 +264,17 @@ func serviceMeshReviewsServiceIP(t *testing.T) {
healthChecker(t, endpoint, map[string]string{"env": "test"}, local)
}
// serviceMeshReviewsServiceIPPortMap verifies port-mapped mesh routing via the
// reviews service IP: a plain request on 9080 reaches the remote service, while
// a request carrying the env=test header is answered by the local server that
// listens on 8080 (local8080 payload).
func serviceMeshReviewsServiceIPPortMap(t *testing.T) {
	svcIP, err := getServiceIP("reviews")
	if err != nil {
		t.Fatal(err)
	}
	url := fmt.Sprintf("http://%s:%v/health", svcIP, 9080)
	healthChecker(t, url, nil, remote)
	healthChecker(t, url, map[string]string{"env": "test"}, local8080)
}
func getServiceIP(app string) (string, error) {
serviceList, err := clientset.CoreV1().Services(namespace).List(context.Background(), v1.ListOptions{
LabelSelector: fields.OneTermEqualSelector("app", app).String(),
@@ -445,7 +471,17 @@ func kubevpnProxyWithServiceMesh(t *testing.T) {
}
func kubevpnProxyWithServiceMeshAndGvisorMode(t *testing.T) {
cmd := exec.Command("kubevpn", "proxy", "deployments/reviews", "--headers", "env=test", "--netstack", "gvisor", "--debug")
cmd := exec.Command("kubevpn", "proxy", "svc/reviews", "--headers", "env=test", "--netstack", "gvisor", "--debug")
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
err := cmd.Run()
if err != nil {
t.Fatal(err)
}
}
func kubevpnProxyWithServiceMeshAndGvisorModePortMap(t *testing.T) {
cmd := exec.Command("kubevpn", "proxy", "svc/reviews", "--headers", "env=test", "--netstack", "gvisor", "--debug", "--portmap", "9080:8080")
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
err := cmd.Run()
@@ -464,6 +500,295 @@ func kubevpnLeave(t *testing.T) {
}
}
// kubevpnLeaveService removes the proxy rules for the reviews service by
// running `kubevpn leave services/reviews`, streaming the command's output
// to the test process's stdout/stderr.
func kubevpnLeaveService(t *testing.T) {
	cmd := exec.Command("kubevpn", "leave", "services/reviews")
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr
	if err := cmd.Run(); err != nil {
		t.Fatal(err)
	}
}
// checkConnectStatus asserts that `kubevpn status -o json` reports exactly one
// full-mode connection to the test namespace with no proxied workloads.
func checkConnectStatus(t *testing.T) {
	output, err := exec.Command("kubevpn", "status", "-o", "json").Output()
	if err != nil {
		t.Fatal(err)
	}
	var got []*status
	if err = json.Unmarshal(output, &got); err != nil {
		t.Fatal(err)
	}
	want := []*status{{
		ID:        0,
		Mode:      "full",
		Namespace: namespace,
		Status:    "Connected",
		ProxyList: nil,
	}}
	if !reflect.DeepEqual(got, want) {
		marshal, _ := json.Marshal(want)
		t.Fatalf("expect: %s, but was: %s", string(marshal), string(output))
	}
}
// centerCheckConnectStatus asserts that, when kubevpn is installed centrally,
// `kubevpn status -o json` reports a single full-mode connection to the
// "default" namespace with no proxied workloads.
func centerCheckConnectStatus(t *testing.T) {
	output, err := exec.Command("kubevpn", "status", "-o", "json").Output()
	if err != nil {
		t.Fatal(err)
	}
	var got []*status
	if err = json.Unmarshal(output, &got); err != nil {
		t.Fatal(err)
	}
	want := []*status{{
		ID:        0,
		Mode:      "full",
		Namespace: "default",
		Status:    "Connected",
		ProxyList: nil,
	}}
	if !reflect.DeepEqual(got, want) {
		marshal, _ := json.Marshal(want)
		t.Fatalf("expect: %s, but was: %s", string(marshal), string(output))
	}
}
// status mirrors one connection entry in the JSON output of
// `kubevpn status -o json`.
type status struct {
	ID        int32    // connection index as reported by the daemon
	Mode      string   // connection mode, e.g. "full"
	Namespace string   // namespace the connection is bound to
	Status    string   // human-readable state, e.g. "Connected"
	ProxyList []*proxy // proxied workloads; nil when nothing is proxied
}
// proxy describes one proxied workload within a status entry.
type proxy struct {
	Namespace string  // namespace of the proxied workload
	Workload  string  // workload identifier, e.g. "deployments.apps/reviews"
	RuleList  []*rule // routing rules attached to this workload
}
// rule is a single traffic-routing rule attached to a proxied workload.
type rule struct {
	Headers       map[string]string // header match that selects this rule; nil matches all
	CurrentDevice bool              // true when traffic is routed to this machine
	PortMap       map[int32]int32   // remote port -> local port mapping
}
// checkProxyStatus asserts that `kubevpn status -o json` reports one full-mode
// connection to the test namespace proxying deployments.apps/reviews to the
// current device with the identity port mapping 9080->9080 and no header rule.
func checkProxyStatus(t *testing.T) {
	output, err := exec.Command("kubevpn", "status", "-o", "json").Output()
	if err != nil {
		t.Fatal(err)
	}
	var got []*status
	if err = json.Unmarshal(output, &got); err != nil {
		t.Fatal(err)
	}
	want := []*status{{
		ID:        0,
		Mode:      "full",
		Namespace: namespace,
		Status:    "Connected",
		ProxyList: []*proxy{{
			Namespace: namespace,
			Workload:  "deployments.apps/reviews",
			RuleList: []*rule{{
				Headers:       nil,
				CurrentDevice: true,
				PortMap:       map[int32]int32{9080: 9080},
			}},
		}},
	}}
	if !reflect.DeepEqual(got, want) {
		marshal, _ := json.Marshal(want)
		t.Fatalf("expect: %s, but was: %s", string(marshal), string(output))
	}
}
// centerCheckProxyStatus asserts that, with kubevpn installed centrally,
// `kubevpn status -o json` reports one full-mode connection to "default"
// proxying deployments.apps/reviews to the current device with the identity
// port mapping 9080->9080 and no header rule.
func centerCheckProxyStatus(t *testing.T) {
	output, err := exec.Command("kubevpn", "status", "-o", "json").Output()
	if err != nil {
		t.Fatal(err)
	}
	var got []*status
	if err = json.Unmarshal(output, &got); err != nil {
		t.Fatal(err)
	}
	want := []*status{{
		ID:        0,
		Mode:      "full",
		Namespace: "default",
		Status:    "Connected",
		ProxyList: []*proxy{{
			Namespace: "default",
			Workload:  "deployments.apps/reviews",
			RuleList: []*rule{{
				Headers:       nil,
				CurrentDevice: true,
				PortMap:       map[int32]int32{9080: 9080},
			}},
		}},
	}}
	if !reflect.DeepEqual(got, want) {
		marshal, _ := json.Marshal(want)
		t.Fatalf("expect: %s, but was: %s", string(marshal), string(output))
	}
}
// checkProxyWithServiceMeshStatus asserts that after a mesh proxy with
// `--headers env=test`, `kubevpn status -o json` reports the
// deployments.apps/reviews workload routed to the current device under the
// env=test header rule with the identity port mapping 9080->9080.
func checkProxyWithServiceMeshStatus(t *testing.T) {
	output, err := exec.Command("kubevpn", "status", "-o", "json").Output()
	if err != nil {
		t.Fatal(err)
	}
	var got []*status
	if err = json.Unmarshal(output, &got); err != nil {
		t.Fatal(err)
	}
	want := []*status{{
		ID:        0,
		Mode:      "full",
		Namespace: namespace,
		Status:    "Connected",
		ProxyList: []*proxy{{
			Namespace: namespace,
			Workload:  "deployments.apps/reviews",
			RuleList: []*rule{{
				Headers:       map[string]string{"env": "test"},
				CurrentDevice: true,
				PortMap:       map[int32]int32{9080: 9080},
			}},
		}},
	}}
	if !reflect.DeepEqual(got, want) {
		marshal, _ := json.Marshal(want)
		t.Fatalf("expect: %s, but was: %s", string(marshal), string(output))
	}
}
// centerCheckProxyWithServiceMeshStatus asserts that, with kubevpn installed
// centrally, a mesh proxy with `--headers env=test` shows up in
// `kubevpn status -o json` as deployments.apps/reviews in "default" routed to
// the current device with the identity port mapping 9080->9080.
func centerCheckProxyWithServiceMeshStatus(t *testing.T) {
	output, err := exec.Command("kubevpn", "status", "-o", "json").Output()
	if err != nil {
		t.Fatal(err)
	}
	var got []*status
	if err = json.Unmarshal(output, &got); err != nil {
		t.Fatal(err)
	}
	want := []*status{{
		ID:        0,
		Mode:      "full",
		Namespace: "default",
		Status:    "Connected",
		ProxyList: []*proxy{{
			Namespace: "default",
			Workload:  "deployments.apps/reviews",
			RuleList: []*rule{{
				Headers:       map[string]string{"env": "test"},
				CurrentDevice: true,
				PortMap:       map[int32]int32{9080: 9080},
			}},
		}},
	}}
	if !reflect.DeepEqual(got, want) {
		marshal, _ := json.Marshal(want)
		t.Fatalf("expect: %s, but was: %s", string(marshal), string(output))
	}
}
// checkProxyWithServiceMeshAndGvisorStatus asserts that after proxying
// svc/reviews in gvisor netstack mode, `kubevpn status -o json` reports the
// services/reviews workload routed to the current device with the identity
// port mapping 9080->9080.
//
// The Headers field is excluded from the comparison.
// NOTE(review): presumably its serialized form is not stable for the gvisor
// path — confirm before tightening the assertion to include it.
func checkProxyWithServiceMeshAndGvisorStatus(t *testing.T) {
	cmd := exec.Command("kubevpn", "status", "-o", "json")
	output, err := cmd.Output()
	if err != nil {
		t.Fatal(err)
	}
	expect := []*status{{
		ID:        0,
		Mode:      "full",
		Namespace: namespace,
		Status:    "Connected",
		ProxyList: []*proxy{{
			Namespace: namespace,
			Workload:  "services/reviews",
			RuleList: []*rule{{
				Headers:       map[string]string{"env": "test"},
				CurrentDevice: true,
				PortMap:       map[int32]int32{9080: 9080},
			}},
		}},
	}}
	var statuses []*status
	if err = json.Unmarshal(output, &statuses); err != nil {
		t.Fatal(err)
	}
	// Ignore any struct field named "Headers" wherever it appears in the tree.
	opt := cmp.FilterPath(func(p cmp.Path) bool {
		return p.Last().String() == `["Headers"]`
	}, cmp.Ignore())
	if !cmp.Equal(statuses, expect, opt) {
		marshal, _ := json.Marshal(expect)
		t.Fatalf("expect: %s, but was: %s", string(marshal), string(output))
	}
}
// centerCheckProxyWithServiceMeshAndGvisorStatus asserts that, with kubevpn
// installed centrally, proxying svc/reviews in gvisor netstack mode with
// `--portmap 9080:8080` shows up in `kubevpn status -o json` as the
// services/reviews workload in "default" with the port mapping 9080->8080.
//
// The Headers field is excluded from the comparison.
// NOTE(review): presumably its serialized form is not stable for the gvisor
// path — confirm before tightening the assertion to include it.
func centerCheckProxyWithServiceMeshAndGvisorStatus(t *testing.T) {
	cmd := exec.Command("kubevpn", "status", "-o", "json")
	output, err := cmd.Output()
	if err != nil {
		t.Fatal(err)
	}
	expect := []*status{{
		ID:        0,
		Mode:      "full",
		Namespace: "default",
		Status:    "Connected",
		ProxyList: []*proxy{{
			Namespace: "default",
			Workload:  "services/reviews",
			RuleList: []*rule{{
				Headers:       map[string]string{"env": "test"},
				CurrentDevice: true,
				PortMap:       map[int32]int32{9080: 8080},
			}},
		}},
	}}
	var statuses []*status
	if err = json.Unmarshal(output, &statuses); err != nil {
		t.Fatal(err)
	}
	// Ignore any struct field named "Headers" wherever it appears in the tree.
	opt := cmp.FilterPath(func(p cmp.Path) bool {
		return p.Last().String() == `["Headers"]`
	}, cmp.Ignore())
	if !cmp.Equal(statuses, expect, opt) {
		marshal, _ := json.Marshal(expect)
		t.Fatalf("expect: %s, but was: %s", string(marshal), string(output))
	}
}
func kubevpnUninstall(t *testing.T) {
cmd := exec.Command("kubevpn", "uninstall", "kubevpn")
cmd.Stdout = os.Stdout
@@ -533,18 +858,19 @@ func Init(t *testing.T) {
t.Fatal(err)
}
go startupHttpServer(t, local)
go startupHttpServer(t, "9080", local)
go startupHttpServer(t, "8080", local8080)
}
func startupHttpServer(t *testing.T, str string) {
func startupHttpServer(t *testing.T, port, str string) {
var health = func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte(str))
}
http.HandleFunc("/", health)
http.HandleFunc("/health", health)
t.Log("Start listening http port 9080 ...")
err := http.ListenAndServe(":9080", nil)
mux := http.NewServeMux()
mux.HandleFunc("/", health)
mux.HandleFunc("/health", health)
t.Logf("Start listening http port %s ...", port)
err := http.ListenAndServe(":"+port, mux)
if err != nil {
t.Fatal(err)
}

View File

@@ -33,7 +33,7 @@ func (c *ConnectOptions) Reset(ctx context.Context, namespace string, workloads
}
var err error
workloads, err = util.NormalizedResource(ctx, c.factory, c.clientset, namespace, workloads)
workloads, _, err = util.NormalizedResource(c.factory, namespace, workloads)
if err != nil {
return err
}
@@ -93,13 +93,13 @@ func resetConfigMap(ctx context.Context, mapInterface v1.ConfigMapInterface, nam
}
func removeInjectContainer(ctx context.Context, factory cmdutil.Factory, clientset *kubernetes.Clientset, namespace, workload string) error {
object, err := util.GetUnstructuredObject(factory, namespace, workload)
object, controller, err := util.GetTopOwnerObject(ctx, factory, namespace, workload)
if err != nil {
plog.G(ctx).Errorf("Failed to get unstructured object: %v", err)
return err
}
u := object.Object.(*unstructured.Unstructured)
u := controller.Object.(*unstructured.Unstructured)
templateSpec, depth, err := util.GetPodTemplateSpecPath(u)
if err != nil {
plog.G(ctx).Errorf("Failed to get template spec path: %v", err)
@@ -110,7 +110,7 @@ func removeInjectContainer(ctx context.Context, factory cmdutil.Factory, clients
inject.RemoveContainers(templateSpec)
helper := pkgresource.NewHelper(object.Client, object.Mapping)
helper := pkgresource.NewHelper(controller.Client, controller.Mapping)
plog.G(ctx).Debugf("The %s is under controller management", workload)
// resource with controller, like deployment,statefulset
var bytes []byte
@@ -125,11 +125,14 @@ func removeInjectContainer(ctx context.Context, factory cmdutil.Factory, clients
plog.G(ctx).Errorf("Failed to generate json patch: %v", err)
return err
}
_, err = helper.Patch(object.Namespace, object.Name, types.JSONPatchType, bytes, &metav1.PatchOptions{})
_, err = helper.Patch(controller.Namespace, controller.Name, types.JSONPatchType, bytes, &metav1.PatchOptions{})
if err != nil {
plog.G(ctx).Errorf("Failed to patch resource: %s %s: %v", object.Mapping.Resource.Resource, object.Name, err)
plog.G(ctx).Errorf("Failed to patch resource: %s %s: %v", controller.Mapping.Resource.Resource, controller.Name, err)
return err
}
if !util.IsK8sService(object) {
return nil
}
var portmap = make(map[int32]int32)
for _, container := range templateSpec.Spec.Containers {
@@ -137,6 +140,6 @@ func removeInjectContainer(ctx context.Context, factory cmdutil.Factory, clients
portmap[port.ContainerPort] = port.ContainerPort
}
}
err = inject.ModifyServiceTargetPort(ctx, clientset, namespace, templateSpec.Labels, portmap)
err = inject.ModifyServiceTargetPort(ctx, clientset, namespace, object.Name, portmap)
return err
}

View File

@@ -2,13 +2,13 @@ package handler
import (
"context"
"fmt"
"github.com/docker/docker/api/types"
"github.com/docker/docker/client"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/utils/pointer"
"sigs.k8s.io/yaml"
@@ -91,19 +91,14 @@ func (c *ConnectOptions) LeaveAllProxyResources(ctx context.Context) (err error)
v4, _ := c.GetLocalTunIP()
for _, workload := range c.ProxyResources() {
// deployments.apps.ry-server --> deployments.apps/ry-server
object, err := util.GetUnstructuredObject(c.factory, workload.namespace, workload.workload)
object, controller, err := util.GetTopOwnerObject(ctx, c.factory, workload.namespace, workload.workload)
if err != nil {
plog.G(ctx).Errorf("Failed to get unstructured object: %v", err)
return err
}
u := object.Object.(*unstructured.Unstructured)
templateSpec, _, err := util.GetPodTemplateSpecPath(u)
if err != nil {
plog.G(ctx).Errorf("Failed to get template spec path: %v", err)
return err
}
nodeID := fmt.Sprintf("%s.%s", object.Mapping.Resource.GroupResource().String(), object.Name)
var empty bool
empty, err = inject.UnPatchContainer(ctx, c.factory, c.clientset.CoreV1().ConfigMaps(c.Namespace), object, func(isFargateMode bool, rule *controlplane.Rule) bool {
empty, err = inject.UnPatchContainer(ctx, nodeID, c.factory, c.clientset.CoreV1().ConfigMaps(c.Namespace), controller, func(isFargateMode bool, rule *controlplane.Rule) bool {
if isFargateMode {
return c.IsMe(workload.namespace, util.ConvertWorkloadToUid(workload.workload), rule.Headers)
}
@@ -113,8 +108,8 @@ func (c *ConnectOptions) LeaveAllProxyResources(ctx context.Context) (err error)
plog.G(ctx).Errorf("Failed to leave workload %s in namespace %s: %v", workload.workload, workload.namespace, err)
continue
}
if empty {
err = inject.ModifyServiceTargetPort(ctx, c.clientset, workload.namespace, templateSpec.Labels, map[int32]int32{})
if empty && util.IsK8sService(object) {
err = inject.ModifyServiceTargetPort(ctx, c.clientset, workload.namespace, object.Name, map[int32]int32{})
}
c.LeavePortMap(workload.namespace, workload.workload)
}

View File

@@ -38,7 +38,7 @@ func RemoveContainers(spec *v1.PodTemplateSpec) {
}
// AddMeshContainer todo envoy support ipv6
func AddMeshContainer(spec *v1.PodTemplateSpec, ns, nodeId string, c util.PodRouteConfig, ipv6 bool, connectNamespace string, secret *v1.Secret) {
func AddMeshContainer(spec *v1.PodTemplateSpec, ns, nodeID string, c util.PodRouteConfig, ipv6 bool, connectNamespace string, secret *v1.Secret) {
// remove envoy proxy containers if already exist
RemoveContainers(spec)
@@ -144,9 +144,9 @@ kubevpn server -l "tun:/localhost:8422?net=${TunIPv4}&net6=${TunIPv6}&route=${CI
"--base-id",
"1",
"--service-node",
util.GenEnvoyUID(ns, nodeId),
util.GenEnvoyUID(ns, nodeID),
"--service-cluster",
util.GenEnvoyUID(ns, nodeId),
util.GenEnvoyUID(ns, nodeID),
"--config-yaml",
},
Args: []string{
@@ -171,7 +171,7 @@ kubevpn server -l "tun:/localhost:8422?net=${TunIPv4}&net6=${TunIPv6}&route=${CI
})
}
func AddEnvoyContainer(spec *v1.PodTemplateSpec, ns, nodeId string, ipv6 bool, connectNamespace string, secret *v1.Secret) {
func AddEnvoyContainer(spec *v1.PodTemplateSpec, ns, nodeID string, ipv6 bool, connectNamespace string, secret *v1.Secret) {
// remove envoy proxy containers if already exist
RemoveContainers(spec)
@@ -208,9 +208,9 @@ kubevpn server -l "ssh://:2222"`,
"--base-id",
"1",
"--service-node",
util.GenEnvoyUID(ns, nodeId),
util.GenEnvoyUID(ns, nodeID),
"--service-cluster",
util.GenEnvoyUID(ns, nodeId),
util.GenEnvoyUID(ns, nodeID),
"--config-yaml",
},
Args: []string{

View File

@@ -10,7 +10,6 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
k8sjson "k8s.io/apimachinery/pkg/util/json"
@@ -28,7 +27,7 @@ import (
// InjectEnvoySidecar patch a sidecar, using iptables to do port-forward let this pod decide should go to 233.254.254.100 or request to 127.0.0.1
// https://istio.io/latest/docs/ops/deployment/requirements/#ports-used-by-istio
func InjectEnvoySidecar(ctx context.Context, f cmdutil.Factory, clientset *kubernetes.Clientset, connectNamespace string, object *runtimeresource.Info, headers map[string]string, portMap []string, secret *v1.Secret) (err error) {
func InjectEnvoySidecar(ctx context.Context, nodeID string, f cmdutil.Factory, clientset *kubernetes.Clientset, connectNamespace string, current, object *runtimeresource.Info, headers map[string]string, portMap []string, secret *v1.Secret) (err error) {
u := object.Object.(*unstructured.Unstructured)
var templateSpec *v1.PodTemplateSpec
var path []string
@@ -37,8 +36,6 @@ func InjectEnvoySidecar(ctx context.Context, f cmdutil.Factory, clientset *kuber
return err
}
nodeID := fmt.Sprintf("%s.%s", object.Mapping.Resource.GroupResource().String(), object.Name)
c := util.PodRouteConfig{LocalTunIPv4: "127.0.0.1", LocalTunIPv6: netip.IPv6Loopback().String()}
ports, portmap := GetPort(templateSpec, portMap)
port := controlplane.ConvertContainerPort(ports...)
@@ -91,34 +88,22 @@ func InjectEnvoySidecar(ctx context.Context, f cmdutil.Factory, clientset *kuber
return err
}
if !util.IsK8sService(current) {
return nil
}
// 2) modify service containerPort to envoy listener port
err = ModifyServiceTargetPort(ctx, clientset, object.Namespace, templateSpec.Labels, containerPort2EnvoyListenerPort)
err = ModifyServiceTargetPort(ctx, clientset, object.Namespace, current.Name, containerPort2EnvoyListenerPort)
if err != nil {
return err
}
return nil
}
func ModifyServiceTargetPort(ctx context.Context, clientset *kubernetes.Clientset, namespace string, podLabels map[string]string, m map[int32]int32) error {
// service selector == pod labels
list, err := clientset.CoreV1().Services(namespace).List(ctx, metav1.ListOptions{})
func ModifyServiceTargetPort(ctx context.Context, clientset *kubernetes.Clientset, namespace string, name string, m map[int32]int32) error {
svc, err := clientset.CoreV1().Services(namespace).Get(ctx, name, metav1.GetOptions{})
if err != nil {
return err
}
var svc *v1.Service
for _, item := range list.Items {
if item.Spec.Selector == nil {
continue
}
if labels.SelectorFromSet(item.Spec.Selector).Matches(labels.Set(podLabels)) {
svc = &item
break
}
}
if svc == nil {
return fmt.Errorf("can not found service with selector: %v", podLabels)
}
for i := range len(svc.Spec.Ports) {
if p, found := m[svc.Spec.Ports[i].Port]; found {
svc.Spec.Ports[i].TargetPort = intstr.FromInt32(p)

View File

@@ -31,7 +31,7 @@ import (
// https://istio.io/latest/docs/ops/deployment/requirements/#ports-used-by-istio
// InjectVPNAndEnvoySidecar patch a sidecar, using iptables to do port-forward let this pod decide should go to 233.254.254.100 or request to 127.0.0.1
func InjectVPNAndEnvoySidecar(ctx context.Context, f cmdutil.Factory, mapInterface v12.ConfigMapInterface, connectNamespace string, object *runtimeresource.Info, c util.PodRouteConfig, headers map[string]string, portMaps []string, secret *v1.Secret) (err error) {
func InjectVPNAndEnvoySidecar(ctx context.Context, nodeID string, f cmdutil.Factory, mapInterface v12.ConfigMapInterface, connectNamespace string, object *runtimeresource.Info, c util.PodRouteConfig, headers map[string]string, portMaps []string, secret *v1.Secret) (err error) {
u := object.Object.(*unstructured.Unstructured)
var templateSpec *v1.PodTemplateSpec
var path []string
@@ -70,8 +70,6 @@ func InjectVPNAndEnvoySidecar(ctx context.Context, f cmdutil.Factory, mapInterfa
}
}
nodeID := fmt.Sprintf("%s.%s", object.Mapping.Resource.GroupResource().String(), object.Name)
err = addEnvoyConfig(mapInterface, object.Namespace, nodeID, c, headers, ports, portmap)
if err != nil {
plog.G(ctx).Errorf("Failed to add envoy config: %v", err)
@@ -114,7 +112,7 @@ func InjectVPNAndEnvoySidecar(ctx context.Context, f cmdutil.Factory, mapInterfa
return err
}
func UnPatchContainer(ctx context.Context, factory cmdutil.Factory, mapInterface v12.ConfigMapInterface, object *runtimeresource.Info, isMeFunc func(isFargateMode bool, rule *controlplane.Rule) bool) (bool, error) {
func UnPatchContainer(ctx context.Context, nodeID string, factory cmdutil.Factory, mapInterface v12.ConfigMapInterface, object *runtimeresource.Info, isMeFunc func(isFargateMode bool, rule *controlplane.Rule) bool) (bool, error) {
u := object.Object.(*unstructured.Unstructured)
templateSpec, depth, err := util.GetPodTemplateSpecPath(u)
if err != nil {
@@ -122,7 +120,6 @@ func UnPatchContainer(ctx context.Context, factory cmdutil.Factory, mapInterface
return false, err
}
nodeID := fmt.Sprintf("%s.%s", object.Mapping.Resource.GroupResource().String(), object.Name)
workload := util.ConvertUidToWorkload(nodeID)
var empty, found bool
empty, found, err = removeEnvoyConfig(mapInterface, object.Namespace, nodeID, isMeFunc)

View File

@@ -24,7 +24,7 @@ import (
util2 "github.com/wencaiwulue/kubevpn/v2/pkg/util"
)
func InjectVPNSidecar(ctx context.Context, f util.Factory, connectNamespace string, object *resource.Info, c util2.PodRouteConfig, secret *v1.Secret) error {
func InjectVPNSidecar(ctx context.Context, nodeID string, f util.Factory, connectNamespace string, object *resource.Info, c util2.PodRouteConfig, secret *v1.Secret) error {
u := object.Object.(*unstructured.Unstructured)
podTempSpec, path, err := util2.GetPodTemplateSpecPath(u)
@@ -36,7 +36,6 @@ func InjectVPNSidecar(ctx context.Context, f util.Factory, connectNamespace stri
if err != nil {
return err
}
nodeID := fmt.Sprintf("%s.%s", object.Mapping.Resource.GroupResource().String(), object.Name)
var ports []v1.ContainerPort
for _, container := range podTempSpec.Spec.Containers {
ports = append(ports, container.Ports...)
@@ -123,51 +122,6 @@ func CreateAfterDeletePod(ctx context.Context, factory util.Factory, p *v1.Pod,
return nil
}
func removeInboundContainer(factory util.Factory, namespace, workloads string) error {
object, err := util2.GetUnstructuredObject(factory, namespace, workloads)
if err != nil {
return err
}
u := object.Object.(*unstructured.Unstructured)
podTempSpec, path, err := util2.GetPodTemplateSpecPath(u)
if err != nil {
return err
}
helper := resource.NewHelper(object.Client, object.Mapping)
// pods
if len(path) == 0 {
_, err = helper.DeleteWithOptions(object.Namespace, object.Name, &v12.DeleteOptions{
GracePeriodSeconds: pointer.Int64(0),
})
if err != nil {
return err
}
}
// how to scale to one
RemoveContainer(&podTempSpec.Spec)
bytes, err := json.Marshal([]struct {
Op string `json:"op"`
Path string `json:"path"`
Value interface{} `json:"value"`
}{{
Op: "replace",
Path: "/" + strings.Join(append(path, "spec"), "/"),
Value: podTempSpec.Spec,
}})
if err != nil {
return err
}
_, err = helper.Patch(object.Namespace, object.Name, types.JSONPatchType, bytes, &v12.PatchOptions{
//Force: &t,
})
return err
}
func CleanupUselessInfo(pod *v1.Pod) {
pod.SetSelfLink("")
pod.SetGeneration(0)

View File

@@ -189,34 +189,42 @@ func PortForwardPod(config *rest.Config, clientset *rest.RESTClient, podName, na
}
}
func GetTopOwnerReference(factory util.Factory, ns, workload string) (*resource.Info, error) {
func GetTopOwnerReference(factory util.Factory, ns, workload string) (object, controller *resource.Info, err error) {
object, err = GetUnstructuredObject(factory, ns, workload)
if err != nil {
return nil, nil, err
}
ownerRef := v1.GetControllerOf(object.Object.(*unstructured.Unstructured))
if ownerRef == nil {
return object, object, err
}
var owner = fmt.Sprintf("%s/%s", ownerRef.Kind, ownerRef.Name)
for {
object, err := GetUnstructuredObject(factory, ns, workload)
controller, err = GetUnstructuredObject(factory, ns, owner)
if err != nil {
return nil, err
return nil, nil, err
}
ownerReference := v1.GetControllerOf(object.Object.(*unstructured.Unstructured))
if ownerReference == nil {
return object, nil
ownerRef = v1.GetControllerOf(controller.Object.(*unstructured.Unstructured))
if ownerRef == nil {
return object, controller, nil
}
workload = fmt.Sprintf("%s/%s", ownerReference.Kind, ownerReference.Name)
owner = fmt.Sprintf("%s/%s", ownerRef.Kind, ownerRef.Name)
}
}
// GetTopOwnerReferenceBySelector assume pods, controller has same labels
func GetTopOwnerReferenceBySelector(factory util.Factory, ns, selector string) (sets.Set[string], error) {
object, err := GetUnstructuredObjectBySelector(factory, ns, selector)
func GetTopOwnerReferenceBySelector(factory util.Factory, ns, selector string) (object, controller *resource.Info, err error) {
objectList, err := GetUnstructuredObjectBySelector(factory, ns, selector)
if err != nil {
return nil, err
return nil, nil, err
}
set := sets.New[string]()
for _, info := range object {
ownerReference, err := GetTopOwnerReference(factory, ns, fmt.Sprintf("%s/%s", info.Mapping.Resource.GroupResource().String(), info.Name))
if err == nil && ownerReference.Mapping.Resource.Resource != "services" {
set.Insert(fmt.Sprintf("%s/%s", ownerReference.Mapping.Resource.GroupResource().String(), ownerReference.Name))
for _, info := range objectList {
if IsK8sService(info) {
continue
}
return GetTopOwnerReference(factory, ns, fmt.Sprintf("%s/%s", info.Mapping.Resource.GroupResource().String(), info.Name))
}
return set, nil
return nil, nil, fmt.Errorf("can not find controller for %s", selector)
}
func Shell(_ context.Context, clientset *kubernetes.Clientset, config *rest.Config, podName, containerName, ns string, cmd []string) (string, error) {

View File

@@ -10,9 +10,7 @@ import (
v2 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/cli-runtime/pkg/resource"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/kubectl/pkg/cmd/util"
"k8s.io/kubectl/pkg/polymorphichelpers"
@@ -111,22 +109,6 @@ func GetPodTemplateSpecPath(u *unstructured.Unstructured) (*v1.PodTemplateSpec,
return &p, path, nil
}
// GetAnnotation returns the annotations of the top owner of the given
// resource, resolving the ownerReferences chain first. A nil annotation map is
// normalized to an empty, non-nil map so callers can index it safely.
func GetAnnotation(f util.Factory, ns string, resources string) (map[string]string, error) {
	ownerReference, err := GetTopOwnerReference(f, ns, resources)
	if err != nil {
		return nil, err
	}
	u, ok := ownerReference.Object.(*unstructured.Unstructured)
	if !ok {
		// fixed typo in the error message: "unstaructed" -> "unstructured"
		return nil, fmt.Errorf("can not convert to unstructured")
	}
	annotations := u.GetAnnotations()
	if annotations == nil {
		annotations = map[string]string{}
	}
	return annotations, nil
}
/*
NormalizedResource convert user parameter to standard, example:
@@ -139,71 +121,61 @@ NormalizedResource convert user parameter to standard, example:
pod/productpage-without-controller --> pod/productpage-without-controller
service/productpage-without-pod --> controller/controllerName
*/
func NormalizedResource(ctx context.Context, f util.Factory, clientset *kubernetes.Clientset, ns string, workloads []string) ([]string, error) {
func NormalizedResource(f util.Factory, ns string, workloads []string) ([]string, []*resource.Info, error) {
if len(workloads) == 0 {
return nil, nil
return nil, nil, nil
}
objectList, err := GetUnstructuredObjectList(f, ns, workloads)
if err != nil {
return nil, err
return nil, nil, err
}
var resources []string
for _, info := range objectList {
resources = append(resources, fmt.Sprintf("%s/%s", info.Mapping.Resource.GroupResource().String(), info.Name))
}
workloads = resources
// normal workloads, like pod with controller, deployments, statefulset, replicaset etc...
for i, workload := range workloads {
var ownerReference *resource.Info
ownerReference, err = GetTopOwnerReference(f, ns, workload)
if err == nil {
workloads[i] = fmt.Sprintf("%s/%s", ownerReference.Mapping.Resource.GroupResource().String(), ownerReference.Name)
}
}
// service which associate with pod
for i, workload := range workloads {
var object *resource.Info
object, err = GetUnstructuredObject(f, ns, workload)
if err != nil {
return nil, err
}
if object.Mapping.Resource.Resource != "services" {
continue
}
var svc *v1.Service
svc, err = clientset.CoreV1().Services(ns).Get(ctx, object.Name, v2.GetOptions{})
if err != nil {
continue
}
var selector labels.Selector
_, selector, err = polymorphichelpers.SelectorsForObject(svc)
if err != nil {
continue
}
var podList *v1.PodList
podList, err = clientset.CoreV1().Pods(ns).List(ctx, v2.ListOptions{LabelSelector: selector.String()})
// if pod is not empty, using pods to find top controller
if err == nil && podList != nil && len(podList.Items) != 0 {
var ownerReference *resource.Info
ownerReference, err = GetTopOwnerReference(f, ns, fmt.Sprintf("%s/%s", "pods", podList.Items[0].Name))
if err == nil {
workloads[i] = fmt.Sprintf("%s/%s", ownerReference.Mapping.Resource.GroupResource().String(), ownerReference.Name)
}
} else { // if list is empty, means not create pods, just controllers
var controller sets.Set[string]
controller, err = GetTopOwnerReferenceBySelector(f, ns, selector.String())
if err == nil {
if len(controller) > 0 {
workloads[i] = controller.UnsortedList()[0]
}
}
// only a single service, not support it yet
if controller == nil || controller.Len() == 0 {
return nil, fmt.Errorf("not support resources: %s", workload)
}
}
}
return workloads, nil
return resources, objectList, nil
}
// GetTopOwnerObject looks up a workload and resolves its top-level controller.
// Ordinary workloads resolve through the ownerReferences chain; a Service is
// followed via its label selector to a backing pod (or, when no pod exists
// yet, to any controller carrying the same labels).
func GetTopOwnerObject(ctx context.Context, f util.Factory, ns string, workload string) (object, controller *resource.Info, err error) {
	// Ordinary workload — pod with controller, deployment, statefulset,
	// replicaset, etc. — resolves directly via the ownerReferences chain.
	if object, controller, err = GetTopOwnerReference(f, ns, workload); err != nil {
		return nil, nil, err
	}
	if !IsK8sService(object) {
		return object, controller, nil
	}
	// The object is a Service: hop from the service to whatever its label
	// selector points at.
	clientset, err := f.KubernetesClientSet()
	if err != nil {
		return nil, nil, err
	}
	service, err := clientset.CoreV1().Services(ns).Get(ctx, object.Name, v2.GetOptions{})
	if err != nil {
		return nil, nil, err
	}
	_, selector, err := polymorphichelpers.SelectorsForObject(service)
	if err != nil {
		return nil, nil, err
	}
	pods, err := clientset.CoreV1().Pods(ns).List(ctx, v2.ListOptions{LabelSelector: selector.String()})
	if err != nil {
		return nil, nil, err
	}
	// Prefer a live pod: its owner chain leads to the concrete controller.
	if len(pods.Items) > 0 {
		podRef := fmt.Sprintf("pods/%s", pods.Items[0].Name)
		_, controller, err = GetTopOwnerReference(f, ns, podRef)
		return object, controller, err
	}
	// No pods created yet: fall back to matching a controller by the same labels.
	_, controller, err = GetTopOwnerReferenceBySelector(f, ns, selector.String())
	return object, controller, err
}
// IsK8sService reports whether the resolved resource info refers to the core
// "services" resource.
func IsK8sService(info *resource.Info) bool {
	const serviceResource = "services"
	return serviceResource == info.Mapping.Resource.Resource
}