feat: add waitGroup

This commit is contained in:
langhuihui
2023-06-08 09:34:00 +08:00
parent 28a51b9b60
commit ac7a26c6e2
5 changed files with 13 additions and 2 deletions

View File

@@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"io" "io"
"net" "net"
"sync"
"time" "time"
"github.com/pion/rtp" "github.com/pion/rtp"
@@ -58,7 +59,8 @@ type DataFrame[T any] struct {
type AVFrame struct { type AVFrame struct {
BaseFrame BaseFrame
IFrame bool IFrame bool
CanRead bool `json:"-" yaml:"-"` CanRead bool `json:"-" yaml:"-"`
WG sync.WaitGroup `json:"-" yaml:"-"`
PTS time.Duration PTS time.Duration
DTS time.Duration DTS time.Duration
Timestamp time.Duration // 绝对时间戳 Timestamp time.Duration // 绝对时间戳

View File

@@ -53,6 +53,8 @@ func (conf *GlobalConfig) API_summary(rw http.ResponseWriter, r *http.Request) {
if !summary.Running() { if !summary.Running() {
summary.collect() summary.collect()
} }
summary.rw.RLock()
defer summary.rw.RUnlock()
if y { if y {
if err := yaml.NewEncoder(rw).Encode(&summary); err != nil { if err := yaml.NewEncoder(rw).Encode(&summary); err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError) http.Error(rw, err.Error(), http.StatusInternalServerError)

View File

@@ -1,6 +1,7 @@
package engine package engine
import ( import (
"sync"
"sync/atomic" "sync/atomic"
"time" "time"
@@ -39,6 +40,7 @@ type Summary struct {
Streams []StreamSummay Streams []StreamSummay
lastNetWork []net.IOCountersStat lastNetWork []net.IOCountersStat
ref atomic.Int32 ref atomic.Int32
rw sync.RWMutex
} }
// NetWorkInfo 网速信息 // NetWorkInfo 网速信息
@@ -92,6 +94,8 @@ func (s *Summary) Report(slave *Summary) {
} }
func (s *Summary) collect() *Summary { func (s *Summary) collect() *Summary {
s.rw.Lock()
defer s.rw.Unlock()
v, _ := mem.VirtualMemory() v, _ := mem.VirtualMemory()
d, _ := disk.Usage("/") d, _ := disk.Usage("/")
nv, _ := net.IOCounters(true) nv, _ := net.IOCounters(true)

View File

@@ -153,6 +153,7 @@ func (av *Media) SetStuff(stuff ...any) {
switch v := s.(type) { switch v := s.(type) {
case int: case int:
av.Init(v) av.Init(v)
av.CurrentFrame().WG.Add(1)
av.SSRC = uint32(uintptr(unsafe.Pointer(av))) av.SSRC = uint32(uintptr(unsafe.Pointer(av)))
av.等待上限 = config.Global.SpeedLimit av.等待上限 = config.Global.SpeedLimit
case uint32: case uint32:
@@ -310,9 +311,11 @@ func (av *Media) Flush() {
preValue = curValue preValue = curValue
curValue = av.MoveNext() curValue = av.MoveNext()
curValue.CanRead = false curValue.CanRead = false
curValue.WG.Add(1)
curValue.Reset() curValue.Reset()
curValue.Sequence = av.MoveCount curValue.Sequence = av.MoveCount
preValue.CanRead = true preValue.CanRead = true
preValue.WG.Done()
} }
func deltaTS(curTs time.Duration, preTs time.Duration) time.Duration { func deltaTS(curTs time.Duration, preTs time.Duration) time.Duration {

View File

@@ -58,7 +58,7 @@ func NewAVRingReader(t *Media, poll time.Duration) *AVRingReader {
} }
func (r *AVRingReader) ReadFrame() *common.AVFrame { func (r *AVRingReader) ReadFrame() *common.AVFrame {
for r.Frame = &r.Value; r.ctx.Err() == nil && !r.Frame.CanRead; r.wait() { for r.Frame = &r.Value; r.ctx.Err() == nil && !r.Frame.CanRead; r.Frame.WG.Wait() {
} }
// 超过一半的缓冲区大小说明Reader太慢需要丢帧 // 超过一半的缓冲区大小说明Reader太慢需要丢帧
if r.State == READSTATE_NORMAL && r.Track.LastValue.Sequence-r.Frame.Sequence > uint32(r.Track.Size/2) && r.Track.IDRing != nil && r.Track.IDRing.Value.Sequence > r.Frame.Sequence { if r.State == READSTATE_NORMAL && r.Track.LastValue.Sequence-r.Frame.Sequence > uint32(r.Track.Size/2) && r.Track.IDRing != nil && r.Track.IDRing.Value.Sequence > r.Frame.Sequence {