mirror of
https://github.com/norouter/norouter.git
synced 2025-12-24 13:17:54 +08:00
initial commit
Signed-off-by: Akihiro Suda <akihiro.suda.cz@hco.ntt.co.jp>
This commit is contained in:
1
.gitignore
vendored
Normal file
1
.gitignore
vendored
Normal file
@@ -0,0 +1 @@
|
||||
/bin
|
||||
202
LICENSE
Normal file
202
LICENSE
Normal file
@@ -0,0 +1,202 @@
|
||||
|
||||
Apache License
|
||||
Version 2.0, January 2004
|
||||
http://www.apache.org/licenses/
|
||||
|
||||
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
|
||||
|
||||
1. Definitions.
|
||||
|
||||
"License" shall mean the terms and conditions for use, reproduction,
|
||||
and distribution as defined by Sections 1 through 9 of this document.
|
||||
|
||||
"Licensor" shall mean the copyright owner or entity authorized by
|
||||
the copyright owner that is granting the License.
|
||||
|
||||
"Legal Entity" shall mean the union of the acting entity and all
|
||||
other entities that control, are controlled by, or are under common
|
||||
control with that entity. For the purposes of this definition,
|
||||
"control" means (i) the power, direct or indirect, to cause the
|
||||
direction or management of such entity, whether by contract or
|
||||
otherwise, or (ii) ownership of fifty percent (50%) or more of the
|
||||
outstanding shares, or (iii) beneficial ownership of such entity.
|
||||
|
||||
"You" (or "Your") shall mean an individual or Legal Entity
|
||||
exercising permissions granted by this License.
|
||||
|
||||
"Source" form shall mean the preferred form for making modifications,
|
||||
including but not limited to software source code, documentation
|
||||
source, and configuration files.
|
||||
|
||||
"Object" form shall mean any form resulting from mechanical
|
||||
transformation or translation of a Source form, including but
|
||||
not limited to compiled object code, generated documentation,
|
||||
and conversions to other media types.
|
||||
|
||||
"Work" shall mean the work of authorship, whether in Source or
|
||||
Object form, made available under the License, as indicated by a
|
||||
copyright notice that is included in or attached to the work
|
||||
(an example is provided in the Appendix below).
|
||||
|
||||
"Derivative Works" shall mean any work, whether in Source or Object
|
||||
form, that is based on (or derived from) the Work and for which the
|
||||
editorial revisions, annotations, elaborations, or other modifications
|
||||
represent, as a whole, an original work of authorship. For the purposes
|
||||
of this License, Derivative Works shall not include works that remain
|
||||
separable from, or merely link (or bind by name) to the interfaces of,
|
||||
the Work and Derivative Works thereof.
|
||||
|
||||
"Contribution" shall mean any work of authorship, including
|
||||
the original version of the Work and any modifications or additions
|
||||
to that Work or Derivative Works thereof, that is intentionally
|
||||
submitted to Licensor for inclusion in the Work by the copyright owner
|
||||
or by an individual or Legal Entity authorized to submit on behalf of
|
||||
the copyright owner. For the purposes of this definition, "submitted"
|
||||
means any form of electronic, verbal, or written communication sent
|
||||
to the Licensor or its representatives, including but not limited to
|
||||
communication on electronic mailing lists, source code control systems,
|
||||
and issue tracking systems that are managed by, or on behalf of, the
|
||||
Licensor for the purpose of discussing and improving the Work, but
|
||||
excluding communication that is conspicuously marked or otherwise
|
||||
designated in writing by the copyright owner as "Not a Contribution."
|
||||
|
||||
"Contributor" shall mean Licensor and any individual or Legal Entity
|
||||
on behalf of whom a Contribution has been received by Licensor and
|
||||
subsequently incorporated within the Work.
|
||||
|
||||
2. Grant of Copyright License. Subject to the terms and conditions of
|
||||
this License, each Contributor hereby grants to You a perpetual,
|
||||
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
|
||||
copyright license to reproduce, prepare Derivative Works of,
|
||||
publicly display, publicly perform, sublicense, and distribute the
|
||||
Work and such Derivative Works in Source or Object form.
|
||||
|
||||
3. Grant of Patent License. Subject to the terms and conditions of
|
||||
this License, each Contributor hereby grants to You a perpetual,
|
||||
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
|
||||
(except as stated in this section) patent license to make, have made,
|
||||
use, offer to sell, sell, import, and otherwise transfer the Work,
|
||||
where such license applies only to those patent claims licensable
|
||||
by such Contributor that are necessarily infringed by their
|
||||
Contribution(s) alone or by combination of their Contribution(s)
|
||||
with the Work to which such Contribution(s) was submitted. If You
|
||||
institute patent litigation against any entity (including a
|
||||
cross-claim or counterclaim in a lawsuit) alleging that the Work
|
||||
or a Contribution incorporated within the Work constitutes direct
|
||||
or contributory patent infringement, then any patent licenses
|
||||
granted to You under this License for that Work shall terminate
|
||||
as of the date such litigation is filed.
|
||||
|
||||
4. Redistribution. You may reproduce and distribute copies of the
|
||||
Work or Derivative Works thereof in any medium, with or without
|
||||
modifications, and in Source or Object form, provided that You
|
||||
meet the following conditions:
|
||||
|
||||
(a) You must give any other recipients of the Work or
|
||||
Derivative Works a copy of this License; and
|
||||
|
||||
(b) You must cause any modified files to carry prominent notices
|
||||
stating that You changed the files; and
|
||||
|
||||
(c) You must retain, in the Source form of any Derivative Works
|
||||
that You distribute, all copyright, patent, trademark, and
|
||||
attribution notices from the Source form of the Work,
|
||||
excluding those notices that do not pertain to any part of
|
||||
the Derivative Works; and
|
||||
|
||||
(d) If the Work includes a "NOTICE" text file as part of its
|
||||
distribution, then any Derivative Works that You distribute must
|
||||
include a readable copy of the attribution notices contained
|
||||
within such NOTICE file, excluding those notices that do not
|
||||
pertain to any part of the Derivative Works, in at least one
|
||||
of the following places: within a NOTICE text file distributed
|
||||
as part of the Derivative Works; within the Source form or
|
||||
documentation, if provided along with the Derivative Works; or,
|
||||
within a display generated by the Derivative Works, if and
|
||||
wherever such third-party notices normally appear. The contents
|
||||
of the NOTICE file are for informational purposes only and
|
||||
do not modify the License. You may add Your own attribution
|
||||
notices within Derivative Works that You distribute, alongside
|
||||
or as an addendum to the NOTICE text from the Work, provided
|
||||
that such additional attribution notices cannot be construed
|
||||
as modifying the License.
|
||||
|
||||
You may add Your own copyright statement to Your modifications and
|
||||
may provide additional or different license terms and conditions
|
||||
for use, reproduction, or distribution of Your modifications, or
|
||||
for any such Derivative Works as a whole, provided Your use,
|
||||
reproduction, and distribution of the Work otherwise complies with
|
||||
the conditions stated in this License.
|
||||
|
||||
5. Submission of Contributions. Unless You explicitly state otherwise,
|
||||
any Contribution intentionally submitted for inclusion in the Work
|
||||
by You to the Licensor shall be under the terms and conditions of
|
||||
this License, without any additional terms or conditions.
|
||||
Notwithstanding the above, nothing herein shall supersede or modify
|
||||
the terms of any separate license agreement you may have executed
|
||||
with Licensor regarding such Contributions.
|
||||
|
||||
6. Trademarks. This License does not grant permission to use the trade
|
||||
names, trademarks, service marks, or product names of the Licensor,
|
||||
except as required for reasonable and customary use in describing the
|
||||
origin of the Work and reproducing the content of the NOTICE file.
|
||||
|
||||
7. Disclaimer of Warranty. Unless required by applicable law or
|
||||
agreed to in writing, Licensor provides the Work (and each
|
||||
Contributor provides its Contributions) on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
implied, including, without limitation, any warranties or conditions
|
||||
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
|
||||
PARTICULAR PURPOSE. You are solely responsible for determining the
|
||||
appropriateness of using or redistributing the Work and assume any
|
||||
risks associated with Your exercise of permissions under this License.
|
||||
|
||||
8. Limitation of Liability. In no event and under no legal theory,
|
||||
whether in tort (including negligence), contract, or otherwise,
|
||||
unless required by applicable law (such as deliberate and grossly
|
||||
negligent acts) or agreed to in writing, shall any Contributor be
|
||||
liable to You for damages, including any direct, indirect, special,
|
||||
incidental, or consequential damages of any character arising as a
|
||||
result of this License or out of the use or inability to use the
|
||||
Work (including but not limited to damages for loss of goodwill,
|
||||
work stoppage, computer failure or malfunction, or any and all
|
||||
other commercial damages or losses), even if such Contributor
|
||||
has been advised of the possibility of such damages.
|
||||
|
||||
9. Accepting Warranty or Additional Liability. While redistributing
|
||||
the Work or Derivative Works thereof, You may choose to offer,
|
||||
and charge a fee for, acceptance of support, warranty, indemnity,
|
||||
or other liability obligations and/or rights consistent with this
|
||||
License. However, in accepting such obligations, You may act only
|
||||
on Your own behalf and on Your sole responsibility, not on behalf
|
||||
of any other Contributor, and only if You agree to indemnify,
|
||||
defend, and hold each Contributor harmless for any liability
|
||||
incurred by, or claims asserted against, such Contributor by reason
|
||||
of your accepting any such warranty or additional liability.
|
||||
|
||||
END OF TERMS AND CONDITIONS
|
||||
|
||||
APPENDIX: How to apply the Apache License to your work.
|
||||
|
||||
To apply the Apache License to your work, attach the following
|
||||
boilerplate notice, with the fields enclosed by brackets "[]"
|
||||
replaced with your own identifying information. (Don't include
|
||||
the brackets!) The text should be enclosed in the appropriate
|
||||
comment syntax for the file format. We also recommend that a
|
||||
file or class name and description of purpose be included on the
|
||||
same "printed page" as the copyright notice for easier
|
||||
identification within third-party archives.
|
||||
|
||||
Copyright [yyyy] [name of copyright owner]
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
16
Makefile
Normal file
16
Makefile
Normal file
@@ -0,0 +1,16 @@
|
||||
.DEFAULT_GOAL := binaries
|
||||
|
||||
binaries: bin/norouter
|
||||
|
||||
bin/norouter:
|
||||
CGO_ENABLED=0 go build -o $@ ./cmd/norouter
|
||||
LANG=C LC_ALL=C file $@ | grep -qw "statically linked"
|
||||
|
||||
clean:
|
||||
rm -rf bin
|
||||
|
||||
integration:
|
||||
./integration/test-internal-agent.sh
|
||||
./integration/test-router.sh
|
||||
|
||||
.PHONY: bin/norouter clean integration
|
||||
180
README.md
Normal file
180
README.md
Normal file
@@ -0,0 +1,180 @@
|
||||
# NoRouter: the easiest multi-host & multi-cloud networking ever. No root privilege required.
|
||||
|
||||
NoRouter is the easiest multi-host & multi-cloud networking ever. And yet, NoRouter does not require any privilege such as `sudo` or `docker run --privileged`.
|
||||
|
||||
NoRouter implements unprivileged networking by using multiple loopback addresseses such as 127.0.42.101 and 127.0.42.102.
|
||||
The hosts in the network are connected by forwarding packets over stdio streams like `ssh`, `docker exec`, `podman exec`, `kubectl exec`, and whatever.
|
||||
|
||||

|
||||
|
||||
|
||||
NoRouter is mostly expected to be used in dev environments.
|
||||
|
||||
## Example using `docker exec` and `podman exec`
|
||||
|
||||
This example creates a virtual 127.0.42.0/24 network across a Docker container, a Podman container, and the localhost, using `docker exec` and `podman exec`.
|
||||
|
||||
**Step 0: build `bin/norouter` binary** (on Linux)
|
||||
|
||||
```console
|
||||
make
|
||||
```
|
||||
|
||||
**Step 1: create `host1` (nginx) as a Docker container**
|
||||
```console
|
||||
docker run -d --name host1 nginx:alpine
|
||||
docker cp $(pwd)/bin/norouter host1:/usr/local/bin
|
||||
```
|
||||
|
||||
**Step 2: create `host2` (Apache httpd) as a Podman container**
|
||||
```console
|
||||
podman run -d --name host2 httpd:alpine
|
||||
podman cp $(pwd)/bin/norouter host2:/usr/local/bin
|
||||
```
|
||||
|
||||
**Step 3: create [`example.yaml`](./example.yaml)**
|
||||
|
||||
```yaml
|
||||
hosts:
|
||||
# host0 is the localhost
|
||||
host0:
|
||||
vip: "127.0.42.100"
|
||||
host1:
|
||||
cmd: ["docker", "exec", "-i", "host1", "norouter"]
|
||||
vip: "127.0.42.101"
|
||||
ports: ["8080:127.0.0.1:80"]
|
||||
host2:
|
||||
cmd: ["podman", "exec", "-i", "host2", "norouter"]
|
||||
vip: "127.0.42.102"
|
||||
ports: ["8080:127.0.0.1:80"]
|
||||
```
|
||||
|
||||
**Step 4: start the NoRouter "router" process**
|
||||
|
||||
```console
|
||||
./bin/norouter example.yaml
|
||||
```
|
||||
|
||||
**Step 5: connect to `host1` (127.0.42.101, nginx)**
|
||||
|
||||
```console
|
||||
wget -O - http://127.0.42.101:8080
|
||||
docker exec host1 wget -O - http://127.0.42.101:8080
|
||||
podman exec host2 wget -O - http://127.0.42.101:8080
|
||||
```
|
||||
|
||||
Make sure nginx's `index.html` ("Welcome to nginx!") is shown.
|
||||
|
||||
**Step 6: connect to `host2` (127.0.42.102, Apache httpd)**
|
||||
|
||||
```console
|
||||
wget -O - http://127.0.42.102:8080
|
||||
docker exec host1 wget -O - http://127.0.42.102:8080
|
||||
podman exec host2 wget -O - http://127.0.42.102:8080
|
||||
```
|
||||
|
||||
Make sure Apache httpd's `index.html` ("It works!") is shown.
|
||||
|
||||
### How it works under the hood
|
||||
|
||||
The "router" process of NoRouter launches the following commands and transfer the packets using their stdio streams.
|
||||
|
||||
```
|
||||
/proc/self/exe internal agent \
|
||||
--me 127.0.42.100 \
|
||||
--other 127.0.42.101:8080 \
|
||||
--other 127.0.42.102:8080
|
||||
```
|
||||
|
||||
```
|
||||
docker exec -i host1 norouter internal agent \
|
||||
--me 127.0.42.101 \
|
||||
--forward 8080:127.0.0.1:80 \
|
||||
--other 127.0.42.102:8080
|
||||
```
|
||||
|
||||
```
|
||||
podman exec -i host2 norouter internal agent \
|
||||
--me 127.0.42.102 \
|
||||
--other 127.0.42.101:8080 \
|
||||
--forward 8080:127.0.0.1:80
|
||||
```
|
||||
|
||||
`me` is used as a virtual src IP for connecting to `--other <dstIP>:<dstPort>`.
|
||||
|
||||
#### stdio protocol
|
||||
|
||||
The protocol is still subject to change.
|
||||
<!-- can we reuse some existing protocol? -->
|
||||
|
||||
```
|
||||
uint32le Len (includes header fields and Payload but does not include Len itself)
|
||||
[4]byte SrcIP
|
||||
uint16le SrcPort
|
||||
[4]byte DstIP
|
||||
uint16le DstPort
|
||||
uint16le Proto
|
||||
uint16le Flags
|
||||
[]byte Payload (without L2/L3/L4 headers at all)
|
||||
```
|
||||
|
||||
## More examples
|
||||
|
||||
### Kubernetes
|
||||
|
||||
Install `norouter` binary using `kubectl cp`
|
||||
|
||||
e.g.
|
||||
```
|
||||
kubectl run --image=nginx:alpine --restart=Never nginx
|
||||
kubectl cp bin/norouter nginx:/usr/local/bin
|
||||
```
|
||||
|
||||
In the NoRouter yaml, specify `cmd` as `["kubectl", "exec", "-i", "some-kubernetes-pod", "--", "norouter"]`.
|
||||
To connect multiple Kubernetes clusters, pass `--context` arguments to `kubectl`.
|
||||
|
||||
e.g. To connect GKE, AKS, and your laptop:
|
||||
|
||||
```yaml
|
||||
hosts:
|
||||
laptop:
|
||||
vip: "127.0.42.100"
|
||||
nginx-on-gke:
|
||||
cmd: ["kubectl", "--context=gke_myproject-12345_asia-northeast1-c_my-gke", "exec", "-i", "nginx", "--", "norouter"]
|
||||
vip: "127.0.42.101"
|
||||
ports: ["8080:127.0.0.1:80"]
|
||||
httpd-on-aks:
|
||||
cmd: ["kubectl", "--context=my-aks", "exec", "-i", "httpd", "--", "norouter"]
|
||||
vip: "127.0.42.102"
|
||||
ports: ["8080:127.0.0.1:80"]
|
||||
```
|
||||
|
||||
### SSH
|
||||
|
||||
Install `norouter` binary using `scp cp ./bin/norouter some-user@some-ssh-host.example.com:/usr/local/bin` .
|
||||
|
||||
In the NoRouter yaml, specify `cmd` as `["ssh", "some-user@some-ssh-host.example.com", "--", "norouter"]`.
|
||||
|
||||
If your key has a passphrase, make sure to configure `ssh-agent` so that NoRouter can login to the host automatically.
|
||||
|
||||
### Azure Container Instances (`az container exec`)
|
||||
|
||||
`az container exec` can't be supported currently because:
|
||||
- No support for stdin without tty: https://github.com/Azure/azure-cli/issues/15225
|
||||
- No support for appending command arguments: https://docs.microsoft.com/en-us/azure/container-instances/container-instances-exec#restrictions
|
||||
- Extra TTY escape sequence on busybox: https://github.com/Azure/azure-cli/issues/6537
|
||||
|
||||
A workaround is to inject an SSH sidecar into an Azure container group, and use `ssh` instead of `az container exec`.
|
||||
|
||||
## TODOs
|
||||
|
||||
- Install `norouter` binary to remote hosts automatically?
|
||||
- Assist generating mTLS certs?
|
||||
- Add DNS fields to `/etc/resolv.conf` when the file is writable? (writable by default in Docker and Kubernetes)
|
||||
- Detect port numbers automatically by watching `/proc/net/tcp`, and propagate the information across the cluster automatically?
|
||||
|
||||
## Similar projects
|
||||
|
||||
- [vdeplug4](https://github.com/rd235/vdeplug4): vdeplug4 can create ad-hoc L2 networks over stdio.
|
||||
vdeplug4 is similar to NoRouter in the sense that it uses stdio, but vdeplug4 requires privileges (at least in userNS) for creating TAP devices.
|
||||
- [telepresence](https://www.telepresence.io/): kube-only and needs privileges
|
||||
14
cmd/norouter/internal.go
Normal file
14
cmd/norouter/internal.go
Normal file
@@ -0,0 +1,14 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"github.com/urfave/cli/v2"
|
||||
)
|
||||
|
||||
var internalCommand = &cli.Command{
|
||||
Name: "internal",
|
||||
Usage: "Internal commands",
|
||||
Hidden: true,
|
||||
Subcommands: []*cli.Command{
|
||||
internalAgentCommand,
|
||||
},
|
||||
}
|
||||
58
cmd/norouter/internal_agent.go
Normal file
58
cmd/norouter/internal_agent.go
Normal file
@@ -0,0 +1,58 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"os"
|
||||
|
||||
"github.com/norouter/norouter/pkg/agent"
|
||||
"github.com/norouter/norouter/pkg/agent/config"
|
||||
"github.com/urfave/cli/v2"
|
||||
)
|
||||
|
||||
var internalAgentCommand = &cli.Command{
|
||||
Name: "agent",
|
||||
Usage: "agent",
|
||||
Action: internalAgentAction,
|
||||
Flags: []cli.Flag{
|
||||
&cli.StringFlag{
|
||||
Name: "me",
|
||||
Usage: "my virtual IP without port and proto, e.g. \"127.0.42.101\"",
|
||||
Required: true,
|
||||
},
|
||||
&cli.StringSliceFlag{
|
||||
Name: "other",
|
||||
Usage: "other virtual IP, port, and optionally proto, e.g. \"127.0.42.102:8080/tcp\"",
|
||||
},
|
||||
&cli.StringSliceFlag{
|
||||
Name: "forward",
|
||||
Usage: "local forward, e.g. \"8080:127.0.0.1:80/tcp\"",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
func internalAgentAction(clicontext *cli.Context) error {
|
||||
me, err := config.ParseMe(clicontext.String("me"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var others []*config.Other
|
||||
for _, s := range clicontext.StringSlice("other") {
|
||||
o, err := config.ParseOther(s)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
others = append(others, o)
|
||||
}
|
||||
var forwards []*config.Forward
|
||||
for _, s := range clicontext.StringSlice("forward") {
|
||||
f, err := config.ParseForward(s)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
forwards = append(forwards, f)
|
||||
}
|
||||
a, err := agent.New(me, others, forwards, os.Stdout, os.Stdin)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return a.Run()
|
||||
}
|
||||
66
cmd/norouter/main.go
Normal file
66
cmd/norouter/main.go
Normal file
@@ -0,0 +1,66 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"os"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/urfave/cli/v2"
|
||||
)
|
||||
|
||||
func main() {
|
||||
logrus.SetFormatter(newLogrusFormatter())
|
||||
if err := newApp().Run(os.Args); err != nil {
|
||||
logrus.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func newApp() *cli.App {
|
||||
debug := false
|
||||
app := cli.NewApp()
|
||||
app.Name = "norouter"
|
||||
app.Usage = "NoRouter: the easiest multi-host & multi-cloud networking ever. No root privilege required."
|
||||
|
||||
app.Flags = []cli.Flag{
|
||||
&cli.BoolFlag{
|
||||
Name: "debug",
|
||||
Usage: "debug mode",
|
||||
Destination: &debug,
|
||||
},
|
||||
}
|
||||
app.Before = func(context *cli.Context) error {
|
||||
if debug {
|
||||
logrus.SetLevel(logrus.DebugLevel)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
app.Commands = []*cli.Command{
|
||||
routerCommand,
|
||||
internalCommand,
|
||||
}
|
||||
app.Action = routerAction
|
||||
return app
|
||||
}
|
||||
|
||||
func newLogrusFormatter() logrus.Formatter {
|
||||
hostname, _ := os.Hostname()
|
||||
if hostname == "" {
|
||||
hostname = "<unknown>"
|
||||
}
|
||||
return &logrusFormatter{
|
||||
prefix: hostname + ": ",
|
||||
Formatter: &logrus.TextFormatter{},
|
||||
}
|
||||
}
|
||||
|
||||
type logrusFormatter struct {
|
||||
prefix string
|
||||
logrus.Formatter
|
||||
}
|
||||
|
||||
func (lf *logrusFormatter) Format(e *logrus.Entry) ([]byte, error) {
|
||||
b, err := lf.Formatter.Format(e)
|
||||
if err != nil {
|
||||
return b, err
|
||||
}
|
||||
return append([]byte(lf.prefix), b...), nil
|
||||
}
|
||||
55
cmd/norouter/router.go
Normal file
55
cmd/norouter/router.go
Normal file
@@ -0,0 +1,55 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
|
||||
"github.com/norouter/norouter/pkg/router"
|
||||
"github.com/norouter/norouter/pkg/router/config"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/urfave/cli/v2"
|
||||
"gopkg.in/yaml.v2"
|
||||
)
|
||||
|
||||
var routerCommand = &cli.Command{
|
||||
Name: "router",
|
||||
Aliases: []string{"r"},
|
||||
Usage: "router (default subcommand)",
|
||||
Action: routerAction,
|
||||
}
|
||||
|
||||
func routerAction(clicontext *cli.Context) error {
|
||||
configPath := clicontext.Args().First()
|
||||
if configPath == "" {
|
||||
return errors.New("no config file path was specified")
|
||||
}
|
||||
cfg, err := loadConfig(configPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
logrus.Debugf("config: %+v", cfg)
|
||||
ccSet, err := router.NewCmdClientSet(cfg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for vip, client := range ccSet.ByVIP {
|
||||
logrus.Debugf("client for %q: %q", vip, client.String())
|
||||
}
|
||||
r, err := router.New(ccSet)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return r.Run()
|
||||
}
|
||||
|
||||
func loadConfig(configPath string) (*config.Config, error) {
|
||||
configB, err := ioutil.ReadFile(configPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var cfg config.Config
|
||||
if err := yaml.Unmarshal(configB, &cfg); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &cfg, nil
|
||||
}
|
||||
BIN
docs/image.png
Normal file
BIN
docs/image.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 62 KiB |
13
example.yaml
Normal file
13
example.yaml
Normal file
@@ -0,0 +1,13 @@
|
||||
# see README.md for the usage
|
||||
hosts:
|
||||
# host0 is the localhost
|
||||
host0:
|
||||
vip: "127.0.42.100"
|
||||
host1:
|
||||
cmd: ["docker", "exec", "-i", "host1", "norouter"]
|
||||
vip: "127.0.42.101"
|
||||
ports: ["8080:127.0.0.1:80"]
|
||||
host2:
|
||||
cmd: ["podman", "exec", "-i", "host2", "norouter"]
|
||||
vip: "127.0.42.102"
|
||||
ports: ["8080:127.0.0.1:80"]
|
||||
12
go.mod
Normal file
12
go.mod
Normal file
@@ -0,0 +1,12 @@
|
||||
module github.com/norouter/norouter
|
||||
|
||||
go 1.15
|
||||
|
||||
require (
|
||||
github.com/hashicorp/go-multierror v1.1.0
|
||||
github.com/pkg/errors v0.9.1
|
||||
github.com/sirupsen/logrus v1.6.0
|
||||
github.com/urfave/cli/v2 v2.2.0
|
||||
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208
|
||||
gopkg.in/yaml.v2 v2.2.2
|
||||
)
|
||||
33
go.sum
Normal file
33
go.sum
Normal file
@@ -0,0 +1,33 @@
|
||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d h1:U+s90UTSYgptZMwQh2aRr3LuazLJIa+Pg3Kc1ylSYVY=
|
||||
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
|
||||
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
|
||||
github.com/hashicorp/go-multierror v1.1.0 h1:B9UzwGQJehnUY1yNrnwREHc3fGbC2xefo8g4TbElacI=
|
||||
github.com/hashicorp/go-multierror v1.1.0/go.mod h1:spPvp8C1qA32ftKqdAHm4hHTbPw+vmowP0z+KUhOZdA=
|
||||
github.com/konsorten/go-windows-terminal-sequences v1.0.3 h1:CE8S1cTafDpPvMhIxNJKvHsGVBgn1xWYf1NbHQhywc8=
|
||||
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
||||
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
|
||||
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/russross/blackfriday/v2 v2.0.1 h1:lPqVAte+HuHNfhJ/0LC98ESWRz8afy9tM/0RK8m9o+Q=
|
||||
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
|
||||
github.com/shurcooL/sanitized_anchor_name v1.0.0 h1:PdmoCO6wvbs+7yrJyMORt4/BmY5IYyJwS/kOiWx8mHo=
|
||||
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
|
||||
github.com/sirupsen/logrus v1.6.0 h1:UBcNElsrwanuuMsnGSlYmtmgbb23qDR5dG+6X6Oo89I=
|
||||
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
|
||||
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
|
||||
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
|
||||
github.com/urfave/cli/v2 v2.2.0 h1:JTTnM6wKzdA0Jqodd966MVj4vWbbquZykeX1sKbe2C4=
|
||||
github.com/urfave/cli/v2 v2.2.0/go.mod h1:SE9GqnLQmjVa0iPEY0f1w3ygNIYcIJ0OKPMoW2caLfQ=
|
||||
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208 h1:qwRHBd0NqMbJxfbotnDhm2ByMI1Shq4Y6oRJo21SGJA=
|
||||
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc=
|
||||
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
|
||||
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
66
integration/test-internal-agent.sh
Executable file
66
integration/test-internal-agent.sh
Executable file
@@ -0,0 +1,66 @@
|
||||
#!/bin/bash
|
||||
# A script for testing `norouter internal agent` without `norouter router`
|
||||
#
|
||||
set -eu -o pipefail
|
||||
|
||||
cd "$(dirname $0)/.."
|
||||
|
||||
pid=""
|
||||
cleanup() {
|
||||
echo "Cleaning up..."
|
||||
set +e
|
||||
if [[ -n "$pid" && -d "/proc/$pid" ]]; then kill $pid; fi
|
||||
docker rm -f host1 host2
|
||||
make clean
|
||||
set -e
|
||||
}
|
||||
cleanup
|
||||
trap "cleanup" EXIT
|
||||
|
||||
make
|
||||
docker run -d --name host1 -v "$(pwd)/bin:/mnt:ro" nginx:1.19.2-alpine
|
||||
docker run -d --name host2 -v "$(pwd)/bin:/mnt:ro" httpd:2.4.46-alpine
|
||||
|
||||
: ${DEBUG=}
|
||||
flags=""
|
||||
if [[ -n "$DEBUG" ]]; then
|
||||
flags="--debug"
|
||||
fi
|
||||
|
||||
dpipe \
|
||||
docker exec -i host1 /mnt/norouter ${flags} internal agent \
|
||||
--me 127.0.42.101 \
|
||||
--forward 8080:127.0.0.1:80 \
|
||||
--other 127.0.42.102:8080 \
|
||||
= \
|
||||
docker exec -i host2 /mnt/norouter ${flags} internal agent \
|
||||
--me 127.0.42.102 \
|
||||
--other 127.0.42.101:8080 \
|
||||
--forward 8080:127.0.0.1:80 &
|
||||
pid=$!
|
||||
|
||||
sleep 2
|
||||
|
||||
: ${N=10}
|
||||
succeeds=0
|
||||
fails=0
|
||||
# Connect to host1 (127.0.42.101, nginx) from host2
|
||||
for ((i = 0; i < $N; i++)); do
|
||||
if docker exec host2 wget -q -O- http://127.0.42.101:8080 | grep -q "Welcome to nginx"; then
|
||||
succeeds=$((succeeds + 1))
|
||||
else
|
||||
fails=$((fails + 1))
|
||||
fi
|
||||
done
|
||||
|
||||
# Connect to host2 (127.0.42.102, Apache httpd) from host1
|
||||
for ((i = 0; i < $N; i++)); do
|
||||
if docker exec host1 wget -q -O- http://127.0.42.102:8080 | grep -q "It works"; then
|
||||
succeeds=$((succeeds + 1))
|
||||
else
|
||||
fails=$((fails + 1))
|
||||
fi
|
||||
done
|
||||
|
||||
echo "tests: $((N * 2)), succceeds: ${succeeds}, fails: ${fails}"
|
||||
exit ${fails}
|
||||
63
integration/test-router.sh
Executable file
63
integration/test-router.sh
Executable file
@@ -0,0 +1,63 @@
|
||||
#!/bin/bash
|
||||
set -eu -o pipefail
|
||||
|
||||
cd "$(dirname $0)/.."
|
||||
|
||||
pid=""
|
||||
cleanup() {
|
||||
echo "Cleaning up..."
|
||||
set +e
|
||||
if [[ -n "$pid" && -d "/proc/$pid" ]]; then kill $pid; fi
|
||||
docker rm -f host1 host2 host3
|
||||
make clean
|
||||
set -e
|
||||
}
|
||||
cleanup
|
||||
trap "cleanup" EXIT
|
||||
|
||||
make
|
||||
docker run -d --name host1 -v "$(pwd)/bin:/mnt:ro" nginx:1.19.2-alpine
|
||||
docker run -d --name host2 -v "$(pwd)/bin:/mnt:ro" httpd:2.4.46-alpine
|
||||
docker run -d --name host3 -v "$(pwd)/bin:/mnt:ro" caddy:2.1.1-alpine
|
||||
|
||||
: ${DEBUG=}
|
||||
flags=""
|
||||
if [[ -n "$DEBUG" ]]; then
|
||||
flags="--debug"
|
||||
fi
|
||||
|
||||
./bin/norouter ${flags} ./integration/test-router.yaml &
|
||||
pid=$!
|
||||
|
||||
sleep 3
|
||||
|
||||
: ${N=3}
|
||||
succeeds=0
|
||||
fails=0
|
||||
|
||||
test_wget() {
|
||||
for ((i = 0; i < $N; i++)); do
|
||||
if wget -q -O- $1 | grep -q "$2"; then
|
||||
succeeds=$((succeeds + 1))
|
||||
else
|
||||
fails=$((fails + 1))
|
||||
fi
|
||||
for ((j = 1; j <= 3; j++)); do
|
||||
if docker exec host${j} wget -q -O- $1 | grep -q "$2"; then
|
||||
succeeds=$((succeeds + 1))
|
||||
else
|
||||
fails=$((fails + 1))
|
||||
fi
|
||||
done
|
||||
done
|
||||
}
|
||||
|
||||
# Connect to host1 (nginx)
|
||||
test_wget http://127.0.42.101:8080 "Welcome to nginx"
|
||||
# Connect to host2 (Apache httpd)
|
||||
test_wget http://127.0.42.102:8080 "It works"
|
||||
# Connect to host3 (Caddy)
|
||||
test_wget http://127.0.42.103:8080 "Caddy"
|
||||
|
||||
echo "tests: $((N * 4 * 3)), succceeds: ${succeeds}, fails: ${fails}"
|
||||
exit ${fails}
|
||||
16
integration/test-router.yaml
Normal file
16
integration/test-router.yaml
Normal file
@@ -0,0 +1,16 @@
|
||||
hosts:
|
||||
# host0 is the localhost
|
||||
host0:
|
||||
vip: "127.0.42.100"
|
||||
host1:
|
||||
cmd: ["docker", "exec", "-i", "host1", "/mnt/norouter"]
|
||||
vip: "127.0.42.101"
|
||||
ports: ["8080:127.0.0.1:80"]
|
||||
host2:
|
||||
cmd: ["docker", "exec", "-i", "host2", "/mnt/norouter"]
|
||||
vip: "127.0.42.102"
|
||||
ports: ["8080:127.0.0.1:80"]
|
||||
host3:
|
||||
cmd: ["docker", "exec", "-i", "host3", "/mnt/norouter"]
|
||||
vip: "127.0.42.103"
|
||||
ports: ["8080:127.0.0.1:80"]
|
||||
255
pkg/agent/agent.go
Normal file
255
pkg/agent/agent.go
Normal file
@@ -0,0 +1,255 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"hash/fnv"
|
||||
"io"
|
||||
"math/rand"
|
||||
"net"
|
||||
"runtime"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/norouter/norouter/pkg/agent/config"
|
||||
"github.com/norouter/norouter/pkg/agent/conn"
|
||||
"github.com/norouter/norouter/pkg/bicopy"
|
||||
"github.com/norouter/norouter/pkg/debugutil"
|
||||
"github.com/norouter/norouter/pkg/stream"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
func runForward(me net.IP, f *config.Forward) error {
|
||||
lh := fmt.Sprintf("%s:%d", me.String(), f.ListenPort)
|
||||
l, err := net.Listen(f.Proto, lh)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "failed to listen on %q", lh)
|
||||
}
|
||||
go func() {
|
||||
for {
|
||||
lconn, err := l.Accept()
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("failed to accept")
|
||||
continue
|
||||
}
|
||||
go func() {
|
||||
dh := fmt.Sprintf("%s:%d", f.ConnectIP.String(), f.ConnectPort)
|
||||
dconn, err := net.Dial("tcp", dh)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Errorf("failed to dial to %q", dh)
|
||||
return
|
||||
}
|
||||
defer dconn.Close()
|
||||
defer lconn.Close()
|
||||
bicopy.Bicopy(lconn, dconn, nil)
|
||||
}()
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
func newRandSource(me net.IP, now time.Time) rand.Source {
|
||||
h := fnv.New64()
|
||||
me4 := me.To4()
|
||||
if me4 == nil {
|
||||
panic(errors.Errorf("unsupported IP address %q", me))
|
||||
|
||||
}
|
||||
binary.Write(h, binary.LittleEndian, me)
|
||||
binary.Write(h, binary.LittleEndian, now.UnixNano)
|
||||
seed := h.Sum64()
|
||||
return rand.NewSource(int64(seed))
|
||||
}
|
||||
|
||||
func New(me net.IP, others []*config.Other, forwards []*config.Forward, w io.Writer, r io.Reader) (*Agent, error) {
|
||||
debugDump := false
|
||||
sender := &stream.Sender{
|
||||
Writer: w,
|
||||
DebugDump: debugDump,
|
||||
}
|
||||
receiver := &stream.Receiver{
|
||||
Reader: r,
|
||||
DebugDump: debugDump,
|
||||
}
|
||||
a := &Agent{
|
||||
me: me,
|
||||
others: others,
|
||||
tcpForwards: make(map[uint16]*config.Forward),
|
||||
sender: sender,
|
||||
receiver: receiver,
|
||||
rand: rand.New(newRandSource(me, time.Now())),
|
||||
pwHM: make(map[uint64]*io.PipeWriter),
|
||||
}
|
||||
for _, f := range forwards {
|
||||
if f.Proto != "tcp" {
|
||||
return nil, errors.Errorf("unexpected proto %q", f.Proto)
|
||||
}
|
||||
a.tcpForwards[f.ListenPort] = f
|
||||
}
|
||||
return a, nil
|
||||
}
|
||||
|
||||
type Agent struct {
|
||||
me net.IP
|
||||
others []*config.Other
|
||||
tcpForwards map[uint16]*config.Forward // key: listenPort
|
||||
sender *stream.Sender
|
||||
receiver *stream.Receiver
|
||||
rand *rand.Rand
|
||||
pwHM map[uint64]*io.PipeWriter
|
||||
pwHMMu sync.RWMutex
|
||||
}
|
||||
|
||||
func (a *Agent) generateVSrcPort() uint16 {
|
||||
var vSrcPort uint16
|
||||
for {
|
||||
vSrcPort = uint16(a.rand.Int())
|
||||
if vSrcPort == 0 {
|
||||
continue
|
||||
}
|
||||
if _, conflict := a.tcpForwards[vSrcPort]; conflict {
|
||||
continue
|
||||
}
|
||||
// FIXME: detect more colisions
|
||||
break
|
||||
}
|
||||
return vSrcPort
|
||||
}
|
||||
|
||||
func (a *Agent) runOther(o *config.Other) error {
|
||||
lh := fmt.Sprintf("%s:%d", o.IP, o.Port)
|
||||
l, err := net.Listen(o.Proto, lh)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
go func() {
|
||||
for {
|
||||
lconn, err := l.Accept()
|
||||
if err != nil {
|
||||
logrus.WithError(err).Error("failed to accept")
|
||||
continue
|
||||
}
|
||||
vSrcPort := a.generateVSrcPort()
|
||||
logrus.Debugf("generated vSrcPort=%d", vSrcPort)
|
||||
conn, pw, err := conn.New(a.me, vSrcPort, o.IP, o.Port, a.sender)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Warn("failed to create conn")
|
||||
lconn.Close()
|
||||
return
|
||||
}
|
||||
hdrHash := stream.HashFields(o.IP, o.Port, a.me, vSrcPort, stream.TCP)
|
||||
a.pwHMMu.Lock()
|
||||
logrus.Debugf("registering pw for %s:%d->%s:%d", o.IP, o.Port, a.me, vSrcPort)
|
||||
a.pwHM[hdrHash] = pw
|
||||
a.pwHMMu.Unlock()
|
||||
go func() {
|
||||
defer lconn.Close()
|
||||
defer conn.Close()
|
||||
bicopy.Bicopy(conn, lconn, nil)
|
||||
}()
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *Agent) getPW(hdrHash uint64, pkt *stream.Packet) (*io.PipeWriter, bool, error) {
|
||||
a.pwHMMu.RLock()
|
||||
pw, pwOk := a.pwHM[hdrHash]
|
||||
a.pwHMMu.RUnlock()
|
||||
if pwOk {
|
||||
return pw, pwOk, nil
|
||||
}
|
||||
if f, fOk := a.tcpForwards[pkt.DstPort]; fOk {
|
||||
// connect to forward ports, e.g. 8080 (->127.0.0.1:80)
|
||||
dh := fmt.Sprintf("%s:%d", f.ConnectIP.String(), f.ConnectPort)
|
||||
dconn, err := net.Dial(f.Proto, dh)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Warnf("failed to dial %q", dh)
|
||||
return nil, false, err
|
||||
}
|
||||
logrus.Debugf("dialed to %q, creating replyConn", dh)
|
||||
var replyConn *conn.Conn
|
||||
replyConn, pw, err = conn.New(pkt.DstIP, pkt.DstPort, pkt.SrcIP, pkt.SrcPort, a.sender)
|
||||
if err != nil {
|
||||
dconn.Close()
|
||||
return nil, false, errors.Wrap(err, "failed to create replyConn")
|
||||
}
|
||||
a.pwHMMu.Lock()
|
||||
logrus.Debugf("registering pw for %s:%d->%s:%d", pkt.SrcIP, pkt.SrcPort, pkt.DstIP, pkt.DstPort)
|
||||
a.pwHM[hdrHash] = pw
|
||||
a.pwHMMu.Unlock()
|
||||
go func() {
|
||||
defer dconn.Close()
|
||||
defer replyConn.Close()
|
||||
bicopy.Bicopy(dconn, replyConn, nil)
|
||||
}()
|
||||
return pw, true, nil
|
||||
}
|
||||
return nil, false, nil
|
||||
}
|
||||
|
||||
func (a *Agent) Run() error {
|
||||
for _, f := range a.tcpForwards {
|
||||
if err := runForward(a.me, f); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
for _, o := range a.others {
|
||||
if err := a.runOther(o); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
for {
|
||||
pkt, err := a.receiver.Recv()
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to recv from receiver")
|
||||
}
|
||||
if pkt.Proto != stream.TCP {
|
||||
logrus.Warnf("received unknown proto %d, ignoring", pkt.Proto)
|
||||
continue
|
||||
}
|
||||
if !pkt.DstIP.Equal(a.me) {
|
||||
logrus.Warnf("received dstIP=%s is not me (%s), ignoring", pkt.DstIP.String(), a.me.String())
|
||||
continue
|
||||
}
|
||||
hdrHash := stream.HashFields(pkt.SrcIP, pkt.SrcPort, pkt.DstIP, pkt.DstPort, pkt.Proto)
|
||||
pw, pwOk, err := a.getPW(hdrHash, pkt)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Warnf("failed to call getPW (%s:%d->%s:%d)", pkt.SrcIP, pkt.SrcPort, pkt.DstIP, pkt.DstPort)
|
||||
}
|
||||
if pwOk {
|
||||
logrus.Debugf("Calling pw.Write %s:%d->%s:%d", pkt.SrcIP, pkt.SrcPort, pkt.DstIP, pkt.DstPort)
|
||||
if _, err := pw.Write(pkt.Payload); err != nil {
|
||||
logrus.WithError(err).Warn("pw.Write failed")
|
||||
}
|
||||
} else {
|
||||
logrus.Debugf("NOT calling pw.Write %s:%d->%s:%d", pkt.SrcIP, pkt.SrcPort, pkt.DstIP, pkt.DstPort)
|
||||
}
|
||||
a.gc(pkt, hdrHash, pw)
|
||||
a.debugPrintStat()
|
||||
}
|
||||
}
|
||||
|
||||
func (a *Agent) gc(pkt *stream.Packet, hdrHash uint64, pw *io.PipeWriter) {
|
||||
// FIXME: support half-closing properly
|
||||
if pkt.Flags&stream.FlagCloseRead != 0 || pkt.Flags&stream.FlagCloseWrite != 0 {
|
||||
if pw != nil {
|
||||
if err := pw.Close(); err != nil {
|
||||
logrus.WithError(err).Debugf("failed to close pw")
|
||||
}
|
||||
}
|
||||
a.pwHMMu.Lock()
|
||||
delete(a.pwHM, hdrHash)
|
||||
a.pwHMMu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
func (a *Agent) debugPrintStat() {
|
||||
if logrus.GetLevel() >= logrus.DebugLevel {
|
||||
a.pwHMMu.RLock()
|
||||
l := len(a.pwHM)
|
||||
a.pwHMMu.RUnlock()
|
||||
logrus.Debugf("STAT: len(a.pwHM)=%d,GoRoutines=%d, FDs=%d", l, runtime.NumGoroutine(), debugutil.NumFDs())
|
||||
}
|
||||
}
|
||||
100
pkg/agent/config/config.go
Normal file
100
pkg/agent/config/config.go
Normal file
@@ -0,0 +1,100 @@
|
||||
package config
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type Other struct {
|
||||
IP net.IP
|
||||
Port uint16
|
||||
Proto string
|
||||
}
|
||||
|
||||
func (o *Other) String() string {
|
||||
return fmt.Sprintf("%s:%d/%s", o.IP.String(), o.Port, o.Proto)
|
||||
}
|
||||
|
||||
type Forward struct {
|
||||
// listenIP is "me"
|
||||
ListenPort uint16
|
||||
ConnectIP net.IP
|
||||
ConnectPort uint16
|
||||
Proto string
|
||||
}
|
||||
|
||||
// parseMe parses --me=127.0.42.101 flag
|
||||
func ParseMe(me string) (net.IP, error) {
|
||||
ip := net.ParseIP(me)
|
||||
if ip == nil {
|
||||
return nil, errors.Errorf("invalid \"me\" IP %q", me)
|
||||
}
|
||||
ip = ip.To4()
|
||||
if ip == nil {
|
||||
return nil, errors.Errorf("invalid \"me\" IP %q, must be IPv4", me)
|
||||
}
|
||||
return ip, nil
|
||||
}
|
||||
|
||||
// ParseOther parses --other=127.0.42.102:8080[/tcp] flag
|
||||
func ParseOther(other string) (*Other, error) {
|
||||
s := strings.TrimSuffix(other, "/tcp")
|
||||
if strings.Contains(s, "/") {
|
||||
// TODO: support "/udp" suffix
|
||||
return nil, errors.Errorf("cannot parse \"other\" address %q", other)
|
||||
}
|
||||
h, p, err := net.SplitHostPort(s)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "cannot parse \"other\" address %q", other)
|
||||
}
|
||||
ip := net.ParseIP(h)
|
||||
if ip == nil {
|
||||
return nil, errors.Errorf("cannot parse \"other\" address %q", other)
|
||||
}
|
||||
port, err := strconv.Atoi(p)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "cannot parse \"other\" address %q", other)
|
||||
}
|
||||
o := &Other{
|
||||
IP: ip,
|
||||
Port: uint16(port),
|
||||
Proto: "tcp",
|
||||
}
|
||||
return o, nil
|
||||
}
|
||||
|
||||
// ParseForward parses --forward=8080:127.0.0.1:80[/tcp] flag
|
||||
func ParseForward(forward string) (*Forward, error) {
|
||||
s := strings.TrimSuffix(forward, "/tcp")
|
||||
if strings.Contains(s, "/") {
|
||||
// TODO: support "/udp" suffix
|
||||
return nil, errors.Errorf("cannot parse \"forward\" address %q", forward)
|
||||
}
|
||||
split := strings.Split(s, ":")
|
||||
if len(split) != 3 {
|
||||
return nil, errors.Errorf("cannot parse \"forward\" address %q", forward)
|
||||
}
|
||||
listenPort, err := strconv.Atoi(split[0])
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "cannot parse \"forward\" address %q", forward)
|
||||
}
|
||||
connectIP := net.ParseIP(split[1])
|
||||
if connectIP == nil {
|
||||
return nil, errors.Errorf("cannot parse \"forward\" address %q", forward)
|
||||
}
|
||||
connectPort, err := strconv.Atoi(split[2])
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "cannot parse \"forward\" address %q", forward)
|
||||
}
|
||||
f := &Forward{
|
||||
ListenPort: uint16(listenPort),
|
||||
ConnectIP: connectIP,
|
||||
ConnectPort: uint16(connectPort),
|
||||
Proto: "tcp",
|
||||
}
|
||||
return f, nil
|
||||
}
|
||||
130
pkg/agent/conn/conn.go
Normal file
130
pkg/agent/conn/conn.go
Normal file
@@ -0,0 +1,130 @@
|
||||
package conn
|
||||
|
||||
import (
|
||||
"io"
|
||||
"net"
|
||||
|
||||
"github.com/hashicorp/go-multierror"
|
||||
"github.com/norouter/norouter/pkg/stream"
|
||||
)
|
||||
|
||||
func New(srcIP net.IP, srcPort uint16, dstIP net.IP, dstPort uint16, sender *stream.Sender) (*Conn, *io.PipeWriter, error) {
|
||||
pr, pw := io.Pipe()
|
||||
c := &Conn{
|
||||
SrcIP: srcIP,
|
||||
SrcPort: srcPort,
|
||||
DstIP: dstIP,
|
||||
DstPort: dstPort,
|
||||
sender: sender,
|
||||
pr: pr,
|
||||
}
|
||||
return c, pw, nil
|
||||
}
|
||||
|
||||
type Conn struct {
|
||||
SrcIP net.IP
|
||||
SrcPort uint16
|
||||
DstIP net.IP
|
||||
DstPort uint16
|
||||
|
||||
sender *stream.Sender
|
||||
pr *io.PipeReader // pkt.Payload, passed from receiver
|
||||
readClosed bool
|
||||
writeClosed bool
|
||||
}
|
||||
|
||||
func (c *Conn) Write(p []byte) (int, error) {
|
||||
pkt := &stream.Packet{
|
||||
SrcIP: c.SrcIP,
|
||||
SrcPort: c.SrcPort,
|
||||
DstIP: c.DstIP,
|
||||
DstPort: c.DstPort,
|
||||
Proto: stream.TCP,
|
||||
Flags: 0,
|
||||
Payload: p,
|
||||
}
|
||||
if err := c.sender.Send(pkt); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return len(p), nil
|
||||
}
|
||||
|
||||
func (c *Conn) Read(p []byte) (int, error) {
|
||||
return c.pr.Read(p)
|
||||
}
|
||||
|
||||
func (c *Conn) Close() error {
|
||||
if c.readClosed && c.writeClosed {
|
||||
return nil
|
||||
}
|
||||
if c.readClosed {
|
||||
return c.CloseWrite()
|
||||
}
|
||||
if c.writeClosed {
|
||||
return c.CloseRead()
|
||||
}
|
||||
var merr *multierror.Error
|
||||
pkt := &stream.Packet{
|
||||
SrcIP: c.SrcIP,
|
||||
SrcPort: c.SrcPort,
|
||||
DstIP: c.DstIP,
|
||||
DstPort: c.DstPort,
|
||||
Proto: stream.TCP,
|
||||
Flags: stream.FlagCloseRead | stream.FlagCloseWrite,
|
||||
Payload: nil,
|
||||
}
|
||||
if err := c.sender.Send(pkt); err != nil {
|
||||
merr = multierror.Append(merr, err)
|
||||
}
|
||||
if err := c.pr.Close(); err != nil {
|
||||
merr = multierror.Append(merr, err)
|
||||
}
|
||||
c.readClosed = true
|
||||
c.writeClosed = true
|
||||
return merr.ErrorOrNil()
|
||||
}
|
||||
|
||||
func (c *Conn) CloseRead() error {
|
||||
if c.readClosed {
|
||||
return nil
|
||||
}
|
||||
var merr *multierror.Error
|
||||
pkt := &stream.Packet{
|
||||
SrcIP: c.SrcIP,
|
||||
SrcPort: c.SrcPort,
|
||||
DstIP: c.DstIP,
|
||||
DstPort: c.DstPort,
|
||||
Proto: stream.TCP,
|
||||
Flags: stream.FlagCloseRead,
|
||||
Payload: nil,
|
||||
}
|
||||
if err := c.sender.Send(pkt); err != nil {
|
||||
merr = multierror.Append(merr, err)
|
||||
}
|
||||
if err := c.pr.Close(); err != nil {
|
||||
merr = multierror.Append(merr, err)
|
||||
}
|
||||
c.readClosed = true
|
||||
return merr.ErrorOrNil()
|
||||
}
|
||||
|
||||
func (c *Conn) CloseWrite() error {
|
||||
if c.writeClosed {
|
||||
return nil
|
||||
}
|
||||
var merr *multierror.Error
|
||||
pkt := &stream.Packet{
|
||||
SrcIP: c.SrcIP,
|
||||
SrcPort: c.SrcPort,
|
||||
DstIP: c.DstIP,
|
||||
DstPort: c.DstPort,
|
||||
Proto: stream.TCP,
|
||||
Flags: stream.FlagCloseWrite,
|
||||
Payload: nil,
|
||||
}
|
||||
if err := c.sender.Send(pkt); err != nil {
|
||||
merr = multierror.Append(merr, err)
|
||||
}
|
||||
c.writeClosed = true
|
||||
return merr.ErrorOrNil()
|
||||
}
|
||||
62
pkg/bicopy/bicopy.go
Normal file
62
pkg/bicopy/bicopy.go
Normal file
@@ -0,0 +1,62 @@
|
||||
package bicopy
|
||||
|
||||
import (
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// Bicopy is from https://github.com/rootless-containers/rootlesskit/blob/v0.10.1/pkg/port/builtin/parent/tcp/tcp.go#L73-L104
|
||||
// (originally from libnetwork, Apache License 2.0)
|
||||
func Bicopy(x, y io.ReadWriter, quit <-chan struct{}) {
|
||||
type closeReader interface {
|
||||
CloseRead() error
|
||||
}
|
||||
type closeWriter interface {
|
||||
CloseWrite() error
|
||||
}
|
||||
var wg sync.WaitGroup
|
||||
var broker = func(to, from io.ReadWriter) {
|
||||
if _, err := io.Copy(to, from); err != nil {
|
||||
logrus.WithError(err).Debug("failed to call io.Copy")
|
||||
}
|
||||
if fromCR, ok := from.(closeReader); ok {
|
||||
if err := fromCR.CloseRead(); err != nil {
|
||||
logrus.WithError(err).Debug("failed to call CloseRead")
|
||||
}
|
||||
}
|
||||
if toCW, ok := to.(closeWriter); ok {
|
||||
if err := toCW.CloseWrite(); err != nil {
|
||||
logrus.WithError(err).Debug("failed to call CloseWrite")
|
||||
}
|
||||
}
|
||||
wg.Done()
|
||||
}
|
||||
|
||||
wg.Add(2)
|
||||
go broker(x, y)
|
||||
go broker(y, x)
|
||||
finish := make(chan struct{})
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(finish)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-quit:
|
||||
case <-finish:
|
||||
}
|
||||
if xCloser, ok := x.(io.Closer); ok {
|
||||
if err := xCloser.Close(); err != nil {
|
||||
logrus.WithError(err).Debug("failed to call xCloser.Close")
|
||||
}
|
||||
}
|
||||
if yCloser, ok := y.(io.Closer); ok {
|
||||
if err := yCloser.Close(); err != nil {
|
||||
logrus.WithError(err).Debug("failed to call yCloser.Close")
|
||||
}
|
||||
}
|
||||
<-finish
|
||||
// TODO: return copied bytes
|
||||
}
|
||||
18
pkg/debugutil/debugutil.go
Normal file
18
pkg/debugutil/debugutil.go
Normal file
@@ -0,0 +1,18 @@
|
||||
package debugutil
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"runtime"
|
||||
)
|
||||
|
||||
func NumFDs() int {
|
||||
if runtime.GOOS != "linux" {
|
||||
// unimplemented
|
||||
return -1
|
||||
}
|
||||
ents, err := ioutil.ReadDir("/proc/self/fd")
|
||||
if err != nil {
|
||||
return -1
|
||||
}
|
||||
return len(ents)
|
||||
}
|
||||
84
pkg/router/cmdclient.go
Normal file
84
pkg/router/cmdclient.go
Normal file
@@ -0,0 +1,84 @@
|
||||
package router
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"os/exec"
|
||||
"runtime"
|
||||
|
||||
agentconfig "github.com/norouter/norouter/pkg/agent/config"
|
||||
"github.com/norouter/norouter/pkg/router/config"
|
||||
)
|
||||
|
||||
type CmdClientSet struct {
|
||||
ByVIP map[string]*CmdClient
|
||||
}
|
||||
|
||||
func NewCmdClientSet(cfg *config.Config) (*CmdClientSet, error) {
|
||||
var publicHostPorts []string
|
||||
for _, h := range cfg.Hosts {
|
||||
for _, p := range h.Ports {
|
||||
f, err := agentconfig.ParseForward(p)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
publicHostPorts = append(publicHostPorts, fmt.Sprintf("%s:%d/%s", h.VIP, f.ListenPort, f.Proto))
|
||||
}
|
||||
}
|
||||
ccSet := &CmdClientSet{
|
||||
ByVIP: make(map[string]*CmdClient),
|
||||
}
|
||||
for hostname, h := range cfg.Hosts {
|
||||
client, err := NewCmdClient(hostname, h, publicHostPorts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ccSet.ByVIP[h.VIP] = client
|
||||
}
|
||||
return ccSet, nil
|
||||
}
|
||||
|
||||
// NewCmdClient.
|
||||
func NewCmdClient(hostname string, h config.Host, publicHostPorts []string) (*CmdClient, error) {
|
||||
var cmd *exec.Cmd
|
||||
if len(h.Cmd) != 0 {
|
||||
// e.g. ["docker", "exec", "-i", "host1", "--", "norouter"]
|
||||
cmd = exec.Command(h.Cmd[0], h.Cmd[1:]...)
|
||||
} else {
|
||||
if runtime.GOOS == "linux" {
|
||||
cmd = exec.Command("/proc/self/exe")
|
||||
} else {
|
||||
cmd = exec.Command(os.Args[0])
|
||||
}
|
||||
}
|
||||
cmd.Args = append(cmd.Args, "internal", "agent", "--me", h.VIP)
|
||||
for _, port := range h.Ports {
|
||||
cmd.Args = append(cmd.Args, "--forward", port)
|
||||
}
|
||||
for _, pub := range publicHostPorts {
|
||||
o, err := agentconfig.ParseOther(pub)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if o.IP.String() == h.VIP {
|
||||
continue
|
||||
}
|
||||
cmd.Args = append(cmd.Args, "--other", pub)
|
||||
}
|
||||
c := &CmdClient{
|
||||
Hostname: hostname,
|
||||
VIP: h.VIP,
|
||||
cmd: cmd,
|
||||
}
|
||||
return c, nil
|
||||
}
|
||||
|
||||
type CmdClient struct {
|
||||
Hostname string
|
||||
VIP string
|
||||
cmd *exec.Cmd
|
||||
}
|
||||
|
||||
func (c *CmdClient) String() string {
|
||||
return fmt.Sprintf("<%s (%s)> %s", c.Hostname, c.VIP, c.cmd.String())
|
||||
}
|
||||
11
pkg/router/config/config.go
Normal file
11
pkg/router/config/config.go
Normal file
@@ -0,0 +1,11 @@
|
||||
package config
|
||||
|
||||
type Config struct {
|
||||
Hosts map[string]Host `yaml:"hosts"`
|
||||
}
|
||||
|
||||
type Host struct {
|
||||
Cmd []string `yaml:"cmd"` // e.g. ["docker", "exec", "-i", "host1", "norouter"]
|
||||
VIP string `yaml:"vip"` // e.g. "127.0.42.101"
|
||||
Ports []string `yaml:"ports"` // e.g. ["8080:127.0.0.1:80"], or ["8080:127.0.0.1:80/tcp"]
|
||||
}
|
||||
103
pkg/router/router.go
Normal file
103
pkg/router/router.go
Normal file
@@ -0,0 +1,103 @@
|
||||
package router
|
||||
|
||||
import (
|
||||
"os"
|
||||
|
||||
"github.com/norouter/norouter/pkg/stream"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
func New(ccSet *CmdClientSet) (*Router, error) {
|
||||
r := &Router{
|
||||
ccSet: ccSet,
|
||||
senders: make(map[string]*stream.Sender),
|
||||
receivers: make(map[string]*stream.Receiver),
|
||||
}
|
||||
return r, nil
|
||||
}
|
||||
|
||||
type Router struct {
|
||||
ccSet *CmdClientSet
|
||||
senders map[string]*stream.Sender // key: vip (TODO: don't use string)
|
||||
receivers map[string]*stream.Receiver
|
||||
}
|
||||
|
||||
func (r *Router) Run() error {
|
||||
debugDump := false
|
||||
// Step 1: fill up senders
|
||||
for vip, cc := range r.ccSet.ByVIP {
|
||||
cc.cmd.Stderr = &stderrWriter{
|
||||
vip: cc.VIP,
|
||||
hostname: cc.Hostname,
|
||||
}
|
||||
stdin, err := cc.cmd.StdinPipe()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
sender := &stream.Sender{
|
||||
Writer: stdin,
|
||||
DebugDump: debugDump,
|
||||
}
|
||||
r.senders[vip] = sender
|
||||
stdout, err := cc.cmd.StdoutPipe()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
receiver := &stream.Receiver{
|
||||
Reader: stdout,
|
||||
DebugDump: debugDump,
|
||||
}
|
||||
r.receivers[vip] = receiver
|
||||
logrus.Infof("starting client for %s(%s): %q", cc.Hostname, vip, cc.cmd.String())
|
||||
if err := cc.cmd.Start(); err != nil {
|
||||
return err
|
||||
}
|
||||
// TODO: notify if a client exits
|
||||
defer func() {
|
||||
logrus.Warnf("exiting client: %q", cc.String())
|
||||
if err := cc.cmd.Process.Signal(os.Interrupt); err != nil {
|
||||
logrus.WithError(err).Errorf("error while sending os.Interrupt to %s(%s)", cc.Hostname, vip)
|
||||
cc.cmd.Process.Kill()
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
var eg errgroup.Group
|
||||
// Step 2: start goroutines after filling up all r.senders
|
||||
for vipx, receiverx := range r.receivers {
|
||||
vip := vipx
|
||||
receiver := receiverx
|
||||
eg.Go(func() error {
|
||||
for {
|
||||
pkt, err := receiver.Recv()
|
||||
if err != nil {
|
||||
return errors.Errorf("failed to receive from %s", vip)
|
||||
}
|
||||
dstIPStr := pkt.DstIP.String()
|
||||
sender, ok := r.senders[dstIPStr]
|
||||
if !ok {
|
||||
logrus.WithError(err).Warnf("unexpected dstIP %s in a packet from %s", dstIPStr, vip)
|
||||
continue
|
||||
}
|
||||
logrus.Debugf("routing packet from %s:%d to %s:%d", pkt.SrcIP, pkt.SrcPort, pkt.DstIP, pkt.DstPort)
|
||||
if err := sender.Send(pkt); err != nil {
|
||||
logrus.WithError(err).Warnf("routing packet from %s:%d to %s:%d", pkt.SrcIP, pkt.SrcPort, pkt.DstIP, pkt.DstPort)
|
||||
continue
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
return eg.Wait()
|
||||
}
|
||||
|
||||
type stderrWriter struct {
|
||||
hostname string
|
||||
vip string
|
||||
}
|
||||
|
||||
func (w *stderrWriter) Write(p []byte) (int, error) {
|
||||
logrus.Warnf("stderr[%s(%s)]: %s", w.hostname, w.vip, string(p))
|
||||
return len(p), nil
|
||||
}
|
||||
35
pkg/stream/hash.go
Normal file
35
pkg/stream/hash.go
Normal file
@@ -0,0 +1,35 @@
|
||||
package stream
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"hash/fnv"
|
||||
"net"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
func HashFields(srcIP net.IP, srcPort uint16, dstIP net.IP, dstPort uint16, proto Proto) uint64 {
|
||||
h := fnv.New64()
|
||||
if len(srcIP) != 0 {
|
||||
srcIP4 := srcIP.To4()
|
||||
if srcIP4 == nil {
|
||||
panic(errors.Errorf("unsupported IP address %q", srcIP))
|
||||
}
|
||||
binary.Write(h, binary.LittleEndian, srcIP4)
|
||||
} else {
|
||||
binary.Write(h, binary.LittleEndian, net.ParseIP("0.0.0.0"))
|
||||
}
|
||||
binary.Write(h, binary.LittleEndian, srcPort)
|
||||
if len(dstIP) != 0 {
|
||||
dstIP4 := dstIP.To4()
|
||||
if dstIP4 == nil {
|
||||
panic(errors.Errorf("unsupported IP address %q", dstIP))
|
||||
}
|
||||
binary.Write(h, binary.LittleEndian, dstIP4)
|
||||
} else {
|
||||
binary.Write(h, binary.LittleEndian, net.ParseIP("0.0.0.0"))
|
||||
}
|
||||
binary.Write(h, binary.LittleEndian, dstPort)
|
||||
binary.Write(h, binary.LittleEndian, proto)
|
||||
return h.Sum64()
|
||||
}
|
||||
75
pkg/stream/receiver.go
Normal file
75
pkg/stream/receiver.go
Normal file
@@ -0,0 +1,75 @@
|
||||
package stream
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"io"
|
||||
"net"
|
||||
"sync"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// Receiver
|
||||
type Receiver struct {
|
||||
io.Reader
|
||||
sync.Mutex
|
||||
DebugDump bool
|
||||
}
|
||||
|
||||
func (receiver *Receiver) Recv() (*Packet, error) {
|
||||
var length uint32 // HeaderLength + len(p)
|
||||
receiver.Lock()
|
||||
if err := binary.Read(receiver.Reader, binary.LittleEndian, &length); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
b := make([]byte, length)
|
||||
if err := binary.Read(receiver.Reader, binary.LittleEndian, &b); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
receiver.Unlock()
|
||||
var (
|
||||
srcIP4 [4]byte
|
||||
srcPort uint16
|
||||
dstIP4 [4]byte
|
||||
dstPort uint16
|
||||
proto uint16
|
||||
flags uint16
|
||||
)
|
||||
br := bytes.NewReader(b)
|
||||
if err := binary.Read(br, binary.LittleEndian, &srcIP4); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := binary.Read(br, binary.LittleEndian, &srcPort); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := binary.Read(br, binary.LittleEndian, &dstIP4); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := binary.Read(br, binary.LittleEndian, &dstPort); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := binary.Read(br, binary.LittleEndian, &proto); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := binary.Read(br, binary.LittleEndian, &flags); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
pkt := &Packet{
|
||||
SrcIP: net.IP(srcIP4[:]),
|
||||
SrcPort: srcPort,
|
||||
DstIP: net.IP(dstIP4[:]),
|
||||
DstPort: dstPort,
|
||||
Proto: Proto(proto),
|
||||
Flags: flags,
|
||||
Payload: b[HeaderLength:],
|
||||
}
|
||||
|
||||
if receiver.DebugDump && logrus.GetLevel() >= logrus.DebugLevel {
|
||||
logrus.Debugf("receiver: Received %s:%d->%s:%d (%v) 0b%b: %q",
|
||||
pkt.SrcIP.String(), pkt.SrcPort,
|
||||
pkt.DstIP.String(), pkt.DstPort,
|
||||
pkt.Proto, pkt.Flags, string(pkt.Payload))
|
||||
}
|
||||
return pkt, nil
|
||||
}
|
||||
66
pkg/stream/sender.go
Normal file
66
pkg/stream/sender.go
Normal file
@@ -0,0 +1,66 @@
|
||||
package stream
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// Sender
|
||||
type Sender struct {
|
||||
io.Writer
|
||||
sync.Mutex
|
||||
DebugDump bool
|
||||
}
|
||||
|
||||
func (sender *Sender) Send(p *Packet) error {
|
||||
if sender.DebugDump && logrus.GetLevel() >= logrus.DebugLevel {
|
||||
logrus.Debugf("sender: Sending %s:%d %s:%d (%v) 0b%b: %q",
|
||||
p.SrcIP.String(), p.SrcPort,
|
||||
p.DstIP.String(), p.DstPort,
|
||||
p.Proto, p.Flags, string(p.Payload))
|
||||
}
|
||||
var buf bytes.Buffer
|
||||
if err := binary.Write(&buf, binary.LittleEndian, uint32(HeaderLength+len(p.Payload))); err != nil {
|
||||
return err
|
||||
}
|
||||
sip := p.SrcIP.To4()
|
||||
if sip == nil {
|
||||
return errors.Errorf("unexpected ip %+v", sip)
|
||||
}
|
||||
if err := binary.Write(&buf, binary.LittleEndian, []byte(sip)); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := binary.Write(&buf, binary.LittleEndian, p.SrcPort); err != nil {
|
||||
return err
|
||||
}
|
||||
dip := p.DstIP.To4()
|
||||
if dip == nil {
|
||||
return errors.Errorf("unexpected ip %+v", dip)
|
||||
}
|
||||
if err := binary.Write(&buf, binary.LittleEndian, []byte(dip)); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := binary.Write(&buf, binary.LittleEndian, p.DstPort); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := binary.Write(&buf, binary.LittleEndian, uint16(p.Proto)); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := binary.Write(&buf, binary.LittleEndian, p.Flags); err != nil {
|
||||
return err
|
||||
}
|
||||
if p.Payload != nil {
|
||||
if err := binary.Write(&buf, binary.LittleEndian, p.Payload); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
sender.Lock()
|
||||
_, err := io.Copy(sender.Writer, &buf)
|
||||
sender.Unlock()
|
||||
return err
|
||||
}
|
||||
50
pkg/stream/stream.go
Normal file
50
pkg/stream/stream.go
Normal file
@@ -0,0 +1,50 @@
|
||||
package stream
|
||||
|
||||
import (
|
||||
"net"
|
||||
)
|
||||
|
||||
type Proto = uint16
|
||||
|
||||
const (
|
||||
TCP Proto = 0
|
||||
)
|
||||
|
||||
// HeaderLength:
|
||||
//
|
||||
// + uint64 srcIP (4 bytes)
|
||||
//
|
||||
// + uint16 srcPort (2 bytes)
|
||||
//
|
||||
// + uint32 dstIP (4 bytes)
|
||||
//
|
||||
// + uint16 dstPort (2 bytes)
|
||||
//
|
||||
// + uint16 proto (2 bytes)
|
||||
//
|
||||
// + uint32 flags (2 bytes)
|
||||
const HeaderLength = 4 + 2 + 4 + 2 + 2 + 2
|
||||
|
||||
// Packet requires uint32le length to be prepended.
|
||||
// The protocol is highly likely to be changed.
|
||||
type Packet struct {
|
||||
// SrcIP is the src IP. Must be [4]byte.
|
||||
SrcIP net.IP
|
||||
// SrcPort is the dest port.
|
||||
SrcPort uint16
|
||||
// DstIP is the dest IP. Must be [4]byte.
|
||||
DstIP net.IP
|
||||
// DstPort is the dest port.
|
||||
DstPort uint16
|
||||
// Proto must be TCP.
|
||||
Proto Proto
|
||||
// Flags, such as FlagCloseRead and FlagCloseWrite
|
||||
Flags uint16
|
||||
// Payload does not contain any L2/L3/L4 headers.
|
||||
Payload []byte
|
||||
}
|
||||
|
||||
const (
|
||||
FlagCloseRead = 0b01
|
||||
FlagCloseWrite = 0b10
|
||||
)
|
||||
Reference in New Issue
Block a user