mirror of
https://github.com/oarkflow/mq.git
synced 2025-10-23 18:33:14 +08:00
fix: ui for DOT
This commit is contained in:
@@ -277,7 +277,7 @@ func (tm *DAG) AddCondition(fromNode FromNode, conditions map[When]Then) *DAG {
|
|||||||
return tm
|
return tm
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tm *DAG) AddLoop(label, from string, targets ...string) *DAG {
|
func (tm *DAG) AddIterator(label, from string, targets ...string) *DAG {
|
||||||
tm.addEdge(Iterator, label, from, targets...)
|
tm.addEdge(Iterator, label, from, targets...)
|
||||||
return tm
|
return tm
|
||||||
}
|
}
|
||||||
|
67
dag/ui.go
67
dag/ui.go
@@ -10,7 +10,6 @@ import (
|
|||||||
func (tm *DAG) PrintGraph() {
|
func (tm *DAG) PrintGraph() {
|
||||||
tm.mu.RLock()
|
tm.mu.RLock()
|
||||||
defer tm.mu.RUnlock()
|
defer tm.mu.RUnlock()
|
||||||
|
|
||||||
fmt.Println("DAG Graph structure:")
|
fmt.Println("DAG Graph structure:")
|
||||||
for _, node := range tm.nodes {
|
for _, node := range tm.nodes {
|
||||||
fmt.Printf("Node: %s (%s) -> ", node.Name, node.Key)
|
fmt.Printf("Node: %s (%s) -> ", node.Name, node.Key)
|
||||||
@@ -23,13 +22,13 @@ func (tm *DAG) PrintGraph() {
|
|||||||
}
|
}
|
||||||
fmt.Println(strings.Join(c, ", "))
|
fmt.Println(strings.Join(c, ", "))
|
||||||
}
|
}
|
||||||
var c []string
|
var edges []string
|
||||||
for _, edge := range node.Edges {
|
for _, edge := range node.Edges {
|
||||||
for _, target := range edge.To {
|
for _, target := range edge.To {
|
||||||
c = append(c, fmt.Sprintf("%s (%s)", target.Name, target.Key))
|
edges = append(edges, fmt.Sprintf("%s (%s)", target.Name, target.Key))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
fmt.Println(strings.Join(c, ", "))
|
fmt.Println(strings.Join(edges, ", "))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -129,26 +128,20 @@ func (tm *DAG) saveImage(fileName string, arg string) error {
|
|||||||
func (tm *DAG) ExportDOT() string {
|
func (tm *DAG) ExportDOT() string {
|
||||||
var sb strings.Builder
|
var sb strings.Builder
|
||||||
sb.WriteString(fmt.Sprintf("digraph \"%s\" {\n", tm.name))
|
sb.WriteString(fmt.Sprintf("digraph \"%s\" {\n", tm.name))
|
||||||
sb.WriteString(" node [shape=box, style=\"rounded,filled\", fillcolor=lightgray, fontname=\"Helvetica\"];\n")
|
sb.WriteString(" bgcolor=\"lightyellow\";\n")
|
||||||
|
sb.WriteString(fmt.Sprintf(" label=\"%s\";\n", tm.name))
|
||||||
|
sb.WriteString(" labelloc=\"t\";\n")
|
||||||
|
sb.WriteString(" fontsize=20;\n")
|
||||||
|
sb.WriteString(" node [shape=box, style=\"rounded,filled\", fillcolor=\"lightgray\", fontname=\"Arial\", margin=\"0.2,0.1\"];\n")
|
||||||
|
sb.WriteString(" edge [fontname=\"Arial\", fontsize=12, arrowsize=0.8];\n")
|
||||||
|
sb.WriteString(" size=\"10,10\";\n")
|
||||||
|
sb.WriteString(" ratio=\"fill\";\n")
|
||||||
sortedNodes := tm.TopologicalSort()
|
sortedNodes := tm.TopologicalSort()
|
||||||
|
|
||||||
// Export nodes
|
|
||||||
for _, nodeKey := range sortedNodes {
|
for _, nodeKey := range sortedNodes {
|
||||||
node := tm.nodes[nodeKey]
|
node := tm.nodes[nodeKey]
|
||||||
nodeColor := "lightblue"
|
nodeColor := "lightblue"
|
||||||
sb.WriteString(fmt.Sprintf(" \"%s\" [label=\"%s\", fillcolor=\"%s\"];\n", node.Key, node.Name, nodeColor))
|
sb.WriteString(fmt.Sprintf(" \"%s\" [label=\"%s\", fillcolor=\"%s\"];\n", node.Key, node.Name, nodeColor))
|
||||||
|
|
||||||
// If the node has a sub-DAG, export it
|
|
||||||
if subDAG, ok := node.processor.(*DAG); ok && subDAG != nil {
|
|
||||||
subDAGName := fmt.Sprintf("%s_sub", node.Key)
|
|
||||||
sb.WriteString(fmt.Sprintf(" subgraph \"%s\" {\n", subDAGName))
|
|
||||||
sb.WriteString(fmt.Sprintf(" label=\"%s\"\n", node.Name))
|
|
||||||
sb.WriteString(subDAG.ExportDOT()) // Export the sub-DAG
|
|
||||||
sb.WriteString(" }\n")
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// Export edges
|
|
||||||
for _, nodeKey := range sortedNodes {
|
for _, nodeKey := range sortedNodes {
|
||||||
node := tm.nodes[nodeKey]
|
node := tm.nodes[nodeKey]
|
||||||
for _, edge := range node.Edges {
|
for _, edge := range node.Edges {
|
||||||
@@ -161,16 +154,39 @@ func (tm *DAG) ExportDOT() string {
|
|||||||
}
|
}
|
||||||
edgeColor := "black"
|
edgeColor := "black"
|
||||||
for _, to := range edge.To {
|
for _, to := range edge.To {
|
||||||
sb.WriteString(fmt.Sprintf(" \"%s\" -> \"%s\" [label=\"%s\", color=\"%s\", style=%s];\n", node.Key, to.Key, edge.Label, edgeColor, edgeStyle))
|
sb.WriteString(fmt.Sprintf(" \"%s\" -> \"%s\" [label=\"%s\", color=\"%s\", style=%s, fontsize=10, arrowsize=0.6];\n", node.Key, to.Key, edge.Label, edgeColor, edgeStyle))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Handle conditional edges
|
|
||||||
for fromNodeKey, conditions := range tm.conditions {
|
for fromNodeKey, conditions := range tm.conditions {
|
||||||
for when, then := range conditions {
|
for when, then := range conditions {
|
||||||
if toNode, ok := tm.nodes[string(then)]; ok {
|
if toNode, ok := tm.nodes[string(then)]; ok {
|
||||||
sb.WriteString(fmt.Sprintf(" \"%s\" -> \"%s\" [label=\"%s\", color=\"purple\", style=dotted];\n", fromNodeKey, toNode.Key, when))
|
sb.WriteString(fmt.Sprintf(" \"%s\" -> \"%s\" [label=\"%s\", color=\"purple\", style=dotted, fontsize=10, arrowsize=0.6];\n", fromNodeKey, toNode.Key, when))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for _, nodeKey := range sortedNodes {
|
||||||
|
node := tm.nodes[nodeKey]
|
||||||
|
if node.processor != nil {
|
||||||
|
subDAG, _ := isDAGNode(node)
|
||||||
|
if subDAG != nil {
|
||||||
|
sb.WriteString(fmt.Sprintf(" subgraph \"cluster_%s\" {\n", subDAG.name))
|
||||||
|
sb.WriteString(" label=\"Sub DAG\";\n")
|
||||||
|
sb.WriteString(" style=dashed;\n")
|
||||||
|
sb.WriteString(" bgcolor=\"lightgray\";\n")
|
||||||
|
sb.WriteString(" node [shape=rectangle, style=\"filled\", fillcolor=\"lightblue\", fontname=\"Arial\", margin=\"0.2,0.1\"];\n")
|
||||||
|
for subNodeKey, subNode := range subDAG.nodes {
|
||||||
|
sb.WriteString(fmt.Sprintf(" \"%s\" [label=\"%s\"];\n", subNodeKey, subNode.Name))
|
||||||
|
}
|
||||||
|
for subNodeKey, subNode := range subDAG.nodes {
|
||||||
|
for _, edge := range subNode.Edges {
|
||||||
|
for _, to := range edge.To {
|
||||||
|
sb.WriteString(fmt.Sprintf(" \"%s\" -> \"%s\" [label=\"%s\", color=\"black\", style=solid, arrowsize=0.6];\n", subNodeKey, to.Key, edge.Label))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sb.WriteString(" }\n")
|
||||||
|
sb.WriteString(fmt.Sprintf(" \"%s\" -> \"%s\" [label=\"Sub DAG Entry\", color=\"black\", style=solid, arrowsize=0.6];\n", node.Key, subDAG.startNode))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -178,6 +194,13 @@ func (tm *DAG) ExportDOT() string {
|
|||||||
return sb.String()
|
return sb.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (tm *DAG) getSubDAG(nodeKey string) (*DAG, bool) {
|
||||||
|
if node, ok := tm.nodes[nodeKey]; ok {
|
||||||
|
return isDAGNode(node)
|
||||||
|
}
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
|
||||||
func (tm *DAG) TopologicalSort() []string {
|
func (tm *DAG) TopologicalSort() []string {
|
||||||
visited := make(map[string]bool)
|
visited := make(map[string]bool)
|
||||||
stack := []string{}
|
stack := []string{}
|
||||||
|
@@ -4,59 +4,47 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/oarkflow/mq/consts"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
"github.com/oarkflow/mq/consts"
|
|
||||||
"github.com/oarkflow/mq/examples/tasks"
|
"github.com/oarkflow/mq/examples/tasks"
|
||||||
"github.com/oarkflow/mq/services"
|
|
||||||
|
|
||||||
"github.com/oarkflow/mq"
|
"github.com/oarkflow/mq"
|
||||||
"github.com/oarkflow/mq/dag"
|
"github.com/oarkflow/mq/dag"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
sync()
|
d := dag.NewDAG(
|
||||||
async()
|
"Sample DAG",
|
||||||
}
|
"sample-dag",
|
||||||
|
mq.WithSyncMode(true),
|
||||||
|
mq.WithNotifyResponse(tasks.NotifyResponse),
|
||||||
|
)
|
||||||
|
subDag := dag.NewDAG(
|
||||||
|
"Sub DAG",
|
||||||
|
"D",
|
||||||
|
mq.WithNotifyResponse(tasks.NotifySubDAGResponse),
|
||||||
|
)
|
||||||
|
subDag.AddNode("I", "I", &tasks.Node4{}, true)
|
||||||
|
subDag.AddNode("F", "F", &tasks.Node6{})
|
||||||
|
subDag.AddNode("G", "G", &tasks.Node7{})
|
||||||
|
subDag.AddNode("H", "H", &tasks.Node8{})
|
||||||
|
subDag.AddEdge("Label 2", "I", "F")
|
||||||
|
subDag.AddEdge("Label 4", "F", "G", "H")
|
||||||
|
|
||||||
func setup(f *dag.DAG) {
|
d.AddNode("A", "A", &tasks.Node1{}, true)
|
||||||
f.
|
d.AddNode("B", "B", &tasks.Node2{})
|
||||||
AddNode("Email Delivery", "email:deliver", &tasks.EmailDelivery{Operation: services.Operation{Type: "process"}}).
|
d.AddNode("C", "C", &tasks.Node3{})
|
||||||
AddNode("Prepare Email", "prepare:email", &tasks.PrepareEmail{Operation: services.Operation{Type: "process"}}).
|
d.AddDAGNode("D", "D", subDag)
|
||||||
AddNode("Get Input", "get:input", &tasks.GetData{Operation: services.Operation{Type: "input"}}, true).
|
d.AddNode("E", "E", &tasks.Node5{})
|
||||||
AddNode("Iterator Processor", "loop", &tasks.Loop{Operation: services.Operation{Type: "loop"}}).
|
d.AddIterator("Send each item", "A", "B")
|
||||||
AddNode("Condition", "condition", &tasks.Condition{Operation: services.Operation{Type: "condition"}}).
|
d.AddCondition("C", map[dag.When]dag.Then{"PASS": "D", "FAIL": "E"})
|
||||||
AddNode("Store data", "store:data", &tasks.StoreData{Operation: services.Operation{Type: "process"}}).
|
d.AddEdge("Label 1", "B", "C")
|
||||||
AddNode("Send SMS", "send:sms", &tasks.SendSms{Operation: services.Operation{Type: "process"}}).
|
|
||||||
AddNode("Notification", "notification", &tasks.InAppNotification{Operation: services.Operation{Type: "process"}}).
|
|
||||||
AddNode("Data Branch", "data-branch", &tasks.DataBranchHandler{Operation: services.Operation{Type: "condition"}}).
|
|
||||||
AddCondition("condition", map[dag.When]dag.Then{"pass": "email:deliver", "fail": "store:data"}).
|
|
||||||
AddEdge("Get input to loop", "get:input", "loop").
|
|
||||||
AddLoop("Loop to prepare email", "loop", "prepare:email").
|
|
||||||
AddEdge("Prepare Email to condition", "prepare:email", "condition").
|
|
||||||
AddEdge("Store Data to send sms and notification", "store:data", "send:sms", "notification")
|
|
||||||
}
|
|
||||||
|
|
||||||
func sendData(f *dag.DAG) {
|
// Classify edges
|
||||||
data := []map[string]any{
|
// d.ClassifyEdges()
|
||||||
{"phone": "+123456789", "email": "abc.xyz@gmail.com"}, {"phone": "+98765412", "email": "xyz.abc@gmail.com"},
|
fmt.Println(d.SaveSVG("dag.svg"))
|
||||||
}
|
|
||||||
bt, _ := json.Marshal(data)
|
|
||||||
result := f.Process(context.Background(), bt)
|
|
||||||
fmt.Println(string(result.Payload))
|
|
||||||
}
|
|
||||||
|
|
||||||
func sync() {
|
|
||||||
f := dag.NewDAG("Sample DAG", "sample-dag", mq.WithSyncMode(true), mq.WithNotifyResponse(tasks.NotifyResponse))
|
|
||||||
setup(f)
|
|
||||||
sendData(f)
|
|
||||||
fmt.Println(f.SaveSVG("dag.svg"))
|
|
||||||
}
|
|
||||||
|
|
||||||
func async() {
|
|
||||||
f := dag.NewDAG("Sample DAG", "sample-dag", mq.WithNotifyResponse(tasks.NotifyResponse))
|
|
||||||
setup(f)
|
|
||||||
|
|
||||||
requestHandler := func(requestType string) func(w http.ResponseWriter, r *http.Request) {
|
requestHandler := func(requestType string) func(w http.ResponseWriter, r *http.Request) {
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
@@ -82,7 +70,7 @@ func async() {
|
|||||||
ctx = mq.SetHeaders(ctx, map[string]string{consts.AwaitResponseKey: "true"})
|
ctx = mq.SetHeaders(ctx, map[string]string{consts.AwaitResponseKey: "true"})
|
||||||
}
|
}
|
||||||
// ctx = context.WithValue(ctx, "initial_node", "E")
|
// ctx = context.WithValue(ctx, "initial_node", "E")
|
||||||
rs := f.Process(ctx, payload)
|
rs := d.Process(ctx, payload)
|
||||||
if rs.Error != nil {
|
if rs.Error != nil {
|
||||||
http.Error(w, fmt.Sprintf("[DAG Error] - %v", rs.Error), http.StatusInternalServerError)
|
http.Error(w, fmt.Sprintf("[DAG Error] - %v", rs.Error), http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
@@ -97,22 +85,22 @@ func async() {
|
|||||||
http.HandleFunc("/pause-consumer/{id}", func(writer http.ResponseWriter, request *http.Request) {
|
http.HandleFunc("/pause-consumer/{id}", func(writer http.ResponseWriter, request *http.Request) {
|
||||||
id := request.PathValue("id")
|
id := request.PathValue("id")
|
||||||
if id != "" {
|
if id != "" {
|
||||||
f.PauseConsumer(request.Context(), id)
|
d.PauseConsumer(request.Context(), id)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
http.HandleFunc("/resume-consumer/{id}", func(writer http.ResponseWriter, request *http.Request) {
|
http.HandleFunc("/resume-consumer/{id}", func(writer http.ResponseWriter, request *http.Request) {
|
||||||
id := request.PathValue("id")
|
id := request.PathValue("id")
|
||||||
if id != "" {
|
if id != "" {
|
||||||
f.ResumeConsumer(request.Context(), id)
|
d.ResumeConsumer(request.Context(), id)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
http.HandleFunc("/pause", func(writer http.ResponseWriter, request *http.Request) {
|
http.HandleFunc("/pause", func(writer http.ResponseWriter, request *http.Request) {
|
||||||
f.Pause(request.Context())
|
d.Pause(request.Context())
|
||||||
})
|
})
|
||||||
http.HandleFunc("/resume", func(writer http.ResponseWriter, request *http.Request) {
|
http.HandleFunc("/resume", func(writer http.ResponseWriter, request *http.Request) {
|
||||||
f.Resume(request.Context())
|
d.Resume(request.Context())
|
||||||
})
|
})
|
||||||
err := f.Start(context.TODO(), ":8083")
|
err := d.Start(context.TODO(), ":8083")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
@@ -42,7 +42,7 @@ func main() {
|
|||||||
d.AddNode("C", "C", &tasks.Node3{})
|
d.AddNode("C", "C", &tasks.Node3{})
|
||||||
d.AddDAGNode("D", "D", subDag)
|
d.AddDAGNode("D", "D", subDag)
|
||||||
d.AddNode("E", "E", &tasks.Node5{})
|
d.AddNode("E", "E", &tasks.Node5{})
|
||||||
d.AddLoop("Send each item", "A", "B")
|
d.AddIterator("Send each item", "A", "B")
|
||||||
d.AddCondition("C", map[dag.When]dag.Then{"PASS": "D", "FAIL": "E"})
|
d.AddCondition("C", map[dag.When]dag.Then{"PASS": "D", "FAIL": "E"})
|
||||||
d.AddEdge("Label 1", "B", "C")
|
d.AddEdge("Label 1", "B", "C")
|
||||||
// Classify edges
|
// Classify edges
|
||||||
|
Reference in New Issue
Block a user