diff --git a/pkg/core/bufferedtcp.go b/pkg/core/bufferedtcp.go new file mode 100644 index 00000000..b4fe0ce2 --- /dev/null +++ b/pkg/core/bufferedtcp.go @@ -0,0 +1,55 @@ +package core + +import ( + "context" + "errors" + "net" + + "github.com/wencaiwulue/kubevpn/v2/pkg/config" + plog "github.com/wencaiwulue/kubevpn/v2/pkg/log" +) + +type bufferedTCP struct { + net.Conn + Chan chan *DatagramPacket + closed bool +} + +func NewBufferedTCP(conn net.Conn) net.Conn { + c := &bufferedTCP{ + Conn: conn, + Chan: make(chan *DatagramPacket, MaxSize), + } + go c.Run() + return c +} + +func (c *bufferedTCP) Write(b []byte) (n int, err error) { + if c.closed { + return 0, errors.New("tcp channel is closed") + } + if len(b) == 0 { + return 0, nil + } + + buf := config.LPool.Get().([]byte)[:] + n = copy(buf, b) + c.Chan <- &DatagramPacket{ + DataLength: uint16(n), + Data: buf, + } + return n, nil +} + +func (c *bufferedTCP) Run() { + for buf := range c.Chan { + _, err := c.Conn.Write(buf.Data[:buf.DataLength]) + config.LPool.Put(buf.Data[:]) + if err != nil { + plog.G(context.Background()).Errorf("[TCP] Write packet failed: %v", err) + _ = c.Conn.Close() + c.closed = true + return + } + } +} diff --git a/pkg/core/gvisortcphandler.go b/pkg/core/gvisortcphandler.go index cc4ddd4e..d0bfa976 100644 --- a/pkg/core/gvisortcphandler.go +++ b/pkg/core/gvisortcphandler.go @@ -20,7 +20,7 @@ import ( type gvisorTCPHandler struct { // map[srcIP]net.Conn routeMapTCP *sync.Map - packetChan chan *DatagramPacket + packetChan chan *Packet } func GvisorTCPHandler() Handler { @@ -43,7 +43,7 @@ func (h *gvisorTCPHandler) handle(ctx context.Context, tcpConn net.Conn) { errChan := make(chan error, 2) go func() { defer util.HandleCrash() - h.readFromTCPConnWriteToEndpoint(ctx, tcpConn, endpoint) + h.readFromTCPConnWriteToEndpoint(ctx, NewBufferedTCP(tcpConn), endpoint) util.SafeClose(errChan) }() go func() { diff --git a/pkg/core/gvisortunendpoint.go b/pkg/core/gvisortunendpoint.go index a18c3c1d..161c8a0e 100755 --- a/pkg/core/gvisortunendpoint.go +++ b/pkg/core/gvisortunendpoint.go @@ -21,13 +21,7 @@ import ( func (h *gvisorTCPHandler) readFromEndpointWriteToTCPConn(ctx context.Context, conn net.Conn, endpoint *channel.Endpoint) { tcpConn, _ := newGvisorUDPConnOverTCP(ctx, conn) - for { - select { - case <-ctx.Done(): - return - default: - } - + for ctx.Err() == nil { pktBuffer := endpoint.ReadContext(ctx) if pktBuffer != nil { sniffer.LogPacket("[gVISOR] ", sniffer.DirectionSend, pktBuffer.NetworkProtocolNumber, pktBuffer) @@ -45,22 +39,16 @@ func (h *gvisorTCPHandler) readFromTCPConnWriteToEndpoint(ctx context.Context, c tcpConn, _ := newGvisorUDPConnOverTCP(ctx, conn) defer h.removeFromRouteMapTCP(ctx, conn) - for { - select { - case <-ctx.Done(): - return - default: - } - + for ctx.Err() == nil { buf := config.LPool.Get().([]byte)[:] read, err := tcpConn.Read(buf[:]) if err != nil { - plog.G(ctx).Errorf("[TUN-GVISOR] Failed to read from tcp conn: %v", err) + plog.G(ctx).Errorf("[TCP-GVISOR] Failed to read from tcp conn: %v", err) config.LPool.Put(buf[:]) return } if read == 0 { - plog.G(ctx).Warnf("[TUN-GVISOR] Read from tcp conn length is %d", read) + plog.G(ctx).Warnf("[TCP-GVISOR] Read from tcp conn length is %d", read) config.LPool.Put(buf[:]) continue } @@ -85,7 +73,7 @@ func (h *gvisorTCPHandler) readFromTCPConnWriteToEndpoint(ctx context.Context, c protocol = header.IPv6ProtocolNumber ipHeader, err := ipv6.ParseHeader(buf[:read]) if err != nil { - plog.G(ctx).Errorf("[TUN-GVISOR] Failed to parse IPv6 header: %s", err.Error()) + plog.G(ctx).Errorf("[TCP-GVISOR] Failed to parse IPv6 header: %s", err.Error()) config.LPool.Put(buf[:]) continue } @@ -93,7 +81,7 @@ func (h *gvisorTCPHandler) readFromTCPConnWriteToEndpoint(ctx context.Context, c src = ipHeader.Src dst = ipHeader.Dst } else { - plog.G(ctx).Errorf("[TUN-GVISOR] Unknown packet") + plog.G(ctx).Errorf("[TCP-GVISOR] Unknown packet") config.LPool.Put(buf[:]) continue } @@ -101,14 +89,10 @@ func (h *gvisorTCPHandler) readFromTCPConnWriteToEndpoint(ctx context.Context, c h.addToRouteMapTCP(ctx, src, conn) // inner ip like 198.19.0.100/102/103 connect each other if config.CIDR.Contains(dst) || config.CIDR6.Contains(dst) { - plog.G(ctx).Debugf("[TUN-GVISOR] Forward to TUN device, SRC: %s, DST: %s, Protocol: %s, Length: %d", src, dst, layers.IPProtocol(ipProtocol).String(), read) - util.SafeWrite(h.packetChan, &DatagramPacket{ - DataLength: uint16(read), - Data: buf[:], - }, func(v *DatagramPacket) { - config.LPool.Put(v.Data[:]) - plog.G(context.Background()).Errorf("Drop packet, SRC: %s, DST: %s, Protocol: %s, Length: %d", src, dst, layers.IPProtocol(ipProtocol).String(), v.DataLength) - }) + err = h.handlePacket(ctx, buf, read, src, dst, layers.IPProtocol(ipProtocol).String()) + if err != nil { + plog.G(ctx).Errorf("[TCP-GVISOR] Failed to handle packet: %v", err) + } continue } @@ -120,14 +104,42 @@ func (h *gvisorTCPHandler) readFromTCPConnWriteToEndpoint(ctx context.Context, c sniffer.LogPacket("[gVISOR] ", sniffer.DirectionRecv, protocol, pkt) endpoint.InjectInbound(protocol, pkt) pkt.DecRef() - plog.G(ctx).Debugf("[TUN-GVISOR] Write to Gvisor. SRC: %s, DST: %s, Protocol: %s, Length: %d", src, dst, layers.IPProtocol(ipProtocol).String(), read) + plog.G(ctx).Debugf("[TCP-GVISOR] Write to Gvisor. SRC: %s, DST: %s, Protocol: %s, Length: %d", src, dst, layers.IPProtocol(ipProtocol).String(), read) } } +func (h *gvisorTCPHandler) handlePacket(ctx context.Context, buf []byte, length int, src, dst net.IP, protocol string) error { + if conn, ok := h.routeMapTCP.Load(dst.String()); ok { + plog.G(ctx).Debugf("[TCP-GVISOR] Find TCP route SRC: %s to DST: %s -> %s", src, dst, conn.(net.Conn).RemoteAddr()) + dgram := newDatagramPacket(buf[:length]) + err := dgram.Write(conn.(net.Conn)) + config.LPool.Put(buf[:]) + if err != nil { + plog.G(ctx).Errorf("[TCP-GVISOR] Failed to write to %s <- %s : %s", conn.(net.Conn).RemoteAddr(), conn.(net.Conn).LocalAddr(), err) + return err + } + } else if config.RouterIP.Equal(dst) || config.RouterIP6.Equal(dst) { + plog.G(ctx).Debugf("[TCP-GVISOR] Forward to TUN device, SRC: %s, DST: %s, Protocol: %s, Length: %d", src, dst, protocol, length) + util.SafeWrite(h.packetChan, &Packet{ + length: length, + data: buf[:], + src: src, + dst: dst, + }, func(v *Packet) { + config.LPool.Put(v.data[:]) + plog.G(context.Background()).Errorf("[TCP-GVISOR] Drop packet, SRC: %s, DST: %s, Protocol: %s, Length: %d", src, dst, protocol, v.length) + }) + } else { + plog.G(ctx).Warnf("[TCP-GVISOR] No route for src: %s -> dst: %s, drop it", src, dst) + config.LPool.Put(buf[:]) + } + return nil +} + func (h *gvisorTCPHandler) addToRouteMapTCP(ctx context.Context, src net.IP, tcpConn net.Conn) { value, loaded := h.routeMapTCP.LoadOrStore(src.String(), tcpConn) if loaded { - if tcpConn != value.(net.Conn) { + if value.(net.Conn) != tcpConn { h.routeMapTCP.Store(src.String(), tcpConn) plog.G(ctx).Infof("[TUN-GVISOR] Replace route map TCP: %s -> %s-%s", src, tcpConn.LocalAddr(), tcpConn.RemoteAddr()) } diff --git a/pkg/core/gvisorudphandler.go b/pkg/core/gvisorudphandler.go index 763c675c..f26f7539 100644 --- a/pkg/core/gvisorudphandler.go +++ b/pkg/core/gvisorudphandler.go @@ -59,11 +59,11 @@ func (c *gvisorUDPConnOverTCP) Read(b []byte) (int, error) { case <-c.ctx.Done(): return 0, c.ctx.Err() default: - dgram, err := readDatagramPacket(c.Conn, b) + datagram, err := readDatagramPacket(c.Conn, b) if err != nil { return 0, err } - return int(dgram.DataLength), nil + return int(datagram.DataLength), nil } } @@ -107,26 +107,20 @@ func handle(ctx context.Context, tcpConn net.Conn, udpConn *net.UDPConn) { buf := config.LPool.Get().([]byte)[:] defer config.LPool.Put(buf[:]) - for { - select { - case <-ctx.Done(): - return - default: - } - + for ctx.Err() == nil { err := tcpConn.SetReadDeadline(time.Now().Add(time.Second * 30)) if err != nil { plog.G(ctx).Errorf("[TUN-UDP] Failed to set read deadline: %v", err) errChan <- err return } - dgram, err := readDatagramPacket(tcpConn, buf[:]) + datagram, err := readDatagramPacket(tcpConn, buf[:]) if err != nil { plog.G(ctx).Errorf("[TUN-UDP] %s -> %s: %v", tcpConn.RemoteAddr(), udpConn.LocalAddr(), err) errChan <- err return } - if dgram.DataLength == 0 { + if datagram.DataLength == 0 { plog.G(ctx).Errorf("[TUN-UDP] Length is zero") errChan <- fmt.Errorf("length of read packet is zero") return @@ -138,12 +132,12 @@ func handle(ctx context.Context, tcpConn net.Conn, udpConn *net.UDPConn) { errChan <- err return } - if _, err = udpConn.Write(dgram.Data); err != nil { + if _, err = udpConn.Write(datagram.Data[:datagram.DataLength]); err != nil { plog.G(ctx).Errorf("[TUN-UDP] %s -> %s : %s", tcpConn.RemoteAddr(), "localhost:8422", err) errChan <- err return } - plog.G(ctx).Infof("[TUN-UDP] %s >>> %s length: %d", tcpConn.RemoteAddr(), "localhost:8422", dgram.DataLength) + plog.G(ctx).Infof("[TUN-UDP] %s >>> %s length: %d", tcpConn.RemoteAddr(), "localhost:8422", datagram.DataLength) } }() @@ -152,13 +146,7 @@ func handle(ctx context.Context, tcpConn net.Conn, udpConn *net.UDPConn) { buf := config.LPool.Get().([]byte)[:] defer config.LPool.Put(buf[:]) - for { - select { - case <-ctx.Done(): - return - default: - } - + for ctx.Err() == nil { err := udpConn.SetReadDeadline(time.Now().Add(time.Second * 30)) if err != nil { plog.G(ctx).Errorf("[TUN-UDP] Failed to set read deadline failed: %v", err) diff --git a/pkg/core/route.go b/pkg/core/route.go index d5da05a2..19aec976 100644 --- a/pkg/core/route.go +++ b/pkg/core/route.go @@ -18,7 +18,7 @@ var ( // RouteMapTCP map[srcIP]net.Conn Globe route table for inner ip RouteMapTCP = &sync.Map{} // TCPPacketChan tcp connects - TCPPacketChan = make(chan *DatagramPacket, MaxSize) + TCPPacketChan = make(chan *Packet, MaxSize) ) type TCPUDPacket struct { diff --git a/pkg/core/tcphandler.go b/pkg/core/tcphandler.go index 2705d084..f1eb1b3f 100644 --- a/pkg/core/tcphandler.go +++ b/pkg/core/tcphandler.go @@ -6,8 +6,8 @@ import ( "sync" "github.com/google/gopacket/layers" - "github.com/wencaiwulue/kubevpn/v2/pkg/config" + "github.com/wencaiwulue/kubevpn/v2/pkg/config" plog "github.com/wencaiwulue/kubevpn/v2/pkg/log" "github.com/wencaiwulue/kubevpn/v2/pkg/util" ) @@ -42,7 +42,7 @@ func (c *UDPOverTCPConnector) ConnectContext(ctx context.Context, conn net.Conn) type UDPOverTCPHandler struct { // map[srcIP]net.Conn routeMapTCP *sync.Map - packetChan chan *DatagramPacket + packetChan chan *Packet } func TCPHandler() Handler { @@ -53,49 +53,74 @@ func TCPHandler() Handler { } func (h *UDPOverTCPHandler) Handle(ctx context.Context, tcpConn net.Conn) { + tcpConn = NewBufferedTCP(tcpConn) defer tcpConn.Close() plog.G(ctx).Infof("[TCP] Handle connection %s -> %s", tcpConn.RemoteAddr(), tcpConn.LocalAddr()) defer h.removeFromRouteMapTCP(ctx, tcpConn) - for { - select { - case <-ctx.Done(): - return - default: - } - + for ctx.Err() == nil { buf := config.LPool.Get().([]byte)[:] - packet, err := readDatagramPacketServer(tcpConn, buf[:]) + datagram, err := readDatagramPacket(tcpConn, buf[:]) if err != nil { plog.G(ctx).Errorf("[TCP] Failed to read from %s -> %s: %v", tcpConn.RemoteAddr(), tcpConn.LocalAddr(), err) config.LPool.Put(buf[:]) return } - var src, dst net.IP - var protocol int - src, dst, protocol, err = util.ParseIP(packet.Data[:packet.DataLength]) + err = h.handlePacket(ctx, tcpConn, datagram) if err != nil { - plog.G(ctx).Errorf("[TCP] Unknown packet") - config.LPool.Put(buf[:]) - continue + return } - value, loaded := h.routeMapTCP.LoadOrStore(src.String(), tcpConn) - if loaded { - if tcpConn != value.(net.Conn) { - h.routeMapTCP.Store(src.String(), tcpConn) - plog.G(ctx).Infof("[TCP] Replace route map TCP to DST %s by connation %s -> %s", src, tcpConn.RemoteAddr(), tcpConn.LocalAddr()) - } - } else { - plog.G(ctx).Infof("[TCP] Add new route map TCP to DST %s by connation %s -> %s", src, tcpConn.RemoteAddr(), tcpConn.LocalAddr()) + } +} + +func (h *UDPOverTCPHandler) handlePacket(ctx context.Context, tcpConn net.Conn, datagram *DatagramPacket) error { + src, dst, protocol, err := util.ParseIP(datagram.Data[:datagram.DataLength]) + if err != nil { + plog.G(ctx).Errorf("[TCP] Unknown packet") + config.LPool.Put(datagram.Data[:]) + return err + } + + h.addToRouteMapTCP(ctx, src, tcpConn) + + if conn, ok := h.routeMapTCP.Load(dst.String()); ok { + plog.G(ctx).Debugf("[TCP] Find TCP route SRC: %s to DST: %s -> %s", src, dst, conn.(net.Conn).RemoteAddr()) + err = datagram.Write(conn.(net.Conn)) + config.LPool.Put(datagram.Data[:]) + if err != nil { + plog.G(ctx).Errorf("[TCP] Failed to write to %s <- %s : %s", conn.(net.Conn).RemoteAddr(), conn.(net.Conn).LocalAddr(), err) + return err } - // here receive too many packet - util.SafeWrite(h.packetChan, packet, func(v *DatagramPacket) { - plog.G(context.Background()).Errorf("Stuck packet, SRC: %s, DST: %s, Protocol: %s, Length: %d", src, dst, layers.IPProtocol(protocol).String(), v.DataLength) + } else if (config.CIDR.Contains(dst) || config.CIDR6.Contains(dst)) && (!config.RouterIP.Equal(dst) && !config.RouterIP6.Equal(dst)) { + plog.G(ctx).Warnf("[TCP] No route for src: %s -> dst: %s, drop it", src, dst) + config.LPool.Put(datagram.Data[:]) + } else { + plog.G(ctx).Debugf("[TCP] Forward to TUN device, SRC: %s, DST: %s, Protocol: %s, Length: %d", src, dst, layers.IPProtocol(protocol).String(), datagram.DataLength) + util.SafeWrite(h.packetChan, &Packet{ + data: datagram.Data, + length: int(datagram.DataLength), + src: src, + dst: dst, + }, func(v *Packet) { + plog.G(context.Background()).Errorf("Stuck packet, SRC: %s, DST: %s, Protocol: %s, Length: %d", src, dst, layers.IPProtocol(protocol).String(), v.length) h.packetChan <- v }) } + return nil +} + +func (h *UDPOverTCPHandler) addToRouteMapTCP(ctx context.Context, src net.IP, tcpConn net.Conn) { + value, loaded := h.routeMapTCP.LoadOrStore(src.String(), tcpConn) + if loaded { + if value.(net.Conn) != tcpConn { + h.routeMapTCP.Store(src.String(), tcpConn) + plog.G(ctx).Infof("[TCP] Replace route map TCP to DST %s by connation %s -> %s", src, tcpConn.RemoteAddr(), tcpConn.LocalAddr()) + } + } else { + plog.G(ctx).Infof("[TCP] Add new route map TCP to DST %s by connation %s -> %s", src, tcpConn.RemoteAddr(), tcpConn.LocalAddr()) + } } func (h *UDPOverTCPHandler) removeFromRouteMapTCP(ctx context.Context, tcpConn net.Conn) { @@ -126,11 +151,11 @@ func (c *UDPConnOverTCP) ReadFrom(b []byte) (int, net.Addr, error) { case <-c.ctx.Done(): return 0, nil, c.ctx.Err() default: - packet, err := readDatagramPacket(c.Conn, b) + datagram, err := readDatagramPacket(c.Conn, b) if err != nil { return 0, nil, err } - return int(packet.DataLength), nil, nil + return int(datagram.DataLength), nil, nil } } diff --git a/pkg/core/tunhandler.go b/pkg/core/tunhandler.go index a6431478..c4e56d62 100644 --- a/pkg/core/tunhandler.go +++ b/pkg/core/tunhandler.go @@ -131,42 +131,34 @@ func (d *Device) Close() { util.SafeClose(TCPPacketChan) } -func (d *Device) transport(ctx1 context.Context, addr string, routeMapUDP *sync.Map, routeMapTCP *sync.Map) { - for ctx1.Err() == nil { - func() { - ctx, cancelFunc := context.WithCancel(ctx1) - defer cancelFunc() +func (d *Device) transport(ctx context.Context, addr string, routeMapUDP *sync.Map, routeMapTCP *sync.Map) { + packetConn, err := (&net.ListenConfig{}).ListenPacket(ctx, "udp", addr) + if err != nil { + util.SafeWrite(d.errChan, err) + plog.G(ctx).Errorf("[TUN] Failed to listen %s: %v", addr, err) + return + } - packetConn, err := (&net.ListenConfig{}).ListenPacket(ctx, "udp", addr) - if err != nil { - plog.G(ctx1).Errorf("[UDP] Failed to listen %s: %v", addr, err) - return - } + p := &Peer{ + conn: packetConn, + tunInbound: d.tunInbound, + tunOutbound: d.tunOutbound, + routeMapUDP: routeMapUDP, + routeMapTCP: routeMapTCP, + errChan: make(chan error, 1), + } - p := &Peer{ - conn: packetConn, - tcpInbound: make(chan *Packet, MaxSize), - tunInbound: d.tunInbound, - tunOutbound: d.tunOutbound, - routeMapUDP: routeMapUDP, - routeMapTCP: routeMapTCP, - errChan: make(chan error, 1), - } + go p.readFromConn(ctx) + go p.routeTUN(ctx) + go p.routeTCPToTun(ctx) - defer p.Close() - go p.readFromConn(ctx) - go p.readFromTCPConn(ctx) - go p.routeTCP(ctx) - go p.routeTUN(ctx) - - select { - case err = <-p.errChan: - plog.G(ctx1).Errorf("[TUN] %s: %v", d.tun.LocalAddr(), err) - return - case <-ctx.Done(): - return - } - }() + select { + case err = <-p.errChan: + plog.G(ctx).Errorf("[TUN] %s: %v", d.tun.LocalAddr(), err) + util.SafeWrite(d.errChan, err) + return + case <-ctx.Done(): + return } } @@ -197,8 +189,6 @@ func (d *Packet) Length() int { type Peer struct { conn net.PacketConn - tcpInbound chan *Packet - tunInbound chan *Packet tunOutbound chan<- *Packet @@ -232,16 +222,10 @@ func (p *Peer) readFromConn(ctx context.Context) { if err != nil { config.LPool.Put(buf[:]) plog.G(ctx).Errorf("[TUN] Unknown packet: %v", err) - continue - } - if addr, loaded := p.routeMapUDP.LoadOrStore(src.String(), from); loaded { - if addr.(net.Addr).String() != from.String() { - p.routeMapUDP.Store(src.String(), from) - plog.G(ctx).Infof("[TUN] Replace route map UDP: %s -> %s", src, from) - } - } else { - plog.G(ctx).Infof("[TUN] Add new route map UDP: %s -> %s", src, from) + p.sendErr(err) + return } + p.addToRouteMapUDP(ctx, src, from) plog.G(context.Background()).Errorf("[TUN] SRC: %s, DST: %s, Protocol: %s, Length: %d", src, dst, layers.IPProtocol(protocol).String(), n) p.tunInbound <- &Packet{ data: buf[:], @@ -252,94 +236,49 @@ func (p *Peer) readFromConn(ctx context.Context) { } } -func (p *Peer) readFromTCPConn(ctx context.Context) { - defer util.HandleCrash() - for ctx.Err() == nil { - select { - case <-ctx.Done(): - return - case packet := <-TCPPacketChan: - src, dst, protocol, err := util.ParseIP(packet.Data) - if err != nil { - plog.G(ctx).Errorf("[TCP] Unknown packet") - config.LPool.Put(packet.Data[:]) - continue - } - plog.G(ctx).Debugf("[TCP] SRC: %s > DST: %s, Protocol: %s, Length: %d", src, dst, layers.IPProtocol(protocol).String(), packet.DataLength) - p.tcpInbound <- &Packet{ - data: packet.Data[:], - length: int(packet.DataLength), - src: src, - dst: dst, - } +func (p *Peer) addToRouteMapUDP(ctx context.Context, src net.IP, from net.Addr) { + if addr, loaded := p.routeMapUDP.LoadOrStore(src.String(), from); loaded { + if addr.(net.Addr).String() != from.String() { + p.routeMapUDP.Store(src.String(), from) + plog.G(ctx).Infof("[TUN] Replace route map UDP: %s -> %s", src, from) } + } else { + plog.G(ctx).Infof("[TUN] Add new route map UDP: %s -> %s", src, from) } } -func (p *Peer) routeTCP(ctx context.Context) { +func (p *Peer) routeTCPToTun(ctx context.Context) { defer util.HandleCrash() - for ctx.Err() == nil { - select { - case <-ctx.Done(): - return - case packet := <-p.tcpInbound: - if conn, ok := p.routeMapTCP.Load(packet.dst.String()); ok { - plog.G(ctx).Debugf("[TCP] Find TCP route SRC: %s to DST: %s -> %s", packet.src, packet.dst, conn.(net.Conn).RemoteAddr()) - dgram := newDatagramPacket(packet.data[:packet.length]) - err := dgram.Write(conn.(net.Conn)) - config.LPool.Put(packet.data[:]) - if err != nil { - plog.G(ctx).Errorf("[TCP] Failed to write to %s <- %s : %s", conn.(net.Conn).RemoteAddr(), conn.(net.Conn).LocalAddr(), err) - p.sendErr(err) - return - } - } else { - plog.G(ctx).Debugf("[TCP] Not found route, write to TUN device. SRC: %s, DST: %s", packet.src, packet.dst) - p.tunOutbound <- &Packet{ - data: packet.data, - length: packet.length, - src: packet.src, - dst: packet.dst, - } - } - } + for packet := range TCPPacketChan { + p.tunOutbound <- packet } } func (p *Peer) routeTUN(ctx context.Context) { defer util.HandleCrash() - for ctx.Err() == nil { - select { - case <-ctx.Done(): - return - case packet := <-p.tunInbound: - if addr, ok := p.routeMapUDP.Load(packet.dst.String()); ok { - plog.G(ctx).Debugf("[TUN] Find UDP route to DST: %s -> %s, SRC: %s, DST: %s", packet.dst, addr, packet.src, packet.dst) - _, err := p.conn.WriteTo(packet.data[:packet.length], addr.(net.Addr)) - config.LPool.Put(packet.data[:]) - if err != nil { - plog.G(ctx).Errorf("[TUN] Failed wirte to route dst: %s -> %s", packet.dst, addr) - p.sendErr(err) - return - } - } else if conn, ok := p.routeMapTCP.Load(packet.dst.String()); ok { - plog.G(ctx).Debugf("[TUN] Find TCP route to dst: %s -> %s", packet.dst.String(), conn.(net.Conn).RemoteAddr()) - dgram := newDatagramPacket(packet.data[:packet.length]) - err := dgram.Write(conn.(net.Conn)) - config.LPool.Put(packet.data[:]) - if err != nil { - plog.G(ctx).Errorf("[TUN] Failed to write TCP %s <- %s : %s", conn.(net.Conn).RemoteAddr(), conn.(net.Conn).LocalAddr(), err) - p.sendErr(err) - return - } - } else { - plog.G(ctx).Warnf("[TUN] No route for src: %s -> dst: %s, drop it", packet.src, packet.dst) - config.LPool.Put(packet.data[:]) + for packet := range p.tunInbound { + if addr, ok := p.routeMapUDP.Load(packet.dst.String()); ok { + plog.G(ctx).Debugf("[TUN] Find UDP route to DST: %s -> %s, SRC: %s, DST: %s", packet.dst, addr, packet.src, packet.dst) + _, err := p.conn.WriteTo(packet.data[:packet.length], addr.(net.Addr)) + config.LPool.Put(packet.data[:]) + if err != nil { + plog.G(ctx).Errorf("[TUN] Failed wirte to route dst: %s -> %s", packet.dst, addr) + p.sendErr(err) + return } + } else if conn, ok := p.routeMapTCP.Load(packet.dst.String()); ok { + plog.G(ctx).Debugf("[TUN] Find TCP route to dst: %s -> %s", packet.dst.String(), conn.(net.Conn).RemoteAddr()) + dgram := newDatagramPacket(packet.data[:packet.length]) + err := dgram.Write(conn.(net.Conn)) + config.LPool.Put(packet.data[:]) + if err != nil { + plog.G(ctx).Errorf("[TUN] Failed to write TCP %s <- %s : %s", conn.(net.Conn).RemoteAddr(), conn.(net.Conn).LocalAddr(), err) + p.sendErr(err) + return + } + } else { + plog.G(ctx).Warnf("[TUN] No route for src: %s -> dst: %s, drop it", packet.src, packet.dst) + config.LPool.Put(packet.data[:]) } } } - -func (p *Peer) Close() { - p.conn.Close() -} diff --git a/pkg/core/tunhandlerclient.go b/pkg/core/tunhandlerclient.go index e515876e..34a658be 100644 --- a/pkg/core/tunhandlerclient.go +++ b/pkg/core/tunhandlerclient.go @@ -124,6 +124,10 @@ func transportTunPacketClient(ctx context.Context, tunInbound <-chan *Packet, tu util.SafeWrite(errChan, errors.Wrap(err, fmt.Sprintf("failed to read packet from remote %s", remoteAddr))) return } + if n == 0 { + plog.G(ctx).Warnf("Packet length 0") + continue + } util.SafeWrite(tunOutbound, &Packet{data: buf[:], length: n}, func(v *Packet) { config.LPool.Put(v.data[:]) plog.G(context.Background()).Errorf("Drop packet, LocalAddr: %s, Remote: %s, Length: %d", packetConn.LocalAddr(), remoteAddr, v.length) @@ -202,13 +206,7 @@ func heartbeats(ctx context.Context, tun net.Conn) { ticker := time.NewTicker(time.Second * 60) defer ticker.Stop() - for ; true; <-ticker.C { - select { - case <-ctx.Done(): - return - default: - } - + for ; ctx.Err() == nil; <-ticker.C { if srcIPv4 != nil { util.Ping(ctx, srcIPv4.String(), config.RouterIP.String()) } diff --git a/pkg/core/udpovertcp.go b/pkg/core/udpovertcp.go index 4aeafdc9..a204894a 100644 --- a/pkg/core/udpovertcp.go +++ b/pkg/core/udpovertcp.go @@ -19,21 +19,8 @@ func newDatagramPacket(data []byte) (r *DatagramPacket) { } } +// this method will return all byte array in the way: b[:], len(DatagramPacket.Data)==64k func readDatagramPacket(r io.Reader, b []byte) (*DatagramPacket, error) { - _, err := io.ReadFull(r, b[:2]) - if err != nil { - return nil, err - } - dataLength := binary.BigEndian.Uint16(b[:2]) - _, err = io.ReadFull(r, b[:dataLength]) - if err != nil { - return nil, err - } - return &DatagramPacket{DataLength: dataLength, Data: b[:dataLength]}, nil -} - -// this method will return all byte array in the way: b[:] -func readDatagramPacketServer(r io.Reader, b []byte) (*DatagramPacket, error) { _, err := io.ReadFull(r, b[:2]) if err != nil { return nil, err @@ -46,11 +33,11 @@ func readDatagramPacketServer(r io.Reader, b []byte) (*DatagramPacket, error) { return &DatagramPacket{DataLength: dataLength, Data: b[:]}, nil } -func (addr *DatagramPacket) Write(w io.Writer) error { +func (d *DatagramPacket) Write(w io.Writer) error { buf := config.LPool.Get().([]byte)[:] defer config.LPool.Put(buf[:]) - binary.BigEndian.PutUint16(buf[:2], uint16(len(addr.Data))) - n := copy(buf[2:], addr.Data) + binary.BigEndian.PutUint16(buf[:2], d.DataLength) + n := copy(buf[2:], d.Data[:d.DataLength]) _, err := w.Write(buf[:n+2]) return err } diff --git a/pkg/dhcp/dhcp.go b/pkg/dhcp/dhcp.go index 14b6573f..7575aa05 100644 --- a/pkg/dhcp/dhcp.go +++ b/pkg/dhcp/dhcp.go @@ -4,7 +4,6 @@ import ( "context" "encoding/base64" "fmt" - plog "github.com/wencaiwulue/kubevpn/v2/pkg/log" "net" "github.com/cilium/ipam/service/allocator" @@ -17,6 +16,7 @@ import ( "k8s.io/client-go/util/retry" "github.com/wencaiwulue/kubevpn/v2/pkg/config" + plog "github.com/wencaiwulue/kubevpn/v2/pkg/log" "github.com/wencaiwulue/kubevpn/v2/pkg/util" ) diff --git a/pkg/ssh/ssh.go b/pkg/ssh/ssh.go index ff86f06e..8d375d3a 100644 --- a/pkg/ssh/ssh.go +++ b/pkg/ssh/ssh.go @@ -841,12 +841,8 @@ func SshJump(ctx context.Context, conf *SshConfig, flags *pflag.FlagSet, print b return } if print { - msg := fmt.Sprintf("To use: export KUBECONFIG=%s", temp.Name()) - plog.G(ctx).Info(pkgutil.PrintStr(msg)) plog.G(ctx).Infof("Use temporary kubeconfig: %s", temp.Name()) } else { - msg := fmt.Sprintf("To use: export KUBECONFIG=%s", temp.Name()) - plog.G(ctx).Debugf(pkgutil.PrintStr(msg)) plog.G(ctx).Debugf("Use temporary kubeconfig: %s", temp.Name()) } path = temp.Name()