package delayqueue

import (
	"context"
	"fmt"
	"github.com/go-redis/redis/v8"
	"github.com/google/uuid"
	"log"
	"sync"
	"time"
)

// DelayQueue is a message queue supporting delayed/scheduled delivery based on redis
type DelayQueue struct {
	// name for this queue. Make sure the name is unique in the redis database
	name          string
	redisCli      *redis.Client
	cb            func(string) bool
	pendingKey    string // sorted set: message id -> delivery time
	readyKey      string // list
	unAckKey      string // sorted set: message id -> retry time
	retryKey      string // list
	retryCountKey string // hash: message id -> remaining retry count
	garbageKey    string // set: message id
	ticker        *time.Ticker
	logger        *log.Logger
	close         chan struct{}

	maxConsumeDuration time.Duration
	msgTTL             time.Duration
	defaultRetryCount  uint
	fetchInterval      time.Duration
	fetchLimit         uint
	concurrent         uint
}

type hashTagKeyOpt int

// UseHashTagKey adds a hash tag to redis keys to ensure that all keys of this queue are allocated to the same hash slot.
// If you are using Codis/AliyunRedisCluster/TencentCloudRedisCluster, pass this option to NewQueue.
// WARNING! Adding or removing this option will cause DelayQueue to fail to read existing data in redis.
// see more: https://redis.io/docs/reference/cluster-spec/#hash-tags
func UseHashTagKey() interface{} {
	return hashTagKeyOpt(1)
}

// NewQueue creates a new queue. Use DelayQueue.StartConsume to consume messages and DelayQueue.SendScheduleMsg or DelayQueue.SendDelayMsg to publish them.
// The callback returns true to confirm successful consumption. If the callback returns false or does not return within maxConsumeDuration, DelayQueue will re-deliver the message.
func NewQueue(name string, cli *redis.Client, callback func(string) bool, opts ...interface{}) *DelayQueue {
	if name == "" {
		panic("name is required")
	}
	if cli == nil {
		panic("cli is required")
	}
	if callback == nil {
		panic("callback is required")
	}
	useHashTag := false
	for _, opt := range opts {
		switch opt.(type) {
		case hashTagKeyOpt:
			useHashTag = true
		}
	}
	var keyPrefix string
	if useHashTag {
		keyPrefix = "{dp:" + name + "}"
	} else {
		keyPrefix = "dp:" + name
	}
	return &DelayQueue{
		name:               name,
		redisCli:           cli,
		cb:                 callback,
		pendingKey:         keyPrefix + ":pending",
		readyKey:           keyPrefix + ":ready",
		unAckKey:           keyPrefix + ":unack",
		retryKey:           keyPrefix + ":retry",
		retryCountKey:      keyPrefix + ":retry:cnt",
		garbageKey:         keyPrefix + ":garbage",
		close:              make(chan struct{}, 1),
		maxConsumeDuration: 5 * time.Second,
		msgTTL:             time.Hour,
		logger:             log.Default(),
		defaultRetryCount:  3,
		fetchInterval:      time.Second,
		concurrent:         1,
	}
}

// WithLogger customizes the logger for this queue
func (q *DelayQueue) WithLogger(logger *log.Logger) *DelayQueue {
	q.logger = logger
	return q
}

// WithFetchInterval customizes the interval at which the consumer fetches messages from redis
func (q *DelayQueue) WithFetchInterval(d time.Duration) *DelayQueue {
	q.fetchInterval = d
	return q
}

// WithMaxConsumeDuration customizes the max consume duration.
// If no acknowledgement is received within maxConsumeDuration after message delivery, DelayQueue will try to deliver this message again.
func (q *DelayQueue) WithMaxConsumeDuration(d time.Duration) *DelayQueue {
	q.maxConsumeDuration = d
	return q
}

// WithFetchLimit limits the max number of unacked (processing) messages
func (q *DelayQueue) WithFetchLimit(limit uint) *DelayQueue {
	q.fetchLimit = limit
	return q
}

// WithConcurrent sets the number of concurrent consumers
func (q *DelayQueue) WithConcurrent(c uint) *DelayQueue {
	if c == 0 {
		return q
	}
	q.concurrent = c
	return q
}

// WithDefaultRetryCount customizes the default max number of retries; it affects all messages in this queue.
// Use WithRetryCount in DelayQueue.SendScheduleMsg or DelayQueue.SendDelayMsg to specify the retry count of a particular message.
func (q *DelayQueue) WithDefaultRetryCount(count uint) *DelayQueue {
	q.defaultRetryCount = count
	return q
}
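// exampleConfigureQueue is an illustrative sketch, not part of the original API: it shows
// how the With* setters above can be chained after NewQueue. The queue name, callback,
// and tuning values below are arbitrary assumptions for demonstration only.
func exampleConfigureQueue(cli *redis.Client) *DelayQueue {
	queue := NewQueue("example", cli, func(payload string) bool {
		// return true to acknowledge the message, false to request re-delivery
		return true
	}, UseHashTagKey()) // only needed on proxy-based clusters such as Codis
	return queue.
		WithConcurrent(4).                         // 4 consumer goroutines
		WithFetchLimit(16).                        // at most 16 unacked messages per fetch cycle
		WithFetchInterval(500 * time.Millisecond). // poll redis twice per second
		WithMaxConsumeDuration(10 * time.Second).  // re-deliver if not acked within 10s
		WithDefaultRetryCount(5)                   // retry each message up to 5 times by default
}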
func (q *DelayQueue) genMsgKey(idStr string) string {
	return "dp:" + q.name + ":msg:" + idStr
}

type retryCountOpt int

// WithRetryCount sets the retry count for a message.
// example: queue.SendDelayMsg(payload, duration, delayqueue.WithRetryCount(3))
func WithRetryCount(count int) interface{} {
	return retryCountOpt(count)
}

type msgTTLOpt time.Duration

// WithMsgTTL sets the TTL for a message.
// example: queue.SendDelayMsg(payload, duration, delayqueue.WithMsgTTL(time.Hour))
func WithMsgTTL(d time.Duration) interface{} {
	return msgTTLOpt(d)
}

// SendScheduleMsg submits a message delivered at the given time
func (q *DelayQueue) SendScheduleMsg(payload string, t time.Time, opts ...interface{}) error {
	// parse options
	retryCount := q.defaultRetryCount
	for _, opt := range opts {
		switch o := opt.(type) {
		case retryCountOpt:
			retryCount = uint(o)
		case msgTTLOpt:
			q.msgTTL = time.Duration(o)
		}
	}
	// generate id
	idStr := uuid.Must(uuid.NewRandom()).String()
	ctx := context.Background()
	now := time.Now()
	// store msg
	msgTTL := t.Sub(now) + q.msgTTL // time until delivery + q.msgTTL
	err := q.redisCli.Set(ctx, q.genMsgKey(idStr), payload, msgTTL).Err()
	if err != nil {
		return fmt.Errorf("store msg failed: %v", err)
	}
	// store retry count
	err = q.redisCli.HSet(ctx, q.retryCountKey, idStr, retryCount).Err()
	if err != nil {
		return fmt.Errorf("store retry count failed: %v", err)
	}
	// put to pending
	err = q.redisCli.ZAdd(ctx, q.pendingKey, &redis.Z{Score: float64(t.Unix()), Member: idStr}).Err()
	if err != nil {
		return fmt.Errorf("push to pending failed: %v", err)
	}
	return nil
}

// SendDelayMsg submits a message delivered after the given duration
func (q *DelayQueue) SendDelayMsg(payload string, duration time.Duration, opts ...interface{}) error {
	t := time.Now().Add(duration)
	return q.SendScheduleMsg(payload, t, opts...)
}
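// exampleSendMessages is an illustrative sketch of the two publish entry points above;
// the payloads, delays, and option values are assumptions for demonstration only.
func exampleSendMessages(queue *DelayQueue) error {
	// deliver after a relative delay, overriding the retry count for this message only
	err := queue.SendDelayMsg("hello", 30*time.Second, WithRetryCount(5))
	if err != nil {
		return err
	}
	// deliver at an absolute point in time, with a custom message TTL
	return queue.SendScheduleMsg("world", time.Now().Add(time.Hour), WithMsgTTL(2*time.Hour))
}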
// pending2ReadyScript atomically moves messages from pending to ready
// keys: pendingKey, readyKey
// argv: currentTime
const pending2ReadyScript = `
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1]) -- get ready msg
if (#msgs == 0) then return end
local args2 = {'LPush', KEYS[2]} -- push into ready
for _,v in ipairs(msgs) do
	table.insert(args2, v)
	if (#args2 == 4000) then
		redis.call(unpack(args2))
		args2 = {'LPush', KEYS[2]}
	end
end
if (#args2 > 2) then
	redis.call(unpack(args2))
end
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1]) -- remove msgs from pending
`

func (q *DelayQueue) pending2Ready() error {
	now := time.Now().Unix()
	ctx := context.Background()
	keys := []string{q.pendingKey, q.readyKey}
	err := q.redisCli.Eval(ctx, pending2ReadyScript, keys, now).Err()
	if err != nil && err != redis.Nil {
		return fmt.Errorf("pending2ReadyScript failed: %v", err)
	}
	return nil
}

// ready2UnackScript atomically moves a message from ready (or retry) to unack
// keys: readyKey/retryKey, unackKey
// argv: retryTime
const ready2UnackScript = `
local msg = redis.call('RPop', KEYS[1])
if (not msg) then return end
redis.call('ZAdd', KEYS[2], ARGV[1], msg)
return msg
`

func (q *DelayQueue) ready2Unack() (string, error) {
	retryTime := time.Now().Add(q.maxConsumeDuration).Unix()
	ctx := context.Background()
	keys := []string{q.readyKey, q.unAckKey}
	ret, err := q.redisCli.Eval(ctx, ready2UnackScript, keys, retryTime).Result()
	if err == redis.Nil {
		return "", err
	}
	if err != nil {
		return "", fmt.Errorf("ready2UnackScript failed: %v", err)
	}
	str, ok := ret.(string)
	if !ok {
		return "", fmt.Errorf("illegal result: %#v", ret)
	}
	return str, nil
}

func (q *DelayQueue) retry2Unack() (string, error) {
	retryTime := time.Now().Add(q.maxConsumeDuration).Unix()
	ctx := context.Background()
	keys := []string{q.retryKey, q.unAckKey}
	ret, err := q.redisCli.Eval(ctx, ready2UnackScript, keys, retryTime).Result()
	if err == redis.Nil {
		return "", redis.Nil
	}
	if err != nil {
		return "", fmt.Errorf("ready2UnackScript failed: %v", err)
	}
	str, ok := ret.(string)
	if !ok {
		return "", fmt.Errorf("illegal result: %#v", ret)
	}
	return str, nil
}

func (q *DelayQueue) callback(idStr string) error {
	ctx := context.Background()
	payload, err := q.redisCli.Get(ctx, q.genMsgKey(idStr)).Result()
	if err == redis.Nil {
		// message expired or has already been cleaned up, nothing to consume
		return nil
	}
	if err != nil {
		// possibly an IO error
		return fmt.Errorf("get message payload failed: %v", err)
	}
	ack := q.cb(payload)
	if ack {
		err = q.ack(idStr)
	} else {
		err = q.nack(idStr)
	}
	return err
}
// batchCallback calls DelayQueue.callback in batch. Callbacks are executed concurrently according to DelayQueue.concurrent.
// batchCallback waits for all callbacks to finish; otherwise the actual number of messages being processed could exceed DelayQueue.fetchLimit.
func (q *DelayQueue) batchCallback(ids []string) {
	if len(ids) == 1 || q.concurrent == 1 {
		for _, id := range ids {
			err := q.callback(id)
			if err != nil {
				q.logger.Printf("consume msg %s failed: %v", id, err)
			}
		}
		return
	}
	ch := make(chan string, len(ids))
	for _, id := range ids {
		ch <- id
	}
	close(ch)
	wg := sync.WaitGroup{}
	concurrent := int(q.concurrent)
	if concurrent > len(ids) { // spawning more goroutines than messages is useless
		concurrent = len(ids)
	}
	wg.Add(concurrent)
	for i := 0; i < concurrent; i++ {
		go func() {
			defer wg.Done()
			for id := range ch {
				err := q.callback(id)
				if err != nil {
					q.logger.Printf("consume msg %s failed: %v", id, err)
				}
			}
		}()
	}
	wg.Wait()
}

func (q *DelayQueue) ack(idStr string) error {
	ctx := context.Background()
	err := q.redisCli.ZRem(ctx, q.unAckKey, idStr).Err()
	if err != nil {
		return fmt.Errorf("remove from unack failed: %v", err)
	}
	// msg key has a ttl, ignore result of delete
	_ = q.redisCli.Del(ctx, q.genMsgKey(idStr)).Err()
	q.redisCli.HDel(ctx, q.retryCountKey, idStr)
	return nil
}

func (q *DelayQueue) nack(idStr string) error {
	ctx := context.Background()
	// update retry time to now, so unack2Retry will move the message to retry immediately
	err := q.redisCli.ZAdd(ctx, q.unAckKey, &redis.Z{
		Member: idStr,
		Score:  float64(time.Now().Unix()),
	}).Err()
	if err != nil {
		return fmt.Errorf("negative ack failed: %v", err)
	}
	return nil
}

// unack2RetryScript atomically moves messages whose remaining retry count is greater than 0 from unack to retry,
// and moves messages whose retry count has reached 0 from unack to garbage.
// Because DelayQueue cannot determine which messages are garbage before unack2RetryScript is evaluated, it cannot pass their keys to redisCli.Eval.
// Therefore unack2RetryScript moves garbage messages to garbageKey instead of deleting them directly.
// keys: unackKey, retryCountKey, retryKey, garbageKey
// argv: currentTime
const unack2RetryScript = `
local unack2retry = function(msgs)
	local retryCounts = redis.call('HMGet', KEYS[2], unpack(msgs)) -- get retry count
	for i,v in ipairs(retryCounts) do
		local k = msgs[i]
		local retryCount = tonumber(v)
		if retryCount ~= nil and retryCount > 0 then
			redis.call("HIncrBy", KEYS[2], k, -1) -- reduce retry count
			redis.call("LPush", KEYS[3], k) -- add to retry
		else
			redis.call("HDel", KEYS[2], k) -- del retry count
			redis.call("SAdd", KEYS[4], k) -- add to garbage
		end
	end
end

local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1]) -- get retry msg
if (#msgs == 0) then return end
if #msgs < 4000 then
	unack2retry(msgs)
else
	local buf = {}
	for _,v in ipairs(msgs) do
		table.insert(buf, v)
		if #buf == 4000 then
			unack2retry(buf)
			buf = {}
		end
	end
	if (#buf > 0) then
		unack2retry(buf)
	end
end
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1]) -- remove msgs from unack
`

func (q *DelayQueue) unack2Retry() error {
	ctx := context.Background()
	keys := []string{q.unAckKey, q.retryCountKey, q.retryKey, q.garbageKey}
	now := time.Now()
	err := q.redisCli.Eval(ctx, unack2RetryScript, keys, now.Unix()).Err()
	if err != nil && err != redis.Nil {
		return fmt.Errorf("unack to retry script failed: %v", err)
	}
	return nil
}

func (q *DelayQueue) garbageCollect() error {
	ctx := context.Background()
	msgIds, err := q.redisCli.SMembers(ctx, q.garbageKey).Result()
	if err != nil {
		return fmt.Errorf("smembers failed: %v", err)
	}
	if len(msgIds) == 0 {
		return nil
	}
	// allow concurrent clean
	msgKeys := make([]string, 0, len(msgIds))
	for _, idStr := range msgIds {
		msgKeys = append(msgKeys, q.genMsgKey(idStr))
	}
	err = q.redisCli.Del(ctx, msgKeys...).Err()
	if err != nil && err != redis.Nil {
		return fmt.Errorf("del msgs failed: %v", err)
	}
	err = q.redisCli.SRem(ctx, q.garbageKey, msgIds).Err()
	if err != nil && err != redis.Nil {
		return fmt.Errorf("remove from garbage key failed: %v", err)
	}
	return nil
}

func (q *DelayQueue) consume() error {
	// pending to ready
	err := q.pending2Ready()
	if err != nil {
		return err
	}
	// consume
	ids := make([]string, 0, q.fetchLimit)
	for {
		idStr, err := q.ready2Unack()
		if err == redis.Nil { // consumed all
			break
		}
		if err != nil {
			return err
		}
		ids = append(ids, idStr)
		if q.fetchLimit > 0 && len(ids) >= int(q.fetchLimit) {
			break
		}
	}
	if len(ids) > 0 {
		q.batchCallback(ids)
	}
	// unack to retry
	err = q.unack2Retry()
	if err != nil {
		return err
	}
	err = q.garbageCollect()
	if err != nil {
		return err
	}
	// retry
	ids = make([]string, 0, q.fetchLimit)
	for {
		idStr, err := q.retry2Unack()
		if err == redis.Nil { // consumed all
			break
		}
		if err != nil {
			return err
		}
		ids = append(ids, idStr)
		if q.fetchLimit > 0 && len(ids) >= int(q.fetchLimit) {
			break
		}
	}
	if len(ids) > 0 {
		q.batchCallback(ids)
	}
	return nil
}

// StartConsume creates a goroutine to consume messages from DelayQueue.
// Use `<-done` to wait for the consumer to stop.
func (q *DelayQueue) StartConsume() (done <-chan struct{}) {
	q.ticker = time.NewTicker(q.fetchInterval)
	done0 := make(chan struct{})
	go func() {
	tickerLoop:
		for {
			select {
			case <-q.ticker.C:
				err := q.consume()
				if err != nil {
					q.logger.Printf("consume error: %v", err)
				}
			case <-q.close:
				break tickerLoop
			}
		}
		close(done0)
	}()
	return done0
}

// StopConsume stops the consumer goroutine
func (q *DelayQueue) StopConsume() {
	close(q.close)
	if q.ticker != nil {
		q.ticker.Stop()
	}
}
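// exampleConsumeLifecycle is an illustrative sketch of the consumer lifecycle:
// StartConsume launches the polling goroutine, StopConsume asks it to exit, and the
// returned channel is closed once the goroutine has stopped.
// The redis address, queue name, and sleep duration are assumptions for demonstration only.
func exampleConsumeLifecycle() {
	redisCli := redis.NewClient(&redis.Options{Addr: "127.0.0.1:6379"})
	queue := NewQueue("example", redisCli, func(payload string) bool {
		log.Printf("received: %s", payload)
		return true // acknowledge the message
	})
	done := queue.StartConsume()
	time.Sleep(10 * time.Second) // let the consumer poll for a while
	queue.StopConsume()
	<-done // wait until the consumer goroutine has exited
}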