Merge branch 'dev' of https://gitee.com/lkmio/lkm into dev

This commit is contained in:
ydajiang
2025-06-11 16:41:05 +08:00
5 changed files with 43 additions and 20 deletions

View File

@@ -121,7 +121,7 @@ func (source *BaseGBSource) ProcessPacket(data []byte) error {
_ = packet.Unmarshal(data) _ = packet.Unmarshal(data)
// 国标级联转发 // 国标级联转发
if source.GetTransStreamPublisher().GetTransStreams() != nil { if source.GetTransStreamPublisher().GetForwardTransStream() != nil {
if source.lastRtpTimestamp == -1 { if source.lastRtpTimestamp == -1 {
source.lastRtpTimestamp = int64(packet.Timestamp) source.lastRtpTimestamp = int64(packet.Timestamp)
} }

View File

@@ -65,11 +65,7 @@ func (s *server) OnPacket(conn net.Conn, data []byte) []byte {
_ = conn.Close() _ = conn.Close()
} }
if session.isPublisher { return session.receiveBuffer
return stream.TCPReceiveBufferPool.Get().([]byte)
}
return nil
} }
func NewServer() Server { func NewServer() Server {

View File

@@ -10,10 +10,11 @@ import (
// Session RTMP会话, 解析处理Message // Session RTMP会话, 解析处理Message
type Session struct { type Session struct {
conn net.Conn conn net.Conn
stack *rtmp.ServerStack // rtmp协议栈, 解析message stack *rtmp.ServerStack // rtmp协议栈, 解析message
handle interface{} // 持有具体会话句柄(推流端/拉流端) 在@see OnPublish @see OnPlay回调中赋值 handle interface{} // 持有具体会话句柄(推流端/拉流端) 在@see OnPublish @see OnPlay回调中赋值
isPublisher bool // 是否是推流会话 isPublisher bool // 是否是推流会话
receiveBuffer []byte
} }
func (s *Session) generateSourceID(app, stream string) string { func (s *Session) generateSourceID(app, stream string) string {
@@ -95,6 +96,11 @@ func (s *Session) Close() {
s.stack.Close() s.stack.Close()
s.stack = nil s.stack = nil
} }
if s.receiveBuffer != nil {
stream.TCPReceiveBufferPool.Put(s.receiveBuffer[:cap(s.receiveBuffer)])
s.receiveBuffer = nil
}
}() }()
// 还未确定会话类型, 无需处理 // 还未确定会话类型, 无需处理
@@ -118,7 +124,9 @@ func (s *Session) Close() {
} }
func NewSession(conn net.Conn) *Session { func NewSession(conn net.Conn) *Session {
session := &Session{} session := &Session{
receiveBuffer: stream.TCPReceiveBufferPool.Get().([]byte),
}
stackServer := rtmp.NewStackServer(false) stackServer := rtmp.NewStackServer(false)
stackServer.SetOnStreamHandler(session) stackServer.SetOnStreamHandler(session)

View File

@@ -211,6 +211,11 @@ func (s *PublishSource) DoClose() {
s.TransDemuxer.Close() s.TransDemuxer.Close()
} }
// 释放packet
for _, track := range s.originTracks.All() {
s.clearUnusedPackets(track.Packets)
}
// 等传输流发布器关闭结束 // 等传输流发布器关闭结束
s.streamPublisher.close() s.streamPublisher.close()
@@ -334,13 +339,23 @@ func (s *PublishSource) OnPacket(packet *avformat.AVPacket) {
s.streamPublisher.Post(&StreamEvent{StreamEventTypePacket, packetPtr}) s.streamPublisher.Post(&StreamEvent{StreamEventTypePacket, packetPtr})
// 释放未引用的AVPacket // 释放未引用的AVPacket
s.clearUnusedPackets(packets)
}
func (s *PublishSource) clearUnusedPackets(packets *collections.LinkedList[*collections.ReferenceCounter[*avformat.AVPacket]]) {
for packets.Size() > 0 { for packets.Size() > 0 {
if packets.Get(0).UseCount() > 1 { if packets.Get(0).UseCount() > 1 {
break break
} }
packets.Remove(0).Release() unusedPacketPtr := packets.Remove(0)
s.TransDemuxer.DiscardHeadPacket(packet.BufferIndex) bufferIndex := unusedPacketPtr.Get().BufferIndex
// 引用计数减1
unusedPacketPtr.Release()
// AVPacket放回池中, 减少AVPacket分配
avformat.FreePacket(unusedPacketPtr.Get())
// 释放AVPacket的Data缓冲区
s.TransDemuxer.DiscardHeadPacket(bufferIndex)
} }
} }

View File

@@ -489,7 +489,7 @@ func (t *transStreamPublisher) FindSink(id SinkID) Sink {
return result return result
} }
func (t *transStreamPublisher) cleanupSinkStreaming(sink Sink) { func (t *transStreamPublisher) clearSinkStreaming(sink Sink) {
transStreamSinks := t.transStreamSinks[sink.GetTransStreamID()] transStreamSinks := t.transStreamSinks[sink.GetTransStreamID()]
delete(transStreamSinks, sink.GetID()) delete(transStreamSinks, sink.GetID())
t.lastStreamEndTime = time.Now() t.lastStreamEndTime = time.Now()
@@ -497,7 +497,7 @@ func (t *transStreamPublisher) cleanupSinkStreaming(sink Sink) {
} }
func (t *transStreamPublisher) doRemoveSink(sink Sink) bool { func (t *transStreamPublisher) doRemoveSink(sink Sink) bool {
t.cleanupSinkStreaming(sink) t.clearSinkStreaming(sink)
delete(t.sinks, sink.GetID()) delete(t.sinks, sink.GetID())
t.sinkCount-- t.sinkCount--
@@ -519,7 +519,7 @@ func (t *transStreamPublisher) doClose() {
// 释放GOP缓存 // 释放GOP缓存
if t.gopBuffer != nil { if t.gopBuffer != nil {
t.ClearGopBuffer() t.ClearGopBuffer(true)
t.gopBuffer = nil t.gopBuffer = nil
} }
@@ -621,7 +621,7 @@ func (t *transStreamPublisher) WriteHeader() {
// 如果不存在视频帧, 清空GOP缓存 // 如果不存在视频帧, 清空GOP缓存
if !t.existVideo { if !t.existVideo {
t.ClearGopBuffer() t.ClearGopBuffer(false)
t.gopBuffer = nil t.gopBuffer = nil
} }
} }
@@ -638,10 +638,14 @@ func (t *transStreamPublisher) Sinks() []Sink {
return sinks return sinks
} }
func (t *transStreamPublisher) ClearGopBuffer() { // ClearGopBuffer 清空GOP缓存, 在关闭stream publisher时, free为true, AVPacket放回池中. 如果free为false, 由Source放回池中.
func (t *transStreamPublisher) ClearGopBuffer(free bool) {
t.gopBuffer.PopAll(func(packet *collections.ReferenceCounter[*avformat.AVPacket]) { t.gopBuffer.PopAll(func(packet *collections.ReferenceCounter[*avformat.AVPacket]) {
packet.Release() if packet.Release() && free {
avformat.FreePacket(packet.Get())
}
// 释放annexb和avcc格式转换的缓存
if t.bitstreamFilterBuffer != nil { if t.bitstreamFilterBuffer != nil {
t.bitstreamFilterBuffer.Pop() t.bitstreamFilterBuffer.Pop()
} }
@@ -661,7 +665,7 @@ func (t *transStreamPublisher) OnPacket(packet *collections.ReferenceCounter[*av
// GOP队列溢出 // GOP队列溢出
if t.gopBuffer.RequiresClear(packet) { if t.gopBuffer.RequiresClear(packet) {
t.ClearGopBuffer() t.ClearGopBuffer(false)
} }
t.gopBuffer.AddPacket(packet) t.gopBuffer.AddPacket(packet)