mirror of
https://github.com/oarkflow/mq.git
synced 2025-10-26 15:50:22 +08:00
update
This commit is contained in:
@@ -5,13 +5,13 @@ import (
|
||||
"fmt"
|
||||
"os"
|
||||
"regexp"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/oarkflow/json"
|
||||
|
||||
"github.com/oarkflow/mq/dag"
|
||||
"github.com/oarkflow/mq/renderer"
|
||||
"github.com/oarkflow/mq/utils"
|
||||
|
||||
"github.com/oarkflow/jet"
|
||||
@@ -21,27 +21,17 @@ import (
|
||||
)
|
||||
|
||||
func main() {
|
||||
var contactFormSchema = map[string]any{}
|
||||
content, err := os.ReadFile("app/schema.json")
|
||||
renderer, err := renderer.GetFromFile("schema.json", "")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if err := json.Unmarshal(content, &contactFormSchema); err != nil {
|
||||
panic(fmt.Errorf("failed to parse JSON schema: %w", err))
|
||||
}
|
||||
|
||||
contactFormLayout, err := os.ReadFile("email/contact-form.html")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
flow := dag.NewDAG("Email Notification System", "email-notification", func(taskID string, result mq.Result) {
|
||||
fmt.Printf("Email notification workflow completed for task %s: %s\n", taskID, string(utils.RemoveRecursiveFromJSON(result.Payload, "html_content")))
|
||||
}, mq.WithSyncMode(true))
|
||||
|
||||
// Add workflow nodes
|
||||
// Note: Page nodes have no timeout by default, allowing users unlimited time for form input
|
||||
flow.AddNode(dag.Page, "Contact Form", "ContactForm", &ConfigurableFormNode{Schema: contactFormSchema, HTMLLayout: string(contactFormLayout)}, true)
|
||||
flow.AddNode(dag.Page, "Contact Form", "ContactForm", &JSONSchemaFormNode{renderer: renderer}, true)
|
||||
flow.AddNode(dag.Function, "Validate Contact Data", "ValidateContact", &ValidateContactNode{})
|
||||
flow.AddNode(dag.Function, "Check User Type", "CheckUserType", &CheckUserTypeNode{})
|
||||
flow.AddNode(dag.Function, "Send Welcome Email", "SendWelcomeEmail", &SendWelcomeEmailNode{})
|
||||
@@ -84,179 +74,52 @@ func main() {
|
||||
flow.Start(context.Background(), "0.0.0.0:8084")
|
||||
}
|
||||
|
||||
// ConfigurableFormNode - Page node with JSONSchema-based fields and custom HTML layout
|
||||
// JSONSchemaFormNode - Page node with JSONSchema-based fields and custom HTML layout
|
||||
// Usage: Pass JSONSchema and HTML layout to the node for dynamic form rendering and validation
|
||||
|
||||
type ConfigurableFormNode struct {
|
||||
type JSONSchemaFormNode struct {
|
||||
dag.Operation
|
||||
Schema map[string]any // JSONSchema for fields and requirements
|
||||
HTMLLayout string // HTML layout template with placeholders for fields
|
||||
fieldsCache []fieldInfo // Cached field order and definitions
|
||||
cacheInitialized bool // Whether cache is initialized
|
||||
renderer *renderer.JSONSchemaRenderer
|
||||
}
|
||||
|
||||
// fieldInfo caches field metadata for rendering
|
||||
type fieldInfo struct {
|
||||
name string
|
||||
order int
|
||||
def map[string]any
|
||||
definedIndex int // fallback to definition order
|
||||
}
|
||||
func (c *JSONSchemaFormNode) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
|
||||
if c.renderer == nil {
|
||||
schemaFile, ok := c.Payload.Data["schema_file"].(string)
|
||||
if !ok || schemaFile == "" {
|
||||
return mq.Result{Error: fmt.Errorf("schema_file is required for JSONSchemaFormNode"), Ctx: ctx}
|
||||
}
|
||||
templateFile, _ := c.Payload.Data["template_file"].(string)
|
||||
template, _ := c.Payload.Data["template"].(string)
|
||||
|
||||
func (c *ConfigurableFormNode) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
|
||||
renderer, err := renderer.GetFromFile(schemaFile, template, templateFile)
|
||||
if err != nil {
|
||||
return mq.Result{Error: fmt.Errorf("failed to get renderer from file %s: %v", schemaFile, err), Ctx: ctx}
|
||||
}
|
||||
c.renderer = renderer
|
||||
}
|
||||
var inputData map[string]any
|
||||
if task.Payload != nil && len(task.Payload) > 0 {
|
||||
if err := json.Unmarshal(task.Payload, &inputData); err == nil {
|
||||
// Validate input against schema requirements
|
||||
validationErrors := validateAgainstSchema(inputData, c.Schema)
|
||||
if len(validationErrors) > 0 {
|
||||
inputData["validation_error"] = validationErrors[0] // Show first error
|
||||
bt, _ := json.Marshal(inputData)
|
||||
return mq.Result{Payload: bt, Ctx: ctx, ConditionStatus: "invalid"}
|
||||
}
|
||||
return mq.Result{Payload: task.Payload, Ctx: ctx}
|
||||
if len(task.Payload) > 0 {
|
||||
if err := json.Unmarshal(task.Payload, &inputData); err != nil {
|
||||
return mq.Result{Payload: task.Payload, Error: err, Ctx: ctx, ConditionStatus: "invalid"}
|
||||
}
|
||||
}
|
||||
|
||||
// Initialize cache if not done
|
||||
if !c.cacheInitialized {
|
||||
c.fieldsCache = parseFieldsFromSchema(c.Schema)
|
||||
c.cacheInitialized = true
|
||||
}
|
||||
|
||||
// Render form fields from cached field order
|
||||
formFieldsHTML := renderFieldsFromCache(c.fieldsCache)
|
||||
parser := jet.NewWithMemory(jet.WithDelims("{{", "}}"))
|
||||
layout := strings.Replace(c.HTMLLayout, "{{form_fields}}", formFieldsHTML, 1)
|
||||
rs, err := parser.ParseTemplate(layout, map[string]any{
|
||||
templateData := map[string]any{
|
||||
"task_id": ctx.Value("task_id"),
|
||||
})
|
||||
}
|
||||
renderedHTML, err := c.renderer.RenderFields(templateData)
|
||||
if err != nil {
|
||||
return mq.Result{Error: err, Ctx: ctx}
|
||||
return mq.Result{Payload: task.Payload, Error: err, Ctx: ctx, ConditionStatus: "invalid"}
|
||||
}
|
||||
|
||||
ctx = context.WithValue(ctx, consts.ContentType, consts.TypeHtml)
|
||||
data := map[string]any{
|
||||
"html_content": rs,
|
||||
"html_content": renderedHTML,
|
||||
"step": "form",
|
||||
}
|
||||
bt, _ := json.Marshal(data)
|
||||
return mq.Result{Payload: bt, Ctx: ctx}
|
||||
}
|
||||
|
||||
// validateAgainstSchema checks inputData against JSONSchema requirements
|
||||
func validateAgainstSchema(inputData map[string]any, schema map[string]any) []string {
|
||||
var errors []string
|
||||
if _, ok := schema["properties"].(map[string]any); ok {
|
||||
if required, ok := schema["required"].([]any); ok {
|
||||
for _, field := range required {
|
||||
fname := field.(string)
|
||||
if val, exists := inputData[fname]; !exists || val == "" {
|
||||
errors = append(errors, fname+" is required")
|
||||
}
|
||||
}
|
||||
}
|
||||
// Add more validation as needed (type, format, etc.)
|
||||
}
|
||||
return errors
|
||||
}
|
||||
|
||||
// parseFieldsFromSchema extracts and sorts fields from schema, preserving order
|
||||
func parseFieldsFromSchema(schema map[string]any) []fieldInfo {
|
||||
var fields []fieldInfo
|
||||
if props, ok := schema["properties"].(map[string]any); ok {
|
||||
keyOrder := make([]string, 0, len(props))
|
||||
for k := range props {
|
||||
keyOrder = append(keyOrder, k)
|
||||
}
|
||||
for idx, name := range keyOrder {
|
||||
field := props[name].(map[string]any)
|
||||
order := -1
|
||||
if o, ok := field["order"].(int); ok {
|
||||
order = o
|
||||
} else if o, ok := field["order"].(float64); ok {
|
||||
order = int(o)
|
||||
}
|
||||
fields = append(fields, fieldInfo{name: name, order: order, def: field, definedIndex: idx})
|
||||
}
|
||||
if len(fields) > 1 {
|
||||
sort.SliceStable(fields, func(i, j int) bool {
|
||||
if fields[i].order != -1 && fields[j].order != -1 {
|
||||
return fields[i].order < fields[j].order
|
||||
} else if fields[i].order != -1 {
|
||||
return true
|
||||
} else if fields[j].order != -1 {
|
||||
return false
|
||||
}
|
||||
return fields[i].definedIndex < fields[j].definedIndex
|
||||
})
|
||||
}
|
||||
}
|
||||
return fields
|
||||
}
|
||||
|
||||
// renderFieldsFromCache generates HTML for form fields from cached field order
|
||||
func renderFieldsFromCache(fields []fieldInfo) string {
|
||||
var html strings.Builder
|
||||
for _, f := range fields {
|
||||
label := f.name
|
||||
if l, ok := f.def["title"].(string); ok {
|
||||
label = l
|
||||
}
|
||||
// UI config
|
||||
ui := map[string]any{}
|
||||
if uiRaw, ok := f.def["ui"].(map[string]any); ok {
|
||||
ui = uiRaw
|
||||
}
|
||||
// Control type
|
||||
controlType := "input"
|
||||
if ct, ok := ui["control"].(string); ok {
|
||||
controlType = ct
|
||||
}
|
||||
// CSS classes
|
||||
classes := "form-group"
|
||||
if cls, ok := ui["class"].(string); ok {
|
||||
classes = cls
|
||||
}
|
||||
// Name attribute
|
||||
nameAttr := f.name
|
||||
if n, ok := ui["name"].(string); ok {
|
||||
nameAttr = n
|
||||
}
|
||||
// Type
|
||||
typeStr := "text"
|
||||
if t, ok := f.def["type"].(string); ok {
|
||||
switch t {
|
||||
case "string":
|
||||
typeStr = "text"
|
||||
case "email":
|
||||
typeStr = "email"
|
||||
case "number":
|
||||
typeStr = "number"
|
||||
case "textarea":
|
||||
typeStr = "textarea"
|
||||
}
|
||||
}
|
||||
// Render control
|
||||
if controlType == "textarea" || typeStr == "textarea" {
|
||||
html.WriteString(fmt.Sprintf(`<div class="%s"><label for="%s">%s:</label><textarea id="%s" name="%s" placeholder="%s"></textarea></div>`, classes, nameAttr, label, nameAttr, nameAttr, label))
|
||||
} else if controlType == "select" {
|
||||
// Optionally support select with options in ui["options"]
|
||||
optionsHTML := ""
|
||||
if opts, ok := ui["options"].([]any); ok {
|
||||
for _, opt := range opts {
|
||||
optStr := fmt.Sprintf("%v", opt)
|
||||
optionsHTML += fmt.Sprintf(`<option value="%s">%s</option>`, optStr, optStr)
|
||||
}
|
||||
}
|
||||
html.WriteString(fmt.Sprintf(`<div class="%s"><label for="%s">%s:</label><select id="%s" name="%s">%s</select></div>`, classes, nameAttr, label, nameAttr, nameAttr, optionsHTML))
|
||||
} else {
|
||||
html.WriteString(fmt.Sprintf(`<div class="%s"><label for="%s">%s:</label><input type="%s" id="%s" name="%s" placeholder="%s"></div>`, classes, nameAttr, label, typeStr, nameAttr, nameAttr, label))
|
||||
}
|
||||
}
|
||||
return html.String()
|
||||
}
|
||||
|
||||
// ValidateContactNode - Validates contact form data
|
||||
type ValidateContactNode struct {
|
||||
dag.Operation
|
||||
|
||||
Reference in New Issue
Block a user