refactor: 合并写规则

This commit is contained in:
ydajiang
2025-03-05 08:55:28 +08:00
parent 91dd7ae83d
commit 4ed0c13add
4 changed files with 30 additions and 36 deletions

View File

@@ -2,7 +2,6 @@
"gop_cache": true,
"gop_buffer_size": 8192000,
"probe_timeout": 2000,
"write_timeout": 5000,
"mw_latency": 350,
"listen_ip" : "0.0.0.0",
"public_ip": "192.168.2.148",

View File

@@ -267,18 +267,16 @@ func init() {
AppConfig = AppConfig_{}
}
// AppConfig_ GOP缓存和合并写必须保持一致同时开启或关闭. 关闭GOP缓存是为了降低延迟很难理解又另外开启合并写.
// AppConfig_ GOP缓存和合并写开关必须保持一致,同时开启或关闭. 关闭GOP缓存是为了降低延迟很难理解又另外开启合并写.
type AppConfig_ struct {
GOPCache bool `json:"gop_cache"` // 是否开启GOP缓存只缓存一组音视频
GOPBufferSize int `json:"gop_buffer_size"` // 预估GOPBuffer大小, AVPacket缓存池和合并写缓存池都会参考此大小
ProbeTimeout int `json:"probe_timeout"` // 收流解析AVStream的超时时间
WriteTimeout int `json:"write_timeout"` // Server向TCP拉流Conn发包的超时时间, 超过该时间, 直接主动断开Conn. 客户端重新拉流的成本小于服务器缓存成本.
WriteBufferCapacity int `json:"-"` // 发送缓冲区容量大小, 缓冲区由多个内存块构成.
PublicIP string `json:"public_ip"`
ListenIP string `json:"listen_ip"`
IdleTimeout int64 `json:"idle_timeout"` // 多长时间(单位秒)没有拉流. 如果开启hook通知, 根据hook响应, 决定是否关闭Source(200-不关闭/非200关闭). 否则会直接关闭Source.
ReceiveTimeout int64 `json:"receive_timeout"` // 多长时间(单位秒)没有收到流. 如果开启hook通知, 根据hook响应, 决定是否关闭Source(200-不关闭/非200关闭). 否则会直接关闭Source.
Debug bool `json:"debug"` // debug模式, 开启将保存推流
GOPCache bool `json:"gop_cache"` // 是否开启GOP缓存只缓存一组音视频
GOPBufferSize int `json:"gop_buffer_size"` // 预估GOPBuffer大小, AVPacket缓存池和合并写缓存池都会参考此大小
ProbeTimeout int `json:"probe_timeout"` // 收流解析AVStream的超时时间
PublicIP string `json:"public_ip"`
ListenIP string `json:"listen_ip"`
IdleTimeout int64 `json:"idle_timeout"` // 多长时间(单位秒)没有拉流. 如果开启hook通知, 根据hook响应, 决定是否关闭Source(200-不关闭/非200关闭). 否则会直接关闭Source.
ReceiveTimeout int64 `json:"receive_timeout"` // 多长时间(单位秒)没有收到流. 如果开启hook通知, 根据hook响应, 决定是否关闭Source(200-不关闭/非200关闭). 否则会直接关闭Source.
Debug bool `json:"debug"` // debug模式, 开启将保存推流
//缓存指定时长的包满了之后才发送给Sink. 可以降低用户态和内核态的交互频率,大幅提升性能.
//合并写的大小范围应当大于一帧的时长不超过一组GOP的时长在实际发送流的时候也会遵循此条例.
@@ -321,8 +319,6 @@ func SetDefaultConfig(config *AppConfig_) {
config.GOPBufferSize = limitInt(4096*1024/8, 2048*1024*10, config.GOPBufferSize) // 最低4M码率 最高160M码率
config.MergeWriteLatency = limitInt(350, 2000, config.MergeWriteLatency) // 最低缓存350毫秒数据才发送 最高缓存2秒数据才发送
config.ProbeTimeout = limitInt(2000, 5000, config.MergeWriteLatency) // 2-5秒内必须解析完AVStream
config.WriteTimeout = limitInt(2000, 10000, config.WriteTimeout)
config.WriteBufferCapacity = config.WriteTimeout/config.MergeWriteLatency + 1
config.Log.Level = limitInt(int(zapcore.DebugLevel), int(zapcore.FatalLevel), config.Log.Level)
config.Log.MaxSize = limitMin(1, config.Log.MaxSize)

View File

@@ -31,22 +31,18 @@ type streamBuffer struct {
discardHandler func(packet utils.AVPacket)
}
func NewStreamBuffer() GOPBuffer {
return &streamBuffer{buffer: collections.NewRingBuffer(1000), existVideoKeyFrame: false}
}
func (s *streamBuffer) AddPacket(packet utils.AVPacket) bool {
//缓存满,清空
// 缓存满,清空
if s.Size()+1 == s.buffer.Capacity() {
s.Clear()
}
//丢弃首帧视频非关键帧
// 丢弃首帧视频非关键帧
if utils.AVMediaTypeVideo == packet.MediaType() && !s.existVideoKeyFrame && !packet.KeyFrame() {
return false
}
//丢弃前一组GOP
// 丢弃前一组GOP
videoKeyFrame := utils.AVMediaTypeVideo == packet.MediaType() && packet.KeyFrame()
if videoKeyFrame {
if s.existVideoKeyFrame {
@@ -90,18 +86,16 @@ func (s *streamBuffer) Peek(index int) utils.AVPacket {
func (s *streamBuffer) PeekAll(handler func(packet utils.AVPacket)) {
head, tail := s.buffer.Data()
if head == nil {
return
}
for _, value := range head {
handler(value.(utils.AVPacket))
if head != nil {
for _, value := range head {
handler(value.(utils.AVPacket))
}
}
if tail == nil {
return
}
for _, value := range tail {
handler(value.(utils.AVPacket))
if tail != nil {
for _, value := range tail {
handler(value.(utils.AVPacket))
}
}
}
@@ -116,3 +110,7 @@ func (s *streamBuffer) Clear() {
func (s *streamBuffer) Close() {
s.discardHandler = nil
}
func NewStreamBuffer() GOPBuffer {
return &streamBuffer{buffer: collections.NewRingBuffer(1000), existVideoKeyFrame: false}
}

View File

@@ -5,13 +5,16 @@ import (
"github.com/lkmio/avformat/utils"
)
const (
DefaultMBBufferSize = 20
)
// MergeWritingBuffer 实现针对RTMP/FLV/HLS等基于TCP传输流的合并写缓存
// 包含多个合并写块, 循环使用, 至少需要等到第二个I帧才开始循环. webrtcI帧间隔可能会高达几十秒,
// 容量根据write_timeout发送超时和合并写时间来计算, write_timeout/mw_latency.如果I帧间隔大于发送超时时间, 则需要创建新的块.
type MergeWritingBuffer interface {
Allocate(size int, ts int64, videoKey bool) []byte
// PeekCompletedSegment 返回当前完整切片, 以及是否是关键帧切片, 未满返回nil.
// PeekCompletedSegment 返回当前完整切片, 以及是否是关键帧切片, 非完整切片返回nil.
PeekCompletedSegment() ([]byte, bool)
// FlushSegment 生成并返回当前切片, 以及是否是关键帧切片.
@@ -42,7 +45,6 @@ type mwBlock struct {
type mergeWritingBuffer struct {
mwBlocks []mwBlock
index int // 当前切片位于mwBlocks的索引
startTS int64 // 当前切片的开始时间
duration int // 当前切片时长
@@ -231,10 +233,9 @@ func (m *mergeWritingBuffer) Capacity() int {
}
func NewMergeWritingBuffer(existVideo bool) MergeWritingBuffer {
// 开启GOP缓存, 输出流也缓存整个GOP
var blocks []mwBlock
if existVideo {
blocks = make([]mwBlock, AppConfig.WriteBufferCapacity)
blocks = make([]mwBlock, DefaultMBBufferSize)
} else {
blocks = make([]mwBlock, 1)
}