From 02aed2588259a2aa72941f809345efdff39267c8 Mon Sep 17 00:00:00 2001 From: hdt3213 Date: Fri, 30 Sep 2022 16:20:02 +0800 Subject: [PATCH] fix lua script error: too many results to unpack --- delayqueue.go | 44 ++++++++++++++++++++++++++++++++++---------- delayqueue_test.go | 4 ++-- 2 files changed, 36 insertions(+), 12 deletions(-) diff --git a/delayqueue.go b/delayqueue.go index ab57a40..fc86238 100644 --- a/delayqueue.go +++ b/delayqueue.go @@ -178,8 +178,14 @@ if (#msgs == 0) then return end local args2 = {'LPush', KEYS[2]} -- push into ready for _,v in ipairs(msgs) do table.insert(args2, v) + if (#args2 == 4000) then + redis.call(unpack(args2)) + args2 = {'LPush', KEYS[2]} + end +end +if (#args2 > 2) then + redis.call(unpack(args2)) end -redis.call(unpack(args2)) redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1]) -- remove msgs from pending ` @@ -328,17 +334,35 @@ func (q *DelayQueue) nack(idStr string) error { // keys: unackKey, retryCountKey, retryKey, garbageKey // argv: currentTime const unack2RetryScript = ` +local unack2retry = function(msgs) + local retryCounts = redis.call('HMGet', KEYS[2], unpack(msgs)) -- get retry count + for i,v in ipairs(retryCounts) do + local k = msgs[i] + if tonumber(v) > 0 then + redis.call("HIncrBy", KEYS[2], k, -1) -- reduce retry count + redis.call("LPush", KEYS[3], k) -- add to retry + else + redis.call("HDel", KEYS[2], k) -- del retry count + redis.call("SAdd", KEYS[4], k) -- add to garbage + end + end +end + local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1]) -- get retry msg if (#msgs == 0) then return end -local retryCounts = redis.call('HMGet', KEYS[2], unpack(msgs)) -- get retry count -for i,v in ipairs(retryCounts) do - local k = msgs[i] - if tonumber(v) > 0 then - redis.call("HIncrBy", KEYS[2], k, -1) -- reduce retry count - redis.call("LPush", KEYS[3], k) -- add to retry - else - redis.call("HDel", KEYS[2], k) -- del retry count - redis.call("SAdd", KEYS[4], k) -- add to garbage +if #msgs < 4000 then + unack2retry(msgs) +else + local buf = {} + for _,v in ipairs(msgs) do + table.insert(buf, v) + if #buf == 4000 then + unack2retry(buf) + buf = {} + end + end + if (#buf > 0) then + unack2retry(buf) end end redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1]) -- remove msgs from unack diff --git a/delayqueue_test.go b/delayqueue_test.go index 624d5a1..4036a23 100644 --- a/delayqueue_test.go +++ b/delayqueue_test.go @@ -16,7 +16,7 @@ func TestDelayQueue_consume(t *testing.T) { Addr: "127.0.0.1:6379", }) redisCli.FlushDB(context.Background()) - size := 10 + size := 10000 retryCount := 3 deliveryCount := make(map[string]int) cb := func(s string) bool { @@ -28,7 +28,7 @@ func TestDelayQueue_consume(t *testing.T) { WithFetchInterval(time.Millisecond * 50). WithMaxConsumeDuration(0). WithLogger(log.New(os.Stderr, "[DelayQueue]", log.LstdFlags)). - WithFetchLimit(1) + WithFetchLimit(2) for i := 0; i < size; i++ { err := queue.SendDelayMsg(strconv.Itoa(i), 0, WithRetryCount(retryCount),WithMsgTTL(time.Hour))