diff --git a/rtmp/rtmp_transtream.go b/rtmp/rtmp_transtream.go index 797cc98..041cfea 100644 --- a/rtmp/rtmp_transtream.go +++ b/rtmp/rtmp_transtream.go @@ -9,21 +9,34 @@ import ( type TransStream struct { stream.TransStreamImpl - chunkSize int - header []byte //音视频头chunk + chunkSize int + //sequence header + header []byte headerSize int muxer *libflv.Muxer + //只存在音频流 + onlyAudio bool audioChunk librtmp.Chunk videoChunk librtmp.Chunk - memoryPool stream.MemoryPool - transBuffer stream.StreamBuffer + //只需要缓存一组GOP+第2组GOP的第一个合并写切片 + //当开始缓存第2组GOP的第二个合并写切片时,将上一个GOP缓存释放掉 + //使用2块内存池,保证内存连续,一次发送 + //不开启GOP缓存和只有音频包的情况下,创建使用一个MemoryPool + //memoryPool stream.MemoryPool + memoryPool [2]stream.MemoryPool + transBuffer stream.StreamBuffer + + mwSegmentTs int64 lastTs int64 chunkSizeQueue *stream.Queue -} -var nextOffset int + //发送未完整切片的Sinks + //当AddSink时,还未缓存到一组切片,有多少先发多少. 后续切片未满之前的生成的rtmp包都将直接发送给sink. + //只要满了一组切片后,这些sink都不单独发包, 统一发送切片. + incompleteSinks []stream.ISink +} func (t *TransStream) Input(packet utils.AVPacket) { utils.Assert(t.TransStreamImpl.Completed) @@ -49,16 +62,21 @@ func (t *TransStream) Input(packet utils.AVPacket) { } //即不开启GOP缓存又不合并发送. 直接使用AVPacket的预留头封装发送 - if !stream.AppConfig.GOPCache && stream.AppConfig.MergeWriteLatency < 1 { - //首帧视频帧必须要 - } else { - + if !stream.AppConfig.GOPCache || t.onlyAudio { + //首帧视频帧必须要关键帧 + return + } + + if videoPkt && packet.KeyFrame() { + //交替使用缓存 + tmp := t.memoryPool[0] + t.memoryPool[0] = t.memoryPool[1] + t.memoryPool[1] = tmp } - //payloadSize += payloadSize / t.chunkSize //分配内存 - t.memoryPool.Mark() - allocate := t.memoryPool.Allocate(12 + payloadSize + (payloadSize / t.chunkSize)) + t.memoryPool[0].Mark() + allocate := t.memoryPool[0].Allocate(12 + payloadSize + (payloadSize / t.chunkSize)) //写chunk头 chunk.Length = payloadSize @@ -75,8 +93,8 @@ func (t *TransStream) Input(packet utils.AVPacket) { } first := true - var min int for length > 0 { + var min int if first { min = utils.MinInt(length, t.chunkSize-5) first = false @@ -101,15 +119,15 @@ func (t *TransStream) Input(packet utils.AVPacket) { } } - rtmpData := t.memoryPool.Fetch()[:n] - ret := true + var ret bool + rtmpData := t.memoryPool[0].Fetch()[:n] if stream.AppConfig.GOPCache { //ret = t.transBuffer.AddPacket(rtmpData, packet.KeyFrame() && videoPkt, packet.Dts()) ret = t.transBuffer.AddPacket(packet, packet.KeyFrame() && videoPkt, packet.Dts()) } if !ret || stream.AppConfig.GOPCache { - t.memoryPool.FreeTail() + t.memoryPool[0].FreeTail() } if ret { @@ -134,7 +152,7 @@ func (t *TransStream) Input(packet utils.AVPacket) { return } - head, tail := t.memoryPool.Data() + head, tail := t.memoryPool[0].Data() sizeHead, sizeTail := t.chunkSizeQueue.Data() var offset int var size int @@ -165,13 +183,6 @@ func (t *TransStream) Input(packet utils.AVPacket) { } t.lastTs = lastTs - if nextOffset == 0 { - nextOffset = size - } else { - utils.Assert(offset == nextOffset) - nextOffset += size - } - //后面再优化只发送一次 var data1 []byte var data2 []byte @@ -201,17 +212,17 @@ func (t *TransStream) Input(packet utils.AVPacket) { } func (t *TransStream) AddSink(sink stream.ISink) { - t.TransStreamImpl.AddSink(sink) - utils.Assert(t.headerSize > 0) + + t.TransStreamImpl.AddSink(sink) sink.Input(t.header[:t.headerSize]) + if !stream.AppConfig.GOPCache { return } - //开启GOP缓存的情况下 - //开启合并写的情况下: - // 如果合并写大小每满一次 + //发送到最近一个合并写切片之前 + // if stream.AppConfig.GOPCache > 0 { // t.transBuffer.PeekAll(func(packet interface{}) { // sink.Input(packet.([]byte)) @@ -220,9 +231,8 @@ func (t *TransStream) AddSink(sink stream.ISink) { } func (t *TransStream) onDiscardPacket(pkt interface{}) { - t.memoryPool.FreeHead() - size := t.chunkSizeQueue.Pop().(int) - nextOffset -= size + t.memoryPool[0].FreeHead() + t.chunkSizeQueue.Pop() } func (t *TransStream) WriteHeader() error { @@ -252,10 +262,16 @@ func (t *TransStream) WriteHeader() error { t.TransStreamImpl.Completed = true t.header = make([]byte, 1024) t.muxer = libflv.NewMuxer(audioCodecId, videoCodecId, 0, 0, 0) - t.memoryPool = stream.NewMemoryPoolWithRecopy(1024 * 4000) + if stream.AppConfig.GOPCache { t.transBuffer = stream.NewStreamBuffer(200) t.transBuffer.SetDiscardHandler(t.onDiscardPacket) + + //创建2块内存 + t.memoryPool[0] = stream.NewMemoryPoolWithRecopy(1024 * 4000) + t.memoryPool[1] = stream.NewMemoryPoolWithRecopy(1024 * 4000) + } else { + } var n int diff --git a/stream/config.go b/stream/config.go index 287b107..0821fad 100644 --- a/stream/config.go +++ b/stream/config.go @@ -56,9 +56,12 @@ func init() { // AppConfig_ GOP缓存和合并写必须保持一致,同时开启或关闭. 关闭GOP缓存,是为了降低延迟,很难理解又另外开启合并写. type AppConfig_ struct { - GOPCache bool `json:"gop_cache"` //是否开启GOP缓存,只缓存一组音视频 - ProbeTimeout int `json:"probe_timeout"` - MergeWriteLatency int `json:"mw_latency"` //缓存指定时长的包,满了之后才发送给Sink. 可以降低用户态和内核态的交互,大幅提升性能. + GOPCache bool `json:"gop_cache"` //是否开启GOP缓存,只缓存一组音视频 + ProbeTimeout int `json:"probe_timeout"` + + //缓存指定时长的包,满了之后才发送给Sink. 可以降低用户态和内核态的交互频率,大幅提升性能. + //合并写的大小范围,应当大于一帧的时长,不超过一组GOP的时长,在实际发送流的时候也会遵循此条例. + MergeWriteLatency int `json:"mw_latency"` Rtmp RtmpConfig Hook HookConfig } diff --git a/stream/memory_pool.go b/stream/memory_pool.go index 201b638..92c1b52 100644 --- a/stream/memory_pool.go +++ b/stream/memory_pool.go @@ -31,6 +31,8 @@ type MemoryPool interface { FreeTail() Data() ([]byte, []byte) + + Clear() } func NewMemoryPool(capacity int) MemoryPool { @@ -177,3 +179,14 @@ func (m *memoryPool) Data() ([]byte, []byte) { } } + +func (m *memoryPool) Clear() { + m.capacity = cap(m.data) + m.head = 0 + m.tail = 0 + + m.markIndex = 0 + m.mark = false + + m.blockQueue.Clear() +} diff --git a/stream/ring_buffer.go b/stream/ring_buffer.go index 6756ef2..b4f06c2 100644 --- a/stream/ring_buffer.go +++ b/stream/ring_buffer.go @@ -20,6 +20,8 @@ type RingBuffer interface { Size() int Data() ([]interface{}, []interface{}) + + Clear() } func NewRingBuffer(capacity int) RingBuffer { @@ -101,3 +103,9 @@ func (r *ringBuffer) Data() ([]interface{}, []interface{}) { return r.data[r.head:r.tail], nil } } + +func (r *ringBuffer) Clear() { + r.size = 0 + r.head = 0 + r.tail = 0 +}