optimize envoy control plane

This commit is contained in:
p_caiwfeng
2021-12-27 22:12:48 +08:00
committed by wencaiwulue
parent b8eb1a2eb7
commit 52eade0df2
9 changed files with 264 additions and 38 deletions

View File

@@ -19,6 +19,7 @@ func init() {
connectCmd.Flags().StringVarP(&connect.Namespace, "namespace", "n", "", "namespace")
connectCmd.PersistentFlags().StringArrayVar(&connect.Workloads, "workloads", []string{}, "workloads, like: services/tomcat, deployment/nginx, replicaset/tomcat...")
connectCmd.Flags().StringVar((*string)(&connect.Mode), "mode", string(pkg.Reverse), "default mode is reverse")
connectCmd.Flags().StringToStringVarP(&connect.Headers, "headers", "H", map[string]string{}, "headers, format is k=v, like: k1=v1,k2=v2")
connectCmd.Flags().BoolVar(&util.Debug, "debug", false, "true/false")
RootCmd.AddCommand(connectCmd)
}

1
go.mod
View File

@@ -27,6 +27,7 @@ require (
golang.zx2c4.com/wireguard/windows v0.4.10
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/grpc v1.33.2
google.golang.org/protobuf v1.26.0 // indirect
gopkg.in/yaml.v2 v2.4.0
k8s.io/api v0.21.2
k8s.io/apimachinery v0.21.2

View File

@@ -30,6 +30,7 @@ type ConnectOptions struct {
KubeconfigPath string
Namespace string
Mode Mode
Headers map[string]string
Workloads []string
clientset *kubernetes.Clientset
restclient *rest.RESTClient
@@ -78,6 +79,7 @@ func (c *ConnectOptions) createRemoteInboundPod() (err error) {
c.Namespace,
finalWorkload,
config,
c.Headers,
)
} else {
err = CreateInboundPod(

View File

@@ -5,8 +5,10 @@ import (
"fmt"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/wencaiwulue/kubevpn/pkg/envoy/apis/v1alpha1"
"github.com/wencaiwulue/kubevpn/pkg/mesh"
"github.com/wencaiwulue/kubevpn/util"
"gopkg.in/yaml.v2"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -14,6 +16,7 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/retry"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
"strconv"
"strings"
)
@@ -22,7 +25,7 @@ import (
// patch a sidecar, using iptables to do port-forward let this pod decide should go to 233.254.254.100 or request to 127.0.0.1
// TODO if using envoy needs to create another pod, if using diy proxy, using one container is enough
// TODO support multiple port
func PatchSidecar(factory cmdutil.Factory, clientset *kubernetes.Clientset, namespace, workloads string, c PodRouteConfig) error {
func PatchSidecar(factory cmdutil.Factory, clientset *kubernetes.Clientset, namespace, workloads string, c PodRouteConfig, headers map[string]string) error {
resourceTuple, parsed, err2 := util.SplitResourceTypeName(workloads)
if !parsed || err2 != nil {
return errors.New("not need")
@@ -57,7 +60,12 @@ func PatchSidecar(factory cmdutil.Factory, clientset *kubernetes.Clientset, name
delete(labels, "pod-template-hash")
name := fmt.Sprintf("%s-%s", namespace, resourceTuple.Name)
createEnvoyConfigMapIfNeeded(factory, clientset, namespace, workloads, c.LocalTunIP)
createEnvoyConfigMapIfNeeded(factory, clientset, namespace, workloads)
err = addEnvoyConfig(factory, clientset, namespace, workloads, c.LocalTunIP, headers)
if err != nil {
log.Warnln(err)
return err
}
inject.Volumes = append(inject.Volumes, v1.Volume{
Name: "envoy-config",
VolumeSource: v1.VolumeSource{
@@ -146,6 +154,21 @@ func PatchSidecar(factory cmdutil.Factory, clientset *kubernetes.Clientset, name
},
},
})
inject.Containers = append(inject.Containers, v1.Container{
Name: "control-plane",
Image: "naison/envoy-xds-server:latest",
Command: []string{"envoy-xds-server"},
Args: []string{"--watchDirectoryFileName", "/etc/envoy-config/envoy-config.yaml"},
VolumeMounts: []v1.VolumeMount{
{
Name: "envoy-config",
ReadOnly: false,
MountPath: "/etc/envoy-config/",
//SubPath: "envoy.yaml",
},
},
ImagePullPolicy: v1.PullAlways,
})
if err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
_, err = clientset.CoreV1().Pods(namespace).Create(context.TODO(), &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
@@ -225,8 +248,96 @@ admin:
port_value: 9003
`
var ss = `name: config-test
spec:
listeners:
- name: listener1
address: 127.0.0.1
port: 15006
routes:
- name: route-0
clusters:
- cluster-0
clusters:
- name: cluster-0
endpoints:
- address: 127.0.0.1
port: %s
`
func createEnvoyConfigMapIfNeeded(factory cmdutil.Factory, clientset *kubernetes.Clientset, namespace, workloads, podIp string) {
func addEnvoyConfig(factory cmdutil.Factory,
clientset *kubernetes.Clientset,
namespace,
workloads,
localTUNIP string,
headers map[string]string,
) error {
resourceTuple, parsed, err := util.SplitResourceTypeName(workloads)
if !parsed || err != nil {
return errors.New("parse resource error")
}
object, err := util.GetUnstructuredObject(factory, namespace, workloads)
if err != nil {
return err
}
asSelector, _ := metav1.LabelSelectorAsSelector(util.GetLabelSelector(object.Object))
serviceList, _ := clientset.CoreV1().Services(namespace).List(context.Background(), metav1.ListOptions{
LabelSelector: asSelector.String(),
})
if len(serviceList.Items) == 0 {
return errors.New("service list is empty")
}
port := serviceList.Items[0].Spec.Ports[0].Port
name := fmt.Sprintf("%s-%s", namespace, resourceTuple.Name)
get, err := clientset.CoreV1().ConfigMaps(namespace).Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
return err
}
s2, ok := get.Data["envoy-config.yaml"]
if !ok {
return errors.New("can not found value for key: envoy-config.yaml")
}
envoyConfig, err := util.ParseYamlBytes([]byte(s2))
if err != nil {
return err
}
var headersMatch []v1alpha1.HeaderMatch
for k, v := range headers {
headersMatch = append(headersMatch, v1alpha1.HeaderMatch{
Key: k,
Value: v,
})
}
// move router to front
i := len(envoyConfig.Listeners[0].Routes)
index := strconv.Itoa(i)
envoyConfig.Listeners[0].Routes = append(envoyConfig.Listeners[0].Routes, v1alpha1.Route{
Name: "route-" + index,
Headers: headersMatch,
ClusterNames: []string{"cluster-" + index},
})
// swap last element and the last second element
temp := envoyConfig.Listeners[0].Routes[i-1]
envoyConfig.Listeners[0].Routes[i-1] = envoyConfig.Listeners[0].Routes[i]
envoyConfig.Listeners[0].Routes[i] = temp
envoyConfig.Clusters = append(envoyConfig.Clusters, v1alpha1.Cluster{
Name: "cluster-" + index,
Endpoints: []v1alpha1.Endpoint{{
Address: localTUNIP,
Port: uint32(port),
}},
})
marshal, err := yaml.Marshal(envoyConfig)
if err != nil {
return err
}
get.Data["envoy-config.yaml"] = string(marshal)
_, err = clientset.CoreV1().ConfigMaps(namespace).Update(context.TODO(), get, metav1.UpdateOptions{})
return err
}
func createEnvoyConfigMapIfNeeded(factory cmdutil.Factory, clientset *kubernetes.Clientset, namespace, workloads string) {
resourceTuple, parsed, err2 := util.SplitResourceTypeName(workloads)
if !parsed || err2 != nil {
return
@@ -243,7 +354,7 @@ func createEnvoyConfigMapIfNeeded(factory cmdutil.Factory, clientset *kubernetes
if len(serviceList.Items) == 0 {
return
}
//port := serviceList.Items[0].Spec.Ports[0]
port := serviceList.Items[0].Spec.Ports[0]
configMap := v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: name,
@@ -251,8 +362,8 @@ func createEnvoyConfigMapIfNeeded(factory cmdutil.Factory, clientset *kubernetes
Labels: map[string]string{"kubevpn": "kubevpn"},
},
Data: map[string]string{
"base-envoy.yaml": fmt.Sprintf(s, /*"kubevpn", podIp, port.TargetPort.String(), port.TargetPort.String()*/),
"envoy-config.yaml": "",
"base-envoy.yaml": fmt.Sprintf(s /*"kubevpn", podIp, port.TargetPort.String(), port.TargetPort.String()*/),
"envoy-config.yaml": fmt.Sprintf(ss, port.TargetPort.String()),
},
}
_ = clientset.CoreV1().ConfigMaps(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{})

View File

@@ -1,25 +0,0 @@
package processor
import (
"fmt"
"github.com/wencaiwulue/kubevpn/pkg/envoy/apis/v1alpha1"
"gopkg.in/yaml.v2"
"io/ioutil"
)
// parseYaml takes in a yaml envoy config and returns a typed version
func parseYaml(file string) (*v1alpha1.EnvoyConfig, error) {
var config v1alpha1.EnvoyConfig
yamlFile, err := ioutil.ReadFile(file)
if err != nil {
return nil, fmt.Errorf("Error reading YAML file: %s\n", err)
}
err = yaml.Unmarshal(yamlFile, &config)
if err != nil {
return nil, err
}
return &config, nil
}

View File

@@ -0,0 +1,110 @@
/*
* Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved.
* This source code is licensed under the Apache License Version 2.0.
*/
package processor
import (
"fmt"
"github.com/wencaiwulue/kubevpn/pkg/envoy/apis/v1alpha1"
"github.com/wencaiwulue/kubevpn/util"
"gopkg.in/yaml.v2"
"testing"
)
func TestName(t *testing.T) {
var config = v1alpha1.EnvoyConfig{
Name: "config-test",
Spec: v1alpha1.Spec{
Listeners: []v1alpha1.Listener{
{
Name: "listener1",
Address: "127.0.0.1",
Port: 15006,
Routes: []v1alpha1.Route{
{
Name: "route-0",
Headers: []v1alpha1.HeaderMatch{
{
Key: "a",
Value: "aa",
},
{
Key: "b",
Value: "bb",
},
},
ClusterNames: []string{"cluster0"},
}, {
Name: "route-1",
Headers: []v1alpha1.HeaderMatch{
{
Key: "c",
Value: "cc",
},
{
Key: "d",
Value: "dd",
},
},
ClusterNames: []string{"cluster-1"},
},
},
},
},
Clusters: []v1alpha1.Cluster{
{
Name: "cluster-0",
Endpoints: []v1alpha1.Endpoint{
{
Address: "127.0.0.1",
Port: 9101,
},
{
Address: "127.0.0.1",
Port: 9102,
},
},
}, {
Name: "cluster-1",
Endpoints: []v1alpha1.Endpoint{
{
Address: "127.0.0.1",
Port: 9103,
},
{
Address: "127.0.0.1",
Port: 9104,
},
},
},
},
},
}
marshal, _ := yaml.Marshal(config)
fmt.Println(string(marshal))
}
func TestName1(t *testing.T) {
parseYaml, err := util.ParseYamlBytes([]byte(sss))
fmt.Println(err)
fmt.Println(parseYaml)
}
var sss = `name: config-test
spec:
listeners:
- name: listener1
address: 127.0.0.1
port: 15006
routes:
- name: route-0
clusters:
- cluster-0
clusters:
- name: cluster-0
endpoints:
- address: 127.0.0.1
port: 9080
`

View File

@@ -7,6 +7,7 @@ import (
"github.com/wencaiwulue/kubevpn/pkg/envoy/internal/resources"
"github.com/wencaiwulue/kubevpn/pkg/envoy/internal/watcher"
"github.com/wencaiwulue/kubevpn/pkg/envoy/internal/xdscache"
"github.com/wencaiwulue/kubevpn/util"
"math"
"math/rand"
"os"
@@ -58,7 +59,7 @@ func (p *Processor) newSnapshotVersion() string {
func (p *Processor) ProcessFile(file watcher.NotifyMessage) {
// Parse file into object
envoyConfig, err := parseYaml(file.FilePath)
envoyConfig, err := util.ParseYaml(file.FilePath)
if err != nil {
p.Errorf("error parsing yaml file: %+v", err)
return

View File

@@ -1,6 +1,7 @@
package resources
import (
"google.golang.org/protobuf/types/known/durationpb"
"time"
"github.com/golang/protobuf/ptypes"
@@ -15,15 +16,10 @@ import (
"github.com/envoyproxy/go-control-plane/pkg/wellknown"
)
const (
UpstreamHost = "www.envoyproxy.io"
UpstreamPort = 80
)
func MakeCluster(clusterName string) *cluster.Cluster {
return &cluster.Cluster{
Name: clusterName,
ConnectTimeout: ptypes.DurationProto(5 * time.Second),
ConnectTimeout: durationpb.New(5 * time.Second),
ClusterDiscoveryType: &cluster.Cluster_Type{Type: cluster.Cluster_EDS},
LbPolicy: cluster.Cluster_ROUND_ROBIN,
//LoadAssignment: makeEndpoint(clusterName, UpstreamHost),

View File

@@ -8,9 +8,12 @@ import (
dockerterm "github.com/moby/term"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/wencaiwulue/kubevpn/pkg/envoy/apis/v1alpha1"
"golang.org/x/net/icmp"
"golang.org/x/net/ipv4"
"gopkg.in/yaml.v2"
"io"
"io/ioutil"
autoscalingv1 "k8s.io/api/autoscaling/v1"
"k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
@@ -390,3 +393,29 @@ func Ping(targetIP string) (bool, error) {
return false, nil
}
}
// ParseYaml takes in a yaml envoy config and returns a typed version
func ParseYaml(file string) (*v1alpha1.EnvoyConfig, error) {
var config v1alpha1.EnvoyConfig
yamlFile, err := ioutil.ReadFile(file)
if err != nil {
return nil, fmt.Errorf("Error reading YAML file: %s\n", err)
}
err = yaml.Unmarshal(yamlFile, &config)
if err != nil {
return nil, err
}
return &config, nil
}
func ParseYamlBytes(file []byte) (*v1alpha1.EnvoyConfig, error) {
var config v1alpha1.EnvoyConfig
err := yaml.Unmarshal(file, &config)
if err != nil {
return nil, err
}
return &config, nil
}