完善sink释放

This commit is contained in:
yangjiechina
2024-04-23 14:21:53 +08:00
parent ccb2afbe23
commit 43fd14b219
6 changed files with 146 additions and 52 deletions

1
api.go
View File

@@ -188,6 +188,7 @@ func (api *ApiServer) onFLV(sourceId string, w http.ResponseWriter, r *http.Requ
for { for {
if _, err := conn.Read(bytes); err != nil { if _, err := conn.Read(bytes); err != nil {
log.Sugar.Infof("http-flv 断开连接 sink:%s", sink.PrintInfo()) log.Sugar.Infof("http-flv 断开连接 sink:%s", sink.PrintInfo())
sink.Close()
break break
} }
} }

View File

@@ -3,6 +3,7 @@ package rtmp
import ( import (
"github.com/yangjiechina/avformat/libflv" "github.com/yangjiechina/avformat/libflv"
"github.com/yangjiechina/avformat/librtmp" "github.com/yangjiechina/avformat/librtmp"
"github.com/yangjiechina/avformat/transport"
"github.com/yangjiechina/avformat/utils" "github.com/yangjiechina/avformat/utils"
"github.com/yangjiechina/live-server/stream" "github.com/yangjiechina/live-server/stream"
"net" "net"
@@ -43,7 +44,8 @@ func NewPublisher(sourceId string, stack *librtmp.Stack, conn net.Conn) Publishe
//设置回调从flv解析出来的Stream和AVPacket都将统一回调到stream.SourceImpl //设置回调从flv解析出来的Stream和AVPacket都将统一回调到stream.SourceImpl
deMuxer.SetHandler(publisher_) deMuxer.SetHandler(publisher_)
publisher_.Input_ = publisher_.Input publisher_.Input_ = publisher_.Input
//为推流方分配足够多的缓冲区
conn.(*transport.Conn).ReallocateRecvBuffer(1024 * 1024)
return publisher_ return publisher_
} }

View File

@@ -65,6 +65,8 @@ func (s *serverImpl) OnDisConnected(conn net.Conn, err error) {
log.Sugar.Debugf("rtmp断开连接 conn:%s", conn.RemoteAddr().String()) log.Sugar.Debugf("rtmp断开连接 conn:%s", conn.RemoteAddr().String())
t := conn.(*transport.Conn) t := conn.(*transport.Conn)
if t.Data != nil {
t.Data.(*sessionImpl).Close() t.Data.(*sessionImpl).Close()
t.Data = nil t.Data = nil
} }
}

View File

@@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"github.com/yangjiechina/avformat/utils" "github.com/yangjiechina/avformat/utils"
"net" "net"
"sync"
) )
type SinkId interface{} type SinkId interface{}
@@ -26,9 +27,11 @@ type ISink interface {
ProtocolStr() string ProtocolStr() string
// State 获取Sink状态, 调用前外部必须手动加锁
State() SessionState State() SessionState
SetState(state SessionState) bool // SetState 设置Sink状态, 调用前外部必须手动加锁
SetState(state SessionState)
EnableVideo() bool EnableVideo() bool
@@ -45,6 +48,13 @@ type ISink interface {
Close() Close()
PrintInfo() string PrintInfo() string
// Lock Sink请求拉流->Source推流->Sink断开整个阶段, 是无锁线程安全
//如果Sink在等待队列-Sink断开, 这个过程是非线程安全的
//所以Source在AddSink时, SessionStateWait状态时, 需要加锁保护.
Lock()
UnLock()
} }
// GenerateSinkId 根据网络地址生成SinkId IPV4使用一个uint64, IPV6使用String // GenerateSinkId 根据网络地址生成SinkId IPV4使用一个uint64, IPV6使用String
@@ -77,10 +87,7 @@ type SinkImpl struct {
TransStreamId_ TransStreamId TransStreamId_ TransStreamId
disableVideo bool disableVideo bool
//Sink在请求拉流->Source推流->Sink断开整个阶段 是无锁线程安全 lock sync.RWMutex
//如果Sink在等待队列-Sink断开这个过程是非线程安全的
//SetState的时候如果closed为true返回false, 调用者自行删除sink
//closed atomic.Bool
//HasSentKeyVideo 是否已经发送视频关键帧 //HasSentKeyVideo 是否已经发送视频关键帧
//未开启GOP缓存的情况下为避免播放花屏发送的首个视频帧必须为关键帧 //未开启GOP缓存的情况下为避免播放花屏发送的首个视频帧必须为关键帧
@@ -130,30 +137,26 @@ func (s *SinkImpl) ProtocolStr() string {
return streamTypeToStr(s.Protocol_) return streamTypeToStr(s.Protocol_)
} }
func (s *SinkImpl) Lock() {
s.lock.Lock()
}
func (s *SinkImpl) UnLock() {
s.lock.Unlock()
}
func (s *SinkImpl) State() SessionState { func (s *SinkImpl) State() SessionState {
utils.Assert(!s.lock.TryLock())
return s.State_ return s.State_
} }
func (s *SinkImpl) SetState(state SessionState) bool { func (s *SinkImpl) SetState(state SessionState) {
//load := s.closed.Load() utils.Assert(!s.lock.TryLock())
//if load {
// return false
//}
if s.State_ < SessionStateClose {
s.State_ = state s.State_ = state
} }
//更改状态期间被Close
//if s.closed.CompareAndSwap(false, false)
//{
//
//}
//return !s.closed.Load()
return true
}
func (s *SinkImpl) EnableVideo() bool { func (s *SinkImpl) EnableVideo() bool {
return !s.disableVideo return !s.disableVideo
} }
@@ -170,18 +173,42 @@ func (s *SinkImpl) DesiredVideoCodecId() utils.AVCodecID {
return s.DesiredVideoCodecId_ return s.DesiredVideoCodecId_
} }
// Close 做如下事情:
// 1. Sink如果正在拉流,删除任务交给Source处理. 否则直接从等待队列删除Sink.
// 2. 发送PlayDoneHook事件
// 什么时候调用Close? 是否考虑线程安全?
// 拉流断开连接,不需要考虑线程安全
// 踢流走source管道删除,并且关闭Conn
func (s *SinkImpl) Close() { func (s *SinkImpl) Close() {
//Source的TransStream中删除sink utils.Assert(SessionStateClose != s.State_)
if s.Conn != nil {
s.Conn.Close()
s.Conn = nil
}
//还没有添加到任何队列, 不做任何处理
if s.State_ < SessionStateWait {
return
}
{
s.Lock()
defer s.UnLock()
if s.State_ == SessionStateClose {
return
}
s.State_ = SessionStateClose
}
if s.State_ == SessionStateTransferring { if s.State_ == SessionStateTransferring {
source := SourceManager.Find(s.SourceId_) source := SourceManager.Find(s.SourceId_)
source.AddEvent(SourceEventPlayDone, s) source.AddEvent(SourceEventPlayDone, s)
s.State_ = SessionStateClose
} else if s.State_ == SessionStateWait { } else if s.State_ == SessionStateWait {
//非线程安全
//从等待队列中删除sink
RemoveSinkFromWaitingQueue(s.SourceId_, s.Id_) RemoveSinkFromWaitingQueue(s.SourceId_, s.Id_)
s.State_ = SessionStateClose //拉流结束事件, 在等待队列直接发送通知, 在拉流由Source负责发送.
//s.closed.Store(true) HookPlayingDone(s, nil, nil)
} }
} }

View File

@@ -12,8 +12,17 @@ func HookPlaying(s ISink, success func(), failure func(state utils.HookState)) {
if source == nil { if source == nil {
log.Sugar.Infof("添加sink到等待队列 sink:%s-%v source:%s", s.ProtocolStr(), s.Id(), s.SourceId()) log.Sugar.Infof("添加sink到等待队列 sink:%s-%v source:%s", s.ProtocolStr(), s.Id(), s.SourceId())
{
s.Lock()
defer s.UnLock()
if SessionStateClose == s.State() {
log.Sugar.Warnf("添加到sink到等待队列失败, sink已经断开链接 %s", s.PrintInfo())
} else {
s.SetState(SessionStateWait) s.SetState(SessionStateWait)
AddSinkToWaitingQueue(s.SourceId(), s) AddSinkToWaitingQueue(s.SourceId(), s)
}
}
} else { } else {
log.Sugar.Debugf("发送播放事件 sink:%s-%v source:%s", s.ProtocolStr(), s.Id(), s.SourceId()) log.Sugar.Debugf("发送播放事件 sink:%s-%v source:%s", s.ProtocolStr(), s.Id(), s.SourceId())
@@ -23,23 +32,63 @@ func HookPlaying(s ISink, success func(), failure func(state utils.HookState)) {
if !AppConfig.Hook.EnableOnPlay() { if !AppConfig.Hook.EnableOnPlay() {
f() f()
if success != nil {
success() success()
}
return return
} }
err := hookEvent(HookEventPlay, NewPlayHookEventInfo(s.SourceId(), "", s.Protocol()), func(response *http.Response) { err := hookEvent(HookEventPlay, NewPlayHookEventInfo(s.SourceId(), "", s.Protocol()), func(response *http.Response) {
f() f()
if success != nil {
success() success()
}
}, func(response *http.Response, err error) { }, func(response *http.Response, err error) {
log.Sugar.Errorf("Hook播放事件响应失败 err:%s sink:%s-%v source:%s", err.Error(), s.ProtocolStr(), s.Id(), s.SourceId()) log.Sugar.Errorf("Hook播放事件响应失败 err:%s sink:%s-%v source:%s", err.Error(), s.ProtocolStr(), s.Id(), s.SourceId())
if failure != nil {
failure(utils.HookStateFailure) failure(utils.HookStateFailure)
}
}) })
if err != nil { if err != nil {
log.Sugar.Errorf("Hook播放事件发送失败 err:%s sink:%s-%v source:%s", err.Error(), s.ProtocolStr(), s.Id(), s.SourceId()) log.Sugar.Errorf("Hook播放事件发送失败 err:%s sink:%s-%v source:%s", err.Error(), s.ProtocolStr(), s.Id(), s.SourceId())
if failure != nil {
failure(utils.HookStateFailure) failure(utils.HookStateFailure)
}
return
}
}
func HookPlayingDone(s ISink, success func(), failure func(state utils.HookState)) {
if !AppConfig.Hook.EnableOnPlayDone() {
if success != nil {
success()
}
return
}
err := hookEvent(HookEventPlayDone, NewPlayHookEventInfo(s.SourceId(), "", s.Protocol()), func(response *http.Response) {
if success != nil {
success()
}
}, func(response *http.Response, err error) {
log.Sugar.Errorf("Hook播放结束事件响应失败 err:%s sink:%s-%v source:%s", err.Error(), s.ProtocolStr(), s.Id(), s.SourceId())
if failure != nil {
failure(utils.HookStateFailure)
}
})
if err != nil {
log.Sugar.Errorf("Hook播放结束事件发送失败 err:%s sink:%s-%v source:%s", err.Error(), s.ProtocolStr(), s.Id(), s.SourceId())
if failure != nil {
failure(utils.HookStateFailure)
}
return return
} }
} }

View File

@@ -50,13 +50,13 @@ const (
) )
const ( const (
SessionStateCreate = SessionState(1) SessionStateCreate = SessionState(1) //新建状态
SessionStateHandshaking = SessionState(2) SessionStateHandshaking = SessionState(2) //握手中
SessionStateHandshakeFailure = SessionState(3) SessionStateHandshakeFailure = SessionState(3) //握手失败
SessionStateHandshakeDone = SessionState(4) SessionStateHandshakeDone = SessionState(4) //握手完成
SessionStateWait = SessionState(5) SessionStateWait = SessionState(5) //位于等待队列中
SessionStateTransferring = SessionState(6) SessionStateTransferring = SessionState(6) //推拉流中
SessionStateClose = SessionState(7) SessionStateClose = SessionState(7) //关闭状态
) )
func sourceTypeToStr(sourceType SourceType) string { func sourceTypeToStr(sourceType SourceType) string {
@@ -342,14 +342,20 @@ func (s *SourceImpl) AddSink(sink ISink) bool {
} }
sink.SetTransStreamId(transStreamId) sink.SetTransStreamId(transStreamId)
transStream.AddSink(sink)
state := sink.SetState(SessionStateTransferring) {
if !state { sink.Lock()
transStream.RemoveSink(sink.Id()) defer sink.UnLock()
return false
if SessionStateClose == sink.State() {
log.Sugar.Warnf("AddSink失败, sink已经断开链接 %s", sink.PrintInfo())
} else {
transStream.AddSink(sink)
}
sink.SetState(SessionStateTransferring)
} }
//新的传输流,发送缓存的音视频帧
if !ok && AppConfig.GOPCache { if !ok && AppConfig.GOPCache {
s.dispatchStreamBuffer(transStream, streams[:size]) s.dispatchStreamBuffer(transStream, streams[:size])
} }
@@ -364,6 +370,7 @@ func (s *SourceImpl) RemoveSink(sink ISink) bool {
//如果从传输流没能删除sink, 再从等待队列删除 //如果从传输流没能删除sink, 再从等待队列删除
_, b := transStream.RemoveSink(sink.Id()) _, b := transStream.RemoveSink(sink.Id())
if b { if b {
HookPlayingDone(sink, nil, nil)
return true return true
} }
} }
@@ -397,10 +404,16 @@ func (s *SourceImpl) Close() {
for _, transStream := range s.transStreams { for _, transStream := range s.transStreams {
transStream.PopAllSink(func(sink ISink) { transStream.PopAllSink(func(sink ISink) {
sink.SetTransStreamId(0) sink.SetTransStreamId(0)
state := sink.SetState(SessionStateWait) {
if state { sink.Lock()
defer sink.UnLock()
if SessionStateClose == sink.State() {
log.Sugar.Warnf("添加到sink到等待队列失败, sink已经断开链接 %s", sink.PrintInfo())
} else {
AddSinkToWaitingQueue(s.Id_, sink) AddSinkToWaitingQueue(s.Id_, sink)
} }
}
}) })
} }