Files
sponge/pkg/rabbitmq/consumer_test.go
2024-12-19 13:31:29 +08:00

385 lines
9.1 KiB
Go

package rabbitmq
import (
"context"
"fmt"
"testing"
"time"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
"github.com/go-dev-frame/sponge/pkg/utils"
)
func TestConsumerOptions(t *testing.T) {
opts := []ConsumerOption{
WithConsumerExchangeDeclareOptions(
WithExchangeDeclareAutoDelete(true),
WithExchangeDeclareInternal(true),
WithExchangeDeclareNoWait(true),
WithExchangeDeclareArgs(map[string]interface{}{"foo1": "bar1"}),
),
WithConsumerQueueDeclareOptions(
WithQueueDeclareAutoDelete(true),
WithQueueDeclareExclusive(true),
WithQueueDeclareNoWait(true),
WithQueueDeclareArgs(map[string]interface{}{"foo": "bar"}),
),
WithConsumerQueueBindOptions(
WithQueueBindNoWait(true),
WithQueueBindArgs(map[string]interface{}{"foo2": "bar2"}),
),
WithConsumerConsumeOptions(
WithConsumeConsumer("test"),
WithConsumeExclusive(true),
WithConsumeNoLocal(true),
WithConsumeNoWait(true),
WithConsumeArgs(map[string]interface{}{"foo": "bar"}),
),
WithConsumerQosOptions(
WithQosEnable(),
WithQosPrefetchCount(1),
WithQosPrefetchSize(4096),
WithQosPrefetchGlobal(true),
),
WithConsumerAutoAck(true),
WithConsumerPersistent(true),
}
o := defaultConsumerOptions()
o.apply(opts...)
assert.True(t, o.queueDeclare.autoDelete)
assert.True(t, o.queueDeclare.exclusive)
assert.True(t, o.queueDeclare.noWait)
assert.Equal(t, "bar", o.queueDeclare.args["foo"])
assert.True(t, o.exchangeDeclare.autoDelete)
assert.True(t, o.exchangeDeclare.internal)
assert.True(t, o.exchangeDeclare.noWait)
assert.Equal(t, "bar1", o.exchangeDeclare.args["foo1"])
assert.True(t, o.queueBind.noWait)
assert.Equal(t, "bar2", o.queueBind.args["foo2"])
assert.True(t, o.isPersistent)
assert.True(t, o.isAutoAck)
}
var handler = func(ctx context.Context, data []byte, tagID string) error {
fmt.Printf("[received]: tagID=%s, data=%s\n", tagID, data)
return nil
}
func consume(ctx context.Context, queueName string, exchange *Exchange) error {
var consumeErr error
utils.SafeRunWithTimeout(time.Second*3, func(cancel context.CancelFunc) {
defer cancel()
connection, err := NewConnection(url)
if err != nil {
consumeErr = err
return
}
c, err := NewConsumer(exchange, queueName, connection, WithConsumerAutoAck(false))
if err != nil {
consumeErr = err
return
}
c.Consume(ctx, handler)
})
return consumeErr
}
func TestConsumer_direct(t *testing.T) {
ctx, _ := context.WithTimeout(context.Background(), time.Second*3)
exchangeName := "direct-exchange-demo"
queueName := "direct-queue-1"
routeKey := "direct-key-1"
exchange := NewDirectExchange(exchangeName, routeKey)
err := producerDirect(ctx, queueName, exchange)
if err != nil {
t.Log(err)
return
}
err = consume(ctx, queueName, exchange)
if err != nil {
t.Log(err)
return
}
<-ctx.Done()
time.Sleep(time.Millisecond * 100)
}
func TestConsumer_topic(t *testing.T) {
ctx, _ := context.WithTimeout(context.Background(), time.Second*3)
exchangeName := "topic-exchange-demo"
queueName := "topic-queue-1"
routingKey := "key1.key2.*"
exchange := NewTopicExchange(exchangeName, routingKey)
err := producerTopic(ctx, queueName, exchange)
if err != nil {
t.Log(err)
return
}
err = consume(ctx, queueName, exchange)
if err != nil {
t.Log(err)
return
}
<-ctx.Done()
time.Sleep(time.Millisecond * 100)
}
func TestConsumer_fanout(t *testing.T) {
ctx, _ := context.WithTimeout(context.Background(), time.Second*3)
exchangeName := "fanout-exchange-demo"
queueName := "fanout-queue-1"
exchange := NewFanoutExchange(exchangeName)
err := producerFanout(ctx, queueName, exchange)
if err != nil {
t.Log(err)
return
}
err = consume(ctx, queueName, exchange)
if err != nil {
t.Log(err)
return
}
<-ctx.Done()
time.Sleep(time.Millisecond * 100)
}
func TestConsumer_headers(t *testing.T) {
ctx, _ := context.WithTimeout(context.Background(), time.Second*3)
exchangeName := "headers-exchange-demo"
queueName := "headers-queue-1"
kv1 := map[string]interface{}{"hello1": "world1", "foo1": "bar1"}
exchange := NewHeadersExchange(exchangeName, HeadersTypeAll, kv1) // all
err := producerHeaders(ctx, queueName, exchange)
if err != nil {
t.Log(err)
return
}
err = consume(ctx, queueName, exchange)
if err != nil {
t.Log(err)
return
}
<-ctx.Done()
time.Sleep(time.Millisecond * 100)
}
func TestConsumer_delayedMessage(t *testing.T) {
ctx, _ := context.WithTimeout(context.Background(), time.Second*7)
exchangeName := "delayed-message-exchange-demo"
queueName := "delayed-message-queue"
routingKey := "delayed-key"
exchange := NewDelayedMessageExchange(exchangeName, NewDirectExchange("", routingKey))
err := producerDelayedMessage(ctx, queueName, exchange)
if err != nil {
t.Log(err)
return
}
err = consume(ctx, queueName, exchange)
if err != nil {
t.Log(err)
return
}
<-ctx.Done()
time.Sleep(time.Millisecond * 100)
}
func producerDirect(ctx context.Context, queueName string, exchange *Exchange) error {
var producerErr error
utils.SafeRunWithTimeout(time.Second*3, func(cancel context.CancelFunc) {
defer cancel()
connection, err := NewConnection(url)
if err != nil {
producerErr = err
return
}
defer connection.Close()
p, err := NewProducer(exchange, queueName, connection)
if err != nil {
producerErr = err
return
}
defer p.Close()
_ = p.PublishDirect(ctx, []byte("say hello 1"))
_ = p.PublishDirect(ctx, []byte("say hello 2"))
producerErr = p.PublishDirect(ctx, []byte("say hello 3"))
})
return producerErr
}
func producerTopic(ctx context.Context, queueName string, exchange *Exchange) error {
var producerErr error
utils.SafeRunWithTimeout(time.Second*3, func(cancel context.CancelFunc) {
defer cancel()
connection, err := NewConnection(url)
if err != nil {
producerErr = err
return
}
defer connection.Close()
p, err := NewProducer(exchange, queueName, connection)
if err != nil {
producerErr = err
return
}
defer p.Close()
key := "key1.key2.key3"
producerErr = p.PublishTopic(ctx, key, []byte(key+" say hello"))
})
return producerErr
}
func producerFanout(ctx context.Context, queueName string, exchange *Exchange) error {
var producerErr error
utils.SafeRunWithTimeout(time.Second*3, func(cancel context.CancelFunc) {
defer cancel()
connection, err := NewConnection(url)
if err != nil {
producerErr = err
return
}
defer connection.Close()
p, err := NewProducer(exchange, queueName, connection)
if err != nil {
producerErr = err
return
}
defer p.Close()
producerErr = p.PublishFanout(ctx, []byte(" say hello"))
})
return producerErr
}
func producerHeaders(ctx context.Context, queueName string, exchange *Exchange) error {
var producerErr error
utils.SafeRunWithTimeout(time.Second*3, func(cancel context.CancelFunc) {
defer cancel()
connection, err := NewConnection(url)
if err != nil {
producerErr = err
return
}
defer connection.Close()
p, err := NewProducer(exchange, queueName, connection)
if err != nil {
producerErr = err
return
}
defer p.Close()
headersKey1 := exchange.headersKeys
err = p.PublishHeaders(ctx, headersKey1, []byte("say hello 1"))
if err != nil {
producerErr = err
return
}
headersKey1 = map[string]interface{}{"foo": "bar"}
producerErr = p.PublishHeaders(ctx, headersKey1, []byte("say hello 2"))
})
return producerErr
}
func producerDelayedMessage(ctx context.Context, queueName string, exchange *Exchange) error {
var producerErr error
utils.SafeRunWithTimeout(time.Second*6, func(cancel context.CancelFunc) {
defer cancel()
connection, err := NewConnection(url)
if err != nil {
producerErr = err
return
}
defer connection.Close()
p, err := NewProducer(exchange, queueName, connection)
if err != nil {
producerErr = err
return
}
defer p.Close()
producerErr = p.PublishDelayedMessage(ctx, time.Second*5, []byte("say hello "+time.Now().Format(datetimeLayout)))
time.Sleep(time.Second)
producerErr = p.PublishDelayedMessage(ctx, time.Second*5, []byte("say hello "+time.Now().Format(datetimeLayout)))
})
return producerErr
}
func TestConsumerErr(t *testing.T) {
connection := &Connection{
exit: make(chan struct{}),
zapLog: zap.NewNop(),
conn: &amqp.Connection{},
isConnected: true,
}
exchange := NewDirectExchange("foo", "bar")
c, err := NewConsumer(exchange, "test", connection, WithConsumerQosOptions(
WithQosEnable(),
WithQosPrefetchCount(1)),
)
if err != nil {
t.Log(err)
return
}
t.Log(c.Count())
c.ch = &amqp.Channel{}
utils.SafeRunWithTimeout(time.Second, func(cancel context.CancelFunc) {
defer cancel()
err := c.initialize()
if err != nil {
t.Log(err)
return
}
})
utils.SafeRunWithTimeout(time.Second, func(cancel context.CancelFunc) {
defer cancel()
_, err := c.consumeWithContext(context.Background())
if err != nil {
t.Log(err)
return
}
})
utils.SafeRunWithTimeout(time.Second*3, func(cancel context.CancelFunc) {
defer cancel()
c.Consume(context.Background(), handler)
})
utils.SafeRun(context.Background(), func(ctx context.Context) {
c.Close()
})
time.Sleep(time.Millisecond * 2500)
close(c.connection.exit)
}