From 43ce1cdd84cf99eabe6f82e0c30760c64db8d84e Mon Sep 17 00:00:00 2001 From: sujit Date: Mon, 17 Feb 2025 22:40:37 +0545 Subject: [PATCH] feat: sig --- dag/task_manager.go | 46 ++++++++++++++++++++++++++++++++++++++++----- examples/v2.go | 7 ++++--- 2 files changed, 45 insertions(+), 8 deletions(-) diff --git a/dag/task_manager.go b/dag/task_manager.go index 61cde7f..fb28b1d 100644 --- a/dag/task_manager.go +++ b/dag/task_manager.go @@ -9,6 +9,7 @@ import ( "time" "github.com/oarkflow/mq" + "github.com/oarkflow/mq/logger" "github.com/oarkflow/mq/storage" "github.com/oarkflow/mq/storage/memory" @@ -141,15 +142,18 @@ func (tm *TaskManager) waitForResult() { } func (tm *TaskManager) processNode(exec *task) { + startTime := time.Now() pureNodeID := strings.Split(exec.nodeID, Delimiter)[0] node, exists := tm.dag.nodes.Get(pureNodeID) if !exists { - log.Printf("Node %s does not exist while processing node\n", pureNodeID) + tm.dag.Logger().Error("Node not found while processing node", + logger.Field{Key: "nodeID", Value: pureNodeID}) return } state, _ := tm.taskStates.Get(exec.nodeID) if state == nil { - log.Printf("State for node %s not found; creating new state.\n", exec.nodeID) + tm.dag.Logger().Warn("State not found; creating new state", + logger.Field{Key: "nodeID", Value: exec.nodeID}) state = newTaskState(exec.nodeID) tm.taskStates.Set(exec.nodeID, state) } @@ -158,28 +162,59 @@ func (tm *TaskManager) processNode(exec *task) { tm.currentNodePayload.Clear() tm.currentNodeResult.Clear() tm.currentNodePayload.Set(exec.nodeID, exec.payload) + + // Execute the node’s task. result := node.processor.ProcessTask(exec.ctx, mq.NewTask(exec.taskID, exec.payload, exec.nodeID)) + // Calculate the per-node latency. + nodeLatency := time.Since(startTime) + + // Log the result of node execution with comprehensive details. + logFields := []logger.Field{ + {Key: "nodeID", Value: exec.nodeID}, + {Key: "pureNodeID", Value: pureNodeID}, + {Key: "taskID", Value: exec.taskID}, + {Key: "latency", Value: nodeLatency.String()}, + } + if result.Error != nil { + logFields = append(logFields, logger.Field{Key: "error", Value: result.Error.Error()}) + logFields = append(logFields, logger.Field{Key: "status", Value: mq.Failed}) + tm.dag.Logger().Error("Node execution failed", logFields...) + } else { + logFields = append(logFields, logger.Field{Key: "status", Value: mq.Completed}) + tm.dag.Logger().Info("Node executed successfully", logFields...) + } + + // If this is the last node, mark it accordingly. isLast, err := tm.dag.IsLastNode(pureNodeID) if err != nil { - log.Printf("Error checking if node %s is last: %v\n", pureNodeID, err) + tm.dag.Logger().Error("Error checking if node is last", + logger.Field{Key: "nodeID", Value: pureNodeID}, + logger.Field{Key: "error", Value: err.Error()}) } else if isLast { result.Last = true } + tm.currentNodeResult.Set(exec.nodeID, result) state.Result = result result.Topic = node.ID + tm.updateTimestamps(&result) if result.Error != nil { - tm.updateTimestamps(&result) + result.Status = mq.Failed + state.Status = mq.Failed + state.Result.Status = mq.Failed + state.Result.Latency = result.Latency tm.result = &result tm.resultCh <- result tm.processFinalResult(state) return } + result.Status = mq.Completed + state.Result.Status = mq.Completed + state.Result.Latency = result.Latency if isLast { tm.processFinalResult(state) } if node.NodeType == Page { - tm.updateTimestamps(&result) tm.result = &result tm.resultCh <- result return @@ -407,6 +442,7 @@ func (tm *TaskManager) retryDeferredTasks() { } func (tm *TaskManager) processFinalResult(state *TaskState) { + state.Status = mq.Completed state.targetResults.Clear() if tm.dag.finalResult != nil { tm.dag.finalResult(tm.taskID, state.Result) diff --git a/examples/v2.go b/examples/v2.go index 1b12a16..bca12bd 100644 --- a/examples/v2.go +++ b/examples/v2.go @@ -4,9 +4,10 @@ import ( "context" "encoding/json" "fmt" + "os" + "github.com/oarkflow/mq" "github.com/oarkflow/mq/dag" - "os" "github.com/oarkflow/jet" @@ -143,11 +144,11 @@ func notify(taskID string, result mq.Result) { if err != nil { panic(err) } - fmt.Printf("Final result for task %s: %s\n", taskID, string(filteredData)) + fmt.Printf("Final result for task %s: %s, status: %s, latency: %s\n", taskID, string(filteredData), result.Status, result.Latency) } func main() { - flow := dag.NewDAG("Sample DAG", "sample-dag", notify) + flow := dag.NewDAG("Sample DAG", "sample-dag", notify, mq.WithSyncMode(true)) flow.AddNode(dag.Page, "Form", "Form", &Form{}) flow.AddNode(dag.Function, "NodeA", "NodeA", &NodeA{}) flow.AddNode(dag.Function, "NodeB", "NodeB", &NodeB{})