diff --git a/cmd/kubevpn/cmds/connect.go b/cmd/kubevpn/cmds/connect.go index 63a524d7..e523c0fd 100644 --- a/cmd/kubevpn/cmds/connect.go +++ b/cmd/kubevpn/cmds/connect.go @@ -19,6 +19,7 @@ func init() { connectCmd.Flags().StringVarP(&connect.Namespace, "namespace", "n", "", "namespace") connectCmd.PersistentFlags().StringArrayVar(&connect.Workloads, "workloads", []string{}, "workloads, like: services/tomcat, deployment/nginx, replicaset/tomcat...") connectCmd.Flags().StringVar((*string)(&connect.Mode), "mode", string(pkg.Reverse), "default mode is reverse") + connectCmd.Flags().StringToStringVarP(&connect.Headers, "headers", "H", map[string]string{}, "headers, format is k=v, like: k1=v1,k2=v2") connectCmd.Flags().BoolVar(&util.Debug, "debug", false, "true/false") RootCmd.AddCommand(connectCmd) } diff --git a/go.mod b/go.mod index 6f08c00b..8e748687 100644 --- a/go.mod +++ b/go.mod @@ -27,6 +27,7 @@ require ( golang.zx2c4.com/wireguard/windows v0.4.10 google.golang.org/appengine v1.6.7 // indirect google.golang.org/grpc v1.33.2 + google.golang.org/protobuf v1.26.0 // indirect gopkg.in/yaml.v2 v2.4.0 k8s.io/api v0.21.2 k8s.io/apimachinery v0.21.2 diff --git a/pkg/connect.go b/pkg/connect.go index 001b0432..3cabe6b1 100644 --- a/pkg/connect.go +++ b/pkg/connect.go @@ -30,6 +30,7 @@ type ConnectOptions struct { KubeconfigPath string Namespace string Mode Mode + Headers map[string]string Workloads []string clientset *kubernetes.Clientset restclient *rest.RESTClient @@ -78,6 +79,7 @@ func (c *ConnectOptions) createRemoteInboundPod() (err error) { c.Namespace, finalWorkload, config, + c.Headers, ) } else { err = CreateInboundPod( diff --git a/pkg/envoy.go b/pkg/envoy.go index a92cb3cd..a2860b8b 100644 --- a/pkg/envoy.go +++ b/pkg/envoy.go @@ -5,8 +5,10 @@ import ( "fmt" "github.com/pkg/errors" log "github.com/sirupsen/logrus" + "github.com/wencaiwulue/kubevpn/pkg/envoy/apis/v1alpha1" "github.com/wencaiwulue/kubevpn/pkg/mesh" "github.com/wencaiwulue/kubevpn/util" + "gopkg.in/yaml.v2" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -14,6 +16,7 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/util/retry" cmdutil "k8s.io/kubectl/pkg/cmd/util" + "strconv" "strings" ) @@ -22,7 +25,7 @@ import ( // patch a sidecar, using iptables to do port-forward let this pod decide should go to 233.254.254.100 or request to 127.0.0.1 // TODO if using envoy needs to create another pod, if using diy proxy, using one container is enough // TODO support multiple port -func PatchSidecar(factory cmdutil.Factory, clientset *kubernetes.Clientset, namespace, workloads string, c PodRouteConfig) error { +func PatchSidecar(factory cmdutil.Factory, clientset *kubernetes.Clientset, namespace, workloads string, c PodRouteConfig, headers map[string]string) error { resourceTuple, parsed, err2 := util.SplitResourceTypeName(workloads) if !parsed || err2 != nil { return errors.New("not need") @@ -57,7 +60,12 @@ func PatchSidecar(factory cmdutil.Factory, clientset *kubernetes.Clientset, name delete(labels, "pod-template-hash") name := fmt.Sprintf("%s-%s", namespace, resourceTuple.Name) - createEnvoyConfigMapIfNeeded(factory, clientset, namespace, workloads, c.LocalTunIP) + createEnvoyConfigMapIfNeeded(factory, clientset, namespace, workloads) + err = addEnvoyConfig(factory, clientset, namespace, workloads, c.LocalTunIP, headers) + if err != nil { + log.Warnln(err) + return err + } inject.Volumes = append(inject.Volumes, v1.Volume{ Name: "envoy-config", VolumeSource: v1.VolumeSource{ @@ -146,6 +154,21 @@ func PatchSidecar(factory cmdutil.Factory, clientset *kubernetes.Clientset, name }, }, }) + inject.Containers = append(inject.Containers, v1.Container{ + Name: "control-plane", + Image: "naison/envoy-xds-server:latest", + Command: []string{"envoy-xds-server"}, + Args: []string{"--watchDirectoryFileName", "/etc/envoy-config/envoy-config.yaml"}, + VolumeMounts: []v1.VolumeMount{ + { + Name: "envoy-config", + ReadOnly: false, + MountPath: "/etc/envoy-config/", + //SubPath: "envoy.yaml", + }, + }, + ImagePullPolicy: v1.PullAlways, + }) if err = retry.RetryOnConflict(retry.DefaultBackoff, func() error { _, err = clientset.CoreV1().Pods(namespace).Create(context.TODO(), &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -225,8 +248,96 @@ admin: port_value: 9003 ` +var ss = `name: config-test +spec: + listeners: + - name: listener1 + address: 127.0.0.1 + port: 15006 + routes: + - name: route-0 + clusters: + - cluster-0 + clusters: + - name: cluster-0 + endpoints: + - address: 127.0.0.1 + port: %s +` -func createEnvoyConfigMapIfNeeded(factory cmdutil.Factory, clientset *kubernetes.Clientset, namespace, workloads, podIp string) { +func addEnvoyConfig(factory cmdutil.Factory, + clientset *kubernetes.Clientset, + namespace, + workloads, + localTUNIP string, + headers map[string]string, +) error { + resourceTuple, parsed, err := util.SplitResourceTypeName(workloads) + if !parsed || err != nil { + return errors.New("parse resource error") + } + object, err := util.GetUnstructuredObject(factory, namespace, workloads) + if err != nil { + return err + } + asSelector, _ := metav1.LabelSelectorAsSelector(util.GetLabelSelector(object.Object)) + serviceList, _ := clientset.CoreV1().Services(namespace).List(context.Background(), metav1.ListOptions{ + LabelSelector: asSelector.String(), + }) + if len(serviceList.Items) == 0 { + return errors.New("service list is empty") + } + port := serviceList.Items[0].Spec.Ports[0].Port + name := fmt.Sprintf("%s-%s", namespace, resourceTuple.Name) + get, err := clientset.CoreV1().ConfigMaps(namespace).Get(context.TODO(), name, metav1.GetOptions{}) + if err != nil { + return err + } + s2, ok := get.Data["envoy-config.yaml"] + if !ok { + return errors.New("can not found value for key: envoy-config.yaml") + } + envoyConfig, err := util.ParseYamlBytes([]byte(s2)) + if err != nil { + return err + } + var headersMatch []v1alpha1.HeaderMatch + for k, v := range headers { + headersMatch = append(headersMatch, v1alpha1.HeaderMatch{ + Key: k, + Value: v, + }) + } + // move router to front + i := len(envoyConfig.Listeners[0].Routes) + index := strconv.Itoa(i) + envoyConfig.Listeners[0].Routes = append(envoyConfig.Listeners[0].Routes, v1alpha1.Route{ + Name: "route-" + index, + Headers: headersMatch, + ClusterNames: []string{"cluster-" + index}, + }) + // swap last element and the last second element + temp := envoyConfig.Listeners[0].Routes[i-1] + envoyConfig.Listeners[0].Routes[i-1] = envoyConfig.Listeners[0].Routes[i] + envoyConfig.Listeners[0].Routes[i] = temp + + envoyConfig.Clusters = append(envoyConfig.Clusters, v1alpha1.Cluster{ + Name: "cluster-" + index, + Endpoints: []v1alpha1.Endpoint{{ + Address: localTUNIP, + Port: uint32(port), + }}, + }) + marshal, err := yaml.Marshal(envoyConfig) + if err != nil { + return err + } + get.Data["envoy-config.yaml"] = string(marshal) + _, err = clientset.CoreV1().ConfigMaps(namespace).Update(context.TODO(), get, metav1.UpdateOptions{}) + return err +} + +func createEnvoyConfigMapIfNeeded(factory cmdutil.Factory, clientset *kubernetes.Clientset, namespace, workloads string) { resourceTuple, parsed, err2 := util.SplitResourceTypeName(workloads) if !parsed || err2 != nil { return @@ -243,7 +354,7 @@ func createEnvoyConfigMapIfNeeded(factory cmdutil.Factory, clientset *kubernetes if len(serviceList.Items) == 0 { return } - //port := serviceList.Items[0].Spec.Ports[0] + port := serviceList.Items[0].Spec.Ports[0] configMap := v1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ Name: name, @@ -251,8 +362,8 @@ func createEnvoyConfigMapIfNeeded(factory cmdutil.Factory, clientset *kubernetes Labels: map[string]string{"kubevpn": "kubevpn"}, }, Data: map[string]string{ - "base-envoy.yaml": fmt.Sprintf(s, /*"kubevpn", podIp, port.TargetPort.String(), port.TargetPort.String()*/), - "envoy-config.yaml": "", + "base-envoy.yaml": fmt.Sprintf(s /*"kubevpn", podIp, port.TargetPort.String(), port.TargetPort.String()*/), + "envoy-config.yaml": fmt.Sprintf(ss, port.TargetPort.String()), }, } _ = clientset.CoreV1().ConfigMaps(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{}) diff --git a/pkg/envoy/internal/processor/parser.go b/pkg/envoy/internal/processor/parser.go deleted file mode 100644 index a9b9ffe4..00000000 --- a/pkg/envoy/internal/processor/parser.go +++ /dev/null @@ -1,25 +0,0 @@ -package processor - -import ( - "fmt" - "github.com/wencaiwulue/kubevpn/pkg/envoy/apis/v1alpha1" - "gopkg.in/yaml.v2" - "io/ioutil" -) - -// parseYaml takes in a yaml envoy config and returns a typed version -func parseYaml(file string) (*v1alpha1.EnvoyConfig, error) { - var config v1alpha1.EnvoyConfig - - yamlFile, err := ioutil.ReadFile(file) - if err != nil { - return nil, fmt.Errorf("Error reading YAML file: %s\n", err) - } - - err = yaml.Unmarshal(yamlFile, &config) - if err != nil { - return nil, err - } - - return &config, nil -} diff --git a/pkg/envoy/internal/processor/parser_test.go b/pkg/envoy/internal/processor/parser_test.go new file mode 100644 index 00000000..f9bc564a --- /dev/null +++ b/pkg/envoy/internal/processor/parser_test.go @@ -0,0 +1,110 @@ +/* +* Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. +* This source code is licensed under the Apache License Version 2.0. + */ + +package processor + +import ( + "fmt" + "github.com/wencaiwulue/kubevpn/pkg/envoy/apis/v1alpha1" + "github.com/wencaiwulue/kubevpn/util" + "gopkg.in/yaml.v2" + "testing" +) + +func TestName(t *testing.T) { + var config = v1alpha1.EnvoyConfig{ + Name: "config-test", + Spec: v1alpha1.Spec{ + Listeners: []v1alpha1.Listener{ + { + Name: "listener1", + Address: "127.0.0.1", + Port: 15006, + Routes: []v1alpha1.Route{ + { + Name: "route-0", + Headers: []v1alpha1.HeaderMatch{ + { + Key: "a", + Value: "aa", + }, + { + Key: "b", + Value: "bb", + }, + }, + ClusterNames: []string{"cluster0"}, + }, { + Name: "route-1", + Headers: []v1alpha1.HeaderMatch{ + { + Key: "c", + Value: "cc", + }, + { + Key: "d", + Value: "dd", + }, + }, + ClusterNames: []string{"cluster-1"}, + }, + }, + }, + }, + Clusters: []v1alpha1.Cluster{ + { + Name: "cluster-0", + Endpoints: []v1alpha1.Endpoint{ + { + Address: "127.0.0.1", + Port: 9101, + }, + { + Address: "127.0.0.1", + Port: 9102, + }, + }, + }, { + Name: "cluster-1", + Endpoints: []v1alpha1.Endpoint{ + { + Address: "127.0.0.1", + Port: 9103, + }, + { + Address: "127.0.0.1", + Port: 9104, + }, + }, + }, + }, + }, + } + marshal, _ := yaml.Marshal(config) + fmt.Println(string(marshal)) +} + +func TestName1(t *testing.T) { + parseYaml, err := util.ParseYamlBytes([]byte(sss)) + fmt.Println(err) + fmt.Println(parseYaml) +} + +var sss = `name: config-test +spec: + listeners: + - name: listener1 + address: 127.0.0.1 + port: 15006 + routes: + - name: route-0 + clusters: + - cluster-0 + clusters: + - name: cluster-0 + endpoints: + - address: 127.0.0.1 + port: 9080 +` diff --git a/pkg/envoy/internal/processor/processor.go b/pkg/envoy/internal/processor/processor.go index 7b18a9b1..6a111309 100644 --- a/pkg/envoy/internal/processor/processor.go +++ b/pkg/envoy/internal/processor/processor.go @@ -7,6 +7,7 @@ import ( "github.com/wencaiwulue/kubevpn/pkg/envoy/internal/resources" "github.com/wencaiwulue/kubevpn/pkg/envoy/internal/watcher" "github.com/wencaiwulue/kubevpn/pkg/envoy/internal/xdscache" + "github.com/wencaiwulue/kubevpn/util" "math" "math/rand" "os" @@ -58,7 +59,7 @@ func (p *Processor) newSnapshotVersion() string { func (p *Processor) ProcessFile(file watcher.NotifyMessage) { // Parse file into object - envoyConfig, err := parseYaml(file.FilePath) + envoyConfig, err := util.ParseYaml(file.FilePath) if err != nil { p.Errorf("error parsing yaml file: %+v", err) return diff --git a/pkg/envoy/internal/resources/resource.go b/pkg/envoy/internal/resources/resource.go index 5c9b66ba..2e50fcc4 100644 --- a/pkg/envoy/internal/resources/resource.go +++ b/pkg/envoy/internal/resources/resource.go @@ -1,6 +1,7 @@ package resources import ( + "google.golang.org/protobuf/types/known/durationpb" "time" "github.com/golang/protobuf/ptypes" @@ -15,15 +16,10 @@ import ( "github.com/envoyproxy/go-control-plane/pkg/wellknown" ) -const ( - UpstreamHost = "www.envoyproxy.io" - UpstreamPort = 80 -) - func MakeCluster(clusterName string) *cluster.Cluster { return &cluster.Cluster{ Name: clusterName, - ConnectTimeout: ptypes.DurationProto(5 * time.Second), + ConnectTimeout: durationpb.New(5 * time.Second), ClusterDiscoveryType: &cluster.Cluster_Type{Type: cluster.Cluster_EDS}, LbPolicy: cluster.Cluster_ROUND_ROBIN, //LoadAssignment: makeEndpoint(clusterName, UpstreamHost), diff --git a/util/util.go b/util/util.go index cb683933..825cf8c3 100644 --- a/util/util.go +++ b/util/util.go @@ -8,9 +8,12 @@ import ( dockerterm "github.com/moby/term" "github.com/pkg/errors" log "github.com/sirupsen/logrus" + "github.com/wencaiwulue/kubevpn/pkg/envoy/apis/v1alpha1" "golang.org/x/net/icmp" "golang.org/x/net/ipv4" + "gopkg.in/yaml.v2" "io" + "io/ioutil" autoscalingv1 "k8s.io/api/autoscaling/v1" "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" @@ -390,3 +393,29 @@ func Ping(targetIP string) (bool, error) { return false, nil } } + +// ParseYaml takes in a yaml envoy config and returns a typed version +func ParseYaml(file string) (*v1alpha1.EnvoyConfig, error) { + var config v1alpha1.EnvoyConfig + + yamlFile, err := ioutil.ReadFile(file) + if err != nil { + return nil, fmt.Errorf("Error reading YAML file: %s\n", err) + } + + err = yaml.Unmarshal(yamlFile, &config) + if err != nil { + return nil, err + } + + return &config, nil +} + +func ParseYamlBytes(file []byte) (*v1alpha1.EnvoyConfig, error) { + var config v1alpha1.EnvoyConfig + err := yaml.Unmarshal(file, &config) + if err != nil { + return nil, err + } + return &config, nil +}