Files
sponge/pkg/rabbitmq/connection.go
2023-10-25 00:52:08 +08:00

225 lines
5.0 KiB
Go

// Package rabbitmq is a go wrapper for github.com/rabbitmq/amqp091-go
//
// producer and consumer using the five types direct, topic, fanout, headers, x-delayed-message.
// publisher and subscriber using the fanout message type.
package rabbitmq
import (
"crypto/tls"
"errors"
"fmt"
"strings"
"sync"
"time"
amqp "github.com/rabbitmq/amqp091-go"
"go.uber.org/zap"
)
// DefaultURL default rabbitmq url
const DefaultURL = "amqp://guest:guest@localhost:5672/"
var defaultLogger, _ = zap.NewProduction()
// ConnectionOption connection option.
type ConnectionOption func(*connectionOptions)
type connectionOptions struct {
tlsConfig *tls.Config // tls config, if the url is amqps this field must be set
reconnectTime time.Duration // reconnect time interval, default is 3s
zapLog *zap.Logger
}
func (o *connectionOptions) apply(opts ...ConnectionOption) {
for _, opt := range opts {
opt(o)
}
}
// default connection settings
func defaultConnectionOptions() *connectionOptions {
return &connectionOptions{
tlsConfig: nil,
reconnectTime: time.Second * 3,
zapLog: defaultLogger,
}
}
// WithTLSConfig set tls config option.
func WithTLSConfig(tlsConfig *tls.Config) ConnectionOption {
return func(o *connectionOptions) {
if tlsConfig == nil {
tlsConfig = &tls.Config{
InsecureSkipVerify: true,
}
}
o.tlsConfig = tlsConfig
}
}
// WithReconnectTime set reconnect time interval option.
func WithReconnectTime(d time.Duration) ConnectionOption {
return func(o *connectionOptions) {
if d == 0 {
d = time.Second * 3
}
o.reconnectTime = d
}
}
// WithLogger set logger option.
func WithLogger(zapLog *zap.Logger) ConnectionOption {
return func(o *connectionOptions) {
if zapLog == nil {
return
}
o.zapLog = zapLog
}
}
// -------------------------------------------------------------------------------------------
// Connection rabbitmq connection
type Connection struct {
mutex sync.Mutex
url string
tlsConfig *tls.Config
reconnectTime time.Duration
exit chan struct{}
zapLog *zap.Logger
conn *amqp.Connection
blockChan chan amqp.Blocking
closeChan chan *amqp.Error
isConnected bool
}
// NewConnection rabbitmq connection
func NewConnection(url string, opts ...ConnectionOption) (*Connection, error) {
if url == "" {
return nil, errors.New("url is empty")
}
o := defaultConnectionOptions()
o.apply(opts...)
connection := &Connection{
url: url,
reconnectTime: o.reconnectTime,
tlsConfig: o.tlsConfig,
exit: make(chan struct{}),
zapLog: o.zapLog,
}
conn, err := connect(connection.url, connection.tlsConfig)
if err != nil {
return nil, err
}
connection.zapLog.Info("[rabbitmq connection] connected successfully.")
connection.conn = conn
connection.blockChan = connection.conn.NotifyBlocked(make(chan amqp.Blocking, 1))
connection.closeChan = connection.conn.NotifyClose(make(chan *amqp.Error, 1))
connection.isConnected = true
go connection.monitor()
return connection, nil
}
func connect(url string, tlsConfig *tls.Config) (*amqp.Connection, error) {
var (
conn *amqp.Connection
err error
)
if strings.HasPrefix(url, "amqps://") {
if tlsConfig == nil {
return nil, errors.New("tls not set, e.g. NewConnection(url, WithTLSConfig(tlsConfig))")
}
conn, err = amqp.DialTLS(url, tlsConfig)
if err != nil {
return nil, err
}
} else {
conn, err = amqp.Dial(url)
if err != nil {
return nil, err
}
}
return conn, nil
}
// CheckConnected rabbitmq connection
func (c *Connection) CheckConnected() bool {
c.mutex.Lock()
defer c.mutex.Unlock()
return c.isConnected
}
func (c *Connection) monitor() {
retryCount := 0
reconnectTip := fmt.Sprintf("[rabbitmq connection] lost connection, attempting reconnect in %s", c.reconnectTime)
for {
select {
case <-c.exit:
_ = c.closeConn()
c.zapLog.Info("[rabbitmq connection] closed")
return
case b := <-c.blockChan:
if b.Active {
c.zapLog.Warn("[rabbitmq connection] TCP blocked: " + b.Reason)
} else {
c.zapLog.Warn("[rabbitmq connection] TCP unblocked")
}
case <-c.closeChan:
c.mutex.Lock()
c.isConnected = false
c.mutex.Unlock()
retryCount++
c.zapLog.Warn(reconnectTip)
time.Sleep(c.reconnectTime) // wait for reconnect
amqpConn, amqpErr := connect(c.url, c.tlsConfig)
if amqpErr != nil {
c.zapLog.Warn("[rabbitmq connection] reconnect failed", zap.String("err", amqpErr.Error()), zap.Int("retryCount", retryCount))
continue
}
c.zapLog.Info("[rabbitmq connection] reconnected successfully.")
// set new connection
c.mutex.Lock()
c.isConnected = true
c.conn = amqpConn
c.blockChan = c.conn.NotifyBlocked(make(chan amqp.Blocking, 1))
c.closeChan = c.conn.NotifyClose(make(chan *amqp.Error, 1))
c.mutex.Unlock()
}
}
}
// Close rabbitmq connection
func (c *Connection) Close() {
c.mutex.Lock()
c.isConnected = false
c.mutex.Unlock()
close(c.exit)
}
func (c *Connection) closeConn() error {
c.mutex.Lock()
defer c.mutex.Unlock()
if c.conn != nil {
return c.conn.Close()
}
return nil
}