identify: rate limit id push protocol (#3266)

The rate limits id pushes from peers to one every five second with an allowed burst of 10 pushes. This should be enough for all but malfunctioning and malicious peers.

We can use the exact same code for autonat, autonatv2, circuit v2, etc.

Introducing limits to identify separately to get some feedback for #3265. For this PR, I'd like to ignore issues regarding where should this piece of code go, and focus on how specifically it should behave. See the long comment in rateLimiter.allow for example. 

Part of: #3265
This commit is contained in:
sukun
2025-05-01 22:32:38 +05:30
committed by GitHub
parent effdc6525c
commit 50d714c94c
7 changed files with 611 additions and 6 deletions

1
go.mod
View File

@@ -63,6 +63,7 @@ require (
golang.org/x/crypto v0.35.0
golang.org/x/sync v0.11.0
golang.org/x/sys v0.30.0
golang.org/x/time v0.11.0
golang.org/x/tools v0.30.0
google.golang.org/protobuf v1.36.5
)

4
go.sum
View File

@@ -429,8 +429,8 @@ golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM=
golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY=
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/time v0.11.0 h1:/bpjEDfN9tkoN/ryeYHnv5hcMlc8ncjMcM4XBk5NWV0=
golang.org/x/time v0.11.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg=
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20181030000716-a0a13e073c7b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=

View File

@@ -0,0 +1,319 @@
// Package rate provides rate limiting functionality at a global, network, and subnet level.
package rate
import (
"container/heap"
"net/netip"
"slices"
"sync"
"time"
"github.com/libp2p/go-libp2p/core/network"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
"golang.org/x/time/rate"
)
// Limit is the configuration for a token bucket rate limiter.
// The bucket has a capacity of Burst, and is refilled at a rate of RPS tokens per second.
// Initially, buckets are completley full, i.e. tokens in the bucket is equal to `Burst`.
// In any given time interval T seconds, maximum events allowed will be `T*RPS + Burst`.
type Limit struct {
// RPS is the rate of requests per second in steady state.
RPS float64
// Burst is the number of requests allowed over the RPS.
Burst int
}
// PrefixLimit is a rate limit configuration that applies to a specific network prefix.
type PrefixLimit struct {
Prefix netip.Prefix
Limit
}
// SubnetLimit is a rate limit configuration that applies to a specific subnet.
type SubnetLimit struct {
PrefixLength int
Limit
}
// Limiter rate limits new streams for a service. It allows setting NetworkPrefix specific,
// global, and subnet specific limits. Use 0 for no rate limiting.
// The limiter maintains state that must be periodically cleaned up using Cleanup
type Limiter struct {
// NetworkPrefixLimits are limits for streams with peer IPs belonging to specific subnets.
// It can be used to increase the limit for trusted networks and decrease the limit for specific networks.
NetworkPrefixLimits []PrefixLimit
// GlobalLimit is the limit for all streams where the peer IP doesn't fall within any
// of the `NetworkPrefixLimits`
GlobalLimit Limit
// SubnetRateLimiter is a rate limiter for subnets.
SubnetRateLimiter SubnetLimiter
initOnce sync.Once
globalBucket *rate.Limiter
networkPrefixBuckets []*rate.Limiter // ith element ratelimits ith NetworkPrefixLimits
}
func (r *Limiter) init() {
r.initOnce.Do(func() {
if r.GlobalLimit.RPS == 0 {
r.globalBucket = rate.NewLimiter(rate.Inf, 0)
} else {
r.globalBucket = rate.NewLimiter(rate.Limit(r.GlobalLimit.RPS), r.GlobalLimit.Burst)
}
// sort such that the widest prefix (smallest bit count) is last.
slices.SortFunc(r.NetworkPrefixLimits, func(a, b PrefixLimit) int { return b.Prefix.Bits() - a.Prefix.Bits() })
r.networkPrefixBuckets = make([]*rate.Limiter, 0, len(r.NetworkPrefixLimits))
for _, limit := range r.NetworkPrefixLimits {
if limit.RPS == 0 {
r.networkPrefixBuckets = append(r.networkPrefixBuckets, rate.NewLimiter(rate.Inf, 0))
} else {
r.networkPrefixBuckets = append(r.networkPrefixBuckets, rate.NewLimiter(rate.Limit(limit.RPS), limit.Burst))
}
}
})
}
// Limit rate limits a StreamHandler function.
func (r *Limiter) Limit(f func(s network.Stream)) func(s network.Stream) {
r.init()
return func(s network.Stream) {
if !r.allow(s.Conn().RemoteMultiaddr()) {
_ = s.ResetWithError(network.StreamRateLimited)
return
}
f(s)
}
}
func (r *Limiter) allow(addr ma.Multiaddr) bool {
r.init()
// Check buckets from the most specific to the least.
//
// This ensures that a single peer cannot take up all the tokens in the global
// rate limiting bucket. We *MUST* follow this order because the rate limiter
// implementation doesn't have a `ReturnToken` method. If we checked the global
// bucket before the specific bucket, and the specific bucket rejected the
// request, there's no way to return the token to the global bucket. So all
// rejected requests from the specific bucket would take up tokens from the global bucket.
ip, err := manet.ToIP(addr)
if err != nil {
return r.globalBucket.Allow()
}
ipAddr, ok := netip.AddrFromSlice(ip)
if !ok {
return r.globalBucket.Allow()
}
// prefixs have been sorted from most to least specific so rejected requests for more
// specific prefixes don't take up tokens from the less specific prefixes.
isWithinNetworkPrefix := false
for i, limit := range r.NetworkPrefixLimits {
if limit.Prefix.Contains(ipAddr) {
if !r.networkPrefixBuckets[i].Allow() {
return false
}
isWithinNetworkPrefix = true
}
}
if isWithinNetworkPrefix {
return true
}
if !r.SubnetRateLimiter.Allow(ipAddr, time.Now()) {
return false
}
return r.globalBucket.Allow()
}
// SubnetLimiter rate limits requests per ip subnet.
type SubnetLimiter struct {
// IPv4SubnetLimits are the per subnet limits for streams with IPv4 Peers.
IPv4SubnetLimits []SubnetLimit
// IPv6SubnetLimits are the per subnet limits for streams with IPv6 Peers.
IPv6SubnetLimits []SubnetLimit
// GracePeriod is the time to wait to remove a full capacity bucket.
// Keeping a bucket around helps prevent allocations
GracePeriod time.Duration
initOnce sync.Once
mx sync.Mutex
ipv4Heaps []*bucketHeap
ipv6Heaps []*bucketHeap
}
func (s *SubnetLimiter) init() {
s.initOnce.Do(func() {
// smaller prefix length, i.e. largest subnet, last
slices.SortFunc(s.IPv4SubnetLimits, func(a, b SubnetLimit) int { return b.PrefixLength - a.PrefixLength })
slices.SortFunc(s.IPv6SubnetLimits, func(a, b SubnetLimit) int { return b.PrefixLength - a.PrefixLength })
s.ipv4Heaps = make([]*bucketHeap, len(s.IPv4SubnetLimits))
for i := range s.IPv4SubnetLimits {
s.ipv4Heaps[i] = &bucketHeap{
prefixBucket: make([]prefixBucketWithExpiry, 0),
prefixToIndex: make(map[netip.Prefix]int),
}
heap.Init(s.ipv4Heaps[i])
}
s.ipv6Heaps = make([]*bucketHeap, len(s.IPv6SubnetLimits))
for i := range s.IPv6SubnetLimits {
s.ipv6Heaps[i] = &bucketHeap{
prefixBucket: make([]prefixBucketWithExpiry, 0),
prefixToIndex: make(map[netip.Prefix]int),
}
heap.Init(s.ipv6Heaps[i])
}
})
}
// Allow returns true if requests for `ipAddr` are within specified rate limits
func (s *SubnetLimiter) Allow(ipAddr netip.Addr, now time.Time) bool {
s.init()
s.mx.Lock()
defer s.mx.Unlock()
s.cleanUp(now)
var subNetLimits []SubnetLimit
var heaps []*bucketHeap
if ipAddr.Is4() {
subNetLimits = s.IPv4SubnetLimits
heaps = s.ipv4Heaps
} else {
subNetLimits = s.IPv6SubnetLimits
heaps = s.ipv6Heaps
}
for i, limit := range subNetLimits {
prefix, err := ipAddr.Prefix(limit.PrefixLength)
if err != nil {
return false // we have a ipaddr this shouldn't happen
}
bucket := heaps[i].Get(prefix)
if bucket == (prefixBucketWithExpiry{}) {
bucket = prefixBucketWithExpiry{
Prefix: prefix,
tokenBucket: tokenBucket{rate.NewLimiter(rate.Limit(limit.RPS), limit.Burst)},
Expiry: now,
}
}
if !bucket.Allow() {
// bucket is empty, its expiry would have been set correctly the last time
// it allowed a request.
return false
}
bucket.Expiry = bucket.FullAt(now).Add(s.GracePeriod)
heaps[i].Upsert(bucket)
}
return true
}
// cleanUp removes limiters that have expired by now.
func (s *SubnetLimiter) cleanUp(now time.Time) {
for _, h := range s.ipv4Heaps {
h.Expire(now)
}
for _, h := range s.ipv6Heaps {
h.Expire(now)
}
}
// tokenBucket is a *rate.Limiter with a `FullAt` method.
type tokenBucket struct {
*rate.Limiter
}
// FullAt returns the instant at which the bucket will be full.
func (b *tokenBucket) FullAt(now time.Time) time.Time {
tokensNeeded := float64(b.Burst()) - b.TokensAt(now)
refillRate := float64(b.Limit())
eta := time.Duration((tokensNeeded / refillRate) * float64(time.Second))
return now.Add(eta)
}
// prefixBucketWithExpiry is a token bucket with a prefix and Expiry. The expiry is when the bucket
// will be full with tokens.
type prefixBucketWithExpiry struct {
tokenBucket
Prefix netip.Prefix
Expiry time.Time
}
// bucketHeap is a heap of buckets ordered by their Expiry. At expiry, the bucket
// is removed from the heap as a full bucket is indistinguishable from a new bucket.
type bucketHeap struct {
prefixBucket []prefixBucketWithExpiry
prefixToIndex map[netip.Prefix]int
}
var _ heap.Interface = (*bucketHeap)(nil)
// Upsert replaces the bucket with prefix `b.Prefix` with the provided bucket, `b`, or
// inserts `b` if no bucket with prefix `b.Prefix` exists.
func (h *bucketHeap) Upsert(b prefixBucketWithExpiry) {
if i, ok := h.prefixToIndex[b.Prefix]; ok {
h.prefixBucket[i] = b
heap.Fix(h, i)
return
}
heap.Push(h, b)
}
// Get returns the limiter for a prefix
func (h *bucketHeap) Get(prefix netip.Prefix) prefixBucketWithExpiry {
if i, ok := h.prefixToIndex[prefix]; ok {
return h.prefixBucket[i]
}
return prefixBucketWithExpiry{}
}
// Expire removes elements with expiry before `expiry`
func (h *bucketHeap) Expire(expiry time.Time) {
for h.Len() > 0 {
oldest := h.prefixBucket[0]
if oldest.Expiry.After(expiry) {
break
}
heap.Pop(h)
}
}
// Methods for the heap interface
// Len returns the length of the heap
func (h *bucketHeap) Len() int {
return len(h.prefixBucket)
}
// Less compares two elements in the heap
func (h *bucketHeap) Less(i, j int) bool {
return h.prefixBucket[i].Expiry.Before(h.prefixBucket[j].Expiry)
}
// Swap swaps two elements in the heap
func (h *bucketHeap) Swap(i, j int) {
h.prefixBucket[i], h.prefixBucket[j] = h.prefixBucket[j], h.prefixBucket[i]
h.prefixToIndex[h.prefixBucket[i].Prefix] = i
h.prefixToIndex[h.prefixBucket[j].Prefix] = j
}
// Push adds a new element to the heap
func (h *bucketHeap) Push(x any) {
item := x.(prefixBucketWithExpiry)
h.prefixBucket = append(h.prefixBucket, item)
h.prefixToIndex[item.Prefix] = len(h.prefixBucket) - 1
}
// Pop removes and returns the top element from the heap
func (h *bucketHeap) Pop() any {
n := len(h.prefixBucket)
item := h.prefixBucket[n-1]
h.prefixBucket = h.prefixBucket[0 : n-1]
delete(h.prefixToIndex, item.Prefix)
return item
}

View File

@@ -0,0 +1,257 @@
package rate
import (
"fmt"
"net/netip"
"testing"
"time"
ma "github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
"golang.org/x/time/rate"
)
const rateLimitErrorTolerance = 0.05
func getSleepDurationAndRequestCount(rps float64) (time.Duration, int) {
sleepDuration := 100 * time.Millisecond
requestCount := int(sleepDuration.Seconds() * float64(rps))
if requestCount < 1 {
// Adding 1ms to ensure we do get 1 request. If the rate is low enough that
// 100ms won't have a single request adding 1ms won't error here.
sleepDuration = time.Duration((1/rps)*float64(time.Second)) + 1*time.Millisecond
requestCount = 1
}
return sleepDuration, requestCount
}
func assertLimiter(t *testing.T, rl *Limiter, addr ma.Multiaddr, allowed, errorMargin int) {
t.Helper()
for i := 0; i < allowed; i++ {
require.True(t, rl.allow(addr))
}
for i := 0; i < errorMargin; i++ {
rl.allow(addr)
}
require.False(t, rl.allow(addr))
}
func TestLimiterGlobal(t *testing.T) {
addr := ma.StringCast("/ip4/127.0.0.1/udp/123/quic-v1")
limits := []Limit{
{RPS: 0.0, Burst: 1},
{RPS: 0.8, Burst: 1},
{RPS: 10, Burst: 20},
{RPS: 100, Burst: 200},
{RPS: 1000, Burst: 2000},
}
for _, limit := range limits {
t.Run(fmt.Sprintf("limit %0.1f", limit.RPS), func(t *testing.T) {
rl := &Limiter{
GlobalLimit: limit,
}
if limit.RPS == 0 {
// 0 implies no rate limiting, any large number would do
for i := 0; i < 1000; i++ {
require.True(t, rl.allow(addr))
}
return
}
assertLimiter(t, rl, addr, limit.Burst, int(limit.RPS*rateLimitErrorTolerance))
sleepDuration, requestCount := getSleepDurationAndRequestCount(limit.RPS)
time.Sleep(sleepDuration)
assertLimiter(t, rl, addr, requestCount, int(float64(requestCount)*rateLimitErrorTolerance))
})
}
}
func TestLimiterNetworkPrefix(t *testing.T) {
local := ma.StringCast("/ip4/127.0.0.1/udp/123/quic-v1")
public := ma.StringCast("/ip4/1.1.1.1/udp/123/quic-v1")
rl := &Limiter{
NetworkPrefixLimits: []PrefixLimit{
{Prefix: netip.MustParsePrefix("127.0.0.0/24"), Limit: Limit{}},
},
GlobalLimit: Limit{RPS: 10, Burst: 10},
}
// element within prefix is allowed even over the limit
for range rl.GlobalLimit.Burst + 100 {
require.True(t, rl.allow(local))
}
// rate limit public ips
assertLimiter(t, rl, public, rl.GlobalLimit.Burst, int(rl.GlobalLimit.RPS*rateLimitErrorTolerance))
// public ip rejected
require.False(t, rl.allow(public))
// local ip accepted
for range 100 {
require.True(t, rl.allow(local))
}
}
func TestLimiterNetworkPrefixWidth(t *testing.T) {
a1 := ma.StringCast("/ip4/1.1.1.1/udp/123/quic-v1")
a2 := ma.StringCast("/ip4/1.1.0.1/udp/123/quic-v1")
wideLimit := 20
narrowLimit := 10
rl := &Limiter{
NetworkPrefixLimits: []PrefixLimit{
{Prefix: netip.MustParsePrefix("1.1.0.0/16"), Limit: Limit{RPS: 0.01, Burst: wideLimit}},
{Prefix: netip.MustParsePrefix("1.1.1.0/24"), Limit: Limit{RPS: 0.01, Burst: narrowLimit}},
},
}
for range 2 * wideLimit {
rl.allow(a1)
}
// a1 rejected
require.False(t, rl.allow(a1))
// a2 accepted
for range wideLimit - narrowLimit {
require.True(t, rl.allow(a2))
}
}
func subnetAddrs(prefix netip.Prefix) func() netip.Addr {
next := prefix.Addr()
return func() netip.Addr {
addr := next
next = addr.Next()
if !prefix.Contains(addr) {
next = prefix.Addr()
addr = next
}
return addr
}
}
func TestSubnetLimiter(t *testing.T) {
assertOutput := func(outcome bool, rl *SubnetLimiter, subnetAddrs func() netip.Addr, n int) {
t.Helper()
for range n {
require.Equal(t, outcome, rl.Allow(subnetAddrs(), time.Now()), "%d", n)
}
}
t.Run("Simple", func(*testing.T) {
// Keep the refil rate low
v4Small := SubnetLimit{PrefixLength: 24, Limit: Limit{RPS: 0.0001, Burst: 10}}
v4Large := SubnetLimit{PrefixLength: 16, Limit: Limit{RPS: 0.0001, Burst: 19}}
v6Small := SubnetLimit{PrefixLength: 64, Limit: Limit{RPS: 0.0001, Burst: 10}}
v6Large := SubnetLimit{PrefixLength: 48, Limit: Limit{RPS: 0.0001, Burst: 17}}
rl := &SubnetLimiter{
IPv4SubnetLimits: []SubnetLimit{v4Large, v4Small},
IPv6SubnetLimits: []SubnetLimit{v6Large, v6Small},
}
v4SubnetAddr1 := subnetAddrs(netip.MustParsePrefix("192.168.1.1/24"))
v4SubnetAddr2 := subnetAddrs(netip.MustParsePrefix("192.168.2.1/24"))
v6SubnetAddr1 := subnetAddrs(netip.MustParsePrefix("2001:0:0:1::/64"))
v6SubnetAddr2 := subnetAddrs(netip.MustParsePrefix("2001:0:0:2::/64"))
assertOutput(true, rl, v4SubnetAddr1, v4Small.Burst)
assertOutput(false, rl, v4SubnetAddr1, v4Large.Burst)
assertOutput(true, rl, v4SubnetAddr2, v4Large.Burst-v4Small.Burst)
assertOutput(false, rl, v4SubnetAddr2, v4Large.Burst)
assertOutput(true, rl, v6SubnetAddr1, v6Small.Burst)
assertOutput(false, rl, v6SubnetAddr1, v6Large.Burst)
assertOutput(true, rl, v6SubnetAddr2, v6Large.Burst-v6Small.Burst)
assertOutput(false, rl, v6SubnetAddr2, v6Large.Burst)
})
t.Run("Complex", func(*testing.T) {
limits := []SubnetLimit{
{PrefixLength: 32, Limit: Limit{RPS: 0.01, Burst: 10}},
{PrefixLength: 24, Limit: Limit{RPS: 0.01, Burst: 20}},
{PrefixLength: 16, Limit: Limit{RPS: 0.01, Burst: 30}},
{PrefixLength: 8, Limit: Limit{RPS: 0.01, Burst: 40}},
}
rl := &SubnetLimiter{
IPv4SubnetLimits: limits,
}
snAddrs := []func() netip.Addr{
subnetAddrs(netip.MustParsePrefix("192.168.1.1/32")),
subnetAddrs(netip.MustParsePrefix("192.168.1.2/24")),
subnetAddrs(netip.MustParsePrefix("192.168.2.1/16")),
subnetAddrs(netip.MustParsePrefix("192.0.1.1/8")),
}
for i, addrsFunc := range snAddrs {
prev := 0
if i > 0 {
prev = limits[i-1].Burst
}
assertOutput(true, rl, addrsFunc, limits[i].Burst-prev)
assertOutput(false, rl, addrsFunc, limits[i].Burst)
}
})
t.Run("Zero", func(t *testing.T) {
sl := SubnetLimiter{}
for range 10000 {
require.True(t, sl.Allow(netip.IPv6Loopback(), time.Now()))
}
})
}
func TestSubnetLimiterCleanup(t *testing.T) {
tc := []struct {
Limit
TTL time.Duration
}{
{Limit: Limit{RPS: 1, Burst: 10}, TTL: 10 * time.Second},
{Limit: Limit{RPS: 0.1, Burst: 2}, TTL: 20 * time.Second},
{Limit: Limit{RPS: 1, Burst: 100}, TTL: 100 * time.Second},
{Limit: Limit{RPS: 3, Burst: 6}, TTL: 2 * time.Second},
}
for i, tt := range tc {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
ip1, ip2 := netip.IPv6Loopback(), netip.MustParseAddr("2001::")
sl := SubnetLimiter{IPv6SubnetLimits: []SubnetLimit{{PrefixLength: 64, Limit: tt.Limit}}}
now := time.Now()
// Empty the ip1 bucket
for range tt.Burst {
require.True(t, sl.Allow(ip1, now))
}
for range tt.Burst / 2 {
require.True(t, sl.Allow(ip2, now))
}
epsilon := 100 * time.Millisecond
// just before ip1 expiry
now = now.Add(tt.TTL).Add(-epsilon)
sl.cleanUp(now) // ip2 will be removed
require.Equal(t, 1, sl.ipv6Heaps[0].Len())
// just after ip1 expiry
now = now.Add(2 * epsilon)
require.True(t, sl.Allow(ip2, now)) // remove the ip1 bucket
require.Equal(t, 1, sl.ipv6Heaps[0].Len()) // ip2 added in the previous call
})
}
}
func TestTokenBucketFullAfter(t *testing.T) {
tc := []struct {
*rate.Limiter
FullAfter time.Duration
}{
{Limiter: rate.NewLimiter(1, 10), FullAfter: 10 * time.Second},
{Limiter: rate.NewLimiter(0.01, 10), FullAfter: 1000 * time.Second},
{Limiter: rate.NewLimiter(0.01, 1), FullAfter: 100 * time.Second},
}
for i, tt := range tc {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
b := tokenBucket{tt.Limiter}
now := time.Now()
for range b.Burst() {
tt.Allow()
}
epsilon := 10 * time.Millisecond
require.GreaterOrEqual(t, tt.FullAfter+epsilon, b.FullAt(now).Sub(now))
require.LessOrEqual(t, tt.FullAfter-epsilon, b.FullAt(now).Sub(now))
})
}
}

View File

@@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"io"
"net/netip"
"slices"
"sync"
"time"
@@ -19,6 +20,7 @@ import (
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/core/record"
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
"github.com/libp2p/go-libp2p/p2p/internal/rate"
useragent "github.com/libp2p/go-libp2p/p2p/protocol/identify/internal/user-agent"
"github.com/libp2p/go-libp2p/p2p/protocol/identify/pb"
@@ -55,6 +57,21 @@ const (
connectedPeerMaxAddrs = 500
)
var (
defaultNetworkPrefixRateLimits = []rate.PrefixLimit{
{Prefix: netip.MustParsePrefix("127.0.0.0/8"), Limit: rate.Limit{}}, // inf
{Prefix: netip.MustParsePrefix("::1/128"), Limit: rate.Limit{}}, // inf
}
defaultGlobalRateLimit = rate.Limit{RPS: 2000, Burst: 3000}
defaultIPv4SubnetRateLimits = []rate.SubnetLimit{
{PrefixLength: 24, Limit: rate.Limit{RPS: 0.2, Burst: 10}}, // 1 every 5 seconds
}
defaultIPv6SubnetRateLimits = []rate.SubnetLimit{
{PrefixLength: 56, Limit: rate.Limit{RPS: 0.2, Burst: 10}}, // 1 every 5 seconds
{PrefixLength: 48, Limit: rate.Limit{RPS: 0.5, Burst: 20}}, // 1 every 2 seconds
}
)
type identifySnapshot struct {
seq uint64
protocols []protocol.ID
@@ -174,6 +191,8 @@ type idService struct {
}
natEmitter *natEmitter
rateLimiter *rate.Limiter
}
type normalizer interface {
@@ -207,6 +226,15 @@ func NewIDService(h host.Host, opts ...Option) (*idService, error) {
setupCompleted: make(chan struct{}),
metricsTracer: cfg.metricsTracer,
timeout: cfg.timeout,
rateLimiter: &rate.Limiter{
GlobalLimit: defaultGlobalRateLimit,
NetworkPrefixLimits: defaultNetworkPrefixRateLimits,
SubnetRateLimiter: rate.SubnetLimiter{
IPv4SubnetLimits: defaultIPv4SubnetRateLimits,
IPv6SubnetLimits: defaultIPv6SubnetRateLimits,
GracePeriod: 1 * time.Minute,
},
},
}
var normalize func(ma.Multiaddr) ma.Multiaddr
@@ -249,7 +277,7 @@ func NewIDService(h host.Host, opts ...Option) (*idService, error) {
func (ids *idService) Start() {
ids.Host.Network().Notify((*netNotifiee)(ids))
ids.Host.SetStreamHandler(ID, ids.handleIdentifyRequest)
ids.Host.SetStreamHandler(IDPush, ids.handlePush)
ids.Host.SetStreamHandler(IDPush, ids.rateLimiter.Limit(ids.handlePush))
ids.updateSnapshot()
close(ids.setupCompleted)
@@ -869,7 +897,6 @@ func (ids *idService) consumeMessage(mes *pb.Identify, c network.Conn, isPush bo
ProtocolVersion: pv,
AgentVersion: av,
})
}
func (ids *idService) consumeSignedPeerRecord(p peer.ID, signedPeerRecord *record.Envelope) ([]ma.Multiaddr, error) {

View File

@@ -96,6 +96,7 @@ require (
golang.org/x/sync v0.11.0 // indirect
golang.org/x/sys v0.30.0 // indirect
golang.org/x/text v0.22.0 // indirect
golang.org/x/time v0.11.0 // indirect
golang.org/x/tools v0.30.0 // indirect
google.golang.org/protobuf v1.36.5 // indirect
lukechampine.com/blake3 v1.4.0 // indirect

View File

@@ -423,8 +423,8 @@ golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM=
golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY=
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/time v0.11.0 h1:/bpjEDfN9tkoN/ryeYHnv5hcMlc8ncjMcM4XBk5NWV0=
golang.org/x/time v0.11.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg=
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20181030000716-a0a13e073c7b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=