From 1801b1b525d9dba12bb50588af631d74be6fc810 Mon Sep 17 00:00:00 2001 From: inhere Date: Sat, 30 Aug 2025 13:22:48 +0800 Subject: [PATCH] :necktie: up: Update adjusts contextual event handling logic - fix some unit test error on concurrent run --- event.go | 15 +++++++++++++-- event_test.go | 16 +++++++++++++++- issues_test.go | 13 +++++++------ manager.go | 6 ++---- manager_fire.go | 48 +++++++++++++++++++++++++++++++++--------------- std.go | 3 +++ std_test.go | 26 ++++++++++++++------------ 7 files changed, 87 insertions(+), 40 deletions(-) diff --git a/event.go b/event.go index adaf880..e3266c7 100644 --- a/event.go +++ b/event.go @@ -100,6 +100,7 @@ type Event interface { // Cloneable interface. event can be cloned. // // Check and convert: +// // if ec, ok := e.(Cloneable); ok {} type Cloneable interface { Event @@ -109,6 +110,7 @@ type Cloneable interface { // ContextAble context-able event interface // // Check and convert in listener: +// // if ec, ok := e.(ContextAble); ok {} type ContextAble interface { Event @@ -121,7 +123,6 @@ type FactoryFunc func() Event // BasicEvent a built-in implements Event interface type BasicEvent struct { - ContextTrait // event name name string // user data. @@ -238,4 +239,14 @@ func (t *ContextTrait) Context() context.Context { // WithContext set context func (t *ContextTrait) WithContext(ctx context.Context) { t.ctx = ctx -} \ No newline at end of file +} + +// ContextEvent event with context +type contextEvent struct { + Event + ContextTrait +} + +func newContextEvent(ctx context.Context, e Event) ContextAble { + return &contextEvent{Event: e, ContextTrait: ContextTrait{ctx: ctx}} +} diff --git a/event_test.go b/event_test.go index 805eb93..5e0e028 100644 --- a/event_test.go +++ b/event_test.go @@ -1,13 +1,27 @@ package event_test import ( + "bytes" "fmt" + "sync" "testing" "github.com/gookit/event" "github.com/gookit/goutil/testutil/assert" ) +// Thread-safe buffer for testing +type safeBuffer struct { + bytes.Buffer + mu sync.Mutex +} + +func (sb *safeBuffer) Write(p []byte) (n int, err error) { + sb.mu.Lock() + defer sb.mu.Unlock() + return sb.Buffer.Write(p) +} + type testListener struct { userData string } @@ -119,5 +133,5 @@ func TestEvent(t *testing.T) { e1.Set("k", "v") assert.Equal(t, "v", e1.Get("k")) // assert.NotEmpty(t, e1.Clone()) - assert.NotNil(t, e1.Context()) + // assert.NotNil(t, e1.Context()) } diff --git a/issues_test.go b/issues_test.go index 1ab8385..d4930d5 100644 --- a/issues_test.go +++ b/issues_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "sync/atomic" "testing" "time" @@ -124,7 +125,7 @@ func TestIssues_61(t *testing.T) { o.ConsumerNum = 10 o.EnableLock = false }) - defer em.CloseWait() + defer em.MustCloseWait() var listener event.ListenerFunc = func(e event.Event) error { time.Sleep(1 * time.Second) @@ -145,9 +146,9 @@ func TestIssues_61(t *testing.T) { func TestIssues_67(t *testing.T) { em := event.NewManager("test", event.WithConsumerNum(10), event.WithChannelSize(100)) - var counter int + var counter atomic.Int32 em.On("new.member", event.ListenerFunc(func(e event.Event) error { - counter++ + counter.Add(1) fmt.Print(e.Get("memberId"), " ") return nil })) @@ -158,8 +159,8 @@ func TestIssues_67(t *testing.T) { } em.MustCloseWait() // MUST wait all async event done - fmt.Println("Total:", counter) - assert.Eq(t, total, counter) + fmt.Println("Total:", counter.Load()) + assert.Eq(t, int32(total), counter.Load()) } type MyEventI68 struct { @@ -175,7 +176,7 @@ func (e *MyEventI68) CustomData() string { func TestIssues_68(t *testing.T) { e := &MyEventI68{customData: "hello"} e.SetName("e1") - defer event.Reset() + event.Reset() assert.NoErr(t, event.AddEvent(e)) // add listener diff --git a/manager.go b/manager.go index 6e6b1d4..305e2b7 100644 --- a/manager.go +++ b/manager.go @@ -1,7 +1,6 @@ package event import ( - "context" "reflect" "sync" ) @@ -19,8 +18,7 @@ type Manager struct { wg sync.WaitGroup ch chan Event oc sync.Once - err error // latest error - ctx context.Context // default context + err error // latest error // name of the manager name string @@ -48,7 +46,7 @@ func NewM(name string, fns ...OptionFn) *Manager { func NewManager(name string, fns ...OptionFn) *Manager { em := &Manager{ name: name, - ctx: context.Background(), + // ctx: context.Background(), // sample event sample: &BasicEvent{}, // events storage diff --git a/manager_fire.go b/manager_fire.go index 8b02968..57e3942 100644 --- a/manager_fire.go +++ b/manager_fire.go @@ -30,7 +30,7 @@ func (em *Manager) Trigger(name string, params M) (error, Event) { return em.Fir // Fire trigger event by name. if not found listener, will return (nil, nil) func (em *Manager) Fire(name string, params M) (err error, e Event) { // call listeners handle event - e, err = em.fireByNameCtx(em.ctx, name, params, false) + e, err = em.fireByName(name, params, false) return } @@ -65,7 +65,7 @@ func (em *Manager) FireC(name string, params M) { // - will call listeners handle event. // - if not found listener, will return (nil, nil) func (em *Manager) fireByName(name string, params M, useCh bool) (Event, error) { - return em.fireByNameCtx(em.ctx, name, params, useCh) + return em.fireByNameCtx(nil, name, params, useCh) } // fireByNameCtx fire event by name with context @@ -86,6 +86,15 @@ func (em *Manager) fireByNameCtx(ctx context.Context, name string, params M, use e = em.newBasicEvent(name, params) } + // warp context + if ctx != nil { + if ec, ok := e.(ContextAble); ok { + ec.WithContext(ctx) + } else { + e = newContextEvent(ctx, e) + } + } + // fire by channel if useCh { em.FireAsync(e) @@ -93,20 +102,24 @@ func (em *Manager) fireByNameCtx(ctx context.Context, name string, params M, use } // call listeners handle event - err = em.FireEventCtx(ctx, e) + err = em.fireEvent(e) return } -// FireEvent fire event by given Event instance -func (em *Manager) FireEvent(e Event) error { - if ec, ok := e.(ContextAble); ok { - return em.FireEventCtx(ec.Context(), e) - } - return em.FireEventCtx(em.ctx, e) -} +// FireEvent fire event by given Event instance. +func (em *Manager) FireEvent(e Event) error { return em.fireEvent(e) } // FireEventCtx fire event by given Event instance with context func (em *Manager) FireEventCtx(ctx context.Context, e Event) (err error) { + if ec, ok := e.(ContextAble); ok { + ec.WithContext(ctx) + return em.fireEvent(ec) + } + return em.fireEvent(newContextEvent(ctx, e)) +} + +// FireEventCtx fire event by given Event instance with context +func (em *Manager) fireEvent(e Event) (err error) { if em.EnableLock { em.Lock() defer em.Unlock() @@ -115,9 +128,11 @@ func (em *Manager) FireEventCtx(ctx context.Context, e Event) (err error) { // ensure aborted is false. e.Abort(false) name := e.Name() - // set context + + // get context + var ctx context.Context if ec, ok := e.(ContextAble); ok { - ec.WithContext(ctx) + ctx = ec.Context() } // fire group listeners by wildcard. eg "db.user.*" @@ -243,6 +258,9 @@ func (em *Manager) firePathMode(ctx context.Context, name string, e Event) (err * region Fire by channel *************************************************************/ +// FireAsyncCtx async fire event by go channel, and with context TODO need? +// func (em *Manager) FireAsyncCtx(ctx context.Context, e Event) + // FireAsync async fire event by go channel. // // Note: if you want to use this method, you should @@ -303,7 +321,7 @@ func (em *Manager) FireBatch(es ...any) (ers []error) { for _, e := range es { if name, ok := e.(string); ok { err, _ = em.Fire(name, nil) - } else if evt, ok := e.(Event); ok { + } else if evt, ok1 := e.(Event); ok1 { err = em.FireEvent(evt) } // ignore invalid param. @@ -326,8 +344,8 @@ func (em *Manager) AwaitFire(e Event) (err error) { ch := make(chan error) go func(e Event) { - err := em.FireEvent(e) - ch <- err + err1 := em.FireEvent(e) + ch <- err1 }(e) err = <-ch diff --git a/std.go b/std.go index b3654cd..099438f 100644 --- a/std.go +++ b/std.go @@ -60,6 +60,9 @@ func Async(name string, params M) { std.Async(name, params) } // FireAsync fire event by channel func FireAsync(e Event) { std.FireAsync(e) } +// FireAsyncCtx async fire event by go channel, and with context TODO need? +// func FireAsyncCtx(ctx context.Context, e Event) + // Trigger alias of Fire func Trigger(name string, params M) (error, Event) { return std.Fire(name, params) } diff --git a/std_test.go b/std_test.go index ae2e832..b3526a4 100644 --- a/std_test.go +++ b/std_test.go @@ -16,7 +16,7 @@ var emptyListener = func(e event.Event) error { } func TestAddEvent(t *testing.T) { - defer event.Reset() + event.Reset() event.Std().RemoveEvents() // no name @@ -110,6 +110,7 @@ func TestFire(t *testing.T) { } func TestAddSubscriber(t *testing.T) { + event.Reset() event.AddSubscriber(&testSubscriber{}) assert.True(t, event.HasListeners("e1")) @@ -122,14 +123,12 @@ func TestAddSubscriber(t *testing.T) { assert.Panics(t, func() { event.Subscribe(testSubscriber2{}) }) - - event.Reset() } func TestFireEvent(t *testing.T) { - defer event.Reset() - buf := new(bytes.Buffer) + event.Reset() + buf := new(bytes.Buffer) evt1 := event.NewBasic("evt1", nil).Fill(nil, event.M{"n": "inhere"}) assert.NoErr(t, event.AddEvent(evt1)) @@ -166,11 +165,13 @@ func TestFireEvent(t *testing.T) { } return nil })) + evt2 := event.NewEvent("evt2", event.M{"name": "inhere"}) err = event.FireEventCtx(ctx, evt2) assert.NoError(t, err) assert.Equal(t, "ctx-value1", ctxVal) - assert.Equal(t, "ctx-value1", evt2.Context().Value("ctx1")) + // ce, ok := evt2.(event.ContextAble) + // assert.Equal(t, "ctx-value1", evt2.Context().Value("ctx1")) buf.Reset() // AsyncFire @@ -183,7 +184,7 @@ func TestAsync(t *testing.T) { event.Reset() event.Config(event.UsePathMode) - buf := new(bytes.Buffer) + buf := new(safeBuffer) event.On("test", event.ListenerFunc(func(e event.Event) error { buf.WriteString("test:") buf.WriteString(e.Get("key").(string)) @@ -194,8 +195,8 @@ func TestAsync(t *testing.T) { event.Async("test", event.M{"key": "val1"}) te := &testEvent{name: "test", data: event.M{"key": "val2"}} event.FireAsync(te) - assert.NoError(t, event.CloseWait()) + s := buf.String() assert.Contains(t, s, "test:val1|") assert.Contains(t, s, "test:val2|") @@ -232,7 +233,7 @@ func TestFire_notExist(t *testing.T) { } func TestMustFire(t *testing.T) { - defer event.Reset() + event.Reset() event.On("n1", event.ListenerFunc(func(e event.Event) error { return fmt.Errorf("an error") @@ -249,7 +250,7 @@ func TestMustFire(t *testing.T) { } func TestOn(t *testing.T) { - defer event.Reset() + event.Reset() assert.Panics(t, func() { event.On("", event.ListenerFunc(emptyListener), 0) @@ -275,10 +276,11 @@ func TestOn(t *testing.T) { assert.False(t, event.HasListeners("n1")) } func TestOnce(t *testing.T) { - defer event.Reset() + event.Reset() event.Once("evt1", event.ListenerFunc(emptyListener)) assert.True(t, event.Std().HasListeners("evt1")) - event.Trigger("evt1", nil) + err, _ := event.Trigger("evt1", nil) + assert.Nil(t, err) assert.False(t, event.Std().HasListeners("evt1")) }