From fd59ed242c4fcfdd23afe64c09a63686ff635fbb Mon Sep 17 00:00:00 2001 From: naison <895703375@qq.com> Date: Wed, 4 Jun 2025 14:30:24 +0800 Subject: [PATCH] refactor: use origin workload of proxy mode (#621) --- cmd/kubevpn/cmds/leave.go | 2 +- cmd/kubevpn/cmds/remove.go | 2 +- pkg/daemon/action/leave.go | 18 +- pkg/daemon/action/proxy.go | 12 +- pkg/handler/connect.go | 13 +- pkg/handler/function_test.go | 350 +++++++++++++++++++++++++++++++++-- pkg/handler/reset.go | 17 +- pkg/handler/uninstall.go | 17 +- pkg/inject/controller.go | 12 +- pkg/inject/fargate.go | 29 +-- pkg/inject/mesh.go | 7 +- pkg/inject/proxy.go | 48 +---- pkg/util/pod.go | 40 ++-- pkg/util/unstructure.go | 122 +++++------- 14 files changed, 467 insertions(+), 222 deletions(-) diff --git a/cmd/kubevpn/cmds/leave.go b/cmd/kubevpn/cmds/leave.go index 531a9801..b33138e2 100644 --- a/cmd/kubevpn/cmds/leave.go +++ b/cmd/kubevpn/cmds/leave.go @@ -28,7 +28,7 @@ func CmdLeave(f cmdutil.Factory) *cobra.Command { `)), Example: templates.Examples(i18n.T(` # leave proxy resource and restore it to origin - kubevpn leave deployment/authors-clone-645d7 + kubevpn leave deployment/authors `)), Args: cobra.MatchAll(cobra.OnlyValidArgs, cobra.MinimumNArgs(1)), PreRunE: func(cmd *cobra.Command, args []string) (err error) { diff --git a/cmd/kubevpn/cmds/remove.go b/cmd/kubevpn/cmds/remove.go index 7ed6a315..70254af9 100644 --- a/cmd/kubevpn/cmds/remove.go +++ b/cmd/kubevpn/cmds/remove.go @@ -26,7 +26,7 @@ func CmdRemove(f cmdutil.Factory) *cobra.Command { `)), Example: templates.Examples(i18n.T(` # leave proxy resources to origin - kubevpn remove deployment/authors + kubevpn remove deployment/authors-clone-645d7 `)), PreRunE: func(cmd *cobra.Command, args []string) (err error) { return daemon.StartupDaemon(cmd.Context()) diff --git a/pkg/daemon/action/leave.go b/pkg/daemon/action/leave.go index cbe1a2fb..275fc076 100644 --- a/pkg/daemon/action/leave.go +++ b/pkg/daemon/action/leave.go @@ -6,7 +6,6 @@ import ( "time" log "github.com/sirupsen/logrus" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "github.com/wencaiwulue/kubevpn/v2/pkg/controlplane" "github.com/wencaiwulue/kubevpn/v2/pkg/daemon/rpc" @@ -28,20 +27,15 @@ func (svr *Server) Leave(req *rpc.LeaveRequest, resp rpc.Daemon_LeaveServer) err mapInterface := svr.connect.GetClientset().CoreV1().ConfigMaps(namespace) v4, _ := svr.connect.GetLocalTunIP() for _, workload := range req.GetWorkloads() { - object, err := util.GetUnstructuredObject(factory, req.Namespace, workload) + object, controller, err := util.GetTopOwnerObject(ctx, factory, req.Namespace, workload) if err != nil { - logger.Errorf("Failed to get unstructured object: %v", err) - return err - } - u := object.Object.(*unstructured.Unstructured) - templateSpec, _, err := util.GetPodTemplateSpecPath(u) - if err != nil { - logger.Errorf("Failed to get template spec path: %v", err) + logger.Errorf("Failed to get unstructured controller: %v", err) return err } + nodeID := fmt.Sprintf("%s.%s", object.Mapping.Resource.GroupResource().String(), object.Name) // add rollback func to remove envoy config var empty bool - empty, err = inject.UnPatchContainer(ctx, factory, mapInterface, object, func(isFargateMode bool, rule *controlplane.Rule) bool { + empty, err = inject.UnPatchContainer(ctx, nodeID, factory, mapInterface, controller, func(isFargateMode bool, rule *controlplane.Rule) bool { if isFargateMode { return svr.connect.IsMe(req.Namespace, util.ConvertWorkloadToUid(workload), rule.Headers) } @@ -51,8 +45,8 @@ func (svr *Server) Leave(req *rpc.LeaveRequest, resp rpc.Daemon_LeaveServer) err plog.G(ctx).Errorf("Leaving workload %s failed: %v", workload, err) continue } - if empty { - err = inject.ModifyServiceTargetPort(ctx, svr.connect.GetClientset(), req.Namespace, templateSpec.Labels, map[int32]int32{}) + if empty && util.IsK8sService(object) { + err = inject.ModifyServiceTargetPort(ctx, svr.connect.GetClientset(), req.Namespace, object.Name, map[int32]int32{}) } svr.connect.LeavePortMap(req.Namespace, workload) err = util.RolloutStatus(ctx, factory, req.Namespace, workload, time.Minute*60) diff --git a/pkg/daemon/action/proxy.go b/pkg/daemon/action/proxy.go index a8319b2c..38719b5b 100644 --- a/pkg/daemon/action/proxy.go +++ b/pkg/daemon/action/proxy.go @@ -9,6 +9,7 @@ import ( "github.com/spf13/pflag" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "k8s.io/cli-runtime/pkg/resource" "k8s.io/utils/ptr" "github.com/wencaiwulue/kubevpn/v2/pkg/config" @@ -58,10 +59,19 @@ func (svr *Server) Proxy(req *rpc.ProxyRequest, resp rpc.Daemon_ProxyServer) (e return err } var workloads []string - workloads, err = util.NormalizedResource(ctx, connect.GetFactory(), connect.GetClientset(), req.Namespace, req.Workloads) + var objectList []*resource.Info + workloads, objectList, err = util.NormalizedResource(connect.GetFactory(), req.Namespace, req.Workloads) if err != nil { return err } + // netstack gvisor only support k8s service + if config.Engine(req.Engine) == config.EngineGvisor { + for _, info := range objectList { + if !util.IsK8sService(info) { + return errors.Errorf("netstack gvisor mode only support k8s services, but got %s", info.Object.GetObjectKind().GroupVersionKind().Kind) + } + } + } defer func() { if e != nil && svr.connect != nil { diff --git a/pkg/handler/connect.go b/pkg/handler/connect.go index 0a8f2f03..b6383385 100644 --- a/pkg/handler/connect.go +++ b/pkg/handler/connect.go @@ -172,25 +172,26 @@ func (c *ConnectOptions) CreateRemoteInboundPod(ctx context.Context, namespace s LocalTunIPv4: c.localTunIPv4.IP.String(), LocalTunIPv6: c.localTunIPv6.IP.String(), } - var object *runtimeresource.Info - object, err = util.GetUnstructuredObject(c.factory, namespace, workload) + var object, controller *runtimeresource.Info + object, controller, err = util.GetTopOwnerObject(ctx, c.factory, namespace, workload) if err != nil { return err } var templateSpec *v1.PodTemplateSpec - templateSpec, _, err = util.GetPodTemplateSpecPath(object.Object.(*unstructured.Unstructured)) + templateSpec, _, err = util.GetPodTemplateSpecPath(controller.Object.(*unstructured.Unstructured)) if err != nil { return } + nodeID := fmt.Sprintf("%s.%s", object.Mapping.Resource.GroupResource().String(), object.Name) // todo consider to use ephemeral container // https://kubernetes.io/docs/concepts/workloads/pods/ephemeral-containers/ // means mesh mode if c.Engine == config.EngineGvisor { - err = inject.InjectEnvoySidecar(ctx, c.factory, c.clientset, c.Namespace, object, headers, portMap, tlsSecret) + err = inject.InjectEnvoySidecar(ctx, nodeID, c.factory, c.clientset, c.Namespace, object, controller, headers, portMap, tlsSecret) } else if len(headers) != 0 || len(portMap) != 0 { - err = inject.InjectVPNAndEnvoySidecar(ctx, c.factory, c.clientset.CoreV1().ConfigMaps(c.Namespace), c.Namespace, object, configInfo, headers, portMap, tlsSecret) + err = inject.InjectVPNAndEnvoySidecar(ctx, nodeID, c.factory, c.clientset.CoreV1().ConfigMaps(c.Namespace), c.Namespace, controller, configInfo, headers, portMap, tlsSecret) } else { - err = inject.InjectVPNSidecar(ctx, c.factory, c.Namespace, object, configInfo, tlsSecret) + err = inject.InjectVPNSidecar(ctx, nodeID, c.factory, c.Namespace, controller, configInfo, tlsSecret) } if err != nil { plog.G(ctx).Errorf("Injecting inbound sidecar for %s in namespace %s failed: %s", workload, namespace, err.Error()) diff --git a/pkg/handler/function_test.go b/pkg/handler/function_test.go index e17e3b2d..3c8e6483 100644 --- a/pkg/handler/function_test.go +++ b/pkg/handler/function_test.go @@ -2,16 +2,19 @@ package handler import ( "context" + "encoding/json" "fmt" "io" "net" "net/http" "os" "os/exec" + "reflect" "sync" "testing" "time" + "github.com/google/go-cmp/cmp" corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -34,8 +37,9 @@ var ( ) const ( - local = `{"status": "Reviews is healthy on local pc"}` - remote = `{"status": "Reviews is healthy"}` + local = `{"status": "Reviews is healthy on local pc"}` + local8080 = `{"status": "Reviews is healthy on local pc 8080"}` + remote = `{"status": "Reviews is healthy"}` ) func TestFunctions(t *testing.T) { @@ -43,6 +47,7 @@ func TestFunctions(t *testing.T) { t.Run("init", Init) t.Run("kubevpnConnect", kubevpnConnect) t.Run("commonTest", commonTest) + t.Run("checkConnectStatus", checkConnectStatus) // 2) test proxy mode t.Run("kubevpnProxy", kubevpnProxy) @@ -50,6 +55,7 @@ func TestFunctions(t *testing.T) { t.Run("testUDP", testUDP) t.Run("proxyServiceReviewsServiceIP", proxyServiceReviewsServiceIP) t.Run("proxyServiceReviewsPodIP", proxyServiceReviewsPodIP) + t.Run("checkProxyStatus", checkProxyStatus) // 3) test proxy mode with service mesh t.Run("kubevpnLeave", kubevpnLeave) @@ -57,6 +63,7 @@ func TestFunctions(t *testing.T) { t.Run("commonTest", commonTest) t.Run("serviceMeshReviewsServiceIP", serviceMeshReviewsServiceIP) t.Run("serviceMeshReviewsPodIP", serviceMeshReviewsPodIP) + t.Run("checkProxyWithServiceMeshStatus", checkProxyWithServiceMeshStatus) // 4) test proxy mode with service mesh and gvisor t.Run("kubevpnLeave", kubevpnLeave) @@ -64,6 +71,8 @@ func TestFunctions(t *testing.T) { t.Run("kubevpnProxyWithServiceMeshAndGvisorMode", kubevpnProxyWithServiceMeshAndGvisorMode) t.Run("commonTest", commonTest) t.Run("serviceMeshReviewsServiceIP", serviceMeshReviewsServiceIP) + t.Run("checkProxyWithServiceMeshAndGvisorStatus", checkProxyWithServiceMeshAndGvisorStatus) + t.Run("kubevpnLeaveService", kubevpnLeaveService) t.Run("kubevpnQuit", kubevpnQuit) // 5) install centrally in ns test -- connect mode @@ -71,6 +80,7 @@ func TestFunctions(t *testing.T) { t.Run("centerKubevpnInstallInNsKubevpn", kubevpnConnectToNsKubevpn) t.Run("centerKubevpnConnect", kubevpnConnect) t.Run("checkServiceShouldNotInNsDefault", checkServiceShouldNotInNsDefault) + t.Run("centerCheckConnectStatus", centerCheckConnectStatus) t.Run("centerCommonTest", commonTest) // 6) install centrally in ns test -- proxy mode @@ -80,6 +90,7 @@ func TestFunctions(t *testing.T) { t.Run("centerTestUDP", testUDP) t.Run("centerProxyServiceReviewsServiceIP", proxyServiceReviewsServiceIP) t.Run("centerProxyServiceReviewsPodIP", proxyServiceReviewsPodIP) + t.Run("centerCheckProxyStatus", centerCheckProxyStatus) // 7) install centrally in ns test -- proxy mode with service mesh t.Run("kubevpnLeave", kubevpnLeave) @@ -88,13 +99,17 @@ func TestFunctions(t *testing.T) { t.Run("commonTest", commonTest) t.Run("serviceMeshReviewsServiceIP", serviceMeshReviewsServiceIP) t.Run("serviceMeshReviewsPodIP", serviceMeshReviewsPodIP) + t.Run("centerCheckProxyWithServiceMeshStatus", centerCheckProxyWithServiceMeshStatus) // 8) install centrally in ns test -- proxy mode with service mesh and gvisor t.Run("kubevpnQuit", kubevpnQuit) - t.Run("kubevpnProxyWithServiceMeshAndGvisorMode", kubevpnProxyWithServiceMeshAndGvisorMode) + t.Run("kubevpnProxyWithServiceMeshAndGvisorModePortMap", kubevpnProxyWithServiceMeshAndGvisorModePortMap) t.Run("checkServiceShouldNotInNsDefault", checkServiceShouldNotInNsDefault) t.Run("commonTest", commonTest) - t.Run("serviceMeshReviewsServiceIP", serviceMeshReviewsServiceIP) + t.Run("serviceMeshReviewsServiceIPPortMap", serviceMeshReviewsServiceIPPortMap) + t.Run("kubevpnLeave", kubevpnLeave) + t.Run("centerCheckProxyWithServiceMeshAndGvisorStatus", centerCheckProxyWithServiceMeshAndGvisorStatus) + t.Run("kubevpnLeaveService", kubevpnLeaveService) t.Run("kubevpnQuit", kubevpnQuit) } @@ -249,6 +264,17 @@ func serviceMeshReviewsServiceIP(t *testing.T) { healthChecker(t, endpoint, map[string]string{"env": "test"}, local) } +func serviceMeshReviewsServiceIPPortMap(t *testing.T) { + app := "reviews" + ip, err := getServiceIP(app) + if err != nil { + t.Fatal(err) + } + endpoint := fmt.Sprintf("http://%s:%v/health", ip, 9080) + healthChecker(t, endpoint, nil, remote) + healthChecker(t, endpoint, map[string]string{"env": "test"}, local8080) +} + func getServiceIP(app string) (string, error) { serviceList, err := clientset.CoreV1().Services(namespace).List(context.Background(), v1.ListOptions{ LabelSelector: fields.OneTermEqualSelector("app", app).String(), @@ -445,7 +471,17 @@ func kubevpnProxyWithServiceMesh(t *testing.T) { } func kubevpnProxyWithServiceMeshAndGvisorMode(t *testing.T) { - cmd := exec.Command("kubevpn", "proxy", "deployments/reviews", "--headers", "env=test", "--netstack", "gvisor", "--debug") + cmd := exec.Command("kubevpn", "proxy", "svc/reviews", "--headers", "env=test", "--netstack", "gvisor", "--debug") + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + err := cmd.Run() + if err != nil { + t.Fatal(err) + } +} + +func kubevpnProxyWithServiceMeshAndGvisorModePortMap(t *testing.T) { + cmd := exec.Command("kubevpn", "proxy", "svc/reviews", "--headers", "env=test", "--netstack", "gvisor", "--debug", "--portmap", "9080:8080") cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr err := cmd.Run() @@ -464,6 +500,295 @@ func kubevpnLeave(t *testing.T) { } } +func kubevpnLeaveService(t *testing.T) { + cmd := exec.Command("kubevpn", "leave", "services/reviews") + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + err := cmd.Run() + if err != nil { + t.Fatal(err) + } +} + +func checkConnectStatus(t *testing.T) { + cmd := exec.Command("kubevpn", "status", "-o", "json") + output, err := cmd.Output() + if err != nil { + t.Fatal(err) + } + + expect := []*status{{ + ID: 0, + Mode: "full", + Namespace: namespace, + Status: "Connected", + ProxyList: nil, + }} + + var statuses []*status + if err = json.Unmarshal(output, &statuses); err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(statuses, expect) { + marshal, _ := json.Marshal(expect) + t.Fatalf("expect: %s, but was: %s", string(marshal), string(output)) + } +} +func centerCheckConnectStatus(t *testing.T) { + cmd := exec.Command("kubevpn", "status", "-o", "json") + output, err := cmd.Output() + if err != nil { + t.Fatal(err) + } + + expect := []*status{{ + ID: 0, + Mode: "full", + Namespace: "default", + Status: "Connected", + ProxyList: nil, + }} + + var statuses []*status + if err = json.Unmarshal(output, &statuses); err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(statuses, expect) { + marshal, _ := json.Marshal(expect) + t.Fatalf("expect: %s, but was: %s", string(marshal), string(output)) + } +} + +type status struct { + ID int32 + Mode string + Namespace string + Status string + ProxyList []*proxy +} +type proxy struct { + Namespace string + Workload string + RuleList []*rule +} +type rule struct { + Headers map[string]string + CurrentDevice bool + PortMap map[int32]int32 +} + +func checkProxyStatus(t *testing.T) { + cmd := exec.Command("kubevpn", "status", "-o", "json") + output, err := cmd.Output() + if err != nil { + t.Fatal(err) + } + + expect := []*status{{ + ID: 0, + Mode: "full", + Namespace: namespace, + Status: "Connected", + ProxyList: []*proxy{{ + Namespace: namespace, + Workload: "deployments.apps/reviews", + RuleList: []*rule{{ + Headers: nil, + CurrentDevice: true, + PortMap: map[int32]int32{9080: 9080}, + }}, + }}, + }} + + var statuses []*status + if err = json.Unmarshal(output, &statuses); err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(statuses, expect) { + marshal, _ := json.Marshal(expect) + t.Fatalf("expect: %s, but was: %s", string(marshal), string(output)) + } +} + +func centerCheckProxyStatus(t *testing.T) { + cmd := exec.Command("kubevpn", "status", "-o", "json") + output, err := cmd.Output() + if err != nil { + t.Fatal(err) + } + + expect := []*status{{ + ID: 0, + Mode: "full", + Namespace: "default", + Status: "Connected", + ProxyList: []*proxy{{ + Namespace: "default", + Workload: "deployments.apps/reviews", + RuleList: []*rule{{ + Headers: nil, + CurrentDevice: true, + PortMap: map[int32]int32{9080: 9080}, + }}, + }}, + }} + + var statuses []*status + if err = json.Unmarshal(output, &statuses); err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(statuses, expect) { + marshal, _ := json.Marshal(expect) + t.Fatalf("expect: %s, but was: %s", string(marshal), string(output)) + } +} + +func checkProxyWithServiceMeshStatus(t *testing.T) { + cmd := exec.Command("kubevpn", "status", "-o", "json") + output, err := cmd.Output() + if err != nil { + t.Fatal(err) + } + + expect := []*status{{ + ID: 0, + Mode: "full", + Namespace: namespace, + Status: "Connected", + ProxyList: []*proxy{{ + Namespace: namespace, + Workload: "deployments.apps/reviews", + RuleList: []*rule{{ + Headers: map[string]string{"env": "test"}, + CurrentDevice: true, + PortMap: map[int32]int32{9080: 9080}, + }}, + }}, + }} + + var statuses []*status + if err = json.Unmarshal(output, &statuses); err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(statuses, expect) { + marshal, _ := json.Marshal(expect) + t.Fatalf("expect: %s, but was: %s", string(marshal), string(output)) + } +} + +func centerCheckProxyWithServiceMeshStatus(t *testing.T) { + cmd := exec.Command("kubevpn", "status", "-o", "json") + output, err := cmd.Output() + if err != nil { + t.Fatal(err) + } + + expect := []*status{{ + ID: 0, + Mode: "full", + Namespace: "default", + Status: "Connected", + ProxyList: []*proxy{{ + Namespace: "default", + Workload: "deployments.apps/reviews", + RuleList: []*rule{{ + Headers: map[string]string{"env": "test"}, + CurrentDevice: true, + PortMap: map[int32]int32{9080: 9080}, + }}, + }}, + }} + + var statuses []*status + if err = json.Unmarshal(output, &statuses); err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(statuses, expect) { + marshal, _ := json.Marshal(expect) + t.Fatalf("expect: %s, but was: %s", string(marshal), string(output)) + } +} + +func checkProxyWithServiceMeshAndGvisorStatus(t *testing.T) { + cmd := exec.Command("kubevpn", "status", "-o", "json") + output, err := cmd.Output() + if err != nil { + t.Fatal(err) + } + + expect := []*status{{ + ID: 0, + Mode: "full", + Namespace: namespace, + Status: "Connected", + ProxyList: []*proxy{{ + Namespace: namespace, + Workload: "services/reviews", + RuleList: []*rule{{ + Headers: map[string]string{"env": "test"}, + CurrentDevice: true, + PortMap: map[int32]int32{9080: 9080}, + }}, + }}, + }} + + var statuses []*status + if err = json.Unmarshal(output, &statuses); err != nil { + t.Fatal(err) + } + opt := cmp.FilterPath(func(p cmp.Path) bool { + vx := p.Last().String() + if vx == `["Headers"]` { + return true + } + return false + }, cmp.Ignore()) + if !cmp.Equal(statuses, expect, opt) { + marshal, _ := json.Marshal(expect) + t.Fatalf("expect: %s, but was: %s", string(marshal), string(output)) + } +} + +func centerCheckProxyWithServiceMeshAndGvisorStatus(t *testing.T) { + cmd := exec.Command("kubevpn", "status", "-o", "json") + output, err := cmd.Output() + if err != nil { + t.Fatal(err) + } + + expect := []*status{{ + ID: 0, + Mode: "full", + Namespace: "default", + Status: "Connected", + ProxyList: []*proxy{{ + Namespace: "default", + Workload: "services/reviews", + RuleList: []*rule{{ + Headers: map[string]string{"env": "test"}, + CurrentDevice: true, + PortMap: map[int32]int32{9080: 8080}, + }}, + }}, + }} + + var statuses []*status + if err = json.Unmarshal(output, &statuses); err != nil { + t.Fatal(err) + } + opt := cmp.FilterPath(func(p cmp.Path) bool { + vx := p.Last().String() + if vx == `["Headers"]` { + return true + } + return false + }, cmp.Ignore()) + if !cmp.Equal(statuses, expect, opt) { + marshal, _ := json.Marshal(expect) + t.Fatalf("expect: %s, but was: %s", string(marshal), string(output)) + } +} + func kubevpnUninstall(t *testing.T) { cmd := exec.Command("kubevpn", "uninstall", "kubevpn") cmd.Stdout = os.Stdout @@ -533,18 +858,19 @@ func Init(t *testing.T) { t.Fatal(err) } - go startupHttpServer(t, local) + go startupHttpServer(t, "9080", local) + go startupHttpServer(t, "8080", local8080) } -func startupHttpServer(t *testing.T, str string) { +func startupHttpServer(t *testing.T, port, str string) { var health = func(w http.ResponseWriter, r *http.Request) { _, _ = w.Write([]byte(str)) } - - http.HandleFunc("/", health) - http.HandleFunc("/health", health) - t.Log("Start listening http port 9080 ...") - err := http.ListenAndServe(":9080", nil) + mux := http.NewServeMux() + mux.HandleFunc("/", health) + mux.HandleFunc("/health", health) + t.Logf("Start listening http port %s ...", port) + err := http.ListenAndServe(":"+port, mux) if err != nil { t.Fatal(err) } diff --git a/pkg/handler/reset.go b/pkg/handler/reset.go index 508cbfcc..108d0dfe 100644 --- a/pkg/handler/reset.go +++ b/pkg/handler/reset.go @@ -33,7 +33,7 @@ func (c *ConnectOptions) Reset(ctx context.Context, namespace string, workloads } var err error - workloads, err = util.NormalizedResource(ctx, c.factory, c.clientset, namespace, workloads) + workloads, _, err = util.NormalizedResource(c.factory, namespace, workloads) if err != nil { return err } @@ -93,13 +93,13 @@ func resetConfigMap(ctx context.Context, mapInterface v1.ConfigMapInterface, nam } func removeInjectContainer(ctx context.Context, factory cmdutil.Factory, clientset *kubernetes.Clientset, namespace, workload string) error { - object, err := util.GetUnstructuredObject(factory, namespace, workload) + object, controller, err := util.GetTopOwnerObject(ctx, factory, namespace, workload) if err != nil { plog.G(ctx).Errorf("Failed to get unstructured object: %v", err) return err } - u := object.Object.(*unstructured.Unstructured) + u := controller.Object.(*unstructured.Unstructured) templateSpec, depth, err := util.GetPodTemplateSpecPath(u) if err != nil { plog.G(ctx).Errorf("Failed to get template spec path: %v", err) @@ -110,7 +110,7 @@ func removeInjectContainer(ctx context.Context, factory cmdutil.Factory, clients inject.RemoveContainers(templateSpec) - helper := pkgresource.NewHelper(object.Client, object.Mapping) + helper := pkgresource.NewHelper(controller.Client, controller.Mapping) plog.G(ctx).Debugf("The %s is under controller management", workload) // resource with controller, like deployment,statefulset var bytes []byte @@ -125,11 +125,14 @@ func removeInjectContainer(ctx context.Context, factory cmdutil.Factory, clients plog.G(ctx).Errorf("Failed to generate json patch: %v", err) return err } - _, err = helper.Patch(object.Namespace, object.Name, types.JSONPatchType, bytes, &metav1.PatchOptions{}) + _, err = helper.Patch(controller.Namespace, controller.Name, types.JSONPatchType, bytes, &metav1.PatchOptions{}) if err != nil { - plog.G(ctx).Errorf("Failed to patch resource: %s %s: %v", object.Mapping.Resource.Resource, object.Name, err) + plog.G(ctx).Errorf("Failed to patch resource: %s %s: %v", controller.Mapping.Resource.Resource, controller.Name, err) return err } + if !util.IsK8sService(object) { + return nil + } var portmap = make(map[int32]int32) for _, container := range templateSpec.Spec.Containers { @@ -137,6 +140,6 @@ func removeInjectContainer(ctx context.Context, factory cmdutil.Factory, clients portmap[port.ContainerPort] = port.ContainerPort } } - err = inject.ModifyServiceTargetPort(ctx, clientset, namespace, templateSpec.Labels, portmap) + err = inject.ModifyServiceTargetPort(ctx, clientset, namespace, object.Name, portmap) return err } diff --git a/pkg/handler/uninstall.go b/pkg/handler/uninstall.go index bc4e19f5..f2077242 100644 --- a/pkg/handler/uninstall.go +++ b/pkg/handler/uninstall.go @@ -2,13 +2,13 @@ package handler import ( "context" + "fmt" "github.com/docker/docker/api/types" "github.com/docker/docker/client" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/utils/pointer" "sigs.k8s.io/yaml" @@ -91,19 +91,14 @@ func (c *ConnectOptions) LeaveAllProxyResources(ctx context.Context) (err error) v4, _ := c.GetLocalTunIP() for _, workload := range c.ProxyResources() { // deployments.apps.ry-server --> deployments.apps/ry-server - object, err := util.GetUnstructuredObject(c.factory, workload.namespace, workload.workload) + object, controller, err := util.GetTopOwnerObject(ctx, c.factory, workload.namespace, workload.workload) if err != nil { plog.G(ctx).Errorf("Failed to get unstructured object: %v", err) return err } - u := object.Object.(*unstructured.Unstructured) - templateSpec, _, err := util.GetPodTemplateSpecPath(u) - if err != nil { - plog.G(ctx).Errorf("Failed to get template spec path: %v", err) - return err - } + nodeID := fmt.Sprintf("%s.%s", object.Mapping.Resource.GroupResource().String(), object.Name) var empty bool - empty, err = inject.UnPatchContainer(ctx, c.factory, c.clientset.CoreV1().ConfigMaps(c.Namespace), object, func(isFargateMode bool, rule *controlplane.Rule) bool { + empty, err = inject.UnPatchContainer(ctx, nodeID, c.factory, c.clientset.CoreV1().ConfigMaps(c.Namespace), controller, func(isFargateMode bool, rule *controlplane.Rule) bool { if isFargateMode { return c.IsMe(workload.namespace, util.ConvertWorkloadToUid(workload.workload), rule.Headers) } @@ -113,8 +108,8 @@ func (c *ConnectOptions) LeaveAllProxyResources(ctx context.Context) (err error) plog.G(ctx).Errorf("Failed to leave workload %s in namespace %s: %v", workload.workload, workload.namespace, err) continue } - if empty { - err = inject.ModifyServiceTargetPort(ctx, c.clientset, workload.namespace, templateSpec.Labels, map[int32]int32{}) + if empty && util.IsK8sService(object) { + err = inject.ModifyServiceTargetPort(ctx, c.clientset, workload.namespace, object.Name, map[int32]int32{}) } c.LeavePortMap(workload.namespace, workload.workload) } diff --git a/pkg/inject/controller.go b/pkg/inject/controller.go index 5d6f0d39..7e032cc7 100644 --- a/pkg/inject/controller.go +++ b/pkg/inject/controller.go @@ -38,7 +38,7 @@ func RemoveContainers(spec *v1.PodTemplateSpec) { } // AddMeshContainer todo envoy support ipv6 -func AddMeshContainer(spec *v1.PodTemplateSpec, ns, nodeId string, c util.PodRouteConfig, ipv6 bool, connectNamespace string, secret *v1.Secret) { +func AddMeshContainer(spec *v1.PodTemplateSpec, ns, nodeID string, c util.PodRouteConfig, ipv6 bool, connectNamespace string, secret *v1.Secret) { // remove envoy proxy containers if already exist RemoveContainers(spec) @@ -144,9 +144,9 @@ kubevpn server -l "tun:/localhost:8422?net=${TunIPv4}&net6=${TunIPv6}&route=${CI "--base-id", "1", "--service-node", - util.GenEnvoyUID(ns, nodeId), + util.GenEnvoyUID(ns, nodeID), "--service-cluster", - util.GenEnvoyUID(ns, nodeId), + util.GenEnvoyUID(ns, nodeID), "--config-yaml", }, Args: []string{ @@ -171,7 +171,7 @@ kubevpn server -l "tun:/localhost:8422?net=${TunIPv4}&net6=${TunIPv6}&route=${CI }) } -func AddEnvoyContainer(spec *v1.PodTemplateSpec, ns, nodeId string, ipv6 bool, connectNamespace string, secret *v1.Secret) { +func AddEnvoyContainer(spec *v1.PodTemplateSpec, ns, nodeID string, ipv6 bool, connectNamespace string, secret *v1.Secret) { // remove envoy proxy containers if already exist RemoveContainers(spec) @@ -208,9 +208,9 @@ kubevpn server -l "ssh://:2222"`, "--base-id", "1", "--service-node", - util.GenEnvoyUID(ns, nodeId), + util.GenEnvoyUID(ns, nodeID), "--service-cluster", - util.GenEnvoyUID(ns, nodeId), + util.GenEnvoyUID(ns, nodeID), "--config-yaml", }, Args: []string{ diff --git a/pkg/inject/fargate.go b/pkg/inject/fargate.go index cf710dc3..5a04b8f9 100644 --- a/pkg/inject/fargate.go +++ b/pkg/inject/fargate.go @@ -10,7 +10,6 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" k8sjson "k8s.io/apimachinery/pkg/util/json" @@ -28,7 +27,7 @@ import ( // InjectEnvoySidecar patch a sidecar, using iptables to do port-forward let this pod decide should go to 233.254.254.100 or request to 127.0.0.1 // https://istio.io/latest/docs/ops/deployment/requirements/#ports-used-by-istio -func InjectEnvoySidecar(ctx context.Context, f cmdutil.Factory, clientset *kubernetes.Clientset, connectNamespace string, object *runtimeresource.Info, headers map[string]string, portMap []string, secret *v1.Secret) (err error) { +func InjectEnvoySidecar(ctx context.Context, nodeID string, f cmdutil.Factory, clientset *kubernetes.Clientset, connectNamespace string, current, object *runtimeresource.Info, headers map[string]string, portMap []string, secret *v1.Secret) (err error) { u := object.Object.(*unstructured.Unstructured) var templateSpec *v1.PodTemplateSpec var path []string @@ -37,8 +36,6 @@ func InjectEnvoySidecar(ctx context.Context, f cmdutil.Factory, clientset *kuber return err } - nodeID := fmt.Sprintf("%s.%s", object.Mapping.Resource.GroupResource().String(), object.Name) - c := util.PodRouteConfig{LocalTunIPv4: "127.0.0.1", LocalTunIPv6: netip.IPv6Loopback().String()} ports, portmap := GetPort(templateSpec, portMap) port := controlplane.ConvertContainerPort(ports...) @@ -91,34 +88,22 @@ func InjectEnvoySidecar(ctx context.Context, f cmdutil.Factory, clientset *kuber return err } + if !util.IsK8sService(current) { + return nil + } // 2) modify service containerPort to envoy listener port - err = ModifyServiceTargetPort(ctx, clientset, object.Namespace, templateSpec.Labels, containerPort2EnvoyListenerPort) + err = ModifyServiceTargetPort(ctx, clientset, object.Namespace, current.Name, containerPort2EnvoyListenerPort) if err != nil { return err } return nil } -func ModifyServiceTargetPort(ctx context.Context, clientset *kubernetes.Clientset, namespace string, podLabels map[string]string, m map[int32]int32) error { - // service selector == pod labels - list, err := clientset.CoreV1().Services(namespace).List(ctx, metav1.ListOptions{}) +func ModifyServiceTargetPort(ctx context.Context, clientset *kubernetes.Clientset, namespace string, name string, m map[int32]int32) error { + svc, err := clientset.CoreV1().Services(namespace).Get(ctx, name, metav1.GetOptions{}) if err != nil { return err } - - var svc *v1.Service - for _, item := range list.Items { - if item.Spec.Selector == nil { - continue - } - if labels.SelectorFromSet(item.Spec.Selector).Matches(labels.Set(podLabels)) { - svc = &item - break - } - } - if svc == nil { - return fmt.Errorf("can not found service with selector: %v", podLabels) - } for i := range len(svc.Spec.Ports) { if p, found := m[svc.Spec.Ports[i].Port]; found { svc.Spec.Ports[i].TargetPort = intstr.FromInt32(p) diff --git a/pkg/inject/mesh.go b/pkg/inject/mesh.go index f38a0f2d..92b23e23 100644 --- a/pkg/inject/mesh.go +++ b/pkg/inject/mesh.go @@ -31,7 +31,7 @@ import ( // https://istio.io/latest/docs/ops/deployment/requirements/#ports-used-by-istio // InjectVPNAndEnvoySidecar patch a sidecar, using iptables to do port-forward let this pod decide should go to 233.254.254.100 or request to 127.0.0.1 -func InjectVPNAndEnvoySidecar(ctx context.Context, f cmdutil.Factory, mapInterface v12.ConfigMapInterface, connectNamespace string, object *runtimeresource.Info, c util.PodRouteConfig, headers map[string]string, portMaps []string, secret *v1.Secret) (err error) { +func InjectVPNAndEnvoySidecar(ctx context.Context, nodeID string, f cmdutil.Factory, mapInterface v12.ConfigMapInterface, connectNamespace string, object *runtimeresource.Info, c util.PodRouteConfig, headers map[string]string, portMaps []string, secret *v1.Secret) (err error) { u := object.Object.(*unstructured.Unstructured) var templateSpec *v1.PodTemplateSpec var path []string @@ -70,8 +70,6 @@ func InjectVPNAndEnvoySidecar(ctx context.Context, f cmdutil.Factory, mapInterfa } } - nodeID := fmt.Sprintf("%s.%s", object.Mapping.Resource.GroupResource().String(), object.Name) - err = addEnvoyConfig(mapInterface, object.Namespace, nodeID, c, headers, ports, portmap) if err != nil { plog.G(ctx).Errorf("Failed to add envoy config: %v", err) @@ -114,7 +112,7 @@ func InjectVPNAndEnvoySidecar(ctx context.Context, f cmdutil.Factory, mapInterfa return err } -func UnPatchContainer(ctx context.Context, factory cmdutil.Factory, mapInterface v12.ConfigMapInterface, object *runtimeresource.Info, isMeFunc func(isFargateMode bool, rule *controlplane.Rule) bool) (bool, error) { +func UnPatchContainer(ctx context.Context, nodeID string, factory cmdutil.Factory, mapInterface v12.ConfigMapInterface, object *runtimeresource.Info, isMeFunc func(isFargateMode bool, rule *controlplane.Rule) bool) (bool, error) { u := object.Object.(*unstructured.Unstructured) templateSpec, depth, err := util.GetPodTemplateSpecPath(u) if err != nil { @@ -122,7 +120,6 @@ func UnPatchContainer(ctx context.Context, factory cmdutil.Factory, mapInterface return false, err } - nodeID := fmt.Sprintf("%s.%s", object.Mapping.Resource.GroupResource().String(), object.Name) workload := util.ConvertUidToWorkload(nodeID) var empty, found bool empty, found, err = removeEnvoyConfig(mapInterface, object.Namespace, nodeID, isMeFunc) diff --git a/pkg/inject/proxy.go b/pkg/inject/proxy.go index f50bd5b1..9957b59a 100644 --- a/pkg/inject/proxy.go +++ b/pkg/inject/proxy.go @@ -24,7 +24,7 @@ import ( util2 "github.com/wencaiwulue/kubevpn/v2/pkg/util" ) -func InjectVPNSidecar(ctx context.Context, f util.Factory, connectNamespace string, object *resource.Info, c util2.PodRouteConfig, secret *v1.Secret) error { +func InjectVPNSidecar(ctx context.Context, nodeID string, f util.Factory, connectNamespace string, object *resource.Info, c util2.PodRouteConfig, secret *v1.Secret) error { u := object.Object.(*unstructured.Unstructured) podTempSpec, path, err := util2.GetPodTemplateSpecPath(u) @@ -36,7 +36,6 @@ func InjectVPNSidecar(ctx context.Context, f util.Factory, connectNamespace stri if err != nil { return err } - nodeID := fmt.Sprintf("%s.%s", object.Mapping.Resource.GroupResource().String(), object.Name) var ports []v1.ContainerPort for _, container := range podTempSpec.Spec.Containers { ports = append(ports, container.Ports...) @@ -123,51 +122,6 @@ func CreateAfterDeletePod(ctx context.Context, factory util.Factory, p *v1.Pod, return nil } -func removeInboundContainer(factory util.Factory, namespace, workloads string) error { - object, err := util2.GetUnstructuredObject(factory, namespace, workloads) - if err != nil { - return err - } - - u := object.Object.(*unstructured.Unstructured) - - podTempSpec, path, err := util2.GetPodTemplateSpecPath(u) - if err != nil { - return err - } - - helper := resource.NewHelper(object.Client, object.Mapping) - - // pods - if len(path) == 0 { - _, err = helper.DeleteWithOptions(object.Namespace, object.Name, &v12.DeleteOptions{ - GracePeriodSeconds: pointer.Int64(0), - }) - if err != nil { - return err - } - } - // how to scale to one - RemoveContainer(&podTempSpec.Spec) - - bytes, err := json.Marshal([]struct { - Op string `json:"op"` - Path string `json:"path"` - Value interface{} `json:"value"` - }{{ - Op: "replace", - Path: "/" + strings.Join(append(path, "spec"), "/"), - Value: podTempSpec.Spec, - }}) - if err != nil { - return err - } - _, err = helper.Patch(object.Namespace, object.Name, types.JSONPatchType, bytes, &v12.PatchOptions{ - //Force: &t, - }) - return err -} - func CleanupUselessInfo(pod *v1.Pod) { pod.SetSelfLink("") pod.SetGeneration(0) diff --git a/pkg/util/pod.go b/pkg/util/pod.go index c8470965..8ba4de16 100644 --- a/pkg/util/pod.go +++ b/pkg/util/pod.go @@ -189,34 +189,42 @@ func PortForwardPod(config *rest.Config, clientset *rest.RESTClient, podName, na } } -func GetTopOwnerReference(factory util.Factory, ns, workload string) (*resource.Info, error) { +func GetTopOwnerReference(factory util.Factory, ns, workload string) (object, controller *resource.Info, err error) { + object, err = GetUnstructuredObject(factory, ns, workload) + if err != nil { + return nil, nil, err + } + ownerRef := v1.GetControllerOf(object.Object.(*unstructured.Unstructured)) + if ownerRef == nil { + return object, object, err + } + var owner = fmt.Sprintf("%s/%s", ownerRef.Kind, ownerRef.Name) for { - object, err := GetUnstructuredObject(factory, ns, workload) + controller, err = GetUnstructuredObject(factory, ns, owner) if err != nil { - return nil, err + return nil, nil, err } - ownerReference := v1.GetControllerOf(object.Object.(*unstructured.Unstructured)) - if ownerReference == nil { - return object, nil + ownerRef = v1.GetControllerOf(controller.Object.(*unstructured.Unstructured)) + if ownerRef == nil { + return object, controller, nil } - workload = fmt.Sprintf("%s/%s", ownerReference.Kind, ownerReference.Name) + owner = fmt.Sprintf("%s/%s", ownerRef.Kind, ownerRef.Name) } } // GetTopOwnerReferenceBySelector assume pods, controller has same labels -func GetTopOwnerReferenceBySelector(factory util.Factory, ns, selector string) (sets.Set[string], error) { - object, err := GetUnstructuredObjectBySelector(factory, ns, selector) +func GetTopOwnerReferenceBySelector(factory util.Factory, ns, selector string) (object, controller *resource.Info, err error) { + objectList, err := GetUnstructuredObjectBySelector(factory, ns, selector) if err != nil { - return nil, err + return nil, nil, err } - set := sets.New[string]() - for _, info := range object { - ownerReference, err := GetTopOwnerReference(factory, ns, fmt.Sprintf("%s/%s", info.Mapping.Resource.GroupResource().String(), info.Name)) - if err == nil && ownerReference.Mapping.Resource.Resource != "services" { - set.Insert(fmt.Sprintf("%s/%s", ownerReference.Mapping.Resource.GroupResource().String(), ownerReference.Name)) + for _, info := range objectList { + if IsK8sService(info) { + continue } + return GetTopOwnerReference(factory, ns, fmt.Sprintf("%s/%s", info.Mapping.Resource.GroupResource().String(), info.Name)) } - return set, nil + return nil, nil, fmt.Errorf("can not find controller for %s", selector) } func Shell(_ context.Context, clientset *kubernetes.Clientset, config *rest.Config, podName, containerName, ns string, cmd []string) (string, error) { diff --git a/pkg/util/unstructure.go b/pkg/util/unstructure.go index 06f7c92d..c6d42112 100644 --- a/pkg/util/unstructure.go +++ b/pkg/util/unstructure.go @@ -10,9 +10,7 @@ import ( v2 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/util/sets" "k8s.io/cli-runtime/pkg/resource" - "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/kubectl/pkg/cmd/util" "k8s.io/kubectl/pkg/polymorphichelpers" @@ -111,22 +109,6 @@ func GetPodTemplateSpecPath(u *unstructured.Unstructured) (*v1.PodTemplateSpec, return &p, path, nil } -func GetAnnotation(f util.Factory, ns string, resources string) (map[string]string, error) { - ownerReference, err := GetTopOwnerReference(f, ns, resources) - if err != nil { - return nil, err - } - u, ok := ownerReference.Object.(*unstructured.Unstructured) - if !ok { - return nil, fmt.Errorf("can not convert to unstaructed") - } - annotations := u.GetAnnotations() - if annotations == nil { - annotations = map[string]string{} - } - return annotations, nil -} - /* NormalizedResource convert user parameter to standard, example: @@ -139,71 +121,61 @@ NormalizedResource convert user parameter to standard, example: pod/productpage-without-controller --> pod/productpage-without-controller service/productpage-without-pod --> controller/controllerName */ -func NormalizedResource(ctx context.Context, f util.Factory, clientset *kubernetes.Clientset, ns string, workloads []string) ([]string, error) { +func NormalizedResource(f util.Factory, ns string, workloads []string) ([]string, []*resource.Info, error) { if len(workloads) == 0 { - return nil, nil + return nil, nil, nil } objectList, err := GetUnstructuredObjectList(f, ns, workloads) if err != nil { - return nil, err + return nil, nil, err } var resources []string for _, info := range objectList { resources = append(resources, fmt.Sprintf("%s/%s", info.Mapping.Resource.GroupResource().String(), info.Name)) } - workloads = resources - - // normal workloads, like pod with controller, deployments, statefulset, replicaset etc... - for i, workload := range workloads { - var ownerReference *resource.Info - ownerReference, err = GetTopOwnerReference(f, ns, workload) - if err == nil { - workloads[i] = fmt.Sprintf("%s/%s", ownerReference.Mapping.Resource.GroupResource().String(), ownerReference.Name) - } - } - // service which associate with pod - for i, workload := range workloads { - var object *resource.Info - object, err = GetUnstructuredObject(f, ns, workload) - if err != nil { - return nil, err - } - if object.Mapping.Resource.Resource != "services" { - continue - } - var svc *v1.Service - svc, err = clientset.CoreV1().Services(ns).Get(ctx, object.Name, v2.GetOptions{}) - if err != nil { - continue - } - var selector labels.Selector - _, selector, err = polymorphichelpers.SelectorsForObject(svc) - if err != nil { - continue - } - var podList *v1.PodList - podList, err = clientset.CoreV1().Pods(ns).List(ctx, v2.ListOptions{LabelSelector: selector.String()}) - // if pod is not empty, using pods to find top controller - if err == nil && podList != nil && len(podList.Items) != 0 { - var ownerReference *resource.Info - ownerReference, err = GetTopOwnerReference(f, ns, fmt.Sprintf("%s/%s", "pods", podList.Items[0].Name)) - if err == nil { - workloads[i] = fmt.Sprintf("%s/%s", ownerReference.Mapping.Resource.GroupResource().String(), ownerReference.Name) - } - } else { // if list is empty, means not create pods, just controllers - var controller sets.Set[string] - controller, err = GetTopOwnerReferenceBySelector(f, ns, selector.String()) - if err == nil { - if len(controller) > 0 { - workloads[i] = controller.UnsortedList()[0] - } - } - // only a single service, not support it yet - if controller == nil || controller.Len() == 0 { - return nil, fmt.Errorf("not support resources: %s", workload) - } - } - } - return workloads, nil + return resources, objectList, nil +} + +func GetTopOwnerObject(ctx context.Context, f util.Factory, ns string, workload string) (object, controller *resource.Info, err error) { + // normal workload, like pod with controller, deployments, statefulset, replicaset etc... + object, controller, err = GetTopOwnerReference(f, ns, workload) + if err != nil { + return nil, nil, err + } + if !IsK8sService(object) { + return object, controller, nil + } + + clientset, err := f.KubernetesClientSet() + if err != nil { + return nil, nil, err + } + var svc *v1.Service + svc, err = clientset.CoreV1().Services(ns).Get(ctx, object.Name, v2.GetOptions{}) + if err != nil { + return nil, nil, err + } + var selector labels.Selector + _, selector, err = polymorphichelpers.SelectorsForObject(svc) + if err != nil { + return nil, nil, err + } + var podList *v1.PodList + podList, err = clientset.CoreV1().Pods(ns).List(ctx, v2.ListOptions{LabelSelector: selector.String()}) + if err != nil { + return nil, nil, err + } + // if pod is not empty, using pods to find top controller + if len(podList.Items) != 0 { + _, controller, err = GetTopOwnerReference(f, ns, fmt.Sprintf("%s/%s", "pods", podList.Items[0].Name)) + return object, controller, err + } + // if list is empty, means not create pods, just controllers + _, controller, err = GetTopOwnerReferenceBySelector(f, ns, selector.String()) + return object, controller, err +} + +func IsK8sService(info *resource.Info) bool { + return info.Mapping.Resource.Resource == "services" }