mirror of
https://github.com/langhuihui/monibuca.git
synced 2025-10-05 11:26:52 +08:00
56 lines
1.0 KiB
Go
56 lines
1.0 KiB
Go
package m7s
|
|
|
|
import (
|
|
"log/slog"
|
|
|
|
. "m7s.live/pro/pkg"
|
|
"m7s.live/pro/pkg/util"
|
|
)
|
|
|
|
type WaitManager struct {
|
|
*slog.Logger
|
|
util.Collection[string, *WaitStream]
|
|
}
|
|
|
|
func (w *WaitManager) Wait(subscriber *Subscriber) *WaitStream {
|
|
subscriber.Publisher = nil
|
|
if waiting, ok := w.Get(subscriber.StreamPath); ok {
|
|
waiting.Add(subscriber)
|
|
return waiting
|
|
} else {
|
|
waiting := &WaitStream{
|
|
StreamPath: subscriber.StreamPath,
|
|
}
|
|
w.Set(waiting)
|
|
waiting.Add(subscriber)
|
|
return waiting
|
|
}
|
|
}
|
|
|
|
func (w *WaitManager) WakeUp(streamPath string, publisher *Publisher) {
|
|
if waiting, ok := w.Get(streamPath); ok {
|
|
for subscriber := range waiting.Range {
|
|
publisher.AddSubscriber(subscriber)
|
|
}
|
|
w.Remove(waiting)
|
|
}
|
|
}
|
|
|
|
func (w *WaitManager) checkTimeout() {
|
|
for waits := range w.Range {
|
|
for sub := range waits.Range {
|
|
select {
|
|
case <-sub.TimeoutTimer.C:
|
|
sub.Stop(ErrSubscribeTimeout)
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (w *WaitManager) Leave(s *Subscriber) {
|
|
if waitStream, ok := w.Get(s.StreamPath); ok {
|
|
waitStream.Remove(s)
|
|
}
|
|
}
|