add RTP/VP9 encoder and decoder (#152)

This commit is contained in:
Alessandro Ros
2022-11-14 18:46:26 +01:00
committed by GitHub
parent 34545becc3
commit e264304710
13 changed files with 537 additions and 27 deletions

View File

@@ -47,7 +47,8 @@ Features:
* Parse H264 elements and formats: RTP/H264, Annex-B, AVCC, anti-competition, DTS
* Parse MPEG4-audio (AAC) elements and formats: RTP/MPEG4-audio, ADTS, MPEG4-audio configurations
* Parse Opus elements: RTP/Opus
* Parse VP8 elements: RTP/Opus
* Parse VP8 elements: RTP/VP8
* Parse VP9 elements: RTP/VP9
## Table of contents
@@ -65,6 +66,7 @@ Features:
* [client-read-codec-mpeg4audio](examples/client-read-codec-mpeg4audio/main.go)
* [client-read-codec-opus](examples/client-read-codec-opus/main.go)
* [client-read-codec-vp8](examples/client-read-codec-vp8/main.go)
* [client-read-codec-vp9](examples/client-read-codec-vp9/main.go)
* [client-read-partial](examples/client-read-partial/main.go)
* [client-read-options](examples/client-read-options/main.go)
* [client-read-pause](examples/client-read-pause/main.go)
@@ -75,6 +77,7 @@ Features:
* [client-publish-codec-pcma](examples/client-publish-codec-pcma/main.go)
* [client-publish-codec-pcmu](examples/client-publish-codec-pcmu/main.go)
* [client-publish-codec-vp8](examples/client-publish-codec-vp8/main.go)
* [client-publish-codec-vp9](examples/client-publish-codec-vp9/main.go)
* [client-publish-options](examples/client-publish-options/main.go)
* [client-publish-pause](examples/client-publish-pause/main.go)
* [server](examples/server/main.go)

View File

@@ -23,7 +23,7 @@ func main() {
log.Println("Waiting for a RTP/VP8 stream on UDP port 9000 - you can send one with GStreamer:\n" +
"gst-launch-1.0 videotestsrc ! video/x-raw,width=1920,height=1080" +
" ! vp8enc cpu-used=8" +
" ! vp8enc cpu-used=8 deadline=1" +
" ! rtpvp8pay ! udpsink host=127.0.0.1 port=9000")
// wait for first packet

View File

@@ -0,0 +1,71 @@
package main
import (
"log"
"net"
"github.com/aler9/gortsplib"
"github.com/pion/rtp"
)
// This example shows how to
// 1. generate RTP/VP9 packets with GStreamer
// 2. connect to a RTSP server, announce a VP9 track
// 3. route the packets from GStreamer to the server
func main() {
	// listen on UDP for the RTP/VP9 packets produced by GStreamer
	conn, err := net.ListenPacket("udp", "localhost:9000")
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	log.Println("Waiting for a RTP/VP9 stream on UDP port 9000 - you can send one with GStreamer:\n" +
		"gst-launch-1.0 videotestsrc ! video/x-raw,width=1920,height=1080" +
		" ! vp9enc cpu-used=8 deadline=1" +
		" ! rtpvp9pay ! udpsink host=127.0.0.1 port=9000")

	// block until the first datagram shows up, so that the RTSP session
	// is opened only once the source is actually sending
	datagram := make([]byte, 2048)
	size, _, err := conn.ReadFrom(datagram)
	if err != nil {
		panic(err)
	}
	log.Println("stream connected")

	// describe the stream that is going to be published
	vp9Track := &gortsplib.TrackVP9{
		PayloadType: 96,
	}

	// open the RTSP session and announce the track
	client := gortsplib.Client{}
	if err = client.StartPublishing("rtsp://localhost:8554/mystream",
		gortsplib.Tracks{vp9Track}); err != nil {
		panic(err)
	}
	defer client.Close()

	// the same packet value is reused for every datagram
	var packet rtp.Packet
	for {
		// parse the datagram that was just received
		if err = packet.Unmarshal(datagram[:size]); err != nil {
			panic(err)
		}

		// forward it to the server on the first (and only) track
		if err = client.WritePacketRTP(0, &packet); err != nil {
			panic(err)
		}

		// wait for the next datagram from the source
		if size, _, err = conn.ReadFrom(datagram); err != nil {
			panic(err)
		}
	}
}

View File

@@ -36,30 +36,26 @@ func main() {
}
// find the Opus track
opusTrack, opusTrackID := func() (*gortsplib.TrackOpus, int) {
for i, track := range tracks {
track := func() *gortsplib.TrackOpus {
for _, track := range tracks {
if tt, ok := track.(*gortsplib.TrackOpus); ok {
return tt, i
return tt
}
}
return nil, -1
return nil
}()
if opusTrack == nil {
if track == nil {
panic("Opus track not found")
}
// setup decoder
dec := &rtpopus.Decoder{
SampleRate: opusTrack.SampleRate,
SampleRate: track.SampleRate,
}
dec.Init()
// called when a RTP packet arrives
c.OnPacketRTP = func(ctx *gortsplib.ClientOnPacketRTPCtx) {
if ctx.TrackID != opusTrackID {
return
}
// decode an Opus packet from the RTP packet
op, _, err := dec.Decode(ctx.Packet)
if err != nil {
@@ -70,8 +66,8 @@ func main() {
log.Printf("received Opus packet of size %d\n", len(op))
}
// setup and read all tracks
err = c.SetupAndPlay(tracks, baseURL)
// setup and read the Opus track only
err = c.SetupAndPlay(gortsplib.Tracks{track}, baseURL)
if err != nil {
panic(err)
}

View File

@@ -36,15 +36,15 @@ func main() {
}
// find the VP8 track
vp8Track, vp8TrackID := func() (*gortsplib.TrackVP8, int) {
for i, track := range tracks {
track := func() *gortsplib.TrackVP8 {
for _, track := range tracks {
if tt, ok := track.(*gortsplib.TrackVP8); ok {
return tt, i
return tt
}
}
return nil, -1
return nil
}()
if vp8Track == nil {
if track == nil {
panic("VP8 track not found")
}
@@ -54,10 +54,6 @@ func main() {
// called when a RTP packet arrives
c.OnPacketRTP = func(ctx *gortsplib.ClientOnPacketRTPCtx) {
if ctx.TrackID != vp8TrackID {
return
}
// decode a VP8 frame from the RTP packet
vf, _, err := dec.Decode(ctx.Packet)
if err != nil {
@@ -67,8 +63,8 @@ func main() {
log.Printf("received frame of size %d\n", len(vf))
}
// setup and read all tracks
err = c.SetupAndPlay(tracks, baseURL)
// setup and read the VP8 track only
err = c.SetupAndPlay(gortsplib.Tracks{track}, baseURL)
if err != nil {
panic(err)
}

View File

@@ -0,0 +1,74 @@
package main
import (
"log"
"github.com/aler9/gortsplib"
"github.com/aler9/gortsplib/pkg/rtpvp9"
"github.com/aler9/gortsplib/pkg/url"
)
// This example shows how to
// 1. connect to a RTSP server and read all tracks on a path
// 2. check if there's a VP9 track
// 3. decode the VP9 frames carried by that track
func main() {
	client := gortsplib.Client{}

	// parse URL
	u, err := url.Parse("rtsp://localhost:8554/mystream")
	if err != nil {
		panic(err)
	}

	// connect to the server
	if err = client.Start(u.Scheme, u.Host); err != nil {
		panic(err)
	}
	defer client.Close()

	// ask the server which tracks are published on the path
	tracks, baseURL, _, err := client.Describe(u)
	if err != nil {
		panic(err)
	}

	// pick the first VP9 track, if any
	var vp9Track *gortsplib.TrackVP9
	for _, candidate := range tracks {
		if tt, ok := candidate.(*gortsplib.TrackVP9); ok {
			vp9Track = tt
			break
		}
	}
	if vp9Track == nil {
		panic("VP9 track not found")
	}

	// setup decoder
	decoder := &rtpvp9.Decoder{}
	decoder.Init()

	// called when a RTP packet arrives
	client.OnPacketRTP = func(ctx *gortsplib.ClientOnPacketRTPCtx) {
		// try to obtain a VP9 frame from the RTP packet; any error
		// (including "more packets needed") just skips this packet
		frame, _, decErr := decoder.Decode(ctx.Packet)
		if decErr != nil {
			return
		}

		log.Printf("received frame of size %d\n", len(frame))
	}

	// setup and read the VP9 track only
	if err = client.SetupAndPlay(gortsplib.Tracks{vp9Track}, baseURL); err != nil {
		panic(err)
	}

	// wait until a fatal error
	panic(client.Wait())
}

View File

@@ -34,7 +34,7 @@ func (d *Decoder) Decode(pkt *rtp.Packet) ([]byte, time.Duration, error) {
}
if vpkt.PID != 0 {
return nil, 0, fmt.Errorf("packets containing single partitions are not supported (yet)")
return nil, 0, fmt.Errorf("packets containing single partitions are not supported")
}
if vpkt.S == 1 {

View File

@@ -2,5 +2,5 @@
package rtpvp8
const (
rtpClockRate = 90000 // vp8 always uses 90khz
rtpClockRate = 90000 // VP8 always uses 90khz
)

71
pkg/rtpvp9/decoder.go Normal file
View File

@@ -0,0 +1,71 @@
package rtpvp9
import (
"errors"
"fmt"
"time"
"github.com/pion/rtp"
"github.com/pion/rtp/codecs"
"github.com/aler9/gortsplib/pkg/rtptimedec"
)
// ErrMorePacketsNeeded is returned by Decoder.Decode when the packet was
// accepted but the frame it belongs to is not complete yet, i.e. more
// packets are needed before a frame can be returned.
var ErrMorePacketsNeeded = errors.New("need more packets")
// Decoder is a RTP/VP9 decoder.
// It reassembles VP9 frames from RTP/VP9 packets; Init must be called
// before Decode, since it creates the timestamp decoder used by Decode.
type Decoder struct {
// converts the RTP timestamp of a packet into a time.Duration;
// created by Init.
timeDecoder *rtptimedec.Decoder
// payloads of the packets received so far for the frame that is
// currently being reassembled; emptied when a frame starts or completes.
fragments [][]byte
}
// Init initializes the decoder.
// It creates the timestamp decoder, configured with the package-level
// RTP clock rate.
func (d *Decoder) Init() {
d.timeDecoder = rtptimedec.New(rtpClockRate)
}
// Decode decodes a VP9 frame from a RTP/VP9 packet.
//
// A frame may span several packets. While a frame is incomplete, Decode
// returns ErrMorePacketsNeeded; once the packet flagged as the end of the
// frame is received, the reassembled frame is returned together with the
// decoded timestamp of that packet. A packet that continues a frame whose
// start was never received is rejected with an error.
func (d *Decoder) Decode(pkt *rtp.Packet) ([]byte, time.Duration, error) {
	var parsed codecs.VP9Packet
	if _, err := parsed.Unmarshal(pkt.Payload); err != nil {
		return nil, 0, err
	}

	switch {
	case parsed.B && parsed.E:
		// the whole frame fits in this packet; drop any pending fragments
		d.fragments = d.fragments[:0]
		return parsed.Payload, d.timeDecoder.Decode(pkt.Timestamp), nil

	case parsed.B:
		// first packet of a multi-packet frame; it replaces any pending fragments
		d.fragments = append(d.fragments[:0], parsed.Payload)
		return nil, 0, ErrMorePacketsNeeded

	case len(d.fragments) == 0:
		// continuation packet without a preceding start packet
		return nil, 0, fmt.Errorf("received a non-starting fragment")
	}

	d.fragments = append(d.fragments, parsed.Payload)
	if !parsed.E {
		return nil, 0, ErrMorePacketsNeeded
	}

	// last packet of the frame: concatenate every stored fragment
	total := 0
	for i := range d.fragments {
		total += len(d.fragments[i])
	}
	frame := make([]byte, 0, total)
	for i := range d.fragments {
		frame = append(frame, d.fragments[i]...)
	}

	d.fragments = d.fragments[:0]

	// the timestamp is decoded only when a frame is actually returned
	return frame, d.timeDecoder.Decode(pkt.Timestamp), nil
}

134
pkg/rtpvp9/decoder_test.go Normal file
View File

@@ -0,0 +1,134 @@
package rtpvp9
import (
"bytes"
"testing"
"time"
"github.com/pion/rtp"
"github.com/stretchr/testify/require"
)
// mergeBytes concatenates the given byte slices, in order, into a newly
// allocated slice. The result is never nil, even when no input is given.
func mergeBytes(vals ...[]byte) []byte {
	total := 0
	for i := range vals {
		total += len(vals[i])
	}

	// allocate once, then append every chunk
	merged := make([]byte, 0, total)
	for i := range vals {
		merged = append(merged, vals[i]...)
	}
	return merged
}
// cases is the fixture table shared by TestDecode and TestEncode.
// Each entry pairs a VP9 frame and its presentation timestamp with the
// RTP/VP9 packets that carry it. Both tests use 2289526357 (0x88776655) as
// initial RTP timestamp, so packet timestamps below are that value plus the
// entry's pts expressed at the 90000 Hz clock rate.
var cases = []struct {
name string
frame []byte
pts time.Duration
pkts []*rtp.Packet
}{
// frame small enough to fit in a single packet
{
"single",
[]byte{0x01, 0x02, 0x03, 0x04},
25 * time.Millisecond,
[]*rtp.Packet{
{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 17645,
// 2289526357 + 2250 ticks (25ms at 90000 Hz)
Timestamp: 2289528607,
SSRC: 0x9dbb7812,
},
// three leading bytes precede the frame bytes; presumably the VP9
// payload descriptor (flags + picture ID) - confirm against the RTP/VP9 spec
Payload: []byte{0x9c, 0xb5, 0xaf, 0x01, 0x02, 0x03, 0x04},
},
},
},
// 4096-byte frame split across three packets; only the last one has the marker set
{
"fragmented",
bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 4096/4),
55 * time.Millisecond,
[]*rtp.Packet{
{
Header: rtp.Header{
Version: 2,
Marker: false,
PayloadType: 96,
SequenceNumber: 17645,
// 2289526357 + 4950 ticks (55ms at 90000 Hz)
Timestamp: 2289531307,
SSRC: 0x9dbb7812,
},
// 3 + 364*4 + 1 = 1460 bytes
Payload: mergeBytes([]byte{0x98, 0xb5, 0xaf}, bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 364), []byte{0x01}),
},
{
Header: rtp.Header{
Version: 2,
Marker: false,
PayloadType: 96,
SequenceNumber: 17646,
Timestamp: 2289531307,
SSRC: 0x9dbb7812,
},
// 6 + 363*4 + 2 = 1460 bytes; frame data resumes at 0x02
Payload: mergeBytes([]byte{0x90, 0xb5, 0xaf, 0x02, 0x03, 0x04},
bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 363), []byte{0x01, 0x02}),
},
{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 17647,
Timestamp: 2289531307,
SSRC: 0x9dbb7812,
},
// 5 + 295*4 = 1185 bytes; frame data resumes at 0x03
Payload: mergeBytes([]byte{0x94, 0xb5, 0xaf, 0x03, 0x04}, bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 295)),
},
},
},
}
// TestDecode feeds the packets of every fixture to a fresh decoder and
// checks that the reassembled frame and its timestamp match the fixture.
func TestDecode(t *testing.T) {
	for _, ca := range cases {
		t.Run(ca.name, func(t *testing.T) {
			dec := &Decoder{}
			dec.Init()

			// decoded timestamps are relative to the first packet seen by
			// the decoder, so push a reference packet before the fixture ones
			reference := rtp.Packet{
				Header: rtp.Header{
					Version:        2,
					Marker:         true,
					PayloadType:    96,
					SequenceNumber: 17645,
					Timestamp:      2289526357,
					SSRC:           0x9dbb7812,
				},
				Payload: []byte{0x9c, 0xb5, 0xaf, 0x01, 0x02, 0x03, 0x04},
			}
			_, _, err := dec.Decode(&reference)
			require.NoError(t, err)

			var got []byte
			for _, in := range ca.pkts {
				out, pts, decErr := dec.Decode(in)
				got = out

				// intermediate fragments do not produce a frame
				if decErr == ErrMorePacketsNeeded {
					continue
				}

				require.NoError(t, decErr)
				require.Equal(t, ca.pts, pts)
			}

			require.Equal(t, ca.frame, got)
		})
	}
}

111
pkg/rtpvp9/encoder.go Normal file
View File

@@ -0,0 +1,111 @@
package rtpvp9
import (
"crypto/rand"
"fmt"
"time"
"github.com/pion/rtp"
"github.com/pion/rtp/codecs"
)
const (
// rtpVersion is the value written into the Version field of the header
// of every RTP packet generated by the encoder.
rtpVersion = 2
)
// randUint32 returns a 32-bit value assembled from four bytes read from
// crypto/rand. The result of the read is not checked.
func randUint32() uint32 {
	var raw [4]byte
	rand.Read(raw[:])

	// fold the bytes together, most significant byte first
	var v uint32
	for _, c := range raw {
		v = v<<8 | uint32(c)
	}
	return v
}
// Encoder is a RTP/VP9 encoder.
// It splits VP9 frames into RTP/VP9 packets. Optional fields left unset are
// filled in by Init, which must be called before Encode.
type Encoder struct {
// payload type written into the header of every generated packet.
PayloadType uint8

// SSRC of packets (optional).
// It defaults to a random value.
SSRC *uint32

// initial sequence number of packets (optional).
// It defaults to a random value.
InitialSequenceNumber *uint16

// initial timestamp of packets (optional).
// It defaults to a random value.
InitialTimestamp *uint32

// maximum size of packet payloads (optional).
// It defaults to 1460.
PayloadMaxSize int

// initial picture ID of frames (optional).
// It defaults to a random value.
InitialPictureID *uint16

// sequence number of the next packet to generate; it starts from
// InitialSequenceNumber and is incremented after every packet.
sequenceNumber uint16
// payloader that splits frames into RTP/VP9 payloads.
vp codecs.VP9Payloader
}
// Init initializes the encoder.
// Every optional field that was left unset receives its default (a random
// value, or 1460 for PayloadMaxSize); then the sequence counter and the
// payloader are set up from the resulting configuration.
func (e *Encoder) Init() {
	if e.PayloadMaxSize == 0 {
		e.PayloadMaxSize = 1460 // 1500 (UDP MTU) - 20 (IP header) - 8 (UDP header) - 12 (RTP header)
	}

	if e.SSRC == nil {
		e.SSRC = new(uint32)
		*e.SSRC = randUint32()
	}

	if e.InitialTimestamp == nil {
		e.InitialTimestamp = new(uint32)
		*e.InitialTimestamp = randUint32()
	}

	// 16-bit defaults keep the low half of a random 32-bit value
	if e.InitialSequenceNumber == nil {
		e.InitialSequenceNumber = new(uint16)
		*e.InitialSequenceNumber = uint16(randUint32())
	}

	if e.InitialPictureID == nil {
		e.InitialPictureID = new(uint16)
		*e.InitialPictureID = uint16(randUint32())
	}

	e.sequenceNumber = *e.InitialSequenceNumber

	// the payloader asks for the initial picture ID through this callback,
	// which reads the field at call time
	e.vp.InitialPictureIDFn = func() uint16 {
		return *e.InitialPictureID
	}
}
// encodeTimestamp converts a presentation timestamp into a RTP timestamp:
// the duration is expressed in ticks of the RTP clock and added to
// InitialTimestamp, wrapping around modulo 2^32.
//
// The conversion uses integer arithmetic only. Converting an out-of-range
// floating-point value to uint32 gives an implementation-dependent result
// in Go, which happens as soon as ts exceeds about 13h15m (2^32 / 90000
// seconds) or is negative; integer truncation always wraps predictably.
func (e *Encoder) encodeTimestamp(ts time.Duration) uint32 {
	// seconds and sub-second remainder are scaled separately so that the
	// intermediate products cannot overflow int64
	secs := int64(ts / time.Second)
	rem := int64(ts % time.Second)
	ticks := secs*rtpClockRate + rem*rtpClockRate/int64(time.Second)

	return *e.InitialTimestamp + uint32(ticks)
}
// Encode encodes a VP9 frame into RTP/VP9 packets.
//
// The frame is split by the payloader into payloads limited by
// PayloadMaxSize; each payload is wrapped into a RTP packet. All packets of
// a frame share the timestamp derived from pts, carry consecutive sequence
// numbers, and only the last one has the marker bit set.
// An error is returned when the payloader returns nil.
func (e *Encoder) Encode(frame []byte, pts time.Duration) ([]*rtp.Packet, error) {
	chunks := e.vp.Payload(uint16(e.PayloadMaxSize), frame)
	if chunks == nil {
		return nil, fmt.Errorf("payloader failed")
	}

	last := len(chunks) - 1
	pkts := make([]*rtp.Packet, 0, len(chunks))

	for i := range chunks {
		hdr := rtp.Header{
			Version:        rtpVersion,
			PayloadType:    e.PayloadType,
			SequenceNumber: e.sequenceNumber,
			Timestamp:      e.encodeTimestamp(pts),
			SSRC:           *e.SSRC,
			Marker:         i == last,
		}
		pkts = append(pkts, &rtp.Packet{
			Header:  hdr,
			Payload: chunks[i],
		})

		// the counter persists across calls
		e.sequenceNumber++
	}

	return pkts, nil
}

View File

@@ -0,0 +1,48 @@
package rtpvp9
import (
"testing"
"github.com/stretchr/testify/require"
)
// TestEncode encodes the frame of every fixture with a fully deterministic
// encoder and checks that the generated packets match the fixture.
func TestEncode(t *testing.T) {
	for _, ca := range cases {
		t.Run(ca.name, func(t *testing.T) {
			// fixed initial state, so that the output is reproducible
			ssrc := uint32(0x9dbb7812)
			seq := uint16(0x44ed)
			ts := uint32(0x88776655)
			pictureID := uint16(0x35af)

			enc := &Encoder{
				PayloadType:           96,
				SSRC:                  &ssrc,
				InitialSequenceNumber: &seq,
				InitialTimestamp:      &ts,
				InitialPictureID:      &pictureID,
			}
			enc.Init()

			out, err := enc.Encode(ca.frame, ca.pts)
			require.NoError(t, err)
			require.Equal(t, ca.pkts, out)
		})
	}
}
// TestEncodeRandomInitialState checks that Init fills every optional
// pointer field that was left unset.
//
// require.NotNil is used on purpose: require.NotEqual(t, nil, ptr) compares
// an untyped nil against a typed pointer, which are never equal, so it
// passes even when the pointer is nil and would verify nothing.
func TestEncodeRandomInitialState(t *testing.T) {
	e := &Encoder{
		PayloadType: 96,
	}
	e.Init()

	require.NotNil(t, e.SSRC)
	require.NotNil(t, e.InitialSequenceNumber)
	require.NotNil(t, e.InitialTimestamp)
	require.NotNil(t, e.InitialPictureID)
}

6
pkg/rtpvp9/rtpvp9.go Normal file
View File

@@ -0,0 +1,6 @@
// Package rtpvp9 contains a RTP/VP9 decoder and encoder.
package rtpvp9
const (
// rtpClockRate is the clock rate, in Hz, used to convert between RTP
// timestamps and durations; VP9 always uses 90 kHz.
rtpClockRate = 90000
)