diff --git a/common/index.go b/common/index.go index 6a027d9..7c09310 100644 --- a/common/index.go +++ b/common/index.go @@ -92,6 +92,10 @@ func (bt *Base[T, F]) SetStuff(stuff ...any) { } } +func (bt *Base[T, F]) Dispose() { + bt.Value.Broadcast() +} + type Track interface { GetName() string GetBPS() int diff --git a/stream.go b/stream.go index b268891..12e985d 100644 --- a/stream.go +++ b/stream.go @@ -2,7 +2,6 @@ package engine import ( "encoding/json" - "sort" "strings" "sync" "time" @@ -88,30 +87,6 @@ var StreamFSM = [len(StateNames)]map[StreamAction]StreamState{ // Streams 所有的流集合 var Streams util.Map[string, *Stream] -type StreamList []*Stream - -func (l StreamList) Len() int { - return len(l) -} - -func (l StreamList) Less(i, j int) bool { - return l[i].Path < l[j].Path -} - -func (l StreamList) Swap(i, j int) { - l[i], l[j] = l[j], l[i] -} - -func (l StreamList) Sort() { - sort.Sort(l) -} - -func GetSortedStreamList() StreamList { - result := StreamList(Streams.ToList()) - result.Sort() - return result -} - func FilterStreams[T IPublisher]() (ss []*Stream) { Streams.Range(func(_ string, s *Stream) { if _, ok := s.Publisher.(T); ok { @@ -350,14 +325,18 @@ func (r *Stream) action(action StreamAction) (ok bool) { r.timeout.Reset(r.DelayCloseTimeout) } case STATE_CLOSED: + Streams.Delete(r.Path) + r.timeout.Stop() + r.Subscribers.Dispose() for !r.actionChan.Close() { // 等待channel发送完毕,伪自旋锁 time.Sleep(time.Millisecond * 100) } stateEvent = SEclose{event} r.Subscribers.Broadcast(stateEvent) - Streams.Delete(r.Path) - r.timeout.Stop() + r.Tracks.Range(func(_ string, t Track) { + t.Dispose() + }) } EventBus <- stateEvent if r.Publisher != nil { @@ -494,140 +473,135 @@ func (s *Stream) run() { s.action(ACTION_TIMEOUT) } case action, ok := <-s.actionChan.C: + if !ok { + return + } timeStart = time.Now() - if ok { - switch v := action.(type) { - case SubPulse: - timeOutInfo = zap.String("action", "SubPulse") - pulseSuber[v] = struct{}{} - case *util.Promise[IPublisher]: - timeOutInfo = zap.String("action", "Publish") - if s.IsClosed() { - v.Reject(ErrStreamIsClosed) - } - republish := s.Publisher == v.Value // 重复发布 - kicked := !republish && s.Publisher != nil && s.Publisher.IsClosed() // 被踢下线 - if !republish { - s.Publisher = v.Value - } - if s.action(ACTION_PUBLISH) || republish || kicked { - v.Resolve() - if s.Publisher.GetPublisher().Config.InsertSEI { - if s.Tracks.SEI == nil { - s.Tracks.SEI = track.NewDataTrack[[]byte]("sei") - s.Tracks.SEI.Locker = &sync.Mutex{} - s.Tracks.SEI.SetStuff(s) - if s.Tracks.Add("sei", s.Tracks.SEI) { - s.Info("sei track added") - } + switch v := action.(type) { + case SubPulse: + timeOutInfo = zap.String("action", "SubPulse") + pulseSuber[v] = struct{}{} + case *util.Promise[IPublisher]: + timeOutInfo = zap.String("action", "Publish") + if s.IsClosed() { + v.Reject(ErrStreamIsClosed) + } + republish := s.Publisher == v.Value // 重复发布 + kicked := !republish && s.Publisher != nil && s.Publisher.IsClosed() // 被踢下线 + if !republish { + s.Publisher = v.Value + } + if s.action(ACTION_PUBLISH) || republish || kicked { + v.Resolve() + if s.Publisher.GetPublisher().Config.InsertSEI { + if s.Tracks.SEI == nil { + s.Tracks.SEI = track.NewDataTrack[[]byte]("sei") + s.Tracks.SEI.Locker = &sync.Mutex{} + s.Tracks.SEI.SetStuff(s) + if s.Tracks.Add("sei", s.Tracks.SEI) { + s.Info("sei track added") } } - } else { - v.Reject(ErrDuplicatePublish) } - case *util.Promise[ISubscriber]: - timeOutInfo = zap.String("action", "Subscribe") - if s.IsClosed() { - v.Reject(ErrStreamIsClosed) - } - suber := v.Value - io := suber.GetSubscriber() - sbConfig := io.Config - waits := &waitTracks{ - Promise: v, - } - if ats := io.Args.Get(sbConfig.SubAudioArgName); ats != "" { - waits.audio.Wait(strings.Split(ats, ",")...) - } else if len(sbConfig.SubAudioTracks) > 0 { - waits.audio.Wait(sbConfig.SubAudioTracks...) - } else if sbConfig.SubAudio { - waits.audio.Wait() - } - if vts := io.Args.Get(sbConfig.SubVideoArgName); vts != "" { - waits.video.Wait(strings.Split(vts, ",")...) - } else if len(sbConfig.SubVideoTracks) > 0 { - waits.video.Wait(sbConfig.SubVideoTracks...) - } else if sbConfig.SubVideo { - waits.video.Wait() - } - if dts := io.Args.Get(sbConfig.SubDataArgName); dts != "" { - waits.data.Wait(strings.Split(dts, ",")...) - } else { - // waits.data.Wait() - } - if s.Publisher != nil { - s.Publisher.OnEvent(v) // 通知Publisher有新的订阅者加入,在回调中可以去获取订阅者数量 - pubConfig := s.Publisher.GetPublisher().Config - s.Tracks.Range(func(name string, t Track) { - waits.Accept(t) - }) - if !pubConfig.PubAudio || s.Subscribers.waitAborted { - waits.audio.StopWait() - } - if !pubConfig.PubVideo || s.Subscribers.waitAborted { - waits.video.StopWait() - } - } - s.Subscribers.Add(suber, waits) - if s.Subscribers.Len() == 1 && s.State == STATE_WAITCLOSE { - s.action(ACTION_FIRSTENTER) - } - case Unsubscribe: - timeOutInfo = zap.String("action", "Unsubscribe") - delete(pulseSuber, v) - s.onSuberClose(v) - case TrackRemoved: - timeOutInfo = zap.String("action", "TrackRemoved") - name := v.GetName() - if t, ok := s.Tracks.LoadAndDelete(name); ok { - s.Info("track -1", zap.String("name", name)) - s.Subscribers.Broadcast(t) - t.(common.Track).Dispose() - } - case *util.Promise[Track]: - timeOutInfo = zap.String("action", "Track") - if s.State == STATE_WAITPUBLISH { - s.action(ACTION_PUBLISH) - } - pubConfig := s.GetPublisherConfig() - name := v.Value.GetName() - if _, ok := v.Value.(*track.Video); ok && !pubConfig.PubVideo { - v.Reject(ErrTrackMute) - continue - } - if _, ok := v.Value.(*track.Audio); ok && !pubConfig.PubAudio { - v.Reject(ErrTrackMute) - continue - } - if s.Tracks.Add(name, v.Value) { - v.Resolve() - s.Subscribers.OnTrack(v.Value) - if _, ok := v.Value.(*track.Video); ok && !pubConfig.PubAudio { - s.Subscribers.AbortWait() - } - if _, ok := v.Value.(*track.Audio); ok && !pubConfig.PubVideo { - s.Subscribers.AbortWait() - } - // 这里重置的目的是当PublishTimeout设置很大的情况下,需要及时取消订阅者的等待 - s.timeout.Reset(time.Second * 5) - } else { - v.Reject(ErrBadTrackName) - } - case NoMoreTrack: - s.Subscribers.AbortWait() - case StreamAction: - timeOutInfo = zap.String("action", "StreamAction"+v.String()) - s.action(v) - default: - timeOutInfo = zap.String("action", "unknown") - s.Error("unknown action", timeOutInfo) + } else { + v.Reject(ErrDuplicatePublish) } - } else { - s.Subscribers.Dispose() - s.Tracks.Range(func(_ string, t Track) { - t.Dispose() - }) - return + case *util.Promise[ISubscriber]: + timeOutInfo = zap.String("action", "Subscribe") + if s.IsClosed() { + v.Reject(ErrStreamIsClosed) + } + suber := v.Value + io := suber.GetSubscriber() + sbConfig := io.Config + waits := &waitTracks{ + Promise: v, + } + if ats := io.Args.Get(sbConfig.SubAudioArgName); ats != "" { + waits.audio.Wait(strings.Split(ats, ",")...) + } else if len(sbConfig.SubAudioTracks) > 0 { + waits.audio.Wait(sbConfig.SubAudioTracks...) + } else if sbConfig.SubAudio { + waits.audio.Wait() + } + if vts := io.Args.Get(sbConfig.SubVideoArgName); vts != "" { + waits.video.Wait(strings.Split(vts, ",")...) + } else if len(sbConfig.SubVideoTracks) > 0 { + waits.video.Wait(sbConfig.SubVideoTracks...) + } else if sbConfig.SubVideo { + waits.video.Wait() + } + if dts := io.Args.Get(sbConfig.SubDataArgName); dts != "" { + waits.data.Wait(strings.Split(dts, ",")...) + } else { + // waits.data.Wait() + } + if s.Publisher != nil { + s.Publisher.OnEvent(v) // 通知Publisher有新的订阅者加入,在回调中可以去获取订阅者数量 + pubConfig := s.Publisher.GetPublisher().Config + s.Tracks.Range(func(name string, t Track) { + waits.Accept(t) + }) + if !pubConfig.PubAudio || s.Subscribers.waitAborted { + waits.audio.StopWait() + } + if !pubConfig.PubVideo || s.Subscribers.waitAborted { + waits.video.StopWait() + } + } + s.Subscribers.Add(suber, waits) + if s.Subscribers.Len() == 1 && s.State == STATE_WAITCLOSE { + s.action(ACTION_FIRSTENTER) + } + case Unsubscribe: + timeOutInfo = zap.String("action", "Unsubscribe") + delete(pulseSuber, v) + s.onSuberClose(v) + case TrackRemoved: + timeOutInfo = zap.String("action", "TrackRemoved") + name := v.GetName() + if t, ok := s.Tracks.LoadAndDelete(name); ok { + s.Info("track -1", zap.String("name", name)) + s.Subscribers.Broadcast(t) + t.(common.Track).Dispose() + } + case *util.Promise[Track]: + timeOutInfo = zap.String("action", "Track") + if s.State == STATE_WAITPUBLISH { + s.action(ACTION_PUBLISH) + } + pubConfig := s.GetPublisherConfig() + name := v.Value.GetName() + if _, ok := v.Value.(*track.Video); ok && !pubConfig.PubVideo { + v.Reject(ErrTrackMute) + continue + } + if _, ok := v.Value.(*track.Audio); ok && !pubConfig.PubAudio { + v.Reject(ErrTrackMute) + continue + } + if s.Tracks.Add(name, v.Value) { + v.Resolve() + s.Subscribers.OnTrack(v.Value) + if _, ok := v.Value.(*track.Video); ok && !pubConfig.PubAudio { + s.Subscribers.AbortWait() + } + if _, ok := v.Value.(*track.Audio); ok && !pubConfig.PubVideo { + s.Subscribers.AbortWait() + } + // 这里重置的目的是当PublishTimeout设置很大的情况下,需要及时取消订阅者的等待 + s.timeout.Reset(time.Second * 5) + } else { + v.Reject(ErrBadTrackName) + } + case NoMoreTrack: + s.Subscribers.AbortWait() + case StreamAction: + timeOutInfo = zap.String("action", "StreamAction"+v.String()) + s.action(v) + default: + timeOutInfo = zap.String("action", "unknown") + s.Error("unknown action", timeOutInfo) } } } diff --git a/summary.go b/summary.go index 8c1e24b..edc878c 100644 --- a/summary.go +++ b/summary.go @@ -16,7 +16,7 @@ var ( summary SummaryUtil lastSummary Summary children util.Map[string, *Summary] - collectLock sync.Mutex + collectLock sync.RWMutex ) // ServerSummary 系统摘要定义 type Summary struct { @@ -62,8 +62,13 @@ func (s *SummaryUtil) MarshalYAML() (any, error) { } func (s *SummaryUtil) collect() *Summary { - collectLock.Lock() - defer collectLock.Unlock() + if collectLock.TryLock() { + defer collectLock.Unlock() + } else { + collectLock.RLock() + defer collectLock.RUnlock() + return &lastSummary + } dur := time.Since(s.ts) if dur < time.Second { return &lastSummary diff --git a/track/base.go b/track/base.go index f556737..4c252b6 100644 --- a/track/base.go +++ b/track/base.go @@ -102,10 +102,6 @@ type Media struct { 流速控制 } -func (av *Media) Dispose() { - av.Value.Broadcast() -} - func (av *Media) GetFromPool(b util.IBytes) (item *util.ListItem[util.Buffer]) { if b.Reuse() { item = av.BytesPool.Get(b.Len()) diff --git a/track/data.go b/track/data.go index d8c29bc..a804e14 100644 --- a/track/data.go +++ b/track/data.go @@ -60,10 +60,6 @@ func (d *Data[T]) Attach(s IStream) { } } -func (d *Data[T]) Dispose() { - d.Value.Broadcast() -} - func (d *Data[T]) LastWriteTime() time.Time { return d.LastValue.WriteTime } diff --git a/util/list.go b/util/list.go index c340ecf..0ddb7e6 100644 --- a/util/list.go +++ b/util/list.go @@ -82,7 +82,7 @@ type List[T any] struct { } func (p *List[T]) PushValue(value T) { - p.Push(&ListItem[T]{Value: value}) + p.Push(&ListItem[T]{Value: value, reset: true}) } func (p *List[T]) Push(item *ListItem[T]) { diff --git a/util/pool.go b/util/pool.go index 75a3332..9c65677 100644 --- a/util/pool.go +++ b/util/pool.go @@ -309,6 +309,7 @@ func (p BytesPool) Get(size int) (item *ListItem[Buffer]) { if item == nil { item = &ListItem[Buffer]{ Value: make(Buffer, size), + reset: true, } } return