mirror of
https://github.com/HDT3213/delayqueue.git
synced 2025-09-27 03:26:05 +08:00
80 lines
2.1 KiB
Go
80 lines
2.1 KiB
Go
package delayqueue
|
|
|
|
import (
|
|
"log"
|
|
|
|
"github.com/redis/go-redis/v9"
|
|
)
|
|
|
|
// Monitor can get running status and events of DelayQueue
|
|
type Monitor struct {
|
|
inner *DelayQueue
|
|
}
|
|
|
|
// NewMonitor0 creates a new Monitor by a RedisCli instance
|
|
func NewMonitor0(name string, cli RedisCli, opts ...interface{}) *Monitor {
|
|
return &Monitor{
|
|
inner: NewQueue0(name, cli, opts...),
|
|
}
|
|
}
|
|
|
|
// NewMonitor creates a new Monitor by a *redis.Client
|
|
func NewMonitor(name string, cli *redis.Client, opts ...interface{}) *Monitor {
|
|
rc := &redisV9Wrapper{
|
|
inner: cli,
|
|
}
|
|
return NewMonitor0(name, rc, opts...)
|
|
}
|
|
|
|
// NewMonitor creates a new Monitor by a *redis.ClusterClient
|
|
func NewMonitorOnCluster(name string, cli *redis.ClusterClient, opts ...interface{}) *Monitor {
|
|
rc := &redisClusterWrapper{
|
|
inner: cli,
|
|
}
|
|
return NewMonitor0(name, rc, opts...)
|
|
}
|
|
|
|
// WithLogger customizes logger for queue
|
|
func (m *Monitor) WithLogger(logger *log.Logger) *Monitor {
|
|
m.inner.logger = logger
|
|
return m
|
|
}
|
|
|
|
// GetPendingCount returns the number of messages which delivery time has not arrived
|
|
func (m *Monitor) GetPendingCount() (int64, error) {
|
|
return m.inner.GetPendingCount()
|
|
}
|
|
|
|
// GetReadyCount returns the number of messages which have arrived delivery time but but have not been delivered yet
|
|
func (m *Monitor) GetReadyCount() (int64, error) {
|
|
return m.inner.GetReadyCount()
|
|
}
|
|
|
|
// GetProcessingCount returns the number of messages which are being processed
|
|
func (m *Monitor) GetProcessingCount() (int64, error) {
|
|
return m.inner.GetProcessingCount()
|
|
}
|
|
|
|
// ListenEvent register a listener which will be called when events occured in this queue
|
|
// so it can be used to monitor running status
|
|
// returns: close function, error
|
|
func (m *Monitor) ListenEvent(listener EventListener) (func(), error) {
|
|
reportChan := genReportChannel(m.inner.name)
|
|
sub, closer, err := m.inner.redisCli.Subscribe(reportChan)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
go func() {
|
|
for payload := range sub {
|
|
event, err := decodeEvent(payload)
|
|
if err != nil {
|
|
m.inner.logger.Printf("[listen event] %v\n", event)
|
|
} else {
|
|
listener.OnEvent(event)
|
|
}
|
|
}
|
|
}()
|
|
return closer, nil
|
|
}
|
|
|