修复rtmp内存池缓存失败问题

This commit is contained in:
yangjie
2023-11-26 20:28:14 +08:00
parent f932284313
commit e84df324bf
7 changed files with 139 additions and 43 deletions

View File

@@ -11,15 +11,13 @@ type Publisher struct {
deMuxer libflv.DeMuxer
audioMemoryPool stream.MemoryPool
videoMemoryPool stream.MemoryPool
audioPacket []byte
videoPacket []byte
audioUnmark bool
videoUnmark bool
}
func NewPublisher(sourceId string) *Publisher {
publisher := &Publisher{SourceImpl: stream.SourceImpl{Id_: sourceId}}
publisher := &Publisher{SourceImpl: stream.SourceImpl{Id_: sourceId}, audioUnmark: false, videoUnmark: false}
publisher.deMuxer = libflv.DeMuxer{}
//设置回调从flv解析出来的Stream和AVPacket都将统一回调到stream.SourceImpl
publisher.deMuxer.SetHandler(publisher)
@@ -36,15 +34,17 @@ func NewPublisher(sourceId string) *Publisher {
}
func (p *Publisher) OnDeMuxStream(stream_ utils.AVStream) {
//AVStream的Data单独拷贝出来
//释放掉内存池中最新分配的内存
tmp := stream_.Extra()
bytes := make([]byte, len(tmp))
copy(bytes, tmp)
stream_.SetExtraData(bytes)
if utils.AVMediaTypeAudio == stream_.Type() {
p.audioMemoryPool.FreeTail(len(p.audioPacket))
p.audioMemoryPool.FreeTail()
} else if utils.AVMediaTypeVideo == stream_.Type() {
p.videoMemoryPool.FreeTail(len(p.videoPacket))
p.videoMemoryPool.FreeTail()
}
p.SourceImpl.OnDeMuxStream(stream_)
@@ -62,9 +62,9 @@ func (p *Publisher) OnDeMuxPacket(index int, packet utils.AVPacket) {
}
if utils.AVMediaTypeAudio == packet.MediaType() {
p.audioMemoryPool.FreeHead(len(packet.Data()))
p.audioMemoryPool.FreeHead()
} else if utils.AVMediaTypeVideo == packet.MediaType() {
p.videoMemoryPool.FreeHead(len(packet.Data()))
p.videoMemoryPool.FreeHead()
}
}
@@ -79,7 +79,6 @@ func (p *Publisher) OnVideo(data []byte, ts uint32) {
p.videoUnmark = false
}
p.videoPacket = data
_ = p.deMuxer.InputVideo(data, ts)
}
@@ -89,7 +88,6 @@ func (p *Publisher) OnAudio(data []byte, ts uint32) {
p.audioUnmark = false
}
p.audioPacket = data
_ = p.deMuxer.InputAudio(data, ts)
}
@@ -97,7 +95,7 @@ func (p *Publisher) OnAudio(data []byte, ts uint32) {
func (p *Publisher) OnPartPacket(index int, data []byte, first bool) {
//audio
if index == 0 {
if p.audioUnmark {
if !p.audioUnmark {
p.audioMemoryPool.Mark()
p.audioUnmark = true
}
@@ -105,7 +103,7 @@ func (p *Publisher) OnPartPacket(index int, data []byte, first bool) {
p.audioMemoryPool.Write(data)
//video
} else if index == 1 {
if p.videoUnmark {
if !p.videoUnmark {
p.videoMemoryPool.Mark()
p.videoUnmark = true
}

View File

@@ -99,8 +99,8 @@ func (t *TransStream) AddSink(sink stream.ISink) {
}
func (t *TransStream) onDiscardPacket(pkt interface{}) {
bytes := pkt.([]byte)
t.memoryPool.FreeHead(len(bytes))
//bytes := pkt.([]byte)
t.memoryPool.FreeHead()
}
func (t *TransStream) WriteHeader() error {

View File

@@ -21,16 +21,17 @@ type MemoryPool interface {
Reset()
// FreeHead 从头部释放指定大小内存
FreeHead(size int)
FreeHead()
// FreeTail 从尾部释放指定大小内存
FreeTail(size int)
FreeTail()
}
func NewMemoryPool(capacity int) MemoryPool {
pool := &memoryPool{
data: make([]byte, capacity),
capacity: capacity,
blockQueue: NewQueue(128),
}
return pool
@@ -38,37 +39,37 @@ func NewMemoryPool(capacity int) MemoryPool {
type memoryPool struct {
data []byte
ptrStart uintptr
ptrEnd uintptr
//剩余的可用内存空间不足以为此次write
//实际的可用容量当尾部剩余内存不足以此次Write, 并且头部有足够的空闲内存, 则尾部剩余的内存将不可用.
capacity int
head int
tail int
//保存开始索引
mark int
markIndex int
mark bool
blockQueue *Queue
}
// 根据head和tail计算出可用的内存地址
func (m *memoryPool) allocate(size int) []byte {
if m.capacity-m.tail < size {
//使用从头释放的内存
if m.tail-m.mark+size <= m.head {
copy(m.data, m.data[m.mark:m.tail])
m.capacity = m.mark
m.tail = m.tail - m.mark
m.mark = 0
if m.tail-m.markIndex+size <= m.head {
copy(m.data, m.data[m.markIndex:m.tail])
m.capacity = m.markIndex
m.tail = m.tail - m.markIndex
m.markIndex = 0
} else {
//扩容
capacity := (cap(m.data) + m.tail - m.mark + size) * 3 / 2
capacity := (cap(m.data) + m.tail - m.markIndex + size) * 3 / 2
bytes := make([]byte, capacity)
//不对之前的内存进行复制, 已经被AVPacket引用, 自行GC
copy(bytes, m.data[m.mark:m.tail])
copy(bytes, m.data[m.markIndex:m.tail])
m.data = bytes
m.capacity = capacity
m.tail = m.tail - m.mark
m.mark = 0
m.tail = m.tail - m.markIndex
m.markIndex = 0
m.head = 0
}
@@ -80,28 +81,42 @@ func (m *memoryPool) allocate(size int) []byte {
}
func (m *memoryPool) Mark() {
m.mark = m.tail
m.markIndex = m.tail
m.mark = true
}
func (m *memoryPool) Write(data []byte) {
utils.Assert(m.mark)
allocate := m.allocate(len(data))
copy(allocate, data)
}
func (m *memoryPool) Allocate(size int) []byte {
utils.Assert(m.mark)
return m.allocate(size)
}
func (m *memoryPool) Fetch() []byte {
return m.data[m.mark:m.tail]
utils.Assert(m.mark)
m.mark = false
size := m.tail - m.markIndex
m.blockQueue.Push(size)
return m.data[m.markIndex:m.tail]
}
func (m *memoryPool) Reset() {
m.tail = m.mark
m.mark = false
m.tail = m.markIndex
}
func (m *memoryPool) FreeHead(size int) {
func (m *memoryPool) FreeHead() {
utils.Assert(!m.blockQueue.IsEmpty())
size := m.blockQueue.Pop().(int)
m.head += size
if m.head == m.tail {
m.head = 0
m.tail = 0
@@ -110,7 +125,13 @@ func (m *memoryPool) FreeHead(size int) {
}
}
func (m *memoryPool) FreeTail(size int) {
func (m *memoryPool) FreeTail() {
utils.Assert(!m.blockQueue.IsEmpty())
size := m.blockQueue.PopBack().(int)
m.tail -= size
utils.Assert(m.tail >= 0)
if m.tail == 0 && !m.blockQueue.IsEmpty() {
m.tail = m.capacity
}
}

View File

@@ -2,7 +2,9 @@ package stream
import (
"encoding/hex"
"github.com/yangjiechina/avformat/utils"
"testing"
"unsafe"
)
func TestMemoryPool(t *testing.T) {
@@ -12,14 +14,19 @@ func TestMemoryPool(t *testing.T) {
}
pool := NewMemoryPool(5)
last := uintptr(0)
for i := 0; i < 10; i++ {
pool.Mark()
pool.Write(bytes)
fetch := pool.Fetch()
addr := *(*uintptr)(unsafe.Pointer(&fetch))
if last != 0 {
utils.Assert(last == addr)
}
last = addr
println(hex.Dump(fetch))
if i%2 == 0 {
pool.FreeHead(len(fetch))
}
pool.FreeTail()
}
}

48
stream/queue.go Normal file
View File

@@ -0,0 +1,48 @@
package stream
import "github.com/yangjiechina/avformat/utils"
type Queue struct {
*ringBuffer
}
func NewQueue(capacity int) *Queue {
utils.Assert(capacity > 0)
return &Queue{ringBuffer: &ringBuffer{
data: make([]interface{}, capacity),
head: 0,
tail: 0,
size: 0,
}}
}
func (q *Queue) Push(value interface{}) {
if q.ringBuffer.IsFull() {
newArray := make([]interface{}, q.ringBuffer.Size()*2)
head, tail := q.ringBuffer.All()
copy(newArray, head)
if tail != nil {
copy(newArray[len(head):], tail)
}
q.data = newArray
q.head = 0
q.tail = q.size
}
q.data[q.tail] = value
q.tail = (q.tail + 1) % cap(q.data)
q.size++
}
func (q *Queue) PopBack() interface{} {
utils.Assert(q.size > 0)
value := q.ringBuffer.Tail()
q.size--
q.tail = utils.MaxInt(0, q.tail-1)
return value
}

19
stream/queue_test.go Normal file
View File

@@ -0,0 +1,19 @@
package stream
import (
"fmt"
"testing"
)
func TestQueue(t *testing.T) {
queue := NewQueue(1)
for i := 0; i < 100; i++ {
queue.Push(i)
}
for i := 0; i < 100; i++ {
pop := queue.PopBack()
println(fmt.Sprintf("element:%d", pop.(int)))
}
}

View File

@@ -1,5 +1,7 @@
package stream
import "github.com/yangjiechina/avformat/utils"
type RingBuffer interface {
IsEmpty() bool
@@ -19,6 +21,7 @@ type RingBuffer interface {
}
func NewRingBuffer(capacity int) RingBuffer {
utils.Assert(capacity > 0)
r := &ringBuffer{
data: make([]interface{}, capacity),
head: 0,
@@ -71,7 +74,7 @@ func (r *ringBuffer) Head() interface{} {
}
func (r *ringBuffer) Tail() interface{} {
return r.data[r.tail]
return r.data[utils.MaxInt(0, r.tail-1)]
}
func (r *ringBuffer) Size() int {
@@ -80,8 +83,8 @@ func (r *ringBuffer) Size() int {
func (r *ringBuffer) All() ([]interface{}, []interface{}) {
if r.head < r.tail {
return r.data[r.head:r.tail], nil
} else {
return r.data[r.head:], r.data[:r.tail]
} else {
return r.data[r.head:], nil
}
}