mirror of
https://github.com/HDT3213/delayqueue.git
synced 2025-10-01 21:42:16 +08:00
use KEYS instead of ARGV in lua
This commit is contained in:
@@ -149,23 +149,24 @@ func (q *DelayQueue) SendDelayMsg(payload string, duration time.Duration, opts .
|
|||||||
}
|
}
|
||||||
|
|
||||||
// pending2ReadyScript atomically moves messages from pending to ready
|
// pending2ReadyScript atomically moves messages from pending to ready
|
||||||
// argv: currentTime, pendingKey, readyKey
|
// keys: pendingKey, readyKey
|
||||||
|
// argv: currentTime
|
||||||
const pending2ReadyScript = `
|
const pending2ReadyScript = `
|
||||||
local msgs = redis.call('ZRangeByScore', ARGV[2], '0', ARGV[1]) -- get ready msg
|
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1]) -- get ready msg
|
||||||
if (#msgs == 0) then return end
|
if (#msgs == 0) then return end
|
||||||
local args2 = {'LPush', ARGV[3]} -- push into ready
|
local args2 = {'LPush', KEYS[2]} -- push into ready
|
||||||
for _,v in ipairs(msgs) do
|
for _,v in ipairs(msgs) do
|
||||||
table.insert(args2, v)
|
table.insert(args2, v)
|
||||||
end
|
end
|
||||||
redis.call(unpack(args2))
|
redis.call(unpack(args2))
|
||||||
redis.call('ZRemRangeByScore', ARGV[2], '0', ARGV[1]) -- remove msgs from pending
|
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1]) -- remove msgs from pending
|
||||||
`
|
`
|
||||||
|
|
||||||
func (q *DelayQueue) pending2Ready() error {
|
func (q *DelayQueue) pending2Ready() error {
|
||||||
now := time.Now().Unix()
|
now := time.Now().Unix()
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
keys := []string{q.pendingKey, q.readyKey}
|
keys := []string{q.pendingKey, q.readyKey}
|
||||||
err := q.redisCli.Eval(ctx, pending2ReadyScript, keys, now, q.pendingKey, q.readyKey).Err()
|
err := q.redisCli.Eval(ctx, pending2ReadyScript, keys, now).Err()
|
||||||
if err != nil && err != redis.Nil {
|
if err != nil && err != redis.Nil {
|
||||||
return fmt.Errorf("pending2ReadyScript failed: %v", err)
|
return fmt.Errorf("pending2ReadyScript failed: %v", err)
|
||||||
}
|
}
|
||||||
@@ -173,11 +174,12 @@ func (q *DelayQueue) pending2Ready() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// pending2ReadyScript atomically moves messages from ready to unack
|
// pending2ReadyScript atomically moves messages from ready to unack
|
||||||
// argv: retryTime, readyKey/retryKey, unackKey
|
// keys: readyKey/retryKey, unackKey
|
||||||
|
// argv: retryTime
|
||||||
const ready2UnackScript = `
|
const ready2UnackScript = `
|
||||||
local msg = redis.call('RPop', ARGV[2])
|
local msg = redis.call('RPop', KEYS[1])
|
||||||
if (not msg) then return end
|
if (not msg) then return end
|
||||||
redis.call('ZAdd', ARGV[3], ARGV[1], msg)
|
redis.call('ZAdd', KEYS[2], ARGV[1], msg)
|
||||||
return msg
|
return msg
|
||||||
`
|
`
|
||||||
|
|
||||||
@@ -185,7 +187,7 @@ func (q *DelayQueue) ready2Unack() (string, error) {
|
|||||||
retryTime := time.Now().Add(q.maxConsumeDuration).Unix()
|
retryTime := time.Now().Add(q.maxConsumeDuration).Unix()
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
keys := []string{q.readyKey, q.unAckKey}
|
keys := []string{q.readyKey, q.unAckKey}
|
||||||
ret, err := q.redisCli.Eval(ctx, ready2UnackScript, keys, retryTime, q.readyKey, q.unAckKey).Result()
|
ret, err := q.redisCli.Eval(ctx, ready2UnackScript, keys, retryTime).Result()
|
||||||
if err == redis.Nil {
|
if err == redis.Nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
@@ -259,30 +261,30 @@ func (q *DelayQueue) nack(idStr string) error {
|
|||||||
// and moves messages from unack to garbage which retry count is 0
|
// and moves messages from unack to garbage which retry count is 0
|
||||||
// Because DelayQueue cannot determine garbage message before eval unack2RetryScript, so it cannot pass keys parameter to redisCli.Eval
|
// Because DelayQueue cannot determine garbage message before eval unack2RetryScript, so it cannot pass keys parameter to redisCli.Eval
|
||||||
// Therefore unack2RetryScript moves garbage message to garbageKey instead of deleting directly
|
// Therefore unack2RetryScript moves garbage message to garbageKey instead of deleting directly
|
||||||
// argv: currentTime, unackKey, retryCountKey, retryKey, garbageKey
|
// keys: unackKey, retryCountKey, retryKey, garbageKey
|
||||||
|
// argv: currentTime
|
||||||
const unack2RetryScript = `
|
const unack2RetryScript = `
|
||||||
local msgs = redis.call('ZRangeByScore', ARGV[2], '0', ARGV[1]) -- get retry msg
|
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1]) -- get retry msg
|
||||||
if (#msgs == 0) then return end
|
if (#msgs == 0) then return end
|
||||||
local retryCounts = redis.call('HMGet', ARGV[3], unpack(msgs)) -- get retry count
|
local retryCounts = redis.call('HMGet', KEYS[2], unpack(msgs)) -- get retry count
|
||||||
for i,v in ipairs(retryCounts) do
|
for i,v in ipairs(retryCounts) do
|
||||||
local k = msgs[i]
|
local k = msgs[i]
|
||||||
if tonumber(v) > 0 then
|
if tonumber(v) > 0 then
|
||||||
redis.call("HIncrBy", ARGV[3], k, -1) -- reduce retry count
|
redis.call("HIncrBy", KEYS[2], k, -1) -- reduce retry count
|
||||||
redis.call("LPush", ARGV[4], k) -- add to retry
|
redis.call("LPush", KEYS[3], k) -- add to retry
|
||||||
else
|
else
|
||||||
redis.call("HDel", ARGV[3], k) -- del retry count
|
redis.call("HDel", KEYS[2], k) -- del retry count
|
||||||
redis.call("SAdd", ARGV[5], k) -- add to garbage
|
redis.call("SAdd", KEYS[4], k) -- add to garbage
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
redis.call('ZRemRangeByScore', ARGV[2], '0', ARGV[1]) -- remove msgs from unack
|
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1]) -- remove msgs from unack
|
||||||
`
|
`
|
||||||
|
|
||||||
func (q *DelayQueue) unack2Retry() error {
|
func (q *DelayQueue) unack2Retry() error {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
keys := []string{q.unAckKey, q.retryKey, q.retryCountKey, q.garbageKey}
|
keys := []string{q.unAckKey, q.retryCountKey, q.retryKey, q.garbageKey}
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
err := q.redisCli.Eval(ctx, unack2RetryScript, keys,
|
err := q.redisCli.Eval(ctx, unack2RetryScript, keys, now.Unix()).Err()
|
||||||
now.Unix(), q.unAckKey, q.retryCountKey, q.retryKey, q.garbageKey).Err()
|
|
||||||
if err != nil && err != redis.Nil {
|
if err != nil && err != redis.Nil {
|
||||||
return fmt.Errorf("unack to retry script failed: %v", err)
|
return fmt.Errorf("unack to retry script failed: %v", err)
|
||||||
}
|
}
|
||||||
|
@@ -68,6 +68,7 @@ func TestDelayQueue_StopConsume(t *testing.T) {
|
|||||||
received++
|
received++
|
||||||
if received == size {
|
if received == size {
|
||||||
queue.StopConsume()
|
queue.StopConsume()
|
||||||
|
t.Log("send stop signal")
|
||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
}).WithDefaultRetryCount(1)
|
}).WithDefaultRetryCount(1)
|
||||||
|
Reference in New Issue
Block a user