refactor: 合并写和异步推流

This commit is contained in:
ydajiang
2025-04-11 15:19:19 +08:00
parent 4ec0912340
commit bb1f5eba35
9 changed files with 316 additions and 192 deletions

View File

@@ -1,6 +1,5 @@
{
"gop_cache": true,
"gop_buffer_size": 8192000,
"probe_timeout": 2000,
"mw_latency": 350,
"listen_ip" : "0.0.0.0",

View File

@@ -43,8 +43,8 @@ func (t *TransStream) Input(packet *avformat.AVPacket) ([][]byte, int64, bool, e
}
}
// 关键帧都放在切片头部,所以遇到关键帧创建新切片, 发送当前切片剩余流
if videoKey && !t.MWBuffer.IsNewSegment() {
// 关键帧都放在切片头部,所以遇到关键帧创建新切片
if videoKey && !t.MWBuffer.IsNewSegment() && t.MWBuffer.HasVideoDataInCurrentSegment() {
segment, key := t.flushSegment()
t.AppendOutStreamBuffer(segment)
keyBuffer = key
@@ -58,22 +58,35 @@ func (t *TransStream) Input(packet *avformat.AVPacket) ([][]byte, int64, bool, e
separatorSize = HttpFlvBlockHeaderSize
// 10字节描述flv包长, 前2个字节描述无效字节长度
n = HttpFlvBlockHeaderSize
}
// 切片末尾, 预留换行符
if t.MWBuffer.IsFull(dts) {
} else if t.MWBuffer.ShouldFlush(dts) {
// 切片末尾, 预留换行符
separatorSize += 2
}
// 分配block
bytes := t.MWBuffer.Allocate(separatorSize+flvTagSize, dts, videoKey)
// 分配指定大小的内存
bytes, ok := t.MWBuffer.TryAlloc(separatorSize+flvTagSize, dts, utils.AVMediaTypeVideo == packet.MediaType, videoKey)
if !ok {
segment, key := t.flushSegment()
t.AppendOutStreamBuffer(segment)
if !keyBuffer {
keyBuffer = key
}
bytes, ok = t.MWBuffer.TryAlloc(HttpFlvBlockHeaderSize+flvTagSize, dts, utils.AVMediaTypeVideo == packet.MediaType, videoKey)
n = HttpFlvBlockHeaderSize
utils.Assert(ok)
}
// 写flv tag
n += t.Muxer.Input(bytes[n:], packet.MediaType, len(data), dts, pts, false, frameType)
copy(bytes[n:], data)
// 合并写满再发
if segment, key := t.MWBuffer.PeekCompletedSegment(); len(segment) > 0 {
keyBuffer = key
if segment, key := t.MWBuffer.TryFlushSegment(); len(segment) > 0 {
if !keyBuffer {
keyBuffer = key
}
// 已经分配末尾换行符内存, 直接添加
t.AppendOutStreamBuffer(FormatSegment(segment))
}

View File

@@ -22,7 +22,7 @@ func (f *ForwardStream) WrapData(data []byte) []byte {
return block
}
func (f *ForwardStream) OutStreamBufferCapacity() int {
func (f *ForwardStream) Capacity() int {
return f.buffer.BlockCount()
}

View File

@@ -59,11 +59,10 @@ func (t *transStream) Input(packet *avformat.AVPacket) ([][]byte, int64, bool, e
payloadSize += dataHeaderSize + len(data)
// 遇到视频关键帧, 发送剩余的流, 创建新切片
if videoKey {
if segment, key := t.MWBuffer.FlushSegment(); len(segment) > 0 {
keyBuffer = key
t.AppendOutStreamBuffer(segment)
}
if videoKey && !t.MWBuffer.IsNewSegment() && t.MWBuffer.HasVideoDataInCurrentSegment() {
segment, key := t.MWBuffer.FlushSegment()
t.AppendOutStreamBuffer(segment)
keyBuffer = key
}
// type为0的header大小
@@ -78,7 +77,17 @@ func (t *transStream) Input(packet *avformat.AVPacket) ([][]byte, int64, bool, e
}
// 分配指定大小的内存
bytes := t.MWBuffer.Allocate(totalSize, dts, videoKey)
bytes, ok := t.MWBuffer.TryAlloc(totalSize, dts, videoPkt, videoKey)
if !ok {
segment, key := t.MWBuffer.FlushSegment()
if !keyBuffer {
keyBuffer = key
}
t.AppendOutStreamBuffer(segment)
bytes, ok = t.MWBuffer.TryAlloc(totalSize, dts, videoPkt, videoKey)
utils.Assert(ok)
}
// 写第一个type为0的chunk sequenceHeader
chunk.Length = payloadSize
@@ -97,7 +106,7 @@ func (t *transStream) Input(packet *avformat.AVPacket) ([][]byte, int64, bool, e
utils.Assert(len(bytes) == n)
// 合并写满了再发
if segment, key := t.MWBuffer.PeekCompletedSegment(); len(segment) > 0 {
if segment, key := t.MWBuffer.TryFlushSegment(); len(segment) > 0 {
keyBuffer = key
t.AppendOutStreamBuffer(segment)
}

View File

@@ -264,9 +264,8 @@ func init() {
// AppConfig_ GOP缓存和合并写开关必须保持一致同时开启或关闭. 关闭GOP缓存是为了降低延迟很难理解又另外开启合并写.
type AppConfig_ struct {
GOPCache bool `json:"gop_cache"` // 是否开启GOP缓存只缓存一组音视频
GOPBufferSize int `json:"gop_buffer_size"` // 预估GOPBuffer大小, AVPacket缓存池和合并写缓存池都会参考此大小
ProbeTimeout int `json:"probe_timeout"` // 收流解析AVStream的超时时间
GOPCache bool `json:"gop_cache"` // 是否开启GOP缓存只缓存一组音视频
ProbeTimeout int `json:"probe_timeout"` // 收流解析AVStream的超时时间
PublicIP string `json:"public_ip"`
ListenIP string `json:"listen_ip"`
IdleTimeout int64 `json:"idle_timeout"` // 多长时间(单位秒)没有拉流. 如果开启hook通知, 根据hook响应, 决定是否关闭Source(200-不关闭/非200关闭). 否则会直接关闭Source.
@@ -306,14 +305,12 @@ func LoadConfigFile(path string) (*AppConfig_, error) {
func SetDefaultConfig(config *AppConfig_) {
if !config.GOPCache {
config.GOPCache = true
config.GOPBufferSize = 8196 * 1024
config.MergeWriteLatency = 350
log.Sugar.Warnf("强制开启GOP缓存")
}
config.GOPBufferSize = limitInt(4096*1024/8, 2048*1024*10, config.GOPBufferSize) // 最低4M码率 最高160M码率
config.MergeWriteLatency = limitInt(350, 2000, config.MergeWriteLatency) // 最低缓存350毫秒数据才发送 最高缓存2秒数据才发送
config.ProbeTimeout = limitInt(2000, 5000, config.MergeWriteLatency) // 2-5秒内必须解析完AVStream
config.MergeWriteLatency = limitInt(350, 2000, config.MergeWriteLatency) // 最低缓存350毫秒数据才发送 最高缓存2秒数据才发送
config.ProbeTimeout = limitInt(2000, 5000, config.MergeWriteLatency) // 2-5秒内必须解析完AVStream
config.Log.Level = limitInt(int(zapcore.DebugLevel), int(zapcore.FatalLevel), config.Log.Level)
config.Log.MaxSize = limitMin(1, config.Log.MaxSize)

View File

@@ -1,27 +1,30 @@
package stream
import (
"github.com/lkmio/avformat/bufio"
"github.com/lkmio/avformat/collections"
"github.com/lkmio/avformat/utils"
)
const (
DefaultMBBufferSize = 20
BlockBufferSize = 1024 * 1024 * 2
BlockBufferCount = 4
)
// MergeWritingBuffer 实现针对RTMP/FLV/HLS等基于TCP传输流的合并写缓存
// 包含多个合并写块, 循环使用, 至少需要等到第二个I帧才开始循环. webrtcI帧间隔可能会高达几十秒,
type MergeWritingBuffer interface {
Allocate(size int, ts int64, videoKey bool) []byte
TryGrow() bool
// PeekCompletedSegment 返回当前完整切片, 以及是否是关键帧切片, 非完整切片返回nil.
PeekCompletedSegment() ([]byte, bool)
TryAlloc(size int, ts int64, videoPkt, videoKey bool) ([]byte, bool)
// TryFlushSegment 尝试生成切片, 如果时长不足, 返回nil
TryFlushSegment() ([]byte, bool)
// FlushSegment 生成并返回当前切片, 以及是否是关键帧切片.
FlushSegment() ([]byte, bool)
// IsFull 当前切片已满
IsFull(ts int64) bool
// ShouldFlush 当前切片是否已达到生成条件
ShouldFlush(ts int64) bool
// IsNewSegment 当前切片是否还未写数据
IsNewSegment() bool
@@ -33,83 +36,180 @@ type MergeWritingBuffer interface {
ReadSegmentsFromKeyFrameIndex(cb func([]byte))
Capacity() int
}
type mwBlock struct {
free bool
keyVideo bool
buffer collections.MemoryPool
completed bool
Time int64
HasVideoDataInCurrentSegment() bool
}
type mergeWritingBuffer struct {
mwBlocks []mwBlock
index int // 当前切片位于mwBlocks的索引
buffers []struct {
buffer collections.BlockBuffer
nextSegmentDataSize int
preSegmentsDataSize int
preSegmentCount int
prevSegments *collections.Queue[struct {
data []byte
key bool
}]
segments *collections.Queue[struct {
data []byte
key bool
}]
}
index int // 当前使用内存池的索引
startTS int64 // 当前切片的开始时间
duration int // 当前切片时长
lastKeyFrameIndex int // 最近的关键帧所在切片的索引
keyFrameCount int // 关键帧计数
existVideo bool // 是否存在视频
keyFrameBufferMaxLength int
nonKeyFrameBufferMaxLength int
hasKeyVideoDataInCurrentSegment bool // 当前切片是否存在关键视频帧
hasVideoDataInCurrentSegment bool // 当前切片是否存在视频帧
completedKeyVideoSegmentPositions []int64 // 完整视频关键帧切片的数量
existVideo bool // 是否存在视频
segmentCount int // 切片数量
}
func (m *mergeWritingBuffer) createMWBlock(videoKey bool) mwBlock {
if videoKey {
return mwBlock{true, videoKey, collections.NewDirectMemoryPool(m.keyFrameBufferMaxLength), false, 0}
func (m *mergeWritingBuffer) createBuffer(minSize int) collections.BlockBuffer {
var size int
if !m.existVideo {
size = 1024 * 500
} else {
return mwBlock{true, false, collections.NewDirectMemoryPool(m.nonKeyFrameBufferMaxLength), false, 0}
size = BlockBufferSize
}
return collections.NewDirectBlockBuffer(bufio.MaxInt(size, minSize))
}
func (m *mergeWritingBuffer) grow(minSize int) {
m.buffers = append(m.buffers, struct {
buffer collections.BlockBuffer
nextSegmentDataSize int
preSegmentsDataSize int
preSegmentCount int
prevSegments *collections.Queue[struct {
data []byte
key bool
}]
segments *collections.Queue[struct {
data []byte
key bool
}]
}{buffer: m.createBuffer(minSize), prevSegments: collections.NewQueue[struct {
data []byte
key bool
}](64), segments: collections.NewQueue[struct {
data []byte
key bool
}](64)})
}
func (m *mergeWritingBuffer) TryGrow() bool {
var ok bool
if !m.existVideo {
ok = len(m.buffers) < 1
} else {
ok = len(m.buffers) < BlockBufferCount
}
if ok {
m.grow(0)
}
return ok
}
func (m *mergeWritingBuffer) RemoveSegment() {
segment := m.buffers[m.index].prevSegments.Pop()
m.buffers[m.index].nextSegmentDataSize += len(segment.data)
m.segmentCount--
if segment.key {
m.completedKeyVideoSegmentPositions = m.completedKeyVideoSegmentPositions[1:]
}
}
func (m *mergeWritingBuffer) grow() {
pools := make([]mwBlock, cap(m.mwBlocks)*3/2)
for i := 0; i < cap(m.mwBlocks); i++ {
pools[i] = m.mwBlocks[i]
func (m *mergeWritingBuffer) TryAlloc(size int, ts int64, videoPkt, videoKey bool) ([]byte, bool) {
length := len(m.buffers)
if length < 1 {
m.grow(size)
}
m.mwBlocks = pools
bytes := m.buffers[m.index].buffer.AvailableBytes()
if bytes < size {
// 非完整切片,先保存切片再分配新的内存
if m.buffers[m.index].buffer.PendingBlockSize() > 0 {
return nil, false
}
// 还未遇到2组GOP, 不能释放旧的内存池, 创建新的内存池
// 其他情况, 调用tryAlloc, 手动申请内存
if m.existVideo && AppConfig.GOPCache && len(m.completedKeyVideoSegmentPositions) < 2 {
m.grow(size)
}
// 即将使用下一个内存池, 清空上次创建的切片
for m.buffers[m.index].prevSegments.Size() > 0 {
m.RemoveSegment()
}
// 使用下一块内存, 或者从头覆盖
if m.index+1 < len(m.buffers) {
m.index++
} else {
m.index = 0
}
// 复用内存池, 将未清空完的上上次创建的切片放在尾部
//for m.buffers[m.index].prevSegments.Size() > 0 {
// m.buffers[m.index].segments.Push(m.buffers[m.index].prevSegments.Pop())
//}
// 复用内存池, 清空上上次创建的切片
//for m.buffers[m.index].prevSegments.Size() > 0 {
// m.RemoveSegment()
//}
// 复用内存池, 保留上次内存池创建的切片
m.buffers[m.index].nextSegmentDataSize = 0
m.buffers[m.index].preSegmentsDataSize = 0
m.buffers[m.index].preSegmentCount = m.buffers[m.index].segments.Size()
m.buffers[m.index].buffer.Clear()
if m.buffers[m.index].preSegmentCount > 0 {
m.buffers[m.index].prevSegments.Clear()
tmp := m.buffers[m.index].prevSegments
m.buffers[m.index].prevSegments = m.buffers[m.index].segments
m.buffers[m.index].segments = tmp
m.RemoveSegment()
}
}
// 复用旧的内存池, 减少计数
if !m.buffers[m.index].prevSegments.IsEmpty() {
totalSize := len(m.buffers[m.index].buffer.(*collections.DirectBlockBuffer).Data()) + size
for !m.buffers[m.index].prevSegments.IsEmpty() && totalSize > m.buffers[m.index].nextSegmentDataSize {
m.RemoveSegment()
}
}
return m.alloc(size, ts, videoPkt, videoKey), true
}
func (m *mergeWritingBuffer) Allocate(size int, ts int64, videoKey bool) []byte {
if !AppConfig.GOPCache || !m.existVideo {
return m.mwBlocks[0].buffer.Allocate(size)
}
func (m *mergeWritingBuffer) alloc(size int, ts int64, videoPkt, videoKey bool) []byte {
utils.Assert(ts != -1)
bytes := m.buffers[m.index].buffer.AvailableBytes()
// 当前切片必须有足够空间, 否则先调用TryAlloc
utils.Assert(bytes >= size)
// 新的切片
if m.startTS == -1 {
m.startTS = ts
}
if m.mwBlocks[m.index].buffer == nil {
// 创建内存块
m.mwBlocks[m.index] = m.createMWBlock(videoKey)
} else {
// 循环使用
m.mwBlocks[m.index].buffer.Clear()
// 关键帧被覆盖, 减少计数
if m.mwBlocks[m.index].keyVideo {
m.keyFrameCount--
}
}
m.mwBlocks[m.index].free = false
m.mwBlocks[m.index].completed = false
m.mwBlocks[m.index].keyVideo = videoKey
m.mwBlocks[m.index].Time = ts
if !m.hasVideoDataInCurrentSegment && videoPkt {
m.hasVideoDataInCurrentSegment = true
}
if videoKey {
// 请务必确保关键帧帧从新的切片开始
// 外部遇到关键帧请先调用FlushSegment
utils.Assert(m.mwBlocks[m.index].buffer.IsEmpty())
//m.lastKeyFrameIndex = m.index
//m.keyFrameCount++
m.hasKeyVideoDataInCurrentSegment = true
}
if ts < m.startTS {
@@ -117,70 +217,43 @@ func (m *mergeWritingBuffer) Allocate(size int, ts int64, videoKey bool) []byte
}
m.duration = int(ts - m.startTS)
return m.mwBlocks[m.index].buffer.Allocate(size)
return m.buffers[m.index].buffer.Alloc(size)
}
func (m *mergeWritingBuffer) FlushSegment() ([]byte, bool) {
if !AppConfig.GOPCache || !m.existVideo {
return nil, false
} else if m.mwBlocks[m.index].buffer == nil || m.mwBlocks[m.index].free {
return nil, false
}
data, _ := m.mwBlocks[m.index].buffer.Data()
data := m.buffers[m.index].buffer.Feat()
if len(data) == 0 {
return nil, false
}
key := m.mwBlocks[m.index].keyVideo
m.segmentCount++
key := m.hasKeyVideoDataInCurrentSegment
m.hasKeyVideoDataInCurrentSegment = false
if key {
m.lastKeyFrameIndex = m.index
m.keyFrameCount++
m.completedKeyVideoSegmentPositions = append(m.completedKeyVideoSegmentPositions, int64(m.index<<32|m.buffers[m.index].segments.Size()))
}
// 计算最大切片数据长度,后续创建新切片按照最大长度分配内存空间
if m.lastKeyFrameIndex == m.index && m.keyFrameBufferMaxLength < len(data) {
m.keyFrameBufferMaxLength = len(data) * 3 / 2
} else if m.lastKeyFrameIndex != m.index && m.nonKeyFrameBufferMaxLength < len(data) {
m.nonKeyFrameBufferMaxLength = len(data) * 3 / 2
}
// 设置当前切片的完整性
m.mwBlocks[m.index].completed = true
// 分配下一个切片
capacity := cap(m.mwBlocks)
if m.index+1 == capacity && m.keyFrameCount == 1 {
m.grow()
capacity = cap(m.mwBlocks)
}
// 计算下一个切片索引
m.index = (m.index + 1) % capacity
m.buffers[m.index].segments.Push(struct {
data []byte
key bool
}{data: data, key: key})
// 清空下一个切片的标记
m.startTS = -1
m.duration = 0
m.mwBlocks[m.index].free = true
m.mwBlocks[m.index].completed = false
m.hasVideoDataInCurrentSegment = false
return data, key
}
func (m *mergeWritingBuffer) PeekCompletedSegment() ([]byte, bool) {
if !AppConfig.GOPCache || !m.existVideo {
data, _ := m.mwBlocks[0].buffer.Data()
m.mwBlocks[0].buffer.Clear()
return data, false
func (m *mergeWritingBuffer) TryFlushSegment() ([]byte, bool) {
if (!AppConfig.GOPCache || !m.existVideo) || m.duration >= AppConfig.MergeWriteLatency {
return m.FlushSegment()
}
if m.duration < AppConfig.MergeWriteLatency {
return nil, false
}
return m.FlushSegment()
return nil, false
}
func (m *mergeWritingBuffer) IsFull(ts int64) bool {
func (m *mergeWritingBuffer) ShouldFlush(ts int64) bool {
if m.startTS == -1 {
return false
}
@@ -189,67 +262,55 @@ func (m *mergeWritingBuffer) IsFull(ts int64) bool {
}
func (m *mergeWritingBuffer) IsNewSegment() bool {
return m.mwBlocks[m.index].buffer == nil || m.mwBlocks[m.index].free
return m.buffers == nil || m.buffers[m.index].buffer.PendingBlockSize() == 0
}
func (m *mergeWritingBuffer) Reserve(length int) {
utils.Assert(m.mwBlocks[m.index].buffer != nil)
_ = m.mwBlocks[m.index].buffer.Allocate(length)
func (m *mergeWritingBuffer) Reserve(size int) {
_ = m.buffers[m.index].buffer.Alloc(size)
}
func (m *mergeWritingBuffer) ReadSegmentsFromKeyFrameIndex(cb func([]byte)) {
if m.keyFrameCount == 0 {
if !AppConfig.GOPCache || !m.existVideo || len(m.completedKeyVideoSegmentPositions) < 1 {
return
}
ranges := [2][2]int{{-1, -1}, {-1, -1}}
if m.lastKeyFrameIndex <= m.index {
ranges[0][0] = m.lastKeyFrameIndex
ranges[0][1] = m.index + 1
} else {
// 回环, 先遍历后面和前面的数据
ranges[0][0] = m.lastKeyFrameIndex
ranges[0][1] = cap(m.mwBlocks)
marker := m.completedKeyVideoSegmentPositions[len(m.completedKeyVideoSegmentPositions)-1]
bufferIndex := int(marker >> 32 & 0xFFFFFFFF)
position := int(marker & 0xFFFFFFFF)
ranges[1][0] = 0
ranges[1][1] = m.index + 1
var ranges [][2]int
// 回环
if m.index < bufferIndex {
ranges = append(ranges, [2]int{bufferIndex, len(m.buffers) - 1})
ranges = append(ranges, [2]int{0, m.index})
} else {
ranges = append(ranges, [2]int{bufferIndex, m.index})
}
for _, index := range ranges {
for i := index[0]; i > -1 && i < index[1]; i++ {
if m.mwBlocks[i].buffer == nil || !m.mwBlocks[i].completed {
break
for _, ints := range ranges {
for i := ints[0]; i <= ints[1]; i++ {
for j := position; j < m.buffers[i].segments.Size(); j++ {
cb(m.buffers[i].segments.Peek(j).data)
}
data, _ := m.mwBlocks[i].buffer.Data()
cb(data)
// 后续的切片, 从0开始
position = 0
}
}
}
func (m *mergeWritingBuffer) Capacity() int {
return cap(m.mwBlocks)
return m.segmentCount
}
func (m *mergeWritingBuffer) HasVideoDataInCurrentSegment() bool {
return m.hasVideoDataInCurrentSegment
}
func NewMergeWritingBuffer(existVideo bool) MergeWritingBuffer {
var blocks []mwBlock
if existVideo {
blocks = make([]mwBlock, DefaultMBBufferSize)
} else {
blocks = make([]mwBlock, 1)
}
if !existVideo || !AppConfig.GOPCache {
blocks[0] = mwBlock{true, false, collections.NewDirectMemoryPool(1024 * 100), false, 0}
}
return &mergeWritingBuffer{
keyFrameBufferMaxLength: AppConfig.MergeWriteLatency * 1024 * 2,
nonKeyFrameBufferMaxLength: AppConfig.MergeWriteLatency * 1024 / 2,
mwBlocks: blocks,
startTS: -1,
lastKeyFrameIndex: -1,
existVideo: existVideo,
startTS: -1,
existVideo: existVideo,
}
}

View File

@@ -96,6 +96,8 @@ type Sink interface {
// IsExited 异步发送协程是否退出, 如果还没有退出(write阻塞)不恢复推流
IsExited() bool
PendingSendQueueSize() int
}
type BaseSink struct {
@@ -189,6 +191,10 @@ func (s *BaseSink) Write(index int, data [][]byte, ts int64) error {
return nil
}
func (s *BaseSink) PendingSendQueueSize() int {
return len(s.pendingSendQueue)
}
func (s *BaseSink) GetSourceID() string {
return s.SourceID
}

View File

@@ -321,6 +321,7 @@ func (s *PublishSource) DispatchPacket(transStream TransStream, packet *avformat
func (s *PublishSource) DispatchBuffer(transStream TransStream, index int, data [][]byte, timestamp int64, videoKey bool) {
sinks := s.TransStreamSinks[transStream.GetID()]
exist := transStream.IsExistVideo()
for _, sink := range sinks {
// 如果存在视频, 确保向sink发送的第一帧是关键帧
@@ -330,35 +331,53 @@ func (s *PublishSource) DispatchBuffer(transStream TransStream, index int, data
}
if extraData, _, _ := transStream.ReadExtraData(timestamp); len(extraData) > 0 {
s.write(sink, index, extraData, timestamp)
s.write(transStream, sink, index, extraData, timestamp)
}
}
s.write(sink, index, data, timestamp)
s.write(transStream, sink, index, data, timestamp)
}
}
func (s *PublishSource) pendingSink(sink Sink) {
if s.existVideo {
log.Sugar.Errorf("向sink推流超时,挂起%s-sink: %s source: %s", sink.GetProtocol().String(), sink.GetID(), s.ID)
// 等待下个关键帧恢复推流
s.PauseStreaming(sink)
} else {
log.Sugar.Errorf("向sink推流超时,关闭连接. %s-sink: %s source: %s", sink.GetProtocol().String(), sink.GetID(), s.ID)
go sink.Close()
}
}
// 向sink推流
func (s *PublishSource) write(sink Sink, index int, data [][]byte, timestamp int64) {
func (s *PublishSource) write(transStream TransStream, sink Sink, index int, data [][]byte, timestamp int64) {
err := sink.Write(index, data, timestamp)
if err == nil {
sink.IncreaseSentPacketCount()
ok := err == nil
defer func() {
if ok {
sink.IncreaseSentPacketCount()
}
}()
// 跳过非TCP流和待发送包数量小于合并写缓冲区大小的sink
if !transStream.IsTCPStreaming() || sink.PendingSendQueueSize() <= transStream.Capacity() {
return
}
// 尝试扩容合并写缓冲区, 不能扩容, 则挂起Sink
if !transStream.GrowMWBuffer() {
ok = false
s.pendingSink(sink)
}
// 推流超时, 可能是服务器或拉流端带宽不够、拉流端不读取数据等情况造成内核发送缓冲区满, 进而阻塞.
// 直接关闭连接. 当然也可以将sink先挂起, 后续再继续推流.
_, ok := err.(*transport.ZeroWindowSizeError)
if ok {
if s.existVideo {
log.Sugar.Errorf("向sink推流超时,挂起%s-sink: %s source: %s", sink.GetProtocol().String(), sink.GetID(), s.ID)
// 等待下个关键帧恢复推流
s.PauseStreaming(sink)
} else {
log.Sugar.Errorf("向sink推流超时,关闭连接. %s-sink: %s source: %s", sink.GetProtocol().String(), sink.GetID(), s.ID)
go sink.Close()
}
}
//_, ok := err.(*transport.ZeroWindowSizeError)
//if ok {
// s.pendingSink(sink)
//}
}
func (s *PublishSource) PauseStreaming(sink Sink) {
@@ -475,9 +494,8 @@ func (s *PublishSource) doAddSink(sink Sink) bool {
// TCP拉流开启异步发包, 一旦出现网络不好的链路, 其余正常链路不受影响.
_, ok := sink.GetConn().(*transport.Conn)
if ok && sink.IsTCPStreaming() && transStream.OutStreamBufferCapacity() > 2 {
length := transStream.OutStreamBufferCapacity() - 2
sink.EnableAsyncWriteMode(length)
if ok && sink.IsTCPStreaming() {
sink.EnableAsyncWriteMode(64)
}
// 发送已有的缓存数据
@@ -485,10 +503,10 @@ func (s *PublishSource) doAddSink(sink Sink) bool {
data, timestamp, _ := transStream.ReadKeyFrameBuffer()
if len(data) > 0 {
if extraData, _, _ := transStream.ReadExtraData(timestamp); len(extraData) > 0 {
s.write(sink, 0, extraData, timestamp)
s.write(transStream, sink, 0, extraData, timestamp)
}
s.write(sink, 0, data, timestamp)
s.write(transStream, sink, 0, data, timestamp)
}
// 新建传输流,发送已经缓存的音视频帧

View File

@@ -43,10 +43,14 @@ type TransStream interface {
// AppendOutStreamBuffer 添加合并写块到队列
AppendOutStreamBuffer(buffer []byte)
// OutStreamBufferCapacity 返回合并写块队列容量大小, 作为sink异步推流的队列大小;
OutStreamBufferCapacity() int
// Capacity 返回合并写块队列容量大小, 作为sink异步推流的队列大小;
Capacity() int
IsExistVideo() bool
GrowMWBuffer() bool
IsTCPStreaming() bool
}
type BaseTransStream struct {
@@ -112,7 +116,7 @@ func (t *BaseTransStream) AppendOutStreamBuffer(buffer []byte) {
t.OutBufferSize++
}
func (t *BaseTransStream) OutStreamBufferCapacity() int {
func (t *BaseTransStream) Capacity() int {
return 0
}
@@ -136,6 +140,14 @@ func (t *BaseTransStream) ReadKeyFrameBuffer() ([][]byte, int64, error) {
return nil, 0, nil
}
func (t *BaseTransStream) GrowMWBuffer() bool {
return false
}
func (t *BaseTransStream) IsTCPStreaming() bool {
return false
}
type TCPTransStream struct {
BaseTransStream
@@ -145,7 +157,16 @@ type TCPTransStream struct {
MWBuffer MergeWritingBuffer //合并写缓冲区, 同时作为用户态的发送缓冲区
}
func (t *TCPTransStream) OutStreamBufferCapacity() int {
func (t *TCPTransStream) Capacity() int {
utils.Assert(t.MWBuffer != nil)
return t.MWBuffer.Capacity()
}
func (t *TCPTransStream) GrowMWBuffer() bool {
utils.Assert(t.MWBuffer != nil)
return t.MWBuffer.TryGrow()
}
func (t *TCPTransStream) IsTCPStreaming() bool {
return true
}