SampleBuilder: Port to use jitter buffer

This commit is contained in:
Rob Elsner
2024-02-17 17:33:16 -04:00
committed by Sean DuBois
parent cd73129ac3
commit c47f89065c
2 changed files with 50 additions and 23 deletions

View File

@@ -8,6 +8,7 @@ import (
"math" "math"
"time" "time"
"github.com/pion/interceptor/pkg/jitterbuffer"
"github.com/pion/rtp" "github.com/pion/rtp"
"github.com/pion/webrtc/v4/pkg/media" "github.com/pion/webrtc/v4/pkg/media"
) )
@@ -16,7 +17,7 @@ import (
type SampleBuilder struct { type SampleBuilder struct {
maxLate uint16 // how many packets to wait until we get a valid Sample maxLate uint16 // how many packets to wait until we get a valid Sample
maxLateTimestamp uint32 // max timestamp between old and new timestamps before dropping packets maxLateTimestamp uint32 // max timestamp between old and new timestamps before dropping packets
buffer [math.MaxUint16 + 1]*rtp.Packet buffer *jitterbuffer.JitterBuffer
preparedSamples [math.MaxUint16 + 1]*media.Sample preparedSamples [math.MaxUint16 + 1]*media.Sample
// Interface that allows us to take RTP packets to samples // Interface that allows us to take RTP packets to samples
@@ -60,7 +61,7 @@ type SampleBuilder struct {
// The depacketizer extracts media samples from RTP packets. // The depacketizer extracts media samples from RTP packets.
// Several depacketizers are available in package github.com/pion/rtp/codecs. // Several depacketizers are available in package github.com/pion/rtp/codecs.
func New(maxLate uint16, depacketizer rtp.Depacketizer, sampleRate uint32, opts ...Option) *SampleBuilder { func New(maxLate uint16, depacketizer rtp.Depacketizer, sampleRate uint32, opts ...Option) *SampleBuilder {
s := &SampleBuilder{maxLate: maxLate, depacketizer: depacketizer, sampleRate: sampleRate} s := &SampleBuilder{maxLate: maxLate, depacketizer: depacketizer, sampleRate: sampleRate, buffer: jitterbuffer.New(jitterbuffer.WithMinimumPacketCount(1))}
for _, o := range opts { for _, o := range opts {
o(s) o(s)
} }
@@ -76,7 +77,7 @@ func (s *SampleBuilder) tooOld(location sampleSequenceLocation) bool {
var foundTail *rtp.Packet var foundTail *rtp.Packet
for i := location.head; i != location.tail; i++ { for i := location.head; i != location.tail; i++ {
if packet := s.buffer[i]; packet != nil { if packet, _ := s.buffer.PeekAtSequence(i); packet != nil {
foundHead = packet foundHead = packet
break break
} }
@@ -87,7 +88,7 @@ func (s *SampleBuilder) tooOld(location sampleSequenceLocation) bool {
} }
for i := location.tail - 1; i != location.head; i-- { for i := location.tail - 1; i != location.head; i-- {
if packet := s.buffer[i]; packet != nil { if packet, _ := s.buffer.PeekAtSequence(i); packet != nil {
foundTail = packet foundTail = packet
break break
} }
@@ -105,8 +106,8 @@ func (s *SampleBuilder) fetchTimestamp(location sampleSequenceLocation) (timesta
if location.empty() { if location.empty() {
return 0, false return 0, false
} }
packet := s.buffer[location.head] packet, err := s.buffer.PeekAtSequence(location.head)
if packet == nil { if packet == nil || err != nil {
return 0, false return 0, false
} }
return packet.Timestamp, true return packet.Timestamp, true
@@ -114,7 +115,7 @@ func (s *SampleBuilder) fetchTimestamp(location sampleSequenceLocation) (timesta
func (s *SampleBuilder) releasePacket(i uint16) { func (s *SampleBuilder) releasePacket(i uint16) {
var p *rtp.Packet var p *rtp.Packet
p, s.buffer[i] = s.buffer[i], nil p, _ = s.buffer.PopAtSequence(i)
if p != nil && s.packetReleaseHandler != nil { if p != nil && s.packetReleaseHandler != nil {
s.packetReleaseHandler(p) s.packetReleaseHandler(p)
} }
@@ -178,7 +179,7 @@ func (s *SampleBuilder) purgeBuffers(flush bool) {
// Push does not copy the input. If you wish to reuse // Push does not copy the input. If you wish to reuse
// this memory make sure to copy before calling Push // this memory make sure to copy before calling Push
func (s *SampleBuilder) Push(p *rtp.Packet) { func (s *SampleBuilder) Push(p *rtp.Packet) {
s.buffer[p.SequenceNumber] = p s.buffer.Push(p)
switch s.filled.compare(p.SequenceNumber) { switch s.filled.compare(p.SequenceNumber) {
case slCompareVoid: case slCompareVoid:
@@ -220,14 +221,19 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample {
var consume sampleSequenceLocation var consume sampleSequenceLocation
for i := s.active.head; s.buffer[i] != nil && s.active.compare(i) != slCompareAfter; i++ { for i := s.active.head; s.active.compare(i) != slCompareAfter; i++ {
if s.depacketizer.IsPartitionTail(s.buffer[i].Marker, s.buffer[i].Payload) { pkt, err := s.buffer.PeekAtSequence(i)
if pkt == nil || err != nil {
break
}
if s.depacketizer.IsPartitionTail(pkt.Marker, pkt.Payload) {
consume.head = s.active.head consume.head = s.active.head
consume.tail = i + 1 consume.tail = i + 1
break break
} }
headTimestamp, hasData := s.fetchTimestamp(s.active) headTimestamp, hasData := s.fetchTimestamp(s.active)
if hasData && s.buffer[i].Timestamp != headTimestamp { if hasData && pkt.Timestamp != headTimestamp {
consume.head = s.active.head consume.head = s.active.head
consume.tail = i consume.tail = i
break break
@@ -237,8 +243,8 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample {
if consume.empty() { if consume.empty() {
return nil return nil
} }
pkt, _ := s.buffer.PeekAtSequence(consume.tail)
if !purgingBuffers && s.buffer[consume.tail] == nil { if !purgingBuffers && pkt == nil {
// wait for the next packet after this set of packets to arrive // wait for the next packet after this set of packets to arrive
// to ensure at least one post sample timestamp is known // to ensure at least one post sample timestamp is known
// (unless we have to release right now) // (unless we have to release right now)
@@ -250,8 +256,10 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample {
// scan for any packet after the current and use that time stamp as the diff point // scan for any packet after the current and use that time stamp as the diff point
for i := consume.tail; i < s.active.tail; i++ { for i := consume.tail; i < s.active.tail; i++ {
if s.buffer[i] != nil { pkt, _ = s.buffer.PeekAtSequence(i)
afterTimestamp = s.buffer[i].Timestamp
if pkt != nil {
afterTimestamp = pkt.Timestamp
break break
} }
} }
@@ -261,10 +269,11 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample {
// prior to decoding all the packets, check if this packet // prior to decoding all the packets, check if this packet
// would end being disposed anyway // would end being disposed anyway
if !s.depacketizer.IsPartitionHead(s.buffer[consume.head].Payload) { pkt, err := s.buffer.PeekAtSequence(consume.head)
if err == nil && !s.depacketizer.IsPartitionHead(pkt.Payload) {
isPadding := false isPadding := false
for i := consume.head; i != consume.tail; i++ { for i := consume.head; i != consume.tail; i++ {
if s.lastSampleTimestamp != nil && *s.lastSampleTimestamp == s.buffer[i].Timestamp && len(s.buffer[i].Payload) == 0 { if s.lastSampleTimestamp != nil && *s.lastSampleTimestamp == pkt.Timestamp && len(pkt.Payload) == 0 {
isPadding = true isPadding = true
} }
} }
@@ -282,7 +291,11 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample {
var metadata interface{} var metadata interface{}
var rtpHeaders []*rtp.Header var rtpHeaders []*rtp.Header
for i := consume.head; i != consume.tail; i++ { for i := consume.head; i != consume.tail; i++ {
p, err := s.depacketizer.Unmarshal(s.buffer[i].Payload) pkt, err := s.buffer.PeekAtSequence(i)
if err != nil {
return nil
}
p, err := s.depacketizer.Unmarshal(pkt.Payload)
if err != nil { if err != nil {
return nil return nil
} }
@@ -290,9 +303,11 @@ func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample {
metadata = s.packetHeadHandler(s.depacketizer) metadata = s.packetHeadHandler(s.depacketizer)
} }
if s.returnRTPHeaders { if s.returnRTPHeaders {
h := s.buffer[i].Header.Clone() if packet, _ := s.buffer.PeekAtSequence(i); packet != nil {
h := pkt.Header.Clone()
rtpHeaders = append(rtpHeaders, &h) rtpHeaders = append(rtpHeaders, &h)
} }
}
data = append(data, p...) data = append(data, p...)
} }
@@ -389,3 +404,11 @@ func WithRTPHeaders(enable bool) Option {
o.returnRTPHeaders = enable o.returnRTPHeaders = enable
} }
} }
// WithJitterBufferMinimumLength sets the minimum number of packets which must first
// be received before starting any playback
func WithJitterBufferMinimumLength(length uint16) Option {
return func(o *SampleBuilder) {
o.buffer = jitterbuffer.New(jitterbuffer.WithMinimumPacketCount(length))
}
}

View File

@@ -394,14 +394,18 @@ func TestSampleBuilderCleanReference(t *testing.T) {
s.Push(pkt5) s.Push(pkt5)
for i := 0; i < 3; i++ { for i := 0; i < 3; i++ {
if s.buffer[(i+int(seqStart))%0x10000] != nil { pkt, err := s.buffer.PeekAtSequence(uint16((i + int(seqStart)) % 0x10000))
if pkt != nil || err == nil {
t.Errorf("Old packet (%d) is not unreferenced (maxLate: 10, pushed: 12)", i) t.Errorf("Old packet (%d) is not unreferenced (maxLate: 10, pushed: 12)", i)
} }
} }
if s.buffer[(14+int(seqStart))%0x10000] != pkt4 { pkt, err := s.buffer.PeekAtSequence(uint16((14 + int(seqStart)) % 0x10000))
if pkt != pkt4 || err != nil {
t.Error("New packet must be referenced after jump") t.Error("New packet must be referenced after jump")
} }
if s.buffer[(12+int(seqStart))%0x10000] != pkt5 { pkt, err = s.buffer.PeekAtSequence(uint16((12 + int(seqStart)) % 0x10000))
if pkt != pkt5 || err != nil {
t.Error("New packet must be referenced after jump") t.Error("New packet must be referenced after jump")
} }
}) })