From c99f14bb8d6a69a262a7630b57a38da64302a125 Mon Sep 17 00:00:00 2001 From: fengcaiwen Date: Sun, 19 Mar 2023 22:34:08 +0800 Subject: [PATCH] feat: add cache for control-plane --- pkg/controlplane/main.go | 3 +-- pkg/controlplane/processor.go | 20 +++++++++++++++++--- pkg/controlplane/watcher.go | 2 +- 3 files changed, 19 insertions(+), 6 deletions(-) diff --git a/pkg/controlplane/main.go b/pkg/controlplane/main.go index cfe06370..ca7d2df2 100644 --- a/pkg/controlplane/main.go +++ b/pkg/controlplane/main.go @@ -3,10 +3,10 @@ package controlplane import ( "context" "fmt" - "github.com/fsnotify/fsnotify" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" serverv3 "github.com/envoyproxy/go-control-plane/pkg/server/v3" + "github.com/fsnotify/fsnotify" log "github.com/sirupsen/logrus" ) @@ -41,7 +41,6 @@ func Main(filename string, port uint, logger *log.Logger) { for { select { case msg := <-notifyCh: - log.Infof("path: %s, event: %v", msg.FilePath, msg.Operation) proc.ProcessFile(msg) } } diff --git a/pkg/controlplane/processor.go b/pkg/controlplane/processor.go index b9add029..2848e83d 100644 --- a/pkg/controlplane/processor.go +++ b/pkg/controlplane/processor.go @@ -6,12 +6,15 @@ import ( "math" "math/rand" "os" + "reflect" "strconv" + "time" "github.com/envoyproxy/go-control-plane/pkg/cache/types" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" "github.com/envoyproxy/go-control-plane/pkg/resource/v3" "github.com/sirupsen/logrus" + utilcache "k8s.io/apimachinery/pkg/util/cache" "k8s.io/apimachinery/pkg/util/yaml" ) @@ -19,13 +22,16 @@ type Processor struct { cache cache.SnapshotCache logger *logrus.Logger version int64 + + expireCache *utilcache.Expiring } func NewProcessor(cache cache.SnapshotCache, log *logrus.Logger) *Processor { return &Processor{ - cache: cache, - logger: log, - version: rand.Int63n(1000), + cache: cache, + logger: log, + version: rand.Int63n(1000), + expireCache: utilcache.NewExpiring(), } } @@ -47,6 +53,13 @@ func (p *Processor) ProcessFile(file NotifyMessage) { if len(config.Uid) == 0 { continue } + lastConfig, ok := p.expireCache.Get(config.Uid) + if ok && reflect.DeepEqual(lastConfig.(*Virtual), config) { + p.logger.Infof("config are same, not needs to update, %v", config) + continue + } + p.logger.Infof("update config, version %d, config %v", p.version, config) + listeners, clusters, routes, endpoints := config.To() resources := map[resource.Type][]types.Resource{ resource.ListenerType: listeners, // listeners @@ -72,6 +85,7 @@ func (p *Processor) ProcessFile(file NotifyMessage) { p.logger.Errorf("snapshot error %q for %v", err, snapshot) p.logger.Fatal(err) } + p.expireCache.Set(config.Uid, config, time.Minute*5) } } diff --git a/pkg/controlplane/watcher.go b/pkg/controlplane/watcher.go index 2c004c40..65f0d8ee 100644 --- a/pkg/controlplane/watcher.go +++ b/pkg/controlplane/watcher.go @@ -50,7 +50,7 @@ func Watch(watcher *fsnotify.Watcher, filename string, notifyCh chan<- NotifyMes } log.Println("error:", err) - case <-time.Tick(time.Second * 3): + case <-time.Tick(time.Second * 5): notifyCh <- NotifyMessage{ Operation: Modify, FilePath: filename,