mirror of
https://github.com/HDT3213/delayqueue.git
synced 2025-10-03 06:22:22 +08:00
add monitor
This commit is contained in:
166
delayqueue.go
166
delayqueue.go
@@ -3,11 +3,12 @@ package delayqueue
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/google/uuid"
|
||||
"log"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
// DelayQueue is a message queue supporting delayed/scheduled delivery based on redis
|
||||
@@ -27,12 +28,14 @@ type DelayQueue struct {
|
||||
logger *log.Logger
|
||||
close chan struct{}
|
||||
|
||||
maxConsumeDuration time.Duration
|
||||
msgTTL time.Duration
|
||||
defaultRetryCount uint
|
||||
fetchInterval time.Duration
|
||||
fetchLimit uint
|
||||
concurrent uint
|
||||
maxConsumeDuration time.Duration // default 5 seconds
|
||||
msgTTL time.Duration // default 1 hour
|
||||
defaultRetryCount uint // default 3
|
||||
fetchInterval time.Duration // default 1 second
|
||||
fetchLimit uint // default no limit
|
||||
concurrent uint // default 1, executed serially
|
||||
|
||||
eventListener EventListener
|
||||
}
|
||||
|
||||
// NilErr represents redis nil
|
||||
@@ -40,8 +43,13 @@ var NilErr = errors.New("nil")
|
||||
|
||||
// RedisCli is abstraction for redis client, required commands only not all commands
|
||||
type RedisCli interface {
|
||||
Eval(script string, keys []string, args []interface{}) (interface{}, error) // args should be string, integer or float
|
||||
// Eval sends lua script to redis
|
||||
// args should be string, integer or float
|
||||
// returns string, int64, []interface{} (elements can be string or int64)
|
||||
Eval(script string, keys []string, args []interface{}) (interface{}, error)
|
||||
Set(key string, value string, expiration time.Duration) error
|
||||
// Get represents redis command GET
|
||||
// please NilErr when no such key in redis
|
||||
Get(key string) (string, error)
|
||||
Del(keys []string) error
|
||||
HSet(key string, field string, value string) error
|
||||
@@ -50,6 +58,14 @@ type RedisCli interface {
|
||||
SRem(key string, members []string) error
|
||||
ZAdd(key string, values map[string]float64) error
|
||||
ZRem(key string, fields []string) error
|
||||
ZCard(key string) (int64, error)
|
||||
LLen(key string) (int64, error)
|
||||
|
||||
// Publish used for monitor only
|
||||
Publish(channel string, payload string) error
|
||||
// Subscribe used for monitor only
|
||||
// returns: payload channel, subscription closer, error; the subscription closer should close payload channel as well
|
||||
Subscribe(channel string) (payloads <-chan string, close func(), err error)
|
||||
}
|
||||
|
||||
type hashTagKeyOpt int
|
||||
@@ -102,7 +118,7 @@ func NewQueue0(name string, cli RedisCli, callback func(string) bool, opts ...in
|
||||
retryCountKey: keyPrefix + ":retry:cnt",
|
||||
garbageKey: keyPrefix + ":garbage",
|
||||
useHashTag: useHashTag,
|
||||
close: make(chan struct{}, 1),
|
||||
close: nil,
|
||||
maxConsumeDuration: 5 * time.Second,
|
||||
msgTTL: time.Hour,
|
||||
logger: log.Default(),
|
||||
@@ -131,7 +147,7 @@ func (q *DelayQueue) WithMaxConsumeDuration(d time.Duration) *DelayQueue {
|
||||
return q
|
||||
}
|
||||
|
||||
// WithFetchLimit limits the max number of unack (processing) messages
|
||||
// WithFetchLimit limits the max number of processing messages, 0 means no limit
|
||||
func (q *DelayQueue) WithFetchLimit(limit uint) *DelayQueue {
|
||||
q.fetchLimit = limit
|
||||
return q
|
||||
@@ -207,6 +223,7 @@ func (q *DelayQueue) SendScheduleMsg(payload string, t time.Time, opts ...interf
|
||||
if err != nil {
|
||||
return fmt.Errorf("push to pending failed: %v", err)
|
||||
}
|
||||
q.reportEvent(NewMessageEvent, 1)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -219,6 +236,7 @@ func (q *DelayQueue) SendDelayMsg(payload string, duration time.Duration, opts .
|
||||
// pending2ReadyScript atomically moves messages from pending to ready
|
||||
// keys: pendingKey, readyKey
|
||||
// argv: currentTime
|
||||
// returns: ready message number
|
||||
const pending2ReadyScript = `
|
||||
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1]) -- get ready msg
|
||||
if (#msgs == 0) then return end
|
||||
@@ -234,15 +252,20 @@ if (#args2 > 2) then
|
||||
redis.call('LPush', KEYS[2], unpack(args2))
|
||||
end
|
||||
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1]) -- remove msgs from pending
|
||||
return #msgs
|
||||
`
|
||||
|
||||
func (q *DelayQueue) pending2Ready() error {
|
||||
now := time.Now().Unix()
|
||||
keys := []string{q.pendingKey, q.readyKey}
|
||||
_, err := q.redisCli.Eval(pending2ReadyScript, keys, []interface{}{now})
|
||||
raw, err := q.redisCli.Eval(pending2ReadyScript, keys, []interface{}{now})
|
||||
if err != nil && err != NilErr {
|
||||
return fmt.Errorf("pending2ReadyScript failed: %v", err)
|
||||
}
|
||||
count, ok := raw.(int64)
|
||||
if ok {
|
||||
q.reportEvent(ReadyEvent, int(count))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -270,6 +293,7 @@ func (q *DelayQueue) ready2Unack() (string, error) {
|
||||
if !ok {
|
||||
return "", fmt.Errorf("illegal result: %#v", ret)
|
||||
}
|
||||
q.reportEvent(DeliveredEvent, 1)
|
||||
return str, nil
|
||||
}
|
||||
|
||||
@@ -353,6 +377,7 @@ func (q *DelayQueue) ack(idStr string) error {
|
||||
// msg key has ttl, ignore result of delete
|
||||
_ = q.redisCli.Del([]string{q.genMsgKey(idStr)})
|
||||
_ = q.redisCli.HDel(q.retryCountKey, []string{idStr})
|
||||
q.reportEvent(AckEvent, 1)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -364,6 +389,7 @@ func (q *DelayQueue) nack(idStr string) error {
|
||||
if err != nil {
|
||||
return fmt.Errorf("negative ack failed: %v", err)
|
||||
}
|
||||
q.reportEvent(NackEvent, 1)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -373,48 +399,74 @@ func (q *DelayQueue) nack(idStr string) error {
|
||||
// Therefore unack2RetryScript moves garbage message to garbageKey instead of deleting directly
|
||||
// keys: unackKey, retryCountKey, retryKey, garbageKey
|
||||
// argv: currentTime
|
||||
// returns: {retryMsgs, failMsgs}
|
||||
const unack2RetryScript = `
|
||||
local unack2retry = function(msgs)
|
||||
local retryCounts = redis.call('HMGet', KEYS[2], unpack(msgs)) -- get retry count
|
||||
local retryMsgs = 0
|
||||
local failMsgs = 0
|
||||
for i,v in ipairs(retryCounts) do
|
||||
local k = msgs[i]
|
||||
if v ~= false and v ~= nil and v ~= '' and tonumber(v) > 0 then
|
||||
redis.call("HIncrBy", KEYS[2], k, -1) -- reduce retry count
|
||||
redis.call("LPush", KEYS[3], k) -- add to retry
|
||||
retryMsgs = retryMsgs + 1
|
||||
else
|
||||
redis.call("HDel", KEYS[2], k) -- del retry count
|
||||
redis.call("SAdd", KEYS[4], k) -- add to garbage
|
||||
failMsgs = failMsgs + 1
|
||||
end
|
||||
end
|
||||
return retryMsgs, failMsgs
|
||||
end
|
||||
|
||||
local retryMsgs = 0
|
||||
local failMsgs = 0
|
||||
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1]) -- get retry msg
|
||||
if (#msgs == 0) then return end
|
||||
if #msgs < 4000 then
|
||||
unack2retry(msgs)
|
||||
local d1, d2 = unack2retry(msgs)
|
||||
retryMsgs = retryMsgs + d1
|
||||
failMsgs = failMsgs + d2
|
||||
else
|
||||
local buf = {}
|
||||
for _,v in ipairs(msgs) do
|
||||
table.insert(buf, v)
|
||||
if #buf == 4000 then
|
||||
unack2retry(buf)
|
||||
local d1, d2 = unack2retry(buf)
|
||||
retryMsgs = retryMsgs + d1
|
||||
failMsgs = failMsgs + d2
|
||||
buf = {}
|
||||
end
|
||||
end
|
||||
if (#buf > 0) then
|
||||
unack2retry(buf)
|
||||
local d1, d2 = unack2retry(buf)
|
||||
retryMsgs = retryMsgs + d1
|
||||
failMsgs = failMsgs + d2
|
||||
end
|
||||
end
|
||||
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1]) -- remove msgs from unack
|
||||
return {retryMsgs, failMsgs}
|
||||
`
|
||||
|
||||
func (q *DelayQueue) unack2Retry() error {
|
||||
keys := []string{q.unAckKey, q.retryCountKey, q.retryKey, q.garbageKey}
|
||||
now := time.Now()
|
||||
_, err := q.redisCli.Eval(unack2RetryScript, keys, []interface{}{now.Unix()})
|
||||
raw, err := q.redisCli.Eval(unack2RetryScript, keys, []interface{}{now.Unix()})
|
||||
if err != nil && err != NilErr {
|
||||
return fmt.Errorf("unack to retry script failed: %v", err)
|
||||
}
|
||||
infos, ok := raw.([]interface{})
|
||||
if ok && len(infos) == 2 {
|
||||
retryCount, ok := infos[0].(int64)
|
||||
if ok {
|
||||
q.reportEvent(RetryEvent, int(retryCount))
|
||||
}
|
||||
failCount, ok := infos[1].(int64)
|
||||
if ok {
|
||||
q.reportEvent(FinalFailedEvent, int(failCount))
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -499,6 +551,10 @@ func (q *DelayQueue) consume() error {
|
||||
// StartConsume creates a goroutine to consume message from DelayQueue
|
||||
// use `<-done` to wait consumer stopping
|
||||
func (q *DelayQueue) StartConsume() (done <-chan struct{}) {
|
||||
if q.cb == nil {
|
||||
panic("this instance has no callback")
|
||||
}
|
||||
q.close = make(chan struct{}, 1)
|
||||
q.ticker = time.NewTicker(q.fetchInterval)
|
||||
done0 := make(chan struct{})
|
||||
go func() {
|
||||
@@ -526,3 +582,83 @@ func (q *DelayQueue) StopConsume() {
|
||||
q.ticker.Stop()
|
||||
}
|
||||
}
|
||||
|
||||
// GetPendingCount returns the number of pending messages
|
||||
func (q *DelayQueue) GetPendingCount() (int64, error) {
|
||||
return q.redisCli.ZCard(q.pendingKey)
|
||||
}
|
||||
|
||||
// GetReadyCount returns the number of messages which have arrived delivery time but but have not been delivered
|
||||
func (q *DelayQueue) GetReadyCount() (int64, error) {
|
||||
return q.redisCli.LLen(q.readyKey)
|
||||
}
|
||||
|
||||
// GetProcessingCount returns the number of messages which are being processed
|
||||
func (q *DelayQueue) GetProcessingCount() (int64, error) {
|
||||
return q.redisCli.ZCard(q.unAckKey)
|
||||
}
|
||||
|
||||
// EventListener which will be called when events occur
|
||||
// This Listener can be used to monitor running status
|
||||
type EventListener interface {
|
||||
// OnEvent will be called when events occur
|
||||
OnEvent(*Event)
|
||||
}
|
||||
|
||||
// ListenEvent register a listener which will be called when events occur,
|
||||
// so it can be used to monitor running status
|
||||
//
|
||||
// But It can ONLY receive events from the CURRENT INSTANCE,
|
||||
// if you want to listen to all events in queue, just use Monitor.ListenEvent
|
||||
//
|
||||
// There can be AT MOST ONE EventListener in an DelayQueue instance.
|
||||
// If you are using customized listener, Monitor will stop working
|
||||
func (q *DelayQueue) ListenEvent(listener EventListener) {
|
||||
q.eventListener = listener
|
||||
}
|
||||
|
||||
// RemoveListener stops reporting events to EventListener
|
||||
func (q *DelayQueue) DisableListener() {
|
||||
q.eventListener = nil
|
||||
}
|
||||
|
||||
func (q *DelayQueue) reportEvent(code int, count int) {
|
||||
listener := q.eventListener // eventListener may be changed during running
|
||||
if listener != nil && count > 0 {
|
||||
event := &Event{
|
||||
Code: code,
|
||||
Timestamp: time.Now().Unix(),
|
||||
MsgCount: count,
|
||||
}
|
||||
listener.OnEvent(event)
|
||||
}
|
||||
}
|
||||
|
||||
// pubsubListener receives events and reports them through redis pubsub for monitoring
|
||||
type pubsubListener struct {
|
||||
redisCli RedisCli
|
||||
reportChan string
|
||||
}
|
||||
|
||||
func genReportChannel(name string) string {
|
||||
return "dq:" + name + ":reportEvents"
|
||||
}
|
||||
|
||||
// EnableReport enables reporting to monitor
|
||||
func (q *DelayQueue) EnableReport() {
|
||||
reportChan := genReportChannel(q.name)
|
||||
q.ListenEvent(&pubsubListener{
|
||||
redisCli: q.redisCli,
|
||||
reportChan: reportChan,
|
||||
})
|
||||
}
|
||||
|
||||
// DisableReport stops reporting to monitor
|
||||
func (q *DelayQueue) DisableReport() {
|
||||
q.DisableListener()
|
||||
}
|
||||
|
||||
func (l *pubsubListener) OnEvent(event *Event) {
|
||||
payload := encodeEvent(event)
|
||||
l.redisCli.Publish(l.reportChan, payload)
|
||||
}
|
||||
|
Reference in New Issue
Block a user