mirror of
https://github.com/libp2p/go-libp2p.git
synced 2025-09-26 20:21:26 +08:00
Identify: emit useful events after identification (#2759)
* Identify should emit useful events after identification * Compare strings rather than objects * Include other fields in emitted event * Sort addrs when comparing in test
This commit is contained in:
@@ -1,11 +1,40 @@
|
||||
package event
|
||||
|
||||
import "github.com/libp2p/go-libp2p/core/peer"
|
||||
import (
|
||||
"github.com/libp2p/go-libp2p/core/network"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/libp2p/go-libp2p/core/protocol"
|
||||
"github.com/libp2p/go-libp2p/core/record"
|
||||
"github.com/multiformats/go-multiaddr"
|
||||
)
|
||||
|
||||
// EvtPeerIdentificationCompleted is emitted when the initial identification round for a peer is completed.
|
||||
type EvtPeerIdentificationCompleted struct {
|
||||
// Peer is the ID of the peer whose identification succeeded.
|
||||
Peer peer.ID
|
||||
|
||||
// Conn is the connection we identified.
|
||||
Conn network.Conn
|
||||
|
||||
// ListenAddrs is the list of addresses the peer is listening on.
|
||||
ListenAddrs []multiaddr.Multiaddr
|
||||
|
||||
// Protocols is the list of protocols the peer advertised on this connection.
|
||||
Protocols []protocol.ID
|
||||
|
||||
// SignedPeerRecord is the provided signed peer record of the peer. May be nil.
|
||||
SignedPeerRecord *record.Envelope
|
||||
|
||||
// AgentVersion is like a UserAgent string in browsers, or client version in
|
||||
// bittorrent includes the client name and client.
|
||||
AgentVersion string
|
||||
|
||||
// ProtocolVersion is the protocolVersion field in the identify message
|
||||
ProtocolVersion string
|
||||
|
||||
// ObservedAddr is the our side's connection address as observed by the
|
||||
// peer. This is not verified, the peer could return anything here.
|
||||
ObservedAddr multiaddr.Multiaddr
|
||||
}
|
||||
|
||||
// EvtPeerIdentificationFailed is emitted when the initial identification round for a peer failed.
|
||||
|
@@ -400,8 +400,6 @@ func (ids *idService) IdentifyWait(c network.Conn) <-chan struct{} {
|
||||
ids.emitters.evtPeerIdentificationFailed.Emit(event.EvtPeerIdentificationFailed{Peer: c.RemotePeer(), Reason: err})
|
||||
return
|
||||
}
|
||||
|
||||
ids.emitters.evtPeerIdentificationCompleted.Emit(event.EvtPeerIdentificationCompleted{Peer: c.RemotePeer()})
|
||||
}()
|
||||
|
||||
return e.IdentifyWaitChan
|
||||
@@ -711,8 +709,16 @@ func (ids *idService) consumeMessage(mes *pb.Identify, c network.Conn, isPush bo
|
||||
})
|
||||
}
|
||||
|
||||
// mes.ObservedAddr
|
||||
ids.consumeObservedAddress(mes.GetObservedAddr(), c)
|
||||
obsAddr, err := ma.NewMultiaddrBytes(mes.GetObservedAddr())
|
||||
if err != nil {
|
||||
log.Debugf("error parsing received observed addr for %s: %s", c, err)
|
||||
obsAddr = nil
|
||||
}
|
||||
|
||||
if obsAddr != nil {
|
||||
// TODO refactor this to use the emitted events instead of having this func call explicitly.
|
||||
ids.observedAddrs.Record(c, obsAddr)
|
||||
}
|
||||
|
||||
// mes.ListenAddrs
|
||||
laddrs := mes.GetListenAddrs()
|
||||
@@ -763,6 +769,7 @@ func (ids *idService) consumeMessage(mes *pb.Identify, c network.Conn, isPush bo
|
||||
signedAddrs, err := ids.consumeSignedPeerRecord(c.RemotePeer(), signedPeerRecord)
|
||||
if err != nil {
|
||||
log.Debugf("failed to consume signed peer record: %s", err)
|
||||
signedPeerRecord = nil
|
||||
} else {
|
||||
addrs = signedAddrs
|
||||
}
|
||||
@@ -786,6 +793,18 @@ func (ids *idService) consumeMessage(mes *pb.Identify, c network.Conn, isPush bo
|
||||
|
||||
// get the key from the other side. we may not have it (no-auth transport)
|
||||
ids.consumeReceivedPubKey(c, mes.PublicKey)
|
||||
|
||||
ids.emitters.evtPeerIdentificationCompleted.Emit(event.EvtPeerIdentificationCompleted{
|
||||
Peer: c.RemotePeer(),
|
||||
Conn: c,
|
||||
ListenAddrs: lmaddrs,
|
||||
Protocols: mesProtocols,
|
||||
SignedPeerRecord: signedPeerRecord,
|
||||
ObservedAddr: obsAddr,
|
||||
ProtocolVersion: pv,
|
||||
AgentVersion: av,
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
func (ids *idService) consumeSignedPeerRecord(p peer.ID, signedPeerRecord *record.Envelope) ([]ma.Multiaddr, error) {
|
||||
@@ -919,20 +938,6 @@ func HasConsistentTransport(a ma.Multiaddr, green []ma.Multiaddr) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (ids *idService) consumeObservedAddress(observed []byte, c network.Conn) {
|
||||
if observed == nil {
|
||||
return
|
||||
}
|
||||
|
||||
maddr, err := ma.NewMultiaddrBytes(observed)
|
||||
if err != nil {
|
||||
log.Debugf("error parsing received observed addr for %s: %s", c, err)
|
||||
return
|
||||
}
|
||||
|
||||
ids.observedAddrs.Record(c, maddr)
|
||||
}
|
||||
|
||||
// addConnWithLock assuems caller holds the connsMu lock
|
||||
func (ids *idService) addConnWithLock(c network.Conn) {
|
||||
_, found := ids.conns[c]
|
||||
|
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"slices"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
@@ -200,12 +201,47 @@ func TestIDService(t *testing.T) {
|
||||
|
||||
// test that we received the "identify completed" event.
|
||||
select {
|
||||
case <-sub.Out():
|
||||
case evtAny := <-sub.Out():
|
||||
assertCorrectEvtPeerIdentificationCompleted(t, evtAny, h2)
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Fatalf("expected EvtPeerIdentificationCompleted event within 10 seconds; none received")
|
||||
}
|
||||
}
|
||||
|
||||
func assertCorrectEvtPeerIdentificationCompleted(t *testing.T, evtAny interface{}, other host.Host) {
|
||||
t.Helper()
|
||||
evt := evtAny.(event.EvtPeerIdentificationCompleted)
|
||||
require.NotNil(t, evt.Conn)
|
||||
require.Equal(t, other.ID(), evt.Peer)
|
||||
|
||||
require.Equal(t, len(other.Addrs()), len(evt.ListenAddrs))
|
||||
if len(other.Addrs()) == len(evt.ListenAddrs) {
|
||||
otherAddrsStrings := make([]string, len(other.Addrs()))
|
||||
evtAddrStrings := make([]string, len(evt.ListenAddrs))
|
||||
for i, a := range other.Addrs() {
|
||||
otherAddrsStrings[i] = a.String()
|
||||
evtAddrStrings[i] = evt.ListenAddrs[i].String()
|
||||
}
|
||||
slices.Sort(otherAddrsStrings)
|
||||
slices.Sort(evtAddrStrings)
|
||||
require.Equal(t, otherAddrsStrings, evtAddrStrings)
|
||||
}
|
||||
|
||||
otherProtos := other.Mux().Protocols()
|
||||
slices.Sort(otherProtos)
|
||||
evtProtos := evt.Protocols
|
||||
slices.Sort(evtProtos)
|
||||
require.Equal(t, otherProtos, evtProtos)
|
||||
idFromSignedRecord, err := peer.IDFromPublicKey(evt.SignedPeerRecord.PublicKey)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, other.ID(), idFromSignedRecord)
|
||||
require.Equal(t, peer.PeerRecordEnvelopePayloadType, evt.SignedPeerRecord.PayloadType)
|
||||
var peerRecord peer.PeerRecord
|
||||
evt.SignedPeerRecord.TypedRecord(&peerRecord)
|
||||
require.Equal(t, other.ID(), peerRecord.PeerID)
|
||||
require.Equal(t, other.Addrs(), peerRecord.Addrs)
|
||||
}
|
||||
|
||||
func TestProtoMatching(t *testing.T) {
|
||||
tcp1, _ := ma.NewMultiaddr("/ip4/1.2.3.4/tcp/1234")
|
||||
tcp2, _ := ma.NewMultiaddr("/ip4/1.2.3.4/tcp/2345")
|
||||
@@ -665,7 +701,8 @@ func TestLargeIdentifyMessage(t *testing.T) {
|
||||
|
||||
// test that we received the "identify completed" event.
|
||||
select {
|
||||
case <-sub.Out():
|
||||
case evtAny := <-sub.Out():
|
||||
assertCorrectEvtPeerIdentificationCompleted(t, evtAny, h2)
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Fatalf("expected EvtPeerIdentificationCompleted event within 3 seconds; none received")
|
||||
}
|
||||
|
Reference in New Issue
Block a user