mirror of
https://github.com/kubenetworks/kubevpn.git
synced 2025-12-24 11:51:13 +08:00
fix: add lock to handle webhook event
This commit is contained in:
@@ -21,7 +21,7 @@ func CmdControlPlane(_ cmdutil.Factory) *cobra.Command {
|
||||
Short: "Control-plane is a envoy xds server",
|
||||
Long: `Control-plane is a envoy xds server, distribute envoy route configuration`,
|
||||
Run: func(cmd *cobra.Command, args []string) {
|
||||
util.InitLogger(config.Debug)
|
||||
util.InitLoggerForServer(config.Debug)
|
||||
go util.StartupPProf(0)
|
||||
controlplane.Main(watchDirectoryFilename, port, log.StandardLogger())
|
||||
},
|
||||
|
||||
@@ -30,7 +30,7 @@ func CmdServe(_ cmdutil.Factory) *cobra.Command {
|
||||
kubevpn serve -L "tcp://:10800" -L "tun://127.0.0.1:8422?net=223.254.0.123/32"
|
||||
`)),
|
||||
PreRun: func(*cobra.Command, []string) {
|
||||
util.InitLogger(config.Debug)
|
||||
util.InitLoggerForServer(config.Debug)
|
||||
runtime.GOMAXPROCS(0)
|
||||
go util.StartupPProf(0)
|
||||
},
|
||||
|
||||
@@ -18,7 +18,7 @@ After deploying it to Kubernetes cluster, the Administrator needs to create a Mu
|
||||
in the Kubernetes cluster to register remote webhook admission controllers.`,
|
||||
Args: cobra.MaximumNArgs(0),
|
||||
PreRun: func(cmd *cobra.Command, args []string) {
|
||||
util.InitLogger(true)
|
||||
util.InitLoggerForServer(true)
|
||||
go util.StartupPProf(0)
|
||||
},
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
|
||||
@@ -118,14 +118,13 @@ func (d *DHCPManager) RentIPBaseNICAddress(ctx context.Context) (*net.IPNet, *ne
|
||||
}
|
||||
|
||||
func (d *DHCPManager) RentIPRandom(ctx context.Context) (*net.IPNet, *net.IPNet, error) {
|
||||
addrs, err := net.InterfaceAddrs()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
addrs, _ := net.InterfaceAddrs()
|
||||
var isAlreadyExistedFunc = func(ips ...net.IP) bool {
|
||||
for _, addr := range addrs {
|
||||
addrIP, ok := addr.(*net.IPNet)
|
||||
if ok {
|
||||
if addr == nil {
|
||||
continue
|
||||
}
|
||||
if addrIP, ok := addr.(*net.IPNet); ok {
|
||||
for _, ip := range ips {
|
||||
if addrIP.IP.Equal(ip) {
|
||||
return true
|
||||
@@ -136,7 +135,7 @@ func (d *DHCPManager) RentIPRandom(ctx context.Context) (*net.IPNet, *net.IPNet,
|
||||
return false
|
||||
}
|
||||
var v4, v6 net.IP
|
||||
err = d.updateDHCPConfigMap(ctx, func(ipv4 *ipallocator.Range, ipv6 *ipallocator.Range) (err error) {
|
||||
err := d.updateDHCPConfigMap(ctx, func(ipv4 *ipallocator.Range, ipv6 *ipallocator.Range) (err error) {
|
||||
for {
|
||||
if v4, err = ipv4.AllocateNext(); err != nil {
|
||||
return err
|
||||
|
||||
@@ -33,6 +33,7 @@ import (
|
||||
"k8s.io/kubectl/pkg/polymorphichelpers"
|
||||
"k8s.io/kubectl/pkg/util/podutils"
|
||||
"k8s.io/utils/pointer"
|
||||
"k8s.io/utils/ptr"
|
||||
|
||||
"github.com/wencaiwulue/kubevpn/v2/pkg/config"
|
||||
"github.com/wencaiwulue/kubevpn/v2/pkg/exchange"
|
||||
@@ -470,16 +471,26 @@ kubevpn serve -L "tcp://:10800" -L "tun://:8422?net=${TunIPv4}" -L "gtcp://:1080
|
||||
APIGroups: []string{""},
|
||||
APIVersions: []string{"v1"},
|
||||
Resources: []string{"pods"},
|
||||
Scope: (*admissionv1.ScopeType)(pointer.String(string(admissionv1.NamespacedScope))),
|
||||
Scope: ptr.To(admissionv1.NamespacedScope),
|
||||
},
|
||||
}},
|
||||
FailurePolicy: (*admissionv1.FailurePolicyType)(pointer.String(string(admissionv1.Ignore))),
|
||||
FailurePolicy: ptr.To(admissionv1.Ignore),
|
||||
// same as above label ns
|
||||
NamespaceSelector: &metav1.LabelSelector{MatchLabels: map[string]string{"ns": namespace}},
|
||||
SideEffects: (*admissionv1.SideEffectClass)(pointer.String(string(admissionv1.SideEffectClassNone))),
|
||||
TimeoutSeconds: nil,
|
||||
SideEffects: ptr.To(admissionv1.SideEffectClassNone),
|
||||
TimeoutSeconds: ptr.To[int32](15),
|
||||
AdmissionReviewVersions: []string{"v1", "v1beta1"},
|
||||
ReinvocationPolicy: (*admissionv1.ReinvocationPolicyType)(pointer.String(string(admissionv1.NeverReinvocationPolicy))),
|
||||
ReinvocationPolicy: ptr.To(admissionv1.NeverReinvocationPolicy),
|
||||
/*// needs to enable featureGate=AdmissionWebhookMatchConditions
|
||||
MatchConditions: []admissionv1.MatchCondition{
|
||||
{
|
||||
Name: "",
|
||||
Expression: fmt.Sprintf(
|
||||
"container_name.exists(c, c == '%s') && environment_variable.find(e, e == '%s').exists()",
|
||||
config.ContainerSidecarVPN, config.EnvInboundPodTunIPv4,
|
||||
),
|
||||
},
|
||||
},*/
|
||||
}},
|
||||
}, metav1.CreateOptions{})
|
||||
if err != nil && !k8serrors.IsForbidden(err) && !k8serrors.IsAlreadyExists(err) {
|
||||
|
||||
@@ -14,6 +14,13 @@ func InitLogger(debug bool) {
|
||||
log.SetFormatter(&Format{})
|
||||
}
|
||||
|
||||
func InitLoggerForServer(debug bool) {
|
||||
if debug {
|
||||
log.SetLevel(log.DebugLevel)
|
||||
}
|
||||
log.SetReportCaller(true)
|
||||
}
|
||||
|
||||
type Format struct {
|
||||
log.Formatter
|
||||
}
|
||||
|
||||
@@ -346,3 +346,26 @@ func AllContainerIsRunning(pod *corev1.Pod) bool {
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func FindContainerEnv(container *corev1.Container, key string) (value string, found bool) {
|
||||
if container == nil {
|
||||
return
|
||||
}
|
||||
for _, envVar := range container.Env {
|
||||
if envVar.Name == key {
|
||||
value = envVar.Value
|
||||
found = true
|
||||
return
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func FindContainerByName(pod *corev1.Pod, name string) (*corev1.Container, int) {
|
||||
for i := range pod.Spec.Containers {
|
||||
if pod.Spec.Containers[i].Name == name {
|
||||
return &pod.Spec.Containers[i], i
|
||||
}
|
||||
}
|
||||
return nil, -1
|
||||
}
|
||||
|
||||
@@ -45,6 +45,7 @@ func convertAdmissionResponseToV1beta1(r *v1.AdmissionResponse) *v1beta1.Admissi
|
||||
|
||||
func toV1AdmissionResponse(err error) *v1.AdmissionResponse {
|
||||
return &v1.AdmissionResponse{
|
||||
Allowed: false,
|
||||
Result: &metav1.Status{
|
||||
Message: err.Error(),
|
||||
},
|
||||
|
||||
@@ -7,19 +7,23 @@ import (
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
"sync"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
v1 "k8s.io/api/admission/v1"
|
||||
"k8s.io/api/admission/v1beta1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
cmdutil "k8s.io/kubectl/pkg/cmd/util"
|
||||
"k8s.io/utils/ptr"
|
||||
|
||||
"github.com/wencaiwulue/kubevpn/v2/pkg/config"
|
||||
)
|
||||
|
||||
// admissionReviewHandler is a handler to handle business logic, holding an util.Factory
|
||||
type admissionReviewHandler struct {
|
||||
sync.Mutex
|
||||
f cmdutil.Factory
|
||||
clientset *kubernetes.Clientset
|
||||
}
|
||||
@@ -87,22 +91,50 @@ func serve(w http.ResponseWriter, r *http.Request, admit admitHandler) {
|
||||
log.Errorf("Expected v1beta1.AdmissionReview but got: %T", obj)
|
||||
return
|
||||
}
|
||||
responseAdmissionReview := &v1beta1.AdmissionReview{}
|
||||
responseAdmissionReview.SetGroupVersionKind(*gvk)
|
||||
responseAdmissionReview.Response = admit.v1beta1(*requestedAdmissionReview)
|
||||
responseAdmissionReview.Response.UID = requestedAdmissionReview.Request.UID
|
||||
responseObj = responseAdmissionReview
|
||||
if ptr.Deref(requestedAdmissionReview.Request.DryRun, false) {
|
||||
log.Info("Ignore dryrun")
|
||||
responseObj = &v1beta1.AdmissionReview{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
APIVersion: gvk.GroupVersion().String(),
|
||||
Kind: gvk.Kind,
|
||||
},
|
||||
Response: &v1beta1.AdmissionResponse{
|
||||
Allowed: true,
|
||||
UID: requestedAdmissionReview.Request.UID,
|
||||
},
|
||||
}
|
||||
} else {
|
||||
responseAdmissionReview := &v1beta1.AdmissionReview{}
|
||||
responseAdmissionReview.SetGroupVersionKind(*gvk)
|
||||
responseAdmissionReview.Response = admit.v1beta1(*requestedAdmissionReview)
|
||||
responseAdmissionReview.Response.UID = requestedAdmissionReview.Request.UID
|
||||
responseObj = responseAdmissionReview
|
||||
}
|
||||
case v1.SchemeGroupVersion.WithKind("AdmissionReview"):
|
||||
requestedAdmissionReview, ok := obj.(*v1.AdmissionReview)
|
||||
if !ok {
|
||||
log.Errorf("Expected v1.AdmissionReview but got: %T", obj)
|
||||
return
|
||||
}
|
||||
responseAdmissionReview := &v1.AdmissionReview{}
|
||||
responseAdmissionReview.SetGroupVersionKind(*gvk)
|
||||
responseAdmissionReview.Response = admit.v1(*requestedAdmissionReview)
|
||||
responseAdmissionReview.Response.UID = requestedAdmissionReview.Request.UID
|
||||
responseObj = responseAdmissionReview
|
||||
if ptr.Deref(requestedAdmissionReview.Request.DryRun, false) {
|
||||
log.Info("Ignore dryrun")
|
||||
responseObj = &v1.AdmissionReview{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
APIVersion: gvk.GroupVersion().String(),
|
||||
Kind: gvk.Kind,
|
||||
},
|
||||
Response: &v1.AdmissionResponse{
|
||||
Allowed: true,
|
||||
UID: requestedAdmissionReview.Request.UID,
|
||||
},
|
||||
}
|
||||
} else {
|
||||
responseAdmissionReview := &v1.AdmissionReview{}
|
||||
responseAdmissionReview.SetGroupVersionKind(*gvk)
|
||||
responseAdmissionReview.Response = admit.v1(*requestedAdmissionReview)
|
||||
responseAdmissionReview.Response.UID = requestedAdmissionReview.Request.UID
|
||||
responseObj = responseAdmissionReview
|
||||
}
|
||||
default:
|
||||
msg := fmt.Sprintf("Unsupported group version kind: %v", gvk)
|
||||
log.Error(msg)
|
||||
@@ -128,21 +160,28 @@ func Main(f cmdutil.Factory) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
h := &admissionReviewHandler{f: f, clientset: clientset}
|
||||
http.HandleFunc("/pods", func(w http.ResponseWriter, r *http.Request) {
|
||||
serve(w, r, newDelegateToV1AdmitHandler(h.admitPods))
|
||||
})
|
||||
http.HandleFunc("/readyz", func(w http.ResponseWriter, req *http.Request) {
|
||||
_, _ = w.Write([]byte("ok"))
|
||||
})
|
||||
|
||||
http.HandleFunc("/pods", func(w http.ResponseWriter, r *http.Request) { serve(w, r, newDelegateToV1AdmitHandler(h.admitPods)) })
|
||||
http.HandleFunc("/readyz", func(w http.ResponseWriter, req *http.Request) { _, _ = w.Write([]byte("ok")) })
|
||||
|
||||
s := &dhcpServer{f: f, clientset: clientset}
|
||||
http.HandleFunc(config.APIRentIP, s.rentIP)
|
||||
http.HandleFunc(config.APIReleaseIP, s.releaseIP)
|
||||
svr := &dhcpServer{f: f, clientset: clientset}
|
||||
http.HandleFunc(config.APIRentIP, svr.rentIP)
|
||||
http.HandleFunc(config.APIReleaseIP, svr.releaseIP)
|
||||
|
||||
var pairs []tls.Certificate
|
||||
pairs, err = getSSLKeyPairs()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
server := &http.Server{Addr: fmt.Sprintf(":%d", 80), TLSConfig: &tls.Config{Certificates: pairs}}
|
||||
server := &http.Server{
|
||||
Addr: fmt.Sprintf(":%d", 80),
|
||||
TLSConfig: &tls.Config{Certificates: pairs},
|
||||
}
|
||||
return server.ListenAndServeTLS("", "")
|
||||
}
|
||||
|
||||
|
||||
@@ -14,12 +14,14 @@ import (
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/kubectl/pkg/cmd/util/podcmd"
|
||||
"k8s.io/utils/ptr"
|
||||
|
||||
"github.com/wencaiwulue/kubevpn/v2/pkg/config"
|
||||
"github.com/wencaiwulue/kubevpn/v2/pkg/handler"
|
||||
"github.com/wencaiwulue/kubevpn/v2/pkg/util"
|
||||
)
|
||||
|
||||
// only allow pods to pull images from specific registry.
|
||||
// create pod will rent ip and delete pod will release ip
|
||||
func (h *admissionReviewHandler) admitPods(ar v1.AdmissionReview) *v1.AdmissionResponse {
|
||||
r, _ := json.Marshal(ar)
|
||||
log.Infof("admitting pods called, req: %v", string(r))
|
||||
@@ -32,137 +34,11 @@ func (h *admissionReviewHandler) admitPods(ar v1.AdmissionReview) *v1.AdmissionR
|
||||
|
||||
switch ar.Request.Operation {
|
||||
case v1.Create:
|
||||
raw := ar.Request.Object.Raw
|
||||
pod := corev1.Pod{}
|
||||
deserializer := codecs.UniversalDeserializer()
|
||||
if _, _, err := deserializer.Decode(raw, nil, &pod); err != nil {
|
||||
log.Errorf("can not decode into pod, err: %v, raw: %s", err, string(raw))
|
||||
return toV1AdmissionResponse(err)
|
||||
}
|
||||
return h.handleCreate(ar)
|
||||
|
||||
from, err := json.Marshal(pod)
|
||||
if err != nil {
|
||||
log.Errorf("can not marshal into pod, err: %v", err)
|
||||
return toV1AdmissionResponse(err)
|
||||
}
|
||||
var found bool
|
||||
out:
|
||||
for i := 0; i < len(pod.Spec.Containers); i++ {
|
||||
if pod.Spec.Containers[i].Name == config.ContainerSidecarVPN {
|
||||
var v4, v6 *net.IPNet
|
||||
for j := 0; j < len(pod.Spec.Containers[i].Env); j++ {
|
||||
pair := pod.Spec.Containers[i].Env[j]
|
||||
if pair.Name == config.EnvInboundPodTunIPv4 {
|
||||
if x, _, _ := net.ParseCIDR(pair.Value); config.RouterIP.Equal(x) {
|
||||
break out
|
||||
}
|
||||
found = true
|
||||
cmi := h.clientset.CoreV1().ConfigMaps(ar.Request.Namespace)
|
||||
dhcp := handler.NewDHCPManager(cmi, ar.Request.Namespace)
|
||||
// remove old values
|
||||
if pair.Value != "" {
|
||||
var ips []net.IP
|
||||
for k := 0; k < len(pod.Spec.Containers[i].Env); k++ {
|
||||
envVar := pod.Spec.Containers[i].Env[k]
|
||||
if sets.New[string](config.EnvInboundPodTunIPv4, config.EnvInboundPodTunIPv6).Has(envVar.Name) && envVar.Value != "" {
|
||||
if ip, _, _ := net.ParseCIDR(envVar.Value); ip != nil {
|
||||
ips = append(ips, ip)
|
||||
}
|
||||
}
|
||||
}
|
||||
_ = dhcp.ReleaseIP(context.Background(), ips...)
|
||||
}
|
||||
v4, v6, err = dhcp.RentIPRandom(context.Background())
|
||||
if err != nil {
|
||||
log.Errorf("rent ip random failed, err: %v", err)
|
||||
return toV1AdmissionResponse(err)
|
||||
}
|
||||
var name string
|
||||
if accessor, errT := meta.Accessor(ar.Request.Object); errT == nil {
|
||||
name = accessor.GetName()
|
||||
}
|
||||
log.Infof("rent ipv4: %s ipv6: %s for pod %s in namespace: %s", v4.String(), v6.String(), name, ar.Request.Namespace)
|
||||
}
|
||||
}
|
||||
for j := 0; j < len(pod.Spec.Containers[i].Env); j++ {
|
||||
pair := pod.Spec.Containers[i].Env[j]
|
||||
if pair.Name == config.EnvInboundPodTunIPv4 && v4 != nil {
|
||||
pod.Spec.Containers[i].Env[j].Value = v4.String()
|
||||
}
|
||||
if pair.Name == config.EnvInboundPodTunIPv6 && v6 != nil {
|
||||
pod.Spec.Containers[i].Env[j].Value = v6.String()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if found {
|
||||
var to []byte
|
||||
to, err = json.Marshal(pod)
|
||||
if err != nil {
|
||||
log.Errorf("can not marshal pod, err: %v", err)
|
||||
return toV1AdmissionResponse(err)
|
||||
}
|
||||
var patch []jsonpatch.JsonPatchOperation
|
||||
patch, err = jsonpatch.CreatePatch(from, to)
|
||||
if err != nil {
|
||||
log.Errorf("can not create patch json, err: %v", err)
|
||||
return toV1AdmissionResponse(err)
|
||||
}
|
||||
var marshal []byte
|
||||
marshal, err = json.Marshal(patch)
|
||||
if err != nil {
|
||||
log.Errorf("can not marshal json patch %v, err: %v", patch, err)
|
||||
return toV1AdmissionResponse(err)
|
||||
}
|
||||
return applyPodPatch(
|
||||
ar,
|
||||
func(pod *corev1.Pod) bool {
|
||||
name, _ := podcmd.FindContainerByName(pod, config.ContainerSidecarVPN)
|
||||
return name != nil
|
||||
},
|
||||
string(marshal),
|
||||
)
|
||||
}
|
||||
return &v1.AdmissionResponse{
|
||||
UID: ar.Request.UID,
|
||||
Allowed: true,
|
||||
}
|
||||
case v1.Delete:
|
||||
raw := ar.Request.OldObject.Raw
|
||||
pod := corev1.Pod{}
|
||||
deserializer := codecs.UniversalDeserializer()
|
||||
if _, _, err := deserializer.Decode(raw, nil, &pod); err != nil {
|
||||
log.Errorf("can not decode into pod, err: %v, raw: %s", err, string(raw))
|
||||
return toV1AdmissionResponse(err)
|
||||
}
|
||||
return h.handleDelete(ar)
|
||||
|
||||
container, _ := podcmd.FindContainerByName(&pod, config.ContainerSidecarVPN)
|
||||
if container != nil {
|
||||
var ips []net.IP
|
||||
for _, envVar := range container.Env {
|
||||
if x, _, _ := net.ParseCIDR(envVar.Value); config.RouterIP.Equal(x) {
|
||||
break
|
||||
}
|
||||
if envVar.Name == config.EnvInboundPodTunIPv4 || envVar.Name == config.EnvInboundPodTunIPv6 {
|
||||
if ip, _, err := net.ParseCIDR(envVar.Value); err == nil {
|
||||
ips = append(ips, ip)
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(ips) != 0 {
|
||||
cmi := h.clientset.CoreV1().ConfigMaps(ar.Request.Namespace)
|
||||
err := handler.NewDHCPManager(cmi, ar.Request.Namespace).
|
||||
ReleaseIP(context.Background(), ips...)
|
||||
if err != nil {
|
||||
log.Errorf("release ip to dhcp err: %v, ips: %v", err, ips)
|
||||
} else {
|
||||
log.Errorf("release ip to dhcp ok, ip: %v", ips)
|
||||
}
|
||||
}
|
||||
}
|
||||
return &v1.AdmissionResponse{
|
||||
Allowed: true,
|
||||
}
|
||||
default:
|
||||
err := fmt.Errorf("expect operation is %s or %s, not %s", v1.Create, v1.Delete, ar.Request.Operation)
|
||||
log.Error(err)
|
||||
@@ -170,13 +46,158 @@ func (h *admissionReviewHandler) admitPods(ar v1.AdmissionReview) *v1.AdmissionR
|
||||
}
|
||||
}
|
||||
|
||||
// handle create pod event
|
||||
func (h *admissionReviewHandler) handleCreate(ar v1.AdmissionReview) *v1.AdmissionResponse {
|
||||
raw := ar.Request.Object.Raw
|
||||
pod := corev1.Pod{}
|
||||
deserializer := codecs.UniversalDeserializer()
|
||||
if _, _, err := deserializer.Decode(raw, nil, &pod); err != nil {
|
||||
log.Errorf("can not decode into pod, err: %v, raw: %s", err, string(raw))
|
||||
return toV1AdmissionResponse(err)
|
||||
}
|
||||
|
||||
from, err := json.Marshal(pod)
|
||||
if err != nil {
|
||||
log.Errorf("can not marshal into pod, err: %v", err)
|
||||
return toV1AdmissionResponse(err)
|
||||
}
|
||||
|
||||
// 1) pre-check
|
||||
container, index := util.FindContainerByName(&pod, config.ContainerSidecarVPN)
|
||||
if container == nil {
|
||||
return &v1.AdmissionResponse{UID: ar.Request.UID, Allowed: true}
|
||||
}
|
||||
value, ok := util.FindContainerEnv(container, config.EnvInboundPodTunIPv4)
|
||||
if !ok {
|
||||
return &v1.AdmissionResponse{UID: ar.Request.UID, Allowed: true}
|
||||
}
|
||||
// if create pod kubevpn-traffic-manager, just ignore it
|
||||
// because 223.254.0.100 is reserved
|
||||
if x, _, _ := net.ParseCIDR(value); config.RouterIP.Equal(x) {
|
||||
return &v1.AdmissionResponse{UID: ar.Request.UID, Allowed: true}
|
||||
}
|
||||
|
||||
// 2) release old ip
|
||||
h.Lock()
|
||||
defer h.Unlock()
|
||||
cmi := h.clientset.CoreV1().ConfigMaps(ar.Request.Namespace)
|
||||
dhcp := handler.NewDHCPManager(cmi, ar.Request.Namespace)
|
||||
var ips []net.IP
|
||||
for k := 0; k < len(container.Env); k++ {
|
||||
envVar := container.Env[k]
|
||||
if sets.New[string](config.EnvInboundPodTunIPv4, config.EnvInboundPodTunIPv6).Has(envVar.Name) && envVar.Value != "" {
|
||||
if ip, _, _ := net.ParseCIDR(envVar.Value); ip != nil {
|
||||
ips = append(ips, ip)
|
||||
}
|
||||
}
|
||||
}
|
||||
_ = dhcp.ReleaseIP(context.Background(), ips...)
|
||||
|
||||
// 3) rent new ip
|
||||
var v4, v6 *net.IPNet
|
||||
v4, v6, err = dhcp.RentIPRandom(context.Background())
|
||||
if err != nil {
|
||||
log.Errorf("rent ip random failed, err: %v", err)
|
||||
return toV1AdmissionResponse(err)
|
||||
}
|
||||
var name string
|
||||
if accessor, errT := meta.Accessor(ar.Request.Object); errT == nil {
|
||||
name = accessor.GetName()
|
||||
}
|
||||
log.Infof("rent ipv4: %s ipv6: %s for pod %s in namespace: %s", v4.String(), v6.String(), name, ar.Request.Namespace)
|
||||
|
||||
//4) update spec
|
||||
for j := 0; j < len(pod.Spec.Containers[index].Env); j++ {
|
||||
pair := pod.Spec.Containers[index].Env[j]
|
||||
if pair.Name == config.EnvInboundPodTunIPv4 && v4 != nil {
|
||||
pod.Spec.Containers[index].Env[j].Value = v4.String()
|
||||
}
|
||||
if pair.Name == config.EnvInboundPodTunIPv6 && v6 != nil {
|
||||
pod.Spec.Containers[index].Env[j].Value = v6.String()
|
||||
}
|
||||
}
|
||||
|
||||
// 5) generate patch and apply patch
|
||||
var to []byte
|
||||
to, err = json.Marshal(pod)
|
||||
if err != nil {
|
||||
log.Errorf("can not marshal pod, err: %v", err)
|
||||
return toV1AdmissionResponse(err)
|
||||
}
|
||||
var patch []jsonpatch.JsonPatchOperation
|
||||
patch, err = jsonpatch.CreatePatch(from, to)
|
||||
if err != nil {
|
||||
log.Errorf("can not create patch json, err: %v", err)
|
||||
return toV1AdmissionResponse(err)
|
||||
}
|
||||
var marshal []byte
|
||||
marshal, err = json.Marshal(patch)
|
||||
if err != nil {
|
||||
log.Errorf("can not marshal json patch %v, err: %v", patch, err)
|
||||
return toV1AdmissionResponse(err)
|
||||
}
|
||||
var shouldPatchPod = func(pod *corev1.Pod) bool {
|
||||
namedContainer, _ := podcmd.FindContainerByName(pod, config.ContainerSidecarVPN)
|
||||
return namedContainer != nil
|
||||
}
|
||||
return applyPodPatch(ar, shouldPatchPod, string(marshal))
|
||||
}
|
||||
|
||||
// handle delete pod event
|
||||
func (h *admissionReviewHandler) handleDelete(ar v1.AdmissionReview) *v1.AdmissionResponse {
|
||||
raw := ar.Request.OldObject.Raw
|
||||
pod := corev1.Pod{}
|
||||
deserializer := codecs.UniversalDeserializer()
|
||||
if _, _, err := deserializer.Decode(raw, nil, &pod); err != nil {
|
||||
log.Errorf("can not decode into pod, err: %v, raw: %s", err, string(raw))
|
||||
return toV1AdmissionResponse(err)
|
||||
}
|
||||
|
||||
// 1) pre-check
|
||||
container, _ := util.FindContainerByName(&pod, config.ContainerSidecarVPN)
|
||||
if container == nil {
|
||||
return &v1.AdmissionResponse{Allowed: true}
|
||||
}
|
||||
value, ok := util.FindContainerEnv(container, config.EnvInboundPodTunIPv4)
|
||||
if !ok {
|
||||
return &v1.AdmissionResponse{Allowed: true}
|
||||
}
|
||||
// if delete pod kubevpn-traffic-manager, just ignore it
|
||||
// because 223.254.0.100 is reserved
|
||||
if x, _, _ := net.ParseCIDR(value); config.RouterIP.Equal(x) {
|
||||
return &v1.AdmissionResponse{Allowed: true}
|
||||
}
|
||||
|
||||
// 2) release ip
|
||||
var ips []net.IP
|
||||
for _, envVar := range container.Env {
|
||||
if envVar.Name == config.EnvInboundPodTunIPv4 || envVar.Name == config.EnvInboundPodTunIPv6 {
|
||||
if ip, _, err := net.ParseCIDR(envVar.Value); err == nil {
|
||||
ips = append(ips, ip)
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(ips) != 0 {
|
||||
h.Lock()
|
||||
defer h.Unlock()
|
||||
cmi := h.clientset.CoreV1().ConfigMaps(ar.Request.Namespace)
|
||||
err := handler.NewDHCPManager(cmi, ar.Request.Namespace).ReleaseIP(context.Background(), ips...)
|
||||
if err != nil {
|
||||
log.Errorf("release ip to dhcp err: %v, ips: %v", err, ips)
|
||||
} else {
|
||||
log.Errorf("release ip to dhcp ok, ip: %v", ips)
|
||||
}
|
||||
}
|
||||
return &v1.AdmissionResponse{Allowed: true}
|
||||
}
|
||||
|
||||
func applyPodPatch(ar v1.AdmissionReview, shouldPatchPod func(*corev1.Pod) bool, patch string) *v1.AdmissionResponse {
|
||||
r, _ := json.Marshal(ar)
|
||||
log.Infof("mutating pods called, req: %s", string(r))
|
||||
log.Infof("apply pod patch: %s", patch)
|
||||
podResource := metav1.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}
|
||||
if ar.Request.Resource != podResource {
|
||||
log.Errorf("expect resource to be %s but real is %s", podResource, ar.Request.Resource)
|
||||
return nil
|
||||
err := fmt.Errorf("expect resource to be %s but real %s", podResource, ar.Request.Resource)
|
||||
log.Error(err)
|
||||
return toV1AdmissionResponse(err)
|
||||
}
|
||||
|
||||
raw := ar.Request.Object.Raw
|
||||
@@ -186,12 +207,10 @@ func applyPodPatch(ar v1.AdmissionReview, shouldPatchPod func(*corev1.Pod) bool,
|
||||
log.Errorf("can not decode request into pod, err: %v, req: %s", err, string(raw))
|
||||
return toV1AdmissionResponse(err)
|
||||
}
|
||||
reviewResponse := v1.AdmissionResponse{}
|
||||
reviewResponse.Allowed = true
|
||||
reviewResponse := v1.AdmissionResponse{Allowed: true}
|
||||
if shouldPatchPod(&pod) {
|
||||
reviewResponse.Patch = []byte(patch)
|
||||
pt := v1.PatchTypeJSONPatch
|
||||
reviewResponse.PatchType = &pt
|
||||
reviewResponse.PatchType = ptr.To(v1.PatchTypeJSONPatch)
|
||||
}
|
||||
return &reviewResponse
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user