diff --git a/delayqueue.go b/delayqueue.go index 90d257d..06b8a73 100644 --- a/delayqueue.go +++ b/delayqueue.go @@ -149,23 +149,24 @@ func (q *DelayQueue) SendDelayMsg(payload string, duration time.Duration, opts . } // pending2ReadyScript atomically moves messages from pending to ready -// argv: currentTime, pendingKey, readyKey +// keys: pendingKey, readyKey +// argv: currentTime const pending2ReadyScript = ` -local msgs = redis.call('ZRangeByScore', ARGV[2], '0', ARGV[1]) -- get ready msg +local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1]) -- get ready msg if (#msgs == 0) then return end -local args2 = {'LPush', ARGV[3]} -- push into ready +local args2 = {'LPush', KEYS[2]} -- push into ready for _,v in ipairs(msgs) do table.insert(args2, v) end redis.call(unpack(args2)) -redis.call('ZRemRangeByScore', ARGV[2], '0', ARGV[1]) -- remove msgs from pending +redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1]) -- remove msgs from pending ` func (q *DelayQueue) pending2Ready() error { now := time.Now().Unix() ctx := context.Background() keys := []string{q.pendingKey, q.readyKey} - err := q.redisCli.Eval(ctx, pending2ReadyScript, keys, now, q.pendingKey, q.readyKey).Err() + err := q.redisCli.Eval(ctx, pending2ReadyScript, keys, now).Err() if err != nil && err != redis.Nil { return fmt.Errorf("pending2ReadyScript failed: %v", err) } @@ -173,11 +174,12 @@ func (q *DelayQueue) pending2Ready() error { } // pending2ReadyScript atomically moves messages from ready to unack -// argv: retryTime, readyKey/retryKey, unackKey +// keys: readyKey/retryKey, unackKey +// argv: retryTime const ready2UnackScript = ` -local msg = redis.call('RPop', ARGV[2]) +local msg = redis.call('RPop', KEYS[1]) if (not msg) then return end -redis.call('ZAdd', ARGV[3], ARGV[1], msg) +redis.call('ZAdd', KEYS[2], ARGV[1], msg) return msg ` @@ -185,7 +187,7 @@ func (q *DelayQueue) ready2Unack() (string, error) { retryTime := time.Now().Add(q.maxConsumeDuration).Unix() ctx := context.Background() keys := []string{q.readyKey, q.unAckKey} - ret, err := q.redisCli.Eval(ctx, ready2UnackScript, keys, retryTime, q.readyKey, q.unAckKey).Result() + ret, err := q.redisCli.Eval(ctx, ready2UnackScript, keys, retryTime).Result() if err == redis.Nil { return "", err } @@ -259,30 +261,30 @@ func (q *DelayQueue) nack(idStr string) error { // and moves messages from unack to garbage which retry count is 0 // Because DelayQueue cannot determine garbage message before eval unack2RetryScript, so it cannot pass keys parameter to redisCli.Eval // Therefore unack2RetryScript moves garbage message to garbageKey instead of deleting directly -// argv: currentTime, unackKey, retryCountKey, retryKey, garbageKey +// keys: unackKey, retryCountKey, retryKey, garbageKey +// argv: currentTime const unack2RetryScript = ` -local msgs = redis.call('ZRangeByScore', ARGV[2], '0', ARGV[1]) -- get retry msg +local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1]) -- get retry msg if (#msgs == 0) then return end -local retryCounts = redis.call('HMGet', ARGV[3], unpack(msgs)) -- get retry count +local retryCounts = redis.call('HMGet', KEYS[2], unpack(msgs)) -- get retry count for i,v in ipairs(retryCounts) do local k = msgs[i] if tonumber(v) > 0 then - redis.call("HIncrBy", ARGV[3], k, -1) -- reduce retry count - redis.call("LPush", ARGV[4], k) -- add to retry + redis.call("HIncrBy", KEYS[2], k, -1) -- reduce retry count + redis.call("LPush", KEYS[3], k) -- add to retry else - redis.call("HDel", ARGV[3], k) -- del retry count - redis.call("SAdd", ARGV[5], k) -- add to garbage + redis.call("HDel", KEYS[2], k) -- del retry count + redis.call("SAdd", KEYS[4], k) -- add to garbage end end -redis.call('ZRemRangeByScore', ARGV[2], '0', ARGV[1]) -- remove msgs from unack +redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1]) -- remove msgs from unack ` func (q *DelayQueue) unack2Retry() error { ctx := context.Background() - keys := []string{q.unAckKey, q.retryKey, q.retryCountKey, q.garbageKey} + keys := []string{q.unAckKey, q.retryCountKey, q.retryKey, q.garbageKey} now := time.Now() - err := q.redisCli.Eval(ctx, unack2RetryScript, keys, - now.Unix(), q.unAckKey, q.retryCountKey, q.retryKey, q.garbageKey).Err() + err := q.redisCli.Eval(ctx, unack2RetryScript, keys, now.Unix()).Err() if err != nil && err != redis.Nil { return fmt.Errorf("unack to retry script failed: %v", err) } diff --git a/delayqueue_test.go b/delayqueue_test.go index 60d1425..a09b336 100644 --- a/delayqueue_test.go +++ b/delayqueue_test.go @@ -68,6 +68,7 @@ func TestDelayQueue_StopConsume(t *testing.T) { received++ if received == size { queue.StopConsume() + t.Log("send stop signal") } return true }).WithDefaultRetryCount(1)