rtp*: move time decoder into own package

This commit is contained in:
aler9
2022-01-18 10:44:44 +01:00
parent 51ab931caa
commit 17f05635dd
6 changed files with 75 additions and 108 deletions

View File

@@ -7,6 +7,8 @@ import (
"time" "time"
"github.com/pion/rtp" "github.com/pion/rtp"
"github.com/aler9/gortsplib/pkg/rtptimedec"
) )
// ErrMorePacketsNeeded is returned when more packets are needed. // ErrMorePacketsNeeded is returned when more packets are needed.
@@ -14,13 +16,7 @@ var ErrMorePacketsNeeded = errors.New("need more packets")
// Decoder is a RTP/AAC decoder. // Decoder is a RTP/AAC decoder.
type Decoder struct { type Decoder struct {
clockRate time.Duration timeDecoder *rtptimedec.Decoder
tsAdd int64
tsInitial *int64
tsPrev *int64
// for Decode()
isDecodingFragmented bool isDecodingFragmented bool
fragmentedBuf []byte fragmentedBuf []byte
} }
@@ -28,26 +24,10 @@ type Decoder struct {
// NewDecoder allocates a Decoder. // NewDecoder allocates a Decoder.
func NewDecoder(clockRate int) *Decoder { func NewDecoder(clockRate int) *Decoder {
return &Decoder{ return &Decoder{
clockRate: time.Duration(clockRate), timeDecoder: rtptimedec.New(clockRate),
} }
} }
func (d *Decoder) decodeTimestamp(ts uint32) time.Duration {
ts64 := int64(ts) + d.tsAdd
if d.tsPrev != nil && (ts64-*d.tsPrev) < -0xFFFF {
ts64 += 0xFFFFFFFF
d.tsAdd += 0xFFFFFFFF
}
d.tsPrev = &ts64
if d.tsInitial == nil {
d.tsInitial = &ts64
}
return time.Duration(ts64-*d.tsInitial) * time.Second / d.clockRate
}
// Decode decodes AUs from a RTP/AAC packet. // Decode decodes AUs from a RTP/AAC packet.
// It returns the AUs and the PTS of the first AU. // It returns the AUs and the PTS of the first AU.
// The PTS of subsequent AUs can be calculated by adding time.Second*1000/clockRate. // The PTS of subsequent AUs can be calculated by adding time.Second*1000/clockRate.
@@ -100,7 +80,7 @@ func (d *Decoder) Decode(pkt *rtp.Packet) ([][]byte, time.Duration, error) {
pkt.Payload = pkt.Payload[dataLen:] pkt.Payload = pkt.Payload[dataLen:]
} }
return aus, d.decodeTimestamp(pkt.Timestamp), nil return aus, d.timeDecoder.Decode(pkt.Timestamp), nil
} }
if headersLen != 16 { if headersLen != 16 {
@@ -152,5 +132,5 @@ func (d *Decoder) Decode(pkt *rtp.Packet) ([][]byte, time.Duration, error) {
} }
d.isDecodingFragmented = false d.isDecodingFragmented = false
return [][]byte{d.fragmentedBuf}, d.decodeTimestamp(pkt.Timestamp), nil return [][]byte{d.fragmentedBuf}, d.timeDecoder.Decode(pkt.Timestamp), nil
} }

View File

@@ -255,35 +255,6 @@ func TestDecode(t *testing.T) {
} }
} }
func TestDecodeTimestampOverflow(t *testing.T) {
d := NewDecoder(90000)
var pts time.Duration
for _, ts := range []uint32{
4294877296,
90001,
3240090001,
565122706,
} {
pkt := rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 0x01,
Timestamp: ts,
SSRC: 0xba9da416,
},
Payload: []byte("\x00\x00"),
}
var err error
_, pts, err = d.Decode(&pkt)
require.NoError(t, err)
}
require.Equal(t, 15*60*60*time.Second+2*time.Second, pts)
}
func TestDecodeErrors(t *testing.T) { func TestDecodeErrors(t *testing.T) {
for _, ca := range []struct { for _, ca := range []struct {
name string name string

View File

@@ -11,6 +11,7 @@ import (
"github.com/pion/rtp" "github.com/pion/rtp"
"github.com/aler9/gortsplib/pkg/h264" "github.com/aler9/gortsplib/pkg/h264"
"github.com/aler9/gortsplib/pkg/rtptimedec"
) )
// ErrMorePacketsNeeded is returned when more packets are needed. // ErrMorePacketsNeeded is returned when more packets are needed.
@@ -36,11 +37,7 @@ func (r PacketConnReader) Read(p []byte) (int, error) {
// Decoder is a RTP/H264 decoder. // Decoder is a RTP/H264 decoder.
type Decoder struct { type Decoder struct {
tsAdd int64 timeDecoder *rtptimedec.Decoder
tsInitial *int64
tsPrev *int64
// for Decode()
startingPacketReceived bool startingPacketReceived bool
isDecodingFragmented bool isDecodingFragmented bool
fragmentedBuffer []byte fragmentedBuffer []byte
@@ -51,23 +48,9 @@ type Decoder struct {
// NewDecoder allocates a Decoder. // NewDecoder allocates a Decoder.
func NewDecoder() *Decoder { func NewDecoder() *Decoder {
return &Decoder{} return &Decoder{
timeDecoder: rtptimedec.New(90000),
} }
func (d *Decoder) decodeTimestamp(ts uint32) time.Duration {
ts64 := int64(ts) + d.tsAdd
if d.tsPrev != nil && (ts64-*d.tsPrev) < -0xFFFF {
ts64 += 0xFFFFFFFF
d.tsAdd += 0xFFFFFFFF
}
d.tsPrev = &ts64
if d.tsInitial == nil {
d.tsInitial = &ts64
}
return time.Duration(ts64-*d.tsInitial) * time.Second / rtpClockRate
} }
// Decode decodes NALUs from a RTP/H264 packet. // Decode decodes NALUs from a RTP/H264 packet.
@@ -110,7 +93,7 @@ func (d *Decoder) Decode(pkt *rtp.Packet) ([][]byte, time.Duration, error) {
} }
d.startingPacketReceived = true d.startingPacketReceived = true
return nalus, d.decodeTimestamp(pkt.Timestamp), nil return nalus, d.timeDecoder.Decode(pkt.Timestamp), nil
case naluTypeFUA: // first packet of a fragmented NALU case naluTypeFUA: // first packet of a fragmented NALU
if len(pkt.Payload) < 2 { if len(pkt.Payload) < 2 {
@@ -139,7 +122,7 @@ func (d *Decoder) Decode(pkt *rtp.Packet) ([][]byte, time.Duration, error) {
} }
d.startingPacketReceived = true d.startingPacketReceived = true
return [][]byte{pkt.Payload}, d.decodeTimestamp(pkt.Timestamp), nil return [][]byte{pkt.Payload}, d.timeDecoder.Decode(pkt.Timestamp), nil
} }
// we are decoding a fragmented NALU // we are decoding a fragmented NALU
@@ -171,7 +154,7 @@ func (d *Decoder) Decode(pkt *rtp.Packet) ([][]byte, time.Duration, error) {
d.isDecodingFragmented = false d.isDecodingFragmented = false
d.startingPacketReceived = true d.startingPacketReceived = true
return [][]byte{d.fragmentedBuffer}, d.decodeTimestamp(pkt.Timestamp), nil return [][]byte{d.fragmentedBuffer}, d.timeDecoder.Decode(pkt.Timestamp), nil
} }
// DecodeUntilMarker decodes NALUs from a RTP/H264 packet and puts them in a buffer. // DecodeUntilMarker decodes NALUs from a RTP/H264 packet and puts them in a buffer.

View File

@@ -259,35 +259,6 @@ func TestDecode(t *testing.T) {
} }
} }
func TestDecodeTimestampOverflow(t *testing.T) {
d := NewDecoder()
var pts time.Duration
for _, ts := range []uint32{
4294877296,
90001,
3240090001,
565122706,
} {
pkt := rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 0x01,
Timestamp: ts,
SSRC: 0xba9da416,
},
Payload: []byte("\x00\x00"),
}
var err error
_, pts, err = d.Decode(&pkt)
require.NoError(t, err)
}
require.Equal(t, 15*60*60*time.Second+2*time.Second, pts)
}
func TestDecodePartOfFragmentedBeforeSingle(t *testing.T) { func TestDecodePartOfFragmentedBeforeSingle(t *testing.T) {
d := NewDecoder() d := NewDecoder()

38
pkg/rtptimedec/decoder.go Normal file
View File

@@ -0,0 +1,38 @@
// Package rtptimedec contains a RTP timestamp decoder.
package rtptimedec
import (
"time"
)
// Decoder is a RTP timestamp decoder.
type Decoder struct {
clockRate time.Duration
tsAdd int64
tsInitial *int64
tsPrev *int64
}
// New allocates a Decoder.
func New(clockRate int) *Decoder {
return &Decoder{
clockRate: time.Duration(clockRate),
}
}
// Decode decodes a RTP timestamp.
func (d *Decoder) Decode(ts uint32) time.Duration {
ts64 := int64(ts) + d.tsAdd
if d.tsPrev != nil && (ts64-*d.tsPrev) < -0xFFFF {
ts64 += 0xFFFFFFFF
d.tsAdd += 0xFFFFFFFF
}
d.tsPrev = &ts64
if d.tsInitial == nil {
d.tsInitial = &ts64
}
return time.Duration(ts64-*d.tsInitial) * time.Second / d.clockRate
}

View File

@@ -0,0 +1,24 @@
package rtptimedec
import (
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestOverflow(t *testing.T) {
d := New(90000)
var pts time.Duration
for _, ts := range []uint32{
4294877296,
90001,
3240090001,
565122706,
} {
pts = d.Decode(ts)
}
require.Equal(t, 15*60*60*time.Second+2*time.Second, pts)
}