diff --git a/dag/v2/dag.go b/dag/v2/dag.go index 3d41db8..f3e2181 100644 --- a/dag/v2/dag.go +++ b/dag/v2/dag.go @@ -52,6 +52,7 @@ type DAG struct { taskManager storage.IMap[string, *TaskManager] finalResult func(taskID string, result Result) mu sync.Mutex + Error error } func NewDAG(finalResultCallback func(taskID string, result Result)) *DAG { @@ -62,14 +63,22 @@ func NewDAG(finalResultCallback func(taskID string, result Result)) *DAG { } } -func (tm *DAG) AddNode(nodeID string, handler func(payload json.RawMessage) Result) { +func (tm *DAG) AddNode(nodeID string, handler func(payload json.RawMessage) Result) *DAG { + if tm.Error != nil { + return tm + } tm.nodes.Set(nodeID, &Node{ID: nodeID, Handler: handler}) + return tm } -func (tm *DAG) AddEdge(from string, targets ...string) error { +func (tm *DAG) AddEdge(from string, targets ...string) *DAG { + if tm.Error != nil { + return tm + } node, ok := tm.nodes.Get(from) if !ok { - return fmt.Errorf("node not found %s", from) + tm.Error = fmt.Errorf("node not found %s", from) + return tm } for _, target := range targets { if targetNode, ok := tm.nodes.Get(target); ok { @@ -77,7 +86,7 @@ func (tm *DAG) AddEdge(from string, targets ...string) error { node.Edges = append(node.Edges, edge) } } - return nil + return tm } func (tm *DAG) GetNextNodes(key string) ([]*Node, error) { @@ -116,7 +125,6 @@ func (tm *DAG) formHandler(w http.ResponseWriter, r *http.Request) { taskID := mq.NewID() manager := NewTaskManager(tm) tm.taskManager.Set(taskID, manager) - go manager.Run() payload := fmt.Sprintf(`{"email": "%s", "age": "%s", "gender": "%s"}`, email, age, gender) manager.Trigger(taskID, "NodeA", json.RawMessage(payload)) http.Redirect(w, r, "/result?taskID="+taskID, http.StatusFound) @@ -143,9 +151,6 @@ func (tm *DAG) taskStatusHandler(w http.ResponseWriter, r *http.Request) { } func (tm *DAG) Start(addr string) { - tm.AddEdge("NodeA", "NodeB") - tm.AddEdge("NodeB", "NodeC") - tm.AddEdge("NodeC", "Result") http.HandleFunc("/form", tm.formHandler) http.HandleFunc("/result", tm.resultHandler) http.HandleFunc("/task-result", tm.taskStatusHandler) diff --git a/dag/v2/task_manager.go b/dag/v2/task_manager.go index 45f13ad..2cab763 100644 --- a/dag/v2/task_manager.go +++ b/dag/v2/task_manager.go @@ -16,7 +16,6 @@ type TaskState struct { Timestamp time.Time Result Result targetResults storage.IMap[string, Result] - my sync.Mutex } type nodeResult struct { @@ -46,6 +45,7 @@ func NewTaskManager(dag *DAG) *TaskManager { resultQueue: make(chan nodeResult, 100), dag: dag, } + go tm.Run() go tm.WaitForResult() return tm } diff --git a/examples/v2.go b/examples/v2.go index fbe02cd..dc52e4d 100644 --- a/examples/v2.go +++ b/examples/v2.go @@ -54,5 +54,11 @@ func main() { dag.AddNode("NodeB", NodeB) dag.AddNode("NodeC", NodeC) dag.AddNode("Result", Result) + dag.AddEdge("NodeA", "NodeB") + dag.AddEdge("NodeB", "NodeC") + dag.AddEdge("NodeC", "Result") + if dag.Error != nil { + panic(dag.Error) + } dag.Start(":8080") }