Mirror of https://github.com/HDT3213/delayqueue.git (synced 2025-09-27 03:26:05 +08:00)

Commit: add more tests
@@ -514,7 +514,6 @@ func (q *DelayQueue) ack(idStr string) error {

func (q *DelayQueue) nack(idStr string) error {
	atomic.AddInt32(&q.fetchCount, -1)
	// update retry time as now, unack2Retry will move it to retry immediately
	err := q.redisCli.ZAdd(q.unAckKey, map[string]float64{
		idStr: float64(time.Now().Add(q.nackRedeliveryDelay).Unix()),
	})
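For orientation: a consumer triggers this nack path by returning false from its callback, and with the change above the message is rescheduled for now + nackRedeliveryDelay instead of being retried immediately. Below is a minimal sketch of the callback side, written in the same style as the tests in this commit (handle is a hypothetical user function; the public option for tuning the redelivery delay is not shown in this hunk):

// Sketch only: returning false from the callback nacks the message, so the
// queue keeps it in the unack set and redelivers it after the nack delay.
cb := func(payload string) bool {
	if err := handle(payload); err != nil { // handle is hypothetical
		return false // nack: retried later
	}
	return true // ack: message is done
}
queue := NewQueue("example", redisCli, cb).
	WithFetchInterval(time.Millisecond * 50)
_ = queue.SendDelayMsg("hello", 0, WithRetryCount(3))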
@@ -102,6 +102,7 @@ func TestDelayQueueOnCluster(t *testing.T) {
		}
		queue.afterConsume()
	}
	queue.garbageCollect()
	if succeed != size {
		t.Error("msg not consumed")
	}
@@ -126,7 +127,8 @@ func TestDelayQueue_ConcurrentConsume(t *testing.T) {
		WithFetchInterval(time.Millisecond * 50).
		WithMaxConsumeDuration(0).
		WithLogger(log.New(os.Stderr, "[DelayQueue]", log.LstdFlags)).
		WithConcurrent(4)
		WithConcurrent(4).
		WithScriptPreload(false)

	for i := 0; i < size; i++ {
		err := queue.SendDelayMsg(strconv.Itoa(i), 0, WithRetryCount(retryCount), WithMsgTTL(time.Hour))
@@ -341,56 +343,6 @@ func TestDelayQueue_FetchLimit(t *testing.T) {
	}
}

func TestDelayQueue_ScriptPreload(t *testing.T) {
	redisCli := redis.NewClient(&redis.Options{
		Addr: "127.0.0.1:6379",
	})
	redisCli.FlushDB(context.Background())
	size := 101 // using a prime number may reveal some hidden bugs ^_^
	retryCount := 3
	mu := sync.Mutex{}
	deliveryCount := make(map[string]int)
	cb := func(s string) bool {
		mu.Lock()
		deliveryCount[s]++
		mu.Unlock()
		return true
	}
	queue := NewQueue("test", redisCli, cb).
		WithFetchInterval(time.Millisecond * 50).
		WithMaxConsumeDuration(0).
		WithLogger(log.New(os.Stderr, "[DelayQueue]", log.LstdFlags)).
		WithConcurrent(4).
		WithScriptPreload(true)

	for i := 0; i < size; i++ {
		err := queue.SendDelayMsg(strconv.Itoa(i), 0, WithRetryCount(retryCount), WithMsgTTL(time.Hour))
		if err != nil {
			t.Error(err)
		}
	}
	for i := 0; i < 2*size; i++ {
		if i == 2 {
			// clear the script cache at an arbitrary point; the queue must recover
			redisCli.ScriptFlush(context.Background())
		}
		ids, err := queue.beforeConsume()
		if err != nil {
			t.Errorf("consume error: %v", err)
			return
		}
		for _, id := range ids {
			queue.callback(id)
		}
		queue.afterConsume()
	}
	for k, v := range deliveryCount {
		if v != 1 {
			t.Errorf("expect 1 delivery, actual %d. key: %s", v, k)
		}
	}
}
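The ScriptFlush above is the point of this test: with WithScriptPreload(true) the queue runs its Lua scripts by SHA, so emptying the server-side script cache forces it to fall back and re-register them. The sketch below is not delayqueue's internal code, just the standard go-redis pattern this kind of preloading typically builds on:

package main

import (
	"context"
	"fmt"

	"github.com/redis/go-redis/v9"
)

// redis.Script caches the script's SHA1; Run tries EVALSHA first and falls
// back to EVAL (re-registering the script) when the server reports NOSCRIPT,
// e.g. right after a SCRIPT FLUSH like the one in the test above.
var incrScript = redis.NewScript(`return redis.call("INCR", KEYS[1])`)

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{Addr: "127.0.0.1:6379"})
	n, err := incrScript.Run(ctx, rdb, []string{"example:counter"}).Int64()
	fmt.Println(n, err)
}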

func TestDelayQueue_NackRedeliveryDelay(t *testing.T) {
	redisCli := redis.NewClient(&redis.Options{
		Addr: "127.0.0.1:6379",
monitor.go (10 changed lines)
@@ -18,7 +18,7 @@ func NewMonitor0(name string, cli RedisCli, opts ...interface{}) *Monitor {
	}
}

// NewPublisher creates a new Publisher by a *redis.Client
// NewMonitor creates a new Monitor by a *redis.Client
func NewMonitor(name string, cli *redis.Client, opts ...interface{}) *Monitor {
	rc := &redisV9Wrapper{
		inner: cli,
@@ -26,6 +26,14 @@ func NewMonitor(name string, cli *redis.Client, opts ...interface{}) *Monitor {
	return NewMonitor0(name, rc, opts...)
}

// NewMonitorOnCluster creates a new Monitor by a *redis.ClusterClient
func NewMonitorOnCluster(name string, cli *redis.ClusterClient, opts ...interface{}) *Monitor {
	rc := &redisClusterWrapper{
		inner: cli,
	}
	return NewMonitor0(name, rc, opts...)
}

// WithLogger customizes the logger for the monitor
func (m *Monitor) WithLogger(logger *log.Logger) *Monitor {
	m.inner.logger = logger
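A minimal usage sketch of the new cluster constructor, mirroring the test below; the three gauges correspond to the queue's states as exercised there (pending: produced but not yet moved to ready, ready: due and waiting for a consumer, processing: delivered but not yet acked):

cluster := redis.NewClusterClient(&redis.ClusterOptions{
	Addrs: []string{"127.0.0.1:7000", "127.0.0.1:7001", "127.0.0.1:7002"},
})
monitor := NewMonitorOnCluster("test", cluster).
	WithLogger(log.New(os.Stderr, "[Monitor]", log.LstdFlags))
pending, _ := monitor.GetPendingCount()
ready, _ := monitor.GetReadyCount()
processing, _ := monitor.GetProcessingCount()
fmt.Printf("pending=%d ready=%d processing=%d\n", pending, ready, processing)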
monitor_test.go (114 changed lines)
@@ -10,7 +10,7 @@ import (
	"github.com/redis/go-redis/v9"
)

func TestMonitor_get_status(t *testing.T) {
func TestMonitor_GetStatus(t *testing.T) {
	redisCli := redis.NewClient(&redis.Options{
		Addr: "127.0.0.1:6379",
	})
@@ -72,6 +72,72 @@ func TestMonitor_get_status(t *testing.T) {
	}
}

func TestMonitor_Cluster_GetStatus(t *testing.T) {
	redisCli := redis.NewClusterClient(&redis.ClusterOptions{
		Addrs: []string{
			"127.0.0.1:7000",
			"127.0.0.1:7001",
			"127.0.0.1:7002",
		},
	})
	redisCli.FlushDB(context.Background())
	size := 1000
	cb := func(s string) bool {
		return true
	}
	logger := log.New(os.Stderr, "[DelayQueue]", log.LstdFlags)
	queue := NewQueueOnCluster("test", redisCli, cb)
	monitor := NewMonitorOnCluster("test", redisCli).WithLogger(logger)

	for i := 0; i < size; i++ {
		err := queue.SendDelayMsg(strconv.Itoa(i), 0)
		if err != nil {
			t.Error(err)
		}
	}

	// test pending count
	pending, err := monitor.GetPendingCount()
	if err != nil {
		t.Error(err)
		return
	}
	if int(pending) != size {
		t.Errorf("expecting %d, got %d", size, int(pending))
		return
	}

	// test ready count
	err = queue.pending2Ready()
	if err != nil {
		t.Errorf("consume error: %v", err)
		return
	}
	ready, err := monitor.GetReadyCount()
	if err != nil {
		t.Error(err)
		return
	}
	if int(ready) != size {
		t.Errorf("expecting %d, got %d", size, int(ready))
		return
	}

	// test processing count
	for i := 0; i < size/2; i++ {
		_, _ = queue.ready2Unack()
	}
	processing, err := monitor.GetProcessingCount()
	if err != nil {
		t.Error(err)
		return
	}
	if int(processing) != size/2 {
		t.Errorf("expecting %d, got %d", size/2, int(processing))
		return
	}
}

type MyProfiler struct {
	ProduceCount int
	DeliverCount int
@@ -137,6 +203,52 @@ func TestMonitor_listener1(t *testing.T) {
	}
}

func TestMonitor_Cluster_listener1(t *testing.T) {
	redisCli := redis.NewClusterClient(&redis.ClusterOptions{
		Addrs: []string{
			"127.0.0.1:7000",
			"127.0.0.1:7001",
			"127.0.0.1:7002",
		},
	})
	redisCli.FlushDB(context.Background())
	size := 1000
	cb := func(s string) bool {
		return true
	}
	queue := NewQueueOnCluster("test", redisCli, cb)
	queue.EnableReport()
	monitor := NewMonitorOnCluster("test", redisCli)
	profile := &MyProfiler{}
	monitor.ListenEvent(profile)

	for i := 0; i < size; i++ {
		err := queue.SendDelayMsg(strconv.Itoa(i), 0)
		if err != nil {
			t.Error(err)
		}
	}
	ids, err := queue.beforeConsume()
	if err != nil {
		t.Errorf("consume error: %v", err)
		return
	}
	for _, id := range ids {
		queue.callback(id)
	}
	queue.afterConsume()

	if profile.ProduceCount != size {
		t.Error("wrong produce count")
	}
	if profile.DeliverCount != size {
		t.Error("wrong deliver count")
	}
	if profile.ConsumeCount != size {
		t.Error("wrong consume count")
	}
}

func TestMonitor_listener2(t *testing.T) {
	redisCli := redis.NewClient(&redis.Options{
		Addr: "127.0.0.1:6379",