B帧优化,处理rtp时间戳跳跃

This commit is contained in:
dexter
2021-11-23 12:38:12 +08:00
parent 7942840094
commit bc359de7b8
6 changed files with 197 additions and 108 deletions

View File

@@ -24,9 +24,12 @@ type AudioTrack struct {
}
func (at *AudioTrack) pushByteStream(ts uint32, payload []byte) {
if len(payload) == 0 {
return
}
switch at.CodecID = payload[0] >> 4; at.CodecID {
case codec.CodecID_AAC:
if payload[1] != 0 {
if len(payload) < 4 || payload[1] != 0 {
return
} else {
config1, config2 := payload[2], payload[3]

52
dtsestimator.go Normal file
View File

@@ -0,0 +1,52 @@
package engine
// DTSEstimator is a DTS estimator.
type DTSEstimator struct {
prevDTS uint32
prevPTS uint32
prevPrevPTS uint32
dts func(uint32) uint32
}
func (d *DTSEstimator) _dts(pts uint32) uint32 {
// P or I frame
if pts > d.prevPTS {
// previous frame was B
// use the DTS of the previous frame
if d.prevPTS < d.prevPrevPTS {
return d.prevPTS
}
// previous frame was P or I
// use two frames ago plus a small quantity
// to avoid non-monotonous DTS with B-frames
return d.prevPrevPTS + 1
}
// B Frame
// increase by a small quantity
return d.prevDTS + 1
}
// NewDTSEstimator allocates a DTSEstimator.
func NewDTSEstimator() *DTSEstimator {
result := &DTSEstimator{}
result.dts = func(pts uint32) uint32 {
if pts > 0 {
result.dts = result._dts
}
return pts
}
return result
}
// Feed provides PTS to the estimator, and returns the estimated DTS.
func (d *DTSEstimator) Feed(pts uint32) uint32 {
dts := d.dts(pts)
d.prevPrevPTS = d.prevPTS
d.prevPTS = pts
d.prevDTS = dts
return dts
}

57
rtp.go Normal file
View File

@@ -0,0 +1,57 @@
package engine
import (
"time"
"github.com/Monibuca/utils/v3"
"github.com/pion/rtp"
)
// 对rtp包进行解封装并修复时间戳包括时间戳跳跃
type RTPDemuxer struct {
rtp.Packet
Reorder bool // 是否支持乱序重排
PTS uint32 // 修复后的时间戳(毫秒)
lastTs uint32 // 记录上一个收到的时间戳
lastSeq uint16 // 记录上一个收到的序列号
lastSeq2 uint16 // 记录上上一个收到的序列号
timeBase uint64 // 采样率
timestamp time.Time // 客观时间用于计算耗时
OnDemux func(uint32, []byte)
}
func (r *RTPDemuxer) Push(rtpRaw []byte) {
if err := r.Unmarshal(rtpRaw); err != nil {
utils.Println("RTP Unmarshal error", err)
return
}
// 本次序号是上上次的序号+1 说明中间隔了一个错误序号某些rtsp流中的rtcp包写成了rtp包导致的
if r.SequenceNumber == r.lastSeq2+1 {
r.lastSeq = r.SequenceNumber
} else {
r.lastSeq2 = r.lastSeq
r.lastSeq = r.SequenceNumber
if r.lastSeq != r.lastSeq2+1 { //序号不连续
utils.Println("RTP SequenceNumber error", r.lastSeq2, r.lastSeq)
return
}
}
if r.Timestamp > r.lastTs {
delta := uint32(uint64(r.Timestamp-r.lastTs) * 1000 / r.timeBase)
if delta > 1000 { // 时间戳跳跃
r.PTS += uint32(time.Since(r.timestamp) / time.Millisecond)
} else {
r.PTS += delta
}
} else if r.lastTs > r.Timestamp {
delta := uint32(uint64(r.lastTs-r.Timestamp) * 1000 / r.timeBase)
if delta > 1000 { // 时间戳跳跃
r.PTS += uint32(time.Since(r.timestamp) / time.Millisecond)
} else {
r.PTS -= delta
}
}
r.timestamp = time.Now()
r.OnDemux(r.PTS, r.Payload)
r.lastTs = r.Timestamp
}

View File

@@ -1,21 +1,14 @@
package engine
import (
"time"
"github.com/Monibuca/utils/v3"
"github.com/Monibuca/utils/v3/codec"
"github.com/pion/rtp"
)
type RTPPublisher struct {
rtp.Packet `json:"-"`
lastTs uint32
absTs uint32
lastSeq uint16
ts uint32 //毫秒单位的时间戳
demux func()
}
type RTPAudio struct {
RTPPublisher
RTPDemuxer `json:"-"`
*AudioTrack
}
@@ -23,48 +16,28 @@ func (s *Stream) NewRTPAudio(codec byte) (r *RTPAudio) {
r = &RTPAudio{
AudioTrack: s.NewAudioTrack(codec),
}
r.demux = r.push
r.OnDemux = r.push
return
}
func (v *RTPAudio) push() {
// 该函数只执行一次
func (v *RTPAudio) push(ts uint32, payload []byte) {
switch v.CodecID {
case codec.CodecID_AAC:
v.demux = func() {
for _, payload := range codec.ParseRTPAAC(v.Payload) {
v.PushRaw(v.ts, payload)
v.OnDemux = func(ts uint32, payload []byte) {
for _, payload := range codec.ParseRTPAAC(payload) {
v.PushRaw(ts, payload)
}
}
case codec.CodecID_PCMA, codec.CodecID_PCMU:
v.demux = func() {
v.PushRaw(v.ts, v.Payload)
v.OnDemux = func(ts uint32, payload []byte) {
v.PushRaw(ts, payload)
}
default:
utils.Println("RTP Publisher: Unsupported codec", v.CodecID)
return // TODO
}
v.demux()
}
func (p *RTPAudio) Push(payload []byte) {
if p.Unmarshal(payload) == nil {
if p.lastTs != 0 {
if p.SequenceNumber != p.lastSeq+1 {
println("RTP Publisher: SequenceNumber error", p.lastSeq, p.SequenceNumber)
return
} else {
// if p.lastTs > p.Timestamp {
// if p.lastTs-p.Timestamp > 100000 {
// p.absTs += (p.Timestamp)
// } else { //B frame
// p.absTs -= (p.lastTs - p.Timestamp)
// }
// } else {
// p.absTs += (p.Timestamp - p.lastTs)
// }
p.absTs += (p.Timestamp - p.lastTs)
p.ts = uint32(uint64(p.absTs) * 1000 / uint64(p.SoundRate))
}
}
p.lastTs = p.Timestamp
p.lastSeq = p.SequenceNumber
p.demux()
}
v.timeBase = uint64(v.AudioTrack.SoundRate)
v.timestamp = time.Now()
v.OnDemux(ts, payload)
}

View File

@@ -2,7 +2,6 @@ package engine
import (
"bytes"
"container/heap"
"encoding/binary"
"github.com/Monibuca/utils/v3"
@@ -33,7 +32,7 @@ type RTPNalu struct {
}
type RTPVideo struct {
RTPPublisher
RTPDemuxer `json:"-"`
*VideoTrack
fuaBuffer *bytes.Buffer
demuxNalu func([]byte) *RTPNalu
@@ -43,13 +42,14 @@ func (s *Stream) NewRTPVideo(codecID byte) (r *RTPVideo) {
r = &RTPVideo{
VideoTrack: s.NewVideoTrack(codecID),
}
r.timeBase = 90000
switch codecID {
case codec.CodecID_H264:
r.demuxNalu = r.demuxH264
case codec.CodecID_H265:
r.demuxNalu = r.demuxH265
}
r.demux = r._demux
r.OnDemux = r._demux
return
}
@@ -69,7 +69,7 @@ func (v *RTPVideo) demuxH264(payload []byte) (result *RTPNalu) {
utils.Printf("STAP-A declared size(%d) is larger then buffer(%d)", naluSize, naluLen-currOffset)
return
}
*current = &RTPNalu{Payload: payload[currOffset : currOffset+naluSize], PTS: v.Timestamp}
*current = &RTPNalu{Payload: payload[currOffset : currOffset+naluSize], PTS: v.PTS}
current = &(*current).Next
}
case codec.NALU_MTAP16, codec.NALU_MTAP24:
@@ -85,7 +85,7 @@ func (v *RTPVideo) demuxH264(payload []byte) (result *RTPNalu) {
if lenSize == 5 {
ts = (ts << 8) | uint16(payload[currOffset+5])
}
*current = &RTPNalu{Payload: payload[currOffset : currOffset+naluSize], PTS: v.Timestamp + uint32(ts)}
*current = &RTPNalu{Payload: payload[currOffset : currOffset+naluSize], PTS: v.PTS + uint32(ts)}
current = &(*current).Next
}
/*
@@ -133,12 +133,12 @@ func (v *RTPVideo) demuxH264(payload []byte) (result *RTPNalu) {
}
if v.fuaBuffer != nil {
if v.fuaBuffer.Write(payload[lenSize:]); payload[1]&fuaEndBitmask != 0 {
result = &RTPNalu{Payload: v.fuaBuffer.Bytes(), PTS: v.Timestamp}
result = &RTPNalu{Payload: v.fuaBuffer.Bytes(), PTS: v.PTS}
v.fuaBuffer = nil
}
}
default:
return &RTPNalu{Payload: payload, PTS: v.Timestamp}
return &RTPNalu{Payload: payload, PTS: v.PTS}
}
return
}
@@ -266,61 +266,65 @@ func (p *RTPVideo) demuxH265(payload []byte) (result *RTPNalu) {
return
}
func (p *RTPVideo) _demux() {
if last := p.demuxNalu(p.Payload); last != nil {
p.demux = func() {
if current := p.demuxNalu(p.Payload); current != nil {
if last.PTS > current.PTS { //有B帧
var b B
utils.Println("rtp has B-frame!!")
for heap.Push(&b, last); last.Next != nil; last = last.Next {
heap.Push(&b, last.Next)
}
for heap.Push(&b, current); current.Next != nil; current = current.Next {
heap.Push(&b, current.Next)
}
p.demux = func() {
if current := p.demuxNalu(p.Payload); current != nil {
if current.PTS > b.MaxTS {
for b.Len() > 0 {
el := heap.Pop(&b).(struct {
DTS uint32
*RTPNalu
})
p.absTs += (el.DTS - p.lastTs)
p.lastTs = el.DTS
p.PushNalu(p.absTs/90, (el.PTS/90 - el.DTS/90), el.Payload)
}
b.MaxTS = 0
}
for heap.Push(&b, current); current.Next != nil; current = current.Next {
heap.Push(&b, current.Next)
}
}
}
return
}
if p.lastTs != 0 {
p.absTs += (last.PTS - p.lastTs)
}
p.lastTs = last.PTS
p.PushNalu(p.absTs/90, 0, last.Payload)
for last = current; last.Next != nil; last = last.Next {
p.absTs += (last.PTS - p.lastTs)
p.lastTs = last.PTS
p.PushNalu(p.absTs/90, 0, last.Payload)
}
// func (p *RTPVideo) _demux(ts uint32, payload []byte) {
// p.timestamp = time.Now()
// if last := p.demuxNalu(payload); last != nil {
// p.OnDemux = func(ts uint32, payload []byte) {
// if current := p.demuxNalu(payload); current != nil {
// if last.PTS > current.PTS { //有B帧
// var b B
// utils.Println("rtp has B-frame!!")
// for heap.Push(&b, last); last.Next != nil; last = last.Next {
// heap.Push(&b, last.Next)
// }
// for heap.Push(&b, current); current.Next != nil; current = current.Next {
// heap.Push(&b, current.Next)
// }
// p.OnDemux = func(ts uint32, payload []byte) {
// if current := p.demuxNalu(payload); current != nil {
// if current.PTS > b.MaxTS {
// for b.Len() > 0 {
// el := heap.Pop(&b).(struct {
// DTS uint32
// *RTPNalu
// })
// p.PushNalu(el.DTS, (el.PTS - el.DTS), el.Payload)
// }
// b.MaxTS = 0
// }
// for heap.Push(&b, current); current.Next != nil; current = current.Next {
// heap.Push(&b, current.Next)
// }
// }
// }
// return
// }
// p.PushNalu(p.PTS, 0, last.Payload)
// for last = current; last.Next != nil; last = last.Next {
// p.PushNalu(p.PTS, 0, last.Payload)
// }
// }
// }
// }
// }
func (p *RTPVideo) _demux(ts uint32, payload []byte) {
if nalus := p.demuxNalu(payload); nalus != nil {
startPTS := nalus.PTS
dtsEst := NewDTSEstimator()
dts := dtsEst.Feed(0)
p.PushNalu(dts, 0, nalus.Payload)
for nalus = nalus.Next; nalus != nil; nalus = nalus.Next {
pts := nalus.PTS - startPTS
dts := dtsEst.Feed(pts)
p.PushNalu(dts, pts-dts, nalus.Payload)
}
p.OnDemux = func(ts uint32, payload []byte) {
for nalus := p.demuxNalu(p.Payload); nalus != nil; nalus = nalus.Next {
pts := nalus.PTS - startPTS
dts := dtsEst.Feed(pts)
p.PushNalu(dts, pts-dts, nalus.Payload)
}
}
}
}
func (p *RTPVideo) Push(payload []byte) {
if p.Unmarshal(payload) == nil {
if p.lastSeq > 0 && p.SequenceNumber != p.lastSeq+1 {
println("RTP Publisher: SequenceNumber error", p.lastSeq, p.SequenceNumber)
return
}
p.lastSeq = p.SequenceNumber
p.demux()
}
}

View File

@@ -372,7 +372,7 @@ func (vt *VideoTrack) setIDR(idr bool) {
vt.AVItem.Value = vt.keyFrameBuffers.Remove(cache)
vt.VideoPack = vt.AVItem.Value.(*VideoPack) //设置当前操作的指针
}
} else {//原来是关键帧,现在是非关键帧,把原来的关键帧缓存放回去
} else { //原来是关键帧,现在是非关键帧,把原来的关键帧缓存放回去
vt.keyFrameBuffers.PushBack(vt.AVItem.Value)
vt.VideoPack = new(VideoPack) //设置当前操作的指针
vt.AVItem.Value = vt.VideoPack