From cacb65efb91614a90faeff0f75e4c388cb7a41ae Mon Sep 17 00:00:00 2001 From: fengcaiwen Date: Sat, 9 Sep 2023 12:18:57 +0800 Subject: [PATCH] feat: restore patch works fine --- cmd/kubevpn/cmds/daemon.go | 14 ++++++- pkg/config/const.go | 2 +- pkg/daemon/client.go | 3 ++ pkg/handler/connect.go | 21 ++++++++--- pkg/handler/envoy.go | 41 ++++++++++++-------- pkg/handler/function_test.go | 28 +++++++++++--- pkg/handler/remote.go | 73 ++++++++++++++++++++++++++---------- pkg/util/ssh.go | 11 +++--- pkg/util/util.go | 6 +-- 9 files changed, 145 insertions(+), 54 deletions(-) diff --git a/cmd/kubevpn/cmds/daemon.go b/cmd/kubevpn/cmds/daemon.go index fbd4e87b..22a4719b 100644 --- a/cmd/kubevpn/cmds/daemon.go +++ b/cmd/kubevpn/cmds/daemon.go @@ -6,6 +6,7 @@ import ( cmdutil "k8s.io/kubectl/pkg/cmd/util" "k8s.io/kubectl/pkg/util/i18n" "os" + "strconv" "github.com/wencaiwulue/kubevpn/pkg/daemon" ) @@ -22,7 +23,18 @@ func CmdDaemon(_ cmdutil.Factory) *cobra.Command { if err != nil && !errors.Is(err, os.ErrNotExist) { return err } - return nil + pidPath := daemon.GetPidPath(opt.IsSudo) + err = os.Remove(pidPath) + if err != nil && !errors.Is(err, os.ErrNotExist) { + return err + } + pid := os.Getpid() + err = os.WriteFile(pidPath, []byte(strconv.Itoa(pid)), os.ModePerm) + if err != nil { + return err + } + err = os.Chmod(pidPath, os.ModePerm) + return err }, RunE: func(cmd *cobra.Command, args []string) error { defer opt.Stop() diff --git a/pkg/config/const.go b/pkg/config/const.go index 24740260..fede331b 100644 --- a/pkg/config/const.go +++ b/pkg/config/const.go @@ -14,7 +14,7 @@ const ( LogFile = "daemon.log" - KubeVPNRestorePatchKey = "kubevpnrestorepatch" + KubeVPNRestorePatchKey = "kubevpn-probe-restore-patch" ) func init() { diff --git a/pkg/daemon/client.go b/pkg/daemon/client.go index 9228015d..0b1eb46b 100644 --- a/pkg/daemon/client.go +++ b/pkg/daemon/client.go @@ -25,6 +25,9 @@ import ( var daemonClient, sudoDaemonClient rpc.DaemonClient func GetClient(isSudo bool) rpc.DaemonClient { + if _, err := os.Stat(GetSockPath(isSudo)); errors.Is(err, os.ErrNotExist) { + return nil + } if isSudo && sudoDaemonClient != nil { return sudoDaemonClient } diff --git a/pkg/handler/connect.go b/pkg/handler/connect.go index f6595c5e..7464bd86 100644 --- a/pkg/handler/connect.go +++ b/pkg/handler/connect.go @@ -769,11 +769,22 @@ func SshJump(ctx context.Context, conf *util.SshConfig, flags *pflag.FlagSet) (e errChan := make(chan error, 1) readyChan := make(chan struct{}, 1) go func() { - err := util.Main(ctx, &remote, local, conf, readyChan) - if err != nil { - log.Errorf("ssh forward failed err: %v", err) - errChan <- err - return + for { + select { + case <-ctx.Done(): + return + default: + } + + err := util.Main(ctx, &remote, local, conf, readyChan) + if err != nil { + if !errors.Is(err, context.Canceled) { + log.Errorf("ssh forward failed err: %v", err) + } + select { + case errChan <- err: + } + } } }() log.Infof("wait jump to bastion host...") diff --git a/pkg/handler/envoy.go b/pkg/handler/envoy.go index b3dea461..cc5c7a77 100644 --- a/pkg/handler/envoy.go +++ b/pkg/handler/envoy.go @@ -14,6 +14,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/types" + k8sjson "k8s.io/apimachinery/pkg/util/json" "k8s.io/apimachinery/pkg/util/sets" pkgresource "k8s.io/cli-runtime/pkg/resource" runtimeresource "k8s.io/cli-runtime/pkg/resource" @@ -77,7 +78,7 @@ func InjectVPNAndEnvoySidecar(ctx1 context.Context, factory cmdutil.Factory, cli // (1) add mesh container removePatch, restorePatch := patch(*origin, path) var b []byte - b, err = json.Marshal(restorePatch) + b, err = k8sjson.Marshal(restorePatch) if err != nil { return err } @@ -97,7 +98,7 @@ func InjectVPNAndEnvoySidecar(ctx1 context.Context, factory cmdutil.Factory, cli }, } var bytes []byte - bytes, err = json.Marshal(append(ps, removePatch...)) + bytes, err = k8sjson.Marshal(append(ps, removePatch...)) if err != nil { return err } @@ -140,21 +141,21 @@ func UnPatchContainer(factory cmdutil.Factory, mapInterface v12.ConfigMapInterfa return err } - var ps []P + mesh.RemoveContainers(templateSpec) if u.GetAnnotations() != nil && u.GetAnnotations()[config.KubeVPNRestorePatchKey] != "" { patchStr := u.GetAnnotations()[config.KubeVPNRestorePatchKey] + var ps []P err = json.Unmarshal([]byte(patchStr), &ps) if err != nil { return fmt.Errorf("unmarshal json patch: %s failed, err: %v", patchStr, err) } + fromPatchToProbe(templateSpec, depth, ps) } if empty { - mesh.RemoveContainers(templateSpec) helper := pkgresource.NewHelper(object.Client, object.Mapping) // pod without controller if len(depth) == 0 { - fromPatchToProbe(templateSpec, ps) delete(templateSpec.ObjectMeta.GetAnnotations(), config.KubeVPNRestorePatchKey) pod := &v1.Pod{ObjectMeta: templateSpec.ObjectMeta, Spec: templateSpec.Spec} CleanupUselessInfo(pod) @@ -164,7 +165,7 @@ func UnPatchContainer(factory cmdutil.Factory, mapInterface v12.ConfigMapInterfa // resource with controller, like deployment,statefulset var bytes []byte - bytes, err = json.Marshal(append(ps, []P{ + bytes, err = json.Marshal([]P{ { Op: "replace", Path: "/" + strings.Join(append(depth, "spec"), "/"), @@ -174,8 +175,8 @@ func UnPatchContainer(factory cmdutil.Factory, mapInterface v12.ConfigMapInterfa Op: "replace", Path: "/metadata/annotations/" + config.KubeVPNRestorePatchKey, Value: "", - }}..., - )) + }, + }) if err != nil { return err } @@ -216,13 +217,23 @@ func addEnvoyConfig(mapInterface v12.ConfigMapInterface, nodeID string, tunIP ut }}, }) } else { - v[index].Rules = append(v[index].Rules, &controlplane.Rule{ - Headers: headers, - LocalTunIPv4: tunIP.LocalTunIPv4, - LocalTunIPv6: tunIP.LocalTunIPv6, - }) - if v[index].Ports == nil { - v[index].Ports = port + var found bool + for j, rule := range v[index].Rules { + if rule.LocalTunIPv4 == tunIP.LocalTunIPv4 && + rule.LocalTunIPv6 == tunIP.LocalTunIPv6 { + found = true + v[index].Rules[j].Headers = util.Merge[string, string](v[index].Rules[j].Headers, headers) + } + } + if !found { + v[index].Rules = append(v[index].Rules, &controlplane.Rule{ + Headers: headers, + LocalTunIPv4: tunIP.LocalTunIPv4, + LocalTunIPv6: tunIP.LocalTunIPv6, + }) + if v[index].Ports == nil { + v[index].Ports = port + } } } diff --git a/pkg/handler/function_test.go b/pkg/handler/function_test.go index 2a795842..91b588e1 100644 --- a/pkg/handler/function_test.go +++ b/pkg/handler/function_test.go @@ -2,13 +2,15 @@ package handler import ( "context" - "encoding/json" "fmt" + "k8s.io/apimachinery/pkg/util/intstr" + json2 "k8s.io/apimachinery/pkg/util/json" "net" "net/http" "os/exec" "reflect" "runtime" + "sigs.k8s.io/yaml" "strings" "sync" "testing" @@ -371,11 +373,27 @@ func TestArray(t *testing.T) { } } func TestPatch(t *testing.T) { - s := "W3sib3AiOiJyZXBsYWNlIiwicGF0aCI6Ii9zcGVjL3RlbXBsYXRlL3NwZWMvY29udGFpbmVycy8wL3JlYWRpbmVzc1Byb2JlIiwidmFsdWUiOm51bGx9LHsib3AiOiJyZXBsYWNlIiwicGF0aCI6Ii9zcGVjL3RlbXBsYXRlL3NwZWMvY29udGFpbmVycy8wL2xpdmVuZXNzUHJvYmUiLCJ2YWx1ZSI6eyJodHRwR2V0Ijp7InBhdGgiOiIvaGVhbHRoIiwicG9ydCI6OTA4MCwic2NoZW1lIjoiSFRUUCJ9LCJ0aW1lb3V0U2Vjb25kcyI6MSwicGVyaW9kU2Vjb25kcyI6MTAsInN1Y2Nlc3NUaHJlc2hvbGQiOjEsImZhaWx1cmVUaHJlc2hvbGQiOjN9fSx7Im9wIjoicmVwbGFjZSIsInBhdGgiOiIvc3BlYy90ZW1wbGF0ZS9zcGVjL2NvbnRhaW5lcnMvMC9zdGFydHVwUHJvYmUiLCJ2YWx1ZSI6bnVsbH0seyJvcCI6InJlcGxhY2UiLCJwYXRoIjoiL3NwZWMvdGVtcGxhdGUvc3BlYy9jb250YWluZXJzLzEvcmVhZGluZXNzUHJvYmUiLCJ2YWx1ZSI6bnVsbH0seyJvcCI6InJlcGxhY2UiLCJwYXRoIjoiL3NwZWMvdGVtcGxhdGUvc3BlYy9jb250YWluZXJzLzEvbGl2ZW5lc3NQcm9iZSIsInZhbHVlIjpudWxsfSx7Im9wIjoicmVwbGFjZSIsInBhdGgiOiIvc3BlYy90ZW1wbGF0ZS9zcGVjL2NvbnRhaW5lcnMvMS9zdGFydHVwUHJvYmUiLCJ2YWx1ZSI6bnVsbH1d" - var pp []P - err := json.Unmarshal([]byte(s), &pp) + var p = corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{HTTPGet: &corev1.HTTPGetAction{ + Path: "/health", + Port: intstr.FromInt(9080), + Scheme: "HTTP", + }}, + } + marshal, err := json2.Marshal(p) if err != nil { panic(err) } - fmt.Println(pp) + fmt.Println(string(marshal)) + + var pp corev1.Probe + err = json2.Unmarshal(marshal, &pp) + if err != nil { + panic(err) + } + bytes, err := yaml.Marshal(pp) + if err != nil { + panic(err) + } + fmt.Println(string(bytes)) } diff --git a/pkg/handler/remote.go b/pkg/handler/remote.go index f85196e0..dd49e7b0 100644 --- a/pkg/handler/remote.go +++ b/pkg/handler/remote.go @@ -23,6 +23,7 @@ import ( "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" + k8sjson "k8s.io/apimachinery/pkg/util/json" "k8s.io/apimachinery/pkg/util/wait" pkgresource "k8s.io/cli-runtime/pkg/resource" "k8s.io/client-go/kubernetes" @@ -527,8 +528,8 @@ func InjectVPNSidecar(ctx1 context.Context, factory cmdutil.Factory, namespace, Value: b, }, } - bytes, _ := json.Marshal(append(p, removePatch...)) - _, err = helper.Patch(object.Namespace, object.Name, types.JSONPatchType, bytes, &metav1.PatchOptions{}) + marshal, _ := json.Marshal(append(p, removePatch...)) + _, err = helper.Patch(object.Namespace, object.Name, types.JSONPatchType, marshal, &metav1.PatchOptions{}) if err != nil { log.Errorf("error while inject proxy container, err: %v, exiting...", err) return err @@ -656,6 +657,17 @@ func patch(spec v1.PodTemplateSpec, path []string) (remove []P, restore []P) { readinessPath := "/" + strings.Join(append(path, "spec", "containers", index, "readinessProbe"), "/") livenessPath := "/" + strings.Join(append(path, "spec", "containers", index, "livenessProbe"), "/") startupPath := "/" + strings.Join(append(path, "spec", "containers", index, "startupProbe"), "/") + f := func(p *v1.Probe) string { + if p == nil { + return "" + } + marshal, err := k8sjson.Marshal(p) + if err != nil { + log.Error(err) + return "" + } + return string(marshal) + } remove = append(remove, P{ Op: "replace", Path: readinessPath, @@ -672,41 +684,64 @@ func patch(spec v1.PodTemplateSpec, path []string) (remove []P, restore []P) { restore = append(restore, P{ Op: "replace", Path: readinessPath, - Value: spec.Spec.Containers[i].ReadinessProbe, + Value: f(spec.Spec.Containers[i].ReadinessProbe), }, P{ Op: "replace", Path: livenessPath, - Value: spec.Spec.Containers[i].LivenessProbe, + Value: f(spec.Spec.Containers[i].LivenessProbe), }, P{ Op: "replace", Path: startupPath, - Value: spec.Spec.Containers[i].StartupProbe, + Value: f(spec.Spec.Containers[i].StartupProbe), }) } return } -func fromPatchToProbe(spec *v1.PodTemplateSpec, patch []P) { +func fromPatchToProbe(spec *v1.PodTemplateSpec, path []string, patch []P) { // 3 = readiness + liveness + startup if len(patch) != 3*len(spec.Spec.Containers) { log.Debugf("patch not match container num, not restore") return } for i := range spec.Spec.Containers { - ps := patch[i*3 : i*3+3] - marshal, err := json.Marshal(ps[0].Value) - if err == nil { - err = json.Unmarshal(marshal, spec.Spec.Containers[i].ReadinessProbe) + index := strconv.Itoa(i) + readinessPath := "/" + strings.Join(append(path, "spec", "containers", index, "readinessProbe"), "/") + livenessPath := "/" + strings.Join(append(path, "spec", "containers", index, "livenessProbe"), "/") + startupPath := "/" + strings.Join(append(path, "spec", "containers", index, "startupProbe"), "/") + var f = func(value any) *v1.Probe { + if value == nil { + return nil + } + str, ok := value.(string) + if ok && str == "" { + return nil + } + if !ok { + marshal, err := k8sjson.Marshal(value) + if err != nil { + log.Error(err) + return nil + } + str = string(marshal) + } + var probe v1.Probe + err := k8sjson.Unmarshal([]byte(str), &probe) + if err != nil { + log.Error(err) + return nil + } + return &probe } - - marshal, err = json.Marshal(ps[1].Value) - if err == nil { - err = json.Unmarshal(marshal, spec.Spec.Containers[i].LivenessProbe) - } - - marshal, err = json.Marshal(ps[2].Value) - if err == nil { - err = json.Unmarshal(marshal, spec.Spec.Containers[i].StartupProbe) + for _, p := range patch { + switch p.Path { + case readinessPath: + spec.Spec.Containers[i].ReadinessProbe = f(p.Value) + case livenessPath: + spec.Spec.Containers[i].LivenessProbe = f(p.Value) + case startupPath: + spec.Spec.Containers[i].StartupProbe = f(p.Value) + } } } } diff --git a/pkg/util/ssh.go b/pkg/util/ssh.go index ea2b44db..8430d684 100644 --- a/pkg/util/ssh.go +++ b/pkg/util/ssh.go @@ -69,7 +69,9 @@ func Main(ctx context.Context, remoteEndpoint, localEndpoint *netip.AddrPort, co if err != nil { return err } - done <- struct{}{} + select { + case done <- struct{}{}: + } // handle incoming connections on reverse forwarded tunnel for { select { @@ -80,10 +82,9 @@ func Main(ctx context.Context, remoteEndpoint, localEndpoint *netip.AddrPort, co local, err := listen.Accept() if err != nil { - log.Error(err) - continue + return err } - go func() { + go func(local net.Conn) { defer local.Close() var conn net.Conn var err error @@ -99,7 +100,7 @@ func Main(ctx context.Context, remoteEndpoint, localEndpoint *netip.AddrPort, co } defer conn.Close() handleClient(local, conn) - }() + }(local) } } diff --git a/pkg/util/util.go b/pkg/util/util.go index 5c66c2c1..11c3596a 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -406,7 +406,7 @@ func RolloutStatus(ctx1 context.Context, factory cmdutil.Factory, namespace, wor if err != nil { return false, err } - _, _ = fmt.Fprintf(os.Stdout, "%s", status) + log.Infof("%s", status) // Quit waiting if the rollout is done if done { return true, nil @@ -752,8 +752,8 @@ func MoveToTemp() { } func Merge[K comparable, V any](fromMap, ToMap map[K]V) map[K]V { - for keyToMap, valueToMap := range ToMap { - fromMap[keyToMap] = valueToMap + for k, v := range ToMap { + fromMap[k] = v } if fromMap == nil { // merge(nil, map[string]interface{...}) -> map[string]interface{...}