mirror of
https://github.com/oarkflow/mq.git
synced 2025-11-03 05:23:19 +08:00
feat: add task completion
This commit is contained in:
@@ -15,17 +15,23 @@ import (
|
||||
"github.com/oarkflow/mq/storage/memory"
|
||||
)
|
||||
|
||||
type Edge struct {
|
||||
From *Node
|
||||
To []*Node
|
||||
}
|
||||
|
||||
type DAG struct {
|
||||
Nodes map[string]*Node
|
||||
Nodes storage.IMap[string, *Node]
|
||||
Edges map[string][]string
|
||||
ParentNodes map[string]string
|
||||
taskManager storage.IMap[string, *TaskManager]
|
||||
finalResult func(taskID string, result Result)
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func NewDAG(finalResultCallback func(taskID string, result Result)) *DAG {
|
||||
return &DAG{
|
||||
Nodes: make(map[string]*Node),
|
||||
Nodes: memory.New[string, *Node](),
|
||||
Edges: make(map[string][]string),
|
||||
ParentNodes: make(map[string]string),
|
||||
taskManager: memory.New[string, *TaskManager](),
|
||||
@@ -34,7 +40,7 @@ func NewDAG(finalResultCallback func(taskID string, result Result)) *DAG {
|
||||
}
|
||||
|
||||
func (tm *DAG) AddNode(nodeID string, handler func(payload json.RawMessage) Result) {
|
||||
tm.Nodes[nodeID] = &Node{ID: nodeID, Handler: handler}
|
||||
tm.Nodes.Set(nodeID, &Node{ID: nodeID, Handler: handler})
|
||||
}
|
||||
|
||||
func (tm *DAG) AddEdge(from string, to ...string) {
|
||||
@@ -125,7 +131,7 @@ func (tm *TaskManager) Run() {
|
||||
}
|
||||
|
||||
func (tm *TaskManager) processNode(exec taskExecution) {
|
||||
node, exists := tm.dag.Nodes[exec.nodeID]
|
||||
node, exists := tm.dag.Nodes.Get(exec.nodeID)
|
||||
if !exists {
|
||||
fmt.Printf("Node %s does not exist\n", exec.nodeID)
|
||||
return
|
||||
|
||||
Reference in New Issue
Block a user