mirror of
https://github.com/kubenetworks/kubevpn.git
synced 2025-12-24 11:51:13 +08:00
feat: add cache for control-plane
This commit is contained in:
@@ -3,10 +3,10 @@ package controlplane
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/fsnotify/fsnotify"
|
||||
|
||||
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
|
||||
serverv3 "github.com/envoyproxy/go-control-plane/pkg/server/v3"
|
||||
"github.com/fsnotify/fsnotify"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
@@ -41,7 +41,6 @@ func Main(filename string, port uint, logger *log.Logger) {
|
||||
for {
|
||||
select {
|
||||
case msg := <-notifyCh:
|
||||
log.Infof("path: %s, event: %v", msg.FilePath, msg.Operation)
|
||||
proc.ProcessFile(msg)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,12 +6,15 @@ import (
|
||||
"math"
|
||||
"math/rand"
|
||||
"os"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
|
||||
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
|
||||
"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
|
||||
"github.com/sirupsen/logrus"
|
||||
utilcache "k8s.io/apimachinery/pkg/util/cache"
|
||||
"k8s.io/apimachinery/pkg/util/yaml"
|
||||
)
|
||||
|
||||
@@ -19,13 +22,16 @@ type Processor struct {
|
||||
cache cache.SnapshotCache
|
||||
logger *logrus.Logger
|
||||
version int64
|
||||
|
||||
expireCache *utilcache.Expiring
|
||||
}
|
||||
|
||||
func NewProcessor(cache cache.SnapshotCache, log *logrus.Logger) *Processor {
|
||||
return &Processor{
|
||||
cache: cache,
|
||||
logger: log,
|
||||
version: rand.Int63n(1000),
|
||||
cache: cache,
|
||||
logger: log,
|
||||
version: rand.Int63n(1000),
|
||||
expireCache: utilcache.NewExpiring(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -47,6 +53,13 @@ func (p *Processor) ProcessFile(file NotifyMessage) {
|
||||
if len(config.Uid) == 0 {
|
||||
continue
|
||||
}
|
||||
lastConfig, ok := p.expireCache.Get(config.Uid)
|
||||
if ok && reflect.DeepEqual(lastConfig.(*Virtual), config) {
|
||||
p.logger.Infof("config are same, not needs to update, %v", config)
|
||||
continue
|
||||
}
|
||||
p.logger.Infof("update config, version %d, config %v", p.version, config)
|
||||
|
||||
listeners, clusters, routes, endpoints := config.To()
|
||||
resources := map[resource.Type][]types.Resource{
|
||||
resource.ListenerType: listeners, // listeners
|
||||
@@ -72,6 +85,7 @@ func (p *Processor) ProcessFile(file NotifyMessage) {
|
||||
p.logger.Errorf("snapshot error %q for %v", err, snapshot)
|
||||
p.logger.Fatal(err)
|
||||
}
|
||||
p.expireCache.Set(config.Uid, config, time.Minute*5)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -50,7 +50,7 @@ func Watch(watcher *fsnotify.Watcher, filename string, notifyCh chan<- NotifyMes
|
||||
}
|
||||
log.Println("error:", err)
|
||||
|
||||
case <-time.Tick(time.Second * 3):
|
||||
case <-time.Tick(time.Second * 5):
|
||||
notifyCh <- NotifyMessage{
|
||||
Operation: Modify,
|
||||
FilePath: filename,
|
||||
|
||||
Reference in New Issue
Block a user