feat: [wip] - Implement html node

This commit is contained in:
sujit
2024-11-23 09:20:07 +05:45
parent cbe93dd9ca
commit 6b87422a1a
6 changed files with 269 additions and 38 deletions

View File

@@ -4,12 +4,13 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/oarkflow/mq"
"github.com/oarkflow/mq/sio"
"net/http" "net/http"
"os" "os"
"strings" "strings"
"github.com/oarkflow/mq"
"github.com/oarkflow/mq/sio"
"github.com/oarkflow/mq/consts" "github.com/oarkflow/mq/consts"
"github.com/oarkflow/mq/jsonparser" "github.com/oarkflow/mq/jsonparser"
) )

View File

@@ -2,15 +2,18 @@ package v2
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"github.com/oarkflow/mq/consts"
"github.com/oarkflow/mq/sio"
"golang.org/x/time/rate"
"log" "log"
"net/http" "net/http"
"strings" "strings"
"time" "time"
"golang.org/x/time/rate"
"github.com/oarkflow/mq/consts"
"github.com/oarkflow/mq/sio"
"github.com/oarkflow/mq" "github.com/oarkflow/mq"
"github.com/oarkflow/mq/storage" "github.com/oarkflow/mq/storage"
"github.com/oarkflow/mq/storage/memory" "github.com/oarkflow/mq/storage/memory"
@@ -271,6 +274,13 @@ func (tm *DAG) AddEdge(edgeType EdgeType, label, from string, targets ...string)
return tm return tm
} }
func (tm *DAG) getCurrentNode(manager *TaskManager) string {
if manager.currentNodePayload.Size() == 0 {
return ""
}
return manager.currentNodePayload.Keys()[0]
}
func (tm *DAG) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { func (tm *DAG) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
ctx = context.WithValue(ctx, "task_id", task.ID) ctx = context.WithValue(ctx, "task_id", task.ID)
userContext := UserContext(ctx) userContext := UserContext(ctx)
@@ -283,11 +293,24 @@ func (tm *DAG) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
} else { } else {
manager.resultCh = resultCh manager.resultCh = resultCh
} }
currentNode := strings.Split(manager.currentNode, Delimiter)[0] currentKey := tm.getCurrentNode(manager)
currentNode := strings.Split(currentKey, Delimiter)[0]
node, exists := tm.nodes.Get(currentNode) node, exists := tm.nodes.Get(currentNode)
method, ok := ctx.Value("method").(string) method, ok := ctx.Value("method").(string)
if method == "GET" && exists && node.NodeType == Page { if method == "GET" && exists && node.NodeType == Page {
ctx = context.WithValue(ctx, "initial_node", currentNode) ctx = context.WithValue(ctx, "initial_node", currentNode)
/*
if isLastNode, err := tm.IsLastNode(currentNode); err != nil && isLastNode {
if manager.result != nil {
fmt.Println(string(manager.result.Payload))
resultCh <- *manager.result
return <-resultCh
}
}
*/
if manager.result != nil {
task.Payload = manager.result.Payload
}
} else if next == "true" { } else if next == "true" {
nodes, err := tm.GetNextNodes(currentNode) nodes, err := tm.GetNextNodes(currentNode)
if err != nil { if err != nil {
@@ -297,6 +320,17 @@ func (tm *DAG) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
ctx = context.WithValue(ctx, "initial_node", nodes[0].ID) ctx = context.WithValue(ctx, "initial_node", nodes[0].ID)
} }
} }
if currentNodeResult, hasResult := manager.currentNodeResult.Get(currentKey); hasResult {
var taskPayload, resultPayload map[string]any
if err := json.Unmarshal(task.Payload, &taskPayload); err == nil {
if err = json.Unmarshal(currentNodeResult.Payload, &resultPayload); err == nil {
for key, val := range resultPayload {
taskPayload[key] = val
}
task.Payload, _ = json.Marshal(taskPayload)
}
}
}
firstNode, err := tm.parseInitialNode(ctx) firstNode, err := tm.parseInitialNode(ctx)
if err != nil { if err != nil {
return mq.Result{Error: err, Ctx: ctx} return mq.Result{Error: err, Ctx: ctx}
@@ -410,7 +444,8 @@ func (tm *DAG) ScheduleTask(ctx context.Context, payload []byte, opts ...mq.Sche
} else { } else {
manager.resultCh = resultCh manager.resultCh = resultCh
} }
currentNode := strings.Split(manager.currentNode, Delimiter)[0] currentKey := tm.getCurrentNode(manager)
currentNode := strings.Split(currentKey, Delimiter)[0]
node, exists := tm.nodes.Get(currentNode) node, exists := tm.nodes.Get(currentNode)
method, ok := ctx.Value("method").(string) method, ok := ctx.Value("method").(string)
if method == "GET" && exists && node.NodeType == Page { if method == "GET" && exists && node.NodeType == Page {
@@ -424,6 +459,17 @@ func (tm *DAG) ScheduleTask(ctx context.Context, payload []byte, opts ...mq.Sche
ctx = context.WithValue(ctx, "initial_node", nodes[0].ID) ctx = context.WithValue(ctx, "initial_node", nodes[0].ID)
} }
} }
if currentNodeResult, hasResult := manager.currentNodeResult.Get(currentKey); hasResult {
var taskPayload, resultPayload map[string]any
if err := json.Unmarshal(payload, &taskPayload); err == nil {
if err = json.Unmarshal(currentNodeResult.Payload, &resultPayload); err == nil {
for key, val := range resultPayload {
taskPayload[key] = val
}
payload, _ = json.Marshal(taskPayload)
}
}
}
firstNode, err := tm.parseInitialNode(ctx) firstNode, err := tm.parseInitialNode(ctx)
if err != nil { if err != nil {
return mq.Result{Error: err, Ctx: ctx} return mq.Result{Error: err, Ctx: ctx}

View File

@@ -4,11 +4,12 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/oarkflow/mq"
"log" "log"
"strings" "strings"
"time" "time"
"github.com/oarkflow/mq"
"github.com/oarkflow/mq/storage" "github.com/oarkflow/mq/storage"
"github.com/oarkflow/mq/storage/memory" "github.com/oarkflow/mq/storage/memory"
) )
@@ -43,7 +44,9 @@ type TaskManager struct {
childNodes storage.IMap[string, int] childNodes storage.IMap[string, int]
deferredTasks storage.IMap[string, *task] deferredTasks storage.IMap[string, *task]
iteratorNodes storage.IMap[string, []Edge] iteratorNodes storage.IMap[string, []Edge]
currentNode string currentNodePayload storage.IMap[string, json.RawMessage]
currentNodeResult storage.IMap[string, mq.Result]
result *mq.Result
dag *DAG dag *DAG
taskID string taskID string
taskQueue chan *task taskQueue chan *task
@@ -74,6 +77,8 @@ func NewTaskManager(dag *DAG, taskID string, resultCh chan mq.Result, iteratorNo
parentNodes: memory.New[string, string](), parentNodes: memory.New[string, string](),
childNodes: memory.New[string, int](), childNodes: memory.New[string, int](),
deferredTasks: memory.New[string, *task](), deferredTasks: memory.New[string, *task](),
currentNodePayload: memory.New[string, json.RawMessage](),
currentNodeResult: memory.New[string, mq.Result](),
taskQueue: make(chan *task, DefaultChannelSize), taskQueue: make(chan *task, DefaultChannelSize),
resultQueue: make(chan nodeResult, DefaultChannelSize), resultQueue: make(chan nodeResult, DefaultChannelSize),
iteratorNodes: iteratorNodes, iteratorNodes: iteratorNodes,
@@ -147,16 +152,21 @@ func (tm *TaskManager) processNode(exec *task) {
} }
state.Status = mq.Processing state.Status = mq.Processing
state.UpdatedAt = time.Now() state.UpdatedAt = time.Now()
tm.currentNode = exec.nodeID tm.currentNodePayload.Clear()
tm.currentNodeResult.Clear()
tm.currentNodePayload.Set(exec.nodeID, exec.payload)
result := node.processor.ProcessTask(exec.ctx, mq.NewTask(exec.taskID, exec.payload, exec.nodeID)) result := node.processor.ProcessTask(exec.ctx, mq.NewTask(exec.taskID, exec.payload, exec.nodeID))
tm.currentNodeResult.Set(exec.nodeID, result)
state.Result = result state.Result = result
result.Topic = node.ID result.Topic = node.ID
if result.Error != nil { if result.Error != nil {
tm.result = &result
tm.resultCh <- result tm.resultCh <- result
tm.processFinalResult(state) tm.processFinalResult(state)
return return
} }
if node.NodeType == Page { if node.NodeType == Page {
tm.result = &result
tm.resultCh <- result tm.resultCh <- result
return return
} }
@@ -223,6 +233,7 @@ func (tm *TaskManager) handlePrevious(ctx context.Context, state *TaskState, res
} }
} }
} else { } else {
tm.result = &state.Result
state.Result.Topic = strings.Split(state.NodeID, Delimiter)[0] state.Result.Topic = strings.Split(state.NodeID, Delimiter)[0]
tm.resultCh <- state.Result tm.resultCh <- state.Result
tm.processFinalResult(state) tm.processFinalResult(state)

158
examples/form.go Normal file
View File

@@ -0,0 +1,158 @@
package main
import (
"context"
"encoding/json"
"fmt"
"github.com/oarkflow/jet"
"github.com/oarkflow/mq"
"github.com/oarkflow/mq/consts"
v2 "github.com/oarkflow/mq/dag/v2"
)
func main() {
flow := v2.NewDAG("Multi-Step Form", "multi-step-form", func(taskID string, result mq.Result) {
fmt.Printf("Final result for task %s: %s\n", taskID, string(result.Payload))
})
flow.AddNode(v2.Page, "FormStep1", "FormStep1", &FormStep1{})
flow.AddNode(v2.Page, "FormStep2", "FormStep2", &FormStep2{})
flow.AddNode(v2.Page, "FormResult", "FormResult", &FormResult{})
// Define edges
flow.AddEdge(v2.Simple, "FormStep1", "FormStep1", "FormStep2")
flow.AddEdge(v2.Simple, "FormStep2", "FormStep2", "FormResult")
// Start the flow
if flow.Error != nil {
panic(flow.Error)
}
flow.Start(context.Background(), "0.0.0.0:8082")
}
type FormStep1 struct {
v2.Operation
}
func (p *FormStep1) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
bt := []byte(`
<html>
<body>
<form method="post" action="/process?task_id={{task_id}}&next=true">
<label>Name:</label>
<input type="text" name="name" required>
<label>Age:</label>
<input type="number" name="age" required>
<button type="submit">Next</button>
</form>
</body
</html
`)
parser := jet.NewWithMemory(jet.WithDelims("{{", "}}"))
rs, err := parser.ParseTemplate(string(bt), map[string]any{
"task_id": ctx.Value("task_id"),
})
if err != nil {
fmt.Println("FormStep1", string(task.Payload))
return mq.Result{Error: err, Ctx: ctx}
}
ctx = context.WithValue(ctx, consts.ContentType, consts.TypeHtml)
data := map[string]any{
"html_content": rs,
}
bt, _ = json.Marshal(data)
return mq.Result{Payload: bt, Ctx: ctx}
}
type FormStep2 struct {
v2.Operation
}
func (p *FormStep2) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
// Parse input from Step 1
var inputData map[string]any
if err := json.Unmarshal(task.Payload, &inputData); err != nil {
return mq.Result{Error: err, Ctx: ctx}
}
// Determine dynamic content
isEligible := inputData["age"] == "18"
inputData["show_voting_controls"] = isEligible
bt := []byte(`
<html>
<body>
<form method="post" action="/process?task_id={{task_id}}&next=true">
{{ if show_voting_controls }}
<label>Do you want to register to vote?</label>
<input type="checkbox" name="register_vote">
<button type="submit">Next</button>
{{ else }}
<p>You are not eligible to vote.</p>
{{ end }}
</form>
</body>
</html>
`)
parser := jet.NewWithMemory(jet.WithDelims("{{", "}}"))
inputData["task_id"] = ctx.Value("task_id")
rs, err := parser.ParseTemplate(string(bt), inputData)
if err != nil {
fmt.Println("FormStep2", inputData)
return mq.Result{Error: err, Ctx: ctx}
}
ctx = context.WithValue(ctx, consts.ContentType, consts.TypeHtml)
inputData["html_content"] = rs
bt, _ = json.Marshal(inputData)
return mq.Result{Payload: bt, Ctx: ctx}
}
type FormResult struct {
v2.Operation
}
func (p *FormResult) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
// Load HTML template for results
bt := []byte(`
<html>
<body>
<h1>Form Summary</h1>
<p>Name: {{ name }}</p>
<p>Age: {{ age }}</p>
{{ if register_vote }}
<p>You have registered to vote!</p>
{{ else }}
<p>You did not register to vote.</p>
{{ end }}
</body>
</html>
`)
var inputData map[string]any
if task.Payload != nil {
if err := json.Unmarshal(task.Payload, &inputData); err != nil {
return mq.Result{Error: err, Ctx: ctx}
}
}
if inputData != nil {
if isEligible, ok := inputData["register_vote"].(string); ok {
inputData["register_vote"] = isEligible
} else {
inputData["register_vote"] = false
}
}
parser := jet.NewWithMemory(jet.WithDelims("{{", "}}"))
rs, err := parser.ParseTemplate(string(bt), inputData)
if err != nil {
return mq.Result{Error: err, Ctx: ctx}
}
ctx = context.WithValue(ctx, consts.ContentType, consts.TypeHtml)
inputData["html_content"] = rs
bt, _ = json.Marshal(inputData)
return mq.Result{Payload: bt, Ctx: ctx}
}

View File

@@ -66,7 +66,7 @@
<textarea id="payload" class="w-full p-2 border rounded-md" placeholder="Enter your JSON Payload here">[{"phone": "+123456789", "email": "abc.xyz@gmail.com"}, {"phone": "+98765412", "email": "xyz.abc@gmail.com"}]</textarea> <textarea id="payload" class="w-full p-2 border rounded-md" placeholder="Enter your JSON Payload here">[{"phone": "+123456789", "email": "abc.xyz@gmail.com"}, {"phone": "+98765412", "email": "xyz.abc@gmail.com"}]</textarea>
<button id="send-request" class="ml-4 py-2 px-6 bg-green-500 text-white rounded-md">Send</button> <button id="send-request" class="ml-4 py-2 px-6 bg-green-500 text-white rounded-md">Send</button>
</div> </div>
<div> <div id="response">
<h1 class="text-xl font-semibold text-gray-700 mb-4">Table</h1> <h1 class="text-xl font-semibold text-gray-700 mb-4">Table</h1>
<div class="overflow-auto scrollbar h-48"> <div class="overflow-auto scrollbar h-48">
<table class="min-w-full border-collapse border border-gray-300"> <table class="min-w-full border-collapse border border-gray-300">

View File

@@ -13,23 +13,38 @@
} }
window.onload = function() { window.onload = function() {
loadSVG('http://localhost:8083/ui'); loadSVG('http://localhost:8082/ui');
}; };
document.getElementById('send-request').addEventListener('click', function() { document.getElementById('send-request').addEventListener('click', function() {
const input = document.getElementById('payload'); const input = document.getElementById('payload');
const payloadData = JSON.parse(input.value); const payloadData = JSON.parse(input.value);
const data = { payload: payloadData }; const data = { payload: payloadData };
fetch('http://localhost:8083/request', { fetch('http://localhost:8082/request', {
method: 'POST', method: 'POST',
headers: { headers: {
'Content-Type': 'application/json', 'Content-Type': 'application/json',
}, },
body: JSON.stringify(data), body: JSON.stringify(data),
}) })
.then(response => response.json()) .then(response => {
.then(data => console.log('Success:', data)) const contentType = response.headers.get('Content-Type');
.catch((error) => console.error('Error:', error)); if (contentType && contentType.includes("text/html")) {
return response.text().then(html => ({ html }));
} else {
return response.json();
}
})
.then(data => {
if (data.html) {
const el = document.getElementById("response");
el.innerHTML = data.html;
} else {
console.log("Success", data);
}
})
.catch(error => console.error('Error:', error));
}); });
const tasks = {}; const tasks = {};