diff --git a/delayqueue.go b/delayqueue.go
index fc86238..5ab9e4d 100644
--- a/delayqueue.go
+++ b/delayqueue.go
@@ -200,7 +200,7 @@ func (q *DelayQueue) pending2Ready() error {
 	return nil
 }
 
-// pending2ReadyScript atomically moves messages from ready to unack
+// ready2UnackScript atomically moves messages from ready to unack
 // keys: readyKey/retryKey, unackKey
 // argv: retryTime
 const ready2UnackScript = `
@@ -338,7 +338,7 @@ local unack2retry = function(msgs)
 	local retryCounts = redis.call('HMGet', KEYS[2], unpack(msgs)) -- get retry count
 	for i,v in ipairs(retryCounts) do
 		local k = msgs[i]
-		if tonumber(v) > 0 then
+		if v ~= false and v ~= nil and v ~= '' and tonumber(v) > 0 then
 			redis.call("HIncrBy", KEYS[2], k, -1) -- reduce retry count
 			redis.call("LPush", KEYS[3], k) -- add to retry
 		else
diff --git a/delayqueue_test.go b/delayqueue_test.go
index 4036a23..9d86f08 100644
--- a/delayqueue_test.go
+++ b/delayqueue_test.go
@@ -16,7 +16,7 @@ func TestDelayQueue_consume(t *testing.T) {
 		Addr: "127.0.0.1:6379",
 	})
 	redisCli.FlushDB(context.Background())
-	size := 10000
+	size := 1000
 	retryCount := 3
 	deliveryCount := make(map[string]int)
 	cb := func(s string) bool {
@@ -31,7 +31,7 @@ func TestDelayQueue_consume(t *testing.T) {
 		WithFetchLimit(2)
 
 	for i := 0; i < size; i++ {
-		err := queue.SendDelayMsg(strconv.Itoa(i), 0, WithRetryCount(retryCount),WithMsgTTL(time.Hour))
+		err := queue.SendDelayMsg(strconv.Itoa(i), 0, WithRetryCount(retryCount), WithMsgTTL(time.Hour))
 		if err != nil {
 			t.Error(err)
 		}
@@ -79,7 +79,7 @@ func TestDelayQueue_ConcurrentConsume(t *testing.T) {
 		WithConcurrent(4)
 
 	for i := 0; i < size; i++ {
-		err := queue.SendDelayMsg(strconv.Itoa(i), 0, WithRetryCount(retryCount),WithMsgTTL(time.Hour))
+		err := queue.SendDelayMsg(strconv.Itoa(i), 0, WithRetryCount(retryCount), WithMsgTTL(time.Hour))
 		if err != nil {
 			t.Error(err)
 		}
@@ -123,3 +123,71 @@ func TestDelayQueue_StopConsume(t *testing.T) {
 	done := queue.StartConsume()
 	<-done
 }
+
+func TestDelayQueue_Massive_Consume(t *testing.T) {
+	redisCli := redis.NewClient(&redis.Options{
+		Addr: "127.0.0.1:6379",
+	})
+	redisCli.FlushDB(context.Background())
+	size := 20000
+	retryCount := 3
+	cb := func(s string) bool {
+		return false // always fail so every message should end up in retry
+	}
+	q := NewQueue("test", redisCli, cb).
+		WithFetchInterval(time.Millisecond * 50).
+		WithMaxConsumeDuration(0).
+		WithLogger(log.New(os.Stderr, "[DelayQueue]", log.LstdFlags)).
+		WithFetchLimit(0) // 0 means no fetch limit
+
+	for i := 0; i < size; i++ {
+		err := q.SendDelayMsg(strconv.Itoa(i), 0, WithRetryCount(retryCount))
+		if err != nil {
+			t.Error(err)
+		}
+	}
+	err := q.pending2Ready()
+	if err != nil {
+		t.Error(err)
+		return
+	}
+	// consume: move every ready message into the unack set
+	ids := make([]string, 0, q.fetchLimit)
+	for {
+		idStr, err := q.ready2Unack()
+		if err == redis.Nil { // consumed all
+			break
+		}
+		if err != nil {
+			t.Error(err)
+			return
+		}
+		ids = append(ids, idStr)
+		if q.fetchLimit > 0 && len(ids) >= int(q.fetchLimit) {
+			break
+		}
+	}
+	err = q.unack2Retry()
+	if err != nil {
+		t.Error(err)
+		return
+	}
+	unackCard, err := redisCli.ZCard(context.Background(), q.unAckKey).Result()
+	if err != nil {
+		t.Error(err)
+		return
+	}
+	if unackCard != 0 {
+		t.Errorf("unack set cardinality should be 0, got %d", unackCard)
+		return
+	}
+	retryLen, err := redisCli.LLen(context.Background(), q.retryKey).Result()
+	if err != nil {
+		t.Error(err)
+		return
+	}
+	if int(retryLen) != size {
+		t.Errorf("retry queue length should be %d, got %d", size, retryLen)
+		return
+	}
+}
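
The guarded branch in unack2RetryScript can be exercised on its own: in Redis' Lua environment a missing hash field comes back from HMGet as false rather than nil, so the check has to reject false (and empty or non-numeric values) before comparing tonumber(v) > 0. The following is a minimal sketch, not project code; it assumes a Redis instance at 127.0.0.1:6379 and the go-redis v8 client used by the tests, and the script name retryCheckScript, the hash key example:retry:count, and the ids msg-1/msg-2 are made up for the example.

package main

import (
	"context"
	"fmt"

	"github.com/go-redis/redis/v8"
)

// retryCheckScript mirrors the guarded branch of unack2retry: for every id in ARGV
// it reads the retry count from the hash in KEYS[1] and reports "retry" when a
// positive count remains, otherwise "garbage". Missing fields arrive as false.
const retryCheckScript = `
local result = {}
local counts = redis.call('HMGet', KEYS[1], unpack(ARGV))
for i, v in ipairs(counts) do
	if v ~= false and v ~= nil and v ~= '' and tonumber(v) > 0 then
		result[i] = 'retry'
	else
		result[i] = 'garbage'
	end
end
return result
`

func main() {
	ctx := context.Background()
	cli := redis.NewClient(&redis.Options{Addr: "127.0.0.1:6379"})
	// msg-1 has a retry count left, msg-2 has no field at all (HMGet yields false)
	cli.HSet(ctx, "example:retry:count", "msg-1", 3)
	res, err := cli.Eval(ctx, retryCheckScript, []string{"example:retry:count"}, "msg-1", "msg-2").Result()
	if err != nil {
		panic(err)
	}
	fmt.Println(res) // expected: [retry garbage]
}

Run against an otherwise empty hash, this should print [retry garbage], matching the retry/garbage split that unack2retry performs via LPush to KEYS[3] and KEYS[4].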