identify: remove support for Identify Delta

This commit is contained in:
Marten Seemann
2023-01-04 11:29:01 +13:00
parent 0e9abdf228
commit aa1f32484a
10 changed files with 33 additions and 769 deletions

View File

@@ -25,7 +25,7 @@ func SetDefaultServiceLimits(config *rcmgr.ScalingLimitConfig) {
rcmgr.BaseLimit{StreamsInbound: 16, StreamsOutbound: 16, Streams: 32, Memory: 1 << 20},
rcmgr.BaseLimitIncrease{},
)
for _, id := range [...]protocol.ID{identify.ID, identify.IDDelta, identify.IDPush} {
for _, id := range [...]protocol.ID{identify.ID, identify.IDPush} {
config.AddProtocolLimit(
id,
rcmgr.BaseLimit{StreamsInbound: 64, StreamsOutbound: 64, Streams: 128, Memory: 4 << 20},

View File

@@ -286,7 +286,6 @@ func TestHostProtoPreference(t *testing.T) {
// Prevent pushing identify information so this test works.
h1.RemoveStreamHandler(identify.IDPush)
h1.RemoveStreamHandler(identify.IDDelta)
h2.SetStreamHandler(protoOld, handler)
@@ -362,7 +361,6 @@ func TestHostProtoPreknowledge(t *testing.T) {
h2.SetStreamHandler("/super", handler)
// Prevent pushing identify information so this test actually _uses_ the super protocol.
h1.RemoveStreamHandler(identify.IDPush)
h1.RemoveStreamHandler(identify.IDDelta)
h2pi := h2.Peerstore().PeerInfo(h2.ID())
require.NoError(t, h1.Connect(ctx, h2pi))

View File

@@ -115,7 +115,7 @@ type idService struct {
addPeerHandlerCh chan addPeerHandlerReq
rmPeerHandlerCh chan rmPeerHandlerReq
// pushSemaphore limits the push/delta concurrency to avoid storms
// pushSemaphore limits the push concurrency to avoid storms
// that clog the transient scope.
pushSemaphore chan struct{}
}
@@ -154,9 +154,6 @@ func NewIDService(h host.Host, opts ...Option) (*idService, error) {
}
s.ctx, s.ctxCancel = context.WithCancel(context.Background())
// handle local protocol handler updates, and push deltas to peers.
var err error
observedAddrs, err := NewObservedAddrManager(h)
if err != nil {
return nil, fmt.Errorf("failed to create observed address manager: %s", err)
@@ -180,7 +177,6 @@ func NewIDService(h host.Host, opts ...Option) (*idService, error) {
}
// register protocols that do not depend on peer records.
h.SetStreamHandler(IDDelta, s.deltaHandler)
h.SetStreamHandler(ID, s.sendIdentifyResp)
h.SetStreamHandler(IDPush, s.pushHandler)
@@ -269,20 +265,18 @@ func (ids *idService) loop() {
select {
case phs[pid].pushCh <- struct{}{}:
default:
log.Debugf("dropping addr updated message for %s as buffer full", pid.Pretty())
log.Debugf("dropping addr updated message for %s as buffer full", pid)
}
}
case event.EvtLocalProtocolsUpdated:
for pid := range phs {
select {
case phs[pid].deltaCh <- struct{}{}:
case phs[pid].pushCh <- struct{}{}:
default:
log.Debugf("dropping protocol updated message for %s as buffer full", pid.Pretty())
log.Debugf("dropping protocol updated message for %s as buffer full", pid)
}
}
}
case <-ids.ctx.Done():
return
}

View File

@@ -1,82 +0,0 @@
package identify
import (
"time"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
pb "github.com/libp2p/go-libp2p/p2p/protocol/identify/pb"
"github.com/libp2p/go-msgio/protoio"
)
const IDDelta = "/p2p/id/delta/1.0.0"
const deltaMsgSize = 2048
// deltaHandler handles incoming delta updates from peers.
func (ids *idService) deltaHandler(s network.Stream) {
if err := s.Scope().SetService(ServiceName); err != nil {
log.Warnf("error attaching stream to identify service: %s", err)
s.Reset()
return
}
if err := s.Scope().ReserveMemory(deltaMsgSize, network.ReservationPriorityAlways); err != nil {
log.Warnf("error reserving memory for identify stream: %s", err)
s.Reset()
return
}
defer s.Scope().ReleaseMemory(deltaMsgSize)
_ = s.SetReadDeadline(time.Now().Add(StreamReadTimeout))
c := s.Conn()
r := protoio.NewDelimitedReader(s, deltaMsgSize)
mes := pb.Identify{}
if err := r.ReadMsg(&mes); err != nil {
log.Warn("error reading identify message: ", err)
_ = s.Reset()
return
}
defer s.Close()
log.Debugf("%s received message from %s %s", s.Protocol(), c.RemotePeer(), c.RemoteMultiaddr())
delta := mes.GetDelta()
if delta == nil {
return
}
p := s.Conn().RemotePeer()
if err := ids.consumeDelta(p, delta); err != nil {
_ = s.Reset()
log.Warnf("delta update from peer %s failed: %s", p, err)
}
}
// consumeDelta processes an incoming delta from a peer, updating the peerstore
// and emitting the appropriate events.
func (ids *idService) consumeDelta(id peer.ID, delta *pb.Delta) error {
err := ids.Host.Peerstore().AddProtocols(id, delta.GetAddedProtocols()...)
if err != nil {
return err
}
err = ids.Host.Peerstore().RemoveProtocols(id, delta.GetRmProtocols()...)
if err != nil {
return err
}
evt := event.EvtPeerProtocolsUpdated{
Peer: id,
Added: protocol.ConvertFromStrings(delta.GetAddedProtocols()),
Removed: protocol.ConvertFromStrings(delta.GetRmProtocols()),
}
ids.emitters.evtPeerProtocolsUpdated.Emit(evt)
return nil
}

View File

@@ -4,11 +4,8 @@ import (
"github.com/libp2p/go-libp2p/core/network"
)
// IDPush is the protocol.ID of the Identify push protocol. It sends full identify messages containing
// the current state of the peer.
//
// It is in the process of being replaced by identify delta, which sends only diffs for better
// resource utilisation.
// IDPush is the protocol.ID of the Identify push protocol.
// It sends full identify messages containing the current state of the peer.
const IDPush = "/ipfs/id/push/1.0.0"
// pushHandler handles incoming identify push streams. The behaviour is identical to the ordinary identify protocol.

View File

@@ -3,8 +3,6 @@ package identify_test
import (
"context"
"fmt"
"reflect"
"sort"
"sync"
"testing"
"time"
@@ -360,169 +358,8 @@ func TestLocalhostAddrFiltering(t *testing.T) {
}
}
func TestIdentifyDeltaOnProtocolChange(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
h1 := blhost.NewBlankHost(swarmt.GenSwarm(t))
h2 := blhost.NewBlankHost(swarmt.GenSwarm(t))
defer h2.Close()
defer h1.Close()
h2.SetStreamHandler(protocol.TestingID, func(_ network.Stream) {})
ids1, err := identify.NewIDService(h1)
require.NoError(t, err)
ids2, err := identify.NewIDService(h2)
require.NoError(t, err)
defer func() {
ids1.Close()
ids2.Close()
}()
idComplete, err := h1.EventBus().Subscribe(&event.EvtPeerIdentificationCompleted{})
require.NoError(t, err)
defer idComplete.Close()
idFailed, err := h1.EventBus().Subscribe(&event.EvtPeerIdentificationFailed{})
require.NoError(t, err)
defer idFailed.Close()
if err := h1.Connect(ctx, peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}); err != nil {
t.Fatal(err)
}
conn := h1.Network().ConnsToPeer(h2.ID())[0]
select {
case <-ids1.IdentifyWait(conn):
case <-time.After(5 * time.Second):
t.Fatal("took over 5 seconds to identify")
}
select {
case <-idComplete.Out():
case evt := <-idFailed.Out():
t.Fatalf("Failed to identify: %v", evt.(event.EvtPeerIdentificationFailed).Reason)
case <-time.After(5 * time.Second):
t.Fatal("Missing id event")
}
protos, err := h1.Peerstore().GetProtocols(h2.ID())
if err != nil {
t.Fatal(err)
}
sort.Strings(protos)
if sort.SearchStrings(protos, string(protocol.TestingID)) == len(protos) {
t.Fatalf("expected peer 1 to know that peer 2 speaks the Test protocol amongst others")
}
// set up a subscriber to listen to peer protocol updated events in h1. We expect to receive events from h2
// as protocols are added and removed.
sub, err := h1.EventBus().Subscribe(&event.EvtPeerProtocolsUpdated{})
if err != nil {
t.Fatal(err)
}
defer sub.Close()
h1ProtocolsUpdates, err := h1.EventBus().Subscribe(&event.EvtPeerProtocolsUpdated{})
require.NoError(t, err)
defer h1ProtocolsUpdates.Close()
waitForDelta := make(chan struct{})
go func() {
expectedCount := 2
for expectedCount > 0 {
evt := <-h1ProtocolsUpdates.Out()
expectedCount -= len(evt.(event.EvtPeerProtocolsUpdated).Added)
}
close(waitForDelta)
}()
// add two new protocols in h2 and wait for identify to send deltas.
h2.SetStreamHandler(protocol.ID("foo"), func(_ network.Stream) {})
h2.SetStreamHandler(protocol.ID("bar"), func(_ network.Stream) {})
recvWithTimeout(t, waitForDelta, 10*time.Second, "Timed out waiting to read protocol ids from the wire")
protos, err = h1.Peerstore().GetProtocols(h2.ID())
require.NoError(t, err)
have := make(map[string]bool, len(protos))
for _, p := range protos {
have[p] = true
}
require.True(t, have["foo"])
require.True(t, have["bar"])
// remove one of the newly added protocols from h2, and wait for identify to send the delta.
h2.RemoveStreamHandler(protocol.ID("bar"))
waitForDelta = make(chan struct{})
go func() {
expectedCount := 1
for expectedCount > 0 {
evt := <-h1ProtocolsUpdates.Out()
expectedCount -= len(evt.(event.EvtPeerProtocolsUpdated).Removed)
}
close(waitForDelta)
}()
// check that h1 now has forgotten about h2's bar protocol.
recvWithTimeout(t, waitForDelta, 10*time.Second, "timed out waiting for protocol to be removed")
protos, err = h1.Peerstore().GetProtocols(h2.ID())
require.NoError(t, err)
have = make(map[string]bool, len(protos))
for _, p := range protos {
have[p] = true
}
require.True(t, have["foo"])
require.False(t, have["bar"])
// make sure that h1 emitted events in the eventbus for h2's protocol updates.
done := make(chan struct{})
var lk sync.Mutex
var added []string
var removed []string
var success bool
go func() {
defer close(done)
for {
select {
case <-time.After(5 * time.Second):
return
case e, ok := <-sub.Out():
if !ok {
return
}
evt := e.(event.EvtPeerProtocolsUpdated)
lk.Lock()
added = append(added, protocol.ConvertToStrings(evt.Added)...)
removed = append(removed, protocol.ConvertToStrings(evt.Removed)...)
sort.Strings(added)
sort.Strings(removed)
if reflect.DeepEqual(added, []string{"bar", "foo"}) &&
reflect.DeepEqual(removed, []string{"bar"}) {
success = true
lk.Unlock()
return
}
lk.Unlock()
}
}
}()
<-done
lk.Lock()
defer lk.Unlock()
require.True(t, success, "did not get correct peer protocol updated events")
}
// TestIdentifyDeltaWhileIdentifyingConn tests that the host waits to push delta updates if an identify is ongoing.
func TestIdentifyDeltaWhileIdentifyingConn(t *testing.T) {
// TestIdentifyPushWhileIdentifyingConn tests that the host waits to push updates if an identify is ongoing.
func TestIdentifyPushWhileIdentifyingConn(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -559,20 +396,18 @@ func TestIdentifyDeltaWhileIdentifyingConn(t *testing.T) {
// from h2, identify h1.
conn := h2.Network().ConnsToPeer(h1.ID())[0]
go func() {
ids2.IdentifyConn(conn)
}()
go ids2.IdentifyConn(conn)
<-time.After(500 * time.Millisecond)
// subscribe to events in h1; after identify h1 should receive the delta from h2 and publish an event in the bus.
// subscribe to events in h1; after identify h1 should receive the update from h2 and publish an event in the bus.
sub, err := h1.EventBus().Subscribe(&event.EvtPeerProtocolsUpdated{})
if err != nil {
t.Fatal(err)
}
defer sub.Close()
// add a handler in h2; the delta to h1 will queue until we're done identifying h1.
// add a handler in h2; the update to h1 will queue until we're done identifying h1.
h2.SetStreamHandler(protocol.TestingID, func(_ network.Stream) {})
<-time.After(500 * time.Millisecond)
@@ -722,7 +557,7 @@ func TestNotListening(t *testing.T) {
}
}
func TestSendPushIfDeltaNotSupported(t *testing.T) {
func TestSendPush(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -749,19 +584,6 @@ func TestSendPushIfDeltaNotSupported(t *testing.T) {
ids1.IdentifyConn(h1.Network().ConnsToPeer(h2.ID())[0])
ids2.IdentifyConn(h2.Network().ConnsToPeer(h1.ID())[0])
// h1 knows h2 speaks Delta
sup, err := h1.Peerstore().SupportsProtocols(h2.ID(), []string{identify.IDDelta}...)
require.NoError(t, err)
require.Equal(t, []string{identify.IDDelta}, sup)
// h2 stops supporting Delta and that information flows to h1
h2.RemoveStreamHandler(identify.IDDelta)
require.Eventually(t, func() bool {
sup, err := h1.Peerstore().SupportsProtocols(h2.ID(), []string{identify.IDDelta}...)
return err == nil && len(sup) == 0
}, time.Second, 10*time.Millisecond)
// h1 starts listening on a new protocol and h2 finds out about that through a push
h1.SetStreamHandler("rand", func(network.Stream) {})
require.Eventually(t, func() bool {
@@ -1019,7 +841,7 @@ func TestIncomingIDStreamsTimeout(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
protocols := []protocol.ID{identify.IDPush, identify.IDDelta}
protocols := []protocol.ID{identify.IDPush}
for _, p := range protocols {
h1 := blhost.NewBlankHost(swarmt.GenSwarm(t))
@@ -1055,16 +877,6 @@ func TestIncomingIDStreamsTimeout(t *testing.T) {
}
}
func recvWithTimeout(t *testing.T, s <-chan struct{}, timeout time.Duration, failMsg string) {
t.Helper()
select {
case <-s:
return
case <-time.After(timeout):
t.Fatalf("Hit time while waiting to recv from channel: %s", failMsg)
}
}
func waitForAddrInStream(t *testing.T, s <-chan ma.Multiaddr, expected ma.Multiaddr, timeout time.Duration, failMsg string) {
t.Helper()
for {

View File

@@ -5,11 +5,10 @@ package identify_pb
import (
fmt "fmt"
proto "github.com/gogo/protobuf/proto"
io "io"
math "math"
math_bits "math/bits"
proto "github.com/gogo/protobuf/proto"
)
// Reference imports to suppress errors if they are not otherwise used.
@@ -23,63 +22,6 @@ var _ = math.Inf
// proto package needs to be updated.
const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package
type Delta struct {
// new protocols now serviced by the peer.
AddedProtocols []string `protobuf:"bytes,1,rep,name=added_protocols,json=addedProtocols" json:"added_protocols,omitempty"`
// protocols dropped by the peer.
RmProtocols []string `protobuf:"bytes,2,rep,name=rm_protocols,json=rmProtocols" json:"rm_protocols,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Delta) Reset() { *m = Delta{} }
func (m *Delta) String() string { return proto.CompactTextString(m) }
func (*Delta) ProtoMessage() {}
func (*Delta) Descriptor() ([]byte, []int) {
return fileDescriptor_83f1e7e6b485409f, []int{0}
}
func (m *Delta) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *Delta) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_Delta.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *Delta) XXX_Merge(src proto.Message) {
xxx_messageInfo_Delta.Merge(m, src)
}
func (m *Delta) XXX_Size() int {
return m.Size()
}
func (m *Delta) XXX_DiscardUnknown() {
xxx_messageInfo_Delta.DiscardUnknown(m)
}
var xxx_messageInfo_Delta proto.InternalMessageInfo
func (m *Delta) GetAddedProtocols() []string {
if m != nil {
return m.AddedProtocols
}
return nil
}
func (m *Delta) GetRmProtocols() []string {
if m != nil {
return m.RmProtocols
}
return nil
}
type Identify struct {
// protocolVersion determines compatibility between peers
ProtocolVersion *string `protobuf:"bytes,5,opt,name=protocolVersion" json:"protocolVersion,omitempty"`
@@ -98,8 +40,6 @@ type Identify struct {
ObservedAddr []byte `protobuf:"bytes,4,opt,name=observedAddr" json:"observedAddr,omitempty"`
// protocols are the services this node is running
Protocols []string `protobuf:"bytes,3,rep,name=protocols" json:"protocols,omitempty"`
// a delta update is incompatible with everything else. If this field is included, none of the others can appear.
Delta *Delta `protobuf:"bytes,7,opt,name=delta" json:"delta,omitempty"`
// signedPeerRecord contains a serialized SignedEnvelope containing a PeerRecord,
// signed by the sending node. It contains the same addresses as the listenAddrs field, but
// in a form that lets us share authenticated addrs with other peers.
@@ -115,7 +55,7 @@ func (m *Identify) Reset() { *m = Identify{} }
func (m *Identify) String() string { return proto.CompactTextString(m) }
func (*Identify) ProtoMessage() {}
func (*Identify) Descriptor() ([]byte, []int) {
return fileDescriptor_83f1e7e6b485409f, []int{1}
return fileDescriptor_83f1e7e6b485409f, []int{0}
}
func (m *Identify) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@@ -186,13 +126,6 @@ func (m *Identify) GetProtocols() []string {
return nil
}
func (m *Identify) GetDelta() *Delta {
if m != nil {
return m.Delta
}
return nil
}
func (m *Identify) GetSignedPeerRecord() []byte {
if m != nil {
return m.SignedPeerRecord
@@ -201,76 +134,27 @@ func (m *Identify) GetSignedPeerRecord() []byte {
}
func init() {
proto.RegisterType((*Delta)(nil), "identify.pb.Delta")
proto.RegisterType((*Identify)(nil), "identify.pb.Identify")
}
func init() { proto.RegisterFile("identify.proto", fileDescriptor_83f1e7e6b485409f) }
var fileDescriptor_83f1e7e6b485409f = []byte{
// 272 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x64, 0x90, 0x41, 0x4a, 0xc3, 0x40,
0x14, 0x86, 0x99, 0xd6, 0x6a, 0xf3, 0x12, 0x5a, 0x99, 0xd5, 0x2c, 0x24, 0xc4, 0x6c, 0x1c, 0x5c,
0x64, 0xe1, 0x0d, 0x14, 0x37, 0xe2, 0xa6, 0x8c, 0xe0, 0x56, 0x92, 0xbc, 0x67, 0x19, 0x48, 0x33,
0x65, 0x32, 0x0a, 0xbd, 0x95, 0xc7, 0x70, 0xe9, 0x11, 0x24, 0x27, 0x91, 0x4c, 0x4d, 0x93, 0xea,
0x72, 0x3e, 0x3e, 0xe6, 0x7f, 0xff, 0x0f, 0x0b, 0x8d, 0x54, 0x3b, 0xfd, 0xba, 0xcb, 0xb6, 0xd6,
0x38, 0xc3, 0xc3, 0xe1, 0x5d, 0xa4, 0x4f, 0x30, 0xbb, 0xa7, 0xca, 0xe5, 0xfc, 0x0a, 0x96, 0x39,
0x22, 0xe1, 0x8b, 0x97, 0x4a, 0x53, 0x35, 0x82, 0x25, 0x53, 0x19, 0xa8, 0x85, 0xc7, 0xab, 0x9e,
0xf2, 0x4b, 0x88, 0xec, 0x66, 0x64, 0x4d, 0xbc, 0x15, 0xda, 0xcd, 0x41, 0x49, 0x3f, 0x26, 0x30,
0x7f, 0xf8, 0x0d, 0xe1, 0x12, 0x96, 0xbd, 0xfc, 0x4c, 0xb6, 0xd1, 0xa6, 0x16, 0xb3, 0x84, 0xc9,
0x40, 0xfd, 0xc5, 0x3c, 0x85, 0x28, 0x5f, 0x53, 0xed, 0x7a, 0xed, 0xd4, 0x6b, 0x47, 0x8c, 0x5f,
0x40, 0xb0, 0x7d, 0x2b, 0x2a, 0x5d, 0x3e, 0xd2, 0x4e, 0xb0, 0x84, 0xc9, 0x48, 0x0d, 0x80, 0x27,
0x10, 0x56, 0xba, 0x71, 0x54, 0xdf, 0x22, 0xda, 0xfd, 0x69, 0x91, 0x1a, 0xa3, 0x2e, 0xc3, 0x14,
0x0d, 0xd9, 0x77, 0xc2, 0x0e, 0x88, 0x13, 0xff, 0xc5, 0x11, 0xf3, 0x19, 0x87, 0x7a, 0x53, 0x5f,
0x6f, 0x00, 0x5c, 0xc2, 0x0c, 0xbb, 0xc5, 0xc4, 0x59, 0xc2, 0x64, 0x78, 0xc3, 0xb3, 0xd1, 0x9c,
0x99, 0xdf, 0x52, 0xed, 0x05, 0x7e, 0x0d, 0xe7, 0x8d, 0x5e, 0xd7, 0x84, 0x2b, 0x22, 0xab, 0xa8,
0x34, 0x16, 0xc5, 0xdc, 0xe7, 0xfd, 0xe3, 0x77, 0xd1, 0x67, 0x1b, 0xb3, 0xaf, 0x36, 0x66, 0xdf,
0x6d, 0xcc, 0x7e, 0x02, 0x00, 0x00, 0xff, 0xff, 0xc0, 0x03, 0xc8, 0x41, 0xb3, 0x01, 0x00, 0x00,
}
func (m *Delta) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *Delta) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *Delta) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if len(m.RmProtocols) > 0 {
for iNdEx := len(m.RmProtocols) - 1; iNdEx >= 0; iNdEx-- {
i -= len(m.RmProtocols[iNdEx])
copy(dAtA[i:], m.RmProtocols[iNdEx])
i = encodeVarintIdentify(dAtA, i, uint64(len(m.RmProtocols[iNdEx])))
i--
dAtA[i] = 0x12
}
}
if len(m.AddedProtocols) > 0 {
for iNdEx := len(m.AddedProtocols) - 1; iNdEx >= 0; iNdEx-- {
i -= len(m.AddedProtocols[iNdEx])
copy(dAtA[i:], m.AddedProtocols[iNdEx])
i = encodeVarintIdentify(dAtA, i, uint64(len(m.AddedProtocols[iNdEx])))
i--
dAtA[i] = 0xa
}
}
return len(dAtA) - i, nil
// 213 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xcb, 0x4c, 0x49, 0xcd,
0x2b, 0xc9, 0x4c, 0xab, 0xd4, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x46, 0xf0, 0x93, 0x94,
0xda, 0x98, 0xb8, 0x38, 0x3c, 0xa1, 0x7c, 0x21, 0x0d, 0x2e, 0x7e, 0xb0, 0x92, 0xe4, 0xfc, 0x9c,
0xb0, 0xd4, 0xa2, 0xe2, 0xcc, 0xfc, 0x3c, 0x09, 0x56, 0x05, 0x46, 0x0d, 0xce, 0x20, 0x74, 0x61,
0x21, 0x25, 0x2e, 0x9e, 0xc4, 0xf4, 0xd4, 0xbc, 0x12, 0x98, 0x32, 0x36, 0xb0, 0x32, 0x14, 0x31,
0x21, 0x19, 0x2e, 0xce, 0x82, 0xd2, 0xa4, 0x9c, 0xcc, 0x64, 0xef, 0xd4, 0x4a, 0x09, 0x46, 0x05,
0x46, 0x0d, 0x9e, 0x20, 0x84, 0x80, 0x90, 0x02, 0x17, 0x77, 0x4e, 0x66, 0x71, 0x49, 0x6a, 0x9e,
0x63, 0x4a, 0x4a, 0x51, 0xb1, 0x04, 0x93, 0x02, 0xb3, 0x06, 0x4f, 0x10, 0xb2, 0x10, 0xc8, 0x8e,
0xfc, 0xa4, 0xe2, 0xd4, 0xa2, 0xb2, 0xd4, 0x14, 0x90, 0x80, 0x04, 0x0b, 0xd8, 0x08, 0x14, 0x31,
0xb0, 0x1d, 0x50, 0xa7, 0x15, 0x4b, 0x30, 0x2b, 0x30, 0x6b, 0x70, 0x06, 0x21, 0x04, 0x84, 0xb4,
0xb8, 0x04, 0x8a, 0x33, 0xd3, 0xf3, 0x52, 0x53, 0x02, 0x52, 0x53, 0x8b, 0x82, 0x52, 0x93, 0xf3,
0x8b, 0x52, 0x24, 0x38, 0xc0, 0xa6, 0x60, 0x88, 0x3b, 0xf1, 0x9c, 0x78, 0x24, 0xc7, 0x78, 0xe1,
0x91, 0x1c, 0xe3, 0x83, 0x47, 0x72, 0x8c, 0x80, 0x00, 0x00, 0x00, 0xff, 0xff, 0xc3, 0xb6, 0xc0,
0x32, 0x34, 0x01, 0x00, 0x00,
}
func (m *Identify) Marshal() (dAtA []byte, err error) {
@@ -304,18 +188,6 @@ func (m *Identify) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i--
dAtA[i] = 0x42
}
if m.Delta != nil {
{
size, err := m.Delta.MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintIdentify(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x3a
}
if m.AgentVersion != nil {
i -= len(*m.AgentVersion)
copy(dAtA[i:], *m.AgentVersion)
@@ -376,30 +248,6 @@ func encodeVarintIdentify(dAtA []byte, offset int, v uint64) int {
dAtA[offset] = uint8(v)
return base
}
func (m *Delta) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if len(m.AddedProtocols) > 0 {
for _, s := range m.AddedProtocols {
l = len(s)
n += 1 + l + sovIdentify(uint64(l))
}
}
if len(m.RmProtocols) > 0 {
for _, s := range m.RmProtocols {
l = len(s)
n += 1 + l + sovIdentify(uint64(l))
}
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
func (m *Identify) Size() (n int) {
if m == nil {
return 0
@@ -434,10 +282,6 @@ func (m *Identify) Size() (n int) {
l = len(*m.AgentVersion)
n += 1 + l + sovIdentify(uint64(l))
}
if m.Delta != nil {
l = m.Delta.Size()
n += 1 + l + sovIdentify(uint64(l))
}
if m.SignedPeerRecord != nil {
l = len(m.SignedPeerRecord)
n += 1 + l + sovIdentify(uint64(l))
@@ -454,121 +298,6 @@ func sovIdentify(x uint64) (n int) {
func sozIdentify(x uint64) (n int) {
return sovIdentify(uint64((x << 1) ^ uint64((int64(x) >> 63))))
}
func (m *Delta) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowIdentify
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: Delta: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: Delta: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field AddedProtocols", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowIdentify
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthIdentify
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthIdentify
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.AddedProtocols = append(m.AddedProtocols, string(dAtA[iNdEx:postIndex]))
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field RmProtocols", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowIdentify
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthIdentify
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthIdentify
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.RmProtocols = append(m.RmProtocols, string(dAtA[iNdEx:postIndex]))
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipIdentify(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthIdentify
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *Identify) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
@@ -796,42 +525,6 @@ func (m *Identify) Unmarshal(dAtA []byte) error {
s := string(dAtA[iNdEx:postIndex])
m.AgentVersion = &s
iNdEx = postIndex
case 7:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Delta", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowIdentify
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthIdentify
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthIdentify
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.Delta == nil {
m.Delta = &Delta{}
}
if err := m.Delta.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 8:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field SignedPeerRecord", wireType)

View File

@@ -2,13 +2,6 @@ syntax = "proto2";
package identify.pb;
message Delta {
// new protocols now serviced by the peer.
repeated string added_protocols = 1;
// protocols dropped by the peer.
repeated string rm_protocols = 2;
}
message Identify {
// protocolVersion determines compatibility between peers
@@ -34,9 +27,6 @@ message Identify {
// protocols are the services this node is running
repeated string protocols = 3;
// a delta update is incompatible with everything else. If this field is included, none of the others can appear.
optional Delta delta = 7;
// signedPeerRecord contains a serialized SignedEnvelope containing a PeerRecord,
// signed by the sending node. It contains the same addresses as the listenAddrs field, but
// in a form that lets us share authenticated addrs with other peers.

View File

@@ -11,9 +11,6 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/core/record"
pb "github.com/libp2p/go-libp2p/p2p/protocol/identify/pb"
"github.com/libp2p/go-msgio/protoio"
ma "github.com/multiformats/go-multiaddr"
)
@@ -35,8 +32,7 @@ type peerHandler struct {
snapshotMu sync.RWMutex
snapshot *identifySnapshot
pushCh chan struct{}
deltaCh chan struct{}
pushCh chan struct{}
}
func newPeerHandler(pid peer.ID, ids *idService) *peerHandler {
@@ -46,8 +42,7 @@ func newPeerHandler(pid peer.ID, ids *idService) *peerHandler {
snapshot: ids.getSnapshot(),
pushCh: make(chan struct{}, 1),
deltaCh: make(chan struct{}, 1),
pushCh: make(chan struct{}, 1),
}
return ph
@@ -92,52 +87,12 @@ func (ph *peerHandler) loop(ctx context.Context, onExit func()) {
if err := ph.sendPush(ctx); err != nil {
log.Warnw("failed to send Identify Push", "peer", ph.pid, "error", err)
}
case <-ph.deltaCh:
if err := ph.sendDelta(ctx); err != nil {
log.Warnw("failed to send Identify Delta", "peer", ph.pid, "error", err)
}
case <-ctx.Done():
return
}
}
}
func (ph *peerHandler) sendDelta(ctx context.Context) error {
// send a push if the peer does not support the Delta protocol.
if !ph.peerSupportsProtos(ctx, []string{IDDelta}) {
log.Debugw("will send push as peer does not support delta", "peer", ph.pid)
if err := ph.sendPush(ctx); err != nil {
return fmt.Errorf("failed to send push on delta message: %w", err)
}
return nil
}
// extract a delta message, updating the last state.
mes := ph.nextDelta()
if mes == nil || (len(mes.AddedProtocols) == 0 && len(mes.RmProtocols) == 0) {
return nil
}
ds, err := ph.openStream(ctx, []string{IDDelta})
if err != nil {
return fmt.Errorf("failed to open delta stream: %w", err)
}
defer ds.Close()
c := ds.Conn()
if err := protoio.NewDelimitedWriter(ds).WriteMsg(&pb.Identify{Delta: mes}); err != nil {
_ = ds.Reset()
return fmt.Errorf("failed to send delta message, %w", err)
}
log.Debugw("sent identify update", "protocol", ds.Protocol(), "peer", c.RemotePeer(),
"peer address", c.RemoteMultiaddr())
return nil
}
func (ph *peerHandler) sendPush(ctx context.Context) error {
dp, err := ph.openStream(ctx, []string{IDPush})
if err == errProtocolNotSupported {
@@ -216,49 +171,3 @@ func (ph *peerHandler) peerSupportsProtos(ctx context.Context, protos []string)
}
return true
}
func (ph *peerHandler) nextDelta() *pb.Delta {
curr := ph.ids.Host.Mux().Protocols()
// Extract the old protocol list and replace the old snapshot with an
// updated one.
ph.snapshotMu.Lock()
snapshot := *ph.snapshot
old := snapshot.protocols
snapshot.protocols = curr
ph.snapshot = &snapshot
ph.snapshotMu.Unlock()
oldProtos := make(map[string]struct{}, len(old))
currProtos := make(map[string]struct{}, len(curr))
for _, proto := range old {
oldProtos[proto] = struct{}{}
}
for _, proto := range curr {
currProtos[proto] = struct{}{}
}
var added []string
var removed []string
// has it been added ?
for p := range currProtos {
if _, ok := oldProtos[p]; !ok {
added = append(added, p)
}
}
// has it been removed ?
for p := range oldProtos {
if _, ok := currProtos[p]; !ok {
removed = append(removed, p)
}
}
return &pb.Delta{
AddedProtocols: added,
RmProtocols: removed,
}
}

View File

@@ -5,7 +5,6 @@ import (
"testing"
"time"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
blhost "github.com/libp2p/go-libp2p/p2p/host/blank"
swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing"
@@ -13,52 +12,6 @@ import (
"github.com/stretchr/testify/require"
)
func TestMakeApplyDelta(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
h1 := blhost.NewBlankHost(swarmt.GenSwarm(t))
defer h1.Close()
ids1, err := NewIDService(h1)
require.NoError(t, err)
ph := newPeerHandler(h1.ID(), ids1)
ph.start(ctx, func() {})
defer ph.stop()
m1 := ph.nextDelta()
require.NotNil(t, m1)
// We haven't changed anything since creating the peer handler
require.Empty(t, m1.AddedProtocols)
h1.SetStreamHandler("p1", func(network.Stream) {})
m2 := ph.nextDelta()
require.Len(t, m2.AddedProtocols, 1)
require.Contains(t, m2.AddedProtocols, "p1")
require.Empty(t, m2.RmProtocols)
h1.SetStreamHandler("p2", func(network.Stream) {})
h1.SetStreamHandler("p3", func(stream network.Stream) {})
m3 := ph.nextDelta()
require.Len(t, m3.AddedProtocols, 2)
require.Contains(t, m3.AddedProtocols, "p2")
require.Contains(t, m3.AddedProtocols, "p3")
require.Empty(t, m3.RmProtocols)
h1.RemoveStreamHandler("p3")
m4 := ph.nextDelta()
require.Empty(t, m4.AddedProtocols)
require.Len(t, m4.RmProtocols, 1)
require.Contains(t, m4.RmProtocols, "p3")
h1.RemoveStreamHandler("p2")
h1.RemoveStreamHandler("p1")
m5 := ph.nextDelta()
require.Empty(t, m5.AddedProtocols)
require.Len(t, m5.RmProtocols, 2)
require.Contains(t, m5.RmProtocols, "p2")
require.Contains(t, m5.RmProtocols, "p1")
}
func TestHandlerClose(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()