feat: 支持快进追帧和关闭sink, 在推流缓慢时.

This commit is contained in:
ydajiang
2025-04-22 19:46:39 +08:00
parent 02a0a42238
commit 9568530233
7 changed files with 117 additions and 23 deletions

View File

@@ -17,14 +17,14 @@ func (s *Sink) StopStreaming(stream stream.TransStream) {
s.prevTagSize = stream.(*TransStream).Muxer.PrevTagSize()
}
func (s *Sink) Write(index int, data []*collections.ReferenceCounter[[]byte], ts int64) error {
func (s *Sink) Write(index int, data []*collections.ReferenceCounter[[]byte], ts int64, keyVideo bool) error {
// 恢复推流时, 不发送9个字节的flv header
if s.prevTagSize > 0 {
data = data[1:]
s.prevTagSize = 0
}
return s.BaseSink.Write(index, data, ts)
return s.BaseSink.Write(index, data, ts, keyVideo)
}
func NewFLVSink(id stream.SinkID, sourceId string, conn net.Conn) stream.Sink {

View File

@@ -35,7 +35,7 @@ func (f *ForwardSink) OnDisConnected(conn net.Conn, err error) {
f.Close()
}
func (f *ForwardSink) Write(index int, data []*collections.ReferenceCounter[[]byte], ts int64) error {
func (f *ForwardSink) Write(index int, data []*collections.ReferenceCounter[[]byte], ts int64, keyVideo bool) error {
// TCP等待连接后再转发数据
if SetupUDP != f.setup && f.Conn == nil {
return nil
@@ -47,7 +47,7 @@ func (f *ForwardSink) Write(index int, data []*collections.ReferenceCounter[[]by
if SetupUDP == f.setup {
f.socket.(*transport.UDPClient).Write(data[0].Get()[2:])
} else {
return f.BaseSink.Write(index, data, ts)
return f.BaseSink.Write(index, data, ts, keyVideo)
}
return nil

View File

@@ -15,7 +15,7 @@ type FLVFileSink struct {
}
// Input 输入http-flv数据
func (f *FLVFileSink) Write(index int, blocks []*collections.ReferenceCounter[[]byte], ts int64) error {
func (f *FLVFileSink) Write(index int, blocks []*collections.ReferenceCounter[[]byte], ts int64, keyVideo bool) error {
if f.fail {
return nil
}

View File

@@ -141,7 +141,7 @@ func (s *Sink) Close() {
}
}
func (s *Sink) Write(index int, data []*collections.ReferenceCounter[[]byte], ts int64) error {
func (s *Sink) Write(index int, data []*collections.ReferenceCounter[[]byte], ts int64, keyVideo bool) error {
if s.tracks[index] == nil {
return nil
}

View File

@@ -81,7 +81,7 @@ func (s *Sink) AddSender(index int, tcp bool, ssrc uint32) (uint16, uint16, erro
return rtpPort, rtcpPort, err
}
func (s *Sink) Write(index int, data []*collections.ReferenceCounter[[]byte], rtpTime int64) error {
func (s *Sink) Write(index int, data []*collections.ReferenceCounter[[]byte], rtpTime int64, keyVideo bool) error {
// 拉流方还没有连接上来
if index >= cap(s.senders) || s.senders[index] == nil {
return nil
@@ -94,7 +94,7 @@ func (s *Sink) Write(index int, data []*collections.ReferenceCounter[[]byte], rt
if s.TCPStreaming {
// 一次发送会花屏?
// return s.BaseSink.Write(index, data, rtpTime)
s.BaseSink.Write(index, data[i:i+1], rtpTime)
s.BaseSink.Write(index, data[i:i+1], rtpTime, keyVideo)
//s.Conn.Write(bytes.Get())
} else {
// 发送rtcp sr包

View File

@@ -9,9 +9,17 @@ import (
"net"
"net/url"
"sync"
"sync/atomic"
"time"
)
const (
EnableFastForward = false // 发送超时, 开始追帧
EnableCloseOnWriteTimeout = false // 发送超时, 直接关闭Sink
WriteTimeout = 2000 // 发送超时时间, 单位毫秒. 如果发送超时, 开始追帧/关闭Sink
MaxPendingDataSize = 1024 * 1024 * 5 // 最大等待发送数据大小, 超过该大小, 开始追帧/关闭Sink
)
// Sink 对拉流端的封装
type Sink interface {
GetID() SinkID
@@ -20,7 +28,7 @@ type Sink interface {
GetSourceID() string
Write(index int, data []*collections.ReferenceCounter[[]byte], ts int64) error
Write(index int, data []*collections.ReferenceCounter[[]byte], ts int64, keyVideo bool) error
GetTransStreamID() TransStreamID
@@ -113,6 +121,9 @@ type BaseSink struct {
SentPacketCount int // 发包计数
Ready bool // 是否准备好推流. Sink可以通过控制该变量, 达到触发Source推流, 但不立即拉流的目的. 比如rtsp拉流端在信令交互阶段,需要先获取媒体信息,再拉流.
createTime time.Time
totalDataSize atomic.Uint64
writtenDataSize atomic.Uint64
lastKeyVideoDataSegment *collections.ReferenceCounter[[]byte]
pendingSendQueue chan *collections.ReferenceCounter[[]byte] // 等待发送的数据队列
blockedBufferList *collections.LinkedList[*collections.ReferenceCounter[[]byte]] // 异步队列阻塞后的切片数据
@@ -129,6 +140,36 @@ func (s *BaseSink) SetID(id SinkID) {
s.ID = id
}
func (s *BaseSink) fastForward(firstSegment *collections.ReferenceCounter[[]byte]) (*collections.ReferenceCounter[[]byte], bool) {
if s.lastKeyVideoDataSegment == firstSegment {
return firstSegment, true
}
firstSegment.Release()
s.writtenDataSize.Add(uint64(len(firstSegment.Get())))
for len(s.pendingSendQueue) > 0 {
buffer := <-s.pendingSendQueue
// 不存在视频, 清空队列
// 还没有追到最近的关键帧, 继续追帧
if s.lastKeyVideoDataSegment == nil || buffer != s.lastKeyVideoDataSegment {
buffer.Release()
s.writtenDataSize.Add(uint64(len(buffer.Get())))
} else {
// else if TransStreamFlv == s.Protocol {
// // 重置第一个flv tag的pre tag size
// if data == s.lastKeyVideoDataSegment {
// binary.BigEndian.PutUint32(GetFLVTag(data.Get()), s.flvExtraDataPreTagSize)
// }
// }
return buffer, true
}
}
// 不存在视频, 清空队列后, 等待下次继续推流
return nil, s.lastKeyVideoDataSegment == nil
}
func (s *BaseSink) doAsyncWrite() {
defer func() {
// 释放未发送的数据
@@ -145,13 +186,56 @@ func (s *BaseSink) doAsyncWrite() {
ReleasePendingBuffers(s.SourceID, s.TransStreamID)
}()
var fastForward bool
for {
select {
case <-s.cancelCtx.Done():
return
case data := <-s.pendingSendQueue:
s.Conn.Write(data.Get())
// 追帧到最近的关键帧
if fastForward {
var ok bool
data, ok = s.fastForward(data)
if fastForward = !ok; !ok || data == nil {
break
}
}
l := time.Now().UnixMilli()
_, err := s.Conn.Write(data.Get())
duration := time.Now().UnixMilli() - l
if err != nil {
log.Sugar.Errorf(err.Error())
}
data.Release()
s.writtenDataSize.Add(uint64(len(data.Get())))
if (EnableFastForward || EnableCloseOnWriteTimeout) && duration > WriteTimeout {
// 等待发送的数据大小超过最大等待发送数据大小, 开始追帧
// 如果extra data没有发送完成, 拉流端会有问题. 给个最低128k限制, 当然也可以统计真实的extra data大小
// timeout := s.writtenDataSize.Load() > 128*1024 && s.totalDataSize.Load()-s.writtenDataSize.Load() > MaxPendingDataSize
timeout := s.totalDataSize.Load()-s.writtenDataSize.Load() > MaxPendingDataSize
if !timeout {
break
}
if EnableCloseOnWriteTimeout {
log.Sugar.Errorf("write timeout, closing sink. writtenDataSize: %d, totalDataSize: %d sink: %s, source: %s", s.writtenDataSize.Load(), s.totalDataSize.Load(), s.ID, s.SourceID)
s.Conn.Close()
// 不直接return, 从连接处最外层逐步关闭Sink
// 如果直接return,Sink未从Source中删除, 执行defer func函数操作blockedBufferList与write非线程安全, 可能会panic, 以及管道清理不干净, buffer不释放等问题
// return
<-s.cancelCtx.Done()
return
}
if EnableFastForward {
fastForward = true
log.Sugar.Errorf("write timeout, fast forward. writtenDataSize: %d, totalDataSize: %d sink: %s, source: %s", s.writtenDataSize.Load(), s.totalDataSize.Load(), s.ID, s.SourceID)
}
}
break
}
}
@@ -165,11 +249,20 @@ func (s *BaseSink) EnableAsyncWriteMode(queueSize int) {
go s.doAsyncWrite()
}
func (s *BaseSink) Write(index int, data []*collections.ReferenceCounter[[]byte], ts int64) error {
func (s *BaseSink) Write(index int, data []*collections.ReferenceCounter[[]byte], ts int64, keyVideo bool) error {
if s.Conn == nil {
return nil
}
if keyVideo {
s.lastKeyVideoDataSegment = data[0]
}
// 统计发送的数据大小
for _, datum := range data {
s.totalDataSize.Add(uint64(len(datum.Get())))
}
// 发送被阻塞的数据
for s.blockedBufferList.Size() > 0 {
bytes := s.blockedBufferList.Get(0)
@@ -274,6 +367,7 @@ func (s *BaseSink) Close() {
s.Lock()
defer func() {
// 此时Sink已经从Source或等待队列中删除
closed := s.State == SessionStateClosed
s.State = SessionStateClosed
s.UnLock()

View File

@@ -314,7 +314,7 @@ func (s *PublishSource) DispatchPacket(transStream TransStream, packet *avformat
}
// DispatchBuffer 分发传输流
func (s *PublishSource) DispatchBuffer(transStream TransStream, index int, data []*collections.ReferenceCounter[[]byte], timestamp int64, videoKey bool) {
func (s *PublishSource) DispatchBuffer(transStream TransStream, index int, data []*collections.ReferenceCounter[[]byte], timestamp int64, keyVideo bool) {
sinks := s.TransStreamSinks[transStream.GetID()]
exist := transStream.IsExistVideo()
@@ -322,18 +322,18 @@ func (s *PublishSource) DispatchBuffer(transStream TransStream, index int, data
// 如果存在视频, 确保向sink发送的第一帧是关键帧
if exist && sink.GetSentPacketCount() < 1 {
if !videoKey {
if !keyVideo {
continue
}
if extraData, _, _ := transStream.ReadExtraData(timestamp); len(extraData) > 0 {
if ok := s.write(sink, index, extraData, timestamp); !ok {
if ok := s.write(sink, index, extraData, timestamp, false); !ok {
continue
}
}
}
if ok := s.write(sink, index, data, timestamp); !ok {
if ok := s.write(sink, index, data, timestamp, keyVideo); !ok {
continue
}
}
@@ -345,8 +345,8 @@ func (s *PublishSource) pendingSink(sink Sink) {
}
// 向sink推流
func (s *PublishSource) write(sink Sink, index int, data []*collections.ReferenceCounter[[]byte], timestamp int64) bool {
err := sink.Write(index, data, timestamp)
func (s *PublishSource) write(sink Sink, index int, data []*collections.ReferenceCounter[[]byte], timestamp int64, keyVideo bool) bool {
err := sink.Write(index, data, timestamp, keyVideo)
if err == nil {
sink.IncreaseSentPacketCount()
return true
@@ -465,10 +465,10 @@ func (s *PublishSource) doAddSink(sink Sink, resume bool) bool {
data, timestamp, _ := transStream.ReadKeyFrameBuffer()
if len(data) > 0 {
if extraData, _, _ := transStream.ReadExtraData(timestamp); len(extraData) > 0 {
s.write(sink, 0, extraData, timestamp)
s.write(sink, 0, extraData, timestamp, false)
}
s.write(sink, 0, data, timestamp)
s.write(sink, 0, data, timestamp, true)
}
// 新建传输流,发送已经缓存的音视频帧