mirror of
https://github.com/HDT3213/delayqueue.git
synced 2025-11-03 09:40:49 +08:00
@@ -30,8 +30,7 @@ func TestDelayQueue_consume(t *testing.T) {
|
||||
WithFetchInterval(time.Millisecond * 50).
|
||||
WithMaxConsumeDuration(0).
|
||||
WithLogger(log.New(os.Stderr, "[DelayQueue]", log.LstdFlags)).
|
||||
WithFetchLimit(2).
|
||||
WithNackRedeliveryDelay(time.Second)
|
||||
WithFetchLimit(2)
|
||||
|
||||
for i := 0; i < size; i++ {
|
||||
err := queue.SendDelayMsg(strconv.Itoa(i), 0, WithRetryCount(retryCount), WithMsgTTL(time.Hour))
|
||||
@@ -280,7 +279,6 @@ func TestDelayQueue_Massive_Backlog(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// consume should stopped after actual fetch count hits fetch limit
|
||||
func TestDelayQueue_FetchLimit(t *testing.T) {
|
||||
redisCli := redis.NewClient(&redis.Options{
|
||||
@@ -325,7 +323,7 @@ func TestDelayQueue_FetchLimit(t *testing.T) {
|
||||
if len(ids2) > 0 {
|
||||
t.Error("should get 0 message, after hitting fetch limit")
|
||||
}
|
||||
|
||||
|
||||
// consume
|
||||
for _, id := range ids1 {
|
||||
queue.callback(id)
|
||||
@@ -391,4 +389,59 @@ func TestDelayQueue_ScriptPreload(t *testing.T) {
|
||||
t.Errorf("expect 1 delivery, actual %d. key: %s", v, k)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestDelayQueue_NackRedeliveryDelay(t *testing.T) {
|
||||
redisCli := redis.NewClient(&redis.Options{
|
||||
Addr: "127.0.0.1:6379",
|
||||
})
|
||||
redisCli.FlushDB(context.Background())
|
||||
cb := func(s string) bool {
|
||||
return false
|
||||
}
|
||||
redeliveryDelay := time.Second
|
||||
queue := NewQueue("test", redisCli, UseHashTagKey()).
|
||||
WithCallback(cb).
|
||||
WithFetchInterval(time.Millisecond * 50).
|
||||
WithLogger(log.New(os.Stderr, "[DelayQueue]", log.LstdFlags)).
|
||||
WithDefaultRetryCount(3).
|
||||
WithNackRedeliveryDelay(redeliveryDelay)
|
||||
|
||||
err := queue.SendScheduleMsg("foobar", time.Now().Add(-time.Minute))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
// first consume, callback will failed
|
||||
ids, err := queue.beforeConsume()
|
||||
if err != nil {
|
||||
t.Errorf("consume error: %v", err)
|
||||
return
|
||||
}
|
||||
for _, id := range ids {
|
||||
queue.callback(id)
|
||||
}
|
||||
queue.afterConsume()
|
||||
|
||||
// retry immediately
|
||||
ids, err = queue.beforeConsume()
|
||||
if err != nil {
|
||||
t.Errorf("consume error: %v", err)
|
||||
return
|
||||
}
|
||||
if len(ids) != 0 {
|
||||
t.Errorf("should not redeliver immediately")
|
||||
return
|
||||
}
|
||||
|
||||
time.Sleep(redeliveryDelay)
|
||||
queue.afterConsume()
|
||||
ids, err = queue.beforeConsume()
|
||||
if err != nil {
|
||||
t.Errorf("consume error: %v", err)
|
||||
return
|
||||
}
|
||||
if len(ids) != 1 {
|
||||
t.Errorf("should not redeliver immediately")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user