diff --git a/pkg/core/gvisortunendpoint.go b/pkg/core/gvisortunendpoint.go index 0c5cf101..a18c3c1d 100755 --- a/pkg/core/gvisortunendpoint.go +++ b/pkg/core/gvisortunendpoint.go @@ -101,13 +101,13 @@ func (h *gvisorTCPHandler) readFromTCPConnWriteToEndpoint(ctx context.Context, c h.addToRouteMapTCP(ctx, src, conn) // inner ip like 198.19.0.100/102/103 connect each other if config.CIDR.Contains(dst) || config.CIDR6.Contains(dst) { - plog.G(ctx).Debugf("[TUN-GVISOR] Forward to TUN device, SRC: %s, DST: %s, Length: %d", src.String(), dst.String(), read) + plog.G(ctx).Debugf("[TUN-GVISOR] Forward to TUN device, SRC: %s, DST: %s, Protocol: %s, Length: %d", src, dst, layers.IPProtocol(ipProtocol).String(), read) util.SafeWrite(h.packetChan, &DatagramPacket{ DataLength: uint16(read), Data: buf[:], }, func(v *DatagramPacket) { config.LPool.Put(v.Data[:]) - plog.G(context.Background()).Errorf("Drop packet, SRC: %s, DST: %s, Length: %d", src, dst, v.DataLength) + plog.G(context.Background()).Errorf("Drop packet, SRC: %s, DST: %s, Protocol: %s, Length: %d", src, dst, layers.IPProtocol(ipProtocol).String(), v.DataLength) }) continue } @@ -120,7 +120,7 @@ func (h *gvisorTCPHandler) readFromTCPConnWriteToEndpoint(ctx context.Context, c sniffer.LogPacket("[gVISOR] ", sniffer.DirectionRecv, protocol, pkt) endpoint.InjectInbound(protocol, pkt) pkt.DecRef() - plog.G(ctx).Debugf("[TUN-GVISOR] Write to Gvisor IP-Protocol: %s, SRC: %s, DST: %s, Length: %d", layers.IPProtocol(ipProtocol).String(), src.String(), dst, read) + plog.G(ctx).Debugf("[TUN-GVISOR] Write to Gvisor. SRC: %s, DST: %s, Protocol: %s, Length: %d", src, dst, layers.IPProtocol(ipProtocol).String(), read) } } diff --git a/pkg/core/gvisorudpforwarder.go b/pkg/core/gvisorudpforwarder.go index ed81998a..16d0033c 100644 --- a/pkg/core/gvisorudpforwarder.go +++ b/pkg/core/gvisorudpforwarder.go @@ -78,7 +78,7 @@ func UDPForwarder(ctx context.Context, s *stack.Stack) func(id stack.TransportEn break } } - plog.G(ctx).Infof("[TUN-UDP] Write length %d data from src: %s -> dst: %s", written, src.String(), dst.String()) + plog.G(ctx).Infof("[TUN-UDP] Write length %d data from src: %s -> dst: %s", written, src, dst) errChan <- err }() go func() { @@ -108,7 +108,7 @@ func UDPForwarder(ctx context.Context, s *stack.Stack) func(id stack.TransportEn break } } - plog.G(ctx).Infof("[TUN-UDP] Read length %d data from dst: %s -> src: %s", written, dst.String(), src.String()) + plog.G(ctx).Infof("[TUN-UDP] Read length %d data from dst: %s -> src: %s", written, dst, src) errChan <- err }() err1 = <-errChan diff --git a/pkg/core/tcphandler.go b/pkg/core/tcphandler.go index cb48e8fd..16aaf2c5 100644 --- a/pkg/core/tcphandler.go +++ b/pkg/core/tcphandler.go @@ -5,7 +5,9 @@ import ( "net" "sync" + "github.com/google/gopacket/layers" "github.com/wencaiwulue/kubevpn/v2/pkg/config" + plog "github.com/wencaiwulue/kubevpn/v2/pkg/log" "github.com/wencaiwulue/kubevpn/v2/pkg/util" ) @@ -72,7 +74,8 @@ func (h *UDPOverTCPHandler) Handle(ctx context.Context, tcpConn net.Conn) { } var src, dst net.IP - src, dst, err = util.ParseIP(packet.Data[:packet.DataLength]) + var protocol int + src, dst, protocol, err = util.ParseIP(packet.Data[:packet.DataLength]) if err != nil { plog.G(ctx).Errorf("[TCP] Unknown packet") config.LPool.Put(buf[:]) @@ -87,9 +90,10 @@ func (h *UDPOverTCPHandler) Handle(ctx context.Context, tcpConn net.Conn) { } else { plog.G(ctx).Infof("[TCP] Add new route map TCP to DST %s by connation %s -> %s", src, tcpConn.RemoteAddr(), tcpConn.LocalAddr()) } + // here receive too many packet util.SafeWrite(h.packetChan, packet, func(v *DatagramPacket) { - config.LPool.Put(v.Data[:]) - plog.G(context.Background()).Errorf("Drop packet, SRC: %s, DST: %s, Length: %d", src, dst, v.DataLength) + plog.G(context.Background()).Errorf("Stuck packet, SRC: %s, DST: %s, Protocol: %s, Length: %d", src, dst, layers.IPProtocol(protocol).String(), v.DataLength) + h.packetChan <- v }) } } diff --git a/pkg/core/tunhandler.go b/pkg/core/tunhandler.go index 1243767c..a6431478 100644 --- a/pkg/core/tunhandler.go +++ b/pkg/core/tunhandler.go @@ -5,13 +5,15 @@ import ( "net" "sync" + "github.com/google/gopacket/layers" + "github.com/wencaiwulue/kubevpn/v2/pkg/config" plog "github.com/wencaiwulue/kubevpn/v2/pkg/log" "github.com/wencaiwulue/kubevpn/v2/pkg/util" ) const ( - MaxSize = 100 + MaxSize = 1000 ) type tunHandler struct { @@ -37,7 +39,7 @@ func (h *tunHandler) Handle(ctx context.Context, tun net.Conn) { if remote := h.node.Remote; remote != "" { remoteAddr, err := net.ResolveUDPAddr("udp", remote) if err != nil { - plog.G(ctx).Errorf("[TUN-CLIENT] Failed to resolve udp addr %s: %v", remote, err) + plog.G(ctx).Errorf("Failed to resolve udp addr %s: %v", remote, err) return } h.HandleClient(ctx, tun, remoteAddr) @@ -89,14 +91,14 @@ func (d *Device) readFromTUN(ctx context.Context) { return } - src, dst, err := util.ParseIP(buf[:n]) + src, dst, protocol, err := util.ParseIP(buf[:n]) if err != nil { plog.G(ctx).Errorf("[TUN] Unknown packet") config.LPool.Put(buf[:]) continue } - plog.G(ctx).Debugf("[TUN] SRC: %s, DST: %s, Length: %d", src, dst, n) + plog.G(ctx).Debugf("[TUN] SRC: %s, DST: %s, Protocol: %s, Length: %d", src, dst, layers.IPProtocol(protocol).String(), n) util.SafeWrite(d.tunInbound, &Packet{ data: buf[:], length: n, @@ -104,7 +106,7 @@ func (d *Device) readFromTUN(ctx context.Context) { dst: dst, }, func(v *Packet) { config.LPool.Put(v.data[:]) - plog.G(context.Background()).Errorf("Drop packet, SRC: %s, DST: %s, Length: %d", v.src, v.dst, v.length) + plog.G(context.Background()).Errorf("Drop packet, SRC: %s, DST: %s, Protocol: %s, Length: %d", v.src, v.dst, layers.IPProtocol(protocol).String(), v.length) }) } } @@ -226,7 +228,7 @@ func (p *Peer) readFromConn(ctx context.Context) { return } - src, dst, err := util.ParseIP(buf[:n]) + src, dst, protocol, err := util.ParseIP(buf[:n]) if err != nil { config.LPool.Put(buf[:]) plog.G(ctx).Errorf("[TUN] Unknown packet: %v", err) @@ -240,7 +242,7 @@ func (p *Peer) readFromConn(ctx context.Context) { } else { plog.G(ctx).Infof("[TUN] Add new route map UDP: %s -> %s", src, from) } - + plog.G(context.Background()).Errorf("[TUN] SRC: %s, DST: %s, Protocol: %s, Length: %d", src, dst, layers.IPProtocol(protocol).String(), n) p.tunInbound <- &Packet{ data: buf[:], length: n, @@ -257,13 +259,13 @@ func (p *Peer) readFromTCPConn(ctx context.Context) { case <-ctx.Done(): return case packet := <-TCPPacketChan: - src, dst, err := util.ParseIP(packet.Data) + src, dst, protocol, err := util.ParseIP(packet.Data) if err != nil { plog.G(ctx).Errorf("[TCP] Unknown packet") config.LPool.Put(packet.Data[:]) continue } - plog.G(ctx).Debugf("[TCP] SRC: %s > DST: %s Length: %d", src, dst, packet.DataLength) + plog.G(ctx).Debugf("[TCP] SRC: %s > DST: %s, Protocol: %s, Length: %d", src, dst, layers.IPProtocol(protocol).String(), packet.DataLength) p.tcpInbound <- &Packet{ data: packet.Data[:], length: int(packet.DataLength), @@ -282,7 +284,7 @@ func (p *Peer) routeTCP(ctx context.Context) { return case packet := <-p.tcpInbound: if conn, ok := p.routeMapTCP.Load(packet.dst.String()); ok { - plog.G(ctx).Debugf("[TCP] Find TCP route SRC: %s to DST: %s -> %s", packet.src.String(), packet.dst.String(), conn.(net.Conn).RemoteAddr()) + plog.G(ctx).Debugf("[TCP] Find TCP route SRC: %s to DST: %s -> %s", packet.src, packet.dst, conn.(net.Conn).RemoteAddr()) dgram := newDatagramPacket(packet.data[:packet.length]) err := dgram.Write(conn.(net.Conn)) config.LPool.Put(packet.data[:]) @@ -292,7 +294,7 @@ func (p *Peer) routeTCP(ctx context.Context) { return } } else { - plog.G(ctx).Debugf("[TCP] Not found route, write to TUN device. SRC: %s, DST: %s", packet.src.String(), packet.dst.String()) + plog.G(ctx).Debugf("[TCP] Not found route, write to TUN device. SRC: %s, DST: %s", packet.src, packet.dst) p.tunOutbound <- &Packet{ data: packet.data, length: packet.length, @@ -312,7 +314,7 @@ func (p *Peer) routeTUN(ctx context.Context) { return case packet := <-p.tunInbound: if addr, ok := p.routeMapUDP.Load(packet.dst.String()); ok { - plog.G(ctx).Debugf("[TUN] Find UDP route to DST: %s -> %s, SRC: %s, DST: %s", packet.dst, addr, packet.src.String(), packet.dst.String()) + plog.G(ctx).Debugf("[TUN] Find UDP route to DST: %s -> %s, SRC: %s, DST: %s", packet.dst, addr, packet.src, packet.dst) _, err := p.conn.WriteTo(packet.data[:packet.length], addr.(net.Addr)) config.LPool.Put(packet.data[:]) if err != nil { diff --git a/pkg/core/tunhandlerclient.go b/pkg/core/tunhandlerclient.go index fba2a459..e515876e 100644 --- a/pkg/core/tunhandlerclient.go +++ b/pkg/core/tunhandlerclient.go @@ -6,6 +6,7 @@ import ( "net" "time" + "github.com/google/gopacket/layers" "github.com/pkg/errors" "github.com/wencaiwulue/kubevpn/v2/pkg/config" @@ -47,13 +48,13 @@ func (d *ClientDevice) forwardPacketToRemote(ctx context.Context, remoteAddr *ne func() { packetConn, err := getRemotePacketConn(ctx, forward) if err != nil { - plog.G(ctx).Errorf("[TUN-CLIENT] Failed to get remote conn from %s -> %s: %s", d.tun.LocalAddr(), remoteAddr, err) + plog.G(ctx).Errorf("Failed to get remote conn from %s -> %s: %s", d.tun.LocalAddr(), remoteAddr, err) time.Sleep(time.Second * 1) return } err = transportTunPacketClient(ctx, d.tunInbound, d.tunOutbound, packetConn, remoteAddr) if err != nil { - plog.G(ctx).Errorf("[TUN-CLIENT] Failed to transport data to remote %s: %v", remoteAddr, err) + plog.G(ctx).Errorf("Failed to transport data to remote %s: %v", remoteAddr, err) } }() } @@ -95,8 +96,12 @@ func transportTunPacketClient(ctx context.Context, tunInbound <-chan *Packet, tu for packet := range tunInbound { if packet.src.Equal(packet.dst) { util.SafeWrite(tunOutbound, packet, func(v *Packet) { + var p = "unknown" + if _, _, protocol, err := util.ParseIP(v.data[:v.length]); err == nil { + p = layers.IPProtocol(protocol).String() + } config.LPool.Put(v.data[:]) - plog.G(context.Background()).Errorf("Drop packet, SRC: %s, DST: %s, Length: %d", v.src, v.dst, v.length) + plog.G(context.Background()).Errorf("Drop packet, SRC: %s, DST: %s, Protocol: %s, Length: %d", v.src, v.dst, p, v.length) }) continue } @@ -140,7 +145,7 @@ func (d *ClientDevice) readFromTun(ctx context.Context) { buf := config.LPool.Get().([]byte)[:] n, err := d.tun.Read(buf[:]) if err != nil { - plog.G(ctx).Errorf("[TUN-CLIENT] Failed to read packet from tun device: %s", err) + plog.G(ctx).Errorf("Failed to read packet from tun device: %s", err) util.SafeWrite(d.errChan, err) config.LPool.Put(buf[:]) return @@ -148,16 +153,17 @@ func (d *ClientDevice) readFromTun(ctx context.Context) { // Try to determine network protocol number, default zero. var src, dst net.IP - src, dst, err = util.ParseIP(buf[:n]) + var protocol int + src, dst, protocol, err = util.ParseIP(buf[:n]) if err != nil { - plog.G(ctx).Errorf("[TUN-CLIENT] Unknown packet: %v", err) + plog.G(ctx).Errorf("Unknown packet: %v", err) config.LPool.Put(buf[:]) continue } - plog.G(context.Background()).Debugf("[TUN-CLIENT] SRC: %s, DST: %s, Length: %d", src.String(), dst, n) + plog.G(context.Background()).Debugf("SRC: %s, DST: %s, Protocol: %s, Length: %d", src, dst, layers.IPProtocol(protocol).String(), n) util.SafeWrite(d.tunInbound, NewPacket(buf[:], n, src, dst), func(v *Packet) { config.LPool.Put(v.data[:]) - plog.G(context.Background()).Errorf("Drop packet, SRC: %s, DST: %s, Length: %d", v.src, v.dst, v.length) + plog.G(context.Background()).Errorf("Drop packet, SRC: %s, DST: %s, Protocol: %s, Length: %d", v.src, v.dst, layers.IPProtocol(protocol).String(), v.length) }) } } @@ -168,7 +174,7 @@ func (d *ClientDevice) writeToTun(ctx context.Context) { _, err := d.tun.Write(e.data[:e.length]) config.LPool.Put(e.data[:]) if err != nil { - plog.G(ctx).Errorf("[TUN-CLIENT] Failed to write packet to tun device: %v", err) + plog.G(ctx).Errorf("Failed to write packet to tun device: %v", err) util.SafeWrite(d.errChan, err) return } diff --git a/pkg/util/net.go b/pkg/util/net.go index 4e756555..16274da4 100644 --- a/pkg/util/net.go +++ b/pkg/util/net.go @@ -129,22 +129,22 @@ func IsIPv6(packet []byte) bool { return 6 == (packet[0] >> 4) } -func ParseIP(packet []byte) (src net.IP, dst net.IP, err error) { +func ParseIP(packet []byte) (src net.IP, dst net.IP, protocol int, err error) { if IsIPv4(packet) { header, err := ipv4.ParseHeader(packet) if err != nil { - return nil, nil, err + return nil, nil, -1, err } - return header.Src, header.Dst, nil + return header.Src, header.Dst, header.Protocol, nil } if IsIPv6(packet) { header, err := ipv6.ParseHeader(packet) if err != nil { - return nil, nil, err + return nil, nil, -1, err } - return header.Src, header.Dst, nil + return header.Src, header.Dst, header.NextHeader, nil } - return nil, nil, errors.New("packet is invalid") + return nil, nil, -1, errors.New("packet is invalid") } func GetIPBaseNic() (*net.IPNet, error) {