move unfinished libp2p backend into separate branch

Signed-off-by: Steffen Vogel <post@steffenvogel.de>
This commit is contained in:
Steffen Vogel
2022-05-08 13:47:42 +02:00
parent f486223781
commit 6de9082c36
11 changed files with 196 additions and 1607 deletions

View File

@@ -19,7 +19,6 @@ community: "some-common-password"
backends:
- grpc://localhost:8080?insecure=true
- k8s:///path/to/your/kubeconfig.yaml?namespace=default
- p2p:?mdns=true&dht=true&private=false&listen-address=/ip4/0.0.0.0/tcp/1234&bootstrap-peer=/dnsaddr/bootstrap.libp2p.io/ipfs/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN&private-key=6MNeaexWoGcSKlpvJopeL0G39dqc6zrZUaZ3mbTEl1k=
# Wireguard settings
wg:

View File

@@ -5,7 +5,6 @@
- Encrypt all signaling messages
- Plug-able signaling backends:
- [libp2p](https://libp2p.io/)
- GRPC
- Kubernetes API-server
- WebSocket

View File

@@ -5,7 +5,6 @@ community: "some-common-password"
backends:
- grpc://localhost:8080?insecure=true
- k8s:///path/to/your/kubeconfig.yaml?namespace=default
- p2p:?mdns=true&dht=true&private=false&listen-address=/ip4/0.0.0.0/tcp/1234&bootstrap-peer=/dnsaddr/bootstrap.libp2p.io/ipfs/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN&private-key=6MNeaexWoGcSKlpvJopeL0G39dqc6zrZUaZ3mbTEl1k=
# Wireguard settings
wg:

154
go.mod
View File

@@ -9,18 +9,11 @@ require (
github.com/go-logr/zapr v1.2.3
github.com/google/gopacket v1.1.19
github.com/google/nftables v0.0.0-20220502152923-38a96768dbc6
github.com/ipfs/go-log/v2 v2.5.0
github.com/libp2p/go-libp2p v0.18.0
github.com/libp2p/go-libp2p-core v0.14.0
github.com/libp2p/go-libp2p-kad-dht v0.15.0
github.com/libp2p/go-libp2p-noise v0.3.0 // indirect
github.com/libp2p/go-libp2p-tls v0.3.1 // indirect
github.com/multiformats/go-multiaddr v0.5.0
github.com/pion/dtls/v2 v2.1.3
github.com/pion/ice/v2 v2.2.6
github.com/pion/logging v0.2.2
github.com/pion/stun v0.3.5
github.com/pion/transport v0.13.0 // indirect
github.com/spf13/cobra v1.4.0
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.11.0
@@ -28,162 +21,87 @@ require (
github.com/vishvananda/netlink v1.2.0-beta
github.com/vishvananda/netns v0.0.0-20211101163701-50045581ed74
go.uber.org/zap v1.21.0
golang.org/x/crypto v0.0.0-20220427172511-eb4f295cb31f
golang.org/x/crypto v0.0.0-20220507011949-2cf3adece122
golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6
golang.zx2c4.com/wireguard v0.0.0-20220407013110-ef5c587f782d
golang.zx2c4.com/wireguard/wgctrl v0.0.0-20220420130459-88a4932fb60b
golang.zx2c4.com/wireguard/wgctrl v0.0.0-20220504211119-3d4a969bb56b
google.golang.org/grpc v1.46.0
google.golang.org/protobuf v1.28.0
gopkg.in/ini.v1 v1.66.4
k8s.io/api v0.24.0 // indirect
k8s.io/apimachinery v0.24.0
k8s.io/client-go v0.24.0
k8s.io/klog/v2 v2.60.1
)
require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/btcsuite/btcd v0.22.0-beta // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/cheekybits/genny v1.0.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.1 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect
github.com/evanphx/json-patch v5.6.0+incompatible // indirect
github.com/flynn/noise v1.0.0 // indirect
github.com/francoispqt/gojay v1.2.13 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-ping/ping v0.0.0-20211130115550-779d1e919534 // indirect
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/go-cmp v0.5.7 // indirect
github.com/google/go-cmp v0.5.8 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/huin/goupnp v1.0.2 // indirect
github.com/imdario/mergo v0.3.12 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/ipfs/go-cid v0.1.0
github.com/ipfs/go-datastore v0.5.1 // indirect
github.com/ipfs/go-ipfs-util v0.0.2 // indirect
github.com/ipfs/go-ipns v0.1.2 // indirect
github.com/ipfs/go-log v1.0.5 // indirect
github.com/ipld/go-ipld-prime v0.16.0 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
github.com/jbenet/goprocess v0.1.4 // indirect
github.com/ipfs/go-cid v0.2.0 // indirect
github.com/josharian/native v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.15.1 // indirect
github.com/klauspost/cpuid/v2 v2.0.12 // indirect
github.com/koron/go-ssdp v0.0.2 // indirect
github.com/libp2p/go-buffer-pool v0.0.2 // indirect
github.com/libp2p/go-cidranger v1.1.0 // indirect
github.com/libp2p/go-conn-security-multistream v0.3.0 // indirect
github.com/libp2p/go-eventbus v0.2.1 // indirect
github.com/libp2p/go-flow-metrics v0.0.3 // indirect
github.com/libp2p/go-libp2p-asn-util v0.1.0 // indirect
github.com/libp2p/go-libp2p-blankhost v0.3.0 // indirect
github.com/libp2p/go-libp2p-kbucket v0.4.7 // indirect
github.com/libp2p/go-libp2p-mplex v0.6.0 // indirect
github.com/libp2p/go-libp2p-nat v0.1.0 // indirect
github.com/libp2p/go-libp2p-peerstore v0.6.0 // indirect
github.com/libp2p/go-libp2p-pnet v0.2.0 // indirect
github.com/libp2p/go-libp2p-quic-transport v0.16.1 // indirect
github.com/libp2p/go-libp2p-record v0.1.3 // indirect
github.com/libp2p/go-libp2p-swarm v0.10.2 // indirect
github.com/libp2p/go-libp2p-transport-upgrader v0.7.1 // indirect
github.com/libp2p/go-libp2p-yamux v0.8.2 // indirect
github.com/libp2p/go-mplex v0.6.0 // indirect
github.com/libp2p/go-msgio v0.1.0 // indirect
github.com/libp2p/go-nat v0.1.0 // indirect
github.com/libp2p/go-netroute v0.2.0 // indirect
github.com/libp2p/go-openssl v0.0.7 // indirect
github.com/libp2p/go-reuseport v0.1.0 // indirect
github.com/libp2p/go-reuseport-transport v0.1.0 // indirect
github.com/libp2p/go-stream-muxer-multistream v0.4.0 // indirect
github.com/libp2p/go-tcp-transport v0.5.1 // indirect
github.com/libp2p/go-ws-transport v0.6.0 // indirect
github.com/libp2p/zeroconf/v2 v2.1.1 // indirect
github.com/lucas-clemente/quic-go v0.25.0 // indirect
github.com/magiconair/properties v1.8.6 // indirect
github.com/marten-seemann/qtls-go1-16 v0.1.5 // indirect
github.com/marten-seemann/qtls-go1-17 v0.1.1 // indirect
github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/mdlayher/genetlink v1.2.0 // indirect
github.com/mdlayher/netlink v1.6.0 // indirect
github.com/mdlayher/socket v0.2.3 // indirect
github.com/miekg/dns v1.1.47 // indirect
github.com/mikioh/tcpinfo v0.0.0-20190314235526-30a79bb1804b // indirect
github.com/mikioh/tcpopt v0.0.0-20190314235656-172688c1accc // indirect
github.com/mdlayher/socket v0.2.3
github.com/miekg/dns v1.1.48 // indirect
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 // indirect
github.com/minio/sha256-simd v1.0.0 // indirect
github.com/mitchellh/mapstructure v1.4.3 // indirect
github.com/mitchellh/mapstructure v1.5.0
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mr-tron/base58 v1.2.0 // indirect
github.com/multiformats/go-base32 v0.0.4 // indirect
github.com/multiformats/go-base36 v0.1.0 // indirect
github.com/multiformats/go-multiaddr-dns v0.3.1 // indirect
github.com/multiformats/go-multiaddr-fmt v0.1.0 // indirect
github.com/multiformats/go-multibase v0.0.3 // indirect
github.com/multiformats/go-multicodec v0.4.1 // indirect
github.com/multiformats/go-multihash v0.1.0
github.com/multiformats/go-multistream v0.2.2 // indirect
github.com/multiformats/go-multihash v0.1.0 // indirect
github.com/multiformats/go-varint v0.0.6
github.com/nxadm/tail v1.4.8 // indirect
github.com/onsi/ginkgo v1.16.5 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pelletier/go-toml v1.9.4 // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/pion/mdns v0.0.5 // indirect
github.com/pion/randutil v0.1.0 // indirect
github.com/pion/transport v0.13.0 // indirect
github.com/pion/turn/v2 v2.0.8 // indirect
github.com/pion/udp v0.1.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/polydawn/refmt v0.0.0-20201211092308-30ac6d18308e // indirect
github.com/prometheus/client_golang v1.12.1 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/spf13/afero v1.8.2 // indirect
github.com/spf13/cast v1.4.1 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/subosito/gotenv v1.2.0 // indirect
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 // indirect
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 // indirect
go.opencensus.io v0.23.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect
golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5 // indirect
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
golang.org/x/term v0.0.0-20220411215600-e5f449aeb171 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65 // indirect
golang.org/x/time v0.0.0-20220411224347-583f2d630306 // indirect
golang.org/x/tools v0.1.10 // indirect
golang.org/x/xerrors v0.0.0-20220411194840-2f41105eb62f // indirect
golang.zx2c4.com/wintun v0.0.0-20211104114900-415007cec224 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20220407144326-9054f6ed7bac // indirect
google.golang.org/genproto v0.0.0-20220505152158-f39f71e6c8f3 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
k8s.io/kube-openapi v0.0.0-20220328201542-3ee0da9b0b42 // indirect
k8s.io/api v0.24.0 // indirect
k8s.io/kube-openapi v0.0.0-20220413171646-5e7f5fdc6da6 // indirect
k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9 // indirect
kernel.org/pub/linux/libs/security/libcap/cap v1.2.63 // indirect
kernel.org/pub/linux/libs/security/libcap/psx v1.2.63 // indirect
kernel.org/pub/linux/libs/security/libcap/cap v1.2.64
kernel.org/pub/linux/libs/security/libcap/psx v1.2.64 // indirect
lukechampine.com/blake3 v1.1.7 // indirect
sigs.k8s.io/json v0.0.0-20211208200746-9f7c6b3444d2 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.1 // indirect
@@ -191,30 +109,26 @@ require (
)
require (
github.com/PuerkitoBio/purell v1.1.1 // indirect
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
github.com/foxcpp/go-mockdns v1.0.0
github.com/onsi/ginkgo/v2 v2.1.4
github.com/onsi/gomega v1.19.0
sigs.k8s.io/controller-runtime v0.11.2
)
require (
github.com/benbjohnson/clock v1.3.0 // indirect
github.com/containerd/cgroups v1.0.3 // indirect
github.com/coreos/go-systemd/v22 v22.3.2 // indirect
github.com/docker/go-units v0.4.0 // indirect
github.com/elastic/gosigar v0.14.2 // indirect
github.com/emicklei/go-restful v2.9.5+incompatible // indirect
github.com/emicklei/go-restful v2.15.0+incompatible // indirect
github.com/frankban/quicktest v1.14.3 // indirect
github.com/go-openapi/jsonpointer v0.19.5 // indirect
github.com/go-openapi/jsonreference v0.19.5 // indirect
github.com/go-openapi/swag v0.19.14 // indirect
github.com/godbus/dbus/v5 v5.1.0 // indirect
github.com/google/gnostic v0.6.7 // indirect
github.com/go-openapi/jsonreference v0.20.0 // indirect
github.com/go-openapi/swag v0.21.1 // indirect
github.com/google/gnostic v0.6.9 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/libp2p/go-libp2p-resource-manager v0.1.5 // indirect
github.com/libp2p/go-yamux/v3 v3.0.2 // indirect
github.com/mailru/easyjson v0.7.6 // indirect
github.com/marten-seemann/qtls-go1-18 v0.1.1 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/opencontainers/runtime-spec v1.0.2 // indirect
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
github.com/pelletier/go-toml/v2 v2.0.0-beta.8 // indirect
github.com/raulk/clock v1.1.0 // indirect
github.com/raulk/go-watchdog v1.2.0 // indirect
github.com/pelletier/go-toml/v2 v2.0.0 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
k8s.io/apiextensions-apiserver v0.24.0 // indirect
)
replace github.com/vishvananda/netlink => github.com/stv0g/netlink v1.1.1-gont

1063
go.sum

File diff suppressed because it is too large Load Diff

View File

@@ -2,7 +2,6 @@ package log
import (
"github.com/go-logr/zapr"
glog "github.com/ipfs/go-log/v2"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc/grpclog"
@@ -40,9 +39,6 @@ func SetupLogging(level zapcore.Level, outputPaths []string, errOutputPaths []st
)
klog.SetLogger(zapr.NewLogger(klogger))
// Redirect libp2p / ipfs log to Zap
glog.SetPrimaryCore(logger.Core())
// Redirect gRPC log to Zap
glogger := logger.Named("grpc")
grpclog.SetLoggerV2(NewGRPCLogger(glogger))

View File

@@ -1,389 +0,0 @@
package p2p
import (
"context"
"fmt"
"net"
"sync"
p2p "github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/routing"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/p2p/discovery/mdns"
ma "github.com/multiformats/go-multiaddr"
"go.uber.org/zap"
"google.golang.org/protobuf/reflect/protoreflect"
"riasc.eu/wice/pkg/crypto"
"riasc.eu/wice/pkg/pb"
"riasc.eu/wice/pkg/pb/uvarint"
"riasc.eu/wice/pkg/signaling"
)
const (
userAgent = "wice"
protocolSignaling = "/wice/signaling/0.1.0"
protocolIdentification = "/wice/id/0.1.0"
)
func init() {
signaling.Backends["p2p"] = &signaling.BackendPlugin{
New: NewBackend,
Description: "libp2p",
}
}
type Backend struct {
signaling.SubscriptionsRegistry
peers map[crypto.Key][]peer.ID
peersLock sync.RWMutex
peersCond *sync.Cond
logger *zap.Logger
config BackendConfig
context context.Context
host host.Host
mdns mdns.Service
dht *dht.IpfsDHT
events chan *pb.Event
}
func NewBackend(cfg *signaling.BackendConfig, events chan *pb.Event, logger *zap.Logger) (signaling.Backend, error) {
var err error
b := &Backend{
SubscriptionsRegistry: signaling.NewSubscriptionsRegistry(),
peers: map[crypto.Key][]peer.ID{},
logger: logger,
config: defaultConfig,
events: events,
context: context.Background(),
}
b.peersCond = sync.NewCond(&b.peersLock)
if err := b.config.Parse(cfg); err != nil {
return nil, fmt.Errorf("failed to parse backend options: %w", err)
}
opts := b.options()
b.host, err = p2p.New(opts...)
if err != nil {
return nil, fmt.Errorf("failed to create host: %w", err)
}
b.logger.Info("Host created",
zap.Any("id", b.host.ID()),
zap.Any("addrs", b.StringAddrs()),
)
b.host.SetStreamHandler(protocolIdentification, b.handleIdentificationStream)
b.host.SetStreamHandler(protocolSignaling, b.handleSignalingStream)
if b.config.EnableMDNSDiscovery {
b.logger.Debug("Setup mDNS discovery")
b.mdns = mdns.NewMdnsService(b.host, mdnsServiceName, &mDNSNotifee{b})
if err := b.mdns.Start(); err != nil {
return nil, fmt.Errorf("failed to start mDNS service: %w", err)
}
}
if b.config.EnableDHTDiscovery && b.dht != nil {
b.logger.Debug("Bootstrapping the DHT")
if err = b.dht.Bootstrap(b.context); err != nil {
return nil, fmt.Errorf("failed to bootstrap DHT: %w", err)
}
}
if !b.config.Private {
b.bootstrap()
}
b.events <- &pb.Event{
Type: pb.Event_BACKEND_READY,
Event: &pb.Event_BackendReady{
BackendReady: &pb.BackendReadyEvent{
Type: pb.BackendReadyEvent_P2P,
Id: b.host.ID().String(),
ListenAddresses: b.StringAddrs(),
},
},
}
return b, nil
}
func (b *Backend) StringAddrs() []string {
p2p, err := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s", b.host.ID()))
if err != nil {
return nil
}
as := []string{}
out:
for _, a := range b.host.Addrs() {
// Skip IPv4 and IPv6 loopback addresses
for _, prot := range []int{ma.P_IP4, ma.P_IP6} {
if val, err := a.ValueForProtocol(prot); err == nil {
if ip := net.ParseIP(val); ip.IsLoopback() {
continue out
}
}
}
a = a.Encapsulate(p2p)
as = append(as, a.String())
}
return as
}
func (b *Backend) Publish(ctx context.Context, kp *crypto.KeyPair, msg *pb.SignalingMessage) error {
if err := b.waitForPeer(ctx, &kp.Theirs); err != nil {
return fmt.Errorf("failed to wait for peer: %w", err)
}
env, err := msg.Encrypt(kp)
if err != nil {
return fmt.Errorf("failed to encrypt message: %w", err)
}
b.peersLock.RLock()
if pids, ok := b.peers[kp.Theirs]; ok {
if err := b.sendMessageToPeers(ctx, pids, env); err != nil {
b.peersLock.RUnlock()
return fmt.Errorf("failed to send: %w", err)
}
}
b.peersLock.RUnlock()
b.logger.Info("Published to message to peers", zap.Any("kp", kp))
return nil
}
func (b *Backend) Subscribe(ctx context.Context, kp *crypto.KeyPair) (chan *pb.SignalingMessage, error) {
go b.watchPeer(&kp.Theirs)
sub, err := b.NewSubscription(kp)
if err != nil {
return nil, err
}
b.logger.Info("Subscribed to messages from peer", zap.Any("kp", kp))
return sub.C, nil
}
func (b *Backend) Close() error {
if b.dht != nil {
if err := b.dht.Close(); err != nil {
return err
}
}
if b.mdns != nil {
if err := b.mdns.Close(); err != nil {
return err
}
}
return b.host.Close()
}
func (b *Backend) handleMDNSPeer(ai peer.AddrInfo) {
if ai.ID == b.host.ID() {
return
}
if err := b.host.Connect(b.context, ai); err != nil {
b.logger.Error("Failed to connect to mDNS peer", zap.Error(err))
return
}
b.logger.Info("Found new peer via mDNS", zap.Any("peer", ai))
b.mDNSPeersLock.Lock()
b.mDNSPeers = append(b.mDNSPeers, ai.ID)
b.mDNSPeersLock.Unlock()
}
func (b *Backend) options() []p2p.Option {
opts := []p2p.Option{
p2p.Defaults,
p2p.UserAgent(userAgent),
p2p.EnableNATService(),
p2p.EnableRelay(),
p2p.EnableRelayService(),
}
if len(b.config.ListenAddresses) > 0 {
opts = append(opts, p2p.ListenAddrs([]ma.Multiaddr(b.config.ListenAddresses)...))
} else {
opts = append(opts, p2p.DefaultListenAddrs)
}
if b.config.EnableNATPortMap {
opts = append(opts, p2p.NATPortMap())
}
if b.config.PrivateKey != nil {
opts = append(opts, p2p.Identity(b.config.PrivateKey))
}
if b.config.PrivateNetwork != nil {
opts = append(opts, p2p.PrivateNetwork(b.config.PrivateNetwork))
}
if b.config.EnableDHTDiscovery {
opts = append(opts, p2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
var err error
b.dht, err = dht.New(b.context, h)
return b.dht, err
}))
}
return opts
}
func (b *Backend) handleSignalingStream(s network.Stream) {
b.logger.Info("Handle new stream", zap.Any("protocol", s.Protocol()))
rd := uvarint.NewDelimitedReader(s, 10<<10)
for {
var env pb.SignalingEnvelope
if err := rd.ReadMsg(&env); err != nil {
b.logger.Warn("Failed to read message", zap.Error(err))
}
kp, err := env.PublicKeyPair()
if err != nil {
b.logger.Error("Failed open envelope", zap.Error(err))
return
}
sub, err := b.GetSubscription(&kp)
if err != nil {
b.logger.Error("Failed to find matching subscription", zap.Error(err))
return
}
if err := sub.NewMessage(&env); err != nil {
b.logger.Error("Failed to handle new message", zap.Error(err))
return
}
if err := s.Close(); err != nil {
b.logger.Error("Failed to close stream", zap.Error(err))
}
}
}
func (b *Backend) handleIdentificationStream(s network.Stream) {
b.GetSubscription()
}
func (b *Backend) sendMessageToPeers(ctx context.Context, pids []peer.ID, msg protoreflect.ProtoMessage) error {
for _, pid := range pids {
s, err := b.host.NewStream(ctx, pid, protocolSignaling)
if err != nil {
return err
}
wr := uvarint.NewDelimitedWriter(s)
if err := wr.WriteMsg(msg); err != nil {
return err
}
}
return nil
}
func (b *Backend) bootstrap() {
// Let's connect to the bootstrap nodes first. They will tell us about the
// other nodes in the network.
var wg sync.WaitGroup
for _, pi := range b.config.BootstrapPeers {
logger := b.logger.With(zap.Any("peer", pi))
logger.Debug("Connecting to peer")
wg.Add(1)
go func(pi peer.AddrInfo) {
defer wg.Done()
if err := b.host.Connect(b.context, pi); err != nil {
logger.Warn("Failed to connect to boostrap node")
} else {
logger.Info("Connection established with bootstrap node")
}
}(pi)
}
wg.Wait() // TODO: can we run this asynchronously?
b.logger.Debug("Bootstrap finished")
}
func (b *Backend) waitForPeer(ctx context.Context, pk *crypto.Key) {
b.peersLock.RLock()
defer b.peersLock.RUnlock()
for {
if pids, ok := b.peers[*pk]; ok && len(pids) > 0 {
break
}
b.peersCond.Wait()
}
}
func (b *Backend) watchPeer(pk *crypto.Key) {
c := publicKeyToCid(pk)
for ai := range b.dht.FindProvidersAsync(b.context, c, 0) {
if ai.ID == b.host.ID() {
continue // found ourself?
}
if ok, err := b.checkPeer(ai); err != nil {
b.logger.Error("Failed to validate peer", zap.Error(err))
} else if ok {
b.addPeer(pk, ai.ID)
}
}
}
func (b *Backend) checkPeer(pk *crypto.Key, ai peer.AddrInfo) (bool, error) {
if err := b.host.Connect(context.TODO(), ai); err != nil {
b.logger.Error("Failed to connect to peeer", zap.Any("addr", ai))
}
s, err := b.host.NewStream(context.TODO(), ai.ID, protocolID)
b.logger.Info("Found peer", zap.Any("addr", ai), zap.Any("peer", pk))
return true, nil
}
func (b *Backend) addPeer(pk *crypto.Key, pid peer.ID) {
b.peersLock.Lock()
defer b.peersLock.Unlock()
if ids, ok := b.peers[*pk]; ok {
ids = append(ids, pid)
} else {
b.peers[*pk] = []peer.ID{pid}
}
b.peersCond.Broadcast()
}

View File

@@ -1,15 +0,0 @@
package p2p_test
import (
"testing"
"riasc.eu/wice/internal/test"
)
func TestMain(m *testing.M) {
test.Main(m)
}
func TestBackendP2P(t *testing.T) {
test.TestBackend(t, "p2p:?private=true&mdns=true", 2)
}

View File

@@ -1,22 +0,0 @@
package p2p
import (
"github.com/ipfs/go-cid"
"github.com/multiformats/go-multihash"
"riasc.eu/wice/pkg/crypto"
)
// publicKeyToCid generates a Cid from a public X25519 key
// The Cid is generated by hashing the public key with SHA256
// This avoids putting the real public keys into the DHT.
func publicKeyToCid(key *crypto.Key) cid.Cid {
pref := cid.Prefix{
Version: 1,
Codec: cid.Raw,
MhType: multihash.SHA2_256,
}
c, _ := pref.Sum(key[:])
return c
}

View File

@@ -1,138 +0,0 @@
package p2p
import (
"fmt"
"strconv"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/pnet"
dht "github.com/libp2p/go-libp2p-kad-dht"
maddr "github.com/multiformats/go-multiaddr"
"riasc.eu/wice/pkg/signaling"
)
var defaultConfig = BackendConfig{
Private: false,
EnableDHTDiscovery: true,
EnableMDNSDiscovery: true,
ListenAddresses: make(multiAddressList, 0),
BootstrapPeers: make(peerAddressList, 0),
}
type peerAddressList []peer.AddrInfo
type multiAddressList []maddr.Multiaddr
type BackendConfig struct {
signaling.BackendConfig
// Load some options
ListenAddresses multiAddressList
// BootstrapPeers is a list of peers to which we initially connect
BootstrapPeers peerAddressList
// PrivateKey is the private key used by the libp2p host.
PrivateKey crypto.PrivKey
// PrivateNetwork configures libp2p to use the given private network protector.
PrivateNetwork pnet.PSK
// DHTDiscovery enables peer discovery and content routing via the Kadmelia DHT.
EnableDHTDiscovery bool
// MDNSDiscovery enables peer discovery via local mDNS.
EnableMDNSDiscovery bool
// NATPortMap configures libp2p to use the default NATManager. The default NATManager will attempt to open a port in your network's firewall using UPnP.
EnableNATPortMap bool
// Do not connect to public bootstrap peers
Private bool
}
func (al *multiAddressList) Set(as []string) error {
for _, a := range as {
ma, err := maddr.NewMultiaddr(a)
if err != nil {
return err
}
*al = append(*al, ma)
}
return nil
}
func (al *peerAddressList) Set(as []string) error {
for _, a := range as {
pi, err := peer.AddrInfoFromString(a)
if err != nil {
return err
}
*al = append(*al, *pi)
}
return nil
}
func (c *BackendConfig) Parse(cfg *signaling.BackendConfig) error {
var err error
options := cfg.URI.Query()
c.BackendConfig = *cfg
if pkStr := options.Get("private-key"); pkStr != "" {
pk, err := crypto.ConfigDecodeKey(pkStr)
if err != nil {
return fmt.Errorf("failed to parse private key: %w", err)
}
if c.PrivateKey, err = crypto.UnmarshalEd25519PrivateKey(pk); err != nil {
return fmt.Errorf("failed to parse private key: %w", err)
}
}
if bStr := options.Get("mdns"); bStr != "" {
if c.EnableMDNSDiscovery, err = strconv.ParseBool(bStr); err != nil {
return fmt.Errorf("failed to parse mdns option: %w", err)
}
}
if bStr := options.Get("dht"); bStr != "" {
if c.EnableDHTDiscovery, err = strconv.ParseBool(bStr); err != nil {
return fmt.Errorf("failed to parse dht option: %w", err)
}
}
if laStrs, ok := options["listen-address"]; ok {
if err := c.ListenAddresses.Set(laStrs); err != nil {
return fmt.Errorf("failed to parse listen-address option: %w", err)
}
}
if bpStrs, ok := options["bootstrap-peer"]; ok {
if err := c.BootstrapPeers.Set(bpStrs); err != nil {
return fmt.Errorf("failed to parse listen-address option: %w", err)
}
}
if privateStrs, ok := options["private"]; ok {
if c.Private, err = strconv.ParseBool(privateStrs[0]); err != nil {
return fmt.Errorf("failed to parse %s as a boolean value: %w", privateStrs[0], err)
}
}
// use the default set of bootstrap peers if none are provided
if len(c.BootstrapPeers) == 0 {
for _, s := range dht.DefaultBootstrapPeers {
if pi, err := peer.AddrInfoFromP2pAddr(s); err == nil {
c.BootstrapPeers = append(c.BootstrapPeers, *pi)
}
}
}
return nil
}

View File

@@ -1,15 +0,0 @@
package p2p
import "github.com/libp2p/go-libp2p-core/peer"
const (
mdnsServiceName = "wice/0.1"
)
type mDNSNotifee struct {
backend *Backend
}
func (m *mDNSNotifee) HandlePeerFound(ai peer.AddrInfo) {
m.backend.handleMDNSPeer(ai)
}