mirror of
https://github.com/oarkflow/mq.git
synced 2025-09-26 20:11:16 +08:00
2033 lines
53 KiB
Go
2033 lines
53 KiB
Go
package mq
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"net"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/oarkflow/errors"
|
|
"github.com/oarkflow/json"
|
|
|
|
"github.com/oarkflow/json/jsonparser"
|
|
|
|
"github.com/oarkflow/mq/codec"
|
|
"github.com/oarkflow/mq/consts"
|
|
"github.com/oarkflow/mq/logger"
|
|
"github.com/oarkflow/mq/storage"
|
|
"github.com/oarkflow/mq/storage/memory"
|
|
"github.com/oarkflow/mq/utils"
|
|
)
|
|
|
|
type Status string
|
|
|
|
const (
|
|
Pending Status = "Pending"
|
|
Processing Status = "Processing"
|
|
Completed Status = "Completed"
|
|
Failed Status = "Failed"
|
|
Cancelled Status = "Cancelled"
|
|
)
|
|
|
|
type Result struct {
|
|
CreatedAt time.Time `json:"created_at"`
|
|
ProcessedAt time.Time `json:"processed_at,omitempty"`
|
|
Latency string `json:"latency"`
|
|
Error error `json:"-"` // Keep error as an error type
|
|
Topic string `json:"topic"`
|
|
TaskID string `json:"task_id"`
|
|
Status Status `json:"status"`
|
|
ConditionStatus string `json:"condition_status"`
|
|
Ctx context.Context `json:"-"`
|
|
Payload json.RawMessage `json:"payload"`
|
|
ResetTo string `json:"reset_to,omitempty"` // Node ID to reset to, or "back" for previous page node
|
|
Last bool
|
|
}
|
|
|
|
func (r Result) MarshalJSON() ([]byte, error) {
|
|
type Alias Result
|
|
aux := &struct {
|
|
ErrorMsg string `json:"error,omitempty"`
|
|
Alias
|
|
}{
|
|
Alias: (Alias)(r),
|
|
}
|
|
if r.Error != nil {
|
|
aux.ErrorMsg = r.Error.Error()
|
|
}
|
|
return json.Marshal(aux)
|
|
}
|
|
|
|
func (r *Result) UnmarshalJSON(data []byte) error {
|
|
type Alias Result
|
|
aux := &struct {
|
|
*Alias
|
|
ErrMsg string `json:"error,omitempty"`
|
|
}{
|
|
Alias: (*Alias)(r),
|
|
}
|
|
|
|
if err := json.Unmarshal(data, &aux); err != nil {
|
|
return err
|
|
}
|
|
if aux.ErrMsg != "" {
|
|
r.Error = errors.New(aux.ErrMsg)
|
|
} else {
|
|
r.Error = nil
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r Result) Unmarshal(data any) error {
|
|
if r.Payload == nil {
|
|
return fmt.Errorf("payload is nil")
|
|
}
|
|
return json.Unmarshal(r.Payload, data)
|
|
}
|
|
|
|
func HandleError(ctx context.Context, err error, status ...Status) Result {
|
|
st := Failed
|
|
if len(status) > 0 {
|
|
st = status[0]
|
|
}
|
|
if err == nil {
|
|
return Result{Ctx: ctx}
|
|
}
|
|
return Result{
|
|
Ctx: ctx,
|
|
Status: st,
|
|
Error: err,
|
|
}
|
|
}
|
|
|
|
func (r Result) WithData(status Status, data []byte) Result {
|
|
if r.Error != nil {
|
|
return r
|
|
}
|
|
return Result{
|
|
Status: status,
|
|
Payload: data,
|
|
Ctx: r.Ctx,
|
|
}
|
|
}
|
|
|
|
type TLSConfig struct {
|
|
CertPath string
|
|
KeyPath string
|
|
CAPath string
|
|
UseTLS bool
|
|
}
|
|
|
|
// QueueConfig holds configuration for a specific queue
|
|
type QueueConfig struct {
|
|
MaxDepth int `json:"max_depth"`
|
|
MaxRetries int `json:"max_retries"`
|
|
MessageTTL time.Duration `json:"message_ttl"`
|
|
DeadLetter bool `json:"dead_letter"`
|
|
Persistent bool `json:"persistent"`
|
|
BatchSize int `json:"batch_size"`
|
|
Priority int `json:"priority"`
|
|
OrderedMode bool `json:"ordered_mode"`
|
|
Throttling bool `json:"throttling"`
|
|
ThrottleRate int `json:"throttle_rate"`
|
|
ThrottleBurst int `json:"throttle_burst"`
|
|
CompactionMode bool `json:"compaction_mode"`
|
|
}
|
|
|
|
// QueueOption defines options for queue configuration
|
|
type QueueOption func(*QueueConfig)
|
|
|
|
// WithQueueOption creates a queue with specific configuration
|
|
func WithQueueOption(config QueueConfig) QueueOption {
|
|
return func(c *QueueConfig) {
|
|
*c = config
|
|
}
|
|
}
|
|
|
|
// WithQueueMaxDepth sets the maximum queue depth
|
|
func WithQueueMaxDepth(maxDepth int) QueueOption {
|
|
return func(c *QueueConfig) {
|
|
c.MaxDepth = maxDepth
|
|
}
|
|
}
|
|
|
|
// WithQueueMaxRetries sets the maximum retries for queue messages
|
|
func WithQueueMaxRetries(maxRetries int) QueueOption {
|
|
return func(c *QueueConfig) {
|
|
c.MaxRetries = maxRetries
|
|
}
|
|
}
|
|
|
|
// WithQueueTTL sets the message TTL for the queue
|
|
func WithQueueTTL(ttl time.Duration) QueueOption {
|
|
return func(c *QueueConfig) {
|
|
c.MessageTTL = ttl
|
|
}
|
|
}
|
|
|
|
// WithDeadLetter enables dead letter queue for failed messages
|
|
func WithDeadLetter() QueueOption {
|
|
return func(c *QueueConfig) {
|
|
c.DeadLetter = true
|
|
}
|
|
}
|
|
|
|
// WithPersistent enables message persistence
|
|
func WithPersistent() QueueOption {
|
|
return func(c *QueueConfig) {
|
|
c.Persistent = true
|
|
}
|
|
}
|
|
|
|
// RateLimiter implementation
|
|
type RateLimiter struct {
|
|
mu sync.Mutex
|
|
C chan struct{}
|
|
ticker *time.Ticker
|
|
rate int
|
|
burst int
|
|
stop chan struct{}
|
|
}
|
|
|
|
// NewRateLimiter creates a new RateLimiter with the specified rate and burst.
|
|
func NewRateLimiter(rate int, burst int) *RateLimiter {
|
|
rl := &RateLimiter{
|
|
C: make(chan struct{}, burst),
|
|
rate: rate,
|
|
burst: burst,
|
|
stop: make(chan struct{}),
|
|
}
|
|
rl.ticker = time.NewTicker(time.Second / time.Duration(rate))
|
|
go rl.run()
|
|
return rl
|
|
}
|
|
|
|
// run is the internal goroutine that periodically sends tokens.
|
|
func (rl *RateLimiter) run() {
|
|
for {
|
|
select {
|
|
case <-rl.ticker.C:
|
|
// Blocking send to ensure token accumulation doesn't discard tokens.
|
|
rl.mu.Lock()
|
|
// Try sending token, but don't block if channel is full.
|
|
select {
|
|
case rl.C <- struct{}{}:
|
|
default:
|
|
}
|
|
rl.mu.Unlock()
|
|
case <-rl.stop:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// Wait blocks until a token is available.
|
|
func (rl *RateLimiter) Wait() {
|
|
<-rl.C
|
|
}
|
|
|
|
// Update allows dynamic adjustment of rate and burst at runtime.
|
|
// It immediately applies the new settings.
|
|
func (rl *RateLimiter) Update(newRate, newBurst int) {
|
|
rl.mu.Lock()
|
|
defer rl.mu.Unlock()
|
|
|
|
// Stop the old ticker.
|
|
rl.ticker.Stop()
|
|
// Replace the channel with a new one of the new burst capacity.
|
|
rl.C = make(chan struct{}, newBurst)
|
|
// Update internal state.
|
|
rl.rate = newRate
|
|
rl.burst = newBurst
|
|
// Start a new ticker with the updated rate.
|
|
rl.ticker = time.NewTicker(time.Second / time.Duration(newRate))
|
|
// The run goroutine will pick up tokens from the new ticker and use the new channel.
|
|
}
|
|
|
|
// Stop terminates the rate limiter's internal goroutine.
|
|
func (rl *RateLimiter) Stop() {
|
|
close(rl.stop)
|
|
rl.ticker.Stop()
|
|
}
|
|
|
|
type Options struct {
|
|
storage TaskStorage
|
|
consumerOnSubscribe func(ctx context.Context, topic, consumerName string)
|
|
consumerOnClose func(ctx context.Context, topic, consumerName string)
|
|
notifyResponse func(context.Context, Result) error
|
|
brokerAddr string
|
|
enableHTTPApi bool
|
|
tlsConfig TLSConfig
|
|
callback []func(context.Context, Result) Result
|
|
queueSize int
|
|
initialDelay time.Duration
|
|
maxBackoff time.Duration
|
|
jitterPercent float64
|
|
maxRetries int
|
|
numOfWorkers int
|
|
maxMemoryLoad int64
|
|
syncMode bool
|
|
cleanTaskOnComplete bool
|
|
enableWorkerPool bool
|
|
respondPendingResult bool
|
|
logger logger.Logger
|
|
BrokerRateLimiter *RateLimiter // new field for broker rate limiting
|
|
ConsumerRateLimiter *RateLimiter // new field for consumer rate limiting
|
|
consumerTimeout time.Duration // timeout for consumer message processing (0 = no timeout)
|
|
adminAddr string // address for admin server
|
|
metricsAddr string // address for metrics server
|
|
enableSecurity bool // enable security features
|
|
enableMonitoring bool // enable monitoring features
|
|
username string // username for authentication
|
|
password string // password for authentication
|
|
}
|
|
|
|
func (o *Options) SetSyncMode(sync bool) {
|
|
o.syncMode = sync
|
|
}
|
|
|
|
func (o *Options) NumOfWorkers() int {
|
|
return o.numOfWorkers
|
|
}
|
|
|
|
func (o *Options) Logger() logger.Logger {
|
|
return o.logger
|
|
}
|
|
|
|
func (o *Options) Storage() TaskStorage {
|
|
return o.storage
|
|
}
|
|
|
|
func (o *Options) CleanTaskOnComplete() bool {
|
|
return o.cleanTaskOnComplete
|
|
}
|
|
|
|
func (o *Options) QueueSize() int {
|
|
return o.queueSize
|
|
}
|
|
|
|
func (o *Options) MaxMemoryLoad() int64 {
|
|
return o.maxMemoryLoad
|
|
}
|
|
|
|
func (o *Options) BrokerAddr() string {
|
|
return o.brokerAddr
|
|
}
|
|
|
|
func (o *Options) HTTPApi() bool {
|
|
return o.enableHTTPApi
|
|
}
|
|
|
|
func (o *Options) ConsumerTimeout() time.Duration {
|
|
return o.consumerTimeout
|
|
}
|
|
|
|
func HeadersWithConsumerID(ctx context.Context, id string) map[string]string {
|
|
return WithHeaders(ctx, map[string]string{consts.ConsumerKey: id, consts.ContentType: consts.TypeJson})
|
|
}
|
|
|
|
func HeadersWithConsumerIDAndQueue(ctx context.Context, id, queue string) map[string]string {
|
|
return WithHeaders(ctx, map[string]string{
|
|
consts.ConsumerKey: id,
|
|
consts.ContentType: consts.TypeJson,
|
|
consts.QueueKey: queue,
|
|
})
|
|
}
|
|
|
|
type QueuedTask struct {
|
|
Message *codec.Message
|
|
Task *Task
|
|
RetryCount int
|
|
}
|
|
|
|
type consumer struct {
|
|
conn net.Conn
|
|
id string
|
|
state consts.ConsumerState
|
|
queue string
|
|
pool *Pool
|
|
metrics *ConsumerMetrics
|
|
}
|
|
|
|
type ConsumerMetrics struct {
|
|
ProcessedTasks int64
|
|
ErrorCount int64
|
|
LastActivity time.Time
|
|
}
|
|
|
|
type publisher struct {
|
|
conn net.Conn
|
|
id string
|
|
}
|
|
|
|
// Enhanced Broker Types and Interfaces
|
|
|
|
// ConnectionPool manages a pool of broker connections
|
|
type ConnectionPool struct {
|
|
mu sync.RWMutex
|
|
connections map[string]*BrokerConnection
|
|
maxConns int
|
|
connCount int64
|
|
}
|
|
|
|
// BrokerConnection represents a single broker connection
|
|
type BrokerConnection struct {
|
|
mu sync.RWMutex
|
|
conn net.Conn
|
|
id string
|
|
connType string
|
|
lastActivity time.Time
|
|
isActive bool
|
|
}
|
|
|
|
// HealthChecker monitors broker health
|
|
type HealthChecker struct {
|
|
mu sync.RWMutex
|
|
broker *Broker
|
|
interval time.Duration
|
|
ticker *time.Ticker
|
|
shutdown chan struct{}
|
|
thresholds HealthThresholds
|
|
}
|
|
|
|
// HealthThresholds defines health check thresholds
|
|
type HealthThresholds struct {
|
|
MaxMemoryUsage int64
|
|
MaxCPUUsage float64
|
|
MaxConnections int
|
|
MaxQueueDepth int
|
|
MaxResponseTime time.Duration
|
|
MinFreeMemory int64
|
|
}
|
|
|
|
// CircuitState represents the state of a circuit breaker
|
|
type CircuitState int
|
|
|
|
const (
|
|
CircuitClosed CircuitState = iota
|
|
CircuitOpen
|
|
CircuitHalfOpen
|
|
)
|
|
|
|
// EnhancedCircuitBreaker provides circuit breaker functionality
|
|
type EnhancedCircuitBreaker struct {
|
|
mu sync.RWMutex
|
|
threshold int64
|
|
timeout time.Duration
|
|
state CircuitState
|
|
failureCount int64
|
|
successCount int64
|
|
lastFailureTime time.Time
|
|
}
|
|
|
|
// MetricsCollector collects and stores metrics
|
|
type MetricsCollector struct {
|
|
mu sync.RWMutex
|
|
metrics map[string]*Metric
|
|
}
|
|
|
|
// Metric represents a single metric
|
|
type Metric struct {
|
|
Name string `json:"name"`
|
|
Value float64 `json:"value"`
|
|
Timestamp time.Time `json:"timestamp"`
|
|
Tags map[string]string `json:"tags,omitempty"`
|
|
}
|
|
|
|
// MessageStore interface for storing messages
|
|
type MessageStore interface {
|
|
Store(msg *StoredMessage) error
|
|
Retrieve(id string) (*StoredMessage, error)
|
|
Delete(id string) error
|
|
List(queue string, limit int, offset int) ([]*StoredMessage, error)
|
|
Count(queue string) (int64, error)
|
|
Cleanup(olderThan time.Time) error
|
|
}
|
|
|
|
// StoredMessage represents a message stored in the message store
|
|
type StoredMessage struct {
|
|
ID string `json:"id"`
|
|
Queue string `json:"queue"`
|
|
Payload []byte `json:"payload"`
|
|
Headers map[string]string `json:"headers,omitempty"`
|
|
Metadata map[string]any `json:"metadata,omitempty"`
|
|
Priority int `json:"priority"`
|
|
CreatedAt time.Time `json:"created_at"`
|
|
ExpiresAt *time.Time `json:"expires_at,omitempty"`
|
|
Attempts int `json:"attempts"`
|
|
}
|
|
|
|
type Broker struct {
|
|
// Core broker functionality
|
|
queues storage.IMap[string, *Queue] // Modified to support tenant-specific queues
|
|
consumers storage.IMap[string, *consumer]
|
|
publishers storage.IMap[string, *publisher]
|
|
deadLetter storage.IMap[string, *Queue]
|
|
opts *Options
|
|
pIDs storage.IMap[string, bool]
|
|
listener net.Listener
|
|
|
|
// Enhanced production features
|
|
connectionPool *ConnectionPool
|
|
healthChecker *HealthChecker
|
|
circuitBreaker *EnhancedCircuitBreaker
|
|
metricsCollector *MetricsCollector
|
|
messageStore MessageStore
|
|
securityManager *SecurityManager
|
|
adminServer *AdminServer
|
|
metricsServer *MetricsServer
|
|
authenticatedConns storage.IMap[string, bool] // authenticated connections
|
|
taskHeaders storage.IMap[string, map[string]string] // task headers by task ID
|
|
pendingTasks map[string]map[string]*Task // consumerID -> taskID -> task
|
|
mu sync.RWMutex // for pendingTasks
|
|
isShutdown int32
|
|
shutdown chan struct{}
|
|
wg sync.WaitGroup
|
|
logger logger.Logger
|
|
}
|
|
|
|
func NewBroker(opts ...Option) *Broker {
|
|
options := SetupOptions(opts...)
|
|
|
|
broker := &Broker{
|
|
// Core broker functionality
|
|
queues: memory.New[string, *Queue](),
|
|
publishers: memory.New[string, *publisher](),
|
|
consumers: memory.New[string, *consumer](),
|
|
deadLetter: memory.New[string, *Queue](),
|
|
pIDs: memory.New[string, bool](),
|
|
pendingTasks: make(map[string]map[string]*Task),
|
|
opts: options,
|
|
|
|
// Enhanced production features
|
|
connectionPool: NewConnectionPool(1000), // max 1000 connections
|
|
healthChecker: NewHealthChecker(),
|
|
circuitBreaker: NewEnhancedCircuitBreaker(10, 30*time.Second), // 10 failures, 30s timeout
|
|
metricsCollector: NewMetricsCollector(),
|
|
messageStore: NewInMemoryMessageStore(),
|
|
authenticatedConns: memory.New[string, bool](),
|
|
taskHeaders: memory.New[string, map[string]string](),
|
|
shutdown: make(chan struct{}),
|
|
logger: options.Logger(),
|
|
}
|
|
|
|
if options.enableSecurity {
|
|
broker.securityManager = NewSecurityManager()
|
|
}
|
|
if options.enableMonitoring {
|
|
if options.adminAddr != "" {
|
|
broker.adminServer = NewAdminServer(broker, options.adminAddr, options.Logger())
|
|
}
|
|
if options.metricsAddr != "" {
|
|
// Need to create MonitoringConfig, use default
|
|
config := &MonitoringConfig{
|
|
EnableMetrics: true,
|
|
MetricsPort: 9090, // default
|
|
MetricsPath: "/metrics",
|
|
EnableHealthCheck: true,
|
|
HealthCheckPort: 8080,
|
|
HealthCheckPath: "/health",
|
|
HealthCheckInterval: time.Minute,
|
|
EnableLogging: true,
|
|
LogLevel: "info",
|
|
}
|
|
broker.metricsServer = NewMetricsServer(broker, config, options.Logger())
|
|
}
|
|
}
|
|
|
|
broker.healthChecker.broker = broker
|
|
return broker
|
|
}
|
|
|
|
func (b *Broker) Options() *Options {
|
|
return b.opts
|
|
}
|
|
|
|
func (b *Broker) SecurityManager() *SecurityManager {
|
|
return b.securityManager
|
|
}
|
|
|
|
// InitializeSecurity initializes default users, roles, and permissions for development/testing
|
|
func (b *Broker) InitializeSecurity() error {
|
|
if b.securityManager == nil {
|
|
return fmt.Errorf("security manager not initialized")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (b *Broker) OnClose(ctx context.Context, conn net.Conn) error {
|
|
consumerID, ok := GetConsumerID(ctx)
|
|
if ok && consumerID != "" {
|
|
log.Printf("Broker: Consumer connection closed: %s, address: %s", consumerID, conn.RemoteAddr())
|
|
if con, exists := b.consumers.Get(consumerID); exists {
|
|
con.conn.Close()
|
|
b.consumers.Del(consumerID)
|
|
}
|
|
b.queues.ForEach(func(_ string, queue *Queue) bool {
|
|
if _, ok := queue.consumers.Get(consumerID); ok {
|
|
if b.opts.consumerOnClose != nil {
|
|
b.opts.consumerOnClose(ctx, queue.name, consumerID)
|
|
}
|
|
queue.consumers.Del(consumerID)
|
|
}
|
|
return true
|
|
})
|
|
} else {
|
|
b.consumers.ForEach(func(consumerID string, con *consumer) bool {
|
|
if utils.ConnectionsEqual(conn, con.conn) {
|
|
log.Printf("Broker: Consumer connection closed: %s, address: %s", consumerID, conn.RemoteAddr())
|
|
con.conn.Close()
|
|
b.consumers.Del(consumerID)
|
|
b.queues.ForEach(func(_ string, queue *Queue) bool {
|
|
queue.consumers.Del(consumerID)
|
|
if _, ok := queue.consumers.Get(consumerID); ok {
|
|
if b.opts.consumerOnClose != nil {
|
|
b.opts.consumerOnClose(ctx, queue.name, consumerID)
|
|
}
|
|
}
|
|
return true
|
|
})
|
|
}
|
|
return true
|
|
})
|
|
}
|
|
|
|
publisherID, ok := GetPublisherID(ctx)
|
|
if ok && publisherID != "" {
|
|
log.Printf("Broker: Publisher connection closed: %s, address: %s", publisherID, conn.RemoteAddr())
|
|
if con, exists := b.publishers.Get(publisherID); exists {
|
|
con.conn.Close()
|
|
b.publishers.Del(publisherID)
|
|
}
|
|
}
|
|
// Remove from authenticated connections
|
|
connID := conn.RemoteAddr().String()
|
|
b.authenticatedConns.Del(connID)
|
|
|
|
log.Printf("BROKER - Connection closed: address %s", conn.RemoteAddr())
|
|
return nil
|
|
}
|
|
|
|
func (b *Broker) OnError(_ context.Context, conn net.Conn, err error) {
|
|
if conn != nil {
|
|
if b.isConnectionClosed(err) {
|
|
log.Printf("Connection closed for consumer at %s", conn.RemoteAddr())
|
|
// Find and remove the consumer
|
|
b.consumers.ForEach(func(id string, con *consumer) bool {
|
|
if con.conn == conn {
|
|
b.RemoveConsumer(id)
|
|
b.mu.Lock()
|
|
if tasks, ok := b.pendingTasks[id]; ok {
|
|
for _, task := range tasks {
|
|
// Put back to queue
|
|
if q, ok := b.queues.Get(task.Topic); ok {
|
|
select {
|
|
case q.tasks <- &QueuedTask{Task: task, RetryCount: task.Retries}:
|
|
log.Printf("Requeued task %s for consumer %s", task.ID, id)
|
|
default:
|
|
log.Printf("Failed to requeue task %s, queue full", task.ID)
|
|
}
|
|
}
|
|
}
|
|
delete(b.pendingTasks, id)
|
|
}
|
|
b.mu.Unlock()
|
|
return false
|
|
}
|
|
return true
|
|
})
|
|
} else {
|
|
log.Printf("Error reading from connection: %v", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (b *Broker) isConnectionClosed(err error) bool {
|
|
return err == io.EOF || strings.Contains(err.Error(), "connection closed") || strings.Contains(err.Error(), "connection reset")
|
|
}
|
|
|
|
func (b *Broker) isAuthenticated(connID string) bool {
|
|
if b.securityManager == nil {
|
|
return true // no security, allow all
|
|
}
|
|
_, ok := b.authenticatedConns.Get(connID)
|
|
return ok
|
|
}
|
|
|
|
func (b *Broker) OnMessage(ctx context.Context, msg *codec.Message, conn net.Conn) {
|
|
// Set message headers in context for publisher/consumer ID extraction
|
|
ctx = SetHeaders(ctx, msg.Headers)
|
|
|
|
connID := conn.RemoteAddr().String()
|
|
|
|
// Check authentication for protected commands
|
|
if b.securityManager != nil && (msg.Command == consts.PUBLISH || msg.Command == consts.SUBSCRIBE) {
|
|
if !b.isAuthenticated(connID) {
|
|
b.logger.Warn("Unauthenticated access attempt", logger.Field{Key: "command", Value: msg.Command.String()}, logger.Field{Key: "conn", Value: connID})
|
|
// Send error response
|
|
return
|
|
}
|
|
}
|
|
|
|
switch msg.Command {
|
|
case consts.AUTH:
|
|
b.AuthHandler(ctx, conn, msg)
|
|
case consts.PUBLISH:
|
|
b.PublishHandler(ctx, conn, msg)
|
|
case consts.SUBSCRIBE:
|
|
b.SubscribeHandler(ctx, conn, msg)
|
|
case consts.MESSAGE_RESPONSE:
|
|
b.MessageResponseHandler(ctx, msg)
|
|
case consts.MESSAGE_ACK:
|
|
b.MessageAck(ctx, msg)
|
|
case consts.MESSAGE_DENY:
|
|
b.MessageDeny(ctx, msg)
|
|
case consts.CONSUMER_PAUSED:
|
|
b.OnConsumerPause(ctx, msg)
|
|
case consts.CONSUMER_RESUMED:
|
|
b.OnConsumerResume(ctx, msg)
|
|
case consts.CONSUMER_STOPPED:
|
|
b.OnConsumerStop(ctx, msg)
|
|
case consts.CONSUMER_UPDATED:
|
|
b.OnConsumerUpdated(ctx, msg)
|
|
default:
|
|
log.Printf("BROKER - UNKNOWN_COMMAND ~> %s on %s", msg.Command, msg.Queue)
|
|
}
|
|
}
|
|
|
|
func (b *Broker) AuthHandler(ctx context.Context, conn net.Conn, msg *codec.Message) {
|
|
connID := conn.RemoteAddr().String()
|
|
|
|
// Parse auth credentials from payload
|
|
var authReq map[string]any
|
|
if err := json.Unmarshal(msg.Payload, &authReq); err != nil {
|
|
b.logger.Error("Invalid auth request", logger.Field{Key: "error", Value: err.Error()})
|
|
return
|
|
}
|
|
|
|
username, _ := authReq["username"].(string)
|
|
password, _ := authReq["password"].(string)
|
|
|
|
// Authenticate
|
|
user, err := b.securityManager.Authenticate(ctx, map[string]any{
|
|
"username": username,
|
|
"password": password,
|
|
})
|
|
if err != nil {
|
|
b.logger.Warn("Authentication failed", logger.Field{Key: "username", Value: username}, logger.Field{Key: "conn", Value: connID})
|
|
// Send AUTH_DENY
|
|
denyMsg, err := codec.NewMessage(consts.AUTH_DENY, []byte(fmt.Sprintf(`{"error":"%s"}`, err.Error())), "", msg.Headers)
|
|
if err != nil {
|
|
b.logger.Error("Failed to create AUTH_DENY message", logger.Field{Key: "error", Value: err.Error()})
|
|
return
|
|
}
|
|
if err := b.send(ctx, conn, denyMsg); err != nil {
|
|
b.logger.Error("Failed to send AUTH_DENY", logger.Field{Key: "error", Value: err.Error()})
|
|
}
|
|
return
|
|
}
|
|
|
|
// Mark as authenticated
|
|
b.authenticatedConns.Set(connID, true)
|
|
|
|
// Send AUTH_ACK
|
|
ackMsg, err := codec.NewMessage(consts.AUTH_ACK, []byte(`{"status":"authenticated"}`), "", msg.Headers)
|
|
if err != nil {
|
|
b.logger.Error("Failed to create AUTH_ACK message", logger.Field{Key: "error", Value: err.Error()})
|
|
return
|
|
}
|
|
if err := b.send(ctx, conn, ackMsg); err != nil {
|
|
b.logger.Error("Failed to send AUTH_ACK", logger.Field{Key: "error", Value: err.Error()})
|
|
}
|
|
|
|
b.logger.Info("User authenticated", logger.Field{Key: "username", Value: user.Username}, logger.Field{Key: "conn", Value: connID})
|
|
}
|
|
|
|
func (b *Broker) AdjustConsumerWorkers(noOfWorkers int, consumerID ...string) {
|
|
b.consumers.ForEach(func(_ string, c *consumer) bool {
|
|
return true
|
|
})
|
|
}
|
|
|
|
func (b *Broker) MessageAck(ctx context.Context, msg *codec.Message) {
|
|
consumerID, _ := GetConsumerID(ctx)
|
|
taskID, _ := jsonparser.GetString(msg.Payload, "id")
|
|
log.Printf("BROKER - MESSAGE_ACK ~> %s on %s for Task %s", consumerID, msg.Queue, taskID)
|
|
b.mu.Lock()
|
|
if tasks, ok := b.pendingTasks[consumerID]; ok {
|
|
delete(tasks, taskID)
|
|
}
|
|
b.mu.Unlock()
|
|
}
|
|
|
|
func (b *Broker) MessageDeny(ctx context.Context, msg *codec.Message) {
|
|
consumerID, _ := GetConsumerID(ctx)
|
|
taskID, _ := jsonparser.GetString(msg.Payload, "id")
|
|
taskError, _ := jsonparser.GetString(msg.Payload, "error")
|
|
log.Printf("BROKER - MESSAGE_DENY ~> %s on %s for Task %s, Error: %s", consumerID, msg.Queue, taskID, taskError)
|
|
}
|
|
|
|
func (b *Broker) OnConsumerPause(ctx context.Context, _ *codec.Message) {
|
|
consumerID, _ := GetConsumerID(ctx)
|
|
if consumerID != "" {
|
|
if con, exists := b.consumers.Get(consumerID); exists {
|
|
con.state = consts.ConsumerStatePaused
|
|
log.Printf("BROKER - CONSUMER ~> Paused %s", consumerID)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (b *Broker) OnConsumerStop(ctx context.Context, _ *codec.Message) {
|
|
consumerID, _ := GetConsumerID(ctx)
|
|
if consumerID != "" {
|
|
if con, exists := b.consumers.Get(consumerID); exists {
|
|
con.state = consts.ConsumerStateStopped
|
|
log.Printf("BROKER - CONSUMER ~> Stopped %s", consumerID)
|
|
if b.opts.notifyResponse != nil {
|
|
result := Result{
|
|
Status: "STOPPED",
|
|
Topic: "", // adjust if queue name is available
|
|
TaskID: consumerID,
|
|
Ctx: ctx,
|
|
}
|
|
_ = b.opts.notifyResponse(ctx, result)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (b *Broker) OnConsumerUpdated(ctx context.Context, msg *codec.Message) {
|
|
consumerID, _ := GetConsumerID(ctx)
|
|
if consumerID != "" {
|
|
log.Printf("BROKER - CONSUMER ~> Updated %s", consumerID)
|
|
if b.opts.notifyResponse != nil {
|
|
result := Result{
|
|
Status: "CONSUMER UPDATED",
|
|
TaskID: consumerID,
|
|
Ctx: ctx,
|
|
Payload: msg.Payload,
|
|
}
|
|
_ = b.opts.notifyResponse(ctx, result)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (b *Broker) OnConsumerResume(ctx context.Context, _ *codec.Message) {
|
|
consumerID, _ := GetConsumerID(ctx)
|
|
if consumerID != "" {
|
|
if con, exists := b.consumers.Get(consumerID); exists {
|
|
con.state = consts.ConsumerStateActive
|
|
log.Printf("BROKER - CONSUMER ~> Resumed %s", consumerID)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (b *Broker) MessageResponseHandler(ctx context.Context, msg *codec.Message) {
|
|
// Extract task ID from response
|
|
var result Result
|
|
if err := json.Unmarshal(msg.Payload, &result); err != nil {
|
|
log.Printf("Error unmarshaling response: %v", err)
|
|
return
|
|
}
|
|
taskID := result.TaskID
|
|
|
|
// Retrieve stored headers for this task
|
|
if headers, ok := b.taskHeaders.Get(taskID); ok {
|
|
ctx = SetHeaders(ctx, headers)
|
|
// Clean up stored headers
|
|
b.taskHeaders.Del(taskID)
|
|
}
|
|
|
|
msg.Command = consts.RESPONSE
|
|
b.HandleCallback(ctx, msg)
|
|
awaitResponse, ok := GetAwaitResponse(ctx)
|
|
if !(ok && awaitResponse == "true") {
|
|
return
|
|
}
|
|
publisherID, exists := GetPublisherID(ctx)
|
|
if !exists {
|
|
return
|
|
}
|
|
con, ok := b.publishers.Get(publisherID)
|
|
if !ok {
|
|
return
|
|
}
|
|
err := b.send(ctx, con.conn, msg)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
}
|
|
|
|
func (b *Broker) Publish(ctx context.Context, task *Task, queue string) error {
|
|
headers, _ := GetHeaders(ctx)
|
|
payload, err := json.Marshal(task)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
msg, err := codec.NewMessage(consts.PUBLISH, payload, queue, headers.AsMap())
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create PUBLISH message: %w", err)
|
|
}
|
|
b.broadcastToConsumers(msg)
|
|
return nil
|
|
}
|
|
|
|
func (b *Broker) PublishHandler(ctx context.Context, conn net.Conn, msg *codec.Message) {
|
|
pub := b.addPublisher(ctx, msg.Queue, conn)
|
|
taskID, _ := jsonparser.GetString(msg.Payload, "id")
|
|
log.Printf("BROKER - PUBLISH ~> received from %s on %s for Task %s", pub.id, msg.Queue, taskID)
|
|
|
|
// Store headers for response routing
|
|
b.taskHeaders.Set(taskID, msg.Headers)
|
|
|
|
ack, err := codec.NewMessage(consts.PUBLISH_ACK, utils.ToByte(fmt.Sprintf(`{"id":"%s"}`, taskID)), msg.Queue, msg.Headers)
|
|
if err != nil {
|
|
log.Printf("Error creating PUBLISH_ACK message: %v\n", err)
|
|
return
|
|
}
|
|
if err := b.send(ctx, conn, ack); err != nil {
|
|
log.Printf("Error sending PUBLISH_ACK: %v\n", err)
|
|
}
|
|
b.broadcastToConsumers(msg)
|
|
go func() {
|
|
select {
|
|
case <-ctx.Done():
|
|
b.publishers.Del(pub.id)
|
|
}
|
|
}()
|
|
}
|
|
|
|
func (b *Broker) SubscribeHandler(ctx context.Context, conn net.Conn, msg *codec.Message) {
|
|
consumerID := b.AddConsumer(ctx, msg.Queue, conn)
|
|
ack, err := codec.NewMessage(consts.SUBSCRIBE_ACK, nil, msg.Queue, msg.Headers)
|
|
if err != nil {
|
|
log.Printf("Error creating SUBSCRIBE_ACK message: %v\n", err)
|
|
return
|
|
}
|
|
if err := b.send(ctx, conn, ack); err != nil {
|
|
log.Printf("Error sending SUBSCRIBE_ACK: %v\n", err)
|
|
}
|
|
if b.opts.consumerOnSubscribe != nil {
|
|
b.opts.consumerOnSubscribe(ctx, msg.Queue, consumerID)
|
|
}
|
|
go func() {
|
|
select {
|
|
case <-ctx.Done():
|
|
b.RemoveConsumer(consumerID, msg.Queue)
|
|
}
|
|
}()
|
|
}
|
|
|
|
func (b *Broker) Start(ctx context.Context) error {
|
|
// Start health checker
|
|
b.healthChecker.Start()
|
|
|
|
// Start connection cleanup routine
|
|
b.wg.Add(1)
|
|
go b.connectionCleanupRoutine()
|
|
|
|
// Start metrics collection routine
|
|
b.wg.Add(1)
|
|
go b.metricsCollectionRoutine()
|
|
|
|
// Start message store cleanup routine
|
|
b.wg.Add(1)
|
|
go b.messageStoreCleanupRoutine()
|
|
|
|
// Start admin server if enabled
|
|
if b.adminServer != nil {
|
|
b.wg.Add(1)
|
|
go func() {
|
|
defer b.wg.Done()
|
|
if err := b.adminServer.Start(); err != nil {
|
|
b.logger.Error("Failed to start admin server", logger.Field{Key: "error", Value: err.Error()})
|
|
}
|
|
}()
|
|
}
|
|
|
|
// Start metrics server if enabled
|
|
if b.metricsServer != nil {
|
|
b.wg.Add(1)
|
|
go func() {
|
|
defer b.wg.Done()
|
|
if err := b.metricsServer.Start(ctx); err != nil {
|
|
b.logger.Error("Failed to start metrics server", logger.Field{Key: "error", Value: err.Error()})
|
|
}
|
|
}()
|
|
}
|
|
|
|
b.logger.Info("Broker starting with production features enabled")
|
|
|
|
// Start the broker with enhanced features
|
|
if err := b.startBroker(ctx); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Wait for shutdown signal
|
|
<-b.shutdown
|
|
b.logger.Info("Broker shutting down")
|
|
|
|
// Wait for all goroutines to finish
|
|
b.wg.Wait()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (b *Broker) send(ctx context.Context, conn net.Conn, msg *codec.Message) error {
|
|
return codec.SendMessage(ctx, conn, msg)
|
|
}
|
|
|
|
func (b *Broker) receive(ctx context.Context, c net.Conn) (*codec.Message, error) {
|
|
return codec.ReadMessage(ctx, c)
|
|
}
|
|
|
|
func (b *Broker) broadcastToConsumers(msg *codec.Message) {
|
|
if queue, ok := b.queues.Get(msg.Queue); ok {
|
|
var t Task
|
|
json.Unmarshal(msg.Payload, &t)
|
|
task := &QueuedTask{Message: msg, Task: &t, RetryCount: 0}
|
|
queue.tasks <- task
|
|
}
|
|
}
|
|
|
|
func (b *Broker) waitForConsumerAck(ctx context.Context, conn net.Conn) error {
|
|
msg, err := b.receive(ctx, conn)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if msg.Command == consts.MESSAGE_ACK {
|
|
log.Println("Received CONSUMER_ACK: Subscribed successfully")
|
|
return nil
|
|
}
|
|
return fmt.Errorf("expected CONSUMER_ACK, got: %v", msg.Command)
|
|
}
|
|
|
|
func (b *Broker) addPublisher(ctx context.Context, queueName string, conn net.Conn) *publisher {
|
|
publisherID, ok := GetPublisherID(ctx)
|
|
_, ok = b.queues.Get(queueName)
|
|
if !ok {
|
|
b.NewQueue(queueName)
|
|
}
|
|
con := &publisher{id: publisherID, conn: conn}
|
|
b.publishers.Set(publisherID, con)
|
|
return con
|
|
}
|
|
|
|
func (b *Broker) AddConsumer(ctx context.Context, queueName string, conn net.Conn) string {
|
|
consumerID, ok := GetConsumerID(ctx)
|
|
q, ok := b.queues.Get(queueName)
|
|
if !ok {
|
|
q = b.NewQueue(queueName)
|
|
}
|
|
|
|
// Create consumer with proper initialization
|
|
con := &consumer{
|
|
id: consumerID,
|
|
conn: conn,
|
|
state: consts.ConsumerStateActive,
|
|
queue: queueName,
|
|
pool: nil, // Pool will be set when consumer connects
|
|
metrics: &ConsumerMetrics{
|
|
ProcessedTasks: 0,
|
|
ErrorCount: 0,
|
|
LastActivity: time.Now(),
|
|
},
|
|
}
|
|
|
|
b.consumers.Set(consumerID, con)
|
|
q.consumers.Set(consumerID, con)
|
|
log.Printf("BROKER - SUBSCRIBE ~> %s on %s", consumerID, queueName)
|
|
return consumerID
|
|
}
|
|
|
|
func (b *Broker) UpdateConsumerPool(consumerID string, pool *Pool) {
|
|
if con, exists := b.consumers.Get(consumerID); exists {
|
|
con.pool = pool
|
|
}
|
|
}
|
|
|
|
func (b *Broker) UpdateConsumerMetrics(consumerID string, processedTasks, errorCount int64) {
|
|
if con, exists := b.consumers.Get(consumerID); exists && con.metrics != nil {
|
|
con.metrics.ProcessedTasks = processedTasks
|
|
con.metrics.ErrorCount = errorCount
|
|
con.metrics.LastActivity = time.Now()
|
|
}
|
|
}
|
|
|
|
func (b *Broker) RemoveConsumer(consumerID string, queues ...string) {
|
|
if len(queues) > 0 {
|
|
for _, queueName := range queues {
|
|
if queue, ok := b.queues.Get(queueName); ok {
|
|
con, ok := queue.consumers.Get(consumerID)
|
|
if ok {
|
|
con.conn.Close()
|
|
queue.consumers.Del(consumerID)
|
|
}
|
|
b.queues.Del(queueName)
|
|
}
|
|
}
|
|
return
|
|
}
|
|
b.queues.ForEach(func(queueName string, queue *Queue) bool {
|
|
con, ok := queue.consumers.Get(consumerID)
|
|
if ok {
|
|
con.conn.Close()
|
|
queue.consumers.Del(consumerID)
|
|
}
|
|
b.queues.Del(queueName)
|
|
return true
|
|
})
|
|
}
|
|
|
|
func (b *Broker) handleConsumer(
|
|
ctx context.Context, cmd consts.CMD, state consts.ConsumerState,
|
|
consumerID string, payload []byte, queues ...string,
|
|
) {
|
|
fn := func(queue *Queue) {
|
|
con, ok := queue.consumers.Get(consumerID)
|
|
if ok {
|
|
ack, err := codec.NewMessage(cmd, payload, queue.name, map[string]string{consts.ConsumerKey: consumerID})
|
|
if err != nil {
|
|
log.Printf("Error creating message for consumer %s: %v", consumerID, err)
|
|
return
|
|
}
|
|
err = b.send(ctx, con.conn, ack)
|
|
if err == nil {
|
|
con.state = state
|
|
}
|
|
}
|
|
}
|
|
if len(queues) > 0 {
|
|
for _, queueName := range queues {
|
|
if queue, ok := b.queues.Get(queueName); ok {
|
|
fn(queue)
|
|
}
|
|
}
|
|
return
|
|
}
|
|
b.queues.ForEach(func(queueName string, queue *Queue) bool {
|
|
fn(queue)
|
|
return true
|
|
})
|
|
}
|
|
|
|
func (b *Broker) UpdateConsumer(ctx context.Context, consumerID string, config DynamicConfig, queues ...string) error {
|
|
var err error
|
|
payload, _ := json.Marshal(config)
|
|
fn := func(queue *Queue) error {
|
|
con, ok := queue.consumers.Get(consumerID)
|
|
if ok {
|
|
ack, err := codec.NewMessage(consts.CONSUMER_UPDATE, payload, queue.name, map[string]string{consts.ConsumerKey: consumerID})
|
|
if err != nil {
|
|
log.Printf("Error creating message for consumer %s: %v", consumerID, err)
|
|
return err
|
|
}
|
|
return b.send(ctx, con.conn, ack)
|
|
}
|
|
return nil
|
|
}
|
|
if len(queues) > 0 {
|
|
for _, queueName := range queues {
|
|
if queue, ok := b.queues.Get(queueName); ok {
|
|
err = fn(queue)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
b.queues.ForEach(func(queueName string, queue *Queue) bool {
|
|
err = fn(queue)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
return true
|
|
})
|
|
return nil
|
|
}
|
|
|
|
func (b *Broker) PauseConsumer(ctx context.Context, consumerID string, queues ...string) {
|
|
b.handleConsumer(ctx, consts.CONSUMER_PAUSE, consts.ConsumerStatePaused, consumerID, utils.ToByte("{}"), queues...)
|
|
}
|
|
|
|
func (b *Broker) ResumeConsumer(ctx context.Context, consumerID string, queues ...string) {
|
|
b.handleConsumer(ctx, consts.CONSUMER_RESUME, consts.ConsumerStateActive, consumerID, utils.ToByte("{}"), queues...)
|
|
}
|
|
|
|
func (b *Broker) StopConsumer(ctx context.Context, consumerID string, queues ...string) {
|
|
b.handleConsumer(ctx, consts.CONSUMER_STOP, consts.ConsumerStateStopped, consumerID, utils.ToByte("{}"), queues...)
|
|
}
|
|
|
|
func (b *Broker) readMessage(ctx context.Context, c net.Conn) error {
|
|
msg, err := b.receive(ctx, c)
|
|
if err == nil {
|
|
ctx = SetHeaders(ctx, msg.Headers)
|
|
b.OnMessage(ctx, msg, c)
|
|
return nil
|
|
}
|
|
if err.Error() == "EOF" || strings.Contains(err.Error(), "closed network connection") {
|
|
b.OnClose(ctx, c)
|
|
return err
|
|
}
|
|
b.OnError(ctx, c, err)
|
|
return err
|
|
}
|
|
|
|
func (b *Broker) dispatchWorker(ctx context.Context, queue *Queue) {
|
|
delay := b.opts.initialDelay
|
|
for task := range queue.tasks {
|
|
// Handle each task in a separate goroutine to avoid blocking the dispatch loop
|
|
go func(t *QueuedTask) {
|
|
if b.opts.BrokerRateLimiter != nil {
|
|
b.opts.BrokerRateLimiter.Wait()
|
|
}
|
|
|
|
success := false
|
|
currentDelay := delay
|
|
|
|
for !success && t.RetryCount <= b.opts.maxRetries {
|
|
if b.dispatchTaskToConsumer(ctx, queue, t) {
|
|
success = true
|
|
b.acknowledgeTask(ctx, t.Message.Queue, queue.name)
|
|
} else {
|
|
t.RetryCount++
|
|
currentDelay = b.backoffRetry(queue, t, currentDelay)
|
|
}
|
|
}
|
|
|
|
if t.RetryCount > b.opts.maxRetries {
|
|
b.sendToDLQ(queue, t)
|
|
}
|
|
}(task)
|
|
}
|
|
}
|
|
|
|
func (b *Broker) sendToDLQ(queue *Queue, task *QueuedTask) {
|
|
id := task.Task.ID
|
|
if dlq, ok := b.deadLetter.Get(queue.name); ok {
|
|
log.Printf("Sending task %s to dead-letter queue for %s", id, queue.name)
|
|
dlq.tasks <- task
|
|
} else {
|
|
log.Printf("No dead-letter queue for %s, discarding task %s", queue.name, id)
|
|
}
|
|
}
|
|
|
|
func (b *Broker) dispatchTaskToConsumer(ctx context.Context, queue *Queue, task *QueuedTask) bool {
|
|
var consumerFound bool
|
|
var err error
|
|
|
|
// Deduplication: Check if the task has already been processed
|
|
taskID := task.Task.ID
|
|
if _, exists := b.pIDs.Get(taskID); exists {
|
|
log.Printf("Task %s already processed, skipping...", taskID)
|
|
return true
|
|
}
|
|
|
|
queue.consumers.ForEach(func(_ string, con *consumer) bool {
|
|
if con.state != consts.ConsumerStateActive {
|
|
err = fmt.Errorf("consumer %s is not active", con.id)
|
|
return true
|
|
}
|
|
|
|
// Send message asynchronously to avoid blocking
|
|
b.mu.Lock()
|
|
if b.pendingTasks[con.id] == nil {
|
|
b.pendingTasks[con.id] = make(map[string]*Task)
|
|
}
|
|
b.pendingTasks[con.id][taskID] = task.Task
|
|
b.mu.Unlock()
|
|
go func(consumer *consumer, message *codec.Message) {
|
|
sendCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
if sendErr := b.send(sendCtx, consumer.conn, message); sendErr != nil {
|
|
log.Printf("Failed to send task %s to consumer %s: %v", taskID, consumer.id, sendErr)
|
|
} else {
|
|
log.Printf("Successfully sent task %s to consumer %s", taskID, consumer.id)
|
|
}
|
|
}(con, task.Message)
|
|
|
|
consumerFound = true
|
|
// Mark the task as processed
|
|
b.pIDs.Set(taskID, true)
|
|
return false // Break the loop since we found a consumer
|
|
})
|
|
|
|
if err != nil {
|
|
log.Println(err.Error())
|
|
return false
|
|
}
|
|
if !consumerFound {
|
|
log.Printf("No available consumers for queue %s, retrying...", queue.name)
|
|
if b.opts.notifyResponse != nil {
|
|
result := Result{
|
|
Status: "NO_CONSUMER",
|
|
Topic: queue.name,
|
|
TaskID: taskID,
|
|
Ctx: ctx,
|
|
}
|
|
_ = b.opts.notifyResponse(ctx, result)
|
|
}
|
|
}
|
|
return consumerFound
|
|
}
|
|
|
|
// Modified backoffRetry: Re-insert the task into queue.tasks after backoff.
|
|
func (b *Broker) backoffRetry(queue *Queue, task *QueuedTask, delay time.Duration) time.Duration {
|
|
backoffDuration := utils.CalculateJitter(delay, b.opts.jitterPercent)
|
|
log.Printf("Backing off for %v before retrying task for queue %s", backoffDuration, task.Task.Topic)
|
|
|
|
// Perform backoff sleep in a goroutine to avoid blocking
|
|
go func() {
|
|
time.Sleep(backoffDuration)
|
|
}()
|
|
|
|
delay *= 2
|
|
if delay > b.opts.maxBackoff {
|
|
delay = b.opts.maxBackoff
|
|
}
|
|
return delay
|
|
}
|
|
|
|
func (b *Broker) URL() string {
|
|
return b.opts.brokerAddr
|
|
}
|
|
|
|
func (b *Broker) Close() error {
|
|
if b != nil && b.listener != nil {
|
|
log.Printf("Broker is closing...")
|
|
return b.listener.Close()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (b *Broker) SetURL(url string) {
|
|
b.opts.brokerAddr = url
|
|
}
|
|
|
|
func (b *Broker) NewQueue(name string) *Queue {
|
|
q := &Queue{
|
|
name: name,
|
|
tasks: make(chan *QueuedTask, b.opts.queueSize),
|
|
consumers: memory.New[string, *consumer](),
|
|
}
|
|
b.queues.Set(name, q)
|
|
|
|
// Create DLQ for the queue
|
|
dlq := &Queue{
|
|
name: name + "_dlq",
|
|
tasks: make(chan *QueuedTask, b.opts.queueSize),
|
|
consumers: memory.New[string, *consumer](),
|
|
}
|
|
b.deadLetter.Set(name, dlq)
|
|
ctx := context.Background()
|
|
go b.dispatchWorker(ctx, q)
|
|
go b.dispatchWorker(ctx, dlq)
|
|
return q
|
|
}
|
|
|
|
// NewQueueWithConfig creates a queue with specific configuration
|
|
func (b *Broker) NewQueueWithConfig(name string, opts ...QueueOption) *Queue {
|
|
config := QueueConfig{
|
|
MaxDepth: b.opts.queueSize,
|
|
MaxRetries: 3,
|
|
MessageTTL: 1 * time.Hour,
|
|
BatchSize: 1,
|
|
}
|
|
|
|
// Apply options
|
|
for _, opt := range opts {
|
|
opt(&config)
|
|
}
|
|
|
|
q := newQueueWithConfig(name, config)
|
|
b.queues.Set(name, q)
|
|
|
|
// Create DLQ for the queue if enabled
|
|
if config.DeadLetter {
|
|
dlqConfig := config
|
|
dlqConfig.MaxDepth = config.MaxDepth / 10 // 10% of main queue
|
|
dlq := newQueueWithConfig(name+"_dlq", dlqConfig)
|
|
b.deadLetter.Set(name, dlq)
|
|
}
|
|
|
|
ctx := context.Background()
|
|
go b.dispatchWorker(ctx, q)
|
|
if config.DeadLetter {
|
|
if dlq, ok := b.deadLetter.Get(name); ok {
|
|
go b.dispatchWorker(ctx, dlq)
|
|
}
|
|
}
|
|
return q
|
|
}
|
|
|
|
// Ensure message ordering in task queues
|
|
func (b *Broker) NewQueueWithOrdering(name string) *Queue {
|
|
q := &Queue{
|
|
name: name,
|
|
tasks: make(chan *QueuedTask, b.opts.queueSize),
|
|
consumers: memory.New[string, *consumer](),
|
|
}
|
|
b.queues.Set(name, q)
|
|
|
|
// Create DLQ for the queue
|
|
dlq := &Queue{
|
|
name: name + "_dlq",
|
|
tasks: make(chan *QueuedTask, b.opts.queueSize),
|
|
consumers: memory.New[string, *consumer](),
|
|
}
|
|
b.deadLetter.Set(name, dlq)
|
|
ctx := context.Background()
|
|
go b.dispatchWorker(ctx, q)
|
|
go b.dispatchWorker(ctx, dlq)
|
|
return q
|
|
}
|
|
|
|
func (b *Broker) TLSConfig() TLSConfig {
|
|
return b.opts.tlsConfig
|
|
}
|
|
|
|
func (b *Broker) SyncMode() bool {
|
|
return b.opts.syncMode
|
|
}
|
|
|
|
func (b *Broker) NotifyHandler() func(context.Context, Result) error {
|
|
return b.opts.notifyResponse
|
|
}
|
|
|
|
func (b *Broker) SetNotifyHandler(callback Callback) {
|
|
b.opts.notifyResponse = callback
|
|
}
|
|
|
|
func (b *Broker) HandleCallback(ctx context.Context, msg *codec.Message) {
|
|
if b.opts.callback != nil {
|
|
var result Result
|
|
err := json.Unmarshal(msg.Payload, &result)
|
|
if err == nil {
|
|
for _, callback := range b.opts.callback {
|
|
callback(ctx, result)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Add explicit acknowledgment for successful task processing
|
|
func (b *Broker) acknowledgeTask(ctx context.Context, taskID string, queueName string) {
|
|
log.Printf("Acknowledging task %s on queue %s", taskID, queueName)
|
|
if b.opts.notifyResponse != nil {
|
|
result := Result{
|
|
Status: "ACKNOWLEDGED",
|
|
Topic: queueName,
|
|
TaskID: taskID,
|
|
Ctx: ctx,
|
|
}
|
|
_ = b.opts.notifyResponse(ctx, result)
|
|
}
|
|
}
|
|
|
|
// Add authentication and authorization for publishers and consumers
|
|
func (b *Broker) Authenticate(ctx context.Context, credentials map[string]string) error {
|
|
username, userExists := credentials["username"]
|
|
password, passExists := credentials["password"]
|
|
if !userExists || !passExists {
|
|
return fmt.Errorf("missing credentials")
|
|
}
|
|
// Example: Hardcoded credentials for simplicity
|
|
if username != "admin" || password != "password" {
|
|
return fmt.Errorf("invalid credentials")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (b *Broker) Authorize(ctx context.Context, role string, action string) error {
|
|
// Example: Simple role-based authorization
|
|
if role == "publisher" && action == "publish" {
|
|
return nil
|
|
}
|
|
if role == "consumer" && action == "consume" {
|
|
return nil
|
|
}
|
|
return fmt.Errorf("unauthorized action")
|
|
}
|
|
|
|
// Enhanced Broker Methods (Production Features)
|
|
|
|
// NewConnectionPool creates a new connection pool
|
|
func NewConnectionPool(maxConns int) *ConnectionPool {
|
|
return &ConnectionPool{
|
|
connections: make(map[string]*BrokerConnection),
|
|
maxConns: maxConns,
|
|
}
|
|
}
|
|
|
|
// AddConnection adds a connection to the pool
|
|
func (cp *ConnectionPool) AddConnection(id string, conn net.Conn, connType string) error {
|
|
cp.mu.Lock()
|
|
defer cp.mu.Unlock()
|
|
|
|
if len(cp.connections) >= cp.maxConns {
|
|
return fmt.Errorf("connection pool is full")
|
|
}
|
|
|
|
brokerConn := &BrokerConnection{
|
|
conn: conn,
|
|
id: id,
|
|
connType: connType,
|
|
lastActivity: time.Now(),
|
|
isActive: true,
|
|
}
|
|
|
|
cp.connections[id] = brokerConn
|
|
atomic.AddInt64(&cp.connCount, 1)
|
|
return nil
|
|
}
|
|
|
|
// RemoveConnection removes a connection from the pool
|
|
func (cp *ConnectionPool) RemoveConnection(id string) {
|
|
cp.mu.Lock()
|
|
defer cp.mu.Unlock()
|
|
|
|
if conn, exists := cp.connections[id]; exists {
|
|
conn.conn.Close()
|
|
delete(cp.connections, id)
|
|
atomic.AddInt64(&cp.connCount, -1)
|
|
}
|
|
}
|
|
|
|
// GetActiveConnections returns the number of active connections
|
|
func (cp *ConnectionPool) GetActiveConnections() int64 {
|
|
return atomic.LoadInt64(&cp.connCount)
|
|
}
|
|
|
|
// NewHealthChecker creates a new health checker
|
|
func NewHealthChecker() *HealthChecker {
|
|
return &HealthChecker{
|
|
interval: 30 * time.Second,
|
|
shutdown: make(chan struct{}),
|
|
thresholds: HealthThresholds{
|
|
MaxMemoryUsage: 1024 * 1024 * 1024, // 1GB
|
|
MaxCPUUsage: 80.0, // 80%
|
|
MaxConnections: 900, // 90% of max
|
|
MaxQueueDepth: 10000,
|
|
MaxResponseTime: 5 * time.Second,
|
|
MinFreeMemory: 100 * 1024 * 1024, // 100MB
|
|
},
|
|
}
|
|
}
|
|
|
|
// NewEnhancedCircuitBreaker creates a new circuit breaker
|
|
func NewEnhancedCircuitBreaker(threshold int64, timeout time.Duration) *EnhancedCircuitBreaker {
|
|
return &EnhancedCircuitBreaker{
|
|
threshold: threshold,
|
|
timeout: timeout,
|
|
state: CircuitClosed,
|
|
}
|
|
}
|
|
|
|
// NewMetricsCollector creates a new metrics collector
|
|
func NewMetricsCollector() *MetricsCollector {
|
|
return &MetricsCollector{
|
|
metrics: make(map[string]*Metric),
|
|
}
|
|
}
|
|
|
|
// NewInMemoryMessageStore creates a new in-memory message store
|
|
func NewInMemoryMessageStore() *InMemoryMessageStore {
|
|
return &InMemoryMessageStore{
|
|
messages: memory.New[string, *StoredMessage](),
|
|
}
|
|
}
|
|
|
|
// Store stores a message
|
|
func (ims *InMemoryMessageStore) Store(msg *StoredMessage) error {
|
|
ims.messages.Set(msg.ID, msg)
|
|
return nil
|
|
}
|
|
|
|
// Retrieve retrieves a message by ID
|
|
func (ims *InMemoryMessageStore) Retrieve(id string) (*StoredMessage, error) {
|
|
msg, exists := ims.messages.Get(id)
|
|
if !exists {
|
|
return nil, fmt.Errorf("message not found: %s", id)
|
|
}
|
|
return msg, nil
|
|
}
|
|
|
|
// Delete deletes a message
|
|
func (ims *InMemoryMessageStore) Delete(id string) error {
|
|
ims.messages.Del(id)
|
|
return nil
|
|
}
|
|
|
|
// List lists messages for a queue
|
|
func (ims *InMemoryMessageStore) List(queue string, limit int, offset int) ([]*StoredMessage, error) {
|
|
var result []*StoredMessage
|
|
count := 0
|
|
skipped := 0
|
|
|
|
ims.messages.ForEach(func(id string, msg *StoredMessage) bool {
|
|
if msg.Queue == queue {
|
|
if skipped < offset {
|
|
skipped++
|
|
return true
|
|
}
|
|
|
|
result = append(result, msg)
|
|
count++
|
|
|
|
return count < limit
|
|
}
|
|
return true
|
|
})
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// Count counts messages in a queue
|
|
func (ims *InMemoryMessageStore) Count(queue string) (int64, error) {
|
|
count := int64(0)
|
|
ims.messages.ForEach(func(id string, msg *StoredMessage) bool {
|
|
if msg.Queue == queue {
|
|
count++
|
|
}
|
|
return true
|
|
})
|
|
return count, nil
|
|
}
|
|
|
|
// Cleanup removes old messages
|
|
func (ims *InMemoryMessageStore) Cleanup(olderThan time.Time) error {
|
|
var toDelete []string
|
|
|
|
ims.messages.ForEach(func(id string, msg *StoredMessage) bool {
|
|
if msg.CreatedAt.Before(olderThan) ||
|
|
(msg.ExpiresAt != nil && msg.ExpiresAt.Before(time.Now())) {
|
|
toDelete = append(toDelete, id)
|
|
}
|
|
return true
|
|
})
|
|
|
|
for _, id := range toDelete {
|
|
ims.messages.Del(id)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Enhanced Start method with production features
|
|
|
|
// startBroker starts the core broker functionality
|
|
func (b *Broker) startBroker(ctx context.Context) error {
|
|
addr := b.opts.BrokerAddr()
|
|
listener, err := net.Listen("tcp", addr)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to listen on %s: %w", addr, err)
|
|
}
|
|
b.listener = listener
|
|
b.logger.Info("Enhanced broker listening", logger.Field{Key: "address", Value: addr})
|
|
|
|
b.wg.Add(1)
|
|
go func() {
|
|
defer b.wg.Done()
|
|
for {
|
|
select {
|
|
case <-b.shutdown:
|
|
return
|
|
default:
|
|
conn, err := listener.Accept()
|
|
if err != nil {
|
|
select {
|
|
case <-b.shutdown:
|
|
return
|
|
default:
|
|
b.logger.Error("Accept error", logger.Field{Key: "error", Value: err.Error()})
|
|
continue
|
|
}
|
|
}
|
|
|
|
// Add connection to pool
|
|
connID := fmt.Sprintf("conn_%d", time.Now().UnixNano())
|
|
b.connectionPool.AddConnection(connID, conn, "unknown")
|
|
|
|
b.wg.Add(1)
|
|
go func(c net.Conn) {
|
|
defer b.wg.Done()
|
|
b.handleEnhancedConnection(ctx, c)
|
|
}(conn)
|
|
}
|
|
}
|
|
}()
|
|
|
|
return nil
|
|
}
|
|
|
|
// handleEnhancedConnection handles incoming connections with enhanced features
|
|
func (b *Broker) handleEnhancedConnection(ctx context.Context, conn net.Conn) {
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
b.logger.Error("Connection handler panic",
|
|
logger.Field{Key: "panic", Value: fmt.Sprintf("%v", r)},
|
|
logger.Field{Key: "remote_addr", Value: conn.RemoteAddr().String()})
|
|
}
|
|
conn.Close()
|
|
}()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-b.shutdown:
|
|
return
|
|
default:
|
|
msg, err := b.receive(ctx, conn)
|
|
if err != nil {
|
|
b.OnError(ctx, conn, err)
|
|
return
|
|
}
|
|
b.OnMessage(ctx, msg, conn)
|
|
}
|
|
}
|
|
}
|
|
|
|
// connectionCleanupRoutine periodically cleans up idle connections
|
|
func (b *Broker) connectionCleanupRoutine() {
|
|
defer b.wg.Done()
|
|
|
|
ticker := time.NewTicker(5 * time.Minute)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
b.connectionPool.CleanupIdleConnections(10 * time.Minute)
|
|
case <-b.shutdown:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// CleanupIdleConnections removes idle connections
|
|
func (cp *ConnectionPool) CleanupIdleConnections(idleTimeout time.Duration) {
|
|
cp.mu.Lock()
|
|
defer cp.mu.Unlock()
|
|
|
|
now := time.Now()
|
|
for id, conn := range cp.connections {
|
|
conn.mu.RLock()
|
|
lastActivity := conn.lastActivity
|
|
conn.mu.RUnlock()
|
|
|
|
if now.Sub(lastActivity) > idleTimeout {
|
|
conn.conn.Close()
|
|
delete(cp.connections, id)
|
|
atomic.AddInt64(&cp.connCount, -1)
|
|
}
|
|
}
|
|
}
|
|
|
|
// metricsCollectionRoutine periodically collects and reports metrics
|
|
func (b *Broker) metricsCollectionRoutine() {
|
|
defer b.wg.Done()
|
|
|
|
ticker := time.NewTicker(1 * time.Minute)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
b.collectMetrics()
|
|
case <-b.shutdown:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// collectMetrics collects current system metrics
|
|
func (b *Broker) collectMetrics() {
|
|
// Collect connection metrics
|
|
activeConns := b.connectionPool.GetActiveConnections()
|
|
b.metricsCollector.RecordMetric("broker.connections.active", float64(activeConns), nil)
|
|
|
|
// Collect queue metrics
|
|
b.queues.ForEach(func(name string, queue *Queue) bool {
|
|
queueDepth := len(queue.tasks)
|
|
consumerCount := queue.consumers.Size()
|
|
|
|
b.metricsCollector.RecordMetric("broker.queue.depth", float64(queueDepth),
|
|
map[string]string{"queue": name})
|
|
b.metricsCollector.RecordMetric("broker.queue.consumers", float64(consumerCount),
|
|
map[string]string{"queue": name})
|
|
|
|
return true
|
|
})
|
|
}
|
|
|
|
// RecordMetric records a metric
|
|
func (mc *MetricsCollector) RecordMetric(name string, value float64, tags map[string]string) {
|
|
mc.mu.Lock()
|
|
defer mc.mu.Unlock()
|
|
|
|
mc.metrics[name] = &Metric{
|
|
Name: name,
|
|
Value: value,
|
|
Timestamp: time.Now(),
|
|
Tags: tags,
|
|
}
|
|
}
|
|
|
|
// messageStoreCleanupRoutine periodically cleans up old messages
|
|
func (b *Broker) messageStoreCleanupRoutine() {
|
|
defer b.wg.Done()
|
|
|
|
ticker := time.NewTicker(1 * time.Hour)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
// Clean up messages older than 24 hours
|
|
cutoff := time.Now().Add(-24 * time.Hour)
|
|
if err := b.messageStore.Cleanup(cutoff); err != nil {
|
|
b.logger.Error("Failed to cleanup old messages",
|
|
logger.Field{Key: "error", Value: err.Error()})
|
|
}
|
|
case <-b.shutdown:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// Enhanced Stop method with graceful shutdown
|
|
func (b *Broker) StopEnhanced() error {
|
|
if !atomic.CompareAndSwapInt32(&b.isShutdown, 0, 1) {
|
|
return nil // Already shutdown
|
|
}
|
|
|
|
b.logger.Info("Enhanced broker shutting down gracefully")
|
|
|
|
// Signal shutdown
|
|
close(b.shutdown)
|
|
|
|
// Stop health checker
|
|
b.healthChecker.Stop()
|
|
|
|
// Wait for all goroutines to finish
|
|
b.wg.Wait()
|
|
|
|
// Close all connections
|
|
b.connectionPool.mu.Lock()
|
|
for id, conn := range b.connectionPool.connections {
|
|
conn.conn.Close()
|
|
delete(b.connectionPool.connections, id)
|
|
}
|
|
b.connectionPool.mu.Unlock()
|
|
|
|
// Close listener
|
|
if b.listener != nil {
|
|
b.listener.Close()
|
|
}
|
|
|
|
b.logger.Info("Enhanced broker shutdown completed")
|
|
return nil
|
|
}
|
|
|
|
// Start starts the health checker
|
|
func (hc *HealthChecker) Start() {
|
|
hc.ticker = time.NewTicker(hc.interval)
|
|
go func() {
|
|
defer hc.ticker.Stop()
|
|
for {
|
|
select {
|
|
case <-hc.ticker.C:
|
|
hc.performHealthCheck()
|
|
case <-hc.shutdown:
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// Stop stops the health checker
|
|
func (hc *HealthChecker) Stop() {
|
|
close(hc.shutdown)
|
|
}
|
|
|
|
// performHealthCheck performs a comprehensive health check
|
|
func (hc *HealthChecker) performHealthCheck() {
|
|
// Check connection count
|
|
activeConns := hc.broker.connectionPool.GetActiveConnections()
|
|
if activeConns > int64(hc.thresholds.MaxConnections) {
|
|
hc.broker.logger.Warn("High connection count detected",
|
|
logger.Field{Key: "active_connections", Value: activeConns},
|
|
logger.Field{Key: "threshold", Value: hc.thresholds.MaxConnections})
|
|
}
|
|
|
|
// Check queue depths
|
|
hc.broker.queues.ForEach(func(name string, queue *Queue) bool {
|
|
if len(queue.tasks) > hc.thresholds.MaxQueueDepth {
|
|
hc.broker.logger.Warn("High queue depth detected",
|
|
logger.Field{Key: "queue", Value: name},
|
|
logger.Field{Key: "depth", Value: len(queue.tasks)},
|
|
logger.Field{Key: "threshold", Value: hc.thresholds.MaxQueueDepth})
|
|
}
|
|
return true
|
|
})
|
|
|
|
// Record health metrics
|
|
hc.broker.metricsCollector.RecordMetric("broker.connections.active", float64(activeConns), nil)
|
|
hc.broker.metricsCollector.RecordMetric("broker.health.check.timestamp", float64(time.Now().Unix()), nil)
|
|
}
|
|
|
|
// Call executes a function with circuit breaker protection
|
|
func (cb *EnhancedCircuitBreaker) Call(fn func() error) error {
|
|
cb.mu.RLock()
|
|
state := cb.state
|
|
cb.mu.RUnlock()
|
|
|
|
switch state {
|
|
case CircuitOpen:
|
|
cb.mu.RLock()
|
|
lastFailure := cb.lastFailureTime
|
|
cb.mu.RUnlock()
|
|
|
|
if time.Since(lastFailure) > cb.timeout {
|
|
cb.mu.Lock()
|
|
cb.state = CircuitHalfOpen
|
|
cb.mu.Unlock()
|
|
} else {
|
|
return fmt.Errorf("circuit breaker is open")
|
|
}
|
|
case CircuitHalfOpen:
|
|
// Allow one request through
|
|
case CircuitClosed:
|
|
// Normal operation
|
|
}
|
|
|
|
err := fn()
|
|
|
|
cb.mu.Lock()
|
|
defer cb.mu.Unlock()
|
|
|
|
if err != nil {
|
|
cb.failureCount++
|
|
cb.lastFailureTime = time.Now()
|
|
|
|
if cb.failureCount >= cb.threshold {
|
|
cb.state = CircuitOpen
|
|
} else if cb.state == CircuitHalfOpen {
|
|
cb.state = CircuitOpen
|
|
}
|
|
} else {
|
|
cb.successCount++
|
|
if cb.state == CircuitHalfOpen {
|
|
cb.state = CircuitClosed
|
|
cb.failureCount = 0
|
|
}
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
// InMemoryMessageStore implements MessageStore in memory
|
|
type InMemoryMessageStore struct {
|
|
messages storage.IMap[string, *StoredMessage]
|
|
}
|
|
|
|
func (b *Broker) GetConsumers() []*AdminConsumerMetrics {
|
|
consumers := []*AdminConsumerMetrics{}
|
|
b.consumers.ForEach(func(id string, con *consumer) bool {
|
|
// Get status based on consumer state
|
|
status := "active"
|
|
switch con.state {
|
|
case consts.ConsumerStateActive:
|
|
status = "active"
|
|
case consts.ConsumerStatePaused:
|
|
status = "paused"
|
|
case consts.ConsumerStateStopped:
|
|
status = "stopped"
|
|
}
|
|
|
|
// Handle cases where pool might be nil
|
|
maxConcurrentTasks := 0
|
|
taskTimeout := 0
|
|
maxRetries := 0
|
|
|
|
if con.pool != nil {
|
|
config := con.pool.GetCurrentConfig()
|
|
maxConcurrentTasks = config.NumberOfWorkers
|
|
taskTimeout = int(config.Timeout.Seconds())
|
|
maxRetries = config.MaxRetries
|
|
}
|
|
|
|
// Ensure metrics is not nil
|
|
processedTasks := int64(0)
|
|
errorCount := int64(0)
|
|
lastActivity := time.Now()
|
|
|
|
if con.metrics != nil {
|
|
processedTasks = con.metrics.ProcessedTasks
|
|
errorCount = con.metrics.ErrorCount
|
|
lastActivity = con.metrics.LastActivity
|
|
}
|
|
|
|
consumers = append(consumers, &AdminConsumerMetrics{
|
|
ID: id,
|
|
Queue: con.queue,
|
|
Status: status,
|
|
ProcessedTasks: processedTasks,
|
|
ErrorCount: errorCount,
|
|
LastActivity: lastActivity,
|
|
MaxConcurrentTasks: maxConcurrentTasks,
|
|
TaskTimeout: taskTimeout,
|
|
MaxRetries: maxRetries,
|
|
})
|
|
return true
|
|
})
|
|
return consumers
|
|
}
|
|
|
|
func (b *Broker) GetPools() []*AdminPoolMetrics {
|
|
pools := []*AdminPoolMetrics{}
|
|
b.queues.ForEach(func(name string, queue *Queue) bool {
|
|
// Initialize default values
|
|
workers := 0
|
|
queueSize := 0
|
|
activeTasks := 0
|
|
maxMemoryLoad := int64(0)
|
|
lastActivity := time.Now()
|
|
|
|
// Get metrics from queue if available
|
|
if queue.metrics != nil {
|
|
workers = queue.metrics.WorkerCount
|
|
queueSize = queue.metrics.QueueDepth
|
|
activeTasks = queue.metrics.ActiveTasks
|
|
maxMemoryLoad = queue.metrics.MaxMemoryLoad
|
|
lastActivity = queue.metrics.LastActivity
|
|
}
|
|
|
|
// If metrics are empty, try to get some basic info from the queue
|
|
if queueSize == 0 && queue.tasks != nil {
|
|
queueSize = len(queue.tasks)
|
|
}
|
|
|
|
pools = append(pools, &AdminPoolMetrics{
|
|
ID: name,
|
|
Workers: workers,
|
|
QueueSize: queueSize,
|
|
ActiveTasks: activeTasks,
|
|
Status: "running", // Default status
|
|
MaxMemoryLoad: maxMemoryLoad,
|
|
LastActivity: lastActivity,
|
|
})
|
|
return true
|
|
})
|
|
return pools
|
|
}
|