mirror of
https://github.com/HDT3213/delayqueue.git
synced 2025-12-24 10:30:52 +08:00
fix lua script error: too many results to unpack
This commit is contained in:
@@ -178,8 +178,14 @@ if (#msgs == 0) then return end
|
||||
local args2 = {'LPush', KEYS[2]} -- push into ready
|
||||
for _,v in ipairs(msgs) do
|
||||
table.insert(args2, v)
|
||||
if (#args2 == 4000) then
|
||||
redis.call(unpack(args2))
|
||||
args2 = {'LPush', KEYS[2]}
|
||||
end
|
||||
end
|
||||
if (#args2 > 2) then
|
||||
redis.call(unpack(args2))
|
||||
end
|
||||
redis.call(unpack(args2))
|
||||
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1]) -- remove msgs from pending
|
||||
`
|
||||
|
||||
@@ -328,17 +334,35 @@ func (q *DelayQueue) nack(idStr string) error {
|
||||
// keys: unackKey, retryCountKey, retryKey, garbageKey
|
||||
// argv: currentTime
|
||||
const unack2RetryScript = `
|
||||
local unack2retry = function(msgs)
|
||||
local retryCounts = redis.call('HMGet', KEYS[2], unpack(msgs)) -- get retry count
|
||||
for i,v in ipairs(retryCounts) do
|
||||
local k = msgs[i]
|
||||
if tonumber(v) > 0 then
|
||||
redis.call("HIncrBy", KEYS[2], k, -1) -- reduce retry count
|
||||
redis.call("LPush", KEYS[3], k) -- add to retry
|
||||
else
|
||||
redis.call("HDel", KEYS[2], k) -- del retry count
|
||||
redis.call("SAdd", KEYS[4], k) -- add to garbage
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1]) -- get retry msg
|
||||
if (#msgs == 0) then return end
|
||||
local retryCounts = redis.call('HMGet', KEYS[2], unpack(msgs)) -- get retry count
|
||||
for i,v in ipairs(retryCounts) do
|
||||
local k = msgs[i]
|
||||
if tonumber(v) > 0 then
|
||||
redis.call("HIncrBy", KEYS[2], k, -1) -- reduce retry count
|
||||
redis.call("LPush", KEYS[3], k) -- add to retry
|
||||
else
|
||||
redis.call("HDel", KEYS[2], k) -- del retry count
|
||||
redis.call("SAdd", KEYS[4], k) -- add to garbage
|
||||
if #msgs < 4000 then
|
||||
unack2retry(msgs)
|
||||
else
|
||||
local buf = {}
|
||||
for _,v in ipairs(msgs) do
|
||||
table.insert(buf, v)
|
||||
if #buf == 4000 then
|
||||
unack2retry(buf)
|
||||
buf = {}
|
||||
end
|
||||
end
|
||||
if (#buf > 0) then
|
||||
unack2retry(buf)
|
||||
end
|
||||
end
|
||||
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1]) -- remove msgs from unack
|
||||
|
||||
@@ -16,7 +16,7 @@ func TestDelayQueue_consume(t *testing.T) {
|
||||
Addr: "127.0.0.1:6379",
|
||||
})
|
||||
redisCli.FlushDB(context.Background())
|
||||
size := 10
|
||||
size := 10000
|
||||
retryCount := 3
|
||||
deliveryCount := make(map[string]int)
|
||||
cb := func(s string) bool {
|
||||
@@ -28,7 +28,7 @@ func TestDelayQueue_consume(t *testing.T) {
|
||||
WithFetchInterval(time.Millisecond * 50).
|
||||
WithMaxConsumeDuration(0).
|
||||
WithLogger(log.New(os.Stderr, "[DelayQueue]", log.LstdFlags)).
|
||||
WithFetchLimit(1)
|
||||
WithFetchLimit(2)
|
||||
|
||||
for i := 0; i < size; i++ {
|
||||
err := queue.SendDelayMsg(strconv.Itoa(i), 0, WithRetryCount(retryCount),WithMsgTTL(time.Hour))
|
||||
|
||||
Reference in New Issue
Block a user