add RTP/AV1 decoder and encoder (#270)

This commit is contained in:
Alessandro Ros
2023-05-04 18:26:03 +02:00
committed by GitHub
parent a9a7426412
commit 2170ef4b00
28 changed files with 1535 additions and 109 deletions

View File

@@ -44,8 +44,8 @@ Features:
* Utilities
* Parse RTSP elements
* Encode/decode format-specific frames into/from RTP packets. The following formats are supported:
* Video: H264, H265, M-JPEG, VP8, VP9, MPEG-4 Video (H263, Xvid)
* Audio: G711 (PCMA, PCMU), G722, LPCM, MPEG-2 Audio (MP3), MPEG-4 Audio (AAC), Opus
* Video: AV1, VP9, VP8, H265, H264, MPEG-4 Video (H263, Xvid), M-JPEG
* Audio: Opus, MPEG-4 Audio (AAC), MPEG-2 Audio (MP3), G722, G711 (PCMA, PCMU), LPCM
## Table of contents
@@ -97,6 +97,7 @@ https://pkg.go.dev/github.com/bluenviron/gortsplib/v3#pkg-index
## Standards
* [Codec standards](https://github.com/bluenviron/mediacommon#standards)
* [RFC2326, RTSP 1.0](https://datatracker.ietf.org/doc/html/rfc2326)
* [RFC7826, RTSP 2.0](https://datatracker.ietf.org/doc/html/rfc7826)
* [RFC8866, SDP: Session Description Protocol](https://datatracker.ietf.org/doc/html/rfc8866)
@@ -113,9 +114,6 @@ https://pkg.go.dev/github.com/bluenviron/gortsplib/v3#pkg-index
* [RFC7587, RTP Payload Format for the Opus Speech and Audio Codec](https://datatracker.ietf.org/doc/html/rfc7587)
* [RFC3640, RTP Payload Format for Transport of MPEG-4 Elementary Streams](https://datatracker.ietf.org/doc/html/rfc3640)
* [RTP Payload Format For AV1 (v1.0)](https://aomediacodec.github.io/av1-rtp-spec/)
* [ITU-T Rec. H.264 (08/2021)](https://www.itu.int/rec/dologin_pub.asp?lang=e&id=T-REC-H.264-202108-I!!PDF-E&type=items)
* [ITU-T Rec. H.265 (08/2021)](https://www.itu.int/rec/dologin_pub.asp?lang=e&id=T-REC-H.265-202108-I!!PDF-E&type=items)
* ISO 14496-3, Coding of audio-visual objects, part 3, Audio
* [Golang project layout](https://github.com/golang-standards/project-layout)
## Links

2
go.mod
View File

@@ -4,7 +4,7 @@ go 1.18
require (
github.com/asticode/go-astits v1.11.0
github.com/bluenviron/mediacommon v0.4.2
github.com/bluenviron/mediacommon v0.5.0
github.com/google/uuid v1.3.0
github.com/pion/rtcp v1.2.10
github.com/pion/rtp v0.0.0-20230107162714-c3ea6851e25b

4
go.sum
View File

@@ -2,8 +2,8 @@ github.com/asticode/go-astikit v0.30.0 h1:DkBkRQRIxYcknlaU7W7ksNfn4gMFsB0tqMJflx
github.com/asticode/go-astikit v0.30.0/go.mod h1:h4ly7idim1tNhaVkdVBeXQZEE3L0xblP7fCWbgwipF0=
github.com/asticode/go-astits v1.11.0 h1:GTHUXht0ZXAJXsVbsLIcyfHr1Bchi4QQwMARw2ZWAng=
github.com/asticode/go-astits v1.11.0/go.mod h1:QSHmknZ51pf6KJdHKZHJTLlMegIrhega3LPWz3ND/iI=
github.com/bluenviron/mediacommon v0.4.2 h1:rdghY3g70+fdviapO2hL6CHpOGeTd7KbH1aEZnMwh88=
github.com/bluenviron/mediacommon v0.4.2/go.mod h1:t0dqPsWUTchyvib0MhixIwXEgvDX4V9G+I0GzWLQRb8=
github.com/bluenviron/mediacommon v0.5.0 h1:YsVFlEknaXWhZGfz+Y1QbuzXLMVSmHODc7OnRqZoITY=
github.com/bluenviron/mediacommon v0.5.0/go.mod h1:t0dqPsWUTchyvib0MhixIwXEgvDX4V9G+I0GzWLQRb8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=

View File

@@ -1,10 +1,12 @@
package formats
package formats //nolint:dupl
import (
"fmt"
"strconv"
"github.com/pion/rtp"
"github.com/bluenviron/gortsplib/v3/pkg/formats/rtpav1"
)
// AV1 is a RTP format that uses the AV1 codec.
@@ -94,3 +96,19 @@ func (f *AV1) FMTP() map[string]string {
func (f *AV1) PTSEqualsDTS(*rtp.Packet) bool {
return true
}
// CreateDecoder creates a decoder able to decode the content of the format.
func (f *AV1) CreateDecoder() *rtpav1.Decoder {
d := &rtpav1.Decoder{}
d.Init()
return d
}
// CreateEncoder creates an encoder able to encode the content of the format.
func (f *AV1) CreateEncoder() *rtpav1.Encoder {
e := &rtpav1.Encoder{
PayloadType: f.PayloadTyp,
}
e.Init()
return e
}

View File

@@ -15,3 +15,17 @@ func TestAV1Attributes(t *testing.T) {
require.Equal(t, 90000, format.ClockRate())
require.Equal(t, true, format.PTSEqualsDTS(&rtp.Packet{}))
}
func TestAV1DecEncoder(t *testing.T) {
format := &AV1{}
enc := format.CreateEncoder()
pkts, err := enc.Encode([][]byte{{0x01, 0x02, 0x03, 0x04}}, 0)
require.NoError(t, err)
require.Equal(t, format.PayloadType(), pkts[0].PayloadType)
dec := format.CreateDecoder()
byts, _, err := dec.Decode(pkts[0])
require.NoError(t, err)
require.Equal(t, [][]byte{{0x01, 0x02, 0x03, 0x04}}, byts)
}

View File

@@ -0,0 +1,150 @@
package rtpav1
import (
"errors"
"fmt"
"time"
"github.com/bluenviron/mediacommon/pkg/codecs/av1"
"github.com/pion/rtp"
"github.com/pion/rtp/codecs"
"github.com/bluenviron/gortsplib/v3/pkg/rtptime"
)
// ErrMorePacketsNeeded is returned when more packets are needed.
var ErrMorePacketsNeeded = errors.New("need more packets")
// ErrNonStartingPacketAndNoPrevious is returned when we received a non-starting
// packet of a fragmented NALU and we didn't received anything before.
// It's normal to receive this when decoding a stream that has been already
// running for some time.
var ErrNonStartingPacketAndNoPrevious = errors.New(
"received a non-starting fragment without any previous starting fragment")
func joinFragments(fragments [][]byte, size int) []byte {
ret := make([]byte, size)
n := 0
for _, p := range fragments {
n += copy(ret[n:], p)
}
return ret
}
// Decoder is a RTP/AV1 decoder.
// Specification: https://aomediacodec.github.io/av1-rtp-spec/
type Decoder struct {
timeDecoder *rtptime.Decoder
firstPacketReceived bool
fragmentsSize int
fragments [][]byte
// for DecodeUntilMarker()
obuBuffer [][]byte
}
// Init initializes the decoder.
func (d *Decoder) Init() {
d.timeDecoder = rtptime.NewDecoder(90000)
}
// Decode decodes OBUs from a RTP packet.
func (d *Decoder) Decode(pkt *rtp.Packet) ([][]byte, time.Duration, error) {
var av1header codecs.AV1Packet
_, err := av1header.Unmarshal(pkt.Payload)
if err != nil {
d.fragments = d.fragments[:0] // discard pending fragments
d.fragmentsSize = 0
return nil, 0, fmt.Errorf("invalid header: %v", err)
}
for _, el := range av1header.OBUElements {
if len(el) == 0 {
return nil, 0, fmt.Errorf("invalid OBU fragment")
}
}
if av1header.Z {
if len(d.fragments) == 0 {
if !d.firstPacketReceived {
return nil, 0, ErrNonStartingPacketAndNoPrevious
}
return nil, 0, fmt.Errorf("received a subsequent fragment without previous fragments")
}
d.fragmentsSize += len(av1header.OBUElements[0])
if d.fragmentsSize > av1.MaxOBUSize {
d.fragments = d.fragments[:0]
d.fragmentsSize = 0
return nil, 0, fmt.Errorf("OBU size (%d) is too big, maximum is %d", d.fragmentsSize, av1.MaxOBUSize)
}
d.fragments = append(d.fragments, av1header.OBUElements[0])
av1header.OBUElements = av1header.OBUElements[1:]
}
d.firstPacketReceived = true
var obus [][]byte
if len(av1header.OBUElements) > 0 {
if d.fragmentsSize != 0 {
obus = append(obus, joinFragments(d.fragments, d.fragmentsSize))
d.fragments = d.fragments[:0]
d.fragmentsSize = 0
}
if av1header.Y {
elementCount := len(av1header.OBUElements)
d.fragmentsSize += len(av1header.OBUElements[elementCount-1])
if d.fragmentsSize > av1.MaxOBUSize {
d.fragments = d.fragments[:0]
d.fragmentsSize = 0
return nil, 0, fmt.Errorf("OBU size (%d) is too big, maximum is %d", d.fragmentsSize, av1.MaxOBUSize)
}
d.fragments = append(d.fragments, av1header.OBUElements[elementCount-1])
av1header.OBUElements = av1header.OBUElements[:elementCount-1]
}
obus = append(obus, av1header.OBUElements...)
} else if !av1header.Y {
obus = append(obus, joinFragments(d.fragments, d.fragmentsSize))
d.fragments = d.fragments[:0]
d.fragmentsSize = 0
}
if len(obus) == 0 {
return nil, 0, ErrMorePacketsNeeded
}
return obus, d.timeDecoder.Decode(pkt.Timestamp), nil
}
// DecodeUntilMarker decodes OBUs from a RTP packet and puts them in a buffer.
// When a packet has the marker flag (meaning that all OBUs with the same PTS have
// been received), the buffer is returned.
func (d *Decoder) DecodeUntilMarker(pkt *rtp.Packet) ([][]byte, time.Duration, error) {
obus, pts, err := d.Decode(pkt)
if err != nil {
return nil, 0, err
}
if (len(d.obuBuffer) + len(obus)) > av1.MaxOBUsPerTemporalUnit {
return nil, 0, fmt.Errorf("OBU count (%d) exceeds maximum allowed (%d)",
len(d.obuBuffer)+len(obus), av1.MaxOBUsPerTemporalUnit)
}
d.obuBuffer = append(d.obuBuffer, obus...)
if !pkt.Marker {
return nil, 0, ErrMorePacketsNeeded
}
ret := d.obuBuffer
d.obuBuffer = d.obuBuffer[:0]
return ret, pts, nil
}

View File

@@ -0,0 +1,62 @@
package rtpav1
import (
"testing"
"github.com/pion/rtp"
"github.com/stretchr/testify/require"
)
func TestDecode(t *testing.T) {
for _, ca := range cases {
t.Run(ca.name, func(t *testing.T) {
d := &Decoder{}
d.Init()
var obus [][]byte
for _, pkt := range ca.pkts {
addOBUs, _, err := d.Decode(pkt)
if err == ErrMorePacketsNeeded {
continue
}
require.NoError(t, err)
obus = append(obus, addOBUs...)
}
require.Equal(t, ca.obus, obus)
})
}
}
func FuzzDecoder(f *testing.F) {
f.Fuzz(func(t *testing.T, a []byte, am bool, b []byte, bm bool) {
d := &Decoder{}
d.Init()
d.Decode(&rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: am,
PayloadType: 96,
SequenceNumber: 17645,
Timestamp: 2289527317,
SSRC: 0x9dbb7812,
},
Payload: a,
})
d.Decode(&rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: bm,
PayloadType: 96,
SequenceNumber: 17646,
Timestamp: 2289527317,
SSRC: 0x9dbb7812,
},
Payload: b,
})
})
}

View File

@@ -0,0 +1,147 @@
package rtpav1
import (
"crypto/rand"
"time"
"github.com/bluenviron/mediacommon/pkg/codecs/av1"
"github.com/pion/rtp"
"github.com/bluenviron/gortsplib/v3/pkg/rtptime"
)
const (
rtpVersion = 2
)
func randUint32() uint32 {
var b [4]byte
rand.Read(b[:])
return uint32(b[0])<<24 | uint32(b[1])<<16 | uint32(b[2])<<8 | uint32(b[3])
}
// Encoder is a RTP/AV1 encoder.
// Specification: https://aomediacodec.github.io/av1-rtp-spec/
type Encoder struct {
// payload type of packets.
PayloadType uint8
// SSRC of packets (optional).
// It defaults to a random value.
SSRC *uint32
// initial sequence number of packets (optional).
// It defaults to a random value.
InitialSequenceNumber *uint16
// initial timestamp of packets (optional).
// It defaults to a random value.
InitialTimestamp *uint32
// maximum size of packet payloads (optional).
// It defaults to 1460.
PayloadMaxSize int
sequenceNumber uint16
timeEncoder *rtptime.Encoder
}
// Init initializes the encoder.
func (e *Encoder) Init() {
if e.SSRC == nil {
v := randUint32()
e.SSRC = &v
}
if e.InitialSequenceNumber == nil {
v := uint16(randUint32())
e.InitialSequenceNumber = &v
}
if e.InitialTimestamp == nil {
v := randUint32()
e.InitialTimestamp = &v
}
if e.PayloadMaxSize == 0 {
e.PayloadMaxSize = 1460 // 1500 (UDP MTU) - 20 (IP header) - 8 (UDP header) - 12 (RTP header)
}
e.sequenceNumber = *e.InitialSequenceNumber
e.timeEncoder = rtptime.NewEncoder(90000, *e.InitialTimestamp)
}
// Encode encodes OBUs into RTP packets.
func (e *Encoder) Encode(obus [][]byte, pts time.Duration) ([]*rtp.Packet, error) {
isKeyFrame, err := av1.ContainsKeyFrame(obus)
if err != nil {
return nil, err
}
ts := e.timeEncoder.Encode(pts)
var curPacket *rtp.Packet
var packets []*rtp.Packet
curPayloadLen := 0
createNewPacket := func(z bool) {
curPacket = &rtp.Packet{
Header: rtp.Header{
Version: rtpVersion,
PayloadType: e.PayloadType,
SequenceNumber: e.sequenceNumber,
Timestamp: ts,
SSRC: *e.SSRC,
},
Payload: []byte{0},
}
e.sequenceNumber++
packets = append(packets, curPacket)
curPayloadLen = 1
if z {
curPacket.Payload[0] |= 1 << 7
}
}
finalizeCurPacket := func(y bool) {
if y {
curPacket.Payload[0] |= 1 << 6
}
}
createNewPacket(false)
for _, obu := range obus {
for {
avail := e.PayloadMaxSize - curPayloadLen
obuLen := len(obu)
needed := obuLen + 2
if needed <= avail {
le := av1.LEB128Marshal(uint(obuLen))
curPacket.Payload = append(curPacket.Payload, le...)
curPacket.Payload = append(curPacket.Payload, obu...)
curPayloadLen += len(le) + obuLen
break
}
if avail > 2 {
fragmentLen := avail - 2
le := av1.LEB128Marshal(uint(fragmentLen))
curPacket.Payload = append(curPacket.Payload, le...)
curPacket.Payload = append(curPacket.Payload, obu[:fragmentLen]...)
obu = obu[fragmentLen:]
}
finalizeCurPacket(true)
createNewPacket(true)
}
}
finalizeCurPacket(false)
if isKeyFrame {
packets[0].Payload[0] |= 1 << 3
}
packets[len(packets)-1].Marker = true
return packets, nil
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,2 @@
// Package rtpav1 contains a RTP/AV1 decoder and encoder.
package rtpav1

View File

@@ -0,0 +1,5 @@
go test fuzz v1
[]byte("")
bool(false)
[]byte("")
bool(false)

View File

@@ -0,0 +1,5 @@
go test fuzz v1
[]byte("\x190")
bool(false)
[]byte("\xd00")
bool(false)

View File

@@ -0,0 +1,5 @@
go test fuzz v1
[]byte("0\x00")
bool(true)
[]byte("0")
bool(false)

View File

@@ -0,0 +1,5 @@
go test fuzz v1
[]byte("\xd00")
bool(false)
[]byte("")
bool(false)

View File

@@ -39,7 +39,7 @@ type Decoder struct {
timeDecoder *rtptime.Decoder
firstPacketReceived bool
fragmentedSize int
fragmentsSize int
fragments [][]byte
annexBMode bool
@@ -59,7 +59,7 @@ func (d *Decoder) Decode(pkt *rtp.Packet) ([][]byte, time.Duration, error) {
}
if len(pkt.Payload) < 1 {
d.fragments = d.fragments[:0] // discard pending fragmented packets
d.fragments = d.fragments[:0] // discard pending fragments
return nil, 0, fmt.Errorf("payload is too short")
}
@@ -76,7 +76,7 @@ func (d *Decoder) Decode(pkt *rtp.Packet) ([][]byte, time.Duration, error) {
end := (pkt.Payload[1] >> 6) & 0x01
if start == 1 {
d.fragments = d.fragments[:0] // discard pending fragmented packets
d.fragments = d.fragments[:0] // discard pending fragments
if end != 0 {
return nil, 0, fmt.Errorf("invalid FU-A packet (can't contain both a start and end bit)")
@@ -84,7 +84,7 @@ func (d *Decoder) Decode(pkt *rtp.Packet) ([][]byte, time.Duration, error) {
nri := (pkt.Payload[0] >> 5) & 0x03
typ := pkt.Payload[1] & 0x1F
d.fragmentedSize = len(pkt.Payload[1:])
d.fragmentsSize = len(pkt.Payload[1:])
d.fragments = append(d.fragments, []byte{(nri << 5) | typ}, pkt.Payload[2:])
d.firstPacketReceived = true
@@ -99,10 +99,10 @@ func (d *Decoder) Decode(pkt *rtp.Packet) ([][]byte, time.Duration, error) {
return nil, 0, fmt.Errorf("invalid FU-A packet (non-starting)")
}
d.fragmentedSize += len(pkt.Payload[2:])
if d.fragmentedSize > h264.MaxNALUSize {
d.fragmentsSize += len(pkt.Payload[2:])
if d.fragmentsSize > h264.MaxNALUSize {
d.fragments = d.fragments[:0]
return nil, 0, fmt.Errorf("NALU size (%d) is too big (maximum is %d)", d.fragmentedSize, h264.MaxNALUSize)
return nil, 0, fmt.Errorf("NALU size (%d) is too big, maximum is %d", d.fragmentsSize, h264.MaxNALUSize)
}
d.fragments = append(d.fragments, pkt.Payload[2:])
@@ -111,12 +111,12 @@ func (d *Decoder) Decode(pkt *rtp.Packet) ([][]byte, time.Duration, error) {
return nil, 0, ErrMorePacketsNeeded
}
nalus = [][]byte{joinFragments(d.fragments, d.fragmentedSize)}
nalus = [][]byte{joinFragments(d.fragments, d.fragmentsSize)}
d.fragments = d.fragments[:0]
case h264.NALUTypeSTAPA:
d.fragments = d.fragments[:0] // discard pending fragmented packets
d.fragments = d.fragments[:0] // discard pending fragments
payload := pkt.Payload[1:]
@@ -149,12 +149,12 @@ func (d *Decoder) Decode(pkt *rtp.Packet) ([][]byte, time.Duration, error) {
case h264.NALUTypeSTAPB, h264.NALUTypeMTAP16,
h264.NALUTypeMTAP24, h264.NALUTypeFUB:
d.fragments = d.fragments[:0] // discard pending fragmented packets
d.fragments = d.fragments[:0] // discard pending fragments
d.firstPacketReceived = true
return nil, 0, fmt.Errorf("packet type not supported (%v)", typ)
default:
d.fragments = d.fragments[:0] // discard pending fragmented packets
d.fragments = d.fragments[:0] // discard pending fragments
d.firstPacketReceived = true
nalus = [][]byte{pkt.Payload}
}

View File

@@ -146,8 +146,10 @@ func (e *Encoder) writeSingle(nalu []byte, pts time.Duration, marker bool) ([]*r
func (e *Encoder) writeFragmented(nalu []byte, pts time.Duration, marker bool) ([]*rtp.Packet, error) {
// use only FU-A, not FU-B, since we always use non-interleaved mode
// (packetization-mode=1)
packetCount := (len(nalu) - 1) / (e.PayloadMaxSize - 2)
lastPacketSize := (len(nalu) - 1) % (e.PayloadMaxSize - 2)
avail := e.PayloadMaxSize - 2
le := len(nalu) - 1
packetCount := le / avail
lastPacketSize := le % avail
if lastPacketSize > 0 {
packetCount++
}
@@ -167,7 +169,7 @@ func (e *Encoder) writeFragmented(nalu []byte, pts time.Duration, marker bool) (
start = 1
}
end := uint8(0)
le := e.PayloadMaxSize - 2
le := avail
if i == (packetCount - 1) {
end = 1
le = lastPacketSize

View File

@@ -38,7 +38,7 @@ type Decoder struct {
timeDecoder *rtptime.Decoder
firstPacketReceived bool
fragmentedSize int
fragmentsSize int
fragments [][]byte
// for DecodeUntilMarker()
@@ -57,7 +57,7 @@ func (d *Decoder) Decode(pkt *rtp.Packet) ([][]byte, time.Duration, error) {
}
if len(pkt.Payload) < 2 {
d.fragments = d.fragments[:0] // discard pending fragmented packets
d.fragments = d.fragments[:0] // discard pending fragments
return nil, 0, fmt.Errorf("payload is too short")
}
@@ -66,7 +66,7 @@ func (d *Decoder) Decode(pkt *rtp.Packet) ([][]byte, time.Duration, error) {
switch typ {
case h265.NALUType_AggregationUnit:
d.fragments = d.fragments[:0] // discard pending fragmented packets
d.fragments = d.fragments[:0] // discard pending fragments
payload := pkt.Payload[2:]
@@ -98,7 +98,7 @@ func (d *Decoder) Decode(pkt *rtp.Packet) ([][]byte, time.Duration, error) {
case h265.NALUType_FragmentationUnit:
if len(pkt.Payload) < 3 {
d.fragments = d.fragments[:0] // discard pending fragmented packets
d.fragments = d.fragments[:0] // discard pending fragments
return nil, 0, fmt.Errorf("payload is too short")
}
@@ -106,7 +106,7 @@ func (d *Decoder) Decode(pkt *rtp.Packet) ([][]byte, time.Duration, error) {
end := (pkt.Payload[2] >> 6) & 0x01
if start == 1 {
d.fragments = d.fragments[:0] // discard pending fragmented packets
d.fragments = d.fragments[:0] // discard pending fragments
if end != 0 {
return nil, 0, fmt.Errorf("invalid fragmentation unit (can't contain both a start and end bit)")
@@ -114,7 +114,7 @@ func (d *Decoder) Decode(pkt *rtp.Packet) ([][]byte, time.Duration, error) {
typ := pkt.Payload[2] & 0b111111
head := uint16(pkt.Payload[0]&0b10000001)<<8 | uint16(typ)<<9 | uint16(pkt.Payload[1])
d.fragmentedSize = len(pkt.Payload[1:])
d.fragmentsSize = len(pkt.Payload[1:])
d.fragments = append(d.fragments, []byte{byte(head >> 8), byte(head)}, pkt.Payload[3:])
d.firstPacketReceived = true
@@ -129,10 +129,10 @@ func (d *Decoder) Decode(pkt *rtp.Packet) ([][]byte, time.Duration, error) {
return nil, 0, fmt.Errorf("invalid fragmentation unit (non-starting)")
}
d.fragmentedSize += len(pkt.Payload[3:])
if d.fragmentedSize > h265.MaxNALUSize {
d.fragmentsSize += len(pkt.Payload[3:])
if d.fragmentsSize > h265.MaxNALUSize {
d.fragments = d.fragments[:0]
return nil, 0, fmt.Errorf("NALU size (%d) is too big (maximum is %d)", d.fragmentedSize, h265.MaxNALUSize)
return nil, 0, fmt.Errorf("NALU size (%d) is too big, maximum is %d", d.fragmentsSize, h265.MaxNALUSize)
}
d.fragments = append(d.fragments, pkt.Payload[3:])
@@ -141,17 +141,17 @@ func (d *Decoder) Decode(pkt *rtp.Packet) ([][]byte, time.Duration, error) {
return nil, 0, ErrMorePacketsNeeded
}
nalus = [][]byte{joinFragments(d.fragments, d.fragmentedSize)}
nalus = [][]byte{joinFragments(d.fragments, d.fragmentsSize)}
d.fragments = d.fragments[:0]
case h265.NALUType_PACI:
d.fragments = d.fragments[:0] // discard pending fragmented packets
d.fragments = d.fragments[:0] // discard pending fragments
d.firstPacketReceived = true
return nil, 0, fmt.Errorf("PACI packets are not supported (yet)")
default:
d.fragments = d.fragments[:0] // discard pending fragmented packets
d.fragments = d.fragments[:0] // discard pending fragments
d.firstPacketReceived = true
nalus = [][]byte{pkt.Payload}
}

View File

@@ -144,8 +144,10 @@ func (e *Encoder) writeSingle(nalu []byte, pts time.Duration, marker bool) ([]*r
}
func (e *Encoder) writeFragmentationUnits(nalu []byte, pts time.Duration, marker bool) ([]*rtp.Packet, error) {
n := (len(nalu) - 2) / (e.PayloadMaxSize - 3)
lastPacketSize := (len(nalu) - 2) % (e.PayloadMaxSize - 3)
avail := e.PayloadMaxSize - 3
le := len(nalu) - 2
n := le / avail
lastPacketSize := le % avail
if lastPacketSize > 0 {
n++
}
@@ -162,7 +164,7 @@ func (e *Encoder) writeFragmentationUnits(nalu []byte, pts time.Duration, marker
start = 1
}
end := uint8(0)
le := e.PayloadMaxSize - 3
le := avail
if i == (n - 1) {
end = 1
le = lastPacketSize

View File

@@ -108,7 +108,7 @@ func joinFragments(fragments [][]byte, size int) []byte {
type Decoder struct {
timeDecoder *rtptime.Decoder
firstPacketReceived bool
fragmentedSize int
fragmentsSize int
fragments [][]byte
firstJpegHeader *headers.JPEG
firstQTHeader *headers.QuantizationTable
@@ -139,8 +139,8 @@ func (d *Decoder) Decode(pkt *rtp.Packet) ([]byte, time.Duration, error) {
}
if jh.FragmentOffset == 0 {
d.fragments = d.fragments[:0] // discard pending fragmented packets
d.fragmentedSize = 0
d.fragments = d.fragments[:0] // discard pending fragments
d.fragmentsSize = 0
d.firstPacketReceived = true
if jh.Quantization >= 128 {
@@ -155,20 +155,20 @@ func (d *Decoder) Decode(pkt *rtp.Packet) ([]byte, time.Duration, error) {
}
d.fragments = append(d.fragments, byts)
d.fragmentedSize = len(byts)
d.fragmentsSize = len(byts)
d.firstJpegHeader = &jh
} else {
if int(jh.FragmentOffset) != d.fragmentedSize {
if int(jh.FragmentOffset) != d.fragmentsSize {
if !d.firstPacketReceived {
return nil, 0, ErrNonStartingPacketAndNoPrevious
}
d.fragments = d.fragments[:0] // discard pending fragmented packets
d.fragmentedSize = 0
d.fragments = d.fragments[:0] // discard pending fragments
d.fragmentsSize = 0
return nil, 0, fmt.Errorf("received wrong fragment")
}
d.fragmentedSize += len(byts)
d.fragmentsSize += len(byts)
d.fragments = append(d.fragments, byts)
}
@@ -176,14 +176,14 @@ func (d *Decoder) Decode(pkt *rtp.Packet) ([]byte, time.Duration, error) {
return nil, 0, ErrMorePacketsNeeded
}
if d.fragmentedSize < 2 {
if d.fragmentsSize < 2 {
return nil, 0, fmt.Errorf("invalid data")
}
data := joinFragments(d.fragments, d.fragmentedSize)
data := joinFragments(d.fragments, d.fragmentsSize)
d.fragments = d.fragments[:0]
d.fragmentedSize = 0
d.fragmentsSize = 0
var buf []byte

View File

@@ -36,7 +36,7 @@ type Decoder struct {
timeDecoder *rtptime.Decoder
firstPacketReceived bool
fragments [][]byte
fragmentedSize int
fragmentsSize int
fragmentsExpected int
}
@@ -48,15 +48,15 @@ func (d *Decoder) Init() {
// Decode decodes frames from a RTP packet.
func (d *Decoder) Decode(pkt *rtp.Packet) ([][]byte, time.Duration, error) {
if len(pkt.Payload) < 5 {
d.fragments = d.fragments[:0] // discard pending fragmented packets
d.fragmentedSize = 0
d.fragments = d.fragments[:0] // discard pending fragments
d.fragmentsSize = 0
return nil, 0, fmt.Errorf("payload is too short")
}
mbz := uint16(pkt.Payload[0])<<8 | uint16(pkt.Payload[1])
if mbz != 0 {
d.fragments = d.fragments[:0] // discard pending fragmented packets
d.fragmentedSize = 0
d.fragments = d.fragments[:0] // discard pending fragments
d.fragmentsSize = 0
return nil, 0, fmt.Errorf("invalid MBZ: %v", mbz)
}
@@ -65,8 +65,8 @@ func (d *Decoder) Decode(pkt *rtp.Packet) ([][]byte, time.Duration, error) {
var frames [][]byte
if offset == 0 {
d.fragments = d.fragments[:0] // discard pending fragmented packets
d.fragmentedSize = 0
d.fragments = d.fragments[:0] // discard pending fragments
d.fragmentsSize = 0
d.firstPacketReceived = true
buf := pkt.Payload[4:]
@@ -91,29 +91,29 @@ func (d *Decoder) Decode(pkt *rtp.Packet) ([][]byte, time.Duration, error) {
}
d.fragments = append(d.fragments, buf)
d.fragmentedSize = bl
d.fragmentsSize = bl
d.fragmentsExpected = fl - bl
return nil, 0, ErrMorePacketsNeeded
}
}
} else {
if int(offset) != d.fragmentedSize {
if int(offset) != d.fragmentsSize {
if !d.firstPacketReceived {
return nil, 0, ErrNonStartingPacketAndNoPrevious
}
d.fragments = d.fragments[:0] // discard pending fragmented packets
d.fragmentedSize = 0
return nil, 0, fmt.Errorf("unexpected offset %v, expected %v", offset, d.fragmentedSize)
d.fragments = d.fragments[:0] // discard pending fragments
d.fragmentsSize = 0
return nil, 0, fmt.Errorf("unexpected offset %v, expected %v", offset, d.fragmentsSize)
}
bl := len(pkt.Payload[4:])
d.fragmentedSize += bl
d.fragmentsSize += bl
d.fragmentsExpected -= bl
if d.fragmentsExpected < 0 {
d.fragments = d.fragments[:0] // discard pending fragmented packets
d.fragmentedSize = 0
d.fragments = d.fragments[:0] // discard pending fragments
d.fragmentsSize = 0
return nil, 0, fmt.Errorf("fragment is too big")
}
@@ -123,10 +123,10 @@ func (d *Decoder) Decode(pkt *rtp.Packet) ([][]byte, time.Duration, error) {
return nil, 0, ErrMorePacketsNeeded
}
frames = [][]byte{joinFragments(d.fragments, d.fragmentedSize)}
frames = [][]byte{joinFragments(d.fragments, d.fragmentsSize)}
d.fragments = d.fragments[:0]
d.fragmentedSize = 0
d.fragmentsSize = 0
}
return frames, d.timeDecoder.Decode(pkt.Timestamp), nil

View File

@@ -125,10 +125,10 @@ func (e *Encoder) writeBatch(frames [][]byte, pts time.Duration) ([]*rtp.Packet,
}
func (e *Encoder) writeFragmented(frame []byte, pts time.Duration) ([]*rtp.Packet, error) {
availPerPacket := e.PayloadMaxSize - 4
avail := e.PayloadMaxSize - 4
le := len(frame)
packetCount := le / availPerPacket
lastPacketSize := le % availPerPacket
packetCount := le / avail
lastPacketSize := le % avail
if lastPacketSize > 0 {
packetCount++
}
@@ -140,7 +140,7 @@ func (e *Encoder) writeFragmented(frame []byte, pts time.Duration) ([]*rtp.Packe
for i := range ret {
var le int
if i != (packetCount - 1) {
le = availPerPacket
le = avail
} else {
le = lastPacketSize
}

View File

@@ -39,11 +39,11 @@ type Decoder struct {
// The number of bits in which the AU-Index-delta field is encoded in any non-first AU-header.
IndexDeltaLength int
timeDecoder *rtptime.Decoder
firstAUParsed bool
adtsMode bool
fragments [][]byte
fragmentedSize int
timeDecoder *rtptime.Decoder
firstAUParsed bool
adtsMode bool
fragments [][]byte
fragmentsSize int
}
// Init initializes the decoder.
@@ -56,14 +56,14 @@ func (d *Decoder) Init() {
// The PTS of subsequent AUs can be calculated by adding time.Second*mpeg4audio.SamplesPerAccessUnit/clockRate.
func (d *Decoder) Decode(pkt *rtp.Packet) ([][]byte, time.Duration, error) {
if len(pkt.Payload) < 2 {
d.fragments = d.fragments[:0] // discard pending fragmented packets
d.fragments = d.fragments[:0] // discard pending fragments
return nil, 0, fmt.Errorf("payload is too short")
}
// AU-headers-length (16 bits)
headersLen := int(uint16(pkt.Payload[0])<<8 | uint16(pkt.Payload[1]))
if headersLen == 0 {
d.fragments = d.fragments[:0] // discard pending fragmented packets
d.fragments = d.fragments[:0] // discard pending fragments
return nil, 0, fmt.Errorf("invalid AU-headers-length")
}
payload := pkt.Payload[2:]
@@ -71,7 +71,7 @@ func (d *Decoder) Decode(pkt *rtp.Packet) ([][]byte, time.Duration, error) {
// AU-headers
dataLens, err := d.readAUHeaders(payload, headersLen)
if err != nil {
d.fragments = d.fragments[:0] // discard pending fragmented packets
d.fragments = d.fragments[:0] // discard pending fragments
return nil, 0, err
}
@@ -104,26 +104,26 @@ func (d *Decoder) Decode(pkt *rtp.Packet) ([][]byte, time.Duration, error) {
return nil, 0, fmt.Errorf("payload is too short")
}
d.fragmentedSize = int(dataLens[0])
d.fragmentsSize = int(dataLens[0])
d.fragments = append(d.fragments, payload[:dataLens[0]])
return nil, 0, ErrMorePacketsNeeded
}
} else {
// we are decoding a fragmented AU
if len(dataLens) != 1 {
d.fragments = d.fragments[:0] // discard pending fragmented packets
d.fragments = d.fragments[:0] // discard pending fragments
return nil, 0, fmt.Errorf("a fragmented packet can only contain one AU")
}
if len(payload) < int(dataLens[0]) {
d.fragments = d.fragments[:0] // discard pending fragmented packets
d.fragments = d.fragments[:0] // discard pending fragments
return nil, 0, fmt.Errorf("payload is too short")
}
d.fragmentedSize += int(dataLens[0])
if d.fragmentedSize > mpeg4audio.MaxAccessUnitSize {
d.fragments = d.fragments[:0] // discard pending fragmented packets
return nil, 0, fmt.Errorf("AU size (%d) is too big (maximum is %d)", d.fragmentedSize, mpeg4audio.MaxAccessUnitSize)
d.fragmentsSize += int(dataLens[0])
if d.fragmentsSize > mpeg4audio.MaxAccessUnitSize {
d.fragments = d.fragments[:0] // discard pending fragments
return nil, 0, fmt.Errorf("AU size (%d) is too big, maximum is %d", d.fragmentsSize, mpeg4audio.MaxAccessUnitSize)
}
d.fragments = append(d.fragments, payload[:dataLens[0]])
@@ -132,7 +132,7 @@ func (d *Decoder) Decode(pkt *rtp.Packet) ([][]byte, time.Duration, error) {
return nil, 0, ErrMorePacketsNeeded
}
aus = [][]byte{joinFragments(d.fragments, d.fragmentedSize)}
aus = [][]byte{joinFragments(d.fragments, d.fragmentsSize)}
d.fragments = d.fragments[:0]
}

View File

@@ -132,10 +132,10 @@ func (e *Encoder) writeFragmented(au []byte, pts time.Duration) ([]*rtp.Packet,
auHeadersLenBytes++
}
auMaxSize := e.PayloadMaxSize - 2 - auHeadersLenBytes
avail := e.PayloadMaxSize - 2 - auHeadersLenBytes
le := len(au)
packetCount := le / auMaxSize
lastPacketSize := le % auMaxSize
packetCount := le / avail
lastPacketSize := le % avail
if lastPacketSize > 0 {
packetCount++
}
@@ -146,7 +146,7 @@ func (e *Encoder) writeFragmented(au []byte, pts time.Duration) ([]*rtp.Packet,
for i := range ret {
var le int
if i != (packetCount - 1) {
le = auMaxSize
le = avail
} else {
le = lastPacketSize
}

View File

@@ -29,9 +29,9 @@ func joinFragments(fragments [][]byte, size int) []byte {
// Decoder is a RTP/MPEG-4 Video decoder.
// Specification: https://datatracker.ietf.org/doc/html/rfc6416
type Decoder struct {
timeDecoder *rtptime.Decoder
fragments [][]byte
fragmentedSize int
timeDecoder *rtptime.Decoder
fragments [][]byte
fragmentsSize int
}
// Init initializes the decoder.
@@ -47,15 +47,15 @@ func (d *Decoder) Decode(pkt *rtp.Packet) ([]byte, time.Duration, error) {
if pkt.Marker {
frame = pkt.Payload
} else {
d.fragmentedSize = len(pkt.Payload)
d.fragmentsSize = len(pkt.Payload)
d.fragments = append(d.fragments, pkt.Payload)
return nil, 0, ErrMorePacketsNeeded
}
} else {
d.fragmentedSize += len(pkt.Payload)
if d.fragmentedSize > maxFrameSize {
d.fragments = d.fragments[:0] // discard pending fragmented packets
return nil, 0, fmt.Errorf("frame size (%d) is too big (maximum is %d)", d.fragmentedSize, maxFrameSize)
d.fragmentsSize += len(pkt.Payload)
if d.fragmentsSize > maxFrameSize {
d.fragments = d.fragments[:0] // discard pending fragments
return nil, 0, fmt.Errorf("frame size (%d) is too big, maximum is %d", d.fragmentsSize, maxFrameSize)
}
d.fragments = append(d.fragments, pkt.Payload)
@@ -64,7 +64,7 @@ func (d *Decoder) Decode(pkt *rtp.Packet) ([]byte, time.Duration, error) {
return nil, 0, ErrMorePacketsNeeded
}
frame = joinFragments(d.fragments, d.fragmentedSize)
frame = joinFragments(d.fragments, d.fragmentsSize)
d.fragments = d.fragments[:0]
}

View File

@@ -69,10 +69,10 @@ func (e *Encoder) Init() {
// Encode encodes a frame into RTP packets.
func (e *Encoder) Encode(frame []byte, pts time.Duration) ([]*rtp.Packet, error) {
availPerPacket := e.PayloadMaxSize
avail := e.PayloadMaxSize
le := len(frame)
packetCount := le / availPerPacket
lastPacketSize := le % availPerPacket
packetCount := le / avail
lastPacketSize := le % avail
if lastPacketSize > 0 {
packetCount++
}
@@ -84,7 +84,7 @@ func (e *Encoder) Encode(frame []byte, pts time.Duration) ([]*rtp.Packet, error)
for i := range ret {
var le int
if i != (packetCount - 1) {
le = availPerPacket
le = avail
} else {
le = lastPacketSize
}

View File

@@ -48,19 +48,19 @@ func (d *Decoder) Decode(pkt *rtp.Packet) ([]byte, time.Duration, error) {
var vpkt codecs.VP8Packet
_, err := vpkt.Unmarshal(pkt.Payload)
if err != nil {
d.fragments = d.fragments[:0] // discard pending fragmented packets
d.fragments = d.fragments[:0] // discard pending fragments
return nil, 0, err
}
if vpkt.PID != 0 {
d.fragments = d.fragments[:0] // discard pending fragmented packets
d.fragments = d.fragments[:0] // discard pending fragments
return nil, 0, fmt.Errorf("packets containing single partitions are not supported")
}
var frame []byte
if vpkt.S == 1 {
d.fragments = d.fragments[:0] // discard pending fragmented packets
d.fragments = d.fragments[:0] // discard pending fragments
d.firstPacketReceived = true
if !pkt.Marker {

View File

@@ -48,14 +48,14 @@ func (d *Decoder) Decode(pkt *rtp.Packet) ([]byte, time.Duration, error) {
var vpkt codecs.VP9Packet
_, err := vpkt.Unmarshal(pkt.Payload)
if err != nil {
d.fragments = d.fragments[:0] // discard pending fragmented packets
d.fragments = d.fragments[:0] // discard pending fragments
return nil, 0, err
}
var frame []byte
if vpkt.B {
d.fragments = d.fragments[:0] // discard pending fragmented packets
d.fragments = d.fragments[:0] // discard pending fragments
d.firstPacketReceived = true
if !vpkt.E {

View File

@@ -1,4 +1,4 @@
package formats
package formats //nolint:dupl
import (
"fmt"