rtpaac: add decoder, add tests

This commit is contained in:
aler9
2021-03-07 16:53:53 +01:00
parent 54a4859473
commit 9ea2f797ea
12 changed files with 197 additions and 40 deletions

View File

@@ -1,4 +1,4 @@
// Package multibuffer implements a buffer with multiple levels. // Package multibuffer contains a buffer with multiple levels.
package multibuffer package multibuffer
// MultiBuffer implements software multi buffering, that allows to reuse // MultiBuffer implements software multi buffering, that allows to reuse

View File

@@ -1,3 +1,4 @@
// Package ringbuffer contains a ring buffer.
package ringbuffer package ringbuffer
import ( import (

View File

@@ -1,4 +1,4 @@
// Package rtcpreceiver implements a utility to generate RTCP receiver reports. // Package rtcpreceiver contains a utility to generate RTCP receiver reports.
package rtcpreceiver package rtcpreceiver
import ( import (

View File

@@ -1,4 +1,4 @@
// Package rtcpsender implements a utility to generate RTCP sender reports. // Package rtcpsender contains a utility to generate RTCP sender reports.
package rtcpsender package rtcpsender
import ( import (

45
pkg/rtpaac/decoder.go Normal file
View File

@@ -0,0 +1,45 @@
package rtpaac
import (
"fmt"
"time"
"github.com/pion/rtp"
)
// Decoder is a RTP/AAC decoder.
type Decoder struct {
clockRate time.Duration
initialTs uint32
initialTsSet bool
}
// NewDecoder allocates a Decoder.
func NewDecoder(clockRate int) *Decoder {
return &Decoder{
clockRate: time.Duration(clockRate),
}
}
// Decode decodes an AU from an RTP/AAC packet.
func (d *Decoder) Decode(byts []byte) (*AUAndTimestamp, error) {
pkt := rtp.Packet{}
err := pkt.Unmarshal(byts)
if err != nil {
return nil, err
}
if !d.initialTsSet {
d.initialTsSet = true
d.initialTs = pkt.Timestamp
}
if pkt.Payload[0] != 0x00 || pkt.Payload[1] != 0x10 {
return nil, fmt.Errorf("invalid payload")
}
return &AUAndTimestamp{
AU: pkt.Payload[4:],
Timestamp: time.Duration(pkt.Timestamp-d.initialTs) * time.Second / d.clockRate,
}, nil
}

View File

@@ -5,7 +5,6 @@ import (
"encoding/binary" "encoding/binary"
"fmt" "fmt"
"math/rand" "math/rand"
"time"
"github.com/pion/rtp" "github.com/pion/rtp"
) )
@@ -22,39 +21,55 @@ type Encoder struct {
sequenceNumber uint16 sequenceNumber uint16
ssrc uint32 ssrc uint32
initialTs uint32 initialTs uint32
started time.Duration
} }
// NewEncoder allocates an Encoder. // NewEncoder allocates an Encoder.
func NewEncoder(payloadType uint8, clockRate int) (*Encoder, error) { func NewEncoder(payloadType uint8,
clockRate int,
sequenceNumber *uint16,
ssrc *uint32,
initialTs *uint32) *Encoder {
return &Encoder{ return &Encoder{
payloadType: payloadType, payloadType: payloadType,
clockRate: float64(clockRate), clockRate: float64(clockRate),
sequenceNumber: uint16(rand.Uint32()), sequenceNumber: func() uint16 {
ssrc: rand.Uint32(), if sequenceNumber != nil {
initialTs: rand.Uint32(), return *sequenceNumber
}, nil }
return uint16(rand.Uint32())
}(),
ssrc: func() uint32 {
if ssrc != nil {
return *ssrc
}
return rand.Uint32()
}(),
initialTs: func() uint32 {
if initialTs != nil {
return *initialTs
}
return rand.Uint32()
}(),
}
} }
// Write encodes an AAC frame into RTP/AAC packets. // Encode encodes an AU into an RTP/AAC packet.
func (e *Encoder) Write(ts time.Duration, data []byte) ([][]byte, error) { func (e *Encoder) Encode(at *AUAndTimestamp) ([]byte, error) {
if e.started == 0 { if len(at.AU) > rtpPayloadMaxSize {
e.started = ts
}
if len(data) > rtpPayloadMaxSize {
return nil, fmt.Errorf("data is too big") return nil, fmt.Errorf("data is too big")
} }
rtpTs := e.initialTs + uint32((ts-e.started).Seconds()*e.clockRate) rtpTs := e.initialTs + uint32((at.Timestamp).Seconds()*e.clockRate)
payload := []byte{0x00, 0x10}
// 13 bits payload size // 13 bits payload size
// 3 bits AU-Index(-delta) // 3 bits AU-Index(-delta)
header := make([]byte, 2) header := make([]byte, 2)
binary.BigEndian.PutUint16(header, uint16(len(data))<<3) binary.BigEndian.PutUint16(header, uint16(len(at.AU))<<3)
payload = append(payload, header...)
payload := append([]byte{0x00, 0x10}, header...) payload = append(payload, at.AU...)
payload = append(payload, data...)
rpkt := rtp.Packet{ rpkt := rtp.Packet{
Header: rtp.Header{ Header: rtp.Header{
@@ -74,5 +89,5 @@ func (e *Encoder) Write(ts time.Duration, data []byte) ([][]byte, error) {
return nil, err return nil, err
} }
return [][]byte{frame}, nil return frame, nil
} }

12
pkg/rtpaac/rtpaac.go Normal file
View File

@@ -0,0 +1,12 @@
// Package rtpaac contains a RTP/AAC decoder and encoder.
package rtpaac
import (
"time"
)
// AUAndTimestamp is an Access Unit and its timestamp.
type AUAndTimestamp struct {
Timestamp time.Duration
AU []byte
}

80
pkg/rtpaac/rtpaac_test.go Normal file
View File

@@ -0,0 +1,80 @@
package rtpaac
import (
"bytes"
"testing"
"time"
"github.com/stretchr/testify/require"
)
func mergeBytes(vals ...[]byte) []byte {
size := 0
for _, v := range vals {
size += len(v)
}
res := make([]byte, size)
pos := 0
for _, v := range vals {
n := copy(res[pos:], v)
pos += n
}
return res
}
var cases = []struct {
name string
dec *AUAndTimestamp
enc []byte
}{
{
"single",
&AUAndTimestamp{
Timestamp: 25 * time.Millisecond,
AU: bytes.Repeat([]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, 8),
},
mergeBytes(
[]byte{
0x80, 0xe0, 0x44, 0xed, 0x88, 0x77, 0x6b, 0x05,
0x9d, 0xbb, 0x78, 0x12, 0x00, 0x10, 0x02, 0x00,
},
bytes.Repeat([]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, 8),
),
},
}
func TestEncode(t *testing.T) {
for _, ca := range cases {
t.Run(ca.name, func(t *testing.T) {
sequenceNumber := uint16(0x44ed)
ssrc := uint32(0x9dbb7812)
initialTs := uint32(0x88776655)
e := NewEncoder(96, 48000, &sequenceNumber, &ssrc, &initialTs)
enc, err := e.Encode(ca.dec)
require.NoError(t, err)
require.Equal(t, ca.enc, enc)
})
}
}
func TestDecode(t *testing.T) {
for _, ca := range cases {
t.Run(ca.name, func(t *testing.T) {
d := NewDecoder(48000)
// send an initial packet downstream
// in order to correctly compute the timestamp
_, err := d.Decode([]byte{
0x80, 0xe0, 0x44, 0xed, 0x88, 0x77, 0x66, 0x55,
0x9d, 0xbb, 0x78, 0x12, 0x00, 0x10, 0x00, 0x08, 0x0,
})
require.NoError(t, err)
dec, err := d.Decode(ca.enc)
require.NoError(t, err)
require.Equal(t, ca.dec, dec)
})
}
}

View File

@@ -33,13 +33,13 @@ const (
// Decoder is a RTP/H264 decoder. // Decoder is a RTP/H264 decoder.
type Decoder struct { type Decoder struct {
state decoderState
initialTs uint32 initialTs uint32
initialTsSet bool initialTsSet bool
state decoderState
fragmentedBuf []byte fragmentedBuf []byte
} }
// NewDecoder creates a decoder around a Reader. // NewDecoder allocates a Decoder.
func NewDecoder() *Decoder { func NewDecoder() *Decoder {
return &Decoder{} return &Decoder{}
} }

View File

@@ -21,8 +21,10 @@ type Encoder struct {
} }
// NewEncoder allocates an Encoder. // NewEncoder allocates an Encoder.
func NewEncoder(payloadType uint8, sequenceNumber *uint16, func NewEncoder(payloadType uint8,
ssrc *uint32, initialTs *uint32) *Encoder { sequenceNumber *uint16,
ssrc *uint32,
initialTs *uint32) *Encoder {
return &Encoder{ return &Encoder{
payloadType: payloadType, payloadType: payloadType,
sequenceNumber: func() uint16 { sequenceNumber: func() uint16 {

View File

@@ -5,7 +5,7 @@ import (
"time" "time"
) )
// NALUAndTimestamp is a NALU and an associated timestamp. // NALUAndTimestamp is a Network Abstraction Layer Unit and its timestamp.
type NALUAndTimestamp struct { type NALUAndTimestamp struct {
Timestamp time.Duration Timestamp time.Duration
NALU []byte NALU []byte

View File

@@ -2,6 +2,7 @@ package rtph264
import ( import (
"bytes" "bytes"
"io"
"testing" "testing"
"time" "time"
@@ -104,30 +105,31 @@ func TestDecode(t *testing.T) {
t.Run(ca.name, func(t *testing.T) { t.Run(ca.name, func(t *testing.T) {
i := 0 i := 0
r := readerFunc(func(p []byte) (int, error) { r := readerFunc(func(p []byte) (int, error) {
if i == 0 { if i == len(ca.enc) {
// send an initial packet downstream return 0, io.EOF
// in order to correctly compute the timestamp
n := copy(p, []byte{
0x80, 0xe0, 0x44, 0xed, 0x88, 0x77, 0x66, 0x55,
0x9d, 0xbb, 0x78, 0x12, 0x06, 0x00,
})
i++
return n, nil
} }
n := copy(p, ca.enc[i-1]) n := copy(p, ca.enc[i])
i++ i++
return n, nil return n, nil
}) })
d := NewDecoder() d := NewDecoder()
_, err := d.Read(r) // send an initial packet downstream
// in order to correctly compute the timestamp
_, err := d.Decode([]byte{
0x80, 0xe0, 0x44, 0xed, 0x88, 0x77, 0x66, 0x55,
0x9d, 0xbb, 0x78, 0x12, 0x06, 0x00,
})
require.NoError(t, err) require.NoError(t, err)
dec, err := d.Read(r) dec, err := d.Read(r)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, ca.dec, dec) require.Equal(t, ca.dec, dec)
_, err = d.Read(r)
require.Equal(t, io.EOF, err)
}) })
} }
} }