diff --git a/stream/hook_source.go b/stream/hook_source.go index b5f9a0a..59b5c85 100644 --- a/stream/hook_source.go +++ b/stream/hook_source.go @@ -32,6 +32,7 @@ func PreparePublishSource(source Source, hook bool) (*http.Response, utils.HookS log.Sugar.Infof("%s准备推流 source:%s 拉流地址:\r\n%s", source.GetType().String(), source.GetID(), indent) + source.SetState(SessionStateTransferring) return response, utils.HookStateOK } diff --git a/stream/source.go b/stream/source.go index d986a97..825d750 100644 --- a/stream/source.go +++ b/stream/source.go @@ -105,8 +105,7 @@ type PublishSource struct { createTime time.Time // source创建时间 statistics *BitrateStatistics // 码流统计 streamLogger avformat.OnUnpackStream2FileHandler - // streamLock sync.RWMutex - streamLock sync.Mutex + streamLock sync.Mutex // 收流、探测超时、关闭等操作互斥锁 timers struct { receiveTimer *time.Timer // 收流超时计时器 @@ -139,7 +138,7 @@ func (s *PublishSource) SetID(id string) { } func (s *PublishSource) Init() { - s.SetState(SessionStateHandshakeSuccess) + s.SetState(SessionStateCreated) s.statistics = NewBitrateStatistics() s.streamPublisher = NewTransStreamPublisher(s.ID) @@ -180,6 +179,7 @@ func (s *PublishSource) SetState(state SessionState) { func (s *PublishSource) DoClose() { log.Sugar.Debugf("closing the %s source. id: %s. closed flag: %t", s.Type, s.ID, s.closed.Load()) + // 已关闭, 直接返回 if s.closed.Load() { return } @@ -189,6 +189,7 @@ func (s *PublishSource) DoClose() { closed = s.closed.Swap(true) }) + // 已关闭, 直接返回 if closed { return } @@ -216,9 +217,16 @@ func (s *PublishSource) DoClose() { s.clearUnusedPackets(track.Packets) } - // 等传输流发布器关闭结束 + // 停止发布输出流 + // 同步执行 s.streamPublisher.close() + // 只释放prepare成功的source, 否则在关闭失败的source时, 造成id相同的source被错误释放 + if s.state < SessionStateTransferring { + return + } + + s.state = SessionStateClosed // 释放解复用器 // 释放转码器 // 释放每路转协议流, 将所有sink添加到等待队列