From 2d334239edd3e70f9bd60893bcdb76d47befadea Mon Sep 17 00:00:00 2001 From: sujit Date: Fri, 27 Sep 2024 19:30:22 +0545 Subject: [PATCH] init: publisher --- dag.go | 6 +++--- examples/dag.go | 25 ++++++++++++++++++------- 2 files changed, 21 insertions(+), 10 deletions(-) diff --git a/dag.go b/dag.go index e5b38b3..bb2e8a2 100644 --- a/dag.go +++ b/dag.go @@ -103,12 +103,12 @@ func (dag *DAG) TaskCallback(ctx context.Context, task *Task) error { } func (dag *DAG) AddNode(queue string, handler Handler, firstNode ...bool) { - consumer := NewConsumer(dag.brokerAddr) - consumer.RegisterHandler(queue, handler) + con := NewConsumer(dag.brokerAddr) + con.RegisterHandler(queue, handler) dag.broker.NewQueue(queue) n := &node{ queue: queue, - consumer: consumer, + consumer: con, handler: handler, } if len(firstNode) > 0 && firstNode[0] { diff --git a/examples/dag.go b/examples/dag.go index 26e9850..e962312 100644 --- a/examples/dag.go +++ b/examples/dag.go @@ -4,8 +4,9 @@ import ( "context" "encoding/json" "fmt" - "github.com/oarkflow/mq" "time" + + "github.com/oarkflow/mq" ) func handleNode1(_ context.Context, task mq.Task) mq.Result { @@ -40,15 +41,25 @@ func handleNode2(_ context.Context, task mq.Task) mq.Result { } func handleNode3(_ context.Context, task mq.Task) mq.Result { - result := `{"field": "node3", "item": %s}` - fmt.Printf("Processing task at node3: %s\n", string(task.Payload)) - return mq.Result{Status: "completed", Payload: json.RawMessage(fmt.Sprintf(result, string(task.Payload)))} + var data map[string]any + err := json.Unmarshal(task.Payload, &data) + if err != nil { + return mq.Result{Error: err} + } + data["item"] = "Item processed in node3" + bt, _ := json.Marshal(data) + return mq.Result{Status: "completed", Payload: bt} } func handleNode4(_ context.Context, task mq.Task) mq.Result { - result := `{"field": "node4", "item": %s}` - fmt.Printf("Processing task at node4: %s\n", string(task.Payload)) - return mq.Result{Status: "completed", Payload: json.RawMessage(fmt.Sprintf(result, string(task.Payload)))} + var data map[string]any + err := json.Unmarshal(task.Payload, &data) + if err != nil { + return mq.Result{Error: err} + } + data["item"] = "An Item processed in node4" + bt, _ := json.Marshal(data) + return mq.Result{Status: "completed", Payload: bt} } func main() {