mirror of
https://github.com/HDT3213/delayqueue.git
synced 2025-10-05 07:06:53 +08:00
preload scripts and use evalsha to call them
This commit is contained in:
12
README.md
12
README.md
@@ -24,10 +24,12 @@ Core Advantages:
|
||||
|
||||
DelayQueue requires a Go version with modules support. Run following command line in your project with go.mod:
|
||||
|
||||
```
|
||||
```bash
|
||||
go get github.com/hdt3213/delayqueue
|
||||
```
|
||||
|
||||
> if you are using `github.com/go-redis/redis/v8` please use `go get github.com/hdt3213/delayqueue@redisv8`
|
||||
|
||||
## Get Started
|
||||
|
||||
```go
|
||||
@@ -69,8 +71,6 @@ func main() {
|
||||
}
|
||||
```
|
||||
|
||||
> if you are using github.com/go-redis/redis/v8 please use `go get github.com/hdt3213/delayqueue@redisv8`
|
||||
|
||||
> Please note that redis/v8 is not compatible with redis cluster 7.x. [detail](https://github.com/redis/go-redis/issues/2085)
|
||||
|
||||
> If you are using redis client other than go-redis, you could wrap your redis client into [RedisCli](https://pkg.go.dev/github.com/hdt3213/delayqueue#RedisCli) interface
|
||||
@@ -167,6 +167,12 @@ WithDefaultRetryCount customizes the max number of retry, it effects of messages
|
||||
|
||||
use WithRetryCount during DelayQueue.SendScheduleMsg or DelayQueue.SendDelayMsg to specific retry count of particular message
|
||||
|
||||
```go
|
||||
(q *DelayQueue) WithScriptPreload(flag bool) *DelayQueue
|
||||
```
|
||||
|
||||
WithScriptPreload(true) makes DelayQueue preload scripts and call them using EvalSha to reduce communication costs. WithScriptPreload(false) makes DelayQueue run scripts by Eval commnand. Using preload and EvalSha by Default
|
||||
|
||||
## Monitoring
|
||||
|
||||
We provides Monitor to monitor the running status.
|
||||
|
10
README_CN.md
10
README_CN.md
@@ -23,7 +23,7 @@ DelayQueue 的主要优势:
|
||||
go get github.com/hdt3213/delayqueue
|
||||
```
|
||||
|
||||
> 如果您仍在使用 `github.com/go-redis/redis/v8` 请安装 `go get github.com/hdt3213/delayqueue@v8`
|
||||
> 如果您仍在使用 `github.com/go-redis/redis/v8` 请安装 `go get github.com/hdt3213/delayqueue@redisv8`
|
||||
|
||||
## 开始使用
|
||||
|
||||
@@ -158,6 +158,14 @@ WithDefaultRetryCount(count uint)
|
||||
|
||||
在调用 DelayQueue.SendScheduleMsg or DelayQueue.SendDelayMsg 发送消息时,可以调用 WithRetryCount 为这条消息单独指定重试次数。
|
||||
|
||||
```go
|
||||
(q *DelayQueue) WithScriptPreload(flag bool) *DelayQueue
|
||||
```
|
||||
|
||||
WithScriptPreload(true) 会让 delayqueue 预上传脚本并使用 EvalSha 命令调用脚本,WithScriptPreload(false) 会让 delayqueue 使用 Eval 命令运行脚本。
|
||||
|
||||
ScriptPreload 默认值为 true.
|
||||
|
||||
## 监控
|
||||
|
||||
我们提供了 `Monitor` 来监控运行数据:
|
||||
|
@@ -5,6 +5,8 @@ import (
|
||||
"fmt"
|
||||
"log"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
@@ -35,7 +37,9 @@ type DelayQueue struct {
|
||||
fetchLimit uint // default no limit
|
||||
fetchCount int32 // actually running task number
|
||||
concurrent uint // default 1, executed serially
|
||||
|
||||
sha1map map[string]string
|
||||
sha1mapMu *sync.RWMutex
|
||||
scriptPreload bool
|
||||
// for batch consume
|
||||
consumeBuffer chan string
|
||||
|
||||
@@ -70,6 +74,12 @@ type RedisCli interface {
|
||||
// Subscribe used for monitor only
|
||||
// returns: payload channel, subscription closer, error; the subscription closer should close payload channel as well
|
||||
Subscribe(channel string) (payloads <-chan string, close func(), err error)
|
||||
|
||||
// ScriptLoad call `script load` command
|
||||
ScriptLoad(script string) (string, error)
|
||||
// EvalSha run preload scripts
|
||||
// If there is no preload scripts please return error with message "NOSCRIPT"
|
||||
EvalSha(sha1 string, keys []string, args []interface{}) (interface{}, error)
|
||||
}
|
||||
|
||||
type hashTagKeyOpt int
|
||||
@@ -129,6 +139,9 @@ func NewQueue0(name string, cli RedisCli, opts ...interface{}) *DelayQueue {
|
||||
defaultRetryCount: 3,
|
||||
fetchInterval: time.Second,
|
||||
concurrent: 1,
|
||||
sha1map: make(map[string]string),
|
||||
sha1mapMu: &sync.RWMutex{},
|
||||
scriptPreload: true,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -151,6 +164,12 @@ func (q *DelayQueue) WithFetchInterval(d time.Duration) *DelayQueue {
|
||||
return q
|
||||
}
|
||||
|
||||
// WithScriptPreload use script load command preload scripts to redis
|
||||
func (q *DelayQueue) WithScriptPreload(flag bool) *DelayQueue {
|
||||
q.scriptPreload = flag
|
||||
return q
|
||||
}
|
||||
|
||||
// WithMaxConsumeDuration customizes max consume duration
|
||||
// If no acknowledge received within WithMaxConsumeDuration after message delivery, DelayQueue will try to deliver this message again
|
||||
func (q *DelayQueue) WithMaxConsumeDuration(d time.Duration) *DelayQueue {
|
||||
@@ -245,6 +264,48 @@ func (q *DelayQueue) SendDelayMsg(payload string, duration time.Duration, opts .
|
||||
return q.SendScheduleMsg(payload, t, opts...)
|
||||
}
|
||||
|
||||
func (q *DelayQueue) loadScript(script string) (string, error) {
|
||||
sha1, err := q.redisCli.ScriptLoad(script)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
q.sha1mapMu.Lock()
|
||||
q.sha1map[script] = sha1
|
||||
q.sha1mapMu.Unlock()
|
||||
return sha1, nil
|
||||
}
|
||||
|
||||
func (q *DelayQueue) eval(script string, keys []string, args []interface{}) (interface{}, error) {
|
||||
if !q.scriptPreload {
|
||||
return q.redisCli.Eval(script, keys, args)
|
||||
}
|
||||
var err error
|
||||
q.sha1mapMu.RLock()
|
||||
sha1, ok := q.sha1map[script]
|
||||
q.sha1mapMu.RUnlock()
|
||||
if !ok {
|
||||
sha1, err = q.loadScript(script)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
result, err := q.redisCli.EvalSha(sha1, keys, args)
|
||||
if err == nil {
|
||||
return result, err
|
||||
}
|
||||
// script not loaded, reload it
|
||||
// It is possible to access a node in the cluster that has no pre-loaded scripts.
|
||||
if strings.HasPrefix(err.Error(), "NOSCRIPT") {
|
||||
sha1, err = q.loadScript(script)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// try again
|
||||
result, err = q.redisCli.EvalSha(sha1, keys, args)
|
||||
}
|
||||
return result, err
|
||||
}
|
||||
|
||||
// pending2ReadyScript atomically moves messages from pending to ready
|
||||
// keys: pendingKey, readyKey
|
||||
// argv: currentTime
|
||||
@@ -270,7 +331,7 @@ return #msgs
|
||||
func (q *DelayQueue) pending2Ready() error {
|
||||
now := time.Now().Unix()
|
||||
keys := []string{q.pendingKey, q.readyKey}
|
||||
raw, err := q.redisCli.Eval(pending2ReadyScript, keys, []interface{}{now})
|
||||
raw, err := q.eval(pending2ReadyScript, keys, []interface{}{now})
|
||||
if err != nil && err != NilErr {
|
||||
return fmt.Errorf("pending2ReadyScript failed: %v", err)
|
||||
}
|
||||
@@ -294,7 +355,7 @@ return msg
|
||||
func (q *DelayQueue) ready2Unack() (string, error) {
|
||||
retryTime := time.Now().Add(q.maxConsumeDuration).Unix()
|
||||
keys := []string{q.readyKey, q.unAckKey}
|
||||
ret, err := q.redisCli.Eval(ready2UnackScript, keys, []interface{}{retryTime})
|
||||
ret, err := q.eval(ready2UnackScript, keys, []interface{}{retryTime})
|
||||
if err == NilErr {
|
||||
return "", err
|
||||
}
|
||||
@@ -312,7 +373,7 @@ func (q *DelayQueue) ready2Unack() (string, error) {
|
||||
func (q *DelayQueue) retry2Unack() (string, error) {
|
||||
retryTime := time.Now().Add(q.maxConsumeDuration).Unix()
|
||||
keys := []string{q.retryKey, q.unAckKey}
|
||||
ret, err := q.redisCli.Eval(ready2UnackScript, keys, []interface{}{retryTime, q.retryKey, q.unAckKey})
|
||||
ret, err := q.eval(ready2UnackScript, keys, []interface{}{retryTime, q.retryKey, q.unAckKey})
|
||||
if err == NilErr {
|
||||
return "", NilErr
|
||||
}
|
||||
@@ -429,7 +490,7 @@ return {retryMsgs, failMsgs}
|
||||
func (q *DelayQueue) unack2Retry() error {
|
||||
keys := []string{q.unAckKey, q.retryCountKey, q.retryKey, q.garbageKey}
|
||||
now := time.Now()
|
||||
raw, err := q.redisCli.Eval(unack2RetryScript, keys, []interface{}{now.Unix()})
|
||||
raw, err := q.eval(unack2RetryScript, keys, []interface{}{now.Unix()})
|
||||
if err != nil && err != NilErr {
|
||||
return fmt.Errorf("unack to retry script failed: %v", err)
|
||||
}
|
||||
@@ -545,9 +606,9 @@ func (q *DelayQueue) assertNotRunning() {
|
||||
}
|
||||
}
|
||||
|
||||
func (q *DelayQueue)goWithRecover(fn func()) {
|
||||
go func () {
|
||||
defer func () {
|
||||
func (q *DelayQueue) goWithRecover(fn func()) {
|
||||
go func() {
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
q.logger.Printf("panic: %v\n", err)
|
||||
}
|
||||
|
@@ -340,4 +340,54 @@ func TestDelayQueue_FetchLimit(t *testing.T) {
|
||||
if len(ids3) == 0 {
|
||||
t.Error("should get some messages, after consumption")
|
||||
}
|
||||
}
|
||||
|
||||
func TestDelayQueue_ScriptPreload(t *testing.T) {
|
||||
redisCli := redis.NewClient(&redis.Options{
|
||||
Addr: "127.0.0.1:6379",
|
||||
})
|
||||
redisCli.FlushDB(context.Background())
|
||||
size := 101 // use a prime number may found some hidden bugs ^_^
|
||||
retryCount := 3
|
||||
mu := sync.Mutex{}
|
||||
deliveryCount := make(map[string]int)
|
||||
cb := func(s string) bool {
|
||||
mu.Lock()
|
||||
deliveryCount[s]++
|
||||
mu.Unlock()
|
||||
return true
|
||||
}
|
||||
queue := NewQueue("test", redisCli, cb).
|
||||
WithFetchInterval(time.Millisecond * 50).
|
||||
WithMaxConsumeDuration(0).
|
||||
WithLogger(log.New(os.Stderr, "[DelayQueue]", log.LstdFlags)).
|
||||
WithConcurrent(4).
|
||||
WithScriptPreload(true)
|
||||
|
||||
for i := 0; i < size; i++ {
|
||||
err := queue.SendDelayMsg(strconv.Itoa(i), 0, WithRetryCount(retryCount), WithMsgTTL(time.Hour))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
for i := 0; i < 2*size; i++ {
|
||||
if i == 2 {
|
||||
// random clean script cache
|
||||
redisCli.ScriptFlush(context.Background())
|
||||
}
|
||||
ids, err := queue.beforeConsume()
|
||||
if err != nil {
|
||||
t.Errorf("consume error: %v", err)
|
||||
return
|
||||
}
|
||||
for _, id := range ids {
|
||||
queue.callback(id)
|
||||
}
|
||||
queue.afterConsume()
|
||||
}
|
||||
for k, v := range deliveryCount {
|
||||
if v != 1 {
|
||||
t.Errorf("expect 1 delivery, actual %d. key: %s", v, k)
|
||||
}
|
||||
}
|
||||
}
|
24
wrapper.go
24
wrapper.go
@@ -132,6 +132,18 @@ func (r *redisV9Wrapper) Subscribe(channel string) (<-chan string, func(), error
|
||||
return resultChan, close, nil
|
||||
}
|
||||
|
||||
func (r *redisV9Wrapper) EvalSha(sha1 string, keys []string, args []interface{}) (interface{}, error) {
|
||||
ctx := context.Background()
|
||||
ret, err := r.inner.EvalSha(ctx, sha1, keys, args...).Result()
|
||||
return ret, wrapErr(err)
|
||||
}
|
||||
|
||||
func (r *redisV9Wrapper) ScriptLoad(script string) (string, error) {
|
||||
ctx := context.Background()
|
||||
sha1, err := r.inner.ScriptLoad(ctx, script).Result()
|
||||
return sha1, wrapErr(err)
|
||||
}
|
||||
|
||||
type redisClusterWrapper struct {
|
||||
inner *redis.ClusterClient
|
||||
}
|
||||
@@ -235,6 +247,18 @@ func (r *redisClusterWrapper) Subscribe(channel string) (<-chan string, func(),
|
||||
return resultChan, close, nil
|
||||
}
|
||||
|
||||
func (r *redisClusterWrapper) EvalSha(sha1 string, keys []string, args []interface{}) (interface{}, error) {
|
||||
ctx := context.Background()
|
||||
ret, err := r.inner.EvalSha(ctx, sha1, keys, args...).Result()
|
||||
return ret, wrapErr(err)
|
||||
}
|
||||
|
||||
func (r *redisClusterWrapper) ScriptLoad(script string) (string, error) {
|
||||
ctx := context.Background()
|
||||
sha1, err := r.inner.ScriptLoad(ctx, script).Result()
|
||||
return sha1, wrapErr(err)
|
||||
}
|
||||
|
||||
func NewQueueOnCluster(name string, cli *redis.ClusterClient, opts ...interface{}) *DelayQueue {
|
||||
rc := &redisClusterWrapper{
|
||||
inner: cli,
|
||||
|
Reference in New Issue
Block a user