mirror of
https://github.com/lkmio/lkm.git
synced 2025-09-27 03:26:01 +08:00
feat: 恢复推流
This commit is contained in:
@@ -1,12 +1,15 @@
|
|||||||
package stream
|
package stream
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/lkmio/avformat/transport"
|
||||||
"github.com/lkmio/avformat/utils"
|
"github.com/lkmio/avformat/utils"
|
||||||
"github.com/lkmio/lkm/log"
|
"github.com/lkmio/lkm/log"
|
||||||
"net"
|
"net"
|
||||||
"net/url"
|
"net/url"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -84,6 +87,15 @@ type Sink interface {
|
|||||||
CreateTime() time.Time
|
CreateTime() time.Time
|
||||||
|
|
||||||
SetCreateTime(time time.Time)
|
SetCreateTime(time time.Time)
|
||||||
|
|
||||||
|
// EnableAsyncWriteMode 开启异步发送
|
||||||
|
EnableAsyncWriteMode(queueSize int)
|
||||||
|
|
||||||
|
// Pause 暂停推流
|
||||||
|
Pause()
|
||||||
|
|
||||||
|
// IsExited 异步发送协程是否退出, 如果还没有退出(write阻塞)不恢复推流
|
||||||
|
IsExited() bool
|
||||||
}
|
}
|
||||||
|
|
||||||
type BaseSink struct {
|
type BaseSink struct {
|
||||||
@@ -106,6 +118,11 @@ type BaseSink struct {
|
|||||||
SentPacketCount int // 发包计数
|
SentPacketCount int // 发包计数
|
||||||
Ready bool // 是否准备好推流. Sink可以通过控制该变量, 达到触发Source推流, 但不立即拉流的目的. 比如rtsp拉流端在信令交互阶段,需要先获取媒体信息,再拉流.
|
Ready bool // 是否准备好推流. Sink可以通过控制该变量, 达到触发Source推流, 但不立即拉流的目的. 比如rtsp拉流端在信令交互阶段,需要先获取媒体信息,再拉流.
|
||||||
createTime time.Time
|
createTime time.Time
|
||||||
|
|
||||||
|
existed atomic.Bool
|
||||||
|
pendingSendQueue chan []byte // 等待发送的数据队列
|
||||||
|
cancelFunc func()
|
||||||
|
cancelCtx context.Context
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *BaseSink) GetID() SinkID {
|
func (s *BaseSink) GetID() SinkID {
|
||||||
@@ -116,9 +133,52 @@ func (s *BaseSink) SetID(id SinkID) {
|
|||||||
s.ID = id
|
s.ID = id
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *BaseSink) doAsyncWrite() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-s.cancelCtx.Done():
|
||||||
|
return
|
||||||
|
case data := <-s.pendingSendQueue:
|
||||||
|
s.Conn.Write(data)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
s.existed.Store(true)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BaseSink) EnableAsyncWriteMode(queueSize int) {
|
||||||
|
utils.Assert(s.Conn != nil)
|
||||||
|
s.existed.Store(false)
|
||||||
|
s.pendingSendQueue = make(chan []byte, queueSize)
|
||||||
|
s.cancelCtx, s.cancelFunc = context.WithCancel(context.Background())
|
||||||
|
go s.doAsyncWrite()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BaseSink) Pause() {
|
||||||
|
if s.cancelCtx != nil {
|
||||||
|
s.cancelFunc()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BaseSink) IsExited() bool {
|
||||||
|
return s.existed.Load()
|
||||||
|
}
|
||||||
|
|
||||||
func (s *BaseSink) Write(index int, data [][]byte, ts int64) error {
|
func (s *BaseSink) Write(index int, data [][]byte, ts int64) error {
|
||||||
if s.Conn != nil {
|
if s.Conn == nil {
|
||||||
for _, bytes := range data {
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, bytes := range data {
|
||||||
|
if s.cancelCtx != nil {
|
||||||
|
select {
|
||||||
|
case s.pendingSendQueue <- bytes:
|
||||||
|
break
|
||||||
|
default:
|
||||||
|
return transport.ZeroWindowSizeError{}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
_, err := s.Conn.Write(bytes)
|
_, err := s.Conn.Write(bytes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@@ -154,6 +154,7 @@ type PublishSource struct {
|
|||||||
|
|
||||||
TransStreams map[TransStreamID]TransStream // 所有的输出流, 持有Sink
|
TransStreams map[TransStreamID]TransStream // 所有的输出流, 持有Sink
|
||||||
sinks map[SinkID]Sink // 保存所有Sink
|
sinks map[SinkID]Sink // 保存所有Sink
|
||||||
|
slowSinks map[SinkID]Sink // 因推流慢被挂起的sink队列
|
||||||
TransStreamSinks map[TransStreamID]map[SinkID]Sink // 输出流对应的Sink
|
TransStreamSinks map[TransStreamID]map[SinkID]Sink // 输出流对应的Sink
|
||||||
streamEndInfo *StreamEndInfo // 之前推流源信息
|
streamEndInfo *StreamEndInfo // 之前推流源信息
|
||||||
accumulateTimestamps bool // 是否累加时间戳
|
accumulateTimestamps bool // 是否累加时间戳
|
||||||
@@ -216,6 +217,7 @@ func (s *PublishSource) Init(receiveQueueSize int) {
|
|||||||
|
|
||||||
s.TransStreams = make(map[TransStreamID]TransStream, 10)
|
s.TransStreams = make(map[TransStreamID]TransStream, 10)
|
||||||
s.sinks = make(map[SinkID]Sink, 128)
|
s.sinks = make(map[SinkID]Sink, 128)
|
||||||
|
s.slowSinks = make(map[SinkID]Sink, 12)
|
||||||
s.TransStreamSinks = make(map[TransStreamID]map[SinkID]Sink, len(transStreamFactories)+1)
|
s.TransStreamSinks = make(map[TransStreamID]map[SinkID]Sink, len(transStreamFactories)+1)
|
||||||
s.statistics = NewBitrateStatistics()
|
s.statistics = NewBitrateStatistics()
|
||||||
}
|
}
|
||||||
@@ -375,8 +377,33 @@ func (s *PublishSource) write(sink Sink, index int, data [][]byte, timestamp int
|
|||||||
// 直接关闭连接. 当然也可以将sink先挂起, 后续再继续推流.
|
// 直接关闭连接. 当然也可以将sink先挂起, 后续再继续推流.
|
||||||
_, ok := err.(*transport.ZeroWindowSizeError)
|
_, ok := err.(*transport.ZeroWindowSizeError)
|
||||||
if ok {
|
if ok {
|
||||||
log.Sugar.Errorf("向sink推流超时,关闭连接. sink: %s", sink.GetID())
|
if s.existVideo {
|
||||||
go sink.Close()
|
log.Sugar.Errorf("向sink推流超时,挂起%s-sink: %s source: %s", sink.GetProtocol().String(), sink.GetID(), s.ID)
|
||||||
|
// 等待下个关键帧恢复推流
|
||||||
|
s.PauseStreaming(sink)
|
||||||
|
} else {
|
||||||
|
log.Sugar.Errorf("向sink推流超时,关闭连接. %s-sink: %s source: %s", sink.GetProtocol().String(), sink.GetID(), s.ID)
|
||||||
|
go sink.Close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *PublishSource) PauseStreaming(sink Sink) {
|
||||||
|
s.cleanupSinkStreaming(sink)
|
||||||
|
s.slowSinks[sink.GetID()] = sink
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *PublishSource) ResumeStreaming() {
|
||||||
|
for id, sink := range s.sinks {
|
||||||
|
if !sink.IsExited() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
delete(s.slowSinks, id)
|
||||||
|
ok := s.doAddSink(sink)
|
||||||
|
if ok {
|
||||||
|
go sink.Close()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -474,9 +501,11 @@ func (s *PublishSource) doAddSink(sink Sink) bool {
|
|||||||
s.TransStreamSinks[transStreamId][sink.GetID()] = sink
|
s.TransStreamSinks[transStreamId][sink.GetID()] = sink
|
||||||
|
|
||||||
// TCP拉流开启异步发包, 一旦出现网络不好的链路, 其余正常链路不受影响.
|
// TCP拉流开启异步发包, 一旦出现网络不好的链路, 其余正常链路不受影响.
|
||||||
conn, ok := sink.GetConn().(*transport.Conn)
|
_, ok := sink.GetConn().(*transport.Conn)
|
||||||
if ok && sink.IsTCPStreaming() && transStream.OutStreamBufferCapacity() > 2 {
|
if ok && sink.IsTCPStreaming() && transStream.OutStreamBufferCapacity() > 2 {
|
||||||
conn.EnableAsyncWriteMode(transStream.OutStreamBufferCapacity() - 2)
|
length := transStream.OutStreamBufferCapacity() - 2
|
||||||
|
fmt.Printf("发送缓冲区容量: %d\r\n", length)
|
||||||
|
sink.EnableAsyncWriteMode(length)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 发送已有的缓存数据
|
// 发送已有的缓存数据
|
||||||
@@ -550,22 +579,27 @@ func (s *PublishSource) FindSink(id SinkID) Sink {
|
|||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *PublishSource) doRemoveSink(sink Sink) bool {
|
func (s *PublishSource) cleanupSinkStreaming(sink Sink) {
|
||||||
transStreamSinks := s.TransStreamSinks[sink.GetTransStreamID()]
|
transStreamSinks := s.TransStreamSinks[sink.GetTransStreamID()]
|
||||||
delete(s.sinks, sink.GetID())
|
|
||||||
delete(transStreamSinks, sink.GetID())
|
delete(transStreamSinks, sink.GetID())
|
||||||
|
|
||||||
s.sinkCount--
|
|
||||||
s.lastStreamEndTime = time.Now()
|
s.lastStreamEndTime = time.Now()
|
||||||
|
|
||||||
log.Sugar.Infof("sink count: %d source: %s", s.sinkCount, s.ID)
|
|
||||||
|
|
||||||
if sink.GetProtocol() == TransStreamHls {
|
if sink.GetProtocol() == TransStreamHls {
|
||||||
// 从HLS拉流队列删除Sink
|
// 从HLS拉流队列删除Sink
|
||||||
SinkManager.Remove(sink.GetID())
|
SinkManager.Remove(sink.GetID())
|
||||||
}
|
}
|
||||||
|
|
||||||
sink.StopStreaming(s.TransStreams[sink.GetTransStreamID()])
|
sink.StopStreaming(s.TransStreams[sink.GetTransStreamID()])
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *PublishSource) doRemoveSink(sink Sink) bool {
|
||||||
|
s.cleanupSinkStreaming(sink)
|
||||||
|
delete(s.sinks, sink.GetID())
|
||||||
|
delete(s.slowSinks, sink.GetID())
|
||||||
|
|
||||||
|
s.sinkCount--
|
||||||
|
log.Sugar.Infof("sink count: %d source: %s", s.sinkCount, s.ID)
|
||||||
|
|
||||||
HookPlayDoneEvent(sink)
|
HookPlayDoneEvent(sink)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
@@ -848,6 +882,11 @@ func (s *PublishSource) OnDeMuxPacket(packet utils.AVPacket) {
|
|||||||
s.gopBuffer.AddPacket(packet)
|
s.gopBuffer.AddPacket(packet)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 遇到关键帧, 恢复推流
|
||||||
|
if utils.AVMediaTypeVideo == packet.MediaType() && packet.KeyFrame() && len(s.slowSinks) > 0 {
|
||||||
|
s.ResumeStreaming()
|
||||||
|
}
|
||||||
|
|
||||||
// track解析完毕后,才能生成传输流
|
// track解析完毕后,才能生成传输流
|
||||||
if s.completed {
|
if s.completed {
|
||||||
s.CorrectTimestamp(packet)
|
s.CorrectTimestamp(packet)
|
||||||
|
Reference in New Issue
Block a user