diff --git a/go.mod b/go.mod index 76370a6c..8bdf726e 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/wencaiwulue/kubevpn -go 1.19 +go 1.20 require ( github.com/cilium/ipam v0.0.0-20220824141044-46ef3d556735 @@ -17,12 +17,12 @@ require ( github.com/pkg/errors v0.9.1 github.com/sirupsen/logrus v1.9.0 github.com/spf13/cobra v1.6.1 - golang.org/x/net v0.8.0 // indirect + golang.org/x/net v0.8.0 golang.org/x/sys v0.6.0 golang.zx2c4.com/wireguard v0.0.0-20220920152132-bb719d3a6e2c golang.zx2c4.com/wireguard/windows v0.5.3 google.golang.org/appengine v1.6.7 // indirect - google.golang.org/grpc v1.52.3 + google.golang.org/grpc v1.53.0-dev.0.20230123225046-4075ef07c5d5 google.golang.org/protobuf v1.30.0 gopkg.in/yaml.v2 v2.4.0 // indirect k8s.io/api v0.26.3 @@ -50,12 +50,13 @@ require ( github.com/spf13/pflag v1.0.5 go.uber.org/automaxprocs v1.5.1 golang.org/x/crypto v0.2.0 - golang.org/x/exp v0.0.0-20230113213754-f9f960f08ad4 + golang.org/x/exp v0.0.0-20230725093048-515e97ebf090 golang.org/x/oauth2 v0.6.0 golang.org/x/sync v0.1.0 golang.org/x/text v0.8.0 golang.org/x/time v0.3.0 golang.zx2c4.com/wintun v0.0.0-20211104114900-415007cec224 + gvisor.dev/gvisor v0.0.0-20230603040744-5c9219dedd33 k8s.io/utils v0.0.0-20230313181309-38a27ef9d749 sigs.k8s.io/controller-runtime v0.14.5 sigs.k8s.io/kustomize/api v0.12.1 @@ -63,7 +64,7 @@ require ( ) require ( - cloud.google.com/go/compute v1.14.0 // indirect + cloud.google.com/go/compute v1.15.1 // indirect cloud.google.com/go/compute/metadata v0.2.3 // indirect github.com/Azure/azure-sdk-for-go v68.0.0+incompatible // indirect github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect @@ -191,7 +192,7 @@ require ( go.uber.org/zap v1.24.0 // indirect go4.org/intern v0.0.0-20211027215823-ae77deb06f29 // indirect go4.org/unsafe/assume-no-moving-gc v0.0.0-20220617031537-928513b29760 // indirect - golang.org/x/mod v0.8.0 // indirect + golang.org/x/mod v0.11.0 // indirect golang.org/x/term v0.6.0 // indirect golang.org/x/tools v0.6.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect @@ -201,7 +202,6 @@ require ( gopkg.in/DataDog/dd-trace-go.v1 v1.47.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - gotest.tools/v3 v3.4.0 // indirect inet.af/netaddr v0.0.0-20220617031823-097006376321 // indirect k8s.io/apiextensions-apiserver v0.26.3 // indirect k8s.io/component-base v0.26.3 // indirect diff --git a/go.sum b/go.sum index b03fb351..eaaa3d5a 100644 --- a/go.sum +++ b/go.sum @@ -14,15 +14,15 @@ cloud.google.com/go v0.56.0/go.mod h1:jr7tqZxxKOVYizybht9+26Z/gUq7tiRzu+ACVAMbKV cloud.google.com/go v0.57.0/go.mod h1:oXiQ6Rzq3RAkkY7N6t3TcE6jE+CIBBbA36lwQ1JyzZs= cloud.google.com/go v0.62.0/go.mod h1:jmCYTdRCQuc1PHIIJ/maLInMho30T/Y0M4hTdTShOYc= cloud.google.com/go v0.65.0/go.mod h1:O5N8zS7uWy9vkA9vayVHs65eM1ubvY4h553ofrNHObY= -cloud.google.com/go v0.105.0 h1:DNtEKRBAAzeS4KyIory52wWHuClNaXJ5x1F7xa4q+5Y= +cloud.google.com/go v0.107.0 h1:qkj22L7bgkl6vIeZDlOY2po43Mx/TIa2Wsa7VR+PEww= cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o= cloud.google.com/go/bigquery v1.3.0/go.mod h1:PjpwJnslEMmckchkHFfq+HTD2DmtT67aNFKH1/VBDHE= cloud.google.com/go/bigquery v1.4.0/go.mod h1:S8dzgnTigyfTmLBfrtrhyYhwRxG72rYxvftPBK2Dvzc= cloud.google.com/go/bigquery v1.5.0/go.mod h1:snEHRnqQbz117VIFhE8bmtwIDY80NLUZUMb4Nv6dBIg= cloud.google.com/go/bigquery v1.7.0/go.mod h1://okPTzCYNXSlb24MZs83e2Do+h+VXtc4gLoIoXIAPc= cloud.google.com/go/bigquery v1.8.0/go.mod h1:J5hqkt3O0uAFnINi6JXValWIb1v0goeZM77hZzJN/fQ= -cloud.google.com/go/compute v1.14.0 h1:hfm2+FfxVmnRlh6LpB7cg1ZNU+5edAHmW679JePztk0= -cloud.google.com/go/compute v1.14.0/go.mod h1:YfLtxrj9sU4Yxv+sXzZkyPjEyPBZfXHUvjxega5vAdo= +cloud.google.com/go/compute v1.15.1 h1:7UGq3QknM33pw5xATlpzeoomNxsacIVvTqTTvbfajmE= +cloud.google.com/go/compute v1.15.1/go.mod h1:bjjoF/NtFUrkD/urWfdHaKuOPDR5nWIs63rR+SXhcpA= cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY= cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA= cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE= @@ -1117,8 +1117,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= -golang.org/x/exp v0.0.0-20230113213754-f9f960f08ad4 h1:CNkDRtCj8otM5CFz5jYvbr8ioXX8flVsLfDWEj0M5kk= -golang.org/x/exp v0.0.0-20230113213754-f9f960f08ad4/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= +golang.org/x/exp v0.0.0-20230725093048-515e97ebf090 h1:Di6/M8l0O2lCLc6VVRWhgCiApHV8MnQurBnFSHsQtNY= +golang.org/x/exp v0.0.0-20230725093048-515e97ebf090/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= @@ -1143,8 +1143,8 @@ golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.5.0/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= -golang.org/x/mod v0.8.0 h1:LUYupSeNrTNCGzR/hVBk2NHZO4hXcVaW1k4Qx7rjPx8= -golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/mod v0.11.0 h1:bUO06HqtnRcc/7l71XBe4WcqTZ+3AH1J59zWDDwLKgU= +golang.org/x/mod v0.11.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -1505,8 +1505,8 @@ google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQ google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34= google.golang.org/grpc v1.42.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU= google.golang.org/grpc v1.45.0/go.mod h1:lN7owxKUQEqMfSyQikvvk5tf/6zMPsrK+ONuO11+0rQ= -google.golang.org/grpc v1.52.3 h1:pf7sOysg4LdgBqduXveGKrcEwbStiK2rtfghdzlUYDQ= -google.golang.org/grpc v1.52.3/go.mod h1:pu6fVzoFb+NBYNAvQL08ic+lvB2IojljRYuun5vorUY= +google.golang.org/grpc v1.53.0-dev.0.20230123225046-4075ef07c5d5 h1:qq9WB3Dez2tMAKtZTVtZsZSmTkDgPeXx+FRPt5kLEkM= +google.golang.org/grpc v1.53.0-dev.0.20230123225046-4075ef07c5d5/go.mod h1:OnIrk0ipVdj4N5d9IUoFUx72/VlD7+jUsHwZgwSMQpw= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= @@ -1565,11 +1565,13 @@ gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo= gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw= gotest.tools/v3 v3.0.2/go.mod h1:3SzNCllyD9/Y+b5r9JIKQ474KzkZyqLqEfYqMsX94Bk= gotest.tools/v3 v3.0.3/go.mod h1:Z7Lb0S5l+klDB31fvDQX8ss/FlKDxtlFlw3Oa8Ymbl8= gotest.tools/v3 v3.4.0 h1:ZazjZUfuVeZGLAmlKKuyv3IKP5orXcwtOwDQH6YVr6o= -gotest.tools/v3 v3.4.0/go.mod h1:CtbdzLSsqVhDgMtKsx03ird5YTGB3ar27v0u/yKBW5g= +gvisor.dev/gvisor v0.0.0-20230603040744-5c9219dedd33 h1:64QentohifmKGeTgJCHilDgfmQVuYE45fsaS9psJ3zY= +gvisor.dev/gvisor v0.0.0-20230603040744-5c9219dedd33/go.mod h1:sQuqOkxbfJq/GS2uSnqHphtXclHyk/ZrAGhZBxxsq6g= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/pkg/config/config.go b/pkg/config/config.go index 1c356119..154930eb 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -145,3 +145,9 @@ var ( }, } ) + +var SPool = sync.Pool{ + New: func() any { + return make([]byte, 2) + }, +} diff --git a/pkg/core/gvisorstack.go b/pkg/core/gvisorstack.go new file mode 100755 index 00000000..c8df7b64 --- /dev/null +++ b/pkg/core/gvisorstack.go @@ -0,0 +1,108 @@ +package core + +import ( + "context" + "fmt" + + log "github.com/sirupsen/logrus" + "gvisor.dev/gvisor/pkg/tcpip" + "gvisor.dev/gvisor/pkg/tcpip/header" + "gvisor.dev/gvisor/pkg/tcpip/link/packetsocket" + "gvisor.dev/gvisor/pkg/tcpip/network/ipv4" + "gvisor.dev/gvisor/pkg/tcpip/network/ipv6" + "gvisor.dev/gvisor/pkg/tcpip/stack" + "gvisor.dev/gvisor/pkg/tcpip/transport/raw" + "gvisor.dev/gvisor/pkg/tcpip/transport/tcp" +) + +var _ stack.UniqueID = (*id)(nil) + +type id struct { +} + +func (i id) UniqueID() uint64 { + return 1 +} + +func NewStack(ctx context.Context, tun stack.LinkEndpoint) *stack.Stack { + s := stack.New(stack.Options{ + NetworkProtocols: []stack.NetworkProtocolFactory{ + ipv4.NewProtocol, + ipv6.NewProtocol, + }, + TransportProtocols: []stack.TransportProtocolFactory{ + tcp.NewProtocol, + }, + Clock: tcpip.NewStdClock(), + AllowPacketEndpointWrite: true, + HandleLocal: false, // if set to true, ping local ip will fail + // Enable raw sockets for users with sufficient + // privileges. + RawFactory: raw.EndpointFactory{}, + UniqueID: id{}, + }) + // set handler for TCP UDP ICMP + s.SetTransportProtocolHandler(tcp.ProtocolNumber, TCPForwarder(s)) + + s.SetRouteTable([]tcpip.Route{ + { + Destination: header.IPv4EmptySubnet, + NIC: 1, + }, + { + Destination: header.IPv6EmptySubnet, + NIC: 1, + }, + }) + + s.CreateNICWithOptions(1, packetsocket.New(tun), stack.NICOptions{ + Disabled: false, + Context: ctx, + }) + s.SetPromiscuousMode(1, true) + s.SetSpoofing(1, true) + + // Enable SACK Recovery. + { + opt := tcpip.TCPSACKEnabled(true) + if err := s.SetTransportProtocolOption(tcp.ProtocolNumber, &opt); err != nil { + log.Fatal(fmt.Errorf("SetTransportProtocolOption(%d, &%T(%t)): %s", tcp.ProtocolNumber, opt, opt, err)) + } + } + + // Set default TTLs as required by socket/netstack. + { + opt := tcpip.DefaultTTLOption(64) + if err := s.SetNetworkProtocolOption(ipv4.ProtocolNumber, &opt); err != nil { + log.Fatal(fmt.Errorf("SetNetworkProtocolOption(%d, &%T(%d)): %s", ipv4.ProtocolNumber, opt, opt, err)) + } + if err := s.SetNetworkProtocolOption(ipv6.ProtocolNumber, &opt); err != nil { + log.Fatal(fmt.Errorf("SetNetworkProtocolOption(%d, &%T(%d)): %s", ipv6.ProtocolNumber, opt, opt, err)) + } + } + + // Enable Receive Buffer Auto-Tuning. + { + opt := tcpip.TCPModerateReceiveBufferOption(true) + if err := s.SetTransportProtocolOption(tcp.ProtocolNumber, &opt); err != nil { + log.Fatal(fmt.Errorf("SetTransportProtocolOption(%d, &%T(%t)): %s", tcp.ProtocolNumber, opt, opt, err)) + } + } + + { + if err := s.SetForwardingDefaultAndAllNICs(ipv4.ProtocolNumber, true); err != nil { + log.Fatal(fmt.Errorf("set ipv4 forwarding: %s", err)) + } + if err := s.SetForwardingDefaultAndAllNICs(ipv6.ProtocolNumber, true); err != nil { + log.Fatal(fmt.Errorf("set ipv6 forwarding: %s", err)) + } + } + + { + option := tcpip.TCPModerateReceiveBufferOption(true) + if err := s.SetTransportProtocolOption(tcp.ProtocolNumber, &option); err != nil { + log.Fatal(fmt.Errorf("set TCP moderate receive buffer: %s", err)) + } + } + return s +} diff --git a/pkg/core/gvisortcpforwarder.go b/pkg/core/gvisortcpforwarder.go new file mode 100644 index 00000000..41e7d2d7 --- /dev/null +++ b/pkg/core/gvisortcpforwarder.go @@ -0,0 +1,102 @@ +package core + +import ( + "bytes" + "encoding/binary" + "io" + "net" + + log "github.com/sirupsen/logrus" + "gvisor.dev/gvisor/pkg/tcpip" + "gvisor.dev/gvisor/pkg/tcpip/adapters/gonet" + "gvisor.dev/gvisor/pkg/tcpip/stack" + "gvisor.dev/gvisor/pkg/tcpip/transport/tcp" + "gvisor.dev/gvisor/pkg/waiter" + + "github.com/wencaiwulue/kubevpn/pkg/config" +) + +func TCPForwarder(s *stack.Stack) func(stack.TransportEndpointID, *stack.PacketBuffer) bool { + rcvWnd := 1000 << 20 // 1000MB + return tcp.NewForwarder(s, rcvWnd, 100000, func(request *tcp.ForwarderRequest) { + defer request.Complete(false) + id := request.ID() + log.Debugf("[TUN-TCP] Info: LocalPort: %d, LocalAddress: %s, RemotePort: %d, RemoteAddress %s", + id.LocalPort, id.LocalAddress.String(), id.RemotePort, id.RemoteAddress.String(), + ) + remote, err := net.Dial("tcp", "localhost:10801") + if err != nil { + log.Warningln(err) + return + } + if err = WriteProxyInfo(remote, id); err != nil { + log.Warningln(err) + return + } + + w := &waiter.Queue{} + endpoint, t := request.CreateEndpoint(w) + if t != nil { + log.Warningln(t) + return + } + conn := gonet.NewTCPConn(w, endpoint) + go io.Copy(remote, conn) + io.Copy(conn, remote) + }).HandlePacket +} + +func WriteProxyInfo(conn net.Conn, id stack.TransportEndpointID) error { + var b bytes.Buffer + i := config.SPool.Get().([]byte)[:] + defer config.SPool.Put(i[:]) + binary.BigEndian.PutUint16(i, id.LocalPort) + b.Write(i) + binary.BigEndian.PutUint16(i, id.RemotePort) + b.Write(i) + b.WriteByte(byte(id.LocalAddress.Len())) + b.Write(id.LocalAddress.AsSlice()) + b.WriteByte(byte(id.RemoteAddress.Len())) + b.Write(id.RemoteAddress.AsSlice()) + _, err := b.WriteTo(conn) + return err +} + +// ParseProxyInfo parse proxy info [20]byte +func ParseProxyInfo(conn net.Conn) (id stack.TransportEndpointID, err error) { + var n int + var port = make([]byte, 2) + + // local port + if n, err = io.ReadFull(conn, port); err != nil || n != 2 { + return + } + id.LocalPort = binary.BigEndian.Uint16(port) + + // remote port + if n, err = io.ReadFull(conn, port); err != nil || n != 2 { + return + } + id.RemotePort = binary.BigEndian.Uint16(port) + + // local address + if n, err = io.ReadFull(conn, port[:1]); err != nil || n != 1 { + return + } + var localAddress = make([]byte, port[0]) + if n, err = io.ReadFull(conn, localAddress); err != nil || n != len(localAddress) { + return + } + id.LocalAddress = tcpip.AddrFromSlice(localAddress) + + // remote address + if n, err = io.ReadFull(conn, port[:1]); err != nil || n != 1 { + return + } + var remoteAddress = make([]byte, port[0]) + if n, err = io.ReadFull(conn, remoteAddress); err != nil || n != len(remoteAddress) { + return + } + id.RemoteAddress = tcpip.AddrFromSlice(remoteAddress) + return +} diff --git a/pkg/core/gvisortcphandler.go b/pkg/core/gvisortcphandler.go new file mode 100644 index 00000000..0f078e28 --- /dev/null +++ b/pkg/core/gvisortcphandler.go @@ -0,0 +1,58 @@ +package core + +import ( + "context" + "io" + "net" + "strconv" + "time" + + log "github.com/sirupsen/logrus" +) + +type gvisorTCPHandler struct{} + +func GvisorTCPHandler() Handler { + return &gvisorTCPHandler{} +} + +func (h *gvisorTCPHandler) Handle(ctx context.Context, tcpConn net.Conn) { + defer tcpConn.Close() + log.Debugf("[GvisorTCPServer] %s -> %s\n", tcpConn.RemoteAddr(), tcpConn.LocalAddr()) + func(conn net.Conn) { + defer conn.Close() + // 1, get proxy info + endpointID, err := ParseProxyInfo(conn) + if err != nil { + log.Warning("failed to parse proxy info", "err: ", err) + return + } + log.Debugf("[TUN-TCP] Info: LocalPort: %d, LocalAddress: %s, RemotePort: %d, RemoteAddress %s", + endpointID.LocalPort, endpointID.LocalAddress.String(), endpointID.RemotePort, endpointID.RemoteAddress.String(), + ) + // 2, dial proxy + s := net.ParseIP(endpointID.LocalAddress.String()).String() + port := strconv.FormatUint(uint64(endpointID.LocalPort), 10) + var dial net.Conn + dial, err = net.DialTimeout("tcp", net.JoinHostPort(s, port), time.Second*5) + if err != nil { + log.Warningln(err) + return + } + go io.Copy(conn, dial) + io.Copy(dial, conn) + }(tcpConn) +} + +func GvisorTCPListener(addr string) (net.Listener, error) { + log.Debug("gvisor tcp listen addr", addr) + laddr, err := net.ResolveTCPAddr("tcp", addr) + if err != nil { + return nil, err + } + ln, err := net.ListenTCP("tcp", laddr) + if err != nil { + return nil, err + } + return &tcpKeepAliveListener{TCPListener: ln}, nil +} diff --git a/pkg/core/route.go b/pkg/core/route.go index dd29a200..614654c0 100644 --- a/pkg/core/route.go +++ b/pkg/core/route.go @@ -1,6 +1,7 @@ package core import ( + "fmt" "net" "os" "strings" @@ -88,12 +89,20 @@ func (r *Route) GenerateServers() ([]Server, error) { if err != nil { return nil, err } - default: + case "tcp": handler = TCPHandler() ln, err = TCPListener(node.Addr) if err != nil { return nil, err } + case "gtcp": + handler = GvisorTCPHandler() + ln, err = GvisorTCPListener(node.Addr) + if err != nil { + return nil, err + } + default: + return nil, fmt.Errorf("not support protocol %s", node.Protocol) } servers = append(servers, Server{Listener: ln, Handler: handler}) } diff --git a/pkg/core/tcphandler.go b/pkg/core/tcphandler.go index 437d03e3..a3138d6c 100644 --- a/pkg/core/tcphandler.go +++ b/pkg/core/tcphandler.go @@ -70,7 +70,7 @@ func (h *fakeUdpHandler) Handle(ctx context.Context, tcpConn net.Conn) { for _, key := range keys { h.connNAT.Delete(key) } - log.Debugf("delete conn %s from globle routeConnNAT, deleted count %d", addr, len(keys)) + log.Debugf("[tcpserver] delete conn %s from globle routeConnNAT, deleted count %d", addr, len(keys)) }(tcpConn.LocalAddr()) var firstIPv4 = true @@ -93,11 +93,11 @@ func (h *fakeUdpHandler) Handle(ctx context.Context, tcpConn net.Conn) { src = bb[8:24] firstIPv6 = false } else { - log.Errorf("[tun] unknown packet") + log.Errorf("[tcpserver] unknown packet") continue } h.connNAT.LoadOrStore(src.String(), tcpConn) - log.Debugf("[tun] new routeConnNAT: %s -> %s-%s", src, tcpConn.LocalAddr(), tcpConn.RemoteAddr()) + log.Debugf("[tcpserver] new routeConnNAT: %s -> %s-%s", src, tcpConn.LocalAddr(), tcpConn.RemoteAddr()) } h.ch <- dgram } diff --git a/pkg/core/tunendpoint.go b/pkg/core/tunendpoint.go new file mode 100755 index 00000000..49b8ae73 --- /dev/null +++ b/pkg/core/tunendpoint.go @@ -0,0 +1,205 @@ +package core + +import ( + "context" + "net" + "sync" + + "github.com/google/gopacket/layers" + log "github.com/sirupsen/logrus" + "golang.org/x/net/ipv4" + "golang.org/x/net/ipv6" + "gvisor.dev/gvisor/pkg/buffer" + "gvisor.dev/gvisor/pkg/tcpip" + "gvisor.dev/gvisor/pkg/tcpip/header" + "gvisor.dev/gvisor/pkg/tcpip/link/channel" + "gvisor.dev/gvisor/pkg/tcpip/stack" + "gvisor.dev/gvisor/pkg/tcpip/transport/tcp" + + "github.com/wencaiwulue/kubevpn/pkg/config" +) + +var _ stack.LinkEndpoint = (*tunEndpoint)(nil) + +// tunEndpoint /Users/naison/go/pkg/mod/gvisor.dev/gvisor@v0.0.0-20220422052705-39790bd3a15a/pkg/tcpip/link/tun/device.go:122 +type tunEndpoint struct { + ctx context.Context + tun net.Conn + once sync.Once + endpoint *channel.Endpoint + + in chan<- *DataElem + out chan *DataElem +} + +// WritePackets writes packets. Must not be called with an empty list of +// packet buffers. +// +// WritePackets may modify the packet buffers, and takes ownership of the PacketBufferList. +// it is not safe to use the PacketBufferList after a call to WritePackets. +func (e *tunEndpoint) WritePackets(p stack.PacketBufferList) (int, tcpip.Error) { + return e.endpoint.WritePackets(p) +} + +// MTU is the maximum transmission unit for this endpoint. This is +// usually dictated by the backing physical network; when such a +// physical network doesn't exist, the limit is generally 64k, which +// includes the maximum size of an IP packet. +func (e *tunEndpoint) MTU() uint32 { + return uint32(config.DefaultMTU) +} + +// MaxHeaderLength returns the maximum size the data link (and +// lower level layers combined) headers can have. Higher levels use this +// information to reserve space in the front of the packets they're +// building. +func (e *tunEndpoint) MaxHeaderLength() uint16 { + return 0 +} + +// LinkAddress returns the link address (typically a MAC) of the +// endpoint. +func (e *tunEndpoint) LinkAddress() tcpip.LinkAddress { + return e.endpoint.LinkAddress() +} + +// Capabilities returns the set of capabilities supported by the +// endpoint. +func (e *tunEndpoint) Capabilities() stack.LinkEndpointCapabilities { + return e.endpoint.LinkEPCapabilities +} + +// Attach attaches the data link layer endpoint to the network-layer +// dispatcher of the stack. +// +// Attach is called with a nil dispatcher when the endpoint's NIC is being +// removed. +func (e *tunEndpoint) Attach(dispatcher stack.NetworkDispatcher) { + e.endpoint.Attach(dispatcher) + // queue --> tun + e.once.Do(func() { + go func() { + for { + read := e.endpoint.ReadContext(e.ctx) + if !read.IsNil() { + bb := read.ToView().AsSlice() + i := config.LPool.Get().([]byte)[:] + n := copy(i, bb) + bb = nil + e.out <- NewDataElem(i[:], n, nil, nil) + } + } + }() + // tun --> dispatcher + go func() { + for { + bytes := config.LPool.Get().([]byte) + read, err := e.tun.Read(bytes[:]) + if err != nil { + log.Warningln(err) + panic(err) + return + } + if read == 0 { + log.Warnf("[TUN]: read from tun length is %d", read) + continue + } + // Try to determine network protocol number, default zero. + var protocol tcpip.NetworkProtocolNumber + var ipProtocol int + var src, dst net.IP + // TUN interface with IFF_NO_PI enabled, thus + // we need to determine protocol from version field + version := bytes[0] >> 4 + if version == 4 { + protocol = header.IPv4ProtocolNumber + ipHeader, err := ipv4.ParseHeader(bytes[:read]) + if err != nil { + log.Error(err) + continue + } + ipProtocol = ipHeader.Protocol + src = ipHeader.Src + dst = ipHeader.Dst + } else if version == 6 { + protocol = header.IPv6ProtocolNumber + ipHeader, err := ipv6.ParseHeader(bytes[:read]) + if err != nil { + log.Error(err) + continue + } + ipProtocol = ipHeader.NextHeader + src = ipHeader.Src + dst = ipHeader.Dst + } else { + log.Errorf("[TUN-gvisor] unknown packet version %d", version) + continue + } + // if is tcp use gvisor + if ipProtocol == int(layers.IPProtocolTCP) && + (!config.CIDR.Contains(dst) && !config.CIDR6.Contains(dst)) { + pkt := stack.NewPacketBuffer(stack.PacketBufferOptions{ + ReserveHeaderBytes: 0, + Payload: buffer.MakeWithData(bytes[:read]), + }) + //defer pkt.DecRef() + config.LPool.Put(bytes[:]) + e.endpoint.InjectInbound(protocol, pkt) + } else { + log.Debugf("[TUN-RAW] IP-Protocol: %s, SRC: %s, DST: %s, Length: %d", layers.IPProtocol(ipProtocol).String(), src.String(), dst, read) + e.in <- NewDataElem(bytes[:], read, src, dst) + } + } + }() + go func() { + for elem := range e.out { + _, err := e.tun.Write(elem.Data()[:elem.Length()]) + config.LPool.Put(elem.Data()[:]) + if err != nil { + panic(err) + } + } + }() + }) +} + +// IsAttached returns whether a NetworkDispatcher is attached to the +// endpoint. +func (e *tunEndpoint) IsAttached() bool { + return e.endpoint.IsAttached() +} + +// Wait waits for any worker goroutines owned by the endpoint to stop. +// +// For now, requesting that an endpoint's worker goroutine(s) stop is +// implementation specific. +// +// Wait will not block if the endpoint hasn't started any goroutines +// yet, even if it might later. +func (e *tunEndpoint) Wait() { + return +} + +// ARPHardwareType returns the ARPHRD_TYPE of the link endpoint. +// +// See: +// https://github.com/torvalds/linux/blob/aa0c9086b40c17a7ad94425b3b70dd1fdd7497bf/include/uapi/linux/if_arp.h#L30 +func (e *tunEndpoint) ARPHardwareType() header.ARPHardwareType { + return header.ARPHardwareNone +} + +// AddHeader adds a link layer header to the packet if required. +func (e *tunEndpoint) AddHeader(ptr stack.PacketBufferPtr) { + return +} + +func NewTunEndpoint(ctx context.Context, tun net.Conn, mtu uint32, in chan<- *DataElem, out chan *DataElem) stack.LinkEndpoint { + addr, _ := tcpip.ParseMACAddress("02:03:03:04:05:06") + return &tunEndpoint{ + ctx: ctx, + tun: tun, + endpoint: channel.New(tcp.DefaultReceiveBufferSize, mtu, addr), + in: in, + out: out, + } +} diff --git a/pkg/core/tunhandler.go b/pkg/core/tunhandler.go index c08f53e1..b37ca934 100644 --- a/pkg/core/tunhandler.go +++ b/pkg/core/tunhandler.go @@ -225,7 +225,7 @@ func (d *Device) parseIPHeader() { e.src = e.data[:e.length][8:24] e.dst = e.data[:e.length][24:40] } else { - log.Errorf("[tun] unknown packet") + log.Errorf("[tun-packet] unknown packet") continue } @@ -238,7 +238,7 @@ func (d *Device) Close() { d.tun.Close() } -func (d *Device) heartbeats() { +func heartbeats(in chan<- *DataElem) { tunIface, err := tun.GetInterface() if err != nil { return @@ -301,7 +301,7 @@ func (d *Device) heartbeats() { } else { src, dst = srcIPv6, config.RouterIP6 } - d.tunInbound <- &DataElem{ + in <- &DataElem{ data: data, length: length, src: src, @@ -370,7 +370,7 @@ func (d *Device) Start(ctx context.Context) { } go d.tunInboundHandler(d.tunInbound, d.tunOutbound) go d.writeToTun() - go d.heartbeats() + //go heartbeats(d.tunInbound) select { case err := <-d.chExit: @@ -420,6 +420,23 @@ type DataElem struct { dst net.IP } +func NewDataElem(data []byte, length int, src net.IP, dst net.IP) *DataElem { + return &DataElem{ + data: data, + length: length, + src: src, + dst: dst, + } +} + +func (d *DataElem) Data() []byte { + return d.data +} + +func (d *DataElem) Length() int { + return d.length +} + type udpElem struct { from net.Addr data []byte @@ -485,7 +502,7 @@ func (p *Peer) readFromTCPConn() { u.src = b[8:24] u.dst = b[24:40] } else { - log.Errorf("[tun] unknown packet") + log.Errorf("[tun-conn] unknown packet") continue } log.Debugf("[tcpserver] udp-tun %s >>> %s length: %d", u.src, u.dst, u.length) diff --git a/pkg/core/tunhandlerclient.go b/pkg/core/tunhandlerclient.go index c1e2fde7..72b3c165 100644 --- a/pkg/core/tunhandlerclient.go +++ b/pkg/core/tunhandlerclient.go @@ -17,16 +17,18 @@ func (h *tunHandler) HandleClient(ctx context.Context, tun net.Conn) { log.Errorf("[tun] %s: remote addr: %v", tun.LocalAddr(), err) return } + in := make(chan *DataElem, MaxSize) + out := make(chan *DataElem, MaxSize) + endpoint := NewTunEndpoint(ctx, tun, uint32(config.DefaultMTU), in, out) + stack := NewStack(ctx, endpoint) + go stack.Wait() - device := &Device{ - tun: tun, - thread: MaxThread, - tunInboundRaw: make(chan *DataElem, MaxSize), - tunInbound: make(chan *DataElem, MaxSize), - tunOutbound: make(chan *DataElem, MaxSize), - chExit: h.chExit, + d := &ClientDevice{ + tunInbound: in, + tunOutbound: out, + chExit: h.chExit, } - device.SetTunInboundHandler(func(tunInbound <-chan *DataElem, tunOutbound chan<- *DataElem) { + d.SetTunInboundHandler(func(tunInbound <-chan *DataElem, tunOutbound chan<- *DataElem) { for { packetConn, err := getRemotePacketConn(ctx, h.chain) if err != nil { @@ -41,8 +43,7 @@ func (h *tunHandler) HandleClient(ctx context.Context, tun net.Conn) { } }) - defer device.Close() - device.Start(ctx) + d.Start(ctx) } func getRemotePacketConn(ctx context.Context, chain *Chain) (net.PacketConn, error) { @@ -105,3 +106,28 @@ func transportTunClient(ctx context.Context, tunInbound <-chan *DataElem, tunOut return nil } } + +type ClientDevice struct { + tunInbound chan *DataElem + tunOutbound chan *DataElem + // your main logic + tunInboundHandler func(tunInbound <-chan *DataElem, tunOutbound chan<- *DataElem) + chExit chan error +} + +func (d *ClientDevice) Start(ctx context.Context) { + go d.tunInboundHandler(d.tunInbound, d.tunOutbound) + //go heartbeats(d.tunInbound) + + select { + case err := <-d.chExit: + log.Error(err) + return + case <-ctx.Done(): + return + } +} + +func (d *ClientDevice) SetTunInboundHandler(handler func(tunInbound <-chan *DataElem, tunOutbound chan<- *DataElem)) { + d.tunInboundHandler = handler +} diff --git a/pkg/handler/connect.go b/pkg/handler/connect.go index 52aa5f0d..73bc066a 100644 --- a/pkg/handler/connect.go +++ b/pkg/handler/connect.go @@ -158,6 +158,9 @@ func (c *ConnectOptions) DoConnect() (err error) { if err = c.portForward(ctx, fmt.Sprintf("%d:10800", port)); err != nil { return } + if err = c.portForward(ctx, "10801:10801"); err != nil { + return + } if util.IsWindows() { driver.InstallWireGuardTunDriver() } @@ -175,7 +178,7 @@ func (c *ConnectOptions) DoConnect() (err error) { if err = c.setupDNS(); err != nil { return } - go c.heartbeats() + //go c.heartbeats() log.Info("dns service ok") return } diff --git a/pkg/handler/remote.go b/pkg/handler/remote.go index edf03af3..b003d8a9 100644 --- a/pkg/handler/remote.go +++ b/pkg/handler/remote.go @@ -274,7 +274,7 @@ iptables -P FORWARD ACCEPT ip6tables -P FORWARD ACCEPT iptables -t nat -A POSTROUTING -s ${CIDR4} -o eth0 -j MASQUERADE ip6tables -t nat -A POSTROUTING -s ${CIDR6} -o eth0 -j MASQUERADE -kubevpn serve -L "tcp://:10800" -L "tun://:8422?net=${TunIPv4}" --debug=true`, +kubevpn serve -L "tcp://:10800" -L "tun://:8422?net=${TunIPv4}" -L "gtcp://:10801" --debug=true`, }, EnvFrom: []v1.EnvFromSource{{ SecretRef: &v1.SecretEnvSource{