init: publisher

This commit is contained in:
sujit
2024-09-27 19:30:22 +05:45
parent b35251f7a0
commit 2d334239ed
2 changed files with 21 additions and 10 deletions

6
dag.go
View File

@@ -103,12 +103,12 @@ func (dag *DAG) TaskCallback(ctx context.Context, task *Task) error {
} }
func (dag *DAG) AddNode(queue string, handler Handler, firstNode ...bool) { func (dag *DAG) AddNode(queue string, handler Handler, firstNode ...bool) {
consumer := NewConsumer(dag.brokerAddr) con := NewConsumer(dag.brokerAddr)
consumer.RegisterHandler(queue, handler) con.RegisterHandler(queue, handler)
dag.broker.NewQueue(queue) dag.broker.NewQueue(queue)
n := &node{ n := &node{
queue: queue, queue: queue,
consumer: consumer, consumer: con,
handler: handler, handler: handler,
} }
if len(firstNode) > 0 && firstNode[0] { if len(firstNode) > 0 && firstNode[0] {

View File

@@ -4,8 +4,9 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/oarkflow/mq"
"time" "time"
"github.com/oarkflow/mq"
) )
func handleNode1(_ context.Context, task mq.Task) mq.Result { func handleNode1(_ context.Context, task mq.Task) mq.Result {
@@ -40,15 +41,25 @@ func handleNode2(_ context.Context, task mq.Task) mq.Result {
} }
func handleNode3(_ context.Context, task mq.Task) mq.Result { func handleNode3(_ context.Context, task mq.Task) mq.Result {
result := `{"field": "node3", "item": %s}` var data map[string]any
fmt.Printf("Processing task at node3: %s\n", string(task.Payload)) err := json.Unmarshal(task.Payload, &data)
return mq.Result{Status: "completed", Payload: json.RawMessage(fmt.Sprintf(result, string(task.Payload)))} if err != nil {
return mq.Result{Error: err}
}
data["item"] = "Item processed in node3"
bt, _ := json.Marshal(data)
return mq.Result{Status: "completed", Payload: bt}
} }
func handleNode4(_ context.Context, task mq.Task) mq.Result { func handleNode4(_ context.Context, task mq.Task) mq.Result {
result := `{"field": "node4", "item": %s}` var data map[string]any
fmt.Printf("Processing task at node4: %s\n", string(task.Payload)) err := json.Unmarshal(task.Payload, &data)
return mq.Result{Status: "completed", Payload: json.RawMessage(fmt.Sprintf(result, string(task.Payload)))} if err != nil {
return mq.Result{Error: err}
}
data["item"] = "An Item processed in node4"
bt, _ := json.Marshal(data)
return mq.Result{Status: "completed", Payload: bt}
} }
func main() { func main() {