feat: [wip] - Implement html node

This commit is contained in:
sujit
2024-11-19 00:03:16 +05:45
parent 043b622bb2
commit e79620f341
4 changed files with 27 additions and 28 deletions

View File

@@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"sync"
"github.com/oarkflow/mq"
"github.com/oarkflow/mq/storage"
@@ -29,10 +28,10 @@ type Result struct {
type NodeType int
func (c NodeType) IsValid() bool { return c >= Process && c <= Page }
func (c NodeType) IsValid() bool { return c >= Function && c <= Page }
const (
Process NodeType = iota
Function NodeType = iota
Page
)
@@ -62,7 +61,6 @@ type DAG struct {
nodes storage.IMap[string, *Node]
taskManager storage.IMap[string, *TaskManager]
finalResult func(taskID string, result Result)
mu sync.Mutex
Error error
startNode string
}
@@ -122,7 +120,7 @@ func (tm *DAG) AddNode(nodeType NodeType, nodeID string, handler func(ctx contex
return tm
}
func (tm *DAG) AddEdge(from string, targets ...string) *DAG {
func (tm *DAG) AddEdge(edgeType EdgeType, from string, targets ...string) *DAG {
if tm.Error != nil {
return tm
}
@@ -133,7 +131,7 @@ func (tm *DAG) AddEdge(from string, targets ...string) *DAG {
}
for _, target := range targets {
if targetNode, ok := tm.nodes.Get(target); ok {
edge := Edge{From: node, To: targetNode, Type: Simple}
edge := Edge{From: node, To: targetNode, Type: edgeType}
node.Edges = append(node.Edges, edge)
}
}
@@ -179,7 +177,7 @@ func (tm *DAG) ProcessTask(ctx context.Context, payload []byte) Result {
manager, ok := tm.taskManager.Get(taskID)
resultCh := make(chan Result, 1)
if !ok {
manager = NewTaskManager(tm, resultCh)
manager = NewTaskManager(tm, taskID, resultCh)
tm.taskManager.Set(taskID, manager)
} else {
manager.resultCh = resultCh
@@ -201,6 +199,6 @@ func (tm *DAG) ProcessTask(ctx context.Context, payload []byte) Result {
if ok && node.Type != Page && payload == nil {
return Result{Error: fmt.Errorf("payload is required for node %s", firstNode), Ctx: ctx}
}
manager.ProcessTask(ctx, taskID, firstNode, payload)
manager.ProcessTask(ctx, firstNode, payload)
return <-resultCh
}

View File

@@ -21,7 +21,6 @@ type TaskState struct {
type nodeResult struct {
ctx context.Context
taskID string
nodeID string
result Result
}
@@ -30,6 +29,7 @@ type TaskManager struct {
taskStates map[string]*TaskState
currentNode string
dag *DAG
taskID string
mu sync.RWMutex
taskQueue chan *Task
resultQueue chan nodeResult
@@ -52,12 +52,13 @@ func NewTask(ctx context.Context, taskID, nodeID string, payload json.RawMessage
}
}
func NewTaskManager(dag *DAG, resultCh chan Result) *TaskManager {
func NewTaskManager(dag *DAG, taskID string, resultCh chan Result) *TaskManager {
tm := &TaskManager{
taskStates: make(map[string]*TaskState),
taskQueue: make(chan *Task, 100),
resultQueue: make(chan nodeResult, 100),
resultCh: resultCh,
taskID: taskID,
dag: dag,
}
go tm.Run()
@@ -65,11 +66,11 @@ func NewTaskManager(dag *DAG, resultCh chan Result) *TaskManager {
return tm
}
func (tm *TaskManager) ProcessTask(ctx context.Context, taskID, startNode string, payload json.RawMessage) {
func (tm *TaskManager) ProcessTask(ctx context.Context, startNode string, payload json.RawMessage) {
tm.mu.Lock()
tm.taskStates[startNode] = newTaskState(startNode)
tm.mu.Unlock()
tm.taskQueue <- NewTask(ctx, taskID, startNode, payload)
tm.taskQueue <- NewTask(ctx, tm.taskID, startNode, payload)
}
func newTaskState(nodeID string) *TaskState {
@@ -120,7 +121,7 @@ func (tm *TaskManager) processNode(exec *Task) {
tm.resultCh <- result
return
}
tm.resultQueue <- nodeResult{taskID: exec.taskID, nodeID: exec.nodeID, result: result, ctx: exec.ctx}
tm.resultQueue <- nodeResult{nodeID: exec.nodeID, result: result, ctx: exec.ctx}
}
func (tm *TaskManager) WaitForResult() {
@@ -151,7 +152,7 @@ func (tm *TaskManager) onNodeCompleted(nodeResult nodeResult) {
tm.mu.Unlock()
if tm.areAllTargetNodesCompleted(parentNode.ID) && allTargetNodesDone {
tm.aggregateResults(parentNode.ID, nodeResult.taskID)
tm.aggregateResults(parentNode.ID)
}
}
}
@@ -163,7 +164,7 @@ func (tm *TaskManager) onNodeCompleted(nodeResult nodeResult) {
tm.taskStates[edge.To.ID] = newTaskState(edge.To.ID)
}
tm.mu.Unlock()
tm.taskQueue <- NewTask(nodeResult.ctx, nodeResult.taskID, edge.To.ID, nodeResult.result.Data)
tm.taskQueue <- NewTask(nodeResult.ctx, tm.taskID, edge.To.ID, nodeResult.result.Data)
}
}
@@ -183,7 +184,7 @@ func (tm *TaskManager) areAllTargetNodesCompleted(parentNodeID string) bool {
return true
}
func (tm *TaskManager) aggregateResults(parentNode string, taskID string) {
func (tm *TaskManager) aggregateResults(parentNode string) {
tm.mu.Lock()
defer tm.mu.Unlock()
state := tm.taskStates[parentNode]
@@ -201,10 +202,10 @@ func (tm *TaskManager) aggregateResults(parentNode string, taskID string) {
state.Result = state.targetResults.Values()[0]
}
tm.resultCh <- state.Result
tm.processFinalResult(taskID, state)
tm.processFinalResult(state)
}
func (tm *TaskManager) processFinalResult(taskID string, state *TaskState) {
func (tm *TaskManager) processFinalResult(state *TaskState) {
state.targetResults.Clear()
tm.dag.finalResult(taskID, state.Result)
tm.dag.finalResult(tm.taskID, state.Result)
}

View File

@@ -196,7 +196,7 @@ func processNode(w http.ResponseWriter, r *http.Request, task *Task, tm *TaskMan
log.Printf("No ConditionStatus found, following edge to next Operation: %s", nextNodeID)
} else {
log.Printf("Task %s completed. Final result: %s", task.ID, task.FinalResult)
fmt.Fprintf(w, "<html><body><h1>Process Completed</h1><p>%s</p></body></html>", task.FinalResult)
fmt.Fprintf(w, "<html><body><h1>Function Completed</h1><p>%s</p></body></html>", task.FinalResult)
return
}
}
@@ -243,7 +243,7 @@ func submitHandler(w http.ResponseWriter, r *http.Request, tm *TaskManager) {
nextNode, exists := tm.GetNextNode(task)
if !exists {
log.Printf("Task %s completed. Final result: %s", task.ID, task.FinalResult)
fmt.Fprintf(w, "<html><body><h1>Process Completed</h1><p>%s</p></body></html>", task.FinalResult)
fmt.Fprintf(w, "<html><body><h1>Function Completed</h1><p>%s</p></body></html>", task.FinalResult)
return
}
switch nextNode := nextNode.(type) {

View File

@@ -94,14 +94,14 @@ func notify(taskID string, result v2.Result) {
func main() {
dag := v2.NewDAG(notify)
dag.AddNode(v2.Page, "Form", Form)
dag.AddNode(v2.Process, "NodeA", NodeA)
dag.AddNode(v2.Process, "NodeB", NodeB)
dag.AddNode(v2.Process, "NodeC", NodeC)
dag.AddNode(v2.Function, "NodeA", NodeA)
dag.AddNode(v2.Function, "NodeB", NodeB)
dag.AddNode(v2.Function, "NodeC", NodeC)
dag.AddNode(v2.Page, "Result", Result)
// dag.AddEdge("Form", "NodeA")
dag.AddEdge("NodeA", "NodeB")
dag.AddEdge("NodeB", "NodeC")
// dag.AddEdge("NodeC", "Result")
dag.AddEdge(v2.Simple, "Form", "NodeA")
dag.AddEdge(v2.Simple, "NodeA", "NodeB")
dag.AddEdge(v2.Simple, "NodeB", "NodeC")
dag.AddEdge(v2.Simple, "NodeC", "Result")
if dag.Error != nil {
panic(dag.Error)
}