mirror of
https://github.com/zhufuyi/sponge.git
synced 2025-09-27 04:55:57 +08:00
225 lines
5.0 KiB
Go
225 lines
5.0 KiB
Go
// Package rabbitmq is a go wrapper for github.com/rabbitmq/amqp091-go
|
|
//
|
|
// producer and consumer using the five types direct, topic, fanout, headers, x-delayed-message.
|
|
// publisher and subscriber using the fanout message type.
|
|
package rabbitmq
|
|
|
|
import (
|
|
"crypto/tls"
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
amqp "github.com/rabbitmq/amqp091-go"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// DefaultURL default rabbitmq url
|
|
const DefaultURL = "amqp://guest:guest@localhost:5672/"
|
|
|
|
var defaultLogger, _ = zap.NewProduction()
|
|
|
|
// ConnectionOption connection option.
|
|
type ConnectionOption func(*connectionOptions)
|
|
|
|
type connectionOptions struct {
|
|
tlsConfig *tls.Config // tls config, if the url is amqps this field must be set
|
|
reconnectTime time.Duration // reconnect time interval, default is 3s
|
|
|
|
zapLog *zap.Logger
|
|
}
|
|
|
|
func (o *connectionOptions) apply(opts ...ConnectionOption) {
|
|
for _, opt := range opts {
|
|
opt(o)
|
|
}
|
|
}
|
|
|
|
// default connection settings
|
|
func defaultConnectionOptions() *connectionOptions {
|
|
return &connectionOptions{
|
|
tlsConfig: nil,
|
|
reconnectTime: time.Second * 3,
|
|
zapLog: defaultLogger,
|
|
}
|
|
}
|
|
|
|
// WithTLSConfig set tls config option.
|
|
func WithTLSConfig(tlsConfig *tls.Config) ConnectionOption {
|
|
return func(o *connectionOptions) {
|
|
if tlsConfig == nil {
|
|
tlsConfig = &tls.Config{
|
|
InsecureSkipVerify: true,
|
|
}
|
|
}
|
|
o.tlsConfig = tlsConfig
|
|
}
|
|
}
|
|
|
|
// WithReconnectTime set reconnect time interval option.
|
|
func WithReconnectTime(d time.Duration) ConnectionOption {
|
|
return func(o *connectionOptions) {
|
|
if d == 0 {
|
|
d = time.Second * 3
|
|
}
|
|
o.reconnectTime = d
|
|
}
|
|
}
|
|
|
|
// WithLogger set logger option.
|
|
func WithLogger(zapLog *zap.Logger) ConnectionOption {
|
|
return func(o *connectionOptions) {
|
|
if zapLog == nil {
|
|
return
|
|
}
|
|
o.zapLog = zapLog
|
|
}
|
|
}
|
|
|
|
// -------------------------------------------------------------------------------------------
|
|
|
|
// Connection rabbitmq connection
|
|
type Connection struct {
|
|
mutex sync.Mutex
|
|
|
|
url string
|
|
tlsConfig *tls.Config
|
|
reconnectTime time.Duration
|
|
exit chan struct{}
|
|
zapLog *zap.Logger
|
|
|
|
conn *amqp.Connection
|
|
blockChan chan amqp.Blocking
|
|
closeChan chan *amqp.Error
|
|
isConnected bool
|
|
}
|
|
|
|
// NewConnection rabbitmq connection
|
|
func NewConnection(url string, opts ...ConnectionOption) (*Connection, error) {
|
|
if url == "" {
|
|
return nil, errors.New("url is empty")
|
|
}
|
|
|
|
o := defaultConnectionOptions()
|
|
o.apply(opts...)
|
|
|
|
connection := &Connection{
|
|
url: url,
|
|
reconnectTime: o.reconnectTime,
|
|
tlsConfig: o.tlsConfig,
|
|
exit: make(chan struct{}),
|
|
zapLog: o.zapLog,
|
|
}
|
|
|
|
conn, err := connect(connection.url, connection.tlsConfig)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
connection.zapLog.Info("[rabbitmq connection] connected successfully.")
|
|
|
|
connection.conn = conn
|
|
connection.blockChan = connection.conn.NotifyBlocked(make(chan amqp.Blocking, 1))
|
|
connection.closeChan = connection.conn.NotifyClose(make(chan *amqp.Error, 1))
|
|
connection.isConnected = true
|
|
|
|
go connection.monitor()
|
|
|
|
return connection, nil
|
|
}
|
|
|
|
func connect(url string, tlsConfig *tls.Config) (*amqp.Connection, error) {
|
|
var (
|
|
conn *amqp.Connection
|
|
err error
|
|
)
|
|
|
|
if strings.HasPrefix(url, "amqps://") {
|
|
if tlsConfig == nil {
|
|
return nil, errors.New("tls not set, e.g. NewConnection(url, WithTLSConfig(tlsConfig))")
|
|
}
|
|
conn, err = amqp.DialTLS(url, tlsConfig)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
} else {
|
|
conn, err = amqp.Dial(url)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
return conn, nil
|
|
}
|
|
|
|
// CheckConnected rabbitmq connection
|
|
func (c *Connection) CheckConnected() bool {
|
|
c.mutex.Lock()
|
|
defer c.mutex.Unlock()
|
|
return c.isConnected
|
|
}
|
|
|
|
func (c *Connection) monitor() {
|
|
retryCount := 0
|
|
reconnectTip := fmt.Sprintf("[rabbitmq connection] lost connection, attempting reconnect in %s", c.reconnectTime)
|
|
|
|
for {
|
|
select {
|
|
case <-c.exit:
|
|
_ = c.closeConn()
|
|
c.zapLog.Info("[rabbitmq connection] closed")
|
|
return
|
|
case b := <-c.blockChan:
|
|
if b.Active {
|
|
c.zapLog.Warn("[rabbitmq connection] TCP blocked: " + b.Reason)
|
|
} else {
|
|
c.zapLog.Warn("[rabbitmq connection] TCP unblocked")
|
|
}
|
|
case <-c.closeChan:
|
|
c.mutex.Lock()
|
|
c.isConnected = false
|
|
c.mutex.Unlock()
|
|
|
|
retryCount++
|
|
c.zapLog.Warn(reconnectTip)
|
|
time.Sleep(c.reconnectTime) // wait for reconnect
|
|
|
|
amqpConn, amqpErr := connect(c.url, c.tlsConfig)
|
|
if amqpErr != nil {
|
|
c.zapLog.Warn("[rabbitmq connection] reconnect failed", zap.String("err", amqpErr.Error()), zap.Int("retryCount", retryCount))
|
|
continue
|
|
}
|
|
c.zapLog.Info("[rabbitmq connection] reconnected successfully.")
|
|
|
|
// set new connection
|
|
c.mutex.Lock()
|
|
c.isConnected = true
|
|
c.conn = amqpConn
|
|
c.blockChan = c.conn.NotifyBlocked(make(chan amqp.Blocking, 1))
|
|
c.closeChan = c.conn.NotifyClose(make(chan *amqp.Error, 1))
|
|
c.mutex.Unlock()
|
|
}
|
|
}
|
|
}
|
|
|
|
// Close rabbitmq connection
|
|
func (c *Connection) Close() {
|
|
c.mutex.Lock()
|
|
c.isConnected = false
|
|
c.mutex.Unlock()
|
|
|
|
close(c.exit)
|
|
}
|
|
|
|
func (c *Connection) closeConn() error {
|
|
c.mutex.Lock()
|
|
defer c.mutex.Unlock()
|
|
|
|
if c.conn != nil {
|
|
return c.conn.Close()
|
|
}
|
|
|
|
return nil
|
|
}
|