diff --git a/pkg/format/rtph264/decoder.go b/pkg/format/rtph264/decoder.go index 519c045d..725ecd5a 100644 --- a/pkg/format/rtph264/decoder.go +++ b/pkg/format/rtph264/decoder.go @@ -59,9 +59,10 @@ type Decoder struct { annexBMode bool // for Decode() - frameBuffer [][]byte - frameBufferLen int - frameBufferSize int + frameBuffer [][]byte + frameBufferLen int + frameBufferSize int + frameBufferTimestamp uint32 } // Init initializes the decoder. @@ -222,12 +223,46 @@ func (d *Decoder) Decode(pkt *rtp.Packet) ([][]byte, error) { } l := len(nalus) + // support splitting access units by timestamp. + // (some cameras do not use the Marker field, like the FLIR M400) + if d.frameBuffer != nil && pkt.Timestamp != d.frameBufferTimestamp { + ret := d.frameBuffer + d.resetFrameBuffer() + + err = d.addToFrameBuffer(nalus, l, pkt.Timestamp) + if err != nil { + return nil, err + } + + return ret, nil + } + + err = d.addToFrameBuffer(nalus, l, pkt.Timestamp) + if err != nil { + return nil, err + } + + if !pkt.Marker { + return nil, ErrMorePacketsNeeded + } + + ret := d.frameBuffer + d.resetFrameBuffer() + + return ret, nil +} + +func (d *Decoder) resetFrameBuffer() { + d.frameBuffer = nil // do not reuse frameBuffer to avoid race conditions + d.frameBufferLen = 0 + d.frameBufferSize = 0 +} + +func (d *Decoder) addToFrameBuffer(nalus [][]byte, l int, ts uint32) error { if (d.frameBufferLen + l) > h264.MaxNALUsPerAccessUnit { errCount := d.frameBufferLen + l - d.frameBuffer = nil - d.frameBufferLen = 0 - d.frameBufferSize = 0 - return nil, fmt.Errorf("NALU count (%d) exceeds maximum allowed (%d)", + d.resetFrameBuffer() + return fmt.Errorf("NALU count (%d) exceeds maximum allowed (%d)", errCount, h264.MaxNALUsPerAccessUnit) } @@ -235,29 +270,16 @@ func (d *Decoder) Decode(pkt *rtp.Packet) ([][]byte, error) { if (d.frameBufferSize + addSize) > h264.MaxAccessUnitSize { errSize := d.frameBufferSize + addSize - d.frameBuffer = nil - d.frameBufferLen = 0 - d.frameBufferSize = 0 - return nil, fmt.Errorf("access unit size (%d) is too big, maximum is %d", + d.resetFrameBuffer() + return fmt.Errorf("access unit size (%d) is too big, maximum is %d", errSize, h264.MaxAccessUnitSize) } d.frameBuffer = append(d.frameBuffer, nalus...) d.frameBufferLen += l d.frameBufferSize += addSize - - if !pkt.Marker { - return nil, ErrMorePacketsNeeded - } - - ret := d.frameBuffer - - // do not reuse frameBuffer to avoid race conditions - d.frameBuffer = nil - d.frameBufferLen = 0 - d.frameBufferSize = 0 - - return ret, nil + d.frameBufferTimestamp = ts + return nil } // some cameras / servers wrap NALUs into Annex-B diff --git a/pkg/format/rtph264/decoder_test.go b/pkg/format/rtph264/decoder_test.go index 93a79016..25ad7748 100644 --- a/pkg/format/rtph264/decoder_test.go +++ b/pkg/format/rtph264/decoder_test.go @@ -189,37 +189,86 @@ func TestDecodeAnnexB(t *testing.T) { } func TestDecodeAccessUnit(t *testing.T) { - d := &Decoder{} - err := d.Init() - require.NoError(t, err) - - nalus, err := d.Decode(&rtp.Packet{ - Header: rtp.Header{ - Version: 2, - Marker: false, - PayloadType: 96, - SequenceNumber: 17647, - Timestamp: 2289531307, - SSRC: 0x9dbb7812, + for _, ca := range []struct { + name string + pkts []*rtp.Packet + au [][]byte + }{ + { + "marker-splitted", + []*rtp.Packet{ + { + Header: rtp.Header{ + Version: 2, + Marker: false, + PayloadType: 96, + SequenceNumber: 17647, + Timestamp: 2289531307, + SSRC: 0x9dbb7812, + }, + Payload: []byte{1, 2}, + }, + { + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 17647, + Timestamp: 2289531307, + SSRC: 0x9dbb7812, + }, + Payload: []byte{3, 4}, + }, + }, + [][]byte{{1, 2}, {3, 4}}, }, - Payload: []byte{0x01, 0x02}, - }) - require.Equal(t, ErrMorePacketsNeeded, err) - require.Equal(t, [][]byte(nil), nalus) - - nalus, err = d.Decode(&rtp.Packet{ - Header: rtp.Header{ - Version: 2, - Marker: true, - PayloadType: 96, - SequenceNumber: 17647, - Timestamp: 2289531307, - SSRC: 0x9dbb7812, + { + "timestamp-splitted (FLIR M400)", + []*rtp.Packet{ + { + Header: rtp.Header{ + Version: 2, + Marker: false, + PayloadType: 96, + SequenceNumber: 17647, + Timestamp: 2289531307, + SSRC: 0x9dbb7812, + }, + Payload: []byte{1, 2}, + }, + { + Header: rtp.Header{ + Version: 2, + Marker: false, + PayloadType: 96, + SequenceNumber: 17647, + Timestamp: 2289531308, + SSRC: 0x9dbb7812, + }, + Payload: []byte{3, 4}, + }, + }, + [][]byte{{1, 2}}, }, - Payload: []byte{0x01, 0x02}, - }) - require.NoError(t, err) - require.Equal(t, [][]byte{{0x01, 0x02}, {0x01, 0x02}}, nalus) + } { + t.Run(ca.name, func(t *testing.T) { + d := &Decoder{} + err := d.Init() + require.NoError(t, err) + + var au [][]byte + + for i, pkt := range ca.pkts { + au, err = d.Decode(pkt) + if i != len(ca.pkts)-1 { + require.Equal(t, ErrMorePacketsNeeded, err) + } else { + require.NoError(t, err) + require.Equal(t, ca.au, au) + } + } + }) + } } func TestDecoderErrorNALUSize(t *testing.T) {