mirror of
https://github.com/HDT3213/delayqueue.git
synced 2025-09-27 03:26:05 +08:00
add WithCallback
This commit is contained in:
33
README.md
33
README.md
@@ -70,13 +70,16 @@ func main() {
|
||||
```
|
||||
|
||||
> if you are using github.com/go-redis/redis/v8 please use `go get github.com/hdt3213/delayqueue@v8`
|
||||
|
||||
> If you are using redis client other than go-redis, you could wrap your redis client into [RedisCli](https://pkg.go.dev/github.com/hdt3213/delayqueue#RedisCli) interface
|
||||
|
||||
> If you don't want to set the callback during initialization, you can use func `WithCallback`.
|
||||
|
||||
## Producer consumer distributed deployment
|
||||
|
||||
By default, delayqueue instances can be both producers and consumers.
|
||||
|
||||
If your program only need producers and consumers are placed elsewhere, `delayqueue.NewProducer` is a good option for you.
|
||||
If your program only need producers and consumers are placed elsewhere, `delayqueue.NewPublisher` is a good option for you.
|
||||
|
||||
```go
|
||||
func consumer() {
|
||||
@@ -93,26 +96,42 @@ func producer() {
|
||||
## Options
|
||||
|
||||
```go
|
||||
WithLogger(logger *log.Logger)
|
||||
func (q *DelayQueue)WithCallback(callback CallbackFunc) *DelayQueue
|
||||
```
|
||||
|
||||
WithCallback set callback for queue to receives and consumes messages
|
||||
callback returns true to confirm successfully consumed, false to re-deliver this message.
|
||||
|
||||
If there is no callback set, StartConsume will panic
|
||||
|
||||
```go
|
||||
queue := NewQueue("test", redisCli)
|
||||
queue.WithCallback(func(payload string) bool {
|
||||
return true
|
||||
})
|
||||
```
|
||||
|
||||
```go
|
||||
func (q *DelayQueue)WithLogger(logger *log.Logger) *DelayQueue
|
||||
```
|
||||
|
||||
WithLogger customizes logger for queue
|
||||
|
||||
|
||||
```go
|
||||
WithConcurrent(c uint)
|
||||
func (q *DelayQueue)WithConcurrent(c uint) *DelayQueue
|
||||
```
|
||||
|
||||
WithConcurrent sets the number of concurrent consumers
|
||||
|
||||
```go
|
||||
WithFetchInterval(d time.Duration)
|
||||
func (q *DelayQueue)WithFetchInterval(d time.Duration) *DelayQueue
|
||||
```
|
||||
|
||||
WithFetchInterval customizes the interval at which consumer fetch message from redis
|
||||
|
||||
```go
|
||||
WithMaxConsumeDuration(d time.Duration)
|
||||
func (q *DelayQueue)WithMaxConsumeDuration(d time.Duration) *DelayQueue
|
||||
```
|
||||
|
||||
WithMaxConsumeDuration customizes max consume duration
|
||||
@@ -121,7 +140,7 @@ If no acknowledge received within WithMaxConsumeDuration after message delivery,
|
||||
message again
|
||||
|
||||
```go
|
||||
WithFetchLimit(limit uint)
|
||||
func (q *DelayQueue)WithFetchLimit(limit uint) *DelayQueue
|
||||
```
|
||||
|
||||
WithFetchLimit limits the max number of unack (processing) messages
|
||||
@@ -139,7 +158,7 @@ WARNING! CHANGING(add or remove) this option will cause DelayQueue failing to re
|
||||
> see more: https://redis.io/docs/reference/cluster-spec/#hash-tags
|
||||
|
||||
```go
|
||||
WithDefaultRetryCount(count uint)
|
||||
WithDefaultRetryCount(count uint) *DelayQueue
|
||||
```
|
||||
|
||||
WithDefaultRetryCount customizes the max number of retry, it effects of messages in this queue
|
||||
|
30
README_CN.md
30
README_CN.md
@@ -67,11 +67,14 @@ func main() {
|
||||
```
|
||||
|
||||
> 如果您仍在使用 redis/v8 请使用 v8 分支: `go get github.com/hdt3213/delayqueue@v8`
|
||||
|
||||
> 如果您在使用其他的 redis 客户端, 可以将其包装到 [RedisCli](https://pkg.go.dev/github.com/hdt3213/delayqueue#RedisCli) 接口中
|
||||
|
||||
> 如果您不想在初始化时设置callback, 您可以使用 WithCallback 函数
|
||||
|
||||
## 分开部署生产者和消费者
|
||||
|
||||
默认情况下 delayqueue 实例既可以做生产者也可以做消费者。如果某些程序只需要发送消息,消费者部署在其它程序中,那么可以使用 `delayqueue.NewProducer`.
|
||||
默认情况下 delayqueue 实例既可以做生产者也可以做消费者。如果某些程序只需要发送消息,消费者部署在其它程序中,那么可以使用 `delayqueue.NewPublisher`.
|
||||
|
||||
```go
|
||||
func consumer() {
|
||||
@@ -88,32 +91,47 @@ func producer() {
|
||||
## 选项
|
||||
|
||||
```go
|
||||
WithLogger(logger *log.Logger)
|
||||
func (q *DelayQueue)WithCallback(callback CallbackFunc) *DelayQueue
|
||||
```
|
||||
|
||||
callback 函数负责接收并消费消息。callback 返回 true 确认已成功消费,返回 false 表示处理失败,需要重试。
|
||||
|
||||
如果没有设置 callback, 调用 StartConsume 时会 panic。
|
||||
|
||||
```go
|
||||
queue := NewQueue("test", redisCli)
|
||||
queue.WithCallback(func(payload string) bool {
|
||||
return true
|
||||
})
|
||||
```
|
||||
|
||||
```go
|
||||
func (q *DelayQueue)WithLogger(logger *log.Logger) *DelayQueue
|
||||
```
|
||||
|
||||
为 DelayQueue 设置 logger
|
||||
|
||||
|
||||
```go
|
||||
WithConcurrent(c uint)
|
||||
func (q *DelayQueue)WithConcurrent(c uint) *DelayQueue
|
||||
```
|
||||
|
||||
设置消费者并发数
|
||||
|
||||
```go
|
||||
WithFetchInterval(d time.Duration)
|
||||
func (q *DelayQueue)WithFetchInterval(d time.Duration) *DelayQueue
|
||||
```
|
||||
|
||||
设置消费者从 Redis 拉取消息的时间间隔
|
||||
|
||||
```go
|
||||
WithMaxConsumeDuration(d time.Duration)
|
||||
func (q *DelayQueue)WithMaxConsumeDuration(d time.Duration) *DelayQueue
|
||||
```
|
||||
|
||||
设置最长消费时间。若拉取消息后超出 MaxConsumeDuration 时限仍未返回 ACK 则认为消费失败,DelayQueue 会重新投递此消息。
|
||||
|
||||
```go
|
||||
WithFetchLimit(limit uint)
|
||||
func (q *DelayQueue)WithFetchLimit(limit uint) *DelayQueue
|
||||
```
|
||||
|
||||
FetchLimit 限制消费者从 Redis 中拉取的消息数目,即单个消费者正在处理中的消息数不会超过 FetchLimit
|
||||
|
@@ -69,7 +69,10 @@ type RedisCli interface {
|
||||
}
|
||||
|
||||
type hashTagKeyOpt int
|
||||
type noCallbackOpt int
|
||||
|
||||
// CallbackFunc receives and consumes messages
|
||||
// returns true to confirm successfully consumed, false to re-deliver this message
|
||||
type CallbackFunc = func(string) bool
|
||||
|
||||
// UseHashTagKey add hashtags to redis keys to ensure all keys of this queue are allocated in the same hash slot.
|
||||
// If you are using Codis/AliyunRedisCluster/TencentCloudRedisCluster, add this option to NewQueue
|
||||
@@ -81,7 +84,7 @@ func UseHashTagKey() interface{} {
|
||||
|
||||
// NewQueue0 creates a new queue, use DelayQueue.StartConsume to consume or DelayQueue.SendScheduleMsg to publish message
|
||||
// callback returns true to confirm successful consumption. If callback returns false or not return within maxConsumeDuration, DelayQueue will re-deliver this message
|
||||
func NewQueue0(name string, cli RedisCli, callback func(string) bool, opts ...interface{}) *DelayQueue {
|
||||
func NewQueue0(name string, cli RedisCli, opts ...interface{}) *DelayQueue {
|
||||
if name == "" {
|
||||
panic("name is required")
|
||||
}
|
||||
@@ -89,18 +92,15 @@ func NewQueue0(name string, cli RedisCli, callback func(string) bool, opts ...in
|
||||
panic("cli is required")
|
||||
}
|
||||
useHashTag := false
|
||||
noCallback := false
|
||||
var callback CallbackFunc = nil
|
||||
for _, opt := range opts {
|
||||
switch opt.(type) {
|
||||
switch o := opt.(type) {
|
||||
case hashTagKeyOpt:
|
||||
useHashTag = true
|
||||
case noCallbackOpt:
|
||||
noCallback = true
|
||||
case CallbackFunc:
|
||||
callback = o
|
||||
}
|
||||
}
|
||||
if !noCallback && callback == nil {
|
||||
panic("callback is required")
|
||||
}
|
||||
var keyPrefix string
|
||||
if useHashTag {
|
||||
keyPrefix = "{dp:" + name + "}"
|
||||
@@ -128,6 +128,13 @@ func NewQueue0(name string, cli RedisCli, callback func(string) bool, opts ...in
|
||||
}
|
||||
}
|
||||
|
||||
// WithCallback set callback for queue to receives and consumes messages
|
||||
// callback returns true to confirm successfully consumed, false to re-deliver this message
|
||||
func (q *DelayQueue) WithCallback(callback CallbackFunc) *DelayQueue {
|
||||
q.cb = callback
|
||||
return q
|
||||
}
|
||||
|
||||
// WithLogger customizes logger for queue
|
||||
func (q *DelayQueue) WithLogger(logger *log.Logger) *DelayQueue {
|
||||
q.logger = logger
|
||||
@@ -550,6 +557,7 @@ func (q *DelayQueue) consume() error {
|
||||
|
||||
// StartConsume creates a goroutine to consume message from DelayQueue
|
||||
// use `<-done` to wait consumer stopping
|
||||
// If there is no callback set, StartConsume will panic
|
||||
func (q *DelayQueue) StartConsume() (done <-chan struct{}) {
|
||||
if q.cb == nil {
|
||||
panic("this instance has no callback")
|
||||
|
@@ -24,7 +24,8 @@ func TestDelayQueue_consume(t *testing.T) {
|
||||
i, _ := strconv.ParseInt(s, 10, 64)
|
||||
return i%2 == 0
|
||||
}
|
||||
queue := NewQueue("test", redisCli, cb, UseHashTagKey()).
|
||||
queue := NewQueue("test", redisCli, UseHashTagKey()).
|
||||
WithCallback(cb).
|
||||
WithFetchInterval(time.Millisecond * 50).
|
||||
WithMaxConsumeDuration(0).
|
||||
WithLogger(log.New(os.Stderr, "[DelayQueue]", log.LstdFlags)).
|
||||
|
@@ -13,9 +13,8 @@ type Monitor struct {
|
||||
|
||||
// NewMonitor0 creates a new Monitor by a RedisCli instance
|
||||
func NewMonitor0(name string, cli RedisCli, opts ...interface{}) *Monitor {
|
||||
opts = append(opts, noCallbackOpt(1))
|
||||
return &Monitor{
|
||||
inner: NewQueue0(name, cli, nil, opts...),
|
||||
inner: NewQueue0(name, cli, opts...),
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -14,9 +14,8 @@ type Publisher struct {
|
||||
|
||||
// NewPublisher0 creates a new Publisher by a RedisCli instance
|
||||
func NewPublisher0(name string, cli RedisCli, opts ...interface{}) *Publisher {
|
||||
opts = append(opts, noCallbackOpt(1))
|
||||
return &Publisher{
|
||||
inner: NewQueue0(name, cli, nil, opts...),
|
||||
inner: NewQueue0(name, cli, opts...),
|
||||
}
|
||||
}
|
||||
|
||||
|
17
wrapper.go
17
wrapper.go
@@ -6,11 +6,20 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
func NewQueue(name string, cli *redis.Client, callback func(string) bool, opts ...interface{}) *DelayQueue {
|
||||
|
||||
// NewQueue creates a new queue, use DelayQueue.StartConsume to consume or DelayQueue.SendScheduleMsg to publish message
|
||||
//
|
||||
// queue := delayqueue.NewQueue("example", redisCli, func(payload string) bool {
|
||||
// // callback returns true to confirm successful consumption.
|
||||
// // If callback returns false or not return within maxConsumeDuration, DelayQueue will re-deliver this message
|
||||
// return true
|
||||
// })
|
||||
//
|
||||
func NewQueue(name string, cli *redis.Client, opts ...interface{}) *DelayQueue {
|
||||
rc := &redisV9Wrapper{
|
||||
inner: cli,
|
||||
}
|
||||
return NewQueue0(name, rc, callback, opts...)
|
||||
return NewQueue0(name, rc, opts...)
|
||||
}
|
||||
|
||||
func wrapErr(err error) error {
|
||||
@@ -226,10 +235,10 @@ func (r *redisClusterWrapper) Subscribe(channel string) (<-chan string, func(),
|
||||
return resultChan, close, nil
|
||||
}
|
||||
|
||||
func NewQueueOnCluster(name string, cli *redis.ClusterClient, callback func(string) bool, opts ...interface{}) *DelayQueue {
|
||||
func NewQueueOnCluster(name string, cli *redis.ClusterClient, opts ...interface{}) *DelayQueue {
|
||||
rc := &redisClusterWrapper{
|
||||
inner: cli,
|
||||
}
|
||||
opts = append(opts, UseHashTagKey())
|
||||
return NewQueue0(name, rc, callback, opts...)
|
||||
return NewQueue0(name, rc, opts...)
|
||||
}
|
||||
|
Reference in New Issue
Block a user