From 295eec923c90b7e2435a1ef2962d25f644cc6f68 Mon Sep 17 00:00:00 2001 From: sujit Date: Thu, 31 Jul 2025 14:27:27 +0545 Subject: [PATCH] feat: add task completion --- dag/operation.go | 3 +- examples/email_notification_dag.go | 516 ++++++++++++++++------------- go.mod | 2 +- go.sum | 4 +- scheduler.go | 3 +- 5 files changed, 294 insertions(+), 234 deletions(-) diff --git a/dag/operation.go b/dag/operation.go index eb1343b..18e3565 100644 --- a/dag/operation.go +++ b/dag/operation.go @@ -13,7 +13,6 @@ import ( "github.com/oarkflow/dipper" "github.com/oarkflow/errors" "github.com/oarkflow/expr" - "github.com/oarkflow/xid" "golang.org/x/exp/maps" "github.com/oarkflow/mq" @@ -506,7 +505,7 @@ func init() { }) expr.AddFunction("uniqueid", func(params ...interface{}) (interface{}, error) { // create a new xid - return xid.New().String(), nil + return mq.NewID(), nil }) expr.AddFunction("now", func(params ...interface{}) (interface{}, error) { // get the current time in UTC diff --git a/examples/email_notification_dag.go b/examples/email_notification_dag.go index 1259b1c..4d1326b 100644 --- a/examples/email_notification_dag.go +++ b/examples/email_notification_dag.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "regexp" + "sort" "strings" "time" @@ -18,13 +19,153 @@ import ( ) func main() { + contactFormSchema := map[string]any{ + "properties": map[string]any{ + "first_name": map[string]any{ + "type": "string", + "title": "👤 First Name", + "order": 1, + "ui": map[string]any{ + "control": "input", + "class": "form-group", + "name": "first_name", + }, + }, + "last_name": map[string]any{ + "type": "string", + "title": "👤 Last Name", + "order": 2, + "ui": map[string]any{ + "control": "input", + "class": "form-group", + "name": "last_name", + }, + }, + "email": map[string]any{ + "type": "email", + "title": "📧 Email Address", + "order": 3, + "ui": map[string]any{ + "control": "input", + "class": "form-group", + "name": "email", + }, + }, + "user_type": map[string]any{ + "type": "string", + "title": "👥 User Type", + "order": 4, + "ui": map[string]any{ + "control": "select", + "class": "form-group", + "name": "user_type", + "options": []any{"new", "premium", "standard"}, + }, + }, + "priority": map[string]any{ + "type": "string", + "title": "🚨 Priority Level", + "order": 5, + "ui": map[string]any{ + "control": "select", + "class": "form-group", + "name": "priority", + "options": []any{"low", "medium", "high", "urgent"}, + }, + }, + "subject": map[string]any{ + "type": "string", + "title": "📋 Subject", + "order": 6, + "ui": map[string]any{ + "control": "input", + "class": "form-group", + "name": "subject", + }, + }, + "message": map[string]any{ + "type": "textarea", + "title": "💬 Message", + "order": 7, + "ui": map[string]any{ + "control": "textarea", + "class": "form-group", + "name": "message", + }, + }, + }, + "required": []any{"first_name", "last_name", "email", "user_type", "priority", "subject", "message"}, + } + + contactFormLayout := ` + + + + Contact Us - Email Notification System + + + +
+

📧 Contact Us

+
Advanced Email Notification System with DAG Workflow
+ +
+

🔄 Smart Routing: Our system automatically routes your message based on your user type and preferences.

+
+ +
+
+ 📱 Instant Notifications
+ Real-time email delivery +
+
+ 🎯 Smart Targeting
+ User-specific content +
+
+ 🔒 Secure Processing
+ Enterprise-grade security +
+
+ +
+
+ {{form_fields}} +
+ + +
+
+ +` + flow := dag.NewDAG("Email Notification System", "email-notification", func(taskID string, result mq.Result) { fmt.Printf("Email notification workflow completed for task %s: %s\n", taskID, string(result.Payload)) - }) + }, mq.WithSyncMode(true)) // Add workflow nodes // Note: Page nodes have no timeout by default, allowing users unlimited time for form input - flow.AddNode(dag.Page, "Contact Form", "ContactForm", &ContactFormNode{}, true) + flow.AddNode(dag.Page, "Contact Form", "ContactForm", &ConfigurableFormNode{Schema: contactFormSchema, HTMLLayout: contactFormLayout}, true) flow.AddNode(dag.Function, "Validate Contact Data", "ValidateContact", &ValidateContactNode{}) flow.AddNode(dag.Function, "Check User Type", "CheckUserType", &CheckUserTypeNode{}) flow.AddNode(dag.Function, "Send Welcome Email", "SendWelcomeEmail", &SendWelcomeEmailNode{}) @@ -67,243 +208,51 @@ func main() { flow.Start(context.Background(), "0.0.0.0:8084") } -// ContactFormNode - Contact form with validation -type ContactFormNode struct { +// ConfigurableFormNode - Page node with JSONSchema-based fields and custom HTML layout +// Usage: Pass JSONSchema and HTML layout to the node for dynamic form rendering and validation + +type ConfigurableFormNode struct { dag.Operation + Schema map[string]any // JSONSchema for fields and requirements + HTMLLayout string // HTML layout template with placeholders for fields + fieldsCache []fieldInfo // Cached field order and definitions + cacheInitialized bool // Whether cache is initialized } -func (c *ContactFormNode) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { - // Check if this is a form submission +// fieldInfo caches field metadata for rendering +type fieldInfo struct { + name string + order int + def map[string]any + definedIndex int // fallback to definition order +} + +func (c *ConfigurableFormNode) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { var inputData map[string]any if task.Payload != nil && len(task.Payload) > 0 { if err := json.Unmarshal(task.Payload, &inputData); err == nil { - // If we have valid input data, pass it through for validation + // Validate input against schema requirements + validationErrors := validateAgainstSchema(inputData, c.Schema) + if len(validationErrors) > 0 { + inputData["validation_error"] = validationErrors[0] // Show first error + bt, _ := json.Marshal(inputData) + return mq.Result{Payload: bt, Ctx: ctx, ConditionStatus: "invalid"} + } return mq.Result{Payload: task.Payload, Ctx: ctx} } } - // Otherwise, show the form - htmlTemplate := ` - - - - Contact Us - Email Notification System - - - -
-

📧 Contact Us

-
Advanced Email Notification System with DAG Workflow
- -
-

🔄 Smart Routing: Our system automatically routes your message based on your user type and preferences.

-
- -
-
- 📱 Instant Notifications
- Real-time email delivery -
-
- 🎯 Smart Targeting
- User-specific content -
-
- 🔒 Secure Processing
- Enterprise-grade security -
-
- -
-
-
- - -
-
- - -
-
- -
- - -
- -
-
- - -
-
- - -
-
- -
- - -
- -
- - -
- - -
-
- -` + // Initialize cache if not done + if !c.cacheInitialized { + c.fieldsCache = parseFieldsFromSchema(c.Schema) + c.cacheInitialized = true + } + // Render form fields from cached field order + formFieldsHTML := renderFieldsFromCache(c.fieldsCache) parser := jet.NewWithMemory(jet.WithDelims("{{", "}}")) - rs, err := parser.ParseTemplate(htmlTemplate, map[string]any{ + layout := strings.Replace(c.HTMLLayout, "{{form_fields}}", formFieldsHTML, 1) + rs, err := parser.ParseTemplate(layout, map[string]any{ "task_id": ctx.Value("task_id"), }) if err != nil { @@ -319,6 +268,119 @@ func (c *ContactFormNode) ProcessTask(ctx context.Context, task *mq.Task) mq.Res return mq.Result{Payload: bt, Ctx: ctx} } +// validateAgainstSchema checks inputData against JSONSchema requirements +func validateAgainstSchema(inputData map[string]any, schema map[string]any) []string { + var errors []string + if _, ok := schema["properties"].(map[string]any); ok { + if required, ok := schema["required"].([]any); ok { + for _, field := range required { + fname := field.(string) + if val, exists := inputData[fname]; !exists || val == "" { + errors = append(errors, fname+" is required") + } + } + } + // Add more validation as needed (type, format, etc.) + } + return errors +} + +// parseFieldsFromSchema extracts and sorts fields from schema, preserving order +func parseFieldsFromSchema(schema map[string]any) []fieldInfo { + var fields []fieldInfo + if props, ok := schema["properties"].(map[string]any); ok { + keyOrder := make([]string, 0, len(props)) + for k := range props { + keyOrder = append(keyOrder, k) + } + for idx, name := range keyOrder { + field := props[name].(map[string]any) + order := -1 + if o, ok := field["order"].(int); ok { + order = o + } else if o, ok := field["order"].(float64); ok { + order = int(o) + } + fields = append(fields, fieldInfo{name: name, order: order, def: field, definedIndex: idx}) + } + if len(fields) > 1 { + sort.SliceStable(fields, func(i, j int) bool { + if fields[i].order != -1 && fields[j].order != -1 { + return fields[i].order < fields[j].order + } else if fields[i].order != -1 { + return true + } else if fields[j].order != -1 { + return false + } + return fields[i].definedIndex < fields[j].definedIndex + }) + } + } + return fields +} + +// renderFieldsFromCache generates HTML for form fields from cached field order +func renderFieldsFromCache(fields []fieldInfo) string { + var html strings.Builder + for _, f := range fields { + label := f.name + if l, ok := f.def["title"].(string); ok { + label = l + } + // UI config + ui := map[string]any{} + if uiRaw, ok := f.def["ui"].(map[string]any); ok { + ui = uiRaw + } + // Control type + controlType := "input" + if ct, ok := ui["control"].(string); ok { + controlType = ct + } + // CSS classes + classes := "form-group" + if cls, ok := ui["class"].(string); ok { + classes = cls + } + // Name attribute + nameAttr := f.name + if n, ok := ui["name"].(string); ok { + nameAttr = n + } + // Type + typeStr := "text" + if t, ok := f.def["type"].(string); ok { + switch t { + case "string": + typeStr = "text" + case "email": + typeStr = "email" + case "number": + typeStr = "number" + case "textarea": + typeStr = "textarea" + } + } + // Render control + if controlType == "textarea" || typeStr == "textarea" { + html.WriteString(fmt.Sprintf(`
`, classes, nameAttr, label, nameAttr, nameAttr, label)) + } else if controlType == "select" { + // Optionally support select with options in ui["options"] + optionsHTML := "" + if opts, ok := ui["options"].([]any); ok { + for _, opt := range opts { + optStr := fmt.Sprintf("%v", opt) + optionsHTML += fmt.Sprintf(``, optStr, optStr) + } + } + html.WriteString(fmt.Sprintf(`
`, classes, nameAttr, label, nameAttr, nameAttr, optionsHTML)) + } else { + html.WriteString(fmt.Sprintf(`
`, classes, nameAttr, label, typeStr, nameAttr, nameAttr, label)) + } + } + return html.String() +} + // ValidateContactNode - Validates contact form data type ValidateContactNode struct { dag.Operation diff --git a/go.mod b/go.mod index ab50191..ee309d9 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/oarkflow/jet v0.0.4 github.com/oarkflow/json v0.0.21 github.com/oarkflow/log v1.0.79 - github.com/oarkflow/xid v1.2.5 + github.com/oarkflow/xid v1.2.8 github.com/prometheus/client_golang v1.21.1 golang.org/x/exp v0.0.0-20250305212735-054e65f0b394 golang.org/x/time v0.11.0 diff --git a/go.sum b/go.sum index 4830dda..2706121 100644 --- a/go.sum +++ b/go.sum @@ -44,8 +44,8 @@ github.com/oarkflow/json v0.0.21 h1:tBx4ufwC48UAd3fUCqLVH/dERpnZ85Dgw5/h7H2HMoM= github.com/oarkflow/json v0.0.21/go.mod h1:maoLmQZJ/8pF1MugtpVqzHJ59dH1Z7xFSNkhl9BQjYo= github.com/oarkflow/log v1.0.79 h1:DxhtkBGG+pUu6cudSVw5g75FbKEQJkij5w7n5AEN00M= github.com/oarkflow/log v1.0.79/go.mod h1:U/4chr1DyOiQvS6JiQpjYTCJhK7RGR8xrXPsGlouLzM= -github.com/oarkflow/xid v1.2.5 h1:6RcNJm9+oZ/B647gkME9trCzhpxGQaSdNoD56Vmkeho= -github.com/oarkflow/xid v1.2.5/go.mod h1:jG4YBh+swbjlWApGWDBYnsJEa7hi3CCpmuqhB3RAxVo= +github.com/oarkflow/xid v1.2.8 h1:uCIX61Binq2RPMsqImZM6pPGzoZTmRyD6jguxF9aAA0= +github.com/oarkflow/xid v1.2.8/go.mod h1:jG4YBh+swbjlWApGWDBYnsJEa7hi3CCpmuqhB3RAxVo= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v1.21.1 h1:DOvXXTqVzvkIewV/CDPFdejpMCGeMcbGCQ8YOmu+Ibk= diff --git a/scheduler.go b/scheduler.go index 5beda7c..5e13f43 100644 --- a/scheduler.go +++ b/scheduler.go @@ -12,7 +12,6 @@ import ( "time" "github.com/oarkflow/log" - "github.com/oarkflow/xid" "github.com/oarkflow/mq/storage" "github.com/oarkflow/mq/storage/memory" @@ -585,7 +584,7 @@ func (s *Scheduler) AddTask(ctx context.Context, payload *Task, opts ...Schedule } stop := make(chan struct{}) newTask := &ScheduledTask{ - id: xid.New().String(), + id: NewID(), ctx: ctx, handler: options.Handler, payload: payload,