This commit is contained in:
wencaiwulue
2022-01-05 07:46:22 +08:00
parent 7c66aba5b7
commit d61a75aa35
13 changed files with 13 additions and 13 deletions

View File

@@ -0,0 +1,39 @@
package v1alpha1
type EnvoyConfig struct {
Name string `yaml:"name"`
Spec `yaml:"spec"`
}
type Spec struct {
Listeners []Listener `yaml:"listeners"`
Clusters []Cluster `yaml:"clusters"`
}
type Listener struct {
Name string `yaml:"name"`
Address string `yaml:"address"`
Port uint32 `yaml:"port"`
Routes []Route `yaml:"routes"`
}
type Route struct {
Name string `yaml:"name"`
Headers []HeaderMatch `yaml:"headers"`
ClusterNames []string `yaml:"clusters"`
}
type HeaderMatch struct {
Key string `yaml:"key"`
Value string `yaml:"value"`
}
type Cluster struct {
Name string `yaml:"name"`
Endpoints []Endpoint `yaml:"endpoints"`
}
type Endpoint struct {
Address string `yaml:"address"`
Port uint32 `yaml:"port"`
}

View File

@@ -0,0 +1,65 @@
package main
import (
"context"
"flag"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
serverv3 "github.com/envoyproxy/go-control-plane/pkg/server/v3"
log "github.com/sirupsen/logrus"
"github.com/wencaiwulue/kubevpn/pkg/controlplane/internal/processor"
"github.com/wencaiwulue/kubevpn/pkg/controlplane/internal/server"
"github.com/wencaiwulue/kubevpn/pkg/controlplane/internal/watcher"
)
var (
l log.FieldLogger
watchDirectoryFileName string
port uint = 9002
nodeID = "test-id"
)
func init() {
l = log.New()
log.SetLevel(log.DebugLevel)
// Define the directory to watch for Envoy configuration files
flag.StringVar(&watchDirectoryFileName, "watchDirectoryFileName", "/etc/envoy/envoy-config.yaml", "full path to directory to watch for files")
}
func main() {
flag.Parse()
// Create a cache
snapshotCache := cache.NewSnapshotCache(false, cache.IDHash{}, l)
// Create a processor
proc := processor.NewProcessor(snapshotCache, nodeID, log.WithField("context", "processor"))
// Create initial snapshot from file
proc.ProcessFile(watcher.NotifyMessage{
Operation: watcher.Create,
FilePath: watchDirectoryFileName,
})
// Notify channel for file system events
notifyCh := make(chan watcher.NotifyMessage)
go func() {
// Watch for file changes
watcher.Watch(watchDirectoryFileName, notifyCh)
}()
go func() {
// Run the xDS server
ctx := context.Background()
srv := serverv3.NewServer(ctx, snapshotCache, nil)
server.RunServer(ctx, srv, port)
}()
for {
select {
case msg := <-notifyCh:
proc.ProcessFile(msg)
}
}
}

View File

@@ -0,0 +1,110 @@
/*
* Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved.
* This source code is licensed under the Apache License Version 2.0.
*/
package processor
import (
"fmt"
"github.com/wencaiwulue/kubevpn/pkg/controlplane/apis/v1alpha1"
"github.com/wencaiwulue/kubevpn/util"
"gopkg.in/yaml.v2"
"testing"
)
func TestName(t *testing.T) {
var config = v1alpha1.EnvoyConfig{
Name: "config-test",
Spec: v1alpha1.Spec{
Listeners: []v1alpha1.Listener{
{
Name: "listener1",
Address: "127.0.0.1",
Port: 15006,
Routes: []v1alpha1.Route{
{
Name: "route-0",
Headers: []v1alpha1.HeaderMatch{
{
Key: "a",
Value: "aa",
},
{
Key: "b",
Value: "bb",
},
},
ClusterNames: []string{"cluster0"},
}, {
Name: "route-1",
Headers: []v1alpha1.HeaderMatch{
{
Key: "c",
Value: "cc",
},
{
Key: "d",
Value: "dd",
},
},
ClusterNames: []string{"cluster-1"},
},
},
},
},
Clusters: []v1alpha1.Cluster{
{
Name: "cluster-0",
Endpoints: []v1alpha1.Endpoint{
{
Address: "127.0.0.1",
Port: 9101,
},
{
Address: "127.0.0.1",
Port: 9102,
},
},
}, {
Name: "cluster-1",
Endpoints: []v1alpha1.Endpoint{
{
Address: "127.0.0.1",
Port: 9103,
},
{
Address: "127.0.0.1",
Port: 9104,
},
},
},
},
},
}
marshal, _ := yaml.Marshal(config)
fmt.Println(string(marshal))
}
func TestName1(t *testing.T) {
parseYaml, err := util.ParseYamlBytes([]byte(sss))
fmt.Println(err)
fmt.Println(parseYaml)
}
var sss = `name: config-test
spec:
listeners:
- name: listener1
address: 127.0.0.1
port: 15006
routes:
- name: route-0
clusters:
- cluster-0
clusters:
- name: cluster-0
endpoints:
- address: 127.0.0.1
port: 9080
`

View File

@@ -0,0 +1,120 @@
package processor
import (
"context"
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
"github.com/sirupsen/logrus"
"github.com/wencaiwulue/kubevpn/pkg/controlplane/internal/resources"
"github.com/wencaiwulue/kubevpn/pkg/controlplane/internal/watcher"
"github.com/wencaiwulue/kubevpn/pkg/controlplane/internal/xdscache"
"github.com/wencaiwulue/kubevpn/util"
"math"
"math/rand"
"os"
"strconv"
)
type Processor struct {
cache cache.SnapshotCache
nodeID string
// snapshotVersion holds the current version of the snapshot.
snapshotVersion int64
logrus.FieldLogger
xdsCache xdscache.XDSCache
}
func NewProcessor(cache cache.SnapshotCache, nodeID string, log logrus.FieldLogger) *Processor {
return &Processor{
cache: cache,
nodeID: nodeID,
snapshotVersion: rand.Int63n(1000),
FieldLogger: log,
xdsCache: xdscache.XDSCache{
Listeners: make(map[string]resources.Listener),
Clusters: make(map[string]resources.Cluster),
Routes: make(map[string]resources.Route),
Endpoints: make(map[string]resources.Endpoint),
},
}
}
// newSnapshotVersion increments the current snapshotVersion
// and returns as a string.
func (p *Processor) newSnapshotVersion() string {
// Reset the snapshotVersion if it ever hits max size.
if p.snapshotVersion == math.MaxInt64 {
p.snapshotVersion = 0
}
// Increment the snapshot version & return as string.
p.snapshotVersion++
return strconv.FormatInt(p.snapshotVersion, 10)
}
// ProcessFile takes a file and generates an xDS snapshot
func (p *Processor) ProcessFile(file watcher.NotifyMessage) {
// Parse file into object
envoyConfig, err := util.ParseYaml(file.FilePath)
if err != nil {
p.Errorf("error parsing yaml file: %+v", err)
return
}
// Parse Listeners
for _, l := range envoyConfig.Listeners {
var lRoutes []string
for _, lr := range l.Routes {
lRoutes = append(lRoutes, lr.Name)
}
p.xdsCache.AddListener(l.Name, lRoutes, l.Address, l.Port)
for _, r := range l.Routes {
p.xdsCache.AddRoute(r.Name, r.Headers, r.ClusterNames)
}
}
// Parse Clusters
for _, c := range envoyConfig.Clusters {
p.xdsCache.AddCluster(c.Name)
// Parse endpoints
for _, e := range c.Endpoints {
p.xdsCache.AddEndpoint(c.Name, e.Address, e.Port)
}
}
a := map[resource.Type][]types.Resource{
resource.EndpointType: p.xdsCache.EndpointsContents(), // endpoints
resource.ClusterType: p.xdsCache.ClusterContents(), // clusters
resource.RouteType: p.xdsCache.RouteContents(), // routes
resource.ListenerType: p.xdsCache.ListenerContents(), // listeners
resource.RuntimeType: {}, // runtimes
resource.SecretType: {}, // secrets
}
// Create the snapshot that we'll serve to Envoy
snapshot, err := cache.NewSnapshot(p.newSnapshotVersion(), a)
if err != nil {
p.Errorf("snapshot inconsistency err: %+v\n\n\n%+v", snapshot, err)
return
}
if err = snapshot.Consistent(); err != nil {
p.Errorf("snapshot inconsistency: %+v\n\n\n%+v", snapshot, err)
return
}
p.Debugf("will serve snapshot %+v", snapshot)
// Add the snapshot to the cache
if err = p.cache.SetSnapshot(context.TODO(), p.nodeID, snapshot); err != nil {
p.Errorf("snapshot error %q for %+v", err, snapshot)
os.Exit(1)
}
}

View File

@@ -0,0 +1,29 @@
package resources
type Listener struct {
Name string
Address string
Port uint32
RouteNames []string
}
type Route struct {
Name string
Headers []Header
Cluster string
}
type Header struct {
Key string
Value string
}
type Cluster struct {
Name string
Endpoints []Endpoint
}
type Endpoint struct {
UpstreamHost string
UpstreamPort uint32
}

View File

@@ -0,0 +1,190 @@
package resources
import (
etmv3 "github.com/envoyproxy/go-control-plane/envoy/type/matcher/v3"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/durationpb"
"time"
cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
listener "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
route "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
hcm "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
"github.com/envoyproxy/go-control-plane/pkg/wellknown"
)
func MakeCluster(clusterName string) *cluster.Cluster {
return &cluster.Cluster{
Name: clusterName,
ConnectTimeout: durationpb.New(5 * time.Second),
ClusterDiscoveryType: &cluster.Cluster_Type{Type: cluster.Cluster_EDS},
LbPolicy: cluster.Cluster_ROUND_ROBIN,
//LoadAssignment: makeEndpoint(clusterName, UpstreamHost),
DnsLookupFamily: cluster.Cluster_V4_ONLY,
EdsClusterConfig: makeEDSCluster(),
}
}
func makeEDSCluster() *cluster.Cluster_EdsClusterConfig {
return &cluster.Cluster_EdsClusterConfig{
EdsConfig: makeConfigSource(),
}
}
func MakeEndpoint(clusterName string, eps []Endpoint) *endpoint.ClusterLoadAssignment {
var endpoints []*endpoint.LbEndpoint
for _, e := range eps {
endpoints = append(endpoints, &endpoint.LbEndpoint{
HostIdentifier: &endpoint.LbEndpoint_Endpoint{
Endpoint: &endpoint.Endpoint{
Address: &core.Address{
Address: &core.Address_SocketAddress{
SocketAddress: &core.SocketAddress{
Protocol: core.SocketAddress_TCP,
Address: e.UpstreamHost,
PortSpecifier: &core.SocketAddress_PortValue{
PortValue: e.UpstreamPort,
},
},
},
},
},
},
})
}
return &endpoint.ClusterLoadAssignment{
ClusterName: clusterName,
Endpoints: []*endpoint.LocalityLbEndpoints{{
LbEndpoints: endpoints,
}},
}
}
func MakeRoute(routes []Route) *route.RouteConfiguration {
var rts []*route.Route
for _, r := range routes {
var rr []*route.HeaderMatcher
for _, header := range r.Headers {
rr = append(rr, &route.HeaderMatcher{
Name: header.Key,
HeaderMatchSpecifier: &route.HeaderMatcher_StringMatch{
StringMatch: &etmv3.StringMatcher{
MatchPattern: &etmv3.StringMatcher_Contains{
Contains: header.Value,
},
IgnoreCase: true,
},
},
})
}
rts = append(rts, &route.Route{
//Name: r.Name,
Match: &route.RouteMatch{
PathSpecifier: &route.RouteMatch_Prefix{
Prefix: "/",
},
Headers: rr,
},
Action: &route.Route_Route{
Route: &route.RouteAction{
ClusterSpecifier: &route.RouteAction_Cluster{
Cluster: r.Cluster,
},
},
},
})
}
//rts = append(rts, &route.Route{
// //Name: r.Name,
// Match: &route.RouteMatch{
// PathSpecifier: &route.RouteMatch_Prefix{
// Prefix: "/",
// },
// },
// Action: &route.Route_Route{
// Route: &route.RouteAction{
// ClusterSpecifier: &route.RouteAction_Cluster{
// Cluster: "default_cluster",
// },
// },
// },
//})
return &route.RouteConfiguration{
Name: "listener_0",
VirtualHosts: []*route.VirtualHost{{
Name: "local_service",
Domains: []string{"*"},
Routes: rts,
}},
}
}
func MakeHTTPListener(listenerName, route, address string, port uint32) *listener.Listener {
// HTTP filter configuration
manager := &hcm.HttpConnectionManager{
CodecType: hcm.HttpConnectionManager_AUTO,
StatPrefix: "http",
RouteSpecifier: &hcm.HttpConnectionManager_Rds{
Rds: &hcm.Rds{
ConfigSource: makeConfigSource(),
RouteConfigName: "listener_0",
},
},
HttpFilters: []*hcm.HttpFilter{{
Name: wellknown.Router,
}},
}
pbst, err := anypb.New(manager)
if err != nil {
panic(err)
}
return &listener.Listener{
Name: listenerName,
Address: &core.Address{
Address: &core.Address_SocketAddress{
SocketAddress: &core.SocketAddress{
Protocol: core.SocketAddress_TCP,
Address: address,
PortSpecifier: &core.SocketAddress_PortValue{
PortValue: port,
},
},
},
},
FilterChains: []*listener.FilterChain{{
Filters: []*listener.Filter{{
Name: wellknown.HTTPConnectionManager,
ConfigType: &listener.Filter_TypedConfig{
TypedConfig: pbst,
},
}},
}},
}
}
func makeConfigSource() *core.ConfigSource {
source := &core.ConfigSource{}
source.ResourceApiVersion = resource.DefaultAPIVersion
source.ConfigSourceSpecifier = &core.ConfigSource_ApiConfigSource{
ApiConfigSource: &core.ApiConfigSource{
TransportApiVersion: resource.DefaultAPIVersion,
ApiType: core.ApiConfigSource_GRPC,
SetNodeOnFirstMessageOnly: true,
GrpcServices: []*core.GrpcService{{
TargetSpecifier: &core.GrpcService_EnvoyGrpc_{
EnvoyGrpc: &core.GrpcService_EnvoyGrpc{ClusterName: "xds_cluster"},
},
}},
},
}
return source
}

View File

@@ -0,0 +1,45 @@
package server
import (
"context"
"fmt"
clusterservice "github.com/envoyproxy/go-control-plane/envoy/service/cluster/v3"
discoverygrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
endpointservice "github.com/envoyproxy/go-control-plane/envoy/service/endpoint/v3"
listenerservice "github.com/envoyproxy/go-control-plane/envoy/service/listener/v3"
routeservice "github.com/envoyproxy/go-control-plane/envoy/service/route/v3"
runtimeservice "github.com/envoyproxy/go-control-plane/envoy/service/runtime/v3"
secretservice "github.com/envoyproxy/go-control-plane/envoy/service/secret/v3"
serverv3 "github.com/envoyproxy/go-control-plane/pkg/server/v3"
"google.golang.org/grpc"
"log"
"net"
)
const (
grpcMaxConcurrentStreams = 1000000
)
// RunServer starts an xDS server at the given port.
func RunServer(ctx context.Context, server serverv3.Server, port uint) {
grpcServer := grpc.NewServer(grpc.MaxConcurrentStreams(grpcMaxConcurrentStreams))
var lc net.ListenConfig
listener, err := lc.Listen(ctx, "tcp", fmt.Sprintf(":%d", port))
if err != nil {
log.Fatal(err)
}
discoverygrpc.RegisterAggregatedDiscoveryServiceServer(grpcServer, server)
endpointservice.RegisterEndpointDiscoveryServiceServer(grpcServer, server)
clusterservice.RegisterClusterDiscoveryServiceServer(grpcServer, server)
routeservice.RegisterRouteDiscoveryServiceServer(grpcServer, server)
listenerservice.RegisterListenerDiscoveryServiceServer(grpcServer, server)
secretservice.RegisterSecretDiscoveryServiceServer(grpcServer, server)
runtimeservice.RegisterRuntimeDiscoveryServiceServer(grpcServer, server)
log.Printf("management server listening on %d\n", port)
if err = grpcServer.Serve(listener); err != nil {
log.Println(err)
}
}

View File

@@ -0,0 +1,68 @@
package watcher
import (
"log"
"github.com/fsnotify/fsnotify"
)
type OperationType int
const (
Create OperationType = iota
Remove
Modify
)
type NotifyMessage struct {
Operation OperationType
FilePath string
}
func Watch(directory string, notifyCh chan<- NotifyMessage) {
watcher, err := fsnotify.NewWatcher()
if err != nil {
log.Fatal(err)
}
defer watcher.Close()
done := make(chan bool)
go func() {
for {
select {
case event, ok := <-watcher.Events:
if !ok {
return
}
if event.Op&fsnotify.Write == fsnotify.Write {
notifyCh <- NotifyMessage{
Operation: Modify,
FilePath: event.Name,
}
} else if event.Op&fsnotify.Create == fsnotify.Create {
notifyCh <- NotifyMessage{
Operation: Create,
FilePath: event.Name,
}
} else if event.Op&fsnotify.Remove == fsnotify.Remove {
notifyCh <- NotifyMessage{
Operation: Remove,
FilePath: event.Name,
}
}
case err, ok := <-watcher.Errors:
if !ok {
return
}
log.Println("error:", err)
}
}
}()
err = watcher.Add(directory)
if err != nil {
log.Fatal(err)
}
<-done
}

View File

@@ -0,0 +1,95 @@
package xdscache
import (
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/wencaiwulue/kubevpn/pkg/controlplane/apis/v1alpha1"
"github.com/wencaiwulue/kubevpn/pkg/controlplane/internal/resources"
)
type XDSCache struct {
Listeners map[string]resources.Listener
Routes map[string]resources.Route
Clusters map[string]resources.Cluster
Endpoints map[string]resources.Endpoint
}
func (xds *XDSCache) ClusterContents() []types.Resource {
var r []types.Resource
for _, c := range xds.Clusters {
r = append(r, resources.MakeCluster(c.Name))
}
return r
}
func (xds *XDSCache) RouteContents() []types.Resource {
var routesArray []resources.Route
for _, r := range xds.Routes {
routesArray = append(routesArray, r)
}
return []types.Resource{resources.MakeRoute(routesArray)}
}
func (xds *XDSCache) ListenerContents() []types.Resource {
var r []types.Resource
for _, l := range xds.Listeners {
r = append(r, resources.MakeHTTPListener(l.Name, l.RouteNames[0], l.Address, l.Port))
}
return r
}
func (xds *XDSCache) EndpointsContents() []types.Resource {
var r []types.Resource
for _, c := range xds.Clusters {
r = append(r, resources.MakeEndpoint(c.Name, c.Endpoints))
}
return r
}
func (xds *XDSCache) AddListener(name string, routeNames []string, address string, port uint32) {
xds.Listeners[name] = resources.Listener{
Name: name,
Address: address,
Port: port,
RouteNames: routeNames,
}
}
func (xds *XDSCache) AddRoute(name string, headers []v1alpha1.HeaderMatch, clusters []string) {
var h []resources.Header
for _, header := range headers {
h = append(h, resources.Header{
Key: header.Key,
Value: header.Value,
})
}
xds.Routes[name] = resources.Route{
Name: name,
Headers: h,
Cluster: clusters[0],
}
}
func (xds *XDSCache) AddCluster(name string) {
xds.Clusters[name] = resources.Cluster{
Name: name,
}
}
func (xds *XDSCache) AddEndpoint(clusterName, upstreamHost string, upstreamPort uint32) {
cluster := xds.Clusters[clusterName]
cluster.Endpoints = append(cluster.Endpoints, resources.Endpoint{
UpstreamHost: upstreamHost,
UpstreamPort: upstreamPort,
})
xds.Clusters[clusterName] = cluster
}