From 95a8a7139a3662b45ec2c0e939ee25fd8a3d33b2 Mon Sep 17 00:00:00 2001 From: Yaya Liu Date: Thu, 22 Sep 2022 10:40:45 +0800 Subject: [PATCH] add customizes msg ttl (#2) add customizes msg ttl --- delayqueue.go | 10 ++++++++++ delayqueue_test.go | 4 ++-- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/delayqueue.go b/delayqueue.go index b62c932..ab57a40 100644 --- a/delayqueue.go +++ b/delayqueue.go @@ -120,6 +120,14 @@ func WithRetryCount(count int) interface{} { return retryCountOpt(count) } +type msgTTLOpt time.Duration + +// WithMsgTTL set ttl for a msg +// example: queue.SendDelayMsg(payload, duration, delayqueue.WithMsgTTL(Hour)) +func WithMsgTTL(d time.Duration) interface{} { + return msgTTLOpt(d) +} + // SendScheduleMsg submits a message delivered at given time func (q *DelayQueue) SendScheduleMsg(payload string, t time.Time, opts ...interface{}) error { // parse options @@ -128,6 +136,8 @@ func (q *DelayQueue) SendScheduleMsg(payload string, t time.Time, opts ...interf switch o := opt.(type) { case retryCountOpt: retryCount = uint(o) + case msgTTLOpt: + q.msgTTL = time.Duration(o) } } // generate id diff --git a/delayqueue_test.go b/delayqueue_test.go index 593916c..624d5a1 100644 --- a/delayqueue_test.go +++ b/delayqueue_test.go @@ -31,7 +31,7 @@ func TestDelayQueue_consume(t *testing.T) { WithFetchLimit(1) for i := 0; i < size; i++ { - err := queue.SendDelayMsg(strconv.Itoa(i), 0, WithRetryCount(retryCount)) + err := queue.SendDelayMsg(strconv.Itoa(i), 0, WithRetryCount(retryCount),WithMsgTTL(time.Hour)) if err != nil { t.Error(err) } @@ -79,7 +79,7 @@ func TestDelayQueue_ConcurrentConsume(t *testing.T) { WithConcurrent(4) for i := 0; i < size; i++ { - err := queue.SendDelayMsg(strconv.Itoa(i), 0, WithRetryCount(retryCount)) + err := queue.SendDelayMsg(strconv.Itoa(i), 0, WithRetryCount(retryCount),WithMsgTTL(time.Hour)) if err != nil { t.Error(err) }