mirror of
https://github.com/HDT3213/delayqueue.git
synced 2025-09-27 03:26:05 +08:00
Fix:异步模式下消费者无法消费的bug
This commit is contained in:
@@ -255,7 +255,7 @@ for _,v in ipairs(msgs) do
|
||||
args2 = {}
|
||||
end
|
||||
end
|
||||
if (#args2 > 2) then
|
||||
if (#args2 > 0) then
|
||||
redis.call('LPush', KEYS[2], unpack(args2))
|
||||
end
|
||||
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1]) -- remove msgs from pending
|
||||
|
@@ -2,13 +2,14 @@ package delayqueue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/redis/go-redis/v9"
|
||||
"log"
|
||||
"os"
|
||||
"strconv"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
func TestDelayQueue_consume(t *testing.T) {
|
||||
@@ -165,6 +166,33 @@ func TestDelayQueue_StopConsume(t *testing.T) {
|
||||
<-done
|
||||
}
|
||||
|
||||
func TestDelayQueue_AsyncConsume(t *testing.T) {
|
||||
redisCli := redis.NewClient(&redis.Options{
|
||||
Addr: "127.0.0.1:6379",
|
||||
})
|
||||
redisCli.FlushDB(context.Background())
|
||||
queue := NewQueue("exampleAsync", redisCli, func(payload string) bool {
|
||||
// callback returns true to confirm successful consumption.
|
||||
// If callback returns false or not return within maxConsumeDuration, DelayQueue will re-deliver this message
|
||||
println(payload)
|
||||
return true
|
||||
}).WithDefaultRetryCount(1)
|
||||
|
||||
// send schedule message
|
||||
go func() {
|
||||
for {
|
||||
time.Sleep(time.Second * 1)
|
||||
err := queue.SendScheduleMsg(time.Now().String(), time.Now().Add(time.Second*2))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
// start consume
|
||||
done := queue.StartConsume()
|
||||
<-done
|
||||
}
|
||||
|
||||
func TestDelayQueue_Massive_Backlog(t *testing.T) {
|
||||
redisCli := redis.NewClient(&redis.Options{
|
||||
Addr: "127.0.0.1:6379",
|
||||
|
Reference in New Issue
Block a user