feat: fit wrong timestamp

This commit is contained in:
langhuihui
2024-01-04 19:01:19 +08:00
parent 453ea6a6f2
commit 048c7eb0a8
5 changed files with 95 additions and 15 deletions

View File

@@ -1,11 +1,14 @@
package common
import "m7s.live/engine/v4/util"
// DTSEstimator is a DTS estimator.
type DTSEstimator struct {
hasB bool
prevPTS uint32
prevDTS uint32
cache []uint32
interval uint32
}
// NewDTSEstimator allocates a DTSEstimator.
@@ -20,6 +23,7 @@ func (d *DTSEstimator) Clone() *DTSEstimator {
d.prevPTS,
d.prevDTS,
append([]uint32(nil), d.cache...),
d.interval,
}
}
@@ -44,9 +48,11 @@ func (d *DTSEstimator) add(pts uint32) {
// Feed provides PTS to the estimator, and returns the estimated DTS.
func (d *DTSEstimator) Feed(pts uint32) uint32 {
if pts < d.prevPTS && d.prevPTS-pts > 0x80000000 {
interval := util.Conditoinal(pts > d.prevPTS, pts-d.prevPTS, d.prevPTS-pts)
if interval > 10*d.interval {
*d = *NewDTSEstimator()
}
d.interval = interval
d.add(pts)
dts := pts
if !d.hasB {

View File

@@ -15,3 +15,14 @@ func TestDts(t *testing.T) {
}
})
}
func Test往前跳(t *testing.T) {
t.Run(t.Name(), func(t *testing.T) {
data := []uint32{64175310, 64178910, 64182510, 64186110, 64189710, 64340910, 64344510, 64348110, 64351710, 64355310, 64358910}
dtsg := NewDTSEstimator()
for _, pts := range data {
dts := dtsg.Feed(pts)
t.Logf("pts=%d,dts=%d", pts, dts)
}
})
}

View File

@@ -12,6 +12,8 @@ import (
"m7s.live/engine/v4/util"
)
var deltaDTSRange time.Duration = 90 * 10000 // 超过 10 秒
type 流速控制 struct {
起始时间戳 time.Duration
起始dts time.Duration
@@ -99,7 +101,7 @@ type Media struct {
RTPDemuxer
SpesificTrack `json:"-" yaml:"-"`
deltaTs time.Duration //用于接续发布后时间戳连续
deltaDTSRange time.Duration //DTS差的范围
流速控制
}
@@ -225,6 +227,7 @@ func (av *Media) AddIDR() {
func (av *Media) Flush() {
curValue, preValue, nextValue := av.Value, av.LastValue, av.Next()
useDts := curValue.Timestamp == 0
originDTS := curValue.DTS
if av.State == TrackStateOffline {
av.State = TrackStateOnline
if useDts {
@@ -256,14 +259,12 @@ func (av *Media) Flush() {
} else {
if useDts {
deltaDts := curValue.DTS - preValue.DTS
if deltaDts <= 0 && deltaDts > -(1<<15) {
// 生成一个无奈的deltaDts
deltaDts = 90
// 必须保证DTS递增
curValue.DTS = preValue.DTS + deltaDts
} else if deltaDts != 90 {
// 正常情况下生成容错范围
av.deltaDTSRange = deltaDts * 2
if deltaDts > deltaDTSRange || deltaDts < -deltaDTSRange {
// 时间戳跳变,等同于离线重连
av.deltaTs = deltaDts
curValue.DTS = preValue.DTS + 90
curValue.PTS = preValue.PTS + 90
av.Warn("track dts reset", zap.Int64("delta", int64(deltaDts)))
}
curValue.Timestamp = av.根据起始DTS计算绝对时间戳(curValue.DTS)
}
@@ -271,7 +272,7 @@ func (av *Media) Flush() {
curValue.DeltaTime = uint32(deltaTS(curValue.Timestamp, preValue.Timestamp) / time.Millisecond)
}
if log.Trace {
av.Trace("write", zap.Uint32("seq", curValue.Sequence), zap.Duration("dts", curValue.DTS), zap.Duration("dts delta", curValue.DTS-preValue.DTS), zap.Uint32("delta", curValue.DeltaTime), zap.Duration("timestamp", curValue.Timestamp), zap.Int("au", curValue.AUList.Length), zap.Int("rtp", curValue.RTP.Length), zap.Int("avcc", curValue.AVCC.ByteLength), zap.Int("raw", curValue.AUList.ByteLength), zap.Int("bps", av.BPS))
av.Trace("write", zap.Uint32("seq", curValue.Sequence), zap.Int64("dts0", int64(preValue.DTS)), zap.Int64("dts1", int64(originDTS)), zap.Uint64("dts2", uint64(curValue.DTS)), zap.Uint32("delta", curValue.DeltaTime), zap.Duration("timestamp", curValue.Timestamp), zap.Int("au", curValue.AUList.Length), zap.Int("rtp", curValue.RTP.Length), zap.Int("avcc", curValue.AVCC.ByteLength), zap.Int("raw", curValue.AUList.ByteLength), zap.Int("bps", av.BPS))
}
bufferTime := av.Stream.GetPublisherConfig().BufferTime
if bufferTime > 0 && av.IDRingList.IDRList.Length > 1 && deltaTS(curValue.Timestamp, av.IDRingList.IDRList.Next.Next.Value.Value.Timestamp) > bufferTime {

47
util/timestamp.go Normal file
View File

@@ -0,0 +1,47 @@
package util
// 时间戳评估,对于时间戳不规范的情况,需要纠正,使得时间戳自增,相对合理
// 定义一个全局结构来存储状态
type TimestampProcessor struct {
baseTimestamp int
lastTimestamp int
normalTotalTime int
averageInterval int
normalCount int
}
// 处理单个时间戳的函数
func (p *TimestampProcessor) ProcessTimestamp(timestamp int) int {
if p.normalCount == 0 {
// 处理第一个时间戳
p.baseTimestamp = timestamp
p.lastTimestamp = timestamp
p.normalCount = 1
return timestamp
}
delta := timestamp - p.lastTimestamp
// 计算当前间隔
currentInterval := Conditoinal(delta > 0, delta, -delta)
// 判断是否为突变
if p.averageInterval > 0 && currentInterval > 10*p.averageInterval {
// 突变,调整起始时间戳和相关累计信息
p.baseTimestamp = p.lastTimestamp
p.normalTotalTime = p.averageInterval
p.normalCount = 1
} else {
// 非突变,累加时间和更新计数
p.normalTotalTime += currentInterval
p.normalCount++
p.averageInterval = p.normalTotalTime / (p.normalCount - 1)
}
// 更新最后一个时间戳
p.lastTimestamp = timestamp
// 计算输出时间戳
outputTimestamp := p.baseTimestamp + p.normalTotalTime
return outputTimestamp
}

15
util/timestamp_test.go Normal file
View File

@@ -0,0 +1,15 @@
package util
import (
"testing"
)
func TestTimestamp(t *testing.T) {
t.Run(t.Name(), func(t *testing.T) {
var p TimestampProcessor
var testData = []int{0, 10, 20, 30, 40, 50, 60, 70, 80, 10, 20, 30, 40, 50, 60, 70, 80}
for _, v := range testData {
t.Log(p.ProcessTimestamp(v))
}
})
}