mirror of
https://github.com/oarkflow/mq.git
synced 2025-11-03 12:00:50 +08:00
update: dependencies
This commit is contained in:
128
dag/dag.go
128
dag/dag.go
@@ -242,8 +242,18 @@ func (tm *DAG) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
|
|||||||
if !ok {
|
if !ok {
|
||||||
manager = NewTaskManager(tm, task.ID, resultCh, tm.iteratorNodes.Clone())
|
manager = NewTaskManager(tm, task.ID, resultCh, tm.iteratorNodes.Clone())
|
||||||
tm.taskManager.Set(task.ID, manager)
|
tm.taskManager.Set(task.ID, manager)
|
||||||
|
tm.Logger().Info("Processing task",
|
||||||
|
logger.Field{Key: "taskID", Value: task.ID},
|
||||||
|
logger.Field{Key: "phase", Value: "start"},
|
||||||
|
logger.Field{Key: "timestamp", Value: time.Now()},
|
||||||
|
)
|
||||||
} else {
|
} else {
|
||||||
manager.resultCh = resultCh
|
manager.resultCh = resultCh
|
||||||
|
tm.Logger().Info("Resuming task",
|
||||||
|
logger.Field{Key: "taskID", Value: task.ID},
|
||||||
|
logger.Field{Key: "phase", Value: "resume"},
|
||||||
|
logger.Field{Key: "timestamp", Value: time.Now()},
|
||||||
|
)
|
||||||
}
|
}
|
||||||
currentKey := tm.getCurrentNode(manager)
|
currentKey := tm.getCurrentNode(manager)
|
||||||
currentNode := strings.Split(currentKey, Delimiter)[0]
|
currentNode := strings.Split(currentKey, Delimiter)[0]
|
||||||
@@ -296,6 +306,124 @@ func (tm *DAG) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (tm *DAG) ProcessTaskNew(ctx context.Context, task *mq.Task) mq.Result {
|
||||||
|
ctx = context.WithValue(ctx, "task_id", task.ID)
|
||||||
|
userContext := form.UserContext(ctx)
|
||||||
|
next := userContext.Get("next")
|
||||||
|
manager, ok := tm.taskManager.Get(task.ID)
|
||||||
|
resultCh := make(chan mq.Result, 1)
|
||||||
|
if !ok {
|
||||||
|
manager = NewTaskManager(tm, task.ID, resultCh, tm.iteratorNodes.Clone())
|
||||||
|
tm.taskManager.Set(task.ID, manager)
|
||||||
|
tm.Logger().Info("Processing task",
|
||||||
|
logger.Field{Key: "taskID", Value: task.ID},
|
||||||
|
logger.Field{Key: "phase", Value: "start"},
|
||||||
|
logger.Field{Key: "timestamp", Value: time.Now()},
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
manager.resultCh = resultCh
|
||||||
|
tm.Logger().Info("Resuming task",
|
||||||
|
logger.Field{Key: "taskID", Value: task.ID},
|
||||||
|
logger.Field{Key: "phase", Value: "resume"},
|
||||||
|
logger.Field{Key: "timestamp", Value: time.Now()},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
currentKey := tm.getCurrentNode(manager)
|
||||||
|
currentNode := strings.Split(currentKey, Delimiter)[0]
|
||||||
|
node, exists := tm.nodes.Get(currentNode)
|
||||||
|
method, _ := ctx.Value("method").(string)
|
||||||
|
if method == "GET" && exists && node.NodeType == Page {
|
||||||
|
ctx = context.WithValue(ctx, "initial_node", currentNode)
|
||||||
|
if manager.result != nil {
|
||||||
|
task.Payload = manager.result.Payload
|
||||||
|
tm.Logger().Debug("Merged previous result payload into task",
|
||||||
|
logger.Field{Key: "taskID", Value: task.ID})
|
||||||
|
}
|
||||||
|
} else if next == "true" {
|
||||||
|
nodes, err := tm.GetNextNodes(currentNode)
|
||||||
|
if err != nil {
|
||||||
|
tm.Logger().Error("Error retrieving next nodes",
|
||||||
|
logger.Field{Key: "error", Value: err})
|
||||||
|
return mq.Result{Error: err, Ctx: ctx}
|
||||||
|
}
|
||||||
|
if len(nodes) > 0 {
|
||||||
|
ctx = context.WithValue(ctx, "initial_node", nodes[0].ID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if currentNodeResult, hasResult := manager.currentNodeResult.Get(currentKey); hasResult {
|
||||||
|
var taskPayload, resultPayload map[string]any
|
||||||
|
if err := json.Unmarshal(task.Payload, &taskPayload); err == nil {
|
||||||
|
if err = json.Unmarshal(currentNodeResult.Payload, &resultPayload); err == nil {
|
||||||
|
for key, val := range resultPayload {
|
||||||
|
taskPayload[key] = val
|
||||||
|
}
|
||||||
|
if newPayload, err := json.Marshal(taskPayload); err != nil {
|
||||||
|
tm.Logger().Error("Error marshalling merged payload",
|
||||||
|
logger.Field{Key: "error", Value: err})
|
||||||
|
} else {
|
||||||
|
task.Payload = newPayload
|
||||||
|
tm.Logger().Debug("Merged previous node result into task payload",
|
||||||
|
logger.Field{Key: "taskID", Value: task.ID})
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
tm.Logger().Error("Error unmarshalling current node result payload",
|
||||||
|
logger.Field{Key: "error", Value: err})
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
tm.Logger().Error("Error unmarshalling task payload",
|
||||||
|
logger.Field{Key: "error", Value: err})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse the initial node from context.
|
||||||
|
firstNode, err := tm.parseInitialNode(ctx)
|
||||||
|
if err != nil {
|
||||||
|
tm.Logger().Error("Error parsing initial node", logger.Field{Key: "error", Value: err})
|
||||||
|
return mq.Result{Error: err, Ctx: ctx}
|
||||||
|
}
|
||||||
|
node, ok = tm.nodes.Get(firstNode)
|
||||||
|
task.Topic = firstNode
|
||||||
|
ctx = context.WithValue(ctx, ContextIndex, "0")
|
||||||
|
|
||||||
|
// Dispatch the task to the TaskManager.
|
||||||
|
tm.Logger().Info("Dispatching task to TaskManager",
|
||||||
|
logger.Field{Key: "firstNode", Value: firstNode},
|
||||||
|
logger.Field{Key: "taskID", Value: task.ID})
|
||||||
|
manager.ProcessTask(ctx, firstNode, task.Payload)
|
||||||
|
|
||||||
|
// Wait for task result. If there's an HTML page node, the task will pause.
|
||||||
|
var result mq.Result
|
||||||
|
if tm.hasPageNode {
|
||||||
|
if !result.Last {
|
||||||
|
tm.Logger().Info("Page node detected; pausing task until user processes HTML",
|
||||||
|
logger.Field{Key: "taskID", Value: task.ID})
|
||||||
|
}
|
||||||
|
result = <-resultCh
|
||||||
|
} else {
|
||||||
|
select {
|
||||||
|
case result = <-resultCh:
|
||||||
|
tm.Logger().Info("Received task result",
|
||||||
|
logger.Field{Key: "taskID", Value: task.ID})
|
||||||
|
case <-time.After(30 * time.Second):
|
||||||
|
result = mq.Result{
|
||||||
|
Error: fmt.Errorf("timeout waiting for task result"),
|
||||||
|
Ctx: ctx,
|
||||||
|
}
|
||||||
|
tm.Logger().Error("Task result timeout",
|
||||||
|
logger.Field{Key: "taskID", Value: task.ID})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if result.Last {
|
||||||
|
tm.Logger().Info("Task completed",
|
||||||
|
logger.Field{Key: "taskID", Value: task.ID},
|
||||||
|
logger.Field{Key: "lastExecuted", Value: time.Now()},
|
||||||
|
logger.Field{Key: "success", Value: result.Error == nil},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
func (tm *DAG) Process(ctx context.Context, payload []byte) mq.Result {
|
func (tm *DAG) Process(ctx context.Context, payload []byte) mq.Result {
|
||||||
var taskID string
|
var taskID string
|
||||||
userCtx := form.UserContext(ctx)
|
userCtx := form.UserContext(ctx)
|
||||||
|
|||||||
@@ -159,6 +159,12 @@ func (tm *TaskManager) processNode(exec *task) {
|
|||||||
tm.currentNodeResult.Clear()
|
tm.currentNodeResult.Clear()
|
||||||
tm.currentNodePayload.Set(exec.nodeID, exec.payload)
|
tm.currentNodePayload.Set(exec.nodeID, exec.payload)
|
||||||
result := node.processor.ProcessTask(exec.ctx, mq.NewTask(exec.taskID, exec.payload, exec.nodeID))
|
result := node.processor.ProcessTask(exec.ctx, mq.NewTask(exec.taskID, exec.payload, exec.nodeID))
|
||||||
|
isLast, err := tm.dag.IsLastNode(pureNodeID)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Error checking if node %s is last: %v\n", pureNodeID, err)
|
||||||
|
} else if isLast {
|
||||||
|
result.Last = true
|
||||||
|
}
|
||||||
tm.currentNodeResult.Set(exec.nodeID, result)
|
tm.currentNodeResult.Set(exec.nodeID, result)
|
||||||
state.Result = result
|
state.Result = result
|
||||||
result.Topic = node.ID
|
result.Topic = node.ID
|
||||||
@@ -169,13 +175,18 @@ func (tm *TaskManager) processNode(exec *task) {
|
|||||||
tm.processFinalResult(state)
|
tm.processFinalResult(state)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if isLast {
|
||||||
|
tm.processFinalResult(state)
|
||||||
|
}
|
||||||
if node.NodeType == Page {
|
if node.NodeType == Page {
|
||||||
tm.updateTimestamps(&result)
|
tm.updateTimestamps(&result)
|
||||||
tm.result = &result
|
tm.result = &result
|
||||||
tm.resultCh <- result
|
tm.resultCh <- result
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
tm.handleNext(exec.ctx, node, state, result)
|
if !isLast {
|
||||||
|
tm.handleNext(exec.ctx, node, state, result)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tm *TaskManager) updateTimestamps(rs *mq.Result) {
|
func (tm *TaskManager) updateTimestamps(rs *mq.Result) {
|
||||||
|
|||||||
@@ -110,8 +110,40 @@ func (p *Result) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
|
|||||||
return mq.Result{Payload: task.Payload, Ctx: ctx}
|
return mq.Result{Payload: task.Payload, Ctx: ctx}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RemoveHTMLContent recursively removes the "html_content" field from the given JSON.
|
||||||
|
func RemoveHTMLContent(data json.RawMessage, field string) (json.RawMessage, error) {
|
||||||
|
var result interface{}
|
||||||
|
if err := json.Unmarshal(data, &result); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
removeField(result, field)
|
||||||
|
return json.Marshal(result)
|
||||||
|
}
|
||||||
|
|
||||||
|
// removeField recursively traverses the structure and removes "html_content" field.
|
||||||
|
func removeField(v interface{}, field string) {
|
||||||
|
switch v := v.(type) {
|
||||||
|
case map[string]interface{}:
|
||||||
|
// Check if the field is in the map and remove it.
|
||||||
|
delete(v, field)
|
||||||
|
// Recursively remove the field from nested objects.
|
||||||
|
for _, value := range v {
|
||||||
|
removeField(value, field)
|
||||||
|
}
|
||||||
|
case []interface{}:
|
||||||
|
// If it's an array, recursively process each item.
|
||||||
|
for _, item := range v {
|
||||||
|
removeField(item, field)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func notify(taskID string, result mq.Result) {
|
func notify(taskID string, result mq.Result) {
|
||||||
fmt.Printf("Final result for task %s: %s\n", taskID, string(result.Payload))
|
filteredData, err := RemoveHTMLContent(result.Payload, "html_content")
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
fmt.Printf("Final result for task %s: %s\n", taskID, string(filteredData))
|
||||||
}
|
}
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
|
|||||||
@@ -33,6 +33,7 @@ type Result struct {
|
|||||||
ConditionStatus string `json:"condition_status"`
|
ConditionStatus string `json:"condition_status"`
|
||||||
Ctx context.Context `json:"-"`
|
Ctx context.Context `json:"-"`
|
||||||
Payload json.RawMessage `json:"payload"`
|
Payload json.RawMessage `json:"payload"`
|
||||||
|
Last bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r Result) MarshalJSON() ([]byte, error) {
|
func (r Result) MarshalJSON() ([]byte, error) {
|
||||||
|
|||||||
Reference in New Issue
Block a user