feat: implement handlers

This commit is contained in:
sujit
2025-08-05 15:04:30 +05:45
parent 022002b326
commit cef019c486
6 changed files with 297 additions and 112 deletions

View File

@@ -1,10 +1,8 @@
package main
import (
"bytes"
"context"
"fmt"
"html/template"
"os"
"regexp"
"strings"
@@ -13,7 +11,7 @@ import (
"github.com/oarkflow/json"
"github.com/oarkflow/mq/dag"
"github.com/oarkflow/mq/renderer"
"github.com/oarkflow/mq/handlers"
"github.com/oarkflow/mq/utils"
"github.com/oarkflow/jet"
@@ -23,17 +21,16 @@ import (
)
func main() {
renderer, err := renderer.GetFromFile("schema.json", "")
if err != nil {
panic(err)
}
flow := dag.NewDAG("Email Notification System", "email-notification", func(taskID string, result mq.Result) {
fmt.Printf("Email notification workflow completed for task %s: %s\n", taskID, string(utils.RemoveRecursiveFromJSON(result.Payload, "html_content")))
}, mq.WithSyncMode(true))
// Add workflow nodes
renderHTML := handlers.NewRenderHTMLNode("render-html")
renderHTML.Payload.Data = map[string]any{
"schema_file": "schema.json",
} // Add workflow nodes
// Note: Page nodes have no timeout by default, allowing users unlimited time for form input
flow.AddNode(dag.Page, "Contact Form", "ContactForm", &RenderHTMLNode{renderer: renderer}, true)
flow.AddNode(dag.Page, "Contact Form", "ContactForm", renderHTML, true)
flow.AddNode(dag.Function, "Validate Contact Data", "ValidateContact", &ValidateContactNode{})
flow.AddNode(dag.Function, "Check User Type", "CheckUserType", &CheckUserTypeNode{})
flow.AddNode(dag.Function, "Send Welcome Email", "SendWelcomeEmail", &SendWelcomeEmailNode{})
@@ -79,109 +76,6 @@ func main() {
// RenderHTMLNode - Page node with JSONSchema-based fields and custom HTML layout
// Usage: Pass JSONSchema and HTML layout to the node for dynamic form rendering and validation
type RenderHTMLNode struct {
dag.Operation
renderer *renderer.JSONSchemaRenderer
}
func (c *RenderHTMLNode) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
data := c.Payload.Data
var (
schemaFile, _ = data["schema_file"].(string)
templateStr, _ = data["template"].(string)
templateFile, _ = data["template_file"].(string)
)
var templateData map[string]any
if len(task.Payload) > 0 {
if err := json.Unmarshal(task.Payload, &templateData); err != nil {
return mq.Result{Payload: task.Payload, Error: err, Ctx: ctx, ConditionStatus: "invalid"}
}
}
if templateData == nil {
templateData = make(map[string]any)
}
templateData["task_id"] = ctx.Value("task_id")
var renderedHTML string
var err error
switch {
// 1. JSONSchema + HTML Template
case schemaFile != "" && templateStr != "":
if c.renderer == nil {
renderer, err := renderer.GetFromFile(schemaFile, templateStr)
if err != nil {
return mq.Result{Error: fmt.Errorf("failed to get renderer from file %s: %v", schemaFile, err), Ctx: ctx}
}
c.renderer = renderer
}
renderedHTML, err = c.renderer.RenderFields(templateData)
// 2. JSONSchema + HTML File
case schemaFile != "" && templateFile != "":
if c.renderer == nil {
renderer, err := renderer.GetFromFile(schemaFile, "", templateFile)
if err != nil {
return mq.Result{Error: fmt.Errorf("failed to get renderer from file %s: %v", schemaFile, err), Ctx: ctx}
}
c.renderer = renderer
}
renderedHTML, err = c.renderer.RenderFields(templateData)
// 3. Only JSONSchema
case schemaFile != "" || c.renderer != nil:
if c.renderer == nil {
renderer, err := renderer.GetFromFile(schemaFile, "")
if err != nil {
return mq.Result{Error: fmt.Errorf("failed to get renderer from file %s: %v", schemaFile, err), Ctx: ctx}
}
c.renderer = renderer
}
renderedHTML, err = c.renderer.RenderFields(templateData)
// 4. Only HTML Template
case templateStr != "":
tmpl, err := template.New("inline").Parse(templateStr)
if err != nil {
return mq.Result{Error: fmt.Errorf("failed to parse template: %v", err), Ctx: ctx}
}
var buf bytes.Buffer
err = tmpl.Execute(&buf, templateData)
if err != nil {
return mq.Result{Error: fmt.Errorf("failed to execute template: %v", err), Ctx: ctx}
}
renderedHTML = buf.String()
// 5. Only HTML File
case templateFile != "":
fileContent, err := os.ReadFile(templateFile)
if err != nil {
return mq.Result{Error: fmt.Errorf("failed to read template file: %v", err), Ctx: ctx}
}
tmpl, err := template.New("file").Parse(string(fileContent))
if err != nil {
return mq.Result{Error: fmt.Errorf("failed to parse template file: %v", err), Ctx: ctx}
}
var buf bytes.Buffer
err = tmpl.Execute(&buf, templateData)
if err != nil {
return mq.Result{Error: fmt.Errorf("failed to execute template file: %v", err), Ctx: ctx}
}
renderedHTML = buf.String()
default:
return mq.Result{Error: fmt.Errorf("no valid rendering approach found"), Ctx: ctx}
}
if err != nil {
return mq.Result{Payload: task.Payload, Error: err, Ctx: ctx, ConditionStatus: "invalid"}
}
ctx = context.WithValue(ctx, consts.ContentType, consts.TypeHtml)
resultData := map[string]any{
"html_content": renderedHTML,
"step": "form",
}
bt, _ := json.Marshal(resultData)
return mq.Result{Payload: bt, Ctx: ctx}
}
// ValidateContactNode - Validates contact form data
type ValidateContactNode struct {
dag.Operation

View File

@@ -0,0 +1,63 @@
package handlers
import (
"context"
"fmt"
"github.com/oarkflow/json"
"github.com/oarkflow/mq"
"github.com/oarkflow/mq/dag"
)
type Loop struct {
dag.Operation
}
func (e *Loop) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
fmt.Println("Loop Data")
return mq.Result{Payload: task.Payload, Ctx: ctx}
}
func NewLoop(id string) *Loop {
return &Loop{
Operation: dag.Operation{ID: id, Key: "loop", Type: dag.Function, Tags: []string{"built-in"}},
}
}
var defaultKey = "default"
type Condition struct {
dag.Operation
conditions map[string]dag.Condition
}
func (e *Condition) SetConditions(conditions map[string]dag.Condition) {
e.conditions = conditions
}
func (e *Condition) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
var data map[string]any
err := json.Unmarshal(task.Payload, &data)
if err != nil {
panic(err)
}
var conditionStatus string
_, ok := e.conditions[defaultKey]
for status, condition := range e.conditions {
if status != defaultKey {
if condition.Match(data) {
conditionStatus = status
}
}
}
if conditionStatus == "" && ok {
conditionStatus = defaultKey
}
return mq.Result{Payload: task.Payload, ConditionStatus: conditionStatus, Ctx: ctx}
}
func NewCondition(id string) *Condition {
return &Condition{
Operation: dag.Operation{ID: id, Key: "condition", Type: dag.Function, Tags: []string{"built-in"}},
}
}

122
handlers/html_handler.go Normal file
View File

@@ -0,0 +1,122 @@
package handlers
import (
"bytes"
"context"
"encoding/json"
"fmt"
"html/template"
"os"
"github.com/oarkflow/mq"
"github.com/oarkflow/mq/consts"
"github.com/oarkflow/mq/dag"
"github.com/oarkflow/mq/renderer"
)
type RenderHTMLNode struct {
dag.Operation
renderer *renderer.JSONSchemaRenderer
}
func (c *RenderHTMLNode) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
data := c.Payload.Data
var (
schemaFile, _ = data["schema_file"].(string)
templateStr, _ = data["template"].(string)
templateFile, _ = data["template_file"].(string)
)
var templateData map[string]any
if len(task.Payload) > 0 {
if err := json.Unmarshal(task.Payload, &templateData); err != nil {
return mq.Result{Payload: task.Payload, Error: err, Ctx: ctx, ConditionStatus: "invalid"}
}
}
if templateData == nil {
templateData = make(map[string]any)
}
templateData["task_id"] = ctx.Value("task_id")
var renderedHTML string
var err error
switch {
// 1. JSONSchema + HTML Template
case schemaFile != "" && templateStr != "":
if c.renderer == nil {
renderer, err := renderer.GetFromFile(schemaFile, templateStr)
if err != nil {
return mq.Result{Error: fmt.Errorf("failed to get renderer from file %s: %v", schemaFile, err), Ctx: ctx}
}
c.renderer = renderer
}
renderedHTML, err = c.renderer.RenderFields(templateData)
// 2. JSONSchema + HTML File
case schemaFile != "" && templateFile != "":
if c.renderer == nil {
renderer, err := renderer.GetFromFile(schemaFile, "", templateFile)
if err != nil {
return mq.Result{Error: fmt.Errorf("failed to get renderer from file %s: %v", schemaFile, err), Ctx: ctx}
}
c.renderer = renderer
}
renderedHTML, err = c.renderer.RenderFields(templateData)
// 3. Only JSONSchema
case schemaFile != "" || c.renderer != nil:
if c.renderer == nil {
renderer, err := renderer.GetFromFile(schemaFile, "")
if err != nil {
return mq.Result{Error: fmt.Errorf("failed to get renderer from file %s: %v", schemaFile, err), Ctx: ctx}
}
c.renderer = renderer
}
renderedHTML, err = c.renderer.RenderFields(templateData)
// 4. Only HTML Template
case templateStr != "":
tmpl, err := template.New("inline").Parse(templateStr)
if err != nil {
return mq.Result{Error: fmt.Errorf("failed to parse template: %v", err), Ctx: ctx}
}
var buf bytes.Buffer
err = tmpl.Execute(&buf, templateData)
if err != nil {
return mq.Result{Error: fmt.Errorf("failed to execute template: %v", err), Ctx: ctx}
}
renderedHTML = buf.String()
// 5. Only HTML File
case templateFile != "":
fileContent, err := os.ReadFile(templateFile)
if err != nil {
return mq.Result{Error: fmt.Errorf("failed to read template file: %v", err), Ctx: ctx}
}
tmpl, err := template.New("file").Parse(string(fileContent))
if err != nil {
return mq.Result{Error: fmt.Errorf("failed to parse template file: %v", err), Ctx: ctx}
}
var buf bytes.Buffer
err = tmpl.Execute(&buf, templateData)
if err != nil {
return mq.Result{Error: fmt.Errorf("failed to execute template file: %v", err), Ctx: ctx}
}
renderedHTML = buf.String()
default:
return mq.Result{Error: fmt.Errorf("no valid rendering approach found"), Ctx: ctx}
}
if err != nil {
return mq.Result{Payload: task.Payload, Error: err, Ctx: ctx, ConditionStatus: "invalid"}
}
ctx = context.WithValue(ctx, consts.ContentType, consts.TypeHtml)
resultData := map[string]any{
"html_content": renderedHTML,
"step": "form",
}
bt, _ := json.Marshal(resultData)
return mq.Result{Payload: bt, Ctx: ctx}
}
func NewRenderHTMLNode(id string) *RenderHTMLNode {
return &RenderHTMLNode{Operation: dag.Operation{Key: "render-html", ID: id, Type: dag.Page, Tags: []string{"built-in"}}}
}

61
handlers/log_handler.go Normal file
View File

@@ -0,0 +1,61 @@
package handlers
import (
"context"
"fmt"
"github.com/oarkflow/json"
"github.com/oarkflow/mq"
"github.com/oarkflow/mq/dag"
"github.com/oarkflow/log"
)
type LogHandler struct {
dag.Operation
}
func (p *LogHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
var row map[string]any
data := task.Payload
if data != nil {
err := json.Unmarshal(data, &row)
if err != nil {
return mq.Result{
Ctx: ctx,
Error: err,
}
}
}
var msg string
toReturn := make(map[string]any)
for k, v := range p.Payload.Mapping {
_, val := dag.GetVal(ctx, v, row)
toReturn[k] = val
}
if val, exist := p.Payload.Data["message"]; exist {
_, v := dag.GetVal(ctx, val.(string), toReturn)
if v != nil {
msg = v.(string)
} else {
msg = val.(string)
}
}
logger := log.Info()
if len(toReturn) > 0 {
for k, v := range toReturn {
logger = logger.Any(k, v)
}
}
logger.Msg(msg)
if _, exist := p.Payload.Data["print"]; exist {
fmt.Println(toReturn, msg)
}
return mq.Result{
Ctx: ctx,
Payload: task.Payload,
}
}
func NewLogHandler(id string) *LogHandler {
return &LogHandler{Operation: dag.Operation{Key: "log", ID: id, Type: dag.Function, Tags: []string{"built-in"}}}
}

23
handlers/print_handler.go Normal file
View File

@@ -0,0 +1,23 @@
package handlers
import (
"context"
"github.com/oarkflow/mq"
"github.com/oarkflow/mq/dag"
)
type PrintHandler struct {
dag.Operation
}
func (e *PrintHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
e.Debug(ctx, task)
return mq.Result{Payload: task.Payload, Ctx: ctx}
}
func NewPrintHandler(id string) *PrintHandler {
return &PrintHandler{
Operation: dag.Operation{ID: id, Key: "print", Type: dag.Function},
}
}

22
handlers/start_handler.go Normal file
View File

@@ -0,0 +1,22 @@
package handlers
import (
"context"
"github.com/oarkflow/mq"
"github.com/oarkflow/mq/dag"
)
type StartHandler struct {
dag.Operation
}
func (e *StartHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
return mq.Result{Payload: task.Payload, Ctx: ctx}
}
func NewStartHandler(id string) *StartHandler {
return &StartHandler{
Operation: dag.Operation{ID: id, Key: "start", Type: dag.Function, Tags: []string{"built-in"}},
}
}