From ab0d946a81984326e91571abcbe0daaa9f9f2007 Mon Sep 17 00:00:00 2001 From: inhere Date: Thu, 21 Aug 2025 13:36:12 +0800 Subject: [PATCH] :sparkles: feat: add support with context for fire event. see issues #78 - add new method: FireCtx, FireEventCtx --- event.go | 10 ++- issues_test.go | 65 ++++++++++++++- manager.go | 7 +- manager_fire.go | 206 +++++++++++------------------------------------- manager_test.go | 13 +-- std.go | 4 +- std_test.go | 20 +++++ 7 files changed, 149 insertions(+), 176 deletions(-) diff --git a/event.go b/event.go index 277a0dd..c93be50 100644 --- a/event.go +++ b/event.go @@ -80,19 +80,23 @@ type Event interface { SetData(M) Event Abort(bool) IsAborted() bool - // Context support - Context() context.Context - WithContext(ctx context.Context) } // Cloneable interface. event can be cloned. +// +// Check and convert: +// if ec, ok := e.(Cloneable); ok {} type Cloneable interface { Event Clone() Event } // ContextAble context-able event interface +// +// Check and convert in listener: +// if ec, ok := e.(ContextAble); ok {} type ContextAble interface { + Event Context() context.Context WithContext(ctx context.Context) } diff --git a/issues_test.go b/issues_test.go index 42c4981..51c8bc7 100644 --- a/issues_test.go +++ b/issues_test.go @@ -2,6 +2,7 @@ package event_test import ( "bytes" + "context" "fmt" "testing" "time" @@ -12,7 +13,7 @@ import ( type testNotify struct{} -func (notify *testNotify) Handle(e event.Event) error { +func (notify *testNotify) Handle(_ event.Event) error { isRun = true return nil } @@ -113,3 +114,65 @@ func TestIssues_61(t *testing.T) { fmt.Println("publish event finished!") } + +// https://github.com/gookit/event/issues/78 +// It is expected to support event passing context, timeout control, and log trace passing such as trace ID information. +// 希望事件支持通过上下文,超时控制和日志跟踪传递,例如跟踪ID信息。 +func TestIssues_78(t *testing.T) { + // Test with context + ctx := context.Background() + + // Create a context with value (simulating trace ID) + ctx = context.WithValue(ctx, "trace_id", "trace-12345") + + // Create a context with timeout + ctx, cancel := context.WithTimeout(ctx, 1*time.Second) + defer cancel() + + manager := event.NewManager("test") + + var traceID string + var ctxErr error + + // Register event listener + manager.On("app.test", event.ListenerFunc(func(e event.Event) error { + ec, ok := e.(event.ContextAble) + if !ok { + return nil + } + + // Get trace ID from context + traceID, _ = ec.Context().Value("trace_id").(string) + + // Check if context is canceled + select { + case <-ec.Context().Done(): + ctxErr = ec.Context().Err() + return ctxErr + default: + } + + return nil + })) + + // Test firing event with context + err, _ := manager.FireCtx(ctx, "app.test", event.M{"key": "value"}) + assert.NoError(t, err) + assert.Equal(t, "trace-12345", traceID) + assert.Nil(t, ctxErr) + + // Test with std + stdTraceID := "" + event.On("std.test", event.ListenerFunc(func(e event.Event) error { + ec, ok := e.(event.ContextAble) + if !ok { + return nil + } + stdTraceID, _ = ec.Context().Value("trace_id").(string) + return nil + })) + + err, _ = event.FireCtx(ctx, "std.test", event.M{"key": "value"}) + assert.NoError(t, err) + assert.Equal(t, "trace-12345", stdTraceID) +} diff --git a/manager.go b/manager.go index ea31382..8c3276e 100644 --- a/manager.go +++ b/manager.go @@ -19,7 +19,8 @@ type Manager struct { wg sync.WaitGroup ch chan Event oc sync.Once - err error // latest error + err error // latest error + ctx context.Context // default context // name of the manager name string @@ -46,7 +47,9 @@ func NewM(name string, fns ...OptionFn) *Manager { // NewManager create event manager func NewManager(name string, fns ...OptionFn) *Manager { em := &Manager{ - name: name, + name: name, + ctx: context.Background(), + // sample event sample: &BasicEvent{}, // events storage eventFc: make(map[string]FactoryFunc), diff --git a/manager_fire.go b/manager_fire.go index 6975356..a69b2aa 100644 --- a/manager_fire.go +++ b/manager_fire.go @@ -28,7 +28,7 @@ func (em *Manager) Trigger(name string, params M) (error, Event) { return em.Fir // Fire trigger event by name. if not found listener, will return (nil, nil) func (em *Manager) Fire(name string, params M) (err error, e Event) { // call listeners handle event - e, err = em.fireByName(name, params, false) + e, err = em.fireByNameCtx(em.ctx, name, params, false) return } @@ -62,32 +62,8 @@ func (em *Manager) FireC(name string, params M) { // On useCh=false: // - will call listeners handle event. // - if not found listener, will return (nil, nil) -func (em *Manager) fireByName(name string, params M, useCh bool) (e Event, err error) { - name, err = goodNameOrErr(name, false) - if err != nil { - return nil, err - } - - // use pre-defined Event - if fc, ok := em.eventFc[name]; ok { - e = fc() // make new instance - if params != nil { - e.SetData(params) - } - } else { - // create new basic event instance - e = em.newBasicEvent(name, params) - } - - // fire by channel - if useCh { - em.FireAsync(e) - return nil, nil - } - - // call listeners handle event - err = em.FireEvent(e) - return +func (em *Manager) fireByName(name string, params M, useCh bool) (Event, error) { + return em.fireByNameCtx(em.ctx, name, params, useCh) } // fireByNameCtx fire event by name with context @@ -108,9 +84,6 @@ func (em *Manager) fireByNameCtx(ctx context.Context, name string, params M, use e = em.newBasicEvent(name, params) } - // set context - e.WithContext(ctx) - // fire by channel if useCh { em.FireAsync(e) @@ -123,38 +96,11 @@ func (em *Manager) fireByNameCtx(ctx context.Context, name string, params M, use } // FireEvent fire event by given Event instance -func (em *Manager) FireEvent(e Event) (err error) { - if em.EnableLock { - em.Lock() - defer em.Unlock() +func (em *Manager) FireEvent(e Event) error { + if ec, ok := e.(ContextAble); ok { + return em.FireEventCtx(ec.Context(), e) } - - // ensure aborted is false. - e.Abort(false) - name := e.Name() - - // fire group listeners by wildcard. eg "db.user.*" - if em.MatchMode == ModePath { - err = em.firePathMode(name, e) - return - } - - // handle mode: ModeSimple - err = em.fireSimpleMode(name, e) - if err != nil || e.IsAborted() { - return - } - - // fire wildcard event listeners - if lq, ok := em.listeners[Wildcard]; ok { - for _, li := range lq.Sort().Items() { - err = li.Listener.Handle(e) - if err != nil || e.IsAborted() { - break - } - } - } - return + return em.FireEventCtx(em.ctx, e) } // FireEventCtx fire event by given Event instance with context @@ -167,19 +113,19 @@ func (em *Manager) FireEventCtx(ctx context.Context, e Event) (err error) { // ensure aborted is false. e.Abort(false) name := e.Name() - ctx := context.Background() - if ce, ok := e.(ContextAble); ok { - ctx = ce.Context() + // set context + if ec, ok := e.(ContextAble); ok { + ec.WithContext(ctx) } // fire group listeners by wildcard. eg "db.user.*" if em.MatchMode == ModePath { - err = em.firePathModeCtx(ctx, name, e) + err = em.firePathMode(ctx, name, e) return } // handle mode: ModeSimple - err = em.fireSimpleModeCtx(ctx, name, e) + err = em.fireSimpleMode(ctx, name, e) if err != nil || e.IsAborted() { return } @@ -188,11 +134,13 @@ func (em *Manager) FireEventCtx(ctx context.Context, e Event) (err error) { if lq, ok := em.listeners[Wildcard]; ok { for _, li := range lq.Sort().Items() { // Check context cancellation - select { - case <-ctx.Done(): - err = ctx.Err() - return - default: + if ctx != nil { + select { + case <-ctx.Done(): + err = ctx.Err() + return + default: + } } err = li.Listener.Handle(e) @@ -208,48 +156,19 @@ func (em *Manager) FireEventCtx(ctx context.Context, e Event) (err error) { // // Example: // - event "db.user.add" will trigger listeners on the "db.user.*" -func (em *Manager) fireSimpleMode(name string, e Event) (err error) { - // fire direct matched listeners. eg: db.user.add - if lq, ok := em.listeners[name]; ok { - // sort by priority before call. - for _, li := range lq.Sort().Items() { - err = li.Listener.Handle(e) - if err != nil || e.IsAborted() { - return - } - } - } - - pos := strings.LastIndexByte(name, '.') - - if pos > 0 && pos < len(name) { - groupName := name[:pos+1] + Wildcard // "app.*" - - if lq, ok := em.listeners[groupName]; ok { - for _, li := range lq.Sort().Items() { - err = li.Listener.Handle(e) - if err != nil || e.IsAborted() { - return - } - } - } - } - - return nil -} - -// fireSimpleModeCtx ModeSimple has group listeners by wildcard with context. eg "db.user.*" -func (em *Manager) fireSimpleModeCtx(ctx context.Context, name string, e Event) (err error) { +func (em *Manager) fireSimpleMode(ctx context.Context, name string, e Event) (err error) { // fire direct matched listeners. eg: db.user.add if lq, ok := em.listeners[name]; ok { // sort by priority before call. for _, li := range lq.Sort().Items() { // Check context cancellation - select { - case <-ctx.Done(): - err = ctx.Err() - return - default: + if ctx != nil { + select { + case <-ctx.Done(): + err = ctx.Err() + return + default: + } } err = li.Listener.Handle(e) @@ -261,17 +180,20 @@ func (em *Manager) fireSimpleModeCtx(ctx context.Context, name string, e Event) pos := strings.LastIndexByte(name, '.') + // exists group if pos > 0 && pos < len(name) { groupName := name[:pos+1] + Wildcard // "app.*" if lq, ok := em.listeners[groupName]; ok { for _, li := range lq.Sort().Items() { // Check context cancellation - select { - case <-ctx.Done(): - err = ctx.Err() - return - default: + if ctx != nil { + select { + case <-ctx.Done(): + err = ctx.Err() + return + default: + } } err = li.Listener.Handle(e) @@ -285,41 +207,23 @@ func (em *Manager) fireSimpleModeCtx(ctx context.Context, name string, e Event) return nil } -// ModePath fire group listeners by ModePath. +// firePathMode fire group listeners by ModePath. // // Example: // - event "db.user.add" will trigger listeners on the "db.**" // - event "db.user.add" will trigger listeners on the "db.user.*" -func (em *Manager) firePathMode(name string, e Event) (err error) { - for pattern, lq := range em.listeners { - if pattern == name || matchNodePath(pattern, name, ".") { - for _, li := range lq.Sort().Items() { - err = li.Listener.Handle(e) - if err != nil || e.IsAborted() { - return - } - } - } - } - - return nil -} - -// firePathModeCtx fire group listeners by ModePath with context. -// -// Example: -// - event "db.user.add" will trigger listeners on the "db.**" -// - event "db.user.add" will trigger listeners on the "db.user.*" -func (em *Manager) firePathModeCtx(ctx context.Context, name string, e Event) (err error) { +func (em *Manager) firePathMode(ctx context.Context, name string, e Event) (err error) { for pattern, lq := range em.listeners { if pattern == name || matchNodePath(pattern, name, ".") { for _, li := range lq.Sort().Items() { // Check context cancellation - select { - case <-ctx.Done(): - err = ctx.Err() - return - default: + if ctx != nil { + select { + case <-ctx.Done(): + err = ctx.Err() + return + default: + } } err = li.Listener.Handle(e) @@ -387,28 +291,6 @@ func (em *Manager) makeConsumers() { } } -// CloseWait close channel and wait all async event done. -func (em *Manager) CloseWait() error { - if err := em.Close(); err != nil { - return err - } - return em.Wait() -} - -// Wait wait all async event done. -func (em *Manager) Wait() error { - em.wg.Wait() - return em.err -} - -// Close event channel, deny to fire new event. -func (em *Manager) Close() error { - if em.ch != nil { - close(em.ch) - } - return nil -} - // FireBatch fire multi event at once. // // Usage: diff --git a/manager_test.go b/manager_test.go index dba2074..82ab48a 100644 --- a/manager_test.go +++ b/manager_test.go @@ -17,7 +17,7 @@ func TestManager_FireEvent(t *testing.T) { em.EnableLock = true e1 := event.NewBasic("e1", nil) - em.AddEvent(e1) + assert.NoErr(t, em.AddEvent(e1)) em.On("e1", &testListener{"HI"}, event.Min) em.On("e1", &testListener{"WEL"}, event.High) @@ -39,7 +39,7 @@ func TestManager_FireEvent2(t *testing.T) { mgr := event.NewM("test") evt1 := event.New("evt1", nil).Fill(nil, event.M{"n": "inhere"}) - mgr.AddEvent(evt1) + assert.NoErr(t, mgr.AddEvent(evt1)) assert.True(t, mgr.HasEvent("evt1")) assert.False(t, mgr.HasEvent("not-exist")) @@ -131,7 +131,8 @@ func TestManager_ListenGroupEvent(t *testing.T) { em := event.NewManager("test") e1 := event.NewBasic("app.evt1", event.M{"buf": new(bytes.Buffer)}) - e1.AttachTo(em) + err := e1.AttachTo(em) + assert.NoError(t, err) l2 := event.ListenerFunc(func(e event.Event) error { e.Get("buf").(*bytes.Buffer).WriteString(" > 2 " + e.Name()) @@ -152,7 +153,8 @@ func TestManager_ListenGroupEvent(t *testing.T) { buf := e1.Get("buf").(*bytes.Buffer) err, e := em.Fire("app.evt1", nil) assert.NoError(t, err) - assert.Equal(t, e1, e) + assert.Equal(t, "app.evt1", e.Name()) + // assert.Equal(t, e1, e) assert.Equal(t, "Hi > 1 app.evt1 > 2 app.evt1 > 3 app.evt1", buf.String()) em.RemoveListener("app.*", l2) @@ -177,7 +179,8 @@ func TestManager_ListenGroupEvent(t *testing.T) { buf.Reset() err, e = em.Trigger("app.evt1", nil) assert.Error(t, err) - assert.Equal(t, e1, e) + // assert.Equal(t, e1, e) + assert.Equal(t, "app.evt1", e.Name()) assert.Equal(t, "Hi > 1 app.evt1 > 2 app.evt1", buf.String()) em.RemoveListener("", nil) diff --git a/std.go b/std.go index 1ff0e77..78d296e 100644 --- a/std.go +++ b/std.go @@ -65,9 +65,7 @@ func FireCtx(ctx context.Context, name string, params M) (error, Event) { func FireEvent(e Event) error { return std.FireEvent(e) } // FireEventCtx fire listeners by Event instance with context. -func FireEventCtx(ctx context.Context, e Event) error { - return std.FireEventCtx(ctx, e) -} +func FireEventCtx(ctx context.Context, e Event) error { return std.FireEventCtx(ctx, e) } // TriggerEvent alias of FireEvent func TriggerEvent(e Event) error { return std.FireEvent(e) } diff --git a/std_test.go b/std_test.go index 69fcc73..e82101b 100644 --- a/std_test.go +++ b/std_test.go @@ -140,16 +140,36 @@ func TestFireEvent(t *testing.T) { assert.True(t, event.HasListeners("evt1")) assert.False(t, event.HasListeners("not-exist")) + // FireEvent err := event.FireEvent(evt1) assert.NoError(t, err) assert.Equal(t, "event: evt1, params: n=inhere", buf.String()) buf.Reset() + // TriggerEvent err = event.TriggerEvent(evt1) assert.NoError(t, err) assert.Equal(t, "event: evt1, params: n=inhere", buf.String()) buf.Reset() + // FireEventCtx + var ctxVal any + ctx := context.WithValue(context.Background(), "ctx1", "ctx-value1") + event.Listen("evt2", event.ListenerFunc(func(e event.Event) error { + ec, ok := e.(event.ContextAble) + if ok { + ctxVal = ec.Context().Value("ctx1") + } + return nil + })) + evt2 := event.New("evt2", event.M{"name": "inhere"}) + err = event.FireEventCtx(ctx, evt2) + assert.NoError(t, err) + assert.Equal(t, "ctx-value1", ctxVal) + assert.Equal(t, "ctx-value1", evt2.Context().Value("ctx1")) + buf.Reset() + + // AsyncFire event.AsyncFire(evt1) time.Sleep(time.Second) assert.Equal(t, "event: evt1, params: n=inhere", buf.String())