diff --git a/stream/gop_buffer.go b/stream/gop_buffer.go index 81e2c54..ec0d363 100644 --- a/stream/gop_buffer.go +++ b/stream/gop_buffer.go @@ -12,67 +12,36 @@ type GOPBuffer interface { // AddPacket Return bool 缓存帧是否成功, 如果首帧非关键帧, 缓存失败 AddPacket(packet *avformat.AVPacket) bool - // SetDiscardHandler 设置丢弃帧时的回调 - SetDiscardHandler(handler func(packet *avformat.AVPacket)) - PeekAll(handler func(packet *avformat.AVPacket)) Peek(index int) *avformat.AVPacket + PopAll(handler func(packet *avformat.AVPacket)) + + RequiresClear(nextPacket *avformat.AVPacket) bool + Size() int - - Clear() - - Close() } type streamBuffer struct { - buffer collections.RingBuffer[*avformat.AVPacket] - existVideoKeyFrame bool - discardHandler func(packet *avformat.AVPacket) + buffer collections.RingBuffer[*avformat.AVPacket] + hasVideoKeyFrame bool } func (s *streamBuffer) AddPacket(packet *avformat.AVPacket) bool { - // 缓存满,清空 - if s.Size()+1 == s.buffer.Capacity() { - s.Clear() - } - - // 丢弃首帧视频非关键帧 - if utils.AVMediaTypeVideo == packet.MediaType && !s.existVideoKeyFrame && !packet.Key { - return false - } - - // 丢弃前一组GOP - videoKeyFrame := utils.AVMediaTypeVideo == packet.MediaType && packet.Key - if videoKeyFrame { - if s.existVideoKeyFrame { - s.discard() + if utils.AVMediaTypeVideo == packet.MediaType { + if packet.Key { + s.hasVideoKeyFrame = true + } else if !s.hasVideoKeyFrame { + // 丢弃首帧视频非关键帧 + return false } - - s.existVideoKeyFrame = true } s.buffer.Push(packet) return true } -func (s *streamBuffer) SetDiscardHandler(handler func(packet *avformat.AVPacket)) { - s.discardHandler = handler -} - -func (s *streamBuffer) discard() { - for s.buffer.Size() > 0 { - pkt := s.buffer.Pop() - - if s.discardHandler != nil { - s.discardHandler(pkt) - } - } - - s.existVideoKeyFrame = false -} - func (s *streamBuffer) Peek(index int) *avformat.AVPacket { utils.Assert(index < s.buffer.Size()) head, tail := s.buffer.Data() @@ -104,14 +73,19 @@ func (s *streamBuffer) Size() int { return s.buffer.Size() } -func (s *streamBuffer) Clear() { - s.discard() +func (s *streamBuffer) PopAll(handler func(packet *avformat.AVPacket)) { + for s.buffer.Size() > 0 { + pkt := s.buffer.Pop() + handler(pkt) + } + + s.hasVideoKeyFrame = false } -func (s *streamBuffer) Close() { - s.discardHandler = nil +func (s *streamBuffer) RequiresClear(nextPacket *avformat.AVPacket) bool { + return s.Size()+1 == s.buffer.Capacity() || (s.hasVideoKeyFrame && utils.AVMediaTypeVideo == nextPacket.MediaType && nextPacket.Key) } func NewStreamBuffer() GOPBuffer { - return &streamBuffer{buffer: collections.NewRingBuffer[*avformat.AVPacket](1000), existVideoKeyFrame: false} + return &streamBuffer{buffer: collections.NewRingBuffer[*avformat.AVPacket](1000), hasVideoKeyFrame: false} } diff --git a/stream/source.go b/stream/source.go index 7164548..349a179 100644 --- a/stream/source.go +++ b/stream/source.go @@ -64,9 +64,6 @@ type Source interface { // IsCompleted 所有推流track是否解析完毕 IsCompleted() bool - // OnDiscardPacket GOP缓存溢出回调, 释放AVPacket - OnDiscardPacket(pkt *avformat.AVPacket) - Init(receiveQueueSize int) RemoteAddr() string @@ -128,8 +125,7 @@ type PublishSource struct { videoTranscoders []transcode.Transcoder // 视频解码器 originTracks TrackManager // 推流的音视频Streams allStreamTracks TrackManager // 推流Streams+转码器获得的Stream - //pktBuffers [8]collections.MemoryPool // 推流每路的AVPacket缓存, AVPacket的data从该内存池中分配. 在GOP缓存溢出时,释放池中内存. - gopBuffer GOPBuffer // GOP缓存, 音频和视频混合使用, 以视频关键帧为界, 缓存第二个视频关键帧时, 释放前一组gop. 如果不存在视频流, 不缓存音频 + gopBuffer GOPBuffer // GOP缓存, 音频和视频混合使用, 以视频关键帧为界, 缓存第二个视频关键帧时, 释放前一组gop. 如果不存在视频流, 不缓存音频 closed atomic.Bool // source是否已经关闭 completed bool // 所有推流track是否解析完毕, @see writeHeader 函数中赋值为true @@ -266,7 +262,7 @@ func (s *PublishSource) TranscodeTracks() []*Track { return s.allStreamTracks.All() } -func IsSupportMux(protocol TransStreamProtocol, audioCodecId, videoCodecId utils.AVCodecID) bool { +func IsSupportMux(protocol TransStreamProtocol, _, _ utils.AVCodecID) bool { if TransStreamRtmp == protocol || TransStreamFlv == protocol { } @@ -288,7 +284,9 @@ func (s *PublishSource) CreateTransStream(id TransStreamID, protocol TransStream for _, track := range tracks { // 重新拷贝一个track,传输流内部使用track的时间戳, newTrack := *track - transStream.AddTrack(&newTrack) + if err = transStream.AddTrack(&newTrack); err != nil { + return nil, err + } } transStream.SetID(id) @@ -560,7 +558,7 @@ func (s *PublishSource) cleanupSinkStreaming(sink Sink) { if sink.GetProtocol() == TransStreamHls { // 从HLS拉流队列删除Sink - SinkManager.Remove(sink.GetID()) + _, _ = SinkManager.Remove(sink.GetID()) } sink.StopStreaming(s.TransStreams[sink.GetTransStreamID()]) @@ -573,6 +571,7 @@ func (s *PublishSource) doRemoveSink(sink Sink) bool { s.sinkCount-- log.Sugar.Infof("sink count: %d source: %s", s.sinkCount, s.ID) + utils.Assert(s.sinkCount > -1) HookPlayDoneEvent(sink) return true @@ -591,25 +590,20 @@ func (s *PublishSource) DoClose() { s.closed.Store(true) + // 释放GOP缓存 + if s.gopBuffer != nil { + s.gopBuffer.PopAll(func(packet *avformat.AVPacket) { + s.TransDemuxer.DiscardHeadPacket(packet.BufferIndex) + }) + s.gopBuffer = nil + } + + // 关闭推流源的解复用器 if s.TransDemuxer != nil { s.TransDemuxer.Close() s.TransDemuxer = nil } - // 清空未写完的buffer - //for _, buffer := range s.pktBuffers { - // if buffer != nil { - // buffer.Reset() - // } - //} - - // 释放GOP缓存 - if s.gopBuffer != nil { - s.gopBuffer.Clear() - s.gopBuffer.Close() - s.gopBuffer = nil - } - // 停止track探测计时器 if s.probeTimer != nil { s.probeTimer.Stop() @@ -678,7 +672,7 @@ func (s *PublishSource) DoClose() { // 异步hook go func() { if s.Conn != nil { - s.Conn.Close() + _ = s.Conn.Close() s.Conn = nil } @@ -708,13 +702,6 @@ func (s *PublishSource) Close() { group.Wait() } -// OnDiscardPacket GOP缓存溢出丢弃回调 -func (s *PublishSource) OnDiscardPacket(packet *avformat.AVPacket) { - if s.TransDemuxer != nil { - s.TransDemuxer.DiscardHeadPacket(packet.BufferIndex) - } -} - // 解析完所有track后, 创建各种输出流 func (s *PublishSource) writeHeader() { if s.completed { @@ -835,8 +822,6 @@ func (s *PublishSource) OnNewTrack(track avformat.Track) { // 创建GOPBuffer if AppConfig.GOPCache && s.existVideo && s.gopBuffer == nil { s.gopBuffer = NewStreamBuffer() - // 设置GOP缓存溢出回调 - s.gopBuffer.SetDiscardHandler(s.OnDiscardPacket) } } @@ -867,6 +852,13 @@ func (s *PublishSource) OnPacket(packet *avformat.AVPacket) { // 保存到GOP缓存 if AppConfig.GOPCache && s.existVideo { + // GOP队列溢出 + if s.gopBuffer.RequiresClear(packet) { + s.gopBuffer.PopAll(func(packet *avformat.AVPacket) { + s.TransDemuxer.DiscardHeadPacket(packet.BufferIndex) + }) + } + s.gopBuffer.AddPacket(packet) }