优化代码

This commit is contained in:
李宇翔
2021-07-15 21:10:44 +08:00
parent d7893239f2
commit ad99534f84
8 changed files with 144 additions and 153 deletions

View File

@@ -176,33 +176,18 @@ func (at *AudioTrack) Play(ctx context.Context, onAudio func(AudioPack)) {
streamExit := at.Stream.Context.Done()
ar := at.Clone()
ap := ar.Read().(*AudioPack)
startTimestamp := ap.Timestamp
droped := 0
var action, send func()
drop := func() {
if at.current().Sequence-ap.Sequence < 4 {
action = send
} else {
droped++
}
}
send = func() {
if onAudio(ap.Copy(startTimestamp)); at.lastTs-ap.Timestamp > 1000 {
action = drop
}
}
var extraExit <-chan struct{}
if ctx != nil {
extraExit = ctx.Done()
}
for action = send; at.Flag != 2; ap = ar.Read().(*AudioPack) {
for startTimestamp := ap.Timestamp; at.Goon(); ap = ar.Read().(*AudioPack) {
select {
case <-extraExit:
return
case <-streamExit:
return
default:
action()
onAudio(ap.Copy(startTimestamp))
ar.MoveNext()
}
}

View File

@@ -108,16 +108,17 @@ func (ts *Tracks) WaitTrack(codecs ...string) Track {
return nil
}
} else {
go func() {
for {
if rt, ok := ring.Read().(string); ok {
wait <- rt
ring.MoveNext()
} else {
break
}
}
}()
go ring.ReadLoop(wait)
// go func() {
// for {
// if rt, ok := ring.Read().(string); ok {
// wait <- rt
// ring.MoveNext()
// } else {
// break
// }
// }
// }()
for {
select {
case t := <-wait:

29
hook.go
View File

@@ -27,20 +27,14 @@ func AddHooks(hooks map[string]interface{}) {
for name, hook := range hooks {
rl, ok := Hooks[name]
if !ok {
rl = &RingBuffer{}
rl.Init(4)
rl = NewRingBuffer(4)
Hooks[name] = rl
}
go func(hooks *RingBuffer, callback interface{}) {
vf := reflect.ValueOf(callback)
vf := reflect.ValueOf(hook)
if vf.Kind() != reflect.Func {
panic("callback is not a function")
}
for {
vf.Call(hooks.Read().([]reflect.Value))
hooks.MoveNext()
}
}(rl.Clone(), hook)
go rl.Clone().ReadLoop(vf.Call, nil)
}
hookLocker.Unlock()
}
@@ -49,8 +43,7 @@ func AddHook(name string, callback interface{}) {
hookLocker.Lock()
rl, ok := Hooks[name]
if !ok {
rl = &RingBuffer{}
rl.Init(4)
rl = NewRingBuffer(4)
Hooks[name] = rl
}
hookLocker.Unlock()
@@ -58,9 +51,10 @@ func AddHook(name string, callback interface{}) {
if vf.Kind() != reflect.Func {
panic("callback is not a function")
}
for hooks := rl.Clone(); ; hooks.MoveNext() {
vf.Call(hooks.Read().([]reflect.Value))
}
rl.Clone().ReadLoop(vf.Call, nil)
// for hooks := rl.Clone(); ; hooks.MoveNext() {
// vf.Call(hooks.Read().([]reflect.Value))
// }
}
func AddHookWithContext(ctx context.Context, name string, callback interface{}) {
@@ -75,9 +69,10 @@ func AddHookWithContext(ctx context.Context, name string, callback interface{})
if vf.Kind() != reflect.Func {
panic("callback is not a function")
}
for hooks := rl.Clone(); ctx.Err() == nil; hooks.MoveNext() {
vf.Call(hooks.Read().([]reflect.Value))
}
rl.Clone().ReadLoop(vf.Call, func() bool { return ctx.Err() == nil })
// for hooks := rl.Clone(); ctx.Err() == nil; hooks.MoveNext() {
// vf.Call(hooks.Read().([]reflect.Value))
// }
}
func TriggerHook(name string, payload ...interface{}) {

View File

@@ -17,7 +17,7 @@ import (
. "github.com/logrusorgru/aurora"
)
const Version = "3.1.6"
const Version = "3.1.7"
var (
config = &struct {

109
ring.go
View File

@@ -2,8 +2,8 @@ package engine
import (
"container/ring"
"reflect"
"sync"
"sync/atomic"
// "time"
)
@@ -14,26 +14,8 @@ type RingItem struct {
type RingBuffer struct {
*ring.Ring
// UpdateTime time.Time //更新时间,用于计算是否超时
}
// 可释放的Ring用于音视频
type RingDisposable struct {
RingBuffer
Flag int32 // 0:不在写入1正在写入2已销毁
}
// 带锁的Ring用于Hook
// type RingLock struct {
// RingBuffer
// sync.Mutex
// }
// func (r *RingLock) Write(value interface{}) {
// r.Lock()
// r.RingBuffer.Write(value)
// r.Unlock()
// }
// TODO: 池化,泛型
func NewRingBuffer(n int) (r *RingBuffer) {
@@ -44,7 +26,6 @@ func NewRingBuffer(n int) (r *RingBuffer) {
func (r *RingBuffer) Init(n int) {
r.Ring = ring.New(n)
// r.UpdateTime = time.Now()
for x := r.Ring; x.Value == nil; x = x.Next() {
x.Value = new(RingItem)
}
@@ -55,53 +36,22 @@ func (rb RingBuffer) Clone() *RingBuffer {
return &rb
}
func (r *RingBuffer) SubRing(rr *ring.Ring) *RingBuffer {
r = r.Clone()
func (r RingBuffer) SubRing(rr *ring.Ring) *RingBuffer {
r.Ring = rr
return r
return &r
}
func (r *RingBuffer) Write(value interface{}) {
// r.UpdateTime = time.Now()
last := r.Current()
last.Value = value
r.GetNext().Add(1)
last.Done()
}
func (r *RingDisposable) Write(value interface{}) {
// r.UpdateTime = time.Now()
last := r.Current()
last.Value = value
if atomic.CompareAndSwapInt32(&r.Flag, 0, 1) {
current := r.GetNext()
current.Add(1)
last.Done()
//Flag不为1代表被Dispose了但尚未处理Done
if !atomic.CompareAndSwapInt32(&r.Flag, 1, 0) {
current.Done()
}
}
}
func (r *RingDisposable) Step() {
// r.UpdateTime = time.Now()
last := r.Current()
if atomic.CompareAndSwapInt32(&r.Flag, 0, 1) {
current := r.GetNext()
current.Add(1)
last.Done()
//Flag不为1代表被Dispose了但尚未处理Done
if !atomic.CompareAndSwapInt32(&r.Flag, 1, 0) {
current.Done()
}
}
}
func (r *RingBuffer) Read() interface{} {
func (r *RingBuffer) read() reflect.Value {
current := r.Current()
current.Wait()
return current.Value
return reflect.ValueOf(current.Value)
}
func (r *RingBuffer) CurrentValue() interface{} {
@@ -112,7 +62,6 @@ func (r *RingBuffer) NextValue() interface{} {
return r.Next().Value.(*RingItem).Value
}
func (r *RingBuffer) Current() *RingItem {
return r.Ring.Value.(*RingItem)
}
@@ -126,22 +75,38 @@ func (r *RingBuffer) GetNext() *RingItem {
return r.Current()
}
func (r *RingDisposable) Dispose() {
func (r *RingBuffer) Read() interface{} {
current := r.Current()
if atomic.CompareAndSwapInt32(&r.Flag, 0, 2) {
current.Done()
} else if atomic.CompareAndSwapInt32(&r.Flag, 1, 2) {
//当前是1代表正在写入此时变成2但是Done的任务得交给NextW来处理
} else if atomic.CompareAndSwapInt32(&r.Flag, 0, 2) {
current.Done()
}
current.Wait()
return current.Value
}
// // Timeout 发布者是否超时了
// func (r *RingBuffer) Timeout(t time.Duration) bool {
// // 如果设置为0则表示永不超时
// if t == 0 {
// return false
// }
// return time.Since(r.UpdateTime) > t
// }
func (r *RingBuffer) ReadLoop(handler interface{}, goon func() bool) {
if goon == nil {
switch t := reflect.ValueOf(handler); t.Kind() {
case reflect.Chan:
for v := r.read(); ; v = r.read() {
t.Send(v)
r.MoveNext()
}
case reflect.Func:
for args := []reflect.Value{r.read()}; ; args[0] = r.read() {
t.Call(args)
r.MoveNext()
}
}
} else {
switch t := reflect.ValueOf(handler); t.Kind() {
case reflect.Chan:
for v := r.read(); goon(); v = r.read() {
t.Send(v)
r.MoveNext()
}
case reflect.Func:
for args := []reflect.Value{r.read()}; goon(); args[0] = r.read() {
t.Call(args)
r.MoveNext()
}
}
}
}

72
ring_disposable.go Normal file
View File

@@ -0,0 +1,72 @@
package engine
import (
"container/ring"
"sync/atomic"
)
// 可释放的Ring用于音视频
type RingDisposable struct {
RingBuffer
Flag *int32 // 0:不在写入1正在写入2已销毁
}
func (rb *RingDisposable) Init(n int) {
var flag int32
rb.RingBuffer.Init(n)
rb.Flag = &flag
}
func (rb RingDisposable) Clone() *RingDisposable {
return &rb
}
func (r RingDisposable) SubRing(rr *ring.Ring) *RingDisposable {
r.Ring = rr
return &r
}
func (r *RingDisposable) Write(value interface{}) {
last := r.Current()
last.Value = value
if atomic.CompareAndSwapInt32(r.Flag, 0, 1) {
current := r.GetNext()
current.Add(1)
last.Done()
//Flag不为1代表被Dispose了但尚未处理Done
if !atomic.CompareAndSwapInt32(r.Flag, 1, 0) {
current.Done()
}
}
}
func (r *RingDisposable) Step() {
last := r.Current()
if atomic.CompareAndSwapInt32(r.Flag, 0, 1) {
current := r.GetNext()
current.Add(1)
last.Done()
//Flag不为1代表被Dispose了但尚未处理Done
if !atomic.CompareAndSwapInt32(r.Flag, 1, 0) {
current.Done()
}
}
}
func (r *RingDisposable) Dispose() {
current := r.Current()
if atomic.CompareAndSwapInt32(r.Flag, 0, 2) {
current.Done()
} else if atomic.CompareAndSwapInt32(r.Flag, 1, 2) {
//当前是1代表正在写入此时变成2但是Done的任务得交给NextW来处理
} else if atomic.CompareAndSwapInt32(r.Flag, 0, 2) {
current.Done()
}
}
func (r *RingDisposable) Goon() bool {
return *r.Flag != 2
}
func (r *RingDisposable) ReadLoop(handler interface{}) {
r.RingBuffer.ReadLoop(handler, r.Goon)
}

View File

@@ -84,11 +84,10 @@ func (s *Subscriber) Play(at *AudioTrack, vt *VideoTrack) {
}
vr := vt.SubRing(vt.IDRing) //从关键帧开始读取,首屏秒开
ar := at.Clone()
// dropping := false //是否处于丢帧中
vp := vr.Read().(*VideoPack)
ap := ar.Read().(*AudioPack)
startTimestamp := vp.Timestamp
for vt.Flag != 2 {
vst, ast := vp.Timestamp, ap.Timestamp
for vt.Goon() {
select {
case <-extraExit:
return
@@ -96,25 +95,11 @@ func (s *Subscriber) Play(at *AudioTrack, vt *VideoTrack) {
return
default:
if ap.Timestamp > vp.Timestamp || ap.Timestamp == 0 {
s.OnVideo(vp.Copy(startTimestamp))
// if !dropping {
// s.OnVideo(vp.Copy(startTimestamp))
// if vt.lastTs - vp.Timestamp > 1000 {
// dropping = true
// }
// } else if vp.IDR {
// dropping = false
// }
s.OnVideo(vp.Copy(vst))
vr.MoveNext()
vp = vr.Read().(*VideoPack)
} else {
s.OnAudio(ap.Copy(startTimestamp))
// if !dropping {
// s.OnAudio(ap.Copy(startTimestamp))
// if at.CurrentValue().(AVPack).Since(ap.Timestamp) > 1000 {
// dropping = true
// }
// }
s.OnAudio(ap.Copy(ast))
ar.MoveNext()
ap = ar.Read().(*AudioPack)
}

View File

@@ -557,26 +557,14 @@ func (vt *VideoTrack) Play(ctx context.Context, onVideo func(VideoPack)) {
}
vr := vt.SubRing(vt.IDRing) //从关键帧开始读取,首屏秒开
vp := vr.Read().(*VideoPack)
startTimestamp := vp.Timestamp
var action, send func()
drop := func() {
if vp.IDR {
action = send
}
}
send = func() {
if onVideo(vp.Copy(startTimestamp)); vt.lastTs-vp.Timestamp > 1000 {
action = drop
}
}
for action = send; vt.Flag != 2; vp = vr.Read().(*VideoPack) {
for startTimestamp := vp.Timestamp; vt.Goon(); vp = vr.Read().(*VideoPack) {
select {
case <-extraExit:
return
case <-streamExit:
return
default:
action()
onVideo(vp.Copy(startTimestamp))
vr.MoveNext()
}
}