diff --git a/common/dtsestimator.go b/common/dtsestimator.go index b3cf8ce..a7ee56c 100644 --- a/common/dtsestimator.go +++ b/common/dtsestimator.go @@ -25,24 +25,28 @@ func (d *DTSEstimator) Clone() *DTSEstimator { func (d *DTSEstimator) add(pts uint32) { i := 0 - if len(d.cache) >= 4 { - i = len(d.cache) - 3 + l := len(d.cache) + if l >= 4 { + l-- + // i = l - 3 + d.cache = append(d.cache[:0], d.cache[1:]...)[:l] } - var new_cache []uint32 - for ; i < len(d.cache); i = i + 1 { + for ; i < l; i = i + 1 { if d.cache[i] > pts { break } - new_cache = append(new_cache, d.cache[i]) } - new_cache = append(new_cache, pts) - new_cache = append(new_cache, d.cache[i:]...) - d.cache = new_cache + d.cache = append(d.cache, pts) + d.cache = append(d.cache[:i+1], d.cache[i:l]...) + d.cache[i] = pts } // Feed provides PTS to the estimator, and returns the estimated DTS. func (d *DTSEstimator) Feed(pts uint32) uint32 { + if pts < d.prevPTS && d.prevPTS-pts > 0x80000000 { + *d = *NewDTSEstimator() + } d.add(pts) dts := pts if !d.hasB { diff --git a/common/dtsestimator_test.go b/common/dtsestimator_test.go new file mode 100644 index 0000000..1aa0d55 --- /dev/null +++ b/common/dtsestimator_test.go @@ -0,0 +1,17 @@ +package common + +import ( + "testing" +) + +func TestDts(t *testing.T) { + t.Run(t.Name(), func(t *testing.T) { + dtsg := NewDTSEstimator() + var pts uint32 = 0xFFFFFFFF - 5 + for i := 0; i < 10; i++ { + dts := dtsg.Feed(pts) + pts++ + t.Logf("dts=%d", dts) + } + }) +} diff --git a/common/frame.go b/common/frame.go index 2ce95ca..5d7071f 100644 --- a/common/frame.go +++ b/common/frame.go @@ -88,11 +88,6 @@ func (av *AVFrame) Reset() { av.DeltaTime = 0 } -type SequenceHead struct { - AVCC []byte - Seq int //收到第几个序列帧,用于变码率时让订阅者发送序列帧 -} - type ParamaterSets [][]byte func (v ParamaterSets) GetAnnexB() (r net.Buffers) { diff --git a/plugin.go b/plugin.go index a155677..5a3ad21 100644 --- a/plugin.go +++ b/plugin.go @@ -286,8 +286,8 @@ func (opt *Plugin) Pull(streamPath string, url string, puller IPuller, save int) time.Sleep(time.Second * 5) } else { if err = opt.Publish(streamPath, puller); err != nil { - if puber := Streams.Get(streamPath).Publisher; puber != puller && puber != nil { - io := puber.GetPublisher() + if stream := Streams.Get(streamPath); stream != nil && stream.Publisher != puller && stream.Publisher != nil { + io := stream.Publisher.GetPublisher() opt.Error("puller is not publisher", zap.String("ID", io.ID), zap.String("Type", io.Type), zap.Error(err)) return } diff --git a/track/h264.go b/track/h264.go index 852c207..490e8b8 100644 --- a/track/h264.go +++ b/track/h264.go @@ -39,8 +39,7 @@ func (vt *H264) WriteSliceBytes(slice []byte) { vt.ParamaterSets[1] = slice lenSPS := len(vt.Video.SPS) lenPPS := len(vt.Video.PPS) - b := util.Buffer(vt.SequenceHead) - b.Reset() + var b util.Buffer if lenSPS > 3 { b.Write(codec.RTMP_AVC_HEAD[:6]) b.Write(vt.Video.SPS[1:4]) @@ -51,8 +50,10 @@ func (vt *H264) WriteSliceBytes(slice []byte) { b.WriteByte(0xE1) b.WriteUint16(uint16(lenSPS)) b.Write(vt.Video.SPS) + b.WriteByte(0x01) b.WriteUint16(uint16(lenPPS)) b.Write(vt.Video.PPS) + vt.SequenceHead = b vt.updateSequeceHead() case codec.NALU_IDR_Picture: vt.Value.IFrame = true @@ -119,7 +120,7 @@ func (vt *H264) WriteRTPFrame(frame *RTPFrame) { if util.Bit1(frame.Payload[1], 0) { vt.WriteSliceByte(naluType.Parse(frame.Payload[1]).Or(frame.Payload[0] & 0x60)) } - rv.AUList.Push(vt.BytesPool.GetShell(frame.Payload[naluType.Offset():])) + rv.AUList.Pre.Value.Push(vt.BytesPool.GetShell(frame.Payload[naluType.Offset():])) } } frame.SequenceNumber += vt.rtpSequence //增加偏移,需要增加rtp包后需要顺延 diff --git a/track/h265.go b/track/h265.go index 6394e6d..8579d2d 100644 --- a/track/h265.go +++ b/track/h265.go @@ -111,12 +111,7 @@ func (vt *H265) WriteRTPFrame(frame *RTPFrame) { if naluType := fuHeader & 0b00111111; util.Bit1(fuHeader, 0) { vt.WriteSliceByte(first3[0]&0b10000001|(naluType<<1), first3[1]) } - rv.AUList.Push(vt.BytesPool.GetShell(buffer)) - // if util.Bit1(fuHeader, 1) { - // complete := rv.Raw[lastIndex] //拼接完成 - // rv.Raw = rv.Raw[:lastIndex] // 缩短一个元素,因为后面的方法会加回去 - // vt.WriteSlice(complete) - // } + rv.AUList.Pre.Value.Push(vt.BytesPool.GetShell(buffer)) default: vt.WriteSliceBytes(frame.Payload) } diff --git a/track/video.go b/track/video.go index 5d7b134..8ab4c21 100644 --- a/track/video.go +++ b/track/video.go @@ -179,12 +179,12 @@ func (vt *Video) CompleteAVCC(rv *AVFrame) { // 写入CTS util.PutBE(b[2:5], (rv.PTS-rv.DTS)/90) rv.AVCC.Push(mem) - vt.Value.AUList.Range(func(au *util.BLL) bool { + rv.AUList.Range(func(au *util.BLL) bool { mem = vt.BytesPool.Get(4) util.PutBE(mem.Value, uint32(au.ByteLength)) - vt.Value.AVCC.Push(mem) + rv.AVCC.Push(mem) au.Range(func(slice util.BLI) bool { - vt.Value.AVCC.Push(vt.BytesPool.GetShell(slice)) + rv.AVCC.Push(vt.BytesPool.GetShell(slice)) return true }) return true diff --git a/util/amf.go b/util/amf.go index 498f3c3..977308d 100644 --- a/util/amf.go +++ b/util/amf.go @@ -83,7 +83,7 @@ type AMF struct { Buffer } -func ReadAMF[T any](amf *AMF) (result T) { +func ReadAMF[T string | float64 | bool | map[string]any](amf *AMF) (result T) { value, err := amf.Unmarshal() if err != nil { return diff --git a/util/list.go b/util/list.go index 6055e19..1da0dc2 100644 --- a/util/list.go +++ b/util/list.go @@ -82,7 +82,7 @@ func (p *List[T]) Range(do func(value T) bool) { } func (p *List[T]) Recycle() { - for item := p.Next; item != nil && !item.IsRoot(); { + for item := p.Next; item != nil && item.list != nil && !item.IsRoot(); { next := item.Next item.Recycle() item = next diff --git a/util/pool.go b/util/pool.go index 965b5f2..c0179a5 100644 --- a/util/pool.go +++ b/util/pool.go @@ -78,7 +78,7 @@ func (r *BLLsReader) CanRead() bool { return r.ListItem != nil && !r.IsRoot() } -func (r *BLLsReader) ReadByte() (b byte, err error) { +func (r *BLLsReader) ReadByte() (b byte, err error) { if r.BLLReader.CanRead() { b, err = r.BLLReader.ReadByte() if err == nil { @@ -92,6 +92,7 @@ func (r *BLLsReader) ReadByte() (b byte, err error) { r.BLLReader = *r.Value.NewReader() return r.BLLReader.ReadByte() } + type BLLs struct { List[*BLL] ByteLength int @@ -257,6 +258,9 @@ type BytesPool []List[BLI] // 获取来自真实内存的切片的——假内存块,即只回收外壳 func (p BytesPool) GetShell(b []byte) (item *ListItem[BLI]) { + if len(p) == 0 { + return &ListItem[BLI]{Value: b} + } if p[0].Length > 0 { item = p[0].Shift() } else { @@ -275,17 +279,19 @@ func (p BytesPool) Get(size int) (item *ListItem[BLI]) { item = p[i].Shift() item.Value = item.Value[:size] } else { - item = &ListItem[BLI]{} + item = &ListItem[BLI]{ + Value: make([]byte, size, level), + } } item.Pool = &p[i] - item.Value = make([]byte, size, level) return } } // Pool 中没有就无法回收 if item == nil { - item = &ListItem[BLI]{} - item.Value = make([]byte, size) + item = &ListItem[BLI]{ + Value: make([]byte, size), + } } return }