fix: fetch summary timeout

This commit is contained in:
langhuihui
2023-09-06 13:36:19 +08:00
parent f89b75ecb1
commit 934d3cf9d4
7 changed files with 144 additions and 168 deletions

View File

@@ -92,6 +92,10 @@ func (bt *Base[T, F]) SetStuff(stuff ...any) {
} }
} }
func (bt *Base[T, F]) Dispose() {
bt.Value.Broadcast()
}
type Track interface { type Track interface {
GetName() string GetName() string
GetBPS() int GetBPS() int

286
stream.go
View File

@@ -2,7 +2,6 @@ package engine
import ( import (
"encoding/json" "encoding/json"
"sort"
"strings" "strings"
"sync" "sync"
"time" "time"
@@ -88,30 +87,6 @@ var StreamFSM = [len(StateNames)]map[StreamAction]StreamState{
// Streams 所有的流集合 // Streams 所有的流集合
var Streams util.Map[string, *Stream] var Streams util.Map[string, *Stream]
type StreamList []*Stream
func (l StreamList) Len() int {
return len(l)
}
func (l StreamList) Less(i, j int) bool {
return l[i].Path < l[j].Path
}
func (l StreamList) Swap(i, j int) {
l[i], l[j] = l[j], l[i]
}
func (l StreamList) Sort() {
sort.Sort(l)
}
func GetSortedStreamList() StreamList {
result := StreamList(Streams.ToList())
result.Sort()
return result
}
func FilterStreams[T IPublisher]() (ss []*Stream) { func FilterStreams[T IPublisher]() (ss []*Stream) {
Streams.Range(func(_ string, s *Stream) { Streams.Range(func(_ string, s *Stream) {
if _, ok := s.Publisher.(T); ok { if _, ok := s.Publisher.(T); ok {
@@ -350,14 +325,18 @@ func (r *Stream) action(action StreamAction) (ok bool) {
r.timeout.Reset(r.DelayCloseTimeout) r.timeout.Reset(r.DelayCloseTimeout)
} }
case STATE_CLOSED: case STATE_CLOSED:
Streams.Delete(r.Path)
r.timeout.Stop()
r.Subscribers.Dispose()
for !r.actionChan.Close() { for !r.actionChan.Close() {
// 等待channel发送完毕伪自旋锁 // 等待channel发送完毕伪自旋锁
time.Sleep(time.Millisecond * 100) time.Sleep(time.Millisecond * 100)
} }
stateEvent = SEclose{event} stateEvent = SEclose{event}
r.Subscribers.Broadcast(stateEvent) r.Subscribers.Broadcast(stateEvent)
Streams.Delete(r.Path) r.Tracks.Range(func(_ string, t Track) {
r.timeout.Stop() t.Dispose()
})
} }
EventBus <- stateEvent EventBus <- stateEvent
if r.Publisher != nil { if r.Publisher != nil {
@@ -494,140 +473,135 @@ func (s *Stream) run() {
s.action(ACTION_TIMEOUT) s.action(ACTION_TIMEOUT)
} }
case action, ok := <-s.actionChan.C: case action, ok := <-s.actionChan.C:
if !ok {
return
}
timeStart = time.Now() timeStart = time.Now()
if ok { switch v := action.(type) {
switch v := action.(type) { case SubPulse:
case SubPulse: timeOutInfo = zap.String("action", "SubPulse")
timeOutInfo = zap.String("action", "SubPulse") pulseSuber[v] = struct{}{}
pulseSuber[v] = struct{}{} case *util.Promise[IPublisher]:
case *util.Promise[IPublisher]: timeOutInfo = zap.String("action", "Publish")
timeOutInfo = zap.String("action", "Publish") if s.IsClosed() {
if s.IsClosed() { v.Reject(ErrStreamIsClosed)
v.Reject(ErrStreamIsClosed) }
} republish := s.Publisher == v.Value // 重复发布
republish := s.Publisher == v.Value // 重复发布 kicked := !republish && s.Publisher != nil && s.Publisher.IsClosed() // 被踢下线
kicked := !republish && s.Publisher != nil && s.Publisher.IsClosed() // 被踢下线 if !republish {
if !republish { s.Publisher = v.Value
s.Publisher = v.Value }
} if s.action(ACTION_PUBLISH) || republish || kicked {
if s.action(ACTION_PUBLISH) || republish || kicked { v.Resolve()
v.Resolve() if s.Publisher.GetPublisher().Config.InsertSEI {
if s.Publisher.GetPublisher().Config.InsertSEI { if s.Tracks.SEI == nil {
if s.Tracks.SEI == nil { s.Tracks.SEI = track.NewDataTrack[[]byte]("sei")
s.Tracks.SEI = track.NewDataTrack[[]byte]("sei") s.Tracks.SEI.Locker = &sync.Mutex{}
s.Tracks.SEI.Locker = &sync.Mutex{} s.Tracks.SEI.SetStuff(s)
s.Tracks.SEI.SetStuff(s) if s.Tracks.Add("sei", s.Tracks.SEI) {
if s.Tracks.Add("sei", s.Tracks.SEI) { s.Info("sei track added")
s.Info("sei track added")
}
} }
} }
} else {
v.Reject(ErrDuplicatePublish)
} }
case *util.Promise[ISubscriber]: } else {
timeOutInfo = zap.String("action", "Subscribe") v.Reject(ErrDuplicatePublish)
if s.IsClosed() {
v.Reject(ErrStreamIsClosed)
}
suber := v.Value
io := suber.GetSubscriber()
sbConfig := io.Config
waits := &waitTracks{
Promise: v,
}
if ats := io.Args.Get(sbConfig.SubAudioArgName); ats != "" {
waits.audio.Wait(strings.Split(ats, ",")...)
} else if len(sbConfig.SubAudioTracks) > 0 {
waits.audio.Wait(sbConfig.SubAudioTracks...)
} else if sbConfig.SubAudio {
waits.audio.Wait()
}
if vts := io.Args.Get(sbConfig.SubVideoArgName); vts != "" {
waits.video.Wait(strings.Split(vts, ",")...)
} else if len(sbConfig.SubVideoTracks) > 0 {
waits.video.Wait(sbConfig.SubVideoTracks...)
} else if sbConfig.SubVideo {
waits.video.Wait()
}
if dts := io.Args.Get(sbConfig.SubDataArgName); dts != "" {
waits.data.Wait(strings.Split(dts, ",")...)
} else {
// waits.data.Wait()
}
if s.Publisher != nil {
s.Publisher.OnEvent(v) // 通知Publisher有新的订阅者加入在回调中可以去获取订阅者数量
pubConfig := s.Publisher.GetPublisher().Config
s.Tracks.Range(func(name string, t Track) {
waits.Accept(t)
})
if !pubConfig.PubAudio || s.Subscribers.waitAborted {
waits.audio.StopWait()
}
if !pubConfig.PubVideo || s.Subscribers.waitAborted {
waits.video.StopWait()
}
}
s.Subscribers.Add(suber, waits)
if s.Subscribers.Len() == 1 && s.State == STATE_WAITCLOSE {
s.action(ACTION_FIRSTENTER)
}
case Unsubscribe:
timeOutInfo = zap.String("action", "Unsubscribe")
delete(pulseSuber, v)
s.onSuberClose(v)
case TrackRemoved:
timeOutInfo = zap.String("action", "TrackRemoved")
name := v.GetName()
if t, ok := s.Tracks.LoadAndDelete(name); ok {
s.Info("track -1", zap.String("name", name))
s.Subscribers.Broadcast(t)
t.(common.Track).Dispose()
}
case *util.Promise[Track]:
timeOutInfo = zap.String("action", "Track")
if s.State == STATE_WAITPUBLISH {
s.action(ACTION_PUBLISH)
}
pubConfig := s.GetPublisherConfig()
name := v.Value.GetName()
if _, ok := v.Value.(*track.Video); ok && !pubConfig.PubVideo {
v.Reject(ErrTrackMute)
continue
}
if _, ok := v.Value.(*track.Audio); ok && !pubConfig.PubAudio {
v.Reject(ErrTrackMute)
continue
}
if s.Tracks.Add(name, v.Value) {
v.Resolve()
s.Subscribers.OnTrack(v.Value)
if _, ok := v.Value.(*track.Video); ok && !pubConfig.PubAudio {
s.Subscribers.AbortWait()
}
if _, ok := v.Value.(*track.Audio); ok && !pubConfig.PubVideo {
s.Subscribers.AbortWait()
}
// 这里重置的目的是当PublishTimeout设置很大的情况下需要及时取消订阅者的等待
s.timeout.Reset(time.Second * 5)
} else {
v.Reject(ErrBadTrackName)
}
case NoMoreTrack:
s.Subscribers.AbortWait()
case StreamAction:
timeOutInfo = zap.String("action", "StreamAction"+v.String())
s.action(v)
default:
timeOutInfo = zap.String("action", "unknown")
s.Error("unknown action", timeOutInfo)
} }
} else { case *util.Promise[ISubscriber]:
s.Subscribers.Dispose() timeOutInfo = zap.String("action", "Subscribe")
s.Tracks.Range(func(_ string, t Track) { if s.IsClosed() {
t.Dispose() v.Reject(ErrStreamIsClosed)
}) }
return suber := v.Value
io := suber.GetSubscriber()
sbConfig := io.Config
waits := &waitTracks{
Promise: v,
}
if ats := io.Args.Get(sbConfig.SubAudioArgName); ats != "" {
waits.audio.Wait(strings.Split(ats, ",")...)
} else if len(sbConfig.SubAudioTracks) > 0 {
waits.audio.Wait(sbConfig.SubAudioTracks...)
} else if sbConfig.SubAudio {
waits.audio.Wait()
}
if vts := io.Args.Get(sbConfig.SubVideoArgName); vts != "" {
waits.video.Wait(strings.Split(vts, ",")...)
} else if len(sbConfig.SubVideoTracks) > 0 {
waits.video.Wait(sbConfig.SubVideoTracks...)
} else if sbConfig.SubVideo {
waits.video.Wait()
}
if dts := io.Args.Get(sbConfig.SubDataArgName); dts != "" {
waits.data.Wait(strings.Split(dts, ",")...)
} else {
// waits.data.Wait()
}
if s.Publisher != nil {
s.Publisher.OnEvent(v) // 通知Publisher有新的订阅者加入在回调中可以去获取订阅者数量
pubConfig := s.Publisher.GetPublisher().Config
s.Tracks.Range(func(name string, t Track) {
waits.Accept(t)
})
if !pubConfig.PubAudio || s.Subscribers.waitAborted {
waits.audio.StopWait()
}
if !pubConfig.PubVideo || s.Subscribers.waitAborted {
waits.video.StopWait()
}
}
s.Subscribers.Add(suber, waits)
if s.Subscribers.Len() == 1 && s.State == STATE_WAITCLOSE {
s.action(ACTION_FIRSTENTER)
}
case Unsubscribe:
timeOutInfo = zap.String("action", "Unsubscribe")
delete(pulseSuber, v)
s.onSuberClose(v)
case TrackRemoved:
timeOutInfo = zap.String("action", "TrackRemoved")
name := v.GetName()
if t, ok := s.Tracks.LoadAndDelete(name); ok {
s.Info("track -1", zap.String("name", name))
s.Subscribers.Broadcast(t)
t.(common.Track).Dispose()
}
case *util.Promise[Track]:
timeOutInfo = zap.String("action", "Track")
if s.State == STATE_WAITPUBLISH {
s.action(ACTION_PUBLISH)
}
pubConfig := s.GetPublisherConfig()
name := v.Value.GetName()
if _, ok := v.Value.(*track.Video); ok && !pubConfig.PubVideo {
v.Reject(ErrTrackMute)
continue
}
if _, ok := v.Value.(*track.Audio); ok && !pubConfig.PubAudio {
v.Reject(ErrTrackMute)
continue
}
if s.Tracks.Add(name, v.Value) {
v.Resolve()
s.Subscribers.OnTrack(v.Value)
if _, ok := v.Value.(*track.Video); ok && !pubConfig.PubAudio {
s.Subscribers.AbortWait()
}
if _, ok := v.Value.(*track.Audio); ok && !pubConfig.PubVideo {
s.Subscribers.AbortWait()
}
// 这里重置的目的是当PublishTimeout设置很大的情况下需要及时取消订阅者的等待
s.timeout.Reset(time.Second * 5)
} else {
v.Reject(ErrBadTrackName)
}
case NoMoreTrack:
s.Subscribers.AbortWait()
case StreamAction:
timeOutInfo = zap.String("action", "StreamAction"+v.String())
s.action(v)
default:
timeOutInfo = zap.String("action", "unknown")
s.Error("unknown action", timeOutInfo)
} }
} }
} }

View File

@@ -16,7 +16,7 @@ var (
summary SummaryUtil summary SummaryUtil
lastSummary Summary lastSummary Summary
children util.Map[string, *Summary] children util.Map[string, *Summary]
collectLock sync.Mutex collectLock sync.RWMutex
) )
// ServerSummary 系统摘要定义 // ServerSummary 系统摘要定义
type Summary struct { type Summary struct {
@@ -62,8 +62,13 @@ func (s *SummaryUtil) MarshalYAML() (any, error) {
} }
func (s *SummaryUtil) collect() *Summary { func (s *SummaryUtil) collect() *Summary {
collectLock.Lock() if collectLock.TryLock() {
defer collectLock.Unlock() defer collectLock.Unlock()
} else {
collectLock.RLock()
defer collectLock.RUnlock()
return &lastSummary
}
dur := time.Since(s.ts) dur := time.Since(s.ts)
if dur < time.Second { if dur < time.Second {
return &lastSummary return &lastSummary

View File

@@ -102,10 +102,6 @@ type Media struct {
流速控制 流速控制
} }
func (av *Media) Dispose() {
av.Value.Broadcast()
}
func (av *Media) GetFromPool(b util.IBytes) (item *util.ListItem[util.Buffer]) { func (av *Media) GetFromPool(b util.IBytes) (item *util.ListItem[util.Buffer]) {
if b.Reuse() { if b.Reuse() {
item = av.BytesPool.Get(b.Len()) item = av.BytesPool.Get(b.Len())

View File

@@ -60,10 +60,6 @@ func (d *Data[T]) Attach(s IStream) {
} }
} }
func (d *Data[T]) Dispose() {
d.Value.Broadcast()
}
func (d *Data[T]) LastWriteTime() time.Time { func (d *Data[T]) LastWriteTime() time.Time {
return d.LastValue.WriteTime return d.LastValue.WriteTime
} }

View File

@@ -82,7 +82,7 @@ type List[T any] struct {
} }
func (p *List[T]) PushValue(value T) { func (p *List[T]) PushValue(value T) {
p.Push(&ListItem[T]{Value: value}) p.Push(&ListItem[T]{Value: value, reset: true})
} }
func (p *List[T]) Push(item *ListItem[T]) { func (p *List[T]) Push(item *ListItem[T]) {

View File

@@ -309,6 +309,7 @@ func (p BytesPool) Get(size int) (item *ListItem[Buffer]) {
if item == nil { if item == nil {
item = &ListItem[Buffer]{ item = &ListItem[Buffer]{
Value: make(Buffer, size), Value: make(Buffer, size),
reset: true,
} }
} }
return return