mirror of
https://github.com/oarkflow/mq.git
synced 2025-10-05 16:06:55 +08:00
feat: add task completion
This commit is contained in:
@@ -21,7 +21,7 @@ type Edge struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type DAG struct {
|
type DAG struct {
|
||||||
Nodes storage.IMap[string, *Node]
|
nodes storage.IMap[string, *Node]
|
||||||
Edges map[string][]string
|
Edges map[string][]string
|
||||||
ParentNodes map[string]string
|
ParentNodes map[string]string
|
||||||
taskManager storage.IMap[string, *TaskManager]
|
taskManager storage.IMap[string, *TaskManager]
|
||||||
@@ -31,7 +31,7 @@ type DAG struct {
|
|||||||
|
|
||||||
func NewDAG(finalResultCallback func(taskID string, result Result)) *DAG {
|
func NewDAG(finalResultCallback func(taskID string, result Result)) *DAG {
|
||||||
return &DAG{
|
return &DAG{
|
||||||
Nodes: memory.New[string, *Node](),
|
nodes: memory.New[string, *Node](),
|
||||||
Edges: make(map[string][]string),
|
Edges: make(map[string][]string),
|
||||||
ParentNodes: make(map[string]string),
|
ParentNodes: make(map[string]string),
|
||||||
taskManager: memory.New[string, *TaskManager](),
|
taskManager: memory.New[string, *TaskManager](),
|
||||||
@@ -40,12 +40,12 @@ func NewDAG(finalResultCallback func(taskID string, result Result)) *DAG {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (tm *DAG) AddNode(nodeID string, handler func(payload json.RawMessage) Result) {
|
func (tm *DAG) AddNode(nodeID string, handler func(payload json.RawMessage) Result) {
|
||||||
tm.Nodes.Set(nodeID, &Node{ID: nodeID, Handler: handler})
|
tm.nodes.Set(nodeID, &Node{ID: nodeID, Handler: handler})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tm *DAG) AddEdge(from string, to ...string) {
|
func (tm *DAG) AddEdge(from string, targets ...string) {
|
||||||
tm.Edges[from] = append(tm.Edges[from], to...)
|
tm.Edges[from] = append(tm.Edges[from], targets...)
|
||||||
for _, targetNode := range to {
|
for _, targetNode := range targets {
|
||||||
tm.ParentNodes[targetNode] = from
|
tm.ParentNodes[targetNode] = from
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -131,7 +131,7 @@ func (tm *TaskManager) Run() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (tm *TaskManager) processNode(exec taskExecution) {
|
func (tm *TaskManager) processNode(exec taskExecution) {
|
||||||
node, exists := tm.dag.Nodes.Get(exec.nodeID)
|
node, exists := tm.dag.nodes.Get(exec.nodeID)
|
||||||
if !exists {
|
if !exists {
|
||||||
fmt.Printf("Node %s does not exist\n", exec.nodeID)
|
fmt.Printf("Node %s does not exist\n", exec.nodeID)
|
||||||
return
|
return
|
||||||
|
Reference in New Issue
Block a user