diff --git a/examples/form.go b/examples/form.go
index 316f164..9e23aa8 100644
--- a/examples/form.go
+++ b/examples/form.go
@@ -25,6 +25,8 @@ func main() {
// Add SMS workflow nodes
// Note: Page nodes have no timeout by default, allowing users unlimited time for form input
+
+ flow.AddDAGNode(dag.Page, "Login", "login", loginSubDAG(), true)
flow.AddNode(dag.Page, "SMS Form", "SMSForm", &SMSFormNode{})
flow.AddNode(dag.Function, "Validate Input", "ValidateInput", &ValidateInputNode{})
flow.AddNode(dag.Function, "Send SMS", "SendSMS", &SendSMSNode{})
@@ -32,6 +34,7 @@ func main() {
flow.AddNode(dag.Page, "Error Page", "ErrorPage", &ErrorPageNode{})
// Define edges for SMS workflow
+ flow.AddEdge(dag.Simple, "Login to Form", "login", "SMSForm")
flow.AddEdge(dag.Simple, "Form to Validation", "SMSForm", "ValidateInput")
flow.AddCondition("ValidateInput", map[string]string{"valid": "SendSMS", "invalid": "ErrorPage"})
flow.AddCondition("SendSMS", map[string]string{"sent": "SMSResult", "failed": "ErrorPage"})
@@ -46,6 +49,250 @@ func main() {
flow.Start(context.Background(), "0.0.0.0:8083")
}
+// loginSubDAG creates a login sub-DAG with page for authentication
+func loginSubDAG() *dag.DAG {
+ login := dag.NewDAG("Login Sub DAG", "login-sub-dag", func(taskID string, result mq.Result) {
+ fmt.Printf("Login Sub DAG Final result for task %s: %s\n", taskID, string(result.Payload))
+ }, mq.WithSyncMode(true))
+
+ login.
+ AddNode(dag.Page, "Login Page", "login-page", &LoginPage{}).
+ AddNode(dag.Function, "Verify Credentials", "verify-credentials", &VerifyCredentials{}).
+ AddNode(dag.Function, "Generate Token", "generate-token", &GenerateToken{}).
+ AddEdge(dag.Simple, "Login to Verify", "login-page", "verify-credentials").
+ AddEdge(dag.Simple, "Verify to Token", "verify-credentials", "generate-token")
+
+ return login
+}
+
+type LoginPage struct {
+ dag.Operation
+}
+
+func (p *LoginPage) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
+ // Check if this is a form submission
+ var inputData map[string]interface{}
+ if len(task.Payload) > 0 {
+ if err := json.Unmarshal(task.Payload, &inputData); err == nil {
+ // Check if we have form data (username/password)
+ if formData, ok := inputData["form"].(map[string]interface{}); ok {
+ // This is a form submission, pass it through for verification
+ credentials := map[string]interface{}{
+ "username": formData["username"],
+ "password": formData["password"],
+ }
+ inputData["credentials"] = credentials
+ updatedPayload, _ := json.Marshal(inputData)
+ return mq.Result{Payload: updatedPayload, Ctx: ctx}
+ }
+ }
+ }
+
+ // Otherwise, show the form
+ var data map[string]interface{}
+ if err := json.Unmarshal(task.Payload, &data); err != nil {
+ data = make(map[string]interface{})
+ }
+
+ // HTML content for login page
+ htmlContent := `
+
+
+
+
+
+
Phone Processing System - Login
+
+
+
+
+
+
+
+`
+
+ parser := jet.NewWithMemory(jet.WithDelims("{{", "}}"))
+ rs, err := parser.ParseTemplate(htmlContent, map[string]any{
+ "task_id": ctx.Value("task_id"),
+ })
+ if err != nil {
+ return mq.Result{Error: err, Ctx: ctx}
+ }
+
+ ctx = context.WithValue(ctx, consts.ContentType, consts.TypeHtml)
+ resultData := map[string]any{
+ "html_content": rs,
+ "step": "login",
+ "data": data,
+ }
+
+ resultPayload, _ := json.Marshal(resultData)
+ return mq.Result{
+ Payload: resultPayload,
+ Ctx: ctx,
+ }
+}
+
+type VerifyCredentials struct {
+ dag.Operation
+}
+
+func (p *VerifyCredentials) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
+ var data map[string]interface{}
+ if err := json.Unmarshal(task.Payload, &data); err != nil {
+ return mq.Result{Error: fmt.Errorf("VerifyCredentials Error: %s", err.Error()), Ctx: ctx}
+ }
+
+ username, _ := data["username"].(string)
+ password, _ := data["password"].(string)
+
+ // Simple verification logic
+ if username == "admin" && password == "password123" {
+ data["authenticated"] = true
+ data["user_role"] = "administrator"
+ } else {
+ data["authenticated"] = false
+ data["error"] = "Invalid credentials"
+ }
+ delete(data, "html_content")
+ updatedPayload, _ := json.Marshal(data)
+ return mq.Result{Payload: updatedPayload, Ctx: ctx}
+}
+
+type GenerateToken struct {
+ dag.Operation
+}
+
+func (p *GenerateToken) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
+ var data map[string]interface{}
+ if err := json.Unmarshal(task.Payload, &data); err != nil {
+ return mq.Result{Error: fmt.Errorf("GenerateToken Error: %s", err.Error()), Ctx: ctx}
+ }
+
+ if authenticated, ok := data["authenticated"].(bool); ok && authenticated {
+ data["auth_token"] = "jwt_token_123456789"
+ data["token_expires"] = "2025-09-19T13:00:00Z"
+ }
+
+ delete(data, "html_content")
+ updatedPayload, _ := json.Marshal(data)
+ return mq.Result{Payload: updatedPayload, Ctx: ctx}
+}
+
// SMSFormNode - Initial form to collect SMS data
type SMSFormNode struct {
dag.Operation