support statefulset deployment replicaset, but needs to restore right scale

This commit is contained in:
wencaiwulue
2021-09-05 17:12:11 +08:00
parent fd2db7dca8
commit 8758f910c8
5 changed files with 145 additions and 56 deletions

View File

@@ -58,7 +58,7 @@ func init() {
if clientset, err = kubernetes.NewForConfig(config); err != nil {
log.Fatal(err)
}
if namespace == "" {
if len(namespace) == 0 {
if namespace, _, err = factory.ToRawKubeConfigLoader().Namespace(); err != nil {
log.Fatal(err)
}

View File

@@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
log "github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
@@ -40,7 +39,7 @@ func AddCleanUpResourceHandler(client *kubernetes.Clientset, namespace string, s
defer wg.Done()
util.ScaleDeploymentReplicasTo(client, namespace, finalService, 1)
newName := finalService + "-" + "shadow"
deletePod(client, namespace, newName)
util.DeletePod(client, namespace, newName)
}(service)
}
}
@@ -50,16 +49,6 @@ func AddCleanUpResourceHandler(client *kubernetes.Clientset, namespace string, s
}()
}
func deletePod(client *kubernetes.Clientset, namespace, podName string) {
zero := int64(0)
err := client.CoreV1().Pods(namespace).Delete(context.TODO(), podName, v1.DeleteOptions{
GracePeriodSeconds: &zero,
})
if err != nil && errors.IsNotFound(err) {
log.Infof("not found shadow pod: %s, no need to delete it", podName)
}
}
// vendor/k8s.io/kubectl/pkg/polymorphichelpers/rollback.go:99
func updateRefCount(client *kubernetes.Clientset, namespace, name string, increment int) {
if err := retry.OnError(retry.DefaultRetry, func(err error) bool {

View File

@@ -199,42 +199,10 @@ func updateReplicasToZeroAndGetLabels(clientset *kubernetes.Clientset, namespace
}
log.Info("prepare to expose local service to remote service: " + service)
util.ScaleDeploymentReplicasTo(clientset, namespace, service, 0)
labels, ports := getLabels(clientset, namespace, service)
labels, ports := util.GetLabels(clientset, namespace, service)
if labels == nil {
log.Info("fail to create shadow")
return nil, nil
}
return labels, ports
}
func getLabels(clientset *kubernetes.Clientset, namespace, service string) (map[string]string, []v1.ContainerPort) {
get, err := clientset.CoreV1().Services(namespace).
Get(context.TODO(), service, metav1.GetOptions{})
if err != nil {
log.Error(err)
return nil, nil
}
selector := get.Spec.Selector
_, err = clientset.AppsV1().Deployments(namespace).Get(context.TODO(), service, metav1.GetOptions{})
if err != nil {
log.Error(err)
return nil, nil
}
newName := service + "-" + "shadow"
deletePod(clientset, namespace, newName)
var ports []v1.ContainerPort
for _, port := range get.Spec.Ports {
val := port.TargetPort.IntVal
if val == 0 {
//if strings.ToLower(port.TargetPort.StrVal) == "http" {
// val = 8080
//}
val = port.Port
}
ports = append(ports, v1.ContainerPort{
Name: port.Name,
ContainerPort: val,
Protocol: port.Protocol,
})
}
return selector, ports
}

View File

@@ -1,13 +1,19 @@
package remote
import (
"context"
"encoding/json"
"fmt"
log "github.com/sirupsen/logrus"
v1 "k8s.io/api/apps/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
"net"
"path/filepath"
"strings"
"testing"
"time"
)
@@ -87,5 +93,41 @@ func TestGetIPFromDHCP(t *testing.T) {
}
time.Sleep(time.Millisecond * 10)
}
}
func TestOwnerRef(t *testing.T) {
clientConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
&clientcmd.ClientConfigLoadingRules{ExplicitPath: clientcmd.RecommendedHomeFile}, nil,
)
config, _ := clientConfig.ClientConfig()
clientset, _ := kubernetes.NewForConfig(config)
//get, _ := clientset.CoreV1().Pods("test").Get(context.Background(), "tomcat-7449544d95-nv7gr", metav1.GetOptions{})
get, _ := clientset.CoreV1().Pods("test").Get(context.Background(), "mysql-0", metav1.GetOptions{})
of := metav1.GetControllerOf(get)
for of != nil {
b, err := clientset.AppsV1().RESTClient().Get().Namespace("test").
Name(of.Name).Resource(strings.ToLower(of.Kind) + "s").Do(context.Background()).Raw()
if k8serrors.IsNotFound(err) {
return
}
var replicaSet v1.ReplicaSet
if err = json.Unmarshal(b, &replicaSet); err == nil && len(replicaSet.Name) != 0 {
fmt.Printf("%s-%s\n", replicaSet.Kind, replicaSet.Name)
of = metav1.GetControllerOfNoCopy(&replicaSet)
continue
}
var statefulSet v1.StatefulSet
if err = json.Unmarshal(b, &statefulSet); err == nil && len(statefulSet.Name) != 0 {
fmt.Printf("%s-%s\n", statefulSet.Kind, statefulSet.Name)
of = metav1.GetControllerOfNoCopy(&statefulSet)
continue
}
var deployment v1.Deployment
if err = json.Unmarshal(b, &deployment); err == nil && len(deployment.Name) != 0 {
fmt.Printf("%s-%s\n", deployment.Kind, deployment.Name)
of = metav1.GetControllerOfNoCopy(&deployment)
continue
}
}
}

View File

@@ -3,12 +3,15 @@ package util
import (
"bytes"
"context"
"encoding/json"
"fmt"
dockerterm "github.com/moby/term"
log "github.com/sirupsen/logrus"
"io"
appsv1 "k8s.io/api/apps/v1"
v12 "k8s.io/api/autoscaling/v1"
"k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
@@ -24,6 +27,7 @@ import (
"k8s.io/client-go/transport/spdy"
"k8s.io/client-go/util/retry"
"k8s.io/kubectl/pkg/cmd/exec"
//"kubevpn/remote"
"net"
"net/http"
"os"
@@ -137,15 +141,67 @@ func PortForwardPod(config *rest.Config, clientset *kubernetes.Clientset, podNam
return nil
}
func ScaleDeploymentReplicasTo(options *kubernetes.Clientset, namespace, name string, replicas int32) {
func GetTopController(clientset *kubernetes.Clientset, namespace, serviceName string) (resource string, name string) {
labels, _ := GetLabels(clientset, namespace, serviceName)
// todo verify it's correct or not
asSelector, _ := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: labels})
get, _ := clientset.CoreV1().Pods(namespace).List(context.Background(), metav1.ListOptions{
LabelSelector: asSelector.String(),
})
if len(get.Items) == 0 {
return
}
of := metav1.GetControllerOf(&get.Items[0])
for of != nil {
b, err := clientset.AppsV1().RESTClient().Get().Namespace("test").
Name(of.Name).Resource(strings.ToLower(of.Kind) + "s").Do(context.Background()).Raw()
if k8serrors.IsNotFound(err) {
return
}
var replicaSet appsv1.ReplicaSet
if err = json.Unmarshal(b, &replicaSet); err == nil && len(replicaSet.Name) != 0 {
resource = strings.ToLower(replicaSet.Kind) + "s"
name = replicaSet.Name
of = metav1.GetControllerOfNoCopy(&replicaSet)
continue
}
var statefulSet appsv1.StatefulSet
if err = json.Unmarshal(b, &statefulSet); err == nil && len(statefulSet.Name) != 0 {
resource = strings.ToLower(statefulSet.Kind) + "s"
name = statefulSet.Name
of = metav1.GetControllerOfNoCopy(&statefulSet)
continue
}
var deployment appsv1.Deployment
if err = json.Unmarshal(b, &deployment); err == nil && len(deployment.Name) != 0 {
resource = strings.ToLower(deployment.Kind) + "s"
name = deployment.Name
of = metav1.GetControllerOfNoCopy(&deployment)
continue
}
}
return
}
// todo restore scale if replicaset is zero, needs to remember top controller type and name
func ScaleDeploymentReplicasTo(clientset *kubernetes.Clientset, namespace, serviceName string, replicas int32) {
controller, name := GetTopController(clientset, namespace, serviceName)
if len(controller) == 0 || len(name) == 0 {
log.Warnf("controller is empty, service: %s-%s", namespace, serviceName)
}
err := retry.OnError(
retry.DefaultRetry,
func(err error) bool { return err != nil },
func() error {
_, err := options.AppsV1().Deployments(namespace).UpdateScale(
context.TODO(),
name,
&v12.Scale{
result := &v12.Scale{}
err := clientset.AppsV1().RESTClient().Put().
Namespace(namespace).
Resource(controller).
Name(name).
SubResource("scale").
VersionedParams(&metav1.UpdateOptions{}, scheme.ParameterCodec).
Body(&v12.Scale{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
@@ -153,13 +209,13 @@ func ScaleDeploymentReplicasTo(options *kubernetes.Clientset, namespace, name st
Spec: v12.ScaleSpec{
Replicas: replicas,
},
},
metav1.UpdateOptions{},
)
}).
Do(context.Background()).
Into(result)
return err
})
if err != nil {
log.Errorf("update deployment: %s's replicas to %d failed, error: %v", name, replicas, err)
log.Errorf("update deployment: %s's replicas to %d failed, error: %v", serviceName, replicas, err)
}
}
@@ -222,3 +278,37 @@ func Shell(clientset *kubernetes.Clientset, restclient *rest.RESTClient, config
func IsWindows() bool {
return runtime.GOOS == "windows"
}
func GetLabels(clientset *kubernetes.Clientset, namespace, service string) (map[string]string, []v1.ContainerPort) {
get, err := clientset.CoreV1().Services(namespace).
Get(context.TODO(), service, metav1.GetOptions{})
if err != nil {
log.Error(err)
return nil, nil
}
selector := get.Spec.Selector
newName := service + "-" + "shadow"
DeletePod(clientset, namespace, newName)
var ports []v1.ContainerPort
for _, port := range get.Spec.Ports {
val := port.TargetPort.IntVal
if val == 0 {
val = port.Port
}
ports = append(ports, v1.ContainerPort{
Name: port.Name,
ContainerPort: val,
Protocol: port.Protocol,
})
}
return selector, ports
}
func DeletePod(client *kubernetes.Clientset, namespace, podName string) {
zero := int64(0)
err := client.CoreV1().Pods(namespace).Delete(context.TODO(), podName, metav1.DeleteOptions{
GracePeriodSeconds: &zero,
})
if err != nil && k8serrors.IsNotFound(err) {
log.Infof("not found shadow pod: %s, no need to delete it", podName)
}
}