From 84fef0761a2d25cff1b7a94203a7867a6ccd4dc1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E5=AE=87=E7=BF=94?= <11123448@vivo.xyz> Date: Fri, 9 Jul 2021 17:47:07 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A7=A3=E5=86=B3=E6=89=A9=E7=BC=A9ringbuffer?= =?UTF-8?q?=E7=9A=84bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- stream.go | 7 ------- subscriber.go | 5 ++--- video_track.go | 38 ++++++++++++++++++++------------------ 3 files changed, 22 insertions(+), 28 deletions(-) diff --git a/stream.go b/stream.go index ff27301..4408459 100644 --- a/stream.go +++ b/stream.go @@ -63,7 +63,6 @@ type Stream struct { subscribeMutex sync.Mutex timeout *time.Timer //更新时间用来做超时处理 Close func() `json:"-"` - prePayload uint32 //需要预拼装ByteStream格式的数据的订阅者数量 } func (r *Stream) Update() { @@ -134,9 +133,6 @@ func (r *Stream) Subscribe(s *Subscriber) { utils.Print(Sprintf(Yellow("subscribe :%s %s,to Stream %s"), Blue(s.Type), Cyan(s.ID), BrightCyan(r.StreamPath))) s.Context, s.cancel = context.WithCancel(r) r.subscribeMutex.Lock() - if s.ByteStreamFormat { - r.prePayload++ - } r.Subscribers = append(r.Subscribers, s) r.subscribeMutex.Unlock() utils.Print(Sprintf(Yellow("%s subscriber %s added remains:%d"), BrightCyan(r.StreamPath), Cyan(s.ID), Blue(len(r.Subscribers)))) @@ -149,9 +145,6 @@ func (r *Stream) UnSubscribe(s *Subscriber) { if r.Err() == nil { var deleted bool r.subscribeMutex.Lock() - if s.ByteStreamFormat { - r.prePayload-- - } r.Subscribers, deleted = DeleteSliceItem_Subscriber(r.Subscribers, s) r.subscribeMutex.Unlock() if deleted { diff --git a/subscriber.go b/subscriber.go index 98b9353..d22f59a 100644 --- a/subscriber.go +++ b/subscriber.go @@ -25,7 +25,6 @@ type Subscriber struct { SubscribeArgs url.Values OnAudio func(pack AudioPack) `json:"-"` OnVideo func(pack VideoPack) `json:"-"` - ByteStreamFormat bool closeOnce sync.Once } @@ -137,7 +136,7 @@ func (s *Subscriber) PlayAudio(at *AudioTrack) { } } send = func() { - if s.OnAudio(ap.Copy(startTimestamp)); at.lastTs -ap.Timestamp > 1000 { + if s.OnAudio(ap.Copy(startTimestamp)); at.lastTs-ap.Timestamp > 1000 { action = drop } } @@ -181,7 +180,7 @@ func (s *Subscriber) PlayVideo(vt *VideoTrack) { } } send = func() { - if s.OnVideo(vp.Copy(startTimestamp)); vt.lastTs - vp.Timestamp > 1000 { + if s.OnVideo(vp.Copy(startTimestamp)); vt.lastTs-vp.Timestamp > 1000 { action = drop } } diff --git a/video_track.go b/video_track.go index b607ad9..fdd4c90 100644 --- a/video_track.go +++ b/video_track.go @@ -47,13 +47,6 @@ type VideoTrack struct { idrCount int //处于缓冲中的关键帧数量 } -func (vt *VideoTrack) initVideoRing(v interface{}) { - pack := new(VideoPack) - if vt.writeByteStream != nil { - pack.Buffer = bytes.NewBuffer([]byte{}) - } - v.(*RingItem).Value = pack -} func (s *Stream) NewVideoTrack(codec byte) (vt *VideoTrack) { var cancel context.CancelFunc vt = &VideoTrack{ @@ -66,10 +59,14 @@ func (s *Stream) NewVideoTrack(codec byte) (vt *VideoTrack) { vt.revIDR = func() { vt.idrCount++ current := vt.current() - l := vt.Ring.Len() - if vt.GOP = current.Sequence - idrSequence; vt.GOP < l-5 { - vt.Unlink(l - vt.GOP - 5) //缩小缓冲环节省内存 - //utils.Printf("%s gop:%d ring atrophy to %d", s.StreamPath, vt.GOP, l) + vt.GOP = current.Sequence - idrSequence + if l := vt.Ring.Len() - vt.GOP - 5; l > 5 { + //缩小缓冲环节省内存 + vt.Unlink(l).Do(func(v interface{}) { + if v.(*RingItem).Value.(*VideoPack).IDR { + vt.idrCount-- + } + }) } vt.IDRing = vt.Ring idrSequence = current.Sequence @@ -83,7 +80,9 @@ func (s *Stream) NewVideoTrack(codec byte) (vt *VideoTrack) { vt.Stream = s vt.CodecID = codec vt.Init(256) - vt.Do(vt.initVideoRing) + vt.Do(func(v interface{}) { + v.(*RingItem).Value = new(VideoPack) + }) vt.WaitIDR, cancel = context.WithCancel(context.Background()) switch codec { case 7: @@ -525,14 +524,17 @@ func (vt *VideoTrack) push(pack *VideoPack) { vt.revIDR() } vt.lastTs = pack.Timestamp - nextPack := vt.NextValue().(*VideoPack) - if nextPack.IDR { + if nextPack := vt.NextValue().(*VideoPack); nextPack.IDR { if vt.idrCount == 1 { - exRing := NewRingBuffer(5).Ring - exRing.Do(vt.initVideoRing) + exRing := ring.New(5) + for x := exRing; x.Value == nil; x = x.Next() { + pack := new(VideoPack) + x.Value = &RingItem{Value: pack} + if vt.writeByteStream != nil { + pack.Buffer = bytes.NewBuffer([]byte{}) + } + } vt.Link(exRing) // 扩大缓冲环 - //l := vt.Ring.Len() - //utils.Printf("%s ring grow to %d", vt.Stream.StreamPath, l) } else { vt.idrCount-- }