Add media/oggreader

Start oggreader, this allows a user to demux an oggfile so they can
easily play files from disk.

Resolves: #1133

Co-authored-by: Somers Matthews <somersbmatthews@gmail.com>
Co-authored-by: funvit <funvit@gmail.com>
This commit is contained in:
Sean DuBois
2020-06-14 06:06:08 -05:00
parent cc8d80178c
commit bbbd820e15
6 changed files with 445 additions and 55 deletions

View File

@@ -162,6 +162,8 @@ Check out the **[contributing wiki](https://github.com/pion/webrtc/wiki/Contribu
* [cnderrauber](https://github.com/cnderrauber)
* [Juliusz Chroboczek](https://github.com/jech)
* [John Berthels](https://github.com/jbert)
* [Somers Matthews](https://github.com/somersbmatthews)
* [Vitaliy F](https://github.com/funvit)
### License
MIT License - see [LICENSE](LICENSE) for full text

View File

@@ -1,10 +1,11 @@
# play-from-disk
play-from-disk demonstrates how to send video to your browser from a file saved to disk.
play-from-disk demonstrates how to send video and/or audio to your browser from files saved to disk.
## Instructions
### Create IVF named `output.ivf` that contains a VP8 track
### Create IVF named `output.ivf` that contains a VP8 track and/or `output.ogg` that contains an Opus track
```
ffmpeg -i $INPUT_FILE -g 30 output.ivf
ffmpeg -i $INPUT_FILE -c:a libopus -page_duration 20000 -vn output.ogg
```
### Download play-from-disk
@@ -13,7 +14,7 @@ go get github.com/pion/webrtc/v2/examples/play-from-disk
```
### Open play-from-disk example page
[jsfiddle.net](https://jsfiddle.net/z7ms3u5r/) you should see two text-areas and a 'Start Session' button
[jsfiddle.net](https://jsfiddle.net/y16Ljznr/) you should see two text-areas and a 'Start Session' button
### Run play-from-disk with your browsers SessionDescription as stdin
The `output.ivf` you created should be in the same directory as `play-from-disk`. In the jsfiddle the top textarea is your browser, copy that and:

View File

@@ -21,8 +21,10 @@ pc.onicecandidate = event => {
}
}
// Offer to receive 1 audio, and 2 video tracks
// Offer to receive 1 audio, and 1 video track
pc.addTransceiver('video', {'direction': 'sendrecv'})
pc.addTransceiver('audio', {'direction': 'sendrecv'})
pc.createOffer().then(d => pc.setLocalDescription(d)).catch(log)
window.startSession = () => {

View File

@@ -12,9 +12,26 @@ import (
"github.com/pion/webrtc/v2/examples/internal/signal"
"github.com/pion/webrtc/v2/pkg/media"
"github.com/pion/webrtc/v2/pkg/media/ivfreader"
"github.com/pion/webrtc/v2/pkg/media/oggreader"
)
// Media inputs the example looks for; both are resolved relative to the
// directory the program is started from.
const (
	// videoFileName is the IVF container opened by the video sender.
	videoFileName = "output.ivf"
	// audioFileName is the Ogg container opened by the audio sender.
	audioFileName = "output.ogg"
)
func main() {
// Assert that we have an audio or video file
_, err := os.Stat(videoFileName)
haveVideoFile := !os.IsNotExist(err)
_, err = os.Stat(audioFileName)
haveAudioFile := !os.IsNotExist(err)
if !haveAudioFile && !haveVideoFile {
panic("Could not find `" + audioFileName + "` or `" + videoFileName + "`")
}
// Wait for the offer to be pasted
offer := webrtc.SessionDescription{}
signal.Decode(signal.MustReadStdin(), &offer)
@@ -22,24 +39,10 @@ func main() {
// We make our own mediaEngine so we can place the sender's codecs in it. This because we must use the
// dynamic media type from the sender in our answer. This is not required if we are the offerer
mediaEngine := webrtc.MediaEngine{}
err := mediaEngine.PopulateFromSDP(offer)
if err != nil {
if err = mediaEngine.PopulateFromSDP(offer); err != nil {
panic(err)
}
// Search for VP8 Payload type. If the offer doesn't support VP8 exit since
// since they won't be able to decode anything we send them
var payloadType uint8
for _, videoCodec := range mediaEngine.GetCodecsByKind(webrtc.RTPCodecTypeVideo) {
if videoCodec.Name == "VP8" {
payloadType = videoCodec.PayloadType
break
}
}
if payloadType == 0 {
panic("Remote peer does not support VP8")
}
// Create a new RTCPeerConnection
api := webrtc.NewAPI(webrtc.WithMediaEngine(mediaEngine))
peerConnection, err := api.NewPeerConnection(webrtc.Configuration{
@@ -52,52 +55,107 @@ func main() {
if err != nil {
panic(err)
}
// Create a video track
videoTrack, err := peerConnection.NewTrack(payloadType, rand.Uint32(), "video", "pion")
if err != nil {
panic(err)
}
if _, err = peerConnection.AddTrack(videoTrack); err != nil {
panic(err)
}
iceConnectedCtx, iceConnectedCtxCancel := context.WithCancel(context.Background())
go func() {
// Open a IVF file and start reading using our IVFReader
file, ivfErr := os.Open("output.ivf")
if ivfErr != nil {
panic(ivfErr)
if haveVideoFile {
// Create a video track
videoTrack, addTrackErr := peerConnection.NewTrack(getPayloadType(mediaEngine, webrtc.RTPCodecTypeVideo, "VP8"), rand.Uint32(), "video", "pion")
if addTrackErr != nil {
panic(addTrackErr)
}
// Test the error AddTrack just returned; the outer err belongs to earlier calls.
if _, addTrackErr = peerConnection.AddTrack(videoTrack); addTrackErr != nil {
	panic(addTrackErr)
}
ivf, header, ivfErr := ivfreader.NewWith(file)
if ivfErr != nil {
panic(ivfErr)
}
// Wait for connection established
<-iceConnectedCtx.Done()
// Send our video file frame at a time. Pace our sending so we send it at the same speed it should be played back as.
// This isn't required since the video is timestamped, but we will such much higher loss if we send all at once.
sleepTime := time.Millisecond * time.Duration((float32(header.TimebaseNumerator)/float32(header.TimebaseDenominator))*1000)
for {
frame, _, ivfErr := ivf.ParseNextFrame()
if ivfErr == io.EOF {
fmt.Printf("All frames parsed and sent")
os.Exit(0)
}
go func() {
// Open a IVF file and start reading using our IVFReader
file, ivfErr := os.Open(videoFileName)
if ivfErr != nil {
panic(ivfErr)
}
time.Sleep(sleepTime)
if ivfErr = videoTrack.WriteSample(media.Sample{Data: frame, Samples: 90000}); ivfErr != nil {
ivf, header, ivfErr := ivfreader.NewWith(file)
if ivfErr != nil {
panic(ivfErr)
}
// Wait for connection established
<-iceConnectedCtx.Done()
// Send our video file frame at a time. Pace our sending so we send it at the same speed it should be played back as.
// This isn't required since the video is timestamped, but we will see much higher loss if we send all at once.
sleepTime := time.Millisecond * time.Duration((float32(header.TimebaseNumerator)/float32(header.TimebaseDenominator))*1000)
for {
frame, _, ivfErr := ivf.ParseNextFrame()
if ivfErr == io.EOF {
fmt.Printf("All video frames parsed and sent")
os.Exit(0)
}
if ivfErr != nil {
panic(ivfErr)
}
time.Sleep(sleepTime)
if ivfErr = videoTrack.WriteSample(media.Sample{Data: frame, Samples: 90000}); ivfErr != nil {
panic(ivfErr)
}
}
}()
}
if haveAudioFile {
// Create a audio track
audioTrack, addTrackErr := peerConnection.NewTrack(getPayloadType(mediaEngine, webrtc.RTPCodecTypeAudio, "opus"), rand.Uint32(), "audio", "pion")
if addTrackErr != nil {
panic(addTrackErr)
}
}()
// Test the error AddTrack just returned; the outer err belongs to earlier calls.
if _, addTrackErr = peerConnection.AddTrack(audioTrack); addTrackErr != nil {
	panic(addTrackErr)
}
go func() {
// Open an Ogg file and start reading using our OggReader
file, oggErr := os.Open(audioFileName)
if oggErr != nil {
panic(oggErr)
}
// Open the Ogg file. NewWith verifies the checksum of every page it reads.
ogg, _, oggErr := oggreader.NewWith(file)
if oggErr != nil {
panic(oggErr)
}
// Wait for connection established
<-iceConnectedCtx.Done()
// Keep track of last granule, the difference is the amount of samples in the buffer
var lastGranule uint64
for {
pageData, pageHeader, oggErr := ogg.ParseNextPage()
if oggErr == io.EOF {
fmt.Printf("All audio pages parsed and sent")
os.Exit(0)
}
if oggErr != nil {
panic(oggErr)
}
// The amount of samples is the difference between the last and current timestamp
sampleCount := float64((pageHeader.GranulePosition - lastGranule))
lastGranule = pageHeader.GranulePosition
if oggErr = audioTrack.WriteSample(media.Sample{Data: pageData, Samples: uint32(sampleCount)}); oggErr != nil {
panic(oggErr)
}
// Convert seconds to Milliseconds, Sleep doesn't accept floats
time.Sleep(time.Duration((sampleCount/48000)*1000) * time.Millisecond)
}
}()
}
// Set the handler for ICE connection state
// This will notify you when the peer has connected/disconnected
@@ -130,3 +188,15 @@ func main() {
// Block forever
select {}
}
// getPayloadType returns the PayloadType of the codec called codecName among
// the codecs of kind codecType held by m.
//
// Since we are answering we need to reuse the PayloadType the remote peer
// picked. Panics when m holds no codec with that name.
func getPayloadType(m webrtc.MediaEngine, codecType webrtc.RTPCodecType, codecName string) uint8 {
	candidates := m.GetCodecsByKind(codecType)
	for i := range candidates {
		if candidates[i].Name != codecName {
			continue
		}

		return candidates[i].PayloadType
	}

	panic(fmt.Sprintf("Remote peer does not support %s", codecName))
}

View File

@@ -0,0 +1,215 @@
// Package oggreader implements the Ogg media container reader
package oggreader
import (
"encoding/binary"
"errors"
"io"
)
// Fixed values used while parsing page headers and the ID page.
const (
	// pageHeaderSignature is compared against the first four bytes of the ID page header.
	pageHeaderSignature = "OggS"
	// pageHeaderLen is the number of header bytes read before the segment table.
	pageHeaderLen = 27
	// pageHeaderTypeBeginningOfStream is the header type the ID page must carry.
	pageHeaderTypeBeginningOfStream = 0x02

	// idPageSignature is the eight-byte prefix required of the ID page payload.
	idPageSignature = "OpusHead"
	// idPagePayloadLength is the only payload size accepted for the ID page.
	idPagePayloadLength = 19
)
// Errors reported while opening a stream and parsing its pages.
var (
	// errNilStream is returned when the reader is constructed without a stream.
	errNilStream = errors.New("stream is nil")

	// Validation failures for the ID page, in the order they are checked.
	errBadIDPageSignature        = errors.New("bad header signature")
	errBadIDPageType             = errors.New("wrong header, expected beginning of stream")
	errBadIDPageLength           = errors.New("payload for id page must be 19 bytes")
	errBadIDPagePayloadSignature = errors.New("bad payload signature")

	// Failures that can occur on any page.
	errShortPageHeader  = errors.New("not enough data for payload header")
	errChecksumMismatch = errors.New("expected and actual checksum do not match")
)
// OggReader is used to read Ogg files and return page payloads
type OggReader struct {
// stream is the source that pages are read from; ResetReader replaces it.
stream io.ReadSeeker
// bytesReadSuccesfully is the byte offset handed to the callback passed to ResetReader.
bytesReadSuccesfully int64
// checksumTable is the lookup table built by generateChecksumTable and
// used by ParseNextPage to compute page checksums.
checksumTable *[256]uint32
// doChecksum makes ParseNextPage reject a page whose stored checksum
// differs from the computed one.
doChecksum bool
}
// OggHeader is the metadata parsed from the ID page at the start of
// the file. readHeaders fills every field from that page's 19-byte
// payload; multi-byte fields are decoded as little-endian.
//
// https://tools.ietf.org/html/rfc7845.html#section-3
type OggHeader struct {
// ChannelMap is payload byte 18.
ChannelMap uint8
// Channels is payload byte 9.
Channels uint8
// OutputGain is decoded from payload bytes 16-17.
OutputGain uint16
// PreSkip is decoded from payload bytes 10-11.
PreSkip uint16
// SampleRate is decoded from payload bytes 12-15.
SampleRate uint32
// Version is payload byte 8, directly after the "OpusHead" signature.
Version uint8
}
// OggPageHeader is the metadata for a Page
// Pages are the fundamental unit of multiplexing in an Ogg stream
//
// ParseNextPage fills these fields from the 27-byte page header;
// multi-byte fields are decoded as little-endian.
//
// https://tools.ietf.org/html/rfc7845.html#section-1
type OggPageHeader struct {
// GranulePosition is decoded from header bytes 6-13.
GranulePosition uint64
// sig holds header bytes 0-3; readHeaders compares it with "OggS".
sig [4]byte
// version is header byte 4.
version uint8
// headerType is header byte 5; the ID page must carry 0x02 here.
headerType uint8
// serial is decoded from header bytes 14-17.
serial uint32
// index is decoded from header bytes 18-21.
index uint32
// segmentsCount is header byte 26, the number of entries in the
// segment table that follows the header.
segmentsCount uint8
}
// NewWith builds an Ogg reader on top of the io.ReadSeeker in and returns it
// together with the Ogg header parsed from the start of the stream.
// Readers created here verify the checksum of every page.
func NewWith(in io.ReadSeeker) (*OggReader, *OggHeader, error) {
	const doChecksum = true

	return newWith(in, doChecksum)
}
// newWith is the shared constructor behind NewWith. It rejects a nil stream
// with errNilStream, prepares the checksum table and immediately parses the
// ID page; any error from that parse is returned and no reader is handed out.
// doChecksum selects whether pages are checksum-verified while reading.
func newWith(in io.ReadSeeker, doChecksum bool) (*OggReader, *OggHeader, error) {
	if in == nil {
		return nil, nil, errNilStream
	}

	r := new(OggReader)
	r.stream = in
	r.checksumTable = generateChecksumTable()
	r.doChecksum = doChecksum

	hdr, err := r.readHeaders()
	if err != nil {
		return nil, nil, err
	}

	return r, hdr, nil
}
// readHeaders consumes the first page of the stream, validates it as the
// ID page and decodes its payload into an OggHeader.
//
// Validation runs in this order: page signature, beginning-of-stream header
// type, payload length, payload signature. The first failing check decides
// which error is returned.
func (o *OggReader) readHeaders() (*OggHeader, error) {
	idPayload, idPage, err := o.ParseNextPage()
	if err != nil {
		return nil, err
	}

	// Cases are evaluated top to bottom, so the payload is only sliced once
	// its length has been confirmed.
	switch {
	case string(idPage.sig[:]) != pageHeaderSignature:
		return nil, errBadIDPageSignature
	case idPage.headerType != pageHeaderTypeBeginningOfStream:
		return nil, errBadIDPageType
	case len(idPayload) != idPagePayloadLength:
		return nil, errBadIDPageLength
	case string(idPayload[:8]) != idPageSignature:
		return nil, errBadIDPagePayloadSignature
	}

	return &OggHeader{
		Version:    idPayload[8],
		Channels:   idPayload[9],
		PreSkip:    binary.LittleEndian.Uint16(idPayload[10:12]),
		SampleRate: binary.LittleEndian.Uint32(idPayload[12:16]),
		OutputGain: binary.LittleEndian.Uint16(idPayload[16:18]),
		ChannelMap: idPayload[18],
	}, nil
}
// ParseNextPage reads the next Ogg page from the stream and returns its
// payload together with the parsed page header.
//
// Every part of the page (header, segment table, payload) is read in full,
// so a stream that hands data out in small chunks is handled correctly.
//
// Errors:
//   - io.EOF when the stream ends cleanly before a new page starts.
//   - errShortPageHeader when the stream ends inside the page header.
//   - io.ErrUnexpectedEOF when the stream ends inside the segment table
//     or the payload.
//   - errChecksumMismatch when checksums are enabled and the stored
//     checksum differs from the computed one.
//   - any other error reported by the underlying stream.
//
// After a page has been parsed successfully its total size is added to the
// offset that ResetReader hands to its callback.
func (o *OggReader) ParseNextPage() ([]byte, *OggPageHeader, error) {
	h := make([]byte, pageHeaderLen)
	if _, err := io.ReadFull(o.stream, h); err != nil {
		// A stream that stops partway through the header is a short header;
		// a clean io.EOF (no bytes read) is passed through unchanged.
		if err == io.ErrUnexpectedEOF {
			return nil, nil, errShortPageHeader
		}
		return nil, nil, err
	}

	pageHeader := &OggPageHeader{
		sig:             [4]byte{h[0], h[1], h[2], h[3]},
		version:         h[4],
		headerType:      h[5],
		GranulePosition: binary.LittleEndian.Uint64(h[6 : 6+8]),
		serial:          binary.LittleEndian.Uint32(h[14 : 14+4]),
		index:           binary.LittleEndian.Uint32(h[18 : 18+4]),
		segmentsCount:   h[26],
	}

	// readBody fills buf completely. Running out of data here means the page
	// is truncated, so a plain io.EOF is upgraded to io.ErrUnexpectedEOF to
	// keep callers from mistaking it for a clean end of stream.
	readBody := func(buf []byte) error {
		_, err := io.ReadFull(o.stream, buf)
		if err == io.EOF {
			return io.ErrUnexpectedEOF
		}
		return err
	}

	// The segment table holds one size byte per segment; the payload length
	// is the sum of those bytes.
	sizeBuffer := make([]byte, pageHeader.segmentsCount)
	if err := readBody(sizeBuffer); err != nil {
		return nil, nil, err
	}

	payloadSize := 0
	for _, s := range sizeBuffer {
		payloadSize += int(s)
	}

	payload := make([]byte, payloadSize)
	if err := readBody(payload); err != nil {
		return nil, nil, err
	}

	if o.doChecksum {
		var checksum uint32
		updateChecksum := func(v byte) {
			checksum = (checksum << 8) ^ o.checksumTable[byte(checksum>>24)^v]
		}

		for index := range h {
			// Don't include expected checksum in our generation
			if index > 21 && index < 26 {
				updateChecksum(0)
				continue
			}

			updateChecksum(h[index])
		}
		for _, s := range sizeBuffer {
			updateChecksum(s)
		}
		for index := range payload {
			updateChecksum(payload[index])
		}

		if binary.LittleEndian.Uint32(h[22:22+4]) != checksum {
			return nil, nil, errChecksumMismatch
		}
	}

	// Only a fully parsed and verified page advances the recorded offset.
	o.bytesReadSuccesfully += int64(len(h) + len(sizeBuffer) + len(payload))

	return payload, pageHeader, nil
}
// ResetReader swaps the stream OggReader reads from. reset is invoked with
// the reader's recorded byte offset and the io.ReadSeeker it returns becomes
// the new stream. This is useful for live streams, where the end of the file
// might be read without the data being finished.
func (o *OggReader) ResetReader(reset func(bytesRead int64) io.ReadSeeker) {
	offset := o.bytesReadSuccesfully
	o.stream = reset(offset)
}
// generateChecksumTable builds the 256-entry lookup table used for page
// checksums. Entry i is the result of placing i in the top byte of a 32-bit
// register and running eight shift steps, XORing in the polynomial
// 0x04c11db7 whenever a set bit is shifted out of the top.
func generateChecksumTable() *[256]uint32 {
	const (
		poly   = 0x04c11db7
		topBit = 0x80000000
	)

	table := new([256]uint32)
	for i := range table {
		entry := uint32(i) << 24
		for bit := 0; bit < 8; bit++ {
			carry := entry&topBit != 0
			entry <<= 1
			if carry {
				entry ^= poly
			}
		}
		table[i] = entry
	}

	return table
}

View File

@@ -0,0 +1,100 @@
package oggreader
import (
"bytes"
"io"
"testing"
"github.com/stretchr/testify/assert"
)
// buildOggContainer returns the bytes of a small, valid Ogg container for
// use in tests: an ID page carrying an OpusHead payload, followed by one
// page with a five-byte payload. A fresh slice is returned on every call so
// tests may corrupt it freely.
func buildOggContainer() []byte {
	idPage := []byte{
		// page header
		0x4f, 0x67, 0x67, 0x53, 0x00, 0x02,
		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
		0x8e, 0x9b, 0x20, 0xaa,
		0x00, 0x00, 0x00, 0x00,
		0x61, 0xee, 0x61, 0x17,
		0x01,
		// segment table
		0x13,
		// payload
		0x4f, 0x70, 0x75, 0x73, 0x48, 0x65, 0x61, 0x64,
		0x01, 0x02, 0x00, 0x0f, 0x80, 0xbb, 0x00, 0x00,
		0x00, 0x00, 0x00,
	}
	dataPage := []byte{
		// page header
		0x4f, 0x67, 0x67, 0x53, 0x00, 0x00,
		0xda, 0x93, 0xc2, 0xd9, 0x00, 0x00, 0x00, 0x00,
		0x8e, 0x9b, 0x20, 0xaa,
		0x02, 0x00, 0x00, 0x00,
		0x49, 0x97, 0x03, 0x37,
		0x01,
		// segment table
		0x05,
		// payload
		0x98, 0x36, 0xbe, 0x88, 0x9e,
	}

	container := make([]byte, 0, len(idPage)+len(dataPage))
	container = append(container, idPage...)
	container = append(container, dataPage...)

	return container
}
func TestOggReader_ParseValidHeader(t *testing.T) {
reader, header, err := NewWith(bytes.NewReader(buildOggContainer()))
assert.NoError(t, err)
assert.NotNil(t, reader)
assert.NotNil(t, header)
assert.EqualValues(t, header.ChannelMap, 0)
assert.EqualValues(t, header.Channels, 2)
assert.EqualValues(t, header.OutputGain, 0)
assert.EqualValues(t, header.PreSkip, 0xf00)
assert.EqualValues(t, header.SampleRate, 48000)
assert.EqualValues(t, header.Version, 1)
}
func TestOggReader_ParseNextPage(t *testing.T) {
ogg := bytes.NewReader(buildOggContainer())
reader, _, err := NewWith(ogg)
assert.NoError(t, err)
assert.NotNil(t, reader)
payload, _, err := reader.ParseNextPage()
assert.Equal(t, []byte{0x98, 0x36, 0xbe, 0x88, 0x9e}, payload)
assert.NoError(t, err)
_, _, err = reader.ParseNextPage()
assert.Equal(t, err, io.EOF)
}
// TestOggReader_ParseErrors checks the error returned for each way opening a
// stream can fail: a nil stream, a corrupted ID page, and a bad checksum.
func TestOggReader_ParseErrors(t *testing.T) {
	t.Run("Assert that Reader isn't nil", func(t *testing.T) {
		_, _, err := NewWith(nil)
		assert.Equal(t, errNilStream, err)
	})

	// Each case zeroes one byte of the fixture. Checksums are disabled so the
	// ID page validation is what rejects the stream, not the checksum.
	corruptions := []struct {
		name   string
		offset int
		want   error
	}{
		{name: "Invalid ID Page Header Signature", offset: 0, want: errBadIDPageSignature},
		{name: "Invalid ID Page Header Type", offset: 5, want: errBadIDPageType},
		{name: "Invalid ID Page Payload Length", offset: 27, want: errBadIDPageLength},
		{name: "Invalid ID Page Payload Signature", offset: 35, want: errBadIDPagePayloadSignature},
	}
	for _, tc := range corruptions {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			ogg := buildOggContainer()
			ogg[tc.offset] = 0

			_, _, err := newWith(bytes.NewReader(ogg), false)
			assert.Equal(t, tc.want, err)
		})
	}

	t.Run("Invalid Page Checksum", func(t *testing.T) {
		ogg := buildOggContainer()
		ogg[22] = 0

		_, _, err := NewWith(bytes.NewReader(ogg))
		assert.Equal(t, errChecksumMismatch, err)
	})
}