support multiple port with envoy

This commit is contained in:
p_caiwfeng
2022-03-07 20:25:46 +08:00
committed by wencaiwulue
parent a48a256bdc
commit b6e0b7ce52
13 changed files with 358 additions and 899 deletions

View File

@@ -80,12 +80,17 @@ image: kubevpn-linux-amd64
mv kubevpn-linux-amd64 kubevpn
docker build -t naison/kubevpn:${VERSION} -f ./dockerfile/server/Dockerfile .
rm -fr kubevpn
docker tag naison/kubevpn:${VERSION} naison/kubevpn:latest
docker push naison/kubevpn:${VERSION}
docker push naison/kubevpn:latest
.PHONY: image-mesh
image-mesh:
docker build -t naison/kubevpn-mesh:${VERSION} -f ./dockerfile/mesh/Dockerfile .
docker tag naison/kubevpn-mesh:${VERSION} naison/kubevpn-mesh:latest
docker push naison/kubevpn-mesh:${VERSION}
docker push naison/kubevpn-mesh:latest
.PHONY: image-control-plane
image-control-plane:
@@ -93,5 +98,7 @@ image-control-plane:
chmod +x envoy-xds-server
docker build -t naison/envoy-xds-server:${VERSION} -f ./dockerfile/control_plane/Dockerfile .
rm -fr envoy-xds-server
docker tag naison/envoy-xds-server:${VERSION} naison/envoy-xds-server:latest
docker push naison/envoy-xds-server:${VERSION}
docker push naison/envoy-xds-server:latest

View File

@@ -11,7 +11,7 @@ import (
)
var (
logger log.FieldLogger
logger *log.Logger
watchDirectoryFileName string
port uint = 9002
)
@@ -25,25 +25,23 @@ func init() {
}
func main() {
// Create a cache
snapshotCache := cache.NewSnapshotCache(false, cache.IDHash{}, logger)
// Create a processor
proc := control_plane.NewProcessor(snapshotCache, log.WithField("context", "processor"))
proc := control_plane.NewProcessor(snapshotCache, logger)
go func() {
// Run the xDS server
ctx := context.Background()
srv := serverv3.NewServer(ctx, snapshotCache, nil)
control_plane.RunServer(ctx, srv, port)
server := serverv3.NewServer(ctx, snapshotCache, nil)
control_plane.RunServer(ctx, server, port)
}()
// Notify channel for file system events
notifyCh := make(chan control_plane.NotifyMessage)
notifyCh := make(chan control_plane.NotifyMessage, 100)
go func() {
// Watch for file changes
control_plane.Watch(watchDirectoryFileName, notifyCh)
}()
notifyCh <- control_plane.NotifyMessage{
Operation: control_plane.Create,
FilePath: watchDirectoryFileName,
}
go control_plane.Watch(watchDirectoryFileName, notifyCh)
for {
select {
@@ -52,58 +50,4 @@ func main() {
proc.ProcessFile(msg)
}
}
//config, err := rest.InClusterConfig()
//if err != nil {
// panic(err)
//}
//clientset, err := kubernetes.NewForConfig(config)
//if err != nil {
// panic(err)
//}
//namespace, _, err := util.Namespace()
//if err != nil {
// panic(err)
//}
//informerFactory := informers.NewSharedInformerFactoryWithOptions(
// clientset,
// time.Second*5,
// informers.WithNamespace(namespace),
// informers.WithTweakListOptions(func(options *metav1.ListOptions) {
// options.FieldSelector = fields.OneTermEqualSelector(
// "metadata.name", config2.PodTrafficManager,
// ).String()
// }),
//)
//informer, err := informerFactory.ForResource(v1.SchemeGroupVersion.WithResource("configmaps"))
//if err != nil {
// panic(err)
//}
//informer.Informer().AddEventHandler(
// k8scache.FilteringResourceEventHandler{
// FilterFunc: func(obj interface{}) bool {
// if cm, ok := obj.(*v1.ConfigMap); ok {
// if _, found := cm.Data[config2.Envoy]; found {
// return true
// }
// }
// return false
// },
// Handler: k8scache.ResourceEventHandlerFuncs{
// AddFunc: func(obj interface{}) {
// proc.ProcessConfig(obj.(*v1.ConfigMap).Data[config2.Envoy])
// },
// UpdateFunc: func(_, newObj interface{}) {
// proc.ProcessConfig(newObj.(*v1.ConfigMap).Data[config2.Envoy])
// },
// DeleteFunc: func(obj interface{}) {
// proc.ProcessConfig(obj.(*v1.ConfigMap).Data[config2.Envoy])
// },
// },
// },
//)
//informerFactory.Start(context.TODO().Done())
//informerFactory.WaitForCacheSync(context.TODO().Done())
//<-context.TODO().Done()
}

View File

@@ -21,7 +21,7 @@ const (
var (
// Version inject --ldflags -X
Version = ""
Version = "latest"
ImageServer = "naison/kubevpn:" + Version
ImageMesh = "naison/kubevpn-mesh:" + Version

View File

@@ -1,29 +1,273 @@
package control_plane
type Listener struct {
Name string
Address string
Port uint32
RouteNames []string
import (
"fmt"
cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
listener "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
route "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
httpinspector "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/listener/http_inspector/v3"
hcm "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
tcp "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/tcp_proxy/v3"
v32 "github.com/envoyproxy/go-control-plane/envoy/type/matcher/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
"github.com/envoyproxy/go-control-plane/pkg/wellknown"
"github.com/golang/protobuf/ptypes/wrappers"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/durationpb"
v1 "k8s.io/api/core/v1"
"time"
)
type Virtual struct {
Uid string // group.resource.name
Ports []v1.ContainerPort
Rules []*Rule
}
type Route struct {
Name string
Headers []Header
Cluster string
type Rule struct {
Headers map[string]string
LocalTunIP string
}
type Header struct {
Key string
Value string
func (a *Virtual) To() (
listeners []types.Resource,
clusters []types.Resource,
routes []types.Resource,
endpoints []types.Resource,
) {
//clusters = append(clusters, OriginCluster())
for _, port := range a.Ports {
listenerName := fmt.Sprintf("%s_%v_%s", a.Uid, port.ContainerPort, port.Protocol)
routeName := listenerName
listeners = append(listeners, ToListener(listenerName, routeName, port.ContainerPort, port.Protocol))
var rr []*route.Route
for _, rule := range a.Rules {
clusterName := fmt.Sprintf("%s_%v", rule.LocalTunIP, port.ContainerPort)
clusters = append(clusters, ToCluster(clusterName))
endpoints = append(endpoints, ToEndPoint(clusterName, rule.LocalTunIP, port.ContainerPort))
rr = append(rr, ToRoute(clusterName, rule.Headers))
}
rr = append(rr, DefaultRoute())
routes = append(routes, &route.RouteConfiguration{
Name: routeName,
VirtualHosts: []*route.VirtualHost{
{
Name: "local_service",
Domains: []string{"*"},
Routes: rr,
},
},
})
}
return
}
type Cluster struct {
Name string
Endpoints []Endpoint
func ToEndPoint(clusterName string, localTunIP string, port int32) *endpoint.ClusterLoadAssignment {
return &endpoint.ClusterLoadAssignment{
ClusterName: clusterName,
Endpoints: []*endpoint.LocalityLbEndpoints{{
LbEndpoints: []*endpoint.LbEndpoint{{
HostIdentifier: &endpoint.LbEndpoint_Endpoint{
Endpoint: &endpoint.Endpoint{
Address: &core.Address{
Address: &core.Address_SocketAddress{
SocketAddress: &core.SocketAddress{
Address: localTunIP,
PortSpecifier: &core.SocketAddress_PortValue{
PortValue: uint32(port),
},
},
},
},
},
},
}},
}},
}
}
type Endpoint struct {
UpstreamHost string
UpstreamPort uint32
func ToCluster(clusterName string) *cluster.Cluster {
return &cluster.Cluster{
Name: clusterName,
ConnectTimeout: durationpb.New(5 * time.Second),
ClusterDiscoveryType: &cluster.Cluster_Type{Type: cluster.Cluster_EDS},
LbPolicy: cluster.Cluster_ROUND_ROBIN,
DnsLookupFamily: cluster.Cluster_V4_ONLY,
EdsClusterConfig: &cluster.Cluster_EdsClusterConfig{
EdsConfig: &core.ConfigSource{
ResourceApiVersion: resource.DefaultAPIVersion,
ConfigSourceSpecifier: &core.ConfigSource_Ads{
Ads: &core.AggregatedConfigSource{},
},
},
},
}
}
func OriginCluster() *cluster.Cluster {
return &cluster.Cluster{
Name: "origin_cluster",
ConnectTimeout: durationpb.New(time.Second * 5),
LbPolicy: cluster.Cluster_CLUSTER_PROVIDED,
ClusterDiscoveryType: &cluster.Cluster_Type{
Type: cluster.Cluster_ORIGINAL_DST,
},
}
}
func ToRoute(clusterName string, headers map[string]string) *route.Route {
var r []*route.HeaderMatcher
for k, v := range headers {
r = append(r, &route.HeaderMatcher{
Name: k,
HeaderMatchSpecifier: &route.HeaderMatcher_StringMatch{
StringMatch: &v32.StringMatcher{
MatchPattern: &v32.StringMatcher_Exact{
Exact: v,
},
},
},
})
}
return &route.Route{
Match: &route.RouteMatch{
PathSpecifier: &route.RouteMatch_Prefix{
Prefix: "/",
},
Headers: r,
},
Action: &route.Route_Route{
Route: &route.RouteAction{
ClusterSpecifier: &route.RouteAction_Cluster{
Cluster: clusterName,
},
},
},
}
}
func DefaultRoute() *route.Route {
return &route.Route{
Match: &route.RouteMatch{
PathSpecifier: &route.RouteMatch_Prefix{
Prefix: "/",
},
},
Action: &route.Route_Route{
Route: &route.RouteAction{
ClusterSpecifier: &route.RouteAction_Cluster{
Cluster: "origin_cluster",
},
},
},
}
}
func ToListener(listenerName string, routeName string, port int32, p v1.Protocol) *listener.Listener {
var protocol core.SocketAddress_Protocol
switch p {
case v1.ProtocolTCP:
protocol = core.SocketAddress_TCP
case v1.ProtocolUDP:
protocol = core.SocketAddress_UDP
case v1.ProtocolSCTP:
protocol = core.SocketAddress_TCP
}
any := func(m proto.Message) *anypb.Any {
pbst, _ := anypb.New(m)
return pbst
}
httpManager := &hcm.HttpConnectionManager{
CodecType: hcm.HttpConnectionManager_AUTO,
StatPrefix: "http",
HttpFilters: []*hcm.HttpFilter{{
Name: wellknown.Router,
}},
RouteSpecifier: &hcm.HttpConnectionManager_Rds{
Rds: &hcm.Rds{
ConfigSource: &core.ConfigSource{
ResourceApiVersion: resource.DefaultAPIVersion,
ConfigSourceSpecifier: &core.ConfigSource_ApiConfigSource{
ApiConfigSource: &core.ApiConfigSource{
TransportApiVersion: resource.DefaultAPIVersion,
ApiType: core.ApiConfigSource_GRPC,
SetNodeOnFirstMessageOnly: true,
GrpcServices: []*core.GrpcService{{
TargetSpecifier: &core.GrpcService_EnvoyGrpc_{
EnvoyGrpc: &core.GrpcService_EnvoyGrpc{ClusterName: "xds_cluster"},
},
}},
},
},
},
RouteConfigName: routeName,
},
},
}
tcpConfig := &tcp.TcpProxy{
StatPrefix: "tcp",
ClusterSpecifier: &tcp.TcpProxy_Cluster{
Cluster: "origin_cluster",
},
}
return &listener.Listener{
Name: listenerName,
TrafficDirection: core.TrafficDirection_INBOUND,
BindToPort: &wrappers.BoolValue{Value: false},
UseOriginalDst: &wrappers.BoolValue{Value: true},
Address: &core.Address{
Address: &core.Address_SocketAddress{
SocketAddress: &core.SocketAddress{
Protocol: protocol,
Address: "0.0.0.0",
PortSpecifier: &core.SocketAddress_PortValue{
PortValue: uint32(port),
},
},
},
},
FilterChains: []*listener.FilterChain{
{
FilterChainMatch: &listener.FilterChainMatch{
ApplicationProtocols: []string{"http/1.0", "http/1.1", "h2c"},
},
Filters: []*listener.Filter{
{
Name: wellknown.HTTPConnectionManager,
ConfigType: &listener.Filter_TypedConfig{
TypedConfig: any(httpManager),
},
},
},
},
{
Filters: []*listener.Filter{
{
Name: wellknown.TCPProxy,
ConfigType: &listener.Filter_TypedConfig{
TypedConfig: any(tcpConfig),
},
},
},
},
},
ListenerFilters: []*listener.ListenerFilter{
{
Name: wellknown.HttpInspector,
ConfigType: &listener.ListenerFilter_TypedConfig{
TypedConfig: any(&httpinspector.HttpInspector{}),
},
},
},
}
}

View File

@@ -1,280 +0,0 @@
package control_plane
import (
"fmt"
cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
listener "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
route "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
hcm "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
v32 "github.com/envoyproxy/go-control-plane/envoy/type/matcher/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
"github.com/envoyproxy/go-control-plane/pkg/wellknown"
"github.com/golang/protobuf/ptypes/wrappers"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/durationpb"
v1 "k8s.io/api/core/v1"
"time"
)
type Virtual struct {
Uid string // group.resource.name
Ports []v1.ContainerPort
Rules []*Rule
}
type Rule struct {
Headers map[string]string
LocalTunIP string
}
func (a *Virtual) To() (
listeners []types.Resource,
clusters []types.Resource,
routes []types.Resource,
endpoints []types.Resource,
) {
//clusters = append(clusters, OriginCluster())
for _, port := range a.Ports {
listenerName := fmt.Sprintf("%s_%v_%s", a.Uid, port.ContainerPort, port.Protocol)
routeName := listenerName
listeners = append(listeners, ToListener(listenerName, routeName, port.ContainerPort, port.Protocol))
var rr []*route.Route
for _, rule := range a.Rules {
clusterName := fmt.Sprintf("%s_%v", rule.LocalTunIP, port.ContainerPort)
clusters = append(clusters, ToCluster(clusterName))
endpoints = append(endpoints, ToEndPoint(clusterName, rule.LocalTunIP, port.ContainerPort))
rr = append(rr, ToRoute(clusterName, rule.Headers))
}
rr = append(rr, DefaultRoute())
routes = append(routes, &route.RouteConfiguration{
Name: routeName,
VirtualHosts: []*route.VirtualHost{
{
Name: "local_service",
Domains: []string{"*"},
Routes: rr,
},
},
})
}
return
}
func ToEndPoint(clusterName string, localTunIP string, port int32) *endpoint.ClusterLoadAssignment {
return &endpoint.ClusterLoadAssignment{
ClusterName: clusterName,
Endpoints: []*endpoint.LocalityLbEndpoints{{
LbEndpoints: []*endpoint.LbEndpoint{{
HostIdentifier: &endpoint.LbEndpoint_Endpoint{
Endpoint: &endpoint.Endpoint{
Address: &core.Address{
Address: &core.Address_SocketAddress{
SocketAddress: &core.SocketAddress{
Address: localTunIP,
PortSpecifier: &core.SocketAddress_PortValue{
PortValue: uint32(port),
},
},
},
},
},
},
}},
}},
}
}
func ToCluster(clusterName string) *cluster.Cluster {
return &cluster.Cluster{
Name: clusterName,
ConnectTimeout: durationpb.New(5 * time.Second),
ClusterDiscoveryType: &cluster.Cluster_Type{Type: cluster.Cluster_EDS},
LbPolicy: cluster.Cluster_ROUND_ROBIN,
DnsLookupFamily: cluster.Cluster_V4_ONLY,
EdsClusterConfig: &cluster.Cluster_EdsClusterConfig{
EdsConfig: &core.ConfigSource{
ResourceApiVersion: resource.DefaultAPIVersion,
ConfigSourceSpecifier: &core.ConfigSource_ApiConfigSource{
ApiConfigSource: &core.ApiConfigSource{
TransportApiVersion: resource.DefaultAPIVersion,
ApiType: core.ApiConfigSource_GRPC,
SetNodeOnFirstMessageOnly: true,
GrpcServices: []*core.GrpcService{{
TargetSpecifier: &core.GrpcService_EnvoyGrpc_{
EnvoyGrpc: &core.GrpcService_EnvoyGrpc{ClusterName: "xds_cluster"},
},
}},
},
},
},
},
}
}
func OriginCluster() *cluster.Cluster {
return &cluster.Cluster{
Name: "origin_cluster",
ConnectTimeout: durationpb.New(time.Second * 5),
LbPolicy: cluster.Cluster_CLUSTER_PROVIDED,
ClusterDiscoveryType: &cluster.Cluster_Type{
Type: cluster.Cluster_ORIGINAL_DST,
},
}
}
func ToRoute(clusterName string, headers map[string]string) *route.Route {
var r []*route.HeaderMatcher
for k, v := range headers {
r = append(r, &route.HeaderMatcher{
Name: k,
HeaderMatchSpecifier: &route.HeaderMatcher_StringMatch{
StringMatch: &v32.StringMatcher{
MatchPattern: &v32.StringMatcher_Exact{
Exact: v,
},
},
},
})
}
return &route.Route{
Match: &route.RouteMatch{
PathSpecifier: &route.RouteMatch_Prefix{
Prefix: "/",
},
Headers: r,
},
Action: &route.Route_Route{
Route: &route.RouteAction{
ClusterSpecifier: &route.RouteAction_Cluster{
Cluster: clusterName,
},
},
},
}
}
func DefaultRoute() *route.Route {
return &route.Route{
Match: &route.RouteMatch{
PathSpecifier: &route.RouteMatch_Prefix{
Prefix: "/",
},
},
Action: &route.Route_Route{
Route: &route.RouteAction{
ClusterSpecifier: &route.RouteAction_Cluster{
Cluster: "origin_cluster",
},
},
},
}
}
func ToListener(listenerName string, routeName string, port int32, p v1.Protocol) *listener.Listener {
var protocol core.SocketAddress_Protocol
switch p {
case v1.ProtocolTCP:
protocol = core.SocketAddress_TCP
case v1.ProtocolUDP:
protocol = core.SocketAddress_UDP
case v1.ProtocolSCTP:
protocol = core.SocketAddress_TCP
}
any := func(m proto.Message) *anypb.Any {
pbst, _ := anypb.New(m)
return pbst
}
httpManager := &hcm.HttpConnectionManager{
CodecType: hcm.HttpConnectionManager_AUTO,
StatPrefix: "http",
HttpFilters: []*hcm.HttpFilter{{
Name: wellknown.Router,
}},
RouteSpecifier: &hcm.HttpConnectionManager_Rds{
Rds: &hcm.Rds{
ConfigSource: &core.ConfigSource{
ResourceApiVersion: resource.DefaultAPIVersion,
ConfigSourceSpecifier: &core.ConfigSource_ApiConfigSource{
ApiConfigSource: &core.ApiConfigSource{
TransportApiVersion: resource.DefaultAPIVersion,
ApiType: core.ApiConfigSource_GRPC,
SetNodeOnFirstMessageOnly: true,
GrpcServices: []*core.GrpcService{{
TargetSpecifier: &core.GrpcService_EnvoyGrpc_{
EnvoyGrpc: &core.GrpcService_EnvoyGrpc{ClusterName: "xds_cluster"},
},
}},
},
},
},
RouteConfigName: routeName,
},
},
}
//tcpConfig := &tcp.TcpProxy{
// StatPrefix: "tcp",
// ClusterSpecifier: &tcp.TcpProxy_Cluster{
// Cluster: "origin_cluster",
// },
//}
return &listener.Listener{
Name: listenerName,
TrafficDirection: core.TrafficDirection_INBOUND,
BindToPort: &wrappers.BoolValue{Value: false},
//UseOriginalDst: &wrappers.BoolValue{Value: true},
Address: &core.Address{
Address: &core.Address_SocketAddress{
SocketAddress: &core.SocketAddress{
Protocol: protocol,
Address: "0.0.0.0",
PortSpecifier: &core.SocketAddress_PortValue{
PortValue: uint32(port),
},
},
},
},
FilterChains: []*listener.FilterChain{
{
FilterChainMatch: &listener.FilterChainMatch{
ApplicationProtocols: []string{"http/1.0", "http/1.1", "h2c"},
},
Filters: []*listener.Filter{
{
Name: wellknown.HTTPConnectionManager,
ConfigType: &listener.Filter_TypedConfig{
TypedConfig: any(httpManager),
},
},
},
},
//{
// Filters: []*listener.Filter{
// {
// Name: wellknown.TCPProxy,
// ConfigType: &listener.Filter_TypedConfig{
// TypedConfig: any(tcpConfig),
// },
// },
// },
//},
},
//ListenerFilters: []*listener.ListenerFilter{
// {
// Name: wellknown.HttpInspector,
// ConfigType: &listener.ListenerFilter_TypedConfig{
// TypedConfig: any(&httpinspector.HttpInspector{}),
// },
// },
//},
}
}

View File

@@ -1,93 +0,0 @@
package control_plane
import (
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
)
type XDSCache struct {
Listeners map[string]Listener
Routes map[string]Route
Clusters map[string]Cluster
Endpoints map[string]Endpoint
}
func (xds *XDSCache) ClusterContents() []types.Resource {
var r []types.Resource
for _, c := range xds.Clusters {
r = append(r, MakeCluster(c.Name))
}
return r
}
func (xds *XDSCache) RouteContents() []types.Resource {
var routesArray []Route
for _, r := range xds.Routes {
routesArray = append(routesArray, r)
}
return []types.Resource{MakeRoute(routesArray)}
}
func (xds *XDSCache) ListenerContents() []types.Resource {
var r []types.Resource
for _, l := range xds.Listeners {
r = append(r, MakeHTTPListener(l.Name, l.RouteNames[0], l.Address, l.Port))
}
return r
}
func (xds *XDSCache) EndpointsContents() []types.Resource {
var r []types.Resource
for _, c := range xds.Clusters {
r = append(r, MakeEndpoint(c.Name, c.Endpoints))
}
return r
}
func (xds *XDSCache) AddListener(name string, routeNames []string, address string, port uint32) {
xds.Listeners[name] = Listener{
Name: name,
Address: address,
Port: port,
RouteNames: routeNames,
}
}
func (xds *XDSCache) AddRoute(name string, headers []HeaderMatch, clusterName string) {
var h []Header
for _, header := range headers {
h = append(h, Header{
Key: header.Key,
Value: header.Value,
})
}
xds.Routes[name] = Route{
Name: name,
Headers: h,
Cluster: clusterName,
}
}
func (xds *XDSCache) AddCluster(name string) {
xds.Clusters[name] = Cluster{
Name: name,
}
}
func (xds *XDSCache) AddEndpoint(clusterName, upstreamHost string, upstreamPort uint32) {
cluster := xds.Clusters[clusterName]
cluster.Endpoints = append(cluster.Endpoints, Endpoint{
UpstreamHost: upstreamHost,
UpstreamPort: upstreamPort,
})
xds.Clusters[clusterName] = cluster
}

View File

@@ -1,39 +0,0 @@
package control_plane
type EnvoyConfig struct {
NodeID string `yaml:"nodeID"`
Spec Spec `yaml:"spec"`
}
type Spec struct {
Listeners []ListenerTemp `yaml:"listeners"`
Clusters []ClusterTemp `yaml:"clusters"`
}
type ListenerTemp struct {
Name string `yaml:"name"`
Address string `yaml:"address"`
Port uint32 `yaml:"port"`
Routes []RouteTemp `yaml:"routes"`
}
type RouteTemp struct {
Name string `yaml:"name"`
Headers []HeaderMatch `yaml:"headers"`
ClusterName string `yaml:"clusters"`
}
type HeaderMatch struct {
Key string `yaml:"key"`
Value string `yaml:"value"`
}
type ClusterTemp struct {
Name string `yaml:"name"`
Endpoints []EndpointTemp `yaml:"endpoints"`
}
type EndpointTemp struct {
Address string `yaml:"address"`
Port uint32 `yaml:"port"`
}

View File

@@ -11,125 +11,81 @@ import (
"k8s.io/apimachinery/pkg/util/yaml"
"math"
"math/rand"
"os"
"strconv"
)
type Processor struct {
cache cache.SnapshotCache
// snapshotVersion holds the current version of the snapshot.
snapshotVersion int64
log logrus.FieldLogger
xdsCache XDSCache
cache cache.SnapshotCache
logger *logrus.Logger
version int64
}
func NewProcessor(cache cache.SnapshotCache, log logrus.FieldLogger) *Processor {
func NewProcessor(cache cache.SnapshotCache, log *logrus.Logger) *Processor {
return &Processor{
cache: cache,
snapshotVersion: rand.Int63n(1000),
log: log,
xdsCache: XDSCache{
Listeners: make(map[string]Listener),
Clusters: make(map[string]Cluster),
Routes: make(map[string]Route),
Endpoints: make(map[string]Endpoint),
},
cache: cache,
logger: log,
version: rand.Int63n(1000),
}
}
// newSnapshotVersion increments the current snapshotVersion
// and returns as a string.
func (p *Processor) newSnapshotVersion() string {
// Reset the snapshotVersion if it ever hits max size.
if p.snapshotVersion == math.MaxInt64 {
p.snapshotVersion = 0
func (p *Processor) newVersion() string {
if p.version == math.MaxInt64 {
p.version = 0
}
// Increment the snapshot version & return as string.
p.snapshotVersion++
return strconv.FormatInt(p.snapshotVersion, 10)
p.version++
return strconv.FormatInt(p.version, 10)
}
// ProcessFile takes a file and generates an xDS snapshot
func (p *Processor) ProcessFile(file NotifyMessage) {
// Parse file into object
envoyConfigList, err := ParseYaml(file.FilePath)
configList, err := ParseYaml(file.FilePath)
if err != nil {
p.log.Errorf("error parsing yaml file: %+v", err)
p.logger.Errorf("error parsing yaml file: %+v", err)
return
}
for _, envoyConfig := range envoyConfigList {
// Parse Listeners
for _, l := range envoyConfig.Spec.Listeners {
var lRoutes []string
for _, lr := range l.Routes {
lRoutes = append(lRoutes, lr.Name)
}
p.xdsCache.AddListener(l.Name, lRoutes, l.Address, l.Port)
for _, r := range l.Routes {
p.xdsCache.AddRoute(r.Name, r.Headers, r.ClusterName)
}
for _, config := range configList {
if len(config.Uid) == 0 {
continue
}
// Parse Clusters
for _, c := range envoyConfig.Spec.Clusters {
p.xdsCache.AddCluster(c.Name)
// Parse endpoints
for _, e := range c.Endpoints {
p.xdsCache.AddEndpoint(c.Name, e.Address, e.Port)
}
listeners, clusters, routes, endpoints := config.To()
resources := map[resource.Type][]types.Resource{
resource.ListenerType: listeners, // listeners
resource.RouteType: routes, // routes
resource.ClusterType: clusters, // clusters
resource.EndpointType: endpoints, // endpoints
resource.RuntimeType: {}, // runtimes
resource.SecretType: {}, // secrets
}
a := map[resource.Type][]types.Resource{
resource.EndpointType: p.xdsCache.EndpointsContents(), // endpoints
resource.ClusterType: p.xdsCache.ClusterContents(), // clusters
resource.RouteType: p.xdsCache.RouteContents(), // routes
resource.ListenerType: p.xdsCache.ListenerContents(), // listeners
resource.RuntimeType: {}, // runtimes
resource.SecretType: {}, // secrets
}
// Create the snapshot that we'll serve to Envoy
snapshot, err := cache.NewSnapshot(p.newSnapshotVersion(), a)
snapshot, err := cache.NewSnapshot(p.newVersion(), resources)
if err != nil {
p.log.Errorf("snapshot inconsistency err: %+v\n\n\n%+v", snapshot, err)
p.logger.Errorf("snapshot inconsistency: %v, err: %v", snapshot, err)
return
}
if err = snapshot.Consistent(); err != nil {
p.log.Errorf("snapshot inconsistency: %+v\n\n\n%+v", snapshot, err)
p.logger.Errorf("snapshot inconsistency: %v, err: %v", snapshot, err)
return
}
p.log.Debugf("will serve snapshot %+v", snapshot)
// Add the snapshot to the cache
if err = p.cache.SetSnapshot(context.TODO(), envoyConfig.NodeID, snapshot); err != nil {
p.log.Errorf("snapshot error %q for %+v", err, snapshot)
os.Exit(1)
p.logger.Debugf("will serve snapshot %+v, nodeID: %s", snapshot, config.Uid)
if err = p.cache.SetSnapshot(context.TODO(), config.Uid, snapshot); err != nil {
p.logger.Errorf("snapshot error %q for %v", err, snapshot)
p.logger.Fatal(err)
}
}
}
// ParseYaml takes in a yaml envoy config and returns a typed version
func ParseYaml(file string) ([]*EnvoyConfig, error) {
var envoyConfigs = make([]*EnvoyConfig, 0)
func ParseYaml(file string) ([]*Virtual, error) {
var virtualList = make([]*Virtual, 0)
yamlFile, err := ioutil.ReadFile(file)
if err != nil {
return nil, fmt.Errorf("Error reading YAML file: %s\n", err)
}
err = yaml.Unmarshal(yamlFile, &envoyConfigs)
err = yaml.Unmarshal(yamlFile, &virtualList)
if err != nil {
return nil, err
}
return envoyConfigs, nil
return virtualList, nil
}

View File

@@ -1,194 +0,0 @@
package control_plane
import (
etmv3 "github.com/envoyproxy/go-control-plane/envoy/type/matcher/v3"
"github.com/golang/protobuf/ptypes/wrappers"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/durationpb"
"time"
cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
listener "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
route "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
hcm "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
"github.com/envoyproxy/go-control-plane/pkg/wellknown"
)
func MakeCluster(clusterName string) *cluster.Cluster {
return &cluster.Cluster{
Name: clusterName,
ConnectTimeout: durationpb.New(5 * time.Second),
ClusterDiscoveryType: &cluster.Cluster_Type{Type: cluster.Cluster_EDS},
LbPolicy: cluster.Cluster_ROUND_ROBIN,
//LoadAssignment: makeEndpoint(clusterName, UpstreamHost),
DnsLookupFamily: cluster.Cluster_V4_ONLY,
EdsClusterConfig: makeEDSCluster(),
}
}
func makeEDSCluster() *cluster.Cluster_EdsClusterConfig {
return &cluster.Cluster_EdsClusterConfig{
EdsConfig: makeConfigSource(),
}
}
func MakeEndpoint(clusterName string, eps []Endpoint) *endpoint.ClusterLoadAssignment {
var endpoints []*endpoint.LbEndpoint
for _, e := range eps {
endpoints = append(endpoints, &endpoint.LbEndpoint{
HostIdentifier: &endpoint.LbEndpoint_Endpoint{
Endpoint: &endpoint.Endpoint{
Address: &core.Address{
Address: &core.Address_SocketAddress{
SocketAddress: &core.SocketAddress{
Protocol: core.SocketAddress_TCP,
Address: e.UpstreamHost,
PortSpecifier: &core.SocketAddress_PortValue{
PortValue: e.UpstreamPort,
},
},
},
},
},
},
})
}
return &endpoint.ClusterLoadAssignment{
ClusterName: clusterName,
Endpoints: []*endpoint.LocalityLbEndpoints{{
LbEndpoints: endpoints,
}},
}
}
func MakeRoute(routes []Route) *route.RouteConfiguration {
var rts []*route.Route
for _, r := range routes {
var rr []*route.HeaderMatcher
for _, header := range r.Headers {
rr = append(rr, &route.HeaderMatcher{
Name: header.Key,
HeaderMatchSpecifier: &route.HeaderMatcher_StringMatch{
StringMatch: &etmv3.StringMatcher{
MatchPattern: &etmv3.StringMatcher_Contains{
Contains: header.Value,
},
IgnoreCase: true,
},
},
})
}
rts = append(rts, &route.Route{
//Name: r.Name,
Match: &route.RouteMatch{
PathSpecifier: &route.RouteMatch_Prefix{
Prefix: "/",
},
Headers: rr,
},
Action: &route.Route_Route{
Route: &route.RouteAction{
ClusterSpecifier: &route.RouteAction_Cluster{
Cluster: r.Cluster,
},
},
},
})
}
//rts = append(rts, &route.Route{
// //Name: r.Name,
// Match: &route.RouteMatch{
// PathSpecifier: &route.RouteMatch_Prefix{
// Prefix: "/",
// },
// },
// Action: &route.Route_Route{
// Route: &route.RouteAction{
// ClusterSpecifier: &route.RouteAction_Cluster{
// Cluster: "default_cluster",
// },
// },
// },
//})
return &route.RouteConfiguration{
Name: "listener_0",
VirtualHosts: []*route.VirtualHost{{
Name: "local_service",
Domains: []string{"*"},
Routes: rts,
}},
}
}
func MakeHTTPListener(listenerName, route, address string, port uint32) *listener.Listener {
// HTTP filter configuration
manager := &hcm.HttpConnectionManager{
CodecType: hcm.HttpConnectionManager_AUTO,
StatPrefix: "http",
RouteSpecifier: &hcm.HttpConnectionManager_Rds{
Rds: &hcm.Rds{
ConfigSource: makeConfigSource(),
RouteConfigName: "listener_0",
},
},
HttpFilters: []*hcm.HttpFilter{{
Name: wellknown.Router,
}},
}
pbst, err := anypb.New(manager)
if err != nil {
panic(err)
}
return &listener.Listener{
Name: listenerName,
BindToPort: &wrappers.BoolValue{
Value: false,
},
Address: &core.Address{
Address: &core.Address_SocketAddress{
SocketAddress: &core.SocketAddress{
Protocol: core.SocketAddress_TCP,
Address: address,
PortSpecifier: &core.SocketAddress_PortValue{
PortValue: port,
},
},
},
},
FilterChains: []*listener.FilterChain{{
Filters: []*listener.Filter{{
Name: wellknown.HTTPConnectionManager,
ConfigType: &listener.Filter_TypedConfig{
TypedConfig: pbst,
},
}},
}},
}
}
func makeConfigSource() *core.ConfigSource {
source := &core.ConfigSource{}
source.ResourceApiVersion = resource.DefaultAPIVersion
source.ConfigSourceSpecifier = &core.ConfigSource_ApiConfigSource{
ApiConfigSource: &core.ApiConfigSource{
TransportApiVersion: resource.DefaultAPIVersion,
ApiType: core.ApiConfigSource_GRPC,
SetNodeOnFirstMessageOnly: true,
GrpcServices: []*core.GrpcService{{
TargetSpecifier: &core.GrpcService_EnvoyGrpc_{
EnvoyGrpc: &core.GrpcService_EnvoyGrpc{ClusterName: "xds_cluster"},
},
}},
},
}
return source
}

View File

@@ -20,7 +20,6 @@ const (
grpcMaxConcurrentStreams = 1000000
)
// RunServer starts an xDS server at the given port.
func RunServer(ctx context.Context, server serverv3.Server, port uint) {
grpcServer := grpc.NewServer(grpc.MaxConcurrentStreams(grpcMaxConcurrentStreams))
@@ -40,6 +39,6 @@ func RunServer(ctx context.Context, server serverv3.Server, port uint) {
log.Printf("management server listening on %d\n", port)
if err = grpcServer.Serve(listener); err != nil {
log.Println(err)
log.Fatal(err)
}
}

View File

@@ -18,7 +18,6 @@ import (
v12 "k8s.io/client-go/kubernetes/typed/core/v1"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
"sigs.k8s.io/yaml"
"strconv"
"strings"
"time"
)
@@ -26,7 +25,6 @@ import (
// https://istio.io/latest/docs/ops/deployment/requirements/#ports-used-by-istio
// patch a sidecar, using iptables to do port-forward let this pod decide should go to 233.254.254.100 or request to 127.0.0.1
// TODO support multiple port
func PatchSidecar(factory cmdutil.Factory, clientset v12.ConfigMapInterface, namespace, workloads string, c util.PodRouteConfig, headers map[string]string) error {
//t := true
//zero := int64(0)
@@ -47,7 +45,7 @@ func PatchSidecar(factory cmdutil.Factory, clientset v12.ConfigMapInterface, nam
for _, container := range templateSpec.Spec.Containers {
port = append(port, container.Ports...)
}
nodeID := fmt.Sprintf("%s-%s-%s", object.Mapping.Resource.Resource, object.Mapping.Resource.Group, object.Name)
nodeID := fmt.Sprintf("%s.%s", object.Mapping.Resource.GroupResource().String(), object.Name)
err = addEnvoyConfig(clientset, nodeID, c.LocalTunIP, headers, port)
if err != nil {
@@ -137,12 +135,12 @@ func UnPatchContainer(factory cmdutil.Factory, mapInterface v12.ConfigMapInterfa
return err
}
func addEnvoyConfig(mapInterface v12.ConfigMapInterface, nodeID string, localTUNIP string, headers map[string]string, containerPorts []v1.ContainerPort) error {
func addEnvoyConfig(mapInterface v12.ConfigMapInterface, nodeID string, localTUNIP string, headers map[string]string, port []v1.ContainerPort) error {
configMap, err := mapInterface.Get(context.TODO(), config2.PodTrafficManager, metav1.GetOptions{})
if err != nil {
return err
}
var v = make([]*control_plane.EnvoyConfig, 0)
var v = make([]*control_plane.Virtual, 0)
if str, ok := configMap.Data[config2.Envoy]; ok {
if err = yaml.Unmarshal([]byte(str), &v); err != nil {
return err
@@ -150,78 +148,24 @@ func addEnvoyConfig(mapInterface v12.ConfigMapInterface, nodeID string, localTUN
}
var index = -1
for i, virtual := range v {
if nodeID == virtual.NodeID {
if nodeID == virtual.Uid {
index = i
}
}
var h []control_plane.HeaderMatch
for k, v := range headers {
h = append(h, control_plane.HeaderMatch{Key: k, Value: v})
}
var l []control_plane.ListenerTemp
var c []control_plane.ClusterTemp
// if we can't find nodeID, just add it
if index < 0 {
for _, port := range containerPorts {
clusterName := localTUNIP + "_" + strconv.Itoa(int(port.ContainerPort))
c = append(c, control_plane.ClusterTemp{
Name: clusterName,
Endpoints: []control_plane.EndpointTemp{{Address: localTUNIP, Port: uint32(port.ContainerPort)}},
})
l = append(l, control_plane.ListenerTemp{
Name: strconv.Itoa(int(port.ContainerPort)),
Address: "0.0.0.0",
Port: uint32(port.ContainerPort),
Routes: []control_plane.RouteTemp{
{
Headers: h,
ClusterName: clusterName,
},
{
Headers: nil,
ClusterName: "origin_cluster",
},
},
})
}
v = append(v, &control_plane.EnvoyConfig{
NodeID: nodeID,
Spec: control_plane.Spec{
Listeners: l,
Clusters: c,
},
v = append(v, &control_plane.Virtual{
Uid: nodeID,
Ports: port,
Rules: []*control_plane.Rule{{
Headers: headers,
LocalTunIP: localTUNIP,
}},
})
} else {
// if listener already exist, needs to add route
// if not exist, needs to create this listener, and then add route
// make sure position of default route is last
for _, port := range containerPorts {
clusterName := localTUNIP + "_" + strconv.Itoa(int(port.ContainerPort))
for _, listener := range v[index].Spec.Listeners {
if listener.Port == uint32(port.ContainerPort) {
listener.Routes = append(
[]control_plane.RouteTemp{{Headers: h, ClusterName: clusterName}}, listener.Routes...,
)
}
}
var found = false
for _, cluster := range v[index].Spec.Clusters {
if cluster.Name == clusterName {
found = true
break
}
}
if !found {
v[index].Spec.Clusters = append(v[index].Spec.Clusters, control_plane.ClusterTemp{
Name: clusterName,
Endpoints: []control_plane.EndpointTemp{{
Address: localTUNIP,
Port: uint32(port.ContainerPort),
}},
})
}
}
v[index].Rules = append(v[index].Rules, &control_plane.Rule{
Headers: headers,
LocalTunIP: localTUNIP,
})
}
marshal, err := yaml.Marshal(v)
@@ -256,6 +200,13 @@ func removeEnvoyConfig(mapInterface v12.ConfigMapInterface, nodeID string, heade
}
}
}
// remove default
for i := 0; i < len(v); i++ {
if nodeID == v[i].Uid && len(v[i].Rules) == 0 {
v = append(v[:i], v[i+1:]...)
i--
}
}
marshal, err := yaml.Marshal(v)
if err != nil {
return err

View File

@@ -54,12 +54,8 @@ func AddMeshContainer(spec *v1.PodTemplateSpec, nodeId string, c util.PodRouteCo
"iptables -F;" +
"iptables -P INPUT ACCEPT;" +
"iptables -P FORWARD ACCEPT;" +
//"iptables -t nat -A PREROUTING ! -p icmp ! -s 127.0.0.1 ! -d " + config.CIDR.String() + " -j DNAT --to 127.0.0.1:15006;" +
//"iptables -t nat -A POSTROUTING ! -p icmp ! -s 127.0.0.1 ! -d " + config.CIDR.String() + " -j MASQUERADE;" +
"iptables -t nat -A PREROUTING -i eth0 -p tcp --dport 80:60000 ! -s 127.0.0.1 ! -d " + config.CIDR.String() + " -j DNAT --to 127.0.0.1:15006;" +
"iptables -t nat -A POSTROUTING -p tcp -m tcp --dport 80:60000 ! -s 127.0.0.1 ! -d " + config.CIDR.String() + " -j MASQUERADE;" +
"iptables -t nat -A PREROUTING -i eth0 -p udp --dport 80:60000 ! -s 127.0.0.1 ! -d " + config.CIDR.String() + " -j DNAT --to 127.0.0.1:15006;" +
"iptables -t nat -A POSTROUTING -p udp -m udp --dport 80:60000 ! -s 127.0.0.1 ! -d " + config.CIDR.String() + " -j MASQUERADE;" +
"iptables -t nat -A PREROUTING ! -p icmp ! -s 127.0.0.1 ! -d " + config.CIDR.String() + " -j DNAT --to 127.0.0.1:15006;" +
"iptables -t nat -A POSTROUTING ! -p icmp ! -s 127.0.0.1 ! -d " + config.CIDR.String() + " -j MASQUERADE;" +
"kubevpn serve -L 'tun://0.0.0.0:8421/" + c.TrafficManagerRealIP + ":8422?net=" + c.InboundPodTunIP + "&route=" + c.Route + "' --debug=true",
},
SecurityContext: &v1.SecurityContext{
@@ -89,7 +85,7 @@ func AddMeshContainer(spec *v1.PodTemplateSpec, nodeId string, c util.PodRouteCo
Image: config.ImageMesh,
Command: []string{"envoy", "-l", "debug", "--config-yaml"},
Args: []string{
fmt.Sprintf(s, nodeId, nodeId /*c.TrafficManagerRealIP*/, "127.0.0.1"),
fmt.Sprintf(s, nodeId, nodeId, c.TrafficManagerRealIP),
},
Resources: v1.ResourceRequirements{
Requests: map[v1.ResourceName]resource.Quantity{
@@ -103,36 +99,4 @@ func AddMeshContainer(spec *v1.PodTemplateSpec, nodeId string, c util.PodRouteCo
},
ImagePullPolicy: v1.PullAlways,
})
spec.Spec.Containers = append(spec.Spec.Containers, v1.Container{
Name: config.SidecarControlPlane,
Image: config.ImageControlPlane,
Command: []string{"envoy-xds-server"},
Args: []string{"--watchDirectoryFileName", "/etc/envoy/envoy-config.yaml"},
VolumeMounts: []v1.VolumeMount{
{
Name: config.VolumeEnvoyConfig,
ReadOnly: true,
MountPath: "/etc/envoy",
},
},
ImagePullPolicy: v1.PullAlways,
})
f := false
spec.Spec.Volumes = append(spec.Spec.Volumes, v1.Volume{
Name: config.VolumeEnvoyConfig,
VolumeSource: v1.VolumeSource{
ConfigMap: &v1.ConfigMapVolumeSource{
LocalObjectReference: v1.LocalObjectReference{
Name: config.PodTrafficManager,
},
Items: []v1.KeyToPath{
{
Key: config.Envoy,
Path: "envoy-config.yaml",
},
},
Optional: &f,
},
},
})
}

View File

@@ -79,7 +79,7 @@ func CreateOutboundPod(clientset *kubernetes.Clientset, namespace string, traffi
Items: []v1.KeyToPath{
{
Key: config.Envoy,
Path: "envoy.yaml",
Path: "envoy-config.yaml",
},
},
Optional: &f,
@@ -114,20 +114,20 @@ func CreateOutboundPod(clientset *kubernetes.Clientset, namespace string, traffi
},
ImagePullPolicy: v1.PullAlways,
},
//{
// Name: config.SidecarControlPlane,
// Image: config.ImageControlPlane,
// Command: []string{"envoy-xds-server"},
// Args: []string{"--watchDirectoryFileName", "/etc/envoy/envoy.yaml"},
// VolumeMounts: []v1.VolumeMount{
// {
// Name: config.VolumeEnvoyConfig,
// ReadOnly: true,
// MountPath: "/etc/envoy",
// },
// },
// ImagePullPolicy: v1.PullAlways,
//},
{
Name: config.SidecarControlPlane,
Image: config.ImageControlPlane,
Command: []string{"envoy-xds-server"},
Args: []string{"--watchDirectoryFileName", "/etc/envoy/envoy-config.yaml"},
VolumeMounts: []v1.VolumeMount{
{
Name: config.VolumeEnvoyConfig,
ReadOnly: true,
MountPath: "/etc/envoy",
},
},
ImagePullPolicy: v1.PullAlways,
},
},
RestartPolicy: v1.RestartPolicyAlways,
PriorityClassName: "system-cluster-critical",