feat: fargate mode works basic

This commit is contained in:
fengcaiwen
2025-01-18 10:52:51 +08:00
parent 8b771e82b5
commit b6cfba7db9
88 changed files with 4953 additions and 360 deletions

View File

@@ -2,14 +2,18 @@ package controlplane
import (
"fmt"
"strconv"
"strings"
"time"
v31 "github.com/envoyproxy/go-control-plane/envoy/config/accesslog/v3"
cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
corev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
listener "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
route "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
accesslogfilev3 "github.com/envoyproxy/go-control-plane/envoy/extensions/access_loggers/file/v3"
corsv3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/cors/v3"
grpcwebv3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/grpc_web/v3"
routerv3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/router/v3"
@@ -22,24 +26,68 @@ import (
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
"github.com/envoyproxy/go-control-plane/pkg/wellknown"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/wrapperspb"
corev1 "k8s.io/api/core/v1"
"github.com/wencaiwulue/kubevpn/v2/pkg/util"
)
type Virtual struct {
Uid string // group.resource.name
Ports []corev1.ContainerPort
Ports []ContainerPort
Rules []*Rule
}
type ContainerPort struct {
// If specified, this must be an IANA_SVC_NAME and unique within the pod. Each
// named port in a pod must have a unique name. Name for the port that can be
// referred to by services.
// +optional
Name string `json:"name,omitempty"`
// Number of port to expose on the host.
// If specified, this must be a valid port number, 0 < x < 65536.
// If HostNetwork is specified, this must match ContainerPort.
// Most containers do not need this.
// +optional
InnerPort int32 `json:"hostPort,omitempty"`
// Number of port to expose on the pod's IP address.
// This must be a valid port number, 0 < x < 65536.
ContainerPort int32 `json:"containerPort"`
// Protocol for port. Must be UDP, TCP, or SCTP.
// Defaults to "TCP".
// +optional
// +default="TCP"
Protocol corev1.Protocol `json:"protocol,omitempty"`
}
func ConvertContainerPort(ports ...corev1.ContainerPort) []ContainerPort {
var result []ContainerPort
for _, port := range ports {
result = append(result, ContainerPort{
Name: port.Name,
InnerPort: 0,
ContainerPort: port.ContainerPort,
Protocol: port.Protocol,
})
}
return result
}
type Rule struct {
Headers map[string]string
LocalTunIPv4 string
LocalTunIPv6 string
PortMap map[int32]int32
// for no privileged mode, don't have cap NET_ADMIN and privileged: true. so we can not use OSI layer 3 proxy
// port -> port1:port2
// port1 for envoy forward to localhost:port1
// port2 for local pc listen localhost:port2
// use ssh reverse tunnel, envoy rule endpoint localhost:port1 will forward to local pc localhost:port2
// port1 is required and port2 is optional
PortMap map[int32]string
}
func (a *Virtual) To(enableIPv6 bool) (
@@ -50,9 +98,11 @@ func (a *Virtual) To(enableIPv6 bool) (
) {
//clusters = append(clusters, OriginCluster())
for _, port := range a.Ports {
listenerName := fmt.Sprintf("%s_%v_%s", a.Uid, port.ContainerPort, port.Protocol)
isFargateMode := port.InnerPort != 0
listenerName := fmt.Sprintf("%s_%v_%s", a.Uid, util.If(isFargateMode, port.InnerPort, port.ContainerPort), port.Protocol)
routeName := listenerName
listeners = append(listeners, ToListener(listenerName, routeName, port.ContainerPort, port.Protocol))
listeners = append(listeners, ToListener(listenerName, routeName, util.If(isFargateMode, port.InnerPort, port.ContainerPort), port.Protocol, isFargateMode))
var rr []*route.Route
for _, rule := range a.Rules {
@@ -62,15 +112,34 @@ func (a *Virtual) To(enableIPv6 bool) (
} else {
ips = []string{rule.LocalTunIPv4}
}
ports := rule.PortMap[port.ContainerPort]
if isFargateMode {
if strings.Index(ports, ":") > 0 {
ports = strings.Split(ports, ":")[0]
} else {
logrus.Errorf("fargate mode port should have two pair")
}
}
p, _ := strconv.Atoi(ports)
for _, ip := range ips {
clusterName := fmt.Sprintf("%s_%v", ip, rule.PortMap[port.ContainerPort])
clusterName := fmt.Sprintf("%s_%v", ip, p)
clusters = append(clusters, ToCluster(clusterName))
endpoints = append(endpoints, ToEndPoint(clusterName, ip, rule.PortMap[port.ContainerPort]))
endpoints = append(endpoints, ToEndPoint(clusterName, ip, int32(p)))
rr = append(rr, ToRoute(clusterName, rule.Headers))
if isFargateMode {
defaultClusterName := fmt.Sprintf("%s_%v", ip, port.ContainerPort)
clusters = append(clusters, ToCluster(defaultClusterName))
endpoints = append(endpoints, ToEndPoint(defaultClusterName, ip, port.ContainerPort))
rr = append(rr, DefaultRouteToCluster(defaultClusterName))
}
}
}
rr = append(rr, DefaultRoute())
clusters = append(clusters, OriginCluster())
// if isFargateMode is true, already add default route, no long to add it
if !isFargateMode {
rr = append(rr, DefaultRoute())
clusters = append(clusters, OriginCluster())
}
routes = append(routes, &route.RouteConfiguration{
Name: routeName,
VirtualHosts: []*route.VirtualHost{
@@ -225,7 +294,30 @@ func DefaultRoute() *route.Route {
}
}
func ToListener(listenerName string, routeName string, port int32, p corev1.Protocol) *listener.Listener {
func DefaultRouteToCluster(clusterName string) *route.Route {
return &route.Route{
Match: &route.RouteMatch{
PathSpecifier: &route.RouteMatch_Prefix{
Prefix: "/",
},
},
Action: &route.Route_Route{
Route: &route.RouteAction{
ClusterSpecifier: &route.RouteAction_Cluster{
Cluster: clusterName,
},
Timeout: durationpb.New(0),
IdleTimeout: durationpb.New(0),
MaxStreamDuration: &route.RouteAction_MaxStreamDuration{
MaxStreamDuration: durationpb.New(0),
GrpcTimeoutHeaderMax: durationpb.New(0),
},
},
},
}
}
func ToListener(listenerName string, routeName string, port int32, p corev1.Protocol, isFargateMode bool) *listener.Listener {
var protocol core.SocketAddress_Protocol
switch p {
case corev1.ProtocolTCP:
@@ -289,6 +381,14 @@ func ToListener(listenerName string, routeName string, port int32, p corev1.Prot
UpgradeConfigs: []*httpconnectionmanager.HttpConnectionManager_UpgradeConfig{{
UpgradeType: "websocket",
}},
AccessLog: []*v31.AccessLog{{
Name: wellknown.FileAccessLog,
ConfigType: &v31.AccessLog_TypedConfig{
TypedConfig: anyFunc(&accesslogfilev3.FileAccessLog{
Path: "/dev/stdout",
}),
},
}},
}
tcpConfig := &tcpproxy.TcpProxy{
@@ -301,7 +401,7 @@ func ToListener(listenerName string, routeName string, port int32, p corev1.Prot
return &listener.Listener{
Name: listenerName,
TrafficDirection: core.TrafficDirection_INBOUND,
BindToPort: &wrapperspb.BoolValue{Value: false},
BindToPort: &wrapperspb.BoolValue{Value: util.If(isFargateMode, true, false)},
UseOriginalDst: &wrapperspb.BoolValue{Value: true},
Address: &core.Address{