Files
kubevpn/pkg/envoy/internal/processor/processor.go
2021-12-31 08:54:46 +08:00

121 lines
3.3 KiB
Go

package processor
import (
"context"
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
"github.com/sirupsen/logrus"
"github.com/wencaiwulue/kubevpn/pkg/envoy/internal/resources"
"github.com/wencaiwulue/kubevpn/pkg/envoy/internal/watcher"
"github.com/wencaiwulue/kubevpn/pkg/envoy/internal/xdscache"
"github.com/wencaiwulue/kubevpn/util"
"math"
"math/rand"
"os"
"strconv"
)
type Processor struct {
cache cache.SnapshotCache
nodeID string
// snapshotVersion holds the current version of the snapshot.
snapshotVersion int64
logrus.FieldLogger
xdsCache xdscache.XDSCache
}
// NewProcessor returns a Processor that stores snapshots for nodeID in cache
// and logs through log. Its xDS cache starts out with empty, non-nil maps and
// its snapshot version starts at the value returned by rand.Int63n(1000).
func NewProcessor(cache cache.SnapshotCache, nodeID string, log logrus.FieldLogger) *Processor {
	p := new(Processor)

	p.cache = cache
	p.nodeID = nodeID
	p.snapshotVersion = rand.Int63n(1000)
	p.FieldLogger = log

	// Allocate every resource map up front so later Add* calls never write
	// into a nil map.
	p.xdsCache.Listeners = make(map[string]resources.Listener)
	p.xdsCache.Clusters = make(map[string]resources.Cluster)
	p.xdsCache.Routes = make(map[string]resources.Route)
	p.xdsCache.Endpoints = make(map[string]resources.Endpoint)

	return p
}
// newSnapshotVersion advances the processor's snapshot version by one and
// returns the new value formatted as a base-10 string. When the stored
// version is already math.MaxInt64 the counter restarts, so the next version
// returned is "1".
func (p *Processor) newSnapshotVersion() string {
	current := p.snapshotVersion
	if current == math.MaxInt64 {
		// Restart from zero instead of overflowing into negative numbers.
		current = 0
	}

	p.snapshotVersion = current + 1
	return strconv.FormatInt(p.snapshotVersion, 10)
}
// ProcessFile parses the YAML file named by file.FilePath, feeds its
// listeners, routes, clusters and endpoints into the processor's xDS cache,
// and stores a freshly versioned snapshot for p.nodeID in the snapshot cache.
//
// A failure to parse the file, to build the snapshot, or to pass the
// snapshot consistency check is logged and the call returns without storing
// anything. A failure to store the snapshot is logged and then terminates
// the whole process with exit status 1.
func (p *Processor) ProcessFile(file watcher.NotifyMessage) {
	// Parse file into object.
	envoyConfig, err := util.ParseYaml(file.FilePath)
	if err != nil {
		p.Errorf("error parsing yaml file: %+v", err)
		return
	}

	// NOTE(review): this function only ever adds to p.xdsCache; whether
	// entries that disappeared from the file are evicted depends on the
	// XDSCache implementation — confirm.

	// Listeners, together with the routes attached to each of them.
	for _, l := range envoyConfig.Listeners {
		var routeNames []string
		for _, r := range l.Routes {
			routeNames = append(routeNames, r.Name)
		}
		p.xdsCache.AddListener(l.Name, routeNames, l.Address, l.Port)

		for _, r := range l.Routes {
			p.xdsCache.AddRoute(r.Name, r.Headers, r.ClusterNames)
		}
	}

	// Clusters, together with the endpoints that belong to each of them.
	for _, c := range envoyConfig.Clusters {
		p.xdsCache.AddCluster(c.Name)
		for _, e := range c.Endpoints {
			p.xdsCache.AddEndpoint(c.Name, e.Address, e.Port)
		}
	}

	snapshotResources := map[resource.Type][]types.Resource{
		resource.EndpointType: p.xdsCache.EndpointsContents(), // endpoints
		resource.ClusterType:  p.xdsCache.ClusterContents(),   // clusters
		resource.RouteType:    p.xdsCache.RouteContents(),     // routes
		resource.ListenerType: p.xdsCache.ListenerContents(),  // listeners
		resource.RuntimeType:  {},                             // runtimes
		resource.SecretType:   {},                             // secrets
	}

	// Create the snapshot that we'll serve to Envoy.
	snapshot, err := cache.NewSnapshot(p.newSnapshotVersion(), snapshotResources)
	if err != nil {
		// Distinct from the consistency failure below, and the snapshot is
		// not logged because it carries no meaning when err is non-nil.
		p.Errorf("error creating snapshot: %+v", err)
		return
	}
	if err = snapshot.Consistent(); err != nil {
		p.Errorf("snapshot inconsistency: %+v\n\n\n%+v", snapshot, err)
		return
	}
	p.Debugf("will serve snapshot %+v", snapshot)

	// Add the snapshot to the cache.
	if err = p.cache.SetSnapshot(context.TODO(), p.nodeID, snapshot); err != nil {
		p.Errorf("snapshot error %q for %+v", err, snapshot)
		// Existing behavior, kept as is: a snapshot that cannot be stored
		// is treated as fatal for the whole process.
		os.Exit(1)
	}
}