diff --git a/common/index.go b/common/index.go index 8a37bac..4c27e2c 100644 --- a/common/index.go +++ b/common/index.go @@ -97,6 +97,7 @@ func (bt *Base[T, F]) Dispose() { } type Track interface { + GetReaderCount() int32 GetName() string GetBPS() int GetFPS() int diff --git a/common/ring-writer.go b/common/ring-writer.go index 3414a65..1da52f6 100644 --- a/common/ring-writer.go +++ b/common/ring-writer.go @@ -1,6 +1,8 @@ package common import ( + "sync/atomic" + "m7s.live/engine/v4/util" ) @@ -13,6 +15,7 @@ var EmptyLocker emptyLocker type RingWriter[T any, F IDataFrame[T]] struct { *util.Ring[F] `json:"-" yaml:"-"` + ReaderCount atomic.Int32 `json:"-" yaml:"-"` pool *util.Ring[F] poolSize int Size int @@ -109,3 +112,7 @@ func (rb *RingWriter[T, F]) Step() (normal bool) { rb.LastValue.Ready() return } + +func (rb *RingWriter[T, F]) GetReaderCount() int32 { + return rb.ReaderCount.Load() +} \ No newline at end of file diff --git a/events.go b/events.go index b38f0ff..3200b6f 100644 --- a/events.go +++ b/events.go @@ -94,8 +94,13 @@ func TryInvitePublish(streamPath string) { EventBus <- InvitePublish{Event: CreateEvent(streamPath)} } } + // InviteTrackEvent 邀请推送指定 Track 事件(转码需要) -type InviteTrack struct { +type InviteTrackEvent struct { Event[string] - *Stream -} \ No newline at end of file + ISubscriber +} + +func InviteTrack(name string, suber ISubscriber) { + EventBus <- InviteTrackEvent{Event: CreateEvent(name), ISubscriber: suber} +} diff --git a/stream.go b/stream.go index 3f9eded..187a4c0 100644 --- a/stream.go +++ b/stream.go @@ -125,10 +125,6 @@ type Tracks struct { marshalLock sync.Mutex } -func (tracks *Tracks) HasAV() bool { - return tracks.MainVideo != nil && tracks.MainAudio != nil -} - func (tracks *Tracks) Range(f func(name string, t Track)) { tracks.Map.Range(func(k, v any) bool { f(k.(string), v.(Track)) @@ -337,8 +333,6 @@ func (r *Stream) action(action StreamAction) (ok bool) { case STATE_PUBLISHING: stateEvent = SEtrackAvaliable{event} r.Subscribers.SendInviteTrack(r) - //订阅者等待音视频轨道超时了,放弃等待,订阅成功 - r.Subscribers.AbortWait() r.Subscribers.Broadcast(stateEvent) if puller, ok := r.Publisher.(IPuller); ok { puller.OnConnected() @@ -492,6 +486,7 @@ func (s *Stream) run() { if s.State == STATE_WAITTRACK { s.action(ACTION_TRACKAVAILABLE) } else { + s.Subscribers.AbortWait() s.timeout.Reset(time.Second * 5) } } else { @@ -573,10 +568,18 @@ func (s *Stream) run() { s.Tracks.Range(func(name string, t Track) { waits.Accept(t) }) - if !pubConfig.PubAudio || s.Subscribers.waitAborted { + if !pubConfig.PubAudio { + waits.audio.StopWait() + } else if s.State == STATE_PUBLISHING && len(waits.audio) > 0 { + waits.audio.InviteTrack(suber) + } else if s.Subscribers.waitAborted { waits.audio.StopWait() } - if !pubConfig.PubVideo || s.Subscribers.waitAborted { + if !pubConfig.PubVideo { + waits.video.StopWait() + } else if s.State == STATE_PUBLISHING && len(waits.video) > 0 { + waits.video.InviteTrack(suber) + } else if s.Subscribers.waitAborted { waits.video.StopWait() } } @@ -620,7 +623,7 @@ func (s *Stream) run() { if _, ok := v.Value.(*track.Audio); ok && !pubConfig.PubVideo { s.Subscribers.AbortWait() } - if s.Tracks.HasAV() { + if (s.Tracks.MainVideo != nil || !pubConfig.PubVideo) && (!pubConfig.PubAudio || s.Tracks.MainAudio != nil) { s.action(ACTION_TRACKAVAILABLE) } } else { diff --git a/subscriber.go b/subscriber.go index f3d6f00..40fc35e 100644 --- a/subscriber.go +++ b/subscriber.go @@ -121,6 +121,7 @@ type TrackPlayer struct { type Subscriber struct { IO Config *config.Subscribe + readers []*track.AVRingReader TrackPlayer `json:"-" yaml:"-"` } @@ -149,6 +150,7 @@ func (s *Subscriber) OnEvent(event any) { func (s *Subscriber) CreateTrackReader(t *track.Media) (result *track.AVRingReader) { result = track.NewAVRingReader(t) + s.readers = append(s.readers, result) result.Logger = s.With(zap.String("track", t.Name)) return } diff --git a/subscribers.go b/subscribers.go index 5c45033..a8713e1 100644 --- a/subscribers.go +++ b/subscribers.go @@ -11,14 +11,14 @@ import ( type Subscribers struct { public map[ISubscriber]*waitTracks internal map[ISubscriber]*waitTracks - waits map[*waitTracks]struct{} + waits map[*waitTracks]ISubscriber waitAborted bool // 不再等待了 } func (s *Subscribers) Init() { s.public = make(map[ISubscriber]*waitTracks) s.internal = make(map[ISubscriber]*waitTracks) - s.waits = make(map[*waitTracks]struct{}) + s.waits = make(map[*waitTracks]ISubscriber) } func (s *Subscribers) MarshalJSON() ([]byte, error) { @@ -88,27 +88,21 @@ func (s *Subscribers) OnPublisherLost(event StateEvent) { // SendInviteTrack 广播需要的 Track(转码插件可以用到) func (s *Subscribers) SendInviteTrack(stream *Stream) { - var video = map[string]struct{}{} - var audio = map[string]struct{}{} - for wait := range s.waits { + var video = map[string]ISubscriber{} + var audio = map[string]ISubscriber{} + for wait, suber := range s.waits { for _, name := range wait.video { - video[name] = struct{}{} + video[name] = suber } for _, name := range wait.audio { - audio[name] = struct{}{} + audio[name] = suber } } - for v := range video { - EventBus <- InviteTrack{ - Event: CreateEvent(v), - Stream: stream, - } + for v, suber := range video { + InviteTrack(v, suber) } - for a := range audio { - EventBus <- InviteTrack{ - Event: CreateEvent(a), - Stream: stream, - } + for a, suber := range audio { + InviteTrack(a, suber) } } @@ -130,9 +124,18 @@ func (s *Subscribers) Find(id string) ISubscriber { } func (s *Subscribers) Delete(suber ISubscriber) { - delete(s.public, suber) io := suber.GetSubscriber() - io.Info("suber -1", zap.Int("remains", s.Len())) + for _, reader := range io.readers { + reader.Track.ReaderCount.Add(-1) + } + if _, ok := s.public[suber]; ok { + delete(s.public, suber) + io.Info("suber -1", zap.Int("remains", s.Len())) + } + if _, ok := s.internal[suber]; ok { + delete(s.internal, suber) + io.Info("innersuber -1", zap.Int("remains", len(s.internal))) + } if config.Global.EnableSubEvent { EventBus <- UnsubscribeEvent{CreateEvent(suber)} } @@ -151,7 +154,7 @@ func (s *Subscribers) Add(suber ISubscriber, wait *waitTracks) { } } if wait.NeedWait() { - s.waits[wait] = struct{}{} + s.waits[wait] = suber } else { wait.Resolve() } diff --git a/track/reader-av.go b/track/reader-av.go index 8e4e2e1..9de4cef 100644 --- a/track/reader-av.go +++ b/track/reader-av.go @@ -44,6 +44,7 @@ func (r *AVRingReader) DecConfChanged() bool { } func NewAVRingReader(t *Media) *AVRingReader { + t.ReaderCount.Add(1) return &AVRingReader{ Track: t, } diff --git a/wait-tracks.go b/wait-tracks.go index 301de12..09a69be 100644 --- a/wait-tracks.go +++ b/wait-tracks.go @@ -32,7 +32,11 @@ func (w *waitTrackNames) Wait(names ...string) { func (w *waitTrackNames) StopWait() { *w = nil } - +func (w waitTrackNames) InviteTrack(suber ISubscriber) { + if len(w) > 0 { + InviteTrack(w[0], suber) + } +} // Accept 检查名称是否在等待候选项中 func (w *waitTrackNames) Accept(name string) bool { if !w.Waiting() {