This commit is contained in:
sujit
2025-09-20 10:12:34 +05:45
parent 61fff5b6fd
commit ec006034ff
4 changed files with 218 additions and 24 deletions

View File

@@ -82,6 +82,8 @@ type DAG struct {
iteratorNodes storage.IMap[string, []Edge] iteratorNodes storage.IMap[string, []Edge]
conditions map[string]map[string]string conditions map[string]map[string]string
Error error Error error
parentDAG *DAG
nodeIDInParentDAG string
consumer *mq.Consumer consumer *mq.Consumer
finalResult func(taskID string, result mq.Result) finalResult func(taskID string, result mq.Result)
pool *mq.Pool pool *mq.Pool

View File

@@ -180,6 +180,8 @@ func (tm *DAG) AddDAGNode(nodeType NodeType, name string, key string, dag *DAG,
isReady: true, isReady: true,
IsLast: true, // Assume it's last until edges are added IsLast: true, // Assume it's last until edges are added
}) })
dag.parentDAG = tm
dag.nodeIDInParentDAG = key
if len(firstNode) > 0 && firstNode[0] { if len(firstNode) > 0 && firstNode[0] {
// If there was a previous start node, unset its IsFirst // If there was a previous start node, unset its IsFirst
if tm.startNode != "" { if tm.startNode != "" {

View File

@@ -3,11 +3,15 @@ package dag
import ( import (
"context" "context"
"fmt" "fmt"
"sync"
"time" "time"
"github.com/oarkflow/json" "github.com/oarkflow/json"
"github.com/oarkflow/mq" "github.com/oarkflow/mq"
dagstorage "github.com/oarkflow/mq/dag/storage"
"github.com/oarkflow/mq/logger" "github.com/oarkflow/mq/logger"
"github.com/oarkflow/mq/storage"
"github.com/oarkflow/mq/storage/memory"
) )
// debugDAGTaskStart logs debug information when a task starts at DAG level // debugDAGTaskStart logs debug information when a task starts at DAG level
@@ -198,33 +202,219 @@ func (tm *DAG) FlushActivityLogs() error {
return fmt.Errorf("activity logger not initialized") return fmt.Errorf("activity logger not initialized")
} }
// Clone creates a deep copy of the DAG // Clone creates a deep copy of the DAG instance as a separate instance
func (tm *DAG) Clone() *DAG { // This function creates a completely independent copy of the DAG with all its nodes,
newDAG := NewDAG(tm.name+"_clone", tm.key, tm.finalResult) // edges, conditions, and internal state. The cloned DAG can be modified without
// affecting the original DAG.
func (d *DAG) Clone() *DAG {
// Create new DAG instance with basic fields
clone := &DAG{
// Primitive fields (shallow copy)
key: d.key,
name: d.name,
startNode: d.startNode,
consumerTopic: d.consumerTopic,
report: d.report,
httpPrefix: d.httpPrefix,
hasPageNode: d.hasPageNode,
paused: d.paused,
debug: d.debug,
Error: d.Error, // Error is safe to shallow copy
// Copy nodes // Function pointers (shallow copy)
tm.nodes.ForEach(func(id string, node *Node) bool { finalResult: d.finalResult,
newDAG.AddNode(node.NodeType, node.Label, node.ID, node.processor) reportNodeResultCallback: d.reportNodeResultCallback,
return true PreProcessHook: d.PreProcessHook,
}) PostProcessHook: d.PostProcessHook,
// Copy edges // Initialize storage maps
tm.nodes.ForEach(func(id string, node *Node) bool { nodes: memory.New[string, *Node](),
for _, edge := range node.Edges { taskManager: memory.New[string, *TaskManager](),
newDAG.AddEdge(edge.Type, edge.Label, edge.From.ID, edge.To.ID) iteratorNodes: memory.New[string, []Edge](),
}
return true
})
// Copy conditions // Initialize other maps
for fromNode, conditions := range tm.conditions { conditions: make(map[string]map[string]string),
newDAG.AddCondition(fromNode, conditions) nextNodesCache: make(map[string][]*Node),
prevNodesCache: make(map[string][]*Node),
circuitBreakers: make(map[string]*CircuitBreaker),
nodeMiddlewares: make(map[string][]mq.Handler),
// Initialize slices
globalMiddlewares: make([]mq.Handler, 0),
// Initialize mutexes
circuitBreakersMu: sync.RWMutex{},
middlewaresMu: sync.RWMutex{},
// Create new task storage
taskStorage: dagstorage.NewMemoryTaskStorage(),
} }
// Copy start node // Deep copy nodes
newDAG.SetStartNode(tm.startNode) d.nodes.ForEach(func(nodeID string, node *Node) bool {
clonedNode := d.cloneNode(node)
clone.nodes.Set(nodeID, clonedNode)
return true
})
return newDAG // Deep copy iterator nodes
d.iteratorNodes.ForEach(func(nodeID string, edges []Edge) bool {
clonedEdges := make([]Edge, len(edges))
for i, edge := range edges {
clonedEdges[i] = d.cloneEdge(edge, clone.nodes)
}
clone.iteratorNodes.Set(nodeID, clonedEdges)
return true
})
// Deep copy conditions
for nodeID, conds := range d.conditions {
cloneConds := make(map[string]string)
for k, v := range conds {
cloneConds[k] = v
}
clone.conditions[nodeID] = cloneConds
}
// Deep copy caches
for nodeID, nodes := range d.nextNodesCache {
clonedNodes := make([]*Node, len(nodes))
for i, node := range nodes {
// Find the cloned node by ID
if clonedNode, exists := clone.nodes.Get(node.ID); exists {
clonedNodes[i] = clonedNode
}
}
clone.nextNodesCache[nodeID] = clonedNodes
}
for nodeID, nodes := range d.prevNodesCache {
clonedNodes := make([]*Node, len(nodes))
for i, node := range nodes {
// Find the cloned node by ID
if clonedNode, exists := clone.nodes.Get(node.ID); exists {
clonedNodes[i] = clonedNode
}
}
clone.prevNodesCache[nodeID] = clonedNodes
}
// Deep copy circuit breakers
for nodeID, cb := range d.circuitBreakers {
// Create new circuit breaker with same config
if cb.config != nil {
newCB := NewCircuitBreaker(cb.config, nil) // Logger will be set later if needed
clone.circuitBreakers[nodeID] = newCB
}
}
// Deep copy node middlewares
for nodeID, handlers := range d.nodeMiddlewares {
clonedHandlers := make([]mq.Handler, len(handlers))
copy(clonedHandlers, handlers)
clone.nodeMiddlewares[nodeID] = clonedHandlers
}
// Deep copy global middlewares
clone.globalMiddlewares = make([]mq.Handler, len(d.globalMiddlewares))
copy(clone.globalMiddlewares, d.globalMiddlewares)
// Deep copy metrics
if d.metrics != nil {
clone.metrics = &TaskMetrics{
NotStarted: d.metrics.NotStarted,
Queued: d.metrics.Queued,
Cancelled: d.metrics.Cancelled,
Completed: d.metrics.Completed,
Failed: d.metrics.Failed,
}
}
// Initialize server with minimal configuration to prevent nil pointer panics
// The cloned DAG will need to be properly configured before use
clone.server = mq.NewBroker(
mq.WithCallback(clone.onTaskCallback),
mq.WithConsumerOnSubscribe(clone.onConsumerJoin),
mq.WithConsumerOnClose(clone.onConsumerClose),
)
// Initialize logger-dependent managers with null logger as fallback
nullLogger := &logger.NullLogger{}
clone.validator = NewDAGValidator(clone)
clone.monitor = NewMonitor(clone, nullLogger)
clone.retryManager = NewNodeRetryManager(nil, nullLogger)
clone.rateLimiter = NewRateLimiter(nullLogger)
clone.cache = NewDAGCache(5*time.Minute, 1000, nullLogger)
clone.configManager = NewConfigManager(nullLogger)
clone.batchProcessor = NewBatchProcessor(clone, 50, 5*time.Second, nullLogger)
clone.transactionManager = NewTransactionManager(clone, nullLogger)
clone.cleanupManager = NewCleanupManager(clone, 10*time.Minute, 1*time.Hour, 1000, nullLogger)
clone.performanceOptimizer = NewPerformanceOptimizer(clone, clone.monitor, clone.configManager, nullLogger)
// Note: Shared resources like consumer, pool, scheduler, Notifier
// are intentionally NOT cloned as they represent shared system resources.
// The cloned DAG will need to initialize these separately if needed.
// Note: Manager objects are initialized with null logger as fallback.
// The cloned DAG should be properly configured with a real logger before use.
return clone
}
// cloneNode creates a deep copy of a Node
func (d *DAG) cloneNode(node *Node) *Node {
clonedNode := &Node{
Label: node.Label,
ID: node.ID,
NodeType: node.NodeType,
isReady: node.isReady,
Timeout: node.Timeout,
Debug: node.Debug,
IsFirst: node.IsFirst,
IsLast: node.IsLast,
}
// Deep copy edges
clonedNode.Edges = make([]Edge, len(node.Edges))
for i, edge := range node.Edges {
clonedNode.Edges[i] = d.cloneEdge(edge, nil) // Will be updated later with cloned nodes
}
// Clone processor - this is complex as it could be a DAG or other processor
// For now, we'll shallow copy and let the caller handle processor-specific cloning
clonedNode.processor = node.processor
return clonedNode
}
// cloneEdge creates a copy of an Edge, optionally updating node references
func (d *DAG) cloneEdge(edge Edge, nodeMap storage.IMap[string, *Node]) Edge {
clonedEdge := Edge{
FromSource: edge.FromSource,
Label: edge.Label,
Type: edge.Type,
}
// Update node references if nodeMap is provided
if nodeMap != nil {
if clonedFrom, exists := nodeMap.Get(edge.From.ID); exists {
clonedEdge.From = clonedFrom
} else {
clonedEdge.From = edge.From // Keep original if clone not found
}
if clonedTo, exists := nodeMap.Get(edge.To.ID); exists {
clonedEdge.To = clonedTo
} else {
clonedEdge.To = edge.To // Keep original if clone not found
}
} else {
// Keep original node references if no nodeMap provided
clonedEdge.From = edge.From
clonedEdge.To = edge.To
}
return clonedEdge
} }
// Export exports the DAG structure to a serializable format // Export exports the DAG structure to a serializable format

View File

@@ -26,15 +26,15 @@ func main() {
// Add SMS workflow nodes // Add SMS workflow nodes
// Note: Page nodes have no timeout by default, allowing users unlimited time for form input // Note: Page nodes have no timeout by default, allowing users unlimited time for form input
flow.AddDAGNode(dag.Page, "Login", "login", loginSubDAG(), true) // flow.AddDAGNode(dag.Page, "Login", "login", loginSubDAG().Clone(), true)
flow.AddNode(dag.Page, "SMS Form", "SMSForm", &SMSFormNode{}) flow.AddNode(dag.Page, "SMS Form", "SMSForm", &SMSFormNode{}, true)
flow.AddNode(dag.Function, "Validate Input", "ValidateInput", &ValidateInputNode{}) flow.AddNode(dag.Function, "Validate Input", "ValidateInput", &ValidateInputNode{})
flow.AddNode(dag.Function, "Send SMS", "SendSMS", &SendSMSNode{}) flow.AddNode(dag.Function, "Send SMS", "SendSMS", &SendSMSNode{})
flow.AddNode(dag.Page, "SMS Result", "SMSResult", &SMSResultNode{}) flow.AddNode(dag.Page, "SMS Result", "SMSResult", &SMSResultNode{})
flow.AddNode(dag.Page, "Error Page", "ErrorPage", &ErrorPageNode{}) flow.AddNode(dag.Page, "Error Page", "ErrorPage", &ErrorPageNode{})
// Define edges for SMS workflow // Define edges for SMS workflow
flow.AddEdge(dag.Simple, "Login to Form", "login", "SMSForm") // flow.AddEdge(dag.Simple, "Login to Form", "login", "SMSForm")
flow.AddEdge(dag.Simple, "Form to Validation", "SMSForm", "ValidateInput") flow.AddEdge(dag.Simple, "Form to Validation", "SMSForm", "ValidateInput")
flow.AddCondition("ValidateInput", map[string]string{"valid": "SendSMS"}) // Removed invalid -> ErrorPage since we use ResetTo flow.AddCondition("ValidateInput", map[string]string{"valid": "SendSMS"}) // Removed invalid -> ErrorPage since we use ResetTo
flow.AddCondition("SendSMS", map[string]string{"sent": "SMSResult", "failed": "ErrorPage"}) flow.AddCondition("SendSMS", map[string]string{"sent": "SMSResult", "failed": "ErrorPage"})