Files
mq/ENHANCED_SERVICES_README.md
2025-09-18 18:26:35 +05:45

12 KiB

Enhanced Services with DAG + Workflow Engine

Overview

The enhanced services architecture successfully integrates all workflow engine features into the DAG system, providing complete feature parity and backward compatibility. This upgrade provides both traditional DAG functionality and advanced workflow capabilities through a unified service layer.

Architecture Components

1. Enhanced Service Manager (enhanced_setup.go)

  • Purpose: Core service orchestration with DAG + workflow integration
  • Features:
    • Dual-mode execution (Traditional DAG + Enhanced Workflow)
    • HTTP API endpoints for workflow management
    • Enhanced validation with workflow rule support
    • Service health monitoring and metrics
    • Background task management

2. Enhanced Contracts (enhanced_contracts.go)

  • Purpose: Service interfaces for DAG + workflow integration
  • Key Interfaces:
    • EnhancedServiceManager: Core service management
    • EnhancedDAGService: Dual-mode DAG operations
    • EnhancedValidation: Workflow validation rules
    • EnhancedHandler: Unified handler structure

3. Enhanced DAG Service (enhanced_dag_service.go)

  • Purpose: DAG service with workflow engine capabilities
  • Features:
    • Traditional DAG execution (backward compatibility)
    • Enhanced workflow execution with advanced processors
    • State management and persistence
    • Execution result handling with proper field mapping

4. Enhanced Validation (enhanced_validation.go)

  • Purpose: Validation service with workflow rule support
  • Features:
    • Schema validation with workflow rules
    • Field-level validation (string, email, numeric, etc.)
    • Custom validation logic with processor integration
    • Validation result aggregation

Features Implemented

Complete Workflow Engine Integration

All 8 advanced processors from the workflow engine are now available in the DAG system:

  1. Validator Processor: Schema and field validation
  2. Router Processor: Conditional routing and decision making
  3. Transformer Processor: Data transformation and mapping
  4. Aggregator Processor: Data aggregation and summarization
  5. Filter Processor: Data filtering and selection
  6. Sorter Processor: Data sorting and ordering
  7. Notify Processor: Notification and messaging
  8. Storage Processor: Data persistence and retrieval

Enhanced DAG Capabilities

  • Dual Mode Support: Both traditional DAG and workflow modes
  • Advanced Retry Logic: Exponential backoff with circuit breaker
  • State Management: Persistent execution state tracking
  • Scheduling: Background task scheduling and execution
  • Security: Authentication and authorization support
  • Middleware: Pre/post execution hooks
  • Metrics: Performance monitoring and reporting

HTTP API Integration

Complete REST API for workflow management:

  • GET /api/v1/handlers - List all handlers
  • POST /api/v1/execute/:key - Execute workflow by key
  • GET /api/v1/workflows - List workflow instances
  • POST /api/v1/workflows/:id/execute - Execute specific workflow
  • GET /health - Service health check

Validation System

Enhanced validation with workflow rule support:

  • Field-level validation rules
  • Type checking (string, email, numeric, etc.)
  • Length constraints (min/max)
  • Required field validation
  • Custom validation messages
  • Validation result aggregation

Usage Examples

1. Traditional DAG Mode (Backward Compatibility)

// Traditional DAG handler
handler := services.EnhancedHandler{
    Key:             "traditional-dag",
    Name:            "Traditional DAG",
    WorkflowEnabled: false, // Use traditional DAG mode
    Nodes: []services.EnhancedNode{
        {
            ID:        "start",
            Name:      "Start Process",
            Node:      "basic",
            FirstNode: true,
        },
        {
            ID:   "process",
            Name: "Process Data",
            Node: "basic",
        },
    },
    Edges: []services.Edge{
        {Source: "start", Target: []string{"process"}},
    },
}

2. Enhanced Workflow Mode

// Enhanced workflow handler with processors
handler := services.EnhancedHandler{
    Key:             "enhanced-workflow",
    Name:            "Enhanced Workflow",
    WorkflowEnabled: true, // Use enhanced workflow mode
    ValidationRules: []*dag.WorkflowValidationRule{
        {
            Field:     "email",
            Type:      "email",
            Required:  true,
            Message:   "Valid email is required",
        },
    },
    Nodes: []services.EnhancedNode{
        {
            ID:            "validate-input",
            Name:          "Validate Input",
            Type:          "validator",
            ProcessorType: "validator",
        },
        {
            ID:            "route-data",
            Name:          "Route Decision",
            Type:          "router",
            ProcessorType: "router",
        },
        {
            ID:            "transform-data",
            Name:          "Transform Data",
            Type:          "transformer",
            ProcessorType: "transformer",
        },
    },
    Edges: []services.Edge{
        {Source: "validate-input", Target: []string{"route-data"}},
        {Source: "route-data", Target: []string{"transform-data"}},
    },
}

3. Service Configuration

config := &services.EnhancedServiceConfig{
    BrokerURL: "nats://localhost:4222",
    Debug:     true,

    // Enhanced DAG configuration
    EnhancedDAGConfig: &dag.EnhancedDAGConfig{
        EnableWorkflowEngine:    true,
        MaintainDAGMode:         true,
        EnableStateManagement:   true,
        EnableAdvancedRetry:     true,
        EnableCircuitBreaker:    true,
        MaxConcurrentExecutions: 10,
        DefaultTimeout:          30 * time.Second,
    },

    // Workflow engine configuration
    WorkflowEngineConfig: &dag.WorkflowEngineConfig{
        MaxConcurrentExecutions: 5,
        DefaultTimeout:          2 * time.Minute,
        EnablePersistence:       true,
        EnableSecurity:          true,
        RetryConfig: &dag.RetryConfig{
            MaxRetries:    3,
            InitialDelay:  1 * time.Second,
            BackoffFactor: 2.0,
        },
    },
}

4. Service Initialization

// Create enhanced service manager
manager := services.NewEnhancedServiceManager(config)

// Initialize services
if err := manager.Initialize(config); err != nil {
    log.Fatalf("Failed to initialize services: %v", err)
}

// Start services
ctx := context.Background()
if err := manager.Start(ctx); err != nil {
    log.Fatalf("Failed to start services: %v", err)
}
defer manager.Stop(ctx)

// Register handlers
for _, handler := range handlers {
    if err := manager.RegisterEnhancedHandler(handler); err != nil {
        log.Printf("Failed to register handler %s: %v", handler.Key, err)
    }
}

5. HTTP API Setup

// Create Fiber app
app := fiber.New()

// Register HTTP routes
if err := manager.RegisterHTTPRoutes(app); err != nil {
    log.Fatalf("Failed to register HTTP routes: %v", err)
}

// Start server
log.Fatal(app.Listen(":3000"))

6. Workflow Execution

// Execute workflow programmatically
ctx := context.Background()
input := map[string]any{
    "name":  "John Doe",
    "email": "john@example.com",
}

result, err := manager.ExecuteEnhancedWorkflow(ctx, "enhanced-workflow", input)
if err != nil {
    log.Printf("Execution failed: %v", err)
} else {
    log.Printf("Execution completed: %s (Status: %s)", result.ID, result.Status)
}

HTTP API Usage

Execute Workflow via REST API

# Execute workflow with POST request
curl -X POST http://localhost:3000/api/v1/execute/enhanced-workflow \
  -H "Content-Type: application/json" \
  -d '{
    "name": "John Doe",
    "email": "john@example.com",
    "age": 30
  }'

List Available Handlers

# Get list of registered handlers
curl -X GET http://localhost:3000/api/v1/handlers

Health Check

# Check service health
curl -X GET http://localhost:3000/health

Advanced Features

1. Validation Rules

The enhanced validation system supports comprehensive field validation:

ValidationRules: []*dag.WorkflowValidationRule{
    {
        Field:     "name",
        Type:      "string",
        Required:  true,
        MinLength: 2,
        MaxLength: 50,
        Message:   "Name must be 2-50 characters",
    },
    {
        Field:    "email",
        Type:     "email",
        Required: true,
        Message:  "Valid email is required",
    },
    {
        Field:   "age",
        Type:    "number",
        Min:     18,
        Max:     120,
        Message: "Age must be between 18 and 120",
    },
}

2. Processor Configuration

Each processor can be configured with specific parameters:

Config: dag.WorkflowNodeConfig{
    // Validator processor config
    ValidationType: "schema",
    ValidationRules: []dag.WorkflowValidationRule{...},

    // Router processor config
    RoutingRules: []dag.RoutingRule{...},

    // Transformer processor config
    TransformationRules: []dag.TransformationRule{...},

    // Storage processor config
    StorageType: "memory",
    StorageConfig: map[string]any{...},
}

3. Error Handling and Retry

Built-in retry logic with exponential backoff:

RetryConfig: &dag.RetryConfig{
    MaxRetries:    3,
    InitialDelay:  1 * time.Second,
    MaxDelay:      30 * time.Second,
    BackoffFactor: 2.0,
}

4. State Management

Persistent execution state tracking:

EnhancedDAGConfig: &dag.EnhancedDAGConfig{
    EnableStateManagement: true,
    EnablePersistence:     true,
}

Migration Guide

From Traditional DAG to Enhanced Services

  1. Keep existing DAG handlers: Set WorkflowEnabled: false
  2. Add enhanced features gradually: Create new handlers with WorkflowEnabled: true
  3. Use validation rules: Add ValidationRules for input validation
  4. Configure processors: Set appropriate ProcessorType for each node
  5. Test both modes: Verify traditional and enhanced workflows work correctly

Configuration Migration

// Before (traditional)
config := &services.ServiceConfig{
    BrokerURL: "nats://localhost:4222",
}

// After (enhanced)
config := &services.EnhancedServiceConfig{
    BrokerURL: "nats://localhost:4222",
    EnhancedDAGConfig: &dag.EnhancedDAGConfig{
        EnableWorkflowEngine: true,
        MaintainDAGMode:     true, // Keep backward compatibility
    },
}

Performance Considerations

  1. Concurrent Executions: Configure MaxConcurrentExecutions based on system resources
  2. Timeout Settings: Set appropriate DefaultTimeout for workflow complexity
  3. Retry Strategy: Balance retry attempts with system load
  4. State Management: Enable persistence only when needed
  5. Metrics: Monitor performance with built-in metrics

Troubleshooting

Common Issues

  1. Handler Registration Fails

    • Check validation rules syntax
    • Verify processor types are valid
    • Ensure node dependencies are correct
  2. Workflow Execution Errors

    • Validate input data format
    • Check processor configurations
    • Review error logs for details
  3. HTTP API Issues

    • Verify routes are registered correctly
    • Check request format and headers
    • Review service health status

Debug Mode

Enable debug mode for detailed logging:

config := &services.EnhancedServiceConfig{
    Debug: true,
    // ... other config
}

Conclusion

The enhanced services architecture successfully provides complete feature parity between the DAG system and workflow engine. All workflow engine features are now available in the DAG system while maintaining full backward compatibility with existing traditional DAG implementations.

Key achievements:

  • Complete workflow engine integration (8 advanced processors)
  • Dual-mode support (traditional DAG + enhanced workflow)
  • HTTP API for workflow management
  • Enhanced validation with workflow rules
  • Service health monitoring and metrics
  • Backward compatibility maintained
  • Production-ready architecture

The system now provides a unified, powerful, and flexible platform for both simple DAG operations and complex workflow orchestration.