rtpaac, rtph264: make Encoder.Encode() to return []*rtp.Packet

This commit is contained in:
aler9
2021-09-28 15:33:17 +02:00
parent 308802c5a1
commit 6410be3ba2
4 changed files with 41 additions and 44 deletions

View File

@@ -63,9 +63,8 @@ func (e *Encoder) encodeTimestamp(ts time.Duration) uint32 {
} }
// Encode encodes AUs into RTP/AAC packets. // Encode encodes AUs into RTP/AAC packets.
// It returns the encoded packets. func (e *Encoder) Encode(aus [][]byte, firstPTS time.Duration) ([]*rtp.Packet, error) {
func (e *Encoder) Encode(aus [][]byte, firstPTS time.Duration) ([][]byte, error) { var rets []*rtp.Packet
var rets [][]byte
var batch [][]byte var batch [][]byte
pts := firstPTS pts := firstPTS
@@ -101,7 +100,7 @@ func (e *Encoder) Encode(aus [][]byte, firstPTS time.Duration) ([][]byte, error)
return rets, nil return rets, nil
} }
func (e *Encoder) writeBatch(aus [][]byte, firstPTS time.Duration) ([][]byte, error) { func (e *Encoder) writeBatch(aus [][]byte, firstPTS time.Duration) ([]*rtp.Packet, error) {
if len(aus) == 1 { if len(aus) == 1 {
// the AU fits into a single RTP packet // the AU fits into a single RTP packet
if len(aus[0]) < rtpPayloadMaxSize { if len(aus[0]) < rtpPayloadMaxSize {
@@ -115,14 +114,14 @@ func (e *Encoder) writeBatch(aus [][]byte, firstPTS time.Duration) ([][]byte, er
return e.writeAggregated(aus, firstPTS) return e.writeAggregated(aus, firstPTS)
} }
func (e *Encoder) writeFragmented(au []byte, pts time.Duration) ([][]byte, error) { func (e *Encoder) writeFragmented(au []byte, pts time.Duration) ([]*rtp.Packet, error) {
packetCount := len(au) / (rtpPayloadMaxSize - 4) packetCount := len(au) / (rtpPayloadMaxSize - 4)
lastPacketSize := len(au) % (rtpPayloadMaxSize - 4) lastPacketSize := len(au) % (rtpPayloadMaxSize - 4)
if lastPacketSize > 0 { if lastPacketSize > 0 {
packetCount++ packetCount++
} }
ret := make([][]byte, packetCount) ret := make([]*rtp.Packet, packetCount)
encPTS := e.encodeTimestamp(pts) encPTS := e.encodeTimestamp(pts)
for i := range ret { for i := range ret {
@@ -137,7 +136,7 @@ func (e *Encoder) writeFragmented(au []byte, pts time.Duration) ([][]byte, error
copy(data[4:], au[:le]) copy(data[4:], au[:le])
au = au[le:] au = au[le:]
frame, err := (&rtp.Packet{ ret[i] = &rtp.Packet{
Header: rtp.Header{ Header: rtp.Header{
Version: rtpVersion, Version: rtpVersion,
PayloadType: e.payloadType, PayloadType: e.payloadType,
@@ -147,14 +146,9 @@ func (e *Encoder) writeFragmented(au []byte, pts time.Duration) ([][]byte, error
Marker: (i == (packetCount - 1)), Marker: (i == (packetCount - 1)),
}, },
Payload: data, Payload: data,
}).Marshal()
if err != nil {
return nil, err
} }
e.sequenceNumber++ e.sequenceNumber++
ret[i] = frame
} }
return ret, nil return ret, nil
@@ -176,7 +170,7 @@ func (e *Encoder) lenAggregated(aus [][]byte, addAU []byte) int {
return ret return ret
} }
func (e *Encoder) writeAggregated(aus [][]byte, firstPTS time.Duration) ([][]byte, error) { func (e *Encoder) writeAggregated(aus [][]byte, firstPTS time.Duration) ([]*rtp.Packet, error) {
payload := make([]byte, e.lenAggregated(aus, nil)) payload := make([]byte, e.lenAggregated(aus, nil))
// AU-headers-length // AU-headers-length
@@ -195,7 +189,7 @@ func (e *Encoder) writeAggregated(aus [][]byte, firstPTS time.Duration) ([][]byt
pos += auLen pos += auLen
} }
frame, err := (&rtp.Packet{ pkt := &rtp.Packet{
Header: rtp.Header{ Header: rtp.Header{
Version: rtpVersion, Version: rtpVersion,
PayloadType: e.payloadType, PayloadType: e.payloadType,
@@ -205,12 +199,9 @@ func (e *Encoder) writeAggregated(aus [][]byte, firstPTS time.Duration) ([][]byt
Marker: true, Marker: true,
}, },
Payload: payload, Payload: payload,
}).Marshal()
if err != nil {
return nil, err
} }
e.sequenceNumber++ e.sequenceNumber++
return [][]byte{frame}, nil return []*rtp.Packet{pkt}, nil
} }

View File

@@ -420,9 +420,18 @@ func TestEncode(t *testing.T) {
ssrc := uint32(0x9dbb7812) ssrc := uint32(0x9dbb7812)
initialTs := uint32(0x88776655) initialTs := uint32(0x88776655)
e := NewEncoder(96, 48000, &sequenceNumber, &ssrc, &initialTs) e := NewEncoder(96, 48000, &sequenceNumber, &ssrc, &initialTs)
enc, err := e.Encode(ca.aus, ca.pts) enc, err := e.Encode(ca.aus, ca.pts)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, ca.enc, enc)
var bytss [][]byte
for _, pkt := range enc {
byts, err := pkt.Marshal()
require.NoError(t, err)
bytss = append(bytss, byts)
}
require.Equal(t, ca.enc, bytss)
}) })
} }
} }

View File

@@ -61,9 +61,8 @@ func (e *Encoder) encodeTimestamp(ts time.Duration) uint32 {
} }
// Encode encodes NALUs into RTP/H264 packets. // Encode encodes NALUs into RTP/H264 packets.
// It returns the encoded packets. func (e *Encoder) Encode(nalus [][]byte, pts time.Duration) ([]*rtp.Packet, error) {
func (e *Encoder) Encode(nalus [][]byte, pts time.Duration) ([][]byte, error) { var rets []*rtp.Packet
var rets [][]byte
var batch [][]byte var batch [][]byte
// split NALUs into batches // split NALUs into batches
@@ -97,7 +96,7 @@ func (e *Encoder) Encode(nalus [][]byte, pts time.Duration) ([][]byte, error) {
return rets, nil return rets, nil
} }
func (e *Encoder) writeBatch(nalus [][]byte, pts time.Duration, marker bool) ([][]byte, error) { func (e *Encoder) writeBatch(nalus [][]byte, pts time.Duration, marker bool) ([]*rtp.Packet, error) {
if len(nalus) == 1 { if len(nalus) == 1 {
// the NALU fits into a single RTP packet // the NALU fits into a single RTP packet
if len(nalus[0]) < rtpPayloadMaxSize { if len(nalus[0]) < rtpPayloadMaxSize {
@@ -111,8 +110,8 @@ func (e *Encoder) writeBatch(nalus [][]byte, pts time.Duration, marker bool) ([]
return e.writeAggregated(nalus, pts, marker) return e.writeAggregated(nalus, pts, marker)
} }
func (e *Encoder) writeSingle(nalu []byte, pts time.Duration, marker bool) ([][]byte, error) { func (e *Encoder) writeSingle(nalu []byte, pts time.Duration, marker bool) ([]*rtp.Packet, error) {
frame, err := (&rtp.Packet{ pkt := &rtp.Packet{
Header: rtp.Header{ Header: rtp.Header{
Version: rtpVersion, Version: rtpVersion,
PayloadType: e.payloadType, PayloadType: e.payloadType,
@@ -122,17 +121,14 @@ func (e *Encoder) writeSingle(nalu []byte, pts time.Duration, marker bool) ([][]
Marker: marker, Marker: marker,
}, },
Payload: nalu, Payload: nalu,
}).Marshal()
if err != nil {
return nil, err
} }
e.sequenceNumber++ e.sequenceNumber++
return [][]byte{frame}, nil return []*rtp.Packet{pkt}, nil
} }
func (e *Encoder) writeFragmented(nalu []byte, pts time.Duration, marker bool) ([][]byte, error) { func (e *Encoder) writeFragmented(nalu []byte, pts time.Duration, marker bool) ([]*rtp.Packet, error) {
// use only FU-A, not FU-B, since we always use non-interleaved mode // use only FU-A, not FU-B, since we always use non-interleaved mode
// (packetization-mode=1) // (packetization-mode=1)
packetCount := (len(nalu) - 1) / (rtpPayloadMaxSize - 2) packetCount := (len(nalu) - 1) / (rtpPayloadMaxSize - 2)
@@ -141,7 +137,7 @@ func (e *Encoder) writeFragmented(nalu []byte, pts time.Duration, marker bool) (
packetCount++ packetCount++
} }
ret := make([][]byte, packetCount) ret := make([]*rtp.Packet, packetCount)
encPTS := e.encodeTimestamp(pts) encPTS := e.encodeTimestamp(pts)
nri := (nalu[0] >> 5) & 0x03 nri := (nalu[0] >> 5) & 0x03
@@ -169,7 +165,7 @@ func (e *Encoder) writeFragmented(nalu []byte, pts time.Duration, marker bool) (
copy(data[2:], nalu[:le]) copy(data[2:], nalu[:le])
nalu = nalu[le:] nalu = nalu[le:]
frame, err := (&rtp.Packet{ ret[i] = &rtp.Packet{
Header: rtp.Header{ Header: rtp.Header{
Version: rtpVersion, Version: rtpVersion,
PayloadType: e.payloadType, PayloadType: e.payloadType,
@@ -179,14 +175,9 @@ func (e *Encoder) writeFragmented(nalu []byte, pts time.Duration, marker bool) (
Marker: (i == (packetCount-1) && marker), Marker: (i == (packetCount-1) && marker),
}, },
Payload: data, Payload: data,
}).Marshal()
if err != nil {
return nil, err
} }
e.sequenceNumber++ e.sequenceNumber++
ret[i] = frame
} }
return ret, nil return ret, nil
@@ -208,7 +199,7 @@ func (e *Encoder) lenAggregated(nalus [][]byte, addNALU []byte) int {
return ret return ret
} }
func (e *Encoder) writeAggregated(nalus [][]byte, pts time.Duration, marker bool) ([][]byte, error) { func (e *Encoder) writeAggregated(nalus [][]byte, pts time.Duration, marker bool) ([]*rtp.Packet, error) {
payload := make([]byte, e.lenAggregated(nalus, nil)) payload := make([]byte, e.lenAggregated(nalus, nil))
// header // header
@@ -226,7 +217,7 @@ func (e *Encoder) writeAggregated(nalus [][]byte, pts time.Duration, marker bool
pos += naluLen pos += naluLen
} }
frame, err := (&rtp.Packet{ pkt := &rtp.Packet{
Header: rtp.Header{ Header: rtp.Header{
Version: rtpVersion, Version: rtpVersion,
PayloadType: e.payloadType, PayloadType: e.payloadType,
@@ -236,12 +227,9 @@ func (e *Encoder) writeAggregated(nalus [][]byte, pts time.Duration, marker bool
Marker: marker, Marker: marker,
}, },
Payload: payload, Payload: payload,
}).Marshal()
if err != nil {
return nil, err
} }
e.sequenceNumber++ e.sequenceNumber++
return [][]byte{frame}, nil return []*rtp.Packet{pkt}, nil
} }

View File

@@ -463,9 +463,18 @@ func TestEncode(t *testing.T) {
ssrc := uint32(0x9dbb7812) ssrc := uint32(0x9dbb7812)
initialTs := uint32(0x88776655) initialTs := uint32(0x88776655)
e := NewEncoder(96, &sequenceNumber, &ssrc, &initialTs) e := NewEncoder(96, &sequenceNumber, &ssrc, &initialTs)
enc, err := e.Encode(ca.nalus, ca.pts) enc, err := e.Encode(ca.nalus, ca.pts)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, ca.enc, enc)
var bytss [][]byte
for _, pkt := range enc {
byts, err := pkt.Marshal()
require.NoError(t, err)
bytss = append(bytss, byts)
}
require.Equal(t, ca.enc, bytss)
}) })
} }
} }