From 9568530233e3b7892c7423f6811dfaed3a430caa Mon Sep 17 00:00:00 2001 From: ydajiang Date: Tue, 22 Apr 2025 19:46:39 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=94=AF=E6=8C=81=E5=BF=AB=E8=BF=9B?= =?UTF-8?q?=E8=BF=BD=E5=B8=A7=E5=92=8C=E5=85=B3=E9=97=ADsink,=20=E5=9C=A8?= =?UTF-8?q?=E6=8E=A8=E6=B5=81=E7=BC=93=E6=85=A2=E6=97=B6.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- flv/flv_sink.go | 4 +- gb28181/forward_sink.go | 4 +- record/record_flv.go | 2 +- rtc/rtc_sink.go | 2 +- rtsp/rtsp_sink.go | 4 +- stream/sink.go | 106 +++++++++++++++++++++++++++++++++++++--- stream/source.go | 18 +++---- 7 files changed, 117 insertions(+), 23 deletions(-) diff --git a/flv/flv_sink.go b/flv/flv_sink.go index fdf3487..76dcce8 100644 --- a/flv/flv_sink.go +++ b/flv/flv_sink.go @@ -17,14 +17,14 @@ func (s *Sink) StopStreaming(stream stream.TransStream) { s.prevTagSize = stream.(*TransStream).Muxer.PrevTagSize() } -func (s *Sink) Write(index int, data []*collections.ReferenceCounter[[]byte], ts int64) error { +func (s *Sink) Write(index int, data []*collections.ReferenceCounter[[]byte], ts int64, keyVideo bool) error { // 恢复推流时, 不发送9个字节的flv header if s.prevTagSize > 0 { data = data[1:] s.prevTagSize = 0 } - return s.BaseSink.Write(index, data, ts) + return s.BaseSink.Write(index, data, ts, keyVideo) } func NewFLVSink(id stream.SinkID, sourceId string, conn net.Conn) stream.Sink { diff --git a/gb28181/forward_sink.go b/gb28181/forward_sink.go index 86f11a7..3e4a0b9 100644 --- a/gb28181/forward_sink.go +++ b/gb28181/forward_sink.go @@ -35,7 +35,7 @@ func (f *ForwardSink) OnDisConnected(conn net.Conn, err error) { f.Close() } -func (f *ForwardSink) Write(index int, data []*collections.ReferenceCounter[[]byte], ts int64) error { +func (f *ForwardSink) Write(index int, data []*collections.ReferenceCounter[[]byte], ts int64, keyVideo bool) error { // TCP等待连接后再转发数据 if SetupUDP != f.setup && f.Conn == nil { return nil @@ -47,7 +47,7 @@ func (f *ForwardSink) Write(index int, data []*collections.ReferenceCounter[[]by if SetupUDP == f.setup { f.socket.(*transport.UDPClient).Write(data[0].Get()[2:]) } else { - return f.BaseSink.Write(index, data, ts) + return f.BaseSink.Write(index, data, ts, keyVideo) } return nil diff --git a/record/record_flv.go b/record/record_flv.go index 7088719..e9ee149 100644 --- a/record/record_flv.go +++ b/record/record_flv.go @@ -15,7 +15,7 @@ type FLVFileSink struct { } // Input 输入http-flv数据 -func (f *FLVFileSink) Write(index int, blocks []*collections.ReferenceCounter[[]byte], ts int64) error { +func (f *FLVFileSink) Write(index int, blocks []*collections.ReferenceCounter[[]byte], ts int64, keyVideo bool) error { if f.fail { return nil } diff --git a/rtc/rtc_sink.go b/rtc/rtc_sink.go index 75de538..862ff08 100644 --- a/rtc/rtc_sink.go +++ b/rtc/rtc_sink.go @@ -141,7 +141,7 @@ func (s *Sink) Close() { } } -func (s *Sink) Write(index int, data []*collections.ReferenceCounter[[]byte], ts int64) error { +func (s *Sink) Write(index int, data []*collections.ReferenceCounter[[]byte], ts int64, keyVideo bool) error { if s.tracks[index] == nil { return nil } diff --git a/rtsp/rtsp_sink.go b/rtsp/rtsp_sink.go index 815ec6d..ebd05f4 100644 --- a/rtsp/rtsp_sink.go +++ b/rtsp/rtsp_sink.go @@ -81,7 +81,7 @@ func (s *Sink) AddSender(index int, tcp bool, ssrc uint32) (uint16, uint16, erro return rtpPort, rtcpPort, err } -func (s *Sink) Write(index int, data []*collections.ReferenceCounter[[]byte], rtpTime int64) error { +func (s *Sink) Write(index int, data []*collections.ReferenceCounter[[]byte], rtpTime int64, keyVideo bool) error { // 拉流方还没有连接上来 if index >= cap(s.senders) || s.senders[index] == nil { return nil @@ -94,7 +94,7 @@ func (s *Sink) Write(index int, data []*collections.ReferenceCounter[[]byte], rt if s.TCPStreaming { // 一次发送会花屏? // return s.BaseSink.Write(index, data, rtpTime) - s.BaseSink.Write(index, data[i:i+1], rtpTime) + s.BaseSink.Write(index, data[i:i+1], rtpTime, keyVideo) //s.Conn.Write(bytes.Get()) } else { // 发送rtcp sr包 diff --git a/stream/sink.go b/stream/sink.go index 34f221e..d50e240 100644 --- a/stream/sink.go +++ b/stream/sink.go @@ -9,9 +9,17 @@ import ( "net" "net/url" "sync" + "sync/atomic" "time" ) +const ( + EnableFastForward = false // 发送超时, 开始追帧 + EnableCloseOnWriteTimeout = false // 发送超时, 直接关闭Sink + WriteTimeout = 2000 // 发送超时时间, 单位毫秒. 如果发送超时, 开始追帧/关闭Sink + MaxPendingDataSize = 1024 * 1024 * 5 // 最大等待发送数据大小, 超过该大小, 开始追帧/关闭Sink +) + // Sink 对拉流端的封装 type Sink interface { GetID() SinkID @@ -20,7 +28,7 @@ type Sink interface { GetSourceID() string - Write(index int, data []*collections.ReferenceCounter[[]byte], ts int64) error + Write(index int, data []*collections.ReferenceCounter[[]byte], ts int64, keyVideo bool) error GetTransStreamID() TransStreamID @@ -110,9 +118,12 @@ type BaseSink struct { TCPStreaming bool // 是否是TCP流式拉流 urlValues url.Values // 拉流时携带的Url参数 - SentPacketCount int // 发包计数 - Ready bool // 是否准备好推流. Sink可以通过控制该变量, 达到触发Source推流, 但不立即拉流的目的. 比如rtsp拉流端在信令交互阶段,需要先获取媒体信息,再拉流. - createTime time.Time + SentPacketCount int // 发包计数 + Ready bool // 是否准备好推流. Sink可以通过控制该变量, 达到触发Source推流, 但不立即拉流的目的. 比如rtsp拉流端在信令交互阶段,需要先获取媒体信息,再拉流. + createTime time.Time + totalDataSize atomic.Uint64 + writtenDataSize atomic.Uint64 + lastKeyVideoDataSegment *collections.ReferenceCounter[[]byte] pendingSendQueue chan *collections.ReferenceCounter[[]byte] // 等待发送的数据队列 blockedBufferList *collections.LinkedList[*collections.ReferenceCounter[[]byte]] // 异步队列阻塞后的切片数据 @@ -129,6 +140,36 @@ func (s *BaseSink) SetID(id SinkID) { s.ID = id } +func (s *BaseSink) fastForward(firstSegment *collections.ReferenceCounter[[]byte]) (*collections.ReferenceCounter[[]byte], bool) { + if s.lastKeyVideoDataSegment == firstSegment { + return firstSegment, true + } + + firstSegment.Release() + s.writtenDataSize.Add(uint64(len(firstSegment.Get()))) + + for len(s.pendingSendQueue) > 0 { + buffer := <-s.pendingSendQueue + // 不存在视频, 清空队列 + // 还没有追到最近的关键帧, 继续追帧 + if s.lastKeyVideoDataSegment == nil || buffer != s.lastKeyVideoDataSegment { + buffer.Release() + s.writtenDataSize.Add(uint64(len(buffer.Get()))) + } else { + // else if TransStreamFlv == s.Protocol { + // // 重置第一个flv tag的pre tag size + // if data == s.lastKeyVideoDataSegment { + // binary.BigEndian.PutUint32(GetFLVTag(data.Get()), s.flvExtraDataPreTagSize) + // } + // } + return buffer, true + } + } + + // 不存在视频, 清空队列后, 等待下次继续推流 + return nil, s.lastKeyVideoDataSegment == nil +} + func (s *BaseSink) doAsyncWrite() { defer func() { // 释放未发送的数据 @@ -145,13 +186,56 @@ func (s *BaseSink) doAsyncWrite() { ReleasePendingBuffers(s.SourceID, s.TransStreamID) }() + var fastForward bool for { select { case <-s.cancelCtx.Done(): return case data := <-s.pendingSendQueue: - s.Conn.Write(data.Get()) + // 追帧到最近的关键帧 + if fastForward { + var ok bool + data, ok = s.fastForward(data) + if fastForward = !ok; !ok || data == nil { + break + } + } + + l := time.Now().UnixMilli() + _, err := s.Conn.Write(data.Get()) + duration := time.Now().UnixMilli() - l + if err != nil { + log.Sugar.Errorf(err.Error()) + } + data.Release() + s.writtenDataSize.Add(uint64(len(data.Get()))) + + if (EnableFastForward || EnableCloseOnWriteTimeout) && duration > WriteTimeout { + // 等待发送的数据大小超过最大等待发送数据大小, 开始追帧 + // 如果extra data没有发送完成, 拉流端会有问题. 给个最低128k限制, 当然也可以统计真实的extra data大小 + // timeout := s.writtenDataSize.Load() > 128*1024 && s.totalDataSize.Load()-s.writtenDataSize.Load() > MaxPendingDataSize + timeout := s.totalDataSize.Load()-s.writtenDataSize.Load() > MaxPendingDataSize + if !timeout { + break + } + + if EnableCloseOnWriteTimeout { + log.Sugar.Errorf("write timeout, closing sink. writtenDataSize: %d, totalDataSize: %d sink: %s, source: %s", s.writtenDataSize.Load(), s.totalDataSize.Load(), s.ID, s.SourceID) + s.Conn.Close() + // 不直接return, 从连接处最外层逐步关闭Sink + // 如果直接return,Sink未从Source中删除, 执行defer func函数操作blockedBufferList与write非线程安全, 可能会panic, 以及管道清理不干净, buffer不释放等问题 + // return + + <-s.cancelCtx.Done() + return + } + + if EnableFastForward { + fastForward = true + log.Sugar.Errorf("write timeout, fast forward. writtenDataSize: %d, totalDataSize: %d sink: %s, source: %s", s.writtenDataSize.Load(), s.totalDataSize.Load(), s.ID, s.SourceID) + } + } break } } @@ -165,11 +249,20 @@ func (s *BaseSink) EnableAsyncWriteMode(queueSize int) { go s.doAsyncWrite() } -func (s *BaseSink) Write(index int, data []*collections.ReferenceCounter[[]byte], ts int64) error { +func (s *BaseSink) Write(index int, data []*collections.ReferenceCounter[[]byte], ts int64, keyVideo bool) error { if s.Conn == nil { return nil } + if keyVideo { + s.lastKeyVideoDataSegment = data[0] + } + + // 统计发送的数据大小 + for _, datum := range data { + s.totalDataSize.Add(uint64(len(datum.Get()))) + } + // 发送被阻塞的数据 for s.blockedBufferList.Size() > 0 { bytes := s.blockedBufferList.Get(0) @@ -274,6 +367,7 @@ func (s *BaseSink) Close() { s.Lock() defer func() { + // 此时Sink已经从Source或等待队列中删除 closed := s.State == SessionStateClosed s.State = SessionStateClosed s.UnLock() diff --git a/stream/source.go b/stream/source.go index bb18d48..0618da0 100644 --- a/stream/source.go +++ b/stream/source.go @@ -314,7 +314,7 @@ func (s *PublishSource) DispatchPacket(transStream TransStream, packet *avformat } // DispatchBuffer 分发传输流 -func (s *PublishSource) DispatchBuffer(transStream TransStream, index int, data []*collections.ReferenceCounter[[]byte], timestamp int64, videoKey bool) { +func (s *PublishSource) DispatchBuffer(transStream TransStream, index int, data []*collections.ReferenceCounter[[]byte], timestamp int64, keyVideo bool) { sinks := s.TransStreamSinks[transStream.GetID()] exist := transStream.IsExistVideo() @@ -322,18 +322,18 @@ func (s *PublishSource) DispatchBuffer(transStream TransStream, index int, data // 如果存在视频, 确保向sink发送的第一帧是关键帧 if exist && sink.GetSentPacketCount() < 1 { - if !videoKey { + if !keyVideo { continue } if extraData, _, _ := transStream.ReadExtraData(timestamp); len(extraData) > 0 { - if ok := s.write(sink, index, extraData, timestamp); !ok { + if ok := s.write(sink, index, extraData, timestamp, false); !ok { continue } } } - if ok := s.write(sink, index, data, timestamp); !ok { + if ok := s.write(sink, index, data, timestamp, keyVideo); !ok { continue } } @@ -345,8 +345,8 @@ func (s *PublishSource) pendingSink(sink Sink) { } // 向sink推流 -func (s *PublishSource) write(sink Sink, index int, data []*collections.ReferenceCounter[[]byte], timestamp int64) bool { - err := sink.Write(index, data, timestamp) +func (s *PublishSource) write(sink Sink, index int, data []*collections.ReferenceCounter[[]byte], timestamp int64, keyVideo bool) bool { + err := sink.Write(index, data, timestamp, keyVideo) if err == nil { sink.IncreaseSentPacketCount() return true @@ -439,7 +439,7 @@ func (s *PublishSource) doAddSink(sink Sink, resume bool) bool { log.Sugar.Errorf("添加sink失败,开始推流发生err: %s sink: %s source: %s ", err.Error(), SinkId2String(sink.GetID()), s.ID) return false } - + // 还没做好准备(rtsp拉流还在协商sdp中), 暂不推流 if !sink.IsReady() { return true @@ -465,10 +465,10 @@ func (s *PublishSource) doAddSink(sink Sink, resume bool) bool { data, timestamp, _ := transStream.ReadKeyFrameBuffer() if len(data) > 0 { if extraData, _, _ := transStream.ReadExtraData(timestamp); len(extraData) > 0 { - s.write(sink, 0, extraData, timestamp) + s.write(sink, 0, extraData, timestamp, false) } - s.write(sink, 0, data, timestamp) + s.write(sink, 0, data, timestamp, true) } // 新建传输流,发送已经缓存的音视频帧