From 8758f910c88909900c3ae118ffdc699cf99c0d60 Mon Sep 17 00:00:00 2001 From: wencaiwulue <895703375@qq.com> Date: Sun, 5 Sep 2021 17:12:11 +0800 Subject: [PATCH] support statefulset deployment replicaset, but needs to restore right scale --- pkg/main.go | 2 +- remote/cleaner.go | 13 +---- remote/remote.go | 34 +------------ remote/remote_test.go | 44 ++++++++++++++++- util/util.go | 108 ++++++++++++++++++++++++++++++++++++++---- 5 files changed, 145 insertions(+), 56 deletions(-) diff --git a/pkg/main.go b/pkg/main.go index d67a36e8..88ce1ef2 100644 --- a/pkg/main.go +++ b/pkg/main.go @@ -58,7 +58,7 @@ func init() { if clientset, err = kubernetes.NewForConfig(config); err != nil { log.Fatal(err) } - if namespace == "" { + if len(namespace) == 0 { if namespace, _, err = factory.ToRawKubeConfigLoader().Namespace(); err != nil { log.Fatal(err) } diff --git a/remote/cleaner.go b/remote/cleaner.go index 13e65c49..c6c87ef5 100644 --- a/remote/cleaner.go +++ b/remote/cleaner.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" log "github.com/sirupsen/logrus" - "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" @@ -40,7 +39,7 @@ func AddCleanUpResourceHandler(client *kubernetes.Clientset, namespace string, s defer wg.Done() util.ScaleDeploymentReplicasTo(client, namespace, finalService, 1) newName := finalService + "-" + "shadow" - deletePod(client, namespace, newName) + util.DeletePod(client, namespace, newName) }(service) } } @@ -50,16 +49,6 @@ func AddCleanUpResourceHandler(client *kubernetes.Clientset, namespace string, s }() } -func deletePod(client *kubernetes.Clientset, namespace, podName string) { - zero := int64(0) - err := client.CoreV1().Pods(namespace).Delete(context.TODO(), podName, v1.DeleteOptions{ - GracePeriodSeconds: &zero, - }) - if err != nil && errors.IsNotFound(err) { - log.Infof("not found shadow pod: %s, no need to delete it", podName) - } -} - // vendor/k8s.io/kubectl/pkg/polymorphichelpers/rollback.go:99 func updateRefCount(client *kubernetes.Clientset, namespace, name string, increment int) { if err := retry.OnError(retry.DefaultRetry, func(err error) bool { diff --git a/remote/remote.go b/remote/remote.go index efae30a2..9d9bd4d7 100644 --- a/remote/remote.go +++ b/remote/remote.go @@ -199,42 +199,10 @@ func updateReplicasToZeroAndGetLabels(clientset *kubernetes.Clientset, namespace } log.Info("prepare to expose local service to remote service: " + service) util.ScaleDeploymentReplicasTo(clientset, namespace, service, 0) - labels, ports := getLabels(clientset, namespace, service) + labels, ports := util.GetLabels(clientset, namespace, service) if labels == nil { log.Info("fail to create shadow") return nil, nil } return labels, ports } -func getLabels(clientset *kubernetes.Clientset, namespace, service string) (map[string]string, []v1.ContainerPort) { - get, err := clientset.CoreV1().Services(namespace). - Get(context.TODO(), service, metav1.GetOptions{}) - if err != nil { - log.Error(err) - return nil, nil - } - selector := get.Spec.Selector - _, err = clientset.AppsV1().Deployments(namespace).Get(context.TODO(), service, metav1.GetOptions{}) - if err != nil { - log.Error(err) - return nil, nil - } - newName := service + "-" + "shadow" - deletePod(clientset, namespace, newName) - var ports []v1.ContainerPort - for _, port := range get.Spec.Ports { - val := port.TargetPort.IntVal - if val == 0 { - //if strings.ToLower(port.TargetPort.StrVal) == "http" { - // val = 8080 - //} - val = port.Port - } - ports = append(ports, v1.ContainerPort{ - Name: port.Name, - ContainerPort: val, - Protocol: port.Protocol, - }) - } - return selector, ports -} diff --git a/remote/remote_test.go b/remote/remote_test.go index 28449b12..42d528ce 100644 --- a/remote/remote_test.go +++ b/remote/remote_test.go @@ -1,13 +1,19 @@ package remote import ( + "context" + "encoding/json" "fmt" log "github.com/sirupsen/logrus" + v1 "k8s.io/api/apps/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/util/homedir" "net" "path/filepath" + "strings" "testing" "time" ) @@ -87,5 +93,41 @@ func TestGetIPFromDHCP(t *testing.T) { } time.Sleep(time.Millisecond * 10) } - +} + +func TestOwnerRef(t *testing.T) { + clientConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig( + &clientcmd.ClientConfigLoadingRules{ExplicitPath: clientcmd.RecommendedHomeFile}, nil, + ) + config, _ := clientConfig.ClientConfig() + clientset, _ := kubernetes.NewForConfig(config) + //get, _ := clientset.CoreV1().Pods("test").Get(context.Background(), "tomcat-7449544d95-nv7gr", metav1.GetOptions{}) + get, _ := clientset.CoreV1().Pods("test").Get(context.Background(), "mysql-0", metav1.GetOptions{}) + + of := metav1.GetControllerOf(get) + for of != nil { + b, err := clientset.AppsV1().RESTClient().Get().Namespace("test"). + Name(of.Name).Resource(strings.ToLower(of.Kind) + "s").Do(context.Background()).Raw() + if k8serrors.IsNotFound(err) { + return + } + var replicaSet v1.ReplicaSet + if err = json.Unmarshal(b, &replicaSet); err == nil && len(replicaSet.Name) != 0 { + fmt.Printf("%s-%s\n", replicaSet.Kind, replicaSet.Name) + of = metav1.GetControllerOfNoCopy(&replicaSet) + continue + } + var statefulSet v1.StatefulSet + if err = json.Unmarshal(b, &statefulSet); err == nil && len(statefulSet.Name) != 0 { + fmt.Printf("%s-%s\n", statefulSet.Kind, statefulSet.Name) + of = metav1.GetControllerOfNoCopy(&statefulSet) + continue + } + var deployment v1.Deployment + if err = json.Unmarshal(b, &deployment); err == nil && len(deployment.Name) != 0 { + fmt.Printf("%s-%s\n", deployment.Kind, deployment.Name) + of = metav1.GetControllerOfNoCopy(&deployment) + continue + } + } } diff --git a/util/util.go b/util/util.go index ca7d6888..d7b529ff 100644 --- a/util/util.go +++ b/util/util.go @@ -3,12 +3,15 @@ package util import ( "bytes" "context" + "encoding/json" "fmt" dockerterm "github.com/moby/term" log "github.com/sirupsen/logrus" "io" + appsv1 "k8s.io/api/apps/v1" v12 "k8s.io/api/autoscaling/v1" "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/watch" @@ -24,6 +27,7 @@ import ( "k8s.io/client-go/transport/spdy" "k8s.io/client-go/util/retry" "k8s.io/kubectl/pkg/cmd/exec" + //"kubevpn/remote" "net" "net/http" "os" @@ -137,15 +141,67 @@ func PortForwardPod(config *rest.Config, clientset *kubernetes.Clientset, podNam return nil } -func ScaleDeploymentReplicasTo(options *kubernetes.Clientset, namespace, name string, replicas int32) { +func GetTopController(clientset *kubernetes.Clientset, namespace, serviceName string) (resource string, name string) { + labels, _ := GetLabels(clientset, namespace, serviceName) + + // todo verify it's correct or not + asSelector, _ := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: labels}) + get, _ := clientset.CoreV1().Pods(namespace).List(context.Background(), metav1.ListOptions{ + LabelSelector: asSelector.String(), + }) + if len(get.Items) == 0 { + return + } + of := metav1.GetControllerOf(&get.Items[0]) + for of != nil { + b, err := clientset.AppsV1().RESTClient().Get().Namespace("test"). + Name(of.Name).Resource(strings.ToLower(of.Kind) + "s").Do(context.Background()).Raw() + if k8serrors.IsNotFound(err) { + return + } + var replicaSet appsv1.ReplicaSet + if err = json.Unmarshal(b, &replicaSet); err == nil && len(replicaSet.Name) != 0 { + resource = strings.ToLower(replicaSet.Kind) + "s" + name = replicaSet.Name + of = metav1.GetControllerOfNoCopy(&replicaSet) + continue + } + var statefulSet appsv1.StatefulSet + if err = json.Unmarshal(b, &statefulSet); err == nil && len(statefulSet.Name) != 0 { + resource = strings.ToLower(statefulSet.Kind) + "s" + name = statefulSet.Name + of = metav1.GetControllerOfNoCopy(&statefulSet) + continue + } + var deployment appsv1.Deployment + if err = json.Unmarshal(b, &deployment); err == nil && len(deployment.Name) != 0 { + resource = strings.ToLower(deployment.Kind) + "s" + name = deployment.Name + of = metav1.GetControllerOfNoCopy(&deployment) + continue + } + } + return +} + +// todo restore scale if replicaset is zero, needs to remember top controller type and name +func ScaleDeploymentReplicasTo(clientset *kubernetes.Clientset, namespace, serviceName string, replicas int32) { + controller, name := GetTopController(clientset, namespace, serviceName) + if len(controller) == 0 || len(name) == 0 { + log.Warnf("controller is empty, service: %s-%s", namespace, serviceName) + } err := retry.OnError( retry.DefaultRetry, func(err error) bool { return err != nil }, func() error { - _, err := options.AppsV1().Deployments(namespace).UpdateScale( - context.TODO(), - name, - &v12.Scale{ + result := &v12.Scale{} + err := clientset.AppsV1().RESTClient().Put(). + Namespace(namespace). + Resource(controller). + Name(name). + SubResource("scale"). + VersionedParams(&metav1.UpdateOptions{}, scheme.ParameterCodec). + Body(&v12.Scale{ ObjectMeta: metav1.ObjectMeta{ Name: name, Namespace: namespace, @@ -153,13 +209,13 @@ func ScaleDeploymentReplicasTo(options *kubernetes.Clientset, namespace, name st Spec: v12.ScaleSpec{ Replicas: replicas, }, - }, - metav1.UpdateOptions{}, - ) + }). + Do(context.Background()). + Into(result) return err }) if err != nil { - log.Errorf("update deployment: %s's replicas to %d failed, error: %v", name, replicas, err) + log.Errorf("update deployment: %s's replicas to %d failed, error: %v", serviceName, replicas, err) } } @@ -222,3 +278,37 @@ func Shell(clientset *kubernetes.Clientset, restclient *rest.RESTClient, config func IsWindows() bool { return runtime.GOOS == "windows" } +func GetLabels(clientset *kubernetes.Clientset, namespace, service string) (map[string]string, []v1.ContainerPort) { + get, err := clientset.CoreV1().Services(namespace). + Get(context.TODO(), service, metav1.GetOptions{}) + if err != nil { + log.Error(err) + return nil, nil + } + selector := get.Spec.Selector + newName := service + "-" + "shadow" + DeletePod(clientset, namespace, newName) + var ports []v1.ContainerPort + for _, port := range get.Spec.Ports { + val := port.TargetPort.IntVal + if val == 0 { + val = port.Port + } + ports = append(ports, v1.ContainerPort{ + Name: port.Name, + ContainerPort: val, + Protocol: port.Protocol, + }) + } + return selector, ports +} + +func DeletePod(client *kubernetes.Clientset, namespace, podName string) { + zero := int64(0) + err := client.CoreV1().Pods(namespace).Delete(context.TODO(), podName, metav1.DeleteOptions{ + GracePeriodSeconds: &zero, + }) + if err != nil && k8serrors.IsNotFound(err) { + log.Infof("not found shadow pod: %s, no need to delete it", podName) + } +}