From 3a16fe78838dbba8facd2feb7c7536dc4a287ac6 Mon Sep 17 00:00:00 2001 From: dexter <178529795@qq.com> Date: Thu, 19 Aug 2021 14:38:05 +0800 Subject: [PATCH] =?UTF-8?q?GOP=E8=BF=BD=E5=B8=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ring.go | 104 ++++++++++++++++++++++++------------------------- ring_av.go | 40 ++++--------------- subscriber.go | 16 ++++++-- video_track.go | 14 ++++++- 4 files changed, 84 insertions(+), 90 deletions(-) diff --git a/ring.go b/ring.go index 6857e39..e0ef3c9 100644 --- a/ring.go +++ b/ring.go @@ -12,7 +12,7 @@ import ( type DataItem struct { Timestamp time.Time Sequence int - Value interface{} + Value interface{} } // TODO: 池化,泛型 @@ -28,123 +28,123 @@ type RingBuffer struct { context.Context } -func (r *RingBuffer) Init(ctx context.Context, n int) *RingBuffer { +func (rb *RingBuffer) Init(ctx context.Context, n int) *RingBuffer { var flag int32 - if r == nil { - r = &RingBuffer{Context: ctx, Ring: ring.New(n), Flag: &flag} + if rb == nil { + rb = &RingBuffer{Context: ctx, Ring: ring.New(n), Flag: &flag} } else { - r.Ring = ring.New(n) - r.Context = ctx - r.Flag = &flag + rb.Ring = ring.New(n) + rb.Context = ctx + rb.Flag = &flag } - for x := r.Ring; x.Value == nil; x = x.Next() { + for x := rb.Ring; x.Value == nil; x = x.Next() { x.Value = new(LockItem) } - r.Current().Lock() - return r + rb.Current().Lock() + return rb } func (rb RingBuffer) Clone() *RingBuffer { return &rb } -func (r RingBuffer) SubRing(rr *ring.Ring) *RingBuffer { - r.Ring = rr - return &r +func (rb RingBuffer) SubRing(rr *ring.Ring) *RingBuffer { + rb.Ring = rr + return &rb } -func (r *RingBuffer) CurrentValue() interface{} { - return r.Current().Value +func (rb *RingBuffer) CurrentValue() interface{} { + return rb.Current().Value } -func (r *RingBuffer) NextValue() interface{} { - return r.Next().Value.(*LockItem).Value +func (rb *RingBuffer) NextValue() interface{} { + return rb.Next().Value.(*LockItem).Value } -func (r *RingBuffer) Current() *LockItem { - return r.Ring.Value.(*LockItem) +func (rb *RingBuffer) Current() *LockItem { + return rb.Ring.Value.(*LockItem) } -func (r *RingBuffer) MoveNext() { - r.Ring = r.Next() +func (rb *RingBuffer) MoveNext() { + rb.Ring = rb.Next() } -func (r *RingBuffer) GetNext() *LockItem { - r.MoveNext() - return r.Current() +func (rb *RingBuffer) GetNext() *LockItem { + rb.MoveNext() + return rb.Current() } -func (r *RingBuffer) Read() interface{} { - current := r.Current() +func (rb *RingBuffer) Read() interface{} { + current := rb.Current() current.RLock() defer current.RUnlock() return current.Value } -func (r *RingBuffer) Step() { - last := r.Current() - if atomic.CompareAndSwapInt32(r.Flag, 0, 1) { - current := r.GetNext() +func (rb *RingBuffer) Step() { + last := rb.Current() + if atomic.CompareAndSwapInt32(rb.Flag, 0, 1) { + current := rb.GetNext() current.Lock() last.Unlock() //Flag不为1代表被Dispose了,但尚未处理Done - if !atomic.CompareAndSwapInt32(r.Flag, 1, 0) { + if !atomic.CompareAndSwapInt32(rb.Flag, 1, 0) { current.Unlock() } } } -func (r *RingBuffer) Write(value interface{}) { - last := r.Current() +func (rb *RingBuffer) Write(value interface{}) { + last := rb.Current() last.Value = value - if atomic.CompareAndSwapInt32(r.Flag, 0, 1) { - current := r.GetNext() + if atomic.CompareAndSwapInt32(rb.Flag, 0, 1) { + current := rb.GetNext() current.Lock() last.Unlock() //Flag不为1代表被Dispose了,但尚未处理Done - if !atomic.CompareAndSwapInt32(r.Flag, 1, 0) { + if !atomic.CompareAndSwapInt32(rb.Flag, 1, 0) { current.Unlock() } } } -func (r *RingBuffer) Dispose() { - current := r.Current() - if atomic.CompareAndSwapInt32(r.Flag, 0, 2) { +func (rb *RingBuffer) Dispose() { + current := rb.Current() + if atomic.CompareAndSwapInt32(rb.Flag, 0, 2) { current.Unlock() - } else if atomic.CompareAndSwapInt32(r.Flag, 1, 2) { + } else if atomic.CompareAndSwapInt32(rb.Flag, 1, 2) { //当前是1代表正在写入,此时变成2,但是Done的任务得交给NextW来处理 - } else if atomic.CompareAndSwapInt32(r.Flag, 0, 2) { + } else if atomic.CompareAndSwapInt32(rb.Flag, 0, 2) { current.Unlock() } } -func (r *RingBuffer) read() reflect.Value { - return reflect.ValueOf(r.Read()) +func (rb *RingBuffer) read() reflect.Value { + return reflect.ValueOf(rb.Read()) } -func (r *RingBuffer) nextRead() reflect.Value { - r.MoveNext() - return r.read() +func (rb *RingBuffer) nextRead() reflect.Value { + rb.MoveNext() + return rb.read() } // ReadLoop 循环读取,采用了反射机制,不适用高性能场景 // handler入参可以传入回调函数或者channel -func (r *RingBuffer) ReadLoop(handler interface{}) { - r.ReadLoopConditional(handler, func() bool { - return r.Err() == nil && *r.Flag != 2 +func (rb *RingBuffer) ReadLoop(handler interface{}) { + rb.ReadLoopConditional(handler, func() bool { + return rb.Err() == nil && *rb.Flag != 2 }) } // goon判断函数用来判断是否继续读取,返回false将终止循环 -func (r *RingBuffer) ReadLoopConditional(handler interface{}, goon func() bool) { +func (rb *RingBuffer) ReadLoopConditional(handler interface{}, goon func() bool) { switch t := reflect.ValueOf(handler); t.Kind() { case reflect.Chan: - for v := r.read(); goon(); v = r.nextRead() { + for v := rb.read(); goon(); v = rb.nextRead() { t.Send(v) } case reflect.Func: - for args := []reflect.Value{r.read()}; goon(); args[0] = r.nextRead() { + for args := []reflect.Value{rb.read()}; goon(); args[0] = rb.nextRead() { t.Call(args) } } diff --git a/ring_av.go b/ring_av.go index e2f2c5e..9baa1f6 100644 --- a/ring_av.go +++ b/ring_av.go @@ -3,7 +3,6 @@ package engine import ( "container/ring" "context" - "reflect" "runtime" "time" ) @@ -11,8 +10,8 @@ import ( type AVItem struct { Timestamp uint32 Sequence int - Value interface{} - canRead bool + Value interface{} + canRead bool } func (p *AVItem) Since(ts uint32) uint32 { @@ -36,8 +35,8 @@ func (r *AVRing) Init(ctx context.Context, n int) *AVRing { } return r } -func (rb AVRing) Clone() *AVRing { - return &rb +func (r AVRing) Clone() *AVRing { + return &r } func (r AVRing) SubRing(rr *ring.Ring) *AVRing { @@ -65,19 +64,6 @@ func (r *AVRing) wait() { } } -func (r *AVRing) read() reflect.Value { - current := r.Current() - for r.Err() == nil && !current.canRead { - r.wait() - } - return reflect.ValueOf(current.Value) -} - -func (r *AVRing) nextRead() reflect.Value { - r.MoveNext() - return r.read() -} - func (r *AVRing) CurrentValue() interface{} { return r.Current().Value } @@ -93,6 +79,9 @@ func (r *AVRing) NextRead() (item *AVItem, value interface{}) { func (r *AVRing) NextValue() interface{} { return r.Next().Value.(*AVItem).Value } +func (r *AVRing) PreItem() *AVItem { + return r.Prev().Value.(*AVItem) +} func (r *AVRing) GetNext() *AVItem { r.MoveNext() return r.Current() @@ -104,18 +93,3 @@ func (r *AVRing) Read() (item *AVItem, value interface{}) { } return current, current.Value } - -// ReadLoop 循环读取,采用了反射机制,不适用高性能场景 -// handler入参可以传入回调函数或者channel -func (r *AVRing) ReadLoop(handler interface{}) { - switch t := reflect.ValueOf(handler); t.Kind() { - case reflect.Chan: - for v := r.read(); r.Err() == nil; v = r.nextRead() { - t.Send(v) - } - case reflect.Func: - for args := []reflect.Value{r.read()}; r.Err() == nil; args[0] = r.nextRead() { - t.Call(args) - } - } -} diff --git a/subscriber.go b/subscriber.go index 9e11f20..1bb1def 100644 --- a/subscriber.go +++ b/subscriber.go @@ -82,11 +82,13 @@ func (s *Subscriber) Play(at *AudioTrack, vt *VideoTrack) { case <-extraExit: //可能等不到关键帧就退出了 return } - vr := vt.SubRing(vt.IDRing) //从关键帧开始读取,首屏秒开 + vr := vt.SubRing(vt.IDRing) //从关键帧开始读取,首屏秒开 + realSt := vt.PreItem().Timestamp // 当前时间戳 ar := at.Clone() iv, vp := vr.Read() ia, ap := ar.Read() - vst, ast := iv.Timestamp, ia.Timestamp + vst := iv.Timestamp + chase := true for { select { case <-extraExit: @@ -96,10 +98,18 @@ func (s *Subscriber) Play(at *AudioTrack, vt *VideoTrack) { default: if ia.Timestamp > iv.Timestamp || ia.Timestamp == 0 { s.OnVideo(iv.Timestamp-vst, vp.(*VideoPack)) + if chase { + if vst < realSt-10 { + vst += 10 + } else { + vst = realSt + chase = false + } + } vr.MoveNext() iv, vp = vr.Read() } else { - s.OnAudio(ia.Timestamp-ast, ap.(*AudioPack)) + s.OnAudio(ia.Timestamp-vst, ap.(*AudioPack)) ar.MoveNext() ia, ap = ar.Read() } diff --git a/video_track.go b/video_track.go index b4f08a7..faa92d8 100644 --- a/video_track.go +++ b/video_track.go @@ -384,9 +384,11 @@ func (vt *VideoTrack) Play(onVideo func(uint32, *VideoPack), exit1, exit2 <-chan case <-exit2: //可能等不到关键帧就退出了 return } - vr := vt.SubRing(vt.IDRing) //从关键帧开始读取,首屏秒开 + vr := vt.SubRing(vt.IDRing) //从关键帧开始读取,首屏秒开 + realSt := vt.PreItem().Timestamp // 当前时间戳 item, vp := vr.Read() - for startTimestamp := item.Timestamp; ; item, vp = vr.Read() { + startTimestamp := item.Timestamp + for chase := true; ; item, vp = vr.Read() { select { case <-exit1: return @@ -394,6 +396,14 @@ func (vt *VideoTrack) Play(onVideo func(uint32, *VideoPack), exit1, exit2 <-chan return default: onVideo(item.Timestamp-startTimestamp, vp.(*VideoPack)) + if chase { + if startTimestamp < realSt-10 { + startTimestamp += 10 + } else { + startTimestamp = realSt + chase = false + } + } vr.MoveNext() } }