feat: add a state

This commit is contained in:
langhuihui
2023-09-10 18:28:15 +08:00
parent 7ff64a914e
commit 8f7ff8d270
4 changed files with 59 additions and 31 deletions

View File

@@ -53,6 +53,10 @@ type SEpublish struct {
StateEvent
}
type SEtrackAvaliable struct {
StateEvent
}
type SErepublish struct {
StateEvent
}

View File

@@ -37,7 +37,7 @@ func (p *Publisher) GetPublisher() *Publisher {
func (p *Publisher) Stop(reason ...zapcore.Field) {
p.IO.Stop(reason...)
p.Stream.Receive(ACTION_PUBLISHLOST)
p.Stream.Receive(ACTION_PUBLISHCLOSE)
}
func (p *Publisher) getAudioTrack() common.AudioTrack {

View File

@@ -16,6 +16,7 @@ var znomorereconnect = zap.String("reason", "no more reconnect")
type IPuller interface {
IPublisher
Connect() error
OnConnected()
Disconnect()
Pull() error
Reconnect() bool
@@ -28,6 +29,10 @@ type Puller struct {
ClientIO[config.Pull]
}
func (pub *Puller) OnConnected() {
pub.ReConnectCount = 0 // 重置重连次数
}
// 是否需要重连
func (pub *Puller) Reconnect() (ok bool) {
ok = pub.Config.RePull == -1 || pub.ReConnectCount <= pub.Config.RePull

View File

@@ -30,56 +30,70 @@ func (s StreamAction) String() string {
// 四状态机
const (
STATE_WAITPUBLISH StreamState = iota // 等待发布者状态
STATE_WAITTRACK // 等待音视频轨道激活
STATE_PUBLISHING // 正在发布流状态
STATE_WAITCLOSE // 等待关闭状态(自动关闭延时开启)
STATE_CLOSED // 流已关闭,不可使用
)
const (
ACTION_PUBLISH StreamAction = iota
ACTION_TIMEOUT // 发布流长时间没有数据/长时间没有发布者发布流/等待关闭时间到
ACTION_PUBLISHLOST // 发布者意外断开
ACTION_CLOSE // 主动关闭
ACTION_LASTLEAVE // 最后一个订阅者离开
ACTION_FIRSTENTER // 一个订阅者进入
ACTION_PUBLISH StreamAction = iota
ACTION_TRACKAVAILABLE // 音视频轨道激活
ACTION_TIMEOUT // 发布流长时间没有数据/长时间没有发布者发布流/等待关闭时间到
ACTION_PUBLISHCLOSE // 发布者关闭
ACTION_CLOSE // 主动关闭流
ACTION_LASTLEAVE // 最后一个订阅者离开
ACTION_FIRSTENTER // 第一个订阅者进入
ACTION_NOTRACK // 没有音视频轨道
)
var StateNames = [...]string{"⌛", "🟢", "🟡", "🔴"}
var ActionNames = [...]string{"publish", "timeout", "publish lost", "close", "last leave", "first enter", "no tracks"}
var StateNames = [...]string{"⌛", "🟡", "🟢", "🟠", "🔴"}
var ActionNames = [...]string{"publish", "track available", "timeout", "publish close", "close", "last leave", "first enter", "no tracks"}
/*
stateDiagram-v2
[*] --> ⌛等待发布者 : 创建
⌛等待发布者 --> 🟢正在发布 :发布
⌛等待发布者 --> 🟡等待轨道 :发布
⌛等待发布者 --> 🔴已关闭 :关闭
⌛等待发布者 --> 🔴已关闭 :超时
⌛等待发布者 --> 🔴已关闭 :最后订阅者离开
🟡等待轨道 --> 🟢正在发布 :轨道激活
🟡等待轨道 --> 🔴已关闭 :关闭
🟡等待轨道 --> 🔴已关闭 :超时
🟡等待轨道 --> 🔴已关闭 :最后订阅者离开
🟢正在发布 --> ⌛等待发布者: 发布者断开
🟢正在发布 --> 🟡等待关闭: 最后订阅者离开
🟢正在发布 --> 🟠等待关闭: 最后订阅者离开
🟢正在发布 --> 🔴已关闭 :关闭
🟡等待关闭 --> 🟢正在发布 :第一个订阅者进入
🟡等待关闭 --> 🔴已关闭 :关闭
🟡等待关闭 --> 🔴已关闭 :超时
🟡等待关闭 --> 🔴已关闭 :发布者断开
🟠等待关闭 --> 🟢正在发布 :第一个订阅者进入
🟠等待关闭 --> 🔴已关闭 :关闭
🟠等待关闭 --> 🔴已关闭 :超时
🟠等待关闭 --> 🔴已关闭 :发布者断开
*/
var StreamFSM = [len(StateNames)]map[StreamAction]StreamState{
{
ACTION_PUBLISH: STATE_PUBLISHING,
ACTION_PUBLISH: STATE_WAITTRACK,
ACTION_TIMEOUT: STATE_CLOSED,
ACTION_LASTLEAVE: STATE_CLOSED,
ACTION_CLOSE: STATE_CLOSED,
},
{
ACTION_PUBLISHLOST: STATE_WAITPUBLISH,
ACTION_LASTLEAVE: STATE_WAITCLOSE,
ACTION_CLOSE: STATE_CLOSED,
ACTION_TRACKAVAILABLE: STATE_PUBLISHING,
ACTION_TIMEOUT: STATE_CLOSED,
ACTION_LASTLEAVE: STATE_CLOSED,
ACTION_CLOSE: STATE_CLOSED,
},
{
ACTION_PUBLISHLOST: STATE_CLOSED,
ACTION_TIMEOUT: STATE_CLOSED,
ACTION_FIRSTENTER: STATE_PUBLISHING,
ACTION_CLOSE: STATE_CLOSED,
ACTION_PUBLISHCLOSE: STATE_WAITPUBLISH,
ACTION_TIMEOUT: STATE_WAITPUBLISH,
ACTION_LASTLEAVE: STATE_WAITCLOSE,
ACTION_CLOSE: STATE_CLOSED,
},
{
ACTION_PUBLISHCLOSE: STATE_CLOSED,
ACTION_TIMEOUT: STATE_CLOSED,
ACTION_FIRSTENTER: STATE_PUBLISHING,
ACTION_CLOSE: STATE_CLOSED,
},
{},
}
@@ -305,18 +319,20 @@ func (r *Stream) action(action StreamAction) (ok bool) {
}
r.timeout.Reset(waitTime)
r.Debug("wait publisher", zap.Duration("wait timeout", waitTime))
case STATE_PUBLISHING:
case STATE_WAITTRACK:
if len(r.SEHistory) > 1 {
stateEvent = SErepublish{event}
} else {
stateEvent = SEpublish{event}
}
r.timeout.Reset(time.Second * 5) // 5秒心跳检测track的存活度
case STATE_PUBLISHING:
stateEvent = SEtrackAvaliable{event}
r.Subscribers.Broadcast(stateEvent)
// if r.IdleTimeout > 0 && r.Subscribers.Len() == 0 {
// return r.action(ACTION_LASTLEAVE)
// } else {
r.timeout.Reset(r.PublishTimeout) // 5秒心跳检测track的存活度
// }
if puller, ok := r.Publisher.(IPuller); ok {
puller.OnConnected()
}
r.timeout.Reset(r.PublishTimeout) // 检测track的存活度
case STATE_WAITCLOSE:
stateEvent = SEwaitClose{event}
if r.IdleTimeout > 0 {
@@ -414,7 +430,7 @@ func (s *Stream) run() {
case <-s.timeout.C:
timeStart = time.Now()
timeOutInfo = zap.String("state", s.State.String())
if s.State == STATE_PUBLISHING {
if s.State == STATE_PUBLISHING || s.State == STATE_WAITTRACK {
for sub := range s.Subscribers.internal {
if sub.IsClosed() {
delete(s.Subscribers.internal, sub)
@@ -454,7 +470,7 @@ func (s *Stream) run() {
}
}
if lost {
s.action(ACTION_PUBLISHLOST)
s.action(ACTION_TIMEOUT)
continue
}
if s.IdleTimeout > 0 && s.Subscribers.Len() == 0 && time.Since(s.StartTime) > s.IdleTimeout {
@@ -462,6 +478,9 @@ func (s *Stream) run() {
continue
}
}
if s.State == STATE_WAITTRACK {
s.action(ACTION_TRACKAVAILABLE)
}
s.timeout.Reset(time.Second * 5)
//订阅者等待音视频轨道超时了,放弃等待,订阅成功
s.Subscribers.AbortWait()