feat: lite mode does not add route for DIY CIDR

This commit is contained in:
fengcaiwen
2023-10-26 18:01:01 +08:00
committed by naison
parent 126eca32f8
commit 2d44c9b00d
13 changed files with 473 additions and 453 deletions

View File

@@ -81,7 +81,7 @@ func (svr *Server) ConnectFork(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectF
}
config.Image = req.Image
err = connect.DoConnect(sshCtx)
err = connect.DoConnect(sshCtx, true)
if err != nil {
log.Errorf("do connect error: %v", err)
connect.Cleanup()

View File

@@ -96,7 +96,7 @@ func (svr *Server) Connect(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectServe
}
config.Image = req.Image
err = svr.connect.DoConnect(sshCtx)
err = svr.connect.DoConnect(sshCtx, false)
if err != nil {
log.Errorf("do connect error: %v", err)
svr.connect.Cleanup()

View File

@@ -6,6 +6,7 @@ import (
log "github.com/sirupsen/logrus"
"github.com/spf13/pflag"
"k8s.io/utils/pointer"
"github.com/wencaiwulue/kubevpn/pkg/config"
"github.com/wencaiwulue/kubevpn/pkg/daemon/rpc"
@@ -80,7 +81,9 @@ func (svr *Server) Proxy(req *rpc.ConnectRequest, resp rpc.Daemon_ProxyServer) e
} else {
log.Infof("try to disconnect from another cluster")
var disconnect rpc.Daemon_DisconnectClient
disconnect, err = daemonClient.Disconnect(ctx, &rpc.DisconnectRequest{})
disconnect, err = daemonClient.Disconnect(ctx, &rpc.DisconnectRequest{
ID: pointer.Int32(0),
})
if err != nil {
return err
}
@@ -90,6 +93,7 @@ func (svr *Server) Proxy(req *rpc.ConnectRequest, resp rpc.Daemon_ProxyServer) e
if err == io.EOF {
break
} else if err != nil {
log.Errorf("recv from disconnect failed, %v", err)
return err
}
err = resp.Send(&rpc.ConnectResponse{Message: recv.Message})

View File

@@ -20,9 +20,6 @@ func (svr *Server) Quit(req *rpc.QuitRequest, resp rpc.Daemon_QuitServer) error
log.Info("quit: cleanup connection")
svr.connect.Cleanup()
}
if svr.Cancel != nil {
svr.Cancel()
}
if svr.clone != nil {
log.Info("quit: cleanup clone")
err := svr.clone.Cleanup()
@@ -37,6 +34,10 @@ func (svr *Server) Quit(req *rpc.QuitRequest, resp rpc.Daemon_QuitServer) error
dns.CleanupHosts()
// last step is to quit GRPC server
if svr.Cancel != nil {
svr.Cancel()
}
return nil
}

View File

@@ -13,14 +13,13 @@ func (svr *Server) Status(ctx context.Context, request *rpc.StatusRequest) (*rpc
var sb = new(bytes.Buffer)
w := tabwriter.NewWriter(sb, 1, 1, 1, ' ', 0)
_, _ = fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\t%s\n", "ID", "Priority", "Cluster", "Kubeconfig", "Namespace", "Status")
var status, cluster, namespace, kubeconfig string
if svr.connect != nil {
status = "Connected"
cluster = svr.connect.GetKubeconfigCluster()
namespace = svr.connect.Namespace
kubeconfig = svr.connect.OriginKubeconfigPath
status := "Connected"
cluster := svr.connect.GetKubeconfigCluster()
namespace := svr.connect.Namespace
kubeconfig := svr.connect.OriginKubeconfigPath
_, _ = fmt.Fprintf(w, "%d\t%s\t%s\t%s\t%s\t%s\n", 0, "Main", cluster, kubeconfig, namespace, status)
}
_, _ = fmt.Fprintf(w, "%d\t%s\t%s\t%s\t%s\t%s\n", 0, "Main", cluster, kubeconfig, namespace, status)
for i, options := range svr.secondaryConnect {
_, _ = fmt.Fprintf(w, "%d\t%s\t%s\t%s\t%s\t%s\n", i+1, "Minor", options.GetKubeconfigCluster(), options.OriginKubeconfigPath, options.Namespace, "Connected")

View File

@@ -42,6 +42,7 @@ import (
"k8s.io/kubectl/pkg/polymorphichelpers"
"k8s.io/kubectl/pkg/util/interrupt"
"k8s.io/kubectl/pkg/util/podutils"
"k8s.io/utils/pointer"
"github.com/wencaiwulue/kubevpn/pkg/config"
"github.com/wencaiwulue/kubevpn/pkg/daemon"
@@ -458,11 +459,12 @@ func DoDev(ctx context.Context, devOption *Options, conf *util.SshConfig, flags
// connect to cluster network on docker container or host
func (d *Options) doConnect(ctx context.Context, f cmdutil.Factory, conf *util.SshConfig, transferImage bool) (cancel func(), err error) {
connect := &handler.ConnectOptions{
Headers: d.Headers,
Workloads: []string{d.Workload},
ExtraCIDR: d.ExtraCIDR,
ExtraDomain: d.ExtraDomain,
Engine: d.Engine,
Headers: d.Headers,
Workloads: []string{d.Workload},
ExtraCIDR: d.ExtraCIDR,
ExtraDomain: d.ExtraDomain,
Engine: d.Engine,
OriginKubeconfigPath: util.GetKubeconfigPath(f),
}
if err = connect.InitClient(f); err != nil {
return
@@ -503,18 +505,19 @@ func (d *Options) doConnect(ctx context.Context, f cmdutil.Factory, conf *util.S
// not needs to ssh jump in daemon, because dev mode will hang up until user exit,
// so just ssh jump in client is enough
req := &rpc.ConnectRequest{
KubeconfigBytes: string(kubeconfig),
Namespace: ns,
Headers: connect.Headers,
Workloads: connect.Workloads,
ExtraCIDR: connect.ExtraCIDR,
ExtraDomain: connect.ExtraDomain,
UseLocalDNS: connect.UseLocalDNS,
Engine: string(connect.Engine),
TransferImage: transferImage,
Image: config.Image,
Level: int32(log.DebugLevel),
SshJump: conf.ToRPC(),
KubeconfigBytes: string(kubeconfig),
Namespace: ns,
Headers: connect.Headers,
Workloads: connect.Workloads,
ExtraCIDR: connect.ExtraCIDR,
ExtraDomain: connect.ExtraDomain,
UseLocalDNS: connect.UseLocalDNS,
Engine: string(connect.Engine),
OriginKubeconfigPath: util.GetKubeconfigPath(f),
TransferImage: transferImage,
Image: config.Image,
Level: int32(log.DebugLevel),
SshJump: conf.ToRPC(),
}
cancel = disconnect(ctx, daemonCli)
var resp rpc.Daemon_ConnectClient
@@ -595,7 +598,9 @@ func (d *Options) doConnect(ctx context.Context, f cmdutil.Factory, conf *util.S
func disconnect(ctx context.Context, daemonClient rpc.DaemonClient) func() {
return func() {
resp, err := daemonClient.Disconnect(ctx, &rpc.DisconnectRequest{})
resp, err := daemonClient.Disconnect(ctx, &rpc.DisconnectRequest{
ID: pointer.Int32(0),
})
if err != nil {
log.Errorf("disconnect error: %v", err)
return

View File

@@ -47,55 +47,57 @@ func (c *Config) AddServiceNameToHosts(ctx context.Context, serviceInterface v13
}
}
}
for {
select {
case <-ctx.Done():
return
default:
func() {
w, err := serviceInterface.Watch(ctx, v1.ListOptions{
Watch: true, ResourceVersion: serviceList.ResourceVersion,
})
go func() {
for {
select {
case <-ctx.Done():
return
default:
func() {
w, err := serviceInterface.Watch(ctx, v1.ListOptions{
Watch: true, ResourceVersion: serviceList.ResourceVersion,
})
if err != nil {
if utilnet.IsConnectionRefused(err) || apierrors.IsTooManyRequests(err) {
time.Sleep(time.Second * 5)
if err != nil {
if utilnet.IsConnectionRefused(err) || apierrors.IsTooManyRequests(err) {
time.Sleep(time.Second * 5)
}
return
}
return
}
defer w.Stop()
for {
select {
case event, ok := <-w.ResultChan():
if !ok {
return
defer w.Stop()
for {
select {
case event, ok := <-w.ResultChan():
if !ok {
return
}
if watch.Error == event.Type || watch.Bookmark == event.Type {
continue
}
if !rateLimiter.TryAccept() {
return
}
list, err := serviceInterface.List(ctx, v1.ListOptions{})
if err != nil {
return
}
entry := c.generateHostsEntry(list.Items, hosts)
if entry == "" {
continue
}
if entry == last {
continue
}
if err = c.updateHosts(entry); err != nil {
return
}
last = entry
}
if watch.Error == event.Type || watch.Bookmark == event.Type {
continue
}
if !rateLimiter.TryAccept() {
return
}
list, err := serviceInterface.List(ctx, v1.ListOptions{})
if err != nil {
return
}
entry := c.generateHostsEntry(list.Items, hosts)
if entry == "" {
continue
}
if entry == last {
continue
}
if err = c.updateHosts(entry); err != nil {
return
}
last = entry
}
}
}()
}()
}
}
}
}()
}
func (c *Config) updateHosts(str string) error {
@@ -163,13 +165,13 @@ func (c *Config) generateHostsEntry(list []v12.Service, hosts []Entry) string {
}
}
}
entryList = append(entryList, hosts...)
sort.SliceStable(entryList, func(i, j int) bool {
if entryList[i].Domain == entryList[j].Domain {
return entryList[i].IP > entryList[j].IP
}
return entryList[i].Domain > entryList[j].Domain
})
entryList = append(entryList, hosts...)
// if dns already works well, not needs to add it to hosts file
for i := 0; i < len(entryList); i++ {

View File

@@ -204,7 +204,7 @@ func Rollback(f cmdutil.Factory, ns, workload string) {
}
}
func (c *ConnectOptions) DoConnect(ctx context.Context) (err error) {
func (c *ConnectOptions) DoConnect(ctx context.Context, isLite bool) (err error) {
c.ctx, c.cancel = context.WithCancel(ctx)
log.Info("start to connect")
@@ -253,7 +253,7 @@ func (c *ConnectOptions) DoConnect(ctx context.Context) (err error) {
forward := fmt.Sprintf("tcp://127.0.0.1:%d", rawTCPForwardPort)
core.GvisorTCPForwardAddr = fmt.Sprintf("tcp://127.0.0.1:%d", gvisorTCPForwardPort)
core.GvisorUDPForwardAddr = fmt.Sprintf("tcp://127.0.0.1:%d", gvisorUDPForwardPort)
if err = c.startLocalTunServe(c.ctx, forward); err != nil {
if err = c.startLocalTunServe(c.ctx, forward, isLite); err != nil {
log.Errorf("start local tun service failed: %v", err)
return
}
@@ -371,12 +371,15 @@ func checkPodStatus(cCtx context.Context, cFunc context.CancelFunc, podName stri
}
}
func (c *ConnectOptions) startLocalTunServe(ctx context.Context, forwardAddress string) (err error) {
func (c *ConnectOptions) startLocalTunServe(ctx context.Context, forwardAddress string, lite bool) (err error) {
// todo figure it out why
if util.IsWindows() {
c.localTunIPv4.Mask = net.CIDRMask(0, 32)
}
var list = sets.New[string](config.CIDR.String())
var list = sets.New[string]()
if !lite {
list.Insert(config.CIDR.String())
}
for _, ipNet := range c.cidrs {
list.Insert(ipNet.String())
}
@@ -658,7 +661,7 @@ func (c *ConnectOptions) setupDNS(ctx context.Context) error {
return err
}
// dump service in current namespace for support DNS resolve service:port
go c.dnsConfig.AddServiceNameToHosts(ctx, c.clientset.CoreV1().Services(c.Namespace), c.extraHost...)
c.dnsConfig.AddServiceNameToHosts(ctx, c.clientset.CoreV1().Services(c.Namespace), c.extraHost...)
return nil
}

View File

@@ -4,6 +4,9 @@ import (
"fmt"
"net"
"strings"
"time"
"github.com/prometheus-community/pro-bing"
)
func GetTunDevice(ips ...net.IP) (*net.Interface, error) {
@@ -51,3 +54,28 @@ func GetTunDeviceByConn(tun net.Conn) (*net.Interface, error) {
}
return nil, fmt.Errorf("can not found any interface with ip %v", ip)
}
// Ping sends three ICMP echo requests to targetIP and reports whether every
// request received a reply. It uses privileged (raw-socket) mode and gives
// the whole probe at most 1.5 seconds.
func Ping(targetIP string) (bool, error) {
	p, err := probing.NewPinger(targetIP)
	if err != nil {
		return false, err
	}
	p.SetLogger(nil)
	p.SetPrivileged(true)
	p.Count = 3
	p.Timeout = 1500 * time.Millisecond
	// Run blocks until all probes complete or the timeout fires.
	if err = p.Run(); err != nil {
		return false, err
	}
	stats := p.Statistics()
	return stats.PacketsSent == stats.PacketsRecv, nil
}
// IsIPv4 reports whether packet starts with an IPv4 header, judged by the
// version nibble in the first byte. Returns false for an empty packet
// (the original indexed packet[0] unconditionally and panicked on nil).
func IsIPv4(packet []byte) bool {
	if len(packet) == 0 {
		return false
	}
	return packet[0]>>4 == 4
}
// IsIPv6 reports whether packet starts with an IPv6 header, judged by the
// version nibble in the first byte. Returns false for an empty packet
// (the original indexed packet[0] unconditionally and panicked on nil).
func IsIPv6(packet []byte) bool {
	if len(packet) == 0 {
		return false
	}
	return packet[0]>>4 == 6
}

View File

@@ -1,16 +0,0 @@
package util
import (
"fmt"
"net"
"testing"
)
// TestGetDevice checks that the tun device can be located by its assigned IP.
func TestGetDevice(t *testing.T) {
	ip := net.ParseIP("223.254.0.104")
	device, err := GetTunDevice(ip)
	if err != nil {
		// Fatal, not Error: device is nil on failure and device.Name below
		// would panic if execution continued.
		t.Fatal(err)
	}
	fmt.Println(device.Name)
}

View File

@@ -7,14 +7,34 @@ import (
"io"
"math/rand"
"net"
"net/http"
"os"
"strings"
"text/tabwriter"
"time"
"github.com/moby/term"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/exp/constraints"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/cli-runtime/pkg/resource"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v12 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/portforward"
"k8s.io/client-go/tools/remotecommand"
"k8s.io/client-go/transport/spdy"
"k8s.io/kubectl/pkg/cmd/exec"
"k8s.io/kubectl/pkg/cmd/util"
"k8s.io/kubectl/pkg/util/podutils"
"github.com/wencaiwulue/kubevpn/pkg/config"
)
@@ -123,3 +143,206 @@ func Heartbeats() {
}
}
}
// WaitPod watches pods matching list until checker accepts an observed pod,
// or a fixed 30-second timeout elapses.
//
// Returns nil once checker returns true; otherwise an error when the watch
// cannot be started, the watch channel closes, or the timeout fires.
func WaitPod(podInterface v12.PodInterface, list v1.ListOptions, checker func(*corev1.Pod) bool) error {
	ctx, cancelFunc := context.WithTimeout(context.Background(), time.Second*30)
	defer cancelFunc()
	w, err := podInterface.Watch(ctx, list)
	if err != nil {
		return err
	}
	defer w.Stop()
	for {
		select {
		case e, ok := <-w.ResultChan():
			// A closed channel yields zero values forever; without this check
			// the loop would spin hot until the timeout.
			if !ok {
				return errors.New("watch channel closed before pod became ready")
			}
			if pod, isPod := e.Object.(*corev1.Pod); isPod && checker(pod) {
				return nil
			}
		case <-ctx.Done():
			return errors.New("wait for pod to be ready timeout")
		}
	}
}
// PortForwardPod forwards the local/remote port pairs in portPair to the
// given pod over a SPDY connection. It blocks until the session ends;
// readyChan is closed once forwarding is established and stopChan
// terminates the session.
func PortForwardPod(config *rest.Config, clientset *rest.RESTClient, podName, namespace string, portPair []string, readyChan chan struct{}, stopChan <-chan struct{}) error {
	target := clientset.
		Post().
		Resource("pods").
		Namespace(namespace).
		Name(podName).
		SubResource("portforward").
		URL()
	rt, upgrader, err := spdy.RoundTripperFor(config)
	if err != nil {
		logrus.Errorf("create spdy roundtripper error: %s", err.Error())
		return err
	}
	dialer := spdy.NewDialer(upgrader, &http.Client{Transport: rt}, "POST", target)
	fw, err := portforward.NewOnAddresses(dialer, []string{"localhost"}, portPair, stopChan, readyChan, nil, os.Stderr)
	if err != nil {
		logrus.Errorf("create port forward error: %s", err.Error())
		return err
	}
	// ForwardPorts blocks for the lifetime of the forward session.
	if err = fw.ForwardPorts(); err != nil {
		logrus.Errorf("forward port error: %s", err.Error())
		return err
	}
	return nil
}
// GetTopOwnerReference walks the ownerReference chain of workload upward and
// returns the resource info of the topmost controller (the object that has
// no controller of its own).
func GetTopOwnerReference(factory util.Factory, namespace, workload string) (*resource.Info, error) {
	current := workload
	for {
		info, err := GetUnstructuredObject(factory, namespace, current)
		if err != nil {
			return nil, err
		}
		owner := v1.GetControllerOf(info.Object.(*unstructured.Unstructured))
		if owner == nil {
			// No controller above this object: it is the top.
			return info, nil
		}
		// apiVersion format is Group/Version, e.g. apps/v1, apps.kruise.io/v1beta1.
		gv, err := schema.ParseGroupVersion(owner.APIVersion)
		if err != nil {
			return info, nil
		}
		groupKind := v1.GroupKind{Group: gv.Group, Kind: owner.Kind}
		current = fmt.Sprintf("%s/%s", groupKind.String(), owner.Name)
	}
}
// GetTopOwnerReferenceBySelector resolves every resource matching selector
// to its topmost controller and returns the set of "GroupKind/name" strings.
// It assumes pods and their controllers share the same labels; resources
// whose top owner is a Service are excluded.
func GetTopOwnerReferenceBySelector(factory util.Factory, namespace, selector string) (sets.Set[string], error) {
	infos, err := GetUnstructuredObjectBySelector(factory, namespace, selector)
	if err != nil {
		return nil, err
	}
	result := sets.New[string]()
	for _, info := range infos {
		name := fmt.Sprintf("%s/%s", info.Mapping.Resource.GroupResource().String(), info.Name)
		top, topErr := GetTopOwnerReference(factory, namespace, name)
		// Best-effort: unresolvable owners and Services are skipped silently.
		if topErr != nil || top.Mapping.Resource.Resource == "services" {
			continue
		}
		result.Insert(fmt.Sprintf("%s/%s", top.Mapping.GroupVersionKind.GroupKind().String(), top.Name))
	}
	return result, nil
}
// Shell executes cmd inside a container of pod podName and returns the
// command's stdout with trailing newlines trimmed.
//
// It refuses to exec into pods in a terminal phase (Succeeded/Failed) and
// defaults to the pod's first container when containerName is empty.
func Shell(clientset *kubernetes.Clientset, restclient *rest.RESTClient, config *rest.Config, podName, containerName, namespace string, cmd []string) (string, error) {
	pod, err := clientset.CoreV1().Pods(namespace).Get(context.Background(), podName, v1.GetOptions{})
	if err != nil {
		return "", err
	}
	if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed {
		err = fmt.Errorf("cannot exec into a container in a completed pod; current phase is %s", pod.Status.Phase)
		return "", err
	}
	if containerName == "" {
		containerName = pod.Spec.Containers[0].Name
	}
	stdin, _, _ := term.StdStreams()
	// Capture stdout in memory so it can be returned to the caller.
	stdoutBuf := bytes.NewBuffer(nil)
	stdout := io.MultiWriter(stdoutBuf)
	StreamOptions := exec.StreamOptions{
		Namespace:     namespace,
		PodName:       podName,
		ContainerName: containerName,
		IOStreams:     genericclioptions.IOStreams{In: stdin, Out: stdout, ErrOut: nil},
	}
	Executor := &exec.DefaultRemoteExecutor{}
	// ensure we can recover the terminal while attached
	tt := StreamOptions.SetupTTY()
	var sizeQueue remotecommand.TerminalSizeQueue
	if tt.Raw {
		// this call spawns a goroutine to monitor/update the terminal size
		sizeQueue = tt.MonitorSize(tt.GetSize())
		// unset p.Err if it was previously set because both stdout and stderr go over p.Out when tty is
		// true
		StreamOptions.ErrOut = nil
	}
	fn := func() error {
		req := restclient.Post().
			Resource("pods").
			Name(pod.Name).
			Namespace(pod.Namespace).
			SubResource("exec")
		req.VersionedParams(&corev1.PodExecOptions{
			Container: containerName,
			Command:   cmd,
			Stdin:     StreamOptions.Stdin,
			Stdout:    StreamOptions.Out != nil,
			Stderr:    StreamOptions.ErrOut != nil,
			TTY:       tt.Raw,
		}, scheme.ParameterCodec)
		return Executor.Execute("POST", req.URL(), config, StreamOptions.In, StreamOptions.Out, StreamOptions.ErrOut, tt.Raw, sizeQueue)
	}
	// tt.Safe restores the terminal state even if fn fails.
	err = tt.Safe(fn)
	return strings.TrimRight(stdoutBuf.String(), "\n"), err
}
// WaitPodToBeReady watches pods matching selector and returns once a pod
// that is not being deleted reports Ready with all containers ready.
//
// Returns an error when the watch cannot be started, the watch channel
// closes, or nothing becomes ready within one hour.
func WaitPodToBeReady(ctx context.Context, podInterface v12.PodInterface, selector v1.LabelSelector) error {
	watchStream, err := podInterface.Watch(ctx, v1.ListOptions{
		LabelSelector: fields.SelectorFromSet(selector.MatchLabels).String(),
	})
	if err != nil {
		return err
	}
	defer watchStream.Stop()
	// Create the timeout channel ONCE, outside the loop. The original called
	// time.Tick inside the select, which allocated a fresh (leaked) ticker on
	// every iteration, so the timeout could never fire while events arrived.
	timeout := time.After(time.Minute * 60)
	var last string
	for {
		select {
		case e, ok := <-watchStream.ResultChan():
			if !ok {
				return fmt.Errorf("can not wait pod to be ready because of watch chan has closed")
			}
			podT, isPod := e.Object.(*corev1.Pod)
			if !isPod || podT.DeletionTimestamp != nil {
				continue
			}
			var sb = bytes.NewBuffer(nil)
			sb.WriteString(fmt.Sprintf("pod [%s] status is %s\n", podT.Name, podT.Status.Phase))
			PrintStatus(podT, sb)
			// Only log when the status text changed since the previous event;
			// Info (not Infof) so '%' in pod output is not misinterpreted.
			if last != sb.String() {
				logrus.Info(sb.String())
			}
			last = sb.String()
			if podutils.IsPodReady(podT) && allContainersReady(podT) {
				return nil
			}
		case <-timeout:
			return errors.New("wait pod to be ready timeout")
		}
	}
}

// allContainersReady reports whether every container status in pod is Ready.
func allContainersReady(pod *corev1.Pod) bool {
	for _, status := range pod.Status.ContainerStatuses {
		if !status.Ready {
			return false
		}
	}
	return true
}
// AllContainerIsRunning reports whether pod is Ready and every one of its
// container statuses is also Ready.
func AllContainerIsRunning(pod *corev1.Pod) bool {
	if !podutils.IsPodReady(pod) {
		return false
	}
	for i := range pod.Status.ContainerStatuses {
		if !pod.Status.ContainerStatuses[i].Ready {
			return false
		}
	}
	return true
}

128
pkg/util/unstructure.go Normal file
View File

@@ -0,0 +1,128 @@
package util
import (
"encoding/json"
"fmt"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/cli-runtime/pkg/resource"
"k8s.io/client-go/rest"
"k8s.io/kubectl/pkg/cmd/util"
)
// GetUnstructuredObject resolves a single workload reference (for example
// "deployments/foo") in namespace to its unstructured resource info.
// Returns an error when the lookup fails or nothing matches.
func GetUnstructuredObject(f util.Factory, namespace string, workloads string) (*resource.Info, error) {
	do := f.NewBuilder().
		Unstructured().
		NamespaceParam(namespace).DefaultNamespace().AllNamespaces(false).
		ResourceTypeOrNameArgs(true, workloads).
		ContinueOnError().
		Latest().
		Flatten().
		// ask the server to include the full object in table responses
		TransformRequests(func(req *rest.Request) { req.Param("includeObject", "Object") }).
		Do()
	if err := do.Err(); err != nil {
		logrus.Warn(err)
		return nil, err
	}
	infos, err := do.Infos()
	if err != nil {
		logrus.Println(err)
		return nil, err
	}
	if len(infos) == 0 {
		return nil, fmt.Errorf("not found workloads %s", workloads)
	}
	// Only the first match is returned; workloads names a single resource.
	return infos[0], err
}
// GetUnstructuredObjectList resolves each workload reference in workloads to
// its unstructured resource info. Returns an error when the lookup fails or
// no resource matches any of the references.
func GetUnstructuredObjectList(f util.Factory, namespace string, workloads []string) ([]*resource.Info, error) {
	do := f.NewBuilder().
		Unstructured().
		NamespaceParam(namespace).DefaultNamespace().AllNamespaces(false).
		ResourceTypeOrNameArgs(true, workloads...).
		ContinueOnError().
		Latest().
		Flatten().
		// ask the server to include the full object in table responses
		TransformRequests(func(req *rest.Request) { req.Param("includeObject", "Object") }).
		Do()
	if err := do.Err(); err != nil {
		logrus.Warn(err)
		return nil, err
	}
	infos, err := do.Infos()
	if err != nil {
		return nil, err
	}
	if len(infos) == 0 {
		return nil, fmt.Errorf("not found resource %v", workloads)
	}
	return infos, err
}
// GetUnstructuredObjectBySelector lists all resources in namespace matching
// the label selector and returns their unstructured resource infos.
// Returns an error when the lookup fails or nothing matches.
func GetUnstructuredObjectBySelector(f util.Factory, namespace string, selector string) ([]*resource.Info, error) {
	do := f.NewBuilder().
		Unstructured().
		NamespaceParam(namespace).DefaultNamespace().AllNamespaces(false).
		ResourceTypeOrNameArgs(true, "all").
		LabelSelector(selector).
		ContinueOnError().
		Latest().
		Flatten().
		// ask the server to include the full object in table responses
		TransformRequests(func(req *rest.Request) { req.Param("includeObject", "Object") }).
		Do()
	if err := do.Err(); err != nil {
		logrus.Warn(err)
		return nil, err
	}
	infos, err := do.Infos()
	if err != nil {
		logrus.Println(err)
		return nil, err
	}
	if len(infos) == 0 {
		// lowercase, descriptive message (was the contextless "Not found"),
		// per Go error-string conventions.
		return nil, errors.New("not found resource by label selector: " + selector)
	}
	return infos, err
}
// GetPodTemplateSpecPath extracts the pod template from an unstructured
// workload object together with the field path it was found under.
// It prefers spec.template; failing that, the whole object is treated as
// the template (e.g. a bare Pod), with an empty path.
func GetPodTemplateSpecPath(u *unstructured.Unstructured) (*v1.PodTemplateSpec, []string, error) {
	raw, found, err := unstructured.NestedMap(u.Object, "spec", "template")
	path := []string{"spec", "template"}
	if !found || err != nil {
		// Fall back to interpreting the root object itself as the template.
		raw, found, err = unstructured.NestedMap(u.Object)
		path = []string{}
		if !found || err != nil {
			return nil, nil, err
		}
	}
	// Round-trip through JSON to convert the generic map into a typed spec.
	data, err := json.Marshal(raw)
	if err != nil {
		return nil, nil, err
	}
	var tmpl v1.PodTemplateSpec
	if err = json.Unmarshal(data, &tmpl); err != nil {
		return nil, nil, err
	}
	return &tmpl, path, nil
}
// GetAnnotation returns the annotations of the topmost owner (controller) of
// the given resource. The result is never nil; an empty map is returned when
// the owner has no annotations.
func GetAnnotation(f util.Factory, ns string, resources string) (map[string]string, error) {
	ownerReference, err := GetTopOwnerReference(f, ns, resources)
	if err != nil {
		return nil, err
	}
	u, ok := ownerReference.Object.(*unstructured.Unstructured)
	if !ok {
		// message typo fixed: was "unstaructed"
		return nil, fmt.Errorf("can not convert to unstructured")
	}
	annotations := u.GetAnnotations()
	if annotations == nil {
		annotations = map[string]string{}
	}
	return annotations, nil
}

View File

@@ -6,7 +6,6 @@ import (
"crypto/tls"
"crypto/x509"
"encoding/binary"
"encoding/json"
"fmt"
"io"
"net"
@@ -19,36 +18,22 @@ import (
"strings"
"time"
dockerterm "github.com/moby/term"
"github.com/pkg/errors"
probing "github.com/prometheus-community/pro-bing"
log "github.com/sirupsen/logrus"
"k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/fields"
k8sruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/cli-runtime/pkg/genericclioptions"
runtimeresource "k8s.io/cli-runtime/pkg/resource"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v12 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/portforward"
"k8s.io/client-go/tools/remotecommand"
watchtools "k8s.io/client-go/tools/watch"
"k8s.io/client-go/transport/spdy"
"k8s.io/client-go/util/retry"
"k8s.io/kubectl/pkg/cmd/exec"
"k8s.io/kubectl/pkg/cmd/util"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
"k8s.io/kubectl/pkg/polymorphichelpers"
"k8s.io/kubectl/pkg/util/podutils"
"github.com/wencaiwulue/kubevpn/pkg/config"
"github.com/wencaiwulue/kubevpn/pkg/driver"
@@ -80,254 +65,10 @@ func GetAvailableTCPPortOrDie() (int, error) {
return listener.Addr().(*net.TCPAddr).Port, nil
}
func WaitPod(podInterface v12.PodInterface, list metav1.ListOptions, checker func(*v1.Pod) bool) error {
ctx, cancelFunc := context.WithTimeout(context.Background(), time.Second*30)
defer cancelFunc()
w, err := podInterface.Watch(ctx, list)
if err != nil {
return err
}
defer w.Stop()
for {
select {
case e := <-w.ResultChan():
if pod, ok := e.Object.(*v1.Pod); ok {
if checker(pod) {
return nil
}
}
case <-ctx.Done():
return errors.New("wait for pod to be ready timeout")
}
}
}
func PortForwardPod(config *rest.Config, clientset *rest.RESTClient, podName, namespace string, portPair []string, readyChan chan struct{}, stopChan <-chan struct{}) error {
url := clientset.
Post().
Resource("pods").
Namespace(namespace).
Name(podName).
SubResource("portforward").
URL()
transport, upgrader, err := spdy.RoundTripperFor(config)
if err != nil {
log.Errorf("create spdy roundtripper error: %s", err.Error())
return err
}
dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", url)
forwarder, err := portforward.NewOnAddresses(dialer, []string{"localhost"}, portPair, stopChan, readyChan, nil, os.Stderr)
if err != nil {
log.Errorf("create port forward error: %s", err.Error())
return err
}
if err = forwarder.ForwardPorts(); err != nil {
log.Errorf("forward port error: %s", err.Error())
return err
}
return nil
}
func GetTopOwnerReference(factory cmdutil.Factory, namespace, workload string) (*runtimeresource.Info, error) {
for {
object, err := GetUnstructuredObject(factory, namespace, workload)
if err != nil {
return nil, err
}
ownerReference := metav1.GetControllerOf(object.Object.(*unstructured.Unstructured))
if ownerReference == nil {
return object, nil
}
// apiVersion format is Group/Version is like: apps/v1, apps.kruise.io/v1beta1
version, err := schema.ParseGroupVersion(ownerReference.APIVersion)
if err != nil {
return object, nil
}
gk := metav1.GroupKind{
Group: version.Group,
Kind: ownerReference.Kind,
}
workload = fmt.Sprintf("%s/%s", gk.String(), ownerReference.Name)
}
}
// GetTopOwnerReferenceBySelector assume pods, controller has same labels
func GetTopOwnerReferenceBySelector(factory cmdutil.Factory, namespace, selector string) (sets.Set[string], error) {
object, err := GetUnstructuredObjectBySelector(factory, namespace, selector)
if err != nil {
return nil, err
}
set := sets.New[string]()
for _, info := range object {
reference, err := GetTopOwnerReference(factory, namespace, fmt.Sprintf("%s/%s", info.Mapping.Resource.GroupResource().String(), info.Name))
if err == nil && reference.Mapping.Resource.Resource != "services" {
set.Insert(fmt.Sprintf("%s/%s", reference.Mapping.GroupVersionKind.GroupKind().String(), reference.Name))
}
}
return set, nil
}
func Shell(clientset *kubernetes.Clientset, restclient *rest.RESTClient, config *rest.Config, podName, containerName, namespace string, cmd []string) (string, error) {
pod, err := clientset.CoreV1().Pods(namespace).Get(context.Background(), podName, metav1.GetOptions{})
if err != nil {
return "", err
}
if pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed {
err = fmt.Errorf("cannot exec into a container in a completed pod; current phase is %s", pod.Status.Phase)
return "", err
}
if containerName == "" {
containerName = pod.Spec.Containers[0].Name
}
stdin, _, _ := dockerterm.StdStreams()
stdoutBuf := bytes.NewBuffer(nil)
stdout := io.MultiWriter(stdoutBuf)
StreamOptions := exec.StreamOptions{
Namespace: namespace,
PodName: podName,
ContainerName: containerName,
IOStreams: genericclioptions.IOStreams{In: stdin, Out: stdout, ErrOut: nil},
}
Executor := &exec.DefaultRemoteExecutor{}
// ensure we can recover the terminal while attached
tt := StreamOptions.SetupTTY()
var sizeQueue remotecommand.TerminalSizeQueue
if tt.Raw {
// this call spawns a goroutine to monitor/update the terminal size
sizeQueue = tt.MonitorSize(tt.GetSize())
// unset p.Err if it was previously set because both stdout and stderr go over p.Out when tty is
// true
StreamOptions.ErrOut = nil
}
fn := func() error {
req := restclient.Post().
Resource("pods").
Name(pod.Name).
Namespace(pod.Namespace).
SubResource("exec")
req.VersionedParams(&v1.PodExecOptions{
Container: containerName,
Command: cmd,
Stdin: StreamOptions.Stdin,
Stdout: StreamOptions.Out != nil,
Stderr: StreamOptions.ErrOut != nil,
TTY: tt.Raw,
}, scheme.ParameterCodec)
return Executor.Execute("POST", req.URL(), config, StreamOptions.In, StreamOptions.Out, StreamOptions.ErrOut, tt.Raw, sizeQueue)
}
err = tt.Safe(fn)
return strings.TrimRight(stdoutBuf.String(), "\n"), err
}
func IsWindows() bool {
return runtime.GOOS == "windows"
}
func GetUnstructuredObject(f cmdutil.Factory, namespace string, workloads string) (*runtimeresource.Info, error) {
do := f.NewBuilder().
Unstructured().
NamespaceParam(namespace).DefaultNamespace().AllNamespaces(false).
ResourceTypeOrNameArgs(true, workloads).
ContinueOnError().
Latest().
Flatten().
TransformRequests(func(req *rest.Request) { req.Param("includeObject", "Object") }).
Do()
if err := do.Err(); err != nil {
log.Warn(err)
return nil, err
}
infos, err := do.Infos()
if err != nil {
log.Println(err)
return nil, err
}
if len(infos) == 0 {
return nil, fmt.Errorf("not found workloads %s", workloads)
}
return infos[0], err
}
func GetUnstructuredObjectList(f cmdutil.Factory, namespace string, workloads []string) ([]*runtimeresource.Info, error) {
do := f.NewBuilder().
Unstructured().
NamespaceParam(namespace).DefaultNamespace().AllNamespaces(false).
ResourceTypeOrNameArgs(true, workloads...).
ContinueOnError().
Latest().
Flatten().
TransformRequests(func(req *rest.Request) { req.Param("includeObject", "Object") }).
Do()
if err := do.Err(); err != nil {
log.Warn(err)
return nil, err
}
infos, err := do.Infos()
if err != nil {
return nil, err
}
if len(infos) == 0 {
return nil, fmt.Errorf("not found resource %v", workloads)
}
return infos, err
}
func GetUnstructuredObjectBySelector(f cmdutil.Factory, namespace string, selector string) ([]*runtimeresource.Info, error) {
do := f.NewBuilder().
Unstructured().
NamespaceParam(namespace).DefaultNamespace().AllNamespaces(false).
ResourceTypeOrNameArgs(true, "all").
LabelSelector(selector).
ContinueOnError().
Latest().
Flatten().
TransformRequests(func(req *rest.Request) { req.Param("includeObject", "Object") }).
Do()
if err := do.Err(); err != nil {
log.Warn(err)
return nil, err
}
infos, err := do.Infos()
if err != nil {
log.Println(err)
return nil, err
}
if len(infos) == 0 {
return nil, errors.New("Not found")
}
return infos, err
}
func GetPodTemplateSpecPath(u *unstructured.Unstructured) (*v1.PodTemplateSpec, []string, error) {
var stringMap map[string]interface{}
var b bool
var err error
var path []string
if stringMap, b, err = unstructured.NestedMap(u.Object, "spec", "template"); b && err == nil {
path = []string{"spec", "template"}
} else if stringMap, b, err = unstructured.NestedMap(u.Object); b && err == nil {
path = []string{}
} else {
return nil, nil, err
}
marshal, err := json.Marshal(stringMap)
if err != nil {
return nil, nil, err
}
var p v1.PodTemplateSpec
if err = json.Unmarshal(marshal, &p); err != nil {
return nil, nil, err
}
return &p, path, nil
}
func BytesToInt(b []byte) uint32 {
buffer := bytes.NewBuffer(b)
var u uint32
@@ -337,23 +78,6 @@ func BytesToInt(b []byte) uint32 {
return u
}
func Ping(targetIP string) (bool, error) {
pinger, err := probing.NewPinger(targetIP)
if err != nil {
return false, err
}
pinger.SetLogger(nil)
pinger.SetPrivileged(true)
pinger.Count = 3
pinger.Timeout = time.Millisecond * 1500
err = pinger.Run() // Blocks until finished.
if err != nil {
return false, err
}
stat := pinger.Statistics()
return stat.PacketsRecv == stat.PacketsSent, err
}
func RolloutStatus(ctx1 context.Context, factory cmdutil.Factory, namespace, workloads string, timeout time.Duration) (err error) {
log.Infof("rollout status for %s", workloads)
defer func() {
@@ -506,22 +230,6 @@ func IsPortListening(port int) bool {
}
}
func GetAnnotation(f util.Factory, ns string, resources string) (map[string]string, error) {
ownerReference, err := GetTopOwnerReference(f, ns, resources)
if err != nil {
return nil, err
}
u, ok := ownerReference.Object.(*unstructured.Unstructured)
if !ok {
return nil, fmt.Errorf("can not convert to unstaructed")
}
annotations := u.GetAnnotations()
if annotations == nil {
annotations = map[string]string{}
}
return annotations, nil
}
func CanI(clientset *kubernetes.Clientset, sa, ns string, resource *rbacv1.PolicyRule) (allowed bool, err error) {
var roleBindingList *rbacv1.RoleBindingList
roleBindingList, err = clientset.RbacV1().RoleBindings(ns).List(context.Background(), metav1.ListOptions{})
@@ -607,14 +315,6 @@ func GetTlsDomain(namespace string) string {
return config.ConfigMapPodTrafficManager + "." + namespace + "." + "svc"
}
func IsIPv4(packet []byte) bool {
return 4 == (packet[0] >> 4)
}
func IsIPv6(packet []byte) bool {
return 6 == (packet[0] >> 4)
}
func Deduplicate(cidr []*net.IPNet) (result []*net.IPNet) {
var set = sets.New[string]()
for _, ipNet := range cidr {
@@ -626,19 +326,6 @@ func Deduplicate(cidr []*net.IPNet) (result []*net.IPNet) {
return
}
func AllContainerIsRunning(pod *v1.Pod) bool {
isReady := podutils.IsPodReady(pod)
if !isReady {
return false
}
for _, status := range pod.Status.ContainerStatuses {
if !status.Ready {
return false
}
}
return true
}
func CleanExtensionLib() {
if !IsWindows() {
return
@@ -666,50 +353,6 @@ func CleanExtensionLib() {
MoveToTemp()
}
func WaitPodToBeReady(ctx context.Context, podInterface v12.PodInterface, selector metav1.LabelSelector) error {
watchStream, err := podInterface.Watch(ctx, metav1.ListOptions{
LabelSelector: fields.SelectorFromSet(selector.MatchLabels).String(),
})
if err != nil {
return err
}
defer watchStream.Stop()
var last string
for {
select {
case e, ok := <-watchStream.ResultChan():
if !ok {
return fmt.Errorf("can not wait pod to be ready because of watch chan has closed")
}
if podT, ok := e.Object.(*v1.Pod); ok {
if podT.DeletionTimestamp != nil {
continue
}
var sb = bytes.NewBuffer(nil)
sb.WriteString(fmt.Sprintf("pod [%s] status is %s\n", podT.Name, podT.Status.Phase))
PrintStatus(podT, sb)
if last != sb.String() {
log.Infof(sb.String())
}
if podutils.IsPodReady(podT) && func() bool {
for _, status := range podT.Status.ContainerStatuses {
if !status.Ready {
return false
}
}
return true
}() {
return nil
}
last = sb.String()
}
case <-time.Tick(time.Minute * 60):
return errors.New(fmt.Sprintf("wait pod to be ready timeout"))
}
}
}
func Print(writer io.Writer, slogan string) {
length := len(slogan) + 4 + 4
var sb strings.Builder