feat: add support with context for fire event. see issues #78

- add new method: FireCtx, FireEventCtx
This commit is contained in:
inhere
2025-08-21 13:36:12 +08:00
parent 8a9127892c
commit ab0d946a81
7 changed files with 149 additions and 176 deletions

View File

@@ -80,19 +80,23 @@ type Event interface {
SetData(M) Event
Abort(bool)
IsAborted() bool
// Context support
Context() context.Context
WithContext(ctx context.Context)
}
// Cloneable interface. event can be cloned.
//
// Check and convert:
// if ec, ok := e.(Cloneable); ok {}
type Cloneable interface {
Event
Clone() Event
}
// ContextAble context-able event interface
//
// Check and convert in listener:
// if ec, ok := e.(ContextAble); ok {}
type ContextAble interface {
Event
Context() context.Context
WithContext(ctx context.Context)
}

View File

@@ -2,6 +2,7 @@ package event_test
import (
"bytes"
"context"
"fmt"
"testing"
"time"
@@ -12,7 +13,7 @@ import (
type testNotify struct{}
func (notify *testNotify) Handle(e event.Event) error {
func (notify *testNotify) Handle(_ event.Event) error {
isRun = true
return nil
}
@@ -113,3 +114,65 @@ func TestIssues_61(t *testing.T) {
fmt.Println("publish event finished!")
}
// https://github.com/gookit/event/issues/78
// It is expected to support event passing context, timeout control, and log trace passing such as trace ID information.
// 希望事件支持通过上下文超时控制和日志跟踪传递例如跟踪ID信息。
func TestIssues_78(t *testing.T) {
// Test with context
ctx := context.Background()
// Create a context with value (simulating trace ID)
ctx = context.WithValue(ctx, "trace_id", "trace-12345")
// Create a context with timeout
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
manager := event.NewManager("test")
var traceID string
var ctxErr error
// Register event listener
manager.On("app.test", event.ListenerFunc(func(e event.Event) error {
ec, ok := e.(event.ContextAble)
if !ok {
return nil
}
// Get trace ID from context
traceID, _ = ec.Context().Value("trace_id").(string)
// Check if context is canceled
select {
case <-ec.Context().Done():
ctxErr = ec.Context().Err()
return ctxErr
default:
}
return nil
}))
// Test firing event with context
err, _ := manager.FireCtx(ctx, "app.test", event.M{"key": "value"})
assert.NoError(t, err)
assert.Equal(t, "trace-12345", traceID)
assert.Nil(t, ctxErr)
// Test with std
stdTraceID := ""
event.On("std.test", event.ListenerFunc(func(e event.Event) error {
ec, ok := e.(event.ContextAble)
if !ok {
return nil
}
stdTraceID, _ = ec.Context().Value("trace_id").(string)
return nil
}))
err, _ = event.FireCtx(ctx, "std.test", event.M{"key": "value"})
assert.NoError(t, err)
assert.Equal(t, "trace-12345", stdTraceID)
}

View File

@@ -19,7 +19,8 @@ type Manager struct {
wg sync.WaitGroup
ch chan Event
oc sync.Once
err error // latest error
err error // latest error
ctx context.Context // default context
// name of the manager
name string
@@ -46,7 +47,9 @@ func NewM(name string, fns ...OptionFn) *Manager {
// NewManager create event manager
func NewManager(name string, fns ...OptionFn) *Manager {
em := &Manager{
name: name,
name: name,
ctx: context.Background(),
// sample event
sample: &BasicEvent{},
// events storage
eventFc: make(map[string]FactoryFunc),

View File

@@ -28,7 +28,7 @@ func (em *Manager) Trigger(name string, params M) (error, Event) { return em.Fir
// Fire trigger event by name. if not found listener, will return (nil, nil)
func (em *Manager) Fire(name string, params M) (err error, e Event) {
// call listeners handle event
e, err = em.fireByName(name, params, false)
e, err = em.fireByNameCtx(em.ctx, name, params, false)
return
}
@@ -62,32 +62,8 @@ func (em *Manager) FireC(name string, params M) {
// On useCh=false:
// - will call listeners handle event.
// - if not found listener, will return (nil, nil)
func (em *Manager) fireByName(name string, params M, useCh bool) (e Event, err error) {
name, err = goodNameOrErr(name, false)
if err != nil {
return nil, err
}
// use pre-defined Event
if fc, ok := em.eventFc[name]; ok {
e = fc() // make new instance
if params != nil {
e.SetData(params)
}
} else {
// create new basic event instance
e = em.newBasicEvent(name, params)
}
// fire by channel
if useCh {
em.FireAsync(e)
return nil, nil
}
// call listeners handle event
err = em.FireEvent(e)
return
func (em *Manager) fireByName(name string, params M, useCh bool) (Event, error) {
return em.fireByNameCtx(em.ctx, name, params, useCh)
}
// fireByNameCtx fire event by name with context
@@ -108,9 +84,6 @@ func (em *Manager) fireByNameCtx(ctx context.Context, name string, params M, use
e = em.newBasicEvent(name, params)
}
// set context
e.WithContext(ctx)
// fire by channel
if useCh {
em.FireAsync(e)
@@ -123,38 +96,11 @@ func (em *Manager) fireByNameCtx(ctx context.Context, name string, params M, use
}
// FireEvent fire event by given Event instance
func (em *Manager) FireEvent(e Event) (err error) {
if em.EnableLock {
em.Lock()
defer em.Unlock()
func (em *Manager) FireEvent(e Event) error {
if ec, ok := e.(ContextAble); ok {
return em.FireEventCtx(ec.Context(), e)
}
// ensure aborted is false.
e.Abort(false)
name := e.Name()
// fire group listeners by wildcard. eg "db.user.*"
if em.MatchMode == ModePath {
err = em.firePathMode(name, e)
return
}
// handle mode: ModeSimple
err = em.fireSimpleMode(name, e)
if err != nil || e.IsAborted() {
return
}
// fire wildcard event listeners
if lq, ok := em.listeners[Wildcard]; ok {
for _, li := range lq.Sort().Items() {
err = li.Listener.Handle(e)
if err != nil || e.IsAborted() {
break
}
}
}
return
return em.FireEventCtx(em.ctx, e)
}
// FireEventCtx fire event by given Event instance with context
@@ -167,19 +113,19 @@ func (em *Manager) FireEventCtx(ctx context.Context, e Event) (err error) {
// ensure aborted is false.
e.Abort(false)
name := e.Name()
ctx := context.Background()
if ce, ok := e.(ContextAble); ok {
ctx = ce.Context()
// set context
if ec, ok := e.(ContextAble); ok {
ec.WithContext(ctx)
}
// fire group listeners by wildcard. eg "db.user.*"
if em.MatchMode == ModePath {
err = em.firePathModeCtx(ctx, name, e)
err = em.firePathMode(ctx, name, e)
return
}
// handle mode: ModeSimple
err = em.fireSimpleModeCtx(ctx, name, e)
err = em.fireSimpleMode(ctx, name, e)
if err != nil || e.IsAborted() {
return
}
@@ -188,11 +134,13 @@ func (em *Manager) FireEventCtx(ctx context.Context, e Event) (err error) {
if lq, ok := em.listeners[Wildcard]; ok {
for _, li := range lq.Sort().Items() {
// Check context cancellation
select {
case <-ctx.Done():
err = ctx.Err()
return
default:
if ctx != nil {
select {
case <-ctx.Done():
err = ctx.Err()
return
default:
}
}
err = li.Listener.Handle(e)
@@ -208,48 +156,19 @@ func (em *Manager) FireEventCtx(ctx context.Context, e Event) (err error) {
//
// Example:
// - event "db.user.add" will trigger listeners on the "db.user.*"
func (em *Manager) fireSimpleMode(name string, e Event) (err error) {
// fire direct matched listeners. eg: db.user.add
if lq, ok := em.listeners[name]; ok {
// sort by priority before call.
for _, li := range lq.Sort().Items() {
err = li.Listener.Handle(e)
if err != nil || e.IsAborted() {
return
}
}
}
pos := strings.LastIndexByte(name, '.')
if pos > 0 && pos < len(name) {
groupName := name[:pos+1] + Wildcard // "app.*"
if lq, ok := em.listeners[groupName]; ok {
for _, li := range lq.Sort().Items() {
err = li.Listener.Handle(e)
if err != nil || e.IsAborted() {
return
}
}
}
}
return nil
}
// fireSimpleModeCtx ModeSimple has group listeners by wildcard with context. eg "db.user.*"
func (em *Manager) fireSimpleModeCtx(ctx context.Context, name string, e Event) (err error) {
func (em *Manager) fireSimpleMode(ctx context.Context, name string, e Event) (err error) {
// fire direct matched listeners. eg: db.user.add
if lq, ok := em.listeners[name]; ok {
// sort by priority before call.
for _, li := range lq.Sort().Items() {
// Check context cancellation
select {
case <-ctx.Done():
err = ctx.Err()
return
default:
if ctx != nil {
select {
case <-ctx.Done():
err = ctx.Err()
return
default:
}
}
err = li.Listener.Handle(e)
@@ -261,17 +180,20 @@ func (em *Manager) fireSimpleModeCtx(ctx context.Context, name string, e Event)
pos := strings.LastIndexByte(name, '.')
// exists group
if pos > 0 && pos < len(name) {
groupName := name[:pos+1] + Wildcard // "app.*"
if lq, ok := em.listeners[groupName]; ok {
for _, li := range lq.Sort().Items() {
// Check context cancellation
select {
case <-ctx.Done():
err = ctx.Err()
return
default:
if ctx != nil {
select {
case <-ctx.Done():
err = ctx.Err()
return
default:
}
}
err = li.Listener.Handle(e)
@@ -285,41 +207,23 @@ func (em *Manager) fireSimpleModeCtx(ctx context.Context, name string, e Event)
return nil
}
// ModePath fire group listeners by ModePath.
// firePathMode fire group listeners by ModePath.
//
// Example:
// - event "db.user.add" will trigger listeners on the "db.**"
// - event "db.user.add" will trigger listeners on the "db.user.*"
func (em *Manager) firePathMode(name string, e Event) (err error) {
for pattern, lq := range em.listeners {
if pattern == name || matchNodePath(pattern, name, ".") {
for _, li := range lq.Sort().Items() {
err = li.Listener.Handle(e)
if err != nil || e.IsAborted() {
return
}
}
}
}
return nil
}
// firePathModeCtx fire group listeners by ModePath with context.
//
// Example:
// - event "db.user.add" will trigger listeners on the "db.**"
// - event "db.user.add" will trigger listeners on the "db.user.*"
func (em *Manager) firePathModeCtx(ctx context.Context, name string, e Event) (err error) {
func (em *Manager) firePathMode(ctx context.Context, name string, e Event) (err error) {
for pattern, lq := range em.listeners {
if pattern == name || matchNodePath(pattern, name, ".") {
for _, li := range lq.Sort().Items() {
// Check context cancellation
select {
case <-ctx.Done():
err = ctx.Err()
return
default:
if ctx != nil {
select {
case <-ctx.Done():
err = ctx.Err()
return
default:
}
}
err = li.Listener.Handle(e)
@@ -387,28 +291,6 @@ func (em *Manager) makeConsumers() {
}
}
// CloseWait close channel and wait all async event done.
func (em *Manager) CloseWait() error {
if err := em.Close(); err != nil {
return err
}
return em.Wait()
}
// Wait wait all async event done.
func (em *Manager) Wait() error {
em.wg.Wait()
return em.err
}
// Close event channel, deny to fire new event.
func (em *Manager) Close() error {
if em.ch != nil {
close(em.ch)
}
return nil
}
// FireBatch fire multi event at once.
//
// Usage:

View File

@@ -17,7 +17,7 @@ func TestManager_FireEvent(t *testing.T) {
em.EnableLock = true
e1 := event.NewBasic("e1", nil)
em.AddEvent(e1)
assert.NoErr(t, em.AddEvent(e1))
em.On("e1", &testListener{"HI"}, event.Min)
em.On("e1", &testListener{"WEL"}, event.High)
@@ -39,7 +39,7 @@ func TestManager_FireEvent2(t *testing.T) {
mgr := event.NewM("test")
evt1 := event.New("evt1", nil).Fill(nil, event.M{"n": "inhere"})
mgr.AddEvent(evt1)
assert.NoErr(t, mgr.AddEvent(evt1))
assert.True(t, mgr.HasEvent("evt1"))
assert.False(t, mgr.HasEvent("not-exist"))
@@ -131,7 +131,8 @@ func TestManager_ListenGroupEvent(t *testing.T) {
em := event.NewManager("test")
e1 := event.NewBasic("app.evt1", event.M{"buf": new(bytes.Buffer)})
e1.AttachTo(em)
err := e1.AttachTo(em)
assert.NoError(t, err)
l2 := event.ListenerFunc(func(e event.Event) error {
e.Get("buf").(*bytes.Buffer).WriteString(" > 2 " + e.Name())
@@ -152,7 +153,8 @@ func TestManager_ListenGroupEvent(t *testing.T) {
buf := e1.Get("buf").(*bytes.Buffer)
err, e := em.Fire("app.evt1", nil)
assert.NoError(t, err)
assert.Equal(t, e1, e)
assert.Equal(t, "app.evt1", e.Name())
// assert.Equal(t, e1, e)
assert.Equal(t, "Hi > 1 app.evt1 > 2 app.evt1 > 3 app.evt1", buf.String())
em.RemoveListener("app.*", l2)
@@ -177,7 +179,8 @@ func TestManager_ListenGroupEvent(t *testing.T) {
buf.Reset()
err, e = em.Trigger("app.evt1", nil)
assert.Error(t, err)
assert.Equal(t, e1, e)
// assert.Equal(t, e1, e)
assert.Equal(t, "app.evt1", e.Name())
assert.Equal(t, "Hi > 1 app.evt1 > 2 app.evt1", buf.String())
em.RemoveListener("", nil)

4
std.go
View File

@@ -65,9 +65,7 @@ func FireCtx(ctx context.Context, name string, params M) (error, Event) {
func FireEvent(e Event) error { return std.FireEvent(e) }
// FireEventCtx fire listeners by Event instance with context.
func FireEventCtx(ctx context.Context, e Event) error {
return std.FireEventCtx(ctx, e)
}
func FireEventCtx(ctx context.Context, e Event) error { return std.FireEventCtx(ctx, e) }
// TriggerEvent alias of FireEvent
func TriggerEvent(e Event) error { return std.FireEvent(e) }

View File

@@ -140,16 +140,36 @@ func TestFireEvent(t *testing.T) {
assert.True(t, event.HasListeners("evt1"))
assert.False(t, event.HasListeners("not-exist"))
// FireEvent
err := event.FireEvent(evt1)
assert.NoError(t, err)
assert.Equal(t, "event: evt1, params: n=inhere", buf.String())
buf.Reset()
// TriggerEvent
err = event.TriggerEvent(evt1)
assert.NoError(t, err)
assert.Equal(t, "event: evt1, params: n=inhere", buf.String())
buf.Reset()
// FireEventCtx
var ctxVal any
ctx := context.WithValue(context.Background(), "ctx1", "ctx-value1")
event.Listen("evt2", event.ListenerFunc(func(e event.Event) error {
ec, ok := e.(event.ContextAble)
if ok {
ctxVal = ec.Context().Value("ctx1")
}
return nil
}))
evt2 := event.New("evt2", event.M{"name": "inhere"})
err = event.FireEventCtx(ctx, evt2)
assert.NoError(t, err)
assert.Equal(t, "ctx-value1", ctxVal)
assert.Equal(t, "ctx-value1", evt2.Context().Value("ctx1"))
buf.Reset()
// AsyncFire
event.AsyncFire(evt1)
time.Sleep(time.Second)
assert.Equal(t, "event: evt1, params: n=inhere", buf.String())