eventbus
A lightweight eventbus that simplifies communication between goroutines.
Installation
Make sure that go(version 1.18+) is installed on your computer. Type the following command:
go get github.com/werbenhu/eventbus
Import package in your project
import (
"github.com/werbenhu/eventbus"
)
What's eventbus?
EventBus is a wrapper for multiple topics, with each topic corresponding to a channel. The eventbus.Publish()
method pushes a message to the channel, while the handler in the eventbus.Subscribe()
method processes the messages coming out of the channel.
If you want to use a buffered EventBus, you can create one using the eventbus.NewBuffered(bufferSize int)
method, which creates a buffered channel for each topic.
EventBus uses a Copy-On-Write map
to manage handlers and topics, so it is not recommended for use in scenarios with a large number of frequent subscriptions and unsubscriptions.
eventbus example
func handler(topic string, payload int) {
fmt.Printf("topic:%s, payload:%d\n", topic, payload)
}
func main() {
bus := eventbus.New()
// Subscribe() subscribes to a topic, return an error if the handler is not a function.
// The handler must have two parameters: the first parameter must be a string,
// and the type of the handler's second parameter must be consistent with the type of the payload in `Publish()`
bus.Subscribe("testtopic", handler)
// Publish() triggers the handlers defined for a topic. The `payload` argument will be passed to the handler.
// The type of the payload must correspond to the second parameter of the handler in `Subscribe()`.
bus.Publish("testtopic", 100)
// Subscribers receive messages asynchronously.
// To ensure that subscribers can receive all messages, there is a delay before unsubscribe
time.Sleep(time.Millisecond)
bus.Unsubscribe("testtopic", handler)
bus.Close()
}
Use Pipe instead of channel
Pipe is a wrapper for a channel where there is no concept of a topic. Subscribers receive messages asynchronously. You can use Pipe.Publish()
instead of chan <-
and Pipe.Subscribe()
instead of <- chan
. If there are multiple subscribers, one message will be received by each subscriber.
If you want to use a buffered channel, you can use eventbus.NewBufferedPipe[T](bufferSize int)
to create a buffered pipe.
pipe example
func handler1(val string) {
fmt.Printf("handler1 val:%s\n", val)
}
func handler2(val string) {
fmt.Printf("handler2 val:%s\n", val)
}
func main() {
pipe := eventbus.NewPipe[string]()
pipe.Subscribe(handler1)
pipe.Subscribe(handler2)
var wg sync.WaitGroup
wg.Add(1)
go func(p *eventbus.Pipe[string]) {
for i := 0; i < 100; i++ {
p.Publish(strconv.Itoa(i))
}
wg.Done()
}(pipe)
wg.Wait()
// Subscribers receive messages asynchronously.
// To ensure that subscribers can receive all messages, there is a delay before unsubscribe
time.Sleep(time.Millisecond)
pipe.Unsubscribe(handler1)
pipe.Unsubscribe(handler2)
pipe.Close()
}