diff --git a/mq.go b/mq.go index b3663b3..80fc66a 100644 --- a/mq.go +++ b/mq.go @@ -283,7 +283,7 @@ type publisher struct { } type Broker struct { - queues storage.IMap[string, storage.IMap[string, *Queue]] // Modified to support tenant-specific queues + queues storage.IMap[string, *Queue] // Modified to support tenant-specific queues consumers storage.IMap[string, *consumer] publishers storage.IMap[string, *publisher] deadLetter storage.IMap[string, *Queue] @@ -295,7 +295,7 @@ type Broker struct { func NewBroker(opts ...Option) *Broker { options := SetupOptions(opts...) return &Broker{ - queues: memory.New[string, storage.IMap[string, *Queue]](), + queues: memory.New[string, *Queue](), publishers: memory.New[string, *publisher](), consumers: memory.New[string, *consumer](), deadLetter: memory.New[string, *Queue](), @@ -316,16 +316,13 @@ func (b *Broker) OnClose(ctx context.Context, conn net.Conn) error { con.conn.Close() b.consumers.Del(consumerID) } - b.queues.ForEach(func(_ string, tenantQueues storage.IMap[string, *Queue]) bool { - tenantQueues.ForEach(func(_ string, queue *Queue) bool { - if _, ok := queue.consumers.Get(consumerID); ok { - if b.opts.consumerOnClose != nil { - b.opts.consumerOnClose(ctx, queue.name, consumerID) - } - queue.consumers.Del(consumerID) + b.queues.ForEach(func(_ string, queue *Queue) bool { + if _, ok := queue.consumers.Get(consumerID); ok { + if b.opts.consumerOnClose != nil { + b.opts.consumerOnClose(ctx, queue.name, consumerID) } - return true - }) + queue.consumers.Del(consumerID) + } return true }) } else { @@ -334,16 +331,13 @@ func (b *Broker) OnClose(ctx context.Context, conn net.Conn) error { log.Printf("Broker: Consumer connection closed: %s, address: %s", consumerID, conn.RemoteAddr()) con.conn.Close() b.consumers.Del(consumerID) - b.queues.ForEach(func(_ string, tenantQueues storage.IMap[string, *Queue]) bool { - tenantQueues.ForEach(func(_ string, queue *Queue) bool { - queue.consumers.Del(consumerID) - if _, ok := queue.consumers.Get(consumerID); ok { - if b.opts.consumerOnClose != nil { - b.opts.consumerOnClose(ctx, queue.name, consumerID) - } + b.queues.ForEach(func(_ string, queue *Queue) bool { + queue.consumers.Del(consumerID) + if _, ok := queue.consumers.Get(consumerID); ok { + if b.opts.consumerOnClose != nil { + b.opts.consumerOnClose(ctx, queue.name, consumerID) } - return true - }) + } return true }) } @@ -599,14 +593,9 @@ func (b *Broker) receive(ctx context.Context, c net.Conn) (*codec.Message, error } func (b *Broker) broadcastToConsumers(msg *codec.Message) { - if tenantQueues, ok := b.queues.Get(msg.Queue); ok { - tenantQueues.ForEach(func(_, queueName string) bool { - if queue, ok := tenantQueues.Get(queueName); ok { - task := &QueuedTask{Message: msg, RetryCount: 0} - queue.tasks <- task - } - return true - }) + if queue, ok := b.queues.Get(msg.Queue); ok { + task := &QueuedTask{Message: msg, RetryCount: 0} + queue.tasks <- task } } @@ -868,8 +857,7 @@ func (b *Broker) NewQueue(name string) *Queue { tasks: make(chan *QueuedTask, b.opts.queueSize), consumers: memory.New[string, *consumer](), } - b.queues.Set(name, memory.New[string, *Queue]()) - b.queues.Get(name).Set(name, q) + b.queues.Set(name, q) // Create DLQ for the queue dlq := &Queue{ @@ -891,8 +879,7 @@ func (b *Broker) NewQueueWithOrdering(name string) *Queue { tasks: make(chan *QueuedTask, b.opts.queueSize), consumers: memory.New[string, *consumer](), } - b.queues.Set(name, memory.New[string, *Queue]()) - b.queues.Get(name).Set(name, q) + b.queues.Set(name, q) // Create DLQ for the queue dlq := &Queue{ @@ -973,95 +960,3 @@ func (b *Broker) Authorize(ctx context.Context, role string, action string) erro } return fmt.Errorf("unauthorized action") } - -// Add support for multi-tenancy -func (b *Broker) AddTenant(tenantID string) error { - if _, exists := b.queues.Get(tenantID); exists { - return fmt.Errorf("tenant %s already exists", tenantID) - } - b.queues.Set(tenantID, memory.New[string, *Queue]()) - return nil -} - -func (b *Broker) RemoveTenant(tenantID string) error { - if _, exists := b.queues.Get(tenantID); !exists { - return fmt.Errorf("tenant %s does not exist", tenantID) - } - b.queues.Del(tenantID) - return nil -} - -// Ensure tenant-specific queues and operations -func (b *Broker) NewQueueForTenant(tenantID, queueName string) (*Queue, error) { - tenantQueues, ok := b.queues.Get(tenantID) - if !ok { - return nil, fmt.Errorf("tenant %s does not exist", tenantID) - } - if _, exists := tenantQueues.Get(queueName); exists { - return nil, fmt.Errorf("queue %s already exists for tenant %s", queueName, tenantID) - } - q := &Queue{ - name: queueName, - tasks: make(chan *QueuedTask, b.opts.queueSize), - consumers: memory.New[string, *consumer](), - } - tenantQueues.Set(queueName, q) - - // Create tenant-specific DLQ - dlq := &Queue{ - name: queueName + "_dlq", - tasks: make(chan *QueuedTask, b.opts.queueSize), - consumers: memory.New[string, *consumer](), - } - tenantQueues.Set(queueName+"_dlq", dlq) - ctx := context.Background() - go b.dispatchWorker(ctx, q) - go b.dispatchWorker(ctx, dlq) - return q, nil -} - -func (b *Broker) PublishForTenant(ctx context.Context, tenantID string, task *Task, queueName string) error { - tenantQueues, ok := b.queues.Get(tenantID) - if !ok { - return fmt.Errorf("tenant %s does not exist", tenantID) - } - queue, ok := tenantQueues.Get(queueName) - if !ok { - return fmt.Errorf("queue %s does not exist for tenant %s", queueName, tenantID) - } - taskID := task.ID - if taskID == "" { - taskID = NewID() - task.ID = taskID - } - queuedTask := &QueuedTask{Message: codec.NewMessage(consts.PUBLISH, task.Payload, queueName, nil), RetryCount: 0} - queue.tasks <- queuedTask - return nil -} - -func (b *Broker) SubscribeForTenant(ctx context.Context, tenantID, queueName string, conn net.Conn) error { - tenantQueues, ok := b.queues.Get(tenantID) - if !ok { - return fmt.Errorf("tenant %s does not exist", tenantID) - } - queue, ok := tenantQueues.Get(queueName) - if !ok { - return fmt.Errorf("queue %s does not exist for tenant %s", queueName, tenantID) - } - consumerID := b.AddConsumer(ctx, queueName, conn) - queue.consumers.Set(consumerID, &consumer{id: consumerID, conn: conn}) - return nil -} - -func (b *Broker) ListQueuesForTenant(tenantID string) ([]string, error) { - tenantQueues, ok := b.queues.Get(tenantID) - if !ok { - return nil, fmt.Errorf("tenant %s does not exist", tenantID) - } - var queueNames []string - tenantQueues.ForEach(func(queueName string, _ *Queue) bool { - queueNames = append(queueNames, queueName) - return true - }) - return queueNames, nil -} diff --git a/recover.go b/recover.go index 8fad4d8..36da6f5 100644 --- a/recover.go +++ b/recover.go @@ -5,8 +5,6 @@ import ( "log" "runtime" "runtime/debug" - - "github.com/oarkflow/mq/metrics" ) func RecoverPanic(labelGenerator func() string) { @@ -20,8 +18,6 @@ func RecoverPanic(labelGenerator func() string) { } } log.Printf("[PANIC] - recovered from panic in %s (%s:%d): %v\nStack trace: %s", funcName, file, line, r, debug.Stack()) - label := labelGenerator() - metrics.TasksErrors.WithLabelValues(label).Inc() } }