新增:事件驱动和链路追踪

This commit is contained in:
steden
2024-01-16 15:02:25 +08:00
parent 9b26149628
commit 641e145fd3
5 changed files with 53 additions and 20 deletions

View File

@@ -1,11 +1,13 @@
package eventBus
import (
"github.com/farseer-go/fs/container"
"github.com/farseer-go/fs/core"
"github.com/farseer-go/fs/exception"
"github.com/farseer-go/fs/flog"
"github.com/farseer-go/fs/sonyflake"
"github.com/farseer-go/fs/stopwatch"
"github.com/farseer-go/fs/trace"
"strconv"
"time"
)
@@ -17,6 +19,19 @@ func PublishEvent(eventName string, message any) error {
return flog.Errorf("需要先通过订阅事件后,才能发布事件:%s", eventName)
}
// 这里上下文有可能会切换,所以退出程序时,要重新设置回上下文
traceContext := container.Resolve[trace.IManager]().GetCurTrace()
if traceContext != nil {
defer func() {
trace.CurTraceContext.Set(traceContext)
}()
}
// 事件发布链路
var err error
traceDetail := container.Resolve[trace.IManager]().TraceEventPublish(eventName)
defer func() { traceDetail.End(err) }()
// 定义事件参数
eventArgs := core.EventArgs{
Id: strconv.FormatInt(sonyflake.GenerateId(), 10),
@@ -27,16 +42,19 @@ func PublishEvent(eventName string, message any) error {
}
// 遍历订阅者,并同步执行事件消费
var err error
for _, subscribeFunc := range subscriber.GetValue(eventName) {
for _, s := range subscriber.GetValue(eventName) {
// 创建一个事件消费入口
eventTraceContext := container.Resolve[trace.IManager]().EntryEventConsumer(eventName, s.subscribeName)
try := exception.Try(func() {
sw := stopwatch.StartNew()
subscribeFunc(message, eventArgs)
s.consumerFunc(message, eventArgs)
flog.ComponentInfof("event", "%s耗时%s", eventName, sw.GetMillisecondsText())
})
try.CatchException(func(exp any) {
err = flog.Error(exp)
eventTraceContext.Error(err)
})
eventTraceContext.End()
}
return err
}
@@ -58,15 +76,15 @@ func PublishEventAsync(eventName string, message any) error {
}
// 遍历订阅者,并异步执行事件消费
for _, subscribeFunc := range subscriber.GetValue(eventName) {
go func(subscribeFunc core.ConsumerFunc) {
for _, s := range subscriber.GetValue(eventName) {
go func(s subscribeConsumer) {
try := exception.Try(func() {
subscribeFunc(message, eventArgs)
s.consumerFunc(message, eventArgs)
})
try.CatchException(func(exp any) {
_ = flog.Error(exp)
})
}(subscribeFunc)
}(s)
}
return nil
}

View File

@@ -14,15 +14,24 @@ func (c *registerEvent) Publish(message any) error {
return nil
}
type registerSubscribe struct {
eventName string
}
// RegisterEvent 注册core.IEvent实现
func RegisterEvent(eventName string, fns ...core.ConsumerFunc) {
func RegisterEvent(eventName string, fns ...subscribeConsumer) *registerSubscribe {
// 注册仓储
container.Register(func() core.IEvent {
return &registerEvent{eventName: eventName}
}, eventName)
// 同时订阅消费
for i := 0; i < len(fns); i++ {
Subscribe(eventName, fns[i])
return &registerSubscribe{
eventName: eventName,
}
}
// RegisterSubscribe 注册订阅者
func (receiver *registerSubscribe) RegisterSubscribe(subscribeName string, consumerFunc core.ConsumerFunc) *registerSubscribe {
Subscribe(receiver.eventName, subscribeName, consumerFunc)
return receiver
}

View File

@@ -5,10 +5,15 @@ import (
"github.com/farseer-go/fs/core"
)
type subscribeConsumer struct {
subscribeName string // 消费者名称
consumerFunc core.ConsumerFunc // 消费者处理函数
}
// 订阅者
var subscriber collections.Dictionary[string, []core.ConsumerFunc]
var subscriber collections.Dictionary[string, []subscribeConsumer]
// Subscribe 订阅事件
func Subscribe(eventName string, fn core.ConsumerFunc) {
subscriber.Add(eventName, append(subscriber.GetValue(eventName), fn))
func Subscribe(eventName, subscribeName string, consumerFunc core.ConsumerFunc) {
subscriber.Add(eventName, append(subscriber.GetValue(eventName), subscribeConsumer{subscribeName: subscribeName, consumerFunc: consumerFunc}))
}

View File

@@ -16,17 +16,17 @@ type testEventPublish struct {
}
func TestPublishEvent(t *testing.T) {
eventBus.Subscribe("test_event_subscribe", func(message any, ea core.EventArgs) {
eventBus.Subscribe("test_event_subscribe", "test1", func(message any, ea core.EventArgs) {
event := message.(testEventPublish)
atomic.AddInt32(&count, event.count+1)
})
eventBus.Subscribe("test_event_subscribe", func(message any, ea core.EventArgs) {
eventBus.Subscribe("test_event_subscribe", "test2", func(message any, ea core.EventArgs) {
event := message.(testEventPublish)
atomic.AddInt32(&count, event.count+2)
})
eventBus.Subscribe("test_event_subscribe", func(message any, ea core.EventArgs) {
eventBus.Subscribe("test_event_subscribe", "test3", func(message any, ea core.EventArgs) {
panic("")
})

View File

@@ -11,9 +11,10 @@ import (
var count1 int
func TestRegisterEvent(t *testing.T) {
eventBus.RegisterEvent("testRegisterEvent", func(message any, ea core.EventArgs) {
count1 = message.(int)
})
eventBus.RegisterEvent("testRegisterEvent").
RegisterSubscribe("测试", func(message any, ea core.EventArgs) {
count1 = message.(int)
})
_ = container.Resolve[core.IEvent]("testRegisterEvent").Publish(3)