From b5ca85a469b52ace97a4e702eae5aa998f3cbecf Mon Sep 17 00:00:00 2001 From: sujit Date: Mon, 28 Oct 2024 08:38:36 +0545 Subject: [PATCH] feat: Add connection --- dag/dag.go | 2 +- dag/task_manager.go | 8 ++++---- examples/tasks/operations.go | 6 +++--- options.go | 21 ++++++++++----------- 4 files changed, 18 insertions(+), 19 deletions(-) diff --git a/dag/dag.go b/dag/dag.go index 72435cb..abca5dc 100644 --- a/dag/dag.go +++ b/dag/dag.go @@ -170,7 +170,7 @@ func (tm *DAG) callbackToConsumer(ctx context.Context, result mq.Result) { func (tm *DAG) onTaskCallback(ctx context.Context, result mq.Result) mq.Result { if taskContext, ok := tm.taskContext[result.TaskID]; ok && result.Topic != "" { - return taskContext.handleCallback(ctx, result) + return taskContext.handleNextTask(ctx, result) } return mq.Result{} } diff --git a/dag/task_manager.go b/dag/task_manager.go index 7d7ad0e..70304ca 100644 --- a/dag/task_manager.go +++ b/dag/task_manager.go @@ -75,9 +75,9 @@ func (tm *TaskManager) dispatchFinalResult(ctx context.Context) mq.Result { func (tm *TaskManager) getConditionalEdges(node *Node, result mq.Result) []Edge { edges := make([]Edge, len(node.Edges)) copy(edges, node.Edges) - if result.Status != "" { + if result.ConditionStatus != "" { if conditions, ok := tm.dag.conditions[FromNode(result.Topic)]; ok { - if targetNodeKey, ok := conditions[When(result.Status)]; ok { + if targetNodeKey, ok := conditions[When(result.ConditionStatus)]; ok { if targetNode, ok := tm.dag.nodes[string(targetNodeKey)]; ok { edges = append(edges, Edge{From: node, To: []*Node{targetNode}}) } @@ -91,7 +91,7 @@ func (tm *TaskManager) getConditionalEdges(node *Node, result mq.Result) []Edge return edges } -func (tm *TaskManager) handleCallback(ctx context.Context, result mq.Result) mq.Result { +func (tm *TaskManager) handleNextTask(ctx context.Context, result mq.Result) mq.Result { defer func() { tm.wg.Done() mq.RecoverPanic(mq.RecoverTitle) @@ -207,7 +207,7 @@ func (tm *TaskManager) processNode(ctx context.Context, node *Node, payload json defer func() { result.Topic = node.Key tm.appendResult(result, false) - tm.handleCallback(ctx, result) + tm.handleNextTask(ctx, result) }() } select { diff --git a/examples/tasks/operations.go b/examples/tasks/operations.go index e26e1ad..acd3996 100644 --- a/examples/tasks/operations.go +++ b/examples/tasks/operations.go @@ -38,11 +38,11 @@ func (e *Condition) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { switch email := data["email"].(type) { case string: if email == "abc.xyz@gmail.com" { - return mq.Result{Payload: task.Payload, Status: "pass", Ctx: ctx} + return mq.Result{Payload: task.Payload, ConditionStatus: "pass", Ctx: ctx} } - return mq.Result{Payload: task.Payload, Status: "fail", Ctx: ctx} + return mq.Result{Payload: task.Payload, ConditionStatus: "fail", Ctx: ctx} default: - return mq.Result{Payload: task.Payload, Status: "fail", Ctx: ctx} + return mq.Result{Payload: task.Payload, ConditionStatus: "fail", Ctx: ctx} } } diff --git a/options.go b/options.go index 209ab70..126a6ed 100644 --- a/options.go +++ b/options.go @@ -13,15 +13,16 @@ import ( ) type Result struct { - CreatedAt time.Time `json:"created_at"` - ProcessedAt time.Time `json:"processed_at,omitempty"` - Latency string `json:"latency"` - Error error `json:"-"` // Keep error as an error type - Topic string `json:"topic"` - TaskID string `json:"task_id"` - Status string `json:"status"` - Ctx context.Context `json:"-"` - Payload json.RawMessage `json:"payload"` + CreatedAt time.Time `json:"created_at"` + ProcessedAt time.Time `json:"processed_at,omitempty"` + Latency string `json:"latency"` + Error error `json:"-"` // Keep error as an error type + Topic string `json:"topic"` + TaskID string `json:"task_id"` + Status string `json:"status"` + ConditionStatus string `json:"condition_status"` + Ctx context.Context `json:"-"` + Payload json.RawMessage `json:"payload"` } func (r Result) MarshalJSON() ([]byte, error) { @@ -50,8 +51,6 @@ func (r *Result) UnmarshalJSON(data []byte) error { if err := json.Unmarshal(data, &aux); err != nil { return err } - - // Restore the error from string to error type if aux.ErrMsg != "" { r.Error = errors.New(aux.ErrMsg) } else {