diff --git a/charts/kubevpn/templates/deployment.yaml b/charts/kubevpn/templates/deployment.yaml index 60347560..0ee51f05 100644 --- a/charts/kubevpn/templates/deployment.yaml +++ b/charts/kubevpn/templates/deployment.yaml @@ -39,17 +39,15 @@ spec: sysctl -w net.ipv6.conf.all.disable_ipv6=0 sysctl -w net.ipv6.conf.all.forwarding=1 update-alternatives --set iptables /usr/sbin/iptables-legacy - iptables -F - ip6tables -F iptables -P INPUT ACCEPT ip6tables -P INPUT ACCEPT iptables -P FORWARD ACCEPT ip6tables -P FORWARD ACCEPT iptables -t nat -A POSTROUTING -s ${CIDR4} -o eth0 -j MASQUERADE ip6tables -t nat -A POSTROUTING -s ${CIDR6} -o eth0 -j MASQUERADE - kubevpn server -L "tcp://:10800" -L "tun://:8422?net=${TunIPv4}" -L "gtcp://:10801" -L "gudp://:10802" --debug=true + kubevpn server -l "tcp://:10800" -l "tun://:8422?net=${TunIPv4}&net6=${TunIPv6}" -l "gtcp://:10801" -l "gudp://:10802" --debug=true {{- else }} - - kubevpn server -L "tcp://:10800" -L "gtcp://:10801" -L "gudp://:10802" --debug=true + - kubevpn server -l "tcp://:10800" -l "gtcp://:10801" -l "gudp://:10802" --debug=true {{- end }} command: - /bin/sh diff --git a/cmd/kubevpn/cmds/controlplane.go b/cmd/kubevpn/cmds/controlplane.go index 539b8deb..a72b8cdb 100644 --- a/cmd/kubevpn/cmds/controlplane.go +++ b/cmd/kubevpn/cmds/controlplane.go @@ -17,7 +17,7 @@ import ( "github.com/wencaiwulue/kubevpn/v2/pkg/util" ) -func CmdControlPlane(_ cmdutil.Factory) *cobra.Command { +func CmdControlPlane(cmdutil.Factory) *cobra.Command { var ( watchDirectoryFilename string port uint = 9002 diff --git a/cmd/kubevpn/cmds/daemon.go b/cmd/kubevpn/cmds/daemon.go index 9395c790..6fa72f76 100644 --- a/cmd/kubevpn/cmds/daemon.go +++ b/cmd/kubevpn/cmds/daemon.go @@ -21,7 +21,7 @@ import ( "github.com/wencaiwulue/kubevpn/v2/pkg/util" ) -func CmdDaemon(_ cmdutil.Factory) *cobra.Command { +func CmdDaemon(cmdutil.Factory) *cobra.Command { var opt = &daemon.SvrOption{} cmd := &cobra.Command{ Use: "daemon", diff --git a/cmd/kubevpn/cmds/imagecopy.go b/cmd/kubevpn/cmds/imagecopy.go index 2e51596d..0418b2f0 100644 --- a/cmd/kubevpn/cmds/imagecopy.go +++ b/cmd/kubevpn/cmds/imagecopy.go @@ -8,7 +8,7 @@ import ( "github.com/wencaiwulue/kubevpn/v2/pkg/util/regctl" ) -func CmdImageCopy(_ cmdutil.Factory) *cobra.Command { +func CmdImageCopy(cmdutil.Factory) *cobra.Command { var imageCmd = &cobra.Command{ Use: "image ", Short: "copy images", diff --git a/cmd/kubevpn/cmds/server.go b/cmd/kubevpn/cmds/server.go index 5c759860..5841c0b4 100644 --- a/cmd/kubevpn/cmds/server.go +++ b/cmd/kubevpn/cmds/server.go @@ -20,7 +20,7 @@ import ( "github.com/wencaiwulue/kubevpn/v2/pkg/util" ) -func CmdServer(_ cmdutil.Factory) *cobra.Command { +func CmdServer(cmdutil.Factory) *cobra.Command { var route = &core.Route{} cmd := &cobra.Command{ Use: "server", @@ -30,8 +30,8 @@ func CmdServer(_ cmdutil.Factory) *cobra.Command { Server side, startup traffic manager, forward inbound and outbound traffic. `)), Example: templates.Examples(i18n.T(` - # server node - kubevpn server -L "tcp://:10800" -L "tun://127.0.0.1:8422?net=198.19.0.123/32" + # server listener + kubevpn server -l "tcp://:10800" -l "tun://127.0.0.1:8422?net=198.19.0.123/32" `)), PreRun: func(*cobra.Command, []string) { runtime.GOMAXPROCS(0) @@ -50,8 +50,8 @@ func CmdServer(_ cmdutil.Factory) *cobra.Command { return handler.Run(ctx, servers) }, } - cmd.Flags().StringArrayVarP(&route.ServeNodes, "node", "L", []string{}, "Startup node server. eg: tcp://localhost:1080") - cmd.Flags().StringVarP(&route.ChainNode, "chain", "F", "", "Forward chain. eg: tcp://192.168.1.100:2345") + cmd.Flags().StringArrayVarP(&route.Listeners, "listener", "l", []string{}, "Startup listener server. eg: tcp://localhost:1080") + cmd.Flags().StringVarP(&route.Forwarder, "forwarder", "f", "", "Special forwarder. eg: tcp://192.168.1.100:2345") cmd.Flags().BoolVar(&config.Debug, "debug", false, "Enable debug log or not") return cmd } diff --git a/cmd/kubevpn/cmds/ssh.go b/cmd/kubevpn/cmds/ssh.go index efa88a50..40a472dc 100644 --- a/cmd/kubevpn/cmds/ssh.go +++ b/cmd/kubevpn/cmds/ssh.go @@ -27,7 +27,7 @@ import ( // CmdSSH // Remember to use network mask 32, because ssh using unique network CIDR 198.18.0.0/16 -func CmdSSH(_ cmdutil.Factory) *cobra.Command { +func CmdSSH(cmdutil.Factory) *cobra.Command { var sshConf = &pkgssh.SshConfig{} var extraCIDR []string var platform string diff --git a/cmd/kubevpn/cmds/sshdaemon.go b/cmd/kubevpn/cmds/sshdaemon.go index 7ddac1ba..fd11d560 100644 --- a/cmd/kubevpn/cmds/sshdaemon.go +++ b/cmd/kubevpn/cmds/sshdaemon.go @@ -15,7 +15,7 @@ import ( // CmdSSHDaemon // set local tun ip 198.19.0.1/32, remember to use mask 32 -func CmdSSHDaemon(_ cmdutil.Factory) *cobra.Command { +func CmdSSHDaemon(cmdutil.Factory) *cobra.Command { var clientIP string cmd := &cobra.Command{ Use: "ssh-daemon", diff --git a/cmd/kubevpn/cmds/syncthing.go b/cmd/kubevpn/cmds/syncthing.go index 8ed6cd6a..1f4c3ea7 100644 --- a/cmd/kubevpn/cmds/syncthing.go +++ b/cmd/kubevpn/cmds/syncthing.go @@ -10,7 +10,7 @@ import ( "github.com/wencaiwulue/kubevpn/v2/pkg/util" ) -func CmdSyncthing(_ cmdutil.Factory) *cobra.Command { +func CmdSyncthing(cmdutil.Factory) *cobra.Command { var detach bool var dir string cmd := &cobra.Command{ diff --git a/cmd/kubevpn/cmds/upgrade.go b/cmd/kubevpn/cmds/upgrade.go index f8b56758..dd0e7bd0 100644 --- a/cmd/kubevpn/cmds/upgrade.go +++ b/cmd/kubevpn/cmds/upgrade.go @@ -16,7 +16,7 @@ import ( "github.com/wencaiwulue/kubevpn/v2/pkg/upgrade" ) -func CmdUpgrade(_ cmdutil.Factory) *cobra.Command { +func CmdUpgrade(cmdutil.Factory) *cobra.Command { cmd := &cobra.Command{ Use: "upgrade", Short: i18n.T("Upgrade kubevpn client to latest version"), diff --git a/pkg/config/config.go b/pkg/config/config.go index 13ce6ece..9da86fd0 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -160,7 +160,7 @@ var ( ) var ( - KeepAliveTime = 180 * time.Second + KeepAliveTime = 60 * time.Second DialTimeout = 15 * time.Second HandshakeTimeout = 5 * time.Second ConnectTimeout = 5 * time.Second diff --git a/pkg/core/chain.go b/pkg/core/forwarder.go similarity index 61% rename from pkg/core/chain.go rename to pkg/core/forwarder.go index 4f9be4cc..7c18c2e3 100644 --- a/pkg/core/chain.go +++ b/pkg/core/forwarder.go @@ -3,34 +3,33 @@ package core import ( "context" "errors" - "math" "net" ) var ( - // ErrorEmptyChain is an error that implies the chain is empty. - ErrorEmptyChain = errors.New("empty chain") + // ErrorEmptyForwarder is an error that implies the forward is empty. + ErrorEmptyForwarder = errors.New("empty forwarder") ) -type Chain struct { +type Forwarder struct { retries int node *Node } -func NewChain(retry int, node *Node) *Chain { - return &Chain{retries: retry, node: node} +func NewForwarder(retry int, node *Node) *Forwarder { + return &Forwarder{retries: retry, node: node} } -func (c *Chain) Node() *Node { +func (c *Forwarder) Node() *Node { return c.node } -func (c *Chain) IsEmpty() bool { +func (c *Forwarder) IsEmpty() bool { return c == nil || c.node == nil } -func (c *Chain) DialContext(ctx context.Context) (conn net.Conn, err error) { - for i := 0; i < int(math.Max(float64(1), float64(c.retries))); i++ { +func (c *Forwarder) DialContext(ctx context.Context) (conn net.Conn, err error) { + for i := 0; i < max(1, c.retries); i++ { conn, err = c.dial(ctx) if err == nil { break @@ -39,9 +38,9 @@ func (c *Chain) DialContext(ctx context.Context) (conn net.Conn, err error) { return } -func (c *Chain) dial(ctx context.Context) (net.Conn, error) { +func (c *Forwarder) dial(ctx context.Context) (net.Conn, error) { if c.IsEmpty() { - return nil, ErrorEmptyChain + return nil, ErrorEmptyForwarder } conn, err := c.getConn(ctx) @@ -58,7 +57,7 @@ func (c *Chain) dial(ctx context.Context) (net.Conn, error) { return cc, nil } -func (*Chain) resolve(addr string) string { +func (*Forwarder) resolve(addr string) string { if host, port, err := net.SplitHostPort(addr); err == nil { if ips, err := net.LookupIP(host); err == nil && len(ips) > 0 { return net.JoinHostPort(ips[0].String(), port) @@ -67,9 +66,9 @@ func (*Chain) resolve(addr string) string { return addr } -func (c *Chain) getConn(ctx context.Context) (net.Conn, error) { +func (c *Forwarder) getConn(ctx context.Context) (net.Conn, error) { if c.IsEmpty() { - return nil, ErrorEmptyChain + return nil, ErrorEmptyForwarder } return c.Node().Client.Dial(ctx, c.resolve(c.Node().Addr)) } diff --git a/pkg/core/gvisortcphandler.go b/pkg/core/gvisortcphandler.go index 73daa772..0c40055f 100644 --- a/pkg/core/gvisortcphandler.go +++ b/pkg/core/gvisortcphandler.go @@ -18,7 +18,7 @@ import ( type gvisorTCPHandler struct { // map[srcIP]net.Conn routeMapTCP *sync.Map - packetChan chan *datagramPacket + packetChan chan *DatagramPacket } func GvisorTCPHandler() Handler { @@ -32,7 +32,7 @@ func (h *gvisorTCPHandler) Handle(ctx context.Context, tcpConn net.Conn) { defer tcpConn.Close() cancel, cancelFunc := context.WithCancel(ctx) defer cancelFunc() - plog.G(ctx).Debugf("[TCP] %s -> %s", tcpConn.RemoteAddr(), tcpConn.LocalAddr()) + plog.G(ctx).Debugf("[TUN-GVISOR] %s -> %s", tcpConn.RemoteAddr(), tcpConn.LocalAddr()) h.handle(cancel, tcpConn) } diff --git a/pkg/core/gvisortunendpoint.go b/pkg/core/gvisortunendpoint.go index 8816ecad..aa7b6056 100755 --- a/pkg/core/gvisortunendpoint.go +++ b/pkg/core/gvisortunendpoint.go @@ -20,7 +20,7 @@ import ( ) func (h *gvisorTCPHandler) readFromEndpointWriteToTCPConn(ctx context.Context, conn net.Conn, endpoint *channel.Endpoint) { - tcpConn, _ := newGvisorFakeUDPTunnelConnOverTCP(ctx, conn) + tcpConn, _ := newGvisorUDPConnOverTCP(ctx, conn) for { select { case <-ctx.Done(): @@ -34,7 +34,7 @@ func (h *gvisorTCPHandler) readFromEndpointWriteToTCPConn(ctx context.Context, c buf := pktBuffer.ToView().AsSlice() _, err := tcpConn.Write(buf) if err != nil { - plog.G(ctx).Errorf("[TUN] Failed to write data to tun device: %v", err) + plog.G(ctx).Errorf("[TUN-GVISOR] Failed to write data to tun device: %v", err) } } } @@ -42,7 +42,9 @@ func (h *gvisorTCPHandler) readFromEndpointWriteToTCPConn(ctx context.Context, c // tun --> dispatcher func (h *gvisorTCPHandler) readFromTCPConnWriteToEndpoint(ctx context.Context, conn net.Conn, endpoint *channel.Endpoint) { - tcpConn, _ := newGvisorFakeUDPTunnelConnOverTCP(ctx, conn) + tcpConn, _ := newGvisorUDPConnOverTCP(ctx, conn) + defer h.removeFromRouteMapTCP(ctx, conn) + for { select { case <-ctx.Done(): @@ -53,12 +55,12 @@ func (h *gvisorTCPHandler) readFromTCPConnWriteToEndpoint(ctx context.Context, c buf := config.LPool.Get().([]byte)[:] read, err := tcpConn.Read(buf[:]) if err != nil { - plog.G(ctx).Errorf("[TUN] Failed to read from tcp conn: %v", err) + plog.G(ctx).Errorf("[TUN-GVISOR] Failed to read from tcp conn: %v", err) config.LPool.Put(buf[:]) return } if read == 0 { - plog.G(ctx).Warnf("[TUN] Read from tcp conn length is %d", read) + plog.G(ctx).Warnf("[TUN-GVISOR] Read from tcp conn length is %d", read) config.LPool.Put(buf[:]) continue } @@ -83,7 +85,7 @@ func (h *gvisorTCPHandler) readFromTCPConnWriteToEndpoint(ctx context.Context, c protocol = header.IPv6ProtocolNumber ipHeader, err := ipv6.ParseHeader(buf[:read]) if err != nil { - plog.G(ctx).Errorf("Failed to parse IPv6 header: %s", err.Error()) + plog.G(ctx).Errorf("[TUN-GVISOR] Failed to parse IPv6 header: %s", err.Error()) config.LPool.Put(buf[:]) continue } @@ -96,11 +98,11 @@ func (h *gvisorTCPHandler) readFromTCPConnWriteToEndpoint(ctx context.Context, c continue } - h.addRoute(ctx, src, conn) + h.addToRouteMapTCP(ctx, src, conn) // inner ip like 198.19.0.100/102/103 connect each other if config.CIDR.Contains(dst) || config.CIDR6.Contains(dst) { - plog.G(ctx).Debugf("[TUN-RAW] Forward to TUN device, SRC: %s, DST: %s, Length: %d", src.String(), dst.String(), read) - util.SafeWrite(h.packetChan, &datagramPacket{ + plog.G(ctx).Debugf("[TUN-GVISOR] Forward to TUN device, SRC: %s, DST: %s, Length: %d", src.String(), dst.String(), read) + util.SafeWrite(h.packetChan, &DatagramPacket{ DataLength: uint16(read), Data: buf[:], }) @@ -115,18 +117,28 @@ func (h *gvisorTCPHandler) readFromTCPConnWriteToEndpoint(ctx context.Context, c sniffer.LogPacket("[gVISOR] ", sniffer.DirectionRecv, protocol, pkt) endpoint.InjectInbound(protocol, pkt) pkt.DecRef() - plog.G(ctx).Debugf("[TUN-%s] Write to Gvisor IP-Protocol: %s, SRC: %s, DST: %s, Length: %d", layers.IPProtocol(ipProtocol).String(), layers.IPProtocol(ipProtocol).String(), src.String(), dst, read) + plog.G(ctx).Debugf("[TUN-GVISOR] Write to Gvisor IP-Protocol: %s, SRC: %s, DST: %s, Length: %d", layers.IPProtocol(ipProtocol).String(), src.String(), dst, read) } } -func (h *gvisorTCPHandler) addRoute(ctx context.Context, src net.IP, tcpConn net.Conn) { +func (h *gvisorTCPHandler) addToRouteMapTCP(ctx context.Context, src net.IP, tcpConn net.Conn) { value, loaded := h.routeMapTCP.LoadOrStore(src.String(), tcpConn) if loaded { if tcpConn != value.(net.Conn) { h.routeMapTCP.Store(src.String(), tcpConn) - plog.G(ctx).Debugf("[TCP] Replace route map TCP: %s -> %s-%s", src, tcpConn.LocalAddr(), tcpConn.RemoteAddr()) + plog.G(ctx).Debugf("[TUN-GVISOR] Replace route map TCP: %s -> %s-%s", src, tcpConn.LocalAddr(), tcpConn.RemoteAddr()) } } else { - plog.G(ctx).Debugf("[TCP] Add new route map TCP: %s -> %s-%s", src, tcpConn.LocalAddr(), tcpConn.RemoteAddr()) + plog.G(ctx).Debugf("[TUN-GVISOR] Add new route map TCP: %s -> %s-%s", src, tcpConn.LocalAddr(), tcpConn.RemoteAddr()) } } + +func (h *gvisorTCPHandler) removeFromRouteMapTCP(ctx context.Context, tcpConn net.Conn) { + h.routeMapTCP.Range(func(key, value any) bool { + if value.(net.Conn) == tcpConn { + h.routeMapTCP.Delete(key) + plog.G(ctx).Debugf("[TCP-GVISOR] Delete to DST %s by conn %s from globle route map TCP", key, tcpConn.LocalAddr()) + } + return true + }) +} diff --git a/pkg/core/gvisorudphandler.go b/pkg/core/gvisorudphandler.go index 8054af04..3e28f47e 100644 --- a/pkg/core/gvisorudphandler.go +++ b/pkg/core/gvisorudphandler.go @@ -44,17 +44,17 @@ func (h *gvisorUDPHandler) Handle(ctx context.Context, tcpConn net.Conn) { } // fake udp connect over tcp -type gvisorFakeUDPTunnelConn struct { +type gvisorUDPConnOverTCP struct { // tcp connection net.Conn ctx context.Context } -func newGvisorFakeUDPTunnelConnOverTCP(ctx context.Context, conn net.Conn) (net.Conn, error) { - return &gvisorFakeUDPTunnelConn{ctx: ctx, Conn: conn}, nil +func newGvisorUDPConnOverTCP(ctx context.Context, conn net.Conn) (net.Conn, error) { + return &gvisorUDPConnOverTCP{ctx: ctx, Conn: conn}, nil } -func (c *gvisorFakeUDPTunnelConn) Read(b []byte) (int, error) { +func (c *gvisorUDPConnOverTCP) Read(b []byte) (int, error) { select { case <-c.ctx.Done(): return 0, c.ctx.Err() @@ -67,15 +67,15 @@ func (c *gvisorFakeUDPTunnelConn) Read(b []byte) (int, error) { } } -func (c *gvisorFakeUDPTunnelConn) Write(b []byte) (int, error) { - dgram := newDatagramPacket(b) - if err := dgram.Write(c.Conn); err != nil { +func (c *gvisorUDPConnOverTCP) Write(b []byte) (int, error) { + packet := newDatagramPacket(b) + if err := packet.Write(c.Conn); err != nil { return 0, err } return len(b), nil } -func (c *gvisorFakeUDPTunnelConn) Close() error { +func (c *gvisorUDPConnOverTCP) Close() error { if cc, ok := c.Conn.(interface{ CloseRead() error }); ok { _ = cc.CloseRead() } @@ -184,13 +184,13 @@ func handle(ctx context.Context, tcpConn net.Conn, udpConn *net.UDPConn) { errChan <- err return } - dgram := newDatagramPacket(buf[:n]) - if err = dgram.Write(tcpConn); err != nil { - plog.G(ctx).Errorf("[TUN-UDP] Error: %s <- %s : %s", tcpConn.RemoteAddr(), dgram.Addr(), err) + packet := newDatagramPacket(buf[:n]) + if err = packet.Write(tcpConn); err != nil { + plog.G(ctx).Errorf("[TUN-UDP] Error: %s <- %s : %s", tcpConn.RemoteAddr(), tcpConn.LocalAddr(), err) errChan <- err return } - plog.G(ctx).Debugf("[TUN-UDP] %s <<< %s length: %d", tcpConn.RemoteAddr(), dgram.Addr(), len(dgram.Data)) + plog.G(ctx).Debugf("[TUN-UDP] %s <<< %s length: %d", tcpConn.RemoteAddr(), tcpConn.LocalAddr(), len(packet.Data)) } }() err := <-errChan diff --git a/pkg/core/route.go b/pkg/core/route.go index bea1c4d7..c6a6009e 100644 --- a/pkg/core/route.go +++ b/pkg/core/route.go @@ -4,14 +4,12 @@ import ( "context" "fmt" "net" - "os" "strings" "sync" "github.com/containernetworking/cni/pkg/types" "github.com/pkg/errors" - "github.com/wencaiwulue/kubevpn/v2/pkg/config" plog "github.com/wencaiwulue/kubevpn/v2/pkg/log" "github.com/wencaiwulue/kubevpn/v2/pkg/tun" ) @@ -20,71 +18,63 @@ var ( // RouteMapTCP map[srcIP]net.Conn Globe route table for inner ip RouteMapTCP = &sync.Map{} // TCPPacketChan tcp connects - TCPPacketChan = make(chan *datagramPacket, MaxSize) + TCPPacketChan = make(chan *DatagramPacket, MaxSize) ) type TCPUDPacket struct { - data *datagramPacket + data *DatagramPacket } // Route example: -// -L "tcp://:10800" -L "tun://:8422?net=198.19.0.100/16" -// -L "tun:/10.233.24.133:8422?net=198.19.0.102/16&route=198.19.0.0/16" -// -L "tun:/127.0.0.1:8422?net=198.19.0.102/16&route=198.19.0.0/16,10.233.0.0/16" -F "tcp://127.0.0.1:10800" +// -l "tcp://:10800" -l "tun://:8422?net=198.19.0.100/16" +// -l "tun:/10.233.24.133:8422?net=198.19.0.102/16&route=198.19.0.0/16" +// -l "tun:/127.0.0.1:8422?net=198.19.0.102/16&route=198.19.0.0/16,10.233.0.0/16" -f "tcp://127.0.0.1:10800" type Route struct { - ServeNodes []string // -L tun - ChainNode string // -F tcp - Retries int + Listeners []string // -l tun + Forwarder string // -f tcp + Retries int } -func (r *Route) parseChain() (*Chain, error) { - node, err := parseChainNode(r.ChainNode) +func (r *Route) ParseForwarder() (*Forwarder, error) { + forwarder, err := ParseNode(r.Forwarder) if err != nil { return nil, err } - return NewChain(r.Retries, node), nil -} - -func parseChainNode(ns string) (*Node, error) { - node, err := ParseNode(ns) - if err != nil { - return nil, err - } - node.Client = &Client{ - Connector: UDPOverTCPTunnelConnector(), + forwarder.Client = &Client{ + Connector: NewUDPOverTCPConnector(), Transporter: TCPTransporter(), } - return node, nil + return NewForwarder(r.Retries, forwarder), nil } func (r *Route) GenerateServers() ([]Server, error) { - chain, err := r.parseChain() + forwarder, err := r.ParseForwarder() if err != nil && !errors.Is(err, ErrorInvalidNode) { - plog.G(context.Background()).Errorf("Failed to parse chain: %v", err) + plog.G(context.Background()).Errorf("Failed to parse forwarder: %v", err) return nil, err } - servers := make([]Server, 0, len(r.ServeNodes)) - for _, serveNode := range r.ServeNodes { + servers := make([]Server, 0, len(r.Listeners)) + for _, l := range r.Listeners { var node *Node - node, err = ParseNode(serveNode) + node, err = ParseNode(l) if err != nil { - plog.G(context.Background()).Errorf("Failed to parse node %s: %v", serveNode, err) + plog.G(context.Background()).Errorf("Failed to parse node %s: %v", l, err) return nil, err } - var ln net.Listener + var listener net.Listener var handler Handler switch node.Protocol { case "tun": - handler = TunHandler(chain, node) - ln, err = tun.Listener(tun.Config{ + handler = TunHandler(forwarder, node) + listener, err = tun.Listener(tun.Config{ Name: node.Get("name"), Addr: node.Get("net"), - Addr6: os.Getenv(config.EnvInboundPodTunIPv6), + Addr6: node.Get("net6"), MTU: node.GetInt("mtu"), - Routes: parseIPRoutes(node.Get("route")), + Routes: parseRoutes(node.Get("route")), Gateway: node.Get("gw"), }) if err != nil { @@ -93,28 +83,28 @@ func (r *Route) GenerateServers() ([]Server, error) { } case "tcp": handler = TCPHandler() - ln, err = TCPListener(node.Addr) + listener, err = TCPListener(node.Addr) if err != nil { plog.G(context.Background()).Errorf("Failed to create tcp listener: %v", err) return nil, err } case "gtcp": handler = GvisorTCPHandler() - ln, err = GvisorTCPListener(node.Addr) + listener, err = GvisorTCPListener(node.Addr) if err != nil { plog.G(context.Background()).Errorf("Failed to create gvisor tcp listener: %v", err) return nil, err } case "gudp": handler = GvisorUDPHandler() - ln, err = GvisorUDPListener(node.Addr) + listener, err = GvisorUDPListener(node.Addr) if err != nil { plog.G(context.Background()).Errorf("Failed to create gvisor udp listener: %v", err) return nil, err } case "ssh": handler = SSHHandler() - ln, err = SSHListener(node.Addr) + listener, err = SSHListener(node.Addr) if err != nil { plog.G(context.Background()).Errorf("Failed to create ssh listener: %v", err) return nil, err @@ -123,21 +113,18 @@ func (r *Route) GenerateServers() ([]Server, error) { plog.G(context.Background()).Errorf("Not support protocol %s", node.Protocol) return nil, fmt.Errorf("not support protocol %s", node.Protocol) } - servers = append(servers, Server{Listener: ln, Handler: handler}) + servers = append(servers, Server{Listener: listener, Handler: handler}) } return servers, nil } -func parseIPRoutes(routeStringList string) (routes []types.Route) { - if len(routeStringList) == 0 { - return - } - - routeList := strings.Split(routeStringList, ",") - for _, route := range routeList { +func parseRoutes(str string) []types.Route { + var routes []types.Route + list := strings.Split(str, ",") + for _, route := range list { if _, ipNet, _ := net.ParseCIDR(strings.TrimSpace(route)); ipNet != nil { routes = append(routes, types.Route{Dst: *ipNet}) } } - return + return routes } diff --git a/pkg/core/tcphandler.go b/pkg/core/tcphandler.go index 32a88f8f..5eea9c09 100644 --- a/pkg/core/tcphandler.go +++ b/pkg/core/tcphandler.go @@ -3,23 +3,21 @@ package core import ( "context" "net" - "strings" "sync" - "time" "github.com/wencaiwulue/kubevpn/v2/pkg/config" plog "github.com/wencaiwulue/kubevpn/v2/pkg/log" "github.com/wencaiwulue/kubevpn/v2/pkg/util" ) -type fakeUDPTunnelConnector struct { +type UDPOverTCPConnector struct { } -func UDPOverTCPTunnelConnector() Connector { - return &fakeUDPTunnelConnector{} +func NewUDPOverTCPConnector() Connector { + return &UDPOverTCPConnector{} } -func (c *fakeUDPTunnelConnector) ConnectContext(ctx context.Context, conn net.Conn) (net.Conn, error) { +func (c *UDPOverTCPConnector) ConnectContext(ctx context.Context, conn net.Conn) (net.Conn, error) { //defer conn.SetDeadline(time.Time{}) switch con := conn.(type) { case *net.TCPConn: @@ -31,44 +29,32 @@ func (c *fakeUDPTunnelConnector) ConnectContext(ctx context.Context, conn net.Co if err != nil { return nil, err } - err = con.SetKeepAlivePeriod(15 * time.Second) + err = con.SetKeepAlivePeriod(config.KeepAliveTime) if err != nil { return nil, err } } - return newFakeUDPTunnelConnOverTCP(ctx, conn) + return newUDPConnOverTCP(ctx, conn) } -type fakeUdpHandler struct { +type UDPOverTCPHandler struct { // map[srcIP]net.Conn routeMapTCP *sync.Map - packetChan chan *datagramPacket + packetChan chan *DatagramPacket } func TCPHandler() Handler { - return &fakeUdpHandler{ + return &UDPOverTCPHandler{ routeMapTCP: RouteMapTCP, packetChan: TCPPacketChan, } } -func (h *fakeUdpHandler) Handle(ctx context.Context, tcpConn net.Conn) { +func (h *UDPOverTCPHandler) Handle(ctx context.Context, tcpConn net.Conn) { defer tcpConn.Close() plog.G(ctx).Debugf("[TCP] %s -> %s", tcpConn.RemoteAddr(), tcpConn.LocalAddr()) - defer func(addr net.Addr) { - var keys []string - h.routeMapTCP.Range(func(key, value any) bool { - if value.(net.Conn) == tcpConn { - keys = append(keys, key.(string)) - } - return true - }) - for _, key := range keys { - h.routeMapTCP.Delete(key) - } - plog.G(ctx).Debugf("[TCP] To %s by conn %s from globle route map TCP", strings.Join(keys, " "), addr) - }(tcpConn.LocalAddr()) + defer h.removeFromRouteMapTCP(ctx, tcpConn) for { select { @@ -78,7 +64,7 @@ func (h *fakeUdpHandler) Handle(ctx context.Context, tcpConn net.Conn) { } buf := config.LPool.Get().([]byte)[:] - dgram, err := readDatagramPacketServer(tcpConn, buf[:]) + packet, err := readDatagramPacketServer(tcpConn, buf[:]) if err != nil { plog.G(ctx).Errorf("[TCP] %s -> %s : %v", tcpConn.RemoteAddr(), tcpConn.LocalAddr(), err) config.LPool.Put(buf[:]) @@ -86,7 +72,7 @@ func (h *fakeUdpHandler) Handle(ctx context.Context, tcpConn net.Conn) { } var src net.IP - src, _, err = util.ParseIP(dgram.Data[:dgram.DataLength]) + src, _, err = util.ParseIP(packet.Data[:packet.DataLength]) if err != nil { plog.G(ctx).Errorf("[TCP] Unknown packet") config.LPool.Put(buf[:]) @@ -101,43 +87,54 @@ func (h *fakeUdpHandler) Handle(ctx context.Context, tcpConn net.Conn) { } else { plog.G(ctx).Debugf("[TCP] Add new route map TCP: %s -> %s-%s", src, tcpConn.LocalAddr(), tcpConn.RemoteAddr()) } - util.SafeWrite(h.packetChan, dgram) + util.SafeWrite(h.packetChan, packet) } } -// fake udp connect over tcp -type fakeUDPTunnelConn struct { +func (h *UDPOverTCPHandler) removeFromRouteMapTCP(ctx context.Context, tcpConn net.Conn) { + h.routeMapTCP.Range(func(key, value any) bool { + if value.(net.Conn) == tcpConn { + plog.G(ctx).Debugf("[TCP] Delete to DST: %s by conn %s from globle route map TCP", key, tcpConn.LocalAddr()) + } + return true + }) +} + +var _ net.PacketConn = (*UDPConnOverTCP)(nil) + +// UDPConnOverTCP fake udp connection over tcp connection +type UDPConnOverTCP struct { // tcp connection net.Conn ctx context.Context } -func newFakeUDPTunnelConnOverTCP(ctx context.Context, conn net.Conn) (net.Conn, error) { - return &fakeUDPTunnelConn{ctx: ctx, Conn: conn}, nil +func newUDPConnOverTCP(ctx context.Context, conn net.Conn) (net.Conn, error) { + return &UDPConnOverTCP{ctx: ctx, Conn: conn}, nil } -func (c *fakeUDPTunnelConn) ReadFrom(b []byte) (int, net.Addr, error) { +func (c *UDPConnOverTCP) ReadFrom(b []byte) (int, net.Addr, error) { select { case <-c.ctx.Done(): return 0, nil, c.ctx.Err() default: - dgram, err := readDatagramPacket(c.Conn, b) + packet, err := readDatagramPacket(c.Conn, b) if err != nil { return 0, nil, err } - return int(dgram.DataLength), dgram.Addr(), nil + return int(packet.DataLength), nil, nil } } -func (c *fakeUDPTunnelConn) WriteTo(b []byte, _ net.Addr) (int, error) { - dgram := newDatagramPacket(b) - if err := dgram.Write(c.Conn); err != nil { +func (c *UDPConnOverTCP) WriteTo(b []byte, _ net.Addr) (int, error) { + packet := newDatagramPacket(b) + if err := packet.Write(c.Conn); err != nil { return 0, err } return len(b), nil } -func (c *fakeUDPTunnelConn) Close() error { +func (c *UDPConnOverTCP) Close() error { if cc, ok := c.Conn.(interface{ CloseRead() error }); ok { _ = cc.CloseRead() } diff --git a/pkg/core/tunhandler.go b/pkg/core/tunhandler.go index 57e4389c..4dcb9258 100644 --- a/pkg/core/tunhandler.go +++ b/pkg/core/tunhandler.go @@ -4,7 +4,6 @@ import ( "context" "net" "sync" - "time" "github.com/wencaiwulue/kubevpn/v2/pkg/config" plog "github.com/wencaiwulue/kubevpn/v2/pkg/log" @@ -12,103 +11,83 @@ import ( ) const ( - MaxSize = 1000 + MaxSize = 100 ) type tunHandler struct { - chain *Chain + forward *Forwarder node *Node - routeMapUDP *RouteMap - // map[srcIP]net.Conn + routeMapUDP *sync.Map routeMapTCP *sync.Map - chExit chan error -} - -type RouteMap struct { - lock *sync.RWMutex - routes map[string]net.Addr -} - -func NewRouteMap() *RouteMap { - return &RouteMap{ - lock: &sync.RWMutex{}, - routes: map[string]net.Addr{}, - } -} - -func (n *RouteMap) LoadOrStore(to net.IP, addr net.Addr) (net.Addr, bool) { - n.lock.RLock() - route, load := n.routes[to.String()] - n.lock.RUnlock() - if load { - return route, true - } - - n.lock.Lock() - defer n.lock.Unlock() - n.routes[to.String()] = addr - return addr, false -} - -func (n *RouteMap) Store(to net.IP, addr net.Addr) { - n.lock.Lock() - defer n.lock.Unlock() - n.routes[to.String()] = addr -} - -func (n *RouteMap) RouteTo(ip net.IP) net.Addr { - n.lock.RLock() - defer n.lock.RUnlock() - return n.routes[ip.String()] + errChan chan error } // TunHandler creates a handler for tun tunnel. -func TunHandler(chain *Chain, node *Node) Handler { +func TunHandler(forward *Forwarder, node *Node) Handler { return &tunHandler{ - chain: chain, + forward: forward, node: node, - routeMapUDP: NewRouteMap(), + routeMapUDP: &sync.Map{}, routeMapTCP: RouteMapTCP, - chExit: make(chan error, 1), + errChan: make(chan error, 1), } } func (h *tunHandler) Handle(ctx context.Context, tun net.Conn) { - if h.node.Remote != "" { - h.HandleClient(ctx, tun) + if remote := h.node.Remote; remote != "" { + remoteAddr, err := net.ResolveUDPAddr("udp", remote) + if err != nil { + plog.G(ctx).Errorf("[TUN-CLIENT] Failed to resolve udp addr %s: %v", remote, err) + return + } + h.HandleClient(ctx, tun, remoteAddr) } else { h.HandleServer(ctx, tun) } } +func (h *tunHandler) HandleServer(ctx context.Context, tun net.Conn) { + device := &Device{ + tun: tun, + tunInbound: make(chan *Packet, MaxSize), + tunOutbound: make(chan *Packet, MaxSize), + errChan: h.errChan, + } + + defer device.Close() + go device.readFromTUN() + go device.writeToTUN() + go device.transport(ctx, h.node.Addr, h.routeMapUDP, h.routeMapTCP) + + select { + case err := <-device.errChan: + plog.G(ctx).Errorf("Device exit: %v", err) + return + case <-ctx.Done(): + return + } +} + type Device struct { tun net.Conn - tunInbound chan *DataElem - tunOutbound chan *DataElem + tunInbound chan *Packet + tunOutbound chan *Packet - // your main logic - tunInboundHandler func(tunInbound <-chan *DataElem, tunOutbound chan<- *DataElem) - - chExit chan error + errChan chan error } -func (d *Device) readFromTun() { +func (d *Device) readFromTUN() { defer util.HandleCrash() for { buf := config.LPool.Get().([]byte)[:] n, err := d.tun.Read(buf[:]) if err != nil { config.LPool.Put(buf[:]) - plog.G(context.Background()).Errorf("[TUN] Failed to read from tun: %v", err) - util.SafeWrite(d.chExit, err) + plog.G(context.Background()).Errorf("[TUN] Failed to read from tun device: %v", err) + util.SafeWrite(d.errChan, err) return } - if n == 0 { - plog.G(context.Background()).Errorf("[TUN] Read packet length 0") - config.LPool.Put(buf[:]) - continue - } src, dst, err := util.ParseIP(buf[:n]) if err != nil { @@ -117,8 +96,8 @@ func (d *Device) readFromTun() { continue } - plog.G(context.Background()).Debugf("[TUN] SRC: %s --> DST: %s, length: %d", src, dst, n) - util.SafeWrite(d.tunInbound, &DataElem{ + plog.G(context.Background()).Debugf("[TUN] SRC: %s, DST: %s, Length: %d", src, dst, n) + util.SafeWrite(d.tunInbound, &Packet{ data: buf[:], length: n, src: src, @@ -127,13 +106,14 @@ func (d *Device) readFromTun() { } } -func (d *Device) writeToTun() { +func (d *Device) writeToTUN() { defer util.HandleCrash() - for e := range d.tunOutbound { - _, err := d.tun.Write(e.data[:e.length]) - config.LPool.Put(e.data[:]) + for packet := range d.tunOutbound { + _, err := d.tun.Write(packet.data[:packet.length]) + config.LPool.Put(packet.data[:]) if err != nil { - util.SafeWrite(d.chExit, err) + plog.G(context.Background()).Errorf("[TUN] Failed to write to tun device: %v", err) + util.SafeWrite(d.errChan, err) return } } @@ -146,91 +126,51 @@ func (d *Device) Close() { util.SafeClose(TCPPacketChan) } -func heartbeats(ctx context.Context, tun net.Conn) { - tunIfi, err := util.GetTunDeviceByConn(tun) - if err != nil { - plog.G(ctx).Errorf("Failed to get tun device: %s", err.Error()) - return - } - srcIPv4, srcIPv6, dockerSrcIPv4, err := util.GetTunDeviceIP(tunIfi.Name) - if err != nil { - return - } - - ticker := time.NewTicker(time.Second * 5) - defer ticker.Stop() - - for ; true; <-ticker.C { - select { - case <-ctx.Done(): - return - default: - } - - if srcIPv4 != nil { - go util.Ping(ctx, srcIPv4.String(), config.RouterIP.String()) - } - if srcIPv6 != nil { - go util.Ping(ctx, srcIPv6.String(), config.RouterIP6.String()) - } - if dockerSrcIPv4 != nil { - go util.Ping(ctx, dockerSrcIPv4.String(), config.DockerRouterIP.String()) - } - } -} - -func (d *Device) Start(ctx context.Context) { - go d.readFromTun() - go d.tunInboundHandler(d.tunInbound, d.tunOutbound) - go d.writeToTun() - - select { - case err := <-d.chExit: - plog.G(ctx).Errorf("Device exit: %v", err) - return - case <-ctx.Done(): - return - } -} - -func (d *Device) SetTunInboundHandler(handler func(tunInbound <-chan *DataElem, tunOutbound chan<- *DataElem)) { - d.tunInboundHandler = handler -} - -func (h *tunHandler) HandleServer(ctx context.Context, tun net.Conn) { - device := &Device{ - tun: tun, - tunInbound: make(chan *DataElem, MaxSize), - tunOutbound: make(chan *DataElem, MaxSize), - chExit: h.chExit, - } - device.SetTunInboundHandler(func(tunInbound <-chan *DataElem, tunOutbound chan<- *DataElem) { - for ctx.Err() == nil { - packetConn, err := (&net.ListenConfig{}).ListenPacket(ctx, "udp", h.node.Addr) +func (d *Device) transport(ctx context.Context, addr string, routeMapUDP *sync.Map, routeMapTCP *sync.Map) { + for ctx.Err() == nil { + func() { + packetConn, err := (&net.ListenConfig{}).ListenPacket(ctx, "udp", addr) if err != nil { - plog.G(ctx).Errorf("[UDP] Failed to listen %s: %v", h.node.Addr, err) + plog.G(ctx).Errorf("[UDP] Failed to listen %s: %v", addr, err) return } - err = transportTunServer(ctx, tunInbound, tunOutbound, packetConn, h.routeMapUDP, h.routeMapTCP) - if err != nil { - plog.G(ctx).Errorf("[TUN] %s: %v", tun.LocalAddr(), err) - } - } - }) - defer device.Close() - device.Start(ctx) + p := &Peer{ + conn: packetConn, + tcpInbound: make(chan *Packet, MaxSize), + tunInbound: d.tunInbound, + tunOutbound: d.tunOutbound, + routeMapUDP: routeMapUDP, + routeMapTCP: routeMapTCP, + errChan: d.errChan, + } + + defer p.Close() + go p.readFromConn() + go p.readFromTCPConn() + go p.routeTCP() + go p.routeTUN() + + select { + case err = <-p.errChan: + plog.G(ctx).Errorf("[TUN] %s: %v", d.tun.LocalAddr(), err) + return + case <-ctx.Done(): + return + } + }() + } } -type DataElem struct { +type Packet struct { data []byte length int src net.IP dst net.IP } -func NewDataElem(data []byte, length int, src net.IP, dst net.IP) *DataElem { - return &DataElem{ +func NewDataElem(data []byte, length int, src net.IP, dst net.IP) *Packet { + return &Packet{ data: data, length: length, src: src, @@ -238,32 +178,24 @@ func NewDataElem(data []byte, length int, src net.IP, dst net.IP) *DataElem { } } -func (d *DataElem) Data() []byte { +func (d *Packet) Data() []byte { return d.data } -func (d *DataElem) Length() int { +func (d *Packet) Length() int { return d.length } -type udpElem struct { - from net.Addr - data []byte - length int - src net.IP - dst net.IP -} - type Peer struct { conn net.PacketConn - connInbound chan *udpElem + tcpInbound chan *Packet - tunInbound <-chan *DataElem - tunOutbound chan<- *DataElem + tunInbound chan *Packet + tunOutbound chan<- *Packet // map[srcIP.String()]net.Addr for udp - routeMapUDP *RouteMap + routeMapUDP *sync.Map // map[srcIP.String()]net.Conn for tcp routeMapTCP *sync.Map @@ -294,17 +226,16 @@ func (p *Peer) readFromConn() { plog.G(context.Background()).Errorf("[TUN] Unknown packet: %v", err) continue } - if addr, loaded := p.routeMapUDP.LoadOrStore(src, from); loaded { - if addr.String() != from.String() { - p.routeMapUDP.Store(src, from) + if addr, loaded := p.routeMapUDP.LoadOrStore(src.String(), from); loaded { + if addr.(net.Addr).String() != from.String() { + p.routeMapUDP.Store(src.String(), from) plog.G(context.Background()).Debugf("[TUN] Replace route map UDP: %s -> %s", src, from) } } else { plog.G(context.Background()).Debugf("[TUN] Add new route map UDP: %s -> %s", src, from) } - p.connInbound <- &udpElem{ - from: from, + p.tunInbound <- &Packet{ data: buf[:], length: n, src: src, @@ -318,49 +249,40 @@ func (p *Peer) readFromTCPConn() { for packet := range TCPPacketChan { src, dst, err := util.ParseIP(packet.Data) if err != nil { - plog.G(context.Background()).Errorf("[TUN] Unknown packet") + plog.G(context.Background()).Errorf("[TCP] Unknown packet") config.LPool.Put(packet.Data[:]) continue } - u := &udpElem{ + plog.G(context.Background()).Debugf("[TCP] SRC: %s > DST: %s Length: %d", src, dst, packet.DataLength) + p.tcpInbound <- &Packet{ data: packet.Data[:], length: int(packet.DataLength), src: src, dst: dst, } - plog.G(context.Background()).Debugf("[TCP] udp-tun %s >>> %s length: %d", u.src, u.dst, u.length) - p.connInbound <- u } } -func (p *Peer) routePeer() { +func (p *Peer) routeTCP() { defer util.HandleCrash() - for e := range p.connInbound { - if routeToAddr := p.routeMapUDP.RouteTo(e.dst); routeToAddr != nil { - plog.G(context.Background()).Debugf("[UDP] Find UDP route to dst: %s -> %s", e.dst, routeToAddr) - _, err := p.conn.WriteTo(e.data[:e.length], routeToAddr) - config.LPool.Put(e.data[:]) - if err != nil { - p.sendErr(err) - return - } - } else if conn, ok := p.routeMapTCP.Load(e.dst.String()); ok { - plog.G(context.Background()).Debugf("[TCP] Find TCP route to dst: %s -> %s", e.dst.String(), conn.(net.Conn).RemoteAddr()) - dgram := newDatagramPacket(e.data[:e.length]) + for packet := range p.tcpInbound { + if conn, ok := p.routeMapTCP.Load(packet.dst.String()); ok { + plog.G(context.Background()).Debugf("[TCP] Find TCP route SRC: %s to DST: %s -> %s", packet.src.String(), packet.dst.String(), conn.(net.Conn).RemoteAddr()) + dgram := newDatagramPacket(packet.data[:packet.length]) err := dgram.Write(conn.(net.Conn)) - config.LPool.Put(e.data[:]) + config.LPool.Put(packet.data[:]) if err != nil { - plog.G(context.Background()).Errorf("[TCP] udp-tun %s <- %s : %s", conn.(net.Conn).RemoteAddr(), dgram.Addr(), err) + plog.G(context.Background()).Errorf("[TCP] Failed to write to %s <- %s : %s", conn.(net.Conn).RemoteAddr(), conn.(net.Conn).LocalAddr(), err) p.sendErr(err) return } } else { - plog.G(context.Background()).Debugf("[TUN] Not found route to dst: %s, write to TUN device", e.dst.String()) - p.tunOutbound <- &DataElem{ - data: e.data, - length: e.length, - src: e.src, - dst: e.dst, + plog.G(context.Background()).Debugf("[TCP] Not found route, write to TUN device. SRC: %s, DST: %s", packet.src.String(), packet.dst.String()) + p.tunOutbound <- &Packet{ + data: packet.data, + length: packet.length, + src: packet.src, + dst: packet.dst, } } } @@ -368,63 +290,33 @@ func (p *Peer) routePeer() { func (p *Peer) routeTUN() { defer util.HandleCrash() - for e := range p.tunInbound { - if addr := p.routeMapUDP.RouteTo(e.dst); addr != nil { - plog.G(context.Background()).Debugf("[TUN] Find UDP route to dst: %s -> %s", e.dst, addr) - _, err := p.conn.WriteTo(e.data[:e.length], addr) - config.LPool.Put(e.data[:]) + for packet := range p.tunInbound { + if addr, ok := p.routeMapUDP.Load(packet.dst.String()); ok { + plog.G(context.Background()).Debugf("[TUN] Find UDP route to DST: %s -> %s, SRC: %s, DST: %s", packet.dst, addr, packet.src.String(), packet.dst.String()) + _, err := p.conn.WriteTo(packet.data[:packet.length], addr.(net.Addr)) + config.LPool.Put(packet.data[:]) if err != nil { - plog.G(context.Background()).Debugf("[TUN] Failed wirte to route dst: %s -> %s", e.dst, addr) + plog.G(context.Background()).Debugf("[TUN] Failed wirte to route dst: %s -> %s", packet.dst, addr) p.sendErr(err) return } - } else if conn, ok := p.routeMapTCP.Load(e.dst.String()); ok { - plog.G(context.Background()).Debugf("[TUN] Find TCP route to dst: %s -> %s", e.dst.String(), conn.(net.Conn).RemoteAddr()) - dgram := newDatagramPacket(e.data[:e.length]) + } else if conn, ok := p.routeMapTCP.Load(packet.dst.String()); ok { + plog.G(context.Background()).Debugf("[TUN] Find TCP route to dst: %s -> %s", packet.dst.String(), conn.(net.Conn).RemoteAddr()) + dgram := newDatagramPacket(packet.data[:packet.length]) err := dgram.Write(conn.(net.Conn)) - config.LPool.Put(e.data[:]) + config.LPool.Put(packet.data[:]) if err != nil { - plog.G(context.Background()).Errorf("[TUN] Failed to write TCP %s <- %s : %s", conn.(net.Conn).RemoteAddr(), dgram.Addr(), err) + plog.G(context.Background()).Errorf("[TUN] Failed to write TCP %s <- %s : %s", conn.(net.Conn).RemoteAddr(), conn.(net.Conn).LocalAddr(), err) p.sendErr(err) return } } else { - plog.G(context.Background()).Errorf("[TUN] No route for src: %s -> dst: %s, drop it", e.src, e.dst) - config.LPool.Put(e.data[:]) + plog.G(context.Background()).Errorf("[TUN] No route for src: %s -> dst: %s, drop it", packet.src, packet.dst) + config.LPool.Put(packet.data[:]) } } } -func (p *Peer) Start() { - go p.readFromConn() - go p.readFromTCPConn() - go p.routePeer() - go p.routeTUN() -} - func (p *Peer) Close() { p.conn.Close() } - -func transportTunServer(ctx context.Context, tunInbound <-chan *DataElem, tunOutbound chan<- *DataElem, packetConn net.PacketConn, routeMapUDP *RouteMap, routeMapTCP *sync.Map) error { - p := &Peer{ - conn: packetConn, - connInbound: make(chan *udpElem, MaxSize), - tunInbound: tunInbound, - tunOutbound: tunOutbound, - routeMapUDP: routeMapUDP, - routeMapTCP: routeMapTCP, - errChan: make(chan error, 2), - } - - defer p.Close() - p.Start() - - select { - case err := <-p.errChan: - plog.G(ctx).Errorf(err.Error()) - return err - case <-ctx.Done(): - return nil - } -} diff --git a/pkg/core/tunhandlerclient.go b/pkg/core/tunhandlerclient.go index 85c52ac1..a2ee533f 100644 --- a/pkg/core/tunhandlerclient.go +++ b/pkg/core/tunhandlerclient.go @@ -13,56 +13,66 @@ import ( "github.com/wencaiwulue/kubevpn/v2/pkg/util" ) -func (h *tunHandler) HandleClient(ctx context.Context, tun net.Conn) { - defer tun.Close() - remoteAddr, err := net.ResolveUDPAddr("udp", h.node.Remote) - if err != nil { - plog.G(ctx).Errorf("[TUN-CLIENT] Failed to resolve udp addr %s: %v", h.node.Remote, err) - return - } - in := make(chan *DataElem, MaxSize) - out := make(chan *DataElem, MaxSize) - defer util.SafeClose(in) - defer util.SafeClose(out) - - d := &ClientDevice{ +func (h *tunHandler) HandleClient(ctx context.Context, tun net.Conn, remoteAddr *net.UDPAddr) { + device := &ClientDevice{ tun: tun, - tunInbound: in, - tunOutbound: out, - chExit: h.chExit, + tunInbound: make(chan *Packet, MaxSize), + tunOutbound: make(chan *Packet, MaxSize), + errChan: h.errChan, } - d.SetTunInboundHandler(func(tunInbound <-chan *DataElem, tunOutbound chan<- *DataElem) { - for ctx.Err() == nil { - packetConn, err := getRemotePacketConn(ctx, h.chain) - if err != nil { - plog.G(ctx).Debugf("[TUN-CLIENT] Failed to get remote conn from %s -> %s: %s", tun.LocalAddr(), remoteAddr, err) - time.Sleep(time.Millisecond * 200) - continue - } - err = transportTunClient(ctx, tunInbound, tunOutbound, packetConn, remoteAddr) - if err != nil { - plog.G(ctx).Debugf("[TUN-CLIENT] %s: %v", tun.LocalAddr(), err) - } - } - }) - d.Start(ctx) + defer device.Close() + go device.forwardPacketToRemote(ctx, remoteAddr, h.forward) + go device.readFromTun() + go device.writeToTun() + go heartbeats(ctx, device.tun) + select { + case <-device.errChan: + case <-ctx.Done(): + } } -func getRemotePacketConn(ctx context.Context, chain *Chain) (packetConn net.PacketConn, err error) { +type ClientDevice struct { + tun net.Conn + tunInbound chan *Packet + tunOutbound chan *Packet + errChan chan error + + remote *net.UDPAddr + forward *Forwarder +} + +func (d *ClientDevice) forwardPacketToRemote(ctx context.Context, remoteAddr *net.UDPAddr, forward *Forwarder) { + for ctx.Err() == nil { + func() { + packetConn, err := getRemotePacketConn(ctx, forward) + if err != nil { + plog.G(ctx).Debugf("[TUN-CLIENT] Failed to get remote conn from %s -> %s: %s", d.tun.LocalAddr(), remoteAddr, err) + time.Sleep(time.Millisecond * 200) + return + } + err = transportTunPacketClient(ctx, d.tunInbound, d.tunOutbound, packetConn, remoteAddr) + if err != nil { + plog.G(ctx).Debugf("[TUN-CLIENT] %s: %v", d.tun.LocalAddr(), err) + } + }() + } +} + +func getRemotePacketConn(ctx context.Context, forwarder *Forwarder) (packetConn net.PacketConn, err error) { defer func() { if err != nil && packetConn != nil { _ = packetConn.Close() } }() - if !chain.IsEmpty() { - var cc net.Conn - cc, err = chain.DialContext(ctx) + if !forwarder.IsEmpty() { + var conn net.Conn + conn, err = forwarder.DialContext(ctx) if err != nil { return } var ok bool - if packetConn, ok = cc.(net.PacketConn); !ok { + if packetConn, ok = conn.(net.PacketConn); !ok { err = errors.New("not a packet connection") return } @@ -76,20 +86,21 @@ func getRemotePacketConn(ctx context.Context, chain *Chain) (packetConn net.Pack return } -func transportTunClient(ctx context.Context, tunInbound <-chan *DataElem, tunOutbound chan<- *DataElem, packetConn net.PacketConn, remoteAddr net.Addr) error { +func transportTunPacketClient(ctx context.Context, tunInbound <-chan *Packet, tunOutbound chan<- *Packet, packetConn net.PacketConn, remoteAddr net.Addr) error { errChan := make(chan error, 2) defer packetConn.Close() go func() { defer util.HandleCrash() - for e := range tunInbound { - if e.src.Equal(e.dst) { - util.SafeWrite(tunOutbound, e) + for packet := range tunInbound { + if packet.src.Equal(packet.dst) { + util.SafeWrite(tunOutbound, packet) continue } - _, err := packetConn.WriteTo(e.data[:e.length], remoteAddr) - config.LPool.Put(e.data[:]) + _, err := packetConn.WriteTo(packet.data[:packet.length], remoteAddr) + config.LPool.Put(packet.data[:]) if err != nil { + plog.G(ctx).Errorf("failed to write packet to remote %s: %v", remoteAddr, err) util.SafeWrite(errChan, errors.Wrap(err, fmt.Sprintf("failed to write packet to remote %s", remoteAddr))) return } @@ -106,7 +117,7 @@ func transportTunClient(ctx context.Context, tunInbound <-chan *DataElem, tunOut util.SafeWrite(errChan, errors.Wrap(err, fmt.Sprintf("failed to read packet from remote %s", remoteAddr))) return } - util.SafeWrite(tunOutbound, &DataElem{data: buf[:], length: n}) + util.SafeWrite(tunOutbound, &Packet{data: buf[:], length: n}) } }() @@ -118,54 +129,22 @@ func transportTunClient(ctx context.Context, tunInbound <-chan *DataElem, tunOut } } -type ClientDevice struct { - tun net.Conn - tunInbound chan *DataElem - tunOutbound chan *DataElem - // your main logic - tunInboundHandler func(tunInbound <-chan *DataElem, tunOutbound chan<- *DataElem) - chExit chan error -} - -func (d *ClientDevice) Start(ctx context.Context) { - go d.tunInboundHandler(d.tunInbound, d.tunOutbound) - go heartbeats(ctx, d.tun) - go d.readFromTun() - go d.writeToTun() - - select { - case err := <-d.chExit: - plog.G(ctx).Errorf("[TUN-CLIENT]: %v", err) - return - case <-ctx.Done(): - return - } -} - -func (d *ClientDevice) SetTunInboundHandler(handler func(tunInbound <-chan *DataElem, tunOutbound chan<- *DataElem)) { - d.tunInboundHandler = handler -} - func (d *ClientDevice) readFromTun() { defer util.HandleCrash() for { buf := config.LPool.Get().([]byte)[:] n, err := d.tun.Read(buf[:]) if err != nil { - util.SafeWrite(d.chExit, err) + util.SafeWrite(d.errChan, err) config.LPool.Put(buf[:]) return } - if n == 0 { - config.LPool.Put(buf[:]) - continue - } // Try to determine network protocol number, default zero. var src, dst net.IP src, dst, err = util.ParseIP(buf[:n]) if err != nil { - plog.G(context.Background()).Debugf("[TUN-GVISOR] Unknown packet: %v", err) + plog.G(context.Background()).Errorf("[TUN-RAW] Unknown packet: %v", err) config.LPool.Put(buf[:]) continue } @@ -180,8 +159,47 @@ func (d *ClientDevice) writeToTun() { _, err := d.tun.Write(e.data[:e.length]) config.LPool.Put(e.data[:]) if err != nil { - util.SafeWrite(d.chExit, err) + util.SafeWrite(d.errChan, err) return } } } + +func (d *ClientDevice) Close() { + d.tun.Close() + util.SafeClose(d.tunInbound) + util.SafeClose(d.tunOutbound) +} + +func heartbeats(ctx context.Context, tun net.Conn) { + tunIfi, err := util.GetTunDeviceByConn(tun) + if err != nil { + plog.G(ctx).Errorf("Failed to get tun device: %s", err.Error()) + return + } + srcIPv4, srcIPv6, dockerSrcIPv4, err := util.GetTunDeviceIP(tunIfi.Name) + if err != nil { + return + } + + ticker := time.NewTicker(time.Second * 60) + defer ticker.Stop() + + for ; true; <-ticker.C { + select { + case <-ctx.Done(): + return + default: + } + + if srcIPv4 != nil { + util.Ping(ctx, srcIPv4.String(), config.RouterIP.String()) + } + if srcIPv6 != nil { + util.Ping(ctx, srcIPv6.String(), config.RouterIP6.String()) + } + if dockerSrcIPv4 != nil { + util.Ping(ctx, dockerSrcIPv4.String(), config.DockerRouterIP.String()) + } + } +} diff --git a/pkg/core/udpovertcp.go b/pkg/core/udpovertcp.go index 60859543..4aeafdc9 100644 --- a/pkg/core/udpovertcp.go +++ b/pkg/core/udpovertcp.go @@ -2,65 +2,51 @@ package core import ( "encoding/binary" - "fmt" "io" - "net" "github.com/wencaiwulue/kubevpn/v2/pkg/config" ) -type datagramPacket struct { +type DatagramPacket struct { DataLength uint16 // [2]byte Data []byte // []byte } -func (addr *datagramPacket) String() string { - if addr == nil { - return "" - } - return fmt.Sprintf("DataLength: %d, Data: %v\n", addr.DataLength, addr.Data) -} - -func newDatagramPacket(data []byte) (r *datagramPacket) { - return &datagramPacket{ +func newDatagramPacket(data []byte) (r *DatagramPacket) { + return &DatagramPacket{ DataLength: uint16(len(data)), Data: data, } } -func (addr *datagramPacket) Addr() net.Addr { - var server8422, _ = net.ResolveUDPAddr("udp", "127.0.0.1:8422") - return server8422 -} - -func readDatagramPacket(r io.Reader, b []byte) (*datagramPacket, error) { +func readDatagramPacket(r io.Reader, b []byte) (*DatagramPacket, error) { _, err := io.ReadFull(r, b[:2]) if err != nil { return nil, err } dataLength := binary.BigEndian.Uint16(b[:2]) _, err = io.ReadFull(r, b[:dataLength]) - if err != nil /*&& (err != io.ErrUnexpectedEOF || err != io.EOF)*/ { + if err != nil { return nil, err } - return &datagramPacket{DataLength: dataLength, Data: b[:dataLength]}, nil + return &DatagramPacket{DataLength: dataLength, Data: b[:dataLength]}, nil } // this method will return all byte array in the way: b[:] -func readDatagramPacketServer(r io.Reader, b []byte) (*datagramPacket, error) { +func readDatagramPacketServer(r io.Reader, b []byte) (*DatagramPacket, error) { _, err := io.ReadFull(r, b[:2]) if err != nil { return nil, err } dataLength := binary.BigEndian.Uint16(b[:2]) _, err = io.ReadFull(r, b[:dataLength]) - if err != nil /*&& (err != io.ErrUnexpectedEOF || err != io.EOF)*/ { + if err != nil { return nil, err } - return &datagramPacket{DataLength: dataLength, Data: b[:]}, nil + return &DatagramPacket{DataLength: dataLength, Data: b[:]}, nil } -func (addr *datagramPacket) Write(w io.Writer) error { +func (addr *DatagramPacket) Write(w io.Writer) error { buf := config.LPool.Get().([]byte)[:] defer config.LPool.Put(buf[:]) binary.BigEndian.PutUint16(buf[:2], uint16(len(addr.Data))) diff --git a/pkg/daemon/action/sshdaemon.go b/pkg/daemon/action/sshdaemon.go index 2545e7d6..d4b1fe86 100644 --- a/pkg/daemon/action/sshdaemon.go +++ b/pkg/daemon/action/sshdaemon.go @@ -2,15 +2,16 @@ package action import ( "context" - plog "github.com/wencaiwulue/kubevpn/v2/pkg/log" "net" "sync" "github.com/containernetworking/cni/pkg/types" + "github.com/wencaiwulue/kubevpn/v2/pkg/config" "github.com/wencaiwulue/kubevpn/v2/pkg/core" "github.com/wencaiwulue/kubevpn/v2/pkg/daemon/rpc" "github.com/wencaiwulue/kubevpn/v2/pkg/handler" + plog "github.com/wencaiwulue/kubevpn/v2/pkg/log" "github.com/wencaiwulue/kubevpn/v2/pkg/tun" "github.com/wencaiwulue/kubevpn/v2/pkg/util" ) @@ -44,7 +45,7 @@ func (svr *Server) SshStart(ctx context.Context, req *rpc.SshStartRequest) (resp }() r := core.Route{ - ServeNodes: []string{ + Listeners: []string{ "tun://127.0.0.1:8422?net=" + DefaultServerIP, "tcp://:10800", }, diff --git a/pkg/daemon/handler/ssh.go b/pkg/daemon/handler/ssh.go index c2eecd9b..0baa17c5 100644 --- a/pkg/daemon/handler/ssh.go +++ b/pkg/daemon/handler/ssh.go @@ -130,10 +130,10 @@ func (w *wsHandler) createTwoWayTUNTunnel(ctx context.Context, cli *ssh.Client) w.PrintLine(msg) w.cidr = append(w.cidr, string(serverIP)) r := core.Route{ - ServeNodes: []string{ + Listeners: []string{ fmt.Sprintf("tun:/127.0.0.1:8422?net=%s&route=%s", clientIP, strings.Join(w.cidr, ",")), }, - ChainNode: fmt.Sprintf("tcp://127.0.0.1:%d", localPort), + Forwarder: fmt.Sprintf("tcp://127.0.0.1:%d", localPort), Retries: 5, } servers, err := handler.Parse(r) diff --git a/pkg/handler/connect.go b/pkg/handler/connect.go index a8c22fdc..c1636fb2 100644 --- a/pkg/handler/connect.go +++ b/pkg/handler/connect.go @@ -416,18 +416,18 @@ func (c *ConnectOptions) startLocalTunServer(ctx context.Context, forwardAddress return err } - chainNode, err := core.ParseNode(forwardAddress) + forward, err := core.ParseNode(forwardAddress) if err != nil { plog.G(ctx).Errorf("Failed to parse forward node %s: %v", forwardAddress, err) return err } - chainNode.Client = &core.Client{ - Connector: core.UDPOverTCPTunnelConnector(), + forward.Client = &core.Client{ + Connector: core.NewUDPOverTCPConnector(), Transporter: core.TCPTransporter(), } - chain := core.NewChain(5, chainNode) + forwarder := core.NewForwarder(5, forward) - handler := core.TunHandler(chain, node) + handler := core.TunHandler(forwarder, node) listener, err := tun.Listener(tunConfig) if err != nil { plog.G(ctx).Errorf("Failed to create tun listener: %v", err) @@ -703,7 +703,7 @@ func Parse(r core.Route) ([]core.Server, error) { return nil, err } if len(servers) == 0 { - return nil, fmt.Errorf("server is empty, server config: %s", strings.Join(r.ServeNodes, ",")) + return nil, fmt.Errorf("server is empty, server config: %s", strings.Join(r.Listeners, ",")) } return servers, nil } @@ -772,7 +772,7 @@ func (c *ConnectOptions) getCIDR(ctx context.Context, m *dhcp.Manager) (err erro } } if len(c.cidrs) != 0 { - plog.G(ctx).Infoln("Got network CIDR from cache") + plog.G(ctx).Infoln("Get network CIDR from cache") return nil } } diff --git a/pkg/handler/remote.go b/pkg/handler/remote.go index 475f7643..d495ef26 100644 --- a/pkg/handler/remote.go +++ b/pkg/handler/remote.go @@ -365,21 +365,19 @@ func genDeploySpec(namespace string, udp8422 string, tcp10800 string, tcp9002 st Args: []string{util.If( gvisor, ` -kubevpn server -L "tcp://:10800" -L "gtcp://:10801" -L "gudp://:10802" --debug=true`, +kubevpn server -l "tcp://:10800" -l "gtcp://:10801" -l "gudp://:10802" --debug=true`, ` sysctl -w net.ipv4.ip_forward=1 sysctl -w net.ipv6.conf.all.disable_ipv6=0 sysctl -w net.ipv6.conf.all.forwarding=1 update-alternatives --set iptables /usr/sbin/iptables-legacy -iptables -F -ip6tables -F iptables -P INPUT ACCEPT ip6tables -P INPUT ACCEPT iptables -P FORWARD ACCEPT ip6tables -P FORWARD ACCEPT iptables -t nat -A POSTROUTING -s ${CIDR4} -o eth0 -j MASQUERADE ip6tables -t nat -A POSTROUTING -s ${CIDR6} -o eth0 -j MASQUERADE -kubevpn server -L "tcp://:10800" -L "tun://:8422?net=${TunIPv4}" -L "gtcp://:10801" -L "gudp://:10802" --debug=true`, +kubevpn server -l "tcp://:10800" -l "tun://:8422?net=${TunIPv4}&net6=${TunIPv6}" -l "gtcp://:10801" -l "gudp://:10802" --debug=true`, )}, EnvFrom: []v1.EnvFromSource{{ SecretRef: &v1.SecretEnvSource{ diff --git a/pkg/inject/controller.go b/pkg/inject/controller.go index 0713d183..917836a5 100644 --- a/pkg/inject/controller.go +++ b/pkg/inject/controller.go @@ -51,8 +51,6 @@ sysctl -w net.ipv4.ip_forward=1 sysctl -w net.ipv6.conf.all.disable_ipv6=0 sysctl -w net.ipv6.conf.all.forwarding=1 update-alternatives --set iptables /usr/sbin/iptables-legacy -iptables -F -ip6tables -F iptables -P INPUT ACCEPT ip6tables -P INPUT ACCEPT iptables -P FORWARD ACCEPT @@ -61,7 +59,7 @@ iptables -t nat -A PREROUTING ! -p icmp ! -s 127.0.0.1 ! -d ${CIDR4} -j DNAT --t ip6tables -t nat -A PREROUTING ! -p icmp ! -s 0:0:0:0:0:0:0:1 ! -d ${CIDR6} -j DNAT --to :15006 iptables -t nat -A POSTROUTING ! -p icmp ! -s 127.0.0.1 ! -d ${CIDR4} -j MASQUERADE ip6tables -t nat -A POSTROUTING ! -p icmp ! -s 0:0:0:0:0:0:0:1 ! -d ${CIDR6} -j MASQUERADE -kubevpn server -L "tun:/localhost:8422?net=${TunIPv4}&route=${CIDR4}" -F "tcp://${TrafficManagerService}:10800"`, +kubevpn server -l "tun:/localhost:8422?net=${TunIPv4}&net6=${TunIPv6}&route=${CIDR4}" -f "tcp://${TrafficManagerService}:10800"`, }, Env: []v1.EnvVar{ { @@ -170,7 +168,7 @@ func AddEnvoyContainer(spec *v1.PodTemplateSpec, ns, nodeId string, ipv6 bool, c Image: config.Image, Command: []string{"/bin/sh", "-c"}, Args: []string{` -kubevpn server -L "ssh://:2222"`, +kubevpn server -l "ssh://:2222"`, }, Resources: v1.ResourceRequirements{ Requests: map[v1.ResourceName]resource.Quantity{ diff --git a/pkg/inject/exchange.go b/pkg/inject/exchange.go index b347298c..ba8c006d 100644 --- a/pkg/inject/exchange.go +++ b/pkg/inject/exchange.go @@ -84,8 +84,6 @@ sysctl -w net.ipv6.conf.all.disable_ipv6=0 sysctl -w net.ipv6.conf.all.forwarding=1 sysctl -w net.ipv4.conf.all.route_localnet=1 update-alternatives --set iptables /usr/sbin/iptables-legacy -iptables -F -ip6tables -F iptables -P INPUT ACCEPT ip6tables -P INPUT ACCEPT iptables -P FORWARD ACCEPT @@ -94,7 +92,7 @@ iptables -t nat -A PREROUTING ! -p icmp -j DNAT --to ${LocalTunIPv4} ip6tables -t nat -A PREROUTING ! -p icmp -j DNAT --to ${LocalTunIPv6} iptables -t nat -A POSTROUTING ! -p icmp -j MASQUERADE ip6tables -t nat -A POSTROUTING ! -p icmp -j MASQUERADE -kubevpn server -L "tun:/127.0.0.1:8422?net=${TunIPv4}&route=${CIDR4}" -F "tcp://${TrafficManagerService}:10800"`, +kubevpn server -l "tun:/127.0.0.1:8422?net=${TunIPv4}&net6=${TunIPv6}&route=${CIDR4}" -f "tcp://${TrafficManagerService}:10800"`, }, SecurityContext: &corev1.SecurityContext{ Capabilities: &corev1.Capabilities{ diff --git a/pkg/log/logger.go b/pkg/log/logger.go index 3fe73180..476ca7cc 100644 --- a/pkg/log/logger.go +++ b/pkg/log/logger.go @@ -32,7 +32,7 @@ func GetLoggerForClient(level int32, out io.Writer) *log.Logger { func InitLoggerForServer() *log.Logger { return &log.Logger{ Out: os.Stderr, - Formatter: &format{}, + Formatter: &serverFormat{}, Hooks: make(log.LevelHooks), Level: log.DebugLevel, ExitFunc: os.Exit, diff --git a/pkg/tun/tun_darwin.go b/pkg/tun/tun_darwin.go index 81d70399..7205cd18 100644 --- a/pkg/tun/tun_darwin.go +++ b/pkg/tun/tun_darwin.go @@ -76,7 +76,7 @@ func createTun(cfg Config) (conn net.Conn, itf *net.Interface, err error) { } if err = addTunRoutes(name, cfg.Routes...); err != nil { - err = pkgerr.Wrap(err, "Add tun routes failed") + err = pkgerr.Wrap(err, "Add tun device routes failed") return }