feat: add task completion

This commit is contained in:
sujit
2024-11-18 13:18:58 +05:45
parent d05deb08b3
commit c49dd3c9e7
2 changed files with 55 additions and 41 deletions

View File

@@ -51,14 +51,18 @@ func NewTaskManager(dag *DAG) *TaskManager {
func (tm *TaskManager) Trigger(taskID, startNode string, payload json.RawMessage) { func (tm *TaskManager) Trigger(taskID, startNode string, payload json.RawMessage) {
tm.mu.Lock() tm.mu.Lock()
tm.taskStates[startNode] = &TaskState{ tm.taskStates[startNode] = newTaskState(startNode)
NodeID: startNode, tm.mu.Unlock()
tm.taskQueue <- taskExecution{taskID: taskID, nodeID: startNode, payload: payload}
}
func newTaskState(nodeID string) *TaskState {
return &TaskState{
NodeID: nodeID,
Status: StatusPending, Status: StatusPending,
Timestamp: time.Now(), Timestamp: time.Now(),
targetResults: make(map[string]Result), targetResults: make(map[string]Result),
} }
tm.mu.Unlock()
tm.taskQueue <- taskExecution{taskID: taskID, nodeID: startNode, payload: payload}
} }
func (tm *TaskManager) Run() { func (tm *TaskManager) Run() {
@@ -78,7 +82,7 @@ func (tm *TaskManager) processNode(exec taskExecution) {
tm.mu.Lock() tm.mu.Lock()
state := tm.taskStates[exec.nodeID] state := tm.taskStates[exec.nodeID]
if state == nil { if state == nil {
state = &TaskState{NodeID: exec.nodeID, Status: StatusPending, Timestamp: time.Now(), targetResults: make(map[string]Result)} state = newTaskState(exec.nodeID)
tm.taskStates[exec.nodeID] = state tm.taskStates[exec.nodeID] = state
} }
state.Status = StatusProcessing state.Status = StatusProcessing

View File

@@ -7,11 +7,7 @@ import (
v2 "github.com/oarkflow/mq/dag/v2" v2 "github.com/oarkflow/mq/dag/v2"
) )
func main() { func NodeA(payload json.RawMessage) v2.Result {
dag := v2.NewDAG(func(taskID string, result v2.Result) {
fmt.Printf("Final result for Task %s: %s\n", taskID, string(result.Data))
})
dag.AddNode("NodeA", func(payload json.RawMessage) v2.Result {
var data map[string]any var data map[string]any
if err := json.Unmarshal(payload, &data); err != nil { if err := json.Unmarshal(payload, &data); err != nil {
return v2.Result{Error: err, Status: v2.StatusFailed} return v2.Result{Error: err, Status: v2.StatusFailed}
@@ -19,8 +15,9 @@ func main() {
data["allowed_voting"] = data["age"] == "18" data["allowed_voting"] = data["age"] == "18"
updatedPayload, _ := json.Marshal(data) updatedPayload, _ := json.Marshal(data)
return v2.Result{Data: updatedPayload, Status: v2.StatusCompleted} return v2.Result{Data: updatedPayload, Status: v2.StatusCompleted}
}) }
dag.AddNode("NodeB", func(payload json.RawMessage) v2.Result {
func NodeB(payload json.RawMessage) v2.Result {
var data map[string]any var data map[string]any
if err := json.Unmarshal(payload, &data); err != nil { if err := json.Unmarshal(payload, &data); err != nil {
return v2.Result{Error: err, Status: v2.StatusFailed} return v2.Result{Error: err, Status: v2.StatusFailed}
@@ -28,8 +25,9 @@ func main() {
data["female_voter"] = data["gender"] == "female" data["female_voter"] = data["gender"] == "female"
updatedPayload, _ := json.Marshal(data) updatedPayload, _ := json.Marshal(data)
return v2.Result{Data: updatedPayload, Status: v2.StatusCompleted} return v2.Result{Data: updatedPayload, Status: v2.StatusCompleted}
}) }
dag.AddNode("NodeC", func(payload json.RawMessage) v2.Result {
func NodeC(payload json.RawMessage) v2.Result {
var data map[string]any var data map[string]any
if err := json.Unmarshal(payload, &data); err != nil { if err := json.Unmarshal(payload, &data); err != nil {
return v2.Result{Error: err, Status: v2.StatusFailed} return v2.Result{Error: err, Status: v2.StatusFailed}
@@ -37,12 +35,24 @@ func main() {
data["voted"] = true data["voted"] = true
updatedPayload, _ := json.Marshal(data) updatedPayload, _ := json.Marshal(data)
return v2.Result{Data: updatedPayload, Status: v2.StatusCompleted} return v2.Result{Data: updatedPayload, Status: v2.StatusCompleted}
}) }
dag.AddNode("Result", func(payload json.RawMessage) v2.Result {
func Result(payload json.RawMessage) v2.Result {
var data map[string]any var data map[string]any
json.Unmarshal(payload, &data) json.Unmarshal(payload, &data)
return v2.Result{Data: payload, Status: v2.StatusCompleted} return v2.Result{Data: payload, Status: v2.StatusCompleted}
}) }
func notify(taskID string, result v2.Result) {
fmt.Printf("Final result for Task %s: %s\n", taskID, string(result.Data))
}
func main() {
dag := v2.NewDAG(notify)
dag.AddNode("NodeA", NodeA)
dag.AddNode("NodeB", NodeB)
dag.AddNode("NodeC", NodeC)
dag.AddNode("Result", Result)
dag.Start(":8080") dag.Start(":8080")
} }