mirror of
https://github.com/langhuihui/monibuca.git
synced 2025-09-27 03:25:56 +08:00

- Refactor frame converter implementation - Update mp4 track to use ICodex - General refactoring and code improvements 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
64 lines
1.3 KiB
Go
64 lines
1.3 KiB
Go
package m7s
|
|
|
|
import (
|
|
"log/slog"
|
|
"time"
|
|
|
|
. "m7s.live/v5/pkg"
|
|
"m7s.live/v5/pkg/util"
|
|
)
|
|
|
|
type WaitManager struct {
|
|
*slog.Logger
|
|
util.Collection[string, *WaitStream]
|
|
}
|
|
|
|
func (w *WaitManager) Wait(subscriber *Subscriber) *WaitStream {
|
|
subscriber.waitStartTime = time.Now()
|
|
if subscriber.Publisher != nil {
|
|
subscriber.Info("publisher gone", "pid", subscriber.Publisher.ID)
|
|
}
|
|
if waiting, ok := w.Get(subscriber.StreamPath); ok {
|
|
waiting.Add(subscriber)
|
|
return waiting
|
|
} else {
|
|
waiting := &WaitStream{
|
|
StreamPath: subscriber.StreamPath,
|
|
}
|
|
w.Set(waiting)
|
|
waiting.Add(subscriber)
|
|
return waiting
|
|
}
|
|
}
|
|
|
|
func (w *WaitManager) WakeUp(streamPath string, publisher *Publisher) {
|
|
if waiting, ok := w.Get(streamPath); ok {
|
|
for subscriber := range waiting.Range {
|
|
publisher.AddSubscriber(subscriber)
|
|
}
|
|
waiting.Clear()
|
|
publisher.OnDispose(func() {
|
|
if waiting.Length == 0 {
|
|
w.Remove(waiting)
|
|
}
|
|
})
|
|
// w.Remove(waiting)
|
|
}
|
|
}
|
|
|
|
func (w *WaitManager) checkTimeout() {
|
|
for waits := range w.Range {
|
|
for sub := range waits.Range {
|
|
if time.Since(sub.waitStartTime) > max(sub.WaitTimeout, sub.BufferTime) {
|
|
sub.Stop(ErrSubscribeTimeout)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (w *WaitManager) Leave(s *Subscriber) {
|
|
if waitStream, ok := w.Get(s.StreamPath); ok {
|
|
waitStream.Remove(s)
|
|
}
|
|
}
|