mirror of
https://github.com/lkmio/lkm.git
synced 2025-10-05 07:06:57 +08:00
完善内存池扩容
This commit is contained in:
@@ -18,7 +18,7 @@ type UDPSource struct {
|
||||
func NewUDPSource() *UDPSource {
|
||||
return &UDPSource{
|
||||
rtpDeMuxer: jitterbuffer.New(),
|
||||
rtpBuffer: stream.NewMemoryPoolWithDirect(JitterBufferSize, true),
|
||||
rtpBuffer: stream.NewDirectMemoryPool(JitterBufferSize),
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -5,7 +5,8 @@ import (
|
||||
)
|
||||
|
||||
// MemoryPool 从解复用阶段,拼凑成完整的AVPacket开始(写),到GOP缓存结束(释放),整个过程都使用池中内存
|
||||
// 类似环形缓冲区, 区别在于,写入的内存块是连续的、整块内存.
|
||||
// 类似环形缓冲区, 区别在于,内存块是连续的、整块内存.
|
||||
// AVPacket缓存使用memorypool_rb, 允许回环(内存必须完整). tranStream使用memorypool_direct, 连续一块完整的内存, 否则与合并缓存写的观念背道而驰.
|
||||
// 两种使用方式:
|
||||
// 1. 已知需要分配内存大小, 直接使用Allocate()函数分配, 并且外部自行操作内存块
|
||||
// 2. 未知分配内存大小, 先使用Mark()函数,标记内存起始偏移量, 再通过Write()函数将数据拷贝进内存块,最后调用Fetch/Reset函数完成或释放内存块
|
||||
@@ -24,21 +25,22 @@ type MemoryPool interface {
|
||||
// Fetch 获取当前内存块,必须先调用Mark函数
|
||||
Fetch() []byte
|
||||
|
||||
// Reset 清空写入的数据,本次缓存的数据无效
|
||||
// Reset 清空本次写入的数据,本次缓存的数据无效
|
||||
Reset()
|
||||
|
||||
// Reserve 保留指定大小的内存空间
|
||||
// Reserve 预留指定大小的内存空间
|
||||
//主要是为了和实现和Write相似功能,但是不拷贝, 所以使用流程和Write一样.
|
||||
Reserve(size int)
|
||||
|
||||
// FreeHead 从头部释放指定大小内存
|
||||
// FreeHead 从头部释放一块内存
|
||||
FreeHead()
|
||||
|
||||
// FreeTail 从尾部释放指定大小内存
|
||||
// FreeTail 从尾部释放一块内存
|
||||
FreeTail()
|
||||
|
||||
Data() ([]byte, []byte)
|
||||
|
||||
// Clear 清空所有内存块
|
||||
Clear()
|
||||
|
||||
Empty() bool
|
||||
@@ -48,111 +50,77 @@ type MemoryPool interface {
|
||||
Size() int
|
||||
}
|
||||
|
||||
func NewMemoryPool(capacity int) MemoryPool {
|
||||
pool := &memoryPool{
|
||||
data: make([]byte, capacity),
|
||||
capacity: capacity,
|
||||
blockQueue: NewQueue(128),
|
||||
}
|
||||
|
||||
return pool
|
||||
}
|
||||
|
||||
func NewMemoryPoolWithRecopy(capacity int) MemoryPool {
|
||||
pool := &memoryPool{
|
||||
data: make([]byte, capacity),
|
||||
capacity: capacity,
|
||||
blockQueue: NewQueue(128),
|
||||
recopy: true,
|
||||
}
|
||||
|
||||
return pool
|
||||
}
|
||||
|
||||
func NewMemoryPoolWithDirect(capacity int, recopy bool) MemoryPool {
|
||||
pool := &memoryPool{
|
||||
data: make([]byte, capacity),
|
||||
capacity: capacity,
|
||||
blockQueue: NewQueue(128),
|
||||
recopy: recopy,
|
||||
direct: true,
|
||||
}
|
||||
|
||||
return pool
|
||||
}
|
||||
|
||||
type memoryPool struct {
|
||||
data []byte
|
||||
//实际的可用容量,当尾部剩余内存不足以此次Write, 并且头部有足够的空闲内存, 则尾部剩余的内存将不可用.
|
||||
capacity int
|
||||
capacity int //实际的可用容量,当尾部剩余内存不足以此次Write, 并且头部有足够的空闲内存, 则尾部剩余的内存将不可用.
|
||||
head int
|
||||
tail int
|
||||
|
||||
//保存开始索引
|
||||
markIndex int
|
||||
mark bool
|
||||
markIndex int //保存开始索引
|
||||
marked bool
|
||||
blockQueue *Queue
|
||||
discardBlockCount int
|
||||
recopy bool //扩容时,是否拷贝旧数据. 缓存AVPacket时, 内存已经被Data引用,所以不需要再拷贝旧数据. 用作合并写缓存时, 流还没有发送使用, 需要拷贝旧数据.
|
||||
isFull func(int) bool
|
||||
}
|
||||
|
||||
recopy bool
|
||||
direct bool
|
||||
func (m *memoryPool) grow(size int) {
|
||||
//1.5倍扩容
|
||||
newData := make([]byte, (cap(m.data)+size)*3/2)
|
||||
//未写入缓冲区大小
|
||||
flushSize := m.tail - m.markIndex
|
||||
//拷贝之前的数据
|
||||
if m.recopy {
|
||||
head, tail := m.Data()
|
||||
copy(newData, head)
|
||||
copy(newData[len(head):], tail)
|
||||
|
||||
m.head = 0
|
||||
m.tail = len(head) + len(tail)
|
||||
m.markIndex = m.tail - flushSize
|
||||
} else {
|
||||
//只拷贝本回合数据
|
||||
copy(newData, m.data[m.tail-flushSize:m.tail])
|
||||
//丢弃之前的内存块
|
||||
m.discardBlockCount += m.blockQueue.Size()
|
||||
m.blockQueue.Clear()
|
||||
|
||||
m.head = 0
|
||||
m.tail = flushSize
|
||||
m.markIndex = 0
|
||||
}
|
||||
|
||||
m.data = newData
|
||||
m.capacity = cap(newData)
|
||||
}
|
||||
|
||||
// 根据head和tail计算出可用的内存地址
|
||||
func (m *memoryPool) allocate(size int) []byte {
|
||||
if m.capacity-m.tail < size {
|
||||
//使用从头释放的内存
|
||||
if !m.direct && m.tail-m.markIndex+size <= m.head {
|
||||
copy(m.data, m.data[m.markIndex:m.tail])
|
||||
m.capacity = m.markIndex
|
||||
m.tail = m.tail - m.markIndex
|
||||
m.markIndex = 0
|
||||
} else {
|
||||
//扩容
|
||||
writeSize := m.tail - m.markIndex
|
||||
capacity := (cap(m.data) + writeSize + size) * 2
|
||||
bytes := make([]byte, capacity)
|
||||
|
||||
if m.recopy {
|
||||
//将扩容前的老数据复制到新的内存空间
|
||||
head, tail := m.Data()
|
||||
copy(bytes, head)
|
||||
copy(bytes[len(head):], tail)
|
||||
m.tail = len(head) + len(tail)
|
||||
m.markIndex = m.tail - writeSize
|
||||
} else {
|
||||
//不对之前的内存进行复制, 已经被AVPacket引用, 自行GC
|
||||
copy(bytes, m.data[m.markIndex:m.tail])
|
||||
m.tail = writeSize
|
||||
m.markIndex = 0
|
||||
if m.isFull(size) {
|
||||
m.grow(size)
|
||||
}
|
||||
|
||||
m.data = bytes
|
||||
m.capacity = capacity
|
||||
m.head = 0
|
||||
}
|
||||
}
|
||||
|
||||
bytes := m.data[m.tail:]
|
||||
bytes := m.data[m.tail : m.tail+size]
|
||||
m.tail += size
|
||||
return bytes
|
||||
}
|
||||
|
||||
func (m *memoryPool) Mark() {
|
||||
utils.Assert(!m.mark)
|
||||
utils.Assert(!m.marked)
|
||||
|
||||
m.markIndex = m.tail
|
||||
m.mark = true
|
||||
m.marked = true
|
||||
}
|
||||
|
||||
func (m *memoryPool) Write(data []byte) {
|
||||
utils.Assert(m.mark)
|
||||
utils.Assert(m.marked)
|
||||
|
||||
allocate := m.allocate(len(data))
|
||||
copy(allocate, data)
|
||||
}
|
||||
|
||||
func (m *memoryPool) Reserve(size int) {
|
||||
utils.Assert(m.mark)
|
||||
utils.Assert(m.marked)
|
||||
_ = m.allocate(size)
|
||||
}
|
||||
|
||||
@@ -163,23 +131,28 @@ func (m *memoryPool) Allocate(size int) []byte {
|
||||
}
|
||||
|
||||
func (m *memoryPool) Fetch() []byte {
|
||||
utils.Assert(m.mark)
|
||||
utils.Assert(m.marked)
|
||||
|
||||
m.mark = false
|
||||
m.marked = false
|
||||
size := m.tail - m.markIndex
|
||||
m.blockQueue.Push(size)
|
||||
return m.data[m.markIndex:m.tail]
|
||||
}
|
||||
|
||||
func (m *memoryPool) Reset() {
|
||||
m.mark = false
|
||||
m.marked = false
|
||||
m.tail = m.markIndex
|
||||
}
|
||||
|
||||
func (m *memoryPool) FreeHead() {
|
||||
utils.Assert(!m.mark)
|
||||
utils.Assert(!m.marked)
|
||||
utils.Assert(!m.blockQueue.IsEmpty())
|
||||
|
||||
if m.discardBlockCount > 1 {
|
||||
m.discardBlockCount--
|
||||
return
|
||||
}
|
||||
|
||||
size := m.blockQueue.Pop().(int)
|
||||
m.head += size
|
||||
|
||||
@@ -192,9 +165,14 @@ func (m *memoryPool) FreeHead() {
|
||||
}
|
||||
|
||||
func (m *memoryPool) FreeTail() {
|
||||
utils.Assert(!m.mark)
|
||||
utils.Assert(!m.marked)
|
||||
utils.Assert(!m.blockQueue.IsEmpty())
|
||||
|
||||
if m.discardBlockCount > 1 {
|
||||
m.discardBlockCount--
|
||||
return
|
||||
}
|
||||
|
||||
size := m.blockQueue.PopBack().(int)
|
||||
m.tail -= size
|
||||
if m.tail == 0 && !m.blockQueue.IsEmpty() {
|
||||
@@ -216,13 +194,14 @@ func (m *memoryPool) Clear() {
|
||||
m.tail = 0
|
||||
|
||||
m.markIndex = 0
|
||||
m.mark = false
|
||||
m.marked = false
|
||||
|
||||
m.blockQueue.Clear()
|
||||
m.discardBlockCount = 0
|
||||
}
|
||||
|
||||
func (m *memoryPool) Empty() bool {
|
||||
utils.Assert(!m.mark)
|
||||
utils.Assert(!m.marked)
|
||||
return m.blockQueue.Size() < 1
|
||||
}
|
||||
|
||||
|
@@ -13,7 +13,7 @@ func TestMemoryPool(t *testing.T) {
|
||||
bytes[i] = byte(i)
|
||||
}
|
||||
|
||||
pool := NewMemoryPool(5)
|
||||
pool := NewDirectMemoryPool(5)
|
||||
last := uintptr(0)
|
||||
for i := 0; i < 10; i++ {
|
||||
pool.Mark()
|
||||
|
23
stream/memorypool_direct.go
Normal file
23
stream/memorypool_direct.go
Normal file
@@ -0,0 +1,23 @@
|
||||
package stream
|
||||
|
||||
type directMemoryPool struct {
|
||||
*memoryPool
|
||||
}
|
||||
|
||||
func (m *directMemoryPool) isFull(size int) bool {
|
||||
//尾部没有大小合适的内存空间
|
||||
return m.capacity-m.tail < size
|
||||
}
|
||||
|
||||
func NewDirectMemoryPool(capacity int) MemoryPool {
|
||||
pool := &directMemoryPool{}
|
||||
pool.memoryPool = &memoryPool{
|
||||
data: make([]byte, capacity),
|
||||
capacity: capacity,
|
||||
blockQueue: NewQueue(capacity),
|
||||
recopy: true,
|
||||
isFull: pool.isFull,
|
||||
}
|
||||
|
||||
return pool
|
||||
}
|
32
stream/memorypool_rb.go
Normal file
32
stream/memorypool_rb.go
Normal file
@@ -0,0 +1,32 @@
|
||||
package stream
|
||||
|
||||
type rbMemoryPool struct {
|
||||
*memoryPool
|
||||
}
|
||||
|
||||
func (m *rbMemoryPool) isFull(size int) bool {
|
||||
//已经回环
|
||||
over := m.tail < m.head
|
||||
if over && m.head-m.tail >= size {
|
||||
//头部有大小合适的内存空间
|
||||
} else if !over && m.capacity-m.tail >= size {
|
||||
//尾部有大小合适的内存空间
|
||||
} else {
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func NewRbMemoryPool(capacity int) MemoryPool {
|
||||
pool := &rbMemoryPool{}
|
||||
pool.memoryPool = &memoryPool{
|
||||
data: make([]byte, capacity),
|
||||
capacity: capacity,
|
||||
blockQueue: NewQueue(capacity),
|
||||
recopy: false,
|
||||
isFull: pool.isFull,
|
||||
}
|
||||
|
||||
return pool
|
||||
}
|
@@ -225,15 +225,15 @@ func (s *SourceImpl) FindOrCreatePacketBuffer(index int, mediaType utils.AVMedia
|
||||
|
||||
if s.pktBuffers[index] == nil {
|
||||
if utils.AVMediaTypeAudio == mediaType {
|
||||
s.pktBuffers[index] = NewMemoryPool(48000 * 64)
|
||||
s.pktBuffers[index] = NewRbMemoryPool(48000 * 64)
|
||||
} else if AppConfig.GOPCache {
|
||||
//开启GOP缓存
|
||||
//以每秒钟4M码率大小创建视频内存池
|
||||
s.pktBuffers[index] = NewMemoryPool(4096 * 1024)
|
||||
s.pktBuffers[index] = NewRbMemoryPool(4096 * 1024)
|
||||
} else {
|
||||
//未开启GOP缓存
|
||||
//以每秒钟4M的1/8码率大小创建视频内存池
|
||||
s.pktBuffers[index] = NewMemoryPool(4096 * 1024 / 8)
|
||||
s.pktBuffers[index] = NewRbMemoryPool(4096 * 1024 / 8)
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -208,10 +208,10 @@ func (c *CacheTransStream) Init() {
|
||||
c.TransStreamImpl.Init()
|
||||
|
||||
c.StreamBuffers = make([]MemoryPool, 2)
|
||||
c.StreamBuffers[0] = NewMemoryPoolWithDirect(1024*4000, true)
|
||||
c.StreamBuffers[0] = NewDirectMemoryPool(1024 * 4000)
|
||||
|
||||
if c.ExistVideo && AppConfig.MergeWriteLatency > 0 {
|
||||
c.StreamBuffers[1] = NewMemoryPoolWithDirect(1024*4000, true)
|
||||
c.StreamBuffers[1] = NewDirectMemoryPool(1024 * 4000)
|
||||
}
|
||||
|
||||
c.SegmentOffset = 0
|
||||
|
Reference in New Issue
Block a user