diff --git a/pkg/format/rtph264/encoder.go b/pkg/format/rtph264/encoder.go index b84bcb98..f6918097 100644 --- a/pkg/format/rtph264/encoder.go +++ b/pkg/format/rtph264/encoder.go @@ -23,6 +23,30 @@ func randUint32() (uint32, error) { return uint32(b[0])<<24 | uint32(b[1])<<16 | uint32(b[2])<<8 | uint32(b[3]), nil } +func lenAggregated(nalus [][]byte, addNALU []byte) int { + ret := 1 // header + + for _, nalu := range nalus { + ret += 2 // size + ret += len(nalu) // nalu + } + + if addNALU != nil { + ret += 2 // size + ret += len(addNALU) // nalu + } + + return ret +} + +func packetCount(avail, le int) int { + n := le / avail + if (le % avail) != 0 { + n++ + } + return n +} + // Encoder is a RTP/H264 encoder. // Specification: https://datatracker.ietf.org/doc/html/rfc6184 type Encoder struct { @@ -82,7 +106,7 @@ func (e *Encoder) Encode(nalus [][]byte) ([]*rtp.Packet, error) { // split NALUs into batches for _, nalu := range nalus { - if e.lenAggregated(batch, nalu) <= e.PayloadMaxSize { + if lenAggregated(batch, nalu) <= e.PayloadMaxSize { // add to existing batch batch = append(batch, nalu) } else { @@ -147,37 +171,27 @@ func (e *Encoder) writeFragmented(nalu []byte, marker bool) ([]*rtp.Packet, erro // (packetization-mode=1) avail := e.PayloadMaxSize - 2 le := len(nalu) - 1 - packetCount := le / avail - lastPacketSize := le % avail - if lastPacketSize > 0 { - packetCount++ - } + packetCount := packetCount(avail, le) ret := make([]*rtp.Packet, packetCount) nri := (nalu[0] >> 5) & 0x03 typ := nalu[0] & 0x1F nalu = nalu[1:] // remove header + le = avail + start := uint8(1) + end := uint8(0) for i := range ret { - indicator := (nri << 5) | uint8(h264.NALUTypeFUA) - - start := uint8(0) - if i == 0 { - start = 1 - } - end := uint8(0) - le := avail if i == (packetCount - 1) { end = 1 - le = lastPacketSize + le = len(nalu) } - header := (start << 7) | (end << 6) | typ data := make([]byte, 2+le) - data[0] = indicator - data[1] = header - copy(data[2:], nalu[:le]) + data[0] = (nri << 5) | uint8(h264.NALUTypeFUA) + data[1] = (start << 7) | (end << 6) | typ + copy(data[2:], nalu) nalu = nalu[le:] ret[i] = &rtp.Packet{ @@ -192,29 +206,14 @@ func (e *Encoder) writeFragmented(nalu []byte, marker bool) ([]*rtp.Packet, erro } e.sequenceNumber++ + start = 0 } return ret, nil } -func (e *Encoder) lenAggregated(nalus [][]byte, addNALU []byte) int { - ret := 1 // header - - for _, nalu := range nalus { - ret += 2 // size - ret += len(nalu) // nalu - } - - if addNALU != nil { - ret += 2 // size - ret += len(addNALU) // nalu - } - - return ret -} - func (e *Encoder) writeAggregated(nalus [][]byte, marker bool) ([]*rtp.Packet, error) { - payload := make([]byte, e.lenAggregated(nalus, nil)) + payload := make([]byte, lenAggregated(nalus, nil)) // header payload[0] = uint8(h264.NALUTypeSTAPA) diff --git a/pkg/format/rtph264/encoder_test.go b/pkg/format/rtph264/encoder_test.go index c11b0c94..9f63f80a 100644 --- a/pkg/format/rtph264/encoder_test.go +++ b/pkg/format/rtph264/encoder_test.go @@ -115,6 +115,38 @@ var cases = []struct { }, }, }, + { + "fragmented to the limit", + [][]byte{bytes.Repeat([]byte{1}, 2917)}, + []*rtp.Packet{ + { + Header: rtp.Header{ + Version: 2, + Marker: false, + PayloadType: 96, + SequenceNumber: 17645, + SSRC: 0x9dbb7812, + }, + Payload: mergeBytes( + []byte{0x1c, 0x81}, + bytes.Repeat([]byte{1}, 1458), + ), + }, + { + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 17646, + SSRC: 0x9dbb7812, + }, + Payload: mergeBytes( + []byte{0x1c, 0x41}, + bytes.Repeat([]byte{1}, 1458), + ), + }, + }, + }, { "aggregated", [][]byte{ diff --git a/pkg/format/rtph265/encoder.go b/pkg/format/rtph265/encoder.go index a06c3b30..d93b7efc 100644 --- a/pkg/format/rtph265/encoder.go +++ b/pkg/format/rtph265/encoder.go @@ -21,6 +21,14 @@ func randUint32() (uint32, error) { return uint32(b[0])<<24 | uint32(b[1])<<16 | uint32(b[2])<<8 | uint32(b[3]), nil } +func packetCount(avail, le int) int { + n := le / avail + if (le % avail) != 0 { + n++ + } + return n +} + // Encoder is a RTP/H265 encoder. // Specification: https://datatracker.ietf.org/doc/html/rfc7798 type Encoder struct { @@ -144,34 +152,27 @@ func (e *Encoder) writeSingle(nalu []byte, marker bool) ([]*rtp.Packet, error) { func (e *Encoder) writeFragmentationUnits(nalu []byte, marker bool) ([]*rtp.Packet, error) { avail := e.PayloadMaxSize - 3 le := len(nalu) - 2 - n := le / avail - lastPacketSize := le % avail - if lastPacketSize > 0 { - n++ - } + packetCount := packetCount(avail, le) - ret := make([]*rtp.Packet, n) + ret := make([]*rtp.Packet, packetCount) head := nalu[:2] nalu = nalu[2:] + le = avail + start := uint8(1) + end := uint8(0) for i := range ret { - start := uint8(0) - if i == 0 { - start = 1 - } - end := uint8(0) - le := avail - if i == (n - 1) { + if i == (packetCount - 1) { + le = len(nalu) end = 1 - le = lastPacketSize } data := make([]byte, 3+le) data[0] = head[0]&0b10000001 | 49<<1 data[1] = head[1] data[2] = (start << 7) | (end << 6) | (head[0]>>1)&0b111111 - copy(data[3:], nalu[:le]) + copy(data[3:], nalu) nalu = nalu[le:] ret[i] = &rtp.Packet{ @@ -180,12 +181,13 @@ func (e *Encoder) writeFragmentationUnits(nalu []byte, marker bool) ([]*rtp.Pack PayloadType: e.PayloadType, SequenceNumber: e.sequenceNumber, SSRC: *e.SSRC, - Marker: (i == (n-1) && marker), + Marker: (i == (packetCount-1) && marker), }, Payload: data, } e.sequenceNumber++ + start = 0 } return ret, nil diff --git a/pkg/format/rtph265/encoder_test.go b/pkg/format/rtph265/encoder_test.go index a64d5536..3b85b73d 100644 --- a/pkg/format/rtph265/encoder_test.go +++ b/pkg/format/rtph265/encoder_test.go @@ -124,6 +124,38 @@ var cases = []struct { }, }, }, + { + "fragmented to the limit", + [][]byte{bytes.Repeat([]byte{1}, 2916)}, + []*rtp.Packet{ + { + Header: rtp.Header{ + Version: 2, + Marker: false, + PayloadType: 96, + SequenceNumber: 17645, + SSRC: 0x9dbb7812, + }, + Payload: mergeBytes( + []byte{0x63, 0x01, 0x80}, + bytes.Repeat([]byte{1}, 1457), + ), + }, + { + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 17646, + SSRC: 0x9dbb7812, + }, + Payload: mergeBytes( + []byte{0x63, 0x01, 0x40}, + bytes.Repeat([]byte{1}, 1457), + ), + }, + }, + }, } func TestEncode(t *testing.T) { diff --git a/pkg/format/rtplpcm/encoder.go b/pkg/format/rtplpcm/encoder.go index 428f77cd..099fa57d 100644 --- a/pkg/format/rtplpcm/encoder.go +++ b/pkg/format/rtplpcm/encoder.go @@ -82,7 +82,6 @@ func (e *Encoder) packetCount(slen int) int { if (slen % e.maxPayloadSize) != 0 { n++ } - return n } @@ -95,12 +94,11 @@ func (e *Encoder) Encode(samples []byte) ([]*rtp.Packet, error) { packetCount := e.packetCount(slen) ret := make([]*rtp.Packet, packetCount) - i := 0 pos := 0 payloadSize := e.maxPayloadSize timestamp := uint32(0) - for { + for i := range ret { if payloadSize > len(samples[pos:]) { payloadSize = len(samples[pos:]) } @@ -118,13 +116,8 @@ func (e *Encoder) Encode(samples []byte) ([]*rtp.Packet, error) { } e.sequenceNumber++ - i++ pos += payloadSize timestamp += uint32(payloadSize / e.sampleSize) - - if pos == slen { - break - } } return ret, nil diff --git a/pkg/format/rtpmpeg1audio/encoder.go b/pkg/format/rtpmpeg1audio/encoder.go index 9f6efe1a..98feff83 100644 --- a/pkg/format/rtpmpeg1audio/encoder.go +++ b/pkg/format/rtpmpeg1audio/encoder.go @@ -29,6 +29,14 @@ func lenAggregated(frames [][]byte, frame []byte) int { return l } +func packetCount(avail, le int) int { + n := le / avail + if (le % avail) != 0 { + n++ + } + return n +} + // Encoder is a RTP/MPEG-1/2 Audio encoder. // Specification: https://datatracker.ietf.org/doc/html/rfc2250 type Encoder struct { @@ -127,21 +135,15 @@ func (e *Encoder) writeBatch(frames [][]byte, timestamp uint32) ([]*rtp.Packet, func (e *Encoder) writeFragmented(frame []byte, timestamp uint32) ([]*rtp.Packet, error) { avail := e.PayloadMaxSize - 4 le := len(frame) - packetCount := le / avail - lastPacketSize := le % avail - if lastPacketSize > 0 { - packetCount++ - } + packetCount := packetCount(avail, le) - pos := 0 ret := make([]*rtp.Packet, packetCount) + pos := 0 + le = avail for i := range ret { - var le int - if i != (packetCount - 1) { - le = avail - } else { - le = lastPacketSize + if i == (packetCount - 1) { + le = len(frame) - pos } payload := make([]byte, 4+le) diff --git a/pkg/format/rtpmpeg4audiogeneric/encoder.go b/pkg/format/rtpmpeg4audiogeneric/encoder.go index a00237ed..a41444a8 100644 --- a/pkg/format/rtpmpeg4audiogeneric/encoder.go +++ b/pkg/format/rtpmpeg4audiogeneric/encoder.go @@ -23,6 +23,14 @@ func randUint32() (uint32, error) { return uint32(b[0])<<24 | uint32(b[1])<<16 | uint32(b[2])<<8 | uint32(b[3]), nil } +func packetCount(avail, le int) int { + n := le / avail + if (le % avail) != 0 { + n++ + } + return n +} + // Encoder is a RTP/MPEG4-audio encoder. // Specification: https://datatracker.ietf.org/doc/html/rfc3640 type Encoder struct { @@ -132,20 +140,14 @@ func (e *Encoder) writeFragmented(au []byte, timestamp uint32) ([]*rtp.Packet, e avail := e.PayloadMaxSize - 2 - auHeadersLenBytes le := len(au) - packetCount := le / avail - lastPacketSize := le % avail - if lastPacketSize > 0 { - packetCount++ - } + packetCount := packetCount(avail, le) ret := make([]*rtp.Packet, packetCount) + le = avail for i := range ret { - var le int - if i != (packetCount - 1) { - le = avail - } else { - le = lastPacketSize + if i == (packetCount - 1) { + le = len(au) } payload := make([]byte, 2+auHeadersLenBytes+le) @@ -160,7 +162,7 @@ func (e *Encoder) writeFragmented(au []byte, timestamp uint32) ([]*rtp.Packet, e bits.WriteBits(payload[2:], &pos, 0, e.IndexLength) // AU - copy(payload[2+auHeadersLenBytes:], au[:le]) + copy(payload[2+auHeadersLenBytes:], au) au = au[le:] ret[i] = &rtp.Packet{ @@ -170,7 +172,7 @@ func (e *Encoder) writeFragmented(au []byte, timestamp uint32) ([]*rtp.Packet, e SequenceNumber: e.sequenceNumber, Timestamp: timestamp, SSRC: *e.SSRC, - Marker: (i == (packetCount - 1)), + Marker: (i == packetCount-1), }, Payload: payload, } diff --git a/pkg/format/rtpmpeg4audiogeneric/encoder_test.go b/pkg/format/rtpmpeg4audiogeneric/encoder_test.go index 0c6f0a06..232d1876 100644 --- a/pkg/format/rtpmpeg4audiogeneric/encoder_test.go +++ b/pkg/format/rtpmpeg4audiogeneric/encoder_test.go @@ -229,6 +229,41 @@ var cases = []struct { }, }, }, + { + "fragmented to the limit", + 13, + 3, + 3, + [][]byte{bytes.Repeat([]byte{1}, 2912)}, + []*rtp.Packet{ + { + Header: rtp.Header{ + Version: 2, + Marker: false, + PayloadType: 96, + SequenceNumber: 17645, + SSRC: 0x9dbb7812, + }, + Payload: mergeBytes( + []byte{0x0, 0x10, 0x2d, 0x80}, + bytes.Repeat([]byte{1}, 1456), + ), + }, + { + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 17646, + SSRC: 0x9dbb7812, + }, + Payload: mergeBytes( + []byte{0x0, 0x10, 0x2d, 0x80}, + bytes.Repeat([]byte{1}, 1456), + ), + }, + }, + }, { "aggregated followed by fragmented", 13, diff --git a/pkg/format/rtpmpeg4audiolatm/encoder.go b/pkg/format/rtpmpeg4audiolatm/encoder.go index 350bd6e3..4308e042 100644 --- a/pkg/format/rtpmpeg4audiolatm/encoder.go +++ b/pkg/format/rtpmpeg4audiolatm/encoder.go @@ -68,12 +68,11 @@ func (e *Encoder) Init() error { func (e *Encoder) packetCount(auLen int, plil int) int { totalLen := plil + auLen - packetCount := totalLen / e.PayloadMaxSize - lastPacketSize := totalLen % e.PayloadMaxSize - if lastPacketSize > 0 { - packetCount++ + n := totalLen / e.PayloadMaxSize + if (totalLen % e.PayloadMaxSize) != 0 { + n++ } - return packetCount + return n } // Encode encodes an access unit into RTP packets. @@ -82,29 +81,25 @@ func (e *Encoder) Encode(au []byte) ([]*rtp.Packet, error) { plil := payloadLengthInfoEncodeSize(auLen) packetCount := e.packetCount(auLen, plil) - avail := e.PayloadMaxSize - plil ret := make([]*rtp.Packet, packetCount) + le := e.PayloadMaxSize - plil for i := range ret { - var final bool - var l int - - if len(au) < avail { - l = len(au) - final = true - } else { - l = avail - final = false + if i == (packetCount - 1) { + le = len(au) } var payload []byte if i == 0 { - payload = make([]byte, plil+l) + payload = make([]byte, plil+le) payloadLengthInfoEncode(plil, auLen, payload) - copy(payload[plil:], au[:l]) + copy(payload[plil:], au[:le]) + au = au[le:] + le = e.PayloadMaxSize } else { - payload = au[:l] + payload = au[:le] + au = au[le:] } ret[i] = &rtp.Packet{ @@ -113,19 +108,12 @@ func (e *Encoder) Encode(au []byte) ([]*rtp.Packet, error) { PayloadType: e.PayloadType, SequenceNumber: e.sequenceNumber, SSRC: *e.SSRC, - Marker: final, + Marker: (i == packetCount-1), }, Payload: payload, } e.sequenceNumber++ - - if final { - break - } - - au = au[l:] - avail = e.PayloadMaxSize } return ret, nil diff --git a/pkg/format/rtpmpeg4audiolatm/encoder_test.go b/pkg/format/rtpmpeg4audiolatm/encoder_test.go index 00af7534..2e9103b6 100644 --- a/pkg/format/rtpmpeg4audiolatm/encoder_test.go +++ b/pkg/format/rtpmpeg4audiolatm/encoder_test.go @@ -4,7 +4,6 @@ import ( "bytes" "testing" - "github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio" "github.com/pion/rtp" "github.com/stretchr/testify/require" ) @@ -34,25 +33,12 @@ func mergeBytes(vals ...[]byte) []byte { } var cases = []struct { - name string - config *mpeg4audio.StreamMuxConfig - au []byte - pkts []*rtp.Packet + name string + au []byte + pkts []*rtp.Packet }{ { "single", - &mpeg4audio.StreamMuxConfig{ - Programs: []*mpeg4audio.StreamMuxConfigProgram{{ - Layers: []*mpeg4audio.StreamMuxConfigLayer{{ - AudioSpecificConfig: &mpeg4audio.AudioSpecificConfig{ - Type: 2, - SampleRate: 48000, - ChannelCount: 2, - }, - LatmBufferFullness: 255, - }}, - }}, - }, []byte{1, 2, 3, 4}, []*rtp.Packet{ { @@ -71,18 +57,6 @@ var cases = []struct { }, { "fragmented", - &mpeg4audio.StreamMuxConfig{ - Programs: []*mpeg4audio.StreamMuxConfigProgram{{ - Layers: []*mpeg4audio.StreamMuxConfigLayer{{ - AudioSpecificConfig: &mpeg4audio.AudioSpecificConfig{ - Type: 2, - SampleRate: 48000, - ChannelCount: 2, - }, - LatmBufferFullness: 255, - }}, - }}, - }, bytes.Repeat([]byte{0, 1, 2, 3, 4, 5, 6, 7}, 512), []*rtp.Packet{ { @@ -129,6 +103,38 @@ var cases = []struct { }, }, }, + { + "fragmented to the limit", + bytes.Repeat([]byte{1}, 2908), + []*rtp.Packet{ + { + Header: rtp.Header{ + Version: 2, + Marker: false, + PayloadType: 96, + SequenceNumber: 17645, + SSRC: 2646308882, + }, + Payload: mergeBytes( + bytes.Repeat([]byte{0xff}, 11), + []byte{0x67}, + bytes.Repeat([]byte{1}, 1448), + ), + }, + { + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 17646, + SSRC: 2646308882, + }, + Payload: mergeBytes( + bytes.Repeat([]byte{1}, 1460), + ), + }, + }, + }, } func TestEncode(t *testing.T) { diff --git a/pkg/format/rtpmpeg4video/encoder.go b/pkg/format/rtpmpeg4video/encoder.go index be6d1a15..bf3fa0d2 100644 --- a/pkg/format/rtpmpeg4video/encoder.go +++ b/pkg/format/rtpmpeg4video/encoder.go @@ -20,6 +20,14 @@ func randUint32() (uint32, error) { return uint32(b[0])<<24 | uint32(b[1])<<16 | uint32(b[2])<<8 | uint32(b[3]), nil } +func packetCount(avail, le int) int { + n := le / avail + if (le % avail) != 0 { + n++ + } + return n +} + // Encoder is a RTP/MPEG-4 Video encoder. // Specification: https://datatracker.ietf.org/doc/html/rfc6416 type Encoder struct { @@ -66,29 +74,18 @@ func (e *Encoder) Init() error { return nil } -func packetCount(avail, le int) int { - packetCount := le / avail - lastPacketSize := le % avail - if lastPacketSize > 0 { - packetCount++ - } - return packetCount -} - // Encode encodes a frame into RTP packets. func (e *Encoder) Encode(frame []byte) ([]*rtp.Packet, error) { avail := e.PayloadMaxSize le := len(frame) packetCount := packetCount(avail, le) - pos := 0 ret := make([]*rtp.Packet, packetCount) + pos := 0 + le = avail for i := range ret { - var le int - if i != (packetCount - 1) { - le = avail - } else { + if i == (packetCount - 1) { le = len(frame[pos:]) } @@ -98,7 +95,7 @@ func (e *Encoder) Encode(frame []byte) ([]*rtp.Packet, error) { PayloadType: e.PayloadType, SequenceNumber: e.sequenceNumber, SSRC: *e.SSRC, - Marker: (i == len(ret)-1), + Marker: (i == packetCount-1), }, Payload: frame[pos : pos+le], }