mirror of
https://github.com/HDT3213/delayqueue.git
synced 2025-10-08 16:40:18 +08:00
add README_CN.md
This commit is contained in:
@@ -6,6 +6,8 @@
|
||||
[](https://goreportcard.com/report/github.com/HDT3213/delayqueue)
|
||||
[](https://pkg.go.dev/github.com/hdt3213/delayqueue)
|
||||
|
||||
[中文版](https://github.com/HDT3213/delayqueue/blob/master/README_CN.md)
|
||||
|
||||
DelayQueue is a message queue supporting delayed/scheduled delivery based on redis.
|
||||
|
||||
DelayQueue support ACK/Retry mechanism, it will re-deliver message after a while as long as no confirmation is received.
|
||||
|
144
README_CN.md
Normal file
144
README_CN.md
Normal file
@@ -0,0 +1,144 @@
|
||||
|
||||

|
||||
[](https://app.travis-ci.com/github/HDT3213/delayqueue)
|
||||
[](https://coveralls.io/github/HDT3213/delayqueue?branch=master)
|
||||
[](https://goreportcard.com/report/github.com/HDT3213/delayqueue)
|
||||
[](https://pkg.go.dev/github.com/hdt3213/delayqueue)
|
||||
|
||||
DelayQueue 是使用 Go 语言基于 Redis 实现的支持延时/定时投递的消息队列。
|
||||
|
||||
DelayQueue 的主要优势:
|
||||
- 支持 Ack/重试机制,只要消费者没有确认成功消费 (Ack) DelayQueue 就会尝试重新投递消息,直到达到消息的最大重试次数为止。
|
||||
- 可在分布式环境中安全运行,用户可以在多台机器上同时向 DelayQueue 发送或拉取消息。DelayQueue 保证一条消息有且只有一个消费者进行消费。
|
||||
- 消费者负责控制消息流转,无需部署额外服务。
|
||||
|
||||
# 安装
|
||||
|
||||
在启用了 go mod 的项目中运行下列命令即可完成安装:
|
||||
|
||||
```shell
|
||||
go get github.com/hdt3213/delayqueue
|
||||
```
|
||||
|
||||
# 开始使用
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"github.com/go-redis/redis/v8"
|
||||
"github.com/hdt3213/delayqueue"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
func main() {
|
||||
redisCli := redis.NewClient(&redis.Options{
|
||||
Addr: "127.0.0.1:6379",
|
||||
})
|
||||
queue := delayqueue.NewQueue("example", redisCli, func(payload string) bool {
|
||||
// return true 表示成功消费
|
||||
// 如果返回了 false 或者在 maxConsumeDuration 时限内没有返回则视为消费失败,DelayQueue 会重新投递消息
|
||||
return true
|
||||
}).WithConcurrent(4) // 设置消费者并发数
|
||||
// 发送延时投递消息
|
||||
for i := 0; i < 10; i++ {
|
||||
err := queue.SendDelayMsg(strconv.Itoa(i), time.Hour, delayqueue.WithRetryCount(3))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
// 发送定时投递消息
|
||||
for i := 0; i < 10; i++ {
|
||||
err := queue.SendScheduleMsg(strconv.Itoa(i), time.Now().Add(time.Hour))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
// 开始消费
|
||||
done := queue.StartConsume()
|
||||
<-done // 如需等待消费者关闭,监听 done 即可
|
||||
}
|
||||
```
|
||||
|
||||
# 选项
|
||||
|
||||
```go
|
||||
WithLogger(logger *log.Logger)
|
||||
```
|
||||
|
||||
为 DelayQueue 设置 logger
|
||||
|
||||
|
||||
```go
|
||||
WithConcurrent(c uint)
|
||||
```
|
||||
|
||||
设置消费者并发数
|
||||
|
||||
```go
|
||||
WithFetchInterval(d time.Duration)
|
||||
```
|
||||
|
||||
设置消费者从 Redis 拉取消息的时间间隔
|
||||
|
||||
```go
|
||||
WithMaxConsumeDuration(d time.Duration)
|
||||
```
|
||||
|
||||
设置最长消费时间。若拉取消息后超出 MaxConsumeDuration 时限仍未返回 ACK 则认为消费失败,DelayQueue 会重新投递此消息。
|
||||
|
||||
```go
|
||||
WithFetchLimit(limit uint)
|
||||
```
|
||||
|
||||
FetchLimit 限制消费者从 Redis 中拉取的消息数目,即单个消费者正在处理中的消息数不会超过 FetchLimit
|
||||
|
||||
```go
|
||||
UseHashTagKey()
|
||||
```
|
||||
|
||||
UseHashTagKey() 会在 Redis Key 上添加 hash tag 确保同一个队列的所有 Key 分布在同一个哈希槽中。
|
||||
|
||||
如果您正在使用 Codis/阿里云/腾讯云等 Redis 集群,请在 NewQueue 时添加这个选项:`NewQueue("test", redisCli, cb, UseHashTagKey())`。UseHashTagKey 选项在队列创建后禁止修改。
|
||||
|
||||
**注意:** 修改(添加或移除)此选项会导致无法访问 Redis 中已有的数据。
|
||||
|
||||
see more: https://redis.io/docs/reference/cluster-spec/#hash-tags
|
||||
|
||||
```go
|
||||
WithDefaultRetryCount(count uint)
|
||||
```
|
||||
|
||||
设置队列中消息的默认重试次数。
|
||||
|
||||
在调用 DelayQueue.SendScheduleMsg or DelayQueue.SendDelayMsg 发送消息时,可以调用 WithRetryCount 为这条消息单独指定重试次数。
|
||||
|
||||
# 更多细节
|
||||
|
||||
完整流程如图所示:
|
||||
|
||||

|
||||
|
||||
|
||||
整个消息队列中一共有 7 个 Redis 数据结构:
|
||||
|
||||
- pending: 有序集合类型,存储未到投递时间的消息。 member 为消息 ID、score 为投递时间。
|
||||
- ready: 列表类型,存储已到投递时间的消息。element 为消息 ID。
|
||||
- unack: 有序集合类型, 存储已投递但未确认成功消费的消息 ID。 member 为消息 ID、score 为处理超时时间, 超出这个时间还未 ack 的消息会被重试。
|
||||
- retry: 列表类型,存储处理超时后等待重试的消息 ID。element 为消息 ID。
|
||||
- garbage: 集合类型,用于暂存已达重试上限的消息 ID。后面介绍 unack2retry 时会介绍为什么需要 garbage 结构。
|
||||
- msgKey: 为了避免两条内容完全相同的消息造成意外的影响,我们将每条消息放到一个字符串类型的键中,并分配一个 UUID 作为它的唯一标识。其它数据结构中只存储 UUID 而不存储完整的消息内容。每个 msg 拥有一个独立的 key 而不是将所有消息放到一个哈希表中是为了利用 TTL 机制避免泄漏。
|
||||
- retryCountKey: 哈希表类型,键为消息 ID, 值为剩余的重试次数。
|
||||
|
||||
如上图所示整个消息队列中一共涉及 6 个操作:
|
||||
|
||||
- send: 发送一条新消息。首先存储消息内容和重试次数,并将消息 ID 放入 pending 中。
|
||||
- pending2ready: 将已到投递时间的消息从 pending 移动到 ready 中
|
||||
- ready2unack: 将一条等待投递的消息从 ready (或 retry) 移动到 unack 中,并把消息发送给消费者。
|
||||
- unack2retry: 将 unack 中未到重试次数上限的消息转移到 retry 中,已到重试次数上限的转移到 garbage 中等待后续清理。
|
||||
- ack: 从 unack 中删除处理成功的消息并清理它的 msgKey 和 retryCount 数据。
|
||||
- garbageCollect: 清理已到最大重试次数的消息。
|
||||
|
||||
|
||||
|
Reference in New Issue
Block a user