mirror of
https://github.com/kubenetworks/kubevpn.git
synced 2025-10-19 21:54:44 +08:00
perf: route packet by each tcp conn (#548)
This commit is contained in:
55
pkg/core/bufferedtcp.go
Normal file
55
pkg/core/bufferedtcp.go
Normal file
@@ -0,0 +1,55 @@
|
||||
package core
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"net"
|
||||
|
||||
"github.com/wencaiwulue/kubevpn/v2/pkg/config"
|
||||
plog "github.com/wencaiwulue/kubevpn/v2/pkg/log"
|
||||
)
|
||||
|
||||
type bufferedTCP struct {
|
||||
net.Conn
|
||||
Chan chan *DatagramPacket
|
||||
closed bool
|
||||
}
|
||||
|
||||
func NewBufferedTCP(conn net.Conn) net.Conn {
|
||||
c := &bufferedTCP{
|
||||
Conn: conn,
|
||||
Chan: make(chan *DatagramPacket, MaxSize),
|
||||
}
|
||||
go c.Run()
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *bufferedTCP) Write(b []byte) (n int, err error) {
|
||||
if c.closed {
|
||||
return 0, errors.New("tcp channel is closed")
|
||||
}
|
||||
if len(b) == 0 {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
buf := config.LPool.Get().([]byte)[:]
|
||||
n = copy(buf, b)
|
||||
c.Chan <- &DatagramPacket{
|
||||
DataLength: uint16(n),
|
||||
Data: buf,
|
||||
}
|
||||
return n, nil
|
||||
}
|
||||
|
||||
func (c *bufferedTCP) Run() {
|
||||
for buf := range c.Chan {
|
||||
_, err := c.Conn.Write(buf.Data[:buf.DataLength])
|
||||
config.LPool.Put(buf.Data[:])
|
||||
if err != nil {
|
||||
plog.G(context.Background()).Errorf("[TCP] Write packet failed: %v", err)
|
||||
_ = c.Conn.Close()
|
||||
c.closed = true
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
@@ -20,7 +20,7 @@ import (
|
||||
type gvisorTCPHandler struct {
|
||||
// map[srcIP]net.Conn
|
||||
routeMapTCP *sync.Map
|
||||
packetChan chan *DatagramPacket
|
||||
packetChan chan *Packet
|
||||
}
|
||||
|
||||
func GvisorTCPHandler() Handler {
|
||||
@@ -43,7 +43,7 @@ func (h *gvisorTCPHandler) handle(ctx context.Context, tcpConn net.Conn) {
|
||||
errChan := make(chan error, 2)
|
||||
go func() {
|
||||
defer util.HandleCrash()
|
||||
h.readFromTCPConnWriteToEndpoint(ctx, tcpConn, endpoint)
|
||||
h.readFromTCPConnWriteToEndpoint(ctx, NewBufferedTCP(tcpConn), endpoint)
|
||||
util.SafeClose(errChan)
|
||||
}()
|
||||
go func() {
|
||||
|
@@ -21,13 +21,7 @@ import (
|
||||
|
||||
func (h *gvisorTCPHandler) readFromEndpointWriteToTCPConn(ctx context.Context, conn net.Conn, endpoint *channel.Endpoint) {
|
||||
tcpConn, _ := newGvisorUDPConnOverTCP(ctx, conn)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
for ctx.Err() == nil {
|
||||
pktBuffer := endpoint.ReadContext(ctx)
|
||||
if pktBuffer != nil {
|
||||
sniffer.LogPacket("[gVISOR] ", sniffer.DirectionSend, pktBuffer.NetworkProtocolNumber, pktBuffer)
|
||||
@@ -45,22 +39,16 @@ func (h *gvisorTCPHandler) readFromTCPConnWriteToEndpoint(ctx context.Context, c
|
||||
tcpConn, _ := newGvisorUDPConnOverTCP(ctx, conn)
|
||||
defer h.removeFromRouteMapTCP(ctx, conn)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
for ctx.Err() == nil {
|
||||
buf := config.LPool.Get().([]byte)[:]
|
||||
read, err := tcpConn.Read(buf[:])
|
||||
if err != nil {
|
||||
plog.G(ctx).Errorf("[TUN-GVISOR] Failed to read from tcp conn: %v", err)
|
||||
plog.G(ctx).Errorf("[TCP-GVISOR] Failed to read from tcp conn: %v", err)
|
||||
config.LPool.Put(buf[:])
|
||||
return
|
||||
}
|
||||
if read == 0 {
|
||||
plog.G(ctx).Warnf("[TUN-GVISOR] Read from tcp conn length is %d", read)
|
||||
plog.G(ctx).Warnf("[TCP-GVISOR] Read from tcp conn length is %d", read)
|
||||
config.LPool.Put(buf[:])
|
||||
continue
|
||||
}
|
||||
@@ -85,7 +73,7 @@ func (h *gvisorTCPHandler) readFromTCPConnWriteToEndpoint(ctx context.Context, c
|
||||
protocol = header.IPv6ProtocolNumber
|
||||
ipHeader, err := ipv6.ParseHeader(buf[:read])
|
||||
if err != nil {
|
||||
plog.G(ctx).Errorf("[TUN-GVISOR] Failed to parse IPv6 header: %s", err.Error())
|
||||
plog.G(ctx).Errorf("[TCP-GVISOR] Failed to parse IPv6 header: %s", err.Error())
|
||||
config.LPool.Put(buf[:])
|
||||
continue
|
||||
}
|
||||
@@ -93,7 +81,7 @@ func (h *gvisorTCPHandler) readFromTCPConnWriteToEndpoint(ctx context.Context, c
|
||||
src = ipHeader.Src
|
||||
dst = ipHeader.Dst
|
||||
} else {
|
||||
plog.G(ctx).Errorf("[TUN-GVISOR] Unknown packet")
|
||||
plog.G(ctx).Errorf("[TCP-GVISOR] Unknown packet")
|
||||
config.LPool.Put(buf[:])
|
||||
continue
|
||||
}
|
||||
@@ -101,14 +89,10 @@ func (h *gvisorTCPHandler) readFromTCPConnWriteToEndpoint(ctx context.Context, c
|
||||
h.addToRouteMapTCP(ctx, src, conn)
|
||||
// inner ip like 198.19.0.100/102/103 connect each other
|
||||
if config.CIDR.Contains(dst) || config.CIDR6.Contains(dst) {
|
||||
plog.G(ctx).Debugf("[TUN-GVISOR] Forward to TUN device, SRC: %s, DST: %s, Protocol: %s, Length: %d", src, dst, layers.IPProtocol(ipProtocol).String(), read)
|
||||
util.SafeWrite(h.packetChan, &DatagramPacket{
|
||||
DataLength: uint16(read),
|
||||
Data: buf[:],
|
||||
}, func(v *DatagramPacket) {
|
||||
config.LPool.Put(v.Data[:])
|
||||
plog.G(context.Background()).Errorf("Drop packet, SRC: %s, DST: %s, Protocol: %s, Length: %d", src, dst, layers.IPProtocol(ipProtocol).String(), v.DataLength)
|
||||
})
|
||||
err = h.handlePacket(ctx, buf, read, src, dst, layers.IPProtocol(ipProtocol).String())
|
||||
if err != nil {
|
||||
plog.G(ctx).Errorf("[TCP-GVISOR] Failed to handle packet: %v", err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -120,14 +104,42 @@ func (h *gvisorTCPHandler) readFromTCPConnWriteToEndpoint(ctx context.Context, c
|
||||
sniffer.LogPacket("[gVISOR] ", sniffer.DirectionRecv, protocol, pkt)
|
||||
endpoint.InjectInbound(protocol, pkt)
|
||||
pkt.DecRef()
|
||||
plog.G(ctx).Debugf("[TUN-GVISOR] Write to Gvisor. SRC: %s, DST: %s, Protocol: %s, Length: %d", src, dst, layers.IPProtocol(ipProtocol).String(), read)
|
||||
plog.G(ctx).Debugf("[TCP-GVISOR] Write to Gvisor. SRC: %s, DST: %s, Protocol: %s, Length: %d", src, dst, layers.IPProtocol(ipProtocol).String(), read)
|
||||
}
|
||||
}
|
||||
|
||||
func (h *gvisorTCPHandler) handlePacket(ctx context.Context, buf []byte, length int, src, dst net.IP, protocol string) error {
|
||||
if conn, ok := h.routeMapTCP.Load(dst.String()); ok {
|
||||
plog.G(ctx).Debugf("[TCP-GVISOR] Find TCP route SRC: %s to DST: %s -> %s", src, dst, conn.(net.Conn).RemoteAddr())
|
||||
dgram := newDatagramPacket(buf[:length])
|
||||
err := dgram.Write(conn.(net.Conn))
|
||||
config.LPool.Put(buf[:])
|
||||
if err != nil {
|
||||
plog.G(ctx).Errorf("[TCP-GVISOR] Failed to write to %s <- %s : %s", conn.(net.Conn).RemoteAddr(), conn.(net.Conn).LocalAddr(), err)
|
||||
return err
|
||||
}
|
||||
} else if config.RouterIP.Equal(dst) || config.RouterIP6.Equal(dst) {
|
||||
plog.G(ctx).Debugf("[TCP-GVISOR] Forward to TUN device, SRC: %s, DST: %s, Protocol: %s, Length: %d", src, dst, protocol, length)
|
||||
util.SafeWrite(h.packetChan, &Packet{
|
||||
length: length,
|
||||
data: buf[:],
|
||||
src: src,
|
||||
dst: dst,
|
||||
}, func(v *Packet) {
|
||||
config.LPool.Put(v.data[:])
|
||||
plog.G(context.Background()).Errorf("[TCP-GVISOR] Drop packet, SRC: %s, DST: %s, Protocol: %s, Length: %d", src, dst, protocol, v.length)
|
||||
})
|
||||
} else {
|
||||
plog.G(ctx).Warnf("[TCP-GVISOR] No route for src: %s -> dst: %s, drop it", src, dst)
|
||||
config.LPool.Put(buf[:])
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *gvisorTCPHandler) addToRouteMapTCP(ctx context.Context, src net.IP, tcpConn net.Conn) {
|
||||
value, loaded := h.routeMapTCP.LoadOrStore(src.String(), tcpConn)
|
||||
if loaded {
|
||||
if tcpConn != value.(net.Conn) {
|
||||
if value.(net.Conn) != tcpConn {
|
||||
h.routeMapTCP.Store(src.String(), tcpConn)
|
||||
plog.G(ctx).Infof("[TUN-GVISOR] Replace route map TCP: %s -> %s-%s", src, tcpConn.LocalAddr(), tcpConn.RemoteAddr())
|
||||
}
|
||||
|
@@ -59,11 +59,11 @@ func (c *gvisorUDPConnOverTCP) Read(b []byte) (int, error) {
|
||||
case <-c.ctx.Done():
|
||||
return 0, c.ctx.Err()
|
||||
default:
|
||||
dgram, err := readDatagramPacket(c.Conn, b)
|
||||
datagram, err := readDatagramPacket(c.Conn, b)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return int(dgram.DataLength), nil
|
||||
return int(datagram.DataLength), nil
|
||||
}
|
||||
}
|
||||
|
||||
@@ -107,26 +107,20 @@ func handle(ctx context.Context, tcpConn net.Conn, udpConn *net.UDPConn) {
|
||||
buf := config.LPool.Get().([]byte)[:]
|
||||
defer config.LPool.Put(buf[:])
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
for ctx.Err() == nil {
|
||||
err := tcpConn.SetReadDeadline(time.Now().Add(time.Second * 30))
|
||||
if err != nil {
|
||||
plog.G(ctx).Errorf("[TUN-UDP] Failed to set read deadline: %v", err)
|
||||
errChan <- err
|
||||
return
|
||||
}
|
||||
dgram, err := readDatagramPacket(tcpConn, buf[:])
|
||||
datagram, err := readDatagramPacket(tcpConn, buf[:])
|
||||
if err != nil {
|
||||
plog.G(ctx).Errorf("[TUN-UDP] %s -> %s: %v", tcpConn.RemoteAddr(), udpConn.LocalAddr(), err)
|
||||
errChan <- err
|
||||
return
|
||||
}
|
||||
if dgram.DataLength == 0 {
|
||||
if datagram.DataLength == 0 {
|
||||
plog.G(ctx).Errorf("[TUN-UDP] Length is zero")
|
||||
errChan <- fmt.Errorf("length of read packet is zero")
|
||||
return
|
||||
@@ -138,12 +132,12 @@ func handle(ctx context.Context, tcpConn net.Conn, udpConn *net.UDPConn) {
|
||||
errChan <- err
|
||||
return
|
||||
}
|
||||
if _, err = udpConn.Write(dgram.Data); err != nil {
|
||||
if _, err = udpConn.Write(datagram.Data[:datagram.DataLength]); err != nil {
|
||||
plog.G(ctx).Errorf("[TUN-UDP] %s -> %s : %s", tcpConn.RemoteAddr(), "localhost:8422", err)
|
||||
errChan <- err
|
||||
return
|
||||
}
|
||||
plog.G(ctx).Infof("[TUN-UDP] %s >>> %s length: %d", tcpConn.RemoteAddr(), "localhost:8422", dgram.DataLength)
|
||||
plog.G(ctx).Infof("[TUN-UDP] %s >>> %s length: %d", tcpConn.RemoteAddr(), "localhost:8422", datagram.DataLength)
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -152,13 +146,7 @@ func handle(ctx context.Context, tcpConn net.Conn, udpConn *net.UDPConn) {
|
||||
buf := config.LPool.Get().([]byte)[:]
|
||||
defer config.LPool.Put(buf[:])
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
for ctx.Err() == nil {
|
||||
err := udpConn.SetReadDeadline(time.Now().Add(time.Second * 30))
|
||||
if err != nil {
|
||||
plog.G(ctx).Errorf("[TUN-UDP] Failed to set read deadline failed: %v", err)
|
||||
|
@@ -18,7 +18,7 @@ var (
|
||||
// RouteMapTCP map[srcIP]net.Conn Globe route table for inner ip
|
||||
RouteMapTCP = &sync.Map{}
|
||||
// TCPPacketChan tcp connects
|
||||
TCPPacketChan = make(chan *DatagramPacket, MaxSize)
|
||||
TCPPacketChan = make(chan *Packet, MaxSize)
|
||||
)
|
||||
|
||||
type TCPUDPacket struct {
|
||||
|
@@ -6,8 +6,8 @@ import (
|
||||
"sync"
|
||||
|
||||
"github.com/google/gopacket/layers"
|
||||
"github.com/wencaiwulue/kubevpn/v2/pkg/config"
|
||||
|
||||
"github.com/wencaiwulue/kubevpn/v2/pkg/config"
|
||||
plog "github.com/wencaiwulue/kubevpn/v2/pkg/log"
|
||||
"github.com/wencaiwulue/kubevpn/v2/pkg/util"
|
||||
)
|
||||
@@ -42,7 +42,7 @@ func (c *UDPOverTCPConnector) ConnectContext(ctx context.Context, conn net.Conn)
|
||||
type UDPOverTCPHandler struct {
|
||||
// map[srcIP]net.Conn
|
||||
routeMapTCP *sync.Map
|
||||
packetChan chan *DatagramPacket
|
||||
packetChan chan *Packet
|
||||
}
|
||||
|
||||
func TCPHandler() Handler {
|
||||
@@ -53,49 +53,74 @@ func TCPHandler() Handler {
|
||||
}
|
||||
|
||||
func (h *UDPOverTCPHandler) Handle(ctx context.Context, tcpConn net.Conn) {
|
||||
tcpConn = NewBufferedTCP(tcpConn)
|
||||
defer tcpConn.Close()
|
||||
plog.G(ctx).Infof("[TCP] Handle connection %s -> %s", tcpConn.RemoteAddr(), tcpConn.LocalAddr())
|
||||
|
||||
defer h.removeFromRouteMapTCP(ctx, tcpConn)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
for ctx.Err() == nil {
|
||||
buf := config.LPool.Get().([]byte)[:]
|
||||
packet, err := readDatagramPacketServer(tcpConn, buf[:])
|
||||
datagram, err := readDatagramPacket(tcpConn, buf[:])
|
||||
if err != nil {
|
||||
plog.G(ctx).Errorf("[TCP] Failed to read from %s -> %s: %v", tcpConn.RemoteAddr(), tcpConn.LocalAddr(), err)
|
||||
config.LPool.Put(buf[:])
|
||||
return
|
||||
}
|
||||
|
||||
var src, dst net.IP
|
||||
var protocol int
|
||||
src, dst, protocol, err = util.ParseIP(packet.Data[:packet.DataLength])
|
||||
err = h.handlePacket(ctx, tcpConn, datagram)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (h *UDPOverTCPHandler) handlePacket(ctx context.Context, tcpConn net.Conn, datagram *DatagramPacket) error {
|
||||
src, dst, protocol, err := util.ParseIP(datagram.Data[:datagram.DataLength])
|
||||
if err != nil {
|
||||
plog.G(ctx).Errorf("[TCP] Unknown packet")
|
||||
config.LPool.Put(buf[:])
|
||||
continue
|
||||
config.LPool.Put(datagram.Data[:])
|
||||
return err
|
||||
}
|
||||
|
||||
h.addToRouteMapTCP(ctx, src, tcpConn)
|
||||
|
||||
if conn, ok := h.routeMapTCP.Load(dst.String()); ok {
|
||||
plog.G(ctx).Debugf("[TCP] Find TCP route SRC: %s to DST: %s -> %s", src, dst, conn.(net.Conn).RemoteAddr())
|
||||
err = datagram.Write(conn.(net.Conn))
|
||||
config.LPool.Put(datagram.Data[:])
|
||||
if err != nil {
|
||||
plog.G(ctx).Errorf("[TCP] Failed to write to %s <- %s : %s", conn.(net.Conn).RemoteAddr(), conn.(net.Conn).LocalAddr(), err)
|
||||
return err
|
||||
}
|
||||
} else if (config.CIDR.Contains(dst) || config.CIDR6.Contains(dst)) && (!config.RouterIP.Equal(dst) && !config.RouterIP6.Equal(dst)) {
|
||||
plog.G(ctx).Warnf("[TCP] No route for src: %s -> dst: %s, drop it", src, dst)
|
||||
config.LPool.Put(datagram.Data[:])
|
||||
} else {
|
||||
plog.G(ctx).Debugf("[TCP] Forward to TUN device, SRC: %s, DST: %s, Protocol: %s, Length: %d", src, dst, layers.IPProtocol(protocol).String(), datagram.DataLength)
|
||||
util.SafeWrite(h.packetChan, &Packet{
|
||||
data: datagram.Data,
|
||||
length: int(datagram.DataLength),
|
||||
src: src,
|
||||
dst: dst,
|
||||
}, func(v *Packet) {
|
||||
plog.G(context.Background()).Errorf("Stuck packet, SRC: %s, DST: %s, Protocol: %s, Length: %d", src, dst, layers.IPProtocol(protocol).String(), v.length)
|
||||
h.packetChan <- v
|
||||
})
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *UDPOverTCPHandler) addToRouteMapTCP(ctx context.Context, src net.IP, tcpConn net.Conn) {
|
||||
value, loaded := h.routeMapTCP.LoadOrStore(src.String(), tcpConn)
|
||||
if loaded {
|
||||
if tcpConn != value.(net.Conn) {
|
||||
if value.(net.Conn) != tcpConn {
|
||||
h.routeMapTCP.Store(src.String(), tcpConn)
|
||||
plog.G(ctx).Infof("[TCP] Replace route map TCP to DST %s by connation %s -> %s", src, tcpConn.RemoteAddr(), tcpConn.LocalAddr())
|
||||
}
|
||||
} else {
|
||||
plog.G(ctx).Infof("[TCP] Add new route map TCP to DST %s by connation %s -> %s", src, tcpConn.RemoteAddr(), tcpConn.LocalAddr())
|
||||
}
|
||||
// here receive too many packet
|
||||
util.SafeWrite(h.packetChan, packet, func(v *DatagramPacket) {
|
||||
plog.G(context.Background()).Errorf("Stuck packet, SRC: %s, DST: %s, Protocol: %s, Length: %d", src, dst, layers.IPProtocol(protocol).String(), v.DataLength)
|
||||
h.packetChan <- v
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func (h *UDPOverTCPHandler) removeFromRouteMapTCP(ctx context.Context, tcpConn net.Conn) {
|
||||
@@ -126,11 +151,11 @@ func (c *UDPConnOverTCP) ReadFrom(b []byte) (int, net.Addr, error) {
|
||||
case <-c.ctx.Done():
|
||||
return 0, nil, c.ctx.Err()
|
||||
default:
|
||||
packet, err := readDatagramPacket(c.Conn, b)
|
||||
datagram, err := readDatagramPacket(c.Conn, b)
|
||||
if err != nil {
|
||||
return 0, nil, err
|
||||
}
|
||||
return int(packet.DataLength), nil, nil
|
||||
return int(datagram.DataLength), nil, nil
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -131,21 +131,16 @@ func (d *Device) Close() {
|
||||
util.SafeClose(TCPPacketChan)
|
||||
}
|
||||
|
||||
func (d *Device) transport(ctx1 context.Context, addr string, routeMapUDP *sync.Map, routeMapTCP *sync.Map) {
|
||||
for ctx1.Err() == nil {
|
||||
func() {
|
||||
ctx, cancelFunc := context.WithCancel(ctx1)
|
||||
defer cancelFunc()
|
||||
|
||||
func (d *Device) transport(ctx context.Context, addr string, routeMapUDP *sync.Map, routeMapTCP *sync.Map) {
|
||||
packetConn, err := (&net.ListenConfig{}).ListenPacket(ctx, "udp", addr)
|
||||
if err != nil {
|
||||
plog.G(ctx1).Errorf("[UDP] Failed to listen %s: %v", addr, err)
|
||||
util.SafeWrite(d.errChan, err)
|
||||
plog.G(ctx).Errorf("[TUN] Failed to listen %s: %v", addr, err)
|
||||
return
|
||||
}
|
||||
|
||||
p := &Peer{
|
||||
conn: packetConn,
|
||||
tcpInbound: make(chan *Packet, MaxSize),
|
||||
tunInbound: d.tunInbound,
|
||||
tunOutbound: d.tunOutbound,
|
||||
routeMapUDP: routeMapUDP,
|
||||
@@ -153,21 +148,18 @@ func (d *Device) transport(ctx1 context.Context, addr string, routeMapUDP *sync.
|
||||
errChan: make(chan error, 1),
|
||||
}
|
||||
|
||||
defer p.Close()
|
||||
go p.readFromConn(ctx)
|
||||
go p.readFromTCPConn(ctx)
|
||||
go p.routeTCP(ctx)
|
||||
go p.routeTUN(ctx)
|
||||
go p.routeTCPToTun(ctx)
|
||||
|
||||
select {
|
||||
case err = <-p.errChan:
|
||||
plog.G(ctx1).Errorf("[TUN] %s: %v", d.tun.LocalAddr(), err)
|
||||
plog.G(ctx).Errorf("[TUN] %s: %v", d.tun.LocalAddr(), err)
|
||||
util.SafeWrite(d.errChan, err)
|
||||
return
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
type Packet struct {
|
||||
@@ -197,8 +189,6 @@ func (d *Packet) Length() int {
|
||||
type Peer struct {
|
||||
conn net.PacketConn
|
||||
|
||||
tcpInbound chan *Packet
|
||||
|
||||
tunInbound chan *Packet
|
||||
tunOutbound chan<- *Packet
|
||||
|
||||
@@ -232,16 +222,10 @@ func (p *Peer) readFromConn(ctx context.Context) {
|
||||
if err != nil {
|
||||
config.LPool.Put(buf[:])
|
||||
plog.G(ctx).Errorf("[TUN] Unknown packet: %v", err)
|
||||
continue
|
||||
}
|
||||
if addr, loaded := p.routeMapUDP.LoadOrStore(src.String(), from); loaded {
|
||||
if addr.(net.Addr).String() != from.String() {
|
||||
p.routeMapUDP.Store(src.String(), from)
|
||||
plog.G(ctx).Infof("[TUN] Replace route map UDP: %s -> %s", src, from)
|
||||
}
|
||||
} else {
|
||||
plog.G(ctx).Infof("[TUN] Add new route map UDP: %s -> %s", src, from)
|
||||
p.sendErr(err)
|
||||
return
|
||||
}
|
||||
p.addToRouteMapUDP(ctx, src, from)
|
||||
plog.G(context.Background()).Errorf("[TUN] SRC: %s, DST: %s, Protocol: %s, Length: %d", src, dst, layers.IPProtocol(protocol).String(), n)
|
||||
p.tunInbound <- &Packet{
|
||||
data: buf[:],
|
||||
@@ -252,67 +236,27 @@ func (p *Peer) readFromConn(ctx context.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Peer) readFromTCPConn(ctx context.Context) {
|
||||
defer util.HandleCrash()
|
||||
for ctx.Err() == nil {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case packet := <-TCPPacketChan:
|
||||
src, dst, protocol, err := util.ParseIP(packet.Data)
|
||||
if err != nil {
|
||||
plog.G(ctx).Errorf("[TCP] Unknown packet")
|
||||
config.LPool.Put(packet.Data[:])
|
||||
continue
|
||||
}
|
||||
plog.G(ctx).Debugf("[TCP] SRC: %s > DST: %s, Protocol: %s, Length: %d", src, dst, layers.IPProtocol(protocol).String(), packet.DataLength)
|
||||
p.tcpInbound <- &Packet{
|
||||
data: packet.Data[:],
|
||||
length: int(packet.DataLength),
|
||||
src: src,
|
||||
dst: dst,
|
||||
}
|
||||
func (p *Peer) addToRouteMapUDP(ctx context.Context, src net.IP, from net.Addr) {
|
||||
if addr, loaded := p.routeMapUDP.LoadOrStore(src.String(), from); loaded {
|
||||
if addr.(net.Addr).String() != from.String() {
|
||||
p.routeMapUDP.Store(src.String(), from)
|
||||
plog.G(ctx).Infof("[TUN] Replace route map UDP: %s -> %s", src, from)
|
||||
}
|
||||
} else {
|
||||
plog.G(ctx).Infof("[TUN] Add new route map UDP: %s -> %s", src, from)
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Peer) routeTCP(ctx context.Context) {
|
||||
func (p *Peer) routeTCPToTun(ctx context.Context) {
|
||||
defer util.HandleCrash()
|
||||
for ctx.Err() == nil {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case packet := <-p.tcpInbound:
|
||||
if conn, ok := p.routeMapTCP.Load(packet.dst.String()); ok {
|
||||
plog.G(ctx).Debugf("[TCP] Find TCP route SRC: %s to DST: %s -> %s", packet.src, packet.dst, conn.(net.Conn).RemoteAddr())
|
||||
dgram := newDatagramPacket(packet.data[:packet.length])
|
||||
err := dgram.Write(conn.(net.Conn))
|
||||
config.LPool.Put(packet.data[:])
|
||||
if err != nil {
|
||||
plog.G(ctx).Errorf("[TCP] Failed to write to %s <- %s : %s", conn.(net.Conn).RemoteAddr(), conn.(net.Conn).LocalAddr(), err)
|
||||
p.sendErr(err)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
plog.G(ctx).Debugf("[TCP] Not found route, write to TUN device. SRC: %s, DST: %s", packet.src, packet.dst)
|
||||
p.tunOutbound <- &Packet{
|
||||
data: packet.data,
|
||||
length: packet.length,
|
||||
src: packet.src,
|
||||
dst: packet.dst,
|
||||
}
|
||||
}
|
||||
}
|
||||
for packet := range TCPPacketChan {
|
||||
p.tunOutbound <- packet
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Peer) routeTUN(ctx context.Context) {
|
||||
defer util.HandleCrash()
|
||||
for ctx.Err() == nil {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case packet := <-p.tunInbound:
|
||||
for packet := range p.tunInbound {
|
||||
if addr, ok := p.routeMapUDP.Load(packet.dst.String()); ok {
|
||||
plog.G(ctx).Debugf("[TUN] Find UDP route to DST: %s -> %s, SRC: %s, DST: %s", packet.dst, addr, packet.src, packet.dst)
|
||||
_, err := p.conn.WriteTo(packet.data[:packet.length], addr.(net.Addr))
|
||||
@@ -338,8 +282,3 @@ func (p *Peer) routeTUN(ctx context.Context) {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Peer) Close() {
|
||||
p.conn.Close()
|
||||
}
|
||||
|
@@ -124,6 +124,10 @@ func transportTunPacketClient(ctx context.Context, tunInbound <-chan *Packet, tu
|
||||
util.SafeWrite(errChan, errors.Wrap(err, fmt.Sprintf("failed to read packet from remote %s", remoteAddr)))
|
||||
return
|
||||
}
|
||||
if n == 0 {
|
||||
plog.G(ctx).Warnf("Packet length 0")
|
||||
continue
|
||||
}
|
||||
util.SafeWrite(tunOutbound, &Packet{data: buf[:], length: n}, func(v *Packet) {
|
||||
config.LPool.Put(v.data[:])
|
||||
plog.G(context.Background()).Errorf("Drop packet, LocalAddr: %s, Remote: %s, Length: %d", packetConn.LocalAddr(), remoteAddr, v.length)
|
||||
@@ -202,13 +206,7 @@ func heartbeats(ctx context.Context, tun net.Conn) {
|
||||
ticker := time.NewTicker(time.Second * 60)
|
||||
defer ticker.Stop()
|
||||
|
||||
for ; true; <-ticker.C {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
for ; ctx.Err() == nil; <-ticker.C {
|
||||
if srcIPv4 != nil {
|
||||
util.Ping(ctx, srcIPv4.String(), config.RouterIP.String())
|
||||
}
|
||||
|
@@ -19,21 +19,8 @@ func newDatagramPacket(data []byte) (r *DatagramPacket) {
|
||||
}
|
||||
}
|
||||
|
||||
// this method will return all byte array in the way: b[:], len(DatagramPacket.Data)==64k
|
||||
func readDatagramPacket(r io.Reader, b []byte) (*DatagramPacket, error) {
|
||||
_, err := io.ReadFull(r, b[:2])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
dataLength := binary.BigEndian.Uint16(b[:2])
|
||||
_, err = io.ReadFull(r, b[:dataLength])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &DatagramPacket{DataLength: dataLength, Data: b[:dataLength]}, nil
|
||||
}
|
||||
|
||||
// this method will return all byte array in the way: b[:]
|
||||
func readDatagramPacketServer(r io.Reader, b []byte) (*DatagramPacket, error) {
|
||||
_, err := io.ReadFull(r, b[:2])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -46,11 +33,11 @@ func readDatagramPacketServer(r io.Reader, b []byte) (*DatagramPacket, error) {
|
||||
return &DatagramPacket{DataLength: dataLength, Data: b[:]}, nil
|
||||
}
|
||||
|
||||
func (addr *DatagramPacket) Write(w io.Writer) error {
|
||||
func (d *DatagramPacket) Write(w io.Writer) error {
|
||||
buf := config.LPool.Get().([]byte)[:]
|
||||
defer config.LPool.Put(buf[:])
|
||||
binary.BigEndian.PutUint16(buf[:2], uint16(len(addr.Data)))
|
||||
n := copy(buf[2:], addr.Data)
|
||||
binary.BigEndian.PutUint16(buf[:2], d.DataLength)
|
||||
n := copy(buf[2:], d.Data[:d.DataLength])
|
||||
_, err := w.Write(buf[:n+2])
|
||||
return err
|
||||
}
|
||||
|
@@ -4,7 +4,6 @@ import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"fmt"
|
||||
plog "github.com/wencaiwulue/kubevpn/v2/pkg/log"
|
||||
"net"
|
||||
|
||||
"github.com/cilium/ipam/service/allocator"
|
||||
@@ -17,6 +16,7 @@ import (
|
||||
"k8s.io/client-go/util/retry"
|
||||
|
||||
"github.com/wencaiwulue/kubevpn/v2/pkg/config"
|
||||
plog "github.com/wencaiwulue/kubevpn/v2/pkg/log"
|
||||
"github.com/wencaiwulue/kubevpn/v2/pkg/util"
|
||||
)
|
||||
|
||||
|
@@ -841,12 +841,8 @@ func SshJump(ctx context.Context, conf *SshConfig, flags *pflag.FlagSet, print b
|
||||
return
|
||||
}
|
||||
if print {
|
||||
msg := fmt.Sprintf("To use: export KUBECONFIG=%s", temp.Name())
|
||||
plog.G(ctx).Info(pkgutil.PrintStr(msg))
|
||||
plog.G(ctx).Infof("Use temporary kubeconfig: %s", temp.Name())
|
||||
} else {
|
||||
msg := fmt.Sprintf("To use: export KUBECONFIG=%s", temp.Name())
|
||||
plog.G(ctx).Debugf(pkgutil.PrintStr(msg))
|
||||
plog.G(ctx).Debugf("Use temporary kubeconfig: %s", temp.Name())
|
||||
}
|
||||
path = temp.Name()
|
||||
|
Reference in New Issue
Block a user