From f231610c3a9315ca043d647228140c730361a7e4 Mon Sep 17 00:00:00 2001 From: Oarkflow Date: Wed, 16 Oct 2024 18:34:57 +0545 Subject: [PATCH] fix: ui for DOT --- dag/dag.go | 2 +- dag/ui.go | 67 ++++++++++++++++++++++++------------- examples/dag.go | 82 ++++++++++++++++++++-------------------------- examples/subdag.go | 2 +- 4 files changed, 82 insertions(+), 71 deletions(-) diff --git a/dag/dag.go b/dag/dag.go index dc985d4..f0976da 100644 --- a/dag/dag.go +++ b/dag/dag.go @@ -277,7 +277,7 @@ func (tm *DAG) AddCondition(fromNode FromNode, conditions map[When]Then) *DAG { return tm } -func (tm *DAG) AddLoop(label, from string, targets ...string) *DAG { +func (tm *DAG) AddIterator(label, from string, targets ...string) *DAG { tm.addEdge(Iterator, label, from, targets...) return tm } diff --git a/dag/ui.go b/dag/ui.go index e04ef21..2491bdb 100644 --- a/dag/ui.go +++ b/dag/ui.go @@ -10,7 +10,6 @@ import ( func (tm *DAG) PrintGraph() { tm.mu.RLock() defer tm.mu.RUnlock() - fmt.Println("DAG Graph structure:") for _, node := range tm.nodes { fmt.Printf("Node: %s (%s) -> ", node.Name, node.Key) @@ -23,13 +22,13 @@ func (tm *DAG) PrintGraph() { } fmt.Println(strings.Join(c, ", ")) } - var c []string + var edges []string for _, edge := range node.Edges { for _, target := range edge.To { - c = append(c, fmt.Sprintf("%s (%s)", target.Name, target.Key)) + edges = append(edges, fmt.Sprintf("%s (%s)", target.Name, target.Key)) } } - fmt.Println(strings.Join(c, ", ")) + fmt.Println(strings.Join(edges, ", ")) } } @@ -129,26 +128,20 @@ func (tm *DAG) saveImage(fileName string, arg string) error { func (tm *DAG) ExportDOT() string { var sb strings.Builder sb.WriteString(fmt.Sprintf("digraph \"%s\" {\n", tm.name)) - sb.WriteString(" node [shape=box, style=\"rounded,filled\", fillcolor=lightgray, fontname=\"Helvetica\"];\n") + sb.WriteString(" bgcolor=\"lightyellow\";\n") + sb.WriteString(fmt.Sprintf(" label=\"%s\";\n", tm.name)) + sb.WriteString(" labelloc=\"t\";\n") + sb.WriteString(" fontsize=20;\n") + sb.WriteString(" node [shape=box, style=\"rounded,filled\", fillcolor=\"lightgray\", fontname=\"Arial\", margin=\"0.2,0.1\"];\n") + sb.WriteString(" edge [fontname=\"Arial\", fontsize=12, arrowsize=0.8];\n") + sb.WriteString(" size=\"10,10\";\n") + sb.WriteString(" ratio=\"fill\";\n") sortedNodes := tm.TopologicalSort() - - // Export nodes for _, nodeKey := range sortedNodes { node := tm.nodes[nodeKey] nodeColor := "lightblue" sb.WriteString(fmt.Sprintf(" \"%s\" [label=\"%s\", fillcolor=\"%s\"];\n", node.Key, node.Name, nodeColor)) - - // If the node has a sub-DAG, export it - if subDAG, ok := node.processor.(*DAG); ok && subDAG != nil { - subDAGName := fmt.Sprintf("%s_sub", node.Key) - sb.WriteString(fmt.Sprintf(" subgraph \"%s\" {\n", subDAGName)) - sb.WriteString(fmt.Sprintf(" label=\"%s\"\n", node.Name)) - sb.WriteString(subDAG.ExportDOT()) // Export the sub-DAG - sb.WriteString(" }\n") - } } - - // Export edges for _, nodeKey := range sortedNodes { node := tm.nodes[nodeKey] for _, edge := range node.Edges { @@ -161,16 +154,39 @@ func (tm *DAG) ExportDOT() string { } edgeColor := "black" for _, to := range edge.To { - sb.WriteString(fmt.Sprintf(" \"%s\" -> \"%s\" [label=\"%s\", color=\"%s\", style=%s];\n", node.Key, to.Key, edge.Label, edgeColor, edgeStyle)) + sb.WriteString(fmt.Sprintf(" \"%s\" -> \"%s\" [label=\"%s\", color=\"%s\", style=%s, fontsize=10, arrowsize=0.6];\n", node.Key, to.Key, edge.Label, edgeColor, edgeStyle)) } } } - - // Handle conditional edges for fromNodeKey, conditions := range tm.conditions { for when, then := range conditions { if toNode, ok := tm.nodes[string(then)]; ok { - sb.WriteString(fmt.Sprintf(" \"%s\" -> \"%s\" [label=\"%s\", color=\"purple\", style=dotted];\n", fromNodeKey, toNode.Key, when)) + sb.WriteString(fmt.Sprintf(" \"%s\" -> \"%s\" [label=\"%s\", color=\"purple\", style=dotted, fontsize=10, arrowsize=0.6];\n", fromNodeKey, toNode.Key, when)) + } + } + } + for _, nodeKey := range sortedNodes { + node := tm.nodes[nodeKey] + if node.processor != nil { + subDAG, _ := isDAGNode(node) + if subDAG != nil { + sb.WriteString(fmt.Sprintf(" subgraph \"cluster_%s\" {\n", subDAG.name)) + sb.WriteString(" label=\"Sub DAG\";\n") + sb.WriteString(" style=dashed;\n") + sb.WriteString(" bgcolor=\"lightgray\";\n") + sb.WriteString(" node [shape=rectangle, style=\"filled\", fillcolor=\"lightblue\", fontname=\"Arial\", margin=\"0.2,0.1\"];\n") + for subNodeKey, subNode := range subDAG.nodes { + sb.WriteString(fmt.Sprintf(" \"%s\" [label=\"%s\"];\n", subNodeKey, subNode.Name)) + } + for subNodeKey, subNode := range subDAG.nodes { + for _, edge := range subNode.Edges { + for _, to := range edge.To { + sb.WriteString(fmt.Sprintf(" \"%s\" -> \"%s\" [label=\"%s\", color=\"black\", style=solid, arrowsize=0.6];\n", subNodeKey, to.Key, edge.Label)) + } + } + } + sb.WriteString(" }\n") + sb.WriteString(fmt.Sprintf(" \"%s\" -> \"%s\" [label=\"Sub DAG Entry\", color=\"black\", style=solid, arrowsize=0.6];\n", node.Key, subDAG.startNode)) } } } @@ -178,6 +194,13 @@ func (tm *DAG) ExportDOT() string { return sb.String() } +func (tm *DAG) getSubDAG(nodeKey string) (*DAG, bool) { + if node, ok := tm.nodes[nodeKey]; ok { + return isDAGNode(node) + } + return nil, false +} + func (tm *DAG) TopologicalSort() []string { visited := make(map[string]bool) stack := []string{} diff --git a/examples/dag.go b/examples/dag.go index 05ee749..dbca585 100644 --- a/examples/dag.go +++ b/examples/dag.go @@ -4,59 +4,47 @@ import ( "context" "encoding/json" "fmt" + "github.com/oarkflow/mq/consts" "io" "net/http" - "github.com/oarkflow/mq/consts" "github.com/oarkflow/mq/examples/tasks" - "github.com/oarkflow/mq/services" "github.com/oarkflow/mq" "github.com/oarkflow/mq/dag" ) func main() { - sync() - async() -} + d := dag.NewDAG( + "Sample DAG", + "sample-dag", + mq.WithSyncMode(true), + mq.WithNotifyResponse(tasks.NotifyResponse), + ) + subDag := dag.NewDAG( + "Sub DAG", + "D", + mq.WithNotifyResponse(tasks.NotifySubDAGResponse), + ) + subDag.AddNode("I", "I", &tasks.Node4{}, true) + subDag.AddNode("F", "F", &tasks.Node6{}) + subDag.AddNode("G", "G", &tasks.Node7{}) + subDag.AddNode("H", "H", &tasks.Node8{}) + subDag.AddEdge("Label 2", "I", "F") + subDag.AddEdge("Label 4", "F", "G", "H") -func setup(f *dag.DAG) { - f. - AddNode("Email Delivery", "email:deliver", &tasks.EmailDelivery{Operation: services.Operation{Type: "process"}}). - AddNode("Prepare Email", "prepare:email", &tasks.PrepareEmail{Operation: services.Operation{Type: "process"}}). - AddNode("Get Input", "get:input", &tasks.GetData{Operation: services.Operation{Type: "input"}}, true). - AddNode("Iterator Processor", "loop", &tasks.Loop{Operation: services.Operation{Type: "loop"}}). - AddNode("Condition", "condition", &tasks.Condition{Operation: services.Operation{Type: "condition"}}). - AddNode("Store data", "store:data", &tasks.StoreData{Operation: services.Operation{Type: "process"}}). - AddNode("Send SMS", "send:sms", &tasks.SendSms{Operation: services.Operation{Type: "process"}}). - AddNode("Notification", "notification", &tasks.InAppNotification{Operation: services.Operation{Type: "process"}}). - AddNode("Data Branch", "data-branch", &tasks.DataBranchHandler{Operation: services.Operation{Type: "condition"}}). - AddCondition("condition", map[dag.When]dag.Then{"pass": "email:deliver", "fail": "store:data"}). - AddEdge("Get input to loop", "get:input", "loop"). - AddLoop("Loop to prepare email", "loop", "prepare:email"). - AddEdge("Prepare Email to condition", "prepare:email", "condition"). - AddEdge("Store Data to send sms and notification", "store:data", "send:sms", "notification") -} + d.AddNode("A", "A", &tasks.Node1{}, true) + d.AddNode("B", "B", &tasks.Node2{}) + d.AddNode("C", "C", &tasks.Node3{}) + d.AddDAGNode("D", "D", subDag) + d.AddNode("E", "E", &tasks.Node5{}) + d.AddIterator("Send each item", "A", "B") + d.AddCondition("C", map[dag.When]dag.Then{"PASS": "D", "FAIL": "E"}) + d.AddEdge("Label 1", "B", "C") -func sendData(f *dag.DAG) { - data := []map[string]any{ - {"phone": "+123456789", "email": "abc.xyz@gmail.com"}, {"phone": "+98765412", "email": "xyz.abc@gmail.com"}, - } - bt, _ := json.Marshal(data) - result := f.Process(context.Background(), bt) - fmt.Println(string(result.Payload)) -} - -func sync() { - f := dag.NewDAG("Sample DAG", "sample-dag", mq.WithSyncMode(true), mq.WithNotifyResponse(tasks.NotifyResponse)) - setup(f) - sendData(f) - fmt.Println(f.SaveSVG("dag.svg")) -} - -func async() { - f := dag.NewDAG("Sample DAG", "sample-dag", mq.WithNotifyResponse(tasks.NotifyResponse)) - setup(f) + // Classify edges + // d.ClassifyEdges() + fmt.Println(d.SaveSVG("dag.svg")) requestHandler := func(requestType string) func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) { @@ -82,7 +70,7 @@ func async() { ctx = mq.SetHeaders(ctx, map[string]string{consts.AwaitResponseKey: "true"}) } // ctx = context.WithValue(ctx, "initial_node", "E") - rs := f.Process(ctx, payload) + rs := d.Process(ctx, payload) if rs.Error != nil { http.Error(w, fmt.Sprintf("[DAG Error] - %v", rs.Error), http.StatusInternalServerError) return @@ -97,22 +85,22 @@ func async() { http.HandleFunc("/pause-consumer/{id}", func(writer http.ResponseWriter, request *http.Request) { id := request.PathValue("id") if id != "" { - f.PauseConsumer(request.Context(), id) + d.PauseConsumer(request.Context(), id) } }) http.HandleFunc("/resume-consumer/{id}", func(writer http.ResponseWriter, request *http.Request) { id := request.PathValue("id") if id != "" { - f.ResumeConsumer(request.Context(), id) + d.ResumeConsumer(request.Context(), id) } }) http.HandleFunc("/pause", func(writer http.ResponseWriter, request *http.Request) { - f.Pause(request.Context()) + d.Pause(request.Context()) }) http.HandleFunc("/resume", func(writer http.ResponseWriter, request *http.Request) { - f.Resume(request.Context()) + d.Resume(request.Context()) }) - err := f.Start(context.TODO(), ":8083") + err := d.Start(context.TODO(), ":8083") if err != nil { panic(err) } diff --git a/examples/subdag.go b/examples/subdag.go index ff372ad..bfdfcb2 100644 --- a/examples/subdag.go +++ b/examples/subdag.go @@ -42,7 +42,7 @@ func main() { d.AddNode("C", "C", &tasks.Node3{}) d.AddDAGNode("D", "D", subDag) d.AddNode("E", "E", &tasks.Node5{}) - d.AddLoop("Send each item", "A", "B") + d.AddIterator("Send each item", "A", "B") d.AddCondition("C", map[dag.When]dag.Then{"PASS": "D", "FAIL": "E"}) d.AddEdge("Label 1", "B", "C") // Classify edges