diff --git a/delayqueue.go b/delayqueue.go index 2d56893..90d257d 100644 --- a/delayqueue.go +++ b/delayqueue.go @@ -4,9 +4,9 @@ import ( "context" "fmt" "github.com/go-redis/redis/v8" + "github.com/google/uuid" "log" "math" - "strconv" "time" ) @@ -22,7 +22,6 @@ type DelayQueue struct { retryKey string // list retryCountKey string // hash: message id -> remain retry count garbageKey string // set: message id - idGenKey string // id generator ticker *time.Ticker logger *log.Logger close chan struct{} @@ -56,7 +55,6 @@ func NewQueue(name string, cli *redis.Client, callback func(string) bool) *Delay retryKey: "dp:" + name + ":retry", retryCountKey: "dp:" + name + ":retry:cnt", garbageKey: "dp:" + name + ":garbage", - idGenKey: "dp:" + name + ":id_gen", close: make(chan struct{}, 1), maxConsumeDuration: 5 * time.Second, msgTTL: time.Hour, @@ -103,22 +101,6 @@ func (q *DelayQueue) genMsgKey(idStr string) string { return "dp:" + q.name + ":msg:" + idStr } -func (q *DelayQueue) genId() (uint32, error) { - ctx := context.Background() - id, err := q.redisCli.Incr(ctx, q.idGenKey).Result() - if err != nil && err.Error() == "ERR increment or decrement would overflow" { - err = q.redisCli.Set(ctx, q.idGenKey, 1, 0).Err() - if err != nil { - return 0, fmt.Errorf("reset id gen failed: %v", err) - } - return 1, nil - } - if err != nil { - return 0, fmt.Errorf("incr id gen failed: %v", err) - } - return uint32(id), nil -} - type retryCountOpt int // WithRetryCount set retry count for a msg @@ -138,16 +120,12 @@ func (q *DelayQueue) SendScheduleMsg(payload string, t time.Time, opts ...interf } } // generate id - id, err := q.genId() - if err != nil { - return err - } - idStr := strconv.FormatUint(uint64(id), 10) + idStr := uuid.Must(uuid.NewRandom()).String() ctx := context.Background() now := time.Now() // store msg msgTTL := t.Sub(now) + q.msgTTL // delivery + q.msgTTL - err = q.redisCli.Set(ctx, q.genMsgKey(idStr), payload, msgTTL).Err() + err := q.redisCli.Set(ctx, q.genMsgKey(idStr), payload, msgTTL).Err() if err != nil { return fmt.Errorf("store msg failed: %v", err) } @@ -157,7 +135,7 @@ func (q *DelayQueue) SendScheduleMsg(payload string, t time.Time, opts ...interf return fmt.Errorf("store retry count failed: %v", err) } // put to pending - err = q.redisCli.ZAdd(ctx, q.pendingKey, &redis.Z{Score: float64(t.Unix()), Member: id}).Err() + err = q.redisCli.ZAdd(ctx, q.pendingKey, &redis.Z{Score: float64(t.Unix()), Member: idStr}).Err() if err != nil { return fmt.Errorf("push to pending failed: %v", err) } @@ -264,6 +242,19 @@ func (q *DelayQueue) ack(idStr string) error { return nil } +func (q *DelayQueue) nack(idStr string) error { + ctx := context.Background() + // update retry time as now, unack2Retry will move it to retry immediately + err := q.redisCli.ZAdd(ctx, q.unAckKey, &redis.Z{ + Member: idStr, + Score: float64(time.Now().Unix()), + }).Err() + if err != nil { + return fmt.Errorf("negative ack failed: %v", err) + } + return nil +} + // unack2RetryScript atomically moves messages from unack to retry which remaining retry count greater than 0, // and moves messages from unack to garbage which retry count is 0 // Because DelayQueue cannot determine garbage message before eval unack2RetryScript, so it cannot pass keys parameter to redisCli.Eval @@ -324,7 +315,7 @@ func (q *DelayQueue) garbageCollect() error { } func (q *DelayQueue) consume() error { - // pending2ready + // pending to ready err := q.pending2Ready() if err != nil { return err @@ -346,9 +337,11 @@ func (q *DelayQueue) consume() error { } if ack { err = q.ack(idStr) - if err != nil { - return err - } + } else { + err = q.nack(idStr) + } + if err != nil { + return err } if fetchCount >= q.fetchLimit { break @@ -380,9 +373,11 @@ func (q *DelayQueue) consume() error { } if ack { err = q.ack(idStr) - if err != nil { - return err - } + } else { + err = q.nack(idStr) + } + if err != nil { + return err } if fetchCount >= q.fetchLimit { break diff --git a/delayqueue_test.go b/delayqueue_test.go index dcf66b1..60d1425 100644 --- a/delayqueue_test.go +++ b/delayqueue_test.go @@ -4,7 +4,6 @@ import ( "context" "github.com/go-redis/redis/v8" "log" - "math" "os" "strconv" "testing" @@ -81,24 +80,3 @@ func TestDelayQueue_StopConsume(t *testing.T) { done := queue.StartConsume() <-done } - -func TestIDOverflow(t *testing.T) { - redisCli := redis.NewClient(&redis.Options{ - Addr: "127.0.0.1:6379", - }) - redisCli.FlushDB(context.Background()) - queue := NewQueue("test", redisCli, func(s string) bool { - return true - }) - err := redisCli.Set(context.Background(), queue.idGenKey, math.MaxInt64, 0).Err() - if err != nil { - t.Error(err) - } - for i := 0; i < 10; i++ { - _, err := queue.genId() - if err != nil { - t.Error("id gen error") - return - } - } -} diff --git a/go.mod b/go.mod index f70872d..3dbd2ad 100644 --- a/go.mod +++ b/go.mod @@ -4,4 +4,5 @@ go 1.16 require ( github.com/go-redis/redis/v8 v8.11.4 // indirect + github.com/google/uuid v1.3.0 // indirect ) diff --git a/go.sum b/go.sum index 1f09bd6..9253501 100644 --- a/go.sum +++ b/go.sum @@ -25,6 +25,8 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=