diff --git a/dag/operation.go b/dag/operation.go
index eb1343b..18e3565 100644
--- a/dag/operation.go
+++ b/dag/operation.go
@@ -13,7 +13,6 @@ import (
"github.com/oarkflow/dipper"
"github.com/oarkflow/errors"
"github.com/oarkflow/expr"
- "github.com/oarkflow/xid"
"golang.org/x/exp/maps"
"github.com/oarkflow/mq"
@@ -506,7 +505,7 @@ func init() {
})
expr.AddFunction("uniqueid", func(params ...interface{}) (interface{}, error) {
// create a new xid
- return xid.New().String(), nil
+ return mq.NewID(), nil
})
expr.AddFunction("now", func(params ...interface{}) (interface{}, error) {
// get the current time in UTC
diff --git a/examples/email_notification_dag.go b/examples/email_notification_dag.go
index 1259b1c..4d1326b 100644
--- a/examples/email_notification_dag.go
+++ b/examples/email_notification_dag.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"regexp"
+ "sort"
"strings"
"time"
@@ -18,13 +19,153 @@ import (
)
func main() {
+ contactFormSchema := map[string]any{
+ "properties": map[string]any{
+ "first_name": map[string]any{
+ "type": "string",
+ "title": "👤 First Name",
+ "order": 1,
+ "ui": map[string]any{
+ "control": "input",
+ "class": "form-group",
+ "name": "first_name",
+ },
+ },
+ "last_name": map[string]any{
+ "type": "string",
+ "title": "👤 Last Name",
+ "order": 2,
+ "ui": map[string]any{
+ "control": "input",
+ "class": "form-group",
+ "name": "last_name",
+ },
+ },
+ "email": map[string]any{
+ "type": "email",
+ "title": "📧 Email Address",
+ "order": 3,
+ "ui": map[string]any{
+ "control": "input",
+ "class": "form-group",
+ "name": "email",
+ },
+ },
+ "user_type": map[string]any{
+ "type": "string",
+ "title": "👥 User Type",
+ "order": 4,
+ "ui": map[string]any{
+ "control": "select",
+ "class": "form-group",
+ "name": "user_type",
+ "options": []any{"new", "premium", "standard"},
+ },
+ },
+ "priority": map[string]any{
+ "type": "string",
+ "title": "🚨 Priority Level",
+ "order": 5,
+ "ui": map[string]any{
+ "control": "select",
+ "class": "form-group",
+ "name": "priority",
+ "options": []any{"low", "medium", "high", "urgent"},
+ },
+ },
+ "subject": map[string]any{
+ "type": "string",
+ "title": "📋 Subject",
+ "order": 6,
+ "ui": map[string]any{
+ "control": "input",
+ "class": "form-group",
+ "name": "subject",
+ },
+ },
+ "message": map[string]any{
+ "type": "textarea",
+ "title": "💬 Message",
+ "order": 7,
+ "ui": map[string]any{
+ "control": "textarea",
+ "class": "form-group",
+ "name": "message",
+ },
+ },
+ },
+ "required": []any{"first_name", "last_name", "email", "user_type", "priority", "subject", "message"},
+ }
+
+ contactFormLayout := `
+
+
+
+ Contact Us - Email Notification System
+
+
+
+
+
+`
+
flow := dag.NewDAG("Email Notification System", "email-notification", func(taskID string, result mq.Result) {
fmt.Printf("Email notification workflow completed for task %s: %s\n", taskID, string(result.Payload))
- })
+ }, mq.WithSyncMode(true))
// Add workflow nodes
// Note: Page nodes have no timeout by default, allowing users unlimited time for form input
- flow.AddNode(dag.Page, "Contact Form", "ContactForm", &ContactFormNode{}, true)
+ flow.AddNode(dag.Page, "Contact Form", "ContactForm", &ConfigurableFormNode{Schema: contactFormSchema, HTMLLayout: contactFormLayout}, true)
flow.AddNode(dag.Function, "Validate Contact Data", "ValidateContact", &ValidateContactNode{})
flow.AddNode(dag.Function, "Check User Type", "CheckUserType", &CheckUserTypeNode{})
flow.AddNode(dag.Function, "Send Welcome Email", "SendWelcomeEmail", &SendWelcomeEmailNode{})
@@ -67,243 +208,51 @@ func main() {
flow.Start(context.Background(), "0.0.0.0:8084")
}
-// ContactFormNode - Contact form with validation
-type ContactFormNode struct {
+// ConfigurableFormNode - Page node with JSONSchema-based fields and custom HTML layout
+// Usage: Pass JSONSchema and HTML layout to the node for dynamic form rendering and validation
+
+type ConfigurableFormNode struct {
dag.Operation
+ Schema map[string]any // JSONSchema for fields and requirements
+ HTMLLayout string // HTML layout template with placeholders for fields
+ fieldsCache []fieldInfo // Cached field order and definitions
+ cacheInitialized bool // Whether cache is initialized
}
-func (c *ContactFormNode) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
- // Check if this is a form submission
+// fieldInfo caches field metadata for rendering
+type fieldInfo struct {
+ name string
+ order int
+ def map[string]any
+ definedIndex int // fallback to definition order
+}
+
+func (c *ConfigurableFormNode) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
var inputData map[string]any
if task.Payload != nil && len(task.Payload) > 0 {
if err := json.Unmarshal(task.Payload, &inputData); err == nil {
- // If we have valid input data, pass it through for validation
+ // Validate input against schema requirements
+ validationErrors := validateAgainstSchema(inputData, c.Schema)
+ if len(validationErrors) > 0 {
+ inputData["validation_error"] = validationErrors[0] // Show first error
+ bt, _ := json.Marshal(inputData)
+ return mq.Result{Payload: bt, Ctx: ctx, ConditionStatus: "invalid"}
+ }
return mq.Result{Payload: task.Payload, Ctx: ctx}
}
}
- // Otherwise, show the form
- htmlTemplate := `
-
-
-
- Contact Us - Email Notification System
-
-
-
-
-
-`
+ // Initialize cache if not done
+ if !c.cacheInitialized {
+ c.fieldsCache = parseFieldsFromSchema(c.Schema)
+ c.cacheInitialized = true
+ }
+ // Render form fields from cached field order
+ formFieldsHTML := renderFieldsFromCache(c.fieldsCache)
parser := jet.NewWithMemory(jet.WithDelims("{{", "}}"))
- rs, err := parser.ParseTemplate(htmlTemplate, map[string]any{
+ layout := strings.Replace(c.HTMLLayout, "{{form_fields}}", formFieldsHTML, 1)
+ rs, err := parser.ParseTemplate(layout, map[string]any{
"task_id": ctx.Value("task_id"),
})
if err != nil {
@@ -319,6 +268,119 @@ func (c *ContactFormNode) ProcessTask(ctx context.Context, task *mq.Task) mq.Res
return mq.Result{Payload: bt, Ctx: ctx}
}
+// validateAgainstSchema checks inputData against JSONSchema requirements
+func validateAgainstSchema(inputData map[string]any, schema map[string]any) []string {
+ var errors []string
+ if _, ok := schema["properties"].(map[string]any); ok {
+ if required, ok := schema["required"].([]any); ok {
+ for _, field := range required {
+ fname := field.(string)
+ if val, exists := inputData[fname]; !exists || val == "" {
+ errors = append(errors, fname+" is required")
+ }
+ }
+ }
+ // Add more validation as needed (type, format, etc.)
+ }
+ return errors
+}
+
+// parseFieldsFromSchema extracts and sorts fields from schema, preserving order
+func parseFieldsFromSchema(schema map[string]any) []fieldInfo {
+ var fields []fieldInfo
+ if props, ok := schema["properties"].(map[string]any); ok {
+ keyOrder := make([]string, 0, len(props))
+ for k := range props {
+ keyOrder = append(keyOrder, k)
+ }
+ for idx, name := range keyOrder {
+ field := props[name].(map[string]any)
+ order := -1
+ if o, ok := field["order"].(int); ok {
+ order = o
+ } else if o, ok := field["order"].(float64); ok {
+ order = int(o)
+ }
+ fields = append(fields, fieldInfo{name: name, order: order, def: field, definedIndex: idx})
+ }
+ if len(fields) > 1 {
+ sort.SliceStable(fields, func(i, j int) bool {
+ if fields[i].order != -1 && fields[j].order != -1 {
+ return fields[i].order < fields[j].order
+ } else if fields[i].order != -1 {
+ return true
+ } else if fields[j].order != -1 {
+ return false
+ }
+ return fields[i].definedIndex < fields[j].definedIndex
+ })
+ }
+ }
+ return fields
+}
+
+// renderFieldsFromCache generates HTML for form fields from cached field order
+func renderFieldsFromCache(fields []fieldInfo) string {
+ var html strings.Builder
+ for _, f := range fields {
+ label := f.name
+ if l, ok := f.def["title"].(string); ok {
+ label = l
+ }
+ // UI config
+ ui := map[string]any{}
+ if uiRaw, ok := f.def["ui"].(map[string]any); ok {
+ ui = uiRaw
+ }
+ // Control type
+ controlType := "input"
+ if ct, ok := ui["control"].(string); ok {
+ controlType = ct
+ }
+ // CSS classes
+ classes := "form-group"
+ if cls, ok := ui["class"].(string); ok {
+ classes = cls
+ }
+ // Name attribute
+ nameAttr := f.name
+ if n, ok := ui["name"].(string); ok {
+ nameAttr = n
+ }
+ // Type
+ typeStr := "text"
+ if t, ok := f.def["type"].(string); ok {
+ switch t {
+ case "string":
+ typeStr = "text"
+ case "email":
+ typeStr = "email"
+ case "number":
+ typeStr = "number"
+ case "textarea":
+ typeStr = "textarea"
+ }
+ }
+ // Render control
+ if controlType == "textarea" || typeStr == "textarea" {
+ html.WriteString(fmt.Sprintf(``, classes, nameAttr, label, nameAttr, nameAttr, label))
+ } else if controlType == "select" {
+ // Optionally support select with options in ui["options"]
+ optionsHTML := ""
+ if opts, ok := ui["options"].([]any); ok {
+ for _, opt := range opts {
+ optStr := fmt.Sprintf("%v", opt)
+ optionsHTML += fmt.Sprintf(``, optStr, optStr)
+ }
+ }
+ html.WriteString(fmt.Sprintf(``, classes, nameAttr, label, nameAttr, nameAttr, optionsHTML))
+ } else {
+ html.WriteString(fmt.Sprintf(``, classes, nameAttr, label, typeStr, nameAttr, nameAttr, label))
+ }
+ }
+ return html.String()
+}
+
// ValidateContactNode - Validates contact form data
type ValidateContactNode struct {
dag.Operation
diff --git a/go.mod b/go.mod
index ab50191..ee309d9 100644
--- a/go.mod
+++ b/go.mod
@@ -13,7 +13,7 @@ require (
github.com/oarkflow/jet v0.0.4
github.com/oarkflow/json v0.0.21
github.com/oarkflow/log v1.0.79
- github.com/oarkflow/xid v1.2.5
+ github.com/oarkflow/xid v1.2.8
github.com/prometheus/client_golang v1.21.1
golang.org/x/exp v0.0.0-20250305212735-054e65f0b394
golang.org/x/time v0.11.0
diff --git a/go.sum b/go.sum
index 4830dda..2706121 100644
--- a/go.sum
+++ b/go.sum
@@ -44,8 +44,8 @@ github.com/oarkflow/json v0.0.21 h1:tBx4ufwC48UAd3fUCqLVH/dERpnZ85Dgw5/h7H2HMoM=
github.com/oarkflow/json v0.0.21/go.mod h1:maoLmQZJ/8pF1MugtpVqzHJ59dH1Z7xFSNkhl9BQjYo=
github.com/oarkflow/log v1.0.79 h1:DxhtkBGG+pUu6cudSVw5g75FbKEQJkij5w7n5AEN00M=
github.com/oarkflow/log v1.0.79/go.mod h1:U/4chr1DyOiQvS6JiQpjYTCJhK7RGR8xrXPsGlouLzM=
-github.com/oarkflow/xid v1.2.5 h1:6RcNJm9+oZ/B647gkME9trCzhpxGQaSdNoD56Vmkeho=
-github.com/oarkflow/xid v1.2.5/go.mod h1:jG4YBh+swbjlWApGWDBYnsJEa7hi3CCpmuqhB3RAxVo=
+github.com/oarkflow/xid v1.2.8 h1:uCIX61Binq2RPMsqImZM6pPGzoZTmRyD6jguxF9aAA0=
+github.com/oarkflow/xid v1.2.8/go.mod h1:jG4YBh+swbjlWApGWDBYnsJEa7hi3CCpmuqhB3RAxVo=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v1.21.1 h1:DOvXXTqVzvkIewV/CDPFdejpMCGeMcbGCQ8YOmu+Ibk=
diff --git a/scheduler.go b/scheduler.go
index 5beda7c..5e13f43 100644
--- a/scheduler.go
+++ b/scheduler.go
@@ -12,7 +12,6 @@ import (
"time"
"github.com/oarkflow/log"
- "github.com/oarkflow/xid"
"github.com/oarkflow/mq/storage"
"github.com/oarkflow/mq/storage/memory"
@@ -585,7 +584,7 @@ func (s *Scheduler) AddTask(ctx context.Context, payload *Task, opts ...Schedule
}
stop := make(chan struct{})
newTask := &ScheduledTask{
- id: xid.New().String(),
+ id: NewID(),
ctx: ctx,
handler: options.Handler,
payload: payload,