修复DTS生成时遇到PTS超过循环的情况

This commit is contained in:
dexter
2023-01-30 22:17:34 +08:00
parent db6f8de3a7
commit b306f65a7a
10 changed files with 52 additions and 34 deletions

View File

@@ -25,24 +25,28 @@ func (d *DTSEstimator) Clone() *DTSEstimator {
func (d *DTSEstimator) add(pts uint32) {
i := 0
if len(d.cache) >= 4 {
i = len(d.cache) - 3
l := len(d.cache)
if l >= 4 {
l--
// i = l - 3
d.cache = append(d.cache[:0], d.cache[1:]...)[:l]
}
var new_cache []uint32
for ; i < len(d.cache); i = i + 1 {
for ; i < l; i = i + 1 {
if d.cache[i] > pts {
break
}
new_cache = append(new_cache, d.cache[i])
}
new_cache = append(new_cache, pts)
new_cache = append(new_cache, d.cache[i:]...)
d.cache = new_cache
d.cache = append(d.cache, pts)
d.cache = append(d.cache[:i+1], d.cache[i:l]...)
d.cache[i] = pts
}
// Feed provides PTS to the estimator, and returns the estimated DTS.
func (d *DTSEstimator) Feed(pts uint32) uint32 {
if pts < d.prevPTS && d.prevPTS-pts > 0x80000000 {
*d = *NewDTSEstimator()
}
d.add(pts)
dts := pts
if !d.hasB {

View File

@@ -0,0 +1,17 @@
package common
import (
"testing"
)
func TestDts(t *testing.T) {
t.Run(t.Name(), func(t *testing.T) {
dtsg := NewDTSEstimator()
var pts uint32 = 0xFFFFFFFF - 5
for i := 0; i < 10; i++ {
dts := dtsg.Feed(pts)
pts++
t.Logf("dts=%d", dts)
}
})
}

View File

@@ -88,11 +88,6 @@ func (av *AVFrame) Reset() {
av.DeltaTime = 0
}
type SequenceHead struct {
AVCC []byte
Seq int //收到第几个序列帧,用于变码率时让订阅者发送序列帧
}
type ParamaterSets [][]byte
func (v ParamaterSets) GetAnnexB() (r net.Buffers) {

View File

@@ -286,8 +286,8 @@ func (opt *Plugin) Pull(streamPath string, url string, puller IPuller, save int)
time.Sleep(time.Second * 5)
} else {
if err = opt.Publish(streamPath, puller); err != nil {
if puber := Streams.Get(streamPath).Publisher; puber != puller && puber != nil {
io := puber.GetPublisher()
if stream := Streams.Get(streamPath); stream != nil && stream.Publisher != puller && stream.Publisher != nil {
io := stream.Publisher.GetPublisher()
opt.Error("puller is not publisher", zap.String("ID", io.ID), zap.String("Type", io.Type), zap.Error(err))
return
}

View File

@@ -39,8 +39,7 @@ func (vt *H264) WriteSliceBytes(slice []byte) {
vt.ParamaterSets[1] = slice
lenSPS := len(vt.Video.SPS)
lenPPS := len(vt.Video.PPS)
b := util.Buffer(vt.SequenceHead)
b.Reset()
var b util.Buffer
if lenSPS > 3 {
b.Write(codec.RTMP_AVC_HEAD[:6])
b.Write(vt.Video.SPS[1:4])
@@ -51,8 +50,10 @@ func (vt *H264) WriteSliceBytes(slice []byte) {
b.WriteByte(0xE1)
b.WriteUint16(uint16(lenSPS))
b.Write(vt.Video.SPS)
b.WriteByte(0x01)
b.WriteUint16(uint16(lenPPS))
b.Write(vt.Video.PPS)
vt.SequenceHead = b
vt.updateSequeceHead()
case codec.NALU_IDR_Picture:
vt.Value.IFrame = true
@@ -119,7 +120,7 @@ func (vt *H264) WriteRTPFrame(frame *RTPFrame) {
if util.Bit1(frame.Payload[1], 0) {
vt.WriteSliceByte(naluType.Parse(frame.Payload[1]).Or(frame.Payload[0] & 0x60))
}
rv.AUList.Push(vt.BytesPool.GetShell(frame.Payload[naluType.Offset():]))
rv.AUList.Pre.Value.Push(vt.BytesPool.GetShell(frame.Payload[naluType.Offset():]))
}
}
frame.SequenceNumber += vt.rtpSequence //增加偏移需要增加rtp包后需要顺延

View File

@@ -111,12 +111,7 @@ func (vt *H265) WriteRTPFrame(frame *RTPFrame) {
if naluType := fuHeader & 0b00111111; util.Bit1(fuHeader, 0) {
vt.WriteSliceByte(first3[0]&0b10000001|(naluType<<1), first3[1])
}
rv.AUList.Push(vt.BytesPool.GetShell(buffer))
// if util.Bit1(fuHeader, 1) {
// complete := rv.Raw[lastIndex] //拼接完成
// rv.Raw = rv.Raw[:lastIndex] // 缩短一个元素,因为后面的方法会加回去
// vt.WriteSlice(complete)
// }
rv.AUList.Pre.Value.Push(vt.BytesPool.GetShell(buffer))
default:
vt.WriteSliceBytes(frame.Payload)
}

View File

@@ -179,12 +179,12 @@ func (vt *Video) CompleteAVCC(rv *AVFrame) {
// 写入CTS
util.PutBE(b[2:5], (rv.PTS-rv.DTS)/90)
rv.AVCC.Push(mem)
vt.Value.AUList.Range(func(au *util.BLL) bool {
rv.AUList.Range(func(au *util.BLL) bool {
mem = vt.BytesPool.Get(4)
util.PutBE(mem.Value, uint32(au.ByteLength))
vt.Value.AVCC.Push(mem)
rv.AVCC.Push(mem)
au.Range(func(slice util.BLI) bool {
vt.Value.AVCC.Push(vt.BytesPool.GetShell(slice))
rv.AVCC.Push(vt.BytesPool.GetShell(slice))
return true
})
return true

View File

@@ -83,7 +83,7 @@ type AMF struct {
Buffer
}
func ReadAMF[T any](amf *AMF) (result T) {
func ReadAMF[T string | float64 | bool | map[string]any](amf *AMF) (result T) {
value, err := amf.Unmarshal()
if err != nil {
return

View File

@@ -82,7 +82,7 @@ func (p *List[T]) Range(do func(value T) bool) {
}
func (p *List[T]) Recycle() {
for item := p.Next; item != nil && !item.IsRoot(); {
for item := p.Next; item != nil && item.list != nil && !item.IsRoot(); {
next := item.Next
item.Recycle()
item = next

View File

@@ -78,7 +78,7 @@ func (r *BLLsReader) CanRead() bool {
return r.ListItem != nil && !r.IsRoot()
}
func (r *BLLsReader) ReadByte() (b byte, err error) {
func (r *BLLsReader) ReadByte() (b byte, err error) {
if r.BLLReader.CanRead() {
b, err = r.BLLReader.ReadByte()
if err == nil {
@@ -92,6 +92,7 @@ func (r *BLLsReader) ReadByte() (b byte, err error) {
r.BLLReader = *r.Value.NewReader()
return r.BLLReader.ReadByte()
}
type BLLs struct {
List[*BLL]
ByteLength int
@@ -257,6 +258,9 @@ type BytesPool []List[BLI]
// 获取来自真实内存的切片的——假内存块,即只回收外壳
func (p BytesPool) GetShell(b []byte) (item *ListItem[BLI]) {
if len(p) == 0 {
return &ListItem[BLI]{Value: b}
}
if p[0].Length > 0 {
item = p[0].Shift()
} else {
@@ -275,17 +279,19 @@ func (p BytesPool) Get(size int) (item *ListItem[BLI]) {
item = p[i].Shift()
item.Value = item.Value[:size]
} else {
item = &ListItem[BLI]{}
item = &ListItem[BLI]{
Value: make([]byte, size, level),
}
}
item.Pool = &p[i]
item.Value = make([]byte, size, level)
return
}
}
// Pool 中没有就无法回收
if item == nil {
item = &ListItem[BLI]{}
item.Value = make([]byte, size)
item = &ListItem[BLI]{
Value: make([]byte, size),
}
}
return
}