mirror of
https://github.com/libp2p/go-libp2p.git
synced 2025-09-26 20:21:26 +08:00
identify: reduce timeout to 5 seconds (#3259)
The max message size is about 100kB. 5 seconds are enough to transfer this.
This commit is contained in:
@@ -32,15 +32,15 @@ import (
|
||||
|
||||
var log = logging.Logger("net/identify")
|
||||
|
||||
var Timeout = 30 * time.Second // timeout on all incoming Identify interactions
|
||||
|
||||
const (
|
||||
// ID is the protocol.ID of version 1.0.0 of the identify service.
|
||||
ID = "/ipfs/id/1.0.0"
|
||||
// IDPush is the protocol.ID of the Identify push protocol.
|
||||
// It sends full identify messages containing the current state of the peer.
|
||||
IDPush = "/ipfs/id/push/1.0.0"
|
||||
|
||||
// DefaultTimeout for all id interactions, incoming / outgoing, id / id-push.
|
||||
DefaultTimeout = 5 * time.Second
|
||||
// ServiceName is the default identify service name
|
||||
ServiceName = "libp2p.identify"
|
||||
|
||||
legacyIDSize = 2 * 1024
|
||||
@@ -148,6 +148,7 @@ type idService struct {
|
||||
refCount sync.WaitGroup
|
||||
|
||||
disableSignedPeerRecord bool
|
||||
timeout time.Duration
|
||||
|
||||
connsMu sync.RWMutex
|
||||
// The conns map contains all connections we're currently handling.
|
||||
@@ -182,7 +183,9 @@ type normalizer interface {
|
||||
// NewIDService constructs a new *idService and activates it by
|
||||
// attaching its stream handler to the given host.Host.
|
||||
func NewIDService(h host.Host, opts ...Option) (*idService, error) {
|
||||
var cfg config
|
||||
cfg := config{
|
||||
timeout: DefaultTimeout,
|
||||
}
|
||||
for _, opt := range opts {
|
||||
opt(&cfg)
|
||||
}
|
||||
@@ -203,6 +206,7 @@ func NewIDService(h host.Host, opts ...Option) (*idService, error) {
|
||||
disableSignedPeerRecord: cfg.disableSignedPeerRecord,
|
||||
setupCompleted: make(chan struct{}),
|
||||
metricsTracer: cfg.metricsTracer,
|
||||
timeout: cfg.timeout,
|
||||
}
|
||||
|
||||
var normalize func(ma.Multiaddr) ma.Multiaddr
|
||||
@@ -344,10 +348,10 @@ func (ids *idService) sendPushes(ctx context.Context) {
|
||||
go func(c network.Conn) {
|
||||
defer wg.Done()
|
||||
defer func() { <-sem }()
|
||||
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||
ctx, cancel := context.WithTimeout(ctx, ids.timeout)
|
||||
defer cancel()
|
||||
|
||||
str, err := newStreamAndNegotiate(ctx, c, IDPush)
|
||||
str, err := newStreamAndNegotiate(ctx, c, IDPush, ids.timeout)
|
||||
if err != nil { // connection might have been closed recently
|
||||
return
|
||||
}
|
||||
@@ -438,34 +442,35 @@ func (ids *idService) IdentifyWait(c network.Conn) <-chan struct{} {
|
||||
}
|
||||
|
||||
// newStreamAndNegotiate opens a new stream on the given connection and negotiates the given protocol.
|
||||
func newStreamAndNegotiate(ctx context.Context, c network.Conn, proto protocol.ID) (network.Stream, error) {
|
||||
func newStreamAndNegotiate(ctx context.Context, c network.Conn, proto protocol.ID, timeout time.Duration) (network.Stream, error) {
|
||||
s, err := c.NewStream(network.WithAllowLimitedConn(ctx, "identify"))
|
||||
if err != nil {
|
||||
log.Debugw("error opening identify stream", "peer", c.RemotePeer(), "error", err)
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("failed to open new stream: %w", err)
|
||||
}
|
||||
|
||||
// Ignore the error. Consistent with our previous behavior. (See https://github.com/libp2p/go-libp2p/issues/3109)
|
||||
_ = s.SetDeadline(time.Now().Add(Timeout))
|
||||
_ = s.SetDeadline(time.Now().Add(timeout))
|
||||
|
||||
if err := s.SetProtocol(proto); err != nil {
|
||||
log.Warnf("error setting identify protocol for stream: %s", err)
|
||||
_ = s.Reset()
|
||||
return nil, fmt.Errorf("failed to set protocol: %w", err)
|
||||
}
|
||||
|
||||
// ok give the response to our handler.
|
||||
if err := msmux.SelectProtoOrFail(proto, s); err != nil {
|
||||
log.Infow("failed negotiate identify protocol with peer", "peer", c.RemotePeer(), "error", err)
|
||||
_ = s.Reset()
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("multistream mux select protocol failed: %w", err)
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (ids *idService) identifyConn(c network.Conn) error {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), Timeout)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), ids.timeout)
|
||||
defer cancel()
|
||||
s, err := newStreamAndNegotiate(network.WithAllowLimitedConn(ctx, "identify"), c, ID)
|
||||
s, err := newStreamAndNegotiate(network.WithAllowLimitedConn(ctx, "identify"), c, ID, ids.timeout)
|
||||
if err != nil {
|
||||
log.Debugw("error opening identify stream", "peer", c.RemotePeer(), "error", err)
|
||||
return err
|
||||
@@ -476,8 +481,10 @@ func (ids *idService) identifyConn(c network.Conn) error {
|
||||
|
||||
// handlePush handles incoming identify push streams
|
||||
func (ids *idService) handlePush(s network.Stream) {
|
||||
s.SetDeadline(time.Now().Add(Timeout))
|
||||
ids.handleIdentifyResponse(s, true)
|
||||
s.SetDeadline(time.Now().Add(ids.timeout))
|
||||
if err := ids.handleIdentifyResponse(s, true); err != nil {
|
||||
log.Debugf("failed to handle identify push: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (ids *idService) handleIdentifyRequest(s network.Stream) {
|
||||
|
@@ -817,12 +817,6 @@ func TestLargePushMessage(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestIdentifyResponseReadTimeout(t *testing.T) {
|
||||
timeout := identify.Timeout
|
||||
identify.Timeout = 100 * time.Millisecond
|
||||
defer func() {
|
||||
identify.Timeout = timeout
|
||||
}()
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
@@ -832,12 +826,12 @@ func TestIdentifyResponseReadTimeout(t *testing.T) {
|
||||
defer h2.Close()
|
||||
|
||||
h2p := h2.ID()
|
||||
ids1, err := identify.NewIDService(h1)
|
||||
ids1, err := identify.NewIDService(h1, identify.WithTimeout(100*time.Millisecond))
|
||||
require.NoError(t, err)
|
||||
defer ids1.Close()
|
||||
ids1.Start()
|
||||
|
||||
ids2, err := identify.NewIDService(h2)
|
||||
ids2, err := identify.NewIDService(h2, identify.WithTimeout(100*time.Millisecond))
|
||||
require.NoError(t, err)
|
||||
defer ids2.Close()
|
||||
ids2.Start()
|
||||
@@ -863,12 +857,6 @@ func TestIdentifyResponseReadTimeout(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestIncomingIDStreamsTimeout(t *testing.T) {
|
||||
timeout := identify.Timeout
|
||||
identify.Timeout = 100 * time.Millisecond
|
||||
defer func() {
|
||||
identify.Timeout = timeout
|
||||
}()
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
@@ -880,12 +868,12 @@ func TestIncomingIDStreamsTimeout(t *testing.T) {
|
||||
defer h1.Close()
|
||||
defer h2.Close()
|
||||
|
||||
ids1, err := identify.NewIDService(h1)
|
||||
ids1, err := identify.NewIDService(h1, identify.WithTimeout(100*time.Millisecond))
|
||||
require.NoError(t, err)
|
||||
defer ids1.Close()
|
||||
ids1.Start()
|
||||
|
||||
ids2, err := identify.NewIDService(h2)
|
||||
ids2, err := identify.NewIDService(h2, identify.WithTimeout(100*time.Millisecond))
|
||||
require.NoError(t, err)
|
||||
defer ids2.Close()
|
||||
ids2.Start()
|
||||
|
@@ -1,11 +1,14 @@
|
||||
package identify
|
||||
|
||||
import "time"
|
||||
|
||||
type config struct {
|
||||
protocolVersion string
|
||||
userAgent string
|
||||
disableSignedPeerRecord bool
|
||||
metricsTracer MetricsTracer
|
||||
disableObservedAddrManager bool
|
||||
timeout time.Duration
|
||||
}
|
||||
|
||||
// Option is an option function for identify.
|
||||
@@ -47,3 +50,10 @@ func DisableObservedAddrManager() Option {
|
||||
cfg.disableObservedAddrManager = true
|
||||
}
|
||||
}
|
||||
|
||||
// WithTimeout sets the timeout for identify interactions.
|
||||
func WithTimeout(timeout time.Duration) Option {
|
||||
return func(cfg *config) {
|
||||
cfg.timeout = timeout
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user