diff --git a/pkg/rtpaac/encoder.go b/pkg/rtpaac/encoder.go index b7928ae4..7b8052f3 100644 --- a/pkg/rtpaac/encoder.go +++ b/pkg/rtpaac/encoder.go @@ -63,15 +63,17 @@ func (e *Encoder) Encode(at *AUAndTimestamp) ([]byte, error) { return nil, fmt.Errorf("data is too big") } + payload := make([]byte, 4+len(at.AU)) + // AU-headers-length - payload := []byte{0x00, 0x10} + payload[0] = 0x00 + payload[1] = 0x10 // AU-header - header := make([]byte, 2) - binary.BigEndian.PutUint16(header, uint16(len(at.AU))<<3) - payload = append(payload, header...) + binary.BigEndian.PutUint16(payload[2:], uint16(len(at.AU))<<3) - payload = append(payload, at.AU...) + // AU + copy(payload[4:], at.AU) rpkt := rtp.Packet{ Header: rtp.Header{ @@ -80,11 +82,11 @@ func (e *Encoder) Encode(at *AUAndTimestamp) ([]byte, error) { SequenceNumber: e.sequenceNumber, Timestamp: e.encodeTimestamp(at.Timestamp), SSRC: e.ssrc, + Marker: true, }, Payload: payload, } e.sequenceNumber++ - rpkt.Header.Marker = true frame, err := rpkt.Marshal() if err != nil { diff --git a/pkg/rtph264/encoder.go b/pkg/rtph264/encoder.go index 7c0510a7..87d504c9 100644 --- a/pkg/rtph264/encoder.go +++ b/pkg/rtph264/encoder.go @@ -1,6 +1,7 @@ package rtph264 import ( + "encoding/binary" "math/rand" "time" @@ -55,16 +56,60 @@ func (e *Encoder) encodeTimestamp(ts time.Duration) uint32 { return e.initialTs + uint32(ts.Seconds()*rtpClockRate) } -// Encode encodes a NALU into RTP/H264 packets. +// Encode encodes NALUs into RTP/H264 packets. // It always returns at least one RTP/H264 packet. -func (e *Encoder) Encode(nt *NALUAndTimestamp) ([][]byte, error) { - // if the NALU fits into a single RTP packet, use a single payload - if len(nt.NALU) < rtpPayloadMaxSize { - return e.writeSingle(nt) +// RTP/H264 packets can be: +// * single +// * fragmented (FU-A) +// * aggregated (STAP-A) +func (e *Encoder) Encode(nts []*NALUAndTimestamp) ([][]byte, error) { + var rets [][]byte + var batch []*NALUAndTimestamp + + // split packets into batches + for _, nt := range nts { + // packets can be contained into a single aggregation unit + if e.lenAggregated(batch, nt) < rtpPayloadMaxSize { + // add packet to batch + batch = append(batch, nt) + + } else { + // write last batch + if batch != nil { + pkts, err := e.writeBatch(batch) + if err != nil { + return nil, err + } + rets = append(rets, pkts...) + } + + // initialize new batch + batch = []*NALUAndTimestamp{nt} + } } - // otherwise, split the NALU into multiple fragmentation payloads - return e.writeFragmented(nt) + // write last batch + pkts, err := e.writeBatch(batch) + if err != nil { + return nil, err + } + rets = append(rets, pkts...) + + return rets, nil +} + +func (e *Encoder) writeBatch(nts []*NALUAndTimestamp) ([][]byte, error) { + if len(nts) == 1 { + // the NALU fits into a single RTP packet, use a single payload + if len(nts[0].NALU) < rtpPayloadMaxSize { + return e.writeSingle(nts[0]) + } + + // split the NALU into multiple fragmentation payloads + return e.writeFragmented(nts[0]) + } + + return e.writeAggregated(nts) } func (e *Encoder) writeSingle(nt *NALUAndTimestamp) ([][]byte, error) { @@ -75,13 +120,12 @@ func (e *Encoder) writeSingle(nt *NALUAndTimestamp) ([][]byte, error) { SequenceNumber: e.sequenceNumber, Timestamp: e.encodeTimestamp(nt.Timestamp), SSRC: e.ssrc, + Marker: true, }, Payload: nt.NALU, } e.sequenceNumber++ - rpkt.Header.Marker = true - frame, err := rpkt.Marshal() if err != nil { return nil, err @@ -152,3 +196,54 @@ func (e *Encoder) writeFragmented(nt *NALUAndTimestamp) ([][]byte, error) { return ret, nil } + +func (e *Encoder) lenAggregated(batch []*NALUAndTimestamp, additionalEl *NALUAndTimestamp) int { + ret := 1 // header + + for _, bnt := range batch { + ret += 2 // size + ret += len(bnt.NALU) // unit + } + + if additionalEl != nil { + ret += 2 // size + ret += len(additionalEl.NALU) // unit + } + + return ret +} + +func (e *Encoder) writeAggregated(nts []*NALUAndTimestamp) ([][]byte, error) { + payload := make([]byte, e.lenAggregated(nts, nil)) + payload[0] = uint8(codech264.NALUTypeStapA) // header + pos := 1 + + for _, nt := range nts { + naluLen := len(nt.NALU) + binary.BigEndian.PutUint16(payload[pos:], uint16(naluLen)) + pos += 2 + + copy(payload[pos:], nt.NALU) + pos += naluLen + } + + rpkt := rtp.Packet{ + Header: rtp.Header{ + Version: rtpVersion, + PayloadType: e.payloadType, + SequenceNumber: e.sequenceNumber, + Timestamp: e.encodeTimestamp(nts[0].Timestamp), + SSRC: e.ssrc, + Marker: true, + }, + Payload: payload, + } + e.sequenceNumber++ + + frame, err := rpkt.Marshal() + if err != nil { + return nil, err + } + + return [][]byte{frame}, nil +} diff --git a/pkg/rtph264/rtph264_test.go b/pkg/rtph264/rtph264_test.go index b4c3ab38..fe79c2c0 100644 --- a/pkg/rtph264/rtph264_test.go +++ b/pkg/rtph264/rtph264_test.go @@ -33,17 +33,19 @@ func (f readerFunc) Read(p []byte) (int, error) { var cases = []struct { name string - dec *NALUAndTimestamp + dec []*NALUAndTimestamp enc [][]byte }{ { "single", - &NALUAndTimestamp{ - Timestamp: 25 * time.Millisecond, - NALU: mergeBytes( - []byte{0x05}, - bytes.Repeat([]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, 8), - ), + []*NALUAndTimestamp{ + { + Timestamp: 25 * time.Millisecond, + NALU: mergeBytes( + []byte{0x05}, + bytes.Repeat([]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, 8), + ), + }, }, [][]byte{ mergeBytes( @@ -57,12 +59,14 @@ var cases = []struct { }, { "negative timestamp", - &NALUAndTimestamp{ - Timestamp: -20 * time.Millisecond, - NALU: mergeBytes( - []byte{0x05}, - bytes.Repeat([]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, 8), - ), + []*NALUAndTimestamp{ + { + Timestamp: -20 * time.Millisecond, + NALU: mergeBytes( + []byte{0x05}, + bytes.Repeat([]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, 8), + ), + }, }, [][]byte{ mergeBytes( @@ -76,33 +80,159 @@ var cases = []struct { }, { "fragmented", - &NALUAndTimestamp{ - Timestamp: 55 * time.Millisecond, - NALU: mergeBytes( - []byte{0x05}, - bytes.Repeat([]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, 256), - ), + []*NALUAndTimestamp{ + { + Timestamp: 55 * time.Millisecond, + NALU: mergeBytes( + []byte{0x05}, + bytes.Repeat([]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, 256), + ), + }, }, [][]byte{ mergeBytes( []byte{ 0x80, 0x60, 0x44, 0xed, 0x88, 0x77, 0x79, 0xab, - 0x9d, 0xbb, 0x78, 0x12, 0x1c, 0x85, 0x00, 0x01, - 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, + 0x9d, 0xbb, 0x78, 0x12, 0x1c, 0x85, }, - bytes.Repeat([]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, 181), + bytes.Repeat([]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, 182), []byte{0x00, 0x01}, ), mergeBytes( []byte{ 0x80, 0xe0, 0x44, 0xee, 0x88, 0x77, 0x79, 0xab, - 0x9d, 0xbb, 0x78, 0x12, 0x1c, 0x45, 0x02, 0x03, - 0x04, 0x05, 0x06, 0x07, + 0x9d, 0xbb, 0x78, 0x12, 0x1c, 0x45, }, + []byte{0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, bytes.Repeat([]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, 73), ), }, }, + { + "aggregated", + []*NALUAndTimestamp{ + { + NALU: []byte{0x09, 0xF0}, + }, + { + NALU: []byte{ + 0x41, 0x9a, 0x24, 0x6c, 0x41, 0x4f, 0xfe, 0xd6, + 0x8c, 0xb0, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, + 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, + 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, + 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, + 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, + 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, + 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, + 0x00, 0x00, 0x6d, 0x40, + }, + }, + }, + [][]byte{ + { + 0x80, 0xe0, 0x44, 0xed, 0x88, 0x77, 0x66, 0x55, + 0x9d, 0xbb, 0x78, 0x12, 0x18, 0x00, 0x02, 0x09, + 0xf0, 0x00, 0x44, 0x41, 0x9a, 0x24, 0x6c, 0x41, + 0x4f, 0xfe, 0xd6, 0x8c, 0xb0, 0x00, 0x00, 0x03, + 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, + 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, + 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, + 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, + 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, + 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, + 0x00, 0x00, 0x03, 0x00, 0x00, 0x6d, 0x40, + }, + }, + }, + { + "aggregated followed by single", + []*NALUAndTimestamp{ + { + NALU: []byte{0x09, 0xF0}, + }, + { + NALU: []byte{ + 0x41, 0x9a, 0x24, 0x6c, 0x41, 0x4f, 0xfe, 0xd6, + 0x8c, 0xb0, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, + 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, + 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, + 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, + 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, + 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, + 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, + 0x00, 0x00, 0x6d, 0x40, + }, + }, + { + NALU: mergeBytes( + []byte{0x08}, + bytes.Repeat([]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, 175), + ), + }, + }, + [][]byte{ + { + 0x80, 0xe0, 0x44, 0xed, 0x88, 0x77, 0x66, 0x55, + 0x9d, 0xbb, 0x78, 0x12, 0x18, 0x00, 0x02, 0x09, + 0xf0, 0x00, 0x44, 0x41, 0x9a, 0x24, 0x6c, 0x41, + 0x4f, 0xfe, 0xd6, 0x8c, 0xb0, 0x00, 0x00, 0x03, + 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, + 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, + 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, + 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, + 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, + 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, + 0x00, 0x00, 0x03, 0x00, 0x00, 0x6d, 0x40, + }, + mergeBytes( + []byte{ + 0x80, 0xe0, 0x44, 0xee, 0x88, 0x77, 0x66, 0x55, + 0x9d, 0xbb, 0x78, 0x12, 0x08, + }, + bytes.Repeat([]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, 175), + ), + }, + }, + { + "fragmented followed by aggregated", + []*NALUAndTimestamp{ + { + NALU: mergeBytes( + []byte{0x05}, + bytes.Repeat([]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, 256), + ), + }, + { + NALU: []byte{0x09, 0xF0}, + }, + { + NALU: []byte{0x09, 0xF0}, + }, + }, + [][]byte{ + mergeBytes( + []byte{ + 0x80, 0x60, 0x44, 0xed, 0x88, 0x77, 0x66, 0x55, + 0x9d, 0xbb, 0x78, 0x12, 0x1c, 0x85, + }, + bytes.Repeat([]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, 182), + []byte{0x00, 0x01}, + ), + mergeBytes( + []byte{ + 0x80, 0xe0, 0x44, 0xee, 0x88, 0x77, 0x66, 0x55, + 0x9d, 0xbb, 0x78, 0x12, 0x1c, 0x45, + }, + []byte{0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, + bytes.Repeat([]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, 73), + ), + { + 0x80, 0xe0, 0x44, 0xef, 0x88, 0x77, 0x66, 0x55, + 0x9d, 0xbb, 0x78, 0x12, 0x18, 0x00, 0x02, 0x09, + 0xf0, 0x00, 0x02, 0x09, 0xf0, + }, + }, + }, } func TestEncode(t *testing.T) { @@ -135,71 +265,22 @@ func TestDecode(t *testing.T) { d := NewDecoder() // send an initial packet downstream - // in order to correctly compute the timestamp + // in order to compute the timestamp, + // which is relative to the initial packet _, err := d.Decode([]byte{ 0x80, 0xe0, 0x44, 0xed, 0x88, 0x77, 0x66, 0x55, 0x9d, 0xbb, 0x78, 0x12, 0x06, 0x00, }) require.NoError(t, err) - dec, err := d.Read(r) - require.NoError(t, err) - require.Equal(t, ca.dec, dec) + for _, dec0 := range ca.dec { + dec, err := d.Read(r) + require.NoError(t, err) + require.Equal(t, dec0, dec) + } _, err = d.Read(r) require.Equal(t, io.EOF, err) }) } } - -func TestDecodeAggregated(t *testing.T) { - sent := false - r := readerFunc(func(p []byte) (int, error) { - if sent { - return 0, io.EOF - } - - sent = true - pkt := []byte{ - 0x80, 0xe0, 0x0e, 0x6a, 0x48, 0xf1, 0x7d, 0xb9, - 0x23, 0xe6, 0x5d, 0x50, 0x18, 0x00, 0x02, 0x09, - 0xf0, 0x00, 0x44, 0x41, 0x9a, 0x24, 0x6c, 0x41, - 0x4f, 0xfe, 0xd6, 0x8c, 0xb0, 0x00, 0x00, 0x03, - 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, - 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, - 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, - 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, - 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, - 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, - 0x00, 0x00, 0x03, 0x00, 0x00, 0x6d, 0x40, - } - return copy(p, pkt), nil - }) - - d := NewDecoder() - - nt, err := d.Read(r) - require.NoError(t, err) - require.Equal(t, &NALUAndTimestamp{ - NALU: []byte{0x09, 0xF0}, - }, nt) - - nt, err = d.Read(r) - require.NoError(t, err) - require.Equal(t, &NALUAndTimestamp{ - NALU: []byte{ - 0x41, 0x9a, 0x24, 0x6c, 0x41, 0x4f, 0xfe, 0xd6, - 0x8c, 0xb0, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, - 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, - 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, - 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, - 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, - 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, - 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, - 0x00, 0x00, 0x6d, 0x40, - }, - }, nt) - - _, err = d.Read(r) - require.Equal(t, io.EOF, err) -}