diff --git a/broker.go b/broker.go index 0ed32db..55af50f 100644 --- a/broker.go +++ b/broker.go @@ -184,17 +184,23 @@ func (b *Broker) subscribe(ctx context.Context, queueName string, conn net.Conn) q.conn = make(map[net.Conn]struct{}) } q.conn[conn] = struct{}{} - if q.deferred == nil { - q.deferred = xsync.NewMap[string, *Task]() - } - q.deferred.ForEach(func(_ string, message *Task) bool { - err := b.Publish(ctx, *message, queueName) - if err != nil { - return false + go func() { + select { + case <-ctx.Done(): + b.removeConnection(queueName, conn) } - return true - }) - q.deferred = nil + }() +} + +// Removes connection from the queue and broker +func (b *Broker) removeConnection(queueName string, conn net.Conn) { + if queue, ok := b.queues.Get(queueName); ok { + delete(queue.conn, conn) + if len(queue.conn) == 0 { + b.queues.Del(queueName) + } + conn.Close() + } } func (b *Broker) readMessage(ctx context.Context, conn net.Conn, message []byte) error { @@ -222,11 +228,23 @@ func (b *Broker) handleCommandMessage(ctx context.Context, conn net.Conn, msg Co case PUBLISH: task := Task{ ID: msg.MessageID, - Payload: json.RawMessage(msg.Error), // Assuming Error field carries task payload here + Payload: msg.Payload, CreatedAt: time.Now(), CurrentQueue: msg.Queue, } - return b.Publish(ctx, task, msg.Queue) + err := b.Publish(ctx, task, msg.Queue) + if err != nil { + return err + } + if task.ID != "" { + result := Result{ + Command: "PUBLISH", + MessageID: task.ID, + Status: "success", + Queue: msg.Queue, + } + _ = utils.Write(ctx, conn, result) + } default: return fmt.Errorf("unknown command: %d", msg.Command) } diff --git a/examples/consumer.go b/examples/consumer.go index 33d7507..6cc5744 100644 --- a/examples/consumer.go +++ b/examples/consumer.go @@ -10,7 +10,7 @@ import ( func main() { consumer := mq.NewConsumer(":8080") consumer.RegisterHandler("queue1", func(ctx context.Context, task mq.Task) mq.Result { - fmt.Println("Handling task for queue1:", task.ID) + fmt.Println("Handling task for queue1:", string(task.Payload)) return mq.Result{Payload: []byte(`{"task": 123}`), MessageID: task.ID} }) consumer.RegisterHandler("queue2", func(ctx context.Context, task mq.Task) mq.Result { diff --git a/examples/publisher.go b/examples/publisher.go index e311369..40e19f8 100644 --- a/examples/publisher.go +++ b/examples/publisher.go @@ -4,18 +4,61 @@ import ( "context" "encoding/json" "fmt" + "log" "github.com/oarkflow/mq" ) func main() { - ctx := context.Background() - publisher := mq.NewPublisher(":8080") - task := mq.Task{ - ID: "task-1", - Payload: json.RawMessage(`{"message": "Hello World"}`), + // Fire-and-Forget Example + err := publishAsync() + if err != nil { + log.Fatalf("Failed to publish async: %v", err) } - if err := publisher.Publish(ctx, "queue1", task); err != nil { - fmt.Println("Failed to publish task:", err) + + // Request/Response Example + err = publishSync() + if err != nil { + log.Fatalf("Failed to publish sync: %v", err) } } + +// publishAsync sends a task in Fire-and-Forget (async) mode +func publishAsync() error { + taskPayload := map[string]string{"message": "Fire-and-Forget Task"} + payload, _ := json.Marshal(taskPayload) + + task := mq.Task{ + Payload: payload, + } + + // Create publisher and send the task without waiting for a result + publisher := mq.NewPublisher(":8080") + err := publisher.PublishAsync(context.Background(), "queue1", task) + if err != nil { + return fmt.Errorf("failed to publish async task: %w", err) + } + + fmt.Println("Async task published successfully") + return nil +} + +// publishSync sends a task in Request/Response (sync) mode +func publishSync() error { + taskPayload := map[string]string{"message": "Request/Response Task"} + payload, _ := json.Marshal(taskPayload) + + task := mq.Task{ + Payload: payload, + } + + // Create publisher and send the task, waiting for the result + publisher := mq.NewPublisher(":8080") + result, err := publisher.PublishSync(context.Background(), "queue1", task) + if err != nil { + return fmt.Errorf("failed to publish sync task: %w", err) + } + + fmt.Printf("Sync task published. Result: %v\n", result) + return nil +} diff --git a/publisher.go b/publisher.go index f9154e1..b1dee1d 100644 --- a/publisher.go +++ b/publisher.go @@ -1,7 +1,9 @@ package mq import ( + "bufio" "context" + "encoding/json" "fmt" "net" @@ -30,3 +32,55 @@ func (p *Publisher) Publish(ctx context.Context, queue string, task Task) error } return utils.Write(ctx, conn, cmd) } + +func (p *Publisher) PublishAsync(ctx context.Context, queue string, task Task) error { + conn, err := net.Dial("tcp", p.brokerAddr) + if err != nil { + return fmt.Errorf("failed to connect to broker: %w", err) + } + defer conn.Close() + + cmd := Command{ + Command: PUBLISH, + Queue: queue, + MessageID: task.ID, + Payload: task.Payload, + } + + // Fire and forget: No need to wait for response + return utils.Write(ctx, conn, cmd) +} + +func (p *Publisher) PublishSync(ctx context.Context, queue string, task Task) (Result, error) { + conn, err := net.Dial("tcp", p.brokerAddr) + if err != nil { + return Result{}, fmt.Errorf("failed to connect to broker: %w", err) + } + defer conn.Close() + + cmd := Command{ + Command: PUBLISH, + Queue: queue, + MessageID: task.ID, + Payload: task.Payload, + } + + err = utils.Write(ctx, conn, cmd) + if err != nil { + return Result{}, err + } + + // Wait for response from broker/consumer + resultBytes, err := bufio.NewReader(conn).ReadBytes('\n') + if err != nil { + return Result{}, fmt.Errorf("failed to read response: %w", err) + } + + var result Result + err = json.Unmarshal(resultBytes, &result) + if err != nil { + return Result{}, fmt.Errorf("failed to unmarshal result: %w", err) + } + + return result, nil +}