init: publisher

This commit is contained in:
sujit
2024-09-27 12:34:48 +05:45
parent 13b922e038
commit d4cd24f5b4
6 changed files with 74 additions and 21 deletions

View File

@@ -43,6 +43,7 @@ type CMD int
const (
SUBSCRIBE CMD = iota + 1
PUBLISH
STOP
)
@@ -50,6 +51,7 @@ type Command struct {
Command CMD `json:"command"`
Queue string `json:"queue"`
MessageID string `json:"message_id"`
Payload json.RawMessage `json:"payload,omitempty"` // Used for carrying the task payload
Error string `json:"error,omitempty"`
}
@@ -217,6 +219,14 @@ func (b *Broker) handleCommandMessage(ctx context.Context, conn net.Conn, msg Co
switch msg.Command {
case SUBSCRIBE:
b.subscribe(ctx, msg.Queue, conn)
case PUBLISH:
task := Task{
ID: msg.MessageID,
Payload: json.RawMessage(msg.Error), // Assuming Error field carries task payload here
CreatedAt: time.Now(),
CurrentQueue: msg.Queue,
}
return b.Publish(ctx, task, msg.Queue)
default:
return fmt.Errorf("unknown command: %d", msg.Command)
}

View File

@@ -5,12 +5,13 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/oarkflow/mq/utils"
"math/rand"
"net"
"slices"
"sync"
"time"
"github.com/oarkflow/mq/utils"
)
type Consumer struct {
@@ -128,6 +129,7 @@ func (c *Consumer) Consume(ctx context.Context, queues ...string) error {
utils.ReadFromConn(ctx, c.conn, func(ctx context.Context, conn net.Conn, message []byte) error {
return c.readMessage(ctx, message)
})
fmt.Println("Stopping consumer")
}()
c.queues = slices.Compact(append(c.queues, queues...))
for _, q := range c.queues {

View File

@@ -3,6 +3,7 @@ package main
import (
"context"
"fmt"
"github.com/oarkflow/mq"
)
@@ -10,7 +11,7 @@ func main() {
consumer := mq.NewConsumer(":8080")
consumer.RegisterHandler("queue1", func(ctx context.Context, task mq.Task) mq.Result {
fmt.Println("Handling task for queue1:", task.ID)
return mq.Result{Payload: task.Payload, MessageID: task.ID}
return mq.Result{Payload: []byte(`{"task": 123}`), MessageID: task.ID}
})
consumer.RegisterHandler("queue2", func(ctx context.Context, task mq.Task) mq.Result {
fmt.Println("Handling task for queue2:", task.ID)

21
examples/publisher.go Normal file
View File

@@ -0,0 +1,21 @@
package main
import (
"context"
"encoding/json"
"fmt"
"github.com/oarkflow/mq"
)
func main() {
ctx := context.Background()
publisher := mq.NewPublisher(":8080")
task := mq.Task{
ID: "task-1",
Payload: json.RawMessage(`{"message": "Hello World"}`),
}
if err := publisher.Publish(ctx, "queue1", task); err != nil {
fmt.Println("Failed to publish task:", err)
}
}

View File

@@ -3,29 +3,16 @@ package main
import (
"context"
"fmt"
"github.com/oarkflow/mq"
"time"
)
func main() {
b := mq.NewBroker(func(ctx context.Context, task *mq.Task) error {
fmt.Println("Received task", task.ID, string(task.Payload), string(task.Result), task.Error, task.CurrentQueue)
fmt.Println("Received task", task.ID, "Payload", string(task.Payload), "Result", string(task.Result), task.Error, task.CurrentQueue)
return nil
})
b.NewQueue("queue1")
b.NewQueue("queue2")
go func() {
for i := 0; i < 10; i++ {
b.Publish(context.Background(), mq.Task{
ID: fmt.Sprint(i),
Payload: []byte(`"Hello"`),
}, "queue1")
b.Publish(context.Background(), mq.Task{
ID: fmt.Sprint(i),
Payload: []byte(`"World"`),
}, "queue2")
time.Sleep(time.Second)
}
}()
b.Start(context.Background(), ":8080")
}

32
publisher.go Normal file
View File

@@ -0,0 +1,32 @@
package mq
import (
"context"
"fmt"
"net"
"github.com/oarkflow/mq/utils"
)
type Publisher struct {
brokerAddr string
}
func NewPublisher(brokerAddr string) *Publisher {
return &Publisher{brokerAddr: brokerAddr}
}
func (p *Publisher) Publish(ctx context.Context, queue string, task Task) error {
conn, err := net.Dial("tcp", p.brokerAddr)
if err != nil {
return fmt.Errorf("failed to connect to broker: %w", err)
}
defer conn.Close()
cmd := Command{
Command: PUBLISH,
Queue: queue,
MessageID: task.ID,
Error: string(task.Payload),
}
return utils.Write(ctx, conn, cmd)
}