add customizes msg ttl (#2)

add customizes msg ttl
This commit is contained in:
Yaya Liu
2022-09-22 10:40:45 +08:00
committed by GitHub
parent f93ae51a6d
commit 95a8a7139a
2 changed files with 12 additions and 2 deletions

View File

@@ -120,6 +120,14 @@ func WithRetryCount(count int) interface{} {
return retryCountOpt(count) return retryCountOpt(count)
} }
type msgTTLOpt time.Duration
// WithMsgTTL set ttl for a msg
// example: queue.SendDelayMsg(payload, duration, delayqueue.WithMsgTTL(Hour))
func WithMsgTTL(d time.Duration) interface{} {
return msgTTLOpt(d)
}
// SendScheduleMsg submits a message delivered at given time // SendScheduleMsg submits a message delivered at given time
func (q *DelayQueue) SendScheduleMsg(payload string, t time.Time, opts ...interface{}) error { func (q *DelayQueue) SendScheduleMsg(payload string, t time.Time, opts ...interface{}) error {
// parse options // parse options
@@ -128,6 +136,8 @@ func (q *DelayQueue) SendScheduleMsg(payload string, t time.Time, opts ...interf
switch o := opt.(type) { switch o := opt.(type) {
case retryCountOpt: case retryCountOpt:
retryCount = uint(o) retryCount = uint(o)
case msgTTLOpt:
q.msgTTL = time.Duration(o)
} }
} }
// generate id // generate id

View File

@@ -31,7 +31,7 @@ func TestDelayQueue_consume(t *testing.T) {
WithFetchLimit(1) WithFetchLimit(1)
for i := 0; i < size; i++ { for i := 0; i < size; i++ {
err := queue.SendDelayMsg(strconv.Itoa(i), 0, WithRetryCount(retryCount)) err := queue.SendDelayMsg(strconv.Itoa(i), 0, WithRetryCount(retryCount),WithMsgTTL(time.Hour))
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
@@ -79,7 +79,7 @@ func TestDelayQueue_ConcurrentConsume(t *testing.T) {
WithConcurrent(4) WithConcurrent(4)
for i := 0; i < size; i++ { for i := 0; i < size; i++ {
err := queue.SendDelayMsg(strconv.Itoa(i), 0, WithRetryCount(retryCount)) err := queue.SendDelayMsg(strconv.Itoa(i), 0, WithRetryCount(retryCount),WithMsgTTL(time.Hour))
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }