diff --git a/dag/dag.go b/dag/dag.go index e80c091..ef508ef 100644 --- a/dag/dag.go +++ b/dag/dag.go @@ -48,10 +48,11 @@ func (n *Node) SetTimeout(d time.Duration) { } type Edge struct { - From *Node - To *Node - Label string - Type EdgeType + From *Node + FromSource string + To *Node + Label string + Type EdgeType } type DAG struct { @@ -310,6 +311,14 @@ func (tm *DAG) IsReady() bool { return isReady } +func (tm *DAG) resolveNode(nodeID string) (*Node, bool) { + nodeParts := strings.Split(nodeID, ".") + if len(nodeParts) > 1 { + nodeID = nodeParts[0] + } + return tm.nodes.Get(nodeID) +} + func (tm *DAG) AddEdge(edgeType EdgeType, label, from string, targets ...string) *DAG { if tm.Error != nil { return tm @@ -317,14 +326,14 @@ func (tm *DAG) AddEdge(edgeType EdgeType, label, from string, targets ...string) if edgeType == Iterator { tm.iteratorNodes.Set(from, []Edge{}) } - node, ok := tm.nodes.Get(from) + node, ok := tm.resolveNode(from) if !ok { tm.Error = fmt.Errorf("node not found %s", from) return tm } for _, target := range targets { if targetNode, ok := tm.nodes.Get(target); ok { - edge := Edge{From: node, To: targetNode, Type: edgeType, Label: label} + edge := Edge{From: node, To: targetNode, Type: edgeType, Label: label, FromSource: from} node.Edges = append(node.Edges, edge) if edgeType != Iterator { if edges, ok := tm.iteratorNodes.Get(node.ID); ok { diff --git a/dag/ui.go b/dag/ui.go index 7e60698..d0c3d31 100644 --- a/dag/ui.go +++ b/dag/ui.go @@ -10,7 +10,6 @@ import ( func (tm *DAG) PrintGraph() { fmt.Println("DAG Graph structure:") tm.nodes.ForEach(func(_ string, node *Node) bool { - fmt.Printf("Node: %s (%s) -> ", node.Label, node.ID) if conditions, ok := tm.conditions[node.ID]; ok { var c []string for when, then := range conditions { @@ -197,7 +196,9 @@ func (tm *DAG) renderCleanDAG(sb *strings.Builder, indent string) { sb.WriteString(fmt.Sprintf("%s// Main DAG Nodes\n", indent)) for _, nodeID := range sortedNodes { node, _ := tm.nodes.Get(nodeID) - tm.renderCleanNode(sb, node, indent) + if !tm.isSubDAGNode(node) { + tm.renderCleanNode(sb, node, indent) + } } sb.WriteString("\n") @@ -333,7 +334,8 @@ func (tm *DAG) renderPrefixedNode(sb *strings.Builder, node *Node, prefixedID, i // renderCleanEdges renders edges for a node func (tm *DAG) renderCleanEdges(sb *strings.Builder, node *Node, indent string) { for _, edge := range node.Edges { - sb.WriteString(fmt.Sprintf("%s\"%s\" -> \"%s\"", indent, node.ID, edge.To.ID)) + from := strings.Join(strings.Split(edge.FromSource, "."), "_") + sb.WriteString(fmt.Sprintf("%s\"%s\" -> \"%s\"", indent, from, edge.To.ID)) if edge.Label != "" { sb.WriteString(fmt.Sprintf(` [label="%s"]`, edge.Label)) } @@ -367,43 +369,6 @@ func (tm *DAG) renderCleanConditionalEdges(sb *strings.Builder, indent string) { } } -// renderComprehensiveDAG provides a complete rendering solution that handles overlapping and connections properly -func (tm *DAG) renderComprehensiveDAG(sb *strings.Builder, prefix, indent string) { - sortedNodes := tm.TopologicalSort() - - // Step 1: Render all regular nodes (not sub-DAGs) - sb.WriteString(fmt.Sprintf("%s// === MAIN DAG NODES ===\n", indent)) - for _, nodeKey := range sortedNodes { - node, _ := tm.nodes.Get(nodeKey) - if !tm.isSubDAGNode(node) { - renderNode(sb, node, indent, prefix) - } - } - sb.WriteString("\n") - - // Step 2: Render sub-DAG clusters AND their representative nodes - sb.WriteString(fmt.Sprintf("%s// === SUB-DAG CLUSTERS ===\n", indent)) - for _, nodeKey := range sortedNodes { - node, _ := tm.nodes.Get(nodeKey) - if subDAG, ok := isDAGNode(node); ok && subDAG.consumerTopic != "" { - tm.renderSubDAGWithRepresentativeNode(sb, nodeKey, node, subDAG, prefix, indent) - } - } - sb.WriteString("\n") - - // Step 3: Render all regular edges (including those connecting to sub-DAGs) - sb.WriteString(fmt.Sprintf("%s// === REGULAR EDGES ===\n", indent)) - for _, nodeKey := range sortedNodes { - node, _ := tm.nodes.Get(nodeKey) - renderEdges(sb, node, indent, prefix) - } - sb.WriteString("\n") - - // Step 4: Render all conditional edges - sb.WriteString(fmt.Sprintf("%s// === CONDITIONAL EDGES ===\n", indent)) - tm.renderAllConditionalEdges(sb, prefix, indent, sortedNodes) -} - // renderAllConditionalEdges renders all conditional edges from main DAG func (tm *DAG) renderAllConditionalEdges(sb *strings.Builder, prefix, indent string, sortedNodes []string) { if len(tm.conditions) > 0 { @@ -417,74 +382,6 @@ func (tm *DAG) renderAllConditionalEdges(sb *strings.Builder, prefix, indent str } } -// renderSubDAGWithRepresentativeNode renders both the cluster and a representative node for the sub-DAG -func (tm *DAG) renderSubDAGWithRepresentativeNode(sb *strings.Builder, nodeKey string, node *Node, subDAG *DAG, prefix, indent string) { - subPrefix := fmt.Sprintf("%s%s_", prefix, subDAG.name) - clusterName := fmt.Sprintf("%s%s", prefix, subDAG.name) - - // First, render the representative node for the sub-DAG (this is what edges connect to) - renderNode(sb, node, indent, prefix) - - // Then render the sub-DAG cluster with internal structure - sb.WriteString(fmt.Sprintf("%ssubgraph \"cluster_%s\" {\n", indent, clusterName)) - sb.WriteString(fmt.Sprintf("%s // Sub-DAG cluster styling\n", indent)) - sb.WriteString(fmt.Sprintf("%s label=\"🔄 Sub-DAG: %s (Internal Structure)\";\n", indent, subDAG.name)) - sb.WriteString(fmt.Sprintf("%s style=\"filled,dashed\";\n", indent)) - sb.WriteString(fmt.Sprintf("%s fillcolor=\"#F0F8FF\";\n", indent)) - sb.WriteString(fmt.Sprintf("%s color=\"#4169E1\";\n", indent)) - sb.WriteString(fmt.Sprintf("%s penwidth=2;\n", indent)) - sb.WriteString(fmt.Sprintf("%s fontname=\"Arial Bold\";\n", indent)) - sb.WriteString(fmt.Sprintf("%s fontsize=12;\n", indent)) - sb.WriteString(fmt.Sprintf("%s fontcolor=\"#191970\";\n", indent)) - sb.WriteString(fmt.Sprintf("%s margin=15;\n", indent)) - sb.WriteString("\n") - - // Render sub-DAG internal nodes - subSortedNodes := subDAG.TopologicalSort() - for _, subNodeKey := range subSortedNodes { - subNode, _ := subDAG.nodes.Get(subNodeKey) - renderNode(sb, subNode, indent+" ", subPrefix) - } - - // Render sub-DAG internal edges - for _, subNodeKey := range subSortedNodes { - subNode, _ := subDAG.nodes.Get(subNodeKey) - renderEdges(sb, subNode, indent+" ", subPrefix) - } - - // Render sub-DAG internal conditional edges - if len(subDAG.conditions) > 0 { - for fromNodeKey, conditions := range subDAG.conditions { - for when, then := range conditions { - if toNode, ok := subDAG.nodes.Get(then); ok { - tm.renderConditionalEdge(sb, fromNodeKey, toNode.ID, when, subPrefix, indent+" ") - } - } - } - } - - sb.WriteString(fmt.Sprintf("%s}\n", indent)) - - // Add a visual connection from the representative node to the sub-DAG cluster - if len(subSortedNodes) > 0 { - startNodeKey := subSortedNodes[0] - representativeID := fmt.Sprintf("%s%s", prefix, nodeKey) - subStartID := fmt.Sprintf("%s%s", subPrefix, startNodeKey) - - sb.WriteString(fmt.Sprintf("%s// Connection to sub-DAG internal structure\n", indent)) - sb.WriteString(fmt.Sprintf("%s\"%s\" -> \"%s\" [\n", indent, representativeID, subStartID)) - sb.WriteString(fmt.Sprintf("%s label=\"📥 Internal Flow\",\n", indent)) - sb.WriteString(fmt.Sprintf("%s color=\"#FF6347\",\n", indent)) - sb.WriteString(fmt.Sprintf("%s style=\"dashed\",\n", indent)) - sb.WriteString(fmt.Sprintf("%s fontsize=10,\n", indent)) - sb.WriteString(fmt.Sprintf("%s fontcolor=\"#FF6347\",\n", indent)) - sb.WriteString(fmt.Sprintf("%s arrowsize=0.8,\n", indent)) - sb.WriteString(fmt.Sprintf("%s penwidth=2,\n", indent)) - sb.WriteString(fmt.Sprintf("%s constraint=false\n", indent)) // Don't affect layout - sb.WriteString(fmt.Sprintf("%s];\n", indent)) - } -} - // isSubDAGNode checks if a node contains a sub-DAG func (tm *DAG) isSubDAGNode(node *Node) bool { if node.processor == nil { @@ -564,65 +461,6 @@ func renderNode(sb *strings.Builder, node *Node, indent string, prefix ...string sb.WriteString(fmt.Sprintf("%s];\n", indent)) } -// renderEdges creates professional edge representations with enhanced styling -func renderEdges(sb *strings.Builder, node *Node, indent string, prefix ...string) { - prefixedID := fmt.Sprintf("%s%s", strings.Join(prefix, ""), node.ID) - - for i, edge := range node.Edges { - var ( - edgeStyle string - edgeColor string - penWidth string - arrowSize string - edgeIcon string - ) - - switch edge.Type { - case Iterator: - edgeStyle = "dashed" - edgeColor = "#5DADE2" - penWidth = "2.0" - arrowSize = "1.0" - edgeIcon = "🔄" - case Simple: - edgeStyle = "solid" - edgeColor = "#7F8C8D" - penWidth = "1.8" - arrowSize = "0.9" - edgeIcon = "→" - default: - edgeStyle = "solid" - edgeColor = "#95A5A6" - penWidth = "1.5" - arrowSize = "0.8" - edgeIcon = "•" - } - - toPrefixedID := fmt.Sprintf("%s%s", strings.Join(prefix, ""), edge.To.ID) - - // Create enhanced edge label - edgeLabel := edge.Label - if edgeLabel == "" { - edgeLabel = fmt.Sprintf("Step %d", i+1) - } - enhancedLabel := fmt.Sprintf("%s %s", edgeIcon, edgeLabel) - - sb.WriteString(fmt.Sprintf("%s\"%s\" -> \"%s\" [\n", indent, prefixedID, toPrefixedID)) - sb.WriteString(fmt.Sprintf("%s label=\"%s\",\n", indent, enhancedLabel)) - sb.WriteString(fmt.Sprintf("%s style=\"%s\",\n", indent, edgeStyle)) - sb.WriteString(fmt.Sprintf("%s color=\"%s\",\n", indent, edgeColor)) - sb.WriteString(fmt.Sprintf("%s penwidth=%s,\n", indent, penWidth)) - sb.WriteString(fmt.Sprintf("%s arrowsize=%s,\n", indent, arrowSize)) - sb.WriteString(fmt.Sprintf("%s fontcolor=\"%s\",\n", indent, edgeColor)) - sb.WriteString(fmt.Sprintf("%s fontsize=11,\n", indent)) - sb.WriteString(fmt.Sprintf("%s labeldistance=1.5,\n", indent)) // Better label positioning - sb.WriteString(fmt.Sprintf("%s labelangle=0,\n", indent)) // Keep labels horizontal - sb.WriteString(fmt.Sprintf("%s minlen=2,\n", indent)) // Minimum length for spacing - sb.WriteString(fmt.Sprintf("%s tooltip=\"%s -> %s: %s\"\n", indent, node.Label, edge.To.Label, edgeLabel)) - sb.WriteString(fmt.Sprintf("%s];\n", indent)) - } -} - func (tm *DAG) TopologicalSort() (stack []string) { visited := make(map[string]bool) tm.nodes.ForEach(func(_ string, node *Node) bool { diff --git a/examples/email_notification_dag.go b/examples/email_notification_dag.go index 15bf832..0e0da58 100644 --- a/examples/email_notification_dag.go +++ b/examples/email_notification_dag.go @@ -98,7 +98,7 @@ func main() { flow.AddNode(dag.Function, "Send Standard Email", "SendStandardEmail", &SendStandardEmailNode{}) flow.AddNode(dag.Page, "Success Page", "SuccessPage", &SuccessPageNode{}) flow.AddNode(dag.Page, "Error Page", "ErrorPage", &EmailErrorPageNode{}) - flow.AddEdge(dag.Simple, "Login to Contact", "Login", "ContactForm") + flow.AddEdge(dag.Simple, "Login to Contact", "Login.Output", "ContactForm") // Define conditional flow flow.AddEdge(dag.Simple, "Form to Validation", "ContactForm", "ValidateContact") flow.AddCondition("ValidateContact", map[string]string{