diff --git a/cmd/kubevpn/cmds/serve.go b/cmd/kubevpn/cmds/serve.go index a2e85d9d..66d8918c 100644 --- a/cmd/kubevpn/cmds/serve.go +++ b/cmd/kubevpn/cmds/serve.go @@ -1,16 +1,13 @@ package cmds import ( - "context" "math/rand" - "os" - "os/signal" - "syscall" "time" "github.com/spf13/cobra" "go.uber.org/automaxprocs/maxprocs" cmdutil "k8s.io/kubectl/pkg/cmd/util" + ctrl "sigs.k8s.io/controller-runtime" "github.com/wencaiwulue/kubevpn/pkg/config" "github.com/wencaiwulue/kubevpn/pkg/core" @@ -37,17 +34,11 @@ func CmdServe(_ cmdutil.Factory) *cobra.Command { return err } defer handler.Final() - ctx, cancelFunc := context.WithCancel(context.Background()) - stopChan := make(chan os.Signal) - signal.Notify(stopChan, os.Interrupt, os.Kill, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGKILL /*, syscall.SIGSTOP*/) - go func() { - <-stopChan - cancelFunc() - }() servers, err := handler.Parse(*route) if err != nil { return err } + ctx := ctrl.SetupSignalHandler() return handler.Run(ctx, servers) }, } diff --git a/go.mod b/go.mod index ea62aa2a..76370a6c 100644 --- a/go.mod +++ b/go.mod @@ -195,6 +195,7 @@ require ( golang.org/x/term v0.6.0 // indirect golang.org/x/tools v0.6.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect + gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect google.golang.org/api v0.109.0 // indirect google.golang.org/genproto v0.0.0-20230113154510-dbe35b8444a5 // indirect gopkg.in/DataDog/dd-trace-go.v1 v1.47.0 // indirect diff --git a/go.sum b/go.sum index fed868d7..b03fb351 100644 --- a/go.sum +++ b/go.sum @@ -397,6 +397,7 @@ github.com/envoyproxy/protoc-gen-validate v0.6.7/go.mod h1:dyJXwwfPK2VSqiB9Klm1J github.com/envoyproxy/protoc-gen-validate v0.9.1 h1:PS7VIOgmSVhWUEeZwTe7z7zouA22Cr590PzXKbZHOVY= github.com/envoyproxy/protoc-gen-validate v0.9.1/go.mod h1:OKNgG7TCp5pF4d6XftA0++PMirau2/yoOwVac3AbF2w= github.com/erikstmartin/go-testdb v0.0.0-20160219214506-8d10e4a1bae5/go.mod h1:a2zkGnVExMxdzMo3M0Hi/3sEU+cWnZpSni0O6/Yb/P0= +github.com/evanphx/json-patch v0.5.2/go.mod h1:ZWS5hhDbVDyob71nXKNL0+PWn6ToqBHMikGIFbs31qQ= github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/evanphx/json-patch v5.6.0+incompatible h1:jBYDEEiFBPxA0v50tFdvOzQQTCvpL6mnFh5mB2/l16U= github.com/evanphx/json-patch v5.6.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= @@ -1411,6 +1412,7 @@ golang.zx2c4.com/wireguard v0.0.0-20220920152132-bb719d3a6e2c/go.mod h1:enML0deD golang.zx2c4.com/wireguard/windows v0.5.3 h1:On6j2Rpn3OEMXqBq00QEDC7bWSZrPIHKIus8eIuExIE= golang.zx2c4.com/wireguard/windows v0.5.3/go.mod h1:9TEe8TJmtwyQebdFwAkEWOPr3prrtqm+REGFifP60hI= gomodules.xyz/jsonpatch/v2 v2.2.0 h1:4pT439QV83L+G9FkcCriY6EkpcK6r6bK+A5FBUMI7qY= +gomodules.xyz/jsonpatch/v2 v2.2.0/go.mod h1:WXp+iVDkoLQqPudfQ9GBlwB2eZ5DKOnjQZCYdOS8GPY= google.golang.org/api v0.0.0-20160322025152-9bf6e6e569ff/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0= google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE= google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M= diff --git a/pkg/config/config.go b/pkg/config/config.go index 1c356119..f5436c0f 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -135,7 +135,7 @@ var ( // network layer ip needs 20 bytes // transport layer UDP header needs 8 bytes // UDP over TCP header needs 22 bytes - DefaultMTU = 1500 - 20 - 8 - 21 + DefaultMTU = 65521 ) var ( diff --git a/pkg/core/route.go b/pkg/core/route.go index b16f7fd5..05b04f9b 100644 --- a/pkg/core/route.go +++ b/pkg/core/route.go @@ -4,6 +4,7 @@ import ( "net" "os" "strings" + "sync" "github.com/containernetworking/cni/pkg/types" "github.com/pkg/errors" @@ -15,8 +16,17 @@ import ( var ( // RouteNAT Globe route table for inner ip RouteNAT = NewNAT() + // RouteConnNAT map[srcIP]net.Conn + RouteConnNAT = &sync.Map{} + // Chan tcp connects + Chan = make(chan *TCPUDPacket, MaxSize) ) +type TCPUDPacket struct { + conn net.Conn + data *datagramPacket +} + // Route example: // -L "tcp://:10800" -L "tun://:8422?net=223.254.0.100/16" // -L "tun:/10.233.24.133:8422?net=223.254.0.102/16&route=223.254.0.0/16" diff --git a/pkg/core/tcphandler.go b/pkg/core/tcphandler.go index e5fc9f54..1da54c6c 100644 --- a/pkg/core/tcphandler.go +++ b/pkg/core/tcphandler.go @@ -4,6 +4,7 @@ import ( "context" "errors" "net" + "sync" "time" log "github.com/sirupsen/logrus" @@ -39,12 +40,15 @@ func (c *fakeUDPTunnelConnector) ConnectContext(ctx context.Context, conn net.Co } type fakeUdpHandler struct { - nat *NAT + // map[srcIP]net.Conn + connNAT *sync.Map + ch chan *TCPUDPacket } func TCPHandler() Handler { return &fakeUdpHandler{ - nat: RouteNAT, + connNAT: RouteConnNAT, + ch: Chan, } } @@ -53,69 +57,33 @@ var Server8422, _ = net.ResolveUDPAddr("udp", "localhost:8422") func (h *fakeUdpHandler) Handle(ctx context.Context, tcpConn net.Conn) { defer tcpConn.Close() log.Debugf("[tcpserver] %s -> %s\n", tcpConn.RemoteAddr(), tcpConn.LocalAddr()) - udpConn, err := net.DialUDP("udp", nil, Server8422) - if err != nil { - log.Errorf("[tcpserver] udp-tun %s -> %s : %s", tcpConn.RemoteAddr(), udpConn.LocalAddr(), err) - return - } - defer udpConn.Close() defer func(addr net.Addr) { - n := h.nat.RemoveAddr(addr) - log.Debugf("delete addr %s from globle route, deleted count %d", addr, n) - }(udpConn.LocalAddr()) - - log.Debugf("[tcpserver] udp-tun %s <-> %s", tcpConn.RemoteAddr(), udpConn.LocalAddr()) - errChan := make(chan error, 2) - go func() { - b := config.LPool.Get().([]byte) - defer config.LPool.Put(b[:]) - - for { - dgram, err := readDatagramPacket(tcpConn, b[:]) - if err != nil { - log.Debugf("[tcpserver] %s -> 0 : %v", tcpConn.RemoteAddr(), err) - errChan <- err - return + var keys []string + h.connNAT.Range(func(key, value any) bool { + if value.(net.Conn) == tcpConn { + keys = append(keys, key.(string)) } - - if _, err = udpConn.Write(dgram.Data); err != nil { - log.Debugf("[tcpserver] udp-tun %s -> %s : %s", tcpConn.RemoteAddr(), Server8422, err) - errChan <- err - return - } - log.Debugf("[tcpserver] udp-tun %s >>> %s length: %d", tcpConn.RemoteAddr(), Server8422, len(dgram.Data)) + return true + }) + for _, key := range keys { + h.connNAT.Delete(key) } - }() + log.Debugf("delete conn %s from globle routeConnNAT, deleted count %d", addr, len(keys)) + }(tcpConn.LocalAddr()) - go func() { + for { b := config.LPool.Get().([]byte) - defer config.LPool.Put(b[:]) - - for { - n, err := udpConn.Read(b[:]) - if err != nil { - log.Debugf("[tcpserver] %s : %s", tcpConn.RemoteAddr(), err) - errChan <- err - return - } - - // pipe from peer to tunnel - dgram := newDatagramPacket(b[:n]) - if err = dgram.Write(tcpConn); err != nil { - log.Debugf("[tcpserver] udp-tun %s <- %s : %s", tcpConn.RemoteAddr(), dgram.Addr(), err) - errChan <- err - return - } - log.Debugf("[tcpserver] udp-tun %s <<< %s length: %d", tcpConn.RemoteAddr(), dgram.Addr(), len(dgram.Data)) + dgram, err := readDatagramPacketServer(tcpConn, b[:]) + if err != nil { + log.Debugf("[tcpserver] %s -> 0 : %v", tcpConn.RemoteAddr(), err) + return + } + h.ch <- &TCPUDPacket{ + conn: tcpConn, + data: dgram, } - }() - err = <-errChan - if err != nil { - log.Error(err) } - log.Debugf("[tcpserver] udp-tun %s >-< %s", tcpConn.RemoteAddr(), udpConn.LocalAddr()) - return } // fake udp connect over tcp diff --git a/pkg/core/tunhandler.go b/pkg/core/tunhandler.go index bd7d7d08..44e84534 100644 --- a/pkg/core/tunhandler.go +++ b/pkg/core/tunhandler.go @@ -20,15 +20,18 @@ import ( ) const ( - MaxSize = 1024 + MaxSize = 1000000 MaxThread = 10 + MaxConn = 10 ) type tunHandler struct { - chain *Chain - node *Node - routes *NAT - chExit chan error + chain *Chain + node *Node + routeNAT *NAT + // map[srcIP]net.Conn + routeConnNAT *sync.Map + chExit chan error } type NAT struct { @@ -60,9 +63,9 @@ func (n *NAT) RemoveAddr(addr net.Addr) (count int) { } func (n *NAT) LoadOrStore(to net.IP, addr net.Addr) (result net.Addr, load bool) { - n.lock.Lock() - defer n.lock.Unlock() + n.lock.RLock() addrList := n.routes[to.String()] + n.lock.RUnlock() for _, add := range addrList { if add.String() == addr.String() { load = true @@ -71,6 +74,8 @@ func (n *NAT) LoadOrStore(to net.IP, addr net.Addr) (result net.Addr, load bool) } } + n.lock.Lock() + defer n.lock.Unlock() if addrList == nil { n.routes[to.String()] = []net.Addr{addr} result = addr @@ -113,8 +118,8 @@ func (n *NAT) Remove(ip net.IP, addr net.Addr) { } func (n *NAT) Range(f func(key string, v []net.Addr)) { - n.lock.Lock() - defer n.lock.Unlock() + n.lock.RLock() + defer n.lock.RUnlock() for k, v := range n.routes { f(k, v) } @@ -123,10 +128,11 @@ func (n *NAT) Range(f func(key string, v []net.Addr)) { // TunHandler creates a handler for tun tunnel. func TunHandler(chain *Chain, node *Node) Handler { return &tunHandler{ - chain: chain, - node: node, - routes: RouteNAT, - chExit: make(chan error, 1), + chain: chain, + node: node, + routeNAT: RouteNAT, + routeConnNAT: RouteConnNAT, + chExit: make(chan error, 1), } } @@ -143,7 +149,7 @@ func (h tunHandler) printRoute() { select { case <-time.Tick(time.Second * 5): var i int - h.routes.Range(func(key string, value []net.Addr) { + h.routeNAT.Range(func(key string, value []net.Addr) { i++ var s []string for _, addr := range value { @@ -371,7 +377,7 @@ func (d *Device) Start() { go d.parseIPHeader() } go d.writeToTun() - go d.heartbeats() + //go d.heartbeats() } func (h *tunHandler) HandleServer(ctx context.Context, tunConn net.Conn) { @@ -436,8 +442,11 @@ type Peer struct { connInbound chan *udpElem parsedConnInfo chan *udpElem - tun *Device - routes *NAT + tun *Device + routeNAT *NAT + // map[srcIP]net.Conn + // routeConnNAT sync.Map + routeConnNAT *sync.Map errChan chan error } @@ -484,8 +493,8 @@ func (p *Peer) parseHeader() { continue } - if _, loaded := p.routes.LoadOrStore(e.src, e.from); loaded { - log.Debugf("[tun] add route: %s -> %s", e.src, e.from) + if _, loaded := p.routeNAT.LoadOrStore(e.src, e.from); loaded { + log.Debugf("[tun] find route: %s -> %s", e.src, e.from) } else { log.Debugf("[tun] new route: %s -> %s", e.src, e.from) } @@ -498,7 +507,7 @@ func (p *Peer) parseHeader() { func (p *Peer) route() { for e := range p.parsedConnInfo { - if routeToAddr := p.routes.RouteTo(e.dst); routeToAddr != nil { + if routeToAddr := p.routeNAT.RouteTo(e.dst); routeToAddr != nil { log.Debugf("[tun] find route: %s -> %s", e.dst, routeToAddr) _, err := p.conn.WriteTo(e.data[:e.length], routeToAddr) config.LPool.Put(e.data[:]) @@ -506,6 +515,14 @@ func (p *Peer) route() { p.sendErr(err) return } + } else if conn, ok := p.routeConnNAT.Load(e.dst.String()); ok { + dgram := newDatagramPacket(e.data[:e.length]) + if err := dgram.Write(conn.(net.Conn)); err != nil { + log.Debugf("[tcpserver] udp-tun %s <- %s : %s", conn.(net.Conn).RemoteAddr(), dgram.Addr(), err) + p.sendErr(err) + return + } + config.LPool.Put(e.data[:]) } else { if !p.tun.closed.Load() { p.tun.tunOutbound <- &DataElem{ @@ -536,14 +553,15 @@ func (p *Peer) Close() { func (h *tunHandler) transportTun(ctx context.Context, tun *Device, conn net.PacketConn) error { errChan := make(chan error, 2) - p := Peer{ + p := &Peer{ conn: conn, thread: MaxThread, closed: &atomic.Bool{}, connInbound: make(chan *udpElem, MaxSize), parsedConnInfo: make(chan *udpElem, MaxSize), tun: tun, - routes: h.routes, + routeNAT: h.routeNAT, + routeConnNAT: h.routeConnNAT, errChan: errChan, } @@ -551,27 +569,62 @@ func (h *tunHandler) transportTun(ctx context.Context, tun *Device, conn net.Pac p.Start() go func() { - for e := range tun.tunInbound { + for packet := range Chan { select { case <-ctx.Done(): return default: } - addr := h.routes.RouteTo(e.dst) - if addr == nil { - config.LPool.Put(e.data[:]) - log.Debug(fmt.Errorf("[tun] no route for %s -> %s", e.src, e.dst)) + u := &udpElem{ + data: packet.data.Data[:], + length: int(packet.data.DataLength), + } + if util.IsIPv4(packet.data.Data) { + // ipv4.ParseHeader + b := packet.data.Data + u.src = net.IPv4(b[12], b[13], b[14], b[15]) + u.dst = net.IPv4(b[16], b[17], b[18], b[19]) + } else if util.IsIPv6(packet.data.Data) { + // ipv6.ParseHeader + u.src = packet.data.Data[8:24] + u.dst = packet.data.Data[24:40] + } else { + log.Errorf("[tun] unknown packet") continue } - - log.Debugf("[tun] find route: %s -> %s", e.dst, addr) - _, err := conn.WriteTo(e.data[:e.length], addr) - config.LPool.Put(e.data[:]) - if err != nil { - log.Debugf("[tun] can not route: %s -> %s", e.dst, addr) - errChan <- err + log.Debugf("[tcpserver] udp-tun %s >>> %s length: %d", u.src, u.dst, u.length) + p.routeConnNAT.LoadOrStore(u.src.String(), packet.conn) + log.Debugf("[tun] new routeConnNAT: %s -> %s-%s", u.src, packet.conn.LocalAddr(), packet.conn.RemoteAddr()) + p.parsedConnInfo <- u + } + }() + go func() { + for e := range tun.tunInbound { + select { + case <-ctx.Done(): return + default: + } + if addr := h.routeNAT.RouteTo(e.dst); addr != nil { + log.Debugf("[tun] find route: %s -> %s", e.dst, addr) + _, err := conn.WriteTo(e.data[:e.length], addr) + config.LPool.Put(e.data[:]) + if err != nil { + log.Debugf("[tun] can not route: %s -> %s", e.dst, addr) + p.sendErr(err) + return + } + } else if conn, ok := p.routeConnNAT.Load(e.dst.String()); ok { + dgram := newDatagramPacket(e.data[:e.length]) + if err := dgram.Write(conn.(net.Conn)); err != nil { + log.Debugf("[tcpserver] udp-tun %s <- %s : %s", conn.(net.Conn).RemoteAddr(), dgram.Addr(), err) + p.sendErr(err) + return + } + } else { + config.LPool.Put(e.data[:]) + log.Debug(fmt.Errorf("[tun] no route for %s -> %s", e.src, e.dst)) } } }() diff --git a/pkg/core/tunhandlercli.go b/pkg/core/tunhandlercli.go index 94f26adc..884ecdf8 100644 --- a/pkg/core/tunhandlercli.go +++ b/pkg/core/tunhandlercli.go @@ -31,7 +31,7 @@ func (h *tunHandler) HandleClient(ctx context.Context, tun net.Conn) { return } - for i := 0; i < MaxThread; i++ { + for i := 0; i < MaxConn; i++ { go func() { for { select { diff --git a/pkg/core/udpovertcp.go b/pkg/core/udpovertcp.go index 1f928cec..665917dc 100644 --- a/pkg/core/udpovertcp.go +++ b/pkg/core/udpovertcp.go @@ -45,6 +45,20 @@ func readDatagramPacket(r io.Reader, b []byte) (*datagramPacket, error) { return &datagramPacket{DataLength: dataLength, Data: b[:dataLength]}, nil } +// this method will return all byte array in the way: b[:] +func readDatagramPacketServer(r io.Reader, b []byte) (*datagramPacket, error) { + _, err := io.ReadFull(r, b[:2]) + if err != nil { + return nil, err + } + dataLength := binary.BigEndian.Uint16(b[:2]) + _, err = io.ReadFull(r, b[:dataLength]) + if err != nil /*&& (err != io.ErrUnexpectedEOF || err != io.EOF)*/ { + return nil, err + } + return &datagramPacket{DataLength: dataLength, Data: b[:]}, nil +} + func (addr *datagramPacket) Write(w io.Writer) error { b := config.LPool.Get().([]byte) defer config.LPool.Put(b[:]) diff --git a/pkg/util/util.go b/pkg/util/util.go index 3abf00f2..13a78688 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -39,6 +39,7 @@ import ( v12 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/portforward" "k8s.io/client-go/tools/remotecommand" watchtools "k8s.io/client-go/tools/watch" "k8s.io/client-go/transport/spdy" @@ -107,17 +108,16 @@ func PortForwardPod(config *rest.Config, clientset *rest.RESTClient, podName, na Resource("pods"). Namespace(namespace). Name(podName). - SubResource("portforward").Timeout(time.Second * 30). - MaxRetries(3). + SubResource("portforward"). URL() transport, upgrader, err := spdy.RoundTripperFor(config) if err != nil { log.Error(err) return err } - dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport, Timeout: time.Second * 30}, "POST", url) + dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", url) p := []string{port} - forwarder, err := NewOnAddresses(dialer, []string{"0.0.0.0"}, p, stopChan, readyChan, nil, os.Stderr) + forwarder, err := portforward.NewOnAddresses(dialer, []string{"0.0.0.0"}, p, stopChan, readyChan, nil, os.Stderr) if err != nil { log.Error(err) return err