From 705b9721d81fded9cfebd6d873aa44db2bf566c1 Mon Sep 17 00:00:00 2001 From: finley Date: Fri, 4 Oct 2024 15:27:59 +0800 Subject: [PATCH] preload scripts and use evalsha to call them --- README.md | 12 ++++++-- README_CN.md | 10 +++++- delayqueue.go | 77 +++++++++++++++++++++++++++++++++++++++++----- delayqueue_test.go | 50 ++++++++++++++++++++++++++++++ wrapper.go | 24 +++++++++++++++ 5 files changed, 161 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index 56be11e..59d649c 100644 --- a/README.md +++ b/README.md @@ -24,10 +24,12 @@ Core Advantages: DelayQueue requires a Go version with modules support. Run following command line in your project with go.mod: -``` +```bash go get github.com/hdt3213/delayqueue ``` +> if you are using `github.com/go-redis/redis/v8` please use `go get github.com/hdt3213/delayqueue@redisv8` + ## Get Started ```go @@ -69,8 +71,6 @@ func main() { } ``` -> if you are using github.com/go-redis/redis/v8 please use `go get github.com/hdt3213/delayqueue@redisv8` - > Please note that redis/v8 is not compatible with redis cluster 7.x. [detail](https://github.com/redis/go-redis/issues/2085) > If you are using redis client other than go-redis, you could wrap your redis client into [RedisCli](https://pkg.go.dev/github.com/hdt3213/delayqueue#RedisCli) interface @@ -167,6 +167,12 @@ WithDefaultRetryCount customizes the max number of retry, it effects of messages use WithRetryCount during DelayQueue.SendScheduleMsg or DelayQueue.SendDelayMsg to specific retry count of particular message +```go +(q *DelayQueue) WithScriptPreload(flag bool) *DelayQueue +``` + +WithScriptPreload(true) makes DelayQueue preload scripts and call them using EvalSha to reduce communication costs. WithScriptPreload(false) makes DelayQueue run scripts by Eval commnand. Using preload and EvalSha by Default + ## Monitoring We provides Monitor to monitor the running status. diff --git a/README_CN.md b/README_CN.md index f07e51b..dcaf255 100644 --- a/README_CN.md +++ b/README_CN.md @@ -23,7 +23,7 @@ DelayQueue 的主要优势: go get github.com/hdt3213/delayqueue ``` -> 如果您仍在使用 `github.com/go-redis/redis/v8` 请安装 `go get github.com/hdt3213/delayqueue@v8` +> 如果您仍在使用 `github.com/go-redis/redis/v8` 请安装 `go get github.com/hdt3213/delayqueue@redisv8` ## 开始使用 @@ -158,6 +158,14 @@ WithDefaultRetryCount(count uint) 在调用 DelayQueue.SendScheduleMsg or DelayQueue.SendDelayMsg 发送消息时,可以调用 WithRetryCount 为这条消息单独指定重试次数。 +```go +(q *DelayQueue) WithScriptPreload(flag bool) *DelayQueue +``` + +WithScriptPreload(true) 会让 delayqueue 预上传脚本并使用 EvalSha 命令调用脚本,WithScriptPreload(false) 会让 delayqueue 使用 Eval 命令运行脚本。 + +ScriptPreload 默认值为 true. + ## 监控 我们提供了 `Monitor` 来监控运行数据: diff --git a/delayqueue.go b/delayqueue.go index 402883c..f35e716 100644 --- a/delayqueue.go +++ b/delayqueue.go @@ -5,6 +5,8 @@ import ( "fmt" "log" "strconv" + "strings" + "sync" "sync/atomic" "time" @@ -35,7 +37,9 @@ type DelayQueue struct { fetchLimit uint // default no limit fetchCount int32 // actually running task number concurrent uint // default 1, executed serially - + sha1map map[string]string + sha1mapMu *sync.RWMutex + scriptPreload bool // for batch consume consumeBuffer chan string @@ -70,6 +74,12 @@ type RedisCli interface { // Subscribe used for monitor only // returns: payload channel, subscription closer, error; the subscription closer should close payload channel as well Subscribe(channel string) (payloads <-chan string, close func(), err error) + + // ScriptLoad call `script load` command + ScriptLoad(script string) (string, error) + // EvalSha run preload scripts + // If there is no preload scripts please return error with message "NOSCRIPT" + EvalSha(sha1 string, keys []string, args []interface{}) (interface{}, error) } type hashTagKeyOpt int @@ -129,6 +139,9 @@ func NewQueue0(name string, cli RedisCli, opts ...interface{}) *DelayQueue { defaultRetryCount: 3, fetchInterval: time.Second, concurrent: 1, + sha1map: make(map[string]string), + sha1mapMu: &sync.RWMutex{}, + scriptPreload: true, } } @@ -151,6 +164,12 @@ func (q *DelayQueue) WithFetchInterval(d time.Duration) *DelayQueue { return q } +// WithScriptPreload use script load command preload scripts to redis +func (q *DelayQueue) WithScriptPreload(flag bool) *DelayQueue { + q.scriptPreload = flag + return q +} + // WithMaxConsumeDuration customizes max consume duration // If no acknowledge received within WithMaxConsumeDuration after message delivery, DelayQueue will try to deliver this message again func (q *DelayQueue) WithMaxConsumeDuration(d time.Duration) *DelayQueue { @@ -245,6 +264,48 @@ func (q *DelayQueue) SendDelayMsg(payload string, duration time.Duration, opts . return q.SendScheduleMsg(payload, t, opts...) } +func (q *DelayQueue) loadScript(script string) (string, error) { + sha1, err := q.redisCli.ScriptLoad(script) + if err != nil { + return "", err + } + q.sha1mapMu.Lock() + q.sha1map[script] = sha1 + q.sha1mapMu.Unlock() + return sha1, nil +} + +func (q *DelayQueue) eval(script string, keys []string, args []interface{}) (interface{}, error) { + if !q.scriptPreload { + return q.redisCli.Eval(script, keys, args) + } + var err error + q.sha1mapMu.RLock() + sha1, ok := q.sha1map[script] + q.sha1mapMu.RUnlock() + if !ok { + sha1, err = q.loadScript(script) + if err != nil { + return nil, err + } + } + result, err := q.redisCli.EvalSha(sha1, keys, args) + if err == nil { + return result, err + } + // script not loaded, reload it + // It is possible to access a node in the cluster that has no pre-loaded scripts. + if strings.HasPrefix(err.Error(), "NOSCRIPT") { + sha1, err = q.loadScript(script) + if err != nil { + return nil, err + } + // try again + result, err = q.redisCli.EvalSha(sha1, keys, args) + } + return result, err +} + // pending2ReadyScript atomically moves messages from pending to ready // keys: pendingKey, readyKey // argv: currentTime @@ -270,7 +331,7 @@ return #msgs func (q *DelayQueue) pending2Ready() error { now := time.Now().Unix() keys := []string{q.pendingKey, q.readyKey} - raw, err := q.redisCli.Eval(pending2ReadyScript, keys, []interface{}{now}) + raw, err := q.eval(pending2ReadyScript, keys, []interface{}{now}) if err != nil && err != NilErr { return fmt.Errorf("pending2ReadyScript failed: %v", err) } @@ -294,7 +355,7 @@ return msg func (q *DelayQueue) ready2Unack() (string, error) { retryTime := time.Now().Add(q.maxConsumeDuration).Unix() keys := []string{q.readyKey, q.unAckKey} - ret, err := q.redisCli.Eval(ready2UnackScript, keys, []interface{}{retryTime}) + ret, err := q.eval(ready2UnackScript, keys, []interface{}{retryTime}) if err == NilErr { return "", err } @@ -312,7 +373,7 @@ func (q *DelayQueue) ready2Unack() (string, error) { func (q *DelayQueue) retry2Unack() (string, error) { retryTime := time.Now().Add(q.maxConsumeDuration).Unix() keys := []string{q.retryKey, q.unAckKey} - ret, err := q.redisCli.Eval(ready2UnackScript, keys, []interface{}{retryTime, q.retryKey, q.unAckKey}) + ret, err := q.eval(ready2UnackScript, keys, []interface{}{retryTime, q.retryKey, q.unAckKey}) if err == NilErr { return "", NilErr } @@ -429,7 +490,7 @@ return {retryMsgs, failMsgs} func (q *DelayQueue) unack2Retry() error { keys := []string{q.unAckKey, q.retryCountKey, q.retryKey, q.garbageKey} now := time.Now() - raw, err := q.redisCli.Eval(unack2RetryScript, keys, []interface{}{now.Unix()}) + raw, err := q.eval(unack2RetryScript, keys, []interface{}{now.Unix()}) if err != nil && err != NilErr { return fmt.Errorf("unack to retry script failed: %v", err) } @@ -545,9 +606,9 @@ func (q *DelayQueue) assertNotRunning() { } } -func (q *DelayQueue)goWithRecover(fn func()) { - go func () { - defer func () { +func (q *DelayQueue) goWithRecover(fn func()) { + go func() { + defer func() { if err := recover(); err != nil { q.logger.Printf("panic: %v\n", err) } diff --git a/delayqueue_test.go b/delayqueue_test.go index be97cf4..b35344c 100644 --- a/delayqueue_test.go +++ b/delayqueue_test.go @@ -340,4 +340,54 @@ func TestDelayQueue_FetchLimit(t *testing.T) { if len(ids3) == 0 { t.Error("should get some messages, after consumption") } +} + +func TestDelayQueue_ScriptPreload(t *testing.T) { + redisCli := redis.NewClient(&redis.Options{ + Addr: "127.0.0.1:6379", + }) + redisCli.FlushDB(context.Background()) + size := 101 // use a prime number may found some hidden bugs ^_^ + retryCount := 3 + mu := sync.Mutex{} + deliveryCount := make(map[string]int) + cb := func(s string) bool { + mu.Lock() + deliveryCount[s]++ + mu.Unlock() + return true + } + queue := NewQueue("test", redisCli, cb). + WithFetchInterval(time.Millisecond * 50). + WithMaxConsumeDuration(0). + WithLogger(log.New(os.Stderr, "[DelayQueue]", log.LstdFlags)). + WithConcurrent(4). + WithScriptPreload(true) + + for i := 0; i < size; i++ { + err := queue.SendDelayMsg(strconv.Itoa(i), 0, WithRetryCount(retryCount), WithMsgTTL(time.Hour)) + if err != nil { + t.Error(err) + } + } + for i := 0; i < 2*size; i++ { + if i == 2 { + // random clean script cache + redisCli.ScriptFlush(context.Background()) + } + ids, err := queue.beforeConsume() + if err != nil { + t.Errorf("consume error: %v", err) + return + } + for _, id := range ids { + queue.callback(id) + } + queue.afterConsume() + } + for k, v := range deliveryCount { + if v != 1 { + t.Errorf("expect 1 delivery, actual %d. key: %s", v, k) + } + } } \ No newline at end of file diff --git a/wrapper.go b/wrapper.go index fd42644..261efaa 100644 --- a/wrapper.go +++ b/wrapper.go @@ -132,6 +132,18 @@ func (r *redisV9Wrapper) Subscribe(channel string) (<-chan string, func(), error return resultChan, close, nil } +func (r *redisV9Wrapper) EvalSha(sha1 string, keys []string, args []interface{}) (interface{}, error) { + ctx := context.Background() + ret, err := r.inner.EvalSha(ctx, sha1, keys, args...).Result() + return ret, wrapErr(err) +} + +func (r *redisV9Wrapper) ScriptLoad(script string) (string, error) { + ctx := context.Background() + sha1, err := r.inner.ScriptLoad(ctx, script).Result() + return sha1, wrapErr(err) +} + type redisClusterWrapper struct { inner *redis.ClusterClient } @@ -235,6 +247,18 @@ func (r *redisClusterWrapper) Subscribe(channel string) (<-chan string, func(), return resultChan, close, nil } +func (r *redisClusterWrapper) EvalSha(sha1 string, keys []string, args []interface{}) (interface{}, error) { + ctx := context.Background() + ret, err := r.inner.EvalSha(ctx, sha1, keys, args...).Result() + return ret, wrapErr(err) +} + +func (r *redisClusterWrapper) ScriptLoad(script string) (string, error) { + ctx := context.Background() + sha1, err := r.inner.ScriptLoad(ctx, script).Result() + return sha1, wrapErr(err) +} + func NewQueueOnCluster(name string, cli *redis.ClusterClient, opts ...interface{}) *DelayQueue { rc := &redisClusterWrapper{ inner: cli,