diff --git a/README.md b/README.md index 15cf36e..288e997 100644 --- a/README.md +++ b/README.md @@ -71,6 +71,22 @@ func main() { > if you are using github.com/go-redis/redis/v8 please use `go get github.com/hdt3213/delayqueue@v8` > If you are using redis client other than go-redis, you could wrap your redis client into [RedisCli](https://pkg.go.dev/github.com/hdt3213/delayqueue#RedisCli) interface +## Producer consumer distributed deployment + +By default, delayqueue instances can be both producers and consumers. If your program only need producers and consumers are placed elsewhere, `delayqueue.NewProducer` is a good option for you. + +```go +func consumer() { + queue := NewQueue("test", redisCli, cb) + queue.StartConsume() +} + +func producer() { + publisher := NewPublisher("test", redisCli) + publisher.SendDelayMsg(strconv.Itoa(i), 0) +} +``` + ## Options ```go diff --git a/README_CN.md b/README_CN.md index 0bf9f6a..294d061 100644 --- a/README_CN.md +++ b/README_CN.md @@ -14,7 +14,7 @@ DelayQueue 的主要优势: - 原生适配分布式环境, 可在多台机器上并发的处理消息. 可以随时增加、减少或迁移 Worker - 支持各类 Redis 集群 -# 安装 +## 安装 在启用了 go mod 的项目中运行下列命令即可完成安装: @@ -24,7 +24,7 @@ go get github.com/hdt3213/delayqueue > 如果您仍在使用 `github.com/go-redis/redis/v8` 请安装 `go get github.com/hdt3213/delayqueue@v8` -# 开始使用 +## 开始使用 ```go package main @@ -68,7 +68,23 @@ func main() { > 如果您仍在使用 redis/v8 请使用 v8 分支: `go get github.com/hdt3213/delayqueue@v8` > 如果您在使用其他的 redis 客户端, 可以将其包装到 [RedisCli](https://pkg.go.dev/github.com/hdt3213/delayqueue#RedisCli) 接口中 -# 选项 +## 分开部署生产者和消费者 + +默认情况下 delayqueue 实例既可以做生产者也可以做消费者。如果某些程序只需要发送消息,消费者部署在其它程序中,那么可以使用 `delayqueue.NewProducer`. + +```go +func consumer() { + queue := NewQueue("test", redisCli, cb) + queue.StartConsume() +} + +func producer() { + publisher := NewPublisher("test", redisCli) + publisher.SendDelayMsg(strconv.Itoa(i), 0) +} +``` + +## 选项 ```go WithLogger(logger *log.Logger) @@ -121,7 +137,7 @@ WithDefaultRetryCount(count uint) 在调用 DelayQueue.SendScheduleMsg or DelayQueue.SendDelayMsg 发送消息时,可以调用 WithRetryCount 为这条消息单独指定重试次数。 -# 集群 +## 集群 如果需要在 Redis Cluster 上工作, 请使用 `NewQueueOnCluster`: @@ -151,7 +167,7 @@ callback := func(s string) bool { queue := delayqueue.NewQueue("example", redisCli, callback, UseHashTagKey()) ``` -# 更多细节 +## 更多细节 完整流程如图所示: diff --git a/delayqueue.go b/delayqueue.go index 8e81478..44e157c 100644 --- a/delayqueue.go +++ b/delayqueue.go @@ -53,6 +53,7 @@ type RedisCli interface { } type hashTagKeyOpt int +type noCallbackOpt int // UseHashTagKey add hashtags to redis keys to ensure all keys of this queue are allocated in the same hash slot. // If you are using Codis/AliyunRedisCluster/TencentCloudRedisCluster, add this option to NewQueue @@ -71,16 +72,19 @@ func NewQueue0(name string, cli RedisCli, callback func(string) bool, opts ...in if cli == nil { panic("cli is required") } - if callback == nil { - panic("callback is required") - } useHashTag := false + noCallback := false for _, opt := range opts { switch opt.(type) { case hashTagKeyOpt: useHashTag = true + case noCallbackOpt: + noCallback = true } } + if !noCallback && callback == nil { + panic("callback is required") + } var keyPrefix string if useHashTag { keyPrefix = "{dp:" + name + "}" diff --git a/publisher.go b/publisher.go new file mode 100644 index 0000000..f9bb8aa --- /dev/null +++ b/publisher.go @@ -0,0 +1,45 @@ +package delayqueue + +import ( + "log" + "time" + + "github.com/redis/go-redis/v9" +) + +// Publisher only publishes messages to delayqueue, it is a encapsulation of delayqueue +type Publisher struct { + inner *DelayQueue +} + +// NewPublisher0 creates a new Publisher by a RedisCli instance +func NewPublisher0(name string, cli RedisCli, opts ...interface{}) *Publisher { + opts = append(opts, noCallbackOpt(1)) + return &Publisher{ + inner: NewQueue0(name, cli, nil, opts...), + } +} + +// NewPublisher creates a new Publisher by a *redis.Client +func NewPublisher(name string, cli *redis.Client, opts ...interface{}) *Publisher { + rc := &redisV9Wrapper{ + inner: cli, + } + return NewPublisher0(name, rc, opts...) +} + +// WithLogger customizes logger for queue +func (p *Publisher) WithLogger(logger *log.Logger) *Publisher { + p.inner.logger = logger + return p +} + +// SendScheduleMsg submits a message delivered at given time +func (p *Publisher) SendScheduleMsg(payload string, t time.Time, opts ...interface{}) error { + return p.inner.SendScheduleMsg(payload, t, opts...) +} + +// SendDelayMsg submits a message delivered after given duration +func (p *Publisher) SendDelayMsg(payload string, duration time.Duration, opts ...interface{}) error { + return p.inner.SendDelayMsg(payload, duration, opts...) +} \ No newline at end of file diff --git a/publisher_test.go b/publisher_test.go new file mode 100644 index 0000000..27432a8 --- /dev/null +++ b/publisher_test.go @@ -0,0 +1,56 @@ +package delayqueue + +import ( + "context" + "log" + "os" + "strconv" + "testing" + "time" + + "github.com/redis/go-redis/v9" +) + +func TestPublisher(t *testing.T) { + redisCli := redis.NewClient(&redis.Options{ + Addr: "127.0.0.1:6379", + }) + redisCli.FlushDB(context.Background()) + size := 1000 + retryCount := 3 + deliveryCount := make(map[string]int) + cb := func(s string) bool { + deliveryCount[s]++ + i, _ := strconv.ParseInt(s, 10, 64) + return i%2 == 0 + } + logger := log.New(os.Stderr, "[DelayQueue]", log.LstdFlags) + queue := NewQueue("test", redisCli, cb).WithLogger(logger) + publisher := NewPublisher("test", redisCli).WithLogger(logger) + + for i := 0; i < size; i++ { + err := publisher.SendDelayMsg(strconv.Itoa(i), 0, WithRetryCount(retryCount), WithMsgTTL(time.Hour)) + if err != nil { + t.Error(err) + } + } + for i := 0; i < 10*size; i++ { + err := queue.consume() + if err != nil { + t.Errorf("consume error: %v", err) + return + } + } + for k, v := range deliveryCount { + i, _ := strconv.ParseInt(k, 10, 64) + if i%2 == 0 { + if v != 1 { + t.Errorf("expect 1 delivery, actual %d", v) + } + } else { + if v != retryCount+1 { + t.Errorf("expect %d delivery, actual %d", retryCount+1, v) + } + } + } +} \ No newline at end of file