diff --git a/api.go b/api.go index 59ebb09..90ac493 100644 --- a/api.go +++ b/api.go @@ -188,6 +188,7 @@ func (api *ApiServer) onFLV(sourceId string, w http.ResponseWriter, r *http.Requ for { if _, err := conn.Read(bytes); err != nil { log.Sugar.Infof("http-flv 断开连接 sink:%s", sink.PrintInfo()) + sink.Close() break } } diff --git a/rtmp/rtmp_publisher.go b/rtmp/rtmp_publisher.go index 56c93dd..d78cb95 100644 --- a/rtmp/rtmp_publisher.go +++ b/rtmp/rtmp_publisher.go @@ -3,6 +3,7 @@ package rtmp import ( "github.com/yangjiechina/avformat/libflv" "github.com/yangjiechina/avformat/librtmp" + "github.com/yangjiechina/avformat/transport" "github.com/yangjiechina/avformat/utils" "github.com/yangjiechina/live-server/stream" "net" @@ -43,7 +44,8 @@ func NewPublisher(sourceId string, stack *librtmp.Stack, conn net.Conn) Publishe //设置回调,从flv解析出来的Stream和AVPacket都将统一回调到stream.SourceImpl deMuxer.SetHandler(publisher_) publisher_.Input_ = publisher_.Input - + //为推流方分配足够多的缓冲区 + conn.(*transport.Conn).ReallocateRecvBuffer(1024 * 1024) return publisher_ } diff --git a/rtmp/rtmp_server.go b/rtmp/rtmp_server.go index 7de1525..96f0c81 100644 --- a/rtmp/rtmp_server.go +++ b/rtmp/rtmp_server.go @@ -65,6 +65,8 @@ func (s *serverImpl) OnDisConnected(conn net.Conn, err error) { log.Sugar.Debugf("rtmp断开连接 conn:%s", conn.RemoteAddr().String()) t := conn.(*transport.Conn) - t.Data.(*sessionImpl).Close() - t.Data = nil + if t.Data != nil { + t.Data.(*sessionImpl).Close() + t.Data = nil + } } diff --git a/stream/sink.go b/stream/sink.go index a3be9e2..434fe55 100644 --- a/stream/sink.go +++ b/stream/sink.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/yangjiechina/avformat/utils" "net" + "sync" ) type SinkId interface{} @@ -26,9 +27,11 @@ type ISink interface { ProtocolStr() string + // State 获取Sink状态, 调用前外部必须手动加锁 State() SessionState - SetState(state SessionState) bool + // SetState 设置Sink状态, 调用前外部必须手动加锁 + SetState(state SessionState) EnableVideo() bool @@ -45,6 +48,13 @@ type ISink interface { Close() PrintInfo() string + + // Lock Sink请求拉流->Source推流->Sink断开整个阶段, 是无锁线程安全 + //如果Sink在等待队列-Sink断开, 这个过程是非线程安全的 + //所以Source在AddSink时, SessionStateWait状态时, 需要加锁保护. + Lock() + + UnLock() } // GenerateSinkId 根据网络地址生成SinkId IPV4使用一个uint64, IPV6使用String @@ -77,10 +87,7 @@ type SinkImpl struct { TransStreamId_ TransStreamId disableVideo bool - //Sink在请求拉流->Source推流->Sink断开整个阶段 是无锁线程安全 - //如果Sink在等待队列-Sink断开,这个过程是非线程安全的 - //SetState的时候,如果closed为true,返回false, 调用者自行删除sink - //closed atomic.Bool + lock sync.RWMutex //HasSentKeyVideo 是否已经发送视频关键帧 //未开启GOP缓存的情况下,为避免播放花屏,发送的首个视频帧必须为关键帧 @@ -130,28 +137,24 @@ func (s *SinkImpl) ProtocolStr() string { return streamTypeToStr(s.Protocol_) } +func (s *SinkImpl) Lock() { + s.lock.Lock() +} + +func (s *SinkImpl) UnLock() { + s.lock.Unlock() +} + func (s *SinkImpl) State() SessionState { + utils.Assert(!s.lock.TryLock()) + return s.State_ } -func (s *SinkImpl) SetState(state SessionState) bool { - //load := s.closed.Load() - //if load { - // return false - //} +func (s *SinkImpl) SetState(state SessionState) { + utils.Assert(!s.lock.TryLock()) - if s.State_ < SessionStateClose { - s.State_ = state - } - - //更改状态期间,被Close - //if s.closed.CompareAndSwap(false, false) - //{ - // - //} - - //return !s.closed.Load() - return true + s.State_ = state } func (s *SinkImpl) EnableVideo() bool { @@ -170,18 +173,42 @@ func (s *SinkImpl) DesiredVideoCodecId() utils.AVCodecID { return s.DesiredVideoCodecId_ } +// Close 做如下事情: +// 1. Sink如果正在拉流,删除任务交给Source处理. 否则直接从等待队列删除Sink. +// 2. 发送PlayDoneHook事件 +// 什么时候调用Close? 是否考虑线程安全? +// 拉流断开连接,不需要考虑线程安全 +// 踢流走source管道删除,并且关闭Conn func (s *SinkImpl) Close() { - //Source的TransStream中删除sink + utils.Assert(SessionStateClose != s.State_) + + if s.Conn != nil { + s.Conn.Close() + s.Conn = nil + } + + //还没有添加到任何队列, 不做任何处理 + if s.State_ < SessionStateWait { + return + } + + { + s.Lock() + defer s.UnLock() + if s.State_ == SessionStateClose { + return + } + + s.State_ = SessionStateClose + } + if s.State_ == SessionStateTransferring { source := SourceManager.Find(s.SourceId_) source.AddEvent(SourceEventPlayDone, s) - s.State_ = SessionStateClose } else if s.State_ == SessionStateWait { - //非线程安全 - //从等待队列中删除sink RemoveSinkFromWaitingQueue(s.SourceId_, s.Id_) - s.State_ = SessionStateClose - //s.closed.Store(true) + //拉流结束事件, 在等待队列直接发送通知, 在拉流由Source负责发送. + HookPlayingDone(s, nil, nil) } } diff --git a/stream/sink_hook.go b/stream/sink_hook.go index d95008c..5ee2870 100644 --- a/stream/sink_hook.go +++ b/stream/sink_hook.go @@ -12,8 +12,17 @@ func HookPlaying(s ISink, success func(), failure func(state utils.HookState)) { if source == nil { log.Sugar.Infof("添加sink到等待队列 sink:%s-%v source:%s", s.ProtocolStr(), s.Id(), s.SourceId()) - s.SetState(SessionStateWait) - AddSinkToWaitingQueue(s.SourceId(), s) + { + s.Lock() + defer s.UnLock() + + if SessionStateClose == s.State() { + log.Sugar.Warnf("添加到sink到等待队列失败, sink已经断开链接 %s", s.PrintInfo()) + } else { + s.SetState(SessionStateWait) + AddSinkToWaitingQueue(s.SourceId(), s) + } + } } else { log.Sugar.Debugf("发送播放事件 sink:%s-%v source:%s", s.ProtocolStr(), s.Id(), s.SourceId()) @@ -23,23 +32,63 @@ func HookPlaying(s ISink, success func(), failure func(state utils.HookState)) { if !AppConfig.Hook.EnableOnPlay() { f() - success() + + if success != nil { + success() + } return } err := hookEvent(HookEventPlay, NewPlayHookEventInfo(s.SourceId(), "", s.Protocol()), func(response *http.Response) { f() - success() + + if success != nil { + success() + } }, func(response *http.Response, err error) { log.Sugar.Errorf("Hook播放事件响应失败 err:%s sink:%s-%v source:%s", err.Error(), s.ProtocolStr(), s.Id(), s.SourceId()) - failure(utils.HookStateFailure) + if failure != nil { + failure(utils.HookStateFailure) + } }) if err != nil { log.Sugar.Errorf("Hook播放事件发送失败 err:%s sink:%s-%v source:%s", err.Error(), s.ProtocolStr(), s.Id(), s.SourceId()) - failure(utils.HookStateFailure) + if failure != nil { + failure(utils.HookStateFailure) + } + return + } +} + +func HookPlayingDone(s ISink, success func(), failure func(state utils.HookState)) { + if !AppConfig.Hook.EnableOnPlayDone() { + if success != nil { + success() + } + return + } + + err := hookEvent(HookEventPlayDone, NewPlayHookEventInfo(s.SourceId(), "", s.Protocol()), func(response *http.Response) { + if success != nil { + success() + } + }, func(response *http.Response, err error) { + log.Sugar.Errorf("Hook播放结束事件响应失败 err:%s sink:%s-%v source:%s", err.Error(), s.ProtocolStr(), s.Id(), s.SourceId()) + + if failure != nil { + failure(utils.HookStateFailure) + } + }) + + if err != nil { + log.Sugar.Errorf("Hook播放结束事件发送失败 err:%s sink:%s-%v source:%s", err.Error(), s.ProtocolStr(), s.Id(), s.SourceId()) + + if failure != nil { + failure(utils.HookStateFailure) + } return } } diff --git a/stream/source.go b/stream/source.go index 39b6290..42519ba 100644 --- a/stream/source.go +++ b/stream/source.go @@ -50,13 +50,13 @@ const ( ) const ( - SessionStateCreate = SessionState(1) - SessionStateHandshaking = SessionState(2) - SessionStateHandshakeFailure = SessionState(3) - SessionStateHandshakeDone = SessionState(4) - SessionStateWait = SessionState(5) - SessionStateTransferring = SessionState(6) - SessionStateClose = SessionState(7) + SessionStateCreate = SessionState(1) //新建状态 + SessionStateHandshaking = SessionState(2) //握手中 + SessionStateHandshakeFailure = SessionState(3) //握手失败 + SessionStateHandshakeDone = SessionState(4) //握手完成 + SessionStateWait = SessionState(5) //位于等待队列中 + SessionStateTransferring = SessionState(6) //推拉流中 + SessionStateClose = SessionState(7) //关闭状态 ) func sourceTypeToStr(sourceType SourceType) string { @@ -342,14 +342,20 @@ func (s *SourceImpl) AddSink(sink ISink) bool { } sink.SetTransStreamId(transStreamId) - transStream.AddSink(sink) - state := sink.SetState(SessionStateTransferring) - if !state { - transStream.RemoveSink(sink.Id()) - return false + { + sink.Lock() + defer sink.UnLock() + + if SessionStateClose == sink.State() { + log.Sugar.Warnf("AddSink失败, sink已经断开链接 %s", sink.PrintInfo()) + } else { + transStream.AddSink(sink) + } + sink.SetState(SessionStateTransferring) } + //新的传输流,发送缓存的音视频帧 if !ok && AppConfig.GOPCache { s.dispatchStreamBuffer(transStream, streams[:size]) } @@ -364,6 +370,7 @@ func (s *SourceImpl) RemoveSink(sink ISink) bool { //如果从传输流没能删除sink, 再从等待队列删除 _, b := transStream.RemoveSink(sink.Id()) if b { + HookPlayingDone(sink, nil, nil) return true } } @@ -397,9 +404,15 @@ func (s *SourceImpl) Close() { for _, transStream := range s.transStreams { transStream.PopAllSink(func(sink ISink) { sink.SetTransStreamId(0) - state := sink.SetState(SessionStateWait) - if state { - AddSinkToWaitingQueue(s.Id_, sink) + { + sink.Lock() + defer sink.UnLock() + + if SessionStateClose == sink.State() { + log.Sugar.Warnf("添加到sink到等待队列失败, sink已经断开链接 %s", sink.PrintInfo()) + } else { + AddSinkToWaitingQueue(s.Id_, sink) + } } }) }