Compare commits

...

2 Commits

Author SHA1 Message Date
sujit
61fff5b6fd update 2025-09-20 00:44:47 +05:45
sujit
2b37061523 update 2025-09-19 23:43:57 +05:45
2 changed files with 27 additions and 6 deletions

View File

@@ -544,6 +544,15 @@ func (tm *DAG) processTaskInternal(ctx context.Context, task *mq.Task) mq.Result
)
}
} else {
if manager.result != nil && manager.result.Status == mq.Completed {
currentKey := tm.getCurrentNode(manager)
currentNode := strings.Split(currentKey, Delimiter)[0]
isLast, err := tm.IsLastNode(currentNode)
if err == nil && isLast {
return *manager.result
}
}
// Replace the manager's result channel so waiting here is wired to this call.
manager.resultCh = resultCh
tm.Logger().Info("Resuming task",

View File

@@ -26,15 +26,15 @@ func main() {
// Add SMS workflow nodes
// Note: Page nodes have no timeout by default, allowing users unlimited time for form input
// flow.AddDAGNode(dag.Page, "Login", "login", loginSubDAG(), true)
flow.AddNode(dag.Page, "SMS Form", "SMSForm", &SMSFormNode{}, true)
flow.AddDAGNode(dag.Page, "Login", "login", loginSubDAG(), true)
flow.AddNode(dag.Page, "SMS Form", "SMSForm", &SMSFormNode{})
flow.AddNode(dag.Function, "Validate Input", "ValidateInput", &ValidateInputNode{})
flow.AddNode(dag.Function, "Send SMS", "SendSMS", &SendSMSNode{})
flow.AddNode(dag.Page, "SMS Result", "SMSResult", &SMSResultNode{})
flow.AddNode(dag.Page, "Error Page", "ErrorPage", &ErrorPageNode{})
// Define edges for SMS workflow
// flow.AddEdge(dag.Simple, "Login to Form", "login", "SMSForm")
flow.AddEdge(dag.Simple, "Login to Form", "login", "SMSForm")
flow.AddEdge(dag.Simple, "Form to Validation", "SMSForm", "ValidateInput")
flow.AddCondition("ValidateInput", map[string]string{"valid": "SendSMS"}) // Removed invalid -> ErrorPage since we use ResetTo
flow.AddCondition("SendSMS", map[string]string{"sent": "SMSResult", "failed": "ErrorPage"})
@@ -267,6 +267,14 @@ func (p *VerifyCredentials) ProcessTask(ctx context.Context, task *mq.Task) mq.R
} else {
data["authenticated"] = false
data["error"] = "Invalid credentials"
data["validation_error"] = "Phone number is required"
data["error_field"] = "phone"
bt, _ := json.Marshal(data)
return mq.Result{
Payload: bt,
Ctx: ctx,
ResetTo: "back", // Reset to form instead of going to error page
}
}
delete(data, "html_content")
updatedPayload, _ := json.Marshal(data)
@@ -531,7 +539,7 @@ func (v *ValidateInputNode) ProcessTask(ctx context.Context, task *mq.Task) mq.R
return mq.Result{
Payload: bt,
Ctx: ctx,
ResetTo: "SMSForm", // Reset to form instead of going to error page
ResetTo: "back", // Reset to form instead of going to error page
}
}
@@ -676,8 +684,12 @@ type SMSResultNode struct {
func (r *SMSResultNode) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
var inputData map[string]any
if err := json.Unmarshal(task.Payload, &inputData); err != nil {
return mq.Result{Error: err, Ctx: ctx}
if len(task.Payload) > 0 {
if err := json.Unmarshal(task.Payload, &inputData); err != nil {
return mq.Result{Error: err, Ctx: ctx}
}
} else {
inputData = make(map[string]any)
}
htmlTemplate := `