测试使用gop缓存

This commit is contained in:
yangjiechina
2023-11-27 18:01:37 +08:00
parent b88be46bb7
commit 369f295452
5 changed files with 76 additions and 15 deletions

View File

@@ -18,7 +18,7 @@ type Publisher struct {
func NewPublisher(sourceId string) *Publisher { func NewPublisher(sourceId string) *Publisher {
publisher := &Publisher{SourceImpl: stream.SourceImpl{Id_: sourceId}, audioUnmark: false, videoUnmark: false} publisher := &Publisher{SourceImpl: stream.SourceImpl{Id_: sourceId}, audioUnmark: false, videoUnmark: false}
publisher.deMuxer = libflv.DeMuxer{} publisher.deMuxer = libflv.NewDeMuxer()
//设置回调从flv解析出来的Stream和AVPacket都将统一回调到stream.SourceImpl //设置回调从flv解析出来的Stream和AVPacket都将统一回调到stream.SourceImpl
publisher.deMuxer.SetHandler(publisher) publisher.deMuxer.SetHandler(publisher)

View File

@@ -21,6 +21,7 @@ func init() {
} }
func TestServer(t *testing.T) { func TestServer(t *testing.T) {
stream.AppConfig.GOPCache = 2
impl := serverImpl{} impl := serverImpl{}
addr := "0.0.0.0:1935" addr := "0.0.0.0:1935"
tcpAddr, err := net.ResolveTCPAddr("tcp", addr) tcpAddr, err := net.ResolveTCPAddr("tcp", addr)

View File

@@ -36,9 +36,6 @@ func (t *TransStream) Input(packet utils.AVPacket) {
chunk = &t.audioChunk chunk = &t.audioChunk
payloadSize += 2 + length payloadSize += 2 + length
} else if utils.AVMediaTypeVideo == packet.MediaType() { } else if utils.AVMediaTypeVideo == packet.MediaType() {
if packet.KeyFrame() {
println("")
}
videoPkt = true videoPkt = true
data = packet.AVCCPacketData() data = packet.AVCCPacketData()
length = len(data) length = len(data)
@@ -93,7 +90,10 @@ func (t *TransStream) Input(packet utils.AVPacket) {
} }
rtmpData := t.memoryPool.Fetch()[:n] rtmpData := t.memoryPool.Fetch()[:n]
ret := t.transBuffer.AddPacket(rtmpData, packet.KeyFrame() && videoPkt, packet.Dts()) ret := true
if stream.AppConfig.GOPCache > 0 {
ret = t.transBuffer.AddPacket(rtmpData, packet.KeyFrame() && videoPkt, packet.Dts())
}
if ret { if ret {
//发送给sink //发送给sink
@@ -103,8 +103,10 @@ func (t *TransStream) Input(packet utils.AVPacket) {
} }
} }
if stream.AppConfig.GOPCache < 1 {
t.memoryPool.FreeTail() t.memoryPool.FreeTail()
} }
}
func (t *TransStream) AddSink(sink stream.ISink) { func (t *TransStream) AddSink(sink stream.ISink) {
t.TransStreamImpl.AddSink(sink) t.TransStreamImpl.AddSink(sink)
@@ -112,10 +114,12 @@ func (t *TransStream) AddSink(sink stream.ISink) {
utils.Assert(t.headerSize > 0) utils.Assert(t.headerSize > 0)
sink.Input(t.header[:t.headerSize]) sink.Input(t.header[:t.headerSize])
t.transBuffer.Peek(func(packet interface{}) { if stream.AppConfig.GOPCache > 0 {
t.transBuffer.PeekAll(func(packet interface{}) {
sink.Input(packet.([]byte)) sink.Input(packet.([]byte))
}) })
} }
}
func (t *TransStream) onDiscardPacket(pkt interface{}) { func (t *TransStream) onDiscardPacket(pkt interface{}) {
t.memoryPool.FreeHead() t.memoryPool.FreeHead()
@@ -148,9 +152,11 @@ func (t *TransStream) WriteHeader() error {
t.TransStreamImpl.Completed = true t.TransStreamImpl.Completed = true
t.header = make([]byte, 1024) t.header = make([]byte, 1024)
t.muxer = libflv.NewMuxer(audioCodecId, videoCodecId, 0, 0, 0) t.muxer = libflv.NewMuxer(audioCodecId, videoCodecId, 0, 0, 0)
t.memoryPool = stream.NewMemoryPool(1024 * 1024 * 2) t.memoryPool = stream.NewMemoryPool(1024 * 1000 * (stream.AppConfig.GOPCache + 1))
t.transBuffer = stream.NewStreamBuffer(2000) if stream.AppConfig.GOPCache > 0 {
t.transBuffer = stream.NewStreamBuffer(int64(stream.AppConfig.GOPCache * 200))
t.transBuffer.SetDiscardHandler(t.onDiscardPacket) t.transBuffer.SetDiscardHandler(t.onDiscardPacket)
}
var n int var n int
if audioStream != nil { if audioStream != nil {

View File

@@ -166,6 +166,8 @@ func (s *SourceImpl) AddSink(sink ISink) bool {
var streams [5]utils.AVStream var streams [5]utils.AVStream
var index int var index int
bufferCount := -1
for _, stream := range s.originStreams.All() { for _, stream := range s.originStreams.All() {
if disableVideo && stream.Type() == utils.AVMediaTypeVideo { if disableVideo && stream.Type() == utils.AVMediaTypeVideo {
continue continue
@@ -173,11 +175,20 @@ func (s *SourceImpl) AddSink(sink ISink) bool {
streams[index] = stream streams[index] = stream
index++ index++
//从缓存的Stream中挑选出最小的缓存数量交叉发送.
count := s.buffers[stream.Index()].Size()
if bufferCount == -1 {
bufferCount = count
} else {
bufferCount = utils.MinInt(bufferCount, count)
}
} }
transStreamId := GenerateTransStreamId(sink.Protocol(), streams[:index]...) transStreamId := GenerateTransStreamId(sink.Protocol(), streams[:index]...)
transStream, ok := s.transStreams[transStreamId] transStream, ok := s.transStreams[transStreamId]
if !ok { if !ok {
//创建一个新的传输流
transStream = TransStreamFactory(sink.Protocol(), streams[:index]) transStream = TransStreamFactory(sink.Protocol(), streams[:index])
if s.transStreams == nil { if s.transStreams == nil {
s.transStreams = make(map[TransStreamId]ITransStream, 10) s.transStreams = make(map[TransStreamId]ITransStream, 10)
@@ -192,6 +203,28 @@ func (s *SourceImpl) AddSink(sink ISink) bool {
} }
transStream.AddSink(sink) transStream.AddSink(sink)
if AppConfig.GOPCache > 0 && !ok {
//先交叉发送
for i := 0; i < bufferCount; i++ {
for _, stream := range streams {
buffer := s.buffers[stream.Index()]
packet := buffer.Peek(i).(utils.AVPacket)
transStream.Input(packet)
}
}
//发送超过最低缓存数的缓存包
for _, stream := range streams {
buffer := s.buffers[stream.Index()]
for i := bufferCount; i > buffer.Size(); i++ {
packet := buffer.Peek(i).(utils.AVPacket)
transStream.Input(packet)
}
}
}
return false return false
} }
@@ -217,7 +250,7 @@ func (s *SourceImpl) OnDeMuxStream(stream utils.AVStream) {
//为每个Stream创建对于的Buffer //为每个Stream创建对于的Buffer
if AppConfig.GOPCache > 0 { if AppConfig.GOPCache > 0 {
buffer := NewStreamBuffer(int64(AppConfig.GOPCache)) buffer := NewStreamBuffer(int64(AppConfig.GOPCache * 1000))
s.buffers = append(s.buffers, buffer) s.buffers = append(s.buffers, buffer)
} }
} }

View File

@@ -1,5 +1,7 @@
package stream package stream
import "github.com/yangjiechina/avformat/utils"
// StreamBuffer GOP缓存 // StreamBuffer GOP缓存
type StreamBuffer interface { type StreamBuffer interface {
@@ -9,9 +11,13 @@ type StreamBuffer interface {
// SetDiscardHandler 设置丢弃帧时的回调 // SetDiscardHandler 设置丢弃帧时的回调
SetDiscardHandler(handler func(packet interface{})) SetDiscardHandler(handler func(packet interface{}))
Peek(handler func(packet interface{})) PeekAll(handler func(packet interface{}))
Peek(index int) interface{}
Duration() int64 Duration() int64
Size() int
} }
type streamBuffer struct { type streamBuffer struct {
@@ -78,7 +84,18 @@ func (s *streamBuffer) SetDiscardHandler(handler func(packet interface{})) {
s.discardHandler = handler s.discardHandler = handler
} }
func (s *streamBuffer) Peek(handler func(packet interface{})) { func (s *streamBuffer) Peek(index int) interface{} {
utils.Assert(index < s.buffer.Size())
head, tail := s.buffer.All()
if index < len(head) {
return head[index]
} else {
return tail[index-len(head)]
}
}
func (s *streamBuffer) PeekAll(handler func(packet interface{})) {
head, tail := s.buffer.All() head, tail := s.buffer.All()
if head == nil { if head == nil {
@@ -106,3 +123,7 @@ func (s *streamBuffer) Duration() int64 {
return tail.(element).ts - head.(element).ts return tail.(element).ts - head.(element).ts
} }
func (s *streamBuffer) Size() int {
return s.buffer.Size()
}