使用gop缓存

This commit is contained in:
yangjiechina
2023-11-28 18:47:56 +08:00
parent 369f295452
commit 6fd7498ce0
4 changed files with 29 additions and 9 deletions

View File

@@ -8,6 +8,7 @@ import (
type Publisher struct {
stream.SourceImpl
deMuxer libflv.DeMuxer
audioMemoryPool stream.MemoryPool
videoMemoryPool stream.MemoryPool
@@ -33,6 +34,15 @@ func NewPublisher(sourceId string) *Publisher {
return publisher
}
func (p *Publisher) OnDiscardPacket(pkt interface{}) {
packet := pkt.(utils.AVPacket)
if utils.AVMediaTypeAudio == packet.MediaType() {
p.audioMemoryPool.FreeHead()
} else if utils.AVMediaTypeVideo == packet.MediaType() {
p.videoMemoryPool.FreeHead()
}
}
func (p *Publisher) OnDeMuxStream(stream_ utils.AVStream) {
//AVStream的Data单独拷贝出来
//释放掉内存池中最新分配的内存
@@ -47,7 +57,10 @@ func (p *Publisher) OnDeMuxStream(stream_ utils.AVStream) {
p.videoMemoryPool.FreeTail()
}
p.SourceImpl.OnDeMuxStream(stream_)
if ret, buffer := p.SourceImpl.OnDeMuxStream(stream_); ret && buffer != nil {
buffer.SetDiscardHandler(p.OnDiscardPacket)
}
}
func (p *Publisher) OnDeMuxStreamDone() {

View File

@@ -1,6 +1,8 @@
package stream
import "github.com/yangjiechina/avformat/utils"
import (
"github.com/yangjiechina/avformat/utils"
)
type RingBuffer interface {
IsEmpty() bool
@@ -64,6 +66,7 @@ func (r *ringBuffer) Pop() interface{} {
}
element := r.data[r.head]
r.data[r.head] = nil
r.head = (r.head + 1) % cap(r.data)
r.size--
return element
@@ -86,7 +89,7 @@ func (r *ringBuffer) All() ([]interface{}, []interface{}) {
return nil, nil
}
if r.head < r.tail {
if r.head <= r.tail {
return r.data[r.head:], r.data[:r.tail]
} else {
return r.data[r.head:], nil

View File

@@ -207,7 +207,7 @@ func (s *SourceImpl) AddSink(sink ISink) bool {
if AppConfig.GOPCache > 0 && !ok {
//先交叉发送
for i := 0; i < bufferCount; i++ {
for _, stream := range streams {
for _, stream := range streams[:index] {
buffer := s.buffers[stream.Index()]
packet := buffer.Peek(i).(utils.AVPacket)
transStream.Input(packet)
@@ -215,7 +215,7 @@ func (s *SourceImpl) AddSink(sink ISink) bool {
}
//发送超过最低缓存数的缓存包
for _, stream := range streams {
for _, stream := range streams[:index] {
buffer := s.buffers[stream.Index()]
for i := bufferCount; i > buffer.Size(); i++ {
@@ -236,10 +236,10 @@ func (s *SourceImpl) Close() {
}
func (s *SourceImpl) OnDeMuxStream(stream utils.AVStream) {
func (s *SourceImpl) OnDeMuxStream(stream utils.AVStream) (bool, StreamBuffer) {
if s.completed {
fmt.Printf("添加Stream失败 Source: %s已经WriteHeader", s.Id_)
return
return false, nil
}
s.originStreams.Add(stream)
@@ -252,7 +252,11 @@ func (s *SourceImpl) OnDeMuxStream(stream utils.AVStream) {
if AppConfig.GOPCache > 0 {
buffer := NewStreamBuffer(int64(AppConfig.GOPCache * 1000))
s.buffers = append(s.buffers, buffer)
return true, buffer
}
return true, nil
}
// 从DeMuxer解析完Stream后, 处理等待Sinks

View File

@@ -89,9 +89,9 @@ func (s *streamBuffer) Peek(index int) interface{} {
head, tail := s.buffer.All()
if index < len(head) {
return head[index]
return head[index].(element).pkt
} else {
return tail[index-len(head)]
return tail[index-len(head)].(element).pkt
}
}