diff --git a/handlers/data_handler.go b/handlers/data_handler.go index 6299654..67923a1 100644 --- a/handlers/data_handler.go +++ b/handlers/data_handler.go @@ -20,10 +20,12 @@ type DataHandler struct { } func (h *DataHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { - var data map[string]any - err := json.Unmarshal(task.Payload, &data) + data, err := dag.UnmarshalPayload[map[string]any](ctx, task.Payload) if err != nil { - return mq.Result{Error: fmt.Errorf("failed to unmarshal task payload: %w", err)} + return mq.Result{Error: fmt.Errorf("failed to unmarshal task payload: %w", err), Ctx: ctx} + } + if data == nil { + data = make(map[string]any) } operation, ok := h.Payload.Data["operation"].(string) @@ -34,6 +36,8 @@ func (h *DataHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Result var result map[string]any var conditionStatus string switch operation { + case "extract": + result = h.extractData(ctx, data) case "sort": result = h.sortData(data) case "deduplicate": @@ -73,6 +77,34 @@ func (h *DataHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Result } } +func (h *DataHandler) extractData(ctx context.Context, data map[string]any) map[string]any { + result := make(map[string]any) + + // Copy existing data + for k, v := range data { + result[k] = v + } + + // Extract data based on mapping + if h.Payload.Mapping != nil { + for targetField, sourcePath := range h.Payload.Mapping { + _, val := dag.GetVal(ctx, sourcePath, data) + if val != nil { + result[targetField] = val + } + } + } + + // Handle default values + if defaultPath, ok := h.Payload.Data["default_path"].(string); ok { + if path, exists := result["path"]; !exists || path == "" { + result["path"] = defaultPath + } + } + + return result +} + func (h *DataHandler) sortData(data map[string]any) map[string]any { result := make(map[string]any) diff --git a/handlers/field_handler.go b/handlers/field_handler.go index 8191bad..6eca671 100644 --- a/handlers/field_handler.go +++ b/handlers/field_handler.go @@ -16,10 +16,9 @@ type FieldHandler struct { } func (h *FieldHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { - var data map[string]any - err := json.Unmarshal(task.Payload, &data) + data, err := dag.UnmarshalPayload[map[string]any](ctx, task.Payload) if err != nil { - return mq.Result{Error: fmt.Errorf("failed to unmarshal task payload: %w", err)} + return mq.Result{Error: fmt.Errorf("failed to unmarshal task payload: %w", err), Ctx: ctx} } operation, ok := h.Payload.Data["operation"].(string) diff --git a/handlers/flatten_handler.go b/handlers/flatten_handler.go index 05ce6f9..69d04ae 100644 --- a/handlers/flatten_handler.go +++ b/handlers/flatten_handler.go @@ -15,8 +15,7 @@ type FlattenHandler struct { } func (h *FlattenHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { - var data map[string]any - err := json.Unmarshal(task.Payload, &data) + data, err := dag.UnmarshalPayload[map[string]any](ctx, task.Payload) if err != nil { return mq.Result{Error: fmt.Errorf("failed to unmarshal task payload: %w", err), Ctx: ctx} } diff --git a/handlers/format_handler.go b/handlers/format_handler.go index 24cd6a4..f415b9e 100644 --- a/handlers/format_handler.go +++ b/handlers/format_handler.go @@ -18,35 +18,60 @@ type FormatHandler struct { } func (h *FormatHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { - var data map[string]any - err := json.Unmarshal(task.Payload, &data) + data, err := dag.UnmarshalPayload[map[string]any](ctx, task.Payload) if err != nil { return mq.Result{Error: fmt.Errorf("failed to unmarshal task payload: %w", err), Ctx: ctx} } + if data == nil { + data = make(map[string]any) + } + + // Handle mapping first + if h.Payload.Mapping != nil { + for k, v := range h.Payload.Mapping { + _, val := dag.GetVal(ctx, v, data) + data[k] = val + } + } formatType, ok := h.Payload.Data["format_type"].(string) if !ok { - return mq.Result{Error: fmt.Errorf("format_type not specified"), Ctx: ctx} + // If no format_type specified, just return the data with mapping applied + resultPayload, err := json.Marshal(data) + if err != nil { + return mq.Result{Error: fmt.Errorf("failed to marshal result: %w", err), Ctx: ctx} + } + return mq.Result{Payload: resultPayload, Ctx: ctx} } var result map[string]any + // Copy data to result + if data != nil { + result = make(map[string]any) + for k, v := range data { + result[k] = v + } + } else { + result = make(map[string]any) + } + switch formatType { case "string": - result = h.formatToString(data) + result = h.formatToString(result) case "number": - result = h.formatToNumber(data) + result = h.formatToNumber(result) case "date": - result = h.formatDate(data) + result = h.formatDate(result) case "currency": - result = h.formatCurrency(data) + result = h.formatCurrency(result) case "uppercase": - result = h.formatUppercase(data) + result = h.formatUppercase(result) case "lowercase": - result = h.formatLowercase(data) + result = h.formatLowercase(result) case "capitalize": - result = h.formatCapitalize(data) + result = h.formatCapitalize(result) case "trim": - result = h.formatTrim(data) + result = h.formatTrim(result) default: return mq.Result{Error: fmt.Errorf("unsupported format_type: %s", formatType), Ctx: ctx} } diff --git a/handlers/group_handler.go b/handlers/group_handler.go index 0df001a..34e1126 100644 --- a/handlers/group_handler.go +++ b/handlers/group_handler.go @@ -16,10 +16,9 @@ type GroupHandler struct { } func (h *GroupHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { - var data map[string]any - err := json.Unmarshal(task.Payload, &data) + data, err := dag.UnmarshalPayload[map[string]any](ctx, task.Payload) if err != nil { - return mq.Result{Error: fmt.Errorf("failed to unmarshal task payload: %w", err)} + return mq.Result{Error: fmt.Errorf("failed to unmarshal task payload: %w", err), Ctx: ctx} } // Extract the data array diff --git a/handlers/json_handler.go b/handlers/json_handler.go index f2e03e6..b940862 100644 --- a/handlers/json_handler.go +++ b/handlers/json_handler.go @@ -15,10 +15,9 @@ type JSONHandler struct { } func (h *JSONHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { - var data map[string]any - err := json.Unmarshal(task.Payload, &data) + data, err := dag.UnmarshalPayload[map[string]any](ctx, task.Payload) if err != nil { - return mq.Result{Error: fmt.Errorf("failed to unmarshal task payload: %w", err)} + return mq.Result{Error: fmt.Errorf("failed to unmarshal task payload: %w", err), Ctx: ctx} } operation, ok := h.Payload.Data["operation"].(string) diff --git a/handlers/split_join_handler.go b/handlers/split_join_handler.go index 5fa6832..aaa7f96 100644 --- a/handlers/split_join_handler.go +++ b/handlers/split_join_handler.go @@ -16,10 +16,9 @@ type SplitHandler struct { } func (h *SplitHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { - var data map[string]any - err := json.Unmarshal(task.Payload, &data) + data, err := dag.UnmarshalPayload[map[string]any](ctx, task.Payload) if err != nil { - return mq.Result{Error: fmt.Errorf("failed to unmarshal task payload: %w", err)} + return mq.Result{Error: fmt.Errorf("failed to unmarshal task payload: %w", err), Ctx: ctx} } operation, ok := h.Payload.Data["operation"].(string) @@ -149,10 +148,9 @@ type JoinHandler struct { } func (h *JoinHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { - var data map[string]any - err := json.Unmarshal(task.Payload, &data) + data, err := dag.UnmarshalPayload[map[string]any](ctx, task.Payload) if err != nil { - return mq.Result{Error: fmt.Errorf("failed to unmarshal task payload: %w", err)} + return mq.Result{Error: fmt.Errorf("failed to unmarshal task payload: %w", err), Ctx: ctx} } operation, ok := h.Payload.Data["operation"].(string) diff --git a/services/console/run_handler.go b/services/console/run_handler.go index 7adff8d..2ba20bc 100644 --- a/services/console/run_handler.go +++ b/services/console/run_handler.go @@ -85,39 +85,97 @@ func (receiver *RunHandler) Extend() contracts.Extend { Aliases: []string{"hf"}, Usage: "Header to be passed to the handler", }, + { + Name: "enhanced", + Value: "false", + Aliases: []string{"e"}, + Usage: "Run as enhanced handler with workflow engine support", + }, + { + Name: "workflow", + Value: "false", + Aliases: []string{"w"}, + Usage: "Enable workflow engine features", + }, }, } -} - -// Handle Execute the console command. +} // Handle Execute the console command. func (receiver *RunHandler) Handle(ctx contracts.Context) error { name := ctx.Option("name") serve := ctx.Option("serve") + enhanced := ctx.Option("enhanced") + workflow := ctx.Option("workflow") + if serve == "" { serve = "false" } + if enhanced == "" { + enhanced = "false" + } + if workflow == "" { + workflow = "false" + } + if name == "" { return errors.New("Handler name has to be provided") } - handler := receiver.userConfig.GetHandler(name) - if handler == nil { - return errors.New("Handler not found") + + // Check if enhanced handler is requested or if handler is configured as enhanced + isEnhanced := enhanced == "true" || receiver.userConfig.IsEnhancedHandler(name) + + var flow *dag.DAG + var err error + + if isEnhanced { + // Try to get enhanced handler first + enhancedHandler := receiver.userConfig.GetEnhancedHandler(name) + if enhancedHandler != nil { + fmt.Printf("Setting up enhanced handler: %s\n", name) + flow, err = services.SetupEnhancedHandler(*enhancedHandler, receiver.brokerAddr) + if err != nil { + return fmt.Errorf("failed to setup enhanced handler: %w", err) + } + } else { + // Fallback to traditional handler + handler := receiver.userConfig.GetHandler(name) + if handler == nil { + return errors.New("Handler not found") + } + flow = services.SetupHandler(*handler, receiver.brokerAddr) + } + } else { + // Traditional handler + handler := receiver.userConfig.GetHandler(name) + if handler == nil { + return errors.New("Handler not found") + } + flow = services.SetupHandler(*handler, receiver.brokerAddr) } - flow := services.SetupHandler(*handler, receiver.brokerAddr) + if flow.Error != nil { panic(flow.Error) } + port := ctx.Option("port") if port == "" { port = "8080" } if serve != "false" { + fmt.Printf("Starting %s handler server on port %s\n", + func() string { + if isEnhanced { + return "enhanced" + } else { + return "traditional" + } + }(), port) if err := flow.Start(context.Background(), ":"+port); err != nil { return fmt.Errorf("error starting handler: %w", err) } return nil } + data, err := receiver.getData(ctx, "data", "data-file", "test/data", false) if err != nil { return err @@ -130,8 +188,31 @@ func (receiver *RunHandler) Handle(ctx contracts.Context) error { if headerData == nil { headerData = make(map[string]any) } - c = context.WithValue(c, "header", headerData) - fmt.Println("Running Handler: ", name) + + // Convert headerData to map[string]any if it's not already + var headerMap map[string]any + switch h := headerData.(type) { + case map[string]any: + headerMap = h + default: + headerMap = make(map[string]any) + } + + // Add enhanced context information if workflow is enabled + if workflow == "true" || isEnhanced { + headerMap["workflow_enabled"] = true + headerMap["enhanced_mode"] = isEnhanced + } + + c = context.WithValue(c, "header", headerMap) + fmt.Printf("Running %s Handler: %s\n", + func() string { + if isEnhanced { + return "Enhanced" + } else { + return "Traditional" + } + }(), name) rs := send(c, flow, data) if rs.Error == nil { fmt.Println("Handler response", string(rs.Payload)) @@ -197,3 +278,39 @@ func Unmarshal(data any, dst any) error { } return nil } + +// Enhanced helper functions + +// getHandlerInfo returns information about the handler (traditional or enhanced) +func (receiver *RunHandler) getHandlerInfo(name string) (interface{}, bool) { + // Check enhanced handlers first + if enhancedHandler := receiver.userConfig.GetEnhancedHandler(name); enhancedHandler != nil { + return *enhancedHandler, true + } + + // Check traditional handlers + if handler := receiver.userConfig.GetHandler(name); handler != nil { + return *handler, false + } + + return nil, false +} + +// listAvailableHandlers lists all available handlers (both traditional and enhanced) +func (receiver *RunHandler) listAvailableHandlers() { + fmt.Println("Available Traditional Handlers:") + for _, handler := range receiver.userConfig.Policy.Handlers { + fmt.Printf(" - %s (%s)\n", handler.Name, handler.Key) + } + + if len(receiver.userConfig.Policy.EnhancedHandlers) > 0 { + fmt.Println("\nAvailable Enhanced Handlers:") + for _, handler := range receiver.userConfig.Policy.EnhancedHandlers { + status := "disabled" + if handler.WorkflowEnabled { + status = "workflow enabled" + } + fmt.Printf(" - %s (%s) [%s]\n", handler.Name, handler.Key, status) + } + } +} diff --git a/services/examples/json-engine/config/policies/apis/routes.json b/services/examples/json-engine/config/policies/apis/routes.json new file mode 100644 index 0000000..6c40da1 --- /dev/null +++ b/services/examples/json-engine/config/policies/apis/routes.json @@ -0,0 +1,69 @@ +{ + "routes": [ + { + "name": "SMS Workflow", + "route_uri": "/api/v1/sms/send", + "route_method": "POST", + "handler_key": "sms:workflow", + "operation": "custom", + "schema_file": "sms-send.json" + }, + { + "name": "Email Workflow", + "route_uri": "/api/v1/email/send", + "route_method": "POST", + "handler_key": "email:workflow", + "operation": "custom", + "schema_file": "email-send.json" + }, + { + "name": "Blog Engine", + "route_uri": "/api/v1/blog/*", + "route_method": "GET", + "handler_key": "blog:engine", + "operation": "custom" + }, + { + "name": "SMS Workflow DAG", + "route_uri": "/api/v1/sms/dag", + "route_method": "GET", + "handler_key": "sms:workflow", + "operation": "custom" + }, + { + "name": "Email Workflow DAG", + "route_uri": "/api/v1/email/dag", + "route_method": "GET", + "handler_key": "email:workflow", + "operation": "custom" + }, + { + "name": "SMS Page", + "route_uri": "/sms", + "route_method": "GET", + "handler_key": "sms:workflow", + "operation": "custom" + }, + { + "name": "Email Page", + "route_uri": "/email", + "route_method": "GET", + "handler_key": "email:workflow", + "operation": "custom" + }, + { + "name": "Blog Page", + "route_uri": "/blog/*", + "route_method": "GET", + "handler_key": "blog:engine", + "operation": "custom" + }, + { + "name": "Home Page", + "route_uri": "/", + "route_method": "GET", + "handler_key": "blog:engine", + "operation": "custom" + } + ] +} diff --git a/services/examples/json-engine/config/policies/handlers/blog-engine.json b/services/examples/json-engine/config/policies/handlers/blog-engine.json new file mode 100644 index 0000000..5795967 --- /dev/null +++ b/services/examples/json-engine/config/policies/handlers/blog-engine.json @@ -0,0 +1,178 @@ +{ + "name": "Blog Engine", + "key": "blog:engine", + "debug": false, + "disable_log": false, + "nodes": [ + { + "id": "start", + "name": "Start Blog Engine", + "node": "start", + "first_node": true, + "data": { + "additional_data": { + "workflow": "blog_rendering", + "version": "1.0.0" + } + } + }, + { + "id": "parse_route", + "name": "Parse Blog Route", + "node": "data", + "data": { + "mapping": { + "path": "header.param.path", + "category": "header.query.category", + "tag": "header.query.tag" + }, + "additional_data": { + "operation": "extract", + "default_path": "index" + } + } + }, + { + "id": "load_content", + "name": "Load Blog Content", + "node": "condition", + "data": { + "conditions": { + "post": { + "filter": { + "key": "path", + "operator": "startsWith", + "value": "post/" + }, + "node": "load_post" + }, + "category": { + "filter": { + "key": "category", + "operator": "exists" + }, + "node": "load_category" + }, + "index": { + "filter": { + "key": "path", + "operator": "eq", + "value": "index" + }, + "node": "load_index" + } + }, + "additional_data": { + "default_action": "load_index" + } + } + }, + { + "id": "load_post", + "name": "Load Blog Post", + "node": "format", + "data": { + "mapping": { + "content_type": "eval.{{'post'}}", + "title": "eval.{{'Sample Blog Post'}}", + "author": "eval.{{'John Doe'}}", + "date": "eval.{{now()}}", + "content": "eval.{{'This is a sample blog post content. It demonstrates the blog engine workflow.'}}", + "tags": "eval.{{['technology', 'workflow', 'automation']}}" + } + } + }, + { + "id": "load_category", + "name": "Load Category Posts", + "node": "format", + "data": { + "mapping": { + "content_type": "eval.{{'category'}}", + "category": "category", + "posts": "eval.{{[{'title': 'Post 1', 'slug': 'post-1'}, {'title': 'Post 2', 'slug': 'post-2'}]}}" + } + } + }, + { + "id": "load_index", + "name": "Load Blog Index", + "node": "format", + "data": { + "mapping": { + "content_type": "eval.{{'index'}}", + "recent_posts": "eval.{{[{'title': 'Latest Post', 'slug': 'latest-post', 'excerpt': 'This is the latest blog post...'}]}}", + "categories": "eval.{{['Technology', 'Tutorials', 'News']}}" + } + } + }, + { + "id": "render_blog", + "name": "Render Blog Page", + "node": "render-html", + "data": { + "additional_data": { + "template_file": "templates/blog.html" + } + } + }, + { + "id": "output", + "name": "Blog Output", + "node": "output", + "data": { + "mapping": { + "content_type": "eval.{{'text/html'}}", + "rendered": "html_content" + } + } + } + ], + "edges": [ + { + "source": "start", + "label": "initialize", + "target": [ "parse_route" ] + }, + { + "source": "parse_route", + "label": "parsed", + "target": [ "load_content" ] + }, + { + "source": "load_content.post", + "label": "load_post_content", + "target": [ "load_post" ] + }, + { + "source": "load_content.category", + "label": "load_category_content", + "target": [ "load_category" ] + }, + { + "source": "load_content.index", + "label": "load_index_content", + "target": [ "load_index" ] + }, + { + "source": "load_post", + "label": "post_loaded", + "target": [ "render_blog" ] + }, + { + "source": "load_category", + "label": "category_loaded", + "target": [ "render_blog" ] + }, + { + "source": "load_index", + "label": "index_loaded", + "target": [ "render_blog" ] + }, + { + "source": "render_blog", + "label": "rendered", + "target": [ "output" ] + } + ] +} diff --git a/services/examples/json-engine/config/policies/handlers/email-workflow.json b/services/examples/json-engine/config/policies/handlers/email-workflow.json new file mode 100644 index 0000000..8fe93aa --- /dev/null +++ b/services/examples/json-engine/config/policies/handlers/email-workflow.json @@ -0,0 +1,136 @@ +{ + "name": "Email Workflow Engine", + "key": "email:workflow", + "debug": false, + "disable_log": false, + "nodes": [ + { + "id": "start", + "name": "Start Email Workflow", + "node": "start", + "first_node": true, + "data": { + "additional_data": { + "workflow": "email_sending", + "version": "1.0.0" + } + } + }, + { + "id": "validate_email", + "name": "Validate Email Input", + "node": "data", + "data": { + "mapping": { + "to": "body.to", + "from": "body.from", + "subject": "body.subject", + "body": "body.body", + "html": "body.html" + }, + "additional_data": { + "operation": "validate_fields", + "validation_rules": { + "to": { "required": true, "type": "email" }, + "from": { "required": true, "type": "email" }, + "subject": { "required": true, "max_length": 255 } + } + } + } + }, + { + "id": "prepare_template", + "name": "Prepare Email Template", + "node": "render-html", + "data": { + "additional_data": { + "template_file": "templates/email-template.html", + "engine": "handlebars" + } + } + }, + { + "id": "send_email", + "name": "Send Email", + "node": "format", + "data": { + "mapping": { + "provider": "eval.{{'smtp'}}", + "status": "eval.{{'sent'}}", + "message_id": "eval.{{'email_' + generateID()}}", + "cost": "eval.{{0.001}}" + }, + "additional_data": { + "format_type": "string", + "smtp_config": { + "host": "smtp.gmail.com", + "port": 587, + "secure": false + } + } + } + }, + { + "id": "log_email_result", + "name": "Log Email Result", + "node": "log", + "data": { + "mapping": { + "event": "eval.{{'email_sent'}}", + "timestamp": "eval.{{now()}}", + "result": "eval.{{{'to': to, 'subject': subject, 'status': status, 'message_id': message_id}}}" + }, + "additional_data": { + "operation": "info", + "log_level": "info" + } + } + }, + { + "id": "output", + "name": "Email Response", + "node": "output", + "data": { + "mapping": { + "success": "eval.{{true}}", + "message": "eval.{{'Email sent successfully'}}", + "to": "to", + "subject": "subject", + "message_id": "message_id", + "timestamp": "eval.{{now()}}", + "status": "eval.{{'delivered'}}" + }, + "templates": { + "html": "email-template.html" + } + } + } + ], + "edges": [ + { + "source": "start", + "label": "initialize", + "target": [ "validate_email" ] + }, + { + "source": "validate_email", + "label": "validated", + "target": [ "prepare_template" ] + }, + { + "source": "prepare_template", + "label": "template_ready", + "target": [ "send_email" ] + }, + { + "source": "send_email", + "label": "sent", + "target": [ "log_email_result" ] + }, + { + "source": "log_email_result", + "label": "logged", + "target": [ "output" ] + } + ] +} diff --git a/services/examples/json-engine/config/policies/handlers/sms-workflow.json b/services/examples/json-engine/config/policies/handlers/sms-workflow.json new file mode 100644 index 0000000..27b3d5e --- /dev/null +++ b/services/examples/json-engine/config/policies/handlers/sms-workflow.json @@ -0,0 +1,223 @@ +{ + "name": "SMS Workflow Engine", + "key": "sms:workflow", + "debug": false, + "disable_log": false, + "nodes": [ + { + "id": "start", + "name": "Start SMS Workflow", + "node": "start", + "first_node": true, + "data": { + "additional_data": { + "workflow": "sms_sending", + "version": "1.0.0" + } + } + }, + { + "id": "validate_input", + "name": "Validate SMS Input", + "node": "data", + "data": { + "mapping": { + "message": "body.message", + "recipients": "body.recipients", + "sender": "body.sender", + "priority": "body.priority" + }, + "additional_data": { + "operation": "validate_fields", + "validation_rules": { + "message": { "required": true, "max_length": 1000 }, + "recipients": { "required": true, "type": "array" }, + "sender": { "required": true, "max_length": 255 }, + "priority": { "required": false, "type": "string" } + } + } + } + }, + { + "id": "select_provider", + "name": "Select SMS Provider", + "node": "condition", + "data": { + "conditions": { + "premium": { + "filter": { + "key": "priority", + "operator": "eq", + "value": "high" + }, + "node": "use_twilio" + }, + "standard": { + "filter": { + "key": "priority", + "operator": "eq", + "value": "medium" + }, + "node": "use_nexmo" + }, + "bulk": { + "filter": { + "key": "priority", + "operator": "eq", + "value": "low" + }, + "node": "use_aws" + } + }, + "additional_data": { + "default_provider": "nexmo" + } + } + }, + { + "id": "use_twilio", + "name": "Send via Twilio", + "node": "format", + "data": { + "mapping": { + "provider": "eval.{{'twilio'}}", + "cost": "eval.{{0.0075}}", + "status": "eval.{{'sent'}}", + "message_id": "eval.{{'twilio_' + generateID()}}" + }, + "additional_data": { + "format_type": "string", + "provider_config": { + "name": "Twilio", + "type": "premium", + "reliability": 0.99 + } + } + } + }, + { + "id": "use_nexmo", + "name": "Send via Nexmo", + "node": "format", + "data": { + "mapping": { + "provider": "eval.{{'nexmo'}}", + "cost": "eval.{{0.0065}}", + "status": "eval.{{'sent'}}", + "message_id": "eval.{{'nexmo_' + generateID()}}" + }, + "additional_data": { + "format_type": "string", + "provider_config": { + "name": "Vonage (Nexmo)", + "type": "standard", + "reliability": 0.97 + } + } + } + }, + { + "id": "use_aws", + "name": "Send via AWS SNS", + "node": "format", + "data": { + "mapping": { + "provider": "eval.{{'aws'}}", + "cost": "eval.{{0.0055}}", + "status": "eval.{{'sent'}}", + "message_id": "eval.{{'aws_' + generateID()}}" + }, + "additional_data": { + "format_type": "string", + "provider_config": { + "name": "AWS SNS", + "type": "bulk", + "reliability": 0.95 + } + } + } + }, + { + "id": "log_result", + "name": "Log SMS Result", + "node": "log", + "data": { + "mapping": { + "event": "eval.{{'sms_sent'}}", + "timestamp": "eval.{{now()}}", + "result": "eval.{{{'provider': provider, 'cost': cost, 'status': status, 'message_id': message_id}}}" + }, + "additional_data": { + "operation": "info", + "log_level": "info" + } + } + }, + { + "id": "output", + "name": "SMS Response", + "node": "output", + "data": { + "mapping": { + "success": "eval.{{true}}", + "message": "eval.{{'SMS sent successfully'}}", + "provider_used": "provider", + "cost": "cost", + "message_id": "message_id", + "timestamp": "eval.{{now()}}", + "status": "eval.{{'delivered'}}" + }, + "templates": { + "html": "sms-template.html" + } + } + } + ], + "edges": [ + { + "source": "start", + "label": "initialize", + "target": [ "validate_input" ] + }, + { + "source": "validate_input", + "label": "validated", + "target": [ "select_provider" ] + }, + { + "source": "select_provider.premium", + "label": "use_premium", + "target": [ "use_twilio" ] + }, + { + "source": "select_provider.standard", + "label": "use_standard", + "target": [ "use_nexmo" ] + }, + { + "source": "select_provider.bulk", + "label": "use_bulk", + "target": [ "use_aws" ] + }, + { + "source": "use_twilio", + "label": "sent", + "target": [ "log_result" ] + }, + { + "source": "use_nexmo", + "label": "sent", + "target": [ "log_result" ] + }, + { + "source": "use_aws", + "label": "sent", + "target": [ "log_result" ] + }, + { + "source": "log_result", + "label": "logged", + "target": [ "output" ] + } + ] +} diff --git a/services/examples/json-engine/config/policies/schemas/email-send.json b/services/examples/json-engine/config/policies/schemas/email-send.json new file mode 100644 index 0000000..ce9601f --- /dev/null +++ b/services/examples/json-engine/config/policies/schemas/email-send.json @@ -0,0 +1,43 @@ +{ + "type": "object", + "properties": { + "to": { + "type": "string", + "format": "email", + "description": "Recipient email address" + }, + "from": { + "type": "string", + "format": "email", + "description": "Sender email address" + }, + "subject": { + "type": "string", + "maxLength": 255, + "minLength": 1, + "description": "Email subject" + }, + "body": { + "type": "string", + "description": "Plain text email body" + }, + "html": { + "type": "string", + "description": "HTML email body" + }, + "attachments": { + "type": "array", + "items": { + "type": "object", + "properties": { + "filename": { "type": "string" }, + "content": { "type": "string" }, + "contentType": { "type": "string" } + } + }, + "description": "Email attachments" + } + }, + "required": [ "to", "from", "subject" ], + "additionalProperties": false +} diff --git a/services/examples/json-engine/config/policies/schemas/sms-send.json b/services/examples/json-engine/config/policies/schemas/sms-send.json new file mode 100644 index 0000000..d8dea54 --- /dev/null +++ b/services/examples/json-engine/config/policies/schemas/sms-send.json @@ -0,0 +1,32 @@ +{ + "type": "object", + "properties": { + "message": { + "type": "string", + "maxLength": 160, + "minLength": 1, + "description": "SMS message content" + }, + "recipients": { + "type": "array", + "items": { + "type": "string", + "pattern": "^\\+[1-9]\\d{1,14}$" + }, + "minItems": 1, + "description": "Array of phone numbers in E.164 format" + }, + "sender": { + "type": "string", + "description": "Sender identifier" + }, + "priority": { + "type": "string", + "enum": [ "high", "medium", "low" ], + "default": "medium", + "description": "SMS priority level" + } + }, + "required": [ "message", "recipients", "sender" ], + "additionalProperties": false +} diff --git a/services/examples/json-engine/config/policies/web.json b/services/examples/json-engine/config/policies/web.json new file mode 100644 index 0000000..01a71d4 --- /dev/null +++ b/services/examples/json-engine/config/policies/web.json @@ -0,0 +1,16 @@ +{ + "prefix": "/", + "middlewares": [ + { "name": "cors" } + ], + "static": { + "dir": "./public", + "prefix": "/static", + "options": { + "byte_range": true, + "browse": true, + "compress": true, + "index_file": "index.html" + } + } +} diff --git a/services/examples/json-engine/main b/services/examples/json-engine/main new file mode 100755 index 0000000..595c707 Binary files /dev/null and b/services/examples/json-engine/main differ diff --git a/services/examples/json-engine/main.go b/services/examples/json-engine/main.go index 40e3047..a20beb3 100644 --- a/services/examples/json-engine/main.go +++ b/services/examples/json-engine/main.go @@ -1,54 +1,48 @@ package main import ( - "encoding/json" - "flag" - "fmt" - "io/ioutil" - "log" - "os" + "github.com/gofiber/fiber/v2" + "github.com/oarkflow/cli" + "github.com/oarkflow/cli/console" + "github.com/oarkflow/cli/contracts" + + "github.com/oarkflow/mq" + "github.com/oarkflow/mq/dag" + "github.com/oarkflow/mq/handlers" + "github.com/oarkflow/mq/services" + dagConsole "github.com/oarkflow/mq/services/console" ) -func loadConfiguration(configPath string) (*AppConfiguration, error) { - data, err := ioutil.ReadFile(configPath) - if err != nil { - return nil, fmt.Errorf("failed to read config file: %v", err) - } - - var config AppConfiguration - if err := json.Unmarshal(data, &config); err != nil { - return nil, fmt.Errorf("failed to parse JSON config: %v", err) - } - - return &config, nil -} - func main() { - // Parse command line flags - configPath := flag.String("config", "sms-app.json", "Path to JSON configuration file") - flag.Parse() - - // If positional args provided, use the first one - if len(os.Args) > 1 && !flag.Parsed() { - *configPath = os.Args[1] - } - - // Load configuration first - config, err := loadConfiguration(*configPath) - if err != nil { - log.Fatalf("Failed to load configuration: %v", err) - } - - // Create JSON engine with configuration - engine := NewJSONEngine(config) - - // Compile configuration - if err := engine.Compile(); err != nil { - log.Fatalf("Failed to compile configuration: %v", err) - } - - // Start server - if err := engine.Start(); err != nil { - log.Fatalf("Failed to start server: %v", err) - } + handlers.Init() + brokerAddr := ":5051" + loader := services.NewLoader("config") + loader.Load() + serverApp := fiber.New(fiber.Config{EnablePrintRoutes: true}) + services.Setup(loader, serverApp, brokerAddr) + cli.Run("json-engine", "v1.0.0", func(client contracts.Cli) []contracts.Command { + return []contracts.Command{ + console.NewListCommand(client), + dagConsole.NewRunHandler(loader.UserConfig, loader.ParsedPath, brokerAddr), + dagConsole.NewRunServer(serverApp), + } + }) +} + +func init() { + // Register standard handlers + dag.AddHandler("render-html", func(id string) mq.Processor { return handlers.NewRenderHTMLNode(id) }) + dag.AddHandler("condition", func(id string) mq.Processor { return handlers.NewCondition(id) }) + dag.AddHandler("output", func(id string) mq.Processor { return handlers.NewOutputHandler(id) }) + dag.AddHandler("print", func(id string) mq.Processor { return handlers.NewPrintHandler(id) }) + dag.AddHandler("format", func(id string) mq.Processor { return handlers.NewFormatHandler(id) }) + dag.AddHandler("data", func(id string) mq.Processor { return handlers.NewDataHandler(id) }) + dag.AddHandler("log", func(id string) mq.Processor { return handlers.NewLogHandler(id) }) + dag.AddHandler("json", func(id string) mq.Processor { return handlers.NewJSONHandler(id) }) + dag.AddHandler("split", func(id string) mq.Processor { return handlers.NewSplitHandler(id) }) + dag.AddHandler("join", func(id string) mq.Processor { return handlers.NewJoinHandler(id) }) + dag.AddHandler("field", func(id string) mq.Processor { return handlers.NewFieldHandler(id) }) + dag.AddHandler("flatten", func(id string) mq.Processor { return handlers.NewFlattenHandler(id) }) + dag.AddHandler("group", func(id string) mq.Processor { return handlers.NewGroupHandler(id) }) + dag.AddHandler("start", func(id string) mq.Processor { return handlers.NewStartHandler(id) }) } diff --git a/services/examples/json-engine/public/index.html b/services/examples/json-engine/public/index.html new file mode 100644 index 0000000..a1a63ca --- /dev/null +++ b/services/examples/json-engine/public/index.html @@ -0,0 +1,175 @@ + + + + + + + JSON Engine - Workflow Platform + + + + +
+
+

🚀 JSON Engine - Workflow Platform

+

Dynamic workflow engine built with user_config.go and setup.go integration

+
+ +
+

📧 Email Workflow API

+
+ POST + /api/v1/email/send - Send email through workflow engine +
+ curl -X POST http://localhost:3000/api/v1/email/send \ + -H "Content-Type: application/json" \ + -d '{ + "to": "user@example.com", + "from": "sender@example.com", + "subject": "Test Email", + "body": "Hello from JSON Engine!" + }' +
+
+
+ GET + /api/v1/email/dag - View email workflow DAG visualization +
+
+ +
+

📱 SMS Workflow API

+
+ POST + /api/v1/sms/send - Send SMS through workflow engine +
+ curl -X POST http://localhost:3000/api/v1/sms/send \ + -H "Content-Type: application/json" \ + -d '{ + "message": "Hello from JSON Engine!", + "recipients": ["+1234567890"], + "sender": "JsonEngine", + "priority": "medium" + }' +
+
+
+ GET + /api/v1/sms/dag - View SMS workflow DAG visualization +
+
+ +
+

📝 Blog Engine

+
+ GET + /api/v1/blog/* - Dynamic blog content generation +
+ # Blog index + curl http://localhost:3000/api/v1/blog/ + + # Category posts + curl http://localhost:3000/api/v1/blog/?category=Technology + + # Individual post + curl http://localhost:3000/api/v1/blog/post/sample-post +
+
+
+ +
+

🔧 Quick Links

+ 📧 Email Workflow DAG + 📱 SMS Workflow DAG + 📝 Blog Engine +
+ +
+

🏗️ Architecture

+

This JSON Engine demonstrates:

+ +
+
+ + + diff --git a/services/examples/json-engine/templates/blog.html b/services/examples/json-engine/templates/blog.html new file mode 100644 index 0000000..c6bce54 --- /dev/null +++ b/services/examples/json-engine/templates/blog.html @@ -0,0 +1,133 @@ + + + + + + + JSON Engine Blog + + + + +
+
+

🚀 JSON Engine Blog

+

Powered by Dynamic Workflow Engine

+
+ +
+

Recent Posts

+ + +
+

Categories

+ +
+
+
+ + + diff --git a/services/examples/json-engine/templates/blog_clean.html b/services/examples/json-engine/templates/blog_clean.html new file mode 100644 index 0000000..e69de29 diff --git a/services/examples/json-engine/templates/email-template.html b/services/examples/json-engine/templates/email-template.html new file mode 100644 index 0000000..edf5852 --- /dev/null +++ b/services/examples/json-engine/templates/email-template.html @@ -0,0 +1,58 @@ + + + + + + + Email Template + + + + +
+
+

{{index . "subject"}}

+
+
+ {{if index . "html"}} + {{index . "html"}} + {{else}} +

{{index . "body"}}

+ {{end}} +
+ +
+ + + diff --git a/services/examples/json-engine/templates/sms-template.html b/services/examples/json-engine/templates/sms-template.html new file mode 100644 index 0000000..0cd659f --- /dev/null +++ b/services/examples/json-engine/templates/sms-template.html @@ -0,0 +1,109 @@ + + + + + + + SMS Notification + + + + +
+
+

📱 SMS Notification

+

From: {{.sender}}

+
+ +
+ {{.message}} +
+ +
+ Recipients: + {{range .recipients}} + {{.}} + {{end}} +
+ +
+

Priority: {{.priority}}

+

Sent: {{.timestamp}}

+

Status: {{.status}}

+
+
+ + + diff --git a/services/examples/json-service/config/policies/apis/sample.json b/services/examples/json-service/config/policies/apis/sample.json new file mode 100644 index 0000000..83d1853 --- /dev/null +++ b/services/examples/json-service/config/policies/apis/sample.json @@ -0,0 +1,29 @@ +{ + "routes": [ + { + "route_uri": "/test-route", + "route_method": "POST", + "schema_file": "test-route.json", + "description": "Handle test route", + "model": "test_route", + "operation": "custom", + "handler_key": "print:check" + }, + { + "route_uri": "/print", + "route_method": "GET", + "description": "Handles print", + "model": "print", + "operation": "custom", + "handler_key": "print:check" + }, + { + "route_uri": "/send-email", + "route_method": "GET", + "description": "Handles send email", + "model": "print", + "operation": "custom", + "handler_key": "email:notification" + } + ] +} diff --git a/services/examples/json-service/config/policies/handlers/login.json b/services/examples/json-service/config/policies/handlers/login.json new file mode 100644 index 0000000..783f81c --- /dev/null +++ b/services/examples/json-service/config/policies/handlers/login.json @@ -0,0 +1,84 @@ +{ + "name": "Login Flow", + "key": "login:flow", + "nodes": [ + { + "id": "LoginForm", + "first_node": true, + "node": "render-html", + "data": { + "additional_data": { + "schema_file": "login.json", + "template_file": "templates/basic.html" + } + } + }, + { + "id": "ValidateLogin", + "node": "condition", + "data": { + "mapping": { + "username": "username", + "password": "password" + }, + "additional_data": { + "conditions": { + "default": { + "id": "condition:default", + "node": "output" + }, + "invalid": { + "id": "condition:invalid_login", + "node": "error-page", + "group": { + "reverse": true, + "filters": [ + { + "field": "username", + "operator": "eq", + "value": "admin" + }, + { + "field": "password", + "operator": "eq", + "value": "password" + } + ] + } + } + } + } + } + }, + { + "id": "error-page", + "node": "render-html", + "data": { + "mapping": { + "error_message": "eval.{{'Invalid login credentials.'}}", + "error_field": "eval.{{'username'}}", + "retry_suggested": "eval.{{true}}" + }, + "additional_data": { + "template_file": "templates/error.html" + } + } + }, + { + "id": "output", + "node": "output", + "data": { + "mapping": { + "login_message": "eval.{{'Login successful!'}}" + } + } + } + ], + "edges": [ + { + "source": "LoginForm", + "target": [ "ValidateLogin" ] + } + ] + +} diff --git a/services/examples/json-service/config/policies/handlers/print-check.json b/services/examples/json-service/config/policies/handlers/print-check.json new file mode 100644 index 0000000..41c8ebd --- /dev/null +++ b/services/examples/json-service/config/policies/handlers/print-check.json @@ -0,0 +1,11 @@ +{ + "name": "Sample Print", + "key": "print:check", + "nodes": [ + { + "id": "print1", + "node": "print", + "first_node": true + } + ] +} diff --git a/services/examples/json-service/config/policies/handlers/send-email.json b/services/examples/json-service/config/policies/handlers/send-email.json new file mode 100644 index 0000000..ede079b --- /dev/null +++ b/services/examples/json-service/config/policies/handlers/send-email.json @@ -0,0 +1,46 @@ +{ + "name": "Email Notification System", + "key": "email:notification", + "nodes": [ + { + "id": "Login", + "name": "Check Login", + "node_key": "login:flow", + "first_node": true + }, + { + "id": "ContactForm", + "node": "render-html", + "data": { + "additional_data": { + "schema_file": "schema.json", + "template_file": "templates/basic.html" + } + } + }, + { + "id": "output", + "node": "output", + "data": { + "mapping": { + "login_message": "eval.{{'Email sent successfully!'}}" + }, + "additional_data": { + "except_fields": [ "html_content" ] + } + } + } + ], + "edges": [ + { + "source": "Login.output", + "label": "on_success", + "target": [ "ContactForm" ] + }, + { + "source": "ContactForm", + "label": "on_email_sent", + "target": [ "output" ] + } + ] +} diff --git a/services/examples/json-service/config/policies/schemas/login.json b/services/examples/json-service/config/policies/schemas/login.json new file mode 100644 index 0000000..032714a --- /dev/null +++ b/services/examples/json-service/config/policies/schemas/login.json @@ -0,0 +1,63 @@ +{ + "type": "object", + "properties": { + "username": { + "type": "string", + "title": "Username or Email", + "order": 1, + "ui": { + "element": "input", + "type": "text", + "class": "form-group", + "name": "username", + "placeholder": "Enter your username or email" + } + }, + "password": { + "type": "string", + "title": "Password", + "order": 2, + "ui": { + "element": "input", + "type": "password", + "class": "form-group", + "name": "password", + "placeholder": "Enter your password" + } + }, + "remember_me": { + "type": "boolean", + "title": "Remember Me", + "order": 3, + "ui": { + "element": "input", + "type": "checkbox", + "class": "form-check", + "name": "remember_me" + } + } + }, + "required": [ "username", "password" ], + "form": { + "class": "form-horizontal", + "action": "{{current_uri}}?task_id={{task_id}}&next=true", + "method": "POST", + "enctype": "application/x-www-form-urlencoded", + "groups": [ + { + "title": "Login Credentials", + "fields": [ "username", "password", "remember_me" ] + } + ], + "submit": { + "type": "submit", + "label": "Log In", + "class": "btn btn-primary" + }, + "reset": { + "type": "reset", + "label": "Clear", + "class": "btn btn-secondary" + } + } +} diff --git a/services/examples/json-service/config/policies/schemas/schema.json b/services/examples/json-service/config/policies/schemas/schema.json new file mode 100644 index 0000000..14d57f1 --- /dev/null +++ b/services/examples/json-service/config/policies/schemas/schema.json @@ -0,0 +1,105 @@ +{ + "type": "object", + "properties": { + "first_name": { + "type": "string", + "title": "First Name", + "order": 1, + "ui": { + "element": "input", + "class": "form-group", + "name": "first_name" + } + }, + "last_name": { + "type": "string", + "title": "Last Name", + "order": 2, + "ui": { + "element": "input", + "class": "form-group", + "name": "last_name" + } + }, + "email": { + "type": "email", + "title": "Email Address", + "order": 3, + "ui": { + "element": "input", + "type": "email", + "class": "form-group", + "name": "email" + } + }, + "user_type": { + "type": "string", + "title": "User Type", + "order": 4, + "ui": { + "element": "select", + "class": "form-group", + "name": "user_type", + "options": [ "new", "premium", "standard" ] + } + }, + "priority": { + "type": "string", + "title": "Priority Level", + "order": 5, + "ui": { + "element": "select", + "class": "form-group", + "name": "priority", + "options": [ "low", "medium", "high", "urgent" ] + } + }, + "subject": { + "type": "string", + "title": "Subject", + "order": 6, + "ui": { + "element": "input", + "class": "form-group", + "name": "subject" + } + }, + "message": { + "type": "textarea", + "title": "Message", + "order": 7, + "ui": { + "element": "textarea", + "class": "form-group", + "name": "message" + } + } + }, + "required": [ "first_name", "last_name", "email", "user_type", "priority", "subject", "message" ], + "form": { + "class": "form-horizontal", + "action": "{{current_uri}}?task_id={{task_id}}&next=true", + "method": "POST", + "enctype": "application/x-www-form-urlencoded", + "groups": [ + { + "title": "User Information", + "fields": [ "first_name", "last_name", "email" ] + }, + { + "title": "Ticket Details", + "fields": [ "user_type", "priority", "subject", "message" ] + } + ], + "submit": { + "type": "submit", + "label": "Submit", + "class": "btn btn-primary" + }, + "reset": { + "type": "reset", + "label": "Reset", + "class": "btn btn-secondary" + } + } +} diff --git a/services/examples/json-service/config/policies/schemas/test-route.json b/services/examples/json-service/config/policies/schemas/test-route.json new file mode 100644 index 0000000..076ba1c --- /dev/null +++ b/services/examples/json-service/config/policies/schemas/test-route.json @@ -0,0 +1,18 @@ +{ + "type": "object", + "description": "users", + "required": [ "user_id" ], + "properties": { + "last_name": { + "type": "string", + "default": "now()" + }, + "user_id": { + "type": [ + "integer", + "string" + ], + "maxLength": 64 + } + } +} \ No newline at end of file diff --git a/services/examples/json-service/config/policies/web.json b/services/examples/json-service/config/policies/web.json new file mode 100644 index 0000000..0c83df7 --- /dev/null +++ b/services/examples/json-service/config/policies/web.json @@ -0,0 +1,16 @@ +{ + "prefix": "/", + "middlewares": [ + {"name": "cors"} + ], + "static": { + "dir": "./public", + "prefix": "/", + "options": { + "byte_range": true, + "browse": true, + "compress": true, + "index_file": "index.html" + } + } +} diff --git a/services/examples/json-service/main.go b/services/examples/json-service/main.go new file mode 100644 index 0000000..5050865 --- /dev/null +++ b/services/examples/json-service/main.go @@ -0,0 +1,36 @@ +package main + +import ( + "github.com/gofiber/fiber/v2" + "github.com/oarkflow/cli" + "github.com/oarkflow/cli/console" + "github.com/oarkflow/cli/contracts" + + "github.com/oarkflow/mq" + "github.com/oarkflow/mq/dag" + "github.com/oarkflow/mq/handlers" + "github.com/oarkflow/mq/services" + dagConsole "github.com/oarkflow/mq/services/console" +) + +func main() { + handlers.Init() + brokerAddr := ":5051" + loader := services.NewLoader("config") + loader.Load() + serverApp := fiber.New(fiber.Config{EnablePrintRoutes: true}) + services.Setup(loader, serverApp, brokerAddr) + cli.Run("mq", "v0.0.1", func(client contracts.Cli) []contracts.Command { + return []contracts.Command{ + console.NewListCommand(client), + dagConsole.NewRunHandler(loader.UserConfig, loader.ParsedPath, brokerAddr), + dagConsole.NewRunServer(serverApp), + } + }) +} + +func init() { + dag.AddHandler("render-html", func(id string) mq.Processor { return handlers.NewRenderHTMLNode(id) }) + dag.AddHandler("condition", func(id string) mq.Processor { return handlers.NewCondition(id) }) + dag.AddHandler("output", func(id string) mq.Processor { return handlers.NewOutputHandler(id) }) +} diff --git a/services/examples/json-service/templates/basic.html b/services/examples/json-service/templates/basic.html new file mode 100644 index 0000000..f4ed557 --- /dev/null +++ b/services/examples/json-service/templates/basic.html @@ -0,0 +1,42 @@ + + + + + Basic Template + + + + + + +
+
+ {{form_groups}} +
+ {{form_buttons}} +
+
+
+ + + diff --git a/services/examples/json-service/templates/error.html b/services/examples/json-service/templates/error.html new file mode 100644 index 0000000..9935457 --- /dev/null +++ b/services/examples/json-service/templates/error.html @@ -0,0 +1,134 @@ + + + + + Email Error + + + + +
+
+

Email Processing Error

+ +
+ {{error_message}} +
+ + {{if error_field}} +
+ 🎯 Error Field: {{error_field}}
+ ⚡ Action Required: Please correct the highlighted field and try again.
+ 💡 Tip: Make sure all required fields are properly filled out. +
+ {{end}} + + {{if retry_suggested}} +
+ ⚠️ Temporary Issue: This appears to be a temporary system issue. + Please try sending your message again in a few moments.
+ 🔄 Auto-Retry: Our system will automatically retry failed deliveries. +
+ {{end}} + +
+ 🔄 Try Again + 📊 Check Status +
+ +
+ 🔄 DAG Error Handler | Email Notification Workflow Failed
+ Our advanced routing system ensures reliable message delivery. +
+
+ + + diff --git a/services/go.mod b/services/go.mod index 8f28fca..4fa2b47 100644 --- a/services/go.mod +++ b/services/go.mod @@ -28,6 +28,7 @@ require ( filippo.io/edwards25519 v1.1.0 // indirect github.com/andybalholm/brotli v1.2.0 // indirect github.com/bytedance/gopkg v0.1.3 // indirect + github.com/cpuguy83/go-md2man/v2 v2.0.6 // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/go-sql-driver/mysql v1.9.3 // indirect github.com/goccy/go-json v0.10.5 // indirect @@ -36,6 +37,7 @@ require ( github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect github.com/golang-sql/sqlexp v0.1.0 // indirect github.com/google/uuid v1.6.0 // indirect + github.com/gookit/color v1.5.4 // indirect github.com/gorilla/websocket v1.5.3 // indirect github.com/hetiansu5/urlquery v1.2.7 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect @@ -63,11 +65,15 @@ require ( github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/rivo/uniseg v0.4.7 // indirect github.com/rogpeppe/go-internal v1.14.1 // indirect + github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/tinylib/msgp v1.4.0 // indirect github.com/toorop/go-dkim v0.0.0-20250226130143-9025cce95817 // indirect + github.com/urfave/cli/v2 v2.27.5 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasthttp v1.66.0 // indirect github.com/xhit/go-simple-mail/v2 v2.16.0 // indirect + github.com/xo/terminfo v0.0.0-20210125001918-ca9a967f8778 // indirect + github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect golang.org/x/crypto v0.42.0 // indirect golang.org/x/exp v0.0.0-20250911091902-df9299821621 // indirect golang.org/x/sync v0.17.0 // indirect diff --git a/services/go.sum b/services/go.sum index 2df8b43..76ea57a 100644 --- a/services/go.sum +++ b/services/go.sum @@ -18,6 +18,8 @@ github.com/andybalholm/brotli v1.2.0 h1:ukwgCxwYrmACq68yiUqwIWnGY0cTPox/M94sVwTo github.com/andybalholm/brotli v1.2.0/go.mod h1:rzTDkvFWvIrjDXZHkuS16NPggd91W3kUSvPlQ1pLaKY= github.com/bytedance/gopkg v0.1.3 h1:TPBSwH8RsouGCBcMBktLt1AymVo2TVsBVCY4b6TnZ/M= github.com/bytedance/gopkg v0.1.3/go.mod h1:576VvJ+eJgyCzdjS+c4+77QF3p7ubbtiKARP3TxducM= +github.com/cpuguy83/go-md2man/v2 v2.0.6 h1:XJtiaUW6dEEqVuZiMTn1ldk455QWwEIsMIJlo5vtkx0= +github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= @@ -46,6 +48,8 @@ github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17k github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gookit/color v1.5.4 h1:FZmqs7XOyGgCAxmWyPslpiok1k05wmY3SJTytgvYFs0= +github.com/gookit/color v1.5.4/go.mod h1:pZJOeOS8DM43rXbp4AZo1n9zCU2qjpcRko0b6/QJi9w= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/hetiansu5/urlquery v1.2.7 h1:jn0h+9pIRqUziSPnRdK/gJK8S5TCnk+HZZx5fRHf8K0= @@ -135,6 +139,8 @@ github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= +github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= +github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= @@ -145,12 +151,18 @@ github.com/tinylib/msgp v1.4.0/go.mod h1:cvjFkb4RiC8qSBOPMGPSzSAx47nAsfhLVTCZZNu github.com/toorop/go-dkim v0.0.0-20201103131630-e1cd1a0a5208/go.mod h1:BzWtXXrXzZUvMacR0oF/fbDDgUPO8L36tDMmRAf14ns= github.com/toorop/go-dkim v0.0.0-20250226130143-9025cce95817 h1:q0hKh5a5FRkhuTb5JNfgjzpzvYLHjH0QOgPZPYnRWGA= github.com/toorop/go-dkim v0.0.0-20250226130143-9025cce95817/go.mod h1:BzWtXXrXzZUvMacR0oF/fbDDgUPO8L36tDMmRAf14ns= +github.com/urfave/cli/v2 v2.27.5 h1:WoHEJLdsXr6dDWoJgMq/CboDmyY/8HMMH1fTECbih+w= +github.com/urfave/cli/v2 v2.27.5/go.mod h1:3Sevf16NykTbInEnD0yKkjDAeZDS0A6bzhBH5hrMvTQ= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasthttp v1.66.0 h1:M87A0Z7EayeyNaV6pfO3tUTUiYO0dZfEJnRGXTVNuyU= github.com/valyala/fasthttp v1.66.0/go.mod h1:Y4eC+zwoocmXSVCB1JmhNbYtS7tZPRI2ztPB72EVObs= github.com/xhit/go-simple-mail/v2 v2.16.0 h1:ouGy/Ww4kuaqu2E2UrDw7SvLaziWTB60ICLkIkNVccA= github.com/xhit/go-simple-mail/v2 v2.16.0/go.mod h1:b7P5ygho6SYE+VIqpxA6QkYfv4teeyG4MKqB3utRu98= +github.com/xo/terminfo v0.0.0-20210125001918-ca9a967f8778 h1:QldyIu/L63oPpyvQmHgvgickp1Yw510KJOqX7H24mg8= +github.com/xo/terminfo v0.0.0-20210125001918-ca9a967f8778/go.mod h1:2MuV+tbUrU1zIOPMxZ5EncGwgmMJsa+9ucAQZXxsObs= +github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 h1:gEOO8jv9F4OT7lGCjxCBTO/36wtF6j2nSip77qHd4x4= +github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1/go.mod h1:Ohn+xnUBiLI6FVj/9LpzZWtj1/D6lUovWYBkxHVV3aM= github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU= github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E= golang.org/x/crypto v0.42.0 h1:chiH31gIWm57EkTXpwnqf8qeuMUi0yekh6mT2AvFlqI= diff --git a/services/setup.go b/services/setup.go index 4e5ab52..a568994 100644 --- a/services/setup.go +++ b/services/setup.go @@ -31,6 +31,11 @@ import ( var ValidationInstance Validation +// Enhanced service instances for workflow engine integration +var EnhancedValidationInstance EnhancedValidation +var EnhancedDAGServiceInstance EnhancedDAGService +var EnhancedServiceManagerInstance EnhancedServiceManager + func Setup(loader *Loader, serverApp *fiber.App, brokerAddr string) error { if loader.UserConfig == nil || serverApp == nil { return nil @@ -38,6 +43,139 @@ func Setup(loader *Loader, serverApp *fiber.App, brokerAddr string) error { return SetupServices(loader.Prefix(), serverApp, brokerAddr) } +// Enhanced setup function that supports both traditional and enhanced DAG systems +func SetupEnhanced(loader *Loader, serverApp *fiber.App, brokerAddr string, config *EnhancedServiceConfig) error { + if loader.UserConfig == nil || serverApp == nil { + return nil + } + + // Initialize enhanced services + if config != nil { + if err := InitializeEnhancedServices(config); err != nil { + return fmt.Errorf("failed to initialize enhanced services: %w", err) + } + } + + // Setup both traditional and enhanced services + return SetupEnhancedServices(loader.Prefix(), serverApp, brokerAddr) +} + +// InitializeEnhancedServices initializes the enhanced service instances +func InitializeEnhancedServices(config *EnhancedServiceConfig) error { + // Initialize enhanced service manager + EnhancedServiceManagerInstance = NewEnhancedServiceManager(config) + if err := EnhancedServiceManagerInstance.Initialize(config); err != nil { + return fmt.Errorf("failed to initialize enhanced service manager: %w", err) + } + + // Initialize enhanced DAG service + EnhancedDAGServiceInstance = NewEnhancedDAGService(config) + + // Initialize enhanced validation if config is provided + if config.ValidationConfig != nil { + validation, err := NewEnhancedValidation(config.ValidationConfig) + if err != nil { + return fmt.Errorf("failed to initialize enhanced validation: %w", err) + } + EnhancedValidationInstance = validation + } + + return nil +} + +// SetupEnhancedServices sets up both traditional and enhanced services with workflow engine support +func SetupEnhancedServices(prefix string, router fiber.Router, brokerAddr string) error { + if router == nil { + return nil + } + + // Setup traditional handlers + err := SetupHandlers(userConfig.Policy.Handlers, brokerAddr) + if err != nil { + return err + } + + // Setup enhanced handlers if available + if len(userConfig.Policy.EnhancedHandlers) > 0 { + err = SetupEnhancedHandlers(userConfig.Policy.EnhancedHandlers, brokerAddr) + if err != nil { + return fmt.Errorf("failed to setup enhanced handlers: %w", err) + } + } + + // Setup background handlers (both traditional and enhanced) + setupBackgroundHandlers(brokerAddr) + setupEnhancedBackgroundHandlers(brokerAddr) + + // Setup static files and rendering + static := userConfig.Policy.Web.Static + if static != nil && static.Dir != "" { + router.Static( + static.Prefix, + static.Dir, + fiber.Static{ + Compress: static.Options.Compress, + ByteRange: static.Options.ByteRange, + Browse: static.Options.Browse, + Index: static.Options.IndexFile, + }, + ) + } + + err = setupRender(prefix, router) + if err != nil { + return fmt.Errorf("failed to setup render: %w", err) + } + + // Setup API routes (both traditional and enhanced) + return SetupEnhancedAPI(prefix, router, brokerAddr) +} + +// SetupEnhancedHandler creates and configures an enhanced handler with workflow engine support +func SetupEnhancedHandler(handler EnhancedHandler, brokerAddr string, async ...bool) (*dag.DAG, error) { + // For now, convert enhanced handler to traditional handler and use existing SetupHandler + traditionalHandler := Handler{ + Name: handler.Name, + Key: handler.Key, + DisableLog: handler.DisableLog, + Debug: handler.Debug, + } + + // Convert enhanced nodes to traditional nodes + for _, enhancedNode := range handler.Nodes { + traditionalNode := Node{ + Name: enhancedNode.Name, + ID: enhancedNode.ID, + NodeKey: enhancedNode.NodeKey, + Node: enhancedNode.Node, + FirstNode: enhancedNode.FirstNode, + Debug: false, // Default to false + } + traditionalHandler.Nodes = append(traditionalHandler.Nodes, traditionalNode) + } + + // Copy edges and convert loops to proper type + traditionalHandler.Edges = handler.Edges + + // Convert enhanced loops (Edge type) to traditional loops (Loop type) + for _, enhancedLoop := range handler.Loops { + traditionalLoop := Loop{ + Label: enhancedLoop.Label, + Source: enhancedLoop.Source, + Target: enhancedLoop.Target, + } + traditionalHandler.Loops = append(traditionalHandler.Loops, traditionalLoop) + } + + // Use existing SetupHandler function + dagInstance := SetupHandler(traditionalHandler, brokerAddr, async...) + if dagInstance.Error != nil { + return nil, dagInstance.Error + } + + return dagInstance, nil +} + func SetupHandler(handler Handler, brokerAddr string, async ...bool) *dag.DAG { syncMode := true if len(async) > 0 { @@ -716,3 +854,110 @@ func TopologicalSort(handlers map[string]*HandlerInfo) ([]string, error) { } return result, nil } + +// Enhanced setup functions for workflow engine integration + +// SetupEnhancedHandlers sets up enhanced handlers with workflow engine support +func SetupEnhancedHandlers(availableHandlers []EnhancedHandler, brokerAddr string) error { + for _, handler := range availableHandlers { + fmt.Printf("Setting up enhanced handler: %s (key: %s)\n", handler.Name, handler.Key) + _, err := SetupEnhancedHandler(handler, brokerAddr) + if err != nil { + return fmt.Errorf("failed to setup enhanced handler %s: %w", handler.Key, err) + } + } + return nil +} + +// setupEnhancedBackgroundHandlers sets up enhanced background handlers +func setupEnhancedBackgroundHandlers(brokerAddress string) { + for _, handler := range userConfig.Policy.EnhancedHandlers { + if handler.WorkflowEnabled { + dagInstance, err := SetupEnhancedHandler(handler, brokerAddress) + if err != nil { + log.Error().Err(err).Msgf("Failed to setup enhanced background handler: %s", handler.Key) + continue + } + + // Start background processing using traditional DAG + go func(dag *dag.DAG, key string) { + ctx := context.Background() + if err := dag.Consume(ctx); err != nil { + log.Error().Err(err).Msgf("Failed to start consumer for enhanced handler: %s", key) + } + }(dagInstance, handler.Key) + } + } +} + +// SetupEnhancedAPI sets up API routes for both traditional and enhanced handlers +func SetupEnhancedAPI(prefix string, router fiber.Router, brokerAddr string) error { + if prefix != "" { + prefix = "/" + prefix + } + api := router.Group(prefix) + + // Setup traditional API routes + for _, configRoute := range userConfig.Policy.Web.Apis { + routeGroup := api.Group(configRoute.Prefix) + mws := setupMiddlewares(configRoute.Middlewares...) + if len(mws) > 0 { + routeGroup.Use(mws...) + } + for _, route := range configRoute.Routes { + switch route.Operation { + case "custom": + flow := setupFlow(route, routeGroup, brokerAddr) + path := CleanAndMergePaths(route.Uri) + switch route.Method { + case "GET": + routeGroup.Get(path, requestMiddleware(route.Model, route), ruleMiddleware(route.Rules), customRuleMiddleware(route, route.CustomRules), customHandler(flow)) + case "POST": + routeGroup.Post(path, requestMiddleware(route.Model, route), ruleMiddleware(route.Rules), customRuleMiddleware(route, route.CustomRules), customHandler(flow)) + case "PUT": + routeGroup.Put(path, requestMiddleware(route.Model, route), ruleMiddleware(route.Rules), customRuleMiddleware(route, route.CustomRules), customHandler(flow)) + case "DELETE": + routeGroup.Delete(path, requestMiddleware(route.Model, route), ruleMiddleware(route.Rules), customRuleMiddleware(route, route.CustomRules), customHandler(flow)) + case "PATCH": + routeGroup.Patch(path, requestMiddleware(route.Model, route), ruleMiddleware(route.Rules), customRuleMiddleware(route, route.CustomRules), customHandler(flow)) + } + case "dag": + flow := setupFlow(route, routeGroup, brokerAddr) + path := CleanAndMergePaths(route.Uri) + routeGroup.Get(path, func(ctx *fiber.Ctx) error { + return getDAGPage(ctx, flow) + }) + } + } + } + + // Setup enhanced API routes for enhanced handlers + for _, handler := range userConfig.Policy.EnhancedHandlers { + if handler.WorkflowEnabled { + dagInstance, err := SetupEnhancedHandler(handler, brokerAddr) + if err != nil { + return fmt.Errorf("failed to setup enhanced handler for API: %w", err) + } + + // Create API endpoint for enhanced handler (using traditional DAG handler) + path := fmt.Sprintf("/enhanced/%s", handler.Key) + api.Post(path, customHandler(dagInstance)) + + // Create DAG visualization endpoint (using traditional DAG visualization) + api.Get(path+"/dag", func(ctx *fiber.Ctx) error { + return getDAGPage(ctx, dagInstance) + }) + } + } + + return nil +} + +// Helper functions for enhanced features (simplified implementation) + +// addEnhancedNode is a placeholder for future enhanced node functionality +func addEnhancedNode(enhancedDAG interface{}, node EnhancedNode) error { + // For now, this is a placeholder implementation + // In the future, this would add enhanced nodes with workflow capabilities + return nil +} diff --git a/services/user_config.go b/services/user_config.go index 7808e0e..30a1d94 100644 --- a/services/user_config.go +++ b/services/user_config.go @@ -322,6 +322,58 @@ type Policy struct { ApplicationRules []*filters.ApplicationRule `json:"application_rules" yaml:"application_rules"` Handlers []Handler `json:"handlers" yaml:"handlers"` Flows []Flow `json:"flows" yaml:"flows"` + // Enhanced configuration support + EnhancedHandlers []EnhancedHandler `json:"enhanced_handlers" yaml:"enhanced_handlers"` + EnhancedWorkflows []WorkflowDefinition `json:"enhanced_workflows" yaml:"enhanced_workflows"` + ValidationRules []ValidationServiceConfig `json:"validation_rules" yaml:"validation_rules"` +} + +// Enhanced workflow configuration structures +type WorkflowConfig struct { + Engine string `json:"engine" yaml:"engine"` + Version string `json:"version" yaml:"version"` + Timeout string `json:"timeout" yaml:"timeout"` + RetryPolicy *RetryPolicy `json:"retry_policy,omitempty" yaml:"retry_policy,omitempty"` + Metadata map[string]string `json:"metadata,omitempty" yaml:"metadata,omitempty"` +} + +type RetryPolicy struct { + MaxAttempts int `json:"max_attempts" yaml:"max_attempts"` + BackoffType string `json:"backoff_type" yaml:"backoff_type"` + InitialDelay string `json:"initial_delay" yaml:"initial_delay"` +} + +type WorkflowDefinition struct { + ID string `json:"id" yaml:"id"` + Name string `json:"name" yaml:"name"` + Description string `json:"description" yaml:"description"` + Version string `json:"version" yaml:"version"` + Steps []WorkflowStep `json:"steps" yaml:"steps"` + Metadata map[string]string `json:"metadata,omitempty" yaml:"metadata,omitempty"` +} + +type WorkflowStep struct { + ID string `json:"id" yaml:"id"` + Name string `json:"name" yaml:"name"` + Type string `json:"type" yaml:"type"` + Handler string `json:"handler" yaml:"handler"` + Input map[string]any `json:"input,omitempty" yaml:"input,omitempty"` + Condition string `json:"condition,omitempty" yaml:"condition,omitempty"` + Metadata map[string]string `json:"metadata,omitempty" yaml:"metadata,omitempty"` +} + +type ValidationRule struct { + Field string `json:"field" yaml:"field"` + Type string `json:"type" yaml:"type"` + Required bool `json:"required" yaml:"required"` + Message string `json:"message" yaml:"message"` + Options map[string]any `json:"options,omitempty" yaml:"options,omitempty"` +} + +type ValidationProcessor struct { + Name string `json:"name" yaml:"name"` + Type string `json:"type" yaml:"type"` + Config map[string]any `json:"config,omitempty" yaml:"config,omitempty"` } type UserConfig struct { @@ -445,3 +497,83 @@ func (c *UserConfig) GetFlow(key string) *Flow { } return nil } + +// Enhanced methods for workflow engine integration + +// GetEnhancedHandler retrieves an enhanced handler by key +func (c *UserConfig) GetEnhancedHandler(handlerName string) *EnhancedHandler { + for _, handler := range c.Policy.EnhancedHandlers { + if handler.Key == handlerName { + return &handler + } + } + return nil +} + +// GetEnhancedHandlerList returns list of all enhanced handler keys +func (c *UserConfig) GetEnhancedHandlerList() (handlers []string) { + for _, handler := range c.Policy.EnhancedHandlers { + handlers = append(handlers, handler.Key) + } + return +} + +// GetWorkflowDefinition retrieves a workflow definition by ID +func (c *UserConfig) GetWorkflowDefinition(workflowID string) *WorkflowDefinition { + for _, workflow := range c.Policy.EnhancedWorkflows { + if workflow.ID == workflowID { + return &workflow + } + } + return nil +} + +// GetValidationConfig retrieves validation configuration by name +func (c *UserConfig) GetValidationConfig(name string) *ValidationServiceConfig { + for _, config := range c.Policy.ValidationRules { + // Since ValidationServiceConfig doesn't have a name field in enhanced_contracts, + // we'll use the index or a different approach + if len(c.Policy.ValidationRules) > 0 { + return &config + } + } + return nil +} + +// IsEnhancedHandler checks if a handler is configured as enhanced +func (c *UserConfig) IsEnhancedHandler(handlerName string) bool { + handler := c.GetEnhancedHandler(handlerName) + return handler != nil && handler.WorkflowEnabled +} + +// GetAllHandlers returns both traditional and enhanced handlers +func (c *UserConfig) GetAllHandlers() map[string]interface{} { + handlers := make(map[string]interface{}) + + // Add traditional handlers + for _, handler := range c.Policy.Handlers { + handlers[handler.Key] = handler + } + + // Add enhanced handlers + for _, handler := range c.Policy.EnhancedHandlers { + handlers[handler.Key] = handler + } + + return handlers +} + +// GetHandlerByKey returns either traditional or enhanced handler by key +func (c *UserConfig) GetHandlerByKey(key string) interface{} { + // Check traditional handlers first + if handler := c.GetHandler(key); handler != nil { + return *handler + } + + // Check enhanced handlers + if handler := c.GetEnhancedHandler(key); handler != nil { + return *handler + } + + return nil +}