mirror of
https://github.com/HDT3213/delayqueue.git
synced 2025-09-27 19:42:08 +08:00
tolerate legacy incorrect data
This commit is contained in:
@@ -200,7 +200,7 @@ func (q *DelayQueue) pending2Ready() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// pending2ReadyScript atomically moves messages from ready to unack
|
// ready2UnackScript atomically moves messages from ready to unack
|
||||||
// keys: readyKey/retryKey, unackKey
|
// keys: readyKey/retryKey, unackKey
|
||||||
// argv: retryTime
|
// argv: retryTime
|
||||||
const ready2UnackScript = `
|
const ready2UnackScript = `
|
||||||
@@ -338,7 +338,7 @@ local unack2retry = function(msgs)
|
|||||||
local retryCounts = redis.call('HMGet', KEYS[2], unpack(msgs)) -- get retry count
|
local retryCounts = redis.call('HMGet', KEYS[2], unpack(msgs)) -- get retry count
|
||||||
for i,v in ipairs(retryCounts) do
|
for i,v in ipairs(retryCounts) do
|
||||||
local k = msgs[i]
|
local k = msgs[i]
|
||||||
if tonumber(v) > 0 then
|
if v ~= nil and v ~= '' and tonumber(v) > 0 then
|
||||||
redis.call("HIncrBy", KEYS[2], k, -1) -- reduce retry count
|
redis.call("HIncrBy", KEYS[2], k, -1) -- reduce retry count
|
||||||
redis.call("LPush", KEYS[3], k) -- add to retry
|
redis.call("LPush", KEYS[3], k) -- add to retry
|
||||||
else
|
else
|
||||||
|
@@ -16,7 +16,7 @@ func TestDelayQueue_consume(t *testing.T) {
|
|||||||
Addr: "127.0.0.1:6379",
|
Addr: "127.0.0.1:6379",
|
||||||
})
|
})
|
||||||
redisCli.FlushDB(context.Background())
|
redisCli.FlushDB(context.Background())
|
||||||
size := 10000
|
size := 1000
|
||||||
retryCount := 3
|
retryCount := 3
|
||||||
deliveryCount := make(map[string]int)
|
deliveryCount := make(map[string]int)
|
||||||
cb := func(s string) bool {
|
cb := func(s string) bool {
|
||||||
@@ -31,7 +31,7 @@ func TestDelayQueue_consume(t *testing.T) {
|
|||||||
WithFetchLimit(2)
|
WithFetchLimit(2)
|
||||||
|
|
||||||
for i := 0; i < size; i++ {
|
for i := 0; i < size; i++ {
|
||||||
err := queue.SendDelayMsg(strconv.Itoa(i), 0, WithRetryCount(retryCount),WithMsgTTL(time.Hour))
|
err := queue.SendDelayMsg(strconv.Itoa(i), 0, WithRetryCount(retryCount), WithMsgTTL(time.Hour))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
@@ -79,7 +79,7 @@ func TestDelayQueue_ConcurrentConsume(t *testing.T) {
|
|||||||
WithConcurrent(4)
|
WithConcurrent(4)
|
||||||
|
|
||||||
for i := 0; i < size; i++ {
|
for i := 0; i < size; i++ {
|
||||||
err := queue.SendDelayMsg(strconv.Itoa(i), 0, WithRetryCount(retryCount),WithMsgTTL(time.Hour))
|
err := queue.SendDelayMsg(strconv.Itoa(i), 0, WithRetryCount(retryCount), WithMsgTTL(time.Hour))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
@@ -123,3 +123,71 @@ func TestDelayQueue_StopConsume(t *testing.T) {
|
|||||||
done := queue.StartConsume()
|
done := queue.StartConsume()
|
||||||
<-done
|
<-done
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestDelayQueue_Massive_Consume(t *testing.T) {
|
||||||
|
redisCli := redis.NewClient(&redis.Options{
|
||||||
|
Addr: "127.0.0.1:6379",
|
||||||
|
})
|
||||||
|
redisCli.FlushDB(context.Background())
|
||||||
|
size := 20000
|
||||||
|
retryCount := 3
|
||||||
|
cb := func(s string) bool {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
q := NewQueue("test", redisCli, cb).
|
||||||
|
WithFetchInterval(time.Millisecond * 50).
|
||||||
|
WithMaxConsumeDuration(0).
|
||||||
|
WithLogger(log.New(os.Stderr, "[DelayQueue]", log.LstdFlags)).
|
||||||
|
WithFetchLimit(0)
|
||||||
|
|
||||||
|
for i := 0; i < size; i++ {
|
||||||
|
err := q.SendDelayMsg(strconv.Itoa(i), 0, WithRetryCount(retryCount))
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
err := q.pending2Ready()
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// consume
|
||||||
|
ids := make([]string, 0, q.fetchLimit)
|
||||||
|
for {
|
||||||
|
idStr, err := q.ready2Unack()
|
||||||
|
if err == redis.Nil { // consumed all
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
ids = append(ids, idStr)
|
||||||
|
if q.fetchLimit > 0 && len(ids) >= int(q.fetchLimit) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
err = q.unack2Retry()
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
unackCard, err := redisCli.ZCard(context.Background(), q.unAckKey).Result()
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if unackCard != 0 {
|
||||||
|
t.Error("unack card should be 0")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
retryLen, err := redisCli.LLen(context.Background(), q.retryKey).Result()
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if int(retryLen) != size {
|
||||||
|
t.Errorf("unack card should be %d", size)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Reference in New Issue
Block a user