feat: aws fargate mode works

This commit is contained in:
fengcaiwen
2025-01-18 11:10:20 +08:00
parent 2e96247e74
commit 12920650ba
17 changed files with 416 additions and 180 deletions

View File

@@ -32,6 +32,7 @@ import (
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/wrapperspb"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"github.com/wencaiwulue/kubevpn/v2/pkg/util"
)
@@ -50,10 +51,9 @@ type ContainerPort struct {
Name string `json:"name,omitempty"`
// Number of port to expose on the host.
// If specified, this must be a valid port number, 0 < x < 65536.
// If HostNetwork is specified, this must match ContainerPort.
// Most containers do not need this.
// envoy listener port, if is not 0, means fargate mode
// +optional
InnerPort int32 `json:"hostPort,omitempty"`
EnvoyListenerPort int32 `json:"envoyListenerPort,omitempty"`
// Number of port to expose on the pod's IP address.
// This must be a valid port number, 0 < x < 65536.
ContainerPort int32 `json:"containerPort"`
@@ -68,10 +68,10 @@ func ConvertContainerPort(ports ...corev1.ContainerPort) []ContainerPort {
var result []ContainerPort
for _, port := range ports {
result = append(result, ContainerPort{
Name: port.Name,
InnerPort: 0,
ContainerPort: port.ContainerPort,
Protocol: port.Protocol,
Name: port.Name,
EnvoyListenerPort: 0,
ContainerPort: port.ContainerPort,
Protocol: port.Protocol,
})
}
return result
@@ -81,12 +81,12 @@ type Rule struct {
Headers map[string]string
LocalTunIPv4 string
LocalTunIPv6 string
// for no privileged mode, don't have cap NET_ADMIN and privileged: true. so we can not use OSI layer 3 proxy
// port -> port1:port2
// port1 for envoy forward to localhost:port1
// port2 for local pc listen localhost:port2
// use ssh reverse tunnel, envoy rule endpoint localhost:port1 will forward to local pc localhost:port2
// port1 is required and port2 is optional
// for no privileged mode (AWS Fargate mode), don't have cap NET_ADMIN and privileged: true. so we can not use OSI layer 3 proxy
// containerPort -> envoyRulePort:localPort
// envoyRulePort for envoy forward to localhost:envoyRulePort
// localPort for local pc listen localhost:localPort
// use ssh reverse tunnel, envoy rule endpoint localhost:envoyRulePort will forward to local pc localhost:localPort
// localPort is required and envoyRulePort is optional
PortMap map[int32]string
}
@@ -98,11 +98,11 @@ func (a *Virtual) To(enableIPv6 bool) (
) {
//clusters = append(clusters, OriginCluster())
for _, port := range a.Ports {
isFargateMode := port.InnerPort != 0
isFargateMode := port.EnvoyListenerPort != 0
listenerName := fmt.Sprintf("%s_%v_%s", a.Uid, util.If(isFargateMode, port.InnerPort, port.ContainerPort), port.Protocol)
listenerName := fmt.Sprintf("%s_%v_%s", a.Uid, util.If(isFargateMode, port.EnvoyListenerPort, port.ContainerPort), port.Protocol)
routeName := listenerName
listeners = append(listeners, ToListener(listenerName, routeName, util.If(isFargateMode, port.InnerPort, port.ContainerPort), port.Protocol, isFargateMode))
listeners = append(listeners, ToListener(listenerName, routeName, util.If(isFargateMode, port.EnvoyListenerPort, port.ContainerPort), port.Protocol, isFargateMode))
var rr []*route.Route
for _, rule := range a.Rules {
@@ -120,23 +120,32 @@ func (a *Virtual) To(enableIPv6 bool) (
logrus.Errorf("fargate mode port should have two pair")
}
}
p, _ := strconv.Atoi(ports)
envoyRulePort, _ := strconv.Atoi(ports)
for _, ip := range ips {
clusterName := fmt.Sprintf("%s_%v", ip, p)
clusterName := fmt.Sprintf("%s_%v", ip, envoyRulePort)
clusters = append(clusters, ToCluster(clusterName))
endpoints = append(endpoints, ToEndPoint(clusterName, ip, int32(p)))
endpoints = append(endpoints, ToEndPoint(clusterName, ip, int32(envoyRulePort)))
rr = append(rr, ToRoute(clusterName, rule.Headers))
if isFargateMode {
defaultClusterName := fmt.Sprintf("%s_%v", ip, port.ContainerPort)
clusters = append(clusters, ToCluster(defaultClusterName))
endpoints = append(endpoints, ToEndPoint(defaultClusterName, ip, port.ContainerPort))
rr = append(rr, DefaultRouteToCluster(defaultClusterName))
}
}
}
// if isFargateMode is true, already add default route, no long to add it
if !isFargateMode {
// if isFargateMode is true, we need to add a default route to the container port, because use_original_dst does not work
if isFargateMode {
// all ips should be IPv4 127.0.0.1 and IPv6 ::1
var ips = sets.New[string]()
for _, rule := range a.Rules {
if enableIPv6 {
ips.Insert(rule.LocalTunIPv4, rule.LocalTunIPv6)
} else {
ips.Insert(rule.LocalTunIPv4)
}
}
for _, ip := range ips.UnsortedList() {
defaultClusterName := fmt.Sprintf("%s_%v", ip, port.ContainerPort)
clusters = append(clusters, ToCluster(defaultClusterName))
endpoints = append(endpoints, ToEndPoint(defaultClusterName, ip, port.ContainerPort))
rr = append(rr, DefaultRouteToCluster(defaultClusterName))
}
} else {
rr = append(rr, DefaultRoute())
clusters = append(clusters, OriginCluster())
}

View File

@@ -4,7 +4,7 @@ import (
"context"
"fmt"
"io"
defaultlog "log"
golog "log"
"time"
log "github.com/sirupsen/logrus"
@@ -63,7 +63,7 @@ func (svr *Server) Connect(req *rpc.ConnectRequest, resp rpc.Daemon_ConnectServe
var sshConf = ssh.ParseSshFromRPC(req.SshJump)
var transferImage = req.TransferImage
defaultlog.Default().SetOutput(io.Discard)
golog.Default().SetOutput(io.Discard)
if transferImage {
err := ssh.TransferImage(ctx, sshConf, config.OriginImage, req.Image, out)
if err != nil {

View File

@@ -8,6 +8,7 @@ import (
log "github.com/sirupsen/logrus"
"github.com/wencaiwulue/kubevpn/v2/pkg/config"
"github.com/wencaiwulue/kubevpn/v2/pkg/controlplane"
"github.com/wencaiwulue/kubevpn/v2/pkg/daemon/rpc"
"github.com/wencaiwulue/kubevpn/v2/pkg/inject"
"github.com/wencaiwulue/kubevpn/v2/pkg/util"
@@ -32,8 +33,19 @@ func (svr *Server) Leave(req *rpc.LeaveRequest, resp rpc.Daemon_LeaveServer) err
maps := svr.connect.GetClientset().CoreV1().ConfigMaps(namespace)
v4, _ := svr.connect.GetLocalTunIP()
for _, workload := range req.GetWorkloads() {
object, err := util.GetUnstructuredObject(factory, namespace, workload)
if err != nil {
log.Errorf("Failed to get unstructured object: %v", err)
return err
}
svr.connect.PortMapper.Stop(workload)
// add rollback func to remove envoy config
err := inject.UnPatchContainer(factory, maps, namespace, workload, v4)
err = inject.UnPatchContainer(factory, maps, object, func(isFargateMode bool, rule *controlplane.Rule) bool {
if isFargateMode {
return svr.connect.PortMapper.IsMe(util.ConvertWorkloadToUid(workload), rule.Headers)
}
return rule.LocalTunIPv4 == v4
})
if err != nil {
log.Errorf("Leaving workload %s failed: %v", workload, err)
continue

View File

@@ -3,7 +3,6 @@ package action
import (
"context"
"fmt"
"strings"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/yaml"
@@ -12,6 +11,7 @@ import (
"github.com/wencaiwulue/kubevpn/v2/pkg/config"
"github.com/wencaiwulue/kubevpn/v2/pkg/controlplane"
"github.com/wencaiwulue/kubevpn/v2/pkg/daemon/rpc"
"github.com/wencaiwulue/kubevpn/v2/pkg/util"
)
func (svr *Server) List(ctx context.Context, req *rpc.ListRequest) (*rpc.ListResponse, error) {
@@ -30,9 +30,8 @@ func (svr *Server) List(ctx context.Context, req *rpc.ListRequest) (*rpc.ListRes
}
}
for _, virtual := range v {
// deployments.apps.ry-server --> deployments.apps/ry-server
lastIndex := strings.LastIndex(virtual.Uid, ".")
virtual.Uid = virtual.Uid[:lastIndex] + "/" + virtual.Uid[lastIndex+1:]
// deployments.apps.productpage --> deployments.apps/productpage
virtual.Uid = util.ConvertUidToWorkload(virtual.Uid)
}
bytes, err := k8syaml.Marshal(v)
if err != nil {

View File

@@ -129,7 +129,7 @@ func (svr *Server) Proxy(req *rpc.ConnectRequest, resp rpc.Daemon_ProxyServer) (
log.SetOutput(out)
}
svr.connect.Workloads = req.Workloads
svr.connect.Workloads = connect.Workloads
svr.connect.Headers = req.Headers
svr.connect.PortMap = req.PortMap
err = svr.connect.CreateRemoteInboundPod(ctx)

View File

@@ -97,8 +97,13 @@ func gen(connect *handler.ConnectOptions, clone *handler.CloneOptions) ([]*rpc.P
v4, v6 := connect.GetLocalTunIP()
for _, virtual := range v {
// deployments.apps.ry-server --> deployments.apps/ry-server
lastIndex := strings.LastIndex(virtual.Uid, ".")
virtual.Uid = virtual.Uid[:lastIndex] + "/" + virtual.Uid[lastIndex+1:]
virtual.Uid = util.ConvertUidToWorkload(virtual.Uid)
var isFargateMode bool
for _, port := range virtual.Ports {
if port.EnvoyListenerPort != 0 {
isFargateMode = true
}
}
var proxyRule []*rpc.ProxyRule
for _, rule := range virtual.Rules {
@@ -106,7 +111,7 @@ func gen(connect *handler.ConnectOptions, clone *handler.CloneOptions) ([]*rpc.P
Headers: rule.Headers,
LocalTunIPv4: rule.LocalTunIPv4,
LocalTunIPv6: rule.LocalTunIPv6,
CurrentDevice: v4 == rule.LocalTunIPv4 && v6 == rule.LocalTunIPv6,
CurrentDevice: util.If(isFargateMode, connect.PortMapper.IsMe(util.ConvertWorkloadToUid(virtual.Uid), rule.Headers), v4 == rule.LocalTunIPv4 && v6 == rule.LocalTunIPv6),
PortMap: useSecondPort(rule.PortMap),
})
}

View File

@@ -23,13 +23,16 @@ import (
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
pkgruntime "k8s.io/apimachinery/pkg/runtime"
pkgtypes "k8s.io/apimachinery/pkg/types"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/cli-runtime/pkg/resource"
runtimeresource "k8s.io/cli-runtime/pkg/resource"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/kubectl/pkg/cmd/set"
@@ -79,6 +82,7 @@ type ConnectOptions struct {
extraHost []dns.Entry
once sync.Once
tunName string
PortMapper *inject.Mapper
}
func (c *ConnectOptions) Context() context.Context {
@@ -151,15 +155,31 @@ func (c *ConnectOptions) CreateRemoteInboundPod(ctx context.Context) (err error)
LocalTunIPv4: c.localTunIPv4.IP.String(),
LocalTunIPv6: c.localTunIPv6.IP.String(),
}
var object *runtimeresource.Info
object, err = util.GetUnstructuredObject(c.factory, c.Namespace, workload)
if err != nil {
return err
}
var templateSpec *v1.PodTemplateSpec
templateSpec, _, err = util.GetPodTemplateSpecPath(object.Object.(*unstructured.Unstructured))
if err != nil {
return
}
// todo consider to use ephemeral container
// https://kubernetes.io/docs/concepts/workloads/pods/ephemeral-containers/
// means mesh mode
if c.Engine == config.EngineGvisor {
err = inject.InjectEnvoySidecar(ctx, c.factory, c.clientset, c.Namespace, workload, c.Headers, c.PortMap)
err = inject.InjectEnvoySidecar(ctx, c.factory, c.clientset, c.Namespace, workload, object, c.Headers, c.PortMap)
if err != nil {
return err
}
// 3) ssh reverse tunnel eg: "ssh -o StrictHostKeychecking=no -fNR remote:33333:localhost:44444 root@127.0.0.1 -p 2222"
c.PortMapper = inject.NewMapper(c.clientset, c.Namespace, labels.SelectorFromSet(templateSpec.Labels).String(), c.Headers, c.Workloads)
go c.PortMapper.Run()
} else if len(c.Headers) != 0 || len(c.PortMap) != 0 {
err = inject.InjectVPNAndEnvoySidecar(ctx, c.factory, c.clientset.CoreV1().ConfigMaps(c.Namespace), c.Namespace, workload, configInfo, c.Headers, c.PortMap)
err = inject.InjectVPNAndEnvoySidecar(ctx, c.factory, c.clientset.CoreV1().ConfigMaps(c.Namespace), c.Namespace, workload, object, configInfo, c.Headers, c.PortMap)
} else {
err = inject.InjectVPNSidecar(ctx, c.factory, c.Namespace, workload, configInfo)
err = inject.InjectVPNSidecar(ctx, c.factory, c.Namespace, workload, object, configInfo)
}
if err != nil {
log.Errorf("Injecting inbound sidecar for %s failed: %s", workload, err.Error())
@@ -688,7 +708,7 @@ func (c *ConnectOptions) PreCheckResource() error {
}
var resources []string
for _, info := range list {
resources = append(resources, fmt.Sprintf("%s/%s", info.Mapping.GroupVersionKind.GroupKind().String(), info.Name))
resources = append(resources, fmt.Sprintf("%s/%s", info.Mapping.Resource.GroupResource().String(), info.Name))
}
c.Workloads = resources
@@ -696,7 +716,7 @@ func (c *ConnectOptions) PreCheckResource() error {
for i, workload := range c.Workloads {
ownerReference, err := util.GetTopOwnerReference(c.factory, c.Namespace, workload)
if err == nil {
c.Workloads[i] = fmt.Sprintf("%s/%s", ownerReference.Mapping.GroupVersionKind.GroupKind().String(), ownerReference.Name)
c.Workloads[i] = fmt.Sprintf("%s/%s", ownerReference.Mapping.Resource.GroupResource().String(), ownerReference.Name)
}
}
// service which associate with pod
@@ -720,7 +740,7 @@ func (c *ConnectOptions) PreCheckResource() error {
if err == nil && list != nil && len(list.Items) != 0 {
ownerReference, err := util.GetTopOwnerReference(c.factory, c.Namespace, fmt.Sprintf("%s/%s", "pods", list.Items[0].Name))
if err == nil {
c.Workloads[i] = fmt.Sprintf("%s/%s", ownerReference.Mapping.GroupVersionKind.GroupKind().String(), ownerReference.Name)
c.Workloads[i] = fmt.Sprintf("%s/%s", ownerReference.Mapping.Resource.GroupResource().String(), ownerReference.Name)
}
} else
// if list is empty, means not create pods, just controllers

View File

@@ -72,7 +72,7 @@ func resetConfigMap(ctx context.Context, mapInterface v1.ConfigMapInterface, wor
}
ws := sets.New[string]()
for _, workload := range workloads {
ws.Insert(ConvertWorkloadToUid(workload))
ws.Insert(util.ConvertWorkloadToUid(workload))
}
for i := 0; i < len(v); i++ {
@@ -91,22 +91,6 @@ func resetConfigMap(ctx context.Context, mapInterface v1.ConfigMapInterface, wor
return err
}
// ConvertUidToWorkload converts an envoy node id into a workload name by
// replacing the last "." with "/".
// e.g. deployments.apps.ry-server --> deployments.apps/ry-server
// If uid contains no ".", it is returned unchanged instead of panicking on
// a negative slice index from strings.LastIndex.
func ConvertUidToWorkload(uid string) string {
	lastIndex := strings.LastIndex(uid, ".")
	if lastIndex < 0 {
		return uid
	}
	return uid[:lastIndex] + "/" + uid[lastIndex+1:]
}
// ConvertWorkloadToUid converts a workload name into an envoy node id by
// replacing the last "/" with ".".
// e.g. deployments.apps/ry-server --> deployments.apps.ry-server
// If workload contains no "/", it is returned unchanged instead of panicking
// on a negative slice index from strings.LastIndex.
func ConvertWorkloadToUid(workload string) string {
	lastIndex := strings.LastIndex(workload, "/")
	if lastIndex < 0 {
		return workload
	}
	return workload[:lastIndex] + "." + workload[lastIndex+1:]
}
func removeInjectContainer(ctx context.Context, factory cmdutil.Factory, clientset *kubernetes.Clientset, namespace, workload string) error {
object, err := util.GetUnstructuredObject(factory, namespace, workload)
if err != nil {
@@ -142,7 +126,7 @@ func removeInjectContainer(ctx context.Context, factory cmdutil.Factory, clients
}
_, err = helper.Patch(object.Namespace, object.Name, types.JSONPatchType, bytes, &metav1.PatchOptions{})
if err != nil {
log.Errorf("Failed to patch resource: %s %s: %v", object.Mapping.GroupVersionKind.GroupKind().String(), object.Name, err)
log.Errorf("Failed to patch resource: %s %s: %v", object.Mapping.Resource.Resource, object.Name, err)
return err
}

View File

@@ -2,7 +2,6 @@ package handler
import (
"context"
"strings"
"github.com/docker/docker/api/types"
"github.com/docker/docker/client"
@@ -16,6 +15,7 @@ import (
"github.com/wencaiwulue/kubevpn/v2/pkg/config"
"github.com/wencaiwulue/kubevpn/v2/pkg/controlplane"
"github.com/wencaiwulue/kubevpn/v2/pkg/inject"
"github.com/wencaiwulue/kubevpn/v2/pkg/util"
)
// Uninstall
@@ -101,11 +101,21 @@ func (c *ConnectOptions) LeaveProxyResources(ctx context.Context) (err error) {
}
// deployments.apps.ry-server --> deployments.apps/ry-server
lastIndex := strings.LastIndex(virtual.Uid, ".")
uid := virtual.Uid[:lastIndex] + "/" + virtual.Uid[lastIndex+1:]
err = inject.UnPatchContainer(c.factory, c.clientset.CoreV1().ConfigMaps(c.Namespace), c.Namespace, uid, v4)
workload := util.ConvertUidToWorkload(virtual.Uid)
object, err := util.GetUnstructuredObject(c.factory, c.Namespace, workload)
if err != nil {
log.Errorf("Failed to leave workload %s: %v", uid, err)
log.Errorf("Failed to get unstructured object: %v", err)
return err
}
c.PortMapper.Stop(workload)
err = inject.UnPatchContainer(c.factory, c.clientset.CoreV1().ConfigMaps(c.Namespace), object, func(isFargateMode bool, rule *controlplane.Rule) bool {
if isFargateMode {
return c.PortMapper.IsMe(virtual.Uid, rule.Headers)
}
return rule.LocalTunIPv4 == v4
})
if err != nil {
log.Errorf("Failed to leave workload %s: %v", workload, err)
continue
}
}

View File

@@ -0,0 +1,60 @@
package inject
import (
"net/netip"
"reflect"
"testing"
"github.com/wencaiwulue/kubevpn/v2/pkg/controlplane"
"github.com/wencaiwulue/kubevpn/v2/pkg/util"
)
// TestAddVirtualRule is a table-driven test for addVirtualRule. The single
// case feeds a port with a non-zero EnvoyListenerPort (which marks AWS
// Fargate mode) and expects the resulting Virtual list to keep that port
// pair and to contain one Rule carrying the loopback tun IPs.
func TestAddVirtualRule(t *testing.T) {
	// each entry: inputs to addVirtualRule plus the expected resulting list
	testdatas := []struct {
		Rule    []*controlplane.Virtual
		Ports   []controlplane.ContainerPort
		Headers map[string]string
		TunIP   util.PodRouteConfig
		Uid     string
		PortMap map[int32]string
		Expect  []*controlplane.Virtual
	}{
		{
			Ports: []controlplane.ContainerPort{
				{
					// non-zero EnvoyListenerPort means fargate mode
					EnvoyListenerPort: 15006,
					ContainerPort:     9080,
				},
			},
			TunIP: util.PodRouteConfig{
				// in fargate mode the tun IPs are the loopback addresses
				LocalTunIPv4: "127.0.0.1",
				LocalTunIPv6: netip.IPv6Loopback().String(),
			},
			Uid: "deployments.authors",
			Expect: []*controlplane.Virtual{
				{
					Uid: "deployments.authors",
					Ports: []controlplane.ContainerPort{
						{
							EnvoyListenerPort: 15006,
							ContainerPort:     9080,
						},
					},
					Rules: []*controlplane.Rule{{
						Headers:      nil,
						LocalTunIPv4: "127.0.0.1",
						LocalTunIPv6: netip.IPv6Loopback().String(),
						PortMap:      nil,
					}},
				},
			},
		},
	}
	for _, data := range testdatas {
		rule := addVirtualRule(data.Rule, data.Uid, data.Ports, data.Headers, data.TunIP, nil)
		// deep structural comparison against the expected virtual list
		if !reflect.DeepEqual(rule, data.Expect) {
			t.FailNow()
		}
	}
}

View File

@@ -3,8 +3,9 @@ package inject
import (
"context"
"fmt"
"net"
"net/netip"
"reflect"
"strconv"
"strings"
"sync"
"time"
@@ -21,8 +22,8 @@ import (
pkgresource "k8s.io/cli-runtime/pkg/resource"
runtimeresource "k8s.io/cli-runtime/pkg/resource"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
"sigs.k8s.io/yaml"
"github.com/wencaiwulue/kubevpn/v2/pkg/config"
"github.com/wencaiwulue/kubevpn/v2/pkg/controlplane"
@@ -32,13 +33,7 @@ import (
// InjectEnvoySidecar patch a sidecar, using iptables to do port-forward let this pod decide should go to 233.254.254.100 or request to 127.0.0.1
// https://istio.io/latest/docs/ops/deployment/requirements/#ports-used-by-istio
func InjectEnvoySidecar(ctx context.Context, f cmdutil.Factory, clientset *kubernetes.Clientset, namespace, workload string, headers map[string]string, portMaps []string) (err error) {
var object *runtimeresource.Info
object, err = util.GetUnstructuredObject(f, namespace, workload)
if err != nil {
return err
}
func InjectEnvoySidecar(ctx context.Context, f cmdutil.Factory, clientset *kubernetes.Clientset, namespace, workload string, object *runtimeresource.Info, headers map[string]string, portMap []string) (err error) {
u := object.Object.(*unstructured.Unstructured)
var templateSpec *v1.PodTemplateSpec
var path []string
@@ -50,12 +45,12 @@ func InjectEnvoySidecar(ctx context.Context, f cmdutil.Factory, clientset *kuber
nodeID := fmt.Sprintf("%s.%s", object.Mapping.Resource.GroupResource().String(), object.Name)
c := util.PodRouteConfig{LocalTunIPv4: "127.0.0.1", LocalTunIPv6: netip.IPv6Loopback().String()}
ports, portmap, containerPort2EnvoyRulePort := getPort(templateSpec, portMaps)
ports, portmap := GetPort(templateSpec, portMap)
port := controlplane.ConvertContainerPort(ports...)
var containerPort2EnvoyListenerPort = make(map[int32]int32)
for i := range len(port) {
randomPort, _ := util.GetAvailableTCPPortOrDie()
port[i].InnerPort = int32(randomPort)
port[i].EnvoyListenerPort = int32(randomPort)
containerPort2EnvoyListenerPort[port[i].ContainerPort] = int32(randomPort)
}
err = addEnvoyConfig(clientset.CoreV1().ConfigMaps(namespace), nodeID, c, headers, port, portmap)
@@ -71,7 +66,7 @@ func InjectEnvoySidecar(ctx context.Context, f cmdutil.Factory, clientset *kuber
}
if containerNames.HasAll(config.ContainerSidecarVPN, config.ContainerSidecarEnvoyProxy) {
log.Infof("Workload %s/%s has already been injected with sidecar", namespace, workload)
//return nil
return
}
enableIPv6, _ := util.DetectPodSupportIPv6(ctx, f, namespace)
@@ -92,7 +87,7 @@ func InjectEnvoySidecar(ctx context.Context, f cmdutil.Factory, clientset *kuber
}
_, err = helper.Patch(object.Namespace, object.Name, types.JSONPatchType, bytes, &metav1.PatchOptions{})
if err != nil {
log.Errorf("Failed to patch resource: %s %s, err: %v", object.Mapping.GroupVersionKind.GroupKind().String(), object.Name, err)
log.Errorf("Failed to patch resource: %s %s, err: %v", object.Mapping.Resource.Resource, object.Name, err)
return err
}
log.Infof("Patching workload %s", workload)
@@ -106,9 +101,7 @@ func InjectEnvoySidecar(ctx context.Context, f cmdutil.Factory, clientset *kuber
if err != nil {
return err
}
// 3) ssh reverse tunnel eg: "ssh -o StrictHostKeychecking=no -fNR remote:33333:localhost:44444 root@127.0.0.1 -p 2222"
err = exposeLocalPortToRemote(ctx, clientset, namespace, labels.SelectorFromSet(templateSpec.Labels).String(), containerPort2EnvoyRulePort, containerPort2EnvoyListenerPort)
return err
return nil
}
func ModifyServiceTargetPort(ctx context.Context, clientset *kubernetes.Clientset, namespace string, labels string, m map[int32]int32) error {
@@ -126,7 +119,7 @@ func ModifyServiceTargetPort(ctx context.Context, clientset *kubernetes.Clientse
return err
}
func getPort(templateSpec *v1.PodTemplateSpec, portMaps []string) ([]v1.ContainerPort, map[int32]string, map[int32]int32) {
func GetPort(templateSpec *v1.PodTemplateSpec, portMaps []string) ([]v1.ContainerPort, map[int32]string) {
var ports []v1.ContainerPort
for _, container := range templateSpec.Spec.Containers {
ports = append(ports, container.Ports...)
@@ -148,21 +141,18 @@ func getPort(templateSpec *v1.PodTemplateSpec, portMaps []string) ([]v1.Containe
}
var portmap = make(map[int32]string)
var m = make(map[int32]int32)
for _, port := range ports {
randomPort, _ := util.GetAvailableTCPPortOrDie()
portmap[port.ContainerPort] = fmt.Sprintf("%d:%d", randomPort, port.ContainerPort)
m[port.ContainerPort] = int32(randomPort)
}
for _, portMap := range portMaps {
port := util.ParsePort(portMap)
if port.ContainerPort != 0 {
randomPort, _ := util.GetAvailableTCPPortOrDie()
portmap[port.ContainerPort] = fmt.Sprintf("%d:%d", randomPort, port.HostPort)
m[port.ContainerPort] = int32(randomPort)
}
}
return ports, portmap, m
return ports, portmap
}
var _ = `function EPHEMERAL_PORT() {
@@ -178,50 +168,172 @@ var _ = `function EPHEMERAL_PORT() {
done
}`
func exposeLocalPortToRemote(ctx context.Context, clientset *kubernetes.Clientset, ns string, labels string, containerPort2EnvoyRulePort map[int32]int32, containerPort2EnvoyListenerPort map[int32]int32) error {
list, err := util.GetRunningPodList(ctx, clientset, ns, labels)
if err != nil {
return err
func NewMapper(clientset *kubernetes.Clientset, ns string, labels string, headers map[string]string, workloads []string) *Mapper {
ctx, cancelFunc := context.WithCancel(context.Background())
return &Mapper{
ns: ns,
headers: headers,
workloads: workloads,
labels: labels,
ctx: ctx,
cancel: cancelFunc,
clientset: clientset,
}
for _, pod := range list {
addr, err := netip.ParseAddr(pod.Status.PodIP)
if err != nil {
return err
}
go func(addrPort netip.AddrPort) {
for containerPort, envoyRulePort := range containerPort2EnvoyRulePort {
go func(containerPort, envoyRulePort int32) {
for {
local := netip.AddrPortFrom(netip.IPv4Unspecified(), uint16(containerPort))
remote := netip.AddrPortFrom(netip.IPv4Unspecified(), uint16(envoyRulePort))
_ = ssh.ExposeLocalPortToRemote(ctx, addrPort, remote, local)
time.Sleep(time.Second * 1)
}
}(containerPort, envoyRulePort)
}
}(netip.AddrPortFrom(addr, 2222))
}
return nil
}
type Injector struct {
Namespace string
Headers map[string]string
PortMap []string
Workloads []string
Engine config.Engine
Lock *sync.Mutex
type Mapper struct {
ns string
headers map[string]string
workloads []string
labels string
ctx context.Context
cancel context.CancelFunc
clientset *kubernetes.Clientset
restclient *rest.RESTClient
config *rest.Config
factory cmdutil.Factory
// needs to give it back to dhcp
localTunIPv4 *net.IPNet
localTunIPv6 *net.IPNet
tunName string
clientset *kubernetes.Clientset
}
// Run keeps SSH reverse tunnels open from every running pod of the proxied
// workload back to this machine (AWS Fargate mode, where no privileged VPN
// sidecar is available). It polls the envoy ConfigMap and the running pod
// list every 2 seconds, (re)creating tunnel goroutines when the port mapping
// or the pod set changes, and returns when the mapper context is cancelled
// or the sidecar containers have been removed from the pods.
func (m *Mapper) Run() {
	// pod name -> context.CancelFunc that stops that pod's tunnel goroutines
	var podNameCtx = &sync.Map{}
	defer func() {
		// on exit, tear down every tunnel we started
		podNameCtx.Range(func(key, value any) bool {
			value.(context.CancelFunc)()
			return true
		})
		podNameCtx.Clear()
	}()
	var lastLocalPort2EnvoyRulePort map[int32]int32
	for m.ctx.Err() == nil {
		localPort2EnvoyRulePort, err := m.getLocalPort2EnvoyRulePort()
		if err != nil {
			log.Errorf("failed to get local port to envoy rule port: %v", err)
			time.Sleep(time.Second * 2)
			continue
		}
		// the port mapping changed: cancel all existing tunnels so they get
		// recreated below with the new mapping
		if !reflect.DeepEqual(localPort2EnvoyRulePort, lastLocalPort2EnvoyRulePort) {
			podNameCtx.Range(func(key, value any) bool {
				value.(context.CancelFunc)()
				return true
			})
			podNameCtx.Clear()
		}
		lastLocalPort2EnvoyRulePort = localPort2EnvoyRulePort
		list, err := util.GetRunningPodList(m.ctx, m.clientset, m.ns, m.labels)
		if err != nil {
			log.Errorf("failed to list running pod: %v", err)
			time.Sleep(time.Second * 2)
			continue
		}
		podNames := sets.New[string]()
		for _, pod := range list {
			podNames.Insert(pod.Name)
			// tunnels for this pod are already running
			if _, ok := podNameCtx.Load(pod.Name); ok {
				continue
			}
			containerNames := sets.New[string]()
			for _, container := range pod.Spec.Containers {
				containerNames.Insert(container.Name)
			}
			// sidecars are gone: the workload has been un-proxied, stop mapping
			if !containerNames.HasAny(config.ContainerSidecarVPN, config.ContainerSidecarEnvoyProxy) {
				log.Infof("Labels with pod have been reset")
				return
			}
			podIP, err := netip.ParseAddr(pod.Status.PodIP)
			if err != nil {
				// pod has no usable IP yet; retry on the next poll
				continue
			}
			ctx, cancel := context.WithCancel(m.ctx)
			podNameCtx.Store(pod.Name, cancel)
			// one goroutine per (pod, port pair); each keeps re-dialing the
			// ssh reverse tunnel on port 2222 until its context is cancelled
			go func(remoteSSHServer netip.AddrPort, podName string) {
				for containerPort, envoyRulePort := range localPort2EnvoyRulePort {
					go func(containerPort, envoyRulePort int32) {
						local := netip.AddrPortFrom(netip.IPv4Unspecified(), uint16(containerPort))
						remote := netip.AddrPortFrom(netip.IPv4Unspecified(), uint16(envoyRulePort))
						for ctx.Err() == nil {
							// blocks while the tunnel is up; re-dials after 1s when it breaks
							_ = ssh.ExposeLocalPortToRemote(ctx, remoteSSHServer, remote, local)
							time.Sleep(time.Second * 1)
						}
					}(containerPort, envoyRulePort)
				}
			}(netip.AddrPortFrom(podIP, 2222), pod.Name)
		}
		// drop tunnels for pods that no longer exist
		podNameCtx.Range(func(key, value any) bool {
			if !podNames.Has(key.(string)) {
				value.(context.CancelFunc)()
				podNameCtx.Delete(key.(string))
			}
			return true
		})
		time.Sleep(time.Second * 2)
	}
}
// getLocalPort2EnvoyRulePort reads the envoy config out of the
// traffic-manager ConfigMap and builds the map of local listen port ->
// envoy rule port for the rules that belong to this mapper, i.e. rules
// whose virtual uid matches one of the managed workloads and whose headers
// equal the mapper's headers. A port pair "a:b" maps local port b to envoy
// rule port a; a bare port maps the container port to that envoy rule port.
func (m *Mapper) getLocalPort2EnvoyRulePort() (map[int32]int32, error) {
	cm, err := m.clientset.CoreV1().ConfigMaps(m.ns).Get(m.ctx, config.ConfigMapPodTrafficManager, metav1.GetOptions{})
	if err != nil {
		return nil, err
	}
	virtuals := make([]*controlplane.Virtual, 0)
	if raw, ok := cm.Data[config.KeyEnvoy]; ok {
		if err = yaml.Unmarshal([]byte(raw), &virtuals); err != nil {
			return nil, err
		}
	}
	// uids of the workloads this mapper is responsible for
	managedUids := sets.New[string]()
	for _, w := range m.workloads {
		managedUids.Insert(util.ConvertWorkloadToUid(w))
	}
	result := make(map[int32]int32)
	for _, virtual := range virtuals {
		if !managedUids.Has(virtual.Uid) {
			continue
		}
		for _, rule := range virtual.Rules {
			if !reflect.DeepEqual(m.headers, rule.Headers) {
				continue
			}
			for containerPort, portPair := range rule.PortMap {
				if strings.Index(portPair, ":") > 0 {
					// "envoyRulePort:localPort" pair
					parts := strings.Split(portPair, ":")
					if len(parts) == 2 {
						envoyRulePort, _ := strconv.Atoi(parts[0])
						localPort, _ := strconv.Atoi(parts[1])
						result[int32(localPort)] = int32(envoyRulePort)
					}
				} else {
					// bare envoy rule port: local side listens on the container port
					envoyRulePort, _ := strconv.Atoi(portPair)
					result[containerPort] = int32(envoyRulePort)
				}
			}
		}
	}
	return result, nil
}
// Stop cancels the mapper when the given workload is one of the workloads
// it manages. It is a no-op on a nil receiver or an unrelated workload.
func (m *Mapper) Stop(workload string) {
	if m == nil {
		return
	}
	for _, w := range m.workloads {
		if w == workload {
			m.cancel()
			return
		}
	}
}
// IsMe reports whether the given envoy node uid and headers identify a rule
// created by this mapper: the uid must convert to one of the managed
// workloads and the headers must match exactly. A nil receiver returns false.
func (m *Mapper) IsMe(uid string, headers map[string]string) bool {
	if m == nil {
		return false
	}
	workload := util.ConvertUidToWorkload(uid)
	managed := false
	for _, w := range m.workloads {
		if w == workload {
			managed = true
			break
		}
	}
	return managed && reflect.DeepEqual(m.headers, headers)
}

View File

@@ -31,13 +31,7 @@ import (
// https://istio.io/latest/docs/ops/deployment/requirements/#ports-used-by-istio
// InjectVPNAndEnvoySidecar patch a sidecar, using iptables to do port-forward let this pod decide should go to 233.254.254.100 or request to 127.0.0.1
func InjectVPNAndEnvoySidecar(ctx context.Context, f cmdutil.Factory, clientset v12.ConfigMapInterface, namespace, workload string, c util.PodRouteConfig, headers map[string]string, portMaps []string) (err error) {
var object *runtimeresource.Info
object, err = util.GetUnstructuredObject(f, namespace, workload)
if err != nil {
return err
}
func InjectVPNAndEnvoySidecar(ctx context.Context, f cmdutil.Factory, clientset v12.ConfigMapInterface, namespace, workload string, object *runtimeresource.Info, c util.PodRouteConfig, headers map[string]string, portMaps []string) (err error) {
u := object.Object.(*unstructured.Unstructured)
var templateSpec *v1.PodTemplateSpec
var path []string
@@ -112,7 +106,7 @@ func InjectVPNAndEnvoySidecar(ctx context.Context, f cmdutil.Factory, clientset
}
_, err = helper.Patch(object.Namespace, object.Name, types.JSONPatchType, bytes, &metav1.PatchOptions{})
if err != nil {
log.Errorf("Failed to patch resource: %s %s, err: %v", object.Mapping.GroupVersionKind.GroupKind().String(), object.Name, err)
log.Errorf("Failed to patch resource: %s %s, err: %v", object.Mapping.Resource.Resource, object.Name, err)
return err
}
log.Infof("Patching workload %s", workload)
@@ -120,13 +114,7 @@ func InjectVPNAndEnvoySidecar(ctx context.Context, f cmdutil.Factory, clientset
return err
}
func UnPatchContainer(factory cmdutil.Factory, mapInterface v12.ConfigMapInterface, namespace, workload string, localTunIPv4 string) error {
object, err := util.GetUnstructuredObject(factory, namespace, workload)
if err != nil {
log.Errorf("Failed to get unstructured object: %v", err)
return err
}
func UnPatchContainer(factory cmdutil.Factory, mapInterface v12.ConfigMapInterface, object *runtimeresource.Info, isMeFunc func(isFargateMode bool, rule *controlplane.Rule) bool) error {
u := object.Object.(*unstructured.Unstructured)
templateSpec, depth, err := util.GetPodTemplateSpecPath(u)
if err != nil {
@@ -135,9 +123,9 @@ func UnPatchContainer(factory cmdutil.Factory, mapInterface v12.ConfigMapInterfa
}
nodeID := fmt.Sprintf("%s.%s", object.Mapping.Resource.GroupResource().String(), object.Name)
workload := util.ConvertUidToWorkload(nodeID)
var empty, found bool
empty, found, err = removeEnvoyConfig(mapInterface, nodeID, localTunIPv4)
empty, found, err = removeEnvoyConfig(mapInterface, nodeID, isMeFunc)
if err != nil {
log.Errorf("Failed to remove envoy config: %v", err)
return err
@@ -178,7 +166,7 @@ func UnPatchContainer(factory cmdutil.Factory, mapInterface v12.ConfigMapInterfa
}
_, err = helper.Patch(object.Namespace, object.Name, types.JSONPatchType, bytes, &metav1.PatchOptions{})
if err != nil {
log.Errorf("Failed to patch resource: %s %s: %v", object.Mapping.GroupVersionKind.GroupKind().String(), object.Name, err)
log.Errorf("Failed to patch resource: %s %s: %v", object.Mapping.Resource.Resource, object.Name, err)
return err
}
}
@@ -229,13 +217,21 @@ func addVirtualRule(v []*controlplane.Virtual, nodeID string, port []controlplan
})
}
var isFargateMode bool
for _, containerPort := range v[index].Ports {
if containerPort.EnvoyListenerPort != 0 {
isFargateMode = true
}
}
// 2) if already proxy deployment/xxx with header foo=bar. also want to add env=dev
for j, rule := range v[index].Rules {
if rule.LocalTunIPv4 == tunIP.LocalTunIPv4 &&
rule.LocalTunIPv6 == tunIP.LocalTunIPv6 {
v[index].Rules[j].Headers = util.Merge[string, string](v[index].Rules[j].Headers, headers)
v[index].Rules[j].PortMap = util.Merge[int32, string](v[index].Rules[j].PortMap, portmap)
return v
if !isFargateMode {
for j, rule := range v[index].Rules {
if rule.LocalTunIPv4 == tunIP.LocalTunIPv4 &&
rule.LocalTunIPv6 == tunIP.LocalTunIPv6 {
v[index].Rules[j].Headers = util.Merge[string, string](v[index].Rules[j].Headers, headers)
v[index].Rules[j].PortMap = util.Merge[int32, string](v[index].Rules[j].PortMap, portmap)
return v
}
}
}
@@ -262,7 +258,7 @@ func addVirtualRule(v []*controlplane.Virtual, nodeID string, port []controlplan
return v
}
func removeEnvoyConfig(mapInterface v12.ConfigMapInterface, nodeID string, localTunIPv4 string) (empty bool, found bool, err error) {
func removeEnvoyConfig(mapInterface v12.ConfigMapInterface, nodeID string, isMeFunc func(isFargateMode bool, rule *controlplane.Rule) bool) (empty bool, found bool, err error) {
configMap, err := mapInterface.Get(context.Background(), config.ConfigMapPodTrafficManager, metav1.GetOptions{})
if k8serrors.IsNotFound(err) {
return true, false, nil
@@ -280,8 +276,14 @@ func removeEnvoyConfig(mapInterface v12.ConfigMapInterface, nodeID string, local
}
for _, virtual := range v {
if nodeID == virtual.Uid {
var isFargateMode bool
for _, port := range virtual.Ports {
if port.EnvoyListenerPort != 0 {
isFargateMode = true
}
}
for i := 0; i < len(virtual.Rules); i++ {
if virtual.Rules[i].LocalTunIPv4 == localTunIPv4 {
if isMeFunc(isFargateMode, virtual.Rules[i]) {
found = true
virtual.Rules = append(virtual.Rules[:i], virtual.Rules[i+1:]...)
i--

View File

@@ -24,12 +24,7 @@ import (
util2 "github.com/wencaiwulue/kubevpn/v2/pkg/util"
)
func InjectVPNSidecar(ctx context.Context, f util.Factory, namespace, workload string, c util2.PodRouteConfig) error {
object, err := util2.GetUnstructuredObject(f, namespace, workload)
if err != nil {
return err
}
func InjectVPNSidecar(ctx context.Context, f util.Factory, namespace, workload string, object *resource.Info, c util2.PodRouteConfig) error {
u := object.Object.(*unstructured.Unstructured)
podTempSpec, path, err := util2.GetPodTemplateSpecPath(u)

View File

@@ -52,13 +52,16 @@ func ExposeLocalPortToRemote(ctx context.Context, remoteSSHServer, remotePort, l
log.Errorf("Accept on remote service error: %s", err)
return err
}
// Open a (local) connection to localEndpoint whose content will be forwarded so serverEndpoint
local, err := net.Dial("tcp", localPort.String())
if err != nil {
log.Errorf("Dial INTO local service error: %s", err)
continue
}
copyStream(ctx, client, local)
go func(client net.Conn) {
defer client.Close()
// Open a (local) connection to localEndpoint whose content will be forwarded so serverEndpoint
local, err := net.Dial("tcp", localPort.String())
if err != nil {
log.Errorf("Dial INTO local service error: %s", err)
return
}
defer local.Close()
copyStream(ctx, client, local)
}(client)
}
return nil
}

View File

@@ -22,7 +22,6 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/watch"
@@ -198,16 +197,7 @@ func GetTopOwnerReference(factory util.Factory, ns, workload string) (*resource.
if ownerReference == nil {
return object, nil
}
// apiVersion format is Group/Version is like: apps/v1, apps.kruise.io/v1beta1
groupVersion, err := schema.ParseGroupVersion(ownerReference.APIVersion)
if err != nil {
return object, nil
}
gk := v1.GroupKind{
Group: groupVersion.Group,
Kind: ownerReference.Kind,
}
workload = fmt.Sprintf("%s/%s", gk.String(), ownerReference.Name)
workload = fmt.Sprintf("%s/%s", ownerReference.Kind, ownerReference.Name)
}
}
@@ -221,7 +211,7 @@ func GetTopOwnerReferenceBySelector(factory util.Factory, ns, selector string) (
for _, info := range object {
ownerReference, err := GetTopOwnerReference(factory, ns, fmt.Sprintf("%s/%s", info.Mapping.Resource.GroupResource().String(), info.Name))
if err == nil && ownerReference.Mapping.Resource.Resource != "services" {
set.Insert(fmt.Sprintf("%s/%s", ownerReference.Mapping.GroupVersionKind.GroupKind().String(), ownerReference.Name))
set.Insert(fmt.Sprintf("%s/%s", ownerReference.Mapping.Resource.GroupResource().String(), ownerReference.Name))
}
}
return set, nil

View File

@@ -365,3 +365,16 @@ func If[T any](b bool, t1, t2 T) T {
}
return t2
}
// ConvertUidToWorkload converts a node ID (uid) into a workload reference
// by replacing the last "." with "/".
// Example: deployments.apps.productpage --> deployments.apps/productpage.
// If uid contains no ".", it is returned unchanged (the original code would
// panic with a slice-bounds error on LastIndex returning -1).
func ConvertUidToWorkload(uid string) string {
	index := strings.LastIndex(uid, ".")
	if index < 0 {
		// No separator: nothing to convert.
		return uid
	}
	return uid[:index] + "/" + uid[index+1:]
}
// ConvertWorkloadToUid turns a workload reference into its uid form by
// rewriting every "/" as ".".
// Example: deployments.apps/productpage --> deployments.apps.productpage.
func ConvertWorkloadToUid(workload string) string {
	parts := strings.Split(workload, "/")
	return strings.Join(parts, ".")
}

View File

@@ -108,3 +108,25 @@ func TestPing(t *testing.T) {
}
log.Print("Packet sent!")
}
// TestConvertUidToWorkload verifies uid -> workload conversion, including
// the degenerate case of an empty group segment (consecutive dots).
func TestConvertUidToWorkload(t *testing.T) {
	tests := []struct {
		uid    string
		expect string
	}{
		{
			uid:    "deployments.apps.productpage",
			expect: "deployments.apps/productpage",
		},
		{
			uid:    "deployments..productpage",
			expect: "deployments./productpage",
		},
	}
	for _, tt := range tests {
		// t.Errorf (not t.FailNow) so every case reports, with a message
		// that shows the failing input and both values.
		if got := ConvertUidToWorkload(tt.uid); got != tt.expect {
			t.Errorf("ConvertUidToWorkload(%q) = %q, want %q", tt.uid, got, tt.expect)
		}
	}
}