// Source file: eventbus2/eventbus.go (Go, 240 lines, 6.2 KiB).

package eventbus
import (
"reflect"
"sync"
)
// channel binds one topic to its subscribed handlers and to the queue that
// feeds asynchronous deliveries to them.
type channel struct {
	// RWMutex is held for reading by subscribe, unsubscribe, publish and
	// publishSync, and for writing by close, which sets closed.
	sync.RWMutex

	// topic is the name passed to every handler as its first argument.
	topic string
	// handlers stores the subscribed handlers; subscribe keys each entry by
	// the handler's function pointer.
	handlers *CowMap

	// channel queues payloads sent by publish until loop receives them.
	channel chan any
	// bufferSize is the capacity requested for channel; newChannel treats
	// values of zero or less as a request for an unbuffered queue.
	bufferSize int

	// stopCh is the signal that makes loop return; close signals on it.
	stopCh chan any
	// closed is set by close; afterwards subscribe, unsubscribe, publish and
	// publishSync return ErrChannelClosed.
	closed bool
}
// newChannel builds the channel for topic and starts its delivery goroutine.
//
// A bufferSize of zero or less yields an unbuffered queue; a positive value is
// used as the queue capacity. bufferSize itself is recorded unchanged. The
// handler map comes from NewCowMap, and c.loop is started in its own goroutine
// before the channel is returned.
func newChannel(topic string, bufferSize int) *channel {
	// make(chan any, 0) is an unbuffered channel, so clamping non-positive
	// sizes to zero lets a single make call cover both cases.
	capacity := bufferSize
	if capacity < 0 {
		capacity = 0
	}

	c := &channel{
		topic:      topic,
		bufferSize: bufferSize,
		channel:    make(chan any, capacity),
		handlers:   NewCowMap(),
		stopCh:     make(chan any),
	}
	go c.loop()
	return c
}
// transfer invokes every handler stored on the channel with the channel's
// topic and the given payload.
//
// The topic argument is not read: handlers always receive c.topic. When
// payload is nil, each handler instead receives the zero value of its own
// second parameter type, so the argument is rebuilt per handler.
func (c *channel) transfer(topic string, payload any) {
	topicArg := reflect.ValueOf(c.topic)

	c.handlers.Range(func(_ any, entry any) bool {
		fn := entry.(*reflect.Value)
		signature := fn.Type()

		var arg reflect.Value
		if payload != nil {
			arg = reflect.ValueOf(payload)
		} else {
			// No value was published: synthesise the zero value of the type
			// this particular handler declares for its payload parameter.
			arg = reflect.New(signature.In(1)).Elem()
		}

		fn.Call([]reflect.Value{topicArg, arg})
		return true // keep visiting the remaining handlers
	})
}
// loop is the body of the goroutine started by newChannel. It hands every
// payload received from the queue to transfer, together with the channel's
// topic, and returns as soon as it receives from stopCh.
func (c *channel) loop() {
	for {
		select {
		case <-c.stopCh:
			return
		case msg := <-c.channel:
			c.transfer(c.topic, msg)
		}
	}
}
// subscribe registers handler on the channel, keyed by its function pointer,
// so storing the same function again replaces the earlier entry.
// It returns ErrChannelClosed once the channel has been closed.
func (c *channel) subscribe(handler any) error {
	c.RLock()
	defer c.RUnlock()

	if c.closed {
		return ErrChannelClosed
	}

	value := reflect.ValueOf(handler)
	key := value.Pointer()
	c.handlers.Store(key, &value)
	return nil
}
// publishSync calls the channel's handlers with payload on the caller's
// goroutine, bypassing the queue, and returns after transfer has returned.
// The read lock is held for the whole call. It returns ErrChannelClosed once
// the channel has been closed.
func (c *channel) publishSync(payload any) error {
	c.RLock()
	defer c.RUnlock()

	if !c.closed {
		c.transfer(c.topic, payload)
		return nil
	}
	return ErrChannelClosed
}
// publish queues payload for asynchronous delivery: it sends the payload on
// the channel's queue, from which loop receives it and calls the handlers.
// The send happens while the read lock is held and blocks until the queue can
// accept the value. It returns ErrChannelClosed once the channel has been
// closed.
func (c *channel) publish(payload any) error {
	c.RLock()
	defer c.RUnlock()

	if !c.closed {
		c.channel <- payload
		return nil
	}
	return ErrChannelClosed
}
// unsubscribe deletes the entry stored under handler's function pointer from
// the channel's handlers. It returns ErrChannelClosed once the channel has
// been closed.
func (c *channel) unsubscribe(handler any) error {
	c.RLock()
	defer c.RUnlock()

	if c.closed {
		return ErrChannelClosed
	}

	key := reflect.ValueOf(handler).Pointer()
	c.handlers.Delete(key)
	return nil
}
// close shuts the channel down: it marks it closed, signals the delivery
// goroutine to stop, clears the handlers and closes the queue. Calling it
// again is a no-op.
//
// The stop signal is delivered by closing stopCh rather than by sending on it.
// A send would block, with the write lock held, until loop is back in its
// select; that deadlocks when a handler running inside loop calls close
// itself, or calls publish/subscribe/unsubscribe (which need the read lock)
// while another goroutine is inside close. As a consequence close does not
// wait for a handler that is already running.
func (c *channel) close() {
	c.Lock()
	defer c.Unlock()

	if c.closed {
		return
	}
	c.closed = true

	close(c.stopCh)
	// Handlers are cleared before the queue is closed, so a loop iteration
	// that still observes the closed queue has nothing left to call
	// (assuming Clear empties the map; confirm against CowMap).
	c.handlers.Clear()
	close(c.channel)
}
// EventBus routes published payloads to handlers by topic.
//
// Every topic has its own channel. Publish puts a payload on that channel's
// queue, and the handlers registered through Subscribe are called with the
// payloads that come out of it.
type EventBus struct {
	// once ensures Close closes the channels a single time.
	once sync.Once
	// bufferSize is the queue capacity passed to newChannel for each topic.
	bufferSize int
	// channels maps a topic name to its *channel.
	channels *CowMap
}
// NewBuffered returns an EventBus whose topic queues are buffered.
// bufferSize is the capacity of each queue; values of zero or less are
// replaced by 1.
func NewBuffered(bufferSize int) *EventBus {
	bus := &EventBus{channels: NewCowMap()}
	bus.bufferSize = bufferSize
	if bus.bufferSize < 1 {
		bus.bufferSize = 1
	}
	return bus
}
// New returns an EventBus with no topics registered. Its buffer size is -1,
// which is the value later passed to newChannel for every topic it creates.
func New() *EventBus {
	bus := new(EventBus)
	bus.bufferSize = -1
	bus.channels = NewCowMap()
	return bus
}
// Unsubscribe removes handler from the given topic.
//
// It returns ErrNoSubscriber when the topic has no channel,
// ErrHandlerIsNotFunc when handler is nil or not a function, and otherwise
// whatever the topic's channel returns (ErrChannelClosed after the channel
// has been closed, nil on success).
func (e *EventBus) Unsubscribe(topic string, handler any) error {
	ch, ok := e.channels.Load(topic)
	if !ok {
		return ErrNoSubscriber
	}

	// Subscribe only accepts functions, so anything else can never have been
	// registered. Rejecting it here also keeps reflect.Value.Pointer, used by
	// the channel to compute the key, from panicking on a nil or
	// non-pointer-kind value.
	if typ := reflect.TypeOf(handler); typ == nil || typ.Kind() != reflect.Func {
		return ErrHandlerIsNotFunc
	}
	return ch.(*channel).unsubscribe(handler)
}
// Subscribe registers handler for topic, creating the topic's channel if it
// does not exist yet.
//
// handler must be a function taking exactly two parameters, the first of
// which is a string (it receives the topic). The second parameter's type must
// match the type of the payloads passed to Publish and PublishSync.
//
// It returns ErrHandlerIsNotFunc when handler is nil or not a function,
// ErrHandlerParamNum when it does not take two parameters,
// ErrHandlerFirstParam when its first parameter is not a string, and
// otherwise whatever the channel returns (ErrChannelClosed after the channel
// has been closed, nil on success).
func (e *EventBus) Subscribe(topic string, handler any) error {
	typ := reflect.TypeOf(handler)
	// reflect.TypeOf yields nil for an untyped nil handler; calling Kind on
	// that nil Type would panic, so treat it as "not a function".
	if typ == nil || typ.Kind() != reflect.Func {
		return ErrHandlerIsNotFunc
	}
	if typ.NumIn() != 2 {
		return ErrHandlerParamNum
	}
	if typ.In(0).Kind() != reflect.String {
		return ErrHandlerFirstParam
	}

	// NOTE(review): Load followed by Store is two separate operations. Whether
	// concurrent first-time callers for the same topic can each create a
	// channel depends on how CowMap and its callers synchronise; confirm.
	ch, ok := e.channels.Load(topic)
	if !ok {
		ch = newChannel(topic, e.bufferSize)
		e.channels.Store(topic, ch)
	}
	return ch.(*channel).subscribe(handler)
}
// Publish queues payload for asynchronous delivery to the handlers of topic.
//
// The payload is sent on the topic's channel, creating that channel first if
// the topic has none. The payload's type must match the second parameter of
// the handlers registered with Subscribe. The error is whatever the channel's
// publish returns: ErrChannelClosed after the channel has been closed, nil
// otherwise.
func (e *EventBus) Publish(topic string, payload any) error {
	if existing, found := e.channels.Load(topic); found {
		return existing.(*channel).publish(payload)
	}

	created := newChannel(topic, e.bufferSize)
	e.channels.Store(topic, created)
	return created.publish(payload)
}
// PublishSync delivers payload to the handlers of topic synchronously.
//
// It bypasses the queue and hands the payload to the topic's channel directly,
// creating that channel first if the topic has none. The error is whatever the
// channel's publishSync returns: ErrChannelClosed after the channel has been
// closed, nil otherwise.
func (e *EventBus) PublishSync(topic string, payload any) error {
	if existing, found := e.channels.Load(topic); found {
		return existing.(*channel).publishSync(payload)
	}

	created := newChannel(topic, e.bufferSize)
	e.channels.Store(topic, created)
	return created.publishSync(payload)
}
// Close closes every channel currently held by the bus. The work is guarded
// by a sync.Once, so only the first call has any effect.
func (e *EventBus) Close() {
	closeAll := func() {
		e.channels.Range(func(_ any, value any) bool {
			value.(*channel).close()
			return true // continue with the next topic
		})
	}
	e.once.Do(closeAll)
}