diff --git a/README.md b/README.md index b73ffca..7071641 100644 --- a/README.md +++ b/README.md @@ -98,6 +98,17 @@ WithFetchLimit(limit uint) WithFetchLimit limits the max number of unack (processing) messages +```go +UseHashTagKey() +``` + +UseHashTagKey add hashtags to redis keys to ensure all keys of this queue are allocated in the same hash slot. + +If you are using Codis/AliyunRedisCluster/TencentCloudRedisCluster, you should add this option to NewQueue: `NewQueue("test", redisCli, cb, UseHashTagKey())`. This Option cannot be changed after DelayQueue has been created. + +WARNING! CHANGING(add or remove) this option will cause DelayQueue failing to read existed data in redis + +> see more: https://redis.io/docs/reference/cluster-spec/#hash-tags ```go WithDefaultRetryCount(count uint) diff --git a/delayqueue.go b/delayqueue.go index 5ab9e4d..9d1c316 100644 --- a/delayqueue.go +++ b/delayqueue.go @@ -35,9 +35,19 @@ type DelayQueue struct { concurrent uint } +type hashTagKeyOpt int + +// UseHashTagKey add hashtags to redis keys to ensure all keys of this queue are allocated in the same hash slot. +// If you are using Codis/AliyunRedisCluster/TencentCloudRedisCluster, add this option to NewQueue +// WARNING! Changing (add or remove) this option will cause DelayQueue failing to read existed data in redis +// see more: https://redis.io/docs/reference/cluster-spec/#hash-tags +func UseHashTagKey() interface{} { + return hashTagKeyOpt(1) +} + // NewQueue creates a new queue, use DelayQueue.StartConsume to consume or DelayQueue.SendScheduleMsg to publish message // callback returns true to confirm successful consumption. If callback returns false or not return within maxConsumeDuration, DelayQueue will re-deliver this message -func NewQueue(name string, cli *redis.Client, callback func(string) bool) *DelayQueue { +func NewQueue(name string, cli *redis.Client, callback func(string) bool, opts ...interface{}) *DelayQueue { if name == "" { panic("name is required") } @@ -47,16 +57,29 @@ func NewQueue(name string, cli *redis.Client, callback func(string) bool) *Delay if callback == nil { panic("callback is required") } + useHashTag := false + for _, opt := range opts { + switch opt.(type) { + case hashTagKeyOpt: + useHashTag = true + } + } + var keyPrefix string + if useHashTag { + keyPrefix = "{dp:" + name + "}" + } else { + keyPrefix = "dp:" + name + } return &DelayQueue{ name: name, redisCli: cli, cb: callback, - pendingKey: "dp:" + name + ":pending", - readyKey: "dp:" + name + ":ready", - unAckKey: "dp:" + name + ":unack", - retryKey: "dp:" + name + ":retry", - retryCountKey: "dp:" + name + ":retry:cnt", - garbageKey: "dp:" + name + ":garbage", + pendingKey: keyPrefix + ":pending", + readyKey: keyPrefix + ":ready", + unAckKey: keyPrefix + ":unack", + retryKey: keyPrefix + ":retry", + retryCountKey: keyPrefix + ":retry:cnt", + garbageKey: keyPrefix + ":garbage", close: make(chan struct{}, 1), maxConsumeDuration: 5 * time.Second, msgTTL: time.Hour, diff --git a/delayqueue_test.go b/delayqueue_test.go index 9d86f08..7f9ea0f 100644 --- a/delayqueue_test.go +++ b/delayqueue_test.go @@ -24,7 +24,7 @@ func TestDelayQueue_consume(t *testing.T) { i, _ := strconv.ParseInt(s, 10, 64) return i%2 == 0 } - queue := NewQueue("test", redisCli, cb). + queue := NewQueue("test", redisCli, cb, UseHashTagKey()). WithFetchInterval(time.Millisecond * 50). WithMaxConsumeDuration(0). WithLogger(log.New(os.Stderr, "[DelayQueue]", log.LstdFlags)).