feat: restore patch works fine

This commit is contained in:
fengcaiwen
2023-09-09 12:18:57 +08:00
committed by naison
parent b9798e66f0
commit cacb65efb9
9 changed files with 145 additions and 54 deletions

View File

@@ -6,6 +6,7 @@ import (
cmdutil "k8s.io/kubectl/pkg/cmd/util" cmdutil "k8s.io/kubectl/pkg/cmd/util"
"k8s.io/kubectl/pkg/util/i18n" "k8s.io/kubectl/pkg/util/i18n"
"os" "os"
"strconv"
"github.com/wencaiwulue/kubevpn/pkg/daemon" "github.com/wencaiwulue/kubevpn/pkg/daemon"
) )
@@ -22,7 +23,18 @@ func CmdDaemon(_ cmdutil.Factory) *cobra.Command {
if err != nil && !errors.Is(err, os.ErrNotExist) { if err != nil && !errors.Is(err, os.ErrNotExist) {
return err return err
} }
return nil pidPath := daemon.GetPidPath(opt.IsSudo)
err = os.Remove(pidPath)
if err != nil && !errors.Is(err, os.ErrNotExist) {
return err
}
pid := os.Getpid()
err = os.WriteFile(pidPath, []byte(strconv.Itoa(pid)), os.ModePerm)
if err != nil {
return err
}
err = os.Chmod(pidPath, os.ModePerm)
return err
}, },
RunE: func(cmd *cobra.Command, args []string) error { RunE: func(cmd *cobra.Command, args []string) error {
defer opt.Stop() defer opt.Stop()

View File

@@ -14,7 +14,7 @@ const (
LogFile = "daemon.log" LogFile = "daemon.log"
KubeVPNRestorePatchKey = "kubevpnrestorepatch" KubeVPNRestorePatchKey = "kubevpn-probe-restore-patch"
) )
func init() { func init() {

View File

@@ -25,6 +25,9 @@ import (
var daemonClient, sudoDaemonClient rpc.DaemonClient var daemonClient, sudoDaemonClient rpc.DaemonClient
func GetClient(isSudo bool) rpc.DaemonClient { func GetClient(isSudo bool) rpc.DaemonClient {
if _, err := os.Stat(GetSockPath(isSudo)); errors.Is(err, os.ErrNotExist) {
return nil
}
if isSudo && sudoDaemonClient != nil { if isSudo && sudoDaemonClient != nil {
return sudoDaemonClient return sudoDaemonClient
} }

View File

@@ -769,11 +769,22 @@ func SshJump(ctx context.Context, conf *util.SshConfig, flags *pflag.FlagSet) (e
errChan := make(chan error, 1) errChan := make(chan error, 1)
readyChan := make(chan struct{}, 1) readyChan := make(chan struct{}, 1)
go func() { go func() {
for {
select {
case <-ctx.Done():
return
default:
}
err := util.Main(ctx, &remote, local, conf, readyChan) err := util.Main(ctx, &remote, local, conf, readyChan)
if err != nil { if err != nil {
if !errors.Is(err, context.Canceled) {
log.Errorf("ssh forward failed err: %v", err) log.Errorf("ssh forward failed err: %v", err)
errChan <- err }
return select {
case errChan <- err:
}
}
} }
}() }()
log.Infof("wait jump to bastion host...") log.Infof("wait jump to bastion host...")

View File

@@ -14,6 +14,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
k8sjson "k8s.io/apimachinery/pkg/util/json"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
pkgresource "k8s.io/cli-runtime/pkg/resource" pkgresource "k8s.io/cli-runtime/pkg/resource"
runtimeresource "k8s.io/cli-runtime/pkg/resource" runtimeresource "k8s.io/cli-runtime/pkg/resource"
@@ -77,7 +78,7 @@ func InjectVPNAndEnvoySidecar(ctx1 context.Context, factory cmdutil.Factory, cli
// (1) add mesh container // (1) add mesh container
removePatch, restorePatch := patch(*origin, path) removePatch, restorePatch := patch(*origin, path)
var b []byte var b []byte
b, err = json.Marshal(restorePatch) b, err = k8sjson.Marshal(restorePatch)
if err != nil { if err != nil {
return err return err
} }
@@ -97,7 +98,7 @@ func InjectVPNAndEnvoySidecar(ctx1 context.Context, factory cmdutil.Factory, cli
}, },
} }
var bytes []byte var bytes []byte
bytes, err = json.Marshal(append(ps, removePatch...)) bytes, err = k8sjson.Marshal(append(ps, removePatch...))
if err != nil { if err != nil {
return err return err
} }
@@ -140,21 +141,21 @@ func UnPatchContainer(factory cmdutil.Factory, mapInterface v12.ConfigMapInterfa
return err return err
} }
var ps []P mesh.RemoveContainers(templateSpec)
if u.GetAnnotations() != nil && u.GetAnnotations()[config.KubeVPNRestorePatchKey] != "" { if u.GetAnnotations() != nil && u.GetAnnotations()[config.KubeVPNRestorePatchKey] != "" {
patchStr := u.GetAnnotations()[config.KubeVPNRestorePatchKey] patchStr := u.GetAnnotations()[config.KubeVPNRestorePatchKey]
var ps []P
err = json.Unmarshal([]byte(patchStr), &ps) err = json.Unmarshal([]byte(patchStr), &ps)
if err != nil { if err != nil {
return fmt.Errorf("unmarshal json patch: %s failed, err: %v", patchStr, err) return fmt.Errorf("unmarshal json patch: %s failed, err: %v", patchStr, err)
} }
fromPatchToProbe(templateSpec, depth, ps)
} }
if empty { if empty {
mesh.RemoveContainers(templateSpec)
helper := pkgresource.NewHelper(object.Client, object.Mapping) helper := pkgresource.NewHelper(object.Client, object.Mapping)
// pod without controller // pod without controller
if len(depth) == 0 { if len(depth) == 0 {
fromPatchToProbe(templateSpec, ps)
delete(templateSpec.ObjectMeta.GetAnnotations(), config.KubeVPNRestorePatchKey) delete(templateSpec.ObjectMeta.GetAnnotations(), config.KubeVPNRestorePatchKey)
pod := &v1.Pod{ObjectMeta: templateSpec.ObjectMeta, Spec: templateSpec.Spec} pod := &v1.Pod{ObjectMeta: templateSpec.ObjectMeta, Spec: templateSpec.Spec}
CleanupUselessInfo(pod) CleanupUselessInfo(pod)
@@ -164,7 +165,7 @@ func UnPatchContainer(factory cmdutil.Factory, mapInterface v12.ConfigMapInterfa
// resource with controller, like deployment,statefulset // resource with controller, like deployment,statefulset
var bytes []byte var bytes []byte
bytes, err = json.Marshal(append(ps, []P{ bytes, err = json.Marshal([]P{
{ {
Op: "replace", Op: "replace",
Path: "/" + strings.Join(append(depth, "spec"), "/"), Path: "/" + strings.Join(append(depth, "spec"), "/"),
@@ -174,8 +175,8 @@ func UnPatchContainer(factory cmdutil.Factory, mapInterface v12.ConfigMapInterfa
Op: "replace", Op: "replace",
Path: "/metadata/annotations/" + config.KubeVPNRestorePatchKey, Path: "/metadata/annotations/" + config.KubeVPNRestorePatchKey,
Value: "", Value: "",
}}..., },
)) })
if err != nil { if err != nil {
return err return err
} }
@@ -216,6 +217,15 @@ func addEnvoyConfig(mapInterface v12.ConfigMapInterface, nodeID string, tunIP ut
}}, }},
}) })
} else { } else {
var found bool
for j, rule := range v[index].Rules {
if rule.LocalTunIPv4 == tunIP.LocalTunIPv4 &&
rule.LocalTunIPv6 == tunIP.LocalTunIPv6 {
found = true
v[index].Rules[j].Headers = util.Merge[string, string](v[index].Rules[j].Headers, headers)
}
}
if !found {
v[index].Rules = append(v[index].Rules, &controlplane.Rule{ v[index].Rules = append(v[index].Rules, &controlplane.Rule{
Headers: headers, Headers: headers,
LocalTunIPv4: tunIP.LocalTunIPv4, LocalTunIPv4: tunIP.LocalTunIPv4,
@@ -225,6 +235,7 @@ func addEnvoyConfig(mapInterface v12.ConfigMapInterface, nodeID string, tunIP ut
v[index].Ports = port v[index].Ports = port
} }
} }
}
marshal, err := yaml.Marshal(v) marshal, err := yaml.Marshal(v)
if err != nil { if err != nil {

View File

@@ -2,13 +2,15 @@ package handler
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"k8s.io/apimachinery/pkg/util/intstr"
json2 "k8s.io/apimachinery/pkg/util/json"
"net" "net"
"net/http" "net/http"
"os/exec" "os/exec"
"reflect" "reflect"
"runtime" "runtime"
"sigs.k8s.io/yaml"
"strings" "strings"
"sync" "sync"
"testing" "testing"
@@ -371,11 +373,27 @@ func TestArray(t *testing.T) {
} }
} }
func TestPatch(t *testing.T) { func TestPatch(t *testing.T) {
s := "W3sib3AiOiJyZXBsYWNlIiwicGF0aCI6Ii9zcGVjL3RlbXBsYXRlL3NwZWMvY29udGFpbmVycy8wL3JlYWRpbmVzc1Byb2JlIiwidmFsdWUiOm51bGx9LHsib3AiOiJyZXBsYWNlIiwicGF0aCI6Ii9zcGVjL3RlbXBsYXRlL3NwZWMvY29udGFpbmVycy8wL2xpdmVuZXNzUHJvYmUiLCJ2YWx1ZSI6eyJodHRwR2V0Ijp7InBhdGgiOiIvaGVhbHRoIiwicG9ydCI6OTA4MCwic2NoZW1lIjoiSFRUUCJ9LCJ0aW1lb3V0U2Vjb25kcyI6MSwicGVyaW9kU2Vjb25kcyI6MTAsInN1Y2Nlc3NUaHJlc2hvbGQiOjEsImZhaWx1cmVUaHJlc2hvbGQiOjN9fSx7Im9wIjoicmVwbGFjZSIsInBhdGgiOiIvc3BlYy90ZW1wbGF0ZS9zcGVjL2NvbnRhaW5lcnMvMC9zdGFydHVwUHJvYmUiLCJ2YWx1ZSI6bnVsbH0seyJvcCI6InJlcGxhY2UiLCJwYXRoIjoiL3NwZWMvdGVtcGxhdGUvc3BlYy9jb250YWluZXJzLzEvcmVhZGluZXNzUHJvYmUiLCJ2YWx1ZSI6bnVsbH0seyJvcCI6InJlcGxhY2UiLCJwYXRoIjoiL3NwZWMvdGVtcGxhdGUvc3BlYy9jb250YWluZXJzLzEvbGl2ZW5lc3NQcm9iZSIsInZhbHVlIjpudWxsfSx7Im9wIjoicmVwbGFjZSIsInBhdGgiOiIvc3BlYy90ZW1wbGF0ZS9zcGVjL2NvbnRhaW5lcnMvMS9zdGFydHVwUHJvYmUiLCJ2YWx1ZSI6bnVsbH1d" var p = corev1.Probe{
var pp []P ProbeHandler: corev1.ProbeHandler{HTTPGet: &corev1.HTTPGetAction{
err := json.Unmarshal([]byte(s), &pp) Path: "/health",
Port: intstr.FromInt(9080),
Scheme: "HTTP",
}},
}
marshal, err := json2.Marshal(p)
if err != nil { if err != nil {
panic(err) panic(err)
} }
fmt.Println(pp) fmt.Println(string(marshal))
var pp corev1.Probe
err = json2.Unmarshal(marshal, &pp)
if err != nil {
panic(err)
}
bytes, err := yaml.Marshal(pp)
if err != nil {
panic(err)
}
fmt.Println(string(bytes))
} }

View File

@@ -23,6 +23,7 @@ import (
"k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/intstr"
k8sjson "k8s.io/apimachinery/pkg/util/json"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
pkgresource "k8s.io/cli-runtime/pkg/resource" pkgresource "k8s.io/cli-runtime/pkg/resource"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
@@ -527,8 +528,8 @@ func InjectVPNSidecar(ctx1 context.Context, factory cmdutil.Factory, namespace,
Value: b, Value: b,
}, },
} }
bytes, _ := json.Marshal(append(p, removePatch...)) marshal, _ := json.Marshal(append(p, removePatch...))
_, err = helper.Patch(object.Namespace, object.Name, types.JSONPatchType, bytes, &metav1.PatchOptions{}) _, err = helper.Patch(object.Namespace, object.Name, types.JSONPatchType, marshal, &metav1.PatchOptions{})
if err != nil { if err != nil {
log.Errorf("error while inject proxy container, err: %v, exiting...", err) log.Errorf("error while inject proxy container, err: %v, exiting...", err)
return err return err
@@ -656,6 +657,17 @@ func patch(spec v1.PodTemplateSpec, path []string) (remove []P, restore []P) {
readinessPath := "/" + strings.Join(append(path, "spec", "containers", index, "readinessProbe"), "/") readinessPath := "/" + strings.Join(append(path, "spec", "containers", index, "readinessProbe"), "/")
livenessPath := "/" + strings.Join(append(path, "spec", "containers", index, "livenessProbe"), "/") livenessPath := "/" + strings.Join(append(path, "spec", "containers", index, "livenessProbe"), "/")
startupPath := "/" + strings.Join(append(path, "spec", "containers", index, "startupProbe"), "/") startupPath := "/" + strings.Join(append(path, "spec", "containers", index, "startupProbe"), "/")
f := func(p *v1.Probe) string {
if p == nil {
return ""
}
marshal, err := k8sjson.Marshal(p)
if err != nil {
log.Error(err)
return ""
}
return string(marshal)
}
remove = append(remove, P{ remove = append(remove, P{
Op: "replace", Op: "replace",
Path: readinessPath, Path: readinessPath,
@@ -672,41 +684,64 @@ func patch(spec v1.PodTemplateSpec, path []string) (remove []P, restore []P) {
restore = append(restore, P{ restore = append(restore, P{
Op: "replace", Op: "replace",
Path: readinessPath, Path: readinessPath,
Value: spec.Spec.Containers[i].ReadinessProbe, Value: f(spec.Spec.Containers[i].ReadinessProbe),
}, P{ }, P{
Op: "replace", Op: "replace",
Path: livenessPath, Path: livenessPath,
Value: spec.Spec.Containers[i].LivenessProbe, Value: f(spec.Spec.Containers[i].LivenessProbe),
}, P{ }, P{
Op: "replace", Op: "replace",
Path: startupPath, Path: startupPath,
Value: spec.Spec.Containers[i].StartupProbe, Value: f(spec.Spec.Containers[i].StartupProbe),
}) })
} }
return return
} }
func fromPatchToProbe(spec *v1.PodTemplateSpec, patch []P) { func fromPatchToProbe(spec *v1.PodTemplateSpec, path []string, patch []P) {
// 3 = readiness + liveness + startup // 3 = readiness + liveness + startup
if len(patch) != 3*len(spec.Spec.Containers) { if len(patch) != 3*len(spec.Spec.Containers) {
log.Debugf("patch not match container num, not restore") log.Debugf("patch not match container num, not restore")
return return
} }
for i := range spec.Spec.Containers { for i := range spec.Spec.Containers {
ps := patch[i*3 : i*3+3] index := strconv.Itoa(i)
marshal, err := json.Marshal(ps[0].Value) readinessPath := "/" + strings.Join(append(path, "spec", "containers", index, "readinessProbe"), "/")
if err == nil { livenessPath := "/" + strings.Join(append(path, "spec", "containers", index, "livenessProbe"), "/")
err = json.Unmarshal(marshal, spec.Spec.Containers[i].ReadinessProbe) startupPath := "/" + strings.Join(append(path, "spec", "containers", index, "startupProbe"), "/")
var f = func(value any) *v1.Probe {
if value == nil {
return nil
} }
str, ok := value.(string)
marshal, err = json.Marshal(ps[1].Value) if ok && str == "" {
if err == nil { return nil
err = json.Unmarshal(marshal, spec.Spec.Containers[i].LivenessProbe) }
if !ok {
marshal, err := k8sjson.Marshal(value)
if err != nil {
log.Error(err)
return nil
}
str = string(marshal)
}
var probe v1.Probe
err := k8sjson.Unmarshal([]byte(str), &probe)
if err != nil {
log.Error(err)
return nil
}
return &probe
}
for _, p := range patch {
switch p.Path {
case readinessPath:
spec.Spec.Containers[i].ReadinessProbe = f(p.Value)
case livenessPath:
spec.Spec.Containers[i].LivenessProbe = f(p.Value)
case startupPath:
spec.Spec.Containers[i].StartupProbe = f(p.Value)
} }
marshal, err = json.Marshal(ps[2].Value)
if err == nil {
err = json.Unmarshal(marshal, spec.Spec.Containers[i].StartupProbe)
} }
} }
} }

View File

@@ -69,7 +69,9 @@ func Main(ctx context.Context, remoteEndpoint, localEndpoint *netip.AddrPort, co
if err != nil { if err != nil {
return err return err
} }
done <- struct{}{} select {
case done <- struct{}{}:
}
// handle incoming connections on reverse forwarded tunnel // handle incoming connections on reverse forwarded tunnel
for { for {
select { select {
@@ -80,10 +82,9 @@ func Main(ctx context.Context, remoteEndpoint, localEndpoint *netip.AddrPort, co
local, err := listen.Accept() local, err := listen.Accept()
if err != nil { if err != nil {
log.Error(err) return err
continue
} }
go func() { go func(local net.Conn) {
defer local.Close() defer local.Close()
var conn net.Conn var conn net.Conn
var err error var err error
@@ -99,7 +100,7 @@ func Main(ctx context.Context, remoteEndpoint, localEndpoint *netip.AddrPort, co
} }
defer conn.Close() defer conn.Close()
handleClient(local, conn) handleClient(local, conn)
}() }(local)
} }
} }

View File

@@ -406,7 +406,7 @@ func RolloutStatus(ctx1 context.Context, factory cmdutil.Factory, namespace, wor
if err != nil { if err != nil {
return false, err return false, err
} }
_, _ = fmt.Fprintf(os.Stdout, "%s", status) log.Infof("%s", status)
// Quit waiting if the rollout is done // Quit waiting if the rollout is done
if done { if done {
return true, nil return true, nil
@@ -752,8 +752,8 @@ func MoveToTemp() {
} }
func Merge[K comparable, V any](fromMap, ToMap map[K]V) map[K]V { func Merge[K comparable, V any](fromMap, ToMap map[K]V) map[K]V {
for keyToMap, valueToMap := range ToMap { for k, v := range ToMap {
fromMap[keyToMap] = valueToMap fromMap[k] = v
} }
if fromMap == nil { if fromMap == nil {
// merge(nil, map[string]interface{...}) -> map[string]interface{...} // merge(nil, map[string]interface{...}) -> map[string]interface{...}