重构缓存

This commit is contained in:
yangjiechina
2023-12-09 17:18:04 +08:00
parent a849b496a2
commit 631c13303c
4 changed files with 77 additions and 37 deletions

View File

@@ -9,21 +9,34 @@ import (
type TransStream struct { type TransStream struct {
stream.TransStreamImpl stream.TransStreamImpl
chunkSize int chunkSize int
header []byte //音视频头chunk //sequence header
header []byte
headerSize int headerSize int
muxer *libflv.Muxer muxer *libflv.Muxer
//只存在音频流
onlyAudio bool
audioChunk librtmp.Chunk audioChunk librtmp.Chunk
videoChunk librtmp.Chunk videoChunk librtmp.Chunk
memoryPool stream.MemoryPool //只需要缓存一组GOP+第2组GOP的第一个合并写切片
transBuffer stream.StreamBuffer //当开始缓存第2组GOP的第二个合并写切片时将上一个GOP缓存释放掉
//使用2块内存池保证内存连续一次发送
//不开启GOP缓存和只有音频包的情况下创建使用一个MemoryPool
//memoryPool stream.MemoryPool
memoryPool [2]stream.MemoryPool
transBuffer stream.StreamBuffer
mwSegmentTs int64
lastTs int64 lastTs int64
chunkSizeQueue *stream.Queue chunkSizeQueue *stream.Queue
}
var nextOffset int //发送未完整切片的Sinks
//当AddSink时还未缓存到一组切片有多少先发多少. 后续切片未满之前的生成的rtmp包都将直接发送给sink.
//只要满了一组切片后这些sink都不单独发包, 统一发送切片.
incompleteSinks []stream.ISink
}
func (t *TransStream) Input(packet utils.AVPacket) { func (t *TransStream) Input(packet utils.AVPacket) {
utils.Assert(t.TransStreamImpl.Completed) utils.Assert(t.TransStreamImpl.Completed)
@@ -49,16 +62,21 @@ func (t *TransStream) Input(packet utils.AVPacket) {
} }
//即不开启GOP缓存又不合并发送. 直接使用AVPacket的预留头封装发送 //即不开启GOP缓存又不合并发送. 直接使用AVPacket的预留头封装发送
if !stream.AppConfig.GOPCache && stream.AppConfig.MergeWriteLatency < 1 { if !stream.AppConfig.GOPCache || t.onlyAudio {
//首帧视频帧必须要 //首帧视频帧必须要关键帧
} else { return
}
if videoPkt && packet.KeyFrame() {
//交替使用缓存
tmp := t.memoryPool[0]
t.memoryPool[0] = t.memoryPool[1]
t.memoryPool[1] = tmp
} }
//payloadSize += payloadSize / t.chunkSize
//分配内存 //分配内存
t.memoryPool.Mark() t.memoryPool[0].Mark()
allocate := t.memoryPool.Allocate(12 + payloadSize + (payloadSize / t.chunkSize)) allocate := t.memoryPool[0].Allocate(12 + payloadSize + (payloadSize / t.chunkSize))
//写chunk头 //写chunk头
chunk.Length = payloadSize chunk.Length = payloadSize
@@ -75,8 +93,8 @@ func (t *TransStream) Input(packet utils.AVPacket) {
} }
first := true first := true
var min int
for length > 0 { for length > 0 {
var min int
if first { if first {
min = utils.MinInt(length, t.chunkSize-5) min = utils.MinInt(length, t.chunkSize-5)
first = false first = false
@@ -101,15 +119,15 @@ func (t *TransStream) Input(packet utils.AVPacket) {
} }
} }
rtmpData := t.memoryPool.Fetch()[:n] var ret bool
ret := true rtmpData := t.memoryPool[0].Fetch()[:n]
if stream.AppConfig.GOPCache { if stream.AppConfig.GOPCache {
//ret = t.transBuffer.AddPacket(rtmpData, packet.KeyFrame() && videoPkt, packet.Dts()) //ret = t.transBuffer.AddPacket(rtmpData, packet.KeyFrame() && videoPkt, packet.Dts())
ret = t.transBuffer.AddPacket(packet, packet.KeyFrame() && videoPkt, packet.Dts()) ret = t.transBuffer.AddPacket(packet, packet.KeyFrame() && videoPkt, packet.Dts())
} }
if !ret || stream.AppConfig.GOPCache { if !ret || stream.AppConfig.GOPCache {
t.memoryPool.FreeTail() t.memoryPool[0].FreeTail()
} }
if ret { if ret {
@@ -134,7 +152,7 @@ func (t *TransStream) Input(packet utils.AVPacket) {
return return
} }
head, tail := t.memoryPool.Data() head, tail := t.memoryPool[0].Data()
sizeHead, sizeTail := t.chunkSizeQueue.Data() sizeHead, sizeTail := t.chunkSizeQueue.Data()
var offset int var offset int
var size int var size int
@@ -165,13 +183,6 @@ func (t *TransStream) Input(packet utils.AVPacket) {
} }
t.lastTs = lastTs t.lastTs = lastTs
if nextOffset == 0 {
nextOffset = size
} else {
utils.Assert(offset == nextOffset)
nextOffset += size
}
//后面再优化只发送一次 //后面再优化只发送一次
var data1 []byte var data1 []byte
var data2 []byte var data2 []byte
@@ -201,17 +212,17 @@ func (t *TransStream) Input(packet utils.AVPacket) {
} }
func (t *TransStream) AddSink(sink stream.ISink) { func (t *TransStream) AddSink(sink stream.ISink) {
t.TransStreamImpl.AddSink(sink)
utils.Assert(t.headerSize > 0) utils.Assert(t.headerSize > 0)
t.TransStreamImpl.AddSink(sink)
sink.Input(t.header[:t.headerSize]) sink.Input(t.header[:t.headerSize])
if !stream.AppConfig.GOPCache { if !stream.AppConfig.GOPCache {
return return
} }
//开启GOP缓存的情况下 //发送到最近一个合并写切片之前
//开启合并写的情况下:
// 如果合并写大小每满一次
// if stream.AppConfig.GOPCache > 0 { // if stream.AppConfig.GOPCache > 0 {
// t.transBuffer.PeekAll(func(packet interface{}) { // t.transBuffer.PeekAll(func(packet interface{}) {
// sink.Input(packet.([]byte)) // sink.Input(packet.([]byte))
@@ -220,9 +231,8 @@ func (t *TransStream) AddSink(sink stream.ISink) {
} }
func (t *TransStream) onDiscardPacket(pkt interface{}) { func (t *TransStream) onDiscardPacket(pkt interface{}) {
t.memoryPool.FreeHead() t.memoryPool[0].FreeHead()
size := t.chunkSizeQueue.Pop().(int) t.chunkSizeQueue.Pop()
nextOffset -= size
} }
func (t *TransStream) WriteHeader() error { func (t *TransStream) WriteHeader() error {
@@ -252,10 +262,16 @@ func (t *TransStream) WriteHeader() error {
t.TransStreamImpl.Completed = true t.TransStreamImpl.Completed = true
t.header = make([]byte, 1024) t.header = make([]byte, 1024)
t.muxer = libflv.NewMuxer(audioCodecId, videoCodecId, 0, 0, 0) t.muxer = libflv.NewMuxer(audioCodecId, videoCodecId, 0, 0, 0)
t.memoryPool = stream.NewMemoryPoolWithRecopy(1024 * 4000)
if stream.AppConfig.GOPCache { if stream.AppConfig.GOPCache {
t.transBuffer = stream.NewStreamBuffer(200) t.transBuffer = stream.NewStreamBuffer(200)
t.transBuffer.SetDiscardHandler(t.onDiscardPacket) t.transBuffer.SetDiscardHandler(t.onDiscardPacket)
//创建2块内存
t.memoryPool[0] = stream.NewMemoryPoolWithRecopy(1024 * 4000)
t.memoryPool[1] = stream.NewMemoryPoolWithRecopy(1024 * 4000)
} else {
} }
var n int var n int

View File

@@ -56,9 +56,12 @@ func init() {
// AppConfig_ GOP缓存和合并写必须保持一致同时开启或关闭. 关闭GOP缓存是为了降低延迟很难理解又另外开启合并写. // AppConfig_ GOP缓存和合并写必须保持一致同时开启或关闭. 关闭GOP缓存是为了降低延迟很难理解又另外开启合并写.
type AppConfig_ struct { type AppConfig_ struct {
GOPCache bool `json:"gop_cache"` //是否开启GOP缓存只缓存一组音视频 GOPCache bool `json:"gop_cache"` //是否开启GOP缓存只缓存一组音视频
ProbeTimeout int `json:"probe_timeout"` ProbeTimeout int `json:"probe_timeout"`
MergeWriteLatency int `json:"mw_latency"` //缓存指定时长的包满了之后才发送给Sink. 可以降低用户态和内核态的交互,大幅提升性能.
//缓存指定时长的包满了之后才发送给Sink. 可以降低用户态和内核态的交互频率,大幅提升性能.
//合并写的大小范围应当大于一帧的时长不超过一组GOP的时长在实际发送流的时候也会遵循此条例.
MergeWriteLatency int `json:"mw_latency"`
Rtmp RtmpConfig Rtmp RtmpConfig
Hook HookConfig Hook HookConfig
} }

View File

@@ -31,6 +31,8 @@ type MemoryPool interface {
FreeTail() FreeTail()
Data() ([]byte, []byte) Data() ([]byte, []byte)
Clear()
} }
func NewMemoryPool(capacity int) MemoryPool { func NewMemoryPool(capacity int) MemoryPool {
@@ -177,3 +179,14 @@ func (m *memoryPool) Data() ([]byte, []byte) {
} }
} }
func (m *memoryPool) Clear() {
m.capacity = cap(m.data)
m.head = 0
m.tail = 0
m.markIndex = 0
m.mark = false
m.blockQueue.Clear()
}

View File

@@ -20,6 +20,8 @@ type RingBuffer interface {
Size() int Size() int
Data() ([]interface{}, []interface{}) Data() ([]interface{}, []interface{})
Clear()
} }
func NewRingBuffer(capacity int) RingBuffer { func NewRingBuffer(capacity int) RingBuffer {
@@ -101,3 +103,9 @@ func (r *ringBuffer) Data() ([]interface{}, []interface{}) {
return r.data[r.head:r.tail], nil return r.data[r.head:r.tail], nil
} }
} }
func (r *ringBuffer) Clear() {
r.size = 0
r.head = 0
r.tail = 0
}