This commit is contained in:
sujit
2025-07-29 12:13:15 +05:45
parent 6422c02831
commit e76870a865
3 changed files with 501 additions and 653 deletions

View File

@@ -6,39 +6,185 @@ import (
"fmt"
"log"
"net/http"
"os"
"path/filepath"
"strings"
"time"
"github.com/oarkflow/mq"
"github.com/oarkflow/mq/dag"
)
// ExampleProcessor implements a simple processor
type ExampleProcessor struct {
name string
// StartProcessor - Initial node that receives and processes the input data
type StartProcessor struct {
dag.Operation
}
func (p *ExampleProcessor) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
fmt.Printf("Processing task %s in node %s\n", task.ID, p.name)
func (p *StartProcessor) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
fmt.Printf("[START] Processing task %s - Initial data processing\n", task.ID)
// Simulate some work
// Simulate initial processing work
time.Sleep(100 * time.Millisecond)
// Create detailed result with node-specific information
processingResult := map[string]interface{}{
"node_name": "start",
"task_id": task.ID,
"processed_at": time.Now().Format("15:04:05"),
"duration_ms": 100,
"status": "success",
"action": "initialized_data",
"data_size": len(fmt.Sprintf("%v", task.Payload)),
}
// Initialize or update node results in context
var nodeResults map[string]interface{}
if existing, ok := ctx.Value("nodeResults").(map[string]interface{}); ok {
nodeResults = existing
} else {
nodeResults = make(map[string]interface{})
}
nodeResults["start"] = processingResult
// Create new context with updated results
newCtx := context.WithValue(ctx, "nodeResults", nodeResults)
fmt.Printf("[START] Node completed - data initialized for task %s\n", task.ID)
return mq.Result{
TaskID: task.ID,
Status: mq.Completed,
Payload: task.Payload,
Ctx: ctx,
Ctx: newCtx,
}
}
func (p *ExampleProcessor) Consume(ctx context.Context) error { return nil }
func (p *ExampleProcessor) Pause(ctx context.Context) error { return nil }
func (p *ExampleProcessor) Resume(ctx context.Context) error { return nil }
func (p *ExampleProcessor) Stop(ctx context.Context) error { return nil }
func (p *ExampleProcessor) Close() error { return nil }
func (p *ExampleProcessor) GetKey() string { return p.name }
func (p *ExampleProcessor) SetKey(key string) { p.name = key }
func (p *ExampleProcessor) GetType() string { return "example" }
// ProcessorNode - Processes and transforms the data
type ProcessorNode struct {
dag.Operation
}
func (p *ProcessorNode) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
fmt.Printf("[PROCESS] Processing task %s - Data transformation\n", task.ID)
// Simulate processing work
time.Sleep(100 * time.Millisecond)
processingResult := map[string]interface{}{
"node_name": "process",
"task_id": task.ID,
"processed_at": time.Now().Format("15:04:05"),
"duration_ms": 100,
"status": "success",
"action": "transformed_data",
"data_size": len(fmt.Sprintf("%v", task.Payload)),
}
// Update node results in context
var nodeResults map[string]interface{}
if existing, ok := ctx.Value("nodeResults").(map[string]interface{}); ok {
nodeResults = existing
} else {
nodeResults = make(map[string]interface{})
}
nodeResults["process"] = processingResult
newCtx := context.WithValue(ctx, "nodeResults", nodeResults)
fmt.Printf("[PROCESS] Node completed - data transformed for task %s\n", task.ID)
return mq.Result{
TaskID: task.ID,
Status: mq.Completed,
Payload: task.Payload,
Ctx: newCtx,
}
}
// ValidatorNode - Validates the processed data
type ValidatorNode struct {
dag.Operation
}
func (p *ValidatorNode) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
fmt.Printf("[VALIDATE] Processing task %s - Data validation\n", task.ID)
// Simulate validation work
time.Sleep(100 * time.Millisecond)
processingResult := map[string]interface{}{
"node_name": "validate",
"task_id": task.ID,
"processed_at": time.Now().Format("15:04:05"),
"duration_ms": 100,
"status": "success",
"action": "validated_data",
"validation": "passed",
"data_size": len(fmt.Sprintf("%v", task.Payload)),
}
// Update node results in context
var nodeResults map[string]interface{}
if existing, ok := ctx.Value("nodeResults").(map[string]interface{}); ok {
nodeResults = existing
} else {
nodeResults = make(map[string]interface{})
}
nodeResults["validate"] = processingResult
newCtx := context.WithValue(ctx, "nodeResults", nodeResults)
fmt.Printf("[VALIDATE] Node completed - data validated for task %s\n", task.ID)
return mq.Result{
TaskID: task.ID,
Status: mq.Completed,
Payload: task.Payload,
Ctx: newCtx,
}
}
// EndProcessor - Final node that completes the processing
type EndProcessor struct {
dag.Operation
}
func (p *EndProcessor) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
fmt.Printf("[END] Processing task %s - Final processing\n", task.ID)
// Simulate final processing work
time.Sleep(100 * time.Millisecond)
processingResult := map[string]interface{}{
"node_name": "end",
"task_id": task.ID,
"processed_at": time.Now().Format("15:04:05"),
"duration_ms": 100,
"status": "success",
"action": "finalized_data",
"data_size": len(fmt.Sprintf("%v", task.Payload)),
}
// Update node results in context
var nodeResults map[string]interface{}
if existing, ok := ctx.Value("nodeResults").(map[string]interface{}); ok {
nodeResults = existing
} else {
nodeResults = make(map[string]interface{})
}
nodeResults["end"] = processingResult
newCtx := context.WithValue(ctx, "nodeResults", nodeResults)
fmt.Printf("[END] Node completed - processing finished for task %s\n", task.ID)
return mq.Result{
TaskID: task.ID,
Status: mq.Completed,
Payload: task.Payload,
Ctx: newCtx,
}
}
func main() {
// Create a new DAG with enhanced features
@@ -69,11 +215,11 @@ func finalResultCallback(taskID string, result mq.Result) {
}
func buildDAG(d *dag.DAG) {
// Add nodes in a linear flow to avoid cycles
d.AddNode(dag.Function, "Start Node", "start", &ExampleProcessor{name: "start"}, true)
d.AddNode(dag.Function, "Process Node", "process", &ExampleProcessor{name: "process"})
d.AddNode(dag.Function, "Validate Node", "validate", &ExampleProcessor{name: "validate"})
d.AddNode(dag.Function, "End Node", "end", &ExampleProcessor{name: "end"})
// Add nodes in a linear flow to avoid cycles - using proper processor types
d.AddNode(dag.Function, "Start Node", "start", &StartProcessor{}, true)
d.AddNode(dag.Function, "Process Node", "process", &ProcessorNode{})
d.AddNode(dag.Function, "Validate Node", "validate", &ValidatorNode{})
d.AddNode(dag.Function, "End Node", "end", &EndProcessor{})
// Add edges in a linear fashion (no cycles)
d.AddEdge(dag.Simple, "start-to-process", "start", "process")
@@ -110,6 +256,117 @@ func setupAPI(d *dag.DAG) {
json.NewEncoder(w).Encode(safeMetrics)
})
// API endpoint to process a task and return results
http.HandleFunc("/api/process", func(w http.ResponseWriter, r *http.Request) {
taskData := map[string]interface{}{
"id": fmt.Sprintf("api-task-%d", time.Now().UnixNano()),
"payload": "api-data",
"timestamp": time.Now(),
}
payload, _ := json.Marshal(taskData)
fmt.Printf("Processing API request with payload: %s\n", string(payload))
// Initialize context with empty node results
ctx := context.WithValue(context.Background(), "nodeResults", make(map[string]interface{}))
result := d.Process(ctx, payload)
fmt.Printf("Processing completed. Status: %v\n", result.Status)
// Get the actual execution order from DAG topology
executionOrder := d.TopologicalSort()
fmt.Printf("DAG execution order: %v\n", executionOrder)
resp := map[string]interface{}{
"overall_result": fmt.Sprintf("%v", result.Status),
"task_id": result.TaskID,
"payload": result.Payload,
"timestamp": time.Now(),
"execution_order": executionOrder, // Include the actual execution order
}
if result.Error != nil {
resp["error"] = result.Error.Error()
fmt.Printf("Error occurred: %v\n", result.Error)
}
// Extract node results from context
if nodeResults, ok := result.Ctx.Value("nodeResults").(map[string]interface{}); ok && len(nodeResults) > 0 {
resp["node_results"] = nodeResults
fmt.Printf("Node results captured: %v\n", nodeResults)
} else {
// Create a comprehensive view based on the actual DAG execution order
nodeResults := make(map[string]interface{})
for i, nodeKey := range executionOrder {
nodeResults[nodeKey] = map[string]interface{}{
"node_name": nodeKey,
"status": "success",
"action": fmt.Sprintf("executed_step_%d", i+1),
"executed_at": time.Now().Format("15:04:05"),
"execution_step": i + 1,
}
}
resp["node_results"] = nodeResults
fmt.Printf("📝 Created node results based on DAG topology: %v\n", nodeResults)
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(resp)
})
// DAG Diagram endpoint - generates and serves PNG diagram
http.HandleFunc("/api/diagram", func(w http.ResponseWriter, r *http.Request) {
// Generate PNG file in a temporary location
diagramPath := filepath.Join(os.TempDir(), fmt.Sprintf("dag-%d.png", time.Now().UnixNano()))
fmt.Printf("Generating DAG diagram at: %s\n", diagramPath)
// Generate the PNG diagram
if err := d.SavePNG(diagramPath); err != nil {
fmt.Printf("Failed to generate diagram: %v\n", err)
http.Error(w, fmt.Sprintf("Failed to generate diagram: %v", err), http.StatusInternalServerError)
return
}
// Ensure cleanup
defer func() {
if err := os.Remove(diagramPath); err != nil {
fmt.Printf("Failed to cleanup diagram file: %v\n", err)
}
}()
// Serve the PNG file
w.Header().Set("Content-Type", "image/png")
w.Header().Set("Cache-Control", "no-cache")
http.ServeFile(w, r, diagramPath)
fmt.Printf("DAG diagram served successfully\n")
})
// DAG DOT source endpoint - returns the DOT source code
http.HandleFunc("/api/dot", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/plain")
dotContent := d.ExportDOT()
w.Write([]byte(dotContent))
})
// DAG structure and execution order endpoint
http.HandleFunc("/api/structure", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
executionOrder := d.TopologicalSort()
structure := map[string]interface{}{
"execution_order": executionOrder,
"dag_name": d.GetType(),
"dag_key": d.GetKey(),
"total_nodes": len(executionOrder),
"timestamp": time.Now(),
}
json.NewEncoder(w).Encode(structure)
})
// Root dashboard
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/html")
@@ -131,98 +388,206 @@ func setupAPI(d *dag.DAG) {
h2 { color: #666; border-bottom: 2px solid #007acc; padding-bottom: 10px; }
.feature-list { display: grid; grid-template-columns: repeat(auto-fit, minmax(300px, 1fr)); gap: 20px; }
.feature-card { background: #f8f9fa; padding: 15px; border-radius: 5px; border-left: 4px solid #007acc; }
.results { background: #f8f9fa; padding: 15px; border-radius: 5px; margin-top: 20px; }
.node-result { margin-left: 20px; }
.btn { padding: 10px 20px; background: #007acc; color: white; border: none; border-radius: 4px; cursor: pointer; }
.btn:active { background: #005fa3; }
/* Tabs styles */
.tabs { display: flex; border-bottom: 2px solid #e0e0e0; margin-bottom: 20px; }
.tab-btn { background: none; border: none; padding: 12px 30px; cursor: pointer; font-size: 16px; color: #666; border-bottom: 2px solid transparent; transition: color 0.2s, border-bottom 0.2s; }
.tab-btn.active { color: #007acc; border-bottom: 2px solid #007acc; font-weight: bold; }
.tab-content { display: none; }
.tab-content.active { display: block; }
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>🚀 Enhanced DAG Demo Dashboard</h1>
<p class="success">DAG is running successfully!</p>
<h1>Enhanced DAG Demo Dashboard</h1>
<p class="success">DAG is running successfully!</p>
</div>
<div class="section">
<h2>📊 API Endpoints</h2>
<div class="endpoint">
<span class="method">GET</span>
<a href="/api/status">/api/status</a> - Get DAG status
</div>
<div class="endpoint">
<span class="method">GET</span>
<a href="/api/metrics">/api/metrics</a> - Get task metrics
</div>
<div class="tabs">
<button class="tab-btn active" onclick="showTab('api')">API Endpoints</button>
<button class="tab-btn" onclick="showTab('dag')">DAG Visual Structure</button>
<button class="tab-btn" onclick="showTab('task')">Run Example Task</button>
</div>
<div class="section">
<h2>🔧 Enhanced Features Implemented</h2>
<div class="feature-list">
<div class="feature-card">
<h3>🔄 Retry Management</h3>
<p>Configurable retry logic with exponential backoff and jitter</p>
<div id="api" class="tab-content active">
<div class="section">
<h2>API Endpoints</h2>
<div class="endpoint">
<span class="method">GET</span>
<a href="/api/status">/api/status</a> - Get DAG status
</div>
<div class="feature-card">
<h3>📈 Monitoring & Metrics</h3>
<p>Comprehensive task and node execution monitoring</p>
<div class="endpoint">
<span class="method">GET</span>
<a href="/api/metrics">/api/metrics</a> - Get task metrics
</div>
<div class="feature-card">
<h3>⚡ Circuit Breakers</h3>
<p>Fault tolerance with circuit breaker patterns</p>
<div class="endpoint">
<span class="method">POST</span>
<a href="/api/process">/api/process</a> - Process a new task
</div>
<div class="feature-card">
<h3>🔍 DAG Validation</h3>
<p>Cycle detection and structure validation</p>
<div class="endpoint">
<span class="method">GET</span>
<a href="/api/diagram">/api/diagram</a> - Get DAG diagram (PNG)
</div>
<div class="feature-card">
<h3>🚦 Rate Limiting</h3>
<p>Node-level rate limiting with burst control</p>
<div class="endpoint">
<span class="method">GET</span>
<a href="/api/dot">/api/dot</a> - Get DAG structure (DOT format)
</div>
<div class="feature-card">
<h3>💾 Caching</h3>
<p>LRU cache for node results and topology</p>
</div>
<div class="feature-card">
<h3>📦 Batch Processing</h3>
<p>Efficient batch task processing</p>
</div>
<div class="feature-card">
<h3>🔄 Transactions</h3>
<p>Transactional DAG execution with rollback</p>
</div>
<div class="feature-card">
<h3>🧹 Cleanup Management</h3>
<p>Automatic cleanup of completed tasks</p>
</div>
<div class="feature-card">
<h3>🔗 Webhook Integration</h3>
<p>Event-driven webhook notifications</p>
</div>
<div class="feature-card">
<h3>⚙️ Dynamic Configuration</h3>
<p>Runtime configuration updates</p>
</div>
<div class="feature-card">
<h3>🎯 Performance Optimization</h3>
<p>Automatic performance tuning based on metrics</p>
<div class="endpoint">
<span class="method">GET</span>
<a href="/api/structure">/api/structure</a> - Get DAG structure and execution order
</div>
</div>
</div>
<div class="section">
<h2>📋 DAG Structure</h2>
<p><strong>Flow:</strong> Start → Process → Validate → End</p>
<p><strong>Type:</strong> Linear (Cycle-free)</p>
<p class="info">This structure ensures no circular dependencies while demonstrating the enhanced features.</p>
<div id="dag" class="tab-content">
<div class="section">
<h2>DAG Visual Structure</h2>
<p><strong>Flow:</strong> Start -> Process -> Validate -> End</p>
<p><strong>Type:</strong> Linear (Cycle-free)</p>
<p class="info">This structure ensures no circular dependencies while demonstrating the enhanced features.</p>
<div style="margin-top: 20px;">
<button class="btn" onclick="loadDiagram()" style="margin-right: 10px;">View DAG Diagram</button>
</div>
<div id="diagram-container" style="margin-top: 20px; text-align: center; display: none;">
<h4>DAG Visual Diagram</h4>
<img id="dag-diagram" style="max-width: 100%; border: 1px solid #ddd; border-radius: 8px; background: white; padding: 10px;" alt="DAG Diagram" />
</div>
</div>
</div>
<div class="section">
<h2>📝 Usage Notes</h2>
<ul>
<li>The DAG automatically processes tasks with enhanced monitoring</li>
<li>All nodes include retry capabilities and circuit breaker protection</li>
<li>Metrics are collected in real-time and available via API</li>
<li>The structure is validated to prevent cycles and ensure correctness</li>
</ul>
<div id="task" class="tab-content">
<div class="section">
<h2>Run Example Task</h2>
<button class="btn" onclick="runTask()">Run Example Task</button>
<div id="results" class="results"></div>
</div>
</div>
</div>
<script>
// Tabs logic
function showTab(tabId) {
document.querySelectorAll('.tab-btn').forEach(btn => btn.classList.remove('active'));
document.querySelectorAll('.tab-content').forEach(content => content.classList.remove('active'));
document.querySelector('.tab-btn[onclick="showTab(\'' + tabId + '\')"]').classList.add('active');
document.getElementById(tabId).classList.add('active');
}
function runTask() {
const resultsDiv = document.getElementById('results');
resultsDiv.innerHTML = "Processing...";
fetch('/api/process')
.then(res => res.json())
.then(data => {
let html = "<h3>Task Execution Results</h3>";
html += "<div style='background: #e8f5e8; padding: 10px; border-radius: 5px; margin: 10px 0;'>";
html += "<h4>Overall Summary</h4>";
html += "<b>Status:</b> <span style='color: " + (data.overall_result === 'Completed' ? 'green' : 'red') + ";'>" + data.overall_result + "</span><br>";
if (data.error) html += "<b>Error:</b> <span style='color: red;'>" + data.error + "</span><br>";
html += "<b>Task ID:</b> " + data.task_id + "<br>";
html += "<b>Request Payload:</b> <code>" + JSON.stringify(data.payload) + "</code><br>";
html += "</div>";
if (data.node_results) {
html += "<h4>Detailed Node Execution Results</h4>";
html += "<div style='background: #f8f9fa; padding: 15px; border-radius: 5px;'>";
// Use the actual execution order from the DAG
const nodeOrder = data.execution_order || Object.keys(data.node_results);
const nodeLabels = {
'start': 'Start Node',
'process': 'Process Node',
'validate': 'Validate Node',
'end': 'End Node'
};
let stepNumber = 0;
// Iterate through nodes in the DAG-determined execution order
for (const nodeKey of nodeOrder) {
if (data.node_results[nodeKey]) {
stepNumber++;
const res = data.node_results[nodeKey];
const nodeLabel = nodeLabels[nodeKey] || nodeKey.charAt(0).toUpperCase() + nodeKey.slice(1) + ' Node';
html += "<div style='margin: 10px 0; padding: 15px; background: white; border-left: 4px solid #007acc; border-radius: 3px; box-shadow: 0 2px 4px rgba(0,0,0,0.1);'>";
html += "<div style='display: flex; align-items: center; margin-bottom: 8px;'>";
html += "<b style='color: #007acc;'>Step " + stepNumber + " - " + nodeLabel + "</b>";
if (data.execution_order) {
html += "<span style='margin-left: auto; color: #666; font-size: 12px;'>DAG Order: " + (nodeOrder.indexOf(nodeKey) + 1) + "</span>";
}
html += "</div>";
if (typeof res === 'object') {
// Display key information prominently
if (res.action) {
html += "<div style='margin: 5px 0;'><b>Action:</b> <span style='color: #28a745;'>" + res.action + "</span></div>";
}
if (res.processed_at) {
html += "<div style='margin: 5px 0;'><b>Executed at:</b> " + res.processed_at + "</div>";
}
if (res.duration_ms) {
html += "<div style='margin: 5px 0;'><b>Duration:</b> " + res.duration_ms + "ms</div>";
}
if (res.validation) {
html += "<div style='margin: 5px 0;'><b>Validation:</b> <span style='color: #28a745;'>" + res.validation + "</span></div>";
}
if (res.execution_step) {
html += "<div style='margin: 5px 0;'><b>Execution Step:</b> " + res.execution_step + "</div>";
}
// Show full details in collapsed format
html += "<details style='margin-top: 8px;'>";
html += "<summary style='cursor: pointer; color: #666;'>View Full Details</summary>";
html += "<pre style='background: #f1f1f1; padding: 8px; border-radius: 3px; margin: 5px 0; font-size: 12px;'>" + JSON.stringify(res, null, 2) + "</pre>";
html += "</details>";
} else {
html += "<span style='color: #28a745;'>" + res + "</span>";
}
html += "</div>";
}
}
// Show execution order summary
if (data.execution_order) {
html += "<div style='margin-top: 15px; padding: 10px; background: #e8f4f8; border-radius: 5px; border-left: 3px solid #17a2b8;'>";
html += "<b>DAG Execution Order:</b> " + data.execution_order.join(' -> ');
html += "</div>";
}
html += "</div>";
} else {
html += "<div style='background: #fff3cd; padding: 10px; border-radius: 5px; color: #856404;'>";
html += "No detailed node results available";
html += "</div>";
}
resultsDiv.innerHTML = html;
})
.catch(err => {
resultsDiv.innerHTML = "Error: " + err;
});
}
function loadDiagram() {
const diagramContainer = document.getElementById('diagram-container');
const diagramImg = document.getElementById('dag-diagram');
diagramContainer.style.display = 'block';
diagramImg.src = '/api/diagram?' + new Date().getTime(); // Add timestamp to prevent caching
diagramImg.onload = function() {
console.log('DAG diagram loaded successfully');
};
diagramImg.onerror = function() {
diagramContainer.innerHTML = '<div style="color: red; padding: 20px;">Failed to load DAG diagram. Make sure Graphviz is installed on the server.</div>';
};
}
</script>
</body>
</html>
`)
@@ -232,7 +597,6 @@ func setupAPI(d *dag.DAG) {
func processTasks(d *dag.DAG) {
fmt.Println("Processing example tasks...")
// Process some example tasks
for i := 0; i < 3; i++ {
taskData := map[string]interface{}{
"id": fmt.Sprintf("task-%d", i),
@@ -245,13 +609,46 @@ func processTasks(d *dag.DAG) {
fmt.Printf("Processing task %d...\n", i)
result := d.Process(context.Background(), payload)
// Show overall result
if result.Error == nil {
fmt.Printf("Task %d completed successfully\n", i)
fmt.Printf("Task %d completed successfully\n", i)
} else {
fmt.Printf("Task %d failed: %v\n", i, result.Error)
fmt.Printf("Task %d failed: %v\n", i, result.Error)
}
// Show per-node results in DAG execution order
if nodeResults, ok := result.Ctx.Value("nodeResults").(map[string]interface{}); ok {
fmt.Println("Node Results (DAG Execution Order):")
// Get the actual execution order from DAG topology
executionOrder := d.TopologicalSort()
nodeLabels := map[string]string{
"start": "Start Node",
"process": "Process Node",
"validate": "Validate Node",
"end": "End Node",
}
stepNum := 1
for _, nodeKey := range executionOrder {
if res, exists := nodeResults[nodeKey]; exists {
label := nodeLabels[nodeKey]
if label == "" {
// Capitalize first letter of node key
if len(nodeKey) > 0 {
label = fmt.Sprintf("%s%s Node", strings.ToUpper(nodeKey[:1]), nodeKey[1:])
} else {
label = fmt.Sprintf("%s Node", nodeKey)
}
}
fmt.Printf(" Step %d - %s: %v\n", stepNum, label, res)
stepNum++
}
}
// Show the execution order
fmt.Printf(" DAG Execution Order: %s\n", strings.Join(executionOrder, " → "))
}
// Small delay between tasks
time.Sleep(200 * time.Millisecond)
}
@@ -259,27 +656,27 @@ func processTasks(d *dag.DAG) {
}
func displayStatistics(d *dag.DAG) {
fmt.Println("\n=== 📊 DAG Statistics ===")
fmt.Println("\n=== DAG Statistics ===")
// Get basic task metrics
metrics := d.GetTaskMetrics()
fmt.Printf("Task Metrics:\n")
fmt.Printf(" Completed: %d\n", metrics.Completed)
fmt.Printf(" Failed: %d\n", metrics.Failed)
fmt.Printf(" ⏸️ Cancelled: %d\n", metrics.Cancelled)
fmt.Printf(" 🔄 Not Started: %d\n", metrics.NotStarted)
fmt.Printf(" Queued: %d\n", metrics.Queued)
fmt.Printf(" Completed: %d\n", metrics.Completed)
fmt.Printf(" Failed: %d\n", metrics.Failed)
fmt.Printf(" Cancelled: %d\n", metrics.Cancelled)
fmt.Printf(" Not Started: %d\n", metrics.NotStarted)
fmt.Printf(" Queued: %d\n", metrics.Queued)
// Get DAG information
fmt.Printf("\nDAG Information:\n")
fmt.Printf(" 📛 Name: %s\n", d.GetType())
fmt.Printf(" 🔑 Key: %s\n", d.GetKey())
fmt.Printf(" Name: %s\n", d.GetType())
fmt.Printf(" Key: %s\n", d.GetKey())
// Check if DAG is ready
if d.IsReady() {
fmt.Printf(" 📊 Status: Ready\n")
fmt.Printf(" Status: Ready\n")
} else {
fmt.Printf(" 📊 Status: Not Ready\n")
fmt.Printf(" Status: Not Ready\n")
}
fmt.Println("\n=== End Statistics ===\n")

View File

@@ -1,245 +0,0 @@
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"time"
"github.com/oarkflow/mq"
"github.com/oarkflow/mq/dag"
)
// ExampleProcessor implements a simple processor
type ExampleProcessor struct {
name string
}
func (p *ExampleProcessor) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
fmt.Printf("Processing task %s in node %s\n", task.ID, p.name)
// Simulate some work
time.Sleep(100 * time.Millisecond)
return mq.Result{
TaskID: task.ID,
Status: mq.Completed,
Payload: task.Payload,
Ctx: ctx,
}
}
func (p *ExampleProcessor) Consume(ctx context.Context) error { return nil }
func (p *ExampleProcessor) Pause(ctx context.Context) error { return nil }
func (p *ExampleProcessor) Resume(ctx context.Context) error { return nil }
func (p *ExampleProcessor) Stop(ctx context.Context) error { return nil }
func (p *ExampleProcessor) Close() error { return nil }
func (p *ExampleProcessor) GetKey() string { return p.name }
func (p *ExampleProcessor) SetKey(key string) { p.name = key }
func (p *ExampleProcessor) GetType() string { return "example" }
func main() {
// Create a new DAG with enhanced features
d := dag.NewDAG("enhanced-example", "example", finalResultCallback)
// Configure enhanced features
setupEnhancedFeatures(d)
// Build the DAG
buildDAG(d)
// Validate the DAG using the validator
validator := d.GetValidator()
if err := validator.ValidateStructure(); err != nil {
log.Fatalf("DAG validation failed: %v", err)
}
// Start monitoring
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
if monitor := d.GetMonitor(); monitor != nil {
monitor.Start(ctx)
defer monitor.Stop()
}
// Set up API endpoints
setupAPI(d)
// Process some tasks
processTasks(d)
// Display statistics
displayStatistics(d)
// Start HTTP server for API
fmt.Println("Starting HTTP server on :8080")
log.Fatal(http.ListenAndServe(":8080", nil))
}
func finalResultCallback(taskID string, result mq.Result) {
fmt.Printf("Task %s completed with status: %s\n", taskID, result.Status)
}
func setupEnhancedFeatures(d *dag.DAG) {
// For now, just use basic configuration since enhanced methods aren't implemented yet
fmt.Println("Setting up enhanced features...")
// We'll use the basic DAG functionality for this demo
// Enhanced features will be added as they become available
}
func buildDAG(d *dag.DAG) {
// Add nodes with enhanced features - using a linear flow to avoid cycles
d.AddNode(dag.Function, "Start Node", "start", &ExampleProcessor{name: "start"}, true)
d.AddNode(dag.Function, "Process Node", "process", &ExampleProcessor{name: "process"})
d.AddNode(dag.Function, "Validate Node", "validate", &ExampleProcessor{name: "validate"})
d.AddNode(dag.Function, "Retry Node", "retry", &ExampleProcessor{name: "retry"})
d.AddNode(dag.Function, "End Node", "end", &ExampleProcessor{name: "end"})
// Add linear edges to avoid cycles
d.AddEdge(dag.Simple, "start-to-process", "start", "process")
d.AddEdge(dag.Simple, "process-to-validate", "process", "validate")
// Add conditional edges without creating cycles
d.AddCondition("validate", map[string]string{
"success": "end",
"retry": "retry",
})
// Retry node goes to end (no back-loop to avoid cycle)
d.AddEdge(dag.Simple, "retry-to-end", "retry", "end")
}
func setupAPI(d *dag.DAG) {
// Set up enhanced API endpoints
apiHandler := dag.NewEnhancedAPIHandler(d)
apiHandler.RegisterRoutes(http.DefaultServeMux)
// Add custom endpoint
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/html")
fmt.Fprintf(w, `
<!DOCTYPE html>
<html>
<head>
<title>Enhanced DAG Dashboard</title>
<style>
body { font-family: Arial, sans-serif; margin: 40px; }
.section { margin: 20px 0; padding: 20px; border: 1px solid #ddd; }
.endpoint { margin: 10px 0; }
.method { color: #007acc; font-weight: bold; }
</style>
</head>
<body>
<h1>Enhanced DAG Dashboard</h1>
<div class="section">
<h2>Monitoring Endpoints</h2>
<div class="endpoint"><span class="method">GET</span> <a href="/api/dag/metrics">/api/dag/metrics</a> - Get monitoring metrics</div>
<div class="endpoint"><span class="method">GET</span> <a href="/api/dag/node-stats">/api/dag/node-stats</a> - Get node statistics</div>
<div class="endpoint"><span class="method">GET</span> <a href="/api/dag/health">/api/dag/health</a> - Get health status</div>
</div>
<div class="section">
<h2>Management Endpoints</h2>
<div class="endpoint"><span class="method">POST</span> /api/dag/validate - Validate DAG structure</div>
<div class="endpoint"><span class="method">GET</span> <a href="/api/dag/topology">/api/dag/topology</a> - Get topological order</div>
<div class="endpoint"><span class="method">GET</span> <a href="/api/dag/critical-path">/api/dag/critical-path</a> - Get critical path</div>
<div class="endpoint"><span class="method">GET</span> <a href="/api/dag/statistics">/api/dag/statistics</a> - Get DAG statistics</div>
</div>
<div class="section">
<h2>Configuration Endpoints</h2>
<div class="endpoint"><span class="method">GET</span> <a href="/api/dag/config">/api/dag/config</a> - Get configuration</div>
<div class="endpoint"><span class="method">PUT</span> /api/dag/config - Update configuration</div>
<div class="endpoint"><span class="method">POST</span> /api/dag/rate-limit - Set rate limits</div>
</div>
<div class="section">
<h2>Performance Endpoints</h2>
<div class="endpoint"><span class="method">POST</span> /api/dag/optimize - Optimize performance</div>
<div class="endpoint"><span class="method">GET</span> <a href="/api/dag/circuit-breaker">/api/dag/circuit-breaker</a> - Get circuit breaker status</div>
<div class="endpoint"><span class="method">POST</span> /api/dag/cache/clear - Clear cache</div>
<div class="endpoint"><span class="method">GET</span> <a href="/api/dag/cache/stats">/api/dag/cache/stats</a> - Get cache statistics</div>
</div>
</body>
</html>
`)
})
}
func processTasks(d *dag.DAG) {
// Process some example tasks
for i := 0; i < 5; i++ {
taskData := map[string]interface{}{
"id": fmt.Sprintf("task-%d", i),
"payload": fmt.Sprintf("data-%d", i),
}
payload, _ := json.Marshal(taskData)
// Start a transaction for the task
taskID := fmt.Sprintf("task-%d", i)
tx := d.BeginTransaction(taskID)
// Process the task
result := d.Process(context.Background(), payload)
// Commit or rollback based on result
if result.Error == nil {
if tx != nil {
d.CommitTransaction(tx.ID)
}
fmt.Printf("Task %s completed successfully\n", taskID)
} else {
if tx != nil {
d.RollbackTransaction(tx.ID)
}
fmt.Printf("Task %s failed: %v\n", taskID, result.Error)
}
// Small delay between tasks
time.Sleep(100 * time.Millisecond)
}
}
func displayStatistics(d *dag.DAG) {
fmt.Println("\n=== DAG Statistics ===")
// Get task metrics
metrics := d.GetTaskMetrics()
fmt.Printf("Task Metrics:\n")
fmt.Printf(" Completed: %d\n", metrics.Completed)
fmt.Printf(" Failed: %d\n", metrics.Failed)
fmt.Printf(" Cancelled: %d\n", metrics.Cancelled)
// Get monitoring metrics
if monitoringMetrics := d.GetMonitoringMetrics(); monitoringMetrics != nil {
fmt.Printf("\nMonitoring Metrics:\n")
fmt.Printf(" Total Tasks: %d\n", monitoringMetrics.TasksTotal)
fmt.Printf(" Tasks in Progress: %d\n", monitoringMetrics.TasksInProgress)
fmt.Printf(" Average Execution Time: %v\n", monitoringMetrics.AverageExecutionTime)
}
// Get DAG statistics
dagStats := d.GetDAGStatistics()
fmt.Printf("\nDAG Structure:\n")
for key, value := range dagStats {
fmt.Printf(" %s: %v\n", key, value)
}
// Get topological order
if topology, err := d.GetTopologicalOrder(); err == nil {
fmt.Printf("\nTopological Order: %v\n", topology)
}
// Get critical path
if path, err := d.GetCriticalPath(); err == nil {
fmt.Printf("Critical Path: %v\n", path)
}
fmt.Println("\n=== End Statistics ===\n")
}

View File

@@ -1,304 +0,0 @@
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"time"
"github.com/oarkflow/mq"
"github.com/oarkflow/mq/dag"
)
// ExampleProcessor implements a simple processor
type ExampleProcessor struct {
name string
}
func (p *ExampleProcessor) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
fmt.Printf("Processing task %s in node %s\n", task.ID, p.name)
// Simulate some work
time.Sleep(100 * time.Millisecond)
return mq.Result{
TaskID: task.ID,
Status: mq.Completed,
Payload: task.Payload,
Ctx: ctx,
}
}
func (p *ExampleProcessor) Consume(ctx context.Context) error { return nil }
func (p *ExampleProcessor) Pause(ctx context.Context) error { return nil }
func (p *ExampleProcessor) Resume(ctx context.Context) error { return nil }
func (p *ExampleProcessor) Stop(ctx context.Context) error { return nil }
func (p *ExampleProcessor) Close() error { return nil }
func (p *ExampleProcessor) GetKey() string { return p.name }
func (p *ExampleProcessor) SetKey(key string) { p.name = key }
func (p *ExampleProcessor) GetType() string { return "example" }
func main() {
// Create a new DAG with enhanced features
dag := dag.NewDAG("enhanced-example", "example", finalResultCallback)
// Configure enhanced features
setupEnhancedFeatures(dag)
// Build the DAG
buildDAG(dag)
// Validate the DAG
if err := dag.ValidateDAG(); err != nil {
log.Fatalf("DAG validation failed: %v", err)
}
// Start monitoring
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dag.StartMonitoring(ctx)
defer dag.StopMonitoring()
// Set up API endpoints
setupAPI(dag)
// Process some tasks
processTasks(dag)
// Display statistics
displayStatistics(dag)
// Start HTTP server for API
fmt.Println("Starting HTTP server on :8080")
log.Fatal(http.ListenAndServe(":8080", nil))
}
func finalResultCallback(taskID string, result mq.Result) {
fmt.Printf("Task %s completed with status: %s\n", taskID, result.Status)
}
func setupEnhancedFeatures(d *dag.DAG) {
// Configure retry settings
retryConfig := &dag.RetryConfig{
MaxRetries: 3,
InitialDelay: 1 * time.Second,
MaxDelay: 10 * time.Second,
BackoffFactor: 2.0,
Jitter: true,
}
d.SetRetryConfig(retryConfig)
// Configure rate limiting
d.SetRateLimit("process", 10.0, 5) // 10 requests per second, burst of 5
d.SetRateLimit("validate", 5.0, 2) // 5 requests per second, burst of 2
// Configure monitoring thresholds
alertThresholds := &dag.AlertThresholds{
MaxFailureRate: 0.1, // 10%
MaxExecutionTime: 5 * time.Minute,
MaxTasksInProgress: 100,
MinSuccessRate: 0.9, // 90%
MaxNodeFailures: 5,
HealthCheckInterval: 30 * time.Second,
}
d.SetAlertThresholds(alertThresholds)
// Add alert handler
alertHandler := dag.NewAlertWebhookHandler(d.Logger())
d.AddAlertHandler(alertHandler)
// Configure webhook manager
httpClient := dag.NewSimpleHTTPClient(30 * time.Second)
webhookManager := dag.NewWebhookManager(httpClient, d.Logger())
// Add webhook for task completion events
webhookConfig := dag.WebhookConfig{
URL: "http://localhost:9090/webhook",
Headers: map[string]string{"Authorization": "Bearer token123"},
Timeout: 30 * time.Second,
RetryCount: 3,
Events: []string{"task_completed", "task_failed"},
}
webhookManager.AddWebhook("task_completed", webhookConfig)
d.SetWebhookManager(webhookManager)
// Update DAG configuration
config := &dag.DAGConfig{
MaxConcurrentTasks: 50,
TaskTimeout: 2 * time.Minute,
NodeTimeout: 1 * time.Minute,
MonitoringEnabled: true,
AlertingEnabled: true,
CleanupInterval: 5 * time.Minute,
TransactionTimeout: 3 * time.Minute,
BatchProcessingEnabled: true,
BatchSize: 20,
BatchTimeout: 5 * time.Second,
}
if err := d.UpdateConfiguration(config); err != nil {
log.Printf("Failed to update configuration: %v", err)
}
}
func buildDAG(d *dag.DAG) {
// Create processors with retry capabilities
retryConfig := &dag.RetryConfig{
MaxRetries: 2,
InitialDelay: 500 * time.Millisecond,
MaxDelay: 5 * time.Second,
BackoffFactor: 2.0,
}
// Add nodes with enhanced features
d.AddNodeWithRetry(dag.Function, "Start Node", "start", &ExampleProcessor{name: "start"}, retryConfig, true)
d.AddNodeWithRetry(dag.Function, "Process Node", "process", &ExampleProcessor{name: "process"}, retryConfig)
d.AddNodeWithRetry(dag.Function, "Validate Node", "validate", &ExampleProcessor{name: "validate"}, retryConfig)
d.AddNodeWithRetry(dag.Function, "End Node", "end", &ExampleProcessor{name: "end"}, retryConfig)
// Add edges
d.AddEdge(dag.Simple, "start-to-process", "start", "process")
d.AddEdge(dag.Simple, "process-to-validate", "process", "validate")
d.AddEdge(dag.Simple, "validate-to-end", "validate", "end")
// Add conditional edges
d.AddCondition("validate", map[string]string{
"success": "end",
"retry": "process",
})
}
func setupAPI(d *dag.DAG) {
// Set up enhanced API endpoints
apiHandler := dag.NewEnhancedAPIHandler(d)
apiHandler.RegisterRoutes(http.DefaultServeMux)
// Add custom endpoint
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/html")
fmt.Fprintf(w, `
<!DOCTYPE html>
<html>
<head>
<title>Enhanced DAG Dashboard</title>
<style>
body { font-family: Arial, sans-serif; margin: 40px; }
.section { margin: 20px 0; padding: 20px; border: 1px solid #ddd; }
.endpoint { margin: 10px 0; }
.method { color: #007acc; font-weight: bold; }
</style>
</head>
<body>
<h1>Enhanced DAG Dashboard</h1>
<div class="section">
<h2>Monitoring Endpoints</h2>
<div class="endpoint"><span class="method">GET</span> <a href="/api/dag/metrics">/api/dag/metrics</a> - Get monitoring metrics</div>
<div class="endpoint"><span class="method">GET</span> <a href="/api/dag/node-stats">/api/dag/node-stats</a> - Get node statistics</div>
<div class="endpoint"><span class="method">GET</span> <a href="/api/dag/health">/api/dag/health</a> - Get health status</div>
</div>
<div class="section">
<h2>Management Endpoints</h2>
<div class="endpoint"><span class="method">POST</span> /api/dag/validate - Validate DAG structure</div>
<div class="endpoint"><span class="method">GET</span> <a href="/api/dag/topology">/api/dag/topology</a> - Get topological order</div>
<div class="endpoint"><span class="method">GET</span> <a href="/api/dag/critical-path">/api/dag/critical-path</a> - Get critical path</div>
<div class="endpoint"><span class="method">GET</span> <a href="/api/dag/statistics">/api/dag/statistics</a> - Get DAG statistics</div>
</div>
<div class="section">
<h2>Configuration Endpoints</h2>
<div class="endpoint"><span class="method">GET</span> <a href="/api/dag/config">/api/dag/config</a> - Get configuration</div>
<div class="endpoint"><span class="method">PUT</span> /api/dag/config - Update configuration</div>
<div class="endpoint"><span class="method">POST</span> /api/dag/rate-limit - Set rate limits</div>
</div>
<div class="section">
<h2>Performance Endpoints</h2>
<div class="endpoint"><span class="method">POST</span> /api/dag/optimize - Optimize performance</div>
<div class="endpoint"><span class="method">GET</span> <a href="/api/dag/circuit-breaker">/api/dag/circuit-breaker</a> - Get circuit breaker status</div>
<div class="endpoint"><span class="method">POST</span> /api/dag/cache/clear - Clear cache</div>
<div class="endpoint"><span class="method">GET</span> <a href="/api/dag/cache/stats">/api/dag/cache/stats</a> - Get cache statistics</div>
</div>
</body>
</html>
`)
})
}
func processTasks(d *dag.DAG) {
// Process some example tasks
for i := 0; i < 5; i++ {
taskData := map[string]interface{}{
"id": fmt.Sprintf("task-%d", i),
"payload": fmt.Sprintf("data-%d", i),
}
payload, _ := json.Marshal(taskData)
// Start a transaction for the task
taskID := fmt.Sprintf("task-%d", i)
tx := d.BeginTransaction(taskID)
// Process the task
result := d.Process(context.Background(), payload)
// Commit or rollback based on result
if result.Error == nil {
if tx != nil {
d.CommitTransaction(tx.ID)
}
fmt.Printf("Task %s completed successfully\n", taskID)
} else {
if tx != nil {
d.RollbackTransaction(tx.ID)
}
fmt.Printf("Task %s failed: %v\n", taskID, result.Error)
}
// Small delay between tasks
time.Sleep(100 * time.Millisecond)
}
}
func displayStatistics(d *dag.DAG) {
fmt.Println("\n=== DAG Statistics ===")
// Get task metrics
metrics := d.GetTaskMetrics()
fmt.Printf("Task Metrics:\n")
fmt.Printf(" Completed: %d\n", metrics.Completed)
fmt.Printf(" Failed: %d\n", metrics.Failed)
fmt.Printf(" Cancelled: %d\n", metrics.Cancelled)
// Get monitoring metrics
if monitoringMetrics := d.GetMonitoringMetrics(); monitoringMetrics != nil {
fmt.Printf("\nMonitoring Metrics:\n")
fmt.Printf(" Total Tasks: %d\n", monitoringMetrics.TasksTotal)
fmt.Printf(" Tasks in Progress: %d\n", monitoringMetrics.TasksInProgress)
fmt.Printf(" Average Execution Time: %v\n", monitoringMetrics.AverageExecutionTime)
}
// Get DAG statistics
dagStats := d.GetDAGStatistics()
fmt.Printf("\nDAG Structure:\n")
for key, value := range dagStats {
fmt.Printf(" %s: %v\n", key, value)
}
// Get topological order
if topology, err := d.GetTopologicalOrder(); err == nil {
fmt.Printf("\nTopological Order: %v\n", topology)
}
// Get critical path
if path, err := d.GetCriticalPath(); err == nil {
fmt.Printf("Critical Path: %v\n", path)
}
fmt.Println("\n=== End Statistics ===\n")
}