优化GOP缓存和TransStream缓存

This commit is contained in:
yangjiechina
2024-06-05 20:08:26 +08:00
parent de6ff69f15
commit e3a2af4446
11 changed files with 400 additions and 380 deletions

View File

@@ -1,5 +1,6 @@
{ {
"gop_cache": true, "gop_cache": true,
"gop_buffer_size": 8192000,
"probe_timeout": 2000, "probe_timeout": 2000,
"mw_latency": 350, "mw_latency": 350,
"public_ip": "192.168.2.148", "public_ip": "192.168.2.148",

View File

@@ -28,22 +28,13 @@ func init() {
} }
type httpTransStream struct { type httpTransStream struct {
stream.CacheTransStream stream.TransStreamImpl
muxer libflv.Muxer
header []byte
headerSize int
}
func NewHttpTransStream() stream.ITransStream { muxer libflv.Muxer
return &httpTransStream{ mwBuffer stream.MergeWritingBuffer
muxer: libflv.NewMuxer(), header []byte
header: make([]byte, 1024), headerSize int
headerSize: HttpFlvBlockLengthSize, headerTagSize int
}
}
func TransStreamFactory(source stream.ISource, protocol stream.Protocol, streams []utils.AVStream) (stream.ITransStream, error) {
return NewHttpTransStream(), nil
} }
func (t *httpTransStream) Input(packet utils.AVPacket) error { func (t *httpTransStream) Input(packet utils.AVPacket) error {
@@ -71,49 +62,41 @@ func (t *httpTransStream) Input(packet utils.AVPacket) error {
videoKey = packet.KeyFrame() videoKey = packet.KeyFrame()
} }
if videoKey { //发送剩余数据
head, _ := t.StreamBuffers[0].Data() if videoKey && !t.mwBuffer.IsEmpty() {
if len(head) > t.SegmentOffset { t.mwBuffer.Reserve(2)
//分配末尾换行符 segment := t.mwBuffer.PopSegment()
t.StreamBuffers[0].Allocate(2) t.sendUnpackedSegment(segment)
head, _ = t.StreamBuffers[0].Data()
t.writeSeparator(head[t.SegmentOffset:])
skip := t.computeSikCount(head[t.SegmentOffset:])
t.SendPacketWithOffset(head, t.SegmentOffset+skip)
}
t.SwapStreamBuffer()
} }
var n int var n int
var separatorSize int var separatorSize int
full := t.Full(dts)
if head, _ := t.StreamBuffers[0].Data(); t.SegmentOffset == len(head) { //新的合并写切片, 预留包长字节
if t.mwBuffer.IsCompeted() {
separatorSize = HttpFlvBlockLengthSize separatorSize = HttpFlvBlockLengthSize
//10字节描述flv包长, 前2个字节描述无效字节长度 //10字节描述flv包长, 前2个字节描述无效字节长度
n = HttpFlvBlockLengthSize n = HttpFlvBlockLengthSize
} }
if full {
separatorSize = 2 //结束时, 预留换行符
if t.mwBuffer.IsFull(dts) {
separatorSize += 2
} }
allocate := t.StreamBuffers[0].Allocate(separatorSize + flvSize) //分配flv block
n += t.muxer.Input(allocate[n:], packet.MediaType(), len(data), dts, pts, packet.KeyFrame(), false) bytes := t.mwBuffer.Allocate(separatorSize + flvSize)
copy(allocate[n:], data) n += t.muxer.Input(bytes[n:], packet.MediaType(), len(data), dts, pts, packet.KeyFrame(), false)
if !full { copy(bytes[n:], data)
return nil
}
head, _ := t.StreamBuffers[0].Data()
//添加长度和换行符 //添加长度和换行符
//每一个合并写切片开始和预留长度所需的字节数 //每一个合并写切片开始和预留长度所需的字节数
//合并写切片末尾加上换行符 //合并写切片末尾加上换行符
//长度是16进制字符串 //长度是16进制字符串
t.writeSeparator(head[t.SegmentOffset:]) segment := t.mwBuffer.PeekCompletedSegment(dts)
if len(segment) > 0 {
skip := t.computeSikCount(head[t.SegmentOffset:]) t.sendUnpackedSegment(segment)
t.SendPacketWithOffset(head, t.SegmentOffset+skip) }
return nil return nil
} }
@@ -133,11 +116,19 @@ func (t *httpTransStream) AddTrack(stream utils.AVStream) error {
return nil return nil
} }
func (t *httpTransStream) sendBuffer(sink stream.ISink, data []byte) error { // 发送还未添加包长和换行符的切片
return sink.Input(data[t.computeSikCount(data):]) func (t *httpTransStream) sendUnpackedSegment(segment []byte) {
t.writeSeparator(segment)
skip := t.computeSkipCount(segment)
t.SendPacket(segment[skip:])
} }
func (t *httpTransStream) computeSikCount(data []byte) int { // 为单个sink发送flv切片, 切片已经添加分隔符
func (t *httpTransStream) sendSegment(sink stream.ISink, data []byte) error {
return sink.Input(data[t.computeSkipCount(data):])
}
func (t *httpTransStream) computeSkipCount(data []byte) int {
return int(6 + binary.BigEndian.Uint16(data[4:])) return int(6 + binary.BigEndian.Uint16(data[4:]))
} }
@@ -146,31 +137,21 @@ func (t *httpTransStream) AddSink(sink stream.ISink) error {
t.TransStreamImpl.AddSink(sink) t.TransStreamImpl.AddSink(sink)
//发送sequence header //发送sequence header
t.sendBuffer(sink, t.header[:t.headerSize]) t.sendSegment(sink, t.header[:t.headerSize])
send := func(sink stream.ISink, data []byte) {
var index int
for ; index < len(data); index += 4 {
size := binary.BigEndian.Uint32(data[index:])
t.sendBuffer(sink, data[index:index+4+int(size)])
index += int(size)
}
}
//发送当前内存池已有的合并写切片 //发送当前内存池已有的合并写切片
if t.SegmentOffset > 0 { segmentList := t.mwBuffer.SegmentList()
data, _ := t.StreamBuffers[0].Data() if len(segmentList) > 0 {
utils.Assert(len(data) > 0) //修改第一个flv tag的pre tag size
send(sink, data[:t.SegmentOffset]) binary.BigEndian.PutUint32(segmentList[20:], uint32(t.headerTagSize))
return nil
}
//发送上一组GOP //遍历发送合并写切片
if t.StreamBuffers[1] != nil && !t.StreamBuffers[1].Empty() { var index int
data, _ := t.StreamBuffers[1].Data() for ; index < len(segmentList); index += 4 {
utils.Assert(len(data) > 0) size := binary.BigEndian.Uint32(segmentList[index:])
send(sink, data) t.sendSegment(sink, segmentList[index:index+4+int(size)])
return nil index += int(size)
}
} }
return nil return nil
@@ -217,13 +198,39 @@ func (t *httpTransStream) WriteHeader() error {
data = track.CodecParameters().DecoderConfRecord().ToMP4VC() data = track.CodecParameters().DecoderConfRecord().ToMP4VC()
} }
t.headerSize += t.muxer.Input(t.header[t.headerSize:], track.Type(), len(data), 0, 0, false, true) n := t.muxer.Input(t.header[t.headerSize:], track.Type(), len(data), 0, 0, false, true)
t.headerSize += n
copy(t.header[t.headerSize:], data) copy(t.header[t.headerSize:], data)
t.headerSize += len(data) t.headerSize += len(data)
t.headerTagSize = n - 15 + len(data) + 11
} }
//将结尾换行符计算在内 //将结尾换行符计算在内
t.headerSize += 2 t.headerSize += 2
t.writeSeparator(t.header[:t.headerSize]) t.writeSeparator(t.header[:t.headerSize])
t.mwBuffer = stream.NewMergeWritingBuffer(t.ExistVideo)
return nil return nil
} }
func (t *httpTransStream) Close() error {
//发送剩余的流
segment := t.mwBuffer.PopSegment()
if len(segment) > 0 {
t.sendUnpackedSegment(segment)
}
return nil
}
func NewHttpTransStream() stream.ITransStream {
return &httpTransStream{
muxer: libflv.NewMuxer(),
header: make([]byte, 1024),
headerSize: HttpFlvBlockLengthSize,
}
}
func TransStreamFactory(source stream.ISource, protocol stream.Protocol, streams []utils.AVStream) (stream.ITransStream, error) {
return NewHttpTransStream(), nil
}

View File

@@ -22,6 +22,7 @@ import (
func NewDefaultAppConfig() stream.AppConfig_ { func NewDefaultAppConfig() stream.AppConfig_ {
return stream.AppConfig_{ return stream.AppConfig_{
GOPCache: true, GOPCache: true,
GOPBufferSize: 8196000,
MergeWriteLatency: 350, MergeWriteLatency: 350,
PublicIP: "192.168.2.148", PublicIP: "192.168.2.148",

View File

@@ -8,25 +8,17 @@ import (
) )
type TransStream struct { type TransStream struct {
stream.CacheTransStream stream.TransStreamImpl
chunkSize int chunkSize int
//sequence header header []byte //sequence header
header []byte
headerSize int headerSize int
muxer libflv.Muxer muxer libflv.Muxer
audioChunk librtmp.Chunk audioChunk librtmp.Chunk
videoChunk librtmp.Chunk videoChunk librtmp.Chunk
} mwBuffer stream.MergeWritingBuffer
func NewTransStream(chunkSize int) stream.ITransStream {
transStream := &TransStream{chunkSize: chunkSize}
return transStream
}
func TransStreamFactory(source stream.ISource, protocol stream.Protocol, streams []utils.AVStream) (stream.ITransStream, error) {
return NewTransStream(librtmp.ChunkSize), nil
} }
func (t *TransStream) Input(packet utils.AVPacket) error { func (t *TransStream) Input(packet utils.AVPacket) error {
@@ -72,20 +64,16 @@ func (t *TransStream) Input(packet utils.AVPacket) error {
payloadSize += chunkPayloadOffset + len(data) payloadSize += chunkPayloadOffset + len(data)
} }
//遇到视频关键帧,不考虑合并写大小,发送之前剩余的数据. //遇到视频关键帧, 发送剩余的流
if videoKey { if videoKey {
tmp := t.StreamBuffers[0] segment := t.mwBuffer.PopSegment()
head, _ := tmp.Data() if len(segment) > 0 {
if len(head[t.SegmentOffset:]) > 0 { t.SendPacket(segment)
bytes := head[t.SegmentOffset:]
t.SendPacket(bytes)
//交替使用两块内存
t.SwapStreamBuffer()
} }
} }
//分配内存 //分配内存
allocate := t.StreamBuffers[0].Allocate(chunkHeaderSize + payloadSize + ((payloadSize - 1) / t.chunkSize)) allocate := t.mwBuffer.Allocate(chunkHeaderSize + payloadSize + ((payloadSize - 1) / t.chunkSize))
//写rtmp chunk header //写rtmp chunk header
chunk.Length = payloadSize chunk.Length = payloadSize
@@ -101,14 +89,10 @@ func (t *TransStream) Input(packet utils.AVPacket) error {
n += chunk.WriteData(allocate[n:], data, t.chunkSize, chunkPayloadOffset) n += chunk.WriteData(allocate[n:], data, t.chunkSize, chunkPayloadOffset)
//未满合并写大小, 不发送 segment := t.mwBuffer.PeekCompletedSegment(dts)
if !t.Full(dts) { if len(segment) > 0 {
return nil t.SendPacket(segment)
} }
head, _ := t.StreamBuffers[0].Data()
//发送合并写数据
t.SendPacketWithOffset(head, t.SegmentOffset)
return nil return nil
} }
@@ -120,18 +104,9 @@ func (t *TransStream) AddSink(sink stream.ISink) error {
sink.Input(t.header[:t.headerSize]) sink.Input(t.header[:t.headerSize])
//发送当前内存池已有的合并写切片 //发送当前内存池已有的合并写切片
if t.SegmentOffset > 0 { segmentList := t.mwBuffer.SegmentList()
data, _ := t.StreamBuffers[0].Data() if len(segmentList) > 0 {
utils.Assert(len(data) > 0) sink.Input(segmentList)
sink.Input(data[:t.SegmentOffset])
return nil
}
//发送上一组GOP
if t.StreamBuffers[1] != nil && !t.StreamBuffers[1].Empty() {
data, _ := t.StreamBuffers[0].Data()
utils.Assert(len(data) > 0)
sink.Input(data)
return nil return nil
} }
@@ -201,5 +176,24 @@ func (t *TransStream) WriteHeader() error {
} }
t.headerSize = n t.headerSize = n
t.mwBuffer = stream.NewMergeWritingBuffer(t.ExistVideo)
return nil return nil
} }
func (t *TransStream) Close() error {
//发送剩余的流
segment := t.mwBuffer.PopSegment()
if len(segment) > 0 {
t.SendPacket(segment)
}
return nil
}
func NewTransStream(chunkSize int) stream.ITransStream {
return &TransStream{chunkSize: chunkSize}
}
func TransStreamFactory(source stream.ISource, protocol stream.Protocol, streams []utils.AVStream) (stream.ITransStream, error) {
return NewTransStream(librtmp.ChunkSize), nil
}

View File

@@ -138,9 +138,10 @@ func init() {
// AppConfig_ GOP缓存和合并写必须保持一致同时开启或关闭. 关闭GOP缓存是为了降低延迟很难理解又另外开启合并写. // AppConfig_ GOP缓存和合并写必须保持一致同时开启或关闭. 关闭GOP缓存是为了降低延迟很难理解又另外开启合并写.
type AppConfig_ struct { type AppConfig_ struct {
GOPCache bool `json:"gop_cache"` //是否开启GOP缓存只缓存一组音视频 GOPCache bool `json:"gop_cache"` //是否开启GOP缓存只缓存一组音视频
ProbeTimeout int `json:"probe_timeout"` GOPBufferSize int `json:"gop_buffer_size"` //预估GOPBuffer大小, AVPacket缓存池和合并写缓存池都会参考此大小
PublicIP string `json:"public_ip"` ProbeTimeout int `json:"probe_timeout"`
PublicIP string `json:"public_ip"`
//缓存指定时长的包满了之后才发送给Sink. 可以降低用户态和内核态的交互频率,大幅提升性能. //缓存指定时长的包满了之后才发送给Sink. 可以降低用户态和内核态的交互频率,大幅提升性能.
//合并写的大小范围应当大于一帧的时长不超过一组GOP的时长在实际发送流的时候也会遵循此条例. //合并写的大小范围应当大于一帧的时长不超过一组GOP的时长在实际发送流的时候也会遵循此条例.

109
stream/gop_buffer.go Normal file
View File

@@ -0,0 +1,109 @@
package stream
import "github.com/yangjiechina/avformat/utils"
// GOPBuffer GOP缓存
type GOPBuffer interface {
// AddPacket Return bool 缓存帧是否成功, 如果首帧非关键帧, 缓存失败
AddPacket(packet utils.AVPacket) bool
// SetDiscardHandler 设置丢弃帧时的回调
SetDiscardHandler(handler func(packet utils.AVPacket))
PeekAll(handler func(packet utils.AVPacket))
Peek(index int) utils.AVPacket
Size() int
Clear()
}
type streamBuffer struct {
buffer RingBuffer
existVideoKeyFrame bool
discardHandler func(packet utils.AVPacket)
}
func NewStreamBuffer() GOPBuffer {
return &streamBuffer{buffer: NewRingBuffer(1000), existVideoKeyFrame: false}
}
func (s *streamBuffer) AddPacket(packet utils.AVPacket) bool {
//缓存满,清空
if s.Size()+1 == s.buffer.Capacity() {
s.Clear()
}
//丢弃首帧视频非关键帧
if utils.AVMediaTypeVideo == packet.MediaType() && !s.existVideoKeyFrame && !packet.KeyFrame() {
return false
}
//丢弃前一组GOP
videoKeyFrame := utils.AVMediaTypeVideo == packet.MediaType() && packet.KeyFrame()
if videoKeyFrame {
if s.existVideoKeyFrame {
s.discard()
}
s.existVideoKeyFrame = true
}
s.buffer.Push(packet)
return true
}
func (s *streamBuffer) SetDiscardHandler(handler func(packet utils.AVPacket)) {
s.discardHandler = handler
}
func (s *streamBuffer) discard() {
for s.buffer.Size() > 0 {
pkt := s.buffer.Pop()
if s.discardHandler != nil {
s.discardHandler(pkt.(utils.AVPacket))
}
}
s.existVideoKeyFrame = false
}
func (s *streamBuffer) Peek(index int) utils.AVPacket {
utils.Assert(index < s.buffer.Size())
head, tail := s.buffer.Data()
if index < len(head) {
return head[index].(utils.AVPacket)
} else {
return tail[index-len(head)].(utils.AVPacket)
}
}
func (s *streamBuffer) PeekAll(handler func(packet utils.AVPacket)) {
head, tail := s.buffer.Data()
if head == nil {
return
}
for _, value := range head {
handler(value.(utils.AVPacket))
}
if tail == nil {
return
}
for _, value := range tail {
handler(value.(utils.AVPacket))
}
}
func (s *streamBuffer) Size() int {
return s.buffer.Size()
}
func (s *streamBuffer) Clear() {
s.discard()
}

125
stream/mw_buffer.go Normal file
View File

@@ -0,0 +1,125 @@
package stream
// MergeWritingBuffer 实现针对RTMP/FLV/HLS等基于TCP传输流的合并写缓存
// 和GOP缓存一样, 也以视频关键帧为界. 遇到视频关键帧, 发送剩余输出流, 清空buffer
type MergeWritingBuffer interface {
Allocate(size int) []byte
// PeekCompletedSegment 返回当前完整合并写切片
PeekCompletedSegment(ts int64) []byte
// PopSegment 返回当前合并写切片, 并清空内存池
PopSegment() []byte
// SegmentList 返回所有完整切片
SegmentList() []byte
IsFull(ts int64) bool
IsCompeted() bool
IsEmpty() bool
Reserve(count int)
}
type mergeWritingBuffer struct {
transStreamBuffer MemoryPool
segmentOffset int //当前合并写包位于memoryPool的开始偏移量
prePacketTS int64 //前一个包的时间戳
}
func (m *mergeWritingBuffer) Allocate(size int) []byte {
return m.transStreamBuffer.Allocate(size)
}
func (m *mergeWritingBuffer) PeekCompletedSegment(ts int64) []byte {
if !AppConfig.GOPCache {
data, _ := m.transStreamBuffer.Data()
m.transStreamBuffer.Clear()
return data
}
if m.prePacketTS == -1 {
m.prePacketTS = ts
}
if ts < m.prePacketTS {
m.prePacketTS = ts
}
if int(ts-m.prePacketTS) < AppConfig.MergeWriteLatency {
return nil
}
head, _ := m.transStreamBuffer.Data()
data := head[m.segmentOffset:]
m.segmentOffset = len(head)
m.prePacketTS = -1
return data
}
func (m *mergeWritingBuffer) IsFull(ts int64) bool {
if m.prePacketTS == -1 {
return false
}
return int(ts-m.prePacketTS) >= AppConfig.MergeWriteLatency
}
func (m *mergeWritingBuffer) IsCompeted() bool {
data, _ := m.transStreamBuffer.Data()
return m.segmentOffset == len(data)
}
func (m *mergeWritingBuffer) IsEmpty() bool {
data, _ := m.transStreamBuffer.Data()
return len(data) <= m.segmentOffset
}
func (m *mergeWritingBuffer) Reserve(count int) {
_ = m.transStreamBuffer.Allocate(count)
}
func (m *mergeWritingBuffer) PopSegment() []byte {
if !AppConfig.GOPCache {
return nil
}
head, _ := m.transStreamBuffer.Data()
data := head[m.segmentOffset:]
m.transStreamBuffer.Clear()
m.segmentOffset = 0
m.prePacketTS = -1
return data
}
func (m *mergeWritingBuffer) SegmentList() []byte {
if !AppConfig.GOPCache {
return nil
}
head, _ := m.transStreamBuffer.Data()
return head[:m.segmentOffset]
}
func NewMergeWritingBuffer(existVideo bool) MergeWritingBuffer {
//开启GOP缓存, 输出流也缓存整个GOP
bufferSize := AppConfig.GOPBufferSize
if existVideo && !AppConfig.GOPCache {
bufferSize = 1024 * 1000
} else if !existVideo {
bufferSize = 48000 * 10
}
return &mergeWritingBuffer{
transStreamBuffer: NewDirectMemoryPool(bufferSize),
segmentOffset: 0,
prePacketTS: -1,
}
}

View File

@@ -19,6 +19,8 @@ type RingBuffer interface {
Size() int Size() int
Capacity() int
Data() ([]interface{}, []interface{}) Data() ([]interface{}, []interface{})
Clear() Clear()
@@ -27,20 +29,22 @@ type RingBuffer interface {
func NewRingBuffer(capacity int) RingBuffer { func NewRingBuffer(capacity int) RingBuffer {
utils.Assert(capacity > 0) utils.Assert(capacity > 0)
r := &ringBuffer{ r := &ringBuffer{
data: make([]interface{}, capacity), data: make([]interface{}, capacity),
head: 0, head: 0,
tail: 0, tail: 0,
size: 0, size: 0,
capacity: capacity,
} }
return r return r
} }
type ringBuffer struct { type ringBuffer struct {
data []interface{} data []interface{}
head int head int
tail int tail int
size int size int
capacity int
} }
func (r *ringBuffer) IsEmpty() bool { func (r *ringBuffer) IsEmpty() bool {
@@ -92,6 +96,10 @@ func (r *ringBuffer) Size() int {
return r.size return r.size
} }
func (r *ringBuffer) Capacity() int {
return r.capacity
}
func (r *ringBuffer) Data() ([]interface{}, []interface{}) { func (r *ringBuffer) Data() ([]interface{}, []interface{}) {
if r.size == 0 { if r.size == 0 {
return nil, nil return nil, nil

View File

@@ -61,6 +61,9 @@ type ISource interface {
//@Return bool fatal error.释放Source //@Return bool fatal error.释放Source
Input(data []byte) error Input(data []byte) error
// Type 推流类型
Type() SourceType
// OriginStreams 返回推流的原始Streams // OriginStreams 返回推流的原始Streams
OriginStreams() []utils.AVStream OriginStreams() []utils.AVStream
@@ -83,14 +86,11 @@ type ISource interface {
// 将Sink添加到等待队列 // 将Sink添加到等待队列
Close() Close()
// Type 推流类型
Type() SourceType
// FindOrCreatePacketBuffer 查找或者创建AVPacket的内存池 // FindOrCreatePacketBuffer 查找或者创建AVPacket的内存池
FindOrCreatePacketBuffer(index int, mediaType utils.AVMediaType) MemoryPool FindOrCreatePacketBuffer(index int, mediaType utils.AVMediaType) MemoryPool
// OnDiscardPacket GOP缓存溢出回调, 释放AVPacket // OnDiscardPacket GOP缓存溢出回调, 释放AVPacket
OnDiscardPacket(pkt interface{}) OnDiscardPacket(pkt utils.AVPacket)
// OnDeMuxStream 解析出AVStream回调 // OnDeMuxStream 解析出AVStream回调
OnDeMuxStream(stream utils.AVStream) OnDeMuxStream(stream utils.AVStream)
@@ -127,14 +127,15 @@ type SourceImpl struct {
videoTranscoders []transcode.ITranscoder //视频解码器 videoTranscoders []transcode.ITranscoder //视频解码器
originStreams StreamManager //推流的音视频Streams originStreams StreamManager //推流的音视频Streams
allStreams StreamManager //推流Streams+转码器获得的Stream allStreams StreamManager //推流Streams+转码器获得的Stream
gopBuffers []StreamBuffer //推流每路的GOP缓存
pktBuffers [8]MemoryPool //推流每路的AVPacket缓存, AVPacket的data从该内存池中分配. 在GOP缓存溢出时,释放池中内存. pktBuffers [8]MemoryPool //推流每路的AVPacket缓存, AVPacket的data从该内存池中分配. 在GOP缓存溢出时,释放池中内存.
gopBuffer GOPBuffer //GOP缓存, 音频和视频混合使用, 以视频关键帧为界, 缓存第二个视频关键帧时, 释放前一组gop. 如果不存在视频流, 不缓存音频
Input_ func(data []byte) error //解决多态无法传递给子类的问题 existVideo bool //是否存在视频
completed bool completed bool
probeTimer *time.Timer probeTimer *time.Timer
Input_ func(data []byte) error //解决多态无法传递给子类的问题
//所有的输出协议, 持有Sink //所有的输出协议, 持有Sink
transStreams map[TransStreamId]ITransStream transStreams map[TransStreamId]ITransStream
@@ -199,11 +200,11 @@ func (s *SourceImpl) FindOrCreatePacketBuffer(index int, mediaType utils.AVMedia
} else if AppConfig.GOPCache { } else if AppConfig.GOPCache {
//开启GOP缓存 //开启GOP缓存
//以每秒钟4M码率大小创建视频内存池 //以每秒钟4M码率大小创建视频内存池
s.pktBuffers[index] = NewRbMemoryPool(4096 * 1024) s.pktBuffers[index] = NewRbMemoryPool(AppConfig.GOPBufferSize)
} else { } else {
//未开启GOP缓存 //未开启GOP缓存
//以每秒钟4M的1/8码率大小创建视频内存池 //以每秒钟4M的1/8码率大小创建视频内存池
s.pktBuffers[index] = NewRbMemoryPool(4096 * 1024 / 8) s.pktBuffers[index] = NewRbMemoryPool(1024 * 1000)
} }
} }
@@ -263,53 +264,6 @@ func IsSupportMux(protocol Protocol, audioCodecId, videoCodecId utils.AVCodecID)
return true return true
} }
// 将GOP缓存发送给TransStream
// 按照时间戳升序发送
func (s *SourceImpl) dispatchStreamBuffer(transStream ITransStream, streams []utils.AVStream) {
size := len(streams)
indexs := make([]int, size)
for {
min := int64(0xFFFFFFFF)
//找出最小的时间戳
for index, stream_ := range streams[:size] {
if s.gopBuffers[stream_.Index()].Size() == indexs[index] {
continue
}
pkt := s.gopBuffers[stream_.Index()].Peek(indexs[index]).(utils.AVPacket)
v := pkt.Dts()
if min == 0xFFFFFFFF {
min = v
} else if v < min {
min = v
}
}
if min == 0xFFFFFFFF {
break
}
for index, stream_ := range streams[:size] {
buffer := s.gopBuffers[stream_.Index()]
if buffer.Size() == indexs[index] {
continue
}
for i := indexs[index]; i < buffer.Size(); i++ {
packet := buffer.Peek(i).(utils.AVPacket)
if packet.Dts() > min {
break
}
transStream.Input(packet)
indexs[index]++
}
}
}
}
func (s *SourceImpl) AddSink(sink ISink) bool { func (s *SourceImpl) AddSink(sink ISink) bool {
// 暂时不考虑多路视频流意味着只能1路视频流和多路音频流同理originStreams和allStreams里面的Stream互斥. 同时多路音频流的Codec必须一致 // 暂时不考虑多路视频流意味着只能1路视频流和多路音频流同理originStreams和allStreams里面的Stream互斥. 同时多路音频流的Codec必须一致
audioCodecId, videoCodecId := sink.DesiredAudioCodecId(), sink.DesiredVideoCodecId() audioCodecId, videoCodecId := sink.DesiredAudioCodecId(), sink.DesiredVideoCodecId()
@@ -397,8 +351,10 @@ func (s *SourceImpl) AddSink(sink ISink) bool {
} }
//新的传输流,发送缓存的音视频帧 //新的传输流,发送缓存的音视频帧
if !ok && AppConfig.GOPCache { if !ok && AppConfig.GOPCache && s.existVideo {
s.dispatchStreamBuffer(transStream, streams[:size]) s.gopBuffer.PeekAll(func(packet utils.AVPacket) {
transStream.Input(packet)
})
} }
return true return true
@@ -438,11 +394,18 @@ func (s *SourceImpl) SetState(state SessionState) {
} }
func (s *SourceImpl) Close() { func (s *SourceImpl) Close() {
//释放GOP缓存
if s.gopBuffer != nil {
s.gopBuffer.Clear()
}
//释放解复用器 //释放解复用器
//释放转码器 //释放转码器
//释放每路转协议流, 将所有sink添加到等待队列 //释放每路转协议流, 将所有sink添加到等待队列
_, _ = SourceManager.Remove(s.Id_) _, _ = SourceManager.Remove(s.Id_)
for _, transStream := range s.transStreams { for _, transStream := range s.transStreams {
transStream.Close()
transStream.PopAllSink(func(sink ISink) { transStream.PopAllSink(func(sink ISink) {
sink.SetTransStreamId(0) sink.SetTransStreamId(0)
{ {
@@ -461,8 +424,7 @@ func (s *SourceImpl) Close() {
s.transStreams = nil s.transStreams = nil
} }
func (s *SourceImpl) OnDiscardPacket(pkt interface{}) { func (s *SourceImpl) OnDiscardPacket(packet utils.AVPacket) {
packet := pkt.(utils.AVPacket)
s.FindOrCreatePacketBuffer(packet.Index(), packet.MediaType()).FreeHead() s.FindOrCreatePacketBuffer(packet.Index(), packet.MediaType()).FreeHead()
} }
@@ -486,13 +448,15 @@ func (s *SourceImpl) OnDeMuxStream(stream utils.AVStream) {
}) })
} }
if utils.AVMediaTypeVideo == stream.Type() {
s.existVideo = true
}
//为每个Stream创建对应的Buffer //为每个Stream创建对应的Buffer
if AppConfig.GOPCache { if AppConfig.GOPCache && s.existVideo {
buffer := NewStreamBuffer(200) s.gopBuffer = NewStreamBuffer()
//OnDeMuxStream的调用顺序就是AVStream和AVPacket的Index的递增顺序
s.gopBuffers = append(s.gopBuffers, buffer)
//设置GOP缓存溢出回调 //设置GOP缓存溢出回调
buffer.SetDiscardHandler(s.OnDiscardPacket) s.gopBuffer.SetDiscardHandler(s.OnDiscardPacket)
} }
} }
@@ -534,9 +498,8 @@ func (s *SourceImpl) OnDeMuxStreamDone() {
} }
func (s *SourceImpl) OnDeMuxPacket(packet utils.AVPacket) { func (s *SourceImpl) OnDeMuxPacket(packet utils.AVPacket) {
if AppConfig.GOPCache { if AppConfig.GOPCache && s.existVideo {
buffer := s.gopBuffers[packet.Index()] s.gopBuffer.AddPacket(packet)
buffer.AddPacket(packet, packet.KeyFrame(), packet.Dts())
} }
//分发给各个传输流 //分发给各个传输流
@@ -544,12 +507,10 @@ func (s *SourceImpl) OnDeMuxPacket(packet utils.AVPacket) {
stream_.Input(packet) stream_.Input(packet)
} }
if AppConfig.GOPCache { //未开启GOP缓存或只存在音频流, 释放掉内存
return if !AppConfig.GOPCache || !s.existVideo {
s.FindOrCreatePacketBuffer(packet.Index(), packet.MediaType()).FreeTail()
} }
//未开启GOP缓存释放掉内存
s.FindOrCreatePacketBuffer(packet.Index(), packet.MediaType()).FreeTail()
} }
func (s *SourceImpl) OnDeMuxDone() { func (s *SourceImpl) OnDeMuxDone() {

View File

@@ -1,129 +0,0 @@
package stream
import "github.com/yangjiechina/avformat/utils"
// StreamBuffer GOP缓存
type StreamBuffer interface {
// AddPacket Return bool 缓存帧是否成功, 如果首帧非关键帧, 缓存失败
AddPacket(packet interface{}, key bool, ts int64) bool
// SetDiscardHandler 设置丢弃帧时的回调
SetDiscardHandler(handler func(packet interface{}))
PeekAll(handler func(packet interface{}))
Peek(index int) interface{}
Duration() int64
Size() int
}
type streamBuffer struct {
buffer RingBuffer
duration int64
keyFrameDts int64 //最近一个关键帧的Dts
FarthestKeyFrameDts int64 //最远一个关键帧的Dts
discardHandler func(packet interface{})
}
type element struct {
ts int64
key bool
pkt interface{}
}
func NewStreamBuffer(duration int64) StreamBuffer {
return &streamBuffer{duration: duration, buffer: NewRingBuffer(1000)}
}
func (s *streamBuffer) AddPacket(packet interface{}, key bool, ts int64) bool {
if s.buffer.IsEmpty() {
if !key {
return false
}
s.FarthestKeyFrameDts = ts
}
s.buffer.Push(element{ts, key, packet})
if key {
s.keyFrameDts = ts
}
//丢弃处理
//以最近的关键帧时间戳开始丢弃缓存超过duration长度的帧
//至少需要保障当前GOP完整
//暂时不考虑以下情况:
// 1. 音频收流正常,视频长时间没收流,待视频恢复后。 会造成在此期间,多余的音频帧被丢弃,播放时有画面,没声音.
// 2. 视频反之亦然
if !key {
return true
}
for farthest := s.keyFrameDts - s.duration; s.buffer.Size() > 1 && s.buffer.Head().(element).ts < farthest; {
ele := s.buffer.Pop().(element)
//重新设置最早的关键帧时间戳
if ele.key && ele.ts != s.FarthestKeyFrameDts {
s.FarthestKeyFrameDts = ele.ts
}
if s.discardHandler != nil {
s.discardHandler(ele.pkt)
}
}
return true
}
func (s *streamBuffer) SetDiscardHandler(handler func(packet interface{})) {
s.discardHandler = handler
}
func (s *streamBuffer) Peek(index int) interface{} {
utils.Assert(index < s.buffer.Size())
head, tail := s.buffer.Data()
if index < len(head) {
return head[index].(element).pkt
} else {
return tail[index-len(head)].(element).pkt
}
}
func (s *streamBuffer) PeekAll(handler func(packet interface{})) {
head, tail := s.buffer.Data()
if head == nil {
return
}
for _, value := range head {
handler(value.(element).pkt)
}
if tail == nil {
return
}
for _, value := range tail {
handler(value.(element).pkt)
}
}
func (s *streamBuffer) Duration() int64 {
head := s.buffer.Head()
tail := s.buffer.Tail()
if head == nil || tail == nil {
return 0
}
return tail.(element).ts - head.(element).ts
}
func (s *streamBuffer) Size() int {
return s.buffer.Size()
}

View File

@@ -192,61 +192,3 @@ func (t *TransStreamImpl) SendPacket(data []byte) error {
return nil return nil
} }
// CacheTransStream 针对RTMP/FLV/HLS等基于TCP传输的带缓存传输流.
type CacheTransStream struct {
TransStreamImpl
//作为封装流的内存缓存区, 即使没有开启GOP缓存也创建一个, 开启GOP缓存的情况下, 创建2个, 反复交替使用.
StreamBuffers []MemoryPool
//当前合并写切片位于memoryPool的开始偏移量
SegmentOffset int
//前一个包的时间戳
PrePacketTS int64
}
func (c *CacheTransStream) Init() {
c.TransStreamImpl.Init()
c.StreamBuffers = make([]MemoryPool, 2)
c.StreamBuffers[0] = NewDirectMemoryPool(1024 * 4000)
if c.ExistVideo && AppConfig.MergeWriteLatency > 0 {
c.StreamBuffers[1] = NewDirectMemoryPool(1024 * 4000)
}
c.SegmentOffset = 0
c.PrePacketTS = -1
}
func (c *CacheTransStream) Full(ts int64) bool {
if c.PrePacketTS == -1 {
c.PrePacketTS = ts
}
if ts < c.PrePacketTS {
c.PrePacketTS = ts
}
return int(ts-c.PrePacketTS) >= AppConfig.MergeWriteLatency
}
func (c *CacheTransStream) SwapStreamBuffer() {
if c.ExistVideo && AppConfig.MergeWriteLatency > 0 {
tmp := c.StreamBuffers[0]
c.StreamBuffers[0] = c.StreamBuffers[1]
c.StreamBuffers[1] = tmp
}
c.StreamBuffers[0].Clear()
c.PrePacketTS = -1
c.SegmentOffset = 0
}
func (c *CacheTransStream) SendPacketWithOffset(data []byte, offset int) error {
c.TransStreamImpl.SendPacket(data[offset:])
c.SegmentOffset = len(data)
c.PrePacketTS = -1
return nil
}