From 7816eb01de2cbc9b6d0a0b27c84344b39f5d3f4b Mon Sep 17 00:00:00 2001 From: finley Date: Wed, 26 Feb 2025 20:52:59 +0800 Subject: [PATCH] support custom prefix --- README.md | 9 +++++++++ README_CN.md | 8 ++++++++ delayqueue.go | 17 ++++++++++++----- delayqueue_test.go | 30 ++++++++++++++++++++++++++++++ 4 files changed, 59 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index dd8c9ad..5692a02 100644 --- a/README.md +++ b/README.md @@ -235,6 +235,15 @@ But if consumption exceeded deadline, the message will be redelivered immediatel WithScriptPreload(true) makes DelayQueue preload scripts and call them using EvalSha to reduce communication costs. WithScriptPreload(false) makes DelayQueue run scripts by Eval commnand. Using preload and EvalSha by Default +### Customize Prefix + +```go +queue := delayqueue.NewQueue("example", redisCli, callback, UseCustomPrefix("MyPrefix")) +``` + +All keys of delayqueue has a smae prefix, `dp` by default. If you want to modify the prefix, you could use `UseCustomPrefix`. + + ## Monitoring We provides Monitor to monitor the running status. diff --git a/README_CN.md b/README_CN.md index 0bffee2..bbda182 100644 --- a/README_CN.md +++ b/README_CN.md @@ -226,6 +226,14 @@ WithScriptPreload(true) 会让 delayqueue 预上传脚本并使用 EvalSha 命 ScriptPreload 默认值为 true. +### 自定义前缀 + +```go +queue := delayqueue.NewQueue("example", redisCli, callback, UseCustomPrefix("MyPrefix")) +``` + +delayqueue 中所有的 key 都有相同的前缀,默认情况下前缀为 `dp`。如果你需要自定义前缀可以使用 UseCustomPrefix 函数。 + ## 监控 我们提供了 `Monitor` 来监控运行数据: diff --git a/delayqueue.go b/delayqueue.go index a453cd0..491e77f 100644 --- a/delayqueue.go +++ b/delayqueue.go @@ -91,6 +91,7 @@ type Logger interface { } type hashTagKeyOpt int +type prefixOpt string // CallbackFunc receives and consumes messages // returns true to confirm successfully consumed, false to re-deliver this message @@ -104,6 +105,11 @@ func UseHashTagKey() interface{} { return hashTagKeyOpt(1) } +// UseCustomPrefix customize prefix to instead of default prefix "dp" +func UseCustomPrefix(prefix string) interface{} { + return prefixOpt(prefix) +} + // NewQueue0 creates a new queue, use DelayQueue.StartConsume to consume or DelayQueue.SendScheduleMsg to publish message // callback returns true to confirm successful consumption. If callback returns false or not return within maxConsumeDuration, DelayQueue will re-deliver this message func NewQueue0(name string, cli RedisCli, opts ...interface{}) *DelayQueue { @@ -113,22 +119,23 @@ func NewQueue0(name string, cli RedisCli, opts ...interface{}) *DelayQueue { if cli == nil { panic("cli is required") } + prefix := "dp" useHashTag := false var callback CallbackFunc = nil for _, opt := range opts { switch o := opt.(type) { case hashTagKeyOpt: useHashTag = true + case prefixOpt: + prefix = string(o) case CallbackFunc: callback = o } } - var keyPrefix string + keyPrefix := prefix + ":" + name if useHashTag { - keyPrefix = "{dp:" + name + "}" - } else { - keyPrefix = "dp:" + name - } + keyPrefix = "{" + keyPrefix + "}" + } return &DelayQueue{ name: name, redisCli: cli, diff --git a/delayqueue_test.go b/delayqueue_test.go index 7ed43cd..b59b7cf 100644 --- a/delayqueue_test.go +++ b/delayqueue_test.go @@ -5,6 +5,7 @@ import ( "log" "os" "strconv" + "strings" "sync" "testing" "time" @@ -478,3 +479,32 @@ func TestDelayQueue_TryIntercept(t *testing.T) { t.Error("expect empty messages") } } + +func TestUseCustomPrefix(t *testing.T) { + redisCli := redis.NewClient(&redis.Options{ + Addr: "127.0.0.1:6379", + }) + cb := func(s string) bool { + return false + } + prefix := "MYQUEUE" + dp := NewQueue("test", redisCli, cb, UseCustomPrefix(prefix)) + if !strings.HasPrefix(dp.pendingKey, prefix) { + t.Error("wrong prefix") + } + if !strings.HasPrefix(dp.readyKey, prefix) { + t.Error("wrong prefix") + } + if !strings.HasPrefix(dp.unAckKey, prefix) { + t.Error("wrong prefix") + } + if !strings.HasPrefix(dp.retryKey, prefix) { + t.Error("wrong prefix") + } + if !strings.HasPrefix(dp.retryCountKey, prefix) { + t.Error("wrong prefix") + } + if !strings.HasPrefix(dp.garbageKey, prefix) { + t.Error("wrong prefix") + } +} \ No newline at end of file