Add synchronously way

Add synchronously way
This commit is contained in:
werben
2023-04-25 20:11:55 +08:00
parent 31fbda00bf
commit 409c04d0dc
7 changed files with 268 additions and 65 deletions

View File

@@ -8,7 +8,7 @@
[English](README.md) | [简体中文](README-CN.md)
# EventBus
EventBus 是一个轻量级的事件发布/订阅框架,可以简化 Go 协程之间的通信。
EventBus 是一个轻量级的事件发布/订阅框架,支持同步和异步发布消息,它可以简化 Go 协程之间的通信。
## 安装
@@ -25,12 +25,20 @@ import (
```
## EventBus 是什么?
EventBus 是对多个主题的封装,每个主题对应一个通道。`Publish()` 方法将消息推送到通道,`Subscribe(`) 方法中的handler将处理从通道出来的消息。
EventBus同时支持同步和异步的方式发布消息。
#### 异步的方式
在EventBus里每个主题对应一个通道。`Publish()` 方法将消息推送到通道,`Subscribe(`) 方法中的handler将处理从通道出来的消息。
如果要使用带缓冲的EventBus可以使用 `eventbus.NewBuffered(bufferSize int)` 方法创建带缓冲的EventBus这样会为每个topic都创建一个带缓冲的channel。
EventBus使用一个Copy-On-Write的map管理handler和topic所以不建议在有大量频繁的订阅和取消订阅的业务场景中使用。
#### 同步的方式
同步的方式下EventBus不使用channel而是通过直接调用handler将消息传递给订阅者。如果想同步的方式发布消息使用eventbus.PublishSync()函数即可。
### EventBus 示例
```go
package main
@@ -54,11 +62,13 @@ func main() {
// handler的第二个参数类型必须与 `Publish()` 中的 payload 类型一致。
bus.Subscribe("testtopic", handler)
// Publish() 方法触发为主题定义的handler。`payload` 参数将传递给handler。
// payload 的类型必须与 `Subscribe()` 中handler的第二个参数类型相对应。
// 异步方式发布消息
bus.Publish("testtopic", 100)
// 订阅者异步接收消息。为了确保订阅者可以接收所有消息,这里在取消订阅之前给了一点延迟。
//同步方式发布消息
bus.PublishSync("testtopic", 200)
// 订阅者接收消息。为了确保订阅者可以接收完所有消息的异步消息,这里在取消订阅之前给了一点延迟。
time.Sleep(time.Millisecond)
bus.Unsubscribe("testtopic", handler)
bus.Close()
@@ -91,10 +101,16 @@ func main() {
var wg sync.WaitGroup
wg.Add(1)
go func() {
//异步方式发布消息
for i := 0; i < 100; i++ {
// eventbus.Subscribe() 将调用全局单例singleton.Publish()方法
eventbus.Publish("testtopic", i)
}
//同步方式发布消息
for i := 100; i < 200; i++ {
// eventbus.Subscribe() 将调用全局单例singleton.Publish()方法
eventbus.Publish("testtopic", i)
}
wg.Done()
}()
wg.Wait()
@@ -114,6 +130,8 @@ Pipe 是通道的一个封装,这里没有主题的概念。订阅者异步接
如果要使用带缓冲的通道,可以使用 `eventbus.NewBufferedPipe[T](bufferSize int)` 方法创建带缓冲的管道。
#### Pipe 示例
```go
package main
@@ -143,14 +161,18 @@ func main() {
var wg sync.WaitGroup
wg.Add(1)
go func() {
//异步方式发布消息
for i := 0; i < 100; i++ {
pipe.Publish(strconv.Itoa(i))
}
//同步方式发布消息
for i := 100; i < 200; i++ {
pipe.PublishSync(strconv.Itoa(i))
}
wg.Done()
}()
wg.Wait()
// 订阅者异步接收消息。为了确保订阅者可以接收所有消息,这里在取消订阅之前给了一点延迟。
time.Sleep(time.Millisecond)
pipe.Unsubscribe(handler1)
pipe.Unsubscribe(handler2)

View File

@@ -26,14 +26,30 @@ import (
```
## What's eventbus?
EventBus is a wrapper for multiple topics, with each topic corresponding to a channel. The `eventbus.Publish()` method pushes a message to the channel, while the handler in the `eventbus.Subscribe()` method processes the messages coming out of the channel.
If you want to use a buffered EventBus, you can create one using the `eventbus.NewBuffered(bufferSize int)` method, which creates a buffered channel for each topic.
EventBus supports both synchronous and asynchronous message publication. it uses a `Copy-On-Write` map to manage handlers and topics, so it is not recommended for use in scenarios with a large number of frequent subscriptions and unsubscriptions.
EventBus uses a `Copy-On-Write` map to manage handlers and topics, so it is not recommended for use in scenarios with a large number of frequent subscriptions and unsubscriptions.
#### Asynchronous Way
In EventBus, each topic corresponds to a channel. The `Publish()` method pushes the message to the channel, and the handler in the `Subscribe()` method handles the message that comes out of the channel.
If you want to use a buffered EventBus, you can create a buffered EventBus with the `eventbus.NewBuffered(bufferSize int)` method, which will create a buffered channel for each topic.
#### Synchronous Way
In the synchronous way, EventBus does not use channels, but passes payloads to subscribers by calling the handler directly. To publish messages synchronously, use the `eventbus.PublishSync()` function.
### eventbus example
```go
package main
import (
"fmt"
"time"
"github.com/werbenhu/eventbus"
)
func handler(topic string, payload int) {
fmt.Printf("topic:%s, payload:%d\n", topic, payload)
}
@@ -41,25 +57,31 @@ func handler(topic string, payload int) {
func main() {
bus := eventbus.New()
// Subscribe() subscribes to a topic, return an error if the handler is not a function.
// The handler must have two parameters: the first parameter must be a string,
// and the type of the handler's second parameter must be consistent with the type of the payload in `Publish()`
// Subscribe to a topic. Returns an error if the handler is not a function.
// The handler function must have two parameters: the first parameter must be of type string,
// and the second parameter's type must match the type of `payload` in the `Publish()` function.
bus.Subscribe("testtopic", handler)
// Publish() triggers the handlers defined for a topic. The `payload` argument will be passed to the handler.
// The type of the payload must correspond to the second parameter of the handler in `Subscribe()`.
// Publish a message asynchronously.
// The `Publish()` function triggers the handler defined for the topic, and passes the `payload` as an argument.
// The type of `payload` must match the type of the second parameter in the handler function defined in `Subscribe()`.
bus.Publish("testtopic", 100)
// Subscribers receive messages asynchronously.
// To ensure that subscribers can receive all messages, there is a delay before unsubscribe
// Publish a message synchronously.
bus.PublishSync("testtopic", 200)
// Wait a bit to ensure that subscribers have received all asynchronous messages before unsubscribing.
time.Sleep(time.Millisecond)
bus.Unsubscribe("testtopic", handler)
// Close the event bus.
bus.Close()
}
```
### Using the global singleton object of EventBus
To make it more convenient to use EventBus, here is a global singleton object of EventBus. The channel inside this object is unbuffered. By directly using `eventbus.Subscribe()`, `eventbus.Publish()`, and `eventbus.Unsubscribe()`, the corresponding methods of this singleton object will be called.
To make it more convenient to use EventBus, here is a global singleton object of EventBus. The channel inside this object is unbuffered. By directly using `eventbus.Subscribe()`, `eventbus.Publish()`, `eventbus.PublishSync()`, and `eventbus.Unsubscribe()`, the corresponding methods of this singleton object will be called.
```go
package main
@@ -77,14 +99,20 @@ func handler(topic string, payload int) {
}
func main() {
// eventbus.Subscribe() will call the singleton.Subscribe() method of the global singleton
// eventbus.Subscribe() will call the global singleton's Subscribe() method
eventbus.Subscribe("testtopic", handler)
var wg sync.WaitGroup
wg.Add(1)
go func() {
// Asynchronously publish messages
for i := 0; i < 100; i++ {
// eventbus.Publish() will call the singleton.Publish() method
// eventbus.Publish() will call the global singleton's Publish() method
eventbus.Publish("testtopic", i)
}
// Synchronously publish messages
for i := 100; i < 200; i++ {
// eventbus.Publish() will call the global singleton's Publish() method
eventbus.Publish("testtopic", i)
}
wg.Done()
@@ -92,10 +120,10 @@ func main() {
wg.Wait()
time.Sleep(time.Millisecond)
// eventbus.Unsubscribe() will call the singleton.Unsubscribe() method
// eventbus.Unsubscribe() will call the global singleton's Unsubscribe() method
eventbus.Unsubscribe("testtopic", handler)
// eventbus.Close() will call the singleton.Close() method
// eventbus.Close() will call the global singleton's Close() method
eventbus.Close()
}
```
@@ -112,41 +140,42 @@ package main
import (
"fmt"
"strconv"
"sync"
"time"
"github.com/werbenhu/eventbus"
)
func handler1(val string) {
fmt.Printf("handler1 val:%s\n", val)
}
func handler2(val string) {
fmt.Printf("handler2 val:%s\n", val)
func handler(topic string, payload int) {
fmt.Printf("topic:%s, payload:%d\n", topic, payload)
}
func main() {
pipe := eventbus.NewPipe[string]()
pipe.Subscribe(handler1)
pipe.Subscribe(handler2)
// eventbus.Subscribe() will call the global singleton's Subscribe() method
eventbus.Subscribe("testtopic", handler)
var wg sync.WaitGroup
wg.Add(1)
go func() {
// Asynchronously publish messages
for i := 0; i < 100; i++ {
pipe.Publish(strconv.Itoa(i))
// eventbus.Publish() will call the global singleton's Publish() method
eventbus.Publish("testtopic", i)
}
// Synchronously publish messages
for i := 100; i < 200; i++ {
// eventbus.Publish() will call the global singleton's Publish() method
eventbus.Publish("testtopic", i)
}
wg.Done()
}()
wg.Wait()
// Subscribers receive messages asynchronously.
// To ensure that subscribers can receive all messages, there is a delay before unsubscribe
time.Sleep(time.Millisecond)
pipe.Unsubscribe(handler1)
pipe.Unsubscribe(handler2)
pipe.Close()
// eventbus.Unsubscribe() will call the global singleton's Unsubscribe() method
eventbus.Unsubscribe("testtopic", handler)
// eventbus.Close() will call the global singleton's Close() method
eventbus.Close()
}
```

View File

@@ -37,32 +37,37 @@ func newChannel(topic string, bufferSize int) *channel {
return c
}
// transfer calls all the handlers in the channel with the given payload.
// It iterates over the handlers in the handlers map to call them with the payload.
func (c *channel) transfer(topic string, payload any) {
var payloadValue reflect.Value
topicValue := reflect.ValueOf(c.topic)
c.handlers.Range(func(key any, fn any) bool {
handler := fn.(*reflect.Value)
typ := handler.Type()
if payload == nil {
// If the parameter passed to the handler is nil,
// it initializes a new payload element based on the
// type of the second parameter of the handler using the reflect package.
payloadValue = reflect.New(typ.In(1)).Elem()
} else {
payloadValue = reflect.ValueOf(payload)
}
(*handler).Call([]reflect.Value{topicValue, payloadValue})
return true
})
}
// loop listens to the channel and calls handlers with payload.
// It receives messages from the channel and then iterates over the handlers
// in the handlers map to call them with the payload.
func (c *channel) loop() {
topic := reflect.ValueOf(c.topic)
for {
select {
case param := <-c.channel:
c.handlers.Range(func(key any, fn any) bool {
var payload reflect.Value
handler := fn.(*reflect.Value)
typ := handler.Type()
if param == nil {
// If the parameter passed to the handler is nil,
// it initializes a new payload element based on the
// type of the second parameter of the handler using the reflect package.
payload = reflect.New(typ.In(1)).Elem()
} else {
payload = reflect.ValueOf(param)
}
(*handler).Call([]reflect.Value{topic, payload})
return true
})
case payload := <-c.channel:
c.transfer(c.topic, payload)
case <-c.stopCh:
return
}
@@ -81,7 +86,22 @@ func (c *channel) subscribe(handler any) error {
return nil
}
// publish triggers the handlers defined for this channel. The `payload` argument will be passed to the handler.
// publishSync triggers the handlers defined for this channel synchronously.
// The payload argument will be passed to the handler.
// It does not use channels and instead directly calls the handler function.
func (c *channel) publishSync(payload any) error {
c.RLock()
defer c.RUnlock()
if c.closed {
return ErrChannelClosed
}
c.transfer(c.topic, payload)
return nil
}
// publish triggers the handlers defined for this channel asynchronously.
// The `payload` argument will be passed to the handler.
// It uses the channel to asynchronously call the handler.
func (c *channel) publish(payload any) error {
c.RLock()
defer c.RUnlock()
@@ -179,7 +199,9 @@ func (e *EventBus) Subscribe(topic string, handler any) error {
return ch.(*channel).subscribe(handler)
}
// Publish triggers the handlers defined for a topic. The `payload` argument will be passed to the handler.
// publish triggers the handlers defined for this channel asynchronously.
// The `payload` argument will be passed to the handler.
// It uses the channel to asynchronously call the handler.
// The type of the payload must correspond to the second parameter of the handler in `Subscribe()`.
func (e *EventBus) Publish(topic string, payload any) error {
ch, ok := e.channels.Load(topic)
@@ -193,6 +215,21 @@ func (e *EventBus) Publish(topic string, payload any) error {
return ch.(*channel).publish(payload)
}
// publishSync triggers the handlers defined for this channel synchronously.
// The payload argument will be passed to the handler.
// It does not use channels and instead directly calls the handler function.
func (e *EventBus) PublishSync(topic string, payload any) error {
ch, ok := e.channels.Load(topic)
if !ok {
ch = newChannel(topic, e.bufferSize)
e.channels.Store(topic, ch)
go ch.(*channel).loop()
}
return ch.(*channel).publishSync(payload)
}
// Close closes the eventbus
func (e *EventBus) Close() {
e.once.Do(func() {

View File

@@ -112,6 +112,33 @@ func Test_channelPublish(t *testing.T) {
assert.Equal(t, ErrChannelClosed, err)
}
func Test_channelPublishSync(t *testing.T) {
ch := newChannel("test_topic", -1)
assert.NotNil(t, ch)
assert.NotNil(t, ch.channel)
assert.Equal(t, "test_topic", ch.topic)
ch.subscribe(busHandlerOne)
var wg sync.WaitGroup
wg.Add(1)
go func() {
for i := 0; i < 100; i++ {
err := ch.publish(i)
assert.Nil(t, err)
}
wg.Done()
}()
wg.Wait()
err := ch.publishSync(nil)
assert.Nil(t, err)
time.Sleep(time.Millisecond)
ch.close()
err = ch.publishSync(1)
assert.Equal(t, ErrChannelClosed, err)
}
func Test_New(t *testing.T) {
bus := New()
assert.NotNil(t, bus)
@@ -205,6 +232,31 @@ func Test_EventBusPublish(t *testing.T) {
bus.Close()
}
func Test_EventBusPublishSync(t *testing.T) {
bus := New()
assert.NotNil(t, bus)
assert.Equal(t, -1, bus.bufferSize)
assert.NotNil(t, bus.channels)
err := bus.Publish("testtopic", 1)
assert.Nil(t, err)
err = bus.Subscribe("testtopic", busHandlerOne)
assert.Nil(t, err)
var wg sync.WaitGroup
wg.Add(1)
go func() {
for i := 0; i < 100; i++ {
err := bus.PublishSync("testtopic", i)
assert.Nil(t, err)
}
wg.Done()
}()
wg.Wait()
bus.Close()
}
func BenchmarkEventBusPublish(b *testing.B) {
bus := New()
bus.Subscribe("testtopic", busHandlerOne)
@@ -221,3 +273,20 @@ func BenchmarkEventBusPublish(b *testing.B) {
wg.Wait()
bus.Close()
}
func BenchmarkEventBusPublishSync(b *testing.B) {
bus := New()
bus.Subscribe("testtopic", busHandlerOne)
b.ResetTimer()
var wg sync.WaitGroup
wg.Add(1)
go func() {
for i := 0; i < b.N; i++ {
bus.PublishSync("testtopic", i)
}
wg.Done()
}()
wg.Wait()
bus.Close()
}

24
pipe.go
View File

@@ -6,9 +6,10 @@ import (
type Handler[T any] func(payload T)
// Pipe is a wrapper of channel. Subscribers will receive messages asynchronously.
// You can use Pipe.Publish() instead of chan <- and Pipe.Subscribe() instead of <- chan.
// If there are multiple subscribers, one message will be received by each subscriber.
// Pipe is a wrapper for a channel that allows for asynchronous message passing to subscribers.
// Use Pipe.Publish() instead of `chan<-` and Pipe.Subscribe() instead of `<-chan`.
// To pass messages to subscribers synchronously, use Pipe.PublishSync(), which does not use a channel.
// If multiple subscribers exist, each subscriber will receive the message.
type Pipe[T any] struct {
sync.RWMutex
bufferSize int
@@ -86,7 +87,7 @@ func (p *Pipe[T]) Unsubscribe(handler Handler[T]) error {
return nil
}
// publish trigger handlers defined for this pipe. payload argument will be transferred to handlers.
// Publish triggers the handlers defined for this pipe, transferring the payload to the handlers.
func (p *Pipe[T]) Publish(payload T) error {
p.RLock()
defer p.RUnlock()
@@ -97,6 +98,21 @@ func (p *Pipe[T]) Publish(payload T) error {
return nil
}
// PublishSync triggers the handlers defined for this pipe synchronously, without using a channel.
// The payload will be passed directly to the handlers.
func (p *Pipe[T]) PublishSync(payload T) error {
p.RLock()
defer p.RUnlock()
if p.closed {
return ErrChannelClosed
}
p.handlers.Range(func(key any, fn any) bool {
fn.(Handler[T])(payload)
return true
})
return nil
}
// close closes the pipe
func (p *Pipe[T]) Close() {
p.Lock()

View File

@@ -94,6 +94,30 @@ func Test_PipePublish(t *testing.T) {
assert.Equal(t, ErrChannelClosed, err)
}
func Test_PipePublishSync(t *testing.T) {
p := NewPipe[int]()
assert.NotNil(t, p)
assert.NotNil(t, p.channel)
err := p.Subscribe(pipeHandlerOne)
time.Sleep(time.Millisecond)
var wg sync.WaitGroup
wg.Add(1)
go func() {
for i := 0; i < 1000; i++ {
err := p.PublishSync(i)
assert.Nil(t, err)
}
wg.Done()
}()
wg.Wait()
p.Close()
err = p.PublishSync(1)
assert.Equal(t, ErrChannelClosed, err)
}
func Test_PipeClose(t *testing.T) {
p := NewPipe[int]()
assert.NotNil(t, p)
@@ -114,7 +138,7 @@ func BenchmarkPipePublish(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
pipe.Publish(i)
pipe.PublishSync(i)
}
}

View File

@@ -33,7 +33,13 @@ func Publish(topic string, payload any) error {
return singleton.Publish(topic, payload)
}
// Close closes the singleton
// PublishSync is a synchronous version of Publish that triggers the handlers defined for a topic with the given payload.
// The type of the payload must correspond to the second parameter of the handler in `Subscribe()`.
func PublishSync(topic string, payload any) error {
return singleton.Publish(topic, payload)
}
// Close closes the singleton instance of EventBus.
func Close() {
singleton.Close()
}