Detect partition head in SampleBuilder

Add functional option to set PartitionHeadChecker to detect the
beginning of the frame partition and avoid dropping the first
packet.
This commit is contained in:
Atsushi Watanabe
2019-12-09 18:33:14 +09:00
parent 537c841073
commit b361a21c54
2 changed files with 104 additions and 20 deletions

View File

@@ -25,11 +25,18 @@ type SampleBuilder struct {
isContiguous bool isContiguous bool
lastPopSeq uint16 lastPopSeq uint16
lastPopTimestamp uint32 lastPopTimestamp uint32
// Interface that checks whether the packet is the first fragment of the frame or not
partitionHeadChecker rtp.PartitionHeadChecker
} }
// New constructs a new SampleBuilder // New constructs a new SampleBuilder
func New(maxLate uint16, depacketizer rtp.Depacketizer) *SampleBuilder { func New(maxLate uint16, depacketizer rtp.Depacketizer, opts ...Option) *SampleBuilder {
return &SampleBuilder{maxLate: maxLate, depacketizer: depacketizer} s := &SampleBuilder{maxLate: maxLate, depacketizer: depacketizer}
for _, o := range opts {
o(s)
}
return s
} }
// Push adds a RTP Packet to the sample builder // Push adds a RTP Packet to the sample builder
@@ -47,9 +54,14 @@ func (s *SampleBuilder) buildSample(firstBuffer uint16) (*media.Sample, uint32)
for i := firstBuffer; s.buffer[i] != nil; i++ { for i := firstBuffer; s.buffer[i] != nil; i++ {
if s.buffer[i].Timestamp != s.buffer[firstBuffer].Timestamp { if s.buffer[i].Timestamp != s.buffer[firstBuffer].Timestamp {
lastTimeStamp := s.lastPopTimestamp lastTimeStamp := s.lastPopTimestamp
if !s.isContiguous && s.buffer[firstBuffer-1] != nil { if !s.isContiguous {
// firstBuffer-1 should always pass, but just to be safe if there is a bug in Pop() if s.buffer[firstBuffer-1] != nil {
lastTimeStamp = s.buffer[firstBuffer-1].Timestamp lastTimeStamp = s.buffer[firstBuffer-1].Timestamp
} else {
// If PartitionHeadChecker detects that the first packet is a head,
// the duration of the packet is not guessable
lastTimeStamp = s.buffer[firstBuffer].Timestamp
}
} }
samples := s.buffer[i-1].Timestamp - lastTimeStamp samples := s.buffer[i-1].Timestamp - lastTimeStamp
@@ -106,18 +118,21 @@ func (s *SampleBuilder) PopWithTimestamp() (*media.Sample, uint32) {
for ; i != s.lastPush; i++ { for ; i != s.lastPush; i++ {
curr := s.buffer[i] curr := s.buffer[i]
if curr == nil { if curr == nil {
if s.buffer[i-1] != nil {
break // there is a gap, we can't proceed
}
continue // we haven't hit a buffer yet, keep moving continue // we haven't hit a buffer yet, keep moving
} }
if !s.isContiguous { if !s.isContiguous {
if s.buffer[i-1] == nil { if s.partitionHeadChecker == nil {
continue // We have never popped a buffer, so we can't assert that the first RTP packet we encounter is valid if s.buffer[i-1] == nil {
} else if s.buffer[i-1].Timestamp == curr.Timestamp { continue // We have never popped a buffer, so we can't assert that the first RTP packet we encounter is valid
continue // We have the same timestamps, so it is data that spans multiple RTP packets } else if s.buffer[i-1].Timestamp == curr.Timestamp {
continue // We have the same timestamps, so it is data that spans multiple RTP packets
}
} else {
if !s.partitionHeadChecker.IsPartitionHead(curr.Payload) {
continue
}
// We can start using this frame as it is a head of frame partition
} }
} }
@@ -126,3 +141,13 @@ func (s *SampleBuilder) PopWithTimestamp() (*media.Sample, uint32) {
} }
return nil, 0 return nil, 0
} }
// Option configures SampleBuilder
type Option func(o *SampleBuilder)
// WithPartitionHeadChecker assigns codec specific PartitionHeadChecker to SampleBuilder
func WithPartitionHeadChecker(checker rtp.PartitionHeadChecker) Option {
return func(o *SampleBuilder) {
o.partitionHeadChecker = checker
}
}

View File

@@ -9,11 +9,13 @@ import (
) )
type sampleBuilderTest struct { type sampleBuilderTest struct {
message string message string
packets []*rtp.Packet packets []*rtp.Packet
samples []*media.Sample withHeadChecker bool
timestamps []uint32 headBytes []byte
maxLate uint16 samples []*media.Sample
timestamps []uint32
maxLate uint16
} }
type fakeDepacketizer struct { type fakeDepacketizer struct {
@@ -23,6 +25,19 @@ func (f *fakeDepacketizer) Unmarshal(r []byte) ([]byte, error) {
return r, nil return r, nil
} }
type fakePartitionHeadChecker struct {
headBytes []byte
}
func (f *fakePartitionHeadChecker) IsPartitionHead(payload []byte) bool {
for _, b := range f.headBytes {
if payload[0] == b {
return true
}
}
return false
}
func TestSampleBuilder(t *testing.T) { func TestSampleBuilder(t *testing.T) {
testData := []sampleBuilderTest{ testData := []sampleBuilderTest{
{ {
@@ -76,6 +91,36 @@ func TestSampleBuilder(t *testing.T) {
timestamps: []uint32{}, timestamps: []uint32{},
maxLate: 50, maxLate: 50,
}, },
{
message: "SampleBuilder should emit a packet after a gap if PartitionHeadChecker assumes it head",
packets: []*rtp.Packet{
{Header: rtp.Header{SequenceNumber: 5000, Timestamp: 5}, Payload: []byte{0x01}},
{Header: rtp.Header{SequenceNumber: 5007, Timestamp: 6}, Payload: []byte{0x02}},
{Header: rtp.Header{SequenceNumber: 5008, Timestamp: 7}, Payload: []byte{0x03}},
},
withHeadChecker: true,
headBytes: []byte{0x02},
samples: []*media.Sample{
{Data: []byte{0x02}, Samples: 0},
},
timestamps: []uint32{
6,
},
maxLate: 50,
},
{
message: "SampleBuilder shouldn't emit a packet after a gap if PartitionHeadChecker doesn't assume it head",
packets: []*rtp.Packet{
{Header: rtp.Header{SequenceNumber: 5000, Timestamp: 5}, Payload: []byte{0x01}},
{Header: rtp.Header{SequenceNumber: 5007, Timestamp: 6}, Payload: []byte{0x02}},
{Header: rtp.Header{SequenceNumber: 5008, Timestamp: 7}, Payload: []byte{0x03}},
},
withHeadChecker: true,
headBytes: []byte{},
samples: []*media.Sample{},
timestamps: []uint32{},
maxLate: 50,
},
{ {
message: "SampleBuilder should emit multiple valid packets", message: "SampleBuilder should emit multiple valid packets",
packets: []*rtp.Packet{ packets: []*rtp.Packet{
@@ -106,7 +151,14 @@ func TestSampleBuilder(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
for _, t := range testData { for _, t := range testData {
s := New(t.maxLate, &fakeDepacketizer{}) var opts []Option
if t.withHeadChecker {
opts = append(opts, WithPartitionHeadChecker(
&fakePartitionHeadChecker{headBytes: t.headBytes},
))
}
s := New(t.maxLate, &fakeDepacketizer{}, opts...)
samples := []*media.Sample{} samples := []*media.Sample{}
for _, p := range t.packets { for _, p := range t.packets {
@@ -123,7 +175,14 @@ func TestSampleBuilder(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
for _, t := range testData { for _, t := range testData {
s := New(t.maxLate, &fakeDepacketizer{}) var opts []Option
if t.withHeadChecker {
opts = append(opts, WithPartitionHeadChecker(
&fakePartitionHeadChecker{headBytes: t.headBytes},
))
}
s := New(t.maxLate, &fakeDepacketizer{}, opts...)
samples := []*media.Sample{} samples := []*media.Sample{}
timestamps := []uint32{} timestamps := []uint32{}