Add interface for getting media-playout stats

This commit is contained in:
Shreyas Jaganmohan
2025-09-26 12:53:53 -04:00
parent bf15721dd9
commit 0575dfbe15
6 changed files with 571 additions and 0 deletions

View File

@@ -413,6 +413,7 @@ func (r *RTPReceiver) collectStats(collector *statsReportCollector, statsGetter
mid = r.tr.Mid()
}
now := statsTimestampNow()
nowTime := now.Time()
for trackIndex := range r.tracks {
remoteTrack := r.tracks[trackIndex].track
if remoteTrack == nil {
@@ -465,6 +466,22 @@ func (r *RTPReceiver) collectStats(collector *statsReportCollector, statsGetter
}
collector.Collect(inboundID, inboundStats)
if remoteTrack.Kind() == RTPCodecTypeAudio {
r.collectAudioPlayoutStats(collector, nowTime, remoteTrack)
}
}
}
func (r *RTPReceiver) collectAudioPlayoutStats(
collector *statsReportCollector,
nowTime time.Time,
remoteTrack *TrackRemote,
) {
playoutStats := remoteTrack.pullAudioPlayoutStats(nowTime)
for _, stats := range playoutStats {
collector.Collecting()
collector.Collect(stats.ID, stats)
}
}

View File

@@ -132,6 +132,143 @@ func TestRTPReceiver_CollectStats_Mapping(t *testing.T) {
assert.Greater(t, float64(inbound.LastPacketReceivedTimestamp), 0.0)
}
func TestRTPReceiver_CollectStats_AudioPlayoutPull(t *testing.T) {
receiver := &RTPReceiver{
kind: RTPCodecTypeAudio,
log: logging.NewDefaultLoggerFactory().NewLogger("RTPReceiverTest"),
}
track := newTrackRemote(RTPCodecTypeAudio, 7777, 0, "", receiver)
receiver.tracks = []trackStreams{{track: track}}
provider := &fakeAudioPlayoutStatsProvider{
stats: AudioPlayoutStats{
ID: "media-playout-7777",
Type: StatsTypeMediaPlayout,
Kind: string(MediaKindAudio),
TotalSamplesCount: 960,
TotalSamplesDuration: float64(960) / 48000,
TotalPlayoutDelay: 0.5,
},
ok: true,
}
_ = provider.AddTrack(track)
collector := newStatsReportCollector()
receiver.collectStats(collector, &fakeGetter{})
report := collector.Ready()
got, ok := report["media-playout-7777"]
require.True(t, ok, "missing audio playout stats entry")
playout, ok := got.(AudioPlayoutStats)
require.True(t, ok)
assert.Equal(t, provider.stats.TotalSamplesCount, playout.TotalSamplesCount)
assert.Equal(t, provider.stats.TotalSamplesDuration, playout.TotalSamplesDuration)
assert.Equal(t, provider.stats.TotalPlayoutDelay, playout.TotalPlayoutDelay)
assert.NotZero(t, playout.Timestamp)
assert.Equal(t, 1, provider.calls)
}
func TestRTPReceiver_CollectStats_AudioPlayoutSharedProvider(t *testing.T) {
receiver := &RTPReceiver{
kind: RTPCodecTypeAudio,
log: logging.NewDefaultLoggerFactory().NewLogger("RTPReceiverTest"),
}
trackOne := newTrackRemote(RTPCodecTypeAudio, 5555, 0, "", receiver)
trackTwo := newTrackRemote(RTPCodecTypeAudio, 6666, 0, "", receiver)
receiver.tracks = []trackStreams{{track: trackOne}, {track: trackTwo}}
provider := &fakeAudioPlayoutStatsProvider{
stats: AudioPlayoutStats{
ID: "shared-playout",
Type: StatsTypeMediaPlayout,
Kind: string(MediaKindAudio),
TotalSamplesCount: 100,
},
ok: true,
}
_ = provider.AddTrack(trackOne)
_ = provider.AddTrack(trackTwo)
collector := newStatsReportCollector()
receiver.collectStats(collector, &fakeGetter{})
report := collector.Ready()
got, ok := report["shared-playout"]
require.True(t, ok, "shared provider stats missing")
playout, ok := got.(AudioPlayoutStats)
require.True(t, ok)
assert.Equal(t, provider.stats.TotalSamplesCount, playout.TotalSamplesCount)
assert.Equal(t, provider.stats.Type, playout.Type)
assert.Equal(t, provider.stats.Kind, playout.Kind)
assert.Equal(t, provider.stats.ID, playout.ID)
assert.NotZero(t, playout.Timestamp)
assert.Equal(t, 2, provider.calls)
}
func TestRTPReceiver_CollectStats_AudioPlayoutTimestampAlignment(t *testing.T) {
receiver := &RTPReceiver{
kind: RTPCodecTypeAudio,
log: logging.NewDefaultLoggerFactory().NewLogger("RTPReceiverTest"),
}
track := newTrackRemote(RTPCodecTypeAudio, 9999, 0, "", receiver)
receiver.tracks = []trackStreams{{track: track}}
provider := &fakeAudioPlayoutStatsProvider{
stats: AudioPlayoutStats{
ID: "media-playout-9999",
Type: StatsTypeMediaPlayout,
Kind: string(MediaKindAudio),
TotalSamplesCount: 1,
},
ok: true,
}
_ = provider.AddTrack(track)
collector := newStatsReportCollector()
receiver.collectStats(collector, &fakeGetter{})
report := collector.Ready()
got, ok := report["media-playout-9999"]
require.True(t, ok, "playout stats missing")
playout, ok := got.(AudioPlayoutStats)
require.True(t, ok, "playout stats type assertion failed")
require.NotZero(t, provider.lastNow)
assert.Equal(t, statsTimestampFrom(provider.lastNow), playout.Timestamp)
}
type fakeGetter struct{ s stats.Stats }
func (f *fakeGetter) Get(uint32) *stats.Stats { return &f.s }
type fakeAudioPlayoutStatsProvider struct {
stats AudioPlayoutStats
ok bool
calls int
lastNow time.Time
}
func (f *fakeAudioPlayoutStatsProvider) Snapshot(now time.Time) (AudioPlayoutStats, bool) {
f.calls++
f.lastNow = now
return f.stats, f.ok
}
func (f *fakeAudioPlayoutStatsProvider) AddTrack(track *TrackRemote) error {
track.addProvider(f)
return nil
}
func (f *fakeAudioPlayoutStatsProvider) RemoveTrack(track *TrackRemote) {
track.removeProvider(f)
}

View File

@@ -6,6 +6,12 @@
package webrtc
import (
"context"
"sync"
"time"
)
// GetConnectionStats is a helper method to return the associated stats for a given PeerConnection.
func (r StatsReport) GetConnectionStats(conn *PeerConnection) (PeerConnectionStats, bool) {
statsID := conn.getStatsID()
@@ -101,3 +107,137 @@ func (r StatsReport) GetCodecStats(c *RTPCodecParameters) (CodecStats, bool) {
return codecStats, true
}
// AudioPlayoutStatsProvider is an interface for getting audio playout metrics.
type AudioPlayoutStatsProvider interface {
// AddTrack registers a track to report playout stats to this provider.
AddTrack(track *TrackRemote) error
// RemoveTrack unregisters a track from this provider.
RemoveTrack(track *TrackRemote)
// Snapshot returns the accumulated stats at the given time.
Snapshot(now time.Time) (AudioPlayoutStats, bool)
}
type trackContext struct {
cancel context.CancelFunc
}
// defaultAudioPlayoutStatsProvider accumulates audio playout stats on behalf of the application.
type defaultAudioPlayoutStatsProvider struct {
mu sync.Mutex
stats AudioPlayoutStats
lastSynthesized bool
tracks map[*TrackRemote]*trackContext
}
// NewAudioPlayoutStatsProvider constructs a default provider with the supplied stats ID.
func NewAudioPlayoutStatsProvider(id string) *defaultAudioPlayoutStatsProvider {
return &defaultAudioPlayoutStatsProvider{
stats: AudioPlayoutStats{
ID: id,
Type: StatsTypeMediaPlayout,
Kind: string(MediaKindAudio),
},
tracks: make(map[*TrackRemote]*trackContext),
}
}
// Accumulate applies a new batch of played-out samples to the running totals.
func (p *defaultAudioPlayoutStatsProvider) Accumulate(
samples int, sampleRate uint32, deviceDelay time.Duration, synthesized bool,
) {
if samples <= 0 || sampleRate == 0 {
return
}
delaySeconds := deviceDelay.Seconds()
if delaySeconds < 0 {
delaySeconds = 0
}
duration := float64(samples) / float64(sampleRate)
p.mu.Lock()
defer p.mu.Unlock()
p.stats.TotalSamplesCount += uint64(samples)
p.stats.TotalSamplesDuration += duration
p.stats.TotalPlayoutDelay += delaySeconds * float64(samples)
if synthesized {
p.stats.SynthesizedSamplesDuration += duration
if !p.lastSynthesized {
p.stats.SynthesizedSamplesEvents++
}
}
p.lastSynthesized = synthesized
}
// Snapshot returns the accumulated stats at the given time.
func (p *defaultAudioPlayoutStatsProvider) Snapshot(now time.Time) (AudioPlayoutStats, bool) {
p.mu.Lock()
defer p.mu.Unlock()
if p.stats.TotalSamplesCount == 0 {
return AudioPlayoutStats{}, false
}
stats := p.stats
stats.Timestamp = statsTimestampFrom(now)
return stats, true
}
// AddTrack registers a track to report playout stats to this provider.
func (p *defaultAudioPlayoutStatsProvider) AddTrack(track *TrackRemote) error {
p.mu.Lock()
defer p.mu.Unlock()
if _, exists := p.tracks[track]; exists {
return nil
}
track.addProvider(p)
ctx, cancel := context.WithCancel(context.Background())
p.tracks[track] = &trackContext{cancel: cancel}
go func() {
receiver := track.receiver
if receiver == nil {
cancel()
return
}
select {
case <-receiver.closed:
p.removeTrackInternal(track)
case <-ctx.Done():
return
}
}()
return nil
}
// RemoveTrack unregisters a track from this provider.
func (p *defaultAudioPlayoutStatsProvider) RemoveTrack(track *TrackRemote) {
p.removeTrackInternal(track)
}
func (p *defaultAudioPlayoutStatsProvider) removeTrackInternal(track *TrackRemote) {
p.mu.Lock()
defer p.mu.Unlock()
if tc, exists := p.tracks[track]; exists {
tc.cancel()
delete(p.tracks, track)
}
track.removeProvider(p)
}

View File

@@ -2287,3 +2287,97 @@ func TestStatsReport_GetCodecStats_Success(t *testing.T) {
require.True(t, ok)
assert.Equal(t, want, got)
}
func TestDefaultAudioPlayoutStatsProvider_AccumulateSnapshot(t *testing.T) {
provider := NewAudioPlayoutStatsProvider("media-playout-1001")
sampleRate := uint32(48000)
now := time.Unix(1710000000, 500*int64(time.Millisecond))
samplesPerBatch := 960 * 2
batches := []struct {
delay time.Duration
synthesized bool
}{
{20 * time.Millisecond, true},
{25 * time.Millisecond, true},
{25 * time.Millisecond, false},
}
for _, batch := range batches {
provider.Accumulate(samplesPerBatch, sampleRate, batch.delay, batch.synthesized)
}
stats, ok := provider.Snapshot(now)
require.True(t, ok)
assert.Equal(t, StatsTypeMediaPlayout, stats.Type)
assert.Equal(t, "media-playout-1001", stats.ID)
assert.Equal(t, string(MediaKindAudio), stats.Kind)
assert.Equal(t, statsTimestampFrom(now), stats.Timestamp)
samplesPerBatchU64 := uint64(samplesPerBatch) //#nosec G115 -- samplesPerBatch is a small test value
expectedSamples := samplesPerBatchU64 * uint64(len(batches))
assert.Equal(t, expectedSamples, stats.TotalSamplesCount)
expectedDuration := float64(expectedSamples) / float64(sampleRate)
assert.Equal(t, expectedDuration, stats.TotalSamplesDuration)
synthesizedDuration := float64(samplesPerBatch*2) / float64(sampleRate)
assert.Equal(t, synthesizedDuration, stats.SynthesizedSamplesDuration)
assert.EqualValues(t, 1, stats.SynthesizedSamplesEvents)
totalDelay := 0.0
for _, batch := range batches {
totalDelay += batch.delay.Seconds() * float64(samplesPerBatch)
}
assert.Equal(t, totalDelay, stats.TotalPlayoutDelay)
}
func TestDefaultAudioPlayoutStatsProvider_AddRemoveTrack(t *testing.T) {
receiver := &RTPReceiver{closed: make(chan any)}
track := newTrackRemote(RTPCodecTypeAudio, 1234, 0, "", receiver)
samplesPerBatch := 960
provider := NewAudioPlayoutStatsProvider("media-playout-device-1")
err := provider.AddTrack(track)
require.NoError(t, err)
defer provider.RemoveTrack(track)
provider.Accumulate(samplesPerBatch, 48000, 10*time.Millisecond, false)
stats := track.pullAudioPlayoutStats(time.Now())
require.Len(t, stats, 1)
assert.Equal(t, "media-playout-device-1", stats[0].ID)
assert.EqualValues(t, samplesPerBatch, stats[0].TotalSamplesCount)
provider.RemoveTrack(track)
stats = track.pullAudioPlayoutStats(time.Now())
require.Empty(t, stats)
}
func TestDefaultAudioPlayoutStatsProvider_MultipleProviders(t *testing.T) {
receiver := &RTPReceiver{closed: make(chan any)}
track := newTrackRemote(RTPCodecTypeAudio, 5555, 0, "", receiver)
samplesPerBatch := 960
provider1 := NewAudioPlayoutStatsProvider("media-playout-speaker")
provider2 := NewAudioPlayoutStatsProvider("media-playout-headphones")
err := provider1.AddTrack(track)
require.NoError(t, err)
defer provider1.RemoveTrack(track)
err = provider2.AddTrack(track)
require.NoError(t, err)
defer provider2.RemoveTrack(track)
provider1.Accumulate(samplesPerBatch, 48000, 10*time.Millisecond, false)
provider2.Accumulate(samplesPerBatch*2, 48000, 15*time.Millisecond, false)
stats := track.pullAudioPlayoutStats(time.Now())
require.Len(t, stats, 2)
ids := []string{stats[0].ID, stats[1].ID}
assert.Contains(t, ids, "media-playout-speaker")
assert.Contains(t, ids, "media-playout-headphones")
}

View File

@@ -7,6 +7,7 @@
package webrtc
import (
"fmt"
"sync"
"time"
@@ -32,6 +33,8 @@ type TrackRemote struct {
receiver *RTPReceiver
peeked []byte
peekedAttributes interceptor.Attributes
audioPlayoutStatsProviders []AudioPlayoutStatsProvider
}
func newTrackRemote(kind RTPCodecType, ssrc, rtxSsrc SSRC, rid string, receiver *RTPReceiver) *TrackRemote {
@@ -237,6 +240,70 @@ func (t *TrackRemote) HasRTX() bool {
return t.rtxSsrc != 0
}
func (t *TrackRemote) addProvider(provider AudioPlayoutStatsProvider) {
t.mu.Lock()
defer t.mu.Unlock()
for _, p := range t.audioPlayoutStatsProviders {
if p == provider {
return
}
}
t.audioPlayoutStatsProviders = append(t.audioPlayoutStatsProviders, provider)
}
func (t *TrackRemote) removeProvider(provider AudioPlayoutStatsProvider) {
t.mu.Lock()
defer t.mu.Unlock()
for i, p := range t.audioPlayoutStatsProviders {
if p == provider {
t.audioPlayoutStatsProviders = append(t.audioPlayoutStatsProviders[:i], t.audioPlayoutStatsProviders[i+1:]...)
return
}
}
}
func (t *TrackRemote) pullAudioPlayoutStats(now time.Time) []AudioPlayoutStats {
t.mu.RLock()
providers := t.audioPlayoutStatsProviders
t.mu.RUnlock()
if len(providers) == 0 {
return nil
}
var allStats []AudioPlayoutStats
for _, provider := range providers {
stats, ok := provider.Snapshot(now)
if !ok {
continue
}
if stats.ID == "" {
stats.ID = fmt.Sprintf("media-playout-%d", uint32(t.SSRC()))
}
if stats.Type == "" {
stats.Type = StatsTypeMediaPlayout
}
if stats.Kind == "" {
stats.Kind = string(MediaKindAudio)
}
if stats.Timestamp == 0 {
stats.Timestamp = statsTimestampFrom(now)
}
allStats = append(allStats, stats)
}
return allStats
}
func (t *TrackRemote) setRtxSSRC(ssrc SSRC) {
t.mu.Lock()
defer t.mu.Unlock()

116
track_remote_test.go Normal file
View File

@@ -0,0 +1,116 @@
// SPDX-FileCopyrightText: 2024 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT
//go:build !js
// +build !js
package webrtc
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
type fakeTrackAudioPlayoutStatsProvider struct {
stats AudioPlayoutStats
ok bool
calls int
lastNow time.Time
}
func (f *fakeTrackAudioPlayoutStatsProvider) Snapshot(now time.Time) (AudioPlayoutStats, bool) {
f.calls++
f.lastNow = now
return f.stats, f.ok
}
func (f *fakeTrackAudioPlayoutStatsProvider) AddTrack(track *TrackRemote) error {
track.addProvider(f)
return nil
}
func (f *fakeTrackAudioPlayoutStatsProvider) RemoveTrack(track *TrackRemote) {
track.removeProvider(f)
}
func TestTrackRemotePullAudioPlayoutStats(t *testing.T) {
receiver := &RTPReceiver{}
track := newTrackRemote(RTPCodecTypeAudio, 4242, 0, "", receiver)
provider := &fakeTrackAudioPlayoutStatsProvider{
stats: AudioPlayoutStats{
ID: "media-playout-4242",
Type: StatsTypeMediaPlayout,
Kind: string(MediaKindAudio),
TotalSamplesCount: 960,
},
ok: true,
}
err := provider.AddTrack(track)
require.NoError(t, err)
now := time.Unix(1710000000, 0)
allStats := track.pullAudioPlayoutStats(now)
require.Len(t, allStats, 1)
stats := allStats[0]
assert.Equal(t, provider.stats.TotalSamplesCount, stats.TotalSamplesCount)
assert.Equal(t, provider.stats.Type, stats.Type)
assert.Equal(t, provider.stats.ID, stats.ID)
assert.Equal(t, provider.stats.Kind, stats.Kind)
assert.Equal(t, statsTimestampFrom(now), stats.Timestamp)
assert.Equal(t, 1, provider.calls)
assert.Equal(t, now, provider.lastNow)
}
func TestTrackRemotePullAudioPlayoutStatsMissingProvider(t *testing.T) {
receiver := &RTPReceiver{}
track := newTrackRemote(RTPCodecTypeAudio, 1111, 0, "", receiver)
stats := track.pullAudioPlayoutStats(time.Now())
require.Empty(t, stats)
}
func TestTrackRemotePullAudioPlayoutStatsProviderFalse(t *testing.T) {
receiver := &RTPReceiver{}
track := newTrackRemote(RTPCodecTypeAudio, 1111, 0, "", receiver)
provider := &fakeTrackAudioPlayoutStatsProvider{ok: false}
err := provider.AddTrack(track)
require.NoError(t, err)
stats := track.pullAudioPlayoutStats(time.Now())
require.Empty(t, stats)
assert.Equal(t, 1, provider.calls)
}
func TestTrackRemotePullAudioPlayoutStatsNormalizesDefaults(t *testing.T) {
receiver := &RTPReceiver{}
track := newTrackRemote(RTPCodecTypeAudio, 2468, 0, "", receiver)
provider := &fakeTrackAudioPlayoutStatsProvider{
stats: AudioPlayoutStats{
TotalSamplesCount: 480,
},
ok: true,
}
err := provider.AddTrack(track)
require.NoError(t, err)
allStats := track.pullAudioPlayoutStats(time.Unix(10, 0))
require.Len(t, allStats, 1)
stats := allStats[0]
assert.Equal(t, "media-playout-2468", stats.ID)
assert.Equal(t, StatsTypeMediaPlayout, stats.Type)
assert.Equal(t, string(MediaKindAudio), stats.Kind)
assert.NotZero(t, stats.Timestamp)
}