support concurrent consume

This commit is contained in:
hdt3213
2022-09-10 23:12:53 +08:00
parent 52cae62cf5
commit f93ae51a6d
7 changed files with 206 additions and 56 deletions

1
.gitignore vendored
View File

@@ -15,3 +15,4 @@
# vendor/
.idea
unack2retry.lua
exmaple

View File

@@ -8,12 +8,20 @@
DelayQueue is a message queue supporting delayed/scheduled delivery based on redis.
DelayQueue guarantees to deliver at least once.
DelayQueue support ACK/Retry mechanism, it will re-deliver message after a while as long as no confirmation is received.
As long as Redis doesn't crash, consumer crashes won't cause message loss.
## Example
DelayQueue can works safely in a distributed environment, you could deliver message to same queue or consume message from same queue at multiple machines.
## Install
DelayQueue requires a Go version with modules support. Run following command line in your project with go.mod:
```
go get github.com/hdt3213/delayqueue
```
## Get Started
```go
package main
@@ -33,7 +41,7 @@ func main() {
// callback returns true to confirm successful consumption.
// If callback returns false or not return within maxConsumeDuration, DelayQueue will re-deliver this message
return true
})
}).WithConcurrent(4) // set the number of concurrent consumers
// send delay message
for i := 0; i < 10; i++ {
err := queue.SendDelayMsg(strconv.Itoa(i), time.Hour, delayqueue.WithRetryCount(3))
@@ -54,21 +62,28 @@ func main() {
}
```
## options
## Options
```
```go
WithLogger(logger *log.Logger)
```
WithLogger customizes logger for queue
```go
WithConcurrent(c uint)
```
WithConcurrent sets the number of concurrent consumers
```go
WithFetchInterval(d time.Duration)
```
WithFetchInterval customizes the interval at which consumer fetch message from redis
```
```go
WithMaxConsumeDuration(d time.Duration)
```
@@ -77,17 +92,29 @@ WithMaxConsumeDuration customizes max consume duration
If no acknowledge received within WithMaxConsumeDuration after message delivery, DelayQueue will try to deliver this
message again
```
```go
WithFetchLimit(limit uint)
```
WithFetchLimit limits the max number of messages at one time
WithFetchLimit limits the max number of unack (processing) messages
```
```go
WithDefaultRetryCount(count uint)
```
WithDefaultRetryCount customizes the max number of retry, it effects of messages in this queue
use WithRetryCount during DelayQueue.SendScheduleMsg or DelayQueue.SendDelayMsg to specific retry count of particular message
# More Details
Here is the complete flowchart:
![](https://s2.loli.net/2022/09/10/tziHmcAX4sFJPN6.png)
- pending: A sorted set of messages pending for delivery. `member` is message id, `score` is delivery unix timestamp.
- ready: A list of messages ready to deliver. Workers fetch messages from here.
- unack: A sorted set of messages waiting for ack (successfully consumed confirmation) which means the messages here is being processing. `member` is message id, `score` is the unix timestamp of processing deadline.
- retry: A list of messages which processing exceeded deadline and waits for retry
- garbage: A list of messages reaching max retry count and waits for cleaning

View File

@@ -6,7 +6,7 @@ import (
"github.com/go-redis/redis/v8"
"github.com/google/uuid"
"log"
"math"
"sync"
"time"
)
@@ -31,6 +31,8 @@ type DelayQueue struct {
defaultRetryCount uint
fetchInterval time.Duration
fetchLimit uint
concurrent uint
}
// NewQueue creates a new queue, use DelayQueue.StartConsume to consume or DelayQueue.SendScheduleMsg to publish message
@@ -61,7 +63,7 @@ func NewQueue(name string, cli *redis.Client, callback func(string) bool) *Delay
logger: log.Default(),
defaultRetryCount: 3,
fetchInterval: time.Second,
fetchLimit: math.MaxInt32,
concurrent: 1,
}
}
@@ -84,12 +86,21 @@ func (q *DelayQueue) WithMaxConsumeDuration(d time.Duration) *DelayQueue {
return q
}
// WithFetchLimit limits the max number of messages at one time
// WithFetchLimit limits the max number of unack (processing) messages
func (q *DelayQueue) WithFetchLimit(limit uint) *DelayQueue {
q.fetchLimit = limit
return q
}
// WithConcurrent sets the number of concurrent consumers
func (q *DelayQueue) WithConcurrent(c uint) *DelayQueue {
if c == 0 {
return q
}
q.concurrent = c
return q
}
// WithDefaultRetryCount customizes the max number of retry, it effects of messages in this queue
// use WithRetryCount during DelayQueue.SendScheduleMsg or DelayQueue.SendDelayMsg to specific retry count of particular message
func (q *DelayQueue) WithDefaultRetryCount(count uint) *DelayQueue {
@@ -219,17 +230,60 @@ func (q *DelayQueue) retry2Unack() (string, error) {
return str, nil
}
func (q *DelayQueue) callback(idStr string) (bool, error) {
func (q *DelayQueue) callback(idStr string) error {
ctx := context.Background()
payload, err := q.redisCli.Get(ctx, q.genMsgKey(idStr)).Result()
if err == redis.Nil {
return true, nil
return nil
}
if err != nil {
// Is an IO error?
return false, fmt.Errorf("get message payload failed: %v", err)
return fmt.Errorf("get message payload failed: %v", err)
}
return q.cb(payload), nil
ack := q.cb(payload)
if ack {
err = q.ack(idStr)
} else {
err = q.nack(idStr)
}
return err
}
// batchCallback calls DelayQueue.callback in batch. callback is executed concurrently according to property DelayQueue.concurrent
// batchCallback must wait all callback finished, otherwise the actual number of processing messages may beyond DelayQueue.FetchLimit
func (q *DelayQueue) batchCallback(ids []string) {
if len(ids) == 1 || q.concurrent == 1 {
for _, id := range ids {
err := q.callback(id)
if err != nil {
q.logger.Printf("consume msg %s failed: %v", id, err)
}
}
return
}
ch := make(chan string, len(ids))
for _, id := range ids {
ch <- id
}
close(ch)
wg := sync.WaitGroup{}
concurrent := int(q.concurrent)
if concurrent > len(ids) { // too many goroutines is no use
concurrent = len(ids)
}
wg.Add(concurrent)
for i := 0; i < concurrent; i++ {
go func() {
defer wg.Done()
for id := range ch {
err := q.callback(id)
if err != nil {
q.logger.Printf("consume msg %s failed: %v", id, err)
}
}
}()
}
wg.Wait()
}
func (q *DelayQueue) ack(idStr string) error {
@@ -323,7 +377,7 @@ func (q *DelayQueue) consume() error {
return err
}
// consume
var fetchCount uint
ids := make([]string, 0, q.fetchLimit)
for {
idStr, err := q.ready2Unack()
if err == redis.Nil { // consumed all
@@ -332,23 +386,14 @@ func (q *DelayQueue) consume() error {
if err != nil {
return err
}
fetchCount++
ack, err := q.callback(idStr)
if err != nil {
return err
}
if ack {
err = q.ack(idStr)
} else {
err = q.nack(idStr)
}
if err != nil {
return err
}
if fetchCount >= q.fetchLimit {
ids = append(ids, idStr)
if q.fetchLimit > 0 && len(ids) >= int(q.fetchLimit) {
break
}
}
if len(ids) > 0 {
q.batchCallback(ids)
}
// unack to retry
err = q.unack2Retry()
if err != nil {
@@ -359,7 +404,7 @@ func (q *DelayQueue) consume() error {
return err
}
// retry
fetchCount = 0
ids = make([]string, 0, q.fetchLimit)
for {
idStr, err := q.retry2Unack()
if err == redis.Nil { // consumed all
@@ -368,23 +413,14 @@ func (q *DelayQueue) consume() error {
if err != nil {
return err
}
fetchCount++
ack, err := q.callback(idStr)
if err != nil {
return err
}
if ack {
err = q.ack(idStr)
} else {
err = q.nack(idStr)
}
if err != nil {
return err
}
if fetchCount >= q.fetchLimit {
ids = append(ids, idStr)
if q.fetchLimit > 0 && len(ids) >= int(q.fetchLimit) {
break
}
}
if len(ids) > 0 {
q.batchCallback(ids)
}
return nil
}
@@ -406,14 +442,14 @@ func (q *DelayQueue) StartConsume() (done <-chan struct{}) {
break tickerLoop
}
}
done0 <- struct{}{}
close(done0)
}()
return done0
}
// StopConsume stops consumer goroutine
func (q *DelayQueue) StopConsume() {
q.close <- struct{}{}
close(q.close)
if q.ticker != nil {
q.ticker.Stop()
}

View File

@@ -6,6 +6,7 @@ import (
"log"
"os"
"strconv"
"sync"
"testing"
"time"
)
@@ -56,6 +57,47 @@ func TestDelayQueue_consume(t *testing.T) {
}
}
func TestDelayQueue_ConcurrentConsume(t *testing.T) {
redisCli := redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
})
redisCli.FlushDB(context.Background())
size := 101 // use a prime number may found some hidden bugs ^_^
retryCount := 3
mu := sync.Mutex{}
deliveryCount := make(map[string]int)
cb := func(s string) bool {
mu.Lock()
deliveryCount[s]++
mu.Unlock()
return true
}
queue := NewQueue("test", redisCli, cb).
WithFetchInterval(time.Millisecond * 50).
WithMaxConsumeDuration(0).
WithLogger(log.New(os.Stderr, "[DelayQueue]", log.LstdFlags)).
WithConcurrent(4)
for i := 0; i < size; i++ {
err := queue.SendDelayMsg(strconv.Itoa(i), 0, WithRetryCount(retryCount))
if err != nil {
t.Error(err)
}
}
for i := 0; i < 2*size; i++ {
err := queue.consume()
if err != nil {
t.Errorf("consume error: %v", err)
return
}
}
for k, v := range deliveryCount {
if v != 1 {
t.Errorf("expect 1 delivery, actual %d. key: %s", v, k)
}
}
}
func TestDelayQueue_StopConsume(t *testing.T) {
size := 10
redisCli := redis.NewClient(&redis.Options{

37
exmaple/main.go Normal file
View File

@@ -0,0 +1,37 @@
package main
import (
"github.com/go-redis/redis/v8"
"github.com/hdt3213/delayqueue"
"strconv"
"time"
)
func main() {
redisCli := redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
})
queue := delayqueue.NewQueue("example", redisCli, func(payload string) bool {
// callback returns true to confirm successful consumption.
// If callback returns false or not return within maxConsumeDuration, DelayQueue will re-deliver this message
println(payload)
return true
}).WithConcurrent(4)
// send delay message
for i := 0; i < 10; i++ {
err := queue.SendDelayMsg(strconv.Itoa(i), time.Second, delayqueue.WithRetryCount(3))
if err != nil {
panic(err)
}
}
// send schedule message
for i := 0; i < 10; i++ {
err := queue.SendScheduleMsg(strconv.Itoa(i), time.Now().Add(time.Second))
if err != nil {
panic(err)
}
}
// start consume
done := queue.StartConsume()
<-done
}

4
go.mod
View File

@@ -3,6 +3,6 @@ module github.com/hdt3213/delayqueue
go 1.16
require (
github.com/go-redis/redis/v8 v8.11.4 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/go-redis/redis/v8 v8.11.4
github.com/google/uuid v1.3.0
)

11
go.sum
View File

@@ -5,9 +5,8 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg=
github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/go-redis/redis/v8 v8.11.4 h1:kHoYkfZP6+pe04aFTnhDH6GDROa5yJdHJVNxV3F46Tg=
github.com/go-redis/redis/v8 v8.11.4/go.mod h1:2Z2wHZXdQpCDXEGzqMockDpNyYvi2l4Pxt6RJr792+w=
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
@@ -29,12 +28,15 @@ github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
github.com/onsi/ginkgo v1.16.4 h1:29JGrr5oVBm5ulCWet69zQkzWipVXIol6ygQUe/EzNc=
github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0=
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/onsi/gomega v1.16.0 h1:6gjqkI8iiRHMvdccRJM8rVKjCWk6ZIm6FTm3ddIe4/c=
github.com/onsi/gomega v1.16.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
@@ -49,6 +51,7 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781 h1:DzZ89McO9/gWPsQXS/FVKAlG02ZjaQ6AlZRBimEYOd0=
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@@ -63,10 +66,12 @@ golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da h1:b3NXsE2LusjYGGjL5bxEVZZORm/YEFFrWFjR8eFrw/c=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
@@ -85,8 +90,10 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=