From 4364ce4d79f4296124fad55a0181a24fa8d0e891 Mon Sep 17 00:00:00 2001 From: Oarkflow Date: Sat, 9 Aug 2025 20:20:51 +0545 Subject: [PATCH] update --- dag/operations.go | 20 +++++++++++ handlers/output_handler.go | 21 ++++++++++-- services/examples/json/login.json | 13 ++++++++ services/examples/json/send-email.json | 29 ++++++++++++++++ services/examples/json_email.go | 46 ++++++++++++++++++-------- services/setup.go | 28 +++++++++++++--- services/user_config.go | 1 + 7 files changed, 139 insertions(+), 19 deletions(-) create mode 100644 services/examples/json/send-email.json diff --git a/dag/operations.go b/dag/operations.go index 1c7cb61..7182a73 100644 --- a/dag/operations.go +++ b/dag/operations.go @@ -75,3 +75,23 @@ func AvailableDAG() []string { } return op } + +// HasPageNode checks if the DAG contains any Page nodes +func (d *DAG) HasPageNode() bool { + return d.hasPageNode +} + +// ContainsPageNodes iterates through all nodes to check if any are Page nodes +// This method provides an alternative way to check for Page nodes by examining +// the actual nodes rather than relying on the cached hasPageNode field +func (d *DAG) ContainsPageNodes() bool { + var hasPage bool + d.nodes.ForEach(func(_ string, node *Node) bool { + if node.NodeType == Page { + hasPage = true + return false // Stop iteration when found + } + return true // Continue iteration + }) + return hasPage +} diff --git a/handlers/output_handler.go b/handlers/output_handler.go index beb910f..1406200 100644 --- a/handlers/output_handler.go +++ b/handlers/output_handler.go @@ -3,6 +3,7 @@ package handlers import ( "context" + "github.com/oarkflow/json" "github.com/oarkflow/mq" "github.com/oarkflow/mq/dag" ) @@ -11,8 +12,24 @@ type OutputHandler struct { dag.Operation } -func (e *OutputHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { - return mq.Result{Payload: task.Payload, Ctx: ctx} +func (c *OutputHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + var templateData map[string]any + if len(task.Payload) > 0 { + if err := json.Unmarshal(task.Payload, &templateData); err != nil { + return mq.Result{Payload: task.Payload, Error: err, Ctx: ctx} + } + } + if templateData == nil { + templateData = make(map[string]any) + } + if c.Payload.Mapping != nil { + for k, v := range c.Payload.Mapping { + _, val := dag.GetVal(ctx, v, templateData) + templateData[k] = val + } + } + bt, _ := json.Marshal(templateData) + return mq.Result{Payload: bt, Ctx: ctx} } func NewOutputHandler(id string) *OutputHandler { diff --git a/services/examples/json/login.json b/services/examples/json/login.json index b97ac7e..57b2b5b 100644 --- a/services/examples/json/login.json +++ b/services/examples/json/login.json @@ -23,6 +23,10 @@ }, "additional_data": { "conditions": { + "default": { + "id": "condition:default", + "node": "output" + }, "invalid": { "id": "condition:invalid_login", "node": "error-page", @@ -59,6 +63,15 @@ "template_file": "app/templates/error.html" } } + }, + { + "id": "output", + "node": "output", + "data": { + "mapping": { + "login_message": "eval.{{'Login successful!'}}" + } + } } ], "edges": [ diff --git a/services/examples/json/send-email.json b/services/examples/json/send-email.json new file mode 100644 index 0000000..272ae01 --- /dev/null +++ b/services/examples/json/send-email.json @@ -0,0 +1,29 @@ +{ + "name": "Email Notification System", + "key": "email:notification", + "nodes": [ + { + "id": "Login", + "name": "Check Login", + "node_key": "login:flow", + "first_node": true + }, + { + "id": "ContactForm", + "node": "render-html", + "data": { + "additional_data": { + "schema_file": "schema.json", + "template_file": "app/templates/basic.html" + } + } + } + ], + "edges": [ + { + "source": "Login.output", + "label": "on_success", + "target": [ "ContactForm" ] + } + ] +} diff --git a/services/examples/json_email.go b/services/examples/json_email.go index 7522b7f..5e5fea2 100644 --- a/services/examples/json_email.go +++ b/services/examples/json_email.go @@ -12,20 +12,39 @@ import ( "github.com/oarkflow/mq/services" ) -func main() { - handlerBytes, err := os.ReadFile("json/login.json") - if err != nil { - panic(err) - } - var handler services.Handler - err = json.Unmarshal(handlerBytes, &handler) - if err != nil { - panic(err) - } +var availableHandlers = map[string]bool{ + "json/login.json": false, + "json/send-email.json": true, +} + +func SetupHandler() (*dag.DAG, error) { + var flowToServe *dag.DAG brokerAddr := ":8081" - flow := services.SetupHandler(handler, brokerAddr) - if flow.Error != nil { - fmt.Println("Error setting up handler:", flow.Error) + for file, serve := range availableHandlers { + handlerBytes, err := os.ReadFile(file) + if err != nil { + panic(err) + } + var handler services.Handler + err = json.Unmarshal(handlerBytes, &handler) + if err != nil { + panic(err) + } + flow := services.SetupHandler(handler, brokerAddr) + if flow.Error != nil { + return nil, flow.Error + } + if serve { + flowToServe = flow + } + } + return flowToServe, nil +} + +func main() { + flow, err := SetupHandler() + if err != nil { + fmt.Println("Error setting up handlers:", err) return } flow.Start(context.Background(), ":5000") @@ -34,4 +53,5 @@ func main() { func init() { dag.AddHandler("render-html", func(id string) mq.Processor { return handlers.NewRenderHTMLNode(id) }) dag.AddHandler("condition", func(id string) mq.Processor { return handlers.NewCondition(id) }) + dag.AddHandler("output", func(id string) mq.Processor { return handlers.NewOutputHandler(id) }) } diff --git a/services/setup.go b/services/setup.go index b7fb058..e9b594a 100644 --- a/services/setup.go +++ b/services/setup.go @@ -41,18 +41,36 @@ func SetupHandler(handler Handler, brokerAddr string, async ...bool) *dag.DAG { if len(async) > 0 { syncMode = async[0] } - key := fmt.Sprintf(`%s-%v`, handler.Key, syncMode) + key := handler.Key existingDAG := dag.GetDAG(key) if existingDAG != nil { return existingDAG } flow := dag.NewDAG(handler.Name, handler.Key, nil, mq.WithSyncMode(syncMode), mq.WithBrokerURL(brokerAddr)) for _, node := range handler.Nodes { - err := prepareNode(flow, node) - if err != nil { - flow.Error = err + if node.Node == "" && node.NodeKey == "" { + flow.Error = errors.New("Node not defined " + node.ID) return flow } + if node.Node != "" { + err := prepareNode(flow, node) + if err != nil { + flow.Error = err + return flow + } + } else if node.NodeKey != "" { + newDag := dag.GetDAG(node.NodeKey) + if newDag == nil { + flow.Error = errors.New("DAG not found " + node.NodeKey) + return flow + } + nodeType := dag.Function + if newDag.HasPageNode() { + nodeType = dag.Page + } + fmt.Println(node.Name, node.ID, node.NodeKey, node.FirstNode) + flow.AddDAGNode(nodeType, node.Name, node.ID, newDag, node.FirstNode) + } } for _, edge := range handler.Edges { if edge.Label == "" { @@ -133,6 +151,8 @@ func prepareNode(flow *dag.DAG, node Node) error { Reverse: cond.FilterGroup.Reverse, Filters: fillers, } + } else { + conditions[key] = nil } } flow.AddCondition(node.ID, condition) diff --git a/services/user_config.go b/services/user_config.go index 4e9e294..6f2f1ef 100644 --- a/services/user_config.go +++ b/services/user_config.go @@ -136,6 +136,7 @@ type Data struct { type Node struct { Name string `json:"name,omitempty" yaml:"name,omitempty"` ID string `json:"id" yaml:"id"` + NodeKey string `json:"node_key" yaml:"node_key"` Node string `json:"node" yaml:"node"` Data Data `json:"data" yaml:"data"` FirstNode bool `json:"first_node" yaml:"first_node"`