From 43f06156bb4046bfac056970e75d631dc09d038c Mon Sep 17 00:00:00 2001 From: sujit Date: Sun, 29 Sep 2024 16:37:52 +0545 Subject: [PATCH] init: publisher --- dag/dag.go | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/dag/dag.go b/dag/dag.go index 3ab693f..066c3e4 100644 --- a/dag/dag.go +++ b/dag/dag.go @@ -20,7 +20,7 @@ type DAG struct { server *mq.Broker nodes map[string]*mq.Consumer edges map[string][]string - loopEdges map[string]string + loopEdges map[string][]string taskChMap map[string]chan mq.Result taskResults map[string]map[string]*taskContext mu sync.Mutex @@ -30,7 +30,7 @@ func New(opts ...mq.Option) *DAG { d := &DAG{ nodes: make(map[string]*mq.Consumer), edges: make(map[string][]string), - loopEdges: make(map[string]string), + loopEdges: make(map[string][]string), taskChMap: make(map[string]chan mq.Result), taskResults: make(map[string]map[string]*taskContext), } @@ -49,7 +49,7 @@ func (d *DAG) AddEdge(fromNode string, toNodes ...string) { d.edges[fromNode] = toNodes } -func (d *DAG) AddLoop(fromNode string, toNode string) { +func (d *DAG) AddLoop(fromNode string, toNode ...string) { d.loopEdges[fromNode] = toNode } @@ -119,7 +119,7 @@ func (d *DAG) TaskCallback(ctx context.Context, task *mq.Task) error { } else { payload = task.Result } - if loopNode, exists := d.loopEdges[task.CurrentQueue]; exists { + if loopNodes, exists := d.loopEdges[task.CurrentQueue]; exists { var items []json.RawMessage if err := json.Unmarshal(payload, &items); err != nil { return err @@ -132,12 +132,15 @@ func (d *DAG) TaskCallback(ctx context.Context, task *mq.Task) error { } ctx = mq.SetHeaders(ctx, map[string]string{mq.TriggerNode: task.CurrentQueue}) - for _, item := range items { - _, err := d.PublishTask(ctx, item, loopNode, task.ID) - if err != nil { - return err + for _, loopNode := range loopNodes { + for _, item := range items { + _, err := d.PublishTask(ctx, item, loopNode, task.ID) + if err != nil { + return err + } } } + return nil } if nodeType == "loop" && completed {