delayed queue

This commit is contained in:
zhuyasen
2023-10-25 00:52:08 +08:00
parent dd0d47b40a
commit fad717c58a
23 changed files with 2774 additions and 1956 deletions

View File

@@ -1,244 +1,362 @@
## rabbitmq ## rabbitmq
rabbitmq library wrapped in [github.com/rabbitmq/amqp091-go](github.com/rabbitmq/amqp091-go), supports automatic reconnection and customized setting of queue parameters. rabbitmq library wrapped in [github.com/rabbitmq/amqp091-go](github.com/rabbitmq/amqp091-go), supports automatic reconnection and customized setting parameters, includes `direct`, `topic`, `fanout`, `headers`, `delayed message`, `publisher subscriber` a total of six message types.
### Example of use ### Example of use
#### Consumer code #### Code Example
This is a consumer code example common to the four types direct, topic, fanout, and headers. The code example includes `direct`, `topic`, `fanout`, `headers`, `delayed message`, `publisher subscriber` a total of six message types.
```go ```go
package main package main
import ( import (
"context" "context"
"strings" "fmt"
"time"
"github.com/zhufuyi/sponge/pkg/logger" "github.com/zhufuyi/sponge/pkg/logger"
"github.com/zhufuyi/sponge/pkg/rabbitmq" "github.com/zhufuyi/sponge/pkg/rabbitmq"
"github.com/zhufuyi/sponge/pkg/rabbitmq/consumer"
)
var handler = func(ctx context.Context, data []byte, tag ...string) error {
tagID := strings.Join(tag, ",")
logger.Infof("tagID=%s, receive message: %s", tagID, string(data))
return nil
}
func main() {
url := rabbitmq.DefaultURL
c, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get())) // here you can set the connection parameters, such as tls, reconnect time interval
if err != nil {
logger.Error("NewConnection err",logger.Err(err))
return
}
defer c.Close()
queue, err := consumer.NewQueue(context.Background(), "yourQueueName", c, consumer.WithConsumeAutoAck(false)) // here you can set the consume parameter
if err != nil {
logger.Error("NewQueue err",logger.Err(err))
return
}
queue.Consume(handler)
exit := make(chan struct{})
<-exit
}
```
<br>
#### Direct Type Code
```go
package main
import (
"context"
"github.com/zhufuyi/sponge/pkg/logger"
"github.com/zhufuyi/sponge/pkg/rabbitmq"
"github.com/zhufuyi/sponge/pkg/rabbitmq/producer"
) )
func main() { func main() {
url := rabbitmq.DefaultURL url := "amqp://guest:guest@127.0.0.1:5672/"
c, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get())) // here you can set the connection parameters, such as tls, reconnect time interval
if err != nil {
logger.Error("NewConnection err",logger.Err(err))
return
}
defer c.Close()
directExample(url)
topicExample(url)
fanoutExample(url)
headersExample(url)
delayedMessageExample(url)
publisherSubscriberExample(url)
}
func directExample(url string) {
exchangeName := "direct-exchange-demo" exchangeName := "direct-exchange-demo"
queueName := "direct-queue-1" queueName := "direct-queue-1"
routeKey := "direct-key-1" routeKey := "direct-key-1"
exchange := producer.NewDirectExchange(exchangeName, routeKey) exchange := rabbitmq.NewDirectExchange(exchangeName, routeKey)
q, err := producer.NewQueue(queueName, c.Conn, exchange) // here you can set the producer parameter fmt.Printf("\n\n-------------------- direct --------------------\n")
if err != nil {
logger.Error("NewQueue err",logger.Err(err)) // producer-side direct message
return func() {
connection, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
checkErr(err)
defer connection.Close()
p, err := rabbitmq.NewProducer(exchange, queueName, connection)
checkErr(err)
defer p.Close()
err = p.PublishDirect(context.Background(), []byte("[direct] say hello"))
checkErr(err)
}()
// consumer-side direct message
func() {
runConsume(url, exchange, queueName)
}()
<-time.After(time.Second)
} }
defer q.Close()
err = q.Publish(context.Background(), []byte(routeKey+" say hello"))
if err != nil {
logger.Error("Publish err",logger.Err(err))
return
}
}
```
<br>
#### Topic Type Code
```go
package main
import (
"context"
"github.com/zhufuyi/sponge/pkg/logger"
"github.com/zhufuyi/sponge/pkg/rabbitmq"
"github.com/zhufuyi/sponge/pkg/rabbitmq/producer"
)
func main() {
url := rabbitmq.DefaultURL
c, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get())) // here you can set the connection parameters, such as tls, reconnect time interval
if err != nil {
logger.Error("NewConnection err",logger.Err(err))
return
}
defer c.Close()
func topicExample(url string) {
exchangeName := "topic-exchange-demo" exchangeName := "topic-exchange-demo"
queueName := "topic-queue-1" queueName := "topic-queue-1"
routingKey := "key1.key2.*" routingKey := "key1.key2.*"
exchange := producer.NewTopicExchange(exchangeName, routingKey) exchange := rabbitmq.NewTopicExchange(exchangeName, routingKey)
q, err := producer.NewQueue(queueName, c.Conn, exchange) // here you can set the producer parameter fmt.Printf("\n\n-------------------- topic --------------------\n")
if err != nil {
logger.Error("NewQueue err",logger.Err(err)) // producer-side topic message
return func() {
} connection, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
defer q.Close() checkErr(err)
defer connection.Close()
p, err := rabbitmq.NewProducer(exchange, queueName, connection)
checkErr(err)
defer p.Close()
key := "key1.key2.key3" key := "key1.key2.key3"
err = q.PublishTopic(context.Background(), key, []byte(key+" say hello ")) err = p.PublishTopic(context.Background(), key, []byte("[topic] "+key+" say hello"))
if err != nil { checkErr(err)
logger.Error("PublishTopic err",logger.Err(err)) }()
return
// consumer-side topic message
func() {
runConsume(url, exchange, queueName)
}()
<-time.After(time.Second)
} }
}
```
<br>
#### Fanout Type Code
```go
package main
import (
"context"
"github.com/zhufuyi/sponge/pkg/logger"
"github.com/zhufuyi/sponge/pkg/rabbitmq"
"github.com/zhufuyi/sponge/pkg/rabbitmq/producer"
)
func main() {
url := rabbitmq.DefaultURL
c, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get())) // here you can set the connection parameters, such as tls, reconnect time interval
if err != nil {
logger.Error("NewConnection err",logger.Err(err))
return
}
defer c.Close()
func fanoutExample(url string) {
exchangeName := "fanout-exchange-demo" exchangeName := "fanout-exchange-demo"
queueName := "fanout-queue-1" queueName := "fanout-queue-1"
exchange := producer.NewFanOutExchange(exchangeName) exchange := rabbitmq.NewFanoutExchange(exchangeName)
q, err := producer.NewQueue(queueName, c.Conn, exchange) // here you can set the producer parameter fmt.Printf("\n\n-------------------- fanout --------------------\n")
if err != nil {
logger.Error("NewQueue err",logger.Err(err))
return
}
defer q.Close()
err = q.Publish(context.Background(), []byte("say hello")) // producer-side fanout message
func() {
connection, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
checkErr(err)
defer connection.Close()
p, err := rabbitmq.NewProducer(exchange, queueName, connection)
checkErr(err)
defer p.Close()
err = p.PublishFanout(context.Background(), []byte("[fanout] say hello"))
checkErr(err)
}()
// consumer-side fanout message
func() {
runConsume(url, exchange, queueName)
queueName = "fanout-queue-2"
runConsume(url, exchange, queueName)
}()
<-time.After(time.Second)
}
func headersExample(url string) {
exchangeName := "headers-exchange-demo"
queueName := "headers-queue-1"
headersKeys := map[string]interface{}{"hello": "world", "foo": "bar"}
exchange := rabbitmq.NewHeadersExchange(exchangeName, rabbitmq.HeadersTypeAll, headersKeys) // all, you can set HeadersTypeAny type
fmt.Printf("\n\n-------------------- headers --------------------\n")
// producer-side headers message
func() {
connection, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
checkErr(err)
defer connection.Close()
p, err := rabbitmq.NewProducer(exchange, queueName, connection)
checkErr(err)
defer p.Close()
ctx := context.Background()
headersKeys1 := headersKeys
err = p.PublishHeaders(ctx, headersKeys1, []byte("[headers] say hello 1"))
checkErr(err)
headersKeys2 := map[string]interface{}{"foo": "bar"}
err = p.PublishHeaders(ctx, headersKeys2, []byte("[headers] say hello 2"))
checkErr(err)
}()
// consumer-side headers message
func() {
runConsume(url, exchange, queueName)
}()
<-time.After(time.Second)
}
func delayedMessageExample(url string) {
exchangeName := "delayed-message-exchange-demo"
queueName := "delayed-message-queue"
routingKey := "delayed-key"
exchange := rabbitmq.NewDelayedMessageExchange(exchangeName, rabbitmq.NewDirectExchange("", routingKey))
fmt.Printf("\n\n-------------------- delayed message --------------------\n")
// producer-side delayed message
func() {
connection, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
checkErr(err)
defer connection.Close()
p, err := rabbitmq.NewProducer(exchange, queueName, connection)
checkErr(err)
defer p.Close()
ctx := context.Background()
datetimeLayout := "2006-01-02 15:04:05.000"
err = p.PublishDelayedMessage(ctx, time.Second*3, []byte("[delayed message] say hello "+time.Now().Format(datetimeLayout)))
checkErr(err)
}()
// consumer-side delayed message
func() {
runConsume(url, exchange, queueName)
}()
<-time.After(time.Second * 4)
}
func publisherSubscriberExample(url string) {
channelName := "pub-sub"
fmt.Printf("\n\n-------------------- publisher subscriber --------------------\n")
// publisher-side message
func() {
connection, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
checkErr(err)
defer connection.Close()
p, err := rabbitmq.NewPublisher(channelName, connection)
checkErr(err)
defer p.Close()
err = p.Publish(context.Background(), []byte("[pub-sub] say hello"))
checkErr(err)
}()
// subscriber-side message
func() {
identifier := "pub-sub-queue-1"
runSubscriber(url, channelName, identifier)
identifier = "pub-sub-queue-2"
runSubscriber(url, channelName, identifier)
}()
<-time.After(time.Second)
}
func runConsume(url string, exchange *rabbitmq.Exchange, queueName string) {
connection, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
checkErr(err)
c, err := rabbitmq.NewConsumer(exchange, queueName, connection, rabbitmq.WithConsumerAutoAck(false))
checkErr(err)
c.Consume(context.Background(), handler)
}
func runSubscriber(url string, channelName string, identifier string) {
connection, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
checkErr(err)
s, err := rabbitmq.NewSubscriber(channelName, identifier, connection, rabbitmq.WithConsumerAutoAck(false))
checkErr(err)
s.Subscribe(context.Background(), handler)
}
var handler = func(ctx context.Context, data []byte, tagID string) error {
logger.Info("received message", logger.String("tagID", tagID), logger.String("data", string(data)))
return nil
}
func checkErr(err error) {
if err != nil { if err != nil {
logger.Error("Publish err",logger.Err(err)) panic(err)
return
} }
} }
``` ```
<br> <br>
#### Headers Type Code #### Example of Automatic Resumption of Publish
If the error of publish is caused by the network, you can check if the reconnection is successful and publish it again.
```go ```go
package main package main
import ( import (
"context" "context"
"errors"
"strconv"
"time"
"github.com/zhufuyi/sponge/pkg/logger" "github.com/zhufuyi/sponge/pkg/logger"
"github.com/zhufuyi/sponge/pkg/rabbitmq" "github.com/zhufuyi/sponge/pkg/rabbitmq"
"github.com/zhufuyi/sponge/pkg/rabbitmq/producer"
) )
var url = "amqp://guest:guest@127.0.0.1:5672/"
func main() { func main() {
url := rabbitmq.DefaultURL ctx, _ := context.WithTimeout(context.Background(), time.Hour)
c, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get())) // here you can set the connection parameters, such as tls, reconnect time interval exchangeName := "direct-exchange-demo"
queueName := "direct-queue"
routeKey := "info"
exchange := rabbitmq.NewDirectExchange(exchangeName, routeKey)
err := runConsume(ctx, exchange, queueName)
if err != nil { if err != nil {
logger.Error("NewConnection err",logger.Err(err)) logger.Error("runConsume failed", logger.Err(err))
return return
} }
defer c.Close()
err = runProduce(ctx, exchange, queueName)
exchangeName := "headers-exchange-demo"
// the message is only received if there is an exact match for headers
queueName := "headers-queue-1"
kv1 := map[string]interface{}{"hello1": "world1", "foo1": "bar1"}
exchange := producer.NewHeaderExchange(exchangeName, producer.HeadersTypeAll, kv1)
q, err := producer.NewQueue(queueName, c.Conn, exchange) // here you can set the producer parameter
if err != nil { if err != nil {
logger.Error("NewQueue err",logger.Err(err)) logger.Error("runProduce failed", logger.Err(err))
return
}
defer q.Close()
headersKey1 := kv1 // exact match, consumer queue can receive messages
err = q.PublishHeaders(context.Background(), headersKey1, []byte("say hello"))
if err != nil {
logger.Error("PublishHeaders err",logger.Err(err))
return return
} }
} }
```
<br> func runProduce(ctx context.Context, exchange *rabbitmq.Exchange, queueName string) error {
connection, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
#### Publish Error Handling
If the error is caused by the network, you can check if the reconnection is successful and resend it again.
```go
err := q.Publish(context.Background(), []byte(routeKey+" say hello"))
if err != nil { if err != nil {
if errors.Is(err, producer.ErrClosed) && c.CheckConnected() { // check connection return err
q, err = producer.NewQueue(queueName, c.Conn, exchange) }
defer connection.Close()
p, err := rabbitmq.NewProducer(exchange, queueName, connection)
if err != nil { if err != nil {
logger.Info("queue reconnect failed", logger.Err(err)) return err
}
defer p.Close()
count := 0
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
count++
data := []byte("direct say hello" + strconv.Itoa(count))
err = p.PublishDirect(ctx, data)
if err != nil {
if errors.Is(err, rabbitmq.ErrClosed) {
for {
if !connection.CheckConnected() { // check connection
time.Sleep(time.Second * 2)
continue
}
p, err = rabbitmq.NewProducer(exchange, queueName, connection)
if err != nil {
logger.Warn("reconnect failed", logger.Err(err))
time.Sleep(time.Second * 2)
continue
}
break
}
} else { } else {
logger.Info("queue reconnect success") logger.Warn("publish failed", logger.Err(err))
} }
} }
logger.Info("publish message", logger.String("data", string(data)))
time.Sleep(time.Second * 5)
}
}
}
func runConsume(ctx context.Context, exchange *rabbitmq.Exchange, queueName string) error {
connection, err := rabbitmq.NewConnection(url, rabbitmq.WithLogger(logger.Get()))
if err != nil {
return err
}
c, err := rabbitmq.NewConsumer(exchange, queueName, connection, rabbitmq.WithConsumerAutoAck(false))
if err != nil {
return err
}
c.Consume(ctx, handler)
return nil
}
var handler = func(ctx context.Context, data []byte, tagID string) error {
logger.Info("received message", logger.String("tagID", tagID), logger.String("data", string(data)))
return nil
} }
``` ```

307
pkg/rabbitmq/common.go Normal file
View File

@@ -0,0 +1,307 @@
package rabbitmq
import amqp "github.com/rabbitmq/amqp091-go"
// ErrClosed closed
var ErrClosed = amqp.ErrClosed
const (
exchangeTypeDirect = "direct"
exchangeTypeTopic = "topic"
exchangeTypeFanout = "fanout"
exchangeTypeHeaders = "headers"
exchangeTypeDelayedMessage = "x-delayed-message"
// HeadersTypeAll all
HeadersTypeAll HeadersType = "all"
// HeadersTypeAny any
HeadersTypeAny HeadersType = "any"
)
// HeadersType headers type
type HeadersType = string
// Exchange rabbitmq minimum management unit
type Exchange struct {
name string // exchange name
eType string // exchange type: direct, topic, fanout, headers, x-delayed-message
routingKey string // route key
headersKeys map[string]interface{} // this field is required if eType=headers.
delayedMessageType string // this field is required if eType=headers, support direct, topic, fanout, headers
}
// Name exchange name
func (e *Exchange) Name() string {
return e.name
}
// Type exchange type
func (e *Exchange) Type() string {
return e.eType
}
// RoutingKey exchange routing key
func (e *Exchange) RoutingKey() string {
return e.routingKey
}
// HeadersKeys exchange headers keys
func (e *Exchange) HeadersKeys() map[string]interface{} {
return e.headersKeys
}
// DelayedMessageType exchange delayed message type
func (e *Exchange) DelayedMessageType() string {
return e.delayedMessageType
}
// NewDirectExchange create a direct exchange
func NewDirectExchange(exchangeName string, routingKey string) *Exchange {
return &Exchange{
name: exchangeName,
eType: exchangeTypeDirect,
routingKey: routingKey,
}
}
// NewTopicExchange create a topic exchange
func NewTopicExchange(exchangeName string, routingKey string) *Exchange {
return &Exchange{
name: exchangeName,
eType: exchangeTypeTopic,
routingKey: routingKey,
}
}
// NewFanoutExchange create a fanout exchange
func NewFanoutExchange(exchangeName string) *Exchange {
return &Exchange{
name: exchangeName,
eType: exchangeTypeFanout,
routingKey: "",
}
}
// NewHeadersExchange create a headers exchange, the headerType supports "all" and "any"
func NewHeadersExchange(exchangeName string, headersType HeadersType, keys map[string]interface{}) *Exchange {
if keys == nil {
keys = make(map[string]interface{})
}
switch headersType {
case HeadersTypeAll, HeadersTypeAny:
keys["x-match"] = headersType
default:
keys["x-match"] = HeadersTypeAll
}
return &Exchange{
name: exchangeName,
eType: exchangeTypeHeaders,
routingKey: "",
headersKeys: keys,
}
}
// NewDelayedMessageExchange create a delayed message exchange
func NewDelayedMessageExchange(exchangeName string, e *Exchange) *Exchange {
return &Exchange{
name: exchangeName,
eType: "x-delayed-message",
routingKey: e.routingKey,
delayedMessageType: e.eType,
headersKeys: e.headersKeys,
}
}
// -------------------------------------------------------------------------------------------
// QueueDeclareOption declare queue option.
type QueueDeclareOption func(*queueDeclareOptions)
type queueDeclareOptions struct {
autoDelete bool // delete automatically
exclusive bool // exclusive (only available to the program that created it)
noWait bool // block processing
args amqp.Table // additional properties
}
func (o *queueDeclareOptions) apply(opts ...QueueDeclareOption) {
for _, opt := range opts {
opt(o)
}
}
// default queue declare settings
func defaultQueueDeclareOptions() *queueDeclareOptions {
return &queueDeclareOptions{
autoDelete: false,
exclusive: false,
noWait: false,
args: nil,
}
}
// WithQueueDeclareAutoDelete set queue declare auto delete option.
func WithQueueDeclareAutoDelete(enable bool) QueueDeclareOption {
return func(o *queueDeclareOptions) {
o.autoDelete = enable
}
}
// WithQueueDeclareExclusive set queue declare exclusive option.
func WithQueueDeclareExclusive(enable bool) QueueDeclareOption {
return func(o *queueDeclareOptions) {
o.exclusive = enable
}
}
// WithQueueDeclareNoWait set queue declare no wait option.
func WithQueueDeclareNoWait(enable bool) QueueDeclareOption {
return func(o *queueDeclareOptions) {
o.noWait = enable
}
}
// WithQueueDeclareArgs set queue declare args option.
func WithQueueDeclareArgs(args map[string]interface{}) QueueDeclareOption {
return func(o *queueDeclareOptions) {
o.args = args
}
}
// -------------------------------------------------------------------------------------------
// ExchangeDeclareOption declare exchange option.
type ExchangeDeclareOption func(*exchangeDeclareOptions)
type exchangeDeclareOptions struct {
autoDelete bool // delete automatically
internal bool // public or not, false means public
noWait bool // block processing
args amqp.Table // additional properties
}
func (o *exchangeDeclareOptions) apply(opts ...ExchangeDeclareOption) {
for _, opt := range opts {
opt(o)
}
}
// default exchange declare settings
func defaultExchangeDeclareOptions() *exchangeDeclareOptions {
return &exchangeDeclareOptions{
//durable: true,
autoDelete: false,
internal: false,
noWait: false,
args: nil,
}
}
// WithExchangeDeclareAutoDelete set exchange declare auto delete option.
func WithExchangeDeclareAutoDelete(enable bool) ExchangeDeclareOption {
return func(o *exchangeDeclareOptions) {
o.autoDelete = enable
}
}
// WithExchangeDeclareInternal set exchange declare internal option.
func WithExchangeDeclareInternal(enable bool) ExchangeDeclareOption {
return func(o *exchangeDeclareOptions) {
o.internal = enable
}
}
// WithExchangeDeclareNoWait set exchange declare no wait option.
func WithExchangeDeclareNoWait(enable bool) ExchangeDeclareOption {
return func(o *exchangeDeclareOptions) {
o.noWait = enable
}
}
// WithExchangeDeclareArgs set exchange declare args option.
func WithExchangeDeclareArgs(args map[string]interface{}) ExchangeDeclareOption {
return func(o *exchangeDeclareOptions) {
o.args = args
}
}
// -------------------------------------------------------------------------------------------
// QueueBindOption declare queue bind option.
type QueueBindOption func(*queueBindOptions)
type queueBindOptions struct {
noWait bool // block processing
args amqp.Table // this parameter is invalid if the type is headers.
}
func (o *queueBindOptions) apply(opts ...QueueBindOption) {
for _, opt := range opts {
opt(o)
}
}
// default queue bind settings
func defaultQueueBindOptions() *queueBindOptions {
return &queueBindOptions{
noWait: false,
args: nil,
}
}
// WithQueueBindNoWait set queue bind no wait option.
func WithQueueBindNoWait(enable bool) QueueBindOption {
return func(o *queueBindOptions) {
o.noWait = enable
}
}
// WithQueueBindArgs set queue bind args option.
func WithQueueBindArgs(args map[string]interface{}) QueueBindOption {
return func(o *queueBindOptions) {
o.args = args
}
}
// -------------------------------------------------------------------------------------------
// DelayedMessagePublishOption declare queue bind option.
type DelayedMessagePublishOption func(*delayedMessagePublishOptions)
type delayedMessagePublishOptions struct {
topicKey string // the topic message type must be required
headersKeys map[string]interface{} // the headers message type must be required
}
func (o *delayedMessagePublishOptions) apply(opts ...DelayedMessagePublishOption) {
for _, opt := range opts {
opt(o)
}
}
// default delayed message publish settings
func defaultDelayedMessagePublishOptions() *delayedMessagePublishOptions {
return &delayedMessagePublishOptions{}
}
// WithDelayedMessagePublishTopicKey set delayed message publish topicKey option.
func WithDelayedMessagePublishTopicKey(topicKey string) DelayedMessagePublishOption {
return func(o *delayedMessagePublishOptions) {
if topicKey == "" {
return
}
o.topicKey = topicKey
}
}
// WithDelayedMessagePublishHeadersKeys set delayed message publish headersKeys option.
func WithDelayedMessagePublishHeadersKeys(headersKeys map[string]interface{}) DelayedMessagePublishOption {
return func(o *delayedMessagePublishOptions) {
if headersKeys == nil {
return
}
o.headersKeys = headersKeys
}
}

View File

@@ -0,0 +1,91 @@
package rabbitmq
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestExchange(t *testing.T) {
e := NewDirectExchange("foo", "bar")
assert.Equal(t, e.eType, exchangeTypeDirect)
e = NewTopicExchange("foo", "bar.*")
assert.Equal(t, e.eType, exchangeTypeTopic)
e = NewFanoutExchange("foo")
assert.Equal(t, e.eType, exchangeTypeFanout)
e = NewHeadersExchange("foo", HeadersTypeAll, nil)
assert.Equal(t, e.eType, exchangeTypeHeaders)
e = NewHeadersExchange("foo", "unknown", nil)
assert.Equal(t, e.eType, exchangeTypeHeaders)
e = NewDelayedMessageExchange("foobar", NewDirectExchange("", "key"))
assert.Equal(t, e.eType, exchangeTypeDelayedMessage)
e = NewDelayedMessageExchange("foobar", NewDirectExchange("", "key"))
assert.Equal(t, e.name, e.Name())
assert.Equal(t, e.eType, e.Type())
assert.Equal(t, e.routingKey, e.RoutingKey())
assert.Equal(t, e.delayedMessageType, e.DelayedMessageType())
assert.Equal(t, e.headersKeys, e.HeadersKeys())
}
func TestExchangeDeclareOptions(t *testing.T) {
opts := []ExchangeDeclareOption{
WithExchangeDeclareAutoDelete(true),
WithExchangeDeclareInternal(true),
WithExchangeDeclareNoWait(true),
WithExchangeDeclareArgs(map[string]interface{}{"foo": "bar"}),
}
o := defaultExchangeDeclareOptions()
o.apply(opts...)
assert.True(t, o.autoDelete)
assert.True(t, o.internal)
assert.True(t, o.noWait)
assert.Equal(t, "bar", o.args["foo"])
}
func TestQueueDeclareOptions(t *testing.T) {
opts := []QueueDeclareOption{
WithQueueDeclareAutoDelete(true),
WithQueueDeclareExclusive(true),
WithQueueDeclareNoWait(true),
WithQueueDeclareArgs(map[string]interface{}{"foo": "bar"}),
}
o := defaultQueueDeclareOptions()
o.apply(opts...)
assert.True(t, o.autoDelete)
assert.True(t, o.exclusive)
assert.True(t, o.noWait)
assert.Equal(t, "bar", o.args["foo"])
}
func TestQueueBindOptions(t *testing.T) {
opts := []QueueBindOption{
WithQueueBindNoWait(true),
WithQueueBindArgs(map[string]interface{}{"foo": "bar"}),
}
o := defaultQueueBindOptions()
o.apply(opts...)
assert.True(t, o.noWait)
assert.Equal(t, "bar", o.args["foo"])
}
func TestDelayedMessagePublishOptions(t *testing.T) {
opts := []DelayedMessagePublishOption{
WithDelayedMessagePublishTopicKey(""),
WithDelayedMessagePublishTopicKey("key1.key2"),
WithDelayedMessagePublishHeadersKeys(nil),
WithDelayedMessagePublishHeadersKeys(map[string]interface{}{"foo": "bar"}),
}
o := defaultDelayedMessagePublishOptions()
o.apply(opts...)
assert.Equal(t, "key1.key2", o.topicKey)
assert.Equal(t, "bar", o.headersKeys["foo"])
}

View File

@@ -1,156 +0,0 @@
// Package rabbitmq is a go wrapper for rabbitmq
package rabbitmq
import (
"crypto/tls"
"errors"
"fmt"
"strings"
"sync"
"time"
amqp "github.com/rabbitmq/amqp091-go"
"go.uber.org/zap"
)
// Connection rabbitmq connection
type Connection struct {
Mutex sync.Mutex
url string
tlsConfig *tls.Config
reconnectTime time.Duration
Exit chan struct{}
ZapLog *zap.Logger
Conn *amqp.Connection
blockChan chan amqp.Blocking
closeChan chan *amqp.Error
IsConnected bool
}
// NewConnection rabbitmq connection
func NewConnection(url string, opts ...ConnectionOption) (*Connection, error) {
if url == "" {
return nil, errors.New("url is empty")
}
o := defaultConnectionOptions()
o.apply(opts...)
c := &Connection{
url: url,
reconnectTime: o.reconnectTime,
tlsConfig: o.tlsConfig,
Exit: make(chan struct{}),
ZapLog: o.zapLog,
}
conn, err := connect(c.url, c.tlsConfig)
if err != nil {
return nil, err
}
c.Conn = conn
c.blockChan = c.Conn.NotifyBlocked(make(chan amqp.Blocking, 1))
c.closeChan = c.Conn.NotifyClose(make(chan *amqp.Error, 1))
c.IsConnected = true
go c.monitor()
return c, nil
}
func connect(url string, tlsConfig *tls.Config) (*amqp.Connection, error) {
var (
conn *amqp.Connection
err error
)
if strings.HasPrefix(url, "amqps://") {
if tlsConfig == nil {
return nil, errors.New("tls not set, e.g. NewConnection(url, WithTLSConfig(tlsConfig))")
}
conn, err = amqp.DialTLS(url, tlsConfig)
if err != nil {
return nil, err
}
} else {
conn, err = amqp.Dial(url)
if err != nil {
return nil, err
}
}
return conn, nil
}
// CheckConnected rabbitmq connection
func (c *Connection) CheckConnected() bool {
c.Mutex.Lock()
defer c.Mutex.Unlock()
return c.IsConnected
}
func (c *Connection) monitor() {
retryCount := 0
reconnectTip := fmt.Sprintf("[rabbitmq connection] lost connection, attempting reconnect in %s", c.reconnectTime)
for {
select {
case <-c.Exit:
_ = c.closeConn()
c.ZapLog.Info("[rabbitmq connection] close connection")
return
case b := <-c.blockChan:
if b.Active {
c.ZapLog.Warn("[rabbitmq connection] TCP blocked: " + b.Reason)
} else {
c.ZapLog.Warn("[rabbitmq connection] TCP unblocked")
}
case <-c.closeChan:
c.Mutex.Lock()
c.IsConnected = false
c.Mutex.Unlock()
retryCount++
c.ZapLog.Warn(reconnectTip)
time.Sleep(c.reconnectTime) // wait for reconnect
amqpConn, amqpErr := connect(c.url, c.tlsConfig)
if amqpErr != nil {
c.ZapLog.Warn("[rabbitmq connection] reconnect failed", zap.String("err", amqpErr.Error()), zap.Int("retryCount", retryCount))
continue
}
c.ZapLog.Info("[rabbitmq connection] reconnect success")
// set new connection
c.Mutex.Lock()
c.IsConnected = true
c.Conn = amqpConn
c.blockChan = c.Conn.NotifyBlocked(make(chan amqp.Blocking, 1))
c.closeChan = c.Conn.NotifyClose(make(chan *amqp.Error, 1))
c.Mutex.Unlock()
}
}
}
// Close rabbitmq connection
func (c *Connection) Close() {
c.Mutex.Lock()
c.IsConnected = false
c.Mutex.Unlock()
close(c.Exit)
}
func (c *Connection) closeConn() error {
c.Mutex.Lock()
defer c.Mutex.Unlock()
if c.Conn != nil {
return c.Conn.Close()
}
return nil
}

224
pkg/rabbitmq/connection.go Normal file
View File

@@ -0,0 +1,224 @@
// Package rabbitmq is a go wrapper for github.com/rabbitmq/amqp091-go
//
// producer and consumer using the five types direct, topic, fanout, headers, x-delayed-message.
// publisher and subscriber using the fanout message type.
package rabbitmq
import (
"crypto/tls"
"errors"
"fmt"
"strings"
"sync"
"time"
amqp "github.com/rabbitmq/amqp091-go"
"go.uber.org/zap"
)
// DefaultURL default rabbitmq url
const DefaultURL = "amqp://guest:guest@localhost:5672/"
var defaultLogger, _ = zap.NewProduction()
// ConnectionOption connection option.
type ConnectionOption func(*connectionOptions)
type connectionOptions struct {
tlsConfig *tls.Config // tls config, if the url is amqps this field must be set
reconnectTime time.Duration // reconnect time interval, default is 3s
zapLog *zap.Logger
}
func (o *connectionOptions) apply(opts ...ConnectionOption) {
for _, opt := range opts {
opt(o)
}
}
// default connection settings
func defaultConnectionOptions() *connectionOptions {
return &connectionOptions{
tlsConfig: nil,
reconnectTime: time.Second * 3,
zapLog: defaultLogger,
}
}
// WithTLSConfig set tls config option.
func WithTLSConfig(tlsConfig *tls.Config) ConnectionOption {
return func(o *connectionOptions) {
if tlsConfig == nil {
tlsConfig = &tls.Config{
InsecureSkipVerify: true,
}
}
o.tlsConfig = tlsConfig
}
}
// WithReconnectTime set reconnect time interval option.
func WithReconnectTime(d time.Duration) ConnectionOption {
return func(o *connectionOptions) {
if d == 0 {
d = time.Second * 3
}
o.reconnectTime = d
}
}
// WithLogger set logger option.
func WithLogger(zapLog *zap.Logger) ConnectionOption {
return func(o *connectionOptions) {
if zapLog == nil {
return
}
o.zapLog = zapLog
}
}
// -------------------------------------------------------------------------------------------
// Connection rabbitmq connection
type Connection struct {
mutex sync.Mutex
url string
tlsConfig *tls.Config
reconnectTime time.Duration
exit chan struct{}
zapLog *zap.Logger
conn *amqp.Connection
blockChan chan amqp.Blocking
closeChan chan *amqp.Error
isConnected bool
}
// NewConnection rabbitmq connection
func NewConnection(url string, opts ...ConnectionOption) (*Connection, error) {
if url == "" {
return nil, errors.New("url is empty")
}
o := defaultConnectionOptions()
o.apply(opts...)
connection := &Connection{
url: url,
reconnectTime: o.reconnectTime,
tlsConfig: o.tlsConfig,
exit: make(chan struct{}),
zapLog: o.zapLog,
}
conn, err := connect(connection.url, connection.tlsConfig)
if err != nil {
return nil, err
}
connection.zapLog.Info("[rabbitmq connection] connected successfully.")
connection.conn = conn
connection.blockChan = connection.conn.NotifyBlocked(make(chan amqp.Blocking, 1))
connection.closeChan = connection.conn.NotifyClose(make(chan *amqp.Error, 1))
connection.isConnected = true
go connection.monitor()
return connection, nil
}
func connect(url string, tlsConfig *tls.Config) (*amqp.Connection, error) {
var (
conn *amqp.Connection
err error
)
if strings.HasPrefix(url, "amqps://") {
if tlsConfig == nil {
return nil, errors.New("tls not set, e.g. NewConnection(url, WithTLSConfig(tlsConfig))")
}
conn, err = amqp.DialTLS(url, tlsConfig)
if err != nil {
return nil, err
}
} else {
conn, err = amqp.Dial(url)
if err != nil {
return nil, err
}
}
return conn, nil
}
// CheckConnected rabbitmq connection
func (c *Connection) CheckConnected() bool {
c.mutex.Lock()
defer c.mutex.Unlock()
return c.isConnected
}
func (c *Connection) monitor() {
retryCount := 0
reconnectTip := fmt.Sprintf("[rabbitmq connection] lost connection, attempting reconnect in %s", c.reconnectTime)
for {
select {
case <-c.exit:
_ = c.closeConn()
c.zapLog.Info("[rabbitmq connection] closed")
return
case b := <-c.blockChan:
if b.Active {
c.zapLog.Warn("[rabbitmq connection] TCP blocked: " + b.Reason)
} else {
c.zapLog.Warn("[rabbitmq connection] TCP unblocked")
}
case <-c.closeChan:
c.mutex.Lock()
c.isConnected = false
c.mutex.Unlock()
retryCount++
c.zapLog.Warn(reconnectTip)
time.Sleep(c.reconnectTime) // wait for reconnect
amqpConn, amqpErr := connect(c.url, c.tlsConfig)
if amqpErr != nil {
c.zapLog.Warn("[rabbitmq connection] reconnect failed", zap.String("err", amqpErr.Error()), zap.Int("retryCount", retryCount))
continue
}
c.zapLog.Info("[rabbitmq connection] reconnected successfully.")
// set new connection
c.mutex.Lock()
c.isConnected = true
c.conn = amqpConn
c.blockChan = c.conn.NotifyBlocked(make(chan amqp.Blocking, 1))
c.closeChan = c.conn.NotifyClose(make(chan *amqp.Error, 1))
c.mutex.Unlock()
}
}
}
// Close rabbitmq connection
func (c *Connection) Close() {
c.mutex.Lock()
c.isConnected = false
c.mutex.Unlock()
close(c.exit)
}
func (c *Connection) closeConn() error {
c.mutex.Lock()
defer c.mutex.Unlock()
if c.conn != nil {
return c.conn.Close()
}
return nil
}

View File

@@ -16,6 +16,7 @@ import (
var ( var (
url = "amqp://guest:guest@192.168.3.37:5672/" url = "amqp://guest:guest@192.168.3.37:5672/"
urlTLS = "amqps://guest:guest@127.0.0.1:5672/" urlTLS = "amqps://guest:guest@127.0.0.1:5672/"
datetimeLayout = "2006-01-02 15:04:05.000"
) )
func TestConnectionOptions(t *testing.T) { func TestConnectionOptions(t *testing.T) {
@@ -36,6 +37,7 @@ func TestConnectionOptions(t *testing.T) {
func TestNewConnection1(t *testing.T) { func TestNewConnection1(t *testing.T) {
utils.SafeRunWithTimeout(time.Second*2, func(cancel context.CancelFunc) { utils.SafeRunWithTimeout(time.Second*2, func(cancel context.CancelFunc) {
defer cancel()
c, err := NewConnection("") c, err := NewConnection("")
assert.Error(t, err) assert.Error(t, err)
@@ -47,13 +49,12 @@ func TestNewConnection1(t *testing.T) {
assert.True(t, c.CheckConnected()) assert.True(t, c.CheckConnected())
time.Sleep(time.Second) time.Sleep(time.Second)
c.Close() c.Close()
}) })
} }
func TestNewConnection2(t *testing.T) { func TestNewConnection2(t *testing.T) {
utils.SafeRunWithTimeout(time.Second*2, func(cancel context.CancelFunc) { utils.SafeRunWithTimeout(time.Second*2, func(cancel context.CancelFunc) {
defer cancel()
// error // error
_, err := NewConnection(urlTLS) _, err := NewConnection(urlTLS)
assert.Error(t, err) assert.Error(t, err)
@@ -62,7 +63,6 @@ func TestNewConnection2(t *testing.T) {
InsecureSkipVerify: true, InsecureSkipVerify: true,
})) }))
assert.Error(t, err) assert.Error(t, err)
}) })
} }
@@ -70,12 +70,12 @@ func TestConnection_monitor(t *testing.T) {
c := &Connection{ c := &Connection{
url: urlTLS, url: urlTLS,
reconnectTime: time.Second, reconnectTime: time.Second,
Exit: make(chan struct{}), exit: make(chan struct{}),
ZapLog: defaultLogger, zapLog: defaultLogger,
Conn: &amqp.Connection{}, conn: &amqp.Connection{},
blockChan: make(chan amqp.Blocking, 1), blockChan: make(chan amqp.Blocking, 1),
closeChan: make(chan *amqp.Error, 1), closeChan: make(chan *amqp.Error, 1),
IsConnected: true, isConnected: true,
} }
c.CheckConnected() c.CheckConnected()
@@ -85,15 +85,15 @@ func TestConnection_monitor(t *testing.T) {
}() }()
time.Sleep(time.Millisecond * 500) time.Sleep(time.Millisecond * 500)
c.Mutex.Lock() c.mutex.Lock()
c.blockChan <- amqp.Blocking{Active: false} c.blockChan <- amqp.Blocking{Active: false}
c.blockChan <- amqp.Blocking{Active: true, Reason: "the disk is full."} c.blockChan <- amqp.Blocking{Active: true, Reason: "the disk is full."}
c.Mutex.Unlock() c.mutex.Unlock()
time.Sleep(time.Millisecond * 500) time.Sleep(time.Millisecond * 500)
c.Mutex.Lock() c.mutex.Lock()
c.closeChan <- &amqp.Error{Code: 504, Reason: "connect failed"} c.closeChan <- &amqp.Error{Code: 504, Reason: "connect failed"}
c.Mutex.Unlock() c.mutex.Unlock()
time.Sleep(time.Millisecond * 500) time.Sleep(time.Millisecond * 500)
c.Close() c.Close()

442
pkg/rabbitmq/consumer.go Normal file
View File

@@ -0,0 +1,442 @@
package rabbitmq
import (
"context"
"strconv"
"strings"
"time"
amqp "github.com/rabbitmq/amqp091-go"
"go.uber.org/zap"
)
// ConsumerOption consumer option.
type ConsumerOption func(*consumerOptions)
type consumerOptions struct {
exchangeDeclare *exchangeDeclareOptions
queueDeclare *queueDeclareOptions
queueBind *queueBindOptions
qos *qosOptions
consume *consumeOptions
isPersistent bool // persistent or not
isAutoAck bool // auto-answer or not, if false, manual ACK required
}
func (o *consumerOptions) apply(opts ...ConsumerOption) {
for _, opt := range opts {
opt(o)
}
}
// default consumer settings
func defaultConsumerOptions() *consumerOptions {
return &consumerOptions{
exchangeDeclare: defaultExchangeDeclareOptions(),
queueDeclare: defaultQueueDeclareOptions(),
queueBind: defaultQueueBindOptions(),
qos: defaultQosOptions(),
consume: defaultConsumeOptions(),
isPersistent: true,
isAutoAck: true,
}
}
// WithConsumerExchangeDeclareOptions set exchange declare option.
func WithConsumerExchangeDeclareOptions(opts ...ExchangeDeclareOption) ConsumerOption {
return func(o *consumerOptions) {
o.exchangeDeclare.apply(opts...)
}
}
// WithConsumerQueueDeclareOptions set queue declare option.
func WithConsumerQueueDeclareOptions(opts ...QueueDeclareOption) ConsumerOption {
return func(o *consumerOptions) {
o.queueDeclare.apply(opts...)
}
}
// WithConsumerQueueBindOptions set queue bind option.
func WithConsumerQueueBindOptions(opts ...QueueBindOption) ConsumerOption {
return func(o *consumerOptions) {
o.queueBind.apply(opts...)
}
}
// WithConsumerQosOptions set consume qos option.
func WithConsumerQosOptions(opts ...QosOption) ConsumerOption {
return func(o *consumerOptions) {
o.qos.apply(opts...)
}
}
// WithConsumerConsumeOptions set consumer consume option.
func WithConsumerConsumeOptions(opts ...ConsumeOption) ConsumerOption {
return func(o *consumerOptions) {
o.consume.apply(opts...)
}
}
// WithConsumerAutoAck set consumer auto ack option.
func WithConsumerAutoAck(enable bool) ConsumerOption {
return func(o *consumerOptions) {
o.isAutoAck = enable
}
}
// WithConsumerPersistent set consumer persistent option.
func WithConsumerPersistent(enable bool) ConsumerOption {
return func(o *consumerOptions) {
o.isPersistent = enable
}
}
// -------------------------------------------------------------------------------------------
// ConsumeOption consume option.
type ConsumeOption func(*consumeOptions)
type consumeOptions struct {
consumer string // used to distinguish between multiple consumers
exclusive bool // only available to the program that created it
noLocal bool // if set to true, a message sent by a producer in the same Connection cannot be passed to a consumer in this Connection.
noWait bool // block processing
args amqp.Table // additional properties
}
func (o *consumeOptions) apply(opts ...ConsumeOption) {
for _, opt := range opts {
opt(o)
}
}
// default consume settings
func defaultConsumeOptions() *consumeOptions {
return &consumeOptions{
consumer: "",
exclusive: false,
noLocal: false,
noWait: false,
args: nil,
}
}
// WithConsumeConsumer set consume consumer option.
func WithConsumeConsumer(consumer string) ConsumeOption {
return func(o *consumeOptions) {
o.consumer = consumer
}
}
// WithConsumeExclusive set consume exclusive option.
func WithConsumeExclusive(enable bool) ConsumeOption {
return func(o *consumeOptions) {
o.exclusive = enable
}
}
// WithConsumeNoLocal set consume noLocal option.
func WithConsumeNoLocal(enable bool) ConsumeOption {
return func(o *consumeOptions) {
o.noLocal = enable
}
}
// WithConsumeNoWait set consume no wait option.
func WithConsumeNoWait(enable bool) ConsumeOption {
return func(o *consumeOptions) {
o.noWait = enable
}
}
// WithConsumeArgs set consume args option.
func WithConsumeArgs(args map[string]interface{}) ConsumeOption {
return func(o *consumeOptions) {
o.args = args
}
}
// -------------------------------------------------------------------------------------------
// QosOption qos option.
type QosOption func(*qosOptions)
type qosOptions struct {
enable bool
prefetchCount int
prefetchSize int
global bool
}
func (o *qosOptions) apply(opts ...QosOption) {
for _, opt := range opts {
opt(o)
}
}
// default qos settings
func defaultQosOptions() *qosOptions {
return &qosOptions{
enable: false,
prefetchCount: 0,
prefetchSize: 0,
global: false,
}
}
// WithQosEnable set qos enable option.
func WithQosEnable() QosOption {
return func(o *qosOptions) {
o.enable = true
}
}
// WithQosPrefetchCount set qos prefetch count option.
func WithQosPrefetchCount(count int) QosOption {
return func(o *qosOptions) {
o.prefetchCount = count
}
}
// WithQosPrefetchSize set qos prefetch size option.
func WithQosPrefetchSize(size int) QosOption {
return func(o *qosOptions) {
o.prefetchSize = size
}
}
// WithQosPrefetchGlobal set qos global option.
func WithQosPrefetchGlobal(enable bool) QosOption {
return func(o *qosOptions) {
o.global = enable
}
}
// -------------------------------------------------------------------------------------------
// Consumer session
type Consumer struct {
Exchange *Exchange
QueueName string
connection *Connection
ch *amqp.Channel
exchangeDeclareOption *exchangeDeclareOptions
queueDeclareOption *queueDeclareOptions
queueBindOption *queueBindOptions
qosOption *qosOptions
consumeOption *consumeOptions
isPersistent bool // persistent or not
isAutoAck bool // auto ack or not
zapLog *zap.Logger
}
// Handler message
type Handler func(ctx context.Context, data []byte, tagID string) error
//type Handler func(ctx context.Context, d *amqp.Delivery, isAutoAck bool) error
// NewConsumer create a consumer
func NewConsumer(exchange *Exchange, queueName string, connection *Connection, opts ...ConsumerOption) (*Consumer, error) {
o := defaultConsumerOptions()
o.apply(opts...)
c := &Consumer{
Exchange: exchange,
QueueName: queueName,
connection: connection,
exchangeDeclareOption: o.exchangeDeclare,
queueDeclareOption: o.queueDeclare,
queueBindOption: o.queueBind,
qosOption: o.qos,
consumeOption: o.consume,
isPersistent: o.isPersistent,
isAutoAck: o.isAutoAck,
zapLog: connection.zapLog,
}
return c, nil
}
// initialize a consumer session
func (c *Consumer) initialize() error {
c.connection.mutex.Lock()
// crate a new channel
ch, err := c.connection.conn.Channel()
if err != nil {
c.connection.mutex.Unlock()
return err
}
c.ch = ch
c.connection.mutex.Unlock()
if c.Exchange.eType == exchangeTypeDelayedMessage {
if c.exchangeDeclareOption.args == nil {
c.exchangeDeclareOption.args = amqp.Table{
"x-delayed-type": c.Exchange.delayedMessageType,
}
} else {
c.exchangeDeclareOption.args["x-delayed-type"] = c.Exchange.delayedMessageType
}
}
// declare the exchange type
err = ch.ExchangeDeclare(
c.Exchange.name,
c.Exchange.eType,
c.isPersistent,
c.exchangeDeclareOption.autoDelete,
c.exchangeDeclareOption.internal,
c.exchangeDeclareOption.noWait,
c.exchangeDeclareOption.args,
)
if err != nil {
_ = ch.Close()
return err
}
// declare a queue and create it automatically if it doesn't exist, or skip creation if it does.
queue, err := ch.QueueDeclare(
c.QueueName,
c.isPersistent,
c.queueDeclareOption.autoDelete,
c.queueDeclareOption.exclusive,
c.queueDeclareOption.noWait,
c.queueDeclareOption.args,
)
if err != nil {
_ = ch.Close()
return err
}
args := c.queueBindOption.args
if c.Exchange.eType == exchangeTypeHeaders {
args = c.Exchange.headersKeys
}
// binding queue and exchange
err = ch.QueueBind(
queue.Name,
c.Exchange.routingKey,
c.Exchange.name,
c.queueBindOption.noWait,
args,
)
if err != nil {
_ = ch.Close()
return err
}
// setting the prefetch value, set channel.Qos on the consumer side to limit the number of messages consumed at a time,
// balancing message throughput and fairness, and prevent consumers from being hit by sudden bursts of information traffic.
if c.qosOption.enable {
err = ch.Qos(c.qosOption.prefetchCount, c.qosOption.prefetchSize, c.qosOption.global)
if err != nil {
_ = ch.Close()
return err
}
}
fields := logFields(c.QueueName, c.Exchange)
fields = append(fields, zap.Bool("autoAck", c.isAutoAck))
c.zapLog.Info("[rabbitmq consumer] initialized", fields...)
return nil
}
func (c *Consumer) consumeWithContext(ctx context.Context) (<-chan amqp.Delivery, error) {
return c.ch.ConsumeWithContext(
ctx,
c.QueueName,
c.consumeOption.consumer,
c.isAutoAck,
c.consumeOption.exclusive,
c.consumeOption.noLocal,
c.consumeOption.noWait,
c.consumeOption.args,
)
}
// Consume messages for loop in goroutine
func (c *Consumer) Consume(ctx context.Context, handler Handler) {
go func() {
ticker := time.NewTicker(time.Second * 2)
isFirst := true
for {
if isFirst {
isFirst = false
ticker.Reset(time.Millisecond * 10)
} else {
ticker.Reset(time.Second * 2)
}
// check connection for loop
select {
case <-ticker.C:
if !c.connection.CheckConnected() {
continue
}
case <-c.connection.exit:
c.Close()
return
}
ticker.Stop()
err := c.initialize()
if err != nil {
c.zapLog.Warn("[rabbitmq consumer] initialize consumer error", zap.String("err", err.Error()), zap.String("queue", c.QueueName))
continue
}
delivery, err := c.consumeWithContext(ctx)
if err != nil {
c.zapLog.Warn("[rabbitmq consumer] execution of consumption error", zap.String("err", err.Error()), zap.String("queue", c.QueueName))
continue
}
c.zapLog.Info("[rabbitmq consumer] queue is ready and waiting for messages, queue=" + c.QueueName)
isContinueConsume := false
for {
select {
case <-c.connection.exit:
c.Close()
return
case d, ok := <-delivery:
if !ok {
c.zapLog.Warn("[rabbitmq consumer] exit consume message, queue=" + c.QueueName)
isContinueConsume = true
break
}
tagID := strings.Join([]string{d.Exchange, c.QueueName, strconv.FormatUint(d.DeliveryTag, 10)}, "/")
err = handler(ctx, d.Body, tagID)
if err != nil {
c.zapLog.Warn("[rabbitmq consumer] handle message error", zap.String("err", err.Error()), zap.String("tagID", tagID))
continue
}
if !c.isAutoAck {
if err = d.Ack(false); err != nil {
c.zapLog.Warn("[rabbitmq consumer] manual ack error", zap.String("err", err.Error()), zap.String("tagID", tagID))
continue
}
c.zapLog.Info("[rabbitmq consumer] manual ack done", zap.String("tagID", tagID))
}
}
if isContinueConsume {
break
}
}
}
}()
}
// Close consumer
func (c *Consumer) Close() {
if c.ch != nil {
_ = c.ch.Close()
}
}

View File

@@ -1,163 +0,0 @@
// Package consumer is the generic consumer-side processing logic for the four modes direct, topic, fanout, headers
package consumer
import (
"context"
"strconv"
"time"
"github.com/zhufuyi/sponge/pkg/rabbitmq"
amqp "github.com/rabbitmq/amqp091-go"
"go.uber.org/zap"
)
// Queue session
type Queue struct {
name string
c *rabbitmq.Connection
ch *amqp.Channel
ctx context.Context
autoAck bool
consumeOption *consumeOptions
zapLog *zap.Logger
}
// Handler message
type Handler func(ctx context.Context, data []byte, tagID ...string) error
// NewQueue create a queue
func NewQueue(ctx context.Context, name string, c *rabbitmq.Connection, opts ...ConsumeOption) (*Queue, error) {
o := defaultConsumeOptions()
o.apply(opts...)
q := &Queue{
name: name,
c: c,
consumeOption: o,
zapLog: c.ZapLog,
autoAck: o.autoAck,
ctx: ctx,
}
return q, nil
}
func (q *Queue) newChannel() error {
q.c.Mutex.Lock()
// crate a new channel
ch, err := q.c.Conn.Channel()
if err != nil {
q.c.Mutex.Unlock()
return err
}
q.ch = ch
q.c.Mutex.Unlock()
// setting the prefetch value
// set channel.Qos on the consumer side to limit the number of messages consumed at a time,
// balancing message throughput and fairness, and preventing consumers from being hit by bursty message traffic.
o := q.consumeOption
if o.enableQos {
err = ch.Qos(o.qos.prefetchCount, o.qos.prefetchSize, o.qos.global)
if err != nil {
_ = ch.Close()
return err
}
}
q.zapLog.Info("[rabbitmq consumer] create a queue success", zap.String("name", q.name), zap.Bool("autoAck", o.autoAck))
return nil
}
func (q *Queue) consumeWithContext() (<-chan amqp.Delivery, error) {
return q.ch.ConsumeWithContext(
q.ctx,
q.name,
q.consumeOption.consumer,
q.consumeOption.autoAck,
q.consumeOption.exclusive,
q.consumeOption.noLocal,
q.consumeOption.noWait,
q.consumeOption.args,
)
}
// Consume messages for loop in goroutine
func (q *Queue) Consume(handler Handler) {
go func() {
ticker := time.NewTicker(time.Second * 2)
for {
ticker.Reset(time.Second * 2)
// check connection for loop
select {
case <-ticker.C:
if !q.c.CheckConnected() {
continue
}
case <-q.c.Exit:
q.Close()
return
}
ticker.Stop()
err := q.newChannel()
if err != nil {
q.zapLog.Warn("[rabbitmq consumer] create a channel error", zap.String("err", err.Error()))
continue
}
delivery, err := q.consumeWithContext()
if err != nil {
q.zapLog.Warn("[rabbitmq consumer] execution of consumption error", zap.String("err", err.Error()))
continue
}
q.zapLog.Info("[rabbitmq consumer] queue is ready and waiting for messages, queue=" + q.name)
isContinueConsume := false
for {
select {
case <-q.c.Exit:
q.Close()
return
case d, ok := <-delivery:
if !ok {
q.zapLog.Warn("[rabbitmq consumer] queue receive message exception exit, queue=" + q.name)
isContinueConsume = true
break
}
tagID := q.name + "/" + strconv.FormatUint(d.DeliveryTag, 10)
err = handler(q.ctx, d.Body, tagID)
if err != nil {
q.zapLog.Warn("[rabbitmq consumer] handle message error", zap.String("err", err.Error()))
continue
}
if !q.autoAck {
if err = d.Ack(false); err != nil {
q.zapLog.Warn("[rabbitmq consumer] manual ack error", zap.String("err", err.Error()), zap.String("tagID", tagID))
continue
}
q.zapLog.Info("[rabbitmq consumer] manual ack done", zap.String("tagID", tagID))
}
}
if isContinueConsume {
break
}
}
}
}()
}
// Close queue
func (q *Queue) Close() {
if q.ch != nil {
_ = q.ch.Close()
}
}

View File

@@ -1,284 +0,0 @@
package consumer
import (
"context"
"fmt"
"strings"
"testing"
"time"
"github.com/zhufuyi/sponge/pkg/rabbitmq"
"github.com/zhufuyi/sponge/pkg/rabbitmq/producer"
"github.com/zhufuyi/sponge/pkg/utils"
amqp "github.com/rabbitmq/amqp091-go"
"go.uber.org/zap"
)
var url = "amqp://guest:guest@192.168.3.37:5672/"
var handler = func(ctx context.Context, data []byte, tag ...string) error {
tagID := strings.Join(tag, ",")
fmt.Printf("tagID=%s, receive message: %s\n", tagID, string(data))
return nil
}
func consume(ctx context.Context, queueNames ...string) error {
var consumeErr error
utils.SafeRunWithTimeout(time.Second*3, func(cancel context.CancelFunc) {
c, err := rabbitmq.NewConnection(url)
if err != nil {
consumeErr = err
return
}
for _, queueName := range queueNames {
queue, err := NewQueue(ctx, queueName, c, WithConsumeAutoAck(false))
if err != nil {
consumeErr = err
return
}
queue.Consume(handler)
}
})
return consumeErr
}
func TestConsumer_direct(t *testing.T) {
queueName := "direct-queue-1"
err := producerDirect(queueName)
if err != nil {
t.Log(err)
return
}
ctx, _ := context.WithTimeout(context.Background(), time.Second*3)
err = consume(ctx, queueName)
if err != nil {
t.Log(err)
return
}
<-ctx.Done()
time.Sleep(time.Millisecond * 100)
}
func TestConsumer_topic(t *testing.T) {
queueName := "topic-queue-1"
err := producerTopic(queueName)
if err != nil {
t.Log(err)
return
}
ctx, _ := context.WithTimeout(context.Background(), time.Second*3)
err = consume(ctx, queueName)
if err != nil {
t.Log(err)
return
}
<-ctx.Done()
time.Sleep(time.Millisecond * 100)
}
func TestConsumer_fanout(t *testing.T) {
queueName := "fanout-queue-1"
err := producerFanout(queueName)
if err != nil {
t.Log(err)
return
}
ctx, _ := context.WithTimeout(context.Background(), time.Second*3)
err = consume(ctx, queueName)
if err != nil {
t.Log(err)
return
}
<-ctx.Done()
time.Sleep(time.Millisecond * 100)
}
func TestConsumer_headers(t *testing.T) {
queueName := "headers-queue-1"
err := producerHeaders(queueName)
if err != nil {
t.Log(err)
return
}
ctx, _ := context.WithTimeout(context.Background(), time.Second*3)
err = consume(ctx, queueName)
if err != nil {
t.Log(err)
return
}
<-ctx.Done()
time.Sleep(time.Millisecond * 100)
}
func producerDirect(queueName string) error {
var producerErr error
utils.SafeRunWithTimeout(time.Second*3, func(cancel context.CancelFunc) {
c, err := rabbitmq.NewConnection(url)
if err != nil {
producerErr = err
return
}
defer c.Close()
ctx := context.Background()
exchangeName := "direct-exchange-demo"
routeKey := "direct-key-1"
exchange := producer.NewDirectExchange(exchangeName, routeKey)
q, err := producer.NewQueue(queueName, c.Conn, exchange)
if err != nil {
producerErr = err
return
}
defer q.Close()
_ = q.Publish(ctx, []byte("say hello 1"))
_ = q.Publish(ctx, []byte("say hello 2"))
producerErr = q.Publish(ctx, []byte("say hello 3"))
})
return producerErr
}
func producerTopic(queueName string) error {
var producerErr error
utils.SafeRunWithTimeout(time.Second*3, func(cancel context.CancelFunc) {
c, err := rabbitmq.NewConnection(url)
if err != nil {
producerErr = err
return
}
defer c.Close()
ctx := context.Background()
exchangeName := "topic-exchange-demo"
routingKey := "key1.key2.*"
exchange := producer.NewTopicExchange(exchangeName, routingKey)
q, err := producer.NewQueue(queueName, c.Conn, exchange)
if err != nil {
producerErr = err
return
}
defer q.Close()
key := "key1.key2.key3"
producerErr = q.PublishTopic(ctx, key, []byte(key+" say hello"))
})
return producerErr
}
func producerFanout(queueName string) error {
var producerErr error
utils.SafeRunWithTimeout(time.Second*3, func(cancel context.CancelFunc) {
c, err := rabbitmq.NewConnection(url)
if err != nil {
producerErr = err
return
}
defer c.Close()
ctx := context.Background()
exchangeName := "fanout-exchange-demo"
exchange := producer.NewFanOutExchange(exchangeName)
q, err := producer.NewQueue(queueName, c.Conn, exchange)
if err != nil {
producerErr = err
return
}
defer q.Close()
producerErr = q.Publish(ctx, []byte(" say hello "))
})
return producerErr
}
func producerHeaders(queueName string) error {
var producerErr error
utils.SafeRunWithTimeout(time.Second*3, func(cancel context.CancelFunc) {
c, err := rabbitmq.NewConnection(url)
if err != nil {
producerErr = err
return
}
defer c.Close()
ctx := context.Background()
exchangeName := "headers-exchange-demo"
kv1 := map[string]interface{}{"hello1": "world1", "foo1": "bar1"}
exchange := producer.NewHeaderExchange(exchangeName, producer.HeadersTypeAll, kv1) // all
q, err := producer.NewQueue(queueName, c.Conn, exchange)
if err != nil {
producerErr = err
return
}
defer q.Close()
headersKey1 := kv1
err = q.PublishHeaders(ctx, headersKey1, []byte("say hello 1"))
if err != nil {
producerErr = err
return
}
headersKey1 = map[string]interface{}{"foo": "bar"}
producerErr = q.PublishHeaders(ctx, headersKey1, []byte("say hello 2"))
})
return producerErr
}
func TestNewQueue(t *testing.T) {
c := &rabbitmq.Connection{
Exit: make(chan struct{}),
ZapLog: zap.NewNop(),
Conn: &amqp.Connection{},
IsConnected: true,
}
q, err := NewQueue(context.Background(), "test", c, WithConsumeQos(WithQosPrefetchCount(1)))
if err != nil {
t.Log(err)
return
}
q.ch = &amqp.Channel{}
amqp.NewConnectionProperties()
utils.SafeRunWithTimeout(time.Second, func(cancel context.CancelFunc) {
err = q.newChannel()
if err != nil {
t.Log(err)
return
}
})
utils.SafeRunWithTimeout(time.Second, func(cancel context.CancelFunc) {
_, err := q.consumeWithContext()
if err != nil {
t.Log(err)
return
}
})
utils.SafeRunWithTimeout(time.Second*3, func(cancel context.CancelFunc) {
q.Consume(handler)
})
time.Sleep(time.Millisecond * 2500)
close(q.c.Exit)
}

View File

@@ -1,135 +0,0 @@
package consumer
import amqp "github.com/rabbitmq/amqp091-go"
// ConsumeOption consume option.
type ConsumeOption func(*consumeOptions)
type consumeOptions struct {
consumer string // used to distinguish between multiple consumers
autoAck bool // auto-answer or not, if false, manual ACK required
exclusive bool // only available to the program that created it
noLocal bool // if set to true, a message sent by a producer in the same Connection cannot be passed to a consumer in this Connection.
noWait bool // block processing
args amqp.Table // additional properties
enableQos bool
qos *qosOptions
}
func (o *consumeOptions) apply(opts ...ConsumeOption) {
for _, opt := range opts {
opt(o)
}
}
// default consume settings
func defaultConsumeOptions() *consumeOptions {
return &consumeOptions{
consumer: "",
autoAck: true,
exclusive: false,
noLocal: false,
noWait: false,
args: nil,
enableQos: false,
qos: defaultQosOptions(),
}
}
// WithConsumeConsumer set consume consumer option.
func WithConsumeConsumer(consumer string) ConsumeOption {
return func(o *consumeOptions) {
o.consumer = consumer
}
}
// WithConsumeAutoAck set consume auto ack option.
func WithConsumeAutoAck(enable bool) ConsumeOption {
return func(o *consumeOptions) {
o.autoAck = enable
}
}
// WithConsumeExclusive set consume exclusive option.
func WithConsumeExclusive(enable bool) ConsumeOption {
return func(o *consumeOptions) {
o.exclusive = enable
}
}
// WithConsumeNoLocal set consume noLocal option.
func WithConsumeNoLocal(enable bool) ConsumeOption {
return func(o *consumeOptions) {
o.noLocal = enable
}
}
// WithConsumeNoWait set consume no wait option.
func WithConsumeNoWait(enable bool) ConsumeOption {
return func(o *consumeOptions) {
o.noWait = enable
}
}
// WithConsumeArgs set consume args option.
func WithConsumeArgs(args map[string]interface{}) ConsumeOption {
return func(o *consumeOptions) {
o.args = args
}
}
// WithConsumeQos set consume qos option.
func WithConsumeQos(opts ...QosOption) ConsumeOption {
return func(o *consumeOptions) {
o.enableQos = true
o.qos.apply(opts...)
}
}
// -------------------------------------------------------------------------------------------
// QosOption qos option.
type QosOption func(*qosOptions)
type qosOptions struct {
prefetchCount int
prefetchSize int
global bool
}
func (o *qosOptions) apply(opts ...QosOption) {
for _, opt := range opts {
opt(o)
}
}
// default qos settings
func defaultQosOptions() *qosOptions {
return &qosOptions{
prefetchCount: 0,
prefetchSize: 0,
global: false,
}
}
// WithQosPrefetchCount set qos prefetch count option.
func WithQosPrefetchCount(count int) QosOption {
return func(o *qosOptions) {
o.prefetchCount = count
}
}
// WithQosPrefetchSize set qos prefetch size option.
func WithQosPrefetchSize(size int) QosOption {
return func(o *qosOptions) {
o.prefetchSize = size
}
}
// WithQosPrefetchGlobal set qos global option.
func WithQosPrefetchGlobal(enable bool) QosOption {
return func(o *qosOptions) {
o.global = enable
}
}

View File

@@ -1,52 +0,0 @@
package consumer
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestConsumeOptions(t *testing.T) {
opts := []ConsumeOption{
WithConsumeConsumer("test"),
WithConsumeAutoAck(true),
WithConsumeExclusive(true),
WithConsumeNoLocal(true),
WithConsumeNoWait(true),
WithConsumeArgs(map[string]interface{}{"foo": "bar"}),
WithConsumeQos(
WithQosPrefetchCount(1),
WithQosPrefetchSize(4096),
WithQosPrefetchGlobal(true),
),
}
o := defaultConsumeOptions()
o.apply(opts...)
assert.Equal(t, "test", o.consumer)
assert.True(t, o.autoAck)
assert.True(t, o.exclusive)
assert.True(t, o.noLocal)
assert.True(t, o.noWait)
assert.Equal(t, "bar", o.args["foo"])
assert.True(t, o.enableQos)
assert.Equal(t, 1, o.qos.prefetchCount)
assert.Equal(t, 4096, o.qos.prefetchSize)
assert.True(t, o.qos.global)
}
func TestQosOptions(t *testing.T) {
opts := []QosOption{
WithQosPrefetchCount(1),
WithQosPrefetchSize(4096),
WithQosPrefetchGlobal(true),
}
o := defaultQosOptions()
o.apply(opts...)
assert.Equal(t, 1, o.prefetchCount)
assert.Equal(t, 4096, o.prefetchSize)
assert.True(t, o.global)
}

View File

@@ -0,0 +1,383 @@
package rabbitmq
import (
"context"
"fmt"
"testing"
"time"
"github.com/zhufuyi/sponge/pkg/utils"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
)
func TestConsumerOptions(t *testing.T) {
opts := []ConsumerOption{
WithConsumerExchangeDeclareOptions(
WithExchangeDeclareAutoDelete(true),
WithExchangeDeclareInternal(true),
WithExchangeDeclareNoWait(true),
WithExchangeDeclareArgs(map[string]interface{}{"foo1": "bar1"}),
),
WithConsumerQueueDeclareOptions(
WithQueueDeclareAutoDelete(true),
WithQueueDeclareExclusive(true),
WithQueueDeclareNoWait(true),
WithQueueDeclareArgs(map[string]interface{}{"foo": "bar"}),
),
WithConsumerQueueBindOptions(
WithQueueBindNoWait(true),
WithQueueBindArgs(map[string]interface{}{"foo2": "bar2"}),
),
WithConsumerConsumeOptions(
WithConsumeConsumer("test"),
WithConsumeExclusive(true),
WithConsumeNoLocal(true),
WithConsumeNoWait(true),
WithConsumeArgs(map[string]interface{}{"foo": "bar"}),
),
WithConsumerQosOptions(
WithQosEnable(),
WithQosPrefetchCount(1),
WithQosPrefetchSize(4096),
WithQosPrefetchGlobal(true),
),
WithConsumerAutoAck(true),
WithConsumerPersistent(true),
}
o := defaultConsumerOptions()
o.apply(opts...)
assert.True(t, o.queueDeclare.autoDelete)
assert.True(t, o.queueDeclare.exclusive)
assert.True(t, o.queueDeclare.noWait)
assert.Equal(t, "bar", o.queueDeclare.args["foo"])
assert.True(t, o.exchangeDeclare.autoDelete)
assert.True(t, o.exchangeDeclare.internal)
assert.True(t, o.exchangeDeclare.noWait)
assert.Equal(t, "bar1", o.exchangeDeclare.args["foo1"])
assert.True(t, o.queueBind.noWait)
assert.Equal(t, "bar2", o.queueBind.args["foo2"])
assert.True(t, o.isPersistent)
assert.True(t, o.isAutoAck)
}
var handler = func(ctx context.Context, data []byte, tagID string) error {
fmt.Printf("[received]: tagID=%s, data=%s\n", tagID, data)
return nil
}
func consume(ctx context.Context, queueName string, exchange *Exchange) error {
var consumeErr error
utils.SafeRunWithTimeout(time.Second*3, func(cancel context.CancelFunc) {
defer cancel()
connection, err := NewConnection(url)
if err != nil {
consumeErr = err
return
}
c, err := NewConsumer(exchange, queueName, connection, WithConsumerAutoAck(false))
if err != nil {
consumeErr = err
return
}
c.Consume(ctx, handler)
})
return consumeErr
}
func TestConsumer_direct(t *testing.T) {
ctx, _ := context.WithTimeout(context.Background(), time.Second*3)
exchangeName := "direct-exchange-demo"
queueName := "direct-queue-1"
routeKey := "direct-key-1"
exchange := NewDirectExchange(exchangeName, routeKey)
err := producerDirect(ctx, queueName, exchange)
if err != nil {
t.Log(err)
return
}
err = consume(ctx, queueName, exchange)
if err != nil {
t.Log(err)
return
}
<-ctx.Done()
time.Sleep(time.Millisecond * 100)
}
func TestConsumer_topic(t *testing.T) {
ctx, _ := context.WithTimeout(context.Background(), time.Second*3)
exchangeName := "topic-exchange-demo"
queueName := "topic-queue-1"
routingKey := "key1.key2.*"
exchange := NewTopicExchange(exchangeName, routingKey)
err := producerTopic(ctx, queueName, exchange)
if err != nil {
t.Log(err)
return
}
err = consume(ctx, queueName, exchange)
if err != nil {
t.Log(err)
return
}
<-ctx.Done()
time.Sleep(time.Millisecond * 100)
}
func TestConsumer_fanout(t *testing.T) {
ctx, _ := context.WithTimeout(context.Background(), time.Second*3)
exchangeName := "fanout-exchange-demo"
queueName := "fanout-queue-1"
exchange := NewFanoutExchange(exchangeName)
err := producerFanout(ctx, queueName, exchange)
if err != nil {
t.Log(err)
return
}
err = consume(ctx, queueName, exchange)
if err != nil {
t.Log(err)
return
}
<-ctx.Done()
time.Sleep(time.Millisecond * 100)
}
func TestConsumer_headers(t *testing.T) {
ctx, _ := context.WithTimeout(context.Background(), time.Second*3)
exchangeName := "headers-exchange-demo"
queueName := "headers-queue-1"
kv1 := map[string]interface{}{"hello1": "world1", "foo1": "bar1"}
exchange := NewHeadersExchange(exchangeName, HeadersTypeAll, kv1) // all
err := producerHeaders(ctx, queueName, exchange)
if err != nil {
t.Log(err)
return
}
err = consume(ctx, queueName, exchange)
if err != nil {
t.Log(err)
return
}
<-ctx.Done()
time.Sleep(time.Millisecond * 100)
}
func TestConsumer_delayedMessage(t *testing.T) {
ctx, _ := context.WithTimeout(context.Background(), time.Second*7)
exchangeName := "delayed-message-exchange-demo"
queueName := "delayed-message-queue"
routingKey := "delayed-key"
exchange := NewDelayedMessageExchange(exchangeName, NewDirectExchange("", routingKey))
err := producerDelayedMessage(ctx, queueName, exchange)
if err != nil {
t.Log(err)
return
}
err = consume(ctx, queueName, exchange)
if err != nil {
t.Log(err)
return
}
<-ctx.Done()
time.Sleep(time.Millisecond * 100)
}
func producerDirect(ctx context.Context, queueName string, exchange *Exchange) error {
var producerErr error
utils.SafeRunWithTimeout(time.Second*3, func(cancel context.CancelFunc) {
defer cancel()
connection, err := NewConnection(url)
if err != nil {
producerErr = err
return
}
defer connection.Close()
p, err := NewProducer(exchange, queueName, connection)
if err != nil {
producerErr = err
return
}
defer p.Close()
_ = p.PublishDirect(ctx, []byte("say hello 1"))
_ = p.PublishDirect(ctx, []byte("say hello 2"))
producerErr = p.PublishDirect(ctx, []byte("say hello 3"))
})
return producerErr
}
func producerTopic(ctx context.Context, queueName string, exchange *Exchange) error {
var producerErr error
utils.SafeRunWithTimeout(time.Second*3, func(cancel context.CancelFunc) {
defer cancel()
connection, err := NewConnection(url)
if err != nil {
producerErr = err
return
}
defer connection.Close()
p, err := NewProducer(exchange, queueName, connection)
if err != nil {
producerErr = err
return
}
defer p.Close()
key := "key1.key2.key3"
producerErr = p.PublishTopic(ctx, key, []byte(key+" say hello"))
})
return producerErr
}
func producerFanout(ctx context.Context, queueName string, exchange *Exchange) error {
var producerErr error
utils.SafeRunWithTimeout(time.Second*3, func(cancel context.CancelFunc) {
defer cancel()
connection, err := NewConnection(url)
if err != nil {
producerErr = err
return
}
defer connection.Close()
p, err := NewProducer(exchange, queueName, connection)
if err != nil {
producerErr = err
return
}
defer p.Close()
producerErr = p.PublishFanout(ctx, []byte(" say hello"))
})
return producerErr
}
func producerHeaders(ctx context.Context, queueName string, exchange *Exchange) error {
var producerErr error
utils.SafeRunWithTimeout(time.Second*3, func(cancel context.CancelFunc) {
defer cancel()
connection, err := NewConnection(url)
if err != nil {
producerErr = err
return
}
defer connection.Close()
p, err := NewProducer(exchange, queueName, connection)
if err != nil {
producerErr = err
return
}
defer p.Close()
headersKey1 := exchange.headersKeys
err = p.PublishHeaders(ctx, headersKey1, []byte("say hello 1"))
if err != nil {
producerErr = err
return
}
headersKey1 = map[string]interface{}{"foo": "bar"}
producerErr = p.PublishHeaders(ctx, headersKey1, []byte("say hello 2"))
})
return producerErr
}
func producerDelayedMessage(ctx context.Context, queueName string, exchange *Exchange) error {
var producerErr error
utils.SafeRunWithTimeout(time.Second*6, func(cancel context.CancelFunc) {
defer cancel()
connection, err := NewConnection(url)
if err != nil {
producerErr = err
return
}
defer connection.Close()
p, err := NewProducer(exchange, queueName, connection)
if err != nil {
producerErr = err
return
}
defer p.Close()
producerErr = p.PublishDelayedMessage(ctx, time.Second*5, []byte("say hello "+time.Now().Format(datetimeLayout)))
time.Sleep(time.Second)
producerErr = p.PublishDelayedMessage(ctx, time.Second*5, []byte("say hello "+time.Now().Format(datetimeLayout)))
})
return producerErr
}
func TestConsumerErr(t *testing.T) {
connection := &Connection{
exit: make(chan struct{}),
zapLog: zap.NewNop(),
conn: &amqp.Connection{},
isConnected: true,
}
exchange := NewDirectExchange("foo", "bar")
c, err := NewConsumer(exchange, "test", connection, WithConsumerQosOptions(
WithQosEnable(),
WithQosPrefetchCount(1)),
)
if err != nil {
t.Log(err)
return
}
c.ch = &amqp.Channel{}
utils.SafeRunWithTimeout(time.Second, func(cancel context.CancelFunc) {
defer cancel()
err := c.initialize()
if err != nil {
t.Log(err)
return
}
})
utils.SafeRunWithTimeout(time.Second, func(cancel context.CancelFunc) {
defer cancel()
_, err := c.consumeWithContext(context.Background())
if err != nil {
t.Log(err)
return
}
})
utils.SafeRunWithTimeout(time.Second*3, func(cancel context.CancelFunc) {
defer cancel()
c.Consume(context.Background(), handler)
})
utils.SafeRun(context.Background(), func(ctx context.Context) {
c.Close()
})
time.Sleep(time.Millisecond * 2500)
close(c.connection.exit)
}

View File

@@ -1,70 +0,0 @@
package rabbitmq
import (
"crypto/tls"
"time"
"go.uber.org/zap"
)
// DefaultURL default rabbitmq url
const DefaultURL = "amqp://guest:guest@localhost:5672/"
var defaultLogger, _ = zap.NewProduction()
// ConnectionOption connection option.
type ConnectionOption func(*connectionOptions)
type connectionOptions struct {
tlsConfig *tls.Config // tls config, if the url is amqps this field must be set
reconnectTime time.Duration // reconnect time interval, default is 3s
zapLog *zap.Logger
}
func (o *connectionOptions) apply(opts ...ConnectionOption) {
for _, opt := range opts {
opt(o)
}
}
// default connection settings
func defaultConnectionOptions() *connectionOptions {
return &connectionOptions{
tlsConfig: nil,
reconnectTime: time.Second * 3,
zapLog: defaultLogger,
}
}
// WithTLSConfig set tls config option.
func WithTLSConfig(tlsConfig *tls.Config) ConnectionOption {
return func(o *connectionOptions) {
if tlsConfig == nil {
tlsConfig = &tls.Config{
InsecureSkipVerify: true,
}
}
o.tlsConfig = tlsConfig
}
}
// WithReconnectTime set reconnect time interval option.
func WithReconnectTime(d time.Duration) ConnectionOption {
return func(o *connectionOptions) {
if d == 0 {
d = time.Second * 3
}
o.reconnectTime = d
}
}
// WithLogger set logger option.
func WithLogger(zapLog *zap.Logger) ConnectionOption {
return func(o *connectionOptions) {
if zapLog == nil {
return
}
o.zapLog = zapLog
}
}

331
pkg/rabbitmq/producer.go Normal file
View File

@@ -0,0 +1,331 @@
package rabbitmq
import (
"context"
"fmt"
"time"
amqp "github.com/rabbitmq/amqp091-go"
"go.uber.org/zap"
)
// ProducerOption producer option.
type ProducerOption func(*producerOptions)
type producerOptions struct {
exchangeDeclare *exchangeDeclareOptions
queueDeclare *queueDeclareOptions
queueBind *queueBindOptions
isPersistent bool // is it persistent
// If true, the message will be returned to the sender if the queue cannot be
// found according to its own exchange type and routeKey rules.
mandatory bool
}
func (o *producerOptions) apply(opts ...ProducerOption) {
for _, opt := range opts {
opt(o)
}
}
// default producer settings
func defaultProducerOptions() *producerOptions {
return &producerOptions{
exchangeDeclare: defaultExchangeDeclareOptions(),
queueDeclare: defaultQueueDeclareOptions(),
queueBind: defaultQueueBindOptions(),
isPersistent: true,
mandatory: true,
}
}
// WithProducerExchangeDeclareOptions set exchange declare option.
func WithProducerExchangeDeclareOptions(opts ...ExchangeDeclareOption) ProducerOption {
return func(o *producerOptions) {
o.exchangeDeclare.apply(opts...)
}
}
// WithProducerQueueDeclareOptions set queue declare option.
func WithProducerQueueDeclareOptions(opts ...QueueDeclareOption) ProducerOption {
return func(o *producerOptions) {
o.queueDeclare.apply(opts...)
}
}
// WithProducerQueueBindOptions set queue bind option.
func WithProducerQueueBindOptions(opts ...QueueBindOption) ProducerOption {
return func(o *producerOptions) {
o.queueBind.apply(opts...)
}
}
// WithProducerPersistent set producer persistent option.
func WithProducerPersistent(enable bool) ProducerOption {
return func(o *producerOptions) {
o.isPersistent = enable
}
}
// WithProducerMandatory set producer mandatory option.
func WithProducerMandatory(enable bool) ProducerOption {
return func(o *producerOptions) {
o.mandatory = enable
}
}
// -------------------------------------------------------------------------------------------
// Producer session
type Producer struct {
Exchange *Exchange // exchange
QueueName string // queue name
conn *amqp.Connection // rabbitmq connection
ch *amqp.Channel // rabbitmq channel
// persistent or not
isPersistent bool
deliveryMode uint8 // amqp.Persistent or amqp.Transient
// If true, the message will be returned to the sender if the queue cannot be
// found according to its own exchange type and routeKey rules.
mandatory bool
zapLog *zap.Logger
}
// NewProducer create a producer
func NewProducer(exchange *Exchange, queueName string, connection *Connection, opts ...ProducerOption) (*Producer, error) {
o := defaultProducerOptions()
o.apply(opts...)
// crate a new channel
ch, err := connection.conn.Channel()
if err != nil {
return nil, err
}
if exchange.eType == exchangeTypeDelayedMessage {
if o.exchangeDeclare.args == nil {
o.exchangeDeclare.args = amqp.Table{
"x-delayed-type": exchange.delayedMessageType,
}
} else {
o.exchangeDeclare.args["x-delayed-type"] = exchange.delayedMessageType
}
}
// declare the exchange type
err = ch.ExchangeDeclare(
exchange.name,
exchange.eType,
o.isPersistent,
o.exchangeDeclare.autoDelete,
o.exchangeDeclare.internal,
o.exchangeDeclare.noWait,
o.exchangeDeclare.args,
)
if err != nil {
_ = ch.Close()
return nil, err
}
// declare a queue and create it automatically if it doesn't exist, or skip creation if it does.
q, err := ch.QueueDeclare(
queueName,
o.isPersistent,
o.queueDeclare.autoDelete,
o.queueDeclare.exclusive,
o.queueDeclare.noWait,
o.queueDeclare.args,
)
if err != nil {
_ = ch.Close()
return nil, err
}
args := o.queueBind.args
if exchange.eType == exchangeTypeHeaders {
args = exchange.headersKeys
}
// binding queue and exchange
err = ch.QueueBind(
q.Name,
exchange.routingKey,
exchange.name,
o.queueBind.noWait,
args,
)
if err != nil {
_ = ch.Close()
return nil, err
}
deliveryMode := amqp.Persistent
if !o.isPersistent {
deliveryMode = amqp.Transient
}
fields := logFields(queueName, exchange)
fields = append(fields, zap.Bool("isPersistent", o.isPersistent))
connection.zapLog.Info("[rabbit producer] initialized", fields...)
return &Producer{
QueueName: queueName,
conn: connection.conn,
ch: ch,
Exchange: exchange,
isPersistent: o.isPersistent,
deliveryMode: deliveryMode,
mandatory: o.mandatory,
zapLog: connection.zapLog,
}, nil
}
// PublishDirect send direct type message
func (p *Producer) PublishDirect(ctx context.Context, body []byte) error {
if p.Exchange.eType != exchangeTypeDirect {
return fmt.Errorf("invalid exchange type (%s), only supports direct type", p.Exchange.eType)
}
return p.ch.PublishWithContext(
ctx,
p.Exchange.name,
p.Exchange.routingKey,
p.mandatory,
false,
amqp.Publishing{
DeliveryMode: p.deliveryMode,
ContentType: "text/plain",
Body: body,
},
)
}
// PublishFanout send fanout type message
func (p *Producer) PublishFanout(ctx context.Context, body []byte) error {
if p.Exchange.eType != exchangeTypeFanout {
return fmt.Errorf("invalid exchange type (%s), only supports fanout type", p.Exchange.eType)
}
return p.ch.PublishWithContext(
ctx,
p.Exchange.name,
p.Exchange.routingKey,
p.mandatory,
false,
amqp.Publishing{
DeliveryMode: p.deliveryMode,
ContentType: "text/plain",
Body: body,
},
)
}
// PublishTopic send topic type message
func (p *Producer) PublishTopic(ctx context.Context, topicKey string, body []byte) error {
if p.Exchange.eType != exchangeTypeTopic {
return fmt.Errorf("invalid exchange type (%s), only supports topic type", p.Exchange.eType)
}
return p.ch.PublishWithContext(
ctx,
p.Exchange.name,
topicKey,
p.mandatory,
false,
amqp.Publishing{
DeliveryMode: p.deliveryMode,
ContentType: "text/plain",
Body: body,
},
)
}
// PublishHeaders send headers type message
func (p *Producer) PublishHeaders(ctx context.Context, headersKeys map[string]interface{}, body []byte) error {
if p.Exchange.eType != exchangeTypeHeaders {
return fmt.Errorf("invalid exchange type (%s), only supports headers type", p.Exchange.eType)
}
return p.ch.PublishWithContext(
ctx,
p.Exchange.name,
p.Exchange.routingKey,
p.mandatory,
false,
amqp.Publishing{
DeliveryMode: p.deliveryMode,
Headers: headersKeys,
ContentType: "text/plain",
Body: body,
},
)
}
// PublishDelayedMessage send delayed type message
func (p *Producer) PublishDelayedMessage(ctx context.Context, delayTime time.Duration, body []byte, opts ...DelayedMessagePublishOption) error {
if p.Exchange.eType != exchangeTypeDelayedMessage {
return fmt.Errorf("invalid exchange type (%s), only supports x-delayed-message type", p.Exchange.eType)
}
routingKey := p.Exchange.routingKey
headersKeys := make(map[string]interface{})
o := defaultDelayedMessagePublishOptions()
o.apply(opts...)
switch p.Exchange.delayedMessageType {
case exchangeTypeTopic:
if o.topicKey == "" {
return fmt.Errorf("topic key is required, please set topicKey in DelayedMessagePublishOption")
}
routingKey = o.topicKey
case exchangeTypeHeaders:
if o.headersKeys == nil {
return fmt.Errorf("headers keys is required, please set headersKeys in DelayedMessagePublishOption")
}
headersKeys = o.headersKeys
}
headersKeys["x-delay"] = int(delayTime / time.Millisecond) // delay time: milliseconds
return p.ch.PublishWithContext(
ctx,
p.Exchange.name,
routingKey,
p.mandatory,
false,
amqp.Publishing{
DeliveryMode: p.deliveryMode,
Headers: headersKeys,
ContentType: "text/plain",
Body: body,
},
)
}
// Close the consumer
func (p *Producer) Close() {
if p.ch != nil {
_ = p.ch.Close()
}
}
func logFields(queueName string, exchange *Exchange) []zap.Field {
fields := []zap.Field{
zap.String("queue", queueName),
zap.String("exchange", exchange.name),
zap.String("exchangeType", exchange.eType),
}
switch exchange.eType {
case exchangeTypeDirect, exchangeTypeTopic:
fields = append(fields, zap.String("routingKey", exchange.routingKey))
case exchangeTypeHeaders:
fields = append(fields, zap.Any("headersKeys", exchange.headersKeys))
case exchangeTypeDelayedMessage:
fields = append(fields, zap.String("delayedMessageType", exchange.delayedMessageType))
switch exchange.delayedMessageType {
case exchangeTypeDirect, exchangeTypeTopic:
fields = append(fields, zap.String("routingKey", exchange.routingKey))
case exchangeTypeHeaders:
fields = append(fields, zap.Any("headersKeys", exchange.headersKeys))
}
}
return fields
}

View File

@@ -1,242 +0,0 @@
package producer
import (
amqp "github.com/rabbitmq/amqp091-go"
)
// QueueDeclareOption declare queue option.
type QueueDeclareOption func(*queueDeclareOptions)
type queueDeclareOptions struct {
durable bool // is it persistent
autoDelete bool // delete automatically
exclusive bool // exclusive (only available to the program that created it)
noWait bool // block processing
args amqp.Table // additional properties
}
func (o *queueDeclareOptions) apply(opts ...QueueDeclareOption) {
for _, opt := range opts {
opt(o)
}
}
// default queue declare settings
func defaultQueueDeclareOptions() *queueDeclareOptions {
return &queueDeclareOptions{
durable: true,
autoDelete: false,
exclusive: false,
noWait: false,
args: nil,
}
}
// WithQueueDeclareDurable set queue declare durable option.
func WithQueueDeclareDurable(enable bool) QueueDeclareOption {
return func(o *queueDeclareOptions) {
o.durable = enable
}
}
// WithQueueDeclareAutoDelete set queue declare auto delete option.
func WithQueueDeclareAutoDelete(enable bool) QueueDeclareOption {
return func(o *queueDeclareOptions) {
o.autoDelete = enable
}
}
// WithQueueDeclareExclusive set queue declare exclusive option.
func WithQueueDeclareExclusive(enable bool) QueueDeclareOption {
return func(o *queueDeclareOptions) {
o.exclusive = enable
}
}
// WithQueueDeclareNoWait set queue declare no wait option.
func WithQueueDeclareNoWait(enable bool) QueueDeclareOption {
return func(o *queueDeclareOptions) {
o.noWait = enable
}
}
// WithQueueDeclareArgs set queue declare args option.
func WithQueueDeclareArgs(args map[string]interface{}) QueueDeclareOption {
return func(o *queueDeclareOptions) {
o.args = args
}
}
// -------------------------------------------------------------------------------------------
// ExchangeDeclareOption declare exchange option.
type ExchangeDeclareOption func(*exchangeDeclareOptions)
type exchangeDeclareOptions struct {
durable bool // is it persistent
autoDelete bool // delete automatically
internal bool // public or not, false means public
noWait bool // block processing
args amqp.Table // additional properties
}
func (o *exchangeDeclareOptions) apply(opts ...ExchangeDeclareOption) {
for _, opt := range opts {
opt(o)
}
}
// default exchange declare settings
func defaultExchangeDeclareOptions() *exchangeDeclareOptions {
return &exchangeDeclareOptions{
durable: true,
autoDelete: false,
internal: false,
noWait: false,
args: nil,
}
}
// WithExchangeDeclareDurable set exchange declare durable option.
func WithExchangeDeclareDurable(enable bool) ExchangeDeclareOption {
return func(o *exchangeDeclareOptions) {
o.durable = enable
}
}
// WithExchangeDeclareAutoDelete set exchange declare auto delete option.
func WithExchangeDeclareAutoDelete(enable bool) ExchangeDeclareOption {
return func(o *exchangeDeclareOptions) {
o.autoDelete = enable
}
}
// WithExchangeDeclareInternal set exchange declare internal option.
func WithExchangeDeclareInternal(enable bool) ExchangeDeclareOption {
return func(o *exchangeDeclareOptions) {
o.internal = enable
}
}
// WithExchangeDeclareNoWait set exchange declare no wait option.
func WithExchangeDeclareNoWait(enable bool) ExchangeDeclareOption {
return func(o *exchangeDeclareOptions) {
o.noWait = enable
}
}
// WithExchangeDeclareArgs set exchange declare args option.
func WithExchangeDeclareArgs(args map[string]interface{}) ExchangeDeclareOption {
return func(o *exchangeDeclareOptions) {
o.args = args
}
}
// -------------------------------------------------------------------------------------------
// QueueBindOption declare queue bind option.
type QueueBindOption func(*queueBindOptions)
type queueBindOptions struct {
noWait bool // block processing
args amqp.Table // this parameter is invalid if the type is headers.
}
func (o *queueBindOptions) apply(opts ...QueueBindOption) {
for _, opt := range opts {
opt(o)
}
}
// default queue bind settings
func defaultQueueBindOptions() *queueBindOptions {
return &queueBindOptions{
noWait: false,
args: nil,
}
}
// WithQueueBindNoWait set queue bind no wait option.
func WithQueueBindNoWait(enable bool) QueueBindOption {
return func(o *queueBindOptions) {
o.noWait = enable
}
}
// WithQueueBindArgs set queue bind args option.
func WithQueueBindArgs(args map[string]interface{}) QueueBindOption {
return func(o *queueBindOptions) {
o.args = args
}
}
// -------------------------------------------------------------------------------------------
// QueueOption queue option.
type QueueOption func(*queueOptions)
type queueOptions struct {
queueDeclare *queueDeclareOptions
exchangeDeclare *exchangeDeclareOptions
queueBind *queueBindOptions
// If true, the message will be returned to the sender if the queue cannot be
// found according to its own exchange type and routeKey rules.
mandatory bool
// If true, when exchange sends a message to the queue and finds that there
// are no consumers on the queue, it returns the message to the sender
immediate bool
}
func (o *queueOptions) apply(opts ...QueueOption) {
for _, opt := range opts {
opt(o)
}
}
// default queue declare settings
func defaultProducerOptions() *queueOptions {
return &queueOptions{
queueDeclare: defaultQueueDeclareOptions(),
exchangeDeclare: defaultExchangeDeclareOptions(),
queueBind: defaultQueueBindOptions(),
mandatory: false,
immediate: false,
}
}
// WithQueueDeclareOptions set queue declare option.
func WithQueueDeclareOptions(opts ...QueueDeclareOption) QueueOption {
return func(o *queueOptions) {
o.queueDeclare.apply(opts...)
}
}
// WithExchangeDeclareOptions set exchange declare option.
func WithExchangeDeclareOptions(opts ...ExchangeDeclareOption) QueueOption {
return func(o *queueOptions) {
o.exchangeDeclare.apply(opts...)
}
}
// WithQueueBindOptions set queue bind option.
func WithQueueBindOptions(opts ...QueueBindOption) QueueOption {
return func(o *queueOptions) {
o.queueBind.apply(opts...)
}
}
// WithQueuePublishMandatory set queue publish mandatory option.
func WithQueuePublishMandatory(enable bool) QueueOption {
return func(o *queueOptions) {
o.mandatory = enable
}
}
// WithQueuePublishImmediate set queue publish immediate option.
func WithQueuePublishImmediate(enable bool) QueueOption {
return func(o *queueOptions) {
o.immediate = enable
}
}

View File

@@ -1,103 +0,0 @@
package producer
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestQueueDeclareOptions(t *testing.T) {
opts := []QueueDeclareOption{
WithQueueDeclareDurable(true),
WithQueueDeclareAutoDelete(true),
WithQueueDeclareExclusive(true),
WithQueueDeclareNoWait(true),
WithQueueDeclareArgs(map[string]interface{}{"foo": "bar"}),
}
o := defaultQueueDeclareOptions()
o.apply(opts...)
assert.True(t, o.durable)
assert.True(t, o.autoDelete)
assert.True(t, o.exclusive)
assert.True(t, o.noWait)
assert.Equal(t, "bar", o.args["foo"])
}
func TestExchangeDeclareOptions(t *testing.T) {
opts := []ExchangeDeclareOption{
WithExchangeDeclareDurable(true),
WithExchangeDeclareAutoDelete(true),
WithExchangeDeclareInternal(true),
WithExchangeDeclareNoWait(true),
WithExchangeDeclareArgs(map[string]interface{}{"foo1": "bar1"}),
}
o := defaultExchangeDeclareOptions()
o.apply(opts...)
assert.True(t, o.durable)
assert.True(t, o.autoDelete)
assert.True(t, o.internal)
assert.True(t, o.noWait)
assert.Equal(t, "bar1", o.args["foo1"])
}
func TestQueueBindOptions(t *testing.T) {
opts := []QueueBindOption{
WithQueueBindNoWait(true),
WithQueueBindArgs(map[string]interface{}{"foo2": "bar2"}),
}
o := defaultQueueBindOptions()
o.apply(opts...)
assert.True(t, o.noWait)
assert.Equal(t, "bar2", o.args["foo2"])
}
func TestProducerOptions(t *testing.T) {
opts := []QueueOption{
WithQueueDeclareOptions(
WithQueueDeclareDurable(true),
WithQueueDeclareAutoDelete(true),
WithQueueDeclareExclusive(true),
WithQueueDeclareNoWait(true),
WithQueueDeclareArgs(map[string]interface{}{"foo": "bar"}),
),
WithExchangeDeclareOptions(
WithExchangeDeclareDurable(true),
WithExchangeDeclareAutoDelete(true),
WithExchangeDeclareInternal(true),
WithExchangeDeclareNoWait(true),
WithExchangeDeclareArgs(map[string]interface{}{"foo1": "bar1"}),
),
WithQueueBindOptions(
WithQueueBindNoWait(true),
WithQueueBindArgs(map[string]interface{}{"foo2": "bar2"}),
),
WithQueuePublishMandatory(true),
WithQueuePublishImmediate(true)}
o := defaultProducerOptions()
o.apply(opts...)
assert.True(t, o.queueDeclare.durable)
assert.True(t, o.queueDeclare.autoDelete)
assert.True(t, o.queueDeclare.exclusive)
assert.True(t, o.queueDeclare.noWait)
assert.Equal(t, "bar", o.queueDeclare.args["foo"])
assert.True(t, o.exchangeDeclare.durable)
assert.True(t, o.exchangeDeclare.autoDelete)
assert.True(t, o.exchangeDeclare.internal)
assert.True(t, o.exchangeDeclare.noWait)
assert.Equal(t, "bar1", o.exchangeDeclare.args["foo1"])
assert.True(t, o.queueBind.noWait)
assert.Equal(t, "bar2", o.queueBind.args["foo2"])
assert.True(t, o.mandatory)
assert.True(t, o.immediate)
}

View File

@@ -1,226 +0,0 @@
// Package producer is the generic producer-side processing logic for the four modes direct, topic, fanout, headers.
package producer
import (
"context"
"fmt"
amqp "github.com/rabbitmq/amqp091-go"
)
// ErrClosed closed
var ErrClosed = amqp.ErrClosed
const (
exchangeTypeDirect = "direct"
exchangeTypeTopic = "topic"
exchangeTypeFanout = "fanout"
exchangeTypeHeaders = "headers"
// HeadersTypeAll all
HeadersTypeAll HeadersType = "all"
// HeadersTypeAny any
HeadersTypeAny HeadersType = "any"
)
// HeadersType headers type
type HeadersType = string
// Exchange rabbitmq minimum management unit
type Exchange struct {
name string // exchange name
eType string // exchange type: direct, topic, fanout, headers
routingKey string // route key
Headers map[string]interface{} // this field is required if eType=headers.
}
// NewDirectExchange create a direct exchange
func NewDirectExchange(exchangeName string, routingKey string) *Exchange {
return &Exchange{
name: exchangeName,
eType: exchangeTypeDirect,
routingKey: routingKey,
}
}
// NewTopicExchange create a topic exchange
func NewTopicExchange(exchangeName string, routingKey string) *Exchange {
return &Exchange{
name: exchangeName,
eType: exchangeTypeTopic,
routingKey: routingKey,
}
}
// NewFanOutExchange create a fanout exchange
func NewFanOutExchange(exchangeName string) *Exchange {
return &Exchange{
name: exchangeName,
eType: exchangeTypeFanout,
routingKey: "",
}
}
// NewHeaderExchange create a headers exchange, the headerType supports "all" and "any"
func NewHeaderExchange(exchangeName string, headersType HeadersType, kv map[string]interface{}) *Exchange {
if kv == nil {
kv = make(map[string]interface{})
}
switch headersType {
case HeadersTypeAll, HeadersTypeAny:
kv["x-match"] = headersType
default:
kv["x-match"] = HeadersTypeAll
}
return &Exchange{
name: exchangeName,
eType: exchangeTypeHeaders,
routingKey: "",
Headers: kv,
}
}
// -------------------------------------------------------------------------------------------
// Queue session
type Queue struct {
queueName string // queue name
exchange *Exchange // exchange
conn *amqp.Connection // rabbitmq connection
ch *amqp.Channel // rabbitmq channel
// If true, the message will be returned to the sender if the queue cannot be
// found according to its own exchange type and routeKey rules.
mandatory bool
// If true, when exchange sends a message to the queue and finds that there
// are no consumers on the queue, it returns the message to the sender
immediate bool
}
// NewQueue create a queue
func NewQueue(queueName string, conn *amqp.Connection, exchange *Exchange, opts ...QueueOption) (*Queue, error) {
o := defaultProducerOptions()
o.apply(opts...)
// crate a new channel
ch, err := conn.Channel()
if err != nil {
return nil, err
}
// declare a queue and create it automatically if it doesn't exist, or skip creation if it does.
q, err := ch.QueueDeclare(
queueName,
o.queueDeclare.durable,
o.queueDeclare.autoDelete,
o.queueDeclare.exclusive,
o.queueDeclare.noWait,
o.queueDeclare.args,
)
if err != nil {
return nil, err
}
// declare the exchange type
err = ch.ExchangeDeclare(
exchange.name,
exchange.eType,
o.exchangeDeclare.durable,
o.exchangeDeclare.autoDelete,
o.exchangeDeclare.internal,
o.exchangeDeclare.noWait,
o.exchangeDeclare.args,
)
if err != nil {
return nil, err
}
args := o.queueBind.args
if exchange.eType == exchangeTypeHeaders {
args = exchange.Headers
}
// Binding queue and exchange
err = ch.QueueBind(
q.Name,
exchange.routingKey,
exchange.name,
o.queueBind.noWait,
args,
)
if err != nil {
return nil, err
}
return &Queue{
queueName: queueName,
conn: conn,
ch: ch,
exchange: exchange,
mandatory: o.mandatory,
immediate: o.immediate,
}, nil
}
// Publish send direct or fanout type message
func (q *Queue) Publish(ctx context.Context, body []byte) error {
if q.exchange.eType != exchangeTypeDirect && q.exchange.eType != exchangeTypeFanout {
return fmt.Errorf("invalid exchange type (%s), only supports direct or fanout types", q.exchange.eType)
}
return q.ch.PublishWithContext(
ctx,
q.exchange.name,
q.exchange.routingKey,
q.mandatory,
q.immediate,
amqp.Publishing{
ContentType: "text/plain",
Body: body,
},
)
}
// PublishTopic send topic type message
func (q *Queue) PublishTopic(ctx context.Context, topicKey string, body []byte) error {
if q.exchange.eType != exchangeTypeTopic {
return fmt.Errorf("invalid exchange type (%s), only supports topic type", q.exchange.eType)
}
return q.ch.PublishWithContext(
ctx,
q.exchange.name,
topicKey,
q.mandatory,
q.immediate,
amqp.Publishing{
ContentType: "text/plain",
Body: body,
},
)
}
// PublishHeaders send headers type message
func (q *Queue) PublishHeaders(ctx context.Context, headersKey map[string]interface{}, body []byte) error {
if q.exchange.eType != exchangeTypeHeaders {
return fmt.Errorf("invalid exchange type (%s), only supports headers type", q.exchange.eType)
}
return q.ch.PublishWithContext(
ctx,
q.exchange.name,
q.exchange.routingKey,
q.mandatory,
q.immediate,
amqp.Publishing{
Headers: headersKey,
ContentType: "text/plain",
Body: body,
},
)
}
// Close the queue
func (q *Queue) Close() {
if q.ch != nil {
_ = q.ch.Close()
}
}

View File

@@ -1,334 +0,0 @@
package producer
import (
"context"
"strconv"
"testing"
"time"
"github.com/zhufuyi/sponge/pkg/rabbitmq"
"github.com/zhufuyi/sponge/pkg/utils"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/stretchr/testify/assert"
)
const (
url = "amqp://guest:guest@192.168.3.37:5672/"
)
func TestProducer_direct(t *testing.T) {
utils.SafeRunWithTimeout(time.Second*3, func(cancel context.CancelFunc) {
c, err := rabbitmq.NewConnection(url)
if err != nil {
t.Log(err)
return
}
defer c.Close()
ctx := context.Background()
exchangeName := "direct-exchange-demo"
queueName := "direct-queue-1"
routeKey := "direct-key-1"
exchange := NewDirectExchange(exchangeName, routeKey)
q, err := NewQueue(queueName, c.Conn, exchange)
if err != nil {
t.Log(err)
return
}
defer q.Close()
for i := 0; i < 10; i++ {
err = q.Publish(ctx, []byte(routeKey+" say hello "+strconv.Itoa(i)))
if err != nil {
t.Error(err)
return
}
}
})
}
func TestProducer_topic(t *testing.T) {
utils.SafeRunWithTimeout(time.Second*3, func(cancel context.CancelFunc) {
c, err := rabbitmq.NewConnection(url)
if err != nil {
t.Log(err)
return
}
defer c.Close()
ctx := context.Background()
exchangeName := "topic-exchange-demo"
queueName := "topic-queue-1"
routingKey := "key1.key2.*"
exchange := NewTopicExchange(exchangeName, routingKey)
q, err := NewQueue(queueName, c.Conn, exchange)
if err != nil {
t.Log(err)
return
}
defer q.Close()
queueName = "topic-queue-2"
routingKey = "*.key2"
exchange = NewTopicExchange(exchangeName, routingKey)
q, err = NewQueue(queueName, c.Conn, exchange)
if err != nil {
t.Log(err)
return
}
defer q.Close()
queueName = "topic-queue-3"
routingKey = "key1.#"
exchange = NewTopicExchange(exchangeName, routingKey)
q, err = NewQueue(queueName, c.Conn, exchange)
if err != nil {
t.Log(err)
return
}
defer q.Close()
queueName = "topic-queue-4"
routingKey = "#.key3"
exchange = NewTopicExchange(exchangeName, routingKey)
q, err = NewQueue(queueName, c.Conn, exchange)
if err != nil {
t.Log(err)
return
}
defer q.Close()
keys := []string{
"key1", // only match queue 3
"key1.key2", // only match queue 2 and 3
"key2.key3", // only match queue 4
"key1.key2.key3", // match queue 1,2,3,4
}
for _, key := range keys {
err = q.PublishTopic(ctx, key, []byte(key+" say hello "))
if err != nil {
t.Error(err)
return
}
}
})
}
func TestProducer_fanout(t *testing.T) {
utils.SafeRunWithTimeout(time.Second*3, func(cancel context.CancelFunc) {
c, err := rabbitmq.NewConnection(url)
if err != nil {
t.Log(err)
return
}
defer c.Close()
ctx := context.Background()
exchangeName := "fanout-exchange-demo"
queueName := "fanout-queue-1"
exchange := NewFanOutExchange(exchangeName)
q, err := NewQueue(queueName, c.Conn, exchange)
if err != nil {
t.Log(err)
return
}
defer q.Close()
queueName = "fanout-queue-2"
exchange = NewFanOutExchange(exchangeName)
q, err = NewQueue(queueName, c.Conn, exchange)
if err != nil {
t.Log(err)
return
}
defer q.Close()
queueName = "fanout-queue-3"
exchange = NewFanOutExchange(exchangeName)
q, err = NewQueue(queueName, c.Conn, exchange)
if err != nil {
t.Log(err)
return
}
defer q.Close()
// queues 1,2 and 3 can receive the same messages.
for i := 0; i < 10; i++ {
err = q.Publish(ctx, []byte(" say hello "+strconv.Itoa(i)))
if err != nil {
t.Error(err)
return
}
}
})
}
func TestProducer_headers(t *testing.T) {
utils.SafeRunWithTimeout(time.Second*3, func(cancel context.CancelFunc) {
c, err := rabbitmq.NewConnection(url)
if err != nil {
t.Log(err)
return
}
defer c.Close()
ctx := context.Background()
exchangeName := "headers-exchange-demo"
// the message is only received if there is an exact match for headers
queueName := "headers-queue-1"
kv1 := map[string]interface{}{"hello1": "world1", "foo1": "bar1"}
exchange := NewHeaderExchange(exchangeName, HeadersTypeAll, kv1)
q, err := NewQueue(queueName, c.Conn, exchange)
if err != nil {
t.Log(err)
return
}
defer q.Close()
headersKey1 := kv1 // exact match, consumer queue can receive messages
err = q.PublishHeaders(ctx, headersKey1, []byte("say hello 1"))
if err != nil {
t.Error(err)
return
}
headersKey1 = map[string]interface{}{"foo": "bar"} // there is a complete mismatch and the consumer queue cannot receive the message
err = q.PublishHeaders(ctx, headersKey1, []byte("say hello 2"))
if err != nil {
t.Error(err)
return
}
headersKey1 = map[string]interface{}{"foo1": "bar1"} // partial match, consumer queue cannot receive message
err = q.PublishHeaders(ctx, headersKey1, []byte("say hello 3"))
if err != nil {
t.Error(err)
return
}
// only partial matches of headers are needed to receive the message
queueName = "headers-queue-2"
kv2 := map[string]interface{}{"hello2": "world2", "foo2": "bar2"}
exchange = NewHeaderExchange(exchangeName, HeadersTypeAny, kv2)
q, err = NewQueue(queueName, c.Conn, exchange)
if err != nil {
t.Error(err)
return
}
defer q.Close()
headersKey2 := kv2 // exact match, consumer queue can receive messages
err = q.PublishHeaders(ctx, headersKey2, []byte("say hello 4"))
if err != nil {
t.Error(err)
return
}
headersKey2 = map[string]interface{}{"foo": "bar"} // there is a complete mismatch and the consumer queue cannot receive the message
err = q.PublishHeaders(ctx, headersKey2, []byte("say hello 5"))
if err != nil {
t.Error(err)
return
}
headersKey2 = map[string]interface{}{"foo2": "bar2"} // partial match, the consumer queue can receive the message
err = q.PublishHeaders(ctx, headersKey2, []byte("say hello 6"))
if err != nil {
t.Error(err)
return
}
})
}
func TestQueueErr(t *testing.T) {
q := &Queue{
queueName: "test",
exchange: &Exchange{
name: "test",
eType: "unknown",
routingKey: "test",
},
//queue: amqp.Queue{},
}
ctx := context.Background()
err := q.Publish(ctx, []byte("test"))
assert.Error(t, err)
err = q.PublishTopic(ctx, "", []byte("test"))
assert.Error(t, err)
err = q.PublishHeaders(ctx, nil, []byte("test"))
assert.Error(t, err)
}
func TestNewExchange(t *testing.T) {
NewDirectExchange("foo", "bar")
NewTopicExchange("foo", "bar")
NewFanOutExchange("foo")
NewHeaderExchange("foo", HeadersTypeAll, nil)
NewHeaderExchange("foo", "bar", nil)
}
func TestNewQueue(t *testing.T) {
defer func() { recover() }()
q, err := NewQueue("foo", &amqp.Connection{}, NewDirectExchange("foo", "bar"))
if err != nil {
t.Log(err)
return
}
q.Close()
}
func TestPublish(t *testing.T) {
q := Queue{
queueName: "foo",
conn: &amqp.Connection{},
ch: &amqp.Channel{},
mandatory: false,
immediate: false,
}
defer func() { recover() }()
q.exchange = NewTopicExchange("foo", "bar")
_ = q.Publish(context.Background(), []byte("test"))
q.exchange = NewDirectExchange("foo", "bar")
_ = q.Publish(context.Background(), []byte("test"))
}
func TestPublishTopic(t *testing.T) {
q := Queue{
queueName: "foo",
conn: &amqp.Connection{},
ch: &amqp.Channel{},
mandatory: false,
immediate: false,
}
defer func() { recover() }()
q.exchange = NewDirectExchange("foo", "bar")
_ = q.PublishTopic(context.Background(), "foo", []byte("bar"))
q.exchange = NewTopicExchange("foo", "bar")
_ = q.PublishTopic(context.Background(), "foo", []byte("bar"))
}
func TestPublishHeaders(t *testing.T) {
q := Queue{
queueName: "foo",
conn: &amqp.Connection{},
ch: &amqp.Channel{},
mandatory: false,
immediate: false,
}
defer func() { recover() }()
q.exchange = NewDirectExchange("foo", "bar")
_ = q.PublishHeaders(context.Background(), nil, []byte("bar"))
q.exchange = NewHeaderExchange("foo", "bar", nil)
_ = q.PublishHeaders(context.Background(), nil, []byte("bar"))
}

View File

@@ -0,0 +1,426 @@
package rabbitmq
import (
"context"
"strconv"
"testing"
"time"
"github.com/zhufuyi/sponge/pkg/utils"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/stretchr/testify/assert"
)
func TestProducerOptions(t *testing.T) {
opts := []ProducerOption{
WithProducerExchangeDeclareOptions(
WithExchangeDeclareAutoDelete(true),
WithExchangeDeclareInternal(true),
WithExchangeDeclareNoWait(true),
WithExchangeDeclareArgs(map[string]interface{}{"foo1": "bar1"}),
),
WithProducerQueueDeclareOptions(
WithQueueDeclareAutoDelete(true),
WithQueueDeclareExclusive(true),
WithQueueDeclareNoWait(true),
WithQueueDeclareArgs(map[string]interface{}{"foo": "bar"}),
),
WithProducerQueueBindOptions(
WithQueueBindNoWait(true),
WithQueueBindArgs(map[string]interface{}{"foo2": "bar2"}),
),
WithProducerPersistent(true),
WithProducerMandatory(true),
}
o := defaultProducerOptions()
o.apply(opts...)
assert.True(t, o.queueDeclare.autoDelete)
assert.True(t, o.queueDeclare.exclusive)
assert.True(t, o.queueDeclare.noWait)
assert.Equal(t, "bar", o.queueDeclare.args["foo"])
assert.True(t, o.exchangeDeclare.autoDelete)
assert.True(t, o.exchangeDeclare.internal)
assert.True(t, o.exchangeDeclare.noWait)
assert.Equal(t, "bar1", o.exchangeDeclare.args["foo1"])
assert.True(t, o.queueBind.noWait)
assert.Equal(t, "bar2", o.queueBind.args["foo2"])
assert.True(t, o.isPersistent)
assert.True(t, o.mandatory)
}
func TestProducer_direct(t *testing.T) {
utils.SafeRunWithTimeout(time.Second*3, func(cancel context.CancelFunc) {
defer cancel()
connection, err := NewConnection(url)
if err != nil {
t.Log(err)
return
}
defer connection.Close()
ctx := context.Background()
exchangeName := "direct-exchange-demo"
queueName := "direct-queue-demo"
routingKey := "info"
exchange := NewDirectExchange(exchangeName, routingKey)
p, err := NewProducer(exchange, queueName, connection)
if err != nil {
t.Log(err)
return
}
defer p.Close()
for i := 1; i <= 10; i++ {
err = p.PublishDirect(ctx, []byte(routingKey+" say hello "+strconv.Itoa(i)))
if err != nil {
t.Error(err)
return
}
}
})
}
func TestProducer_topic(t *testing.T) {
utils.SafeRunWithTimeout(time.Second*3, func(cancel context.CancelFunc) {
defer cancel()
connection, err := NewConnection(url)
if err != nil {
t.Log(err)
return
}
defer connection.Close()
ctx := context.Background()
exchangeName := "topic-exchange-demo"
queueName := "topic-queue-1"
routingKey := "*.orange.*"
exchange := NewTopicExchange(exchangeName, routingKey)
p, err := NewProducer(exchange, queueName, connection)
if err != nil {
t.Log(err)
return
}
defer p.Close()
key := "key1.orange.key3"
err = p.PublishTopic(ctx, key, []byte(key+" say hello"))
queueName = "topic-queue-2"
routingKey = "*.*.rabbit"
exchange = NewTopicExchange(exchangeName, routingKey)
p, err = NewProducer(exchange, queueName, connection)
if err != nil {
t.Log(err)
return
}
defer p.Close()
key = "key1.key2.rabbit"
err = p.PublishTopic(ctx, key, []byte(key+" say hello"))
queueName = "topic-queue-2"
routingKey = "lazy.#"
exchange = NewTopicExchange(exchangeName, routingKey)
p, err = NewProducer(exchange, queueName, connection)
if err != nil {
t.Log(err)
return
}
defer p.Close()
key = "lazy.key2.key3"
err = p.PublishTopic(ctx, key, []byte(key+" say hello"))
})
}
func TestProducer_fanout(t *testing.T) {
utils.SafeRunWithTimeout(time.Second*3, func(cancel context.CancelFunc) {
defer cancel()
connection, err := NewConnection(url)
if err != nil {
t.Log(err)
return
}
defer connection.Close()
ctx := context.Background()
exchangeName := "fanout-exchange-demo"
queueNames := []string{"fanout-queue-1", "fanout-queue-2", "fanout-queue-3"}
for _, queueName := range queueNames {
exchange := NewFanoutExchange(exchangeName)
p, err := NewProducer(exchange, queueName, connection)
if err != nil {
t.Log(err)
return
}
defer p.Close()
err = p.PublishFanout(ctx, []byte(queueName+" say hello"))
if err != nil {
t.Error(err)
return
}
}
})
}
func TestProducer_headers(t *testing.T) {
utils.SafeRunWithTimeout(time.Second*3, func(cancel context.CancelFunc) {
defer cancel()
connection, err := NewConnection(url)
if err != nil {
t.Log(err)
return
}
defer connection.Close()
ctx := context.Background()
exchangeName := "headers-exchange-demo"
// the message is only received if there is an exact match for headers
queueName := "headers-queue-1"
kv1 := map[string]interface{}{"hello1": "world1", "foo1": "bar1"}
exchange := NewHeadersExchange(exchangeName, HeadersTypeAll, kv1)
p, err := NewProducer(exchange, queueName, connection)
if err != nil {
t.Log(err)
return
}
defer p.Close()
headersKey1 := kv1 // exact match, consumer queue can receive messages
err = p.PublishHeaders(ctx, headersKey1, []byte("say hello 1"))
if err != nil {
t.Error(err)
return
}
headersKey1 = map[string]interface{}{"foo": "bar"} // there is a complete mismatch and the consumer queue cannot receive the message
err = p.PublishHeaders(ctx, headersKey1, []byte("say hello 2"))
if err != nil {
t.Error(err)
return
}
headersKey1 = map[string]interface{}{"foo1": "bar1"} // partial match, consumer queue cannot receive message
err = p.PublishHeaders(ctx, headersKey1, []byte("say hello 3"))
if err != nil {
t.Error(err)
return
}
// only partial matches of headers are needed to receive the message
queueName = "headers-queue-2"
kv2 := map[string]interface{}{"hello2": "world2", "foo2": "bar2"}
exchange = NewHeadersExchange(exchangeName, HeadersTypeAny, kv2)
p, err = NewProducer(exchange, queueName, connection)
if err != nil {
t.Error(err)
return
}
defer p.Close()
headersKey2 := kv2 // exact match, consumer queue can receive messages
err = p.PublishHeaders(ctx, headersKey2, []byte("say hello 4"))
if err != nil {
t.Error(err)
return
}
headersKey2 = map[string]interface{}{"foo": "bar"} // there is a complete mismatch and the consumer queue cannot receive the message
err = p.PublishHeaders(ctx, headersKey2, []byte("say hello 5"))
if err != nil {
t.Error(err)
return
}
headersKey2 = map[string]interface{}{"foo2": "bar2"} // partial match, the consumer queue can receive the message
err = p.PublishHeaders(ctx, headersKey2, []byte("say hello 6"))
if err != nil {
t.Error(err)
return
}
})
}
func TestProducer_delayedMessage(t *testing.T) {
utils.SafeRunWithTimeout(time.Second*6, func(cancel context.CancelFunc) {
defer cancel()
connection, err := NewConnection(url)
if err != nil {
t.Log(err)
return
}
defer connection.Close()
ctx := context.Background()
exchangeName := "delayed-message-exchange-demo"
queueName := "delayed-message-queue"
routingKey := "delayed-key"
e := NewDirectExchange("", routingKey)
exchange := NewDelayedMessageExchange(exchangeName, e)
p, err := NewProducer(exchange, queueName, connection)
if err != nil {
t.Log(err)
return
}
defer p.Close()
for i := 0; i < 3; i++ {
err = p.PublishDelayedMessage(ctx, time.Second*10, []byte("say hello "+time.Now().Format(datetimeLayout)))
if err != nil {
t.Error(err)
return
}
time.Sleep(time.Second)
}
})
}
func TestPublishErr(t *testing.T) {
p := &Producer{
QueueName: "test",
Exchange: &Exchange{
name: "test",
eType: "unknown",
routingKey: "test",
},
}
ctx := context.Background()
err := p.PublishDirect(ctx, []byte("data"))
assert.Error(t, err)
err = p.PublishFanout(ctx, []byte("data"))
assert.Error(t, err)
err = p.PublishTopic(ctx, "", []byte("data"))
assert.Error(t, err)
err = p.PublishHeaders(ctx, nil, []byte("data"))
assert.Error(t, err)
err = p.PublishDelayedMessage(ctx, time.Second, []byte("data"))
assert.Error(t, err)
}
func TestPublishDirect(t *testing.T) {
p := &Producer{
QueueName: "foo",
conn: &amqp.Connection{},
ch: &amqp.Channel{},
isPersistent: true,
mandatory: true,
}
defer func() { recover() }()
ctx := context.Background()
p.Exchange = NewDirectExchange("foo", "bar")
_ = p.PublishDirect(ctx, []byte("data"))
}
func TestPublishTopic(t *testing.T) {
p := &Producer{
QueueName: "foo",
conn: &amqp.Connection{},
ch: &amqp.Channel{},
isPersistent: true,
mandatory: true,
}
defer func() { recover() }()
ctx := context.Background()
p.Exchange = NewDirectExchange("foo", "bar")
_ = p.PublishTopic(ctx, "foo", []byte("data"))
p.Exchange = NewTopicExchange("foo", "bar")
_ = p.PublishTopic(ctx, "foo", []byte("data"))
}
func TestPublishFanout(t *testing.T) {
p := &Producer{
QueueName: "foo",
conn: &amqp.Connection{},
ch: &amqp.Channel{},
isPersistent: true,
mandatory: true,
}
defer func() { recover() }()
ctx := context.Background()
p.Exchange = NewFanoutExchange("foo")
_ = p.PublishFanout(ctx, []byte("data"))
}
func TestPublishHeaders(t *testing.T) {
p := &Producer{
QueueName: "foo",
conn: &amqp.Connection{},
ch: &amqp.Channel{},
isPersistent: true,
mandatory: true,
}
defer func() { recover() }()
ctx := context.Background()
p.Exchange = NewDirectExchange("foo", "bar")
_ = p.PublishHeaders(ctx, nil, []byte("data"))
p.Exchange = NewHeadersExchange("foo", "bar", nil)
_ = p.PublishHeaders(ctx, nil, []byte("data"))
}
func TestPublishDelayedMessage(t *testing.T) {
p := &Producer{
QueueName: "foo",
conn: &amqp.Connection{},
ch: &amqp.Channel{},
isPersistent: true,
mandatory: true,
}
defer func() { recover() }()
ctx := context.Background()
p.Exchange = NewDelayedMessageExchange("foo", NewTopicExchange("", "bar"))
_ = p.PublishDelayedMessage(ctx, time.Second, []byte("data"))
p.Exchange = NewDelayedMessageExchange("foo", NewHeadersExchange("", HeadersTypeAll, nil))
_ = p.PublishDelayedMessage(ctx, time.Second, []byte("data"))
p.Exchange = NewDelayedMessageExchange("foo", NewDirectExchange("", "bar"))
_ = p.PublishDelayedMessage(ctx, time.Second, []byte("data"))
}
func TestProducerErr(t *testing.T) {
exchangeName := "direct-exchange-demo"
queueName := "direct-queue-1"
routeKey := "direct-key-1"
exchange := NewDirectExchange(exchangeName, routeKey)
utils.SafeRunWithTimeout(time.Second, func(cancel context.CancelFunc) {
defer cancel()
_, err := NewProducer(exchange, queueName, &Connection{conn: &amqp.Connection{}})
if err != nil {
t.Log(err)
return
}
})
p := &Producer{conn: &amqp.Connection{}, ch: &amqp.Channel{}}
utils.SafeRun(context.Background(), func(ctx context.Context) {
_ = p.PublishDirect(context.Background(), []byte("hello world"))
})
utils.SafeRun(context.Background(), func(ctx context.Context) {
p.Close()
})
}
func Test_printFields(t *testing.T) {
exchange := NewDirectExchange("foo", "bar")
fields := logFields("queue", exchange)
t.Log(fields)
exchange = NewHeadersExchange("foo", HeadersTypeAny, map[string]interface{}{"hello": "world"})
fields = logFields("queue", exchange)
t.Log(fields)
e := NewDirectExchange("", "bar")
exchange = NewDelayedMessageExchange("foo", e)
fields = logFields("queue", exchange)
t.Log(fields)
e = NewHeadersExchange("", HeadersTypeAny, map[string]interface{}{"hello": "world"})
exchange = NewDelayedMessageExchange("foo", e)
fields = logFields("queue", exchange)
t.Log(fields)
}

83
pkg/rabbitmq/publisher.go Normal file
View File

@@ -0,0 +1,83 @@
package rabbitmq
import (
"context"
amqp "github.com/rabbitmq/amqp091-go"
"go.uber.org/zap"
)
// Publisher session
type Publisher struct {
*Producer
}
// NewPublisher create a publisher, channelName is exchange name
func NewPublisher(channelName string, connection *Connection, opts ...ProducerOption) (*Publisher, error) {
o := defaultProducerOptions()
o.apply(opts...)
exchange := NewFanoutExchange(channelName)
// crate a new channel
ch, err := connection.conn.Channel()
if err != nil {
return nil, err
}
// declare the exchange type
err = ch.ExchangeDeclare(
channelName,
exchangeTypeFanout,
o.isPersistent,
o.exchangeDeclare.autoDelete,
o.exchangeDeclare.internal,
o.exchangeDeclare.noWait,
o.exchangeDeclare.args,
)
if err != nil {
_ = ch.Close()
return nil, err
}
deliveryMode := amqp.Persistent
if !o.isPersistent {
deliveryMode = amqp.Transient
}
connection.zapLog.Info("[rabbit producer] initialized", zap.String("channel", channelName), zap.Bool("isPersistent", o.isPersistent))
p := &Producer{
Exchange: exchange,
conn: connection.conn,
ch: ch,
isPersistent: o.isPersistent,
deliveryMode: deliveryMode,
mandatory: o.mandatory,
zapLog: connection.zapLog,
}
return &Publisher{p}, nil
}
func (p *Publisher) Publish(ctx context.Context, body []byte) error {
return p.ch.PublishWithContext(
ctx,
p.Exchange.name,
p.Exchange.routingKey,
p.mandatory,
false,
amqp.Publishing{
DeliveryMode: p.deliveryMode,
ContentType: "text/plain",
Body: body,
},
)
}
// Close publisher
func (p *Publisher) Close() {
if p.ch != nil {
_ = p.ch.Close()
}
}

View File

@@ -0,0 +1,70 @@
package rabbitmq
import (
"context"
"fmt"
"testing"
"time"
"github.com/zhufuyi/sponge/pkg/utils"
amqp "github.com/rabbitmq/amqp091-go"
)
var testChannelName = "pub-sub"
func runPublisher(ctx context.Context, channelName string) error {
var publisherErr error
utils.SafeRunWithTimeout(time.Second*3, func(cancel context.CancelFunc) {
defer cancel()
connection, err := NewConnection(url)
if err != nil {
publisherErr = err
return
}
defer connection.Close()
p, err := NewPublisher(channelName, connection)
if err != nil {
publisherErr = err
return
}
defer p.Close()
data := []byte("hello world " + time.Now().Format(datetimeLayout))
err = p.Publish(ctx, data)
if err != nil {
publisherErr = err
return
}
fmt.Printf("[send]: %s\n", data)
})
return publisherErr
}
func TestPublisher(t *testing.T) {
err := runPublisher(context.Background(), testChannelName)
if err != nil {
t.Log(err)
return
}
}
func TestPublisherErr(t *testing.T) {
utils.SafeRunWithTimeout(time.Second, func(cancel context.CancelFunc) {
defer cancel()
_, err := NewPublisher(testChannelName, &Connection{conn: &amqp.Connection{}})
if err != nil {
t.Log(err)
return
}
})
p := &Publisher{&Producer{conn: &amqp.Connection{}, ch: &amqp.Channel{}}}
utils.SafeRun(context.Background(), func(ctx context.Context) {
_ = p.Publish(context.Background(), []byte("hello world"))
})
utils.SafeRun(context.Background(), func(ctx context.Context) {
p.Close()
})
}

View File

@@ -0,0 +1,33 @@
package rabbitmq
import (
"context"
)
// Subscriber session
type Subscriber struct {
*Consumer
}
// NewSubscriber create a subscriber, channelName is exchange name, identifier is queue name
func NewSubscriber(channelName string, identifier string, connection *Connection, opts ...ConsumerOption) (*Subscriber, error) {
exchange := NewFanoutExchange(channelName)
queueName := identifier
c, err := NewConsumer(exchange, queueName, connection, opts...)
if err != nil {
return nil, err
}
return &Subscriber{c}, nil
}
// Subscribe and handle message
func (s *Subscriber) Subscribe(ctx context.Context, handler Handler) {
s.Consume(ctx, handler)
}
// Close subscriber
func (s *Subscriber) Close() {
if s.ch != nil {
_ = s.ch.Close()
}
}

View File

@@ -0,0 +1,75 @@
package rabbitmq
import (
"context"
amqp "github.com/rabbitmq/amqp091-go"
"testing"
"time"
"github.com/zhufuyi/sponge/pkg/utils"
)
func TestSubscriber(t *testing.T) {
ctx, _ := context.WithTimeout(context.Background(), time.Second*3)
err := runPublisher(ctx, testChannelName)
if err != nil {
t.Log(err)
return
}
err = runSubscriber(ctx, testChannelName, "fanout-queue-1")
if err != nil {
t.Log(err)
return
}
err = runSubscriber(ctx, testChannelName, "fanout-queue-2")
if err != nil {
t.Log(err)
return
}
<-ctx.Done()
time.Sleep(time.Millisecond * 100)
}
func runSubscriber(ctx context.Context, channelName string, identifier string) error {
var subscriberErr error
utils.SafeRunWithTimeout(time.Second*3, func(cancel context.CancelFunc) {
defer cancel()
connection, err := NewConnection(url)
if err != nil {
subscriberErr = err
return
}
s, err := NewSubscriber(channelName, identifier, connection, WithConsumerAutoAck(false))
if err != nil {
subscriberErr = err
return
}
s.Subscribe(ctx, handler)
})
return subscriberErr
}
func TestSubscriberErr(t *testing.T) {
utils.SafeRunWithTimeout(time.Second, func(cancel context.CancelFunc) {
defer cancel()
_, err := NewSubscriber(testChannelName, "fanout-queue-1", &Connection{conn: &amqp.Connection{}})
if err != nil {
t.Log(err)
return
}
})
s := &Subscriber{&Consumer{connection: &Connection{conn: &amqp.Connection{}}, ch: &amqp.Channel{}}}
utils.SafeRun(context.Background(), func(ctx context.Context) {
s.Subscribe(context.Background(), handler)
})
utils.SafeRun(context.Background(), func(ctx context.Context) {
s.Close()
})
}