diff --git a/README.md b/README.md index 59d649c..118cab2 100644 --- a/README.md +++ b/README.md @@ -97,6 +97,8 @@ func producer() { ## Options +### Consume Function + ```go func (q *DelayQueue)WithCallback(callback CallbackFunc) *DelayQueue ``` @@ -113,12 +115,21 @@ queue.WithCallback(func(payload string) bool { }) ``` +### Logger + ```go -func (q *DelayQueue)WithLogger(logger *log.Logger) *DelayQueue +func (q *DelayQueue)WithLogger(logger Logger) *DelayQueue ``` -WithLogger customizes logger for queue +WithLogger customizes logger for queue. Logger should implemented the following interface: +```go +type Logger interface { + Printf(format string, v ...interface{}) +} +``` + +### Concurrent ```go func (q *DelayQueue)WithConcurrent(c uint) *DelayQueue @@ -126,12 +137,16 @@ func (q *DelayQueue)WithConcurrent(c uint) *DelayQueue WithConcurrent sets the number of concurrent consumers +### Polling Interval + ```go func (q *DelayQueue)WithFetchInterval(d time.Duration) *DelayQueue ``` WithFetchInterval customizes the interval at which consumer fetch message from redis +### Timeout + ```go func (q *DelayQueue)WithMaxConsumeDuration(d time.Duration) *DelayQueue ``` @@ -141,12 +156,16 @@ WithMaxConsumeDuration customizes max consume duration If no acknowledge received within WithMaxConsumeDuration after message delivery, DelayQueue will try to deliver this message again +### Max Processing Limit + ```go func (q *DelayQueue)WithFetchLimit(limit uint) *DelayQueue ``` WithFetchLimit limits the max number of unack (processing) messages +### Hash Tag + ```go UseHashTagKey() ``` @@ -159,6 +178,8 @@ WARNING! CHANGING(add or remove) this option will cause DelayQueue failing to re > see more: https://redis.io/docs/reference/cluster-spec/#hash-tags +### Default Retry Count + ```go WithDefaultRetryCount(count uint) *DelayQueue ``` @@ -167,6 +188,12 @@ WithDefaultRetryCount customizes the max number of retry, it effects of messages use WithRetryCount during DelayQueue.SendScheduleMsg or DelayQueue.SendDelayMsg to specific retry count of particular message +```go +queue.SendDelayMsg(msg, time.Hour, delayqueue.WithRetryCount(3)) +``` + +### Script Preload + ```go (q *DelayQueue) WithScriptPreload(flag bool) *DelayQueue ``` diff --git a/README_CN.md b/README_CN.md index dcaf255..bf32638 100644 --- a/README_CN.md +++ b/README_CN.md @@ -92,6 +92,8 @@ func producer() { ## 选项 +### 回调函数 + ```go func (q *DelayQueue)WithCallback(callback CallbackFunc) *DelayQueue ``` @@ -107,12 +109,21 @@ queue.WithCallback(func(payload string) bool { }) ``` +### 日志 + ```go func (q *DelayQueue)WithLogger(logger *log.Logger) *DelayQueue ``` -为 DelayQueue 设置 logger +为 DelayQueue 设置 logger, logger 需要实现下面的接口: +```go +type Logger interface { + Printf(format string, v ...interface{}) +} +``` + +### 并发数 ```go func (q *DelayQueue)WithConcurrent(c uint) *DelayQueue @@ -120,23 +131,31 @@ func (q *DelayQueue)WithConcurrent(c uint) *DelayQueue 设置消费者并发数 +### 轮询间隔 + ```go func (q *DelayQueue)WithFetchInterval(d time.Duration) *DelayQueue ``` 设置消费者从 Redis 拉取消息的时间间隔 +### 消费超时 + ```go func (q *DelayQueue)WithMaxConsumeDuration(d time.Duration) *DelayQueue ``` 设置最长消费时间。若拉取消息后超出 MaxConsumeDuration 时限仍未返回 ACK 则认为消费失败,DelayQueue 会重新投递此消息。 +### 最大处理中消息数 + ```go func (q *DelayQueue)WithFetchLimit(limit uint) *DelayQueue ``` -FetchLimit 限制消费者从 Redis 中拉取的消息数目,即单个消费者正在处理中的消息数不会超过 FetchLimit +单个消费者正在处理中的消息数不会超过 FetchLimit + +### 启用 HashTag ```go UseHashTagKey() @@ -150,6 +169,8 @@ UseHashTagKey() 会在 Redis Key 上添加 hash tag 确保同一个队列的所 see more: https://redis.io/docs/reference/cluster-spec/#hash-tags +### 设置默认重试次数 + ```go WithDefaultRetryCount(count uint) ``` @@ -158,6 +179,12 @@ WithDefaultRetryCount(count uint) 在调用 DelayQueue.SendScheduleMsg or DelayQueue.SendDelayMsg 发送消息时,可以调用 WithRetryCount 为这条消息单独指定重试次数。 +```go +queue.SendDelayMsg(msg, time.Hour, delayqueue.WithRetryCount(3)) +``` + +### 预加载脚本 + ```go (q *DelayQueue) WithScriptPreload(flag bool) *DelayQueue ``` diff --git a/delayqueue.go b/delayqueue.go index f35e716..de25d90 100644 --- a/delayqueue.go +++ b/delayqueue.go @@ -27,7 +27,7 @@ type DelayQueue struct { garbageKey string // set: message id useHashTag bool ticker *time.Ticker - logger *log.Logger + logger Logger close chan struct{} running int32 maxConsumeDuration time.Duration // default 5 seconds @@ -82,6 +82,11 @@ type RedisCli interface { EvalSha(sha1 string, keys []string, args []interface{}) (interface{}, error) } +// Logger is an abstraction of logging system +type Logger interface { + Printf(format string, v ...interface{}) +} + type hashTagKeyOpt int // CallbackFunc receives and consumes messages @@ -153,7 +158,7 @@ func (q *DelayQueue) WithCallback(callback CallbackFunc) *DelayQueue { } // WithLogger customizes logger for queue -func (q *DelayQueue) WithLogger(logger *log.Logger) *DelayQueue { +func (q *DelayQueue) WithLogger(logger Logger) *DelayQueue { q.logger = logger return q }