From 78b266ac0469cf473feaa7f7404aa1da039ee7c9 Mon Sep 17 00:00:00 2001 From: sujit Date: Mon, 18 Nov 2024 11:17:25 +0545 Subject: [PATCH] feat: add task completion --- examples/v2.go | 24 +++++++++--------------- examples/webroot/result.html | 35 +++++++++++++++++------------------ 2 files changed, 26 insertions(+), 33 deletions(-) diff --git a/examples/v2.go b/examples/v2.go index 590ee0f..49f56e1 100644 --- a/examples/v2.go +++ b/examples/v2.go @@ -10,14 +10,16 @@ import ( "time" "golang.org/x/exp/maps" + + "github.com/oarkflow/mq/storage" + "github.com/oarkflow/mq/storage/memory" ) type DAG struct { Nodes map[string]*Node Edges map[string][]string ParentNodes map[string]string - taskManager map[string]*TaskManager - mu sync.Mutex + taskManager storage.IMap[string, *TaskManager] finalResult func(taskID string, result Result) } @@ -26,20 +28,16 @@ func NewDAG(finalResultCallback func(taskID string, result Result)) *DAG { Nodes: make(map[string]*Node), Edges: make(map[string][]string), ParentNodes: make(map[string]string), - taskManager: make(map[string]*TaskManager), + taskManager: memory.New[string, *TaskManager](), finalResult: finalResultCallback, } } func (tm *DAG) AddNode(nodeID string, handler func(payload json.RawMessage) Result) { - tm.mu.Lock() - defer tm.mu.Unlock() tm.Nodes[nodeID] = &Node{ID: nodeID, Handler: handler} } func (tm *DAG) AddEdge(from string, to ...string) { - tm.mu.Lock() - defer tm.mu.Unlock() tm.Edges[from] = append(tm.Edges[from], to...) for _, targetNode := range to { tm.ParentNodes[targetNode] = from @@ -253,9 +251,7 @@ func (tm *DAG) formHandler(w http.ResponseWriter, r *http.Request) { gender := r.FormValue("gender") taskID := generateTaskID() manager := NewTaskManager(tm) - tm.mu.Lock() - tm.taskManager[taskID] = manager - tm.mu.Unlock() + tm.taskManager.Set(taskID, manager) go manager.Run() payload := fmt.Sprintf(`{"email": "%s", "age": "%s", "gender": "%s"}`, email, age, gender) manager.Trigger(taskID, "NodeA", json.RawMessage(payload)) @@ -270,14 +266,12 @@ func (tm *DAG) resultHandler(w http.ResponseWriter, r *http.Request) { func (tm *DAG) taskStatusHandler(w http.ResponseWriter, r *http.Request) { taskID := r.URL.Query().Get("taskID") if taskID == "" { - http.Error(w, "taskID is missing", http.StatusBadRequest) + http.Error(w, `{"message": "taskID is missing"}`, http.StatusBadRequest) return } - tm.mu.Lock() - manager, ok := tm.taskManager[taskID] - tm.mu.Unlock() + manager, ok := tm.taskManager.Get(taskID) if !ok { - http.Error(w, "Invalid taskID", http.StatusNotFound) + http.Error(w, `{"message": "Invalid TaskID"}`, http.StatusNotFound) return } w.Header().Set("Content-Type", "application/json") diff --git a/examples/webroot/result.html b/examples/webroot/result.html index 3b8ca53..f425325 100644 --- a/examples/webroot/result.html +++ b/examples/webroot/result.html @@ -99,12 +99,14 @@ fetch(`/task-result?taskID=${taskID}`) .then(response => response.json()) .then(data => { - const container = document.getElementById('result'); - let htmlContent = ''; - - // Show final result in a table - htmlContent += ` -

Final Task Result

+ if(data?.message) { + document.getElementById('result').innerHTML = ` +

Error loading task result. ${data.message}


Go back`; + } else { + const container = document.getElementById('result'); + let htmlContent = ''; + htmlContent += ` +

Final Task Result Go back

@@ -120,9 +122,7 @@
Task ID
`; - - // Show result per node - htmlContent += ` + htmlContent += `

Result Per Node

@@ -133,12 +133,10 @@ `; - - // Loop through each node result and display in a table - for (const nodeID in data) { - if (nodeID !== "Result") { - const node = data[nodeID]; - htmlContent += ` + for (const nodeID in data) { + if (nodeID !== "Result") { + const node = data[nodeID]; + htmlContent += ` @@ -146,13 +144,14 @@ `; + } } + htmlContent += '
Node Result Data
${node.NodeID} ${node.Status}
${JSON.stringify(node.Result.Data, null, 2)}
'; + container.innerHTML = htmlContent; } - - htmlContent += ''; - container.innerHTML = htmlContent; }) .catch(error => { + console.log(error) document.getElementById('result').innerHTML = '

Error loading task result.

'; }); } else {