mirror of
https://github.com/kubenetworks/kubevpn.git
synced 2025-09-26 19:31:17 +08:00
hotfix: fix wait envoy rule to works takes too long time (#674)
This commit is contained in:
@@ -39,8 +39,7 @@ const (
|
||||
ContainerSidecarVPN = "vpn"
|
||||
ContainerSidecarSyncthing = "syncthing"
|
||||
|
||||
VolumeEnvoyConfig = "envoy-config"
|
||||
VolumeSyncthing = "syncthing"
|
||||
VolumeSyncthing = "syncthing"
|
||||
|
||||
// innerIPv4Pool is used as tun ip
|
||||
// 198.19.0.0/16 network is part of the 198.18.0.0/15 (reserved for benchmarking).
|
||||
|
@@ -2,17 +2,16 @@ package controlplane
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
|
||||
serverv3 "github.com/envoyproxy/go-control-plane/pkg/server/v3"
|
||||
"github.com/fsnotify/fsnotify"
|
||||
log "github.com/sirupsen/logrus"
|
||||
cmdutil "k8s.io/kubectl/pkg/cmd/util"
|
||||
|
||||
plog "github.com/wencaiwulue/kubevpn/v2/pkg/log"
|
||||
)
|
||||
|
||||
func Main(ctx context.Context, filename string, port uint, logger *log.Logger) error {
|
||||
func Main(ctx context.Context, factory cmdutil.Factory, port uint, logger *log.Logger) error {
|
||||
snapshotCache := cache.NewSnapshotCache(false, cache.IDHash{}, logger)
|
||||
proc := NewProcessor(snapshotCache, logger)
|
||||
|
||||
@@ -25,33 +24,20 @@ func Main(ctx context.Context, filename string, port uint, logger *log.Logger) e
|
||||
|
||||
notifyCh := make(chan NotifyMessage, 100)
|
||||
|
||||
notifyCh <- NotifyMessage{
|
||||
Operation: Create,
|
||||
FilePath: filename,
|
||||
}
|
||||
|
||||
watcher, err := fsnotify.NewWatcher()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create file watcher: %v", err)
|
||||
}
|
||||
defer watcher.Close()
|
||||
err = watcher.Add(filename)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to add file: %s to wather: %v", filename, err)
|
||||
}
|
||||
notifyCh <- NotifyMessage{}
|
||||
go func() {
|
||||
errChan <- Watch(watcher, filename, notifyCh)
|
||||
errChan <- Watch(ctx, factory, notifyCh)
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case msg := <-notifyCh:
|
||||
err = proc.ProcessFile(msg)
|
||||
err := proc.ProcessFile(msg)
|
||||
if err != nil {
|
||||
plog.G(ctx).Errorf("Failed to process file: %v", err)
|
||||
return err
|
||||
}
|
||||
case err = <-errChan:
|
||||
case err := <-errChan:
|
||||
return err
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
|
@@ -5,7 +5,6 @@ import (
|
||||
"encoding/json"
|
||||
"math"
|
||||
"math/rand"
|
||||
"os"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"time"
|
||||
@@ -46,7 +45,7 @@ func (p *Processor) newVersion() string {
|
||||
}
|
||||
|
||||
func (p *Processor) ProcessFile(file NotifyMessage) error {
|
||||
configList, err := ParseYaml(file.FilePath)
|
||||
configList, err := ParseYaml(file.Content)
|
||||
if err != nil {
|
||||
p.logger.Errorf("failed to parse config file: %v", err)
|
||||
return err
|
||||
@@ -66,7 +65,7 @@ func (p *Processor) ProcessFile(file NotifyMessage) error {
|
||||
uid := util.GenEnvoyUID(config.Namespace, config.Uid)
|
||||
lastConfig, ok := p.expireCache.Get(uid)
|
||||
if ok && reflect.DeepEqual(lastConfig.(*Virtual), config) {
|
||||
p.logger.Infof("not needs to update, config: %s", string(marshal))
|
||||
p.logger.Infof("no need to update, config: %s", string(marshal))
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -104,15 +103,10 @@ func (p *Processor) ProcessFile(file NotifyMessage) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func ParseYaml(file string) ([]*Virtual, error) {
|
||||
func ParseYaml(content string) ([]*Virtual, error) {
|
||||
var virtualList = make([]*Virtual, 0)
|
||||
|
||||
yamlFile, err := os.ReadFile(file)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = yaml.Unmarshal(yamlFile, &virtualList)
|
||||
err := yaml.Unmarshal([]byte(content), &virtualList)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@@ -1,62 +1,82 @@
|
||||
package controlplane
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/fsnotify/fsnotify"
|
||||
)
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
informerv1 "k8s.io/client-go/informers/core/v1"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
cmdutil "k8s.io/kubectl/pkg/cmd/util"
|
||||
|
||||
type OperationType int
|
||||
|
||||
const (
|
||||
Create OperationType = iota
|
||||
Remove
|
||||
Modify
|
||||
"github.com/wencaiwulue/kubevpn/v2/pkg/config"
|
||||
plog "github.com/wencaiwulue/kubevpn/v2/pkg/log"
|
||||
)
|
||||
|
||||
type NotifyMessage struct {
|
||||
Operation OperationType
|
||||
FilePath string
|
||||
Content string
|
||||
}
|
||||
|
||||
func Watch(watcher *fsnotify.Watcher, filename string, notifyCh chan<- NotifyMessage) error {
|
||||
ticker := time.NewTicker(time.Second * 2)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case event, ok := <-watcher.Events:
|
||||
if !ok {
|
||||
return fmt.Errorf("watcher has closed")
|
||||
}
|
||||
if event.Op&fsnotify.Write == fsnotify.Write {
|
||||
notifyCh <- NotifyMessage{
|
||||
Operation: Modify,
|
||||
FilePath: event.Name,
|
||||
}
|
||||
} else if event.Op&fsnotify.Create == fsnotify.Create {
|
||||
notifyCh <- NotifyMessage{
|
||||
Operation: Create,
|
||||
FilePath: event.Name,
|
||||
}
|
||||
} else if event.Op&fsnotify.Remove == fsnotify.Remove {
|
||||
notifyCh <- NotifyMessage{
|
||||
Operation: Remove,
|
||||
FilePath: event.Name,
|
||||
}
|
||||
}
|
||||
func Watch(ctx context.Context, f cmdutil.Factory, notifyCh chan<- NotifyMessage) error {
|
||||
namespace, _, err := f.ToRawKubeConfigLoader().Namespace()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
restConfig, err := f.ToRESTConfig()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
conf := rest.CopyConfig(restConfig)
|
||||
conf.QPS = 1
|
||||
conf.Burst = 2
|
||||
clientSet, err := kubernetes.NewForConfig(conf)
|
||||
if err != nil {
|
||||
plog.G(ctx).Errorf("Failed to create clientset: %v", err)
|
||||
return err
|
||||
}
|
||||
cmIndexers := cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}
|
||||
cmInformer := informerv1.NewFilteredConfigMapInformer(clientSet, namespace, 0, cmIndexers, func(options *metav1.ListOptions) {
|
||||
options.FieldSelector = fields.OneTermEqualSelector("metadata.name", config.ConfigMapPodTrafficManager).String()
|
||||
})
|
||||
cmTicker := time.NewTicker(time.Second * 5)
|
||||
_, err = cmInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
cmTicker.Reset(time.Nanosecond * 1)
|
||||
},
|
||||
UpdateFunc: func(oldObj, newObj interface{}) {
|
||||
cmTicker.Reset(time.Nanosecond * 1)
|
||||
},
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
cmTicker.Reset(time.Nanosecond * 1)
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
plog.G(ctx).Errorf("Failed to add service event handler: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
case err, ok := <-watcher.Errors:
|
||||
if !ok {
|
||||
return fmt.Errorf("watcher error closed")
|
||||
}
|
||||
return err
|
||||
|
||||
case <-ticker.C:
|
||||
notifyCh <- NotifyMessage{
|
||||
Operation: Modify,
|
||||
FilePath: filename,
|
||||
go cmInformer.Run(ctx.Done())
|
||||
defer cmTicker.Stop()
|
||||
for ; ctx.Err() == nil; <-cmTicker.C {
|
||||
cmTicker.Reset(time.Second * 5)
|
||||
cmList := cmInformer.GetIndexer().List()
|
||||
if len(cmList) == 0 {
|
||||
continue
|
||||
}
|
||||
for _, cm := range cmList {
|
||||
configMap, ok := cm.(*v1.ConfigMap)
|
||||
if ok {
|
||||
if configMap.Data == nil {
|
||||
configMap.Data = make(map[string]string)
|
||||
}
|
||||
notifyCh <- NotifyMessage{Content: configMap.Data[config.KeyEnvoy]}
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
return ctx.Err()
|
||||
}
|
||||
|
@@ -340,23 +340,7 @@ func genDeploySpec(namespace string, udp8422 string, tcp10800 string, tcp9002 st
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
ServiceAccountName: config.ConfigMapPodTrafficManager,
|
||||
Volumes: []v1.Volume{{
|
||||
Name: config.VolumeEnvoyConfig,
|
||||
VolumeSource: v1.VolumeSource{
|
||||
ConfigMap: &v1.ConfigMapVolumeSource{
|
||||
LocalObjectReference: v1.LocalObjectReference{
|
||||
Name: config.ConfigMapPodTrafficManager,
|
||||
},
|
||||
Items: []v1.KeyToPath{
|
||||
{
|
||||
Key: config.KeyEnvoy,
|
||||
Path: "envoy-config.yaml",
|
||||
},
|
||||
},
|
||||
Optional: pointer.Bool(false),
|
||||
},
|
||||
},
|
||||
}},
|
||||
Volumes: []v1.Volume{},
|
||||
Containers: []v1.Container{
|
||||
{
|
||||
Name: config.ContainerSidecarVPN,
|
||||
@@ -434,7 +418,7 @@ kubevpn server -l "tcp://:10800" -l "tun://:8422?net=${TunIPv4}&net6=${TunIPv6}"
|
||||
Name: config.ContainerSidecarControlPlane,
|
||||
Image: image,
|
||||
Command: []string{"kubevpn"},
|
||||
Args: []string{"control-plane", "--watchDirectoryFilename", "/etc/envoy/envoy-config.yaml"},
|
||||
Args: []string{"control-plane"},
|
||||
Ports: []v1.ContainerPort{
|
||||
{
|
||||
Name: tcp9002,
|
||||
@@ -447,13 +431,7 @@ kubevpn server -l "tcp://:10800" -l "tun://:8422?net=${TunIPv4}&net6=${TunIPv6}"
|
||||
Protocol: v1.ProtocolUDP,
|
||||
},
|
||||
},
|
||||
VolumeMounts: []v1.VolumeMount{
|
||||
{
|
||||
Name: config.VolumeEnvoyConfig,
|
||||
ReadOnly: true,
|
||||
MountPath: "/etc/envoy",
|
||||
},
|
||||
},
|
||||
VolumeMounts: []v1.VolumeMount{},
|
||||
ImagePullPolicy: v1.PullIfNotPresent,
|
||||
Resources: resourcesSmall,
|
||||
},
|
||||
|
Reference in New Issue
Block a user