refactor: GOP溢出处理

This commit is contained in:
ydajiang
2025-04-08 11:00:09 +08:00
parent a508ef2838
commit 4ec0912340
2 changed files with 46 additions and 80 deletions

View File

@@ -12,67 +12,36 @@ type GOPBuffer interface {
// AddPacket Return bool 缓存帧是否成功, 如果首帧非关键帧, 缓存失败
AddPacket(packet *avformat.AVPacket) bool
// SetDiscardHandler 设置丢弃帧时的回调
SetDiscardHandler(handler func(packet *avformat.AVPacket))
PeekAll(handler func(packet *avformat.AVPacket))
Peek(index int) *avformat.AVPacket
PopAll(handler func(packet *avformat.AVPacket))
RequiresClear(nextPacket *avformat.AVPacket) bool
Size() int
Clear()
Close()
}
type streamBuffer struct {
buffer collections.RingBuffer[*avformat.AVPacket]
existVideoKeyFrame bool
discardHandler func(packet *avformat.AVPacket)
hasVideoKeyFrame bool
}
func (s *streamBuffer) AddPacket(packet *avformat.AVPacket) bool {
// 缓存满,清空
if s.Size()+1 == s.buffer.Capacity() {
s.Clear()
}
if utils.AVMediaTypeVideo == packet.MediaType {
if packet.Key {
s.hasVideoKeyFrame = true
} else if !s.hasVideoKeyFrame {
// 丢弃首帧视频非关键帧
if utils.AVMediaTypeVideo == packet.MediaType && !s.existVideoKeyFrame && !packet.Key {
return false
}
// 丢弃前一组GOP
videoKeyFrame := utils.AVMediaTypeVideo == packet.MediaType && packet.Key
if videoKeyFrame {
if s.existVideoKeyFrame {
s.discard()
}
s.existVideoKeyFrame = true
}
s.buffer.Push(packet)
return true
}
func (s *streamBuffer) SetDiscardHandler(handler func(packet *avformat.AVPacket)) {
s.discardHandler = handler
}
func (s *streamBuffer) discard() {
for s.buffer.Size() > 0 {
pkt := s.buffer.Pop()
if s.discardHandler != nil {
s.discardHandler(pkt)
}
}
s.existVideoKeyFrame = false
}
func (s *streamBuffer) Peek(index int) *avformat.AVPacket {
utils.Assert(index < s.buffer.Size())
head, tail := s.buffer.Data()
@@ -104,14 +73,19 @@ func (s *streamBuffer) Size() int {
return s.buffer.Size()
}
func (s *streamBuffer) Clear() {
s.discard()
func (s *streamBuffer) PopAll(handler func(packet *avformat.AVPacket)) {
for s.buffer.Size() > 0 {
pkt := s.buffer.Pop()
handler(pkt)
}
s.hasVideoKeyFrame = false
}
func (s *streamBuffer) Close() {
s.discardHandler = nil
func (s *streamBuffer) RequiresClear(nextPacket *avformat.AVPacket) bool {
return s.Size()+1 == s.buffer.Capacity() || (s.hasVideoKeyFrame && utils.AVMediaTypeVideo == nextPacket.MediaType && nextPacket.Key)
}
func NewStreamBuffer() GOPBuffer {
return &streamBuffer{buffer: collections.NewRingBuffer[*avformat.AVPacket](1000), existVideoKeyFrame: false}
return &streamBuffer{buffer: collections.NewRingBuffer[*avformat.AVPacket](1000), hasVideoKeyFrame: false}
}

View File

@@ -64,9 +64,6 @@ type Source interface {
// IsCompleted 所有推流track是否解析完毕
IsCompleted() bool
// OnDiscardPacket GOP缓存溢出回调, 释放AVPacket
OnDiscardPacket(pkt *avformat.AVPacket)
Init(receiveQueueSize int)
RemoteAddr() string
@@ -128,7 +125,6 @@ type PublishSource struct {
videoTranscoders []transcode.Transcoder // 视频解码器
originTracks TrackManager // 推流的音视频Streams
allStreamTracks TrackManager // 推流Streams+转码器获得的Stream
//pktBuffers [8]collections.MemoryPool // 推流每路的AVPacket缓存, AVPacket的data从该内存池中分配. 在GOP缓存溢出时,释放池中内存.
gopBuffer GOPBuffer // GOP缓存, 音频和视频混合使用, 以视频关键帧为界, 缓存第二个视频关键帧时, 释放前一组gop. 如果不存在视频流, 不缓存音频
closed atomic.Bool // source是否已经关闭
@@ -266,7 +262,7 @@ func (s *PublishSource) TranscodeTracks() []*Track {
return s.allStreamTracks.All()
}
func IsSupportMux(protocol TransStreamProtocol, audioCodecId, videoCodecId utils.AVCodecID) bool {
func IsSupportMux(protocol TransStreamProtocol, _, _ utils.AVCodecID) bool {
if TransStreamRtmp == protocol || TransStreamFlv == protocol {
}
@@ -288,7 +284,9 @@ func (s *PublishSource) CreateTransStream(id TransStreamID, protocol TransStream
for _, track := range tracks {
// 重新拷贝一个track传输流内部使用track的时间戳
newTrack := *track
transStream.AddTrack(&newTrack)
if err = transStream.AddTrack(&newTrack); err != nil {
return nil, err
}
}
transStream.SetID(id)
@@ -560,7 +558,7 @@ func (s *PublishSource) cleanupSinkStreaming(sink Sink) {
if sink.GetProtocol() == TransStreamHls {
// 从HLS拉流队列删除Sink
SinkManager.Remove(sink.GetID())
_, _ = SinkManager.Remove(sink.GetID())
}
sink.StopStreaming(s.TransStreams[sink.GetTransStreamID()])
@@ -573,6 +571,7 @@ func (s *PublishSource) doRemoveSink(sink Sink) bool {
s.sinkCount--
log.Sugar.Infof("sink count: %d source: %s", s.sinkCount, s.ID)
utils.Assert(s.sinkCount > -1)
HookPlayDoneEvent(sink)
return true
@@ -591,25 +590,20 @@ func (s *PublishSource) DoClose() {
s.closed.Store(true)
// 释放GOP缓存
if s.gopBuffer != nil {
s.gopBuffer.PopAll(func(packet *avformat.AVPacket) {
s.TransDemuxer.DiscardHeadPacket(packet.BufferIndex)
})
s.gopBuffer = nil
}
// 关闭推流源的解复用器
if s.TransDemuxer != nil {
s.TransDemuxer.Close()
s.TransDemuxer = nil
}
// 清空未写完的buffer
//for _, buffer := range s.pktBuffers {
// if buffer != nil {
// buffer.Reset()
// }
//}
// 释放GOP缓存
if s.gopBuffer != nil {
s.gopBuffer.Clear()
s.gopBuffer.Close()
s.gopBuffer = nil
}
// 停止track探测计时器
if s.probeTimer != nil {
s.probeTimer.Stop()
@@ -678,7 +672,7 @@ func (s *PublishSource) DoClose() {
// 异步hook
go func() {
if s.Conn != nil {
s.Conn.Close()
_ = s.Conn.Close()
s.Conn = nil
}
@@ -708,13 +702,6 @@ func (s *PublishSource) Close() {
group.Wait()
}
// OnDiscardPacket GOP缓存溢出丢弃回调
func (s *PublishSource) OnDiscardPacket(packet *avformat.AVPacket) {
if s.TransDemuxer != nil {
s.TransDemuxer.DiscardHeadPacket(packet.BufferIndex)
}
}
// 解析完所有track后, 创建各种输出流
func (s *PublishSource) writeHeader() {
if s.completed {
@@ -835,8 +822,6 @@ func (s *PublishSource) OnNewTrack(track avformat.Track) {
// 创建GOPBuffer
if AppConfig.GOPCache && s.existVideo && s.gopBuffer == nil {
s.gopBuffer = NewStreamBuffer()
// 设置GOP缓存溢出回调
s.gopBuffer.SetDiscardHandler(s.OnDiscardPacket)
}
}
@@ -867,6 +852,13 @@ func (s *PublishSource) OnPacket(packet *avformat.AVPacket) {
// 保存到GOP缓存
if AppConfig.GOPCache && s.existVideo {
// GOP队列溢出
if s.gopBuffer.RequiresClear(packet) {
s.gopBuffer.PopAll(func(packet *avformat.AVPacket) {
s.TransDemuxer.DiscardHeadPacket(packet.BufferIndex)
})
}
s.gopBuffer.AddPacket(packet)
}