From 059bc20018d6aa64f144bc744b9a4cb72f0ba712 Mon Sep 17 00:00:00 2001 From: ydajiang Date: Sun, 8 Jun 2025 21:27:12 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20AVPacket=E4=BD=BF=E7=94=A8sync.Pool?= =?UTF-8?q?=E7=AE=A1=E7=90=86,=20=E5=87=8F=E5=B0=91=E5=86=85=E5=AD=98?= =?UTF-8?q?=E7=A2=8E=E7=89=87?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- stream/source.go | 19 +++++++++++++++++-- stream/stream_publisher.go | 18 +++++++++++------- 2 files changed, 28 insertions(+), 9 deletions(-) diff --git a/stream/source.go b/stream/source.go index d4325dd..e1f4f8e 100644 --- a/stream/source.go +++ b/stream/source.go @@ -211,6 +211,11 @@ func (s *PublishSource) DoClose() { s.TransDemuxer.Close() } + // 释放packet + for _, track := range s.originTracks.All() { + s.clearUnusedPackets(track.Packets) + } + // 等传输流发布器关闭结束 s.streamPublisher.close() @@ -334,13 +339,23 @@ func (s *PublishSource) OnPacket(packet *avformat.AVPacket) { s.streamPublisher.Post(&StreamEvent{StreamEventTypePacket, packetPtr}) // 释放未引用的AVPacket + s.clearUnusedPackets(packets) +} + +func (s *PublishSource) clearUnusedPackets(packets *collections.LinkedList[*collections.ReferenceCounter[*avformat.AVPacket]]) { for packets.Size() > 0 { if packets.Get(0).UseCount() > 1 { break } - packets.Remove(0).Release() - s.TransDemuxer.DiscardHeadPacket(packet.BufferIndex) + unusedPacketPtr := packets.Remove(0) + bufferIndex := unusedPacketPtr.Get().BufferIndex + // 引用计数减1 + unusedPacketPtr.Release() + // AVPacket放回池中, 减少AVPacket分配 + avformat.FreePacket(unusedPacketPtr.Get()) + // 释放AVPacket的Data缓冲区 + s.TransDemuxer.DiscardHeadPacket(bufferIndex) } } diff --git a/stream/stream_publisher.go b/stream/stream_publisher.go index 29b3b6e..b0db461 100644 --- a/stream/stream_publisher.go +++ b/stream/stream_publisher.go @@ -464,7 +464,7 @@ func (t *transStreamPublisher) FindSink(id SinkID) Sink { return result } -func (t *transStreamPublisher) cleanupSinkStreaming(sink Sink) { +func (t *transStreamPublisher) clearSinkStreaming(sink Sink) { transStreamSinks := t.transStreamSinks[sink.GetTransStreamID()] delete(transStreamSinks, sink.GetID()) t.lastStreamEndTime = time.Now() @@ -472,7 +472,7 @@ func (t *transStreamPublisher) cleanupSinkStreaming(sink Sink) { } func (t *transStreamPublisher) doRemoveSink(sink Sink) bool { - t.cleanupSinkStreaming(sink) + t.clearSinkStreaming(sink) delete(t.sinks, sink.GetID()) t.sinkCount-- @@ -494,7 +494,7 @@ func (t *transStreamPublisher) doClose() { // 释放GOP缓存 if t.gopBuffer != nil { - t.ClearGopBuffer() + t.ClearGopBuffer(true) t.gopBuffer = nil } @@ -596,7 +596,7 @@ func (t *transStreamPublisher) WriteHeader() { // 如果不存在视频帧, 清空GOP缓存 if !t.existVideo { - t.ClearGopBuffer() + t.ClearGopBuffer(false) t.gopBuffer = nil } } @@ -613,10 +613,14 @@ func (t *transStreamPublisher) Sinks() []Sink { return sinks } -func (t *transStreamPublisher) ClearGopBuffer() { +// ClearGopBuffer 清空GOP缓存, 在关闭stream publisher时, free为true, AVPacket放回池中. 如果free为false, 由Source放回池中. +func (t *transStreamPublisher) ClearGopBuffer(free bool) { t.gopBuffer.PopAll(func(packet *collections.ReferenceCounter[*avformat.AVPacket]) { - packet.Release() + if packet.Release() && free { + avformat.FreePacket(packet.Get()) + } + // 释放annexb和avcc格式转换的缓存 if t.bitstreamFilterBuffer != nil { t.bitstreamFilterBuffer.Pop() } @@ -636,7 +640,7 @@ func (t *transStreamPublisher) OnPacket(packet *collections.ReferenceCounter[*av // GOP队列溢出 if t.gopBuffer.RequiresClear(packet) { - t.ClearGopBuffer() + t.ClearGopBuffer(false) } t.gopBuffer.AddPacket(packet)