diff --git a/common/frame.go b/common/frame.go index 4eefdfa..0c71a6d 100644 --- a/common/frame.go +++ b/common/frame.go @@ -4,6 +4,7 @@ import ( "bytes" "io" "net" + "sync" "time" "github.com/pion/rtp" @@ -58,7 +59,8 @@ type DataFrame[T any] struct { type AVFrame struct { BaseFrame IFrame bool - CanRead bool `json:"-" yaml:"-"` + CanRead bool `json:"-" yaml:"-"` + WG sync.WaitGroup `json:"-" yaml:"-"` PTS time.Duration DTS time.Duration Timestamp time.Duration // 绝对时间戳 diff --git a/http.go b/http.go index 8ccda43..80b1fb8 100644 --- a/http.go +++ b/http.go @@ -53,6 +53,8 @@ func (conf *GlobalConfig) API_summary(rw http.ResponseWriter, r *http.Request) { if !summary.Running() { summary.collect() } + summary.rw.RLock() + defer summary.rw.RUnlock() if y { if err := yaml.NewEncoder(rw).Encode(&summary); err != nil { http.Error(rw, err.Error(), http.StatusInternalServerError) diff --git a/summary.go b/summary.go index 675ba5e..c61b7e6 100644 --- a/summary.go +++ b/summary.go @@ -1,6 +1,7 @@ package engine import ( + "sync" "sync/atomic" "time" @@ -39,6 +40,7 @@ type Summary struct { Streams []StreamSummay lastNetWork []net.IOCountersStat ref atomic.Int32 + rw sync.RWMutex } // NetWorkInfo 网速信息 @@ -92,6 +94,8 @@ func (s *Summary) Report(slave *Summary) { } func (s *Summary) collect() *Summary { + s.rw.Lock() + defer s.rw.Unlock() v, _ := mem.VirtualMemory() d, _ := disk.Usage("/") nv, _ := net.IOCounters(true) diff --git a/track/base.go b/track/base.go index 687a241..3d53b12 100644 --- a/track/base.go +++ b/track/base.go @@ -153,6 +153,7 @@ func (av *Media) SetStuff(stuff ...any) { switch v := s.(type) { case int: av.Init(v) + av.CurrentFrame().WG.Add(1) av.SSRC = uint32(uintptr(unsafe.Pointer(av))) av.等待上限 = config.Global.SpeedLimit case uint32: @@ -310,9 +311,11 @@ func (av *Media) Flush() { preValue = curValue curValue = av.MoveNext() curValue.CanRead = false + curValue.WG.Add(1) curValue.Reset() curValue.Sequence = av.MoveCount preValue.CanRead = true + preValue.WG.Done() } func deltaTS(curTs time.Duration, preTs time.Duration) time.Duration { diff --git a/track/reader-av.go b/track/reader-av.go index aaafdc8..7af69d7 100644 --- a/track/reader-av.go +++ b/track/reader-av.go @@ -58,7 +58,7 @@ func NewAVRingReader(t *Media, poll time.Duration) *AVRingReader { } func (r *AVRingReader) ReadFrame() *common.AVFrame { - for r.Frame = &r.Value; r.ctx.Err() == nil && !r.Frame.CanRead; r.wait() { + for r.Frame = &r.Value; r.ctx.Err() == nil && !r.Frame.CanRead; r.Frame.WG.Wait() { } // 超过一半的缓冲区大小,说明Reader太慢,需要丢帧 if r.State == READSTATE_NORMAL && r.Track.LastValue.Sequence-r.Frame.Sequence > uint32(r.Track.Size/2) && r.Track.IDRing != nil && r.Track.IDRing.Value.Sequence > r.Frame.Sequence {