diff --git a/.gitignore b/.gitignore index 0833f81..fd56221 100644 --- a/.gitignore +++ b/.gitignore @@ -14,4 +14,5 @@ # Dependency directories (remove the comment below to include it) # vendor/ .idea -unack2retry.lua \ No newline at end of file +unack2retry.lua +exmaple \ No newline at end of file diff --git a/README.md b/README.md index dd38d3d..b73ffca 100644 --- a/README.md +++ b/README.md @@ -8,12 +8,20 @@ DelayQueue is a message queue supporting delayed/scheduled delivery based on redis. -DelayQueue guarantees to deliver at least once. - DelayQueue support ACK/Retry mechanism, it will re-deliver message after a while as long as no confirmation is received. -As long as Redis doesn't crash, consumer crashes won't cause message loss. +As long as Redis doesn't crash, consumer crashes won't cause message loss. -## Example +DelayQueue can works safely in a distributed environment, you could deliver message to same queue or consume message from same queue at multiple machines. + +## Install + +DelayQueue requires a Go version with modules support. Run following command line in your project with go.mod: + +``` +go get github.com/hdt3213/delayqueue +``` + +## Get Started ```go package main @@ -33,7 +41,7 @@ func main() { // callback returns true to confirm successful consumption. // If callback returns false or not return within maxConsumeDuration, DelayQueue will re-deliver this message return true - }) + }).WithConcurrent(4) // set the number of concurrent consumers // send delay message for i := 0; i < 10; i++ { err := queue.SendDelayMsg(strconv.Itoa(i), time.Hour, delayqueue.WithRetryCount(3)) @@ -54,21 +62,28 @@ func main() { } ``` -## options +## Options -``` +```go WithLogger(logger *log.Logger) ``` WithLogger customizes logger for queue + +```go +WithConcurrent(c uint) ``` + +WithConcurrent sets the number of concurrent consumers + +```go WithFetchInterval(d time.Duration) ``` WithFetchInterval customizes the interval at which consumer fetch message from redis -``` +```go WithMaxConsumeDuration(d time.Duration) ``` @@ -77,17 +92,29 @@ WithMaxConsumeDuration customizes max consume duration If no acknowledge received within WithMaxConsumeDuration after message delivery, DelayQueue will try to deliver this message again -``` +```go WithFetchLimit(limit uint) ``` -WithFetchLimit limits the max number of messages at one time +WithFetchLimit limits the max number of unack (processing) messages -``` +```go WithDefaultRetryCount(count uint) ``` WithDefaultRetryCount customizes the max number of retry, it effects of messages in this queue -use WithRetryCount during DelayQueue.SendScheduleMsg or DelayQueue.SendDelayMsg to specific retry count of particular message \ No newline at end of file +use WithRetryCount during DelayQueue.SendScheduleMsg or DelayQueue.SendDelayMsg to specific retry count of particular message + +# More Details + +Here is the complete flowchart: + +![](https://s2.loli.net/2022/09/10/tziHmcAX4sFJPN6.png) + +- pending: A sorted set of messages pending for delivery. `member` is message id, `score` is delivery unix timestamp. +- ready: A list of messages ready to deliver. Workers fetch messages from here. +- unack: A sorted set of messages waiting for ack (successfully consumed confirmation) which means the messages here is being processing. `member` is message id, `score` is the unix timestamp of processing deadline. +- retry: A list of messages which processing exceeded deadline and waits for retry +- garbage: A list of messages reaching max retry count and waits for cleaning \ No newline at end of file diff --git a/delayqueue.go b/delayqueue.go index 06b8a73..b62c932 100644 --- a/delayqueue.go +++ b/delayqueue.go @@ -6,7 +6,7 @@ import ( "github.com/go-redis/redis/v8" "github.com/google/uuid" "log" - "math" + "sync" "time" ) @@ -31,6 +31,8 @@ type DelayQueue struct { defaultRetryCount uint fetchInterval time.Duration fetchLimit uint + + concurrent uint } // NewQueue creates a new queue, use DelayQueue.StartConsume to consume or DelayQueue.SendScheduleMsg to publish message @@ -61,7 +63,7 @@ func NewQueue(name string, cli *redis.Client, callback func(string) bool) *Delay logger: log.Default(), defaultRetryCount: 3, fetchInterval: time.Second, - fetchLimit: math.MaxInt32, + concurrent: 1, } } @@ -84,12 +86,21 @@ func (q *DelayQueue) WithMaxConsumeDuration(d time.Duration) *DelayQueue { return q } -// WithFetchLimit limits the max number of messages at one time +// WithFetchLimit limits the max number of unack (processing) messages func (q *DelayQueue) WithFetchLimit(limit uint) *DelayQueue { q.fetchLimit = limit return q } +// WithConcurrent sets the number of concurrent consumers +func (q *DelayQueue) WithConcurrent(c uint) *DelayQueue { + if c == 0 { + return q + } + q.concurrent = c + return q +} + // WithDefaultRetryCount customizes the max number of retry, it effects of messages in this queue // use WithRetryCount during DelayQueue.SendScheduleMsg or DelayQueue.SendDelayMsg to specific retry count of particular message func (q *DelayQueue) WithDefaultRetryCount(count uint) *DelayQueue { @@ -219,17 +230,60 @@ func (q *DelayQueue) retry2Unack() (string, error) { return str, nil } -func (q *DelayQueue) callback(idStr string) (bool, error) { +func (q *DelayQueue) callback(idStr string) error { ctx := context.Background() payload, err := q.redisCli.Get(ctx, q.genMsgKey(idStr)).Result() if err == redis.Nil { - return true, nil + return nil } if err != nil { // Is an IO error? - return false, fmt.Errorf("get message payload failed: %v", err) + return fmt.Errorf("get message payload failed: %v", err) } - return q.cb(payload), nil + ack := q.cb(payload) + if ack { + err = q.ack(idStr) + } else { + err = q.nack(idStr) + } + return err +} + +// batchCallback calls DelayQueue.callback in batch. callback is executed concurrently according to property DelayQueue.concurrent +// batchCallback must wait all callback finished, otherwise the actual number of processing messages may beyond DelayQueue.FetchLimit +func (q *DelayQueue) batchCallback(ids []string) { + if len(ids) == 1 || q.concurrent == 1 { + for _, id := range ids { + err := q.callback(id) + if err != nil { + q.logger.Printf("consume msg %s failed: %v", id, err) + } + } + return + } + ch := make(chan string, len(ids)) + for _, id := range ids { + ch <- id + } + close(ch) + wg := sync.WaitGroup{} + concurrent := int(q.concurrent) + if concurrent > len(ids) { // too many goroutines is no use + concurrent = len(ids) + } + wg.Add(concurrent) + for i := 0; i < concurrent; i++ { + go func() { + defer wg.Done() + for id := range ch { + err := q.callback(id) + if err != nil { + q.logger.Printf("consume msg %s failed: %v", id, err) + } + } + }() + } + wg.Wait() } func (q *DelayQueue) ack(idStr string) error { @@ -323,7 +377,7 @@ func (q *DelayQueue) consume() error { return err } // consume - var fetchCount uint + ids := make([]string, 0, q.fetchLimit) for { idStr, err := q.ready2Unack() if err == redis.Nil { // consumed all @@ -332,23 +386,14 @@ func (q *DelayQueue) consume() error { if err != nil { return err } - fetchCount++ - ack, err := q.callback(idStr) - if err != nil { - return err - } - if ack { - err = q.ack(idStr) - } else { - err = q.nack(idStr) - } - if err != nil { - return err - } - if fetchCount >= q.fetchLimit { + ids = append(ids, idStr) + if q.fetchLimit > 0 && len(ids) >= int(q.fetchLimit) { break } } + if len(ids) > 0 { + q.batchCallback(ids) + } // unack to retry err = q.unack2Retry() if err != nil { @@ -359,7 +404,7 @@ func (q *DelayQueue) consume() error { return err } // retry - fetchCount = 0 + ids = make([]string, 0, q.fetchLimit) for { idStr, err := q.retry2Unack() if err == redis.Nil { // consumed all @@ -368,23 +413,14 @@ func (q *DelayQueue) consume() error { if err != nil { return err } - fetchCount++ - ack, err := q.callback(idStr) - if err != nil { - return err - } - if ack { - err = q.ack(idStr) - } else { - err = q.nack(idStr) - } - if err != nil { - return err - } - if fetchCount >= q.fetchLimit { + ids = append(ids, idStr) + if q.fetchLimit > 0 && len(ids) >= int(q.fetchLimit) { break } } + if len(ids) > 0 { + q.batchCallback(ids) + } return nil } @@ -406,14 +442,14 @@ func (q *DelayQueue) StartConsume() (done <-chan struct{}) { break tickerLoop } } - done0 <- struct{}{} + close(done0) }() return done0 } // StopConsume stops consumer goroutine func (q *DelayQueue) StopConsume() { - q.close <- struct{}{} + close(q.close) if q.ticker != nil { q.ticker.Stop() } diff --git a/delayqueue_test.go b/delayqueue_test.go index a09b336..593916c 100644 --- a/delayqueue_test.go +++ b/delayqueue_test.go @@ -6,6 +6,7 @@ import ( "log" "os" "strconv" + "sync" "testing" "time" ) @@ -56,6 +57,47 @@ func TestDelayQueue_consume(t *testing.T) { } } +func TestDelayQueue_ConcurrentConsume(t *testing.T) { + redisCli := redis.NewClient(&redis.Options{ + Addr: "127.0.0.1:6379", + }) + redisCli.FlushDB(context.Background()) + size := 101 // use a prime number may found some hidden bugs ^_^ + retryCount := 3 + mu := sync.Mutex{} + deliveryCount := make(map[string]int) + cb := func(s string) bool { + mu.Lock() + deliveryCount[s]++ + mu.Unlock() + return true + } + queue := NewQueue("test", redisCli, cb). + WithFetchInterval(time.Millisecond * 50). + WithMaxConsumeDuration(0). + WithLogger(log.New(os.Stderr, "[DelayQueue]", log.LstdFlags)). + WithConcurrent(4) + + for i := 0; i < size; i++ { + err := queue.SendDelayMsg(strconv.Itoa(i), 0, WithRetryCount(retryCount)) + if err != nil { + t.Error(err) + } + } + for i := 0; i < 2*size; i++ { + err := queue.consume() + if err != nil { + t.Errorf("consume error: %v", err) + return + } + } + for k, v := range deliveryCount { + if v != 1 { + t.Errorf("expect 1 delivery, actual %d. key: %s", v, k) + } + } +} + func TestDelayQueue_StopConsume(t *testing.T) { size := 10 redisCli := redis.NewClient(&redis.Options{ diff --git a/exmaple/main.go b/exmaple/main.go new file mode 100644 index 0000000..b41f122 --- /dev/null +++ b/exmaple/main.go @@ -0,0 +1,37 @@ +package main + +import ( + "github.com/go-redis/redis/v8" + "github.com/hdt3213/delayqueue" + "strconv" + "time" +) + +func main() { + redisCli := redis.NewClient(&redis.Options{ + Addr: "127.0.0.1:6379", + }) + queue := delayqueue.NewQueue("example", redisCli, func(payload string) bool { + // callback returns true to confirm successful consumption. + // If callback returns false or not return within maxConsumeDuration, DelayQueue will re-deliver this message + println(payload) + return true + }).WithConcurrent(4) + // send delay message + for i := 0; i < 10; i++ { + err := queue.SendDelayMsg(strconv.Itoa(i), time.Second, delayqueue.WithRetryCount(3)) + if err != nil { + panic(err) + } + } + // send schedule message + for i := 0; i < 10; i++ { + err := queue.SendScheduleMsg(strconv.Itoa(i), time.Now().Add(time.Second)) + if err != nil { + panic(err) + } + } + // start consume + done := queue.StartConsume() + <-done +} diff --git a/go.mod b/go.mod index 3dbd2ad..cd2d310 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,6 @@ module github.com/hdt3213/delayqueue go 1.16 require ( - github.com/go-redis/redis/v8 v8.11.4 // indirect - github.com/google/uuid v1.3.0 // indirect + github.com/go-redis/redis/v8 v8.11.4 + github.com/google/uuid v1.3.0 ) diff --git a/go.sum b/go.sum index 9253501..f3e2acb 100644 --- a/go.sum +++ b/go.sum @@ -5,9 +5,8 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= -github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg= -github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= github.com/go-redis/redis/v8 v8.11.4 h1:kHoYkfZP6+pe04aFTnhDH6GDROa5yJdHJVNxV3F46Tg= github.com/go-redis/redis/v8 v8.11.4/go.mod h1:2Z2wHZXdQpCDXEGzqMockDpNyYvi2l4Pxt6RJr792+w= github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= @@ -29,12 +28,15 @@ github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= +github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= +github.com/onsi/ginkgo v1.16.4 h1:29JGrr5oVBm5ulCWet69zQkzWipVXIol6ygQUe/EzNc= github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= +github.com/onsi/gomega v1.16.0 h1:6gjqkI8iiRHMvdccRJM8rVKjCWk6ZIm6FTm3ddIe4/c= github.com/onsi/gomega v1.16.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -49,6 +51,7 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20210428140749-89ef3d95e781 h1:DzZ89McO9/gWPsQXS/FVKAlG02ZjaQ6AlZRBimEYOd0= golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -63,10 +66,12 @@ golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da h1:b3NXsE2LusjYGGjL5bxEVZZORm/YEFFrWFjR8eFrw/c= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= @@ -85,8 +90,10 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0 google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=