From ce3a063bb888333793f853ed33744e2414b67115 Mon Sep 17 00:00:00 2001 From: sujit Date: Mon, 18 Nov 2024 15:31:45 +0545 Subject: [PATCH] feat: add task completion --- dag/v2/dag.go | 78 ++++++++++++++++++++++++++++++++++++++++++++++---- examples/v2.go | 57 +++++++++++++++++++++++------------- 2 files changed, 109 insertions(+), 26 deletions(-) diff --git a/dag/v2/dag.go b/dag/v2/dag.go index fd5d1d0..caa7c29 100644 --- a/dag/v2/dag.go +++ b/dag/v2/dag.go @@ -28,7 +28,17 @@ type Result struct { Status TaskStatus } +type NodeType int + +func (c NodeType) IsValid() bool { return c >= Process && c <= Page } + +const ( + Process NodeType = iota + Page +) + type Node struct { + Type NodeType ID string Handler func(ctx context.Context, payload json.RawMessage) Result Edges []Edge @@ -44,9 +54,9 @@ const ( ) type Edge struct { - From *Node - To *Node - EdgeType EdgeType + From *Node + To *Node + Type EdgeType } type DAG struct { @@ -55,6 +65,7 @@ type DAG struct { finalResult func(taskID string, result Result) mu sync.Mutex Error error + startNode string } func NewDAG(finalResultCallback func(taskID string, result Result)) *DAG { @@ -65,11 +76,54 @@ func NewDAG(finalResultCallback func(taskID string, result Result)) *DAG { } } -func (tm *DAG) AddNode(nodeID string, handler func(ctx context.Context, payload json.RawMessage) Result) *DAG { +func (tm *DAG) Validate(ctx context.Context) (string, error) { + return tm.parseInitialNode(ctx) +} + +func (tm *DAG) parseInitialNode(ctx context.Context) (string, error) { + val := ctx.Value("initial_node") + initialNode, ok := val.(string) + if ok { + return initialNode, nil + } + if tm.startNode == "" { + firstNode := tm.findStartNode() + if firstNode != nil { + tm.startNode = firstNode.ID + } + } + + if tm.startNode == "" { + return "", fmt.Errorf("initial node not found") + } + return tm.startNode, nil +} + +func (tm *DAG) findStartNode() *Node { + incomingEdges := make(map[string]bool) + connectedNodes := make(map[string]bool) + for _, node := range tm.nodes.AsMap() { + for _, edge := range node.Edges { + if edge.Type.IsValid() { + connectedNodes[node.ID] = true + connectedNodes[edge.To.ID] = true + incomingEdges[edge.To.ID] = true + } + } + } + for nodeID, node := range tm.nodes.AsMap() { + if !incomingEdges[nodeID] && connectedNodes[nodeID] { + return node + } + } + return nil +} + +func (tm *DAG) AddNode(nodeType NodeType, nodeID string, handler func(ctx context.Context, payload json.RawMessage) Result) *DAG { if tm.Error != nil { return tm } - tm.nodes.Set(nodeID, &Node{ID: nodeID, Handler: handler}) + tm.nodes.Set(nodeID, &Node{ID: nodeID, Handler: handler, Type: nodeType}) return tm } @@ -84,7 +138,7 @@ func (tm *DAG) AddEdge(from string, targets ...string) *DAG { } for _, target := range targets { if targetNode, ok := tm.nodes.Get(target); ok { - edge := Edge{From: node, To: targetNode, EdgeType: Simple} + edge := Edge{From: node, To: targetNode, Type: Simple} node.Edges = append(node.Edges, edge) } } @@ -153,6 +207,18 @@ func (tm *DAG) taskStatusHandler(w http.ResponseWriter, r *http.Request) { } func (tm *DAG) Start(addr string) { + http.HandleFunc("/", func(w http.ResponseWriter, request *http.Request) { + firstNode, err := tm.Validate(request.Context()) + if err != nil { + http.Error(w, `{"message": "taskID is missing"}`, http.StatusBadRequest) + return + } + node, _ := tm.nodes.Get(firstNode) + if node.Type == Page { + + } + w.Write([]byte(firstNode)) + }) http.HandleFunc("/form", tm.formHandler) http.HandleFunc("/result", tm.resultHandler) http.HandleFunc("/task-result", tm.taskStatusHandler) diff --git a/examples/v2.go b/examples/v2.go index f2ae51c..7fea36c 100644 --- a/examples/v2.go +++ b/examples/v2.go @@ -11,20 +11,37 @@ import ( ) func Form(ctx context.Context, payload json.RawMessage) v2.Result { - var data map[string]any - if err := json.Unmarshal(payload, &data); err != nil { - return v2.Result{Error: err, Status: v2.StatusFailed} - } - if templateFile, ok := data["html_content"].(string); ok { - parser := jet.NewWithMemory(jet.WithDelims("{{", "}}")) - rs, err := parser.ParseTemplate(templateFile, data) - if err != nil { - return v2.Result{Error: err, Status: v2.StatusFailed} - } - ctx = context.WithValue(ctx, "Content-Type", "text/html; charset/utf-8") - return v2.Result{Data: []byte(rs), Status: v2.StatusCompleted, Ctx: ctx} - } - return v2.Result{Data: payload, Status: v2.StatusCompleted} + template := []byte(` + + + + + + User Data Form + + +

Enter Your Information

+
+
+

+ +
+

+ +
+

+ + +
+ + + +`) + return v2.Result{Data: template, Status: v2.StatusCompleted} } func NodeA(ctx context.Context, payload json.RawMessage) v2.Result { @@ -80,12 +97,12 @@ func notify(taskID string, result v2.Result) { func main() { dag := v2.NewDAG(notify) - // dag.AddNode("Form", Form) - dag.AddNode("NodeA", NodeA) - dag.AddNode("NodeB", NodeB) - dag.AddNode("NodeC", NodeC) - dag.AddNode("Result", Result) - // dag.AddEdge("Form", "NodeA") + dag.AddNode(v2.Page, "Form", Form) + dag.AddNode(v2.Process, "NodeA", NodeA) + dag.AddNode(v2.Process, "NodeB", NodeB) + dag.AddNode(v2.Process, "NodeC", NodeC) + dag.AddNode(v2.Page, "Result", Result) + dag.AddEdge("Form", "NodeA") dag.AddEdge("NodeA", "NodeB") dag.AddEdge("NodeB", "NodeC") dag.AddEdge("NodeC", "Result")