diff --git a/examples/email_notification_dag.go b/examples/email_notification_dag.go index c38dccd..107c4e3 100644 --- a/examples/email_notification_dag.go +++ b/examples/email_notification_dag.go @@ -1,10 +1,8 @@ package main import ( - "bytes" "context" "fmt" - "html/template" "os" "regexp" "strings" @@ -13,7 +11,7 @@ import ( "github.com/oarkflow/json" "github.com/oarkflow/mq/dag" - "github.com/oarkflow/mq/renderer" + "github.com/oarkflow/mq/handlers" "github.com/oarkflow/mq/utils" "github.com/oarkflow/jet" @@ -23,17 +21,16 @@ import ( ) func main() { - renderer, err := renderer.GetFromFile("schema.json", "") - if err != nil { - panic(err) - } flow := dag.NewDAG("Email Notification System", "email-notification", func(taskID string, result mq.Result) { fmt.Printf("Email notification workflow completed for task %s: %s\n", taskID, string(utils.RemoveRecursiveFromJSON(result.Payload, "html_content"))) }, mq.WithSyncMode(true)) - // Add workflow nodes + renderHTML := handlers.NewRenderHTMLNode("render-html") + renderHTML.Payload.Data = map[string]any{ + "schema_file": "schema.json", + } // Add workflow nodes // Note: Page nodes have no timeout by default, allowing users unlimited time for form input - flow.AddNode(dag.Page, "Contact Form", "ContactForm", &RenderHTMLNode{renderer: renderer}, true) + flow.AddNode(dag.Page, "Contact Form", "ContactForm", renderHTML, true) flow.AddNode(dag.Function, "Validate Contact Data", "ValidateContact", &ValidateContactNode{}) flow.AddNode(dag.Function, "Check User Type", "CheckUserType", &CheckUserTypeNode{}) flow.AddNode(dag.Function, "Send Welcome Email", "SendWelcomeEmail", &SendWelcomeEmailNode{}) @@ -79,109 +76,6 @@ func main() { // RenderHTMLNode - Page node with JSONSchema-based fields and custom HTML layout // Usage: Pass JSONSchema and HTML layout to the node for dynamic form rendering and validation -type RenderHTMLNode struct { - dag.Operation - renderer *renderer.JSONSchemaRenderer -} - -func (c *RenderHTMLNode) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { - data := c.Payload.Data - var ( - schemaFile, _ = data["schema_file"].(string) - templateStr, _ = data["template"].(string) - templateFile, _ = data["template_file"].(string) - ) - - var templateData map[string]any - if len(task.Payload) > 0 { - if err := json.Unmarshal(task.Payload, &templateData); err != nil { - return mq.Result{Payload: task.Payload, Error: err, Ctx: ctx, ConditionStatus: "invalid"} - } - } - if templateData == nil { - templateData = make(map[string]any) - } - templateData["task_id"] = ctx.Value("task_id") - - var renderedHTML string - var err error - - switch { - // 1. JSONSchema + HTML Template - case schemaFile != "" && templateStr != "": - if c.renderer == nil { - renderer, err := renderer.GetFromFile(schemaFile, templateStr) - if err != nil { - return mq.Result{Error: fmt.Errorf("failed to get renderer from file %s: %v", schemaFile, err), Ctx: ctx} - } - c.renderer = renderer - } - renderedHTML, err = c.renderer.RenderFields(templateData) - // 2. JSONSchema + HTML File - case schemaFile != "" && templateFile != "": - if c.renderer == nil { - renderer, err := renderer.GetFromFile(schemaFile, "", templateFile) - if err != nil { - return mq.Result{Error: fmt.Errorf("failed to get renderer from file %s: %v", schemaFile, err), Ctx: ctx} - } - c.renderer = renderer - } - renderedHTML, err = c.renderer.RenderFields(templateData) - // 3. Only JSONSchema - case schemaFile != "" || c.renderer != nil: - if c.renderer == nil { - renderer, err := renderer.GetFromFile(schemaFile, "") - if err != nil { - return mq.Result{Error: fmt.Errorf("failed to get renderer from file %s: %v", schemaFile, err), Ctx: ctx} - } - c.renderer = renderer - } - renderedHTML, err = c.renderer.RenderFields(templateData) - // 4. Only HTML Template - case templateStr != "": - tmpl, err := template.New("inline").Parse(templateStr) - if err != nil { - return mq.Result{Error: fmt.Errorf("failed to parse template: %v", err), Ctx: ctx} - } - var buf bytes.Buffer - err = tmpl.Execute(&buf, templateData) - if err != nil { - return mq.Result{Error: fmt.Errorf("failed to execute template: %v", err), Ctx: ctx} - } - renderedHTML = buf.String() - // 5. Only HTML File - case templateFile != "": - fileContent, err := os.ReadFile(templateFile) - if err != nil { - return mq.Result{Error: fmt.Errorf("failed to read template file: %v", err), Ctx: ctx} - } - tmpl, err := template.New("file").Parse(string(fileContent)) - if err != nil { - return mq.Result{Error: fmt.Errorf("failed to parse template file: %v", err), Ctx: ctx} - } - var buf bytes.Buffer - err = tmpl.Execute(&buf, templateData) - if err != nil { - return mq.Result{Error: fmt.Errorf("failed to execute template file: %v", err), Ctx: ctx} - } - renderedHTML = buf.String() - default: - return mq.Result{Error: fmt.Errorf("no valid rendering approach found"), Ctx: ctx} - } - - if err != nil { - return mq.Result{Payload: task.Payload, Error: err, Ctx: ctx, ConditionStatus: "invalid"} - } - - ctx = context.WithValue(ctx, consts.ContentType, consts.TypeHtml) - resultData := map[string]any{ - "html_content": renderedHTML, - "step": "form", - } - bt, _ := json.Marshal(resultData) - return mq.Result{Payload: bt, Ctx: ctx} -} - // ValidateContactNode - Validates contact form data type ValidateContactNode struct { dag.Operation diff --git a/handlers/common_handler.go b/handlers/common_handler.go new file mode 100644 index 0000000..a616e96 --- /dev/null +++ b/handlers/common_handler.go @@ -0,0 +1,63 @@ +package handlers + +import ( + "context" + "fmt" + + "github.com/oarkflow/json" + "github.com/oarkflow/mq" + "github.com/oarkflow/mq/dag" +) + +type Loop struct { + dag.Operation +} + +func (e *Loop) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + fmt.Println("Loop Data") + return mq.Result{Payload: task.Payload, Ctx: ctx} +} + +func NewLoop(id string) *Loop { + return &Loop{ + Operation: dag.Operation{ID: id, Key: "loop", Type: dag.Function, Tags: []string{"built-in"}}, + } +} + +var defaultKey = "default" + +type Condition struct { + dag.Operation + conditions map[string]dag.Condition +} + +func (e *Condition) SetConditions(conditions map[string]dag.Condition) { + e.conditions = conditions +} + +func (e *Condition) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + var data map[string]any + err := json.Unmarshal(task.Payload, &data) + if err != nil { + panic(err) + } + var conditionStatus string + _, ok := e.conditions[defaultKey] + for status, condition := range e.conditions { + if status != defaultKey { + if condition.Match(data) { + conditionStatus = status + } + } + } + if conditionStatus == "" && ok { + conditionStatus = defaultKey + } + return mq.Result{Payload: task.Payload, ConditionStatus: conditionStatus, Ctx: ctx} +} + +func NewCondition(id string) *Condition { + return &Condition{ + Operation: dag.Operation{ID: id, Key: "condition", Type: dag.Function, Tags: []string{"built-in"}}, + } +} diff --git a/handlers/html_handler.go b/handlers/html_handler.go new file mode 100644 index 0000000..f31dd28 --- /dev/null +++ b/handlers/html_handler.go @@ -0,0 +1,122 @@ +package handlers + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "html/template" + "os" + + "github.com/oarkflow/mq" + "github.com/oarkflow/mq/consts" + "github.com/oarkflow/mq/dag" + "github.com/oarkflow/mq/renderer" +) + +type RenderHTMLNode struct { + dag.Operation + renderer *renderer.JSONSchemaRenderer +} + +func (c *RenderHTMLNode) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + data := c.Payload.Data + var ( + schemaFile, _ = data["schema_file"].(string) + templateStr, _ = data["template"].(string) + templateFile, _ = data["template_file"].(string) + ) + + var templateData map[string]any + if len(task.Payload) > 0 { + if err := json.Unmarshal(task.Payload, &templateData); err != nil { + return mq.Result{Payload: task.Payload, Error: err, Ctx: ctx, ConditionStatus: "invalid"} + } + } + if templateData == nil { + templateData = make(map[string]any) + } + templateData["task_id"] = ctx.Value("task_id") + + var renderedHTML string + var err error + + switch { + // 1. JSONSchema + HTML Template + case schemaFile != "" && templateStr != "": + if c.renderer == nil { + renderer, err := renderer.GetFromFile(schemaFile, templateStr) + if err != nil { + return mq.Result{Error: fmt.Errorf("failed to get renderer from file %s: %v", schemaFile, err), Ctx: ctx} + } + c.renderer = renderer + } + renderedHTML, err = c.renderer.RenderFields(templateData) + // 2. JSONSchema + HTML File + case schemaFile != "" && templateFile != "": + if c.renderer == nil { + renderer, err := renderer.GetFromFile(schemaFile, "", templateFile) + if err != nil { + return mq.Result{Error: fmt.Errorf("failed to get renderer from file %s: %v", schemaFile, err), Ctx: ctx} + } + c.renderer = renderer + } + renderedHTML, err = c.renderer.RenderFields(templateData) + // 3. Only JSONSchema + case schemaFile != "" || c.renderer != nil: + if c.renderer == nil { + renderer, err := renderer.GetFromFile(schemaFile, "") + if err != nil { + return mq.Result{Error: fmt.Errorf("failed to get renderer from file %s: %v", schemaFile, err), Ctx: ctx} + } + c.renderer = renderer + } + renderedHTML, err = c.renderer.RenderFields(templateData) + // 4. Only HTML Template + case templateStr != "": + tmpl, err := template.New("inline").Parse(templateStr) + if err != nil { + return mq.Result{Error: fmt.Errorf("failed to parse template: %v", err), Ctx: ctx} + } + var buf bytes.Buffer + err = tmpl.Execute(&buf, templateData) + if err != nil { + return mq.Result{Error: fmt.Errorf("failed to execute template: %v", err), Ctx: ctx} + } + renderedHTML = buf.String() + // 5. Only HTML File + case templateFile != "": + fileContent, err := os.ReadFile(templateFile) + if err != nil { + return mq.Result{Error: fmt.Errorf("failed to read template file: %v", err), Ctx: ctx} + } + tmpl, err := template.New("file").Parse(string(fileContent)) + if err != nil { + return mq.Result{Error: fmt.Errorf("failed to parse template file: %v", err), Ctx: ctx} + } + var buf bytes.Buffer + err = tmpl.Execute(&buf, templateData) + if err != nil { + return mq.Result{Error: fmt.Errorf("failed to execute template file: %v", err), Ctx: ctx} + } + renderedHTML = buf.String() + default: + return mq.Result{Error: fmt.Errorf("no valid rendering approach found"), Ctx: ctx} + } + + if err != nil { + return mq.Result{Payload: task.Payload, Error: err, Ctx: ctx, ConditionStatus: "invalid"} + } + + ctx = context.WithValue(ctx, consts.ContentType, consts.TypeHtml) + resultData := map[string]any{ + "html_content": renderedHTML, + "step": "form", + } + bt, _ := json.Marshal(resultData) + return mq.Result{Payload: bt, Ctx: ctx} +} + +func NewRenderHTMLNode(id string) *RenderHTMLNode { + return &RenderHTMLNode{Operation: dag.Operation{Key: "render-html", ID: id, Type: dag.Page, Tags: []string{"built-in"}}} +} diff --git a/handlers/log_handler.go b/handlers/log_handler.go new file mode 100644 index 0000000..00278f1 --- /dev/null +++ b/handlers/log_handler.go @@ -0,0 +1,61 @@ +package handlers + +import ( + "context" + "fmt" + "github.com/oarkflow/json" + "github.com/oarkflow/mq" + "github.com/oarkflow/mq/dag" + + "github.com/oarkflow/log" +) + +type LogHandler struct { + dag.Operation +} + +func (p *LogHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + var row map[string]any + data := task.Payload + if data != nil { + err := json.Unmarshal(data, &row) + if err != nil { + return mq.Result{ + Ctx: ctx, + Error: err, + } + } + } + var msg string + toReturn := make(map[string]any) + for k, v := range p.Payload.Mapping { + _, val := dag.GetVal(ctx, v, row) + toReturn[k] = val + } + if val, exist := p.Payload.Data["message"]; exist { + _, v := dag.GetVal(ctx, val.(string), toReturn) + if v != nil { + msg = v.(string) + } else { + msg = val.(string) + } + } + logger := log.Info() + if len(toReturn) > 0 { + for k, v := range toReturn { + logger = logger.Any(k, v) + } + } + logger.Msg(msg) + if _, exist := p.Payload.Data["print"]; exist { + fmt.Println(toReturn, msg) + } + return mq.Result{ + Ctx: ctx, + Payload: task.Payload, + } +} + +func NewLogHandler(id string) *LogHandler { + return &LogHandler{Operation: dag.Operation{Key: "log", ID: id, Type: dag.Function, Tags: []string{"built-in"}}} +} diff --git a/handlers/print_handler.go b/handlers/print_handler.go new file mode 100644 index 0000000..dc56a7a --- /dev/null +++ b/handlers/print_handler.go @@ -0,0 +1,23 @@ +package handlers + +import ( + "context" + + "github.com/oarkflow/mq" + "github.com/oarkflow/mq/dag" +) + +type PrintHandler struct { + dag.Operation +} + +func (e *PrintHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + e.Debug(ctx, task) + return mq.Result{Payload: task.Payload, Ctx: ctx} +} + +func NewPrintHandler(id string) *PrintHandler { + return &PrintHandler{ + Operation: dag.Operation{ID: id, Key: "print", Type: dag.Function}, + } +} diff --git a/handlers/start_handler.go b/handlers/start_handler.go new file mode 100644 index 0000000..cbf6842 --- /dev/null +++ b/handlers/start_handler.go @@ -0,0 +1,22 @@ +package handlers + +import ( + "context" + + "github.com/oarkflow/mq" + "github.com/oarkflow/mq/dag" +) + +type StartHandler struct { + dag.Operation +} + +func (e *StartHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + return mq.Result{Payload: task.Payload, Ctx: ctx} +} + +func NewStartHandler(id string) *StartHandler { + return &StartHandler{ + Operation: dag.Operation{ID: id, Key: "start", Type: dag.Function, Tags: []string{"built-in"}}, + } +}