diff --git a/broker.go b/broker.go index 26b9b7d..0ed32db 100644 --- a/broker.go +++ b/broker.go @@ -43,14 +43,16 @@ type CMD int const ( SUBSCRIBE CMD = iota + 1 + PUBLISH STOP ) type Command struct { - Command CMD `json:"command"` - Queue string `json:"queue"` - MessageID string `json:"message_id"` - Error string `json:"error,omitempty"` + Command CMD `json:"command"` + Queue string `json:"queue"` + MessageID string `json:"message_id"` + Payload json.RawMessage `json:"payload,omitempty"` // Used for carrying the task payload + Error string `json:"error,omitempty"` } type Result struct { @@ -217,6 +219,14 @@ func (b *Broker) handleCommandMessage(ctx context.Context, conn net.Conn, msg Co switch msg.Command { case SUBSCRIBE: b.subscribe(ctx, msg.Queue, conn) + case PUBLISH: + task := Task{ + ID: msg.MessageID, + Payload: json.RawMessage(msg.Error), // Assuming Error field carries task payload here + CreatedAt: time.Now(), + CurrentQueue: msg.Queue, + } + return b.Publish(ctx, task, msg.Queue) default: return fmt.Errorf("unknown command: %d", msg.Command) } diff --git a/consumer.go b/consumer.go index 51bf77b..a3c5f2a 100644 --- a/consumer.go +++ b/consumer.go @@ -5,12 +5,13 @@ import ( "encoding/json" "errors" "fmt" - "github.com/oarkflow/mq/utils" "math/rand" "net" "slices" "sync" "time" + + "github.com/oarkflow/mq/utils" ) type Consumer struct { @@ -128,6 +129,7 @@ func (c *Consumer) Consume(ctx context.Context, queues ...string) error { utils.ReadFromConn(ctx, c.conn, func(ctx context.Context, conn net.Conn, message []byte) error { return c.readMessage(ctx, message) }) + fmt.Println("Stopping consumer") }() c.queues = slices.Compact(append(c.queues, queues...)) for _, q := range c.queues { diff --git a/examples/consumer.go b/examples/consumer.go index 0618965..33d7507 100644 --- a/examples/consumer.go +++ b/examples/consumer.go @@ -3,6 +3,7 @@ package main import ( "context" "fmt" + "github.com/oarkflow/mq" ) @@ -10,7 +11,7 @@ func main() { consumer := mq.NewConsumer(":8080") consumer.RegisterHandler("queue1", func(ctx context.Context, task mq.Task) mq.Result { fmt.Println("Handling task for queue1:", task.ID) - return mq.Result{Payload: task.Payload, MessageID: task.ID} + return mq.Result{Payload: []byte(`{"task": 123}`), MessageID: task.ID} }) consumer.RegisterHandler("queue2", func(ctx context.Context, task mq.Task) mq.Result { fmt.Println("Handling task for queue2:", task.ID) diff --git a/examples/publisher.go b/examples/publisher.go new file mode 100644 index 0000000..e311369 --- /dev/null +++ b/examples/publisher.go @@ -0,0 +1,21 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/oarkflow/mq" +) + +func main() { + ctx := context.Background() + publisher := mq.NewPublisher(":8080") + task := mq.Task{ + ID: "task-1", + Payload: json.RawMessage(`{"message": "Hello World"}`), + } + if err := publisher.Publish(ctx, "queue1", task); err != nil { + fmt.Println("Failed to publish task:", err) + } +} diff --git a/examples/server.go b/examples/server.go index afdcc06..b95d855 100644 --- a/examples/server.go +++ b/examples/server.go @@ -3,29 +3,16 @@ package main import ( "context" "fmt" + "github.com/oarkflow/mq" - "time" ) func main() { b := mq.NewBroker(func(ctx context.Context, task *mq.Task) error { - fmt.Println("Received task", task.ID, string(task.Payload), string(task.Result), task.Error, task.CurrentQueue) + fmt.Println("Received task", task.ID, "Payload", string(task.Payload), "Result", string(task.Result), task.Error, task.CurrentQueue) return nil }) b.NewQueue("queue1") b.NewQueue("queue2") - go func() { - for i := 0; i < 10; i++ { - b.Publish(context.Background(), mq.Task{ - ID: fmt.Sprint(i), - Payload: []byte(`"Hello"`), - }, "queue1") - b.Publish(context.Background(), mq.Task{ - ID: fmt.Sprint(i), - Payload: []byte(`"World"`), - }, "queue2") - time.Sleep(time.Second) - } - }() b.Start(context.Background(), ":8080") } diff --git a/publisher.go b/publisher.go new file mode 100644 index 0000000..f9154e1 --- /dev/null +++ b/publisher.go @@ -0,0 +1,32 @@ +package mq + +import ( + "context" + "fmt" + "net" + + "github.com/oarkflow/mq/utils" +) + +type Publisher struct { + brokerAddr string +} + +func NewPublisher(brokerAddr string) *Publisher { + return &Publisher{brokerAddr: brokerAddr} +} + +func (p *Publisher) Publish(ctx context.Context, queue string, task Task) error { + conn, err := net.Dial("tcp", p.brokerAddr) + if err != nil { + return fmt.Errorf("failed to connect to broker: %w", err) + } + defer conn.Close() + cmd := Command{ + Command: PUBLISH, + Queue: queue, + MessageID: task.ID, + Error: string(task.Payload), + } + return utils.Write(ctx, conn, cmd) +}