From c4e93c31e4dc41ca2375ea36897e4c52007ab63b Mon Sep 17 00:00:00 2001 From: sujit Date: Sun, 29 Sep 2024 01:47:54 +0545 Subject: [PATCH] init: publisher --- examples/dag.go | 47 +++++++++++++++++++++-------------------------- 1 file changed, 21 insertions(+), 26 deletions(-) diff --git a/examples/dag.go b/examples/dag.go index f1cf946..e59d722 100644 --- a/examples/dag.go +++ b/examples/dag.go @@ -3,6 +3,7 @@ package main import ( "context" "encoding/json" + "fmt" "log" "sync" "time" @@ -14,25 +15,33 @@ func main() { dag := NewDAG() dag.AddNode("queue1", func(ctx context.Context, task mq.Task) mq.Result { log.Printf("Handling task for queue1: %s", string(task.Payload)) - return mq.Result{Payload: []byte(`[{"user_id": 1}, {"user_id": 2}]`), MessageID: task.ID} + return mq.Result{Payload: task.Payload, MessageID: task.ID} }) dag.AddNode("queue2", func(ctx context.Context, task mq.Task) mq.Result { - var item map[string]interface{} - if err := json.Unmarshal(task.Payload, &item); err != nil { - return mq.Result{Payload: nil, Error: err, MessageID: task.ID} - } - item["salary"] = 12000 // Simulating task logic by adding "salary" - result, _ := json.Marshal(item) - log.Printf("Handling task for queue2: %s", string(result)) - return mq.Result{Payload: result, MessageID: task.ID} + log.Printf("Handling task for queue2: %s", string(task.Payload)) + return mq.Result{Payload: task.Payload, MessageID: task.ID} + }) + dag.AddNode("queue3", func(ctx context.Context, task mq.Task) mq.Result { + var data map[string]any + err := json.Unmarshal(task.Payload, &data) + if err != nil { + return mq.Result{Error: err} + } + data["salary"] = fmt.Sprintf("12000%v", data["user_id"]) + bt, _ := json.Marshal(data) + log.Printf("Handling task for queue3: %s", string(task.Payload)) + return mq.Result{Payload: bt, MessageID: task.ID} + }) + dag.AddNode("queue4", func(ctx context.Context, task mq.Task) mq.Result { + log.Printf("Handling task for queue4: %s", string(task.Payload)) + return mq.Result{Payload: task.Payload, MessageID: task.ID} }) - dag.AddEdge("queue1", "queue2") - dag.AddLoop("queue1", "queue2") // This adds a loop between queue1 and queue2 + dag.AddLoop("queue2", "queue3") // Add loop to handle array go func() { time.Sleep(2 * time.Second) - finalResult := dag.Send([]byte(`{}`)) // sending empty payload to initiate + finalResult := dag.Send([]byte(`[{"user_id": 1}, {"user_id": 2}]`)) log.Printf("Final result received: %s", string(finalResult.Payload)) }() @@ -110,46 +119,32 @@ func (d *DAG) PublishTask(ctx context.Context, payload []byte, queueName string, // TaskCallback is called when a task completes and decides the next step func (d *DAG) TaskCallback(ctx context.Context, task *mq.Task) error { log.Printf("Callback from queue %s with result: %s", task.CurrentQueue, string(task.Result)) - - // Check if this task belongs to a loop d.mu.Lock() loopCtx, isLoopTask := d.loopTaskMap[task.ID] d.mu.Unlock() if isLoopTask { - // Send the sub-task result to the loop's result channel loopCtx.subResultCh <- mq.Result{Payload: task.Result, MessageID: task.ID} } - - // Handle loopEdges first, if applicable if loopNode, exists := d.loopEdges[task.CurrentQueue]; exists { - // This is a loop node, and we need to handle array processing var items []json.RawMessage if err := json.Unmarshal(task.Result, &items); err != nil { return err } - - // Create a loop task context to track the state of this loop loopCtx := &loopTaskContext{ subResultCh: make(chan mq.Result, len(items)), // A channel to collect sub-task results totalItems: len(items), results: make([]json.RawMessage, 0, len(items)), } - - // Register the loop context for this task d.mu.Lock() d.loopTaskMap[task.ID] = loopCtx d.mu.Unlock() - - // Publish a sub-task for each item in the array for _, item := range items { _, err := d.PublishTask(ctx, item, loopNode, task.ID) if err != nil { return err } } - go d.waitForLoopCompletion(ctx, task.ID, task.CurrentQueue) - return nil }