swarm: add a basic metrics tracer (#1973)

* swarm: add very basic metrics for opening and closing connections

* swarm: use a sync.Pool to make metrics collection allocation-free

* swarm: introduce a MetricsTracer interface

* swarm: add the transport to the dial error metric

* swarm: add Grafana dashboard

* swarm: use the prometheus namespace option
This commit is contained in:
Marten Seemann
2023-01-26 16:18:55 -08:00
committed by GitHub
parent d5a280e6f6
commit 3919359872
9 changed files with 2521 additions and 8 deletions

View File

@@ -119,7 +119,7 @@ type Config struct {
HolePunchingOptions []holepunch.Option
}
func (cfg *Config) makeSwarm() (*swarm.Swarm, error) {
func (cfg *Config) makeSwarm(enableMetrics bool) (*swarm.Swarm, error) {
if cfg.Peerstore == nil {
return nil, fmt.Errorf("no peerstore specified")
}
@@ -151,7 +151,7 @@ func (cfg *Config) makeSwarm() (*swarm.Swarm, error) {
return nil, err
}
opts := make([]swarm.Option, 0, 3)
opts := make([]swarm.Option, 0, 6)
if cfg.Reporter != nil {
opts = append(opts, swarm.WithMetrics(cfg.Reporter))
}
@@ -167,6 +167,9 @@ func (cfg *Config) makeSwarm() (*swarm.Swarm, error) {
if cfg.MultiaddrResolver != nil {
opts = append(opts, swarm.WithMultiaddrResolver(cfg.MultiaddrResolver))
}
if enableMetrics {
opts = append(opts, swarm.WithMetricsTracer(swarm.NewMetricsTracer()))
}
// TODO: Make the swarm implementation configurable.
return swarm.NewSwarm(pid, cfg.Peerstore, opts...)
}
@@ -276,7 +279,7 @@ func (cfg *Config) addTransports(h host.Host) error {
//
// This function consumes the config. Do not reuse it (really!).
func (cfg *Config) NewNode() (host.Host, error) {
swrm, err := cfg.makeSwarm()
swrm, err := cfg.makeSwarm(true)
if err != nil {
return nil, err
}
@@ -382,7 +385,7 @@ func (cfg *Config) NewNode() (host.Host, error) {
Peerstore: ps,
}
dialer, err := autoNatCfg.makeSwarm()
dialer, err := autoNatCfg.makeSwarm(false)
if err != nil {
h.Close()
return nil, err

File diff suppressed because it is too large Load Diff

View File

@@ -71,6 +71,13 @@ func WithMetrics(reporter metrics.Reporter) Option {
}
}
func WithMetricsTracer(t MetricsTracer) Option {
return func(s *Swarm) error {
s.metricsTracer = t
return nil
}
}
func WithDialTimeout(t time.Duration) Option {
return func(s *Swarm) error {
s.dialTimeout = t
@@ -151,7 +158,8 @@ type Swarm struct {
ctx context.Context // is canceled when Close is called
ctxCancel context.CancelFunc
bwc metrics.Reporter
bwc metrics.Reporter
metricsTracer MetricsTracer
}
// NewSwarm constructs a Swarm.

View File

@@ -60,6 +60,10 @@ func (c *Conn) Close() error {
}
func (c *Conn) doClose() {
if c.swarm.metricsTracer != nil {
c.swarm.metricsTracer.ClosedConnection(c.stat.Direction, time.Since(c.stat.Stats.Opened), c.ConnState())
}
c.swarm.removeConn(c)
// Prevent new streams from opening.

View File

@@ -490,11 +490,20 @@ func (s *Swarm) dialAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr) (tra
return nil, ErrNoTransport
}
start := time.Now()
connC, err := tpt.Dial(ctx, addr, p)
if err != nil {
if s.metricsTracer != nil {
s.metricsTracer.FailedDialing(addr, err)
}
return nil, err
}
canonicallog.LogPeerStatus(100, connC.RemotePeer(), connC.RemoteMultiaddr(), "connection_status", "established", "dir", "outbound")
if s.metricsTracer != nil {
connState := connC.ConnState()
s.metricsTracer.OpenedConnection(network.DirOutbound, connC.RemotePublicKey(), connState)
s.metricsTracer.CompletedHandshake(time.Since(start), connState)
}
// Trust the transport? Yeah... right.
if connC.RemotePeer() != p {

View File

@@ -130,6 +130,9 @@ func (s *Swarm) AddListenAddr(a ma.Multiaddr) error {
return
}
canonicallog.LogPeerStatus(100, c.RemotePeer(), c.RemoteMultiaddr(), "connection_status", "established", "dir", "inbound")
if s.metricsTracer != nil {
s.metricsTracer.OpenedConnection(network.DirInbound, c.RemotePublicKey(), c.ConnState())
}
log.Debugf("swarm listener accepted connection: %s <-> %s", c.LocalMultiaddr(), c.RemoteMultiaddr())
s.refs.Add(1)

View File

@@ -0,0 +1,191 @@
package swarm
import (
"context"
"errors"
"net"
"strings"
"sync"
"time"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/network"
ma "github.com/multiformats/go-multiaddr"
"github.com/prometheus/client_golang/prometheus"
)
const metricNamespace = "libp2p_swarm"
var (
connsOpened = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: metricNamespace,
Name: "connections_opened_total",
Help: "Connections Opened",
},
[]string{"dir", "transport", "security", "muxer"},
)
keyTypes = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: metricNamespace,
Name: "key_types_total",
Help: "key type",
},
[]string{"dir", "key_type"},
)
connsClosed = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: metricNamespace,
Name: "connections_closed_total",
Help: "Connections Closed",
},
[]string{"dir", "transport", "security", "muxer"},
)
dialError = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: metricNamespace,
Name: "dial_errors_total",
Help: "Dial Error",
},
[]string{"transport", "error"},
)
connDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: metricNamespace,
Name: "connection_duration_seconds",
Help: "Duration of a Connection",
Buckets: prometheus.ExponentialBuckets(1.0/16, 2, 25), // up to 24 days
},
[]string{"dir", "transport", "security", "muxer"},
)
connHandshakeLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: metricNamespace,
Name: "handshake_latency_seconds",
Help: "Duration of the libp2p Handshake",
Buckets: prometheus.ExponentialBuckets(0.001, 1.3, 35),
},
[]string{"transport", "security", "muxer"},
)
)
var initMetricsOnce sync.Once
func initMetrics() {
prometheus.MustRegister(connsOpened, keyTypes, connsClosed, dialError, connDuration, connHandshakeLatency)
}
type MetricsTracer interface {
OpenedConnection(network.Direction, crypto.PubKey, network.ConnectionState)
ClosedConnection(network.Direction, time.Duration, network.ConnectionState)
CompletedHandshake(time.Duration, network.ConnectionState)
FailedDialing(ma.Multiaddr, error)
}
type metricsTracer struct{}
var _ MetricsTracer = &metricsTracer{}
func NewMetricsTracer() *metricsTracer {
initMetricsOnce.Do(initMetrics)
return &metricsTracer{}
}
var stringPool = sync.Pool{New: func() any {
s := make([]string, 0, 8)
return &s
}}
func getStringSlice() *[]string {
s := stringPool.Get().(*[]string)
*s = (*s)[:0]
return s
}
func putStringSlice(s *[]string) { stringPool.Put(s) }
func getDirection(dir network.Direction) string {
switch dir {
case network.DirOutbound:
return "outbound"
case network.DirInbound:
return "inbound"
default:
return "unknown"
}
}
func appendConnectionState(tags []string, cs network.ConnectionState) []string {
if cs.Transport == "" {
// This shouldn't happen, unless the transport doesn't properly set the Transport field in the ConnectionState.
tags = append(tags, "unknown")
} else {
tags = append(tags, cs.Transport)
}
// These might be empty, depending on the transport.
// For example, QUIC doesn't set security nor muxer.
tags = append(tags, cs.Security)
tags = append(tags, cs.StreamMultiplexer)
return tags
}
func (m *metricsTracer) OpenedConnection(dir network.Direction, p crypto.PubKey, cs network.ConnectionState) {
tags := getStringSlice()
defer putStringSlice(tags)
*tags = append(*tags, getDirection(dir))
*tags = appendConnectionState(*tags, cs)
connsOpened.WithLabelValues(*tags...).Inc()
*tags = (*tags)[:0]
*tags = append(*tags, getDirection(dir))
*tags = append(*tags, p.Type().String())
keyTypes.WithLabelValues(*tags...).Inc()
}
func (m *metricsTracer) ClosedConnection(dir network.Direction, duration time.Duration, cs network.ConnectionState) {
tags := getStringSlice()
defer putStringSlice(tags)
*tags = append(*tags, getDirection(dir))
*tags = appendConnectionState(*tags, cs)
connsClosed.WithLabelValues(*tags...).Inc()
*tags = (*tags)[:0]
*tags = append(*tags, getDirection(dir))
*tags = appendConnectionState(*tags, cs)
connDuration.WithLabelValues(*tags...).Observe(duration.Seconds())
}
func (m *metricsTracer) CompletedHandshake(t time.Duration, cs network.ConnectionState) {
tags := getStringSlice()
defer putStringSlice(tags)
*tags = appendConnectionState(*tags, cs)
connHandshakeLatency.WithLabelValues(*tags...).Observe(t.Seconds())
}
var transports = [...]int{ma.P_CIRCUIT, ma.P_WEBRTC, ma.P_WEBTRANSPORT, ma.P_QUIC, ma.P_QUIC_V1, ma.P_WSS, ma.P_WS, ma.P_TCP}
func (m *metricsTracer) FailedDialing(addr ma.Multiaddr, err error) {
var transport string
for _, t := range transports {
if _, err := addr.ValueForProtocol(t); err == nil {
transport = ma.ProtocolWithCode(t).Name
}
}
e := "other"
if errors.Is(err, context.Canceled) {
e = "canceled"
} else if errors.Is(err, context.DeadlineExceeded) {
e = "deadline"
} else {
nerr, ok := err.(net.Error)
if ok && nerr.Timeout() {
e = "timeout"
} else if strings.Contains(err.Error(), "connect: connection refused") {
e = "connection refused"
}
}
dialError.WithLabelValues(transport, e).Inc()
}

View File

@@ -0,0 +1,32 @@
package swarm
import (
"crypto/rand"
"testing"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/network"
"github.com/stretchr/testify/require"
)
func BenchmarkMetricsConnOpen(b *testing.B) {
b.ReportAllocs()
quicConnState := network.ConnectionState{Transport: "quic"}
tcpConnState := network.ConnectionState{
StreamMultiplexer: "yamux",
Security: "tls",
Transport: "tcp",
}
_, pub, err := crypto.GenerateEd25519Key(rand.Reader)
require.NoError(b, err)
tr := NewMetricsTracer()
for i := 0; i < b.N; i++ {
switch i % 2 {
case 0:
tr.OpenedConnection(network.DirInbound, pub, quicConnState)
case 1:
tr.OpenedConnection(network.DirInbound, pub, tcpConnState)
}
}
}

View File

@@ -144,7 +144,7 @@ func (u *upgrader) upgrade(ctx context.Context, t transport.Transport, maconn ma
pconn, err := pnet.NewProtectedConn(u.psk, conn)
if err != nil {
conn.Close()
return nil, fmt.Errorf("failed to setup private network protector: %s", err)
return nil, fmt.Errorf("failed to setup private network protector: %w", err)
}
conn = pconn
} else if ipnet.ForcePrivateNetwork {
@@ -155,7 +155,7 @@ func (u *upgrader) upgrade(ctx context.Context, t transport.Transport, maconn ma
sconn, security, server, err := u.setupSecurity(ctx, conn, p, dir)
if err != nil {
conn.Close()
return nil, fmt.Errorf("failed to negotiate security protocol: %s", err)
return nil, fmt.Errorf("failed to negotiate security protocol: %w", err)
}
// call the connection gater, if one is registered.
@@ -182,7 +182,7 @@ func (u *upgrader) upgrade(ctx context.Context, t transport.Transport, maconn ma
muxer, smconn, err := u.setupMuxer(ctx, sconn, server, connScope.PeerScope())
if err != nil {
sconn.Close()
return nil, fmt.Errorf("failed to negotiate stream multiplexer: %s", err)
return nil, fmt.Errorf("failed to negotiate stream multiplexer: %w", err)
}
tc := &transportConn{