mirror of
https://github.com/libp2p/go-libp2p.git
synced 2025-09-26 20:21:26 +08:00
swarm: emit PeerConnectedness event from swarm instead of from hosts (#1574)
* pass an event bus to the swarm constructor * make the eventbus parameter a required swarm constructor parameter * emit Connectedness notifications from the swarm * remove peer connectedness watchers from hosts * swarm: emit connectedness events when holding the mutex
This commit is contained in:
@@ -8,6 +8,7 @@ import (
|
||||
|
||||
"github.com/libp2p/go-libp2p/core/connmgr"
|
||||
"github.com/libp2p/go-libp2p/core/crypto"
|
||||
"github.com/libp2p/go-libp2p/core/event"
|
||||
"github.com/libp2p/go-libp2p/core/host"
|
||||
"github.com/libp2p/go-libp2p/core/metrics"
|
||||
"github.com/libp2p/go-libp2p/core/network"
|
||||
@@ -23,6 +24,7 @@ import (
|
||||
"github.com/libp2p/go-libp2p/p2p/host/autorelay"
|
||||
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
|
||||
blankhost "github.com/libp2p/go-libp2p/p2p/host/blank"
|
||||
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
|
||||
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
|
||||
routed "github.com/libp2p/go-libp2p/p2p/host/routed"
|
||||
"github.com/libp2p/go-libp2p/p2p/net/swarm"
|
||||
@@ -123,7 +125,7 @@ type Config struct {
|
||||
PrometheusRegisterer prometheus.Registerer
|
||||
}
|
||||
|
||||
func (cfg *Config) makeSwarm(enableMetrics bool) (*swarm.Swarm, error) {
|
||||
func (cfg *Config) makeSwarm(eventBus event.Bus, enableMetrics bool) (*swarm.Swarm, error) {
|
||||
if cfg.Peerstore == nil {
|
||||
return nil, fmt.Errorf("no peerstore specified")
|
||||
}
|
||||
@@ -176,7 +178,7 @@ func (cfg *Config) makeSwarm(enableMetrics bool) (*swarm.Swarm, error) {
|
||||
swarm.WithMetricsTracer(swarm.NewMetricsTracer(swarm.WithRegisterer(cfg.PrometheusRegisterer))))
|
||||
}
|
||||
// TODO: Make the swarm implementation configurable.
|
||||
return swarm.NewSwarm(pid, cfg.Peerstore, opts...)
|
||||
return swarm.NewSwarm(pid, cfg.Peerstore, eventBus, opts...)
|
||||
}
|
||||
|
||||
func (cfg *Config) addTransports(h host.Host) error {
|
||||
@@ -284,12 +286,14 @@ func (cfg *Config) addTransports(h host.Host) error {
|
||||
//
|
||||
// This function consumes the config. Do not reuse it (really!).
|
||||
func (cfg *Config) NewNode() (host.Host, error) {
|
||||
swrm, err := cfg.makeSwarm(!cfg.DisableMetrics)
|
||||
eventBus := eventbus.NewBus(eventbus.WithMetricsTracer(eventbus.NewMetricsTracer(eventbus.WithRegisterer(cfg.PrometheusRegisterer))))
|
||||
swrm, err := cfg.makeSwarm(eventBus, !cfg.DisableMetrics)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
h, err := bhost.NewHost(swrm, &bhost.HostOpts{
|
||||
EventBus: eventBus,
|
||||
ConnManager: cfg.ConnManager,
|
||||
AddrsFactory: cfg.AddrsFactory,
|
||||
NATManager: cfg.NATManager,
|
||||
@@ -397,7 +401,7 @@ func (cfg *Config) NewNode() (host.Host, error) {
|
||||
Peerstore: ps,
|
||||
}
|
||||
|
||||
dialer, err := autoNatCfg.makeSwarm(false)
|
||||
dialer, err := autoNatCfg.makeSwarm(eventbus.NewBus(), false)
|
||||
if err != nil {
|
||||
h.Close()
|
||||
return nil, err
|
||||
|
@@ -32,7 +32,6 @@ import (
|
||||
"github.com/libp2p/go-netroute"
|
||||
|
||||
logging "github.com/ipfs/go-log/v2"
|
||||
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
madns "github.com/multiformats/go-multiaddr-dns"
|
||||
manet "github.com/multiformats/go-multiaddr/net"
|
||||
@@ -108,6 +107,9 @@ var _ host.Host = (*BasicHost)(nil)
|
||||
// HostOpts holds options that can be passed to NewHost in order to
|
||||
// customize construction of the *BasicHost.
|
||||
type HostOpts struct {
|
||||
// EventBus sets the event bus. Will construct a new event bus if omitted.
|
||||
EventBus event.Bus
|
||||
|
||||
// MultistreamMuxer is essential for the *BasicHost and will use a sensible default value if omitted.
|
||||
MultistreamMuxer *msmux.MultistreamMuxer[protocol.ID]
|
||||
|
||||
@@ -164,16 +166,11 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) {
|
||||
if opts == nil {
|
||||
opts = &HostOpts{}
|
||||
}
|
||||
|
||||
var eventBus event.Bus
|
||||
if opts.EnableMetrics {
|
||||
eventBus = eventbus.NewBus(
|
||||
eventbus.WithMetricsTracer(eventbus.NewMetricsTracer(eventbus.WithRegisterer(opts.PrometheusRegisterer))))
|
||||
} else {
|
||||
eventBus = eventbus.NewBus()
|
||||
if opts.EventBus == nil {
|
||||
opts.EventBus = eventbus.NewBus()
|
||||
}
|
||||
|
||||
psManager, err := pstoremanager.NewPeerstoreManager(n.Peerstore(), eventBus)
|
||||
psManager, err := pstoremanager.NewPeerstoreManager(n.Peerstore(), opts.EventBus)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -186,7 +183,7 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) {
|
||||
negtimeout: DefaultNegotiationTimeout,
|
||||
AddrsFactory: DefaultAddrsFactory,
|
||||
maResolver: madns.DefaultResolver,
|
||||
eventbus: eventBus,
|
||||
eventbus: opts.EventBus,
|
||||
addrChangeChan: make(chan struct{}, 1),
|
||||
ctx: hostCtx,
|
||||
ctxCancel: cancel,
|
||||
@@ -201,11 +198,6 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) {
|
||||
if h.emitters.evtLocalAddrsUpdated, err = h.eventbus.Emitter(&event.EvtLocalAddressesUpdated{}, eventbus.Stateful); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
evtPeerConnectednessChanged, err := h.eventbus.Emitter(&event.EvtPeerConnectednessChanged{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
h.Network().Notify(newPeerConnectWatcher(evtPeerConnectednessChanged))
|
||||
|
||||
if !h.disableSignedPeerRecord {
|
||||
cab, ok := peerstore.GetCertifiedAddrBook(n.Peerstore())
|
||||
|
@@ -1,71 +0,0 @@
|
||||
package basichost
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/libp2p/go-libp2p/core/event"
|
||||
"github.com/libp2p/go-libp2p/core/network"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
)
|
||||
|
||||
type peerConnectWatcher struct {
|
||||
emitter event.Emitter
|
||||
|
||||
mutex sync.Mutex
|
||||
connected map[peer.ID]struct{}
|
||||
}
|
||||
|
||||
var _ network.Notifiee = &peerConnectWatcher{}
|
||||
|
||||
func newPeerConnectWatcher(emitter event.Emitter) *peerConnectWatcher {
|
||||
return &peerConnectWatcher{
|
||||
emitter: emitter,
|
||||
connected: make(map[peer.ID]struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (w *peerConnectWatcher) Listen(network.Network, ma.Multiaddr) {}
|
||||
func (w *peerConnectWatcher) ListenClose(network.Network, ma.Multiaddr) {}
|
||||
|
||||
func (w *peerConnectWatcher) Connected(n network.Network, conn network.Conn) {
|
||||
p := conn.RemotePeer()
|
||||
w.handleTransition(p, n.Connectedness(p))
|
||||
}
|
||||
|
||||
func (w *peerConnectWatcher) Disconnected(n network.Network, conn network.Conn) {
|
||||
p := conn.RemotePeer()
|
||||
w.handleTransition(p, n.Connectedness(p))
|
||||
}
|
||||
|
||||
func (w *peerConnectWatcher) handleTransition(p peer.ID, state network.Connectedness) {
|
||||
if changed := w.checkTransition(p, state); !changed {
|
||||
return
|
||||
}
|
||||
w.emitter.Emit(event.EvtPeerConnectednessChanged{
|
||||
Peer: p,
|
||||
Connectedness: state,
|
||||
})
|
||||
}
|
||||
|
||||
func (w *peerConnectWatcher) checkTransition(p peer.ID, state network.Connectedness) bool {
|
||||
w.mutex.Lock()
|
||||
defer w.mutex.Unlock()
|
||||
switch state {
|
||||
case network.Connected:
|
||||
if _, ok := w.connected[p]; ok {
|
||||
return false
|
||||
}
|
||||
w.connected[p] = struct{}{}
|
||||
return true
|
||||
case network.NotConnected:
|
||||
if _, ok := w.connected[p]; ok {
|
||||
delete(w.connected, p)
|
||||
return true
|
||||
}
|
||||
return false
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
@@ -1,48 +0,0 @@
|
||||
package basichost
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/libp2p/go-libp2p/core/event"
|
||||
"github.com/libp2p/go-libp2p/core/network"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestPeerConnectedness(t *testing.T) {
|
||||
h1, err := NewHost(swarmt.GenSwarm(t), nil)
|
||||
require.NoError(t, err)
|
||||
defer h1.Close()
|
||||
h2, err := NewHost(swarmt.GenSwarm(t), nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
sub1, err := h1.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{})
|
||||
require.NoError(t, err)
|
||||
defer sub1.Close()
|
||||
sub2, err := h2.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{})
|
||||
require.NoError(t, err)
|
||||
defer sub2.Close()
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
require.NoError(t, h1.Connect(ctx, peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}))
|
||||
require.Equal(t, (<-sub1.Out()).(event.EvtPeerConnectednessChanged), event.EvtPeerConnectednessChanged{
|
||||
Peer: h2.ID(),
|
||||
Connectedness: network.Connected,
|
||||
})
|
||||
require.Equal(t, (<-sub2.Out()).(event.EvtPeerConnectednessChanged), event.EvtPeerConnectednessChanged{
|
||||
Peer: h1.ID(),
|
||||
Connectedness: network.Connected,
|
||||
})
|
||||
|
||||
// now close h2. This will disconnect it from h1.
|
||||
require.NoError(t, h2.Close())
|
||||
require.Equal(t, (<-sub1.Out()).(event.EvtPeerConnectednessChanged), event.EvtPeerConnectednessChanged{
|
||||
Peer: h2.ID(),
|
||||
Connectedness: network.NotConnected,
|
||||
})
|
||||
}
|
@@ -78,11 +78,6 @@ func NewBlankHost(n network.Network, options ...Option) *BlankHost {
|
||||
if bh.emitters.evtLocalProtocolsUpdated, err = bh.eventbus.Emitter(&event.EvtLocalProtocolsUpdated{}); err != nil {
|
||||
return nil
|
||||
}
|
||||
evtPeerConnectednessChanged, err := bh.eventbus.Emitter(&event.EvtPeerConnectednessChanged{})
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
n.Notify(newPeerConnectWatcher(evtPeerConnectednessChanged))
|
||||
|
||||
n.SetStreamHandler(bh.newStreamHandler)
|
||||
|
||||
|
@@ -1,71 +0,0 @@
|
||||
package blankhost
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/libp2p/go-libp2p/core/event"
|
||||
"github.com/libp2p/go-libp2p/core/network"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
)
|
||||
|
||||
type peerConnectWatcher struct {
|
||||
emitter event.Emitter
|
||||
|
||||
mutex sync.Mutex
|
||||
connected map[peer.ID]struct{}
|
||||
}
|
||||
|
||||
var _ network.Notifiee = &peerConnectWatcher{}
|
||||
|
||||
func newPeerConnectWatcher(emitter event.Emitter) *peerConnectWatcher {
|
||||
return &peerConnectWatcher{
|
||||
emitter: emitter,
|
||||
connected: make(map[peer.ID]struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (w *peerConnectWatcher) Listen(network.Network, ma.Multiaddr) {}
|
||||
func (w *peerConnectWatcher) ListenClose(network.Network, ma.Multiaddr) {}
|
||||
|
||||
func (w *peerConnectWatcher) Connected(n network.Network, conn network.Conn) {
|
||||
p := conn.RemotePeer()
|
||||
w.handleTransition(p, n.Connectedness(p))
|
||||
}
|
||||
|
||||
func (w *peerConnectWatcher) Disconnected(n network.Network, conn network.Conn) {
|
||||
p := conn.RemotePeer()
|
||||
w.handleTransition(p, n.Connectedness(p))
|
||||
}
|
||||
|
||||
func (w *peerConnectWatcher) handleTransition(p peer.ID, state network.Connectedness) {
|
||||
if changed := w.checkTransition(p, state); !changed {
|
||||
return
|
||||
}
|
||||
w.emitter.Emit(event.EvtPeerConnectednessChanged{
|
||||
Peer: p,
|
||||
Connectedness: state,
|
||||
})
|
||||
}
|
||||
|
||||
func (w *peerConnectWatcher) checkTransition(p peer.ID, state network.Connectedness) bool {
|
||||
w.mutex.Lock()
|
||||
defer w.mutex.Unlock()
|
||||
switch state {
|
||||
case network.Connected:
|
||||
if _, ok := w.connected[p]; ok {
|
||||
return false
|
||||
}
|
||||
w.connected[p] = struct{}{}
|
||||
return true
|
||||
case network.NotConnected:
|
||||
if _, ok := w.connected[p]; ok {
|
||||
delete(w.connected, p)
|
||||
return true
|
||||
}
|
||||
return false
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
@@ -1,46 +0,0 @@
|
||||
package blankhost
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/libp2p/go-libp2p/core/event"
|
||||
"github.com/libp2p/go-libp2p/core/network"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestPeerConnectedness(t *testing.T) {
|
||||
h1 := NewBlankHost(swarmt.GenSwarm(t))
|
||||
defer h1.Close()
|
||||
h2 := NewBlankHost(swarmt.GenSwarm(t))
|
||||
|
||||
sub1, err := h1.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{})
|
||||
require.NoError(t, err)
|
||||
defer sub1.Close()
|
||||
sub2, err := h2.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{})
|
||||
require.NoError(t, err)
|
||||
defer sub2.Close()
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
require.NoError(t, h1.Connect(ctx, peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}))
|
||||
require.Equal(t, (<-sub1.Out()).(event.EvtPeerConnectednessChanged), event.EvtPeerConnectednessChanged{
|
||||
Peer: h2.ID(),
|
||||
Connectedness: network.Connected,
|
||||
})
|
||||
require.Equal(t, (<-sub2.Out()).(event.EvtPeerConnectednessChanged), event.EvtPeerConnectednessChanged{
|
||||
Peer: h1.ID(),
|
||||
Connectedness: network.Connected,
|
||||
})
|
||||
|
||||
// now close h2. This will disconnect it from h1.
|
||||
require.NoError(t, h2.Close())
|
||||
require.Equal(t, (<-sub1.Out()).(event.EvtPeerConnectednessChanged), event.EvtPeerConnectednessChanged{
|
||||
Peer: h2.ID(),
|
||||
Connectedness: network.NotConnected,
|
||||
})
|
||||
}
|
@@ -15,6 +15,7 @@ import (
|
||||
"github.com/libp2p/go-libp2p/core/sec"
|
||||
"github.com/libp2p/go-libp2p/core/sec/insecure"
|
||||
"github.com/libp2p/go-libp2p/core/transport"
|
||||
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
|
||||
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
|
||||
"github.com/libp2p/go-libp2p/p2p/muxer/yamux"
|
||||
tptu "github.com/libp2p/go-libp2p/p2p/net/upgrader"
|
||||
@@ -43,7 +44,7 @@ func makeSwarm(t *testing.T) *Swarm {
|
||||
ps.AddPrivKey(id, priv)
|
||||
t.Cleanup(func() { ps.Close() })
|
||||
|
||||
s, err := NewSwarm(id, ps, WithDialTimeout(time.Second))
|
||||
s, err := NewSwarm(id, ps, eventbus.NewBus(), WithDialTimeout(time.Second))
|
||||
require.NoError(t, err)
|
||||
|
||||
upgrader := makeUpgrader(t, s)
|
||||
|
@@ -11,6 +11,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/libp2p/go-libp2p/core/connmgr"
|
||||
"github.com/libp2p/go-libp2p/core/event"
|
||||
"github.com/libp2p/go-libp2p/core/metrics"
|
||||
"github.com/libp2p/go-libp2p/core/network"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
@@ -111,6 +112,8 @@ type Swarm struct {
|
||||
// down before continuing.
|
||||
refs sync.WaitGroup
|
||||
|
||||
emitter event.Emitter
|
||||
|
||||
rcmgr network.ResourceManager
|
||||
|
||||
local peer.ID
|
||||
@@ -163,11 +166,16 @@ type Swarm struct {
|
||||
}
|
||||
|
||||
// NewSwarm constructs a Swarm.
|
||||
func NewSwarm(local peer.ID, peers peerstore.Peerstore, opts ...Option) (*Swarm, error) {
|
||||
func NewSwarm(local peer.ID, peers peerstore.Peerstore, eventBus event.Bus, opts ...Option) (*Swarm, error) {
|
||||
emitter, err := eventBus.Emitter(new(event.EvtPeerConnectednessChanged))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
s := &Swarm{
|
||||
local: local,
|
||||
peers: peers,
|
||||
emitter: emitter,
|
||||
ctx: ctx,
|
||||
ctxCancel: cancel,
|
||||
dialTimeout: defaultDialTimeout,
|
||||
@@ -203,6 +211,8 @@ func (s *Swarm) Close() error {
|
||||
func (s *Swarm) close() {
|
||||
s.ctxCancel()
|
||||
|
||||
s.emitter.Close()
|
||||
|
||||
// Prevents new connections and/or listeners from being added to the swarm.
|
||||
s.listeners.Lock()
|
||||
listeners := s.listeners.m
|
||||
@@ -319,6 +329,12 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn,
|
||||
}
|
||||
|
||||
c.streams.m = make(map[*Stream]struct{})
|
||||
if len(s.conns.m[p]) == 0 { // first connection
|
||||
s.emitter.Emit(event.EvtPeerConnectednessChanged{
|
||||
Peer: p,
|
||||
Connectedness: network.Connected,
|
||||
})
|
||||
}
|
||||
s.conns.m[p] = append(s.conns.m[p], c)
|
||||
|
||||
// Add two swarm refs:
|
||||
@@ -611,11 +627,16 @@ func (s *Swarm) removeConn(c *Conn) {
|
||||
|
||||
s.conns.Lock()
|
||||
defer s.conns.Unlock()
|
||||
|
||||
cs := s.conns.m[p]
|
||||
for i, ci := range cs {
|
||||
if ci == c {
|
||||
if len(cs) == 1 {
|
||||
delete(s.conns.m, p)
|
||||
s.emitter.Emit(event.EvtPeerConnectednessChanged{
|
||||
Peer: p,
|
||||
Connectedness: network.NotConnected,
|
||||
})
|
||||
} else {
|
||||
// NOTE: We're intentionally preserving order.
|
||||
// This way, connections to a peer are always
|
||||
@@ -624,7 +645,7 @@ func (s *Swarm) removeConn(c *Conn) {
|
||||
cs[len(cs)-1] = nil
|
||||
s.conns.m[p] = cs[:len(cs)-1]
|
||||
}
|
||||
return
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -9,6 +9,7 @@ import (
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/libp2p/go-libp2p/core/peerstore"
|
||||
"github.com/libp2p/go-libp2p/core/test"
|
||||
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
|
||||
"github.com/libp2p/go-libp2p/p2p/net/swarm"
|
||||
swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing"
|
||||
circuitv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/client"
|
||||
@@ -74,7 +75,7 @@ func TestDialAddressSelection(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
id, err := peer.IDFromPrivateKey(priv)
|
||||
require.NoError(t, err)
|
||||
s, err := swarm.NewSwarm("local", nil)
|
||||
s, err := swarm.NewSwarm("local", nil, eventbus.NewBus())
|
||||
require.NoError(t, err)
|
||||
|
||||
tcpTr, err := tcp.NewTCPTransport(nil, nil)
|
||||
|
@@ -12,6 +12,7 @@ import (
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/libp2p/go-libp2p/core/peerstore"
|
||||
"github.com/libp2p/go-libp2p/core/test"
|
||||
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
|
||||
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
|
||||
"github.com/libp2p/go-libp2p/p2p/transport/tcp"
|
||||
"github.com/libp2p/go-libp2p/p2p/transport/websocket"
|
||||
@@ -47,7 +48,7 @@ func TestAddrsForDial(t *testing.T) {
|
||||
|
||||
tpt, err := websocket.New(nil, &network.NullResourceManager{})
|
||||
require.NoError(t, err)
|
||||
s, err := NewSwarm(id, ps, WithMultiaddrResolver(resolver))
|
||||
s, err := NewSwarm(id, ps, eventbus.NewBus(), WithMultiaddrResolver(resolver))
|
||||
require.NoError(t, err)
|
||||
defer s.Close()
|
||||
err = s.AddTransport(tpt)
|
||||
@@ -74,7 +75,7 @@ func newTestSwarmWithResolver(t *testing.T, resolver *madns.Resolver) *Swarm {
|
||||
ps.AddPubKey(id, priv.GetPublic())
|
||||
ps.AddPrivKey(id, priv)
|
||||
t.Cleanup(func() { ps.Close() })
|
||||
s, err := NewSwarm(id, ps, WithMultiaddrResolver(resolver))
|
||||
s, err := NewSwarm(id, ps, eventbus.NewBus(), WithMultiaddrResolver(resolver))
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
s.Close()
|
||||
|
66
p2p/net/swarm/swarm_event_test.go
Normal file
66
p2p/net/swarm/swarm_event_test.go
Normal file
@@ -0,0 +1,66 @@
|
||||
package swarm_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/libp2p/go-libp2p/core/event"
|
||||
"github.com/libp2p/go-libp2p/core/network"
|
||||
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
|
||||
. "github.com/libp2p/go-libp2p/p2p/net/swarm"
|
||||
swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing"
|
||||
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func newSwarmWithSubscription(t *testing.T) (*Swarm, event.Subscription) {
|
||||
t.Helper()
|
||||
bus := eventbus.NewBus()
|
||||
sw := swarmt.GenSwarm(t, swarmt.EventBus(bus))
|
||||
t.Cleanup(func() { sw.Close() })
|
||||
sub, err := bus.Subscribe(new(event.EvtPeerConnectednessChanged))
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() { sub.Close() })
|
||||
return sw, sub
|
||||
}
|
||||
|
||||
func checkEvent(t *testing.T, sub event.Subscription, expected event.EvtPeerConnectednessChanged) {
|
||||
t.Helper()
|
||||
select {
|
||||
case ev, ok := <-sub.Out():
|
||||
require.True(t, ok)
|
||||
evt := ev.(event.EvtPeerConnectednessChanged)
|
||||
require.Equal(t, expected.Connectedness, evt.Connectedness, "wrong connectedness state")
|
||||
require.Equal(t, expected.Peer, evt.Peer)
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("didn't get PeerConnectedness event")
|
||||
}
|
||||
|
||||
// check that there are no more events
|
||||
select {
|
||||
case <-sub.Out():
|
||||
t.Fatal("didn't expect any more events")
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func TestConnectednessEventsSingleConn(t *testing.T) {
|
||||
s1, sub1 := newSwarmWithSubscription(t)
|
||||
s2, sub2 := newSwarmWithSubscription(t)
|
||||
|
||||
s1.Peerstore().AddAddrs(s2.LocalPeer(), []ma.Multiaddr{s2.ListenAddresses()[0]}, time.Hour)
|
||||
_, err := s1.DialPeer(context.Background(), s2.LocalPeer())
|
||||
require.NoError(t, err)
|
||||
|
||||
checkEvent(t, sub1, event.EvtPeerConnectednessChanged{Peer: s2.LocalPeer(), Connectedness: network.Connected})
|
||||
checkEvent(t, sub2, event.EvtPeerConnectednessChanged{Peer: s1.LocalPeer(), Connectedness: network.Connected})
|
||||
|
||||
for _, c := range s2.ConnsToPeer(s1.LocalPeer()) {
|
||||
require.NoError(t, c.Close())
|
||||
}
|
||||
checkEvent(t, sub1, event.EvtPeerConnectednessChanged{Peer: s2.LocalPeer(), Connectedness: network.NotConnected})
|
||||
checkEvent(t, sub2, event.EvtPeerConnectednessChanged{Peer: s1.LocalPeer(), Connectedness: network.NotConnected})
|
||||
}
|
@@ -5,11 +5,10 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/libp2p/go-libp2p/p2p/transport/quicreuse"
|
||||
|
||||
"github.com/libp2p/go-libp2p/core/connmgr"
|
||||
"github.com/libp2p/go-libp2p/core/control"
|
||||
"github.com/libp2p/go-libp2p/core/crypto"
|
||||
"github.com/libp2p/go-libp2p/core/event"
|
||||
"github.com/libp2p/go-libp2p/core/metrics"
|
||||
"github.com/libp2p/go-libp2p/core/network"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
@@ -17,11 +16,13 @@ import (
|
||||
"github.com/libp2p/go-libp2p/core/sec"
|
||||
"github.com/libp2p/go-libp2p/core/sec/insecure"
|
||||
"github.com/libp2p/go-libp2p/core/transport"
|
||||
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
|
||||
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
|
||||
"github.com/libp2p/go-libp2p/p2p/muxer/yamux"
|
||||
"github.com/libp2p/go-libp2p/p2p/net/swarm"
|
||||
tptu "github.com/libp2p/go-libp2p/p2p/net/upgrader"
|
||||
quic "github.com/libp2p/go-libp2p/p2p/transport/quic"
|
||||
"github.com/libp2p/go-libp2p/p2p/transport/quicreuse"
|
||||
"github.com/libp2p/go-libp2p/p2p/transport/tcp"
|
||||
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
@@ -36,6 +37,7 @@ type config struct {
|
||||
connectionGater connmgr.ConnectionGater
|
||||
sk crypto.PrivKey
|
||||
swarmOpts []swarm.Option
|
||||
eventBus event.Bus
|
||||
clock
|
||||
}
|
||||
|
||||
@@ -99,6 +101,12 @@ func OptPeerPrivateKey(sk crypto.PrivKey) Option {
|
||||
}
|
||||
}
|
||||
|
||||
func EventBus(b event.Bus) Option {
|
||||
return func(_ *testing.T, c *config) {
|
||||
c.eventBus = b
|
||||
}
|
||||
}
|
||||
|
||||
// GenUpgrader creates a new connection upgrader for use with this swarm.
|
||||
func GenUpgrader(t *testing.T, n *swarm.Swarm, connGater connmgr.ConnectionGater, opts ...tptu.Option) transport.Upgrader {
|
||||
id := n.LocalPeer()
|
||||
@@ -140,7 +148,12 @@ func GenSwarm(t *testing.T, opts ...Option) *swarm.Swarm {
|
||||
if cfg.connectionGater != nil {
|
||||
swarmOpts = append(swarmOpts, swarm.WithConnectionGater(cfg.connectionGater))
|
||||
}
|
||||
s, err := swarm.NewSwarm(id, ps, swarmOpts...)
|
||||
|
||||
eventBus := cfg.eventBus
|
||||
if eventBus == nil {
|
||||
eventBus = eventbus.NewBus()
|
||||
}
|
||||
s, err := swarm.NewSwarm(id, ps, eventBus, swarmOpts...)
|
||||
require.NoError(t, err)
|
||||
|
||||
upgrader := GenUpgrader(t, s, cfg.connectionGater)
|
||||
|
@@ -16,6 +16,7 @@ import (
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/libp2p/go-libp2p/core/transport"
|
||||
bhost "github.com/libp2p/go-libp2p/p2p/host/blank"
|
||||
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
|
||||
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
|
||||
"github.com/libp2p/go-libp2p/p2p/net/swarm"
|
||||
swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing"
|
||||
@@ -48,7 +49,7 @@ func getNetHosts(t *testing.T, ctx context.Context, n int) (hosts []host.Host, u
|
||||
}
|
||||
|
||||
bwr := metrics.NewBandwidthCounter()
|
||||
netw, err := swarm.NewSwarm(p, ps, swarm.WithMetrics(bwr))
|
||||
netw, err := swarm.NewSwarm(p, ps, eventbus.NewBus(), swarm.WithMetrics(bwr))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
Reference in New Issue
Block a user