rcmgr: move StatsTraceReporter to rcmgr package (#2388)

This commit is contained in:
Sukun
2023-06-28 11:23:44 +05:30
committed by GitHub
parent 1e31d70533
commit 173fef8a2e
8 changed files with 77 additions and 65 deletions

View File

@@ -26,7 +26,7 @@ import (
blankhost "github.com/libp2p/go-libp2p/p2p/host/blank" blankhost "github.com/libp2p/go-libp2p/p2p/host/blank"
"github.com/libp2p/go-libp2p/p2p/host/eventbus" "github.com/libp2p/go-libp2p/p2p/host/eventbus"
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem" "github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
rcmgrObs "github.com/libp2p/go-libp2p/p2p/host/resource-manager/obs" rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
routed "github.com/libp2p/go-libp2p/p2p/host/routed" routed "github.com/libp2p/go-libp2p/p2p/host/routed"
"github.com/libp2p/go-libp2p/p2p/net/swarm" "github.com/libp2p/go-libp2p/p2p/net/swarm"
tptu "github.com/libp2p/go-libp2p/p2p/net/upgrader" tptu "github.com/libp2p/go-libp2p/p2p/net/upgrader"
@@ -301,7 +301,7 @@ func (cfg *Config) NewNode() (host.Host, error) {
} }
if !cfg.DisableMetrics { if !cfg.DisableMetrics {
rcmgrObs.MustRegisterWith(cfg.PrometheusRegisterer) rcmgr.MustRegisterWith(cfg.PrometheusRegisterer)
} }
h, err := bhost.NewHost(swrm, &bhost.HostOpts{ h, err := bhost.NewHost(swrm, &bhost.HostOpts{

View File

@@ -14,13 +14,12 @@ option libp2p.PrometheusRegisterer. For example:
import ( import (
// ... // ...
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
rcmgrObs "github.com/libp2p/go-libp2p/p2p/host/resource-manager/obs"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
) )
func SetupResourceManager() (network.ResourceManager, error) { func SetupResourceManager() (network.ResourceManager, error) {
str, err := rcmgrObs.NewStatsTraceReporter() str, err := rcmgr.NewStatsTraceReporter()
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -48,8 +48,8 @@ limits := cfg.Build(scaledDefaultLimits)
limiter := rcmgr.NewFixedLimiter(limits) limiter := rcmgr.NewFixedLimiter(limits)
// (Optional if you want metrics) // (Optional if you want metrics)
rcmgrObs.MustRegisterWith(prometheus.DefaultRegisterer) rcmgr.MustRegisterWith(prometheus.DefaultRegisterer)
str, err := rcmgrObs.NewStatsTraceReporter() str, err := rcmgr.NewStatsTraceReporter()
if err != nil { if err != nil {
panic(err) panic(err)
} }

View File

@@ -1,6 +1,6 @@
//go:build nocover //go:build nocover
package obs package rcmgr
import ( import (
"math/rand" "math/rand"
@@ -8,26 +8,25 @@ import (
"testing" "testing"
"time" "time"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func randomTraceEvt(rng *rand.Rand) rcmgr.TraceEvt { func randomTraceEvt(rng *rand.Rand) TraceEvt {
// Possibly non-sensical // Possibly non-sensical
typs := []rcmgr.TraceEvtTyp{ typs := []TraceEvtTyp{
rcmgr.TraceStartEvt, TraceStartEvt,
rcmgr.TraceCreateScopeEvt, TraceCreateScopeEvt,
rcmgr.TraceDestroyScopeEvt, TraceDestroyScopeEvt,
rcmgr.TraceReserveMemoryEvt, TraceReserveMemoryEvt,
rcmgr.TraceBlockReserveMemoryEvt, TraceBlockReserveMemoryEvt,
rcmgr.TraceReleaseMemoryEvt, TraceReleaseMemoryEvt,
rcmgr.TraceAddStreamEvt, TraceAddStreamEvt,
rcmgr.TraceBlockAddStreamEvt, TraceBlockAddStreamEvt,
rcmgr.TraceRemoveStreamEvt, TraceRemoveStreamEvt,
rcmgr.TraceAddConnEvt, TraceAddConnEvt,
rcmgr.TraceBlockAddConnEvt, TraceBlockAddConnEvt,
rcmgr.TraceRemoveConnEvt, TraceRemoveConnEvt,
} }
names := []string{ names := []string{
@@ -43,7 +42,7 @@ func randomTraceEvt(rng *rand.Rand) rcmgr.TraceEvt {
"service:libp2p.autonat.peer:12D3Koo", "service:libp2p.autonat.peer:12D3Koo",
} }
return rcmgr.TraceEvt{ return TraceEvt{
Type: typs[rng.Intn(len(typs))], Type: typs[rng.Intn(len(typs))],
Name: names[rng.Intn(len(names))], Name: names[rng.Intn(len(names))],
DeltaOut: rng.Intn(5), DeltaOut: rng.Intn(5),
@@ -60,7 +59,7 @@ func randomTraceEvt(rng *rand.Rand) rcmgr.TraceEvt {
} }
var registerOnce sync.Once var regOnce sync.Once
func BenchmarkMetricsRecording(b *testing.B) { func BenchmarkMetricsRecording(b *testing.B) {
b.ReportAllocs() b.ReportAllocs()
@@ -70,7 +69,7 @@ func BenchmarkMetricsRecording(b *testing.B) {
}) })
evtCount := 10000 evtCount := 10000
evts := make([]rcmgr.TraceEvt, evtCount) evts := make([]TraceEvt, evtCount)
rng := rand.New(rand.NewSource(int64(b.N))) rng := rand.New(rand.NewSource(int64(b.N)))
for i := 0; i < evtCount; i++ { for i := 0; i < evtCount; i++ {
evts[i] = randomTraceEvt(rng) evts[i] = randomTraceEvt(rng)
@@ -92,7 +91,7 @@ func TestNoAllocsNoCover(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
evtCount := 10_000 evtCount := 10_000
evts := make([]rcmgr.TraceEvt, 0, evtCount) evts := make([]TraceEvt, 0, evtCount)
rng := rand.New(rand.NewSource(1)) rng := rand.New(rand.NewSource(1))
for i := 0; i < evtCount; i++ { for i := 0; i < evtCount; i++ {

View File

@@ -0,0 +1,18 @@
// Package obs implements metrics tracing for resource manager
//
// Deprecated: obs is deprecated and the exported types and methods
// are moved to rcmgr package. Use the corresponding identifier in
// the rcmgr package, for example
// obs.NewStatsTraceReporter => rcmgr.NewStatsTraceReporter
package obs
import (
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
)
var MustRegisterWith = rcmgr.MustRegisterWith
// StatsTraceReporter reports stats on the resource manager using its traces.
type StatsTraceReporter = rcmgr.StatsTraceReporter
var NewStatsTraceReporter = rcmgr.NewStatsTraceReporter

View File

@@ -1,9 +1,8 @@
package obs package rcmgr
import ( import (
"strings" "strings"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
"github.com/libp2p/go-libp2p/p2p/metricshelper" "github.com/libp2p/go-libp2p/p2p/metricshelper"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
) )
@@ -74,7 +73,7 @@ var (
previousPeerStreamsOutbound = previousPeerStreams.With(prometheus.Labels{"dir": "outbound"}) previousPeerStreamsOutbound = previousPeerStreams.With(prometheus.Labels{"dir": "outbound"})
// Memory // Memory
memory = prometheus.NewGaugeVec(prometheus.GaugeOpts{ memoryTotal = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: metricNamespace, Namespace: metricNamespace,
Name: "memory", Name: "memory",
Help: "Amount of memory reserved as reported to the Resource Manager", Help: "Amount of memory reserved as reported to the Resource Manager",
@@ -151,7 +150,7 @@ func MustRegisterWith(reg prometheus.Registerer) {
previousPeerStreams, previousPeerStreams,
memory, memoryTotal,
peerMemory, peerMemory,
previousPeerMemory, previousPeerMemory,
connMemory, connMemory,
@@ -169,7 +168,7 @@ func NewStatsTraceReporter() (StatsTraceReporter, error) {
return StatsTraceReporter{}, nil return StatsTraceReporter{}, nil
} }
func (r StatsTraceReporter) ConsumeEvent(evt rcmgr.TraceEvt) { func (r StatsTraceReporter) ConsumeEvent(evt TraceEvt) {
tags := metricshelper.GetStringSlice() tags := metricshelper.GetStringSlice()
defer metricshelper.PutStringSlice(tags) defer metricshelper.PutStringSlice(tags)
@@ -177,10 +176,10 @@ func (r StatsTraceReporter) ConsumeEvent(evt rcmgr.TraceEvt) {
} }
// Separate func so that we can test that this function does not allocate. The syncPool may allocate. // Separate func so that we can test that this function does not allocate. The syncPool may allocate.
func (r StatsTraceReporter) consumeEventWithLabelSlice(evt rcmgr.TraceEvt, tags *[]string) { func (r StatsTraceReporter) consumeEventWithLabelSlice(evt TraceEvt, tags *[]string) {
switch evt.Type { switch evt.Type {
case rcmgr.TraceAddStreamEvt, rcmgr.TraceRemoveStreamEvt: case TraceAddStreamEvt, TraceRemoveStreamEvt:
if p := rcmgr.PeerStrInScopeName(evt.Name); p != "" { if p := PeerStrInScopeName(evt.Name); p != "" {
// Aggregated peer stats. Counts how many peers have N number of streams open. // Aggregated peer stats. Counts how many peers have N number of streams open.
// Uses two buckets aggregations. One to count how many streams the // Uses two buckets aggregations. One to count how many streams the
// peer has now. The other to count the negative value, or how many // peer has now. The other to count the negative value, or how many
@@ -210,11 +209,11 @@ func (r StatsTraceReporter) consumeEventWithLabelSlice(evt rcmgr.TraceEvt, tags
} }
} else { } else {
if evt.DeltaOut != 0 { if evt.DeltaOut != 0 {
if rcmgr.IsSystemScope(evt.Name) || rcmgr.IsTransientScope(evt.Name) { if IsSystemScope(evt.Name) || IsTransientScope(evt.Name) {
*tags = (*tags)[:0] *tags = (*tags)[:0]
*tags = append(*tags, "outbound", evt.Name, "") *tags = append(*tags, "outbound", evt.Name, "")
streams.WithLabelValues(*tags...).Set(float64(evt.StreamsOut)) streams.WithLabelValues(*tags...).Set(float64(evt.StreamsOut))
} else if proto := rcmgr.ParseProtocolScopeName(evt.Name); proto != "" { } else if proto := ParseProtocolScopeName(evt.Name); proto != "" {
*tags = (*tags)[:0] *tags = (*tags)[:0]
*tags = append(*tags, "outbound", "protocol", proto) *tags = append(*tags, "outbound", "protocol", proto)
streams.WithLabelValues(*tags...).Set(float64(evt.StreamsOut)) streams.WithLabelValues(*tags...).Set(float64(evt.StreamsOut))
@@ -227,11 +226,11 @@ func (r StatsTraceReporter) consumeEventWithLabelSlice(evt rcmgr.TraceEvt, tags
} }
if evt.DeltaIn != 0 { if evt.DeltaIn != 0 {
if rcmgr.IsSystemScope(evt.Name) || rcmgr.IsTransientScope(evt.Name) { if IsSystemScope(evt.Name) || IsTransientScope(evt.Name) {
*tags = (*tags)[:0] *tags = (*tags)[:0]
*tags = append(*tags, "inbound", evt.Name, "") *tags = append(*tags, "inbound", evt.Name, "")
streams.WithLabelValues(*tags...).Set(float64(evt.StreamsIn)) streams.WithLabelValues(*tags...).Set(float64(evt.StreamsIn))
} else if proto := rcmgr.ParseProtocolScopeName(evt.Name); proto != "" { } else if proto := ParseProtocolScopeName(evt.Name); proto != "" {
*tags = (*tags)[:0] *tags = (*tags)[:0]
*tags = append(*tags, "inbound", "protocol", proto) *tags = append(*tags, "inbound", "protocol", proto)
streams.WithLabelValues(*tags...).Set(float64(evt.StreamsIn)) streams.WithLabelValues(*tags...).Set(float64(evt.StreamsIn))
@@ -244,8 +243,8 @@ func (r StatsTraceReporter) consumeEventWithLabelSlice(evt rcmgr.TraceEvt, tags
} }
} }
case rcmgr.TraceAddConnEvt, rcmgr.TraceRemoveConnEvt: case TraceAddConnEvt, TraceRemoveConnEvt:
if p := rcmgr.PeerStrInScopeName(evt.Name); p != "" { if p := PeerStrInScopeName(evt.Name); p != "" {
// Aggregated peer stats. Counts how many peers have N number of connections. // Aggregated peer stats. Counts how many peers have N number of connections.
// Uses two buckets aggregations. One to count how many streams the // Uses two buckets aggregations. One to count how many streams the
// peer has now. The other to count the negative value, or how many // peer has now. The other to count the negative value, or how many
@@ -274,31 +273,31 @@ func (r StatsTraceReporter) consumeEventWithLabelSlice(evt rcmgr.TraceEvt, tags
} }
} }
} else { } else {
if rcmgr.IsConnScope(evt.Name) { if IsConnScope(evt.Name) {
// Not measuring this. I don't think it's useful. // Not measuring this. I don't think it's useful.
break break
} }
if rcmgr.IsSystemScope(evt.Name) { if IsSystemScope(evt.Name) {
connsInboundSystem.Set(float64(evt.ConnsIn)) connsInboundSystem.Set(float64(evt.ConnsIn))
connsOutboundSystem.Set(float64(evt.ConnsOut)) connsOutboundSystem.Set(float64(evt.ConnsOut))
} else if rcmgr.IsTransientScope(evt.Name) { } else if IsTransientScope(evt.Name) {
connsInboundTransient.Set(float64(evt.ConnsIn)) connsInboundTransient.Set(float64(evt.ConnsIn))
connsOutboundTransient.Set(float64(evt.ConnsOut)) connsOutboundTransient.Set(float64(evt.ConnsOut))
} }
// Represents the delta in fds // Represents the delta in fds
if evt.Delta != 0 { if evt.Delta != 0 {
if rcmgr.IsSystemScope(evt.Name) { if IsSystemScope(evt.Name) {
fdsSystem.Set(float64(evt.FD)) fdsSystem.Set(float64(evt.FD))
} else if rcmgr.IsTransientScope(evt.Name) { } else if IsTransientScope(evt.Name) {
fdsTransient.Set(float64(evt.FD)) fdsTransient.Set(float64(evt.FD))
} }
} }
} }
case rcmgr.TraceReserveMemoryEvt, rcmgr.TraceReleaseMemoryEvt: case TraceReserveMemoryEvt, TraceReleaseMemoryEvt:
if p := rcmgr.PeerStrInScopeName(evt.Name); p != "" { if p := PeerStrInScopeName(evt.Name); p != "" {
oldMem := evt.Memory - evt.Delta oldMem := evt.Memory - evt.Delta
if oldMem != evt.Memory { if oldMem != evt.Memory {
if oldMem != 0 { if oldMem != 0 {
@@ -308,7 +307,7 @@ func (r StatsTraceReporter) consumeEventWithLabelSlice(evt rcmgr.TraceEvt, tags
peerMemory.Observe(float64(evt.Memory)) peerMemory.Observe(float64(evt.Memory))
} }
} }
} else if rcmgr.IsConnScope(evt.Name) { } else if IsConnScope(evt.Name) {
oldMem := evt.Memory - evt.Delta oldMem := evt.Memory - evt.Delta
if oldMem != evt.Memory { if oldMem != evt.Memory {
if oldMem != 0 { if oldMem != 0 {
@@ -319,14 +318,14 @@ func (r StatsTraceReporter) consumeEventWithLabelSlice(evt rcmgr.TraceEvt, tags
} }
} }
} else { } else {
if rcmgr.IsSystemScope(evt.Name) || rcmgr.IsTransientScope(evt.Name) { if IsSystemScope(evt.Name) || IsTransientScope(evt.Name) {
*tags = (*tags)[:0] *tags = (*tags)[:0]
*tags = append(*tags, evt.Name, "") *tags = append(*tags, evt.Name, "")
memory.WithLabelValues(*tags...).Set(float64(evt.Memory)) memoryTotal.WithLabelValues(*tags...).Set(float64(evt.Memory))
} else if proto := rcmgr.ParseProtocolScopeName(evt.Name); proto != "" { } else if proto := ParseProtocolScopeName(evt.Name); proto != "" {
*tags = (*tags)[:0] *tags = (*tags)[:0]
*tags = append(*tags, "protocol", proto) *tags = append(*tags, "protocol", proto)
memory.WithLabelValues(*tags...).Set(float64(evt.Memory)) memoryTotal.WithLabelValues(*tags...).Set(float64(evt.Memory))
} else { } else {
// Not measuring connscope, servicepeer and protocolpeer. Lots of data, and // Not measuring connscope, servicepeer and protocolpeer. Lots of data, and
// you can use aggregated peer stats + service stats to infer // you can use aggregated peer stats + service stats to infer
@@ -335,11 +334,11 @@ func (r StatsTraceReporter) consumeEventWithLabelSlice(evt rcmgr.TraceEvt, tags
} }
} }
case rcmgr.TraceBlockAddConnEvt, rcmgr.TraceBlockAddStreamEvt, rcmgr.TraceBlockReserveMemoryEvt: case TraceBlockAddConnEvt, TraceBlockAddStreamEvt, TraceBlockReserveMemoryEvt:
var resource string var resource string
if evt.Type == rcmgr.TraceBlockAddConnEvt { if evt.Type == TraceBlockAddConnEvt {
resource = "connection" resource = "connection"
} else if evt.Type == rcmgr.TraceBlockAddStreamEvt { } else if evt.Type == TraceBlockAddStreamEvt {
resource = "stream" resource = "stream"
} else { } else {
resource = "memory" resource = "memory"

View File

@@ -1,19 +1,17 @@
package obs_test package rcmgr
import ( import (
"sync" "sync"
"testing" "testing"
"time" "time"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
"github.com/libp2p/go-libp2p/p2p/host/resource-manager/obs"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
) )
var registerOnce sync.Once var registerOnce sync.Once
func TestTraceReporterStartAndClose(t *testing.T) { func TestTraceReporterStartAndClose(t *testing.T) {
rcmgr, err := rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(rcmgr.DefaultLimits.AutoScale()), rcmgr.WithTraceReporter(obs.StatsTraceReporter{})) rcmgr, err := NewResourceManager(NewFixedLimiter(DefaultLimits.AutoScale()), WithTraceReporter(StatsTraceReporter{}))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@@ -21,18 +19,18 @@ func TestTraceReporterStartAndClose(t *testing.T) {
} }
func TestConsumeEvent(t *testing.T) { func TestConsumeEvent(t *testing.T) {
evt := rcmgr.TraceEvt{ evt := TraceEvt{
Type: rcmgr.TraceBlockAddStreamEvt, Type: TraceBlockAddStreamEvt,
Name: "conn-1", Name: "conn-1",
DeltaOut: 1, DeltaOut: 1,
Time: time.Now().Format(time.RFC3339Nano), Time: time.Now().Format(time.RFC3339Nano),
} }
registerOnce.Do(func() { registerOnce.Do(func() {
obs.MustRegisterWith(prometheus.DefaultRegisterer) MustRegisterWith(prometheus.DefaultRegisterer)
}) })
str, err := obs.NewStatsTraceReporter() str, err := NewStatsTraceReporter()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@@ -13,7 +13,6 @@ import (
"github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peer"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
rcmgrObs "github.com/libp2p/go-libp2p/p2p/host/resource-manager/obs"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@@ -319,7 +318,7 @@ func TestReadmeExample(t *testing.T) {
limiter := rcmgr.NewFixedLimiter(limits) limiter := rcmgr.NewFixedLimiter(limits)
// (Optional if you want metrics) Construct the OpenCensus metrics reporter. // (Optional if you want metrics) Construct the OpenCensus metrics reporter.
str, err := rcmgrObs.NewStatsTraceReporter() str, err := rcmgr.NewStatsTraceReporter()
if err != nil { if err != nil {
panic(err) panic(err)
} }