commit f12e2e2ac56d13c1d94f2fda2634800db8136444 Author: Akihiro Suda Date: Thu Sep 24 21:27:24 2020 +0900 initial commit Signed-off-by: Akihiro Suda diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5e56e04 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/bin diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..d645695 --- /dev/null +++ b/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..0f56f1f --- /dev/null +++ b/Makefile @@ -0,0 +1,16 @@ +.DEFAULT_GOAL := binaries + +binaries: bin/norouter + +bin/norouter: + CGO_ENABLED=0 go build -o $@ ./cmd/norouter + LANG=C LC_ALL=C file $@ | grep -qw "statically linked" + +clean: + rm -rf bin + +integration: + ./integration/test-internal-agent.sh + ./integration/test-router.sh + +.PHONY: bin/norouter clean integration diff --git a/README.md b/README.md new file mode 100644 index 0000000..54450b0 --- /dev/null +++ b/README.md @@ -0,0 +1,180 @@ +# NoRouter: the easiest multi-host & multi-cloud networking ever. No root privilege required. + +NoRouter is the easiest multi-host & multi-cloud networking ever. And yet, NoRouter does not require any privilege such as `sudo` or `docker run --privileged`. + +NoRouter implements unprivileged networking by using multiple loopback addresseses such as 127.0.42.101 and 127.0.42.102. +The hosts in the network are connected by forwarding packets over stdio streams like `ssh`, `docker exec`, `podman exec`, `kubectl exec`, and whatever. + +![./docs/image.png](./docs/image.png) + + +NoRouter is mostly expected to be used in dev environments. + +## Example using `docker exec` and `podman exec` + +This example creates a virtual 127.0.42.0/24 network across a Docker container, a Podman container, and the localhost, using `docker exec` and `podman exec`. + +**Step 0: build `bin/norouter` binary** (on Linux) + +```console +make +``` + +**Step 1: create `host1` (nginx) as a Docker container** +```console +docker run -d --name host1 nginx:alpine +docker cp $(pwd)/bin/norouter host1:/usr/local/bin +``` + +**Step 2: create `host2` (Apache httpd) as a Podman container** +```console +podman run -d --name host2 httpd:alpine +podman cp $(pwd)/bin/norouter host2:/usr/local/bin +``` + +**Step 3: create [`example.yaml`](./example.yaml)** + +```yaml +hosts: + # host0 is the localhost + host0: + vip: "127.0.42.100" + host1: + cmd: ["docker", "exec", "-i", "host1", "norouter"] + vip: "127.0.42.101" + ports: ["8080:127.0.0.1:80"] + host2: + cmd: ["podman", "exec", "-i", "host2", "norouter"] + vip: "127.0.42.102" + ports: ["8080:127.0.0.1:80"] +``` + +**Step 4: start the NoRouter "router" process** + +```console +./bin/norouter example.yaml +``` + +**Step 5: connect to `host1` (127.0.42.101, nginx)** + +```console +wget -O - http://127.0.42.101:8080 +docker exec host1 wget -O - http://127.0.42.101:8080 +podman exec host2 wget -O - http://127.0.42.101:8080 +``` + +Make sure nginx's `index.html` ("Welcome to nginx!") is shown. + +**Step 6: connect to `host2` (127.0.42.102, Apache httpd)** + +```console +wget -O - http://127.0.42.102:8080 +docker exec host1 wget -O - http://127.0.42.102:8080 +podman exec host2 wget -O - http://127.0.42.102:8080 +``` + +Make sure Apache httpd's `index.html` ("It works!") is shown. + +### How it works under the hood + +The "router" process of NoRouter launches the following commands and transfer the packets using their stdio streams. + +``` +/proc/self/exe internal agent \ + --me 127.0.42.100 \ + --other 127.0.42.101:8080 \ + --other 127.0.42.102:8080 +``` + +``` +docker exec -i host1 norouter internal agent \ + --me 127.0.42.101 \ + --forward 8080:127.0.0.1:80 \ + --other 127.0.42.102:8080 +``` + +``` +podman exec -i host2 norouter internal agent \ + --me 127.0.42.102 \ + --other 127.0.42.101:8080 \ + --forward 8080:127.0.0.1:80 +``` + +`me` is used as a virtual src IP for connecting to `--other :`. + +#### stdio protocol + +The protocol is still subject to change. + + +``` +uint32le Len (includes header fields and Payload but does not include Len itself) +[4]byte SrcIP +uint16le SrcPort +[4]byte DstIP +uint16le DstPort +uint16le Proto +uint16le Flags +[]byte Payload (without L2/L3/L4 headers at all) +``` + +## More examples + +### Kubernetes + +Install `norouter` binary using `kubectl cp` + +e.g. +``` +kubectl run --image=nginx:alpine --restart=Never nginx +kubectl cp bin/norouter nginx:/usr/local/bin +``` + +In the NoRouter yaml, specify `cmd` as `["kubectl", "exec", "-i", "some-kubernetes-pod", "--", "norouter"]`. +To connect multiple Kubernetes clusters, pass `--context` arguments to `kubectl`. + +e.g. To connect GKE, AKS, and your laptop: + +```yaml +hosts: + laptop: + vip: "127.0.42.100" + nginx-on-gke: + cmd: ["kubectl", "--context=gke_myproject-12345_asia-northeast1-c_my-gke", "exec", "-i", "nginx", "--", "norouter"] + vip: "127.0.42.101" + ports: ["8080:127.0.0.1:80"] + httpd-on-aks: + cmd: ["kubectl", "--context=my-aks", "exec", "-i", "httpd", "--", "norouter"] + vip: "127.0.42.102" + ports: ["8080:127.0.0.1:80"] +``` + +### SSH + +Install `norouter` binary using `scp cp ./bin/norouter some-user@some-ssh-host.example.com:/usr/local/bin` . + +In the NoRouter yaml, specify `cmd` as `["ssh", "some-user@some-ssh-host.example.com", "--", "norouter"]`. + +If your key has a passphrase, make sure to configure `ssh-agent` so that NoRouter can login to the host automatically. + +### Azure Container Instances (`az container exec`) + +`az container exec` can't be supported currently because: +- No support for stdin without tty: https://github.com/Azure/azure-cli/issues/15225 +- No support for appending command arguments: https://docs.microsoft.com/en-us/azure/container-instances/container-instances-exec#restrictions +- Extra TTY escape sequence on busybox: https://github.com/Azure/azure-cli/issues/6537 + +A workaround is to inject an SSH sidecar into an Azure container group, and use `ssh` instead of `az container exec`. + +## TODOs + +- Install `norouter` binary to remote hosts automatically? +- Assist generating mTLS certs? +- Add DNS fields to `/etc/resolv.conf` when the file is writable? (writable by default in Docker and Kubernetes) +- Detect port numbers automatically by watching `/proc/net/tcp`, and propagate the information across the cluster automatically? + +## Similar projects + +- [vdeplug4](https://github.com/rd235/vdeplug4): vdeplug4 can create ad-hoc L2 networks over stdio. + vdeplug4 is similar to NoRouter in the sense that it uses stdio, but vdeplug4 requires privileges (at least in userNS) for creating TAP devices. +- [telepresence](https://www.telepresence.io/): kube-only and needs privileges diff --git a/cmd/norouter/internal.go b/cmd/norouter/internal.go new file mode 100644 index 0000000..17c2f86 --- /dev/null +++ b/cmd/norouter/internal.go @@ -0,0 +1,14 @@ +package main + +import ( + "github.com/urfave/cli/v2" +) + +var internalCommand = &cli.Command{ + Name: "internal", + Usage: "Internal commands", + Hidden: true, + Subcommands: []*cli.Command{ + internalAgentCommand, + }, +} diff --git a/cmd/norouter/internal_agent.go b/cmd/norouter/internal_agent.go new file mode 100644 index 0000000..6c2466d --- /dev/null +++ b/cmd/norouter/internal_agent.go @@ -0,0 +1,58 @@ +package main + +import ( + "os" + + "github.com/norouter/norouter/pkg/agent" + "github.com/norouter/norouter/pkg/agent/config" + "github.com/urfave/cli/v2" +) + +var internalAgentCommand = &cli.Command{ + Name: "agent", + Usage: "agent", + Action: internalAgentAction, + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "me", + Usage: "my virtual IP without port and proto, e.g. \"127.0.42.101\"", + Required: true, + }, + &cli.StringSliceFlag{ + Name: "other", + Usage: "other virtual IP, port, and optionally proto, e.g. \"127.0.42.102:8080/tcp\"", + }, + &cli.StringSliceFlag{ + Name: "forward", + Usage: "local forward, e.g. \"8080:127.0.0.1:80/tcp\"", + }, + }, +} + +func internalAgentAction(clicontext *cli.Context) error { + me, err := config.ParseMe(clicontext.String("me")) + if err != nil { + return err + } + var others []*config.Other + for _, s := range clicontext.StringSlice("other") { + o, err := config.ParseOther(s) + if err != nil { + return err + } + others = append(others, o) + } + var forwards []*config.Forward + for _, s := range clicontext.StringSlice("forward") { + f, err := config.ParseForward(s) + if err != nil { + return err + } + forwards = append(forwards, f) + } + a, err := agent.New(me, others, forwards, os.Stdout, os.Stdin) + if err != nil { + return err + } + return a.Run() +} diff --git a/cmd/norouter/main.go b/cmd/norouter/main.go new file mode 100644 index 0000000..6f27081 --- /dev/null +++ b/cmd/norouter/main.go @@ -0,0 +1,66 @@ +package main + +import ( + "os" + + "github.com/sirupsen/logrus" + "github.com/urfave/cli/v2" +) + +func main() { + logrus.SetFormatter(newLogrusFormatter()) + if err := newApp().Run(os.Args); err != nil { + logrus.Fatal(err) + } +} + +func newApp() *cli.App { + debug := false + app := cli.NewApp() + app.Name = "norouter" + app.Usage = "NoRouter: the easiest multi-host & multi-cloud networking ever. No root privilege required." + + app.Flags = []cli.Flag{ + &cli.BoolFlag{ + Name: "debug", + Usage: "debug mode", + Destination: &debug, + }, + } + app.Before = func(context *cli.Context) error { + if debug { + logrus.SetLevel(logrus.DebugLevel) + } + return nil + } + app.Commands = []*cli.Command{ + routerCommand, + internalCommand, + } + app.Action = routerAction + return app +} + +func newLogrusFormatter() logrus.Formatter { + hostname, _ := os.Hostname() + if hostname == "" { + hostname = "" + } + return &logrusFormatter{ + prefix: hostname + ": ", + Formatter: &logrus.TextFormatter{}, + } +} + +type logrusFormatter struct { + prefix string + logrus.Formatter +} + +func (lf *logrusFormatter) Format(e *logrus.Entry) ([]byte, error) { + b, err := lf.Formatter.Format(e) + if err != nil { + return b, err + } + return append([]byte(lf.prefix), b...), nil +} diff --git a/cmd/norouter/router.go b/cmd/norouter/router.go new file mode 100644 index 0000000..8265f97 --- /dev/null +++ b/cmd/norouter/router.go @@ -0,0 +1,55 @@ +package main + +import ( + "io/ioutil" + + "github.com/norouter/norouter/pkg/router" + "github.com/norouter/norouter/pkg/router/config" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "github.com/urfave/cli/v2" + "gopkg.in/yaml.v2" +) + +var routerCommand = &cli.Command{ + Name: "router", + Aliases: []string{"r"}, + Usage: "router (default subcommand)", + Action: routerAction, +} + +func routerAction(clicontext *cli.Context) error { + configPath := clicontext.Args().First() + if configPath == "" { + return errors.New("no config file path was specified") + } + cfg, err := loadConfig(configPath) + if err != nil { + return err + } + logrus.Debugf("config: %+v", cfg) + ccSet, err := router.NewCmdClientSet(cfg) + if err != nil { + return err + } + for vip, client := range ccSet.ByVIP { + logrus.Debugf("client for %q: %q", vip, client.String()) + } + r, err := router.New(ccSet) + if err != nil { + return err + } + return r.Run() +} + +func loadConfig(configPath string) (*config.Config, error) { + configB, err := ioutil.ReadFile(configPath) + if err != nil { + return nil, err + } + var cfg config.Config + if err := yaml.Unmarshal(configB, &cfg); err != nil { + return nil, err + } + return &cfg, nil +} diff --git a/docs/image.png b/docs/image.png new file mode 100644 index 0000000..18c7b1d Binary files /dev/null and b/docs/image.png differ diff --git a/example.yaml b/example.yaml new file mode 100644 index 0000000..2a2b5b9 --- /dev/null +++ b/example.yaml @@ -0,0 +1,13 @@ +# see README.md for the usage +hosts: + # host0 is the localhost + host0: + vip: "127.0.42.100" + host1: + cmd: ["docker", "exec", "-i", "host1", "norouter"] + vip: "127.0.42.101" + ports: ["8080:127.0.0.1:80"] + host2: + cmd: ["podman", "exec", "-i", "host2", "norouter"] + vip: "127.0.42.102" + ports: ["8080:127.0.0.1:80"] diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..29777f1 --- /dev/null +++ b/go.mod @@ -0,0 +1,12 @@ +module github.com/norouter/norouter + +go 1.15 + +require ( + github.com/hashicorp/go-multierror v1.1.0 + github.com/pkg/errors v0.9.1 + github.com/sirupsen/logrus v1.6.0 + github.com/urfave/cli/v2 v2.2.0 + golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208 + gopkg.in/yaml.v2 v2.2.2 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..5fc1069 --- /dev/null +++ b/go.sum @@ -0,0 +1,33 @@ +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d h1:U+s90UTSYgptZMwQh2aRr3LuazLJIa+Pg3Kc1ylSYVY= +github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.1.0 h1:B9UzwGQJehnUY1yNrnwREHc3fGbC2xefo8g4TbElacI= +github.com/hashicorp/go-multierror v1.1.0/go.mod h1:spPvp8C1qA32ftKqdAHm4hHTbPw+vmowP0z+KUhOZdA= +github.com/konsorten/go-windows-terminal-sequences v1.0.3 h1:CE8S1cTafDpPvMhIxNJKvHsGVBgn1xWYf1NbHQhywc8= +github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/russross/blackfriday/v2 v2.0.1 h1:lPqVAte+HuHNfhJ/0LC98ESWRz8afy9tM/0RK8m9o+Q= +github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/shurcooL/sanitized_anchor_name v1.0.0 h1:PdmoCO6wvbs+7yrJyMORt4/BmY5IYyJwS/kOiWx8mHo= +github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= +github.com/sirupsen/logrus v1.6.0 h1:UBcNElsrwanuuMsnGSlYmtmgbb23qDR5dG+6X6Oo89I= +github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= +github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/urfave/cli/v2 v2.2.0 h1:JTTnM6wKzdA0Jqodd966MVj4vWbbquZykeX1sKbe2C4= +github.com/urfave/cli/v2 v2.2.0/go.mod h1:SE9GqnLQmjVa0iPEY0f1w3ygNIYcIJ0OKPMoW2caLfQ= +golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208 h1:qwRHBd0NqMbJxfbotnDhm2ByMI1Shq4Y6oRJo21SGJA= +golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc= +golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/integration/test-internal-agent.sh b/integration/test-internal-agent.sh new file mode 100755 index 0000000..c97cb81 --- /dev/null +++ b/integration/test-internal-agent.sh @@ -0,0 +1,66 @@ +#!/bin/bash +# A script for testing `norouter internal agent` without `norouter router` +# +set -eu -o pipefail + +cd "$(dirname $0)/.." + +pid="" +cleanup() { + echo "Cleaning up..." + set +e + if [[ -n "$pid" && -d "/proc/$pid" ]]; then kill $pid; fi + docker rm -f host1 host2 + make clean + set -e +} +cleanup +trap "cleanup" EXIT + +make +docker run -d --name host1 -v "$(pwd)/bin:/mnt:ro" nginx:1.19.2-alpine +docker run -d --name host2 -v "$(pwd)/bin:/mnt:ro" httpd:2.4.46-alpine + +: ${DEBUG=} +flags="" +if [[ -n "$DEBUG" ]]; then + flags="--debug" +fi + +dpipe \ + docker exec -i host1 /mnt/norouter ${flags} internal agent \ + --me 127.0.42.101 \ + --forward 8080:127.0.0.1:80 \ + --other 127.0.42.102:8080 \ + = \ + docker exec -i host2 /mnt/norouter ${flags} internal agent \ + --me 127.0.42.102 \ + --other 127.0.42.101:8080 \ + --forward 8080:127.0.0.1:80 & +pid=$! + +sleep 2 + +: ${N=10} +succeeds=0 +fails=0 +# Connect to host1 (127.0.42.101, nginx) from host2 +for ((i = 0; i < $N; i++)); do + if docker exec host2 wget -q -O- http://127.0.42.101:8080 | grep -q "Welcome to nginx"; then + succeeds=$((succeeds + 1)) + else + fails=$((fails + 1)) + fi +done + +# Connect to host2 (127.0.42.102, Apache httpd) from host1 +for ((i = 0; i < $N; i++)); do + if docker exec host1 wget -q -O- http://127.0.42.102:8080 | grep -q "It works"; then + succeeds=$((succeeds + 1)) + else + fails=$((fails + 1)) + fi +done + +echo "tests: $((N * 2)), succceeds: ${succeeds}, fails: ${fails}" +exit ${fails} diff --git a/integration/test-router.sh b/integration/test-router.sh new file mode 100755 index 0000000..094a130 --- /dev/null +++ b/integration/test-router.sh @@ -0,0 +1,63 @@ +#!/bin/bash +set -eu -o pipefail + +cd "$(dirname $0)/.." + +pid="" +cleanup() { + echo "Cleaning up..." + set +e + if [[ -n "$pid" && -d "/proc/$pid" ]]; then kill $pid; fi + docker rm -f host1 host2 host3 + make clean + set -e +} +cleanup +trap "cleanup" EXIT + +make +docker run -d --name host1 -v "$(pwd)/bin:/mnt:ro" nginx:1.19.2-alpine +docker run -d --name host2 -v "$(pwd)/bin:/mnt:ro" httpd:2.4.46-alpine +docker run -d --name host3 -v "$(pwd)/bin:/mnt:ro" caddy:2.1.1-alpine + +: ${DEBUG=} +flags="" +if [[ -n "$DEBUG" ]]; then + flags="--debug" +fi + +./bin/norouter ${flags} ./integration/test-router.yaml & +pid=$! + +sleep 3 + +: ${N=3} +succeeds=0 +fails=0 + +test_wget() { + for ((i = 0; i < $N; i++)); do + if wget -q -O- $1 | grep -q "$2"; then + succeeds=$((succeeds + 1)) + else + fails=$((fails + 1)) + fi + for ((j = 1; j <= 3; j++)); do + if docker exec host${j} wget -q -O- $1 | grep -q "$2"; then + succeeds=$((succeeds + 1)) + else + fails=$((fails + 1)) + fi + done + done +} + +# Connect to host1 (nginx) +test_wget http://127.0.42.101:8080 "Welcome to nginx" +# Connect to host2 (Apache httpd) +test_wget http://127.0.42.102:8080 "It works" +# Connect to host3 (Caddy) +test_wget http://127.0.42.103:8080 "Caddy" + +echo "tests: $((N * 4 * 3)), succceeds: ${succeeds}, fails: ${fails}" +exit ${fails} diff --git a/integration/test-router.yaml b/integration/test-router.yaml new file mode 100644 index 0000000..199746d --- /dev/null +++ b/integration/test-router.yaml @@ -0,0 +1,16 @@ +hosts: + # host0 is the localhost + host0: + vip: "127.0.42.100" + host1: + cmd: ["docker", "exec", "-i", "host1", "/mnt/norouter"] + vip: "127.0.42.101" + ports: ["8080:127.0.0.1:80"] + host2: + cmd: ["docker", "exec", "-i", "host2", "/mnt/norouter"] + vip: "127.0.42.102" + ports: ["8080:127.0.0.1:80"] + host3: + cmd: ["docker", "exec", "-i", "host3", "/mnt/norouter"] + vip: "127.0.42.103" + ports: ["8080:127.0.0.1:80"] diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go new file mode 100644 index 0000000..6e86e2d --- /dev/null +++ b/pkg/agent/agent.go @@ -0,0 +1,255 @@ +package agent + +import ( + "encoding/binary" + "fmt" + "hash/fnv" + "io" + "math/rand" + "net" + "runtime" + "sync" + "time" + + "github.com/norouter/norouter/pkg/agent/config" + "github.com/norouter/norouter/pkg/agent/conn" + "github.com/norouter/norouter/pkg/bicopy" + "github.com/norouter/norouter/pkg/debugutil" + "github.com/norouter/norouter/pkg/stream" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +func runForward(me net.IP, f *config.Forward) error { + lh := fmt.Sprintf("%s:%d", me.String(), f.ListenPort) + l, err := net.Listen(f.Proto, lh) + if err != nil { + return errors.Wrapf(err, "failed to listen on %q", lh) + } + go func() { + for { + lconn, err := l.Accept() + if err != nil { + logrus.WithError(err).Error("failed to accept") + continue + } + go func() { + dh := fmt.Sprintf("%s:%d", f.ConnectIP.String(), f.ConnectPort) + dconn, err := net.Dial("tcp", dh) + if err != nil { + logrus.WithError(err).Errorf("failed to dial to %q", dh) + return + } + defer dconn.Close() + defer lconn.Close() + bicopy.Bicopy(lconn, dconn, nil) + }() + } + }() + return nil +} + +func newRandSource(me net.IP, now time.Time) rand.Source { + h := fnv.New64() + me4 := me.To4() + if me4 == nil { + panic(errors.Errorf("unsupported IP address %q", me)) + + } + binary.Write(h, binary.LittleEndian, me) + binary.Write(h, binary.LittleEndian, now.UnixNano) + seed := h.Sum64() + return rand.NewSource(int64(seed)) +} + +func New(me net.IP, others []*config.Other, forwards []*config.Forward, w io.Writer, r io.Reader) (*Agent, error) { + debugDump := false + sender := &stream.Sender{ + Writer: w, + DebugDump: debugDump, + } + receiver := &stream.Receiver{ + Reader: r, + DebugDump: debugDump, + } + a := &Agent{ + me: me, + others: others, + tcpForwards: make(map[uint16]*config.Forward), + sender: sender, + receiver: receiver, + rand: rand.New(newRandSource(me, time.Now())), + pwHM: make(map[uint64]*io.PipeWriter), + } + for _, f := range forwards { + if f.Proto != "tcp" { + return nil, errors.Errorf("unexpected proto %q", f.Proto) + } + a.tcpForwards[f.ListenPort] = f + } + return a, nil +} + +type Agent struct { + me net.IP + others []*config.Other + tcpForwards map[uint16]*config.Forward // key: listenPort + sender *stream.Sender + receiver *stream.Receiver + rand *rand.Rand + pwHM map[uint64]*io.PipeWriter + pwHMMu sync.RWMutex +} + +func (a *Agent) generateVSrcPort() uint16 { + var vSrcPort uint16 + for { + vSrcPort = uint16(a.rand.Int()) + if vSrcPort == 0 { + continue + } + if _, conflict := a.tcpForwards[vSrcPort]; conflict { + continue + } + // FIXME: detect more colisions + break + } + return vSrcPort +} + +func (a *Agent) runOther(o *config.Other) error { + lh := fmt.Sprintf("%s:%d", o.IP, o.Port) + l, err := net.Listen(o.Proto, lh) + if err != nil { + return err + } + go func() { + for { + lconn, err := l.Accept() + if err != nil { + logrus.WithError(err).Error("failed to accept") + continue + } + vSrcPort := a.generateVSrcPort() + logrus.Debugf("generated vSrcPort=%d", vSrcPort) + conn, pw, err := conn.New(a.me, vSrcPort, o.IP, o.Port, a.sender) + if err != nil { + logrus.WithError(err).Warn("failed to create conn") + lconn.Close() + return + } + hdrHash := stream.HashFields(o.IP, o.Port, a.me, vSrcPort, stream.TCP) + a.pwHMMu.Lock() + logrus.Debugf("registering pw for %s:%d->%s:%d", o.IP, o.Port, a.me, vSrcPort) + a.pwHM[hdrHash] = pw + a.pwHMMu.Unlock() + go func() { + defer lconn.Close() + defer conn.Close() + bicopy.Bicopy(conn, lconn, nil) + }() + } + }() + return nil +} + +func (a *Agent) getPW(hdrHash uint64, pkt *stream.Packet) (*io.PipeWriter, bool, error) { + a.pwHMMu.RLock() + pw, pwOk := a.pwHM[hdrHash] + a.pwHMMu.RUnlock() + if pwOk { + return pw, pwOk, nil + } + if f, fOk := a.tcpForwards[pkt.DstPort]; fOk { + // connect to forward ports, e.g. 8080 (->127.0.0.1:80) + dh := fmt.Sprintf("%s:%d", f.ConnectIP.String(), f.ConnectPort) + dconn, err := net.Dial(f.Proto, dh) + if err != nil { + logrus.WithError(err).Warnf("failed to dial %q", dh) + return nil, false, err + } + logrus.Debugf("dialed to %q, creating replyConn", dh) + var replyConn *conn.Conn + replyConn, pw, err = conn.New(pkt.DstIP, pkt.DstPort, pkt.SrcIP, pkt.SrcPort, a.sender) + if err != nil { + dconn.Close() + return nil, false, errors.Wrap(err, "failed to create replyConn") + } + a.pwHMMu.Lock() + logrus.Debugf("registering pw for %s:%d->%s:%d", pkt.SrcIP, pkt.SrcPort, pkt.DstIP, pkt.DstPort) + a.pwHM[hdrHash] = pw + a.pwHMMu.Unlock() + go func() { + defer dconn.Close() + defer replyConn.Close() + bicopy.Bicopy(dconn, replyConn, nil) + }() + return pw, true, nil + } + return nil, false, nil +} + +func (a *Agent) Run() error { + for _, f := range a.tcpForwards { + if err := runForward(a.me, f); err != nil { + return err + } + } + for _, o := range a.others { + if err := a.runOther(o); err != nil { + return err + } + } + for { + pkt, err := a.receiver.Recv() + if err != nil { + return errors.Wrap(err, "failed to recv from receiver") + } + if pkt.Proto != stream.TCP { + logrus.Warnf("received unknown proto %d, ignoring", pkt.Proto) + continue + } + if !pkt.DstIP.Equal(a.me) { + logrus.Warnf("received dstIP=%s is not me (%s), ignoring", pkt.DstIP.String(), a.me.String()) + continue + } + hdrHash := stream.HashFields(pkt.SrcIP, pkt.SrcPort, pkt.DstIP, pkt.DstPort, pkt.Proto) + pw, pwOk, err := a.getPW(hdrHash, pkt) + if err != nil { + logrus.WithError(err).Warnf("failed to call getPW (%s:%d->%s:%d)", pkt.SrcIP, pkt.SrcPort, pkt.DstIP, pkt.DstPort) + } + if pwOk { + logrus.Debugf("Calling pw.Write %s:%d->%s:%d", pkt.SrcIP, pkt.SrcPort, pkt.DstIP, pkt.DstPort) + if _, err := pw.Write(pkt.Payload); err != nil { + logrus.WithError(err).Warn("pw.Write failed") + } + } else { + logrus.Debugf("NOT calling pw.Write %s:%d->%s:%d", pkt.SrcIP, pkt.SrcPort, pkt.DstIP, pkt.DstPort) + } + a.gc(pkt, hdrHash, pw) + a.debugPrintStat() + } +} + +func (a *Agent) gc(pkt *stream.Packet, hdrHash uint64, pw *io.PipeWriter) { + // FIXME: support half-closing properly + if pkt.Flags&stream.FlagCloseRead != 0 || pkt.Flags&stream.FlagCloseWrite != 0 { + if pw != nil { + if err := pw.Close(); err != nil { + logrus.WithError(err).Debugf("failed to close pw") + } + } + a.pwHMMu.Lock() + delete(a.pwHM, hdrHash) + a.pwHMMu.Unlock() + } +} + +func (a *Agent) debugPrintStat() { + if logrus.GetLevel() >= logrus.DebugLevel { + a.pwHMMu.RLock() + l := len(a.pwHM) + a.pwHMMu.RUnlock() + logrus.Debugf("STAT: len(a.pwHM)=%d,GoRoutines=%d, FDs=%d", l, runtime.NumGoroutine(), debugutil.NumFDs()) + } +} diff --git a/pkg/agent/config/config.go b/pkg/agent/config/config.go new file mode 100644 index 0000000..a2bb39f --- /dev/null +++ b/pkg/agent/config/config.go @@ -0,0 +1,100 @@ +package config + +import ( + "fmt" + "net" + "strconv" + "strings" + + "github.com/pkg/errors" +) + +type Other struct { + IP net.IP + Port uint16 + Proto string +} + +func (o *Other) String() string { + return fmt.Sprintf("%s:%d/%s", o.IP.String(), o.Port, o.Proto) +} + +type Forward struct { + // listenIP is "me" + ListenPort uint16 + ConnectIP net.IP + ConnectPort uint16 + Proto string +} + +// parseMe parses --me=127.0.42.101 flag +func ParseMe(me string) (net.IP, error) { + ip := net.ParseIP(me) + if ip == nil { + return nil, errors.Errorf("invalid \"me\" IP %q", me) + } + ip = ip.To4() + if ip == nil { + return nil, errors.Errorf("invalid \"me\" IP %q, must be IPv4", me) + } + return ip, nil +} + +// ParseOther parses --other=127.0.42.102:8080[/tcp] flag +func ParseOther(other string) (*Other, error) { + s := strings.TrimSuffix(other, "/tcp") + if strings.Contains(s, "/") { + // TODO: support "/udp" suffix + return nil, errors.Errorf("cannot parse \"other\" address %q", other) + } + h, p, err := net.SplitHostPort(s) + if err != nil { + return nil, errors.Wrapf(err, "cannot parse \"other\" address %q", other) + } + ip := net.ParseIP(h) + if ip == nil { + return nil, errors.Errorf("cannot parse \"other\" address %q", other) + } + port, err := strconv.Atoi(p) + if err != nil { + return nil, errors.Wrapf(err, "cannot parse \"other\" address %q", other) + } + o := &Other{ + IP: ip, + Port: uint16(port), + Proto: "tcp", + } + return o, nil +} + +// ParseForward parses --forward=8080:127.0.0.1:80[/tcp] flag +func ParseForward(forward string) (*Forward, error) { + s := strings.TrimSuffix(forward, "/tcp") + if strings.Contains(s, "/") { + // TODO: support "/udp" suffix + return nil, errors.Errorf("cannot parse \"forward\" address %q", forward) + } + split := strings.Split(s, ":") + if len(split) != 3 { + return nil, errors.Errorf("cannot parse \"forward\" address %q", forward) + } + listenPort, err := strconv.Atoi(split[0]) + if err != nil { + return nil, errors.Wrapf(err, "cannot parse \"forward\" address %q", forward) + } + connectIP := net.ParseIP(split[1]) + if connectIP == nil { + return nil, errors.Errorf("cannot parse \"forward\" address %q", forward) + } + connectPort, err := strconv.Atoi(split[2]) + if err != nil { + return nil, errors.Wrapf(err, "cannot parse \"forward\" address %q", forward) + } + f := &Forward{ + ListenPort: uint16(listenPort), + ConnectIP: connectIP, + ConnectPort: uint16(connectPort), + Proto: "tcp", + } + return f, nil +} diff --git a/pkg/agent/conn/conn.go b/pkg/agent/conn/conn.go new file mode 100644 index 0000000..b13fbfb --- /dev/null +++ b/pkg/agent/conn/conn.go @@ -0,0 +1,130 @@ +package conn + +import ( + "io" + "net" + + "github.com/hashicorp/go-multierror" + "github.com/norouter/norouter/pkg/stream" +) + +func New(srcIP net.IP, srcPort uint16, dstIP net.IP, dstPort uint16, sender *stream.Sender) (*Conn, *io.PipeWriter, error) { + pr, pw := io.Pipe() + c := &Conn{ + SrcIP: srcIP, + SrcPort: srcPort, + DstIP: dstIP, + DstPort: dstPort, + sender: sender, + pr: pr, + } + return c, pw, nil +} + +type Conn struct { + SrcIP net.IP + SrcPort uint16 + DstIP net.IP + DstPort uint16 + + sender *stream.Sender + pr *io.PipeReader // pkt.Payload, passed from receiver + readClosed bool + writeClosed bool +} + +func (c *Conn) Write(p []byte) (int, error) { + pkt := &stream.Packet{ + SrcIP: c.SrcIP, + SrcPort: c.SrcPort, + DstIP: c.DstIP, + DstPort: c.DstPort, + Proto: stream.TCP, + Flags: 0, + Payload: p, + } + if err := c.sender.Send(pkt); err != nil { + return 0, err + } + return len(p), nil +} + +func (c *Conn) Read(p []byte) (int, error) { + return c.pr.Read(p) +} + +func (c *Conn) Close() error { + if c.readClosed && c.writeClosed { + return nil + } + if c.readClosed { + return c.CloseWrite() + } + if c.writeClosed { + return c.CloseRead() + } + var merr *multierror.Error + pkt := &stream.Packet{ + SrcIP: c.SrcIP, + SrcPort: c.SrcPort, + DstIP: c.DstIP, + DstPort: c.DstPort, + Proto: stream.TCP, + Flags: stream.FlagCloseRead | stream.FlagCloseWrite, + Payload: nil, + } + if err := c.sender.Send(pkt); err != nil { + merr = multierror.Append(merr, err) + } + if err := c.pr.Close(); err != nil { + merr = multierror.Append(merr, err) + } + c.readClosed = true + c.writeClosed = true + return merr.ErrorOrNil() +} + +func (c *Conn) CloseRead() error { + if c.readClosed { + return nil + } + var merr *multierror.Error + pkt := &stream.Packet{ + SrcIP: c.SrcIP, + SrcPort: c.SrcPort, + DstIP: c.DstIP, + DstPort: c.DstPort, + Proto: stream.TCP, + Flags: stream.FlagCloseRead, + Payload: nil, + } + if err := c.sender.Send(pkt); err != nil { + merr = multierror.Append(merr, err) + } + if err := c.pr.Close(); err != nil { + merr = multierror.Append(merr, err) + } + c.readClosed = true + return merr.ErrorOrNil() +} + +func (c *Conn) CloseWrite() error { + if c.writeClosed { + return nil + } + var merr *multierror.Error + pkt := &stream.Packet{ + SrcIP: c.SrcIP, + SrcPort: c.SrcPort, + DstIP: c.DstIP, + DstPort: c.DstPort, + Proto: stream.TCP, + Flags: stream.FlagCloseWrite, + Payload: nil, + } + if err := c.sender.Send(pkt); err != nil { + merr = multierror.Append(merr, err) + } + c.writeClosed = true + return merr.ErrorOrNil() +} diff --git a/pkg/bicopy/bicopy.go b/pkg/bicopy/bicopy.go new file mode 100644 index 0000000..0ed50a5 --- /dev/null +++ b/pkg/bicopy/bicopy.go @@ -0,0 +1,62 @@ +package bicopy + +import ( + "io" + "sync" + + "github.com/sirupsen/logrus" +) + +// Bicopy is from https://github.com/rootless-containers/rootlesskit/blob/v0.10.1/pkg/port/builtin/parent/tcp/tcp.go#L73-L104 +// (originally from libnetwork, Apache License 2.0) +func Bicopy(x, y io.ReadWriter, quit <-chan struct{}) { + type closeReader interface { + CloseRead() error + } + type closeWriter interface { + CloseWrite() error + } + var wg sync.WaitGroup + var broker = func(to, from io.ReadWriter) { + if _, err := io.Copy(to, from); err != nil { + logrus.WithError(err).Debug("failed to call io.Copy") + } + if fromCR, ok := from.(closeReader); ok { + if err := fromCR.CloseRead(); err != nil { + logrus.WithError(err).Debug("failed to call CloseRead") + } + } + if toCW, ok := to.(closeWriter); ok { + if err := toCW.CloseWrite(); err != nil { + logrus.WithError(err).Debug("failed to call CloseWrite") + } + } + wg.Done() + } + + wg.Add(2) + go broker(x, y) + go broker(y, x) + finish := make(chan struct{}) + go func() { + wg.Wait() + close(finish) + }() + + select { + case <-quit: + case <-finish: + } + if xCloser, ok := x.(io.Closer); ok { + if err := xCloser.Close(); err != nil { + logrus.WithError(err).Debug("failed to call xCloser.Close") + } + } + if yCloser, ok := y.(io.Closer); ok { + if err := yCloser.Close(); err != nil { + logrus.WithError(err).Debug("failed to call yCloser.Close") + } + } + <-finish + // TODO: return copied bytes +} diff --git a/pkg/debugutil/debugutil.go b/pkg/debugutil/debugutil.go new file mode 100644 index 0000000..0febbf1 --- /dev/null +++ b/pkg/debugutil/debugutil.go @@ -0,0 +1,18 @@ +package debugutil + +import ( + "io/ioutil" + "runtime" +) + +func NumFDs() int { + if runtime.GOOS != "linux" { + // unimplemented + return -1 + } + ents, err := ioutil.ReadDir("/proc/self/fd") + if err != nil { + return -1 + } + return len(ents) +} diff --git a/pkg/router/cmdclient.go b/pkg/router/cmdclient.go new file mode 100644 index 0000000..fd6f65a --- /dev/null +++ b/pkg/router/cmdclient.go @@ -0,0 +1,84 @@ +package router + +import ( + "fmt" + "os" + "os/exec" + "runtime" + + agentconfig "github.com/norouter/norouter/pkg/agent/config" + "github.com/norouter/norouter/pkg/router/config" +) + +type CmdClientSet struct { + ByVIP map[string]*CmdClient +} + +func NewCmdClientSet(cfg *config.Config) (*CmdClientSet, error) { + var publicHostPorts []string + for _, h := range cfg.Hosts { + for _, p := range h.Ports { + f, err := agentconfig.ParseForward(p) + if err != nil { + return nil, err + } + publicHostPorts = append(publicHostPorts, fmt.Sprintf("%s:%d/%s", h.VIP, f.ListenPort, f.Proto)) + } + } + ccSet := &CmdClientSet{ + ByVIP: make(map[string]*CmdClient), + } + for hostname, h := range cfg.Hosts { + client, err := NewCmdClient(hostname, h, publicHostPorts) + if err != nil { + return nil, err + } + ccSet.ByVIP[h.VIP] = client + } + return ccSet, nil +} + +// NewCmdClient. +func NewCmdClient(hostname string, h config.Host, publicHostPorts []string) (*CmdClient, error) { + var cmd *exec.Cmd + if len(h.Cmd) != 0 { + // e.g. ["docker", "exec", "-i", "host1", "--", "norouter"] + cmd = exec.Command(h.Cmd[0], h.Cmd[1:]...) + } else { + if runtime.GOOS == "linux" { + cmd = exec.Command("/proc/self/exe") + } else { + cmd = exec.Command(os.Args[0]) + } + } + cmd.Args = append(cmd.Args, "internal", "agent", "--me", h.VIP) + for _, port := range h.Ports { + cmd.Args = append(cmd.Args, "--forward", port) + } + for _, pub := range publicHostPorts { + o, err := agentconfig.ParseOther(pub) + if err != nil { + return nil, err + } + if o.IP.String() == h.VIP { + continue + } + cmd.Args = append(cmd.Args, "--other", pub) + } + c := &CmdClient{ + Hostname: hostname, + VIP: h.VIP, + cmd: cmd, + } + return c, nil +} + +type CmdClient struct { + Hostname string + VIP string + cmd *exec.Cmd +} + +func (c *CmdClient) String() string { + return fmt.Sprintf("<%s (%s)> %s", c.Hostname, c.VIP, c.cmd.String()) +} diff --git a/pkg/router/config/config.go b/pkg/router/config/config.go new file mode 100644 index 0000000..ec04ff3 --- /dev/null +++ b/pkg/router/config/config.go @@ -0,0 +1,11 @@ +package config + +type Config struct { + Hosts map[string]Host `yaml:"hosts"` +} + +type Host struct { + Cmd []string `yaml:"cmd"` // e.g. ["docker", "exec", "-i", "host1", "norouter"] + VIP string `yaml:"vip"` // e.g. "127.0.42.101" + Ports []string `yaml:"ports"` // e.g. ["8080:127.0.0.1:80"], or ["8080:127.0.0.1:80/tcp"] +} diff --git a/pkg/router/router.go b/pkg/router/router.go new file mode 100644 index 0000000..d5acb86 --- /dev/null +++ b/pkg/router/router.go @@ -0,0 +1,103 @@ +package router + +import ( + "os" + + "github.com/norouter/norouter/pkg/stream" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "golang.org/x/sync/errgroup" +) + +func New(ccSet *CmdClientSet) (*Router, error) { + r := &Router{ + ccSet: ccSet, + senders: make(map[string]*stream.Sender), + receivers: make(map[string]*stream.Receiver), + } + return r, nil +} + +type Router struct { + ccSet *CmdClientSet + senders map[string]*stream.Sender // key: vip (TODO: don't use string) + receivers map[string]*stream.Receiver +} + +func (r *Router) Run() error { + debugDump := false + // Step 1: fill up senders + for vip, cc := range r.ccSet.ByVIP { + cc.cmd.Stderr = &stderrWriter{ + vip: cc.VIP, + hostname: cc.Hostname, + } + stdin, err := cc.cmd.StdinPipe() + if err != nil { + return err + } + sender := &stream.Sender{ + Writer: stdin, + DebugDump: debugDump, + } + r.senders[vip] = sender + stdout, err := cc.cmd.StdoutPipe() + if err != nil { + return err + } + receiver := &stream.Receiver{ + Reader: stdout, + DebugDump: debugDump, + } + r.receivers[vip] = receiver + logrus.Infof("starting client for %s(%s): %q", cc.Hostname, vip, cc.cmd.String()) + if err := cc.cmd.Start(); err != nil { + return err + } + // TODO: notify if a client exits + defer func() { + logrus.Warnf("exiting client: %q", cc.String()) + if err := cc.cmd.Process.Signal(os.Interrupt); err != nil { + logrus.WithError(err).Errorf("error while sending os.Interrupt to %s(%s)", cc.Hostname, vip) + cc.cmd.Process.Kill() + } + }() + } + + var eg errgroup.Group + // Step 2: start goroutines after filling up all r.senders + for vipx, receiverx := range r.receivers { + vip := vipx + receiver := receiverx + eg.Go(func() error { + for { + pkt, err := receiver.Recv() + if err != nil { + return errors.Errorf("failed to receive from %s", vip) + } + dstIPStr := pkt.DstIP.String() + sender, ok := r.senders[dstIPStr] + if !ok { + logrus.WithError(err).Warnf("unexpected dstIP %s in a packet from %s", dstIPStr, vip) + continue + } + logrus.Debugf("routing packet from %s:%d to %s:%d", pkt.SrcIP, pkt.SrcPort, pkt.DstIP, pkt.DstPort) + if err := sender.Send(pkt); err != nil { + logrus.WithError(err).Warnf("routing packet from %s:%d to %s:%d", pkt.SrcIP, pkt.SrcPort, pkt.DstIP, pkt.DstPort) + continue + } + } + }) + } + return eg.Wait() +} + +type stderrWriter struct { + hostname string + vip string +} + +func (w *stderrWriter) Write(p []byte) (int, error) { + logrus.Warnf("stderr[%s(%s)]: %s", w.hostname, w.vip, string(p)) + return len(p), nil +} diff --git a/pkg/stream/hash.go b/pkg/stream/hash.go new file mode 100644 index 0000000..d774601 --- /dev/null +++ b/pkg/stream/hash.go @@ -0,0 +1,35 @@ +package stream + +import ( + "encoding/binary" + "hash/fnv" + "net" + + "github.com/pkg/errors" +) + +func HashFields(srcIP net.IP, srcPort uint16, dstIP net.IP, dstPort uint16, proto Proto) uint64 { + h := fnv.New64() + if len(srcIP) != 0 { + srcIP4 := srcIP.To4() + if srcIP4 == nil { + panic(errors.Errorf("unsupported IP address %q", srcIP)) + } + binary.Write(h, binary.LittleEndian, srcIP4) + } else { + binary.Write(h, binary.LittleEndian, net.ParseIP("0.0.0.0")) + } + binary.Write(h, binary.LittleEndian, srcPort) + if len(dstIP) != 0 { + dstIP4 := dstIP.To4() + if dstIP4 == nil { + panic(errors.Errorf("unsupported IP address %q", dstIP)) + } + binary.Write(h, binary.LittleEndian, dstIP4) + } else { + binary.Write(h, binary.LittleEndian, net.ParseIP("0.0.0.0")) + } + binary.Write(h, binary.LittleEndian, dstPort) + binary.Write(h, binary.LittleEndian, proto) + return h.Sum64() +} diff --git a/pkg/stream/receiver.go b/pkg/stream/receiver.go new file mode 100644 index 0000000..3edfc5a --- /dev/null +++ b/pkg/stream/receiver.go @@ -0,0 +1,75 @@ +package stream + +import ( + "bytes" + "encoding/binary" + "io" + "net" + "sync" + + "github.com/sirupsen/logrus" +) + +// Receiver +type Receiver struct { + io.Reader + sync.Mutex + DebugDump bool +} + +func (receiver *Receiver) Recv() (*Packet, error) { + var length uint32 // HeaderLength + len(p) + receiver.Lock() + if err := binary.Read(receiver.Reader, binary.LittleEndian, &length); err != nil { + return nil, err + } + b := make([]byte, length) + if err := binary.Read(receiver.Reader, binary.LittleEndian, &b); err != nil { + return nil, err + } + receiver.Unlock() + var ( + srcIP4 [4]byte + srcPort uint16 + dstIP4 [4]byte + dstPort uint16 + proto uint16 + flags uint16 + ) + br := bytes.NewReader(b) + if err := binary.Read(br, binary.LittleEndian, &srcIP4); err != nil { + return nil, err + } + if err := binary.Read(br, binary.LittleEndian, &srcPort); err != nil { + return nil, err + } + if err := binary.Read(br, binary.LittleEndian, &dstIP4); err != nil { + return nil, err + } + if err := binary.Read(br, binary.LittleEndian, &dstPort); err != nil { + return nil, err + } + if err := binary.Read(br, binary.LittleEndian, &proto); err != nil { + return nil, err + } + if err := binary.Read(br, binary.LittleEndian, &flags); err != nil { + return nil, err + } + pkt := &Packet{ + SrcIP: net.IP(srcIP4[:]), + SrcPort: srcPort, + DstIP: net.IP(dstIP4[:]), + DstPort: dstPort, + Proto: Proto(proto), + Flags: flags, + Payload: b[HeaderLength:], + } + + if receiver.DebugDump && logrus.GetLevel() >= logrus.DebugLevel { + logrus.Debugf("receiver: Received %s:%d->%s:%d (%v) 0b%b: %q", + pkt.SrcIP.String(), pkt.SrcPort, + pkt.DstIP.String(), pkt.DstPort, + pkt.Proto, pkt.Flags, string(pkt.Payload)) + } + return pkt, nil +} diff --git a/pkg/stream/sender.go b/pkg/stream/sender.go new file mode 100644 index 0000000..f3c37cf --- /dev/null +++ b/pkg/stream/sender.go @@ -0,0 +1,66 @@ +package stream + +import ( + "bytes" + "encoding/binary" + "io" + "sync" + + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +// Sender +type Sender struct { + io.Writer + sync.Mutex + DebugDump bool +} + +func (sender *Sender) Send(p *Packet) error { + if sender.DebugDump && logrus.GetLevel() >= logrus.DebugLevel { + logrus.Debugf("sender: Sending %s:%d %s:%d (%v) 0b%b: %q", + p.SrcIP.String(), p.SrcPort, + p.DstIP.String(), p.DstPort, + p.Proto, p.Flags, string(p.Payload)) + } + var buf bytes.Buffer + if err := binary.Write(&buf, binary.LittleEndian, uint32(HeaderLength+len(p.Payload))); err != nil { + return err + } + sip := p.SrcIP.To4() + if sip == nil { + return errors.Errorf("unexpected ip %+v", sip) + } + if err := binary.Write(&buf, binary.LittleEndian, []byte(sip)); err != nil { + return err + } + if err := binary.Write(&buf, binary.LittleEndian, p.SrcPort); err != nil { + return err + } + dip := p.DstIP.To4() + if dip == nil { + return errors.Errorf("unexpected ip %+v", dip) + } + if err := binary.Write(&buf, binary.LittleEndian, []byte(dip)); err != nil { + return err + } + if err := binary.Write(&buf, binary.LittleEndian, p.DstPort); err != nil { + return err + } + if err := binary.Write(&buf, binary.LittleEndian, uint16(p.Proto)); err != nil { + return err + } + if err := binary.Write(&buf, binary.LittleEndian, p.Flags); err != nil { + return err + } + if p.Payload != nil { + if err := binary.Write(&buf, binary.LittleEndian, p.Payload); err != nil { + return err + } + } + sender.Lock() + _, err := io.Copy(sender.Writer, &buf) + sender.Unlock() + return err +} diff --git a/pkg/stream/stream.go b/pkg/stream/stream.go new file mode 100644 index 0000000..73cd008 --- /dev/null +++ b/pkg/stream/stream.go @@ -0,0 +1,50 @@ +package stream + +import ( + "net" +) + +type Proto = uint16 + +const ( + TCP Proto = 0 +) + +// HeaderLength: +// +// + uint64 srcIP (4 bytes) +// +// + uint16 srcPort (2 bytes) +// +// + uint32 dstIP (4 bytes) +// +// + uint16 dstPort (2 bytes) +// +// + uint16 proto (2 bytes) +// +// + uint32 flags (2 bytes) +const HeaderLength = 4 + 2 + 4 + 2 + 2 + 2 + +// Packet requires uint32le length to be prepended. +// The protocol is highly likely to be changed. +type Packet struct { + // SrcIP is the src IP. Must be [4]byte. + SrcIP net.IP + // SrcPort is the dest port. + SrcPort uint16 + // DstIP is the dest IP. Must be [4]byte. + DstIP net.IP + // DstPort is the dest port. + DstPort uint16 + // Proto must be TCP. + Proto Proto + // Flags, such as FlagCloseRead and FlagCloseWrite + Flags uint16 + // Payload does not contain any L2/L3/L4 headers. + Payload []byte +} + +const ( + FlagCloseRead = 0b01 + FlagCloseWrite = 0b10 +)