mirror of
https://github.com/Monibuca/engine.git
synced 2025-10-25 01:10:29 +08:00
使用反射改造hook
This commit is contained in:
37
hook.go
37
hook.go
@@ -2,6 +2,7 @@ package engine
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"reflect"
|
||||||
"sync"
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -21,7 +22,7 @@ const (
|
|||||||
var Hooks = make(map[string]*RingBuffer)
|
var Hooks = make(map[string]*RingBuffer)
|
||||||
var hookLocker sync.Mutex
|
var hookLocker sync.Mutex
|
||||||
|
|
||||||
func AddHooks(hooks map[string]func(interface{})) {
|
func AddHooks(hooks map[string]interface{}) {
|
||||||
hookLocker.Lock()
|
hookLocker.Lock()
|
||||||
for name, hook := range hooks {
|
for name, hook := range hooks {
|
||||||
rl, ok := Hooks[name]
|
rl, ok := Hooks[name]
|
||||||
@@ -30,9 +31,13 @@ func AddHooks(hooks map[string]func(interface{})) {
|
|||||||
rl.Init(4)
|
rl.Init(4)
|
||||||
Hooks[name] = rl
|
Hooks[name] = rl
|
||||||
}
|
}
|
||||||
go func(hooks *RingBuffer, callback func(interface{})) {
|
go func(hooks *RingBuffer, callback interface{}) {
|
||||||
|
vf := reflect.ValueOf(callback)
|
||||||
|
if vf.Kind() != reflect.Func {
|
||||||
|
panic("callback is not a function")
|
||||||
|
}
|
||||||
for {
|
for {
|
||||||
callback(hooks.Read())
|
vf.Call(hooks.Read().([]reflect.Value))
|
||||||
hooks.MoveNext()
|
hooks.MoveNext()
|
||||||
}
|
}
|
||||||
}(rl.Clone(), hook)
|
}(rl.Clone(), hook)
|
||||||
@@ -40,7 +45,7 @@ func AddHooks(hooks map[string]func(interface{})) {
|
|||||||
hookLocker.Unlock()
|
hookLocker.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func AddHook(name string, callback func(interface{})) {
|
func AddHook(name string, callback interface{}) {
|
||||||
hookLocker.Lock()
|
hookLocker.Lock()
|
||||||
rl, ok := Hooks[name]
|
rl, ok := Hooks[name]
|
||||||
if !ok {
|
if !ok {
|
||||||
@@ -49,12 +54,16 @@ func AddHook(name string, callback func(interface{})) {
|
|||||||
Hooks[name] = rl
|
Hooks[name] = rl
|
||||||
}
|
}
|
||||||
hookLocker.Unlock()
|
hookLocker.Unlock()
|
||||||
|
vf := reflect.ValueOf(callback)
|
||||||
|
if vf.Kind() != reflect.Func {
|
||||||
|
panic("callback is not a function")
|
||||||
|
}
|
||||||
for hooks := rl.Clone(); ; hooks.MoveNext() {
|
for hooks := rl.Clone(); ; hooks.MoveNext() {
|
||||||
callback(hooks.Read())
|
vf.Call(hooks.Read().([]reflect.Value))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func AddHookWithContext(ctx context.Context, name string, callback func(interface{})) {
|
func AddHookWithContext(ctx context.Context, name string, callback interface{}) {
|
||||||
hookLocker.Lock()
|
hookLocker.Lock()
|
||||||
rl, ok := Hooks[name]
|
rl, ok := Hooks[name]
|
||||||
if !ok {
|
if !ok {
|
||||||
@@ -62,19 +71,27 @@ func AddHookWithContext(ctx context.Context, name string, callback func(interfac
|
|||||||
Hooks[name] = rl
|
Hooks[name] = rl
|
||||||
}
|
}
|
||||||
hookLocker.Unlock()
|
hookLocker.Unlock()
|
||||||
|
vf := reflect.ValueOf(callback)
|
||||||
|
if vf.Kind() != reflect.Func {
|
||||||
|
panic("callback is not a function")
|
||||||
|
}
|
||||||
for hooks := rl.Clone(); ctx.Err() == nil; hooks.MoveNext() {
|
for hooks := rl.Clone(); ctx.Err() == nil; hooks.MoveNext() {
|
||||||
callback(hooks.Read())
|
vf.Call(hooks.Read().([]reflect.Value))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TriggerHook(name string, payload interface{}) {
|
func TriggerHook(name string, payload ...interface{}) {
|
||||||
|
args := make([]reflect.Value, len(payload))
|
||||||
|
for i, arg := range payload {
|
||||||
|
args[i] = reflect.ValueOf(&arg).Elem()
|
||||||
|
}
|
||||||
hookLocker.Lock()
|
hookLocker.Lock()
|
||||||
defer hookLocker.Unlock()
|
defer hookLocker.Unlock()
|
||||||
if rl, ok := Hooks[name]; ok {
|
if rl, ok := Hooks[name]; ok {
|
||||||
rl.Write(payload)
|
rl.Write(args)
|
||||||
} else {
|
} else {
|
||||||
rl = NewRingBuffer(4)
|
rl = NewRingBuffer(4)
|
||||||
Hooks[name] = rl
|
Hooks[name] = rl
|
||||||
rl.Write(payload)
|
rl.Write(args)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -135,9 +135,9 @@ func (r *Stream) Subscribe(s *Subscriber) {
|
|||||||
s.Context, s.cancel = context.WithCancel(r)
|
s.Context, s.cancel = context.WithCancel(r)
|
||||||
r.subscribeMutex.Lock()
|
r.subscribeMutex.Lock()
|
||||||
r.Subscribers = append(r.Subscribers, s)
|
r.Subscribers = append(r.Subscribers, s)
|
||||||
|
TriggerHook(HOOK_SUBSCRIBE, s, len(r.Subscribers))
|
||||||
r.subscribeMutex.Unlock()
|
r.subscribeMutex.Unlock()
|
||||||
utils.Print(Sprintf(Yellow("%s subscriber %s added remains:%d"), BrightCyan(r.StreamPath), Cyan(s.ID), Blue(len(r.Subscribers))))
|
utils.Print(Sprintf(Yellow("%s subscriber %s added remains:%d"), BrightCyan(r.StreamPath), Cyan(s.ID), Blue(len(r.Subscribers))))
|
||||||
TriggerHook(HOOK_SUBSCRIBE, s)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -147,14 +147,15 @@ func (r *Stream) UnSubscribe(s *Subscriber) {
|
|||||||
var deleted bool
|
var deleted bool
|
||||||
r.subscribeMutex.Lock()
|
r.subscribeMutex.Lock()
|
||||||
r.Subscribers, deleted = DeleteSliceItem_Subscriber(r.Subscribers, s)
|
r.Subscribers, deleted = DeleteSliceItem_Subscriber(r.Subscribers, s)
|
||||||
r.subscribeMutex.Unlock()
|
|
||||||
if deleted {
|
if deleted {
|
||||||
utils.Print(Sprintf(Yellow("%s subscriber %s removed remains:%d"), BrightCyan(r.StreamPath), Cyan(s.ID), Blue(len(r.Subscribers))))
|
utils.Print(Sprintf(Yellow("%s subscriber %s removed remains:%d"), BrightCyan(r.StreamPath), Cyan(s.ID), Blue(len(r.Subscribers))))
|
||||||
TriggerHook(HOOK_UNSUBSCRIBE, s)
|
l := len(r.Subscribers)
|
||||||
if len(r.Subscribers) == 0 && r.AutoUnPublish {
|
TriggerHook(HOOK_UNSUBSCRIBE, s, l)
|
||||||
|
if l == 0 && r.AutoUnPublish {
|
||||||
r.Close()
|
r.Close()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
r.subscribeMutex.Unlock()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
func DeleteSliceItem_Subscriber(slice []*Subscriber, item *Subscriber) ([]*Subscriber, bool) {
|
func DeleteSliceItem_Subscriber(slice []*Subscriber, item *Subscriber) ([]*Subscriber, bool) {
|
||||||
|
|||||||
Reference in New Issue
Block a user