mirror of
https://github.com/oarkflow/mq.git
synced 2025-10-19 06:44:47 +08:00
feat: [wip] - Implement html node
This commit is contained in:
@@ -8,6 +8,8 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"golang.org/x/exp/maps"
|
||||||
)
|
)
|
||||||
|
|
||||||
type TaskStatus string
|
type TaskStatus string
|
||||||
@@ -35,7 +37,8 @@ type TaskState struct {
|
|||||||
Status TaskStatus
|
Status TaskStatus
|
||||||
Timestamp time.Time
|
Timestamp time.Time
|
||||||
Result Result
|
Result Result
|
||||||
targetResults []Result
|
targetResults map[string]Result
|
||||||
|
my sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
type nodeResult struct {
|
type nodeResult struct {
|
||||||
@@ -99,6 +102,7 @@ func (tm *TaskManager) Trigger(taskID, startNode string, payload json.RawMessage
|
|||||||
NodeID: startNode,
|
NodeID: startNode,
|
||||||
Status: StatusPending,
|
Status: StatusPending,
|
||||||
Timestamp: time.Now(),
|
Timestamp: time.Now(),
|
||||||
|
targetResults: make(map[string]Result),
|
||||||
}
|
}
|
||||||
tm.mu.Unlock()
|
tm.mu.Unlock()
|
||||||
tm.taskQueue <- taskExecution{taskID: taskID, nodeID: startNode, payload: payload}
|
tm.taskQueue <- taskExecution{taskID: taskID, nodeID: startNode, payload: payload}
|
||||||
@@ -121,7 +125,7 @@ func (tm *TaskManager) processNode(exec taskExecution) {
|
|||||||
tm.mu.Lock()
|
tm.mu.Lock()
|
||||||
state := tm.TaskStates[exec.taskID][exec.nodeID]
|
state := tm.TaskStates[exec.taskID][exec.nodeID]
|
||||||
if state == nil {
|
if state == nil {
|
||||||
state = &TaskState{NodeID: exec.nodeID, Status: StatusPending, Timestamp: time.Now()}
|
state = &TaskState{NodeID: exec.nodeID, Status: StatusPending, Timestamp: time.Now(), targetResults: make(map[string]Result)}
|
||||||
tm.TaskStates[exec.taskID][exec.nodeID] = state
|
tm.TaskStates[exec.taskID][exec.nodeID] = state
|
||||||
}
|
}
|
||||||
state.Status = StatusProcessing
|
state.Status = StatusProcessing
|
||||||
@@ -159,6 +163,7 @@ func (tm *TaskManager) onNodeCompleted(nodeResult nodeResult) {
|
|||||||
NodeID: nextNodeID,
|
NodeID: nextNodeID,
|
||||||
Status: StatusPending,
|
Status: StatusPending,
|
||||||
Timestamp: time.Now(),
|
Timestamp: time.Now(),
|
||||||
|
targetResults: make(map[string]Result),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
tm.mu.Unlock()
|
tm.mu.Unlock()
|
||||||
@@ -170,10 +175,10 @@ func (tm *TaskManager) onNodeCompleted(nodeResult nodeResult) {
|
|||||||
tm.mu.Lock()
|
tm.mu.Lock()
|
||||||
state := tm.TaskStates[nodeResult.taskID][parentNode]
|
state := tm.TaskStates[nodeResult.taskID][parentNode]
|
||||||
if state == nil {
|
if state == nil {
|
||||||
state = &TaskState{NodeID: parentNode, Status: StatusPending, Timestamp: time.Now()}
|
state = &TaskState{NodeID: parentNode, Status: StatusPending, Timestamp: time.Now(), targetResults: make(map[string]Result)}
|
||||||
tm.TaskStates[nodeResult.taskID][parentNode] = state
|
tm.TaskStates[nodeResult.taskID][parentNode] = state
|
||||||
}
|
}
|
||||||
state.targetResults = append(state.targetResults, nodeResult.result)
|
state.targetResults[nodeResult.nodeID] = nodeResult.result
|
||||||
allTargetNodesdone := len(tm.Edges[parentNode]) == len(state.targetResults)
|
allTargetNodesdone := len(tm.Edges[parentNode]) == len(state.targetResults)
|
||||||
tm.mu.Unlock()
|
tm.mu.Unlock()
|
||||||
|
|
||||||
@@ -202,15 +207,17 @@ func (tm *TaskManager) aggregateResults(parentNode string, taskID string) {
|
|||||||
state := tm.TaskStates[taskID][parentNode]
|
state := tm.TaskStates[taskID][parentNode]
|
||||||
if len(state.targetResults) > 1 {
|
if len(state.targetResults) > 1 {
|
||||||
aggregatedData := make([]any, len(state.targetResults))
|
aggregatedData := make([]any, len(state.targetResults))
|
||||||
for i, result := range state.targetResults {
|
i := 0
|
||||||
|
for _, result := range state.targetResults {
|
||||||
var data map[string]any
|
var data map[string]any
|
||||||
json.Unmarshal(result.Data, &data)
|
json.Unmarshal(result.Data, &data)
|
||||||
aggregatedData[i] = data
|
aggregatedData[i] = data
|
||||||
|
i++
|
||||||
}
|
}
|
||||||
aggregatedPayload, _ := json.Marshal(aggregatedData)
|
aggregatedPayload, _ := json.Marshal(aggregatedData)
|
||||||
state.Result = Result{Data: aggregatedPayload, Status: StatusCompleted}
|
state.Result = Result{Data: aggregatedPayload, Status: StatusCompleted}
|
||||||
} else if len(state.targetResults) == 1 {
|
} else if len(state.targetResults) == 1 {
|
||||||
state.Result = state.targetResults[0]
|
state.Result = maps.Values(state.targetResults)[0]
|
||||||
}
|
}
|
||||||
tm.processFinalResult(taskID, state)
|
tm.processFinalResult(taskID, state)
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user