This commit is contained in:
Oarkflow
2025-08-05 19:33:35 +05:45
parent cef019c486
commit d347e2e253
6 changed files with 160 additions and 8 deletions

View File

@@ -437,6 +437,9 @@ func (tm *DAG) processTaskInternal(ctx context.Context, task *mq.Task) mq.Result
currentKey := tm.getCurrentNode(manager) currentKey := tm.getCurrentNode(manager)
currentNode := strings.Split(currentKey, Delimiter)[0] currentNode := strings.Split(currentKey, Delimiter)[0]
node, exists := tm.nodes.Get(currentNode) node, exists := tm.nodes.Get(currentNode)
if exists {
fmt.Println(isDAGNode(node))
}
method, ok := ctx.Value("method").(string) method, ok := ctx.Value("method").(string)
if method == "GET" && exists && node.NodeType == Page { if method == "GET" && exists && node.NodeType == Page {
ctx = context.WithValue(ctx, "initial_node", currentNode) ctx = context.WithValue(ctx, "initial_node", currentNode)

View File

@@ -225,7 +225,7 @@ func (tm *TaskManager) processNode(exec *task) {
attempts := 0 attempts := 0
for { for {
// Tracing start (stubbed) // Tracing start (stubbed)
log.Printf("Tracing: Start processing node %s (attempt %d)", exec.nodeID, attempts+1) log.Printf("Tracing: Start processing node %s (attempt %d) on flow %s", exec.nodeID, attempts+1, tm.dag.key)
result = node.processor.ProcessTask(exec.ctx, mq.NewTask(exec.taskID, exec.payload, exec.nodeID, mq.WithDAG(tm.dag))) result = node.processor.ProcessTask(exec.ctx, mq.NewTask(exec.taskID, exec.payload, exec.nodeID, mq.WithDAG(tm.dag)))
if result.Error != nil { if result.Error != nil {
if te, ok := result.Error.(TaskError); ok && te.Recoverable { if te, ok := result.Error.(TaskError); ok && te.Recoverable {
@@ -253,7 +253,7 @@ func (tm *TaskManager) processNode(exec *task) {
} }
break break
} }
log.Printf("Tracing: End processing node %s", exec.nodeID) log.Printf("Tracing: End processing node %s on flow %s", exec.nodeID, tm.dag.key)
nodeLatency := time.Since(startTime) nodeLatency := time.Since(startTime)
// Invoke PostProcessHook if available. // Invoke PostProcessHook if available.

View File

@@ -20,17 +20,75 @@ import (
"github.com/oarkflow/mq/consts" "github.com/oarkflow/mq/consts"
) )
type ValidateLoginNode struct {
dag.Operation
}
func (v *ValidateLoginNode) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
var inputData map[string]any
if err := json.Unmarshal(task.Payload, &inputData); err != nil {
return mq.Result{Error: fmt.Errorf("invalid input data: %v", err), Ctx: ctx}
}
username, _ := inputData["username"].(string)
password, _ := inputData["password"].(string)
if username == "" || password == "" {
inputData["validation_error"] = "Username and password are required"
inputData["error_field"] = "credentials"
bt, _ := json.Marshal(inputData)
return mq.Result{Payload: bt, Ctx: ctx, ConditionStatus: "invalid"}
}
// Simulate user validation
if username == "admin" && password == "password" {
inputData["validation_status"] = "success"
} else {
inputData["validation_error"] = "Invalid username or password"
inputData["error_field"] = "credentials"
bt, _ := json.Marshal(inputData)
return mq.Result{Payload: bt, Ctx: ctx, ConditionStatus: "invalid"}
}
validatedData := map[string]any{
"username": username,
"validated_at": time.Now().Format("2006-01-02 15:04:05"),
"validation_status": "success",
}
bt, _ := json.Marshal(validatedData)
return mq.Result{Payload: bt, Ctx: ctx, ConditionStatus: "valid"}
}
func loginDAG() *dag.DAG {
flow := dag.NewDAG("Login Flow", "login-flow", func(taskID string, result mq.Result) {
fmt.Printf("Login flow completed for task %s: %s\n", taskID, string(result.Payload))
}, mq.WithSyncMode(true), mq.WithLogger(nil))
renderHTML := handlers.NewRenderHTMLNode("render-html")
renderHTML.Payload.Data = map[string]any{
"schema_file": "login.json",
}
flow.AddNode(dag.Page, "Login Form", "LoginForm", renderHTML, true)
flow.AddNode(dag.Function, "Validate Login", "ValidateLogin", &ValidateLoginNode{})
flow.AddNode(dag.Page, "Error Page", "ErrorPage", &EmailErrorPageNode{})
flow.AddEdge(dag.Simple, "Validate Login", "LoginForm", "ValidateLogin")
flow.AddCondition("ValidateLogin", map[string]string{
"invalid": "ErrorPage",
})
return flow
}
func main() { func main() {
flow := dag.NewDAG("Email Notification System", "email-notification", func(taskID string, result mq.Result) { flow := dag.NewDAG("Email Notification System", "email-notification", func(taskID string, result mq.Result) {
fmt.Printf("Email notification workflow completed for task %s: %s\n", taskID, string(utils.RemoveRecursiveFromJSON(result.Payload, "html_content"))) fmt.Printf("Email notification workflow completed for task %s: %s\n", taskID, string(utils.RemoveRecursiveFromJSON(result.Payload, "html_content")))
}, mq.WithSyncMode(true)) }, mq.WithSyncMode(true), mq.WithLogger(nil))
renderHTML := handlers.NewRenderHTMLNode("render-html") renderHTML := handlers.NewRenderHTMLNode("render-html")
renderHTML.Payload.Data = map[string]any{ renderHTML.Payload.Data = map[string]any{
"schema_file": "schema.json", "schema_file": "schema.json",
} // Add workflow nodes }
// Note: Page nodes have no timeout by default, allowing users unlimited time for form input flow.AddDAGNode(dag.Page, "CheckLogin", "Login", loginDAG(), true)
flow.AddNode(dag.Page, "Contact Form", "ContactForm", renderHTML, true) flow.AddNode(dag.Page, "Contact Form", "ContactForm", renderHTML)
flow.AddNode(dag.Function, "Validate Contact Data", "ValidateContact", &ValidateContactNode{}) flow.AddNode(dag.Function, "Validate Contact Data", "ValidateContact", &ValidateContactNode{})
flow.AddNode(dag.Function, "Check User Type", "CheckUserType", &CheckUserTypeNode{}) flow.AddNode(dag.Function, "Check User Type", "CheckUserType", &CheckUserTypeNode{})
flow.AddNode(dag.Function, "Send Welcome Email", "SendWelcomeEmail", &SendWelcomeEmailNode{}) flow.AddNode(dag.Function, "Send Welcome Email", "SendWelcomeEmail", &SendWelcomeEmailNode{})
@@ -38,7 +96,7 @@ func main() {
flow.AddNode(dag.Function, "Send Standard Email", "SendStandardEmail", &SendStandardEmailNode{}) flow.AddNode(dag.Function, "Send Standard Email", "SendStandardEmail", &SendStandardEmailNode{})
flow.AddNode(dag.Page, "Success Page", "SuccessPage", &SuccessPageNode{}) flow.AddNode(dag.Page, "Success Page", "SuccessPage", &SuccessPageNode{})
flow.AddNode(dag.Page, "Error Page", "ErrorPage", &EmailErrorPageNode{}) flow.AddNode(dag.Page, "Error Page", "ErrorPage", &EmailErrorPageNode{})
flow.AddEdge(dag.Simple, "Login to Contact", "Login", "ContactForm")
// Define conditional flow // Define conditional flow
flow.AddEdge(dag.Simple, "Form to Validation", "ContactForm", "ValidateContact") flow.AddEdge(dag.Simple, "Form to Validation", "ContactForm", "ValidateContact")
flow.AddCondition("ValidateContact", map[string]string{ flow.AddCondition("ValidateContact", map[string]string{

63
examples/login.json Normal file
View File

@@ -0,0 +1,63 @@
{
"type": "object",
"properties": {
"username": {
"type": "string",
"title": "Username or Email",
"order": 1,
"ui": {
"element": "input",
"type": "text",
"class": "form-group",
"name": "username",
"placeholder": "Enter your username or email"
}
},
"password": {
"type": "string",
"title": "Password",
"order": 2,
"ui": {
"element": "input",
"type": "password",
"class": "form-group",
"name": "password",
"placeholder": "Enter your password"
}
},
"remember_me": {
"type": "boolean",
"title": "Remember Me",
"order": 3,
"ui": {
"element": "input",
"type": "checkbox",
"class": "form-check",
"name": "remember_me"
}
}
},
"required": [ "username", "password" ],
"form": {
"class": "form-horizontal",
"action": "/process?task_id={{task_id}}&next=true",
"method": "POST",
"enctype": "application/x-www-form-urlencoded",
"groups": [
{
"title": "Login Credentials",
"fields": [ "username", "password", "remember_me" ]
}
],
"submit": {
"type": "submit",
"label": "Log In",
"class": "btn btn-primary"
},
"reset": {
"type": "reset",
"label": "Clear",
"class": "btn btn-secondary"
}
}
}

24
logger/null.go Normal file
View File

@@ -0,0 +1,24 @@
package logger
import "context"
type NullLogger struct{}
func (l *NullLogger) Debug(msg string, fields ...Field) {}
func (l *NullLogger) Info(msg string, fields ...Field) {}
func (l *NullLogger) Warn(msg string, fields ...Field) {}
func (l *NullLogger) Error(msg string, fields ...Field) {}
func NewNullLogger() *NullLogger {
return &NullLogger{}
}
func (l *NullLogger) WithFields(fields ...Field) Logger {
return l
}
func (l *NullLogger) WithContext(ctx context.Context) Logger {
return l
}

View File

@@ -161,8 +161,12 @@ func WithNotifyResponse(callback Callback) Option {
func WithLogger(log logger.Logger) Option { func WithLogger(log logger.Logger) Option {
return func(opts *Options) { return func(opts *Options) {
if log == nil {
opts.logger = logger.NewNullLogger()
} else {
opts.logger = log opts.logger = log
} }
}
} }
func WithWorkerPool(queueSize, numOfWorkers int, maxMemoryLoad int64) Option { func WithWorkerPool(queueSize, numOfWorkers int, maxMemoryLoad int64) Option {