fix: 错误判断合并写是否包含关键帧的问题

This commit is contained in:
ydajiang
2024-12-23 20:27:21 +08:00
parent 2dc4b8622b
commit cdc4f84ffe
7 changed files with 123 additions and 86 deletions

View File

@@ -33,6 +33,7 @@ func (t *TransStream) Input(packet utils.AVPacket) ([][]byte, int64, bool, error
var videoKey bool
var dts int64
var pts int64
var keyBuffer bool
dts = packet.ConvertDts(1000)
pts = packet.ConvertPts(1000)
@@ -48,8 +49,9 @@ func (t *TransStream) Input(packet utils.AVPacket) ([][]byte, int64, bool, error
// 关键帧都放在切片头部,所以遇到关键帧创建新切片, 发送当前切片剩余流
if videoKey && !t.MWBuffer.IsNewSegment() {
segment := t.forceFlushSegment()
segment, key := t.forceFlushSegment()
t.AppendOutStreamBuffer(segment)
keyBuffer = key
}
var n int
@@ -73,12 +75,13 @@ func (t *TransStream) Input(packet utils.AVPacket) ([][]byte, int64, bool, error
copy(bytes[n:], data)
// 合并写满再发
if segment := t.MWBuffer.PeekCompletedSegment(); len(segment) > 0 {
if segment, key := t.MWBuffer.PeekCompletedSegment(); len(segment) > 0 {
// 已经分配末尾换行符内存
keyBuffer = key
t.AppendOutStreamBuffer(t.FormatSegment(segment))
}
return t.OutBuffer[:t.OutBufferSize], 0, true, nil
return t.OutBuffer[:t.OutBufferSize], 0, keyBuffer, nil
}
func (t *TransStream) AddTrack(track *stream.Track) error {
@@ -91,8 +94,8 @@ func (t *TransStream) AddTrack(track *stream.Track) error {
} else if utils.AVMediaTypeVideo == track.Stream.Type() {
t.muxer.AddVideoTrack(track.Stream.CodecId())
t.muxer.AddProperty("width", track.Stream.CodecParameters().Width())
t.muxer.AddProperty("height", track.Stream.CodecParameters().Height())
t.muxer.MetaData().AddNumberProperty("width", float64(track.Stream.CodecParameters().Width()))
t.muxer.MetaData().AddNumberProperty("height", float64(track.Stream.CodecParameters().Height()))
}
return nil
}
@@ -157,7 +160,7 @@ func (t *TransStream) Close() ([][]byte, int64, error) {
// 发送剩余的流
if !t.MWBuffer.IsNewSegment() {
if segment := t.forceFlushSegment(); len(segment) > 0 {
if segment, _ := t.forceFlushSegment(); len(segment) > 0 {
t.AppendOutStreamBuffer(segment)
}
}
@@ -166,16 +169,16 @@ func (t *TransStream) Close() ([][]byte, int64, error) {
}
// forceFlushSegment flushes the current merge-writing segment and returns it formatted as a complete http-flv block.
// In the two-value form the second result is the flag returned by MWBuffer.FlushSegment, i.e. whether the
// flushed segment is a key-frame segment.
// NOTE(review): this is a diff listing — the signature and the flush/return statements appear in both revisions.
func (t *TransStream) forceFlushSegment() []byte {
func (t *TransStream) forceFlushSegment() ([]byte, bool) {
// Reserve 2 bytes at the end of the segment for the trailing line break.
t.MWBuffer.Reserve(2)
segment := t.MWBuffer.FlushSegment()
return t.FormatSegment(segment)
segment, key := t.MWBuffer.FlushSegment()
return t.FormatSegment(segment), key
}
// GetHttpFLVBlock skips the unused bytes at the head of data and returns the remaining http-flv block.
// The number of bytes to skip is computed from data itself by the computeSkip* helper.
// NOTE(review): this is a diff listing — the return statement appears once per revision because the helper was renamed.
func (t *TransStream) GetHttpFLVBlock(data []byte) []byte {
return data[t.computeSkipCount(data):]
return data[t.computeSkipBytesSize(data):]
}
// FormatSegment 为切片添加包长和换行符
@@ -184,7 +187,7 @@ func (t *TransStream) FormatSegment(segment []byte) []byte {
return t.GetHttpFLVBlock(segment)
}
// computeSkipCount / computeSkipBytesSize returns how many leading bytes of data must be skipped:
// 6 plus the big-endian uint16 stored at data[4:6].
// NOTE(review): this is a diff listing — both the old and the new function name are shown for the same body.
// NOTE(review): the addition is evaluated in uint16 (the untyped constant 6 adopts the operand type), so a stored
// value above 65529 would wrap before the conversion to int — confirm the field can never be that large.
func (t *TransStream) computeSkipCount(data []byte) int {
func (t *TransStream) computeSkipBytesSize(data []byte) int {
return int(6 + binary.BigEndian.Uint16(data[4:]))
}
@@ -220,7 +223,7 @@ func (t *TransStream) writeSeparator(dst []byte) {
func NewHttpTransStream() stream.TransStream {
return &TransStream{
muxer: libflv.NewMuxer(),
muxer: libflv.NewMuxer(nil),
header: make([]byte, 1024),
headerSize: HttpFlvBlockHeaderSize,
}

View File

@@ -30,7 +30,7 @@ func (s *Sink) StartStreaming(transStream stream.TransStream) error {
// 创建PeerConnection
var remoteTrack *webrtc.TrackLocalStaticSample
s.tracks = make([]*webrtc.TrackLocalStaticSample, transStream.TrackCount())
s.tracks = make([]*webrtc.TrackLocalStaticSample, transStream.TrackSize())
connection, err := webrtcApi.NewPeerConnection(webrtc.Configuration{})
connection.OnICECandidate(func(candidate *webrtc.ICECandidate) {

View File

@@ -34,6 +34,7 @@ func (t *transStream) Input(packet utils.AVPacket) ([][]byte, int64, bool, error
var chunkPayloadOffset int
var dts int64
var pts int64
var keyBuffer bool
dts = packet.ConvertDts(1000)
pts = packet.ConvertPts(1000)
@@ -55,15 +56,15 @@ func (t *transStream) Input(packet utils.AVPacket) ([][]byte, int64, bool, error
// 遇到视频关键帧, 发送剩余的流, 创建新切片
if videoKey {
if segment := t.MWBuffer.FlushSegment(); len(segment) > 0 {
if segment, key := t.MWBuffer.FlushSegment(); len(segment) > 0 {
keyBuffer = key
t.AppendOutStreamBuffer(segment)
}
}
// 分配内存
// 固定type0
// type为0的header大小
chunkHeaderSize := 12
// type3chunk数量
// type为3的chunk数量
numChunks := (payloadSize - 1) / t.chunkSize
rtmpMsgSize := chunkHeaderSize + payloadSize + numChunks
// 如果时间戳超过3字节, 每个chunk都需要多4字节的扩展时间戳
@@ -71,29 +72,32 @@ func (t *transStream) Input(packet utils.AVPacket) ([][]byte, int64, bool, error
rtmpMsgSize += (1 + numChunks) * 4
}
// 分配指定大小的内存
allocate := t.MWBuffer.Allocate(rtmpMsgSize, dts, videoKey)
// 写chunk header
// 写第一个type为0的chunk header
chunk.Length = payloadSize
chunk.Timestamp = uint32(dts)
n := chunk.MarshalHeader(allocate)
// flv
// 封装成flv
if videoPkt {
n += t.muxer.WriteVideoData(allocate[n:], uint32(ct), packet.KeyFrame(), false)
} else {
n += t.muxer.WriteAudioData(allocate[n:], false)
}
// 将flv data写入chunk body
n += chunk.WriteBody(allocate[n:], data, t.chunkSize, chunkPayloadOffset)
utils.Assert(len(allocate) == n)
// 合并写满了再发
if segment := t.MWBuffer.PeekCompletedSegment(); len(segment) > 0 {
if segment, key := t.MWBuffer.PeekCompletedSegment(); len(segment) > 0 {
keyBuffer = key
t.AppendOutStreamBuffer(segment)
}
return t.OutBuffer[:t.OutBufferSize], 0, true, nil
return t.OutBuffer[:t.OutBufferSize], 0, keyBuffer, nil
}
func (t *transStream) ReadExtraData(_ int64) ([][]byte, int64, error) {

View File

@@ -26,12 +26,12 @@ type Sink struct {
}
func (s *Sink) StartStreaming(transStream stream.TransStream) error {
utils.Assert(transStream.TrackCount() > 0)
utils.Assert(transStream.TrackSize() > 0)
if s.senders != nil {
return nil
}
s.senders = make([]*librtp.RtpSender, transStream.TrackCount())
s.senders = make([]*librtp.RtpSender, transStream.TrackSize())
// sdp回调给sink, sink应答给describe请求
if s.cb != nil {
s.cb(transStream.(*TransStream).sdp)

View File

@@ -11,21 +11,22 @@ import (
type MergeWritingBuffer interface {
Allocate(size int, ts int64, videoKey bool) []byte
// PeekCompletedSegment 返回当前完整切片, 如果不满, 返回nil.
PeekCompletedSegment() []byte
// PeekCompletedSegment 返回当前完整切片, 以及是否是关键帧切片, 未满返回nil.
PeekCompletedSegment() ([]byte, bool)
// FlushSegment 保存当前切片, 创建新的切片
FlushSegment() []byte
// FlushSegment 生成并返回当前切片, 以及是否是关键帧切片.
FlushSegment() ([]byte, bool)
// IsFull 当前切片已满
IsFull(ts int64) bool
// IsNewSegment 新切片, 还未写数据
// IsNewSegment 当前切片是否还未写数据
IsNewSegment() bool
// Reserve 从当前切片中预留指定长度数据
Reserve(number int)
Reserve(length int)
// ReadSegmentsFromKeyFrameIndex 最近的关键帧读取切片
// ReadSegmentsFromKeyFrameIndex 返回最近的关键帧切片
ReadSegmentsFromKeyFrameIndex(cb func([]byte))
Capacity() int
@@ -36,6 +37,7 @@ type mwBlock struct {
keyVideo bool
buffer collections.MemoryPool
completed bool
Time int64
}
type mergeWritingBuffer struct {
@@ -45,7 +47,7 @@ type mergeWritingBuffer struct {
startTS int64 // 当前切片的开始时间
duration int // 当前切片时长
lastKeyFrameIndex int // 最关键帧所在切片的索引
lastKeyFrameIndex int // 最近的关键帧所在切片的索引
keyFrameCount int // 关键帧计数
existVideo bool // 是否存在视频
@@ -55,9 +57,9 @@ type mergeWritingBuffer struct {
// createMWBlock builds a new mwBlock backed by a fresh direct memory pool. Key-frame blocks are sized by
// m.keyFrameBufferMaxLength, all other blocks by m.nonKeyFrameBufferMaxLength.
// The literal is positional: the values after the first one map to keyVideo, buffer, completed and (in the
// five-value form) Time, which starts at 0. The first field is not visible in this listing — presumably the
// "free" flag, confirm against the mwBlock declaration.
// NOTE(review): this is a diff listing — each return statement appears in its four-value and five-value form.
func (m *mergeWritingBuffer) createMWBlock(videoKey bool) mwBlock {
if videoKey {
return mwBlock{true, videoKey, collections.NewDirectMemoryPool(m.keyFrameBufferMaxLength), false}
return mwBlock{true, videoKey, collections.NewDirectMemoryPool(m.keyFrameBufferMaxLength), false, 0}
} else {
return mwBlock{true, false, collections.NewDirectMemoryPool(m.nonKeyFrameBufferMaxLength), false}
return mwBlock{true, false, collections.NewDirectMemoryPool(m.nonKeyFrameBufferMaxLength), false, 0}
}
}
@@ -77,17 +79,18 @@ func (m *mergeWritingBuffer) Allocate(size int, ts int64, videoKey bool) []byte
utils.Assert(ts != -1)
//新的切片
// 新的切片
if m.startTS == -1 {
m.startTS = ts
if m.mwBlocks[m.index].buffer == nil {
//创建内存块
// 创建内存块
m.mwBlocks[m.index] = m.createMWBlock(videoKey)
} else {
//循环使用
// 循环使用
m.mwBlocks[m.index].buffer.Clear()
// 关键帧被覆盖, 减少计数
if m.mwBlocks[m.index].keyVideo {
m.keyFrameCount--
}
@@ -96,14 +99,15 @@ func (m *mergeWritingBuffer) Allocate(size int, ts int64, videoKey bool) []byte
m.mwBlocks[m.index].free = false
m.mwBlocks[m.index].completed = false
m.mwBlocks[m.index].keyVideo = videoKey
m.mwBlocks[m.index].Time = ts
}
if videoKey {
//请务必确保关键帧帧从新的切片开始
//外部遇到关键帧请先调用FlushSegment
// 请务必确保关键帧从新的切片开始
// 外部遇到关键帧请先调用FlushSegment
utils.Assert(m.mwBlocks[m.index].buffer.IsEmpty())
m.lastKeyFrameIndex = m.index
m.keyFrameCount++
//m.lastKeyFrameIndex = m.index
//m.keyFrameCount++
}
if ts < m.startTS {
@@ -114,49 +118,61 @@ func (m *mergeWritingBuffer) Allocate(size int, ts int64, videoKey bool) []byte
return m.mwBlocks[m.index].buffer.Allocate(size)
}
// FlushSegment finishes the current segment and moves on to the next block of the ring.
// It returns the data of the finished segment and, in the two-value form, whether that segment is a
// key-frame segment. It returns nil (and false) when the GOP cache is disabled, the stream has no video,
// the current block has no buffer or is still marked free, or the block holds no data.
// NOTE(review): this is a diff listing — several statements appear in both the old and the new revision.
func (m *mergeWritingBuffer) FlushSegment() []byte {
func (m *mergeWritingBuffer) FlushSegment() ([]byte, bool) {
// Nothing to flush without GOP caching/video, or when the current block was never written.
if !AppConfig.GOPCache || !m.existVideo {
return nil
return nil, false
} else if m.mwBlocks[m.index].buffer == nil || m.mwBlocks[m.index].free {
return nil
return nil, false
}
data, _ := m.mwBlocks[m.index].buffer.Data()
if len(data) == 0 {
return nil
return nil, false
}
// Update the buffer length.
// A key-frame segment becomes the most recent key-frame index and is counted when it is flushed.
key := m.mwBlocks[m.index].keyVideo
if key {
m.lastKeyFrameIndex = m.index
m.keyFrameCount++
}
// Track the largest segment length seen (with 50% headroom); new blocks are later created with this size.
if m.lastKeyFrameIndex == m.index && m.keyFrameBufferMaxLength < len(data) {
m.keyFrameBufferMaxLength = len(data) * 3 / 2
} else if m.lastKeyFrameIndex != m.index && m.nonKeyFrameBufferMaxLength < len(data) {
m.nonKeyFrameBufferMaxLength = len(data) * 3 / 2
}
// Mark the current segment as completed.
m.mwBlocks[m.index].completed = true
// Allocate the next segment: grow the ring when the last slot was just used and only one key frame is cached.
// NOTE(review): presumably this keeps the only cached key-frame segment from being overwritten on wrap-around — confirm.
capacity := cap(m.mwBlocks)
if m.index+1 == capacity && m.keyFrameCount == 1 {
m.grow()
capacity = cap(m.mwBlocks)
}
// Compute the index of the next segment (wraps around at capacity).
m.index = (m.index + 1) % capacity
m.mwBlocks[m.index].completed = true
// Reset the markers for the next segment.
m.startTS = -1
m.duration = 0
m.mwBlocks[m.index].free = true
m.mwBlocks[m.index].completed = false
return data
return data, key
}
func (m *mergeWritingBuffer) PeekCompletedSegment() []byte {
func (m *mergeWritingBuffer) PeekCompletedSegment() ([]byte, bool) {
if !AppConfig.GOPCache || !m.existVideo {
data, _ := m.mwBlocks[0].buffer.Data()
m.mwBlocks[0].buffer.Clear()
return data
return data, false
}
if m.duration < AppConfig.MergeWriteLatency {
return nil
return nil, false
}
return m.FlushSegment()
@@ -174,10 +190,10 @@ func (m *mergeWritingBuffer) IsNewSegment() bool {
return m.mwBlocks[m.index].buffer == nil || m.mwBlocks[m.index].free
}
// Reserve allocates the given number of bytes from the current segment's buffer and discards the returned slice,
// so the space is set aside inside the segment. The current block's buffer must already exist (asserted).
// NOTE(review): this is a diff listing — the parameter was renamed from number to length, so both forms are shown.
func (m *mergeWritingBuffer) Reserve(number int) {
func (m *mergeWritingBuffer) Reserve(length int) {
utils.Assert(m.mwBlocks[m.index].buffer != nil)
_ = m.mwBlocks[m.index].buffer.Allocate(number)
_ = m.mwBlocks[m.index].buffer.Allocate(length)
}
func (m *mergeWritingBuffer) ReadSegmentsFromKeyFrameIndex(cb func([]byte)) {
@@ -185,18 +201,25 @@ func (m *mergeWritingBuffer) ReadSegmentsFromKeyFrameIndex(cb func([]byte)) {
return
}
for i := m.lastKeyFrameIndex; i < cap(m.mwBlocks); i++ {
if m.mwBlocks[i].buffer == nil || !m.mwBlocks[i].completed {
continue
}
ranges := [2][2]int{{-1, -1}, {-1, -1}}
if m.lastKeyFrameIndex <= m.index {
ranges[0][0] = m.lastKeyFrameIndex
ranges[0][1] = m.index + 1
} else {
// 回环, 先遍历后面和前面的数据
ranges[0][0] = m.lastKeyFrameIndex
ranges[0][1] = cap(m.mwBlocks)
data, _ := m.mwBlocks[i].buffer.Data()
cb(data)
ranges[1][0] = 0
ranges[1][1] = m.index + 1
}
//回调循环使用的头部数据
if m.index < m.lastKeyFrameIndex {
for i := 0; i < m.index; i++ {
for _, index := range ranges {
for i := index[0]; i > -1 && i < index[1]; i++ {
if m.mwBlocks[i].buffer == nil || !m.mwBlocks[i].completed {
break
}
data, _ := m.mwBlocks[i].buffer.Data()
cb(data)
}
@@ -217,7 +240,7 @@ func NewMergeWritingBuffer(existVideo bool) MergeWritingBuffer {
}
if !existVideo || !AppConfig.GOPCache {
blocks[0] = mwBlock{true, false, collections.NewDirectMemoryPool(1024 * 100), false}
blocks[0] = mwBlock{true, false, collections.NewDirectMemoryPool(1024 * 100), false, 0}
}
return &mergeWritingBuffer{

View File

@@ -300,7 +300,9 @@ func IsSupportMux(protocol TransStreamProtocol, audioCodecId, videoCodecId utils
func (s *PublishSource) CreateTransStream(id TransStreamID, protocol TransStreamProtocol, tracks []*Track) (TransStream, error) {
log.Sugar.Infof("创建%s-stream source: %s", protocol.String(), s.ID)
transStream, err := CreateTransStream(s, protocol, tracks)
source := SourceManager.Find(s.ID)
utils.Assert(source != nil)
transStream, err := CreateTransStream(source, protocol, tracks)
if err != nil {
log.Sugar.Errorf("创建传输流失败 err: %s source: %s", err.Error(), s.ID)
return nil, err
@@ -313,7 +315,7 @@ func (s *PublishSource) CreateTransStream(id TransStreamID, protocol TransStream
}
transStream.SetID(id)
transStream.SetTransStreamProtocol(protocol)
transStream.SetProtocol(protocol)
// 创建输出流对应的拉流队列
s.TransStreamSinks[id] = make(map[SinkID]Sink, 128)
@@ -461,6 +463,15 @@ func (s *PublishSource) doAddSink(sink Sink) bool {
return true
}
// 累加拉流计数
if s.recordSink != sink {
s.sinkCount++
log.Sugar.Infof("sink count: %d source: %s", s.sinkCount, s.ID)
}
s.sinks[sink.GetID()] = sink
s.TransStreamSinks[transStreamId][sink.GetID()] = sink
// TCP拉流开启异步发包, 一旦出现网络不好的链路, 其余正常链路不受影响.
conn, ok := sink.GetConn().(*transport.Conn)
if ok && sink.IsTCPStreaming() && transStream.OutStreamBufferCapacity() > 2 {
@@ -478,15 +489,6 @@ func (s *PublishSource) doAddSink(sink Sink) bool {
s.write(sink, 0, data, timestamp)
}
// 累加拉流计数
if s.recordSink != sink {
s.sinkCount++
log.Sugar.Infof("sink count: %d source: %s", s.sinkCount, s.ID)
}
s.sinks[sink.GetID()] = sink
s.TransStreamSinks[transStreamId][sink.GetID()] = sink
// 新建传输流,发送已经缓存的音视频帧
if !exist && AppConfig.GOPCache && s.existVideo {
s.DispatchGOPBuffer(transStream)

View File

@@ -10,37 +10,42 @@ type TransStream interface {
SetID(id TransStreamID)
// Input 封装传输流, 返回合并写块、时间戳、合并写块是否包含视频关键帧
Input(packet utils.AVPacket) ([][]byte, int64, bool, error)
AddTrack(track *Track) error
TrackCount() int
TrackSize() int
GetTracks() []*Track
// WriteHeader track添加完毕, 通过调用此函数告知
WriteHeader() error
// GetProtocol 返回输出流协议
GetProtocol() TransStreamProtocol
// ReadExtraData 获取封装后的编码器扩展数据
SetProtocol(protocol TransStreamProtocol)
// ReadExtraData 读取传输流的编码器扩展数据
ReadExtraData(timestamp int64) ([][]byte, int64, error)
// ReadKeyFrameBuffer 读取已经缓存的包含关键视频帧的输出流
// ReadKeyFrameBuffer 读取最近的包含视频关键帧的合并写队列
ReadKeyFrameBuffer() ([][]byte, int64, error)
// Close 关闭传输流, 返回还未flush的合并写块
Close() ([][]byte, int64, error)
// ClearOutStreamBuffer 清空传输流的合并写块队列
ClearOutStreamBuffer()
// AppendOutStreamBuffer 添加合并写块到队列
AppendOutStreamBuffer(buffer []byte)
// OutStreamBufferCapacity 返回输出流缓冲区的容量大小, 输出流缓冲区同时作为向sink推流的发送缓冲区, 容量大小决定向sink异步推流的队列大小;
// OutStreamBufferCapacity 返回合并写块队列容量大小, 作为sink异步推流的队列大小;
OutStreamBufferCapacity() int
IsExistVideo() bool
SetTransStreamProtocol(protocol TransStreamProtocol)
}
type BaseTransStream struct {
@@ -50,8 +55,8 @@ type BaseTransStream struct {
ExistVideo bool
Protocol TransStreamProtocol
OutBuffer [][]byte // 输出流的返回队列
OutBufferSize int
OutBuffer [][]byte // 传输流的合并写块队列
OutBufferSize int // 传输流返合并写块队列大小
}
func (t *BaseTransStream) GetID() TransStreamID {
@@ -82,6 +87,10 @@ func (t *BaseTransStream) GetProtocol() TransStreamProtocol {
return t.Protocol
}
// SetProtocol stores the output protocol of this transport stream; GetProtocol returns the same field.
func (t *BaseTransStream) SetProtocol(protocol TransStreamProtocol) {
t.Protocol = protocol
}
// ClearOutStreamBuffer empties the merge-write block queue by resetting OutBufferSize to 0.
// The OutBuffer slice itself is not touched here.
func (t *BaseTransStream) ClearOutStreamBuffer() {
t.OutBufferSize = 0
}
@@ -106,7 +115,7 @@ func (t *BaseTransStream) OutStreamBufferCapacity() int {
return 0
}
// TrackCount / TrackSize returns the number of tracks held in t.Tracks.
// NOTE(review): this is a diff listing — the method was renamed, so both the old and the new header are shown.
func (t *BaseTransStream) TrackCount() int {
func (t *BaseTransStream) TrackSize() int {
return len(t.Tracks)
}
@@ -126,10 +135,6 @@ func (t *BaseTransStream) ReadKeyFrameBuffer() ([][]byte, int64, error) {
return nil, 0, nil
}
// SetTransStreamProtocol stores the output protocol of this transport stream.
// NOTE(review): in this diff listing the hunk header shows these lines being dropped; the listing also contains
// a SetProtocol method with the same body — presumably its replacement, confirm against the repository.
func (t *BaseTransStream) SetTransStreamProtocol(protocol TransStreamProtocol) {
t.Protocol = protocol
}
type TCPTransStream struct {
BaseTransStream