👔 up: Update adjusts contextual event handling logic

- fix some unit test error on concurrent run
This commit is contained in:
inhere
2025-08-30 13:22:48 +08:00
parent b4e3765eb3
commit 1801b1b525
7 changed files with 87 additions and 40 deletions

View File

@@ -100,6 +100,7 @@ type Event interface {
// Cloneable interface. event can be cloned.
//
// Check and convert:
//
// if ec, ok := e.(Cloneable); ok {}
type Cloneable interface {
Event
@@ -109,6 +110,7 @@ type Cloneable interface {
// ContextAble context-able event interface
//
// Check and convert in listener:
//
// if ec, ok := e.(ContextAble); ok {}
type ContextAble interface {
Event
@@ -121,7 +123,6 @@ type FactoryFunc func() Event
// BasicEvent a built-in implements Event interface
type BasicEvent struct {
ContextTrait
// event name
name string
// user data.
@@ -238,4 +239,14 @@ func (t *ContextTrait) Context() context.Context {
// WithContext set context
func (t *ContextTrait) WithContext(ctx context.Context) {
t.ctx = ctx
}
}
// ContextEvent event with context
type contextEvent struct {
Event
ContextTrait
}
func newContextEvent(ctx context.Context, e Event) ContextAble {
return &contextEvent{Event: e, ContextTrait: ContextTrait{ctx: ctx}}
}

View File

@@ -1,13 +1,27 @@
package event_test
import (
"bytes"
"fmt"
"sync"
"testing"
"github.com/gookit/event"
"github.com/gookit/goutil/testutil/assert"
)
// Thread-safe buffer for testing
type safeBuffer struct {
bytes.Buffer
mu sync.Mutex
}
func (sb *safeBuffer) Write(p []byte) (n int, err error) {
sb.mu.Lock()
defer sb.mu.Unlock()
return sb.Buffer.Write(p)
}
type testListener struct {
userData string
}
@@ -119,5 +133,5 @@ func TestEvent(t *testing.T) {
e1.Set("k", "v")
assert.Equal(t, "v", e1.Get("k"))
// assert.NotEmpty(t, e1.Clone())
assert.NotNil(t, e1.Context())
// assert.NotNil(t, e1.Context())
}

View File

@@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"sync/atomic"
"testing"
"time"
@@ -124,7 +125,7 @@ func TestIssues_61(t *testing.T) {
o.ConsumerNum = 10
o.EnableLock = false
})
defer em.CloseWait()
defer em.MustCloseWait()
var listener event.ListenerFunc = func(e event.Event) error {
time.Sleep(1 * time.Second)
@@ -145,9 +146,9 @@ func TestIssues_61(t *testing.T) {
func TestIssues_67(t *testing.T) {
em := event.NewManager("test", event.WithConsumerNum(10), event.WithChannelSize(100))
var counter int
var counter atomic.Int32
em.On("new.member", event.ListenerFunc(func(e event.Event) error {
counter++
counter.Add(1)
fmt.Print(e.Get("memberId"), " ")
return nil
}))
@@ -158,8 +159,8 @@ func TestIssues_67(t *testing.T) {
}
em.MustCloseWait() // MUST wait all async event done
fmt.Println("Total:", counter)
assert.Eq(t, total, counter)
fmt.Println("Total:", counter.Load())
assert.Eq(t, int32(total), counter.Load())
}
type MyEventI68 struct {
@@ -175,7 +176,7 @@ func (e *MyEventI68) CustomData() string {
func TestIssues_68(t *testing.T) {
e := &MyEventI68{customData: "hello"}
e.SetName("e1")
defer event.Reset()
event.Reset()
assert.NoErr(t, event.AddEvent(e))
// add listener

View File

@@ -1,7 +1,6 @@
package event
import (
"context"
"reflect"
"sync"
)
@@ -19,8 +18,7 @@ type Manager struct {
wg sync.WaitGroup
ch chan Event
oc sync.Once
err error // latest error
ctx context.Context // default context
err error // latest error
// name of the manager
name string
@@ -48,7 +46,7 @@ func NewM(name string, fns ...OptionFn) *Manager {
func NewManager(name string, fns ...OptionFn) *Manager {
em := &Manager{
name: name,
ctx: context.Background(),
// ctx: context.Background(),
// sample event
sample: &BasicEvent{},
// events storage

View File

@@ -30,7 +30,7 @@ func (em *Manager) Trigger(name string, params M) (error, Event) { return em.Fir
// Fire trigger event by name. if not found listener, will return (nil, nil)
func (em *Manager) Fire(name string, params M) (err error, e Event) {
// call listeners handle event
e, err = em.fireByNameCtx(em.ctx, name, params, false)
e, err = em.fireByName(name, params, false)
return
}
@@ -65,7 +65,7 @@ func (em *Manager) FireC(name string, params M) {
// - will call listeners handle event.
// - if not found listener, will return (nil, nil)
func (em *Manager) fireByName(name string, params M, useCh bool) (Event, error) {
return em.fireByNameCtx(em.ctx, name, params, useCh)
return em.fireByNameCtx(nil, name, params, useCh)
}
// fireByNameCtx fire event by name with context
@@ -86,6 +86,15 @@ func (em *Manager) fireByNameCtx(ctx context.Context, name string, params M, use
e = em.newBasicEvent(name, params)
}
// warp context
if ctx != nil {
if ec, ok := e.(ContextAble); ok {
ec.WithContext(ctx)
} else {
e = newContextEvent(ctx, e)
}
}
// fire by channel
if useCh {
em.FireAsync(e)
@@ -93,20 +102,24 @@ func (em *Manager) fireByNameCtx(ctx context.Context, name string, params M, use
}
// call listeners handle event
err = em.FireEventCtx(ctx, e)
err = em.fireEvent(e)
return
}
// FireEvent fire event by given Event instance
func (em *Manager) FireEvent(e Event) error {
if ec, ok := e.(ContextAble); ok {
return em.FireEventCtx(ec.Context(), e)
}
return em.FireEventCtx(em.ctx, e)
}
// FireEvent fire event by given Event instance.
func (em *Manager) FireEvent(e Event) error { return em.fireEvent(e) }
// FireEventCtx fire event by given Event instance with context
func (em *Manager) FireEventCtx(ctx context.Context, e Event) (err error) {
if ec, ok := e.(ContextAble); ok {
ec.WithContext(ctx)
return em.fireEvent(ec)
}
return em.fireEvent(newContextEvent(ctx, e))
}
// FireEventCtx fire event by given Event instance with context
func (em *Manager) fireEvent(e Event) (err error) {
if em.EnableLock {
em.Lock()
defer em.Unlock()
@@ -115,9 +128,11 @@ func (em *Manager) FireEventCtx(ctx context.Context, e Event) (err error) {
// ensure aborted is false.
e.Abort(false)
name := e.Name()
// set context
// get context
var ctx context.Context
if ec, ok := e.(ContextAble); ok {
ec.WithContext(ctx)
ctx = ec.Context()
}
// fire group listeners by wildcard. eg "db.user.*"
@@ -243,6 +258,9 @@ func (em *Manager) firePathMode(ctx context.Context, name string, e Event) (err
* region Fire by channel
*************************************************************/
// FireAsyncCtx async fire event by go channel, and with context TODO need?
// func (em *Manager) FireAsyncCtx(ctx context.Context, e Event)
// FireAsync async fire event by go channel.
//
// Note: if you want to use this method, you should
@@ -303,7 +321,7 @@ func (em *Manager) FireBatch(es ...any) (ers []error) {
for _, e := range es {
if name, ok := e.(string); ok {
err, _ = em.Fire(name, nil)
} else if evt, ok := e.(Event); ok {
} else if evt, ok1 := e.(Event); ok1 {
err = em.FireEvent(evt)
} // ignore invalid param.
@@ -326,8 +344,8 @@ func (em *Manager) AwaitFire(e Event) (err error) {
ch := make(chan error)
go func(e Event) {
err := em.FireEvent(e)
ch <- err
err1 := em.FireEvent(e)
ch <- err1
}(e)
err = <-ch

3
std.go
View File

@@ -60,6 +60,9 @@ func Async(name string, params M) { std.Async(name, params) }
// FireAsync fire event by channel
func FireAsync(e Event) { std.FireAsync(e) }
// FireAsyncCtx async fire event by go channel, and with context TODO need?
// func FireAsyncCtx(ctx context.Context, e Event)
// Trigger alias of Fire
func Trigger(name string, params M) (error, Event) { return std.Fire(name, params) }

View File

@@ -16,7 +16,7 @@ var emptyListener = func(e event.Event) error {
}
func TestAddEvent(t *testing.T) {
defer event.Reset()
event.Reset()
event.Std().RemoveEvents()
// no name
@@ -110,6 +110,7 @@ func TestFire(t *testing.T) {
}
func TestAddSubscriber(t *testing.T) {
event.Reset()
event.AddSubscriber(&testSubscriber{})
assert.True(t, event.HasListeners("e1"))
@@ -122,14 +123,12 @@ func TestAddSubscriber(t *testing.T) {
assert.Panics(t, func() {
event.Subscribe(testSubscriber2{})
})
event.Reset()
}
func TestFireEvent(t *testing.T) {
defer event.Reset()
buf := new(bytes.Buffer)
event.Reset()
buf := new(bytes.Buffer)
evt1 := event.NewBasic("evt1", nil).Fill(nil, event.M{"n": "inhere"})
assert.NoErr(t, event.AddEvent(evt1))
@@ -166,11 +165,13 @@ func TestFireEvent(t *testing.T) {
}
return nil
}))
evt2 := event.NewEvent("evt2", event.M{"name": "inhere"})
err = event.FireEventCtx(ctx, evt2)
assert.NoError(t, err)
assert.Equal(t, "ctx-value1", ctxVal)
assert.Equal(t, "ctx-value1", evt2.Context().Value("ctx1"))
// ce, ok := evt2.(event.ContextAble)
// assert.Equal(t, "ctx-value1", evt2.Context().Value("ctx1"))
buf.Reset()
// AsyncFire
@@ -183,7 +184,7 @@ func TestAsync(t *testing.T) {
event.Reset()
event.Config(event.UsePathMode)
buf := new(bytes.Buffer)
buf := new(safeBuffer)
event.On("test", event.ListenerFunc(func(e event.Event) error {
buf.WriteString("test:")
buf.WriteString(e.Get("key").(string))
@@ -194,8 +195,8 @@ func TestAsync(t *testing.T) {
event.Async("test", event.M{"key": "val1"})
te := &testEvent{name: "test", data: event.M{"key": "val2"}}
event.FireAsync(te)
assert.NoError(t, event.CloseWait())
s := buf.String()
assert.Contains(t, s, "test:val1|")
assert.Contains(t, s, "test:val2|")
@@ -232,7 +233,7 @@ func TestFire_notExist(t *testing.T) {
}
func TestMustFire(t *testing.T) {
defer event.Reset()
event.Reset()
event.On("n1", event.ListenerFunc(func(e event.Event) error {
return fmt.Errorf("an error")
@@ -249,7 +250,7 @@ func TestMustFire(t *testing.T) {
}
func TestOn(t *testing.T) {
defer event.Reset()
event.Reset()
assert.Panics(t, func() {
event.On("", event.ListenerFunc(emptyListener), 0)
@@ -275,10 +276,11 @@ func TestOn(t *testing.T) {
assert.False(t, event.HasListeners("n1"))
}
func TestOnce(t *testing.T) {
defer event.Reset()
event.Reset()
event.Once("evt1", event.ListenerFunc(emptyListener))
assert.True(t, event.Std().HasListeners("evt1"))
event.Trigger("evt1", nil)
err, _ := event.Trigger("evt1", nil)
assert.Nil(t, err)
assert.False(t, event.Std().HasListeners("evt1"))
}