diff --git a/events.go b/events.go index 3eb92ef..adecb06 100644 --- a/events.go +++ b/events.go @@ -53,6 +53,10 @@ type SEpublish struct { StateEvent } +type SEtrackAvaliable struct { + StateEvent +} + type SErepublish struct { StateEvent } diff --git a/publisher.go b/publisher.go index 1044425..b41e655 100644 --- a/publisher.go +++ b/publisher.go @@ -37,7 +37,7 @@ func (p *Publisher) GetPublisher() *Publisher { func (p *Publisher) Stop(reason ...zapcore.Field) { p.IO.Stop(reason...) - p.Stream.Receive(ACTION_PUBLISHLOST) + p.Stream.Receive(ACTION_PUBLISHCLOSE) } func (p *Publisher) getAudioTrack() common.AudioTrack { diff --git a/puller.go b/puller.go index fbd8fc7..a70e2f0 100644 --- a/puller.go +++ b/puller.go @@ -16,6 +16,7 @@ var znomorereconnect = zap.String("reason", "no more reconnect") type IPuller interface { IPublisher Connect() error + OnConnected() Disconnect() Pull() error Reconnect() bool @@ -28,6 +29,10 @@ type Puller struct { ClientIO[config.Pull] } +func (pub *Puller) OnConnected() { + pub.ReConnectCount = 0 // 重置重连次数 +} + // 是否需要重连 func (pub *Puller) Reconnect() (ok bool) { ok = pub.Config.RePull == -1 || pub.ReConnectCount <= pub.Config.RePull diff --git a/stream.go b/stream.go index 74ab0cc..a9f76ae 100644 --- a/stream.go +++ b/stream.go @@ -30,56 +30,70 @@ func (s StreamAction) String() string { // 四状态机 const ( STATE_WAITPUBLISH StreamState = iota // 等待发布者状态 + STATE_WAITTRACK // 等待音视频轨道激活 STATE_PUBLISHING // 正在发布流状态 STATE_WAITCLOSE // 等待关闭状态(自动关闭延时开启) STATE_CLOSED // 流已关闭,不可使用 ) const ( - ACTION_PUBLISH StreamAction = iota - ACTION_TIMEOUT // 发布流长时间没有数据/长时间没有发布者发布流/等待关闭时间到 - ACTION_PUBLISHLOST // 发布者意外断开 - ACTION_CLOSE // 主动关闭流 - ACTION_LASTLEAVE // 最后一个订阅者离开 - ACTION_FIRSTENTER // 第一个订阅者进入 + ACTION_PUBLISH StreamAction = iota + ACTION_TRACKAVAILABLE // 音视频轨道激活 + ACTION_TIMEOUT // 发布流长时间没有数据/长时间没有发布者发布流/等待关闭时间到 + ACTION_PUBLISHCLOSE // 发布者关闭 + ACTION_CLOSE // 主动关闭流 + ACTION_LASTLEAVE // 最后一个订阅者离开 + ACTION_FIRSTENTER // 第一个订阅者进入 + ACTION_NOTRACK // 没有音视频轨道 ) -var StateNames = [...]string{"⌛", "🟢", "🟡", "🔴"} -var ActionNames = [...]string{"publish", "timeout", "publish lost", "close", "last leave", "first enter", "no tracks"} +var StateNames = [...]string{"⌛", "🟡", "🟢", "🟠", "🔴"} +var ActionNames = [...]string{"publish", "track available", "timeout", "publish close", "close", "last leave", "first enter", "no tracks"} /* stateDiagram-v2 [*] --> ⌛等待发布者 : 创建 - ⌛等待发布者 --> 🟢正在发布 :发布 + ⌛等待发布者 --> 🟡等待轨道 :发布 ⌛等待发布者 --> 🔴已关闭 :关闭 ⌛等待发布者 --> 🔴已关闭 :超时 ⌛等待发布者 --> 🔴已关闭 :最后订阅者离开 + 🟡等待轨道 --> 🟢正在发布 :轨道激活 + 🟡等待轨道 --> 🔴已关闭 :关闭 + 🟡等待轨道 --> 🔴已关闭 :超时 + 🟡等待轨道 --> 🔴已关闭 :最后订阅者离开 🟢正在发布 --> ⌛等待发布者: 发布者断开 - 🟢正在发布 --> 🟡等待关闭: 最后订阅者离开 + 🟢正在发布 --> 🟠等待关闭: 最后订阅者离开 🟢正在发布 --> 🔴已关闭 :关闭 - 🟡等待关闭 --> 🟢正在发布 :第一个订阅者进入 - 🟡等待关闭 --> 🔴已关闭 :关闭 - 🟡等待关闭 --> 🔴已关闭 :超时 - 🟡等待关闭 --> 🔴已关闭 :发布者断开 + 🟠等待关闭 --> 🟢正在发布 :第一个订阅者进入 + 🟠等待关闭 --> 🔴已关闭 :关闭 + 🟠等待关闭 --> 🔴已关闭 :超时 + 🟠等待关闭 --> 🔴已关闭 :发布者断开 */ var StreamFSM = [len(StateNames)]map[StreamAction]StreamState{ { - ACTION_PUBLISH: STATE_PUBLISHING, + ACTION_PUBLISH: STATE_WAITTRACK, ACTION_TIMEOUT: STATE_CLOSED, ACTION_LASTLEAVE: STATE_CLOSED, ACTION_CLOSE: STATE_CLOSED, }, { - ACTION_PUBLISHLOST: STATE_WAITPUBLISH, - ACTION_LASTLEAVE: STATE_WAITCLOSE, - ACTION_CLOSE: STATE_CLOSED, + ACTION_TRACKAVAILABLE: STATE_PUBLISHING, + ACTION_TIMEOUT: STATE_CLOSED, + ACTION_LASTLEAVE: STATE_CLOSED, + ACTION_CLOSE: STATE_CLOSED, }, { - ACTION_PUBLISHLOST: STATE_CLOSED, - ACTION_TIMEOUT: STATE_CLOSED, - ACTION_FIRSTENTER: STATE_PUBLISHING, - ACTION_CLOSE: STATE_CLOSED, + ACTION_PUBLISHCLOSE: STATE_WAITPUBLISH, + ACTION_TIMEOUT: STATE_WAITPUBLISH, + ACTION_LASTLEAVE: STATE_WAITCLOSE, + ACTION_CLOSE: STATE_CLOSED, + }, + { + ACTION_PUBLISHCLOSE: STATE_CLOSED, + ACTION_TIMEOUT: STATE_CLOSED, + ACTION_FIRSTENTER: STATE_PUBLISHING, + ACTION_CLOSE: STATE_CLOSED, }, {}, } @@ -305,18 +319,20 @@ func (r *Stream) action(action StreamAction) (ok bool) { } r.timeout.Reset(waitTime) r.Debug("wait publisher", zap.Duration("wait timeout", waitTime)) - case STATE_PUBLISHING: + case STATE_WAITTRACK: if len(r.SEHistory) > 1 { stateEvent = SErepublish{event} } else { stateEvent = SEpublish{event} } + r.timeout.Reset(time.Second * 5) // 5秒心跳,检测track的存活度 + case STATE_PUBLISHING: + stateEvent = SEtrackAvaliable{event} r.Subscribers.Broadcast(stateEvent) - // if r.IdleTimeout > 0 && r.Subscribers.Len() == 0 { - // return r.action(ACTION_LASTLEAVE) - // } else { - r.timeout.Reset(r.PublishTimeout) // 5秒心跳,检测track的存活度 - // } + if puller, ok := r.Publisher.(IPuller); ok { + puller.OnConnected() + } + r.timeout.Reset(r.PublishTimeout) // 检测track的存活度 case STATE_WAITCLOSE: stateEvent = SEwaitClose{event} if r.IdleTimeout > 0 { @@ -414,7 +430,7 @@ func (s *Stream) run() { case <-s.timeout.C: timeStart = time.Now() timeOutInfo = zap.String("state", s.State.String()) - if s.State == STATE_PUBLISHING { + if s.State == STATE_PUBLISHING || s.State == STATE_WAITTRACK { for sub := range s.Subscribers.internal { if sub.IsClosed() { delete(s.Subscribers.internal, sub) @@ -454,7 +470,7 @@ func (s *Stream) run() { } } if lost { - s.action(ACTION_PUBLISHLOST) + s.action(ACTION_TIMEOUT) continue } if s.IdleTimeout > 0 && s.Subscribers.Len() == 0 && time.Since(s.StartTime) > s.IdleTimeout { @@ -462,6 +478,9 @@ func (s *Stream) run() { continue } } + if s.State == STATE_WAITTRACK { + s.action(ACTION_TRACKAVAILABLE) + } s.timeout.Reset(time.Second * 5) //订阅者等待音视频轨道超时了,放弃等待,订阅成功 s.Subscribers.AbortWait()