move RTP decoders/encoders into pkt/rtpcodecs

This commit is contained in:
aler9
2022-11-15 23:08:36 +01:00
parent 9795e9175a
commit c2c0230669
49 changed files with 14 additions and 15 deletions

View File

@@ -0,0 +1,2 @@
// Package rtpcodecs contains utilities to convert codec-specific elements from/to RTP packets.
package rtpcodecs

View File

@@ -0,0 +1,227 @@
package rtph264
import (
"bytes"
"errors"
"fmt"
"time"
"github.com/pion/rtp"
"github.com/aler9/gortsplib/pkg/h264"
"github.com/aler9/gortsplib/pkg/rtptimedec"
)
// ErrMorePacketsNeeded is returned when more packets are needed.
// Decode returns it while a fragmented NALU is still incomplete, and
// DecodeUntilMarker returns it until a packet with the marker flag arrives.
var ErrMorePacketsNeeded = errors.New("need more packets")

// ErrNonStartingPacketAndNoPrevious is returned when we received a non-starting
// packet of a fragmented NALU and we didn't receive anything before.
// It's normal to receive this when we are decoding a stream that has already
// been running for some time.
var ErrNonStartingPacketAndNoPrevious = errors.New(
	"received a non-starting FU-A packet without any previous FU-A starting packet")
// Decoder is a RTP/H264 decoder.
// Init creates the timestamp decoder that Decode relies on.
type Decoder struct {
	// indicates the packetization mode.
	// Decode rejects values >= 2.
	PacketizationMode int

	// converts RTP timestamps into durations; created by Init.
	timeDecoder *rtptimedec.Decoder
	// set once a FU-A starting packet, a STAP-A packet or any other non-FU-A
	// packet type has been processed; it selects which error is returned for
	// a FU-A continuation that has no pending fragments.
	firstPacketReceived bool
	// size in bytes of the NALU currently being reassembled from FU-A packets.
	fragmentedSize int
	// pieces of the NALU currently being reassembled (rebuilt header first).
	fragments [][]byte
	// set by removeAnnexB after the first decoded NALU list has been inspected.
	firstNALUParsed bool
	// set by removeAnnexB when the first inspected NALU contained an
	// Annex-B start code.
	annexBMode bool

	// for DecodeUntilMarker()
	naluBuffer [][]byte
}
// Init initializes the decoder.
// It creates the timestamp decoder, configured with the package clock rate.
func (d *Decoder) Init() {
	d.timeDecoder = rtptimedec.New(rtpClockRate)
}
// Decode decodes NALUs from a RTP/H264 packet.
//
// Single NALU packets and STAP-A packets yield their NALUs immediately.
// FU-A packets are buffered until the fragment carrying the end bit arrives;
// until then ErrMorePacketsNeeded is returned. Malformed packets and packets
// of any type other than FU-A discard the fragments collected so far.
//
// It returns the decoded NALUs, their PTS (derived from the RTP timestamp)
// and an error. ErrNonStartingPacketAndNoPrevious is returned for a FU-A
// continuation received before any starting or non-FU-A packet.
func (d *Decoder) Decode(pkt *rtp.Packet) ([][]byte, time.Duration, error) {
	if d.PacketizationMode >= 2 {
		return nil, 0, fmt.Errorf("PacketizationMode >= 2 is not supported")
	}

	if len(pkt.Payload) < 1 {
		d.fragments = d.fragments[:0] // discard pending fragmented packets
		return nil, 0, fmt.Errorf("payload is too short")
	}

	// the low 5 bits of the first byte carry the NALU / payload type
	typ := naluType(pkt.Payload[0] & 0x1F)
	var nalus [][]byte

	switch typ {
	case naluTypeFUA:
		if len(pkt.Payload) < 2 {
			// a malformed fragment invalidates the NALU being reassembled;
			// keeping the old fragments would let a later continuation be
			// appended to stale data
			d.fragments = d.fragments[:0] // discard pending fragmented packets
			return nil, 0, fmt.Errorf("invalid FU-A packet (invalid size)")
		}

		// FU header: start bit (7), end bit (6), original NALU type (4-0)
		start := pkt.Payload[1] >> 7
		end := (pkt.Payload[1] >> 6) & 0x01

		if start == 1 {
			d.fragments = d.fragments[:0] // discard pending fragmented packets

			if end != 0 {
				return nil, 0, fmt.Errorf("invalid FU-A packet (can't contain both a start and end bit)")
			}

			// rebuild the original NALU header from the NRI of the FU
			// indicator and the type of the FU header
			nri := (pkt.Payload[0] >> 5) & 0x03
			typ := pkt.Payload[1] & 0x1F
			// 1 byte of rebuilt header + fragment data
			d.fragmentedSize = len(pkt.Payload[1:])
			d.fragments = append(d.fragments, []byte{(nri << 5) | typ}, pkt.Payload[2:])
			d.firstPacketReceived = true

			return nil, 0, ErrMorePacketsNeeded
		}

		if len(d.fragments) == 0 {
			if !d.firstPacketReceived {
				return nil, 0, ErrNonStartingPacketAndNoPrevious
			}

			return nil, 0, fmt.Errorf("invalid FU-A packet (non-starting)")
		}

		d.fragmentedSize += len(pkt.Payload[2:])
		if d.fragmentedSize > h264.MaxNALUSize {
			d.fragments = d.fragments[:0]
			return nil, 0, fmt.Errorf("NALU size (%d) is too big (maximum is %d)", d.fragmentedSize, h264.MaxNALUSize)
		}

		d.fragments = append(d.fragments, pkt.Payload[2:])

		if end != 1 {
			return nil, 0, ErrMorePacketsNeeded
		}

		// last fragment: join all pieces into a single NALU
		nalu := make([]byte, d.fragmentedSize)
		pos := 0

		for _, frag := range d.fragments {
			pos += copy(nalu[pos:], frag)
		}

		d.fragments = d.fragments[:0]
		nalus = [][]byte{nalu}

	case naluTypeSTAPA:
		d.fragments = d.fragments[:0] // discard pending fragmented packets

		// after the 1-byte header, a sequence of (2-byte size, NALU) pairs
		payload := pkt.Payload[1:]

		for len(payload) > 0 {
			if len(payload) < 2 {
				return nil, 0, fmt.Errorf("invalid STAP-A packet (invalid size)")
			}

			size := uint16(payload[0])<<8 | uint16(payload[1])
			payload = payload[2:]

			// avoid final padding
			if size == 0 {
				break
			}

			if int(size) > len(payload) {
				return nil, 0, fmt.Errorf("invalid STAP-A packet (invalid size)")
			}

			nalus = append(nalus, payload[:size])
			payload = payload[size:]
		}

		if nalus == nil {
			return nil, 0, fmt.Errorf("STAP-A packet doesn't contain any NALU")
		}

		d.firstPacketReceived = true

	case naluTypeSTAPB, naluTypeMTAP16,
		naluTypeMTAP24, naluTypeFUB:
		d.fragments = d.fragments[:0] // discard pending fragmented packets
		d.firstPacketReceived = true
		return nil, 0, fmt.Errorf("packet type not supported (%v)", typ)

	default:
		// single NALU packet: the payload is the NALU itself
		d.fragments = d.fragments[:0] // discard pending fragmented packets
		d.firstPacketReceived = true
		nalus = [][]byte{pkt.Payload}
	}

	nalus, err := d.removeAnnexB(nalus)
	if err != nil {
		return nil, 0, err
	}

	return nalus, d.timeDecoder.Decode(pkt.Timestamp), nil
}
// DecodeUntilMarker decodes NALUs from a RTP/H264 packet and puts them in a buffer.
// When a packet has the marker flag (meaning that all the NALUs with the same PTS have
// been received), the buffer is returned.
//
// The returned slice is owned by the caller: the decoder starts a new buffer
// for the next group, so later calls never overwrite a returned result.
// ErrMorePacketsNeeded is returned while the group is still incomplete.
// When a group exceeds h264.MaxNALUsPerGroup, the group is dropped and an
// error is returned.
func (d *Decoder) DecodeUntilMarker(pkt *rtp.Packet) ([][]byte, time.Duration, error) {
	nalus, pts, err := d.Decode(pkt)
	if err != nil {
		return nil, 0, err
	}

	if total := len(d.naluBuffer) + len(nalus); total > h264.MaxNALUsPerGroup {
		// drop the oversized group; if the buffer were kept, this check
		// would fail on every following call and the decoder would never recover
		d.naluBuffer = nil
		return nil, 0, fmt.Errorf("number of NALUs contained inside a single group (%d) is too big (maximum is %d)",
			total, h264.MaxNALUsPerGroup)
	}

	d.naluBuffer = append(d.naluBuffer, nalus...)

	if !pkt.Marker {
		return nil, 0, ErrMorePacketsNeeded
	}

	// hand the backing array over to the caller instead of re-slicing it to
	// zero length, which would let the next append overwrite the returned NALUs
	ret := d.naluBuffer
	d.naluBuffer = nil

	return ret, pts, nil
}
// removeAnnexB unwraps NALUs that some cameras / servers deliver in Annex-B
// format. The first NALU list ever passed in decides the mode: when it holds
// exactly one NALU containing a 4-byte start code, Annex-B mode is switched on
// for the lifetime of the decoder. In Annex-B mode every call must carry a
// single NALU, which is split through h264.AnnexBUnmarshal; otherwise the
// NALUs are returned untouched.
func (d *Decoder) removeAnnexB(nalus [][]byte) ([][]byte, error) {
	startCode := []byte{0x00, 0x00, 0x00, 0x01}

	switch {
	case !d.firstNALUParsed:
		// detection happens only once, on the first call
		d.firstNALUParsed = true

		if len(nalus) != 1 || bytes.Index(nalus[0], startCode) < 0 {
			return nalus, nil
		}

		d.annexBMode = true

	case d.annexBMode:
		if len(nalus) != 1 {
			return nil, fmt.Errorf("multiple NALUs in Annex-B mode are not supported")
		}

	default:
		return nalus, nil
	}

	// make sure the buffer begins with a start code before unmarshaling
	buf := nalus[0]
	if !bytes.HasPrefix(buf, startCode) {
		buf = append([]byte{0x00, 0x00, 0x00, 0x01}, buf...)
	}

	return h264.AnnexBUnmarshal(buf)
}

View File

@@ -0,0 +1,589 @@
package rtph264
import (
"bytes"
"testing"
"time"
"github.com/pion/rtp"
"github.com/stretchr/testify/require"
)
// mergeBytes concatenates the given byte slices into a newly allocated slice.
func mergeBytes(vals ...[]byte) []byte {
	total := 0
	for i := range vals {
		total += len(vals[i])
	}

	merged := make([]byte, 0, total)
	for i := range vals {
		merged = append(merged, vals[i]...)
	}

	return merged
}
// cases lists NALU groups together with the RTP packets that carry them.
// Each entry holds the NALUs, their PTS and the corresponding packets.
// Timestamps are relative to 2289526357 (0x88776655).
var cases = []struct {
	name  string
	nalus [][]byte
	pts   time.Duration
	pkts  []*rtp.Packet
}{
	{
		// one NALU carried as-is by one packet
		"single",
		[][]byte{
			mergeBytes(
				[]byte{0x05},
				bytes.Repeat([]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, 8),
			),
		},
		25 * time.Millisecond,
		[]*rtp.Packet{
			{
				Header: rtp.Header{
					Version:        2,
					Marker:         true,
					PayloadType:    96,
					SequenceNumber: 17645,
					Timestamp:      2289528607,
					SSRC:           0x9dbb7812,
				},
				Payload: mergeBytes(
					[]byte{0x05},
					bytes.Repeat([]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, 8),
				),
			},
		},
	},
	{
		// one NALU split into three FU-A packets (0x1c = type 28);
		// the second byte carries the start (0x80) / end (0x40) bits
		// and the original NALU type (5)
		"fragmented",
		[][]byte{
			mergeBytes(
				[]byte{0x05},
				bytes.Repeat([]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, 512),
			),
		},
		55 * time.Millisecond,
		[]*rtp.Packet{
			{
				Header: rtp.Header{
					Version:        2,
					Marker:         false,
					PayloadType:    96,
					SequenceNumber: 17645,
					Timestamp:      2289531307,
					SSRC:           0x9dbb7812,
				},
				Payload: mergeBytes(
					[]byte{0x1c, 0x85},
					bytes.Repeat([]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, 182),
					[]byte{0x00, 0x01},
				),
			},
			{
				Header: rtp.Header{
					Version:        2,
					Marker:         false,
					PayloadType:    96,
					SequenceNumber: 17646,
					Timestamp:      2289531307,
					SSRC:           0x9dbb7812,
				},
				Payload: mergeBytes(
					[]byte{0x1c, 0x05},
					[]byte{0x02, 0x03, 0x04, 0x05, 0x06, 0x07},
					bytes.Repeat([]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, 181),
					[]byte{0x00, 0x01, 0x02, 0x03},
				),
			},
			{
				Header: rtp.Header{
					Version:        2,
					Marker:         true,
					PayloadType:    96,
					SequenceNumber: 17647,
					Timestamp:      2289531307,
					SSRC:           0x9dbb7812,
				},
				Payload: mergeBytes(
					[]byte{0x1c, 0x45},
					[]byte{0x04, 0x05, 0x06, 0x07},
					bytes.Repeat([]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, 147),
				),
			},
		},
	},
	{
		// two NALUs carried by one STAP-A packet (0x18 = type 24),
		// each one prefixed by its 2-byte size
		"aggregated",
		[][]byte{
			{0x09, 0xF0},
			{
				0x41, 0x9a, 0x24, 0x6c, 0x41, 0x4f, 0xfe, 0xd6,
				0x8c, 0xb0, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03,
				0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00,
				0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00,
				0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03,
				0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00,
				0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00,
				0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03,
				0x00, 0x00, 0x6d, 0x40,
			},
		},
		0,
		[]*rtp.Packet{
			{
				Header: rtp.Header{
					Version:        2,
					Marker:         true,
					PayloadType:    96,
					SequenceNumber: 17645,
					Timestamp:      2289526357,
					SSRC:           0x9dbb7812,
				},
				Payload: []byte{
					0x18, 0x00, 0x02, 0x09,
					0xf0, 0x00, 0x44, 0x41, 0x9a, 0x24, 0x6c, 0x41,
					0x4f, 0xfe, 0xd6, 0x8c, 0xb0, 0x00, 0x00, 0x03,
					0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00,
					0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00,
					0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03,
					0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00,
					0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00,
					0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03,
					0x00, 0x00, 0x03, 0x00, 0x00, 0x6d, 0x40,
				},
			},
		},
	},
	{
		// a STAP-A packet followed by a single NALU packet, same timestamp;
		// only the last packet has the marker flag
		"aggregated followed by single",
		[][]byte{
			{0x09, 0xF0},
			{
				0x41, 0x9a, 0x24, 0x6c, 0x41, 0x4f, 0xfe, 0xd6,
				0x8c, 0xb0, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03,
				0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00,
				0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00,
				0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03,
				0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00,
				0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00,
				0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03,
				0x00, 0x00, 0x6d, 0x40,
			},
			mergeBytes(
				[]byte{0x08},
				bytes.Repeat([]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, 175),
			),
		},
		0,
		[]*rtp.Packet{
			{
				Header: rtp.Header{
					Version:        2,
					Marker:         false,
					PayloadType:    96,
					SequenceNumber: 17645,
					Timestamp:      2289526357,
					SSRC:           0x9dbb7812,
				},
				Payload: []byte{
					0x18, 0x00, 0x02, 0x09,
					0xf0, 0x00, 0x44, 0x41, 0x9a, 0x24, 0x6c, 0x41,
					0x4f, 0xfe, 0xd6, 0x8c, 0xb0, 0x00, 0x00, 0x03,
					0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00,
					0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00,
					0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03,
					0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00,
					0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00,
					0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03,
					0x00, 0x00, 0x03, 0x00, 0x00, 0x6d, 0x40,
				},
			},
			{
				Header: rtp.Header{
					Version:        2,
					Marker:         true,
					PayloadType:    96,
					SequenceNumber: 17646,
					Timestamp:      2289526357,
					SSRC:           0x9dbb7812,
				},
				Payload: mergeBytes(
					[]byte{0x08},
					bytes.Repeat([]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, 175),
				),
			},
		},
	},
	{
		// a NALU split into two FU-A packets, followed by a STAP-A packet
		// carrying two more NALUs
		"fragmented followed by aggregated",
		[][]byte{
			mergeBytes(
				[]byte{0x05},
				bytes.Repeat([]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, 256),
			),
			{0x09, 0xF0},
			{0x09, 0xF0},
		},
		0,
		[]*rtp.Packet{
			{
				Header: rtp.Header{
					Version:        2,
					Marker:         false,
					PayloadType:    96,
					SequenceNumber: 17645,
					Timestamp:      2289526357,
					SSRC:           0x9dbb7812,
				},
				Payload: mergeBytes(
					[]byte{0x1c, 0x85},
					bytes.Repeat([]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, 182),
					[]byte{0x00, 0x01},
				),
			},
			{
				Header: rtp.Header{
					Version:        2,
					Marker:         false,
					PayloadType:    96,
					SequenceNumber: 17646,
					Timestamp:      2289526357,
					SSRC:           0x9dbb7812,
				},
				Payload: mergeBytes(
					[]byte{0x1c, 0x45},
					[]byte{0x02, 0x03, 0x04, 0x05, 0x06, 0x07},
					bytes.Repeat([]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, 73),
				),
			},
			{
				Header: rtp.Header{
					Version:        2,
					Marker:         true,
					PayloadType:    96,
					SequenceNumber: 17647,
					Timestamp:      2289526357,
					SSRC:           0x9dbb7812,
				},
				Payload: []byte{
					0x18, 0x00, 0x02, 0x09,
					0xf0, 0x00, 0x02, 0x09, 0xf0,
				},
			},
		},
	},
}
// TestDecode feeds the packets of every shared case to a fresh decoder and
// checks the resulting NALUs and PTS, and that input packets are left intact.
func TestDecode(t *testing.T) {
	for i := range cases {
		tc := cases[i]

		t.Run(tc.name, func(t *testing.T) {
			dec := &Decoder{}
			dec.Init()

			// send an initial packet downstream
			// in order to compute the right timestamp,
			// that is relative to the initial packet
			first := &rtp.Packet{
				Header: rtp.Header{
					Version:        2,
					Marker:         true,
					PayloadType:    96,
					SequenceNumber: 17645,
					Timestamp:      2289526357,
					SSRC:           0x9dbb7812,
				},
				Payload: []byte{0x06, 0x00},
			}
			_, _, err := dec.Decode(first)
			require.NoError(t, err)

			var decoded [][]byte

			for _, in := range tc.pkts {
				snapshot := in.Clone()

				out, pts, err := dec.Decode(in)
				if err != ErrMorePacketsNeeded {
					require.NoError(t, err)
					require.Equal(t, tc.pts, pts)
					decoded = append(decoded, out...)

					// test input integrity
					require.Equal(t, snapshot, in)
				}
			}

			require.Equal(t, tc.nalus, decoded)
		})
	}
}
// TestDecodeCorruptedFragment checks that a single NALU packet received in
// the middle of a fragmented NALU is decoded on its own.
func TestDecodeCorruptedFragment(t *testing.T) {
	dec := &Decoder{}
	dec.Init()

	// both packets share everything but the sequence number
	header := func(seq uint16) rtp.Header {
		return rtp.Header{
			Version:        2,
			Marker:         false,
			PayloadType:    96,
			SequenceNumber: seq,
			Timestamp:      2289527317,
			SSRC:           0x9dbb7812,
		}
	}

	// FU-A starting packet: the NALU stays pending
	fragmentStart := &rtp.Packet{
		Header: header(17645),
		Payload: mergeBytes(
			[]byte{0x1c, 0x85},
			bytes.Repeat([]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, 182),
			[]byte{0x00, 0x01},
		),
	}
	_, _, err := dec.Decode(fragmentStart)
	require.Equal(t, ErrMorePacketsNeeded, err)

	// single NALU packet arriving instead of the next fragment
	single := &rtp.Packet{
		Header:  header(17646),
		Payload: []byte{0x01, 0x00},
	}
	nalus, _, err := dec.Decode(single)
	require.NoError(t, err)
	require.Equal(t, [][]byte{{0x01, 0x00}}, nalus)
}
// TestDecodeSTAPAWithPadding checks that zero bytes trailing the last NALU
// of a STAP-A packet are ignored.
func TestDecodeSTAPAWithPadding(t *testing.T) {
	dec := &Decoder{}
	dec.Init()

	// STAP-A header, two sized NALUs, then 19 bytes of zero padding
	payload := append(
		[]byte{
			0x18,
			0x00, 0x02, 0xaa, 0xbb,
			0x00, 0x02, 0xcc, 0xdd,
		},
		make([]byte, 19)...,
	)

	in := &rtp.Packet{
		Header: rtp.Header{
			Version:        2,
			Marker:         true,
			PayloadType:    96,
			SequenceNumber: 17645,
			Timestamp:      2289526357,
			SSRC:           0x9dbb7812,
		},
		Payload: payload,
	}

	nalus, _, err := dec.Decode(in)
	require.NoError(t, err)

	expected := [][]byte{
		{0xaa, 0xbb},
		{0xcc, 0xdd},
	}
	require.Equal(t, expected, nalus)
}
// TestDecodeAnnexB checks that a payload wrapped in Annex-B is split into
// its NALUs, both on the first packet and on the following one.
func TestDecodeAnnexB(t *testing.T) {
	dec := &Decoder{}
	dec.Init()

	startCode := []byte{0x00, 0x00, 0x00, 0x01}
	unit := []byte{0x01, 0x02, 0x03, 0x04}
	expected := [][]byte{
		{0x01, 0x02, 0x03, 0x04},
		{0x01, 0x02, 0x03, 0x04},
	}

	for round := 0; round < 2; round++ {
		in := &rtp.Packet{
			Header: rtp.Header{
				Version:        2,
				Marker:         true,
				PayloadType:    96,
				SequenceNumber: 17647,
				Timestamp:      2289531307,
				SSRC:           0x9dbb7812,
			},
			Payload: mergeBytes(startCode, unit, startCode, unit),
		}

		nalus, _, err := dec.Decode(in)
		require.NoError(t, err)
		require.Equal(t, expected, nalus)
	}
}
// TestDecodeErrors feeds invalid or unsupported packets to a fresh decoder
// and checks the error returned for the last packet of each case.
func TestDecodeErrors(t *testing.T) {
	// all packets share the same header, apart from marker and sequence number
	newPkt := func(marker bool, seq uint16, payload []byte) *rtp.Packet {
		return &rtp.Packet{
			Header: rtp.Header{
				Version:        2,
				Marker:         marker,
				PayloadType:    96,
				SequenceNumber: seq,
				Timestamp:      2289527317,
				SSRC:           0x9dbb7812,
			},
			Payload: payload,
		}
	}

	type errorCase struct {
		name string
		pkts []*rtp.Packet
		err  string
	}

	errorCases := []errorCase{
		{
			"missing payload",
			[]*rtp.Packet{newPkt(true, 17645, nil)},
			"payload is too short",
		},
		{
			"STAP-A without NALUs",
			[]*rtp.Packet{newPkt(true, 17645, []byte{0x18})},
			"STAP-A packet doesn't contain any NALU",
		},
		{
			"STAP-A without size",
			[]*rtp.Packet{newPkt(true, 17645, []byte{0x18, 0x01})},
			"invalid STAP-A packet (invalid size)",
		},
		{
			"STAP-A with invalid size",
			[]*rtp.Packet{newPkt(true, 17645, []byte{0x18, 0x00, 0x15})},
			"invalid STAP-A packet (invalid size)",
		},
		{
			"FU-A without payload",
			[]*rtp.Packet{newPkt(true, 17645, []byte{0x1c})},
			"invalid FU-A packet (invalid size)",
		},
		{
			"FU-A with start and end bit",
			[]*rtp.Packet{newPkt(true, 17646, []byte{0x1c, 0b11000000})},
			"invalid FU-A packet (can't contain both a start and end bit)",
		},
		{
			"FU-A non-starting",
			[]*rtp.Packet{
				newPkt(true, 17645, mergeBytes(
					[]byte{0x05},
					bytes.Repeat([]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, 8),
				)),
				newPkt(true, 17646, []byte{0x1c, 0b01000000}),
			},
			"invalid FU-A packet (non-starting)",
		},
		{
			"MTAP",
			[]*rtp.Packet{newPkt(false, 17645, []byte{0x1a})},
			"packet type not supported (MTAP-16)",
		},
	}

	for i := range errorCases {
		tc := errorCases[i]

		t.Run(tc.name, func(t *testing.T) {
			dec := &Decoder{}
			dec.Init()

			// only the error of the last packet matters
			var last error
			for _, in := range tc.pkts {
				_, _, last = dec.Decode(in)
			}

			require.EqualError(t, last, tc.err)
		})
	}
}

View File

@@ -0,0 +1,249 @@
package rtph264
import (
"crypto/rand"
"fmt"
"time"
"github.com/pion/rtp"
)
const (
	// RTP version written into the header of every generated packet.
	rtpVersion = 2
)
// randUint32 returns a 32-bit value built from 4 bytes read from crypto/rand.
// The error returned by rand.Read is not checked.
func randUint32() uint32 {
	var raw [4]byte
	rand.Read(raw[:])

	// assemble the bytes in big-endian order
	var v uint32
	for _, c := range raw {
		v = v<<8 | uint32(c)
	}
	return v
}
// Encoder is a RTP/H264 encoder.
// Init fills the optional fields left unset and must run before Encode,
// which dereferences SSRC and InitialTimestamp.
type Encoder struct {
	// payload type of packets.
	PayloadType uint8

	// SSRC of packets (optional).
	// It defaults to a random value.
	SSRC *uint32

	// initial sequence number of packets (optional).
	// It defaults to a random value.
	InitialSequenceNumber *uint16

	// initial timestamp of packets (optional).
	// It defaults to a random value.
	InitialTimestamp *uint32

	// maximum size of packet payloads (optional).
	// It defaults to 1460.
	PayloadMaxSize int

	// packetization mode.
	// Encode rejects values >= 2.
	PacketizationMode int

	// sequence number of the next packet; set by Init and incremented
	// after every generated packet.
	sequenceNumber uint16
}
// Init initializes the encoder.
// Optional fields left unset receive their defaults: random SSRC, initial
// sequence number and initial timestamp, and a maximum payload size of 1460.
func (e *Encoder) Init() {
	if e.SSRC == nil {
		e.SSRC = new(uint32)
		*e.SSRC = randUint32()
	}

	if e.InitialSequenceNumber == nil {
		e.InitialSequenceNumber = new(uint16)
		*e.InitialSequenceNumber = uint16(randUint32())
	}

	if e.InitialTimestamp == nil {
		e.InitialTimestamp = new(uint32)
		*e.InitialTimestamp = randUint32()
	}

	if e.PayloadMaxSize == 0 {
		// 1500 (UDP MTU) - 20 (IP header) - 8 (UDP header) - 12 (RTP header)
		e.PayloadMaxSize = 1460
	}

	// the first generated packet uses the initial sequence number
	e.sequenceNumber = *e.InitialSequenceNumber
}
// encodeTimestamp converts a PTS into a RTP timestamp: the PTS expressed in
// clock ticks, added to the initial timestamp.
//
// The conversion uses integer arithmetic and wraps modulo 2^32. Converting an
// out-of-range or negative float64 to uint32 gives an implementation-dependent
// result in Go, which would make timestamps of long-running streams depend on
// the platform.
func (e *Encoder) encodeTimestamp(ts time.Duration) uint32 {
	// whole seconds and the remainder are converted separately, so that the
	// intermediate products cannot overflow an int64
	secs := int64(ts / time.Second)
	rem := int64(ts % time.Second)
	ticks := secs*rtpClockRate + rem*rtpClockRate/int64(time.Second)

	return *e.InitialTimestamp + uint32(ticks)
}
// Encode encodes NALUs into RTP/H264 packets.
// Consecutive NALUs are grouped into batches for as long as their aggregated
// size fits into PayloadMaxSize; every batch is then written through
// writeBatch. Only the final batch is written with the marker flag.
func (e *Encoder) Encode(nalus [][]byte, pts time.Duration) ([]*rtp.Packet, error) {
	if e.PacketizationMode >= 2 {
		return nil, fmt.Errorf("PacketizationMode >= 2 is not supported")
	}

	var (
		out     []*rtp.Packet
		pending [][]byte
	)

	// split NALUs into batches
	for _, nalu := range nalus {
		if e.lenAggregated(pending, nalu) <= e.PayloadMaxSize {
			// the NALU still fits into the current batch
			pending = append(pending, nalu)
			continue
		}

		// flush the current batch, if any
		if pending != nil {
			flushed, err := e.writeBatch(pending, pts, false)
			if err != nil {
				return nil, err
			}
			out = append(out, flushed...)
		}

		// start a new batch with the NALU that did not fit
		pending = [][]byte{nalu}
	}

	// write final batch
	// marker is used to indicate when all NALUs with same PTS have been sent
	last, err := e.writeBatch(pending, pts, true)
	if err != nil {
		return nil, err
	}

	return append(out, last...), nil
}
// writeBatch writes a batch of NALUs: several NALUs become one aggregated
// packet, a lone NALU becomes either a single packet or, when its size is
// not below PayloadMaxSize, a series of fragmentation packets.
func (e *Encoder) writeBatch(nalus [][]byte, pts time.Duration, marker bool) ([]*rtp.Packet, error) {
	if len(nalus) != 1 {
		return e.writeAggregated(nalus, pts, marker)
	}

	only := nalus[0]

	// the NALU does not fit into a single RTP packet:
	// split it into multiple fragmentation packets
	if len(only) >= e.PayloadMaxSize {
		return e.writeFragmented(only, pts, marker)
	}

	return e.writeSingle(only, pts, marker)
}
// writeSingle wraps a NALU into one RTP packet, using the NALU itself as
// payload, and advances the sequence number.
func (e *Encoder) writeSingle(nalu []byte, pts time.Duration, marker bool) ([]*rtp.Packet, error) {
	header := rtp.Header{
		Version:        rtpVersion,
		PayloadType:    e.PayloadType,
		SequenceNumber: e.sequenceNumber,
		Timestamp:      e.encodeTimestamp(pts),
		SSRC:           *e.SSRC,
		Marker:         marker,
	}

	e.sequenceNumber++

	return []*rtp.Packet{{Header: header, Payload: nalu}}, nil
}
// writeFragmented splits a NALU into multiple FU-A packets.
//
// Each packet carries a 2-byte prefix (FU indicator and FU header) followed by
// at most PayloadMaxSize-2 bytes of the NALU body; the NALU header itself is
// not copied, since its NRI and type are stored in the prefix.
// The marker flag is applied to the last packet only.
// An error is returned when PayloadMaxSize leaves no room for fragment data.
func (e *Encoder) writeFragmented(nalu []byte, pts time.Duration, marker bool) ([]*rtp.Packet, error) {
	// use only FU-A, not FU-B, since we always use non-interleaved mode
	// (packetization-mode=1)
	avail := e.PayloadMaxSize - 2
	if avail <= 0 {
		return nil, fmt.Errorf("PayloadMaxSize (%d) is too small to fragment a NALU", e.PayloadMaxSize)
	}

	bodySize := len(nalu) - 1
	packetCount := bodySize / avail
	lastPacketSize := bodySize % avail
	if lastPacketSize > 0 {
		packetCount++
	} else {
		// the body is an exact multiple of the fragment size:
		// the last packet is a full one, not an empty one
		lastPacketSize = avail
	}

	ret := make([]*rtp.Packet, packetCount)
	encPTS := e.encodeTimestamp(pts)

	nri := (nalu[0] >> 5) & 0x03
	typ := nalu[0] & 0x1F
	nalu = nalu[1:] // remove header

	for i := range ret {
		// FU indicator: NRI of the original NALU + FU-A type
		indicator := (nri << 5) | uint8(naluTypeFUA)

		start := uint8(0)
		if i == 0 {
			start = 1
		}

		end := uint8(0)
		le := avail
		if i == (packetCount - 1) {
			end = 1
			le = lastPacketSize
		}

		// FU header: start bit, end bit, type of the original NALU
		header := (start << 7) | (end << 6) | typ

		data := make([]byte, 2+le)
		data[0] = indicator
		data[1] = header
		copy(data[2:], nalu[:le])
		nalu = nalu[le:]

		ret[i] = &rtp.Packet{
			Header: rtp.Header{
				Version:        rtpVersion,
				PayloadType:    e.PayloadType,
				SequenceNumber: e.sequenceNumber,
				Timestamp:      encPTS,
				SSRC:           *e.SSRC,
				Marker:         (i == (packetCount-1) && marker),
			},
			Payload: data,
		}

		e.sequenceNumber++
	}

	return ret, nil
}
// lenAggregated returns the payload size of an aggregated packet that
// contains nalus plus, when not nil, addNALU: 1 byte of header, then a
// 2-byte size field and the NALU itself for every NALU.
func (e *Encoder) lenAggregated(nalus [][]byte, addNALU []byte) int {
	const (
		headerLen    = 1
		sizeFieldLen = 2
	)

	total := headerLen + sizeFieldLen*len(nalus)
	for i := range nalus {
		total += len(nalus[i])
	}

	if addNALU == nil {
		return total
	}

	return total + sizeFieldLen + len(addNALU)
}
// writeAggregated packs several NALUs into one STAP-A packet: a 1-byte
// header followed, for every NALU, by its size on 2 bytes (big-endian) and
// the NALU itself. It advances the sequence number.
func (e *Encoder) writeAggregated(nalus [][]byte, pts time.Duration, marker bool) ([]*rtp.Packet, error) {
	buf := make([]byte, 0, e.lenAggregated(nalus, nil))

	// header
	buf = append(buf, uint8(naluTypeSTAPA))

	for _, nalu := range nalus {
		// size
		size := len(nalu)
		buf = append(buf, uint8(size>>8), uint8(size))

		// nalu
		buf = append(buf, nalu...)
	}

	header := rtp.Header{
		Version:        rtpVersion,
		PayloadType:    e.PayloadType,
		SequenceNumber: e.sequenceNumber,
		Timestamp:      e.encodeTimestamp(pts),
		SSRC:           *e.SSRC,
		Marker:         marker,
	}

	e.sequenceNumber++

	return []*rtp.Packet{{Header: header, Payload: buf}}, nil
}

View File

@@ -0,0 +1,44 @@
package rtph264
import (
"testing"
"github.com/stretchr/testify/require"
)
// TestEncode encodes the NALUs of every shared case with a deterministic
// encoder and compares the result with the expected packets.
func TestEncode(t *testing.T) {
	for i := range cases {
		tc := cases[i]

		t.Run(tc.name, func(t *testing.T) {
			// fixed initial state, so that generated packets are predictable
			ssrc := uint32(0x9dbb7812)
			seq := uint16(0x44ed)
			ts := uint32(0x88776655)

			enc := &Encoder{
				PayloadType:           96,
				SSRC:                  &ssrc,
				InitialSequenceNumber: &seq,
				InitialTimestamp:      &ts,
			}
			enc.Init()

			got, err := enc.Encode(tc.nalus, tc.pts)
			require.NoError(t, err)
			require.Equal(t, tc.pkts, got)
		})
	}
}
// TestEncodeRandomInitialState checks that Init fills the optional fields
// that were left unset.
func TestEncodeRandomInitialState(t *testing.T) {
	e := &Encoder{
		PayloadType: 96,
	}
	e.Init()

	// NotNil is required here: comparing an untyped nil with a typed pointer
	// through NotEqual always succeeds, even when the pointer is nil
	require.NotNil(t, e.SSRC)
	require.NotNil(t, e.InitialSequenceNumber)
	require.NotNil(t, e.InitialTimestamp)
}

View File

@@ -0,0 +1,43 @@
package rtph264
import (
"fmt"
"strings"
"github.com/aler9/gortsplib/pkg/h264"
)
// naluType is a h264.NALUType extended with the types used by RTP/H264 only.
type naluType h264.NALUType

// additional NALU types for RTP/H264.
const (
	naluTypeSTAPA  naluType = 24
	naluTypeSTAPB  naluType = 25
	naluTypeMTAP16 naluType = 26
	naluTypeMTAP24 naluType = 27
	naluTypeFUA    naluType = 28
	naluTypeFUB    naluType = 29
)
// naluLabels maps the RTP-specific NALU types to the labels returned by String.
var naluLabels = map[naluType]string{
	naluTypeSTAPA:  "STAP-A",
	naluTypeSTAPB:  "STAP-B",
	naluTypeMTAP16: "MTAP-16",
	naluTypeMTAP24: "MTAP-24",
	naluTypeFUA:    "FU-A",
	naluTypeFUB:    "FU-B",
}
// String implements fmt.Stringer.
// The label of h264.NALUType is used unless it starts with "unknown"; in
// that case the RTP-specific labels are looked up, and "unknown (N)" is
// returned when none matches.
func (nt naluType) String() string {
	if name := h264.NALUType(nt).String(); !strings.HasPrefix(name, "unknown") {
		return name
	}

	label, found := naluLabels[nt]
	if !found {
		return fmt.Sprintf("unknown (%d)", nt)
	}

	return label
}

View File

@@ -0,0 +1,14 @@
package rtph264
import (
"strings"
"testing"
"github.com/stretchr/testify/require"
)
func TestNALUType(t *testing.T) {
require.NotEqual(t, true, strings.HasPrefix(naluType(10).String(), "unknown"))
require.NotEqual(t, true, strings.HasPrefix(naluType(26).String(), "unknown"))
require.Equal(t, true, strings.HasPrefix(naluType(50).String(), "unknown"))
}

View File

@@ -0,0 +1,6 @@
// Package rtph264 contains a RTP/H264 decoder and encoder.
package rtph264
const (
	// clock rate used to convert between durations and RTP timestamps.
	rtpClockRate = 90000 // H264 always uses 90 kHz
)

View File

@@ -0,0 +1,134 @@
package rtph265
import (
"errors"
"fmt"
"time"
"github.com/pion/rtp"
"github.com/aler9/gortsplib/pkg/rtptimedec"
)
const (
	// maximum size, in bytes, of a NALU reassembled from fragmentation units.
	maxNALUSize = 3 * 1024 * 1024
)

// ErrMorePacketsNeeded is returned when more packets are needed.
// Decode returns it while a fragmented NALU is still incomplete.
var ErrMorePacketsNeeded = errors.New("need more packets")
// Decoder is a RTP/H265 decoder.
// Init creates the timestamp decoder that Decode relies on.
type Decoder struct {
	// indicates that NALUs have an additional field that specifies the decoding order.
	// Decode rejects any value other than 0.
	MaxDONDiff int

	// converts RTP timestamps into durations; created by Init.
	timeDecoder *rtptimedec.Decoder
	// size in bytes of the NALU currently being reassembled.
	fragmentedSize int
	// pieces of the NALU currently being reassembled (rebuilt header first).
	fragments [][]byte
}
// Init initializes the decoder.
// It creates the timestamp decoder, configured with the package clock rate.
func (d *Decoder) Init() {
	d.timeDecoder = rtptimedec.New(rtpClockRate)
}
// Decode decodes NALUs from a RTP/H265 packet.
//
// Single NALU packets and aggregation units yield their NALUs immediately.
// Fragmentation units are buffered until the one carrying the end bit
// arrives; until then ErrMorePacketsNeeded is returned. Malformed packets and
// packets of any other type discard the fragments collected so far.
//
// It returns the decoded NALUs, their PTS (derived from the RTP timestamp)
// and an error. An aggregation unit that contains no NALU is an error, as it
// is for STAP-A packets in the H264 decoder.
func (d *Decoder) Decode(pkt *rtp.Packet) ([][]byte, time.Duration, error) {
	if d.MaxDONDiff != 0 {
		return nil, 0, fmt.Errorf("MaxDONDiff != 0 is not supported (yet)")
	}

	if len(pkt.Payload) < 2 {
		d.fragments = d.fragments[:0] // discard pending fragmented packets
		return nil, 0, fmt.Errorf("payload is too short")
	}

	// the type is stored in bits 6-1 of the first byte
	typ := (pkt.Payload[0] >> 1) & 0b111111
	var nalus [][]byte

	switch typ {
	case 48: // aggregation unit
		d.fragments = d.fragments[:0] // discard pending fragmented packets

		// after the 2-byte header, a sequence of (2-byte size, NALU) pairs
		payload := pkt.Payload[2:]

		for len(payload) > 0 {
			if len(payload) < 2 {
				return nil, 0, fmt.Errorf("invalid aggregation unit (invalid size)")
			}

			size := uint16(payload[0])<<8 | uint16(payload[1])
			payload = payload[2:]

			// a zero size ends the list
			if size == 0 {
				break
			}

			if int(size) > len(payload) {
				return nil, 0, fmt.Errorf("invalid aggregation unit (invalid size)")
			}

			nalus = append(nalus, payload[:size])
			payload = payload[size:]
		}

		if nalus == nil {
			return nil, 0, fmt.Errorf("aggregation unit doesn't contain any NALU")
		}

	case 49: // fragmentation unit
		if len(pkt.Payload) < 3 {
			d.fragments = d.fragments[:0] // discard pending fragmented packets
			return nil, 0, fmt.Errorf("payload is too short")
		}

		// FU header: start bit (7), end bit (6), original NALU type (5-0)
		start := pkt.Payload[2] >> 7
		end := (pkt.Payload[2] >> 6) & 0x01

		if start == 1 {
			d.fragments = d.fragments[:0] // discard pending fragmented packets

			if end != 0 {
				return nil, 0, fmt.Errorf("invalid fragmentation unit (can't contain both a start and end bit)")
			}

			// rebuild the 2-byte NALU header: the payload header with its
			// type replaced by the type stored in the FU header
			typ := pkt.Payload[2] & 0b111111
			head := uint16(pkt.Payload[0]&0b10000001)<<8 | uint16(typ)<<9 | uint16(pkt.Payload[1])
			// 2 bytes of rebuilt header + fragment data
			d.fragmentedSize = len(pkt.Payload[1:])
			d.fragments = append(d.fragments, []byte{byte(head >> 8), byte(head)}, pkt.Payload[3:])

			return nil, 0, ErrMorePacketsNeeded
		}

		if len(d.fragments) == 0 {
			return nil, 0, fmt.Errorf("invalid fragmentation unit (non-starting)")
		}

		d.fragmentedSize += len(pkt.Payload[3:])
		if d.fragmentedSize > maxNALUSize {
			d.fragments = d.fragments[:0]
			return nil, 0, fmt.Errorf("NALU size (%d) is too big (maximum is %d)", d.fragmentedSize, maxNALUSize)
		}

		d.fragments = append(d.fragments, pkt.Payload[3:])

		if end != 1 {
			return nil, 0, ErrMorePacketsNeeded
		}

		// last fragment: join all pieces into a single NALU
		nalu := make([]byte, d.fragmentedSize)
		pos := 0

		for _, frag := range d.fragments {
			pos += copy(nalu[pos:], frag)
		}

		d.fragments = d.fragments[:0]
		nalus = [][]byte{nalu}

	case 50: // PACI
		d.fragments = d.fragments[:0] // discard pending fragmented packets
		return nil, 0, fmt.Errorf("PACI packets are not supported (yet)")

	default:
		// single NALU packet: the payload is the NALU itself
		d.fragments = d.fragments[:0] // discard pending fragmented packets
		nalus = [][]byte{pkt.Payload}
	}

	return nalus, d.timeDecoder.Decode(pkt.Timestamp), nil
}

View File

@@ -0,0 +1,161 @@
package rtph265
import (
"bytes"
"testing"
"time"
"github.com/pion/rtp"
"github.com/stretchr/testify/require"
)
// mergeBytes concatenates the given byte slices into a newly allocated slice.
func mergeBytes(vals ...[]byte) []byte {
	total := 0
	for i := range vals {
		total += len(vals[i])
	}

	merged := make([]byte, 0, total)
	for i := range vals {
		merged = append(merged, vals[i]...)
	}

	return merged
}
// cases lists NALU groups together with the RTP packets that carry them.
// Each entry holds the NALUs, their PTS and the corresponding packets.
// Timestamps are relative to 2289526357.
var cases = []struct {
	name  string
	nalus [][]byte
	pts   time.Duration
	pkts  []*rtp.Packet
}{
	{
		// one NALU carried as-is by one packet
		"single",
		[][]byte{{0x01, 0x02, 0x03, 0x04, 0x05}},
		25 * time.Millisecond,
		[]*rtp.Packet{
			{
				Header: rtp.Header{
					Version:        2,
					Marker:         true,
					PayloadType:    96,
					SequenceNumber: 17645,
					Timestamp:      2289528607,
					SSRC:           0x9dbb7812,
				},
				Payload: []byte{0x01, 0x02, 0x03, 0x04, 0x05},
			},
		},
	},
	{
		// three NALUs carried by one aggregation unit (0x60 0x00 = type 48),
		// each one prefixed by its 2-byte size
		"aggregated",
		[][]byte{
			{0x07, 0x07},
			{0x08, 0x08},
			{0x09, 0x09},
		},
		0,
		[]*rtp.Packet{
			{
				Header: rtp.Header{
					Version:        2,
					Marker:         true,
					PayloadType:    96,
					SequenceNumber: 17645,
					Timestamp:      2289526357,
					SSRC:           0x9dbb7812,
				},
				Payload: []byte{
					0x60, 0x00, 0x00, 0x02, 0x07, 0x07, 0x00, 0x02,
					0x08, 0x08, 0x00, 0x02, 0x09, 0x09,
				},
			},
		},
	},
	{
		// one NALU split into two fragmentation units (0x63 0x02 = type 49);
		// the third byte carries the start (0x80) / end (0x40) bits
		"fragmented",
		[][]byte{
			bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 512),
		},
		55 * time.Millisecond,
		[]*rtp.Packet{
			{
				Header: rtp.Header{
					Version:        2,
					Marker:         false,
					PayloadType:    96,
					SequenceNumber: 17645,
					Timestamp:      2289531307,
					SSRC:           0x9dbb7812,
				},
				Payload: mergeBytes(
					[]byte{0x63, 0x02, 0x80, 0x03, 0x04},
					bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 363),
					[]byte{0x01, 0x02, 0x03},
				),
			},
			{
				Header: rtp.Header{
					Version:        2,
					Marker:         true,
					PayloadType:    96,
					SequenceNumber: 17646,
					Timestamp:      2289531307,
					SSRC:           0x9dbb7812,
				},
				Payload: mergeBytes(
					[]byte{0x63, 0x02, 0x40, 0x04},
					bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 147),
				),
			},
		},
	},
}
// TestDecode feeds the packets of every shared case to a fresh decoder and
// checks the resulting NALUs and PTS, and that input packets are left intact.
func TestDecode(t *testing.T) {
	for i := range cases {
		tc := cases[i]

		t.Run(tc.name, func(t *testing.T) {
			dec := &Decoder{}
			dec.Init()

			// send an initial packet downstream
			// in order to compute the right timestamp,
			// that is relative to the initial packet
			first := &rtp.Packet{
				Header: rtp.Header{
					Version:        2,
					Marker:         true,
					PayloadType:    96,
					SequenceNumber: 17645,
					Timestamp:      2289526357,
					SSRC:           0x9dbb7812,
				},
				Payload: []byte{0x06, 0x00},
			}
			_, _, err := dec.Decode(first)
			require.NoError(t, err)

			var decoded [][]byte

			for _, in := range tc.pkts {
				snapshot := in.Clone()

				out, pts, err := dec.Decode(in)
				if err != ErrMorePacketsNeeded {
					require.NoError(t, err)
					require.Equal(t, tc.pts, pts)
					decoded = append(decoded, out...)

					// test input integrity
					require.Equal(t, snapshot, in)
				}
			}

			require.Equal(t, tc.nalus, decoded)
		})
	}
}

View File

@@ -0,0 +1,247 @@
package rtph265
import (
"crypto/rand"
"fmt"
"time"
"github.com/pion/rtp"
)
const (
	// RTP version written into the header of every generated packet.
	rtpVersion = 2
)
// randUint32 returns a 32-bit value built from 4 bytes read from crypto/rand.
// The error returned by rand.Read is not checked.
func randUint32() uint32 {
	var raw [4]byte
	rand.Read(raw[:])

	// assemble the bytes in big-endian order
	var v uint32
	for _, c := range raw {
		v = v<<8 | uint32(c)
	}
	return v
}
// Encoder is a RTP/H265 encoder.
// Init fills the optional fields left unset and must run before Encode,
// which dereferences SSRC and InitialTimestamp.
type Encoder struct {
	// payload type of packets.
	PayloadType uint8

	// SSRC of packets (optional).
	// It defaults to a random value.
	SSRC *uint32

	// initial sequence number of packets (optional).
	// It defaults to a random value.
	InitialSequenceNumber *uint16

	// initial timestamp of packets (optional).
	// It defaults to a random value.
	InitialTimestamp *uint32

	// maximum size of packet payloads (optional).
	// It defaults to 1460.
	PayloadMaxSize int

	// indicates that NALUs have an additional field that specifies the decoding order.
	// Encode rejects any value other than 0.
	MaxDONDiff int

	// sequence number of the next packet; set by Init and incremented
	// after every generated packet.
	sequenceNumber uint16
}
// Init initializes the encoder.
// Optional fields left unset receive their defaults: random SSRC, initial
// sequence number and initial timestamp, and a maximum payload size of 1460.
func (e *Encoder) Init() {
	if e.SSRC == nil {
		e.SSRC = new(uint32)
		*e.SSRC = randUint32()
	}

	if e.InitialSequenceNumber == nil {
		e.InitialSequenceNumber = new(uint16)
		*e.InitialSequenceNumber = uint16(randUint32())
	}

	if e.InitialTimestamp == nil {
		e.InitialTimestamp = new(uint32)
		*e.InitialTimestamp = randUint32()
	}

	if e.PayloadMaxSize == 0 {
		// 1500 (UDP MTU) - 20 (IP header) - 8 (UDP header) - 12 (RTP header)
		e.PayloadMaxSize = 1460
	}

	// the first generated packet uses the initial sequence number
	e.sequenceNumber = *e.InitialSequenceNumber
}
// encodeTimestamp converts a PTS into a RTP timestamp: the PTS expressed in
// clock ticks, added to the initial timestamp.
//
// The conversion uses integer arithmetic and wraps modulo 2^32. Converting an
// out-of-range or negative float64 to uint32 gives an implementation-dependent
// result in Go, which would make timestamps of long-running streams depend on
// the platform.
func (e *Encoder) encodeTimestamp(ts time.Duration) uint32 {
	// whole seconds and the remainder are converted separately, so that the
	// intermediate products cannot overflow an int64
	secs := int64(ts / time.Second)
	rem := int64(ts % time.Second)
	ticks := secs*rtpClockRate + rem*rtpClockRate/int64(time.Second)

	return *e.InitialTimestamp + uint32(ticks)
}
// Encode encodes NALUs into RTP/H265 packets.
// Consecutive NALUs are grouped into batches for as long as their aggregated
// size fits into PayloadMaxSize; every batch is then written through
// writeBatch. Only the final batch is written with the marker flag.
func (e *Encoder) Encode(nalus [][]byte, pts time.Duration) ([]*rtp.Packet, error) {
	if e.MaxDONDiff != 0 {
		return nil, fmt.Errorf("MaxDONDiff != 0 is not supported (yet)")
	}

	var (
		out     []*rtp.Packet
		pending [][]byte
	)

	// split NALUs into batches
	for _, nalu := range nalus {
		if e.lenAggregationUnit(pending, nalu) <= e.PayloadMaxSize {
			// the NALU still fits into the current batch
			pending = append(pending, nalu)
			continue
		}

		// flush the current batch, if any
		if pending != nil {
			flushed, err := e.writeBatch(pending, pts, false)
			if err != nil {
				return nil, err
			}
			out = append(out, flushed...)
		}

		// start a new batch with the NALU that did not fit
		pending = [][]byte{nalu}
	}

	// write final batch
	// marker is used to indicate when all NALUs with same PTS have been sent
	last, err := e.writeBatch(pending, pts, true)
	if err != nil {
		return nil, err
	}

	return append(out, last...), nil
}
// writeBatch writes a batch of NALUs: several NALUs become one aggregation
// unit, a lone NALU becomes either a single packet or, when its size is not
// below PayloadMaxSize, a series of fragmentation units.
func (e *Encoder) writeBatch(nalus [][]byte, pts time.Duration, marker bool) ([]*rtp.Packet, error) {
	if len(nalus) != 1 {
		return e.writeAggregationUnit(nalus, pts, marker)
	}

	only := nalus[0]

	// the NALU does not fit into a single RTP packet:
	// split it into multiple fragmentation packets
	if len(only) >= e.PayloadMaxSize {
		return e.writeFragmentationUnits(only, pts, marker)
	}

	return e.writeSingle(only, pts, marker)
}
// writeSingle wraps one NALU, unmodified, into a single RTP packet and
// advances the sequence number.
func (e *Encoder) writeSingle(nalu []byte, pts time.Duration, marker bool) ([]*rtp.Packet, error) {
	header := rtp.Header{
		Version:        rtpVersion,
		PayloadType:    e.PayloadType,
		SequenceNumber: e.sequenceNumber,
		Timestamp:      e.encodeTimestamp(pts),
		SSRC:           *e.SSRC,
		Marker:         marker,
	}
	e.sequenceNumber++

	return []*rtp.Packet{{Header: header, Payload: nalu}}, nil
}
// writeFragmentationUnits splits a NALU into several RTP packets.
// The first two bytes of the NALU (its header) are not copied verbatim: each
// packet starts with a 2-byte payload header whose type field is set to 49,
// followed by a 1-byte fragment header holding the start flag, the end flag
// and the original NALU type, followed by a slice of the NALU body.
// All fragments share the same timestamp; the marker, when requested, is set
// on the last fragment only.
//
// It returns an error when PayloadMaxSize leaves no room for fragment data or
// when the NALU is shorter than its 2-byte header.
func (e *Encoder) writeFragmentationUnits(nalu []byte, pts time.Duration, marker bool) ([]*rtp.Packet, error) {
	// room left for NALU data once the 3 header bytes are accounted for
	avail := e.PayloadMaxSize - 3
	if avail <= 0 {
		return nil, fmt.Errorf("PayloadMaxSize (%d) is too small to fragment a NALU", e.PayloadMaxSize)
	}

	if len(nalu) < 2 {
		return nil, fmt.Errorf("NALU is too short to be fragmented")
	}

	head := nalu[:2]
	nalu = nalu[2:]

	// ceiling division: when the body is an exact multiple of avail, the last
	// fragment is a full one rather than an empty one
	n := (len(nalu) + avail - 1) / avail

	ret := make([]*rtp.Packet, n)
	encPTS := e.encodeTimestamp(pts)

	for i := range ret {
		start := uint8(0)
		if i == 0 {
			start = 1
		}

		end := uint8(0)
		if i == (n - 1) {
			end = 1
		}

		// take as much as fits; only the last fragment can be shorter
		le := avail
		if le > len(nalu) {
			le = len(nalu)
		}

		data := make([]byte, 3+le)
		// keep the first and last bit of the original header, replace the type with 49
		data[0] = head[0]&0b10000001 | 49<<1
		data[1] = head[1]
		// start flag, end flag, original 6-bit NALU type
		data[2] = (start << 7) | (end << 6) | (head[0]>>1)&0b111111
		copy(data[3:], nalu[:le])
		nalu = nalu[le:]

		ret[i] = &rtp.Packet{
			Header: rtp.Header{
				Version:        rtpVersion,
				PayloadType:    e.PayloadType,
				SequenceNumber: e.sequenceNumber,
				Timestamp:      encPTS,
				SSRC:           *e.SSRC,
				Marker:         (i == (n-1) && marker),
			},
			Payload: data,
		}

		e.sequenceNumber++
	}

	return ret, nil
}
// lenAggregationUnit returns the payload size of an aggregation unit made of
// nalus plus, when addNALU is not nil, one additional NALU: 2 bytes of header
// and, for every NALU, 2 bytes of size followed by the NALU itself.
func (e *Encoder) lenAggregationUnit(nalus [][]byte, addNALU []byte) int {
	total := 2 + 2*len(nalus) // header + one size field per NALU

	for i := range nalus {
		total += len(nalus[i])
	}

	if addNALU != nil {
		total += 2 + len(addNALU) // size + nalu
	}

	return total
}
// writeAggregationUnit packs several NALUs into a single RTP packet.
// The payload starts with a 2-byte header whose type field is set to 48,
// then every NALU follows, prefixed by its size as a 16-bit big-endian value.
func (e *Encoder) writeAggregationUnit(nalus [][]byte, pts time.Duration, marker bool) ([]*rtp.Packet, error) {
	buf := make([]byte, 0, e.lenAggregationUnit(nalus, nil))

	// header: 48 << 9 as a 16-bit big-endian value
	buf = append(buf, byte(48<<1), 0)

	for _, nalu := range nalus {
		size := len(nalu)
		buf = append(buf, uint8(size>>8), uint8(size))
		buf = append(buf, nalu...)
	}

	header := rtp.Header{
		Version:        rtpVersion,
		PayloadType:    e.PayloadType,
		SequenceNumber: e.sequenceNumber,
		Timestamp:      e.encodeTimestamp(pts),
		SSRC:           *e.SSRC,
		Marker:         marker,
	}
	e.sequenceNumber++

	return []*rtp.Packet{{Header: header, Payload: buf}}, nil
}

View File

@@ -0,0 +1,44 @@
package rtph265
import (
"testing"
"github.com/stretchr/testify/require"
)
// TestEncode encodes the NALUs of every shared test case with a fixed SSRC,
// initial sequence number and initial timestamp, and compares the resulting
// packets with the expected ones.
func TestEncode(t *testing.T) {
	for _, ca := range cases {
		t.Run(ca.name, func(t *testing.T) {
			ssrc := uint32(0x9dbb7812)
			seq := uint16(0x44ed)
			ts := uint32(0x88776655)

			e := &Encoder{
				PayloadType:           96,
				SSRC:                  &ssrc,
				InitialSequenceNumber: &seq,
				InitialTimestamp:      &ts,
			}
			e.Init()

			pkts, err := e.Encode(ca.nalus, ca.pts)
			require.NoError(t, err)
			require.Equal(t, ca.pkts, pkts)
		})
	}
}
// TestEncodeRandomInitialState checks that Init fills SSRC,
// InitialSequenceNumber and InitialTimestamp when they are left unset.
func TestEncodeRandomInitialState(t *testing.T) {
	e := &Encoder{
		PayloadType: 96,
	}
	e.Init()

	// NotNil is required here: NotEqual(t, nil, ptr) compares an untyped nil
	// with a typed pointer and therefore passes even when the pointer is nil.
	require.NotNil(t, e.SSRC)
	require.NotNil(t, e.InitialSequenceNumber)
	require.NotNil(t, e.InitialTimestamp)
}

View File

@@ -0,0 +1,6 @@
// Package rtph265 contains a RTP/H265 decoder and encoder.
package rtph265
// rtpClockRate is the RTP clock rate, in Hz: H265 always uses 90khz.
const rtpClockRate = 90000

View File

@@ -0,0 +1,37 @@
package rtplpcm
import (
"fmt"
"time"
"github.com/pion/rtp"
"github.com/aler9/gortsplib/pkg/rtptimedec"
)
// Decoder is a RTP/LPCM decoder.
// The exported fields must be set before calling Init, and Init must be
// called before Decode.
type Decoder struct {
	// bit depth of a sample; together with ChannelCount it determines the
	// size that payloads must be a multiple of.
	BitDepth int
	// sample rate, passed to the timestamp decoder.
	SampleRate int
	// number of channels.
	ChannelCount int

	// converts RTP timestamps into durations; created by Init.
	timeDecoder *rtptimedec.Decoder
	// BitDepth * ChannelCount / 8; computed by Init.
	sampleSize int
}
// Init initializes the decoder.
// It derives the sample size from BitDepth and ChannelCount and creates the
// timestamp decoder from SampleRate.
func (d *Decoder) Init() {
	d.sampleSize = d.BitDepth * d.ChannelCount / 8
	d.timeDecoder = rtptimedec.New(d.SampleRate)
}
// Decode decodes audio samples from a RTP packet.
// It returns the packet payload, untouched, together with the PTS of its
// first sample. An error is returned when the payload length is not a
// multiple of the sample size.
func (d *Decoder) Decode(pkt *rtp.Packet) ([]byte, time.Duration, error) {
	if remainder := len(pkt.Payload) % d.sampleSize; remainder != 0 {
		return nil, 0, fmt.Errorf("received payload of wrong size")
	}

	pts := d.timeDecoder.Decode(pkt.Timestamp)

	return pkt.Payload, pts, nil
}

View File

@@ -0,0 +1,108 @@
package rtplpcm
import (
"bytes"
"testing"
"time"
"github.com/pion/rtp"
"github.com/stretchr/testify/require"
)
// cases lists the fixtures shared by TestDecode and TestEncode: raw samples,
// the PTS of the first sample, and the RTP packets they correspond to.
var cases = []struct {
	name    string
	samples []byte
	pts     time.Duration
	pkts    []*rtp.Packet
}{
	{
		// samples that fit into one packet
		name:    "single",
		samples: []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06},
		pts:     25 * time.Millisecond,
		pkts: []*rtp.Packet{
			{
				Header: rtp.Header{
					Version:        2,
					Marker:         false,
					PayloadType:    96,
					SequenceNumber: 17645,
					Timestamp:      2289527557,
					SSRC:           0x9dbb7812,
				},
				Payload: []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06},
			},
		},
	},
	{
		// samples spread over two packets with increasing timestamps
		name:    "splitted",
		samples: bytes.Repeat([]byte{0x41, 0x42, 0x43}, 680),
		pts:     25 * time.Millisecond,
		pkts: []*rtp.Packet{
			{
				Header: rtp.Header{
					Version:        2,
					Marker:         false,
					PayloadType:    96,
					SequenceNumber: 17645,
					Timestamp:      2289527557,
					SSRC:           0x9dbb7812,
				},
				Payload: bytes.Repeat([]byte{0x41, 0x42, 0x43}, 486),
			},
			{
				Header: rtp.Header{
					Version:        2,
					Marker:         false,
					PayloadType:    96,
					SequenceNumber: 17646,
					Timestamp:      2289527800,
					SSRC:           0x9dbb7812,
				},
				Payload: bytes.Repeat([]byte{0x41, 0x42, 0x43}, 194),
			},
		},
	},
}
// TestDecode feeds the packets of every shared test case to a decoder and
// checks both the reassembled samples and the PTS returned for each packet.
func TestDecode(t *testing.T) {
	const (
		bitDepth     = 24
		sampleRate   = 48000
		channelCount = 2
	)

	for _, ca := range cases {
		t.Run(ca.name, func(t *testing.T) {
			d := &Decoder{
				BitDepth:     bitDepth,
				SampleRate:   sampleRate,
				ChannelCount: channelCount,
			}
			d.Init()

			// send an initial packet downstream
			// in order to compute the right timestamp,
			// that is relative to the initial packet
			first := &rtp.Packet{
				Header: rtp.Header{
					Version:        2,
					Marker:         false,
					PayloadType:    0,
					SequenceNumber: 17645,
					Timestamp:      2289526357,
					SSRC:           0x9dbb7812,
				},
				Payload: []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06},
			}
			_, _, err := d.Decode(first)
			require.NoError(t, err)

			var samples []byte
			expPTS := ca.pts

			for _, pkt := range ca.pkts {
				partial, pts, err := d.Decode(pkt)
				require.NoError(t, err)
				require.Equal(t, expPTS, pts)

				samples = append(samples, partial...)

				// the next packet starts right after the samples of this one
				sampleCount := len(partial) / (bitDepth * channelCount / 8)
				expPTS += time.Duration(sampleCount) * time.Second / sampleRate
			}

			require.Equal(t, ca.samples, samples)
		})
	}
}

View File

@@ -0,0 +1,123 @@
package rtplpcm
import (
"crypto/rand"
"fmt"
"time"
"github.com/pion/rtp"
)
// rtpVersion is the value written into the Version field of every packet.
const rtpVersion = 2
// randUint32 returns a 32-bit value assembled, most significant byte first,
// from 4 bytes read from crypto/rand.
func randUint32() uint32 {
	var buf [4]byte
	rand.Read(buf[:])

	var v uint32
	for _, c := range buf {
		v = v<<8 | uint32(c)
	}
	return v
}
// Encoder is a RTP/LPCM encoder.
// The exported fields must be set before calling Init, and Init must be
// called before Encode.
type Encoder struct {
	// payload type of packets.
	PayloadType uint8

	// SSRC of packets (optional).
	// It defaults to a random value.
	SSRC *uint32

	// initial sequence number of packets (optional).
	// It defaults to a random value.
	InitialSequenceNumber *uint16

	// initial timestamp of packets (optional).
	// It defaults to a random value.
	InitialTimestamp *uint32

	// maximum size of packet payloads (optional).
	// It defaults to 1460.
	PayloadMaxSize int

	// bit depth of a sample; together with ChannelCount it determines the
	// size that encoded buffers must be a multiple of.
	BitDepth int
	// sample rate, used to convert durations into RTP timestamps.
	SampleRate int
	// number of channels.
	ChannelCount int

	// sequence number of the next packet.
	sequenceNumber uint16
	// BitDepth * ChannelCount / 8; computed by Init.
	sampleSize int
	// PayloadMaxSize rounded down to a multiple of sampleSize; computed by Init.
	maxPayloadSize int
}
// Init initializes the encoder.
// Optional fields left unset (PayloadMaxSize, SSRC, InitialSequenceNumber,
// InitialTimestamp) receive their defaults; the sample size and the largest
// payload size that is a multiple of it are then derived from BitDepth,
// ChannelCount and PayloadMaxSize.
func (e *Encoder) Init() {
	if e.PayloadMaxSize == 0 {
		e.PayloadMaxSize = 1460 // 1500 (UDP MTU) - 20 (IP header) - 8 (UDP header) - 12 (RTP header)
	}

	if e.SSRC == nil {
		e.SSRC = new(uint32)
		*e.SSRC = randUint32()
	}

	if e.InitialSequenceNumber == nil {
		e.InitialSequenceNumber = new(uint16)
		*e.InitialSequenceNumber = uint16(randUint32())
	}

	if e.InitialTimestamp == nil {
		e.InitialTimestamp = new(uint32)
		*e.InitialTimestamp = randUint32()
	}

	e.sequenceNumber = *e.InitialSequenceNumber

	e.sampleSize = e.BitDepth * e.ChannelCount / 8
	// round PayloadMaxSize down so that packets never split a sample
	e.maxPayloadSize = e.PayloadMaxSize - e.PayloadMaxSize%e.sampleSize
}
// encodeTimestamp converts a presentation timestamp into a RTP timestamp by
// scaling it to SampleRate and adding InitialTimestamp.
//
// Only integer arithmetic is used: the tick count exceeds the uint32 range on
// long-running streams, and converting an out-of-range float64 to uint32
// yields an implementation-dependent value in Go. With integers the result
// wraps modulo 2^32.
func (e *Encoder) encodeTimestamp(ts time.Duration) uint32 {
	rate := int64(e.SampleRate)

	// seconds and sub-second remainder are scaled separately so that neither
	// multiplication can overflow int64
	secs := int64(ts / time.Second)
	rem := int64(ts % time.Second)
	ticks := secs*rate + rem*rate/int64(time.Second)

	return *e.InitialTimestamp + uint32(ticks)
}
// Encode encodes audio samples into RTP packets.
// samples is split into chunks of at most maxPayloadSize bytes; every chunk
// becomes the payload of one packet (a sub-slice of samples, not a copy)
// whose timestamp is the PTS of its first sample.
//
// It returns an error when the encoder has not been initialized with valid
// parameters or when len(samples) is not a multiple of the sample size.
// Empty input produces no packets.
func (e *Encoder) Encode(samples []byte, pts time.Duration) ([]*rtp.Packet, error) {
	// these values are used as divisors below
	if e.sampleSize <= 0 || e.maxPayloadSize <= 0 || e.SampleRate <= 0 {
		return nil, fmt.Errorf("encoder is not initialized or has invalid parameters")
	}

	slen := len(samples)
	if (slen % e.sampleSize) != 0 {
		return nil, fmt.Errorf("invalid samples")
	}

	n := (slen + e.maxPayloadSize - 1) / e.maxPayloadSize
	ret := make([]*rtp.Packet, 0, n)

	for pos := 0; pos < slen; {
		// only the last chunk can be shorter than maxPayloadSize
		payloadSize := e.maxPayloadSize
		if payloadSize > slen-pos {
			payloadSize = slen - pos
		}

		ret = append(ret, &rtp.Packet{
			Header: rtp.Header{
				Version:        rtpVersion,
				PayloadType:    e.PayloadType,
				SequenceNumber: e.sequenceNumber,
				Timestamp:      e.encodeTimestamp(pts),
				SSRC:           *e.SSRC,
				Marker:         false,
			},
			Payload: samples[pos : pos+payloadSize],
		})

		e.sequenceNumber++
		pos += payloadSize
		// advance the PTS by the duration of the samples just written
		pts += time.Duration(payloadSize/e.sampleSize) * time.Second / time.Duration(e.SampleRate)
	}

	return ret, nil
}

View File

@@ -0,0 +1,50 @@
package rtplpcm
import (
"testing"
"github.com/stretchr/testify/require"
)
// TestEncode encodes the samples of every shared test case with a fixed SSRC,
// initial sequence number and initial timestamp, and compares the resulting
// packets with the expected ones.
func TestEncode(t *testing.T) {
	for _, ca := range cases {
		t.Run(ca.name, func(t *testing.T) {
			ssrc := uint32(0x9dbb7812)
			seq := uint16(0x44ed)
			ts := uint32(0x88776655)

			e := &Encoder{
				PayloadType:           96,
				SSRC:                  &ssrc,
				InitialSequenceNumber: &seq,
				InitialTimestamp:      &ts,
				BitDepth:              24,
				SampleRate:            48000,
				ChannelCount:          2,
			}
			e.Init()

			pkts, err := e.Encode(ca.samples, ca.pts)
			require.NoError(t, err)
			require.Equal(t, ca.pkts, pkts)
		})
	}
}
// TestEncodeRandomInitialState checks that Init fills SSRC,
// InitialSequenceNumber and InitialTimestamp when they are left unset.
func TestEncodeRandomInitialState(t *testing.T) {
	e := &Encoder{
		PayloadType:  96,
		BitDepth:     24,
		SampleRate:   48000,
		ChannelCount: 2,
	}
	e.Init()

	// NotNil is required here: NotEqual(t, nil, ptr) compares an untyped nil
	// with a typed pointer and therefore passes even when the pointer is nil.
	require.NotNil(t, e.SSRC)
	require.NotNil(t, e.InitialSequenceNumber)
	require.NotNil(t, e.InitialTimestamp)
}

View File

@@ -0,0 +1,2 @@
// Package rtplpcm contains a RTP/LPCM decoder and encoder.
package rtplpcm

View File

@@ -0,0 +1,238 @@
package rtpmpeg4audio
import (
"errors"
"fmt"
"time"
"github.com/pion/rtp"
"github.com/aler9/gortsplib/pkg/bits"
"github.com/aler9/gortsplib/pkg/mpeg4audio"
"github.com/aler9/gortsplib/pkg/rtptimedec"
)
// ErrMorePacketsNeeded is returned by Decode when the packet was accepted as
// a fragment but the data is not complete yet, i.e. more packets are needed.
var ErrMorePacketsNeeded = errors.New("need more packets")
// Decoder is a RTP/MPEG4-audio decoder.
// The exported fields must be set before calling Init, and Init must be
// called before Decode.
type Decoder struct {
	// sample rate of input packets.
	SampleRate int

	// The number of bits on which the AU-size field is encoded in the AU-header.
	SizeLength int

	// The number of bits on which the AU-Index is encoded in the first AU-header.
	IndexLength int

	// The number of bits on which the AU-Index-delta field is encoded in any non-first AU-header.
	IndexDeltaLength int

	// converts RTP timestamps into durations; created by Init.
	timeDecoder *rtptimedec.Decoder
	// set once the first batch of AUs has gone through ADTS detection.
	firstAUParsed bool
	// true when the first AU was found to be wrapped into ADTS.
	adtsMode bool
	// parts of the fragmented AU being reassembled.
	fragments [][]byte
	// sum of the sizes of the parts stored in fragments.
	fragmentedSize int
}
// Init initializes the decoder.
// It creates the timestamp decoder from SampleRate.
func (d *Decoder) Init() {
	timeDecoder := rtptimedec.New(d.SampleRate)
	d.timeDecoder = timeDecoder
}
// Decode decodes AUs from a RTP/MPEG4-audio packet.
// It returns the AUs and the PTS of the first AU.
// The PTS of subsequent AUs can be calculated by adding time.Second*mpeg4audio.SamplesPerAccessUnit/clockRate.
//
// A packet without marker starts or continues a fragmented AU: its data is
// stored and ErrMorePacketsNeeded is returned until the packet with the
// marker arrives. Stored fragments are sub-slices of the packet payloads.
// Any other error discards the fragments collected so far, so that an invalid
// packet cannot become part of a later AU.
func (d *Decoder) Decode(pkt *rtp.Packet) ([][]byte, time.Duration, error) {
	// fail drops pending fragments and reports err
	fail := func(err error) ([][]byte, time.Duration, error) {
		d.fragments = d.fragments[:0]
		return nil, 0, err
	}

	if len(pkt.Payload) < 2 {
		return fail(fmt.Errorf("payload is too short"))
	}

	// AU-headers-length (16 bits), expressed in bits
	headersLen := int(uint16(pkt.Payload[0])<<8 | uint16(pkt.Payload[1]))
	if headersLen == 0 {
		return fail(fmt.Errorf("invalid AU-headers-length"))
	}
	payload := pkt.Payload[2:]

	// AU-headers
	dataLens, err := d.readAUHeaders(payload, headersLen)
	if err != nil {
		return fail(err)
	}

	// skip the AU-headers, which are padded to a whole number of bytes
	pos := (headersLen + 7) / 8
	if pos > len(payload) {
		return fail(fmt.Errorf("payload is too short"))
	}
	payload = payload[pos:]

	if len(d.fragments) == 0 {
		if pkt.Header.Marker {
			// one or more complete AUs
			aus := make([][]byte, len(dataLens))
			for i, dataLen := range dataLens {
				if uint64(len(payload)) < dataLen {
					return fail(fmt.Errorf("payload is too short"))
				}

				aus[i] = payload[:dataLen]
				payload = payload[dataLen:]
			}

			aus, err = d.removeADTS(aus)
			if err != nil {
				return fail(err)
			}

			return aus, d.timeDecoder.Decode(pkt.Timestamp), nil
		}

		// first part of a fragmented AU
		if len(dataLens) != 1 {
			return fail(fmt.Errorf("a fragmented packet can only contain one AU"))
		}

		if uint64(len(payload)) < dataLens[0] {
			return fail(fmt.Errorf("payload is too short"))
		}

		d.fragmentedSize = int(dataLens[0])
		d.fragments = append(d.fragments, payload[:dataLens[0]])
		return nil, 0, ErrMorePacketsNeeded
	}

	// we are decoding a fragmented AU
	if len(dataLens) != 1 {
		return fail(fmt.Errorf("a fragmented packet can only contain one AU"))
	}

	if uint64(len(payload)) < dataLens[0] {
		return fail(fmt.Errorf("payload is too short"))
	}

	d.fragmentedSize += int(dataLens[0])
	if d.fragmentedSize > mpeg4audio.MaxAccessUnitSize {
		return fail(fmt.Errorf("AU size (%d) is too big (maximum is %d)", d.fragmentedSize, mpeg4audio.MaxAccessUnitSize))
	}

	d.fragments = append(d.fragments, payload[:dataLens[0]])

	if !pkt.Header.Marker {
		return nil, 0, ErrMorePacketsNeeded
	}

	// last part: join all the fragments into a newly allocated AU
	ret := make([]byte, d.fragmentedSize)
	n := 0
	for _, p := range d.fragments {
		n += copy(ret[n:], p)
	}
	aus := [][]byte{ret}

	d.fragments = d.fragments[:0]

	aus, err = d.removeADTS(aus)
	if err != nil {
		return nil, 0, err
	}

	return aus, d.timeDecoder.Decode(pkt.Timestamp), nil
}
// readAUHeaders parses the AU-headers section, which is headersLen bits long
// and starts at the beginning of buf, and returns the size of every AU.
// The first header is made of an AU-size and, when IndexLength > 0, an
// AU-index; any following header is made of an AU-size and, when
// IndexDeltaLength > 0, an AU-index-delta.
//
// It returns an error when the configured field lengths are invalid, when
// buf does not contain enough bits, or when an AU-index or AU-index-delta is
// not zero.
func (d *Decoder) readAUHeaders(buf []byte, headersLen int) ([]uint64, error) {
	// every header must consume at least one bit, otherwise parsing would
	// never reach the end of the section
	if d.SizeLength <= 0 || d.IndexLength < 0 || d.IndexDeltaLength < 0 {
		return nil, fmt.Errorf("invalid AU-header field lengths")
	}

	dataLens := make([]uint64, 0, headersLen/(d.SizeLength+d.IndexDeltaLength)+1)
	pos := 0

	for remaining := headersLen; remaining > 0; {
		dataLen, err := bits.ReadBits(buf, &pos, d.SizeLength)
		if err != nil {
			return nil, err
		}
		remaining -= d.SizeLength

		if len(dataLens) == 0 {
			// first header: AU-index
			if d.IndexLength > 0 {
				auIndex, err := bits.ReadBits(buf, &pos, d.IndexLength)
				if err != nil {
					return nil, err
				}
				remaining -= d.IndexLength

				if auIndex != 0 {
					return nil, fmt.Errorf("AU-index different than zero is not supported")
				}
			}
		} else if d.IndexDeltaLength > 0 {
			// any other header: AU-index-delta
			auIndexDelta, err := bits.ReadBits(buf, &pos, d.IndexDeltaLength)
			if err != nil {
				return nil, err
			}
			remaining -= d.IndexDeltaLength

			if auIndexDelta != 0 {
				return nil, fmt.Errorf("AU-index-delta different than zero is not supported")
			}
		}

		dataLens = append(dataLens, dataLen)
	}

	return dataLens, nil
}
// removeADTS strips the ADTS wrapping that some cameras add around AUs.
// Detection happens on the first call only: when the batch holds a single AU
// that starts with the ADTS sync bits and unmarshals into exactly one ADTS
// packet, the decoder switches to ADTS mode and unwraps it. In ADTS mode,
// every following batch must hold exactly one AU made of one ADTS packet.
// The unwrapped AU replaces aus[0] in place.
func (d *Decoder) removeADTS(aus [][]byte) ([][]byte, error) {
	if d.firstAUParsed {
		if !d.adtsMode {
			return aus, nil
		}

		if len(aus) != 1 {
			return nil, fmt.Errorf("multiple AUs in ADTS mode are not supported")
		}

		var pkts mpeg4audio.ADTSPackets
		if err := pkts.Unmarshal(aus[0]); err != nil {
			return nil, fmt.Errorf("unable to decode ADTS: %s", err)
		}

		if len(pkts) != 1 {
			return nil, fmt.Errorf("multiple ADTS packets are not supported")
		}

		aus[0] = pkts[0].AU
		return aus, nil
	}

	// some cameras wrap AUs into ADTS
	d.firstAUParsed = true

	if len(aus) != 1 || len(aus[0]) < 2 {
		return aus, nil
	}

	if aus[0][0] != 0xFF || (aus[0][1]&0xF0) != 0xF0 {
		return aus, nil
	}

	var pkts mpeg4audio.ADTSPackets
	if err := pkts.Unmarshal(aus[0]); err == nil && len(pkts) == 1 {
		d.adtsMode = true
		aus[0] = pkts[0].AU
	}

	return aus, nil
}

View File

@@ -0,0 +1,804 @@
package rtpmpeg4audio
import (
"bytes"
"testing"
"time"
"github.com/pion/rtp"
"github.com/stretchr/testify/require"
"github.com/aler9/gortsplib/pkg/mpeg4audio"
)
// mergeBytes returns a newly allocated slice holding the concatenation of
// vals, in order.
func mergeBytes(vals ...[]byte) []byte {
	total := 0
	for i := range vals {
		total += len(vals[i])
	}

	out := make([]byte, 0, total)
	for i := range vals {
		out = append(out, vals[i]...)
	}

	return out
}
// cases lists the fixtures used by TestDecode: for every combination of
// AU-header field lengths, the AUs, the PTS of the first AU and the RTP
// packets that carry them.
var cases = []struct {
name string
sizeLength int
indexLength int
indexDeltaLength int
aus [][]byte
pts time.Duration
pkts []*rtp.Packet
}{
// one AU carried by one packet
{
"single",
13,
3,
3,
[][]byte{
{
0x21, 0x1a, 0xd4, 0xf5, 0x9e, 0x20, 0xc5, 0x42,
0x89, 0x40, 0xa2, 0x9b, 0x3c, 0x94, 0xdd, 0x28,
0x94, 0x48, 0xd5, 0x8b, 0xb0, 0x2, 0xdb, 0x1b,
0xeb, 0xe0, 0xfa, 0x9f, 0xea, 0x91, 0xa7, 0x3,
0xe8, 0x6b, 0xe5, 0x5, 0x95, 0x6, 0x62, 0x88,
0x13, 0xa, 0x15, 0xa0, 0xeb, 0xef, 0x40, 0x82,
0xdf, 0x49, 0xf2, 0xe0, 0x26, 0xfc, 0x52, 0x5b,
0x6c, 0x2a, 0x2d, 0xe8, 0xa5, 0x70, 0xc5, 0xaf,
0xfc, 0x98, 0x9a, 0x2f, 0x1f, 0xbb, 0xa2, 0xcb,
0xb8, 0x26, 0xb6, 0x6e, 0x4c, 0x15, 0x6c, 0x21,
0x3d, 0x35, 0xf6, 0xcf, 0xa4, 0x3b, 0x72, 0x26,
0xe1, 0x3a, 0x3a, 0x99, 0xd8, 0x2d, 0x6a, 0x22,
0xcd, 0x97, 0xa, 0xef, 0x52, 0x9c, 0x5f, 0xcd,
0x5c, 0xd9, 0xd3, 0x12, 0x7e, 0x45, 0x45, 0xb3,
0x24, 0xef, 0xd3, 0x4f, 0x2f, 0x96, 0xd9, 0x8b,
0x9c, 0xc2, 0xcd, 0x54, 0xb, 0x6e, 0x19, 0x84,
0x56, 0xeb, 0x85, 0x52, 0x63, 0x64, 0x28, 0xb2,
0xf2, 0xcf, 0xb8, 0xa8, 0x71, 0x53, 0x6, 0x82,
0x88, 0xf2, 0xc4, 0xe1, 0x7d, 0x65, 0x54, 0xe0,
0x5e, 0xc8, 0x38, 0x75, 0x9d, 0xb0, 0x58, 0x65,
0x41, 0xa2, 0xcd, 0xdb, 0x1b, 0x9e, 0xac, 0xd1,
0xbe, 0xc9, 0x22, 0xf5, 0xe9, 0xc6, 0x6f, 0xaf,
0xf8, 0xb1, 0x4c, 0xcb, 0xa2, 0x56, 0x11, 0xa4,
0xd7, 0xfd, 0xe5, 0xef, 0x8e, 0xbf, 0xce, 0x4b,
0xef, 0xe1, 0xd, 0xc0, 0x27, 0x18, 0xe2, 0x64,
0x63, 0x5, 0x16, 0x6, 0xc, 0x34, 0xe, 0xf3, 0x62,
0xc2, 0xd6, 0x42, 0x5d, 0x66, 0x81, 0x4, 0x65,
0x76, 0xaa, 0xe7, 0x39, 0xdd, 0x8e, 0xfe, 0x48,
0x23, 0x3a, 0x1, 0xc4, 0xd3, 0x65, 0x80, 0x28,
0x6f, 0x9b, 0xc9, 0xb7, 0x4e, 0x44, 0x4c, 0x98,
0x6a, 0x5f, 0x3b, 0x97, 0x81, 0x9b, 0xa9, 0xab,
0xfd, 0xcf, 0x8e, 0x78, 0xbd, 0x4d, 0x70, 0x81,
0x9b, 0x2d, 0x85, 0x94, 0x74, 0x2a, 0x3a, 0xb4,
0xff, 0x4a, 0x13, 0x70, 0x76, 0x2c, 0x2f, 0x13,
0x5b, 0x43, 0xf9, 0x17, 0xee, 0x26, 0x37, 0x1,
0xbc, 0x9f, 0xb, 0xe, 0x68, 0xcb, 0x87, 0x65,
0x86, 0xcc, 0x4c, 0x2f, 0x7a, 0x14, 0xd, 0xd1,
0xb9, 0x57, 0xbd, 0x50, 0xb6, 0x95, 0x44, 0x1a,
0xd, 0xc0, 0x15, 0xf, 0xd2, 0xc3, 0x72, 0x4d,
0x6e, 0x4f, 0x8e, 0x6d, 0x64, 0xdc, 0x64, 0x1f,
0x33, 0x53, 0x4e, 0xd8, 0xa4, 0x74, 0xf3, 0x33,
0x4, 0x68, 0xd9, 0x92, 0xf3, 0x6e, 0xb7, 0x5b,
0xe6, 0xf6, 0xc3, 0x55, 0x14, 0x54, 0x87, 0x0,
0xaf, 0x7,
},
},
20 * time.Millisecond,
[]*rtp.Packet{
{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 17645,
Timestamp: 2289527317,
SSRC: 0x9dbb7812,
},
// AU-headers-length (16 bits), one 16-bit AU-header, then the AU
Payload: []byte{
0x00, 0x10, 0x0a, 0xd8,
0x21, 0x1a, 0xd4, 0xf5, 0x9e, 0x20, 0xc5, 0x42,
0x89, 0x40, 0xa2, 0x9b, 0x3c, 0x94, 0xdd, 0x28,
0x94, 0x48, 0xd5, 0x8b, 0xb0, 0x02, 0xdb, 0x1b,
0xeb, 0xe0, 0xfa, 0x9f, 0xea, 0x91, 0xa7, 0x03,
0xe8, 0x6b, 0xe5, 0x05, 0x95, 0x06, 0x62, 0x88,
0x13, 0x0a, 0x15, 0xa0, 0xeb, 0xef, 0x40, 0x82,
0xdf, 0x49, 0xf2, 0xe0, 0x26, 0xfc, 0x52, 0x5b,
0x6c, 0x2a, 0x2d, 0xe8, 0xa5, 0x70, 0xc5, 0xaf,
0xfc, 0x98, 0x9a, 0x2f, 0x1f, 0xbb, 0xa2, 0xcb,
0xb8, 0x26, 0xb6, 0x6e, 0x4c, 0x15, 0x6c, 0x21,
0x3d, 0x35, 0xf6, 0xcf, 0xa4, 0x3b, 0x72, 0x26,
0xe1, 0x3a, 0x3a, 0x99, 0xd8, 0x2d, 0x6a, 0x22,
0xcd, 0x97, 0x0a, 0xef, 0x52, 0x9c, 0x5f, 0xcd,
0x5c, 0xd9, 0xd3, 0x12, 0x7e, 0x45, 0x45, 0xb3,
0x24, 0xef, 0xd3, 0x4f, 0x2f, 0x96, 0xd9, 0x8b,
0x9c, 0xc2, 0xcd, 0x54, 0x0b, 0x6e, 0x19, 0x84,
0x56, 0xeb, 0x85, 0x52, 0x63, 0x64, 0x28, 0xb2,
0xf2, 0xcf, 0xb8, 0xa8, 0x71, 0x53, 0x06, 0x82,
0x88, 0xf2, 0xc4, 0xe1, 0x7d, 0x65, 0x54, 0xe0,
0x5e, 0xc8, 0x38, 0x75, 0x9d, 0xb0, 0x58, 0x65,
0x41, 0xa2, 0xcd, 0xdb, 0x1b, 0x9e, 0xac, 0xd1,
0xbe, 0xc9, 0x22, 0xf5, 0xe9, 0xc6, 0x6f, 0xaf,
0xf8, 0xb1, 0x4c, 0xcb, 0xa2, 0x56, 0x11, 0xa4,
0xd7, 0xfd, 0xe5, 0xef, 0x8e, 0xbf, 0xce, 0x4b,
0xef, 0xe1, 0x0d, 0xc0, 0x27, 0x18, 0xe2, 0x64,
0x63, 0x05, 0x16, 0x06, 0x0c, 0x34, 0x0e, 0xf3,
0x62, 0xc2, 0xd6, 0x42, 0x5d, 0x66, 0x81, 0x04,
0x65, 0x76, 0xaa, 0xe7, 0x39, 0xdd, 0x8e, 0xfe,
0x48, 0x23, 0x3a, 0x01, 0xc4, 0xd3, 0x65, 0x80,
0x28, 0x6f, 0x9b, 0xc9, 0xb7, 0x4e, 0x44, 0x4c,
0x98, 0x6a, 0x5f, 0x3b, 0x97, 0x81, 0x9b, 0xa9,
0xab, 0xfd, 0xcf, 0x8e, 0x78, 0xbd, 0x4d, 0x70,
0x81, 0x9b, 0x2d, 0x85, 0x94, 0x74, 0x2a, 0x3a,
0xb4, 0xff, 0x4a, 0x13, 0x70, 0x76, 0x2c, 0x2f,
0x13, 0x5b, 0x43, 0xf9, 0x17, 0xee, 0x26, 0x37,
0x01, 0xbc, 0x9f, 0x0b, 0x0e, 0x68, 0xcb, 0x87,
0x65, 0x86, 0xcc, 0x4c, 0x2f, 0x7a, 0x14, 0x0d,
0xd1, 0xb9, 0x57, 0xbd, 0x50, 0xb6, 0x95, 0x44,
0x1a, 0x0d, 0xc0, 0x15, 0x0f, 0xd2, 0xc3, 0x72,
0x4d, 0x6e, 0x4f, 0x8e, 0x6d, 0x64, 0xdc, 0x64,
0x1f, 0x33, 0x53, 0x4e, 0xd8, 0xa4, 0x74, 0xf3,
0x33, 0x04, 0x68, 0xd9, 0x92, 0xf3, 0x6e, 0xb7,
0x5b, 0xe6, 0xf6, 0xc3, 0x55, 0x14, 0x54, 0x87,
0x00, 0xaf, 0x07,
},
},
},
},
// three small AUs carried by one packet
{
"aggregated",
13,
3,
3,
[][]byte{
{0x00, 0x01, 0x02, 0x03},
{0x04, 0x05, 0x06, 0x07},
{0x08, 0x09, 0x0A, 0x0B},
},
0,
[]*rtp.Packet{
{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 17645,
Timestamp: 2289526357,
SSRC: 0x9dbb7812,
},
// AU-headers-length (48 bits), three 16-bit AU-headers, then the AUs
Payload: []byte{
0x0, 0x30, 0x0, 0x20,
0x0, 0x20, 0x0, 0x20, 0x0, 0x1, 0x2, 0x3,
0x4, 0x5, 0x6, 0x7, 0x8, 0x9, 0xa, 0xb,
},
},
},
},
// one 4096-byte AU spread over three packets; only the last one has the marker
{
"fragmented",
13,
3,
3,
[][]byte{
bytes.Repeat([]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, 512),
},
0,
[]*rtp.Packet{ //nolint:dupl
{
Header: rtp.Header{
Version: 2,
Marker: false,
PayloadType: 96,
SequenceNumber: 17645,
Timestamp: 2289526357,
SSRC: 0x9dbb7812,
},
Payload: mergeBytes(
[]byte{0x0, 0x10, 0x2d, 0x80},
bytes.Repeat([]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, 182),
),
},
{
Header: rtp.Header{
Version: 2,
Marker: false,
PayloadType: 96,
SequenceNumber: 17646,
Timestamp: 2289526357,
SSRC: 0x9dbb7812,
},
Payload: mergeBytes(
[]byte{0x00, 0x10, 0x2d, 0x80},
bytes.Repeat([]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, 182),
),
},
{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 17647,
Timestamp: 2289526357,
SSRC: 0x9dbb7812,
},
Payload: mergeBytes(
[]byte{0x00, 0x10, 0x25, 0x00},
bytes.Repeat([]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, 148),
),
},
},
},
// an aggregated packet followed by a fragmented AU with a later timestamp
{
"aggregated followed by fragmented",
13,
3,
3,
[][]byte{
{0x00, 0x01, 0x02, 0x03},
{0x04, 0x05, 0x06, 0x07},
{0x08, 0x09, 0x0A, 0x0B},
bytes.Repeat([]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, 256),
},
0,
[]*rtp.Packet{
{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 17645,
Timestamp: 2289526357,
SSRC: 0x9dbb7812,
},
Payload: []byte{
0x0, 0x30, 0x0, 0x20,
0x0, 0x20, 0x0, 0x20, 0x0, 0x1, 0x2, 0x3,
0x4, 0x5, 0x6, 0x7, 0x8, 0x9, 0xa, 0xb,
},
},
{
Header: rtp.Header{
Version: 2,
Marker: false,
PayloadType: 96,
SequenceNumber: 17646,
Timestamp: 2289529429,
SSRC: 0x9dbb7812,
},
Payload: mergeBytes(
[]byte{0x0, 0x10, 0x2d, 0x80},
bytes.Repeat([]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, 182),
),
},
{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 17647,
Timestamp: 2289529429,
SSRC: 0x9dbb7812,
},
Payload: mergeBytes(
[]byte{0x00, 0x10, 0x12, 0x80},
bytes.Repeat([]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, 74),
),
},
},
},
// one AU, 6-bit size and 2-bit index: the AU-header takes a single byte
{
"single, custom sized",
6,
2,
2,
[][]byte{
{0x01, 0x02, 0x03, 0x04},
},
20 * time.Millisecond,
[]*rtp.Packet{
{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 17645,
Timestamp: 2289527317,
SSRC: 0x9dbb7812,
},
Payload: []byte{
0x00, 0x08, 0x10,
0x01, 0x02, 0x03, 0x04,
},
},
},
},
// one AU, 13-bit size and no index: the AU-header is padded to two bytes
{
"single, custom sized, padded",
13,
0,
0,
[][]byte{
{0x01, 0x02, 0x03, 0x04},
},
20 * time.Millisecond,
[]*rtp.Packet{
{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 17645,
Timestamp: 2289527317,
SSRC: 0x9dbb7812,
},
Payload: []byte{
0x00, 0x0d, 0x00, 0x20,
0x01, 0x02, 0x03, 0x04,
},
},
},
},
// two AUs, 13-bit size and no index: 26 bits of AU-headers padded to four bytes
{
"aggregated, custom sized, padded",
13,
0,
0,
[][]byte{
{0x01, 0x02, 0x03, 0x04},
{0x05, 0x06, 0x07, 0x08},
},
20 * time.Millisecond,
[]*rtp.Packet{
{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 17645,
Timestamp: 2289527317,
SSRC: 0x9dbb7812,
},
Payload: []byte{
0x00, 0x1a, 0x00, 0x20, 0x01, 0x00,
0x01, 0x02, 0x03, 0x04,
0x05, 0x06, 0x07, 0x08,
},
},
},
},
// fragmented AU, 21-bit size and 3-bit index: the AU-header takes three bytes
{
"fragmented, custom sized",
21,
3,
3,
[][]byte{
bytes.Repeat([]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, 512),
},
20 * time.Millisecond,
[]*rtp.Packet{
{
Header: rtp.Header{
Version: 2,
Marker: false,
PayloadType: 96,
SequenceNumber: 17645,
Timestamp: 2289527317,
SSRC: 0x9dbb7812,
},
Payload: mergeBytes(
[]byte{0x0, 0x18, 0x00, 0x2d, 0x78},
bytes.Repeat([]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, 181),
[]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06},
),
},
{
Header: rtp.Header{
Version: 2,
Marker: false,
PayloadType: 96,
SequenceNumber: 17646,
Timestamp: 2289527317,
SSRC: 0x9dbb7812,
},
Payload: mergeBytes(
[]byte{0x00, 0x18, 0x00, 0x2d, 0x78, 0x07},
bytes.Repeat([]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, 181),
[]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05},
),
},
{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 17647,
Timestamp: 2289527317,
SSRC: 0x9dbb7812,
},
Payload: mergeBytes(
[]byte{0x00, 0x18, 0x00, 0x25, 0x10, 0x06, 0x07},
bytes.Repeat([]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, 148),
),
},
},
},
// fragmented AU, 13-bit size and no index
{
"fragmented, custom sized, padded",
13,
0,
0,
[][]byte{
bytes.Repeat([]byte{0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F}, 512),
},
20 * time.Millisecond,
[]*rtp.Packet{ //nolint:dupl
{
Header: rtp.Header{
Version: 2,
Marker: false,
PayloadType: 96,
SequenceNumber: 17645,
Timestamp: 2289527317,
SSRC: 0x9dbb7812,
},
Payload: mergeBytes(
[]byte{0x0, 0x0d, 0x2d, 0x80},
bytes.Repeat([]byte{0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F}, 182),
),
},
{
Header: rtp.Header{
Version: 2,
Marker: false,
PayloadType: 96,
SequenceNumber: 17646,
Timestamp: 2289527317,
SSRC: 0x9dbb7812,
},
Payload: mergeBytes(
[]byte{0x0, 0x0d, 0x2d, 0x80},
bytes.Repeat([]byte{0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F}, 182),
),
},
{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 17647,
Timestamp: 2289527317,
SSRC: 0x9dbb7812,
},
Payload: mergeBytes(
[]byte{0x00, 0x0d, 0x25, 0x00},
bytes.Repeat([]byte{0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F}, 148),
),
},
},
},
}
// TestDecode feeds the packets of every shared test case to a decoder and
// checks the decoded AUs, the PTS returned for every completed packet, and
// that the decoder leaves its input untouched.
func TestDecode(t *testing.T) {
	for _, ca := range cases {
		t.Run(ca.name, func(t *testing.T) {
			d := &Decoder{
				SampleRate:       48000,
				SizeLength:       ca.sizeLength,
				IndexLength:      ca.indexLength,
				IndexDeltaLength: ca.indexDeltaLength,
			}
			d.Init()

			// send an initial packet downstream
			// in order to compute the right timestamp,
			// that is relative to the initial packet
			pkt := rtp.Packet{
				Header: rtp.Header{
					Version:        2,
					Marker:         true,
					PayloadType:    96,
					SequenceNumber: 17645,
					Timestamp:      2289526357,
					SSRC:           0x9dbb7812,
				},
			}

			// the initial payload must match the AU-header layout of the case
			switch {
			case ca.sizeLength == 13 && ca.indexLength == 3:
				pkt.Payload = []byte{0x00, 0x10, 0x00, 0x08, 0x0}

			case ca.sizeLength == 13 && ca.indexLength == 0:
				pkt.Payload = []byte{0x00, 0x0d, 0x00, 0x08, 0x0}

			case ca.sizeLength == 6:
				pkt.Payload = []byte{0x00, 0x08, 0x04, 0x0}

			case ca.sizeLength == 21:
				pkt.Payload = []byte{0x00, 0x18, 0x00, 0x0, 0x08, 0x00}
			}

			_, _, err := d.Decode(&pkt)
			require.NoError(t, err)

			var aus [][]byte
			expPTS := ca.pts

			for _, pkt := range ca.pkts {
				clone := pkt.Clone()

				addAUs, pts, err := d.Decode(pkt)
				if err == ErrMorePacketsNeeded {
					continue
				}

				require.NoError(t, err)
				require.Equal(t, expPTS, pts)
				aus = append(aus, addAUs...)

				// the next packet starts after the AUs returned by this one;
				// the AUs of previous packets have already been accounted for
				expPTS += time.Duration(len(addAUs)) * mpeg4audio.SamplesPerAccessUnit * time.Second / 48000

				// test input integrity
				require.Equal(t, clone, pkt)
			}

			require.Equal(t, ca.aus, aus)
		})
	}
}
// TestDecodeADTS decodes twice a packet whose single AU is wrapped into ADTS
// and checks that the bare AU is returned both times, i.e. when ADTS is
// detected and once the decoder is in ADTS mode.
func TestDecodeADTS(t *testing.T) {
	d := &Decoder{
		SampleRate:       16000,
		SizeLength:       13,
		IndexLength:      3,
		IndexDeltaLength: 3,
	}
	d.Init()

	// a fresh packet is built for every call
	newPacket := func() *rtp.Packet {
		return &rtp.Packet{
			Header: rtp.Header{
				Version:        2,
				Marker:         true,
				PayloadType:    96,
				SequenceNumber: 17645,
				Timestamp:      2289526357,
				SSRC:           0x9dbb7812,
			},
			Payload: []byte{
				0x00, 0x10, 0x00, 0x09 << 3,
				0xff, 0xf1, 0x4c, 0x80, 0x1, 0x3f, 0xfc, 0xaa, 0xbb,
			},
		}
	}

	expected := [][]byte{{0xaa, 0xbb}}

	for attempt := 0; attempt < 2; attempt++ {
		aus, _, err := d.Decode(newPacket())
		require.NoError(t, err)
		require.Equal(t, expected, aus)
	}
}
// TestDecodeErrors feeds sequences of invalid packets to a decoder configured
// with a 13-bit AU-size and 3-bit AU-index / AU-index-delta, and checks the
// error returned for the last packet of every sequence.
func TestDecodeErrors(t *testing.T) {
for _, ca := range []struct {
name string
pkts []*rtp.Packet
err string
}{
// no payload at all
{
"missing payload",
[]*rtp.Packet{
{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 0x60,
SequenceNumber: 0x44ed,
Timestamp: 0x88776a15,
SSRC: 0x9dbb7812,
},
},
},
"payload is too short",
},
// AU-headers-length announces 16 bits but no AU-header follows
{
"missing au header",
[]*rtp.Packet{
{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 0x60,
SequenceNumber: 0x44ed,
Timestamp: 0x88776a15,
SSRC: 0x9dbb7812,
},
Payload: []byte{0x00, 0x10},
},
},
"not enough bits",
},
// the AU-header announces an AU but no data follows
{
"missing au",
[]*rtp.Packet{
{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 0x60,
SequenceNumber: 0x44ed,
Timestamp: 0x88776a15,
SSRC: 0x9dbb7812,
},
Payload: []byte{0x00, 0x10, 0x0a, 0xd8},
},
},
"payload is too short",
},
// AU-headers-length is zero
{
"invalid au headers length",
[]*rtp.Packet{
{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 0x60,
SequenceNumber: 0x44ed,
Timestamp: 0x88776a15,
SSRC: 0x9dbb7812,
},
Payload: []byte{0x00, 0x00},
},
},
"invalid AU-headers-length",
},
// the 3 index bits of the first AU-header are not zero
{
"au index not zero",
[]*rtp.Packet{
{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 0x60,
SequenceNumber: 0x44ed,
Timestamp: 0x88776a15,
SSRC: 0x9dbb7812,
},
Payload: []byte{0x00, 0x10, 0x0a, 0xd9},
},
},
"AU-index different than zero is not supported",
},
// the 3 index-delta bits of the second AU-header are not zero
{
"au index delta not zero",
[]*rtp.Packet{
{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 0x60,
SequenceNumber: 0x44ed,
Timestamp: 0x88776a15,
SSRC: 0x9dbb7812,
},
Payload: []byte{0x00, 0x20, 0x00, 0x08, 0x0a, 0xd9},
},
},
"AU-index-delta different than zero is not supported",
},
// packet without marker (first fragment) carrying two AU-headers
{
"fragmented with multiple AUs in 1st packet",
[]*rtp.Packet{
{
Header: rtp.Header{
Version: 2,
Marker: false,
PayloadType: 0x60,
SequenceNumber: 0xea2,
Timestamp: 0x88776a15,
SSRC: 0x9dbb7812,
},
Payload: []byte{0x00, 0x20, 0x00, 0x08, 0x00, 0x08},
},
},
"a fragmented packet can only contain one AU",
},
// packet without marker (first fragment) carrying no AU data
{
"fragmented with no payload in 1st packet",
[]*rtp.Packet{
{
Header: rtp.Header{
Version: 2,
Marker: false,
PayloadType: 0x60,
SequenceNumber: 0x44ed,
Timestamp: 0x88776a15,
SSRC: 0x9dbb7812,
},
Payload: []byte{0x00, 0x10, 0x0a, 0xd8},
},
},
"payload is too short",
},
// valid first fragment followed by a packet carrying two AU-headers
{
"fragmented with multiple AUs in 2nd packet",
[]*rtp.Packet{
{
Header: rtp.Header{
Version: 2,
Marker: false,
PayloadType: 0x60,
SequenceNumber: 0x44ed,
Timestamp: 0x88776a15,
SSRC: 0x9dbb7812,
},
Payload: mergeBytes(
[]byte{0x0, 0x10, 0x2d, 0x80},
bytes.Repeat([]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, 182),
),
},
{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 0x60,
SequenceNumber: 0x44ee,
Timestamp: 0x88776a15,
SSRC: 0x9dbb7812,
},
Payload: mergeBytes(
[]byte{0x0, 0x20, 0x00, 0x08, 0x00, 0x08},
),
},
},
"a fragmented packet can only contain one AU",
},
// valid first fragment followed by a packet carrying no AU data
{
"fragmented with no payload in 2nd packet",
[]*rtp.Packet{
{
Header: rtp.Header{
Version: 2,
Marker: false,
PayloadType: 0x60,
SequenceNumber: 0x44ed,
Timestamp: 0x88776a15,
SSRC: 0x9dbb7812,
},
Payload: mergeBytes(
[]byte{0x0, 0x10, 0x2d, 0x80},
bytes.Repeat([]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, 182),
),
},
{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 0x60,
SequenceNumber: 0x44ee,
Timestamp: 0x88776a15,
SSRC: 0x9dbb7812,
},
Payload: []byte{0x00, 0x10, 0x0a, 0xd8},
},
},
"payload is too short",
},
} {
t.Run(ca.name, func(t *testing.T) {
d := &Decoder{
SampleRate: 48000,
SizeLength: 13,
IndexLength: 3,
IndexDeltaLength: 3,
}
d.Init()
// only the error of the last packet is checked; errors of earlier
// packets (e.g. ErrMorePacketsNeeded) are overwritten
var lastErr error
for _, pkt := range ca.pkts {
_, _, lastErr = d.Decode(pkt)
}
require.EqualError(t, lastErr, ca.err)
})
}
}

View File

@@ -0,0 +1,273 @@
package rtpmpeg4audio
import (
"crypto/rand"
"time"
"github.com/pion/rtp"
"github.com/aler9/gortsplib/pkg/bits"
"github.com/aler9/gortsplib/pkg/mpeg4audio"
)
// rtpVersion is the value written into the Version field of every packet.
const rtpVersion = 2
// randUint32 returns a 32-bit value assembled, most significant byte first,
// from 4 bytes read from crypto/rand.
func randUint32() uint32 {
	var buf [4]byte
	rand.Read(buf[:])

	var v uint32
	for _, c := range buf {
		v = v<<8 | uint32(c)
	}
	return v
}
// Encoder is a RTP/MPEG4-audio encoder.
// The exported fields must be set before calling Init, and Init must be
// called before Encode.
type Encoder struct {
	// payload type of packets.
	PayloadType uint8

	// SSRC of packets (optional).
	// It defaults to a random value.
	SSRC *uint32

	// initial sequence number of packets (optional).
	// It defaults to a random value.
	InitialSequenceNumber *uint16

	// initial timestamp of packets (optional).
	// It defaults to a random value.
	InitialTimestamp *uint32

	// maximum size of packet payloads (optional).
	// It defaults to 1460.
	PayloadMaxSize int

	// sample rate of packets.
	SampleRate int

	// The number of bits on which the AU-size field is encoded in the AU-header.
	SizeLength int

	// The number of bits on which the AU-Index is encoded in the first AU-header.
	IndexLength int

	// The number of bits on which the AU-Index-delta field is encoded in any non-first AU-header.
	IndexDeltaLength int

	// sequence number of the next packet.
	sequenceNumber uint16
}
// Init initializes the encoder.
// Optional fields left unset (PayloadMaxSize, SSRC, InitialSequenceNumber,
// InitialTimestamp) receive their defaults, then the running sequence number
// is primed with InitialSequenceNumber.
func (e *Encoder) Init() {
	if e.PayloadMaxSize == 0 {
		e.PayloadMaxSize = 1460 // 1500 (UDP MTU) - 20 (IP header) - 8 (UDP header) - 12 (RTP header)
	}

	if e.SSRC == nil {
		e.SSRC = new(uint32)
		*e.SSRC = randUint32()
	}

	if e.InitialSequenceNumber == nil {
		e.InitialSequenceNumber = new(uint16)
		*e.InitialSequenceNumber = uint16(randUint32())
	}

	if e.InitialTimestamp == nil {
		e.InitialTimestamp = new(uint32)
		*e.InitialTimestamp = randUint32()
	}

	e.sequenceNumber = *e.InitialSequenceNumber
}
// encodeTimestamp converts a presentation timestamp into a RTP timestamp,
// expressed in units of SampleRate and offset by InitialTimestamp.
//
// Only integer arithmetic is used: converting an out-of-range floating-point
// value to uint32 gives an implementation-dependent result, which is what
// happens as soon as the tick count no longer fits into 32 bits.
// Here the result wraps around modulo 2^32 on every platform.
func (e *Encoder) encodeTimestamp(ts time.Duration) uint32 {
	clockRate := int64(e.SampleRate)

	// whole seconds and the sub-second remainder are scaled separately,
	// in order to keep nanosecond resolution without overflowing int64.
	secs := int64(ts / time.Second)
	nanos := int64(ts % time.Second)
	ticks := secs*clockRate + nanos*clockRate/int64(time.Second)

	return *e.InitialTimestamp + uint32(ticks)
}
// Encode encodes AUs into RTP/MPEG4-audio packets.
//
// Consecutive AUs are grouped into batches: an AU joins the current batch as
// long as the resulting payload does not exceed PayloadMaxSize, otherwise the
// batch is written and a new one is started. pts is the timestamp of the first
// AU; it is advanced by the duration of every batch that gets written.
//
// When aus is empty, no packet is generated and (nil, nil) is returned.
func (e *Encoder) Encode(aus [][]byte, pts time.Duration) ([]*rtp.Packet, error) {
	// without this guard the final flush would emit a packet whose payload is
	// made only of a zero AU-headers-length field.
	if len(aus) == 0 {
		return nil, nil
	}

	var rets []*rtp.Packet
	var batch [][]byte

	// split AUs into batches
	for _, au := range aus {
		if e.lenAggregated(batch, au) <= e.PayloadMaxSize {
			// add to existing batch
			batch = append(batch, au)
			continue
		}

		// write last batch
		if batch != nil {
			pkts, err := e.writeBatch(batch, pts)
			if err != nil {
				return nil, err
			}
			rets = append(rets, pkts...)

			// the next batch starts after the AUs that have just been written
			pts += time.Duration(len(batch)) * mpeg4audio.SamplesPerAccessUnit * time.Second / time.Duration(e.SampleRate)
		}

		// initialize new batch
		batch = [][]byte{au}
	}

	// write last batch
	pkts, err := e.writeBatch(batch, pts)
	if err != nil {
		return nil, err
	}
	rets = append(rets, pkts...)

	return rets, nil
}
// writeBatch writes a batch of AUs, choosing between a single aggregated
// packet and a series of fragmentation packets.
//
// A batch is fragmented only when it contains a single AU whose aggregated
// payload (AU-headers-length field, AU-header and AU) exceeds PayloadMaxSize.
// The check uses the full payload size: comparing the bare AU length against
// PayloadMaxSize ignores the header bytes and lets payloads exceed the limit.
func (e *Encoder) writeBatch(aus [][]byte, pts time.Duration) ([]*rtp.Packet, error) {
	if len(aus) == 1 && e.lenAggregated(aus, nil) > e.PayloadMaxSize {
		// split the AU into multiple fragmentation packets
		return e.writeFragmented(aus[0], pts)
	}

	return e.writeAggregated(aus, pts)
}
// writeFragmented splits a single AU across multiple RTP packets.
//
// Every packet carries a 2-byte AU-headers-length field, one AU-header (size of
// the fragment on SizeLength bits, followed by a zero index on IndexLength bits)
// and a slice of the AU. All packets share the same timestamp and only the last
// one has the marker bit set.
func (e *Encoder) writeFragmented(au []byte, pts time.Duration) ([]*rtp.Packet, error) {
	auHeadersLen := e.SizeLength + e.IndexLength
	auHeadersLenBytes := auHeadersLen / 8
	if (auHeadersLen % 8) != 0 {
		auHeadersLenBytes++
	}

	// maximum number of AU bytes that fit into a single packet
	auMaxSize := e.PayloadMaxSize - 2 - auHeadersLenBytes

	packetCount := len(au) / auMaxSize
	if (len(au) % auMaxSize) != 0 {
		packetCount++
	}

	ret := make([]*rtp.Packet, packetCount)
	encPTS := e.encodeTimestamp(pts)

	for i := range ret {
		// take as much as possible from what is left of the AU.
		// The size of the last fragment is derived from the remaining bytes and
		// not from len(au) % auMaxSize: that value is zero when the AU is an exact
		// multiple of auMaxSize, which would leave the last fragment empty and
		// drop the final bytes of the AU.
		le := auMaxSize
		if le > len(au) {
			le = len(au)
		}

		byts := make([]byte, 2+auHeadersLenBytes+le)

		// AU-headers-length
		byts[0] = byte(auHeadersLen >> 8)
		byts[1] = byte(auHeadersLen)

		// AU-headers
		pos := 0
		bits.WriteBits(byts[2:], &pos, uint64(le), e.SizeLength)
		bits.WriteBits(byts[2:], &pos, 0, e.IndexLength)

		// AU
		copy(byts[2+auHeadersLenBytes:], au[:le])
		au = au[le:]

		ret[i] = &rtp.Packet{
			Header: rtp.Header{
				Version:        rtpVersion,
				PayloadType:    e.PayloadType,
				SequenceNumber: e.sequenceNumber,
				Timestamp:      encPTS,
				SSRC:           *e.SSRC,
				Marker:         (i == (packetCount - 1)),
			},
			Payload: byts,
		}

		e.sequenceNumber++
	}

	return ret, nil
}
// lenAggregated returns the size in bytes of the aggregated payload that
// contains the given AUs, plus addAU when it is not nil.
// The payload is made of the 2-byte AU-headers-length field, the AU-headers
// rounded up to a whole number of bytes, and the AUs themselves.
func (e *Encoder) lenAggregated(aus [][]byte, addAU []byte) int {
	headerCount := len(aus)
	if addAU != nil {
		headerCount++
	}

	// the first AU-header carries an AU-Index, the following ones an AU-Index-delta
	headersBits := 0
	if headerCount > 0 {
		headersBits = (e.SizeLength + e.IndexLength) +
			(headerCount-1)*(e.SizeLength+e.IndexDeltaLength)
	}

	size := 2 + headersBits/8 // AU-headers-length + AU-headers
	if headersBits%8 != 0 {
		size++
	}

	// AUs
	size += len(addAU)
	for i := range aus {
		size += len(aus[i])
	}

	return size
}
// writeAggregated writes all the given AUs into a single RTP packet.
// The payload is made of the AU-headers-length field, one AU-header per AU
// (size of the AU, followed by a zero index or index delta) and the AUs,
// one after the other. The packet has the marker bit set.
func (e *Encoder) writeAggregated(aus [][]byte, pts time.Duration) ([]*rtp.Packet, error) {
	payload := make([]byte, e.lenAggregated(aus, nil))

	// AU-headers, placed right after the 2-byte AU-headers-length field
	headers := payload[2:]
	bitPos := 0
	headersBits := 0
	for i, au := range aus {
		// the first header carries an AU-Index, the others an AU-Index-delta
		indexBits := e.IndexDeltaLength
		if i == 0 {
			indexBits = e.IndexLength
		}

		bits.WriteBits(headers, &bitPos, uint64(len(au)), e.SizeLength)
		bits.WriteBits(headers, &bitPos, 0, indexBits)
		headersBits += e.SizeLength + indexBits
	}

	// AU-headers-length
	payload[0] = byte(headersBits >> 8)
	payload[1] = byte(headersBits)

	// AUs, starting from the first byte after the AU-headers
	offset := 2 + (headersBits+7)/8
	for _, au := range aus {
		offset += copy(payload[offset:], au)
	}

	header := rtp.Header{
		Version:        rtpVersion,
		PayloadType:    e.PayloadType,
		SequenceNumber: e.sequenceNumber,
		Timestamp:      e.encodeTimestamp(pts),
		SSRC:           *e.SSRC,
		Marker:         true,
	}
	e.sequenceNumber++

	return []*rtp.Packet{{Header: header, Payload: payload}}, nil
}

View File

@@ -0,0 +1,52 @@
package rtpmpeg4audio
import (
"testing"
"github.com/stretchr/testify/require"
)
// TestEncode runs the encoder over every entry of cases and compares the
// generated packets with the expected ones.
func TestEncode(t *testing.T) {
	for _, ca := range cases {
		t.Run(ca.name, func(t *testing.T) {
			// fixed initial state, so that the output does not depend on random values
			ssrc := uint32(0x9dbb7812)
			seq := uint16(0x44ed)
			ts := uint32(0x88776655)

			e := &Encoder{
				PayloadType:           96,
				SampleRate:            48000,
				SSRC:                  &ssrc,
				InitialSequenceNumber: &seq,
				InitialTimestamp:      &ts,
				SizeLength:            ca.sizeLength,
				IndexLength:           ca.indexLength,
				IndexDeltaLength:      ca.indexDeltaLength,
			}
			e.Init()

			pkts, err := e.Encode(ca.aus, ca.pts)
			require.NoError(t, err)
			require.Equal(t, ca.pkts, pkts)
		})
	}
}
// TestEncodeRandomInitialState checks that Init fills in the optional fields
// that were left unset.
func TestEncodeRandomInitialState(t *testing.T) {
	e := &Encoder{
		PayloadType:      96,
		SampleRate:       48000,
		SizeLength:       13,
		IndexLength:      3,
		IndexDeltaLength: 3,
	}
	e.Init()

	// NotNil is needed here: NotEqual(t, nil, ptr) compares an untyped nil with
	// a typed pointer and therefore succeeds even when the pointer is nil.
	require.NotNil(t, e.SSRC)
	require.NotNil(t, e.InitialSequenceNumber)
	require.NotNil(t, e.InitialTimestamp)
}

View File

@@ -0,0 +1,2 @@
// Package rtpmpeg4audio contains a RTP/MPEG4-audio decoder and encoder.
package rtpmpeg4audio

View File

@@ -0,0 +1,26 @@
package rtpsimpleaudio
import (
"time"
"github.com/pion/rtp"
"github.com/aler9/gortsplib/pkg/rtptimedec"
)
// Decoder is a RTP/simple audio decoder.
type Decoder struct {
	// sample rate of the audio.
	// It is handed to the timestamp decoder by Init.
	SampleRate int

	timeDecoder *rtptimedec.Decoder
}

// Init initializes the decoder.
func (d *Decoder) Init() {
	clockRate := d.SampleRate
	d.timeDecoder = rtptimedec.New(clockRate)
}

// Decode decodes an audio frame from a RTP packet.
// The returned frame is the payload of the packet itself, not a copy.
func (d *Decoder) Decode(pkt *rtp.Packet) ([]byte, time.Duration, error) {
	frame := pkt.Payload
	pts := d.timeDecoder.Decode(pkt.Timestamp)
	return frame, pts, nil
}

View File

@@ -0,0 +1,67 @@
package rtpsimpleaudio
import (
"testing"
"time"
"github.com/pion/rtp"
"github.com/stretchr/testify/require"
)
// cases pairs an audio frame and its presentation timestamp with the RTP
// packet that corresponds to it.
// The table is used in both directions: TestDecode feeds pkt to the decoder and
// expects frame and pts, TestEncode feeds frame and pts to the encoder and
// expects pkt.
var cases = []struct {
name string
frame []byte
pts time.Duration
pkt *rtp.Packet
}{
{
"single",
[]byte{0x01, 0x02, 0x03, 0x04},
25 * time.Millisecond,
&rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: false,
PayloadType: 0,
SequenceNumber: 17645,
Timestamp: 2289526557,
SSRC: 0x9dbb7812,
},
Payload: []byte{0x01, 0x02, 0x03, 0x04},
},
},
}
// TestDecode decodes the packet of every entry of cases and compares the
// resulting frame and timestamp with the expected ones.
func TestDecode(t *testing.T) {
	for _, ca := range cases {
		t.Run(ca.name, func(t *testing.T) {
			d := &Decoder{SampleRate: 8000}
			d.Init()

			// timestamps are relative to the initial packet, so a reference
			// packet is sent downstream before the one under test
			first := &rtp.Packet{
				Header: rtp.Header{
					Version:        2,
					Marker:         false,
					PayloadType:    0,
					SequenceNumber: 17645,
					Timestamp:      2289526357,
					SSRC:           0x9dbb7812,
				},
				Payload: []byte{0x01, 0x02, 0x03, 0x04},
			}
			_, _, err := d.Decode(first)
			require.NoError(t, err)

			frame, pts, err := d.Decode(ca.pkt)
			require.NoError(t, err)
			require.Equal(t, ca.pts, pts)
			require.Equal(t, ca.frame, frame)
		})
	}
}

View File

@@ -0,0 +1,93 @@
package rtpsimpleaudio
import (
"crypto/rand"
"fmt"
"time"
"github.com/pion/rtp"
)
// rtpVersion is the value written into the version field of every RTP header.
const rtpVersion = 2

// randUint32 returns a 32-bit value assembled from four bytes read from
// crypto/rand, the first byte being the most significant one.
// The error returned by rand.Read is not checked.
func randUint32() uint32 {
	var raw [4]byte
	rand.Read(raw[:])

	hi := uint32(raw[0])<<8 | uint32(raw[1])
	lo := uint32(raw[2])<<8 | uint32(raw[3])
	return hi<<16 | lo
}
// Encoder is a RTP/simple audio encoder.
// Call Init before Encode: it fills in the optional fields that Encode reads.
type Encoder struct {
// payload type of packets.
PayloadType uint8
// SSRC of packets (optional).
// It defaults to a random value.
SSRC *uint32
// initial sequence number of packets (optional).
// It defaults to a random value.
InitialSequenceNumber *uint16
// initial timestamp of packets (optional).
// It defaults to a random value.
InitialTimestamp *uint32
// maximum size of packet payloads (optional).
// It defaults to 1460.
PayloadMaxSize int
// sample rate of the audio.
// RTP timestamps are expressed in units of this rate.
SampleRate int
// sequence number of the next packet to generate.
// It starts from InitialSequenceNumber and grows by one with every packet.
sequenceNumber uint16
}
// Init initializes the encoder.
// Optional fields that were left unset are filled in: SSRC, InitialSequenceNumber
// and InitialTimestamp receive random values, PayloadMaxSize receives its default.
func (e *Encoder) Init() {
	if e.PayloadMaxSize == 0 {
		e.PayloadMaxSize = 1460 // 1500 (UDP MTU) - 20 (IP header) - 8 (UDP header) - 12 (RTP header)
	}

	if e.InitialTimestamp == nil {
		ts := randUint32()
		e.InitialTimestamp = &ts
	}

	if e.SSRC == nil {
		ssrc := randUint32()
		e.SSRC = &ssrc
	}

	if e.InitialSequenceNumber == nil {
		seq := uint16(randUint32())
		e.InitialSequenceNumber = &seq
	}

	// packets are numbered starting from the initial sequence number
	e.sequenceNumber = *e.InitialSequenceNumber
}
// encodeTimestamp converts a presentation timestamp into a RTP timestamp,
// expressed in units of SampleRate and offset by InitialTimestamp.
//
// Only integer arithmetic is used: converting an out-of-range floating-point
// value to uint32 gives an implementation-dependent result, which is what
// happens as soon as the tick count no longer fits into 32 bits.
// Here the result wraps around modulo 2^32 on every platform.
func (e *Encoder) encodeTimestamp(ts time.Duration) uint32 {
	clockRate := int64(e.SampleRate)

	// whole seconds and the sub-second remainder are scaled separately,
	// in order to keep nanosecond resolution without overflowing int64.
	secs := int64(ts / time.Second)
	nanos := int64(ts % time.Second)
	ticks := secs*clockRate + nanos*clockRate/int64(time.Second)

	return *e.InitialTimestamp + uint32(ticks)
}
// Encode encodes an audio frame into a RTP packet.
// The frame is used as payload without being copied; frames bigger than
// PayloadMaxSize are rejected with an error.
func (e *Encoder) Encode(frame []byte, pts time.Duration) (*rtp.Packet, error) {
	if e.PayloadMaxSize < len(frame) {
		return nil, fmt.Errorf("frame is too big")
	}

	header := rtp.Header{
		Version:        rtpVersion,
		PayloadType:    e.PayloadType,
		SequenceNumber: e.sequenceNumber,
		Timestamp:      e.encodeTimestamp(pts),
		SSRC:           *e.SSRC,
		Marker:         false,
	}

	// the next packet gets the following sequence number
	e.sequenceNumber++

	return &rtp.Packet{Header: header, Payload: frame}, nil
}

View File

@@ -0,0 +1,46 @@
package rtpsimpleaudio
import (
"testing"
"github.com/stretchr/testify/require"
)
// TestEncode encodes the frame of every entry of cases and compares the
// generated packet with the expected one.
func TestEncode(t *testing.T) {
	for _, ca := range cases {
		t.Run(ca.name, func(t *testing.T) {
			// fixed initial state, so that the output does not depend on random values
			ssrc := uint32(0x9dbb7812)
			seq := uint16(0x44ed)
			ts := uint32(0x88776655)

			e := &Encoder{
				PayloadType:           0,
				SampleRate:            8000,
				SSRC:                  &ssrc,
				InitialSequenceNumber: &seq,
				InitialTimestamp:      &ts,
			}
			e.Init()

			pkt, err := e.Encode(ca.frame, ca.pts)
			require.NoError(t, err)
			require.Equal(t, ca.pkt, pkt)
		})
	}
}
// TestEncodeRandomInitialState checks that Init fills in the optional fields
// that were left unset.
func TestEncodeRandomInitialState(t *testing.T) {
	e := &Encoder{
		PayloadType: 0,
		SampleRate:  8000,
	}
	e.Init()

	// NotNil is needed here: NotEqual(t, nil, ptr) compares an untyped nil with
	// a typed pointer and therefore succeeds even when the pointer is nil.
	require.NotNil(t, e.SSRC)
	require.NotNil(t, e.InitialSequenceNumber)
	require.NotNil(t, e.InitialTimestamp)
}

View File

@@ -0,0 +1,2 @@
// Package rtpsimpleaudio contains a RTP decoder and encoder for audio codecs that fit in a single packet.
package rtpsimpleaudio

View File

@@ -0,0 +1,75 @@
package rtpvp8
import (
"errors"
"fmt"
"time"
"github.com/pion/rtp"
"github.com/pion/rtp/codecs"
"github.com/aler9/gortsplib/pkg/rtptimedec"
)
// ErrMorePacketsNeeded is returned by Decode when the packet has been stored
// but the frame is not complete yet.
var ErrMorePacketsNeeded = errors.New("need more packets")

// Decoder is a RTP/VP8 decoder.
type Decoder struct {
	timeDecoder *rtptimedec.Decoder

	// payloads of the packets received so far for the frame that is being reassembled.
	fragments [][]byte
}

// Init initializes the decoder.
func (d *Decoder) Init() {
	dec := rtptimedec.New(rtpClockRate)
	d.timeDecoder = dec
}
// Decode decodes a VP8 frame from a RTP/VP8 packet.
//
// A frame may span several packets: the first one has the S bit set, the last
// one has the RTP marker set. ErrMorePacketsNeeded is returned until the last
// packet of a frame is received, then the reassembled frame is returned together
// with its timestamp.
//
// When a packet is rejected, the fragments collected so far are discarded, so
// that later packets cannot be appended to a frame with a hole in it.
func (d *Decoder) Decode(pkt *rtp.Packet) ([]byte, time.Duration, error) {
	var vpkt codecs.VP8Packet
	_, err := vpkt.Unmarshal(pkt.Payload)
	if err != nil {
		d.fragments = d.fragments[:0] // discard pending fragments
		return nil, 0, err
	}

	if vpkt.PID != 0 {
		d.fragments = d.fragments[:0] // discard pending fragments
		return nil, 0, fmt.Errorf("packets containing single partitions are not supported")
	}

	if vpkt.S == 1 {
		// a new frame begins: whatever was pending is incomplete and is dropped
		d.fragments = d.fragments[:0]

		// the frame fits into a single packet
		if pkt.Marker {
			return vpkt.Payload, d.timeDecoder.Decode(pkt.Timestamp), nil
		}

		d.fragments = append(d.fragments, vpkt.Payload)
		return nil, 0, ErrMorePacketsNeeded
	}

	if len(d.fragments) == 0 {
		return nil, 0, fmt.Errorf("received a non-starting fragment")
	}

	d.fragments = append(d.fragments, vpkt.Payload)

	if !pkt.Marker {
		return nil, 0, ErrMorePacketsNeeded
	}

	// last packet of the frame: join the fragments
	n := 0
	for _, frag := range d.fragments {
		n += len(frag)
	}

	frame := make([]byte, n)
	pos := 0

	for _, frag := range d.fragments {
		pos += copy(frame[pos:], frag)
	}

	d.fragments = d.fragments[:0]
	return frame, d.timeDecoder.Decode(pkt.Timestamp), nil
}

View File

@@ -0,0 +1,133 @@
package rtpvp8
import (
"bytes"
"testing"
"time"
"github.com/pion/rtp"
"github.com/stretchr/testify/require"
)
// mergeBytes concatenates the given byte slices into a newly-allocated one.
func mergeBytes(vals ...[]byte) []byte {
	total := 0
	for i := range vals {
		total += len(vals[i])
	}

	merged := make([]byte, 0, total)
	for i := range vals {
		merged = append(merged, vals[i]...)
	}

	return merged
}
// cases pairs a VP8 frame and its presentation timestamp with the RTP packets
// that correspond to it.
// The table is used in both directions: TestDecode feeds pkts to the decoder and
// expects frame and pts, TestEncode feeds frame and pts to the encoder and
// expects pkts.
var cases = []struct {
name string
frame []byte
pts time.Duration
pkts []*rtp.Packet
}{
{
// frame carried by one packet
"single",
[]byte{0x01, 0x02, 0x03, 0x04},
25 * time.Millisecond,
[]*rtp.Packet{
{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 17645,
Timestamp: 2289528607,
SSRC: 0x9dbb7812,
},
Payload: []byte{0x10, 0x01, 0x02, 0x03, 0x04},
},
},
},
{
// 4096-byte frame split across three packets;
// only the last packet has the marker set
"fragmented",
bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 4096/4),
55 * time.Millisecond,
[]*rtp.Packet{
{
Header: rtp.Header{
Version: 2,
Marker: false,
PayloadType: 96,
SequenceNumber: 17645,
Timestamp: 2289531307,
SSRC: 0x9dbb7812,
},
Payload: mergeBytes([]byte{0x10}, bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 364), []byte{0x01, 0x02, 0x03}),
},
{
Header: rtp.Header{
Version: 2,
Marker: false,
PayloadType: 96,
SequenceNumber: 17646,
Timestamp: 2289531307,
SSRC: 0x9dbb7812,
},
Payload: mergeBytes([]byte{0x00, 0x04}, bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 364), []byte{0x01, 0x02}),
},
{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 17647,
Timestamp: 2289531307,
SSRC: 0x9dbb7812,
},
Payload: mergeBytes([]byte{0x00, 0x03, 0x04}, bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 294)),
},
},
},
}
// TestDecode decodes the packets of every entry of cases and compares the
// resulting frame and timestamp with the expected ones.
func TestDecode(t *testing.T) {
	for _, ca := range cases {
		t.Run(ca.name, func(t *testing.T) {
			d := &Decoder{}
			d.Init()

			// timestamps are relative to the initial packet, so a reference
			// packet is sent downstream before the ones under test
			first := &rtp.Packet{
				Header: rtp.Header{
					Version:        2,
					Marker:         true,
					PayloadType:    96,
					SequenceNumber: 17645,
					Timestamp:      2289526357,
					SSRC:           0x9dbb7812,
				},
				Payload: []byte{0x10, 0x01, 0x02, 0x03, 0x04},
			}
			_, _, err := d.Decode(first)
			require.NoError(t, err)

			var frame []byte

			for _, in := range ca.pkts {
				var pts time.Duration
				frame, pts, err = d.Decode(in)

				// intermediate packets of a fragmented frame produce no output
				if err != ErrMorePacketsNeeded {
					require.NoError(t, err)
					require.Equal(t, ca.pts, pts)
				}
			}

			require.Equal(t, ca.frame, frame)
		})
	}
}

View File

@@ -0,0 +1,99 @@
package rtpvp8
import (
"crypto/rand"
"fmt"
"time"
"github.com/pion/rtp"
"github.com/pion/rtp/codecs"
)
// rtpVersion is the value written into the version field of every RTP header.
const rtpVersion = 2

// randUint32 returns a 32-bit value assembled from four bytes read from
// crypto/rand, the first byte being the most significant one.
// The error returned by rand.Read is not checked.
func randUint32() uint32 {
	var buf [4]byte
	rand.Read(buf[:])

	var v uint32
	for _, c := range buf {
		v = v<<8 | uint32(c)
	}
	return v
}
// Encoder is a RTP/VP8 encoder.
// Call Init before Encode: it fills in the optional fields that Encode reads.
type Encoder struct {
// payload type of packets.
PayloadType uint8
// SSRC of packets (optional).
// It defaults to a random value.
SSRC *uint32
// initial sequence number of packets (optional).
// It defaults to a random value.
InitialSequenceNumber *uint16
// initial timestamp of packets (optional).
// It defaults to a random value.
InitialTimestamp *uint32
// maximum size of packet payloads (optional).
// It defaults to 1460.
PayloadMaxSize int
// sequence number of the next packet to generate.
// It starts from InitialSequenceNumber and grows by one with every packet.
sequenceNumber uint16
// payloader used by Encode to split a frame into packet payloads.
vp codecs.VP8Payloader
}
// Init initializes the encoder.
// Optional fields that were left unset are filled in: SSRC, InitialSequenceNumber
// and InitialTimestamp receive random values, PayloadMaxSize receives its default.
func (e *Encoder) Init() {
	if e.PayloadMaxSize == 0 {
		e.PayloadMaxSize = 1460 // 1500 (UDP MTU) - 20 (IP header) - 8 (UDP header) - 12 (RTP header)
	}

	if e.InitialTimestamp == nil {
		ts := randUint32()
		e.InitialTimestamp = &ts
	}

	if e.SSRC == nil {
		ssrc := randUint32()
		e.SSRC = &ssrc
	}

	if e.InitialSequenceNumber == nil {
		seq := uint16(randUint32())
		e.InitialSequenceNumber = &seq
	}

	// packets are numbered starting from the initial sequence number
	e.sequenceNumber = *e.InitialSequenceNumber
}
// encodeTimestamp converts a presentation timestamp into a RTP timestamp,
// expressed in units of rtpClockRate and offset by InitialTimestamp.
//
// Only integer arithmetic is used: converting an out-of-range floating-point
// value to uint32 gives an implementation-dependent result, which is what
// happens as soon as the tick count no longer fits into 32 bits.
// Here the result wraps around modulo 2^32 on every platform.
func (e *Encoder) encodeTimestamp(ts time.Duration) uint32 {
	// whole seconds and the sub-second remainder are scaled separately,
	// in order to keep nanosecond resolution without overflowing int64.
	secs := int64(ts / time.Second)
	nanos := int64(ts % time.Second)
	ticks := secs*rtpClockRate + nanos*rtpClockRate/int64(time.Second)

	return *e.InitialTimestamp + uint32(ticks)
}
// Encode encodes a VP8 frame into RTP/VP8 packets.
// The frame is split into payloads by the payloader; every payload becomes a
// packet, and the marker is set on the last packet only.
func (e *Encoder) Encode(frame []byte, pts time.Duration) ([]*rtp.Packet, error) {
	payloads := e.vp.Payload(uint16(e.PayloadMaxSize), frame)
	if payloads == nil {
		return nil, fmt.Errorf("payloader failed")
	}

	last := len(payloads) - 1
	pkts := make([]*rtp.Packet, 0, len(payloads))

	for i := range payloads {
		header := rtp.Header{
			Version:        rtpVersion,
			PayloadType:    e.PayloadType,
			SequenceNumber: e.sequenceNumber,
			Timestamp:      e.encodeTimestamp(pts),
			SSRC:           *e.SSRC,
			Marker:         i == last,
		}
		pkts = append(pkts, &rtp.Packet{Header: header, Payload: payloads[i]})

		e.sequenceNumber++
	}

	return pkts, nil
}

View File

@@ -0,0 +1,44 @@
package rtpvp8
import (
"testing"
"github.com/stretchr/testify/require"
)
// TestEncode encodes the frame of every entry of cases and compares the
// generated packets with the expected ones.
func TestEncode(t *testing.T) {
	for _, ca := range cases {
		t.Run(ca.name, func(t *testing.T) {
			// fixed initial state, so that the output does not depend on random values
			ssrc := uint32(0x9dbb7812)
			seq := uint16(0x44ed)
			ts := uint32(0x88776655)

			e := &Encoder{
				PayloadType:           96,
				SSRC:                  &ssrc,
				InitialSequenceNumber: &seq,
				InitialTimestamp:      &ts,
			}
			e.Init()

			pkts, err := e.Encode(ca.frame, ca.pts)
			require.NoError(t, err)
			require.Equal(t, ca.pkts, pkts)
		})
	}
}
// TestEncodeRandomInitialState checks that Init fills in the optional fields
// that were left unset.
func TestEncodeRandomInitialState(t *testing.T) {
	e := &Encoder{
		PayloadType: 96,
	}
	e.Init()

	// NotNil is needed here: NotEqual(t, nil, ptr) compares an untyped nil with
	// a typed pointer and therefore succeeds even when the pointer is nil.
	require.NotNil(t, e.SSRC)
	require.NotNil(t, e.InitialSequenceNumber)
	require.NotNil(t, e.InitialTimestamp)
}

View File

@@ -0,0 +1,6 @@
// Package rtpvp8 contains a RTP/VP8 decoder and encoder.
package rtpvp8
// rtpClockRate is the clock rate of RTP timestamps, in Hz.
const (
rtpClockRate = 90000 // VP8 always uses 90 kHz
)

View File

@@ -0,0 +1,71 @@
package rtpvp9
import (
"errors"
"fmt"
"time"
"github.com/pion/rtp"
"github.com/pion/rtp/codecs"
"github.com/aler9/gortsplib/pkg/rtptimedec"
)
// ErrMorePacketsNeeded is returned by Decode when the packet has been stored
// but the frame is not complete yet.
var ErrMorePacketsNeeded = errors.New("need more packets")

// Decoder is a RTP/VP9 decoder.
type Decoder struct {
	timeDecoder *rtptimedec.Decoder

	// payloads of the packets received so far for the frame that is being reassembled.
	fragments [][]byte
}

// Init initializes the decoder.
func (d *Decoder) Init() {
	dec := rtptimedec.New(rtpClockRate)
	d.timeDecoder = dec
}
// Decode decodes a VP9 frame from a RTP/VP9 packet.
//
// A frame may span several packets: the first one has the B flag set, the last
// one has the E flag set. ErrMorePacketsNeeded is returned until the last packet
// of a frame is received, then the reassembled frame is returned together with
// its timestamp.
//
// When a packet cannot be parsed, the fragments collected so far are discarded,
// so that later packets cannot be appended to a frame with a hole in it.
func (d *Decoder) Decode(pkt *rtp.Packet) ([]byte, time.Duration, error) {
	var vpkt codecs.VP9Packet
	_, err := vpkt.Unmarshal(pkt.Payload)
	if err != nil {
		d.fragments = d.fragments[:0] // discard pending fragments
		return nil, 0, err
	}

	if vpkt.B {
		// a new frame begins: whatever was pending is incomplete and is dropped
		d.fragments = d.fragments[:0]

		// the frame fits into a single packet
		if vpkt.E {
			return vpkt.Payload, d.timeDecoder.Decode(pkt.Timestamp), nil
		}

		d.fragments = append(d.fragments, vpkt.Payload)
		return nil, 0, ErrMorePacketsNeeded
	}

	if len(d.fragments) == 0 {
		return nil, 0, fmt.Errorf("received a non-starting fragment")
	}

	d.fragments = append(d.fragments, vpkt.Payload)

	if !vpkt.E {
		return nil, 0, ErrMorePacketsNeeded
	}

	// last packet of the frame: join the fragments
	n := 0
	for _, frag := range d.fragments {
		n += len(frag)
	}

	frame := make([]byte, n)
	pos := 0

	for _, frag := range d.fragments {
		pos += copy(frame[pos:], frag)
	}

	d.fragments = d.fragments[:0]
	return frame, d.timeDecoder.Decode(pkt.Timestamp), nil
}

View File

@@ -0,0 +1,134 @@
package rtpvp9
import (
"bytes"
"testing"
"time"
"github.com/pion/rtp"
"github.com/stretchr/testify/require"
)
// mergeBytes concatenates the given byte slices into a newly-allocated one.
func mergeBytes(vals ...[]byte) []byte {
	total := 0
	for i := range vals {
		total += len(vals[i])
	}

	merged := make([]byte, 0, total)
	for i := range vals {
		merged = append(merged, vals[i]...)
	}

	return merged
}
// cases pairs a VP9 frame and its presentation timestamp with the RTP packets
// that correspond to it.
// The table is used in both directions: TestDecode feeds pkts to the decoder and
// expects frame and pts, TestEncode feeds frame and pts to the encoder and
// expects pkts.
var cases = []struct {
name string
frame []byte
pts time.Duration
pkts []*rtp.Packet
}{
{
// frame carried by one packet
"single",
[]byte{0x01, 0x02, 0x03, 0x04},
25 * time.Millisecond,
[]*rtp.Packet{
{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 17645,
Timestamp: 2289528607,
SSRC: 0x9dbb7812,
},
Payload: []byte{0x9c, 0xb5, 0xaf, 0x01, 0x02, 0x03, 0x04},
},
},
},
{
// 4096-byte frame split across three packets;
// only the last packet has the marker set
"fragmented",
bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 4096/4),
55 * time.Millisecond,
[]*rtp.Packet{
{
Header: rtp.Header{
Version: 2,
Marker: false,
PayloadType: 96,
SequenceNumber: 17645,
Timestamp: 2289531307,
SSRC: 0x9dbb7812,
},
Payload: mergeBytes([]byte{0x98, 0xb5, 0xaf}, bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 364), []byte{0x01}),
},
{
Header: rtp.Header{
Version: 2,
Marker: false,
PayloadType: 96,
SequenceNumber: 17646,
Timestamp: 2289531307,
SSRC: 0x9dbb7812,
},
Payload: mergeBytes([]byte{0x90, 0xb5, 0xaf, 0x02, 0x03, 0x04},
bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 363), []byte{0x01, 0x02}),
},
{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 17647,
Timestamp: 2289531307,
SSRC: 0x9dbb7812,
},
Payload: mergeBytes([]byte{0x94, 0xb5, 0xaf, 0x03, 0x04}, bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 295)),
},
},
},
}
// TestDecode decodes the packets of every entry of cases and compares the
// resulting frame and timestamp with the expected ones.
func TestDecode(t *testing.T) {
	for _, ca := range cases {
		t.Run(ca.name, func(t *testing.T) {
			d := &Decoder{}
			d.Init()

			// timestamps are relative to the initial packet, so a reference
			// packet is sent downstream before the ones under test
			first := &rtp.Packet{
				Header: rtp.Header{
					Version:        2,
					Marker:         true,
					PayloadType:    96,
					SequenceNumber: 17645,
					Timestamp:      2289526357,
					SSRC:           0x9dbb7812,
				},
				Payload: []byte{0x9c, 0xb5, 0xaf, 0x01, 0x02, 0x03, 0x04},
			}
			_, _, err := d.Decode(first)
			require.NoError(t, err)

			var frame []byte

			for _, in := range ca.pkts {
				var pts time.Duration
				frame, pts, err = d.Decode(in)

				// intermediate packets of a fragmented frame produce no output
				if err != ErrMorePacketsNeeded {
					require.NoError(t, err)
					require.Equal(t, ca.pts, pts)
				}
			}

			require.Equal(t, ca.frame, frame)
		})
	}
}

View File

@@ -0,0 +1,111 @@
package rtpvp9
import (
"crypto/rand"
"fmt"
"time"
"github.com/pion/rtp"
"github.com/pion/rtp/codecs"
)
// rtpVersion is the value written into the version field of every RTP header.
const rtpVersion = 2

// randUint32 returns a 32-bit value assembled from four bytes read from
// crypto/rand, the first byte being the most significant one.
// The error returned by rand.Read is not checked.
func randUint32() uint32 {
	var raw [4]byte
	rand.Read(raw[:])

	hi := uint32(raw[0])<<8 | uint32(raw[1])
	lo := uint32(raw[2])<<8 | uint32(raw[3])
	return hi<<16 | lo
}
// Encoder is a RTP/VP9 encoder.
// Call Init before Encode: it fills in the optional fields that Encode reads.
type Encoder struct {
// payload type of packets.
PayloadType uint8
// SSRC of packets (optional).
// It defaults to a random value.
SSRC *uint32
// initial sequence number of packets (optional).
// It defaults to a random value.
InitialSequenceNumber *uint16
// initial timestamp of packets (optional).
// It defaults to a random value.
InitialTimestamp *uint32
// maximum size of packet payloads (optional).
// It defaults to 1460.
PayloadMaxSize int
// initial picture ID of frames (optional).
// It defaults to a random value.
InitialPictureID *uint16
// sequence number of the next packet to generate.
// It starts from InitialSequenceNumber and grows by one with every packet.
sequenceNumber uint16
// payloader used by Encode to split a frame into packet payloads.
vp codecs.VP9Payloader
}
// Init initializes the encoder.
// Optional fields that were left unset are filled in: SSRC, InitialSequenceNumber,
// InitialTimestamp and InitialPictureID receive random values, PayloadMaxSize
// receives its default.
func (e *Encoder) Init() {
	if e.PayloadMaxSize == 0 {
		e.PayloadMaxSize = 1460 // 1500 (UDP MTU) - 20 (IP header) - 8 (UDP header) - 12 (RTP header)
	}

	if e.InitialPictureID == nil {
		pictureID := uint16(randUint32())
		e.InitialPictureID = &pictureID
	}

	if e.InitialTimestamp == nil {
		ts := randUint32()
		e.InitialTimestamp = &ts
	}

	if e.SSRC == nil {
		ssrc := randUint32()
		e.SSRC = &ssrc
	}

	if e.InitialSequenceNumber == nil {
		seq := uint16(randUint32())
		e.InitialSequenceNumber = &seq
	}

	// packets are numbered starting from the initial sequence number
	e.sequenceNumber = *e.InitialSequenceNumber

	// let the payloader start from the configured picture ID
	e.vp.InitialPictureIDFn = func() uint16 {
		return *e.InitialPictureID
	}
}
// encodeTimestamp converts a presentation timestamp into a RTP timestamp,
// expressed in units of rtpClockRate and offset by InitialTimestamp.
//
// Only integer arithmetic is used: converting an out-of-range floating-point
// value to uint32 gives an implementation-dependent result, which is what
// happens as soon as the tick count no longer fits into 32 bits.
// Here the result wraps around modulo 2^32 on every platform.
func (e *Encoder) encodeTimestamp(ts time.Duration) uint32 {
	// whole seconds and the sub-second remainder are scaled separately,
	// in order to keep nanosecond resolution without overflowing int64.
	secs := int64(ts / time.Second)
	nanos := int64(ts % time.Second)
	ticks := secs*rtpClockRate + nanos*rtpClockRate/int64(time.Second)

	return *e.InitialTimestamp + uint32(ticks)
}
// Encode encodes a VP9 frame into RTP/VP9 packets.
// The frame is split into payloads by the payloader; every payload becomes a
// packet, and the marker is set on the last packet only.
func (e *Encoder) Encode(frame []byte, pts time.Duration) ([]*rtp.Packet, error) {
	payloads := e.vp.Payload(uint16(e.PayloadMaxSize), frame)
	if payloads == nil {
		return nil, fmt.Errorf("payloader failed")
	}

	last := len(payloads) - 1
	pkts := make([]*rtp.Packet, 0, len(payloads))

	for i := range payloads {
		header := rtp.Header{
			Version:        rtpVersion,
			PayloadType:    e.PayloadType,
			SequenceNumber: e.sequenceNumber,
			Timestamp:      e.encodeTimestamp(pts),
			SSRC:           *e.SSRC,
			Marker:         i == last,
		}
		pkts = append(pkts, &rtp.Packet{Header: header, Payload: payloads[i]})

		e.sequenceNumber++
	}

	return pkts, nil
}

View File

@@ -0,0 +1,48 @@
package rtpvp9
import (
"testing"
"github.com/stretchr/testify/require"
)
// TestEncode encodes the frame of every entry of cases and compares the
// generated packets with the expected ones.
func TestEncode(t *testing.T) {
	for _, ca := range cases {
		t.Run(ca.name, func(t *testing.T) {
			// fixed initial state, so that the output does not depend on random values
			ssrc := uint32(0x9dbb7812)
			seq := uint16(0x44ed)
			ts := uint32(0x88776655)
			pictureID := uint16(0x35af)

			e := &Encoder{
				PayloadType:           96,
				SSRC:                  &ssrc,
				InitialSequenceNumber: &seq,
				InitialTimestamp:      &ts,
				InitialPictureID:      &pictureID,
			}
			e.Init()

			pkts, err := e.Encode(ca.frame, ca.pts)
			require.NoError(t, err)
			require.Equal(t, ca.pkts, pkts)
		})
	}
}
// TestEncodeRandomInitialState checks that Init fills in the optional fields
// that were left unset.
func TestEncodeRandomInitialState(t *testing.T) {
	e := &Encoder{
		PayloadType: 96,
	}
	e.Init()

	// NotNil is needed here: NotEqual(t, nil, ptr) compares an untyped nil with
	// a typed pointer and therefore succeeds even when the pointer is nil.
	require.NotNil(t, e.SSRC)
	require.NotNil(t, e.InitialSequenceNumber)
	require.NotNil(t, e.InitialTimestamp)
	require.NotNil(t, e.InitialPictureID)
}

View File

@@ -0,0 +1,6 @@
// Package rtpvp9 contains a RTP/VP9 decoder and encoder.
package rtpvp9
// rtpClockRate is the clock rate of RTP timestamps, in Hz.
const (
rtpClockRate = 90000 // VP9 always uses 90 kHz
)