diff --git a/audio_track.go b/audio_track.go index 3e43136..e5e13d8 100644 --- a/audio_track.go +++ b/audio_track.go @@ -176,33 +176,18 @@ func (at *AudioTrack) Play(ctx context.Context, onAudio func(AudioPack)) { streamExit := at.Stream.Context.Done() ar := at.Clone() ap := ar.Read().(*AudioPack) - startTimestamp := ap.Timestamp - droped := 0 - var action, send func() - drop := func() { - if at.current().Sequence-ap.Sequence < 4 { - action = send - } else { - droped++ - } - } - send = func() { - if onAudio(ap.Copy(startTimestamp)); at.lastTs-ap.Timestamp > 1000 { - action = drop - } - } var extraExit <-chan struct{} if ctx != nil { extraExit = ctx.Done() } - for action = send; at.Flag != 2; ap = ar.Read().(*AudioPack) { + for startTimestamp := ap.Timestamp; at.Goon(); ap = ar.Read().(*AudioPack) { select { case <-extraExit: return case <-streamExit: return default: - action() + onAudio(ap.Copy(startTimestamp)) ar.MoveNext() } } diff --git a/base_track.go b/base_track.go index 5d47208..db7ce21 100644 --- a/base_track.go +++ b/base_track.go @@ -108,16 +108,17 @@ func (ts *Tracks) WaitTrack(codecs ...string) Track { return nil } } else { - go func() { - for { - if rt, ok := ring.Read().(string); ok { - wait <- rt - ring.MoveNext() - } else { - break - } - } - }() + go ring.ReadLoop(wait) + // go func() { + // for { + // if rt, ok := ring.Read().(string); ok { + // wait <- rt + // ring.MoveNext() + // } else { + // break + // } + // } + // }() for { select { case t := <-wait: diff --git a/hook.go b/hook.go index 20ac62e..67c51e4 100644 --- a/hook.go +++ b/hook.go @@ -27,20 +27,14 @@ func AddHooks(hooks map[string]interface{}) { for name, hook := range hooks { rl, ok := Hooks[name] if !ok { - rl = &RingBuffer{} - rl.Init(4) + rl = NewRingBuffer(4) Hooks[name] = rl } - go func(hooks *RingBuffer, callback interface{}) { - vf := reflect.ValueOf(callback) - if vf.Kind() != reflect.Func { - panic("callback is not a function") - } - for { - vf.Call(hooks.Read().([]reflect.Value)) - hooks.MoveNext() - } - }(rl.Clone(), hook) + vf := reflect.ValueOf(hook) + if vf.Kind() != reflect.Func { + panic("callback is not a function") + } + go rl.Clone().ReadLoop(vf.Call, nil) } hookLocker.Unlock() } @@ -49,8 +43,7 @@ func AddHook(name string, callback interface{}) { hookLocker.Lock() rl, ok := Hooks[name] if !ok { - rl = &RingBuffer{} - rl.Init(4) + rl = NewRingBuffer(4) Hooks[name] = rl } hookLocker.Unlock() @@ -58,9 +51,10 @@ func AddHook(name string, callback interface{}) { if vf.Kind() != reflect.Func { panic("callback is not a function") } - for hooks := rl.Clone(); ; hooks.MoveNext() { - vf.Call(hooks.Read().([]reflect.Value)) - } + rl.Clone().ReadLoop(vf.Call, nil) + // for hooks := rl.Clone(); ; hooks.MoveNext() { + // vf.Call(hooks.Read().([]reflect.Value)) + // } } func AddHookWithContext(ctx context.Context, name string, callback interface{}) { @@ -75,9 +69,10 @@ func AddHookWithContext(ctx context.Context, name string, callback interface{}) if vf.Kind() != reflect.Func { panic("callback is not a function") } - for hooks := rl.Clone(); ctx.Err() == nil; hooks.MoveNext() { - vf.Call(hooks.Read().([]reflect.Value)) - } + rl.Clone().ReadLoop(vf.Call, func() bool { return ctx.Err() == nil }) + // for hooks := rl.Clone(); ctx.Err() == nil; hooks.MoveNext() { + // vf.Call(hooks.Read().([]reflect.Value)) + // } } func TriggerHook(name string, payload ...interface{}) { diff --git a/main.go b/main.go index 92265ae..eef5614 100644 --- a/main.go +++ b/main.go @@ -17,7 +17,7 @@ import ( . "github.com/logrusorgru/aurora" ) -const Version = "3.1.6" +const Version = "3.1.7" var ( config = &struct { diff --git a/ring.go b/ring.go index 1fcc861..ef51866 100644 --- a/ring.go +++ b/ring.go @@ -2,8 +2,8 @@ package engine import ( "container/ring" + "reflect" "sync" - "sync/atomic" // "time" ) @@ -14,26 +14,8 @@ type RingItem struct { type RingBuffer struct { *ring.Ring - // UpdateTime time.Time //更新时间,用于计算是否超时 } -// 可释放的Ring,用于音视频 -type RingDisposable struct { - RingBuffer - Flag int32 // 0:不在写入,1:正在写入,2:已销毁 -} - -// 带锁的Ring,用于Hook -// type RingLock struct { -// RingBuffer -// sync.Mutex -// } -// func (r *RingLock) Write(value interface{}) { -// r.Lock() -// r.RingBuffer.Write(value) -// r.Unlock() -// } - // TODO: 池化,泛型 func NewRingBuffer(n int) (r *RingBuffer) { @@ -44,7 +26,6 @@ func NewRingBuffer(n int) (r *RingBuffer) { func (r *RingBuffer) Init(n int) { r.Ring = ring.New(n) - // r.UpdateTime = time.Now() for x := r.Ring; x.Value == nil; x = x.Next() { x.Value = new(RingItem) } @@ -55,53 +36,22 @@ func (rb RingBuffer) Clone() *RingBuffer { return &rb } -func (r *RingBuffer) SubRing(rr *ring.Ring) *RingBuffer { - r = r.Clone() +func (r RingBuffer) SubRing(rr *ring.Ring) *RingBuffer { r.Ring = rr - return r + return &r } func (r *RingBuffer) Write(value interface{}) { - // r.UpdateTime = time.Now() last := r.Current() last.Value = value r.GetNext().Add(1) last.Done() } -func (r *RingDisposable) Write(value interface{}) { - // r.UpdateTime = time.Now() - last := r.Current() - last.Value = value - if atomic.CompareAndSwapInt32(&r.Flag, 0, 1) { - current := r.GetNext() - current.Add(1) - last.Done() - //Flag不为1代表被Dispose了,但尚未处理Done - if !atomic.CompareAndSwapInt32(&r.Flag, 1, 0) { - current.Done() - } - } -} - -func (r *RingDisposable) Step() { - // r.UpdateTime = time.Now() - last := r.Current() - if atomic.CompareAndSwapInt32(&r.Flag, 0, 1) { - current := r.GetNext() - current.Add(1) - last.Done() - //Flag不为1代表被Dispose了,但尚未处理Done - if !atomic.CompareAndSwapInt32(&r.Flag, 1, 0) { - current.Done() - } - } -} - -func (r *RingBuffer) Read() interface{} { +func (r *RingBuffer) read() reflect.Value { current := r.Current() current.Wait() - return current.Value + return reflect.ValueOf(current.Value) } func (r *RingBuffer) CurrentValue() interface{} { @@ -112,7 +62,6 @@ func (r *RingBuffer) NextValue() interface{} { return r.Next().Value.(*RingItem).Value } - func (r *RingBuffer) Current() *RingItem { return r.Ring.Value.(*RingItem) } @@ -126,22 +75,38 @@ func (r *RingBuffer) GetNext() *RingItem { return r.Current() } -func (r *RingDisposable) Dispose() { +func (r *RingBuffer) Read() interface{} { current := r.Current() - if atomic.CompareAndSwapInt32(&r.Flag, 0, 2) { - current.Done() - } else if atomic.CompareAndSwapInt32(&r.Flag, 1, 2) { - //当前是1代表正在写入,此时变成2,但是Done的任务得交给NextW来处理 - } else if atomic.CompareAndSwapInt32(&r.Flag, 0, 2) { - current.Done() - } + current.Wait() + return current.Value } -// // Timeout 发布者是否超时了 -// func (r *RingBuffer) Timeout(t time.Duration) bool { -// // 如果设置为0则表示永不超时 -// if t == 0 { -// return false -// } -// return time.Since(r.UpdateTime) > t -// } +func (r *RingBuffer) ReadLoop(handler interface{}, goon func() bool) { + if goon == nil { + switch t := reflect.ValueOf(handler); t.Kind() { + case reflect.Chan: + for v := r.read(); ; v = r.read() { + t.Send(v) + r.MoveNext() + } + case reflect.Func: + for args := []reflect.Value{r.read()}; ; args[0] = r.read() { + t.Call(args) + r.MoveNext() + } + } + } else { + switch t := reflect.ValueOf(handler); t.Kind() { + case reflect.Chan: + for v := r.read(); goon(); v = r.read() { + t.Send(v) + r.MoveNext() + } + case reflect.Func: + for args := []reflect.Value{r.read()}; goon(); args[0] = r.read() { + t.Call(args) + r.MoveNext() + } + } + } +} diff --git a/ring_disposable.go b/ring_disposable.go new file mode 100644 index 0000000..1c003fa --- /dev/null +++ b/ring_disposable.go @@ -0,0 +1,72 @@ +package engine + +import ( + "container/ring" + "sync/atomic" +) + +// 可释放的Ring,用于音视频 +type RingDisposable struct { + RingBuffer + Flag *int32 // 0:不在写入,1:正在写入,2:已销毁 +} + +func (rb *RingDisposable) Init(n int) { + var flag int32 + rb.RingBuffer.Init(n) + rb.Flag = &flag +} + +func (rb RingDisposable) Clone() *RingDisposable { + return &rb +} + +func (r RingDisposable) SubRing(rr *ring.Ring) *RingDisposable { + r.Ring = rr + return &r +} + +func (r *RingDisposable) Write(value interface{}) { + last := r.Current() + last.Value = value + if atomic.CompareAndSwapInt32(r.Flag, 0, 1) { + current := r.GetNext() + current.Add(1) + last.Done() + //Flag不为1代表被Dispose了,但尚未处理Done + if !atomic.CompareAndSwapInt32(r.Flag, 1, 0) { + current.Done() + } + } +} + +func (r *RingDisposable) Step() { + last := r.Current() + if atomic.CompareAndSwapInt32(r.Flag, 0, 1) { + current := r.GetNext() + current.Add(1) + last.Done() + //Flag不为1代表被Dispose了,但尚未处理Done + if !atomic.CompareAndSwapInt32(r.Flag, 1, 0) { + current.Done() + } + } +} +func (r *RingDisposable) Dispose() { + current := r.Current() + if atomic.CompareAndSwapInt32(r.Flag, 0, 2) { + current.Done() + } else if atomic.CompareAndSwapInt32(r.Flag, 1, 2) { + //当前是1代表正在写入,此时变成2,但是Done的任务得交给NextW来处理 + } else if atomic.CompareAndSwapInt32(r.Flag, 0, 2) { + current.Done() + } +} + +func (r *RingDisposable) Goon() bool { + return *r.Flag != 2 +} + +func (r *RingDisposable) ReadLoop(handler interface{}) { + r.RingBuffer.ReadLoop(handler, r.Goon) +} diff --git a/subscriber.go b/subscriber.go index 17c17e0..6ace459 100644 --- a/subscriber.go +++ b/subscriber.go @@ -84,11 +84,10 @@ func (s *Subscriber) Play(at *AudioTrack, vt *VideoTrack) { } vr := vt.SubRing(vt.IDRing) //从关键帧开始读取,首屏秒开 ar := at.Clone() - // dropping := false //是否处于丢帧中 vp := vr.Read().(*VideoPack) ap := ar.Read().(*AudioPack) - startTimestamp := vp.Timestamp - for vt.Flag != 2 { + vst, ast := vp.Timestamp, ap.Timestamp + for vt.Goon() { select { case <-extraExit: return @@ -96,25 +95,11 @@ func (s *Subscriber) Play(at *AudioTrack, vt *VideoTrack) { return default: if ap.Timestamp > vp.Timestamp || ap.Timestamp == 0 { - s.OnVideo(vp.Copy(startTimestamp)) - // if !dropping { - // s.OnVideo(vp.Copy(startTimestamp)) - // if vt.lastTs - vp.Timestamp > 1000 { - // dropping = true - // } - // } else if vp.IDR { - // dropping = false - // } + s.OnVideo(vp.Copy(vst)) vr.MoveNext() vp = vr.Read().(*VideoPack) } else { - s.OnAudio(ap.Copy(startTimestamp)) - // if !dropping { - // s.OnAudio(ap.Copy(startTimestamp)) - // if at.CurrentValue().(AVPack).Since(ap.Timestamp) > 1000 { - // dropping = true - // } - // } + s.OnAudio(ap.Copy(ast)) ar.MoveNext() ap = ar.Read().(*AudioPack) } diff --git a/video_track.go b/video_track.go index 6fd06d5..c629a39 100644 --- a/video_track.go +++ b/video_track.go @@ -557,26 +557,14 @@ func (vt *VideoTrack) Play(ctx context.Context, onVideo func(VideoPack)) { } vr := vt.SubRing(vt.IDRing) //从关键帧开始读取,首屏秒开 vp := vr.Read().(*VideoPack) - startTimestamp := vp.Timestamp - var action, send func() - drop := func() { - if vp.IDR { - action = send - } - } - send = func() { - if onVideo(vp.Copy(startTimestamp)); vt.lastTs-vp.Timestamp > 1000 { - action = drop - } - } - for action = send; vt.Flag != 2; vp = vr.Read().(*VideoPack) { + for startTimestamp := vp.Timestamp; vt.Goon(); vp = vr.Read().(*VideoPack) { select { case <-extraExit: return case <-streamExit: return default: - action() + onVideo(vp.Copy(startTimestamp)) vr.MoveNext() } }