refactor: 分离解析推流和转发流

This commit is contained in:
ydajiang
2025-05-14 16:54:43 +08:00
parent 61e152e8ed
commit 24fc44f9c7
20 changed files with 931 additions and 781 deletions

View File

@@ -97,8 +97,6 @@ type Sink interface {
// EnableAsyncWriteMode 开启异步发送
EnableAsyncWriteMode(queueSize int)
PendingSendQueueSize() int
}
type BaseSink struct {
@@ -124,9 +122,7 @@ type BaseSink struct {
totalDataSize atomic.Uint64
writtenDataSize atomic.Uint64
lastKeyVideoDataSegment *collections.ReferenceCounter[[]byte]
pendingSendQueue chan *collections.ReferenceCounter[[]byte] // 等待发送的数据队列
blockedBufferList *collections.LinkedList[*collections.ReferenceCounter[[]byte]] // 异步队列阻塞后的切片数据
pendingSendQueue *NonBlockingChannel[*collections.ReferenceCounter[[]byte]]
cancelFunc func()
cancelCtx context.Context
@@ -148,8 +144,8 @@ func (s *BaseSink) fastForward(firstSegment *collections.ReferenceCounter[[]byte
firstSegment.Release()
s.writtenDataSize.Add(uint64(len(firstSegment.Get())))
for len(s.pendingSendQueue) > 0 {
buffer := <-s.pendingSendQueue
for len(s.pendingSendQueue.Channel) > 0 {
buffer := <-s.pendingSendQueue.Channel
// 不存在视频, 清空队列
// 还没有追到最近的关键帧, 继续追帧
if s.lastKeyVideoDataSegment == nil || buffer != s.lastKeyVideoDataSegment {
@@ -173,16 +169,9 @@ func (s *BaseSink) fastForward(firstSegment *collections.ReferenceCounter[[]byte
func (s *BaseSink) doAsyncWrite() {
defer func() {
// 释放未发送的数据
for len(s.pendingSendQueue) > 0 {
buffer := <-s.pendingSendQueue
for buffer := s.pendingSendQueue.Pop(); buffer != nil; buffer = s.pendingSendQueue.Pop() {
buffer.Release()
}
for s.blockedBufferList.Size() > 0 {
buffer := s.blockedBufferList.Remove(0)
buffer.Release()
}
ReleasePendingBuffers(s.SourceID, s.TransStreamID)
}()
@@ -191,7 +180,7 @@ func (s *BaseSink) doAsyncWrite() {
select {
case <-s.cancelCtx.Done():
return
case data := <-s.pendingSendQueue:
case data := <-s.pendingSendQueue.Channel:
// 追帧到最近的关键帧
if fastForward {
var ok bool
@@ -245,8 +234,7 @@ func (s *BaseSink) doAsyncWrite() {
func (s *BaseSink) EnableAsyncWriteMode(queueSize int) {
utils.Assert(s.Conn != nil)
s.pendingSendQueue = make(chan *collections.ReferenceCounter[[]byte], queueSize)
s.blockedBufferList = &collections.LinkedList[*collections.ReferenceCounter[[]byte]]{}
s.pendingSendQueue = NewNonBlockingChannel[*collections.ReferenceCounter[[]byte]](queueSize)
s.cancelCtx, s.cancelFunc = context.WithCancel(context.Background())
go s.doAsyncWrite()
}
@@ -265,50 +253,23 @@ func (s *BaseSink) Write(index int, data []*collections.ReferenceCounter[[]byte]
s.totalDataSize.Add(uint64(len(datum.Get())))
}
// 发送被阻塞的数据
for s.blockedBufferList.Size() > 0 {
bytes := s.blockedBufferList.Get(0)
select {
case s.pendingSendQueue <- bytes:
s.blockedBufferList.Remove(0)
break
default:
// 发送被阻塞的数据失败, 将本次发送的数据加入阻塞队列
for _, datum := range data {
s.blockedBufferList.Add(datum)
datum.Refer()
}
return nil
}
}
for _, bytes := range data {
if s.cancelCtx != nil {
bytes.Refer()
select {
case s.pendingSendQueue <- bytes:
break
default:
// 将本次发送的数据加入阻塞队列
s.blockedBufferList.Add(bytes)
//return transport.ZeroWindowSizeError{}
return nil
}
} else {
_, err := s.Conn.Write(bytes.Get())
if s.cancelCtx == nil {
for _, datum := range data {
_, err := s.Conn.Write(datum.Get())
if err != nil {
return err
}
}
} else {
for _, datum := range data {
datum.Refer()
s.pendingSendQueue.Post(datum)
}
}
return nil
}
func (s *BaseSink) PendingSendQueueSize() int {
return len(s.pendingSendQueue)
}
func (s *BaseSink) GetSourceID() string {
return s.SourceID
}
@@ -392,7 +353,7 @@ func (s *BaseSink) Close() {
} else if s.State == SessionStateTransferring {
// 从source中删除sink, 如果source为nil, 已经结束推流.
if source := SourceManager.Find(s.SourceID); source != nil {
source.RemoveSink(s)
source.GetTransStreamPublisher().RemoveSink(s)
}
} else if s.State == SessionStateWaiting {
// 从等待队列中删除sink