Fix OpusWriter issues that break VLC

* Correctly calculate page CRC
Ogg uses a slightly non-standard version of CRC, so we are unable to
use Go's version. You can see the details here[0]; in summary:
"direct algorithm, initial val and final XOR = 0,
generator polynomial=0x04c11db7"

[0] https://xiph.org/vorbis/doc/framing.html

* Properly set EOS value
Before, we created a new page with an EOS. Instead, seek backwards
and update the last page with valid Opus to have the EOS header type,
and re-generate the CRC

* Only use 0 timestamp/granule for headers
Audio itself should start from 0

* Rename OpusWriter -> OggWriter
Ogg supports more than just Opus; renaming gives us the flexibility to
expand in the future.
This commit is contained in:
Sean DuBois
2019-08-15 15:56:16 -07:00
committed by Sean DuBois
parent 12efd3e258
commit 6209597312
6 changed files with 275 additions and 222 deletions

1
.gitignore vendored
View File

@@ -15,6 +15,7 @@ node_modules/
### Files ###
#############
*.ivf
*.ogg
tags
cover.out
*.sw[poe]

View File

@@ -9,7 +9,7 @@ import (
"github.com/pion/webrtc/v2"
"github.com/pion/webrtc/v2/pkg/media"
"github.com/pion/webrtc/v2/pkg/media/ivfwriter"
"github.com/pion/webrtc/v2/pkg/media/opuswriter"
"github.com/pion/webrtc/v2/pkg/media/oggwriter"
"github.com/pion/webrtc/v2/examples/internal/signal"
)
@@ -68,7 +68,7 @@ func main() {
panic(err)
}
opusFile, err := opuswriter.New("output.opus", 48000, 2)
oggFile, err := oggwriter.New("output.ogg", 48000, 2)
if err != nil {
panic(err)
}
@@ -95,7 +95,7 @@ func main() {
codec := track.Codec()
if codec.Name == webrtc.Opus {
fmt.Println("Got Opus track, saving to disk as output.opus (48 kHz, 2 channels)")
saveToDisk(opusFile, track)
saveToDisk(oggFile, track)
} else if codec.Name == webrtc.VP8 {
fmt.Println("Got VP8 track, saving to disk as output.ivf")
saveToDisk(ivfFile, track)
@@ -112,7 +112,7 @@ func main() {
} else if connectionState == webrtc.ICEConnectionStateFailed ||
connectionState == webrtc.ICEConnectionStateDisconnected {
closeErr := opusFile.Close()
closeErr := oggFile.Close()
if closeErr != nil {
panic(closeErr)
}

View File

@@ -47,7 +47,7 @@ func TestIVFWriter_AddPacketAndClose(t *testing.T) {
assert := assert.New(t)
// The linter misbehave and thinks this code is the same as the tests in opuswriter_test
// The linter misbehave and thinks this code is the same as the tests in oggwriter_test
// nolint:dupl
addPacketTestCase := []ivfWriterPacketTest{
{

View File

@@ -0,0 +1,251 @@
package oggwriter
import (
"encoding/binary"
"fmt"
"io"
"math/rand"
"os"
"github.com/pion/rtp"
"github.com/pion/rtp/codecs"
)
// Header type values placed in byte 5 of an Ogg page header.
const (
	pageHeaderTypeContinuationOfStream = 0x00
	pageHeaderTypeBeginningOfStream    = 0x02
	pageHeaderTypeEndOfStream          = 0x04
)

// defaultPreSkip is the pre-skip value written into the Opus ID header;
// 3840 is the value recommended in the RFC.
const defaultPreSkip = 3840

// Magic signatures that open the two Opus header packets and every Ogg page.
const (
	idPageSignature      = "OpusHead"
	commentPageSignature = "OpusTags"
	pageHeaderSignature  = "OggS"
)
// OggWriter takes RTP packets and writes them to an OGG container on disk
type OggWriter struct {
	// Output destinations.
	stream io.Writer // every page is written here
	fd     *os.File  // set by New for file-backed writers; Close seeks on it

	// Stream parameters stored in the ID header.
	sampleRate   uint32
	channelCount uint16

	// Page bookkeeping.
	serial          uint32       // bitstream serial number written into each page
	pageIndex       uint32       // sequence number for the next page
	checksumTable   *[256]uint32 // lookup table used for the page checksum
	lastPayloadSize int          // payload length of the most recently created page

	// Timing state carried between WriteRTP calls.
	previousGranulePosition uint64
	previousTimestamp       uint32
}
// New builds a new OGG Opus writer backed by a freshly created file.
//
// The file fileName is created (truncating any existing file) and the Ogg
// headers are written to it immediately. On success the returned writer owns
// the file and closes it in Close. If the headers cannot be written the file
// is closed again and the original error is returned; a nil writer is never
// returned together with a nil error.
func New(fileName string, sampleRate uint32, channelCount uint16) (*OggWriter, error) {
	f, err := os.Create(fileName)
	if err != nil {
		return nil, err
	}

	writer, err := NewWith(f, sampleRate, channelCount)
	if err != nil {
		// Best-effort cleanup: the failure from NewWith is what the caller
		// needs to see, so a secondary close error is deliberately dropped.
		_ = f.Close()
		return nil, err
	}

	writer.fd = f
	return writer, nil
}
// NewWith initializes a new OGG Opus writer that emits its pages to out.
// The ID and comment headers are written before the writer is returned.
// A nil out is rejected with a "file not opened" error.
func NewWith(out io.Writer, sampleRate uint32, channelCount uint16) (*OggWriter, error) {
	if out == nil {
		return nil, fmt.Errorf("file not opened")
	}

	w := new(OggWriter)
	w.stream = out
	w.sampleRate = sampleRate
	w.channelCount = channelCount
	w.serial = rand.Uint32()
	w.checksumTable = generateChecksumTable()

	// Timestamp and Granule MUST start from 1
	// Only headers can have 0 values
	w.previousTimestamp = 1
	w.previousGranulePosition = 1

	err := w.writeHeaders()
	if err != nil {
		return nil, err
	}
	return w, nil
}
/*
ref: https://tools.ietf.org/html/rfc7845.html
https://git.xiph.org/?p=opus-tools.git;a=blob;f=src/opus_header.c#l219
Page 0 Pages 1 ... n Pages (n+1) ...
+------------+ +---+ +---+ ... +---+ +-----------+ +---------+ +--
| | | | | | | | | | | | |
|+----------+| |+-----------------+| |+-------------------+ +-----
|||ID Header|| || Comment Header || ||Audio Data Packet 1| | ...
|+----------+| |+-----------------+| |+-------------------+ +-----
| | | | | | | | | | | | |
+------------+ +---+ +---+ ... +---+ +-----------+ +---------+ +--
^ ^ ^
| | |
| | Mandatory Page Break
| |
| ID header is contained on a single page
|
'Beginning Of Stream'
Figure 1: Example Packet Organization for a Logical Ogg Opus Stream
*/
// writeHeaders emits the two mandatory Opus header packets, each on its own
// page: the ID header ("OpusHead") followed by the comment header ("OpusTags").
// Both pages carry a granule position of 0.
func (i *OggWriter) writeHeaders() error {
	// emit wraps one header packet in a page, writes it out and advances
	// the page sequence number only once the write has succeeded.
	emit := func(packet []byte, headerType uint8) error {
		page := i.createPage(packet, headerType, 0, i.pageIndex)
		if err := i.writeToStream(page); err != nil {
			return err
		}
		i.pageIndex++
		return nil
	}

	// ID Header (19 bytes)
	idHeader := make([]byte, 19)
	copy(idHeader, idPageSignature)                              // Magic Signature 'OpusHead'
	idHeader[8] = 1                                              // Version
	idHeader[9] = uint8(i.channelCount)                          // Channel count
	binary.LittleEndian.PutUint16(idHeader[10:], defaultPreSkip) // pre-skip
	binary.LittleEndian.PutUint32(idHeader[12:], i.sampleRate)   // original sample rate, any valid sample e.g 48000
	binary.LittleEndian.PutUint16(idHeader[16:], 0)              // output gain
	idHeader[18] = 0                                             // channel map 0 = one stream: mono or stereo

	// Reference: https://tools.ietf.org/html/rfc7845.html#page-6
	// RFC specifies that the ID Header page should have a granule position of 0
	// and a Header Type set to 2 (StartOfStream)
	if err := emit(idHeader, pageHeaderTypeBeginningOfStream); err != nil {
		return err
	}

	// Comment Header (21 bytes). The declared vendor length of 5 covers the
	// four bytes of "pion" plus the zero byte left at offset 16.
	commentHeader := make([]byte, 21)
	copy(commentHeader, commentPageSignature)            // Magic Signature 'OpusTags'
	binary.LittleEndian.PutUint32(commentHeader[8:], 5)  // Vendor Length
	copy(commentHeader[12:], "pion")                     // Vendor name 'pion'
	binary.LittleEndian.PutUint32(commentHeader[17:], 0) // User Comment List Length

	// RFC specifies that the page where the CommentHeader completes should
	// have a granule position of 0
	return emit(commentHeader, pageHeaderTypeContinuationOfStream)
}
// pageHeaderSize is the length in bytes of the fixed part of an Ogg page
// header, i.e. everything that precedes the segment table.
const pageHeaderSize = 27
// createPage wraps payload in a single-segment Ogg page and returns the
// serialized bytes, checksum included. It also records the payload length in
// i.lastPayloadSize so that Close can find the last page again.
//
// NOTE(review): the page always has exactly one segment and its lacing value
// is uint8(len(payload)), so a payload longer than 255 bytes gets a truncated
// length in the segment table.
func (i *OggWriter) createPage(payload []uint8, headerType uint8, granulePos uint64, pageIndex uint32) []byte {
	const (
		segmentTableOffset = pageHeaderSize     // the single lacing value follows the fixed header
		payloadOffset      = pageHeaderSize + 1 // payload follows the one-entry segment table
	)

	size := len(payload)
	i.lastPayloadSize = size

	page := make([]byte, payloadOffset+size)
	copy(page, pageHeaderSignature)                       // page headers starts with 'OggS'
	page[4] = 0                                           // Version
	page[5] = headerType                                  // 1 = continuation, 2 = beginning of stream, 4 = end of stream
	binary.LittleEndian.PutUint64(page[6:14], granulePos) // granule position
	binary.LittleEndian.PutUint32(page[14:18], i.serial)  // Bitstream serial number
	binary.LittleEndian.PutUint32(page[18:22], pageIndex) // Page sequence number
	page[26] = 1                                          // Number of segments in page, giving always 1 segment
	page[segmentTableOffset] = uint8(size)                // Segment Table
	copy(page[payloadOffset:], payload)

	// The checksum covers the whole page with the checksum field (bytes
	// 22..25) still zero, which it is at this point.
	var crc uint32
	for _, b := range page {
		crc = (crc << 8) ^ i.checksumTable[byte(crc>>24)^b]
	}
	binary.LittleEndian.PutUint32(page[22:26], crc)

	return page
}
// WriteRTP adds a new packet and writes the appropriate headers for it.
//
// The RTP payload is unmarshalled as Opus and written as one Ogg page. The
// page's granule position advances by the RTP timestamp difference to the
// previous packet. It returns an error when packet is nil, when the payload
// cannot be unmarshalled as Opus, or when the page cannot be written
// (including "file not opened" after Close).
func (i *OggWriter) WriteRTP(packet *rtp.Packet) error {
	if packet == nil {
		return fmt.Errorf("packet must not be nil")
	}

	opusPacket := codecs.OpusPacket{}
	if _, err := opusPacket.Unmarshal(packet.Payload); err != nil {
		// Only handle Opus packets
		return err
	}

	// Should be equivalent to sampleRate * duration.
	// previousTimestamp is initialised to the sentinel 1 in NewWith, meaning
	// "no packet seen yet". The first packet must not add its arbitrary RTP
	// timestamp offset to the granule position, so the increment is skipped
	// until a real timestamp has been recorded. Unsigned subtraction keeps
	// the increment correct across 32-bit timestamp wrap-around.
	if i.previousTimestamp != 1 {
		increment := packet.Timestamp - i.previousTimestamp
		i.previousGranulePosition += uint64(increment)
	}
	i.previousTimestamp = packet.Timestamp

	data := i.createPage(opusPacket.Payload, pageHeaderTypeContinuationOfStream, i.previousGranulePosition, i.pageIndex)
	i.pageIndex++
	return i.writeToStream(data)
}
// Close stops the recording.
//
// For file-backed writers (created with New) the last page in the file is
// rewritten in place with the end-of-stream header type and a regenerated
// checksum, and the file is then closed. The file is closed even if the
// rewrite fails; the rewrite error takes precedence over the close error.
// For writers without a file, and on repeated calls, Close returns nil.
// After Close the writer rejects further writes.
func (i *OggWriter) Close() error {
	defer func() {
		i.fd = nil
		i.stream = nil
	}()

	// Returns no error as it may be convenient to call
	// Close() multiple times
	if i.fd == nil {
		return nil
	}

	// Always release the descriptor, including on the error paths.
	updateErr := i.markLastPageEOS()
	closeErr := i.fd.Close()
	if updateErr != nil {
		return updateErr
	}
	return closeErr
}

// markLastPageEOS seeks back over the most recently written page, reads its
// payload and writes the page again flagged as end of stream.
func (i *OggWriter) markLastPageEOS() error {
	// A page is the fixed header, a one-byte segment table and the payload.
	pageSize := int64(pageHeaderSize + 1 + i.lastPayloadSize)

	// Seek back one page, we need to update the header and generate new CRC
	pageOffset, err := i.fd.Seek(-pageSize, io.SeekEnd)
	if err != nil {
		return err
	}

	payload := make([]byte, i.lastPayloadSize)
	if _, err := i.fd.ReadAt(payload, pageOffset+pageHeaderSize+1); err != nil {
		return err
	}

	// The file offset now sits at the start of the last page, so writing the
	// regenerated page overwrites it in place.
	data := i.createPage(payload, pageHeaderTypeEndOfStream, i.previousGranulePosition, i.pageIndex-1)
	return i.writeToStream(data)
}
// writeToStream writes p to the underlying stream. It fails with
// "file not opened" when the writer has no stream, which is the case
// once Close has run.
func (i *OggWriter) writeToStream(p []byte) error {
	if i.stream != nil {
		_, err := i.stream.Write(p)
		return err
	}
	return fmt.Errorf("file not opened")
}
// generateChecksumTable builds the 256-entry lookup table for the page
// checksum: a most-significant-bit-first CRC with generator polynomial
// 0x04c11db7. Entry n is the result of pushing byte n through eight
// shift/xor steps.
func generateChecksumTable() *[256]uint32 {
	const (
		poly   = 0x04c11db7
		topBit = 0x80000000
	)

	table := new([256]uint32)
	for idx := range table {
		crc := uint32(idx) << 24
		for bit := 0; bit < 8; bit++ {
			carry := crc&topBit != 0
			crc <<= 1
			if carry {
				crc ^= poly
			}
		}
		table[idx] = crc
	}
	return table
}

View File

@@ -1,4 +1,4 @@
package opuswriter
package oggwriter
import (
"bytes"
@@ -10,17 +10,17 @@ import (
"github.com/stretchr/testify/assert"
)
type opusWriterPacketTest struct {
type oggWriterPacketTest struct {
buffer io.Writer
message string
messageClose string
packet *rtp.Packet
writer *OpusWriter
writer *OggWriter
err error
closeErr error
}
func TestOpusWriter_AddPacketAndClose(t *testing.T) {
func TestOggWriter_AddPacketAndClose(t *testing.T) {
rawPkt := []byte{
0x90, 0xe0, 0x69, 0x8f, 0xd9, 0xc2, 0x93, 0xda, 0x1c, 0x64,
0x27, 0x82, 0x00, 0x01, 0x00, 0x01, 0xFF, 0xFF, 0xFF, 0xFF, 0x98, 0x36, 0xbe, 0x88, 0x9e,
@@ -48,35 +48,35 @@ func TestOpusWriter_AddPacketAndClose(t *testing.T) {
// The linter misbehave and thinks this code is the same as the tests in ivf-writer_test
// nolint:dupl
addPacketTestCase := []opusWriterPacketTest{
addPacketTestCase := []oggWriterPacketTest{
{
buffer: &bytes.Buffer{},
message: "OpusWriter shouldn't be able to write something to a closed file",
messageClose: "OpusWriter should be able to close an already closed file",
packet: nil,
message: "OggWriter shouldn't be able to write something to a closed file",
messageClose: "OggWriter should be able to close an already closed file",
packet: validPacket,
err: fmt.Errorf("file not opened"),
closeErr: nil,
},
{
buffer: &bytes.Buffer{},
message: "OpusWriter shouldn't be able to write an empty packet",
messageClose: "OpusWriter should be able to close the file",
message: "OggWriter shouldn't be able to write an empty packet",
messageClose: "OggWriter should be able to close the file",
packet: &rtp.Packet{},
err: fmt.Errorf("invalid nil packet"),
closeErr: nil,
},
{
buffer: &bytes.Buffer{},
message: "OpusWriter should be able to write an Opus packet",
messageClose: "OpusWriter should be able to close the file",
message: "OggWriter should be able to write an Opus packet",
messageClose: "OggWriter should be able to close the file",
packet: validPacket,
err: nil,
closeErr: nil,
},
{
buffer: nil,
message: "OpusWriter shouldn't be able to write something to a closed file",
messageClose: "OpusWriter should be able to close an already closed file",
message: "OggWriter shouldn't be able to write something to a closed file",
messageClose: "OggWriter should be able to close an already closed file",
packet: nil,
err: fmt.Errorf("file not opened"),
closeErr: nil,
@@ -85,22 +85,22 @@ func TestOpusWriter_AddPacketAndClose(t *testing.T) {
// First test case has a 'nil' file descriptor
writer, err := NewWith(addPacketTestCase[0].buffer, 48000, 2)
assert.Nil(err, "OpusWriter should be created")
assert.Nil(err, "OggWriter should be created")
assert.NotNil(writer, "Writer shouldn't be nil")
err = writer.Close()
assert.Nil(err, "OpusWriter should be able to close the file descriptor")
assert.Nil(err, "OggWriter should be able to close the file descriptor")
writer.stream = nil
addPacketTestCase[0].writer = writer
// Second test writes tries to write an empty packet
writer, err = NewWith(addPacketTestCase[1].buffer, 48000, 2)
assert.Nil(err, "OpusWriter should be created")
assert.Nil(err, "OggWriter should be created")
assert.NotNil(writer, "Writer shouldn't be nil")
addPacketTestCase[1].writer = writer
// Third test writes tries to write a valid Opus packet
writer, err = NewWith(addPacketTestCase[2].buffer, 48000, 2)
assert.Nil(err, "OpusWriter should be created")
assert.Nil(err, "OggWriter should be created")
assert.NotNil(writer, "Writer shouldn't be nil")
addPacketTestCase[2].writer = writer

View File

@@ -1,199 +0,0 @@
package opuswriter
import (
"encoding/binary"
"fmt"
"hash/crc32"
"io"
"math/rand"
"os"
"github.com/pion/rtp"
"github.com/pion/rtp/codecs"
)
// OpusWriter is used to take RTP packets and write them to an OGG on disk
type OpusWriter struct {
// stream receives every page produced by the writer.
stream io.Writer
// fd is set by New for file-backed writers and is closed in Close.
fd *os.File
// sampleRate and channelCount are written into the ID header.
sampleRate uint32
channelCount uint16
// serial is the bitstream serial number written into each page header.
serial uint32
// pageIndex is the next page sequence number; createPage increments it.
pageIndex uint32
// checksumTable is the crc32 table used by createPage.
checksumTable *crc32.Table
// previousGranulePosition and previousTimestamp carry timing state
// between WriteRTP calls.
previousGranulePosition uint64
previousTimestamp uint32
}
// New builds a new OGG Opus writer
// It creates fileName, hands the file to NewWith (which writes the headers)
// and records the file on the writer so Close can close it.
func New(fileName string, sampleRate uint32, channelCount uint16) (*OpusWriter, error) {
f, err := os.Create(fileName)
if err != nil {
return nil, err
}
writer, err := NewWith(f, sampleRate, channelCount)
if err != nil {
// NOTE(review): f is not closed on this path.
return nil, err
}
writer.fd = f
return writer, nil
}
// NewWith initialize a new OGG Opus writer with an io.Writer output
// A nil out yields a "file not opened" error. The headers are written to out
// before the writer is returned.
func NewWith(out io.Writer, sampleRate uint32, channelCount uint16) (*OpusWriter, error) {
if out == nil {
return nil, fmt.Errorf("file not opened")
}
writer := &OpusWriter{
stream: out,
sampleRate: sampleRate,
channelCount: channelCount,
serial: rand.Uint32(),
// Table built with the standard library from polynomial 0x04c11db7.
checksumTable: crc32.MakeTable(0x04c11db7),
}
if err := writer.writeHeaders(); err != nil {
return nil, err
}
return writer, nil
}
/*
ref: https://tools.ietf.org/html/rfc7845.html
https://git.xiph.org/?p=opus-tools.git;a=blob;f=src/opus_header.c#l219
Page 0 Pages 1 ... n Pages (n+1) ...
+------------+ +---+ +---+ ... +---+ +-----------+ +---------+ +--
| | | | | | | | | | | | |
|+----------+| |+-----------------+| |+-------------------+ +-----
|||ID Header|| || Comment Header || ||Audio Data Packet 1| | ...
|+----------+| |+-----------------+| |+-------------------+ +-----
| | | | | | | | | | | | |
+------------+ +---+ +---+ ... +---+ +-----------+ +---------+ +--
^ ^ ^
| | |
| | Mandatory Page Break
| |
| ID header is contained on a single page
|
'Beginning Of Stream'
Figure 1: Example Packet Organization for a Logical Ogg Opus Stream
*/
// writeHeaders writes the ID header page followed by the comment header page
// directly to i.stream. Both pages use granule position 0.
func (i *OpusWriter) writeHeaders() error {
// ID Header
oggIDHeader := make([]byte, 19)
copy(oggIDHeader[0:], []byte("OpusHead")) // Magic Signature 'OpusHead'
oggIDHeader[8] = 1 // Version
oggIDHeader[9] = uint8(i.channelCount) // Channel count
binary.LittleEndian.PutUint16(oggIDHeader[10:], 0) // pre-skip, don't need to skip any value
binary.LittleEndian.PutUint32(oggIDHeader[12:], i.sampleRate) // original sample rate, any valid sample e.g 48000
binary.LittleEndian.PutUint16(oggIDHeader[16:], 0) // output gain
oggIDHeader[18] = 0 // channel map 0 = one stream: mono or stereo
// Reference: https://tools.ietf.org/html/rfc7845.html#page-6
// RFC specifies that the ID Header page should have a granule position of 0 and a Header Type set to 2 (StartOfStream)
data := i.createPage(oggIDHeader, 2, 0)
if _, err := i.stream.Write(data); err != nil {
return err
}
// Comment Header
// The buffer is 21 bytes: the declared vendor length of 5 spans the four
// bytes of "pion" plus the zero byte left at offset 16.
oggCommentHeader := make([]byte, 21)
copy(oggCommentHeader[0:], []byte("OpusTags")) // Magic Signature 'OpusTags'
binary.LittleEndian.PutUint32(oggCommentHeader[8:], 5) // Vendor Length
copy(oggCommentHeader[12:], []byte("pion")) // Vendor name 'pion'
binary.LittleEndian.PutUint32(oggCommentHeader[17:], 0) // User Comment List Length
// RFC specifies that the page where the CommentHeader completes should have a granule position of 0
data = i.createPage(oggCommentHeader, 0, 0)
if _, err := i.stream.Write(data); err != nil {
return err
}
return nil
}
const (
// pageHeaderSize is the number of page header bytes that precede the
// segment table (createPage stores the segment table at index 27).
pageHeaderSize = 27
)
// createPage serializes payload into a single-segment Ogg page and returns
// the bytes. It stamps the page with the current i.pageIndex and then
// increments that counter.
func (i *OpusWriter) createPage(payload []uint8, headerType uint8, granulePos uint64) []byte {
payloadLen := len(payload)
page := make([]byte, pageHeaderSize+1+payloadLen)
copy(page[0:], []byte("OggS")) // page headers starts with 'OggS'
page[4] = 0 // Version
page[5] = headerType // 1 = continuation, 2 = beginning of stream, 4 = end of stream
binary.LittleEndian.PutUint64(page[6:], granulePos) // granule position
binary.LittleEndian.PutUint32(page[14:], i.serial) // Bitstream serial number
binary.LittleEndian.PutUint32(page[18:], i.pageIndex) // Page sequence number
i.pageIndex++
page[26] = 1 // Number of segments in page, giving always 1 segment
// NOTE(review): a payload longer than 255 bytes is truncated by the uint8 conversion.
page[27] = uint8(payloadLen) // Segment Table inserting at 27th position since page header length is 27
copy(page[28:], payload) // inserting at 28th since Segment Table(1) + header length(27)
// The checksum here is computed over the payload only (not the page
// header) using the standard library crc32 routine.
checksum := crc32.Checksum(payload, i.checksumTable)
binary.LittleEndian.PutUint32(page[22:], checksum) // Checksum - generating for page data and inserting at 22th position into 32 bits
return page
}
// WriteRTP adds a new packet and writes the appropriate headers for it
// It returns "file not opened" when the writer has no stream, and the
// unmarshal error when the payload cannot be parsed as Opus.
// NOTE(review): packet is dereferenced without a nil check.
func (i *OpusWriter) WriteRTP(packet *rtp.Packet) error {
if i.stream == nil {
return fmt.Errorf("file not opened")
}
opusPacket := codecs.OpusPacket{}
if _, err := opusPacket.Unmarshal(packet.Payload); err != nil {
// Only handle Opus packets
return err
}
payload := opusPacket.Payload[0:]
// Should be equivalent to sampleRate * duration
// previousTimestamp is left at its zero value by NewWith, so the increment
// is skipped until a non-zero timestamp has been recorded.
if i.previousTimestamp != 0 {
increment := packet.Timestamp - i.previousTimestamp
i.previousGranulePosition += uint64(increment)
}
i.previousTimestamp = packet.Timestamp
data := i.createPage(payload, 0, i.previousGranulePosition)
_, err := i.stream.Write(data)
return err
}
// Close stops the recording
// It appends an extra, empty page flagged as end of stream and then closes
// the underlying file when there is one. The stream and fd fields are
// cleared on return, so later calls return nil.
func (i *OpusWriter) Close() error {
defer func() {
i.fd = nil
i.stream = nil
}()
if i.stream == nil {
// Returns no error has it may be convenient to call
// Close() multiple times
return nil
}
// RFC specifies that the last page should have a Header Type set to 4 (EndOfStream)
// The granule position here is the magic value '-1'
data := i.createPage(make([]uint8, 0), 4, 0xFFFFFFFFFFFFFFFF)
if _, err := i.stream.Write(data); err != nil {
// The file is still closed when the write fails; a close failure is
// folded into the returned error.
if i.fd != nil {
if e2 := i.fd.Close(); e2 != nil {
err = fmt.Errorf("error writing file (%v); error deleting file (%v)", err, e2)
}
}
return err
}
if i.fd != nil {
return i.fd.Close()
}
return nil
}