Files
sponge/pkg/rabbitmq/common.go
2024-06-10 11:53:52 +08:00

374 lines
10 KiB
Go

package rabbitmq
import amqp "github.com/rabbitmq/amqp091-go"
// ErrClosed is the amqp package's ErrClosed value, exposed under this
// package's name.
var ErrClosed = amqp.ErrClosed
// Exchange type names and header match modes used by the Exchange constructors.
const (
exchangeTypeDirect = "direct" // eType set by NewDirectExchange
exchangeTypeTopic = "topic" // eType set by NewTopicExchange
exchangeTypeFanout = "fanout" // eType set by NewFanoutExchange
exchangeTypeHeaders = "headers" // eType set by NewHeadersExchange
exchangeTypeDelayedMessage = "x-delayed-message" // type name of delayed message exchanges
// HeadersTypeAll is the "all" match mode. NewHeadersExchange stores it under
// the "x-match" key and also falls back to it for unrecognized modes.
HeadersTypeAll HeadersType = "all"
// HeadersTypeAny is the "any" match mode. NewHeadersExchange stores it under
// the "x-match" key.
HeadersTypeAny HeadersType = "any"
)
// HeadersType is the header match mode accepted by NewHeadersExchange.
// It is an alias of string; the modes recognized by NewHeadersExchange are
// HeadersTypeAll and HeadersTypeAny.
type HeadersType = string
// Exchange describes a RabbitMQ exchange: its name, type, routing key and the
// extra settings carried by headers and x-delayed-message exchanges.
// Values are created with the New*Exchange constructors.
type Exchange struct {
name string // exchange name
eType string // exchange type: direct, topic, fanout, headers, x-delayed-message
routingKey string // routing key; left empty by the fanout and headers constructors
headersKeys map[string]interface{} // header match table (including the "x-match" entry); required if eType=headers
delayedMessageType string // wrapped exchange type; required if eType=x-delayed-message, support direct, topic, fanout, headers
}
// Name returns the exchange name.
func (e *Exchange) Name() string {
return e.name
}
// Type returns the exchange type string, e.g. "direct", "topic", "fanout",
// "headers" or "x-delayed-message".
func (e *Exchange) Type() string {
return e.eType
}
// RoutingKey returns the routing key of the exchange; it is empty for
// exchanges built by NewFanoutExchange and NewHeadersExchange.
func (e *Exchange) RoutingKey() string {
return e.routingKey
}
// HeadersKeys returns the header match table held by the exchange.
// The map itself is returned, not a copy, so changes made through the
// returned value are visible to the exchange.
func (e *Exchange) HeadersKeys() map[string]interface{} {
return e.headersKeys
}
// DelayedMessageType returns the exchange type wrapped by a delayed message
// exchange; it is empty unless the exchange was built by
// NewDelayedMessageExchange.
func (e *Exchange) DelayedMessageType() string {
return e.delayedMessageType
}
// NewDirectExchange builds an Exchange of type "direct" with the given
// exchange name and routing key.
func NewDirectExchange(exchangeName string, routingKey string) *Exchange {
	ex := new(Exchange)
	ex.name = exchangeName
	ex.eType = exchangeTypeDirect
	ex.routingKey = routingKey
	return ex
}
// NewTopicExchange builds an Exchange of type "topic" with the given
// exchange name and routing key.
func NewTopicExchange(exchangeName string, routingKey string) *Exchange {
	ex := new(Exchange)
	ex.name = exchangeName
	ex.eType = exchangeTypeTopic
	ex.routingKey = routingKey
	return ex
}
// NewFanoutExchange builds an Exchange of type "fanout" with the given
// exchange name. The routing key is always empty.
func NewFanoutExchange(exchangeName string) *Exchange {
	ex := new(Exchange)
	ex.name = exchangeName
	ex.eType = exchangeTypeFanout
	ex.routingKey = ""
	return ex
}
// NewHeadersExchange creates a headers exchange description.
//
// headersType selects the match mode and is stored in the header table under
// the "x-match" key: HeadersTypeAll ("all") or HeadersTypeAny ("any"). Any
// other value falls back to HeadersTypeAll.
//
// keys is the header table to match on; nil is allowed. Its entries are
// copied into a new map, so the caller's map is never modified and later
// changes to it do not affect the returned Exchange. An "x-match" entry
// supplied in keys is overwritten by the selected match mode.
func NewHeadersExchange(exchangeName string, headersType HeadersType, keys map[string]interface{}) *Exchange {
	// Copy instead of writing "x-match" into the caller's map: the caller may
	// share or reuse that map, and must not see it change behind its back.
	headers := make(map[string]interface{}, len(keys)+1)
	for k, v := range keys {
		headers[k] = v
	}

	switch headersType {
	case HeadersTypeAll, HeadersTypeAny:
		headers["x-match"] = headersType
	default:
		headers["x-match"] = HeadersTypeAll
	}

	return &Exchange{
		name:        exchangeName,
		eType:       exchangeTypeHeaders,
		routingKey:  "",
		headersKeys: headers,
	}
}
// NewDelayedMessageExchange creates an "x-delayed-message" exchange named
// exchangeName that wraps the settings of e: the routing key and header table
// of e are reused, and the type of e becomes the delayed message type.
//
// e must not be nil, because its fields are read unconditionally. The header
// table is shared with e, not copied.
func NewDelayedMessageExchange(exchangeName string, e *Exchange) *Exchange {
	return &Exchange{
		name:               exchangeName,
		eType:              exchangeTypeDelayedMessage, // shared constant instead of a repeated literal
		routingKey:         e.routingKey,
		delayedMessageType: e.eType,
		headersKeys:        e.headersKeys,
	}
}
// -------------------------------------------------------------------------------------------
// QueueDeclareOption is a function that modifies the queue declare settings
// held in queueDeclareOptions. Values are produced by the WithQueueDeclare*
// constructors.
type QueueDeclareOption func(*queueDeclareOptions)
// queueDeclareOptions collects the settings changed by QueueDeclareOption
// functions.
type queueDeclareOptions struct {
autoDelete bool // delete automatically
exclusive bool // exclusive (only available to the program that created it)
noWait bool // no-wait flag; NOTE(review): presumably true means the declare call does not block on the server's reply — confirm against the AMQP client
args amqp.Table // additional properties
}
// apply runs every option in opts against o, in the order given.
func (o *queueDeclareOptions) apply(opts ...QueueDeclareOption) {
	for i := 0; i < len(opts); i++ {
		opts[i](o)
	}
}
// defaultQueueDeclareOptions returns queue declare settings with every flag
// false and no additional arguments.
func defaultQueueDeclareOptions() *queueDeclareOptions {
	// The defaults coincide with the zero value: all flags false, args nil.
	return new(queueDeclareOptions)
}
// WithQueueDeclareAutoDelete returns a QueueDeclareOption that sets the
// autoDelete setting of a queue declaration to enable.
func WithQueueDeclareAutoDelete(enable bool) QueueDeclareOption {
	setAutoDelete := func(cfg *queueDeclareOptions) {
		cfg.autoDelete = enable
	}
	return setAutoDelete
}
// WithQueueDeclareExclusive returns a QueueDeclareOption that sets the
// exclusive setting of a queue declaration to enable.
func WithQueueDeclareExclusive(enable bool) QueueDeclareOption {
	setExclusive := func(cfg *queueDeclareOptions) {
		cfg.exclusive = enable
	}
	return setExclusive
}
// WithQueueDeclareNoWait returns a QueueDeclareOption that sets the noWait
// setting of a queue declaration to enable.
func WithQueueDeclareNoWait(enable bool) QueueDeclareOption {
	setNoWait := func(cfg *queueDeclareOptions) {
		cfg.noWait = enable
	}
	return setNoWait
}
// WithQueueDeclareArgs returns a QueueDeclareOption that replaces the
// additional arguments of a queue declaration with args. The map is stored
// as given, not copied.
func WithQueueDeclareArgs(args map[string]interface{}) QueueDeclareOption {
	setArgs := func(cfg *queueDeclareOptions) {
		cfg.args = args
	}
	return setArgs
}
// -------------------------------------------------------------------------------------------
// ExchangeDeclareOption is a function that modifies the exchange declare
// settings held in exchangeDeclareOptions. Values are produced by the
// WithExchangeDeclare* constructors.
type ExchangeDeclareOption func(*exchangeDeclareOptions)
// exchangeDeclareOptions collects the settings changed by
// ExchangeDeclareOption functions.
type exchangeDeclareOptions struct {
autoDelete bool // delete automatically
internal bool // internal flag: false means the exchange is public
noWait bool // no-wait flag; NOTE(review): presumably true means the declare call does not block on the server's reply — confirm against the AMQP client
args amqp.Table // additional properties
}
// apply runs every option in opts against o, in the order given.
func (o *exchangeDeclareOptions) apply(opts ...ExchangeDeclareOption) {
	for i := 0; i < len(opts); i++ {
		opts[i](o)
	}
}
// defaultExchangeDeclareOptions returns exchange declare settings with every
// flag false and no additional arguments.
func defaultExchangeDeclareOptions() *exchangeDeclareOptions {
	// The defaults coincide with the zero value: all flags false, args nil.
	return new(exchangeDeclareOptions)
}
// WithExchangeDeclareAutoDelete returns an ExchangeDeclareOption that sets
// the autoDelete setting of an exchange declaration to enable.
func WithExchangeDeclareAutoDelete(enable bool) ExchangeDeclareOption {
	setAutoDelete := func(cfg *exchangeDeclareOptions) {
		cfg.autoDelete = enable
	}
	return setAutoDelete
}
// WithExchangeDeclareInternal returns an ExchangeDeclareOption that sets the
// internal setting of an exchange declaration to enable.
func WithExchangeDeclareInternal(enable bool) ExchangeDeclareOption {
	setInternal := func(cfg *exchangeDeclareOptions) {
		cfg.internal = enable
	}
	return setInternal
}
// WithExchangeDeclareNoWait returns an ExchangeDeclareOption that sets the
// noWait setting of an exchange declaration to enable.
func WithExchangeDeclareNoWait(enable bool) ExchangeDeclareOption {
	setNoWait := func(cfg *exchangeDeclareOptions) {
		cfg.noWait = enable
	}
	return setNoWait
}
// WithExchangeDeclareArgs returns an ExchangeDeclareOption that replaces the
// additional arguments of an exchange declaration with args. The map is
// stored as given, not copied.
func WithExchangeDeclareArgs(args map[string]interface{}) ExchangeDeclareOption {
	setArgs := func(cfg *exchangeDeclareOptions) {
		cfg.args = args
	}
	return setArgs
}
// -------------------------------------------------------------------------------------------
// QueueBindOption is a function that modifies the queue bind settings held in
// queueBindOptions. Values are produced by the WithQueueBind* constructors.
type QueueBindOption func(*queueBindOptions)
// queueBindOptions collects the settings changed by QueueBindOption functions.
type queueBindOptions struct {
noWait bool // no-wait flag; NOTE(review): presumably true means the bind call does not block on the server's reply — confirm against the AMQP client
args amqp.Table // additional bind arguments; this parameter is invalid (not used) if the exchange type is headers.
}
// apply runs every option in opts against o, in the order given.
func (o *queueBindOptions) apply(opts ...QueueBindOption) {
	for i := 0; i < len(opts); i++ {
		opts[i](o)
	}
}
// defaultQueueBindOptions returns queue bind settings with noWait false and
// no additional arguments.
func defaultQueueBindOptions() *queueBindOptions {
	// The defaults coincide with the zero value: noWait false, args nil.
	return new(queueBindOptions)
}
// WithQueueBindNoWait returns a QueueBindOption that sets the noWait setting
// of a queue binding to enable.
func WithQueueBindNoWait(enable bool) QueueBindOption {
	setNoWait := func(cfg *queueBindOptions) {
		cfg.noWait = enable
	}
	return setNoWait
}
// WithQueueBindArgs returns a QueueBindOption that replaces the additional
// arguments of a queue binding with args. The map is stored as given, not
// copied.
func WithQueueBindArgs(args map[string]interface{}) QueueBindOption {
	setArgs := func(cfg *queueBindOptions) {
		cfg.args = args
	}
	return setArgs
}
// -------------------------------------------------------------------------------------------
// DelayedMessagePublishOption is a function that modifies the delayed message
// publish settings held in delayedMessagePublishOptions. Values are produced
// by the WithDelayedMessagePublish* constructors.
type DelayedMessagePublishOption func(*delayedMessagePublishOptions)
// delayedMessagePublishOptions collects the settings changed by
// DelayedMessagePublishOption functions.
type delayedMessagePublishOptions struct {
topicKey string // topic key; required when the delayed message type is topic
headersKeys map[string]interface{} // header table; required when the delayed message type is headers
}
// apply runs every option in opts against o, in the order given.
func (o *delayedMessagePublishOptions) apply(opts ...DelayedMessagePublishOption) {
	for i := 0; i < len(opts); i++ {
		opts[i](o)
	}
}
// defaultDelayedMessagePublishOptions returns delayed message publish
// settings with an empty topic key and no header table.
func defaultDelayedMessagePublishOptions() *delayedMessagePublishOptions {
	return new(delayedMessagePublishOptions)
}
// WithDelayedMessagePublishTopicKey returns a DelayedMessagePublishOption
// that sets the topic key used when publishing a delayed message. An empty
// topicKey leaves the current value untouched.
func WithDelayedMessagePublishTopicKey(topicKey string) DelayedMessagePublishOption {
	return func(cfg *delayedMessagePublishOptions) {
		if topicKey != "" {
			cfg.topicKey = topicKey
		}
	}
}
// WithDelayedMessagePublishHeadersKeys returns a DelayedMessagePublishOption
// that sets the header table used when publishing a delayed message. A nil
// headersKeys leaves the current value untouched; a non-nil map is stored as
// given, not copied.
func WithDelayedMessagePublishHeadersKeys(headersKeys map[string]interface{}) DelayedMessagePublishOption {
	return func(cfg *delayedMessagePublishOptions) {
		if headersKeys != nil {
			cfg.headersKeys = headersKeys
		}
	}
}
// -------------------------------------------------------------------------------------------
// DeadLetterOption is a function that modifies the dead letter settings held
// in deadLetterOptions. Values are produced by the WithDeadLetter*
// constructors.
type DeadLetterOption func(*deadLetterOptions)
// deadLetterOptions collects the dead letter settings changed by
// DeadLetterOption functions. Dead lettering counts as enabled only when both
// exchangeName and queueName are non-empty (see isEnabled).
type deadLetterOptions struct {
exchangeName string // dead letter exchange name, set by WithDeadLetter
queueName string // dead letter queue name, set by WithDeadLetter
routingKey string // dead letter routing key, set by WithDeadLetter
exchangeDeclare *exchangeDeclareOptions // declare settings for the dead letter exchange
queueDeclare *queueDeclareOptions // declare settings for the dead letter queue
queueBind *queueBindOptions // bind settings for the dead letter queue
}
// apply runs every option in opts against o, in the order given.
func (o *deadLetterOptions) apply(opts ...DeadLetterOption) {
	for i := 0; i < len(opts); i++ {
		opts[i](o)
	}
}
// isEnabled reports whether dead lettering is configured, which requires both
// a non-empty exchange name and a non-empty queue name.
func (o *deadLetterOptions) isEnabled() bool {
	return o.exchangeName != "" && o.queueName != ""
}
// defaultDeadLetterOptions returns dead letter settings with empty exchange
// name, queue name and routing key, and with the default exchange declare,
// queue declare and queue bind settings filled in so that later options can
// modify them without nil checks.
func defaultDeadLetterOptions() *deadLetterOptions {
	dl := new(deadLetterOptions)
	dl.exchangeDeclare = defaultExchangeDeclareOptions()
	dl.queueDeclare = defaultQueueDeclareOptions()
	dl.queueBind = defaultQueueBindOptions()
	return dl
}
// WithDeadLetterExchangeDeclareOptions returns a DeadLetterOption that applies
// opts to the declare settings of the dead letter exchange.
func WithDeadLetterExchangeDeclareOptions(opts ...ExchangeDeclareOption) DeadLetterOption {
	applyToExchange := func(dl *deadLetterOptions) {
		target := dl.exchangeDeclare
		target.apply(opts...)
	}
	return applyToExchange
}
// WithDeadLetterQueueDeclareOptions returns a DeadLetterOption that applies
// opts to the declare settings of the dead letter queue.
func WithDeadLetterQueueDeclareOptions(opts ...QueueDeclareOption) DeadLetterOption {
	applyToQueue := func(dl *deadLetterOptions) {
		target := dl.queueDeclare
		target.apply(opts...)
	}
	return applyToQueue
}
// WithDeadLetterQueueBindOptions returns a DeadLetterOption that applies opts
// to the bind settings of the dead letter queue.
func WithDeadLetterQueueBindOptions(opts ...QueueBindOption) DeadLetterOption {
	applyToBind := func(dl *deadLetterOptions) {
		target := dl.queueBind
		target.apply(opts...)
	}
	return applyToBind
}
// WithDeadLetter returns a DeadLetterOption that sets the dead letter
// exchange name, queue name and routing key. All three values are stored as
// given, including empty strings.
func WithDeadLetter(exchangeName string, queueName string, routingKey string) DeadLetterOption {
	return func(dl *deadLetterOptions) {
		dl.exchangeName, dl.queueName, dl.routingKey = exchangeName, queueName, routingKey
	}
}