mirror of
https://github.com/oarkflow/mq.git
synced 2025-10-05 16:06:55 +08:00
feat: add scheduler
This commit is contained in:
25
dag/ui.go
25
dag/ui.go
@@ -131,11 +131,24 @@ func (tm *DAG) ExportDOT() string {
|
|||||||
sb.WriteString(fmt.Sprintf("digraph \"%s\" {\n", tm.name))
|
sb.WriteString(fmt.Sprintf("digraph \"%s\" {\n", tm.name))
|
||||||
sb.WriteString(" node [shape=box, style=\"rounded,filled\", fillcolor=lightgray, fontname=\"Helvetica\"];\n")
|
sb.WriteString(" node [shape=box, style=\"rounded,filled\", fillcolor=lightgray, fontname=\"Helvetica\"];\n")
|
||||||
sortedNodes := tm.TopologicalSort()
|
sortedNodes := tm.TopologicalSort()
|
||||||
|
|
||||||
|
// Export nodes
|
||||||
for _, nodeKey := range sortedNodes {
|
for _, nodeKey := range sortedNodes {
|
||||||
node := tm.nodes[nodeKey]
|
node := tm.nodes[nodeKey]
|
||||||
nodeColor := "lightblue"
|
nodeColor := "lightblue"
|
||||||
sb.WriteString(fmt.Sprintf(" \"%s\" [label=\"%s\", fillcolor=\"%s\"];\n", node.Key, node.Name, nodeColor))
|
sb.WriteString(fmt.Sprintf(" \"%s\" [label=\"%s\", fillcolor=\"%s\"];\n", node.Key, node.Name, nodeColor))
|
||||||
|
|
||||||
|
// If the node has a sub-DAG, export it
|
||||||
|
if subDAG, ok := node.processor.(*DAG); ok && subDAG != nil {
|
||||||
|
subDAGName := fmt.Sprintf("%s_sub", node.Key)
|
||||||
|
sb.WriteString(fmt.Sprintf(" subgraph \"%s\" {\n", subDAGName))
|
||||||
|
sb.WriteString(fmt.Sprintf(" label=\"%s\"\n", node.Name))
|
||||||
|
sb.WriteString(subDAG.ExportDOT()) // Export the sub-DAG
|
||||||
|
sb.WriteString(" }\n")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Export edges
|
||||||
for _, nodeKey := range sortedNodes {
|
for _, nodeKey := range sortedNodes {
|
||||||
node := tm.nodes[nodeKey]
|
node := tm.nodes[nodeKey]
|
||||||
for _, edge := range node.Edges {
|
for _, edge := range node.Edges {
|
||||||
@@ -146,23 +159,17 @@ func (tm *DAG) ExportDOT() string {
|
|||||||
default:
|
default:
|
||||||
edgeStyle = "solid"
|
edgeStyle = "solid"
|
||||||
}
|
}
|
||||||
|
edgeColor := "black"
|
||||||
for _, to := range edge.To {
|
for _, to := range edge.To {
|
||||||
edgeColor := "black"
|
|
||||||
if edge.Label == "Iterate" {
|
|
||||||
edgeColor = "blue"
|
|
||||||
} else if edge.Label == "PASS" {
|
|
||||||
edgeColor = "green"
|
|
||||||
} else if edge.Label == "FAIL" {
|
|
||||||
edgeColor = "red"
|
|
||||||
}
|
|
||||||
sb.WriteString(fmt.Sprintf(" \"%s\" -> \"%s\" [label=\"%s\", color=\"%s\", style=%s];\n", node.Key, to.Key, edge.Label, edgeColor, edgeStyle))
|
sb.WriteString(fmt.Sprintf(" \"%s\" -> \"%s\" [label=\"%s\", color=\"%s\", style=%s];\n", node.Key, to.Key, edge.Label, edgeColor, edgeStyle))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Handle conditional edges
|
||||||
for fromNodeKey, conditions := range tm.conditions {
|
for fromNodeKey, conditions := range tm.conditions {
|
||||||
for when, then := range conditions {
|
for when, then := range conditions {
|
||||||
if toNode, ok := tm.nodes[string(then)]; ok {
|
if toNode, ok := tm.nodes[string(then)]; ok {
|
||||||
|
|
||||||
sb.WriteString(fmt.Sprintf(" \"%s\" -> \"%s\" [label=\"%s\", color=\"purple\", style=dotted];\n", fromNodeKey, toNode.Key, when))
|
sb.WriteString(fmt.Sprintf(" \"%s\" -> \"%s\" [label=\"%s\", color=\"purple\", style=dotted];\n", fromNodeKey, toNode.Key, when))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -51,6 +51,7 @@ func sync() {
|
|||||||
f := dag.NewDAG("Sample DAG", "sample-dag", mq.WithSyncMode(true), mq.WithNotifyResponse(tasks.NotifyResponse))
|
f := dag.NewDAG("Sample DAG", "sample-dag", mq.WithSyncMode(true), mq.WithNotifyResponse(tasks.NotifyResponse))
|
||||||
setup(f)
|
setup(f)
|
||||||
sendData(f)
|
sendData(f)
|
||||||
|
fmt.Println(f.SaveSVG("dag.svg"))
|
||||||
}
|
}
|
||||||
|
|
||||||
func async() {
|
func async() {
|
||||||
|
Reference in New Issue
Block a user