diff --git a/dag/v2/api.go b/dag/v2/api.go index ac02df3..f2b6f4a 100644 --- a/dag/v2/api.go +++ b/dag/v2/api.go @@ -4,12 +4,13 @@ import ( "context" "encoding/json" "fmt" - "github.com/oarkflow/mq" - "github.com/oarkflow/mq/sio" "net/http" "os" "strings" + "github.com/oarkflow/mq" + "github.com/oarkflow/mq/sio" + "github.com/oarkflow/mq/consts" "github.com/oarkflow/mq/jsonparser" ) diff --git a/dag/v2/dag.go b/dag/v2/dag.go index 4b95a7c..0e0cf28 100644 --- a/dag/v2/dag.go +++ b/dag/v2/dag.go @@ -2,15 +2,18 @@ package v2 import ( "context" + "encoding/json" "fmt" - "github.com/oarkflow/mq/consts" - "github.com/oarkflow/mq/sio" - "golang.org/x/time/rate" "log" "net/http" "strings" "time" + "golang.org/x/time/rate" + + "github.com/oarkflow/mq/consts" + "github.com/oarkflow/mq/sio" + "github.com/oarkflow/mq" "github.com/oarkflow/mq/storage" "github.com/oarkflow/mq/storage/memory" @@ -271,6 +274,13 @@ func (tm *DAG) AddEdge(edgeType EdgeType, label, from string, targets ...string) return tm } +func (tm *DAG) getCurrentNode(manager *TaskManager) string { + if manager.currentNodePayload.Size() == 0 { + return "" + } + return manager.currentNodePayload.Keys()[0] +} + func (tm *DAG) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { ctx = context.WithValue(ctx, "task_id", task.ID) userContext := UserContext(ctx) @@ -283,11 +293,24 @@ func (tm *DAG) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { } else { manager.resultCh = resultCh } - currentNode := strings.Split(manager.currentNode, Delimiter)[0] + currentKey := tm.getCurrentNode(manager) + currentNode := strings.Split(currentKey, Delimiter)[0] node, exists := tm.nodes.Get(currentNode) method, ok := ctx.Value("method").(string) if method == "GET" && exists && node.NodeType == Page { ctx = context.WithValue(ctx, "initial_node", currentNode) + /* + if isLastNode, err := tm.IsLastNode(currentNode); err != nil && isLastNode { + if manager.result != nil { + fmt.Println(string(manager.result.Payload)) + resultCh <- *manager.result + return <-resultCh + } + } + */ + if manager.result != nil { + task.Payload = manager.result.Payload + } } else if next == "true" { nodes, err := tm.GetNextNodes(currentNode) if err != nil { @@ -297,6 +320,17 @@ func (tm *DAG) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { ctx = context.WithValue(ctx, "initial_node", nodes[0].ID) } } + if currentNodeResult, hasResult := manager.currentNodeResult.Get(currentKey); hasResult { + var taskPayload, resultPayload map[string]any + if err := json.Unmarshal(task.Payload, &taskPayload); err == nil { + if err = json.Unmarshal(currentNodeResult.Payload, &resultPayload); err == nil { + for key, val := range resultPayload { + taskPayload[key] = val + } + task.Payload, _ = json.Marshal(taskPayload) + } + } + } firstNode, err := tm.parseInitialNode(ctx) if err != nil { return mq.Result{Error: err, Ctx: ctx} @@ -410,7 +444,8 @@ func (tm *DAG) ScheduleTask(ctx context.Context, payload []byte, opts ...mq.Sche } else { manager.resultCh = resultCh } - currentNode := strings.Split(manager.currentNode, Delimiter)[0] + currentKey := tm.getCurrentNode(manager) + currentNode := strings.Split(currentKey, Delimiter)[0] node, exists := tm.nodes.Get(currentNode) method, ok := ctx.Value("method").(string) if method == "GET" && exists && node.NodeType == Page { @@ -424,6 +459,17 @@ func (tm *DAG) ScheduleTask(ctx context.Context, payload []byte, opts ...mq.Sche ctx = context.WithValue(ctx, "initial_node", nodes[0].ID) } } + if currentNodeResult, hasResult := manager.currentNodeResult.Get(currentKey); hasResult { + var taskPayload, resultPayload map[string]any + if err := json.Unmarshal(payload, &taskPayload); err == nil { + if err = json.Unmarshal(currentNodeResult.Payload, &resultPayload); err == nil { + for key, val := range resultPayload { + taskPayload[key] = val + } + payload, _ = json.Marshal(taskPayload) + } + } + } firstNode, err := tm.parseInitialNode(ctx) if err != nil { return mq.Result{Error: err, Ctx: ctx} diff --git a/dag/v2/task_manager.go b/dag/v2/task_manager.go index 0a2cb03..caf8dbf 100644 --- a/dag/v2/task_manager.go +++ b/dag/v2/task_manager.go @@ -4,11 +4,12 @@ import ( "context" "encoding/json" "fmt" - "github.com/oarkflow/mq" "log" "strings" "time" + "github.com/oarkflow/mq" + "github.com/oarkflow/mq/storage" "github.com/oarkflow/mq/storage/memory" ) @@ -38,18 +39,20 @@ type nodeResult struct { } type TaskManager struct { - taskStates storage.IMap[string, *TaskState] - parentNodes storage.IMap[string, string] - childNodes storage.IMap[string, int] - deferredTasks storage.IMap[string, *task] - iteratorNodes storage.IMap[string, []Edge] - currentNode string - dag *DAG - taskID string - taskQueue chan *task - resultQueue chan nodeResult - resultCh chan mq.Result - stopCh chan struct{} + taskStates storage.IMap[string, *TaskState] + parentNodes storage.IMap[string, string] + childNodes storage.IMap[string, int] + deferredTasks storage.IMap[string, *task] + iteratorNodes storage.IMap[string, []Edge] + currentNodePayload storage.IMap[string, json.RawMessage] + currentNodeResult storage.IMap[string, mq.Result] + result *mq.Result + dag *DAG + taskID string + taskQueue chan *task + resultQueue chan nodeResult + resultCh chan mq.Result + stopCh chan struct{} } type task struct { @@ -70,17 +73,19 @@ func newTask(ctx context.Context, taskID, nodeID string, payload json.RawMessage func NewTaskManager(dag *DAG, taskID string, resultCh chan mq.Result, iteratorNodes storage.IMap[string, []Edge]) *TaskManager { tm := &TaskManager{ - taskStates: memory.New[string, *TaskState](), - parentNodes: memory.New[string, string](), - childNodes: memory.New[string, int](), - deferredTasks: memory.New[string, *task](), - taskQueue: make(chan *task, DefaultChannelSize), - resultQueue: make(chan nodeResult, DefaultChannelSize), - iteratorNodes: iteratorNodes, - stopCh: make(chan struct{}), - resultCh: resultCh, - taskID: taskID, - dag: dag, + taskStates: memory.New[string, *TaskState](), + parentNodes: memory.New[string, string](), + childNodes: memory.New[string, int](), + deferredTasks: memory.New[string, *task](), + currentNodePayload: memory.New[string, json.RawMessage](), + currentNodeResult: memory.New[string, mq.Result](), + taskQueue: make(chan *task, DefaultChannelSize), + resultQueue: make(chan nodeResult, DefaultChannelSize), + iteratorNodes: iteratorNodes, + stopCh: make(chan struct{}), + resultCh: resultCh, + taskID: taskID, + dag: dag, } go tm.run() go tm.waitForResult() @@ -147,16 +152,21 @@ func (tm *TaskManager) processNode(exec *task) { } state.Status = mq.Processing state.UpdatedAt = time.Now() - tm.currentNode = exec.nodeID + tm.currentNodePayload.Clear() + tm.currentNodeResult.Clear() + tm.currentNodePayload.Set(exec.nodeID, exec.payload) result := node.processor.ProcessTask(exec.ctx, mq.NewTask(exec.taskID, exec.payload, exec.nodeID)) + tm.currentNodeResult.Set(exec.nodeID, result) state.Result = result result.Topic = node.ID if result.Error != nil { + tm.result = &result tm.resultCh <- result tm.processFinalResult(state) return } if node.NodeType == Page { + tm.result = &result tm.resultCh <- result return } @@ -223,6 +233,7 @@ func (tm *TaskManager) handlePrevious(ctx context.Context, state *TaskState, res } } } else { + tm.result = &state.Result state.Result.Topic = strings.Split(state.NodeID, Delimiter)[0] tm.resultCh <- state.Result tm.processFinalResult(state) diff --git a/examples/form.go b/examples/form.go new file mode 100644 index 0000000..d20ea26 --- /dev/null +++ b/examples/form.go @@ -0,0 +1,158 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/oarkflow/jet" + + "github.com/oarkflow/mq" + "github.com/oarkflow/mq/consts" + v2 "github.com/oarkflow/mq/dag/v2" +) + +func main() { + flow := v2.NewDAG("Multi-Step Form", "multi-step-form", func(taskID string, result mq.Result) { + fmt.Printf("Final result for task %s: %s\n", taskID, string(result.Payload)) + }) + flow.AddNode(v2.Page, "FormStep1", "FormStep1", &FormStep1{}) + flow.AddNode(v2.Page, "FormStep2", "FormStep2", &FormStep2{}) + flow.AddNode(v2.Page, "FormResult", "FormResult", &FormResult{}) + + // Define edges + flow.AddEdge(v2.Simple, "FormStep1", "FormStep1", "FormStep2") + flow.AddEdge(v2.Simple, "FormStep2", "FormStep2", "FormResult") + + // Start the flow + if flow.Error != nil { + panic(flow.Error) + } + flow.Start(context.Background(), "0.0.0.0:8082") +} + +type FormStep1 struct { + v2.Operation +} + +func (p *FormStep1) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + bt := []byte(` + + + +
+ + + + + +
+ + + +
+ {{ if show_voting_controls }} + + + + {{ else }} +

You are not eligible to vote.

+ {{ end }} +
+ + +`) + parser := jet.NewWithMemory(jet.WithDelims("{{", "}}")) + inputData["task_id"] = ctx.Value("task_id") + rs, err := parser.ParseTemplate(string(bt), inputData) + if err != nil { + fmt.Println("FormStep2", inputData) + return mq.Result{Error: err, Ctx: ctx} + } + ctx = context.WithValue(ctx, consts.ContentType, consts.TypeHtml) + inputData["html_content"] = rs + bt, _ = json.Marshal(inputData) + return mq.Result{Payload: bt, Ctx: ctx} +} + +type FormResult struct { + v2.Operation +} + +func (p *FormResult) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + // Load HTML template for results + bt := []byte(` + + + +

Form Summary

+

Name: {{ name }}

+

Age: {{ age }}

+{{ if register_vote }} +

You have registered to vote!

+{{ else }} +

You did not register to vote.

+{{ end }} + + + + +`) + var inputData map[string]any + if task.Payload != nil { + if err := json.Unmarshal(task.Payload, &inputData); err != nil { + return mq.Result{Error: err, Ctx: ctx} + } + } + if inputData != nil { + if isEligible, ok := inputData["register_vote"].(string); ok { + inputData["register_vote"] = isEligible + } else { + inputData["register_vote"] = false + } + } + parser := jet.NewWithMemory(jet.WithDelims("{{", "}}")) + rs, err := parser.ParseTemplate(string(bt), inputData) + if err != nil { + return mq.Result{Error: err, Ctx: ctx} + } + ctx = context.WithValue(ctx, consts.ContentType, consts.TypeHtml) + inputData["html_content"] = rs + bt, _ = json.Marshal(inputData) + return mq.Result{Payload: bt, Ctx: ctx} +} diff --git a/examples/webroot/index.html b/examples/webroot/index.html index 9a4690b..662bb0e 100644 --- a/examples/webroot/index.html +++ b/examples/webroot/index.html @@ -66,7 +66,7 @@ -
+

Table

diff --git a/examples/webroot/js/app.js b/examples/webroot/js/app.js index c6eab96..73f3dbb 100644 --- a/examples/webroot/js/app.js +++ b/examples/webroot/js/app.js @@ -13,23 +13,38 @@ } window.onload = function() { - loadSVG('http://localhost:8083/ui'); + loadSVG('http://localhost:8082/ui'); }; document.getElementById('send-request').addEventListener('click', function() { const input = document.getElementById('payload'); const payloadData = JSON.parse(input.value); const data = { payload: payloadData }; - fetch('http://localhost:8083/request', { + fetch('http://localhost:8082/request', { method: 'POST', headers: { 'Content-Type': 'application/json', }, body: JSON.stringify(data), }) - .then(response => response.json()) - .then(data => console.log('Success:', data)) - .catch((error) => console.error('Error:', error)); + .then(response => { + const contentType = response.headers.get('Content-Type'); + if (contentType && contentType.includes("text/html")) { + return response.text().then(html => ({ html })); + } else { + return response.json(); + } + }) + .then(data => { + if (data.html) { + const el = document.getElementById("response"); + el.innerHTML = data.html; + } else { + console.log("Success", data); + } + }) + .catch(error => console.error('Error:', error)); + }); const tasks = {};