init: publisher

This commit is contained in:
sujit
2024-09-27 13:07:46 +05:45
parent d4cd24f5b4
commit 510c207fe4
4 changed files with 135 additions and 20 deletions

View File

@@ -184,17 +184,23 @@ func (b *Broker) subscribe(ctx context.Context, queueName string, conn net.Conn)
q.conn = make(map[net.Conn]struct{})
}
q.conn[conn] = struct{}{}
if q.deferred == nil {
q.deferred = xsync.NewMap[string, *Task]()
}
q.deferred.ForEach(func(_ string, message *Task) bool {
err := b.Publish(ctx, *message, queueName)
if err != nil {
return false
go func() {
select {
case <-ctx.Done():
b.removeConnection(queueName, conn)
}
return true
})
q.deferred = nil
}()
}
// Removes connection from the queue and broker
func (b *Broker) removeConnection(queueName string, conn net.Conn) {
if queue, ok := b.queues.Get(queueName); ok {
delete(queue.conn, conn)
if len(queue.conn) == 0 {
b.queues.Del(queueName)
}
conn.Close()
}
}
func (b *Broker) readMessage(ctx context.Context, conn net.Conn, message []byte) error {
@@ -222,11 +228,23 @@ func (b *Broker) handleCommandMessage(ctx context.Context, conn net.Conn, msg Co
case PUBLISH:
task := Task{
ID: msg.MessageID,
Payload: json.RawMessage(msg.Error), // Assuming Error field carries task payload here
Payload: msg.Payload,
CreatedAt: time.Now(),
CurrentQueue: msg.Queue,
}
return b.Publish(ctx, task, msg.Queue)
err := b.Publish(ctx, task, msg.Queue)
if err != nil {
return err
}
if task.ID != "" {
result := Result{
Command: "PUBLISH",
MessageID: task.ID,
Status: "success",
Queue: msg.Queue,
}
_ = utils.Write(ctx, conn, result)
}
default:
return fmt.Errorf("unknown command: %d", msg.Command)
}

View File

@@ -10,7 +10,7 @@ import (
func main() {
consumer := mq.NewConsumer(":8080")
consumer.RegisterHandler("queue1", func(ctx context.Context, task mq.Task) mq.Result {
fmt.Println("Handling task for queue1:", task.ID)
fmt.Println("Handling task for queue1:", string(task.Payload))
return mq.Result{Payload: []byte(`{"task": 123}`), MessageID: task.ID}
})
consumer.RegisterHandler("queue2", func(ctx context.Context, task mq.Task) mq.Result {

View File

@@ -4,18 +4,61 @@ import (
"context"
"encoding/json"
"fmt"
"log"
"github.com/oarkflow/mq"
)
func main() {
ctx := context.Background()
publisher := mq.NewPublisher(":8080")
task := mq.Task{
ID: "task-1",
Payload: json.RawMessage(`{"message": "Hello World"}`),
// Fire-and-Forget Example
err := publishAsync()
if err != nil {
log.Fatalf("Failed to publish async: %v", err)
}
if err := publisher.Publish(ctx, "queue1", task); err != nil {
fmt.Println("Failed to publish task:", err)
// Request/Response Example
err = publishSync()
if err != nil {
log.Fatalf("Failed to publish sync: %v", err)
}
}
// publishAsync sends a task in Fire-and-Forget (async) mode
func publishAsync() error {
taskPayload := map[string]string{"message": "Fire-and-Forget Task"}
payload, _ := json.Marshal(taskPayload)
task := mq.Task{
Payload: payload,
}
// Create publisher and send the task without waiting for a result
publisher := mq.NewPublisher(":8080")
err := publisher.PublishAsync(context.Background(), "queue1", task)
if err != nil {
return fmt.Errorf("failed to publish async task: %w", err)
}
fmt.Println("Async task published successfully")
return nil
}
// publishSync sends a task in Request/Response (sync) mode
func publishSync() error {
taskPayload := map[string]string{"message": "Request/Response Task"}
payload, _ := json.Marshal(taskPayload)
task := mq.Task{
Payload: payload,
}
// Create publisher and send the task, waiting for the result
publisher := mq.NewPublisher(":8080")
result, err := publisher.PublishSync(context.Background(), "queue1", task)
if err != nil {
return fmt.Errorf("failed to publish sync task: %w", err)
}
fmt.Printf("Sync task published. Result: %v\n", result)
return nil
}

View File

@@ -1,7 +1,9 @@
package mq
import (
"bufio"
"context"
"encoding/json"
"fmt"
"net"
@@ -30,3 +32,55 @@ func (p *Publisher) Publish(ctx context.Context, queue string, task Task) error
}
return utils.Write(ctx, conn, cmd)
}
func (p *Publisher) PublishAsync(ctx context.Context, queue string, task Task) error {
conn, err := net.Dial("tcp", p.brokerAddr)
if err != nil {
return fmt.Errorf("failed to connect to broker: %w", err)
}
defer conn.Close()
cmd := Command{
Command: PUBLISH,
Queue: queue,
MessageID: task.ID,
Payload: task.Payload,
}
// Fire and forget: No need to wait for response
return utils.Write(ctx, conn, cmd)
}
func (p *Publisher) PublishSync(ctx context.Context, queue string, task Task) (Result, error) {
conn, err := net.Dial("tcp", p.brokerAddr)
if err != nil {
return Result{}, fmt.Errorf("failed to connect to broker: %w", err)
}
defer conn.Close()
cmd := Command{
Command: PUBLISH,
Queue: queue,
MessageID: task.ID,
Payload: task.Payload,
}
err = utils.Write(ctx, conn, cmd)
if err != nil {
return Result{}, err
}
// Wait for response from broker/consumer
resultBytes, err := bufio.NewReader(conn).ReadBytes('\n')
if err != nil {
return Result{}, fmt.Errorf("failed to read response: %w", err)
}
var result Result
err = json.Unmarshal(resultBytes, &result)
if err != nil {
return Result{}, fmt.Errorf("failed to unmarshal result: %w", err)
}
return result, nil
}