mirror of
https://github.com/zhufuyi/sponge.git
synced 2025-10-04 00:16:25 +08:00
374 lines
10 KiB
Go
374 lines
10 KiB
Go
package rabbitmq
|
|
|
|
import amqp "github.com/rabbitmq/amqp091-go"
|
|
|
|
// ErrClosed closed
|
|
var ErrClosed = amqp.ErrClosed
|
|
|
|
const (
|
|
exchangeTypeDirect = "direct"
|
|
exchangeTypeTopic = "topic"
|
|
exchangeTypeFanout = "fanout"
|
|
exchangeTypeHeaders = "headers"
|
|
exchangeTypeDelayedMessage = "x-delayed-message"
|
|
|
|
// HeadersTypeAll all
|
|
HeadersTypeAll HeadersType = "all"
|
|
// HeadersTypeAny any
|
|
HeadersTypeAny HeadersType = "any"
|
|
)
|
|
|
|
// HeadersType headers type
|
|
type HeadersType = string
|
|
|
|
// Exchange rabbitmq minimum management unit
|
|
type Exchange struct {
|
|
name string // exchange name
|
|
eType string // exchange type: direct, topic, fanout, headers, x-delayed-message
|
|
routingKey string // route key
|
|
headersKeys map[string]interface{} // this field is required if eType=headers.
|
|
delayedMessageType string // this field is required if eType=headers, support direct, topic, fanout, headers
|
|
}
|
|
|
|
// Name exchange name
|
|
func (e *Exchange) Name() string {
|
|
return e.name
|
|
}
|
|
|
|
// Type exchange type
|
|
func (e *Exchange) Type() string {
|
|
return e.eType
|
|
}
|
|
|
|
// RoutingKey exchange routing key
|
|
func (e *Exchange) RoutingKey() string {
|
|
return e.routingKey
|
|
}
|
|
|
|
// HeadersKeys exchange headers keys
|
|
func (e *Exchange) HeadersKeys() map[string]interface{} {
|
|
return e.headersKeys
|
|
}
|
|
|
|
// DelayedMessageType exchange delayed message type
|
|
func (e *Exchange) DelayedMessageType() string {
|
|
return e.delayedMessageType
|
|
}
|
|
|
|
// NewDirectExchange create a direct exchange
|
|
func NewDirectExchange(exchangeName string, routingKey string) *Exchange {
|
|
return &Exchange{
|
|
name: exchangeName,
|
|
eType: exchangeTypeDirect,
|
|
routingKey: routingKey,
|
|
}
|
|
}
|
|
|
|
// NewTopicExchange create a topic exchange
|
|
func NewTopicExchange(exchangeName string, routingKey string) *Exchange {
|
|
return &Exchange{
|
|
name: exchangeName,
|
|
eType: exchangeTypeTopic,
|
|
routingKey: routingKey,
|
|
}
|
|
}
|
|
|
|
// NewFanoutExchange create a fanout exchange
|
|
func NewFanoutExchange(exchangeName string) *Exchange {
|
|
return &Exchange{
|
|
name: exchangeName,
|
|
eType: exchangeTypeFanout,
|
|
routingKey: "",
|
|
}
|
|
}
|
|
|
|
// NewHeadersExchange create a headers exchange, the headerType supports "all" and "any"
|
|
func NewHeadersExchange(exchangeName string, headersType HeadersType, keys map[string]interface{}) *Exchange {
|
|
if keys == nil {
|
|
keys = make(map[string]interface{})
|
|
}
|
|
|
|
switch headersType {
|
|
case HeadersTypeAll, HeadersTypeAny:
|
|
keys["x-match"] = headersType
|
|
default:
|
|
keys["x-match"] = HeadersTypeAll
|
|
}
|
|
|
|
return &Exchange{
|
|
name: exchangeName,
|
|
eType: exchangeTypeHeaders,
|
|
routingKey: "",
|
|
headersKeys: keys,
|
|
}
|
|
}
|
|
|
|
// NewDelayedMessageExchange create a delayed message exchange
|
|
func NewDelayedMessageExchange(exchangeName string, e *Exchange) *Exchange {
|
|
return &Exchange{
|
|
name: exchangeName,
|
|
eType: "x-delayed-message",
|
|
routingKey: e.routingKey,
|
|
delayedMessageType: e.eType,
|
|
headersKeys: e.headersKeys,
|
|
}
|
|
}
|
|
|
|
// -------------------------------------------------------------------------------------------
|
|
|
|
// QueueDeclareOption declare queue option.
|
|
type QueueDeclareOption func(*queueDeclareOptions)
|
|
|
|
type queueDeclareOptions struct {
|
|
autoDelete bool // delete automatically
|
|
exclusive bool // exclusive (only available to the program that created it)
|
|
noWait bool // block processing
|
|
args amqp.Table // additional properties
|
|
}
|
|
|
|
func (o *queueDeclareOptions) apply(opts ...QueueDeclareOption) {
|
|
for _, opt := range opts {
|
|
opt(o)
|
|
}
|
|
}
|
|
|
|
// default queue declare settings
|
|
func defaultQueueDeclareOptions() *queueDeclareOptions {
|
|
return &queueDeclareOptions{
|
|
autoDelete: false,
|
|
exclusive: false,
|
|
noWait: false,
|
|
args: nil,
|
|
}
|
|
}
|
|
|
|
// WithQueueDeclareAutoDelete set queue declare auto delete option.
|
|
func WithQueueDeclareAutoDelete(enable bool) QueueDeclareOption {
|
|
return func(o *queueDeclareOptions) {
|
|
o.autoDelete = enable
|
|
}
|
|
}
|
|
|
|
// WithQueueDeclareExclusive set queue declare exclusive option.
|
|
func WithQueueDeclareExclusive(enable bool) QueueDeclareOption {
|
|
return func(o *queueDeclareOptions) {
|
|
o.exclusive = enable
|
|
}
|
|
}
|
|
|
|
// WithQueueDeclareNoWait set queue declare no wait option.
|
|
func WithQueueDeclareNoWait(enable bool) QueueDeclareOption {
|
|
return func(o *queueDeclareOptions) {
|
|
o.noWait = enable
|
|
}
|
|
}
|
|
|
|
// WithQueueDeclareArgs set queue declare args option.
|
|
func WithQueueDeclareArgs(args map[string]interface{}) QueueDeclareOption {
|
|
return func(o *queueDeclareOptions) {
|
|
o.args = args
|
|
}
|
|
}
|
|
|
|
// -------------------------------------------------------------------------------------------
|
|
|
|
// ExchangeDeclareOption declare exchange option.
|
|
type ExchangeDeclareOption func(*exchangeDeclareOptions)
|
|
|
|
type exchangeDeclareOptions struct {
|
|
autoDelete bool // delete automatically
|
|
internal bool // public or not, false means public
|
|
noWait bool // block processing
|
|
args amqp.Table // additional properties
|
|
}
|
|
|
|
func (o *exchangeDeclareOptions) apply(opts ...ExchangeDeclareOption) {
|
|
for _, opt := range opts {
|
|
opt(o)
|
|
}
|
|
}
|
|
|
|
// default exchange declare settings
|
|
func defaultExchangeDeclareOptions() *exchangeDeclareOptions {
|
|
return &exchangeDeclareOptions{
|
|
//durable: true,
|
|
autoDelete: false,
|
|
internal: false,
|
|
noWait: false,
|
|
args: nil,
|
|
}
|
|
}
|
|
|
|
// WithExchangeDeclareAutoDelete set exchange declare auto delete option.
|
|
func WithExchangeDeclareAutoDelete(enable bool) ExchangeDeclareOption {
|
|
return func(o *exchangeDeclareOptions) {
|
|
o.autoDelete = enable
|
|
}
|
|
}
|
|
|
|
// WithExchangeDeclareInternal set exchange declare internal option.
|
|
func WithExchangeDeclareInternal(enable bool) ExchangeDeclareOption {
|
|
return func(o *exchangeDeclareOptions) {
|
|
o.internal = enable
|
|
}
|
|
}
|
|
|
|
// WithExchangeDeclareNoWait set exchange declare no wait option.
|
|
func WithExchangeDeclareNoWait(enable bool) ExchangeDeclareOption {
|
|
return func(o *exchangeDeclareOptions) {
|
|
o.noWait = enable
|
|
}
|
|
}
|
|
|
|
// WithExchangeDeclareArgs set exchange declare args option.
|
|
func WithExchangeDeclareArgs(args map[string]interface{}) ExchangeDeclareOption {
|
|
return func(o *exchangeDeclareOptions) {
|
|
o.args = args
|
|
}
|
|
}
|
|
|
|
// -------------------------------------------------------------------------------------------
|
|
|
|
// QueueBindOption declare queue bind option.
|
|
type QueueBindOption func(*queueBindOptions)
|
|
|
|
type queueBindOptions struct {
|
|
noWait bool // block processing
|
|
args amqp.Table // this parameter is invalid if the type is headers.
|
|
}
|
|
|
|
func (o *queueBindOptions) apply(opts ...QueueBindOption) {
|
|
for _, opt := range opts {
|
|
opt(o)
|
|
}
|
|
}
|
|
|
|
// default queue bind settings
|
|
func defaultQueueBindOptions() *queueBindOptions {
|
|
return &queueBindOptions{
|
|
noWait: false,
|
|
args: nil,
|
|
}
|
|
}
|
|
|
|
// WithQueueBindNoWait set queue bind no wait option.
|
|
func WithQueueBindNoWait(enable bool) QueueBindOption {
|
|
return func(o *queueBindOptions) {
|
|
o.noWait = enable
|
|
}
|
|
}
|
|
|
|
// WithQueueBindArgs set queue bind args option.
|
|
func WithQueueBindArgs(args map[string]interface{}) QueueBindOption {
|
|
return func(o *queueBindOptions) {
|
|
o.args = args
|
|
}
|
|
}
|
|
|
|
// -------------------------------------------------------------------------------------------
|
|
|
|
// DelayedMessagePublishOption declare queue bind option.
|
|
type DelayedMessagePublishOption func(*delayedMessagePublishOptions)
|
|
|
|
type delayedMessagePublishOptions struct {
|
|
topicKey string // the topic message type must be required
|
|
headersKeys map[string]interface{} // the headers message type must be required
|
|
}
|
|
|
|
func (o *delayedMessagePublishOptions) apply(opts ...DelayedMessagePublishOption) {
|
|
for _, opt := range opts {
|
|
opt(o)
|
|
}
|
|
}
|
|
|
|
// default delayed message publish settings
|
|
func defaultDelayedMessagePublishOptions() *delayedMessagePublishOptions {
|
|
return &delayedMessagePublishOptions{}
|
|
}
|
|
|
|
// WithDelayedMessagePublishTopicKey set delayed message publish topicKey option.
|
|
func WithDelayedMessagePublishTopicKey(topicKey string) DelayedMessagePublishOption {
|
|
return func(o *delayedMessagePublishOptions) {
|
|
if topicKey == "" {
|
|
return
|
|
}
|
|
o.topicKey = topicKey
|
|
}
|
|
}
|
|
|
|
// WithDelayedMessagePublishHeadersKeys set delayed message publish headersKeys option.
|
|
func WithDelayedMessagePublishHeadersKeys(headersKeys map[string]interface{}) DelayedMessagePublishOption {
|
|
return func(o *delayedMessagePublishOptions) {
|
|
if headersKeys == nil {
|
|
return
|
|
}
|
|
o.headersKeys = headersKeys
|
|
}
|
|
}
|
|
|
|
// -------------------------------------------------------------------------------------------
|
|
|
|
// DeadLetterOption declare dead letter option.
|
|
type DeadLetterOption func(*deadLetterOptions)
|
|
|
|
type deadLetterOptions struct {
|
|
exchangeName string
|
|
queueName string
|
|
routingKey string
|
|
|
|
exchangeDeclare *exchangeDeclareOptions
|
|
queueDeclare *queueDeclareOptions
|
|
queueBind *queueBindOptions
|
|
}
|
|
|
|
func (o *deadLetterOptions) apply(opts ...DeadLetterOption) {
|
|
for _, opt := range opts {
|
|
opt(o)
|
|
}
|
|
}
|
|
|
|
func (o *deadLetterOptions) isEnabled() bool {
|
|
if o.exchangeName != "" && o.queueName != "" {
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
func defaultDeadLetterOptions() *deadLetterOptions {
|
|
return &deadLetterOptions{
|
|
exchangeDeclare: defaultExchangeDeclareOptions(),
|
|
queueDeclare: defaultQueueDeclareOptions(),
|
|
queueBind: defaultQueueBindOptions(),
|
|
}
|
|
}
|
|
|
|
// WithDeadLetterExchangeDeclareOptions set dead letter exchange declare option.
|
|
func WithDeadLetterExchangeDeclareOptions(opts ...ExchangeDeclareOption) DeadLetterOption {
|
|
return func(o *deadLetterOptions) {
|
|
o.exchangeDeclare.apply(opts...)
|
|
}
|
|
}
|
|
|
|
// WithDeadLetterQueueDeclareOptions set dead letter queue declare option.
|
|
func WithDeadLetterQueueDeclareOptions(opts ...QueueDeclareOption) DeadLetterOption {
|
|
return func(o *deadLetterOptions) {
|
|
o.queueDeclare.apply(opts...)
|
|
}
|
|
}
|
|
|
|
// WithDeadLetterQueueBindOptions set dead letter queue bind option.
|
|
func WithDeadLetterQueueBindOptions(opts ...QueueBindOption) DeadLetterOption {
|
|
return func(o *deadLetterOptions) {
|
|
o.queueBind.apply(opts...)
|
|
}
|
|
}
|
|
|
|
// WithDeadLetter set dead letter exchange, queue, routing key.
|
|
func WithDeadLetter(exchangeName string, queueName string, routingKey string) DeadLetterOption {
|
|
return func(o *deadLetterOptions) {
|
|
o.exchangeName = exchangeName
|
|
o.queueName = queueName
|
|
o.routingKey = routingKey
|
|
}
|
|
}
|