perf: use chan to communicate between tcpserver and tun

This commit is contained in:
fengcaiwen
2023-07-22 20:08:35 +08:00
committed by naison
parent fdf75b0f0f
commit d87363d2cd
10 changed files with 147 additions and 108 deletions

View File

@@ -1,16 +1,13 @@
package cmds
import (
"context"
"math/rand"
"os"
"os/signal"
"syscall"
"time"
"github.com/spf13/cobra"
"go.uber.org/automaxprocs/maxprocs"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
ctrl "sigs.k8s.io/controller-runtime"
"github.com/wencaiwulue/kubevpn/pkg/config"
"github.com/wencaiwulue/kubevpn/pkg/core"
@@ -37,17 +34,11 @@ func CmdServe(_ cmdutil.Factory) *cobra.Command {
return err
}
defer handler.Final()
ctx, cancelFunc := context.WithCancel(context.Background())
stopChan := make(chan os.Signal)
signal.Notify(stopChan, os.Interrupt, os.Kill, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGKILL /*, syscall.SIGSTOP*/)
go func() {
<-stopChan
cancelFunc()
}()
servers, err := handler.Parse(*route)
if err != nil {
return err
}
ctx := ctrl.SetupSignalHandler()
return handler.Run(ctx, servers)
},
}

1
go.mod
View File

@@ -195,6 +195,7 @@ require (
golang.org/x/term v0.6.0 // indirect
golang.org/x/tools v0.6.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect
google.golang.org/api v0.109.0 // indirect
google.golang.org/genproto v0.0.0-20230113154510-dbe35b8444a5 // indirect
gopkg.in/DataDog/dd-trace-go.v1 v1.47.0 // indirect

2
go.sum
View File

@@ -397,6 +397,7 @@ github.com/envoyproxy/protoc-gen-validate v0.6.7/go.mod h1:dyJXwwfPK2VSqiB9Klm1J
github.com/envoyproxy/protoc-gen-validate v0.9.1 h1:PS7VIOgmSVhWUEeZwTe7z7zouA22Cr590PzXKbZHOVY=
github.com/envoyproxy/protoc-gen-validate v0.9.1/go.mod h1:OKNgG7TCp5pF4d6XftA0++PMirau2/yoOwVac3AbF2w=
github.com/erikstmartin/go-testdb v0.0.0-20160219214506-8d10e4a1bae5/go.mod h1:a2zkGnVExMxdzMo3M0Hi/3sEU+cWnZpSni0O6/Yb/P0=
github.com/evanphx/json-patch v0.5.2/go.mod h1:ZWS5hhDbVDyob71nXKNL0+PWn6ToqBHMikGIFbs31qQ=
github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/evanphx/json-patch v5.6.0+incompatible h1:jBYDEEiFBPxA0v50tFdvOzQQTCvpL6mnFh5mB2/l16U=
github.com/evanphx/json-patch v5.6.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
@@ -1411,6 +1412,7 @@ golang.zx2c4.com/wireguard v0.0.0-20220920152132-bb719d3a6e2c/go.mod h1:enML0deD
golang.zx2c4.com/wireguard/windows v0.5.3 h1:On6j2Rpn3OEMXqBq00QEDC7bWSZrPIHKIus8eIuExIE=
golang.zx2c4.com/wireguard/windows v0.5.3/go.mod h1:9TEe8TJmtwyQebdFwAkEWOPr3prrtqm+REGFifP60hI=
gomodules.xyz/jsonpatch/v2 v2.2.0 h1:4pT439QV83L+G9FkcCriY6EkpcK6r6bK+A5FBUMI7qY=
gomodules.xyz/jsonpatch/v2 v2.2.0/go.mod h1:WXp+iVDkoLQqPudfQ9GBlwB2eZ5DKOnjQZCYdOS8GPY=
google.golang.org/api v0.0.0-20160322025152-9bf6e6e569ff/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0=
google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M=

View File

@@ -135,7 +135,7 @@ var (
// network layer ip needs 20 bytes
// transport layer UDP header needs 8 bytes
// UDP over TCP header needs 22 bytes
DefaultMTU = 1500 - 20 - 8 - 21
DefaultMTU = 65521
)
var (

View File

@@ -4,6 +4,7 @@ import (
"net"
"os"
"strings"
"sync"
"github.com/containernetworking/cni/pkg/types"
"github.com/pkg/errors"
@@ -15,8 +16,17 @@ import (
var (
// RouteNAT Globe route table for inner ip
RouteNAT = NewNAT()
// RouteConnNAT map[srcIP]net.Conn
RouteConnNAT = &sync.Map{}
// Chan tcp connects
Chan = make(chan *TCPUDPacket, MaxSize)
)
type TCPUDPacket struct {
conn net.Conn
data *datagramPacket
}
// Route example:
// -L "tcp://:10800" -L "tun://:8422?net=223.254.0.100/16"
// -L "tun:/10.233.24.133:8422?net=223.254.0.102/16&route=223.254.0.0/16"

View File

@@ -4,6 +4,7 @@ import (
"context"
"errors"
"net"
"sync"
"time"
log "github.com/sirupsen/logrus"
@@ -39,12 +40,15 @@ func (c *fakeUDPTunnelConnector) ConnectContext(ctx context.Context, conn net.Co
}
type fakeUdpHandler struct {
nat *NAT
// map[srcIP]net.Conn
connNAT *sync.Map
ch chan *TCPUDPacket
}
func TCPHandler() Handler {
return &fakeUdpHandler{
nat: RouteNAT,
connNAT: RouteConnNAT,
ch: Chan,
}
}
@@ -53,69 +57,33 @@ var Server8422, _ = net.ResolveUDPAddr("udp", "localhost:8422")
func (h *fakeUdpHandler) Handle(ctx context.Context, tcpConn net.Conn) {
defer tcpConn.Close()
log.Debugf("[tcpserver] %s -> %s\n", tcpConn.RemoteAddr(), tcpConn.LocalAddr())
udpConn, err := net.DialUDP("udp", nil, Server8422)
if err != nil {
log.Errorf("[tcpserver] udp-tun %s -> %s : %s", tcpConn.RemoteAddr(), udpConn.LocalAddr(), err)
return
}
defer udpConn.Close()
defer func(addr net.Addr) {
n := h.nat.RemoveAddr(addr)
log.Debugf("delete addr %s from globle route, deleted count %d", addr, n)
}(udpConn.LocalAddr())
log.Debugf("[tcpserver] udp-tun %s <-> %s", tcpConn.RemoteAddr(), udpConn.LocalAddr())
errChan := make(chan error, 2)
go func() {
b := config.LPool.Get().([]byte)
defer config.LPool.Put(b[:])
for {
dgram, err := readDatagramPacket(tcpConn, b[:])
if err != nil {
log.Debugf("[tcpserver] %s -> 0 : %v", tcpConn.RemoteAddr(), err)
errChan <- err
return
var keys []string
h.connNAT.Range(func(key, value any) bool {
if value.(net.Conn) == tcpConn {
keys = append(keys, key.(string))
}
if _, err = udpConn.Write(dgram.Data); err != nil {
log.Debugf("[tcpserver] udp-tun %s -> %s : %s", tcpConn.RemoteAddr(), Server8422, err)
errChan <- err
return
}
log.Debugf("[tcpserver] udp-tun %s >>> %s length: %d", tcpConn.RemoteAddr(), Server8422, len(dgram.Data))
return true
})
for _, key := range keys {
h.connNAT.Delete(key)
}
}()
log.Debugf("delete conn %s from globle routeConnNAT, deleted count %d", addr, len(keys))
}(tcpConn.LocalAddr())
go func() {
for {
b := config.LPool.Get().([]byte)
defer config.LPool.Put(b[:])
for {
n, err := udpConn.Read(b[:])
if err != nil {
log.Debugf("[tcpserver] %s : %s", tcpConn.RemoteAddr(), err)
errChan <- err
return
}
// pipe from peer to tunnel
dgram := newDatagramPacket(b[:n])
if err = dgram.Write(tcpConn); err != nil {
log.Debugf("[tcpserver] udp-tun %s <- %s : %s", tcpConn.RemoteAddr(), dgram.Addr(), err)
errChan <- err
return
}
log.Debugf("[tcpserver] udp-tun %s <<< %s length: %d", tcpConn.RemoteAddr(), dgram.Addr(), len(dgram.Data))
dgram, err := readDatagramPacketServer(tcpConn, b[:])
if err != nil {
log.Debugf("[tcpserver] %s -> 0 : %v", tcpConn.RemoteAddr(), err)
return
}
h.ch <- &TCPUDPacket{
conn: tcpConn,
data: dgram,
}
}()
err = <-errChan
if err != nil {
log.Error(err)
}
log.Debugf("[tcpserver] udp-tun %s >-< %s", tcpConn.RemoteAddr(), udpConn.LocalAddr())
return
}
// fake udp connect over tcp

View File

@@ -20,15 +20,18 @@ import (
)
const (
MaxSize = 1024
MaxSize = 1000000
MaxThread = 10
MaxConn = 10
)
type tunHandler struct {
chain *Chain
node *Node
routes *NAT
chExit chan error
chain *Chain
node *Node
routeNAT *NAT
// map[srcIP]net.Conn
routeConnNAT *sync.Map
chExit chan error
}
type NAT struct {
@@ -60,9 +63,9 @@ func (n *NAT) RemoveAddr(addr net.Addr) (count int) {
}
func (n *NAT) LoadOrStore(to net.IP, addr net.Addr) (result net.Addr, load bool) {
n.lock.Lock()
defer n.lock.Unlock()
n.lock.RLock()
addrList := n.routes[to.String()]
n.lock.RUnlock()
for _, add := range addrList {
if add.String() == addr.String() {
load = true
@@ -71,6 +74,8 @@ func (n *NAT) LoadOrStore(to net.IP, addr net.Addr) (result net.Addr, load bool)
}
}
n.lock.Lock()
defer n.lock.Unlock()
if addrList == nil {
n.routes[to.String()] = []net.Addr{addr}
result = addr
@@ -113,8 +118,8 @@ func (n *NAT) Remove(ip net.IP, addr net.Addr) {
}
func (n *NAT) Range(f func(key string, v []net.Addr)) {
n.lock.Lock()
defer n.lock.Unlock()
n.lock.RLock()
defer n.lock.RUnlock()
for k, v := range n.routes {
f(k, v)
}
@@ -123,10 +128,11 @@ func (n *NAT) Range(f func(key string, v []net.Addr)) {
// TunHandler creates a handler for tun tunnel.
func TunHandler(chain *Chain, node *Node) Handler {
return &tunHandler{
chain: chain,
node: node,
routes: RouteNAT,
chExit: make(chan error, 1),
chain: chain,
node: node,
routeNAT: RouteNAT,
routeConnNAT: RouteConnNAT,
chExit: make(chan error, 1),
}
}
@@ -143,7 +149,7 @@ func (h tunHandler) printRoute() {
select {
case <-time.Tick(time.Second * 5):
var i int
h.routes.Range(func(key string, value []net.Addr) {
h.routeNAT.Range(func(key string, value []net.Addr) {
i++
var s []string
for _, addr := range value {
@@ -371,7 +377,7 @@ func (d *Device) Start() {
go d.parseIPHeader()
}
go d.writeToTun()
go d.heartbeats()
//go d.heartbeats()
}
func (h *tunHandler) HandleServer(ctx context.Context, tunConn net.Conn) {
@@ -436,8 +442,11 @@ type Peer struct {
connInbound chan *udpElem
parsedConnInfo chan *udpElem
tun *Device
routes *NAT
tun *Device
routeNAT *NAT
// map[srcIP]net.Conn
// routeConnNAT sync.Map
routeConnNAT *sync.Map
errChan chan error
}
@@ -484,8 +493,8 @@ func (p *Peer) parseHeader() {
continue
}
if _, loaded := p.routes.LoadOrStore(e.src, e.from); loaded {
log.Debugf("[tun] add route: %s -> %s", e.src, e.from)
if _, loaded := p.routeNAT.LoadOrStore(e.src, e.from); loaded {
log.Debugf("[tun] find route: %s -> %s", e.src, e.from)
} else {
log.Debugf("[tun] new route: %s -> %s", e.src, e.from)
}
@@ -498,7 +507,7 @@ func (p *Peer) parseHeader() {
func (p *Peer) route() {
for e := range p.parsedConnInfo {
if routeToAddr := p.routes.RouteTo(e.dst); routeToAddr != nil {
if routeToAddr := p.routeNAT.RouteTo(e.dst); routeToAddr != nil {
log.Debugf("[tun] find route: %s -> %s", e.dst, routeToAddr)
_, err := p.conn.WriteTo(e.data[:e.length], routeToAddr)
config.LPool.Put(e.data[:])
@@ -506,6 +515,14 @@ func (p *Peer) route() {
p.sendErr(err)
return
}
} else if conn, ok := p.routeConnNAT.Load(e.dst.String()); ok {
dgram := newDatagramPacket(e.data[:e.length])
if err := dgram.Write(conn.(net.Conn)); err != nil {
log.Debugf("[tcpserver] udp-tun %s <- %s : %s", conn.(net.Conn).RemoteAddr(), dgram.Addr(), err)
p.sendErr(err)
return
}
config.LPool.Put(e.data[:])
} else {
if !p.tun.closed.Load() {
p.tun.tunOutbound <- &DataElem{
@@ -536,14 +553,15 @@ func (p *Peer) Close() {
func (h *tunHandler) transportTun(ctx context.Context, tun *Device, conn net.PacketConn) error {
errChan := make(chan error, 2)
p := Peer{
p := &Peer{
conn: conn,
thread: MaxThread,
closed: &atomic.Bool{},
connInbound: make(chan *udpElem, MaxSize),
parsedConnInfo: make(chan *udpElem, MaxSize),
tun: tun,
routes: h.routes,
routeNAT: h.routeNAT,
routeConnNAT: h.routeConnNAT,
errChan: errChan,
}
@@ -551,27 +569,62 @@ func (h *tunHandler) transportTun(ctx context.Context, tun *Device, conn net.Pac
p.Start()
go func() {
for e := range tun.tunInbound {
for packet := range Chan {
select {
case <-ctx.Done():
return
default:
}
addr := h.routes.RouteTo(e.dst)
if addr == nil {
config.LPool.Put(e.data[:])
log.Debug(fmt.Errorf("[tun] no route for %s -> %s", e.src, e.dst))
u := &udpElem{
data: packet.data.Data[:],
length: int(packet.data.DataLength),
}
if util.IsIPv4(packet.data.Data) {
// ipv4.ParseHeader
b := packet.data.Data
u.src = net.IPv4(b[12], b[13], b[14], b[15])
u.dst = net.IPv4(b[16], b[17], b[18], b[19])
} else if util.IsIPv6(packet.data.Data) {
// ipv6.ParseHeader
u.src = packet.data.Data[8:24]
u.dst = packet.data.Data[24:40]
} else {
log.Errorf("[tun] unknown packet")
continue
}
log.Debugf("[tun] find route: %s -> %s", e.dst, addr)
_, err := conn.WriteTo(e.data[:e.length], addr)
config.LPool.Put(e.data[:])
if err != nil {
log.Debugf("[tun] can not route: %s -> %s", e.dst, addr)
errChan <- err
log.Debugf("[tcpserver] udp-tun %s >>> %s length: %d", u.src, u.dst, u.length)
p.routeConnNAT.LoadOrStore(u.src.String(), packet.conn)
log.Debugf("[tun] new routeConnNAT: %s -> %s-%s", u.src, packet.conn.LocalAddr(), packet.conn.RemoteAddr())
p.parsedConnInfo <- u
}
}()
go func() {
for e := range tun.tunInbound {
select {
case <-ctx.Done():
return
default:
}
if addr := h.routeNAT.RouteTo(e.dst); addr != nil {
log.Debugf("[tun] find route: %s -> %s", e.dst, addr)
_, err := conn.WriteTo(e.data[:e.length], addr)
config.LPool.Put(e.data[:])
if err != nil {
log.Debugf("[tun] can not route: %s -> %s", e.dst, addr)
p.sendErr(err)
return
}
} else if conn, ok := p.routeConnNAT.Load(e.dst.String()); ok {
dgram := newDatagramPacket(e.data[:e.length])
if err := dgram.Write(conn.(net.Conn)); err != nil {
log.Debugf("[tcpserver] udp-tun %s <- %s : %s", conn.(net.Conn).RemoteAddr(), dgram.Addr(), err)
p.sendErr(err)
return
}
} else {
config.LPool.Put(e.data[:])
log.Debug(fmt.Errorf("[tun] no route for %s -> %s", e.src, e.dst))
}
}
}()

View File

@@ -31,7 +31,7 @@ func (h *tunHandler) HandleClient(ctx context.Context, tun net.Conn) {
return
}
for i := 0; i < MaxThread; i++ {
for i := 0; i < MaxConn; i++ {
go func() {
for {
select {

View File

@@ -45,6 +45,20 @@ func readDatagramPacket(r io.Reader, b []byte) (*datagramPacket, error) {
return &datagramPacket{DataLength: dataLength, Data: b[:dataLength]}, nil
}
// this method will return all byte array in the way: b[:]
func readDatagramPacketServer(r io.Reader, b []byte) (*datagramPacket, error) {
_, err := io.ReadFull(r, b[:2])
if err != nil {
return nil, err
}
dataLength := binary.BigEndian.Uint16(b[:2])
_, err = io.ReadFull(r, b[:dataLength])
if err != nil /*&& (err != io.ErrUnexpectedEOF || err != io.EOF)*/ {
return nil, err
}
return &datagramPacket{DataLength: dataLength, Data: b[:]}, nil
}
func (addr *datagramPacket) Write(w io.Writer) error {
b := config.LPool.Get().([]byte)
defer config.LPool.Put(b[:])

View File

@@ -39,6 +39,7 @@ import (
v12 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/portforward"
"k8s.io/client-go/tools/remotecommand"
watchtools "k8s.io/client-go/tools/watch"
"k8s.io/client-go/transport/spdy"
@@ -107,17 +108,16 @@ func PortForwardPod(config *rest.Config, clientset *rest.RESTClient, podName, na
Resource("pods").
Namespace(namespace).
Name(podName).
SubResource("portforward").Timeout(time.Second * 30).
MaxRetries(3).
SubResource("portforward").
URL()
transport, upgrader, err := spdy.RoundTripperFor(config)
if err != nil {
log.Error(err)
return err
}
dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport, Timeout: time.Second * 30}, "POST", url)
dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", url)
p := []string{port}
forwarder, err := NewOnAddresses(dialer, []string{"0.0.0.0"}, p, stopChan, readyChan, nil, os.Stderr)
forwarder, err := portforward.NewOnAddresses(dialer, []string{"0.0.0.0"}, p, stopChan, readyChan, nil, os.Stderr)
if err != nil {
log.Error(err)
return err