diff --git a/README.md b/README.md index feea3df9..2e61d297 100644 --- a/README.md +++ b/README.md @@ -47,6 +47,7 @@ Features: * Parse H264 elements and formats: RTP/H264, Annex-B, AVCC, anti-competition, DTS * Parse MPEG4-audio (AAC) elements and formats: RTP/MPEG4-audio, ADTS, MPEG4-audio configurations * Parse Opus elements: RTP/Opus + * Parse VP8 elements: RTP/Opus ## Table of contents @@ -63,6 +64,7 @@ Features: * [client-read-codec-h264-save-to-disk](examples/client-read-codec-h264-save-to-disk/main.go) * [client-read-codec-mpeg4audio](examples/client-read-codec-mpeg4audio/main.go) * [client-read-codec-opus](examples/client-read-codec-opus/main.go) +* [client-read-codec-vp8](examples/client-read-codec-vp8/main.go) * [client-read-partial](examples/client-read-partial/main.go) * [client-read-options](examples/client-read-options/main.go) * [client-read-pause](examples/client-read-pause/main.go) @@ -72,6 +74,7 @@ Features: * [client-publish-codec-opus](examples/client-publish-codec-opus/main.go) * [client-publish-codec-pcma](examples/client-publish-codec-pcma/main.go) * [client-publish-codec-pcmu](examples/client-publish-codec-pcmu/main.go) +* [client-publish-codec-vp8](examples/client-publish-codec-vp8/main.go) * [client-publish-options](examples/client-publish-options/main.go) * [client-publish-pause](examples/client-publish-pause/main.go) * [server](examples/server/main.go) diff --git a/examples/client-publish-codec-vp8/main.go b/examples/client-publish-codec-vp8/main.go new file mode 100644 index 00000000..2e353ec8 --- /dev/null +++ b/examples/client-publish-codec-vp8/main.go @@ -0,0 +1,71 @@ +package main + +import ( + "log" + "net" + + "github.com/aler9/gortsplib" + "github.com/pion/rtp" +) + +// This example shows how to +// 1. generate RTP/VP8 packets with GStreamer +// 2. connect to a RTSP server, announce an VP8 track +// 3. route the packets from GStreamer to the server + +func main() { + // open a listener to receive RTP/VP8 packets + pc, err := net.ListenPacket("udp", "localhost:9000") + if err != nil { + panic(err) + } + defer pc.Close() + + log.Println("Waiting for a RTP/VP8 stream on UDP port 9000 - you can send one with GStreamer:\n" + + "gst-launch-1.0 videotestsrc ! video/x-raw,width=1920,height=1080" + + " ! vp8enc cpu-used=8" + + " ! rtpvp8pay ! udpsink host=127.0.0.1 port=9000") + + // wait for first packet + buf := make([]byte, 2048) + n, _, err := pc.ReadFrom(buf) + if err != nil { + panic(err) + } + log.Println("stream connected") + + // create a VP8 track + track := &gortsplib.TrackVP8{ + PayloadType: 96, + } + + // connect to the server and start publishing the track + c := gortsplib.Client{} + err = c.StartPublishing("rtsp://localhost:8554/mystream", + gortsplib.Tracks{track}) + if err != nil { + panic(err) + } + defer c.Close() + + var pkt rtp.Packet + for { + // parse RTP packet + err = pkt.Unmarshal(buf[:n]) + if err != nil { + panic(err) + } + + // route RTP packet to the server + err = c.WritePacketRTP(0, &pkt) + if err != nil { + panic(err) + } + + // read another RTP packet from source + n, _, err = pc.ReadFrom(buf) + if err != nil { + panic(err) + } + } +} diff --git a/examples/client-read-codec-vp8/main.go b/examples/client-read-codec-vp8/main.go new file mode 100644 index 00000000..4238c6b8 --- /dev/null +++ b/examples/client-read-codec-vp8/main.go @@ -0,0 +1,78 @@ +package main + +import ( + "log" + + "github.com/aler9/gortsplib" + "github.com/aler9/gortsplib/pkg/rtpvp8" + "github.com/aler9/gortsplib/pkg/url" +) + +// This example shows how to +// 1. connect to a RTSP server and read all tracks on a path +// 2. check if there's an VP8 track +// 3. get access units of that track + +func main() { + c := gortsplib.Client{} + + // parse URL + u, err := url.Parse("rtsp://localhost:8554/mystream") + if err != nil { + panic(err) + } + + // connect to the server + err = c.Start(u.Scheme, u.Host) + if err != nil { + panic(err) + } + defer c.Close() + + // find published tracks + tracks, baseURL, _, err := c.Describe(u) + if err != nil { + panic(err) + } + + // find the VP8 track + vp8Track, vp8TrackID := func() (*gortsplib.TrackVP8, int) { + for i, track := range tracks { + if tt, ok := track.(*gortsplib.TrackVP8); ok { + return tt, i + } + } + return nil, -1 + }() + if vp8Track == nil { + panic("VP8 track not found") + } + + // setup decoder + dec := &rtpvp8.Decoder{} + dec.Init() + + // called when a RTP packet arrives + c.OnPacketRTP = func(ctx *gortsplib.ClientOnPacketRTPCtx) { + if ctx.TrackID != vp8TrackID { + return + } + + // decode a VP8 frame from the RTP packet + vf, _, err := dec.Decode(ctx.Packet) + if err != nil { + return + } + + log.Printf("received frame of size %d\n", len(vf)) + } + + // setup and read all tracks + err = c.SetupAndPlay(tracks, baseURL) + if err != nil { + panic(err) + } + + // wait until a fatal error + panic(c.Wait()) +} diff --git a/pkg/rtpvp8/decoder.go b/pkg/rtpvp8/decoder.go new file mode 100644 index 00000000..956b5e89 --- /dev/null +++ b/pkg/rtpvp8/decoder.go @@ -0,0 +1,75 @@ +package rtpvp8 + +import ( + "errors" + "fmt" + "time" + + "github.com/pion/rtp" + "github.com/pion/rtp/codecs" + + "github.com/aler9/gortsplib/pkg/rtptimedec" +) + +// ErrMorePacketsNeeded is returned when more packets are needed. +var ErrMorePacketsNeeded = errors.New("need more packets") + +// Decoder is a RTP/VP8 decoder. +type Decoder struct { + timeDecoder *rtptimedec.Decoder + fragments [][]byte +} + +// Init initializes the decoder. +func (d *Decoder) Init() { + d.timeDecoder = rtptimedec.New(rtpClockRate) +} + +// Decode decodes a VP8 frame from a RTP/VP8 packet. +func (d *Decoder) Decode(pkt *rtp.Packet) ([]byte, time.Duration, error) { + var vpkt codecs.VP8Packet + _, err := vpkt.Unmarshal(pkt.Payload) + if err != nil { + return nil, 0, err + } + + if vpkt.PID != 0 { + return nil, 0, fmt.Errorf("packets containing single partitions are not supported (yet)") + } + + if vpkt.S == 1 { + d.fragments = d.fragments[:0] + + if pkt.Marker { + return vpkt.Payload, d.timeDecoder.Decode(pkt.Timestamp), nil + } + + d.fragments = append(d.fragments, vpkt.Payload) + return nil, 0, ErrMorePacketsNeeded + } + + if len(d.fragments) == 0 { + return nil, 0, fmt.Errorf("received a non-starting fragment") + } + + d.fragments = append(d.fragments, vpkt.Payload) + + if !pkt.Marker { + return nil, 0, ErrMorePacketsNeeded + } + + n := 0 + for _, frag := range d.fragments { + n += len(frag) + } + + frame := make([]byte, n) + pos := 0 + + for _, frag := range d.fragments { + pos += copy(frame[pos:], frag) + } + + d.fragments = d.fragments[:0] + return frame, d.timeDecoder.Decode(pkt.Timestamp), nil +} diff --git a/pkg/rtpvp8/decoder_test.go b/pkg/rtpvp8/decoder_test.go new file mode 100644 index 00000000..67822475 --- /dev/null +++ b/pkg/rtpvp8/decoder_test.go @@ -0,0 +1,133 @@ +package rtpvp8 + +import ( + "bytes" + "testing" + "time" + + "github.com/pion/rtp" + "github.com/stretchr/testify/require" +) + +func mergeBytes(vals ...[]byte) []byte { + size := 0 + for _, v := range vals { + size += len(v) + } + res := make([]byte, size) + + pos := 0 + for _, v := range vals { + n := copy(res[pos:], v) + pos += n + } + + return res +} + +var cases = []struct { + name string + frame []byte + pts time.Duration + pkts []*rtp.Packet +}{ + { + "single", + []byte{0x01, 0x02, 0x03, 0x04}, + 25 * time.Millisecond, + []*rtp.Packet{ + { + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 17645, + Timestamp: 2289528607, + SSRC: 0x9dbb7812, + }, + Payload: []byte{0x10, 0x01, 0x02, 0x03, 0x04}, + }, + }, + }, + { + "fragmented", + bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 4096/4), + 55 * time.Millisecond, + []*rtp.Packet{ + { + Header: rtp.Header{ + Version: 2, + Marker: false, + PayloadType: 96, + SequenceNumber: 17645, + Timestamp: 2289531307, + SSRC: 0x9dbb7812, + }, + Payload: mergeBytes([]byte{0x10}, bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 364), []byte{0x01, 0x02, 0x03}), + }, + { + Header: rtp.Header{ + Version: 2, + Marker: false, + PayloadType: 96, + SequenceNumber: 17646, + Timestamp: 2289531307, + SSRC: 0x9dbb7812, + }, + Payload: mergeBytes([]byte{0x00, 0x04}, bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 364), []byte{0x01, 0x02}), + }, + { + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 17647, + Timestamp: 2289531307, + SSRC: 0x9dbb7812, + }, + Payload: mergeBytes([]byte{0x00, 0x03, 0x04}, bytes.Repeat([]byte{0x01, 0x02, 0x03, 0x04}, 294)), + }, + }, + }, +} + +func TestDecode(t *testing.T) { + for _, ca := range cases { + t.Run(ca.name, func(t *testing.T) { + d := &Decoder{} + d.Init() + + // send an initial packet downstream + // in order to compute the right timestamp, + // that is relative to the initial packet + pkt := rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 17645, + Timestamp: 2289526357, + SSRC: 0x9dbb7812, + }, + Payload: []byte{0x10, 0x01, 0x02, 0x03, 0x04}, + } + _, _, err := d.Decode(&pkt) + require.NoError(t, err) + + var frame []byte + + for _, pkt := range ca.pkts { + var pts time.Duration + frame, pts, err = d.Decode(pkt) + if err == ErrMorePacketsNeeded { + continue + } + + require.NoError(t, err) + require.Equal(t, ca.pts, pts) + } + + require.Equal(t, ca.frame, frame) + }) + } +} diff --git a/pkg/rtpvp8/encoder.go b/pkg/rtpvp8/encoder.go new file mode 100644 index 00000000..58f6bac8 --- /dev/null +++ b/pkg/rtpvp8/encoder.go @@ -0,0 +1,99 @@ +package rtpvp8 + +import ( + "crypto/rand" + "fmt" + "time" + + "github.com/pion/rtp" + "github.com/pion/rtp/codecs" +) + +const ( + rtpVersion = 2 +) + +func randUint32() uint32 { + var b [4]byte + rand.Read(b[:]) + return uint32(b[0])<<24 | uint32(b[1])<<16 | uint32(b[2])<<8 | uint32(b[3]) +} + +// Encoder is a RTP/VP8 encoder. +type Encoder struct { + // payload type of packets. + PayloadType uint8 + + // SSRC of packets (optional). + // It defaults to a random value. + SSRC *uint32 + + // initial sequence number of packets (optional). + // It defaults to a random value. + InitialSequenceNumber *uint16 + + // initial timestamp of packets (optional). + // It defaults to a random value. + InitialTimestamp *uint32 + + // maximum size of packet payloads (optional). + // It defaults to 1460. + PayloadMaxSize int + + sequenceNumber uint16 + vp codecs.VP8Payloader +} + +// Init initializes the encoder. +func (e *Encoder) Init() { + if e.SSRC == nil { + v := randUint32() + e.SSRC = &v + } + if e.InitialSequenceNumber == nil { + v := uint16(randUint32()) + e.InitialSequenceNumber = &v + } + if e.InitialTimestamp == nil { + v := randUint32() + e.InitialTimestamp = &v + } + if e.PayloadMaxSize == 0 { + e.PayloadMaxSize = 1460 // 1500 (UDP MTU) - 20 (IP header) - 8 (UDP header) - 12 (RTP header) + } + + e.sequenceNumber = *e.InitialSequenceNumber +} + +func (e *Encoder) encodeTimestamp(ts time.Duration) uint32 { + return *e.InitialTimestamp + uint32(ts.Seconds()*rtpClockRate) +} + +// Encode encodes a VP8 frame into RTP/VP8 packets. +func (e *Encoder) Encode(frame []byte, pts time.Duration) ([]*rtp.Packet, error) { + payloads := e.vp.Payload(uint16(e.PayloadMaxSize), frame) + if payloads == nil { + return nil, fmt.Errorf("payloader failed") + } + + plen := len(payloads) + ret := make([]*rtp.Packet, plen) + + for i, payload := range payloads { + ret[i] = &rtp.Packet{ + Header: rtp.Header{ + Version: rtpVersion, + PayloadType: e.PayloadType, + SequenceNumber: e.sequenceNumber, + Timestamp: e.encodeTimestamp(pts), + SSRC: *e.SSRC, + Marker: i == (plen - 1), + }, + Payload: payload, + } + + e.sequenceNumber++ + } + + return ret, nil +} diff --git a/pkg/rtpvp8/encoder_test.go b/pkg/rtpvp8/encoder_test.go new file mode 100644 index 00000000..8bb6857a --- /dev/null +++ b/pkg/rtpvp8/encoder_test.go @@ -0,0 +1,44 @@ +package rtpvp8 + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestEncode(t *testing.T) { + for _, ca := range cases { + t.Run(ca.name, func(t *testing.T) { + e := &Encoder{ + PayloadType: 96, + SSRC: func() *uint32 { + v := uint32(0x9dbb7812) + return &v + }(), + InitialSequenceNumber: func() *uint16 { + v := uint16(0x44ed) + return &v + }(), + InitialTimestamp: func() *uint32 { + v := uint32(0x88776655) + return &v + }(), + } + e.Init() + + pkts, err := e.Encode(ca.frame, ca.pts) + require.NoError(t, err) + require.Equal(t, ca.pkts, pkts) + }) + } +} + +func TestEncodeRandomInitialState(t *testing.T) { + e := &Encoder{ + PayloadType: 96, + } + e.Init() + require.NotEqual(t, nil, e.SSRC) + require.NotEqual(t, nil, e.InitialSequenceNumber) + require.NotEqual(t, nil, e.InitialTimestamp) +} diff --git a/pkg/rtpvp8/rtpvp8.go b/pkg/rtpvp8/rtpvp8.go new file mode 100644 index 00000000..685c1d7e --- /dev/null +++ b/pkg/rtpvp8/rtpvp8.go @@ -0,0 +1,6 @@ +// Package rtpvp8 contains a RTP/VP8 decoder and encoder. +package rtpvp8 + +const ( + rtpClockRate = 90000 // vp8 always uses 90khz +)