mirror of
https://github.com/oarkflow/mq.git
synced 2025-10-07 17:00:57 +08:00
update: dependencies
This commit is contained in:
@@ -211,7 +211,6 @@ func (tm *DAG) AddNode(nodeType NodeType, name, nodeID string, handler mq.Proces
|
|||||||
n.isReady = true
|
n.isReady = true
|
||||||
}
|
}
|
||||||
tm.nodes.Set(nodeID, n)
|
tm.nodes.Set(nodeID, n)
|
||||||
tm.nodes.Set(nodeID, &Node{ID: nodeID, processor: handler, NodeType: nodeType})
|
|
||||||
if len(startNode) > 0 && startNode[0] {
|
if len(startNode) > 0 && startNode[0] {
|
||||||
tm.startNode = nodeID
|
tm.startNode = nodeID
|
||||||
}
|
}
|
||||||
@@ -245,7 +244,7 @@ func (tm *DAG) IsReady() bool {
|
|||||||
return isReady
|
return isReady
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tm *DAG) AddEdge(edgeType EdgeType, from string, targets ...string) *DAG {
|
func (tm *DAG) AddEdge(edgeType EdgeType, label, from string, targets ...string) *DAG {
|
||||||
if tm.Error != nil {
|
if tm.Error != nil {
|
||||||
return tm
|
return tm
|
||||||
}
|
}
|
||||||
@@ -259,7 +258,7 @@ func (tm *DAG) AddEdge(edgeType EdgeType, from string, targets ...string) *DAG {
|
|||||||
}
|
}
|
||||||
for _, target := range targets {
|
for _, target := range targets {
|
||||||
if targetNode, ok := tm.nodes.Get(target); ok {
|
if targetNode, ok := tm.nodes.Get(target); ok {
|
||||||
edge := Edge{From: node, To: targetNode, Type: edgeType}
|
edge := Edge{From: node, To: targetNode, Type: edgeType, Label: label}
|
||||||
node.Edges = append(node.Edges, edge)
|
node.Edges = append(node.Edges, edge)
|
||||||
if edgeType != Iterator {
|
if edgeType != Iterator {
|
||||||
if edges, ok := tm.iteratorNodes.Get(node.ID); ok {
|
if edges, ok := tm.iteratorNodes.Get(node.ID); ok {
|
||||||
|
@@ -229,7 +229,9 @@ func (tm *DAG) ExportDOT() string {
|
|||||||
func (tm *DAG) TopologicalSort() (stack []string) {
|
func (tm *DAG) TopologicalSort() (stack []string) {
|
||||||
visited := make(map[string]bool)
|
visited := make(map[string]bool)
|
||||||
tm.nodes.ForEach(func(_ string, node *Node) bool {
|
tm.nodes.ForEach(func(_ string, node *Node) bool {
|
||||||
|
if !visited[node.ID] {
|
||||||
tm.topologicalSortUtil(node.ID, visited, &stack)
|
tm.topologicalSortUtil(node.ID, visited, &stack)
|
||||||
|
}
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
for i, j := 0, len(stack)-1; i < j; i, j = i+1, j-1 {
|
for i, j := 0, len(stack)-1; i < j; i, j = i+1, j-1 {
|
||||||
@@ -240,7 +242,10 @@ func (tm *DAG) TopologicalSort() (stack []string) {
|
|||||||
|
|
||||||
func (tm *DAG) topologicalSortUtil(v string, visited map[string]bool, stack *[]string) {
|
func (tm *DAG) topologicalSortUtil(v string, visited map[string]bool, stack *[]string) {
|
||||||
visited[v] = true
|
visited[v] = true
|
||||||
node, _ := tm.nodes.Get(v)
|
node, ok := tm.nodes.Get(v)
|
||||||
|
if !ok {
|
||||||
|
fmt.Println("Not found", v)
|
||||||
|
}
|
||||||
for _, edge := range node.Edges {
|
for _, edge := range node.Edges {
|
||||||
if !visited[edge.To.ID] {
|
if !visited[edge.To.ID] {
|
||||||
tm.topologicalSortUtil(edge.To.ID, visited, stack)
|
tm.topologicalSortUtil(edge.To.ID, visited, stack)
|
||||||
|
@@ -121,10 +121,10 @@ func main() {
|
|||||||
flow.AddNode(v2.Function, "NodeB", "NodeB", &NodeB{})
|
flow.AddNode(v2.Function, "NodeB", "NodeB", &NodeB{})
|
||||||
flow.AddNode(v2.Function, "NodeC", "NodeC", &NodeC{})
|
flow.AddNode(v2.Function, "NodeC", "NodeC", &NodeC{})
|
||||||
flow.AddNode(v2.Page, "Result", "Result", &Result{})
|
flow.AddNode(v2.Page, "Result", "Result", &Result{})
|
||||||
flow.AddEdge(v2.Simple, "Form", "NodeA")
|
flow.AddEdge(v2.Simple, "Form", "Form", "NodeA")
|
||||||
flow.AddEdge(v2.Simple, "NodeA", "NodeB")
|
flow.AddEdge(v2.Simple, "NodeA", "NodeA", "NodeB")
|
||||||
flow.AddEdge(v2.Simple, "NodeB", "NodeC")
|
flow.AddEdge(v2.Simple, "NodeB", "NodeB", "NodeC")
|
||||||
flow.AddEdge(v2.Simple, "NodeC", "Result")
|
flow.AddEdge(v2.Simple, "NodeC", "NodeC", "Result")
|
||||||
if flow.Error != nil {
|
if flow.Error != nil {
|
||||||
panic(flow.Error)
|
panic(flow.Error)
|
||||||
}
|
}
|
||||||
|
@@ -18,10 +18,10 @@ func main() {
|
|||||||
flow.AddNode(v2.Function, "ValidateGender", "ValidateGender", &ValidateGender{})
|
flow.AddNode(v2.Function, "ValidateGender", "ValidateGender", &ValidateGender{})
|
||||||
flow.AddNode(v2.Function, "Final", "Final", &Final{})
|
flow.AddNode(v2.Function, "Final", "Final", &Final{})
|
||||||
|
|
||||||
flow.AddEdge(v2.Simple, "GetData", "Loop")
|
flow.AddEdge(v2.Simple, "GetData", "GetData", "Loop")
|
||||||
flow.AddEdge(v2.Iterator, "Loop", "ValidateAge")
|
flow.AddEdge(v2.Iterator, "Validate age for each item", "Loop", "ValidateAge")
|
||||||
flow.AddCondition("ValidateAge", map[string]string{"pass": "ValidateGender"})
|
flow.AddCondition("ValidateAge", map[string]string{"pass": "ValidateGender"})
|
||||||
flow.AddEdge(v2.Simple, "Loop", "Final")
|
flow.AddEdge(v2.Simple, "Mark as Done", "Loop", "Final")
|
||||||
|
|
||||||
// flow.Start(":8080")
|
// flow.Start(":8080")
|
||||||
data := []byte(`[{"age": "15", "gender": "female"}, {"age": "18", "gender": "male"}]`)
|
data := []byte(`[{"age": "15", "gender": "female"}, {"age": "18", "gender": "male"}]`)
|
||||||
@@ -29,7 +29,7 @@ func main() {
|
|||||||
panic(flow.Error)
|
panic(flow.Error)
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Println(flow.ClassifyEdges())
|
fmt.Println(flow.ExportDOT())
|
||||||
rs := flow.Process(context.Background(), data)
|
rs := flow.Process(context.Background(), data)
|
||||||
if rs.Error != nil {
|
if rs.Error != nil {
|
||||||
panic(rs.Error)
|
panic(rs.Error)
|
||||||
|
Reference in New Issue
Block a user