From 8dd18cdb150cbb5afc45bf9c95e14726c4fd13f5 Mon Sep 17 00:00:00 2001
From: finley
Date: Tue, 28 Jan 2025 19:33:47 +0800
Subject: [PATCH] add more tests

---
 delayqueue.go      |   1 -
 delayqueue_test.go |  54 ++--------------------
 monitor.go         |  10 +++-
 monitor_test.go    | 114 +++++++++++++++++++++++++++++++++++++++++++++-
 4 files changed, 125 insertions(+), 54 deletions(-)

diff --git a/delayqueue.go b/delayqueue.go
index 6d11bf1..a3ebd10 100644
--- a/delayqueue.go
+++ b/delayqueue.go
@@ -514,7 +514,6 @@ func (q *DelayQueue) ack(idStr string) error {
 
 func (q *DelayQueue) nack(idStr string) error {
     atomic.AddInt32(&q.fetchCount, -1)
-    // update retry time as now, unack2Retry will move it to retry immediately
     err := q.redisCli.ZAdd(q.unAckKey, map[string]float64{
         idStr: float64(time.Now().Add(q.nackRedeliveryDelay).Unix()),
     })
diff --git a/delayqueue_test.go b/delayqueue_test.go
index 846e4f6..7ed43cd 100644
--- a/delayqueue_test.go
+++ b/delayqueue_test.go
@@ -102,6 +102,7 @@ func TestDelayQueueOnCluster(t *testing.T) {
         }
         queue.afterConsume()
     }
+    queue.garbageCollect()
     if succeed != size {
         t.Error("msg not consumed")
     }
@@ -126,7 +127,8 @@ func TestDelayQueue_ConcurrentConsume(t *testing.T) {
         WithFetchInterval(time.Millisecond * 50).
         WithMaxConsumeDuration(0).
         WithLogger(log.New(os.Stderr, "[DelayQueue]", log.LstdFlags)).
-        WithConcurrent(4)
+        WithConcurrent(4).
+        WithScriptPreload(false)
 
     for i := 0; i < size; i++ {
         err := queue.SendDelayMsg(strconv.Itoa(i), 0, WithRetryCount(retryCount), WithMsgTTL(time.Hour))
@@ -341,56 +343,6 @@ func TestDelayQueue_FetchLimit(t *testing.T) {
     }
 }
 
-func TestDelayQueue_ScriptPreload(t *testing.T) {
-    redisCli := redis.NewClient(&redis.Options{
-        Addr: "127.0.0.1:6379",
-    })
-    redisCli.FlushDB(context.Background())
-    size := 101 // use a prime number may found some hidden bugs ^_^
-    retryCount := 3
-    mu := sync.Mutex{}
-    deliveryCount := make(map[string]int)
-    cb := func(s string) bool {
-        mu.Lock()
-        deliveryCount[s]++
-        mu.Unlock()
-        return true
-    }
-    queue := NewQueue("test", redisCli, cb).
-        WithFetchInterval(time.Millisecond * 50).
-        WithMaxConsumeDuration(0).
-        WithLogger(log.New(os.Stderr, "[DelayQueue]", log.LstdFlags)).
-        WithConcurrent(4).
-        WithScriptPreload(true)
-
-    for i := 0; i < size; i++ {
-        err := queue.SendDelayMsg(strconv.Itoa(i), 0, WithRetryCount(retryCount), WithMsgTTL(time.Hour))
-        if err != nil {
-            t.Error(err)
-        }
-    }
-    for i := 0; i < 2*size; i++ {
-        if i == 2 {
-            // random clean script cache
-            redisCli.ScriptFlush(context.Background())
-        }
-        ids, err := queue.beforeConsume()
-        if err != nil {
-            t.Errorf("consume error: %v", err)
-            return
-        }
-        for _, id := range ids {
-            queue.callback(id)
-        }
-        queue.afterConsume()
-    }
-    for k, v := range deliveryCount {
-        if v != 1 {
-            t.Errorf("expect 1 delivery, actual %d. key: %s", v, k)
-        }
-    }
-}
-
 func TestDelayQueue_NackRedeliveryDelay(t *testing.T) {
     redisCli := redis.NewClient(&redis.Options{
         Addr: "127.0.0.1:6379",
diff --git a/monitor.go b/monitor.go
index f9f7102..6c6cf32 100644
--- a/monitor.go
+++ b/monitor.go
@@ -18,7 +18,7 @@ func NewMonitor0(name string, cli RedisCli, opts ...interface{}) *Monitor {
     }
 }
 
-// NewPublisher creates a new Publisher by a *redis.Client
+// NewMonitor creates a new Monitor from a *redis.Client
 func NewMonitor(name string, cli *redis.Client, opts ...interface{}) *Monitor {
     rc := &redisV9Wrapper{
         inner: cli,
@@ -26,6 +26,14 @@ func NewMonitor(name string, cli *redis.Client, opts ...interface{}) *Monitor {
     return NewMonitor0(name, rc, opts...)
 }
 
+// NewMonitorOnCluster creates a new Monitor from a *redis.ClusterClient
+func NewMonitorOnCluster(name string, cli *redis.ClusterClient, opts ...interface{}) *Monitor {
+    rc := &redisClusterWrapper{
+        inner: cli,
+    }
+    return NewMonitor0(name, rc, opts...)
+}
+
 // WithLogger customizes logger for queue
 func (m *Monitor) WithLogger(logger *log.Logger) *Monitor {
     m.inner.logger = logger
diff --git a/monitor_test.go b/monitor_test.go
index 445fc01..61aa732 100644
--- a/monitor_test.go
+++ b/monitor_test.go
@@ -10,7 +10,7 @@ import (
     "github.com/redis/go-redis/v9"
 )
 
-func TestMonitor_get_status(t *testing.T) {
+func TestMonitor_GetStatus(t *testing.T) {
     redisCli := redis.NewClient(&redis.Options{
         Addr: "127.0.0.1:6379",
     })
@@ -72,6 +72,72 @@ func TestMonitor_get_status(t *testing.T) {
     }
 }
 
+func TestMonitor_Cluster_GetStatus(t *testing.T) {
+    redisCli := redis.NewClusterClient(&redis.ClusterOptions{
+        Addrs: []string{
+            "127.0.0.1:7000",
+            "127.0.0.1:7001",
+            "127.0.0.1:7002",
+        },
+    })
+    redisCli.FlushDB(context.Background())
+    size := 1000
+    cb := func(s string) bool {
+        return true
+    }
+    logger := log.New(os.Stderr, "[DelayQueue]", log.LstdFlags)
+    queue := NewQueueOnCluster("test", redisCli, cb)
+    monitor := NewMonitorOnCluster("test", redisCli).WithLogger(logger)
+
+    for i := 0; i < size; i++ {
+        err := queue.SendDelayMsg(strconv.Itoa(i), 0)
+        if err != nil {
+            t.Error(err)
+        }
+    }
+
+    // test pending count
+    pending, err := monitor.GetPendingCount()
+    if err != nil {
+        t.Error(err)
+        return
+    }
+    if int(pending) != size {
+        t.Errorf("expecting %d, got %d", size, int(pending))
+        return
+    }
+
+    // test ready count
+    err = queue.pending2Ready()
+    if err != nil {
+        t.Errorf("consume error: %v", err)
+        return
+    }
+    ready, err := monitor.GetReadyCount()
+    if err != nil {
+        t.Error(err)
+        return
+    }
+    if int(ready) != size {
+        t.Errorf("expecting %d, got %d", size, int(ready))
+        return
+    }
+
+    // test processing count
+    for i := 0; i < size/2; i++ {
+        _, _ = queue.ready2Unack()
+    }
+    processing, err := monitor.GetProcessingCount()
+    if err != nil {
+        t.Error(err)
+        return
+    }
+    if int(processing) != size/2 {
+        t.Errorf("expecting %d, got %d", size/2, int(processing))
+        return
+    }
+}
+
 type MyProfiler struct {
     ProduceCount int
     DeliverCount int
@@ -137,6 +203,52 @@ func TestMonitor_listener1(t *testing.T) {
     }
 }
 
+func TestMonitor_Cluster_listener1(t *testing.T) {
+    redisCli := redis.NewClusterClient(&redis.ClusterOptions{
+        Addrs: []string{
+            "127.0.0.1:7000",
+            "127.0.0.1:7001",
+            "127.0.0.1:7002",
+        },
+    })
+    redisCli.FlushDB(context.Background())
+    size := 1000
+    cb := func(s string) bool {
+        return true
+    }
+    queue := NewQueueOnCluster("test", redisCli, cb)
+    queue.EnableReport()
+    monitor := NewMonitorOnCluster("test", redisCli)
+    profile := &MyProfiler{}
+    monitor.ListenEvent(profile)
+
+    for i := 0; i < size; i++ {
+        err := queue.SendDelayMsg(strconv.Itoa(i), 0)
+        if err != nil {
+            t.Error(err)
+        }
+    }
+    ids, err := queue.beforeConsume()
+    if err != nil {
+        t.Errorf("consume error: %v", err)
+        return
+    }
+    for _, id := range ids {
+        queue.callback(id)
+    }
+    queue.afterConsume()
+
+    if profile.ProduceCount != size {
+        t.Error("wrong produce count")
+    }
+    if profile.DeliverCount != size {
+        t.Error("wrong deliver count")
+    }
+    if profile.ConsumeCount != size {
+        t.Error("wrong consume count")
+    }
+}
+
 func TestMonitor_listener2(t *testing.T) {
     redisCli := redis.NewClient(&redis.Options{
         Addr: "127.0.0.1:6379",
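
Note (not part of the patch): NewMonitorOnCluster mirrors NewMonitor, only swapping redisV9Wrapper for redisClusterWrapper around a *redis.ClusterClient. Below is a minimal usage sketch of the new constructor outside the test suite. It assumes the module import path github.com/hdt3213/delayqueue, a local Redis Cluster on the same addresses the new tests use (127.0.0.1:7000-7002), and an arbitrary queue name "example"; the three counter getters are the ones exercised by TestMonitor_Cluster_GetStatus.

    package main

    import (
        "fmt"
        "log"
        "os"

        "github.com/hdt3213/delayqueue"
        "github.com/redis/go-redis/v9"
    )

    func main() {
        // assumed local Redis Cluster, same addresses as the tests in this patch
        redisCli := redis.NewClusterClient(&redis.ClusterOptions{
            Addrs: []string{"127.0.0.1:7000", "127.0.0.1:7001", "127.0.0.1:7002"},
        })

        logger := log.New(os.Stderr, "[DelayQueue]", log.LstdFlags)
        // wrap the cluster client; "example" is a hypothetical queue name
        monitor := delayqueue.NewMonitorOnCluster("example", redisCli).WithLogger(logger)

        // read the three counters covered by TestMonitor_Cluster_GetStatus
        pending, err := monitor.GetPendingCount()
        if err != nil {
            logger.Fatal(err)
        }
        ready, err := monitor.GetReadyCount()
        if err != nil {
            logger.Fatal(err)
        }
        processing, err := monitor.GetProcessingCount()
        if err != nil {
            logger.Fatal(err)
        }
        fmt.Printf("pending=%d ready=%d processing=%d\n", pending, ready, processing)
    }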