This commit is contained in:
Oarkflow
2025-08-09 20:20:51 +05:45
parent e51641f402
commit 4364ce4d79
7 changed files with 139 additions and 19 deletions

View File

@@ -75,3 +75,23 @@ func AvailableDAG() []string {
} }
return op return op
} }
// HasPageNode checks if the DAG contains any Page nodes
func (d *DAG) HasPageNode() bool {
return d.hasPageNode
}
// ContainsPageNodes iterates through all nodes to check if any are Page nodes
// This method provides an alternative way to check for Page nodes by examining
// the actual nodes rather than relying on the cached hasPageNode field
func (d *DAG) ContainsPageNodes() bool {
var hasPage bool
d.nodes.ForEach(func(_ string, node *Node) bool {
if node.NodeType == Page {
hasPage = true
return false // Stop iteration when found
}
return true // Continue iteration
})
return hasPage
}

View File

@@ -3,6 +3,7 @@ package handlers
import ( import (
"context" "context"
"github.com/oarkflow/json"
"github.com/oarkflow/mq" "github.com/oarkflow/mq"
"github.com/oarkflow/mq/dag" "github.com/oarkflow/mq/dag"
) )
@@ -11,8 +12,24 @@ type OutputHandler struct {
dag.Operation dag.Operation
} }
func (e *OutputHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { func (c *OutputHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
return mq.Result{Payload: task.Payload, Ctx: ctx} var templateData map[string]any
if len(task.Payload) > 0 {
if err := json.Unmarshal(task.Payload, &templateData); err != nil {
return mq.Result{Payload: task.Payload, Error: err, Ctx: ctx}
}
}
if templateData == nil {
templateData = make(map[string]any)
}
if c.Payload.Mapping != nil {
for k, v := range c.Payload.Mapping {
_, val := dag.GetVal(ctx, v, templateData)
templateData[k] = val
}
}
bt, _ := json.Marshal(templateData)
return mq.Result{Payload: bt, Ctx: ctx}
} }
func NewOutputHandler(id string) *OutputHandler { func NewOutputHandler(id string) *OutputHandler {

View File

@@ -23,6 +23,10 @@
}, },
"additional_data": { "additional_data": {
"conditions": { "conditions": {
"default": {
"id": "condition:default",
"node": "output"
},
"invalid": { "invalid": {
"id": "condition:invalid_login", "id": "condition:invalid_login",
"node": "error-page", "node": "error-page",
@@ -59,6 +63,15 @@
"template_file": "app/templates/error.html" "template_file": "app/templates/error.html"
} }
} }
},
{
"id": "output",
"node": "output",
"data": {
"mapping": {
"login_message": "eval.{{'Login successful!'}}"
}
}
} }
], ],
"edges": [ "edges": [

View File

@@ -0,0 +1,29 @@
{
"name": "Email Notification System",
"key": "email:notification",
"nodes": [
{
"id": "Login",
"name": "Check Login",
"node_key": "login:flow",
"first_node": true
},
{
"id": "ContactForm",
"node": "render-html",
"data": {
"additional_data": {
"schema_file": "schema.json",
"template_file": "app/templates/basic.html"
}
}
}
],
"edges": [
{
"source": "Login.output",
"label": "on_success",
"target": [ "ContactForm" ]
}
]
}

View File

@@ -12,20 +12,39 @@ import (
"github.com/oarkflow/mq/services" "github.com/oarkflow/mq/services"
) )
func main() { var availableHandlers = map[string]bool{
handlerBytes, err := os.ReadFile("json/login.json") "json/login.json": false,
if err != nil { "json/send-email.json": true,
panic(err) }
}
var handler services.Handler func SetupHandler() (*dag.DAG, error) {
err = json.Unmarshal(handlerBytes, &handler) var flowToServe *dag.DAG
if err != nil {
panic(err)
}
brokerAddr := ":8081" brokerAddr := ":8081"
flow := services.SetupHandler(handler, brokerAddr) for file, serve := range availableHandlers {
if flow.Error != nil { handlerBytes, err := os.ReadFile(file)
fmt.Println("Error setting up handler:", flow.Error) if err != nil {
panic(err)
}
var handler services.Handler
err = json.Unmarshal(handlerBytes, &handler)
if err != nil {
panic(err)
}
flow := services.SetupHandler(handler, brokerAddr)
if flow.Error != nil {
return nil, flow.Error
}
if serve {
flowToServe = flow
}
}
return flowToServe, nil
}
func main() {
flow, err := SetupHandler()
if err != nil {
fmt.Println("Error setting up handlers:", err)
return return
} }
flow.Start(context.Background(), ":5000") flow.Start(context.Background(), ":5000")
@@ -34,4 +53,5 @@ func main() {
func init() { func init() {
dag.AddHandler("render-html", func(id string) mq.Processor { return handlers.NewRenderHTMLNode(id) }) dag.AddHandler("render-html", func(id string) mq.Processor { return handlers.NewRenderHTMLNode(id) })
dag.AddHandler("condition", func(id string) mq.Processor { return handlers.NewCondition(id) }) dag.AddHandler("condition", func(id string) mq.Processor { return handlers.NewCondition(id) })
dag.AddHandler("output", func(id string) mq.Processor { return handlers.NewOutputHandler(id) })
} }

View File

@@ -41,18 +41,36 @@ func SetupHandler(handler Handler, brokerAddr string, async ...bool) *dag.DAG {
if len(async) > 0 { if len(async) > 0 {
syncMode = async[0] syncMode = async[0]
} }
key := fmt.Sprintf(`%s-%v`, handler.Key, syncMode) key := handler.Key
existingDAG := dag.GetDAG(key) existingDAG := dag.GetDAG(key)
if existingDAG != nil { if existingDAG != nil {
return existingDAG return existingDAG
} }
flow := dag.NewDAG(handler.Name, handler.Key, nil, mq.WithSyncMode(syncMode), mq.WithBrokerURL(brokerAddr)) flow := dag.NewDAG(handler.Name, handler.Key, nil, mq.WithSyncMode(syncMode), mq.WithBrokerURL(brokerAddr))
for _, node := range handler.Nodes { for _, node := range handler.Nodes {
err := prepareNode(flow, node) if node.Node == "" && node.NodeKey == "" {
if err != nil { flow.Error = errors.New("Node not defined " + node.ID)
flow.Error = err
return flow return flow
} }
if node.Node != "" {
err := prepareNode(flow, node)
if err != nil {
flow.Error = err
return flow
}
} else if node.NodeKey != "" {
newDag := dag.GetDAG(node.NodeKey)
if newDag == nil {
flow.Error = errors.New("DAG not found " + node.NodeKey)
return flow
}
nodeType := dag.Function
if newDag.HasPageNode() {
nodeType = dag.Page
}
fmt.Println(node.Name, node.ID, node.NodeKey, node.FirstNode)
flow.AddDAGNode(nodeType, node.Name, node.ID, newDag, node.FirstNode)
}
} }
for _, edge := range handler.Edges { for _, edge := range handler.Edges {
if edge.Label == "" { if edge.Label == "" {
@@ -133,6 +151,8 @@ func prepareNode(flow *dag.DAG, node Node) error {
Reverse: cond.FilterGroup.Reverse, Reverse: cond.FilterGroup.Reverse,
Filters: fillers, Filters: fillers,
} }
} else {
conditions[key] = nil
} }
} }
flow.AddCondition(node.ID, condition) flow.AddCondition(node.ID, condition)

View File

@@ -136,6 +136,7 @@ type Data struct {
type Node struct { type Node struct {
Name string `json:"name,omitempty" yaml:"name,omitempty"` Name string `json:"name,omitempty" yaml:"name,omitempty"`
ID string `json:"id" yaml:"id"` ID string `json:"id" yaml:"id"`
NodeKey string `json:"node_key" yaml:"node_key"`
Node string `json:"node" yaml:"node"` Node string `json:"node" yaml:"node"`
Data Data `json:"data" yaml:"data"` Data Data `json:"data" yaml:"data"`
FirstNode bool `json:"first_node" yaml:"first_node"` FirstNode bool `json:"first_node" yaml:"first_node"`