mirror of
https://github.com/kubenetworks/kubevpn.git
synced 2025-12-24 11:51:13 +08:00
use ipam to optimize dhcp
This commit is contained in:
@@ -34,16 +34,9 @@ func (s *Server) Serve(ctx context.Context, h Handler) error {
|
||||
l := s.Listener
|
||||
var tempDelay time.Duration
|
||||
go func() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
err := retry.OnError(retry.DefaultBackoff, func(err error) bool {
|
||||
return err != nil
|
||||
}, func() error {
|
||||
return l.Close()
|
||||
})
|
||||
if err != nil {
|
||||
log.Warnf("error while close listener, err: %v", err)
|
||||
}
|
||||
<-ctx.Done()
|
||||
if err := retry.OnError(retry.DefaultBackoff, func(err error) bool { return err != nil }, l.Close); err != nil {
|
||||
log.Warnf("error while close listener, err: %v", err)
|
||||
}
|
||||
}()
|
||||
for ctx.Err() == nil {
|
||||
|
||||
@@ -3,7 +3,6 @@ package core
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/wencaiwulue/kubevpn/util"
|
||||
"net"
|
||||
"sync"
|
||||
@@ -96,7 +95,7 @@ func (h *tunHandler) Handle(ctx context.Context, conn net.Conn) {
|
||||
case <-ctx.Done():
|
||||
h.chExit <- struct{}{}
|
||||
default:
|
||||
fmt.Println("next loop")
|
||||
log.Warnln("next loop")
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
|
||||
3
go.mod
3
go.mod
@@ -3,6 +3,7 @@ module github.com/wencaiwulue/kubevpn
|
||||
go 1.17
|
||||
|
||||
require (
|
||||
github.com/cilium/ipam v0.0.0-20211026130907-54a76012817c
|
||||
github.com/containerd/containerd v1.5.7
|
||||
github.com/docker/libcontainer v2.2.1+incompatible
|
||||
github.com/envoyproxy/go-control-plane v0.10.1
|
||||
@@ -33,7 +34,6 @@ require (
|
||||
k8s.io/client-go v0.21.2
|
||||
k8s.io/klog/v2 v2.10.0 // indirect
|
||||
k8s.io/kubectl v0.21.2
|
||||
k8s.io/utils v0.0.0-20201110183641-67b214c5f920
|
||||
)
|
||||
|
||||
require (
|
||||
@@ -91,6 +91,7 @@ require (
|
||||
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 // indirect
|
||||
k8s.io/component-base v0.21.2 // indirect
|
||||
k8s.io/kube-openapi v0.0.0-20210305001622-591a79e4bda7 // indirect
|
||||
k8s.io/utils v0.0.0-20201110183641-67b214c5f920 // indirect
|
||||
sigs.k8s.io/kustomize/api v0.8.8 // indirect
|
||||
sigs.k8s.io/kustomize/kyaml v0.10.17 // indirect
|
||||
sigs.k8s.io/structured-merge-diff/v4 v4.1.0 // indirect
|
||||
|
||||
2
go.sum
2
go.sum
@@ -115,6 +115,8 @@ github.com/cilium/ebpf v0.0.0-20200702112145-1c8d4c9ef775/go.mod h1:7cR51M8ViRLI
|
||||
github.com/cilium/ebpf v0.2.0/go.mod h1:To2CFviqOWL/M0gIMsvSMlqe7em/l1ALkX1PyjrX2Qs=
|
||||
github.com/cilium/ebpf v0.4.0/go.mod h1:4tRaxcgiL706VnOzHOdBlY8IEAIdxINsQBcU4xJJXRs=
|
||||
github.com/cilium/ebpf v0.6.2/go.mod h1:4tRaxcgiL706VnOzHOdBlY8IEAIdxINsQBcU4xJJXRs=
|
||||
github.com/cilium/ipam v0.0.0-20211026130907-54a76012817c h1:BNplQ8/gUxxF3ISPjM5h6+e/r1lld3VhGEjj2S02/7c=
|
||||
github.com/cilium/ipam v0.0.0-20211026130907-54a76012817c/go.mod h1:Ascfar4FtgB+K+mwqbZpSb3WVZ5sPFIarg+iAOXNZqI=
|
||||
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
|
||||
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
|
||||
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
|
||||
|
||||
@@ -74,7 +74,7 @@ func (c *ConnectOptions) createRemoteInboundPod() (err error) {
|
||||
}
|
||||
// TODO OPTIMIZE CODE
|
||||
if c.Mode == Mesh {
|
||||
err = PatchSidecar(c.factory, c.clientset, c.Namespace, finalWorkload, config, c.Headers)
|
||||
err = PatchSidecar(c.factory, c.clientset.CoreV1().ConfigMaps(c.Namespace), c.Namespace, finalWorkload, config, c.Headers)
|
||||
} else {
|
||||
err = CreateInboundPod(c.factory, c.Namespace, finalWorkload, config)
|
||||
}
|
||||
@@ -100,7 +100,7 @@ func (c *ConnectOptions) DoConnect() (err error) {
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
c.dhcp = NewDHCPManager(c.clientset, c.Namespace)
|
||||
c.dhcp = NewDHCPManager(c.clientset.CoreV1().ConfigMaps(c.Namespace), c.Namespace, &trafficMangerNet)
|
||||
if err = c.dhcp.InitDHCP(); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
129
pkg/dhcp.go
129
pkg/dhcp.go
@@ -2,38 +2,37 @@ package pkg
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/md5"
|
||||
"github.com/cilium/ipam/service/allocator"
|
||||
"github.com/cilium/ipam/service/ipallocator"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/wencaiwulue/kubevpn/util"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
v12 "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
"net"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
const (
|
||||
DHCP = "DHCP"
|
||||
SPLITTER = ","
|
||||
DHCP = "DHCP"
|
||||
)
|
||||
|
||||
type DHCPManager struct {
|
||||
client *kubernetes.Clientset
|
||||
client v12.ConfigMapInterface
|
||||
cidr *net.IPNet
|
||||
namespace string
|
||||
}
|
||||
|
||||
func NewDHCPManager(client *kubernetes.Clientset, namespace string) *DHCPManager {
|
||||
func NewDHCPManager(client v12.ConfigMapInterface, namespace string, cidr *net.IPNet) *DHCPManager {
|
||||
return &DHCPManager{
|
||||
client: client,
|
||||
namespace: namespace,
|
||||
cidr: cidr,
|
||||
}
|
||||
}
|
||||
|
||||
// todo optimize dhcp, using mac address, ip and deadline as unit
|
||||
func (d *DHCPManager) InitDHCP() error {
|
||||
configMap, err := d.client.CoreV1().ConfigMaps(d.namespace).Get(context.Background(), util.TrafficManager, metav1.GetOptions{})
|
||||
configMap, err := d.client.Get(context.Background(), util.TrafficManager, metav1.GetOptions{})
|
||||
if err == nil && configMap != nil {
|
||||
return nil
|
||||
}
|
||||
@@ -43,9 +42,9 @@ func (d *DHCPManager) InitDHCP() error {
|
||||
Namespace: d.namespace,
|
||||
Labels: map[string]string{},
|
||||
},
|
||||
Data: map[string]string{DHCP: "100"},
|
||||
Data: map[string]string{},
|
||||
}
|
||||
_, err = d.client.CoreV1().ConfigMaps(d.namespace).Create(context.Background(), result, metav1.CreateOptions{})
|
||||
_, err = d.client.Create(context.Background(), result, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
log.Errorf("create dhcp error, err: %v", err)
|
||||
return err
|
||||
@@ -54,97 +53,69 @@ func (d *DHCPManager) InitDHCP() error {
|
||||
}
|
||||
|
||||
func (d *DHCPManager) RentIPBaseNICAddress() (*net.IPNet, error) {
|
||||
var ipC = make(chan int, 1)
|
||||
err := d.updateDHCPConfigMap(func(i sets.Int) sets.Int {
|
||||
ip := getIP(i)
|
||||
ipC <- ip
|
||||
return i.Insert(ip)
|
||||
ips := make(chan net.IP, 1)
|
||||
err := d.updateDHCPConfigMap(func(allocator *ipallocator.Range) error {
|
||||
ip, err := allocator.AllocateNext()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ips <- ip
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
p := make(net.IP, net.IPv4len)
|
||||
copy(p, util.RouterIP.To4())
|
||||
p[3] = byte(<-ipC)
|
||||
return &net.IPNet{IP: p, Mask: util.CIDR.Mask}, nil
|
||||
return &net.IPNet{IP: <-ips, Mask: d.cidr.Mask}, nil
|
||||
}
|
||||
|
||||
func (d *DHCPManager) RentIPRandom() (*net.IPNet, error) {
|
||||
var ipC = make(chan int, 1)
|
||||
err := d.updateDHCPConfigMap(func(alreadyInUse sets.Int) sets.Int {
|
||||
ip := 0
|
||||
for i := 1; i < 255; i++ {
|
||||
if !alreadyInUse.Has(i) && i != 100 {
|
||||
ip = i
|
||||
break
|
||||
}
|
||||
var ipC = make(chan net.IP, 1)
|
||||
err := d.updateDHCPConfigMap(func(dhcp *ipallocator.Range) error {
|
||||
ip, err := dhcp.AllocateNext()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ipC <- ip
|
||||
return alreadyInUse.Insert(ip)
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
log.Errorf("update dhcp error after get ip, need to put ip back, err: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
p := make(net.IP, net.IPv4len)
|
||||
copy(p, util.RouterIP.To4())
|
||||
p[3] = byte(<-ipC)
|
||||
return &net.IPNet{IP: p, Mask: util.CIDR.Mask}, nil
|
||||
}
|
||||
|
||||
func getIP(alreadyInUse sets.Int) int {
|
||||
var v uint32
|
||||
interfaces, _ := net.Interfaces()
|
||||
for _, i := range interfaces {
|
||||
if i.HardwareAddr != nil {
|
||||
hash := md5.New()
|
||||
hash.Write([]byte(i.HardwareAddr.String()))
|
||||
sum := hash.Sum(nil)
|
||||
v = util.BytesToInt(sum)
|
||||
}
|
||||
}
|
||||
for {
|
||||
if i := int(v % 255); !alreadyInUse.Has(i) && i != 100 && i != 0 {
|
||||
return i
|
||||
}
|
||||
v++
|
||||
}
|
||||
}
|
||||
|
||||
func convertToString(m sets.Int) []string {
|
||||
var result []string
|
||||
for _, i := range m.List() {
|
||||
result = append(result, strconv.Itoa(i))
|
||||
}
|
||||
return result
|
||||
return &net.IPNet{IP: <-ipC, Mask: d.cidr.Mask}, nil
|
||||
}
|
||||
|
||||
func (d *DHCPManager) ReleaseIpToDHCP(ip *net.IPNet) error {
|
||||
ipN, err := strconv.Atoi(strings.Split(ip.IP.To4().String(), ".")[3])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return d.updateDHCPConfigMap(func(i sets.Int) sets.Int {
|
||||
return i.Delete(ipN)
|
||||
})
|
||||
return d.updateDHCPConfigMap(func(r *ipallocator.Range) error { return r.Release(ip.IP) })
|
||||
}
|
||||
|
||||
func (d *DHCPManager) updateDHCPConfigMap(f func(sets.Int) sets.Int) error {
|
||||
get, err := d.client.CoreV1().ConfigMaps(d.namespace).Get(context.Background(), util.TrafficManager, metav1.GetOptions{})
|
||||
func (d *DHCPManager) updateDHCPConfigMap(f func(*ipallocator.Range) error) error {
|
||||
cm, err := d.client.Get(context.Background(), util.TrafficManager, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
log.Errorf("failed to get dhcp, err: %v", err)
|
||||
return err
|
||||
}
|
||||
alreadyInUsed := strings.Split(get.Data[DHCP], SPLITTER)
|
||||
ipSet := sets.NewInt()
|
||||
for _, ip := range alreadyInUsed {
|
||||
if i, err := strconv.Atoi(ip); err == nil {
|
||||
ipSet.Insert(i)
|
||||
}
|
||||
if cm.Data == nil {
|
||||
cm.Data = make(map[string]string)
|
||||
}
|
||||
get.Data[DHCP] = strings.Join(convertToString(f(ipSet)), SPLITTER)
|
||||
_, err = d.client.CoreV1().ConfigMaps(d.namespace).Update(context.Background(), get, metav1.UpdateOptions{})
|
||||
dhcp, err := ipallocator.NewAllocatorCIDRRange(d.cidr, func(max int, rangeSpec string) (allocator.Interface, error) {
|
||||
return allocator.NewContiguousAllocationMap(max, rangeSpec), nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err = dhcp.Restore(d.cidr, []byte(cm.Data[DHCP])); err != nil {
|
||||
return err
|
||||
}
|
||||
if err = f(dhcp); err != nil {
|
||||
return err
|
||||
}
|
||||
_, bytes, err := dhcp.Snapshot()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cm.Data[DHCP] = string(bytes)
|
||||
_, err = d.client.Update(context.Background(), cm, metav1.UpdateOptions{})
|
||||
if err != nil {
|
||||
log.Errorf("update dhcp error after release ip, need to try again, err: %v", err)
|
||||
return err
|
||||
|
||||
32
pkg/envoy.go
32
pkg/envoy.go
@@ -15,7 +15,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
pkgresource "k8s.io/cli-runtime/pkg/resource"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
v12 "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
cmdutil "k8s.io/kubectl/pkg/cmd/util"
|
||||
"strconv"
|
||||
"strings"
|
||||
@@ -26,7 +26,7 @@ import (
|
||||
|
||||
// patch a sidecar, using iptables to do port-forward let this pod decide should go to 233.254.254.100 or request to 127.0.0.1
|
||||
// TODO support multiple port
|
||||
func PatchSidecar(factory cmdutil.Factory, clientset *kubernetes.Clientset, namespace, workloads string, c util.PodRouteConfig, headers map[string]string) error {
|
||||
func PatchSidecar(factory cmdutil.Factory, clientset v12.ConfigMapInterface, namespace, workloads string, c util.PodRouteConfig, headers map[string]string) error {
|
||||
//t := true
|
||||
//zero := int64(0)
|
||||
object, err := util.GetUnstructuredObject(factory, namespace, workloads)
|
||||
@@ -46,7 +46,7 @@ func PatchSidecar(factory cmdutil.Factory, clientset *kubernetes.Clientset, name
|
||||
configMapName := fmt.Sprintf("%s-%s", object.Mapping.Resource.Resource, object.Name)
|
||||
|
||||
createEnvoyConfigMapIfNeeded(clientset, object.Namespace, configMapName, strconv.Itoa(int(port)))
|
||||
err = addEnvoyConfig(clientset, object.Namespace, configMapName, c.LocalTunIP, headers, port)
|
||||
err = addEnvoyConfig(clientset, configMapName, c.LocalTunIP, headers, port)
|
||||
if err != nil {
|
||||
log.Warnln(err)
|
||||
return err
|
||||
@@ -96,7 +96,7 @@ func PatchSidecar(factory cmdutil.Factory, clientset *kubernetes.Clientset, name
|
||||
return err
|
||||
}
|
||||
|
||||
func UnPatchContainer(factory cmdutil.Factory, clientset *kubernetes.Clientset, namespace, workloads string, headers map[string]string) error {
|
||||
func UnPatchContainer(factory cmdutil.Factory, mapInterface v12.ConfigMapInterface, namespace, workloads string, headers map[string]string) error {
|
||||
//t := true
|
||||
//zero := int64(0)
|
||||
object, err := util.GetUnstructuredObject(factory, namespace, workloads)
|
||||
@@ -113,8 +113,8 @@ func UnPatchContainer(factory cmdutil.Factory, clientset *kubernetes.Clientset,
|
||||
//port := uint32(templateSpec.Spec.Containers[0].Ports[0].ContainerPort)
|
||||
configMapName := fmt.Sprintf("%s-%s", object.Mapping.Resource.Resource, object.Name)
|
||||
|
||||
//createEnvoyConfigMapIfNeeded(clientset, object.Namespace, configMapName, strconv.Itoa(int(port)))
|
||||
err = removeEnvoyConfig(clientset, object.Namespace, configMapName, headers)
|
||||
//createEnvoyConfigMapIfNeeded(mapInterface, object.Namespace, configMapName, strconv.Itoa(int(port)))
|
||||
err = removeEnvoyConfig(mapInterface, configMapName, headers)
|
||||
if err != nil {
|
||||
log.Warnln(err)
|
||||
return err
|
||||
@@ -139,7 +139,7 @@ func UnPatchContainer(factory cmdutil.Factory, clientset *kubernetes.Clientset,
|
||||
//Force: &t,
|
||||
})
|
||||
|
||||
//_ = util.WaitPod(clientset, namespace, metav1.ListOptions{
|
||||
//_ = util.WaitPod(mapInterface, namespace, metav1.ListOptions{
|
||||
// FieldSelector: fields.OneTermEqualSelector("metadata.name", object.Name+"-shadow").String(),
|
||||
//}, func(pod *v1.Pod) bool {
|
||||
// return pod.Status.Phase == v1.PodRunning
|
||||
@@ -222,8 +222,8 @@ spec:
|
||||
port: %s
|
||||
`
|
||||
|
||||
func addEnvoyConfig(clientset *kubernetes.Clientset, namespace, workloads, localTUNIP string, headers map[string]string, port uint32) error {
|
||||
get, err := clientset.CoreV1().ConfigMaps(namespace).Get(context.TODO(), workloads, metav1.GetOptions{})
|
||||
func addEnvoyConfig(clientset v12.ConfigMapInterface, workloads, localTUNIP string, headers map[string]string, port uint32) error {
|
||||
get, err := clientset.Get(context.TODO(), workloads, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -267,12 +267,12 @@ func addEnvoyConfig(clientset *kubernetes.Clientset, namespace, workloads, local
|
||||
return err
|
||||
}
|
||||
get.Data["envoy-config.yaml"] = string(marshal)
|
||||
_, err = clientset.CoreV1().ConfigMaps(namespace).Update(context.TODO(), get, metav1.UpdateOptions{})
|
||||
_, err = clientset.Update(context.TODO(), get, metav1.UpdateOptions{})
|
||||
return err
|
||||
}
|
||||
|
||||
func removeEnvoyConfig(clientset *kubernetes.Clientset, namespace, configMapName string, headers map[string]string) error {
|
||||
configMap, err := clientset.CoreV1().ConfigMaps(namespace).Get(context.TODO(), configMapName, metav1.GetOptions{})
|
||||
func removeEnvoyConfig(mapInterface v12.ConfigMapInterface, configMapName string, headers map[string]string) error {
|
||||
configMap, err := mapInterface.Get(context.TODO(), configMapName, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -319,12 +319,12 @@ func removeEnvoyConfig(clientset *kubernetes.Clientset, namespace, configMapName
|
||||
return err
|
||||
}
|
||||
configMap.Data["envoy-config.yaml"] = string(marshal)
|
||||
_, err = clientset.CoreV1().ConfigMaps(namespace).Update(context.TODO(), configMap, metav1.UpdateOptions{})
|
||||
_, err = mapInterface.Update(context.TODO(), configMap, metav1.UpdateOptions{})
|
||||
return err
|
||||
}
|
||||
|
||||
func createEnvoyConfigMapIfNeeded(clientset *kubernetes.Clientset, namespace, configMapName, port string) {
|
||||
cm, err := clientset.CoreV1().ConfigMaps(namespace).Get(context.TODO(), configMapName, metav1.GetOptions{})
|
||||
func createEnvoyConfigMapIfNeeded(clientset v12.ConfigMapInterface, namespace, configMapName, port string) {
|
||||
cm, err := clientset.Get(context.TODO(), configMapName, metav1.GetOptions{})
|
||||
if err == nil && cm != nil {
|
||||
return
|
||||
}
|
||||
@@ -341,7 +341,7 @@ func createEnvoyConfigMapIfNeeded(clientset *kubernetes.Clientset, namespace, co
|
||||
},
|
||||
}
|
||||
|
||||
_, err = clientset.CoreV1().ConfigMaps(namespace).Create(context.TODO(), &configMap, metav1.CreateOptions{})
|
||||
_, err = clientset.Create(context.TODO(), &configMap, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
log.Warnln(err)
|
||||
}
|
||||
|
||||
@@ -75,8 +75,8 @@ func TestGetIPFromDHCP(t *testing.T) {
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
manager := NewDHCPManager(clientset, "test")
|
||||
_, ipNet, err := net.ParseCIDR("192.168.1.100/24")
|
||||
manager := NewDHCPManager(clientset.CoreV1().ConfigMaps("test"), "test", ipNet)
|
||||
manager.InitDHCP()
|
||||
for i := 0; i < 10; i++ {
|
||||
ipNet, err := manager.RentIPRandom()
|
||||
|
||||
Reference in New Issue
Block a user