diff --git a/delayqueue_test.go b/delayqueue_test.go index 1a6c94d..221049d 100644 --- a/delayqueue_test.go +++ b/delayqueue_test.go @@ -167,22 +167,28 @@ func TestDelayQueue_StopConsume(t *testing.T) { } func TestDelayQueue_AsyncConsume(t *testing.T) { + size := 10 redisCli := redis.NewClient(&redis.Options{ Addr: "127.0.0.1:6379", }) redisCli.FlushDB(context.Background()) - queue := NewQueue("exampleAsync", redisCli, func(payload string) bool { - // callback returns true to confirm successful consumption. - // If callback returns false or not return within maxConsumeDuration, DelayQueue will re-deliver this message + var queue *DelayQueue + var received int + queue = NewQueue("exampleAsync", redisCli, func(payload string) bool { println(payload) + received++ + if received == size { + queue.StopConsume() + t.Log("send stop signal") + } return true }).WithDefaultRetryCount(1) // send schedule message go func() { for { - time.Sleep(time.Second * 1) - err := queue.SendScheduleMsg(time.Now().String(), time.Now().Add(time.Second*2)) + time.Sleep(time.Millisecond * 500) + err := queue.SendScheduleMsg(time.Now().String(), time.Now().Add(time.Second*1)) if err != nil { panic(err) }