diff --git a/pkg/multibuffer/multibuffer.go b/pkg/multibuffer/multibuffer.go index 2962e386..0e30dd34 100644 --- a/pkg/multibuffer/multibuffer.go +++ b/pkg/multibuffer/multibuffer.go @@ -1,4 +1,4 @@ -// Package multibuffer implements a buffer with multiple levels. +// Package multibuffer contains a buffer with multiple levels. package multibuffer // MultiBuffer implements software multi buffering, that allows to reuse diff --git a/pkg/ringbuffer/ringbuffer.go b/pkg/ringbuffer/ringbuffer.go index 6e68510c..b505e410 100644 --- a/pkg/ringbuffer/ringbuffer.go +++ b/pkg/ringbuffer/ringbuffer.go @@ -1,3 +1,4 @@ +// Package ringbuffer contains a ring buffer. package ringbuffer import ( diff --git a/pkg/rtcpreceiver/rtcpreceiver.go b/pkg/rtcpreceiver/rtcpreceiver.go index 149b6059..d9db504a 100644 --- a/pkg/rtcpreceiver/rtcpreceiver.go +++ b/pkg/rtcpreceiver/rtcpreceiver.go @@ -1,4 +1,4 @@ -// Package rtcpreceiver implements a utility to generate RTCP receiver reports. +// Package rtcpreceiver contains a utility to generate RTCP receiver reports. package rtcpreceiver import ( diff --git a/pkg/rtcpsender/rtcpsender.go b/pkg/rtcpsender/rtcpsender.go index 08297504..bab5e537 100644 --- a/pkg/rtcpsender/rtcpsender.go +++ b/pkg/rtcpsender/rtcpsender.go @@ -1,4 +1,4 @@ -// Package rtcpsender implements a utility to generate RTCP sender reports. +// Package rtcpsender contains a utility to generate RTCP sender reports. package rtcpsender import ( diff --git a/pkg/rtpaac/decoder.go b/pkg/rtpaac/decoder.go new file mode 100644 index 00000000..6d3d2863 --- /dev/null +++ b/pkg/rtpaac/decoder.go @@ -0,0 +1,45 @@ +package rtpaac + +import ( + "fmt" + "time" + + "github.com/pion/rtp" +) + +// Decoder is a RTP/AAC decoder. +type Decoder struct { + clockRate time.Duration + initialTs uint32 + initialTsSet bool +} + +// NewDecoder allocates a Decoder. +func NewDecoder(clockRate int) *Decoder { + return &Decoder{ + clockRate: time.Duration(clockRate), + } +} + +// Decode decodes an AU from an RTP/AAC packet. +func (d *Decoder) Decode(byts []byte) (*AUAndTimestamp, error) { + pkt := rtp.Packet{} + err := pkt.Unmarshal(byts) + if err != nil { + return nil, err + } + + if !d.initialTsSet { + d.initialTsSet = true + d.initialTs = pkt.Timestamp + } + + if pkt.Payload[0] != 0x00 || pkt.Payload[1] != 0x10 { + return nil, fmt.Errorf("invalid payload") + } + + return &AUAndTimestamp{ + AU: pkt.Payload[4:], + Timestamp: time.Duration(pkt.Timestamp-d.initialTs) * time.Second / d.clockRate, + }, nil +} diff --git a/pkg/rtpaac/encoder.go b/pkg/rtpaac/encoder.go index 35b1a8b1..7826ba0a 100644 --- a/pkg/rtpaac/encoder.go +++ b/pkg/rtpaac/encoder.go @@ -5,7 +5,6 @@ import ( "encoding/binary" "fmt" "math/rand" - "time" "github.com/pion/rtp" ) @@ -22,39 +21,55 @@ type Encoder struct { sequenceNumber uint16 ssrc uint32 initialTs uint32 - started time.Duration } // NewEncoder allocates an Encoder. -func NewEncoder(payloadType uint8, clockRate int) (*Encoder, error) { +func NewEncoder(payloadType uint8, + clockRate int, + sequenceNumber *uint16, + ssrc *uint32, + initialTs *uint32) *Encoder { return &Encoder{ - payloadType: payloadType, - clockRate: float64(clockRate), - sequenceNumber: uint16(rand.Uint32()), - ssrc: rand.Uint32(), - initialTs: rand.Uint32(), - }, nil + payloadType: payloadType, + clockRate: float64(clockRate), + sequenceNumber: func() uint16 { + if sequenceNumber != nil { + return *sequenceNumber + } + return uint16(rand.Uint32()) + }(), + ssrc: func() uint32 { + if ssrc != nil { + return *ssrc + } + return rand.Uint32() + }(), + initialTs: func() uint32 { + if initialTs != nil { + return *initialTs + } + return rand.Uint32() + }(), + } } -// Write encodes an AAC frame into RTP/AAC packets. -func (e *Encoder) Write(ts time.Duration, data []byte) ([][]byte, error) { - if e.started == 0 { - e.started = ts - } - - if len(data) > rtpPayloadMaxSize { +// Encode encodes an AU into an RTP/AAC packet. +func (e *Encoder) Encode(at *AUAndTimestamp) ([]byte, error) { + if len(at.AU) > rtpPayloadMaxSize { return nil, fmt.Errorf("data is too big") } - rtpTs := e.initialTs + uint32((ts-e.started).Seconds()*e.clockRate) + rtpTs := e.initialTs + uint32((at.Timestamp).Seconds()*e.clockRate) + + payload := []byte{0x00, 0x10} // 13 bits payload size // 3 bits AU-Index(-delta) header := make([]byte, 2) - binary.BigEndian.PutUint16(header, uint16(len(data))<<3) + binary.BigEndian.PutUint16(header, uint16(len(at.AU))<<3) + payload = append(payload, header...) - payload := append([]byte{0x00, 0x10}, header...) - payload = append(payload, data...) + payload = append(payload, at.AU...) rpkt := rtp.Packet{ Header: rtp.Header{ @@ -74,5 +89,5 @@ func (e *Encoder) Write(ts time.Duration, data []byte) ([][]byte, error) { return nil, err } - return [][]byte{frame}, nil + return frame, nil } diff --git a/pkg/rtpaac/rtpaac.go b/pkg/rtpaac/rtpaac.go new file mode 100644 index 00000000..f2562ccd --- /dev/null +++ b/pkg/rtpaac/rtpaac.go @@ -0,0 +1,12 @@ +// Package rtpaac contains a RTP/AAC decoder and encoder. +package rtpaac + +import ( + "time" +) + +// AUAndTimestamp is an Access Unit and its timestamp. +type AUAndTimestamp struct { + Timestamp time.Duration + AU []byte +} diff --git a/pkg/rtpaac/rtpaac_test.go b/pkg/rtpaac/rtpaac_test.go new file mode 100644 index 00000000..a9ecf9c8 --- /dev/null +++ b/pkg/rtpaac/rtpaac_test.go @@ -0,0 +1,80 @@ +package rtpaac + +import ( + "bytes" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func mergeBytes(vals ...[]byte) []byte { + size := 0 + for _, v := range vals { + size += len(v) + } + res := make([]byte, size) + + pos := 0 + for _, v := range vals { + n := copy(res[pos:], v) + pos += n + } + + return res +} + +var cases = []struct { + name string + dec *AUAndTimestamp + enc []byte +}{ + { + "single", + &AUAndTimestamp{ + Timestamp: 25 * time.Millisecond, + AU: bytes.Repeat([]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, 8), + }, + mergeBytes( + []byte{ + 0x80, 0xe0, 0x44, 0xed, 0x88, 0x77, 0x6b, 0x05, + 0x9d, 0xbb, 0x78, 0x12, 0x00, 0x10, 0x02, 0x00, + }, + bytes.Repeat([]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07}, 8), + ), + }, +} + +func TestEncode(t *testing.T) { + for _, ca := range cases { + t.Run(ca.name, func(t *testing.T) { + sequenceNumber := uint16(0x44ed) + ssrc := uint32(0x9dbb7812) + initialTs := uint32(0x88776655) + e := NewEncoder(96, 48000, &sequenceNumber, &ssrc, &initialTs) + enc, err := e.Encode(ca.dec) + require.NoError(t, err) + require.Equal(t, ca.enc, enc) + }) + } +} + +func TestDecode(t *testing.T) { + for _, ca := range cases { + t.Run(ca.name, func(t *testing.T) { + d := NewDecoder(48000) + + // send an initial packet downstream + // in order to correctly compute the timestamp + _, err := d.Decode([]byte{ + 0x80, 0xe0, 0x44, 0xed, 0x88, 0x77, 0x66, 0x55, + 0x9d, 0xbb, 0x78, 0x12, 0x00, 0x10, 0x00, 0x08, 0x0, + }) + require.NoError(t, err) + + dec, err := d.Decode(ca.enc) + require.NoError(t, err) + require.Equal(t, ca.dec, dec) + }) + } +} diff --git a/pkg/rtph264/decoder.go b/pkg/rtph264/decoder.go index 8e2fe0f0..fec48b1b 100644 --- a/pkg/rtph264/decoder.go +++ b/pkg/rtph264/decoder.go @@ -33,13 +33,13 @@ const ( // Decoder is a RTP/H264 decoder. type Decoder struct { - state decoderState initialTs uint32 initialTsSet bool + state decoderState fragmentedBuf []byte } -// NewDecoder creates a decoder around a Reader. +// NewDecoder allocates a Decoder. func NewDecoder() *Decoder { return &Decoder{} } diff --git a/pkg/rtph264/encoder.go b/pkg/rtph264/encoder.go index 7b14fb07..4f4cd080 100644 --- a/pkg/rtph264/encoder.go +++ b/pkg/rtph264/encoder.go @@ -21,8 +21,10 @@ type Encoder struct { } // NewEncoder allocates an Encoder. -func NewEncoder(payloadType uint8, sequenceNumber *uint16, - ssrc *uint32, initialTs *uint32) *Encoder { +func NewEncoder(payloadType uint8, + sequenceNumber *uint16, + ssrc *uint32, + initialTs *uint32) *Encoder { return &Encoder{ payloadType: payloadType, sequenceNumber: func() uint16 { diff --git a/pkg/rtph264/rtph264.go b/pkg/rtph264/rtph264.go index 9ccedd60..52227618 100644 --- a/pkg/rtph264/rtph264.go +++ b/pkg/rtph264/rtph264.go @@ -5,7 +5,7 @@ import ( "time" ) -// NALUAndTimestamp is a NALU and an associated timestamp. +// NALUAndTimestamp is a Network Abstraction Layer Unit and its timestamp. type NALUAndTimestamp struct { Timestamp time.Duration NALU []byte diff --git a/pkg/rtph264/rtph264_test.go b/pkg/rtph264/rtph264_test.go index fed2f252..da40997d 100644 --- a/pkg/rtph264/rtph264_test.go +++ b/pkg/rtph264/rtph264_test.go @@ -2,6 +2,7 @@ package rtph264 import ( "bytes" + "io" "testing" "time" @@ -104,30 +105,31 @@ func TestDecode(t *testing.T) { t.Run(ca.name, func(t *testing.T) { i := 0 r := readerFunc(func(p []byte) (int, error) { - if i == 0 { - // send an initial packet downstream - // in order to correctly compute the timestamp - n := copy(p, []byte{ - 0x80, 0xe0, 0x44, 0xed, 0x88, 0x77, 0x66, 0x55, - 0x9d, 0xbb, 0x78, 0x12, 0x06, 0x00, - }) - i++ - return n, nil + if i == len(ca.enc) { + return 0, io.EOF } - n := copy(p, ca.enc[i-1]) + n := copy(p, ca.enc[i]) i++ return n, nil }) d := NewDecoder() - _, err := d.Read(r) + // send an initial packet downstream + // in order to correctly compute the timestamp + _, err := d.Decode([]byte{ + 0x80, 0xe0, 0x44, 0xed, 0x88, 0x77, 0x66, 0x55, + 0x9d, 0xbb, 0x78, 0x12, 0x06, 0x00, + }) require.NoError(t, err) dec, err := d.Read(r) require.NoError(t, err) require.Equal(t, ca.dec, dec) + + _, err = d.Read(r) + require.Equal(t, io.EOF, err) }) } }