feat: [wip] - Implement html node

This commit is contained in:
sujit
2024-11-16 23:12:55 +05:45
parent c38922a05d
commit 169cd9a64e
5 changed files with 49 additions and 20 deletions

View File

@@ -73,6 +73,7 @@ type DAG struct {
taskContext storage.IMap[string, *TaskManager] taskContext storage.IMap[string, *TaskManager]
nodes map[string]*Node nodes map[string]*Node
iteratorNodes storage.IMap[string, []Edge] iteratorNodes storage.IMap[string, []Edge]
iNodes map[string][]Edge
conditions map[FromNode]map[When]Then conditions map[FromNode]map[When]Then
pool *mq.Pool pool *mq.Pool
taskCleanupCh chan string taskCleanupCh chan string
@@ -366,7 +367,7 @@ func (tm *DAG) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
} }
manager, exists := tm.taskContext.Get(task.ID) manager, exists := tm.taskContext.Get(task.ID)
if !exists { if !exists {
manager = NewTaskManager(tm, task.ID, tm.iteratorNodes.AsMap()) manager = NewTaskManager(tm, task.ID, tm.iNodes)
manager.createdAt = task.CreatedAt manager.createdAt = task.CreatedAt
tm.taskContext.Set(task.ID, manager) tm.taskContext.Set(task.ID, manager)
} }
@@ -426,6 +427,10 @@ func (tm *DAG) check(ctx context.Context, payload []byte) (context.Context, *mq.
if taskID == "" { if taskID == "" {
taskID = mq.NewID() taskID = mq.NewID()
} }
if tm.iNodes == nil {
tm.iNodes = tm.iteratorNodes.AsMap()
tm.iteratorNodes.Clear()
}
return ctx, mq.NewTask(taskID, payload, initialNode), nil return ctx, mq.NewTask(taskID, payload, initialNode), nil
} }

View File

@@ -22,6 +22,7 @@ type TaskManager struct {
topic string topic string
result mq.Result result mq.Result
iteratorNodes map[string][]Edge
taskNodeStatus storage.IMap[string, *taskNodeStatus] taskNodeStatus storage.IMap[string, *taskNodeStatus]
} }
@@ -33,6 +34,7 @@ func NewTaskManager(d *DAG, taskID string, iteratorNodes map[string][]Edge) *Tas
dag: d, dag: d,
taskNodeStatus: memory.New[string, *taskNodeStatus](), taskNodeStatus: memory.New[string, *taskNodeStatus](),
taskID: taskID, taskID: taskID,
iteratorNodes: iteratorNodes,
wg: NewWaitGroup(), wg: NewWaitGroup(),
} }
} }

View File

@@ -152,30 +152,37 @@ func (tm *TaskManager) handleNextTask(ctx context.Context, result mq.Result) mq.
for _, edge := range edges { for _, edge := range edges {
switch edge.Type { switch edge.Type {
case Simple: case Simple:
tm.SetTotalItems(getTopic(ctx, edge.From.Key), len(edge.To)) if _, ok := tm.iteratorNodes[edge.From.Key]; ok {
index, _ := mq.GetHeader(ctx, "index") continue
if index != "" && strings.Contains(index, "__") {
index = strings.Split(index, "__")[1]
} else {
index = "0"
}
for _, target := range edge.To {
tm.wg.Add(1)
go func(ctx context.Context, target *Node, result mq.Result) {
ctxx := context.Background()
if headers, ok := mq.GetHeaders(ctx); ok {
headers.Set(consts.QueueKey, target.Key)
headers.Set("index", fmt.Sprintf("%s__%s", target.Key, index))
ctxx = mq.SetHeaders(ctxx, headers.AsMap())
}
tm.processNode(ctxx, target, result.Payload)
}(ctx, target, result)
} }
tm.processEdge(ctx, edge, result)
} }
} }
return result return result
} }
func (tm *TaskManager) processEdge(ctx context.Context, edge Edge, result mq.Result) {
tm.SetTotalItems(getTopic(ctx, edge.From.Key), len(edge.To))
index, _ := mq.GetHeader(ctx, "index")
if index != "" && strings.Contains(index, "__") {
index = strings.Split(index, "__")[1]
} else {
index = "0"
}
for _, target := range edge.To {
tm.wg.Add(1)
go func(ctx context.Context, target *Node, result mq.Result) {
ctxx := context.Background()
if headers, ok := mq.GetHeaders(ctx); ok {
headers.Set(consts.QueueKey, target.Key)
headers.Set("index", fmt.Sprintf("%s__%s", target.Key, index))
ctxx = mq.SetHeaders(ctxx, headers.AsMap())
}
tm.processNode(ctxx, target, result.Payload)
}(ctx, target, result)
}
}
func (tm *TaskManager) getConditionalEdges(node *Node, result mq.Result) []Edge { func (tm *TaskManager) getConditionalEdges(node *Node, result mq.Result) []Edge {
edges := make([]Edge, len(node.Edges)) edges := make([]Edge, len(node.Edges))
copy(edges, node.Edges) copy(edges, node.Edges)

View File

@@ -4,6 +4,7 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/oarkflow/mq" "github.com/oarkflow/mq"
"github.com/oarkflow/mq/dag" "github.com/oarkflow/mq/dag"
"github.com/oarkflow/mq/examples/tasks" "github.com/oarkflow/mq/examples/tasks"
@@ -35,13 +36,15 @@ func setup(f *dag.DAG) {
AddNode("Email Delivery", "email:deliver", &tasks.EmailDelivery{Operation: dag.Operation{Type: "process"}}). AddNode("Email Delivery", "email:deliver", &tasks.EmailDelivery{Operation: dag.Operation{Type: "process"}}).
AddNode("Prepare Email", "prepare:email", &tasks.PrepareEmail{Operation: dag.Operation{Type: "process"}}). AddNode("Prepare Email", "prepare:email", &tasks.PrepareEmail{Operation: dag.Operation{Type: "process"}}).
AddNode("Get Input", "get:input", &tasks.GetData{Operation: dag.Operation{Type: "input"}}, true). AddNode("Get Input", "get:input", &tasks.GetData{Operation: dag.Operation{Type: "input"}}, true).
AddNode("Final Data", "final", &tasks.Final{Operation: dag.Operation{Type: "page"}}).
AddNode("Iterator Processor", "loop", &tasks.Loop{Operation: dag.Operation{Type: "loop"}}). AddNode("Iterator Processor", "loop", &tasks.Loop{Operation: dag.Operation{Type: "loop"}}).
AddNode("Condition", "condition", &tasks.Condition{Operation: dag.Operation{Type: "condition"}}). AddNode("Condition", "condition", &tasks.Condition{Operation: dag.Operation{Type: "condition"}}).
AddDAGNode("Persistent", "persistent", subDAG()). AddDAGNode("Persistent", "persistent", subDAG()).
AddEdge("Get input to loop", "get:input", "loop"). AddEdge("Get input to loop", "get:input", "loop").
AddIterator("Loop to prepare email", "loop", "prepare:email"). AddIterator("Loop to prepare email", "loop", "prepare:email").
AddEdge("Prepare Email to condition", "prepare:email", "condition"). AddEdge("Prepare Email to condition", "prepare:email", "condition").
AddCondition("condition", map[dag.When]dag.Then{"pass": "email:deliver", "fail": "persistent"}) AddCondition("condition", map[dag.When]dag.Then{"pass": "email:deliver", "fail": "persistent"}).
AddEdge("Final", "loop", "final")
} }
func sendData(f *dag.DAG) { func sendData(f *dag.DAG) {

View File

@@ -109,3 +109,15 @@ func (e *InAppNotification) ProcessTask(ctx context.Context, task *mq.Task) mq.R
} }
return mq.Result{Payload: task.Payload, Ctx: ctx} return mq.Result{Payload: task.Payload, Ctx: ctx}
} }
type Final struct {
dag.Operation
}
func (e *Final) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
rs := map[string]any{
"content": `<strong>Processed successfully!</strong>`,
}
bt, _ := json.Marshal(rs)
return mq.Result{Payload: bt, Ctx: ctx}
}