From f231a0cd71c1f2ac54228c5adf26cd27eae40deb Mon Sep 17 00:00:00 2001 From: langhuihui <178529795@qq.com> Date: Thu, 29 Aug 2024 19:51:03 +0800 Subject: [PATCH] fix: delayclosetimeout not work --- stream.go | 26 ++++++++++++++++++-------- 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/stream.go b/stream.go index b78181a..397d620 100644 --- a/stream.go +++ b/stream.go @@ -295,6 +295,13 @@ func findOrCreateStream(streamPath string, waitTimeout time.Duration) (s *Stream } } +func (r *Stream) resetTimer(dur time.Duration) { + if r.timeout != nil { + r.Warn("reset timer", zap.Duration("timeout", dur)) + r.timeout.Reset(dur) + } +} + func (r *Stream) action(action StreamAction) (ok bool) { var event StateEvent event.Target = r @@ -327,7 +334,7 @@ func (r *Stream) action(action StreamAction) (ok bool) { } else if waitTime == 0 { waitTime = time.Millisecond * 10 //没有订阅者也没有配置发布者等待重连时间,默认10ms后关闭流 } - r.timeout.Reset(waitTime) + r.resetTimer(waitTime) r.Debug("wait publisher", zap.Duration("wait timeout", waitTime)) case STATE_WAITTRACK: if len(r.SEHistory) > 1 { @@ -335,7 +342,7 @@ func (r *Stream) action(action StreamAction) (ok bool) { } else { stateEvent = SEpublish{event} } - r.timeout.Reset(time.Second * 5) // 5秒心跳,检测track的存活度 + r.resetTimer(time.Second * 5) // 5秒心跳,检测track的存活度 case STATE_PUBLISHING: stateEvent = SEtrackAvaliable{event} r.Subscribers.SendInviteTrack(r) @@ -343,13 +350,13 @@ func (r *Stream) action(action StreamAction) (ok bool) { if puller, ok := r.Publisher.(IPuller); ok { puller.OnConnected() } - r.timeout.Reset(time.Second * 5) // 5秒心跳,检测track的存活度 + r.resetTimer(time.Second * 5) // 5秒心跳,检测track的存活度 case STATE_WAITCLOSE: stateEvent = SEwaitClose{event} if r.IdleTimeout > 0 { - r.timeout.Reset(r.IdleTimeout) + r.resetTimer(r.IdleTimeout) } else { - r.timeout.Reset(r.DelayCloseTimeout) + r.resetTimer(r.DelayCloseTimeout) } case STATE_CLOSED: Streams.Delete(r.Path) @@ -508,11 +515,14 @@ func (s *Stream) run() { continue } } - if s.State == STATE_WAITTRACK { - s.action(ACTION_TRACKAVAILABLE) + switch s.State { + case STATE_WAITTRACK: + s.action(ACTION_TRACKAVAILABLE) + case STATE_WAITCLOSE: + continue } s.Subscribers.AbortWait() - s.timeout.Reset(time.Second * 5) + s.resetTimer(time.Second * 5) } else { s.Debug("timeout", timeOutInfo) s.action(ACTION_TIMEOUT)