add RTP/LPCM decoder and encoder

This commit is contained in:
aler9
2022-11-15 22:11:26 +01:00
parent 0bee80a277
commit a7e222d3e6
9 changed files with 428 additions and 11 deletions

View File

@@ -51,7 +51,7 @@ Features:
* Audio
* G722
* LPCM
* MPEG4-Audio (AAC)
* MPEG4-audio (AAC)
* Opus
* PCMA
* PCMU
@@ -73,6 +73,7 @@ Features:
* [client-read-codec-h264](examples/client-read-codec-h264/main.go)
* [client-read-codec-h264-convert-to-jpeg](examples/client-read-codec-h264-convert-to-jpeg/main.go)
* [client-read-codec-h264-save-to-disk](examples/client-read-codec-h264-save-to-disk/main.go)
* [client-read-codec-lpcm](examples/client-read-codec-lpcm/main.go)
* [client-read-codec-mpeg4audio](examples/client-read-codec-mpeg4audio/main.go)
* [client-read-codec-opus](examples/client-read-codec-opus/main.go)
* [client-read-codec-pcma](examples/client-read-codec-pcma/main.go)

View File

@@ -0,0 +1,73 @@
package main
import (
"log"
"github.com/aler9/gortsplib"
"github.com/aler9/gortsplib/pkg/url"
)
// This example shows how to
// 1. connect to a RTSP server and read all tracks on a path
// 2. check if there's an LPCM track
// 3. get LPCM packets of that track
func main() {
c := gortsplib.Client{}
// parse URL
u, err := url.Parse("rtsp://localhost:8554/mystream")
if err != nil {
panic(err)
}
// connect to the server
err = c.Start(u.Scheme, u.Host)
if err != nil {
panic(err)
}
defer c.Close()
// find published tracks
tracks, baseURL, _, err := c.Describe(u)
if err != nil {
panic(err)
}
// find the LPCM track
track := func() *gortsplib.TrackLPCM {
for _, track := range tracks {
if tt, ok := track.(*gortsplib.TrackLPCM); ok {
return tt
}
}
return nil
}()
if track == nil {
panic("LPCM track not found")
}
// setup decoder
dec := track.CreateDecoder()
// called when a RTP packet arrives
c.OnPacketRTP = func(ctx *gortsplib.ClientOnPacketRTPCtx) {
// decode an LPCM packet from the RTP packet
op, _, err := dec.Decode(ctx.Packet)
if err != nil {
return
}
// print
log.Printf("received LPCM samples of size %d\n", len(op))
}
// setup and read the LPCM track only
err = c.SetupAndPlay(gortsplib.Tracks{track}, baseURL)
if err != nil {
panic(err)
}
// wait until a fatal error
panic(c.Wait())
}

37
pkg/rtplpcm/decoder.go Normal file
View File

@@ -0,0 +1,37 @@
package rtplpcm
import (
"fmt"
"time"
"github.com/pion/rtp"
"github.com/aler9/gortsplib/pkg/rtptimedec"
)
// Decoder is a RTP/LPCM decoder.
// The exported fields must be filled before calling Init;
// Init derives the unexported fields from them.
type Decoder struct {
// BitDepth is multiplied by ChannelCount and divided by 8 by Init
// in order to obtain the size, in bytes, that payloads must be a multiple of.
BitDepth int
// SampleRate is handed to rtptimedec.New by Init
// (presumably the clock rate of RTP timestamps; confirm against rtptimedec).
SampleRate int
// ChannelCount is used together with BitDepth, see above.
ChannelCount int
// timeDecoder converts RTP timestamps into the durations returned by Decode.
timeDecoder *rtptimedec.Decoder
// sampleSize is BitDepth * ChannelCount / 8, computed by Init.
sampleSize int
}
// Init initializes the decoder.
// It computes the sample size from BitDepth and ChannelCount and creates
// the timestamp decoder from SampleRate; Decode relies on both.
func (d *Decoder) Init() {
	bitsPerSample := d.BitDepth * d.ChannelCount
	d.sampleSize = bitsPerSample / 8

	d.timeDecoder = rtptimedec.New(d.SampleRate)
}
// Decode decodes audio samples from a RTP packet.
//
// It returns the audio samples and the PTS of the first sample.
// The returned samples are the packet payload itself, not a copy.
//
// An error is returned when:
//   - the decoder has no valid sample size, i.e. Init was not called or
//     BitDepth * ChannelCount is lower than 8;
//   - the payload length is not a multiple of the sample size.
func (d *Decoder) Decode(pkt *rtp.Packet) ([]byte, time.Duration, error) {
	// a zero sample size would make the modulo below panic
	// with an integer division by zero.
	if d.sampleSize <= 0 {
		return nil, 0, fmt.Errorf("decoder is not initialized or has invalid parameters")
	}

	plen := len(pkt.Payload)
	if (plen % d.sampleSize) != 0 {
		return nil, 0, fmt.Errorf("received payload of wrong size")
	}

	return pkt.Payload, d.timeDecoder.Decode(pkt.Timestamp), nil
}

108
pkg/rtplpcm/decoder_test.go Normal file
View File

@@ -0,0 +1,108 @@
package rtplpcm
import (
"bytes"
"testing"
"time"
"github.com/pion/rtp"
"github.com/stretchr/testify/require"
)
// cases is the table shared by TestDecode and TestEncode.
// Each entry associates a buffer of samples and its PTS with the RTP packets
// that carry it. The tests use 24-bit, 2-channel, 48000 Hz audio, therefore
// the sample size is 6 bytes.
var cases = []struct {
name string
samples []byte
pts time.Duration
pkts []*rtp.Packet
}{
// samples that fit into a single packet.
{
"single",
[]byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06},
25 * time.Millisecond,
[]*rtp.Packet{
{
Header: rtp.Header{
Version: 2,
Marker: false,
PayloadType: 96,
// 17645 == 0x44ed, the initial sequence number set by TestEncode.
SequenceNumber: 17645,
// initial timestamp used by the tests (2289526357 == 0x88776655)
// plus 1200 ticks, i.e. 25ms at 48000 Hz.
Timestamp: 2289527557,
SSRC: 0x9dbb7812,
},
Payload: []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06},
},
},
},
// 2040 bytes of samples, that exceed the default maximum payload size
// and are carried by two packets.
{
"splitted",
bytes.Repeat([]byte{0x41, 0x42, 0x43}, 680),
25 * time.Millisecond,
[]*rtp.Packet{
{
Header: rtp.Header{
Version: 2,
Marker: false,
PayloadType: 96,
SequenceNumber: 17645,
Timestamp: 2289527557,
SSRC: 0x9dbb7812,
},
// 1458 bytes: the default 1460 rounded down to a multiple of 6.
Payload: bytes.Repeat([]byte{0x41, 0x42, 0x43}, 486),
},
{
Header: rtp.Header{
Version: 2,
Marker: false,
PayloadType: 96,
SequenceNumber: 17646,
// previous timestamp plus 243, the number of 6-byte samples
// contained in the previous packet.
Timestamp: 2289527800,
SSRC: 0x9dbb7812,
},
// remaining 582 bytes.
Payload: bytes.Repeat([]byte{0x41, 0x42, 0x43}, 194),
},
},
},
}
// TestDecode feeds the packets of every entry of cases to a Decoder and
// checks that the concatenated payloads and the PTS of each packet
// correspond to the expected ones.
func TestDecode(t *testing.T) {
	const (
		bitDepth     = 24
		sampleRate   = 48000
		channelCount = 2
		sampleSize   = bitDepth * channelCount / 8
	)

	for _, ca := range cases {
		t.Run(ca.name, func(t *testing.T) {
			d := &Decoder{
				BitDepth:     bitDepth,
				SampleRate:   sampleRate,
				ChannelCount: channelCount,
			}
			d.Init()

			// decoded timestamps are relative to the first packet
			// received by the decoder, therefore an initial packet
			// is sent in order to set the reference.
			first := &rtp.Packet{
				Header: rtp.Header{
					Version:        2,
					Marker:         false,
					PayloadType:    0,
					SequenceNumber: 17645,
					Timestamp:      2289526357,
					SSRC:           0x9dbb7812,
				},
				Payload: []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06},
			}
			_, _, err := d.Decode(first)
			require.NoError(t, err)

			var got []byte
			wantPTS := ca.pts

			for _, pkt := range ca.pkts {
				chunk, pts, err := d.Decode(pkt)
				require.NoError(t, err)
				require.Equal(t, wantPTS, pts)

				got = append(got, chunk...)

				// the next packet starts after the samples of this one
				count := len(chunk) / sampleSize
				wantPTS += time.Duration(count) * time.Second / sampleRate
			}

			require.Equal(t, ca.samples, got)
		})
	}
}

123
pkg/rtplpcm/encoder.go Normal file
View File

@@ -0,0 +1,123 @@
package rtplpcm
import (
"crypto/rand"
"fmt"
"time"
"github.com/pion/rtp"
)
const (
// rtpVersion is the value written by Encode
// into the Version field of the header of every packet.
rtpVersion = 2
)
// randUint32 reads 4 bytes from crypto/rand and packs them
// into a uint32, most significant byte first.
// The error returned by rand.Read is not checked.
func randUint32() uint32 {
	var buf [4]byte
	rand.Read(buf[:])

	var v uint32
	for _, b := range buf {
		v = v<<8 | uint32(b)
	}
	return v
}
// Encoder is a RTP/LPCM encoder.
// The exported fields must be filled before calling Init;
// Init fills the optional ones and derives the unexported fields.
type Encoder struct {
// payload type of packets.
PayloadType uint8
// SSRC of packets (optional).
// It defaults to a random value.
SSRC *uint32
// initial sequence number of packets (optional).
// It defaults to a random value.
InitialSequenceNumber *uint16
// initial timestamp of packets (optional).
// It defaults to a random value.
InitialTimestamp *uint32
// maximum size of packet payloads (optional).
// It defaults to 1460.
PayloadMaxSize int
// BitDepth is multiplied by ChannelCount and divided by 8 by Init
// in order to obtain the sample size in bytes.
BitDepth int
// SampleRate is used to convert durations into RTP timestamps
// and to advance the PTS from one packet to the next.
SampleRate int
// ChannelCount is used together with BitDepth, see above.
ChannelCount int
// sequence number of the next packet. It is set to
// InitialSequenceNumber by Init and incremented by Encode after each packet.
sequenceNumber uint16
// BitDepth * ChannelCount / 8, computed by Init.
sampleSize int
// PayloadMaxSize rounded down to a multiple of sampleSize, computed by Init.
maxPayloadSize int
}
// Init initializes the encoder.
// Optional fields that were left empty are filled with their defaults
// (random SSRC, random initial sequence number, random initial timestamp,
// payload max size of 1460), then the internal state used by Encode is computed.
func (e *Encoder) Init() {
	// 1500 (UDP MTU) - 20 (IP header) - 8 (UDP header) - 12 (RTP header)
	const defaultPayloadMaxSize = 1460

	if e.SSRC == nil {
		ssrc := randUint32()
		e.SSRC = &ssrc
	}

	if e.InitialSequenceNumber == nil {
		seq := uint16(randUint32())
		e.InitialSequenceNumber = &seq
	}

	if e.InitialTimestamp == nil {
		ts := randUint32()
		e.InitialTimestamp = &ts
	}

	if e.PayloadMaxSize == 0 {
		e.PayloadMaxSize = defaultPayloadMaxSize
	}

	e.sequenceNumber = *e.InitialSequenceNumber
	e.sampleSize = e.BitDepth * e.ChannelCount / 8

	// payloads must contain whole samples only, therefore the
	// maximum size is rounded down to a multiple of the sample size.
	e.maxPayloadSize = e.PayloadMaxSize - e.PayloadMaxSize%e.sampleSize
}
// encodeTimestamp converts a duration into a RTP timestamp:
// the duration is expressed in units of 1/SampleRate seconds
// and added to the initial timestamp.
func (e *Encoder) encodeTimestamp(ts time.Duration) uint32 {
	ticks := uint32(ts.Seconds() * float64(e.SampleRate))
	return *e.InitialTimestamp + ticks
}
// Encode encodes audio samples into RTP packets.
//
// samples must contain whole samples only, i.e. its length must be a multiple
// of BitDepth * ChannelCount / 8. pts is the PTS of the first sample.
//
// Samples are split into as many packets as needed in order to keep each
// payload within the maximum payload size; the timestamp of each packet
// is the PTS of the first sample it contains. Payloads are sub-slices of
// samples, not copies.
//
// When samples is empty, no packets are returned and the error is nil.
// An error is returned when the length of samples is invalid, or when the
// encoder cannot produce packets because Init was not called or its
// parameters are invalid (zero sample size, PayloadMaxSize lower than
// the sample size, zero SampleRate).
func (e *Encoder) Encode(samples []byte, pts time.Duration) ([]*rtp.Packet, error) {
	// each of these values is used as a divisor below;
	// a zero would cause a panic.
	if e.sampleSize <= 0 || e.maxPayloadSize <= 0 || e.SampleRate <= 0 {
		return nil, fmt.Errorf("encoder is not initialized or has invalid parameters")
	}

	slen := len(samples)
	if (slen % e.sampleSize) != 0 {
		return nil, fmt.Errorf("invalid samples")
	}

	// number of packets, rounded up
	n := (slen / e.maxPayloadSize)
	if (slen % e.maxPayloadSize) != 0 {
		n++
	}

	ret := make([]*rtp.Packet, n)
	pos := 0

	// iterating over ret guarantees that no packet is written
	// when there are no samples.
	for i := range ret {
		payloadSize := e.maxPayloadSize
		if payloadSize > slen-pos {
			payloadSize = slen - pos
		}

		ret[i] = &rtp.Packet{
			Header: rtp.Header{
				Version:        rtpVersion,
				PayloadType:    e.PayloadType,
				SequenceNumber: e.sequenceNumber,
				Timestamp:      e.encodeTimestamp(pts),
				SSRC:           *e.SSRC,
				Marker:         false,
			},
			Payload: samples[pos : pos+payloadSize],
		}

		e.sequenceNumber++
		pos += payloadSize

		// the next packet starts after the samples of this one
		pts += time.Duration(payloadSize/e.sampleSize) * time.Second / time.Duration(e.SampleRate)
	}

	return ret, nil
}

View File

@@ -0,0 +1,50 @@
package rtplpcm
import (
"testing"
"github.com/stretchr/testify/require"
)
// TestEncode encodes the samples of every entry of cases with an Encoder
// whose SSRC, initial sequence number and initial timestamp are fixed,
// and checks that the resulting packets correspond to the expected ones.
func TestEncode(t *testing.T) {
	for _, ca := range cases {
		t.Run(ca.name, func(t *testing.T) {
			ssrc := uint32(0x9dbb7812)
			seq := uint16(0x44ed)
			ts := uint32(0x88776655)

			e := &Encoder{
				PayloadType:           96,
				SSRC:                  &ssrc,
				InitialSequenceNumber: &seq,
				InitialTimestamp:      &ts,
				BitDepth:              24,
				SampleRate:            48000,
				ChannelCount:          2,
			}
			e.Init()

			got, err := e.Encode(ca.samples, ca.pts)
			require.NoError(t, err)
			require.Equal(t, ca.pkts, got)
		})
	}
}
// TestEncodeRandomInitialState checks that Init fills SSRC,
// InitialSequenceNumber and InitialTimestamp when they are not provided.
func TestEncodeRandomInitialState(t *testing.T) {
	e := &Encoder{
		PayloadType:  96,
		BitDepth:     24,
		SampleRate:   48000,
		ChannelCount: 2,
	}
	e.Init()

	// NotNil is used instead of NotEqual(t, nil, ptr): the latter compares an
	// untyped nil with a typed pointer, which are never equal, and therefore
	// succeeds even when the pointer is nil.
	require.NotNil(t, e.SSRC)
	require.NotNil(t, e.InitialSequenceNumber)
	require.NotNil(t, e.InitialTimestamp)
}

View File

@@ -0,0 +1,2 @@
// Package rtplpcm contains a RTP/LPCM decoder and encoder.
package rtplpcm

View File

@@ -82,12 +82,10 @@ func (e *Encoder) encodeTimestamp(ts time.Duration) uint32 {
}
// Encode encodes AUs into RTP/MPEG4-audio packets.
func (e *Encoder) Encode(aus [][]byte, firstPTS time.Duration) ([]*rtp.Packet, error) {
func (e *Encoder) Encode(aus [][]byte, pts time.Duration) ([]*rtp.Packet, error) {
var rets []*rtp.Packet
var batch [][]byte
pts := firstPTS
// split AUs into batches
for _, au := range aus {
if e.lenAggregated(batch, au) <= e.PayloadMaxSize {
@@ -119,18 +117,18 @@ func (e *Encoder) Encode(aus [][]byte, firstPTS time.Duration) ([]*rtp.Packet, e
return rets, nil
}
func (e *Encoder) writeBatch(aus [][]byte, firstPTS time.Duration) ([]*rtp.Packet, error) {
func (e *Encoder) writeBatch(aus [][]byte, pts time.Duration) ([]*rtp.Packet, error) {
if len(aus) == 1 {
// the AU fits into a single RTP packet
if len(aus[0]) < e.PayloadMaxSize {
return e.writeAggregated(aus, firstPTS)
return e.writeAggregated(aus, pts)
}
// split the AU into multiple fragmentation packets
return e.writeFragmented(aus[0], firstPTS)
return e.writeFragmented(aus[0], pts)
}
return e.writeAggregated(aus, firstPTS)
return e.writeAggregated(aus, pts)
}
func (e *Encoder) writeFragmented(au []byte, pts time.Duration) ([]*rtp.Packet, error) {
@@ -225,7 +223,7 @@ func (e *Encoder) lenAggregated(aus [][]byte, addAU []byte) int {
return ret
}
func (e *Encoder) writeAggregated(aus [][]byte, firstPTS time.Duration) ([]*rtp.Packet, error) {
func (e *Encoder) writeAggregated(aus [][]byte, pts time.Duration) ([]*rtp.Packet, error) {
payload := make([]byte, e.lenAggregated(aus, nil))
// AU-headers
@@ -262,7 +260,7 @@ func (e *Encoder) writeAggregated(aus [][]byte, firstPTS time.Duration) ([]*rtp.
Version: rtpVersion,
PayloadType: e.PayloadType,
SequenceNumber: e.sequenceNumber,
Timestamp: e.encodeTimestamp(firstPTS),
Timestamp: e.encodeTimestamp(pts),
SSRC: *e.SSRC,
Marker: true,
},

View File

@@ -1,4 +1,4 @@
package gortsplib //nolint:dupl
package gortsplib
import (
"fmt"
@@ -6,6 +6,8 @@ import (
"strings"
psdp "github.com/pion/sdp/v3"
"github.com/aler9/gortsplib/pkg/rtplpcm"
)
// TrackLPCM is an uncompressed, Linear PCM track.
@@ -113,3 +115,26 @@ func (t *TrackLPCM) clone() Track {
trackBase: t.trackBase,
}
}
// CreateDecoder creates a decoder able to decode the content of the track.
// The decoder receives the bit depth, sample rate and channel count
// of the track and is returned already initialized.
func (t *TrackLPCM) CreateDecoder() *rtplpcm.Decoder {
	dec := new(rtplpcm.Decoder)
	dec.BitDepth = t.BitDepth
	dec.SampleRate = t.SampleRate
	dec.ChannelCount = t.ChannelCount

	dec.Init()
	return dec
}
// CreateEncoder creates an encoder able to encode the content of the track.
// The encoder receives the payload type, bit depth, sample rate and
// channel count of the track and is returned already initialized.
func (t *TrackLPCM) CreateEncoder() *rtplpcm.Encoder {
	enc := new(rtplpcm.Encoder)
	enc.PayloadType = t.PayloadType
	enc.BitDepth = t.BitDepth
	enc.SampleRate = t.SampleRate
	enc.ChannelCount = t.ChannelCount

	enc.Init()
	return enc
}