mirror of
https://github.com/HDT3213/delayqueue.git
synced 2025-09-27 19:42:08 +08:00
use uuid instead of auto increment id;negative ack
This commit is contained in:
@@ -4,9 +4,9 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/go-redis/redis/v8"
|
"github.com/go-redis/redis/v8"
|
||||||
|
"github.com/google/uuid"
|
||||||
"log"
|
"log"
|
||||||
"math"
|
"math"
|
||||||
"strconv"
|
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -22,7 +22,6 @@ type DelayQueue struct {
|
|||||||
retryKey string // list
|
retryKey string // list
|
||||||
retryCountKey string // hash: message id -> remain retry count
|
retryCountKey string // hash: message id -> remain retry count
|
||||||
garbageKey string // set: message id
|
garbageKey string // set: message id
|
||||||
idGenKey string // id generator
|
|
||||||
ticker *time.Ticker
|
ticker *time.Ticker
|
||||||
logger *log.Logger
|
logger *log.Logger
|
||||||
close chan struct{}
|
close chan struct{}
|
||||||
@@ -56,7 +55,6 @@ func NewQueue(name string, cli *redis.Client, callback func(string) bool) *Delay
|
|||||||
retryKey: "dp:" + name + ":retry",
|
retryKey: "dp:" + name + ":retry",
|
||||||
retryCountKey: "dp:" + name + ":retry:cnt",
|
retryCountKey: "dp:" + name + ":retry:cnt",
|
||||||
garbageKey: "dp:" + name + ":garbage",
|
garbageKey: "dp:" + name + ":garbage",
|
||||||
idGenKey: "dp:" + name + ":id_gen",
|
|
||||||
close: make(chan struct{}, 1),
|
close: make(chan struct{}, 1),
|
||||||
maxConsumeDuration: 5 * time.Second,
|
maxConsumeDuration: 5 * time.Second,
|
||||||
msgTTL: time.Hour,
|
msgTTL: time.Hour,
|
||||||
@@ -103,22 +101,6 @@ func (q *DelayQueue) genMsgKey(idStr string) string {
|
|||||||
return "dp:" + q.name + ":msg:" + idStr
|
return "dp:" + q.name + ":msg:" + idStr
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *DelayQueue) genId() (uint32, error) {
|
|
||||||
ctx := context.Background()
|
|
||||||
id, err := q.redisCli.Incr(ctx, q.idGenKey).Result()
|
|
||||||
if err != nil && err.Error() == "ERR increment or decrement would overflow" {
|
|
||||||
err = q.redisCli.Set(ctx, q.idGenKey, 1, 0).Err()
|
|
||||||
if err != nil {
|
|
||||||
return 0, fmt.Errorf("reset id gen failed: %v", err)
|
|
||||||
}
|
|
||||||
return 1, nil
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
return 0, fmt.Errorf("incr id gen failed: %v", err)
|
|
||||||
}
|
|
||||||
return uint32(id), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type retryCountOpt int
|
type retryCountOpt int
|
||||||
|
|
||||||
// WithRetryCount set retry count for a msg
|
// WithRetryCount set retry count for a msg
|
||||||
@@ -138,16 +120,12 @@ func (q *DelayQueue) SendScheduleMsg(payload string, t time.Time, opts ...interf
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
// generate id
|
// generate id
|
||||||
id, err := q.genId()
|
idStr := uuid.Must(uuid.NewRandom()).String()
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
idStr := strconv.FormatUint(uint64(id), 10)
|
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
// store msg
|
// store msg
|
||||||
msgTTL := t.Sub(now) + q.msgTTL // delivery + q.msgTTL
|
msgTTL := t.Sub(now) + q.msgTTL // delivery + q.msgTTL
|
||||||
err = q.redisCli.Set(ctx, q.genMsgKey(idStr), payload, msgTTL).Err()
|
err := q.redisCli.Set(ctx, q.genMsgKey(idStr), payload, msgTTL).Err()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("store msg failed: %v", err)
|
return fmt.Errorf("store msg failed: %v", err)
|
||||||
}
|
}
|
||||||
@@ -157,7 +135,7 @@ func (q *DelayQueue) SendScheduleMsg(payload string, t time.Time, opts ...interf
|
|||||||
return fmt.Errorf("store retry count failed: %v", err)
|
return fmt.Errorf("store retry count failed: %v", err)
|
||||||
}
|
}
|
||||||
// put to pending
|
// put to pending
|
||||||
err = q.redisCli.ZAdd(ctx, q.pendingKey, &redis.Z{Score: float64(t.Unix()), Member: id}).Err()
|
err = q.redisCli.ZAdd(ctx, q.pendingKey, &redis.Z{Score: float64(t.Unix()), Member: idStr}).Err()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("push to pending failed: %v", err)
|
return fmt.Errorf("push to pending failed: %v", err)
|
||||||
}
|
}
|
||||||
@@ -264,6 +242,19 @@ func (q *DelayQueue) ack(idStr string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (q *DelayQueue) nack(idStr string) error {
|
||||||
|
ctx := context.Background()
|
||||||
|
// update retry time as now, unack2Retry will move it to retry immediately
|
||||||
|
err := q.redisCli.ZAdd(ctx, q.unAckKey, &redis.Z{
|
||||||
|
Member: idStr,
|
||||||
|
Score: float64(time.Now().Unix()),
|
||||||
|
}).Err()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("negative ack failed: %v", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// unack2RetryScript atomically moves messages from unack to retry which remaining retry count greater than 0,
|
// unack2RetryScript atomically moves messages from unack to retry which remaining retry count greater than 0,
|
||||||
// and moves messages from unack to garbage which retry count is 0
|
// and moves messages from unack to garbage which retry count is 0
|
||||||
// Because DelayQueue cannot determine garbage message before eval unack2RetryScript, so it cannot pass keys parameter to redisCli.Eval
|
// Because DelayQueue cannot determine garbage message before eval unack2RetryScript, so it cannot pass keys parameter to redisCli.Eval
|
||||||
@@ -324,7 +315,7 @@ func (q *DelayQueue) garbageCollect() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (q *DelayQueue) consume() error {
|
func (q *DelayQueue) consume() error {
|
||||||
// pending2ready
|
// pending to ready
|
||||||
err := q.pending2Ready()
|
err := q.pending2Ready()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -346,9 +337,11 @@ func (q *DelayQueue) consume() error {
|
|||||||
}
|
}
|
||||||
if ack {
|
if ack {
|
||||||
err = q.ack(idStr)
|
err = q.ack(idStr)
|
||||||
if err != nil {
|
} else {
|
||||||
return err
|
err = q.nack(idStr)
|
||||||
}
|
}
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
if fetchCount >= q.fetchLimit {
|
if fetchCount >= q.fetchLimit {
|
||||||
break
|
break
|
||||||
@@ -380,9 +373,11 @@ func (q *DelayQueue) consume() error {
|
|||||||
}
|
}
|
||||||
if ack {
|
if ack {
|
||||||
err = q.ack(idStr)
|
err = q.ack(idStr)
|
||||||
if err != nil {
|
} else {
|
||||||
return err
|
err = q.nack(idStr)
|
||||||
}
|
}
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
if fetchCount >= q.fetchLimit {
|
if fetchCount >= q.fetchLimit {
|
||||||
break
|
break
|
||||||
|
@@ -4,7 +4,6 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"github.com/go-redis/redis/v8"
|
"github.com/go-redis/redis/v8"
|
||||||
"log"
|
"log"
|
||||||
"math"
|
|
||||||
"os"
|
"os"
|
||||||
"strconv"
|
"strconv"
|
||||||
"testing"
|
"testing"
|
||||||
@@ -81,24 +80,3 @@ func TestDelayQueue_StopConsume(t *testing.T) {
|
|||||||
done := queue.StartConsume()
|
done := queue.StartConsume()
|
||||||
<-done
|
<-done
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestIDOverflow(t *testing.T) {
|
|
||||||
redisCli := redis.NewClient(&redis.Options{
|
|
||||||
Addr: "127.0.0.1:6379",
|
|
||||||
})
|
|
||||||
redisCli.FlushDB(context.Background())
|
|
||||||
queue := NewQueue("test", redisCli, func(s string) bool {
|
|
||||||
return true
|
|
||||||
})
|
|
||||||
err := redisCli.Set(context.Background(), queue.idGenKey, math.MaxInt64, 0).Err()
|
|
||||||
if err != nil {
|
|
||||||
t.Error(err)
|
|
||||||
}
|
|
||||||
for i := 0; i < 10; i++ {
|
|
||||||
_, err := queue.genId()
|
|
||||||
if err != nil {
|
|
||||||
t.Error("id gen error")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
1
go.mod
1
go.mod
@@ -4,4 +4,5 @@ go 1.16
|
|||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/go-redis/redis/v8 v8.11.4 // indirect
|
github.com/go-redis/redis/v8 v8.11.4 // indirect
|
||||||
|
github.com/google/uuid v1.3.0 // indirect
|
||||||
)
|
)
|
||||||
|
2
go.sum
2
go.sum
@@ -25,6 +25,8 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
|
|||||||
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||||
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||||
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||||
|
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
|
||||||
|
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||||
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
|
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
|
||||||
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
|
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
|
||||||
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
|
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
|
||||||
|
Reference in New Issue
Block a user