Files
sponge/pkg/rabbitmq/consumer.go
2024-06-10 11:53:52 +08:00

452 lines
11 KiB
Go

package rabbitmq
import (
"context"
"strconv"
"strings"
"sync/atomic"
"time"
amqp "github.com/rabbitmq/amqp091-go"
"go.uber.org/zap"
)
// ConsumerOption consumer option.
type ConsumerOption func(*consumerOptions)
type consumerOptions struct {
exchangeDeclare *exchangeDeclareOptions
queueDeclare *queueDeclareOptions
queueBind *queueBindOptions
qos *qosOptions
consume *consumeOptions
isPersistent bool // persistent or not
isAutoAck bool // auto-answer or not, if false, manual ACK required
}
func (o *consumerOptions) apply(opts ...ConsumerOption) {
for _, opt := range opts {
opt(o)
}
}
// default consumer settings
func defaultConsumerOptions() *consumerOptions {
return &consumerOptions{
exchangeDeclare: defaultExchangeDeclareOptions(),
queueDeclare: defaultQueueDeclareOptions(),
queueBind: defaultQueueBindOptions(),
qos: defaultQosOptions(),
consume: defaultConsumeOptions(),
isPersistent: true,
isAutoAck: true,
}
}
// WithConsumerExchangeDeclareOptions set exchange declare option.
func WithConsumerExchangeDeclareOptions(opts ...ExchangeDeclareOption) ConsumerOption {
return func(o *consumerOptions) {
o.exchangeDeclare.apply(opts...)
}
}
// WithConsumerQueueDeclareOptions set queue declare option.
func WithConsumerQueueDeclareOptions(opts ...QueueDeclareOption) ConsumerOption {
return func(o *consumerOptions) {
o.queueDeclare.apply(opts...)
}
}
// WithConsumerQueueBindOptions set queue bind option.
func WithConsumerQueueBindOptions(opts ...QueueBindOption) ConsumerOption {
return func(o *consumerOptions) {
o.queueBind.apply(opts...)
}
}
// WithConsumerQosOptions set consume qos option.
func WithConsumerQosOptions(opts ...QosOption) ConsumerOption {
return func(o *consumerOptions) {
o.qos.apply(opts...)
}
}
// WithConsumerConsumeOptions set consumer consume option.
func WithConsumerConsumeOptions(opts ...ConsumeOption) ConsumerOption {
return func(o *consumerOptions) {
o.consume.apply(opts...)
}
}
// WithConsumerAutoAck set consumer auto ack option, if false, manual ACK required.
func WithConsumerAutoAck(enable bool) ConsumerOption {
return func(o *consumerOptions) {
o.isAutoAck = enable
}
}
// WithConsumerPersistent set consumer persistent option.
func WithConsumerPersistent(enable bool) ConsumerOption {
return func(o *consumerOptions) {
o.isPersistent = enable
}
}
// -------------------------------------------------------------------------------------------
// ConsumeOption consume option.
type ConsumeOption func(*consumeOptions)
type consumeOptions struct {
consumer string // used to distinguish between multiple consumers
exclusive bool // only available to the program that created it
noLocal bool // if set to true, a message sent by a producer in the same Connection cannot be passed to a consumer in this Connection.
noWait bool // block processing
args amqp.Table // additional properties
}
func (o *consumeOptions) apply(opts ...ConsumeOption) {
for _, opt := range opts {
opt(o)
}
}
// default consume settings
func defaultConsumeOptions() *consumeOptions {
return &consumeOptions{
consumer: "",
exclusive: false,
noLocal: false,
noWait: false,
args: nil,
}
}
// WithConsumeConsumer set consume consumer option.
func WithConsumeConsumer(consumer string) ConsumeOption {
return func(o *consumeOptions) {
o.consumer = consumer
}
}
// WithConsumeExclusive set consume exclusive option.
func WithConsumeExclusive(enable bool) ConsumeOption {
return func(o *consumeOptions) {
o.exclusive = enable
}
}
// WithConsumeNoLocal set consume noLocal option.
func WithConsumeNoLocal(enable bool) ConsumeOption {
return func(o *consumeOptions) {
o.noLocal = enable
}
}
// WithConsumeNoWait set consume no wait option.
func WithConsumeNoWait(enable bool) ConsumeOption {
return func(o *consumeOptions) {
o.noWait = enable
}
}
// WithConsumeArgs set consume args option.
func WithConsumeArgs(args map[string]interface{}) ConsumeOption {
return func(o *consumeOptions) {
o.args = args
}
}
// -------------------------------------------------------------------------------------------
// QosOption qos option.
type QosOption func(*qosOptions)
type qosOptions struct {
enable bool
prefetchCount int
prefetchSize int
global bool
}
func (o *qosOptions) apply(opts ...QosOption) {
for _, opt := range opts {
opt(o)
}
}
// default qos settings
func defaultQosOptions() *qosOptions {
return &qosOptions{
enable: false,
prefetchCount: 0,
prefetchSize: 0,
global: false,
}
}
// WithQosEnable set qos enable option.
func WithQosEnable() QosOption {
return func(o *qosOptions) {
o.enable = true
}
}
// WithQosPrefetchCount set qos prefetch count option.
func WithQosPrefetchCount(count int) QosOption {
return func(o *qosOptions) {
o.prefetchCount = count
}
}
// WithQosPrefetchSize set qos prefetch size option.
func WithQosPrefetchSize(size int) QosOption {
return func(o *qosOptions) {
o.prefetchSize = size
}
}
// WithQosPrefetchGlobal set qos global option.
func WithQosPrefetchGlobal(enable bool) QosOption {
return func(o *qosOptions) {
o.global = enable
}
}
// -------------------------------------------------------------------------------------------
// Consumer session
type Consumer struct {
Exchange *Exchange
QueueName string
connection *Connection
ch *amqp.Channel
exchangeDeclareOption *exchangeDeclareOptions
queueDeclareOption *queueDeclareOptions
queueBindOption *queueBindOptions
qosOption *qosOptions
consumeOption *consumeOptions
isPersistent bool // persistent or not
isAutoAck bool // auto ack or not
zapLog *zap.Logger
count int64 // consumer success message number
}
// Handler message
type Handler func(ctx context.Context, data []byte, tagID string) error
//type Handler func(ctx context.Context, d *amqp.Delivery, isAutoAck bool) error
// NewConsumer create a consumer
func NewConsumer(exchange *Exchange, queueName string, connection *Connection, opts ...ConsumerOption) (*Consumer, error) {
o := defaultConsumerOptions()
o.apply(opts...)
c := &Consumer{
Exchange: exchange,
QueueName: queueName,
connection: connection,
exchangeDeclareOption: o.exchangeDeclare,
queueDeclareOption: o.queueDeclare,
queueBindOption: o.queueBind,
qosOption: o.qos,
consumeOption: o.consume,
isPersistent: o.isPersistent,
isAutoAck: o.isAutoAck,
zapLog: connection.zapLog,
}
return c, nil
}
// initialize a consumer session
func (c *Consumer) initialize() error {
c.connection.mutex.Lock()
// crate a new channel
ch, err := c.connection.conn.Channel()
if err != nil {
c.connection.mutex.Unlock()
return err
}
c.ch = ch
c.connection.mutex.Unlock()
if c.Exchange.eType == exchangeTypeDelayedMessage {
if c.exchangeDeclareOption.args == nil {
c.exchangeDeclareOption.args = amqp.Table{
"x-delayed-type": c.Exchange.delayedMessageType,
}
} else {
c.exchangeDeclareOption.args["x-delayed-type"] = c.Exchange.delayedMessageType
}
}
// declare the exchange type
err = ch.ExchangeDeclare(
c.Exchange.name,
c.Exchange.eType,
c.isPersistent,
c.exchangeDeclareOption.autoDelete,
c.exchangeDeclareOption.internal,
c.exchangeDeclareOption.noWait,
c.exchangeDeclareOption.args,
)
if err != nil {
_ = ch.Close()
return err
}
// declare a queue and create it automatically if it doesn't exist, or skip creation if it does.
queue, err := ch.QueueDeclare(
c.QueueName,
c.isPersistent,
c.queueDeclareOption.autoDelete,
c.queueDeclareOption.exclusive,
c.queueDeclareOption.noWait,
c.queueDeclareOption.args,
)
if err != nil {
_ = ch.Close()
return err
}
args := c.queueBindOption.args
if c.Exchange.eType == exchangeTypeHeaders {
args = c.Exchange.headersKeys
}
// binding queue and exchange
err = ch.QueueBind(
queue.Name,
c.Exchange.routingKey,
c.Exchange.name,
c.queueBindOption.noWait,
args,
)
if err != nil {
_ = ch.Close()
return err
}
// setting the prefetch value, set channel.Qos on the consumer side to limit the number of messages consumed at a time,
// balancing message throughput and fairness, and prevent consumers from being hit by sudden bursts of information traffic.
if c.qosOption.enable {
err = ch.Qos(c.qosOption.prefetchCount, c.qosOption.prefetchSize, c.qosOption.global)
if err != nil {
_ = ch.Close()
return err
}
}
fields := logFields(c.QueueName, c.Exchange)
fields = append(fields, zap.Bool("autoAck", c.isAutoAck))
c.zapLog.Info("[rabbitmq consumer] initialized", fields...)
return nil
}
func (c *Consumer) consumeWithContext(ctx context.Context) (<-chan amqp.Delivery, error) {
return c.ch.ConsumeWithContext(
ctx,
c.QueueName,
c.consumeOption.consumer,
c.isAutoAck,
c.consumeOption.exclusive,
c.consumeOption.noLocal,
c.consumeOption.noWait,
c.consumeOption.args,
)
}
// Consume messages for loop in goroutine
func (c *Consumer) Consume(ctx context.Context, handler Handler) {
go func() {
ticker := time.NewTicker(time.Second * 2)
isFirst := true
for {
if isFirst {
isFirst = false
ticker.Reset(time.Millisecond * 10)
} else {
ticker.Reset(time.Second * 2)
}
// check connection for loop
select {
case <-ticker.C:
if !c.connection.CheckConnected() {
continue
}
case <-c.connection.exit:
c.Close()
return
}
ticker.Stop()
err := c.initialize()
if err != nil {
c.zapLog.Warn("[rabbitmq consumer] initialize consumer error", zap.String("err", err.Error()), zap.String("queue", c.QueueName))
continue
}
delivery, err := c.consumeWithContext(ctx)
if err != nil {
c.zapLog.Warn("[rabbitmq consumer] execution of consumption error", zap.String("err", err.Error()), zap.String("queue", c.QueueName))
continue
}
c.zapLog.Info("[rabbitmq consumer] queue is ready and waiting for messages, queue=" + c.QueueName)
isContinueConsume := false
for {
select {
case <-c.connection.exit:
c.Close()
return
case d, ok := <-delivery:
if !ok {
c.zapLog.Warn("[rabbitmq consumer] exit consume message, queue=" + c.QueueName)
isContinueConsume = true
break
}
tagID := strings.Join([]string{d.Exchange, c.QueueName, strconv.FormatUint(d.DeliveryTag, 10)}, "/")
err = handler(ctx, d.Body, tagID)
if err != nil {
c.zapLog.Warn("[rabbitmq consumer] handle message error", zap.String("err", err.Error()), zap.String("tagID", tagID))
continue
}
if !c.isAutoAck {
if err = d.Ack(false); err != nil {
c.zapLog.Warn("[rabbitmq consumer] manual ack error", zap.String("err", err.Error()), zap.String("tagID", tagID))
continue
}
c.zapLog.Info("[rabbitmq consumer] manual ack done", zap.String("tagID", tagID))
}
atomic.AddInt64(&c.count, 1)
}
if isContinueConsume {
break
}
}
}
}()
}
// Close consumer
func (c *Consumer) Close() {
if c.ch != nil {
_ = c.ch.Close()
}
}
// Count consumer success message number
func (c *Consumer) Count() int64 {
return atomic.LoadInt64(&c.count)
}