From 2880c3239bec89a450650bec6daa19bf6e1ffd43 Mon Sep 17 00:00:00 2001 From: iTuring <1076862106@qq.com> Date: Fri, 22 Mar 2024 16:06:49 +0800 Subject: [PATCH] =?UTF-8?q?Fix:=E5=BC=82=E6=AD=A5=E6=A8=A1=E5=BC=8F?= =?UTF-8?q?=E4=B8=8B=E6=B6=88=E8=B4=B9=E8=80=85=E6=97=A0=E6=B3=95=E6=B6=88?= =?UTF-8?q?=E8=B4=B9=E7=9A=84bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- delayqueue.go | 2 +- delayqueue_test.go | 30 +++++++++++++++++++++++++++++- 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/delayqueue.go b/delayqueue.go index 6d7f560..59aecb1 100644 --- a/delayqueue.go +++ b/delayqueue.go @@ -255,7 +255,7 @@ for _,v in ipairs(msgs) do args2 = {} end end -if (#args2 > 2) then +if (#args2 > 0) then redis.call('LPush', KEYS[2], unpack(args2)) end redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1]) -- remove msgs from pending diff --git a/delayqueue_test.go b/delayqueue_test.go index 131c88e..1a6c94d 100644 --- a/delayqueue_test.go +++ b/delayqueue_test.go @@ -2,13 +2,14 @@ package delayqueue import ( "context" - "github.com/redis/go-redis/v9" "log" "os" "strconv" "sync" "testing" "time" + + "github.com/redis/go-redis/v9" ) func TestDelayQueue_consume(t *testing.T) { @@ -165,6 +166,33 @@ func TestDelayQueue_StopConsume(t *testing.T) { <-done } +func TestDelayQueue_AsyncConsume(t *testing.T) { + redisCli := redis.NewClient(&redis.Options{ + Addr: "127.0.0.1:6379", + }) + redisCli.FlushDB(context.Background()) + queue := NewQueue("exampleAsync", redisCli, func(payload string) bool { + // callback returns true to confirm successful consumption. + // If callback returns false or not return within maxConsumeDuration, DelayQueue will re-deliver this message + println(payload) + return true + }).WithDefaultRetryCount(1) + + // send schedule message + go func() { + for { + time.Sleep(time.Second * 1) + err := queue.SendScheduleMsg(time.Now().String(), time.Now().Add(time.Second*2)) + if err != nil { + panic(err) + } + } + }() + // start consume + done := queue.StartConsume() + <-done +} + func TestDelayQueue_Massive_Backlog(t *testing.T) { redisCli := redis.NewClient(&redis.Options{ Addr: "127.0.0.1:6379",