diff --git a/hook.go b/hook.go index 94a2983..813987e 100644 --- a/hook.go +++ b/hook.go @@ -2,6 +2,7 @@ package engine import ( "context" + "reflect" "sync" ) @@ -21,7 +22,7 @@ const ( var Hooks = make(map[string]*RingBuffer) var hookLocker sync.Mutex -func AddHooks(hooks map[string]func(interface{})) { +func AddHooks(hooks map[string]interface{}) { hookLocker.Lock() for name, hook := range hooks { rl, ok := Hooks[name] @@ -30,9 +31,13 @@ func AddHooks(hooks map[string]func(interface{})) { rl.Init(4) Hooks[name] = rl } - go func(hooks *RingBuffer, callback func(interface{})) { + go func(hooks *RingBuffer, callback interface{}) { + vf := reflect.ValueOf(callback) + if vf.Kind() != reflect.Func { + panic("callback is not a function") + } for { - callback(hooks.Read()) + vf.Call(hooks.Read().([]reflect.Value)) hooks.MoveNext() } }(rl.Clone(), hook) @@ -40,7 +45,7 @@ func AddHooks(hooks map[string]func(interface{})) { hookLocker.Unlock() } -func AddHook(name string, callback func(interface{})) { +func AddHook(name string, callback interface{}) { hookLocker.Lock() rl, ok := Hooks[name] if !ok { @@ -49,12 +54,16 @@ func AddHook(name string, callback func(interface{})) { Hooks[name] = rl } hookLocker.Unlock() + vf := reflect.ValueOf(callback) + if vf.Kind() != reflect.Func { + panic("callback is not a function") + } for hooks := rl.Clone(); ; hooks.MoveNext() { - callback(hooks.Read()) + vf.Call(hooks.Read().([]reflect.Value)) } } -func AddHookWithContext(ctx context.Context, name string, callback func(interface{})) { +func AddHookWithContext(ctx context.Context, name string, callback interface{}) { hookLocker.Lock() rl, ok := Hooks[name] if !ok { @@ -62,19 +71,27 @@ func AddHookWithContext(ctx context.Context, name string, callback func(interfac Hooks[name] = rl } hookLocker.Unlock() + vf := reflect.ValueOf(callback) + if vf.Kind() != reflect.Func { + panic("callback is not a function") + } for hooks := rl.Clone(); ctx.Err() == nil; hooks.MoveNext() { - callback(hooks.Read()) + vf.Call(hooks.Read().([]reflect.Value)) } } -func TriggerHook(name string, payload interface{}) { +func TriggerHook(name string, payload ...interface{}) { + args := make([]reflect.Value, len(payload)) + for i, arg := range payload { + args[i] = reflect.ValueOf(&arg).Elem() + } hookLocker.Lock() defer hookLocker.Unlock() if rl, ok := Hooks[name]; ok { - rl.Write(payload) + rl.Write(args) } else { rl = NewRingBuffer(4) Hooks[name] = rl - rl.Write(payload) + rl.Write(args) } } diff --git a/stream.go b/stream.go index 452263e..23580df 100644 --- a/stream.go +++ b/stream.go @@ -135,9 +135,9 @@ func (r *Stream) Subscribe(s *Subscriber) { s.Context, s.cancel = context.WithCancel(r) r.subscribeMutex.Lock() r.Subscribers = append(r.Subscribers, s) + TriggerHook(HOOK_SUBSCRIBE, s, len(r.Subscribers)) r.subscribeMutex.Unlock() utils.Print(Sprintf(Yellow("%s subscriber %s added remains:%d"), BrightCyan(r.StreamPath), Cyan(s.ID), Blue(len(r.Subscribers)))) - TriggerHook(HOOK_SUBSCRIBE, s) } } @@ -147,14 +147,15 @@ func (r *Stream) UnSubscribe(s *Subscriber) { var deleted bool r.subscribeMutex.Lock() r.Subscribers, deleted = DeleteSliceItem_Subscriber(r.Subscribers, s) - r.subscribeMutex.Unlock() if deleted { utils.Print(Sprintf(Yellow("%s subscriber %s removed remains:%d"), BrightCyan(r.StreamPath), Cyan(s.ID), Blue(len(r.Subscribers)))) - TriggerHook(HOOK_UNSUBSCRIBE, s) - if len(r.Subscribers) == 0 && r.AutoUnPublish { + l := len(r.Subscribers) + TriggerHook(HOOK_UNSUBSCRIBE, s, l) + if l == 0 && r.AutoUnPublish { r.Close() } } + r.subscribeMutex.Unlock() } } func DeleteSliceItem_Subscriber(slice []*Subscriber, item *Subscriber) ([]*Subscriber, bool) {