Files
monibuca/wait-stream.go
2024-10-22 08:25:55 +08:00

56 lines
1.0 KiB
Go

package m7s
import (
"log/slog"
. "m7s.live/pro/pkg"
"m7s.live/pro/pkg/util"
)
type WaitManager struct {
*slog.Logger
util.Collection[string, *WaitStream]
}
func (w *WaitManager) Wait(subscriber *Subscriber) *WaitStream {
subscriber.Publisher = nil
if waiting, ok := w.Get(subscriber.StreamPath); ok {
waiting.Add(subscriber)
return waiting
} else {
waiting := &WaitStream{
StreamPath: subscriber.StreamPath,
}
w.Set(waiting)
waiting.Add(subscriber)
return waiting
}
}
func (w *WaitManager) WakeUp(streamPath string, publisher *Publisher) {
if waiting, ok := w.Get(streamPath); ok {
for subscriber := range waiting.Range {
publisher.AddSubscriber(subscriber)
}
w.Remove(waiting)
}
}
func (w *WaitManager) checkTimeout() {
for waits := range w.Range {
for sub := range waits.Range {
select {
case <-sub.TimeoutTimer.C:
sub.Stop(ErrSubscribeTimeout)
default:
}
}
}
}
func (w *WaitManager) Leave(s *Subscriber) {
if waitStream, ok := w.Get(s.StreamPath); ok {
waitStream.Remove(s)
}
}