diff --git a/gb28181/source_udp.go b/gb28181/source_udp.go index 8c4c26d..a8efb80 100644 --- a/gb28181/source_udp.go +++ b/gb28181/source_udp.go @@ -18,7 +18,7 @@ type UDPSource struct { func NewUDPSource() *UDPSource { return &UDPSource{ rtpDeMuxer: jitterbuffer.New(), - rtpBuffer: stream.NewMemoryPoolWithDirect(JitterBufferSize, true), + rtpBuffer: stream.NewDirectMemoryPool(JitterBufferSize), } } diff --git a/stream/memory_pool.go b/stream/memory_pool.go index d0845ce..e23b649 100644 --- a/stream/memory_pool.go +++ b/stream/memory_pool.go @@ -5,7 +5,8 @@ import ( ) // MemoryPool 从解复用阶段,拼凑成完整的AVPacket开始(写),到GOP缓存结束(释放),整个过程都使用池中内存 -// 类似环形缓冲区, 区别在于,写入的内存块是连续的、整块内存. +// 类似环形缓冲区, 区别在于,内存块是连续的、整块内存. +// AVPacket缓存使用memorypool_rb, 允许回环(内存必须完整). tranStream使用memorypool_direct, 连续一块完整的内存, 否则与合并缓存写的观念背道而驰. // 两种使用方式: // 1. 已知需要分配内存大小, 直接使用Allocate()函数分配, 并且外部自行操作内存块 // 2. 未知分配内存大小, 先使用Mark()函数,标记内存起始偏移量, 再通过Write()函数将数据拷贝进内存块,最后调用Fetch/Reset函数完成或释放内存块 @@ -24,21 +25,22 @@ type MemoryPool interface { // Fetch 获取当前内存块,必须先调用Mark函数 Fetch() []byte - // Reset 清空写入的数据,本次缓存的数据无效 + // Reset 清空本次写入的数据,本次缓存的数据无效 Reset() - // Reserve 保留指定大小的内存空间 + // Reserve 预留指定大小的内存空间 //主要是为了和实现和Write相似功能,但是不拷贝, 所以使用流程和Write一样. Reserve(size int) - // FreeHead 从头部释放指定大小内存 + // FreeHead 从头部释放一块内存 FreeHead() - // FreeTail 从尾部释放指定大小内存 + // FreeTail 从尾部释放一块内存 FreeTail() Data() ([]byte, []byte) + // Clear 清空所有内存块 Clear() Empty() bool @@ -48,111 +50,77 @@ type MemoryPool interface { Size() int } -func NewMemoryPool(capacity int) MemoryPool { - pool := &memoryPool{ - data: make([]byte, capacity), - capacity: capacity, - blockQueue: NewQueue(128), - } - - return pool -} - -func NewMemoryPoolWithRecopy(capacity int) MemoryPool { - pool := &memoryPool{ - data: make([]byte, capacity), - capacity: capacity, - blockQueue: NewQueue(128), - recopy: true, - } - - return pool -} - -func NewMemoryPoolWithDirect(capacity int, recopy bool) MemoryPool { - pool := &memoryPool{ - data: make([]byte, capacity), - capacity: capacity, - blockQueue: NewQueue(128), - recopy: recopy, - direct: true, - } - - return pool -} - type memoryPool struct { - data []byte - //实际的可用容量,当尾部剩余内存不足以此次Write, 并且头部有足够的空闲内存, 则尾部剩余的内存将不可用. - capacity int + data []byte + capacity int //实际的可用容量,当尾部剩余内存不足以此次Write, 并且头部有足够的空闲内存, 则尾部剩余的内存将不可用. head int tail int - //保存开始索引 - markIndex int - mark bool - blockQueue *Queue + markIndex int //保存开始索引 + marked bool + blockQueue *Queue + discardBlockCount int + recopy bool //扩容时,是否拷贝旧数据. 缓存AVPacket时, 内存已经被Data引用,所以不需要再拷贝旧数据. 用作合并写缓存时, 流还没有发送使用, 需要拷贝旧数据. + isFull func(int) bool +} - recopy bool - direct bool +func (m *memoryPool) grow(size int) { + //1.5倍扩容 + newData := make([]byte, (cap(m.data)+size)*3/2) + //未写入缓冲区大小 + flushSize := m.tail - m.markIndex + //拷贝之前的数据 + if m.recopy { + head, tail := m.Data() + copy(newData, head) + copy(newData[len(head):], tail) + + m.head = 0 + m.tail = len(head) + len(tail) + m.markIndex = m.tail - flushSize + } else { + //只拷贝本回合数据 + copy(newData, m.data[m.tail-flushSize:m.tail]) + //丢弃之前的内存块 + m.discardBlockCount += m.blockQueue.Size() + m.blockQueue.Clear() + + m.head = 0 + m.tail = flushSize + m.markIndex = 0 + } + + m.data = newData + m.capacity = cap(newData) } // 根据head和tail计算出可用的内存地址 func (m *memoryPool) allocate(size int) []byte { - if m.capacity-m.tail < size { - //使用从头释放的内存 - if !m.direct && m.tail-m.markIndex+size <= m.head { - copy(m.data, m.data[m.markIndex:m.tail]) - m.capacity = m.markIndex - m.tail = m.tail - m.markIndex - m.markIndex = 0 - } else { - //扩容 - writeSize := m.tail - m.markIndex - capacity := (cap(m.data) + writeSize + size) * 2 - bytes := make([]byte, capacity) - - if m.recopy { - //将扩容前的老数据复制到新的内存空间 - head, tail := m.Data() - copy(bytes, head) - copy(bytes[len(head):], tail) - m.tail = len(head) + len(tail) - m.markIndex = m.tail - writeSize - } else { - //不对之前的内存进行复制, 已经被AVPacket引用, 自行GC - copy(bytes, m.data[m.markIndex:m.tail]) - m.tail = writeSize - m.markIndex = 0 - } - - m.data = bytes - m.capacity = capacity - m.head = 0 - } + if m.isFull(size) { + m.grow(size) } - bytes := m.data[m.tail:] + bytes := m.data[m.tail : m.tail+size] m.tail += size return bytes } func (m *memoryPool) Mark() { - utils.Assert(!m.mark) + utils.Assert(!m.marked) m.markIndex = m.tail - m.mark = true + m.marked = true } func (m *memoryPool) Write(data []byte) { - utils.Assert(m.mark) + utils.Assert(m.marked) allocate := m.allocate(len(data)) copy(allocate, data) } func (m *memoryPool) Reserve(size int) { - utils.Assert(m.mark) + utils.Assert(m.marked) _ = m.allocate(size) } @@ -163,23 +131,28 @@ func (m *memoryPool) Allocate(size int) []byte { } func (m *memoryPool) Fetch() []byte { - utils.Assert(m.mark) + utils.Assert(m.marked) - m.mark = false + m.marked = false size := m.tail - m.markIndex m.blockQueue.Push(size) return m.data[m.markIndex:m.tail] } func (m *memoryPool) Reset() { - m.mark = false + m.marked = false m.tail = m.markIndex } func (m *memoryPool) FreeHead() { - utils.Assert(!m.mark) + utils.Assert(!m.marked) utils.Assert(!m.blockQueue.IsEmpty()) + if m.discardBlockCount > 1 { + m.discardBlockCount-- + return + } + size := m.blockQueue.Pop().(int) m.head += size @@ -192,9 +165,14 @@ func (m *memoryPool) FreeHead() { } func (m *memoryPool) FreeTail() { - utils.Assert(!m.mark) + utils.Assert(!m.marked) utils.Assert(!m.blockQueue.IsEmpty()) + if m.discardBlockCount > 1 { + m.discardBlockCount-- + return + } + size := m.blockQueue.PopBack().(int) m.tail -= size if m.tail == 0 && !m.blockQueue.IsEmpty() { @@ -216,13 +194,14 @@ func (m *memoryPool) Clear() { m.tail = 0 m.markIndex = 0 - m.mark = false + m.marked = false m.blockQueue.Clear() + m.discardBlockCount = 0 } func (m *memoryPool) Empty() bool { - utils.Assert(!m.mark) + utils.Assert(!m.marked) return m.blockQueue.Size() < 1 } diff --git a/stream/memory_pool_test.go b/stream/memory_pool_test.go index ef2a254..eee2a6e 100644 --- a/stream/memory_pool_test.go +++ b/stream/memory_pool_test.go @@ -13,7 +13,7 @@ func TestMemoryPool(t *testing.T) { bytes[i] = byte(i) } - pool := NewMemoryPool(5) + pool := NewDirectMemoryPool(5) last := uintptr(0) for i := 0; i < 10; i++ { pool.Mark() diff --git a/stream/memorypool_direct.go b/stream/memorypool_direct.go new file mode 100644 index 0000000..bf1edcc --- /dev/null +++ b/stream/memorypool_direct.go @@ -0,0 +1,23 @@ +package stream + +type directMemoryPool struct { + *memoryPool +} + +func (m *directMemoryPool) isFull(size int) bool { + //尾部没有大小合适的内存空间 + return m.capacity-m.tail < size +} + +func NewDirectMemoryPool(capacity int) MemoryPool { + pool := &directMemoryPool{} + pool.memoryPool = &memoryPool{ + data: make([]byte, capacity), + capacity: capacity, + blockQueue: NewQueue(capacity), + recopy: true, + isFull: pool.isFull, + } + + return pool +} diff --git a/stream/memorypool_rb.go b/stream/memorypool_rb.go new file mode 100644 index 0000000..f5dc158 --- /dev/null +++ b/stream/memorypool_rb.go @@ -0,0 +1,32 @@ +package stream + +type rbMemoryPool struct { + *memoryPool +} + +func (m *rbMemoryPool) isFull(size int) bool { + //已经回环 + over := m.tail < m.head + if over && m.head-m.tail >= size { + //头部有大小合适的内存空间 + } else if !over && m.capacity-m.tail >= size { + //尾部有大小合适的内存空间 + } else { + return true + } + + return false +} + +func NewRbMemoryPool(capacity int) MemoryPool { + pool := &rbMemoryPool{} + pool.memoryPool = &memoryPool{ + data: make([]byte, capacity), + capacity: capacity, + blockQueue: NewQueue(capacity), + recopy: false, + isFull: pool.isFull, + } + + return pool +} diff --git a/stream/source.go b/stream/source.go index 635fae1..7adee40 100644 --- a/stream/source.go +++ b/stream/source.go @@ -225,15 +225,15 @@ func (s *SourceImpl) FindOrCreatePacketBuffer(index int, mediaType utils.AVMedia if s.pktBuffers[index] == nil { if utils.AVMediaTypeAudio == mediaType { - s.pktBuffers[index] = NewMemoryPool(48000 * 64) + s.pktBuffers[index] = NewRbMemoryPool(48000 * 64) } else if AppConfig.GOPCache { //开启GOP缓存 //以每秒钟4M码率大小创建视频内存池 - s.pktBuffers[index] = NewMemoryPool(4096 * 1024) + s.pktBuffers[index] = NewRbMemoryPool(4096 * 1024) } else { //未开启GOP缓存 //以每秒钟4M的1/8码率大小创建视频内存池 - s.pktBuffers[index] = NewMemoryPool(4096 * 1024 / 8) + s.pktBuffers[index] = NewRbMemoryPool(4096 * 1024 / 8) } } diff --git a/stream/trans_stream.go b/stream/trans_stream.go index a5ba311..74bf368 100644 --- a/stream/trans_stream.go +++ b/stream/trans_stream.go @@ -208,10 +208,10 @@ func (c *CacheTransStream) Init() { c.TransStreamImpl.Init() c.StreamBuffers = make([]MemoryPool, 2) - c.StreamBuffers[0] = NewMemoryPoolWithDirect(1024*4000, true) + c.StreamBuffers[0] = NewDirectMemoryPool(1024 * 4000) if c.ExistVideo && AppConfig.MergeWriteLatency > 0 { - c.StreamBuffers[1] = NewMemoryPoolWithDirect(1024*4000, true) + c.StreamBuffers[1] = NewDirectMemoryPool(1024 * 4000) } c.SegmentOffset = 0