support multiple ports with envoy

This commit is contained in:
p_caiwfeng
2022-03-07 16:03:14 +08:00
committed by wencaiwulue
parent e7a9d28a6c
commit a48a256bdc
26 changed files with 896 additions and 738 deletions

View File

@@ -75,9 +75,9 @@ jobs:
rm '/usr/local/bin/kubectl'
set -x
docker version
docker pull naison/kubevpn:v2
docker pull naison/kubevpnmesh:v2
docker run --rm hello-world
docker pull naison/kubevpn:test
docker pull naison/kubevpn-mesh:test
docker pull naison/envoy-xds-server:test
- name: Install minikube
run: |

View File

@@ -10,7 +10,7 @@ OS_ARCH := ${GOOS}/${GOARCH}
BASE := github.com/wencaiwulue/kubevpn
FOLDER := ${BASE}/cmd/kubevpn
CONTROL_PLANE_FOLDER := ${BASE}/pkg/controlplane/cmd/server
CONTROL_PLANE_FOLDER := ${BASE}/cmd/mesh
# Setup the -ldflags option for go build here, interpolate the variable values
LDFLAGS=--ldflags "\
@@ -84,14 +84,14 @@ image: kubevpn-linux-amd64
.PHONY: image-mesh
image-mesh:
docker build -t naison/kubevpnmesh:${VERSION} -f ./dockerfile/mesh/Dockerfile .
docker push naison/kubevpnmesh:${VERSION}
docker build -t naison/kubevpn-mesh:${VERSION} -f ./dockerfile/mesh/Dockerfile .
docker push naison/kubevpn-mesh:${VERSION}
.PHONY: image-control-plane
image-control-plane:
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o envoy-xds-server ${CONTROL_PLANE_FOLDER}
chmod +x envoy-xds-server
docker build -t naison/envoy-xds-server:${VERSION} -f ./dockerfile/controlplane/Dockerfile .
docker build -t naison/envoy-xds-server:${VERSION} -f ./dockerfile/control_plane/Dockerfile .
rm -fr envoy-xds-server
docker push naison/envoy-xds-server:${VERSION}

109
cmd/mesh/main.go Normal file
View File

@@ -0,0 +1,109 @@
package main

import (
	"context"
	"flag"

	"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
	serverv3 "github.com/envoyproxy/go-control-plane/pkg/server/v3"
	log "github.com/sirupsen/logrus"

	"github.com/wencaiwulue/kubevpn/pkg/control_plane"
	"github.com/wencaiwulue/kubevpn/util"
)

var (
	logger                 log.FieldLogger
	watchDirectoryFileName string
	port                   uint = 9002
)

func init() {
	log.SetLevel(log.DebugLevel)
	log.SetReportCaller(true)
	log.SetFormatter(&util.Format{})
	// Use the configured standard logger. log.New() would return a fresh
	// instance that ignores the level/formatter settings applied above.
	logger = log.StandardLogger()
	flag.StringVar(&watchDirectoryFileName, "watchDirectoryFileName", "/etc/envoy/envoy-config.yaml", "full path to directory to watch for files")
}

func main() {
	// Parse command-line flags; without this -watchDirectoryFileName is never read.
	flag.Parse()

	// Cache holding the xDS snapshots served to envoy, keyed by node id.
	snapshotCache := cache.NewSnapshotCache(false, cache.IDHash{}, logger)

	// Processor turns config-file changes into new xDS snapshots.
	proc := control_plane.NewProcessor(snapshotCache, log.WithField("context", "processor"))

	go func() {
		// Run the xDS gRPC server.
		ctx := context.Background()
		srv := serverv3.NewServer(ctx, snapshotCache, nil)
		control_plane.RunServer(ctx, srv, port)
	}()

	// Notify channel for file system events on the watched config file.
	notifyCh := make(chan control_plane.NotifyMessage)
	go func() {
		// Watch for file changes; sends one message per create/modify event.
		control_plane.Watch(watchDirectoryFileName, notifyCh)
	}()

	// Rebuild and publish a snapshot for every file-change notification.
	for msg := range notifyCh {
		log.Infof("path: %s, event: %v", msg.FilePath, msg.Operation)
		proc.ProcessFile(msg)
	}

	//config, err := rest.InClusterConfig()
	//if err != nil {
	//	panic(err)
	//}
	//clientset, err := kubernetes.NewForConfig(config)
	//if err != nil {
	//	panic(err)
	//}
	//namespace, _, err := util.Namespace()
	//if err != nil {
	//	panic(err)
	//}
	//informerFactory := informers.NewSharedInformerFactoryWithOptions(
	//	clientset,
	//	time.Second*5,
	//	informers.WithNamespace(namespace),
	//	informers.WithTweakListOptions(func(options *metav1.ListOptions) {
	//		options.FieldSelector = fields.OneTermEqualSelector(
	//			"metadata.name", config2.PodTrafficManager,
	//		).String()
	//	}),
	//)
	//informer, err := informerFactory.ForResource(v1.SchemeGroupVersion.WithResource("configmaps"))
	//if err != nil {
	//	panic(err)
	//}
	//informer.Informer().AddEventHandler(
	//	k8scache.FilteringResourceEventHandler{
	//		FilterFunc: func(obj interface{}) bool {
	//			if cm, ok := obj.(*v1.ConfigMap); ok {
	//				if _, found := cm.Data[config2.Envoy]; found {
	//					return true
	//				}
	//			}
	//			return false
	//		},
	//		Handler: k8scache.ResourceEventHandlerFuncs{
	//			AddFunc: func(obj interface{}) {
	//				proc.ProcessConfig(obj.(*v1.ConfigMap).Data[config2.Envoy])
	//			},
	//			UpdateFunc: func(_, newObj interface{}) {
	//				proc.ProcessConfig(newObj.(*v1.ConfigMap).Data[config2.Envoy])
	//			},
	//			DeleteFunc: func(obj interface{}) {
	//				proc.ProcessConfig(obj.(*v1.ConfigMap).Data[config2.Envoy])
	//			},
	//		},
	//	},
	//)
	//informerFactory.Start(context.TODO().Done())
	//informerFactory.WaitForCacheSync(context.TODO().Done())
	//<-context.TODO().Done()
}

View File

@@ -7,19 +7,24 @@ import (
const (
PodTrafficManager = "kubevpn.traffic.manager"
DHCP = "DHCP"
Envoy = "ENVOY_CONFIG"
SidecarEnvoyProxy = "envoy-proxy"
SidecarControlPlane = "control-plane"
SidecarEnvoyConfig = "envoy-config"
SidecarVPN = "vpn"
VolumeEnvoyConfig = "envoy-config"
s = "223.254.254.100/24"
)
var (
Version = ""
// Version inject --ldflags -X
Version = ""
ImageServer = "naison/kubevpn:" + Version
ImageMesh = "naison/kubevpnmesh:" + Version
ImageMesh = "naison/kubevpn-mesh:" + Version
ImageControlPlane = "naison/envoy-xds-server:" + Version
)

View File

@@ -0,0 +1,6 @@
FROM ubuntu:latest
WORKDIR /app
RUN sed -i s@/security.ubuntu.com/@/mirrors.aliyun.com/@g /etc/apt/sources.list \
&& sed -i s@/archive.ubuntu.com/@/mirrors.aliyun.com/@g /etc/apt/sources.list
RUN apt-get clean && apt-get update && apt-get install -y wget dnsutils vim curl net-tools iptables iputils-ping lsof iproute2 tcpdump
COPY envoy-xds-server /bin/envoy-xds-server

View File

@@ -1,2 +0,0 @@
FROM scratch
COPY envoy-xds-server /bin/envoy-xds-server

View File

@@ -1,4 +1,4 @@
FROM envoyproxy/envoy-dev:5f7d6efb5786ee3de31b1fb37c78fa281718b704
FROM envoyproxy/envoy:v1.21.1
WORKDIR /app
RUN sed -i s@/security.ubuntu.com/@/mirrors.aliyun.com/@g /etc/apt/sources.list \

View File

@@ -97,14 +97,14 @@ func (c *ConnectOptions) DoConnect() (err error) {
return
}
trafficMangerNet := net.IPNet{IP: config.RouterIP, Mask: config.CIDR.Mask}
c.routerIP, err = CreateOutboundPod(c.clientset, c.Namespace, trafficMangerNet.String(), c.cidrs)
if err != nil {
return
}
c.dhcp = NewDHCPManager(c.clientset.CoreV1().ConfigMaps(c.Namespace), c.Namespace, &trafficMangerNet)
if err = c.dhcp.InitDHCP(); err != nil {
return
}
c.routerIP, err = CreateOutboundPod(c.clientset, c.Namespace, trafficMangerNet.String(), c.cidrs)
if err != nil {
return
}
if err = c.createRemoteInboundPod(); err != nil {
return
}
@@ -344,6 +344,7 @@ func (c *ConnectOptions) InitClient() (err error) {
return
}
}
c.factory.ToRESTConfig()
log.Infof("kubeconfig path: %s, namespace: %s, services: %v", c.KubeconfigPath, c.Namespace, c.Workloads)
return
}

View File

@@ -1,4 +1,4 @@
package resources
package control_plane
type Listener struct {
Name string

View File

@@ -0,0 +1,280 @@
package control_plane
import (
"fmt"
cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
listener "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
route "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
hcm "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
v32 "github.com/envoyproxy/go-control-plane/envoy/type/matcher/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
"github.com/envoyproxy/go-control-plane/pkg/wellknown"
"github.com/golang/protobuf/ptypes/wrappers"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/durationpb"
v1 "k8s.io/api/core/v1"
"time"
)
// Virtual describes one workload whose inbound traffic envoy should proxy.
type Virtual struct {
	Uid   string // group.resource.name
	Ports []v1.ContainerPort
	Rules []*Rule
}

// Rule routes requests whose headers all match exactly to the developer's
// local tun IP.
type Rule struct {
	Headers    map[string]string
	LocalTunIP string
}

// To renders this virtual workload into xDS resources: one listener and one
// route configuration per container port, plus a cluster and an endpoint per
// rule, with a catch-all route to origin_cluster appended last.
func (a *Virtual) To() (
	listeners []types.Resource,
	clusters []types.Resource,
	routes []types.Resource,
	endpoints []types.Resource,
) {
	//clusters = append(clusters, OriginCluster())
	for _, containerPort := range a.Ports {
		// Listener and route config share the same generated name.
		name := fmt.Sprintf("%s_%v_%s", a.Uid, containerPort.ContainerPort, containerPort.Protocol)
		listeners = append(listeners, ToListener(name, name, containerPort.ContainerPort, containerPort.Protocol))

		var matched []*route.Route
		for _, rule := range a.Rules {
			clusterName := fmt.Sprintf("%s_%v", rule.LocalTunIP, containerPort.ContainerPort)
			clusters = append(clusters, ToCluster(clusterName))
			endpoints = append(endpoints, ToEndPoint(clusterName, rule.LocalTunIP, containerPort.ContainerPort))
			matched = append(matched, ToRoute(clusterName, rule.Headers))
		}
		// Requests matching no rule fall through to the original destination.
		matched = append(matched, DefaultRoute())

		routes = append(routes, &route.RouteConfiguration{
			Name: name,
			VirtualHosts: []*route.VirtualHost{{
				Name:    "local_service",
				Domains: []string{"*"},
				Routes:  matched,
			}},
		})
	}
	return
}
// ToEndPoint builds a ClusterLoadAssignment with a single backend pointing at
// the developer's local tun IP and the given port.
func ToEndPoint(clusterName string, localTunIP string, port int32) *endpoint.ClusterLoadAssignment {
	socket := &core.SocketAddress{
		Address: localTunIP,
		PortSpecifier: &core.SocketAddress_PortValue{
			PortValue: uint32(port),
		},
	}
	backend := &endpoint.LbEndpoint{
		HostIdentifier: &endpoint.LbEndpoint_Endpoint{
			Endpoint: &endpoint.Endpoint{
				Address: &core.Address{
					Address: &core.Address_SocketAddress{SocketAddress: socket},
				},
			},
		},
	}
	return &endpoint.ClusterLoadAssignment{
		ClusterName: clusterName,
		Endpoints: []*endpoint.LocalityLbEndpoints{
			{LbEndpoints: []*endpoint.LbEndpoint{backend}},
		},
	}
}
// ToCluster builds an EDS cluster whose endpoints are discovered over gRPC
// from the management server registered as "xds_cluster".
func ToCluster(clusterName string) *cluster.Cluster {
	edsSource := &core.ConfigSource{
		ResourceApiVersion: resource.DefaultAPIVersion,
		ConfigSourceSpecifier: &core.ConfigSource_ApiConfigSource{
			ApiConfigSource: &core.ApiConfigSource{
				TransportApiVersion:       resource.DefaultAPIVersion,
				ApiType:                   core.ApiConfigSource_GRPC,
				SetNodeOnFirstMessageOnly: true,
				GrpcServices: []*core.GrpcService{{
					TargetSpecifier: &core.GrpcService_EnvoyGrpc_{
						EnvoyGrpc: &core.GrpcService_EnvoyGrpc{ClusterName: "xds_cluster"},
					},
				}},
			},
		},
	}
	return &cluster.Cluster{
		Name:                 clusterName,
		ConnectTimeout:       durationpb.New(5 * time.Second),
		ClusterDiscoveryType: &cluster.Cluster_Type{Type: cluster.Cluster_EDS},
		LbPolicy:             cluster.Cluster_ROUND_ROBIN,
		DnsLookupFamily:      cluster.Cluster_V4_ONLY,
		EdsClusterConfig:     &cluster.Cluster_EdsClusterConfig{EdsConfig: edsSource},
	}
}
// OriginCluster builds a pass-through cluster that forwards each connection
// to its original (pre-interception) destination address.
func OriginCluster() *cluster.Cluster {
	c := &cluster.Cluster{
		Name:           "origin_cluster",
		ConnectTimeout: durationpb.New(5 * time.Second),
		LbPolicy:       cluster.Cluster_CLUSTER_PROVIDED,
	}
	c.ClusterDiscoveryType = &cluster.Cluster_Type{Type: cluster.Cluster_ORIGINAL_DST}
	return c
}
// ToRoute builds a route that forwards to clusterName when every entry of
// headers matches the request exactly. Header matchers are ANDed by envoy,
// so the random map iteration order does not affect matching behavior.
func ToRoute(clusterName string, headers map[string]string) *route.Route {
	var matchers []*route.HeaderMatcher
	for name, want := range headers {
		matchers = append(matchers, &route.HeaderMatcher{
			Name: name,
			HeaderMatchSpecifier: &route.HeaderMatcher_StringMatch{
				StringMatch: &v32.StringMatcher{
					MatchPattern: &v32.StringMatcher_Exact{Exact: want},
				},
			},
		})
	}
	action := &route.Route_Route{
		Route: &route.RouteAction{
			ClusterSpecifier: &route.RouteAction_Cluster{Cluster: clusterName},
		},
	}
	return &route.Route{
		Match: &route.RouteMatch{
			PathSpecifier: &route.RouteMatch_Prefix{Prefix: "/"},
			Headers:       matchers,
		},
		Action: action,
	}
}
// DefaultRoute matches every request and sends it to origin_cluster, i.e.
// back to the workload's original backend.
func DefaultRoute() *route.Route {
	match := &route.RouteMatch{
		PathSpecifier: &route.RouteMatch_Prefix{Prefix: "/"},
	}
	action := &route.Route_Route{
		Route: &route.RouteAction{
			ClusterSpecifier: &route.RouteAction_Cluster{Cluster: "origin_cluster"},
		},
	}
	return &route.Route{Match: match, Action: action}
}
// ToListener builds the inbound listener for one container port. The listener
// does not bind its own socket (BindToPort=false); traffic is expected to be
// redirected to it. Connections whose ALPN looks like HTTP go through an HTTP
// connection manager whose route table is fetched over RDS from xds_cluster.
func ToListener(listenerName string, routeName string, port int32, p v1.Protocol) *listener.Listener {
	var protocol core.SocketAddress_Protocol
	switch p {
	case v1.ProtocolTCP:
		protocol = core.SocketAddress_TCP
	case v1.ProtocolUDP:
		protocol = core.SocketAddress_UDP
	case v1.ProtocolSCTP:
		// NOTE(review): SCTP is silently downgraded to TCP here (envoy's
		// SocketAddress has no SCTP protocol) — confirm this is intentional.
		protocol = core.SocketAddress_TCP
	}
	// any marshals a proto message into an Any, ignoring the marshal error;
	// a failure would produce a nil typed config.
	any := func(m proto.Message) *anypb.Any {
		pbst, _ := anypb.New(m)
		return pbst
	}
	// HTTP connection manager: route configuration is pulled dynamically via
	// RDS (route name = routeName) from the xds_cluster gRPC source.
	httpManager := &hcm.HttpConnectionManager{
		CodecType:  hcm.HttpConnectionManager_AUTO,
		StatPrefix: "http",
		HttpFilters: []*hcm.HttpFilter{{
			Name: wellknown.Router,
		}},
		RouteSpecifier: &hcm.HttpConnectionManager_Rds{
			Rds: &hcm.Rds{
				ConfigSource: &core.ConfigSource{
					ResourceApiVersion: resource.DefaultAPIVersion,
					ConfigSourceSpecifier: &core.ConfigSource_ApiConfigSource{
						ApiConfigSource: &core.ApiConfigSource{
							TransportApiVersion:       resource.DefaultAPIVersion,
							ApiType:                   core.ApiConfigSource_GRPC,
							SetNodeOnFirstMessageOnly: true,
							GrpcServices: []*core.GrpcService{{
								TargetSpecifier: &core.GrpcService_EnvoyGrpc_{
									EnvoyGrpc: &core.GrpcService_EnvoyGrpc{ClusterName: "xds_cluster"},
								},
							}},
						},
					},
				},
				RouteConfigName: routeName,
			},
		},
	}
	//tcpConfig := &tcp.TcpProxy{
	//	StatPrefix: "tcp",
	//	ClusterSpecifier: &tcp.TcpProxy_Cluster{
	//		Cluster: "origin_cluster",
	//	},
	//}
	return &listener.Listener{
		Name:             listenerName,
		TrafficDirection: core.TrafficDirection_INBOUND,
		// Traffic is redirected to this listener rather than accepted on a
		// bound socket of its own.
		BindToPort: &wrappers.BoolValue{Value: false},
		//UseOriginalDst: &wrappers.BoolValue{Value: true},
		Address: &core.Address{
			Address: &core.Address_SocketAddress{
				SocketAddress: &core.SocketAddress{
					Protocol: protocol,
					Address:  "0.0.0.0",
					PortSpecifier: &core.SocketAddress_PortValue{
						PortValue: uint32(port),
					},
				},
			},
		},
		FilterChains: []*listener.FilterChain{
			{
				// Only HTTP/1.x and cleartext HTTP/2 connections enter this
				// chain; anything else has no matching filter chain.
				FilterChainMatch: &listener.FilterChainMatch{
					ApplicationProtocols: []string{"http/1.0", "http/1.1", "h2c"},
				},
				Filters: []*listener.Filter{
					{
						Name: wellknown.HTTPConnectionManager,
						ConfigType: &listener.Filter_TypedConfig{
							TypedConfig: any(httpManager),
						},
					},
				},
			},
			//{
			//	Filters: []*listener.Filter{
			//		{
			//			Name: wellknown.TCPProxy,
			//			ConfigType: &listener.Filter_TypedConfig{
			//				TypedConfig: any(tcpConfig),
			//			},
			//		},
			//	},
			//},
		},
		//ListenerFilters: []*listener.ListenerFilter{
		//	{
		//		Name: wellknown.HttpInspector,
		//		ConfigType: &listener.ListenerFilter_TypedConfig{
		//			TypedConfig: any(&httpinspector.HttpInspector{}),
		//		},
		//	},
		//},
	}
}

View File

@@ -1,23 +1,21 @@
package xdscache
package control_plane
import (
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/wencaiwulue/kubevpn/pkg/controlplane/apis/v1alpha1"
"github.com/wencaiwulue/kubevpn/pkg/controlplane/internal/resources"
)
type XDSCache struct {
Listeners map[string]resources.Listener
Routes map[string]resources.Route
Clusters map[string]resources.Cluster
Endpoints map[string]resources.Endpoint
Listeners map[string]Listener
Routes map[string]Route
Clusters map[string]Cluster
Endpoints map[string]Endpoint
}
func (xds *XDSCache) ClusterContents() []types.Resource {
var r []types.Resource
for _, c := range xds.Clusters {
r = append(r, resources.MakeCluster(c.Name))
r = append(r, MakeCluster(c.Name))
}
return r
@@ -25,19 +23,19 @@ func (xds *XDSCache) ClusterContents() []types.Resource {
func (xds *XDSCache) RouteContents() []types.Resource {
var routesArray []resources.Route
var routesArray []Route
for _, r := range xds.Routes {
routesArray = append(routesArray, r)
}
return []types.Resource{resources.MakeRoute(routesArray)}
return []types.Resource{MakeRoute(routesArray)}
}
func (xds *XDSCache) ListenerContents() []types.Resource {
var r []types.Resource
for _, l := range xds.Listeners {
r = append(r, resources.MakeHTTPListener(l.Name, l.RouteNames[0], l.Address, l.Port))
r = append(r, MakeHTTPListener(l.Name, l.RouteNames[0], l.Address, l.Port))
}
return r
@@ -47,14 +45,14 @@ func (xds *XDSCache) EndpointsContents() []types.Resource {
var r []types.Resource
for _, c := range xds.Clusters {
r = append(r, resources.MakeEndpoint(c.Name, c.Endpoints))
r = append(r, MakeEndpoint(c.Name, c.Endpoints))
}
return r
}
func (xds *XDSCache) AddListener(name string, routeNames []string, address string, port uint32) {
xds.Listeners[name] = resources.Listener{
xds.Listeners[name] = Listener{
Name: name,
Address: address,
Port: port,
@@ -62,23 +60,23 @@ func (xds *XDSCache) AddListener(name string, routeNames []string, address strin
}
}
func (xds *XDSCache) AddRoute(name string, headers []v1alpha1.HeaderMatch, clusters []string) {
var h []resources.Header
func (xds *XDSCache) AddRoute(name string, headers []HeaderMatch, clusterName string) {
var h []Header
for _, header := range headers {
h = append(h, resources.Header{
h = append(h, Header{
Key: header.Key,
Value: header.Value,
})
}
xds.Routes[name] = resources.Route{
xds.Routes[name] = Route{
Name: name,
Headers: h,
Cluster: clusters[0],
Cluster: clusterName,
}
}
func (xds *XDSCache) AddCluster(name string) {
xds.Clusters[name] = resources.Cluster{
xds.Clusters[name] = Cluster{
Name: name,
}
}
@@ -86,7 +84,7 @@ func (xds *XDSCache) AddCluster(name string) {
func (xds *XDSCache) AddEndpoint(clusterName, upstreamHost string, upstreamPort uint32) {
cluster := xds.Clusters[clusterName]
cluster.Endpoints = append(cluster.Endpoints, resources.Endpoint{
cluster.Endpoints = append(cluster.Endpoints, Endpoint{
UpstreamHost: upstreamHost,
UpstreamPort: upstreamPort,
})

View File

@@ -0,0 +1,39 @@
package control_plane

// EnvoyConfig is one document of the watched YAML file: the envoy node it
// targets plus the listeners and clusters to serve to that node.
type EnvoyConfig struct {
	NodeID string `yaml:"nodeID"`
	Spec   Spec   `yaml:"spec"`
}

// Spec groups the xDS resources declared for a single node.
type Spec struct {
	Listeners []ListenerTemp `yaml:"listeners"`
	Clusters  []ClusterTemp  `yaml:"clusters"`
}

// ListenerTemp declares a listener and the routes attached to it.
type ListenerTemp struct {
	Name    string      `yaml:"name"`
	Address string      `yaml:"address"`
	Port    uint32      `yaml:"port"`
	Routes  []RouteTemp `yaml:"routes"`
}

// RouteTemp declares one route: the headers to match and the target cluster.
// NOTE(review): the yaml tag is the plural "clusters" although the field now
// holds a single cluster name (the previous schema used a string list) —
// confirm existing config files still unmarshal with this tag.
type RouteTemp struct {
	Name        string        `yaml:"name"`
	Headers     []HeaderMatch `yaml:"headers"`
	ClusterName string        `yaml:"clusters"`
}

// HeaderMatch is an exact-match requirement on one request header.
type HeaderMatch struct {
	Key   string `yaml:"key"`
	Value string `yaml:"value"`
}

// ClusterTemp declares a cluster and its backend endpoints.
type ClusterTemp struct {
	Name      string         `yaml:"name"`
	Endpoints []EndpointTemp `yaml:"endpoints"`
}

// EndpointTemp is one address:port backend of a cluster.
type EndpointTemp struct {
	Address string `yaml:"address"`
	Port    uint32 `yaml:"port"`
}

View File

@@ -0,0 +1,135 @@
package control_plane
import (
"context"
"fmt"
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
"github.com/sirupsen/logrus"
"io/ioutil"
"k8s.io/apimachinery/pkg/util/yaml"
"math"
"math/rand"
"os"
"strconv"
)
// Processor converts parsed envoy config files into xDS snapshots and
// publishes them to the snapshot cache served to envoy.
type Processor struct {
	cache cache.SnapshotCache
	// snapshotVersion holds the current version of the snapshot.
	snapshotVersion int64
	log             logrus.FieldLogger
	// xdsCache accumulates the listeners/routes/clusters/endpoints parsed so far.
	xdsCache XDSCache
}
// NewProcessor returns a Processor backed by the given snapshot cache. The
// version counter starts at a random value so a restarted control plane does
// not re-serve version numbers envoy has already seen.
func NewProcessor(cache cache.SnapshotCache, log logrus.FieldLogger) *Processor {
	xds := XDSCache{
		Listeners: map[string]Listener{},
		Clusters:  map[string]Cluster{},
		Routes:    map[string]Route{},
		Endpoints: map[string]Endpoint{},
	}
	return &Processor{
		cache:           cache,
		snapshotVersion: rand.Int63n(1000),
		log:             log,
		xdsCache:        xds,
	}
}
// newSnapshotVersion bumps the snapshot version counter and returns it as a
// decimal string; the counter wraps back to zero once it reaches MaxInt64.
func (p *Processor) newSnapshotVersion() string {
	if p.snapshotVersion == math.MaxInt64 {
		// Avoid signed overflow by restarting the sequence.
		p.snapshotVersion = 0
	}
	p.snapshotVersion++
	return strconv.FormatInt(p.snapshotVersion, 10)
}
// ProcessFile parses the changed config file and, for every node declared in
// it, assembles a fresh xDS snapshot and publishes it to the cache.
func (p *Processor) ProcessFile(file NotifyMessage) {
	// Parse the file into a list of per-node configs.
	envoyConfigList, err := ParseYaml(file.FilePath)
	if err != nil {
		p.log.Errorf("error parsing yaml file: %+v", err)
		return
	}
	for _, envoyConfig := range envoyConfigList {
		// Parse Listeners
		for _, l := range envoyConfig.Spec.Listeners {
			var lRoutes []string
			for _, lr := range l.Routes {
				lRoutes = append(lRoutes, lr.Name)
			}
			p.xdsCache.AddListener(l.Name, lRoutes, l.Address, l.Port)
			for _, r := range l.Routes {
				p.xdsCache.AddRoute(r.Name, r.Headers, r.ClusterName)
			}
		}
		// Parse Clusters
		for _, c := range envoyConfig.Spec.Clusters {
			p.xdsCache.AddCluster(c.Name)
			// Parse endpoints
			for _, e := range c.Endpoints {
				p.xdsCache.AddEndpoint(c.Name, e.Address, e.Port)
			}
		}
		// Assemble the full resource set; runtimes and secrets are unused here.
		a := map[resource.Type][]types.Resource{
			resource.EndpointType: p.xdsCache.EndpointsContents(), // endpoints
			resource.ClusterType:  p.xdsCache.ClusterContents(),   // clusters
			resource.RouteType:    p.xdsCache.RouteContents(),     // routes
			resource.ListenerType: p.xdsCache.ListenerContents(),  // listeners
			resource.RuntimeType:  {},                             // runtimes
			resource.SecretType:   {},                             // secrets
		}
		// Create the snapshot that we'll serve to Envoy
		snapshot, err := cache.NewSnapshot(p.newSnapshotVersion(), a)
		if err != nil {
			p.log.Errorf("snapshot inconsistency err: %+v\n\n\n%+v", snapshot, err)
			return
		}
		// Reject internally-inconsistent snapshots before serving them.
		if err = snapshot.Consistent(); err != nil {
			p.log.Errorf("snapshot inconsistency: %+v\n\n\n%+v", snapshot, err)
			return
		}
		p.log.Debugf("will serve snapshot %+v", snapshot)
		// Add the snapshot to the cache, keyed by the node id from the file.
		// NOTE(review): os.Exit(1) kills the whole control plane on a single
		// failed SetSnapshot — consider logging and continuing instead.
		if err = p.cache.SetSnapshot(context.TODO(), envoyConfig.NodeID, snapshot); err != nil {
			p.log.Errorf("snapshot error %q for %+v", err, snapshot)
			os.Exit(1)
		}
	}
}
// ParseYaml reads a YAML file containing a list of EnvoyConfig documents and
// returns the decoded configs. On success the returned slice is non-nil
// (possibly empty).
func ParseYaml(file string) ([]*EnvoyConfig, error) {
	// NOTE: ioutil.ReadFile is deprecated in favor of os.ReadFile; kept here
	// because other code in this file still uses the io/ioutil import.
	yamlFile, err := ioutil.ReadFile(file)
	if err != nil {
		// Wrap with %w (lowercase, no trailing newline) so callers can unwrap
		// the underlying I/O error with errors.Is/As.
		return nil, fmt.Errorf("reading yaml file %s: %w", file, err)
	}
	envoyConfigs := make([]*EnvoyConfig, 0)
	if err = yaml.Unmarshal(yamlFile, &envoyConfigs); err != nil {
		return nil, fmt.Errorf("unmarshalling yaml file %s: %w", file, err)
	}
	return envoyConfigs, nil
}

View File

@@ -1,7 +1,8 @@
package resources
package control_plane
import (
etmv3 "github.com/envoyproxy/go-control-plane/envoy/type/matcher/v3"
"github.com/golang/protobuf/ptypes/wrappers"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/durationpb"
"time"
@@ -149,6 +150,9 @@ func MakeHTTPListener(listenerName, route, address string, port uint32) *listene
return &listener.Listener{
Name: listenerName,
BindToPort: &wrappers.BoolValue{
Value: false,
},
Address: &core.Address{
Address: &core.Address_SocketAddress{
SocketAddress: &core.SocketAddress{
@@ -188,77 +192,3 @@ func makeConfigSource() *core.ConfigSource {
}
return source
}
// todo consider using redirect instead of route
// doc https://www.envoyproxy.io/docs/envoy/latest/configuration/http/http_filters/dynamic_forward_proxy_filter#config-http-filters-dynamic-forward-proxy
// &route.Route_Redirect{
// Redirect: &route.RedirectAction{
// HostRedirect: "",
// },
var _ = `
admin:
address:
socket_address:
protocol: TCP
address: 127.0.0.1
port_value: 9901
static_resources:
listeners:
- name: listener_0
address:
socket_address:
protocol: TCP
address: 0.0.0.0
port_value: 15006
filter_chains:
- filters:
- name: envoy.filters.network.http_connection_manager
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
stat_prefix: ingress_http
route_config:
name: local_route
virtual_hosts:
- name: local_service
domains: ["*"]
routes:
- match:
prefix: "/"
headers:
- name: "a"
exact_match: "2"
route:
cluster: dynamic_forward_proxy_cluster
typed_per_filter_config:
envoy.filters.http.dynamic_forward_proxy:
"@type": type.googleapis.com/envoy.extensions.filters.http.dynamic_forward_proxy.v3.PerRouteConfig
host_rewrite_literal: 223.254.254.73:9080
- match:
prefix: "/"
route:
cluster: dynamic_forward_proxy_cluster
typed_per_filter_config:
envoy.filters.http.dynamic_forward_proxy:
"@type": type.googleapis.com/envoy.extensions.filters.http.dynamic_forward_proxy.v3.PerRouteConfig
host_rewrite_literal: 127.0.0.1:9080
http_filters:
- name: envoy.filters.http.dynamic_forward_proxy
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.http.dynamic_forward_proxy.v3.FilterConfig
dns_cache_config:
name: dynamic_forward_proxy_cache_config
dns_lookup_family: V4_ONLY
- name: envoy.filters.http.router
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
clusters:
- name: dynamic_forward_proxy_cluster
lb_policy: CLUSTER_PROVIDED
cluster_type:
name: envoy.clusters.dynamic_forward_proxy
typed_config:
"@type": type.googleapis.com/envoy.extensions.clusters.dynamic_forward_proxy.v3.ClusterConfig
dns_cache_config:
name: dynamic_forward_proxy_cache_config
dns_lookup_family: V4_ONLY
`

View File

@@ -1,4 +1,4 @@
package server
package control_plane
import (
"context"

View File

@@ -1,4 +1,4 @@
package watcher
package control_plane
import (
"log"

View File

@@ -1,39 +0,0 @@
package v1alpha1
type EnvoyConfig struct {
Name string `yaml:"name"`
Spec `yaml:"spec"`
}
type Spec struct {
Listeners []Listener `yaml:"listeners"`
Clusters []Cluster `yaml:"clusters"`
}
type Listener struct {
Name string `yaml:"name"`
Address string `yaml:"address"`
Port uint32 `yaml:"port"`
Routes []Route `yaml:"routes"`
}
type Route struct {
Name string `yaml:"name"`
Headers []HeaderMatch `yaml:"headers"`
ClusterNames []string `yaml:"clusters"`
}
type HeaderMatch struct {
Key string `yaml:"key"`
Value string `yaml:"value"`
}
type Cluster struct {
Name string `yaml:"name"`
Endpoints []Endpoint `yaml:"endpoints"`
}
type Endpoint struct {
Address string `yaml:"address"`
Port uint32 `yaml:"port"`
}

View File

@@ -1,65 +0,0 @@
package main
import (
"context"
"flag"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
serverv3 "github.com/envoyproxy/go-control-plane/pkg/server/v3"
log "github.com/sirupsen/logrus"
"github.com/wencaiwulue/kubevpn/pkg/controlplane/internal/processor"
"github.com/wencaiwulue/kubevpn/pkg/controlplane/internal/server"
"github.com/wencaiwulue/kubevpn/pkg/controlplane/internal/watcher"
)
var (
l log.FieldLogger
watchDirectoryFileName string
port uint = 9002
nodeID = "test-id"
)
func init() {
l = log.New()
log.SetLevel(log.DebugLevel)
// Define the directory to watch for Envoy configuration files
flag.StringVar(&watchDirectoryFileName, "watchDirectoryFileName", "/etc/envoy/envoy-config.yaml", "full path to directory to watch for files")
}
func main() {
flag.Parse()
// Create a cache
snapshotCache := cache.NewSnapshotCache(false, cache.IDHash{}, l)
// Create a processor
proc := processor.NewProcessor(snapshotCache, nodeID, log.WithField("context", "processor"))
// Create initial snapshot from file
proc.ProcessFile(watcher.NotifyMessage{
Operation: watcher.Create,
FilePath: watchDirectoryFileName,
})
// Notify channel for file system events
notifyCh := make(chan watcher.NotifyMessage)
go func() {
// Watch for file changes
watcher.Watch(watchDirectoryFileName, notifyCh)
}()
go func() {
// Run the xDS server
ctx := context.Background()
srv := serverv3.NewServer(ctx, snapshotCache, nil)
server.RunServer(ctx, srv, port)
}()
for {
select {
case msg := <-notifyCh:
proc.ProcessFile(msg)
}
}
}

View File

@@ -1,110 +0,0 @@
/*
* Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved.
* This source code is licensed under the Apache License Version 2.0.
*/
package processor
import (
"fmt"
"github.com/wencaiwulue/kubevpn/pkg/controlplane/apis/v1alpha1"
"github.com/wencaiwulue/kubevpn/util"
"gopkg.in/yaml.v2"
"testing"
)
func TestName(t *testing.T) {
var config = v1alpha1.EnvoyConfig{
Name: "config-test",
Spec: v1alpha1.Spec{
Listeners: []v1alpha1.Listener{
{
Name: "listener1",
Address: "127.0.0.1",
Port: 15006,
Routes: []v1alpha1.Route{
{
Name: "route-0",
Headers: []v1alpha1.HeaderMatch{
{
Key: "a",
Value: "aa",
},
{
Key: "b",
Value: "bb",
},
},
ClusterNames: []string{"cluster0"},
}, {
Name: "route-1",
Headers: []v1alpha1.HeaderMatch{
{
Key: "c",
Value: "cc",
},
{
Key: "d",
Value: "dd",
},
},
ClusterNames: []string{"cluster-1"},
},
},
},
},
Clusters: []v1alpha1.Cluster{
{
Name: "cluster-0",
Endpoints: []v1alpha1.Endpoint{
{
Address: "127.0.0.1",
Port: 9101,
},
{
Address: "127.0.0.1",
Port: 9102,
},
},
}, {
Name: "cluster-1",
Endpoints: []v1alpha1.Endpoint{
{
Address: "127.0.0.1",
Port: 9103,
},
{
Address: "127.0.0.1",
Port: 9104,
},
},
},
},
},
}
marshal, _ := yaml.Marshal(config)
fmt.Println(string(marshal))
}
func TestName1(t *testing.T) {
parseYaml, err := util.ParseYamlBytes([]byte(sss))
fmt.Println(err)
fmt.Println(parseYaml)
}
var sss = `name: config-test
spec:
listeners:
- name: listener1
address: 127.0.0.1
port: 15006
routes:
- name: route-0
clusters:
- cluster-0
clusters:
- name: cluster-0
endpoints:
- address: 127.0.0.1
port: 9080
`

View File

@@ -1,120 +0,0 @@
package processor
import (
"context"
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
"github.com/sirupsen/logrus"
"github.com/wencaiwulue/kubevpn/pkg/controlplane/internal/resources"
"github.com/wencaiwulue/kubevpn/pkg/controlplane/internal/watcher"
"github.com/wencaiwulue/kubevpn/pkg/controlplane/internal/xdscache"
"github.com/wencaiwulue/kubevpn/util"
"math"
"math/rand"
"os"
"strconv"
)
type Processor struct {
cache cache.SnapshotCache
nodeID string
// snapshotVersion holds the current version of the snapshot.
snapshotVersion int64
logrus.FieldLogger
xdsCache xdscache.XDSCache
}
func NewProcessor(cache cache.SnapshotCache, nodeID string, log logrus.FieldLogger) *Processor {
return &Processor{
cache: cache,
nodeID: nodeID,
snapshotVersion: rand.Int63n(1000),
FieldLogger: log,
xdsCache: xdscache.XDSCache{
Listeners: make(map[string]resources.Listener),
Clusters: make(map[string]resources.Cluster),
Routes: make(map[string]resources.Route),
Endpoints: make(map[string]resources.Endpoint),
},
}
}
// newSnapshotVersion increments the current snapshotVersion
// and returns as a string.
func (p *Processor) newSnapshotVersion() string {
// Reset the snapshotVersion if it ever hits max size.
if p.snapshotVersion == math.MaxInt64 {
p.snapshotVersion = 0
}
// Increment the snapshot version & return as string.
p.snapshotVersion++
return strconv.FormatInt(p.snapshotVersion, 10)
}
// ProcessFile takes a file and generates an xDS snapshot
func (p *Processor) ProcessFile(file watcher.NotifyMessage) {
// Parse file into object
envoyConfig, err := util.ParseYaml(file.FilePath)
if err != nil {
p.Errorf("error parsing yaml file: %+v", err)
return
}
// Parse Listeners
for _, l := range envoyConfig.Listeners {
var lRoutes []string
for _, lr := range l.Routes {
lRoutes = append(lRoutes, lr.Name)
}
p.xdsCache.AddListener(l.Name, lRoutes, l.Address, l.Port)
for _, r := range l.Routes {
p.xdsCache.AddRoute(r.Name, r.Headers, r.ClusterNames)
}
}
// Parse Clusters
for _, c := range envoyConfig.Clusters {
p.xdsCache.AddCluster(c.Name)
// Parse endpoints
for _, e := range c.Endpoints {
p.xdsCache.AddEndpoint(c.Name, e.Address, e.Port)
}
}
a := map[resource.Type][]types.Resource{
resource.EndpointType: p.xdsCache.EndpointsContents(), // endpoints
resource.ClusterType: p.xdsCache.ClusterContents(), // clusters
resource.RouteType: p.xdsCache.RouteContents(), // routes
resource.ListenerType: p.xdsCache.ListenerContents(), // listeners
resource.RuntimeType: {}, // runtimes
resource.SecretType: {}, // secrets
}
// Create the snapshot that we'll serve to Envoy
snapshot, err := cache.NewSnapshot(p.newSnapshotVersion(), a)
if err != nil {
p.Errorf("snapshot inconsistency err: %+v\n\n\n%+v", snapshot, err)
return
}
if err = snapshot.Consistent(); err != nil {
p.Errorf("snapshot inconsistency: %+v\n\n\n%+v", snapshot, err)
return
}
p.Debugf("will serve snapshot %+v", snapshot)
// Add the snapshot to the cache
if err = p.cache.SetSnapshot(context.TODO(), p.nodeID, snapshot); err != nil {
p.Errorf("snapshot error %q for %+v", err, snapshot)
os.Exit(1)
}
}

View File

@@ -2,20 +2,18 @@ package pkg
import (
"context"
"fmt"
"github.com/cilium/ipam/service/allocator"
"github.com/cilium/ipam/service/ipallocator"
log "github.com/sirupsen/logrus"
"github.com/wencaiwulue/kubevpn/config"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
v12 "k8s.io/client-go/kubernetes/typed/core/v1"
"net"
)
const (
DHCP = "DHCP"
)
type DHCPManager struct {
client v12.ConfigMapInterface
cidr *net.IPNet
@@ -34,6 +32,16 @@ func NewDHCPManager(client v12.ConfigMapInterface, namespace string, cidr *net.I
func (d *DHCPManager) InitDHCP() error {
configMap, err := d.client.Get(context.Background(), config.PodTrafficManager, metav1.GetOptions{})
if err == nil && configMap != nil {
if _, found := configMap.Data[config.Envoy]; !found {
_, err = d.client.Patch(
context.Background(),
configMap.Name,
types.MergePatchType,
[]byte(fmt.Sprintf("{\"data\":{\"%s\":\"%s\"}}", config.Envoy, "")),
metav1.PatchOptions{},
)
return err
}
return nil
}
result := &v1.ConfigMap{
@@ -42,7 +50,7 @@ func (d *DHCPManager) InitDHCP() error {
Namespace: d.namespace,
Labels: map[string]string{},
},
Data: map[string]string{},
Data: map[string]string{config.Envoy: ""},
}
_, err = d.client.Create(context.Background(), result, metav1.CreateOptions{})
if err != nil {
@@ -104,7 +112,7 @@ func (d *DHCPManager) updateDHCPConfigMap(f func(*ipallocator.Range) error) erro
if err != nil {
return err
}
if err = dhcp.Restore(d.cidr, []byte(cm.Data[DHCP])); err != nil {
if err = dhcp.Restore(d.cidr, []byte(cm.Data[config.DHCP])); err != nil {
return err
}
if err = f(dhcp); err != nil {
@@ -114,10 +122,10 @@ func (d *DHCPManager) updateDHCPConfigMap(f func(*ipallocator.Range) error) erro
if err != nil {
return err
}
cm.Data[DHCP] = string(bytes)
cm.Data[config.DHCP] = string(bytes)
_, err = d.client.Update(context.Background(), cm, metav1.UpdateOptions{})
if err != nil {
log.Errorf("update dhcp error after release ip, need to try again, err: %v", err)
log.Errorf("update dhcp failed, err: %v", err)
return err
}
return nil

View File

@@ -6,10 +6,10 @@ import (
"fmt"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/wencaiwulue/kubevpn/pkg/controlplane/apis/v1alpha1"
config2 "github.com/wencaiwulue/kubevpn/config"
"github.com/wencaiwulue/kubevpn/pkg/control_plane"
"github.com/wencaiwulue/kubevpn/pkg/mesh"
"github.com/wencaiwulue/kubevpn/util"
"gopkg.in/yaml.v2"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@@ -17,6 +17,7 @@ import (
pkgresource "k8s.io/cli-runtime/pkg/resource"
v12 "k8s.io/client-go/kubernetes/typed/core/v1"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
"sigs.k8s.io/yaml"
"strconv"
"strings"
"time"
@@ -42,17 +43,19 @@ func PatchSidecar(factory cmdutil.Factory, clientset v12.ConfigMapInterface, nam
origin := *templateSpec
port := uint32(templateSpec.Spec.Containers[0].Ports[0].ContainerPort)
configMapName := fmt.Sprintf("%s-%s", object.Mapping.Resource.Resource, object.Name)
var port []v1.ContainerPort
for _, container := range templateSpec.Spec.Containers {
port = append(port, container.Ports...)
}
nodeID := fmt.Sprintf("%s-%s-%s", object.Mapping.Resource.Resource, object.Mapping.Resource.Group, object.Name)
createEnvoyConfigMapIfNeeded(clientset, object.Namespace, configMapName, strconv.Itoa(int(port)))
err = addEnvoyConfig(clientset, configMapName, c.LocalTunIP, headers, port)
err = addEnvoyConfig(clientset, nodeID, c.LocalTunIP, headers, port)
if err != nil {
log.Warnln(err)
return err
}
mesh.AddMeshContainer(templateSpec, configMapName, c)
mesh.AddMeshContainer(templateSpec, nodeID, c)
helper := pkgresource.NewHelper(object.Client, object.Mapping)
bytes, err := json.Marshal([]struct {
Op string `json:"op"`
@@ -78,11 +81,6 @@ func PatchSidecar(factory cmdutil.Factory, clientset v12.ConfigMapInterface, nam
object.Mapping.GroupVersionKind.GroupKind().String(), object.Name, err)
}
//_ = util.WaitPod(clientset, namespace, metav1.ListOptions{
// FieldSelector: fields.OneTermEqualSelector("metadata.name", object.Name+"-shadow").String(),
//}, func(pod *v1.Pod) bool {
// return pod.Status.Phase == v1.PodRunning
//})
rollbackFuncList = append(rollbackFuncList, func() {
if err = UnPatchContainer(factory, clientset, namespace, workloads, headers); err != nil {
log.Error(err)
@@ -110,11 +108,9 @@ func UnPatchContainer(factory cmdutil.Factory, mapInterface v12.ConfigMapInterfa
return err
}
//port := uint32(templateSpec.Spec.Containers[0].Ports[0].ContainerPort)
configMapName := fmt.Sprintf("%s-%s", object.Mapping.Resource.Resource, object.Name)
nodeID := fmt.Sprintf("%s.%s", object.Mapping.Resource.GroupResource().String(), object.Name)
//createEnvoyConfigMapIfNeeded(mapInterface, object.Namespace, configMapName, strconv.Itoa(int(port)))
err = removeEnvoyConfig(mapInterface, configMapName, headers)
err = removeEnvoyConfig(mapInterface, nodeID, headers)
if err != nil {
log.Warnln(err)
return err
@@ -138,211 +134,142 @@ func UnPatchContainer(factory cmdutil.Factory, mapInterface v12.ConfigMapInterfa
_, err = helper.Patch(object.Namespace, object.Name, types.JSONPatchType, bytes, &metav1.PatchOptions{
//Force: &t,
})
//_ = util.WaitPod(mapInterface, namespace, metav1.ListOptions{
// FieldSelector: fields.OneTermEqualSelector("metadata.name", object.Name+"-shadow").String(),
//}, func(pod *v1.Pod) bool {
// return pod.Status.Phase == v1.PodRunning
//})
return err
}
var s = `
static_resources:
clusters:
- connect_timeout: 1s
load_assignment:
cluster_name: xds_cluster
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address:
address: 127.0.0.1
port_value: 9002
http2_protocol_options: {}
name: xds_cluster
dynamic_resources:
cds_config:
resource_api_version: V3
api_config_source:
api_type: GRPC
transport_api_version: V3
grpc_services:
- envoy_grpc:
cluster_name: xds_cluster
set_node_on_first_message_only: true
lds_config:
resource_api_version: V3
api_config_source:
api_type: GRPC
transport_api_version: V3
grpc_services:
- envoy_grpc:
cluster_name: xds_cluster
set_node_on_first_message_only: true
node:
cluster: test-cluster
id: test-id
layered_runtime:
layers:
- name: runtime-0
rtds_layer:
rtds_config:
resource_api_version: V3
api_config_source:
transport_api_version: V3
api_type: GRPC
grpc_services:
envoy_grpc:
cluster_name: xds_cluster
name: runtime-0
admin:
access_log_path: /dev/null
address:
socket_address:
address: 127.0.0.1
port_value: 9003
`
var ss = `name: config-test
spec:
listeners:
- name: listener1
address: 127.0.0.1
port: 15006
routes:
- name: route-0
clusters:
- cluster-0
clusters:
- name: cluster-0
endpoints:
- address: 127.0.0.1
port: %s
`
func addEnvoyConfig(clientset v12.ConfigMapInterface, workloads, localTUNIP string, headers map[string]string, port uint32) error {
get, err := clientset.Get(context.TODO(), workloads, metav1.GetOptions{})
func addEnvoyConfig(mapInterface v12.ConfigMapInterface, nodeID string, localTUNIP string, headers map[string]string, containerPorts []v1.ContainerPort) error {
configMap, err := mapInterface.Get(context.TODO(), config2.PodTrafficManager, metav1.GetOptions{})
if err != nil {
return err
}
s2, ok := get.Data["envoy-config.yaml"]
if !ok {
return errors.New("can not found value for key: envoy-config.yaml")
}
envoyConfig, err := util.ParseYamlBytes([]byte(s2))
if err != nil {
return err
}
var headersMatch []v1alpha1.HeaderMatch
for k, v := range headers {
headersMatch = append(headersMatch, v1alpha1.HeaderMatch{
Key: k,
Value: v,
})
}
// move router to front
i := len(envoyConfig.Listeners[0].Routes)
index := strconv.Itoa(i)
envoyConfig.Listeners[0].Routes = append(envoyConfig.Listeners[0].Routes, v1alpha1.Route{
Name: "route-" + index,
Headers: headersMatch,
ClusterNames: []string{"cluster-" + index},
})
// swap last element and the last second element
temp := envoyConfig.Listeners[0].Routes[i-1]
envoyConfig.Listeners[0].Routes[i-1] = envoyConfig.Listeners[0].Routes[i]
envoyConfig.Listeners[0].Routes[i] = temp
envoyConfig.Clusters = append(envoyConfig.Clusters, v1alpha1.Cluster{
Name: "cluster-" + index,
Endpoints: []v1alpha1.Endpoint{{
Address: localTUNIP,
Port: port,
}},
})
marshal, err := yaml.Marshal(envoyConfig)
if err != nil {
return err
}
get.Data["envoy-config.yaml"] = string(marshal)
_, err = clientset.Update(context.TODO(), get, metav1.UpdateOptions{})
return err
}
func removeEnvoyConfig(mapInterface v12.ConfigMapInterface, configMapName string, headers map[string]string) error {
configMap, err := mapInterface.Get(context.TODO(), configMapName, metav1.GetOptions{})
if err != nil {
return err
}
s2, ok := configMap.Data["envoy-config.yaml"]
if !ok {
return errors.New("can not found value for key: envoy-config.yaml")
}
envoyConfig, err := util.ParseYamlBytes([]byte(s2))
if err != nil {
return err
}
var routeC []v1alpha1.Route
var name string
var route v1alpha1.Route
for _, route = range envoyConfig.Listeners[0].Routes {
var m = make(map[string]string)
for _, header := range route.Headers {
m[header.Key] = header.Value
var v = make([]*control_plane.EnvoyConfig, 0)
if str, ok := configMap.Data[config2.Envoy]; ok {
if err = yaml.Unmarshal([]byte(str), &v); err != nil {
return err
}
allMatch := true
for k, v := range headers {
if value, ok := m[k]; !ok || value != v {
allMatch = false
}
var index = -1
for i, virtual := range v {
if nodeID == virtual.NodeID {
index = i
}
}
var h []control_plane.HeaderMatch
for k, v := range headers {
h = append(h, control_plane.HeaderMatch{Key: k, Value: v})
}
var l []control_plane.ListenerTemp
var c []control_plane.ClusterTemp
// if we can't find nodeID, just add it
if index < 0 {
for _, port := range containerPorts {
clusterName := localTUNIP + "_" + strconv.Itoa(int(port.ContainerPort))
c = append(c, control_plane.ClusterTemp{
Name: clusterName,
Endpoints: []control_plane.EndpointTemp{{Address: localTUNIP, Port: uint32(port.ContainerPort)}},
})
l = append(l, control_plane.ListenerTemp{
Name: strconv.Itoa(int(port.ContainerPort)),
Address: "0.0.0.0",
Port: uint32(port.ContainerPort),
Routes: []control_plane.RouteTemp{
{
Headers: h,
ClusterName: clusterName,
},
{
Headers: nil,
ClusterName: "origin_cluster",
},
},
})
}
v = append(v, &control_plane.EnvoyConfig{
NodeID: nodeID,
Spec: control_plane.Spec{
Listeners: l,
Clusters: c,
},
})
} else {
// if listener already exist, needs to add route
// if not exist, needs to create this listener, and then add route
// make sure position of default route is last
for _, port := range containerPorts {
clusterName := localTUNIP + "_" + strconv.Itoa(int(port.ContainerPort))
for _, listener := range v[index].Spec.Listeners {
if listener.Port == uint32(port.ContainerPort) {
listener.Routes = append(
[]control_plane.RouteTemp{{Headers: h, ClusterName: clusterName}}, listener.Routes...,
)
}
}
var found = false
for _, cluster := range v[index].Spec.Clusters {
if cluster.Name == clusterName {
found = true
break
}
}
if !found {
v[index].Spec.Clusters = append(v[index].Spec.Clusters, control_plane.ClusterTemp{
Name: clusterName,
Endpoints: []control_plane.EndpointTemp{{
Address: localTUNIP,
Port: uint32(port.ContainerPort),
}},
})
}
}
if !allMatch {
routeC = append(routeC, route)
} else {
name = route.ClusterNames[0]
}
}
// move router to front
envoyConfig.Listeners[0].Routes = routeC
var clusterC []v1alpha1.Cluster
for _, cluster := range envoyConfig.Clusters {
if cluster.Name != name {
clusterC = append(clusterC, cluster)
}
}
envoyConfig.Clusters = clusterC
marshal, err := yaml.Marshal(envoyConfig)
marshal, err := yaml.Marshal(v)
if err != nil {
return err
}
configMap.Data["envoy-config.yaml"] = string(marshal)
_, err = mapInterface.Update(context.TODO(), configMap, metav1.UpdateOptions{})
configMap.Data[config2.Envoy] = string(marshal)
_, err = mapInterface.Update(context.Background(), configMap, metav1.UpdateOptions{})
return err
}
func createEnvoyConfigMapIfNeeded(clientset v12.ConfigMapInterface, namespace, configMapName, port string) {
cm, err := clientset.Get(context.TODO(), configMapName, metav1.GetOptions{})
if err == nil && cm != nil {
return
}
configMap := v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: configMapName,
Namespace: namespace,
Labels: map[string]string{"kubevpn": "kubevpn"},
},
Data: map[string]string{
"base-envoy.yaml": fmt.Sprintf(s /*"kubevpn", podIp, port.TargetPort.String(), port.TargetPort.String()*/),
"envoy-config.yaml": fmt.Sprintf(ss, port),
},
}
_, err = clientset.Create(context.TODO(), &configMap, metav1.CreateOptions{})
// removeEnvoyConfig deletes the envoy route rules that were previously added
// for the given nodeID and whose header matchers are all satisfied by the
// provided headers, then writes the updated config back to the shared
// traffic-manager ConfigMap.
//
// Rules belonging to other nodeIDs, or with non-matching headers, are left
// untouched. NOTE(review): this unmarshals []*control_plane.Virtual while
// addEnvoyConfig marshals []*control_plane.EnvoyConfig — confirm the two
// types share the same YAML shape.
func removeEnvoyConfig(mapInterface v12.ConfigMapInterface, nodeID string, headers map[string]string) error {
	configMap, err := mapInterface.Get(context.TODO(), config2.PodTrafficManager, metav1.GetOptions{})
	if err != nil {
		// return only; logging here as well would double-report the error
		return err
	}
	str, ok := configMap.Data[config2.Envoy]
	if !ok {
		// report the key actually looked up instead of a hard-coded file name
		return fmt.Errorf("cannot find value for key: %s", config2.Envoy)
	}
	var virtuals []*control_plane.Virtual
	if err = yaml.Unmarshal([]byte(str), &virtuals); err != nil {
		return err
	}
	for _, virtual := range virtuals {
		if nodeID != virtual.Uid {
			continue
		}
		// remove every rule matched by headers; decrement i after a removal so
		// the element shifted into slot i is examined on the next iteration
		for i := 0; i < len(virtual.Rules); i++ {
			if contains(virtual.Rules[i].Headers, headers) {
				virtual.Rules = append(virtual.Rules[:i], virtual.Rules[i+1:]...)
				i--
			}
		}
	}
	marshal, err := yaml.Marshal(virtuals)
	if err != nil {
		return err
	}
	configMap.Data[config2.Envoy] = string(marshal)
	_, err = mapInterface.Update(context.Background(), configMap, metav1.UpdateOptions{})
	return err
}
// contains reports whether every key/value pair in sub is also satisfied by a.
// Note: a missing key in a compares as the empty string, so a sub entry with
// an empty value matches an absent key. An empty sub matches any map.
func contains(a map[string]string, sub map[string]string) bool {
	matched := true
	for key, expected := range sub {
		if actual := a[key]; actual != expected {
			matched = false
			break
		}
	}
	return matched
}

View File

@@ -1,6 +1,7 @@
package mesh
import (
"fmt"
"github.com/wencaiwulue/kubevpn/config"
"github.com/wencaiwulue/kubevpn/util"
v1 "k8s.io/api/core/v1"
@@ -28,7 +29,7 @@ func RemoveContainers(spec *v1.PodTemplateSpec) {
}
}
func AddMeshContainer(spec *v1.PodTemplateSpec, configMapName string, c util.PodRouteConfig) {
func AddMeshContainer(spec *v1.PodTemplateSpec, nodeId string, c util.PodRouteConfig) {
// remove volume envoyConfig if already exist
for i := 0; i < len(spec.Spec.Volumes); i++ {
if spec.Spec.Volumes[i].Name == config.SidecarEnvoyConfig {
@@ -44,32 +45,22 @@ func AddMeshContainer(spec *v1.PodTemplateSpec, configMapName string, c util.Pod
}
zero := int64(0)
t := true
spec.Spec.Volumes = append(spec.Spec.Volumes, v1.Volume{
Name: config.SidecarEnvoyConfig,
VolumeSource: v1.VolumeSource{
ConfigMap: &v1.ConfigMapVolumeSource{
LocalObjectReference: v1.LocalObjectReference{
Name: configMapName,
},
Items: []v1.KeyToPath{
{
Key: "base-envoy.yaml",
Path: "base-envoy.yaml",
},
{
Key: "envoy-config.yaml",
Path: "envoy-config.yaml",
},
},
},
},
})
spec.Spec.Containers = append(spec.Spec.Containers, v1.Container{
Name: config.SidecarVPN,
Image: config.ImageServer,
Command: []string{"/bin/sh", "-c"},
Args: []string{
"kubevpn serve -L 'tun://0.0.0.0:8421/" + c.TrafficManagerRealIP + ":8422?net=" + c.InboundPodTunIP + "&route=" + c.Route + "' --debug=true",
"sysctl net.ipv4.ip_forward=1;" +
"iptables -F;" +
"iptables -P INPUT ACCEPT;" +
"iptables -P FORWARD ACCEPT;" +
//"iptables -t nat -A PREROUTING ! -p icmp ! -s 127.0.0.1 ! -d " + config.CIDR.String() + " -j DNAT --to 127.0.0.1:15006;" +
//"iptables -t nat -A POSTROUTING ! -p icmp ! -s 127.0.0.1 ! -d " + config.CIDR.String() + " -j MASQUERADE;" +
"iptables -t nat -A PREROUTING -i eth0 -p tcp --dport 80:60000 ! -s 127.0.0.1 ! -d " + config.CIDR.String() + " -j DNAT --to 127.0.0.1:15006;" +
"iptables -t nat -A POSTROUTING -p tcp -m tcp --dport 80:60000 ! -s 127.0.0.1 ! -d " + config.CIDR.String() + " -j MASQUERADE;" +
"iptables -t nat -A PREROUTING -i eth0 -p udp --dport 80:60000 ! -s 127.0.0.1 ! -d " + config.CIDR.String() + " -j DNAT --to 127.0.0.1:15006;" +
"iptables -t nat -A POSTROUTING -p udp -m udp --dport 80:60000 ! -s 127.0.0.1 ! -d " + config.CIDR.String() + " -j MASQUERADE;" +
"kubevpn serve -L 'tun://0.0.0.0:8421/" + c.TrafficManagerRealIP + ":8422?net=" + c.InboundPodTunIP + "&route=" + c.Route + "' --debug=true",
},
SecurityContext: &v1.SecurityContext{
Capabilities: &v1.Capabilities{
@@ -96,19 +87,9 @@ func AddMeshContainer(spec *v1.PodTemplateSpec, configMapName string, c util.Pod
spec.Spec.Containers = append(spec.Spec.Containers, v1.Container{
Name: config.SidecarEnvoyProxy,
Image: config.ImageMesh,
Command: []string{"/bin/sh", "-c"},
Command: []string{"envoy", "-l", "debug", "--config-yaml"},
Args: []string{
"sysctl net.ipv4.ip_forward=1;" +
"iptables -F;" +
"iptables -P INPUT ACCEPT;" +
"iptables -P FORWARD ACCEPT;" +
"iptables -t nat -A PREROUTING ! -p icmp ! -s 127.0.0.1 ! -d " + config.CIDR.String() + " -j DNAT --to 127.0.0.1:15006;" +
"iptables -t nat -A POSTROUTING ! -p icmp ! -s 127.0.0.1 ! -d " + config.CIDR.String() + " -j MASQUERADE;" +
"envoy -l debug -c /etc/envoy/base-envoy.yaml",
},
SecurityContext: &v1.SecurityContext{
RunAsUser: &zero,
Privileged: &t,
fmt.Sprintf(s, nodeId, nodeId /*c.TrafficManagerRealIP*/, "127.0.0.1"),
},
Resources: v1.ResourceRequirements{
Requests: map[v1.ResourceName]resource.Quantity{
@@ -121,28 +102,37 @@ func AddMeshContainer(spec *v1.PodTemplateSpec, configMapName string, c util.Pod
},
},
ImagePullPolicy: v1.PullAlways,
VolumeMounts: []v1.VolumeMount{
{
Name: config.SidecarEnvoyConfig,
ReadOnly: false,
MountPath: "/etc/envoy/",
//SubPath: "envoy.yaml",
},
},
})
spec.Spec.Containers = append(spec.Spec.Containers, v1.Container{
Name: config.SidecarControlPlane,
Image: config.ImageControlPlane,
Command: []string{"envoy-xds-server"},
Args: []string{"--watchDirectoryFileName", "/etc/envoy-config/envoy-config.yaml"},
Args: []string{"--watchDirectoryFileName", "/etc/envoy/envoy-config.yaml"},
VolumeMounts: []v1.VolumeMount{
{
Name: "envoy-config",
ReadOnly: false,
MountPath: "/etc/envoy-config/",
//SubPath: "envoy.yaml",
Name: config.VolumeEnvoyConfig,
ReadOnly: true,
MountPath: "/etc/envoy",
},
},
ImagePullPolicy: v1.PullAlways,
})
f := false
spec.Spec.Volumes = append(spec.Spec.Volumes, v1.Volume{
Name: config.VolumeEnvoyConfig,
VolumeSource: v1.VolumeSource{
ConfigMap: &v1.ConfigMapVolumeSource{
LocalObjectReference: v1.LocalObjectReference{
Name: config.PodTrafficManager,
},
Items: []v1.KeyToPath{
{
Key: config.Envoy,
Path: "envoy-config.yaml",
},
},
Optional: &f,
},
},
})
}

View File

@@ -0,0 +1,61 @@
package mesh
// s is the envoy bootstrap config template for the mesh sidecar.
// It is rendered with fmt.Sprintf; the three %s verbs are, in order:
// the node cluster name, the node id, and the xds control-plane address
// (served on port 9002). Traffic redirected by iptables to 15006 enters
// default_listener, whose use_original_dst hands each connection off to a
// dynamically-configured listener; unmatched traffic falls through the
// tcp_proxy filter to origin_cluster (ORIGINAL_DST).
var s = `
node:
  cluster: %s
  id: %s
admin:
  access_log_path: /dev/null
  address:
    socket_address:
      address: 0.0.0.0
      port_value: 9003
dynamic_resources:
  ads_config:
    api_type: GRPC
    transport_api_version: V3
    grpc_services:
    - envoy_grpc:
        cluster_name: xds_cluster
      set_node_on_first_message_only: true
  cds_config:
    resource_api_version: V3
    ads: {}
  lds_config:
    resource_api_version: V3
    ads: {}
static_resources:
  listeners:
  - name: default_listener
    address:
      socket_address:
        address: 0.0.0.0
        port_value: 15006
    use_original_dst: true
    filter_chains:
    - filters:
      - name: envoy.filters.network.tcp_proxy
        typed_config:
          "@type": type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy
          stat_prefix: tcp
          cluster: origin_cluster
  clusters:
  - name: xds_cluster
    connect_timeout: 2s
    type: STRICT_DNS
    lb_policy: ROUND_ROBIN
    load_assignment:
      cluster_name: xds_cluster
      endpoints:
      - lb_endpoints:
        - endpoint:
            address:
              socket_address:
                address: %s
                port_value: 9002
    http2_protocol_options: {}
  - name: origin_cluster
    connect_timeout: 5s
    type: ORIGINAL_DST
    lb_policy: CLUSTER_PROVIDED
`

View File

@@ -59,6 +59,7 @@ func CreateOutboundPod(clientset *kubernetes.Clientset, namespace string, traffi
args = append(args, "kubevpn serve -L tcp://:10800 -L tun://:8422?net="+trafficManagerIP+" --debug=true")
t := true
f := false
zero := int64(0)
manager = &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
@@ -68,7 +69,23 @@ func CreateOutboundPod(clientset *kubernetes.Clientset, namespace string, traffi
Annotations: map[string]string{"ref-count": "1"},
},
Spec: v1.PodSpec{
RestartPolicy: v1.RestartPolicyAlways,
Volumes: []v1.Volume{{
Name: config.VolumeEnvoyConfig,
VolumeSource: v1.VolumeSource{
ConfigMap: &v1.ConfigMapVolumeSource{
LocalObjectReference: v1.LocalObjectReference{
Name: config.PodTrafficManager,
},
Items: []v1.KeyToPath{
{
Key: config.Envoy,
Path: "envoy.yaml",
},
},
Optional: &f,
},
},
}},
Containers: []v1.Container{
{
Name: config.SidecarVPN,
@@ -97,7 +114,22 @@ func CreateOutboundPod(clientset *kubernetes.Clientset, namespace string, traffi
},
ImagePullPolicy: v1.PullAlways,
},
//{
// Name: config.SidecarControlPlane,
// Image: config.ImageControlPlane,
// Command: []string{"envoy-xds-server"},
// Args: []string{"--watchDirectoryFileName", "/etc/envoy/envoy.yaml"},
// VolumeMounts: []v1.VolumeMount{
// {
// Name: config.VolumeEnvoyConfig,
// ReadOnly: true,
// MountPath: "/etc/envoy",
// },
// },
// ImagePullPolicy: v1.PullAlways,
//},
},
RestartPolicy: v1.RestartPolicyAlways,
PriorityClassName: "system-cluster-critical",
},
}

View File

@@ -15,12 +15,9 @@ import (
dockerterm "github.com/moby/term"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/wencaiwulue/kubevpn/pkg/controlplane/apis/v1alpha1"
"golang.org/x/net/icmp"
"golang.org/x/net/ipv4"
"gopkg.in/yaml.v2"
"io"
"io/ioutil"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@@ -334,32 +331,6 @@ func Ping(targetIP string) (bool, error) {
}
}
// ParseYaml reads the YAML envoy config at the given file path and
// unmarshals it into a typed *v1alpha1.EnvoyConfig.
//
// Returns a wrapped error when the file cannot be read, or the raw
// unmarshal error when the content is not valid YAML for the type.
func ParseYaml(file string) (*v1alpha1.EnvoyConfig, error) {
	var config v1alpha1.EnvoyConfig
	yamlFile, err := ioutil.ReadFile(file)
	if err != nil {
		// wrap with %w so callers can unwrap the underlying I/O error;
		// error strings are lowercase with no trailing newline
		return nil, fmt.Errorf("reading YAML file %q: %w", file, err)
	}
	if err = yaml.Unmarshal(yamlFile, &config); err != nil {
		return nil, err
	}
	return &config, nil
}
// ParseYamlBytes unmarshals raw YAML bytes into a typed *v1alpha1.EnvoyConfig.
func ParseYamlBytes(file []byte) (*v1alpha1.EnvoyConfig, error) {
	config := new(v1alpha1.EnvoyConfig)
	if err := yaml.Unmarshal(file, config); err != nil {
		return nil, err
	}
	return config, nil
}
func RolloutStatus(factory cmdutil.Factory, namespace, workloads string, timeout time.Duration) error {
client, _ := factory.DynamicClient()
r := factory.NewBuilder().
@@ -489,12 +460,14 @@ func Heartbeats(ctx context.Context) {
}
func WaitPortToBeFree(port int, timeout time.Duration) error {
log.Infoln(fmt.Sprintf("wait port %v to be free...", port))
for {
select {
case <-time.Tick(timeout):
return fmt.Errorf("wait port %v to be free timeout", port)
case <-time.Tick(time.Second * 1):
case <-time.Tick(time.Second * 2):
if !IsPortListening(port) {
log.Infoln(fmt.Sprintf("port %v are free", port))
return nil
}
}