mirror of
https://github.com/kubenetworks/kubevpn.git
synced 2025-10-21 22:39:36 +08:00
206 lines
5.7 KiB
Go
206 lines
5.7 KiB
Go
package core
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net"
|
|
"time"
|
|
|
|
"github.com/google/gopacket/layers"
|
|
"github.com/pkg/errors"
|
|
|
|
"github.com/wencaiwulue/kubevpn/v2/pkg/config"
|
|
plog "github.com/wencaiwulue/kubevpn/v2/pkg/log"
|
|
"github.com/wencaiwulue/kubevpn/v2/pkg/util"
|
|
)
|
|
|
|
func (h *tunHandler) HandleClient(ctx context.Context, tun net.Conn, remoteAddr *net.UDPAddr) {
|
|
device := &ClientDevice{
|
|
tun: tun,
|
|
tunInbound: make(chan *Packet, MaxSize),
|
|
tunOutbound: make(chan *Packet, MaxSize),
|
|
errChan: h.errChan,
|
|
}
|
|
|
|
defer device.Close()
|
|
go device.handlePacket(ctx, remoteAddr, h.forward)
|
|
go device.readFromTun(ctx)
|
|
go device.writeToTun(ctx)
|
|
go heartbeats(ctx, device.tun)
|
|
select {
|
|
case <-device.errChan:
|
|
case <-ctx.Done():
|
|
}
|
|
}
|
|
|
|
type ClientDevice struct {
|
|
tun net.Conn
|
|
tunInbound chan *Packet
|
|
tunOutbound chan *Packet
|
|
errChan chan error
|
|
|
|
remote *net.UDPAddr
|
|
forward *Forwarder
|
|
}
|
|
|
|
func (d *ClientDevice) handlePacket(ctx context.Context, remoteAddr *net.UDPAddr, forward *Forwarder) {
|
|
for ctx.Err() == nil {
|
|
packetConn, err := getRemotePacketConn(ctx, forward)
|
|
if err != nil {
|
|
plog.G(ctx).Errorf("Failed to get remote conn from %s -> %s: %s", d.tun.LocalAddr(), remoteAddr, err)
|
|
time.Sleep(time.Second * 1)
|
|
continue
|
|
}
|
|
err = handlePacketClient(ctx, d.tunInbound, d.tunOutbound, packetConn, remoteAddr)
|
|
if err != nil {
|
|
plog.G(ctx).Errorf("Failed to transport data to remote %s: %v", remoteAddr, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func getRemotePacketConn(ctx context.Context, forwarder *Forwarder) (net.PacketConn, error) {
|
|
conn, err := forwarder.DialContext(ctx)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to dial forwarder")
|
|
}
|
|
|
|
if packetConn, ok := conn.(net.PacketConn); !ok {
|
|
return nil, errors.Errorf("failed to cast packet conn to PacketConn")
|
|
} else {
|
|
return packetConn, nil
|
|
}
|
|
}
|
|
|
|
func handlePacketClient(ctx context.Context, tunInbound <-chan *Packet, tunOutbound chan<- *Packet, packetConn net.PacketConn, remoteAddr net.Addr) error {
|
|
errChan := make(chan error, 2)
|
|
defer packetConn.Close()
|
|
|
|
go func() {
|
|
defer util.HandleCrash()
|
|
for packet := range tunInbound {
|
|
if packet.src.Equal(packet.dst) {
|
|
util.SafeWrite(tunOutbound, packet, func(v *Packet) {
|
|
var p = "unknown"
|
|
if _, _, protocol, err := util.ParseIP(v.data[:v.length]); err == nil {
|
|
p = layers.IPProtocol(protocol).String()
|
|
}
|
|
config.LPool.Put(v.data[:])
|
|
plog.G(context.Background()).Errorf("Drop packet, SRC: %s, DST: %s, Protocol: %s, Length: %d", v.src, v.dst, p, v.length)
|
|
})
|
|
continue
|
|
}
|
|
_, err := packetConn.WriteTo(packet.data[:packet.length], remoteAddr)
|
|
config.LPool.Put(packet.data[:])
|
|
if err != nil {
|
|
util.SafeWrite(errChan, errors.Wrap(err, fmt.Sprintf("failed to write packet to remote %s", remoteAddr)))
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
go func() {
|
|
defer util.HandleCrash()
|
|
for {
|
|
buf := config.LPool.Get().([]byte)[:]
|
|
n, _, err := packetConn.ReadFrom(buf[:])
|
|
if err != nil {
|
|
config.LPool.Put(buf[:])
|
|
util.SafeWrite(errChan, errors.Wrap(err, fmt.Sprintf("failed to read packet from remote %s", remoteAddr)))
|
|
return
|
|
}
|
|
if n == 0 {
|
|
plog.G(ctx).Warnf("Packet length 0")
|
|
config.LPool.Put(buf[:])
|
|
continue
|
|
}
|
|
util.SafeWrite(tunOutbound, NewPacket(buf[:], n, nil, nil), func(v *Packet) {
|
|
config.LPool.Put(v.data[:])
|
|
plog.G(context.Background()).Errorf("Drop packet, LocalAddr: %s, Remote: %s, Length: %d", packetConn.LocalAddr(), remoteAddr, v.length)
|
|
})
|
|
}
|
|
}()
|
|
|
|
select {
|
|
case err := <-errChan:
|
|
return err
|
|
case <-ctx.Done():
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func (d *ClientDevice) readFromTun(ctx context.Context) {
|
|
defer util.HandleCrash()
|
|
for {
|
|
buf := config.LPool.Get().([]byte)[:]
|
|
n, err := d.tun.Read(buf[:])
|
|
if err != nil {
|
|
plog.G(ctx).Errorf("Failed to read packet from tun device: %s", err)
|
|
util.SafeWrite(d.errChan, err)
|
|
config.LPool.Put(buf[:])
|
|
return
|
|
}
|
|
|
|
// Try to determine network protocol number, default zero.
|
|
var src, dst net.IP
|
|
var protocol int
|
|
src, dst, protocol, err = util.ParseIP(buf[:n])
|
|
if err != nil {
|
|
plog.G(ctx).Errorf("Unknown packet: %v", err)
|
|
config.LPool.Put(buf[:])
|
|
continue
|
|
}
|
|
plog.G(context.Background()).Debugf("SRC: %s, DST: %s, Protocol: %s, Length: %d", src, dst, layers.IPProtocol(protocol).String(), n)
|
|
util.SafeWrite(d.tunInbound, NewPacket(buf[:], n, src, dst), func(v *Packet) {
|
|
config.LPool.Put(v.data[:])
|
|
plog.G(context.Background()).Errorf("Drop packet, SRC: %s, DST: %s, Protocol: %s, Length: %d", v.src, v.dst, layers.IPProtocol(protocol).String(), v.length)
|
|
})
|
|
}
|
|
}
|
|
|
|
func (d *ClientDevice) writeToTun(ctx context.Context) {
|
|
defer util.HandleCrash()
|
|
for packet := range d.tunOutbound {
|
|
_, err := d.tun.Write(packet.data[:packet.length])
|
|
config.LPool.Put(packet.data[:])
|
|
if err != nil {
|
|
plog.G(ctx).Errorf("Failed to write packet to tun device: %v", err)
|
|
util.SafeWrite(d.errChan, err)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (d *ClientDevice) Close() {
|
|
d.tun.Close()
|
|
util.SafeClose(d.tunInbound)
|
|
util.SafeClose(d.tunOutbound)
|
|
}
|
|
|
|
func heartbeats(ctx context.Context, tun net.Conn) {
|
|
tunIfi, err := util.GetTunDeviceByConn(tun)
|
|
if err != nil {
|
|
plog.G(ctx).Errorf("Failed to get tun device: %v", err)
|
|
return
|
|
}
|
|
srcIPv4, srcIPv6, dockerSrcIPv4, err := util.GetTunDeviceIP(tunIfi.Name)
|
|
if err != nil {
|
|
plog.G(ctx).Errorf("Failed to get tun device %s IP: %v", tunIfi.Name, err)
|
|
return
|
|
}
|
|
|
|
ticker := time.NewTicker(time.Second * 60)
|
|
defer ticker.Stop()
|
|
|
|
for ; ctx.Err() == nil; <-ticker.C {
|
|
if srcIPv4 != nil {
|
|
util.Ping(ctx, srcIPv4.String(), config.RouterIP.String())
|
|
}
|
|
if srcIPv6 != nil {
|
|
util.Ping(ctx, srcIPv6.String(), config.RouterIP6.String())
|
|
}
|
|
if dockerSrcIPv4 != nil {
|
|
util.Ping(ctx, dockerSrcIPv4.String(), config.DockerRouterIP.String())
|
|
}
|
|
}
|
|
}
|