feat: optimize performance

This commit is contained in:
wencaiwulue
2023-02-26 18:17:42 +08:00
parent f490801f99
commit 5f814f6d02
30 changed files with 1187 additions and 487 deletions

View File

@@ -9,15 +9,17 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/spf13/cobra" "github.com/spf13/cobra"
_ "go.uber.org/automaxprocs"
cmdutil "k8s.io/kubectl/pkg/cmd/util" cmdutil "k8s.io/kubectl/pkg/cmd/util"
"github.com/wencaiwulue/kubevpn/pkg/config" "github.com/wencaiwulue/kubevpn/pkg/config"
"github.com/wencaiwulue/kubevpn/pkg/core"
"github.com/wencaiwulue/kubevpn/pkg/handler" "github.com/wencaiwulue/kubevpn/pkg/handler"
"github.com/wencaiwulue/kubevpn/pkg/util" "github.com/wencaiwulue/kubevpn/pkg/util"
) )
func CmdServe(factory cmdutil.Factory) *cobra.Command { func CmdServe(factory cmdutil.Factory) *cobra.Command {
var route = &handler.Route{} var route = &core.Route{}
cmd := &cobra.Command{ cmd := &cobra.Command{
Use: "serve", Use: "serve",
Short: "Server side, startup traffic manager, forward inbound and outbound traffic", Short: "Server side, startup traffic manager, forward inbound and outbound traffic",

5
go.mod
View File

@@ -60,7 +60,7 @@ require (
github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/chai2010/gettext-go v1.0.2 // indirect github.com/chai2010/gettext-go v1.0.2 // indirect
github.com/cncf/xds/go v0.0.0-20230112175826-46e39c7b9b43 // indirect github.com/cncf/xds/go v0.0.0-20230112175826-46e39c7b9b43 // indirect
github.com/containerd/containerd v1.6.17 // indirect github.com/containerd/containerd v1.5.18 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect
github.com/docker/distribution v2.8.1+incompatible // indirect github.com/docker/distribution v2.8.1+incompatible // indirect
github.com/docker/docker-credential-helpers v0.7.0 // indirect github.com/docker/docker-credential-helpers v0.7.0 // indirect
@@ -100,7 +100,7 @@ require (
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db // indirect github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db // indirect
github.com/mitchellh/go-wordwrap v1.0.1 // indirect github.com/mitchellh/go-wordwrap v1.0.1 // indirect
github.com/mitchellh/mapstructure v1.4.1 // indirect github.com/mitchellh/mapstructure v1.4.1 // indirect
github.com/moby/buildkit v0.11.2 // indirect github.com/moby/buildkit v0.9.0-rc1 // indirect
github.com/moby/patternmatcher v0.5.0 // indirect github.com/moby/patternmatcher v0.5.0 // indirect
github.com/moby/spdystream v0.2.0 // indirect github.com/moby/spdystream v0.2.0 // indirect
github.com/moby/sys/sequential v0.5.0 // indirect github.com/moby/sys/sequential v0.5.0 // indirect
@@ -129,6 +129,7 @@ require (
github.com/xeipuuv/gojsonschema v1.2.0 // indirect github.com/xeipuuv/gojsonschema v1.2.0 // indirect
github.com/xlab/treeprint v1.1.0 // indirect github.com/xlab/treeprint v1.1.0 // indirect
go.starlark.net v0.0.0-20230112144946-fae38c8a6d89 // indirect go.starlark.net v0.0.0-20230112144946-fae38c8a6d89 // indirect
go.uber.org/automaxprocs v1.5.1 // indirect
golang.org/x/crypto v0.2.0 // indirect golang.org/x/crypto v0.2.0 // indirect
golang.org/x/mod v0.7.0 // indirect golang.org/x/mod v0.7.0 // indirect
golang.org/x/sync v0.1.0 // indirect golang.org/x/sync v0.1.0 // indirect

467
go.sum

File diff suppressed because it is too large Load Diff

View File

@@ -2,6 +2,7 @@ package config
import ( import (
"net" "net"
"sync"
"time" "time"
) )
@@ -42,9 +43,6 @@ const (
EnvPodName = "POD_NAME" EnvPodName = "POD_NAME"
EnvPodNamespace = "POD_NAMESPACE" EnvPodNamespace = "POD_NAMESPACE"
// annotation
AnnoServiceAccountName = "service_account_name_backup_by_kubevpn"
// header name // header name
HeaderPodName = "POD_NAME" HeaderPodName = "POD_NAME"
HeaderPodNamespace = "POD_NAMESPACE" HeaderPodNamespace = "POD_NAMESPACE"
@@ -91,3 +89,11 @@ var (
// UDP over TCP header needs 22 bytes // UDP over TCP header needs 22 bytes
DefaultMTU = 1500 - 20 - 8 - 21 DefaultMTU = 1500 - 20 - 8 - 21
) )
var (
LPool = &sync.Pool{
New: func() interface{} {
return make([]byte, LargeBufferSize)
},
}
)

View File

@@ -1,17 +0,0 @@
-----BEGIN CERTIFICATE-----
MIICnjCCAYYCCQCZjx/vIRKxhjANBgkqhkiG9w0BAQsFADAQMQ4wDAYDVQQDDAVj
aGluYTAgFw0yMTEwMzAxNDA2NDRaGA8yMTIxMTAwNjE0MDY0NFowEDEOMAwGA1UE
AwwFY2hpbmEwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQC9Zpg8u+dl
gNbPeyWGKuG+yCIE4eh8s9OEuiBjBFV/+p40sQJuxjTtdQWG0LzrWse8IjR3xan4
fyACriP7YJYvzOA1Zi3+G4312NgEvpHZqzp1H6AbKa2voOrxws32RP4vHcCPsWO8
2hnCz0Q2NE4alVqBllTjIM5jESvAGko6C5XNSo9qOZUR8A1sMoufkZTx13A1gpeG
iboqgoCY0vagYB9lRqjBgyxj/bD++Kv5hUC9G5RY2i/l4ZYJx0AYgrLoy4lUtxWP
d5gyAuUAsi+38ziZzPVcGv8g/a/9ga24/QQu2iWdmSzu2h/sxd1pcj6jGCxhyylO
GxJZ1RAhNHalAgMBAAEwDQYJKoZIhvcNAQELBQADggEBAJQzOiUS+3kufHpfvq2i
+07m4t+jo0x4zg1zsVe2Tdv9MCluPuC8fWnDzSPYICTFrVCdhMq4y8Y7HsY/JjG3
+tbArfguWlmxAPVA6f3yNoRZ3oihMUDNjq00Ho28UVomaywoDpcJ8fjWvcpH+xTH
h6Oh1rxQN+n3r7amfGdMVLdeCs7Wylmj0oCYdnkwla7OPEULLn/JPG7O+S9zqEi0
b4x9ij75erZp8mgRQs84C9vzuUgygtYw0b5zycKFP9Rp42Lm1xqV2lX8f3uVO38L
25S9mUtCe63zS1V2MXXldvQtBonO5I8UfV3IZoyAw9pA8s3MfJv4Fi5gaFjV+gZq
W90=
-----END CERTIFICATE-----

View File

@@ -1,28 +0,0 @@
-----BEGIN PRIVATE KEY-----
MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQC9Zpg8u+dlgNbP
eyWGKuG+yCIE4eh8s9OEuiBjBFV/+p40sQJuxjTtdQWG0LzrWse8IjR3xan4fyAC
riP7YJYvzOA1Zi3+G4312NgEvpHZqzp1H6AbKa2voOrxws32RP4vHcCPsWO82hnC
z0Q2NE4alVqBllTjIM5jESvAGko6C5XNSo9qOZUR8A1sMoufkZTx13A1gpeGiboq
goCY0vagYB9lRqjBgyxj/bD++Kv5hUC9G5RY2i/l4ZYJx0AYgrLoy4lUtxWPd5gy
AuUAsi+38ziZzPVcGv8g/a/9ga24/QQu2iWdmSzu2h/sxd1pcj6jGCxhyylOGxJZ
1RAhNHalAgMBAAECggEBALFNpMBeSz1rQiyTJMqXxCKcKbSWYtSyZxV+RClNeljH
HWlIN3XJ2OxeOyE8sU5F+mr1PlbNVNOK9kVsDcUaYx42VcHHeNDDrL50E61FVTYG
pD/WrkQfXTfnlWljKvobFjS3Tnd14V9+cNU8wKdZibA7FrHvMGI7aNm2zlUUh14T
sGi1J8vqp69VX4VUtIs9nXMe8DWIs8bVcwbcOnitj/G7i441JCSOY5E/gMRN0JHg
UPNSw5erWQw6rlTZ7fqDhVx/xR5rGCfWsuszIy/kZkp9JMDDve3GyodljepZGA2n
4t0vW1s0OYtwbCjhecSXJpR7WFnZY/ll5sOru/pDIqECgYEA+4jUdOX+IWREHWb3
vNmREWuLmMct+KVaCYJ9GGMQfIwSIJQlJq+NuIcIzt1iN8I0l3EI52byxGyGtHkY
mU6w5eQRpMO/Dv7vKyL6S3aiqESxuy7HWd6mg7bEu/JGuQcBJcRWLLrGsj551K3f
RArltqcxjfw3CytrF/JPW43wMB0CgYEAwMNhXgW42yQNG60zhc4zYnPp8bVwJp/A
yL59d5QXjkyOEO+UlwoOwWGOg56+6cEZNfxc4zjE9nbyz7U5ypFICP82pE00AHXO
q6IUZxqGm/JxQhL61J7jT+wG3e/vwfsteduMlit7vOx/KC9xQW3eCCif0R6Uu1oD
j6Bhub24KikCgYEA9SPqYy9PGD3+wGT8ppmnb0HP4AqmfACymjgJML5DcA6XEBcx
id1oEmHQLMPHmC32UW3BTryfdt2J/tNSLP9rGfHHXDvFtZixgOnq46bwaWvhh1rU
wHpk8FMsszswv9zaunL5xUgWo9qNo7/8qvSv4e5aNlWLU6ByE/l8a+8OGeUCgYA/
KsdpbC6bgUDaZPORUXT9Okbbcj2CKq+eGO48lUby1ptnaVsj86PKMxHkh8zABQsh
6cT2oM/KhEglUJnTi7AzYo6hYLs9u10yWTaeYs7ho50Brf6MVlTfB9VoPQwwYQMR
/6QeQbmWu1kf7gwLCNnNiqJ0gLT0gBbSphfgKg+DoQKBgQCJutEP9+9M8IGtckUz
VWeyEMJKQ2gCn4zGCxEqJ6UsZpIUaw69SdRIaeLMqInAacuhvg7phswQUAZCEvrS
9xLcgBMF0l8DaQg2h+6nbmKn/jEdjkuvZ7yGkg8bx54TK51DAnMdn0V15b3SRbrB
G3rfc1Y6M1U3AjTdvktgivU1DQ==
-----END PRIVATE KEY-----

View File

@@ -1 +0,0 @@
openssl req -x509 -nodes -days 36500 -newkey rsa:2048 -subj "/CN=china" -keyout server.key -out server.crt

View File

@@ -1,34 +0,0 @@
package config
import (
"crypto/tls"
"embed"
log "github.com/sirupsen/logrus"
)
//go:embed server.crt
var crt embed.FS
//go:embed server.key
var key embed.FS
var TlsConfigServer *tls.Config
var TlsConfigClient *tls.Config
func init() {
crtBytes, _ := crt.ReadFile("server.crt")
keyBytes, _ := key.ReadFile("server.key")
pair, err := tls.X509KeyPair(crtBytes, keyBytes)
if err != nil {
log.Fatal(err)
}
TlsConfigServer = &tls.Config{
Certificates: []tls.Certificate{pair},
}
TlsConfigClient = &tls.Config{
Certificates: []tls.Certificate{pair},
InsecureSkipVerify: true,
}
}

View File

@@ -1,52 +0,0 @@
package config
import (
"crypto/tls"
"fmt"
"io"
"net"
"testing"
log "github.com/sirupsen/logrus"
"github.com/wencaiwulue/kubevpn/pkg/util"
)
func init() {
util.InitLogger(true)
}
func TestName(t *testing.T) {
listen, _ := net.Listen("tcp", ":9090")
listener := tls.NewListener(listen, TlsConfigServer)
go func() {
for {
conn, err := listener.Accept()
if err != nil {
log.Errorln(err)
}
go func(conn net.Conn) {
bytes := make([]byte, 1024)
all, err2 := conn.Read(bytes)
if err2 != nil {
log.Errorln(err2)
return
}
defer conn.Close()
fmt.Println(string(bytes[:all]))
io.WriteString(conn, "hello client")
}(conn)
}
}()
dial, err := net.Dial("tcp", ":9090")
if err != nil {
log.Errorln(err)
}
client := tls.Client(dial, TlsConfigClient)
client.Write([]byte("hi server"))
all, err := io.ReadAll(client)
if err != nil {
log.Errorln(err)
}
fmt.Println(string(all))
}

View File

@@ -67,9 +67,31 @@ func (*Chain) resolve(addr string) string {
return addr return addr
} }
func (c *Chain) getConn(_ context.Context) (net.Conn, error) { func (c *Chain) getConn(ctx context.Context) (net.Conn, error) {
if c.IsEmpty() { if c.IsEmpty() {
return nil, ErrorEmptyChain return nil, ErrorEmptyChain
} }
return c.Node().Client.Dial(c.Node().Addr) return c.Node().Client.Dial(ctx, c.Node().Addr)
}
type Handler interface {
Handle(ctx context.Context, conn net.Conn)
}
type Client struct {
Connector
Transporter
}
type Connector interface {
ConnectContext(ctx context.Context, conn net.Conn) (net.Conn, error)
}
type Transporter interface {
Dial(ctx context.Context, addr string) (net.Conn, error)
}
type Server struct {
Listener net.Listener
Handler Handler
} }

View File

@@ -1,19 +0,0 @@
package core
import (
"context"
"net"
)
type Client struct {
Connector
Transporter
}
type Connector interface {
ConnectContext(ctx context.Context, conn net.Conn) (net.Conn, error)
}
type Transporter interface {
Dial(addr string) (net.Conn, error)
}

View File

@@ -1,10 +0,0 @@
package core
import (
"context"
"net"
)
type Handler interface {
Handle(ctx context.Context, conn net.Conn)
}

View File

@@ -1,25 +0,0 @@
package core
import (
"sync"
"github.com/wencaiwulue/kubevpn/pkg/config"
)
var (
SPool = &sync.Pool{
New: func() interface{} {
return make([]byte, config.SmallBufferSize)
},
}
MPool = &sync.Pool{
New: func() interface{} {
return make([]byte, config.MediumBufferSize)
},
}
LPool = &sync.Pool{
New: func() interface{} {
return make([]byte, config.LargeBufferSize)
},
}
)

View File

@@ -1,18 +1,20 @@
package handler package core
import ( import (
"crypto/tls"
"net" "net"
"strings" "strings"
"github.com/containernetworking/cni/pkg/types" "github.com/containernetworking/cni/pkg/types"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/wencaiwulue/kubevpn/pkg/config"
"github.com/wencaiwulue/kubevpn/pkg/core"
"github.com/wencaiwulue/kubevpn/pkg/tun" "github.com/wencaiwulue/kubevpn/pkg/tun"
) )
var (
// RouteNAT Globe route table for inner ip
RouteNAT = NewNAT()
)
// Route example: // Route example:
// -L "tcp://:10800" -L "tun://:8422?net=223.254.0.100/16" // -L "tcp://:10800" -L "tun://:8422?net=223.254.0.100/16"
// -L "tun:/10.233.24.133:8422?net=223.254.0.102/16&route=223.254.0.0/16" // -L "tun:/10.233.24.133:8422?net=223.254.0.102/16&route=223.254.0.0/16"
@@ -23,46 +25,47 @@ type Route struct {
Retries int Retries int
} }
func (r *Route) parseChain() (*core.Chain, error) { func (r *Route) parseChain() (*Chain, error) {
// parse the base nodes // parse the base nodes
node, err := parseChainNode(r.ChainNode) node, err := parseChainNode(r.ChainNode)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return core.NewChain(r.Retries, node), nil return NewChain(r.Retries, node), nil
} }
func parseChainNode(ns string) (*core.Node, error) { func parseChainNode(ns string) (*Node, error) {
node, err := core.ParseNode(ns) node, err := ParseNode(ns)
if err != nil { if err != nil {
return nil, err return nil, err
} }
node.Client = &core.Client{ node.Client = &Client{
Connector: core.UDPOverTCPTunnelConnector(), Connector: UDPOverTCPTunnelConnector(),
Transporter: core.TCPTransporter(), Transporter: TCPTransporter(),
} }
return node, nil return node, nil
} }
func (r *Route) GenerateServers() ([]core.Server, error) { func (r *Route) GenerateServers() ([]Server, error) {
chain, err := r.parseChain() chain, err := r.parseChain()
if err != nil && !errors.Is(err, core.ErrorInvalidNode) { if err != nil && !errors.Is(err, ErrorInvalidNode) {
return nil, err return nil, err
} }
servers := make([]core.Server, 0, len(r.ServeNodes)) servers := make([]Server, 0, len(r.ServeNodes))
for _, serveNode := range r.ServeNodes { for _, serveNode := range r.ServeNodes {
node, err := core.ParseNode(serveNode) var node *Node
node, err = ParseNode(serveNode)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var ln net.Listener var ln net.Listener
var handler core.Handler var handler Handler
switch node.Protocol { switch node.Protocol {
case "tun": case "tun":
handler = core.TunHandler(chain, node) handler = TunHandler(chain, node)
ln, err = tun.Listener(tun.Config{ ln, err = tun.Listener(tun.Config{
Name: node.Get("name"), Name: node.Get("name"),
Addr: node.Get("net"), Addr: node.Get("net"),
@@ -74,14 +77,13 @@ func (r *Route) GenerateServers() ([]core.Server, error) {
return nil, err return nil, err
} }
default: default:
handler = core.TCPHandler() handler = TCPHandler()
tcpListener, err := core.TCPListener(node.Addr) ln, err = TCPListener(node.Addr)
if err != nil { if err != nil {
return nil, err return nil, err
} }
ln = tls.NewListener(tcpListener, config.TlsConfigServer)
} }
servers = append(servers, core.Server{Listener: ln, Handler: handler}) servers = append(servers, Server{Listener: ln, Handler: handler})
} }
return servers, nil return servers, nil
} }

View File

@@ -1,37 +0,0 @@
package core
import (
"context"
"net"
log "github.com/sirupsen/logrus"
)
type Server struct {
Listener net.Listener
Handler Handler
}
// Serve serves as a proxy server.
func (s *Server) Serve(ctx context.Context) error {
l := s.Listener
defer l.Close()
//go func() {
// <-ctx.Done()
// l.Close()
//}()
for {
select {
case <-ctx.Done():
return nil
default:
conn, err := l.Accept()
if err != nil {
log.Warnf("server: accept error: %v", err)
continue
}
go s.Handler.Handle(ctx, conn)
}
}
}

View File

@@ -1,7 +1,7 @@
package core package core
import ( import (
"crypto/tls" "context"
"net" "net"
"github.com/wencaiwulue/kubevpn/pkg/config" "github.com/wencaiwulue/kubevpn/pkg/config"
@@ -13,9 +13,9 @@ func TCPTransporter() Transporter {
return &tcpTransporter{} return &tcpTransporter{}
} }
func (tr *tcpTransporter) Dial(addr string) (net.Conn, error) { func (tr *tcpTransporter) Dial(ctx context.Context, addr string) (net.Conn, error) {
dialer := &net.Dialer{Timeout: config.DialTimeout} dialer := &net.Dialer{Timeout: config.DialTimeout}
return tls.DialWithDialer(dialer, "tcp", addr, config.TlsConfigClient) return dialer.DialContext(ctx, "tcp", addr)
} }
func TCPListener(addr string) (net.Listener, error) { func TCPListener(addr string) (net.Listener, error) {
@@ -39,7 +39,17 @@ func (ln *tcpKeepAliveListener) Accept() (c net.Conn, err error) {
if err != nil { if err != nil {
return return
} }
_ = conn.SetKeepAlive(true) err = conn.SetKeepAlive(true)
_ = conn.SetKeepAlivePeriod(config.KeepAliveTime) if err != nil {
return nil, err
}
err = conn.SetKeepAlivePeriod(config.KeepAliveTime)
if err != nil {
return nil, err
}
err = conn.SetNoDelay(true)
if err != nil {
return nil, err
}
return conn, nil return conn, nil
} }

View File

@@ -7,6 +7,8 @@ import (
"time" "time"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/wencaiwulue/kubevpn/pkg/config"
) )
type fakeUDPTunnelConnector struct { type fakeUDPTunnelConnector struct {
@@ -18,40 +20,48 @@ func UDPOverTCPTunnelConnector() Connector {
func (c *fakeUDPTunnelConnector) ConnectContext(ctx context.Context, conn net.Conn) (net.Conn, error) { func (c *fakeUDPTunnelConnector) ConnectContext(ctx context.Context, conn net.Conn) (net.Conn, error) {
defer conn.SetDeadline(time.Time{}) defer conn.SetDeadline(time.Time{})
switch con := conn.(type) {
case *net.TCPConn:
err := con.SetNoDelay(true)
if err != nil {
return nil, err
}
}
return newFakeUDPTunnelConnOverTCP(ctx, conn) return newFakeUDPTunnelConnOverTCP(ctx, conn)
} }
type fakeUdpHandler struct { type fakeUdpHandler struct {
nat *NAT
} }
// TCPHandler creates a server Handler
func TCPHandler() Handler { func TCPHandler() Handler {
return &fakeUdpHandler{} return &fakeUdpHandler{
nat: RouteNAT,
}
} }
var Server8422, _ = net.ResolveUDPAddr("udp", "127.0.0.1:8422")
func (h *fakeUdpHandler) Handle(ctx context.Context, tcpConn net.Conn) { func (h *fakeUdpHandler) Handle(ctx context.Context, tcpConn net.Conn) {
defer tcpConn.Close() defer tcpConn.Close()
log.Debugf("[tcpserver] %s -> %s\n", tcpConn.RemoteAddr(), tcpConn.LocalAddr()) log.Debugf("[tcpserver] %s -> %s\n", tcpConn.RemoteAddr(), tcpConn.LocalAddr())
// serve tunnel udp, tunnel <-> remote, handle tunnel udp request
udpConn, err := net.DialUDP("udp", nil, Server8422) udpConn, err := net.DialUDP("udp", nil, Server8422)
if err != nil { if err != nil {
log.Errorf("[tcpserver] udp-tun %s -> %s : %s", tcpConn.RemoteAddr(), udpConn.LocalAddr(), err) log.Errorf("[tcpserver] udp-tun %s -> %s : %s", tcpConn.RemoteAddr(), udpConn.LocalAddr(), err)
return return
} }
defer udpConn.Close() defer udpConn.Close()
defer func(addr net.Addr) {
n := h.nat.RemoveAddr(addr)
log.Debugf("delete addr %s from globle route, deleted count %d", addr, n)
}(udpConn.LocalAddr())
log.Debugf("[tcpserver] udp-tun %s <-> %s", tcpConn.RemoteAddr(), udpConn.LocalAddr()) log.Debugf("[tcpserver] udp-tun %s <-> %s", tcpConn.RemoteAddr(), udpConn.LocalAddr())
_ = h.tunnelServerUDP(tcpConn, udpConn)
log.Debugf("[tcpserver] udp-tun %s >-< %s", tcpConn.RemoteAddr(), udpConn.LocalAddr())
return
}
var Server8422, _ = net.ResolveUDPAddr("udp", "127.0.0.1:8422")
func (h *fakeUdpHandler) tunnelServerUDP(tcpConn net.Conn, udpConn *net.UDPConn) (err error) {
errChan := make(chan error, 2) errChan := make(chan error, 2)
go func() { go func() {
b := LPool.Get().([]byte) b := config.LPool.Get().([]byte)
defer LPool.Put(b) defer config.LPool.Put(b[:])
for { for {
dgram, err := readDatagramPacket(tcpConn, b[:]) dgram, err := readDatagramPacket(tcpConn, b[:])
@@ -71,8 +81,8 @@ func (h *fakeUdpHandler) tunnelServerUDP(tcpConn net.Conn, udpConn *net.UDPConn)
}() }()
go func() { go func() {
b := LPool.Get().([]byte) b := config.LPool.Get().([]byte)
defer LPool.Put(b) defer config.LPool.Put(b[:])
for { for {
n, err := udpConn.Read(b[:]) n, err := udpConn.Read(b[:])
@@ -92,7 +102,12 @@ func (h *fakeUdpHandler) tunnelServerUDP(tcpConn net.Conn, udpConn *net.UDPConn)
log.Debugf("[tcpserver] udp-tun %s <<< %s length: %d", tcpConn.RemoteAddr(), dgram.Addr(), len(dgram.Data)) log.Debugf("[tcpserver] udp-tun %s <<< %s length: %d", tcpConn.RemoteAddr(), dgram.Addr(), len(dgram.Data))
} }
}() }()
return <-errChan err = <-errChan
if err != nil {
log.Error(err)
}
log.Debugf("[tcpserver] udp-tun %s >-< %s", tcpConn.RemoteAddr(), udpConn.LocalAddr())
return
} }
// fake udp connect over tcp // fake udp connect over tcp

View File

@@ -2,26 +2,115 @@ package core
import ( import (
"context" "context"
"errors" "fmt"
"golang.org/x/net/ipv4"
"golang.org/x/net/ipv6"
"net" "net"
"strings"
"sync" "sync"
"sync/atomic"
"time"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/wencaiwulue/kubevpn/pkg/config"
"github.com/wencaiwulue/kubevpn/pkg/util" "github.com/wencaiwulue/kubevpn/pkg/util"
) )
func ipToTunRouteKey(ip net.IP) string { const (
return ip.To16().String() MaxSize = 1024
} MaxThread = 10
)
type tunHandler struct { type tunHandler struct {
chain *Chain chain *Chain
node *Node node *Node
routes *sync.Map routes *NAT
chExit chan struct{} chExit chan error
}
type NAT struct {
lock *sync.RWMutex
routes map[string][]net.Addr
}
func NewNAT() *NAT {
return &NAT{
lock: &sync.RWMutex{},
routes: map[string][]net.Addr{},
}
}
func (n *NAT) RemoveAddr(addr net.Addr) (count int) {
n.lock.Lock()
defer n.lock.Unlock()
for k, v := range n.routes {
for i := 0; i < len(v); i++ {
if v[i].String() == addr.String() {
v = append(v[:i], v[i+1:]...)
i--
count++
}
}
n.routes[k] = v
}
return
}
func (n *NAT) LoadOrStore(to net.IP, addr net.Addr) (result net.Addr, load bool) {
n.lock.Lock()
defer n.lock.Unlock()
addrList := n.routes[to.String()]
for _, add := range addrList {
if add.String() == addr.String() {
load = true
result = addr
return
}
}
if addrList == nil {
n.routes[to.String()] = []net.Addr{addr}
result = addr
return
} else {
n.routes[to.String()] = append(n.routes[to.String()], addr)
result = addr
return
}
}
func (n *NAT) RouteTo(ip net.IP) net.Addr {
n.lock.Lock()
defer n.lock.Unlock()
for _, addr := range n.routes[ip.String()] {
return addr
}
return nil
}
func (n *NAT) Remove(ip net.IP, addr net.Addr) {
n.lock.Lock()
defer n.lock.Unlock()
addrList, ok := n.routes[ip.String()]
if !ok {
return
}
for i := 0; i < len(addrList); i++ {
if addrList[i].String() == addr.String() {
addrList = append(addrList[:i], addrList[i+1:]...)
i--
}
}
n.routes[ip.String()] = addrList
return
}
func (n *NAT) Range(f func(key string, v []net.Addr)) {
n.lock.Lock()
defer n.lock.Unlock()
for k, v := range n.routes {
f(k, v)
}
} }
// TunHandler creates a handler for tun tunnel. // TunHandler creates a handler for tun tunnel.
@@ -29,53 +118,152 @@ func TunHandler(chain *Chain, node *Node) Handler {
return &tunHandler{ return &tunHandler{
chain: chain, chain: chain,
node: node, node: node,
routes: &sync.Map{}, routes: RouteNAT,
chExit: make(chan struct{}, 1), chExit: make(chan error, 1),
} }
} }
func (h *tunHandler) Handle(ctx context.Context, tun net.Conn) { func (h *tunHandler) Handle(ctx context.Context, tun net.Conn) {
defer tun.Close() if h.node.Remote != "" {
var err error h.HandleClient(ctx, tun)
var remoteAddr net.Addr } else {
if addr := h.node.Remote; addr != "" { h.HandleServer(ctx, tun)
remoteAddr, err = net.ResolveUDPAddr("udp", addr) }
}
func (h tunHandler) printRoute() {
for {
select {
case <-time.Tick(time.Second * 5):
var i int
h.routes.Range(func(key string, value []net.Addr) {
i++
var s []string
for _, addr := range value {
if addr != nil {
s = append(s, addr.String())
}
}
fmt.Printf("to: %s, route: %s\n", key, strings.Join(s, " "))
})
fmt.Println(i)
}
}
}
type Device struct {
tun net.Conn
closed atomic.Bool
thread int
tunInboundRaw chan *DataElem
tunInbound chan *DataElem
tunOutbound chan *DataElem
chExit chan error
}
func (d *Device) readFromTun() {
for {
b := config.LPool.Get().([]byte)
n, err := d.tun.Read(b[:])
if err != nil { if err != nil {
log.Errorf("[tun] %s: remote addr: %v", tun.LocalAddr(), err) select {
case d.chExit <- err:
default:
}
return
}
if d.closed.Load() {
return
}
d.tunInboundRaw <- &DataElem{
data: b[:],
length: n,
}
}
}
func (d *Device) writeToTun() {
for e := range d.tunOutbound {
_, err := d.tun.Write(e.data[:e.length])
config.LPool.Put(e.data[:])
if err != nil {
select {
case d.chExit <- err:
default:
}
return return
} }
} }
}
for { func (d *Device) parseIPHeader() {
err = func() error { for e := range d.tunInboundRaw {
var err error if util.IsIPv4(e.data[:e.length]) {
var packetConn net.PacketConn // ipv4.ParseHeader
if remoteAddr != nil && !h.chain.IsEmpty() { b := e.data[:e.length]
cc, err := h.chain.DialContext(ctx) e.src = net.IPv4(b[12], b[13], b[14], b[15])
if err != nil { e.dst = net.IPv4(b[16], b[17], b[18], b[19])
return err } else if util.IsIPv6(e.data[:e.length]) {
} // ipv6.ParseHeader
var ok bool e.src = e.data[:e.length][8:24]
packetConn, ok = cc.(net.PacketConn) e.dst = e.data[:e.length][24:40]
if !ok { } else {
err = errors.New("not a packet connection") log.Errorf("[tun] unknown packet")
log.Errorf("[tun] %s - %s: %s", tun.LocalAddr(), remoteAddr, err) continue
return err
}
} else {
var lc net.ListenConfig
packetConn, err = lc.ListenPacket(ctx, "udp", h.node.Addr)
}
if err != nil {
return err
}
return h.transportTun(ctx, tun, packetConn, remoteAddr)
}()
if err != nil {
log.Debugf("[tun] %s: %v", tun.LocalAddr(), err)
} }
log.Debugf("[tun] %s --> %s", e.src, e.dst)
if d.closed.Load() {
return
}
d.tunInbound <- e
}
}
func (d *Device) Close() {
d.closed.Store(true)
d.tun.Close()
close(d.tunInboundRaw)
close(d.tunOutbound)
}
func (d *Device) Start() {
go d.readFromTun()
for i := 0; i < d.thread; i++ {
go d.parseIPHeader()
}
go d.writeToTun()
}
func (h *tunHandler) HandleServer(ctx context.Context, tunConn net.Conn) {
go h.printRoute()
tun := &Device{
tun: tunConn,
thread: MaxThread,
closed: atomic.Bool{},
tunInboundRaw: make(chan *DataElem, MaxSize),
tunInbound: make(chan *DataElem, MaxSize),
tunOutbound: make(chan *DataElem, MaxSize),
chExit: h.chExit,
}
defer tun.Close()
tun.Start()
for {
var lc net.ListenConfig
packetConn, err := lc.ListenPacket(ctx, "udp", h.node.Addr)
if err != nil {
log.Debugf("[udp] can not listen %s, err: %v", h.node.Addr, err)
goto errH
}
err = h.transportTun(ctx, tun, packetConn)
if err != nil {
log.Debugf("[tun] %s: %v", tunConn.LocalAddr(), err)
}
errH:
select { select {
case <-h.chExit: case <-h.chExit:
case <-ctx.Done(): case <-ctx.Done():
@@ -86,160 +274,164 @@ func (h *tunHandler) Handle(ctx context.Context, tun net.Conn) {
} }
} }
func (h *tunHandler) findRouteFor(dst net.IP) net.Addr { type DataElem struct {
if v, ok := h.routes.Load(ipToTunRouteKey(dst)); ok { data []byte
return v.(net.Addr) length int
} src net.IP
//for _, route := range h.options.IPRoutes { dst net.IP
// if route.Dest.Contains(dst) && route.Gateway != nil {
// if v, ok := h.routes.Load(ipToTunRouteKey(route.Gateway)); ok {
// return v.(net.Addr)
// }
// }
//}
return nil
} }
func (h *tunHandler) transportTun(ctx context.Context, tun net.Conn, conn net.PacketConn, remoteAddr net.Addr) error { type udpElem struct {
errChan := make(chan error, 2) from net.Addr
defer conn.Close() data []byte
go func() { length int
b := LPool.Get().([]byte) src net.IP
defer LPool.Put(b) dst net.IP
}
for { type Peer struct {
err := func() error { conn net.PacketConn
n, err := tun.Read(b[:]) thread int
if err != nil { closed *atomic.Bool
select {
case h.chExit <- struct{}{}:
default:
}
return err
}
// client side, deliver packet directly. connInbound chan *udpElem
if remoteAddr != nil { parsedConnInfo chan *udpElem
_, err = conn.WriteTo(b[:n], remoteAddr)
return err
}
var src, dst net.IP tun *Device
if util.IsIPv4(b[:n]) { routes *NAT
header, err := ipv4.ParseHeader(b[:n])
if err != nil {
log.Errorf("[tun] %s: %v", tun.LocalAddr(), err)
return nil
}
log.Debugf("[tun] %s", header.String())
src, dst = header.Src, header.Dst
} else if util.IsIPv6(b[:n]) {
header, err := ipv6.ParseHeader(b[:n])
if err != nil {
log.Errorf("[tun] %s: %v", tun.LocalAddr(), err)
return nil
}
log.Debugf("[tun] %s", header.String())
src, dst = header.Src, header.Dst
} else {
log.Errorf("[tun] unknown packet")
return nil
}
addr := h.findRouteFor(dst) errChan chan error
if addr == nil { }
log.Debugf("[tun] no route for %s -> %s", src, dst)
return nil
}
log.Debugf("[tun] find route: %s -> %s", dst, addr) func (p *Peer) readFromConn() {
_, err = conn.WriteTo(b[:n], addr) for {
return err b := config.LPool.Get().([]byte)
}() n, srcAddr, err := p.conn.ReadFrom(b[:])
if err != nil {
p.errChan <- err
return
}
if p.closed.Load() {
return
}
p.connInbound <- &udpElem{
from: srcAddr,
data: b[:],
length: n,
}
}
}
func (p *Peer) parseHeader() {
for e := range p.connInbound {
if util.IsIPv4(e.data[:e.length]) {
// ipv4.ParseHeader
b := e.data[:e.length]
e.src = net.IPv4(b[12], b[13], b[14], b[15])
e.dst = net.IPv4(b[16], b[17], b[18], b[19])
} else if util.IsIPv6(e.data[:e.length]) {
// ipv6.ParseHeader
e.src = e.data[:e.length][8:24]
e.dst = e.data[:e.length][24:40]
} else {
log.Errorf("[tun] unknown packet")
continue
}
if _, loaded := p.routes.LoadOrStore(e.src, e.from); loaded {
log.Debugf("[tun] add route: %s -> %s", e.src, e.from)
} else {
log.Debugf("[tun] new route: %s -> %s", e.src, e.from)
}
if p.closed.Load() {
return
}
p.parsedConnInfo <- e
}
}
func (p *Peer) route() {
for e := range p.parsedConnInfo {
if routeToAddr := p.routes.RouteTo(e.dst); routeToAddr != nil {
log.Debugf("[tun] find route: %s -> %s", e.dst, routeToAddr)
_, err := p.conn.WriteTo(e.data[:e.length], routeToAddr)
config.LPool.Put(e.data[:])
if err != nil { if err != nil {
select { log.Error(err)
case errChan <- err: }
default: } else {
if !p.tun.closed.Load() {
p.tun.tunOutbound <- &DataElem{
data: e.data,
length: e.length,
src: e.src,
dst: e.dst,
} }
return
} }
} }
}() }
}
func (p *Peer) Start() {
go p.readFromConn()
for i := 0; i < p.thread; i++ {
go p.parseHeader()
}
go p.route()
}
func (p *Peer) Close() {
p.closed.Store(true)
p.conn.Close()
close(p.connInbound)
close(p.parsedConnInfo)
}
func (h *tunHandler) transportTun(ctx context.Context, tun *Device, conn net.PacketConn) error {
errChan := make(chan error, 2)
p := Peer{
conn: conn,
thread: MaxThread,
closed: &atomic.Bool{},
connInbound: make(chan *udpElem, MaxSize),
parsedConnInfo: make(chan *udpElem, MaxSize),
tun: tun,
routes: h.routes,
errChan: errChan,
}
defer p.Close()
p.Start()
go func() { go func() {
b := LPool.Get().([]byte) var err error
defer LPool.Put(b) for e := range tun.tunInbound {
retry:
addr := h.routes.RouteTo(e.dst)
if addr == nil {
log.Debug(fmt.Errorf("[tun] no route for %s -> %s", e.src, e.dst))
continue
}
for { log.Debugf("[tun] find route: %s -> %s", e.dst, addr)
err := func() error { _, err = conn.WriteTo(e.data[:e.length], addr)
n, srcAddr, err := conn.ReadFrom(b[:]) // err should never nil, so retry is not work
if err != nil { if err != nil {
return err h.routes.Remove(e.dst, addr)
} log.Debugf("[tun] remove invalid route: %s -> %s", e.dst, addr)
goto retry
// client side, deliver packet to tun device. }
if remoteAddr != nil { config.LPool.Put(e.data[:])
_, err = tun.Write(b[:n])
return err
}
var src, dst net.IP
if util.IsIPv4(b[:n]) {
header, err := ipv4.ParseHeader(b[:n])
if err != nil {
log.Errorf("[tun] %s: %v", tun.LocalAddr(), err)
return nil
}
log.Debugf("[tun] %s", header.String())
src, dst = header.Src, header.Dst
} else if util.IsIPv6(b[:n]) {
header, err := ipv6.ParseHeader(b[:n])
if err != nil {
log.Errorf("[tun] %s: %v", tun.LocalAddr(), err)
return nil
}
log.Debugf("[tun] %s", header.String())
src, dst = header.Src, header.Dst
} else {
log.Errorf("[tun] unknown packet")
return nil
}
routeKey := ipToTunRouteKey(src)
if actual, loaded := h.routes.LoadOrStore(routeKey, srcAddr); loaded {
if actual.(net.Addr).String() != srcAddr.String() {
log.Debugf("[tun] update route: %s -> %s (old %s)", src, srcAddr, actual.(net.Addr))
h.routes.Store(routeKey, srcAddr)
}
} else {
log.Debugf("[tun] new route: %s -> %s", src, srcAddr)
}
if routeToAddr := h.findRouteFor(dst); routeToAddr != nil {
log.Debugf("[tun] find route: %s -> %s", dst, routeToAddr)
_, err = conn.WriteTo(b[:n], routeToAddr)
return err
}
if _, err = tun.Write(b[:n]); err != nil {
select {
case h.chExit <- struct{}{}:
default:
}
return err
}
return nil
}()
if err != nil { if err != nil {
select { goto errH
case errChan <- err:
default:
}
return
} }
} }
errH:
if err != nil {
errChan <- err
return
}
}() }()
select { select {

122
pkg/core/tunhandlercli.go Normal file
View File

@@ -0,0 +1,122 @@
package core
import (
"context"
"errors"
"net"
"sync/atomic"
log "github.com/sirupsen/logrus"
"github.com/wencaiwulue/kubevpn/pkg/config"
)
func (h *tunHandler) HandleClient(ctx context.Context, tun net.Conn) {
d := &Device{
tun: tun,
closed: atomic.Bool{},
thread: MaxThread,
tunInboundRaw: make(chan *DataElem, MaxSize),
tunInbound: make(chan *DataElem, MaxSize),
tunOutbound: make(chan *DataElem, MaxSize),
chExit: h.chExit,
}
defer d.Close()
d.Start()
remoteAddr, err := net.ResolveUDPAddr("udp", h.node.Remote)
if err != nil {
log.Errorf("[tun] %s: remote addr: %v", tun.LocalAddr(), err)
return
}
for i := 0; i < MaxThread; i++ {
go func() {
for {
if ctx.Err() != nil {
return
}
var packetConn net.PacketConn
if !h.chain.IsEmpty() {
cc, errs := h.chain.DialContext(ctx)
if errs != nil {
log.Error(errs)
continue
}
var ok bool
if packetConn, ok = cc.(net.PacketConn); !ok {
errs = errors.New("not a packet connection")
log.Errorf("[tun] %s - %s: %s", tun.LocalAddr(), remoteAddr, errs)
continue
}
} else {
var errs error
var lc net.ListenConfig
packetConn, errs = lc.ListenPacket(ctx, "udp", "")
if errs != nil {
log.Error(err)
continue
}
}
errs := h.transportTunCli(ctx, d, packetConn, remoteAddr)
if errs != nil {
log.Debugf("[tun] %s: %v", tun.LocalAddr(), errs)
}
}
}()
}
select {
case s := <-h.chExit:
log.Error(s)
return
case <-ctx.Done():
return
}
}
func (h *tunHandler) transportTunCli(ctx context.Context, d *Device, conn net.PacketConn, remoteAddr net.Addr) error {
errChan := make(chan error, 2)
defer conn.Close()
go func() {
var err error
for e := range d.tunInbound {
if e.src.Equal(e.dst) {
if d.closed.Load() {
return
}
d.tunOutbound <- e
continue
}
_, err = conn.WriteTo(e.data[:e.length], remoteAddr)
config.LPool.Put(e.data[:])
if err != nil {
errChan <- err
return
}
}
}()
go func() {
for {
b := config.LPool.Get().([]byte)
n, _, err := conn.ReadFrom(b[:])
if err != nil {
errChan <- err
return
}
if d.closed.Load() {
return
}
d.tunOutbound <- &DataElem{data: b[:], length: n}
}
}()
select {
case err := <-errChan:
return err
case <-ctx.Done():
return nil
}
}

View File

@@ -5,6 +5,8 @@ import (
"fmt" "fmt"
"io" "io"
"net" "net"
"github.com/wencaiwulue/kubevpn/pkg/config"
) )
type datagramPacket struct { type datagramPacket struct {
@@ -44,8 +46,8 @@ func readDatagramPacket(r io.Reader, b []byte) (*datagramPacket, error) {
} }
func (addr *datagramPacket) Write(w io.Writer) error { func (addr *datagramPacket) Write(w io.Writer) error {
b := LPool.Get().([]byte) b := config.LPool.Get().([]byte)
defer LPool.Put(b) defer config.LPool.Put(b[:])
binary.BigEndian.PutUint16(b[:2], uint16(len(addr.Data))) binary.BigEndian.PutUint16(b[:2], uint16(len(addr.Data)))
n := copy(b[2:], addr.Data) n := copy(b[2:], addr.Data)
_, err := w.Write(b[:n+2]) _, err := w.Write(b[:n+2])

View File

@@ -61,7 +61,7 @@ iptables -P FORWARD ACCEPT
iptables -t nat -A PREROUTING ! -p icmp -j DNAT --to ${LocalTunIP} iptables -t nat -A PREROUTING ! -p icmp -j DNAT --to ${LocalTunIP}
iptables -t nat -A POSTROUTING ! -p icmp -j MASQUERADE iptables -t nat -A POSTROUTING ! -p icmp -j MASQUERADE
iptables -t nat -A OUTPUT -o lo ! -p icmp -j DNAT --to-destination ${LocalTunIP} iptables -t nat -A OUTPUT -o lo ! -p icmp -j DNAT --to-destination ${LocalTunIP}
kubevpn serve -L "tun://0.0.0.0:8421/${TrafficManagerRealIP}:8422?net=${InboundPodTunIP}&route=${CIDR}" --debug=true`, kubevpn serve -L "tun:/127.0.0.1:8422?net=${InboundPodTunIP}&route=${CIDR}" -F "tcp://${TrafficManagerRealIP}:10800" --debug=true`,
}, },
SecurityContext: &corev1.SecurityContext{ SecurityContext: &corev1.SecurityContext{
Capabilities: &corev1.Capabilities{ Capabilities: &corev1.Capabilities{

View File

@@ -240,7 +240,7 @@ func (c *ConnectOptions) startLocalTunServe(ctx context.Context, forwardAddress
for _, ipNet := range c.cidrs { for _, ipNet := range c.cidrs {
list.Insert(ipNet.String()) list.Insert(ipNet.String())
} }
r := Route{ r := core.Route{
ServeNodes: []string{ ServeNodes: []string{
fmt.Sprintf("tun:/127.0.0.1:8422?net=%s&route=%s", c.localTunIP.String(), strings.Join(list.List(), ",")), fmt.Sprintf("tun:/127.0.0.1:8422?net=%s&route=%s", c.localTunIP.String(), strings.Join(list.List(), ",")),
}, },
@@ -299,6 +299,7 @@ func (c *ConnectOptions) addRouteDynamic(ctx context.Context) {
}() }()
w, err := c.clientset.CoreV1().Pods(v1.NamespaceAll).Watch(ctx, metav1.ListOptions{Watch: true, TimeoutSeconds: pointer.Int64(30)}) w, err := c.clientset.CoreV1().Pods(v1.NamespaceAll).Watch(ctx, metav1.ListOptions{Watch: true, TimeoutSeconds: pointer.Int64(30)})
if err != nil { if err != nil {
time.Sleep(time.Second * 5)
log.Debugf("wait pod failed, err: %v", err) log.Debugf("wait pod failed, err: %v", err)
return return
} }
@@ -349,6 +350,7 @@ func (c *ConnectOptions) addRouteDynamic(ctx context.Context) {
w, err := c.clientset.CoreV1().Services(v1.NamespaceAll).Watch(ctx, metav1.ListOptions{Watch: true, TimeoutSeconds: pointer.Int64(30)}) w, err := c.clientset.CoreV1().Services(v1.NamespaceAll).Watch(ctx, metav1.ListOptions{Watch: true, TimeoutSeconds: pointer.Int64(30)})
if err != nil { if err != nil {
log.Debugf("wait service failed, err: %v", err) log.Debugf("wait service failed, err: %v", err)
time.Sleep(time.Second * 5)
return return
} }
defer w.Stop() defer w.Stop()
@@ -389,15 +391,17 @@ func (c *ConnectOptions) deleteFirewallRule(ctx context.Context) {
go util.Heartbeats(ctx) go util.Heartbeats(ctx)
} }
func (c *ConnectOptions) setupDNS() { func (c *ConnectOptions) setupDNS() error {
const port = 53 const port = 53
pod, err := c.GetRunningPodList() pod, err := c.GetRunningPodList()
if err != nil { if err != nil {
log.Errorln(err) log.Errorln(err)
return err
} }
relovConf, err := dns.GetDNSServiceIPFromPod(c.clientset, c.restclient, c.config, pod[0].GetName(), c.Namespace) relovConf, err := dns.GetDNSServiceIPFromPod(c.clientset, c.restclient, c.config, pod[0].GetName(), c.Namespace)
if err != nil { if err != nil {
log.Errorln(err) log.Errorln(err)
return err
} }
if relovConf.Port == "" { if relovConf.Port == "" {
relovConf.Port = strconv.Itoa(port) relovConf.Port = strconv.Itoa(port)
@@ -416,13 +420,14 @@ func (c *ConnectOptions) setupDNS() {
} }
} }
if err = dns.SetupDNS(relovConf, ns.List()); err != nil { if err = dns.SetupDNS(relovConf, ns.List()); err != nil {
log.Warningln(err) return err
} }
// dump service in current namespace for support DNS resolve service:port // dump service in current namespace for support DNS resolve service:port
go dns.AddServiceNameToHosts(ctx, c.clientset.CoreV1().Services(c.Namespace)) go dns.AddServiceNameToHosts(ctx, c.clientset.CoreV1().Services(c.Namespace))
return nil
} }
func Start(ctx context.Context, r Route) error { func Start(ctx context.Context, r core.Route) error {
servers, err := r.GenerateServers() servers, err := r.GenerateServers()
if err != nil { if err != nil {
return errors.WithStack(err) return errors.WithStack(err)
@@ -430,12 +435,19 @@ func Start(ctx context.Context, r Route) error {
if len(servers) == 0 { if len(servers) == 0 {
return errors.New("invalid config") return errors.New("invalid config")
} }
for _, rr := range servers { for _, server := range servers {
go func(ctx context.Context, server core.Server) { go func(ctx context.Context, server core.Server) {
if err = server.Serve(ctx); err != nil { l := server.Listener
log.Debug(err) defer l.Close()
for {
conn, err := l.Accept()
if err != nil {
log.Warnf("server: accept error: %v", err)
continue
}
go server.Handler.Handle(ctx, conn)
} }
}(ctx, rr) }(ctx, server)
} }
return nil return nil
} }

View File

@@ -174,12 +174,12 @@ func CreateOutboundPod(ctx context.Context, factory cmdutil.Factory, clientset *
var Resources = v1.ResourceRequirements{ var Resources = v1.ResourceRequirements{
Requests: map[v1.ResourceName]resource.Quantity{ Requests: map[v1.ResourceName]resource.Quantity{
v1.ResourceCPU: resource.MustParse("128m"), v1.ResourceCPU: resource.MustParse("1000m"),
v1.ResourceMemory: resource.MustParse("256Mi"), v1.ResourceMemory: resource.MustParse("1024Mi"),
}, },
Limits: map[v1.ResourceName]resource.Quantity{ Limits: map[v1.ResourceName]resource.Quantity{
v1.ResourceCPU: resource.MustParse("256m"), v1.ResourceCPU: resource.MustParse("2000m"),
v1.ResourceMemory: resource.MustParse("512Mi"), v1.ResourceMemory: resource.MustParse("2048Mi"),
}, },
} }

View File

@@ -14,7 +14,7 @@ import (
"github.com/wencaiwulue/kubevpn/pkg/util" "github.com/wencaiwulue/kubevpn/pkg/util"
) )
func Complete(route *Route) error { func Complete(route *core.Route) error {
if v, ok := os.LookupEnv(config.EnvInboundPodTunIP); ok && v == "" { if v, ok := os.LookupEnv(config.EnvInboundPodTunIP); ok && v == "" {
namespace := os.Getenv(config.EnvPodNamespace) namespace := os.Getenv(config.EnvPodNamespace)
if namespace == "" { if namespace == "" {

View File

@@ -36,7 +36,7 @@ iptables -P INPUT ACCEPT
iptables -P FORWARD ACCEPT iptables -P FORWARD ACCEPT
iptables -t nat -A PREROUTING ! -p icmp ! -s 127.0.0.1 ! -d ${CIDR} -j DNAT --to 127.0.0.1:15006 iptables -t nat -A PREROUTING ! -p icmp ! -s 127.0.0.1 ! -d ${CIDR} -j DNAT --to 127.0.0.1:15006
iptables -t nat -A POSTROUTING ! -p icmp ! -s 127.0.0.1 ! -d ${CIDR} -j MASQUERADE iptables -t nat -A POSTROUTING ! -p icmp ! -s 127.0.0.1 ! -d ${CIDR} -j MASQUERADE
kubevpn serve -L "tun:/${TrafficManagerRealIP}:8422?net=${InboundPodTunIP}&route=${CIDR}" --debug=true`, kubevpn serve -L "tun:/127.0.0.1:8422?net=${InboundPodTunIP}&route=${CIDR}" -F "tcp://${TrafficManagerRealIP}:10800" --debug=true`,
}, },
EnvFrom: []v1.EnvFromSource{{ EnvFrom: []v1.EnvFromSource{{
SecretRef: &v1.SecretEnvSource{ SecretRef: &v1.SecretEnvSource{

69
pkg/tun/route_darwin.txt Normal file
View File

@@ -0,0 +1,69 @@
package tun
// stolen from https://stackoverflow.com/a/11265543
#include <stdio.h>
#include <stdlib.h>
#include <err.h>
#include <sys/param.h>
#include <sys/sysctl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <net/route.h>
/* Darwin doesn't define this for some very odd reason */
#ifndef SA_SIZE
# define SA_SIZE(sa) \
( (!(sa) || ((struct sockaddr *)(sa))->sa_len == 0) ? \
sizeof(long) : \
1 + ( (((struct sockaddr *)(sa))->sa_len - 1) | (sizeof(long) - 1) ) )
#endif
static void
ntreestuff(void)
{
size_t needed;
int mib[6];
char *buf, *next, *lim;
struct rt_msghdr *rtm;
struct sockaddr *sa;
struct sockaddr_in *sockin;
char line[MAXHOSTNAMELEN];
mib[0] = CTL_NET;
mib[1] = PF_ROUTE;
mib[2] = 0;
mib[3] = 0;
mib[4] = NET_RT_DUMP;
mib[5] = 0;
if (sysctl(mib, 6, NULL, &needed, NULL, 0) < 0) {
err(1, "sysctl: net.route.0.0.dump estimate");
}
if ((buf = (char *)malloc(needed)) == NULL) {
errx(2, "malloc(%lu)", (unsigned long)needed);
}
if (sysctl(mib, 6, buf, &needed, NULL, 0) < 0) {
err(1, "sysctl: net.route.0.0.dump");
}
lim = buf + needed;
for (next = buf; next < lim; next += rtm->rtm_msglen) {
rtm = (struct rt_msghdr *)next;
sa = (struct sockaddr *)(rtm + 1);
sa = (struct sockaddr *)(SA_SIZE(sa) + (char *)sa);
sockin = (struct sockaddr_in *)sa;
inet_ntop(AF_INET, &sockin->sin_addr.s_addr, line, sizeof(line) - 1);
printf("defaultrouter=%s\n", line);
}
free(buf);
}
int
main(int argc __unused, char *argv[] __unused)
{
ntreestuff();
return (0);
}

View File

@@ -8,7 +8,6 @@ import (
"github.com/containernetworking/cni/pkg/types" "github.com/containernetworking/cni/pkg/types"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/wencaiwulue/kubevpn/pkg/core"
"golang.zx2c4.com/wireguard/device" "golang.zx2c4.com/wireguard/device"
"golang.zx2c4.com/wireguard/tun" "golang.zx2c4.com/wireguard/tun"
@@ -82,10 +81,14 @@ type tunConn struct {
func (c *tunConn) Read(b []byte) (n int, err error) { func (c *tunConn) Read(b []byte) (n int, err error) {
offset := device.MessageTransportHeaderSize offset := device.MessageTransportHeaderSize
bytes := core.LPool.Get().([]byte) bytes := config.LPool.Get().([]byte)
core.LPool.Put(bytes) defer config.LPool.Put(bytes[:])
size, err := c.ifce.Read(bytes[:], offset) var size int
size, err = c.ifce.Read(bytes[:], offset)
if err != nil {
return 0, err
}
if size == 0 || size > device.MaxSegmentSize-device.MessageTransportHeaderSize { if size == 0 || size > device.MaxSegmentSize-device.MessageTransportHeaderSize {
return 0, nil return 0, nil
} }
@@ -96,8 +99,8 @@ func (c *tunConn) Write(b []byte) (n int, err error) {
if len(b) < device.MessageTransportHeaderSize { if len(b) < device.MessageTransportHeaderSize {
return 0, err return 0, err
} }
bytes := core.LPool.Get().([]byte) bytes := config.LPool.Get().([]byte)
defer core.LPool.Put(bytes) defer config.LPool.Put(bytes[:])
copy(bytes[device.MessageTransportOffsetContent:], b) copy(bytes[device.MessageTransportOffsetContent:], b)

View File

@@ -6,15 +6,14 @@ package util
import ( import (
"flag" "flag"
"fmt" "fmt"
log "github.com/sirupsen/logrus"
"k8s.io/client-go/tools/clientcmd"
"os" "os"
"os/exec" "os/exec"
"os/signal" "os/signal"
"runtime" "runtime"
"syscall" "syscall"
"time" "time"
log "github.com/sirupsen/logrus"
"k8s.io/client-go/tools/clientcmd"
) )
const envStartSudoKubeVPNByKubeVPN = "DEPTH_SIGNED_BY_NAISON" const envStartSudoKubeVPNByKubeVPN = "DEPTH_SIGNED_BY_NAISON"

View File

@@ -1,9 +0,0 @@
package util
func IsIPv4(packet []byte) bool {
return 4 == (packet[0] >> 4)
}
func IsIPv6(packet []byte) bool {
return 6 == (packet[0] >> 4)
}

View File

@@ -585,3 +585,11 @@ func DoReq(request *http.Request) (body []byte, err error) {
func GetTlsDomain(namespace string) string { func GetTlsDomain(namespace string) string {
return config.ConfigMapPodTrafficManager + "." + namespace + "." + "svc" return config.ConfigMapPodTrafficManager + "." + namespace + "." + "svc"
} }
func IsIPv4(packet []byte) bool {
return 4 == (packet[0] >> 4)
}
func IsIPv6(packet []byte) bool {
return 6 == (packet[0] >> 4)
}