mirror of
https://github.com/werbenhu/eventbus.git
synced 2025-09-26 20:41:48 +08:00
128 lines
2.8 KiB
Go
128 lines
2.8 KiB
Go
package eventbus
|
|
|
|
import (
|
|
"sync"
|
|
)
|
|
|
|
type Handler[T any] func(payload T)
|
|
|
|
// Pipe is a wrapper for a channel that allows for asynchronous message passing to subscribers.
|
|
// Use Pipe.Publish() instead of `chan<-` and Pipe.Subscribe() instead of `<-chan`.
|
|
// To pass messages to subscribers synchronously, use Pipe.PublishSync(), which does not use a channel.
|
|
// If multiple subscribers exist, each subscriber will receive the message.
|
|
type Pipe[T any] struct {
|
|
sync.RWMutex
|
|
bufferSize int
|
|
channel chan T
|
|
handlers *CowMap
|
|
closed bool
|
|
stopCh chan any
|
|
}
|
|
|
|
// NewPipe create a unbuffered pipe
|
|
func NewPipe[T any]() *Pipe[T] {
|
|
p := &Pipe[T]{
|
|
bufferSize: -1,
|
|
channel: make(chan T),
|
|
stopCh: make(chan any),
|
|
handlers: NewCowMap(),
|
|
}
|
|
|
|
go p.loop()
|
|
return p
|
|
}
|
|
|
|
// NewPipe create a buffered pipe, bufferSize is the buffer size of the pipe
|
|
// When create a buffered pipe. You can publish into the Pipe without a corresponding concurrent subscriber.
|
|
func NewBufferedPipe[T any](bufferSize int) *Pipe[T] {
|
|
if bufferSize <= 0 {
|
|
bufferSize = 1
|
|
}
|
|
|
|
p := &Pipe[T]{
|
|
bufferSize: bufferSize,
|
|
channel: make(chan T, bufferSize),
|
|
stopCh: make(chan any),
|
|
handlers: NewCowMap(),
|
|
}
|
|
|
|
go p.loop()
|
|
return p
|
|
}
|
|
|
|
// loop loops forever, receiving published message from the pipe, transfer payload to subscriber by calling handlers
|
|
func (p *Pipe[T]) loop() {
|
|
for {
|
|
select {
|
|
case payload := <-p.channel:
|
|
p.handlers.Range(func(key any, fn any) bool {
|
|
fn.(Handler[T])(payload)
|
|
return true
|
|
})
|
|
case <-p.stopCh:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// subscribe add a handler to a pipe, return error if the pipe is closed.
|
|
func (p *Pipe[T]) Subscribe(handler Handler[T]) error {
|
|
p.RLock()
|
|
defer p.RUnlock()
|
|
if p.closed {
|
|
return ErrChannelClosed
|
|
}
|
|
p.handlers.Store(&handler, handler)
|
|
return nil
|
|
}
|
|
|
|
// unsubscribe removes handler defined for this pipe.
|
|
func (p *Pipe[T]) Unsubscribe(handler Handler[T]) error {
|
|
p.RLock()
|
|
defer p.RUnlock()
|
|
if p.closed {
|
|
return ErrChannelClosed
|
|
}
|
|
p.handlers.Delete(&handler)
|
|
return nil
|
|
}
|
|
|
|
// Publish triggers the handlers defined for this pipe, transferring the payload to the handlers.
|
|
func (p *Pipe[T]) Publish(payload T) error {
|
|
p.RLock()
|
|
defer p.RUnlock()
|
|
if p.closed {
|
|
return ErrChannelClosed
|
|
}
|
|
p.channel <- payload
|
|
return nil
|
|
}
|
|
|
|
// PublishSync triggers the handlers defined for this pipe synchronously, without using a channel.
|
|
// The payload will be passed directly to the handlers.
|
|
func (p *Pipe[T]) PublishSync(payload T) error {
|
|
p.RLock()
|
|
defer p.RUnlock()
|
|
if p.closed {
|
|
return ErrChannelClosed
|
|
}
|
|
p.handlers.Range(func(key any, fn any) bool {
|
|
fn.(Handler[T])(payload)
|
|
return true
|
|
})
|
|
return nil
|
|
}
|
|
|
|
// close closes the pipe
|
|
func (p *Pipe[T]) Close() {
|
|
p.Lock()
|
|
defer p.Unlock()
|
|
|
|
if p.closed {
|
|
return
|
|
}
|
|
p.closed = true
|
|
p.stopCh <- struct{}{}
|
|
close(p.channel)
|
|
}
|