Files
eventbus2/pipe.go
werben 409c04d0dc Add synchronously way
Add synchronously way
2023-04-25 20:11:55 +08:00

128 lines
2.8 KiB
Go

package eventbus
import (
"sync"
)
type Handler[T any] func(payload T)
// Pipe is a wrapper for a channel that allows for asynchronous message passing to subscribers.
// Use Pipe.Publish() instead of `chan<-` and Pipe.Subscribe() instead of `<-chan`.
// To pass messages to subscribers synchronously, use Pipe.PublishSync(), which does not use a channel.
// If multiple subscribers exist, each subscriber will receive the message.
type Pipe[T any] struct {
sync.RWMutex
bufferSize int
channel chan T
handlers *CowMap
closed bool
stopCh chan any
}
// NewPipe create a unbuffered pipe
func NewPipe[T any]() *Pipe[T] {
p := &Pipe[T]{
bufferSize: -1,
channel: make(chan T),
stopCh: make(chan any),
handlers: NewCowMap(),
}
go p.loop()
return p
}
// NewPipe create a buffered pipe, bufferSize is the buffer size of the pipe
// When create a buffered pipe. You can publish into the Pipe without a corresponding concurrent subscriber.
func NewBufferedPipe[T any](bufferSize int) *Pipe[T] {
if bufferSize <= 0 {
bufferSize = 1
}
p := &Pipe[T]{
bufferSize: bufferSize,
channel: make(chan T, bufferSize),
stopCh: make(chan any),
handlers: NewCowMap(),
}
go p.loop()
return p
}
// loop loops forever, receiving published message from the pipe, transfer payload to subscriber by calling handlers
func (p *Pipe[T]) loop() {
for {
select {
case payload := <-p.channel:
p.handlers.Range(func(key any, fn any) bool {
fn.(Handler[T])(payload)
return true
})
case <-p.stopCh:
return
}
}
}
// subscribe add a handler to a pipe, return error if the pipe is closed.
func (p *Pipe[T]) Subscribe(handler Handler[T]) error {
p.RLock()
defer p.RUnlock()
if p.closed {
return ErrChannelClosed
}
p.handlers.Store(&handler, handler)
return nil
}
// unsubscribe removes handler defined for this pipe.
func (p *Pipe[T]) Unsubscribe(handler Handler[T]) error {
p.RLock()
defer p.RUnlock()
if p.closed {
return ErrChannelClosed
}
p.handlers.Delete(&handler)
return nil
}
// Publish triggers the handlers defined for this pipe, transferring the payload to the handlers.
func (p *Pipe[T]) Publish(payload T) error {
p.RLock()
defer p.RUnlock()
if p.closed {
return ErrChannelClosed
}
p.channel <- payload
return nil
}
// PublishSync triggers the handlers defined for this pipe synchronously, without using a channel.
// The payload will be passed directly to the handlers.
func (p *Pipe[T]) PublishSync(payload T) error {
p.RLock()
defer p.RUnlock()
if p.closed {
return ErrChannelClosed
}
p.handlers.Range(func(key any, fn any) bool {
fn.(Handler[T])(payload)
return true
})
return nil
}
// close closes the pipe
func (p *Pipe[T]) Close() {
p.Lock()
defer p.Unlock()
if p.closed {
return
}
p.closed = true
p.stopCh <- struct{}{}
close(p.channel)
}