diff --git a/Makefile b/Makefile index bdfff4b3..69fe3c91 100644 --- a/Makefile +++ b/Makefile @@ -108,7 +108,7 @@ gen: .PHONY: ut ut: - go test -tags=noassets -coverprofile=coverage.txt -coverpkg=./... -v ./... -timeout=60m + go test -tags=noassets -coverprofile=coverage.txt -coverpkg=./... -test.v ./... -timeout=60m .PHONY: cover cover: ut diff --git a/pkg/handler/function_test.go b/pkg/handler/function_test.go index 73d3bb92..e3140984 100644 --- a/pkg/handler/function_test.go +++ b/pkg/handler/function_test.go @@ -3,11 +3,11 @@ package handler import ( "context" "fmt" + "io" "net" "net/http" + "os" "os/exec" - "reflect" - "runtime" "sync" "testing" "time" @@ -16,8 +16,6 @@ import ( corev1 "k8s.io/api/core/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" - "k8s.io/apimachinery/pkg/util/intstr" - json2 "k8s.io/apimachinery/pkg/util/json" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/client-go/kubernetes" @@ -25,7 +23,6 @@ import ( "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/util/retry" cmdutil "k8s.io/kubectl/pkg/cmd/util" - "sigs.k8s.io/yaml" plog "github.com/wencaiwulue/kubevpn/v2/pkg/log" "github.com/wencaiwulue/kubevpn/v2/pkg/util" @@ -37,157 +34,149 @@ var ( restconfig *rest.Config ) +const ( + local = `{"status": "Reviews is healthy on local pc"}` + remote = `{"status": "Reviews is healthy"}` +) + func TestFunctions(t *testing.T) { + // 1) test connect Init() kubevpnConnect(t) - kubevpnStatus(t) - t.Run(runtime.FuncForPC(reflect.ValueOf(pingPodIP).Pointer()).Name(), pingPodIP) - t.Run(runtime.FuncForPC(reflect.ValueOf(dialUDP).Pointer()).Name(), dialUDP) - t.Run(runtime.FuncForPC(reflect.ValueOf(healthCheckPod).Pointer()).Name(), healthCheckPod) - t.Run(runtime.FuncForPC(reflect.ValueOf(healthCheckService).Pointer()).Name(), healthCheckService) - t.Run(runtime.FuncForPC(reflect.ValueOf(shortDomain).Pointer()).Name(), shortDomain) - t.Run(runtime.FuncForPC(reflect.ValueOf(fullDomain).Pointer()).Name(), fullDomain) - kubevpnStatus(t) + commonTest(t) + + // 2) test proxy mode + kubevpnProxy(t) + commonTest(t) + t.Run("testUDP", testUDP) + t.Run("proxyServiceReviewsServiceIP", proxyServiceReviewsServiceIP) + t.Run("proxyServiceReviewsPodIP", proxyServiceReviewsPodIP) + + // 3) test proxy mode with service mesh + kubevpnLeave(t) + kubevpnProxyWithServiceMesh(t) + commonTest(t) + t.Run("serviceMeshReviewsServiceIP", serviceMeshReviewsServiceIP) + t.Run("serviceMeshReviewsPodIP", serviceMeshReviewsPodIP) + + // 4) test proxy mode with service mesh and gvisor + kubevpnLeave(t) + kubevpnUninstall(t) + kubevpnProxyWithServiceMeshAndGvisorMode(t) + commonTest(t) + t.Run("serviceMeshReviewsServiceIP", serviceMeshReviewsServiceIP) + kubevpnQuit(t) +} + +func commonTest(t *testing.T) { + // 1) test domain access + t.Run("kubevpnStatus", kubevpnStatus) + t.Run("pingPodIP", pingPodIP) + t.Run("healthCheckPodAuthors", healthCheckPodAuthors) + t.Run("healthCheckServiceAuthors", healthCheckServiceAuthors) + t.Run("shortDomainAuthors", shortDomainAuthors) + t.Run("fullDomainAuthors", fullDomainAuthors) } func pingPodIP(t *testing.T) { list, err := clientset.CoreV1().Pods(namespace).List(context.Background(), v1.ListOptions{}) if err != nil { - t.Error(err) + t.Fatal(err) } var wg = &sync.WaitGroup{} for _, item := range list.Items { - if item.Status.Phase == corev1.PodRunning { - wg.Add(1) - go func() { - defer wg.Done() - command := exec.Command("ping", "-c", "4", item.Status.PodIP) - if err = command.Run(); err == nil { - if !command.ProcessState.Success() { - t.Errorf("Failed to ping IP: %s of pod: %s", item.Status.PodIP, item.Name) - } - } - }() + if item.Status.Phase != corev1.PodRunning { + continue } + wg.Add(1) + go func() { + defer wg.Done() + cmd := exec.Command("ping", "-c", "4", item.Status.PodIP) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + err = cmd.Run() + if err != nil || !cmd.ProcessState.Success() { + t.Errorf("Failed to ping IP: %s of pod: %s", item.Status.PodIP, item.Name) + } + }() } wg.Wait() } -func healthCheckPod(t *testing.T) { +func healthCheckPodAuthors(t *testing.T) { var app = "authors" - podList, err := clientset.CoreV1().Pods(namespace).List(context.TODO(), v1.ListOptions{ - LabelSelector: fields.OneTermEqualSelector("app", app).String(), - }) - if err != nil { - t.Error(err) - } - if len(podList.Items) == 0 { - t.Error("Failed to found pods of authors") - } - for _, pod := range podList.Items { - pod := pod - if pod.Status.Phase != corev1.PodRunning { - continue - } - endpoint := fmt.Sprintf("http://%s:%v/health", pod.Status.PodIP, pod.Spec.Containers[0].Ports[0].ContainerPort) - req, _ := http.NewRequest("GET", endpoint, nil) - var res *http.Response - err = retry.OnError( - wait.Backoff{Duration: time.Second, Factor: 2, Jitter: 0.2, Steps: 5}, - func(err error) bool { - return err != nil - }, - func() error { - res, err = http.DefaultClient.Do(req) - return err - }, - ) - if err != nil { - t.Error(err) - } - if res == nil || res.StatusCode != 200 { - t.Errorf("health check not pass") - } - } -} - -func healthCheckService(t *testing.T) { - var app = "authors" - serviceList, err := clientset.CoreV1().Services(namespace).List(context.TODO(), v1.ListOptions{ - LabelSelector: fields.OneTermEqualSelector("app", app).String(), - }) - if err != nil { - t.Error(err) - } - if len(serviceList.Items) == 0 { - t.Error("Failed to found pods of authors") - } - endpoint := fmt.Sprintf("http://%s:%v/health", serviceList.Items[0].Spec.ClusterIP, serviceList.Items[0].Spec.Ports[0].Port) - req, _ := http.NewRequest("GET", endpoint, nil) - var res *http.Response - err = retry.OnError( - wait.Backoff{Duration: time.Second, Factor: 2, Jitter: 0.2, Steps: 5}, - func(err error) bool { - return err != nil - }, - func() error { - res, err = http.DefaultClient.Do(req) - return err - }, - ) - if err != nil { - t.Error(err) - } - if res == nil || res.StatusCode != 200 { - t.Errorf("health check not pass") - return - } -} - -func shortDomain(t *testing.T) { - var app = "authors" - serviceList, err := clientset.CoreV1().Services(namespace).List(context.TODO(), v1.ListOptions{ - LabelSelector: fields.OneTermEqualSelector("app", app).String(), - }) - if err != nil { - t.Error(err) - } - if len(serviceList.Items) == 0 { - t.Errorf("Failed to found pods of %s", app) - } - endpoint := fmt.Sprintf("http://%s:%v/health", app, serviceList.Items[0].Spec.Ports[0].Port) - req, _ := http.NewRequest("GET", endpoint, nil) - var res *http.Response - err = retry.OnError( - wait.Backoff{Duration: time.Second, Factor: 2, Jitter: 0.2, Steps: 5}, - func(err error) bool { - return err != nil - }, - func() error { - res, err = http.DefaultClient.Do(req) - return err - }, - ) - if err != nil { - t.Error(err) - } - if res == nil || res.StatusCode != 200 { - t.Errorf("health check not pass") - } -} - -func fullDomain(t *testing.T) { - var app = "authors" - serviceList, err := clientset.CoreV1().Services(namespace).List(context.TODO(), v1.ListOptions{ - LabelSelector: fields.OneTermEqualSelector("app", app).String(), - }) + ip, err := getPodIP(app) if err != nil { t.Fatal(err) } - if len(serviceList.Items) == 0 { - t.Fatalf("Failed to found pods of %s", app) + endpoint := fmt.Sprintf("http://%s:%v/health", ip, 9080) + healthChecker(t, endpoint, nil, "") +} + +func healthChecker(t *testing.T, endpoint string, header map[string]string, keyword string) { + req, err := http.NewRequest("GET", endpoint, nil) + if err != nil { + t.Fatal(err) + } + for k, v := range header { + req.Header.Add(k, v) } + err = retry.OnError( + wait.Backoff{Duration: time.Second, Factor: 1, Jitter: 0, Steps: 300}, + func(err error) bool { return err != nil }, + func() error { + var resp *http.Response + resp, err = http.DefaultClient.Do(req) + if err != nil { + return err + } + if resp.StatusCode != 200 { + if resp.Body != nil { + defer resp.Body.Close() + all, _ := io.ReadAll(resp.Body) + return fmt.Errorf("status code is %s, conetent: %v", resp.Status, string(all)) + } + return fmt.Errorf("status code is %s", resp.Status) + } + defer resp.Body.Close() + if keyword != "" { + content, err := io.ReadAll(resp.Body) + if err != nil { + return err + } + if string(content) != keyword { + return fmt.Errorf("response=%s, want: %s", string(content), keyword) + } + return nil + } + return nil + }, + ) + if err != nil { + kubectl(t) + t.Fatal(err) + } +} + +func healthCheckServiceAuthors(t *testing.T) { + var app = "authors" + ip, err := getServiceIP(app) + if err != nil { + t.Fatal(err) + } + endpoint := fmt.Sprintf("http://%s:%v/health", ip, 9080) + healthChecker(t, endpoint, nil, "") +} + +func shortDomainAuthors(t *testing.T) { + var app = "authors" + endpoint := fmt.Sprintf("http://%s:%v/health", app, 9080) + healthChecker(t, endpoint, nil, "") +} + +func fullDomainAuthors(t *testing.T) { + var app = "authors" domains := []string{ fmt.Sprintf("%s.%s.svc.cluster.local", app, namespace), fmt.Sprintf("%s.%s.svc", app, namespace), @@ -195,67 +184,112 @@ func fullDomain(t *testing.T) { } for _, domain := range domains { - port := serviceList.Items[0].Spec.Ports[0].Port - endpoint := fmt.Sprintf("http://%s:%v/health", domain, port) - var req *http.Request - req, err = http.NewRequest("GET", endpoint, nil) - if err != nil { - t.Fatal(err) - } - var res *http.Response - err = retry.OnError( - wait.Backoff{Duration: time.Second, Factor: 2, Jitter: 0.2, Steps: 5}, - func(err error) bool { - return err != nil - }, - func() error { - res, err = http.DefaultClient.Do(req) - return err - }, - ) - if err != nil { - t.Fatal(err) - } - if res == nil || res.StatusCode != 200 { - t.Fatal("health check not pass") - } + endpoint := fmt.Sprintf("http://%s:%v/health", domain, 9080) + healthChecker(t, endpoint, nil, "") } } -func dialUDP(t *testing.T) { - port, _ := util.GetAvailableUDPPortOrDie() - go server(port) +func serviceMeshReviewsPodIP(t *testing.T) { + app := "reviews" + ip, err := getPodIP(app) + if err != nil { + t.Fatal(err) + } + endpoint := fmt.Sprintf("http://%s:%v/health", ip, 9080) + healthChecker(t, endpoint, nil, remote) + healthChecker(t, endpoint, map[string]string{"env": "test"}, local) +} - list, err := clientset.CoreV1().Pods(namespace).List(context.Background(), v1.ListOptions{ - LabelSelector: fields.OneTermEqualSelector("app", "reviews").String(), +func serviceMeshReviewsServiceIP(t *testing.T) { + app := "reviews" + ip, err := getServiceIP(app) + if err != nil { + t.Fatal(err) + } + endpoint := fmt.Sprintf("http://%s:%v/health", ip, 9080) + healthChecker(t, endpoint, nil, remote) + healthChecker(t, endpoint, map[string]string{"env": "test"}, local) +} + +func getServiceIP(app string) (string, error) { + serviceList, err := clientset.CoreV1().Services(namespace).List(context.Background(), v1.ListOptions{ + LabelSelector: fields.OneTermEqualSelector("app", app).String(), }) if err != nil { - t.Error(err) + return "", err } var ip string - for _, item := range list.Items { - if item.DeletionTimestamp == nil && item.Status.Phase == corev1.PodRunning { - ip = item.Status.PodIP - break + for _, item := range serviceList.Items { + ip = item.Spec.ClusterIP + if ip != "" { + return ip, nil } } - if len(ip) == 0 { - t.Errorf("Failed to found pods for service reviews") - return + return "", fmt.Errorf("failed to found service ip for service %s", app) +} + +func proxyServiceReviewsPodIP(t *testing.T) { + app := "reviews" + ip, err := getPodIP(app) + if err != nil { + t.Fatal(err) + } + endpoint := fmt.Sprintf("http://%s:%v/health", ip, 9080) + healthChecker(t, endpoint, nil, local) + healthChecker(t, endpoint, map[string]string{"env": "test"}, local) +} + +func getPodIP(app string) (string, error) { + list, err := clientset.CoreV1().Pods(namespace).List(context.Background(), v1.ListOptions{ + LabelSelector: fields.OneTermEqualSelector("app", app).String(), + }) + if err != nil { + return "", err + } + for _, pod := range list.Items { + if pod.DeletionTimestamp == nil && + pod.Status.Phase == corev1.PodRunning && pod.Status.PodIP != "" { + return pod.Status.PodIP, nil + } + } + return "", fmt.Errorf("failed to found pod ip for service %s", app) +} + +func proxyServiceReviewsServiceIP(t *testing.T) { + app := "reviews" + ip, err := getServiceIP(app) + if err != nil { + t.Fatal(err) + } + endpoint := fmt.Sprintf("http://%s:%v/health", ip, 9080) + healthChecker(t, endpoint, nil, local) + healthChecker(t, endpoint, map[string]string{"env": "test"}, local) +} + +func testUDP(t *testing.T) { + app := "reviews" + port, _ := util.GetAvailableUDPPortOrDie() + go udpServer(port) + + ip, err := getPodIP(app) + if err != nil { + t.Fatal(err) } log.Printf("Dail udp to IP: %s", ip) - if err = retry.OnError( + err = retry.OnError( wait.Backoff{Duration: time.Second, Factor: 2, Jitter: 0.2, Steps: 5}, func(err error) bool { return err != nil - }, func() error { - return udpclient(ip, port) - }); err != nil { - t.Errorf("Failed to access pod IP: %s, port: %v", ip, port) + }, + func() error { + return udpClient(ip, port) + }) + if err != nil { + t.Fatalf("Failed to access pod IP: %s, port: %v", ip, port) } } -func udpclient(ip string, port int) error { +func udpClient(ip string, port int) error { udpConn, err := net.DialUDP("udp4", nil, &net.UDPAddr{ IP: net.ParseIP(ip), Port: port, @@ -291,7 +325,7 @@ func udpclient(ip string, port int) error { return nil } -func server(port int) { +func udpServer(port int) { // 创建监听 udpConn, err := net.ListenUDP("udp4", &net.UDPAddr{ IP: net.IPv4(0, 0, 0, 0), @@ -322,31 +356,101 @@ func server(port int) { } func kubevpnConnect(t *testing.T) { - cmd := exec.Command("kubevpn", "proxy", "--debug", "deployments/reviews") - check := func(log string) bool { - t.Log(util.PrintStr(log)) - return false - } - stdout, stderr, err := util.RunWithRollingOutWithChecker(cmd, check) + cmd := exec.Command("kubevpn", "connect", "--debug") + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + err := cmd.Run() if err != nil { - t.Log(stdout, stderr) - t.Error(err) - t.Fail() - return + t.Fatal(err) + } +} + +func kubevpnProxy(t *testing.T) { + cmd := exec.Command("kubevpn", "proxy", "deployments/reviews", "--debug") + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + err := cmd.Run() + if err != nil { + t.Fatal(err) + } +} + +func kubevpnProxyWithServiceMesh(t *testing.T) { + cmd := exec.Command("kubevpn", "proxy", "deployments/reviews", "--headers", "env=test", "--debug") + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + err := cmd.Run() + if err != nil { + t.Fatal(err) + } +} + +func kubevpnProxyWithServiceMeshAndGvisorMode(t *testing.T) { + cmd := exec.Command("kubevpn", "proxy", "deployments/reviews", "--headers", "env=test", "--netstack", "gvisor", "--debug") + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + err := cmd.Run() + if err != nil { + t.Fatal(err) + } +} + +func kubevpnLeave(t *testing.T) { + cmd := exec.Command("kubevpn", "leave", "deployments/reviews") + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + err := cmd.Run() + if err != nil { + t.Fatal(err) + } +} + +func kubevpnUninstall(t *testing.T) { + cmd := exec.Command("kubevpn", "uninstall", "kubevpn") + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + err := cmd.Run() + if err != nil { + t.Fatal(err) } } func kubevpnStatus(t *testing.T) { cmd := exec.Command("kubevpn", "status") - stdout, stderr, err := util.RunWithRollingOutWithChecker(cmd, nil) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + err := cmd.Run() if err != nil { - t.Log(stdout, stderr) - t.Error(err) - t.Fail() - return + t.Fatal(err) + } +} + +func kubevpnQuit(t *testing.T) { + cmd := exec.Command("kubevpn", "quit") + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + err := cmd.Run() + if err != nil { + t.Fatal(err) + } +} + +func kubectl(t *testing.T) { + cmd := exec.Command("kubectl", "get", "pods", "-o", "wide") + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + err := cmd.Run() + if err != nil { + t.Fatal(err) + } + + cmd = exec.Command("kubectl", "get", "services", "-o", "wide") + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + err = cmd.Run() + if err != nil { + t.Fatal(err) } - t.Log(stdout) - t.Log(stderr) } func Init() { @@ -365,53 +469,19 @@ func Init() { if namespace, _, err = f.ToRawKubeConfigLoader().Namespace(); err != nil { plog.G(context.Background()).Fatal(err) } + + go startupHttpServer(local) } -func TestWaitBackoff(t *testing.T) { - var last = time.Now() - _ = retry.OnError( - wait.Backoff{ - Steps: 10, - Duration: time.Millisecond * 50, - }, func(err error) bool { - return err != nil - }, func() error { - now := time.Now() - fmt.Println(now.Sub(last).String()) - last = now - return fmt.Errorf("") - }) -} +func startupHttpServer(str string) { + var health = func(w http.ResponseWriter, r *http.Request) { + _, _ = w.Write([]byte(str)) + } -func TestArray(t *testing.T) { - s := []int{1, 2, 3, 1, 2, 3, 1, 2, 3} - for i := 0; i < 3; i++ { - ints := s[i*3 : i*3+3] - println(ints[0], ints[1], ints[2]) + http.HandleFunc("/", health) + http.HandleFunc("/health", health) + log.Println("Start listening http port 9080 ...") + if err := http.ListenAndServe(":9080", nil); err != nil { + panic(err) } } -func TestPatch(t *testing.T) { - var p = corev1.Probe{ - ProbeHandler: corev1.ProbeHandler{HTTPGet: &corev1.HTTPGetAction{ - Path: "/health", - Port: intstr.FromInt32(9080), - Scheme: "HTTP", - }}, - } - marshal, err := json2.Marshal(p) - if err != nil { - panic(err) - } - fmt.Println(string(marshal)) - - var pp corev1.Probe - err = json2.Unmarshal(marshal, &pp) - if err != nil { - panic(err) - } - bytes, err := yaml.Marshal(pp) - if err != nil { - panic(err) - } - fmt.Println(string(bytes)) -} diff --git a/pkg/util/cidr_test.go b/pkg/util/cidr_test.go index 79f1902e..f2d3399a 100644 --- a/pkg/util/cidr_test.go +++ b/pkg/util/cidr_test.go @@ -2,13 +2,20 @@ package util import ( "context" + "encoding/json" "fmt" "testing" + "time" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" + "k8s.io/client-go/util/retry" "k8s.io/kubectl/pkg/cmd/util" + "sigs.k8s.io/yaml" plog "github.com/wencaiwulue/kubevpn/v2/pkg/log" ) @@ -70,3 +77,53 @@ func TestElegant(t *testing.T) { fmt.Println(net.String()) } } + +func TestWaitBackoff(t *testing.T) { + var last = time.Now() + _ = retry.OnError( + wait.Backoff{ + Steps: 10, + Duration: time.Millisecond * 50, + }, func(err error) bool { + return err != nil + }, func() error { + now := time.Now() + fmt.Println(now.Sub(last).String()) + last = now + return fmt.Errorf("") + }) +} + +func TestArray(t *testing.T) { + s := []int{1, 2, 3, 1, 2, 3, 1, 2, 3} + for i := 0; i < 3; i++ { + ints := s[i*3 : i*3+3] + println(ints[0], ints[1], ints[2]) + } +} + +func TestPatch(t *testing.T) { + var p = v1.Probe{ + ProbeHandler: v1.ProbeHandler{HTTPGet: &v1.HTTPGetAction{ + Path: "/health", + Port: intstr.FromInt32(9080), + Scheme: "HTTP", + }}, + } + marshal, err := json.Marshal(p) + if err != nil { + panic(err) + } + fmt.Println(string(marshal)) + + var pp v1.Probe + err = json.Unmarshal(marshal, &pp) + if err != nil { + panic(err) + } + bytes, err := yaml.Marshal(pp) + if err != nil { + panic(err) + } + fmt.Println(string(bytes)) +}