From 211c9309b230197688fbcfea4fd7db1e885f0af5 Mon Sep 17 00:00:00 2001 From: naison <895703375@qq.com> Date: Sat, 5 Jul 2025 21:43:44 +0800 Subject: [PATCH] feat: handle local conn with gvisor (#665) * feat: handle local conn with gvisor * feat: remove udp route map * feat: optimize code * feat: length * feat: works * feat: should works * feat: optimize code * feat: optimize code * feat: gudp not set remark * feat: ut * feat: set to default value 0 * feat: send reset to gvisor tcp forward request if error * feat: not need to create firewall rule on windows * feat: typo --- pkg/core/gvisorlocalstack.go | 104 +++++++++++ pkg/core/gvisorlocaltcpforwarder.go | 84 +++++++++ pkg/core/gvisorlocaltcphandler.go | 53 ++++++ pkg/core/gvisorlocaltunendpoint.go | 93 ++++++++++ pkg/core/gvisorlocaludpforwarder.go | 127 +++++++++++++ pkg/core/gvisortcpforwarder.go | 30 ++-- pkg/core/gvisortcphandler.go | 2 - pkg/core/gvisortunendpoint.go | 70 +++----- .../{packetconn.go => packetconnovertcp.go} | 0 pkg/core/route.go | 4 - pkg/core/tcphandler.go | 55 +----- pkg/core/tunhandler.go | 86 ++------- pkg/core/tunhandlerclient.go | 125 +++++++------ pkg/core/udpovertcp.go | 52 ++++++ pkg/daemon/action/quit.go | 3 - pkg/daemon/daemon.go | 2 + pkg/handler/connect.go | 33 +--- pkg/handler/function_test.go | 10 +- pkg/inject/fargate.go | 4 +- pkg/inject/mesh.go | 4 +- pkg/inject/proxy.go | 2 +- pkg/util/networkpolicy_other.go | 12 -- pkg/util/networkpolicy_windows.go | 168 ------------------ 23 files changed, 655 insertions(+), 468 deletions(-) create mode 100755 pkg/core/gvisorlocalstack.go create mode 100644 pkg/core/gvisorlocaltcpforwarder.go create mode 100644 pkg/core/gvisorlocaltcphandler.go create mode 100755 pkg/core/gvisorlocaltunendpoint.go create mode 100644 pkg/core/gvisorlocaludpforwarder.go rename pkg/core/{packetconn.go => packetconnovertcp.go} (100%) delete mode 100644 pkg/util/networkpolicy_other.go delete mode 100644 pkg/util/networkpolicy_windows.go diff --git a/pkg/core/gvisorlocalstack.go b/pkg/core/gvisorlocalstack.go new file mode 100755 index 00000000..49cd7c7d --- /dev/null +++ b/pkg/core/gvisorlocalstack.go @@ -0,0 +1,104 @@ +package core + +import ( + "context" + + "gvisor.dev/gvisor/pkg/tcpip" + "gvisor.dev/gvisor/pkg/tcpip/header" + "gvisor.dev/gvisor/pkg/tcpip/link/packetsocket" + "gvisor.dev/gvisor/pkg/tcpip/network/ipv4" + "gvisor.dev/gvisor/pkg/tcpip/network/ipv6" + "gvisor.dev/gvisor/pkg/tcpip/stack" + "gvisor.dev/gvisor/pkg/tcpip/transport/raw" + "gvisor.dev/gvisor/pkg/tcpip/transport/tcp" + "gvisor.dev/gvisor/pkg/tcpip/transport/udp" + + plog "github.com/wencaiwulue/kubevpn/v2/pkg/log" +) + +func NewLocalStack(ctx context.Context, tun stack.LinkEndpoint) *stack.Stack { + nicID := tcpip.NICID(1) + s := stack.New(stack.Options{ + NetworkProtocols: []stack.NetworkProtocolFactory{ + ipv4.NewProtocol, + ipv6.NewProtocol, + }, + TransportProtocols: []stack.TransportProtocolFactory{ + tcp.NewProtocol, + udp.NewProtocol, + }, + Clock: tcpip.NewStdClock(), + AllowPacketEndpointWrite: true, + HandleLocal: false, // if set to true, ping local ip will fail + // Enable raw sockets for users with sufficient + // privileges. + RawFactory: raw.EndpointFactory{}, + }) + // set handler for TCP UDP ICMP + s.SetTransportProtocolHandler(tcp.ProtocolNumber, LocalTCPForwarder(ctx, s)) + s.SetTransportProtocolHandler(udp.ProtocolNumber, LocalUDPForwarder(ctx, s)) + s.SetTransportProtocolHandler(header.ICMPv4ProtocolNumber, ICMPForwarder(ctx, s)) + s.SetTransportProtocolHandler(header.ICMPv6ProtocolNumber, ICMPForwarder(ctx, s)) + + s.SetRouteTable([]tcpip.Route{ + { + Destination: header.IPv4EmptySubnet, + NIC: nicID, + }, + { + Destination: header.IPv6EmptySubnet, + NIC: nicID, + }, + }) + + s.CreateNICWithOptions(nicID, packetsocket.New(tun), stack.NICOptions{ + Disabled: false, + Context: ctx, + }) + s.SetPromiscuousMode(nicID, true) + s.SetSpoofing(nicID, true) + + // Enable SACK Recovery. + { + opt := tcpip.TCPSACKEnabled(true) + if err := s.SetTransportProtocolOption(tcp.ProtocolNumber, &opt); err != nil { + plog.G(ctx).Fatalf("SetTransportProtocolOption(%d, &%T(%t)): %v", tcp.ProtocolNumber, opt, opt, err) + } + } + + // Set default TTLs as required by socket/netstack. + { + opt := tcpip.DefaultTTLOption(64) + if err := s.SetNetworkProtocolOption(ipv4.ProtocolNumber, &opt); err != nil { + plog.G(ctx).Fatalf("SetNetworkProtocolOption(%d, &%T(%d)): %v", ipv4.ProtocolNumber, opt, opt, err) + } + if err := s.SetNetworkProtocolOption(ipv6.ProtocolNumber, &opt); err != nil { + plog.G(ctx).Fatalf("SetNetworkProtocolOption(%d, &%T(%d)): %v", ipv6.ProtocolNumber, opt, opt, err) + } + } + + // Enable Receive Buffer Auto-Tuning. + { + opt := tcpip.TCPModerateReceiveBufferOption(true) + if err := s.SetTransportProtocolOption(tcp.ProtocolNumber, &opt); err != nil { + plog.G(ctx).Fatalf("SetTransportProtocolOption(%d, &%T(%t)): %v", tcp.ProtocolNumber, opt, opt, err) + } + } + + { + if err := s.SetForwardingDefaultAndAllNICs(ipv4.ProtocolNumber, true); err != nil { + plog.G(ctx).Fatalf("Set IPv4 forwarding: %v", err) + } + if err := s.SetForwardingDefaultAndAllNICs(ipv6.ProtocolNumber, true); err != nil { + plog.G(ctx).Fatalf("Set IPv6 forwarding: %v", err) + } + } + + { + option := tcpip.TCPModerateReceiveBufferOption(true) + if err := s.SetTransportProtocolOption(tcp.ProtocolNumber, &option); err != nil { + plog.G(ctx).Fatalf("Set TCP moderate receive buffer: %v", err) + } + } + return s +} diff --git a/pkg/core/gvisorlocaltcpforwarder.go b/pkg/core/gvisorlocaltcpforwarder.go new file mode 100644 index 00000000..58270aae --- /dev/null +++ b/pkg/core/gvisorlocaltcpforwarder.go @@ -0,0 +1,84 @@ +package core + +import ( + "context" + "fmt" + "io" + "net" + "time" + + "github.com/pkg/errors" + "gvisor.dev/gvisor/pkg/tcpip" + "gvisor.dev/gvisor/pkg/tcpip/adapters/gonet" + "gvisor.dev/gvisor/pkg/tcpip/stack" + "gvisor.dev/gvisor/pkg/tcpip/transport/tcp" + "gvisor.dev/gvisor/pkg/waiter" + + "github.com/wencaiwulue/kubevpn/v2/pkg/config" + plog "github.com/wencaiwulue/kubevpn/v2/pkg/log" +) + +func LocalTCPForwarder(ctx context.Context, s *stack.Stack) func(stack.TransportEndpointID, *stack.PacketBuffer) bool { + return tcp.NewForwarder(s, 0, 100000, func(request *tcp.ForwarderRequest) { + ctx = context.Background() + id := request.ID() + plog.G(ctx).Infof("[TUN-TCP] LocalPort: %d, LocalAddress: %s, RemotePort: %d, RemoteAddress %s", + id.LocalPort, id.LocalAddress.String(), id.RemotePort, id.RemoteAddress.String(), + ) + w := &waiter.Queue{} + endpoint, tErr := request.CreateEndpoint(w) + if tErr != nil { + plog.G(ctx).Errorf("[TUN-TCP] Failed to create endpoint: %v", tErr) + request.Complete(true) + return + } + defer endpoint.Close() + conn := gonet.NewTCPConn(w, endpoint) + defer conn.Close() + var err error + defer func() { + if err != nil && !errors.Is(err, io.EOF) { + request.Complete(true) + } else { + request.Complete(false) + } + }() + + // 2, dial proxy + var host string + if id.LocalAddress.To4() != (tcpip.Address{}) { + host = "127.0.0.1" + } else { + host = net.IPv6loopback.String() + } + port := fmt.Sprintf("%d", id.LocalPort) + var d = net.Dialer{Timeout: time.Second * 5} + var remote net.Conn + remote, err = d.DialContext(ctx, "tcp", net.JoinHostPort(host, port)) + if err != nil { + plog.G(ctx).Errorf("[TUN-TCP] Failed to connect addr %s: %v", net.JoinHostPort(host, port), err) + return + } + + defer remote.Close() + errChan := make(chan error, 2) + go func() { + buf := config.LPool.Get().([]byte)[:] + defer config.LPool.Put(buf[:]) + written, err2 := io.CopyBuffer(remote, conn, buf) + plog.G(ctx).Infof("[TUN-TCP] Write length %d data to remote", written) + errChan <- err2 + }() + go func() { + buf := config.LPool.Get().([]byte)[:] + defer config.LPool.Put(buf[:]) + written, err2 := io.CopyBuffer(conn, remote, buf) + plog.G(ctx).Infof("[TUN-TCP] Read length %d data from remote", written) + errChan <- err2 + }() + err = <-errChan + if err != nil && !errors.Is(err, io.EOF) { + plog.G(ctx).Errorf("[TUN-TCP] Disconnect: %s >-<: %s: %v", conn.LocalAddr(), remote.RemoteAddr(), err) + } + }).HandlePacket +} diff --git a/pkg/core/gvisorlocaltcphandler.go b/pkg/core/gvisorlocaltcphandler.go new file mode 100644 index 00000000..fcdbf850 --- /dev/null +++ b/pkg/core/gvisorlocaltcphandler.go @@ -0,0 +1,53 @@ +package core + +import ( + "context" + + "gvisor.dev/gvisor/pkg/tcpip" + "gvisor.dev/gvisor/pkg/tcpip/link/channel" + "gvisor.dev/gvisor/pkg/tcpip/link/sniffer" + "gvisor.dev/gvisor/pkg/tcpip/transport/tcp" + + "github.com/wencaiwulue/kubevpn/v2/pkg/config" + "github.com/wencaiwulue/kubevpn/v2/pkg/util" +) + +type gvisorLocalTCPHandler struct { + // read from tcp conn write to gvisor inbound + gvisorInbound <-chan *Packet + // write to tcp conn + gvisorOutbound chan<- *Packet + + outbound chan<- *Packet + errChan chan error +} + +func handleGvisorPacket(gvisorInbound <-chan *Packet, outbound chan<- *Packet) *gvisorLocalTCPHandler { + return &gvisorLocalTCPHandler{ + gvisorInbound: gvisorInbound, + outbound: outbound, + errChan: make(chan error, 1), + } +} + +func (h *gvisorLocalTCPHandler) Run(ctx context.Context) { + endpoint := channel.New(tcp.DefaultReceiveBufferSize, uint32(config.DefaultMTU), tcpip.GetRandMacAddr()) + go func() { + defer util.HandleCrash() + readFromGvisorInboundWriteToEndpoint(ctx, h.gvisorInbound, endpoint) + util.SafeClose(h.errChan) + }() + go func() { + defer util.HandleCrash() + readFromEndpointWriteToTun(ctx, endpoint, h.outbound) + util.SafeClose(h.errChan) + }() + stack := NewLocalStack(ctx, sniffer.NewWithPrefix(endpoint, "[gVISOR] ")) + defer stack.Destroy() + select { + case <-h.errChan: + return + case <-ctx.Done(): + return + } +} diff --git a/pkg/core/gvisorlocaltunendpoint.go b/pkg/core/gvisorlocaltunendpoint.go new file mode 100755 index 00000000..55a3b04d --- /dev/null +++ b/pkg/core/gvisorlocaltunendpoint.go @@ -0,0 +1,93 @@ +package core + +import ( + "context" + "net" + + "github.com/google/gopacket/layers" + "golang.org/x/net/ipv4" + "golang.org/x/net/ipv6" + "gvisor.dev/gvisor/pkg/buffer" + "gvisor.dev/gvisor/pkg/tcpip" + "gvisor.dev/gvisor/pkg/tcpip/header" + "gvisor.dev/gvisor/pkg/tcpip/link/channel" + "gvisor.dev/gvisor/pkg/tcpip/link/sniffer" + "gvisor.dev/gvisor/pkg/tcpip/stack" + + "github.com/wencaiwulue/kubevpn/v2/pkg/config" + plog "github.com/wencaiwulue/kubevpn/v2/pkg/log" + "github.com/wencaiwulue/kubevpn/v2/pkg/util" +) + +func readFromEndpointWriteToTun(ctx context.Context, endpoint *channel.Endpoint, out chan<- *Packet) { + for ctx.Err() == nil { + pktBuffer := endpoint.ReadContext(ctx) + if pktBuffer != nil { + sniffer.LogPacket("[gVISOR] ", sniffer.DirectionSend, pktBuffer.NetworkProtocolNumber, pktBuffer) + data := pktBuffer.ToView().AsSlice() + buf := config.LPool.Get().([]byte)[:] + n := copy(buf[1:], data) + buf[0] = 0 + out <- NewPacket(buf[:], n+1, nil, nil) + } + } +} + +func readFromGvisorInboundWriteToEndpoint(ctx context.Context, in <-chan *Packet, endpoint *channel.Endpoint) { + for ctx.Err() == nil { + var packet *Packet + select { + case packet = <-in: + if packet == nil { + return + } + case <-ctx.Done(): + return + } + + // Try to determine network protocol number, default zero. + var protocol tcpip.NetworkProtocolNumber + var ipProtocol int + var src, dst net.IP + // TUN interface with IFF_NO_PI enabled, thus + // we need to determine protocol from version field + if util.IsIPv4(packet.data[1:packet.length]) { + protocol = header.IPv4ProtocolNumber + ipHeader, err := ipv4.ParseHeader(packet.data[1:packet.length]) + if err != nil { + plog.G(ctx).Errorf("Failed to parse IPv4 header: %v", err) + config.LPool.Put(packet.data[:]) + continue + } + ipProtocol = ipHeader.Protocol + src = ipHeader.Src + dst = ipHeader.Dst + } else if util.IsIPv6(packet.data[1:packet.length]) { + protocol = header.IPv6ProtocolNumber + ipHeader, err := ipv6.ParseHeader(packet.data[1:packet.length]) + if err != nil { + plog.G(ctx).Errorf("[TCP-GVISOR] Failed to parse IPv6 header: %s", err.Error()) + config.LPool.Put(packet.data[:]) + continue + } + ipProtocol = ipHeader.NextHeader + src = ipHeader.Src + dst = ipHeader.Dst + } else { + plog.G(ctx).Errorf("[TCP-GVISOR] Unknown packet") + config.LPool.Put(packet.data[:]) + continue + } + + ipProto := layers.IPProtocol(ipProtocol) + pkt := stack.NewPacketBuffer(stack.PacketBufferOptions{ + ReserveHeaderBytes: 0, + Payload: buffer.MakeWithData(packet.data[1:packet.length]), + }) + config.LPool.Put(packet.data[:]) + sniffer.LogPacket("[gVISOR] ", sniffer.DirectionRecv, protocol, pkt) + endpoint.InjectInbound(protocol, pkt) + pkt.DecRef() + plog.G(ctx).Debugf("[TCP-GVISOR] Write to Gvisor. SRC: %s, DST: %s, Protocol: %s, Length: %d", src, dst, ipProto.String(), packet.length) + } +} diff --git a/pkg/core/gvisorlocaludpforwarder.go b/pkg/core/gvisorlocaludpforwarder.go new file mode 100644 index 00000000..62556445 --- /dev/null +++ b/pkg/core/gvisorlocaludpforwarder.go @@ -0,0 +1,127 @@ +package core + +import ( + "context" + "io" + "net" + "time" + + "github.com/pkg/errors" + "gvisor.dev/gvisor/pkg/tcpip" + "gvisor.dev/gvisor/pkg/tcpip/adapters/gonet" + "gvisor.dev/gvisor/pkg/tcpip/stack" + "gvisor.dev/gvisor/pkg/tcpip/transport/udp" + "gvisor.dev/gvisor/pkg/waiter" + + "github.com/wencaiwulue/kubevpn/v2/pkg/config" + plog "github.com/wencaiwulue/kubevpn/v2/pkg/log" + "github.com/wencaiwulue/kubevpn/v2/pkg/util" +) + +func LocalUDPForwarder(ctx context.Context, s *stack.Stack) func(id stack.TransportEndpointID, pkt *stack.PacketBuffer) bool { + return udp.NewForwarder(s, func(request *udp.ForwarderRequest) { + id := request.ID() + plog.G(ctx).Infof("[TUN-UDP] LocalPort: %d, LocalAddress: %s, RemotePort: %d, RemoteAddress %s", + id.LocalPort, id.LocalAddress.String(), id.RemotePort, id.RemoteAddress.String(), + ) + src := &net.UDPAddr{ + IP: id.RemoteAddress.AsSlice(), + Port: int(id.RemotePort), + } + var ip net.IP + if id.LocalAddress.To4() != (tcpip.Address{}) { + ip = net.ParseIP("127.0.0.1") + } else { + ip = net.IPv6loopback + } + dst := &net.UDPAddr{ + IP: ip, + Port: int(id.LocalPort), + } + + w := &waiter.Queue{} + endpoint, tErr := request.CreateEndpoint(w) + if tErr != nil { + plog.G(ctx).Errorf("[TUN-UDP] Failed to create endpoint to dst: %s: %v", dst.String(), tErr) + return + } + + // dial dst + remote, err1 := net.DialUDP("udp", nil, dst) + if err1 != nil { + plog.G(ctx).Errorf("[TUN-UDP] Failed to connect dst: %s: %v", dst.String(), err1) + return + } + + conn := gonet.NewUDPConn(w, endpoint) + go func() { + defer conn.Close() + defer remote.Close() + errChan := make(chan error, 2) + go func() { + defer util.HandleCrash() + buf := config.LPool.Get().([]byte)[:] + defer config.LPool.Put(buf[:]) + + var written int + var err error + for { + err = conn.SetReadDeadline(time.Now().Add(time.Second * 120)) + if err != nil { + break + } + var read int + read, _, err = conn.ReadFrom(buf[:]) + if err != nil { + break + } + written += read + err = remote.SetWriteDeadline(time.Now().Add(time.Second * 120)) + if err != nil { + break + } + _, err = remote.Write(buf[:read]) + if err != nil { + break + } + } + plog.G(ctx).Infof("[TUN-UDP] Write length %d data from src: %s -> dst: %s", written, src, dst) + errChan <- err + }() + go func() { + defer util.HandleCrash() + buf := config.LPool.Get().([]byte)[:] + defer config.LPool.Put(buf[:]) + + var err error + var written int + for { + err = remote.SetReadDeadline(time.Now().Add(time.Second * 120)) + if err != nil { + break + } + var n int + n, _, err = remote.ReadFromUDP(buf[:]) + if err != nil { + break + } + written += n + err = conn.SetWriteDeadline(time.Now().Add(time.Second * 120)) + if err != nil { + break + } + _, err = conn.Write(buf[:n]) + if err != nil { + break + } + } + plog.G(ctx).Infof("[TUN-UDP] Read length %d data from dst: %s -> src: %s", written, dst, src) + errChan <- err + }() + err1 = <-errChan + if err1 != nil && !errors.Is(err1, io.EOF) { + plog.G(ctx).Errorf("[TUN-UDP] Disconnect: %s >-<: %s: %v", conn.LocalAddr(), remote.RemoteAddr(), err1) + } + }() + }).HandlePacket +} diff --git a/pkg/core/gvisortcpforwarder.go b/pkg/core/gvisortcpforwarder.go index 364b9d44..cded9e51 100644 --- a/pkg/core/gvisortcpforwarder.go +++ b/pkg/core/gvisortcpforwarder.go @@ -19,32 +19,38 @@ import ( func TCPForwarder(ctx context.Context, s *stack.Stack) func(stack.TransportEndpointID, *stack.PacketBuffer) bool { return tcp.NewForwarder(s, 0, 100000, func(request *tcp.ForwarderRequest) { - defer request.Complete(false) id := request.ID() plog.G(ctx).Infof("[TUN-TCP] LocalPort: %d, LocalAddress: %s, RemotePort: %d, RemoteAddress %s", id.LocalPort, id.LocalAddress.String(), id.RemotePort, id.RemoteAddress.String(), ) - + w := &waiter.Queue{} + endpoint, tErr := request.CreateEndpoint(w) + if tErr != nil { + plog.G(ctx).Errorf("[TUN-TCP] Failed to create endpoint: %v", tErr) + request.Complete(true) + return + } + conn := gonet.NewTCPConn(w, endpoint) + defer conn.Close() + var err error + defer func() { + if err != nil && !errors.Is(err, io.EOF) { + request.Complete(true) + } else { + request.Complete(false) + } + }() // 2, dial proxy host := id.LocalAddress.String() port := fmt.Sprintf("%d", id.LocalPort) var remote net.Conn var d = net.Dialer{Timeout: time.Second * 5} - remote, err := d.DialContext(ctx, "tcp", net.JoinHostPort(host, port)) + remote, err = d.DialContext(ctx, "tcp", net.JoinHostPort(host, port)) if err != nil { plog.G(ctx).Errorf("[TUN-TCP] Failed to connect addr %s: %v", net.JoinHostPort(host, port), err) return } - w := &waiter.Queue{} - endpoint, tErr := request.CreateEndpoint(w) - if tErr != nil { - plog.G(ctx).Errorf("[TUN-TCP] Failed to create endpoint: %v", tErr) - return - } - conn := gonet.NewTCPConn(w, endpoint) - - defer conn.Close() defer remote.Close() errChan := make(chan error, 2) go func() { diff --git a/pkg/core/gvisortcphandler.go b/pkg/core/gvisortcphandler.go index 52b24fd3..08906fb4 100644 --- a/pkg/core/gvisortcphandler.go +++ b/pkg/core/gvisortcphandler.go @@ -20,13 +20,11 @@ import ( type gvisorTCPHandler struct { // map[srcIP]net.Conn routeMapTCP *sync.Map - packetChan chan *Packet } func GvisorTCPHandler() Handler { return &gvisorTCPHandler{ routeMapTCP: RouteMapTCP, - packetChan: TCPPacketChan, } } diff --git a/pkg/core/gvisortunendpoint.go b/pkg/core/gvisortunendpoint.go index 2e49db52..6e0092ff 100755 --- a/pkg/core/gvisortunendpoint.go +++ b/pkg/core/gvisortunendpoint.go @@ -25,8 +25,12 @@ func (h *gvisorTCPHandler) readFromEndpointWriteToTCPConn(ctx context.Context, c pktBuffer := endpoint.ReadContext(ctx) if pktBuffer != nil { sniffer.LogPacket("[gVISOR] ", sniffer.DirectionSend, pktBuffer.NetworkProtocolNumber, pktBuffer) - buf := pktBuffer.ToView().AsSlice() - _, err := tcpConn.Write(buf) + data := pktBuffer.ToView().AsSlice() + buf := config.LPool.Get().([]byte)[:] + n := copy(buf[1:], data) + buf[0] = 0 + _, err := tcpConn.Write(buf[:n+1]) + config.LPool.Put(buf[:]) if err != nil { plog.G(ctx).Errorf("[TUN-GVISOR] Failed to write data to tun device: %v", err) } @@ -58,9 +62,9 @@ func (h *gvisorTCPHandler) readFromTCPConnWriteToEndpoint(ctx context.Context, c var src, dst net.IP // TUN interface with IFF_NO_PI enabled, thus // we need to determine protocol from version field - if util.IsIPv4(buf) { + if util.IsIPv4(buf[1:read]) { protocol = header.IPv4ProtocolNumber - ipHeader, err := ipv4.ParseHeader(buf[:read]) + ipHeader, err := ipv4.ParseHeader(buf[1:read]) if err != nil { plog.G(ctx).Errorf("Failed to parse IPv4 header: %v", err) config.LPool.Put(buf[:]) @@ -69,9 +73,9 @@ func (h *gvisorTCPHandler) readFromTCPConnWriteToEndpoint(ctx context.Context, c ipProtocol = ipHeader.Protocol src = ipHeader.Src dst = ipHeader.Dst - } else if util.IsIPv6(buf) { + } else if util.IsIPv6(buf[1:read]) { protocol = header.IPv6ProtocolNumber - ipHeader, err := ipv6.ParseHeader(buf[:read]) + ipHeader, err := ipv6.ParseHeader(buf[1:read]) if err != nil { plog.G(ctx).Errorf("[TCP-GVISOR] Failed to parse IPv6 header: %s", err.Error()) config.LPool.Put(buf[:]) @@ -91,50 +95,28 @@ func (h *gvisorTCPHandler) readFromTCPConnWriteToEndpoint(ctx context.Context, c // for issue 594, sometimes k8s service network CIDR also use CIDR 198.19.151.170 // if we can find dst in route map, just trade packet as inner communicate // if not find dst in route map, just trade packet as k8s service/pod ip - _, found := h.routeMapTCP.Load(dst.String()) - if found && (config.CIDR.Contains(dst) || config.CIDR6.Contains(dst)) { - err = h.handlePacket(ctx, buf, read, src, dst, layers.IPProtocol(ipProtocol).String()) + if c, found := h.routeMapTCP.Load(dst.String()); found { + plog.G(ctx).Debugf("[TCP-GVISOR] Find TCP route SRC: %s to DST: %s -> %s", src, dst, c.(net.Conn).RemoteAddr()) + dgram := newDatagramPacket(buf, read) + err = dgram.Write(c.(net.Conn)) + config.LPool.Put(buf[:]) if err != nil { - plog.G(ctx).Errorf("[TCP-GVISOR] Failed to handle packet: %v", err) + plog.G(ctx).Errorf("[TCP-GVISOR] Failed to write to %s <- %s : %s", c.(net.Conn).RemoteAddr(), c.(net.Conn).LocalAddr(), err) } - continue + } else { + pkt := stack.NewPacketBuffer(stack.PacketBufferOptions{ + ReserveHeaderBytes: 0, + Payload: buffer.MakeWithData(buf[1:read]), + }) + config.LPool.Put(buf[:]) + sniffer.LogPacket("[gVISOR] ", sniffer.DirectionRecv, protocol, pkt) + endpoint.InjectInbound(protocol, pkt) + pkt.DecRef() + plog.G(ctx).Debugf("[TCP-GVISOR] Write to Gvisor. SRC: %s, DST: %s, Protocol: %s, Length: %d", src, dst, layers.IPProtocol(ipProtocol).String(), read) } - - pkt := stack.NewPacketBuffer(stack.PacketBufferOptions{ - ReserveHeaderBytes: 0, - Payload: buffer.MakeWithData(buf[:read]), - }) - config.LPool.Put(buf[:]) - sniffer.LogPacket("[gVISOR] ", sniffer.DirectionRecv, protocol, pkt) - endpoint.InjectInbound(protocol, pkt) - pkt.DecRef() - plog.G(ctx).Debugf("[TCP-GVISOR] Write to Gvisor. SRC: %s, DST: %s, Protocol: %s, Length: %d", src, dst, layers.IPProtocol(ipProtocol).String(), read) } } -func (h *gvisorTCPHandler) handlePacket(ctx context.Context, buf []byte, length int, src, dst net.IP, protocol string) error { - if conn, ok := h.routeMapTCP.Load(dst.String()); ok { - plog.G(ctx).Debugf("[TCP-GVISOR] Find TCP route SRC: %s to DST: %s -> %s", src, dst, conn.(net.Conn).RemoteAddr()) - dgram := newDatagramPacket(buf, length) - err := dgram.Write(conn.(net.Conn)) - config.LPool.Put(buf[:]) - if err != nil { - plog.G(ctx).Errorf("[TCP-GVISOR] Failed to write to %s <- %s : %s", conn.(net.Conn).RemoteAddr(), conn.(net.Conn).LocalAddr(), err) - return err - } - } else if config.RouterIP.Equal(dst) || config.RouterIP6.Equal(dst) { - plog.G(ctx).Debugf("[TCP-GVISOR] Forward to TUN device, SRC: %s, DST: %s, Protocol: %s, Length: %d", src, dst, protocol, length) - util.SafeWrite(h.packetChan, NewPacket(buf[:], length, src, dst), func(v *Packet) { - config.LPool.Put(v.data[:]) - plog.G(context.Background()).Errorf("[TCP-GVISOR] Drop packet, SRC: %s, DST: %s, Protocol: %s, Length: %d", src, dst, protocol, v.length) - }) - } else { - plog.G(ctx).Warnf("[TCP-GVISOR] No route for src: %s -> dst: %s, drop it", src, dst) - config.LPool.Put(buf[:]) - } - return nil -} - func (h *gvisorTCPHandler) addToRouteMapTCP(ctx context.Context, src net.IP, tcpConn net.Conn) { value, loaded := h.routeMapTCP.LoadOrStore(src.String(), tcpConn) if loaded { diff --git a/pkg/core/packetconn.go b/pkg/core/packetconnovertcp.go similarity index 100% rename from pkg/core/packetconn.go rename to pkg/core/packetconnovertcp.go diff --git a/pkg/core/route.go b/pkg/core/route.go index 19aec976..1692847d 100644 --- a/pkg/core/route.go +++ b/pkg/core/route.go @@ -21,10 +21,6 @@ var ( TCPPacketChan = make(chan *Packet, MaxSize) ) -type TCPUDPacket struct { - data *DatagramPacket -} - // Route example: // -l "tcp://:10800" -l "tun://:8422?net=198.19.0.100/16" // -l "tun:/10.233.24.133:8422?net=198.19.0.102/16&route=198.19.0.0/16" diff --git a/pkg/core/tcphandler.go b/pkg/core/tcphandler.go index 1c5eaf18..e302b983 100644 --- a/pkg/core/tcphandler.go +++ b/pkg/core/tcphandler.go @@ -36,7 +36,7 @@ func (c *UDPOverTCPConnector) ConnectContext(ctx context.Context, conn net.Conn) return nil, err } } - return newUDPConnOverTCP(ctx, conn) + return NewUDPConnOverTCP(ctx, conn) } type UDPOverTCPHandler struct { @@ -76,7 +76,7 @@ func (h *UDPOverTCPHandler) Handle(ctx context.Context, tcpConn net.Conn) { } func (h *UDPOverTCPHandler) handlePacket(ctx context.Context, tcpConn net.Conn, datagram *DatagramPacket) error { - src, dst, protocol, err := util.ParseIP(datagram.Data[:datagram.DataLength]) + src, dst, protocol, err := util.ParseIP(datagram.Data[1:datagram.DataLength]) if err != nil { plog.G(ctx).Errorf("[TCP] Unknown packet") config.LPool.Put(datagram.Data[:]) @@ -93,9 +93,6 @@ func (h *UDPOverTCPHandler) handlePacket(ctx context.Context, tcpConn net.Conn, plog.G(ctx).Errorf("[TCP] Failed to write to %s <- %s : %s", conn.(net.Conn).RemoteAddr(), conn.(net.Conn).LocalAddr(), err) return err } - } else if (config.CIDR.Contains(dst) || config.CIDR6.Contains(dst)) && (!config.RouterIP.Equal(dst) && !config.RouterIP6.Equal(dst)) { - plog.G(ctx).Warnf("[TCP] No route for src: %s -> dst: %s, drop it", src, dst) - config.LPool.Put(datagram.Data[:]) } else { plog.G(ctx).Debugf("[TCP] Forward to TUN device, SRC: %s, DST: %s, Protocol: %s, Length: %d", src, dst, layers.IPProtocol(protocol).String(), datagram.DataLength) util.SafeWrite(h.packetChan, NewPacket(datagram.Data, int(datagram.DataLength), src, dst), func(v *Packet) { @@ -127,51 +124,3 @@ func (h *UDPOverTCPHandler) removeFromRouteMapTCP(ctx context.Context, tcpConn n return true }) } - -var _ net.Conn = (*UDPConnOverTCP)(nil) - -// UDPConnOverTCP fake udp connection over tcp connection -type UDPConnOverTCP struct { - // tcp connection - net.Conn - ctx context.Context -} - -func newUDPConnOverTCP(ctx context.Context, conn net.Conn) (net.Conn, error) { - return &UDPConnOverTCP{ctx: ctx, Conn: conn}, nil -} - -func (c *UDPConnOverTCP) Read(b []byte) (int, error) { - select { - case <-c.ctx.Done(): - return 0, c.ctx.Err() - default: - datagram, err := readDatagramPacket(c.Conn, b) - if err != nil { - return 0, err - } - return int(datagram.DataLength), nil - } -} - -func (c *UDPConnOverTCP) Write(b []byte) (int, error) { - buf := config.LPool.Get().([]byte)[:] - n := copy(buf, b) - defer config.LPool.Put(buf) - - packet := newDatagramPacket(buf, n) - if err := packet.Write(c.Conn); err != nil { - return 0, err - } - return len(b), nil -} - -func (c *UDPConnOverTCP) Close() error { - if cc, ok := c.Conn.(interface{ CloseRead() error }); ok { - _ = cc.CloseRead() - } - if cc, ok := c.Conn.(interface{ CloseWrite() error }); ok { - _ = cc.CloseWrite() - } - return c.Conn.Close() -} diff --git a/pkg/core/tunhandler.go b/pkg/core/tunhandler.go index e5770477..ba5eab43 100644 --- a/pkg/core/tunhandler.go +++ b/pkg/core/tunhandler.go @@ -19,7 +19,6 @@ const ( type tunHandler struct { forward *Forwarder node *Node - routeMapUDP *sync.Map routeMapTCP *sync.Map errChan chan error } @@ -29,7 +28,6 @@ func TunHandler(forward *Forwarder, node *Node) Handler { return &tunHandler{ forward: forward, node: node, - routeMapUDP: &sync.Map{}, routeMapTCP: RouteMapTCP, errChan: make(chan error, 1), } @@ -52,9 +50,9 @@ func (h *tunHandler) HandleServer(ctx context.Context, tun net.Conn) { } defer device.Close() - go device.readFromTUN(ctx) - go device.writeToTUN(ctx) - go device.handlePacket(ctx, h.node.Addr, h.routeMapUDP, h.routeMapTCP) + go device.readFromTun(ctx) + go device.writeToTun(ctx) + go device.handlePacket(ctx, h.routeMapTCP) select { case err := <-device.errChan: @@ -74,7 +72,7 @@ type Device struct { errChan chan error } -func (d *Device) readFromTUN(ctx context.Context) { +func (d *Device) readFromTun(ctx context.Context) { defer util.HandleCrash() for { buf := config.LPool.Get().([]byte)[:] @@ -101,10 +99,10 @@ func (d *Device) readFromTUN(ctx context.Context) { } } -func (d *Device) writeToTUN(ctx context.Context) { +func (d *Device) writeToTun(ctx context.Context) { defer util.HandleCrash() for packet := range d.tunOutbound { - _, err := d.tun.Write(packet.data[:packet.length]) + _, err := d.tun.Write(packet.data[1:packet.length]) config.LPool.Put(packet.data[:]) if err != nil { plog.G(ctx).Errorf("[TUN] Failed to write to tun device: %v", err) @@ -121,29 +119,19 @@ func (d *Device) Close() { util.SafeClose(TCPPacketChan) } -func (d *Device) handlePacket(ctx context.Context, addr string, routeMapUDP *sync.Map, routeMapTCP *sync.Map) { - packetConn, err := (&net.ListenConfig{}).ListenPacket(ctx, "udp", addr) - if err != nil { - util.SafeWrite(d.errChan, err) - plog.G(ctx).Errorf("[TUN] Failed to listen %s: %v", addr, err) - return - } - +func (d *Device) handlePacket(ctx context.Context, routeMapTCP *sync.Map) { p := &Peer{ - conn: packetConn, tunInbound: d.tunInbound, tunOutbound: d.tunOutbound, - routeMapUDP: routeMapUDP, routeMapTCP: routeMapTCP, errChan: make(chan error, 1), } - go p.readFromConn(ctx) - go p.routeTUN(ctx) + go p.routeTun(ctx) go p.routeTCPToTun(ctx) select { - case err = <-p.errChan: + case err := <-p.errChan: plog.G(ctx).Errorf("[TUN] %s: %v", d.tun.LocalAddr(), err) util.SafeWrite(d.errChan, err) return @@ -177,13 +165,9 @@ func (d *Packet) Length() int { } type Peer struct { - conn net.PacketConn - tunInbound chan *Packet tunOutbound chan<- *Packet - // map[srcIP.String()]net.Addr for udp - routeMapUDP *sync.Map // map[srcIP.String()]net.Conn for tcp routeMapTCP *sync.Map @@ -197,41 +181,6 @@ func (p *Peer) sendErr(err error) { } } -func (p *Peer) readFromConn(ctx context.Context) { - defer util.HandleCrash() - for ctx.Err() == nil { - buf := config.LPool.Get().([]byte)[:] - n, from, err := p.conn.ReadFrom(buf[:]) - if err != nil { - config.LPool.Put(buf[:]) - p.sendErr(err) - return - } - - src, dst, protocol, err := util.ParseIP(buf[:n]) - if err != nil { - config.LPool.Put(buf[:]) - plog.G(ctx).Errorf("[TUN] Unknown packet: %v", err) - p.sendErr(err) - return - } - p.addToRouteMapUDP(ctx, src, from) - plog.G(context.Background()).Errorf("[TUN] SRC: %s, DST: %s, Protocol: %s, Length: %d", src, dst, layers.IPProtocol(protocol).String(), n) - p.tunInbound <- NewPacket(buf[:], n, src, dst) - } -} - -func (p *Peer) addToRouteMapUDP(ctx context.Context, src net.IP, from net.Addr) { - if addr, loaded := p.routeMapUDP.LoadOrStore(src.String(), from); loaded { - if addr.(net.Addr).String() != from.String() { - p.routeMapUDP.Store(src.String(), from) - plog.G(ctx).Infof("[TUN] Replace route map UDP: %s -> %s", src, from) - } - } else { - plog.G(ctx).Infof("[TUN] Add new route map UDP: %s -> %s", src, from) - } -} - func (p *Peer) routeTCPToTun(ctx context.Context) { defer util.HandleCrash() for packet := range TCPPacketChan { @@ -239,21 +188,14 @@ func (p *Peer) routeTCPToTun(ctx context.Context) { } } -func (p *Peer) routeTUN(ctx context.Context) { +func (p *Peer) routeTun(ctx context.Context) { defer util.HandleCrash() for packet := range p.tunInbound { - if addr, ok := p.routeMapUDP.Load(packet.dst.String()); ok { - plog.G(ctx).Debugf("[TUN] Find UDP route to DST: %s -> %s, SRC: %s, DST: %s", packet.dst, addr, packet.src, packet.dst) - _, err := p.conn.WriteTo(packet.data[:packet.length], addr.(net.Addr)) - config.LPool.Put(packet.data[:]) - if err != nil { - plog.G(ctx).Errorf("[TUN] Failed wirte to route dst: %s -> %s", packet.dst, addr) - p.sendErr(err) - return - } - } else if conn, ok := p.routeMapTCP.Load(packet.dst.String()); ok { + if conn, ok := p.routeMapTCP.Load(packet.dst.String()); ok { plog.G(ctx).Debugf("[TUN] Find TCP route to dst: %s -> %s", packet.dst.String(), conn.(net.Conn).RemoteAddr()) - dgram := newDatagramPacket(packet.data, packet.length) + copy(packet.data[1:packet.length+1], packet.data[:packet.length]) + packet.data[0] = 0 + dgram := newDatagramPacket(packet.data, packet.length+1) err := dgram.Write(conn.(net.Conn)) config.LPool.Put(packet.data[:]) if err != nil { diff --git a/pkg/core/tunhandlerclient.go b/pkg/core/tunhandlerclient.go index fcdf91f4..6fc2c949 100644 --- a/pkg/core/tunhandlerclient.go +++ b/pkg/core/tunhandlerclient.go @@ -50,9 +50,15 @@ func (d *ClientDevice) handlePacket(ctx context.Context, forward *Forwarder) { return } defer conn.Close() - err = handlePacketClient(ctx, d.tunInbound, d.tunOutbound, conn) - if err != nil { + + errChan := make(chan error, 2) + go readFromConn(ctx, conn, d.tunInbound, d.tunOutbound, errChan) + go writeToConn(ctx, conn, d.tunInbound, errChan) + + select { + case err = <-errChan: plog.G(ctx).Errorf("Failed to transport data to remote %s: %v", conn.RemoteAddr(), err) + case <-ctx.Done(): return } }() @@ -67,104 +73,107 @@ func forwardConn(ctx context.Context, forwarder *Forwarder) (net.Conn, error) { return conn, nil } -func handlePacketClient(ctx context.Context, tunInbound <-chan *Packet, tunOutbound chan<- *Packet, conn net.Conn) error { - errChan := make(chan error, 2) - - go func() { - defer util.HandleCrash() - for packet := range tunInbound { - err := conn.SetWriteDeadline(time.Now().Add(config.KeepAliveTime)) - if err != nil { - plog.G(ctx).Errorf("Failed to set write deadline: %v", err) - util.SafeWrite(errChan, errors.Wrap(err, "failed to set write deadline")) - return - } - _, err = conn.Write(packet.data[:packet.length]) - config.LPool.Put(packet.data[:]) - if err != nil { - plog.G(ctx).Errorf("Failed to write packet to remote: %v", err) - util.SafeWrite(errChan, errors.Wrap(err, "failed to write packet to remote")) - return - } +func readFromConn(ctx context.Context, conn net.Conn, tunInbound chan *Packet, tunOutbound chan *Packet, errChan chan error) { + defer util.HandleCrash() + var gvisorInbound = make(chan *Packet, MaxSize) + go handleGvisorPacket(gvisorInbound, tunInbound).Run(ctx) + for { + buf := config.LPool.Get().([]byte)[:] + err := conn.SetReadDeadline(time.Now().Add(config.KeepAliveTime)) + if err != nil { + config.LPool.Put(buf[:]) + plog.G(ctx).Errorf("Failed to set read deadline: %v", err) + util.SafeWrite(errChan, errors.Wrap(err, "failed to set read deadline")) + return } - }() - - go func() { - defer util.HandleCrash() - for { - buf := config.LPool.Get().([]byte)[:] - err := conn.SetReadDeadline(time.Now().Add(config.KeepAliveTime)) - if err != nil { - plog.G(ctx).Errorf("Failed to set read deadline: %v", err) - util.SafeWrite(errChan, errors.Wrap(err, "failed to set read deadline")) - return - } - n, err := conn.Read(buf[:]) - if err != nil { - config.LPool.Put(buf[:]) - plog.G(ctx).Errorf("Failed to read packet from remote: %v", err) - util.SafeWrite(errChan, errors.Wrap(err, fmt.Sprintf("failed to read packet from remote %s", conn.RemoteAddr()))) - return - } - if n == 0 { - plog.G(ctx).Warnf("Packet length 0") - config.LPool.Put(buf[:]) - continue - } + n, err := conn.Read(buf[:]) + if err != nil { + config.LPool.Put(buf[:]) + plog.G(ctx).Errorf("Failed to read packet from remote: %v", err) + util.SafeWrite(errChan, errors.Wrap(err, fmt.Sprintf("failed to read packet from remote %s", conn.RemoteAddr()))) + return + } + if n < 1 { + plog.G(ctx).Warnf("Packet length 0") + config.LPool.Put(buf[:]) + continue + } + if buf[0] == 1 { + util.SafeWrite(gvisorInbound, NewPacket(buf[:], n, nil, nil), func(v *Packet) { + config.LPool.Put(v.data[:]) + plog.G(context.Background()).Errorf("Drop packet, LocalAddr: %s, Remote: %s, Length: %d", conn.LocalAddr(), conn.RemoteAddr(), v.length) + }) + } else { util.SafeWrite(tunOutbound, NewPacket(buf[:], n, nil, nil), func(v *Packet) { config.LPool.Put(v.data[:]) plog.G(context.Background()).Errorf("Drop packet, LocalAddr: %s, Remote: %s, Length: %d", conn.LocalAddr(), conn.RemoteAddr(), v.length) }) } - }() + } +} - select { - case err := <-errChan: - return err - case <-ctx.Done(): - return nil +func writeToConn(ctx context.Context, conn net.Conn, inbound <-chan *Packet, errChan chan error) { + defer util.HandleCrash() + for packet := range inbound { + err := conn.SetWriteDeadline(time.Now().Add(config.KeepAliveTime)) + if err != nil { + plog.G(ctx).Errorf("Failed to set write deadline: %v", err) + util.SafeWrite(errChan, errors.Wrap(err, "failed to set write deadline")) + return + } + _, err = conn.Write(packet.data[:packet.length]) + config.LPool.Put(packet.data[:]) + if err != nil { + plog.G(ctx).Errorf("Failed to write packet to remote: %v", err) + util.SafeWrite(errChan, errors.Wrap(err, "failed to write packet to remote")) + return + } } } func (d *ClientDevice) readFromTun(ctx context.Context) { defer util.HandleCrash() + var gvisorInbound = make(chan *Packet, MaxSize) + go handleGvisorPacket(gvisorInbound, d.tunOutbound).Run(ctx) for { buf := config.LPool.Get().([]byte)[:] - n, err := d.tun.Read(buf[:]) + n, err := d.tun.Read(buf[1:]) if err != nil { plog.G(ctx).Errorf("Failed to read packet from tun device: %s", err) util.SafeWrite(d.errChan, err) config.LPool.Put(buf[:]) return } + // local client handle it with gvisor + buf[0] = 1 // Try to determine network protocol number, default zero. var src, dst net.IP var protocol int - src, dst, protocol, err = util.ParseIP(buf[:n]) + src, dst, protocol, err = util.ParseIP(buf[1 : n+1]) if err != nil { plog.G(ctx).Errorf("Unknown packet: %v", err) config.LPool.Put(buf[:]) continue } plog.G(context.Background()).Debugf("SRC: %s, DST: %s, Protocol: %s, Length: %d", src, dst, layers.IPProtocol(protocol).String(), n) - packet := NewPacket(buf[:], n, src, dst) + packet := NewPacket(buf[:], n+1, src, dst) f := func(v *Packet) { config.LPool.Put(v.data[:]) plog.G(context.Background()).Errorf("Drop packet, SRC: %s, DST: %s, Protocol: %s, Length: %d", v.src, v.dst, layers.IPProtocol(protocol).String(), v.length) } if packet.src.Equal(packet.dst) { - util.SafeWrite(d.tunOutbound, packet, f) - continue + util.SafeWrite(gvisorInbound, packet, f) + } else { + util.SafeWrite(d.tunInbound, packet, f) } - util.SafeWrite(d.tunInbound, packet, f) } } func (d *ClientDevice) writeToTun(ctx context.Context) { defer util.HandleCrash() for packet := range d.tunOutbound { - _, err := d.tun.Write(packet.data[:packet.length]) + _, err := d.tun.Write(packet.data[1:packet.length]) config.LPool.Put(packet.data[:]) if err != nil { plog.G(ctx).Errorf("Failed to write packet to tun device: %v", err) diff --git a/pkg/core/udpovertcp.go b/pkg/core/udpovertcp.go index 89e02ac2..cd2d65b7 100644 --- a/pkg/core/udpovertcp.go +++ b/pkg/core/udpovertcp.go @@ -1,10 +1,62 @@ package core import ( + "context" "encoding/binary" "io" + "net" + + "github.com/wencaiwulue/kubevpn/v2/pkg/config" ) +func NewUDPConnOverTCP(ctx context.Context, conn net.Conn) (net.Conn, error) { + return &UDPConnOverTCP{ctx: ctx, Conn: conn}, nil +} + +var _ net.Conn = (*UDPConnOverTCP)(nil) + +// UDPConnOverTCP fake udp connection over tcp connection +type UDPConnOverTCP struct { + // tcp connection + net.Conn + ctx context.Context +} + +func (c *UDPConnOverTCP) Read(b []byte) (int, error) { + select { + case <-c.ctx.Done(): + return 0, c.ctx.Err() + default: + datagram, err := readDatagramPacket(c.Conn, b) + if err != nil { + return 0, err + } + return int(datagram.DataLength), nil + } +} + +func (c *UDPConnOverTCP) Write(b []byte) (int, error) { + buf := config.LPool.Get().([]byte)[:] + n := copy(buf, b) + defer config.LPool.Put(buf) + + packet := newDatagramPacket(buf, n) + if err := packet.Write(c.Conn); err != nil { + return 0, err + } + return len(b), nil +} + +func (c *UDPConnOverTCP) Close() error { + if cc, ok := c.Conn.(interface{ CloseRead() error }); ok { + _ = cc.CloseRead() + } + if cc, ok := c.Conn.(interface{ CloseWrite() error }); ok { + _ = cc.CloseWrite() + } + return c.Conn.Close() +} + type DatagramPacket struct { DataLength uint16 // [2]byte Data []byte // []byte diff --git a/pkg/daemon/action/quit.go b/pkg/daemon/action/quit.go index d2d33909..6a1d03e2 100644 --- a/pkg/daemon/action/quit.go +++ b/pkg/daemon/action/quit.go @@ -44,9 +44,6 @@ func (svr *Server) Quit(resp rpc.Daemon_QuitServer) error { if svr.IsSudo { _ = dns.CleanupHosts() _ = os.RemoveAll("/etc/resolver") - if util.FindAllowFirewallRule(context.Background()) { - util.DeleteAllowFirewallRule(context.Background()) - } } // last step is to quit GRPC server diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go index a8d8d422..ede20b17 100644 --- a/pkg/daemon/daemon.go +++ b/pkg/daemon/daemon.go @@ -21,6 +21,7 @@ import ( "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/reflection" "gopkg.in/natefinch/lumberjack.v2" + glog "gvisor.dev/gvisor/pkg/log" "k8s.io/client-go/rest" "k8s.io/klog/v2" @@ -61,6 +62,7 @@ func (o *SvrOption) Start(ctx context.Context) error { klog.SetOutput(l) klog.LogToStderr(false) plog.L.SetOutput(l) + glog.SetTarget(plog.ServerEmitter{Writer: &glog.Writer{Next: l}}) rest.SetDefaultWarningHandler(rest.NoWarnings{}) // every day 00:00:00 rotate log go rotateLog(l) diff --git a/pkg/handler/connect.go b/pkg/handler/connect.go index aa51d0f4..5e946c17 100644 --- a/pkg/handler/connect.go +++ b/pkg/handler/connect.go @@ -196,11 +196,11 @@ func (c *ConnectOptions) CreateRemoteInboundPod(ctx context.Context, namespace s // https://kubernetes.io/docs/concepts/workloads/pods/ephemeral-containers/ // means mesh mode if c.Engine == config.EngineGvisor { - err = inject.InjectEnvoySidecar(ctx, nodeID, c.factory, c.Namespace, object, controller, headers, portMap, image) + err = inject.InjectEnvoyAndSSH(ctx, nodeID, c.factory, c.Namespace, object, controller, headers, portMap, image) } else if len(headers) != 0 || len(portMap) != 0 { - err = inject.InjectVPNAndEnvoySidecar(ctx, nodeID, c.factory, c.Namespace, controller, configInfo, headers, portMap, tlsSecret, image) + err = inject.InjectServiceMesh(ctx, nodeID, c.factory, c.Namespace, controller, configInfo, headers, portMap, tlsSecret, image) } else { - err = inject.InjectVPNSidecar(ctx, nodeID, c.factory, c.Namespace, controller, configInfo, tlsSecret, image) + err = inject.InjectVPN(ctx, nodeID, c.factory, c.Namespace, controller, configInfo, tlsSecret, image) } if err != nil { plog.G(ctx).Errorf("Injecting inbound sidecar for %s in namespace %s failed: %s", workload, namespace, err.Error()) @@ -286,7 +286,6 @@ func (c *ConnectOptions) DoConnect(ctx context.Context, isLite bool) (err error) plog.G(ctx).Errorf("Add route dynamic failed: %v", err) return } - go c.deleteFirewallRule(c.ctx) plog.G(ctx).Infof("Configuring DNS service...") if err = c.setupDNS(c.ctx, svcInformer); err != nil { plog.G(ctx).Errorf("Configure DNS failed: %v", err) @@ -398,14 +397,6 @@ func (c *ConnectOptions) startLocalTunServer(ctx context.Context, forwardAddress var cidrList []*net.IPNet if !lite { cidrList = append(cidrList, config.CIDR, config.CIDR6) - } else { - // windows needs to add tun IP self to route table, but linux and macOS not need - if util.IsWindows() { - cidrList = append(cidrList, - &net.IPNet{IP: c.LocalTunIPv4.IP, Mask: net.CIDRMask(32, 32)}, - &net.IPNet{IP: c.LocalTunIPv6.IP, Mask: net.CIDRMask(128, 128)}, - ) - } } for _, ipNet := range c.cidrs { cidrList = append(cidrList, ipNet) @@ -646,24 +637,6 @@ func (c *ConnectOptions) addRoute(ipStrList ...string) error { return err } -func (c *ConnectOptions) deleteFirewallRule(ctx context.Context) { - if !util.IsWindows() { - return - } - // The reason why delete firewall rule is: - // On windows ping local tun IPv4/v6 not works - // so needs to add firewall rule to allow this - if !util.FindAllowFirewallRule(ctx) { - util.AddAllowFirewallRule(ctx) - } - - // The reason why delete firewall rule is: - // On windows use 'kubevpn proxy deploy/authors -H user=windows' - // Open terminal 'curl localhost:9080' ok - // Open terminal 'curl localTunIP:9080' not ok - util.DeleteBlockFirewallRule(ctx) -} - func (c *ConnectOptions) setupDNS(ctx context.Context, svcInformer cache.SharedIndexInformer) error { const portTCP = 10800 podList, err := c.GetRunningPodList(ctx) diff --git a/pkg/handler/function_test.go b/pkg/handler/function_test.go index 1f2a882f..2d09a65d 100644 --- a/pkg/handler/function_test.go +++ b/pkg/handler/function_test.go @@ -859,19 +859,19 @@ func Init(t *testing.T) { t.Fatal(err) } - go startupHttpServer(t, "9080", local) - go startupHttpServer(t, "8080", local8080) + go startupHttpServer(t, "localhost:9080", local) + go startupHttpServer(t, "localhost:8080", local8080) } -func startupHttpServer(t *testing.T, port, str string) { +func startupHttpServer(t *testing.T, addr, str string) { var health = func(w http.ResponseWriter, r *http.Request) { _, _ = w.Write([]byte(str)) } mux := http.NewServeMux() mux.HandleFunc("/", health) mux.HandleFunc("/health", health) - t.Logf("Start listening http port %s ...", port) - err := http.ListenAndServe(":"+port, mux) + t.Logf("Start listening http addr %s ...", addr) + err := http.ListenAndServe(addr, mux) if err != nil { t.Fatal(err) } diff --git a/pkg/inject/fargate.go b/pkg/inject/fargate.go index d1f0c293..463dfd71 100644 --- a/pkg/inject/fargate.go +++ b/pkg/inject/fargate.go @@ -25,9 +25,9 @@ import ( "github.com/wencaiwulue/kubevpn/v2/pkg/util" ) -// InjectEnvoySidecar patch a sidecar, using iptables to do port-forward let this pod decide should go to 233.254.254.100 or request to 127.0.0.1 +// InjectEnvoyAndSSH patch a sidecar, using iptables to do port-forward let this pod decide should go to 233.254.254.100 or request to 127.0.0.1 // https://istio.io/latest/docs/ops/deployment/requirements/#ports-used-by-istio -func InjectEnvoySidecar(ctx context.Context, nodeID string, f cmdutil.Factory, managerNamespace string, current, object *runtimeresource.Info, headers map[string]string, portMap []string, image string) (err error) { +func InjectEnvoyAndSSH(ctx context.Context, nodeID string, f cmdutil.Factory, managerNamespace string, current, object *runtimeresource.Info, headers map[string]string, portMap []string, image string) (err error) { var clientset *kubernetes.Clientset clientset, err = f.KubernetesClientSet() if err != nil { diff --git a/pkg/inject/mesh.go b/pkg/inject/mesh.go index f49a3531..ef24d60d 100644 --- a/pkg/inject/mesh.go +++ b/pkg/inject/mesh.go @@ -32,8 +32,8 @@ import ( // https://istio.io/latest/docs/ops/deployment/requirements/#ports-used-by-istio -// InjectVPNAndEnvoySidecar patch a sidecar, using iptables to do port-forward let this pod decide should go to 233.254.254.100 or request to 127.0.0.1 -func InjectVPNAndEnvoySidecar(ctx context.Context, nodeID string, f cmdutil.Factory, managerNamespace string, object *runtimeresource.Info, c util.PodRouteConfig, headers map[string]string, portMaps []string, secret *v1.Secret, image string) (err error) { +// InjectServiceMesh patch a sidecar, using iptables to do port-forward let this pod decide should go to 233.254.254.100 or request to 127.0.0.1 +func InjectServiceMesh(ctx context.Context, nodeID string, f cmdutil.Factory, managerNamespace string, object *runtimeresource.Info, c util.PodRouteConfig, headers map[string]string, portMaps []string, secret *v1.Secret, image string) (err error) { var clientset *kubernetes.Clientset clientset, err = f.KubernetesClientSet() if err != nil { diff --git a/pkg/inject/proxy.go b/pkg/inject/proxy.go index ee9a848f..25f371af 100644 --- a/pkg/inject/proxy.go +++ b/pkg/inject/proxy.go @@ -24,7 +24,7 @@ import ( util2 "github.com/wencaiwulue/kubevpn/v2/pkg/util" ) -func InjectVPNSidecar(ctx context.Context, nodeID string, f util.Factory, managerNamespace string, object *resource.Info, c util2.PodRouteConfig, secret *v1.Secret, image string) error { +func InjectVPN(ctx context.Context, nodeID string, f util.Factory, managerNamespace string, object *resource.Info, c util2.PodRouteConfig, secret *v1.Secret, image string) error { u := object.Object.(*unstructured.Unstructured) podTempSpec, path, err := util2.GetPodTemplateSpecPath(u) diff --git a/pkg/util/networkpolicy_other.go b/pkg/util/networkpolicy_other.go deleted file mode 100644 index 85240b4d..00000000 --- a/pkg/util/networkpolicy_other.go +++ /dev/null @@ -1,12 +0,0 @@ -//go:build !windows - -package util - -import ( - "context" -) - -func DeleteBlockFirewallRule(ctx context.Context) {} -func DeleteAllowFirewallRule(ctx context.Context) {} -func FindAllowFirewallRule(ctx context.Context) bool { return false } -func AddAllowFirewallRule(ctx context.Context) {} diff --git a/pkg/util/networkpolicy_windows.go b/pkg/util/networkpolicy_windows.go deleted file mode 100644 index 8ada1342..00000000 --- a/pkg/util/networkpolicy_windows.go +++ /dev/null @@ -1,168 +0,0 @@ -//go:build windows - -package util - -import ( - "context" - "os/exec" - "strings" - "syscall" - "time" - - "golang.org/x/text/encoding/simplifiedchinese" - - "github.com/wencaiwulue/kubevpn/v2/pkg/config" - plog "github.com/wencaiwulue/kubevpn/v2/pkg/log" -) - -/** -When startup an app listen 0.0.0.0 on Windows - -Windows Security Alert -[x] Private networks,such as my home or work network -[ ] Public networks, such as those in airports and coffee shops (not recommended because these networks often have little or no security) - -if not select the second options, Windows add a firewall rule like: - -Get-NetFirewallRule -Direction Inbound -Action Block | Sort-Object -Property Priority - -Name : {9127CE75-0943-4877-B797-1316948CDCA8} -DisplayName : ___go_build_authors.exe -Description : ___go_build_authors.exe -DisplayGroup : -Group : -Enabled : True -Profile : Public -Platform : {} -Direction : Inbound -Action : Block -EdgeTraversalPolicy : Block -LooseSourceMapping : False -LocalOnlyMapping : False -Owner : -PrimaryStatus : OK -Status : The rule was parsed successfully from the store. (65536) -EnforcementStatus : NotApplicable -PolicyStoreSource : PersistentStore -PolicyStoreSourceType : Local -RemoteDynamicKeywordAddresses : -PolicyAppId : - -this makes tunIP can not access local service, so we need to delete this rule -*/ -// DeleteBlockFirewallRule Delete all action block firewall rule -func DeleteBlockFirewallRule(ctx context.Context) { - var deleteFirewallBlockRule = func() { - // PowerShell Remove-NetFirewallRule -Action Block - cmd := exec.CommandContext(ctx, "PowerShell", []string{"Remove-NetFirewallRule", "-Action", "Block"}...) - cmd.SysProcAttr = &syscall.SysProcAttr{HideWindow: true} - _, _ = cmd.CombinedOutput() - /*if err != nil && out != nil { - s := string(out) - var b []byte - if b, err = decode(out); err == nil { - s = string(b) - } - plog.G(ctx).Debugf("failed to delete firewall rule: %v", s) - }*/ - } - - deleteFirewallBlockRule() - - ticker := time.NewTicker(time.Second * 10) - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - deleteFirewallBlockRule() - } - } -} - -func decode(in []byte) ([]byte, error) { - out, err := simplifiedchinese.GB18030.NewDecoder().Bytes(in) - if err == nil { - return out, err - } - out, err = simplifiedchinese.GBK.NewDecoder().Bytes(in) - if err == nil { - return out, err - } - return nil, err -} - -// AddAllowFirewallRule -// for ping local tun device ip, if not add this firewall, can not ping local tun IP on windows -func AddAllowFirewallRule(ctx context.Context) { - // netsh advfirewall firewall add rule name=kubevpn-traffic-manager dir=in action=allow enable=yes remoteip=198.19.0.100/16,2001:2::9999/64,LocalSubnet - cmd := exec.CommandContext(ctx, "netsh", []string{ - "advfirewall", - "firewall", - "add", - "rule", - "name=" + config.ConfigMapPodTrafficManager, - "dir=in", - "action=allow", - "enable=yes", - "remoteip=" + strings.Join([]string{config.CIDR.String(), config.CIDR6.String(), config.DockerCIDR.String(), "LocalSubnet"}, ","), - }...) - cmd.SysProcAttr = &syscall.SysProcAttr{HideWindow: true} - if out, err := cmd.CombinedOutput(); err != nil { - var s string - var b []byte - if b, err = decode(out); err == nil { - s = string(b) - } else { - s = string(out) - } - plog.G(ctx).Infof("Failed to exec command: %s, output: %s", cmd.Args, s) - } -} - -func DeleteAllowFirewallRule(ctx context.Context) { - // netsh advfirewall firewall delete rule name=kubevpn-traffic-manager - cmd := exec.CommandContext(ctx, "netsh", []string{ - "advfirewall", - "firewall", - "delete", - "rule", - "name=" + config.ConfigMapPodTrafficManager, - }...) - cmd.SysProcAttr = &syscall.SysProcAttr{HideWindow: true} - if out, err := cmd.CombinedOutput(); err != nil { - var s string - var b []byte - if b, err = decode(out); err == nil { - s = string(b) - } else { - s = string(out) - } - plog.G(ctx).Errorf("Failed to exec command: %s, output: %s", cmd.Args, s) - } -} - -func FindAllowFirewallRule(ctx context.Context) bool { - // netsh advfirewall firewall show rule name=kubevpn-traffic-manager - cmd := exec.CommandContext(ctx, "netsh", []string{ - "advfirewall", - "firewall", - "show", - "rule", - "name=" + config.ConfigMapPodTrafficManager, - }...) - cmd.SysProcAttr = &syscall.SysProcAttr{HideWindow: true} - out, err := cmd.CombinedOutput() - if err != nil { - s := string(out) - var b []byte - if b, err = decode(out); err == nil { - s = string(b) - } - plog.G(ctx).Debugf("Find firewall %s, output: %s", config.ConfigMapPodTrafficManager, s) - return false - } else { - return true - } -}