From b7ca2a8aeb435fefcb88a9f01fc520a47ce9db16 Mon Sep 17 00:00:00 2001 From: Oarkflow Date: Fri, 19 Sep 2025 16:24:45 +0545 Subject: [PATCH] update --- examples/complex_dag_pages/main.go | 700 ++++++++++++++++++++++++--- examples/form.go | 730 +++++++++++++++++++++++++---- 2 files changed, 1262 insertions(+), 168 deletions(-) diff --git a/examples/complex_dag_pages/main.go b/examples/complex_dag_pages/main.go index 0caa4b6..1ce8adc 100644 --- a/examples/complex_dag_pages/main.go +++ b/examples/complex_dag_pages/main.go @@ -5,10 +5,13 @@ import ( "fmt" "regexp" "strings" + "time" "github.com/oarkflow/json" + "github.com/oarkflow/jet" "github.com/oarkflow/mq" + "github.com/oarkflow/mq/consts" "github.com/oarkflow/mq/dag" ) @@ -28,75 +31,52 @@ func loginSubDAG() *dag.DAG { return login } -// phoneProcessingSubDAG creates a sub-DAG for processing phone numbers -func phoneProcessingSubDAG() *dag.DAG { - phone := dag.NewDAG("Phone Processing Sub DAG", "phone-processing-sub-dag", func(taskID string, result mq.Result) { - fmt.Printf("Phone Processing Sub DAG Final result for task %s: %s\n", taskID, string(result.Payload)) - }, mq.WithSyncMode(true)) - - phone. - AddNode(dag.Function, "Parse Phone Numbers", "parse-phones", &ParsePhoneNumbers{}). - AddNode(dag.Function, "Phone Loop", "phone-loop", &PhoneLoop{}). - AddNode(dag.Function, "Validate Phone", "validate-phone", &ValidatePhone{}). - AddNode(dag.Function, "Send Welcome SMS", "send-welcome", &SendWelcomeSMS{}). - AddNode(dag.Function, "Collect Valid Phones", "collect-valid", &CollectValidPhones{}). - AddNode(dag.Function, "Collect Invalid Phones", "collect-invalid", &CollectInvalidPhones{}). - AddEdge(dag.Simple, "Parse to Loop", "parse-phones", "phone-loop"). - AddEdge(dag.Iterator, "Loop over phones", "phone-loop", "validate-phone"). - AddCondition("validate-phone", map[string]string{"valid": "send-welcome", "invalid": "collect-invalid"}). - AddEdge(dag.Simple, "Welcome to Collect", "send-welcome", "collect-valid"). - AddEdge(dag.Simple, "Invalid to Collect", "collect-invalid", "collect-valid"). - AddEdge(dag.Simple, "Loop to Collect", "phone-loop", "collect-valid") - - return phone -} - func main() { flow := dag.NewDAG("Complex Phone Processing DAG with Pages", "complex-phone-dag", func(taskID string, result mq.Result) { fmt.Printf("Complex DAG Final result for task %s: %s\n", taskID, string(result.Payload)) }) flow.ConfigureMemoryStorage() - // Main nodes - flow.AddNode(dag.Function, "Initialize", "init", &Initialize{}, true) - flow.AddDAGNode(dag.Function, "Login Process", "login", loginSubDAG()) - flow.AddNode(dag.Function, "Upload Phone Data", "upload-page", &UploadPhoneDataPage{}) - flow.AddDAGNode(dag.Function, "Process Phones", "process-phones", phoneProcessingSubDAG()) + // Main nodes - Login process as individual nodes (not sub-DAG) for proper page serving + flow.AddNode(dag.Page, "Initialize", "init", &Initialize{}, true) + flow.AddNode(dag.Page, "Login Page", "login-page", &LoginPage{}) + flow.AddNode(dag.Function, "Verify Credentials", "verify-credentials", &VerifyCredentials{}) + flow.AddNode(dag.Function, "Generate Token", "generate-token", &GenerateToken{}) + flow.AddNode(dag.Page, "Upload Phone Data", "upload-page", &UploadPhoneDataPage{}) + flow.AddNode(dag.Function, "Parse Phone Numbers", "parse-phones", &ParsePhoneNumbers{}) + flow.AddNode(dag.Function, "Phone Loop", "phone-loop", &PhoneLoop{}) + flow.AddNode(dag.Function, "Validate Phone", "validate-phone", &ValidatePhone{}) + flow.AddNode(dag.Function, "Send Welcome SMS", "send-welcome", &SendWelcomeSMS{}) + flow.AddNode(dag.Function, "Collect Valid Phones", "collect-valid", &CollectValidPhones{}) + flow.AddNode(dag.Function, "Collect Invalid Phones", "collect-invalid", &CollectInvalidPhones{}) flow.AddNode(dag.Function, "Generate Report", "generate-report", &GenerateReport{}) flow.AddNode(dag.Function, "Send Summary Email", "send-summary", &SendSummaryEmail{}) flow.AddNode(dag.Function, "Final Cleanup", "cleanup", &FinalCleanup{}) - // Edges - flow.AddEdge(dag.Simple, "Init to Login", "init", "login") - flow.AddEdge(dag.Simple, "Login to Upload", "login", "upload-page") - flow.AddEdge(dag.Simple, "Upload to Process", "upload-page", "process-phones") - flow.AddEdge(dag.Simple, "Process to Report", "process-phones", "generate-report") + // Edges - Connect login flow individually + flow.AddEdge(dag.Simple, "Init to Login", "init", "login-page") + flow.AddEdge(dag.Simple, "Login to Verify", "login-page", "verify-credentials") + flow.AddEdge(dag.Simple, "Verify to Token", "verify-credentials", "generate-token") + flow.AddEdge(dag.Simple, "Token to Upload", "generate-token", "upload-page") + flow.AddEdge(dag.Simple, "Upload to Parse", "upload-page", "parse-phones") + flow.AddEdge(dag.Simple, "Parse to Loop", "parse-phones", "phone-loop") + flow.AddEdge(dag.Iterator, "Loop over phones", "phone-loop", "validate-phone") + flow.AddCondition("validate-phone", map[string]string{"valid": "send-welcome", "invalid": "collect-invalid"}) + flow.AddEdge(dag.Simple, "Welcome to Collect", "send-welcome", "collect-valid") + flow.AddEdge(dag.Simple, "Invalid to Collect", "collect-invalid", "collect-valid") + flow.AddEdge(dag.Simple, "Loop to Report", "phone-loop", "generate-report") flow.AddEdge(dag.Simple, "Report to Summary", "generate-report", "send-summary") flow.AddEdge(dag.Simple, "Summary to Cleanup", "send-summary", "cleanup") - // Sample data for testing - data := map[string]interface{}{ - "user_id": "user123", - "session_data": map[string]interface{}{ - "authenticated": false, - }, - "phone_data": map[string]interface{}{ - "format": "csv", - "content": "name,phone\nJohn Doe,+1234567890\nJane Smith,+1987654321\nBob Johnson,invalid-phone\nAlice Brown,+1555123456", - }, - } + // Check for DAG errors + // if flow.Error != nil { + // fmt.Printf("DAG Error: %v\n", flow.Error) + // panic(flow.Error) + // } - jsonData, _ := json.Marshal(data) - if flow.Error != nil { - panic(flow.Error) - } - - rs := flow.Process(context.Background(), jsonData) - if rs.Error != nil { - panic(rs.Error) - } - fmt.Println("Complex Phone DAG Status:", rs.Status, "Topic:", rs.Topic) - fmt.Println("Final Payload:", string(rs.Payload)) + fmt.Println("Starting Complex Phone Processing DAG server on http://0.0.0.0:8080") + fmt.Println("Navigate to the URL to access the login page") + flow.Start(context.Background(), ":8080") } // Task implementations @@ -108,12 +88,151 @@ type Initialize struct { func (p *Initialize) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { var data map[string]interface{} if err := json.Unmarshal(task.Payload, &data); err != nil { - return mq.Result{Error: fmt.Errorf("Initialize Error: %s", err.Error()), Ctx: ctx} + data = make(map[string]interface{}) } data["initialized"] = true data["timestamp"] = "2025-09-19T12:00:00Z" - updatedPayload, _ := json.Marshal(data) - return mq.Result{Payload: updatedPayload, Ctx: ctx} + + // Add sample phone data for testing + sampleCSV := `name,phone +John Doe,+1234567890 +Jane Smith,0987654321 +Bob Johnson,1555123456 +Alice Brown,invalid-phone +Charlie Wilson,+441234567890` + + data["phone_data"] = map[string]interface{}{ + "content": sampleCSV, + "format": "csv", + "source": "sample_data", + "created_at": "2025-09-19T12:00:00Z", + } + + // Generate a task ID for this workflow instance + taskID := "workflow-" + fmt.Sprintf("%d", time.Now().Unix()) + + // Since this is a page node, show a welcome page that auto-redirects to login + htmlContent := ` + + + + + + Phone Processing System + + + +
+

📱 Phone Processing System

+

Welcome to our advanced phone number processing workflow

+ +
+

Features:

+
    +
  • CSV/JSON file upload support
  • +
  • Phone number validation and formatting
  • +
  • Automated welcome SMS sending
  • +
  • Invalid number filtering
  • +
  • Comprehensive processing reports
  • +
+
+ +
+

Initializing workflow...

+

Task ID: ` + taskID + `

+

Redirecting to login page in 3 seconds...

+
+
+ + + +` + + parser := jet.NewWithMemory(jet.WithDelims("{{", "}}")) + rs, err := parser.ParseTemplate(htmlContent, map[string]any{}) + if err != nil { + return mq.Result{Error: err, Ctx: ctx} + } + + ctx = context.WithValue(ctx, consts.ContentType, consts.TypeHtml) + resultData := map[string]any{ + "html_content": rs, + "step": "initialize", + "data": data, + } + + resultPayload, _ := json.Marshal(resultData) + return mq.Result{Payload: resultPayload, Ctx: ctx} } type LoginPage struct { @@ -121,21 +240,177 @@ type LoginPage struct { } func (p *LoginPage) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + // Check if this is a form submission + var inputData map[string]interface{} + if len(task.Payload) > 0 { + if err := json.Unmarshal(task.Payload, &inputData); err == nil { + // Check if we have form data (username/password) + if formData, ok := inputData["form"].(map[string]interface{}); ok { + // This is a form submission, pass it through for verification + credentials := map[string]interface{}{ + "username": formData["username"], + "password": formData["password"], + } + inputData["credentials"] = credentials + updatedPayload, _ := json.Marshal(inputData) + return mq.Result{Payload: updatedPayload, Ctx: ctx} + } + } + } + + // Otherwise, show the form var data map[string]interface{} if err := json.Unmarshal(task.Payload, &data); err != nil { - return mq.Result{Error: fmt.Errorf("LoginPage Error: %s", err.Error()), Ctx: ctx} + data = make(map[string]interface{}) } - // Simulate user input from page - data["credentials"] = map[string]interface{}{ - "username": "admin", - "password": "password123", - } - data["login_attempted"] = true + // HTML content for login page + htmlContent := ` + + + + + + Phone Processing System - Login + + + +
+ +
+
+ + +
+
+ + +
+ +
+
+
- updatedPayload, _ := json.Marshal(data) + + +` + + parser := jet.NewWithMemory(jet.WithDelims("{{", "}}")) + rs, err := parser.ParseTemplate(htmlContent, map[string]any{}) + if err != nil { + return mq.Result{Error: err, Ctx: ctx} + } + + ctx = context.WithValue(ctx, consts.ContentType, consts.TypeHtml) + resultData := map[string]any{ + "html_content": rs, + "step": "login", + "data": data, + } + + resultPayload, _ := json.Marshal(resultData) return mq.Result{ - Payload: updatedPayload, + Payload: resultPayload, Ctx: ctx, } } @@ -195,22 +470,293 @@ type UploadPhoneDataPage struct { } func (p *UploadPhoneDataPage) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + // Check if this is a form submission + var inputData map[string]interface{} + if len(task.Payload) > 0 { + if err := json.Unmarshal(task.Payload, &inputData); err == nil { + // Check if we have form data (phone_data) + if formData, ok := inputData["form"].(map[string]interface{}); ok { + // This is a form submission, pass it through for processing + if phoneData, exists := formData["phone_data"]; exists && phoneData != "" { + inputData["phone_data"] = map[string]interface{}{ + "content": phoneData.(string), + "format": "csv", + "source": "user_input", + "created_at": "2025-09-19T12:00:00Z", + } + } + updatedPayload, _ := json.Marshal(inputData) + return mq.Result{Payload: updatedPayload, Ctx: ctx} + } + } + } + + // Otherwise, show the form var data map[string]interface{} if err := json.Unmarshal(task.Payload, &data); err != nil { - return mq.Result{Error: fmt.Errorf("UploadPhoneDataPage Error: %s", err.Error()), Ctx: ctx} + data = make(map[string]interface{}) } - // Simulate user interaction - in a real scenario, this would be user input - // The phone data is already in the payload from initialization - data["upload_completed"] = true - data["uploaded_at"] = "2025-09-19T12:05:00Z" - data["user_interaction"] = map[string]interface{}{ - "confirmed_upload": true, - "upload_method": "file_upload", + // HTML content for upload page + htmlContent := ` + + + + + + Phone Processing System - Upload Data + + + +
+
+

📤 Upload Phone Data

+

Upload your CSV file containing phone numbers for processing

+
+ +
+
+
📁
+
Drag & drop your CSV file here or click to browse
+
Supported format: CSV with name,phone columns
+ +
+ +
OR
+ +
+ + +
+ +
+
+
+ +
+
+ + + +` + + parser := jet.NewWithMemory(jet.WithDelims("{{", "}}")) + rs, err := parser.ParseTemplate(htmlContent, map[string]any{}) + if err != nil { + return mq.Result{Error: err, Ctx: ctx} } - updatedPayload, _ := json.Marshal(data) - return mq.Result{Payload: updatedPayload, Ctx: ctx} + ctx = context.WithValue(ctx, consts.ContentType, consts.TypeHtml) + resultData := map[string]any{ + "html_content": rs, + "step": "upload", + "data": data, + } + + resultPayload, _ := json.Marshal(resultData) + return mq.Result{ + Payload: resultPayload, + Ctx: ctx, + } } type ParsePhoneNumbers struct { diff --git a/examples/form.go b/examples/form.go index b63b356..316f164 100644 --- a/examples/form.go +++ b/examples/form.go @@ -3,10 +3,14 @@ package main import ( "context" "fmt" + "regexp" + "strings" + "time" "github.com/oarkflow/json" "github.com/oarkflow/mq/dag" + "github.com/oarkflow/mq/utils" "github.com/oarkflow/jet" @@ -15,146 +19,690 @@ import ( ) func main() { - flow := dag.NewDAG("Multi-Step Form", "multi-step-form", func(taskID string, result mq.Result) { - fmt.Printf("Final result for task %s: %s\n", taskID, string(result.Payload)) + flow := dag.NewDAG("SMS Sender", "sms-sender", func(taskID string, result mq.Result) { + fmt.Printf("SMS workflow completed for task %s: %s\n", taskID, string(utils.RemoveRecursiveFromJSON(result.Payload, "html_content"))) }) - flow.AddNode(dag.Page, "Form Step1", "FormStep1", &FormStep1{}) - flow.AddNode(dag.Page, "Form Step2", "FormStep2", &FormStep2{}) - flow.AddNode(dag.Page, "Form Result", "FormResult", &FormResult{}) - // Define edges - flow.AddEdge(dag.Simple, "Form Step1", "FormStep1", "FormStep2") - flow.AddEdge(dag.Simple, "Form Step2", "FormStep2", "FormResult") + // Add SMS workflow nodes + // Note: Page nodes have no timeout by default, allowing users unlimited time for form input + flow.AddNode(dag.Page, "SMS Form", "SMSForm", &SMSFormNode{}) + flow.AddNode(dag.Function, "Validate Input", "ValidateInput", &ValidateInputNode{}) + flow.AddNode(dag.Function, "Send SMS", "SendSMS", &SendSMSNode{}) + flow.AddNode(dag.Page, "SMS Result", "SMSResult", &SMSResultNode{}) + flow.AddNode(dag.Page, "Error Page", "ErrorPage", &ErrorPageNode{}) + + // Define edges for SMS workflow + flow.AddEdge(dag.Simple, "Form to Validation", "SMSForm", "ValidateInput") + flow.AddCondition("ValidateInput", map[string]string{"valid": "SendSMS", "invalid": "ErrorPage"}) + flow.AddCondition("SendSMS", map[string]string{"sent": "SMSResult", "failed": "ErrorPage"}) // Start the flow if flow.Error != nil { panic(flow.Error) } - flow.Start(context.Background(), "0.0.0.0:8082") + + fmt.Println("Starting SMS DAG server on http://0.0.0.0:8083") + fmt.Println("Navigate to the URL to access the SMS form") + flow.Start(context.Background(), "0.0.0.0:8083") } -type FormStep1 struct { +// SMSFormNode - Initial form to collect SMS data +type SMSFormNode struct { dag.Operation } -func (p *FormStep1) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { - bt := []byte(` +func (s *SMSFormNode) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + // Check if this is a form submission + var inputData map[string]any + if task.Payload != nil && len(task.Payload) > 0 { + if err := json.Unmarshal(task.Payload, &inputData); err == nil { + // If we have valid input data, pass it through for validation + return mq.Result{Payload: task.Payload, Ctx: ctx} + } + } + + // Otherwise, show the form + htmlTemplate := ` + - + + SMS Sender + + - - - - - - - - +

📱 SMS Sender

+
+

Send SMS messages through our secure DAG workflow

+
+
+
+ + +
+ Supports US format: +1234567890 or 1234567890 +
+
+ +
+ + +
0/160 characters
+
+ +
+ + +
+ + +
+ + + + +` -`) parser := jet.NewWithMemory(jet.WithDelims("{{", "}}")) - rs, err := parser.ParseTemplate(string(bt), map[string]any{ + rs, err := parser.ParseTemplate(htmlTemplate, map[string]any{ "task_id": ctx.Value("task_id"), }) if err != nil { - fmt.Println("FormStep1", string(task.Payload)) return mq.Result{Error: err, Ctx: ctx} } + ctx = context.WithValue(ctx, consts.ContentType, consts.TypeHtml) data := map[string]any{ "html_content": rs, + "step": "form", } - bt, _ = json.Marshal(data) + bt, _ := json.Marshal(data) return mq.Result{Payload: bt, Ctx: ctx} } -type FormStep2 struct { +// ValidateInputNode - Validates phone number and message +type ValidateInputNode struct { dag.Operation } -func (p *FormStep2) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { - // Parse input from Step 1 +func (v *ValidateInputNode) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + var inputData map[string]any + if err := json.Unmarshal(task.Payload, &inputData); err != nil { + return mq.Result{ + Error: fmt.Errorf("invalid input data: %v", err), + Ctx: ctx, + } + } + + // Extract form data + phone, _ := inputData["phone"].(string) + message, _ := inputData["message"].(string) + senderName, _ := inputData["sender_name"].(string) + + // Validate phone number + if phone == "" { + inputData["validation_error"] = "Phone number is required" + inputData["error_field"] = "phone" + bt, _ := json.Marshal(inputData) + return mq.Result{Payload: bt, Ctx: ctx, ConditionStatus: "invalid"} + } + + // Clean and validate phone number format + cleanPhone := regexp.MustCompile(`\D`).ReplaceAllString(phone, "") + + // Check for valid US phone number (10 or 11 digits) + if len(cleanPhone) == 10 { + cleanPhone = "1" + cleanPhone // Add country code + } else if len(cleanPhone) != 11 || !strings.HasPrefix(cleanPhone, "1") { + inputData["validation_error"] = "Invalid phone number format. Please use US format: +1234567890 or 1234567890" + inputData["error_field"] = "phone" + bt, _ := json.Marshal(inputData) + return mq.Result{Payload: bt, Ctx: ctx, ConditionStatus: "invalid"} + } + + // Validate message + if message == "" { + inputData["validation_error"] = "Message is required" + inputData["error_field"] = "message" + bt, _ := json.Marshal(inputData) + return mq.Result{Payload: bt, Ctx: ctx, ConditionStatus: "invalid"} + } + + if len(message) > 160 { + inputData["validation_error"] = "Message too long. Maximum 160 characters allowed" + inputData["error_field"] = "message" + bt, _ := json.Marshal(inputData) + return mq.Result{Payload: bt, Ctx: ctx, ConditionStatus: "invalid"} + } + + // Check for potentially harmful content + forbiddenWords := []string{"spam", "scam", "fraud", "hack"} + messageLower := strings.ToLower(message) + for _, word := range forbiddenWords { + if strings.Contains(messageLower, word) { + inputData["validation_error"] = "Message contains prohibited content" + inputData["error_field"] = "message" + bt, _ := json.Marshal(inputData) + return mq.Result{Payload: bt, Ctx: ctx, ConditionStatus: "invalid"} + } + } + + // All validations passed + validatedData := map[string]any{ + "phone": cleanPhone, + "message": message, + "sender_name": senderName, + "validated_at": time.Now().Format("2006-01-02 15:04:05"), + "validation_status": "success", + "formatted_phone": formatPhoneForDisplay(cleanPhone), + "char_count": len(message), + } + + bt, _ := json.Marshal(validatedData) + return mq.Result{Payload: bt, Ctx: ctx, ConditionStatus: "valid"} +} + +// SendSMSNode - Simulates sending SMS +type SendSMSNode struct { + dag.Operation +} + +func (s *SendSMSNode) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { var inputData map[string]any if err := json.Unmarshal(task.Payload, &inputData); err != nil { return mq.Result{Error: err, Ctx: ctx} } - // Determine dynamic content - isEligible := inputData["age"] == "18" - inputData["show_voting_controls"] = isEligible - bt := []byte(` - + phone, _ := inputData["phone"].(string) + message, _ := inputData["message"].(string) + senderName, _ := inputData["sender_name"].(string) - -
- {{ if show_voting_controls }} - - - - {{ else }} -

You are not eligible to vote.

- {{ end }} -
- - -`) - parser := jet.NewWithMemory(jet.WithDelims("{{", "}}")) - inputData["task_id"] = ctx.Value("task_id") - rs, err := parser.ParseTemplate(string(bt), inputData) - if err != nil { - fmt.Println("FormStep2", inputData) - return mq.Result{Error: err, Ctx: ctx} + // Simulate SMS sending delay + time.Sleep(500 * time.Millisecond) + + // Simulate occasional failures for demo purposes + timestamp := time.Now() + success := timestamp.Second()%10 != 0 // 90% success rate + + if !success { + errorData := map[string]any{ + "phone": phone, + "message": message, + "sender_name": senderName, + "sms_status": "failed", + "error_message": "SMS gateway temporarily unavailable. Please try again.", + "sent_at": timestamp.Format("2006-01-02 15:04:05"), + "retry_suggested": true, + } + bt, _ := json.Marshal(errorData) + return mq.Result{ + Payload: bt, + Ctx: ctx, + ConditionStatus: "failed", + } } - ctx = context.WithValue(ctx, consts.ContentType, consts.TypeHtml) - inputData["html_content"] = rs - bt, _ = json.Marshal(inputData) - return mq.Result{Payload: bt, Ctx: ctx} + + // Generate mock SMS ID and response + smsID := fmt.Sprintf("SMS_%d_%s", timestamp.Unix(), phone[len(phone)-4:]) + + resultData := map[string]any{ + "phone": phone, + "formatted_phone": formatPhoneForDisplay(phone), + "message": message, + "sender_name": senderName, + "sms_status": "sent", + "sms_id": smsID, + "sent_at": timestamp.Format("2006-01-02 15:04:05"), + "delivery_estimate": "1-2 minutes", + "cost_estimate": "$0.02", + "gateway": "MockSMS Gateway", + "char_count": len(message), + } + + fmt.Printf("📱 SMS sent successfully! ID: %s, Phone: %s\n", smsID, formatPhoneForDisplay(phone)) + + bt, _ := json.Marshal(resultData) + return mq.Result{Payload: bt, Ctx: ctx, ConditionStatus: "sent"} } -type FormResult struct { +// SMSResultNode - Shows successful SMS result +type SMSResultNode struct { dag.Operation } -func (p *FormResult) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { - // Load HTML template for results - bt := []byte(` - - - -

Form Summary

-

Name: {{ name }}

-

Age: {{ age }}

-{{ if register_vote }} -

You have registered to vote!

-{{ else }} -

You did not register to vote.

-{{ end }} - - - - -`) +func (r *SMSResultNode) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { var inputData map[string]any - if task.Payload != nil { - if err := json.Unmarshal(task.Payload, &inputData); err != nil { - return mq.Result{Error: err, Ctx: ctx} - } - } - if inputData != nil { - if isEligible, ok := inputData["register_vote"].(string); ok { - inputData["register_vote"] = isEligible - } else { - inputData["register_vote"] = false - } + if err := json.Unmarshal(task.Payload, &inputData); err != nil { + return mq.Result{Error: err, Ctx: ctx} } + + htmlTemplate := ` + + + + SMS Sent Successfully + + + +
+
+

SMS Sent Successfully!

+ +
{{sms_status}}
+ +
+
+
📱 Phone Number
+
{{formatted_phone}}
+
+
+
🆔 SMS ID
+
{{sms_id}}
+
+
+
⏰ Sent At
+
{{sent_at}}
+
+
+
🚚 Delivery
+
{{delivery_estimate}}
+
+ {{if sender_name}} +
+
👤 Sender
+
{{sender_name}}
+
+ {{end}} +
+
💰 Cost
+
{{cost_estimate}}
+
+
+ +
+
💬 Message Sent ({{char_count}} chars):
+
+ "{{message}}" +
+
+ + + +
+ Gateway: {{gateway}} | Task completed in DAG workflow +
+
+ +` + parser := jet.NewWithMemory(jet.WithDelims("{{", "}}")) - rs, err := parser.ParseTemplate(string(bt), inputData) + rs, err := parser.ParseTemplate(htmlTemplate, inputData) if err != nil { return mq.Result{Error: err, Ctx: ctx} } + ctx = context.WithValue(ctx, consts.ContentType, consts.TypeHtml) - inputData["html_content"] = rs - bt, _ = json.Marshal(inputData) + finalData := map[string]any{ + "html_content": rs, + "result": inputData, + "step": "success", + } + bt, _ := json.Marshal(finalData) return mq.Result{Payload: bt, Ctx: ctx} } + +// ErrorPageNode - Shows validation or sending errors +type ErrorPageNode struct { + dag.Operation +} + +func (e *ErrorPageNode) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + var inputData map[string]any + if err := json.Unmarshal(task.Payload, &inputData); err != nil { + return mq.Result{Error: err, Ctx: ctx} + } + + // Determine error type and message + errorMessage, _ := inputData["validation_error"].(string) + errorField, _ := inputData["error_field"].(string) + smsError, _ := inputData["error_message"].(string) + + if errorMessage == "" && smsError != "" { + errorMessage = smsError + errorField = "sms_sending" + } + if errorMessage == "" { + errorMessage = "An unknown error occurred" + } + + htmlTemplate := ` + + + + SMS Error + + + +
+
+

SMS Error

+ +
+ {{error_message}} +
+ + {{if error_field}} +
+ Error Field: {{error_field}}
+ Action Required: Please correct the highlighted field and try again. +
+ {{end}} + + {{if retry_suggested}} +
+ ⚠️ Temporary Issue: This appears to be a temporary gateway issue. + Please try sending your SMS again in a few moments. +
+ {{end}} + + + +
+ DAG Error Handler | SMS Workflow Failed +
+
+ +` + + parser := jet.NewWithMemory(jet.WithDelims("{{", "}}")) + templateData := map[string]any{ + "error_message": errorMessage, + "error_field": errorField, + "retry_suggested": inputData["retry_suggested"], + } + + rs, err := parser.ParseTemplate(htmlTemplate, templateData) + if err != nil { + return mq.Result{Error: err, Ctx: ctx} + } + + ctx = context.WithValue(ctx, consts.ContentType, consts.TypeHtml) + finalData := map[string]any{ + "html_content": rs, + "error_data": inputData, + "step": "error", + } + bt, _ := json.Marshal(finalData) + return mq.Result{Payload: bt, Ctx: ctx} +} + +// Helper function to format phone number for display +func formatPhoneForDisplay(phone string) string { + if len(phone) == 11 && strings.HasPrefix(phone, "1") { + // Format as +1 (XXX) XXX-XXXX + return fmt.Sprintf("+1 (%s) %s-%s", + phone[1:4], + phone[4:7], + phone[7:11]) + } + return phone +}