From 6fd7498ce0bcbd0e3fd1593df8dc676a957cbbb4 Mon Sep 17 00:00:00 2001 From: yangjiechina <1534796060@qq.com> Date: Tue, 28 Nov 2023 18:47:56 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BD=BF=E7=94=A8gop=E7=BC=93=E5=AD=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rtmp/rtmp_publisher.go | 15 ++++++++++++++- stream/ring_buffer.go | 7 +++++-- stream/source.go | 12 ++++++++---- stream/stream_buffer.go | 4 ++-- 4 files changed, 29 insertions(+), 9 deletions(-) diff --git a/rtmp/rtmp_publisher.go b/rtmp/rtmp_publisher.go index e169df3..ba50204 100644 --- a/rtmp/rtmp_publisher.go +++ b/rtmp/rtmp_publisher.go @@ -8,6 +8,7 @@ import ( type Publisher struct { stream.SourceImpl + deMuxer libflv.DeMuxer audioMemoryPool stream.MemoryPool videoMemoryPool stream.MemoryPool @@ -33,6 +34,15 @@ func NewPublisher(sourceId string) *Publisher { return publisher } +func (p *Publisher) OnDiscardPacket(pkt interface{}) { + packet := pkt.(utils.AVPacket) + if utils.AVMediaTypeAudio == packet.MediaType() { + p.audioMemoryPool.FreeHead() + } else if utils.AVMediaTypeVideo == packet.MediaType() { + p.videoMemoryPool.FreeHead() + } +} + func (p *Publisher) OnDeMuxStream(stream_ utils.AVStream) { //AVStream的Data单独拷贝出来 //释放掉内存池中最新分配的内存 @@ -47,7 +57,10 @@ func (p *Publisher) OnDeMuxStream(stream_ utils.AVStream) { p.videoMemoryPool.FreeTail() } - p.SourceImpl.OnDeMuxStream(stream_) + if ret, buffer := p.SourceImpl.OnDeMuxStream(stream_); ret && buffer != nil { + buffer.SetDiscardHandler(p.OnDiscardPacket) + } + } func (p *Publisher) OnDeMuxStreamDone() { diff --git a/stream/ring_buffer.go b/stream/ring_buffer.go index ed25146..3bdf8d9 100644 --- a/stream/ring_buffer.go +++ b/stream/ring_buffer.go @@ -1,6 +1,8 @@ package stream -import "github.com/yangjiechina/avformat/utils" +import ( + "github.com/yangjiechina/avformat/utils" +) type RingBuffer interface { IsEmpty() bool @@ -64,6 +66,7 @@ func (r *ringBuffer) Pop() interface{} { } element := r.data[r.head] + r.data[r.head] = nil r.head = (r.head + 1) % cap(r.data) r.size-- return element @@ -86,7 +89,7 @@ func (r *ringBuffer) All() ([]interface{}, []interface{}) { return nil, nil } - if r.head < r.tail { + if r.head <= r.tail { return r.data[r.head:], r.data[:r.tail] } else { return r.data[r.head:], nil diff --git a/stream/source.go b/stream/source.go index 2ca7a82..bee03a0 100644 --- a/stream/source.go +++ b/stream/source.go @@ -207,7 +207,7 @@ func (s *SourceImpl) AddSink(sink ISink) bool { if AppConfig.GOPCache > 0 && !ok { //先交叉发送 for i := 0; i < bufferCount; i++ { - for _, stream := range streams { + for _, stream := range streams[:index] { buffer := s.buffers[stream.Index()] packet := buffer.Peek(i).(utils.AVPacket) transStream.Input(packet) @@ -215,7 +215,7 @@ func (s *SourceImpl) AddSink(sink ISink) bool { } //发送超过最低缓存数的缓存包 - for _, stream := range streams { + for _, stream := range streams[:index] { buffer := s.buffers[stream.Index()] for i := bufferCount; i > buffer.Size(); i++ { @@ -236,10 +236,10 @@ func (s *SourceImpl) Close() { } -func (s *SourceImpl) OnDeMuxStream(stream utils.AVStream) { +func (s *SourceImpl) OnDeMuxStream(stream utils.AVStream) (bool, StreamBuffer) { if s.completed { fmt.Printf("添加Stream失败 Source: %s已经WriteHeader", s.Id_) - return + return false, nil } s.originStreams.Add(stream) @@ -252,7 +252,11 @@ func (s *SourceImpl) OnDeMuxStream(stream utils.AVStream) { if AppConfig.GOPCache > 0 { buffer := NewStreamBuffer(int64(AppConfig.GOPCache * 1000)) s.buffers = append(s.buffers, buffer) + + return true, buffer } + + return true, nil } // 从DeMuxer解析完Stream后, 处理等待Sinks diff --git a/stream/stream_buffer.go b/stream/stream_buffer.go index ef061fd..174fbbd 100644 --- a/stream/stream_buffer.go +++ b/stream/stream_buffer.go @@ -89,9 +89,9 @@ func (s *streamBuffer) Peek(index int) interface{} { head, tail := s.buffer.All() if index < len(head) { - return head[index] + return head[index].(element).pkt } else { - return tail[index-len(head)] + return tail[index-len(head)].(element).pkt } }