rtph264: support encoding aggregated packets

Author: aler9
Date: 2021-03-24 13:17:54 +01:00
parent 404ff8826a
commit ea6605c199
3 changed files with 273 additions and 95 deletions

View File

@@ -63,15 +63,17 @@ func (e *Encoder) Encode(at *AUAndTimestamp) ([]byte, error) {
return nil, fmt.Errorf("data is too big")
}
payload := make([]byte, 4+len(at.AU))
// AU-headers-length
- payload := []byte{0x00, 0x10}
payload[0] = 0x00
payload[1] = 0x10
// AU-header
- header := make([]byte, 2)
- binary.BigEndian.PutUint16(header, uint16(len(at.AU))<<3)
- payload = append(payload, header...)
binary.BigEndian.PutUint16(payload[2:], uint16(len(at.AU))<<3)
- payload = append(payload, at.AU...)
// AU
copy(payload[4:], at.AU)
rpkt := rtp.Packet{
Header: rtp.Header{
@@ -80,11 +82,11 @@ func (e *Encoder) Encode(at *AUAndTimestamp) ([]byte, error) {
SequenceNumber: e.sequenceNumber,
Timestamp: e.encodeTimestamp(at.Timestamp),
SSRC: e.ssrc,
Marker: true,
},
Payload: payload,
}
e.sequenceNumber++
- rpkt.Header.Marker = true
frame, err := rpkt.Marshal()
if err != nil {
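The rewritten block above builds the whole AU payload in one allocation: a 2-byte AU-headers-length field (16 bits), one 2-byte AU-header whose upper 13 bits carry the AU size (hence the <<3, the low 3 bits being the AU-index, here 0), then the AU itself. A minimal standalone sketch of that layout, assuming the same 13-bit-size / 3-bit-index AU-header format used here (as in RFC 3640's AAC-hbr mode); buildPayload is only an illustration, not part of the library:

package main

import (
    "encoding/binary"
    "fmt"
)

// buildPayload mirrors the layout assembled above: AU-headers-length,
// one AU-header (13-bit size, 3-bit index = 0), then the AU.
func buildPayload(au []byte) []byte {
    payload := make([]byte, 4+len(au))
    binary.BigEndian.PutUint16(payload[0:], 16)                 // AU-headers-length: one 16-bit AU-header
    binary.BigEndian.PutUint16(payload[2:], uint16(len(au))<<3) // AU-size in the upper 13 bits
    copy(payload[4:], au)
    return payload
}

func main() {
    // a 2-byte dummy AU produces: 00 10 00 10 ab cd
    fmt.Printf("% x\n", buildPayload([]byte{0xab, 0xcd}))
}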

View File

@@ -1,6 +1,7 @@
package rtph264
import (
"encoding/binary"
"math/rand"
"time"
@@ -55,16 +56,60 @@ func (e *Encoder) encodeTimestamp(ts time.Duration) uint32 {
return e.initialTs + uint32(ts.Seconds()*rtpClockRate)
}
- // Encode encodes a NALU into RTP/H264 packets.
// Encode encodes NALUs into RTP/H264 packets.
// It always returns at least one RTP/H264 packet.
- func (e *Encoder) Encode(nt *NALUAndTimestamp) ([][]byte, error) {
- // if the NALU fits into a single RTP packet, use a single payload
- if len(nt.NALU) < rtpPayloadMaxSize {
- return e.writeSingle(nt)
// RTP/H264 packets can be:
// * single
// * fragmented (FU-A)
// * aggregated (STAP-A)
func (e *Encoder) Encode(nts []*NALUAndTimestamp) ([][]byte, error) {
var rets [][]byte
var batch []*NALUAndTimestamp
// split packets into batches
for _, nt := range nts {
// packets can be contained into a single aggregation unit
if e.lenAggregated(batch, nt) < rtpPayloadMaxSize {
// add packet to batch
batch = append(batch, nt)
} else {
// write last batch
if batch != nil {
pkts, err := e.writeBatch(batch)
if err != nil {
return nil, err
}
rets = append(rets, pkts...)
}
- // otherwise, split the NALU into multiple fragmentation payloads
- return e.writeFragmented(nt)
// initialize new batch
batch = []*NALUAndTimestamp{nt}
}
}
// write last batch
pkts, err := e.writeBatch(batch)
if err != nil {
return nil, err
}
rets = append(rets, pkts...)
return rets, nil
}
func (e *Encoder) writeBatch(nts []*NALUAndTimestamp) ([][]byte, error) {
if len(nts) == 1 {
// the NALU fits into a single RTP packet, use a single payload
if len(nts[0].NALU) < rtpPayloadMaxSize {
return e.writeSingle(nts[0])
}
// split the NALU into multiple fragmentation payloads
return e.writeFragmented(nts[0])
}
return e.writeAggregated(nts)
}
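At the call site, the visible change is that Encode now takes a slice: all NALUs that share a timestamp (typically one access unit) are passed in together, and the encoder decides per batch whether to emit single, STAP-A or FU-A payloads. A minimal usage sketch, written as if it sat next to this package and assuming "time" is imported; encodeAccessUnit is a hypothetical helper, not part of the API:

func encodeAccessUnit(e *Encoder, ts time.Duration, nalus [][]byte) ([][]byte, error) {
    nts := make([]*NALUAndTimestamp, len(nalus))
    for i, n := range nalus {
        nts[i] = &NALUAndTimestamp{Timestamp: ts, NALU: n}
    }
    // each returned element is one marshaled RTP packet: small NALUs are
    // grouped into STAP-A payloads, oversized ones are split into FU-A fragments
    return e.Encode(nts)
}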
func (e *Encoder) writeSingle(nt *NALUAndTimestamp) ([][]byte, error) {
@@ -75,13 +120,12 @@ func (e *Encoder) writeSingle(nt *NALUAndTimestamp) ([][]byte, error) {
SequenceNumber: e.sequenceNumber,
Timestamp: e.encodeTimestamp(nt.Timestamp),
SSRC: e.ssrc,
Marker: true,
},
Payload: nt.NALU,
}
e.sequenceNumber++
- rpkt.Header.Marker = true
frame, err := rpkt.Marshal()
if err != nil {
return nil, err
@@ -152,3 +196,54 @@ func (e *Encoder) writeFragmented(nt *NALUAndTimestamp) ([][]byte, error) {
return ret, nil
}
func (e *Encoder) lenAggregated(batch []*NALUAndTimestamp, additionalEl *NALUAndTimestamp) int {
ret := 1 // header
for _, bnt := range batch {
ret += 2 // size
ret += len(bnt.NALU) // unit
}
if additionalEl != nil {
ret += 2 // size
ret += len(additionalEl.NALU) // unit
}
return ret
}
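As a concrete example, for the two NALUs of the "aggregated" test case below (2 and 68 bytes) this accounting gives 1 + (2 + 2) + (2 + 68) = 75 bytes, well under rtpPayloadMaxSize, so both NALUs are batched into a single aggregated packet whose payload is exactly those 75 bytes.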
func (e *Encoder) writeAggregated(nts []*NALUAndTimestamp) ([][]byte, error) {
payload := make([]byte, e.lenAggregated(nts, nil))
payload[0] = uint8(codech264.NALUTypeStapA) // header
pos := 1
for _, nt := range nts {
naluLen := len(nt.NALU)
binary.BigEndian.PutUint16(payload[pos:], uint16(naluLen))
pos += 2
copy(payload[pos:], nt.NALU)
pos += naluLen
}
rpkt := rtp.Packet{
Header: rtp.Header{
Version: rtpVersion,
PayloadType: e.payloadType,
SequenceNumber: e.sequenceNumber,
Timestamp: e.encodeTimestamp(nts[0].Timestamp),
SSRC: e.ssrc,
Marker: true,
},
Payload: payload,
}
e.sequenceNumber++
frame, err := rpkt.Marshal()
if err != nil {
return nil, err
}
return [][]byte{frame}, nil
}
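The payload built here follows the STAP-A layout of RFC 6184: one NAL header byte with type 24, followed by a (16-bit big-endian size, NALU) pair per aggregated unit. For illustration only, a sketch of the reverse operation (this is not the package's decoder; it assumes "encoding/binary" and "fmt" are imported):

// splitSTAPA parses a STAP-A payload back into its NALUs:
// it skips the 1-byte STAP-A header, then reads (size, NALU) pairs.
func splitSTAPA(payload []byte) ([][]byte, error) {
    if len(payload) < 1 {
        return nil, fmt.Errorf("empty payload")
    }
    var nalus [][]byte
    pos := 1 // skip the STAP-A NAL header (type 24)
    for pos < len(payload) {
        if len(payload)-pos < 2 {
            return nil, fmt.Errorf("truncated NALU size")
        }
        size := int(binary.BigEndian.Uint16(payload[pos:]))
        pos += 2
        if size == 0 || len(payload)-pos < size {
            return nil, fmt.Errorf("invalid NALU size")
        }
        nalus = append(nalus, payload[pos:pos+size])
        pos += size
    }
    return nalus, nil
}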

View File

@@ -33,18 +33,20 @@ func (f readerFunc) Read(p []byte) (int, error) {
var cases = []struct {
name string
- dec *NALUAndTimestamp
dec []*NALUAndTimestamp
enc [][]byte
}{
{
"single",
- &NALUAndTimestamp{
[]*NALUAndTimestamp{
{
Timestamp: 25 * time.Millisecond,
NALU: mergeBytes(
[]byte{0x05},
bytes.Repeat([]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, 8),
),
},
},
[][]byte{
mergeBytes(
[]byte{
@@ -57,13 +59,15 @@ var cases = []struct {
},
{
"negative timestamp",
- &NALUAndTimestamp{
[]*NALUAndTimestamp{
{
Timestamp: -20 * time.Millisecond,
NALU: mergeBytes(
[]byte{0x05},
bytes.Repeat([]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, 8),
),
},
},
[][]byte{
mergeBytes(
[]byte{
@@ -76,33 +80,159 @@ var cases = []struct {
},
{
"fragmented",
- &NALUAndTimestamp{
[]*NALUAndTimestamp{
{
Timestamp: 55 * time.Millisecond,
NALU: mergeBytes(
[]byte{0x05},
bytes.Repeat([]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, 256),
),
},
},
[][]byte{
mergeBytes(
[]byte{
0x80, 0x60, 0x44, 0xed, 0x88, 0x77, 0x79, 0xab,
- 0x9d, 0xbb, 0x78, 0x12, 0x1c, 0x85, 0x00, 0x01,
- 0x02, 0x03, 0x04, 0x05, 0x06, 0x07,
0x9d, 0xbb, 0x78, 0x12, 0x1c, 0x85,
},
- bytes.Repeat([]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, 181),
bytes.Repeat([]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, 182),
[]byte{0x00, 0x01},
),
mergeBytes(
[]byte{
0x80, 0xe0, 0x44, 0xee, 0x88, 0x77, 0x79, 0xab,
- 0x9d, 0xbb, 0x78, 0x12, 0x1c, 0x45, 0x02, 0x03,
- 0x04, 0x05, 0x06, 0x07,
0x9d, 0xbb, 0x78, 0x12, 0x1c, 0x45,
},
[]byte{0x02, 0x03, 0x04, 0x05, 0x06, 0x07},
bytes.Repeat([]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, 73),
),
},
},
{
"aggregated",
[]*NALUAndTimestamp{
{
NALU: []byte{0x09, 0xF0},
},
{
NALU: []byte{
0x41, 0x9a, 0x24, 0x6c, 0x41, 0x4f, 0xfe, 0xd6,
0x8c, 0xb0, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03,
0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00,
0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00,
0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03,
0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00,
0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00,
0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03,
0x00, 0x00, 0x6d, 0x40,
},
},
},
[][]byte{
{
0x80, 0xe0, 0x44, 0xed, 0x88, 0x77, 0x66, 0x55,
0x9d, 0xbb, 0x78, 0x12, 0x18, 0x00, 0x02, 0x09,
0xf0, 0x00, 0x44, 0x41, 0x9a, 0x24, 0x6c, 0x41,
0x4f, 0xfe, 0xd6, 0x8c, 0xb0, 0x00, 0x00, 0x03,
0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00,
0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00,
0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03,
0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00,
0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00,
0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03,
0x00, 0x00, 0x03, 0x00, 0x00, 0x6d, 0x40,
},
},
},
{
"aggregated followed by single",
[]*NALUAndTimestamp{
{
NALU: []byte{0x09, 0xF0},
},
{
NALU: []byte{
0x41, 0x9a, 0x24, 0x6c, 0x41, 0x4f, 0xfe, 0xd6,
0x8c, 0xb0, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03,
0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00,
0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00,
0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03,
0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00,
0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00,
0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03,
0x00, 0x00, 0x6d, 0x40,
},
},
{
NALU: mergeBytes(
[]byte{0x08},
bytes.Repeat([]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, 175),
),
},
},
[][]byte{
{
0x80, 0xe0, 0x44, 0xed, 0x88, 0x77, 0x66, 0x55,
0x9d, 0xbb, 0x78, 0x12, 0x18, 0x00, 0x02, 0x09,
0xf0, 0x00, 0x44, 0x41, 0x9a, 0x24, 0x6c, 0x41,
0x4f, 0xfe, 0xd6, 0x8c, 0xb0, 0x00, 0x00, 0x03,
0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00,
0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00,
0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03,
0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00,
0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00,
0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03,
0x00, 0x00, 0x03, 0x00, 0x00, 0x6d, 0x40,
},
mergeBytes(
[]byte{
0x80, 0xe0, 0x44, 0xee, 0x88, 0x77, 0x66, 0x55,
0x9d, 0xbb, 0x78, 0x12, 0x08,
},
bytes.Repeat([]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, 175),
),
},
},
{
"fragmented followed by aggregated",
[]*NALUAndTimestamp{
{
NALU: mergeBytes(
[]byte{0x05},
bytes.Repeat([]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, 256),
),
},
{
NALU: []byte{0x09, 0xF0},
},
{
NALU: []byte{0x09, 0xF0},
},
},
[][]byte{
mergeBytes(
[]byte{
0x80, 0x60, 0x44, 0xed, 0x88, 0x77, 0x66, 0x55,
0x9d, 0xbb, 0x78, 0x12, 0x1c, 0x85,
},
bytes.Repeat([]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, 182),
[]byte{0x00, 0x01},
),
mergeBytes(
[]byte{
0x80, 0xe0, 0x44, 0xee, 0x88, 0x77, 0x66, 0x55,
0x9d, 0xbb, 0x78, 0x12, 0x1c, 0x45,
},
[]byte{0x02, 0x03, 0x04, 0x05, 0x06, 0x07},
bytes.Repeat([]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, 73),
),
{
0x80, 0xe0, 0x44, 0xef, 0x88, 0x77, 0x66, 0x55,
0x9d, 0xbb, 0x78, 0x12, 0x18, 0x00, 0x02, 0x09,
0xf0, 0x00, 0x02, 0x09, 0xf0,
},
},
},
}
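In the expected packets of the new cases, the first 12 bytes are the RTP header and the remainder is the H264 payload. For the "aggregated" case, the single expected packet breaks down as:

0x80                     version 2, no padding, no extension, no CSRCs
0xe0                     marker set, payload type 96
0x44, 0xed               sequence number
0x88, 0x77, 0x66, 0x55   timestamp
0x9d, 0xbb, 0x78, 0x12   SSRC
0x18                     STAP-A header (NALU type 24)
0x00, 0x02, 0x09, 0xf0   size 2 + first NALU (access unit delimiter)
0x00, 0x44, 0x41, ...    size 68 + second NALU (coded slice)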
func TestEncode(t *testing.T) {
@@ -135,71 +265,22 @@ func TestDecode(t *testing.T) {
d := NewDecoder()
// send an initial packet downstream
- // in order to correctly compute the timestamp
// in order to compute the timestamp,
// which is relative to the initial packet
_, err := d.Decode([]byte{
0x80, 0xe0, 0x44, 0xed, 0x88, 0x77, 0x66, 0x55,
0x9d, 0xbb, 0x78, 0x12, 0x06, 0x00,
})
require.NoError(t, err)
for _, dec0 := range ca.dec {
dec, err := d.Read(r)
require.NoError(t, err)
- require.Equal(t, ca.dec, dec)
require.Equal(t, dec0, dec)
}
_, err = d.Read(r)
require.Equal(t, io.EOF, err)
})
}
}
- func TestDecodeAggregated(t *testing.T) {
- sent := false
- r := readerFunc(func(p []byte) (int, error) {
- if sent {
- return 0, io.EOF
- }
- sent = true
- pkt := []byte{
- 0x80, 0xe0, 0x0e, 0x6a, 0x48, 0xf1, 0x7d, 0xb9,
- 0x23, 0xe6, 0x5d, 0x50, 0x18, 0x00, 0x02, 0x09,
- 0xf0, 0x00, 0x44, 0x41, 0x9a, 0x24, 0x6c, 0x41,
- 0x4f, 0xfe, 0xd6, 0x8c, 0xb0, 0x00, 0x00, 0x03,
- 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00,
- 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00,
- 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03,
- 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00,
- 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00,
- 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03,
- 0x00, 0x00, 0x03, 0x00, 0x00, 0x6d, 0x40,
- }
- return copy(p, pkt), nil
- })
- d := NewDecoder()
- nt, err := d.Read(r)
- require.NoError(t, err)
- require.Equal(t, &NALUAndTimestamp{
- NALU: []byte{0x09, 0xF0},
- }, nt)
- nt, err = d.Read(r)
- require.NoError(t, err)
- require.Equal(t, &NALUAndTimestamp{
- NALU: []byte{
- 0x41, 0x9a, 0x24, 0x6c, 0x41, 0x4f, 0xfe, 0xd6,
- 0x8c, 0xb0, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03,
- 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00,
- 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00,
- 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03,
- 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00,
- 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00,
- 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03,
- 0x00, 0x00, 0x6d, 0x40,
- },
- }, nt)
- _, err = d.Read(r)
- require.Equal(t, io.EOF, err)
- }