feat: [wip] - Implement html node

This commit is contained in:
sujit
2024-11-18 07:30:33 +05:45
parent 59f7db8e26
commit b9f5b44f04
4 changed files with 533 additions and 2 deletions

View File

@@ -12,11 +12,18 @@ import (
func main() {
f := dag.NewDAG("Sample DAG", "sample-dag", mq.WithSyncMode(true))
f.SetNotifyResponse(func(ctx context.Context, result mq.Result) error {
if f.Notifier != nil {
f.Notifier.ToRoom("global", "final-message", result)
}
return nil
})
setup(f)
err := f.Validate()
if err != nil {
panic(err)
}
f.Start(context.Background(), ":8083")
sendData(f)
}
@@ -43,8 +50,7 @@ func setup(f *dag.DAG) {
AddEdge("Get input to loop", "get:input", "loop").
AddIterator("Loop to prepare email", "loop", "prepare:email").
AddEdge("Prepare Email to condition", "prepare:email", "condition").
AddCondition("condition", map[dag.When]dag.Then{"pass": "email:deliver", "fail": "persistent"}).
AddEdge("Final", "loop", "final")
AddCondition("condition", map[dag.When]dag.Then{"pass": "email:deliver", "fail": "persistent"})
}
func sendData(f *dag.DAG) {

320
examples/v2.go Normal file
View File

@@ -0,0 +1,320 @@
package main
import (
"encoding/json"
"fmt"
"math/rand"
"net/http"
"strconv"
"sync"
"time"
)
type TaskStatus string
const (
StatusPending TaskStatus = "Pending"
StatusProcessing TaskStatus = "Processing"
StatusCompleted TaskStatus = "Completed"
StatusFailed TaskStatus = "Failed"
)
type Result struct {
Data json.RawMessage
Error error
Status TaskStatus
}
type Node struct {
ID string
Handler func(payload json.RawMessage) Result
}
type TaskState struct {
NodeID string
Status TaskStatus
Timestamp time.Time
Result Result
targetResults []Result
}
type nodeResult struct {
taskID string
nodeID string
result Result
}
type TaskManager struct {
Nodes map[string]*Node
Edges map[string][]string
ParentNodes map[string]string
TaskStates map[string]map[string]*TaskState
mu sync.Mutex
taskQueue chan taskExecution
resultQueue chan nodeResult
finalResult func(taskID string, result Result)
}
type taskExecution struct {
taskID string
nodeID string
payload json.RawMessage
}
func NewTaskManager(finalResultCallback func(taskID string, result Result)) *TaskManager {
tm := &TaskManager{
Nodes: make(map[string]*Node),
Edges: make(map[string][]string),
ParentNodes: make(map[string]string),
TaskStates: make(map[string]map[string]*TaskState),
taskQueue: make(chan taskExecution, 100),
resultQueue: make(chan nodeResult, 100),
finalResult: finalResultCallback,
}
go tm.WaitForResult()
return tm
}
func (tm *TaskManager) AddNode(nodeID string, handler func(payload json.RawMessage) Result) {
tm.mu.Lock()
defer tm.mu.Unlock()
tm.Nodes[nodeID] = &Node{ID: nodeID, Handler: handler}
}
func (tm *TaskManager) AddEdge(from string, to ...string) {
tm.mu.Lock()
defer tm.mu.Unlock()
tm.Edges[from] = append(tm.Edges[from], to...)
for _, targetNode := range to {
tm.ParentNodes[targetNode] = from
}
}
func (tm *TaskManager) Trigger(taskID, startNode string, payload json.RawMessage) {
tm.mu.Lock()
if _, exists := tm.TaskStates[taskID]; !exists {
tm.TaskStates[taskID] = make(map[string]*TaskState)
}
tm.TaskStates[taskID][startNode] = &TaskState{
NodeID: startNode,
Status: StatusPending,
Timestamp: time.Now(),
}
tm.mu.Unlock()
tm.taskQueue <- taskExecution{taskID: taskID, nodeID: startNode, payload: payload}
}
func (tm *TaskManager) Run() {
go func() {
for task := range tm.taskQueue {
tm.processNode(task)
}
}()
}
func (tm *TaskManager) processNode(exec taskExecution) {
node, exists := tm.Nodes[exec.nodeID]
if !exists {
fmt.Printf("Node %s does not exist\n", exec.nodeID)
return
}
tm.mu.Lock()
state := tm.TaskStates[exec.taskID][exec.nodeID]
if state == nil {
state = &TaskState{NodeID: exec.nodeID, Status: StatusPending, Timestamp: time.Now()}
tm.TaskStates[exec.taskID][exec.nodeID] = state
}
state.Status = StatusProcessing
state.Timestamp = time.Now()
tm.mu.Unlock()
result := node.Handler(exec.payload)
tm.mu.Lock()
state.Timestamp = time.Now()
state.Result = result
state.Status = result.Status
tm.mu.Unlock()
if result.Status == StatusFailed {
fmt.Printf("Task %s failed at node %s: %v\n", exec.taskID, exec.nodeID, result.Error)
tm.processFinalResult(exec.taskID, state)
return
}
tm.resultQueue <- nodeResult{taskID: exec.taskID, nodeID: exec.nodeID, result: result}
}
func (tm *TaskManager) WaitForResult() {
go func() {
for nr := range tm.resultQueue {
tm.onNodeCompleted(nr)
}
}()
}
func (tm *TaskManager) onNodeCompleted(nodeResult nodeResult) {
nextNodes := tm.Edges[nodeResult.nodeID]
if len(nextNodes) > 0 {
for _, nextNodeID := range nextNodes {
tm.mu.Lock()
if _, exists := tm.TaskStates[nodeResult.taskID][nextNodeID]; !exists {
tm.TaskStates[nodeResult.taskID][nextNodeID] = &TaskState{
NodeID: nextNodeID,
Status: StatusPending,
Timestamp: time.Now(),
}
}
tm.mu.Unlock()
tm.taskQueue <- taskExecution{taskID: nodeResult.taskID, nodeID: nextNodeID, payload: nodeResult.result.Data}
}
} else {
parentNode := tm.ParentNodes[nodeResult.nodeID]
if parentNode != "" {
tm.mu.Lock()
state := tm.TaskStates[nodeResult.taskID][parentNode]
if state == nil {
state = &TaskState{NodeID: parentNode, Status: StatusPending, Timestamp: time.Now()}
tm.TaskStates[nodeResult.taskID][parentNode] = state
}
state.targetResults = append(state.targetResults, nodeResult.result)
allTargetNodesdone := len(tm.Edges[parentNode]) == len(state.targetResults)
tm.mu.Unlock()
if tm.areAllTargetNodesCompleted(parentNode, nodeResult.taskID) && allTargetNodesdone {
tm.aggregateResults(parentNode, nodeResult.taskID)
}
}
}
}
func (tm *TaskManager) areAllTargetNodesCompleted(parentNode string, taskID string) bool {
tm.mu.Lock()
defer tm.mu.Unlock()
for _, targetNode := range tm.Edges[parentNode] {
state := tm.TaskStates[taskID][targetNode]
if state == nil || state.Status != StatusCompleted {
return false
}
}
return true
}
func (tm *TaskManager) aggregateResults(parentNode string, taskID string) {
tm.mu.Lock()
defer tm.mu.Unlock()
state := tm.TaskStates[taskID][parentNode]
if len(state.targetResults) > 1 {
aggregatedData := make([]any, len(state.targetResults))
for i, result := range state.targetResults {
var data map[string]any
json.Unmarshal(result.Data, &data)
aggregatedData[i] = data
}
aggregatedPayload, _ := json.Marshal(aggregatedData)
state.Result = Result{Data: aggregatedPayload, Status: StatusCompleted}
} else if len(state.targetResults) == 1 {
state.Result = state.targetResults[0]
}
tm.processFinalResult(taskID, state)
}
func (tm *TaskManager) processFinalResult(taskID string, state *TaskState) {
clear(state.targetResults)
tm.finalResult(taskID, state.Result)
}
func finalResultCallback(taskID string, result Result) {
fmt.Printf("Final result for Task %s: %s\n", taskID, string(result.Data))
}
func generateTaskID() string {
return strconv.Itoa(rand.Intn(100000)) // Random taskID generation
}
func (tm *TaskManager) formHandler(w http.ResponseWriter, r *http.Request) {
if r.Method == "GET" {
http.ServeFile(w, r, "webroot/form.html")
} else if r.Method == "POST" {
r.ParseForm()
email := r.FormValue("email")
age := r.FormValue("age")
gender := r.FormValue("gender")
taskID := generateTaskID() // Generate TaskID on form submission
payload := fmt.Sprintf(`{"email": "%s", "age": "%s", "gender": "%s"}`, email, age, gender)
tm.Trigger(taskID, "NodeA", json.RawMessage(payload))
http.Redirect(w, r, "/result?taskID="+taskID, http.StatusFound)
}
}
func (tm *TaskManager) resultHandler(w http.ResponseWriter, r *http.Request) {
http.ServeFile(w, r, "webroot/result.html")
}
func (tm *TaskManager) taskStatusHandler(w http.ResponseWriter, r *http.Request) {
taskID := r.URL.Query().Get("taskID")
if taskID == "" {
http.Error(w, "taskID is missing", http.StatusBadRequest)
return
}
tm.mu.Lock()
state := tm.TaskStates[taskID]
tm.mu.Unlock()
if state == nil {
http.Error(w, "Invalid taskID", http.StatusNotFound)
return
}
// Return the final result as JSON or other response format
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(state)
}
func main() {
tm := NewTaskManager(finalResultCallback)
tm.AddNode("NodeA", func(payload json.RawMessage) Result {
var data map[string]any
if err := json.Unmarshal(payload, &data); err != nil {
return Result{Error: err, Status: StatusFailed}
}
data["allowed_voting"] = data["age"] == "18"
updatedPayload, _ := json.Marshal(data)
return Result{Data: updatedPayload, Status: StatusCompleted}
})
tm.AddNode("NodeB", func(payload json.RawMessage) Result {
var data map[string]any
if err := json.Unmarshal(payload, &data); err != nil {
return Result{Error: err, Status: StatusFailed}
}
data["female_voter"] = data["gender"] == "female"
updatedPayload, _ := json.Marshal(data)
return Result{Data: updatedPayload, Status: StatusCompleted}
})
tm.AddNode("NodeC", func(payload json.RawMessage) Result {
var data map[string]any
if err := json.Unmarshal(payload, &data); err != nil {
return Result{Error: err, Status: StatusFailed}
}
data["voted"] = true
updatedPayload, _ := json.Marshal(data)
return Result{Data: updatedPayload, Status: StatusCompleted}
})
tm.AddNode("Result", func(payload json.RawMessage) Result {
var data map[string]any
json.Unmarshal(payload, &data)
// Show the final result
// You can render the data as an HTML result here
return Result{Data: payload, Status: StatusCompleted}
})
tm.AddEdge("Form", "NodeA")
tm.AddEdge("NodeA", "NodeB")
tm.AddEdge("NodeB", "NodeC")
tm.AddEdge("NodeC", "Result")
http.HandleFunc("/form", tm.formHandler)
http.HandleFunc("/result", tm.resultHandler)
http.HandleFunc("/task-result", tm.taskStatusHandler)
go tm.Run()
http.ListenAndServe(":8080", nil)
}

View File

@@ -0,0 +1,27 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>User Data Form</title>
</head>
<body>
<h1>Enter Your Information</h1>
<form action="/form" method="POST">
<label for="email">Email:</label><br>
<input type="email" id="email" name="email" value="s.baniya.np@gmail.com" required><br><br>
<label for="age">Age:</label><br>
<input type="number" id="age" name="age" value="18" required><br><br>
<label for="gender">Gender:</label><br>
<select id="gender" name="gender" required>
<option value="male">Male</option>
<option value="female">Female</option>
<option value="other">Other</option>
</select><br><br>
<input type="submit" value="Submit">
</form>
</body>
</html>

View File

@@ -0,0 +1,178 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Task Result</title>
<style>
body {
font-family: Arial, sans-serif;
margin: 0;
padding: 0;
background-color: #f4f7fc;
}
h1 {
text-align: center;
color: #333;
padding-top: 20px;
}
.container {
width: 80%;
margin: 0 auto;
padding: 20px;
background-color: white;
border-radius: 8px;
box-shadow: 0 4px 8px rgba(0, 0, 0, 0.1);
}
table {
width: 100%;
border-collapse: collapse;
margin-top: 20px;
}
table th, table td {
padding: 10px;
border: 1px solid #ddd;
text-align: left;
}
table th {
background-color: #f1f1f1;
color: #333;
}
.status-pending {
color: orange;
}
.status-processing {
color: blue;
}
.status-completed {
color: green;
}
.status-failed {
color: red;
}
.node-result {
margin-top: 30px;
}
.node-result h2 {
color: #333;
}
.footer {
text-align: center;
margin-top: 40px;
padding-bottom: 20px;
}
</style>
</head>
<body>
<h1>Task Result</h1>
<div class="container" id="result">
<p>Loading result...</p>
</div>
<div class="footer">
<p>&copy; 2024 Task Manager</p>
</div>
<script>
function formatDate(dateStr) {
const date = new Date(dateStr);
return date.toLocaleString();
}
// Fetch the task result
const taskID = new URLSearchParams(window.location.search).get('taskID'); // Get taskID from URL
if (taskID) {
fetch(`/task-result?taskID=${taskID}`)
.then(response => response.json())
.then(data => {
const container = document.getElementById('result');
let htmlContent = '';
// Show final result in a table
htmlContent += `
<h2>Final Task Result</h2>
<table>
<tr>
<th>Task ID</th>
<th>Status</th>
<th>Timestamp</th>
<th>Result Data</th>
</tr>
<tr>
<td>${taskID}</td>
<td class="${getStatusClass(data.Result.Status)}">${data.Result.Status}</td>
<td>${formatDate(data.Result.Timestamp)}</td>
<td><pre>${JSON.stringify(data.Result.Result.Data, null, 2)}</pre></td>
</tr>
</table>
`;
// Show result per node
htmlContent += `
<div class="node-result">
<h2>Result Per Node</h2>
<table>
<tr>
<th>Node ID</th>
<th>Status</th>
<th>Timestamp</th>
<th>Node Result Data</th>
</tr>
`;
// Loop through each node result and display in a table
for (const nodeID in data) {
if (nodeID !== "Result") {
const node = data[nodeID];
htmlContent += `
<tr>
<td>${node.NodeID}</td>
<td class="${getStatusClass(node.Status)}">${node.Status}</td>
<td>${formatDate(node.Timestamp)}</td>
<td><pre>${JSON.stringify(node.Result.Data, null, 2)}</pre></td>
</tr>
`;
}
}
htmlContent += '</table></div>';
container.innerHTML = htmlContent;
})
.catch(error => {
document.getElementById('result').innerHTML = '<p>Error loading task result.</p>';
});
} else {
document.getElementById('result').innerHTML = '<p>Task ID not provided.</p>';
}
function getStatusClass(status) {
switch (status) {
case 'Pending':
return 'status-pending';
case 'Processing':
return 'status-processing';
case 'Completed':
return 'status-completed';
case 'Failed':
return 'status-failed';
default:
return '';
}
}
</script>
</body>
</html>