diff --git a/pkg/connect.go b/pkg/connect.go index d37eaac3..840eeec5 100644 --- a/pkg/connect.go +++ b/pkg/connect.go @@ -64,7 +64,7 @@ func (c *ConnectOptions) createRemotePod() { if err != nil { log.Fatal(err) } - pod, err := remote.CreateServerOutbound(c.clientset, c.Namespace, &trafficManager, k8sCIDRs) + pod, err := CreateServerOutbound(c.clientset, c.Namespace, &trafficManager, k8sCIDRs) if err != nil { log.Fatal(err) } @@ -94,7 +94,7 @@ func (c *ConnectOptions) createRemotePod() { strings.Join(list, ","), ) } else { - err = remote.CreateServerInbound( + err = CreateServerInbound( c.factory, c.clientset, c.Namespace, diff --git a/pkg/controller.go b/pkg/controller.go new file mode 100644 index 00000000..fbf0c4df --- /dev/null +++ b/pkg/controller.go @@ -0,0 +1,10 @@ +package pkg + +import ( + v1 "k8s.io/api/core/v1" +) + +type Scalable interface { + ScaleToZero() (map[string]string, []v1.ContainerPort, error) + Cancel() error +} diff --git a/pkg/deploymentcontroller.go b/pkg/deploymentcontroller.go new file mode 100644 index 00000000..b81dab13 --- /dev/null +++ b/pkg/deploymentcontroller.go @@ -0,0 +1,72 @@ +package pkg + +import ( + "context" + autoscalingv1 "k8s.io/api/autoscaling/v1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + cmdutil "k8s.io/kubectl/pkg/cmd/util" +) + +type DeploymentController struct { + factory cmdutil.Factory + clientset *kubernetes.Clientset + namespace string + name string + f func() error +} + +func NewDeploymentController(factory cmdutil.Factory, clientset *kubernetes.Clientset, namespace, name string) *DeploymentController { + return &DeploymentController{ + factory: factory, + clientset: clientset, + namespace: namespace, + name: name, + } +} + +func (deployment *DeploymentController) ScaleToZero() (map[string]string, []v1.ContainerPort, error) { + scale, err2 := deployment.clientset.AppsV1().Deployments(deployment.namespace).GetScale(context.TODO(), deployment.name, metav1.GetOptions{}) + if err2 != nil { + return nil, nil, err2 + } + deployment.f = func() error { + _, err := deployment.clientset.AppsV1().Deployments(deployment.namespace).UpdateScale( + context.TODO(), + deployment.name, + &autoscalingv1.Scale{ + ObjectMeta: metav1.ObjectMeta{Name: deployment.name, Namespace: deployment.namespace}, + Spec: autoscalingv1.ScaleSpec{Replicas: scale.Spec.Replicas}, + }, + metav1.UpdateOptions{}, + ) + return err + } + _, err := deployment.clientset.AppsV1().Deployments(deployment.namespace).UpdateScale( + context.TODO(), + deployment.name, + &autoscalingv1.Scale{ + ObjectMeta: metav1.ObjectMeta{ + Name: deployment.name, + Namespace: deployment.namespace, + }, + Spec: autoscalingv1.ScaleSpec{ + Replicas: int32(0), + }, + }, + metav1.UpdateOptions{}, + ) + if err != nil { + return nil, nil, err + } + get, err := deployment.clientset.AppsV1().Deployments(deployment.namespace).Get(context.TODO(), deployment.name, metav1.GetOptions{}) + if err != nil { + return nil, nil, err + } + return get.Spec.Template.GetLabels(), get.Spec.Template.Spec.Containers[0].Ports, nil +} + +func (deployment *DeploymentController) Cancel() error { + return deployment.f() +} diff --git a/pkg/podcontroller.go b/pkg/podcontroller.go new file mode 100644 index 00000000..7bd62cbf --- /dev/null +++ b/pkg/podcontroller.go @@ -0,0 +1,64 @@ +package pkg + +import ( + "context" + "fmt" + log "github.com/sirupsen/logrus" + "github.com/wencaiwulue/kubevpn/util" + v1 "k8s.io/api/core/v1" + v12 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/cli-runtime/pkg/resource" + "k8s.io/client-go/kubernetes" + cmdutil "k8s.io/kubectl/pkg/cmd/util" +) + +type PodController struct { + factory cmdutil.Factory + clientset *kubernetes.Clientset + namespace string + name string + f func() error +} + +func NewPodController(factory cmdutil.Factory, clientset *kubernetes.Clientset, namespace, name string) *PodController { + return &PodController{ + factory: factory, + clientset: clientset, + namespace: namespace, + name: name, + } +} + +func (pod *PodController) ScaleToZero() (map[string]string, []v1.ContainerPort, error) { + topController := util.GetTopController(pod.factory, pod.clientset, pod.namespace, fmt.Sprintf("pods/%s", pod.name)) + // controllerBy is empty + if len(topController.Name) == 0 || len(topController.Resource) == 0 { + get, err := pod.clientset.CoreV1().Pods(pod.namespace).Get(context.TODO(), pod.name, v12.GetOptions{}) + if err != nil { + return nil, nil, err + } + pod.f = func() error { + _, err = pod.clientset.CoreV1().Pods(pod.namespace).Create(context.TODO(), get, v12.CreateOptions{}) + if err != nil { + log.Warnln(err) + } + return err + } + _ = pod.clientset.CoreV1().Pods(pod.namespace).Delete(context.TODO(), pod.name, v12.DeleteOptions{}) + return get.GetLabels(), get.Spec.Containers[0].Ports, nil + } + object, err := util.GetUnstructuredObject(pod.factory, pod.namespace, fmt.Sprintf("%s/%s", topController.Resource, topController.Name)) + helper := resource.NewHelper(object.Client, object.Mapping) + pod.f = func() error { + _, err = helper.Create(pod.namespace, true, object.Object) + return err + } + if _, err = helper.Delete(pod.namespace, object.Name); err != nil { + return nil, nil, err + } + return util.GetLabelSelector(object.Object).MatchLabels, util.GetPorts(object.Object), err +} + +func (pod *PodController) Cancel() error { + return pod.f() +} diff --git a/remote/remote.go b/pkg/remote.go similarity index 83% rename from remote/remote.go rename to pkg/remote.go index 00c212d9..105ca1f0 100644 --- a/remote/remote.go +++ b/pkg/remote.go @@ -1,9 +1,10 @@ -package remote +package pkg import ( "context" "errors" log "github.com/sirupsen/logrus" + "github.com/wencaiwulue/kubevpn/remote" "github.com/wencaiwulue/kubevpn/util" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" @@ -30,7 +31,7 @@ func CreateServerOutbound(clientset *kubernetes.Clientset, namespace string, ser ) if err3 == nil && i != 0 && firstPod != nil { - updateRefCount(clientset, namespace, firstPod.Name, 1) + remote.UpdateRefCount(clientset, namespace, firstPod.Name, 1) return firstPod, nil } args := []string{ @@ -119,17 +120,38 @@ func CreateServerInbound(factory cmdutil.Factory, clientset *kubernetes.Clientse } newName := resourceTuple.Name + "-" + "shadow" util.DeletePod(clientset, namespace, newName) - err := updateScaleToZero(factory, clientset, namespace, workloads) - object, err2 := util.GetUnstructuredObject(factory, namespace, workloads) - lables := util.GetLabelSelector(object) - ports := util.GetPorts(object) + //err := updateScaleToZero(factory, clientset, namespace, workloads) + //object, err2 := util.GetUnstructuredObject(factory, namespace, workloads) + //labels := util.GetLabelSelector(object.Object) + //ports := util.GetPorts(object.Object) + var sc Scalable + switch strings.ToLower(resourceTuple.Resource) { + case "deployment", "deployments": + sc = NewDeploymentController(factory, clientset, namespace, resourceTuple.Name) + case "statefulset", "statefulsets": + sc = NewStatefulsetController(factory, clientset, namespace, resourceTuple.Name) + case "replicas": + sc = NewReplicasController(factory, clientset, namespace, resourceTuple.Name) + case "service", "services": + sc = NewServiceController(factory, clientset, namespace, resourceTuple.Name) + case "pod", "pods": + sc = NewPodController(factory, clientset, namespace, resourceTuple.Name) + default: + sc = NewPodController(factory, clientset, namespace, resourceTuple.Name) + } + remote.CancelFunctions = append(remote.CancelFunctions, func() { + if err := sc.Cancel(); err != nil { + log.Warnln(err) + } + }) + labels, ports, err2 := sc.ScaleToZero() t := true zero := int64(0) pod := v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: newName, Namespace: namespace, - Labels: lables.MatchLabels, + Labels: labels, }, Spec: v1.PodSpec{ Containers: []v1.Container{ @@ -177,7 +199,7 @@ func CreateServerInbound(factory cmdutil.Factory, clientset *kubernetes.Clientse PriorityClassName: "system-cluster-critical", }, } - if _, err = clientset.CoreV1().Pods(namespace).Create(context.TODO(), &pod, metav1.CreateOptions{}); err != nil { + if _, err := clientset.CoreV1().Pods(namespace).Create(context.TODO(), &pod, metav1.CreateOptions{}); err != nil { log.Fatal(err) } watch, err := clientset.CoreV1().Pods(namespace).Watch(context.TODO(), metav1.SingleObject(metav1.ObjectMeta{Name: newName})) diff --git a/pkg/replicassetcontroller.go b/pkg/replicassetcontroller.go new file mode 100644 index 00000000..b8cf9c12 --- /dev/null +++ b/pkg/replicassetcontroller.go @@ -0,0 +1,64 @@ +package pkg + +import ( + "context" + autoscalingv1 "k8s.io/api/autoscaling/v1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + cmdutil "k8s.io/kubectl/pkg/cmd/util" +) + +type ReplicasController struct { + factory cmdutil.Factory + clientset *kubernetes.Clientset + namespace string + name string + f func() error +} + +func NewReplicasController(factory cmdutil.Factory, clientset *kubernetes.Clientset, namespace, name string) *ReplicasController { + return &ReplicasController{ + factory: factory, + clientset: clientset, + namespace: namespace, + name: name, + } +} + +func (replicas *ReplicasController) ScaleToZero() (map[string]string, []v1.ContainerPort, error) { + updateScale, err2 := replicas.clientset.AppsV1().ReplicaSets(replicas.namespace).Get(context.TODO(), replicas.name, metav1.GetOptions{}) + if err2 != nil { + return nil, nil, err2 + } + _, err := replicas.clientset.AppsV1().ReplicaSets(replicas.namespace).UpdateScale(context.TODO(), replicas.name, &autoscalingv1.Scale{ + ObjectMeta: metav1.ObjectMeta{ + Name: replicas.name, + Namespace: replicas.namespace, + }, + Spec: autoscalingv1.ScaleSpec{ + Replicas: int32(0), + }, + }, metav1.UpdateOptions{}) + if err != nil { + return nil, nil, err + } + replicas.f = func() error { + _, err = replicas.clientset.AppsV1().ReplicaSets(replicas.namespace). + UpdateScale(context.TODO(), replicas.name, &autoscalingv1.Scale{ + ObjectMeta: metav1.ObjectMeta{ + Name: replicas.name, + Namespace: replicas.namespace, + }, + Spec: autoscalingv1.ScaleSpec{ + Replicas: *updateScale.Spec.Replicas, + }, + }, metav1.UpdateOptions{}) + return err + } + return updateScale.Spec.Template.Labels, updateScale.Spec.Template.Spec.Containers[0].Ports, nil +} + +func (replicas *ReplicasController) Cancel() error { + return replicas.f() +} diff --git a/pkg/servicecontroller.go b/pkg/servicecontroller.go new file mode 100644 index 00000000..328427a1 --- /dev/null +++ b/pkg/servicecontroller.go @@ -0,0 +1,68 @@ +package pkg + +import ( + "context" + "fmt" + "github.com/wencaiwulue/kubevpn/util" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + cmdutil "k8s.io/kubectl/pkg/cmd/util" +) + +type ServiceController struct { + factory cmdutil.Factory + clientset *kubernetes.Clientset + namespace string + name string + f func() error +} + +func NewServiceController(factory cmdutil.Factory, clientset *kubernetes.Clientset, namespace, name string) *ServiceController { + return &ServiceController{ + factory: factory, + clientset: clientset, + namespace: namespace, + name: name, + } +} + +func (s *ServiceController) ScaleToZero() (map[string]string, []v1.ContainerPort, error) { + get, err := s.clientset.CoreV1().Services(s.namespace).Get(context.TODO(), s.name, metav1.GetOptions{}) + if err != nil { + return nil, nil, err + } + + object, err := util.GetUnstructuredObject(s.factory, s.namespace, fmt.Sprintf("services/%s", s.name)) + if err != nil { + return nil, nil, err + } + asSelector, _ := metav1.LabelSelectorAsSelector(util.GetLabelSelector(object.Object)) + podList, _ := s.clientset.CoreV1().Pods(s.namespace).List(context.Background(), metav1.ListOptions{ + LabelSelector: asSelector.String(), + }) + if len(podList.Items) == 0 { + var ports []v1.ContainerPort + for _, port := range get.Spec.Ports { + ports = append(ports, v1.ContainerPort{ + Name: port.Name, + ContainerPort: port.Port, + Protocol: port.Protocol, + }) + } + return get.Spec.Selector, ports, nil + } + podController := PodController{ + factory: s.factory, + clientset: s.clientset, + namespace: s.namespace, + name: podList.Items[0].Name, // if podList is not one, needs to merge ??? + } + zero, ports, err := podController.ScaleToZero() + s.f = podController.f + return zero, ports, err +} + +func (s *ServiceController) Cancel() error { + return s.f() +} diff --git a/pkg/statefulsetcontroller.go b/pkg/statefulsetcontroller.go new file mode 100644 index 00000000..028f6747 --- /dev/null +++ b/pkg/statefulsetcontroller.go @@ -0,0 +1,67 @@ +package pkg + +import ( + "context" + autoscalingv1 "k8s.io/api/autoscaling/v1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + cmdutil "k8s.io/kubectl/pkg/cmd/util" +) + +type StatefulsetController struct { + factory cmdutil.Factory + clientset *kubernetes.Clientset + namespace string + name string + f func() error +} + +func NewStatefulsetController(factory cmdutil.Factory, clientset *kubernetes.Clientset, namespace, name string) *StatefulsetController { + return &StatefulsetController{ + factory: factory, + clientset: clientset, + namespace: namespace, + name: name, + } +} + +func (s *StatefulsetController) ScaleToZero() (map[string]string, []v1.ContainerPort, error) { + scale, err := s.clientset.AppsV1().StatefulSets(s.namespace).Get(context.TODO(), s.name, metav1.GetOptions{}) + if err != nil { + return nil, nil, err + } + s.f = func() error { + _, err = s.clientset.AppsV1(). + StatefulSets(s.namespace). + UpdateScale(context.TODO(), s.name, &autoscalingv1.Scale{ + ObjectMeta: metav1.ObjectMeta{ + Name: s.name, + Namespace: s.namespace, + }, + Spec: autoscalingv1.ScaleSpec{ + Replicas: *scale.Spec.Replicas, + }, + }, metav1.UpdateOptions{}) + return err + } + _, err = s.clientset.AppsV1(). + StatefulSets(s.namespace). + UpdateScale(context.TODO(), s.name, &autoscalingv1.Scale{ + ObjectMeta: metav1.ObjectMeta{ + Name: s.name, + Namespace: s.namespace, + }, + Spec: autoscalingv1.ScaleSpec{ + Replicas: 0, + }, + }, metav1.UpdateOptions{}) + if err != nil { + return nil, nil, err + } + return scale.Spec.Template.Labels, scale.Spec.Template.Spec.Containers[0].Ports, nil +} + +func (s *StatefulsetController) Cancel() error { + return s.f() +} diff --git a/remote/cleaner.go b/remote/cleaner.go index 96a3bda9..783b0013 100644 --- a/remote/cleaner.go +++ b/remote/cleaner.go @@ -50,15 +50,15 @@ func AddCleanUpResourceHandler(clientset *kubernetes.Clientset, namespace string } } wg.Wait() - wg = sync.WaitGroup{} - for _, controller := range util.TopLevelControllerSet { - wg.Add(1) - go func(control util.ResourceTupleWithScale) { - util.UpdateReplicasScale(clientset, namespace, control) - wg.Done() - }(controller) - } - wg.Wait() + //wg = sync.WaitGroup{} + //for _, controller := range util.TopLevelControllerSet { + // wg.Add(1) + // go func(control util.ResourceTupleWithScale) { + // util.UpdateReplicasScale(clientset, namespace, control) + // wg.Done() + // }(controller) + //} + //wg.Wait() log.Info("clean up successful") for _, function := range CancelFunctions { if function != nil { @@ -69,7 +69,7 @@ func AddCleanUpResourceHandler(clientset *kubernetes.Clientset, namespace string } // vendor/k8s.io/kubectl/pkg/polymorphichelpers/rollback.go:99 -func updateRefCount(clientset *kubernetes.Clientset, namespace, name string, increment int) { +func UpdateRefCount(clientset *kubernetes.Clientset, namespace, name string, increment int) { if err := retry.OnError(retry.DefaultRetry, func(err error) bool { return err != nil }, func() error { @@ -100,7 +100,7 @@ func updateRefCount(clientset *kubernetes.Clientset, namespace, name string, inc } func cleanUpTrafficManagerIfRefCountIsZero(clientset *kubernetes.Clientset, namespace string) { - updateRefCount(clientset, namespace, util.TrafficManager, -1) + UpdateRefCount(clientset, namespace, util.TrafficManager, -1) pod, err := clientset.CoreV1().Pods(namespace).Get(context.TODO(), util.TrafficManager, v1.GetOptions{}) if err != nil { log.Error(err) diff --git a/remote/envoy.go b/remote/envoy.go index 9226b298..0b8a21a5 100644 --- a/remote/envoy.go +++ b/remote/envoy.go @@ -186,7 +186,7 @@ func createEnvoyConfigMapIfNeeded(factory cmdutil.Factory, clientset *kubernetes if err != nil { return } - asSelector, _ := metav1.LabelSelectorAsSelector(util.GetLabelSelector(object)) + asSelector, _ := metav1.LabelSelectorAsSelector(util.GetLabelSelector(object.Object)) serviceList, _ := clientset.CoreV1().Services(namespace).List(context.Background(), metav1.ListOptions{ LabelSelector: asSelector.String(), }) diff --git a/remote/remote_test.go b/remote/remote_test.go index 0f9164bd..8395a84a 100644 --- a/remote/remote_test.go +++ b/remote/remote_test.go @@ -6,6 +6,7 @@ import ( "encoding/json" "fmt" log "github.com/sirupsen/logrus" + "github.com/wencaiwulue/kubevpn/pkg" "github.com/wencaiwulue/kubevpn/util" v1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" @@ -47,7 +48,7 @@ func TestCreateServer(t *testing.T) { Mask: net.IPv4Mask(255, 255, 0, 0), } - server, err := CreateServerOutbound(clientset, "test", i, []*net.IPNet{j}) + server, err := pkg.CreateServerOutbound(clientset, "test", i, []*net.IPNet{j}) fmt.Println(server) } diff --git a/util/util.go b/util/util.go index b8d1227e..c63a6943 100644 --- a/util/util.go +++ b/util/util.go @@ -164,7 +164,7 @@ func GetTopController(factory cmdutil.Factory, clientset *kubernetes.Clientset, if err != nil { return } - asSelector, _ := metav1.LabelSelectorAsSelector(GetLabelSelector(object)) + asSelector, _ := metav1.LabelSelectorAsSelector(GetLabelSelector(object.Object)) podList, _ := clientset.CoreV1().Pods(namespace).List(context.Background(), metav1.ListOptions{ LabelSelector: asSelector.String(), }) @@ -179,8 +179,8 @@ func GetTopController(factory cmdutil.Factory, clientset *kubernetes.Clientset, } controller.Resource = strings.ToLower(of.Kind) + "s" controller.Name = of.Name - controller.Scale = GetScale(object) - of = GetOwnerReferences(object) + controller.Scale = GetScale(object.Object) + of = GetOwnerReferences(object.Object) } return } @@ -275,7 +275,7 @@ func IsWindows() bool { return runtime.GOOS == "windows" } -func GetUnstructuredObject(f cmdutil.Factory, namespace string, workloads string) (k8sruntime.Object, error) { +func GetUnstructuredObject(f cmdutil.Factory, namespace string, workloads string) (*runtimeresource.Info, error) { do := f.NewBuilder(). Unstructured(). NamespaceParam(namespace).DefaultNamespace().AllNamespaces(false). @@ -297,7 +297,7 @@ func GetUnstructuredObject(f cmdutil.Factory, namespace string, workloads string if len(infos) == 0 { return nil, errors.New("Not found") } - return infos[0].Object, err + return infos[0], err } func GetLabelSelector(object k8sruntime.Object) *metav1.LabelSelector {