mirror of
https://github.com/HDT3213/delayqueue.git
synced 2025-09-27 11:32:06 +08:00
This commit is contained in:
@@ -235,6 +235,15 @@ But if consumption exceeded deadline, the message will be redelivered immediatel
|
|||||||
|
|
||||||
WithScriptPreload(true) makes DelayQueue preload scripts and call them using EvalSha to reduce communication costs. WithScriptPreload(false) makes DelayQueue run scripts by Eval commnand. Using preload and EvalSha by Default
|
WithScriptPreload(true) makes DelayQueue preload scripts and call them using EvalSha to reduce communication costs. WithScriptPreload(false) makes DelayQueue run scripts by Eval commnand. Using preload and EvalSha by Default
|
||||||
|
|
||||||
|
### Customize Prefix
|
||||||
|
|
||||||
|
```go
|
||||||
|
queue := delayqueue.NewQueue("example", redisCli, callback, UseCustomPrefix("MyPrefix"))
|
||||||
|
```
|
||||||
|
|
||||||
|
All keys of delayqueue has a smae prefix, `dp` by default. If you want to modify the prefix, you could use `UseCustomPrefix`.
|
||||||
|
|
||||||
|
|
||||||
## Monitoring
|
## Monitoring
|
||||||
|
|
||||||
We provides Monitor to monitor the running status.
|
We provides Monitor to monitor the running status.
|
||||||
|
@@ -226,6 +226,14 @@ WithScriptPreload(true) 会让 delayqueue 预上传脚本并使用 EvalSha 命
|
|||||||
|
|
||||||
ScriptPreload 默认值为 true.
|
ScriptPreload 默认值为 true.
|
||||||
|
|
||||||
|
### 自定义前缀
|
||||||
|
|
||||||
|
```go
|
||||||
|
queue := delayqueue.NewQueue("example", redisCli, callback, UseCustomPrefix("MyPrefix"))
|
||||||
|
```
|
||||||
|
|
||||||
|
delayqueue 中所有的 key 都有相同的前缀,默认情况下前缀为 `dp`。如果你需要自定义前缀可以使用 UseCustomPrefix 函数。
|
||||||
|
|
||||||
## 监控
|
## 监控
|
||||||
|
|
||||||
我们提供了 `Monitor` 来监控运行数据:
|
我们提供了 `Monitor` 来监控运行数据:
|
||||||
|
@@ -91,6 +91,7 @@ type Logger interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type hashTagKeyOpt int
|
type hashTagKeyOpt int
|
||||||
|
type prefixOpt string
|
||||||
|
|
||||||
// CallbackFunc receives and consumes messages
|
// CallbackFunc receives and consumes messages
|
||||||
// returns true to confirm successfully consumed, false to re-deliver this message
|
// returns true to confirm successfully consumed, false to re-deliver this message
|
||||||
@@ -104,6 +105,11 @@ func UseHashTagKey() interface{} {
|
|||||||
return hashTagKeyOpt(1)
|
return hashTagKeyOpt(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// UseCustomPrefix customize prefix to instead of default prefix "dp"
|
||||||
|
func UseCustomPrefix(prefix string) interface{} {
|
||||||
|
return prefixOpt(prefix)
|
||||||
|
}
|
||||||
|
|
||||||
// NewQueue0 creates a new queue, use DelayQueue.StartConsume to consume or DelayQueue.SendScheduleMsg to publish message
|
// NewQueue0 creates a new queue, use DelayQueue.StartConsume to consume or DelayQueue.SendScheduleMsg to publish message
|
||||||
// callback returns true to confirm successful consumption. If callback returns false or not return within maxConsumeDuration, DelayQueue will re-deliver this message
|
// callback returns true to confirm successful consumption. If callback returns false or not return within maxConsumeDuration, DelayQueue will re-deliver this message
|
||||||
func NewQueue0(name string, cli RedisCli, opts ...interface{}) *DelayQueue {
|
func NewQueue0(name string, cli RedisCli, opts ...interface{}) *DelayQueue {
|
||||||
@@ -113,21 +119,22 @@ func NewQueue0(name string, cli RedisCli, opts ...interface{}) *DelayQueue {
|
|||||||
if cli == nil {
|
if cli == nil {
|
||||||
panic("cli is required")
|
panic("cli is required")
|
||||||
}
|
}
|
||||||
|
prefix := "dp"
|
||||||
useHashTag := false
|
useHashTag := false
|
||||||
var callback CallbackFunc = nil
|
var callback CallbackFunc = nil
|
||||||
for _, opt := range opts {
|
for _, opt := range opts {
|
||||||
switch o := opt.(type) {
|
switch o := opt.(type) {
|
||||||
case hashTagKeyOpt:
|
case hashTagKeyOpt:
|
||||||
useHashTag = true
|
useHashTag = true
|
||||||
|
case prefixOpt:
|
||||||
|
prefix = string(o)
|
||||||
case CallbackFunc:
|
case CallbackFunc:
|
||||||
callback = o
|
callback = o
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
var keyPrefix string
|
keyPrefix := prefix + ":" + name
|
||||||
if useHashTag {
|
if useHashTag {
|
||||||
keyPrefix = "{dp:" + name + "}"
|
keyPrefix = "{" + keyPrefix + "}"
|
||||||
} else {
|
|
||||||
keyPrefix = "dp:" + name
|
|
||||||
}
|
}
|
||||||
return &DelayQueue{
|
return &DelayQueue{
|
||||||
name: name,
|
name: name,
|
||||||
|
@@ -5,6 +5,7 @@ import (
|
|||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
@@ -478,3 +479,32 @@ func TestDelayQueue_TryIntercept(t *testing.T) {
|
|||||||
t.Error("expect empty messages")
|
t.Error("expect empty messages")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestUseCustomPrefix(t *testing.T) {
|
||||||
|
redisCli := redis.NewClient(&redis.Options{
|
||||||
|
Addr: "127.0.0.1:6379",
|
||||||
|
})
|
||||||
|
cb := func(s string) bool {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
prefix := "MYQUEUE"
|
||||||
|
dp := NewQueue("test", redisCli, cb, UseCustomPrefix(prefix))
|
||||||
|
if !strings.HasPrefix(dp.pendingKey, prefix) {
|
||||||
|
t.Error("wrong prefix")
|
||||||
|
}
|
||||||
|
if !strings.HasPrefix(dp.readyKey, prefix) {
|
||||||
|
t.Error("wrong prefix")
|
||||||
|
}
|
||||||
|
if !strings.HasPrefix(dp.unAckKey, prefix) {
|
||||||
|
t.Error("wrong prefix")
|
||||||
|
}
|
||||||
|
if !strings.HasPrefix(dp.retryKey, prefix) {
|
||||||
|
t.Error("wrong prefix")
|
||||||
|
}
|
||||||
|
if !strings.HasPrefix(dp.retryCountKey, prefix) {
|
||||||
|
t.Error("wrong prefix")
|
||||||
|
}
|
||||||
|
if !strings.HasPrefix(dp.garbageKey, prefix) {
|
||||||
|
t.Error("wrong prefix")
|
||||||
|
}
|
||||||
|
}
|
Reference in New Issue
Block a user