mirror of
https://github.com/gookit/event
synced 2025-09-26 19:11:14 +08:00
👔 up: refactoring the files structure, add fire with context
This commit is contained in:
83
event.go
83
event.go
@@ -1,6 +1,10 @@
|
||||
// Package event is lightweight event manager and dispatcher implements by Go.
|
||||
package event
|
||||
|
||||
import (
|
||||
"context"
|
||||
)
|
||||
|
||||
// wildcard event name
|
||||
const (
|
||||
Wildcard = "*"
|
||||
@@ -35,7 +39,7 @@ type M = map[string]any
|
||||
// ManagerFace event manager interface
|
||||
type ManagerFace interface {
|
||||
// AddEvent events: add event
|
||||
AddEvent(Event)
|
||||
AddEvent(Event) error
|
||||
// On listeners: add listeners
|
||||
On(name string, listener Listener, priority ...int)
|
||||
// Fire event
|
||||
@@ -57,9 +61,7 @@ type Options struct {
|
||||
type OptionFn func(o *Options)
|
||||
|
||||
// UsePathMode set event name match mode to ModePath
|
||||
func UsePathMode(o *Options) {
|
||||
o.MatchMode = ModePath
|
||||
}
|
||||
func UsePathMode(o *Options) { o.MatchMode = ModePath }
|
||||
|
||||
// EnableLock enable lock on fire event.
|
||||
func EnableLock(enable bool) OptionFn {
|
||||
@@ -78,6 +80,9 @@ type Event interface {
|
||||
SetData(M) Event
|
||||
Abort(bool)
|
||||
IsAborted() bool
|
||||
// Context support
|
||||
Context() context.Context
|
||||
WithContext(ctx context.Context)
|
||||
}
|
||||
|
||||
// Cloneable interface. event can be cloned.
|
||||
@@ -86,11 +91,18 @@ type Cloneable interface {
|
||||
Clone() Event
|
||||
}
|
||||
|
||||
// ContextAble context-able event interface
|
||||
type ContextAble interface {
|
||||
Context() context.Context
|
||||
WithContext(ctx context.Context)
|
||||
}
|
||||
|
||||
// FactoryFunc for create event instance.
|
||||
type FactoryFunc func() Event
|
||||
|
||||
// BasicEvent a built-in implements Event interface
|
||||
type BasicEvent struct {
|
||||
ContextTrait
|
||||
// event name
|
||||
name string
|
||||
// user data.
|
||||
@@ -102,9 +114,7 @@ type BasicEvent struct {
|
||||
}
|
||||
|
||||
// New create an event instance
|
||||
func New(name string, data M) *BasicEvent {
|
||||
return NewBasic(name, data)
|
||||
}
|
||||
func New(name string, data M) *BasicEvent { return NewBasic(name, data) }
|
||||
|
||||
// NewBasic new a basic event instance
|
||||
func NewBasic(name string, data M) *BasicEvent {
|
||||
@@ -119,9 +129,7 @@ func NewBasic(name string, data M) *BasicEvent {
|
||||
}
|
||||
|
||||
// Abort event loop exec
|
||||
func (e *BasicEvent) Abort(abort bool) {
|
||||
e.aborted = abort
|
||||
}
|
||||
func (e *BasicEvent) Abort(abort bool) { e.aborted = abort }
|
||||
|
||||
// Fill event data
|
||||
func (e *BasicEvent) Fill(target any, data M) *BasicEvent {
|
||||
@@ -134,16 +142,13 @@ func (e *BasicEvent) Fill(target any, data M) *BasicEvent {
|
||||
}
|
||||
|
||||
// AttachTo add current event to the event manager.
|
||||
func (e *BasicEvent) AttachTo(em ManagerFace) {
|
||||
em.AddEvent(e)
|
||||
}
|
||||
func (e *BasicEvent) AttachTo(em ManagerFace) error { return em.AddEvent(e) }
|
||||
|
||||
// Get data by index
|
||||
func (e *BasicEvent) Get(key string) any {
|
||||
if v, ok := e.data[key]; ok {
|
||||
return v
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -159,23 +164,25 @@ func (e *BasicEvent) Set(key string, val any) {
|
||||
if e.data == nil {
|
||||
e.data = make(map[string]any)
|
||||
}
|
||||
|
||||
e.data[key] = val
|
||||
}
|
||||
|
||||
// Name get event name
|
||||
func (e *BasicEvent) Name() string {
|
||||
return e.name
|
||||
}
|
||||
func (e *BasicEvent) Name() string { return e.name }
|
||||
|
||||
// Data get all data
|
||||
func (e *BasicEvent) Data() map[string]any {
|
||||
return e.data
|
||||
}
|
||||
func (e *BasicEvent) Data() map[string]any { return e.data }
|
||||
|
||||
// IsAborted check.
|
||||
func (e *BasicEvent) IsAborted() bool {
|
||||
return e.aborted
|
||||
func (e *BasicEvent) IsAborted() bool { return e.aborted }
|
||||
|
||||
// Target get target
|
||||
func (e *BasicEvent) Target() any { return e.target }
|
||||
|
||||
// SetName set event name
|
||||
func (e *BasicEvent) SetName(name string) *BasicEvent {
|
||||
e.name = name
|
||||
return e
|
||||
}
|
||||
|
||||
// Clone new instance
|
||||
@@ -184,17 +191,6 @@ func (e *BasicEvent) Clone() Event {
|
||||
return &cp
|
||||
}
|
||||
|
||||
// Target get target
|
||||
func (e *BasicEvent) Target() any {
|
||||
return e.target
|
||||
}
|
||||
|
||||
// SetName set event name
|
||||
func (e *BasicEvent) SetName(name string) *BasicEvent {
|
||||
e.name = name
|
||||
return e
|
||||
}
|
||||
|
||||
// SetData set data to the event
|
||||
func (e *BasicEvent) SetData(data M) Event {
|
||||
if data != nil {
|
||||
@@ -208,3 +204,22 @@ func (e *BasicEvent) SetTarget(target any) *BasicEvent {
|
||||
e.target = target
|
||||
return e
|
||||
}
|
||||
|
||||
// ContextTrait event context trait
|
||||
type ContextTrait struct {
|
||||
// context
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
// Context get context
|
||||
func (t *ContextTrait) Context() context.Context {
|
||||
if t.ctx == nil {
|
||||
return context.Background()
|
||||
}
|
||||
return t.ctx
|
||||
}
|
||||
|
||||
// WithContext set context
|
||||
func (t *ContextTrait) WithContext(ctx context.Context) {
|
||||
t.ctx = ctx
|
||||
}
|
@@ -53,6 +53,7 @@ func (s testSubscriber2) SubscribedEvents() map[string]any {
|
||||
}
|
||||
|
||||
type testEvent struct {
|
||||
event.ContextTrait
|
||||
name string
|
||||
data map[string]any
|
||||
abort bool
|
||||
|
303
manager.go
303
manager.go
@@ -1,9 +1,8 @@
|
||||
package event
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"context"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
)
|
||||
|
||||
@@ -74,7 +73,7 @@ func (em *Manager) WithOptions(fns ...OptionFn) *Manager {
|
||||
}
|
||||
|
||||
/*************************************************************
|
||||
* -- register listeners
|
||||
* region Register listeners
|
||||
*************************************************************/
|
||||
|
||||
// AddListener register a event handler/listener. alias of the method On()
|
||||
@@ -155,297 +154,15 @@ func (em *Manager) addListenerItem(name string, li *ListenerItem) {
|
||||
}
|
||||
|
||||
/*************************************************************
|
||||
* Listener Manage: - trigger event
|
||||
*************************************************************/
|
||||
|
||||
// MustTrigger alias of the method MustFire()
|
||||
func (em *Manager) MustTrigger(name string, params M) Event {
|
||||
return em.MustFire(name, params)
|
||||
}
|
||||
|
||||
// MustFire fire event by name. will panic on error
|
||||
func (em *Manager) MustFire(name string, params M) Event {
|
||||
err, e := em.Fire(name, params)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return e
|
||||
}
|
||||
|
||||
// Trigger alias of the method Fire()
|
||||
func (em *Manager) Trigger(name string, params M) (error, Event) {
|
||||
return em.Fire(name, params)
|
||||
}
|
||||
|
||||
// Fire trigger event by name. if not found listener, will return (nil, nil)
|
||||
func (em *Manager) Fire(name string, params M) (err error, e Event) {
|
||||
// call listeners handle event
|
||||
e, err = em.fireByName(name, params, false)
|
||||
return
|
||||
}
|
||||
|
||||
// Async fire event by go channel.
|
||||
//
|
||||
// Note: if you want to use this method, you should
|
||||
// call the method Close() after all events are fired.
|
||||
func (em *Manager) Async(name string, params M) {
|
||||
_, _ = em.fireByName(name, params, true)
|
||||
}
|
||||
|
||||
// FireC async fire event by go channel. alias of the method Async()
|
||||
//
|
||||
// Note: if you want to use this method, you should
|
||||
// call the method Close() after all events are fired.
|
||||
func (em *Manager) FireC(name string, params M) {
|
||||
_, _ = em.fireByName(name, params, true)
|
||||
}
|
||||
|
||||
// fire event by name.
|
||||
//
|
||||
// if useCh is true, will async fire by channel. always return (nil, nil)
|
||||
//
|
||||
// On useCh=false:
|
||||
// - will call listeners handle event.
|
||||
// - if not found listener, will return (nil, nil)
|
||||
func (em *Manager) fireByName(name string, params M, useCh bool) (e Event, err error) {
|
||||
name = goodName(name, false)
|
||||
|
||||
// use pre-defined Event
|
||||
if fc, ok := em.eventFc[name]; ok {
|
||||
e = fc() // make new instance
|
||||
if params != nil {
|
||||
e.SetData(params)
|
||||
}
|
||||
} else {
|
||||
// create new basic event instance
|
||||
e = em.newBasicEvent(name, params)
|
||||
}
|
||||
|
||||
// fire by channel
|
||||
if useCh {
|
||||
em.FireAsync(e)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// call listeners handle event
|
||||
err = em.FireEvent(e)
|
||||
return
|
||||
}
|
||||
|
||||
// FireEvent fire event by given Event instance
|
||||
func (em *Manager) FireEvent(e Event) (err error) {
|
||||
if em.EnableLock {
|
||||
em.Lock()
|
||||
defer em.Unlock()
|
||||
}
|
||||
|
||||
// ensure aborted is false.
|
||||
e.Abort(false)
|
||||
name := e.Name()
|
||||
|
||||
// fire group listeners by wildcard. eg "db.user.*"
|
||||
if em.MatchMode == ModePath {
|
||||
err = em.firePathMode(name, e)
|
||||
return
|
||||
}
|
||||
|
||||
// handle mode: ModeSimple
|
||||
err = em.fireSimpleMode(name, e)
|
||||
if err != nil || e.IsAborted() {
|
||||
return
|
||||
}
|
||||
|
||||
// fire wildcard event listeners
|
||||
if lq, ok := em.listeners[Wildcard]; ok {
|
||||
for _, li := range lq.Sort().Items() {
|
||||
err = li.Listener.Handle(e)
|
||||
if err != nil || e.IsAborted() {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// ModeSimple has group listeners by wildcard. eg "db.user.*"
|
||||
//
|
||||
// Example:
|
||||
// - event "db.user.add" will trigger listeners on the "db.user.*"
|
||||
func (em *Manager) fireSimpleMode(name string, e Event) (err error) {
|
||||
// fire direct matched listeners. eg: db.user.add
|
||||
if lq, ok := em.listeners[name]; ok {
|
||||
// sort by priority before call.
|
||||
for _, li := range lq.Sort().Items() {
|
||||
err = li.Listener.Handle(e)
|
||||
if err != nil || e.IsAborted() {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pos := strings.LastIndexByte(name, '.')
|
||||
|
||||
if pos > 0 && pos < len(name) {
|
||||
groupName := name[:pos+1] + Wildcard // "app.*"
|
||||
|
||||
if lq, ok := em.listeners[groupName]; ok {
|
||||
for _, li := range lq.Sort().Items() {
|
||||
err = li.Listener.Handle(e)
|
||||
if err != nil || e.IsAborted() {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ModePath fire group listeners by ModePath.
|
||||
//
|
||||
// Example:
|
||||
// - event "db.user.add" will trigger listeners on the "db.**"
|
||||
// - event "db.user.add" will trigger listeners on the "db.user.*"
|
||||
func (em *Manager) firePathMode(name string, e Event) (err error) {
|
||||
for pattern, lq := range em.listeners {
|
||||
if pattern == name || matchNodePath(pattern, name, ".") {
|
||||
for _, li := range lq.Sort().Items() {
|
||||
err = li.Listener.Handle(e)
|
||||
if err != nil || e.IsAborted() {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
/*************************************************************
|
||||
* Fire by channel
|
||||
*************************************************************/
|
||||
|
||||
// FireAsync async fire event by go channel.
|
||||
//
|
||||
// Note: if you want to use this method, you should
|
||||
// call the method Close() after all events are fired.
|
||||
//
|
||||
// Example:
|
||||
//
|
||||
// em := NewManager("test")
|
||||
// em.FireAsync("db.user.add", M{"id": 1001})
|
||||
func (em *Manager) FireAsync(e Event) {
|
||||
// once make consumers
|
||||
em.oc.Do(func() {
|
||||
em.makeConsumers()
|
||||
})
|
||||
|
||||
// dispatch event
|
||||
em.ch <- e
|
||||
}
|
||||
|
||||
// async fire event by 'go' keywords
|
||||
func (em *Manager) makeConsumers() {
|
||||
if em.ConsumerNum <= 0 {
|
||||
em.ConsumerNum = defaultConsumerNum
|
||||
}
|
||||
if em.ChannelSize <= 0 {
|
||||
em.ChannelSize = defaultChannelSize
|
||||
}
|
||||
|
||||
em.ch = make(chan Event, em.ChannelSize)
|
||||
|
||||
// make event consumers
|
||||
for i := 0; i < em.ConsumerNum; i++ {
|
||||
em.wg.Add(1)
|
||||
|
||||
go func() {
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
em.err = fmt.Errorf("async consum event error: %v", err)
|
||||
}
|
||||
em.wg.Done()
|
||||
}()
|
||||
|
||||
// keep running until channel closed
|
||||
for e := range em.ch {
|
||||
_ = em.FireEvent(e) // ignore async fire error
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
// CloseWait close channel and wait all async event done.
|
||||
func (em *Manager) CloseWait() error {
|
||||
if err := em.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
return em.Wait()
|
||||
}
|
||||
|
||||
// Wait wait all async event done.
|
||||
func (em *Manager) Wait() error {
|
||||
em.wg.Wait()
|
||||
return em.err
|
||||
}
|
||||
|
||||
// Close event channel, deny to fire new event.
|
||||
func (em *Manager) Close() error {
|
||||
if em.ch != nil {
|
||||
close(em.ch)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// FireBatch fire multi event at once.
|
||||
//
|
||||
// Usage:
|
||||
//
|
||||
// FireBatch("name1", "name2", &MyEvent{})
|
||||
func (em *Manager) FireBatch(es ...any) (ers []error) {
|
||||
var err error
|
||||
for _, e := range es {
|
||||
if name, ok := e.(string); ok {
|
||||
err, _ = em.Fire(name, nil)
|
||||
} else if evt, ok := e.(Event); ok {
|
||||
err = em.FireEvent(evt)
|
||||
} // ignore invalid param.
|
||||
|
||||
if err != nil {
|
||||
ers = append(ers, err)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// AsyncFire simple async fire event by 'go' keywords
|
||||
func (em *Manager) AsyncFire(e Event) {
|
||||
go func(e Event) {
|
||||
_ = em.FireEvent(e)
|
||||
}(e)
|
||||
}
|
||||
|
||||
// AwaitFire async fire event by 'go' keywords, but will wait return result
|
||||
func (em *Manager) AwaitFire(e Event) (err error) {
|
||||
ch := make(chan error)
|
||||
|
||||
go func(e Event) {
|
||||
err := em.FireEvent(e)
|
||||
ch <- err
|
||||
}(e)
|
||||
|
||||
err = <-ch
|
||||
close(ch)
|
||||
return
|
||||
}
|
||||
|
||||
/*************************************************************
|
||||
* Event Manage
|
||||
* region Event Manage
|
||||
*************************************************************/
|
||||
|
||||
// AddEvent add a pre-defined event instance to manager.
|
||||
func (em *Manager) AddEvent(e Event) {
|
||||
name := goodName(e.Name(), false)
|
||||
func (em *Manager) AddEvent(e Event) error {
|
||||
name, err := goodNameOrErr(e.Name(), false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if ec, ok := e.(Cloneable); ok {
|
||||
em.AddEventFc(name, func() Event {
|
||||
@@ -456,6 +173,7 @@ func (em *Manager) AddEvent(e Event) {
|
||||
return e
|
||||
})
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// AddEventFc add a pre-defined event factory func to manager.
|
||||
@@ -493,12 +211,13 @@ func (em *Manager) RemoveEvents() {
|
||||
}
|
||||
|
||||
/*************************************************************
|
||||
* Helper Methods
|
||||
* region Helper Methods
|
||||
*************************************************************/
|
||||
|
||||
// newBasicEvent create new BasicEvent by clone em.sample
|
||||
func (em *Manager) newBasicEvent(name string, data M) *BasicEvent {
|
||||
var cp = *em.sample
|
||||
cp.ctx = context.Background()
|
||||
|
||||
cp.SetName(name)
|
||||
cp.SetData(data)
|
||||
|
452
manager_fire.go
Normal file
452
manager_fire.go
Normal file
@@ -0,0 +1,452 @@
|
||||
package event
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
)
|
||||
|
||||
/*************************************************************
|
||||
* region Trigger event
|
||||
*************************************************************/
|
||||
|
||||
// MustTrigger alias of the method MustFire()
|
||||
func (em *Manager) MustTrigger(name string, params M) Event { return em.MustFire(name, params) }
|
||||
|
||||
// MustFire fire event by name. will panic on error
|
||||
func (em *Manager) MustFire(name string, params M) Event {
|
||||
err, e := em.Fire(name, params)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return e
|
||||
}
|
||||
|
||||
// Trigger alias of the method Fire()
|
||||
func (em *Manager) Trigger(name string, params M) (error, Event) { return em.Fire(name, params) }
|
||||
|
||||
// Fire trigger event by name. if not found listener, will return (nil, nil)
|
||||
func (em *Manager) Fire(name string, params M) (err error, e Event) {
|
||||
// call listeners handle event
|
||||
e, err = em.fireByName(name, params, false)
|
||||
return
|
||||
}
|
||||
|
||||
// FireCtx fire event by name with context
|
||||
func (em *Manager) FireCtx(ctx context.Context, name string, params M) (err error, e Event) {
|
||||
// call listeners handle event
|
||||
e, err = em.fireByNameCtx(ctx, name, params, false)
|
||||
return
|
||||
}
|
||||
|
||||
// Async fire event by go channel.
|
||||
//
|
||||
// Note: if you want to use this method, you should
|
||||
// call the method Close() after all events are fired.
|
||||
func (em *Manager) Async(name string, params M) {
|
||||
_, _ = em.fireByName(name, params, true)
|
||||
}
|
||||
|
||||
// FireC async fire event by go channel. alias of the method Async()
|
||||
//
|
||||
// Note: if you want to use this method, you should
|
||||
// call the method Close() after all events are fired.
|
||||
func (em *Manager) FireC(name string, params M) {
|
||||
_, _ = em.fireByName(name, params, true)
|
||||
}
|
||||
|
||||
// fire event by name.
|
||||
//
|
||||
// if useCh is true, will async fire by channel. always return (nil, nil)
|
||||
//
|
||||
// On useCh=false:
|
||||
// - will call listeners handle event.
|
||||
// - if not found listener, will return (nil, nil)
|
||||
func (em *Manager) fireByName(name string, params M, useCh bool) (e Event, err error) {
|
||||
name, err = goodNameOrErr(name, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// use pre-defined Event
|
||||
if fc, ok := em.eventFc[name]; ok {
|
||||
e = fc() // make new instance
|
||||
if params != nil {
|
||||
e.SetData(params)
|
||||
}
|
||||
} else {
|
||||
// create new basic event instance
|
||||
e = em.newBasicEvent(name, params)
|
||||
}
|
||||
|
||||
// fire by channel
|
||||
if useCh {
|
||||
em.FireAsync(e)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// call listeners handle event
|
||||
err = em.FireEvent(e)
|
||||
return
|
||||
}
|
||||
|
||||
// fireByNameCtx fire event by name with context
|
||||
func (em *Manager) fireByNameCtx(ctx context.Context, name string, params M, useCh bool) (e Event, err error) {
|
||||
name, err = goodNameOrErr(name, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// use pre-defined Event
|
||||
if fc, ok := em.eventFc[name]; ok {
|
||||
e = fc() // make new instance
|
||||
if params != nil {
|
||||
e.SetData(params)
|
||||
}
|
||||
} else {
|
||||
// create new basic event instance
|
||||
e = em.newBasicEvent(name, params)
|
||||
}
|
||||
|
||||
// set context
|
||||
e.WithContext(ctx)
|
||||
|
||||
// fire by channel
|
||||
if useCh {
|
||||
em.FireAsync(e)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// call listeners handle event
|
||||
err = em.FireEventCtx(ctx, e)
|
||||
return
|
||||
}
|
||||
|
||||
// FireEvent fire event by given Event instance
|
||||
func (em *Manager) FireEvent(e Event) (err error) {
|
||||
if em.EnableLock {
|
||||
em.Lock()
|
||||
defer em.Unlock()
|
||||
}
|
||||
|
||||
// ensure aborted is false.
|
||||
e.Abort(false)
|
||||
name := e.Name()
|
||||
|
||||
// fire group listeners by wildcard. eg "db.user.*"
|
||||
if em.MatchMode == ModePath {
|
||||
err = em.firePathMode(name, e)
|
||||
return
|
||||
}
|
||||
|
||||
// handle mode: ModeSimple
|
||||
err = em.fireSimpleMode(name, e)
|
||||
if err != nil || e.IsAborted() {
|
||||
return
|
||||
}
|
||||
|
||||
// fire wildcard event listeners
|
||||
if lq, ok := em.listeners[Wildcard]; ok {
|
||||
for _, li := range lq.Sort().Items() {
|
||||
err = li.Listener.Handle(e)
|
||||
if err != nil || e.IsAborted() {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// FireEventCtx fire event by given Event instance with context
|
||||
func (em *Manager) FireEventCtx(ctx context.Context, e Event) (err error) {
|
||||
if em.EnableLock {
|
||||
em.Lock()
|
||||
defer em.Unlock()
|
||||
}
|
||||
|
||||
// ensure aborted is false.
|
||||
e.Abort(false)
|
||||
name := e.Name()
|
||||
ctx := context.Background()
|
||||
if ce, ok := e.(ContextAble); ok {
|
||||
ctx = ce.Context()
|
||||
}
|
||||
|
||||
// fire group listeners by wildcard. eg "db.user.*"
|
||||
if em.MatchMode == ModePath {
|
||||
err = em.firePathModeCtx(ctx, name, e)
|
||||
return
|
||||
}
|
||||
|
||||
// handle mode: ModeSimple
|
||||
err = em.fireSimpleModeCtx(ctx, name, e)
|
||||
if err != nil || e.IsAborted() {
|
||||
return
|
||||
}
|
||||
|
||||
// fire wildcard event listeners
|
||||
if lq, ok := em.listeners[Wildcard]; ok {
|
||||
for _, li := range lq.Sort().Items() {
|
||||
// Check context cancellation
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
err = ctx.Err()
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
err = li.Listener.Handle(e)
|
||||
if err != nil || e.IsAborted() {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// ModeSimple has group listeners by wildcard. eg "db.user.*"
|
||||
//
|
||||
// Example:
|
||||
// - event "db.user.add" will trigger listeners on the "db.user.*"
|
||||
func (em *Manager) fireSimpleMode(name string, e Event) (err error) {
|
||||
// fire direct matched listeners. eg: db.user.add
|
||||
if lq, ok := em.listeners[name]; ok {
|
||||
// sort by priority before call.
|
||||
for _, li := range lq.Sort().Items() {
|
||||
err = li.Listener.Handle(e)
|
||||
if err != nil || e.IsAborted() {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pos := strings.LastIndexByte(name, '.')
|
||||
|
||||
if pos > 0 && pos < len(name) {
|
||||
groupName := name[:pos+1] + Wildcard // "app.*"
|
||||
|
||||
if lq, ok := em.listeners[groupName]; ok {
|
||||
for _, li := range lq.Sort().Items() {
|
||||
err = li.Listener.Handle(e)
|
||||
if err != nil || e.IsAborted() {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// fireSimpleModeCtx ModeSimple has group listeners by wildcard with context. eg "db.user.*"
|
||||
func (em *Manager) fireSimpleModeCtx(ctx context.Context, name string, e Event) (err error) {
|
||||
// fire direct matched listeners. eg: db.user.add
|
||||
if lq, ok := em.listeners[name]; ok {
|
||||
// sort by priority before call.
|
||||
for _, li := range lq.Sort().Items() {
|
||||
// Check context cancellation
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
err = ctx.Err()
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
err = li.Listener.Handle(e)
|
||||
if err != nil || e.IsAborted() {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pos := strings.LastIndexByte(name, '.')
|
||||
|
||||
if pos > 0 && pos < len(name) {
|
||||
groupName := name[:pos+1] + Wildcard // "app.*"
|
||||
|
||||
if lq, ok := em.listeners[groupName]; ok {
|
||||
for _, li := range lq.Sort().Items() {
|
||||
// Check context cancellation
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
err = ctx.Err()
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
err = li.Listener.Handle(e)
|
||||
if err != nil || e.IsAborted() {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ModePath fire group listeners by ModePath.
|
||||
//
|
||||
// Example:
|
||||
// - event "db.user.add" will trigger listeners on the "db.**"
|
||||
// - event "db.user.add" will trigger listeners on the "db.user.*"
|
||||
func (em *Manager) firePathMode(name string, e Event) (err error) {
|
||||
for pattern, lq := range em.listeners {
|
||||
if pattern == name || matchNodePath(pattern, name, ".") {
|
||||
for _, li := range lq.Sort().Items() {
|
||||
err = li.Listener.Handle(e)
|
||||
if err != nil || e.IsAborted() {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// firePathModeCtx fire group listeners by ModePath with context.
|
||||
//
|
||||
// Example:
|
||||
// - event "db.user.add" will trigger listeners on the "db.**"
|
||||
// - event "db.user.add" will trigger listeners on the "db.user.*"
|
||||
func (em *Manager) firePathModeCtx(ctx context.Context, name string, e Event) (err error) {
|
||||
for pattern, lq := range em.listeners {
|
||||
if pattern == name || matchNodePath(pattern, name, ".") {
|
||||
for _, li := range lq.Sort().Items() {
|
||||
// Check context cancellation
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
err = ctx.Err()
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
err = li.Listener.Handle(e)
|
||||
if err != nil || e.IsAborted() {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
/*************************************************************
|
||||
* region Fire by channel
|
||||
*************************************************************/
|
||||
|
||||
// FireAsync async fire event by go channel.
|
||||
//
|
||||
// Note: if you want to use this method, you should
|
||||
// call the method Close() after all events are fired.
|
||||
//
|
||||
// Example:
|
||||
//
|
||||
// em := NewManager("test")
|
||||
// em.FireAsync("db.user.add", M{"id": 1001})
|
||||
func (em *Manager) FireAsync(e Event) {
|
||||
// once make consumers
|
||||
em.oc.Do(func() {
|
||||
em.makeConsumers()
|
||||
})
|
||||
|
||||
// dispatch event
|
||||
em.ch <- e
|
||||
}
|
||||
|
||||
// async fire event by 'go' keywords
|
||||
func (em *Manager) makeConsumers() {
|
||||
if em.ConsumerNum <= 0 {
|
||||
em.ConsumerNum = defaultConsumerNum
|
||||
}
|
||||
if em.ChannelSize <= 0 {
|
||||
em.ChannelSize = defaultChannelSize
|
||||
}
|
||||
|
||||
em.ch = make(chan Event, em.ChannelSize)
|
||||
|
||||
// make event consumers
|
||||
for i := 0; i < em.ConsumerNum; i++ {
|
||||
em.wg.Add(1)
|
||||
|
||||
go func() {
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
em.err = fmt.Errorf("async consum event error: %v", err)
|
||||
}
|
||||
em.wg.Done()
|
||||
}()
|
||||
|
||||
// keep running until channel closed
|
||||
for e := range em.ch {
|
||||
_ = em.FireEvent(e) // ignore async fire error
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
// CloseWait close channel and wait all async event done.
|
||||
func (em *Manager) CloseWait() error {
|
||||
if err := em.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
return em.Wait()
|
||||
}
|
||||
|
||||
// Wait wait all async event done.
|
||||
func (em *Manager) Wait() error {
|
||||
em.wg.Wait()
|
||||
return em.err
|
||||
}
|
||||
|
||||
// Close event channel, deny to fire new event.
|
||||
func (em *Manager) Close() error {
|
||||
if em.ch != nil {
|
||||
close(em.ch)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// FireBatch fire multi event at once.
|
||||
//
|
||||
// Usage:
|
||||
//
|
||||
// FireBatch("name1", "name2", &MyEvent{})
|
||||
func (em *Manager) FireBatch(es ...any) (ers []error) {
|
||||
var err error
|
||||
for _, e := range es {
|
||||
if name, ok := e.(string); ok {
|
||||
err, _ = em.Fire(name, nil)
|
||||
} else if evt, ok := e.(Event); ok {
|
||||
err = em.FireEvent(evt)
|
||||
} // ignore invalid param.
|
||||
|
||||
if err != nil {
|
||||
ers = append(ers, err)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// AsyncFire simple async fire event by 'go' keywords
|
||||
func (em *Manager) AsyncFire(e Event) {
|
||||
go func(e Event) {
|
||||
_ = em.FireEvent(e)
|
||||
}(e)
|
||||
}
|
||||
|
||||
// AwaitFire async fire event by 'go' keywords, but will wait return result
|
||||
func (em *Manager) AwaitFire(e Event) (err error) {
|
||||
ch := make(chan error)
|
||||
|
||||
go func(e Event) {
|
||||
err := em.FireEvent(e)
|
||||
ch <- err
|
||||
}(e)
|
||||
|
||||
err = <-ch
|
||||
close(ch)
|
||||
return
|
||||
}
|
12
priority.go
12
priority.go
@@ -1,12 +0,0 @@
|
||||
package event
|
||||
|
||||
// There are some default priority constants
|
||||
const (
|
||||
Min = -300
|
||||
Low = -200
|
||||
BelowNormal = -100
|
||||
Normal = 0
|
||||
AboveNormal = 100
|
||||
High = 200
|
||||
Max = 300
|
||||
)
|
82
std.go
82
std.go
@@ -1,20 +1,20 @@
|
||||
package event
|
||||
|
||||
import (
|
||||
"context"
|
||||
)
|
||||
|
||||
// std default event manager
|
||||
var std = NewManager("default")
|
||||
|
||||
// Std get default event manager
|
||||
func Std() *Manager {
|
||||
return std
|
||||
}
|
||||
func Std() *Manager { return std }
|
||||
|
||||
// Config set default event manager options
|
||||
func Config(fn ...OptionFn) {
|
||||
std.WithOptions(fn...)
|
||||
}
|
||||
func Config(fn ...OptionFn) { std.WithOptions(fn...) }
|
||||
|
||||
/*************************************************************
|
||||
* Listener
|
||||
* region Listener
|
||||
*************************************************************/
|
||||
|
||||
// On register a listener to the event. alias of Listen()
|
||||
@@ -33,82 +33,62 @@ func Listen(name string, listener Listener, priority ...int) {
|
||||
}
|
||||
|
||||
// Subscribe register a listener to the event
|
||||
func Subscribe(sbr Subscriber) {
|
||||
std.Subscribe(sbr)
|
||||
}
|
||||
func Subscribe(sbr Subscriber) { std.Subscribe(sbr) }
|
||||
|
||||
// AddSubscriber register a listener to the event
|
||||
func AddSubscriber(sbr Subscriber) {
|
||||
std.AddSubscriber(sbr)
|
||||
}
|
||||
func AddSubscriber(sbr Subscriber) { std.AddSubscriber(sbr) }
|
||||
|
||||
// AsyncFire simple async fire event by 'go' keywords
|
||||
func AsyncFire(e Event) {
|
||||
std.AsyncFire(e)
|
||||
}
|
||||
func AsyncFire(e Event) { std.AsyncFire(e) }
|
||||
|
||||
// Async fire event by channel
|
||||
func Async(name string, params M) {
|
||||
std.Async(name, params)
|
||||
}
|
||||
func Async(name string, params M) { std.Async(name, params) }
|
||||
|
||||
// FireAsync fire event by channel
|
||||
func FireAsync(e Event) {
|
||||
std.FireAsync(e)
|
||||
}
|
||||
func FireAsync(e Event) { std.FireAsync(e) }
|
||||
|
||||
// CloseWait close chan and wait for all async events done.
|
||||
func CloseWait() error {
|
||||
return std.CloseWait()
|
||||
}
|
||||
func CloseWait() error { return std.CloseWait() }
|
||||
|
||||
// Trigger alias of Fire
|
||||
func Trigger(name string, params M) (error, Event) {
|
||||
return std.Fire(name, params)
|
||||
}
|
||||
func Trigger(name string, params M) (error, Event) { return std.Fire(name, params) }
|
||||
|
||||
// Fire listeners by name.
|
||||
func Fire(name string, params M) (error, Event) {
|
||||
return std.Fire(name, params)
|
||||
func Fire(name string, params M) (error, Event) { return std.Fire(name, params) }
|
||||
|
||||
// FireCtx listeners by name with context.
|
||||
func FireCtx(ctx context.Context, name string, params M) (error, Event) {
|
||||
return std.FireCtx(ctx, name, params)
|
||||
}
|
||||
|
||||
// FireEvent fire listeners by Event instance.
|
||||
func FireEvent(e Event) error {
|
||||
return std.FireEvent(e)
|
||||
func FireEvent(e Event) error { return std.FireEvent(e) }
|
||||
|
||||
// FireEventCtx fire listeners by Event instance with context.
|
||||
func FireEventCtx(ctx context.Context, e Event) error {
|
||||
return std.FireEventCtx(ctx, e)
|
||||
}
|
||||
|
||||
// TriggerEvent alias of FireEvent
|
||||
func TriggerEvent(e Event) error {
|
||||
return std.FireEvent(e)
|
||||
}
|
||||
func TriggerEvent(e Event) error { return std.FireEvent(e) }
|
||||
|
||||
// MustFire fire event by name. will panic on error
|
||||
func MustFire(name string, params M) Event {
|
||||
return std.MustFire(name, params)
|
||||
}
|
||||
func MustFire(name string, params M) Event { return std.MustFire(name, params) }
|
||||
|
||||
// MustTrigger alias of MustFire
|
||||
func MustTrigger(name string, params M) Event {
|
||||
return std.MustFire(name, params)
|
||||
}
|
||||
func MustTrigger(name string, params M) Event { return std.MustFire(name, params) }
|
||||
|
||||
// FireBatch fire multi event at once.
|
||||
func FireBatch(es ...any) []error {
|
||||
return std.FireBatch(es...)
|
||||
}
|
||||
func FireBatch(es ...any) []error { return std.FireBatch(es...) }
|
||||
|
||||
// HasListeners has listeners for the event name.
|
||||
func HasListeners(name string) bool {
|
||||
return std.HasListeners(name)
|
||||
}
|
||||
func HasListeners(name string) bool { return std.HasListeners(name) }
|
||||
|
||||
// Reset the default event manager
|
||||
func Reset() {
|
||||
std.Clear()
|
||||
}
|
||||
func Reset() { std.Clear() }
|
||||
|
||||
/*************************************************************
|
||||
* Event
|
||||
* region Event
|
||||
*************************************************************/
|
||||
|
||||
// AddEvent add a pre-defined event.
|
||||
|
@@ -5,6 +5,21 @@ import (
|
||||
"sort"
|
||||
)
|
||||
|
||||
// There are some default priority constants
|
||||
const (
|
||||
Min = -300
|
||||
Low = -200
|
||||
BelowNormal = -100
|
||||
Normal = 0
|
||||
AboveNormal = 100
|
||||
High = 200
|
||||
Max = 300
|
||||
)
|
||||
|
||||
/*************************************************************
|
||||
* region Listener
|
||||
*************************************************************/
|
||||
|
||||
// Listener interface
|
||||
type Listener interface {
|
||||
Handle(e Event) error
|
20
util.go
20
util.go
@@ -1,6 +1,7 @@
|
||||
package event
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"path"
|
||||
"regexp"
|
||||
@@ -41,25 +42,34 @@ var goodNameReg = regexp.MustCompile(`^[a-zA-Z][\w-.*]*$`)
|
||||
|
||||
// goodName check event name is valid.
|
||||
func goodName(name string, isReg bool) string {
|
||||
name, err := goodNameOrErr(name, isReg)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return name
|
||||
}
|
||||
|
||||
// goodNameOrErr check event name is valid.
|
||||
func goodNameOrErr(name string, isReg bool) (string, error) {
|
||||
name = strings.TrimSpace(name)
|
||||
if name == "" {
|
||||
panic("event: the event name cannot be empty")
|
||||
return "", errors.New("event: the event name cannot be empty")
|
||||
}
|
||||
|
||||
// on add listener
|
||||
if isReg {
|
||||
if name == AllNode || name == Wildcard {
|
||||
return Wildcard
|
||||
return Wildcard, nil
|
||||
}
|
||||
if strings.HasPrefix(name, AllNode) {
|
||||
return name
|
||||
return name, nil
|
||||
}
|
||||
}
|
||||
|
||||
if !goodNameReg.MatchString(name) {
|
||||
panic(`event: name is invalid, must match regex:` + goodNameReg.String())
|
||||
return name, errors.New(`event: name is invalid, must match regex:` + goodNameReg.String())
|
||||
}
|
||||
return name
|
||||
return name, nil
|
||||
}
|
||||
|
||||
func panicf(format string, args ...any) {
|
||||
|
Reference in New Issue
Block a user