diff --git a/build/Dockerfile b/build/Dockerfile index 1475505a..86698272 100644 --- a/build/Dockerfile +++ b/build/Dockerfile @@ -16,7 +16,7 @@ ARG BASE=github.com/wencaiwulue/kubevpn RUN sed -i s@/security.ubuntu.com/@/mirrors.aliyun.com/@g /etc/apt/sources.list \ && sed -i s@/archive.ubuntu.com/@/mirrors.aliyun.com/@g /etc/apt/sources.list RUN apt-get clean && apt-get update && apt-get install -y wget dnsutils vim curl \ - net-tools iptables iputils-ping lsof iproute2 tcpdump binutils traceroute conntrack socat + net-tools iptables iputils-ping lsof iproute2 tcpdump binutils traceroute conntrack socat iperf3 ENV TZ=Asia/Shanghai \ DEBIAN_FRONTEND=noninteractive diff --git a/build/local.Dockerfile b/build/local.Dockerfile index 6433a209..5666a2c7 100644 --- a/build/local.Dockerfile +++ b/build/local.Dockerfile @@ -8,7 +8,7 @@ FROM ubuntu:latest RUN sed -i s@/security.ubuntu.com/@/mirrors.aliyun.com/@g /etc/apt/sources.list \ && sed -i s@/archive.ubuntu.com/@/mirrors.aliyun.com/@g /etc/apt/sources.list RUN apt-get clean && apt-get update && apt-get install -y wget dnsutils vim curl \ - net-tools iptables iputils-ping lsof iproute2 tcpdump binutils traceroute conntrack socat + net-tools iptables iputils-ping lsof iproute2 tcpdump binutils traceroute conntrack socat iperf3 ENV TZ=Asia/Shanghai \ DEBIAN_FRONTEND=noninteractive diff --git a/build/test.Dockerfile b/build/test.Dockerfile index bcc815ff..3f085bbd 100644 --- a/build/test.Dockerfile +++ b/build/test.Dockerfile @@ -2,4 +2,6 @@ FROM naison/kubevpn:latest WORKDIR /app +RUN apt-get clean && apt-get update && apt-get install -y iperf3 + COPY bin/kubevpn /usr/local/bin/kubevpn \ No newline at end of file diff --git a/pkg/core/route.go b/pkg/core/route.go index 05b04f9b..dd29a200 100644 --- a/pkg/core/route.go +++ b/pkg/core/route.go @@ -19,11 +19,10 @@ var ( // RouteConnNAT map[srcIP]net.Conn RouteConnNAT = &sync.Map{} // Chan tcp connects - Chan = make(chan *TCPUDPacket, MaxSize) + Chan = make(chan *datagramPacket, MaxSize) ) type TCPUDPacket struct { - conn net.Conn data *datagramPacket } diff --git a/pkg/core/tcphandler.go b/pkg/core/tcphandler.go index 1da54c6c..437d03e3 100644 --- a/pkg/core/tcphandler.go +++ b/pkg/core/tcphandler.go @@ -10,6 +10,7 @@ import ( log "github.com/sirupsen/logrus" "github.com/wencaiwulue/kubevpn/pkg/config" + "github.com/wencaiwulue/kubevpn/pkg/util" ) type fakeUDPTunnelConnector struct { @@ -42,7 +43,7 @@ func (c *fakeUDPTunnelConnector) ConnectContext(ctx context.Context, conn net.Co type fakeUdpHandler struct { // map[srcIP]net.Conn connNAT *sync.Map - ch chan *TCPUDPacket + ch chan *datagramPacket } func TCPHandler() Handler { @@ -72,6 +73,8 @@ func (h *fakeUdpHandler) Handle(ctx context.Context, tcpConn net.Conn) { log.Debugf("delete conn %s from globle routeConnNAT, deleted count %d", addr, len(keys)) }(tcpConn.LocalAddr()) + var firstIPv4 = true + var firstIPv6 = true for { b := config.LPool.Get().([]byte) dgram, err := readDatagramPacketServer(tcpConn, b[:]) @@ -79,10 +82,24 @@ func (h *fakeUdpHandler) Handle(ctx context.Context, tcpConn net.Conn) { log.Debugf("[tcpserver] %s -> 0 : %v", tcpConn.RemoteAddr(), err) return } - h.ch <- &TCPUDPacket{ - conn: tcpConn, - data: dgram, + + if firstIPv4 || firstIPv6 { + var src net.IP + bb := dgram.Data[:dgram.DataLength] + if util.IsIPv4(bb) { + src = net.IPv4(bb[12], bb[13], bb[14], bb[15]) + firstIPv4 = false + } else if util.IsIPv6(bb) { + src = bb[8:24] + firstIPv6 = false + } else { + log.Errorf("[tun] unknown packet") + continue + } + h.connNAT.LoadOrStore(src.String(), tcpConn) + log.Debugf("[tun] new routeConnNAT: %s -> %s-%s", src, tcpConn.LocalAddr(), tcpConn.RemoteAddr()) } + h.ch <- dgram } } diff --git a/pkg/core/tunhandler.go b/pkg/core/tunhandler.go index 44e84534..35570318 100644 --- a/pkg/core/tunhandler.go +++ b/pkg/core/tunhandler.go @@ -7,7 +7,6 @@ import ( "net" "strings" "sync" - "sync/atomic" "time" "github.com/google/gopacket" @@ -20,9 +19,9 @@ import ( ) const ( - MaxSize = 1000000 + MaxSize = 1000 MaxThread = 10 - MaxConn = 10 + MaxConn = 1 ) type tunHandler struct { @@ -166,7 +165,6 @@ func (h tunHandler) printRoute() { type Device struct { tun net.Conn - closed atomic.Bool thread int tunInboundRaw chan *DataElem @@ -187,9 +185,6 @@ func (d *Device) readFromTun() { } return } - if d.closed.Load() { - return - } d.tunInboundRaw <- &DataElem{ data: b[:], length: n, @@ -228,18 +223,12 @@ func (d *Device) parseIPHeader() { } log.Debugf("[tun] %s --> %s", e.src, e.dst) - if d.closed.Load() { - return - } d.tunInbound <- e } } func (d *Device) Close() { - d.closed.Store(true) d.tun.Close() - close(d.tunInboundRaw) - close(d.tunOutbound) } func (d *Device) heartbeats() { @@ -297,10 +286,6 @@ func (d *Device) heartbeats() { } } for index, i2 := range [][]byte{bytes, bytes6} { - if d.closed.Load() { - return - } - data := config.LPool.Get().([]byte)[:] length := copy(data, i2) var src, dst net.IP @@ -385,7 +370,6 @@ func (h *tunHandler) HandleServer(ctx context.Context, tunConn net.Conn) { tun := &Device{ tun: tunConn, thread: MaxThread, - closed: atomic.Bool{}, tunInboundRaw: make(chan *DataElem, MaxSize), tunInbound: make(chan *DataElem, MaxSize), tunOutbound: make(chan *DataElem, MaxSize), @@ -437,7 +421,6 @@ type udpElem struct { type Peer struct { conn net.PacketConn thread int - closed *atomic.Bool connInbound chan *udpElem parsedConnInfo chan *udpElem @@ -466,9 +449,6 @@ func (p *Peer) readFromConn() { p.sendErr(err) return } - if p.closed.Load() { - return - } p.connInbound <- &udpElem{ from: srcAddr, data: b[:], @@ -478,28 +458,33 @@ func (p *Peer) readFromConn() { } func (p *Peer) parseHeader() { + var firstIPv4, firstIPv6 = true, true for e := range p.connInbound { + b := e.data[:e.length] if util.IsIPv4(e.data[:e.length]) { // ipv4.ParseHeader - b := e.data[:e.length] e.src = net.IPv4(b[12], b[13], b[14], b[15]) e.dst = net.IPv4(b[16], b[17], b[18], b[19]) } else if util.IsIPv6(e.data[:e.length]) { // ipv6.ParseHeader - e.src = e.data[:e.length][8:24] - e.dst = e.data[:e.length][24:40] + e.src = b[:e.length][8:24] + e.dst = b[:e.length][24:40] } else { log.Errorf("[tun] unknown packet") continue } - if _, loaded := p.routeNAT.LoadOrStore(e.src, e.from); loaded { - log.Debugf("[tun] find route: %s -> %s", e.src, e.from) - } else { - log.Debugf("[tun] new route: %s -> %s", e.src, e.from) - } - if p.closed.Load() { - return + if firstIPv4 || firstIPv6 { + if util.IsIPv4(e.data[:e.length]) { + firstIPv4 = false + } else { + firstIPv6 = false + } + if _, loaded := p.routeNAT.LoadOrStore(e.src, e.from); loaded { + log.Debugf("[tun] find route: %s -> %s", e.src, e.from) + } else { + log.Debugf("[tun] new route: %s -> %s", e.src, e.from) + } } p.parsedConnInfo <- e } @@ -524,13 +509,11 @@ func (p *Peer) route() { } config.LPool.Put(e.data[:]) } else { - if !p.tun.closed.Load() { - p.tun.tunOutbound <- &DataElem{ - data: e.data, - length: e.length, - src: e.src, - dst: e.dst, - } + p.tun.tunOutbound <- &DataElem{ + data: e.data, + length: e.length, + src: e.src, + dst: e.dst, } } } @@ -545,10 +528,7 @@ func (p *Peer) Start() { } func (p *Peer) Close() { - p.closed.Store(true) p.conn.Close() - close(p.connInbound) - close(p.parsedConnInfo) } func (h *tunHandler) transportTun(ctx context.Context, tun *Device, conn net.PacketConn) error { @@ -556,7 +536,6 @@ func (h *tunHandler) transportTun(ctx context.Context, tun *Device, conn net.Pac p := &Peer{ conn: conn, thread: MaxThread, - closed: &atomic.Bool{}, connInbound: make(chan *udpElem, MaxSize), parsedConnInfo: make(chan *udpElem, MaxSize), tun: tun, @@ -570,42 +549,29 @@ func (h *tunHandler) transportTun(ctx context.Context, tun *Device, conn net.Pac go func() { for packet := range Chan { - select { - case <-ctx.Done(): - return - default: - } - u := &udpElem{ - data: packet.data.Data[:], - length: int(packet.data.DataLength), + data: packet.Data[:], + length: int(packet.DataLength), } - if util.IsIPv4(packet.data.Data) { + b := packet.Data + if util.IsIPv4(packet.Data) { // ipv4.ParseHeader - b := packet.data.Data u.src = net.IPv4(b[12], b[13], b[14], b[15]) u.dst = net.IPv4(b[16], b[17], b[18], b[19]) - } else if util.IsIPv6(packet.data.Data) { + } else if util.IsIPv6(packet.Data) { // ipv6.ParseHeader - u.src = packet.data.Data[8:24] - u.dst = packet.data.Data[24:40] + u.src = b[8:24] + u.dst = b[24:40] } else { log.Errorf("[tun] unknown packet") continue } log.Debugf("[tcpserver] udp-tun %s >>> %s length: %d", u.src, u.dst, u.length) - p.routeConnNAT.LoadOrStore(u.src.String(), packet.conn) - log.Debugf("[tun] new routeConnNAT: %s -> %s-%s", u.src, packet.conn.LocalAddr(), packet.conn.RemoteAddr()) p.parsedConnInfo <- u } }() go func() { for e := range tun.tunInbound { - select { - case <-ctx.Done(): - return - default: - } if addr := h.routeNAT.RouteTo(e.dst); addr != nil { log.Debugf("[tun] find route: %s -> %s", e.dst, addr) _, err := conn.WriteTo(e.data[:e.length], addr) @@ -617,7 +583,9 @@ func (h *tunHandler) transportTun(ctx context.Context, tun *Device, conn net.Pac } } else if conn, ok := p.routeConnNAT.Load(e.dst.String()); ok { dgram := newDatagramPacket(e.data[:e.length]) - if err := dgram.Write(conn.(net.Conn)); err != nil { + err := dgram.Write(conn.(net.Conn)) + config.LPool.Put(e.data[:]) + if err != nil { log.Debugf("[tcpserver] udp-tun %s <- %s : %s", conn.(net.Conn).RemoteAddr(), dgram.Addr(), err) p.sendErr(err) return diff --git a/pkg/core/tunhandlercli.go b/pkg/core/tunhandlercli.go index 884ecdf8..6541b74f 100644 --- a/pkg/core/tunhandlercli.go +++ b/pkg/core/tunhandlercli.go @@ -4,7 +4,6 @@ import ( "context" "errors" "net" - "sync/atomic" "time" log "github.com/sirupsen/logrus" @@ -15,7 +14,6 @@ import ( func (h *tunHandler) HandleClient(ctx context.Context, tun net.Conn) { d := &Device{ tun: tun, - closed: atomic.Bool{}, thread: MaxThread, tunInboundRaw: make(chan *DataElem, MaxSize), tunInbound: make(chan *DataElem, MaxSize), @@ -95,16 +93,7 @@ func (h *tunHandler) transportTunCli(ctx context.Context, d *Device, conn net.Pa go func() { for e := range d.tunInbound { - select { - case <-ctx.Done(): - return - default: - } - if e.src.Equal(e.dst) { - if d.closed.Load() { - return - } d.tunOutbound <- e continue } @@ -131,9 +120,6 @@ func (h *tunHandler) transportTunCli(ctx context.Context, d *Device, conn net.Pa errChan <- err return } - if d.closed.Load() { - return - } d.tunOutbound <- &DataElem{data: b[:], length: n} } }()