From ada9848dbd63ecde69fb2fea7351301a7c0c034e Mon Sep 17 00:00:00 2001 From: langhuihui <178529795@qq.com> Date: Sun, 4 Jul 2021 21:54:38 +0800 Subject: [PATCH] =?UTF-8?q?ring=20=E5=8A=9F=E8=83=BD=E4=BF=AE=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- audio_track.go | 1 + base_track.go | 6 +----- subscriber.go | 38 ++++++++++++++++++++------------------ video_track.go | 27 ++++++++++++++++----------- 4 files changed, 38 insertions(+), 34 deletions(-) diff --git a/audio_track.go b/audio_track.go index 7b65787..b635bbe 100644 --- a/audio_track.go +++ b/audio_track.go @@ -129,6 +129,7 @@ func (at *AudioTrack) push(pack *AudioPack) { at.bytes = 0 at.ts = pack.Timestamp } + at.lastTs = pack.Timestamp at.Step() } diff --git a/base_track.go b/base_track.go index ed82d6a..6b8f5ce 100644 --- a/base_track.go +++ b/base_track.go @@ -16,7 +16,6 @@ type Track interface { type AVPack interface { Since(uint32) uint32 - Distance(int) int } type BasePack struct { @@ -30,10 +29,6 @@ func (p *BasePack) Since(ts uint32) uint32 { return p.Timestamp - ts } -func (p *BasePack) Distance(sq int) int { - return p.Sequence - sq -} - type Track_Base struct { RingDisposable `json:"-"` Stream *Stream `json:"-"` @@ -42,6 +37,7 @@ type Track_Base struct { BPS int bytes int // GOP内的数据大小 ts uint32 // GOP起始时间戳 + lastTs uint32 //最新的时间戳 } func (t *Track_Base) GetBPS() { diff --git a/subscriber.go b/subscriber.go index adc7ae5..98b9353 100644 --- a/subscriber.go +++ b/subscriber.go @@ -85,7 +85,7 @@ func (s *Subscriber) Play(at *AudioTrack, vt *VideoTrack) { } vr := vt.SubRing(vt.IDRing) //从关键帧开始读取,首屏秒开 ar := at.Clone() - dropping := false //是否处于丢帧中 + // dropping := false //是否处于丢帧中 vp := vr.Read().(*VideoPack) ap := ar.Read().(*AudioPack) startTimestamp := vp.Timestamp @@ -97,23 +97,25 @@ func (s *Subscriber) Play(at *AudioTrack, vt *VideoTrack) { return default: if ap.Timestamp > vp.Timestamp || ap.Timestamp == 0 { - if !dropping { - s.OnVideo(vp.Copy(startTimestamp)) - if vt.CurrentValue().(AVPack).Since(vp.Timestamp) > 1000 { - dropping = true - } - } else if vp.IDR { - dropping = false - } + s.OnVideo(vp.Copy(startTimestamp)) + // if !dropping { + // s.OnVideo(vp.Copy(startTimestamp)) + // if vt.lastTs - vp.Timestamp > 1000 { + // dropping = true + // } + // } else if vp.IDR { + // dropping = false + // } vr.MoveNext() vp = vr.Read().(*VideoPack) } else { - if !dropping { - s.OnAudio(ap.Copy(startTimestamp)) - if at.CurrentValue().(AVPack).Since(ap.Timestamp) > 1000 { - dropping = true - } - } + s.OnAudio(ap.Copy(startTimestamp)) + // if !dropping { + // s.OnAudio(ap.Copy(startTimestamp)) + // if at.CurrentValue().(AVPack).Since(ap.Timestamp) > 1000 { + // dropping = true + // } + // } ar.MoveNext() ap = ar.Read().(*AudioPack) } @@ -128,14 +130,14 @@ func (s *Subscriber) PlayAudio(at *AudioTrack) { droped := 0 var action, send func() drop := func() { - if at.CurrentValue().(AVPack).Distance(ap.Sequence) < 4 { + if at.current().Sequence-ap.Sequence < 4 { action = send } else { droped++ } } send = func() { - if s.OnAudio(ap.Copy(startTimestamp)); at.CurrentValue().(AVPack).Since(ap.Timestamp) > 1000 { + if s.OnAudio(ap.Copy(startTimestamp)); at.lastTs -ap.Timestamp > 1000 { action = drop } } @@ -179,7 +181,7 @@ func (s *Subscriber) PlayVideo(vt *VideoTrack) { } } send = func() { - if s.OnVideo(vp.Copy(startTimestamp)); vt.CurrentValue().(AVPack).Since(vp.Timestamp) > 1000 { + if s.OnVideo(vp.Copy(startTimestamp)); vt.lastTs - vp.Timestamp > 1000 { action = drop } } diff --git a/video_track.go b/video_track.go index f999076..46ccd89 100644 --- a/video_track.go +++ b/video_track.go @@ -33,8 +33,7 @@ func (vp VideoPack) Copy(ts uint32) VideoPack { } type VideoTrack struct { - IDRing *ring.Ring //最近的关键帧位置,首屏渲染 - lastIDR *VideoPack + IDRing *ring.Ring //最近的关键帧位置,首屏渲染 Track_Base SPSInfo codec.SPSInfo GOP int //关键帧间隔 @@ -59,21 +58,25 @@ func (s *Stream) NewVideoTrack(codec byte) (vt *VideoTrack) { vt = &VideoTrack{ revIDR: func() { vt.IDRing = vt.Ring - vt.lastIDR = vt.CurrentValue().(*VideoPack) cancel() + idrSequence := vt.current().Sequence + l := vt.Ring.Len() vt.revIDR = func() { - current := vt.CurrentValue().(*VideoPack) - vt.GOP = current.Distance(vt.lastIDR.Sequence) - if l := vt.Ring.Len(); vt.IDRing.Value != vt.lastIDR { + current := vt.current() + if vt.GOP = current.Sequence - idrSequence; vt.GOP > l-1 { //缓冲环不够大,导致IDR被覆盖 exRing := NewRingBuffer(vt.GOP - l + 5).Ring exRing.Do(vt.initVideoRing) vt.Link(exRing) // 扩大缓冲环 + l = vt.Ring.Len() + utils.Printf("%s ring grow to %d", s.StreamPath, l) } else if vt.GOP < l-5 { vt.Unlink(l - vt.GOP - 5) //缩小缓冲环节省内存 + l = vt.Ring.Len() + utils.Printf("%s ring atrophy to %d", s.StreamPath, l) } vt.IDRing = vt.Ring - vt.lastIDR = current + idrSequence = current.Sequence vt.ts = current.Timestamp vt.bytes = 0 } @@ -492,15 +495,17 @@ func (vt *VideoTrack) pushByteStream(ts uint32, payload []byte) { } // 已完成序列帧组装,重置Push函数,从Payload中提取Nalu供非bytestream格式使用 vt.PushByteStream = func(ts uint32, payload []byte) { - pack := vt.CurrentValue().(*VideoPack) + pack := vt.current() if len(payload) < 4 { return } vt.bytes += len(payload) + pack.IDR = payload[0]>>4 == 1 pack.Timestamp = ts pack.Sequence = vt.PacketCount pack.Payload = payload pack.CompositionTime = utils.BigEndian.Uint24(payload[2:]) + pack.NALUs = nil for nalus := payload[5:]; len(nalus) > nalulenSize; { nalulen := 0 for i := 0; i < nalulenSize; i++ { @@ -522,9 +527,9 @@ func (vt *VideoTrack) push(pack *VideoPack) { vt.writeByteStream(pack) } vt.GetBPS() - pack.Sequence = vt.PacketCount - if pack.IDR { - defer vt.revIDR() + if pack.Sequence = vt.PacketCount; pack.IDR { + vt.revIDR() } + vt.lastTs = pack.Timestamp vt.Step() }