From 409c04d0dc268e163d23a2a25ab0642c29db82ff Mon Sep 17 00:00:00 2001 From: werben Date: Tue, 25 Apr 2023 20:11:55 +0800 Subject: [PATCH] Add synchronously way Add synchronously way --- README-CN.md | 34 ++++++++++++++---- README.md | 91 +++++++++++++++++++++++++++++++----------------- eventbus.go | 81 ++++++++++++++++++++++++++++++------------ eventbus_test.go | 69 ++++++++++++++++++++++++++++++++++++ pipe.go | 24 ++++++++++--- pipe_test.go | 26 +++++++++++++- singleton.go | 8 ++++- 7 files changed, 268 insertions(+), 65 deletions(-) diff --git a/README-CN.md b/README-CN.md index 67e8635..affe326 100644 --- a/README-CN.md +++ b/README-CN.md @@ -8,7 +8,7 @@ [English](README.md) | [简体中文](README-CN.md) # EventBus -EventBus 是一个轻量级的事件发布/订阅框架,可以简化 Go 协程之间的通信。 +EventBus 是一个轻量级的事件发布/订阅框架,支持同步和异步发布消息,它可以简化 Go 协程之间的通信。 ## 安装 @@ -25,12 +25,20 @@ import ( ``` ## EventBus 是什么? -EventBus 是对多个主题的封装,每个主题对应一个通道。`Publish()` 方法将消息推送到通道,`Subscribe(`) 方法中的handler将处理从通道出来的消息。 + +EventBus同时支持同步和异步的方式发布消息。 + +#### 异步的方式 +在EventBus里,每个主题对应一个通道。`Publish()` 方法将消息推送到通道,`Subscribe(`) 方法中的handler将处理从通道出来的消息。 如果要使用带缓冲的EventBus,可以使用 `eventbus.NewBuffered(bufferSize int)` 方法创建带缓冲的EventBus,这样会为每个topic都创建一个带缓冲的channel。 EventBus使用一个Copy-On-Write的map管理handler和topic,所以不建议在有大量频繁的订阅和取消订阅的业务场景中使用。 +#### 同步的方式 +同步的方式下EventBus不使用channel,而是通过直接调用handler将消息传递给订阅者。如果想同步的方式发布消息,使用eventbus.PublishSync()函数即可。 + + ### EventBus 示例 ```go package main @@ -54,11 +62,13 @@ func main() { // handler的第二个参数类型必须与 `Publish()` 中的 payload 类型一致。 bus.Subscribe("testtopic", handler) - // Publish() 方法触发为主题定义的handler。`payload` 参数将传递给handler。 - // payload 的类型必须与 `Subscribe()` 中handler的第二个参数类型相对应。 + // 异步方式发布消息 bus.Publish("testtopic", 100) - // 订阅者异步接收消息。为了确保订阅者可以接收所有消息,这里在取消订阅之前给了一点延迟。 + //同步方式发布消息 + bus.PublishSync("testtopic", 200) + + // 订阅者接收消息。为了确保订阅者可以接收完所有消息的异步消息,这里在取消订阅之前给了一点延迟。 time.Sleep(time.Millisecond) bus.Unsubscribe("testtopic", handler) bus.Close() @@ -91,10 +101,16 @@ func main() { var wg sync.WaitGroup wg.Add(1) go func() { + //异步方式发布消息 for i := 0; i < 100; i++ { // eventbus.Subscribe() 将调用全局单例singleton.Publish()方法 eventbus.Publish("testtopic", i) } + //同步方式发布消息 + for i := 100; i < 200; i++ { + // eventbus.Subscribe() 将调用全局单例singleton.Publish()方法 + eventbus.Publish("testtopic", i) + } wg.Done() }() wg.Wait() @@ -114,6 +130,8 @@ Pipe 是通道的一个封装,这里没有主题的概念。订阅者异步接 如果要使用带缓冲的通道,可以使用 `eventbus.NewBufferedPipe[T](bufferSize int)` 方法创建带缓冲的管道。 + + #### Pipe 示例 ```go package main @@ -143,14 +161,18 @@ func main() { var wg sync.WaitGroup wg.Add(1) go func() { + //异步方式发布消息 for i := 0; i < 100; i++ { pipe.Publish(strconv.Itoa(i)) } + //同步方式发布消息 + for i := 100; i < 200; i++ { + pipe.PublishSync(strconv.Itoa(i)) + } wg.Done() }() wg.Wait() - // 订阅者异步接收消息。为了确保订阅者可以接收所有消息,这里在取消订阅之前给了一点延迟。 time.Sleep(time.Millisecond) pipe.Unsubscribe(handler1) pipe.Unsubscribe(handler2) diff --git a/README.md b/README.md index 3b5ed8c..406ee82 100644 --- a/README.md +++ b/README.md @@ -26,14 +26,30 @@ import ( ``` ## What's eventbus? - EventBus is a wrapper for multiple topics, with each topic corresponding to a channel. The `eventbus.Publish()` method pushes a message to the channel, while the handler in the `eventbus.Subscribe()` method processes the messages coming out of the channel. -If you want to use a buffered EventBus, you can create one using the `eventbus.NewBuffered(bufferSize int)` method, which creates a buffered channel for each topic. +EventBus supports both synchronous and asynchronous message publication. it uses a `Copy-On-Write` map to manage handlers and topics, so it is not recommended for use in scenarios with a large number of frequent subscriptions and unsubscriptions. -EventBus uses a `Copy-On-Write` map to manage handlers and topics, so it is not recommended for use in scenarios with a large number of frequent subscriptions and unsubscriptions. +#### Asynchronous Way + +In EventBus, each topic corresponds to a channel. The `Publish()` method pushes the message to the channel, and the handler in the `Subscribe()` method handles the message that comes out of the channel. + +If you want to use a buffered EventBus, you can create a buffered EventBus with the `eventbus.NewBuffered(bufferSize int)` method, which will create a buffered channel for each topic. + +#### Synchronous Way + +In the synchronous way, EventBus does not use channels, but passes payloads to subscribers by calling the handler directly. To publish messages synchronously, use the `eventbus.PublishSync()` function. ### eventbus example ```go +package main + +import ( + "fmt" + "time" + + "github.com/werbenhu/eventbus" +) + func handler(topic string, payload int) { fmt.Printf("topic:%s, payload:%d\n", topic, payload) } @@ -41,25 +57,31 @@ func handler(topic string, payload int) { func main() { bus := eventbus.New() - // Subscribe() subscribes to a topic, return an error if the handler is not a function. - // The handler must have two parameters: the first parameter must be a string, - // and the type of the handler's second parameter must be consistent with the type of the payload in `Publish()` + // Subscribe to a topic. Returns an error if the handler is not a function. + // The handler function must have two parameters: the first parameter must be of type string, + // and the second parameter's type must match the type of `payload` in the `Publish()` function. bus.Subscribe("testtopic", handler) - // Publish() triggers the handlers defined for a topic. The `payload` argument will be passed to the handler. - // The type of the payload must correspond to the second parameter of the handler in `Subscribe()`. + // Publish a message asynchronously. + // The `Publish()` function triggers the handler defined for the topic, and passes the `payload` as an argument. + // The type of `payload` must match the type of the second parameter in the handler function defined in `Subscribe()`. bus.Publish("testtopic", 100) - // Subscribers receive messages asynchronously. - // To ensure that subscribers can receive all messages, there is a delay before unsubscribe + // Publish a message synchronously. + bus.PublishSync("testtopic", 200) + + // Wait a bit to ensure that subscribers have received all asynchronous messages before unsubscribing. time.Sleep(time.Millisecond) bus.Unsubscribe("testtopic", handler) + + // Close the event bus. bus.Close() } + ``` ### Using the global singleton object of EventBus -To make it more convenient to use EventBus, here is a global singleton object of EventBus. The channel inside this object is unbuffered. By directly using `eventbus.Subscribe()`, `eventbus.Publish()`, and `eventbus.Unsubscribe()`, the corresponding methods of this singleton object will be called. +To make it more convenient to use EventBus, here is a global singleton object of EventBus. The channel inside this object is unbuffered. By directly using `eventbus.Subscribe()`, `eventbus.Publish()`, `eventbus.PublishSync()`, and `eventbus.Unsubscribe()`, the corresponding methods of this singleton object will be called. ```go package main @@ -77,14 +99,20 @@ func handler(topic string, payload int) { } func main() { - // eventbus.Subscribe() will call the singleton.Subscribe() method of the global singleton + // eventbus.Subscribe() will call the global singleton's Subscribe() method eventbus.Subscribe("testtopic", handler) var wg sync.WaitGroup wg.Add(1) go func() { + // Asynchronously publish messages for i := 0; i < 100; i++ { - // eventbus.Publish() will call the singleton.Publish() method + // eventbus.Publish() will call the global singleton's Publish() method + eventbus.Publish("testtopic", i) + } + // Synchronously publish messages + for i := 100; i < 200; i++ { + // eventbus.Publish() will call the global singleton's Publish() method eventbus.Publish("testtopic", i) } wg.Done() @@ -92,10 +120,10 @@ func main() { wg.Wait() time.Sleep(time.Millisecond) - // eventbus.Unsubscribe() will call the singleton.Unsubscribe() method + // eventbus.Unsubscribe() will call the global singleton's Unsubscribe() method eventbus.Unsubscribe("testtopic", handler) - // eventbus.Close() will call the singleton.Close() method + // eventbus.Close() will call the global singleton's Close() method eventbus.Close() } ``` @@ -112,41 +140,42 @@ package main import ( "fmt" - "strconv" "sync" "time" "github.com/werbenhu/eventbus" ) -func handler1(val string) { - fmt.Printf("handler1 val:%s\n", val) -} - -func handler2(val string) { - fmt.Printf("handler2 val:%s\n", val) +func handler(topic string, payload int) { + fmt.Printf("topic:%s, payload:%d\n", topic, payload) } func main() { - pipe := eventbus.NewPipe[string]() - pipe.Subscribe(handler1) - pipe.Subscribe(handler2) + // eventbus.Subscribe() will call the global singleton's Subscribe() method + eventbus.Subscribe("testtopic", handler) var wg sync.WaitGroup wg.Add(1) go func() { + // Asynchronously publish messages for i := 0; i < 100; i++ { - pipe.Publish(strconv.Itoa(i)) + // eventbus.Publish() will call the global singleton's Publish() method + eventbus.Publish("testtopic", i) + } + // Synchronously publish messages + for i := 100; i < 200; i++ { + // eventbus.Publish() will call the global singleton's Publish() method + eventbus.Publish("testtopic", i) } wg.Done() }() wg.Wait() - // Subscribers receive messages asynchronously. - // To ensure that subscribers can receive all messages, there is a delay before unsubscribe time.Sleep(time.Millisecond) - pipe.Unsubscribe(handler1) - pipe.Unsubscribe(handler2) - pipe.Close() + // eventbus.Unsubscribe() will call the global singleton's Unsubscribe() method + eventbus.Unsubscribe("testtopic", handler) + + // eventbus.Close() will call the global singleton's Close() method + eventbus.Close() } ``` diff --git a/eventbus.go b/eventbus.go index 5c1df2c..b4350d4 100644 --- a/eventbus.go +++ b/eventbus.go @@ -37,32 +37,37 @@ func newChannel(topic string, bufferSize int) *channel { return c } +// transfer calls all the handlers in the channel with the given payload. +// It iterates over the handlers in the handlers map to call them with the payload. +func (c *channel) transfer(topic string, payload any) { + var payloadValue reflect.Value + topicValue := reflect.ValueOf(c.topic) + + c.handlers.Range(func(key any, fn any) bool { + handler := fn.(*reflect.Value) + typ := handler.Type() + + if payload == nil { + // If the parameter passed to the handler is nil, + // it initializes a new payload element based on the + // type of the second parameter of the handler using the reflect package. + payloadValue = reflect.New(typ.In(1)).Elem() + } else { + payloadValue = reflect.ValueOf(payload) + } + (*handler).Call([]reflect.Value{topicValue, payloadValue}) + return true + }) +} + // loop listens to the channel and calls handlers with payload. // It receives messages from the channel and then iterates over the handlers // in the handlers map to call them with the payload. func (c *channel) loop() { - topic := reflect.ValueOf(c.topic) for { select { - case param := <-c.channel: - c.handlers.Range(func(key any, fn any) bool { - - var payload reflect.Value - handler := fn.(*reflect.Value) - typ := handler.Type() - - if param == nil { - // If the parameter passed to the handler is nil, - // it initializes a new payload element based on the - // type of the second parameter of the handler using the reflect package. - payload = reflect.New(typ.In(1)).Elem() - } else { - payload = reflect.ValueOf(param) - } - - (*handler).Call([]reflect.Value{topic, payload}) - return true - }) + case payload := <-c.channel: + c.transfer(c.topic, payload) case <-c.stopCh: return } @@ -81,7 +86,22 @@ func (c *channel) subscribe(handler any) error { return nil } -// publish triggers the handlers defined for this channel. The `payload` argument will be passed to the handler. +// publishSync triggers the handlers defined for this channel synchronously. +// The payload argument will be passed to the handler. +// It does not use channels and instead directly calls the handler function. +func (c *channel) publishSync(payload any) error { + c.RLock() + defer c.RUnlock() + if c.closed { + return ErrChannelClosed + } + c.transfer(c.topic, payload) + return nil +} + +// publish triggers the handlers defined for this channel asynchronously. +// The `payload` argument will be passed to the handler. +// It uses the channel to asynchronously call the handler. func (c *channel) publish(payload any) error { c.RLock() defer c.RUnlock() @@ -179,7 +199,9 @@ func (e *EventBus) Subscribe(topic string, handler any) error { return ch.(*channel).subscribe(handler) } -// Publish triggers the handlers defined for a topic. The `payload` argument will be passed to the handler. +// publish triggers the handlers defined for this channel asynchronously. +// The `payload` argument will be passed to the handler. +// It uses the channel to asynchronously call the handler. // The type of the payload must correspond to the second parameter of the handler in `Subscribe()`. func (e *EventBus) Publish(topic string, payload any) error { ch, ok := e.channels.Load(topic) @@ -193,6 +215,21 @@ func (e *EventBus) Publish(topic string, payload any) error { return ch.(*channel).publish(payload) } +// publishSync triggers the handlers defined for this channel synchronously. +// The payload argument will be passed to the handler. +// It does not use channels and instead directly calls the handler function. +func (e *EventBus) PublishSync(topic string, payload any) error { + ch, ok := e.channels.Load(topic) + + if !ok { + ch = newChannel(topic, e.bufferSize) + e.channels.Store(topic, ch) + go ch.(*channel).loop() + } + + return ch.(*channel).publishSync(payload) +} + // Close closes the eventbus func (e *EventBus) Close() { e.once.Do(func() { diff --git a/eventbus_test.go b/eventbus_test.go index b883e6a..592f953 100644 --- a/eventbus_test.go +++ b/eventbus_test.go @@ -112,6 +112,33 @@ func Test_channelPublish(t *testing.T) { assert.Equal(t, ErrChannelClosed, err) } +func Test_channelPublishSync(t *testing.T) { + ch := newChannel("test_topic", -1) + assert.NotNil(t, ch) + assert.NotNil(t, ch.channel) + assert.Equal(t, "test_topic", ch.topic) + ch.subscribe(busHandlerOne) + + var wg sync.WaitGroup + wg.Add(1) + + go func() { + for i := 0; i < 100; i++ { + err := ch.publish(i) + assert.Nil(t, err) + } + wg.Done() + }() + wg.Wait() + + err := ch.publishSync(nil) + assert.Nil(t, err) + time.Sleep(time.Millisecond) + ch.close() + err = ch.publishSync(1) + assert.Equal(t, ErrChannelClosed, err) +} + func Test_New(t *testing.T) { bus := New() assert.NotNil(t, bus) @@ -205,6 +232,31 @@ func Test_EventBusPublish(t *testing.T) { bus.Close() } +func Test_EventBusPublishSync(t *testing.T) { + bus := New() + assert.NotNil(t, bus) + assert.Equal(t, -1, bus.bufferSize) + assert.NotNil(t, bus.channels) + + err := bus.Publish("testtopic", 1) + assert.Nil(t, err) + + err = bus.Subscribe("testtopic", busHandlerOne) + assert.Nil(t, err) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + for i := 0; i < 100; i++ { + err := bus.PublishSync("testtopic", i) + assert.Nil(t, err) + } + wg.Done() + }() + wg.Wait() + bus.Close() +} + func BenchmarkEventBusPublish(b *testing.B) { bus := New() bus.Subscribe("testtopic", busHandlerOne) @@ -221,3 +273,20 @@ func BenchmarkEventBusPublish(b *testing.B) { wg.Wait() bus.Close() } + +func BenchmarkEventBusPublishSync(b *testing.B) { + bus := New() + bus.Subscribe("testtopic", busHandlerOne) + + b.ResetTimer() + var wg sync.WaitGroup + wg.Add(1) + go func() { + for i := 0; i < b.N; i++ { + bus.PublishSync("testtopic", i) + } + wg.Done() + }() + wg.Wait() + bus.Close() +} diff --git a/pipe.go b/pipe.go index 1cf3a7f..0bc835b 100644 --- a/pipe.go +++ b/pipe.go @@ -6,9 +6,10 @@ import ( type Handler[T any] func(payload T) -// Pipe is a wrapper of channel. Subscribers will receive messages asynchronously. -// You can use Pipe.Publish() instead of chan <- and Pipe.Subscribe() instead of <- chan. -// If there are multiple subscribers, one message will be received by each subscriber. +// Pipe is a wrapper for a channel that allows for asynchronous message passing to subscribers. +// Use Pipe.Publish() instead of `chan<-` and Pipe.Subscribe() instead of `<-chan`. +// To pass messages to subscribers synchronously, use Pipe.PublishSync(), which does not use a channel. +// If multiple subscribers exist, each subscriber will receive the message. type Pipe[T any] struct { sync.RWMutex bufferSize int @@ -86,7 +87,7 @@ func (p *Pipe[T]) Unsubscribe(handler Handler[T]) error { return nil } -// publish trigger handlers defined for this pipe. payload argument will be transferred to handlers. +// Publish triggers the handlers defined for this pipe, transferring the payload to the handlers. func (p *Pipe[T]) Publish(payload T) error { p.RLock() defer p.RUnlock() @@ -97,6 +98,21 @@ func (p *Pipe[T]) Publish(payload T) error { return nil } +// PublishSync triggers the handlers defined for this pipe synchronously, without using a channel. +// The payload will be passed directly to the handlers. +func (p *Pipe[T]) PublishSync(payload T) error { + p.RLock() + defer p.RUnlock() + if p.closed { + return ErrChannelClosed + } + p.handlers.Range(func(key any, fn any) bool { + fn.(Handler[T])(payload) + return true + }) + return nil +} + // close closes the pipe func (p *Pipe[T]) Close() { p.Lock() diff --git a/pipe_test.go b/pipe_test.go index 48bc566..11b0630 100644 --- a/pipe_test.go +++ b/pipe_test.go @@ -94,6 +94,30 @@ func Test_PipePublish(t *testing.T) { assert.Equal(t, ErrChannelClosed, err) } +func Test_PipePublishSync(t *testing.T) { + p := NewPipe[int]() + assert.NotNil(t, p) + assert.NotNil(t, p.channel) + + err := p.Subscribe(pipeHandlerOne) + time.Sleep(time.Millisecond) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + for i := 0; i < 1000; i++ { + err := p.PublishSync(i) + assert.Nil(t, err) + } + wg.Done() + }() + wg.Wait() + + p.Close() + err = p.PublishSync(1) + assert.Equal(t, ErrChannelClosed, err) +} + func Test_PipeClose(t *testing.T) { p := NewPipe[int]() assert.NotNil(t, p) @@ -114,7 +138,7 @@ func BenchmarkPipePublish(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - pipe.Publish(i) + pipe.PublishSync(i) } } diff --git a/singleton.go b/singleton.go index 76a39f3..4faab62 100644 --- a/singleton.go +++ b/singleton.go @@ -33,7 +33,13 @@ func Publish(topic string, payload any) error { return singleton.Publish(topic, payload) } -// Close closes the singleton +// PublishSync is a synchronous version of Publish that triggers the handlers defined for a topic with the given payload. +// The type of the payload must correspond to the second parameter of the handler in `Subscribe()`. +func PublishSync(topic string, payload any) error { + return singleton.Publish(topic, payload) +} + +// Close closes the singleton instance of EventBus. func Close() { singleton.Close() }