diff --git a/README.md b/README.md index be5d694..68b9355 100644 --- a/README.md +++ b/README.md @@ -70,13 +70,16 @@ func main() { ``` > if you are using github.com/go-redis/redis/v8 please use `go get github.com/hdt3213/delayqueue@v8` + > If you are using redis client other than go-redis, you could wrap your redis client into [RedisCli](https://pkg.go.dev/github.com/hdt3213/delayqueue#RedisCli) interface +> If you don't want to set the callback during initialization, you can use func `WithCallback`. + ## Producer consumer distributed deployment By default, delayqueue instances can be both producers and consumers. -If your program only need producers and consumers are placed elsewhere, `delayqueue.NewProducer` is a good option for you. +If your program only need producers and consumers are placed elsewhere, `delayqueue.NewPublisher` is a good option for you. ```go func consumer() { @@ -93,26 +96,42 @@ func producer() { ## Options ```go -WithLogger(logger *log.Logger) +func (q *DelayQueue)WithCallback(callback CallbackFunc) *DelayQueue +``` + +WithCallback set callback for queue to receives and consumes messages +callback returns true to confirm successfully consumed, false to re-deliver this message. + +If there is no callback set, StartConsume will panic + +```go +queue := NewQueue("test", redisCli) +queue.WithCallback(func(payload string) bool { + return true +}) +``` + +```go +func (q *DelayQueue)WithLogger(logger *log.Logger) *DelayQueue ``` WithLogger customizes logger for queue ```go -WithConcurrent(c uint) +func (q *DelayQueue)WithConcurrent(c uint) *DelayQueue ``` WithConcurrent sets the number of concurrent consumers ```go -WithFetchInterval(d time.Duration) +func (q *DelayQueue)WithFetchInterval(d time.Duration) *DelayQueue ``` WithFetchInterval customizes the interval at which consumer fetch message from redis ```go -WithMaxConsumeDuration(d time.Duration) +func (q *DelayQueue)WithMaxConsumeDuration(d time.Duration) *DelayQueue ``` WithMaxConsumeDuration customizes max consume duration @@ -121,7 +140,7 @@ If no acknowledge received within WithMaxConsumeDuration after message delivery, message again ```go -WithFetchLimit(limit uint) +func (q *DelayQueue)WithFetchLimit(limit uint) *DelayQueue ``` WithFetchLimit limits the max number of unack (processing) messages @@ -139,7 +158,7 @@ WARNING! CHANGING(add or remove) this option will cause DelayQueue failing to re > see more: https://redis.io/docs/reference/cluster-spec/#hash-tags ```go -WithDefaultRetryCount(count uint) +WithDefaultRetryCount(count uint) *DelayQueue ``` WithDefaultRetryCount customizes the max number of retry, it effects of messages in this queue diff --git a/README_CN.md b/README_CN.md index ab61386..a133df0 100644 --- a/README_CN.md +++ b/README_CN.md @@ -67,11 +67,14 @@ func main() { ``` > 如果您仍在使用 redis/v8 请使用 v8 分支: `go get github.com/hdt3213/delayqueue@v8` + > 如果您在使用其他的 redis 客户端, 可以将其包装到 [RedisCli](https://pkg.go.dev/github.com/hdt3213/delayqueue#RedisCli) 接口中 +> 如果您不想在初始化时设置callback, 您可以使用 WithCallback 函数 + ## 分开部署生产者和消费者 -默认情况下 delayqueue 实例既可以做生产者也可以做消费者。如果某些程序只需要发送消息,消费者部署在其它程序中,那么可以使用 `delayqueue.NewProducer`. +默认情况下 delayqueue 实例既可以做生产者也可以做消费者。如果某些程序只需要发送消息,消费者部署在其它程序中,那么可以使用 `delayqueue.NewPublisher`. ```go func consumer() { @@ -88,32 +91,47 @@ func producer() { ## 选项 ```go -WithLogger(logger *log.Logger) +func (q *DelayQueue)WithCallback(callback CallbackFunc) *DelayQueue +``` + +callback 函数负责接收并消费消息。callback 返回 true 确认已成功消费,返回 false 表示处理失败,需要重试。 + +如果没有设置 callback, 调用 StartConsume 时会 panic。 + +```go +queue := NewQueue("test", redisCli) +queue.WithCallback(func(payload string) bool { + return true +}) +``` + +```go +func (q *DelayQueue)WithLogger(logger *log.Logger) *DelayQueue ``` 为 DelayQueue 设置 logger ```go -WithConcurrent(c uint) +func (q *DelayQueue)WithConcurrent(c uint) *DelayQueue ``` 设置消费者并发数 ```go -WithFetchInterval(d time.Duration) +func (q *DelayQueue)WithFetchInterval(d time.Duration) *DelayQueue ``` 设置消费者从 Redis 拉取消息的时间间隔 ```go -WithMaxConsumeDuration(d time.Duration) +func (q *DelayQueue)WithMaxConsumeDuration(d time.Duration) *DelayQueue ``` 设置最长消费时间。若拉取消息后超出 MaxConsumeDuration 时限仍未返回 ACK 则认为消费失败,DelayQueue 会重新投递此消息。 ```go -WithFetchLimit(limit uint) +func (q *DelayQueue)WithFetchLimit(limit uint) *DelayQueue ``` FetchLimit 限制消费者从 Redis 中拉取的消息数目,即单个消费者正在处理中的消息数不会超过 FetchLimit diff --git a/delayqueue.go b/delayqueue.go index 94deac2..6d7f560 100644 --- a/delayqueue.go +++ b/delayqueue.go @@ -69,7 +69,10 @@ type RedisCli interface { } type hashTagKeyOpt int -type noCallbackOpt int + +// CallbackFunc receives and consumes messages +// returns true to confirm successfully consumed, false to re-deliver this message +type CallbackFunc = func(string) bool // UseHashTagKey add hashtags to redis keys to ensure all keys of this queue are allocated in the same hash slot. // If you are using Codis/AliyunRedisCluster/TencentCloudRedisCluster, add this option to NewQueue @@ -81,7 +84,7 @@ func UseHashTagKey() interface{} { // NewQueue0 creates a new queue, use DelayQueue.StartConsume to consume or DelayQueue.SendScheduleMsg to publish message // callback returns true to confirm successful consumption. If callback returns false or not return within maxConsumeDuration, DelayQueue will re-deliver this message -func NewQueue0(name string, cli RedisCli, callback func(string) bool, opts ...interface{}) *DelayQueue { +func NewQueue0(name string, cli RedisCli, opts ...interface{}) *DelayQueue { if name == "" { panic("name is required") } @@ -89,18 +92,15 @@ func NewQueue0(name string, cli RedisCli, callback func(string) bool, opts ...in panic("cli is required") } useHashTag := false - noCallback := false + var callback CallbackFunc = nil for _, opt := range opts { - switch opt.(type) { + switch o := opt.(type) { case hashTagKeyOpt: useHashTag = true - case noCallbackOpt: - noCallback = true + case CallbackFunc: + callback = o } } - if !noCallback && callback == nil { - panic("callback is required") - } var keyPrefix string if useHashTag { keyPrefix = "{dp:" + name + "}" @@ -128,6 +128,13 @@ func NewQueue0(name string, cli RedisCli, callback func(string) bool, opts ...in } } +// WithCallback set callback for queue to receives and consumes messages +// callback returns true to confirm successfully consumed, false to re-deliver this message +func (q *DelayQueue) WithCallback(callback CallbackFunc) *DelayQueue { + q.cb = callback + return q +} + // WithLogger customizes logger for queue func (q *DelayQueue) WithLogger(logger *log.Logger) *DelayQueue { q.logger = logger @@ -550,6 +557,7 @@ func (q *DelayQueue) consume() error { // StartConsume creates a goroutine to consume message from DelayQueue // use `<-done` to wait consumer stopping +// If there is no callback set, StartConsume will panic func (q *DelayQueue) StartConsume() (done <-chan struct{}) { if q.cb == nil { panic("this instance has no callback") diff --git a/delayqueue_test.go b/delayqueue_test.go index 4959e07..131c88e 100644 --- a/delayqueue_test.go +++ b/delayqueue_test.go @@ -24,7 +24,8 @@ func TestDelayQueue_consume(t *testing.T) { i, _ := strconv.ParseInt(s, 10, 64) return i%2 == 0 } - queue := NewQueue("test", redisCli, cb, UseHashTagKey()). + queue := NewQueue("test", redisCli, UseHashTagKey()). + WithCallback(cb). WithFetchInterval(time.Millisecond * 50). WithMaxConsumeDuration(0). WithLogger(log.New(os.Stderr, "[DelayQueue]", log.LstdFlags)). diff --git a/monitor.go b/monitor.go index 01c5d3a..f9f7102 100644 --- a/monitor.go +++ b/monitor.go @@ -13,9 +13,8 @@ type Monitor struct { // NewMonitor0 creates a new Monitor by a RedisCli instance func NewMonitor0(name string, cli RedisCli, opts ...interface{}) *Monitor { - opts = append(opts, noCallbackOpt(1)) return &Monitor{ - inner: NewQueue0(name, cli, nil, opts...), + inner: NewQueue0(name, cli, opts...), } } diff --git a/publisher.go b/publisher.go index 387a7f4..8b23e56 100644 --- a/publisher.go +++ b/publisher.go @@ -14,9 +14,8 @@ type Publisher struct { // NewPublisher0 creates a new Publisher by a RedisCli instance func NewPublisher0(name string, cli RedisCli, opts ...interface{}) *Publisher { - opts = append(opts, noCallbackOpt(1)) return &Publisher{ - inner: NewQueue0(name, cli, nil, opts...), + inner: NewQueue0(name, cli, opts...), } } diff --git a/wrapper.go b/wrapper.go index d663eb6..fd42644 100644 --- a/wrapper.go +++ b/wrapper.go @@ -6,11 +6,20 @@ import ( "time" ) -func NewQueue(name string, cli *redis.Client, callback func(string) bool, opts ...interface{}) *DelayQueue { + +// NewQueue creates a new queue, use DelayQueue.StartConsume to consume or DelayQueue.SendScheduleMsg to publish message +// +// queue := delayqueue.NewQueue("example", redisCli, func(payload string) bool { +// // callback returns true to confirm successful consumption. +// // If callback returns false or not return within maxConsumeDuration, DelayQueue will re-deliver this message +// return true +// }) +// +func NewQueue(name string, cli *redis.Client, opts ...interface{}) *DelayQueue { rc := &redisV9Wrapper{ inner: cli, } - return NewQueue0(name, rc, callback, opts...) + return NewQueue0(name, rc, opts...) } func wrapErr(err error) error { @@ -226,10 +235,10 @@ func (r *redisClusterWrapper) Subscribe(channel string) (<-chan string, func(), return resultChan, close, nil } -func NewQueueOnCluster(name string, cli *redis.ClusterClient, callback func(string) bool, opts ...interface{}) *DelayQueue { +func NewQueueOnCluster(name string, cli *redis.ClusterClient, opts ...interface{}) *DelayQueue { rc := &redisClusterWrapper{ inner: cli, } opts = append(opts, UseHashTagKey()) - return NewQueue0(name, rc, callback, opts...) + return NewQueue0(name, rc, opts...) }