mirror of
https://github.com/oarkflow/mq.git
synced 2025-10-08 01:10:09 +08:00
update
This commit is contained in:
13
dag/dag.go
13
dag/dag.go
@@ -49,6 +49,7 @@ func (n *Node) SetTimeout(d time.Duration) {
|
||||
|
||||
type Edge struct {
|
||||
From *Node
|
||||
FromSource string
|
||||
To *Node
|
||||
Label string
|
||||
Type EdgeType
|
||||
@@ -310,6 +311,14 @@ func (tm *DAG) IsReady() bool {
|
||||
return isReady
|
||||
}
|
||||
|
||||
func (tm *DAG) resolveNode(nodeID string) (*Node, bool) {
|
||||
nodeParts := strings.Split(nodeID, ".")
|
||||
if len(nodeParts) > 1 {
|
||||
nodeID = nodeParts[0]
|
||||
}
|
||||
return tm.nodes.Get(nodeID)
|
||||
}
|
||||
|
||||
func (tm *DAG) AddEdge(edgeType EdgeType, label, from string, targets ...string) *DAG {
|
||||
if tm.Error != nil {
|
||||
return tm
|
||||
@@ -317,14 +326,14 @@ func (tm *DAG) AddEdge(edgeType EdgeType, label, from string, targets ...string)
|
||||
if edgeType == Iterator {
|
||||
tm.iteratorNodes.Set(from, []Edge{})
|
||||
}
|
||||
node, ok := tm.nodes.Get(from)
|
||||
node, ok := tm.resolveNode(from)
|
||||
if !ok {
|
||||
tm.Error = fmt.Errorf("node not found %s", from)
|
||||
return tm
|
||||
}
|
||||
for _, target := range targets {
|
||||
if targetNode, ok := tm.nodes.Get(target); ok {
|
||||
edge := Edge{From: node, To: targetNode, Type: edgeType, Label: label}
|
||||
edge := Edge{From: node, To: targetNode, Type: edgeType, Label: label, FromSource: from}
|
||||
node.Edges = append(node.Edges, edge)
|
||||
if edgeType != Iterator {
|
||||
if edges, ok := tm.iteratorNodes.Get(node.ID); ok {
|
||||
|
170
dag/ui.go
170
dag/ui.go
@@ -10,7 +10,6 @@ import (
|
||||
func (tm *DAG) PrintGraph() {
|
||||
fmt.Println("DAG Graph structure:")
|
||||
tm.nodes.ForEach(func(_ string, node *Node) bool {
|
||||
fmt.Printf("Node: %s (%s) -> ", node.Label, node.ID)
|
||||
if conditions, ok := tm.conditions[node.ID]; ok {
|
||||
var c []string
|
||||
for when, then := range conditions {
|
||||
@@ -197,8 +196,10 @@ func (tm *DAG) renderCleanDAG(sb *strings.Builder, indent string) {
|
||||
sb.WriteString(fmt.Sprintf("%s// Main DAG Nodes\n", indent))
|
||||
for _, nodeID := range sortedNodes {
|
||||
node, _ := tm.nodes.Get(nodeID)
|
||||
if !tm.isSubDAGNode(node) {
|
||||
tm.renderCleanNode(sb, node, indent)
|
||||
}
|
||||
}
|
||||
sb.WriteString("\n")
|
||||
|
||||
// Step 2: Render sub-DAG clusters (internal structure only)
|
||||
@@ -333,7 +334,8 @@ func (tm *DAG) renderPrefixedNode(sb *strings.Builder, node *Node, prefixedID, i
|
||||
// renderCleanEdges renders edges for a node
|
||||
func (tm *DAG) renderCleanEdges(sb *strings.Builder, node *Node, indent string) {
|
||||
for _, edge := range node.Edges {
|
||||
sb.WriteString(fmt.Sprintf("%s\"%s\" -> \"%s\"", indent, node.ID, edge.To.ID))
|
||||
from := strings.Join(strings.Split(edge.FromSource, "."), "_")
|
||||
sb.WriteString(fmt.Sprintf("%s\"%s\" -> \"%s\"", indent, from, edge.To.ID))
|
||||
if edge.Label != "" {
|
||||
sb.WriteString(fmt.Sprintf(` [label="%s"]`, edge.Label))
|
||||
}
|
||||
@@ -367,43 +369,6 @@ func (tm *DAG) renderCleanConditionalEdges(sb *strings.Builder, indent string) {
|
||||
}
|
||||
}
|
||||
|
||||
// renderComprehensiveDAG provides a complete rendering solution that handles overlapping and connections properly
|
||||
func (tm *DAG) renderComprehensiveDAG(sb *strings.Builder, prefix, indent string) {
|
||||
sortedNodes := tm.TopologicalSort()
|
||||
|
||||
// Step 1: Render all regular nodes (not sub-DAGs)
|
||||
sb.WriteString(fmt.Sprintf("%s// === MAIN DAG NODES ===\n", indent))
|
||||
for _, nodeKey := range sortedNodes {
|
||||
node, _ := tm.nodes.Get(nodeKey)
|
||||
if !tm.isSubDAGNode(node) {
|
||||
renderNode(sb, node, indent, prefix)
|
||||
}
|
||||
}
|
||||
sb.WriteString("\n")
|
||||
|
||||
// Step 2: Render sub-DAG clusters AND their representative nodes
|
||||
sb.WriteString(fmt.Sprintf("%s// === SUB-DAG CLUSTERS ===\n", indent))
|
||||
for _, nodeKey := range sortedNodes {
|
||||
node, _ := tm.nodes.Get(nodeKey)
|
||||
if subDAG, ok := isDAGNode(node); ok && subDAG.consumerTopic != "" {
|
||||
tm.renderSubDAGWithRepresentativeNode(sb, nodeKey, node, subDAG, prefix, indent)
|
||||
}
|
||||
}
|
||||
sb.WriteString("\n")
|
||||
|
||||
// Step 3: Render all regular edges (including those connecting to sub-DAGs)
|
||||
sb.WriteString(fmt.Sprintf("%s// === REGULAR EDGES ===\n", indent))
|
||||
for _, nodeKey := range sortedNodes {
|
||||
node, _ := tm.nodes.Get(nodeKey)
|
||||
renderEdges(sb, node, indent, prefix)
|
||||
}
|
||||
sb.WriteString("\n")
|
||||
|
||||
// Step 4: Render all conditional edges
|
||||
sb.WriteString(fmt.Sprintf("%s// === CONDITIONAL EDGES ===\n", indent))
|
||||
tm.renderAllConditionalEdges(sb, prefix, indent, sortedNodes)
|
||||
}
|
||||
|
||||
// renderAllConditionalEdges renders all conditional edges from main DAG
|
||||
func (tm *DAG) renderAllConditionalEdges(sb *strings.Builder, prefix, indent string, sortedNodes []string) {
|
||||
if len(tm.conditions) > 0 {
|
||||
@@ -417,74 +382,6 @@ func (tm *DAG) renderAllConditionalEdges(sb *strings.Builder, prefix, indent str
|
||||
}
|
||||
}
|
||||
|
||||
// renderSubDAGWithRepresentativeNode renders both the cluster and a representative node for the sub-DAG
|
||||
func (tm *DAG) renderSubDAGWithRepresentativeNode(sb *strings.Builder, nodeKey string, node *Node, subDAG *DAG, prefix, indent string) {
|
||||
subPrefix := fmt.Sprintf("%s%s_", prefix, subDAG.name)
|
||||
clusterName := fmt.Sprintf("%s%s", prefix, subDAG.name)
|
||||
|
||||
// First, render the representative node for the sub-DAG (this is what edges connect to)
|
||||
renderNode(sb, node, indent, prefix)
|
||||
|
||||
// Then render the sub-DAG cluster with internal structure
|
||||
sb.WriteString(fmt.Sprintf("%ssubgraph \"cluster_%s\" {\n", indent, clusterName))
|
||||
sb.WriteString(fmt.Sprintf("%s // Sub-DAG cluster styling\n", indent))
|
||||
sb.WriteString(fmt.Sprintf("%s label=\"🔄 Sub-DAG: %s (Internal Structure)\";\n", indent, subDAG.name))
|
||||
sb.WriteString(fmt.Sprintf("%s style=\"filled,dashed\";\n", indent))
|
||||
sb.WriteString(fmt.Sprintf("%s fillcolor=\"#F0F8FF\";\n", indent))
|
||||
sb.WriteString(fmt.Sprintf("%s color=\"#4169E1\";\n", indent))
|
||||
sb.WriteString(fmt.Sprintf("%s penwidth=2;\n", indent))
|
||||
sb.WriteString(fmt.Sprintf("%s fontname=\"Arial Bold\";\n", indent))
|
||||
sb.WriteString(fmt.Sprintf("%s fontsize=12;\n", indent))
|
||||
sb.WriteString(fmt.Sprintf("%s fontcolor=\"#191970\";\n", indent))
|
||||
sb.WriteString(fmt.Sprintf("%s margin=15;\n", indent))
|
||||
sb.WriteString("\n")
|
||||
|
||||
// Render sub-DAG internal nodes
|
||||
subSortedNodes := subDAG.TopologicalSort()
|
||||
for _, subNodeKey := range subSortedNodes {
|
||||
subNode, _ := subDAG.nodes.Get(subNodeKey)
|
||||
renderNode(sb, subNode, indent+" ", subPrefix)
|
||||
}
|
||||
|
||||
// Render sub-DAG internal edges
|
||||
for _, subNodeKey := range subSortedNodes {
|
||||
subNode, _ := subDAG.nodes.Get(subNodeKey)
|
||||
renderEdges(sb, subNode, indent+" ", subPrefix)
|
||||
}
|
||||
|
||||
// Render sub-DAG internal conditional edges
|
||||
if len(subDAG.conditions) > 0 {
|
||||
for fromNodeKey, conditions := range subDAG.conditions {
|
||||
for when, then := range conditions {
|
||||
if toNode, ok := subDAG.nodes.Get(then); ok {
|
||||
tm.renderConditionalEdge(sb, fromNodeKey, toNode.ID, when, subPrefix, indent+" ")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sb.WriteString(fmt.Sprintf("%s}\n", indent))
|
||||
|
||||
// Add a visual connection from the representative node to the sub-DAG cluster
|
||||
if len(subSortedNodes) > 0 {
|
||||
startNodeKey := subSortedNodes[0]
|
||||
representativeID := fmt.Sprintf("%s%s", prefix, nodeKey)
|
||||
subStartID := fmt.Sprintf("%s%s", subPrefix, startNodeKey)
|
||||
|
||||
sb.WriteString(fmt.Sprintf("%s// Connection to sub-DAG internal structure\n", indent))
|
||||
sb.WriteString(fmt.Sprintf("%s\"%s\" -> \"%s\" [\n", indent, representativeID, subStartID))
|
||||
sb.WriteString(fmt.Sprintf("%s label=\"📥 Internal Flow\",\n", indent))
|
||||
sb.WriteString(fmt.Sprintf("%s color=\"#FF6347\",\n", indent))
|
||||
sb.WriteString(fmt.Sprintf("%s style=\"dashed\",\n", indent))
|
||||
sb.WriteString(fmt.Sprintf("%s fontsize=10,\n", indent))
|
||||
sb.WriteString(fmt.Sprintf("%s fontcolor=\"#FF6347\",\n", indent))
|
||||
sb.WriteString(fmt.Sprintf("%s arrowsize=0.8,\n", indent))
|
||||
sb.WriteString(fmt.Sprintf("%s penwidth=2,\n", indent))
|
||||
sb.WriteString(fmt.Sprintf("%s constraint=false\n", indent)) // Don't affect layout
|
||||
sb.WriteString(fmt.Sprintf("%s];\n", indent))
|
||||
}
|
||||
}
|
||||
|
||||
// isSubDAGNode checks if a node contains a sub-DAG
|
||||
func (tm *DAG) isSubDAGNode(node *Node) bool {
|
||||
if node.processor == nil {
|
||||
@@ -564,65 +461,6 @@ func renderNode(sb *strings.Builder, node *Node, indent string, prefix ...string
|
||||
sb.WriteString(fmt.Sprintf("%s];\n", indent))
|
||||
}
|
||||
|
||||
// renderEdges creates professional edge representations with enhanced styling
|
||||
func renderEdges(sb *strings.Builder, node *Node, indent string, prefix ...string) {
|
||||
prefixedID := fmt.Sprintf("%s%s", strings.Join(prefix, ""), node.ID)
|
||||
|
||||
for i, edge := range node.Edges {
|
||||
var (
|
||||
edgeStyle string
|
||||
edgeColor string
|
||||
penWidth string
|
||||
arrowSize string
|
||||
edgeIcon string
|
||||
)
|
||||
|
||||
switch edge.Type {
|
||||
case Iterator:
|
||||
edgeStyle = "dashed"
|
||||
edgeColor = "#5DADE2"
|
||||
penWidth = "2.0"
|
||||
arrowSize = "1.0"
|
||||
edgeIcon = "🔄"
|
||||
case Simple:
|
||||
edgeStyle = "solid"
|
||||
edgeColor = "#7F8C8D"
|
||||
penWidth = "1.8"
|
||||
arrowSize = "0.9"
|
||||
edgeIcon = "→"
|
||||
default:
|
||||
edgeStyle = "solid"
|
||||
edgeColor = "#95A5A6"
|
||||
penWidth = "1.5"
|
||||
arrowSize = "0.8"
|
||||
edgeIcon = "•"
|
||||
}
|
||||
|
||||
toPrefixedID := fmt.Sprintf("%s%s", strings.Join(prefix, ""), edge.To.ID)
|
||||
|
||||
// Create enhanced edge label
|
||||
edgeLabel := edge.Label
|
||||
if edgeLabel == "" {
|
||||
edgeLabel = fmt.Sprintf("Step %d", i+1)
|
||||
}
|
||||
enhancedLabel := fmt.Sprintf("%s %s", edgeIcon, edgeLabel)
|
||||
|
||||
sb.WriteString(fmt.Sprintf("%s\"%s\" -> \"%s\" [\n", indent, prefixedID, toPrefixedID))
|
||||
sb.WriteString(fmt.Sprintf("%s label=\"%s\",\n", indent, enhancedLabel))
|
||||
sb.WriteString(fmt.Sprintf("%s style=\"%s\",\n", indent, edgeStyle))
|
||||
sb.WriteString(fmt.Sprintf("%s color=\"%s\",\n", indent, edgeColor))
|
||||
sb.WriteString(fmt.Sprintf("%s penwidth=%s,\n", indent, penWidth))
|
||||
sb.WriteString(fmt.Sprintf("%s arrowsize=%s,\n", indent, arrowSize))
|
||||
sb.WriteString(fmt.Sprintf("%s fontcolor=\"%s\",\n", indent, edgeColor))
|
||||
sb.WriteString(fmt.Sprintf("%s fontsize=11,\n", indent))
|
||||
sb.WriteString(fmt.Sprintf("%s labeldistance=1.5,\n", indent)) // Better label positioning
|
||||
sb.WriteString(fmt.Sprintf("%s labelangle=0,\n", indent)) // Keep labels horizontal
|
||||
sb.WriteString(fmt.Sprintf("%s minlen=2,\n", indent)) // Minimum length for spacing
|
||||
sb.WriteString(fmt.Sprintf("%s tooltip=\"%s -> %s: %s\"\n", indent, node.Label, edge.To.Label, edgeLabel))
|
||||
sb.WriteString(fmt.Sprintf("%s];\n", indent))
|
||||
}
|
||||
}
|
||||
|
||||
func (tm *DAG) TopologicalSort() (stack []string) {
|
||||
visited := make(map[string]bool)
|
||||
tm.nodes.ForEach(func(_ string, node *Node) bool {
|
||||
|
@@ -98,7 +98,7 @@ func main() {
|
||||
flow.AddNode(dag.Function, "Send Standard Email", "SendStandardEmail", &SendStandardEmailNode{})
|
||||
flow.AddNode(dag.Page, "Success Page", "SuccessPage", &SuccessPageNode{})
|
||||
flow.AddNode(dag.Page, "Error Page", "ErrorPage", &EmailErrorPageNode{})
|
||||
flow.AddEdge(dag.Simple, "Login to Contact", "Login", "ContactForm")
|
||||
flow.AddEdge(dag.Simple, "Login to Contact", "Login.Output", "ContactForm")
|
||||
// Define conditional flow
|
||||
flow.AddEdge(dag.Simple, "Form to Validation", "ContactForm", "ValidateContact")
|
||||
flow.AddCondition("ValidateContact", map[string]string{
|
||||
|
Reference in New Issue
Block a user