重构缓存

This commit is contained in:
yangjiechina
2023-12-07 18:06:58 +08:00
parent fb7c6ac316
commit a849b496a2
7 changed files with 112 additions and 66 deletions

View File

@@ -1,6 +1,7 @@
{
"gop_cache": 0,
"gop_cache": true,
"probe_timeout": 2000,
"mw_latency": 350,
"rtmp": {
"enable": true,

View File

@@ -30,10 +30,10 @@ func NewPublisher(sourceId string, stack *librtmp.Stack) *Publisher {
func (p *Publisher) Init() {
//创建内存池
p.audioMemoryPool = stream.NewMemoryPool(48000 * (stream.AppConfig.GOPCache + 1))
if stream.AppConfig.GOPCache > 0 {
p.audioMemoryPool = stream.NewMemoryPool(48000 * 1)
if stream.AppConfig.GOPCache {
//以每秒钟4M码率大小创建内存池
p.videoMemoryPool = stream.NewMemoryPool(4096 * 1000 / 8 * stream.AppConfig.GOPCache)
p.videoMemoryPool = stream.NewMemoryPool(4096 * 1000)
} else {
p.videoMemoryPool = stream.NewMemoryPool(4096 * 1000 / 8)
}
@@ -77,7 +77,7 @@ func (p *Publisher) OnDeMuxStream(stream_ utils.AVStream) {
func (p *Publisher) OnDeMuxPacket(packet utils.AVPacket) {
p.SourceImpl.OnDeMuxPacket(packet)
if stream.AppConfig.GOPCache > 0 {
if stream.AppConfig.GOPCache {
return
}

View File

@@ -27,6 +27,7 @@ var nextOffset int
func (t *TransStream) Input(packet utils.AVPacket) {
utils.Assert(t.TransStreamImpl.Completed)
var data []byte
var chunk *librtmp.Chunk
var videoPkt bool
@@ -47,6 +48,13 @@ func (t *TransStream) Input(packet utils.AVPacket) {
payloadSize += 5 + length
}
//即不开启GOP缓存又不合并发送. 直接使用AVPacket的预留头封装发送
if !stream.AppConfig.GOPCache && stream.AppConfig.MergeWriteLatency < 1 {
//首帧视频帧必须要
} else {
}
//payloadSize += payloadSize / t.chunkSize
//分配内存
t.memoryPool.Mark()
@@ -95,12 +103,12 @@ func (t *TransStream) Input(packet utils.AVPacket) {
rtmpData := t.memoryPool.Fetch()[:n]
ret := true
if stream.AppConfig.GOPCache > 0 {
if stream.AppConfig.GOPCache {
//ret = t.transBuffer.AddPacket(rtmpData, packet.KeyFrame() && videoPkt, packet.Dts())
ret = t.transBuffer.AddPacket(packet, packet.KeyFrame() && videoPkt, packet.Dts())
}
if !ret || stream.AppConfig.GOPCache < 1 {
if !ret || stream.AppConfig.GOPCache {
t.memoryPool.FreeTail()
}
@@ -197,7 +205,13 @@ func (t *TransStream) AddSink(sink stream.ISink) {
utils.Assert(t.headerSize > 0)
sink.Input(t.header[:t.headerSize])
if !stream.AppConfig.GOPCache {
return
}
//开启GOP缓存的情况下
//开启合并写的情况下:
// 如果合并写大小每满一次
// if stream.AppConfig.GOPCache > 0 {
// t.transBuffer.PeekAll(func(packet interface{}) {
// sink.Input(packet.([]byte))
@@ -238,9 +252,9 @@ func (t *TransStream) WriteHeader() error {
t.TransStreamImpl.Completed = true
t.header = make([]byte, 1024)
t.muxer = libflv.NewMuxer(audioCodecId, videoCodecId, 0, 0, 0)
t.memoryPool = stream.NewMemoryPoolWithRecopy(1024 * 1000 * (stream.AppConfig.GOPCache + 1))
if stream.AppConfig.GOPCache > 0 {
t.transBuffer = stream.NewStreamBuffer(int64(stream.AppConfig.GOPCache * 200))
t.memoryPool = stream.NewMemoryPoolWithRecopy(1024 * 4000)
if stream.AppConfig.GOPCache {
t.transBuffer = stream.NewStreamBuffer(200)
t.transBuffer.SetDiscardHandler(t.onDiscardPacket)
}

View File

@@ -1,5 +1,9 @@
package stream
const (
DefaultMergeWriteLatency = 350
)
type RtmpConfig struct {
Enable bool `json:"enable"`
Addr string `json:"addr"`
@@ -50,9 +54,11 @@ func init() {
AppConfig = AppConfig_{}
}
// AppConfig_ GOP缓存和合并写必须保持一致同时开启或关闭. 关闭GOP缓存是为了降低延迟很难理解又另外开启合并写.
type AppConfig_ struct {
GOPCache int `json:"gop_cache"` //缓存GOP个数不是时长
ProbeTimeout int `json:"probe_timeout"`
Rtmp RtmpConfig
Hook HookConfig
GOPCache bool `json:"gop_cache"` //是否开启GOP缓存只缓存一组音视频
ProbeTimeout int `json:"probe_timeout"`
MergeWriteLatency int `json:"mw_latency"` //缓存指定时长的包满了之后才发送给Sink. 可以降低用户态和内核态的交互,大幅提升性能.
Rtmp RtmpConfig
Hook HookConfig
}

View File

@@ -13,6 +13,10 @@ type MemoryPool interface {
Write(data []byte)
// Reserve 保留指定大小的内存空间
//主要是为了和实现和Write相似功能但是不拷贝, 所以使用流程和Write一样.
Reserve(size int)
Allocate(size int) []byte
Fetch() []byte
@@ -117,6 +121,11 @@ func (m *memoryPool) Write(data []byte) {
copy(allocate, data)
}
func (m *memoryPool) Reserve(size int) {
utils.Assert(m.mark)
_ = m.allocate(size)
}
func (m *memoryPool) Allocate(size int) []byte {
utils.Assert(m.mark)
return m.allocate(size)

View File

@@ -66,11 +66,16 @@ type SinkImpl struct {
State_ SessionState
TransStreamId_ TransStreamId
disableVideo bool
//Sink在请求拉流->Source推流->Sink断开整个阶段 是无锁线程安全
//如果Sink在等待队列-Sink断开这个过程是非线程安全的
//SetState的时候如果closed为true返回false, 调用者自行删除sink
closed atomic.Bool
//HasSentKeyVideo 是否已经发送视频关键帧
//未开启GOP缓存的情况下为避免播放花屏发送的首个视频帧必须为关键帧
HasSentKeyVideo bool
DesiredAudioCodecId_ utils.AVCodecID
DesiredVideoCodecId_ utils.AVCodecID

View File

@@ -10,14 +10,18 @@ import (
"github.com/yangjiechina/live-server/transcode"
)
// SourceType Source 推流类型
// SourceType 推流类型
type SourceType byte
// Protocol 输出协议
// Protocol 输出的流协议
type Protocol uint32
type SourceEvent byte
// SessionState 推拉流Session的状态
// 包含握手和Hook授权阶段
type SessionState uint32
const (
SourceTypeRtmp = SourceType(1)
SourceType28181 = SourceType(2)
@@ -35,11 +39,12 @@ const (
SourceEventPlayDone = SourceEvent(2)
SourceEventInput = SourceEvent(3)
SourceEventClose = SourceEvent(4)
)
// SessionState 推拉流Session状态
// 包含, 握手阶段、Hook授权.
type SessionState uint32
// TransMuxerHeaderMaxSize 传输流协议头的最大长度
// 在解析流分配AVPacket的Data时, 如果没有开启合并写, 提前预留指定长度的字节数量.
// 在封装传输流时, 直接在预留头中添加对应传输流的协议头,减少或免内存拷贝. 在传输flv以及转换AVCC和AnnexB格式时有显著提升.
TransMuxerHeaderMaxSize = 30
)
const (
SessionStateCreate = SessionState(1)
@@ -167,6 +172,53 @@ func IsSupportMux(protocol Protocol, audioCodecId, videoCodecId utils.AVCodecID)
return true
}
// 分发每路Stream的Buffer给传输流
// 按照时间戳升序发送
func (s *SourceImpl) dispatchStreamBuffer(transStream ITransStream, streams []utils.AVStream) {
size := len(streams)
indexs := make([]int, size)
for {
min := int64(0xFFFFFFFF)
//找出最小的时间戳
for index, stream := range streams[:size] {
if s.buffers[stream.Index()].Size() == indexs[index] {
continue
}
pkt := s.buffers[stream.Index()].Peek(indexs[index]).(utils.AVPacket)
v := pkt.Dts()
if min == 0xFFFFFFFF {
min = v
} else if v < min {
min = v
}
}
if min == 0xFFFFFFFF {
break
}
for index, stream := range streams[:size] {
buffer := s.buffers[stream.Index()]
if buffer.Size() == indexs[index] {
continue
}
for i := indexs[index]; i < buffer.Size(); i++ {
packet := buffer.Peek(i).(utils.AVPacket)
if packet.Dts() > min {
break
}
transStream.Input(packet)
indexs[index]++
}
}
}
}
func (s *SourceImpl) AddSink(sink ISink) bool {
// 暂时不考虑多路视频流意味着只能1路视频流和多路音频流同理originStreams和allStreams里面的Stream互斥. 同时多路音频流的Codec必须一致
audioCodecId, videoCodecId := sink.DesiredAudioCodecId(), sink.DesiredVideoCodecId()
@@ -240,49 +292,8 @@ func (s *SourceImpl) AddSink(sink ISink) bool {
return false
}
if AppConfig.GOPCache > 0 && !ok {
indexs := make([]int, size)
for {
min := int64(0xFFFFFFFF)
for index, stream := range streams[:size] {
size := s.buffers[stream.Index()].Size()
if size == indexs[index] {
continue
}
pkt := s.buffers[stream.Index()].Peek(indexs[index]).(utils.AVPacket)
v := pkt.Dts()
if min == 0xFFFFFFFF {
min = v
} else if v < min {
min = v
}
}
if min == 0xFFFFFFFF {
break
}
for index, stream := range streams[:size] {
buffer := s.buffers[stream.Index()]
size := buffer.Size()
if size == indexs[index] {
continue
}
for i := indexs[index]; i < buffer.Size(); i++ {
packet := buffer.Peek(i).(utils.AVPacket)
if packet.Dts() > min {
break
}
transStream.Input(packet)
indexs[index]++
}
}
}
if AppConfig.GOPCache && !ok {
s.dispatchStreamBuffer(transStream, streams[:size])
}
return false
@@ -357,8 +368,8 @@ func (s *SourceImpl) OnDeMuxStream(stream utils.AVStream) (bool, StreamBuffer) {
}
//为每个Stream创建对应的Buffer
if AppConfig.GOPCache > 0 {
buffer := NewStreamBuffer(int64(AppConfig.GOPCache * 1000))
if AppConfig.GOPCache {
buffer := NewStreamBuffer(200)
//OnDeMuxStream的调用顺序就是AVStream和AVPacket的Index的递增顺序
s.buffers = append(s.buffers, buffer)
return true, buffer
@@ -393,7 +404,7 @@ func (s *SourceImpl) OnDeMuxStreamDone() {
}
func (s *SourceImpl) OnDeMuxPacket(packet utils.AVPacket) {
if AppConfig.GOPCache > 0 {
if AppConfig.GOPCache {
buffer := s.buffers[packet.Index()]
buffer.AddPacket(packet, packet.KeyFrame(), packet.Dts())
}