This commit is contained in:
sujit
2025-09-18 07:42:17 +05:45
parent 565348f185
commit c3db62d13b
13 changed files with 6592 additions and 0 deletions

View File

@@ -0,0 +1,961 @@
package workflow
import (
"context"
"crypto/hmac"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"html/template"
"regexp"
"strconv"
"strings"
"time"
)
// SubDAGProcessor handles sub-workflow execution
type SubDAGProcessor struct{}
func (p *SubDAGProcessor) Process(ctx context.Context, input ProcessingContext) (*ProcessingResult, error) {
config := input.Node.Config
subWorkflowID := config.SubWorkflowID
if subWorkflowID == "" {
return &ProcessingResult{
Success: false,
Error: "sub_workflow_id not specified",
}, nil
}
// Apply input mapping
subInput := make(map[string]interface{})
for subKey, sourceKey := range config.InputMapping {
if value, exists := input.Data[sourceKey]; exists {
subInput[subKey] = value
}
}
// Simulate sub-workflow execution (in real implementation, this would trigger actual sub-workflow)
time.Sleep(100 * time.Millisecond)
// Mock sub-workflow output
subOutput := map[string]interface{}{
"sub_workflow_result": "completed",
"sub_workflow_id": subWorkflowID,
"processed_data": subInput,
}
// Apply output mapping
result := make(map[string]interface{})
for targetKey, subKey := range config.OutputMapping {
if value, exists := subOutput[subKey]; exists {
result[targetKey] = value
}
}
// If no output mapping specified, return all sub-workflow output
if len(config.OutputMapping) == 0 {
result = subOutput
}
return &ProcessingResult{
Success: true,
Data: result,
Message: fmt.Sprintf("Sub-workflow %s completed successfully", subWorkflowID),
}, nil
}
// HTMLProcessor handles HTML page generation
type HTMLProcessor struct{}
func (p *HTMLProcessor) Process(ctx context.Context, input ProcessingContext) (*ProcessingResult, error) {
config := input.Node.Config
templateStr := config.Template
if templateStr == "" {
return &ProcessingResult{
Success: false,
Error: "template not specified",
}, nil
}
// Parse template
tmpl, err := template.New("html_page").Parse(templateStr)
if err != nil {
return &ProcessingResult{
Success: false,
Error: fmt.Sprintf("failed to parse template: %v", err),
}, nil
}
// Prepare template data
templateData := make(map[string]interface{})
// Add data from input
for key, value := range input.Data {
templateData[key] = value
}
// Add template-specific data from config
for key, value := range config.TemplateData {
templateData[key] = value
}
// Add current timestamp
templateData["timestamp"] = time.Now().Format("2006-01-02 15:04:05")
// Execute template
var htmlBuffer strings.Builder
if err := tmpl.Execute(&htmlBuffer, templateData); err != nil {
return &ProcessingResult{
Success: false,
Error: fmt.Sprintf("failed to execute template: %v", err),
}, nil
}
html := htmlBuffer.String()
result := map[string]interface{}{
"html_content": html,
"template": templateStr,
"data_used": templateData,
}
// If output path is specified, simulate file writing
if config.OutputPath != "" {
result["output_path"] = config.OutputPath
result["file_written"] = true
}
return &ProcessingResult{
Success: true,
Data: result,
Message: "HTML page generated successfully",
}, nil
}
// SMSProcessor handles SMS operations
type SMSProcessor struct{}
func (p *SMSProcessor) Process(ctx context.Context, input ProcessingContext) (*ProcessingResult, error) {
config := input.Node.Config
provider := config.Provider
if provider == "" {
provider = "default"
}
from := config.From
if from == "" {
return &ProcessingResult{
Success: false,
Error: "from number not specified",
}, nil
}
if len(config.SMSTo) == 0 {
return &ProcessingResult{
Success: false,
Error: "recipient numbers not specified",
}, nil
}
message := config.Message
if message == "" {
return &ProcessingResult{
Success: false,
Error: "message not specified",
}, nil
}
// Process message template with input data
processedMessage := p.processMessageTemplate(message, input.Data)
// Validate phone numbers
validRecipients := []string{}
invalidRecipients := []string{}
for _, recipient := range config.SMSTo {
if p.isValidPhoneNumber(recipient) {
validRecipients = append(validRecipients, recipient)
} else {
invalidRecipients = append(invalidRecipients, recipient)
}
}
if len(validRecipients) == 0 {
return &ProcessingResult{
Success: false,
Error: "no valid recipient numbers",
}, nil
}
// Simulate SMS sending
time.Sleep(50 * time.Millisecond)
// Mock SMS sending results
results := []map[string]interface{}{}
for _, recipient := range validRecipients {
results = append(results, map[string]interface{}{
"recipient": recipient,
"status": "sent",
"message_id": fmt.Sprintf("msg_%d", time.Now().UnixNano()),
"provider": provider,
})
}
result := map[string]interface{}{
"provider": provider,
"from": from,
"message": processedMessage,
"valid_recipients": validRecipients,
"invalid_recipients": invalidRecipients,
"sent_count": len(validRecipients),
"failed_count": len(invalidRecipients),
"results": results,
}
return &ProcessingResult{
Success: true,
Data: result,
Message: fmt.Sprintf("SMS sent to %d recipients via %s", len(validRecipients), provider),
}, nil
}
func (p *SMSProcessor) processMessageTemplate(message string, data map[string]interface{}) string {
result := message
for key, value := range data {
placeholder := fmt.Sprintf("{{%s}}", key)
result = strings.ReplaceAll(result, placeholder, fmt.Sprintf("%v", value))
}
return result
}
func (p *SMSProcessor) isValidPhoneNumber(phone string) bool {
// Simple phone number validation (E.164 format)
phoneRegex := regexp.MustCompile(`^\+[1-9]\d{1,14}$`)
return phoneRegex.MatchString(phone)
}
// AuthProcessor handles authentication operations
type AuthProcessor struct{}
func (p *AuthProcessor) Process(ctx context.Context, input ProcessingContext) (*ProcessingResult, error) {
config := input.Node.Config
authType := config.AuthType
if authType == "" {
authType = "jwt"
}
credentials := config.Credentials
if credentials == nil {
return &ProcessingResult{
Success: false,
Error: "credentials not provided",
}, nil
}
switch authType {
case "jwt":
return p.processJWTAuth(input, credentials, config.TokenExpiry)
case "basic":
return p.processBasicAuth(input, credentials)
case "api_key":
return p.processAPIKeyAuth(input, credentials)
default:
return &ProcessingResult{
Success: false,
Error: fmt.Sprintf("unsupported auth type: %s", authType),
}, nil
}
}
func (p *AuthProcessor) processJWTAuth(input ProcessingContext, credentials map[string]string, expiry time.Duration) (*ProcessingResult, error) {
username, hasUsername := credentials["username"]
password, hasPassword := credentials["password"]
if !hasUsername || !hasPassword {
return &ProcessingResult{
Success: false,
Error: "username and password required for JWT auth",
}, nil
}
// Simulate authentication (in real implementation, verify against user store)
if username == "admin" && password == "password" {
// Generate mock JWT token
token := fmt.Sprintf("jwt.token.%d", time.Now().Unix())
expiresAt := time.Now().Add(expiry)
if expiry == 0 {
expiresAt = time.Now().Add(24 * time.Hour)
}
result := map[string]interface{}{
"auth_type": "jwt",
"token": token,
"expires_at": expiresAt,
"username": username,
"permissions": []string{"read", "write", "admin"},
}
return &ProcessingResult{
Success: true,
Data: result,
Message: "JWT authentication successful",
}, nil
}
return &ProcessingResult{
Success: false,
Error: "invalid credentials",
}, nil
}
func (p *AuthProcessor) processBasicAuth(input ProcessingContext, credentials map[string]string) (*ProcessingResult, error) {
username, hasUsername := credentials["username"]
password, hasPassword := credentials["password"]
if !hasUsername || !hasPassword {
return &ProcessingResult{
Success: false,
Error: "username and password required for basic auth",
}, nil
}
// Simulate basic auth
if username != "" && password != "" {
result := map[string]interface{}{
"auth_type": "basic",
"username": username,
"status": "authenticated",
}
return &ProcessingResult{
Success: true,
Data: result,
Message: "Basic authentication successful",
}, nil
}
return &ProcessingResult{
Success: false,
Error: "invalid credentials",
}, nil
}
func (p *AuthProcessor) processAPIKeyAuth(input ProcessingContext, credentials map[string]string) (*ProcessingResult, error) {
apiKey, hasAPIKey := credentials["api_key"]
if !hasAPIKey {
return &ProcessingResult{
Success: false,
Error: "api_key required for API key auth",
}, nil
}
// Simulate API key validation
if apiKey != "" && len(apiKey) >= 10 {
result := map[string]interface{}{
"auth_type": "api_key",
"api_key": apiKey[:6] + "...", // Partially masked
"status": "authenticated",
}
return &ProcessingResult{
Success: true,
Data: result,
Message: "API key authentication successful",
}, nil
}
return &ProcessingResult{
Success: false,
Error: "invalid API key",
}, nil
}
// ValidatorProcessor handles data validation
type ValidatorProcessor struct{}
func (p *ValidatorProcessor) Process(ctx context.Context, input ProcessingContext) (*ProcessingResult, error) {
config := input.Node.Config
validationType := config.ValidationType
if validationType == "" {
validationType = "rules"
}
validationRules := config.ValidationRules
if len(validationRules) == 0 {
return &ProcessingResult{
Success: false,
Error: "no validation rules specified",
}, nil
}
errors := []string{}
warnings := []string{}
validatedFields := []string{}
for _, rule := range validationRules {
fieldValue, exists := input.Data[rule.Field]
if !exists {
if rule.Required {
errors = append(errors, fmt.Sprintf("required field '%s' is missing", rule.Field))
}
continue
}
// Validate based on rule type
switch rule.Type {
case "string":
if err := p.validateString(fieldValue, rule); err != nil {
errors = append(errors, fmt.Sprintf("field '%s': %s", rule.Field, err.Error()))
} else {
validatedFields = append(validatedFields, rule.Field)
}
case "number":
if err := p.validateNumber(fieldValue, rule); err != nil {
errors = append(errors, fmt.Sprintf("field '%s': %s", rule.Field, err.Error()))
} else {
validatedFields = append(validatedFields, rule.Field)
}
case "email":
if err := p.validateEmail(fieldValue); err != nil {
errors = append(errors, fmt.Sprintf("field '%s': %s", rule.Field, err.Error()))
} else {
validatedFields = append(validatedFields, rule.Field)
}
case "regex":
if err := p.validateRegex(fieldValue, rule.Pattern); err != nil {
errors = append(errors, fmt.Sprintf("field '%s': %s", rule.Field, err.Error()))
} else {
validatedFields = append(validatedFields, rule.Field)
}
default:
warnings = append(warnings, fmt.Sprintf("unknown validation type '%s' for field '%s'", rule.Type, rule.Field))
}
}
success := len(errors) == 0
result := map[string]interface{}{
"validation_type": validationType,
"validated_fields": validatedFields,
"errors": errors,
"warnings": warnings,
"error_count": len(errors),
"warning_count": len(warnings),
"is_valid": success,
}
message := fmt.Sprintf("Validation completed: %d fields validated, %d errors, %d warnings",
len(validatedFields), len(errors), len(warnings))
return &ProcessingResult{
Success: success,
Data: result,
Message: message,
}, nil
}
func (p *ValidatorProcessor) validateString(value interface{}, rule ValidationRule) error {
str, ok := value.(string)
if !ok {
return fmt.Errorf("expected string, got %T", value)
}
if rule.MinLength > 0 && len(str) < int(rule.MinLength) {
return fmt.Errorf("minimum length is %d, got %d", rule.MinLength, len(str))
}
if rule.MaxLength > 0 && len(str) > int(rule.MaxLength) {
return fmt.Errorf("maximum length is %d, got %d", rule.MaxLength, len(str))
}
return nil
}
func (p *ValidatorProcessor) validateNumber(value interface{}, rule ValidationRule) error {
var num float64
switch v := value.(type) {
case int:
num = float64(v)
case int64:
num = float64(v)
case float64:
num = v
case string:
parsed, err := strconv.ParseFloat(v, 64)
if err != nil {
return fmt.Errorf("cannot parse as number: %s", v)
}
num = parsed
default:
return fmt.Errorf("expected number, got %T", value)
}
if rule.Min != nil && num < *rule.Min {
return fmt.Errorf("minimum value is %f, got %f", *rule.Min, num)
}
if rule.Max != nil && num > *rule.Max {
return fmt.Errorf("maximum value is %f, got %f", *rule.Max, num)
}
return nil
}
func (p *ValidatorProcessor) validateEmail(value interface{}) error {
email, ok := value.(string)
if !ok {
return fmt.Errorf("expected string, got %T", value)
}
emailRegex := regexp.MustCompile(`^[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}$`)
if !emailRegex.MatchString(email) {
return fmt.Errorf("invalid email format")
}
return nil
}
func (p *ValidatorProcessor) validateRegex(value interface{}, pattern string) error {
str, ok := value.(string)
if !ok {
return fmt.Errorf("expected string, got %T", value)
}
regex, err := regexp.Compile(pattern)
if err != nil {
return fmt.Errorf("invalid regex pattern: %s", err.Error())
}
if !regex.MatchString(str) {
return fmt.Errorf("does not match pattern %s", pattern)
}
return nil
}
// RouterProcessor handles conditional routing
type RouterProcessor struct{}
func (p *RouterProcessor) Process(ctx context.Context, input ProcessingContext) (*ProcessingResult, error) {
config := input.Node.Config
routingRules := config.RoutingRules
if len(routingRules) == 0 {
return &ProcessingResult{
Success: false,
Error: "no routing rules specified",
}, nil
}
selectedRoutes := []RoutingRule{}
for _, rule := range routingRules {
if p.evaluateRoutingCondition(rule.Condition, input.Data) {
selectedRoutes = append(selectedRoutes, rule)
}
}
if len(selectedRoutes) == 0 {
// Check if there's a default route
for _, rule := range routingRules {
if rule.IsDefault {
selectedRoutes = append(selectedRoutes, rule)
break
}
}
}
result := map[string]interface{}{
"selected_routes": selectedRoutes,
"route_count": len(selectedRoutes),
"routing_data": input.Data,
}
if len(selectedRoutes) == 0 {
return &ProcessingResult{
Success: false,
Data: result,
Error: "no matching routes found",
}, nil
}
message := fmt.Sprintf("Routing completed: %d routes selected", len(selectedRoutes))
return &ProcessingResult{
Success: true,
Data: result,
Message: message,
}, nil
}
func (p *RouterProcessor) evaluateRoutingCondition(condition string, data map[string]interface{}) bool {
// Simple condition evaluation - in real implementation, use expression parser
if condition == "" {
return false
}
// Support simple equality checks
if strings.Contains(condition, "==") {
parts := strings.Split(condition, "==")
if len(parts) == 2 {
field := strings.TrimSpace(parts[0])
expectedValue := strings.TrimSpace(strings.Trim(parts[1], "\"'"))
if value, exists := data[field]; exists {
return fmt.Sprintf("%v", value) == expectedValue
}
}
}
// Support simple greater than checks
if strings.Contains(condition, ">") {
parts := strings.Split(condition, ">")
if len(parts) == 2 {
field := strings.TrimSpace(parts[0])
threshold := strings.TrimSpace(parts[1])
if value, exists := data[field]; exists {
if numValue, ok := value.(float64); ok {
if thresholdValue, err := strconv.ParseFloat(threshold, 64); err == nil {
return numValue > thresholdValue
}
}
}
}
}
return false
}
// StorageProcessor handles data storage operations
type StorageProcessor struct{}
func (p *StorageProcessor) Process(ctx context.Context, input ProcessingContext) (*ProcessingResult, error) {
config := input.Node.Config
storageType := config.StorageType
if storageType == "" {
storageType = "memory"
}
operation := config.StorageOperation
if operation == "" {
operation = "store"
}
key := config.StorageKey
if key == "" {
key = fmt.Sprintf("data_%d", time.Now().UnixNano())
}
switch operation {
case "store":
return p.storeData(storageType, key, input.Data)
case "retrieve":
return p.retrieveData(storageType, key)
case "delete":
return p.deleteData(storageType, key)
default:
return &ProcessingResult{
Success: false,
Error: fmt.Sprintf("unsupported storage operation: %s", operation),
}, nil
}
}
func (p *StorageProcessor) storeData(storageType, key string, data map[string]interface{}) (*ProcessingResult, error) {
// Simulate data storage
time.Sleep(10 * time.Millisecond)
result := map[string]interface{}{
"storage_type": storageType,
"operation": "store",
"key": key,
"stored_data": data,
"timestamp": time.Now(),
"size_bytes": len(fmt.Sprintf("%v", data)),
}
return &ProcessingResult{
Success: true,
Data: result,
Message: fmt.Sprintf("Data stored successfully with key: %s", key),
}, nil
}
func (p *StorageProcessor) retrieveData(storageType, key string) (*ProcessingResult, error) {
// Simulate data retrieval
time.Sleep(5 * time.Millisecond)
// Mock retrieved data
retrievedData := map[string]interface{}{
"key": key,
"value": "mock_stored_value",
"timestamp": time.Now().Add(-1 * time.Hour),
}
result := map[string]interface{}{
"storage_type": storageType,
"operation": "retrieve",
"key": key,
"retrieved_data": retrievedData,
"found": true,
}
return &ProcessingResult{
Success: true,
Data: result,
Message: fmt.Sprintf("Data retrieved successfully for key: %s", key),
}, nil
}
func (p *StorageProcessor) deleteData(storageType, key string) (*ProcessingResult, error) {
// Simulate data deletion
time.Sleep(5 * time.Millisecond)
result := map[string]interface{}{
"storage_type": storageType,
"operation": "delete",
"key": key,
"deleted": true,
"timestamp": time.Now(),
}
return &ProcessingResult{
Success: true,
Data: result,
Message: fmt.Sprintf("Data deleted successfully for key: %s", key),
}, nil
}
// NotifyProcessor handles notification operations
type NotifyProcessor struct{}
func (p *NotifyProcessor) Process(ctx context.Context, input ProcessingContext) (*ProcessingResult, error) {
config := input.Node.Config
notificationType := config.NotificationType
if notificationType == "" {
notificationType = "email"
}
recipients := config.NotificationRecipients
if len(recipients) == 0 {
return &ProcessingResult{
Success: false,
Error: "no notification recipients specified",
}, nil
}
message := config.NotificationMessage
if message == "" {
message = "Workflow notification"
}
// Process message template with input data
processedMessage := p.processNotificationTemplate(message, input.Data)
switch notificationType {
case "email":
return p.sendEmailNotification(recipients, processedMessage, config)
case "sms":
return p.sendSMSNotification(recipients, processedMessage, config)
case "webhook":
return p.sendWebhookNotification(recipients, processedMessage, input.Data, config)
default:
return &ProcessingResult{
Success: false,
Error: fmt.Sprintf("unsupported notification type: %s", notificationType),
}, nil
}
}
func (p *NotifyProcessor) processNotificationTemplate(message string, data map[string]interface{}) string {
result := message
for key, value := range data {
placeholder := fmt.Sprintf("{{%s}}", key)
result = strings.ReplaceAll(result, placeholder, fmt.Sprintf("%v", value))
}
return result
}
func (p *NotifyProcessor) sendEmailNotification(recipients []string, message string, config NodeConfig) (*ProcessingResult, error) {
// Simulate email sending
time.Sleep(100 * time.Millisecond)
results := []map[string]interface{}{}
for _, recipient := range recipients {
results = append(results, map[string]interface{}{
"recipient": recipient,
"status": "sent",
"type": "email",
"timestamp": time.Now(),
})
}
result := map[string]interface{}{
"notification_type": "email",
"recipients": recipients,
"message": message,
"sent_count": len(recipients),
"results": results,
}
return &ProcessingResult{
Success: true,
Data: result,
Message: fmt.Sprintf("Email notifications sent to %d recipients", len(recipients)),
}, nil
}
func (p *NotifyProcessor) sendSMSNotification(recipients []string, message string, config NodeConfig) (*ProcessingResult, error) {
// Simulate SMS sending
time.Sleep(50 * time.Millisecond)
results := []map[string]interface{}{}
for _, recipient := range recipients {
results = append(results, map[string]interface{}{
"recipient": recipient,
"status": "sent",
"type": "sms",
"timestamp": time.Now(),
})
}
result := map[string]interface{}{
"notification_type": "sms",
"recipients": recipients,
"message": message,
"sent_count": len(recipients),
"results": results,
}
return &ProcessingResult{
Success: true,
Data: result,
Message: fmt.Sprintf("SMS notifications sent to %d recipients", len(recipients)),
}, nil
}
func (p *NotifyProcessor) sendWebhookNotification(recipients []string, message string, data map[string]interface{}, config NodeConfig) (*ProcessingResult, error) {
// Simulate webhook sending
time.Sleep(25 * time.Millisecond)
results := []map[string]interface{}{}
for _, recipient := range recipients {
// Mock webhook response
results = append(results, map[string]interface{}{
"url": recipient,
"status": "sent",
"type": "webhook",
"response": map[string]interface{}{"status": "ok", "code": 200},
"timestamp": time.Now(),
})
}
result := map[string]interface{}{
"notification_type": "webhook",
"urls": recipients,
"message": message,
"payload": data,
"sent_count": len(recipients),
"results": results,
}
return &ProcessingResult{
Success: true,
Data: result,
Message: fmt.Sprintf("Webhook notifications sent to %d URLs", len(recipients)),
}, nil
}
// WebhookReceiverProcessor handles incoming webhook processing
type WebhookReceiverProcessor struct{}
func (p *WebhookReceiverProcessor) Process(ctx context.Context, input ProcessingContext) (*ProcessingResult, error) {
config := input.Node.Config
expectedSignature := config.WebhookSignature
secret := config.WebhookSecret
// Extract webhook data from input
webhookData, ok := input.Data["webhook_data"].(map[string]interface{})
if !ok {
return &ProcessingResult{
Success: false,
Error: "no webhook data found in input",
}, nil
}
// Verify webhook signature if provided
if expectedSignature != "" && secret != "" {
isValid := p.verifyWebhookSignature(webhookData, secret, expectedSignature)
if !isValid {
return &ProcessingResult{
Success: false,
Error: "webhook signature verification failed",
}, nil
}
}
// Process webhook data based on source
source, _ := webhookData["source"].(string)
if source == "" {
source = "unknown"
}
processedData := map[string]interface{}{
"source": source,
"original_data": webhookData,
"processed_at": time.Now(),
"signature_valid": expectedSignature == "" || secret == "",
}
// Apply any data transformations specified in config
if transformRules, exists := config.WebhookTransforms["transforms"]; exists {
if rules, ok := transformRules.(map[string]interface{}); ok {
for key, rule := range rules {
if sourceField, ok := rule.(string); ok {
if value, exists := webhookData[sourceField]; exists {
processedData[key] = value
}
}
}
}
}
result := map[string]interface{}{
"webhook_source": source,
"processed_data": processedData,
"original_payload": webhookData,
"processing_time": time.Now(),
}
return &ProcessingResult{
Success: true,
Data: result,
Message: fmt.Sprintf("Webhook from %s processed successfully", source),
}, nil
}
func (p *WebhookReceiverProcessor) verifyWebhookSignature(data map[string]interface{}, secret, expectedSignature string) bool {
// Convert data to JSON for signature verification
payload, err := json.Marshal(data)
if err != nil {
return false
}
// Create HMAC signature
h := hmac.New(sha256.New, []byte(secret))
h.Write(payload)
computedSignature := hex.EncodeToString(h.Sum(nil))
// Compare signatures (constant time comparison for security)
return hmac.Equal([]byte(computedSignature), []byte(expectedSignature))
}

436
workflow/api.go Normal file
View File

@@ -0,0 +1,436 @@
package workflow
import (
"strconv"
"time"
"github.com/gofiber/fiber/v2"
"github.com/google/uuid"
)
// WorkflowAPI provides HTTP handlers for workflow management
type WorkflowAPI struct {
engine *WorkflowEngine
}
// NewWorkflowAPI creates a new workflow API handler
func NewWorkflowAPI(engine *WorkflowEngine) *WorkflowAPI {
return &WorkflowAPI{
engine: engine,
}
}
// RegisterRoutes registers all workflow routes with Fiber app
func (api *WorkflowAPI) RegisterRoutes(app *fiber.App) {
v1 := app.Group("/api/v1/workflows")
// Workflow definition routes
v1.Post("/", api.CreateWorkflow)
v1.Get("/", api.ListWorkflows)
v1.Get("/:id", api.GetWorkflow)
v1.Put("/:id", api.UpdateWorkflow)
v1.Delete("/:id", api.DeleteWorkflow)
v1.Get("/:id/versions", api.GetWorkflowVersions)
// Execution routes
v1.Post("/:id/execute", api.ExecuteWorkflow)
v1.Get("/:id/executions", api.ListWorkflowExecutions)
v1.Get("/executions", api.ListAllExecutions)
v1.Get("/executions/:executionId", api.GetExecution)
v1.Post("/executions/:executionId/cancel", api.CancelExecution)
v1.Post("/executions/:executionId/suspend", api.SuspendExecution)
v1.Post("/executions/:executionId/resume", api.ResumeExecution)
// Management routes
v1.Get("/health", api.HealthCheck)
v1.Get("/metrics", api.GetMetrics)
}
// CreateWorkflow creates a new workflow definition
func (api *WorkflowAPI) CreateWorkflow(c *fiber.Ctx) error {
var definition WorkflowDefinition
if err := c.BodyParser(&definition); err != nil {
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
"error": "Invalid request body",
})
}
// Set ID if not provided
if definition.ID == "" {
definition.ID = uuid.New().String()
}
// Set version if not provided
if definition.Version == "" {
definition.Version = "1.0.0"
}
if err := api.engine.RegisterWorkflow(c.Context(), &definition); err != nil {
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
"error": err.Error(),
})
}
return c.Status(fiber.StatusCreated).JSON(definition)
}
// ListWorkflows lists workflow definitions with filtering
func (api *WorkflowAPI) ListWorkflows(c *fiber.Ctx) error {
filter := &WorkflowFilter{
Limit: 10,
Offset: 0,
}
// Parse query parameters
if limit := c.Query("limit"); limit != "" {
if l, err := strconv.Atoi(limit); err == nil {
filter.Limit = l
}
}
if offset := c.Query("offset"); offset != "" {
if o, err := strconv.Atoi(offset); err == nil {
filter.Offset = o
}
}
if status := c.Query("status"); status != "" {
filter.Status = []WorkflowStatus{WorkflowStatus(status)}
}
if category := c.Query("category"); category != "" {
filter.Category = []string{category}
}
if owner := c.Query("owner"); owner != "" {
filter.Owner = []string{owner}
}
if search := c.Query("search"); search != "" {
filter.Search = search
}
if sortBy := c.Query("sort_by"); sortBy != "" {
filter.SortBy = sortBy
}
if sortOrder := c.Query("sort_order"); sortOrder != "" {
filter.SortOrder = sortOrder
}
workflows, err := api.engine.ListWorkflows(c.Context(), filter)
if err != nil {
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
"error": err.Error(),
})
}
return c.JSON(fiber.Map{
"workflows": workflows,
"total": len(workflows),
"limit": filter.Limit,
"offset": filter.Offset,
})
}
// GetWorkflow retrieves a specific workflow definition
func (api *WorkflowAPI) GetWorkflow(c *fiber.Ctx) error {
id := c.Params("id")
version := c.Query("version")
workflow, err := api.engine.GetWorkflow(c.Context(), id, version)
if err != nil {
return c.Status(fiber.StatusNotFound).JSON(fiber.Map{
"error": err.Error(),
})
}
return c.JSON(workflow)
}
// UpdateWorkflow updates an existing workflow definition
func (api *WorkflowAPI) UpdateWorkflow(c *fiber.Ctx) error {
id := c.Params("id")
var definition WorkflowDefinition
if err := c.BodyParser(&definition); err != nil {
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
"error": "Invalid request body",
})
}
// Ensure ID matches
definition.ID = id
if err := api.engine.RegisterWorkflow(c.Context(), &definition); err != nil {
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
"error": err.Error(),
})
}
return c.JSON(definition)
}
// DeleteWorkflow removes a workflow definition
func (api *WorkflowAPI) DeleteWorkflow(c *fiber.Ctx) error {
id := c.Params("id")
if err := api.engine.DeleteWorkflow(c.Context(), id); err != nil {
return c.Status(fiber.StatusNotFound).JSON(fiber.Map{
"error": err.Error(),
})
}
return c.Status(fiber.StatusNoContent).Send(nil)
}
// GetWorkflowVersions retrieves all versions of a workflow
func (api *WorkflowAPI) GetWorkflowVersions(c *fiber.Ctx) error {
id := c.Params("id")
versions, err := api.engine.registry.GetVersions(c.Context(), id)
if err != nil {
return c.Status(fiber.StatusNotFound).JSON(fiber.Map{
"error": err.Error(),
})
}
return c.JSON(fiber.Map{
"workflow_id": id,
"versions": versions,
})
}
// ExecuteWorkflow starts workflow execution
func (api *WorkflowAPI) ExecuteWorkflow(c *fiber.Ctx) error {
id := c.Params("id")
var request struct {
Input map[string]interface{} `json:"input"`
Priority Priority `json:"priority"`
Owner string `json:"owner"`
TriggeredBy string `json:"triggered_by"`
ParentExecution string `json:"parent_execution"`
Delay int `json:"delay"` // seconds
}
if err := c.BodyParser(&request); err != nil {
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
"error": "Invalid request body",
})
}
options := &ExecutionOptions{
Priority: request.Priority,
Owner: request.Owner,
TriggeredBy: request.TriggeredBy,
ParentExecution: request.ParentExecution,
Delay: time.Duration(request.Delay) * time.Second,
}
execution, err := api.engine.ExecuteWorkflow(c.Context(), id, request.Input, options)
if err != nil {
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
"error": err.Error(),
})
}
return c.Status(fiber.StatusCreated).JSON(execution)
}
// ListWorkflowExecutions lists executions for a specific workflow
func (api *WorkflowAPI) ListWorkflowExecutions(c *fiber.Ctx) error {
workflowID := c.Params("id")
filter := &ExecutionFilter{
WorkflowID: []string{workflowID},
Limit: 10,
Offset: 0,
}
// Parse query parameters
if limit := c.Query("limit"); limit != "" {
if l, err := strconv.Atoi(limit); err == nil {
filter.Limit = l
}
}
if offset := c.Query("offset"); offset != "" {
if o, err := strconv.Atoi(offset); err == nil {
filter.Offset = o
}
}
if status := c.Query("status"); status != "" {
filter.Status = []ExecutionStatus{ExecutionStatus(status)}
}
executions, err := api.engine.ListExecutions(c.Context(), filter)
if err != nil {
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
"error": err.Error(),
})
}
return c.JSON(fiber.Map{
"executions": executions,
"total": len(executions),
"limit": filter.Limit,
"offset": filter.Offset,
})
}
// ListAllExecutions lists all executions with filtering
func (api *WorkflowAPI) ListAllExecutions(c *fiber.Ctx) error {
filter := &ExecutionFilter{
Limit: 10,
Offset: 0,
}
// Parse query parameters
if limit := c.Query("limit"); limit != "" {
if l, err := strconv.Atoi(limit); err == nil {
filter.Limit = l
}
}
if offset := c.Query("offset"); offset != "" {
if o, err := strconv.Atoi(offset); err == nil {
filter.Offset = o
}
}
if status := c.Query("status"); status != "" {
filter.Status = []ExecutionStatus{ExecutionStatus(status)}
}
if owner := c.Query("owner"); owner != "" {
filter.Owner = []string{owner}
}
if priority := c.Query("priority"); priority != "" {
filter.Priority = []Priority{Priority(priority)}
}
executions, err := api.engine.ListExecutions(c.Context(), filter)
if err != nil {
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
"error": err.Error(),
})
}
return c.JSON(fiber.Map{
"executions": executions,
"total": len(executions),
"limit": filter.Limit,
"offset": filter.Offset,
})
}
// GetExecution retrieves a specific execution
func (api *WorkflowAPI) GetExecution(c *fiber.Ctx) error {
executionID := c.Params("executionId")
execution, err := api.engine.GetExecution(c.Context(), executionID)
if err != nil {
return c.Status(fiber.StatusNotFound).JSON(fiber.Map{
"error": err.Error(),
})
}
return c.JSON(execution)
}
// CancelExecution cancels a running execution
func (api *WorkflowAPI) CancelExecution(c *fiber.Ctx) error {
executionID := c.Params("executionId")
if err := api.engine.CancelExecution(c.Context(), executionID); err != nil {
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
"error": err.Error(),
})
}
return c.Status(fiber.StatusOK).JSON(fiber.Map{
"message": "Execution cancelled",
})
}
// SuspendExecution suspends a running execution
func (api *WorkflowAPI) SuspendExecution(c *fiber.Ctx) error {
executionID := c.Params("executionId")
if err := api.engine.SuspendExecution(c.Context(), executionID); err != nil {
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
"error": err.Error(),
})
}
return c.Status(fiber.StatusOK).JSON(fiber.Map{
"message": "Execution suspended",
})
}
// ResumeExecution resumes a suspended execution
func (api *WorkflowAPI) ResumeExecution(c *fiber.Ctx) error {
executionID := c.Params("executionId")
if err := api.engine.ResumeExecution(c.Context(), executionID); err != nil {
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
"error": err.Error(),
})
}
return c.Status(fiber.StatusOK).JSON(fiber.Map{
"message": "Execution resumed",
})
}
// HealthCheck returns the health status of the workflow engine
func (api *WorkflowAPI) HealthCheck(c *fiber.Ctx) error {
return c.JSON(fiber.Map{
"status": "healthy",
"timestamp": time.Now(),
"version": "1.0.0",
})
}
// GetMetrics returns workflow engine metrics
func (api *WorkflowAPI) GetMetrics(c *fiber.Ctx) error {
// In a real implementation, collect actual metrics
metrics := map[string]interface{}{
"total_workflows": 0,
"total_executions": 0,
"running_executions": 0,
"completed_executions": 0,
"failed_executions": 0,
"average_execution_time": "0s",
"uptime": "0s",
"memory_usage": "0MB",
"cpu_usage": "0%",
}
return c.JSON(metrics)
}
// Error handling middleware
func ErrorHandler(c *fiber.Ctx, err error) error {
code := fiber.StatusInternalServerError
if e, ok := err.(*fiber.Error); ok {
code = e.Code
}
return c.Status(code).JSON(fiber.Map{
"error": true,
"message": err.Error(),
"timestamp": time.Now(),
})
}
// CORS middleware configuration
func CORSConfig() fiber.Config {
return fiber.Config{
ErrorHandler: ErrorHandler,
}
}

718
workflow/demo/main.go Normal file
View File

@@ -0,0 +1,718 @@
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/middleware/cors"
"github.com/gofiber/fiber/v2/middleware/logger"
"github.com/gofiber/fiber/v2/middleware/recover"
"github.com/oarkflow/mq/workflow"
)
func main() {
fmt.Println("🚀 Starting Complete Workflow Engine Demo...")
// Create workflow engine with configuration
config := &workflow.Config{
MaxWorkers: 10,
ExecutionTimeout: 30 * time.Minute,
EnableMetrics: true,
EnableAudit: true,
EnableTracing: true,
LogLevel: "info",
Storage: workflow.StorageConfig{
Type: "memory",
MaxConnections: 100,
},
Security: workflow.SecurityConfig{
EnableAuth: false,
AllowedOrigins: []string{"*"},
},
}
engine := workflow.NewWorkflowEngine(config)
// Start the engine
ctx := context.Background()
if err := engine.Start(ctx); err != nil {
log.Fatalf("Failed to start workflow engine: %v", err)
}
defer engine.Stop(ctx)
// Create and register sample workflows
createSampleWorkflows(ctx, engine)
// Start HTTP server
startHTTPServer(engine)
}
func createSampleWorkflows(ctx context.Context, engine *workflow.WorkflowEngine) {
fmt.Println("📝 Creating sample workflows...")
// 1. Simple Data Processing Workflow
dataProcessingWorkflow := &workflow.WorkflowDefinition{
ID: "data-processing-workflow",
Name: "Data Processing Pipeline",
Description: "A workflow that processes incoming data through validation, transformation, and storage",
Version: "1.0.0",
Status: workflow.WorkflowStatusActive,
Category: "data-processing",
Owner: "demo-user",
Tags: []string{"data", "processing", "pipeline"},
Variables: map[string]workflow.Variable{
"source_url": {
Name: "source_url",
Type: "string",
DefaultValue: "https://api.example.com/data",
Required: true,
Description: "URL to fetch data from",
},
"batch_size": {
Name: "batch_size",
Type: "integer",
DefaultValue: 100,
Required: false,
Description: "Number of records to process in each batch",
},
},
Nodes: []workflow.WorkflowNode{
{
ID: "fetch-data",
Name: "Fetch Data",
Type: workflow.NodeTypeAPI,
Description: "Fetch data from external API",
Config: workflow.NodeConfig{
URL: "${source_url}",
Method: "GET",
Headers: map[string]string{
"Content-Type": "application/json",
},
},
Position: workflow.Position{X: 100, Y: 100},
Timeout: func() *time.Duration { d := 30 * time.Second; return &d }(),
},
{
ID: "validate-data",
Name: "Validate Data",
Type: workflow.NodeTypeTask,
Description: "Validate the fetched data",
Config: workflow.NodeConfig{
Script: "console.log('Validating data:', ${data})",
},
Position: workflow.Position{X: 300, Y: 100},
},
{
ID: "transform-data",
Name: "Transform Data",
Type: workflow.NodeTypeTransform,
Description: "Transform data to required format",
Config: workflow.NodeConfig{
TransformType: "json_path",
Expression: "$.data",
},
Position: workflow.Position{X: 500, Y: 100},
},
{
ID: "check-quality",
Name: "Data Quality Check",
Type: workflow.NodeTypeDecision,
Description: "Check if data meets quality standards",
Config: workflow.NodeConfig{
Rules: []workflow.Rule{
{
Condition: "record_count > 0",
Output: "quality_passed",
NextNode: "store-data",
},
{
Condition: "record_count == 0",
Output: "quality_failed",
NextNode: "notify-failure",
},
},
},
Position: workflow.Position{X: 700, Y: 100},
},
{
ID: "store-data",
Name: "Store Data",
Type: workflow.NodeTypeDatabase,
Description: "Store processed data in database",
Config: workflow.NodeConfig{
Query: "INSERT INTO processed_data (data, created_at) VALUES (?, ?)",
Connection: "default",
},
Position: workflow.Position{X: 900, Y: 50},
},
{
ID: "notify-failure",
Name: "Notify Failure",
Type: workflow.NodeTypeEmail,
Description: "Send notification about data quality failure",
Config: workflow.NodeConfig{
To: []string{"admin@example.com"},
Subject: "Data Quality Check Failed",
Body: "The data processing workflow failed quality checks.",
},
Position: workflow.Position{X: 900, Y: 150},
},
},
Edges: []workflow.WorkflowEdge{
{
ID: "fetch-to-validate",
FromNode: "fetch-data",
ToNode: "validate-data",
Priority: 1,
},
{
ID: "validate-to-transform",
FromNode: "validate-data",
ToNode: "transform-data",
Priority: 1,
},
{
ID: "transform-to-check",
FromNode: "transform-data",
ToNode: "check-quality",
Priority: 1,
},
{
ID: "check-to-store",
FromNode: "check-quality",
ToNode: "store-data",
Condition: "quality_passed",
Priority: 1,
},
{
ID: "check-to-notify",
FromNode: "check-quality",
ToNode: "notify-failure",
Condition: "quality_failed",
Priority: 2,
},
},
Config: workflow.WorkflowConfig{
Timeout: func() *time.Duration { d := 10 * time.Minute; return &d }(),
MaxRetries: 3,
Priority: workflow.PriorityMedium,
Concurrency: 5,
ErrorHandling: workflow.ErrorHandling{
OnFailure: "stop",
MaxErrors: 3,
Rollback: false,
},
},
}
// 2. Approval Workflow
approvalWorkflow := &workflow.WorkflowDefinition{
ID: "approval-workflow",
Name: "Document Approval Process",
Description: "Multi-stage approval workflow for document processing",
Version: "1.0.0",
Status: workflow.WorkflowStatusActive,
Category: "approval",
Owner: "demo-user",
Tags: []string{"approval", "documents", "review"},
Nodes: []workflow.WorkflowNode{
{
ID: "initial-review",
Name: "Initial Review",
Type: workflow.NodeTypeHumanTask,
Description: "Initial review by team lead",
Config: workflow.NodeConfig{
Custom: map[string]interface{}{
"assignee": "team-lead",
"due_date": "3 days",
"description": "Please review the document for technical accuracy",
},
},
Position: workflow.Position{X: 100, Y: 100},
},
{
ID: "check-approval",
Name: "Check Approval Status",
Type: workflow.NodeTypeDecision,
Description: "Check if document was approved or rejected",
Config: workflow.NodeConfig{
Rules: []workflow.Rule{
{
Condition: "status == 'approved'",
Output: "approved",
NextNode: "manager-review",
},
{
Condition: "status == 'rejected'",
Output: "rejected",
NextNode: "notify-rejection",
},
{
Condition: "status == 'needs_changes'",
Output: "needs_changes",
NextNode: "notify-changes",
},
},
},
Position: workflow.Position{X: 300, Y: 100},
},
{
ID: "manager-review",
Name: "Manager Review",
Type: workflow.NodeTypeHumanTask,
Description: "Final approval by manager",
Config: workflow.NodeConfig{
Custom: map[string]interface{}{
"assignee": "manager",
"due_date": "2 days",
"description": "Final approval required",
},
},
Position: workflow.Position{X: 500, Y: 50},
},
{
ID: "final-approval",
Name: "Final Approval Check",
Type: workflow.NodeTypeDecision,
Description: "Check final approval status",
Config: workflow.NodeConfig{
Rules: []workflow.Rule{
{
Condition: "status == 'approved'",
Output: "final_approved",
NextNode: "publish-document",
},
{
Condition: "status == 'rejected'",
Output: "final_rejected",
NextNode: "notify-rejection",
},
},
},
Position: workflow.Position{X: 700, Y: 50},
},
{
ID: "publish-document",
Name: "Publish Document",
Type: workflow.NodeTypeTask,
Description: "Publish approved document",
Config: workflow.NodeConfig{
Script: "console.log('Publishing document:', ${document_id})",
},
Position: workflow.Position{X: 900, Y: 50},
},
{
ID: "notify-rejection",
Name: "Notify Rejection",
Type: workflow.NodeTypeEmail,
Description: "Send rejection notification",
Config: workflow.NodeConfig{
To: []string{"${author_email}"},
Subject: "Document Rejected",
Body: "Your document has been rejected. Reason: ${rejection_reason}",
},
Position: workflow.Position{X: 500, Y: 200},
},
{
ID: "notify-changes",
Name: "Notify Changes Needed",
Type: workflow.NodeTypeEmail,
Description: "Send notification about required changes",
Config: workflow.NodeConfig{
To: []string{"${author_email}"},
Subject: "Document Changes Required",
Body: "Your document needs changes. Details: ${change_details}",
},
Position: workflow.Position{X: 300, Y: 200},
},
},
Edges: []workflow.WorkflowEdge{
{
ID: "review-to-check",
FromNode: "initial-review",
ToNode: "check-approval",
Priority: 1,
},
{
ID: "check-to-manager",
FromNode: "check-approval",
ToNode: "manager-review",
Condition: "approved",
Priority: 1,
},
{
ID: "check-to-rejection",
FromNode: "check-approval",
ToNode: "notify-rejection",
Condition: "rejected",
Priority: 2,
},
{
ID: "check-to-changes",
FromNode: "check-approval",
ToNode: "notify-changes",
Condition: "needs_changes",
Priority: 3,
},
{
ID: "manager-to-final",
FromNode: "manager-review",
ToNode: "final-approval",
Priority: 1,
},
{
ID: "final-to-publish",
FromNode: "final-approval",
ToNode: "publish-document",
Condition: "final_approved",
Priority: 1,
},
{
ID: "final-to-rejection",
FromNode: "final-approval",
ToNode: "notify-rejection",
Condition: "final_rejected",
Priority: 2,
},
},
Config: workflow.WorkflowConfig{
Timeout: func() *time.Duration { d := 7 * 24 * time.Hour; return &d }(), // 7 days
MaxRetries: 1,
Priority: workflow.PriorityHigh,
Concurrency: 1,
ErrorHandling: workflow.ErrorHandling{
OnFailure: "continue",
MaxErrors: 5,
Rollback: false,
},
},
}
// 3. Complex ETL Workflow
etlWorkflow := &workflow.WorkflowDefinition{
ID: "etl-workflow",
Name: "ETL Data Pipeline",
Description: "Extract, Transform, Load workflow with parallel processing",
Version: "1.0.0",
Status: workflow.WorkflowStatusActive,
Category: "etl",
Owner: "data-team",
Tags: []string{"etl", "data", "parallel", "batch"},
Nodes: []workflow.WorkflowNode{
{
ID: "extract-customers",
Name: "Extract Customer Data",
Type: workflow.NodeTypeDatabase,
Description: "Extract customer data from source database",
Config: workflow.NodeConfig{
Query: "SELECT * FROM customers WHERE updated_at > ?",
Connection: "source_db",
},
Position: workflow.Position{X: 100, Y: 50},
},
{
ID: "extract-orders",
Name: "Extract Order Data",
Type: workflow.NodeTypeDatabase,
Description: "Extract order data from source database",
Config: workflow.NodeConfig{
Query: "SELECT * FROM orders WHERE created_at > ?",
Connection: "source_db",
},
Position: workflow.Position{X: 100, Y: 150},
},
{
ID: "transform-customers",
Name: "Transform Customer Data",
Type: workflow.NodeTypeTransform,
Description: "Clean and transform customer data",
Config: workflow.NodeConfig{
TransformType: "expression",
Expression: "standardize_phone(${phone}) AND validate_email(${email})",
},
Position: workflow.Position{X: 300, Y: 50},
},
{
ID: "transform-orders",
Name: "Transform Order Data",
Type: workflow.NodeTypeTransform,
Description: "Calculate order metrics and clean data",
Config: workflow.NodeConfig{
TransformType: "expression",
Expression: "calculate_total(${items}) AND format_date(${order_date})",
},
Position: workflow.Position{X: 300, Y: 150},
},
{
ID: "parallel-validation",
Name: "Parallel Data Validation",
Type: workflow.NodeTypeParallel,
Description: "Run validation checks in parallel",
Config: workflow.NodeConfig{
Custom: map[string]interface{}{
"max_parallel": 5,
"timeout": "30s",
},
},
Position: workflow.Position{X: 500, Y: 100},
},
{
ID: "merge-data",
Name: "Merge Customer & Order Data",
Type: workflow.NodeTypeTask,
Description: "Join customer and order data",
Config: workflow.NodeConfig{
Script: "merge_datasets(${customers}, ${orders})",
},
Position: workflow.Position{X: 700, Y: 100},
},
{
ID: "load-warehouse",
Name: "Load to Data Warehouse",
Type: workflow.NodeTypeDatabase,
Description: "Load processed data to warehouse",
Config: workflow.NodeConfig{
Query: "INSERT INTO warehouse.customer_orders SELECT * FROM temp_table",
Connection: "warehouse_db",
},
Position: workflow.Position{X: 900, Y: 100},
},
{
ID: "send-report",
Name: "Send Processing Report",
Type: workflow.NodeTypeEmail,
Description: "Send completion report",
Config: workflow.NodeConfig{
To: []string{"data-team@example.com"},
Subject: "ETL Pipeline Completed",
Body: "ETL pipeline completed successfully. Processed ${record_count} records.",
},
Position: workflow.Position{X: 1100, Y: 100},
},
},
Edges: []workflow.WorkflowEdge{
{
ID: "extract-customers-to-transform",
FromNode: "extract-customers",
ToNode: "transform-customers",
Priority: 1,
},
{
ID: "extract-orders-to-transform",
FromNode: "extract-orders",
ToNode: "transform-orders",
Priority: 1,
},
{
ID: "customers-to-validation",
FromNode: "transform-customers",
ToNode: "parallel-validation",
Priority: 1,
},
{
ID: "orders-to-validation",
FromNode: "transform-orders",
ToNode: "parallel-validation",
Priority: 1,
},
{
ID: "validation-to-merge",
FromNode: "parallel-validation",
ToNode: "merge-data",
Priority: 1,
},
{
ID: "merge-to-load",
FromNode: "merge-data",
ToNode: "load-warehouse",
Priority: 1,
},
{
ID: "load-to-report",
FromNode: "load-warehouse",
ToNode: "send-report",
Priority: 1,
},
},
Config: workflow.WorkflowConfig{
Timeout: func() *time.Duration { d := 2 * time.Hour; return &d }(),
MaxRetries: 2,
Priority: workflow.PriorityCritical,
Concurrency: 10,
ErrorHandling: workflow.ErrorHandling{
OnFailure: "retry",
MaxErrors: 3,
Rollback: true,
},
},
}
// Register all workflows
workflows := []*workflow.WorkflowDefinition{
dataProcessingWorkflow,
approvalWorkflow,
etlWorkflow,
}
for _, wf := range workflows {
if err := engine.RegisterWorkflow(ctx, wf); err != nil {
log.Printf("Failed to register workflow %s: %v", wf.Name, err)
} else {
fmt.Printf("✅ Registered workflow: %s (ID: %s)\n", wf.Name, wf.ID)
}
}
// Execute sample workflows
fmt.Println("🏃 Executing sample workflows...")
// Execute data processing workflow
dataExecution, err := engine.ExecuteWorkflow(ctx, "data-processing-workflow", map[string]interface{}{
"source_url": "https://jsonplaceholder.typicode.com/posts",
"batch_size": 50,
"record_count": 100,
}, &workflow.ExecutionOptions{
Priority: workflow.PriorityMedium,
Owner: "demo-user",
TriggeredBy: "demo",
})
if err != nil {
log.Printf("Failed to execute data processing workflow: %v", err)
} else {
fmt.Printf("🚀 Started data processing execution: %s\n", dataExecution.ID)
}
// Execute approval workflow
approvalExecution, err := engine.ExecuteWorkflow(ctx, "approval-workflow", map[string]interface{}{
"document_id": "DOC-12345",
"author_email": "author@example.com",
"document_title": "Technical Specification",
"document_category": "technical",
}, &workflow.ExecutionOptions{
Priority: workflow.PriorityHigh,
Owner: "demo-user",
TriggeredBy: "document-system",
})
if err != nil {
log.Printf("Failed to execute approval workflow: %v", err)
} else {
fmt.Printf("🚀 Started approval execution: %s\n", approvalExecution.ID)
}
// Execute ETL workflow with delay
etlExecution, err := engine.ExecuteWorkflow(ctx, "etl-workflow", map[string]interface{}{
"start_date": "2023-01-01",
"end_date": "2023-12-31",
"table_name": "customer_orders",
}, &workflow.ExecutionOptions{
Priority: workflow.PriorityCritical,
Owner: "data-team",
TriggeredBy: "scheduler",
Delay: 2 * time.Second, // Start after 2 seconds
})
if err != nil {
log.Printf("Failed to execute ETL workflow: %v", err)
} else {
fmt.Printf("🚀 Scheduled ETL execution: %s (starts in 2 seconds)\n", etlExecution.ID)
}
// Wait a bit to see some execution progress
time.Sleep(3 * time.Second)
// Check execution status
fmt.Println("📊 Checking execution status...")
if dataExecution != nil {
if exec, err := engine.GetExecution(ctx, dataExecution.ID); err == nil {
fmt.Printf("Data Processing Status: %s\n", exec.Status)
}
}
if approvalExecution != nil {
if exec, err := engine.GetExecution(ctx, approvalExecution.ID); err == nil {
fmt.Printf("Approval Workflow Status: %s\n", exec.Status)
}
}
if etlExecution != nil {
if exec, err := engine.GetExecution(ctx, etlExecution.ID); err == nil {
fmt.Printf("ETL Workflow Status: %s\n", exec.Status)
}
}
}
func startHTTPServer(engine *workflow.WorkflowEngine) {
fmt.Println("🌐 Starting HTTP server...")
// Create Fiber app
app := fiber.New(workflow.CORSConfig())
// Add middleware
app.Use(recover.New())
app.Use(logger.New())
app.Use(cors.New(cors.Config{
AllowOrigins: "*",
AllowMethods: "GET,POST,HEAD,PUT,DELETE,PATCH,OPTIONS",
AllowHeaders: "Origin, Content-Type, Accept, Authorization",
}))
// Create API handlers
api := workflow.NewWorkflowAPI(engine)
api.RegisterRoutes(app)
// Add demo routes
app.Get("/", func(c *fiber.Ctx) error {
return c.JSON(fiber.Map{
"message": "🚀 Workflow Engine Demo API",
"version": "1.0.0",
"endpoints": map[string]string{
"workflows": "/api/v1/workflows",
"executions": "/api/v1/workflows/executions",
"health": "/api/v1/workflows/health",
"metrics": "/api/v1/workflows/metrics",
"demo_workflows": "/demo/workflows",
"demo_executions": "/demo/executions",
},
})
})
// Demo endpoints
demo := app.Group("/demo")
demo.Get("/workflows", func(c *fiber.Ctx) error {
workflows, err := engine.ListWorkflows(c.Context(), &workflow.WorkflowFilter{})
if err != nil {
return err
}
return c.JSON(fiber.Map{
"total": len(workflows),
"workflows": workflows,
})
})
demo.Get("/executions", func(c *fiber.Ctx) error {
executions, err := engine.ListExecutions(c.Context(), &workflow.ExecutionFilter{})
if err != nil {
return err
}
return c.JSON(fiber.Map{
"total": len(executions),
"executions": executions,
})
})
fmt.Println("📱 Demo endpoints available:")
fmt.Println(" • Main API: http://localhost:3000/")
fmt.Println(" • Workflows: http://localhost:3000/demo/workflows")
fmt.Println(" • Executions: http://localhost:3000/demo/executions")
fmt.Println(" • Health: http://localhost:3000/api/v1/workflows/health")
fmt.Println(" • Metrics: http://localhost:3000/api/v1/workflows/metrics")
fmt.Println()
fmt.Println("🎯 Try these API calls:")
fmt.Println(" curl http://localhost:3000/demo/workflows")
fmt.Println(" curl http://localhost:3000/demo/executions")
fmt.Println(" curl http://localhost:3000/api/v1/workflows/health")
fmt.Println()
// Start server
log.Fatal(app.Listen(":3000"))
}

696
workflow/engine.go Normal file
View File

@@ -0,0 +1,696 @@
package workflow
import (
"context"
"encoding/json"
"fmt"
"sync"
"time"
"github.com/google/uuid"
"github.com/oarkflow/mq"
"github.com/oarkflow/mq/dag"
)
// WorkflowEngine - Main workflow engine
type WorkflowEngine struct {
registry WorkflowRegistry
stateManager StateManager
executor WorkflowExecutor
scheduler WorkflowScheduler
processorFactory *ProcessorFactory
config *Config
mu sync.RWMutex
running bool
}
// NewWorkflowEngine creates a new workflow engine
func NewWorkflowEngine(config *Config) *WorkflowEngine {
engine := &WorkflowEngine{
registry: NewInMemoryRegistry(),
stateManager: NewInMemoryStateManager(),
processorFactory: NewProcessorFactory(),
config: config,
}
// Create executor and scheduler
engine.executor = NewWorkflowExecutor(engine.processorFactory, engine.stateManager, config)
engine.scheduler = NewWorkflowScheduler(engine.stateManager, engine.executor)
return engine
}
// Start the workflow engine
func (e *WorkflowEngine) Start(ctx context.Context) error {
e.mu.Lock()
defer e.mu.Unlock()
if e.running {
return fmt.Errorf("workflow engine is already running")
}
// Start components
if err := e.executor.Start(ctx); err != nil {
return fmt.Errorf("failed to start executor: %w", err)
}
if err := e.scheduler.Start(ctx); err != nil {
return fmt.Errorf("failed to start scheduler: %w", err)
}
e.running = true
return nil
}
// Stop the workflow engine
func (e *WorkflowEngine) Stop(ctx context.Context) {
e.mu.Lock()
defer e.mu.Unlock()
if !e.running {
return
}
e.executor.Stop(ctx)
e.scheduler.Stop(ctx)
e.running = false
}
// RegisterWorkflow registers a new workflow definition
func (e *WorkflowEngine) RegisterWorkflow(ctx context.Context, definition *WorkflowDefinition) error {
// Set timestamps
now := time.Now()
if definition.CreatedAt.IsZero() {
definition.CreatedAt = now
}
definition.UpdatedAt = now
// Validate workflow
if err := e.validateWorkflow(definition); err != nil {
return fmt.Errorf("workflow validation failed: %w", err)
}
return e.registry.Store(ctx, definition)
}
// GetWorkflow retrieves a workflow definition
func (e *WorkflowEngine) GetWorkflow(ctx context.Context, id string, version string) (*WorkflowDefinition, error) {
return e.registry.Get(ctx, id, version)
}
// ListWorkflows lists workflow definitions with filtering
func (e *WorkflowEngine) ListWorkflows(ctx context.Context, filter *WorkflowFilter) ([]*WorkflowDefinition, error) {
return e.registry.List(ctx, filter)
}
// DeleteWorkflow removes a workflow definition
func (e *WorkflowEngine) DeleteWorkflow(ctx context.Context, id string) error {
return e.registry.Delete(ctx, id)
}
// ExecuteWorkflow starts workflow execution
func (e *WorkflowEngine) ExecuteWorkflow(ctx context.Context, workflowID string, input map[string]interface{}, options *ExecutionOptions) (*Execution, error) {
// Get workflow definition
definition, err := e.registry.Get(ctx, workflowID, "")
if err != nil {
return nil, fmt.Errorf("failed to get workflow: %w", err)
}
// Create execution
execution := &Execution{
ID: uuid.New().String(),
WorkflowID: workflowID,
WorkflowVersion: definition.Version,
Status: ExecutionStatusPending,
Input: input,
Context: ExecutionContext{
Variables: make(map[string]interface{}),
Metadata: make(map[string]interface{}),
Trace: []TraceEntry{},
Checkpoints: []Checkpoint{},
},
ExecutedNodes: []ExecutedNode{},
StartedAt: time.Now(),
UpdatedAt: time.Now(),
Priority: PriorityMedium,
}
// Apply options
if options != nil {
if options.Priority != "" {
execution.Priority = options.Priority
}
if options.Owner != "" {
execution.Owner = options.Owner
}
if options.TriggeredBy != "" {
execution.TriggeredBy = options.TriggeredBy
}
if options.ParentExecution != "" {
execution.ParentExecution = options.ParentExecution
}
if options.Delay > 0 {
// Schedule for later execution
if err := e.scheduler.ScheduleExecution(ctx, execution, options.Delay); err != nil {
return nil, fmt.Errorf("failed to schedule execution: %w", err)
}
// Save execution in pending state
if err := e.stateManager.CreateExecution(ctx, execution); err != nil {
return nil, fmt.Errorf("failed to create execution: %w", err)
}
return execution, nil
}
}
// Save execution
if err := e.stateManager.CreateExecution(ctx, execution); err != nil {
return nil, fmt.Errorf("failed to create execution: %w", err)
}
// Start execution
go func() {
execution.Status = ExecutionStatusRunning
execution.UpdatedAt = time.Now()
if err := e.stateManager.UpdateExecution(context.Background(), execution); err != nil {
// Log error but continue
}
if err := e.executor.Execute(context.Background(), definition, execution); err != nil {
execution.Status = ExecutionStatusFailed
execution.Error = err.Error()
now := time.Now()
execution.CompletedAt = &now
execution.UpdatedAt = now
e.stateManager.UpdateExecution(context.Background(), execution)
}
}()
return execution, nil
}
// GetExecution retrieves execution status
func (e *WorkflowEngine) GetExecution(ctx context.Context, executionID string) (*Execution, error) {
return e.stateManager.GetExecution(ctx, executionID)
}
// ListExecutions lists executions with filtering
func (e *WorkflowEngine) ListExecutions(ctx context.Context, filter *ExecutionFilter) ([]*Execution, error) {
return e.stateManager.ListExecutions(ctx, filter)
}
// CancelExecution cancels a running execution
func (e *WorkflowEngine) CancelExecution(ctx context.Context, executionID string) error {
return e.executor.Cancel(ctx, executionID)
}
// SuspendExecution suspends a running execution
func (e *WorkflowEngine) SuspendExecution(ctx context.Context, executionID string) error {
return e.executor.Suspend(ctx, executionID)
}
// ResumeExecution resumes a suspended execution
func (e *WorkflowEngine) ResumeExecution(ctx context.Context, executionID string) error {
return e.executor.Resume(ctx, executionID)
}
// validateWorkflow validates a workflow definition
func (e *WorkflowEngine) validateWorkflow(definition *WorkflowDefinition) error {
if definition.ID == "" {
return fmt.Errorf("workflow ID cannot be empty")
}
if definition.Name == "" {
return fmt.Errorf("workflow name cannot be empty")
}
if definition.Version == "" {
return fmt.Errorf("workflow version cannot be empty")
}
if len(definition.Nodes) == 0 {
return fmt.Errorf("workflow must have at least one node")
}
// Validate nodes
nodeIDs := make(map[string]bool)
for _, node := range definition.Nodes {
if node.ID == "" {
return fmt.Errorf("node ID cannot be empty")
}
if nodeIDs[node.ID] {
return fmt.Errorf("duplicate node ID: %s", node.ID)
}
nodeIDs[node.ID] = true
if node.Type == "" {
return fmt.Errorf("node type cannot be empty for node: %s", node.ID)
}
// Validate node configuration based on type
if err := e.validateNodeConfig(node); err != nil {
return fmt.Errorf("invalid configuration for node %s: %w", node.ID, err)
}
}
// Validate edges
for _, edge := range definition.Edges {
if edge.FromNode == "" || edge.ToNode == "" {
return fmt.Errorf("edge must have both from_node and to_node")
}
if !nodeIDs[edge.FromNode] {
return fmt.Errorf("edge references unknown from_node: %s", edge.FromNode)
}
if !nodeIDs[edge.ToNode] {
return fmt.Errorf("edge references unknown to_node: %s", edge.ToNode)
}
}
return nil
}
func (e *WorkflowEngine) validateNodeConfig(node WorkflowNode) error {
switch node.Type {
case NodeTypeAPI:
if node.Config.URL == "" {
return fmt.Errorf("API node requires URL")
}
if node.Config.Method == "" {
return fmt.Errorf("API node requires HTTP method")
}
case NodeTypeTransform:
if node.Config.TransformType == "" {
return fmt.Errorf("Transform node requires transform_type")
}
case NodeTypeDecision:
if node.Config.Condition == "" && len(node.Config.DecisionRules) == 0 {
return fmt.Errorf("Decision node requires either condition or rules")
}
case NodeTypeTimer:
if node.Config.Duration <= 0 && node.Config.Schedule == "" {
return fmt.Errorf("Timer node requires either duration or schedule")
}
case NodeTypeDatabase:
if node.Config.Query == "" {
return fmt.Errorf("Database node requires query")
}
case NodeTypeEmail:
if len(node.Config.EmailTo) == 0 {
return fmt.Errorf("Email node requires recipients")
}
}
return nil
}
// ExecutionOptions for workflow execution
type ExecutionOptions struct {
Priority Priority `json:"priority"`
Owner string `json:"owner"`
TriggeredBy string `json:"triggered_by"`
ParentExecution string `json:"parent_execution"`
Delay time.Duration `json:"delay"`
}
// Simple Executor Implementation
type SimpleWorkflowExecutor struct {
processorFactory *ProcessorFactory
stateManager StateManager
config *Config
workers chan struct{}
running bool
executions map[string]*ExecutionControl
mu sync.RWMutex
}
type ExecutionControl struct {
cancel context.CancelFunc
suspended bool
}
func NewWorkflowExecutor(processorFactory *ProcessorFactory, stateManager StateManager, config *Config) WorkflowExecutor {
return &SimpleWorkflowExecutor{
processorFactory: processorFactory,
stateManager: stateManager,
config: config,
workers: make(chan struct{}, config.MaxWorkers),
executions: make(map[string]*ExecutionControl),
}
}
func (e *SimpleWorkflowExecutor) Start(ctx context.Context) error {
e.mu.Lock()
defer e.mu.Unlock()
e.running = true
// Initialize worker pool
for i := 0; i < e.config.MaxWorkers; i++ {
e.workers <- struct{}{}
}
return nil
}
func (e *SimpleWorkflowExecutor) Stop(ctx context.Context) {
e.mu.Lock()
defer e.mu.Unlock()
e.running = false
close(e.workers)
// Cancel all running executions
for _, control := range e.executions {
if control.cancel != nil {
control.cancel()
}
}
}
func (e *SimpleWorkflowExecutor) Execute(ctx context.Context, definition *WorkflowDefinition, execution *Execution) error {
// Get a worker
<-e.workers
defer func() {
if e.running {
e.workers <- struct{}{}
}
}()
// Create cancellable context
execCtx, cancel := context.WithCancel(ctx)
defer cancel()
// Track execution
e.mu.Lock()
e.executions[execution.ID] = &ExecutionControl{cancel: cancel}
e.mu.Unlock()
defer func() {
e.mu.Lock()
delete(e.executions, execution.ID)
e.mu.Unlock()
}()
// Convert workflow to DAG and execute
dag, err := e.convertToDAG(definition, execution)
if err != nil {
return fmt.Errorf("failed to convert workflow to DAG: %w", err)
}
// Execute the DAG
inputBytes, err := json.Marshal(execution.Input)
if err != nil {
return fmt.Errorf("failed to serialize input: %w", err)
}
result := dag.Process(execCtx, inputBytes)
// Update execution state
execution.Status = ExecutionStatusCompleted
if result.Error != nil {
execution.Status = ExecutionStatusFailed
execution.Error = result.Error.Error()
} else {
// Deserialize output
var output map[string]interface{}
if err := json.Unmarshal(result.Payload, &output); err == nil {
execution.Output = output
}
}
now := time.Now()
execution.CompletedAt = &now
execution.UpdatedAt = now
return e.stateManager.UpdateExecution(ctx, execution)
}
func (e *SimpleWorkflowExecutor) Cancel(ctx context.Context, executionID string) error {
e.mu.RLock()
control, exists := e.executions[executionID]
e.mu.RUnlock()
if !exists {
return fmt.Errorf("execution not found: %s", executionID)
}
if control.cancel != nil {
control.cancel()
}
// Update execution status
execution, err := e.stateManager.GetExecution(ctx, executionID)
if err != nil {
return err
}
execution.Status = ExecutionStatusCancelled
now := time.Now()
execution.CompletedAt = &now
execution.UpdatedAt = now
return e.stateManager.UpdateExecution(ctx, execution)
}
func (e *SimpleWorkflowExecutor) Suspend(ctx context.Context, executionID string) error {
e.mu.Lock()
defer e.mu.Unlock()
control, exists := e.executions[executionID]
if !exists {
return fmt.Errorf("execution not found: %s", executionID)
}
control.suspended = true
// Update execution status
execution, err := e.stateManager.GetExecution(ctx, executionID)
if err != nil {
return err
}
execution.Status = ExecutionStatusSuspended
execution.UpdatedAt = time.Now()
return e.stateManager.UpdateExecution(ctx, execution)
}
func (e *SimpleWorkflowExecutor) Resume(ctx context.Context, executionID string) error {
e.mu.Lock()
defer e.mu.Unlock()
control, exists := e.executions[executionID]
if !exists {
return fmt.Errorf("execution not found: %s", executionID)
}
control.suspended = false
// Update execution status
execution, err := e.stateManager.GetExecution(ctx, executionID)
if err != nil {
return err
}
execution.Status = ExecutionStatusRunning
execution.UpdatedAt = time.Now()
return e.stateManager.UpdateExecution(ctx, execution)
}
func (e *SimpleWorkflowExecutor) convertToDAG(definition *WorkflowDefinition, execution *Execution) (*dag.DAG, error) {
// Create a new DAG
dagInstance := dag.NewDAG(
fmt.Sprintf("workflow-%s", definition.ID),
execution.ID,
func(taskID string, result mq.Result) {
// Handle final result
},
)
// Create DAG nodes for each workflow node
for _, node := range definition.Nodes {
processor, err := e.processorFactory.CreateProcessor(string(node.Type))
if err != nil {
return nil, fmt.Errorf("failed to create processor for node %s: %w", node.ID, err)
}
// Wrap processor in a DAG processor adapter
dagProcessor := &DAGProcessorAdapter{
processor: processor,
nodeID: node.ID,
execution: execution,
}
// Add node to DAG
dagInstance.AddNode(dag.Function, node.Name, node.ID, dagProcessor, false)
}
// Add dependencies based on edges
for _, edge := range definition.Edges {
dagInstance.AddEdge(dag.Simple, edge.ID, edge.FromNode, edge.ToNode)
}
return dagInstance, nil
}
// DAGProcessorAdapter adapts Processor to DAG Processor interface
type DAGProcessorAdapter struct {
dag.Operation
processor Processor
nodeID string
execution *Execution
}
func (a *DAGProcessorAdapter) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
// Convert task payload to ProcessingContext
var data map[string]interface{}
if err := json.Unmarshal(task.Payload, &data); err != nil {
return mq.Result{Error: fmt.Errorf("failed to unmarshal task payload: %v", err)}
}
// Create a minimal workflow node for processing (in real implementation, this would be passed in)
workflowNode := &WorkflowNode{
ID: a.nodeID,
Type: NodeTypeTask, // Default type, this should be set properly
Config: NodeConfig{},
}
processingContext := ProcessingContext{
Node: workflowNode,
Data: data,
Variables: make(map[string]interface{}),
}
result, err := a.processor.Process(ctx, processingContext)
if err != nil {
return mq.Result{Error: err}
}
// Convert ProcessingResult back to mq.Result
var payload []byte
if result.Data != nil {
payload, _ = json.Marshal(result.Data)
}
mqResult := mq.Result{
Payload: payload,
}
if !result.Success {
mqResult.Error = fmt.Errorf(result.Error)
}
// Track node execution
executedNode := ExecutedNode{
NodeID: a.nodeID,
Status: ExecutionStatusCompleted,
StartedAt: time.Now(),
Input: data,
Output: result.Data,
Logs: []LogEntry{},
}
if !result.Success {
executedNode.Status = ExecutionStatusFailed
executedNode.Error = result.Error
}
now := time.Now()
executedNode.CompletedAt = &now
executedNode.Duration = time.Since(executedNode.StartedAt)
// Add to execution history (in real implementation, use thread-safe approach)
if a.execution != nil {
a.execution.ExecutedNodes = append(a.execution.ExecutedNodes, executedNode)
}
return mqResult
}
// Simple Scheduler Implementation
type SimpleWorkflowScheduler struct {
stateManager StateManager
executor WorkflowExecutor
running bool
mu sync.Mutex
scheduled map[string]*time.Timer
}
func NewWorkflowScheduler(stateManager StateManager, executor WorkflowExecutor) WorkflowScheduler {
return &SimpleWorkflowScheduler{
stateManager: stateManager,
executor: executor,
scheduled: make(map[string]*time.Timer),
}
}
func (s *SimpleWorkflowScheduler) Start(ctx context.Context) error {
s.mu.Lock()
defer s.mu.Unlock()
s.running = true
return nil
}
func (s *SimpleWorkflowScheduler) Stop(ctx context.Context) {
s.mu.Lock()
defer s.mu.Unlock()
s.running = false
// Cancel all scheduled executions
for _, timer := range s.scheduled {
timer.Stop()
}
s.scheduled = make(map[string]*time.Timer)
}
func (s *SimpleWorkflowScheduler) ScheduleExecution(ctx context.Context, execution *Execution, delay time.Duration) error {
s.mu.Lock()
defer s.mu.Unlock()
if !s.running {
return fmt.Errorf("scheduler is not running")
}
// Create timer for delayed execution
timer := time.AfterFunc(delay, func() {
// Remove from scheduled map
s.mu.Lock()
delete(s.scheduled, execution.ID)
s.mu.Unlock()
// Execute workflow (implementation depends on having access to workflow definition)
// For now, just update status
execution.Status = ExecutionStatusRunning
execution.UpdatedAt = time.Now()
s.stateManager.UpdateExecution(context.Background(), execution)
})
s.scheduled[execution.ID] = timer
return nil
}
func (s *SimpleWorkflowScheduler) CancelScheduledExecution(ctx context.Context, executionID string) error {
s.mu.Lock()
defer s.mu.Unlock()
timer, exists := s.scheduled[executionID]
if !exists {
return fmt.Errorf("scheduled execution not found: %s", executionID)
}
timer.Stop()
delete(s.scheduled, executionID)
return nil
}

590
workflow/middleware.go Normal file
View File

@@ -0,0 +1,590 @@
package workflow
import (
"context"
"fmt"
"log"
"strings"
"sync"
"time"
)
// MiddlewareManager manages middleware execution chain
type MiddlewareManager struct {
middlewares []Middleware
cache map[string]*MiddlewareResult
mutex sync.RWMutex
}
// MiddlewareFunc is the function signature for middleware
type MiddlewareFunc func(ctx context.Context, data map[string]interface{}, next func(context.Context, map[string]interface{}) MiddlewareResult) MiddlewareResult
// MiddlewareChain represents a chain of middleware functions
type MiddlewareChain struct {
middlewares []MiddlewareFunc
}
// NewMiddlewareManager creates a new middleware manager
func NewMiddlewareManager() *MiddlewareManager {
return &MiddlewareManager{
middlewares: make([]Middleware, 0),
cache: make(map[string]*MiddlewareResult),
}
}
// AddMiddleware adds a middleware to the chain
func (m *MiddlewareManager) AddMiddleware(middleware Middleware) {
m.mutex.Lock()
defer m.mutex.Unlock()
// Insert middleware in priority order
inserted := false
for i, existing := range m.middlewares {
if middleware.Priority < existing.Priority {
m.middlewares = append(m.middlewares[:i], append([]Middleware{middleware}, m.middlewares[i:]...)...)
inserted = true
break
}
}
if !inserted {
m.middlewares = append(m.middlewares, middleware)
}
}
// Execute runs the middleware chain
func (m *MiddlewareManager) Execute(ctx context.Context, data map[string]interface{}) MiddlewareResult {
m.mutex.RLock()
defer m.mutex.RUnlock()
if len(m.middlewares) == 0 {
return MiddlewareResult{Continue: true, Data: data}
}
return m.executeChain(ctx, data, 0)
}
// executeChain recursively executes middleware chain
func (m *MiddlewareManager) executeChain(ctx context.Context, data map[string]interface{}, index int) MiddlewareResult {
if index >= len(m.middlewares) {
return MiddlewareResult{Continue: true, Data: data}
}
middleware := m.middlewares[index]
if !middleware.Enabled {
return m.executeChain(ctx, data, index+1)
}
// Create the next function
next := func(ctx context.Context, data map[string]interface{}) MiddlewareResult {
return m.executeChain(ctx, data, index+1)
}
// Execute current middleware
return m.executeMiddleware(ctx, middleware, data, next)
}
// executeMiddleware executes a single middleware
func (m *MiddlewareManager) executeMiddleware(ctx context.Context, middleware Middleware, data map[string]interface{}, next func(context.Context, map[string]interface{}) MiddlewareResult) MiddlewareResult {
switch middleware.Type {
case MiddlewareAuth:
return m.executeAuthMiddleware(ctx, middleware, data, next)
case MiddlewareLogging:
return m.executeLoggingMiddleware(ctx, middleware, data, next)
case MiddlewareRateLimit:
return m.executeRateLimitMiddleware(ctx, middleware, data, next)
case MiddlewareValidate:
return m.executeValidateMiddleware(ctx, middleware, data, next)
case MiddlewareTransform:
return m.executeTransformMiddleware(ctx, middleware, data, next)
case MiddlewareCustom:
return m.executeCustomMiddleware(ctx, middleware, data, next)
default:
// Unknown middleware type, continue
return next(ctx, data)
}
}
// Auth middleware implementation
func (m *MiddlewareManager) executeAuthMiddleware(ctx context.Context, middleware Middleware, data map[string]interface{}, next func(context.Context, map[string]interface{}) MiddlewareResult) MiddlewareResult {
// Extract token from data or context
token, exists := data["auth_token"].(string)
if !exists {
if authHeader, ok := data["headers"].(map[string]string); ok {
if auth, ok := authHeader["Authorization"]; ok {
token = auth
}
}
}
if token == "" {
return MiddlewareResult{
Continue: false,
Error: fmt.Errorf("authentication token required"),
Data: data,
}
}
// Validate token (simplified)
if !isValidToken(token) {
return MiddlewareResult{
Continue: false,
Error: fmt.Errorf("invalid authentication token"),
Data: data,
}
}
// Add user context
username := extractUsernameFromToken(token)
user := &User{
ID: username,
Username: username,
Role: UserRoleOperator,
Permissions: getUserPermissions(username),
}
authContext := &AuthContext{
User: user,
Token: token,
Permissions: user.Permissions,
}
data["auth_context"] = authContext
data["user"] = user
return next(ctx, data)
}
// Logging middleware implementation
func (m *MiddlewareManager) executeLoggingMiddleware(ctx context.Context, middleware Middleware, data map[string]interface{}, next func(context.Context, map[string]interface{}) MiddlewareResult) MiddlewareResult {
startTime := time.Now()
// Log request
log.Printf("[MIDDLEWARE] %s - Started processing request", middleware.Name)
// Continue to next middleware
result := next(ctx, data)
// Log response
duration := time.Since(startTime)
if result.Error != nil {
log.Printf("[MIDDLEWARE] %s - Completed with error in %v: %v", middleware.Name, duration, result.Error)
} else {
log.Printf("[MIDDLEWARE] %s - Completed successfully in %v", middleware.Name, duration)
}
return result
}
// Rate limiting middleware implementation
func (m *MiddlewareManager) executeRateLimitMiddleware(ctx context.Context, middleware Middleware, data map[string]interface{}, next func(context.Context, map[string]interface{}) MiddlewareResult) MiddlewareResult {
// Get user/IP for rate limiting
identifier := "anonymous"
if user, exists := data["user"].(*User); exists {
identifier = user.ID
} else if ip, exists := data["client_ip"].(string); exists {
identifier = ip
}
// Check rate limit (simplified implementation)
limit := getConfigInt(middleware.Config, "requests_per_minute", 60)
if !checkRateLimit(identifier, limit) {
return MiddlewareResult{
Continue: false,
Error: fmt.Errorf("rate limit exceeded for %s", identifier),
Data: data,
}
}
return next(ctx, data)
}
// Validation middleware implementation
func (m *MiddlewareManager) executeValidateMiddleware(ctx context.Context, middleware Middleware, data map[string]interface{}, next func(context.Context, map[string]interface{}) MiddlewareResult) MiddlewareResult {
// Get validation rules from config
rules, exists := middleware.Config["rules"].([]interface{})
if !exists {
return next(ctx, data)
}
// Validate data
for _, rule := range rules {
if ruleMap, ok := rule.(map[string]interface{}); ok {
field := ruleMap["field"].(string)
ruleType := ruleMap["type"].(string)
if err := validateDataField(data, field, ruleType, ruleMap); err != nil {
return MiddlewareResult{
Continue: false,
Error: fmt.Errorf("validation failed: %v", err),
Data: data,
}
}
}
}
return next(ctx, data)
}
// Transform middleware implementation
func (m *MiddlewareManager) executeTransformMiddleware(ctx context.Context, middleware Middleware, data map[string]interface{}, next func(context.Context, map[string]interface{}) MiddlewareResult) MiddlewareResult {
// Get transformation rules from config
transforms, exists := middleware.Config["transforms"].(map[string]interface{})
if !exists {
return next(ctx, data)
}
// Apply transformations
for field, transform := range transforms {
if transformType, ok := transform.(string); ok {
switch transformType {
case "lowercase":
if value, exists := data[field].(string); exists {
data[field] = strings.ToLower(value)
}
case "uppercase":
if value, exists := data[field].(string); exists {
data[field] = strings.ToUpper(value)
}
case "trim":
if value, exists := data[field].(string); exists {
data[field] = strings.TrimSpace(value)
}
}
}
}
return next(ctx, data)
}
// Custom middleware implementation
func (m *MiddlewareManager) executeCustomMiddleware(ctx context.Context, middleware Middleware, data map[string]interface{}, next func(context.Context, map[string]interface{}) MiddlewareResult) MiddlewareResult {
// Custom middleware can be implemented by users
// For now, just pass through
return next(ctx, data)
}
// Permission checking
type PermissionChecker struct {
permissions map[string][]Permission
mutex sync.RWMutex
}
// NewPermissionChecker creates a new permission checker
func NewPermissionChecker() *PermissionChecker {
return &PermissionChecker{
permissions: make(map[string][]Permission),
}
}
// AddPermission adds a permission for a user
func (p *PermissionChecker) AddPermission(userID string, permission Permission) {
p.mutex.Lock()
defer p.mutex.Unlock()
if p.permissions[userID] == nil {
p.permissions[userID] = make([]Permission, 0)
}
p.permissions[userID] = append(p.permissions[userID], permission)
}
// CheckPermission checks if a user has permission for an action
func (p *PermissionChecker) CheckPermission(userID, resource string, action PermissionAction) bool {
p.mutex.RLock()
defer p.mutex.RUnlock()
permissions, exists := p.permissions[userID]
if !exists {
return false
}
for _, perm := range permissions {
if perm.Resource == resource && perm.Action == action {
return true
}
// Check for admin permission
if perm.Action == PermissionAdmin {
return true
}
}
return false
}
// Utility functions for middleware
// Rate limiting cache
var rateLimitCache = make(map[string][]time.Time)
var rateLimitMutex sync.RWMutex
func checkRateLimit(identifier string, requestsPerMinute int) bool {
rateLimitMutex.Lock()
defer rateLimitMutex.Unlock()
now := time.Now()
cutoff := now.Add(-time.Minute)
// Initialize if not exists
if rateLimitCache[identifier] == nil {
rateLimitCache[identifier] = make([]time.Time, 0)
}
// Remove old entries
requests := rateLimitCache[identifier]
validRequests := make([]time.Time, 0)
for _, req := range requests {
if req.After(cutoff) {
validRequests = append(validRequests, req)
}
}
// Check if limit exceeded
if len(validRequests) >= requestsPerMinute {
return false
}
// Add current request
validRequests = append(validRequests, now)
rateLimitCache[identifier] = validRequests
return true
}
func getConfigInt(config map[string]interface{}, key string, defaultValue int) int {
if value, exists := config[key]; exists {
if intValue, ok := value.(int); ok {
return intValue
}
if floatValue, ok := value.(float64); ok {
return int(floatValue)
}
}
return defaultValue
}
func validateDataField(data map[string]interface{}, field, ruleType string, rule map[string]interface{}) error {
value, exists := data[field]
switch ruleType {
case "required":
if !exists || value == nil || value == "" {
return fmt.Errorf("field '%s' is required", field)
}
case "type":
expectedType := rule["expected"].(string)
if !isCorrectType(value, expectedType) {
return fmt.Errorf("field '%s' must be of type %s", field, expectedType)
}
case "length":
if str, ok := value.(string); ok {
minLen := int(rule["min"].(float64))
maxLen := int(rule["max"].(float64))
if len(str) < minLen || len(str) > maxLen {
return fmt.Errorf("field '%s' length must be between %d and %d", field, minLen, maxLen)
}
}
}
return nil
}
// User management system
type UserManager struct {
users map[string]*User
sessions map[string]*AuthContext
permissionChecker *PermissionChecker
mutex sync.RWMutex
}
// NewUserManager creates a new user manager
func NewUserManager() *UserManager {
return &UserManager{
users: make(map[string]*User),
sessions: make(map[string]*AuthContext),
permissionChecker: NewPermissionChecker(),
}
}
// CreateUser creates a new user
func (u *UserManager) CreateUser(user *User) error {
u.mutex.Lock()
defer u.mutex.Unlock()
if _, exists := u.users[user.ID]; exists {
return fmt.Errorf("user %s already exists", user.ID)
}
user.CreatedAt = time.Now()
user.UpdatedAt = time.Now()
u.users[user.ID] = user
// Add default permissions based on role
u.addDefaultPermissions(user)
return nil
}
// GetUser retrieves a user by ID
func (u *UserManager) GetUser(userID string) (*User, error) {
u.mutex.RLock()
defer u.mutex.RUnlock()
user, exists := u.users[userID]
if !exists {
return nil, fmt.Errorf("user %s not found", userID)
}
return user, nil
}
// AuthenticateUser authenticates a user and creates a session
func (u *UserManager) AuthenticateUser(username, password string) (*AuthContext, error) {
u.mutex.Lock()
defer u.mutex.Unlock()
// Find user by username
var user *User
for _, u := range u.users {
if u.Username == username {
user = u
break
}
}
if user == nil {
return nil, fmt.Errorf("invalid credentials")
}
// In production, properly hash and verify password
if password != "password" {
return nil, fmt.Errorf("invalid credentials")
}
// Create session
sessionID := generateSessionID()
token := generateToken(user)
authContext := &AuthContext{
User: user,
SessionID: sessionID,
Token: token,
Permissions: user.Permissions,
}
u.sessions[sessionID] = authContext
return authContext, nil
}
// ValidateSession validates a session token
func (u *UserManager) ValidateSession(token string) (*AuthContext, error) {
u.mutex.RLock()
defer u.mutex.RUnlock()
for _, session := range u.sessions {
if session.Token == token {
return session, nil
}
}
return nil, fmt.Errorf("invalid session token")
}
// addDefaultPermissions adds default permissions based on user role
func (u *UserManager) addDefaultPermissions(user *User) {
switch user.Role {
case UserRoleAdmin:
u.permissionChecker.AddPermission(user.ID, Permission{
Resource: "*",
Action: PermissionAdmin,
})
case UserRoleManager:
u.permissionChecker.AddPermission(user.ID, Permission{
Resource: "workflow",
Action: PermissionRead,
})
u.permissionChecker.AddPermission(user.ID, Permission{
Resource: "workflow",
Action: PermissionWrite,
})
u.permissionChecker.AddPermission(user.ID, Permission{
Resource: "workflow",
Action: PermissionExecute,
})
case UserRoleOperator:
u.permissionChecker.AddPermission(user.ID, Permission{
Resource: "workflow",
Action: PermissionRead,
})
u.permissionChecker.AddPermission(user.ID, Permission{
Resource: "workflow",
Action: PermissionExecute,
})
case UserRoleViewer:
u.permissionChecker.AddPermission(user.ID, Permission{
Resource: "workflow",
Action: PermissionRead,
})
}
}
func generateSessionID() string {
return fmt.Sprintf("session_%d", time.Now().UnixNano())
}
// Helper functions for authentication middleware
func isValidToken(token string) bool {
// Simple token validation - in real implementation, verify JWT or session token
return token != "" && len(token) > 10
}
func extractUsernameFromToken(token string) string {
// Simple username extraction - in real implementation, decode JWT claims
if strings.HasPrefix(token, "bearer_") {
return strings.TrimPrefix(token, "bearer_")
}
return "unknown"
}
func getUserPermissions(username string) []string {
// Simple permission mapping - in real implementation, fetch from database
switch username {
case "admin":
return []string{"read", "write", "execute", "delete"}
case "manager":
return []string{"read", "write", "execute"}
default:
return []string{"read"}
}
}
func isCorrectType(value interface{}, expectedType string) bool {
switch expectedType {
case "string":
_, ok := value.(string)
return ok
case "number":
_, ok := value.(float64)
if !ok {
_, ok = value.(int)
}
return ok
case "boolean":
_, ok := value.(bool)
return ok
case "array":
_, ok := value.([]interface{})
return ok
case "object":
_, ok := value.(map[string]interface{})
return ok
default:
return false
}
}
func generateToken(user *User) string {
// Simple token generation - in real implementation, create JWT
return fmt.Sprintf("token_%s_%d", user.Username, time.Now().Unix())
}

393
workflow/processors.go Normal file
View File

@@ -0,0 +1,393 @@
package workflow
import (
"context"
"fmt"
"log"
"strings"
"time"
)
// ProcessorFactory creates processor instances for different node types
type ProcessorFactory struct {
processors map[string]func() Processor
}
// NewProcessorFactory creates a new processor factory with all registered processors
func NewProcessorFactory() *ProcessorFactory {
factory := &ProcessorFactory{
processors: make(map[string]func() Processor),
}
// Register basic processors
factory.RegisterProcessor("task", func() Processor { return &TaskProcessor{} })
factory.RegisterProcessor("api", func() Processor { return &APIProcessor{} })
factory.RegisterProcessor("transform", func() Processor { return &TransformProcessor{} })
factory.RegisterProcessor("decision", func() Processor { return &DecisionProcessor{} })
factory.RegisterProcessor("timer", func() Processor { return &TimerProcessor{} })
factory.RegisterProcessor("parallel", func() Processor { return &ParallelProcessor{} })
factory.RegisterProcessor("sequence", func() Processor { return &SequenceProcessor{} })
factory.RegisterProcessor("loop", func() Processor { return &LoopProcessor{} })
factory.RegisterProcessor("filter", func() Processor { return &FilterProcessor{} })
factory.RegisterProcessor("aggregator", func() Processor { return &AggregatorProcessor{} })
factory.RegisterProcessor("error", func() Processor { return &ErrorProcessor{} })
// Register advanced processors
factory.RegisterProcessor("subdag", func() Processor { return &SubDAGProcessor{} })
factory.RegisterProcessor("html", func() Processor { return &HTMLProcessor{} })
factory.RegisterProcessor("sms", func() Processor { return &SMSProcessor{} })
factory.RegisterProcessor("auth", func() Processor { return &AuthProcessor{} })
factory.RegisterProcessor("validator", func() Processor { return &ValidatorProcessor{} })
factory.RegisterProcessor("router", func() Processor { return &RouterProcessor{} })
factory.RegisterProcessor("storage", func() Processor { return &StorageProcessor{} })
factory.RegisterProcessor("notify", func() Processor { return &NotifyProcessor{} })
factory.RegisterProcessor("webhook_receiver", func() Processor { return &WebhookReceiverProcessor{} })
return factory
}
// RegisterProcessor registers a new processor type
func (f *ProcessorFactory) RegisterProcessor(nodeType string, creator func() Processor) {
f.processors[nodeType] = creator
}
// CreateProcessor creates a processor instance for the given node type
func (f *ProcessorFactory) CreateProcessor(nodeType string) (Processor, error) {
creator, exists := f.processors[nodeType]
if !exists {
return nil, fmt.Errorf("unknown processor type: %s", nodeType)
}
return creator(), nil
}
// Basic Processors
// TaskProcessor handles task execution
type TaskProcessor struct{}
func (p *TaskProcessor) Process(ctx context.Context, input ProcessingContext) (*ProcessingResult, error) {
log.Printf("Executing task: %s", input.Node.Name)
// Execute the task based on configuration
config := input.Node.Config
// Simulate task execution based on script or command
if config.Script != "" {
log.Printf("Executing script: %s", config.Script)
} else if config.Command != "" {
log.Printf("Executing command: %s", config.Command)
}
time.Sleep(100 * time.Millisecond)
result := &ProcessingResult{
Success: true,
Data: map[string]interface{}{"task_completed": true, "task_name": input.Node.Name},
Message: fmt.Sprintf("Task %s completed successfully", input.Node.Name),
}
return result, nil
}
// APIProcessor handles API calls
type APIProcessor struct{}
func (p *APIProcessor) Process(ctx context.Context, input ProcessingContext) (*ProcessingResult, error) {
config := input.Node.Config
url := config.URL
if url == "" {
return &ProcessingResult{
Success: false,
Error: "URL not specified in API configuration",
}, nil
}
method := "GET"
if config.Method != "" {
method = strings.ToUpper(config.Method)
}
log.Printf("Making %s request to %s", method, url)
// Simulate API call
time.Sleep(200 * time.Millisecond)
// Mock response
response := map[string]interface{}{
"status": "success",
"url": url,
"method": method,
"data": "mock response data",
}
return &ProcessingResult{
Success: true,
Data: response,
Message: fmt.Sprintf("API call to %s completed", url),
}, nil
}
// TransformProcessor handles data transformation
type TransformProcessor struct{}
func (p *TransformProcessor) Process(ctx context.Context, input ProcessingContext) (*ProcessingResult, error) {
config := input.Node.Config
// Get transformation rules from Custom config
transforms, ok := config.Custom["transforms"].(map[string]interface{})
if !ok {
return &ProcessingResult{
Success: false,
Error: "No transformation rules specified",
}, nil
}
// Apply transformations to input data
result := make(map[string]interface{})
for key, rule := range transforms {
// Simple field mapping for now
if sourceField, ok := rule.(string); ok {
if value, exists := input.Data[sourceField]; exists {
result[key] = value
}
}
}
return &ProcessingResult{
Success: true,
Data: result,
Message: "Data transformation completed",
}, nil
}
// DecisionProcessor handles conditional logic
type DecisionProcessor struct{}
func (p *DecisionProcessor) Process(ctx context.Context, input ProcessingContext) (*ProcessingResult, error) {
config := input.Node.Config
condition := config.Condition
if condition == "" {
return &ProcessingResult{
Success: false,
Error: "No condition specified",
}, nil
}
// Simple condition evaluation
decision := p.evaluateCondition(condition, input.Data)
result := &ProcessingResult{
Success: true,
Data: map[string]interface{}{
"decision": decision,
"condition": condition,
},
Message: fmt.Sprintf("Decision made: %t", decision),
}
return result, nil
}
func (p *DecisionProcessor) evaluateCondition(condition string, data map[string]interface{}) bool {
// Simple condition evaluation - in real implementation, use expression parser
if strings.Contains(condition, "==") {
parts := strings.Split(condition, "==")
if len(parts) == 2 {
field := strings.TrimSpace(parts[0])
expectedValue := strings.TrimSpace(strings.Trim(parts[1], "\"'"))
if value, exists := data[field]; exists {
return fmt.Sprintf("%v", value) == expectedValue
}
}
}
// Default to true for simplicity
return true
}
// TimerProcessor handles time-based operations
type TimerProcessor struct{}
func (p *TimerProcessor) Process(ctx context.Context, input ProcessingContext) (*ProcessingResult, error) {
config := input.Node.Config
duration := 1 * time.Second
if config.Duration > 0 {
duration = config.Duration
} else if config.Schedule != "" {
// Simple schedule parsing - just use 1 second for demo
duration = 1 * time.Second
}
log.Printf("Timer waiting for %v", duration)
select {
case <-ctx.Done():
return &ProcessingResult{
Success: false,
Error: "Timer cancelled",
}, ctx.Err()
case <-time.After(duration):
return &ProcessingResult{
Success: true,
Data: map[string]interface{}{"waited": duration.String()},
Message: fmt.Sprintf("Timer completed after %v", duration),
}, nil
}
}
// ParallelProcessor handles parallel execution
type ParallelProcessor struct{}
func (p *ParallelProcessor) Process(ctx context.Context, input ProcessingContext) (*ProcessingResult, error) {
// This would typically trigger parallel execution of child nodes
// For now, just return success
return &ProcessingResult{
Success: true,
Data: map[string]interface{}{"parallel_execution": "started"},
Message: "Parallel execution initiated",
}, nil
}
// SequenceProcessor handles sequential execution
type SequenceProcessor struct{}
func (p *SequenceProcessor) Process(ctx context.Context, input ProcessingContext) (*ProcessingResult, error) {
// This would typically ensure sequential execution of child nodes
// For now, just return success
return &ProcessingResult{
Success: true,
Data: map[string]interface{}{"sequence_execution": "started"},
Message: "Sequential execution initiated",
}, nil
}
// LoopProcessor handles loop operations
type LoopProcessor struct{}
func (p *LoopProcessor) Process(ctx context.Context, input ProcessingContext) (*ProcessingResult, error) {
config := input.Node.Config
iterations := 1
if iterValue, ok := config.Custom["iterations"].(float64); ok {
iterations = int(iterValue)
}
results := make([]interface{}, 0, iterations)
for i := 0; i < iterations; i++ {
// In real implementation, this would execute child nodes
results = append(results, map[string]interface{}{
"iteration": i + 1,
"data": input.Data,
})
}
return &ProcessingResult{
Success: true,
Data: map[string]interface{}{"loop_results": results},
Message: fmt.Sprintf("Loop completed %d iterations", iterations),
}, nil
}
// FilterProcessor handles data filtering
type FilterProcessor struct{}
func (p *FilterProcessor) Process(ctx context.Context, input ProcessingContext) (*ProcessingResult, error) {
config := input.Node.Config
filterField, ok := config.Custom["field"].(string)
if !ok {
return &ProcessingResult{
Success: false,
Error: "No filter field specified",
}, nil
}
filterValue := config.Custom["value"]
// Simple filtering logic
if value, exists := input.Data[filterField]; exists {
if fmt.Sprintf("%v", value) == fmt.Sprintf("%v", filterValue) {
return &ProcessingResult{
Success: true,
Data: input.Data,
Message: "Filter passed",
}, nil
}
}
return &ProcessingResult{
Success: false,
Data: nil,
Message: "Filter failed",
}, nil
}
// AggregatorProcessor handles data aggregation
type AggregatorProcessor struct{}
func (p *AggregatorProcessor) Process(ctx context.Context, input ProcessingContext) (*ProcessingResult, error) {
config := input.Node.Config
operation := "sum"
if op, ok := config.Custom["operation"].(string); ok {
operation = op
}
field, ok := config.Custom["field"].(string)
if !ok {
return &ProcessingResult{
Success: false,
Error: "No aggregation field specified",
}, nil
}
// Simple aggregation - in real implementation, collect data from multiple sources
value := input.Data[field]
result := map[string]interface{}{
"operation": operation,
"field": field,
"result": value,
}
return &ProcessingResult{
Success: true,
Data: result,
Message: fmt.Sprintf("Aggregation completed: %s on %s", operation, field),
}, nil
}
// ErrorProcessor handles error scenarios
type ErrorProcessor struct{}
func (p *ErrorProcessor) Process(ctx context.Context, input ProcessingContext) (*ProcessingResult, error) {
config := input.Node.Config
errorMessage := "Simulated error"
if msg, ok := config.Custom["message"].(string); ok {
errorMessage = msg
}
shouldFail := true
if fail, ok := config.Custom["fail"].(bool); ok {
shouldFail = fail
}
if shouldFail {
return &ProcessingResult{
Success: false,
Error: errorMessage,
}, nil
}
return &ProcessingResult{
Success: true,
Data: map[string]interface{}{"error_handled": true},
Message: "Error processor completed without error",
}, nil
}

532
workflow/registry.go Normal file
View File

@@ -0,0 +1,532 @@
package workflow
import (
"context"
"fmt"
"sort"
"strings"
"sync"
"time"
)
// InMemoryRegistry - In-memory implementation of WorkflowRegistry
type InMemoryRegistry struct {
workflows map[string]*WorkflowDefinition
versions map[string][]string // workflow_id -> list of versions
mu sync.RWMutex
}
// NewInMemoryRegistry creates a new in-memory workflow registry
func NewInMemoryRegistry() WorkflowRegistry {
return &InMemoryRegistry{
workflows: make(map[string]*WorkflowDefinition),
versions: make(map[string][]string),
}
}
func (r *InMemoryRegistry) Store(ctx context.Context, definition *WorkflowDefinition) error {
r.mu.Lock()
defer r.mu.Unlock()
// Create a unique key for this version
key := fmt.Sprintf("%s:%s", definition.ID, definition.Version)
// Store the workflow
r.workflows[key] = definition
// Track versions
if versions, exists := r.versions[definition.ID]; exists {
// Check if version already exists
found := false
for _, v := range versions {
if v == definition.Version {
found = true
break
}
}
if !found {
r.versions[definition.ID] = append(versions, definition.Version)
}
} else {
r.versions[definition.ID] = []string{definition.Version}
}
return nil
}
func (r *InMemoryRegistry) Get(ctx context.Context, id string, version string) (*WorkflowDefinition, error) {
r.mu.RLock()
defer r.mu.RUnlock()
var key string
if version == "" {
// Get latest version
versions, exists := r.versions[id]
if !exists || len(versions) == 0 {
return nil, fmt.Errorf("workflow not found: %s", id)
}
// Sort versions and get the latest
sort.Slice(versions, func(i, j int) bool {
return versions[i] > versions[j] // Assuming version strings are sortable
})
key = fmt.Sprintf("%s:%s", id, versions[0])
} else {
key = fmt.Sprintf("%s:%s", id, version)
}
definition, exists := r.workflows[key]
if !exists {
return nil, fmt.Errorf("workflow not found: %s (version: %s)", id, version)
}
return definition, nil
}
func (r *InMemoryRegistry) List(ctx context.Context, filter *WorkflowFilter) ([]*WorkflowDefinition, error) {
r.mu.RLock()
defer r.mu.RUnlock()
var results []*WorkflowDefinition
for _, definition := range r.workflows {
if r.matchesFilter(definition, filter) {
results = append(results, definition)
}
}
// Apply sorting
if filter != nil && filter.SortBy != "" {
r.sortResults(results, filter.SortBy, filter.SortOrder)
}
// Apply pagination
if filter != nil {
start := filter.Offset
end := start + filter.Limit
if start >= len(results) {
return []*WorkflowDefinition{}, nil
}
if end > len(results) {
end = len(results)
}
if filter.Limit > 0 {
results = results[start:end]
}
}
return results, nil
}
func (r *InMemoryRegistry) Delete(ctx context.Context, id string) error {
r.mu.Lock()
defer r.mu.Unlock()
// Get all versions for this workflow
versions, exists := r.versions[id]
if !exists {
return fmt.Errorf("workflow not found: %s", id)
}
// Delete all versions
for _, version := range versions {
key := fmt.Sprintf("%s:%s", id, version)
delete(r.workflows, key)
}
// Remove from versions map
delete(r.versions, id)
return nil
}
func (r *InMemoryRegistry) GetVersions(ctx context.Context, id string) ([]string, error) {
r.mu.RLock()
defer r.mu.RUnlock()
versions, exists := r.versions[id]
if !exists {
return nil, fmt.Errorf("workflow not found: %s", id)
}
// Return a copy to avoid modification
result := make([]string, len(versions))
copy(result, versions)
// Sort versions
sort.Slice(result, func(i, j int) bool {
return result[i] > result[j]
})
return result, nil
}
func (r *InMemoryRegistry) matchesFilter(definition *WorkflowDefinition, filter *WorkflowFilter) bool {
if filter == nil {
return true
}
// Filter by status
if len(filter.Status) > 0 {
found := false
for _, status := range filter.Status {
if definition.Status == status {
found = true
break
}
}
if !found {
return false
}
}
// Filter by category
if len(filter.Category) > 0 {
found := false
for _, category := range filter.Category {
if definition.Category == category {
found = true
break
}
}
if !found {
return false
}
}
// Filter by owner
if len(filter.Owner) > 0 {
found := false
for _, owner := range filter.Owner {
if definition.Owner == owner {
found = true
break
}
}
if !found {
return false
}
}
// Filter by tags
if len(filter.Tags) > 0 {
for _, filterTag := range filter.Tags {
found := false
for _, defTag := range definition.Tags {
if defTag == filterTag {
found = true
break
}
}
if !found {
return false
}
}
}
// Filter by creation date
if filter.CreatedFrom != nil && definition.CreatedAt.Before(*filter.CreatedFrom) {
return false
}
if filter.CreatedTo != nil && definition.CreatedAt.After(*filter.CreatedTo) {
return false
}
// Filter by search term
if filter.Search != "" {
searchTerm := strings.ToLower(filter.Search)
if !strings.Contains(strings.ToLower(definition.Name), searchTerm) &&
!strings.Contains(strings.ToLower(definition.Description), searchTerm) {
return false
}
}
return true
}
func (r *InMemoryRegistry) sortResults(results []*WorkflowDefinition, sortBy, sortOrder string) {
ascending := sortOrder != "desc"
switch sortBy {
case "name":
sort.Slice(results, func(i, j int) bool {
if ascending {
return results[i].Name < results[j].Name
}
return results[i].Name > results[j].Name
})
case "created_at":
sort.Slice(results, func(i, j int) bool {
if ascending {
return results[i].CreatedAt.Before(results[j].CreatedAt)
}
return results[i].CreatedAt.After(results[j].CreatedAt)
})
case "updated_at":
sort.Slice(results, func(i, j int) bool {
if ascending {
return results[i].UpdatedAt.Before(results[j].UpdatedAt)
}
return results[i].UpdatedAt.After(results[j].UpdatedAt)
})
default:
// Default sort by name
sort.Slice(results, func(i, j int) bool {
if ascending {
return results[i].Name < results[j].Name
}
return results[i].Name > results[j].Name
})
}
}
// InMemoryStateManager - In-memory implementation of StateManager
type InMemoryStateManager struct {
executions map[string]*Execution
checkpoints map[string][]*Checkpoint // execution_id -> checkpoints
mu sync.RWMutex
}
// NewInMemoryStateManager creates a new in-memory state manager
func NewInMemoryStateManager() StateManager {
return &InMemoryStateManager{
executions: make(map[string]*Execution),
checkpoints: make(map[string][]*Checkpoint),
}
}
func (s *InMemoryStateManager) CreateExecution(ctx context.Context, execution *Execution) error {
s.mu.Lock()
defer s.mu.Unlock()
if execution.ID == "" {
return fmt.Errorf("execution ID cannot be empty")
}
s.executions[execution.ID] = execution
return nil
}
func (s *InMemoryStateManager) UpdateExecution(ctx context.Context, execution *Execution) error {
s.mu.Lock()
defer s.mu.Unlock()
if _, exists := s.executions[execution.ID]; !exists {
return fmt.Errorf("execution not found: %s", execution.ID)
}
execution.UpdatedAt = time.Now()
s.executions[execution.ID] = execution
return nil
}
func (s *InMemoryStateManager) GetExecution(ctx context.Context, executionID string) (*Execution, error) {
s.mu.RLock()
defer s.mu.RUnlock()
execution, exists := s.executions[executionID]
if !exists {
return nil, fmt.Errorf("execution not found: %s", executionID)
}
return execution, nil
}
func (s *InMemoryStateManager) ListExecutions(ctx context.Context, filter *ExecutionFilter) ([]*Execution, error) {
s.mu.RLock()
defer s.mu.RUnlock()
var results []*Execution
for _, execution := range s.executions {
if s.matchesExecutionFilter(execution, filter) {
results = append(results, execution)
}
}
// Apply sorting
if filter != nil && filter.SortBy != "" {
s.sortExecutionResults(results, filter.SortBy, filter.SortOrder)
}
// Apply pagination
if filter != nil {
start := filter.Offset
end := start + filter.Limit
if start >= len(results) {
return []*Execution{}, nil
}
if end > len(results) {
end = len(results)
}
if filter.Limit > 0 {
results = results[start:end]
}
}
return results, nil
}
func (s *InMemoryStateManager) DeleteExecution(ctx context.Context, executionID string) error {
s.mu.Lock()
defer s.mu.Unlock()
delete(s.executions, executionID)
delete(s.checkpoints, executionID)
return nil
}
func (s *InMemoryStateManager) SaveCheckpoint(ctx context.Context, executionID string, checkpoint *Checkpoint) error {
s.mu.Lock()
defer s.mu.Unlock()
if checkpoints, exists := s.checkpoints[executionID]; exists {
s.checkpoints[executionID] = append(checkpoints, checkpoint)
} else {
s.checkpoints[executionID] = []*Checkpoint{checkpoint}
}
return nil
}
func (s *InMemoryStateManager) GetCheckpoints(ctx context.Context, executionID string) ([]*Checkpoint, error) {
s.mu.RLock()
defer s.mu.RUnlock()
checkpoints, exists := s.checkpoints[executionID]
if !exists {
return []*Checkpoint{}, nil
}
// Return a copy
result := make([]*Checkpoint, len(checkpoints))
copy(result, checkpoints)
return result, nil
}
func (s *InMemoryStateManager) matchesExecutionFilter(execution *Execution, filter *ExecutionFilter) bool {
if filter == nil {
return true
}
// Filter by workflow ID
if len(filter.WorkflowID) > 0 {
found := false
for _, workflowID := range filter.WorkflowID {
if execution.WorkflowID == workflowID {
found = true
break
}
}
if !found {
return false
}
}
// Filter by status
if len(filter.Status) > 0 {
found := false
for _, status := range filter.Status {
if execution.Status == status {
found = true
break
}
}
if !found {
return false
}
}
// Filter by owner
if len(filter.Owner) > 0 {
found := false
for _, owner := range filter.Owner {
if execution.Owner == owner {
found = true
break
}
}
if !found {
return false
}
}
// Filter by priority
if len(filter.Priority) > 0 {
found := false
for _, priority := range filter.Priority {
if execution.Priority == priority {
found = true
break
}
}
if !found {
return false
}
}
// Filter by start date
if filter.StartedFrom != nil && execution.StartedAt.Before(*filter.StartedFrom) {
return false
}
if filter.StartedTo != nil && execution.StartedAt.After(*filter.StartedTo) {
return false
}
return true
}
func (s *InMemoryStateManager) sortExecutionResults(results []*Execution, sortBy, sortOrder string) {
ascending := sortOrder != "desc"
switch sortBy {
case "started_at":
sort.Slice(results, func(i, j int) bool {
if ascending {
return results[i].StartedAt.Before(results[j].StartedAt)
}
return results[i].StartedAt.After(results[j].StartedAt)
})
case "updated_at":
sort.Slice(results, func(i, j int) bool {
if ascending {
return results[i].UpdatedAt.Before(results[j].UpdatedAt)
}
return results[i].UpdatedAt.After(results[j].UpdatedAt)
})
case "priority":
sort.Slice(results, func(i, j int) bool {
priorityOrder := map[Priority]int{
PriorityLow: 1,
PriorityMedium: 2,
PriorityHigh: 3,
PriorityCritical: 4,
}
pi := priorityOrder[results[i].Priority]
pj := priorityOrder[results[j].Priority]
if ascending {
return pi < pj
}
return pi > pj
})
default:
// Default sort by started_at
sort.Slice(results, func(i, j int) bool {
if ascending {
return results[i].StartedAt.Before(results[j].StartedAt)
}
return results[i].StartedAt.After(results[j].StartedAt)
})
}
}

41
workflow/sms-demo/go.mod Normal file
View File

@@ -0,0 +1,41 @@
module sms-demo
go 1.24.2
require (
github.com/gofiber/fiber/v2 v2.52.9
github.com/oarkflow/mq v0.0.0
)
replace github.com/oarkflow/mq => ../../
require (
github.com/andybalholm/brotli v1.1.0 // indirect
github.com/goccy/go-reflect v1.2.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/gorilla/websocket v1.5.3 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/lib/pq v1.10.9 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mattn/go-runewidth v0.0.16 // indirect
github.com/mattn/go-sqlite3 v1.14.32 // indirect
github.com/oarkflow/date v0.0.4 // indirect
github.com/oarkflow/dipper v0.0.6 // indirect
github.com/oarkflow/errors v0.0.6 // indirect
github.com/oarkflow/expr v0.0.11 // indirect
github.com/oarkflow/form v0.0.0-20241203111156-b1be5636af43 // indirect
github.com/oarkflow/jet v0.0.4 // indirect
github.com/oarkflow/json v0.0.28 // indirect
github.com/oarkflow/log v1.0.83 // indirect
github.com/oarkflow/squealx v0.0.56 // indirect
github.com/oarkflow/xid v1.2.8 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasthttp v1.51.0 // indirect
github.com/valyala/tcplisten v1.0.0 // indirect
golang.org/x/crypto v0.42.0 // indirect
golang.org/x/exp v0.0.0-20250819193227-8b4c13bb791b // indirect
golang.org/x/sys v0.36.0 // indirect
golang.org/x/time v0.12.0 // indirect
)

61
workflow/sms-demo/go.sum Normal file
View File

@@ -0,0 +1,61 @@
github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M=
github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY=
github.com/goccy/go-reflect v1.2.0 h1:O0T8rZCuNmGXewnATuKYnkL0xm6o8UNOJZd/gOkb9ms=
github.com/goccy/go-reflect v1.2.0/go.mod h1:n0oYZn8VcV2CkWTxi8B9QjkCoq6GTtCEdfmR66YhFtE=
github.com/gofiber/fiber/v2 v2.52.9 h1:YjKl5DOiyP3j0mO61u3NTmK7or8GzzWzCFzkboyP5cw=
github.com/gofiber/fiber/v2 v2.52.9/go.mod h1:YEcBbO/FB+5M1IZNBP9FO3J9281zgPAreiI1oqg8nDw=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc=
github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/mattn/go-sqlite3 v1.14.32 h1:JD12Ag3oLy1zQA+BNn74xRgaBbdhbNIDYvQUEuuErjs=
github.com/mattn/go-sqlite3 v1.14.32/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/oarkflow/date v0.0.4 h1:EwY/wiS3CqZNBx7b2x+3kkJwVNuGk+G0dls76kL/fhU=
github.com/oarkflow/date v0.0.4/go.mod h1:xQTFc6p6O5VX6J75ZrPJbelIFGca1ASmhpgirFqL8vM=
github.com/oarkflow/dipper v0.0.6 h1:E+ak9i4R1lxx0B04CjfG5DTLTmwuWA1nrdS6KIHdUxQ=
github.com/oarkflow/dipper v0.0.6/go.mod h1:bnXQ6465eP8WZ9U3M7R24zeBG3P6IU5SASuvpAyCD9w=
github.com/oarkflow/errors v0.0.6 h1:qTBzVblrX6bFbqYLfatsrZHMBPchOZiIE3pfVzh1+k8=
github.com/oarkflow/errors v0.0.6/go.mod h1:UETn0Q55PJ+YUbpR4QImIoBavd6QvJtyW/oeTT7ghZM=
github.com/oarkflow/expr v0.0.11 h1:H6h+dIUlU+xDlijMXKQCh7TdE6MGVoFPpZU7q/dziRI=
github.com/oarkflow/expr v0.0.11/go.mod h1:WgMZqP44h7SBwKyuGZwC15vj46lHtI0/QpKdEZpRVE4=
github.com/oarkflow/form v0.0.0-20241203111156-b1be5636af43 h1:AjNCAnpzDi6BYVUfXUUuIdWruRu4npSSTrR3eZ6Vppw=
github.com/oarkflow/form v0.0.0-20241203111156-b1be5636af43/go.mod h1:fYwqhq8Sig9y0cmgO6q6WN8SP/rrsi7h2Yyk+Ufrne8=
github.com/oarkflow/jet v0.0.4 h1:rs0nTzodye/9zhrSX7FlR80Gjaty6ei2Ln0pmaUrdwg=
github.com/oarkflow/jet v0.0.4/go.mod h1:YXIc47aYyx1xKpnmuz1Z9o88cxxa47r7X3lfUAxZ0Qg=
github.com/oarkflow/json v0.0.28 h1:pCt7yezRDJeSdSu2OZ6Aai0F4J9qCwmPWRsCmfaH8Ds=
github.com/oarkflow/json v0.0.28/go.mod h1:E6Mg4LoY1PHCntfAegZmECc6Ux24sBpXJAu2lwZUe74=
github.com/oarkflow/log v1.0.83 h1:T/38wvjuNeVJ9PDo0wJDTnTUQZ5XeqlcvpbCItuFFJo=
github.com/oarkflow/log v1.0.83/go.mod h1:dMn57z9uq11Y264cx9c9Ac7ska9qM+EBhn4qf9CNlsM=
github.com/oarkflow/squealx v0.0.56 h1:8rPx3jWNnt4ez2P10m1Lz4HTAbvrs0MZ7jjKDJ87Vqg=
github.com/oarkflow/squealx v0.0.56/go.mod h1:J5PNHmu3fH+IgrNm8tltz0aX4drT5uZ5j3r9dW5jQ/8=
github.com/oarkflow/xid v1.2.8 h1:uCIX61Binq2RPMsqImZM6pPGzoZTmRyD6jguxF9aAA0=
github.com/oarkflow/xid v1.2.8/go.mod h1:jG4YBh+swbjlWApGWDBYnsJEa7hi3CCpmuqhB3RAxVo=
github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.51.0 h1:8b30A5JlZ6C7AS81RsWjYMQmrZG6feChmgAolCl1SqA=
github.com/valyala/fasthttp v1.51.0/go.mod h1:oI2XroL+lI7vdXyYoQk03bXBThfFl2cVdIA3Xl7cH8g=
github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8=
github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc=
golang.org/x/crypto v0.42.0 h1:chiH31gIWm57EkTXpwnqf8qeuMUi0yekh6mT2AvFlqI=
golang.org/x/crypto v0.42.0/go.mod h1:4+rDnOTJhQCx2q7/j6rAN5XDw8kPjeaXEUR2eL94ix8=
golang.org/x/exp v0.0.0-20250819193227-8b4c13bb791b h1:DXr+pvt3nC887026GRP39Ej11UATqWDmWuS99x26cD0=
golang.org/x/exp v0.0.0-20250819193227-8b4c13bb791b/go.mod h1:4QTo5u+SEIbbKW1RacMZq1YEfOBqeXa19JeshGi+zc4=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.36.0 h1:KVRy2GtZBrk1cBYA7MKu5bEZFxQk4NIDV6RLVcC8o0k=
golang.org/x/sys v0.36.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/time v0.12.0 h1:ScB/8o8olJvc+CQPWrK3fPZNfh7qgwCrY0zJmoEQLSE=
golang.org/x/time v0.12.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg=

1157
workflow/sms-demo/main.go Normal file

File diff suppressed because it is too large Load Diff

BIN
workflow/sms-demo/sms-demo Executable file

Binary file not shown.

538
workflow/types.go Normal file
View File

@@ -0,0 +1,538 @@
package workflow
import (
"context"
"time"
)
// Core types
type (
WorkflowStatus string
ExecutionStatus string
NodeType string
Priority string
UserRole string
PermissionAction string
MiddlewareType string
)
// User and security types
type User struct {
ID string `json:"id"`
Username string `json:"username"`
Email string `json:"email"`
Role UserRole `json:"role"`
Permissions []string `json:"permissions"`
Metadata map[string]string `json:"metadata"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
type AuthContext struct {
User *User `json:"user"`
SessionID string `json:"session_id"`
Token string `json:"token"`
Permissions []string `json:"permissions"`
Metadata map[string]string `json:"metadata"`
}
type Permission struct {
ID string `json:"id"`
Resource string `json:"resource"`
Action PermissionAction `json:"action"`
Scope string `json:"scope"`
}
// Middleware types
type Middleware struct {
ID string `json:"id"`
Name string `json:"name"`
Type MiddlewareType `json:"type"`
Priority int `json:"priority"`
Config map[string]interface{} `json:"config"`
Enabled bool `json:"enabled"`
}
type MiddlewareResult struct {
Continue bool `json:"continue"`
Error error `json:"error"`
Data map[string]interface{} `json:"data"`
Headers map[string]string `json:"headers"`
}
// Webhook and callback types
type WebhookConfig struct {
URL string `json:"url"`
Method string `json:"method"`
Headers map[string]string `json:"headers"`
Secret string `json:"secret"`
Timeout time.Duration `json:"timeout"`
RetryPolicy *RetryPolicy `json:"retry_policy"`
}
type WebhookReceiver struct {
ID string `json:"id"`
Path string `json:"path"`
Method string `json:"method"`
Secret string `json:"secret"`
Handler string `json:"handler"`
Config map[string]interface{} `json:"config"`
Middlewares []string `json:"middlewares"`
}
type CallbackData struct {
ID string `json:"id"`
WorkflowID string `json:"workflow_id"`
ExecutionID string `json:"execution_id"`
NodeID string `json:"node_id"`
Data map[string]interface{} `json:"data"`
Headers map[string]string `json:"headers"`
Timestamp time.Time `json:"timestamp"`
}
const (
// Workflow statuses
WorkflowStatusDraft WorkflowStatus = "draft"
WorkflowStatusActive WorkflowStatus = "active"
WorkflowStatusInactive WorkflowStatus = "inactive"
WorkflowStatusDeprecated WorkflowStatus = "deprecated"
// Execution statuses
ExecutionStatusPending ExecutionStatus = "pending"
ExecutionStatusRunning ExecutionStatus = "running"
ExecutionStatusCompleted ExecutionStatus = "completed"
ExecutionStatusFailed ExecutionStatus = "failed"
ExecutionStatusCancelled ExecutionStatus = "cancelled"
ExecutionStatusSuspended ExecutionStatus = "suspended"
// Node types
NodeTypeTask NodeType = "task"
NodeTypeAPI NodeType = "api"
NodeTypeTransform NodeType = "transform"
NodeTypeDecision NodeType = "decision"
NodeTypeHumanTask NodeType = "human_task"
NodeTypeTimer NodeType = "timer"
NodeTypeLoop NodeType = "loop"
NodeTypeParallel NodeType = "parallel"
NodeTypeDatabase NodeType = "database"
NodeTypeEmail NodeType = "email"
NodeTypeWebhook NodeType = "webhook"
NodeTypeSubDAG NodeType = "sub_dag"
NodeTypeHTML NodeType = "html"
NodeTypeSMS NodeType = "sms"
NodeTypeAuth NodeType = "auth"
NodeTypeValidator NodeType = "validator"
NodeTypeRouter NodeType = "router"
NodeTypeNotify NodeType = "notify"
NodeTypeStorage NodeType = "storage"
NodeTypeWebhookRx NodeType = "webhook_receiver"
// Priorities
PriorityLow Priority = "low"
PriorityMedium Priority = "medium"
PriorityHigh Priority = "high"
PriorityCritical Priority = "critical"
// User roles
UserRoleAdmin UserRole = "admin"
UserRoleManager UserRole = "manager"
UserRoleOperator UserRole = "operator"
UserRoleViewer UserRole = "viewer"
UserRoleGuest UserRole = "guest"
// Permission actions
PermissionRead PermissionAction = "read"
PermissionWrite PermissionAction = "write"
PermissionExecute PermissionAction = "execute"
PermissionDelete PermissionAction = "delete"
PermissionAdmin PermissionAction = "admin"
// Middleware types
MiddlewareAuth MiddlewareType = "auth"
MiddlewareLogging MiddlewareType = "logging"
MiddlewareRateLimit MiddlewareType = "rate_limit"
MiddlewareValidate MiddlewareType = "validate"
MiddlewareTransform MiddlewareType = "transform"
MiddlewareCustom MiddlewareType = "custom"
)
// WorkflowDefinition represents a complete workflow
type WorkflowDefinition struct {
ID string `json:"id"`
Name string `json:"name"`
Description string `json:"description"`
Version string `json:"version"`
Status WorkflowStatus `json:"status"`
Tags []string `json:"tags"`
Category string `json:"category"`
Owner string `json:"owner"`
Nodes []WorkflowNode `json:"nodes"`
Edges []WorkflowEdge `json:"edges"`
Variables map[string]Variable `json:"variables"`
Config WorkflowConfig `json:"config"`
Metadata map[string]interface{} `json:"metadata"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
CreatedBy string `json:"created_by"`
UpdatedBy string `json:"updated_by"`
}
// WorkflowNode represents a single node in the workflow
type WorkflowNode struct {
ID string `json:"id"`
Name string `json:"name"`
Type NodeType `json:"type"`
Description string `json:"description"`
Config NodeConfig `json:"config"`
Position Position `json:"position"`
Timeout *time.Duration `json:"timeout,omitempty"`
RetryPolicy *RetryPolicy `json:"retry_policy,omitempty"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
}
// NodeConfig holds configuration for different node types
type NodeConfig struct {
// Common fields
Script string `json:"script,omitempty"`
Command string `json:"command,omitempty"`
Variables map[string]string `json:"variables,omitempty"`
// API node fields
URL string `json:"url,omitempty"`
Method string `json:"method,omitempty"`
Headers map[string]string `json:"headers,omitempty"`
// Transform node fields
TransformType string `json:"transform_type,omitempty"`
Expression string `json:"expression,omitempty"`
// Decision node fields
Condition string `json:"condition,omitempty"`
DecisionRules []Rule `json:"decision_rules,omitempty"`
// Timer node fields
Duration time.Duration `json:"duration,omitempty"`
Schedule string `json:"schedule,omitempty"`
// Database node fields
Query string `json:"query,omitempty"`
Connection string `json:"connection,omitempty"`
// Email node fields
EmailTo []string `json:"email_to,omitempty"`
Subject string `json:"subject,omitempty"`
Body string `json:"body,omitempty"`
// Sub-DAG node fields
SubWorkflowID string `json:"sub_workflow_id,omitempty"`
InputMapping map[string]string `json:"input_mapping,omitempty"`
OutputMapping map[string]string `json:"output_mapping,omitempty"`
// HTML node fields
Template string `json:"template,omitempty"`
TemplateData map[string]string `json:"template_data,omitempty"`
OutputPath string `json:"output_path,omitempty"`
// SMS node fields
Provider string `json:"provider,omitempty"`
From string `json:"from,omitempty"`
SMSTo []string `json:"sms_to,omitempty"`
Message string `json:"message,omitempty"`
MessageType string `json:"message_type,omitempty"`
// Auth node fields
AuthType string `json:"auth_type,omitempty"`
Credentials map[string]string `json:"credentials,omitempty"`
TokenExpiry time.Duration `json:"token_expiry,omitempty"`
// Validator node fields
ValidationType string `json:"validation_type,omitempty"`
ValidationRules []ValidationRule `json:"validation_rules,omitempty"`
// Router node fields
RoutingRules []RoutingRule `json:"routing_rules,omitempty"`
DefaultRoute string `json:"default_route,omitempty"`
// Storage node fields
StorageType string `json:"storage_type,omitempty"`
StorageOperation string `json:"storage_operation,omitempty"`
StorageKey string `json:"storage_key,omitempty"`
StoragePath string `json:"storage_path,omitempty"`
StorageConfig map[string]string `json:"storage_config,omitempty"`
// Notification node fields
NotifyType string `json:"notify_type,omitempty"`
NotificationType string `json:"notification_type,omitempty"`
NotificationRecipients []string `json:"notification_recipients,omitempty"`
NotificationMessage string `json:"notification_message,omitempty"`
Recipients []string `json:"recipients,omitempty"`
Channel string `json:"channel,omitempty"`
// Webhook receiver fields
ListenPath string `json:"listen_path,omitempty"`
Secret string `json:"secret,omitempty"`
WebhookSecret string `json:"webhook_secret,omitempty"`
WebhookSignature string `json:"webhook_signature,omitempty"`
WebhookTransforms map[string]interface{} `json:"webhook_transforms,omitempty"`
Timeout time.Duration `json:"timeout,omitempty"`
// Custom configuration
Custom map[string]interface{} `json:"custom,omitempty"`
}
// ValidationRule for validator nodes
type ValidationRule struct {
Field string `json:"field"`
Type string `json:"type"` // "string", "number", "email", "regex", "required"
Required bool `json:"required"`
MinLength int `json:"min_length,omitempty"`
MaxLength int `json:"max_length,omitempty"`
Min *float64 `json:"min,omitempty"`
Max *float64 `json:"max,omitempty"`
Pattern string `json:"pattern,omitempty"`
Value interface{} `json:"value,omitempty"`
Message string `json:"message,omitempty"`
}
// RoutingRule for router nodes
type RoutingRule struct {
Condition string `json:"condition"`
Destination string `json:"destination"`
Priority int `json:"priority"`
Weight int `json:"weight"`
IsDefault bool `json:"is_default"`
}
// Rule for decision nodes
type Rule struct {
Condition string `json:"condition"`
Output interface{} `json:"output"`
NextNode string `json:"next_node,omitempty"`
}
// WorkflowEdge represents a connection between nodes
type WorkflowEdge struct {
ID string `json:"id"`
FromNode string `json:"from_node"`
ToNode string `json:"to_node"`
Condition string `json:"condition,omitempty"`
Priority int `json:"priority"`
Label string `json:"label,omitempty"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
}
// Variable definition for workflow
type Variable struct {
Name string `json:"name"`
Type string `json:"type"`
DefaultValue interface{} `json:"default_value"`
Required bool `json:"required"`
Description string `json:"description"`
}
// WorkflowConfig holds configuration for the entire workflow
type WorkflowConfig struct {
Timeout *time.Duration `json:"timeout,omitempty"`
MaxRetries int `json:"max_retries"`
Priority Priority `json:"priority"`
Concurrency int `json:"concurrency"`
EnableAudit bool `json:"enable_audit"`
EnableMetrics bool `json:"enable_metrics"`
Notifications []string `json:"notifications"`
ErrorHandling ErrorHandling `json:"error_handling"`
}
// ErrorHandling configuration
type ErrorHandling struct {
OnFailure string `json:"on_failure"` // "stop", "continue", "retry"
MaxErrors int `json:"max_errors"`
Rollback bool `json:"rollback"`
}
// Execution represents a workflow execution instance
type Execution struct {
ID string `json:"id"`
WorkflowID string `json:"workflow_id"`
WorkflowVersion string `json:"workflow_version"`
Status ExecutionStatus `json:"status"`
Input map[string]interface{} `json:"input"`
Output map[string]interface{} `json:"output"`
Context ExecutionContext `json:"context"`
CurrentNode string `json:"current_node"`
ExecutedNodes []ExecutedNode `json:"executed_nodes"`
Error string `json:"error,omitempty"`
StartedAt time.Time `json:"started_at"`
CompletedAt *time.Time `json:"completed_at,omitempty"`
UpdatedAt time.Time `json:"updated_at"`
Priority Priority `json:"priority"`
Owner string `json:"owner"`
TriggeredBy string `json:"triggered_by"`
ParentExecution string `json:"parent_execution,omitempty"`
}
// ExecutionContext holds runtime context
type ExecutionContext struct {
Variables map[string]interface{} `json:"variables"`
Secrets map[string]string `json:"secrets,omitempty"`
Metadata map[string]interface{} `json:"metadata"`
Trace []TraceEntry `json:"trace"`
Checkpoints []Checkpoint `json:"checkpoints"`
}
// TraceEntry for execution tracing
type TraceEntry struct {
Timestamp time.Time `json:"timestamp"`
NodeID string `json:"node_id"`
Event string `json:"event"`
Data interface{} `json:"data,omitempty"`
}
// Checkpoint for execution recovery
type Checkpoint struct {
ID string `json:"id"`
NodeID string `json:"node_id"`
Timestamp time.Time `json:"timestamp"`
State map[string]interface{} `json:"state"`
}
// ExecutedNode tracks execution of individual nodes
type ExecutedNode struct {
NodeID string `json:"node_id"`
Status ExecutionStatus `json:"status"`
Input map[string]interface{} `json:"input"`
Output map[string]interface{} `json:"output"`
Error string `json:"error,omitempty"`
StartedAt time.Time `json:"started_at"`
CompletedAt *time.Time `json:"completed_at,omitempty"`
Duration time.Duration `json:"duration"`
RetryCount int `json:"retry_count"`
Logs []LogEntry `json:"logs"`
}
// LogEntry for node execution logs
type LogEntry struct {
Timestamp time.Time `json:"timestamp"`
Level string `json:"level"`
Message string `json:"message"`
Data interface{} `json:"data,omitempty"`
}
// Supporting types
type Position struct {
X float64 `json:"x"`
Y float64 `json:"y"`
}
type RetryPolicy struct {
MaxAttempts int `json:"max_attempts"`
Delay time.Duration `json:"delay"`
Backoff string `json:"backoff"` // "linear", "exponential", "fixed"
MaxDelay time.Duration `json:"max_delay,omitempty"`
}
// Filter types
type WorkflowFilter struct {
Status []WorkflowStatus `json:"status"`
Category []string `json:"category"`
Owner []string `json:"owner"`
Tags []string `json:"tags"`
CreatedFrom *time.Time `json:"created_from"`
CreatedTo *time.Time `json:"created_to"`
Search string `json:"search"`
Limit int `json:"limit"`
Offset int `json:"offset"`
SortBy string `json:"sort_by"`
SortOrder string `json:"sort_order"`
}
type ExecutionFilter struct {
WorkflowID []string `json:"workflow_id"`
Status []ExecutionStatus `json:"status"`
Owner []string `json:"owner"`
Priority []Priority `json:"priority"`
StartedFrom *time.Time `json:"started_from"`
StartedTo *time.Time `json:"started_to"`
Limit int `json:"limit"`
Offset int `json:"offset"`
SortBy string `json:"sort_by"`
SortOrder string `json:"sort_order"`
}
type ProcessingContext struct {
Node *WorkflowNode
Data map[string]interface{}
Variables map[string]interface{}
User *User
Middleware *MiddlewareManager
}
type ProcessingResult struct {
Success bool `json:"success"`
Data map[string]interface{} `json:"data,omitempty"`
Error string `json:"error,omitempty"`
Message string `json:"message,omitempty"`
}
// Core interfaces
type Processor interface {
Process(ctx context.Context, input ProcessingContext) (*ProcessingResult, error)
}
type WorkflowRegistry interface {
Store(ctx context.Context, definition *WorkflowDefinition) error
Get(ctx context.Context, id string, version string) (*WorkflowDefinition, error)
List(ctx context.Context, filter *WorkflowFilter) ([]*WorkflowDefinition, error)
Delete(ctx context.Context, id string) error
GetVersions(ctx context.Context, id string) ([]string, error)
}
type StateManager interface {
CreateExecution(ctx context.Context, execution *Execution) error
UpdateExecution(ctx context.Context, execution *Execution) error
GetExecution(ctx context.Context, executionID string) (*Execution, error)
ListExecutions(ctx context.Context, filter *ExecutionFilter) ([]*Execution, error)
DeleteExecution(ctx context.Context, executionID string) error
SaveCheckpoint(ctx context.Context, executionID string, checkpoint *Checkpoint) error
GetCheckpoints(ctx context.Context, executionID string) ([]*Checkpoint, error)
}
type WorkflowExecutor interface {
Start(ctx context.Context) error
Stop(ctx context.Context)
Execute(ctx context.Context, definition *WorkflowDefinition, execution *Execution) error
Cancel(ctx context.Context, executionID string) error
Suspend(ctx context.Context, executionID string) error
Resume(ctx context.Context, executionID string) error
}
type WorkflowScheduler interface {
Start(ctx context.Context) error
Stop(ctx context.Context)
ScheduleExecution(ctx context.Context, execution *Execution, delay time.Duration) error
CancelScheduledExecution(ctx context.Context, executionID string) error
}
// Config for the workflow engine
type Config struct {
MaxWorkers int `json:"max_workers"`
ExecutionTimeout time.Duration `json:"execution_timeout"`
EnableMetrics bool `json:"enable_metrics"`
EnableAudit bool `json:"enable_audit"`
EnableTracing bool `json:"enable_tracing"`
LogLevel string `json:"log_level"`
Storage StorageConfig `json:"storage"`
Security SecurityConfig `json:"security"`
}
type StorageConfig struct {
Type string `json:"type"` // "memory", "database"
ConnectionURL string `json:"connection_url,omitempty"`
MaxConnections int `json:"max_connections"`
}
type SecurityConfig struct {
EnableAuth bool `json:"enable_auth"`
AllowedOrigins []string `json:"allowed_origins"`
JWTSecret string `json:"jwt_secret,omitempty"`
RequiredScopes []string `json:"required_scopes"`
}