feat: rename

This commit is contained in:
wencaiwulue
2022-08-05 16:17:27 +08:00
parent ada917fa18
commit 7b2e9e1937
6 changed files with 18 additions and 18 deletions

274
pkg/controlplane/cache.go Normal file
View File

@@ -0,0 +1,274 @@
package controlplane
import (
"fmt"
"time"
cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
listener "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
route "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
httpinspector "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/listener/http_inspector/v3"
httpconnectionmanager "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
tcpproxy "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/tcp_proxy/v3"
matcher "github.com/envoyproxy/go-control-plane/envoy/type/matcher/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
"github.com/envoyproxy/go-control-plane/pkg/wellknown"
"github.com/golang/protobuf/ptypes/wrappers"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/durationpb"
corev1 "k8s.io/api/core/v1"
)
type Virtual struct {
Uid string // group.resource.name
Ports []corev1.ContainerPort
Rules []*Rule
}
type Rule struct {
Headers map[string]string
LocalTunIP string
}
func (a *Virtual) To() (
listeners []types.Resource,
clusters []types.Resource,
routes []types.Resource,
endpoints []types.Resource,
) {
//clusters = append(clusters, OriginCluster())
for _, port := range a.Ports {
listenerName := fmt.Sprintf("%s_%v_%s", a.Uid, port.ContainerPort, port.Protocol)
routeName := listenerName
listeners = append(listeners, ToListener(listenerName, routeName, port.ContainerPort, port.Protocol))
var rr []*route.Route
for _, rule := range a.Rules {
clusterName := fmt.Sprintf("%s_%v", rule.LocalTunIP, port.ContainerPort)
clusters = append(clusters, ToCluster(clusterName))
endpoints = append(endpoints, ToEndPoint(clusterName, rule.LocalTunIP, port.ContainerPort))
rr = append(rr, ToRoute(clusterName, rule.Headers))
}
rr = append(rr, DefaultRoute())
routes = append(routes, &route.RouteConfiguration{
Name: routeName,
VirtualHosts: []*route.VirtualHost{
{
Name: "local_service",
Domains: []string{"*"},
Routes: rr,
},
},
})
}
return
}
func ToEndPoint(clusterName string, localTunIP string, port int32) *endpoint.ClusterLoadAssignment {
return &endpoint.ClusterLoadAssignment{
ClusterName: clusterName,
Endpoints: []*endpoint.LocalityLbEndpoints{{
LbEndpoints: []*endpoint.LbEndpoint{{
HostIdentifier: &endpoint.LbEndpoint_Endpoint{
Endpoint: &endpoint.Endpoint{
Address: &core.Address{
Address: &core.Address_SocketAddress{
SocketAddress: &core.SocketAddress{
Address: localTunIP,
PortSpecifier: &core.SocketAddress_PortValue{
PortValue: uint32(port),
},
},
},
},
},
},
}},
}},
}
}
func ToCluster(clusterName string) *cluster.Cluster {
return &cluster.Cluster{
Name: clusterName,
ConnectTimeout: durationpb.New(5 * time.Second),
ClusterDiscoveryType: &cluster.Cluster_Type{Type: cluster.Cluster_EDS},
LbPolicy: cluster.Cluster_ROUND_ROBIN,
DnsLookupFamily: cluster.Cluster_V4_ONLY,
EdsClusterConfig: &cluster.Cluster_EdsClusterConfig{
EdsConfig: &core.ConfigSource{
ResourceApiVersion: resource.DefaultAPIVersion,
ConfigSourceSpecifier: &core.ConfigSource_Ads{
Ads: &core.AggregatedConfigSource{},
},
},
},
}
}
func OriginCluster() *cluster.Cluster {
return &cluster.Cluster{
Name: "origin_cluster",
ConnectTimeout: durationpb.New(time.Second * 5),
LbPolicy: cluster.Cluster_CLUSTER_PROVIDED,
ClusterDiscoveryType: &cluster.Cluster_Type{
Type: cluster.Cluster_ORIGINAL_DST,
},
}
}
func ToRoute(clusterName string, headers map[string]string) *route.Route {
var r []*route.HeaderMatcher
for k, v := range headers {
r = append(r, &route.HeaderMatcher{
Name: k,
HeaderMatchSpecifier: &route.HeaderMatcher_StringMatch{
StringMatch: &matcher.StringMatcher{
MatchPattern: &matcher.StringMatcher_Exact{
Exact: v,
},
},
},
})
}
return &route.Route{
Match: &route.RouteMatch{
PathSpecifier: &route.RouteMatch_Prefix{
Prefix: "/",
},
Headers: r,
},
Action: &route.Route_Route{
Route: &route.RouteAction{
ClusterSpecifier: &route.RouteAction_Cluster{
Cluster: clusterName,
},
},
},
}
}
func DefaultRoute() *route.Route {
return &route.Route{
Match: &route.RouteMatch{
PathSpecifier: &route.RouteMatch_Prefix{
Prefix: "/",
},
},
Action: &route.Route_Route{
Route: &route.RouteAction{
ClusterSpecifier: &route.RouteAction_Cluster{
Cluster: "origin_cluster",
},
},
},
}
}
func ToListener(listenerName string, routeName string, port int32, p corev1.Protocol) *listener.Listener {
var protocol core.SocketAddress_Protocol
switch p {
case corev1.ProtocolTCP:
protocol = core.SocketAddress_TCP
case corev1.ProtocolUDP:
protocol = core.SocketAddress_UDP
case corev1.ProtocolSCTP:
protocol = core.SocketAddress_TCP
}
any := func(m proto.Message) *anypb.Any {
pbst, _ := anypb.New(m)
return pbst
}
httpManager := &httpconnectionmanager.HttpConnectionManager{
CodecType: httpconnectionmanager.HttpConnectionManager_AUTO,
StatPrefix: "http",
HttpFilters: []*httpconnectionmanager.HttpFilter{{
Name: wellknown.Router,
}},
RouteSpecifier: &httpconnectionmanager.HttpConnectionManager_Rds{
Rds: &httpconnectionmanager.Rds{
ConfigSource: &core.ConfigSource{
ResourceApiVersion: resource.DefaultAPIVersion,
ConfigSourceSpecifier: &core.ConfigSource_ApiConfigSource{
ApiConfigSource: &core.ApiConfigSource{
TransportApiVersion: resource.DefaultAPIVersion,
ApiType: core.ApiConfigSource_GRPC,
SetNodeOnFirstMessageOnly: true,
GrpcServices: []*core.GrpcService{{
TargetSpecifier: &core.GrpcService_EnvoyGrpc_{
EnvoyGrpc: &core.GrpcService_EnvoyGrpc{ClusterName: "xds_cluster"},
},
}},
},
},
},
RouteConfigName: routeName,
},
},
}
tcpConfig := &tcpproxy.TcpProxy{
StatPrefix: "tcp",
ClusterSpecifier: &tcpproxy.TcpProxy_Cluster{
Cluster: "origin_cluster",
},
}
return &listener.Listener{
Name: listenerName,
TrafficDirection: core.TrafficDirection_INBOUND,
BindToPort: &wrappers.BoolValue{Value: false},
UseOriginalDst: &wrappers.BoolValue{Value: true},
Address: &core.Address{
Address: &core.Address_SocketAddress{
SocketAddress: &core.SocketAddress{
Protocol: protocol,
Address: "0.0.0.0",
PortSpecifier: &core.SocketAddress_PortValue{
PortValue: uint32(port),
},
},
},
},
FilterChains: []*listener.FilterChain{
{
FilterChainMatch: &listener.FilterChainMatch{
ApplicationProtocols: []string{"http/1.0", "http/1.1", "h2c"},
},
Filters: []*listener.Filter{
{
Name: wellknown.HTTPConnectionManager,
ConfigType: &listener.Filter_TypedConfig{
TypedConfig: any(httpManager),
},
},
},
},
{
Filters: []*listener.Filter{
{
Name: wellknown.TCPProxy,
ConfigType: &listener.Filter_TypedConfig{
TypedConfig: any(tcpConfig),
},
},
},
},
},
ListenerFilters: []*listener.ListenerFilter{
{
Name: wellknown.HttpInspector,
ConfigType: &listener.ListenerFilter_TypedConfig{
TypedConfig: any(&httpinspector.HttpInspector{}),
},
},
},
}
}

View File

@@ -0,0 +1,92 @@
package controlplane
import (
"context"
"fmt"
"io/ioutil"
"math"
"math/rand"
"strconv"
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/yaml"
)
type Processor struct {
cache cache.SnapshotCache
logger *logrus.Logger
version int64
}
func NewProcessor(cache cache.SnapshotCache, log *logrus.Logger) *Processor {
return &Processor{
cache: cache,
logger: log,
version: rand.Int63n(1000),
}
}
func (p *Processor) newVersion() string {
if p.version == math.MaxInt64 {
p.version = 0
}
p.version++
return strconv.FormatInt(p.version, 10)
}
func (p *Processor) ProcessFile(file NotifyMessage) {
configList, err := ParseYaml(file.FilePath)
if err != nil {
p.logger.Errorf("error parsing yaml file: %+v", err)
return
}
for _, config := range configList {
if len(config.Uid) == 0 {
continue
}
listeners, clusters, routes, endpoints := config.To()
resources := map[resource.Type][]types.Resource{
resource.ListenerType: listeners, // listeners
resource.RouteType: routes, // routes
resource.ClusterType: clusters, // clusters
resource.EndpointType: endpoints, // endpoints
resource.RuntimeType: {}, // runtimes
resource.SecretType: {}, // secrets
}
snapshot, err := cache.NewSnapshot(p.newVersion(), resources)
if err != nil {
p.logger.Errorf("snapshot inconsistency: %v, err: %v", snapshot, err)
return
}
if err = snapshot.Consistent(); err != nil {
p.logger.Errorf("snapshot inconsistency: %v, err: %v", snapshot, err)
return
}
p.logger.Debugf("will serve snapshot %+v, nodeID: %s", snapshot, config.Uid)
if err = p.cache.SetSnapshot(context.TODO(), config.Uid, snapshot); err != nil {
p.logger.Errorf("snapshot error %q for %v", err, snapshot)
p.logger.Fatal(err)
}
}
}
func ParseYaml(file string) ([]*Virtual, error) {
var virtualList = make([]*Virtual, 0)
yamlFile, err := ioutil.ReadFile(file)
if err != nil {
return nil, fmt.Errorf("Error reading YAML file: %s\n", err)
}
err = yaml.Unmarshal(yamlFile, &virtualList)
if err != nil {
return nil, err
}
return virtualList, nil
}

View File

@@ -0,0 +1,45 @@
package controlplane
import (
"context"
"fmt"
"log"
"net"
clusterservice "github.com/envoyproxy/go-control-plane/envoy/service/cluster/v3"
discoverygrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
endpointservice "github.com/envoyproxy/go-control-plane/envoy/service/endpoint/v3"
listenerservice "github.com/envoyproxy/go-control-plane/envoy/service/listener/v3"
routeservice "github.com/envoyproxy/go-control-plane/envoy/service/route/v3"
runtimeservice "github.com/envoyproxy/go-control-plane/envoy/service/runtime/v3"
secretservice "github.com/envoyproxy/go-control-plane/envoy/service/secret/v3"
serverv3 "github.com/envoyproxy/go-control-plane/pkg/server/v3"
"google.golang.org/grpc"
)
const (
grpcMaxConcurrentStreams = 1000000
)
func RunServer(ctx context.Context, server serverv3.Server, port uint) {
grpcServer := grpc.NewServer(grpc.MaxConcurrentStreams(grpcMaxConcurrentStreams))
var lc net.ListenConfig
listener, err := lc.Listen(ctx, "tcp", fmt.Sprintf(":%d", port))
if err != nil {
log.Fatal(err)
}
discoverygrpc.RegisterAggregatedDiscoveryServiceServer(grpcServer, server)
endpointservice.RegisterEndpointDiscoveryServiceServer(grpcServer, server)
clusterservice.RegisterClusterDiscoveryServiceServer(grpcServer, server)
routeservice.RegisterRouteDiscoveryServiceServer(grpcServer, server)
listenerservice.RegisterListenerDiscoveryServiceServer(grpcServer, server)
secretservice.RegisterSecretDiscoveryServiceServer(grpcServer, server)
runtimeservice.RegisterRuntimeDiscoveryServiceServer(grpcServer, server)
log.Printf("management server listening on %d\n", port)
if err = grpcServer.Serve(listener); err != nil {
log.Fatal(err)
}
}

View File

@@ -0,0 +1,68 @@
package controlplane
import (
"log"
"github.com/fsnotify/fsnotify"
)
type OperationType int
const (
Create OperationType = iota
Remove
Modify
)
type NotifyMessage struct {
Operation OperationType
FilePath string
}
func Watch(directory string, notifyCh chan<- NotifyMessage) {
watcher, err := fsnotify.NewWatcher()
if err != nil {
log.Fatal(err)
}
defer watcher.Close()
done := make(chan bool)
go func() {
for {
select {
case event, ok := <-watcher.Events:
if !ok {
return
}
if event.Op&fsnotify.Write == fsnotify.Write {
notifyCh <- NotifyMessage{
Operation: Modify,
FilePath: event.Name,
}
} else if event.Op&fsnotify.Create == fsnotify.Create {
notifyCh <- NotifyMessage{
Operation: Create,
FilePath: event.Name,
}
} else if event.Op&fsnotify.Remove == fsnotify.Remove {
notifyCh <- NotifyMessage{
Operation: Remove,
FilePath: event.Name,
}
}
case err, ok := <-watcher.Errors:
if !ok {
return
}
log.Println("error:", err)
}
}
}()
err = watcher.Add(directory)
if err != nil {
log.Fatal(err)
}
<-done
}