mirror of
https://github.com/HDT3213/delayqueue.git
synced 2025-10-05 07:06:53 +08:00
add separate producer
This commit is contained in:
16
README.md
16
README.md
@@ -71,6 +71,22 @@ func main() {
|
|||||||
> if you are using github.com/go-redis/redis/v8 please use `go get github.com/hdt3213/delayqueue@v8`
|
> if you are using github.com/go-redis/redis/v8 please use `go get github.com/hdt3213/delayqueue@v8`
|
||||||
> If you are using redis client other than go-redis, you could wrap your redis client into [RedisCli](https://pkg.go.dev/github.com/hdt3213/delayqueue#RedisCli) interface
|
> If you are using redis client other than go-redis, you could wrap your redis client into [RedisCli](https://pkg.go.dev/github.com/hdt3213/delayqueue#RedisCli) interface
|
||||||
|
|
||||||
|
## Producer consumer distributed deployment
|
||||||
|
|
||||||
|
By default, delayqueue instances can be both producers and consumers. If your program only need producers and consumers are placed elsewhere, `delayqueue.NewProducer` is a good option for you.
|
||||||
|
|
||||||
|
```go
|
||||||
|
func consumer() {
|
||||||
|
queue := NewQueue("test", redisCli, cb)
|
||||||
|
queue.StartConsume()
|
||||||
|
}
|
||||||
|
|
||||||
|
func producer() {
|
||||||
|
publisher := NewPublisher("test", redisCli)
|
||||||
|
publisher.SendDelayMsg(strconv.Itoa(i), 0)
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
## Options
|
## Options
|
||||||
|
|
||||||
```go
|
```go
|
||||||
|
26
README_CN.md
26
README_CN.md
@@ -14,7 +14,7 @@ DelayQueue 的主要优势:
|
|||||||
- 原生适配分布式环境, 可在多台机器上并发的处理消息. 可以随时增加、减少或迁移 Worker
|
- 原生适配分布式环境, 可在多台机器上并发的处理消息. 可以随时增加、减少或迁移 Worker
|
||||||
- 支持各类 Redis 集群
|
- 支持各类 Redis 集群
|
||||||
|
|
||||||
# 安装
|
## 安装
|
||||||
|
|
||||||
在启用了 go mod 的项目中运行下列命令即可完成安装:
|
在启用了 go mod 的项目中运行下列命令即可完成安装:
|
||||||
|
|
||||||
@@ -24,7 +24,7 @@ go get github.com/hdt3213/delayqueue
|
|||||||
|
|
||||||
> 如果您仍在使用 `github.com/go-redis/redis/v8` 请安装 `go get github.com/hdt3213/delayqueue@v8`
|
> 如果您仍在使用 `github.com/go-redis/redis/v8` 请安装 `go get github.com/hdt3213/delayqueue@v8`
|
||||||
|
|
||||||
# 开始使用
|
## 开始使用
|
||||||
|
|
||||||
```go
|
```go
|
||||||
package main
|
package main
|
||||||
@@ -68,7 +68,23 @@ func main() {
|
|||||||
> 如果您仍在使用 redis/v8 请使用 v8 分支: `go get github.com/hdt3213/delayqueue@v8`
|
> 如果您仍在使用 redis/v8 请使用 v8 分支: `go get github.com/hdt3213/delayqueue@v8`
|
||||||
> 如果您在使用其他的 redis 客户端, 可以将其包装到 [RedisCli](https://pkg.go.dev/github.com/hdt3213/delayqueue#RedisCli) 接口中
|
> 如果您在使用其他的 redis 客户端, 可以将其包装到 [RedisCli](https://pkg.go.dev/github.com/hdt3213/delayqueue#RedisCli) 接口中
|
||||||
|
|
||||||
# 选项
|
## 分开部署生产者和消费者
|
||||||
|
|
||||||
|
默认情况下 delayqueue 实例既可以做生产者也可以做消费者。如果某些程序只需要发送消息,消费者部署在其它程序中,那么可以使用 `delayqueue.NewProducer`.
|
||||||
|
|
||||||
|
```go
|
||||||
|
func consumer() {
|
||||||
|
queue := NewQueue("test", redisCli, cb)
|
||||||
|
queue.StartConsume()
|
||||||
|
}
|
||||||
|
|
||||||
|
func producer() {
|
||||||
|
publisher := NewPublisher("test", redisCli)
|
||||||
|
publisher.SendDelayMsg(strconv.Itoa(i), 0)
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
## 选项
|
||||||
|
|
||||||
```go
|
```go
|
||||||
WithLogger(logger *log.Logger)
|
WithLogger(logger *log.Logger)
|
||||||
@@ -121,7 +137,7 @@ WithDefaultRetryCount(count uint)
|
|||||||
|
|
||||||
在调用 DelayQueue.SendScheduleMsg or DelayQueue.SendDelayMsg 发送消息时,可以调用 WithRetryCount 为这条消息单独指定重试次数。
|
在调用 DelayQueue.SendScheduleMsg or DelayQueue.SendDelayMsg 发送消息时,可以调用 WithRetryCount 为这条消息单独指定重试次数。
|
||||||
|
|
||||||
# 集群
|
## 集群
|
||||||
|
|
||||||
如果需要在 Redis Cluster 上工作, 请使用 `NewQueueOnCluster`:
|
如果需要在 Redis Cluster 上工作, 请使用 `NewQueueOnCluster`:
|
||||||
|
|
||||||
@@ -151,7 +167,7 @@ callback := func(s string) bool {
|
|||||||
queue := delayqueue.NewQueue("example", redisCli, callback, UseHashTagKey())
|
queue := delayqueue.NewQueue("example", redisCli, callback, UseHashTagKey())
|
||||||
```
|
```
|
||||||
|
|
||||||
# 更多细节
|
## 更多细节
|
||||||
|
|
||||||
完整流程如图所示:
|
完整流程如图所示:
|
||||||
|
|
||||||
|
@@ -53,6 +53,7 @@ type RedisCli interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type hashTagKeyOpt int
|
type hashTagKeyOpt int
|
||||||
|
type noCallbackOpt int
|
||||||
|
|
||||||
// UseHashTagKey add hashtags to redis keys to ensure all keys of this queue are allocated in the same hash slot.
|
// UseHashTagKey add hashtags to redis keys to ensure all keys of this queue are allocated in the same hash slot.
|
||||||
// If you are using Codis/AliyunRedisCluster/TencentCloudRedisCluster, add this option to NewQueue
|
// If you are using Codis/AliyunRedisCluster/TencentCloudRedisCluster, add this option to NewQueue
|
||||||
@@ -71,16 +72,19 @@ func NewQueue0(name string, cli RedisCli, callback func(string) bool, opts ...in
|
|||||||
if cli == nil {
|
if cli == nil {
|
||||||
panic("cli is required")
|
panic("cli is required")
|
||||||
}
|
}
|
||||||
if callback == nil {
|
|
||||||
panic("callback is required")
|
|
||||||
}
|
|
||||||
useHashTag := false
|
useHashTag := false
|
||||||
|
noCallback := false
|
||||||
for _, opt := range opts {
|
for _, opt := range opts {
|
||||||
switch opt.(type) {
|
switch opt.(type) {
|
||||||
case hashTagKeyOpt:
|
case hashTagKeyOpt:
|
||||||
useHashTag = true
|
useHashTag = true
|
||||||
|
case noCallbackOpt:
|
||||||
|
noCallback = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if !noCallback && callback == nil {
|
||||||
|
panic("callback is required")
|
||||||
|
}
|
||||||
var keyPrefix string
|
var keyPrefix string
|
||||||
if useHashTag {
|
if useHashTag {
|
||||||
keyPrefix = "{dp:" + name + "}"
|
keyPrefix = "{dp:" + name + "}"
|
||||||
|
45
publisher.go
Normal file
45
publisher.go
Normal file
@@ -0,0 +1,45 @@
|
|||||||
|
package delayqueue
|
||||||
|
|
||||||
|
import (
|
||||||
|
"log"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/redis/go-redis/v9"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Publisher only publishes messages to delayqueue, it is a encapsulation of delayqueue
|
||||||
|
type Publisher struct {
|
||||||
|
inner *DelayQueue
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewPublisher0 creates a new Publisher by a RedisCli instance
|
||||||
|
func NewPublisher0(name string, cli RedisCli, opts ...interface{}) *Publisher {
|
||||||
|
opts = append(opts, noCallbackOpt(1))
|
||||||
|
return &Publisher{
|
||||||
|
inner: NewQueue0(name, cli, nil, opts...),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewPublisher creates a new Publisher by a *redis.Client
|
||||||
|
func NewPublisher(name string, cli *redis.Client, opts ...interface{}) *Publisher {
|
||||||
|
rc := &redisV9Wrapper{
|
||||||
|
inner: cli,
|
||||||
|
}
|
||||||
|
return NewPublisher0(name, rc, opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithLogger customizes logger for queue
|
||||||
|
func (p *Publisher) WithLogger(logger *log.Logger) *Publisher {
|
||||||
|
p.inner.logger = logger
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
|
||||||
|
// SendScheduleMsg submits a message delivered at given time
|
||||||
|
func (p *Publisher) SendScheduleMsg(payload string, t time.Time, opts ...interface{}) error {
|
||||||
|
return p.inner.SendScheduleMsg(payload, t, opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SendDelayMsg submits a message delivered after given duration
|
||||||
|
func (p *Publisher) SendDelayMsg(payload string, duration time.Duration, opts ...interface{}) error {
|
||||||
|
return p.inner.SendDelayMsg(payload, duration, opts...)
|
||||||
|
}
|
56
publisher_test.go
Normal file
56
publisher_test.go
Normal file
@@ -0,0 +1,56 @@
|
|||||||
|
package delayqueue
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"log"
|
||||||
|
"os"
|
||||||
|
"strconv"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/redis/go-redis/v9"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestPublisher(t *testing.T) {
|
||||||
|
redisCli := redis.NewClient(&redis.Options{
|
||||||
|
Addr: "127.0.0.1:6379",
|
||||||
|
})
|
||||||
|
redisCli.FlushDB(context.Background())
|
||||||
|
size := 1000
|
||||||
|
retryCount := 3
|
||||||
|
deliveryCount := make(map[string]int)
|
||||||
|
cb := func(s string) bool {
|
||||||
|
deliveryCount[s]++
|
||||||
|
i, _ := strconv.ParseInt(s, 10, 64)
|
||||||
|
return i%2 == 0
|
||||||
|
}
|
||||||
|
logger := log.New(os.Stderr, "[DelayQueue]", log.LstdFlags)
|
||||||
|
queue := NewQueue("test", redisCli, cb).WithLogger(logger)
|
||||||
|
publisher := NewPublisher("test", redisCli).WithLogger(logger)
|
||||||
|
|
||||||
|
for i := 0; i < size; i++ {
|
||||||
|
err := publisher.SendDelayMsg(strconv.Itoa(i), 0, WithRetryCount(retryCount), WithMsgTTL(time.Hour))
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for i := 0; i < 10*size; i++ {
|
||||||
|
err := queue.consume()
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("consume error: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for k, v := range deliveryCount {
|
||||||
|
i, _ := strconv.ParseInt(k, 10, 64)
|
||||||
|
if i%2 == 0 {
|
||||||
|
if v != 1 {
|
||||||
|
t.Errorf("expect 1 delivery, actual %d", v)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if v != retryCount+1 {
|
||||||
|
t.Errorf("expect %d delivery, actual %d", retryCount+1, v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Reference in New Issue
Block a user