Files
engine/subscribers.go
2024-02-20 18:08:21 +08:00

176 lines
3.7 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package engine
import (
"encoding/json"
"go.uber.org/zap"
"m7s.live/engine/v4/common"
"m7s.live/engine/v4/config"
)
type Subscribers struct {
public map[ISubscriber]*waitTracks
internal map[ISubscriber]*waitTracks
waits map[*waitTracks]ISubscriber
waitAborted bool // 不再等待了
}
func (s *Subscribers) Init() {
s.public = make(map[ISubscriber]*waitTracks)
s.internal = make(map[ISubscriber]*waitTracks)
s.waits = make(map[*waitTracks]ISubscriber)
}
func (s *Subscribers) MarshalJSON() ([]byte, error) {
var subers []ISubscriber
for suber := range s.public {
subers = append(subers, suber)
}
return json.Marshal(subers)
}
func (s *Subscribers) Broadcast(event any) {
for sub := range s.internal {
sub.OnEvent(event)
}
for sub := range s.public {
sub.OnEvent(event)
}
}
func (s *Subscribers) Pick() ISubscriber {
for sub := range s.public {
return sub
}
return nil
}
func (s *Subscribers) Len() int {
return len(s.public)
}
func (s *Subscribers) RangeAll(f func(sub ISubscriber)) {
s.rangeAll(func(sub ISubscriber, wait *waitTracks) {
f(sub)
})
}
func (s *Subscribers) rangeAll(f func(sub ISubscriber, wait *waitTracks)) {
for sub, wait := range s.internal {
f(sub, wait)
}
for sub, wait := range s.public {
f(sub, wait)
}
}
func (s *Subscribers) OnTrack(track common.Track) {
s.rangeAll(func(sub ISubscriber, wait *waitTracks) {
if _, ok := s.waits[wait]; ok {
if wait.Accept(track) {
delete(s.waits, wait)
}
} else {
sub.OnEvent(track)
}
})
}
func (s *Subscribers) OnPublisherLost(event StateEvent) {
s.rangeAll(func(sub ISubscriber, wait *waitTracks) {
if _, ok := s.waits[wait]; ok {
wait.Reject(ErrPublisherLost)
delete(s.waits, wait)
}
sub.OnEvent(event)
})
}
// SendInviteTrack 广播需要的 Track转码插件可以用到
func (s *Subscribers) SendInviteTrack(stream *Stream) {
var video = map[string]ISubscriber{}
var audio = map[string]ISubscriber{}
for wait, suber := range s.waits {
for _, name := range wait.video {
video[name] = suber
}
for _, name := range wait.audio {
audio[name] = suber
}
}
for v, suber := range video {
InviteTrack(v, suber)
}
for a, suber := range audio {
InviteTrack(a, suber)
}
}
func (s *Subscribers) AbortWait() {
s.waitAborted = true
for wait := range s.waits {
wait.Resolve()
delete(s.waits, wait)
}
}
func (s *Subscribers) Find(id string) ISubscriber {
for sub := range s.public {
if sub.GetSubscriber().ID == id {
return sub
}
}
return nil
}
func (s *Subscribers) Delete(suber ISubscriber) {
io := suber.GetSubscriber()
io.TrackPlayer.Audio = nil
io.TrackPlayer.AudioReader = nil
io.TrackPlayer.Video = nil
io.TrackPlayer.VideoReader = nil
for _, reader := range io.readers {
reader.Track.Debug("reader -1", zap.Int32("count", reader.Track.ReaderCount.Add(-1)))
}
io.readers = nil
if _, ok := s.public[suber]; ok {
delete(s.public, suber)
io.Info("suber -1", zap.Int("remains", s.Len()))
}
if _, ok := s.internal[suber]; ok {
delete(s.internal, suber)
io.Info("innersuber -1", zap.Int("remains", len(s.internal)))
}
if config.Global.EnableSubEvent {
EventBus <- UnsubscribeEvent{CreateEvent(suber)}
}
}
func (s *Subscribers) Add(suber ISubscriber, wait *waitTracks) {
io := suber.GetSubscriber()
if io.Config.Internal {
s.internal[suber] = wait
io.Info("innersuber +1", zap.Int("remains", len(s.internal)))
} else {
s.public[suber] = wait
io.Info("suber +1", zap.Int("remains", s.Len()))
if config.Global.EnableSubEvent {
EventBus <- suber
}
}
if wait.NeedWait() {
s.waits[wait] = suber
} else {
wait.Resolve()
}
}
func (s *Subscribers) Dispose() {
for w := range s.waits {
w.Reject(ErrStreamIsClosed)
}
s.waits = nil
s.public = nil
s.internal = nil
}