// Source file: kubevpn/util/util.go
// Snapshot: 2021-08-07 20:20:56 +08:00 (237 lines, 7.0 KiB, Go)

package util
import (
"bytes"
"context"
"fmt"
dockerterm "github.com/moby/term"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"io"
v12 "k8s.io/api/autoscaling/v1"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/portforward"
"k8s.io/client-go/tools/remotecommand"
clientgowatch "k8s.io/client-go/tools/watch"
"k8s.io/client-go/transport/spdy"
"k8s.io/client-go/util/retry"
"k8s.io/kubectl/pkg/cmd/exec"
"net"
"net/http"
"os"
"strings"
"time"
)
// WaitResource waits, for at most 10 minutes, until resources of the given
// apiVersion/kind in namespace satisfy checker.
//
// The apiVersion/kind pair is resolved to a REST resource name through API
// discovery on client. A filtered list-watch built on getter then feeds
// clientgowatch.UntilWithSync with two predicates:
//   - the precondition, evaluated against the store, holds only when the
//     store is non-empty and every stored object passes checker;
//   - the per-event condition holds as soon as checker accepts the object
//     carried by a watch event.
//
// The LabelSelector, FieldSelector and Watch fields of list are copied onto
// every list/watch request; other fields of list are not used.
//
// It returns nil once UntilWithSync reports success, and otherwise the
// discovery, mapping, scheme or wait error.
func WaitResource(client *kubernetes.Clientset, getter cache.Getter, namespace, apiVersion, kind string, list metav1.ListOptions, checker func(interface{}) bool) error {
	// Surface discovery failures directly instead of letting them show up
	// later as an unrelated REST-mapping error on an empty mapper.
	groupResources, err := restmapper.GetAPIGroupResources(client)
	if err != nil {
		err = fmt.Errorf("discovering API group resources: %w", err)
		log.Error(err)
		return err
	}
	mapper := restmapper.NewDiscoveryRESTMapper(groupResources)
	gvk := schema.FromAPIVersionAndKind(apiVersion, kind)
	mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
	if err != nil {
		log.Error(err)
		return err
	}

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
	defer cancel()

	watchlist := cache.NewFilteredListWatchFromClient(
		getter,
		mapping.Resource.Resource,
		namespace,
		func(options *metav1.ListOptions) {
			options.LabelSelector = list.LabelSelector
			options.FieldSelector = list.FieldSelector
			options.Watch = list.Watch
		},
	)

	// All currently stored objects must pass; an empty store never counts as ready.
	preConditionFunc := func(store cache.Store) (bool, error) {
		items := store.List()
		if len(items) == 0 {
			return false, nil
		}
		for _, item := range items {
			if !checker(item) {
				return false, nil
			}
		}
		return true, nil
	}
	conditionFunc := func(e watch.Event) (bool, error) { return checker(e.Object), nil }

	// An empty object of the mapped kind, handed to UntilWithSync as the expected type.
	object, err := scheme.Scheme.New(mapping.GroupVersionKind)
	if err != nil {
		return err
	}

	event, err := clientgowatch.UntilWithSync(ctx, watchlist, object, preConditionFunc, conditionFunc)
	if err != nil {
		log.Infof("wait to ready failed, error: %v, event: %v", err, event)
		return err
	}
	return nil
}
// GetAvailablePortOrDie asks the operating system for an unused TCP port by
// listening on 0.0.0.0 with port 0, reads back the port that was assigned,
// closes the listener and returns the port number.
//
// Any failure to resolve the address or to listen is reported through
// log.Fatal.
func GetAvailablePortOrDie() int {
	addr, resolveErr := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:0", "0.0.0.0"))
	if resolveErr != nil {
		log.Fatal(resolveErr)
	}

	ln, listenErr := net.ListenTCP("tcp", addr)
	if listenErr != nil {
		log.Fatal(listenErr)
	}

	// Read the assigned port before releasing the listener.
	port := ln.Addr().(*net.TCPAddr).Port
	_ = ln.Close()
	return port
}
// WaitPod is the Pod-specific form of WaitResource: it waits on "v1" "Pod"
// resources in namespace, selected by list, using the clientset's CoreV1 REST
// client, and evaluates checker against each candidate pod.
//
// Objects that are not *v1.Pod are treated as not satisfying checker instead
// of triggering a type-assertion panic.
func WaitPod(clientset *kubernetes.Clientset, namespace string, list metav1.ListOptions, checker func(*v1.Pod) bool) error {
	podChecker := func(obj interface{}) bool {
		pod, ok := obj.(*v1.Pod)
		return ok && checker(pod)
	}
	return WaitResource(
		clientset,
		clientset.CoreV1().RESTClient(),
		namespace,
		"v1",
		"Pod",
		list,
		podChecker,
	)
}
// PortForwardPod sets up a port forward to pod podName in namespace over an
// SPDY connection built from config.
//
// portPair is passed to portforward.New as the only port specification;
// readyChan and stopChan are handed to the forwarder unchanged, and the
// forwarder's output and error writers are os.Stdout and os.Stderr.
//
// Every error (building the round tripper, creating the forwarder, or running
// ForwardPorts) is logged and returned; nil is returned when ForwardPorts
// returns without error.
func PortForwardPod(config *rest.Config, clientset *kubernetes.Clientset, podName, namespace, portPair string, readyChan, stopChan chan struct{}) error {
	// POST .../namespaces/<namespace>/pods/<podName>/portforward
	request := clientset.CoreV1().RESTClient().Post().
		Resource("pods").
		Namespace(namespace).
		Name(podName).
		SubResource("portforward")
	target := request.URL()

	roundTripper, upgrader, err := spdy.RoundTripperFor(config)
	if err != nil {
		log.Error(err)
		return err
	}

	httpClient := &http.Client{Transport: roundTripper}
	spdyDialer := spdy.NewDialer(upgrader, httpClient, "POST", target)

	fw, err := portforward.New(spdyDialer, []string{portPair}, stopChan, readyChan, os.Stdout, os.Stderr)
	if err != nil {
		log.Error(err)
		return err
	}

	err = fw.ForwardPorts()
	if err != nil {
		log.Error(err)
	}
	return err
}
// ScaleDeploymentReplicasTo sets the replica count of deployment name in
// namespace to replicas through the deployment's scale subresource.
//
// The update is retried with retry.DefaultRetry on any non-nil error. If the
// retries are exhausted the failure is only logged; nothing is returned to
// the caller.
func ScaleDeploymentReplicasTo(options *kubernetes.Clientset, name, namespace string, replicas int32) {
	// Every error is considered retriable.
	retryOnAnyError := func(err error) bool { return err != nil }

	applyScale := func() error {
		desired := &v12.Scale{
			ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace},
			Spec:       v12.ScaleSpec{Replicas: replicas},
		}
		_, updateErr := options.AppsV1().
			Deployments(namespace).
			UpdateScale(context.TODO(), name, desired, metav1.UpdateOptions{})
		return updateErr
	}

	if err := retry.OnError(retry.DefaultRetry, retryOnAnyError, applyScale); err != nil {
		log.Errorf("update deployment: %s's replicas to %d failed, error: %v", name, replicas, err)
	}
}
// Shell runs `sh -c cmd` in the first container of pod podName in namespace
// and returns what the command wrote to stdout, with trailing newlines
// removed, together with the execution error (if any).
//
// Stdout is written both to the process's standard output and to an
// in-memory buffer; the buffer is what gets returned. The Stdin field of the
// stream options is never set here, so the exec request is issued with
// Stdin at its zero value (false).
//
// An error is returned without executing anything when the pod cannot be
// fetched or when its phase is Succeeded or Failed.
func Shell(clientset *kubernetes.Clientset, restclient *rest.RESTClient, config *rest.Config, podName, namespace, cmd string) (string, error) {
	pod, err := clientset.CoreV1().Pods(namespace).Get(context.Background(), podName, metav1.GetOptions{})
	if err != nil {
		return "", err
	}
	switch pod.Status.Phase {
	case v1.PodSucceeded, v1.PodFailed:
		return "", fmt.Errorf("cannot exec into a container in a completed pod; current phase is %s", pod.Status.Phase)
	}

	container := pod.Spec.Containers[0].Name

	// Tee stdout so the output is both shown and captured for the return value.
	in, out, errOut := dockerterm.StdStreams()
	captured := bytes.NewBuffer(nil)
	out = io.MultiWriter(out, captured)

	streams := exec.StreamOptions{
		Namespace:     namespace,
		PodName:       podName,
		ContainerName: container,
		IOStreams:     genericclioptions.IOStreams{In: in, Out: out, ErrOut: errOut},
	}
	executor := &exec.DefaultRemoteExecutor{}

	// Set up the TTY first so the terminal can be restored while attached;
	// the stream fields are read only after this call.
	tty := streams.SetupTTY()

	var resizeQueue remotecommand.TerminalSizeQueue
	if tty.Raw {
		// MonitorSize starts a goroutine that tracks terminal size changes.
		resizeQueue = tty.MonitorSize(tty.GetSize())
		// With a TTY both stdout and stderr travel over Out, so drop ErrOut.
		streams.ErrOut = nil
	}

	err = tty.Safe(func() error {
		execOptions := &v1.PodExecOptions{
			Container: container,
			Command:   []string{"sh", "-c", cmd},
			Stdin:     streams.Stdin,
			Stdout:    streams.Out != nil,
			Stderr:    streams.ErrOut != nil,
			TTY:       tty.Raw,
		}
		req := restclient.Post().
			Resource("pods").
			Name(pod.Name).
			Namespace(pod.Namespace).
			SubResource("exec")
		req.VersionedParams(execOptions, scheme.ParameterCodec)
		return executor.Execute("POST", req.URL(), config, streams.In, streams.Out, streams.ErrOut, tty.Raw, resizeQueue)
	})

	return strings.TrimRight(captured.String(), "\n"), err
}
// GetDNSIp returns the ClusterIP of the service named "kube-dns" in the
// system namespace (metav1.NamespaceSystem).
//
// The service is looked up with a List call filtered by a metadata.name
// field selector. The list error is returned as-is; if no service matches,
// a "Not found kube-dns" error is returned.
func GetDNSIp(clientset *kubernetes.Clientset) (string, error) {
	byName := fields.OneTermEqualSelector("metadata.name", "kube-dns").String()
	listOptions := metav1.ListOptions{FieldSelector: byName}

	services, err := clientset.CoreV1().
		Services(metav1.NamespaceSystem).
		List(context.Background(), listOptions)
	switch {
	case err != nil:
		return "", err
	case len(services.Items) == 0:
		return "", errors.New("Not found kube-dns")
	}
	return services.Items[0].Spec.ClusterIP, nil
}
// GetDNSServiceIpFromPod determines the cluster DNS address.
//
// It first asks GetDNSIp for the kube-dns service IP. If that fails or yields
// an empty string, it falls back to reading the nameserver entries of
// /etc/resolv.conf through Shell in the pod named by TrafficManager in
// namespace. If both attempts fail, the process is terminated via log.Fatal.
//
// NOTE(review): podName is never used; the fallback always targets
// TrafficManager. Confirm whether it should exec into podName instead.
// NOTE(review): the fallback returns Shell's output unchanged, so a
// resolv.conf with several nameserver lines would presumably yield a
// multi-line string; verify against callers.
func GetDNSServiceIpFromPod(clientset *kubernetes.Clientset, restclient *rest.RESTClient, config *rest.Config, podName, namespace string) string {
	serviceIP, err := GetDNSIp(clientset)
	if err == nil && serviceIP != "" {
		return serviceIP
	}

	const readNameservers = "cat /etc/resolv.conf | grep nameserver | awk '{print$2}'"
	nameserver, err := Shell(clientset, restclient, config, TrafficManager, namespace, readNameservers)
	if err == nil && nameserver != "" {
		return nameserver
	}

	log.Fatal("this should not happened")
	return ""
}