diff --git a/delayqueue.go b/delayqueue.go index 1b6d81d..dc35dbb 100644 --- a/delayqueue.go +++ b/delayqueue.go @@ -764,13 +764,19 @@ func (q *DelayQueue) StartConsume() (done <-chan struct{}) { case <-q.ticker.C: ids, err := q.beforeConsume() if err != nil { - q.logger.Printf("consume error: %v", err) + q.logger.Printf("before consume error: %v", err) } q.goWithRecover(func() { for _, id := range ids { q.consumeBuffer <- id } }) + // Always do unack2Retry and garbageCollect even there is no new messages + // https://github.com/HDT3213/delayqueue/issues/21 + err = q.afterConsume() + if err != nil { + q.logger.Printf("after consume error: %v", err) + } case <-q.close: break tickerLoop }