From b4e9e62d253e7d15c5d67c0b1280d73ccb65c866 Mon Sep 17 00:00:00 2001 From: iTuring <1076862106@qq.com> Date: Tue, 26 Mar 2024 15:02:02 +0800 Subject: [PATCH] =?UTF-8?q?Fix:=E6=B5=8B=E8=AF=95=E7=94=A8=E4=BE=8B?= =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E7=BB=93=E6=9D=9F=E5=88=A4=E6=96=AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- delayqueue_test.go | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/delayqueue_test.go b/delayqueue_test.go index 1a6c94d..221049d 100644 --- a/delayqueue_test.go +++ b/delayqueue_test.go @@ -167,22 +167,28 @@ func TestDelayQueue_StopConsume(t *testing.T) { } func TestDelayQueue_AsyncConsume(t *testing.T) { + size := 10 redisCli := redis.NewClient(&redis.Options{ Addr: "127.0.0.1:6379", }) redisCli.FlushDB(context.Background()) - queue := NewQueue("exampleAsync", redisCli, func(payload string) bool { - // callback returns true to confirm successful consumption. - // If callback returns false or not return within maxConsumeDuration, DelayQueue will re-deliver this message + var queue *DelayQueue + var received int + queue = NewQueue("exampleAsync", redisCli, func(payload string) bool { println(payload) + received++ + if received == size { + queue.StopConsume() + t.Log("send stop signal") + } return true }).WithDefaultRetryCount(1) // send schedule message go func() { for { - time.Sleep(time.Second * 1) - err := queue.SendScheduleMsg(time.Now().String(), time.Now().Add(time.Second*2)) + time.Sleep(time.Millisecond * 500) + err := queue.SendScheduleMsg(time.Now().String(), time.Now().Add(time.Second*1)) if err != nil { panic(err) }