支持rtmp合并发送

This commit is contained in:
yangjiechina
2023-12-04 20:51:06 +08:00
parent baebd7d25d
commit fb7c6ac316
6 changed files with 89 additions and 32 deletions

View File

@@ -23,6 +23,8 @@ type TransStream struct {
chunkSizeQueue *stream.Queue
}
var nextOffset int
func (t *TransStream) Input(packet utils.AVPacket) {
utils.Assert(t.TransStreamImpl.Completed)
var data []byte
@@ -115,39 +117,61 @@ func (t *TransStream) Input(packet utils.AVPacket) {
}
t.chunkSizeQueue.Push(len(rtmpData))
if t.lastTs == 0 {
t.transBuffer.Peek(0).(utils.AVPacket).Dts()
}
//if t.lastTs == 0 {
// t.transBuffer.Peek(0).(utils.AVPacket).Dts()
//}
if mergeWriteLatency > t.transBuffer.Peek(t.transBuffer.Size()-1).(utils.AVPacket).Dts()-t.lastTs {
endTs := t.lastTs + mergeWriteLatency
if t.transBuffer.Peek(t.transBuffer.Size()-1).(utils.AVPacket).Dts() < endTs {
return
}
head, tail := t.memoryPool.Data()
queueHead, queueTail := t.chunkSizeQueue.All()
sizeHead, sizeTail := t.chunkSizeQueue.Data()
var offset int
var size int
endTs := t.lastTs + mergeWriteLatency
var chunkSize int
var lastTs int64
var tailIndex int
for i := 0; i < t.transBuffer.Size(); i++ {
pkt := t.transBuffer.Peek(i).(utils.AVPacket)
if pkt.Dts() < t.lastTs {
if i < len(queueHead) {
offset += queueHead[i].(int)
if i < len(sizeHead) {
chunkSize = sizeHead[i].(int)
} else {
offset += queueTail[i+1%len(queueTail)].(int)
chunkSize = sizeTail[tailIndex].(int)
tailIndex++
}
if pkt.Dts() <= t.lastTs && t.lastTs != 0 {
offset += chunkSize
continue
}
if pkt.Dts() > endTs {
break
}
size += len(pkt.Data())
t.lastTs = pkt.Dts()
size += chunkSize
lastTs = pkt.Dts()
}
t.lastTs = lastTs
if nextOffset == 0 {
nextOffset = size
} else {
utils.Assert(offset == nextOffset)
nextOffset += size
}
//后面再优化只发送一次
var data1 []byte
var data2 []byte
if offset > len(head) {
offset -= len(head)
head = tail
tail = nil
}
if offset+size > len(head) {
data1 = head[offset:]
size -= len(head[offset:])
@@ -183,7 +207,8 @@ func (t *TransStream) AddSink(sink stream.ISink) {
func (t *TransStream) onDiscardPacket(pkt interface{}) {
t.memoryPool.FreeHead()
t.chunkSizeQueue.Pop()
size := t.chunkSizeQueue.Pop().(int)
nextOffset -= size
}
func (t *TransStream) WriteHeader() error {
@@ -213,7 +238,7 @@ func (t *TransStream) WriteHeader() error {
t.TransStreamImpl.Completed = true
t.header = make([]byte, 1024)
t.muxer = libflv.NewMuxer(audioCodecId, videoCodecId, 0, 0, 0)
t.memoryPool = stream.NewMemoryPool(1024 * 1000 * (stream.AppConfig.GOPCache + 1))
t.memoryPool = stream.NewMemoryPoolWithRecopy(1024 * 1000 * (stream.AppConfig.GOPCache + 1))
if stream.AppConfig.GOPCache > 0 {
t.transBuffer = stream.NewStreamBuffer(int64(stream.AppConfig.GOPCache * 200))
t.transBuffer.SetDiscardHandler(t.onDiscardPacket)

View File

@@ -39,6 +39,17 @@ func NewMemoryPool(capacity int) MemoryPool {
return pool
}
func NewMemoryPoolWithRecopy(capacity int) MemoryPool {
pool := &memoryPool{
data: make([]byte, capacity),
capacity: capacity,
blockQueue: NewQueue(128),
recopy: true,
}
return pool
}
type memoryPool struct {
data []byte
//实际的可用容量当尾部剩余内存不足以此次Write, 并且头部有足够的空闲内存, 则尾部剩余的内存将不可用.
@@ -50,6 +61,8 @@ type memoryPool struct {
markIndex int
mark bool
blockQueue *Queue
recopy bool
}
// 根据head和tail计算出可用的内存地址
@@ -62,18 +75,28 @@ func (m *memoryPool) allocate(size int) []byte {
m.tail = m.tail - m.markIndex
m.markIndex = 0
} else {
//扩容
capacity := (cap(m.data) + m.tail - m.markIndex + size) * 3 / 2
writeSize := m.tail - m.markIndex
capacity := (cap(m.data) + writeSize + size) * 2
bytes := make([]byte, capacity)
if m.recopy {
//将扩容前的老数据复制到新的内存空间
head, tail := m.Data()
copy(bytes, head)
copy(bytes[len(head):], tail)
m.tail = len(head) + len(tail)
m.markIndex = m.tail - writeSize
} else {
//不对之前的内存进行复制, 已经被AVPacket引用, 自行GC
copy(bytes, m.data[m.markIndex:m.tail])
m.tail = writeSize
m.markIndex = 0
}
m.data = bytes
m.capacity = capacity
m.tail = m.tail - m.markIndex
m.markIndex = 0
m.head = 0
}
}
@@ -132,7 +155,6 @@ func (m *memoryPool) FreeTail() {
size := m.blockQueue.PopBack().(int)
m.tail -= size
if m.tail == 0 && !m.blockQueue.IsEmpty() {
m.tail = m.capacity
}
@@ -140,7 +162,7 @@ func (m *memoryPool) FreeTail() {
func (m *memoryPool) Data() ([]byte, []byte) {
if m.tail <= m.head {
return m.data[m.head:], m.data[:m.tail]
return m.data[m.head:m.capacity], m.data[:m.tail]
} else {
return m.data[m.head:m.tail], nil
}

View File

@@ -20,7 +20,7 @@ func NewQueue(capacity int) *Queue {
func (q *Queue) Push(value interface{}) {
if q.ringBuffer.IsFull() {
newArray := make([]interface{}, q.ringBuffer.Size()*2)
head, tail := q.ringBuffer.All()
head, tail := q.ringBuffer.Data()
copy(newArray, head)
if tail != nil {
copy(newArray[len(head):], tail)

View File

@@ -19,7 +19,7 @@ type RingBuffer interface {
Size() int
All() ([]interface{}, []interface{})
Data() ([]interface{}, []interface{})
}
func NewRingBuffer(capacity int) RingBuffer {
@@ -73,18 +73,24 @@ func (r *ringBuffer) Pop() interface{} {
}
func (r *ringBuffer) Head() interface{} {
utils.Assert(!r.IsEmpty())
return r.data[r.head]
}
func (r *ringBuffer) Tail() interface{} {
return r.data[utils.MaxInt(0, r.tail-1)]
utils.Assert(!r.IsEmpty())
if r.tail > 0 {
return r.data[r.tail-1]
} else {
return r.data[cap(r.data)-1]
}
}
func (r *ringBuffer) Size() int {
return r.size
}
func (r *ringBuffer) All() ([]interface{}, []interface{}) {
func (r *ringBuffer) Data() ([]interface{}, []interface{}) {
if r.size == 0 {
return nil, nil
}

View File

@@ -257,7 +257,7 @@ func (s *SourceImpl) AddSink(sink ISink) bool {
if min == 0xFFFFFFFF {
min = v
} else if v < min {
v = min
min = v
}
}
@@ -274,6 +274,10 @@ func (s *SourceImpl) AddSink(sink ISink) bool {
for i := indexs[index]; i < buffer.Size(); i++ {
packet := buffer.Peek(i).(utils.AVPacket)
if packet.Dts() > min {
break
}
transStream.Input(packet)
indexs[index]++
}

View File

@@ -86,7 +86,7 @@ func (s *streamBuffer) SetDiscardHandler(handler func(packet interface{})) {
func (s *streamBuffer) Peek(index int) interface{} {
utils.Assert(index < s.buffer.Size())
head, tail := s.buffer.All()
head, tail := s.buffer.Data()
if index < len(head) {
return head[index].(element).pkt
@@ -96,7 +96,7 @@ func (s *streamBuffer) Peek(index int) interface{} {
}
func (s *streamBuffer) PeekAll(handler func(packet interface{})) {
head, tail := s.buffer.All()
head, tail := s.buffer.Data()
if head == nil {
return