From e79620f34129c9217047b3bb8a5ec21a202a136a Mon Sep 17 00:00:00 2001 From: sujit Date: Tue, 19 Nov 2024 00:03:16 +0545 Subject: [PATCH] feat: [wip] - Implement html node --- dag/v2/dag.go | 14 ++++++-------- dag/v2/task_manager.go | 23 ++++++++++++----------- dag/v2/v2.go | 4 ++-- examples/v2.go | 14 +++++++------- 4 files changed, 27 insertions(+), 28 deletions(-) diff --git a/dag/v2/dag.go b/dag/v2/dag.go index 9d0980d..7d34210 100644 --- a/dag/v2/dag.go +++ b/dag/v2/dag.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "sync" "github.com/oarkflow/mq" "github.com/oarkflow/mq/storage" @@ -29,10 +28,10 @@ type Result struct { type NodeType int -func (c NodeType) IsValid() bool { return c >= Process && c <= Page } +func (c NodeType) IsValid() bool { return c >= Function && c <= Page } const ( - Process NodeType = iota + Function NodeType = iota Page ) @@ -62,7 +61,6 @@ type DAG struct { nodes storage.IMap[string, *Node] taskManager storage.IMap[string, *TaskManager] finalResult func(taskID string, result Result) - mu sync.Mutex Error error startNode string } @@ -122,7 +120,7 @@ func (tm *DAG) AddNode(nodeType NodeType, nodeID string, handler func(ctx contex return tm } -func (tm *DAG) AddEdge(from string, targets ...string) *DAG { +func (tm *DAG) AddEdge(edgeType EdgeType, from string, targets ...string) *DAG { if tm.Error != nil { return tm } @@ -133,7 +131,7 @@ func (tm *DAG) AddEdge(from string, targets ...string) *DAG { } for _, target := range targets { if targetNode, ok := tm.nodes.Get(target); ok { - edge := Edge{From: node, To: targetNode, Type: Simple} + edge := Edge{From: node, To: targetNode, Type: edgeType} node.Edges = append(node.Edges, edge) } } @@ -179,7 +177,7 @@ func (tm *DAG) ProcessTask(ctx context.Context, payload []byte) Result { manager, ok := tm.taskManager.Get(taskID) resultCh := make(chan Result, 1) if !ok { - manager = NewTaskManager(tm, resultCh) + manager = NewTaskManager(tm, taskID, resultCh) tm.taskManager.Set(taskID, manager) } else { manager.resultCh = resultCh @@ -201,6 +199,6 @@ func (tm *DAG) ProcessTask(ctx context.Context, payload []byte) Result { if ok && node.Type != Page && payload == nil { return Result{Error: fmt.Errorf("payload is required for node %s", firstNode), Ctx: ctx} } - manager.ProcessTask(ctx, taskID, firstNode, payload) + manager.ProcessTask(ctx, firstNode, payload) return <-resultCh } diff --git a/dag/v2/task_manager.go b/dag/v2/task_manager.go index 1fb7e3b..6064e3b 100644 --- a/dag/v2/task_manager.go +++ b/dag/v2/task_manager.go @@ -21,7 +21,6 @@ type TaskState struct { type nodeResult struct { ctx context.Context - taskID string nodeID string result Result } @@ -30,6 +29,7 @@ type TaskManager struct { taskStates map[string]*TaskState currentNode string dag *DAG + taskID string mu sync.RWMutex taskQueue chan *Task resultQueue chan nodeResult @@ -52,12 +52,13 @@ func NewTask(ctx context.Context, taskID, nodeID string, payload json.RawMessage } } -func NewTaskManager(dag *DAG, resultCh chan Result) *TaskManager { +func NewTaskManager(dag *DAG, taskID string, resultCh chan Result) *TaskManager { tm := &TaskManager{ taskStates: make(map[string]*TaskState), taskQueue: make(chan *Task, 100), resultQueue: make(chan nodeResult, 100), resultCh: resultCh, + taskID: taskID, dag: dag, } go tm.Run() @@ -65,11 +66,11 @@ func NewTaskManager(dag *DAG, resultCh chan Result) *TaskManager { return tm } -func (tm *TaskManager) ProcessTask(ctx context.Context, taskID, startNode string, payload json.RawMessage) { +func (tm *TaskManager) ProcessTask(ctx context.Context, startNode string, payload json.RawMessage) { tm.mu.Lock() tm.taskStates[startNode] = newTaskState(startNode) tm.mu.Unlock() - tm.taskQueue <- NewTask(ctx, taskID, startNode, payload) + tm.taskQueue <- NewTask(ctx, tm.taskID, startNode, payload) } func newTaskState(nodeID string) *TaskState { @@ -120,7 +121,7 @@ func (tm *TaskManager) processNode(exec *Task) { tm.resultCh <- result return } - tm.resultQueue <- nodeResult{taskID: exec.taskID, nodeID: exec.nodeID, result: result, ctx: exec.ctx} + tm.resultQueue <- nodeResult{nodeID: exec.nodeID, result: result, ctx: exec.ctx} } func (tm *TaskManager) WaitForResult() { @@ -151,7 +152,7 @@ func (tm *TaskManager) onNodeCompleted(nodeResult nodeResult) { tm.mu.Unlock() if tm.areAllTargetNodesCompleted(parentNode.ID) && allTargetNodesDone { - tm.aggregateResults(parentNode.ID, nodeResult.taskID) + tm.aggregateResults(parentNode.ID) } } } @@ -163,7 +164,7 @@ func (tm *TaskManager) onNodeCompleted(nodeResult nodeResult) { tm.taskStates[edge.To.ID] = newTaskState(edge.To.ID) } tm.mu.Unlock() - tm.taskQueue <- NewTask(nodeResult.ctx, nodeResult.taskID, edge.To.ID, nodeResult.result.Data) + tm.taskQueue <- NewTask(nodeResult.ctx, tm.taskID, edge.To.ID, nodeResult.result.Data) } } @@ -183,7 +184,7 @@ func (tm *TaskManager) areAllTargetNodesCompleted(parentNodeID string) bool { return true } -func (tm *TaskManager) aggregateResults(parentNode string, taskID string) { +func (tm *TaskManager) aggregateResults(parentNode string) { tm.mu.Lock() defer tm.mu.Unlock() state := tm.taskStates[parentNode] @@ -201,10 +202,10 @@ func (tm *TaskManager) aggregateResults(parentNode string, taskID string) { state.Result = state.targetResults.Values()[0] } tm.resultCh <- state.Result - tm.processFinalResult(taskID, state) + tm.processFinalResult(state) } -func (tm *TaskManager) processFinalResult(taskID string, state *TaskState) { +func (tm *TaskManager) processFinalResult(state *TaskState) { state.targetResults.Clear() - tm.dag.finalResult(taskID, state.Result) + tm.dag.finalResult(tm.taskID, state.Result) } diff --git a/dag/v2/v2.go b/dag/v2/v2.go index 84a06e0..388178b 100644 --- a/dag/v2/v2.go +++ b/dag/v2/v2.go @@ -196,7 +196,7 @@ func processNode(w http.ResponseWriter, r *http.Request, task *Task, tm *TaskMan log.Printf("No ConditionStatus found, following edge to next Operation: %s", nextNodeID) } else { log.Printf("Task %s completed. Final result: %s", task.ID, task.FinalResult) - fmt.Fprintf(w, "

Process Completed

%s

", task.FinalResult) + fmt.Fprintf(w, "

Function Completed

%s

", task.FinalResult) return } } @@ -243,7 +243,7 @@ func submitHandler(w http.ResponseWriter, r *http.Request, tm *TaskManager) { nextNode, exists := tm.GetNextNode(task) if !exists { log.Printf("Task %s completed. Final result: %s", task.ID, task.FinalResult) - fmt.Fprintf(w, "

Process Completed

%s

", task.FinalResult) + fmt.Fprintf(w, "

Function Completed

%s

", task.FinalResult) return } switch nextNode := nextNode.(type) { diff --git a/examples/v2.go b/examples/v2.go index 9102843..3285c90 100644 --- a/examples/v2.go +++ b/examples/v2.go @@ -94,14 +94,14 @@ func notify(taskID string, result v2.Result) { func main() { dag := v2.NewDAG(notify) dag.AddNode(v2.Page, "Form", Form) - dag.AddNode(v2.Process, "NodeA", NodeA) - dag.AddNode(v2.Process, "NodeB", NodeB) - dag.AddNode(v2.Process, "NodeC", NodeC) + dag.AddNode(v2.Function, "NodeA", NodeA) + dag.AddNode(v2.Function, "NodeB", NodeB) + dag.AddNode(v2.Function, "NodeC", NodeC) dag.AddNode(v2.Page, "Result", Result) - // dag.AddEdge("Form", "NodeA") - dag.AddEdge("NodeA", "NodeB") - dag.AddEdge("NodeB", "NodeC") - // dag.AddEdge("NodeC", "Result") + dag.AddEdge(v2.Simple, "Form", "NodeA") + dag.AddEdge(v2.Simple, "NodeA", "NodeB") + dag.AddEdge(v2.Simple, "NodeB", "NodeC") + dag.AddEdge(v2.Simple, "NodeC", "Result") if dag.Error != nil { panic(dag.Error) }