add RTP/VP8 encoder and decoder (#149)

This commit is contained in:
Alessandro Ros
2022-11-14 17:38:58 +01:00
committed by GitHub
parent 0ad09c2184
commit 4d7abfc638
8 changed files with 509 additions and 0 deletions

View File

@@ -47,6 +47,7 @@ Features:
* Parse H264 elements and formats: RTP/H264, Annex-B, AVCC, anti-competition, DTS * Parse H264 elements and formats: RTP/H264, Annex-B, AVCC, anti-competition, DTS
* Parse MPEG4-audio (AAC) elements and formats: RTP/MPEG4-audio, ADTS, MPEG4-audio configurations * Parse MPEG4-audio (AAC) elements and formats: RTP/MPEG4-audio, ADTS, MPEG4-audio configurations
* Parse Opus elements: RTP/Opus * Parse Opus elements: RTP/Opus
* Parse VP8 elements: RTP/Opus
## Table of contents ## Table of contents
@@ -63,6 +64,7 @@ Features:
* [client-read-codec-h264-save-to-disk](examples/client-read-codec-h264-save-to-disk/main.go) * [client-read-codec-h264-save-to-disk](examples/client-read-codec-h264-save-to-disk/main.go)
* [client-read-codec-mpeg4audio](examples/client-read-codec-mpeg4audio/main.go) * [client-read-codec-mpeg4audio](examples/client-read-codec-mpeg4audio/main.go)
* [client-read-codec-opus](examples/client-read-codec-opus/main.go) * [client-read-codec-opus](examples/client-read-codec-opus/main.go)
* [client-read-codec-vp8](examples/client-read-codec-vp8/main.go)
* [client-read-partial](examples/client-read-partial/main.go) * [client-read-partial](examples/client-read-partial/main.go)
* [client-read-options](examples/client-read-options/main.go) * [client-read-options](examples/client-read-options/main.go)
* [client-read-pause](examples/client-read-pause/main.go) * [client-read-pause](examples/client-read-pause/main.go)
@@ -72,6 +74,7 @@ Features:
* [client-publish-codec-opus](examples/client-publish-codec-opus/main.go) * [client-publish-codec-opus](examples/client-publish-codec-opus/main.go)
* [client-publish-codec-pcma](examples/client-publish-codec-pcma/main.go) * [client-publish-codec-pcma](examples/client-publish-codec-pcma/main.go)
* [client-publish-codec-pcmu](examples/client-publish-codec-pcmu/main.go) * [client-publish-codec-pcmu](examples/client-publish-codec-pcmu/main.go)
* [client-publish-codec-vp8](examples/client-publish-codec-vp8/main.go)
* [client-publish-options](examples/client-publish-options/main.go) * [client-publish-options](examples/client-publish-options/main.go)
* [client-publish-pause](examples/client-publish-pause/main.go) * [client-publish-pause](examples/client-publish-pause/main.go)
* [server](examples/server/main.go) * [server](examples/server/main.go)

View File

@@ -0,0 +1,71 @@
package main
import (
"log"
"net"
"github.com/aler9/gortsplib"
"github.com/pion/rtp"
)
// This example shows how to
// 1. generate RTP/VP8 packets with GStreamer
// 2. connect to a RTSP server, announce an VP8 track
// 3. route the packets from GStreamer to the server
func main() {
// open a listener to receive RTP/VP8 packets
pc, err := net.ListenPacket("udp", "localhost:9000")
if err != nil {
panic(err)
}
defer pc.Close()
log.Println("Waiting for a RTP/VP8 stream on UDP port 9000 - you can send one with GStreamer:\n" +
"gst-launch-1.0 videotestsrc ! video/x-raw,width=1920,height=1080" +
" ! vp8enc cpu-used=8" +
" ! rtpvp8pay ! udpsink host=127.0.0.1 port=9000")
// wait for first packet
buf := make([]byte, 2048)
n, _, err := pc.ReadFrom(buf)
if err != nil {
panic(err)
}
log.Println("stream connected")
// create a VP8 track
track := &gortsplib.TrackVP8{
PayloadType: 96,
}
// connect to the server and start publishing the track
c := gortsplib.Client{}
err = c.StartPublishing("rtsp://localhost:8554/mystream",
gortsplib.Tracks{track})
if err != nil {
panic(err)
}
defer c.Close()
var pkt rtp.Packet
for {
// parse RTP packet
err = pkt.Unmarshal(buf[:n])
if err != nil {
panic(err)
}
// route RTP packet to the server
err = c.WritePacketRTP(0, &pkt)
if err != nil {
panic(err)
}
// read another RTP packet from source
n, _, err = pc.ReadFrom(buf)
if err != nil {
panic(err)
}
}
}

View File

@@ -0,0 +1,78 @@
package main
import (
"log"
"github.com/aler9/gortsplib"
"github.com/aler9/gortsplib/pkg/rtpvp8"
"github.com/aler9/gortsplib/pkg/url"
)
// This example shows how to
// 1. connect to a RTSP server and read all tracks on a path
// 2. check if there's an VP8 track
// 3. get access units of that track
func main() {
c := gortsplib.Client{}
// parse URL
u, err := url.Parse("rtsp://localhost:8554/mystream")
if err != nil {
panic(err)
}
// connect to the server
err = c.Start(u.Scheme, u.Host)
if err != nil {
panic(err)
}
defer c.Close()
// find published tracks
tracks, baseURL, _, err := c.Describe(u)
if err != nil {
panic(err)
}
// find the VP8 track
vp8Track, vp8TrackID := func() (*gortsplib.TrackVP8, int) {
for i, track := range tracks {
if tt, ok := track.(*gortsplib.TrackVP8); ok {
return tt, i
}
}
return nil, -1
}()
if vp8Track == nil {
panic("VP8 track not found")
}
// setup decoder
dec := &rtpvp8.Decoder{}
dec.Init()
// called when a RTP packet arrives
c.OnPacketRTP = func(ctx *gortsplib.ClientOnPacketRTPCtx) {
if ctx.TrackID != vp8TrackID {
return
}
// decode a VP8 frame from the RTP packet
vf, _, err := dec.Decode(ctx.Packet)
if err != nil {
return
}
log.Printf("received frame of size %d\n", len(vf))
}
// setup and read all tracks
err = c.SetupAndPlay(tracks, baseURL)
if err != nil {
panic(err)
}
// wait until a fatal error
panic(c.Wait())
}

75
pkg/rtpvp8/decoder.go Normal file
View File

@@ -0,0 +1,75 @@
package rtpvp8
import (
"errors"
"fmt"
"time"
"github.com/pion/rtp"
"github.com/pion/rtp/codecs"
"github.com/aler9/gortsplib/pkg/rtptimedec"
)
// ErrMorePacketsNeeded is returned when more packets are needed.
var ErrMorePacketsNeeded = errors.New("need more packets")
// Decoder is a RTP/VP8 decoder.
type Decoder struct {
timeDecoder *rtptimedec.Decoder
fragments [][]byte
}
// Init initializes the decoder.
func (d *Decoder) Init() {
d.timeDecoder = rtptimedec.New(rtpClockRate)
}
// Decode decodes a VP8 frame from a RTP/VP8 packet.
func (d *Decoder) Decode(pkt *rtp.Packet) ([]byte, time.Duration, error) {
var vpkt codecs.VP8Packet
_, err := vpkt.Unmarshal(pkt.Payload)
if err != nil {
return nil, 0, err
}
if vpkt.PID != 0 {
return nil, 0, fmt.Errorf("packets containing single partitions are not supported (yet)")
}
if vpkt.S == 1 {
d.fragments = d.fragments[:0]
if pkt.Marker {
return vpkt.Payload, d.timeDecoder.Decode(pkt.Timestamp), nil
}
d.fragments = append(d.fragments, vpkt.Payload)
return nil, 0, ErrMorePacketsNeeded
}
if len(d.fragments) == 0 {
return nil, 0, fmt.Errorf("received a non-starting fragment")
}
d.fragments = append(d.fragments, vpkt.Payload)
if !pkt.Marker {
return nil, 0, ErrMorePacketsNeeded
}
n := 0
for _, frag := range d.fragments {
n += len(frag)
}
frame := make([]byte, n)
pos := 0
for _, frag := range d.fragments {
pos += copy(frame[pos:], frag)
}
d.fragments = d.fragments[:0]
return frame, d.timeDecoder.Decode(pkt.Timestamp), nil
}

133
pkg/rtpvp8/decoder_test.go Normal file
View File

@@ -0,0 +1,133 @@
package rtpvp8
import (
"bytes"
"testing"
"time"
"github.com/pion/rtp"
"github.com/stretchr/testify/require"
)
func mergeBytes(vals ...[]byte) []byte {
size := 0
for _, v := range vals {
size += len(v)
}
res := make([]byte, size)
pos := 0
for _, v := range vals {
n := copy(res[pos:], v)
pos += n
}
return res
}
var cases = []struct {
name string
frame []byte
pts time.Duration
pkts []*rtp.Packet
}{
{
"single",
[]byte{0x01, 0x02, 0x03, 0x04},
25 * time.Millisecond,
[]*rtp.Packet{
{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 17645,
Timestamp: 2289528607,
SSRC: 0x9dbb7812,
},
Payload: []byte{0x10, 0x01, 0x02, 0x03, 0x04},
},
},
},
{
"fragmented",
bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 4096/4),
55 * time.Millisecond,
[]*rtp.Packet{
{
Header: rtp.Header{
Version: 2,
Marker: false,
PayloadType: 96,
SequenceNumber: 17645,
Timestamp: 2289531307,
SSRC: 0x9dbb7812,
},
Payload: mergeBytes([]byte{0x10}, bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 364), []byte{0x01, 0x02, 0x03}),
},
{
Header: rtp.Header{
Version: 2,
Marker: false,
PayloadType: 96,
SequenceNumber: 17646,
Timestamp: 2289531307,
SSRC: 0x9dbb7812,
},
Payload: mergeBytes([]byte{0x00, 0x04}, bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 364), []byte{0x01, 0x02}),
},
{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 17647,
Timestamp: 2289531307,
SSRC: 0x9dbb7812,
},
Payload: mergeBytes([]byte{0x00, 0x03, 0x04}, bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 294)),
},
},
},
}
func TestDecode(t *testing.T) {
for _, ca := range cases {
t.Run(ca.name, func(t *testing.T) {
d := &Decoder{}
d.Init()
// send an initial packet downstream
// in order to compute the right timestamp,
// that is relative to the initial packet
pkt := rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 17645,
Timestamp: 2289526357,
SSRC: 0x9dbb7812,
},
Payload: []byte{0x10, 0x01, 0x02, 0x03, 0x04},
}
_, _, err := d.Decode(&pkt)
require.NoError(t, err)
var frame []byte
for _, pkt := range ca.pkts {
var pts time.Duration
frame, pts, err = d.Decode(pkt)
if err == ErrMorePacketsNeeded {
continue
}
require.NoError(t, err)
require.Equal(t, ca.pts, pts)
}
require.Equal(t, ca.frame, frame)
})
}
}

99
pkg/rtpvp8/encoder.go Normal file
View File

@@ -0,0 +1,99 @@
package rtpvp8
import (
"crypto/rand"
"fmt"
"time"
"github.com/pion/rtp"
"github.com/pion/rtp/codecs"
)
const (
rtpVersion = 2
)
func randUint32() uint32 {
var b [4]byte
rand.Read(b[:])
return uint32(b[0])<<24 | uint32(b[1])<<16 | uint32(b[2])<<8 | uint32(b[3])
}
// Encoder is a RTP/VP8 encoder.
type Encoder struct {
// payload type of packets.
PayloadType uint8
// SSRC of packets (optional).
// It defaults to a random value.
SSRC *uint32
// initial sequence number of packets (optional).
// It defaults to a random value.
InitialSequenceNumber *uint16
// initial timestamp of packets (optional).
// It defaults to a random value.
InitialTimestamp *uint32
// maximum size of packet payloads (optional).
// It defaults to 1460.
PayloadMaxSize int
sequenceNumber uint16
vp codecs.VP8Payloader
}
// Init initializes the encoder.
func (e *Encoder) Init() {
if e.SSRC == nil {
v := randUint32()
e.SSRC = &v
}
if e.InitialSequenceNumber == nil {
v := uint16(randUint32())
e.InitialSequenceNumber = &v
}
if e.InitialTimestamp == nil {
v := randUint32()
e.InitialTimestamp = &v
}
if e.PayloadMaxSize == 0 {
e.PayloadMaxSize = 1460 // 1500 (UDP MTU) - 20 (IP header) - 8 (UDP header) - 12 (RTP header)
}
e.sequenceNumber = *e.InitialSequenceNumber
}
func (e *Encoder) encodeTimestamp(ts time.Duration) uint32 {
return *e.InitialTimestamp + uint32(ts.Seconds()*rtpClockRate)
}
// Encode encodes a VP8 frame into RTP/VP8 packets.
func (e *Encoder) Encode(frame []byte, pts time.Duration) ([]*rtp.Packet, error) {
payloads := e.vp.Payload(uint16(e.PayloadMaxSize), frame)
if payloads == nil {
return nil, fmt.Errorf("payloader failed")
}
plen := len(payloads)
ret := make([]*rtp.Packet, plen)
for i, payload := range payloads {
ret[i] = &rtp.Packet{
Header: rtp.Header{
Version: rtpVersion,
PayloadType: e.PayloadType,
SequenceNumber: e.sequenceNumber,
Timestamp: e.encodeTimestamp(pts),
SSRC: *e.SSRC,
Marker: i == (plen - 1),
},
Payload: payload,
}
e.sequenceNumber++
}
return ret, nil
}

View File

@@ -0,0 +1,44 @@
package rtpvp8
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestEncode(t *testing.T) {
for _, ca := range cases {
t.Run(ca.name, func(t *testing.T) {
e := &Encoder{
PayloadType: 96,
SSRC: func() *uint32 {
v := uint32(0x9dbb7812)
return &v
}(),
InitialSequenceNumber: func() *uint16 {
v := uint16(0x44ed)
return &v
}(),
InitialTimestamp: func() *uint32 {
v := uint32(0x88776655)
return &v
}(),
}
e.Init()
pkts, err := e.Encode(ca.frame, ca.pts)
require.NoError(t, err)
require.Equal(t, ca.pkts, pkts)
})
}
}
func TestEncodeRandomInitialState(t *testing.T) {
e := &Encoder{
PayloadType: 96,
}
e.Init()
require.NotEqual(t, nil, e.SSRC)
require.NotEqual(t, nil, e.InitialSequenceNumber)
require.NotEqual(t, nil, e.InitialTimestamp)
}

6
pkg/rtpvp8/rtpvp8.go Normal file
View File

@@ -0,0 +1,6 @@
// Package rtpvp8 contains a RTP/VP8 decoder and encoder.
package rtpvp8
const (
rtpClockRate = 90000 // vp8 always uses 90khz
)