From a3020e2dd2380f46e65e504139ce3752ff24ee83 Mon Sep 17 00:00:00 2001 From: finley Date: Tue, 28 Jan 2025 19:34:54 +0800 Subject: [PATCH] bugfix: NackRedeliveryDelay may cause unexpected retry --- delayqueue.go | 28 +++++++++++++++++++++++----- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/delayqueue.go b/delayqueue.go index a3ebd10..a453cd0 100644 --- a/delayqueue.go +++ b/delayqueue.go @@ -332,7 +332,7 @@ func (q *DelayQueue) TryIntercept(msg *MessageInfo) (*InterceptResult, error) { Intercepted: true, State: StateReady, }, nil - } + } // try to intercept at pending removed, err = q.redisCli.ZRem(q.pendingKey, []string{id}) if err != nil { @@ -349,7 +349,7 @@ func (q *DelayQueue) TryIntercept(msg *MessageInfo) (*InterceptResult, error) { // message may be being consumed or has been successfully consumed // if the message has been successfully consumed, the following action will cause nothing // if the message is being consumed,the following action will prevent it from being retried - q.redisCli.HDel(q.retryCountKey, []string{id}) + q.redisCli.HDel(q.retryCountKey, []string{id}) q.redisCli.LRem(q.retryKey, 0, id) return &InterceptResult{ @@ -512,11 +512,29 @@ func (q *DelayQueue) ack(idStr string) error { return nil } +// updateZSetScoreScript update score of a zset member if it exists +// KEYS[1]: zset +// ARGV[1]: score +// ARGV[2]: member +const updateZSetScoreScript = ` +if redis.call('zrank', KEYS[1], ARGV[2]) ~= nil then + return redis.call('zadd', KEYS[1], ARGV[1], ARGV[2]) +else + return 0 +end +` + +func (q *DelayQueue) updateZSetScore(key string, score float64, member string) error { + scoreStr := strconv.FormatFloat(score, 'f', -1, 64) + _, err := q.eval(updateZSetScoreScript, []string{key}, []interface{}{scoreStr, member}) + return err +} + func (q *DelayQueue) nack(idStr string) error { atomic.AddInt32(&q.fetchCount, -1) - err := q.redisCli.ZAdd(q.unAckKey, map[string]float64{ - idStr: float64(time.Now().Add(q.nackRedeliveryDelay).Unix()), - }) + retryTime := float64(time.Now().Add(q.nackRedeliveryDelay).Unix()) + // if message consumption has not reach deadlin (still in unAckKey), then update its retry time + err := q.updateZSetScore(q.unAckKey, retryTime, idStr) if err != nil { return fmt.Errorf("negative ack failed: %v", err) }