Don't use JitterBuffer in SampleBuilder

The performance of the SampleBuilder is significantly worse when using
the SampleBuilder. It would be good to evaluate improving the
performance of the JitterBuffer. However for the time being we are just
going to revert.

Resolve #2778
This commit is contained in:
Sean DuBois
2024-10-10 19:13:33 -04:00
parent dc1f8ffd1c
commit bb41f23a0f
2 changed files with 23 additions and 50 deletions

View File

@@ -8,7 +8,6 @@ import (
"math" "math"
"time" "time"
"github.com/pion/interceptor/pkg/jitterbuffer"
"github.com/pion/rtp" "github.com/pion/rtp"
"github.com/pion/webrtc/v4/pkg/media" "github.com/pion/webrtc/v4/pkg/media"
) )
@@ -17,7 +16,7 @@ import (
type SampleBuilder struct { type SampleBuilder struct {
maxLate uint16 // how many packets to wait until we get a valid Sample maxLate uint16 // how many packets to wait until we get a valid Sample
maxLateTimestamp uint32 // max timestamp between old and new timestamps before dropping packets maxLateTimestamp uint32 // max timestamp between old and new timestamps before dropping packets
buffer *jitterbuffer.JitterBuffer buffer [math.MaxUint16 + 1]*rtp.Packet
preparedSamples [math.MaxUint16 + 1]*media.Sample preparedSamples [math.MaxUint16 + 1]*media.Sample
// Interface that allows us to take RTP packets to samples // Interface that allows us to take RTP packets to samples
@@ -61,7 +60,7 @@ type SampleBuilder struct {
// The depacketizer extracts media samples from RTP packets. // The depacketizer extracts media samples from RTP packets.
// Several depacketizers are available in package github.com/pion/rtp/codecs. // Several depacketizers are available in package github.com/pion/rtp/codecs.
func New(maxLate uint16, depacketizer rtp.Depacketizer, sampleRate uint32, opts ...Option) *SampleBuilder { func New(maxLate uint16, depacketizer rtp.Depacketizer, sampleRate uint32, opts ...Option) *SampleBuilder {
s := &SampleBuilder{maxLate: maxLate, depacketizer: depacketizer, sampleRate: sampleRate, buffer: jitterbuffer.New(jitterbuffer.WithMinimumPacketCount(1))} s := &SampleBuilder{maxLate: maxLate, depacketizer: depacketizer, sampleRate: sampleRate}
for _, o := range opts { for _, o := range opts {
o(s) o(s)
} }
@@ -77,7 +76,7 @@ func (s *SampleBuilder) tooOld(location sampleSequenceLocation) bool {
var foundTail *rtp.Packet var foundTail *rtp.Packet
for i := location.head; i != location.tail; i++ { for i := location.head; i != location.tail; i++ {
if packet, _ := s.buffer.PeekAtSequence(i); packet != nil { if packet := s.buffer[i]; packet != nil {
foundHead = packet foundHead = packet
break break
} }
@@ -88,7 +87,7 @@ func (s *SampleBuilder) tooOld(location sampleSequenceLocation) bool {
} }
for i := location.tail - 1; i != location.head; i-- { for i := location.tail - 1; i != location.head; i-- {
if packet, _ := s.buffer.PeekAtSequence(i); packet != nil { if packet := s.buffer[i]; packet != nil {
foundTail = packet foundTail = packet
break break
} }
@@ -106,8 +105,8 @@ func (s *SampleBuilder) fetchTimestamp(location sampleSequenceLocation) (timesta
if location.empty() { if location.empty() {
return 0, false return 0, false
} }
packet, err := s.buffer.PeekAtSequence(location.head) packet := s.buffer[location.head]
if packet == nil || err != nil { if packet == nil {
return 0, false return 0, false
} }
return packet.Timestamp, true return packet.Timestamp, true
@@ -115,7 +114,7 @@ func (s *SampleBuilder) fetchTimestamp(location sampleSequenceLocation) (timesta
func (s *SampleBuilder) releasePacket(i uint16) { func (s *SampleBuilder) releasePacket(i uint16) {
var p *rtp.Packet var p *rtp.Packet
p, _ = s.buffer.PopAtSequence(i) p, s.buffer[i] = s.buffer[i], nil
if p != nil && s.packetReleaseHandler != nil { if p != nil && s.packetReleaseHandler != nil {
s.packetReleaseHandler(p) s.packetReleaseHandler(p)
} }
@@ -179,7 +178,7 @@ func (s *SampleBuilder) purgeBuffers(flush bool) {
// Push does not copy the input. If you wish to reuse // Push does not copy the input. If you wish to reuse
// this memory make sure to copy before calling Push // this memory make sure to copy before calling Push
func (s *SampleBuilder) Push(p *rtp.Packet) { func (s *SampleBuilder) Push(p *rtp.Packet) {
s.buffer.Push(p) s.buffer[p.SequenceNumber] = p
switch s.filled.compare(p.SequenceNumber) { switch s.filled.compare(p.SequenceNumber) {
case slCompareVoid: case slCompareVoid:
@@ -221,19 +220,14 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample {
var consume sampleSequenceLocation var consume sampleSequenceLocation
for i := s.active.head; s.active.compare(i) != slCompareAfter; i++ { for i := s.active.head; s.buffer[i] != nil && s.active.compare(i) != slCompareAfter; i++ {
pkt, err := s.buffer.PeekAtSequence(i) if s.depacketizer.IsPartitionTail(s.buffer[i].Marker, s.buffer[i].Payload) {
if pkt == nil || err != nil {
break
}
if s.depacketizer.IsPartitionTail(pkt.Marker, pkt.Payload) {
consume.head = s.active.head consume.head = s.active.head
consume.tail = i + 1 consume.tail = i + 1
break break
} }
headTimestamp, hasData := s.fetchTimestamp(s.active) headTimestamp, hasData := s.fetchTimestamp(s.active)
if hasData && pkt.Timestamp != headTimestamp { if hasData && s.buffer[i].Timestamp != headTimestamp {
consume.head = s.active.head consume.head = s.active.head
consume.tail = i consume.tail = i
break break
@@ -243,8 +237,8 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample {
if consume.empty() { if consume.empty() {
return nil return nil
} }
pkt, _ := s.buffer.PeekAtSequence(consume.tail)
if !purgingBuffers && pkt == nil { if !purgingBuffers && s.buffer[consume.tail] == nil {
// wait for the next packet after this set of packets to arrive // wait for the next packet after this set of packets to arrive
// to ensure at least one post sample timestamp is known // to ensure at least one post sample timestamp is known
// (unless we have to release right now) // (unless we have to release right now)
@@ -256,10 +250,8 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample {
// scan for any packet after the current and use that time stamp as the diff point // scan for any packet after the current and use that time stamp as the diff point
for i := consume.tail; i < s.active.tail; i++ { for i := consume.tail; i < s.active.tail; i++ {
pkt, _ = s.buffer.PeekAtSequence(i) if s.buffer[i] != nil {
afterTimestamp = s.buffer[i].Timestamp
if pkt != nil {
afterTimestamp = pkt.Timestamp
break break
} }
} }
@@ -269,11 +261,10 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample {
// prior to decoding all the packets, check if this packet // prior to decoding all the packets, check if this packet
// would end being disposed anyway // would end being disposed anyway
pkt, err := s.buffer.PeekAtSequence(consume.head) if !s.depacketizer.IsPartitionHead(s.buffer[consume.head].Payload) {
if err == nil && !s.depacketizer.IsPartitionHead(pkt.Payload) {
isPadding := false isPadding := false
for i := consume.head; i != consume.tail; i++ { for i := consume.head; i != consume.tail; i++ {
if s.lastSampleTimestamp != nil && *s.lastSampleTimestamp == pkt.Timestamp && len(pkt.Payload) == 0 { if s.lastSampleTimestamp != nil && *s.lastSampleTimestamp == s.buffer[i].Timestamp && len(s.buffer[i].Payload) == 0 {
isPadding = true isPadding = true
} }
} }
@@ -291,11 +282,7 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample {
var metadata interface{} var metadata interface{}
var rtpHeaders []*rtp.Header var rtpHeaders []*rtp.Header
for i := consume.head; i != consume.tail; i++ { for i := consume.head; i != consume.tail; i++ {
pkt, err := s.buffer.PeekAtSequence(i) p, err := s.depacketizer.Unmarshal(s.buffer[i].Payload)
if err != nil {
return nil
}
p, err := s.depacketizer.Unmarshal(pkt.Payload)
if err != nil { if err != nil {
return nil return nil
} }
@@ -303,10 +290,8 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample {
metadata = s.packetHeadHandler(s.depacketizer) metadata = s.packetHeadHandler(s.depacketizer)
} }
if s.returnRTPHeaders { if s.returnRTPHeaders {
if packet, _ := s.buffer.PeekAtSequence(i); packet != nil { h := s.buffer[i].Header.Clone()
h := pkt.Header.Clone() rtpHeaders = append(rtpHeaders, &h)
rtpHeaders = append(rtpHeaders, &h)
}
} }
data = append(data, p...) data = append(data, p...)
@@ -404,11 +389,3 @@ func WithRTPHeaders(enable bool) Option {
o.returnRTPHeaders = enable o.returnRTPHeaders = enable
} }
} }
// WithJitterBufferMinimumLength sets the minimum number of packets which must first
// be received before starting any playback
func WithJitterBufferMinimumLength(length uint16) Option {
return func(o *SampleBuilder) {
o.buffer = jitterbuffer.New(jitterbuffer.WithMinimumPacketCount(length))
}
}

View File

@@ -396,18 +396,14 @@ func TestSampleBuilderCleanReference(t *testing.T) {
s.Push(pkt5) s.Push(pkt5)
for i := 0; i < 3; i++ { for i := 0; i < 3; i++ {
pkt, err := s.buffer.PeekAtSequence(uint16((i + int(seqStart)) % 0x10000)) if s.buffer[(i+int(seqStart))%0x10000] != nil {
if pkt != nil || err == nil {
t.Errorf("Old packet (%d) is not unreferenced (maxLate: 10, pushed: 12)", i) t.Errorf("Old packet (%d) is not unreferenced (maxLate: 10, pushed: 12)", i)
} }
} }
pkt, err := s.buffer.PeekAtSequence(uint16((14 + int(seqStart)) % 0x10000)) if s.buffer[(14+int(seqStart))%0x10000] != pkt4 {
if pkt != pkt4 || err != nil {
t.Error("New packet must be referenced after jump") t.Error("New packet must be referenced after jump")
} }
pkt, err = s.buffer.PeekAtSequence(uint16((12 + int(seqStart)) % 0x10000)) if s.buffer[(12+int(seqStart))%0x10000] != pkt5 {
if pkt != pkt5 || err != nil {
t.Error("New packet must be referenced after jump") t.Error("New packet must be referenced after jump")
} }
}) })