optimize code

This commit is contained in:
p_caiwfeng
2021-12-12 11:39:00 +08:00
parent c957bedf5e
commit 1f3de12da8
28 changed files with 89 additions and 100 deletions

View File

@@ -1,6 +1,7 @@
package core
import (
"context"
"github.com/wencaiwulue/kubevpn/tun"
"net"
)
@@ -8,7 +9,7 @@ import (
// Handler is a proxy server handler
type Handler interface {
Init(options ...HandlerOption)
Handle(net.Conn)
Handle(ctx context.Context, conn net.Conn)
}
// HandlerOptions describes the options for Handler.

View File

@@ -66,7 +66,7 @@ func (s *Server) Serve(ctx context.Context, h Handler) error {
}
tempDelay = 0
go h.Handle(conn)
go h.Handle(ctx, conn)
}
return nil
}

View File

@@ -41,7 +41,7 @@ func TCPHandler() Handler {
func (h *fakeUdpHandler) Init(...HandlerOption) {
}
func (h *fakeUdpHandler) Handle(conn net.Conn) {
func (h *fakeUdpHandler) Handle(ctx context.Context, conn net.Conn) {
defer conn.Close()
if util.Debug {
log.Debugf("[tcpserver] %s -> %s\n", conn.RemoteAddr(), conn.LocalAddr())

View File

@@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"github.com/wencaiwulue/kubevpn/remote"
"github.com/wencaiwulue/kubevpn/util"
"net"
"sync"
@@ -48,11 +47,8 @@ func (h *tunHandler) Init(options ...HandlerOption) {
}
}
func (h *tunHandler) Handle(conn net.Conn) {
func (h *tunHandler) Handle(ctx context.Context, conn net.Conn) {
defer conn.Close()
ctx, cancelFunc := context.WithCancel(context.TODO())
remote.CancelFunctions = append(remote.CancelFunctions, cancelFunc)
var err error
var raddr net.Addr
if addr := h.options.Node.Remote; addr != "" {
@@ -69,7 +65,7 @@ func (h *tunHandler) Handle(conn net.Conn) {
var err error
var pc net.PacketConn
if raddr != nil && !h.options.Chain.IsEmpty() {
cc, err := h.options.Chain.DialContext(context.Background(), "udp", raddr.String())
cc, err := h.options.Chain.DialContext(ctx, "udp", raddr.String())
if err != nil {
return err
}
@@ -88,7 +84,7 @@ func (h *tunHandler) Handle(conn net.Conn) {
return err
}
return h.transportTun(conn, pc, raddr)
return h.transportTun(ctx, conn, pc, raddr)
}()
if err != nil {
log.Debugf("[tun] %s: %v", conn.LocalAddr(), err)
@@ -133,7 +129,7 @@ func (h *tunHandler) findRouteFor(dst net.IP) net.Addr {
return nil
}
func (h *tunHandler) transportTun(tun net.Conn, conn net.PacketConn, raddr net.Addr) error {
func (h *tunHandler) transportTun(ctx context.Context, tun net.Conn, conn net.PacketConn, raddr net.Addr) error {
errChan := make(chan error, 2)
defer func() {
if c, ok := conn.(interface{ CloseRead() error }); ok {
@@ -144,8 +140,6 @@ func (h *tunHandler) transportTun(tun net.Conn, conn net.PacketConn, raddr net.A
}
_ = conn.Close()
}()
ctx, cancelFunc := context.WithCancel(context.Background())
remote.CancelFunctions = append(remote.CancelFunctions, cancelFunc)
go func() {
for ctx.Err() == nil {
err := func() error {
@@ -288,7 +282,6 @@ func (h *tunHandler) transportTun(tun net.Conn, conn net.PacketConn, raddr net.A
if err != nil {
errChan <- err
cancelFunc()
return
}
}

View File

@@ -1,4 +1,4 @@
package remote
package pkg
import (
"context"

View File

@@ -6,7 +6,6 @@ import (
"fmt"
log "github.com/sirupsen/logrus"
"github.com/wencaiwulue/kubevpn/dns"
"github.com/wencaiwulue/kubevpn/remote"
"github.com/wencaiwulue/kubevpn/util"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -39,7 +38,7 @@ type ConnectOptions struct {
factory cmdutil.Factory
cidrs []*net.IPNet
routerIP string
dhcp *remote.DHCPManager
dhcp *DHCPManager
}
var trafficManager = net.IPNet{
@@ -70,18 +69,20 @@ func (c *ConnectOptions) createRemoteInboundPod() {
virtualShadowIp, _ := c.dhcp.RentIPRandom()
tempIps = append(tempIps, virtualShadowIp)
lock.Unlock()
config := PodRouteConfig{
LocalTunIP: tunIp.IP.String(),
InboundPodTunIP: virtualShadowIp.String(),
TrafficManagerRealIP: c.routerIP,
Route: trafficManager.String(),
}
// TODO OPTIMIZE CODE
if c.Mode == Mesh {
err = remote.PatchSidecar(
err = PatchSidecar(
c.factory,
c.clientset,
c.Namespace,
finalWorkload,
tunIp.IP.String(),
c.routerIP,
virtualShadowIp.String(),
trafficManager.String(),
config,
)
} else {
err = CreateInboundPod(
@@ -89,10 +90,7 @@ func (c *ConnectOptions) createRemoteInboundPod() {
c.clientset,
c.Namespace,
finalWorkload,
tunIp.IP.String(),
c.routerIP,
virtualShadowIp.String(),
trafficManager.String(),
config,
)
}
if err != nil {
@@ -102,7 +100,7 @@ func (c *ConnectOptions) createRemoteInboundPod() {
}
}
wg.Wait()
remote.AddCleanUpResourceHandler(c.clientset, c.Namespace, c.Workloads, c.dhcp, tempIps...)
AddCleanUpResourceHandler(c.clientset, c.Namespace, c.Workloads, c.dhcp, tempIps...)
if util.IsWindows() {
tunIp.Mask = net.CIDRMask(0, 32)
} else {
@@ -127,7 +125,7 @@ func (c *ConnectOptions) DoConnect() {
if err != nil {
log.Fatal(err)
}
c.dhcp = remote.NewDHCPManager(c.clientset, c.Namespace, &trafficManager)
c.dhcp = NewDHCPManager(c.clientset, c.Namespace, &trafficManager)
if err = c.dhcp.InitDHCP(); err != nil {
log.Fatal(err)
}
@@ -157,7 +155,7 @@ func (c ConnectOptions) heartbeats() {
func (c *ConnectOptions) portForward() {
var readyChanRef *chan struct{}
ctx, cancelFunc := context.WithCancel(context.Background())
remote.CancelFunctions = append(remote.CancelFunctions, cancelFunc)
CancelFunctions = append(CancelFunctions, cancelFunc)
go func() {
for ctx.Err() == nil {
func() {
@@ -234,7 +232,7 @@ func Start(r Route) (chan error, error) {
c := make(chan error, len(routers))
for i := range routers {
ctx, cancelFunc := context.WithCancel(context.Background())
remote.CancelFunctions = append(remote.CancelFunctions, cancelFunc)
CancelFunctions = append(CancelFunctions, cancelFunc)
go func(finalCtx context.Context, finalI int, c chan error) {
if err = routers[finalI].Serve(finalCtx); err != nil {
log.Warn(err)

View File

@@ -1,4 +1,4 @@
package remote
package pkg
import (
"context"

View File

@@ -1,4 +1,4 @@
package remote
package pkg
import (
"context"
@@ -12,6 +12,7 @@ import (
envoyresource "github.com/envoyproxy/go-control-plane/pkg/test/resource"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/wencaiwulue/kubevpn/pkg/mesh"
"github.com/wencaiwulue/kubevpn/util"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
@@ -30,9 +31,9 @@ import (
// patch a sidecar, using iptables to do port-forward let this pod decide should go to 233.254.254.100 or request to 127.0.0.1
// TODO if using envoy needs to create another pod, if using diy proxy, using one container is enough
// TODO support multiple port
func PatchSidecar(factory cmdutil.Factory, clientset *kubernetes.Clientset, namespace, workloads, virtualLocalIp, realRouterIP, virtualShadowIp, routes string) error {
func PatchSidecar(factory cmdutil.Factory, clientset *kubernetes.Clientset, namespace, workloads string, c PodRouteConfig) error {
// create pod in bound for mesh
err, podIp := CreateServerInboundForMesh(clientset, namespace, workloads, virtualLocalIp, realRouterIP, virtualShadowIp, routes)
err, podIp := CreateServerInboundForMesh(clientset, namespace, workloads, c)
if err != nil {
log.Warnln(err)
return err
@@ -44,20 +45,20 @@ func PatchSidecar(factory cmdutil.Factory, clientset *kubernetes.Clientset, name
}
t := true
zero := int64(0)
var sc Injectable
var sc mesh.Injectable
switch strings.ToLower(resourceTuple.Resource) {
case "deployment", "deployments":
sc = NewDeploymentController(factory, clientset, namespace, resourceTuple.Name)
sc = mesh.NewDeploymentController(factory, clientset, namespace, resourceTuple.Name)
case "statefulset", "statefulsets":
sc = NewStatefulsetController(factory, clientset, namespace, resourceTuple.Name)
sc = mesh.NewStatefulsetController(factory, clientset, namespace, resourceTuple.Name)
case "replicaset", "replicasets":
sc = NewReplicasController(factory, clientset, namespace, resourceTuple.Name)
sc = mesh.NewReplicasController(factory, clientset, namespace, resourceTuple.Name)
case "service", "services":
sc = NewServiceController(factory, clientset, namespace, resourceTuple.Name)
sc = mesh.NewServiceController(factory, clientset, namespace, resourceTuple.Name)
case "pod", "pods":
sc = NewPodController(factory, clientset, namespace, "pods", resourceTuple.Name)
sc = mesh.NewPodController(factory, clientset, namespace, "pods", resourceTuple.Name)
default:
sc = NewPodController(factory, clientset, namespace, resourceTuple.Resource, resourceTuple.Name)
sc = mesh.NewPodController(factory, clientset, namespace, resourceTuple.Resource, resourceTuple.Name)
}
CancelFunctions = append(CancelFunctions, func() {
if err = sc.Cancel(); err != nil {
@@ -248,7 +249,7 @@ func createEnvoyConfigMapIfNeeded(factory cmdutil.Factory, clientset *kubernetes
}
}
func CreateServerInboundForMesh(clientset *kubernetes.Clientset, namespace, workloads, virtualLocalIp, realRouterIP, virtualShadowIp, routes string) (error, string) {
func CreateServerInboundForMesh(clientset *kubernetes.Clientset, namespace, workloads string, config PodRouteConfig) (error, string) {
resourceTuple, parsed, err2 := util.SplitResourceTypeName(workloads)
if !parsed || err2 != nil {
return errors.New("not need"), ""
@@ -274,11 +275,11 @@ func CreateServerInboundForMesh(clientset *kubernetes.Clientset, namespace, work
"iptables -F;" +
"iptables -P INPUT ACCEPT;" +
"iptables -P FORWARD ACCEPT;" +
"iptables -t nat -A PREROUTING -i eth0 -p tcp --dport 80:60000 -j DNAT --to " + virtualLocalIp + ":80-60000;" +
"iptables -t nat -A PREROUTING -i eth0 -p tcp --dport 80:60000 -j DNAT --to " + config.LocalTunIP + ":80-60000;" +
"iptables -t nat -A POSTROUTING -p tcp -m tcp --dport 80:60000 -j MASQUERADE;" +
"iptables -t nat -A PREROUTING -i eth0 -p udp --dport 80:60000 -j DNAT --to " + virtualLocalIp + ":80-60000;" +
"iptables -t nat -A PREROUTING -i eth0 -p udp --dport 80:60000 -j DNAT --to " + config.LocalTunIP + ":80-60000;" +
"iptables -t nat -A POSTROUTING -p udp -m udp --dport 80:60000 -j MASQUERADE;" +
"kubevpn serve -L 'tun://0.0.0.0:8421/" + realRouterIP + ":8421?net=" + virtualShadowIp + "&route=" + routes + "' --debug=true",
"kubevpn serve -L 'tun://0.0.0.0:8421/" + config.TrafficManagerRealIP + ":8421?net=" + config.InboundPodTunIP + "&route=" + config.Route + "' --debug=true",
},
SecurityContext: &v1.SecurityContext{
Capabilities: &v1.Capabilities{

View File

@@ -1,4 +1,4 @@
package pkg
package exchange
import (
v1 "k8s.io/api/core/v1"

View File

@@ -1,4 +1,4 @@
package pkg
package exchange
import (
"context"

View File

@@ -1,4 +1,4 @@
package pkg
package exchange
import (
"context"

View File

@@ -1,4 +1,4 @@
package pkg
package exchange
import (
"context"

View File

@@ -1,4 +1,4 @@
package pkg
package exchange
import (
"context"

View File

@@ -1,4 +1,4 @@
package pkg
package exchange
import (
"context"

View File

@@ -1,4 +1,4 @@
package remote
package mesh
import (
v1 "k8s.io/api/core/v1"

View File

@@ -1,4 +1,4 @@
package remote
package mesh
import (
"context"

View File

@@ -1,4 +1,4 @@
package remote
package mesh
import (
"context"

View File

@@ -1,4 +1,4 @@
package remote
package mesh
import (
"context"

View File

@@ -1,4 +1,4 @@
package remote
package mesh
import (
"context"

View File

@@ -1,4 +1,4 @@
package remote
package mesh
import (
"context"

View File

@@ -4,7 +4,7 @@ import (
"context"
"errors"
log "github.com/sirupsen/logrus"
"github.com/wencaiwulue/kubevpn/remote"
"github.com/wencaiwulue/kubevpn/pkg/exchange"
"github.com/wencaiwulue/kubevpn/util"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
@@ -20,7 +20,7 @@ import (
"time"
)
func CreateOutboundRouterPod(clientset *kubernetes.Clientset, namespace string, serverIp *net.IPNet, nodeCIDR []*net.IPNet) (string, error) {
func CreateOutboundRouterPod(clientset *kubernetes.Clientset, namespace string, trafficManagerIP *net.IPNet, nodeCIDR []*net.IPNet) (string, error) {
firstPod, i, err3 := polymorphichelpers.GetFirstPod(clientset.CoreV1(),
namespace,
fields.OneTermEqualSelector("app", util.TrafficManager).String(),
@@ -31,7 +31,7 @@ func CreateOutboundRouterPod(clientset *kubernetes.Clientset, namespace string,
)
if err3 == nil && i != 0 && firstPod != nil {
remote.UpdateRefCount(clientset, namespace, firstPod.Name, 1)
UpdateRefCount(clientset, namespace, firstPod.Name, 1)
return firstPod.Status.PodIP, nil
}
args := []string{
@@ -44,7 +44,7 @@ func CreateOutboundRouterPod(clientset *kubernetes.Clientset, namespace string,
for _, ipNet := range nodeCIDR {
args = append(args, "iptables -t nat -A POSTROUTING -s "+ipNet.String()+" -o eth0 -j MASQUERADE")
}
args = append(args, "kubevpn serve -L tcp://:10800 -L tun://:8421?net="+serverIp.String()+" --debug=true")
args = append(args, "kubevpn serve -L tcp://:10800 -L tun://:8421?net="+trafficManagerIP.String()+" --debug=true")
t := true
zero := int64(0)
@@ -114,38 +114,45 @@ func CreateOutboundRouterPod(clientset *kubernetes.Clientset, namespace string,
}
}
func CreateInboundPod(factory cmdutil.Factory, clientset *kubernetes.Clientset, namespace, workloads, virtualLocalIp, realRouterIP, virtualShadowIp, routes string) error {
func CreateInboundPod(factory cmdutil.Factory, clientset *kubernetes.Clientset, namespace, workloads string, config PodRouteConfig) error {
resourceTuple, parsed, err2 := util.SplitResourceTypeName(workloads)
if !parsed || err2 != nil {
return errors.New("not need")
}
newName := resourceTuple.Name + "-" + "shadow"
util.DeletePod(clientset, namespace, newName)
//err := updateScaleToZero(factory, clientset, namespace, workloads)
//object, err2 := util.GetUnstructuredObject(factory, namespace, workloads)
//labels := util.GetLabelSelector(object.Object)
//ports := util.GetPorts(object.Object)
var sc Scalable
var sc exchange.Scalable
switch strings.ToLower(resourceTuple.Resource) {
case "deployment", "deployments":
sc = NewDeploymentController(factory, clientset, namespace, resourceTuple.Name)
sc = exchange.NewDeploymentController(factory, clientset, namespace, resourceTuple.Name)
case "statefulset", "statefulsets":
sc = NewStatefulsetController(factory, clientset, namespace, resourceTuple.Name)
sc = exchange.NewStatefulsetController(factory, clientset, namespace, resourceTuple.Name)
case "replicaset", "replicasets":
sc = NewReplicasController(factory, clientset, namespace, resourceTuple.Name)
sc = exchange.NewReplicasController(factory, clientset, namespace, resourceTuple.Name)
case "service", "services":
sc = NewServiceController(factory, clientset, namespace, resourceTuple.Name)
sc = exchange.NewServiceController(factory, clientset, namespace, resourceTuple.Name)
case "pod", "pods":
sc = NewPodController(factory, clientset, namespace, "pods", resourceTuple.Name)
sc = exchange.NewPodController(factory, clientset, namespace, "pods", resourceTuple.Name)
default:
sc = NewPodController(factory, clientset, namespace, resourceTuple.Resource, resourceTuple.Name)
sc = exchange.NewPodController(factory, clientset, namespace, resourceTuple.Resource, resourceTuple.Name)
}
remote.CancelFunctions = append(remote.CancelFunctions, func() {
CancelFunctions = append(CancelFunctions, func() {
if err := sc.Cancel(); err != nil {
log.Warnln(err)
}
})
labels, ports, err2 := sc.ScaleToZero()
//sc.CreateOutboundPod()
return createInboundPod(newName, namespace, labels, ports, clientset, config)
}
func createInboundPod(newName string,
namespace string,
labels map[string]string,
ports []v1.ContainerPort,
clientset *kubernetes.Clientset,
c PodRouteConfig,
) error {
t := true
zero := int64(0)
pod := v1.Pod{
@@ -166,11 +173,11 @@ func CreateInboundPod(factory cmdutil.Factory, clientset *kubernetes.Clientset,
"iptables -F;" +
"iptables -P INPUT ACCEPT;" +
"iptables -P FORWARD ACCEPT;" +
"iptables -t nat -A PREROUTING -i eth0 -p tcp --dport 80:60000 -j DNAT --to " + virtualLocalIp + ":80-60000;" +
"iptables -t nat -A PREROUTING -i eth0 -p tcp --dport 80:60000 -j DNAT --to " + c.LocalTunIP + ":80-60000;" +
"iptables -t nat -A POSTROUTING -p tcp -m tcp --dport 80:60000 -j MASQUERADE;" +
"iptables -t nat -A PREROUTING -i eth0 -p udp --dport 80:60000 -j DNAT --to " + virtualLocalIp + ":80-60000;" +
"iptables -t nat -A PREROUTING -i eth0 -p udp --dport 80:60000 -j DNAT --to " + c.LocalTunIP + ":80-60000;" +
"iptables -t nat -A POSTROUTING -p udp -m udp --dport 80:60000 -j MASQUERADE;" +
"kubevpn serve -L 'tun://0.0.0.0:8421/" + realRouterIP + ":8421?net=" + virtualShadowIp + "&route=" + routes + "' --debug=true",
"kubevpn serve -L 'tun://0.0.0.0:8421/" + c.TrafficManagerRealIP + ":8421?net=" + c.InboundPodTunIP + "&route=" + c.Route + "' --debug=true",
},
SecurityContext: &v1.SecurityContext{
Capabilities: &v1.Capabilities{

View File

@@ -1,4 +1,4 @@
package remote
package pkg
import (
"bytes"

8
pkg/route_config.go Normal file
View File

@@ -0,0 +1,8 @@
package pkg
type PodRouteConfig struct {
LocalTunIP string
InboundPodTunIP string
TrafficManagerRealIP string
Route string
}

View File

@@ -7,6 +7,8 @@ import (
"time"
)
const TrafficManager = "kubevpn.traffic.manager"
// Debug is a flag that enables the debug log.
var Debug bool

View File

@@ -1,3 +0,0 @@
package util
const TrafficManager = "kubevpn.traffic.manager"

View File

@@ -38,7 +38,6 @@ import (
"net/http"
"os"
"runtime"
"strconv"
"strings"
"time"
)
@@ -347,23 +346,6 @@ func GetOwnerReferences(object k8sruntime.Object) *metav1.OwnerReference {
return nil
}
func GetScale(object k8sruntime.Object) int {
defer func() {
if err := recover(); err != nil {
log.Errorln(err)
}
}()
printer, _ := printers.NewJSONPathPrinter("{.spec.replicas}")
buf := bytes.NewBuffer([]byte{})
if err := printer.PrintObj(object, buf); err != nil {
return 0
}
if atoi, err := strconv.Atoi(buf.String()); err == nil {
return atoi
}
return 0
}
func DeletePod(clientset *kubernetes.Clientset, namespace, podName string) {
zero := int64(0)
err := clientset.CoreV1().Pods(namespace).Delete(context.TODO(), podName, metav1.DeleteOptions{