Implementation of opuswriter

- Implemented a common "Writer" interface
- Made IVFWriter compliant to the new interface
- Implemented an OGG Opus writer
- Updated the save-file example
- Updated janus-gateway/streaming example
- Wrote a few unit tests for opus-writer
- Wrote a few unit tests for ivf-writer
- Fixed filenames
- Improved API to allow tests using an io.Writer
This commit is contained in:
Antoine Baché
2019-02-26 00:19:07 -08:00
committed by Antoine Baché
parent 6fd4ee67f4
commit 07e5c4e07e
7 changed files with 576 additions and 40 deletions

View File

@@ -6,9 +6,30 @@ import (
janus "github.com/notedit/janus-go"
"github.com/pions/webrtc"
"github.com/pions/webrtc/pkg/media"
"github.com/pions/webrtc/pkg/media/ivfwriter"
"github.com/pions/webrtc/pkg/media/opuswriter"
)
func saveToDisk(i media.Writer, track *webrtc.Track) {
defer func() {
if err := i.Close(); err != nil {
panic(err)
}
}()
for {
packet, err := track.ReadRTP()
if err != nil {
panic(err)
}
if err := i.AddPacket(packet); err != nil {
panic(err)
}
}
}
func watchHandle(handle *janus.Handle) {
// wait for event
for {
@@ -27,7 +48,6 @@ func watchHandle(handle *janus.Handle) {
}
}
}
func main() {
@@ -53,26 +73,21 @@ func main() {
})
peerConnection.OnTrack(func(track *webrtc.Track, receiver *webrtc.RTPReceiver) {
if track.Codec().Name == webrtc.Opus {
return
}
fmt.Println("Got VP8 track, saving to disk as output.ivf")
i, err := ivfwriter.New("output.ivf")
if err != nil {
panic(err)
}
for {
packet, err := track.ReadRTP()
codec := track.Codec()
if codec.Name == webrtc.Opus {
fmt.Println("Got Opus track, saving to disk as output.opus")
i, err := opuswriter.New("output.opus", codec.ClockRate, codec.Channels)
if err != nil {
panic(err)
}
err = i.AddPacket(packet)
saveToDisk(i, track)
} else if codec.Name == webrtc.VP8 {
fmt.Println("Got VP8 track, saving to disk as output.ivf")
i, err := ivfwriter.New("output.ivf")
if err != nil {
panic(err)
}
saveToDisk(i, track)
}
})

View File

@@ -6,13 +6,33 @@ import (
"github.com/pions/rtcp"
"github.com/pions/webrtc"
"github.com/pions/webrtc/pkg/media"
"github.com/pions/webrtc/pkg/media/ivfwriter"
"github.com/pions/webrtc/pkg/media/opuswriter"
"github.com/pions/webrtc/examples/internal/signal"
)
func main() {
func saveToDisk(i media.Writer, track *webrtc.Track) {
defer func() {
if err := i.Close(); err != nil {
panic(err)
}
}()
for {
packet, err := track.ReadRTP()
if err != nil {
panic(err)
}
if err := i.AddPacket(packet); err != nil {
panic(err)
}
}
}
func main() {
// Create a MediaEngine object to configure the supported codec
m := webrtc.MediaEngine{}
@@ -49,31 +69,28 @@ func main() {
go func() {
ticker := time.NewTicker(time.Second * 3)
for range ticker.C {
err := peerConnection.SendRTCP(&rtcp.PictureLossIndication{MediaSSRC: track.SSRC()})
if err != nil {
fmt.Println(err)
errSend := peerConnection.SendRTCP(&rtcp.PictureLossIndication{MediaSSRC: track.SSRC()})
if errSend != nil {
fmt.Println(errSend)
}
}
}()
if track.Codec().Name == webrtc.VP8 {
codec := track.Codec()
if codec.Name == webrtc.Opus {
fmt.Println("Got Opus track, saving to disk as output.opus")
i, err := opuswriter.New("output.opus", codec.ClockRate, codec.Channels)
if err != nil {
panic(err)
}
saveToDisk(i, track)
} else if codec.Name == webrtc.VP8 {
fmt.Println("Got VP8 track, saving to disk as output.ivf")
i, err := ivfwriter.New("output.ivf")
if err != nil {
panic(err)
}
for {
packet, err := track.ReadRTP()
if err != nil {
panic(err)
}
err = i.AddPacket(packet)
if err != nil {
panic(err)
}
}
saveToDisk(i, track)
}
})

View File

@@ -3,6 +3,7 @@ package ivfwriter
import (
"encoding/binary"
"fmt"
"io"
"os"
"github.com/pions/rtp"
@@ -11,6 +12,7 @@ import (
// IVFWriter is used to take RTP packets and write them to an IVF on disk
type IVFWriter struct {
stream io.Writer
fd *os.File
count uint64
currentFrame []byte
@@ -22,7 +24,30 @@ func New(fileName string) (*IVFWriter, error) {
if err != nil {
return nil, err
}
writer, err := NewWith(f)
if err != nil {
return nil, err
}
writer.fd = f
return writer, nil
}
// NewWith initialize a new IVF writer with an io.Writer output
func NewWith(out io.Writer) (*IVFWriter, error) {
if out == nil {
return nil, fmt.Errorf("file not opened")
}
writer := &IVFWriter{
stream: out,
}
if err := writer.writeHeader(); err != nil {
return nil, err
}
return writer, nil
}
func (i *IVFWriter) writeHeader() error {
header := make([]byte, 32)
copy(header[0:], []byte("DKIF")) // DKIF
binary.LittleEndian.PutUint16(header[4:], 0) // Version
@@ -35,15 +60,15 @@ func New(fileName string) (*IVFWriter, error) {
binary.LittleEndian.PutUint32(header[24:], 900) // Frame count
binary.LittleEndian.PutUint32(header[28:], 0) // Unused
if _, err := f.Write(header); err != nil {
return nil, err
}
return &IVFWriter{fd: f}, nil
_, err := i.stream.Write(header)
return err
}
// AddPacket adds a new packet and writes the appropriate headers for it
func (i *IVFWriter) AddPacket(packet *rtp.Packet) error {
if i.stream == nil {
return fmt.Errorf("file not opened")
}
vp8Packet := codecs.VP8Packet{}
_, err := vp8Packet.Unmarshal(packet)
@@ -56,7 +81,6 @@ func (i *IVFWriter) AddPacket(packet *rtp.Packet) error {
if !packet.Marker {
return nil
} else if len(i.currentFrame) == 0 {
fmt.Println("skipping")
return nil
}
@@ -66,12 +90,27 @@ func (i *IVFWriter) AddPacket(packet *rtp.Packet) error {
i.count++
if _, err := i.fd.Write(frameHeader); err != nil {
if _, err := i.stream.Write(frameHeader); err != nil {
return err
} else if _, err := i.fd.Write(i.currentFrame); err != nil {
} else if _, err := i.stream.Write(i.currentFrame); err != nil {
return err
}
i.currentFrame = nil
return nil
}
// Close stops the recording
func (i *IVFWriter) Close() error {
defer func() {
i.fd = nil
i.stream = nil
}()
if i.fd == nil {
// Returns no error has it may be convenient to call
// Close() multiple times
return nil
}
return i.fd.Close()
}

View File

@@ -0,0 +1,126 @@
package ivfwriter
import (
"bytes"
"fmt"
"io"
"testing"
"github.com/pions/rtp"
"github.com/stretchr/testify/assert"
)
type ivfWriterPacketTest struct {
buffer io.Writer
message string
messageClose string
packet *rtp.Packet
writer *IVFWriter
err error
closeErr error
}
var rawPkt = []byte{
0x90, 0xe0, 0x69, 0x8f, 0xd9, 0xc2, 0x93, 0xda, 0x1c, 0x64,
0x27, 0x82, 0x00, 0x01, 0x00, 0x01, 0xFF, 0xFF, 0xFF, 0xFF, 0x98, 0x36, 0xbe, 0x88, 0x9e,
}
var validPacket = &rtp.Packet{
Header: rtp.Header{
Marker: true,
Extension: true,
ExtensionProfile: 1,
ExtensionPayload: []byte{0xFF, 0xFF, 0xFF, 0xFF},
Version: 2,
PayloadOffset: 20,
PayloadType: 96,
SequenceNumber: 27023,
Timestamp: 3653407706,
SSRC: 476325762,
CSRC: []uint32{},
},
Payload: rawPkt[20:],
Raw: rawPkt,
}
func TestIVFWriter_AddPacketAndClose(t *testing.T) {
assert := assert.New(t)
// The linter misbehave and thinks this code is the same as the tests in opuswriter_test
// nolint:dupl
addPacketTestCase := []ivfWriterPacketTest{
{
buffer: &bytes.Buffer{},
message: "IVFWriter shouldn't be able to write something to a closed file",
messageClose: "IVFWriter should be able to close an already closed file",
packet: nil,
err: fmt.Errorf("file not opened"),
closeErr: nil,
},
{
buffer: &bytes.Buffer{},
message: "IVFWriter shouldn't be able to write something an empty packet",
messageClose: "IVFWriter should be able to close the file",
packet: &rtp.Packet{},
err: fmt.Errorf("Payload is not large enough to container header"),
closeErr: nil,
},
{
buffer: &bytes.Buffer{},
message: "IVFWriter should be able to write an IVF packet",
messageClose: "IVFWriter should be able to close the file",
packet: validPacket,
err: nil,
closeErr: nil,
},
{
buffer: nil,
message: "IVFWriter shouldn't be able to write something to a closed file",
messageClose: "IVFWriter should be able to close an already closed file",
packet: nil,
err: fmt.Errorf("file not opened"),
closeErr: nil,
},
}
// First test case has a 'nil' file descriptor
writer, err := NewWith(addPacketTestCase[0].buffer)
assert.Nil(err, "IVFWriter should be created")
assert.NotNil(writer, "Writer shouldn't be nil")
err = writer.Close()
assert.Nil(err, "IVFWriter should be able to close the stream")
writer.stream = nil
addPacketTestCase[0].writer = writer
// Second test tries to write an empty packet
writer, err = NewWith(addPacketTestCase[1].buffer)
assert.Nil(err, "IVFWriter should be created")
assert.NotNil(writer, "Writer shouldn't be nil")
addPacketTestCase[1].writer = writer
// Third test tries to write a valid VP8 packet
writer, err = NewWith(addPacketTestCase[2].buffer)
assert.Nil(err, "IVFWriter should be created")
assert.NotNil(writer, "Writer shouldn't be nil")
addPacketTestCase[2].writer = writer
// Fourth test tries to write to a nil stream
writer, err = NewWith(addPacketTestCase[3].buffer)
assert.NotNil(err, "IVFWriter shouldn't be created")
assert.Nil(writer, "Writer should be nil")
addPacketTestCase[3].writer = writer
for _, t := range addPacketTestCase {
if t.writer != nil {
res := t.writer.AddPacket(t.packet)
assert.Equal(res, t.err, t.message)
}
}
for _, t := range addPacketTestCase {
if t.writer != nil {
res := t.writer.Close()
assert.Equal(res, t.closeErr, t.messageClose)
}
}
}

View File

@@ -1,7 +1,21 @@
package media
import (
"github.com/pions/rtp"
)
// Sample contains media, and the amount of samples in it
type Sample struct {
Data []byte
Samples uint32
}
// Writer defines an interface to handle
// the creation of media files
type Writer interface {
// Add the content of an RTP packet to the media
AddPacket(packet *rtp.Packet) error
// Close the media
// Note: Close implementation must be idempotent
Close() error
}

View File

@@ -0,0 +1,199 @@
package opuswriter
import (
"encoding/binary"
"fmt"
"hash/crc32"
"io"
"math/rand"
"os"
"github.com/pions/rtp"
"github.com/pions/rtp/codecs"
)
// OpusWriter is used to take RTP packets and write them to an OGG on disk
type OpusWriter struct {
stream io.Writer
fd *os.File
sampleRate uint32
channelCount uint16
serial uint32
pageIndex uint32
checksumTable *crc32.Table
previousGranulePosition uint64
previousTimestamp uint32
}
// New builds a new OGG Opus writer
func New(fileName string, sampleRate uint32, channelCount uint16) (*OpusWriter, error) {
f, err := os.Create(fileName)
if err != nil {
return nil, err
}
writer, err := NewWith(f, sampleRate, channelCount)
if err != nil {
return nil, err
}
writer.fd = f
return writer, nil
}
// NewWith initialize a new OGG Opus writer with an io.Writer output
func NewWith(out io.Writer, sampleRate uint32, channelCount uint16) (*OpusWriter, error) {
if out == nil {
return nil, fmt.Errorf("file not opened")
}
writer := &OpusWriter{
stream: out,
sampleRate: sampleRate,
channelCount: channelCount,
serial: rand.Uint32(),
checksumTable: crc32.MakeTable(0x04c11db7),
}
if err := writer.writeHeaders(); err != nil {
return nil, err
}
return writer, nil
}
/*
ref: https://tools.ietf.org/html/rfc7845.html
https://git.xiph.org/?p=opus-tools.git;a=blob;f=src/opus_header.c#l219
Page 0 Pages 1 ... n Pages (n+1) ...
+------------+ +---+ +---+ ... +---+ +-----------+ +---------+ +--
| | | | | | | | | | | | |
|+----------+| |+-----------------+| |+-------------------+ +-----
|||ID Header|| || Comment Header || ||Audio Data Packet 1| | ...
|+----------+| |+-----------------+| |+-------------------+ +-----
| | | | | | | | | | | | |
+------------+ +---+ +---+ ... +---+ +-----------+ +---------+ +--
^ ^ ^
| | |
| | Mandatory Page Break
| |
| ID header is contained on a single page
|
'Beginning Of Stream'
Figure 1: Example Packet Organization for a Logical Ogg Opus Stream
*/
func (i *OpusWriter) writeHeaders() error {
// ID Header
oggIDHeader := make([]byte, 19)
copy(oggIDHeader[0:], []byte("OpusHead")) // Magic Signature 'OpusHead'
oggIDHeader[8] = 1 // Version
oggIDHeader[9] = uint8(i.channelCount) // Channel count
binary.LittleEndian.PutUint16(oggIDHeader[10:], 0) // pre-skip, don't need to skip any value
binary.LittleEndian.PutUint32(oggIDHeader[12:], i.sampleRate) // original sample rate, any valid sample e.g 48000
binary.LittleEndian.PutUint16(oggIDHeader[16:], 0) // output gain
oggIDHeader[18] = 0 // channel map 0 = one stream: mono or stereo
// Reference: https://tools.ietf.org/html/rfc7845.html#page-6
// RFC specifies that the ID Header page should have a granule position of 0 and a Header Type set to 2 (StartOfStream)
data := i.createPage(oggIDHeader, 2, 0)
if _, err := i.stream.Write(data); err != nil {
return err
}
// Comment Header
oggCommentHeader := make([]byte, 21)
copy(oggCommentHeader[0:], []byte("OpusTags")) // Magic Signature 'OpusTags'
binary.LittleEndian.PutUint32(oggCommentHeader[8:], 5) // Vendor Length
copy(oggCommentHeader[12:], []byte("pions")) // Vendor name 'pions'
binary.LittleEndian.PutUint32(oggCommentHeader[17:], 0) // User Comment List Length
// RFC specifies that the page where the CommentHeader completes should have a granule position of 0
data = i.createPage(oggCommentHeader, 0, 0)
if _, err := i.stream.Write(data); err != nil {
return err
}
return nil
}
const (
pageHeaderSize = 27
)
func (i *OpusWriter) createPage(payload []uint8, headerType uint8, granulePos uint64) []byte {
payloadLen := len(payload)
page := make([]byte, pageHeaderSize+1+payloadLen)
copy(page[0:], []byte("OggS")) // page headers starts with 'OggS'
page[4] = 0 // Version
page[5] = headerType // 1 = continuation, 2 = beginning of stream, 4 = end of stream
binary.LittleEndian.PutUint64(page[6:], granulePos) // granule position
binary.LittleEndian.PutUint32(page[14:], i.serial) // Bitstream serial number
binary.LittleEndian.PutUint32(page[18:], i.pageIndex) // Page sequence number
i.pageIndex++
page[26] = 1 // Number of segments in page, giving always 1 segment
page[27] = uint8(payloadLen) // Segment Table inserting at 27th position since page header length is 27
copy(page[28:], payload) // inserting at 28th since Segment Table(1) + header length(27)
checksum := crc32.Checksum(payload, i.checksumTable)
binary.LittleEndian.PutUint32(page[22:], checksum) // Checksum - generating for page data and inserting at 22th position into 32 bits
return page
}
// AddPacket adds a new packet and writes the appropriate headers for it
func (i *OpusWriter) AddPacket(packet *rtp.Packet) error {
if i.stream == nil {
return fmt.Errorf("file not opened")
}
opusPacket := codecs.OpusPacket{}
_, err := opusPacket.Unmarshal(packet)
if err != nil {
// Only handle Opus packets
return err
}
payload := opusPacket.Payload[0:]
// Should be equivalent to sampleRate * duration
if i.previousTimestamp != 0 {
increment := packet.Timestamp - i.previousTimestamp
i.previousGranulePosition += uint64(increment)
}
i.previousTimestamp = packet.Timestamp
data := i.createPage(payload, 0, i.previousGranulePosition)
_, err = i.stream.Write(data)
return err
}
// Close stops the recording
func (i *OpusWriter) Close() error {
defer func() {
i.fd = nil
i.stream = nil
}()
if i.stream == nil {
// Returns no error has it may be convenient to call
// Close() multiple times
return nil
}
// RFC specifies that the last page should have a Header Type set to 4 (EndOfStream)
// The granule position here is the magic value '-1'
data := i.createPage(make([]uint8, 0), 4, 0xFFFFFFFFFFFFFFFF)
if _, err := i.stream.Write(data); err != nil {
if i.fd != nil {
if e2 := i.fd.Close(); e2 != nil {
err = fmt.Errorf("error writing file (%v); error deleting file (%v)", err, e2)
}
}
return err
}
if i.fd != nil {
return i.fd.Close()
}
return nil
}

View File

@@ -0,0 +1,126 @@
package opuswriter
import (
"bytes"
"fmt"
"io"
"testing"
"github.com/pions/rtp"
"github.com/stretchr/testify/assert"
)
type opusWriterPacketTest struct {
buffer io.Writer
message string
messageClose string
packet *rtp.Packet
writer *OpusWriter
err error
closeErr error
}
var rawPkt = []byte{
0x90, 0xe0, 0x69, 0x8f, 0xd9, 0xc2, 0x93, 0xda, 0x1c, 0x64,
0x27, 0x82, 0x00, 0x01, 0x00, 0x01, 0xFF, 0xFF, 0xFF, 0xFF, 0x98, 0x36, 0xbe, 0x88, 0x9e,
}
var validPacket = &rtp.Packet{
Header: rtp.Header{
Marker: true,
Extension: true,
ExtensionProfile: 1,
ExtensionPayload: []byte{0xFF, 0xFF, 0xFF, 0xFF},
Version: 2,
PayloadOffset: 20,
PayloadType: 111,
SequenceNumber: 27023,
Timestamp: 3653407706,
SSRC: 476325762,
CSRC: []uint32{},
},
Payload: rawPkt[20:],
Raw: rawPkt,
}
func TestOpusWriter_AddPacketAndClose(t *testing.T) {
assert := assert.New(t)
// The linter misbehave and thinks this code is the same as the tests in ivf-writer_test
// nolint:dupl
addPacketTestCase := []opusWriterPacketTest{
{
buffer: &bytes.Buffer{},
message: "OpusWriter shouldn't be able to write something to a closed file",
messageClose: "OpusWriter should be able to close an already closed file",
packet: nil,
err: fmt.Errorf("file not opened"),
closeErr: nil,
},
{
buffer: &bytes.Buffer{},
message: "OpusWriter shouldn't be able to write an empty packet",
messageClose: "OpusWriter should be able to close the file",
packet: &rtp.Packet{},
err: nil, // TODO: Update pions/rpt Opus unmarshal, so it returns an error, and update expected value
closeErr: nil,
},
{
buffer: &bytes.Buffer{},
message: "OpusWriter should be able to write an Opus packet",
messageClose: "OpusWriter should be able to close the file",
packet: validPacket,
err: nil,
closeErr: nil,
},
{
buffer: nil,
message: "OpusWriter shouldn't be able to write something to a closed file",
messageClose: "OpusWriter should be able to close an already closed file",
packet: nil,
err: fmt.Errorf("file not opened"),
closeErr: nil,
},
}
// First test case has a 'nil' file descriptor
writer, err := NewWith(addPacketTestCase[0].buffer, 48000, 2)
assert.Nil(err, "OpusWriter should be created")
assert.NotNil(writer, "Writer shouldn't be nil")
err = writer.Close()
assert.Nil(err, "OpusWriter should be able to close the file descriptor")
writer.stream = nil
addPacketTestCase[0].writer = writer
// Second test writes tries to write an empty packet
writer, err = NewWith(addPacketTestCase[1].buffer, 48000, 2)
assert.Nil(err, "OpusWriter should be created")
assert.NotNil(writer, "Writer shouldn't be nil")
addPacketTestCase[1].writer = writer
// Third test writes tries to write a valid Opus packet
writer, err = NewWith(addPacketTestCase[2].buffer, 48000, 2)
assert.Nil(err, "OpusWriter should be created")
assert.NotNil(writer, "Writer shouldn't be nil")
addPacketTestCase[2].writer = writer
// Fourth test tries to write to a nil stream
writer, err = NewWith(addPacketTestCase[3].buffer, 4800, 2)
assert.NotNil(err, "IVFWriter shouldn't be created")
assert.Nil(writer, "Writer should be nil")
addPacketTestCase[3].writer = writer
for _, t := range addPacketTestCase {
if t.writer != nil {
res := t.writer.AddPacket(t.packet)
assert.Equal(res, t.err, t.message)
}
}
for _, t := range addPacketTestCase {
if t.writer != nil {
res := t.writer.Close()
assert.Equal(res, t.closeErr, t.messageClose)
}
}
}