feat: add track ReaderCount prop

This commit is contained in:
langhuihui
2023-10-17 20:10:48 +08:00
parent fba10d93eb
commit afabc2777b
8 changed files with 59 additions and 33 deletions

View File

@@ -97,6 +97,7 @@ func (bt *Base[T, F]) Dispose() {
} }
type Track interface { type Track interface {
GetReaderCount() int32
GetName() string GetName() string
GetBPS() int GetBPS() int
GetFPS() int GetFPS() int

View File

@@ -1,6 +1,8 @@
package common package common
import ( import (
"sync/atomic"
"m7s.live/engine/v4/util" "m7s.live/engine/v4/util"
) )
@@ -13,6 +15,7 @@ var EmptyLocker emptyLocker
type RingWriter[T any, F IDataFrame[T]] struct { type RingWriter[T any, F IDataFrame[T]] struct {
*util.Ring[F] `json:"-" yaml:"-"` *util.Ring[F] `json:"-" yaml:"-"`
ReaderCount atomic.Int32 `json:"-" yaml:"-"`
pool *util.Ring[F] pool *util.Ring[F]
poolSize int poolSize int
Size int Size int
@@ -109,3 +112,7 @@ func (rb *RingWriter[T, F]) Step() (normal bool) {
rb.LastValue.Ready() rb.LastValue.Ready()
return return
} }
func (rb *RingWriter[T, F]) GetReaderCount() int32 {
return rb.ReaderCount.Load()
}

View File

@@ -94,8 +94,13 @@ func TryInvitePublish(streamPath string) {
EventBus <- InvitePublish{Event: CreateEvent(streamPath)} EventBus <- InvitePublish{Event: CreateEvent(streamPath)}
} }
} }
// InviteTrackEvent 邀请推送指定 Track 事件(转码需要) // InviteTrackEvent 邀请推送指定 Track 事件(转码需要)
type InviteTrack struct { type InviteTrackEvent struct {
Event[string] Event[string]
*Stream ISubscriber
} }
func InviteTrack(name string, suber ISubscriber) {
EventBus <- InviteTrackEvent{Event: CreateEvent(name), ISubscriber: suber}
}

View File

@@ -125,10 +125,6 @@ type Tracks struct {
marshalLock sync.Mutex marshalLock sync.Mutex
} }
func (tracks *Tracks) HasAV() bool {
return tracks.MainVideo != nil && tracks.MainAudio != nil
}
func (tracks *Tracks) Range(f func(name string, t Track)) { func (tracks *Tracks) Range(f func(name string, t Track)) {
tracks.Map.Range(func(k, v any) bool { tracks.Map.Range(func(k, v any) bool {
f(k.(string), v.(Track)) f(k.(string), v.(Track))
@@ -337,8 +333,6 @@ func (r *Stream) action(action StreamAction) (ok bool) {
case STATE_PUBLISHING: case STATE_PUBLISHING:
stateEvent = SEtrackAvaliable{event} stateEvent = SEtrackAvaliable{event}
r.Subscribers.SendInviteTrack(r) r.Subscribers.SendInviteTrack(r)
//订阅者等待音视频轨道超时了,放弃等待,订阅成功
r.Subscribers.AbortWait()
r.Subscribers.Broadcast(stateEvent) r.Subscribers.Broadcast(stateEvent)
if puller, ok := r.Publisher.(IPuller); ok { if puller, ok := r.Publisher.(IPuller); ok {
puller.OnConnected() puller.OnConnected()
@@ -492,6 +486,7 @@ func (s *Stream) run() {
if s.State == STATE_WAITTRACK { if s.State == STATE_WAITTRACK {
s.action(ACTION_TRACKAVAILABLE) s.action(ACTION_TRACKAVAILABLE)
} else { } else {
s.Subscribers.AbortWait()
s.timeout.Reset(time.Second * 5) s.timeout.Reset(time.Second * 5)
} }
} else { } else {
@@ -573,10 +568,18 @@ func (s *Stream) run() {
s.Tracks.Range(func(name string, t Track) { s.Tracks.Range(func(name string, t Track) {
waits.Accept(t) waits.Accept(t)
}) })
if !pubConfig.PubAudio || s.Subscribers.waitAborted { if !pubConfig.PubAudio {
waits.audio.StopWait()
} else if s.State == STATE_PUBLISHING && len(waits.audio) > 0 {
waits.audio.InviteTrack(suber)
} else if s.Subscribers.waitAborted {
waits.audio.StopWait() waits.audio.StopWait()
} }
if !pubConfig.PubVideo || s.Subscribers.waitAborted { if !pubConfig.PubVideo {
waits.video.StopWait()
} else if s.State == STATE_PUBLISHING && len(waits.video) > 0 {
waits.video.InviteTrack(suber)
} else if s.Subscribers.waitAborted {
waits.video.StopWait() waits.video.StopWait()
} }
} }
@@ -620,7 +623,7 @@ func (s *Stream) run() {
if _, ok := v.Value.(*track.Audio); ok && !pubConfig.PubVideo { if _, ok := v.Value.(*track.Audio); ok && !pubConfig.PubVideo {
s.Subscribers.AbortWait() s.Subscribers.AbortWait()
} }
if s.Tracks.HasAV() { if (s.Tracks.MainVideo != nil || !pubConfig.PubVideo) && (!pubConfig.PubAudio || s.Tracks.MainAudio != nil) {
s.action(ACTION_TRACKAVAILABLE) s.action(ACTION_TRACKAVAILABLE)
} }
} else { } else {

View File

@@ -121,6 +121,7 @@ type TrackPlayer struct {
type Subscriber struct { type Subscriber struct {
IO IO
Config *config.Subscribe Config *config.Subscribe
readers []*track.AVRingReader
TrackPlayer `json:"-" yaml:"-"` TrackPlayer `json:"-" yaml:"-"`
} }
@@ -149,6 +150,7 @@ func (s *Subscriber) OnEvent(event any) {
func (s *Subscriber) CreateTrackReader(t *track.Media) (result *track.AVRingReader) { func (s *Subscriber) CreateTrackReader(t *track.Media) (result *track.AVRingReader) {
result = track.NewAVRingReader(t) result = track.NewAVRingReader(t)
s.readers = append(s.readers, result)
result.Logger = s.With(zap.String("track", t.Name)) result.Logger = s.With(zap.String("track", t.Name))
return return
} }

View File

@@ -11,14 +11,14 @@ import (
type Subscribers struct { type Subscribers struct {
public map[ISubscriber]*waitTracks public map[ISubscriber]*waitTracks
internal map[ISubscriber]*waitTracks internal map[ISubscriber]*waitTracks
waits map[*waitTracks]struct{} waits map[*waitTracks]ISubscriber
waitAborted bool // 不再等待了 waitAborted bool // 不再等待了
} }
func (s *Subscribers) Init() { func (s *Subscribers) Init() {
s.public = make(map[ISubscriber]*waitTracks) s.public = make(map[ISubscriber]*waitTracks)
s.internal = make(map[ISubscriber]*waitTracks) s.internal = make(map[ISubscriber]*waitTracks)
s.waits = make(map[*waitTracks]struct{}) s.waits = make(map[*waitTracks]ISubscriber)
} }
func (s *Subscribers) MarshalJSON() ([]byte, error) { func (s *Subscribers) MarshalJSON() ([]byte, error) {
@@ -88,27 +88,21 @@ func (s *Subscribers) OnPublisherLost(event StateEvent) {
// SendInviteTrack 广播需要的 Track转码插件可以用到 // SendInviteTrack 广播需要的 Track转码插件可以用到
func (s *Subscribers) SendInviteTrack(stream *Stream) { func (s *Subscribers) SendInviteTrack(stream *Stream) {
var video = map[string]struct{}{} var video = map[string]ISubscriber{}
var audio = map[string]struct{}{} var audio = map[string]ISubscriber{}
for wait := range s.waits { for wait, suber := range s.waits {
for _, name := range wait.video { for _, name := range wait.video {
video[name] = struct{}{} video[name] = suber
} }
for _, name := range wait.audio { for _, name := range wait.audio {
audio[name] = struct{}{} audio[name] = suber
} }
} }
for v := range video { for v, suber := range video {
EventBus <- InviteTrack{ InviteTrack(v, suber)
Event: CreateEvent(v),
Stream: stream,
}
} }
for a := range audio { for a, suber := range audio {
EventBus <- InviteTrack{ InviteTrack(a, suber)
Event: CreateEvent(a),
Stream: stream,
}
} }
} }
@@ -130,9 +124,18 @@ func (s *Subscribers) Find(id string) ISubscriber {
} }
func (s *Subscribers) Delete(suber ISubscriber) { func (s *Subscribers) Delete(suber ISubscriber) {
delete(s.public, suber)
io := suber.GetSubscriber() io := suber.GetSubscriber()
io.Info("suber -1", zap.Int("remains", s.Len())) for _, reader := range io.readers {
reader.Track.ReaderCount.Add(-1)
}
if _, ok := s.public[suber]; ok {
delete(s.public, suber)
io.Info("suber -1", zap.Int("remains", s.Len()))
}
if _, ok := s.internal[suber]; ok {
delete(s.internal, suber)
io.Info("innersuber -1", zap.Int("remains", len(s.internal)))
}
if config.Global.EnableSubEvent { if config.Global.EnableSubEvent {
EventBus <- UnsubscribeEvent{CreateEvent(suber)} EventBus <- UnsubscribeEvent{CreateEvent(suber)}
} }
@@ -151,7 +154,7 @@ func (s *Subscribers) Add(suber ISubscriber, wait *waitTracks) {
} }
} }
if wait.NeedWait() { if wait.NeedWait() {
s.waits[wait] = struct{}{} s.waits[wait] = suber
} else { } else {
wait.Resolve() wait.Resolve()
} }

View File

@@ -44,6 +44,7 @@ func (r *AVRingReader) DecConfChanged() bool {
} }
func NewAVRingReader(t *Media) *AVRingReader { func NewAVRingReader(t *Media) *AVRingReader {
t.ReaderCount.Add(1)
return &AVRingReader{ return &AVRingReader{
Track: t, Track: t,
} }

View File

@@ -32,7 +32,11 @@ func (w *waitTrackNames) Wait(names ...string) {
func (w *waitTrackNames) StopWait() { func (w *waitTrackNames) StopWait() {
*w = nil *w = nil
} }
func (w waitTrackNames) InviteTrack(suber ISubscriber) {
if len(w) > 0 {
InviteTrack(w[0], suber)
}
}
// Accept 检查名称是否在等待候选项中 // Accept 检查名称是否在等待候选项中
func (w *waitTrackNames) Accept(name string) bool { func (w *waitTrackNames) Accept(name string) bool {
if !w.Waiting() { if !w.Waiting() {