diff --git a/stream/mw_buffer.go b/stream/mw_buffer.go index 3f10709..50e2515 100644 --- a/stream/mw_buffer.go +++ b/stream/mw_buffer.go @@ -29,35 +29,41 @@ type MergeWritingBuffer interface { ReadSegmentsFromKeyFrameIndex(cb func([]byte)) } +type mwBlock struct { + free bool + keyVideo bool + buffer collections.MemoryPool +} + type mergeWritingBuffer struct { - mwBlocks []collections.MemoryPool + mwBlocks []mwBlock //空闲合并写块 - freeKeyFrameMWBlocks collections.LinkedList[collections.MemoryPool] - freeNoneKeyFrameMWBlocks collections.LinkedList[collections.MemoryPool] + keyFrameFreeMWBlocks collections.LinkedList[collections.MemoryPool] + noneKeyFreeFrameMWBlocks collections.LinkedList[collections.MemoryPool] index int //当前切片位于mwBlocks的索引 startTS int64 //当前切片的开始时间 duration int //当前切片时长 lastKeyFrameIndex int //最新关键帧所在切片的索引 + keyFrameCount int //关键帧计数 existVideo bool //是否存在视频 keyFrameBufferMaxLength int nonKeyFrameBufferMaxLength int - keyFrameMap map[int]int } -func (m *mergeWritingBuffer) createMWBlock(videoKey bool) collections.MemoryPool { +func (m *mergeWritingBuffer) createMWBlock(videoKey bool) mwBlock { if videoKey { - return collections.NewDirectMemoryPool(m.keyFrameBufferMaxLength) + return mwBlock{true, videoKey, collections.NewDirectMemoryPool(m.keyFrameBufferMaxLength)} } else { - return collections.NewDirectMemoryPool(m.nonKeyFrameBufferMaxLength) + return mwBlock{true, false, collections.NewDirectMemoryPool(m.nonKeyFrameBufferMaxLength)} } } func (m *mergeWritingBuffer) grow() { - pools := make([]collections.MemoryPool, cap(m.mwBlocks)*3/2) + pools := make([]mwBlock, cap(m.mwBlocks)*3/2) for i := 0; i < cap(m.mwBlocks); i++ { pools[i] = m.mwBlocks[i] } @@ -67,46 +73,37 @@ func (m *mergeWritingBuffer) grow() { func (m *mergeWritingBuffer) Allocate(size int, ts int64, videoKey bool) []byte { if !AppConfig.GOPCache || !m.existVideo { - return m.mwBlocks[0].Allocate(size) + return m.mwBlocks[0].buffer.Allocate(size) } utils.Assert(ts != -1) //新的切片 if m.startTS == -1 { - if _, ok := m.keyFrameMap[m.index]; ok { - delete(m.keyFrameMap, m.index) - } + m.startTS = ts - if m.mwBlocks[m.index] == nil { + if m.mwBlocks[m.index].buffer == nil { //创建内存块 m.mwBlocks[m.index] = m.createMWBlock(videoKey) } else { //循环使用 - if !videoKey { + m.mwBlocks[m.index].buffer.Clear() - //I帧间隔长, 不够写一组GOP, 扩容! - if len(m.keyFrameMap) < 1 { - capacity := len(m.mwBlocks) - m.grow() - m.index = capacity - - m.mwBlocks[m.index] = m.createMWBlock(videoKey) - } + if m.mwBlocks[m.index].keyVideo { + m.keyFrameCount-- } - - m.mwBlocks[m.index].Clear() } - m.startTS = ts + m.mwBlocks[m.index].free = false + m.mwBlocks[m.index].keyVideo = videoKey } if videoKey { //请务必确保关键帧帧从新的切片开始 //外部遇到关键帧请先调用FlushSegment - utils.Assert(m.mwBlocks[m.index].IsEmpty()) + utils.Assert(m.mwBlocks[m.index].buffer.IsEmpty()) m.lastKeyFrameIndex = m.index - m.keyFrameMap[m.index] = m.index + m.keyFrameCount++ } if ts < m.startTS { @@ -114,15 +111,17 @@ func (m *mergeWritingBuffer) Allocate(size int, ts int64, videoKey bool) []byte } m.duration = int(ts - m.startTS) - return m.mwBlocks[m.index].Allocate(size) + return m.mwBlocks[m.index].buffer.Allocate(size) } func (m *mergeWritingBuffer) FlushSegment() []byte { - if m.mwBlocks[m.index] == nil { + if !AppConfig.GOPCache || !m.existVideo { + return nil + } else if m.mwBlocks[m.index].buffer == nil || m.mwBlocks[m.index].free { return nil } - data, _ := m.mwBlocks[m.index].Data() + data, _ := m.mwBlocks[m.index].buffer.Data() if len(data) == 0 { return nil } @@ -134,16 +133,22 @@ func (m *mergeWritingBuffer) FlushSegment() []byte { m.nonKeyFrameBufferMaxLength = len(data) * 3 / 2 } - m.index = (m.index + 1) % cap(m.mwBlocks) + capacity := cap(m.mwBlocks) + if m.index+1 == capacity && m.keyFrameCount == 1 { + m.grow() + } + + m.index = (m.index + 1) % capacity m.startTS = -1 m.duration = 0 + m.mwBlocks[m.index].free = true return data } func (m *mergeWritingBuffer) PeekCompletedSegment() []byte { if !AppConfig.GOPCache || !m.existVideo { - data, _ := m.mwBlocks[0].Data() - m.mwBlocks[0].Clear() + data, _ := m.mwBlocks[0].buffer.Data() + m.mwBlocks[0].buffer.Clear() return data } @@ -163,38 +168,33 @@ func (m *mergeWritingBuffer) IsFull(ts int64) bool { } func (m *mergeWritingBuffer) IsNewSegment() bool { - if m.mwBlocks[m.index] == nil { - return true - } - - data, _ := m.mwBlocks[m.index].Data() - return len(data) == 0 + return m.mwBlocks[m.index].buffer == nil || m.mwBlocks[m.index].free } func (m *mergeWritingBuffer) Reserve(number int) { - utils.Assert(m.mwBlocks[m.index] != nil) + utils.Assert(m.mwBlocks[m.index].buffer != nil) - m.mwBlocks[m.index].Reserve(number) + m.mwBlocks[m.index].buffer.Reserve(number) } func (m *mergeWritingBuffer) ReadSegmentsFromKeyFrameIndex(cb func([]byte)) { - if m.lastKeyFrameIndex < 0 || m.index == m.lastKeyFrameIndex { + if m.keyFrameCount == 0 { return } for i := m.lastKeyFrameIndex; i < cap(m.mwBlocks); i++ { - if m.mwBlocks[i] == nil { + if m.mwBlocks[i].buffer == nil { continue } - data, _ := m.mwBlocks[i].Data() + data, _ := m.mwBlocks[i].buffer.Data() cb(data) } //回调循环使用的头部数据 if m.index < m.lastKeyFrameIndex { for i := 0; i < m.index; i++ { - data, _ := m.mwBlocks[i].Data() + data, _ := m.mwBlocks[i].buffer.Data() cb(data) } } @@ -202,15 +202,15 @@ func (m *mergeWritingBuffer) ReadSegmentsFromKeyFrameIndex(cb func([]byte)) { func NewMergeWritingBuffer(existVideo bool) MergeWritingBuffer { //开启GOP缓存, 输出流也缓存整个GOP - var blocks []collections.MemoryPool + var blocks []mwBlock if existVideo { - blocks = make([]collections.MemoryPool, AppConfig.WriteBufferNumber) + blocks = make([]mwBlock, AppConfig.WriteBufferNumber) } else { - blocks = make([]collections.MemoryPool, 1) + blocks = make([]mwBlock, 1) } if !existVideo || !AppConfig.GOPCache { - blocks[0] = collections.NewDirectMemoryPool(1024 * 100) + blocks[0] = mwBlock{true, false, collections.NewDirectMemoryPool(1024 * 100)} } return &mergeWritingBuffer{ @@ -220,6 +220,5 @@ func NewMergeWritingBuffer(existVideo bool) MergeWritingBuffer { startTS: -1, lastKeyFrameIndex: -1, existVideo: existVideo, - keyFrameMap: make(map[int]int, 5), } } diff --git a/stream/source.go b/stream/source.go index 4f2def7..2b35c33 100644 --- a/stream/source.go +++ b/stream/source.go @@ -558,10 +558,6 @@ func (s *PublishSource) OnDeMuxStream(stream utils.AVStream) { //启动探测超时计时器 if len(s.originStreams.All()) == 1 { - if AppConfig.ProbeTimeout == 0 { - AppConfig.ProbeTimeout = 2000 - } - s.probeTimer = time.AfterFunc(time.Duration(AppConfig.ProbeTimeout)*time.Millisecond, func() { s.probeTimoutEvent <- true })