From a5523fe030b7d8a48b7e73aafd980c1dca05751d Mon Sep 17 00:00:00 2001 From: Oarkflow Date: Fri, 19 Sep 2025 11:48:44 +0545 Subject: [PATCH] update --- examples/complex_dag_pages/main.go | 446 +++++++++++++++++++++++++++++ 1 file changed, 446 insertions(+) create mode 100644 examples/complex_dag_pages/main.go diff --git a/examples/complex_dag_pages/main.go b/examples/complex_dag_pages/main.go new file mode 100644 index 0000000..0caa4b6 --- /dev/null +++ b/examples/complex_dag_pages/main.go @@ -0,0 +1,446 @@ +package main + +import ( + "context" + "fmt" + "regexp" + "strings" + + "github.com/oarkflow/json" + + "github.com/oarkflow/mq" + "github.com/oarkflow/mq/dag" +) + +// loginSubDAG creates a login sub-DAG with page for authentication +func loginSubDAG() *dag.DAG { + login := dag.NewDAG("Login Sub DAG", "login-sub-dag", func(taskID string, result mq.Result) { + fmt.Printf("Login Sub DAG Final result for task %s: %s\n", taskID, string(result.Payload)) + }, mq.WithSyncMode(true)) + + login. + AddNode(dag.Page, "Login Page", "login-page", &LoginPage{}). + AddNode(dag.Function, "Verify Credentials", "verify-credentials", &VerifyCredentials{}). + AddNode(dag.Function, "Generate Token", "generate-token", &GenerateToken{}). + AddEdge(dag.Simple, "Login to Verify", "login-page", "verify-credentials"). + AddEdge(dag.Simple, "Verify to Token", "verify-credentials", "generate-token") + + return login +} + +// phoneProcessingSubDAG creates a sub-DAG for processing phone numbers +func phoneProcessingSubDAG() *dag.DAG { + phone := dag.NewDAG("Phone Processing Sub DAG", "phone-processing-sub-dag", func(taskID string, result mq.Result) { + fmt.Printf("Phone Processing Sub DAG Final result for task %s: %s\n", taskID, string(result.Payload)) + }, mq.WithSyncMode(true)) + + phone. + AddNode(dag.Function, "Parse Phone Numbers", "parse-phones", &ParsePhoneNumbers{}). + AddNode(dag.Function, "Phone Loop", "phone-loop", &PhoneLoop{}). + AddNode(dag.Function, "Validate Phone", "validate-phone", &ValidatePhone{}). + AddNode(dag.Function, "Send Welcome SMS", "send-welcome", &SendWelcomeSMS{}). + AddNode(dag.Function, "Collect Valid Phones", "collect-valid", &CollectValidPhones{}). + AddNode(dag.Function, "Collect Invalid Phones", "collect-invalid", &CollectInvalidPhones{}). + AddEdge(dag.Simple, "Parse to Loop", "parse-phones", "phone-loop"). + AddEdge(dag.Iterator, "Loop over phones", "phone-loop", "validate-phone"). + AddCondition("validate-phone", map[string]string{"valid": "send-welcome", "invalid": "collect-invalid"}). + AddEdge(dag.Simple, "Welcome to Collect", "send-welcome", "collect-valid"). + AddEdge(dag.Simple, "Invalid to Collect", "collect-invalid", "collect-valid"). + AddEdge(dag.Simple, "Loop to Collect", "phone-loop", "collect-valid") + + return phone +} + +func main() { + flow := dag.NewDAG("Complex Phone Processing DAG with Pages", "complex-phone-dag", func(taskID string, result mq.Result) { + fmt.Printf("Complex DAG Final result for task %s: %s\n", taskID, string(result.Payload)) + }) + flow.ConfigureMemoryStorage() + + // Main nodes + flow.AddNode(dag.Function, "Initialize", "init", &Initialize{}, true) + flow.AddDAGNode(dag.Function, "Login Process", "login", loginSubDAG()) + flow.AddNode(dag.Function, "Upload Phone Data", "upload-page", &UploadPhoneDataPage{}) + flow.AddDAGNode(dag.Function, "Process Phones", "process-phones", phoneProcessingSubDAG()) + flow.AddNode(dag.Function, "Generate Report", "generate-report", &GenerateReport{}) + flow.AddNode(dag.Function, "Send Summary Email", "send-summary", &SendSummaryEmail{}) + flow.AddNode(dag.Function, "Final Cleanup", "cleanup", &FinalCleanup{}) + + // Edges + flow.AddEdge(dag.Simple, "Init to Login", "init", "login") + flow.AddEdge(dag.Simple, "Login to Upload", "login", "upload-page") + flow.AddEdge(dag.Simple, "Upload to Process", "upload-page", "process-phones") + flow.AddEdge(dag.Simple, "Process to Report", "process-phones", "generate-report") + flow.AddEdge(dag.Simple, "Report to Summary", "generate-report", "send-summary") + flow.AddEdge(dag.Simple, "Summary to Cleanup", "send-summary", "cleanup") + + // Sample data for testing + data := map[string]interface{}{ + "user_id": "user123", + "session_data": map[string]interface{}{ + "authenticated": false, + }, + "phone_data": map[string]interface{}{ + "format": "csv", + "content": "name,phone\nJohn Doe,+1234567890\nJane Smith,+1987654321\nBob Johnson,invalid-phone\nAlice Brown,+1555123456", + }, + } + + jsonData, _ := json.Marshal(data) + if flow.Error != nil { + panic(flow.Error) + } + + rs := flow.Process(context.Background(), jsonData) + if rs.Error != nil { + panic(rs.Error) + } + fmt.Println("Complex Phone DAG Status:", rs.Status, "Topic:", rs.Topic) + fmt.Println("Final Payload:", string(rs.Payload)) +} + +// Task implementations + +type Initialize struct { + dag.Operation +} + +func (p *Initialize) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + var data map[string]interface{} + if err := json.Unmarshal(task.Payload, &data); err != nil { + return mq.Result{Error: fmt.Errorf("Initialize Error: %s", err.Error()), Ctx: ctx} + } + data["initialized"] = true + data["timestamp"] = "2025-09-19T12:00:00Z" + updatedPayload, _ := json.Marshal(data) + return mq.Result{Payload: updatedPayload, Ctx: ctx} +} + +type LoginPage struct { + dag.Operation +} + +func (p *LoginPage) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + var data map[string]interface{} + if err := json.Unmarshal(task.Payload, &data); err != nil { + return mq.Result{Error: fmt.Errorf("LoginPage Error: %s", err.Error()), Ctx: ctx} + } + + // Simulate user input from page + data["credentials"] = map[string]interface{}{ + "username": "admin", + "password": "password123", + } + data["login_attempted"] = true + + updatedPayload, _ := json.Marshal(data) + return mq.Result{ + Payload: updatedPayload, + Ctx: ctx, + } +} + +type VerifyCredentials struct { + dag.Operation +} + +func (p *VerifyCredentials) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + var data map[string]interface{} + if err := json.Unmarshal(task.Payload, &data); err != nil { + return mq.Result{Error: fmt.Errorf("VerifyCredentials Error: %s", err.Error()), Ctx: ctx} + } + + credentials, ok := data["credentials"].(map[string]interface{}) + if !ok { + return mq.Result{Error: fmt.Errorf("credentials not found"), Ctx: ctx} + } + + username, _ := credentials["username"].(string) + password, _ := credentials["password"].(string) + + // Simple verification logic + if username == "admin" && password == "password123" { + data["authenticated"] = true + data["user_role"] = "administrator" + } else { + data["authenticated"] = false + data["error"] = "Invalid credentials" + } + + updatedPayload, _ := json.Marshal(data) + return mq.Result{Payload: updatedPayload, Ctx: ctx} +} + +type GenerateToken struct { + dag.Operation +} + +func (p *GenerateToken) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + var data map[string]interface{} + if err := json.Unmarshal(task.Payload, &data); err != nil { + return mq.Result{Error: fmt.Errorf("GenerateToken Error: %s", err.Error()), Ctx: ctx} + } + + if authenticated, ok := data["authenticated"].(bool); ok && authenticated { + data["auth_token"] = "jwt_token_123456789" + data["token_expires"] = "2025-09-19T13:00:00Z" + } + + updatedPayload, _ := json.Marshal(data) + return mq.Result{Payload: updatedPayload, Ctx: ctx} +} + +type UploadPhoneDataPage struct { + dag.Operation +} + +func (p *UploadPhoneDataPage) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + var data map[string]interface{} + if err := json.Unmarshal(task.Payload, &data); err != nil { + return mq.Result{Error: fmt.Errorf("UploadPhoneDataPage Error: %s", err.Error()), Ctx: ctx} + } + + // Simulate user interaction - in a real scenario, this would be user input + // The phone data is already in the payload from initialization + data["upload_completed"] = true + data["uploaded_at"] = "2025-09-19T12:05:00Z" + data["user_interaction"] = map[string]interface{}{ + "confirmed_upload": true, + "upload_method": "file_upload", + } + + updatedPayload, _ := json.Marshal(data) + return mq.Result{Payload: updatedPayload, Ctx: ctx} +} + +type ParsePhoneNumbers struct { + dag.Operation +} + +func (p *ParsePhoneNumbers) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + var data map[string]interface{} + if err := json.Unmarshal(task.Payload, &data); err != nil { + return mq.Result{Error: fmt.Errorf("ParsePhoneNumbers Error: %s", err.Error()), Ctx: ctx} + } + + phoneData, ok := data["phone_data"].(map[string]interface{}) + if !ok { + return mq.Result{Error: fmt.Errorf("phone_data not found"), Ctx: ctx} + } + + content, ok := phoneData["content"].(string) + if !ok { + return mq.Result{Error: fmt.Errorf("phone data content not found"), Ctx: ctx} + } + + var phones []map[string]interface{} + + // Parse CSV content + lines := strings.Split(content, "\n") + if len(lines) > 1 { + headers := strings.Split(lines[0], ",") + for i := 1; i < len(lines); i++ { + if lines[i] == "" { + continue + } + values := strings.Split(lines[i], ",") + if len(values) >= len(headers) { + phone := make(map[string]interface{}) + for j, header := range headers { + phone[strings.TrimSpace(header)] = strings.TrimSpace(values[j]) + } + phones = append(phones, phone) + } + } + } + + data["parsed_phones"] = phones + data["total_phones"] = len(phones) + updatedPayload, _ := json.Marshal(data) + return mq.Result{Payload: updatedPayload, Ctx: ctx} +} + +type PhoneLoop struct { + dag.Operation +} + +func (p *PhoneLoop) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + var data map[string]interface{} + if err := json.Unmarshal(task.Payload, &data); err != nil { + return mq.Result{Error: fmt.Errorf("PhoneLoop Error: %s", err.Error()), Ctx: ctx} + } + + // Extract parsed phones for iteration + if phones, ok := data["parsed_phones"].([]interface{}); ok { + updatedPayload, _ := json.Marshal(phones) + return mq.Result{Payload: updatedPayload, Ctx: ctx} + } + + return mq.Result{Payload: task.Payload, Ctx: ctx} +} + +type ValidatePhone struct { + dag.Operation +} + +func (p *ValidatePhone) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + var phone map[string]interface{} + if err := json.Unmarshal(task.Payload, &phone); err != nil { + return mq.Result{Error: fmt.Errorf("ValidatePhone Error: %s", err.Error()), Ctx: ctx} + } + + phoneStr, ok := phone["phone"].(string) + if !ok { + return mq.Result{Payload: task.Payload, ConditionStatus: "invalid", Ctx: ctx} + } + + // Simple phone validation regex (supports international format) + validPhone := regexp.MustCompile(`^\+?[1-9]\d{1,14}$`) + if validPhone.MatchString(phoneStr) { + phone["valid"] = true + phone["formatted_phone"] = phoneStr + return mq.Result{Payload: task.Payload, ConditionStatus: "valid", Ctx: ctx} + } + + phone["valid"] = false + phone["validation_error"] = "Invalid phone number format" + return mq.Result{Payload: task.Payload, ConditionStatus: "invalid", Ctx: ctx} +} + +type SendWelcomeSMS struct { + dag.Operation +} + +func (p *SendWelcomeSMS) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + var phone map[string]interface{} + if err := json.Unmarshal(task.Payload, &phone); err != nil { + return mq.Result{Error: fmt.Errorf("SendWelcomeSMS Error: %s", err.Error()), Ctx: ctx} + } + + phoneStr, ok := phone["phone"].(string) + if !ok { + return mq.Result{Error: fmt.Errorf("phone number not found"), Ctx: ctx} + } + + // Simulate sending welcome SMS + phone["welcome_sent"] = true + phone["welcome_message"] = "Welcome! Your phone number has been verified." + phone["sent_at"] = "2025-09-19T12:10:00Z" + + fmt.Printf("Welcome SMS sent to %s\n", phoneStr) + updatedPayload, _ := json.Marshal(phone) + return mq.Result{Payload: updatedPayload, Ctx: ctx} +} + +type CollectValidPhones struct { + dag.Operation +} + +func (p *CollectValidPhones) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + // This node collects all processed phone results + return mq.Result{Payload: task.Payload, Ctx: ctx} +} + +type CollectInvalidPhones struct { + dag.Operation +} + +func (p *CollectInvalidPhones) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + var phone map[string]interface{} + if err := json.Unmarshal(task.Payload, &phone); err != nil { + return mq.Result{Error: fmt.Errorf("CollectInvalidPhones Error: %s", err.Error()), Ctx: ctx} + } + + phone["discarded"] = true + phone["discard_reason"] = "Invalid phone number" + + fmt.Printf("Invalid phone discarded: %v\n", phone["phone"]) + updatedPayload, _ := json.Marshal(phone) + return mq.Result{Payload: updatedPayload, Ctx: ctx} +} + +type GenerateReport struct { + dag.Operation +} + +func (p *GenerateReport) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + var data map[string]interface{} + if err := json.Unmarshal(task.Payload, &data); err != nil { + // If it's an array, wrap it in a map + var arr []interface{} + if err2 := json.Unmarshal(task.Payload, &arr); err2 != nil { + return mq.Result{Error: fmt.Errorf("GenerateReport Error: %s", err.Error()), Ctx: ctx} + } + data = map[string]interface{}{ + "processed_results": arr, + } + } + + // Generate processing report + validCount := 0 + invalidCount := 0 + + if results, ok := data["processed_results"].([]interface{}); ok { + for _, result := range results { + if resultMap, ok := result.(map[string]interface{}); ok { + if _, isValid := resultMap["welcome_sent"]; isValid { + validCount++ + } else if _, isInvalid := resultMap["discarded"]; isInvalid { + invalidCount++ + } + } + } + } + + report := map[string]interface{}{ + "total_processed": validCount + invalidCount, + "valid_phones": validCount, + "invalid_phones": invalidCount, + "processed_at": "2025-09-19T12:15:00Z", + "success": true, + } + + data["report"] = report + updatedPayload, _ := json.Marshal(data) + return mq.Result{Payload: updatedPayload, Ctx: ctx} +} + +type SendSummaryEmail struct { + dag.Operation +} + +func (p *SendSummaryEmail) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + var data map[string]interface{} + if err := json.Unmarshal(task.Payload, &data); err != nil { + return mq.Result{Error: fmt.Errorf("SendSummaryEmail Error: %s", err.Error()), Ctx: ctx} + } + + // Simulate sending summary email + data["summary_email_sent"] = true + data["summary_recipient"] = "admin@company.com" + data["summary_sent_at"] = "2025-09-19T12:20:00Z" + + fmt.Println("Summary email sent to admin") + updatedPayload, _ := json.Marshal(data) + return mq.Result{Payload: updatedPayload, Ctx: ctx} +} + +type FinalCleanup struct { + dag.Operation +} + +func (p *FinalCleanup) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + var data map[string]interface{} + if err := json.Unmarshal(task.Payload, &data); err != nil { + return mq.Result{Error: fmt.Errorf("FinalCleanup Error: %s", err.Error()), Ctx: ctx} + } + + // Perform final cleanup + data["completed"] = true + data["completed_at"] = "2025-09-19T12:25:00Z" + data["workflow_status"] = "success" + + fmt.Println("Workflow completed successfully") + updatedPayload, _ := json.Marshal(data) + return mq.Result{Payload: updatedPayload, Ctx: ctx} +}