refactor: optimize logic (#515)

* refactor: optimize logic
This commit is contained in:
naison
2025-04-05 21:48:18 +08:00
committed by GitHub
parent d55d290677
commit e8735a68be
27 changed files with 386 additions and 502 deletions

View File

@@ -39,17 +39,15 @@ spec:
sysctl -w net.ipv6.conf.all.disable_ipv6=0
sysctl -w net.ipv6.conf.all.forwarding=1
update-alternatives --set iptables /usr/sbin/iptables-legacy
iptables -F
ip6tables -F
iptables -P INPUT ACCEPT
ip6tables -P INPUT ACCEPT
iptables -P FORWARD ACCEPT
ip6tables -P FORWARD ACCEPT
iptables -t nat -A POSTROUTING -s ${CIDR4} -o eth0 -j MASQUERADE
ip6tables -t nat -A POSTROUTING -s ${CIDR6} -o eth0 -j MASQUERADE
kubevpn server -L "tcp://:10800" -L "tun://:8422?net=${TunIPv4}" -L "gtcp://:10801" -L "gudp://:10802" --debug=true
kubevpn server -l "tcp://:10800" -l "tun://:8422?net=${TunIPv4}&net6=${TunIPv6}" -l "gtcp://:10801" -l "gudp://:10802" --debug=true
{{- else }}
- kubevpn server -L "tcp://:10800" -L "gtcp://:10801" -L "gudp://:10802" --debug=true
- kubevpn server -l "tcp://:10800" -l "gtcp://:10801" -l "gudp://:10802" --debug=true
{{- end }}
command:
- /bin/sh

View File

@@ -17,7 +17,7 @@ import (
"github.com/wencaiwulue/kubevpn/v2/pkg/util"
)
func CmdControlPlane(_ cmdutil.Factory) *cobra.Command {
func CmdControlPlane(cmdutil.Factory) *cobra.Command {
var (
watchDirectoryFilename string
port uint = 9002

View File

@@ -21,7 +21,7 @@ import (
"github.com/wencaiwulue/kubevpn/v2/pkg/util"
)
func CmdDaemon(_ cmdutil.Factory) *cobra.Command {
func CmdDaemon(cmdutil.Factory) *cobra.Command {
var opt = &daemon.SvrOption{}
cmd := &cobra.Command{
Use: "daemon",

View File

@@ -8,7 +8,7 @@ import (
"github.com/wencaiwulue/kubevpn/v2/pkg/util/regctl"
)
func CmdImageCopy(_ cmdutil.Factory) *cobra.Command {
func CmdImageCopy(cmdutil.Factory) *cobra.Command {
var imageCmd = &cobra.Command{
Use: "image <cmd>",
Short: "copy images",

View File

@@ -20,7 +20,7 @@ import (
"github.com/wencaiwulue/kubevpn/v2/pkg/util"
)
func CmdServer(_ cmdutil.Factory) *cobra.Command {
func CmdServer(cmdutil.Factory) *cobra.Command {
var route = &core.Route{}
cmd := &cobra.Command{
Use: "server",
@@ -30,8 +30,8 @@ func CmdServer(_ cmdutil.Factory) *cobra.Command {
Server side, startup traffic manager, forward inbound and outbound traffic.
`)),
Example: templates.Examples(i18n.T(`
# server node
kubevpn server -L "tcp://:10800" -L "tun://127.0.0.1:8422?net=198.19.0.123/32"
# server listener
kubevpn server -l "tcp://:10800" -l "tun://127.0.0.1:8422?net=198.19.0.123/32"
`)),
PreRun: func(*cobra.Command, []string) {
runtime.GOMAXPROCS(0)
@@ -50,8 +50,8 @@ func CmdServer(_ cmdutil.Factory) *cobra.Command {
return handler.Run(ctx, servers)
},
}
cmd.Flags().StringArrayVarP(&route.ServeNodes, "node", "L", []string{}, "Startup node server. eg: tcp://localhost:1080")
cmd.Flags().StringVarP(&route.ChainNode, "chain", "F", "", "Forward chain. eg: tcp://192.168.1.100:2345")
cmd.Flags().StringArrayVarP(&route.Listeners, "listener", "l", []string{}, "Startup listener server. eg: tcp://localhost:1080")
cmd.Flags().StringVarP(&route.Forwarder, "forwarder", "f", "", "Special forwarder. eg: tcp://192.168.1.100:2345")
cmd.Flags().BoolVar(&config.Debug, "debug", false, "Enable debug log or not")
return cmd
}

View File

@@ -27,7 +27,7 @@ import (
// CmdSSH
// Remember to use network mask 32, because ssh using unique network CIDR 198.18.0.0/16
func CmdSSH(_ cmdutil.Factory) *cobra.Command {
func CmdSSH(cmdutil.Factory) *cobra.Command {
var sshConf = &pkgssh.SshConfig{}
var extraCIDR []string
var platform string

View File

@@ -15,7 +15,7 @@ import (
// CmdSSHDaemon
// set local tun ip 198.19.0.1/32, remember to use mask 32
func CmdSSHDaemon(_ cmdutil.Factory) *cobra.Command {
func CmdSSHDaemon(cmdutil.Factory) *cobra.Command {
var clientIP string
cmd := &cobra.Command{
Use: "ssh-daemon",

View File

@@ -10,7 +10,7 @@ import (
"github.com/wencaiwulue/kubevpn/v2/pkg/util"
)
func CmdSyncthing(_ cmdutil.Factory) *cobra.Command {
func CmdSyncthing(cmdutil.Factory) *cobra.Command {
var detach bool
var dir string
cmd := &cobra.Command{

View File

@@ -16,7 +16,7 @@ import (
"github.com/wencaiwulue/kubevpn/v2/pkg/upgrade"
)
func CmdUpgrade(_ cmdutil.Factory) *cobra.Command {
func CmdUpgrade(cmdutil.Factory) *cobra.Command {
cmd := &cobra.Command{
Use: "upgrade",
Short: i18n.T("Upgrade kubevpn client to latest version"),

View File

@@ -160,7 +160,7 @@ var (
)
var (
KeepAliveTime = 180 * time.Second
KeepAliveTime = 60 * time.Second
DialTimeout = 15 * time.Second
HandshakeTimeout = 5 * time.Second
ConnectTimeout = 5 * time.Second

View File

@@ -3,34 +3,33 @@ package core
import (
"context"
"errors"
"math"
"net"
)
var (
// ErrorEmptyChain is an error that implies the chain is empty.
ErrorEmptyChain = errors.New("empty chain")
// ErrorEmptyForwarder is an error that implies the forward is empty.
ErrorEmptyForwarder = errors.New("empty forwarder")
)
type Chain struct {
type Forwarder struct {
retries int
node *Node
}
func NewChain(retry int, node *Node) *Chain {
return &Chain{retries: retry, node: node}
func NewForwarder(retry int, node *Node) *Forwarder {
return &Forwarder{retries: retry, node: node}
}
func (c *Chain) Node() *Node {
func (c *Forwarder) Node() *Node {
return c.node
}
func (c *Chain) IsEmpty() bool {
func (c *Forwarder) IsEmpty() bool {
return c == nil || c.node == nil
}
func (c *Chain) DialContext(ctx context.Context) (conn net.Conn, err error) {
for i := 0; i < int(math.Max(float64(1), float64(c.retries))); i++ {
func (c *Forwarder) DialContext(ctx context.Context) (conn net.Conn, err error) {
for i := 0; i < max(1, c.retries); i++ {
conn, err = c.dial(ctx)
if err == nil {
break
@@ -39,9 +38,9 @@ func (c *Chain) DialContext(ctx context.Context) (conn net.Conn, err error) {
return
}
func (c *Chain) dial(ctx context.Context) (net.Conn, error) {
func (c *Forwarder) dial(ctx context.Context) (net.Conn, error) {
if c.IsEmpty() {
return nil, ErrorEmptyChain
return nil, ErrorEmptyForwarder
}
conn, err := c.getConn(ctx)
@@ -58,7 +57,7 @@ func (c *Chain) dial(ctx context.Context) (net.Conn, error) {
return cc, nil
}
func (*Chain) resolve(addr string) string {
func (*Forwarder) resolve(addr string) string {
if host, port, err := net.SplitHostPort(addr); err == nil {
if ips, err := net.LookupIP(host); err == nil && len(ips) > 0 {
return net.JoinHostPort(ips[0].String(), port)
@@ -67,9 +66,9 @@ func (*Chain) resolve(addr string) string {
return addr
}
func (c *Chain) getConn(ctx context.Context) (net.Conn, error) {
func (c *Forwarder) getConn(ctx context.Context) (net.Conn, error) {
if c.IsEmpty() {
return nil, ErrorEmptyChain
return nil, ErrorEmptyForwarder
}
return c.Node().Client.Dial(ctx, c.resolve(c.Node().Addr))
}

View File

@@ -18,7 +18,7 @@ import (
type gvisorTCPHandler struct {
// map[srcIP]net.Conn
routeMapTCP *sync.Map
packetChan chan *datagramPacket
packetChan chan *DatagramPacket
}
func GvisorTCPHandler() Handler {
@@ -32,7 +32,7 @@ func (h *gvisorTCPHandler) Handle(ctx context.Context, tcpConn net.Conn) {
defer tcpConn.Close()
cancel, cancelFunc := context.WithCancel(ctx)
defer cancelFunc()
plog.G(ctx).Debugf("[TCP] %s -> %s", tcpConn.RemoteAddr(), tcpConn.LocalAddr())
plog.G(ctx).Debugf("[TUN-GVISOR] %s -> %s", tcpConn.RemoteAddr(), tcpConn.LocalAddr())
h.handle(cancel, tcpConn)
}

View File

@@ -20,7 +20,7 @@ import (
)
func (h *gvisorTCPHandler) readFromEndpointWriteToTCPConn(ctx context.Context, conn net.Conn, endpoint *channel.Endpoint) {
tcpConn, _ := newGvisorFakeUDPTunnelConnOverTCP(ctx, conn)
tcpConn, _ := newGvisorUDPConnOverTCP(ctx, conn)
for {
select {
case <-ctx.Done():
@@ -34,7 +34,7 @@ func (h *gvisorTCPHandler) readFromEndpointWriteToTCPConn(ctx context.Context, c
buf := pktBuffer.ToView().AsSlice()
_, err := tcpConn.Write(buf)
if err != nil {
plog.G(ctx).Errorf("[TUN] Failed to write data to tun device: %v", err)
plog.G(ctx).Errorf("[TUN-GVISOR] Failed to write data to tun device: %v", err)
}
}
}
@@ -42,7 +42,9 @@ func (h *gvisorTCPHandler) readFromEndpointWriteToTCPConn(ctx context.Context, c
// tun --> dispatcher
func (h *gvisorTCPHandler) readFromTCPConnWriteToEndpoint(ctx context.Context, conn net.Conn, endpoint *channel.Endpoint) {
tcpConn, _ := newGvisorFakeUDPTunnelConnOverTCP(ctx, conn)
tcpConn, _ := newGvisorUDPConnOverTCP(ctx, conn)
defer h.removeFromRouteMapTCP(ctx, conn)
for {
select {
case <-ctx.Done():
@@ -53,12 +55,12 @@ func (h *gvisorTCPHandler) readFromTCPConnWriteToEndpoint(ctx context.Context, c
buf := config.LPool.Get().([]byte)[:]
read, err := tcpConn.Read(buf[:])
if err != nil {
plog.G(ctx).Errorf("[TUN] Failed to read from tcp conn: %v", err)
plog.G(ctx).Errorf("[TUN-GVISOR] Failed to read from tcp conn: %v", err)
config.LPool.Put(buf[:])
return
}
if read == 0 {
plog.G(ctx).Warnf("[TUN] Read from tcp conn length is %d", read)
plog.G(ctx).Warnf("[TUN-GVISOR] Read from tcp conn length is %d", read)
config.LPool.Put(buf[:])
continue
}
@@ -83,7 +85,7 @@ func (h *gvisorTCPHandler) readFromTCPConnWriteToEndpoint(ctx context.Context, c
protocol = header.IPv6ProtocolNumber
ipHeader, err := ipv6.ParseHeader(buf[:read])
if err != nil {
plog.G(ctx).Errorf("Failed to parse IPv6 header: %s", err.Error())
plog.G(ctx).Errorf("[TUN-GVISOR] Failed to parse IPv6 header: %s", err.Error())
config.LPool.Put(buf[:])
continue
}
@@ -96,11 +98,11 @@ func (h *gvisorTCPHandler) readFromTCPConnWriteToEndpoint(ctx context.Context, c
continue
}
h.addRoute(ctx, src, conn)
h.addToRouteMapTCP(ctx, src, conn)
// inner ip like 198.19.0.100/102/103 connect each other
if config.CIDR.Contains(dst) || config.CIDR6.Contains(dst) {
plog.G(ctx).Debugf("[TUN-RAW] Forward to TUN device, SRC: %s, DST: %s, Length: %d", src.String(), dst.String(), read)
util.SafeWrite(h.packetChan, &datagramPacket{
plog.G(ctx).Debugf("[TUN-GVISOR] Forward to TUN device, SRC: %s, DST: %s, Length: %d", src.String(), dst.String(), read)
util.SafeWrite(h.packetChan, &DatagramPacket{
DataLength: uint16(read),
Data: buf[:],
})
@@ -115,18 +117,28 @@ func (h *gvisorTCPHandler) readFromTCPConnWriteToEndpoint(ctx context.Context, c
sniffer.LogPacket("[gVISOR] ", sniffer.DirectionRecv, protocol, pkt)
endpoint.InjectInbound(protocol, pkt)
pkt.DecRef()
plog.G(ctx).Debugf("[TUN-%s] Write to Gvisor IP-Protocol: %s, SRC: %s, DST: %s, Length: %d", layers.IPProtocol(ipProtocol).String(), layers.IPProtocol(ipProtocol).String(), src.String(), dst, read)
plog.G(ctx).Debugf("[TUN-GVISOR] Write to Gvisor IP-Protocol: %s, SRC: %s, DST: %s, Length: %d", layers.IPProtocol(ipProtocol).String(), src.String(), dst, read)
}
}
func (h *gvisorTCPHandler) addRoute(ctx context.Context, src net.IP, tcpConn net.Conn) {
func (h *gvisorTCPHandler) addToRouteMapTCP(ctx context.Context, src net.IP, tcpConn net.Conn) {
value, loaded := h.routeMapTCP.LoadOrStore(src.String(), tcpConn)
if loaded {
if tcpConn != value.(net.Conn) {
h.routeMapTCP.Store(src.String(), tcpConn)
plog.G(ctx).Debugf("[TCP] Replace route map TCP: %s -> %s-%s", src, tcpConn.LocalAddr(), tcpConn.RemoteAddr())
plog.G(ctx).Debugf("[TUN-GVISOR] Replace route map TCP: %s -> %s-%s", src, tcpConn.LocalAddr(), tcpConn.RemoteAddr())
}
} else {
plog.G(ctx).Debugf("[TCP] Add new route map TCP: %s -> %s-%s", src, tcpConn.LocalAddr(), tcpConn.RemoteAddr())
plog.G(ctx).Debugf("[TUN-GVISOR] Add new route map TCP: %s -> %s-%s", src, tcpConn.LocalAddr(), tcpConn.RemoteAddr())
}
}
func (h *gvisorTCPHandler) removeFromRouteMapTCP(ctx context.Context, tcpConn net.Conn) {
h.routeMapTCP.Range(func(key, value any) bool {
if value.(net.Conn) == tcpConn {
h.routeMapTCP.Delete(key)
plog.G(ctx).Debugf("[TCP-GVISOR] Delete to DST %s by conn %s from globle route map TCP", key, tcpConn.LocalAddr())
}
return true
})
}

View File

@@ -44,17 +44,17 @@ func (h *gvisorUDPHandler) Handle(ctx context.Context, tcpConn net.Conn) {
}
// fake udp connect over tcp
type gvisorFakeUDPTunnelConn struct {
type gvisorUDPConnOverTCP struct {
// tcp connection
net.Conn
ctx context.Context
}
func newGvisorFakeUDPTunnelConnOverTCP(ctx context.Context, conn net.Conn) (net.Conn, error) {
return &gvisorFakeUDPTunnelConn{ctx: ctx, Conn: conn}, nil
func newGvisorUDPConnOverTCP(ctx context.Context, conn net.Conn) (net.Conn, error) {
return &gvisorUDPConnOverTCP{ctx: ctx, Conn: conn}, nil
}
func (c *gvisorFakeUDPTunnelConn) Read(b []byte) (int, error) {
func (c *gvisorUDPConnOverTCP) Read(b []byte) (int, error) {
select {
case <-c.ctx.Done():
return 0, c.ctx.Err()
@@ -67,15 +67,15 @@ func (c *gvisorFakeUDPTunnelConn) Read(b []byte) (int, error) {
}
}
func (c *gvisorFakeUDPTunnelConn) Write(b []byte) (int, error) {
dgram := newDatagramPacket(b)
if err := dgram.Write(c.Conn); err != nil {
func (c *gvisorUDPConnOverTCP) Write(b []byte) (int, error) {
packet := newDatagramPacket(b)
if err := packet.Write(c.Conn); err != nil {
return 0, err
}
return len(b), nil
}
func (c *gvisorFakeUDPTunnelConn) Close() error {
func (c *gvisorUDPConnOverTCP) Close() error {
if cc, ok := c.Conn.(interface{ CloseRead() error }); ok {
_ = cc.CloseRead()
}
@@ -184,13 +184,13 @@ func handle(ctx context.Context, tcpConn net.Conn, udpConn *net.UDPConn) {
errChan <- err
return
}
dgram := newDatagramPacket(buf[:n])
if err = dgram.Write(tcpConn); err != nil {
plog.G(ctx).Errorf("[TUN-UDP] Error: %s <- %s : %s", tcpConn.RemoteAddr(), dgram.Addr(), err)
packet := newDatagramPacket(buf[:n])
if err = packet.Write(tcpConn); err != nil {
plog.G(ctx).Errorf("[TUN-UDP] Error: %s <- %s : %s", tcpConn.RemoteAddr(), tcpConn.LocalAddr(), err)
errChan <- err
return
}
plog.G(ctx).Debugf("[TUN-UDP] %s <<< %s length: %d", tcpConn.RemoteAddr(), dgram.Addr(), len(dgram.Data))
plog.G(ctx).Debugf("[TUN-UDP] %s <<< %s length: %d", tcpConn.RemoteAddr(), tcpConn.LocalAddr(), len(packet.Data))
}
}()
err := <-errChan

View File

@@ -4,14 +4,12 @@ import (
"context"
"fmt"
"net"
"os"
"strings"
"sync"
"github.com/containernetworking/cni/pkg/types"
"github.com/pkg/errors"
"github.com/wencaiwulue/kubevpn/v2/pkg/config"
plog "github.com/wencaiwulue/kubevpn/v2/pkg/log"
"github.com/wencaiwulue/kubevpn/v2/pkg/tun"
)
@@ -20,71 +18,63 @@ var (
// RouteMapTCP map[srcIP]net.Conn Globe route table for inner ip
RouteMapTCP = &sync.Map{}
// TCPPacketChan tcp connects
TCPPacketChan = make(chan *datagramPacket, MaxSize)
TCPPacketChan = make(chan *DatagramPacket, MaxSize)
)
type TCPUDPacket struct {
data *datagramPacket
data *DatagramPacket
}
// Route example:
// -L "tcp://:10800" -L "tun://:8422?net=198.19.0.100/16"
// -L "tun:/10.233.24.133:8422?net=198.19.0.102/16&route=198.19.0.0/16"
// -L "tun:/127.0.0.1:8422?net=198.19.0.102/16&route=198.19.0.0/16,10.233.0.0/16" -F "tcp://127.0.0.1:10800"
// -l "tcp://:10800" -l "tun://:8422?net=198.19.0.100/16"
// -l "tun:/10.233.24.133:8422?net=198.19.0.102/16&route=198.19.0.0/16"
// -l "tun:/127.0.0.1:8422?net=198.19.0.102/16&route=198.19.0.0/16,10.233.0.0/16" -f "tcp://127.0.0.1:10800"
type Route struct {
ServeNodes []string // -L tun
ChainNode string // -F tcp
Retries int
Listeners []string // -l tun
Forwarder string // -f tcp
Retries int
}
func (r *Route) parseChain() (*Chain, error) {
node, err := parseChainNode(r.ChainNode)
func (r *Route) ParseForwarder() (*Forwarder, error) {
forwarder, err := ParseNode(r.Forwarder)
if err != nil {
return nil, err
}
return NewChain(r.Retries, node), nil
}
func parseChainNode(ns string) (*Node, error) {
node, err := ParseNode(ns)
if err != nil {
return nil, err
}
node.Client = &Client{
Connector: UDPOverTCPTunnelConnector(),
forwarder.Client = &Client{
Connector: NewUDPOverTCPConnector(),
Transporter: TCPTransporter(),
}
return node, nil
return NewForwarder(r.Retries, forwarder), nil
}
func (r *Route) GenerateServers() ([]Server, error) {
chain, err := r.parseChain()
forwarder, err := r.ParseForwarder()
if err != nil && !errors.Is(err, ErrorInvalidNode) {
plog.G(context.Background()).Errorf("Failed to parse chain: %v", err)
plog.G(context.Background()).Errorf("Failed to parse forwarder: %v", err)
return nil, err
}
servers := make([]Server, 0, len(r.ServeNodes))
for _, serveNode := range r.ServeNodes {
servers := make([]Server, 0, len(r.Listeners))
for _, l := range r.Listeners {
var node *Node
node, err = ParseNode(serveNode)
node, err = ParseNode(l)
if err != nil {
plog.G(context.Background()).Errorf("Failed to parse node %s: %v", serveNode, err)
plog.G(context.Background()).Errorf("Failed to parse node %s: %v", l, err)
return nil, err
}
var ln net.Listener
var listener net.Listener
var handler Handler
switch node.Protocol {
case "tun":
handler = TunHandler(chain, node)
ln, err = tun.Listener(tun.Config{
handler = TunHandler(forwarder, node)
listener, err = tun.Listener(tun.Config{
Name: node.Get("name"),
Addr: node.Get("net"),
Addr6: os.Getenv(config.EnvInboundPodTunIPv6),
Addr6: node.Get("net6"),
MTU: node.GetInt("mtu"),
Routes: parseIPRoutes(node.Get("route")),
Routes: parseRoutes(node.Get("route")),
Gateway: node.Get("gw"),
})
if err != nil {
@@ -93,28 +83,28 @@ func (r *Route) GenerateServers() ([]Server, error) {
}
case "tcp":
handler = TCPHandler()
ln, err = TCPListener(node.Addr)
listener, err = TCPListener(node.Addr)
if err != nil {
plog.G(context.Background()).Errorf("Failed to create tcp listener: %v", err)
return nil, err
}
case "gtcp":
handler = GvisorTCPHandler()
ln, err = GvisorTCPListener(node.Addr)
listener, err = GvisorTCPListener(node.Addr)
if err != nil {
plog.G(context.Background()).Errorf("Failed to create gvisor tcp listener: %v", err)
return nil, err
}
case "gudp":
handler = GvisorUDPHandler()
ln, err = GvisorUDPListener(node.Addr)
listener, err = GvisorUDPListener(node.Addr)
if err != nil {
plog.G(context.Background()).Errorf("Failed to create gvisor udp listener: %v", err)
return nil, err
}
case "ssh":
handler = SSHHandler()
ln, err = SSHListener(node.Addr)
listener, err = SSHListener(node.Addr)
if err != nil {
plog.G(context.Background()).Errorf("Failed to create ssh listener: %v", err)
return nil, err
@@ -123,21 +113,18 @@ func (r *Route) GenerateServers() ([]Server, error) {
plog.G(context.Background()).Errorf("Not support protocol %s", node.Protocol)
return nil, fmt.Errorf("not support protocol %s", node.Protocol)
}
servers = append(servers, Server{Listener: ln, Handler: handler})
servers = append(servers, Server{Listener: listener, Handler: handler})
}
return servers, nil
}
func parseIPRoutes(routeStringList string) (routes []types.Route) {
if len(routeStringList) == 0 {
return
}
routeList := strings.Split(routeStringList, ",")
for _, route := range routeList {
func parseRoutes(str string) []types.Route {
var routes []types.Route
list := strings.Split(str, ",")
for _, route := range list {
if _, ipNet, _ := net.ParseCIDR(strings.TrimSpace(route)); ipNet != nil {
routes = append(routes, types.Route{Dst: *ipNet})
}
}
return
return routes
}

View File

@@ -3,23 +3,21 @@ package core
import (
"context"
"net"
"strings"
"sync"
"time"
"github.com/wencaiwulue/kubevpn/v2/pkg/config"
plog "github.com/wencaiwulue/kubevpn/v2/pkg/log"
"github.com/wencaiwulue/kubevpn/v2/pkg/util"
)
type fakeUDPTunnelConnector struct {
type UDPOverTCPConnector struct {
}
func UDPOverTCPTunnelConnector() Connector {
return &fakeUDPTunnelConnector{}
func NewUDPOverTCPConnector() Connector {
return &UDPOverTCPConnector{}
}
func (c *fakeUDPTunnelConnector) ConnectContext(ctx context.Context, conn net.Conn) (net.Conn, error) {
func (c *UDPOverTCPConnector) ConnectContext(ctx context.Context, conn net.Conn) (net.Conn, error) {
//defer conn.SetDeadline(time.Time{})
switch con := conn.(type) {
case *net.TCPConn:
@@ -31,44 +29,32 @@ func (c *fakeUDPTunnelConnector) ConnectContext(ctx context.Context, conn net.Co
if err != nil {
return nil, err
}
err = con.SetKeepAlivePeriod(15 * time.Second)
err = con.SetKeepAlivePeriod(config.KeepAliveTime)
if err != nil {
return nil, err
}
}
return newFakeUDPTunnelConnOverTCP(ctx, conn)
return newUDPConnOverTCP(ctx, conn)
}
type fakeUdpHandler struct {
type UDPOverTCPHandler struct {
// map[srcIP]net.Conn
routeMapTCP *sync.Map
packetChan chan *datagramPacket
packetChan chan *DatagramPacket
}
func TCPHandler() Handler {
return &fakeUdpHandler{
return &UDPOverTCPHandler{
routeMapTCP: RouteMapTCP,
packetChan: TCPPacketChan,
}
}
func (h *fakeUdpHandler) Handle(ctx context.Context, tcpConn net.Conn) {
func (h *UDPOverTCPHandler) Handle(ctx context.Context, tcpConn net.Conn) {
defer tcpConn.Close()
plog.G(ctx).Debugf("[TCP] %s -> %s", tcpConn.RemoteAddr(), tcpConn.LocalAddr())
defer func(addr net.Addr) {
var keys []string
h.routeMapTCP.Range(func(key, value any) bool {
if value.(net.Conn) == tcpConn {
keys = append(keys, key.(string))
}
return true
})
for _, key := range keys {
h.routeMapTCP.Delete(key)
}
plog.G(ctx).Debugf("[TCP] To %s by conn %s from globle route map TCP", strings.Join(keys, " "), addr)
}(tcpConn.LocalAddr())
defer h.removeFromRouteMapTCP(ctx, tcpConn)
for {
select {
@@ -78,7 +64,7 @@ func (h *fakeUdpHandler) Handle(ctx context.Context, tcpConn net.Conn) {
}
buf := config.LPool.Get().([]byte)[:]
dgram, err := readDatagramPacketServer(tcpConn, buf[:])
packet, err := readDatagramPacketServer(tcpConn, buf[:])
if err != nil {
plog.G(ctx).Errorf("[TCP] %s -> %s : %v", tcpConn.RemoteAddr(), tcpConn.LocalAddr(), err)
config.LPool.Put(buf[:])
@@ -86,7 +72,7 @@ func (h *fakeUdpHandler) Handle(ctx context.Context, tcpConn net.Conn) {
}
var src net.IP
src, _, err = util.ParseIP(dgram.Data[:dgram.DataLength])
src, _, err = util.ParseIP(packet.Data[:packet.DataLength])
if err != nil {
plog.G(ctx).Errorf("[TCP] Unknown packet")
config.LPool.Put(buf[:])
@@ -101,43 +87,54 @@ func (h *fakeUdpHandler) Handle(ctx context.Context, tcpConn net.Conn) {
} else {
plog.G(ctx).Debugf("[TCP] Add new route map TCP: %s -> %s-%s", src, tcpConn.LocalAddr(), tcpConn.RemoteAddr())
}
util.SafeWrite(h.packetChan, dgram)
util.SafeWrite(h.packetChan, packet)
}
}
// fake udp connect over tcp
type fakeUDPTunnelConn struct {
func (h *UDPOverTCPHandler) removeFromRouteMapTCP(ctx context.Context, tcpConn net.Conn) {
h.routeMapTCP.Range(func(key, value any) bool {
if value.(net.Conn) == tcpConn {
plog.G(ctx).Debugf("[TCP] Delete to DST: %s by conn %s from globle route map TCP", key, tcpConn.LocalAddr())
}
return true
})
}
var _ net.PacketConn = (*UDPConnOverTCP)(nil)
// UDPConnOverTCP fake udp connection over tcp connection
type UDPConnOverTCP struct {
// tcp connection
net.Conn
ctx context.Context
}
func newFakeUDPTunnelConnOverTCP(ctx context.Context, conn net.Conn) (net.Conn, error) {
return &fakeUDPTunnelConn{ctx: ctx, Conn: conn}, nil
func newUDPConnOverTCP(ctx context.Context, conn net.Conn) (net.Conn, error) {
return &UDPConnOverTCP{ctx: ctx, Conn: conn}, nil
}
func (c *fakeUDPTunnelConn) ReadFrom(b []byte) (int, net.Addr, error) {
func (c *UDPConnOverTCP) ReadFrom(b []byte) (int, net.Addr, error) {
select {
case <-c.ctx.Done():
return 0, nil, c.ctx.Err()
default:
dgram, err := readDatagramPacket(c.Conn, b)
packet, err := readDatagramPacket(c.Conn, b)
if err != nil {
return 0, nil, err
}
return int(dgram.DataLength), dgram.Addr(), nil
return int(packet.DataLength), nil, nil
}
}
func (c *fakeUDPTunnelConn) WriteTo(b []byte, _ net.Addr) (int, error) {
dgram := newDatagramPacket(b)
if err := dgram.Write(c.Conn); err != nil {
func (c *UDPConnOverTCP) WriteTo(b []byte, _ net.Addr) (int, error) {
packet := newDatagramPacket(b)
if err := packet.Write(c.Conn); err != nil {
return 0, err
}
return len(b), nil
}
func (c *fakeUDPTunnelConn) Close() error {
func (c *UDPConnOverTCP) Close() error {
if cc, ok := c.Conn.(interface{ CloseRead() error }); ok {
_ = cc.CloseRead()
}

View File

@@ -4,7 +4,6 @@ import (
"context"
"net"
"sync"
"time"
"github.com/wencaiwulue/kubevpn/v2/pkg/config"
plog "github.com/wencaiwulue/kubevpn/v2/pkg/log"
@@ -12,103 +11,83 @@ import (
)
const (
MaxSize = 1000
MaxSize = 100
)
type tunHandler struct {
chain *Chain
forward *Forwarder
node *Node
routeMapUDP *RouteMap
// map[srcIP]net.Conn
routeMapUDP *sync.Map
routeMapTCP *sync.Map
chExit chan error
}
type RouteMap struct {
lock *sync.RWMutex
routes map[string]net.Addr
}
func NewRouteMap() *RouteMap {
return &RouteMap{
lock: &sync.RWMutex{},
routes: map[string]net.Addr{},
}
}
func (n *RouteMap) LoadOrStore(to net.IP, addr net.Addr) (net.Addr, bool) {
n.lock.RLock()
route, load := n.routes[to.String()]
n.lock.RUnlock()
if load {
return route, true
}
n.lock.Lock()
defer n.lock.Unlock()
n.routes[to.String()] = addr
return addr, false
}
func (n *RouteMap) Store(to net.IP, addr net.Addr) {
n.lock.Lock()
defer n.lock.Unlock()
n.routes[to.String()] = addr
}
func (n *RouteMap) RouteTo(ip net.IP) net.Addr {
n.lock.RLock()
defer n.lock.RUnlock()
return n.routes[ip.String()]
errChan chan error
}
// TunHandler creates a handler for tun tunnel.
func TunHandler(chain *Chain, node *Node) Handler {
func TunHandler(forward *Forwarder, node *Node) Handler {
return &tunHandler{
chain: chain,
forward: forward,
node: node,
routeMapUDP: NewRouteMap(),
routeMapUDP: &sync.Map{},
routeMapTCP: RouteMapTCP,
chExit: make(chan error, 1),
errChan: make(chan error, 1),
}
}
func (h *tunHandler) Handle(ctx context.Context, tun net.Conn) {
if h.node.Remote != "" {
h.HandleClient(ctx, tun)
if remote := h.node.Remote; remote != "" {
remoteAddr, err := net.ResolveUDPAddr("udp", remote)
if err != nil {
plog.G(ctx).Errorf("[TUN-CLIENT] Failed to resolve udp addr %s: %v", remote, err)
return
}
h.HandleClient(ctx, tun, remoteAddr)
} else {
h.HandleServer(ctx, tun)
}
}
func (h *tunHandler) HandleServer(ctx context.Context, tun net.Conn) {
device := &Device{
tun: tun,
tunInbound: make(chan *Packet, MaxSize),
tunOutbound: make(chan *Packet, MaxSize),
errChan: h.errChan,
}
defer device.Close()
go device.readFromTUN()
go device.writeToTUN()
go device.transport(ctx, h.node.Addr, h.routeMapUDP, h.routeMapTCP)
select {
case err := <-device.errChan:
plog.G(ctx).Errorf("Device exit: %v", err)
return
case <-ctx.Done():
return
}
}
type Device struct {
tun net.Conn
tunInbound chan *DataElem
tunOutbound chan *DataElem
tunInbound chan *Packet
tunOutbound chan *Packet
// your main logic
tunInboundHandler func(tunInbound <-chan *DataElem, tunOutbound chan<- *DataElem)
chExit chan error
errChan chan error
}
func (d *Device) readFromTun() {
func (d *Device) readFromTUN() {
defer util.HandleCrash()
for {
buf := config.LPool.Get().([]byte)[:]
n, err := d.tun.Read(buf[:])
if err != nil {
config.LPool.Put(buf[:])
plog.G(context.Background()).Errorf("[TUN] Failed to read from tun: %v", err)
util.SafeWrite(d.chExit, err)
plog.G(context.Background()).Errorf("[TUN] Failed to read from tun device: %v", err)
util.SafeWrite(d.errChan, err)
return
}
if n == 0 {
plog.G(context.Background()).Errorf("[TUN] Read packet length 0")
config.LPool.Put(buf[:])
continue
}
src, dst, err := util.ParseIP(buf[:n])
if err != nil {
@@ -117,8 +96,8 @@ func (d *Device) readFromTun() {
continue
}
plog.G(context.Background()).Debugf("[TUN] SRC: %s --> DST: %s, length: %d", src, dst, n)
util.SafeWrite(d.tunInbound, &DataElem{
plog.G(context.Background()).Debugf("[TUN] SRC: %s, DST: %s, Length: %d", src, dst, n)
util.SafeWrite(d.tunInbound, &Packet{
data: buf[:],
length: n,
src: src,
@@ -127,13 +106,14 @@ func (d *Device) readFromTun() {
}
}
func (d *Device) writeToTun() {
func (d *Device) writeToTUN() {
defer util.HandleCrash()
for e := range d.tunOutbound {
_, err := d.tun.Write(e.data[:e.length])
config.LPool.Put(e.data[:])
for packet := range d.tunOutbound {
_, err := d.tun.Write(packet.data[:packet.length])
config.LPool.Put(packet.data[:])
if err != nil {
util.SafeWrite(d.chExit, err)
plog.G(context.Background()).Errorf("[TUN] Failed to write to tun device: %v", err)
util.SafeWrite(d.errChan, err)
return
}
}
@@ -146,91 +126,51 @@ func (d *Device) Close() {
util.SafeClose(TCPPacketChan)
}
func heartbeats(ctx context.Context, tun net.Conn) {
tunIfi, err := util.GetTunDeviceByConn(tun)
if err != nil {
plog.G(ctx).Errorf("Failed to get tun device: %s", err.Error())
return
}
srcIPv4, srcIPv6, dockerSrcIPv4, err := util.GetTunDeviceIP(tunIfi.Name)
if err != nil {
return
}
ticker := time.NewTicker(time.Second * 5)
defer ticker.Stop()
for ; true; <-ticker.C {
select {
case <-ctx.Done():
return
default:
}
if srcIPv4 != nil {
go util.Ping(ctx, srcIPv4.String(), config.RouterIP.String())
}
if srcIPv6 != nil {
go util.Ping(ctx, srcIPv6.String(), config.RouterIP6.String())
}
if dockerSrcIPv4 != nil {
go util.Ping(ctx, dockerSrcIPv4.String(), config.DockerRouterIP.String())
}
}
}
func (d *Device) Start(ctx context.Context) {
go d.readFromTun()
go d.tunInboundHandler(d.tunInbound, d.tunOutbound)
go d.writeToTun()
select {
case err := <-d.chExit:
plog.G(ctx).Errorf("Device exit: %v", err)
return
case <-ctx.Done():
return
}
}
func (d *Device) SetTunInboundHandler(handler func(tunInbound <-chan *DataElem, tunOutbound chan<- *DataElem)) {
d.tunInboundHandler = handler
}
func (h *tunHandler) HandleServer(ctx context.Context, tun net.Conn) {
device := &Device{
tun: tun,
tunInbound: make(chan *DataElem, MaxSize),
tunOutbound: make(chan *DataElem, MaxSize),
chExit: h.chExit,
}
device.SetTunInboundHandler(func(tunInbound <-chan *DataElem, tunOutbound chan<- *DataElem) {
for ctx.Err() == nil {
packetConn, err := (&net.ListenConfig{}).ListenPacket(ctx, "udp", h.node.Addr)
func (d *Device) transport(ctx context.Context, addr string, routeMapUDP *sync.Map, routeMapTCP *sync.Map) {
for ctx.Err() == nil {
func() {
packetConn, err := (&net.ListenConfig{}).ListenPacket(ctx, "udp", addr)
if err != nil {
plog.G(ctx).Errorf("[UDP] Failed to listen %s: %v", h.node.Addr, err)
plog.G(ctx).Errorf("[UDP] Failed to listen %s: %v", addr, err)
return
}
err = transportTunServer(ctx, tunInbound, tunOutbound, packetConn, h.routeMapUDP, h.routeMapTCP)
if err != nil {
plog.G(ctx).Errorf("[TUN] %s: %v", tun.LocalAddr(), err)
}
}
})
defer device.Close()
device.Start(ctx)
p := &Peer{
conn: packetConn,
tcpInbound: make(chan *Packet, MaxSize),
tunInbound: d.tunInbound,
tunOutbound: d.tunOutbound,
routeMapUDP: routeMapUDP,
routeMapTCP: routeMapTCP,
errChan: d.errChan,
}
defer p.Close()
go p.readFromConn()
go p.readFromTCPConn()
go p.routeTCP()
go p.routeTUN()
select {
case err = <-p.errChan:
plog.G(ctx).Errorf("[TUN] %s: %v", d.tun.LocalAddr(), err)
return
case <-ctx.Done():
return
}
}()
}
}
type DataElem struct {
type Packet struct {
data []byte
length int
src net.IP
dst net.IP
}
func NewDataElem(data []byte, length int, src net.IP, dst net.IP) *DataElem {
return &DataElem{
func NewDataElem(data []byte, length int, src net.IP, dst net.IP) *Packet {
return &Packet{
data: data,
length: length,
src: src,
@@ -238,32 +178,24 @@ func NewDataElem(data []byte, length int, src net.IP, dst net.IP) *DataElem {
}
}
func (d *DataElem) Data() []byte {
func (d *Packet) Data() []byte {
return d.data
}
func (d *DataElem) Length() int {
func (d *Packet) Length() int {
return d.length
}
type udpElem struct {
from net.Addr
data []byte
length int
src net.IP
dst net.IP
}
type Peer struct {
conn net.PacketConn
connInbound chan *udpElem
tcpInbound chan *Packet
tunInbound <-chan *DataElem
tunOutbound chan<- *DataElem
tunInbound chan *Packet
tunOutbound chan<- *Packet
// map[srcIP.String()]net.Addr for udp
routeMapUDP *RouteMap
routeMapUDP *sync.Map
// map[srcIP.String()]net.Conn for tcp
routeMapTCP *sync.Map
@@ -294,17 +226,16 @@ func (p *Peer) readFromConn() {
plog.G(context.Background()).Errorf("[TUN] Unknown packet: %v", err)
continue
}
if addr, loaded := p.routeMapUDP.LoadOrStore(src, from); loaded {
if addr.String() != from.String() {
p.routeMapUDP.Store(src, from)
if addr, loaded := p.routeMapUDP.LoadOrStore(src.String(), from); loaded {
if addr.(net.Addr).String() != from.String() {
p.routeMapUDP.Store(src.String(), from)
plog.G(context.Background()).Debugf("[TUN] Replace route map UDP: %s -> %s", src, from)
}
} else {
plog.G(context.Background()).Debugf("[TUN] Add new route map UDP: %s -> %s", src, from)
}
p.connInbound <- &udpElem{
from: from,
p.tunInbound <- &Packet{
data: buf[:],
length: n,
src: src,
@@ -318,49 +249,40 @@ func (p *Peer) readFromTCPConn() {
for packet := range TCPPacketChan {
src, dst, err := util.ParseIP(packet.Data)
if err != nil {
plog.G(context.Background()).Errorf("[TUN] Unknown packet")
plog.G(context.Background()).Errorf("[TCP] Unknown packet")
config.LPool.Put(packet.Data[:])
continue
}
u := &udpElem{
plog.G(context.Background()).Debugf("[TCP] SRC: %s > DST: %s Length: %d", src, dst, packet.DataLength)
p.tcpInbound <- &Packet{
data: packet.Data[:],
length: int(packet.DataLength),
src: src,
dst: dst,
}
plog.G(context.Background()).Debugf("[TCP] udp-tun %s >>> %s length: %d", u.src, u.dst, u.length)
p.connInbound <- u
}
}
func (p *Peer) routePeer() {
func (p *Peer) routeTCP() {
defer util.HandleCrash()
for e := range p.connInbound {
if routeToAddr := p.routeMapUDP.RouteTo(e.dst); routeToAddr != nil {
plog.G(context.Background()).Debugf("[UDP] Find UDP route to dst: %s -> %s", e.dst, routeToAddr)
_, err := p.conn.WriteTo(e.data[:e.length], routeToAddr)
config.LPool.Put(e.data[:])
if err != nil {
p.sendErr(err)
return
}
} else if conn, ok := p.routeMapTCP.Load(e.dst.String()); ok {
plog.G(context.Background()).Debugf("[TCP] Find TCP route to dst: %s -> %s", e.dst.String(), conn.(net.Conn).RemoteAddr())
dgram := newDatagramPacket(e.data[:e.length])
for packet := range p.tcpInbound {
if conn, ok := p.routeMapTCP.Load(packet.dst.String()); ok {
plog.G(context.Background()).Debugf("[TCP] Find TCP route SRC: %s to DST: %s -> %s", packet.src.String(), packet.dst.String(), conn.(net.Conn).RemoteAddr())
dgram := newDatagramPacket(packet.data[:packet.length])
err := dgram.Write(conn.(net.Conn))
config.LPool.Put(e.data[:])
config.LPool.Put(packet.data[:])
if err != nil {
plog.G(context.Background()).Errorf("[TCP] udp-tun %s <- %s : %s", conn.(net.Conn).RemoteAddr(), dgram.Addr(), err)
plog.G(context.Background()).Errorf("[TCP] Failed to write to %s <- %s : %s", conn.(net.Conn).RemoteAddr(), conn.(net.Conn).LocalAddr(), err)
p.sendErr(err)
return
}
} else {
plog.G(context.Background()).Debugf("[TUN] Not found route to dst: %s, write to TUN device", e.dst.String())
p.tunOutbound <- &DataElem{
data: e.data,
length: e.length,
src: e.src,
dst: e.dst,
plog.G(context.Background()).Debugf("[TCP] Not found route, write to TUN device. SRC: %s, DST: %s", packet.src.String(), packet.dst.String())
p.tunOutbound <- &Packet{
data: packet.data,
length: packet.length,
src: packet.src,
dst: packet.dst,
}
}
}
@@ -368,63 +290,33 @@ func (p *Peer) routePeer() {
func (p *Peer) routeTUN() {
defer util.HandleCrash()
for e := range p.tunInbound {
if addr := p.routeMapUDP.RouteTo(e.dst); addr != nil {
plog.G(context.Background()).Debugf("[TUN] Find UDP route to dst: %s -> %s", e.dst, addr)
_, err := p.conn.WriteTo(e.data[:e.length], addr)
config.LPool.Put(e.data[:])
for packet := range p.tunInbound {
if addr, ok := p.routeMapUDP.Load(packet.dst.String()); ok {
plog.G(context.Background()).Debugf("[TUN] Find UDP route to DST: %s -> %s, SRC: %s, DST: %s", packet.dst, addr, packet.src.String(), packet.dst.String())
_, err := p.conn.WriteTo(packet.data[:packet.length], addr.(net.Addr))
config.LPool.Put(packet.data[:])
if err != nil {
plog.G(context.Background()).Debugf("[TUN] Failed wirte to route dst: %s -> %s", e.dst, addr)
plog.G(context.Background()).Debugf("[TUN] Failed wirte to route dst: %s -> %s", packet.dst, addr)
p.sendErr(err)
return
}
} else if conn, ok := p.routeMapTCP.Load(e.dst.String()); ok {
plog.G(context.Background()).Debugf("[TUN] Find TCP route to dst: %s -> %s", e.dst.String(), conn.(net.Conn).RemoteAddr())
dgram := newDatagramPacket(e.data[:e.length])
} else if conn, ok := p.routeMapTCP.Load(packet.dst.String()); ok {
plog.G(context.Background()).Debugf("[TUN] Find TCP route to dst: %s -> %s", packet.dst.String(), conn.(net.Conn).RemoteAddr())
dgram := newDatagramPacket(packet.data[:packet.length])
err := dgram.Write(conn.(net.Conn))
config.LPool.Put(e.data[:])
config.LPool.Put(packet.data[:])
if err != nil {
plog.G(context.Background()).Errorf("[TUN] Failed to write TCP %s <- %s : %s", conn.(net.Conn).RemoteAddr(), dgram.Addr(), err)
plog.G(context.Background()).Errorf("[TUN] Failed to write TCP %s <- %s : %s", conn.(net.Conn).RemoteAddr(), conn.(net.Conn).LocalAddr(), err)
p.sendErr(err)
return
}
} else {
plog.G(context.Background()).Errorf("[TUN] No route for src: %s -> dst: %s, drop it", e.src, e.dst)
config.LPool.Put(e.data[:])
plog.G(context.Background()).Errorf("[TUN] No route for src: %s -> dst: %s, drop it", packet.src, packet.dst)
config.LPool.Put(packet.data[:])
}
}
}
func (p *Peer) Start() {
go p.readFromConn()
go p.readFromTCPConn()
go p.routePeer()
go p.routeTUN()
}
func (p *Peer) Close() {
p.conn.Close()
}
func transportTunServer(ctx context.Context, tunInbound <-chan *DataElem, tunOutbound chan<- *DataElem, packetConn net.PacketConn, routeMapUDP *RouteMap, routeMapTCP *sync.Map) error {
p := &Peer{
conn: packetConn,
connInbound: make(chan *udpElem, MaxSize),
tunInbound: tunInbound,
tunOutbound: tunOutbound,
routeMapUDP: routeMapUDP,
routeMapTCP: routeMapTCP,
errChan: make(chan error, 2),
}
defer p.Close()
p.Start()
select {
case err := <-p.errChan:
plog.G(ctx).Errorf(err.Error())
return err
case <-ctx.Done():
return nil
}
}

View File

@@ -13,56 +13,66 @@ import (
"github.com/wencaiwulue/kubevpn/v2/pkg/util"
)
func (h *tunHandler) HandleClient(ctx context.Context, tun net.Conn) {
defer tun.Close()
remoteAddr, err := net.ResolveUDPAddr("udp", h.node.Remote)
if err != nil {
plog.G(ctx).Errorf("[TUN-CLIENT] Failed to resolve udp addr %s: %v", h.node.Remote, err)
return
}
in := make(chan *DataElem, MaxSize)
out := make(chan *DataElem, MaxSize)
defer util.SafeClose(in)
defer util.SafeClose(out)
d := &ClientDevice{
func (h *tunHandler) HandleClient(ctx context.Context, tun net.Conn, remoteAddr *net.UDPAddr) {
device := &ClientDevice{
tun: tun,
tunInbound: in,
tunOutbound: out,
chExit: h.chExit,
tunInbound: make(chan *Packet, MaxSize),
tunOutbound: make(chan *Packet, MaxSize),
errChan: h.errChan,
}
d.SetTunInboundHandler(func(tunInbound <-chan *DataElem, tunOutbound chan<- *DataElem) {
for ctx.Err() == nil {
packetConn, err := getRemotePacketConn(ctx, h.chain)
if err != nil {
plog.G(ctx).Debugf("[TUN-CLIENT] Failed to get remote conn from %s -> %s: %s", tun.LocalAddr(), remoteAddr, err)
time.Sleep(time.Millisecond * 200)
continue
}
err = transportTunClient(ctx, tunInbound, tunOutbound, packetConn, remoteAddr)
if err != nil {
plog.G(ctx).Debugf("[TUN-CLIENT] %s: %v", tun.LocalAddr(), err)
}
}
})
d.Start(ctx)
defer device.Close()
go device.forwardPacketToRemote(ctx, remoteAddr, h.forward)
go device.readFromTun()
go device.writeToTun()
go heartbeats(ctx, device.tun)
select {
case <-device.errChan:
case <-ctx.Done():
}
}
func getRemotePacketConn(ctx context.Context, chain *Chain) (packetConn net.PacketConn, err error) {
type ClientDevice struct {
tun net.Conn
tunInbound chan *Packet
tunOutbound chan *Packet
errChan chan error
remote *net.UDPAddr
forward *Forwarder
}
func (d *ClientDevice) forwardPacketToRemote(ctx context.Context, remoteAddr *net.UDPAddr, forward *Forwarder) {
for ctx.Err() == nil {
func() {
packetConn, err := getRemotePacketConn(ctx, forward)
if err != nil {
plog.G(ctx).Debugf("[TUN-CLIENT] Failed to get remote conn from %s -> %s: %s", d.tun.LocalAddr(), remoteAddr, err)
time.Sleep(time.Millisecond * 200)
return
}
err = transportTunPacketClient(ctx, d.tunInbound, d.tunOutbound, packetConn, remoteAddr)
if err != nil {
plog.G(ctx).Debugf("[TUN-CLIENT] %s: %v", d.tun.LocalAddr(), err)
}
}()
}
}
func getRemotePacketConn(ctx context.Context, forwarder *Forwarder) (packetConn net.PacketConn, err error) {
defer func() {
if err != nil && packetConn != nil {
_ = packetConn.Close()
}
}()
if !chain.IsEmpty() {
var cc net.Conn
cc, err = chain.DialContext(ctx)
if !forwarder.IsEmpty() {
var conn net.Conn
conn, err = forwarder.DialContext(ctx)
if err != nil {
return
}
var ok bool
if packetConn, ok = cc.(net.PacketConn); !ok {
if packetConn, ok = conn.(net.PacketConn); !ok {
err = errors.New("not a packet connection")
return
}
@@ -76,20 +86,21 @@ func getRemotePacketConn(ctx context.Context, chain *Chain) (packetConn net.Pack
return
}
func transportTunClient(ctx context.Context, tunInbound <-chan *DataElem, tunOutbound chan<- *DataElem, packetConn net.PacketConn, remoteAddr net.Addr) error {
func transportTunPacketClient(ctx context.Context, tunInbound <-chan *Packet, tunOutbound chan<- *Packet, packetConn net.PacketConn, remoteAddr net.Addr) error {
errChan := make(chan error, 2)
defer packetConn.Close()
go func() {
defer util.HandleCrash()
for e := range tunInbound {
if e.src.Equal(e.dst) {
util.SafeWrite(tunOutbound, e)
for packet := range tunInbound {
if packet.src.Equal(packet.dst) {
util.SafeWrite(tunOutbound, packet)
continue
}
_, err := packetConn.WriteTo(e.data[:e.length], remoteAddr)
config.LPool.Put(e.data[:])
_, err := packetConn.WriteTo(packet.data[:packet.length], remoteAddr)
config.LPool.Put(packet.data[:])
if err != nil {
plog.G(ctx).Errorf("failed to write packet to remote %s: %v", remoteAddr, err)
util.SafeWrite(errChan, errors.Wrap(err, fmt.Sprintf("failed to write packet to remote %s", remoteAddr)))
return
}
@@ -106,7 +117,7 @@ func transportTunClient(ctx context.Context, tunInbound <-chan *DataElem, tunOut
util.SafeWrite(errChan, errors.Wrap(err, fmt.Sprintf("failed to read packet from remote %s", remoteAddr)))
return
}
util.SafeWrite(tunOutbound, &DataElem{data: buf[:], length: n})
util.SafeWrite(tunOutbound, &Packet{data: buf[:], length: n})
}
}()
@@ -118,54 +129,22 @@ func transportTunClient(ctx context.Context, tunInbound <-chan *DataElem, tunOut
}
}
type ClientDevice struct {
tun net.Conn
tunInbound chan *DataElem
tunOutbound chan *DataElem
// your main logic
tunInboundHandler func(tunInbound <-chan *DataElem, tunOutbound chan<- *DataElem)
chExit chan error
}
func (d *ClientDevice) Start(ctx context.Context) {
go d.tunInboundHandler(d.tunInbound, d.tunOutbound)
go heartbeats(ctx, d.tun)
go d.readFromTun()
go d.writeToTun()
select {
case err := <-d.chExit:
plog.G(ctx).Errorf("[TUN-CLIENT]: %v", err)
return
case <-ctx.Done():
return
}
}
func (d *ClientDevice) SetTunInboundHandler(handler func(tunInbound <-chan *DataElem, tunOutbound chan<- *DataElem)) {
d.tunInboundHandler = handler
}
func (d *ClientDevice) readFromTun() {
defer util.HandleCrash()
for {
buf := config.LPool.Get().([]byte)[:]
n, err := d.tun.Read(buf[:])
if err != nil {
util.SafeWrite(d.chExit, err)
util.SafeWrite(d.errChan, err)
config.LPool.Put(buf[:])
return
}
if n == 0 {
config.LPool.Put(buf[:])
continue
}
// Try to determine network protocol number, default zero.
var src, dst net.IP
src, dst, err = util.ParseIP(buf[:n])
if err != nil {
plog.G(context.Background()).Debugf("[TUN-GVISOR] Unknown packet: %v", err)
plog.G(context.Background()).Errorf("[TUN-RAW] Unknown packet: %v", err)
config.LPool.Put(buf[:])
continue
}
@@ -180,8 +159,47 @@ func (d *ClientDevice) writeToTun() {
_, err := d.tun.Write(e.data[:e.length])
config.LPool.Put(e.data[:])
if err != nil {
util.SafeWrite(d.chExit, err)
util.SafeWrite(d.errChan, err)
return
}
}
}
func (d *ClientDevice) Close() {
d.tun.Close()
util.SafeClose(d.tunInbound)
util.SafeClose(d.tunOutbound)
}
func heartbeats(ctx context.Context, tun net.Conn) {
tunIfi, err := util.GetTunDeviceByConn(tun)
if err != nil {
plog.G(ctx).Errorf("Failed to get tun device: %s", err.Error())
return
}
srcIPv4, srcIPv6, dockerSrcIPv4, err := util.GetTunDeviceIP(tunIfi.Name)
if err != nil {
return
}
ticker := time.NewTicker(time.Second * 60)
defer ticker.Stop()
for ; true; <-ticker.C {
select {
case <-ctx.Done():
return
default:
}
if srcIPv4 != nil {
util.Ping(ctx, srcIPv4.String(), config.RouterIP.String())
}
if srcIPv6 != nil {
util.Ping(ctx, srcIPv6.String(), config.RouterIP6.String())
}
if dockerSrcIPv4 != nil {
util.Ping(ctx, dockerSrcIPv4.String(), config.DockerRouterIP.String())
}
}
}

View File

@@ -2,65 +2,51 @@ package core
import (
"encoding/binary"
"fmt"
"io"
"net"
"github.com/wencaiwulue/kubevpn/v2/pkg/config"
)
type datagramPacket struct {
type DatagramPacket struct {
DataLength uint16 // [2]byte
Data []byte // []byte
}
func (addr *datagramPacket) String() string {
if addr == nil {
return ""
}
return fmt.Sprintf("DataLength: %d, Data: %v\n", addr.DataLength, addr.Data)
}
func newDatagramPacket(data []byte) (r *datagramPacket) {
return &datagramPacket{
func newDatagramPacket(data []byte) (r *DatagramPacket) {
return &DatagramPacket{
DataLength: uint16(len(data)),
Data: data,
}
}
func (addr *datagramPacket) Addr() net.Addr {
var server8422, _ = net.ResolveUDPAddr("udp", "127.0.0.1:8422")
return server8422
}
func readDatagramPacket(r io.Reader, b []byte) (*datagramPacket, error) {
func readDatagramPacket(r io.Reader, b []byte) (*DatagramPacket, error) {
_, err := io.ReadFull(r, b[:2])
if err != nil {
return nil, err
}
dataLength := binary.BigEndian.Uint16(b[:2])
_, err = io.ReadFull(r, b[:dataLength])
if err != nil /*&& (err != io.ErrUnexpectedEOF || err != io.EOF)*/ {
if err != nil {
return nil, err
}
return &datagramPacket{DataLength: dataLength, Data: b[:dataLength]}, nil
return &DatagramPacket{DataLength: dataLength, Data: b[:dataLength]}, nil
}
// this method will return all byte array in the way: b[:]
func readDatagramPacketServer(r io.Reader, b []byte) (*datagramPacket, error) {
func readDatagramPacketServer(r io.Reader, b []byte) (*DatagramPacket, error) {
_, err := io.ReadFull(r, b[:2])
if err != nil {
return nil, err
}
dataLength := binary.BigEndian.Uint16(b[:2])
_, err = io.ReadFull(r, b[:dataLength])
if err != nil /*&& (err != io.ErrUnexpectedEOF || err != io.EOF)*/ {
if err != nil {
return nil, err
}
return &datagramPacket{DataLength: dataLength, Data: b[:]}, nil
return &DatagramPacket{DataLength: dataLength, Data: b[:]}, nil
}
func (addr *datagramPacket) Write(w io.Writer) error {
func (addr *DatagramPacket) Write(w io.Writer) error {
buf := config.LPool.Get().([]byte)[:]
defer config.LPool.Put(buf[:])
binary.BigEndian.PutUint16(buf[:2], uint16(len(addr.Data)))

View File

@@ -2,15 +2,16 @@ package action
import (
"context"
plog "github.com/wencaiwulue/kubevpn/v2/pkg/log"
"net"
"sync"
"github.com/containernetworking/cni/pkg/types"
"github.com/wencaiwulue/kubevpn/v2/pkg/config"
"github.com/wencaiwulue/kubevpn/v2/pkg/core"
"github.com/wencaiwulue/kubevpn/v2/pkg/daemon/rpc"
"github.com/wencaiwulue/kubevpn/v2/pkg/handler"
plog "github.com/wencaiwulue/kubevpn/v2/pkg/log"
"github.com/wencaiwulue/kubevpn/v2/pkg/tun"
"github.com/wencaiwulue/kubevpn/v2/pkg/util"
)
@@ -44,7 +45,7 @@ func (svr *Server) SshStart(ctx context.Context, req *rpc.SshStartRequest) (resp
}()
r := core.Route{
ServeNodes: []string{
Listeners: []string{
"tun://127.0.0.1:8422?net=" + DefaultServerIP,
"tcp://:10800",
},

View File

@@ -130,10 +130,10 @@ func (w *wsHandler) createTwoWayTUNTunnel(ctx context.Context, cli *ssh.Client)
w.PrintLine(msg)
w.cidr = append(w.cidr, string(serverIP))
r := core.Route{
ServeNodes: []string{
Listeners: []string{
fmt.Sprintf("tun:/127.0.0.1:8422?net=%s&route=%s", clientIP, strings.Join(w.cidr, ",")),
},
ChainNode: fmt.Sprintf("tcp://127.0.0.1:%d", localPort),
Forwarder: fmt.Sprintf("tcp://127.0.0.1:%d", localPort),
Retries: 5,
}
servers, err := handler.Parse(r)

View File

@@ -416,18 +416,18 @@ func (c *ConnectOptions) startLocalTunServer(ctx context.Context, forwardAddress
return err
}
chainNode, err := core.ParseNode(forwardAddress)
forward, err := core.ParseNode(forwardAddress)
if err != nil {
plog.G(ctx).Errorf("Failed to parse forward node %s: %v", forwardAddress, err)
return err
}
chainNode.Client = &core.Client{
Connector: core.UDPOverTCPTunnelConnector(),
forward.Client = &core.Client{
Connector: core.NewUDPOverTCPConnector(),
Transporter: core.TCPTransporter(),
}
chain := core.NewChain(5, chainNode)
forwarder := core.NewForwarder(5, forward)
handler := core.TunHandler(chain, node)
handler := core.TunHandler(forwarder, node)
listener, err := tun.Listener(tunConfig)
if err != nil {
plog.G(ctx).Errorf("Failed to create tun listener: %v", err)
@@ -703,7 +703,7 @@ func Parse(r core.Route) ([]core.Server, error) {
return nil, err
}
if len(servers) == 0 {
return nil, fmt.Errorf("server is empty, server config: %s", strings.Join(r.ServeNodes, ","))
return nil, fmt.Errorf("server is empty, server config: %s", strings.Join(r.Listeners, ","))
}
return servers, nil
}
@@ -772,7 +772,7 @@ func (c *ConnectOptions) getCIDR(ctx context.Context, m *dhcp.Manager) (err erro
}
}
if len(c.cidrs) != 0 {
plog.G(ctx).Infoln("Got network CIDR from cache")
plog.G(ctx).Infoln("Get network CIDR from cache")
return nil
}
}

View File

@@ -365,21 +365,19 @@ func genDeploySpec(namespace string, udp8422 string, tcp10800 string, tcp9002 st
Args: []string{util.If(
gvisor,
`
kubevpn server -L "tcp://:10800" -L "gtcp://:10801" -L "gudp://:10802" --debug=true`,
kubevpn server -l "tcp://:10800" -l "gtcp://:10801" -l "gudp://:10802" --debug=true`,
`
sysctl -w net.ipv4.ip_forward=1
sysctl -w net.ipv6.conf.all.disable_ipv6=0
sysctl -w net.ipv6.conf.all.forwarding=1
update-alternatives --set iptables /usr/sbin/iptables-legacy
iptables -F
ip6tables -F
iptables -P INPUT ACCEPT
ip6tables -P INPUT ACCEPT
iptables -P FORWARD ACCEPT
ip6tables -P FORWARD ACCEPT
iptables -t nat -A POSTROUTING -s ${CIDR4} -o eth0 -j MASQUERADE
ip6tables -t nat -A POSTROUTING -s ${CIDR6} -o eth0 -j MASQUERADE
kubevpn server -L "tcp://:10800" -L "tun://:8422?net=${TunIPv4}" -L "gtcp://:10801" -L "gudp://:10802" --debug=true`,
kubevpn server -l "tcp://:10800" -l "tun://:8422?net=${TunIPv4}&net6=${TunIPv6}" -l "gtcp://:10801" -l "gudp://:10802" --debug=true`,
)},
EnvFrom: []v1.EnvFromSource{{
SecretRef: &v1.SecretEnvSource{

View File

@@ -51,8 +51,6 @@ sysctl -w net.ipv4.ip_forward=1
sysctl -w net.ipv6.conf.all.disable_ipv6=0
sysctl -w net.ipv6.conf.all.forwarding=1
update-alternatives --set iptables /usr/sbin/iptables-legacy
iptables -F
ip6tables -F
iptables -P INPUT ACCEPT
ip6tables -P INPUT ACCEPT
iptables -P FORWARD ACCEPT
@@ -61,7 +59,7 @@ iptables -t nat -A PREROUTING ! -p icmp ! -s 127.0.0.1 ! -d ${CIDR4} -j DNAT --t
ip6tables -t nat -A PREROUTING ! -p icmp ! -s 0:0:0:0:0:0:0:1 ! -d ${CIDR6} -j DNAT --to :15006
iptables -t nat -A POSTROUTING ! -p icmp ! -s 127.0.0.1 ! -d ${CIDR4} -j MASQUERADE
ip6tables -t nat -A POSTROUTING ! -p icmp ! -s 0:0:0:0:0:0:0:1 ! -d ${CIDR6} -j MASQUERADE
kubevpn server -L "tun:/localhost:8422?net=${TunIPv4}&route=${CIDR4}" -F "tcp://${TrafficManagerService}:10800"`,
kubevpn server -l "tun:/localhost:8422?net=${TunIPv4}&net6=${TunIPv6}&route=${CIDR4}" -f "tcp://${TrafficManagerService}:10800"`,
},
Env: []v1.EnvVar{
{
@@ -170,7 +168,7 @@ func AddEnvoyContainer(spec *v1.PodTemplateSpec, ns, nodeId string, ipv6 bool, c
Image: config.Image,
Command: []string{"/bin/sh", "-c"},
Args: []string{`
kubevpn server -L "ssh://:2222"`,
kubevpn server -l "ssh://:2222"`,
},
Resources: v1.ResourceRequirements{
Requests: map[v1.ResourceName]resource.Quantity{

View File

@@ -84,8 +84,6 @@ sysctl -w net.ipv6.conf.all.disable_ipv6=0
sysctl -w net.ipv6.conf.all.forwarding=1
sysctl -w net.ipv4.conf.all.route_localnet=1
update-alternatives --set iptables /usr/sbin/iptables-legacy
iptables -F
ip6tables -F
iptables -P INPUT ACCEPT
ip6tables -P INPUT ACCEPT
iptables -P FORWARD ACCEPT
@@ -94,7 +92,7 @@ iptables -t nat -A PREROUTING ! -p icmp -j DNAT --to ${LocalTunIPv4}
ip6tables -t nat -A PREROUTING ! -p icmp -j DNAT --to ${LocalTunIPv6}
iptables -t nat -A POSTROUTING ! -p icmp -j MASQUERADE
ip6tables -t nat -A POSTROUTING ! -p icmp -j MASQUERADE
kubevpn server -L "tun:/127.0.0.1:8422?net=${TunIPv4}&route=${CIDR4}" -F "tcp://${TrafficManagerService}:10800"`,
kubevpn server -l "tun:/127.0.0.1:8422?net=${TunIPv4}&net6=${TunIPv6}&route=${CIDR4}" -f "tcp://${TrafficManagerService}:10800"`,
},
SecurityContext: &corev1.SecurityContext{
Capabilities: &corev1.Capabilities{

View File

@@ -32,7 +32,7 @@ func GetLoggerForClient(level int32, out io.Writer) *log.Logger {
func InitLoggerForServer() *log.Logger {
return &log.Logger{
Out: os.Stderr,
Formatter: &format{},
Formatter: &serverFormat{},
Hooks: make(log.LevelHooks),
Level: log.DebugLevel,
ExitFunc: os.Exit,

View File

@@ -76,7 +76,7 @@ func createTun(cfg Config) (conn net.Conn, itf *net.Interface, err error) {
}
if err = addTunRoutes(name, cfg.Routes...); err != nil {
err = pkgerr.Wrap(err, "Add tun routes failed")
err = pkgerr.Wrap(err, "Add tun device routes failed")
return
}