refactor: rework DHCP logic (#298)

Author: naison
Date: 2024-07-19 22:07:35 +08:00
Committed by: GitHub
Parent: 78de74bf08
Commit: bc7d205695
37 changed files with 952 additions and 1131 deletions

View File

@@ -19,7 +19,6 @@ const (
	KeyDHCP6            = "DHCP6"
	KeyEnvoy            = "ENVOY_CONFIG"
	KeyClusterIPv4POOLS = "IPv4_POOLS"
-	KeyRefCount         = "REF_COUNT"

	// secret keys
	// TLSCertKey is the key for tls certificates in a TLS secret.
@@ -70,15 +69,9 @@ const (
	EnvPodNamespace = "POD_NAMESPACE"

	// header name
-	HeaderPodName      = "POD_NAME"
-	HeaderPodNamespace = "POD_NAMESPACE"
	HeaderIPv4 = "IPv4"
	HeaderIPv6 = "IPv6"

-	// api
-	APIRentIP    = "/rent/ip"
-	APIReleaseIP = "/release/ip"
	KUBECONFIG = "kubeconfig"

	// labels
@@ -89,8 +82,6 @@ const (
	SudoPProfPort = 33345
	PProfDir      = "pprof"

-	// startup by KubeVPN
-	EnvStartSudoKubeVPNByKubeVPN = "DEPTH_SIGNED_BY_NAISON"
	EnvSSHJump = "SSH_JUMP_BY_KUBEVPN"

	// transport mode

View File

@@ -83,7 +83,7 @@ func (svr *Server) ConnectFork(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectForkServer) error {
	if err != nil {
		return err
	}
-	_, err = connect.RentInnerIP(ctx)
+	err = connect.GetIPFromContext(ctx)
	if err != nil {
		return err
	}
@@ -162,7 +162,7 @@ func (svr *Server) redirectConnectForkToSudoDaemon(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectForkServer) error {
		}
	}
-	ctx, err := connect.RentInnerIP(resp.Context())
+	ctx, err := connect.RentIP(resp.Context())
	if err != nil {
		return err
	}

View File

@@ -101,7 +101,7 @@ func (svr *Server) Connect(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectServer) error {
	if err != nil {
		return err
	}
-	_, err = svr.connect.RentInnerIP(ctx)
+	err = svr.connect.GetIPFromContext(ctx)
	if err != nil {
		return err
	}
@@ -179,7 +179,7 @@ func (svr *Server) redirectToSudoDaemon(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectServer) error {
		}
	}
-	ctx, err := connect.RentInnerIP(resp.Context())
+	ctx, err := connect.RentIP(resp.Context())
	if err != nil {
		return err
	}

View File

@@ -7,7 +7,7 @@ import (
	log "github.com/sirupsen/logrus"

	"github.com/wencaiwulue/kubevpn/v2/pkg/daemon/rpc"
-	"github.com/wencaiwulue/kubevpn/v2/pkg/handler"
+	"github.com/wencaiwulue/kubevpn/v2/pkg/inject"
)

func (svr *Server) Leave(req *rpc.LeaveRequest, resp rpc.Daemon_LeaveServer) error {
@@ -29,7 +29,7 @@ func (svr *Server) Leave(req *rpc.LeaveRequest, resp rpc.Daemon_LeaveServer) err
	v4, _ := svr.connect.GetLocalTunIP()
	for _, workload := range req.GetWorkloads() {
		// add rollback func to remove envoy config
-		err := handler.UnPatchContainer(factory, maps, namespace, workload, v4)
+		err := inject.UnPatchContainer(factory, maps, namespace, workload, v4)
		if err != nil {
			log.Errorf("leave workload %s failed: %v", workload, err)
			continue

View File

@@ -88,8 +88,8 @@ func (svr *Server) Proxy(req *rpc.ConnectRequest, resp rpc.Daemon_ProxyServer) (
		log.Infof("already connect to cluster")
	} else {
		log.Infof("try to disconnect from another cluster")
-		var disconnect rpc.Daemon_DisconnectClient
-		disconnect, err = daemonClient.Disconnect(ctx, &rpc.DisconnectRequest{
+		var disconnectResp rpc.Daemon_DisconnectClient
+		disconnectResp, err = daemonClient.Disconnect(ctx, &rpc.DisconnectRequest{
			KubeconfigBytes: ptr.To(req.KubeconfigBytes),
			Namespace:       ptr.To(connect.Namespace),
			SshJump:         sshConf.ToRPC(),
@@ -99,7 +99,7 @@ func (svr *Server) Proxy(req *rpc.ConnectRequest, resp rpc.Daemon_ProxyServer) (
		}
		var recv *rpc.DisconnectResponse
		for {
-			recv, err = disconnect.Recv()
+			recv, err = disconnectResp.Recv()
			if err == io.EOF {
				break
			} else if err != nil {

View File

@@ -89,7 +89,7 @@ func (w *wsHandler) handle(c context.Context) {
		w.Log("Port map error: %v", err)
		return
	}
-	cmd := fmt.Sprintf(`export %s=%s && kubevpn ssh-daemon --client-ip %s`, config.EnvStartSudoKubeVPNByKubeVPN, "true", clientIP.String())
+	cmd := fmt.Sprintf(`kubevpn ssh-daemon --client-ip %s`, clientIP.String())
	serverIP, stderr, err := util.RemoteRun(cli, cmd, nil)
	if err != nil {
		log.Errorf("run error: %v", err)
@@ -145,7 +145,7 @@ func (w *wsHandler) handle(c context.Context) {
// startup daemon process if daemon process not start
func startDaemonProcess(cli *ssh.Client) string {
-	startDaemonCmd := fmt.Sprintf(`export %s=%s && kubevpn status > /dev/null 2>&1 &`, config.EnvStartSudoKubeVPNByKubeVPN, "true")
+	startDaemonCmd := fmt.Sprintf(`kubevpn status > /dev/null 2>&1 &`)
	_, _, _ = util.RemoteRun(cli, startDaemonCmd, nil)
	output, _, err := util.RemoteRun(cli, "kubevpn version", nil)
	if err != nil {

View File

@@ -417,7 +417,7 @@ func createConnectContainer(noProxy bool, connect handler.ConnectOptions, path s
	AttachStderr: false,
	ExposedPorts: set,
	StdinOnce:    false,
-	Env:          []string{fmt.Sprintf("%s=1", config.EnvStartSudoKubeVPNByKubeVPN)},
+	Env:          []string{},
	Cmd:          []string{},
	Healthcheck:  nil,
	ArgsEscaped:  false,

pkg/dhcp/dhcp.go (new file, 272 lines added)
View File

@@ -0,0 +1,272 @@
package dhcp
import (
"context"
"encoding/base64"
"fmt"
"net"
"github.com/cilium/ipam/service/allocator"
"github.com/cilium/ipam/service/ipallocator"
log "github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/util/retry"
"github.com/wencaiwulue/kubevpn/v2/pkg/config"
"github.com/wencaiwulue/kubevpn/v2/pkg/util"
)
type Manager struct {
client corev1.ConfigMapInterface
cidr *net.IPNet
cidr6 *net.IPNet
namespace string
clusterID types.UID
}
func NewDHCPManager(client corev1.ConfigMapInterface, namespace string) *Manager {
return &Manager{
client: client,
namespace: namespace,
cidr: &net.IPNet{IP: config.RouterIP, Mask: config.CIDR.Mask},
cidr6: &net.IPNet{IP: config.RouterIP6, Mask: config.CIDR6.Mask},
}
}
// InitDHCP
// TODO optimize dhcp, using mac address, ip and deadline as unit
func (m *Manager) InitDHCP(ctx context.Context) error {
cm, err := m.client.Get(ctx, config.ConfigMapPodTrafficManager, metav1.GetOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return fmt.Errorf("failed to get configmap %s, err: %v", config.ConfigMapPodTrafficManager, err)
}
if err == nil {
m.clusterID = util.GetClusterIDByCM(cm)
return nil
}
cm = &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: config.ConfigMapPodTrafficManager,
Namespace: m.namespace,
Labels: map[string]string{},
},
Data: map[string]string{
config.KeyEnvoy: "",
config.KeyDHCP: "",
config.KeyDHCP6: "",
config.KeyClusterIPv4POOLS: "",
},
}
cm, err = m.client.Create(ctx, cm, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("create dhcp error, err: %v", err)
}
m.clusterID = util.GetClusterIDByCM(cm)
return nil
}
func (m *Manager) RentIP(ctx context.Context) (*net.IPNet, *net.IPNet, error) {
addrs, _ := net.InterfaceAddrs()
var isAlreadyExistedFunc = func(ip net.IP) bool {
for _, addr := range addrs {
if addr == nil {
continue
}
if addrIP, ok := addr.(*net.IPNet); ok {
if addrIP.IP.Equal(ip) {
return true
}
}
}
return false
}
var v4, v6 net.IP
err := m.updateDHCPConfigMap(ctx, func(ipv4 *ipallocator.Range, ipv6 *ipallocator.Range) (err error) {
for {
if v4, err = ipv4.AllocateNext(); err != nil {
return err
}
if !isAlreadyExistedFunc(v4) {
break
}
}
for {
if v6, err = ipv6.AllocateNext(); err != nil {
return err
}
if !isAlreadyExistedFunc(v6) {
break
}
}
return
})
if err != nil {
log.Errorf("failed to rent ip from DHCP server, err: %v", err)
return nil, nil, err
}
return &net.IPNet{IP: v4, Mask: m.cidr.Mask}, &net.IPNet{IP: v6, Mask: m.cidr6.Mask}, nil
}
func (m *Manager) ReleaseIP(ctx context.Context, ips ...net.IP) error {
if len(ips) == 0 {
return nil
}
return m.updateDHCPConfigMap(ctx, func(ipv4 *ipallocator.Range, ipv6 *ipallocator.Range) error {
for _, ip := range ips {
var err error
if ip.To4() != nil {
err = ipv4.Release(ip)
} else {
err = ipv6.Release(ip)
}
if err != nil {
return err
}
}
return nil
})
}
func (m *Manager) updateDHCPConfigMap(ctx context.Context, f func(ipv4 *ipallocator.Range, ipv6 *ipallocator.Range) error) error {
cm, err := m.client.Get(ctx, config.ConfigMapPodTrafficManager, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get configmap DHCP server, err: %v", err)
}
if cm.Data == nil {
return fmt.Errorf("configmap is empty")
}
var dhcp *ipallocator.Range
dhcp, err = ipallocator.NewAllocatorCIDRRange(m.cidr, func(max int, rangeSpec string) (allocator.Interface, error) {
return allocator.NewContiguousAllocationMap(max, rangeSpec), nil
})
if err != nil {
return err
}
if str := cm.Data[config.KeyDHCP]; str != "" {
var b []byte
if b, err = base64.StdEncoding.DecodeString(str); err != nil {
return err
}
if err = dhcp.Restore(m.cidr, b); err != nil {
return err
}
}
var dhcp6 *ipallocator.Range
dhcp6, err = ipallocator.NewAllocatorCIDRRange(m.cidr6, func(max int, rangeSpec string) (allocator.Interface, error) {
return allocator.NewContiguousAllocationMap(max, rangeSpec), nil
})
if err != nil {
return err
}
if str := cm.Data[config.KeyDHCP6]; str != "" {
var b []byte
if b, err = base64.StdEncoding.DecodeString(str); err != nil {
return err
}
if err = dhcp6.Restore(m.cidr6, b); err != nil {
return err
}
}
if err = f(dhcp, dhcp6); err != nil {
return err
}
var bytes []byte
if _, bytes, err = dhcp.Snapshot(); err != nil {
return err
}
cm.Data[config.KeyDHCP] = base64.StdEncoding.EncodeToString(bytes)
if _, bytes, err = dhcp6.Snapshot(); err != nil {
return err
}
cm.Data[config.KeyDHCP6] = base64.StdEncoding.EncodeToString(bytes)
_, err = m.client.Update(ctx, cm, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("update dhcp failed, err: %v", err)
}
return nil
}
func (m *Manager) Set(ctx context.Context, key, value string) error {
err := retry.RetryOnConflict(
retry.DefaultRetry,
func() error {
p := []byte(fmt.Sprintf(`[{"op": "replace", "path": "/data/%s", "value": "%s"}]`, key, value))
_, err := m.client.Patch(ctx, config.ConfigMapPodTrafficManager, types.JSONPatchType, p, metav1.PatchOptions{})
return err
})
if err != nil {
log.Errorf("update configmap failed, err: %v", err)
return err
}
return nil
}
func (m *Manager) Get(ctx context.Context, key string) (string, error) {
cm, err := m.client.Get(ctx, config.ConfigMapPodTrafficManager, metav1.GetOptions{})
if err != nil {
return "", err
}
if cm != nil && cm.Data != nil {
if v, ok := cm.Data[key]; ok {
return v, nil
}
}
return "", fmt.Errorf("can not get data")
}
func (m *Manager) ForEach(ctx context.Context, fnv4 func(net.IP), fnv6 func(net.IP)) error {
cm, err := m.client.Get(ctx, config.ConfigMapPodTrafficManager, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get cm DHCP server, err: %v", err)
}
if cm.Data == nil {
cm.Data = make(map[string]string)
}
var dhcp *ipallocator.Range
dhcp, err = ipallocator.NewAllocatorCIDRRange(m.cidr, func(max int, rangeSpec string) (allocator.Interface, error) {
return allocator.NewContiguousAllocationMap(max, rangeSpec), nil
})
if err != nil {
return err
}
var str []byte
str, err = base64.StdEncoding.DecodeString(cm.Data[config.KeyDHCP])
if err == nil {
err = dhcp.Restore(m.cidr, str)
if err != nil {
return err
}
}
dhcp.ForEach(fnv4)
var dhcp6 *ipallocator.Range
dhcp6, err = ipallocator.NewAllocatorCIDRRange(m.cidr6, func(max int, rangeSpec string) (allocator.Interface, error) {
return allocator.NewContiguousAllocationMap(max, rangeSpec), nil
})
if err != nil {
return err
}
str, err = base64.StdEncoding.DecodeString(cm.Data[config.KeyDHCP6])
if err == nil {
err = dhcp6.Restore(m.cidr6, str)
if err != nil {
return err
}
}
dhcp6.ForEach(fnv6)
return nil
}
func (m *Manager) GetClusterID() types.UID {
return m.clusterID
}
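
For context, the new Manager is consumed roughly as below. This is a minimal sketch, not part of the commit; it assumes a kubeconfig resolvable via the default loading rules and uses the "default" namespace purely for illustration.

package main

import (
	"context"
	"log"

	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"

	"github.com/wencaiwulue/kubevpn/v2/pkg/dhcp"
)

func main() {
	// Assumption: kubeconfig is discoverable through the default loading rules.
	cfg, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
		clientcmd.NewDefaultClientConfigLoadingRules(), nil).ClientConfig()
	if err != nil {
		log.Fatal(err)
	}
	clientset, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		log.Fatal(err)
	}

	ctx := context.Background()
	m := dhcp.NewDHCPManager(clientset.CoreV1().ConfigMaps("default"), "default")
	if err = m.InitDHCP(ctx); err != nil { // creates the backing ConfigMap on first use
		log.Fatal(err)
	}
	v4, v6, err := m.RentIP(ctx) // lease one IPv4/IPv6 pair from the ConfigMap-backed pool
	if err != nil {
		log.Fatal(err)
	}
	defer m.ReleaseIP(ctx, v4.IP, v6.IP) // return the lease when done (error ignored in this sketch)
	log.Printf("rented %s / %s", v4, v6)
}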

pkg/dhcp/server.go (new file, 69 lines added)
View File

@@ -0,0 +1,69 @@
package dhcp
import (
"context"
"net"
"sync"
log "github.com/sirupsen/logrus"
"k8s.io/client-go/kubernetes"
"github.com/wencaiwulue/kubevpn/v2/pkg/dhcp/rpc"
)
type Server struct {
rpc.UnimplementedDHCPServer
sync.Mutex
clientset *kubernetes.Clientset
}
func NewServer(clientset *kubernetes.Clientset) *Server {
return &Server{
clientset: clientset,
}
}
func (s *Server) RentIP(ctx context.Context, req *rpc.RentIPRequest) (*rpc.RentIPResponse, error) {
s.Lock()
defer s.Unlock()
log.Infof("handling rent ip request, pod name: %s, ns: %s", req.PodName, req.PodNamespace)
cmi := s.clientset.CoreV1().ConfigMaps(req.PodNamespace)
manager := NewDHCPManager(cmi, req.PodNamespace)
v4, v6, err := manager.RentIP(ctx)
if err != nil {
log.Errorf("rent ip failed, err: %v", err)
return nil, err
}
// todo patch annotation
resp := &rpc.RentIPResponse{
IPv4CIDR: v4.String(),
IPv6CIDR: v6.String(),
}
return resp, nil
}
func (s *Server) ReleaseIP(ctx context.Context, req *rpc.ReleaseIPRequest) (*rpc.ReleaseIPResponse, error) {
s.Lock()
defer s.Unlock()
log.Infof("handling release ip request, pod name: %s, ns: %s, ipv4: %s, ipv6: %s", req.PodName, req.PodNamespace, req.IPv4CIDR, req.IPv6CIDR)
var ips []net.IP
for _, ipStr := range []string{req.IPv4CIDR, req.IPv6CIDR} {
ip, _, err := net.ParseCIDR(ipStr)
if err != nil {
log.Errorf("ip is invailed, ip: %s, err: %v", ipStr, err)
continue
}
ips = append(ips, ip)
}
cmi := s.clientset.CoreV1().ConfigMaps(req.PodNamespace)
manager := NewDHCPManager(cmi, req.PodNamespace)
if err := manager.ReleaseIP(ctx, ips...); err != nil {
log.Errorf("release ip failed, err: %v", err)
return nil, err
}
return &rpc.ReleaseIPResponse{}, nil
}
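
Wiring this server into a daemon looks roughly like the sketch below. It assumes the generated pkg/dhcp/rpc package exposes the conventional RegisterDHCPServer helper (the UnimplementedDHCPServer embed above implies it); the listen address and clientset setup are illustrative only.

package main

import (
	"log"
	"net"

	"google.golang.org/grpc"
	"k8s.io/client-go/kubernetes"

	"github.com/wencaiwulue/kubevpn/v2/pkg/dhcp"
	"github.com/wencaiwulue/kubevpn/v2/pkg/dhcp/rpc"
)

func serve(clientset *kubernetes.Clientset) error {
	lis, err := net.Listen("tcp", ":10800") // assumption: daemon listen address
	if err != nil {
		return err
	}
	s := grpc.NewServer()
	rpc.RegisterDHCPServer(s, dhcp.NewServer(clientset)) // generated registration helper (assumed name)
	log.Println("DHCP gRPC server listening on :10800")
	return s.Serve(lis)
}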

pkg/dhcp/server_test.go (new file, 51 lines added)
View File

@@ -0,0 +1,51 @@
package dhcp
import (
"encoding/base64"
"net"
"testing"
"github.com/cilium/ipam/service/allocator"
"github.com/cilium/ipam/service/ipallocator"
"github.com/wencaiwulue/kubevpn/v2/pkg/config"
)
func TestName(t *testing.T) {
cidr := &net.IPNet{IP: config.RouterIP, Mask: config.CIDR.Mask}
dhcp, err := ipallocator.NewAllocatorCIDRRange(cidr, func(max int, rangeSpec string) (allocator.Interface, error) {
return allocator.NewContiguousAllocationMap(max, rangeSpec), nil
})
if err != nil {
t.Fatal(err)
}
s := "Aw=="
var str []byte
str, err = base64.StdEncoding.DecodeString(s)
if err != nil {
t.Fatal(err)
}
err = dhcp.Restore(cidr, str)
if err != nil {
t.Fatal(err)
}
next, err := dhcp.AllocateNext()
if err != nil {
t.Fatal(err)
}
t.Log(next.String())
_, bytes, _ := dhcp.Snapshot()
t.Log(string(bytes))
}
func TestInit(t *testing.T) {
cidr := &net.IPNet{IP: config.RouterIP, Mask: config.CIDR.Mask}
dhcp, err := ipallocator.NewAllocatorCIDRRange(cidr, func(max int, rangeSpec string) (allocator.Interface, error) {
return allocator.NewContiguousAllocationMap(max, rangeSpec), nil
})
if err != nil {
t.Fatal(err)
}
snapshot, bytes, err := dhcp.Snapshot()
t.Log(string(snapshot), string(bytes), err)
}
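
The "Aw==" fixture in TestName is easier to read once decoded. The snapshot layout is an implementation detail of cilium/ipam's allocator, but the data portion behaves as an allocation bitmap, so the following companion test (a sketch, not part of the commit) documents why AllocateNext is expected to hand out the third address of the range:

package dhcp

import (
	"encoding/base64"
	"testing"
)

// TestDecodeFixture: "Aw==" is base64 for the single byte 0x03, i.e. a bitmap
// with the first two offsets of the range marked as allocated.
func TestDecodeFixture(t *testing.T) {
	b, err := base64.StdEncoding.DecodeString("Aw==")
	if err != nil {
		t.Fatal(err)
	}
	if b[0] != 0x03 {
		t.Fatalf("expected bitmap byte 0x03, got %#x", b[0])
	}
}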

View File

@@ -33,9 +33,6 @@ var resolv = "/etc/resolv.conf"
// service.namespace.svc.cluster.local:port
func (c *Config) SetupDNS(ctx context.Context) error {
	c.usingResolver(ctx)
-	_ = exec.Command("killall", "mDNSResponderHelper").Run()
-	_ = exec.Command("killall", "-HUP", "mDNSResponder").Run()
-	_ = exec.Command("dscacheutil", "-flushcache").Run()
	return nil
}

View File

@@ -6,18 +6,13 @@ import (
	"net"
	"os"
	"os/signal"
-	"strconv"
	"syscall"
	"time"

	log "github.com/sirupsen/logrus"
-	corev1 "k8s.io/api/core/v1"
-	k8serrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
-	v12 "k8s.io/client-go/kubernetes/typed/core/v1"
-	"k8s.io/client-go/util/retry"
	"k8s.io/utils/pointer"

	"github.com/wencaiwulue/kubevpn/v2/pkg/config"
@@ -39,6 +34,8 @@ func (c *ConnectOptions) Cleanup() {
	if c == nil {
		return
	}
+	c.once.Do(func() {
	log.Info("prepare to exit, cleaning up")
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
	defer cancel()
@@ -57,17 +54,6 @@ func (c *ConnectOptions) Cleanup() {
	}
	if c.clientset != nil {
		_ = c.clientset.CoreV1().Pods(c.Namespace).Delete(ctx, config.CniNetName, v1.DeleteOptions{GracePeriodSeconds: pointer.Int64(0)})
-		count, err := updateRefCount(ctx, c.clientset.CoreV1().ConfigMaps(c.Namespace), config.ConfigMapPodTrafficManager, -1)
-		// only if ref is zero and deployment is not ready, needs to clean up
-		if err == nil && count <= 0 {
-			deployment, errs := c.clientset.AppsV1().Deployments(c.Namespace).Get(ctx, config.ConfigMapPodTrafficManager, v1.GetOptions{})
-			if errs == nil && deployment.Status.UnavailableReplicas != 0 {
-				cleanupK8sResource(ctx, c.clientset, c.Namespace, config.ConfigMapPodTrafficManager, true)
-			}
-		}
-		if err != nil {
-			log.Errorf("can not update ref-count: %v", err)
-		}
	}
	// leave proxy resources
	err := c.LeaveProxyResources(ctx)
@@ -77,7 +63,7 @@ func (c *ConnectOptions) Cleanup() {
	for _, function := range c.getRolloutFunc() {
		if function != nil {
-			if err := function(); err != nil {
+			if err = function(); err != nil {
				log.Warningf("rollout function error: %v", err)
			}
		}
@@ -89,65 +75,7 @@ func (c *ConnectOptions) Cleanup() {
		log.Infof("cleanup dns")
		c.dnsConfig.CancelDNS()
	}
+	})
}
-
-// vendor/k8s.io/kubectl/pkg/polymorphichelpers/rollback.go:99
-func updateRefCount(ctx context.Context, configMapInterface v12.ConfigMapInterface, name string, increment int) (current int, err error) {
-	err = retry.OnError(
-		retry.DefaultRetry,
-		func(err error) bool {
-			notFound := k8serrors.IsNotFound(err)
-			if notFound {
-				return false
-			}
-			conflict := k8serrors.IsConflict(err)
-			if conflict {
-				return true
-			}
-			return false
-		},
-		func() (err error) {
-			var cm *corev1.ConfigMap
-			cm, err = configMapInterface.Get(ctx, name, v1.GetOptions{})
-			if err != nil {
-				if k8serrors.IsNotFound(err) {
-					return err
-				}
-				err = fmt.Errorf("update ref-count failed, increment: %d, error: %v", increment, err)
-				return
-			}
-			curCount, _ := strconv.Atoi(cm.Data[config.KeyRefCount])
-			var newVal = curCount + increment
-			if newVal < 0 {
-				newVal = 0
-			}
-			p := []byte(fmt.Sprintf(`{"data":{"%s":"%s"}}`, config.KeyRefCount, strconv.Itoa(newVal)))
-			_, err = configMapInterface.Patch(ctx, name, types.MergePatchType, p, v1.PatchOptions{})
-			if err != nil {
-				if k8serrors.IsNotFound(err) {
-					return err
-				}
-				err = fmt.Errorf("update ref count error, error: %v", err)
-				return
-			}
-			return
-		})
-	if err != nil {
-		log.Errorf("update ref count error, increment: %d, error: %v", increment, err)
-		return
-	}
-	log.Info("update ref count successfully")
-	var cm *corev1.ConfigMap
-	cm, err = configMapInterface.Get(ctx, name, v1.GetOptions{})
-	if err != nil {
-		err = fmt.Errorf("failed to get cm: %s, err: %v", name, err)
-		return
-	}
-	current, err = strconv.Atoi(cm.Data[config.KeyRefCount])
-	if err != nil {
-		err = fmt.Errorf("failed to get ref-count, err: %v", err)
-	}
-	return
-}

func cleanupK8sResource(ctx context.Context, clientset *kubernetes.Clientset, namespace, name string, keepCIDR bool) {
@@ -157,8 +85,6 @@ func cleanupK8sResource(ctx context.Context, clientset *kubernetes.Clientset, namespace, name string, keepCIDR bool) {
		// keep configmap
		p := []byte(fmt.Sprintf(`[{"op": "remove", "path": "/data/%s"},{"op": "remove", "path": "/data/%s"}]`, config.KeyDHCP, config.KeyDHCP6))
		_, _ = clientset.CoreV1().ConfigMaps(namespace).Patch(ctx, name, types.JSONPatchType, p, v1.PatchOptions{})
-		p = []byte(fmt.Sprintf(`{"data":{"%s":"%s"}}`, config.KeyRefCount, strconv.Itoa(0)))
-		_, _ = clientset.CoreV1().ConfigMaps(namespace).Patch(ctx, name, types.MergePatchType, p, v1.PatchOptions{})
	} else {
		_ = clientset.CoreV1().ConfigMaps(namespace).Delete(ctx, name, options)
	}

View File

@@ -294,10 +294,7 @@ func (d *CloneOptions) DoClone(ctx context.Context, kubeconfigJsonBytes []byte)
		"--engine", string(d.Engine),
		"--foreground",
	}, args...),
-	Env: []v1.EnvVar{{
-		Name:  config.EnvStartSudoKubeVPNByKubeVPN,
-		Value: "1",
-	}},
+	Env: []v1.EnvVar{},
	Resources: v1.ResourceRequirements{
		Requests: map[v1.ResourceName]resource.Quantity{
			v1.ResourceCPU: resource.MustParse("1000m"),

View File

@@ -30,7 +30,6 @@ import (
	"k8s.io/apimachinery/pkg/fields"
	pkgruntime "k8s.io/apimachinery/pkg/runtime"
	pkgtypes "k8s.io/apimachinery/pkg/types"
-	"k8s.io/apimachinery/pkg/util/intstr"
	utilnet "k8s.io/apimachinery/pkg/util/net"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/sets"
@@ -48,8 +47,10 @@ import (
	"github.com/wencaiwulue/kubevpn/v2/pkg/config"
	"github.com/wencaiwulue/kubevpn/v2/pkg/core"
+	"github.com/wencaiwulue/kubevpn/v2/pkg/dhcp"
	"github.com/wencaiwulue/kubevpn/v2/pkg/dns"
	"github.com/wencaiwulue/kubevpn/v2/pkg/driver"
+	"github.com/wencaiwulue/kubevpn/v2/pkg/inject"
	"github.com/wencaiwulue/kubevpn/v2/pkg/tun"
	"github.com/wencaiwulue/kubevpn/v2/pkg/util"
)
@@ -73,7 +74,7 @@ type ConnectOptions struct {
	config  *rest.Config
	factory cmdutil.Factory
	cidrs   []*net.IPNet
-	dhcp    *DHCPManager
+	dhcp    *dhcp.Manager
	// needs to give it back to dhcp
	localTunIPv4 *net.IPNet
	localTunIPv6 *net.IPNet
@@ -82,6 +83,7 @@ type ConnectOptions struct {
	apiServerIPs []net.IP
	extraHost    []dns.Entry
+	once         sync.Once
}

func (c *ConnectOptions) Context() context.Context {
@@ -89,49 +91,58 @@ func (c *ConnectOptions) Context() context.Context {
}

func (c *ConnectOptions) InitDHCP(ctx context.Context) error {
-	c.dhcp = NewDHCPManager(c.clientset.CoreV1().ConfigMaps(c.Namespace), c.Namespace)
-	err := c.dhcp.initDHCP(ctx)
-	return err
+	if c.dhcp == nil {
+		c.dhcp = dhcp.NewDHCPManager(c.clientset.CoreV1().ConfigMaps(c.Namespace), c.Namespace)
+		return c.dhcp.InitDHCP(ctx)
+	}
+	return nil
}

-func (c *ConnectOptions) RentInnerIP(ctx context.Context) (context.Context, error) {
-	md, ok := metadata.FromIncomingContext(ctx)
-	if ok {
-		ipv4s := md.Get(config.HeaderIPv4)
-		if len(ipv4s) != 0 {
-			ip, ipNet, err := net.ParseCIDR(ipv4s[0])
-			if err == nil {
-				c.localTunIPv4 = &net.IPNet{IP: ip, Mask: ipNet.Mask}
-				log.Debugf("get ipv4 %s from context", c.localTunIPv4.String())
-			}
-		}
-		ipv6s := md.Get(config.HeaderIPv6)
-		if len(ipv6s) != 0 {
-			ip, ipNet, err := net.ParseCIDR(ipv6s[0])
-			if err == nil {
-				c.localTunIPv6 = &net.IPNet{IP: ip, Mask: ipNet.Mask}
-				log.Debugf("get ipv6 %s from context", c.localTunIPv6.String())
-			}
-		}
-	}
-	if c.dhcp == nil {
-		if err := c.InitDHCP(ctx); err != nil {
-			return nil, err
-		}
+func (c *ConnectOptions) RentIP(ctx context.Context) (context.Context, error) {
+	if err := c.InitDHCP(ctx); err != nil {
+		return nil, err
	}
	var err error
-	if c.localTunIPv4 == nil || c.localTunIPv6 == nil {
-		c.localTunIPv4, c.localTunIPv6, err = c.dhcp.RentIPBaseNICAddress(ctx)
-		if err != nil {
-			return nil, err
-		}
-		ctx = metadata.AppendToOutgoingContext(ctx,
-			config.HeaderIPv4, c.localTunIPv4.String(),
-			config.HeaderIPv6, c.localTunIPv6.String(),
-		)
+	c.localTunIPv4, c.localTunIPv6, err = c.dhcp.RentIP(ctx)
+	if err != nil {
+		return nil, err
	}
-	return ctx, nil
+	ctx1 := metadata.AppendToOutgoingContext(
+		ctx,
+		config.HeaderIPv4, c.localTunIPv4.String(),
+		config.HeaderIPv6, c.localTunIPv6.String(),
+	)
+	return ctx1, nil
+}
+
+func (c *ConnectOptions) GetIPFromContext(ctx context.Context) error {
+	md, ok := metadata.FromIncomingContext(ctx)
+	if !ok {
+		return fmt.Errorf("can not get ip from context")
+	}
+
+	ipv4 := md.Get(config.HeaderIPv4)
+	if len(ipv4) == 0 {
+		return fmt.Errorf("can not find ipv4 from header: %v", md)
+	}
+	ip, ipNet, err := net.ParseCIDR(ipv4[0])
+	if err != nil {
+		return fmt.Errorf("can not convert ipv4 string: %s: %v", ipv4[0], err)
+	}
+	c.localTunIPv4 = &net.IPNet{IP: ip, Mask: ipNet.Mask}
+	log.Debugf("get ipv4 %s from context", c.localTunIPv4.String())
+
+	ipv6 := md.Get(config.HeaderIPv6)
+	if len(ipv6) == 0 {
+		return fmt.Errorf("can not find ipv6 from header: %v", md)
+	}
+	ip, ipNet, err = net.ParseCIDR(ipv6[0])
+	if err != nil {
+		return fmt.Errorf("can not convert ipv6 string: %s: %v", ipv6[0], err)
+	}
+	c.localTunIPv6 = &net.IPNet{IP: ip, Mask: ipNet.Mask}
+	log.Debugf("get ipv6 %s from context", c.localTunIPv6.String())
+	return nil
}
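
The split between RentIP (which stamps the leased addresses onto the outgoing gRPC context) and GetIPFromContext (which reads them back on the receiving daemon) is the wire contract between the user daemon and the sudo daemon. A self-contained sketch of that handshake, assuming config.HeaderIPv4/HeaderIPv6 are the "IPv4"/"IPv6" keys from pkg/config and using illustrative addresses:

package main

import (
	"context"
	"fmt"

	"google.golang.org/grpc/metadata"
)

func main() {
	// Sender side (what RentIP does after allocation):
	out := metadata.AppendToOutgoingContext(context.Background(),
		"IPv4", "198.19.0.100/16",
		"IPv6", "2001:2::9999/64",
	)

	// In a real call the gRPC transport copies outgoing metadata into the
	// server's incoming context; simulate that hop here.
	md, _ := metadata.FromOutgoingContext(out)
	in := metadata.NewIncomingContext(context.Background(), md)

	// Receiver side (what GetIPFromContext does):
	got, _ := metadata.FromIncomingContext(in)
	fmt.Println(got.Get("IPv4"), got.Get("IPv6"))
}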
@@ -149,9 +160,9 @@ func (c *ConnectOptions) CreateRemoteInboundPod(ctx context.Context) (err error)
	// https://kubernetes.io/docs/concepts/workloads/pods/ephemeral-containers/
	// means mesh mode
	if len(c.Headers) != 0 || len(c.PortMap) != 0 {
-		err = InjectVPNAndEnvoySidecar(ctx, c.factory, c.clientset.CoreV1().ConfigMaps(c.Namespace), c.Namespace, workload, configInfo, c.Headers, c.PortMap)
+		err = inject.InjectVPNAndEnvoySidecar(ctx, c.factory, c.clientset.CoreV1().ConfigMaps(c.Namespace), c.Namespace, workload, configInfo, c.Headers, c.PortMap)
	} else {
-		err = InjectVPNSidecar(ctx, c.factory, c.Namespace, workload, configInfo)
+		err = inject.InjectVPNSidecar(ctx, c.factory, c.Namespace, workload, configInfo)
	}
	if err != nil {
		log.Errorf("create remote inbound pod for %s failed: %s", workload, err.Error())
@@ -166,12 +177,13 @@ func (c *ConnectOptions) DoConnect(ctx context.Context, isLite bool) (err error)
	c.ctx, c.cancel = context.WithCancel(ctx)

	log.Info("start to connect")
-	if err = c.InitDHCP(c.ctx); err != nil {
+	m := dhcp.NewDHCPManager(c.clientset.CoreV1().ConfigMaps(c.Namespace), c.Namespace)
+	if err = m.InitDHCP(c.ctx); err != nil {
		log.Errorf("init dhcp failed: %s", err.Error())
		return
	}
	c.addCleanUpResourceHandler()
-	if err = c.getCIDR(c.ctx); err != nil {
+	if err = c.getCIDR(c.ctx, m); err != nil {
		log.Errorf("get cidr failed: %s", err.Error())
		return
	}
@@ -182,9 +194,6 @@ func (c *ConnectOptions) DoConnect(ctx context.Context, isLite bool) (err error)
	if err = c.upgradeDeploy(c.ctx); err != nil {
		return
	}
-	if err = c.upgradeService(c.ctx); err != nil {
-		return
-	}
	//if err = c.CreateRemoteInboundPod(c.ctx); err != nil {
	//	return
	//}
@@ -227,13 +236,12 @@ func (c *ConnectOptions) DoConnect(ctx context.Context, isLite bool) (err error)
		log.Errorf("add route dynamic failed: %v", err)
		return
	}
-	c.deleteFirewallRule(c.ctx)
+	go c.deleteFirewallRule(c.ctx)
	log.Debugf("setup dns")
	if err = c.setupDNS(c.ctx); err != nil {
		log.Errorf("set up dns failed: %v", err)
		return
	}
-	go c.heartbeats(c.ctx)
	log.Info("dns service ok")
	return
}
@@ -314,10 +322,6 @@ func (c *ConnectOptions) portForward(ctx context.Context, portPair []string) err
}

func (c *ConnectOptions) startLocalTunServe(ctx context.Context, forwardAddress string, lite bool) (err error) {
-	// todo figure it out why
-	if util.IsWindows() {
-		c.localTunIPv4.Mask = net.CIDRMask(0, 32)
-	}
	var list = sets.New[string]()
	if !lite {
		list.Insert(config.CIDR.String())
@@ -452,20 +456,11 @@ func (c *ConnectOptions) addRouteDynamic(ctx context.Context) error {
}

func (c *ConnectOptions) deleteFirewallRule(ctx context.Context) {
-	// Found those code looks like not works
-	if !util.FindAllowFirewallRule() {
-		util.AddAllowFirewallRule()
-	}
-	c.AddRolloutFunc(func() error {
-		util.DeleteAllowFirewallRule()
-		return nil
-	})
	// The reason why delete firewall rule is:
	// On windows use 'kubevpn proxy deploy/authors -H user=windows'
	// Open terminal 'curl localhost:9080' ok
	// Open terminal 'curl localTunIP:9080' not ok
-	go util.DeleteBlockFirewallRule(ctx)
+	util.DeleteBlockFirewallRule(ctx)
}

func (c *ConnectOptions) setupDNS(ctx context.Context) error {
@@ -671,7 +666,7 @@ func (c *ConnectOptions) GetRunningPodList(ctx context.Context) ([]v1.Pod, error) {
// https://stackoverflow.com/questions/45903123/kubernetes-set-service-cidr-and-pod-cidr-the-same
// https://stackoverflow.com/questions/44190607/how-do-you-find-the-cluster-service-cidr-of-a-kubernetes-cluster/54183373#54183373
// https://stackoverflow.com/questions/44190607/how-do-you-find-the-cluster-service-cidr-of-a-kubernetes-cluster
-func (c *ConnectOptions) getCIDR(ctx context.Context) (err error) {
+func (c *ConnectOptions) getCIDR(ctx context.Context, m *dhcp.Manager) (err error) {
	defer func() {
		if err == nil {
			u, err2 := url.Parse(c.config.Host)
@@ -704,7 +699,7 @@ func (c *ConnectOptions) getCIDR(ctx context.Context, m *dhcp.Manager) (err error) {
	// (1) get cidr from cache
	var value string
-	value, err = c.dhcp.Get(ctx, config.KeyClusterIPv4POOLS)
+	value, err = m.Get(ctx, config.KeyClusterIPv4POOLS)
	if err == nil {
		for _, s := range strings.Split(value, " ") {
			_, cidr, _ := net.ParseCIDR(s)
@@ -713,7 +708,7 @@ func (c *ConnectOptions) getCIDR(ctx context.Context, m *dhcp.Manager) (err error) {
		}
	}
	if len(c.cidrs) != 0 {
-		log.Infoln("got cidr from cache")
+		log.Infoln("get cidr from cache")
		return
	}
@@ -730,7 +725,7 @@ func (c *ConnectOptions) getCIDR(ctx context.Context, m *dhcp.Manager) (err error) {
		s.Insert(cidr.String())
	}
	c.cidrs = util.Deduplicate(append(c.cidrs, cidrs...))
-	_ = c.dhcp.Set(config.KeyClusterIPv4POOLS, strings.Join(s.UnsortedList(), " "))
+	_ = m.Set(ctx, config.KeyClusterIPv4POOLS, strings.Join(s.UnsortedList(), " "))
	return
}
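
The cache written here is just a space-joined list of CIDRs stored under the IPv4_POOLS key of the traffic-manager ConfigMap, which is what the cache-read branch above splits and parses again. A small sketch of the round-trip, with illustrative values:

package main

import (
	"fmt"
	"net"
	"strings"
)

func main() {
	// What Set stores under IPv4_POOLS (values illustrative):
	value := "10.96.0.0/12 10.244.0.0/16"

	// What the cache-read branch of getCIDR does with it:
	var cidrs []*net.IPNet
	for _, s := range strings.Split(value, " ") {
		if _, cidr, err := net.ParseCIDR(s); err == nil {
			cidrs = append(cidrs, cidr)
		}
	}
	fmt.Println(cidrs) // [10.96.0.0/12 10.244.0.0/16]
}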
@@ -920,55 +915,38 @@ func (c *ConnectOptions) GetLocalTunIP() (v4 string, v6 string) {
func (c *ConnectOptions) GetClusterID() string {
	if c != nil && c.dhcp != nil {
-		return string(c.dhcp.clusterID)
+		return string(c.dhcp.GetClusterID())
	}
	return ""
}

func (c *ConnectOptions) upgradeDeploy(ctx context.Context) error {
-	deployment, err := c.clientset.AppsV1().Deployments(c.Namespace).Get(ctx, config.ConfigMapPodTrafficManager, metav1.GetOptions{})
+	deploy, err := c.clientset.AppsV1().Deployments(c.Namespace).Get(ctx, config.ConfigMapPodTrafficManager, metav1.GetOptions{})
	if err != nil {
		return err
	}
-	newImg, err := reference.ParseNormalizedNamed(config.Image)
-	if err != nil {
-		return err
-	}
-	newTag, ok := newImg.(reference.NamedTagged)
-	if !ok {
-		return nil
-	}
-	oldImg, err := reference.ParseNormalizedNamed(deployment.Spec.Template.Spec.Containers[0].Image)
-	if err != nil {
-		return err
-	}
-	var oldTag reference.NamedTagged
-	oldTag, ok = oldImg.(reference.NamedTagged)
-	if !ok {
-		return nil
-	}
-	if reference.Domain(newImg) != reference.Domain(oldImg) {
-		return nil
-	}
-	var oldVersion, newVersion *goversion.Version
-	oldVersion, err = goversion.NewVersion(oldTag.Tag())
-	if err != nil {
-		return nil
-	}
-	newVersion, err = goversion.NewVersion(newTag.Tag())
-	if err != nil {
-		return nil
-	}
-	if oldVersion.GreaterThanOrEqual(newVersion) {
-		return nil
-	}
-	log.Infof("found newer image %s, set image from %s to it...", config.Image, deployment.Spec.Template.Spec.Containers[0].Image)
+	if len(deploy.Spec.Template.Spec.Containers) == 0 {
+		return fmt.Errorf("can not found any container in deploy %s", deploy.Name)
+	}
+
+	clientImg := config.Image
+	serverImg := deploy.Spec.Template.Spec.Containers[0].Image
+	if clientImg == serverImg {
+		return nil
+	}
+	isNewer, _ := newer(clientImg, serverImg)
+	if deploy.Status.ReadyReplicas > 0 && !isNewer {
+		return nil
+	}
+
+	log.Infof("set image %s --> %s...", serverImg, clientImg)
	r := c.factory.NewBuilder().
		WithScheme(scheme.Scheme, scheme.Scheme.PrioritizedVersionsAllGroups()...).
		NamespaceParam(c.Namespace).DefaultNamespace().
-		ResourceNames("deployments", deployment.Name).
+		ResourceNames("deployments", deploy.Name).
		ContinueOnError().
		Latest().
		Flatten().
@@ -983,7 +961,7 @@ func (c *ConnectOptions) upgradeDeploy(ctx context.Context) error {
	patches := set.CalculatePatches(infos, scheme.DefaultJSONEncoder(), func(obj pkgruntime.Object) ([]byte, error) {
		_, err = polymorphichelpers.UpdatePodSpecForObjectFn(obj, func(spec *v1.PodSpec) error {
			for i := range spec.Containers {
-				spec.Containers[i].Image = config.Image
+				spec.Containers[i].Image = clientImg
			}
			return nil
		})
@@ -992,9 +970,10 @@ func (c *ConnectOptions) upgradeDeploy(ctx context.Context) error {
		}
		return pkgruntime.Encode(scheme.DefaultJSONEncoder(), obj)
	})
-	if err != nil {
-		return err
+	for _, p := range patches {
+		if p.Err != nil {
+			return p.Err
+		}
	}
	for _, p := range patches {
		_, err = resource.
@@ -1013,56 +992,36 @@ func (c *ConnectOptions) upgradeDeploy(ctx context.Context) error {
	return nil
}

-// update service spec, just for migrate
-func (c *ConnectOptions) upgradeService(ctx context.Context) error {
-	service, err := c.clientset.CoreV1().Services(c.Namespace).Get(ctx, config.ConfigMapPodTrafficManager, metav1.GetOptions{})
-	if err != nil {
-		return err
-	}
-	for _, port := range service.Spec.Ports {
-		if port.Port == 53 {
-			return nil
-		}
-	}
-	r := c.factory.NewBuilder().
-		WithScheme(scheme.Scheme, scheme.Scheme.PrioritizedVersionsAllGroups()...).
-		NamespaceParam(c.Namespace).DefaultNamespace().
-		ResourceNames("services", service.Name).
-		ContinueOnError().
-		Latest().
-		Flatten().
-		Do()
-	if err = r.Err(); err != nil {
-		return err
-	}
-	infos, err := r.Infos()
-	if err != nil {
-		return err
-	}
-	patches := set.CalculatePatches(infos, scheme.DefaultJSONEncoder(), func(obj pkgruntime.Object) ([]byte, error) {
-		v, ok := obj.(*v1.Service)
-		if ok {
-			v.Spec.Ports = append(v.Spec.Ports, v1.ServicePort{
-				Name:       "53-for-dns",
-				Protocol:   v1.ProtocolUDP,
-				Port:       53,
-				TargetPort: intstr.FromInt32(53),
-			})
-		}
-		return pkgruntime.Encode(scheme.DefaultJSONEncoder(), obj)
-	})
-	for _, p := range patches {
-		_, err = resource.
-			NewHelper(p.Info.Client, p.Info.Mapping).
-			DryRun(false).
-			Patch(p.Info.Namespace, p.Info.Name, pkgtypes.StrategicMergePatchType, p.Patch, nil)
-		if err != nil {
-			log.Errorf("failed to patch image update to pod template: %v", err)
-			return err
-		}
-	}
-	return nil
+func newer(clientImgStr, serverImgStr string) (bool, error) {
+	clientImg, err := reference.ParseNormalizedNamed(clientImgStr)
+	if err != nil {
+		return false, err
+	}
+	serverImg, err := reference.ParseNormalizedNamed(serverImgStr)
+	if err != nil {
+		return false, err
+	}
+	if reference.Domain(clientImg) != reference.Domain(serverImg) {
+		return false, nil
+	}
+
+	serverTag, ok := serverImg.(reference.NamedTagged)
+	if !ok {
+		return false, fmt.Errorf("can not convert server image")
+	}
+	serverVersion, err := goversion.NewVersion(serverTag.Tag())
+	if err != nil {
+		return false, err
+	}
+	clientTag, ok := clientImg.(reference.NamedTagged)
+	if !ok {
+		return false, fmt.Errorf("can not convert client image")
+	}
+	clientVersion, err := goversion.NewVersion(clientTag.Tag())
+	if err != nil {
+		return false, err
+	}
+	return clientVersion.GreaterThan(serverVersion), nil
}

// The reason why only Ping each other inner ip on Windows:
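
The extracted newer helper only compares tags of two references from the same registry. A test-style sketch of its semantics from within the handler package (image names hypothetical):

package handler

import "testing"

// TestNewer documents the helper's behavior: only same-registry references
// are compared, and only their version tags matter.
func TestNewer(t *testing.T) {
	if ok, _ := newer("docker.io/naison/kubevpn:v2.2.1", "docker.io/naison/kubevpn:v2.2.0"); !ok {
		t.Error("client tag v2.2.1 should be newer than server tag v2.2.0")
	}
	if ok, _ := newer("ghcr.io/naison/kubevpn:v9.9.9", "docker.io/naison/kubevpn:v2.2.0"); ok {
		t.Error("images from different registries should never compare as newer")
	}
}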

View File

@@ -1,324 +0,0 @@
package handler
import (
"context"
"encoding/base64"
"fmt"
"net"
"github.com/cilium/ipam/service/allocator"
"github.com/cilium/ipam/service/ipallocator"
log "github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"github.com/wencaiwulue/kubevpn/v2/pkg/config"
"github.com/wencaiwulue/kubevpn/v2/pkg/util"
)
type DHCPManager struct {
client corev1.ConfigMapInterface
cidr *net.IPNet
cidr6 *net.IPNet
namespace string
clusterID types.UID
}
func NewDHCPManager(client corev1.ConfigMapInterface, namespace string) *DHCPManager {
return &DHCPManager{
client: client,
namespace: namespace,
cidr: &net.IPNet{IP: config.RouterIP, Mask: config.CIDR.Mask},
cidr6: &net.IPNet{IP: config.RouterIP6, Mask: config.CIDR6.Mask},
}
}
// initDHCP
// TODO optimize dhcp, using mac address, ip and deadline as unit
func (d *DHCPManager) initDHCP(ctx context.Context) error {
cm, err := d.client.Get(ctx, config.ConfigMapPodTrafficManager, metav1.GetOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return fmt.Errorf("failed to get configmap %s, err: %v", config.ConfigMapPodTrafficManager, err)
}
d.clusterID = util.GetClusterIDByCM(cm)
if err == nil {
// add key envoy in case of mount not exist content
if _, found := cm.Data[config.KeyEnvoy]; !found {
_, err = d.client.Patch(
ctx,
cm.Name,
types.MergePatchType,
[]byte(fmt.Sprintf(`{"data":{"%s":"%s"}}`, config.KeyEnvoy, "")),
metav1.PatchOptions{},
)
}
if err != nil {
return fmt.Errorf("failed to patch configmap %s, err: %v", config.ConfigMapPodTrafficManager, err)
}
return nil
}
cm = &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: config.ConfigMapPodTrafficManager,
Namespace: d.namespace,
Labels: map[string]string{},
},
Data: map[string]string{
config.KeyEnvoy: "",
config.KeyRefCount: "0",
},
}
cm, err = d.client.Create(ctx, cm, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("create dhcp error, err: %v", err)
}
d.clusterID = util.GetClusterIDByCM(cm)
return nil
}
func (d *DHCPManager) RentIPBaseNICAddress(ctx context.Context) (*net.IPNet, *net.IPNet, error) {
addrs, err := net.InterfaceAddrs()
if err != nil {
return nil, nil, err
}
var isAlreadyExistedFunc = func(ips ...net.IP) bool {
for _, addr := range addrs {
addrIP, ok := addr.(*net.IPNet)
if ok {
for _, ip := range ips {
if addrIP.IP.Equal(ip) {
return true
}
}
}
}
return false
}
var v4, v6 net.IP
err = d.updateDHCPConfigMap(ctx, func(ipv4 *ipallocator.Range, ipv6 *ipallocator.Range) (err error) {
for {
if v4, err = ipv4.AllocateNext(); err != nil {
return err
}
if !isAlreadyExistedFunc(v4) {
break
}
}
for {
if v6, err = ipv6.AllocateNext(); err != nil {
return err
}
if !isAlreadyExistedFunc(v6) {
break
}
}
return
})
if err != nil {
return nil, nil, err
}
return &net.IPNet{IP: v4, Mask: d.cidr.Mask}, &net.IPNet{IP: v6, Mask: d.cidr6.Mask}, nil
}
func (d *DHCPManager) RentIPRandom(ctx context.Context) (*net.IPNet, *net.IPNet, error) {
addrs, _ := net.InterfaceAddrs()
var isAlreadyExistedFunc = func(ips ...net.IP) bool {
for _, addr := range addrs {
if addr == nil {
continue
}
if addrIP, ok := addr.(*net.IPNet); ok {
for _, ip := range ips {
if addrIP.IP.Equal(ip) {
return true
}
}
}
}
return false
}
var v4, v6 net.IP
err := d.updateDHCPConfigMap(ctx, func(ipv4 *ipallocator.Range, ipv6 *ipallocator.Range) (err error) {
for {
if v4, err = ipv4.AllocateNext(); err != nil {
return err
}
if !isAlreadyExistedFunc(v4) {
break
}
}
for {
if v6, err = ipv6.AllocateNext(); err != nil {
return err
}
if !isAlreadyExistedFunc(v6) {
break
}
}
return
})
if err != nil {
log.Errorf("failed to rent ip from DHCP server, err: %v", err)
return nil, nil, err
}
return &net.IPNet{IP: v4, Mask: d.cidr.Mask}, &net.IPNet{IP: v6, Mask: d.cidr6.Mask}, nil
}
func (d *DHCPManager) ReleaseIP(ctx context.Context, ips ...net.IP) error {
if len(ips) == 0 {
return nil
}
return d.updateDHCPConfigMap(ctx, func(ipv4 *ipallocator.Range, ipv6 *ipallocator.Range) error {
for _, ip := range ips {
var use *ipallocator.Range
if ip.To4() != nil {
use = ipv4
} else {
use = ipv6
}
if err := use.Release(ip); err != nil {
return err
}
}
return nil
})
}
func (d *DHCPManager) updateDHCPConfigMap(ctx context.Context, f func(ipv4 *ipallocator.Range, ipv6 *ipallocator.Range) error) error {
cm, err := d.client.Get(ctx, config.ConfigMapPodTrafficManager, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get cm DHCP server, err: %v", err)
}
if cm.Data == nil {
cm.Data = make(map[string]string)
}
var dhcp *ipallocator.Range
dhcp, err = ipallocator.NewAllocatorCIDRRange(d.cidr, func(max int, rangeSpec string) (allocator.Interface, error) {
return allocator.NewContiguousAllocationMap(max, rangeSpec), nil
})
if err != nil {
return err
}
var str []byte
str, err = base64.StdEncoding.DecodeString(cm.Data[config.KeyDHCP])
if err == nil {
err = dhcp.Restore(d.cidr, str)
if err != nil {
return err
}
}
var dhcp6 *ipallocator.Range
dhcp6, err = ipallocator.NewAllocatorCIDRRange(d.cidr6, func(max int, rangeSpec string) (allocator.Interface, error) {
return allocator.NewContiguousAllocationMap(max, rangeSpec), nil
})
if err != nil {
return err
}
str, err = base64.StdEncoding.DecodeString(cm.Data[config.KeyDHCP6])
if err == nil {
err = dhcp6.Restore(d.cidr6, str)
if err != nil {
return err
}
}
if err = f(dhcp, dhcp6); err != nil {
return err
}
for index, i := range []*ipallocator.Range{dhcp, dhcp6} {
var bytes []byte
if _, bytes, err = i.Snapshot(); err != nil {
return err
}
var key string
if index == 0 {
key = config.KeyDHCP
} else {
key = config.KeyDHCP6
}
cm.Data[key] = base64.StdEncoding.EncodeToString(bytes)
}
_, err = d.client.Update(ctx, cm, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("update dhcp failed, err: %v", err)
}
return nil
}
func (d *DHCPManager) Set(key, value string) error {
cm, err := d.client.Get(context.Background(), config.ConfigMapPodTrafficManager, metav1.GetOptions{})
if err != nil {
log.Errorf("failed to get data, err: %v", err)
return err
}
if cm.Data == nil {
cm.Data = make(map[string]string)
}
cm.Data[key] = value
_, err = d.client.Update(context.Background(), cm, metav1.UpdateOptions{})
if err != nil {
log.Errorf("update data failed, err: %v", err)
return err
}
return nil
}
func (d *DHCPManager) Get(ctx2 context.Context, key string) (string, error) {
cm, err := d.client.Get(ctx2, config.ConfigMapPodTrafficManager, metav1.GetOptions{})
if err != nil {
return "", err
}
if cm != nil && cm.Data != nil {
if v, ok := cm.Data[key]; ok {
return v, nil
}
}
return "", fmt.Errorf("can not get data")
}
func (d *DHCPManager) ForEach(ctx context.Context, fnv4 func(net.IP), fnv6 func(net.IP)) error {
cm, err := d.client.Get(ctx, config.ConfigMapPodTrafficManager, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get cm DHCP server, err: %v", err)
}
if cm.Data == nil {
cm.Data = make(map[string]string)
}
var dhcp *ipallocator.Range
dhcp, err = ipallocator.NewAllocatorCIDRRange(d.cidr, func(max int, rangeSpec string) (allocator.Interface, error) {
return allocator.NewContiguousAllocationMap(max, rangeSpec), nil
})
if err != nil {
return err
}
var str []byte
str, err = base64.StdEncoding.DecodeString(cm.Data[config.KeyDHCP])
if err == nil {
err = dhcp.Restore(d.cidr, str)
if err != nil {
return err
}
}
dhcp.ForEach(fnv4)
var dhcp6 *ipallocator.Range
dhcp6, err = ipallocator.NewAllocatorCIDRRange(d.cidr6, func(max int, rangeSpec string) (allocator.Interface, error) {
return allocator.NewContiguousAllocationMap(max, rangeSpec), nil
})
if err != nil {
return err
}
str, err = base64.StdEncoding.DecodeString(cm.Data[config.KeyDHCP6])
if err == nil {
err = dhcp6.Restore(d.cidr6, str)
if err != nil {
return err
}
}
dhcp6.ForEach(fnv6)
return nil
}
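
Call-site migration implied by deleting this file in favor of pkg/dhcp (a sketch; the mapping is inferred from the two listings above):

// before (package handler):
//	m := NewDHCPManager(cmi, ns)
//	_ = m.initDHCP(ctx)                       // unexported
//	v4, v6, _ := m.RentIPBaseNICAddress(ctx)  // or RentIPRandom
//	_ = m.Set(key, value)                     // no context, read-update-write
//
// after (package dhcp):
//	m := dhcp.NewDHCPManager(cmi, ns)
//	_ = m.InitDHCP(ctx)                       // exported
//	v4, v6, _ := m.RentIP(ctx)                // single entry point
//	_ = m.Set(ctx, key, value)                // context-aware, JSON-patch with conflict retry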

View File

@@ -34,7 +34,6 @@ import (
var (
	namespace  string
	clientset  *kubernetes.Clientset
-	restclient *rest.RESTClient
	restconfig *rest.Config
)
@@ -348,9 +347,6 @@ func Init() {
	if restconfig, err = f.ToRESTConfig(); err != nil {
		log.Fatal(err)
	}
-	if restclient, err = rest.RESTClientFor(restconfig); err != nil {
-		log.Fatal(err)
-	}
	if clientset, err = kubernetes.NewForConfig(restconfig); err != nil {
		log.Fatal(err)
	}

View File

@@ -3,12 +3,9 @@ package handler
import (
	"bytes"
	"context"
-	"encoding/json"
	"errors"
	"fmt"
	"net"
-	"strconv"
-	"strings"
	"time"

	log "github.com/sirupsen/logrus"
@@ -19,16 +16,11 @@ import (
	k8serrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/fields"
-	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/intstr"
-	k8sjson "k8s.io/apimachinery/pkg/util/json"
	"k8s.io/apimachinery/pkg/util/wait"
-	pkgresource "k8s.io/cli-runtime/pkg/resource"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/util/cert"
-	"k8s.io/client-go/util/retry"
	cmdutil "k8s.io/kubectl/pkg/cmd/util"
	"k8s.io/kubectl/pkg/polymorphichelpers"
	"k8s.io/kubectl/pkg/util/podutils"
@@ -36,7 +28,6 @@ import (
	"k8s.io/utils/ptr"

	"github.com/wencaiwulue/kubevpn/v2/pkg/config"
-	"github.com/wencaiwulue/kubevpn/v2/pkg/exchange"
	"github.com/wencaiwulue/kubevpn/v2/pkg/util"
)
@@ -46,12 +37,9 @@ func createOutboundPod(ctx context.Context, factory cmdutil.Factory, clientset *
	service, err := clientset.CoreV1().Services(namespace).Get(ctx, config.ConfigMapPodTrafficManager, metav1.GetOptions{})
	if err == nil {
-		_, err = polymorphichelpers.AttachablePodForObjectFn(factory, service, 2*time.Second)
-		if err == nil {
-			_, err = updateRefCount(ctx, clientset.CoreV1().ConfigMaps(namespace), config.ConfigMapPodTrafficManager, 1)
-			if err != nil {
-				return
-			}
+		var pod *v1.Pod
+		pod, err = polymorphichelpers.AttachablePodForObjectFn(factory, service, 2*time.Second)
+		if err == nil && pod.DeletionTimestamp.IsZero() && podutils.IsPodReady(pod) {
			log.Infoln("traffic manager already exist, reuse it")
			return
		}
@@ -70,7 +58,7 @@ func createOutboundPod(ctx context.Context, factory cmdutil.Factory, clientset *
			deleteResource(context.Background())
		}
	}()
-	deleteResource(context.Background())
+	deleteResource(ctx)
	log.Infoln("traffic manager not exist, try to create it...")

	// 1) label namespace
@@ -195,7 +183,7 @@ func createOutboundPod(ctx context.Context, factory cmdutil.Factory, clientset *
		return err
	}

-	var Resources = v1.ResourceRequirements{
+	var resourcesSmall = v1.ResourceRequirements{
		Requests: map[v1.ResourceName]resource.Quantity{
			v1.ResourceCPU:    resource.MustParse("100m"),
			v1.ResourceMemory: resource.MustParse("128Mi"),
@@ -205,7 +193,7 @@ func createOutboundPod(ctx context.Context, factory cmdutil.Factory, clientset *
			v1.ResourceMemory: resource.MustParse("256Mi"),
		},
	}
-	var ResourcesContainerVPN = v1.ResourceRequirements{
+	var resourcesLarge = v1.ResourceRequirements{
		Requests: map[v1.ResourceName]resource.Quantity{
			v1.ResourceCPU:    resource.MustParse("500m"),
			v1.ResourceMemory: resource.MustParse("512Mi"),
@@ -223,7 +211,6 @@ func createOutboundPod(ctx context.Context, factory cmdutil.Factory, clientset *
		log.Errorf("generate self signed cert and key error: %s", err.Error())
		return err
	}
-
	// reason why not use v1.SecretTypeTls is because it needs key called tls.crt and tls.key, but tls.key can not as env variable
	// ➜  ~ export tls.key=a
	//export: not valid in this context: tls.key
@@ -239,7 +226,6 @@ func createOutboundPod(ctx context.Context, factory cmdutil.Factory, clientset *
		Type: v1.SecretTypeOpaque,
	}
	_, err = clientset.CoreV1().Secrets(namespace).Create(ctx, secret, metav1.CreateOptions{})
-
	if err != nil && !k8serrors.IsAlreadyExists(err) {
		log.Errorf("create secret error: %s", err.Error())
		return err
@@ -247,7 +233,7 @@ func createOutboundPod(ctx context.Context, factory cmdutil.Factory, clientset *
	// 6) create deployment
	log.Infof("create deployment %s", config.ConfigMapPodTrafficManager)
-	deployment := &appsv1.Deployment{
+	deploy := &appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{
			Name:      config.ConfigMapPodTrafficManager,
			Namespace: namespace,
@@ -337,7 +323,7 @@ kubevpn serve -L "tcp://:10800" -L "tun://:8422?net=${TunIPv4}" -L "gtcp://:1080
				ContainerPort: 10800,
				Protocol:      v1.ProtocolTCP,
			}},
-			Resources:       ResourcesContainerVPN,
+			Resources:       resourcesLarge,
			ImagePullPolicy: v1.PullIfNotPresent,
			SecurityContext: &v1.SecurityContext{
				Capabilities: &v1.Capabilities{
@@ -376,7 +362,7 @@ kubevpn serve -L "tcp://:10800" -L "tun://:8422?net=${TunIPv4}" -L "gtcp://:1080
				},
			},
			ImagePullPolicy: v1.PullIfNotPresent,
-			Resources:       Resources,
+			Resources:       resourcesSmall,
		},
		{
			Name: "webhook",
@@ -397,7 +383,7 @@ kubevpn serve -L "tcp://:10800" -L "tun://:8422?net=${TunIPv4}" -L "gtcp://:1080
			}},
			Env:             []v1.EnvVar{},
			ImagePullPolicy: v1.PullIfNotPresent,
-			Resources:       Resources,
+			Resources:       resourcesSmall,
		},
	},
	RestartPolicy: v1.RestartPolicyAlways,
@@ -405,13 +391,13 @@ kubevpn serve -L "tcp://:10800" -L "tun://:8422?net=${TunIPv4}" -L "gtcp://:1080
		},
	},
}
-	deployment, err = clientset.AppsV1().Deployments(namespace).Create(ctx, deployment, metav1.CreateOptions{})
+	deploy, err = clientset.AppsV1().Deployments(namespace).Create(ctx, deploy, metav1.CreateOptions{})
	if err != nil {
		log.Errorf("Failed to create deployment for %s: %v", config.ConfigMapPodTrafficManager, err)
		return err
	}
	str := fields.OneTermEqualSelector("app", config.ConfigMapPodTrafficManager).String()
-	_, selector, err := polymorphichelpers.SelectorsForObject(deployment)
+	_, selector, err := polymorphichelpers.SelectorsForObject(deploy)
	if err == nil {
		str = selector.String()
	}
@@ -427,7 +413,7 @@ kubevpn serve -L "tcp://:10800" -L "tun://:8422?net=${TunIPv4}" -L "gtcp://:1080
	ctx2, cancelFunc := context.WithTimeout(ctx, time.Minute*60)
	defer cancelFunc()
	wait.UntilWithContext(ctx2, func(ctx context.Context) {
-		podList, err := clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
+		podList, err := clientset.CoreV1().Pods(namespace).List(ctx2, metav1.ListOptions{
			LabelSelector: fields.OneTermEqualSelector("app", config.ConfigMapPodTrafficManager).String(),
		})
		if err != nil {
@@ -518,307 +504,6 @@ kubevpn serve -L "tcp://:10800" -L "tun://:8422?net=${TunIPv4}" -L "gtcp://:1080
if err != nil && !k8serrors.IsForbidden(err) && !k8serrors.IsAlreadyExists(err) { if err != nil && !k8serrors.IsForbidden(err) && !k8serrors.IsAlreadyExists(err) {
return fmt.Errorf("failed to create MutatingWebhookConfigurations, err: %v", err) return fmt.Errorf("failed to create MutatingWebhookConfigurations, err: %v", err)
} }
_, err = updateRefCount(ctx, clientset.CoreV1().ConfigMaps(namespace), config.ConfigMapPodTrafficManager, 1)
if err != nil {
log.Errorf("Failed to update ref count for %s: %v", config.ConfigMapPodTrafficManager, err)
return
}
return
}
func InjectVPNSidecar(ctx1 context.Context, factory cmdutil.Factory, namespace, workload string, c util.PodRouteConfig) error {
object, err := util.GetUnstructuredObject(factory, namespace, workload)
if err != nil {
return err
}
u := object.Object.(*unstructured.Unstructured)
podTempSpec, path, err := util.GetPodTemplateSpecPath(u)
if err != nil {
return err
}
clientset, err := factory.KubernetesClientSet()
if err != nil {
return err
}
nodeID := fmt.Sprintf("%s.%s", object.Mapping.Resource.GroupResource().String(), object.Name)
var ports []v1.ContainerPort
for _, container := range podTempSpec.Spec.Containers {
ports = append(ports, container.Ports...)
}
var portmap = make(map[int32]int32)
for _, port := range ports {
portmap[port.ContainerPort] = port.ContainerPort
}
err = addEnvoyConfig(clientset.CoreV1().ConfigMaps(namespace), nodeID, c, nil, ports, portmap)
if err != nil {
log.Errorf("add envoy config error: %v", err)
return err
}
origin := *podTempSpec
exchange.AddContainer(&podTempSpec.Spec, c)
helper := pkgresource.NewHelper(object.Client, object.Mapping)
// pods without controller
if len(path) == 0 {
log.Infof("workload %s/%s is not controlled by any controller", namespace, workload)
for _, container := range podTempSpec.Spec.Containers {
container.LivenessProbe = nil
container.StartupProbe = nil
container.ReadinessProbe = nil
}
p := &v1.Pod{ObjectMeta: podTempSpec.ObjectMeta, Spec: podTempSpec.Spec}
CleanupUselessInfo(p)
if err = CreateAfterDeletePod(factory, p, helper); err != nil {
return err
}
//rollbackFuncList = append(rollbackFuncList, func() {
// p2 := &v1.Pod{ObjectMeta: origin.ObjectMeta, Spec: origin.Spec}
// CleanupUselessInfo(p2)
// if err = CreateAfterDeletePod(factory, p2, helper); err != nil {
// log.Error(err)
// }
//})
} else
// controllers
{
log.Infof("workload %s/%s is controlled by a controller", namespace, workload)
// remove probe
removePatch, restorePatch := patch(origin, path)
b, _ := json.Marshal(restorePatch)
p := []P{
{
Op: "replace",
Path: "/" + strings.Join(append(path, "spec"), "/"),
Value: podTempSpec.Spec,
},
{
Op: "replace",
Path: "/metadata/annotations/" + config.KubeVPNRestorePatchKey,
Value: string(b),
},
}
marshal, _ := json.Marshal(append(p, removePatch...))
_, err = helper.Patch(object.Namespace, object.Name, types.JSONPatchType, marshal, &metav1.PatchOptions{})
if err != nil {
log.Errorf("error while inject proxy container, err: %v, exiting...", err)
return err
}
//rollbackFuncList = append(rollbackFuncList, func() {
// if err = removeInboundContainer(factory, namespace, workload); err != nil {
// log.Error(err)
// }
// //b, _ := json.Marshal(restorePatch)
// if _, err = helper.Patch(object.Namespace, object.Name, types.JSONPatchType, b, &metav1.PatchOptions{}); err != nil {
// log.Warnf("error while restore probe of resource: %s %s, ignore, err: %v",
// object.Mapping.GroupVersionKind.GroupKind().String(), object.Name, err)
// }
//})
}
if err != nil {
return err
}
// todo not work?
err = util.RolloutStatus(ctx1, factory, namespace, workload, time.Minute*60)
return err
}
func CreateAfterDeletePod(factory cmdutil.Factory, p *v1.Pod, helper *pkgresource.Helper) error {
_, err := helper.DeleteWithOptions(p.Namespace, p.Name, &metav1.DeleteOptions{
GracePeriodSeconds: pointer.Int64(0),
})
if err != nil {
log.Errorf("error while delete resource: %s %s, ignore, err: %v", p.Namespace, p.Name, err)
}
err = retry.OnError(wait.Backoff{
Steps: 10,
Duration: 50 * time.Millisecond,
Factor: 5.0,
Jitter: 1,
}, func(err error) bool {
if !k8serrors.IsAlreadyExists(err) {
return true
}
clientset, err := factory.KubernetesClientSet()
get, err := clientset.CoreV1().Pods(p.Namespace).Get(context.Background(), p.Name, metav1.GetOptions{})
if err != nil || get.Status.Phase != v1.PodRunning {
return true
}
return false
}, func() error {
if _, err := helper.Create(p.Namespace, true, p); err != nil {
return err
}
return errors.New("")
})
if err != nil {
if k8serrors.IsAlreadyExists(err) {
return nil return nil
} }
log.Errorf("error while create resource: %s %s, err: %v", p.Namespace, p.Name, err)
return err
}
return nil
}
func removeInboundContainer(factory cmdutil.Factory, namespace, workloads string) error {
object, err := util.GetUnstructuredObject(factory, namespace, workloads)
if err != nil {
return err
}
u := object.Object.(*unstructured.Unstructured)
podTempSpec, path, err := util.GetPodTemplateSpecPath(u)
if err != nil {
return err
}
helper := pkgresource.NewHelper(object.Client, object.Mapping)
// pods
if len(path) == 0 {
_, err = helper.DeleteWithOptions(object.Namespace, object.Name, &metav1.DeleteOptions{
GracePeriodSeconds: pointer.Int64(0),
})
if err != nil {
return err
}
}
// how to scale to one
exchange.RemoveContainer(&podTempSpec.Spec)
bytes, err := json.Marshal([]struct {
Op string `json:"op"`
Path string `json:"path"`
Value interface{} `json:"value"`
}{{
Op: "replace",
Path: "/" + strings.Join(append(path, "spec"), "/"),
Value: podTempSpec.Spec,
}})
if err != nil {
return err
}
_, err = helper.Patch(object.Namespace, object.Name, types.JSONPatchType, bytes, &metav1.PatchOptions{
//Force: &t,
})
return err
}
func CleanupUselessInfo(pod *v1.Pod) {
pod.SetSelfLink("")
pod.SetGeneration(0)
pod.SetResourceVersion("")
pod.SetUID("")
pod.SetDeletionTimestamp(nil)
pod.SetSelfLink("")
pod.SetManagedFields(nil)
pod.SetOwnerReferences(nil)
}
type P struct {
Op string `json:"op,omitempty"`
Path string `json:"path,omitempty"`
Value interface{} `json:"value,omitempty"`
}
func patch(spec v1.PodTemplateSpec, path []string) (remove []P, restore []P) {
for i := range spec.Spec.Containers {
index := strconv.Itoa(i)
readinessPath := "/" + strings.Join(append(path, "spec", "containers", index, "readinessProbe"), "/")
livenessPath := "/" + strings.Join(append(path, "spec", "containers", index, "livenessProbe"), "/")
startupPath := "/" + strings.Join(append(path, "spec", "containers", index, "startupProbe"), "/")
f := func(p *v1.Probe) string {
if p == nil {
return ""
}
marshal, err := k8sjson.Marshal(p)
if err != nil {
log.Errorf("error while json marshal: %v", err)
return ""
}
return string(marshal)
}
remove = append(remove, P{
Op: "replace",
Path: readinessPath,
Value: nil,
}, P{
Op: "replace",
Path: livenessPath,
Value: nil,
}, P{
Op: "replace",
Path: startupPath,
Value: nil,
})
restore = append(restore, P{
Op: "replace",
Path: readinessPath,
Value: f(spec.Spec.Containers[i].ReadinessProbe),
}, P{
Op: "replace",
Path: livenessPath,
Value: f(spec.Spec.Containers[i].LivenessProbe),
}, P{
Op: "replace",
Path: startupPath,
Value: f(spec.Spec.Containers[i].StartupProbe),
})
}
return
}
func fromPatchToProbe(spec *v1.PodTemplateSpec, path []string, patch []P) {
// 3 = readiness + liveness + startup
if len(patch) != 3*len(spec.Spec.Containers) {
log.Debugf("patch not match container num, not restore")
return
}
for i := range spec.Spec.Containers {
index := strconv.Itoa(i)
readinessPath := "/" + strings.Join(append(path, "spec", "containers", index, "readinessProbe"), "/")
livenessPath := "/" + strings.Join(append(path, "spec", "containers", index, "livenessProbe"), "/")
startupPath := "/" + strings.Join(append(path, "spec", "containers", index, "startupProbe"), "/")
var f = func(value any) *v1.Probe {
if value == nil {
return nil
}
str, ok := value.(string)
if ok && str == "" {
return nil
}
if !ok {
marshal, err := k8sjson.Marshal(value)
if err != nil {
log.Errorf("error while json marshal: %v", err)
return nil
}
str = string(marshal)
}
var probe v1.Probe
err := k8sjson.Unmarshal([]byte(str), &probe)
if err != nil {
log.Errorf("error while json unmarsh: %v", err)
return nil
}
return &probe
}
for _, p := range patch {
switch p.Path {
case readinessPath:
spec.Spec.Containers[i].ReadinessProbe = f(p.Value)
case livenessPath:
spec.Spec.Containers[i].LivenessProbe = f(p.Value)
case startupPath:
spec.Spec.Containers[i].StartupProbe = f(p.Value)
}
}
}
}

View File

@@ -14,6 +14,7 @@ import (
"github.com/wencaiwulue/kubevpn/v2/pkg/config" "github.com/wencaiwulue/kubevpn/v2/pkg/config"
"github.com/wencaiwulue/kubevpn/v2/pkg/controlplane" "github.com/wencaiwulue/kubevpn/v2/pkg/controlplane"
"github.com/wencaiwulue/kubevpn/v2/pkg/inject"
) )
// Reset // Reset
@@ -89,7 +90,7 @@ func (c *ConnectOptions) LeaveProxyResources(ctx context.Context) (err error) {
// deployments.apps.ry-server --> deployments.apps/ry-server // deployments.apps.ry-server --> deployments.apps/ry-server
lastIndex := strings.LastIndex(virtual.Uid, ".") lastIndex := strings.LastIndex(virtual.Uid, ".")
uid := virtual.Uid[:lastIndex] + "/" + virtual.Uid[lastIndex+1:] uid := virtual.Uid[:lastIndex] + "/" + virtual.Uid[lastIndex+1:]
err = UnPatchContainer(c.factory, c.clientset.CoreV1().ConfigMaps(c.Namespace), c.Namespace, uid, v4) err = inject.UnPatchContainer(c.factory, c.clientset.CoreV1().ConfigMaps(c.Namespace), c.Namespace, uid, v4)
if err != nil { if err != nil {
log.Errorf("leave workload %s failed: %v", uid, err) log.Errorf("leave workload %s failed: %v", uid, err)
continue continue

View File

@@ -14,8 +14,8 @@ import (
"github.com/wencaiwulue/kubevpn/v2/pkg/config" "github.com/wencaiwulue/kubevpn/v2/pkg/config"
"github.com/wencaiwulue/kubevpn/v2/pkg/core" "github.com/wencaiwulue/kubevpn/v2/pkg/core"
"github.com/wencaiwulue/kubevpn/v2/pkg/dhcp/rpc"
"github.com/wencaiwulue/kubevpn/v2/pkg/util" "github.com/wencaiwulue/kubevpn/v2/pkg/util"
"github.com/wencaiwulue/kubevpn/v2/pkg/webhook/rpc"
) )
func Complete(ctx context.Context, route *core.Route) error { func Complete(ctx context.Context, route *core.Route) error {
@@ -32,7 +32,7 @@ func Complete(ctx context.Context, route *core.Route) error {
return err return err
} }
var resp *rpc.RentIPResponse var resp *rpc.RentIPResponse
resp, err = client.RentIP(context.Background(), &rpc.RentIPRequest{ resp, err = client.RentIP(ctx, &rpc.RentIPRequest{
PodName: os.Getenv(config.EnvPodName), PodName: os.Getenv(config.EnvPodName),
PodNamespace: ns, PodNamespace: ns,
}) })
@@ -42,11 +42,16 @@ func Complete(ctx context.Context, route *core.Route) error {
go func() { go func() {
<-ctx.Done() <-ctx.Done()
err := release(context.Background(), client) _, err2 := client.ReleaseIP(context.Background(), &rpc.ReleaseIPRequest{
if err != nil { PodName: os.Getenv(config.EnvPodName),
log.Errorf("release ip failed: %v", err) PodNamespace: os.Getenv(config.EnvPodNamespace),
IPv4CIDR: os.Getenv(config.EnvInboundPodTunIPv4),
IPv6CIDR: os.Getenv(config.EnvInboundPodTunIPv6),
})
if err2 != nil {
log.Errorf("release ip %s and %s failed: %v", resp.IPv4CIDR, resp.IPv6CIDR, err2)
} else { } else {
log.Errorf("release ip secuess") log.Errorf("release ip %s and %s secuess", resp.IPv4CIDR, resp.IPv6CIDR)
} }
}() }()
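The hunk above rents an IP when the route is completed and releases it once the context is cancelled. A minimal sketch of that lifecycle as one helper, assuming a connected rpc.DHCPClient (withRentedIP is an illustrative name, not part of this commit):

func withRentedIP(ctx context.Context, client rpc.DHCPClient, name, ns string) error {
	resp, err := client.RentIP(ctx, &rpc.RentIPRequest{PodName: name, PodNamespace: ns})
	if err != nil {
		return err
	}
	defer func() {
		// Release with a fresh context: ctx is typically already cancelled here.
		_, err := client.ReleaseIP(context.Background(), &rpc.ReleaseIPRequest{
			PodName:      name,
			PodNamespace: ns,
			IPv4CIDR:     resp.IPv4CIDR,
			IPv6CIDR:     resp.IPv6CIDR,
		})
		if err != nil {
			log.Errorf("release ip %s and %s failed: %v", resp.IPv4CIDR, resp.IPv6CIDR, err)
		}
	}()
	<-ctx.Done() // hold the lease until the caller cancels
	return nil
}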

View File

@@ -1,4 +1,4 @@
package handler package inject
import ( import (
"context" "context"

View File

@@ -1,4 +1,4 @@
package exchange package inject
import ( import (
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"

pkg/inject/mesh.go (new file, 301 lines)
View File

@@ -0,0 +1,301 @@
package inject
import (
"context"
"encoding/json"
errors2 "errors"
"fmt"
"strconv"
"strings"
"time"
"github.com/sirupsen/logrus"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
v12 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
json2 "k8s.io/apimachinery/pkg/util/json"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/cli-runtime/pkg/resource"
"k8s.io/client-go/util/retry"
"k8s.io/kubectl/pkg/cmd/util"
"k8s.io/utils/pointer"
"github.com/wencaiwulue/kubevpn/v2/pkg/config"
util2 "github.com/wencaiwulue/kubevpn/v2/pkg/util"
)
func InjectVPNSidecar(ctx1 context.Context, factory util.Factory, namespace, workload string, c util2.PodRouteConfig) error {
object, err := util2.GetUnstructuredObject(factory, namespace, workload)
if err != nil {
return err
}
u := object.Object.(*unstructured.Unstructured)
podTempSpec, path, err := util2.GetPodTemplateSpecPath(u)
if err != nil {
return err
}
clientset, err := factory.KubernetesClientSet()
if err != nil {
return err
}
nodeID := fmt.Sprintf("%s.%s", object.Mapping.Resource.GroupResource().String(), object.Name)
var ports []v1.ContainerPort
for _, container := range podTempSpec.Spec.Containers {
ports = append(ports, container.Ports...)
}
var portmap = make(map[int32]int32)
for _, port := range ports {
portmap[port.ContainerPort] = port.ContainerPort
}
err = addEnvoyConfig(clientset.CoreV1().ConfigMaps(namespace), nodeID, c, nil, ports, portmap)
if err != nil {
logrus.Errorf("add envoy config error: %v", err)
return err
}
origin := *podTempSpec
AddContainer(&podTempSpec.Spec, c)
helper := resource.NewHelper(object.Client, object.Mapping)
// pods without controller
if len(path) == 0 {
logrus.Infof("workload %s/%s is not controlled by any controller", namespace, workload)
for _, container := range podTempSpec.Spec.Containers {
container.LivenessProbe = nil
container.StartupProbe = nil
container.ReadinessProbe = nil
}
p := &v1.Pod{ObjectMeta: podTempSpec.ObjectMeta, Spec: podTempSpec.Spec}
CleanupUselessInfo(p)
if err = CreateAfterDeletePod(factory, p, helper); err != nil {
return err
}
} else
// controllers
{
logrus.Infof("workload %s/%s is controlled by a controller", namespace, workload)
// remove probe
removePatch, restorePatch := patch(origin, path)
b, _ := json.Marshal(restorePatch)
p := []P{
{
Op: "replace",
Path: "/" + strings.Join(append(path, "spec"), "/"),
Value: podTempSpec.Spec,
},
{
Op: "replace",
Path: "/metadata/annotations/" + config.KubeVPNRestorePatchKey,
Value: string(b),
},
}
marshal, _ := json.Marshal(append(p, removePatch...))
_, err = helper.Patch(object.Namespace, object.Name, types.JSONPatchType, marshal, &v12.PatchOptions{})
if err != nil {
logrus.Errorf("error while inject proxy container, err: %v, exiting...", err)
return err
}
}
err = util2.RolloutStatus(ctx1, factory, namespace, workload, time.Minute*60)
return err
}
func CreateAfterDeletePod(factory util.Factory, p *v1.Pod, helper *resource.Helper) error {
_, err := helper.DeleteWithOptions(p.Namespace, p.Name, &v12.DeleteOptions{
GracePeriodSeconds: pointer.Int64(0),
})
if err != nil {
logrus.Errorf("error while delete resource: %s %s, ignore, err: %v", p.Namespace, p.Name, err)
}
err = retry.OnError(wait.Backoff{
Steps: 10,
Duration: 50 * time.Millisecond,
Factor: 5.0,
Jitter: 1,
}, func(err error) bool {
if !errors.IsAlreadyExists(err) {
return true
}
clientset, err := factory.KubernetesClientSet()
get, err := clientset.CoreV1().Pods(p.Namespace).Get(context.Background(), p.Name, v12.GetOptions{})
if err != nil || get.Status.Phase != v1.PodRunning {
return true
}
return false
}, func() error {
if _, err := helper.Create(p.Namespace, true, p); err != nil {
return err
}
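// Always return a non-nil error so retry.OnError keeps polling; the check above
// stops the loop once Create reports AlreadyExists and the pod is Running.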
return errors2.New("")
})
if err != nil {
if errors.IsAlreadyExists(err) {
return nil
}
logrus.Errorf("error while create resource: %s %s, err: %v", p.Namespace, p.Name, err)
return err
}
return nil
}
func removeInboundContainer(factory util.Factory, namespace, workloads string) error {
object, err := util2.GetUnstructuredObject(factory, namespace, workloads)
if err != nil {
return err
}
u := object.Object.(*unstructured.Unstructured)
podTempSpec, path, err := util2.GetPodTemplateSpecPath(u)
if err != nil {
return err
}
helper := resource.NewHelper(object.Client, object.Mapping)
// pods
if len(path) == 0 {
_, err = helper.DeleteWithOptions(object.Namespace, object.Name, &v12.DeleteOptions{
GracePeriodSeconds: pointer.Int64(0),
})
if err != nil {
return err
}
}
// how to scale to one
RemoveContainer(&podTempSpec.Spec)
bytes, err := json.Marshal([]struct {
Op string `json:"op"`
Path string `json:"path"`
Value interface{} `json:"value"`
}{{
Op: "replace",
Path: "/" + strings.Join(append(path, "spec"), "/"),
Value: podTempSpec.Spec,
}})
if err != nil {
return err
}
_, err = helper.Patch(object.Namespace, object.Name, types.JSONPatchType, bytes, &v12.PatchOptions{
//Force: &t,
})
return err
}
func CleanupUselessInfo(pod *v1.Pod) {
pod.SetSelfLink("")
pod.SetGeneration(0)
pod.SetResourceVersion("")
pod.SetUID("")
pod.SetDeletionTimestamp(nil)
pod.SetSelfLink("")
pod.SetManagedFields(nil)
pod.SetOwnerReferences(nil)
}
type P struct {
Op string `json:"op,omitempty"`
Path string `json:"path,omitempty"`
Value interface{} `json:"value,omitempty"`
}
func patch(spec v1.PodTemplateSpec, path []string) (remove []P, restore []P) {
for i := range spec.Spec.Containers {
index := strconv.Itoa(i)
readinessPath := "/" + strings.Join(append(path, "spec", "containers", index, "readinessProbe"), "/")
livenessPath := "/" + strings.Join(append(path, "spec", "containers", index, "livenessProbe"), "/")
startupPath := "/" + strings.Join(append(path, "spec", "containers", index, "startupProbe"), "/")
f := func(p *v1.Probe) string {
if p == nil {
return ""
}
marshal, err := json2.Marshal(p)
if err != nil {
logrus.Errorf("error while json marshal: %v", err)
return ""
}
return string(marshal)
}
remove = append(remove, P{
Op: "replace",
Path: readinessPath,
Value: nil,
}, P{
Op: "replace",
Path: livenessPath,
Value: nil,
}, P{
Op: "replace",
Path: startupPath,
Value: nil,
})
restore = append(restore, P{
Op: "replace",
Path: readinessPath,
Value: f(spec.Spec.Containers[i].ReadinessProbe),
}, P{
Op: "replace",
Path: livenessPath,
Value: f(spec.Spec.Containers[i].LivenessProbe),
}, P{
Op: "replace",
Path: startupPath,
Value: f(spec.Spec.Containers[i].StartupProbe),
})
}
return
}
func fromPatchToProbe(spec *v1.PodTemplateSpec, path []string, patch []P) {
// 3 = readiness + liveness + startup
if len(patch) != 3*len(spec.Spec.Containers) {
logrus.Debugf("patch not match container num, not restore")
return
}
for i := range spec.Spec.Containers {
index := strconv.Itoa(i)
readinessPath := "/" + strings.Join(append(path, "spec", "containers", index, "readinessProbe"), "/")
livenessPath := "/" + strings.Join(append(path, "spec", "containers", index, "livenessProbe"), "/")
startupPath := "/" + strings.Join(append(path, "spec", "containers", index, "startupProbe"), "/")
var f = func(value any) *v1.Probe {
if value == nil {
return nil
}
str, ok := value.(string)
if ok && str == "" {
return nil
}
if !ok {
marshal, err := json2.Marshal(value)
if err != nil {
logrus.Errorf("error while json marshal: %v", err)
return nil
}
str = string(marshal)
}
var probe v1.Probe
err := json2.Unmarshal([]byte(str), &probe)
if err != nil {
logrus.Errorf("error while json unmarsh: %v", err)
return nil
}
return &probe
}
for _, p := range patch {
switch p.Path {
case readinessPath:
spec.Spec.Containers[i].ReadinessProbe = f(p.Value)
case livenessPath:
spec.Spec.Containers[i].LivenessProbe = f(p.Value)
case startupPath:
spec.Spec.Containers[i].StartupProbe = f(p.Value)
}
}
}
}
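patch and fromPatchToProbe are inverses over the three probe fields; a quick in-package round-trip check, as a sketch (the test name and the "spec"/"template" path are illustrative):

package inject

import (
	"testing"

	v1 "k8s.io/api/core/v1"
)

// Round-trip: patch() records the probes, fromPatchToProbe() restores them.
func TestProbeRoundTrip(t *testing.T) {
	tpl := v1.PodTemplateSpec{Spec: v1.PodSpec{Containers: []v1.Container{{
		Name:           "app",
		ReadinessProbe: &v1.Probe{InitialDelaySeconds: 5},
	}}}}
	_, restore := patch(tpl, []string{"spec", "template"})

	stripped := tpl.DeepCopy()
	stripped.Spec.Containers[0].ReadinessProbe = nil
	fromPatchToProbe(stripped, []string{"spec", "template"}, restore)

	if stripped.Spec.Containers[0].ReadinessProbe == nil {
		t.Fatal("readiness probe was not restored")
	}
}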

View File

@@ -27,7 +27,7 @@ func RunCmdWithElevated(exe string, args []string) error {
cmd.Stdout = os.Stdout cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr cmd.Stderr = os.Stderr
cmd.Stdin = os.Stdin cmd.Stdin = os.Stdin
cmd.Env = append(os.Environ(), config.EnvStartSudoKubeVPNByKubeVPN+"=1", config.EnvDisableSyncthingLog+"=1") cmd.Env = append(os.Environ(), config.EnvDisableSyncthingLog+"=1")
err := cmd.Start() err := cmd.Start()
if err != nil { if err != nil {
return err return err
@@ -54,7 +54,7 @@ func RunCmd(exe string, args []string) error {
cmd.Stdout = os.Stdout cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr cmd.Stderr = os.Stderr
cmd.Stdin = os.Stdin cmd.Stdin = os.Stdin
cmd.Env = append(os.Environ(), config.EnvStartSudoKubeVPNByKubeVPN+"=1", config.EnvDisableSyncthingLog+"=1") cmd.Env = append(os.Environ(), config.EnvDisableSyncthingLog+"=1")
err := cmd.Start() err := cmd.Start()
if err != nil { if err != nil {
return err return err

View File

@@ -29,7 +29,7 @@ func RunWithElevated() {
cmd.Stdout = os.Stdout cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr cmd.Stderr = os.Stderr
cmd.Stdin = os.Stdin cmd.Stdin = os.Stdin
cmd.Env = append(os.Environ(), config.EnvStartSudoKubeVPNByKubeVPN+"=1", config.EnvDisableSyncthingLog+"=1") cmd.Env = append(os.Environ(), config.EnvDisableSyncthingLog+"=1")
// when a single CTRL+C is sent, the command quits immediately, but its output is cut off and only printed in full at the final quit // when a single CTRL+C is sent, the command quits immediately, but its output is cut off and only printed in full at the final quit
// so, mute the CTRL+C signal here and let the inner command handle the signal alone // so, mute the CTRL+C signal here and let the inner command handle the signal alone
go func() { go func() {
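The comment above describes muting Ctrl+C in the parent so only the elevated child reacts to the signal. A standalone sketch of that pattern (the child command is illustrative):

package main

import (
	"os"
	"os/exec"
	"os/signal"
)

func main() {
	// Swallow SIGINT in the parent; the terminal still delivers it to the
	// child, which exits on its own and keeps its output intact.
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, os.Interrupt)
	go func() {
		for range ch {
		}
	}()

	cmd := exec.Command("kubevpn", "daemon", "--sudo")
	cmd.Stdout, cmd.Stderr, cmd.Stdin = os.Stdout, os.Stderr, os.Stdin
	_ = cmd.Run()
}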

View File

@@ -6,15 +6,4 @@ import (
"context" "context"
) )
func DeleteBlockFirewallRule(_ context.Context) { func DeleteBlockFirewallRule(ctx context.Context) {}
}
func AddAllowFirewallRule() {
}
func DeleteAllowFirewallRule() {
}
func FindAllowFirewallRule() bool {
return false
}

View File

@@ -8,14 +8,63 @@ import (
"syscall" "syscall"
"time" "time"
log "github.com/sirupsen/logrus"
"golang.org/x/text/encoding/simplifiedchinese" "golang.org/x/text/encoding/simplifiedchinese"
"github.com/wencaiwulue/kubevpn/v2/pkg/config"
) )
/**
When starting an app that listens on 0.0.0.0 on Windows, this alert pops up:
Windows Security Alert
[x] Private networks,such as my home or work network
[ ] Public networks, such as those in airports and coffee shops (not recommended because these networks often have little or no security)
If the second option is not selected, Windows adds a firewall rule like:
Get-NetFirewallRule -Direction Inbound -Action Block | Sort-Object -Property Priority
Name : {9127CE75-0943-4877-B797-1316948CDCA8}
DisplayName : ___go_build_authors.exe
Description : ___go_build_authors.exe
DisplayGroup :
Group :
Enabled : True
Profile : Public
Platform : {}
Direction : Inbound
Action : Block
EdgeTraversalPolicy : Block
LooseSourceMapping : False
LocalOnlyMapping : False
Owner :
PrimaryStatus : OK
Status : The rule was parsed successfully from the store. (65536)
EnforcementStatus : NotApplicable
PolicyStoreSource : PersistentStore
PolicyStoreSourceType : Local
RemoteDynamicKeywordAddresses :
PolicyAppId :
This rule prevents the tun IP from accessing local services, so we need to delete it.
*/
// DeleteBlockFirewallRule Delete all action block firewall rule // DeleteBlockFirewallRule Delete all action block firewall rule
func DeleteBlockFirewallRule(ctx context.Context) { func DeleteBlockFirewallRule(ctx context.Context) {
var deleteFirewallBlockRule = func() {
// PowerShell Remove-NetFirewallRule -Action Block
cmd := exec.CommandContext(ctx, "PowerShell", []string{"Remove-NetFirewallRule", "-Action", "Block"}...)
cmd.SysProcAttr = &syscall.SysProcAttr{HideWindow: true}
_, _ = cmd.CombinedOutput()
/*if err != nil && out != nil {
s := string(out)
var b []byte
if b, err = decode(out); err == nil {
s = string(b)
}
log.Debugf("failed to delete firewall rule: %v", s)
}*/
}
deleteFirewallBlockRule()
ticker := time.NewTicker(time.Second * 10) ticker := time.NewTicker(time.Second * 10)
defer ticker.Stop() defer ticker.Stop()
for { for {
@@ -23,98 +72,19 @@ func DeleteBlockFirewallRule(ctx context.Context) {
case <-ctx.Done(): case <-ctx.Done():
return return
case <-ticker.C: case <-ticker.C:
// PowerShell Remove-NetFirewallRule -Action Block deleteFirewallBlockRule()
cmd := exec.Command("PowerShell", []string{
"Remove-NetFirewallRule",
"-Action",
"Block",
}...)
cmd.SysProcAttr = &syscall.SysProcAttr{HideWindow: true}
cmd.Run()
} }
} }
} }
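The refactor hoists the PowerShell call into deleteFirewallBlockRule and runs it once up front and then every ten seconds. A caller only needs to hand it a cancellable context, e.g. (usage sketch):

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go DeleteBlockFirewallRule(ctx) // scrubs Block rules now and every 10s until cancel()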
func AddAllowFirewallRule() { func decode(in []byte) ([]byte, error) {
// netsh advfirewall firewall add rule name=kubevpn-traffic-manager dir=in action=allow enable=yes remoteip=223.254.0.100/16,LocalSubnet out, err := simplifiedchinese.GB18030.NewDecoder().Bytes(in)
cmd := exec.Command("netsh", []string{
"advfirewall",
"firewall",
"add",
"rule",
"name=" + config.ConfigMapPodTrafficManager,
"dir=in",
"action=allow",
"enable=yes",
"remoteip=" + config.CIDR.String() + ",LocalSubnet",
}...)
cmd.SysProcAttr = &syscall.SysProcAttr{HideWindow: true}
if out, err := cmd.CombinedOutput(); err != nil {
var s string
var b []byte
if b, err = decode(out); err == nil {
s = string(b)
} else {
s = string(out)
}
log.Infof("error while exec command: %s, out: %s", cmd.Args, s)
}
}
func DeleteAllowFirewallRule() {
// netsh advfirewall firewall delete rule name=kubevpn-traffic-manager
cmd := exec.Command("netsh", []string{
"advfirewall",
"firewall",
"delete",
"rule",
"name=" + config.ConfigMapPodTrafficManager,
}...)
cmd.SysProcAttr = &syscall.SysProcAttr{HideWindow: true}
if out, err := cmd.CombinedOutput(); err != nil {
var s string
var b []byte
if b, err = decode(out); err == nil {
s = string(b)
} else {
s = string(out)
}
log.Errorf("error while exec command: %s, out: %s", cmd.Args, s)
}
}
func FindAllowFirewallRule() bool {
// netsh advfirewall firewall show rule name=kubevpn-traffic-manager
cmd := exec.Command("netsh", []string{
"advfirewall",
"firewall",
"show",
"rule",
"name=" + config.ConfigMapPodTrafficManager,
}...)
cmd.SysProcAttr = &syscall.SysProcAttr{HideWindow: true}
if out, err := cmd.CombinedOutput(); err != nil {
s := string(out)
var b []byte
if b, err = decode(out); err == nil {
s = string(b)
}
log.Debugf("find route out: %s", s)
return false
} else {
return true
}
}
func decode(in []byte) (out []byte, err error) {
out = in
out, err = simplifiedchinese.GB18030.NewDecoder().Bytes(in)
if err == nil { if err == nil {
return return out, err
} }
out, err = simplifiedchinese.GBK.NewDecoder().Bytes(in) out, err = simplifiedchinese.GBK.NewDecoder().Bytes(in)
if err == nil { if err == nil {
return return out, err
} }
return return nil, err
} }
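decode now tries GB18030, then GBK, and returns an error instead of silently echoing the input. A usage sketch for localized console output (in-package; the netsh arguments are only an example):

out, _ := exec.Command("netsh", "advfirewall", "firewall", "show", "rule", "name=all").CombinedOutput()
if text, err := decode(out); err == nil {
	fmt.Println(string(text))
}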

View File

@@ -125,8 +125,7 @@ func InitFactory(kubeconfigBytes string, ns string) cmdutil.Factory {
} }
return c return c
} }
// todo optimize here temp, err := os.CreateTemp("", "*.kubeconfig")
temp, err := os.CreateTemp("", "*.json")
if err != nil { if err != nil {
return nil return nil
} }

View File

@@ -296,6 +296,8 @@ func WaitPodToBeReady(ctx context.Context, podInterface v12.PodInterface, select
} }
case <-ticker.C: case <-ticker.C:
return errors.New(fmt.Sprintf("wait pod to be ready timeout")) return errors.New(fmt.Sprintf("wait pod to be ready timeout"))
case <-ctx.Done():
return ctx.Err()
} }
} }
} }
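The added case lets the wait abort as soon as the caller cancels, instead of blocking until the timeout ticker fires. The pattern in isolation, as a sketch (names are illustrative):

func waitReady(ctx context.Context, ready <-chan struct{}, timeout time.Duration) error {
	ticker := time.NewTicker(timeout)
	defer ticker.Stop()
	for {
		select {
		case <-ready:
			return nil
		case <-ticker.C:
			return errors.New("wait pod to be ready timeout")
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}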

View File

@@ -119,11 +119,11 @@ func RolloutStatus(ctx1 context.Context, factory cmdutil.Factory, namespace, wor
lw := &cache.ListWatch{ lw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (k8sruntime.Object, error) { ListFunc: func(options metav1.ListOptions) (k8sruntime.Object, error) {
options.FieldSelector = fieldSelector options.FieldSelector = fieldSelector
return client.Resource(info.Mapping.Resource).Namespace(info.Namespace).List(context.Background(), options) return client.Resource(info.Mapping.Resource).Namespace(info.Namespace).List(ctx1, options)
}, },
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.FieldSelector = fieldSelector options.FieldSelector = fieldSelector
return client.Resource(info.Mapping.Resource).Namespace(info.Namespace).Watch(context.Background(), options) return client.Resource(info.Mapping.Resource).Namespace(info.Namespace).Watch(ctx1, options)
}, },
} }

View File

@@ -1,66 +0,0 @@
package webhook
import (
"context"
"net"
"sync"
log "github.com/sirupsen/logrus"
"k8s.io/client-go/kubernetes"
"k8s.io/kubectl/pkg/cmd/util"
"github.com/wencaiwulue/kubevpn/v2/pkg/handler"
"github.com/wencaiwulue/kubevpn/v2/pkg/webhook/rpc"
)
type dhcpServer struct {
rpc.UnimplementedDHCPServer
sync.Mutex
f util.Factory
clientset *kubernetes.Clientset
}
func (d *dhcpServer) RentIP(ctx context.Context, req *rpc.RentIPRequest) (*rpc.RentIPResponse, error) {
d.Lock()
defer d.Unlock()
log.Infof("handling rent ip request, pod name: %s, ns: %s", req.PodName, req.PodNamespace)
cmi := d.clientset.CoreV1().ConfigMaps(req.PodNamespace)
dhcp := handler.NewDHCPManager(cmi, req.PodNamespace)
v4, v6, err := dhcp.RentIPRandom(ctx)
if err != nil {
log.Errorf("rent ip failed, err: %v", err)
return nil, err
}
// todo patch annotation
resp := &rpc.RentIPResponse{
IPv4CIDR: v4.String(),
IPv6CIDR: v6.String(),
}
return resp, nil
}
func (d *dhcpServer) ReleaseIP(ctx context.Context, req *rpc.ReleaseIPRequest) (*rpc.ReleaseIPResponse, error) {
d.Lock()
defer d.Unlock()
log.Infof("handling release ip request, pod name: %s, ns: %s, ipv4: %s, ipv6: %s", req.PodName, req.PodNamespace, req.IPv4CIDR, req.IPv6CIDR)
var ips []net.IP
for _, s := range []string{req.IPv4CIDR, req.IPv6CIDR} {
ip, _, err := net.ParseCIDR(s)
if err != nil {
log.Errorf("ip is invailed, ip: %s, err: %v", ip.String(), err)
continue
}
ips = append(ips, ip)
}
cmi := d.clientset.CoreV1().ConfigMaps(req.PodNamespace)
dhcp := handler.NewDHCPManager(cmi, req.PodNamespace)
if err := dhcp.ReleaseIP(context.Background(), ips...); err != nil {
log.Errorf("release ip failed, err: %v", err)
return nil, err
}
return &rpc.ReleaseIPResponse{}, nil
}
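This webhook-local dhcpServer is deleted in favor of the new pkg/dhcp package, registered below via dhcp.NewServer(clientset). The new file is not part of this excerpt; a plausible skeleton, stated purely as an assumption from the registration call:

// Hypothetical skeleton of pkg/dhcp's server (assumed, not shown in this diff).
package dhcp

import (
	"sync"

	"k8s.io/client-go/kubernetes"

	"github.com/wencaiwulue/kubevpn/v2/pkg/dhcp/rpc"
)

type Server struct {
	rpc.UnimplementedDHCPServer
	sync.Mutex
	clientset *kubernetes.Clientset
}

func NewServer(clientset *kubernetes.Clientset) *Server {
	return &Server{clientset: clientset}
}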

View File

@@ -18,7 +18,8 @@ import (
"github.com/wencaiwulue/kubevpn/v2/pkg/config" "github.com/wencaiwulue/kubevpn/v2/pkg/config"
"github.com/wencaiwulue/kubevpn/v2/pkg/daemon" "github.com/wencaiwulue/kubevpn/v2/pkg/daemon"
"github.com/wencaiwulue/kubevpn/v2/pkg/webhook/rpc" "github.com/wencaiwulue/kubevpn/v2/pkg/dhcp"
"github.com/wencaiwulue/kubevpn/v2/pkg/dhcp/rpc"
) )
func Main(f util.Factory) error { func Main(f util.Factory) error {
@@ -67,7 +68,7 @@ func Main(f util.Factory) error {
handler := daemon.CreateDowngradingHandler(grpcServer, http.HandlerFunc(http.DefaultServeMux.ServeHTTP)) handler := daemon.CreateDowngradingHandler(grpcServer, http.HandlerFunc(http.DefaultServeMux.ServeHTTP))
downgradingServer.Handler = h2c.NewHandler(handler, &h2Server) downgradingServer.Handler = h2c.NewHandler(handler, &h2Server)
defer downgradingServer.Close() defer downgradingServer.Close()
rpc.RegisterDHCPServer(grpcServer, &dhcpServer{f: f, clientset: clientset}) rpc.RegisterDHCPServer(grpcServer, dhcp.NewServer(clientset))
return downgradingServer.ListenAndServeTLS("", "") return downgradingServer.ListenAndServeTLS("", "")
} }

View File

@@ -17,7 +17,7 @@ import (
"k8s.io/utils/ptr" "k8s.io/utils/ptr"
"github.com/wencaiwulue/kubevpn/v2/pkg/config" "github.com/wencaiwulue/kubevpn/v2/pkg/config"
"github.com/wencaiwulue/kubevpn/v2/pkg/handler" "github.com/wencaiwulue/kubevpn/v2/pkg/dhcp"
"github.com/wencaiwulue/kubevpn/v2/pkg/util" "github.com/wencaiwulue/kubevpn/v2/pkg/util"
) )
@@ -81,7 +81,7 @@ func (h *admissionReviewHandler) handleCreate(ar v1.AdmissionReview) *v1.Admissi
h.Lock() h.Lock()
defer h.Unlock() defer h.Unlock()
cmi := h.clientset.CoreV1().ConfigMaps(ar.Request.Namespace) cmi := h.clientset.CoreV1().ConfigMaps(ar.Request.Namespace)
dhcp := handler.NewDHCPManager(cmi, ar.Request.Namespace) manager := dhcp.NewDHCPManager(cmi, ar.Request.Namespace)
var ips []net.IP var ips []net.IP
for k := 0; k < len(container.Env); k++ { for k := 0; k < len(container.Env); k++ {
envVar := container.Env[k] envVar := container.Env[k]
@@ -91,11 +91,11 @@ func (h *admissionReviewHandler) handleCreate(ar v1.AdmissionReview) *v1.Admissi
} }
} }
} }
_ = dhcp.ReleaseIP(context.Background(), ips...) _ = manager.ReleaseIP(context.Background(), ips...)
// 3) rent new ip // 3) rent new ip
var v4, v6 *net.IPNet var v4, v6 *net.IPNet
v4, v6, err = dhcp.RentIPRandom(context.Background()) v4, v6, err = manager.RentIP(context.Background())
if err != nil { if err != nil {
log.Errorf("rent ip random failed, err: %v", err) log.Errorf("rent ip random failed, err: %v", err)
return toV1AdmissionResponse(err) return toV1AdmissionResponse(err)
@@ -181,7 +181,7 @@ func (h *admissionReviewHandler) handleDelete(ar v1.AdmissionReview) *v1.Admissi
h.Lock() h.Lock()
defer h.Unlock() defer h.Unlock()
cmi := h.clientset.CoreV1().ConfigMaps(ar.Request.Namespace) cmi := h.clientset.CoreV1().ConfigMaps(ar.Request.Namespace)
err := handler.NewDHCPManager(cmi, ar.Request.Namespace).ReleaseIP(context.Background(), ips...) err := dhcp.NewDHCPManager(cmi, ar.Request.Namespace).ReleaseIP(context.Background(), ips...)
if err != nil { if err != nil {
log.Errorf("release ip to dhcp err: %v, ips: %v", err, ips) log.Errorf("release ip to dhcp err: %v, ips: %v", err, ips)
} else { } else {