diff --git a/Makefile b/Makefile index e312c51d..1b44a457 100644 --- a/Makefile +++ b/Makefile @@ -80,12 +80,17 @@ image: kubevpn-linux-amd64 mv kubevpn-linux-amd64 kubevpn docker build -t naison/kubevpn:${VERSION} -f ./dockerfile/server/Dockerfile . rm -fr kubevpn + docker tag naison/kubevpn:${VERSION} naison/kubevpn:latest docker push naison/kubevpn:${VERSION} + docker push naison/kubevpn:latest .PHONY: image-mesh image-mesh: docker build -t naison/kubevpn-mesh:${VERSION} -f ./dockerfile/mesh/Dockerfile . + docker tag naison/kubevpn-mesh:${VERSION} naison/kubevpn-mesh:latest docker push naison/kubevpn-mesh:${VERSION} + docker push naison/kubevpn-mesh:latest + .PHONY: image-control-plane image-control-plane: @@ -93,5 +98,7 @@ image-control-plane: chmod +x envoy-xds-server docker build -t naison/envoy-xds-server:${VERSION} -f ./dockerfile/control_plane/Dockerfile . rm -fr envoy-xds-server + docker tag naison/envoy-xds-server:${VERSION} naison/envoy-xds-server:latest docker push naison/envoy-xds-server:${VERSION} + docker push naison/envoy-xds-server:latest diff --git a/cmd/mesh/main.go b/cmd/mesh/main.go index 58d89aee..074a9801 100644 --- a/cmd/mesh/main.go +++ b/cmd/mesh/main.go @@ -11,7 +11,7 @@ import ( ) var ( - logger log.FieldLogger + logger *log.Logger watchDirectoryFileName string port uint = 9002 ) @@ -25,25 +25,23 @@ func init() { } func main() { - // Create a cache snapshotCache := cache.NewSnapshotCache(false, cache.IDHash{}, logger) - // Create a processor - proc := control_plane.NewProcessor(snapshotCache, log.WithField("context", "processor")) + proc := control_plane.NewProcessor(snapshotCache, logger) go func() { - // Run the xDS server ctx := context.Background() - srv := serverv3.NewServer(ctx, snapshotCache, nil) - control_plane.RunServer(ctx, srv, port) + server := serverv3.NewServer(ctx, snapshotCache, nil) + control_plane.RunServer(ctx, server, port) }() - // Notify channel for file system events - notifyCh := make(chan control_plane.NotifyMessage) + notifyCh := make(chan control_plane.NotifyMessage, 100) - go func() { - // Watch for file changes - control_plane.Watch(watchDirectoryFileName, notifyCh) - }() + notifyCh <- control_plane.NotifyMessage{ + Operation: control_plane.Create, + FilePath: watchDirectoryFileName, + } + + go control_plane.Watch(watchDirectoryFileName, notifyCh) for { select { @@ -52,58 +50,4 @@ func main() { proc.ProcessFile(msg) } } - - //config, err := rest.InClusterConfig() - //if err != nil { - // panic(err) - //} - //clientset, err := kubernetes.NewForConfig(config) - //if err != nil { - // panic(err) - //} - //namespace, _, err := util.Namespace() - //if err != nil { - // panic(err) - //} - - //informerFactory := informers.NewSharedInformerFactoryWithOptions( - // clientset, - // time.Second*5, - // informers.WithNamespace(namespace), - // informers.WithTweakListOptions(func(options *metav1.ListOptions) { - // options.FieldSelector = fields.OneTermEqualSelector( - // "metadata.name", config2.PodTrafficManager, - // ).String() - // }), - //) - //informer, err := informerFactory.ForResource(v1.SchemeGroupVersion.WithResource("configmaps")) - //if err != nil { - // panic(err) - //} - //informer.Informer().AddEventHandler( - // k8scache.FilteringResourceEventHandler{ - // FilterFunc: func(obj interface{}) bool { - // if cm, ok := obj.(*v1.ConfigMap); ok { - // if _, found := cm.Data[config2.Envoy]; found { - // return true - // } - // } - // return false - // }, - // Handler: k8scache.ResourceEventHandlerFuncs{ - // AddFunc: func(obj interface{}) { - // proc.ProcessConfig(obj.(*v1.ConfigMap).Data[config2.Envoy]) - // }, - // UpdateFunc: func(_, newObj interface{}) { - // proc.ProcessConfig(newObj.(*v1.ConfigMap).Data[config2.Envoy]) - // }, - // DeleteFunc: func(obj interface{}) { - // proc.ProcessConfig(obj.(*v1.ConfigMap).Data[config2.Envoy]) - // }, - // }, - // }, - //) - //informerFactory.Start(context.TODO().Done()) - //informerFactory.WaitForCacheSync(context.TODO().Done()) - //<-context.TODO().Done() } diff --git a/config/config.go b/config/config.go index 5524cbd3..1144b8ae 100644 --- a/config/config.go +++ b/config/config.go @@ -21,7 +21,7 @@ const ( var ( // Version inject --ldflags -X - Version = "" + Version = "latest" ImageServer = "naison/kubevpn:" + Version ImageMesh = "naison/kubevpn-mesh:" + Version diff --git a/pkg/control_plane/cache.go b/pkg/control_plane/cache.go index 4ac97fbb..5845e397 100644 --- a/pkg/control_plane/cache.go +++ b/pkg/control_plane/cache.go @@ -1,29 +1,273 @@ package control_plane -type Listener struct { - Name string - Address string - Port uint32 - RouteNames []string +import ( + "fmt" + cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" + core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" + listener "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" + route "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" + httpinspector "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/listener/http_inspector/v3" + hcm "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3" + tcp "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/tcp_proxy/v3" + v32 "github.com/envoyproxy/go-control-plane/envoy/type/matcher/v3" + "github.com/envoyproxy/go-control-plane/pkg/cache/types" + "github.com/envoyproxy/go-control-plane/pkg/resource/v3" + "github.com/envoyproxy/go-control-plane/pkg/wellknown" + "github.com/golang/protobuf/ptypes/wrappers" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/types/known/durationpb" + v1 "k8s.io/api/core/v1" + "time" +) + +type Virtual struct { + Uid string // group.resource.name + Ports []v1.ContainerPort + Rules []*Rule } -type Route struct { - Name string - Headers []Header - Cluster string +type Rule struct { + Headers map[string]string + LocalTunIP string } -type Header struct { - Key string - Value string +func (a *Virtual) To() ( + listeners []types.Resource, + clusters []types.Resource, + routes []types.Resource, + endpoints []types.Resource, +) { + //clusters = append(clusters, OriginCluster()) + for _, port := range a.Ports { + listenerName := fmt.Sprintf("%s_%v_%s", a.Uid, port.ContainerPort, port.Protocol) + routeName := listenerName + listeners = append(listeners, ToListener(listenerName, routeName, port.ContainerPort, port.Protocol)) + + var rr []*route.Route + for _, rule := range a.Rules { + clusterName := fmt.Sprintf("%s_%v", rule.LocalTunIP, port.ContainerPort) + clusters = append(clusters, ToCluster(clusterName)) + endpoints = append(endpoints, ToEndPoint(clusterName, rule.LocalTunIP, port.ContainerPort)) + rr = append(rr, ToRoute(clusterName, rule.Headers)) + } + rr = append(rr, DefaultRoute()) + routes = append(routes, &route.RouteConfiguration{ + Name: routeName, + VirtualHosts: []*route.VirtualHost{ + { + Name: "local_service", + Domains: []string{"*"}, + Routes: rr, + }, + }, + }) + } + return } -type Cluster struct { - Name string - Endpoints []Endpoint +func ToEndPoint(clusterName string, localTunIP string, port int32) *endpoint.ClusterLoadAssignment { + return &endpoint.ClusterLoadAssignment{ + ClusterName: clusterName, + Endpoints: []*endpoint.LocalityLbEndpoints{{ + LbEndpoints: []*endpoint.LbEndpoint{{ + HostIdentifier: &endpoint.LbEndpoint_Endpoint{ + Endpoint: &endpoint.Endpoint{ + Address: &core.Address{ + Address: &core.Address_SocketAddress{ + SocketAddress: &core.SocketAddress{ + Address: localTunIP, + PortSpecifier: &core.SocketAddress_PortValue{ + PortValue: uint32(port), + }, + }, + }, + }, + }, + }, + }}, + }}, + } } -type Endpoint struct { - UpstreamHost string - UpstreamPort uint32 +func ToCluster(clusterName string) *cluster.Cluster { + return &cluster.Cluster{ + Name: clusterName, + ConnectTimeout: durationpb.New(5 * time.Second), + ClusterDiscoveryType: &cluster.Cluster_Type{Type: cluster.Cluster_EDS}, + LbPolicy: cluster.Cluster_ROUND_ROBIN, + DnsLookupFamily: cluster.Cluster_V4_ONLY, + EdsClusterConfig: &cluster.Cluster_EdsClusterConfig{ + EdsConfig: &core.ConfigSource{ + ResourceApiVersion: resource.DefaultAPIVersion, + ConfigSourceSpecifier: &core.ConfigSource_Ads{ + Ads: &core.AggregatedConfigSource{}, + }, + }, + }, + } +} + +func OriginCluster() *cluster.Cluster { + return &cluster.Cluster{ + Name: "origin_cluster", + ConnectTimeout: durationpb.New(time.Second * 5), + LbPolicy: cluster.Cluster_CLUSTER_PROVIDED, + ClusterDiscoveryType: &cluster.Cluster_Type{ + Type: cluster.Cluster_ORIGINAL_DST, + }, + } +} + +func ToRoute(clusterName string, headers map[string]string) *route.Route { + var r []*route.HeaderMatcher + for k, v := range headers { + r = append(r, &route.HeaderMatcher{ + Name: k, + HeaderMatchSpecifier: &route.HeaderMatcher_StringMatch{ + StringMatch: &v32.StringMatcher{ + MatchPattern: &v32.StringMatcher_Exact{ + Exact: v, + }, + }, + }, + }) + } + return &route.Route{ + Match: &route.RouteMatch{ + PathSpecifier: &route.RouteMatch_Prefix{ + Prefix: "/", + }, + Headers: r, + }, + Action: &route.Route_Route{ + Route: &route.RouteAction{ + ClusterSpecifier: &route.RouteAction_Cluster{ + Cluster: clusterName, + }, + }, + }, + } +} + +func DefaultRoute() *route.Route { + return &route.Route{ + Match: &route.RouteMatch{ + PathSpecifier: &route.RouteMatch_Prefix{ + Prefix: "/", + }, + }, + Action: &route.Route_Route{ + Route: &route.RouteAction{ + ClusterSpecifier: &route.RouteAction_Cluster{ + Cluster: "origin_cluster", + }, + }, + }, + } +} + +func ToListener(listenerName string, routeName string, port int32, p v1.Protocol) *listener.Listener { + var protocol core.SocketAddress_Protocol + switch p { + case v1.ProtocolTCP: + protocol = core.SocketAddress_TCP + case v1.ProtocolUDP: + protocol = core.SocketAddress_UDP + case v1.ProtocolSCTP: + protocol = core.SocketAddress_TCP + } + + any := func(m proto.Message) *anypb.Any { + pbst, _ := anypb.New(m) + return pbst + } + + httpManager := &hcm.HttpConnectionManager{ + CodecType: hcm.HttpConnectionManager_AUTO, + StatPrefix: "http", + HttpFilters: []*hcm.HttpFilter{{ + Name: wellknown.Router, + }}, + RouteSpecifier: &hcm.HttpConnectionManager_Rds{ + Rds: &hcm.Rds{ + ConfigSource: &core.ConfigSource{ + ResourceApiVersion: resource.DefaultAPIVersion, + ConfigSourceSpecifier: &core.ConfigSource_ApiConfigSource{ + ApiConfigSource: &core.ApiConfigSource{ + TransportApiVersion: resource.DefaultAPIVersion, + ApiType: core.ApiConfigSource_GRPC, + SetNodeOnFirstMessageOnly: true, + GrpcServices: []*core.GrpcService{{ + TargetSpecifier: &core.GrpcService_EnvoyGrpc_{ + EnvoyGrpc: &core.GrpcService_EnvoyGrpc{ClusterName: "xds_cluster"}, + }, + }}, + }, + }, + }, + RouteConfigName: routeName, + }, + }, + } + + tcpConfig := &tcp.TcpProxy{ + StatPrefix: "tcp", + ClusterSpecifier: &tcp.TcpProxy_Cluster{ + Cluster: "origin_cluster", + }, + } + + return &listener.Listener{ + Name: listenerName, + TrafficDirection: core.TrafficDirection_INBOUND, + BindToPort: &wrappers.BoolValue{Value: false}, + UseOriginalDst: &wrappers.BoolValue{Value: true}, + + Address: &core.Address{ + Address: &core.Address_SocketAddress{ + SocketAddress: &core.SocketAddress{ + Protocol: protocol, + Address: "0.0.0.0", + PortSpecifier: &core.SocketAddress_PortValue{ + PortValue: uint32(port), + }, + }, + }, + }, + FilterChains: []*listener.FilterChain{ + { + FilterChainMatch: &listener.FilterChainMatch{ + ApplicationProtocols: []string{"http/1.0", "http/1.1", "h2c"}, + }, + Filters: []*listener.Filter{ + { + Name: wellknown.HTTPConnectionManager, + ConfigType: &listener.Filter_TypedConfig{ + TypedConfig: any(httpManager), + }, + }, + }, + }, + { + Filters: []*listener.Filter{ + { + Name: wellknown.TCPProxy, + ConfigType: &listener.Filter_TypedConfig{ + TypedConfig: any(tcpConfig), + }, + }, + }, + }, + }, + ListenerFilters: []*listener.ListenerFilter{ + { + Name: wellknown.HttpInspector, + ConfigType: &listener.ListenerFilter_TypedConfig{ + TypedConfig: any(&httpinspector.HttpInspector{}), + }, + }, + }, + } } diff --git a/pkg/control_plane/cache_bak.go b/pkg/control_plane/cache_bak.go deleted file mode 100644 index 3c5adaac..00000000 --- a/pkg/control_plane/cache_bak.go +++ /dev/null @@ -1,280 +0,0 @@ -package control_plane - -import ( - "fmt" - cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" - core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" - endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" - listener "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" - route "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" - hcm "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3" - v32 "github.com/envoyproxy/go-control-plane/envoy/type/matcher/v3" - "github.com/envoyproxy/go-control-plane/pkg/cache/types" - "github.com/envoyproxy/go-control-plane/pkg/resource/v3" - "github.com/envoyproxy/go-control-plane/pkg/wellknown" - "github.com/golang/protobuf/ptypes/wrappers" - "google.golang.org/protobuf/proto" - "google.golang.org/protobuf/types/known/anypb" - "google.golang.org/protobuf/types/known/durationpb" - v1 "k8s.io/api/core/v1" - "time" -) - -type Virtual struct { - Uid string // group.resource.name - Ports []v1.ContainerPort - Rules []*Rule -} - -type Rule struct { - Headers map[string]string - LocalTunIP string -} - -func (a *Virtual) To() ( - listeners []types.Resource, - clusters []types.Resource, - routes []types.Resource, - endpoints []types.Resource, -) { - //clusters = append(clusters, OriginCluster()) - for _, port := range a.Ports { - listenerName := fmt.Sprintf("%s_%v_%s", a.Uid, port.ContainerPort, port.Protocol) - routeName := listenerName - listeners = append(listeners, ToListener(listenerName, routeName, port.ContainerPort, port.Protocol)) - - var rr []*route.Route - for _, rule := range a.Rules { - clusterName := fmt.Sprintf("%s_%v", rule.LocalTunIP, port.ContainerPort) - clusters = append(clusters, ToCluster(clusterName)) - endpoints = append(endpoints, ToEndPoint(clusterName, rule.LocalTunIP, port.ContainerPort)) - rr = append(rr, ToRoute(clusterName, rule.Headers)) - } - rr = append(rr, DefaultRoute()) - routes = append(routes, &route.RouteConfiguration{ - Name: routeName, - VirtualHosts: []*route.VirtualHost{ - { - Name: "local_service", - Domains: []string{"*"}, - Routes: rr, - }, - }, - }) - } - return -} - -func ToEndPoint(clusterName string, localTunIP string, port int32) *endpoint.ClusterLoadAssignment { - return &endpoint.ClusterLoadAssignment{ - ClusterName: clusterName, - Endpoints: []*endpoint.LocalityLbEndpoints{{ - LbEndpoints: []*endpoint.LbEndpoint{{ - HostIdentifier: &endpoint.LbEndpoint_Endpoint{ - Endpoint: &endpoint.Endpoint{ - Address: &core.Address{ - Address: &core.Address_SocketAddress{ - SocketAddress: &core.SocketAddress{ - Address: localTunIP, - PortSpecifier: &core.SocketAddress_PortValue{ - PortValue: uint32(port), - }, - }, - }, - }, - }, - }, - }}, - }}, - } -} - -func ToCluster(clusterName string) *cluster.Cluster { - return &cluster.Cluster{ - Name: clusterName, - ConnectTimeout: durationpb.New(5 * time.Second), - ClusterDiscoveryType: &cluster.Cluster_Type{Type: cluster.Cluster_EDS}, - LbPolicy: cluster.Cluster_ROUND_ROBIN, - DnsLookupFamily: cluster.Cluster_V4_ONLY, - EdsClusterConfig: &cluster.Cluster_EdsClusterConfig{ - EdsConfig: &core.ConfigSource{ - ResourceApiVersion: resource.DefaultAPIVersion, - ConfigSourceSpecifier: &core.ConfigSource_ApiConfigSource{ - ApiConfigSource: &core.ApiConfigSource{ - TransportApiVersion: resource.DefaultAPIVersion, - ApiType: core.ApiConfigSource_GRPC, - SetNodeOnFirstMessageOnly: true, - GrpcServices: []*core.GrpcService{{ - TargetSpecifier: &core.GrpcService_EnvoyGrpc_{ - EnvoyGrpc: &core.GrpcService_EnvoyGrpc{ClusterName: "xds_cluster"}, - }, - }}, - }, - }, - }, - }, - } -} - -func OriginCluster() *cluster.Cluster { - return &cluster.Cluster{ - Name: "origin_cluster", - ConnectTimeout: durationpb.New(time.Second * 5), - LbPolicy: cluster.Cluster_CLUSTER_PROVIDED, - ClusterDiscoveryType: &cluster.Cluster_Type{ - Type: cluster.Cluster_ORIGINAL_DST, - }, - } -} - -func ToRoute(clusterName string, headers map[string]string) *route.Route { - var r []*route.HeaderMatcher - for k, v := range headers { - r = append(r, &route.HeaderMatcher{ - Name: k, - HeaderMatchSpecifier: &route.HeaderMatcher_StringMatch{ - StringMatch: &v32.StringMatcher{ - MatchPattern: &v32.StringMatcher_Exact{ - Exact: v, - }, - }, - }, - }) - } - return &route.Route{ - Match: &route.RouteMatch{ - PathSpecifier: &route.RouteMatch_Prefix{ - Prefix: "/", - }, - Headers: r, - }, - Action: &route.Route_Route{ - Route: &route.RouteAction{ - ClusterSpecifier: &route.RouteAction_Cluster{ - Cluster: clusterName, - }, - }, - }, - } -} - -func DefaultRoute() *route.Route { - return &route.Route{ - Match: &route.RouteMatch{ - PathSpecifier: &route.RouteMatch_Prefix{ - Prefix: "/", - }, - }, - Action: &route.Route_Route{ - Route: &route.RouteAction{ - ClusterSpecifier: &route.RouteAction_Cluster{ - Cluster: "origin_cluster", - }, - }, - }, - } -} - -func ToListener(listenerName string, routeName string, port int32, p v1.Protocol) *listener.Listener { - var protocol core.SocketAddress_Protocol - switch p { - case v1.ProtocolTCP: - protocol = core.SocketAddress_TCP - case v1.ProtocolUDP: - protocol = core.SocketAddress_UDP - case v1.ProtocolSCTP: - protocol = core.SocketAddress_TCP - } - - any := func(m proto.Message) *anypb.Any { - pbst, _ := anypb.New(m) - return pbst - } - - httpManager := &hcm.HttpConnectionManager{ - CodecType: hcm.HttpConnectionManager_AUTO, - StatPrefix: "http", - HttpFilters: []*hcm.HttpFilter{{ - Name: wellknown.Router, - }}, - RouteSpecifier: &hcm.HttpConnectionManager_Rds{ - Rds: &hcm.Rds{ - ConfigSource: &core.ConfigSource{ - ResourceApiVersion: resource.DefaultAPIVersion, - ConfigSourceSpecifier: &core.ConfigSource_ApiConfigSource{ - ApiConfigSource: &core.ApiConfigSource{ - TransportApiVersion: resource.DefaultAPIVersion, - ApiType: core.ApiConfigSource_GRPC, - SetNodeOnFirstMessageOnly: true, - GrpcServices: []*core.GrpcService{{ - TargetSpecifier: &core.GrpcService_EnvoyGrpc_{ - EnvoyGrpc: &core.GrpcService_EnvoyGrpc{ClusterName: "xds_cluster"}, - }, - }}, - }, - }, - }, - RouteConfigName: routeName, - }, - }, - } - - //tcpConfig := &tcp.TcpProxy{ - // StatPrefix: "tcp", - // ClusterSpecifier: &tcp.TcpProxy_Cluster{ - // Cluster: "origin_cluster", - // }, - //} - - return &listener.Listener{ - Name: listenerName, - TrafficDirection: core.TrafficDirection_INBOUND, - BindToPort: &wrappers.BoolValue{Value: false}, - //UseOriginalDst: &wrappers.BoolValue{Value: true}, - - Address: &core.Address{ - Address: &core.Address_SocketAddress{ - SocketAddress: &core.SocketAddress{ - Protocol: protocol, - Address: "0.0.0.0", - PortSpecifier: &core.SocketAddress_PortValue{ - PortValue: uint32(port), - }, - }, - }, - }, - FilterChains: []*listener.FilterChain{ - { - FilterChainMatch: &listener.FilterChainMatch{ - ApplicationProtocols: []string{"http/1.0", "http/1.1", "h2c"}, - }, - Filters: []*listener.Filter{ - { - Name: wellknown.HTTPConnectionManager, - ConfigType: &listener.Filter_TypedConfig{ - TypedConfig: any(httpManager), - }, - }, - }, - }, - //{ - // Filters: []*listener.Filter{ - // { - // Name: wellknown.TCPProxy, - // ConfigType: &listener.Filter_TypedConfig{ - // TypedConfig: any(tcpConfig), - // }, - // }, - // }, - //}, - }, - //ListenerFilters: []*listener.ListenerFilter{ - // { - // Name: wellknown.HttpInspector, - // ConfigType: &listener.ListenerFilter_TypedConfig{ - // TypedConfig: any(&httpinspector.HttpInspector{}), - // }, - // }, - //}, - } -} diff --git a/pkg/control_plane/cache_xds.go b/pkg/control_plane/cache_xds.go deleted file mode 100644 index 3d93c972..00000000 --- a/pkg/control_plane/cache_xds.go +++ /dev/null @@ -1,93 +0,0 @@ -package control_plane - -import ( - "github.com/envoyproxy/go-control-plane/pkg/cache/types" -) - -type XDSCache struct { - Listeners map[string]Listener - Routes map[string]Route - Clusters map[string]Cluster - Endpoints map[string]Endpoint -} - -func (xds *XDSCache) ClusterContents() []types.Resource { - var r []types.Resource - - for _, c := range xds.Clusters { - r = append(r, MakeCluster(c.Name)) - } - - return r -} - -func (xds *XDSCache) RouteContents() []types.Resource { - - var routesArray []Route - for _, r := range xds.Routes { - routesArray = append(routesArray, r) - } - - return []types.Resource{MakeRoute(routesArray)} -} - -func (xds *XDSCache) ListenerContents() []types.Resource { - var r []types.Resource - - for _, l := range xds.Listeners { - r = append(r, MakeHTTPListener(l.Name, l.RouteNames[0], l.Address, l.Port)) - } - - return r -} - -func (xds *XDSCache) EndpointsContents() []types.Resource { - var r []types.Resource - - for _, c := range xds.Clusters { - r = append(r, MakeEndpoint(c.Name, c.Endpoints)) - } - - return r -} - -func (xds *XDSCache) AddListener(name string, routeNames []string, address string, port uint32) { - xds.Listeners[name] = Listener{ - Name: name, - Address: address, - Port: port, - RouteNames: routeNames, - } -} - -func (xds *XDSCache) AddRoute(name string, headers []HeaderMatch, clusterName string) { - var h []Header - for _, header := range headers { - h = append(h, Header{ - Key: header.Key, - Value: header.Value, - }) - } - xds.Routes[name] = Route{ - Name: name, - Headers: h, - Cluster: clusterName, - } -} - -func (xds *XDSCache) AddCluster(name string) { - xds.Clusters[name] = Cluster{ - Name: name, - } -} - -func (xds *XDSCache) AddEndpoint(clusterName, upstreamHost string, upstreamPort uint32) { - cluster := xds.Clusters[clusterName] - - cluster.Endpoints = append(cluster.Endpoints, Endpoint{ - UpstreamHost: upstreamHost, - UpstreamPort: upstreamPort, - }) - - xds.Clusters[clusterName] = cluster -} diff --git a/pkg/control_plane/envoy.go b/pkg/control_plane/envoy.go deleted file mode 100644 index a5b79e60..00000000 --- a/pkg/control_plane/envoy.go +++ /dev/null @@ -1,39 +0,0 @@ -package control_plane - -type EnvoyConfig struct { - NodeID string `yaml:"nodeID"` - Spec Spec `yaml:"spec"` -} - -type Spec struct { - Listeners []ListenerTemp `yaml:"listeners"` - Clusters []ClusterTemp `yaml:"clusters"` -} - -type ListenerTemp struct { - Name string `yaml:"name"` - Address string `yaml:"address"` - Port uint32 `yaml:"port"` - Routes []RouteTemp `yaml:"routes"` -} - -type RouteTemp struct { - Name string `yaml:"name"` - Headers []HeaderMatch `yaml:"headers"` - ClusterName string `yaml:"clusters"` -} - -type HeaderMatch struct { - Key string `yaml:"key"` - Value string `yaml:"value"` -} - -type ClusterTemp struct { - Name string `yaml:"name"` - Endpoints []EndpointTemp `yaml:"endpoints"` -} - -type EndpointTemp struct { - Address string `yaml:"address"` - Port uint32 `yaml:"port"` -} diff --git a/pkg/control_plane/processor.go b/pkg/control_plane/processor.go index d2de51a8..281e1a95 100644 --- a/pkg/control_plane/processor.go +++ b/pkg/control_plane/processor.go @@ -11,125 +11,81 @@ import ( "k8s.io/apimachinery/pkg/util/yaml" "math" "math/rand" - "os" "strconv" ) type Processor struct { - cache cache.SnapshotCache - - // snapshotVersion holds the current version of the snapshot. - snapshotVersion int64 - - log logrus.FieldLogger - - xdsCache XDSCache + cache cache.SnapshotCache + logger *logrus.Logger + version int64 } -func NewProcessor(cache cache.SnapshotCache, log logrus.FieldLogger) *Processor { +func NewProcessor(cache cache.SnapshotCache, log *logrus.Logger) *Processor { return &Processor{ - cache: cache, - snapshotVersion: rand.Int63n(1000), - log: log, - xdsCache: XDSCache{ - Listeners: make(map[string]Listener), - Clusters: make(map[string]Cluster), - Routes: make(map[string]Route), - Endpoints: make(map[string]Endpoint), - }, + cache: cache, + logger: log, + version: rand.Int63n(1000), } } -// newSnapshotVersion increments the current snapshotVersion -// and returns as a string. -func (p *Processor) newSnapshotVersion() string { - - // Reset the snapshotVersion if it ever hits max size. - if p.snapshotVersion == math.MaxInt64 { - p.snapshotVersion = 0 +func (p *Processor) newVersion() string { + if p.version == math.MaxInt64 { + p.version = 0 } - - // Increment the snapshot version & return as string. - p.snapshotVersion++ - return strconv.FormatInt(p.snapshotVersion, 10) + p.version++ + return strconv.FormatInt(p.version, 10) } -// ProcessFile takes a file and generates an xDS snapshot func (p *Processor) ProcessFile(file NotifyMessage) { - // Parse file into object - envoyConfigList, err := ParseYaml(file.FilePath) + configList, err := ParseYaml(file.FilePath) if err != nil { - p.log.Errorf("error parsing yaml file: %+v", err) + p.logger.Errorf("error parsing yaml file: %+v", err) return } - - for _, envoyConfig := range envoyConfigList { - // Parse Listeners - for _, l := range envoyConfig.Spec.Listeners { - var lRoutes []string - for _, lr := range l.Routes { - lRoutes = append(lRoutes, lr.Name) - } - - p.xdsCache.AddListener(l.Name, lRoutes, l.Address, l.Port) - - for _, r := range l.Routes { - p.xdsCache.AddRoute(r.Name, r.Headers, r.ClusterName) - } + for _, config := range configList { + if len(config.Uid) == 0 { + continue } - - // Parse Clusters - for _, c := range envoyConfig.Spec.Clusters { - p.xdsCache.AddCluster(c.Name) - - // Parse endpoints - for _, e := range c.Endpoints { - p.xdsCache.AddEndpoint(c.Name, e.Address, e.Port) - } + listeners, clusters, routes, endpoints := config.To() + resources := map[resource.Type][]types.Resource{ + resource.ListenerType: listeners, // listeners + resource.RouteType: routes, // routes + resource.ClusterType: clusters, // clusters + resource.EndpointType: endpoints, // endpoints + resource.RuntimeType: {}, // runtimes + resource.SecretType: {}, // secrets } - a := map[resource.Type][]types.Resource{ - resource.EndpointType: p.xdsCache.EndpointsContents(), // endpoints - resource.ClusterType: p.xdsCache.ClusterContents(), // clusters - resource.RouteType: p.xdsCache.RouteContents(), // routes - resource.ListenerType: p.xdsCache.ListenerContents(), // listeners - resource.RuntimeType: {}, // runtimes - resource.SecretType: {}, // secrets - } - // Create the snapshot that we'll serve to Envoy - snapshot, err := cache.NewSnapshot(p.newSnapshotVersion(), a) + snapshot, err := cache.NewSnapshot(p.newVersion(), resources) if err != nil { - p.log.Errorf("snapshot inconsistency err: %+v\n\n\n%+v", snapshot, err) + p.logger.Errorf("snapshot inconsistency: %v, err: %v", snapshot, err) return } if err = snapshot.Consistent(); err != nil { - p.log.Errorf("snapshot inconsistency: %+v\n\n\n%+v", snapshot, err) + p.logger.Errorf("snapshot inconsistency: %v, err: %v", snapshot, err) return } - p.log.Debugf("will serve snapshot %+v", snapshot) - - // Add the snapshot to the cache - if err = p.cache.SetSnapshot(context.TODO(), envoyConfig.NodeID, snapshot); err != nil { - p.log.Errorf("snapshot error %q for %+v", err, snapshot) - os.Exit(1) + p.logger.Debugf("will serve snapshot %+v, nodeID: %s", snapshot, config.Uid) + if err = p.cache.SetSnapshot(context.TODO(), config.Uid, snapshot); err != nil { + p.logger.Errorf("snapshot error %q for %v", err, snapshot) + p.logger.Fatal(err) } } } -// ParseYaml takes in a yaml envoy config and returns a typed version -func ParseYaml(file string) ([]*EnvoyConfig, error) { - var envoyConfigs = make([]*EnvoyConfig, 0) +func ParseYaml(file string) ([]*Virtual, error) { + var virtualList = make([]*Virtual, 0) yamlFile, err := ioutil.ReadFile(file) if err != nil { return nil, fmt.Errorf("Error reading YAML file: %s\n", err) } - err = yaml.Unmarshal(yamlFile, &envoyConfigs) + err = yaml.Unmarshal(yamlFile, &virtualList) if err != nil { return nil, err } - return envoyConfigs, nil + return virtualList, nil } diff --git a/pkg/control_plane/resource.go b/pkg/control_plane/resource.go deleted file mode 100644 index 0a1d98b1..00000000 --- a/pkg/control_plane/resource.go +++ /dev/null @@ -1,194 +0,0 @@ -package control_plane - -import ( - etmv3 "github.com/envoyproxy/go-control-plane/envoy/type/matcher/v3" - "github.com/golang/protobuf/ptypes/wrappers" - "google.golang.org/protobuf/types/known/anypb" - "google.golang.org/protobuf/types/known/durationpb" - "time" - - cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" - core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" - endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" - listener "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" - route "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" - hcm "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3" - "github.com/envoyproxy/go-control-plane/pkg/resource/v3" - "github.com/envoyproxy/go-control-plane/pkg/wellknown" -) - -func MakeCluster(clusterName string) *cluster.Cluster { - return &cluster.Cluster{ - Name: clusterName, - ConnectTimeout: durationpb.New(5 * time.Second), - ClusterDiscoveryType: &cluster.Cluster_Type{Type: cluster.Cluster_EDS}, - LbPolicy: cluster.Cluster_ROUND_ROBIN, - //LoadAssignment: makeEndpoint(clusterName, UpstreamHost), - DnsLookupFamily: cluster.Cluster_V4_ONLY, - EdsClusterConfig: makeEDSCluster(), - } -} - -func makeEDSCluster() *cluster.Cluster_EdsClusterConfig { - return &cluster.Cluster_EdsClusterConfig{ - EdsConfig: makeConfigSource(), - } -} - -func MakeEndpoint(clusterName string, eps []Endpoint) *endpoint.ClusterLoadAssignment { - var endpoints []*endpoint.LbEndpoint - - for _, e := range eps { - endpoints = append(endpoints, &endpoint.LbEndpoint{ - HostIdentifier: &endpoint.LbEndpoint_Endpoint{ - Endpoint: &endpoint.Endpoint{ - Address: &core.Address{ - Address: &core.Address_SocketAddress{ - SocketAddress: &core.SocketAddress{ - Protocol: core.SocketAddress_TCP, - Address: e.UpstreamHost, - PortSpecifier: &core.SocketAddress_PortValue{ - PortValue: e.UpstreamPort, - }, - }, - }, - }, - }, - }, - }) - } - - return &endpoint.ClusterLoadAssignment{ - ClusterName: clusterName, - Endpoints: []*endpoint.LocalityLbEndpoints{{ - LbEndpoints: endpoints, - }}, - } -} - -func MakeRoute(routes []Route) *route.RouteConfiguration { - var rts []*route.Route - - for _, r := range routes { - var rr []*route.HeaderMatcher - for _, header := range r.Headers { - rr = append(rr, &route.HeaderMatcher{ - Name: header.Key, - HeaderMatchSpecifier: &route.HeaderMatcher_StringMatch{ - StringMatch: &etmv3.StringMatcher{ - MatchPattern: &etmv3.StringMatcher_Contains{ - Contains: header.Value, - }, - IgnoreCase: true, - }, - }, - }) - } - rts = append(rts, &route.Route{ - //Name: r.Name, - Match: &route.RouteMatch{ - PathSpecifier: &route.RouteMatch_Prefix{ - Prefix: "/", - }, - Headers: rr, - }, - Action: &route.Route_Route{ - Route: &route.RouteAction{ - ClusterSpecifier: &route.RouteAction_Cluster{ - Cluster: r.Cluster, - }, - }, - }, - }) - } - - //rts = append(rts, &route.Route{ - // //Name: r.Name, - // Match: &route.RouteMatch{ - // PathSpecifier: &route.RouteMatch_Prefix{ - // Prefix: "/", - // }, - // }, - // Action: &route.Route_Route{ - // Route: &route.RouteAction{ - // ClusterSpecifier: &route.RouteAction_Cluster{ - // Cluster: "default_cluster", - // }, - // }, - // }, - //}) - - return &route.RouteConfiguration{ - Name: "listener_0", - VirtualHosts: []*route.VirtualHost{{ - Name: "local_service", - Domains: []string{"*"}, - Routes: rts, - }}, - } -} - -func MakeHTTPListener(listenerName, route, address string, port uint32) *listener.Listener { - // HTTP filter configuration - manager := &hcm.HttpConnectionManager{ - CodecType: hcm.HttpConnectionManager_AUTO, - StatPrefix: "http", - RouteSpecifier: &hcm.HttpConnectionManager_Rds{ - Rds: &hcm.Rds{ - ConfigSource: makeConfigSource(), - RouteConfigName: "listener_0", - }, - }, - HttpFilters: []*hcm.HttpFilter{{ - Name: wellknown.Router, - }}, - } - pbst, err := anypb.New(manager) - if err != nil { - panic(err) - } - - return &listener.Listener{ - Name: listenerName, - BindToPort: &wrappers.BoolValue{ - Value: false, - }, - Address: &core.Address{ - Address: &core.Address_SocketAddress{ - SocketAddress: &core.SocketAddress{ - Protocol: core.SocketAddress_TCP, - Address: address, - PortSpecifier: &core.SocketAddress_PortValue{ - PortValue: port, - }, - }, - }, - }, - FilterChains: []*listener.FilterChain{{ - Filters: []*listener.Filter{{ - Name: wellknown.HTTPConnectionManager, - ConfigType: &listener.Filter_TypedConfig{ - TypedConfig: pbst, - }, - }}, - }}, - } -} - -func makeConfigSource() *core.ConfigSource { - source := &core.ConfigSource{} - source.ResourceApiVersion = resource.DefaultAPIVersion - source.ConfigSourceSpecifier = &core.ConfigSource_ApiConfigSource{ - ApiConfigSource: &core.ApiConfigSource{ - TransportApiVersion: resource.DefaultAPIVersion, - ApiType: core.ApiConfigSource_GRPC, - SetNodeOnFirstMessageOnly: true, - GrpcServices: []*core.GrpcService{{ - TargetSpecifier: &core.GrpcService_EnvoyGrpc_{ - EnvoyGrpc: &core.GrpcService_EnvoyGrpc{ClusterName: "xds_cluster"}, - }, - }}, - }, - } - return source -} diff --git a/pkg/control_plane/server.go b/pkg/control_plane/server.go index 93fec14b..a5d805da 100644 --- a/pkg/control_plane/server.go +++ b/pkg/control_plane/server.go @@ -20,7 +20,6 @@ const ( grpcMaxConcurrentStreams = 1000000 ) -// RunServer starts an xDS server at the given port. func RunServer(ctx context.Context, server serverv3.Server, port uint) { grpcServer := grpc.NewServer(grpc.MaxConcurrentStreams(grpcMaxConcurrentStreams)) @@ -40,6 +39,6 @@ func RunServer(ctx context.Context, server serverv3.Server, port uint) { log.Printf("management server listening on %d\n", port) if err = grpcServer.Serve(listener); err != nil { - log.Println(err) + log.Fatal(err) } } diff --git a/pkg/envoy.go b/pkg/envoy.go index a191abd2..a213f672 100644 --- a/pkg/envoy.go +++ b/pkg/envoy.go @@ -18,7 +18,6 @@ import ( v12 "k8s.io/client-go/kubernetes/typed/core/v1" cmdutil "k8s.io/kubectl/pkg/cmd/util" "sigs.k8s.io/yaml" - "strconv" "strings" "time" ) @@ -26,7 +25,6 @@ import ( // https://istio.io/latest/docs/ops/deployment/requirements/#ports-used-by-istio // patch a sidecar, using iptables to do port-forward let this pod decide should go to 233.254.254.100 or request to 127.0.0.1 -// TODO support multiple port func PatchSidecar(factory cmdutil.Factory, clientset v12.ConfigMapInterface, namespace, workloads string, c util.PodRouteConfig, headers map[string]string) error { //t := true //zero := int64(0) @@ -47,7 +45,7 @@ func PatchSidecar(factory cmdutil.Factory, clientset v12.ConfigMapInterface, nam for _, container := range templateSpec.Spec.Containers { port = append(port, container.Ports...) } - nodeID := fmt.Sprintf("%s-%s-%s", object.Mapping.Resource.Resource, object.Mapping.Resource.Group, object.Name) + nodeID := fmt.Sprintf("%s.%s", object.Mapping.Resource.GroupResource().String(), object.Name) err = addEnvoyConfig(clientset, nodeID, c.LocalTunIP, headers, port) if err != nil { @@ -137,12 +135,12 @@ func UnPatchContainer(factory cmdutil.Factory, mapInterface v12.ConfigMapInterfa return err } -func addEnvoyConfig(mapInterface v12.ConfigMapInterface, nodeID string, localTUNIP string, headers map[string]string, containerPorts []v1.ContainerPort) error { +func addEnvoyConfig(mapInterface v12.ConfigMapInterface, nodeID string, localTUNIP string, headers map[string]string, port []v1.ContainerPort) error { configMap, err := mapInterface.Get(context.TODO(), config2.PodTrafficManager, metav1.GetOptions{}) if err != nil { return err } - var v = make([]*control_plane.EnvoyConfig, 0) + var v = make([]*control_plane.Virtual, 0) if str, ok := configMap.Data[config2.Envoy]; ok { if err = yaml.Unmarshal([]byte(str), &v); err != nil { return err @@ -150,78 +148,24 @@ func addEnvoyConfig(mapInterface v12.ConfigMapInterface, nodeID string, localTUN } var index = -1 for i, virtual := range v { - if nodeID == virtual.NodeID { + if nodeID == virtual.Uid { index = i } } - var h []control_plane.HeaderMatch - for k, v := range headers { - h = append(h, control_plane.HeaderMatch{Key: k, Value: v}) - } - var l []control_plane.ListenerTemp - var c []control_plane.ClusterTemp - // if we can't find nodeID, just add it if index < 0 { - for _, port := range containerPorts { - clusterName := localTUNIP + "_" + strconv.Itoa(int(port.ContainerPort)) - c = append(c, control_plane.ClusterTemp{ - Name: clusterName, - Endpoints: []control_plane.EndpointTemp{{Address: localTUNIP, Port: uint32(port.ContainerPort)}}, - }) - l = append(l, control_plane.ListenerTemp{ - Name: strconv.Itoa(int(port.ContainerPort)), - Address: "0.0.0.0", - Port: uint32(port.ContainerPort), - Routes: []control_plane.RouteTemp{ - { - Headers: h, - ClusterName: clusterName, - }, - { - Headers: nil, - ClusterName: "origin_cluster", - }, - }, - }) - } - v = append(v, &control_plane.EnvoyConfig{ - NodeID: nodeID, - Spec: control_plane.Spec{ - Listeners: l, - Clusters: c, - }, + v = append(v, &control_plane.Virtual{ + Uid: nodeID, + Ports: port, + Rules: []*control_plane.Rule{{ + Headers: headers, + LocalTunIP: localTUNIP, + }}, }) } else { - // if listener already exist, needs to add route - // if not exist, needs to create this listener, and then add route - // make sure position of default route is last - - for _, port := range containerPorts { - clusterName := localTUNIP + "_" + strconv.Itoa(int(port.ContainerPort)) - for _, listener := range v[index].Spec.Listeners { - if listener.Port == uint32(port.ContainerPort) { - listener.Routes = append( - []control_plane.RouteTemp{{Headers: h, ClusterName: clusterName}}, listener.Routes..., - ) - } - } - var found = false - for _, cluster := range v[index].Spec.Clusters { - if cluster.Name == clusterName { - found = true - break - } - } - if !found { - v[index].Spec.Clusters = append(v[index].Spec.Clusters, control_plane.ClusterTemp{ - Name: clusterName, - Endpoints: []control_plane.EndpointTemp{{ - Address: localTUNIP, - Port: uint32(port.ContainerPort), - }}, - }) - } - } + v[index].Rules = append(v[index].Rules, &control_plane.Rule{ + Headers: headers, + LocalTunIP: localTUNIP, + }) } marshal, err := yaml.Marshal(v) @@ -256,6 +200,13 @@ func removeEnvoyConfig(mapInterface v12.ConfigMapInterface, nodeID string, heade } } } + // remove default + for i := 0; i < len(v); i++ { + if nodeID == v[i].Uid && len(v[i].Rules) == 0 { + v = append(v[:i], v[i+1:]...) + i-- + } + } marshal, err := yaml.Marshal(v) if err != nil { return err diff --git a/pkg/mesh/controller.go b/pkg/mesh/controller.go index 2ae86bc9..8caa2ef5 100644 --- a/pkg/mesh/controller.go +++ b/pkg/mesh/controller.go @@ -54,12 +54,8 @@ func AddMeshContainer(spec *v1.PodTemplateSpec, nodeId string, c util.PodRouteCo "iptables -F;" + "iptables -P INPUT ACCEPT;" + "iptables -P FORWARD ACCEPT;" + - //"iptables -t nat -A PREROUTING ! -p icmp ! -s 127.0.0.1 ! -d " + config.CIDR.String() + " -j DNAT --to 127.0.0.1:15006;" + - //"iptables -t nat -A POSTROUTING ! -p icmp ! -s 127.0.0.1 ! -d " + config.CIDR.String() + " -j MASQUERADE;" + - "iptables -t nat -A PREROUTING -i eth0 -p tcp --dport 80:60000 ! -s 127.0.0.1 ! -d " + config.CIDR.String() + " -j DNAT --to 127.0.0.1:15006;" + - "iptables -t nat -A POSTROUTING -p tcp -m tcp --dport 80:60000 ! -s 127.0.0.1 ! -d " + config.CIDR.String() + " -j MASQUERADE;" + - "iptables -t nat -A PREROUTING -i eth0 -p udp --dport 80:60000 ! -s 127.0.0.1 ! -d " + config.CIDR.String() + " -j DNAT --to 127.0.0.1:15006;" + - "iptables -t nat -A POSTROUTING -p udp -m udp --dport 80:60000 ! -s 127.0.0.1 ! -d " + config.CIDR.String() + " -j MASQUERADE;" + + "iptables -t nat -A PREROUTING ! -p icmp ! -s 127.0.0.1 ! -d " + config.CIDR.String() + " -j DNAT --to 127.0.0.1:15006;" + + "iptables -t nat -A POSTROUTING ! -p icmp ! -s 127.0.0.1 ! -d " + config.CIDR.String() + " -j MASQUERADE;" + "kubevpn serve -L 'tun://0.0.0.0:8421/" + c.TrafficManagerRealIP + ":8422?net=" + c.InboundPodTunIP + "&route=" + c.Route + "' --debug=true", }, SecurityContext: &v1.SecurityContext{ @@ -89,7 +85,7 @@ func AddMeshContainer(spec *v1.PodTemplateSpec, nodeId string, c util.PodRouteCo Image: config.ImageMesh, Command: []string{"envoy", "-l", "debug", "--config-yaml"}, Args: []string{ - fmt.Sprintf(s, nodeId, nodeId /*c.TrafficManagerRealIP*/, "127.0.0.1"), + fmt.Sprintf(s, nodeId, nodeId, c.TrafficManagerRealIP), }, Resources: v1.ResourceRequirements{ Requests: map[v1.ResourceName]resource.Quantity{ @@ -103,36 +99,4 @@ func AddMeshContainer(spec *v1.PodTemplateSpec, nodeId string, c util.PodRouteCo }, ImagePullPolicy: v1.PullAlways, }) - spec.Spec.Containers = append(spec.Spec.Containers, v1.Container{ - Name: config.SidecarControlPlane, - Image: config.ImageControlPlane, - Command: []string{"envoy-xds-server"}, - Args: []string{"--watchDirectoryFileName", "/etc/envoy/envoy-config.yaml"}, - VolumeMounts: []v1.VolumeMount{ - { - Name: config.VolumeEnvoyConfig, - ReadOnly: true, - MountPath: "/etc/envoy", - }, - }, - ImagePullPolicy: v1.PullAlways, - }) - f := false - spec.Spec.Volumes = append(spec.Spec.Volumes, v1.Volume{ - Name: config.VolumeEnvoyConfig, - VolumeSource: v1.VolumeSource{ - ConfigMap: &v1.ConfigMapVolumeSource{ - LocalObjectReference: v1.LocalObjectReference{ - Name: config.PodTrafficManager, - }, - Items: []v1.KeyToPath{ - { - Key: config.Envoy, - Path: "envoy-config.yaml", - }, - }, - Optional: &f, - }, - }, - }) } diff --git a/pkg/remote.go b/pkg/remote.go index 8f4cfe5e..34ba41f3 100644 --- a/pkg/remote.go +++ b/pkg/remote.go @@ -79,7 +79,7 @@ func CreateOutboundPod(clientset *kubernetes.Clientset, namespace string, traffi Items: []v1.KeyToPath{ { Key: config.Envoy, - Path: "envoy.yaml", + Path: "envoy-config.yaml", }, }, Optional: &f, @@ -114,20 +114,20 @@ func CreateOutboundPod(clientset *kubernetes.Clientset, namespace string, traffi }, ImagePullPolicy: v1.PullAlways, }, - //{ - // Name: config.SidecarControlPlane, - // Image: config.ImageControlPlane, - // Command: []string{"envoy-xds-server"}, - // Args: []string{"--watchDirectoryFileName", "/etc/envoy/envoy.yaml"}, - // VolumeMounts: []v1.VolumeMount{ - // { - // Name: config.VolumeEnvoyConfig, - // ReadOnly: true, - // MountPath: "/etc/envoy", - // }, - // }, - // ImagePullPolicy: v1.PullAlways, - //}, + { + Name: config.SidecarControlPlane, + Image: config.ImageControlPlane, + Command: []string{"envoy-xds-server"}, + Args: []string{"--watchDirectoryFileName", "/etc/envoy/envoy-config.yaml"}, + VolumeMounts: []v1.VolumeMount{ + { + Name: config.VolumeEnvoyConfig, + ReadOnly: true, + MountPath: "/etc/envoy", + }, + }, + ImagePullPolicy: v1.PullAlways, + }, }, RestartPolicy: v1.RestartPolicyAlways, PriorityClassName: "system-cluster-critical",