feat: sync mode modify api server & add ut for mode run and mode sync (#704)

* feat: sync mode: rewrite the kubeconfig API server address to the in-cluster API server address
* feat: add ut for sync mode and run mode
* fix: miscellaneous bugs found while adding the tests
This commit is contained in:
naison
2025-08-21 22:45:47 +08:00
committed by GitHub
parent 4df63d1642
commit 98c4a61ca1
29 changed files with 1787 additions and 393 deletions

View File

@@ -117,7 +117,11 @@ func (svr *Server) Sync(resp rpc.Daemon_SyncServer) (err error) {
}
logger.Infof("Sync workloads...")
options.SetContext(sshCtx)
err = options.DoSync(plog.WithLogger(sshCtx, logger), []byte(req.KubeconfigBytes), req.Image)
newKubeconfigBytes, err := options.ConvertApiServerToNodeIP(resp.Context(), []byte(req.KubeconfigBytes))
if err != nil {
return err
}
err = options.DoSync(plog.WithLogger(sshCtx, logger), newKubeconfigBytes, req.Image)
if err != nil {
plog.G(context.Background()).Errorf("Sync workloads failed: %v", err)
return err

View File

@@ -10,6 +10,7 @@ import (
"os"
"os/exec"
"reflect"
"runtime"
"sync"
"testing"
"time"
@@ -30,101 +31,150 @@ import (
"github.com/wencaiwulue/kubevpn/v2/pkg/util"
)
var (
type ut struct {
namespace string
clientset *kubernetes.Clientset
restconfig *rest.Config
)
}
const (
local = `{"status": "Reviews is healthy on local pc"}`
local8080 = `{"status": "Reviews is healthy on local pc 8080"}`
remote = `{"status": "Reviews is healthy"}`
local = `{"status": "Reviews is healthy on local pc"}`
local8080 = `{"status": "Reviews is healthy on local pc 8080"}`
remote = `{"status": "Reviews is healthy"}`
remoteSyncOrigin = `{"status":"Authors is healthy"}`
remoteSyncPod = `{"status":"Authors is healthy in pod"}`
)
func TestFunctions(t *testing.T) {
u := &ut{}
// 1) test connect
t.Run("init", Init)
t.Run("kubevpnConnect", kubevpnConnect)
t.Run("commonTest", commonTest)
t.Run("checkConnectStatus", checkConnectStatus)
t.Run("init", u.init)
t.Run("kubevpnConnect", u.kubevpnConnect)
t.Run("commonTest", u.commonTest)
t.Run("checkConnectStatus", u.checkConnectStatus)
// 2) test proxy mode
t.Run("kubevpnProxy", kubevpnProxy)
t.Run("commonTest", commonTest)
t.Run("testUDP", testUDP)
t.Run("proxyServiceReviewsServiceIP", proxyServiceReviewsServiceIP)
t.Run("proxyServiceReviewsPodIP", proxyServiceReviewsPodIP)
t.Run("checkProxyStatus", checkProxyStatus)
t.Run("kubevpnProxy", u.kubevpnProxy)
t.Run("commonTest", u.commonTest)
t.Run("testUDP", u.testUDP)
t.Run("proxyServiceReviewsServiceIP", u.proxyServiceReviewsServiceIP)
t.Run("proxyServiceReviewsPodIP", u.proxyServiceReviewsPodIP)
t.Run("checkProxyStatus", u.checkProxyStatus)
// 3) test proxy mode with service mesh
t.Run("kubevpnLeave", kubevpnLeave)
t.Run("kubevpnProxyWithServiceMesh", kubevpnProxyWithServiceMesh)
t.Run("commonTest", commonTest)
t.Run("serviceMeshReviewsServiceIP", serviceMeshReviewsServiceIP)
t.Run("serviceMeshReviewsPodIP", serviceMeshReviewsPodIP)
t.Run("checkProxyWithServiceMeshStatus", checkProxyWithServiceMeshStatus)
t.Run("kubevpnLeave", u.kubevpnLeave)
t.Run("kubevpnProxyWithServiceMesh", u.kubevpnProxyWithServiceMesh)
t.Run("commonTest", u.commonTest)
t.Run("serviceMeshReviewsServiceIP", u.serviceMeshReviewsServiceIP)
t.Run("serviceMeshReviewsPodIP", u.serviceMeshReviewsPodIP)
t.Run("checkProxyWithServiceMeshStatus", u.checkProxyWithServiceMeshStatus)
// 4) test proxy mode with service mesh and gvisor
t.Run("kubevpnLeave", kubevpnLeave)
t.Run("kubevpnUninstall", kubevpnUninstall)
t.Run("kubevpnProxyWithServiceMeshAndGvisorMode", kubevpnProxyWithServiceMeshAndGvisorMode)
t.Run("commonTest", commonTest)
t.Run("serviceMeshReviewsServiceIP", serviceMeshReviewsServiceIP)
t.Run("checkProxyWithServiceMeshAndGvisorStatus", checkProxyWithServiceMeshAndGvisorStatus)
t.Run("kubevpnLeaveService", kubevpnLeaveService)
t.Run("kubevpnQuit", kubevpnQuit)
t.Run("kubevpnLeave", u.kubevpnLeave)
t.Run("kubevpnUninstall", u.kubevpnUninstall)
t.Run("kubevpnProxyWithServiceMeshAndFargateMode", u.kubevpnProxyWithServiceMeshAndFargateMode)
t.Run("commonTest", u.commonTest)
t.Run("serviceMeshReviewsServiceIP", u.serviceMeshReviewsServiceIP)
t.Run("checkProxyWithServiceMeshAndGvisorStatus", u.checkProxyWithServiceMeshAndGvisorStatus)
t.Run("kubevpnLeaveService", u.kubevpnLeaveService)
t.Run("kubevpnQuit", u.kubevpnQuit)
// 5) install centrally in ns test -- connect mode
t.Run("centerKubevpnUninstall", kubevpnUninstall)
t.Run("centerKubevpnInstallInNsKubevpn", kubevpnConnectToNsKubevpn)
t.Run("centerKubevpnConnect", kubevpnConnect)
t.Run("checkServiceShouldNotInNsDefault", checkServiceShouldNotInNsDefault)
t.Run("centerCheckConnectStatus", centerCheckConnectStatus)
t.Run("centerCommonTest", commonTest)
// 5) test mode sync
t.Run("deleteDeployForSaveResource", u.deleteDeployForSaveResource)
t.Run("kubevpnSyncWithFullProxy", u.kubevpnSyncWithFullProxy)
t.Run("kubevpnSyncWithFullProxyStatus", u.checkSyncWithFullProxyStatus)
t.Run("commonTest", u.commonTest)
t.Run("kubevpnUnSync", u.kubevpnUnSync)
t.Run("kubevpnSyncWithServiceMesh", u.kubevpnSyncWithServiceMesh)
t.Run("kubevpnSyncWithServiceMeshStatus", u.checkSyncWithServiceMeshStatus)
t.Run("commonTest", u.commonTest)
t.Run("kubevpnUnSync", u.kubevpnUnSync)
// 6) install centrally in ns test -- proxy mode
t.Run("centerKubevpnProxy", kubevpnProxy)
t.Run("checkServiceShouldNotInNsDefault", checkServiceShouldNotInNsDefault)
t.Run("centerCommonTest", commonTest)
t.Run("centerTestUDP", testUDP)
t.Run("centerProxyServiceReviewsServiceIP", proxyServiceReviewsServiceIP)
t.Run("centerProxyServiceReviewsPodIP", proxyServiceReviewsPodIP)
t.Run("centerCheckProxyStatus", centerCheckProxyStatus)
// 6) test mode run
// because of:
// Run container with cmd: [docker run --env-file /tmp/623917040.env --domainname --workdir --cap-add SYS_PTRACE --cap-add SYS_ADMIN --cap-add SYS_PTRACE --cap-add SYS_ADMIN --security-opt apparmor=unconfined --security-opt seccomp=unconfined --pull missing --name default_authors_716db --user root --env LC_ALL=C.UTF-8 --label app=authors --volume /tmp/329021857635767916:/var/run/secrets/kubernetes.io/serviceaccount --network container:default_nginx_45ee1 --pid container:default_nginx_45ee1 --pull missing --attach STDIN --attach STDOUT --attach STDERR --interactive --privileged --volume /tmp/TestFunctionskubevpnRunWithFullProxy2095435677/001:/app/test --rm --entrypoint go ghcr.io/kubenetworks/authors:latest run /app/test/main.go]
// Error: stat /app/test/main.go: no such file or directory
if runtime.GOOS != "darwin" {
t.Run("resetDeployAuthors", u.resetDeployAuthors)
t.Run("kubevpnRunWithFullProxy", u.kubevpnRunWithFullProxy)
t.Run("kubevpnRunWithServiceMesh", u.kubevpnRunWithServiceMesh)
t.Run("kubevpnQuit", u.kubevpnQuit)
}
// 7) install centrally in ns test -- proxy mode with service mesh
t.Run("kubevpnLeave", kubevpnLeave)
t.Run("kubevpnProxyWithServiceMesh", kubevpnProxyWithServiceMesh)
t.Run("checkServiceShouldNotInNsDefault", checkServiceShouldNotInNsDefault)
t.Run("commonTest", commonTest)
t.Run("serviceMeshReviewsServiceIP", serviceMeshReviewsServiceIP)
t.Run("serviceMeshReviewsPodIP", serviceMeshReviewsPodIP)
t.Run("centerCheckProxyWithServiceMeshStatus", centerCheckProxyWithServiceMeshStatus)
// 7) install centrally in ns test -- connect mode
t.Run("centerKubevpnUninstall", u.kubevpnUninstall)
t.Run("centerKubevpnInstallInNsKubevpn", u.kubevpnConnectToNsKubevpn)
t.Run("centerKubevpnConnect", u.kubevpnConnect)
t.Run("checkServiceShouldNotInNsDefault", u.checkServiceShouldNotInNsDefault)
t.Run("centerCheckConnectStatus", u.centerCheckConnectStatus)
t.Run("centerCommonTest", u.commonTest)
// 8) install centrally in ns test -- proxy mode with service mesh and gvisor
t.Run("kubevpnQuit", kubevpnQuit)
t.Run("kubevpnProxyWithServiceMeshAndK8sServicePortMap", kubevpnProxyWithServiceMeshAndK8sServicePortMap)
t.Run("checkServiceShouldNotInNsDefault", checkServiceShouldNotInNsDefault)
t.Run("commonTest", commonTest)
t.Run("serviceMeshReviewsServiceIPPortMap", serviceMeshReviewsServiceIPPortMap)
t.Run("kubevpnLeave", kubevpnLeave)
t.Run("centerCheckProxyWithServiceMeshAndGvisorStatus", centerCheckProxyWithServiceMeshAndGvisorStatus)
t.Run("kubevpnLeaveService", kubevpnLeaveService)
t.Run("kubevpnQuit", kubevpnQuit)
// 8) install centrally in ns test -- proxy mode
t.Run("centerKubevpnProxy", u.kubevpnProxy)
t.Run("checkServiceShouldNotInNsDefault", u.checkServiceShouldNotInNsDefault)
t.Run("centerCommonTest", u.commonTest)
t.Run("centerTestUDP", u.testUDP)
t.Run("centerProxyServiceReviewsServiceIP", u.proxyServiceReviewsServiceIP)
t.Run("centerProxyServiceReviewsPodIP", u.proxyServiceReviewsPodIP)
t.Run("centerCheckProxyStatus", u.centerCheckProxyStatus)
// 9) install centrally in ns test -- proxy mode with service mesh
t.Run("kubevpnLeave", u.kubevpnLeave)
t.Run("kubevpnProxyWithServiceMesh", u.kubevpnProxyWithServiceMesh)
t.Run("checkServiceShouldNotInNsDefault", u.checkServiceShouldNotInNsDefault)
t.Run("commonTest", u.commonTest)
t.Run("serviceMeshReviewsServiceIP", u.serviceMeshReviewsServiceIP)
t.Run("serviceMeshReviewsPodIP", u.serviceMeshReviewsPodIP)
t.Run("centerCheckProxyWithServiceMeshStatus", u.centerCheckProxyWithServiceMeshStatus)
// 10) install centrally in ns test -- proxy mode with service mesh and gvisor
t.Run("kubevpnQuit", u.kubevpnQuit)
t.Run("kubevpnProxyWithServiceMeshAndK8sServicePortMap", u.kubevpnProxyWithServiceMeshAndK8sServicePortMap)
t.Run("checkServiceShouldNotInNsDefault", u.checkServiceShouldNotInNsDefault)
t.Run("commonTest", u.commonTest)
t.Run("serviceMeshReviewsServiceIPPortMap", u.serviceMeshReviewsServiceIPPortMap)
t.Run("kubevpnLeave", u.kubevpnLeave)
t.Run("centerCheckProxyWithServiceMeshAndGvisorStatus", u.centerCheckProxyWithServiceMeshAndGvisorStatus)
t.Run("kubevpnLeaveService", u.kubevpnLeaveService)
t.Run("kubevpnQuit", u.kubevpnQuit)
// 11) test mode sync
t.Run("kubevpnSyncWithFullProxy", u.kubevpnSyncWithFullProxy)
t.Run("checkServiceShouldNotInNsDefault", u.checkServiceShouldNotInNsDefault)
t.Run("kubevpnSyncWithFullProxyStatus", u.checkSyncWithFullProxyStatus)
t.Run("commonTest", u.commonTest)
t.Run("kubevpnUnSync", u.kubevpnUnSync)
t.Run("kubevpnSyncWithServiceMesh", u.kubevpnSyncWithServiceMesh)
t.Run("checkServiceShouldNotInNsDefault", u.checkServiceShouldNotInNsDefault)
t.Run("kubevpnSyncWithServiceMeshStatus", u.checkSyncWithServiceMeshStatus)
t.Run("commonTest", u.commonTest)
t.Run("kubevpnUnSync", u.kubevpnUnSync)
t.Run("kubevpnQuit", u.kubevpnQuit)
// 12) test mode run
// because of:
// Run container with cmd: [docker run --env-file /tmp/623917040.env --domainname --workdir --cap-add SYS_PTRACE --cap-add SYS_ADMIN --cap-add SYS_PTRACE --cap-add SYS_ADMIN --security-opt apparmor=unconfined --security-opt seccomp=unconfined --pull missing --name default_authors_716db --user root --env LC_ALL=C.UTF-8 --label app=authors --volume /tmp/329021857635767916:/var/run/secrets/kubernetes.io/serviceaccount --network container:default_nginx_45ee1 --pid container:default_nginx_45ee1 --pull missing --attach STDIN --attach STDOUT --attach STDERR --interactive --privileged --volume /tmp/TestFunctionskubevpnRunWithFullProxy2095435677/001:/app/test --rm --entrypoint go ghcr.io/kubenetworks/authors:latest run /app/test/main.go]
// Error: stat /app/test/main.go: no such file or directory
if runtime.GOOS != "darwin" {
t.Run("resetDeployAuthors", u.resetDeployAuthors)
t.Run("kubevpnRunWithFullProxy", u.kubevpnRunWithFullProxy)
t.Run("kubevpnRunWithServiceMesh", u.kubevpnRunWithServiceMesh)
t.Run("kubevpnQuit", u.kubevpnQuit)
}
}
func commonTest(t *testing.T) {
func (u *ut) commonTest(t *testing.T) {
// 1) test domain access
t.Run("kubevpnStatus", kubevpnStatus)
t.Run("pingPodIP", pingPodIP)
t.Run("healthCheckPodDetails", healthCheckPodDetails)
t.Run("healthCheckServiceDetails", healthCheckServiceDetails)
t.Run("shortDomainDetails", shortDomainDetails)
t.Run("fullDomainDetails", fullDomainDetails)
t.Run("kubevpnStatus", u.kubevpnStatus)
t.Run("pingPodIP", u.pingPodIP)
t.Run("healthCheckPodDetails", u.healthCheckPodDetails)
t.Run("healthCheckServiceDetails", u.healthCheckServiceDetails)
t.Run("shortDomainDetails", u.shortDomainDetails)
t.Run("fullDomainDetails", u.fullDomainDetails)
}
func pingPodIP(t *testing.T) {
list, err := clientset.CoreV1().Pods(namespace).List(context.Background(), v1.ListOptions{})
func (u *ut) pingPodIP(t *testing.T) {
list, err := u.clientset.CoreV1().Pods(u.namespace).List(context.Background(), v1.ListOptions{})
if err != nil {
t.Fatal(err)
}
@@ -149,23 +199,23 @@ func pingPodIP(t *testing.T) {
}
}
t.Errorf("Failed to ping IP: %s of pod: %s", item.Status.PodIP, item.Name)
kubectl(t)
u.kubectl(t)
}()
}
wg.Wait()
}
func healthCheckPodDetails(t *testing.T) {
func (u *ut) healthCheckPodDetails(t *testing.T) {
var app = "details"
ip, err := getPodIP(app)
ip, err := u.getPodIP(app)
if err != nil {
t.Fatal(err)
}
endpoint := fmt.Sprintf("http://%s:%v/health", ip, 9080)
healthChecker(t, endpoint, nil, "")
u.healthChecker(t, endpoint, nil, "")
}
func healthChecker(t *testing.T, endpoint string, header map[string]string, keyword string) {
func (u *ut) healthChecker(t *testing.T, endpoint string, header map[string]string, keyword string) {
req, err := http.NewRequest("GET", endpoint, nil)
if err != nil {
t.Fatal(err)
@@ -176,7 +226,7 @@ func healthChecker(t *testing.T, endpoint string, header map[string]string, keyw
client := &http.Client{Timeout: time.Second * 1}
err = retry.OnError(
wait.Backoff{Duration: time.Second, Factor: 1, Jitter: 0, Steps: 120},
wait.Backoff{Duration: time.Second, Factor: 1, Jitter: 0, Steps: 240},
func(err error) bool { return err != nil },
func() error {
var resp *http.Response
@@ -208,76 +258,76 @@ func healthChecker(t *testing.T, endpoint string, header map[string]string, keyw
},
)
if err != nil {
kubectl(t)
u.kubectl(t)
t.Fatal(err)
}
}
func healthCheckServiceDetails(t *testing.T) {
func (u *ut) healthCheckServiceDetails(t *testing.T) {
var app = "details"
ip, err := getServiceIP(app)
ip, err := u.getServiceIP(app)
if err != nil {
t.Fatal(err)
}
endpoint := fmt.Sprintf("http://%s:%v/health", ip, 9080)
healthChecker(t, endpoint, nil, "")
u.healthChecker(t, endpoint, nil, "")
}
func shortDomainDetails(t *testing.T) {
func (u *ut) shortDomainDetails(t *testing.T) {
var app = "details"
endpoint := fmt.Sprintf("http://%s:%v/health", app, 9080)
healthChecker(t, endpoint, nil, "")
u.healthChecker(t, endpoint, nil, "")
}
func fullDomainDetails(t *testing.T) {
func (u *ut) fullDomainDetails(t *testing.T) {
var app = "details"
domains := []string{
fmt.Sprintf("%s.%s.svc.cluster.local", app, namespace),
fmt.Sprintf("%s.%s.svc", app, namespace),
fmt.Sprintf("%s.%s", app, namespace),
fmt.Sprintf("%s.%s.svc.cluster.local", app, u.namespace),
fmt.Sprintf("%s.%s.svc", app, u.namespace),
fmt.Sprintf("%s.%s", app, u.namespace),
}
for _, domain := range domains {
endpoint := fmt.Sprintf("http://%s:%v/health", domain, 9080)
healthChecker(t, endpoint, nil, "")
u.healthChecker(t, endpoint, nil, "")
}
}
func serviceMeshReviewsPodIP(t *testing.T) {
func (u *ut) serviceMeshReviewsPodIP(t *testing.T) {
app := "reviews"
ip, err := getPodIP(app)
ip, err := u.getPodIP(app)
if err != nil {
t.Fatal(err)
}
endpoint := fmt.Sprintf("http://%s:%v/health", ip, 9080)
healthChecker(t, endpoint, nil, remote)
healthChecker(t, endpoint, map[string]string{"env": "test"}, local)
u.healthChecker(t, endpoint, nil, remote)
u.healthChecker(t, endpoint, map[string]string{"env": "test"}, local)
}
func serviceMeshReviewsServiceIP(t *testing.T) {
func (u *ut) serviceMeshReviewsServiceIP(t *testing.T) {
app := "reviews"
ip, err := getServiceIP(app)
ip, err := u.getServiceIP(app)
if err != nil {
t.Fatal(err)
}
endpoint := fmt.Sprintf("http://%s:%v/health", ip, 9080)
healthChecker(t, endpoint, nil, remote)
healthChecker(t, endpoint, map[string]string{"env": "test"}, local)
u.healthChecker(t, endpoint, nil, remote)
u.healthChecker(t, endpoint, map[string]string{"env": "test"}, local)
}
func serviceMeshReviewsServiceIPPortMap(t *testing.T) {
func (u *ut) serviceMeshReviewsServiceIPPortMap(t *testing.T) {
app := "reviews"
ip, err := getServiceIP(app)
ip, err := u.getServiceIP(app)
if err != nil {
t.Fatal(err)
}
endpoint := fmt.Sprintf("http://%s:%v/health", ip, 9080)
healthChecker(t, endpoint, nil, remote)
healthChecker(t, endpoint, map[string]string{"env": "test"}, local8080)
u.healthChecker(t, endpoint, nil, remote)
u.healthChecker(t, endpoint, map[string]string{"env": "test"}, local8080)
}
func getServiceIP(app string) (string, error) {
serviceList, err := clientset.CoreV1().Services(namespace).List(context.Background(), v1.ListOptions{
func (u *ut) getServiceIP(app string) (string, error) {
serviceList, err := u.clientset.CoreV1().Services(u.namespace).List(context.Background(), v1.ListOptions{
LabelSelector: fields.OneTermEqualSelector("app", app).String(),
})
if err != nil {
@@ -293,19 +343,19 @@ func getServiceIP(app string) (string, error) {
return "", fmt.Errorf("failed to found service ip for service %s", app)
}
func proxyServiceReviewsPodIP(t *testing.T) {
func (u *ut) proxyServiceReviewsPodIP(t *testing.T) {
app := "reviews"
ip, err := getPodIP(app)
ip, err := u.getPodIP(app)
if err != nil {
t.Fatal(err)
}
endpoint := fmt.Sprintf("http://%s:%v/health", ip, 9080)
healthChecker(t, endpoint, nil, local)
healthChecker(t, endpoint, map[string]string{"env": "test"}, local)
u.healthChecker(t, endpoint, nil, local)
u.healthChecker(t, endpoint, map[string]string{"env": "test"}, local)
}
func getPodIP(app string) (string, error) {
list, err := clientset.CoreV1().Pods(namespace).List(context.Background(), v1.ListOptions{
func (u *ut) getPodIP(app string) (string, error) {
list, err := u.clientset.CoreV1().Pods(u.namespace).List(context.Background(), v1.ListOptions{
LabelSelector: fields.OneTermEqualSelector("app", app).String(),
})
if err != nil {
@@ -320,24 +370,24 @@ func getPodIP(app string) (string, error) {
return "", fmt.Errorf("failed to found pod ip for service %s", app)
}
func proxyServiceReviewsServiceIP(t *testing.T) {
func (u *ut) proxyServiceReviewsServiceIP(t *testing.T) {
app := "reviews"
ip, err := getServiceIP(app)
ip, err := u.getServiceIP(app)
if err != nil {
t.Fatal(err)
}
endpoint := fmt.Sprintf("http://%s:%v/health", ip, 9080)
healthChecker(t, endpoint, nil, local)
healthChecker(t, endpoint, map[string]string{"env": "test"}, local)
u.healthChecker(t, endpoint, nil, local)
u.healthChecker(t, endpoint, map[string]string{"env": "test"}, local)
}
func testUDP(t *testing.T) {
func (u *ut) testUDP(t *testing.T) {
app := "reviews"
port, err := util.GetAvailableUDPPortOrDie()
if err != nil {
t.Fatal(err)
}
go udpServer(t, port)
go u.udpServer(t, port)
var ip string
err = retry.OnError(
@@ -346,19 +396,19 @@ func testUDP(t *testing.T) {
return err != nil
},
func() error {
ip, err = getPodIP(app)
ip, err = u.getPodIP(app)
if err != nil {
t.Fatal(err)
}
t.Logf("Dail udp to IP: %s", ip)
return udpClient(t, ip, port)
return u.udpClient(t, ip, port)
})
if err != nil {
t.Fatalf("Failed to access pod IP: %s, port: %v", ip, port)
}
}
func udpClient(t *testing.T, ip string, port int) error {
func (u *ut) udpClient(t *testing.T, ip string, port int) error {
udpConn, err := net.DialUDP("udp4", nil, &net.UDPAddr{
IP: net.ParseIP(ip),
Port: port,
@@ -390,7 +440,7 @@ func udpClient(t *testing.T, ip string, port int) error {
return nil
}
func udpServer(t *testing.T, port int) {
func (u *ut) udpServer(t *testing.T, port int) {
// Create the UDP listener
udpConn, err := net.ListenUDP("udp4", &net.UDPAddr{
IP: net.ParseIP("127.0.0.1"),
@@ -420,7 +470,7 @@ func udpServer(t *testing.T, port int) {
}
}
func kubevpnConnect(t *testing.T) {
func (u *ut) kubevpnConnect(t *testing.T) {
cmd := exec.Command("kubevpn", "connect", "--debug")
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
@@ -430,8 +480,8 @@ func kubevpnConnect(t *testing.T) {
}
}
func kubevpnConnectToNsKubevpn(t *testing.T) {
_, err := clientset.CoreV1().Namespaces().Create(context.Background(), &corev1.Namespace{
func (u *ut) kubevpnConnectToNsKubevpn(t *testing.T) {
_, err := u.clientset.CoreV1().Namespaces().Create(context.Background(), &corev1.Namespace{
ObjectMeta: v1.ObjectMeta{
Name: "kubevpn",
},
@@ -451,7 +501,7 @@ func kubevpnConnectToNsKubevpn(t *testing.T) {
}
}
func kubevpnProxy(t *testing.T) {
func (u *ut) kubevpnProxy(t *testing.T) {
cmd := exec.Command("kubevpn", "proxy", "deployments/reviews", "--debug")
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
@@ -461,7 +511,7 @@ func kubevpnProxy(t *testing.T) {
}
}
func kubevpnProxyWithServiceMesh(t *testing.T) {
func (u *ut) kubevpnProxyWithServiceMesh(t *testing.T) {
cmd := exec.Command("kubevpn", "proxy", "deployments/reviews", "--headers", "env=test", "--debug")
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
@@ -471,7 +521,7 @@ func kubevpnProxyWithServiceMesh(t *testing.T) {
}
}
func kubevpnProxyWithServiceMeshAndGvisorMode(t *testing.T) {
func (u *ut) kubevpnProxyWithServiceMeshAndFargateMode(t *testing.T) {
cmd := exec.Command("kubevpn", "proxy", "svc/reviews", "--headers", "env=test", "--debug")
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
@@ -481,7 +531,7 @@ func kubevpnProxyWithServiceMeshAndGvisorMode(t *testing.T) {
}
}
func kubevpnProxyWithServiceMeshAndK8sServicePortMap(t *testing.T) {
func (u *ut) kubevpnProxyWithServiceMeshAndK8sServicePortMap(t *testing.T) {
cmd := exec.Command("kubevpn", "proxy", "svc/reviews", "--headers", "env=test", "--debug", "--portmap", "9080:8080")
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
@@ -491,7 +541,7 @@ func kubevpnProxyWithServiceMeshAndK8sServicePortMap(t *testing.T) {
}
}
func kubevpnLeave(t *testing.T) {
func (u *ut) kubevpnLeave(t *testing.T) {
cmd := exec.Command("kubevpn", "leave", "deployments/reviews")
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
@@ -501,7 +551,7 @@ func kubevpnLeave(t *testing.T) {
}
}
func kubevpnLeaveService(t *testing.T) {
func (u *ut) kubevpnLeaveService(t *testing.T) {
cmd := exec.Command("kubevpn", "leave", "services/reviews")
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
@@ -511,7 +561,7 @@ func kubevpnLeaveService(t *testing.T) {
}
}
func checkConnectStatus(t *testing.T) {
func (u *ut) checkConnectStatus(t *testing.T) {
cmd := exec.Command("kubevpn", "status", "-o", "json")
output, err := cmd.Output()
if err != nil {
@@ -519,7 +569,7 @@ func checkConnectStatus(t *testing.T) {
}
expect := status{List: []*connection{{
Namespace: namespace,
Namespace: u.namespace,
Status: "connected",
ProxyList: nil,
}}}
@@ -533,7 +583,7 @@ func checkConnectStatus(t *testing.T) {
t.Fatalf("expect: %s, but was: %s", string(marshal), string(output))
}
}
func centerCheckConnectStatus(t *testing.T) {
func (u *ut) centerCheckConnectStatus(t *testing.T) {
cmd := exec.Command("kubevpn", "status", "-o", "json")
output, err := cmd.Output()
if err != nil {
@@ -563,20 +613,29 @@ type status struct {
type connection struct {
Namespace string
Status string
ProxyList []*proxy
ProxyList []*proxyItem
SyncList []*syncItem
}
type proxy struct {
type proxyItem struct {
Namespace string
Workload string
RuleList []*rule
RuleList []*proxyRule
}
type rule struct {
type proxyRule struct {
Headers map[string]string
CurrentDevice bool
PortMap map[int32]int32
}
type syncItem struct {
Namespace string
Workload string
RuleList []*syncRule
}
type syncRule struct {
DstWorkload string
}
func checkProxyStatus(t *testing.T) {
func (u *ut) checkProxyStatus(t *testing.T) {
cmd := exec.Command("kubevpn", "status", "-o", "json")
output, err := cmd.Output()
if err != nil {
@@ -584,12 +643,12 @@ func checkProxyStatus(t *testing.T) {
}
expect := status{List: []*connection{{
Namespace: namespace,
Namespace: u.namespace,
Status: "connected",
ProxyList: []*proxy{{
Namespace: namespace,
ProxyList: []*proxyItem{{
Namespace: u.namespace,
Workload: "deployments.apps/reviews",
RuleList: []*rule{{
RuleList: []*proxyRule{{
Headers: nil,
CurrentDevice: true,
PortMap: map[int32]int32{9080: 9080},
@@ -607,7 +666,7 @@ func checkProxyStatus(t *testing.T) {
}
}
func centerCheckProxyStatus(t *testing.T) {
func (u *ut) centerCheckProxyStatus(t *testing.T) {
cmd := exec.Command("kubevpn", "status", "-o", "json")
output, err := cmd.Output()
if err != nil {
@@ -617,10 +676,10 @@ func centerCheckProxyStatus(t *testing.T) {
expect := status{List: []*connection{{
Namespace: "default",
Status: "connected",
ProxyList: []*proxy{{
ProxyList: []*proxyItem{{
Namespace: "default",
Workload: "deployments.apps/reviews",
RuleList: []*rule{{
RuleList: []*proxyRule{{
Headers: nil,
CurrentDevice: true,
PortMap: map[int32]int32{9080: 9080},
@@ -638,7 +697,7 @@ func centerCheckProxyStatus(t *testing.T) {
}
}
func checkProxyWithServiceMeshStatus(t *testing.T) {
func (u *ut) checkProxyWithServiceMeshStatus(t *testing.T) {
cmd := exec.Command("kubevpn", "status", "-o", "json")
output, err := cmd.Output()
if err != nil {
@@ -646,12 +705,12 @@ func checkProxyWithServiceMeshStatus(t *testing.T) {
}
expect := status{List: []*connection{{
Namespace: namespace,
Namespace: u.namespace,
Status: "connected",
ProxyList: []*proxy{{
Namespace: namespace,
ProxyList: []*proxyItem{{
Namespace: u.namespace,
Workload: "deployments.apps/reviews",
RuleList: []*rule{{
RuleList: []*proxyRule{{
Headers: map[string]string{"env": "test"},
CurrentDevice: true,
PortMap: map[int32]int32{9080: 9080},
@@ -669,7 +728,7 @@ func checkProxyWithServiceMeshStatus(t *testing.T) {
}
}
func centerCheckProxyWithServiceMeshStatus(t *testing.T) {
func (u *ut) centerCheckProxyWithServiceMeshStatus(t *testing.T) {
cmd := exec.Command("kubevpn", "status", "-o", "json")
output, err := cmd.Output()
if err != nil {
@@ -679,10 +738,10 @@ func centerCheckProxyWithServiceMeshStatus(t *testing.T) {
expect := status{List: []*connection{{
Namespace: "default",
Status: "connected",
ProxyList: []*proxy{{
ProxyList: []*proxyItem{{
Namespace: "default",
Workload: "deployments.apps/reviews",
RuleList: []*rule{{
RuleList: []*proxyRule{{
Headers: map[string]string{"env": "test"},
CurrentDevice: true,
PortMap: map[int32]int32{9080: 9080},
@@ -700,7 +759,7 @@ func centerCheckProxyWithServiceMeshStatus(t *testing.T) {
}
}
func checkProxyWithServiceMeshAndGvisorStatus(t *testing.T) {
func (u *ut) checkProxyWithServiceMeshAndGvisorStatus(t *testing.T) {
cmd := exec.Command("kubevpn", "status", "-o", "json")
output, err := cmd.Output()
if err != nil {
@@ -708,12 +767,12 @@ func checkProxyWithServiceMeshAndGvisorStatus(t *testing.T) {
}
expect := status{List: []*connection{{
Namespace: namespace,
Namespace: u.namespace,
Status: "connected",
ProxyList: []*proxy{{
Namespace: namespace,
ProxyList: []*proxyItem{{
Namespace: u.namespace,
Workload: "services/reviews",
RuleList: []*rule{{
RuleList: []*proxyRule{{
Headers: map[string]string{"env": "test"},
CurrentDevice: true,
PortMap: map[int32]int32{9080: 9080},
@@ -738,7 +797,7 @@ func checkProxyWithServiceMeshAndGvisorStatus(t *testing.T) {
}
}
func centerCheckProxyWithServiceMeshAndGvisorStatus(t *testing.T) {
func (u *ut) centerCheckProxyWithServiceMeshAndGvisorStatus(t *testing.T) {
cmd := exec.Command("kubevpn", "status", "-o", "json")
output, err := cmd.Output()
if err != nil {
@@ -748,10 +807,10 @@ func centerCheckProxyWithServiceMeshAndGvisorStatus(t *testing.T) {
expect := status{List: []*connection{{
Namespace: "default",
Status: "connected",
ProxyList: []*proxy{{
ProxyList: []*proxyItem{{
Namespace: "default",
Workload: "services/reviews",
RuleList: []*rule{{
RuleList: []*proxyRule{{
Headers: map[string]string{"env": "test"},
CurrentDevice: true,
PortMap: map[int32]int32{9080: 8080},
@@ -776,7 +835,7 @@ func centerCheckProxyWithServiceMeshAndGvisorStatus(t *testing.T) {
}
}
func kubevpnUninstall(t *testing.T) {
func (u *ut) kubevpnUninstall(t *testing.T) {
cmd := exec.Command("kubevpn", "uninstall", "kubevpn")
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
@@ -786,7 +845,7 @@ func kubevpnUninstall(t *testing.T) {
}
}
func kubevpnStatus(t *testing.T) {
func (u *ut) kubevpnStatus(t *testing.T) {
cmd := exec.Command("kubevpn", "status")
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
@@ -796,7 +855,7 @@ func kubevpnStatus(t *testing.T) {
}
}
func kubevpnQuit(t *testing.T) {
func (u *ut) kubevpnQuit(t *testing.T) {
cmd := exec.Command("kubevpn", "quit")
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
@@ -806,14 +865,14 @@ func kubevpnQuit(t *testing.T) {
}
}
func checkServiceShouldNotInNsDefault(t *testing.T) {
_, err := clientset.CoreV1().Services(namespace).Get(context.Background(), pkgconfig.ConfigMapPodTrafficManager, v1.GetOptions{})
func (u *ut) checkServiceShouldNotInNsDefault(t *testing.T) {
_, err := u.clientset.CoreV1().Services(u.namespace).Get(context.Background(), pkgconfig.ConfigMapPodTrafficManager, v1.GetOptions{})
if !k8serrors.IsNotFound(err) {
t.Fatal(err)
}
}
func kubectl(t *testing.T) {
func (u *ut) kubectl(t *testing.T) {
cmdGetPod := exec.Command("kubectl", "get", "pods", "-o", "wide")
cmdDescribePod := exec.Command("kubectl", "describe", "pods")
cmdGetSvc := exec.Command("kubectl", "get", "services", "-o", "wide")
@@ -829,27 +888,27 @@ func kubectl(t *testing.T) {
}
}
func Init(t *testing.T) {
func (u *ut) init(t *testing.T) {
var err error
configFlags := genericclioptions.NewConfigFlags(true)
f := cmdutil.NewFactory(cmdutil.NewMatchVersionFlags(configFlags))
if restconfig, err = f.ToRESTConfig(); err != nil {
if u.restconfig, err = f.ToRESTConfig(); err != nil {
t.Fatal(err)
}
if clientset, err = kubernetes.NewForConfig(restconfig); err != nil {
if u.clientset, err = kubernetes.NewForConfig(u.restconfig); err != nil {
t.Fatal(err)
}
if namespace, _, err = f.ToRawKubeConfigLoader().Namespace(); err != nil {
if u.namespace, _, err = f.ToRawKubeConfigLoader().Namespace(); err != nil {
t.Fatal(err)
}
go startupHttpServer(t, "localhost:9080", local)
go startupHttpServer(t, "localhost:8080", local8080)
go u.startupHttpServer(t, "localhost:9080", local)
go u.startupHttpServer(t, "localhost:8080", local8080)
}
func startupHttpServer(t *testing.T, addr, str string) {
func (u *ut) startupHttpServer(t *testing.T, addr, str string) {
var health = func(w http.ResponseWriter, r *http.Request) {
_, _ = w.Write([]byte(str))
}

View File

@@ -0,0 +1,240 @@
package handler
import (
	"context"
	"encoding/json"
	"fmt"
	"os"
	"os/exec"
	"path/filepath"
	"reflect"
	"strings"
	"sync"
	"testing"
	"time"

	"github.com/wencaiwulue/kubevpn/v2/pkg/util"
)
// deleteDeployForSaveResource force-deletes the productpage and ratings
// deployments via kubectl so the remaining tests consume fewer cluster
// resources. Any kubectl failure aborts the test immediately.
func (u *ut) deleteDeployForSaveResource(t *testing.T) {
	targets := []string{"deploy/productpage", "deploy/ratings"}
	for _, target := range targets {
		deleteCmd := exec.Command("kubectl", "delete", target, "--force")
		deleteCmd.Stdout = os.Stdout
		deleteCmd.Stderr = os.Stderr
		if err := deleteCmd.Run(); err != nil {
			t.Fatal(err)
		}
	}
}
// resetDeployAuthors restores deploy/authors to its original (pre-kubevpn)
// state by invoking `kubevpn reset`, failing the test if the command errors.
func (u *ut) resetDeployAuthors(t *testing.T) {
	resetCmd := exec.Command("kubevpn", "reset", "deploy/authors")
	resetCmd.Stdout = os.Stdout
	resetCmd.Stderr = os.Stderr
	if err := resetCmd.Run(); err != nil {
		t.Fatalf("error resetting deploy/authors: %v", err)
	}
}
// kubevpnRunWithFullProxy exercises `kubevpn run deploy/authors` in full-proxy
// mode (no headers): all traffic for the authors workload is redirected into
// the locally started container. It mounts the temp Go program written by
// writeTempFile into the container, waits until that server logs that it is
// listening, then asserts:
//   - the container (reachable on localPort) serves the in-pod payload for
//     both plain and env=test requests,
//   - the local fake server on 9080 still serves the local payload,
// runs the status and common sub-tests, and finally interrupts kubevpn and
// waits for the process to exit.
func (u *ut) kubevpnRunWithFullProxy(t *testing.T) {
	path := u.writeTempFile(t)
	name := filepath.Base(path)
	dir := filepath.Dir(path)
	remoteDir := "/app/test"
	ctx, cancelFunc := context.WithCancel(context.Background())
	defer cancelFunc()
	localPort := 9090
	cmd := exec.CommandContext(ctx, "kubevpn",
		"run", "deploy/authors",
		"-c", "authors",
		"--debug",
		"-v", fmt.Sprintf("%s:%s", dir, remoteDir),
		"-p", fmt.Sprintf("%d:9080", localPort),
		"--tty=false", //https://github.com/actions/runner/issues/241
		"--rm",
		"--entrypoint", "go", "run", fmt.Sprintf("%s/%s", remoteDir, name),
	)
	done := make(chan any)
	var once = &sync.Once{}
	go func() {
		stdout, stderr, err := util.RunWithRollingOutWithChecker(cmd, func(log string) (stop bool) {
			contains := strings.Contains(log, "Start listening http port 9080 ...")
			if contains {
				once.Do(func() {
					close(done)
				})
			}
			return contains
		})
		if err != nil {
			select {
			case <-done:
				t.Log(err, stdout, stderr)
			default:
				// t.Fatal must not be called from a goroutine other than the
				// one running the test (testing package docs); report the
				// failure with t.Error and unblock the main goroutine so the
				// test does not deadlock on <-done.
				t.Error(err, stdout, stderr)
				once.Do(func() {
					close(done)
				})
			}
		}
	}()
	<-done
	app := "authors"
	ip, err := u.getPodIP(app)
	if err != nil {
		t.Fatal(err)
	}
	endpoint := fmt.Sprintf("http://%s:%v/health", ip, localPort)
	u.healthChecker(t, endpoint, nil, remoteSyncPod)
	u.healthChecker(t, endpoint, map[string]string{"env": "test"}, remoteSyncPod)
	endpoint = fmt.Sprintf("http://%s:%v/health", ip, 9080)
	u.healthChecker(t, endpoint, nil, local)
	u.healthChecker(t, endpoint, map[string]string{"env": "test"}, local)
	t.Run("kubevpnRunWithFullProxyStatus", u.checkRunWithFullProxyStatus)
	t.Run("commonTest", u.commonTest)
	// cmd.Process can be nil if the process never started.
	if cmd.Process != nil {
		if err = cmd.Process.Signal(os.Interrupt); err != nil {
			t.Fatal(err)
		}
	}
	// Poll instead of busy-spinning so we don't burn a CPU core while the
	// kubevpn process shuts down (Wait is already called inside the helper
	// goroutine, so we cannot call cmd.Wait here a second time).
	for cmd.ProcessState == nil {
		time.Sleep(100 * time.Millisecond)
	}
}
// kubevpnRunWithServiceMesh runs `kubevpn run deploy/authors` with the header
// rule env=test, waits until the locally started replacement process logs that
// it is listening, then verifies routing: requests to the local port reach the
// local process, while at the service IP only requests carrying the header are
// diverted and plain requests still reach the original workload.
func (u *ut) kubevpnRunWithServiceMesh(t *testing.T) {
	path := u.writeTempFile(t)
	name := filepath.Base(path)
	dir := filepath.Dir(path)
	remoteDir := "/app/test"
	ctx, cancelFunc := context.WithCancel(context.Background())
	defer cancelFunc()
	localPort := 9090
	cmd := exec.CommandContext(ctx, "kubevpn",
		"run", "deploy/authors",
		"-c", "authors",
		"--debug",
		"--headers", "env=test",
		"-v", fmt.Sprintf("%s:%s", dir, remoteDir),
		"-p", fmt.Sprintf("%d:9080", localPort),
		"--tty=false", //https://github.com/actions/runner/issues/241
		"--rm",
		"--entrypoint", "go", "run", fmt.Sprintf("%s/%s", remoteDir, name),
	)
	done := make(chan any)
	var once = &sync.Once{}
	go func() {
		// NOTE(review): t.Fatal from a goroutine other than the one running
		// the test is not supported by the testing package; consider t.Error
		// plus a signal channel — left as-is to keep this change minimal.
		stdout, stderr, err := util.RunWithRollingOutWithChecker(cmd, func(log string) (stop bool) {
			contains := strings.Contains(log, "Start listening http port 9080 ...")
			if contains {
				// Signal readiness exactly once; the checker may match again.
				once.Do(func() {
					close(done)
				})
			}
			return contains
		})
		if err != nil {
			select {
			case <-done:
				t.Log(err, stdout, stderr)
			default:
				t.Fatal(err, stdout, stderr)
			}
		}
	}()
	<-done
	app := "authors"
	ip, err := u.getServiceIP(app)
	if err != nil {
		t.Fatal(err)
	}
	endpoint := fmt.Sprintf("http://%s:%v/health", "localhost", localPort)
	u.healthChecker(t, endpoint, map[string]string{"env": "test"}, remoteSyncPod)
	u.healthChecker(t, endpoint, nil, remoteSyncPod)
	endpoint = fmt.Sprintf("http://%s:%v/health", ip, 9080)
	u.healthChecker(t, endpoint, nil, remoteSyncOrigin)
	u.healthChecker(t, endpoint, map[string]string{"env": "test"}, local)
	t.Run("kubevpnRunWithServiceMeshStatus", u.checkRunWithServiceMeshStatus)
	t.Run("commonTest", u.commonTest)
	err = cmd.Process.Signal(os.Interrupt)
	if err != nil {
		t.Fatal(err)
	}
	// BUG FIX: the original busy-waited with an empty loop body, pinning a CPU
	// core. Sleep between polls while the internal Wait (inside
	// RunWithRollingOutWithChecker) populates ProcessState.
	for cmd.ProcessState == nil {
		time.Sleep(time.Millisecond * 50)
	}
}
// checkRunWithFullProxyStatus asserts that `kubevpn status -o json` reports a
// single connected namespace whose authors deployment is proxied to the
// current device with ports 9080 and 80 mapped and no header rules.
func (u *ut) checkRunWithFullProxyStatus(t *testing.T) {
	output, err := exec.Command("kubevpn", "status", "-o", "json").Output()
	if err != nil {
		t.Fatal(err, string(output))
	}
	var actual status
	if err = json.Unmarshal(output, &actual); err != nil {
		t.Fatal(err)
	}
	rule := &proxyRule{
		Headers:       nil,
		CurrentDevice: true,
		PortMap:       map[int32]int32{9080: 9080, 80: 80},
	}
	proxy := &proxyItem{
		Namespace: u.namespace,
		Workload:  "deployments.apps/authors",
		RuleList:  []*proxyRule{rule},
	}
	expect := status{List: []*connection{{
		Namespace: u.namespace,
		Status:    "connected",
		ProxyList: []*proxyItem{proxy},
	}}}
	if !reflect.DeepEqual(actual, expect) {
		want, _ := json.Marshal(expect)
		got, _ := json.Marshal(actual)
		t.Fatalf("expect: %s, but was: %s", string(want), string(got))
	}
}
// checkRunWithServiceMeshStatus asserts that `kubevpn status -o json` reports
// a single connected namespace whose authors deployment is proxied to the
// current device with ports 9080 and 80 mapped under the env=test header rule.
func (u *ut) checkRunWithServiceMeshStatus(t *testing.T) {
	output, err := exec.Command("kubevpn", "status", "-o", "json").Output()
	if err != nil {
		t.Fatal(err, string(output))
	}
	var actual status
	if err = json.Unmarshal(output, &actual); err != nil {
		t.Fatal(err)
	}
	rule := &proxyRule{
		Headers:       map[string]string{"env": "test"},
		CurrentDevice: true,
		PortMap:       map[int32]int32{9080: 9080, 80: 80},
	}
	proxy := &proxyItem{
		Namespace: u.namespace,
		Workload:  "deployments.apps/authors",
		RuleList:  []*proxyRule{rule},
	}
	expect := status{List: []*connection{{
		Namespace: u.namespace,
		Status:    "connected",
		ProxyList: []*proxyItem{proxy},
	}}}
	if !reflect.DeepEqual(actual, expect) {
		want, _ := json.Marshal(expect)
		got, _ := json.Marshal(actual)
		t.Fatalf("expect: %s, but was: %s", string(want), string(got))
	}
}

View File

@@ -0,0 +1,304 @@
package handler
import (
"context"
"encoding/json"
"fmt"
"os"
"os/exec"
"path/filepath"
"reflect"
"strings"
"testing"
"time"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/retry"
"github.com/wencaiwulue/kubevpn/v2/pkg/util"
)
// content is the Go source of a minimal HTTP server that the tests sync into
// the remote container and execute there with `go run`. It listens on :9080
// and answers GET /health with {"status": "Authors is healthy in pod"}, which
// lets the tests distinguish the synced process from the original workload.
var content = `package main

import (
	"encoding/json"
	"log"
	"net/http"
)

func main() {
	http.HandleFunc("/health", health)
	log.Println("Start listening http port 9080 ...")
	if err := http.ListenAndServe(":9080", nil); err != nil {
		panic(err)
	}
}

func health(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}
	resp, err := json.Marshal(map[string]string{
		"status": "Authors is healthy in pod",
	})
	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		log.Println(err)
		return
	}
	w.Write(resp)
}`
// kubevpnSyncWithFullProxy syncs deploy/authors without header rules and
// verifies that requests to the service endpoint — first without, then with
// the test header — are both answered by the synced pod.
func (u *ut) kubevpnSyncWithFullProxy(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	url := u.kubevpnSync(t, ctx, false)
	for _, headers := range []map[string]string{nil, {"env": "test"}} {
		u.healthChecker(t, url, headers, remoteSyncPod)
	}
}
// kubevpnSync launches `kubevpn sync deploy/authors` (optionally with the
// env=test header rule), locates the cloned pod, verifies the synced file
// arrived, starts the test server inside the pod in the background, and
// returns the service health endpoint for further assertions.
func (u *ut) kubevpnSync(t *testing.T, ctx context.Context, isServiceMesh bool) string {
	tempFile := u.writeTempFile(t)
	remoteDir := "/app/test"
	syncArgs := []string{
		"sync",
		"deploy/authors",
		"--debug",
		"--sync", fmt.Sprintf("%s:%s", filepath.Dir(tempFile), remoteDir),
		"-c", "authors",
		"--target-image", "ghcr.io/kubenetworks/authors:latest",
	}
	if isServiceMesh {
		syncArgs = append(syncArgs, "--headers", "env=test")
	}
	syncCmd := exec.Command("kubevpn", syncArgs...)
	syncCmd.Stdout, syncCmd.Stderr = os.Stdout, os.Stderr
	if err := syncCmd.Run(); err != nil {
		t.Fatal(err)
	}
	// The cloned pod carries an origin-workload label pointing back at authors.
	selector := fields.OneTermEqualSelector("origin-workload", "authors").String()
	pods, err := util.GetRunningPodList(ctx, u.clientset, u.namespace, selector)
	if err != nil {
		t.Fatal(err)
	}
	if len(pods) == 0 {
		t.Fatal("expect at least one pod")
	}
	remotePath := fmt.Sprintf("%s/%s", remoteDir, filepath.Base(tempFile))
	containerName := "authors"
	u.checkContent(ctx, t, pods[0].Name, containerName, remotePath)
	go u.execServer(ctx, t, pods[0].Name, containerName, remotePath)
	podEndpoint := fmt.Sprintf("http://%s:%v/health", pods[0].Status.PodIP, 9080)
	u.healthChecker(t, podEndpoint, nil, remoteSyncPod)
	ip, err := u.getServiceIP("authors")
	if err != nil {
		t.Fatal(err)
	}
	return fmt.Sprintf("http://%s:%v/health", ip, 9080)
}
// execServer keeps the synced test server alive: it executes
// `go run <remotePath>` inside the given container and, whenever the process
// exits, restarts it after a short pause until ctx is cancelled.
func (u *ut) execServer(ctx context.Context, t *testing.T, podName string, containerName string, remotePath string) {
	for {
		if ctx.Err() != nil {
			return
		}
		out, err := util.Shell(
			ctx,
			u.clientset,
			u.restconfig,
			podName,
			containerName,
			u.namespace,
			[]string{"go", "run", remotePath},
		)
		if err != nil {
			t.Log(err, out)
		}
		time.Sleep(time.Second * 1)
	}
}
// kubevpnSyncWithServiceMesh syncs deploy/authors under the env=test header
// rule and verifies routing: plain requests still reach the original workload
// while requests carrying the header reach the synced pod.
func (u *ut) kubevpnSyncWithServiceMesh(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	url := u.kubevpnSync(t, ctx, true)
	u.healthChecker(t, url, nil, remoteSyncOrigin)
	u.healthChecker(t, url, map[string]string{"env": "test"}, remoteSyncPod)
}
// checkContent polls (1s interval, up to 120 attempts) until `cat remotePath`
// inside the given container returns exactly the synced source file, proving
// the sync engine delivered the file into the pod.
func (u *ut) checkContent(ctx context.Context, t *testing.T, podName string, containerName string, remotePath string) {
	backoff := wait.Backoff{Duration: time.Second, Factor: 1, Jitter: 0, Steps: 120}
	retriable := func(err error) bool { return err != nil }
	check := func() error {
		shell, err := util.Shell(
			ctx,
			u.clientset,
			u.restconfig,
			podName,
			containerName,
			u.namespace,
			[]string{"cat", remotePath},
		)
		if err != nil {
			return err
		}
		if strings.TrimSpace(shell) != strings.TrimSpace(content) {
			t.Logf("expect: %s, but was: %s", content, shell)
			return fmt.Errorf("expect: %s, but was: %s", content, shell)
		}
		return nil
	}
	if err := retry.OnError(backoff, retriable, check); err != nil {
		t.Fatal(err)
	}
}
// TestCompile is a smoke check that only verifies writeTempFile succeeds.
// Note it is a method on *ut, so `go test` does not pick it up directly;
// presumably it is invoked from a suite driver — confirm before relying on it.
func (u *ut) TestCompile(t *testing.T) {
	u.writeTempFile(t)
}
// writeTempFile writes the embedded server source (`content`) to
// <t.TempDir()>/main.go with mode 0755 and returns the file's path.
// The directory is removed automatically when the test finishes.
func (u *ut) writeTempFile(t *testing.T) string {
	file := filepath.Join(t.TempDir(), "main.go")
	// os.WriteFile replaces the original Create/WriteString/Close sequence.
	if err := os.WriteFile(file, []byte(content), 0644); err != nil {
		t.Fatal(err)
	}
	// Chmod explicitly: os.WriteFile's permission argument is subject to the
	// process umask, while the original Chmod guaranteed exactly 0755.
	if err := os.Chmod(file, 0755); err != nil {
		t.Fatal(err)
	}
	return file
}
// checkSyncWithFullProxyStatus asserts that after `kubevpn sync` (no headers)
// the status reports the authors deployment proxied away from the current
// device plus one sync rule. The generated destination workload name contains
// a random suffix, so it is copied from the actual status before comparing.
func (u *ut) checkSyncWithFullProxyStatus(t *testing.T) {
	cmd := exec.Command("kubevpn", "status", "-o", "json")
	output, err := cmd.Output()
	if err != nil {
		t.Fatal(err, string(output))
	}
	expect := status{List: []*connection{{
		Namespace: u.namespace,
		Status:    "connected",
		ProxyList: []*proxyItem{{
			Namespace: u.namespace,
			Workload:  "deployments.apps/authors",
			RuleList: []*proxyRule{{
				Headers:       nil,
				CurrentDevice: false,
				PortMap:       map[int32]int32{9080: 9080, 80: 80},
			}},
		}},
		SyncList: []*syncItem{{
			Namespace: u.namespace,
			Workload:  "deploy/authors",
			RuleList:  []*syncRule{{}},
		}},
	}}}
	var statuses status
	if err = json.Unmarshal(output, &statuses); err != nil {
		t.Fatal(err)
	}
	// BUG FIX: the guard previously inspected the hand-built `expect` literal,
	// which always contains the rule, so it could never trip and the index
	// into `statuses` below could panic. Validate the parsed status instead.
	if len(statuses.List) == 0 || len(statuses.List[0].SyncList) == 0 || len(statuses.List[0].SyncList[0].RuleList) == 0 {
		t.Fatal("expect List[0].SyncList[0].RuleList[0] not found", string(output))
	}
	expect.List[0].SyncList[0].RuleList[0].DstWorkload = statuses.List[0].SyncList[0].RuleList[0].DstWorkload
	if !reflect.DeepEqual(statuses, expect) {
		marshal, _ := json.Marshal(expect)
		marshalB, _ := json.Marshal(statuses)
		t.Fatalf("expect: %s, but was: %s", string(marshal), string(marshalB))
	}
}
// kubevpnUnSync reads the current status, extracts the generated destination
// workload of the first sync rule, and removes it via `kubevpn unsync`.
func (u *ut) kubevpnUnSync(t *testing.T) {
	output, err := exec.Command("kubevpn", "status", "-o", "json").Output()
	if err != nil {
		t.Fatal(err, string(output))
	}
	var st status
	if err = json.Unmarshal(output, &st); err != nil {
		t.Fatal(err)
	}
	if len(st.List) == 0 || len(st.List[0].SyncList) == 0 || len(st.List[0].SyncList[0].RuleList) == 0 {
		t.Fatal("expect List[0].SyncList[0].RuleList[0] not found", string(output))
	}
	unsync := exec.Command("kubevpn", "unsync", st.List[0].SyncList[0].RuleList[0].DstWorkload)
	unsync.Stdout, unsync.Stderr = os.Stdout, os.Stderr
	if err = unsync.Run(); err != nil {
		t.Fatal(err)
	}
}
// checkSyncWithServiceMeshStatus asserts that after `kubevpn sync` with the
// env=test header rule the status reports the authors deployment proxied away
// from the current device plus one sync rule. The generated destination
// workload name contains a random suffix, so it is copied from the actual
// status before comparing.
func (u *ut) checkSyncWithServiceMeshStatus(t *testing.T) {
	cmd := exec.Command("kubevpn", "status", "-o", "json")
	output, err := cmd.Output()
	if err != nil {
		t.Fatal(err, string(output))
	}
	expect := status{List: []*connection{{
		Namespace: u.namespace,
		Status:    "connected",
		ProxyList: []*proxyItem{{
			Namespace: u.namespace,
			Workload:  "deployments.apps/authors",
			RuleList: []*proxyRule{{
				Headers:       map[string]string{"env": "test"},
				CurrentDevice: false,
				PortMap:       map[int32]int32{9080: 9080, 80: 80},
			}},
		}},
		SyncList: []*syncItem{{
			Namespace: u.namespace,
			Workload:  "deploy/authors",
			RuleList:  []*syncRule{{}},
		}},
	}}}
	var statuses status
	if err = json.Unmarshal(output, &statuses); err != nil {
		t.Fatal(err)
	}
	// BUG FIX: the guard previously inspected the hand-built `expect` literal,
	// which always contains the rule, so it could never trip and the index
	// into `statuses` below could panic. Validate the parsed status instead.
	if len(statuses.List) == 0 || len(statuses.List[0].SyncList) == 0 || len(statuses.List[0].SyncList[0].RuleList) == 0 {
		t.Fatal("expect List[0].SyncList[0].RuleList[0] not found", string(output))
	}
	expect.List[0].SyncList[0].RuleList[0].DstWorkload = statuses.List[0].SyncList[0].RuleList[0].DstWorkload
	if !reflect.DeepEqual(statuses, expect) {
		marshal, _ := json.Marshal(expect)
		marshalB, _ := json.Marshal(statuses)
		t.Fatalf("expect: %s, but was: %s", string(marshal), string(marshalB))
	}
}

View File

@@ -143,7 +143,11 @@ func createOutboundPod(ctx context.Context, clientset *kubernetes.Clientset, nam
return err
}
return waitPodReady(ctx, deploy, clientset.CoreV1().Pods(namespace))
_, selector, err := polymorphichelpers.SelectorsForObject(deploy)
if err != nil {
return err
}
return WaitPodReady(ctx, clientset.CoreV1().Pods(namespace), selector.String())
}
func genServiceAccount(namespace string) *v1.ServiceAccount {
@@ -426,11 +430,7 @@ func genDeploySpec(namespace, tcp10801, tcp9002, udp53, tcp80, image, imagePullS
return deploy
}
func waitPodReady(ctx context.Context, deploy *appsv1.Deployment, clientset corev1.PodInterface) error {
_, selector, err := polymorphichelpers.SelectorsForObject(deploy)
if err != nil {
return err
}
func WaitPodReady(ctx context.Context, clientset corev1.PodInterface, labelSelector string) error {
var isPodReady bool
var lastMessage string
ctx2, cancelFunc := context.WithTimeout(ctx, time.Minute*60)
@@ -438,10 +438,10 @@ func waitPodReady(ctx context.Context, deploy *appsv1.Deployment, clientset core
plog.G(ctx).Infoln()
wait.UntilWithContext(ctx2, func(ctx context.Context) {
podList, err := clientset.List(ctx2, metav1.ListOptions{
LabelSelector: selector.String(),
LabelSelector: labelSelector,
})
if err != nil {
plog.G(ctx).Errorf("Failed to list pods for %s: %v", deploy.Name, err)
plog.G(ctx).Errorf("Failed to list pods for %s: %v", labelSelector, err)
return
}
@@ -479,8 +479,8 @@ func waitPodReady(ctx context.Context, deploy *appsv1.Deployment, clientset core
}, time.Second*3)
if !isPodReady {
plog.G(ctx).Errorf("Wait pod %s to be ready timeout", deploy.Name)
return errors.New(fmt.Sprintf("wait pod %s to be ready timeout", deploy.Name))
plog.G(ctx).Errorf("Wait pod %s to be ready timeout", labelSelector)
return errors.New(fmt.Sprintf("wait pod %s to be ready timeout", labelSelector))
}
return nil

View File

@@ -5,12 +5,15 @@ import (
"encoding/json"
"fmt"
"net"
"net/netip"
"net/url"
"sort"
"strconv"
"strings"
"time"
"github.com/google/uuid"
"github.com/joho/godotenv"
libconfig "github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/netutil"
v1 "k8s.io/api/core/v1"
@@ -254,7 +257,7 @@ func (d *SyncOptions) DoSync(ctx context.Context, kubeconfigJsonBytes []byte, im
plog.G(ctx).Infof("Create sync resource %s/%s in target cluster", u.GetObjectKind().GroupVersionKind().GroupKind().String(), u.GetName())
plog.G(ctx).Infof("Wait for sync resource %s/%s to be ready", u.GetObjectKind().GroupVersionKind().GroupKind().String(), u.GetName())
plog.G(ctx).Infoln()
err = util.WaitPodToBeReady(ctx, d.clientset.CoreV1().Pods(d.Namespace), metav1.LabelSelector{MatchLabels: labelsMap})
err = WaitPodReady(ctx, d.clientset.CoreV1().Pods(d.Namespace), fields.SelectorFromSet(labelsMap).String())
if err != nil {
return err
}
@@ -283,7 +286,7 @@ func genSyncthingContainer(remoteDir string, syncDataDirName string, image strin
},
Resources: v1.ResourceRequirements{
Requests: map[v1.ResourceName]resource.Quantity{
v1.ResourceCPU: resource.MustParse("500m"),
v1.ResourceCPU: resource.MustParse("200m"),
v1.ResourceMemory: resource.MustParse("512Mi"),
},
Limits: map[v1.ResourceName]resource.Quantity{
@@ -329,11 +332,11 @@ func genVPNContainer(workload string, namespace string, image string, args []str
Env: []v1.EnvVar{},
Resources: v1.ResourceRequirements{
Requests: map[v1.ResourceName]resource.Quantity{
v1.ResourceCPU: resource.MustParse("1000m"),
v1.ResourceMemory: resource.MustParse("1024Mi"),
v1.ResourceCPU: resource.MustParse("200m"),
v1.ResourceMemory: resource.MustParse("512Mi"),
},
Limits: map[v1.ResourceName]resource.Quantity{
v1.ResourceCPU: resource.MustParse("2000m"),
v1.ResourceCPU: resource.MustParse("1000m"),
v1.ResourceMemory: resource.MustParse("2048Mi"),
},
},
@@ -500,3 +503,36 @@ func (d *SyncOptions) GetFactory() cmdutil.Factory {
func (d *SyncOptions) GetSyncthingGUIAddr() string {
return d.syncthingGUIAddr
}
// ConvertApiServerToNodeIP rewrites the apiserver address in kubeconfigBytes
// to the in-cluster service address (KUBERNETES_SERVICE_HOST/PORT), which it
// discovers by running `env` in the first pod of the namespace that accepts
// exec. Returns the re-serialized kubeconfig bytes.
func (d *SyncOptions) ConvertApiServerToNodeIP(ctx context.Context, kubeconfigBytes []byte) ([]byte, error) {
	list, err := d.clientset.CoreV1().Pods(d.Namespace).List(ctx, metav1.ListOptions{Limit: 100})
	if err != nil {
		return nil, err
	}
	var result string
	for _, item := range list.Items {
		result, err = util.Shell(ctx, d.clientset, d.config, item.Name, "", d.Namespace, []string{"env"})
		if err == nil {
			break
		}
	}
	// BUG FIX: if every exec attempt failed (or there were no pods), `result`
	// stayed empty and netip.ParseAddrPort below failed with an opaque
	// "invalid address" error; surface a descriptive error instead.
	if result == "" {
		if err != nil {
			return nil, fmt.Errorf("failed to read in-cluster environment from any pod in namespace %s: %w", d.Namespace, err)
		}
		return nil, fmt.Errorf("no pod in namespace %s returned an environment", d.Namespace)
	}
	parse, err := godotenv.Parse(strings.NewReader(result))
	if err != nil {
		return nil, err
	}
	host := parse["KUBERNETES_SERVICE_HOST"]
	port := parse["KUBERNETES_SERVICE_PORT"]
	addrPort, err := netip.ParseAddrPort(net.JoinHostPort(host, port))
	if err != nil {
		return nil, err
	}
	var newKubeconfigBytes []byte
	newKubeconfigBytes, _, err = util.ModifyAPIServer(ctx, kubeconfigBytes, addrPort)
	if err != nil {
		return nil, err
	}
	return newKubeconfigBytes, nil
}

View File

@@ -12,10 +12,16 @@ import (
log "github.com/sirupsen/logrus"
glog "gvisor.dev/gvisor/pkg/log"
"k8s.io/utils/ptr"
"github.com/wencaiwulue/kubevpn/v2/pkg/config"
)
func InitLoggerForClient() {
L = GetLoggerForClient(int32(log.InfoLevel), os.Stdout)
if config.Debug {
L = GetLoggerForClient(int32(log.DebugLevel), os.Stdout)
} else {
L = GetLoggerForClient(int32(log.InfoLevel), os.Stdout)
}
}
func GetLoggerForClient(level int32, out io.Writer) *log.Logger {

View File

@@ -8,6 +8,7 @@ import (
"strconv"
"strings"
"github.com/calmh/incontainer"
typescontainer "github.com/docker/docker/api/types/container"
"github.com/docker/go-connections/nat"
"github.com/google/uuid"
@@ -65,7 +66,7 @@ func (option *Options) Main(ctx context.Context, sshConfig *pkgssh.SshConfig, co
mode := typescontainer.NetworkMode(option.ContainerOptions.netMode.NetworkMode())
if mode.IsContainer() {
plog.G(ctx).Infof("Network mode container is %s", mode.ConnectedContainer())
} else if mode.IsDefault() && util.RunningInContainer() {
} else if mode.IsDefault() && incontainer.Detect() {
hostname, err := os.Hostname()
if err != nil {
return err

View File

@@ -107,16 +107,13 @@ func (option *Options) ConvertPodToContainerConfigList(
var allPortMap = nat.PortMap{}
var allPortSet = nat.PortSet{}
for k, v := range hostConfig.PortBindings {
if oldValue, ok := allPortMap[k]; ok {
allPortMap[k] = append(oldValue, v...)
} else {
allPortMap[k] = v
}
}
for k, v := range conf.ExposedPorts {
allPortSet[k] = v
}
// user special -p hostPort:containerPort will work
for k, v := range hostConfig.PortBindings {
allPortMap[k] = v
}
lastContainerIdx := len(temp.Spec.Containers) - 1
lastContainerRandomName := util.Join(option.Namespace, temp.Spec.Containers[lastContainerIdx].Name, strings.ReplaceAll(uuid.New().String(), "-", "")[:5])
@@ -159,12 +156,11 @@ func (option *Options) ConvertPodToContainerConfigList(
} else {
portBinding = nat.PortBinding{HostPort: strconv.FormatInt(int64(port.ContainerPort), 10)}
}
if oldValue, ok := allPortMap[p]; ok {
allPortMap[p] = append(oldValue, portBinding)
} else {
// add only if not found
if allPortMap[p] == nil {
allPortMap[p] = []nat.PortBinding{portBinding}
allPortSet[p] = struct{}{}
}
allPortSet[p] = struct{}{}
}
// if netMode is empty, then 0 ~ last-1 use last container network
@@ -183,7 +179,7 @@ func (option *Options) ConvertPodToContainerConfigList(
}
for p, bindings := range allPortMap {
for _, binding := range bindings {
options = append(options, "--publish", fmt.Sprintf("%s:%s", p.Port(), binding.HostPort))
options = append(options, "--publish", fmt.Sprintf("%s:%s", binding.HostPort, p.Port()))
}
options = append(options, "--expose", p.Port())
}

View File

@@ -3,12 +3,10 @@ package ssh
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net"
"net/netip"
"net/url"
"os"
"path/filepath"
"strconv"
@@ -18,11 +16,7 @@ import (
"github.com/google/uuid"
"github.com/pkg/errors"
gossh "golang.org/x/crypto/ssh"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/clientcmd/api"
"k8s.io/client-go/tools/clientcmd/api/latest"
"github.com/wencaiwulue/kubevpn/v2/pkg/config"
plog "github.com/wencaiwulue/kubevpn/v2/pkg/log"
@@ -165,77 +159,6 @@ func SshJump(ctx context.Context, conf *SshConfig, kubeconfigBytes []byte, print
}
kubeconfigBytes = bytes.TrimSpace(stdout)
}
var clientConfig clientcmd.ClientConfig
clientConfig, err = clientcmd.NewClientConfigFromBytes(kubeconfigBytes)
if err != nil {
return
}
var rawConfig api.Config
rawConfig, err = clientConfig.RawConfig()
if err != nil {
plog.G(ctx).WithError(err).Errorf("failed to build config: %v", err)
return
}
if err = api.FlattenConfig(&rawConfig); err != nil {
plog.G(ctx).Errorf("failed to flatten config: %v", err)
return
}
if rawConfig.Contexts == nil {
err = errors.New("kubeconfig is invalid")
plog.G(ctx).Error("can not get contexts")
return
}
kubeContext := rawConfig.Contexts[rawConfig.CurrentContext]
if kubeContext == nil {
err = errors.New("kubeconfig is invalid")
plog.G(ctx).Errorf("can not find kubeconfig context %s", rawConfig.CurrentContext)
return
}
cluster := rawConfig.Clusters[kubeContext.Cluster]
if cluster == nil {
err = errors.New("kubeconfig is invalid")
plog.G(ctx).Errorf("can not find cluster %s", kubeContext.Cluster)
return
}
var u *url.URL
u, err = url.Parse(cluster.Server)
if err != nil {
plog.G(ctx).Errorf("failed to parse cluster url: %v", err)
return
}
serverHost := u.Hostname()
serverPort := u.Port()
if serverPort == "" {
if u.Scheme == "https" {
serverPort = "443"
} else if u.Scheme == "http" {
serverPort = "80"
} else {
// handle other schemes if necessary
err = errors.New("kubeconfig is invalid: wrong protocol")
plog.G(ctx).Error(err)
return
}
}
ips, err := net.LookupHost(serverHost)
if err != nil {
return
}
if len(ips) == 0 {
// handle error: no IP associated with the hostname
err = fmt.Errorf("kubeconfig: no IP associated with the hostname %s", serverHost)
plog.G(ctx).Error(err)
return
}
var remote netip.AddrPort
// Use the first IP address
remote, err = netip.ParseAddrPort(net.JoinHostPort(ips[0], serverPort))
if err != nil {
return
}
var port int
port, err = pkgutil.GetAvailableTCPPortOrDie()
if err != nil {
@@ -246,40 +169,28 @@ func SshJump(ctx context.Context, conf *SshConfig, kubeconfigBytes []byte, print
if err != nil {
return
}
var oldAPIServer netip.AddrPort
var newKubeconfigBytes []byte
newKubeconfigBytes, oldAPIServer, err = pkgutil.ModifyAPIServer(ctx, kubeconfigBytes, local)
if err != nil {
return
}
if print {
plog.G(ctx).Infof("Waiting jump to bastion host...")
plog.G(ctx).Infof("Jump ssh bastion host to apiserver: %s", cluster.Server)
plog.G(ctx).Infof("Jump ssh bastion host to apiserver: %s", oldAPIServer.String())
} else {
plog.G(ctx).Debugf("Waiting jump to bastion host...")
plog.G(ctx).Debugf("Jump ssh bastion host to apiserver: %s", cluster.Server)
plog.G(ctx).Debugf("Jump ssh bastion host to apiserver: %s", oldAPIServer.String())
}
err = PortMapUntil(ctx, conf, remote, local)
err = PortMapUntil(ctx, conf, oldAPIServer, local)
if err != nil {
plog.G(ctx).Errorf("SSH port map error: %v", err)
return
}
rawConfig.Clusters[rawConfig.Contexts[rawConfig.CurrentContext].Cluster].Server = fmt.Sprintf("%s://%s", u.Scheme, local.String())
rawConfig.Clusters[rawConfig.Contexts[rawConfig.CurrentContext].Cluster].TLSServerName = serverHost
// To Do: add cli option to skip tls verify
// rawConfig.Clusters[rawConfig.Contexts[rawConfig.CurrentContext].Cluster].CertificateAuthorityData = nil
// rawConfig.Clusters[rawConfig.Contexts[rawConfig.CurrentContext].Cluster].InsecureSkipTLSVerify = true
rawConfig.SetGroupVersionKind(schema.GroupVersionKind{Version: latest.Version, Kind: "Config"})
var convertedObj runtime.Object
convertedObj, err = latest.Scheme.ConvertToVersion(&rawConfig, latest.ExternalVersion)
if err != nil {
plog.G(ctx).Errorf("failed to build config: %v", err)
return
}
var marshal []byte
marshal, err = json.Marshal(convertedObj)
if err != nil {
plog.G(ctx).Errorf("failed to marshal config: %v", err)
return
}
path, err = pkgutil.ConvertToTempKubeconfigFile(marshal, GenKubeconfigTempPath(conf, kubeconfigBytes))
path, err = pkgutil.ConvertToTempKubeconfigFile(newKubeconfigBytes, GenKubeconfigTempPath(conf, kubeconfigBytes))
if err != nil {
plog.G(ctx).Errorf("failed to write kubeconfig: %v", err)
return

View File

@@ -5,6 +5,7 @@ import (
"github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/tlsutil"
"sigs.k8s.io/yaml"
)
func TestGenerateCertificate(t *testing.T) {
@@ -12,7 +13,11 @@ func TestGenerateCertificate(t *testing.T) {
if err != nil {
t.Fatal(err)
}
t.Log(cert)
marshal, err := yaml.Marshal(cert)
if err != nil {
t.Fatal(err)
}
t.Log(string(marshal))
id := protocol.NewDeviceID(cert.Certificate[0])
t.Log(id)
}

View File

@@ -9,7 +9,6 @@ import (
"testing"
"time"
"github.com/wencaiwulue/kubevpn/v2/pkg/config"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
@@ -20,39 +19,41 @@ import (
"k8s.io/kubectl/pkg/cmd/util"
"sigs.k8s.io/yaml"
"github.com/wencaiwulue/kubevpn/v2/pkg/config"
plog "github.com/wencaiwulue/kubevpn/v2/pkg/log"
)
var (
type cidrUt struct {
namespace string
clientset *kubernetes.Clientset
restclient *rest.RESTClient
restconfig *rest.Config
f util.Factory
)
}
func before() {
func (u *cidrUt) init() {
var err error
configFlags := genericclioptions.NewConfigFlags(true)
f = util.NewFactory(util.NewMatchVersionFlags(configFlags))
u.f = util.NewFactory(util.NewMatchVersionFlags(configFlags))
if restconfig, err = f.ToRESTConfig(); err != nil {
if u.restconfig, err = u.f.ToRESTConfig(); err != nil {
plog.G(context.Background()).Fatal(err)
}
if restclient, err = rest.RESTClientFor(restconfig); err != nil {
if u.restclient, err = rest.RESTClientFor(u.restconfig); err != nil {
plog.G(context.Background()).Fatal(err)
}
if clientset, err = kubernetes.NewForConfig(restconfig); err != nil {
if u.clientset, err = kubernetes.NewForConfig(u.restconfig); err != nil {
plog.G(context.Background()).Fatal(err)
}
if namespace, _, err = f.ToRawKubeConfigLoader().Namespace(); err != nil {
if u.namespace, _, err = u.f.ToRawKubeConfigLoader().Namespace(); err != nil {
plog.G(context.Background()).Fatal(err)
}
}
func TestByDumpClusterInfo(t *testing.T) {
before()
info, err := GetCIDRByDumpClusterInfo(context.Background(), clientset)
u := &cidrUt{}
u.init()
info, err := GetCIDRByDumpClusterInfo(context.Background(), u.clientset)
if err != nil {
t.Log(err.Error())
}
@@ -62,8 +63,9 @@ func TestByDumpClusterInfo(t *testing.T) {
}
func TestByCreateSvc(t *testing.T) {
before()
info, err := GetServiceCIDRByCreateService(context.Background(), clientset.CoreV1().Services("default"))
u := &cidrUt{}
u.init()
info, err := GetServiceCIDRByCreateService(context.Background(), u.clientset.CoreV1().Services("default"))
if err != nil {
t.Log(err.Error())
}
@@ -73,8 +75,9 @@ func TestByCreateSvc(t *testing.T) {
}
func TestElegant(t *testing.T) {
before()
elegant := GetCIDR(context.Background(), clientset, restconfig, namespace, config.Image)
u := &cidrUt{}
u.init()
elegant := GetCIDR(context.Background(), u.clientset, u.restconfig, u.namespace, config.Image)
for _, ipNet := range elegant {
t.Log(ipNet.String())
}

View File

@@ -1,9 +0,0 @@
package util
import "os"
// RunningInContainer returns true if the current process runs from inside a docker container.
func RunningInContainer() bool {
_, err := os.Stat("/.dockerenv")
return err == nil
}

View File

@@ -129,7 +129,7 @@ func WaitDockerContainerRunning(ctx context.Context, name string) error {
}
if inspect.State != nil && (inspect.State.Status == "exited" || inspect.State.Status == "dead" || inspect.State.Dead) {
err = errors.New(fmt.Sprintf("container status: %s", inspect.State.Status))
break
return err
}
if inspect.State != nil && inspect.State.Running {
break

57
pkg/util/exec.go Normal file
View File

@@ -0,0 +1,57 @@
package util
import (
"context"
"io"
"net/url"
"k8s.io/apimachinery/pkg/util/httpstream"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/remotecommand"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
)
// DefaultRemoteExecutor is the standard implementation of remote command
// execution. It mirrors the upstream kubectl executor
// (pkg/mod/k8s.io/kubectl@v0.32.3/pkg/cmd/exec/exec.go:121) but carries a
// context so the stream can be cancelled.
type DefaultRemoteExecutor struct {
	// Ctx cancels the in-flight exec stream when done.
	Ctx context.Context
}

// Execute opens an exec stream to the given URL and wires stdin/stdout/stderr
// through it, using StreamWithContext so r.Ctx can abort the call.
func (r *DefaultRemoteExecutor) Execute(url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue remotecommand.TerminalSizeQueue) error {
	exec, err := createExecutor(url, config)
	if err != nil {
		return err
	}
	return exec.StreamWithContext(r.Ctx, remotecommand.StreamOptions{
		Stdin:             stdin,
		Stdout:            stdout,
		Stderr:            stderr,
		Tty:               tty,
		TerminalSizeQueue: terminalSizeQueue,
	})
}
// createExecutor returns the Executor or an error if one occurred.
// It builds a SPDY executor and, unless the websocket feature flag is
// disabled, wraps it so websockets are tried first with SPDY as fallback.
func createExecutor(url *url.URL, config *restclient.Config) (remotecommand.Executor, error) {
	exec, err := remotecommand.NewSPDYExecutor(config, "POST", url)
	if err != nil {
		return nil, err
	}
	// Fallback executor is default, unless feature flag is explicitly disabled.
	if !cmdutil.RemoteCommandWebsockets.IsDisabled() {
		// WebSocketExecutor must be "GET" method as described in RFC 6455 Sec. 4.1 (page 17).
		websocketExec, err := remotecommand.NewWebSocketExecutor(config, "GET", url.String())
		if err != nil {
			return nil, err
		}
		// Fall back to SPDY only when the websocket upgrade itself fails.
		exec, err = remotecommand.NewFallbackExecutor(websocketExec, exec, func(err error) bool {
			return httpstream.IsUpgradeFailure(err) || httpstream.IsHTTPSProxyError(err)
		})
		if err != nil {
			return nil, err
		}
	}
	return exec, nil
}

View File

@@ -1,6 +1,7 @@
package util
import (
"context"
"encoding/json"
"errors"
"fmt"
@@ -10,6 +11,7 @@ import (
"os"
"strconv"
errors2 "github.com/pkg/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/tools/clientcmd"
@@ -17,6 +19,8 @@ import (
"k8s.io/client-go/tools/clientcmd/api/latest"
clientcmdlatest "k8s.io/client-go/tools/clientcmd/api/latest"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
"github.com/wencaiwulue/kubevpn/v2/pkg/log"
)
func GetKubeConfigPath(f cmdutil.Factory) string {
@@ -109,3 +113,97 @@ func ConvertConfig(factory cmdutil.Factory) ([]byte, error) {
}
return json.Marshal(convertedObj)
}
// ModifyAPIServer rewrites the apiserver address of the current context in
// kubeconfigBytes to newAPIServer and returns the re-serialized kubeconfig
// together with the resolved IP:port of the ORIGINAL apiserver (first DNS
// result). The original hostname is preserved as TLSServerName so TLS
// verification against the server certificate still succeeds.
func ModifyAPIServer(ctx context.Context, kubeconfigBytes []byte, newAPIServer netip.AddrPort) ([]byte, netip.AddrPort, error) {
	clientConfig, err := clientcmd.NewClientConfigFromBytes(kubeconfigBytes)
	if err != nil {
		return nil, netip.AddrPort{}, err
	}
	var rawConfig api.Config
	rawConfig, err = clientConfig.RawConfig()
	if err != nil {
		log.G(ctx).WithError(err).Errorf("failed to build config: %v", err)
		return nil, netip.AddrPort{}, err
	}
	// Inline any external file references so the result is self-contained.
	if err = api.FlattenConfig(&rawConfig); err != nil {
		log.G(ctx).Errorf("failed to flatten config: %v", err)
		return nil, netip.AddrPort{}, err
	}
	if rawConfig.Contexts == nil {
		err = errors2.New("kubeconfig is invalid")
		log.G(ctx).Error("can not get contexts")
		return nil, netip.AddrPort{}, err
	}
	kubeContext := rawConfig.Contexts[rawConfig.CurrentContext]
	if kubeContext == nil {
		err = errors2.New("kubeconfig is invalid")
		log.G(ctx).Errorf("can not find kubeconfig context %s", rawConfig.CurrentContext)
		return nil, netip.AddrPort{}, err
	}
	cluster := rawConfig.Clusters[kubeContext.Cluster]
	if cluster == nil {
		err = errors2.New("kubeconfig is invalid")
		log.G(ctx).Errorf("can not find cluster %s", kubeContext.Cluster)
		return nil, netip.AddrPort{}, err
	}
	var u *url.URL
	u, err = url.Parse(cluster.Server)
	if err != nil {
		log.G(ctx).Errorf("failed to parse cluster url: %v", err)
		return nil, netip.AddrPort{}, err
	}
	serverHost := u.Hostname()
	serverPort := u.Port()
	// Default the port from the URL scheme when it is omitted.
	if serverPort == "" {
		if u.Scheme == "https" {
			serverPort = "443"
		} else if u.Scheme == "http" {
			serverPort = "80"
		} else {
			// handle other schemes if necessary
			err = errors2.New("kubeconfig is invalid: wrong protocol")
			log.G(ctx).Error(err)
			return nil, netip.AddrPort{}, err
		}
	}
	ips, err := net.LookupHost(serverHost)
	if err != nil {
		return nil, netip.AddrPort{}, err
	}
	if len(ips) == 0 {
		// handle error: no IP associated with the hostname
		err = fmt.Errorf("kubeconfig: no IP associated with the hostname %s", serverHost)
		log.G(ctx).Error(err)
		return nil, netip.AddrPort{}, err
	}
	var remote netip.AddrPort
	// Use the first IP address
	remote, err = netip.ParseAddrPort(net.JoinHostPort(ips[0], serverPort))
	if err != nil {
		return nil, netip.AddrPort{}, err
	}
	// Point the cluster at the new apiserver but keep verifying TLS against
	// the original hostname.
	rawConfig.Clusters[rawConfig.Contexts[rawConfig.CurrentContext].Cluster].Server = fmt.Sprintf("%s://%s", u.Scheme, newAPIServer.String())
	rawConfig.Clusters[rawConfig.Contexts[rawConfig.CurrentContext].Cluster].TLSServerName = serverHost
	// To Do: add cli option to skip tls verify
	// rawConfig.Clusters[rawConfig.Contexts[rawConfig.CurrentContext].Cluster].CertificateAuthorityData = nil
	// rawConfig.Clusters[rawConfig.Contexts[rawConfig.CurrentContext].Cluster].InsecureSkipTLSVerify = true
	rawConfig.SetGroupVersionKind(schema.GroupVersionKind{Version: clientcmdlatest.Version, Kind: "Config"})
	var convertedObj runtime.Object
	convertedObj, err = clientcmdlatest.Scheme.ConvertToVersion(&rawConfig, clientcmdlatest.ExternalVersion)
	if err != nil {
		log.G(ctx).Errorf("failed to build config: %v", err)
		return nil, netip.AddrPort{}, err
	}
	var marshal []byte
	marshal, err = json.Marshal(convertedObj)
	if err != nil {
		log.G(ctx).Errorf("failed to marshal config: %v", err)
		return nil, netip.AddrPort{}, err
	}
	return marshal, remote, nil
}

View File

@@ -191,6 +191,7 @@ func PortForwardPod(config *rest.Config, clientset *rest.RESTClient, podName, na
func Shell(ctx context.Context, clientset *kubernetes.Clientset, config *rest.Config, podName, containerName, ns string, cmd []string) (string, error) {
stdin, _, _ := term.StdStreams()
buf := bytes.NewBuffer(nil)
errBuf := bytes.NewBuffer(nil)
options := exec.ExecOptions{
StreamOptions: exec.StreamOptions{
Namespace: ns,
@@ -199,67 +200,19 @@ func Shell(ctx context.Context, clientset *kubernetes.Clientset, config *rest.Co
Stdin: false,
TTY: false,
Quiet: true,
IOStreams: genericiooptions.IOStreams{In: stdin, Out: buf, ErrOut: io.Discard},
IOStreams: genericiooptions.IOStreams{In: stdin, Out: buf, ErrOut: errBuf},
},
Command: cmd,
Executor: &exec.DefaultRemoteExecutor{},
Executor: &DefaultRemoteExecutor{Ctx: ctx},
PodClient: clientset.CoreV1(),
Config: config,
}
if err := options.Run(); err != nil {
return "", err
return errBuf.String(), err
}
return strings.TrimRight(buf.String(), "\n"), nil
}
// WaitPodToBeReady watches pods matching selector and blocks until one of them
// becomes Ready (pod Ready condition true AND every container status Ready).
// It returns nil on success, an error if the watch channel closes, the
// 60-minute overall deadline expires, or ctx is cancelled.
func WaitPodToBeReady(ctx context.Context, podInterface v12.PodInterface, selector v1.LabelSelector) error {
	watchStream, err := podInterface.Watch(ctx, v1.ListOptions{
		LabelSelector: fields.SelectorFromSet(selector.MatchLabels).String(),
	})
	if err != nil {
		return err
	}
	defer watchStream.Stop()
	var last string
	// One-shot overall deadline: a Timer is the right primitive here, not a
	// Ticker (the old Ticker would also have fired repeatedly if it ever ran long).
	timeout := time.NewTimer(time.Minute * 60)
	defer timeout.Stop()
	for {
		select {
		case e, ok := <-watchStream.ResultChan():
			if !ok {
				return errors.New("can not wait pod to be ready because of watch chan has closed")
			}
			podT, ok := e.Object.(*corev1.Pod)
			if !ok {
				continue
			}
			// Pods being torn down may briefly report Ready; ignore them.
			if podT.DeletionTimestamp != nil {
				continue
			}
			sb := bytes.NewBuffer(nil)
			sb.WriteString(fmt.Sprintf("Pod %s is %s...\n", podT.Name, podT.Status.Phase))
			PrintStatus(podT, sb)
			// Only log when the status text actually changed, to avoid flooding.
			// Use Info (not Infof): the message is data, not a format string, and
			// a '%' in a pod name/status must not be interpreted as a verb.
			if last != sb.String() {
				plog.G(ctx).Info(sb.String())
			}
			last = sb.String()
			if podutils.IsPodReady(podT) && func() bool {
				for _, status := range podT.Status.ContainerStatuses {
					if !status.Ready {
						return false
					}
				}
				return true
			}() {
				return nil
			}
		case <-timeout.C:
			return errors.New("wait pod to be ready timeout")
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}
func AllContainerIsRunning(pod *corev1.Pod) bool {
isReady := podutils.IsPodReady(pod)
if !isReady {