diff --git a/gb28181/source.go b/gb28181/source.go index 4a9d191..7ed889d 100644 --- a/gb28181/source.go +++ b/gb28181/source.go @@ -121,7 +121,7 @@ func (source *BaseGBSource) ProcessPacket(data []byte) error { _ = packet.Unmarshal(data) // 国标级联转发 - if source.GetTransStreamPublisher().GetTransStreams() != nil { + if source.GetTransStreamPublisher().GetForwardTransStream() != nil { if source.lastRtpTimestamp == -1 { source.lastRtpTimestamp = int64(packet.Timestamp) } diff --git a/rtmp/rtmp_server.go b/rtmp/rtmp_server.go index 6d602db..c2e9bf4 100644 --- a/rtmp/rtmp_server.go +++ b/rtmp/rtmp_server.go @@ -65,11 +65,7 @@ func (s *server) OnPacket(conn net.Conn, data []byte) []byte { _ = conn.Close() } - if session.isPublisher { - return stream.TCPReceiveBufferPool.Get().([]byte) - } - - return nil + return session.receiveBuffer } func NewServer() Server { diff --git a/rtmp/rtmp_session.go b/rtmp/rtmp_session.go index 42ecfd1..cd2885c 100644 --- a/rtmp/rtmp_session.go +++ b/rtmp/rtmp_session.go @@ -10,10 +10,11 @@ import ( // Session RTMP会话, 解析处理Message type Session struct { - conn net.Conn - stack *rtmp.ServerStack // rtmp协议栈, 解析message - handle interface{} // 持有具体会话句柄(推流端/拉流端), 在@see OnPublish @see OnPlay回调中赋值 - isPublisher bool // 是否是推流会话 + conn net.Conn + stack *rtmp.ServerStack // rtmp协议栈, 解析message + handle interface{} // 持有具体会话句柄(推流端/拉流端), 在@see OnPublish @see OnPlay回调中赋值 + isPublisher bool // 是否是推流会话 + receiveBuffer []byte } func (s *Session) generateSourceID(app, stream string) string { @@ -95,6 +96,11 @@ func (s *Session) Close() { s.stack.Close() s.stack = nil } + + if s.receiveBuffer != nil { + stream.TCPReceiveBufferPool.Put(s.receiveBuffer[:cap(s.receiveBuffer)]) + s.receiveBuffer = nil + } }() // 还未确定会话类型, 无需处理 @@ -118,7 +124,9 @@ func (s *Session) Close() { } func NewSession(conn net.Conn) *Session { - session := &Session{} + session := &Session{ + receiveBuffer: stream.TCPReceiveBufferPool.Get().([]byte), + } stackServer := rtmp.NewStackServer(false) stackServer.SetOnStreamHandler(session) diff --git a/stream/source.go b/stream/source.go index d4325dd..e1f4f8e 100644 --- a/stream/source.go +++ b/stream/source.go @@ -211,6 +211,11 @@ func (s *PublishSource) DoClose() { s.TransDemuxer.Close() } + // 释放packet + for _, track := range s.originTracks.All() { + s.clearUnusedPackets(track.Packets) + } + // 等传输流发布器关闭结束 s.streamPublisher.close() @@ -334,13 +339,23 @@ func (s *PublishSource) OnPacket(packet *avformat.AVPacket) { s.streamPublisher.Post(&StreamEvent{StreamEventTypePacket, packetPtr}) // 释放未引用的AVPacket + s.clearUnusedPackets(packets) +} + +func (s *PublishSource) clearUnusedPackets(packets *collections.LinkedList[*collections.ReferenceCounter[*avformat.AVPacket]]) { for packets.Size() > 0 { if packets.Get(0).UseCount() > 1 { break } - packets.Remove(0).Release() - s.TransDemuxer.DiscardHeadPacket(packet.BufferIndex) + unusedPacketPtr := packets.Remove(0) + bufferIndex := unusedPacketPtr.Get().BufferIndex + // 引用计数减1 + unusedPacketPtr.Release() + // AVPacket放回池中, 减少AVPacket分配 + avformat.FreePacket(unusedPacketPtr.Get()) + // 释放AVPacket的Data缓冲区 + s.TransDemuxer.DiscardHeadPacket(bufferIndex) } } diff --git a/stream/stream_publisher.go b/stream/stream_publisher.go index dc922bc..91e4fb8 100644 --- a/stream/stream_publisher.go +++ b/stream/stream_publisher.go @@ -489,7 +489,7 @@ func (t *transStreamPublisher) FindSink(id SinkID) Sink { return result } -func (t *transStreamPublisher) cleanupSinkStreaming(sink Sink) { +func (t *transStreamPublisher) clearSinkStreaming(sink Sink) { transStreamSinks := t.transStreamSinks[sink.GetTransStreamID()] delete(transStreamSinks, sink.GetID()) t.lastStreamEndTime = time.Now() @@ -497,7 +497,7 @@ func (t *transStreamPublisher) cleanupSinkStreaming(sink Sink) { } func (t *transStreamPublisher) doRemoveSink(sink Sink) bool { - t.cleanupSinkStreaming(sink) + t.clearSinkStreaming(sink) delete(t.sinks, sink.GetID()) t.sinkCount-- @@ -519,7 +519,7 @@ func (t *transStreamPublisher) doClose() { // 释放GOP缓存 if t.gopBuffer != nil { - t.ClearGopBuffer() + t.ClearGopBuffer(true) t.gopBuffer = nil } @@ -621,7 +621,7 @@ func (t *transStreamPublisher) WriteHeader() { // 如果不存在视频帧, 清空GOP缓存 if !t.existVideo { - t.ClearGopBuffer() + t.ClearGopBuffer(false) t.gopBuffer = nil } } @@ -638,10 +638,14 @@ func (t *transStreamPublisher) Sinks() []Sink { return sinks } -func (t *transStreamPublisher) ClearGopBuffer() { +// ClearGopBuffer 清空GOP缓存, 在关闭stream publisher时, free为true, AVPacket放回池中. 如果free为false, 由Source放回池中. +func (t *transStreamPublisher) ClearGopBuffer(free bool) { t.gopBuffer.PopAll(func(packet *collections.ReferenceCounter[*avformat.AVPacket]) { - packet.Release() + if packet.Release() && free { + avformat.FreePacket(packet.Get()) + } + // 释放annexb和avcc格式转换的缓存 if t.bitstreamFilterBuffer != nil { t.bitstreamFilterBuffer.Pop() } @@ -661,7 +665,7 @@ func (t *transStreamPublisher) OnPacket(packet *collections.ReferenceCounter[*av // GOP队列溢出 if t.gopBuffer.RequiresClear(packet) { - t.ClearGopBuffer() + t.ClearGopBuffer(false) } t.gopBuffer.AddPacket(packet)