diff --git a/examples/complex_dag_pages/main.go b/examples/complex_dag_pages/main.go
index 0caa4b6..1ce8adc 100644
--- a/examples/complex_dag_pages/main.go
+++ b/examples/complex_dag_pages/main.go
@@ -5,10 +5,13 @@ import (
"fmt"
"regexp"
"strings"
+ "time"
"github.com/oarkflow/json"
+ "github.com/oarkflow/jet"
"github.com/oarkflow/mq"
+ "github.com/oarkflow/mq/consts"
"github.com/oarkflow/mq/dag"
)
@@ -28,75 +31,52 @@ func loginSubDAG() *dag.DAG {
return login
}
-// phoneProcessingSubDAG creates a sub-DAG for processing phone numbers
-func phoneProcessingSubDAG() *dag.DAG {
- phone := dag.NewDAG("Phone Processing Sub DAG", "phone-processing-sub-dag", func(taskID string, result mq.Result) {
- fmt.Printf("Phone Processing Sub DAG Final result for task %s: %s\n", taskID, string(result.Payload))
- }, mq.WithSyncMode(true))
-
- phone.
- AddNode(dag.Function, "Parse Phone Numbers", "parse-phones", &ParsePhoneNumbers{}).
- AddNode(dag.Function, "Phone Loop", "phone-loop", &PhoneLoop{}).
- AddNode(dag.Function, "Validate Phone", "validate-phone", &ValidatePhone{}).
- AddNode(dag.Function, "Send Welcome SMS", "send-welcome", &SendWelcomeSMS{}).
- AddNode(dag.Function, "Collect Valid Phones", "collect-valid", &CollectValidPhones{}).
- AddNode(dag.Function, "Collect Invalid Phones", "collect-invalid", &CollectInvalidPhones{}).
- AddEdge(dag.Simple, "Parse to Loop", "parse-phones", "phone-loop").
- AddEdge(dag.Iterator, "Loop over phones", "phone-loop", "validate-phone").
- AddCondition("validate-phone", map[string]string{"valid": "send-welcome", "invalid": "collect-invalid"}).
- AddEdge(dag.Simple, "Welcome to Collect", "send-welcome", "collect-valid").
- AddEdge(dag.Simple, "Invalid to Collect", "collect-invalid", "collect-valid").
- AddEdge(dag.Simple, "Loop to Collect", "phone-loop", "collect-valid")
-
- return phone
-}
-
func main() {
flow := dag.NewDAG("Complex Phone Processing DAG with Pages", "complex-phone-dag", func(taskID string, result mq.Result) {
fmt.Printf("Complex DAG Final result for task %s: %s\n", taskID, string(result.Payload))
})
flow.ConfigureMemoryStorage()
- // Main nodes
- flow.AddNode(dag.Function, "Initialize", "init", &Initialize{}, true)
- flow.AddDAGNode(dag.Function, "Login Process", "login", loginSubDAG())
- flow.AddNode(dag.Function, "Upload Phone Data", "upload-page", &UploadPhoneDataPage{})
- flow.AddDAGNode(dag.Function, "Process Phones", "process-phones", phoneProcessingSubDAG())
+ // Main nodes - Login process as individual nodes (not sub-DAG) for proper page serving
+ flow.AddNode(dag.Page, "Initialize", "init", &Initialize{}, true)
+ flow.AddNode(dag.Page, "Login Page", "login-page", &LoginPage{})
+ flow.AddNode(dag.Function, "Verify Credentials", "verify-credentials", &VerifyCredentials{})
+ flow.AddNode(dag.Function, "Generate Token", "generate-token", &GenerateToken{})
+ flow.AddNode(dag.Page, "Upload Phone Data", "upload-page", &UploadPhoneDataPage{})
+ flow.AddNode(dag.Function, "Parse Phone Numbers", "parse-phones", &ParsePhoneNumbers{})
+ flow.AddNode(dag.Function, "Phone Loop", "phone-loop", &PhoneLoop{})
+ flow.AddNode(dag.Function, "Validate Phone", "validate-phone", &ValidatePhone{})
+ flow.AddNode(dag.Function, "Send Welcome SMS", "send-welcome", &SendWelcomeSMS{})
+ flow.AddNode(dag.Function, "Collect Valid Phones", "collect-valid", &CollectValidPhones{})
+ flow.AddNode(dag.Function, "Collect Invalid Phones", "collect-invalid", &CollectInvalidPhones{})
flow.AddNode(dag.Function, "Generate Report", "generate-report", &GenerateReport{})
flow.AddNode(dag.Function, "Send Summary Email", "send-summary", &SendSummaryEmail{})
flow.AddNode(dag.Function, "Final Cleanup", "cleanup", &FinalCleanup{})
- // Edges
- flow.AddEdge(dag.Simple, "Init to Login", "init", "login")
- flow.AddEdge(dag.Simple, "Login to Upload", "login", "upload-page")
- flow.AddEdge(dag.Simple, "Upload to Process", "upload-page", "process-phones")
- flow.AddEdge(dag.Simple, "Process to Report", "process-phones", "generate-report")
+ // Edges - Connect login flow individually
+ flow.AddEdge(dag.Simple, "Init to Login", "init", "login-page")
+ flow.AddEdge(dag.Simple, "Login to Verify", "login-page", "verify-credentials")
+ flow.AddEdge(dag.Simple, "Verify to Token", "verify-credentials", "generate-token")
+ flow.AddEdge(dag.Simple, "Token to Upload", "generate-token", "upload-page")
+ flow.AddEdge(dag.Simple, "Upload to Parse", "upload-page", "parse-phones")
+ flow.AddEdge(dag.Simple, "Parse to Loop", "parse-phones", "phone-loop")
+ flow.AddEdge(dag.Iterator, "Loop over phones", "phone-loop", "validate-phone")
+ flow.AddCondition("validate-phone", map[string]string{"valid": "send-welcome", "invalid": "collect-invalid"})
+ flow.AddEdge(dag.Simple, "Welcome to Collect", "send-welcome", "collect-valid")
+ flow.AddEdge(dag.Simple, "Invalid to Collect", "collect-invalid", "collect-valid")
+ flow.AddEdge(dag.Simple, "Loop to Report", "phone-loop", "generate-report")
flow.AddEdge(dag.Simple, "Report to Summary", "generate-report", "send-summary")
flow.AddEdge(dag.Simple, "Summary to Cleanup", "send-summary", "cleanup")
- // Sample data for testing
- data := map[string]interface{}{
- "user_id": "user123",
- "session_data": map[string]interface{}{
- "authenticated": false,
- },
- "phone_data": map[string]interface{}{
- "format": "csv",
- "content": "name,phone\nJohn Doe,+1234567890\nJane Smith,+1987654321\nBob Johnson,invalid-phone\nAlice Brown,+1555123456",
- },
- }
+ // Check for DAG errors
+ // if flow.Error != nil {
+ // fmt.Printf("DAG Error: %v\n", flow.Error)
+ // panic(flow.Error)
+ // }
- jsonData, _ := json.Marshal(data)
- if flow.Error != nil {
- panic(flow.Error)
- }
-
- rs := flow.Process(context.Background(), jsonData)
- if rs.Error != nil {
- panic(rs.Error)
- }
- fmt.Println("Complex Phone DAG Status:", rs.Status, "Topic:", rs.Topic)
- fmt.Println("Final Payload:", string(rs.Payload))
+ fmt.Println("Starting Complex Phone Processing DAG server on http://0.0.0.0:8080")
+ fmt.Println("Navigate to the URL to access the login page")
+ flow.Start(context.Background(), ":8080")
}
// Task implementations
@@ -108,12 +88,151 @@ type Initialize struct {
func (p *Initialize) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
var data map[string]interface{}
if err := json.Unmarshal(task.Payload, &data); err != nil {
- return mq.Result{Error: fmt.Errorf("Initialize Error: %s", err.Error()), Ctx: ctx}
+ data = make(map[string]interface{})
}
data["initialized"] = true
data["timestamp"] = "2025-09-19T12:00:00Z"
- updatedPayload, _ := json.Marshal(data)
- return mq.Result{Payload: updatedPayload, Ctx: ctx}
+
+ // Add sample phone data for testing
+ sampleCSV := `name,phone
+John Doe,+1234567890
+Jane Smith,0987654321
+Bob Johnson,1555123456
+Alice Brown,invalid-phone
+Charlie Wilson,+441234567890`
+
+ data["phone_data"] = map[string]interface{}{
+ "content": sampleCSV,
+ "format": "csv",
+ "source": "sample_data",
+ "created_at": "2025-09-19T12:00:00Z",
+ }
+
+ // Generate a task ID for this workflow instance
+ taskID := "workflow-" + fmt.Sprintf("%d", time.Now().Unix())
+
+ // Since this is a page node, show a welcome page that auto-redirects to login
+ htmlContent := `
+
+
+
+
+
+ Phone Processing System
+
+
+
+
+
📱 Phone Processing System
+
Welcome to our advanced phone number processing workflow
+
+
+
Features:
+
+ - CSV/JSON file upload support
+ - Phone number validation and formatting
+ - Automated welcome SMS sending
+ - Invalid number filtering
+ - Comprehensive processing reports
+
+
+
+
+
Initializing workflow...
+
Task ID: ` + taskID + `
+
Redirecting to login page in 3 seconds...
+
+
+
+
+
+`
+
+ parser := jet.NewWithMemory(jet.WithDelims("{{", "}}"))
+ rs, err := parser.ParseTemplate(htmlContent, map[string]any{})
+ if err != nil {
+ return mq.Result{Error: err, Ctx: ctx}
+ }
+
+ ctx = context.WithValue(ctx, consts.ContentType, consts.TypeHtml)
+ resultData := map[string]any{
+ "html_content": rs,
+ "step": "initialize",
+ "data": data,
+ }
+
+ resultPayload, _ := json.Marshal(resultData)
+ return mq.Result{Payload: resultPayload, Ctx: ctx}
}
type LoginPage struct {
@@ -121,21 +240,177 @@ type LoginPage struct {
}
func (p *LoginPage) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
+ // Check if this is a form submission
+ var inputData map[string]interface{}
+ if len(task.Payload) > 0 {
+ if err := json.Unmarshal(task.Payload, &inputData); err == nil {
+ // Check if we have form data (username/password)
+ if formData, ok := inputData["form"].(map[string]interface{}); ok {
+ // This is a form submission, pass it through for verification
+ credentials := map[string]interface{}{
+ "username": formData["username"],
+ "password": formData["password"],
+ }
+ inputData["credentials"] = credentials
+ updatedPayload, _ := json.Marshal(inputData)
+ return mq.Result{Payload: updatedPayload, Ctx: ctx}
+ }
+ }
+ }
+
+ // Otherwise, show the form
var data map[string]interface{}
if err := json.Unmarshal(task.Payload, &data); err != nil {
- return mq.Result{Error: fmt.Errorf("LoginPage Error: %s", err.Error()), Ctx: ctx}
+ data = make(map[string]interface{})
}
- // Simulate user input from page
- data["credentials"] = map[string]interface{}{
- "username": "admin",
- "password": "password123",
- }
- data["login_attempted"] = true
+ // HTML content for login page
+ htmlContent := `
+
+
+
+
+
+ Phone Processing System - Login
+
+
+
+
- updatedPayload, _ := json.Marshal(data)
+
+
+`
+
+ parser := jet.NewWithMemory(jet.WithDelims("{{", "}}"))
+ rs, err := parser.ParseTemplate(htmlContent, map[string]any{})
+ if err != nil {
+ return mq.Result{Error: err, Ctx: ctx}
+ }
+
+ ctx = context.WithValue(ctx, consts.ContentType, consts.TypeHtml)
+ resultData := map[string]any{
+ "html_content": rs,
+ "step": "login",
+ "data": data,
+ }
+
+ resultPayload, _ := json.Marshal(resultData)
return mq.Result{
- Payload: updatedPayload,
+ Payload: resultPayload,
Ctx: ctx,
}
}
@@ -195,22 +470,293 @@ type UploadPhoneDataPage struct {
}
func (p *UploadPhoneDataPage) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
+ // Check if this is a form submission
+ var inputData map[string]interface{}
+ if len(task.Payload) > 0 {
+ if err := json.Unmarshal(task.Payload, &inputData); err == nil {
+ // Check if we have form data (phone_data)
+ if formData, ok := inputData["form"].(map[string]interface{}); ok {
+ // This is a form submission, pass it through for processing
+ if phoneData, exists := formData["phone_data"]; exists && phoneData != "" {
+ inputData["phone_data"] = map[string]interface{}{
+ "content": phoneData.(string),
+ "format": "csv",
+ "source": "user_input",
+ "created_at": "2025-09-19T12:00:00Z",
+ }
+ }
+ updatedPayload, _ := json.Marshal(inputData)
+ return mq.Result{Payload: updatedPayload, Ctx: ctx}
+ }
+ }
+ }
+
+ // Otherwise, show the form
var data map[string]interface{}
if err := json.Unmarshal(task.Payload, &data); err != nil {
- return mq.Result{Error: fmt.Errorf("UploadPhoneDataPage Error: %s", err.Error()), Ctx: ctx}
+ data = make(map[string]interface{})
}
- // Simulate user interaction - in a real scenario, this would be user input
- // The phone data is already in the payload from initialization
- data["upload_completed"] = true
- data["uploaded_at"] = "2025-09-19T12:05:00Z"
- data["user_interaction"] = map[string]interface{}{
- "confirmed_upload": true,
- "upload_method": "file_upload",
+ // HTML content for upload page
+ htmlContent := `
+
+
+
+
+
+ Phone Processing System - Upload Data
+
+
+
+
+
+
+
+`
+
+ parser := jet.NewWithMemory(jet.WithDelims("{{", "}}"))
+ rs, err := parser.ParseTemplate(htmlContent, map[string]any{})
+ if err != nil {
+ return mq.Result{Error: err, Ctx: ctx}
}
- updatedPayload, _ := json.Marshal(data)
- return mq.Result{Payload: updatedPayload, Ctx: ctx}
+ ctx = context.WithValue(ctx, consts.ContentType, consts.TypeHtml)
+ resultData := map[string]any{
+ "html_content": rs,
+ "step": "upload",
+ "data": data,
+ }
+
+ resultPayload, _ := json.Marshal(resultData)
+ return mq.Result{
+ Payload: resultPayload,
+ Ctx: ctx,
+ }
}
type ParsePhoneNumbers struct {
diff --git a/examples/form.go b/examples/form.go
index b63b356..316f164 100644
--- a/examples/form.go
+++ b/examples/form.go
@@ -3,10 +3,14 @@ package main
import (
"context"
"fmt"
+ "regexp"
+ "strings"
+ "time"
"github.com/oarkflow/json"
"github.com/oarkflow/mq/dag"
+ "github.com/oarkflow/mq/utils"
"github.com/oarkflow/jet"
@@ -15,146 +19,690 @@ import (
)
func main() {
- flow := dag.NewDAG("Multi-Step Form", "multi-step-form", func(taskID string, result mq.Result) {
- fmt.Printf("Final result for task %s: %s\n", taskID, string(result.Payload))
+ flow := dag.NewDAG("SMS Sender", "sms-sender", func(taskID string, result mq.Result) {
+ fmt.Printf("SMS workflow completed for task %s: %s\n", taskID, string(utils.RemoveRecursiveFromJSON(result.Payload, "html_content")))
})
- flow.AddNode(dag.Page, "Form Step1", "FormStep1", &FormStep1{})
- flow.AddNode(dag.Page, "Form Step2", "FormStep2", &FormStep2{})
- flow.AddNode(dag.Page, "Form Result", "FormResult", &FormResult{})
- // Define edges
- flow.AddEdge(dag.Simple, "Form Step1", "FormStep1", "FormStep2")
- flow.AddEdge(dag.Simple, "Form Step2", "FormStep2", "FormResult")
+ // Add SMS workflow nodes
+ // Note: Page nodes have no timeout by default, allowing users unlimited time for form input
+ flow.AddNode(dag.Page, "SMS Form", "SMSForm", &SMSFormNode{})
+ flow.AddNode(dag.Function, "Validate Input", "ValidateInput", &ValidateInputNode{})
+ flow.AddNode(dag.Function, "Send SMS", "SendSMS", &SendSMSNode{})
+ flow.AddNode(dag.Page, "SMS Result", "SMSResult", &SMSResultNode{})
+ flow.AddNode(dag.Page, "Error Page", "ErrorPage", &ErrorPageNode{})
+
+ // Define edges for SMS workflow
+ flow.AddEdge(dag.Simple, "Form to Validation", "SMSForm", "ValidateInput")
+ flow.AddCondition("ValidateInput", map[string]string{"valid": "SendSMS", "invalid": "ErrorPage"})
+ flow.AddCondition("SendSMS", map[string]string{"sent": "SMSResult", "failed": "ErrorPage"})
// Start the flow
if flow.Error != nil {
panic(flow.Error)
}
- flow.Start(context.Background(), "0.0.0.0:8082")
+
+ fmt.Println("Starting SMS DAG server on http://0.0.0.0:8083")
+ fmt.Println("Navigate to the URL to access the SMS form")
+ flow.Start(context.Background(), "0.0.0.0:8083")
}
-type FormStep1 struct {
+// SMSFormNode - Initial form to collect SMS data
+type SMSFormNode struct {
dag.Operation
}
-func (p *FormStep1) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
- bt := []byte(`
+func (s *SMSFormNode) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
+ // Check if this is a form submission
+ var inputData map[string]any
+ if task.Payload != nil && len(task.Payload) > 0 {
+ if err := json.Unmarshal(task.Payload, &inputData); err == nil {
+ // If we have valid input data, pass it through for validation
+ return mq.Result{Payload: task.Payload, Ctx: ctx}
+ }
+ }
+
+ // Otherwise, show the form
+ htmlTemplate := `
+
-
+
+ SMS Sender
+
+
-
-
+ 📱 SMS Sender
+
+
Send SMS messages through our secure DAG workflow
+
+
+
+
+
+
+