mirror of
https://github.com/oarkflow/mq.git
synced 2025-09-26 20:11:16 +08:00
Compare commits
6 Commits
a5523fe030
...
b82cd20eef
Author | SHA1 | Date | |
---|---|---|---|
![]() |
b82cd20eef | ||
![]() |
e4344bc96e | ||
![]() |
86185d5499 | ||
![]() |
297b5c0900 | ||
![]() |
55db1dc4b5 | ||
![]() |
b7ca2a8aeb |
40
dag/dag.go
40
dag/dag.go
@@ -1180,3 +1180,43 @@ func (tm *DAG) StopEnhanced(ctx context.Context) error {
|
||||
// Stop underlying components
|
||||
return tm.Stop(ctx)
|
||||
}
|
||||
|
||||
// GetPreviousPageNode returns the last page node that was executed before the current node
|
||||
func (tm *DAG) GetPreviousPageNode(nodeID string) (*Node, error) {
|
||||
currentNode := strings.Split(nodeID, Delimiter)[0]
|
||||
// Check if current node exists
|
||||
_, exists := tm.nodes.Get(currentNode)
|
||||
if !exists {
|
||||
fmt.Println(tm.nodes.Keys())
|
||||
return nil, fmt.Errorf("current node %s not found", currentNode)
|
||||
}
|
||||
|
||||
// Get topological order to determine execution sequence
|
||||
topologicalOrder, err := tm.GetTopologicalOrder()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get topological order: %w", err)
|
||||
}
|
||||
|
||||
// Find the index of the current node in topological order
|
||||
currentIndex := -1
|
||||
for i, nodeIDInOrder := range topologicalOrder {
|
||||
if nodeIDInOrder == currentNode {
|
||||
currentIndex = i
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if currentIndex == -1 {
|
||||
return nil, fmt.Errorf("current node %s not found in topological order", currentNode)
|
||||
}
|
||||
|
||||
// Iterate backwards from current node to find the last page node
|
||||
for i := currentIndex - 1; i >= 0; i-- {
|
||||
nodeIDInOrder := topologicalOrder[i]
|
||||
if node, ok := tm.nodes.Get(nodeIDInOrder); ok && node.NodeType == Page {
|
||||
return node, nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("no previous page node found")
|
||||
}
|
||||
|
@@ -311,7 +311,8 @@ func (tm *TaskManager) areDependenciesMet(nodeID string) bool {
|
||||
logger.Field{Key: "nodeID", Value: nodeID},
|
||||
logger.Field{Key: "dependency", Value: prevNode.ID},
|
||||
logger.Field{Key: "stateExists", Value: exists},
|
||||
logger.Field{Key: "stateStatus", Value: string(state.Status)})
|
||||
logger.Field{Key: "stateStatus", Value: string(state.Status)},
|
||||
logger.Field{Key: "taskID", Value: tm.taskID})
|
||||
return false
|
||||
}
|
||||
}
|
||||
@@ -706,6 +707,13 @@ func (tm *TaskManager) onNodeCompleted(nr nodeResult) {
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
// Handle ResetTo functionality
|
||||
if nr.result.ResetTo != "" {
|
||||
tm.handleResetTo(nr)
|
||||
return
|
||||
}
|
||||
|
||||
if nr.result.Error != nil || nr.status == mq.Failed {
|
||||
if state, exists := tm.taskStates.Get(nr.nodeID); exists {
|
||||
tm.processFinalResult(state)
|
||||
@@ -1086,3 +1094,393 @@ func (tm *TaskManager) updateTaskPosition(ctx context.Context, taskID, currentNo
|
||||
// Save the updated task
|
||||
return tm.storage.SaveTask(ctx, task)
|
||||
}
|
||||
|
||||
// handleResetTo handles the ResetTo functionality for resetting a task to a specific node
|
||||
func (tm *TaskManager) handleResetTo(nr nodeResult) {
|
||||
resetTo := nr.result.ResetTo
|
||||
nodeID := strings.Split(nr.nodeID, Delimiter)[0]
|
||||
|
||||
var targetNodeID string
|
||||
var err error
|
||||
|
||||
if resetTo == "back" {
|
||||
// Use GetPreviousPageNode to find the previous page node
|
||||
var prevNode *Node
|
||||
prevNode, err = tm.dag.GetPreviousPageNode(nodeID)
|
||||
if err != nil {
|
||||
tm.dag.Logger().Error("Failed to get previous page node",
|
||||
logger.Field{Key: "currentNodeID", Value: nodeID},
|
||||
logger.Field{Key: "error", Value: err.Error()})
|
||||
// Send error result
|
||||
tm.resultCh <- mq.Result{
|
||||
Error: fmt.Errorf("failed to reset to previous page node: %w", err),
|
||||
Ctx: nr.ctx,
|
||||
TaskID: nr.result.TaskID,
|
||||
Topic: nr.result.Topic,
|
||||
Status: mq.Failed,
|
||||
Payload: nr.result.Payload,
|
||||
}
|
||||
return
|
||||
}
|
||||
if prevNode == nil {
|
||||
tm.dag.Logger().Error("No previous page node found",
|
||||
logger.Field{Key: "currentNodeID", Value: nodeID})
|
||||
// Send error result
|
||||
tm.resultCh <- mq.Result{
|
||||
Error: fmt.Errorf("no previous page node found"),
|
||||
Ctx: nr.ctx,
|
||||
TaskID: nr.result.TaskID,
|
||||
Topic: nr.result.Topic,
|
||||
Status: mq.Failed,
|
||||
Payload: nr.result.Payload,
|
||||
}
|
||||
return
|
||||
}
|
||||
targetNodeID = prevNode.ID
|
||||
} else {
|
||||
// Use the specified node ID
|
||||
targetNodeID = resetTo
|
||||
// Validate that the target node exists
|
||||
if _, exists := tm.dag.nodes.Get(targetNodeID); !exists {
|
||||
tm.dag.Logger().Error("Reset target node does not exist",
|
||||
logger.Field{Key: "targetNodeID", Value: targetNodeID})
|
||||
// Send error result
|
||||
tm.resultCh <- mq.Result{
|
||||
Error: fmt.Errorf("reset target node %s does not exist", targetNodeID),
|
||||
Ctx: nr.ctx,
|
||||
TaskID: nr.result.TaskID,
|
||||
Topic: nr.result.Topic,
|
||||
Status: mq.Failed,
|
||||
Payload: nr.result.Payload,
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if tm.dag.debug {
|
||||
tm.dag.Logger().Info("Resetting task to node",
|
||||
logger.Field{Key: "taskID", Value: nr.result.TaskID},
|
||||
logger.Field{Key: "fromNode", Value: nodeID},
|
||||
logger.Field{Key: "toNode", Value: targetNodeID},
|
||||
logger.Field{Key: "resetTo", Value: resetTo})
|
||||
}
|
||||
|
||||
// Clear task states of all nodes between current node and target node
|
||||
// This ensures that when we reset, the workflow can proceed correctly
|
||||
tm.clearTaskStatesInPath(nodeID, targetNodeID)
|
||||
|
||||
// Also clear any deferred tasks for the target node itself
|
||||
tm.deferredTasks.ForEach(func(taskID string, tsk *task) bool {
|
||||
if strings.Split(tsk.nodeID, Delimiter)[0] == targetNodeID {
|
||||
tm.deferredTasks.Del(taskID)
|
||||
if tm.dag.debug {
|
||||
tm.dag.Logger().Debug("Cleared deferred task for target node",
|
||||
logger.Field{Key: "nodeID", Value: targetNodeID},
|
||||
logger.Field{Key: "taskID", Value: taskID})
|
||||
}
|
||||
}
|
||||
return true
|
||||
})
|
||||
|
||||
// Handle dependencies of the target node - if they exist and are not completed,
|
||||
// we need to mark them as completed to allow the workflow to proceed
|
||||
tm.handleTargetNodeDependencies(targetNodeID, nr)
|
||||
|
||||
// Get previously received data for the target node
|
||||
var previousPayload json.RawMessage
|
||||
if prevResult, hasResult := tm.currentNodeResult.Get(targetNodeID); hasResult {
|
||||
previousPayload = prevResult.Payload
|
||||
if tm.dag.debug {
|
||||
tm.dag.Logger().Info("Using previous payload for reset",
|
||||
logger.Field{Key: "targetNodeID", Value: targetNodeID},
|
||||
logger.Field{Key: "payloadSize", Value: len(previousPayload)})
|
||||
}
|
||||
} else {
|
||||
// If no previous data, use the current result's payload
|
||||
previousPayload = nr.result.Payload
|
||||
if tm.dag.debug {
|
||||
tm.dag.Logger().Info("No previous payload found, using current payload",
|
||||
logger.Field{Key: "targetNodeID", Value: targetNodeID})
|
||||
}
|
||||
}
|
||||
|
||||
// Reset task state for the target node
|
||||
if state, exists := tm.taskStates.Get(targetNodeID); exists {
|
||||
state.Status = mq.Completed // Mark as completed to satisfy dependencies
|
||||
state.UpdatedAt = time.Now()
|
||||
state.Result = mq.Result{
|
||||
Status: mq.Completed,
|
||||
Ctx: nr.ctx,
|
||||
}
|
||||
} else {
|
||||
// Create new state if it doesn't exist and mark as completed
|
||||
newState := newTaskState(targetNodeID)
|
||||
newState.Status = mq.Completed
|
||||
newState.Result = mq.Result{
|
||||
Status: mq.Completed,
|
||||
Ctx: nr.ctx,
|
||||
}
|
||||
tm.taskStates.Set(targetNodeID, newState)
|
||||
}
|
||||
|
||||
// Update current node result with the reset result (clear ResetTo to avoid loops)
|
||||
resetResult := mq.Result{
|
||||
TaskID: nr.result.TaskID,
|
||||
Topic: targetNodeID,
|
||||
Status: mq.Completed, // Mark as completed
|
||||
Payload: previousPayload,
|
||||
Ctx: nr.ctx,
|
||||
// ResetTo is intentionally not set to avoid infinite loops
|
||||
}
|
||||
tm.currentNodeResult.Set(targetNodeID, resetResult)
|
||||
|
||||
// Re-enqueue the task for the target node
|
||||
tm.enqueueTask(nr.ctx, targetNodeID, nr.result.TaskID, previousPayload)
|
||||
|
||||
// Log the reset activity
|
||||
tm.logActivity(nr.ctx, nr.result.TaskID, targetNodeID, "task_reset",
|
||||
fmt.Sprintf("Task reset from %s to %s", nodeID, targetNodeID), nil)
|
||||
}
|
||||
|
||||
// clearTaskStatesInPath clears all task states in the path from current node to target node
|
||||
// This is necessary when resetting to ensure the workflow can proceed without dependency issues
|
||||
func (tm *TaskManager) clearTaskStatesInPath(currentNodeID, targetNodeID string) {
|
||||
// Get all nodes in the path from current to target
|
||||
pathNodes := tm.getNodesInPath(currentNodeID, targetNodeID)
|
||||
|
||||
if tm.dag.debug {
|
||||
tm.dag.Logger().Info("Clearing task states in path",
|
||||
logger.Field{Key: "fromNode", Value: currentNodeID},
|
||||
logger.Field{Key: "toNode", Value: targetNodeID},
|
||||
logger.Field{Key: "pathNodeCount", Value: len(pathNodes)})
|
||||
}
|
||||
|
||||
// Also clear the current node itself (ValidateInput in the example)
|
||||
if state, exists := tm.taskStates.Get(currentNodeID); exists {
|
||||
state.Status = mq.Pending
|
||||
state.UpdatedAt = time.Now()
|
||||
state.Result = mq.Result{} // Clear previous result
|
||||
if tm.dag.debug {
|
||||
tm.dag.Logger().Debug("Cleared task state for current node",
|
||||
logger.Field{Key: "nodeID", Value: currentNodeID})
|
||||
}
|
||||
}
|
||||
// Also clear any cached results for the current node
|
||||
tm.currentNodeResult.Del(currentNodeID)
|
||||
// Clear any deferred tasks for the current node
|
||||
tm.deferredTasks.ForEach(func(taskID string, tsk *task) bool {
|
||||
if strings.Split(tsk.nodeID, Delimiter)[0] == currentNodeID {
|
||||
tm.deferredTasks.Del(taskID)
|
||||
if tm.dag.debug {
|
||||
tm.dag.Logger().Debug("Cleared deferred task for current node",
|
||||
logger.Field{Key: "nodeID", Value: currentNodeID},
|
||||
logger.Field{Key: "taskID", Value: taskID})
|
||||
}
|
||||
}
|
||||
return true
|
||||
})
|
||||
|
||||
// Clear task states for all nodes in the path
|
||||
for _, pathNodeID := range pathNodes {
|
||||
if state, exists := tm.taskStates.Get(pathNodeID); exists {
|
||||
state.Status = mq.Pending
|
||||
state.UpdatedAt = time.Now()
|
||||
state.Result = mq.Result{} // Clear previous result
|
||||
if tm.dag.debug {
|
||||
tm.dag.Logger().Debug("Cleared task state for path node",
|
||||
logger.Field{Key: "nodeID", Value: pathNodeID})
|
||||
}
|
||||
}
|
||||
// Also clear any cached results for this node
|
||||
tm.currentNodeResult.Del(pathNodeID)
|
||||
// Clear any deferred tasks for this node
|
||||
tm.deferredTasks.ForEach(func(taskID string, tsk *task) bool {
|
||||
if strings.Split(tsk.nodeID, Delimiter)[0] == pathNodeID {
|
||||
tm.deferredTasks.Del(taskID)
|
||||
if tm.dag.debug {
|
||||
tm.dag.Logger().Debug("Cleared deferred task for path node",
|
||||
logger.Field{Key: "nodeID", Value: pathNodeID},
|
||||
logger.Field{Key: "taskID", Value: taskID})
|
||||
}
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// getNodesInPath returns all nodes in the path from start node to end node
|
||||
func (tm *TaskManager) getNodesInPath(startNodeID, endNodeID string) []string {
|
||||
visited := make(map[string]bool)
|
||||
var result []string
|
||||
|
||||
// Use BFS to find the path from start to end
|
||||
queue := []string{startNodeID}
|
||||
visited[startNodeID] = true
|
||||
parent := make(map[string]string)
|
||||
|
||||
found := false
|
||||
for len(queue) > 0 && !found {
|
||||
currentNodeID := queue[0]
|
||||
queue = queue[1:]
|
||||
|
||||
// Get all nodes that this node points to
|
||||
if node, exists := tm.dag.nodes.Get(currentNodeID); exists {
|
||||
for _, edge := range node.Edges {
|
||||
if edge.Type == Simple || edge.Type == Iterator {
|
||||
targetNodeID := edge.To.ID
|
||||
if !visited[targetNodeID] {
|
||||
visited[targetNodeID] = true
|
||||
parent[targetNodeID] = currentNodeID
|
||||
queue = append(queue, targetNodeID)
|
||||
|
||||
if targetNodeID == endNodeID {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If we found the end node, reconstruct the path
|
||||
if found {
|
||||
current := endNodeID
|
||||
for current != startNodeID {
|
||||
result = append([]string{current}, result...)
|
||||
if parentNode, exists := parent[current]; exists {
|
||||
current = parentNode
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
result = append([]string{startNodeID}, result...)
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
// getAllDownstreamNodes returns all nodes that come after the given node in the workflow
|
||||
func (tm *TaskManager) getAllDownstreamNodes(nodeID string) []string {
|
||||
visited := make(map[string]bool)
|
||||
var result []string
|
||||
|
||||
// Use BFS to find all downstream nodes
|
||||
queue := []string{nodeID}
|
||||
visited[nodeID] = true
|
||||
|
||||
for len(queue) > 0 {
|
||||
currentNodeID := queue[0]
|
||||
queue = queue[1:]
|
||||
|
||||
// Get all nodes that this node points to
|
||||
if node, exists := tm.dag.nodes.Get(currentNodeID); exists {
|
||||
for _, edge := range node.Edges {
|
||||
if edge.Type == Simple || edge.Type == Iterator {
|
||||
targetNodeID := edge.To.ID
|
||||
if !visited[targetNodeID] {
|
||||
visited[targetNodeID] = true
|
||||
result = append(result, targetNodeID)
|
||||
queue = append(queue, targetNodeID)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
// handleTargetNodeDependencies handles the dependencies of the target node during reset
|
||||
// If the target node has unmet dependencies, we mark them as completed to allow the workflow to proceed
|
||||
func (tm *TaskManager) handleTargetNodeDependencies(targetNodeID string, nr nodeResult) {
|
||||
// Get the dependencies of the target node
|
||||
prevNodes, err := tm.dag.GetPreviousNodes(targetNodeID)
|
||||
if err != nil {
|
||||
tm.dag.Logger().Error("Error getting previous nodes for target",
|
||||
logger.Field{Key: "targetNodeID", Value: targetNodeID},
|
||||
logger.Field{Key: "error", Value: err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
if tm.dag.debug {
|
||||
tm.dag.Logger().Info("Checking dependencies for target node",
|
||||
logger.Field{Key: "targetNodeID", Value: targetNodeID},
|
||||
logger.Field{Key: "dependencyCount", Value: len(prevNodes)})
|
||||
}
|
||||
|
||||
// Check each dependency and ensure it's marked as completed for reset
|
||||
for _, prevNode := range prevNodes {
|
||||
// Check both the pure node ID and the indexed node ID for state
|
||||
state, exists := tm.taskStates.Get(prevNode.ID)
|
||||
if !exists {
|
||||
// Also check if there's a state with an index suffix
|
||||
tm.taskStates.ForEach(func(key string, s *TaskState) bool {
|
||||
if strings.Split(key, Delimiter)[0] == prevNode.ID {
|
||||
state = s
|
||||
exists = true
|
||||
return false // Stop iteration
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
if !exists {
|
||||
// Create new state and mark as completed for reset
|
||||
newState := newTaskState(prevNode.ID)
|
||||
newState.Status = mq.Completed
|
||||
newState.UpdatedAt = time.Now()
|
||||
newState.Result = mq.Result{
|
||||
Status: mq.Completed,
|
||||
Ctx: nr.ctx,
|
||||
}
|
||||
tm.taskStates.Set(prevNode.ID, newState)
|
||||
if tm.dag.debug {
|
||||
tm.dag.Logger().Debug("Created completed state for dependency node during reset",
|
||||
logger.Field{Key: "dependencyNodeID", Value: prevNode.ID})
|
||||
}
|
||||
} else if state.Status != mq.Completed {
|
||||
// Mark existing state as completed for reset
|
||||
state.Status = mq.Completed
|
||||
state.UpdatedAt = time.Now()
|
||||
if state.Result.Status == "" {
|
||||
state.Result = mq.Result{
|
||||
Status: mq.Completed,
|
||||
Ctx: nr.ctx,
|
||||
}
|
||||
}
|
||||
if tm.dag.debug {
|
||||
tm.dag.Logger().Debug("Marked dependency node as completed during reset",
|
||||
logger.Field{Key: "dependencyNodeID", Value: prevNode.ID},
|
||||
logger.Field{Key: "previousStatus", Value: string(state.Status)})
|
||||
}
|
||||
} else {
|
||||
if tm.dag.debug {
|
||||
tm.dag.Logger().Debug("Dependency already satisfied",
|
||||
logger.Field{Key: "dependencyNodeID", Value: prevNode.ID},
|
||||
logger.Field{Key: "status", Value: string(state.Status)})
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure cached result exists for this dependency
|
||||
if _, hasResult := tm.currentNodeResult.Get(prevNode.ID); !hasResult {
|
||||
tm.currentNodeResult.Set(prevNode.ID, mq.Result{
|
||||
Status: mq.Completed,
|
||||
Ctx: nr.ctx,
|
||||
})
|
||||
}
|
||||
|
||||
// Clear any deferred tasks for this dependency since it's now satisfied
|
||||
tm.deferredTasks.ForEach(func(taskID string, tsk *task) bool {
|
||||
if strings.Split(tsk.nodeID, Delimiter)[0] == prevNode.ID {
|
||||
tm.deferredTasks.Del(taskID)
|
||||
if tm.dag.debug {
|
||||
tm.dag.Logger().Debug("Cleared deferred task for satisfied dependency",
|
||||
logger.Field{Key: "dependencyNodeID", Value: prevNode.ID},
|
||||
logger.Field{Key: "taskID", Value: taskID})
|
||||
}
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@@ -5,10 +5,13 @@ import (
|
||||
"fmt"
|
||||
"regexp"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/oarkflow/json"
|
||||
|
||||
"github.com/oarkflow/jet"
|
||||
"github.com/oarkflow/mq"
|
||||
"github.com/oarkflow/mq/consts"
|
||||
"github.com/oarkflow/mq/dag"
|
||||
)
|
||||
|
||||
@@ -28,75 +31,52 @@ func loginSubDAG() *dag.DAG {
|
||||
return login
|
||||
}
|
||||
|
||||
// phoneProcessingSubDAG creates a sub-DAG for processing phone numbers
|
||||
func phoneProcessingSubDAG() *dag.DAG {
|
||||
phone := dag.NewDAG("Phone Processing Sub DAG", "phone-processing-sub-dag", func(taskID string, result mq.Result) {
|
||||
fmt.Printf("Phone Processing Sub DAG Final result for task %s: %s\n", taskID, string(result.Payload))
|
||||
}, mq.WithSyncMode(true))
|
||||
|
||||
phone.
|
||||
AddNode(dag.Function, "Parse Phone Numbers", "parse-phones", &ParsePhoneNumbers{}).
|
||||
AddNode(dag.Function, "Phone Loop", "phone-loop", &PhoneLoop{}).
|
||||
AddNode(dag.Function, "Validate Phone", "validate-phone", &ValidatePhone{}).
|
||||
AddNode(dag.Function, "Send Welcome SMS", "send-welcome", &SendWelcomeSMS{}).
|
||||
AddNode(dag.Function, "Collect Valid Phones", "collect-valid", &CollectValidPhones{}).
|
||||
AddNode(dag.Function, "Collect Invalid Phones", "collect-invalid", &CollectInvalidPhones{}).
|
||||
AddEdge(dag.Simple, "Parse to Loop", "parse-phones", "phone-loop").
|
||||
AddEdge(dag.Iterator, "Loop over phones", "phone-loop", "validate-phone").
|
||||
AddCondition("validate-phone", map[string]string{"valid": "send-welcome", "invalid": "collect-invalid"}).
|
||||
AddEdge(dag.Simple, "Welcome to Collect", "send-welcome", "collect-valid").
|
||||
AddEdge(dag.Simple, "Invalid to Collect", "collect-invalid", "collect-valid").
|
||||
AddEdge(dag.Simple, "Loop to Collect", "phone-loop", "collect-valid")
|
||||
|
||||
return phone
|
||||
}
|
||||
|
||||
func main() {
|
||||
flow := dag.NewDAG("Complex Phone Processing DAG with Pages", "complex-phone-dag", func(taskID string, result mq.Result) {
|
||||
fmt.Printf("Complex DAG Final result for task %s: %s\n", taskID, string(result.Payload))
|
||||
})
|
||||
flow.ConfigureMemoryStorage()
|
||||
|
||||
// Main nodes
|
||||
flow.AddNode(dag.Function, "Initialize", "init", &Initialize{}, true)
|
||||
flow.AddDAGNode(dag.Function, "Login Process", "login", loginSubDAG())
|
||||
flow.AddNode(dag.Function, "Upload Phone Data", "upload-page", &UploadPhoneDataPage{})
|
||||
flow.AddDAGNode(dag.Function, "Process Phones", "process-phones", phoneProcessingSubDAG())
|
||||
// Main nodes - Login process as individual nodes (not sub-DAG) for proper page serving
|
||||
flow.AddNode(dag.Page, "Initialize", "init", &Initialize{}, true)
|
||||
flow.AddNode(dag.Page, "Login Page", "login-page", &LoginPage{})
|
||||
flow.AddNode(dag.Function, "Verify Credentials", "verify-credentials", &VerifyCredentials{})
|
||||
flow.AddNode(dag.Function, "Generate Token", "generate-token", &GenerateToken{})
|
||||
flow.AddNode(dag.Page, "Upload Phone Data", "upload-page", &UploadPhoneDataPage{})
|
||||
flow.AddNode(dag.Function, "Parse Phone Numbers", "parse-phones", &ParsePhoneNumbers{})
|
||||
flow.AddNode(dag.Function, "Phone Loop", "phone-loop", &PhoneLoop{})
|
||||
flow.AddNode(dag.Function, "Validate Phone", "validate-phone", &ValidatePhone{})
|
||||
flow.AddNode(dag.Function, "Send Welcome SMS", "send-welcome", &SendWelcomeSMS{})
|
||||
flow.AddNode(dag.Function, "Collect Valid Phones", "collect-valid", &CollectValidPhones{})
|
||||
flow.AddNode(dag.Function, "Collect Invalid Phones", "collect-invalid", &CollectInvalidPhones{})
|
||||
flow.AddNode(dag.Function, "Generate Report", "generate-report", &GenerateReport{})
|
||||
flow.AddNode(dag.Function, "Send Summary Email", "send-summary", &SendSummaryEmail{})
|
||||
flow.AddNode(dag.Function, "Final Cleanup", "cleanup", &FinalCleanup{})
|
||||
|
||||
// Edges
|
||||
flow.AddEdge(dag.Simple, "Init to Login", "init", "login")
|
||||
flow.AddEdge(dag.Simple, "Login to Upload", "login", "upload-page")
|
||||
flow.AddEdge(dag.Simple, "Upload to Process", "upload-page", "process-phones")
|
||||
flow.AddEdge(dag.Simple, "Process to Report", "process-phones", "generate-report")
|
||||
// Edges - Connect login flow individually
|
||||
flow.AddEdge(dag.Simple, "Init to Login", "init", "login-page")
|
||||
flow.AddEdge(dag.Simple, "Login to Verify", "login-page", "verify-credentials")
|
||||
flow.AddEdge(dag.Simple, "Verify to Token", "verify-credentials", "generate-token")
|
||||
flow.AddEdge(dag.Simple, "Token to Upload", "generate-token", "upload-page")
|
||||
flow.AddEdge(dag.Simple, "Upload to Parse", "upload-page", "parse-phones")
|
||||
flow.AddEdge(dag.Simple, "Parse to Loop", "parse-phones", "phone-loop")
|
||||
flow.AddEdge(dag.Iterator, "Loop over phones", "phone-loop", "validate-phone")
|
||||
flow.AddCondition("validate-phone", map[string]string{"valid": "send-welcome", "invalid": "collect-invalid"})
|
||||
flow.AddEdge(dag.Simple, "Welcome to Collect", "send-welcome", "collect-valid")
|
||||
flow.AddEdge(dag.Simple, "Invalid to Collect", "collect-invalid", "collect-valid")
|
||||
flow.AddEdge(dag.Simple, "Loop to Report", "phone-loop", "generate-report")
|
||||
flow.AddEdge(dag.Simple, "Report to Summary", "generate-report", "send-summary")
|
||||
flow.AddEdge(dag.Simple, "Summary to Cleanup", "send-summary", "cleanup")
|
||||
|
||||
// Sample data for testing
|
||||
data := map[string]interface{}{
|
||||
"user_id": "user123",
|
||||
"session_data": map[string]interface{}{
|
||||
"authenticated": false,
|
||||
},
|
||||
"phone_data": map[string]interface{}{
|
||||
"format": "csv",
|
||||
"content": "name,phone\nJohn Doe,+1234567890\nJane Smith,+1987654321\nBob Johnson,invalid-phone\nAlice Brown,+1555123456",
|
||||
},
|
||||
}
|
||||
// Check for DAG errors
|
||||
// if flow.Error != nil {
|
||||
// fmt.Printf("DAG Error: %v\n", flow.Error)
|
||||
// panic(flow.Error)
|
||||
// }
|
||||
|
||||
jsonData, _ := json.Marshal(data)
|
||||
if flow.Error != nil {
|
||||
panic(flow.Error)
|
||||
}
|
||||
|
||||
rs := flow.Process(context.Background(), jsonData)
|
||||
if rs.Error != nil {
|
||||
panic(rs.Error)
|
||||
}
|
||||
fmt.Println("Complex Phone DAG Status:", rs.Status, "Topic:", rs.Topic)
|
||||
fmt.Println("Final Payload:", string(rs.Payload))
|
||||
fmt.Println("Starting Complex Phone Processing DAG server on http://0.0.0.0:8080")
|
||||
fmt.Println("Navigate to the URL to access the login page")
|
||||
flow.Start(context.Background(), ":8080")
|
||||
}
|
||||
|
||||
// Task implementations
|
||||
@@ -108,12 +88,151 @@ type Initialize struct {
|
||||
func (p *Initialize) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
|
||||
var data map[string]interface{}
|
||||
if err := json.Unmarshal(task.Payload, &data); err != nil {
|
||||
return mq.Result{Error: fmt.Errorf("Initialize Error: %s", err.Error()), Ctx: ctx}
|
||||
data = make(map[string]interface{})
|
||||
}
|
||||
data["initialized"] = true
|
||||
data["timestamp"] = "2025-09-19T12:00:00Z"
|
||||
updatedPayload, _ := json.Marshal(data)
|
||||
return mq.Result{Payload: updatedPayload, Ctx: ctx}
|
||||
|
||||
// Add sample phone data for testing
|
||||
sampleCSV := `name,phone
|
||||
John Doe,+1234567890
|
||||
Jane Smith,0987654321
|
||||
Bob Johnson,1555123456
|
||||
Alice Brown,invalid-phone
|
||||
Charlie Wilson,+441234567890`
|
||||
|
||||
data["phone_data"] = map[string]interface{}{
|
||||
"content": sampleCSV,
|
||||
"format": "csv",
|
||||
"source": "sample_data",
|
||||
"created_at": "2025-09-19T12:00:00Z",
|
||||
}
|
||||
|
||||
// Generate a task ID for this workflow instance
|
||||
taskID := "workflow-" + fmt.Sprintf("%d", time.Now().Unix())
|
||||
|
||||
// Since this is a page node, show a welcome page that auto-redirects to login
|
||||
htmlContent := `
|
||||
<!DOCTYPE html>
|
||||
<html>
|
||||
<head>
|
||||
<meta charset="UTF-8">
|
||||
<meta http-equiv="refresh" content="3;url=/process">
|
||||
<title>Phone Processing System</title>
|
||||
<style>
|
||||
body {
|
||||
font-family: Arial, sans-serif;
|
||||
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
|
||||
color: white;
|
||||
text-align: center;
|
||||
padding: 50px;
|
||||
margin: 0;
|
||||
min-height: 100vh;
|
||||
display: flex;
|
||||
align-items: center;
|
||||
justify-content: center;
|
||||
}
|
||||
.welcome {
|
||||
background: rgba(255, 255, 255, 0.1);
|
||||
padding: 40px;
|
||||
border-radius: 15px;
|
||||
backdrop-filter: blur(10px);
|
||||
box-shadow: 0 8px 32px rgba(0, 0, 0, 0.1);
|
||||
max-width: 500px;
|
||||
width: 100%;
|
||||
}
|
||||
.welcome h1 {
|
||||
margin-bottom: 20px;
|
||||
font-size: 2.5em;
|
||||
}
|
||||
.welcome p {
|
||||
margin-bottom: 30px;
|
||||
font-size: 1.1em;
|
||||
opacity: 0.9;
|
||||
}
|
||||
.features {
|
||||
margin-top: 30px;
|
||||
text-align: left;
|
||||
opacity: 0.8;
|
||||
}
|
||||
.features h3 {
|
||||
margin-bottom: 15px;
|
||||
color: #fff;
|
||||
}
|
||||
.features ul {
|
||||
list-style: none;
|
||||
padding: 0;
|
||||
}
|
||||
.features li {
|
||||
margin-bottom: 8px;
|
||||
padding-left: 20px;
|
||||
position: relative;
|
||||
}
|
||||
.features li:before {
|
||||
content: "✓";
|
||||
position: absolute;
|
||||
left: 0;
|
||||
color: #4CAF50;
|
||||
}
|
||||
.countdown {
|
||||
margin-top: 20px;
|
||||
font-size: 1.2em;
|
||||
opacity: 0.9;
|
||||
}
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
<div class="welcome">
|
||||
<h1>📱 Phone Processing System</h1>
|
||||
<p>Welcome to our advanced phone number processing workflow</p>
|
||||
|
||||
<div class="features">
|
||||
<h3>Features:</h3>
|
||||
<ul>
|
||||
<li>CSV/JSON file upload support</li>
|
||||
<li>Phone number validation and formatting</li>
|
||||
<li>Automated welcome SMS sending</li>
|
||||
<li>Invalid number filtering</li>
|
||||
<li>Comprehensive processing reports</li>
|
||||
</ul>
|
||||
</div>
|
||||
|
||||
<div class="countdown">
|
||||
<p>Initializing workflow...</p>
|
||||
<p>Task ID: ` + taskID + `</p>
|
||||
<p>Redirecting to login page in <span id="countdown">3</span> seconds...</p>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<script>
|
||||
let countdown = 3;
|
||||
const countdownElement = document.getElementById('countdown');
|
||||
const interval = setInterval(() => {
|
||||
countdown--;
|
||||
countdownElement.textContent = countdown;
|
||||
if (countdown <= 0) {
|
||||
clearInterval(interval);
|
||||
}
|
||||
}, 1000);
|
||||
</script>
|
||||
</body>
|
||||
</html>`
|
||||
|
||||
parser := jet.NewWithMemory(jet.WithDelims("{{", "}}"))
|
||||
rs, err := parser.ParseTemplate(htmlContent, map[string]any{})
|
||||
if err != nil {
|
||||
return mq.Result{Error: err, Ctx: ctx}
|
||||
}
|
||||
|
||||
ctx = context.WithValue(ctx, consts.ContentType, consts.TypeHtml)
|
||||
resultData := map[string]any{
|
||||
"html_content": rs,
|
||||
"step": "initialize",
|
||||
"data": data,
|
||||
}
|
||||
|
||||
resultPayload, _ := json.Marshal(resultData)
|
||||
return mq.Result{Payload: resultPayload, Ctx: ctx}
|
||||
}
|
||||
|
||||
type LoginPage struct {
|
||||
@@ -121,21 +240,177 @@ type LoginPage struct {
|
||||
}
|
||||
|
||||
func (p *LoginPage) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
|
||||
// Check if this is a form submission
|
||||
var inputData map[string]interface{}
|
||||
if len(task.Payload) > 0 {
|
||||
if err := json.Unmarshal(task.Payload, &inputData); err == nil {
|
||||
// Check if we have form data (username/password)
|
||||
if formData, ok := inputData["form"].(map[string]interface{}); ok {
|
||||
// This is a form submission, pass it through for verification
|
||||
credentials := map[string]interface{}{
|
||||
"username": formData["username"],
|
||||
"password": formData["password"],
|
||||
}
|
||||
inputData["credentials"] = credentials
|
||||
updatedPayload, _ := json.Marshal(inputData)
|
||||
return mq.Result{Payload: updatedPayload, Ctx: ctx}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Otherwise, show the form
|
||||
var data map[string]interface{}
|
||||
if err := json.Unmarshal(task.Payload, &data); err != nil {
|
||||
return mq.Result{Error: fmt.Errorf("LoginPage Error: %s", err.Error()), Ctx: ctx}
|
||||
data = make(map[string]interface{})
|
||||
}
|
||||
|
||||
// Simulate user input from page
|
||||
data["credentials"] = map[string]interface{}{
|
||||
"username": "admin",
|
||||
"password": "password123",
|
||||
}
|
||||
data["login_attempted"] = true
|
||||
// HTML content for login page
|
||||
htmlContent := `
|
||||
<!DOCTYPE html>
|
||||
<html lang="en">
|
||||
<head>
|
||||
<meta charset="UTF-8">
|
||||
<meta name="viewport" content="width=device-width, initial-scale=1.0">
|
||||
<title>Phone Processing System - Login</title>
|
||||
<style>
|
||||
body {
|
||||
font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
|
||||
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
|
||||
margin: 0;
|
||||
padding: 0;
|
||||
min-height: 100vh;
|
||||
display: flex;
|
||||
align-items: center;
|
||||
justify-content: center;
|
||||
}
|
||||
.login-container {
|
||||
background: white;
|
||||
padding: 2rem;
|
||||
border-radius: 10px;
|
||||
box-shadow: 0 10px 25px rgba(0,0,0,0.2);
|
||||
width: 100%;
|
||||
max-width: 400px;
|
||||
}
|
||||
.login-header {
|
||||
text-align: center;
|
||||
margin-bottom: 2rem;
|
||||
}
|
||||
.login-header h1 {
|
||||
color: #333;
|
||||
margin: 0;
|
||||
font-size: 1.8rem;
|
||||
}
|
||||
.login-header p {
|
||||
color: #666;
|
||||
margin: 0.5rem 0 0 0;
|
||||
}
|
||||
.form-group {
|
||||
margin-bottom: 1.5rem;
|
||||
}
|
||||
.form-group label {
|
||||
display: block;
|
||||
margin-bottom: 0.5rem;
|
||||
color: #333;
|
||||
font-weight: 500;
|
||||
}
|
||||
.form-group input {
|
||||
width: 100%;
|
||||
padding: 0.75rem;
|
||||
border: 2px solid #e1e5e9;
|
||||
border-radius: 5px;
|
||||
font-size: 1rem;
|
||||
transition: border-color 0.3s;
|
||||
box-sizing: border-box;
|
||||
}
|
||||
.form-group input:focus {
|
||||
outline: none;
|
||||
border-color: #667eea;
|
||||
}
|
||||
.login-btn {
|
||||
width: 100%;
|
||||
padding: 0.75rem;
|
||||
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
|
||||
color: white;
|
||||
border: none;
|
||||
border-radius: 5px;
|
||||
font-size: 1rem;
|
||||
font-weight: 600;
|
||||
cursor: pointer;
|
||||
transition: transform 0.2s;
|
||||
}
|
||||
.login-btn:hover {
|
||||
transform: translateY(-2px);
|
||||
}
|
||||
.login-btn:active {
|
||||
transform: scale(0.98);
|
||||
}
|
||||
.status-message {
|
||||
margin-top: 1rem;
|
||||
padding: 0.5rem;
|
||||
border-radius: 5px;
|
||||
text-align: center;
|
||||
font-weight: 500;
|
||||
}
|
||||
.status-success {
|
||||
background-color: #d4edda;
|
||||
color: #155724;
|
||||
border: 1px solid #c3e6cb;
|
||||
}
|
||||
.status-error {
|
||||
background-color: #f8d7da;
|
||||
color: #721c24;
|
||||
border: 1px solid #f5c6cb;
|
||||
}
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
<div class="login-container">
|
||||
<div class="login-header">
|
||||
<h1>📱 Phone Processing System</h1>
|
||||
<p>Please login to continue</p>
|
||||
</div>
|
||||
<form method="post" action="/process?task_id={{task_id}}" id="loginForm">
|
||||
<div class="form-group">
|
||||
<label for="username">Username</label>
|
||||
<input type="text" id="username" name="username" required placeholder="Enter your username">
|
||||
</div>
|
||||
<div class="form-group">
|
||||
<label for="password">Password</label>
|
||||
<input type="password" id="password" name="password" required placeholder="Enter your password">
|
||||
</div>
|
||||
<button type="submit" class="login-btn">Login</button>
|
||||
</form>
|
||||
<div id="statusMessage"></div>
|
||||
</div>
|
||||
|
||||
updatedPayload, _ := json.Marshal(data)
|
||||
<script>
|
||||
// Form will submit naturally to the action URL
|
||||
document.getElementById('loginForm').addEventListener('submit', function(e) {
|
||||
// Optional: Add loading state
|
||||
const btn = e.target.querySelector('.login-btn');
|
||||
btn.textContent = 'Logging in...';
|
||||
btn.disabled = true;
|
||||
});
|
||||
</script>
|
||||
</body>
|
||||
</html>`
|
||||
|
||||
parser := jet.NewWithMemory(jet.WithDelims("{{", "}}"))
|
||||
rs, err := parser.ParseTemplate(htmlContent, map[string]any{})
|
||||
if err != nil {
|
||||
return mq.Result{Error: err, Ctx: ctx}
|
||||
}
|
||||
|
||||
ctx = context.WithValue(ctx, consts.ContentType, consts.TypeHtml)
|
||||
resultData := map[string]any{
|
||||
"html_content": rs,
|
||||
"step": "login",
|
||||
"data": data,
|
||||
}
|
||||
|
||||
resultPayload, _ := json.Marshal(resultData)
|
||||
return mq.Result{
|
||||
Payload: updatedPayload,
|
||||
Payload: resultPayload,
|
||||
Ctx: ctx,
|
||||
}
|
||||
}
|
||||
@@ -195,22 +470,293 @@ type UploadPhoneDataPage struct {
|
||||
}
|
||||
|
||||
func (p *UploadPhoneDataPage) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
|
||||
// Check if this is a form submission
|
||||
var inputData map[string]interface{}
|
||||
if len(task.Payload) > 0 {
|
||||
if err := json.Unmarshal(task.Payload, &inputData); err == nil {
|
||||
// Check if we have form data (phone_data)
|
||||
if formData, ok := inputData["form"].(map[string]interface{}); ok {
|
||||
// This is a form submission, pass it through for processing
|
||||
if phoneData, exists := formData["phone_data"]; exists && phoneData != "" {
|
||||
inputData["phone_data"] = map[string]interface{}{
|
||||
"content": phoneData.(string),
|
||||
"format": "csv",
|
||||
"source": "user_input",
|
||||
"created_at": "2025-09-19T12:00:00Z",
|
||||
}
|
||||
}
|
||||
updatedPayload, _ := json.Marshal(inputData)
|
||||
return mq.Result{Payload: updatedPayload, Ctx: ctx}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Otherwise, show the form
|
||||
var data map[string]interface{}
|
||||
if err := json.Unmarshal(task.Payload, &data); err != nil {
|
||||
return mq.Result{Error: fmt.Errorf("UploadPhoneDataPage Error: %s", err.Error()), Ctx: ctx}
|
||||
data = make(map[string]interface{})
|
||||
}
|
||||
|
||||
// Simulate user interaction - in a real scenario, this would be user input
|
||||
// The phone data is already in the payload from initialization
|
||||
data["upload_completed"] = true
|
||||
data["uploaded_at"] = "2025-09-19T12:05:00Z"
|
||||
data["user_interaction"] = map[string]interface{}{
|
||||
"confirmed_upload": true,
|
||||
"upload_method": "file_upload",
|
||||
// HTML content for upload page
|
||||
htmlContent := `
|
||||
<!DOCTYPE html>
|
||||
<html lang="en">
|
||||
<head>
|
||||
<meta charset="UTF-8">
|
||||
<meta name="viewport" content="width=device-width, initial-scale=1.0">
|
||||
<title>Phone Processing System - Upload Data</title>
|
||||
<style>
|
||||
body {
|
||||
font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
|
||||
background: linear-gradient(135deg, #764ba2 0%, #667eea 100%);
|
||||
margin: 0;
|
||||
padding: 0;
|
||||
min-height: 100vh;
|
||||
display: flex;
|
||||
align-items: center;
|
||||
justify-content: center;
|
||||
}
|
||||
.upload-container {
|
||||
background: white;
|
||||
padding: 2rem;
|
||||
border-radius: 10px;
|
||||
box-shadow: 0 10px 25px rgba(0,0,0,0.2);
|
||||
width: 100%;
|
||||
max-width: 500px;
|
||||
}
|
||||
.upload-header {
|
||||
text-align: center;
|
||||
margin-bottom: 2rem;
|
||||
}
|
||||
.upload-header h1 {
|
||||
color: #333;
|
||||
margin: 0;
|
||||
font-size: 1.8rem;
|
||||
}
|
||||
.upload-header p {
|
||||
color: #666;
|
||||
margin: 0.5rem 0 0 0;
|
||||
}
|
||||
.upload-area {
|
||||
border: 2px dashed #667eea;
|
||||
border-radius: 8px;
|
||||
padding: 2rem;
|
||||
text-align: center;
|
||||
margin-bottom: 1.5rem;
|
||||
transition: border-color 0.3s;
|
||||
cursor: pointer;
|
||||
}
|
||||
.upload-area:hover {
|
||||
border-color: #764ba2;
|
||||
}
|
||||
.upload-area.dragover {
|
||||
border-color: #28a745;
|
||||
background: #f8fff9;
|
||||
}
|
||||
.upload-icon {
|
||||
font-size: 3rem;
|
||||
color: #667eea;
|
||||
margin-bottom: 1rem;
|
||||
}
|
||||
.upload-text {
|
||||
color: #666;
|
||||
margin-bottom: 0.5rem;
|
||||
}
|
||||
.file-info {
|
||||
background: #f8f9fa;
|
||||
padding: 1rem;
|
||||
border-radius: 5px;
|
||||
margin-bottom: 1rem;
|
||||
display: none;
|
||||
}
|
||||
.file-info.show {
|
||||
display: block;
|
||||
}
|
||||
.file-name {
|
||||
font-weight: bold;
|
||||
color: #333;
|
||||
}
|
||||
.file-size {
|
||||
color: #666;
|
||||
font-size: 0.9rem;
|
||||
}
|
||||
.upload-btn {
|
||||
width: 100%;
|
||||
padding: 0.75rem;
|
||||
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
|
||||
color: white;
|
||||
border: none;
|
||||
border-radius: 5px;
|
||||
font-size: 1rem;
|
||||
font-weight: 600;
|
||||
cursor: pointer;
|
||||
transition: transform 0.2s;
|
||||
}
|
||||
.upload-btn:hover {
|
||||
transform: translateY(-2px);
|
||||
}
|
||||
.upload-btn:active {
|
||||
transform: scale(0.98);
|
||||
}
|
||||
.upload-btn:disabled {
|
||||
background: #ccc;
|
||||
cursor: not-allowed;
|
||||
transform: none;
|
||||
}
|
||||
.progress-bar {
|
||||
width: 100%;
|
||||
height: 8px;
|
||||
background: #e9ecef;
|
||||
border-radius: 4px;
|
||||
margin-top: 1rem;
|
||||
overflow: hidden;
|
||||
display: none;
|
||||
}
|
||||
.progress-bar.show {
|
||||
display: block;
|
||||
}
|
||||
.progress-fill {
|
||||
height: 100%;
|
||||
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
|
||||
width: 0%;
|
||||
transition: width 0.3s ease;
|
||||
}
|
||||
.status-message {
|
||||
margin-top: 1rem;
|
||||
padding: 0.5rem;
|
||||
border-radius: 5px;
|
||||
text-align: center;
|
||||
font-weight: 500;
|
||||
display: none;
|
||||
}
|
||||
.status-message.show {
|
||||
display: block;
|
||||
}
|
||||
.status-success {
|
||||
background-color: #d4edda;
|
||||
color: #155724;
|
||||
border: 1px solid #c3e6cb;
|
||||
}
|
||||
.status-error {
|
||||
background-color: #f8d7da;
|
||||
color: #721c24;
|
||||
border: 1px solid #f5c6cb;
|
||||
}
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
<div class="upload-container">
|
||||
<div class="upload-header">
|
||||
<h1>📤 Upload Phone Data</h1>
|
||||
<p>Upload your CSV file containing phone numbers for processing</p>
|
||||
</div>
|
||||
|
||||
<form method="post" action="/process" id="uploadForm" enctype="multipart/form-data">
|
||||
<div class="upload-area" id="uploadArea">
|
||||
<div class="upload-icon">📁</div>
|
||||
<div class="upload-text">Drag & drop your CSV file here or click to browse</div>
|
||||
<div style="color: #999; font-size: 0.9rem; margin-top: 0.5rem;">Supported format: CSV with name,phone columns</div>
|
||||
<input type="file" id="fileInput" name="file" accept=".csv,.json" style="display: none;">
|
||||
</div>
|
||||
|
||||
<div style="margin: 20px 0; text-align: center; color: #666;">OR</div>
|
||||
|
||||
<div class="form-group">
|
||||
<label for="phoneData" style="color: #333; font-weight: bold;">Paste CSV/JSON Data:</label>
|
||||
<textarea id="phoneData" name="phone_data" rows="8" placeholder="name,phone John Doe,+1234567890 Jane Smith,0987654321 Or paste JSON array..." style="width: 100%; padding: 10px; border: 2px solid #e1e5e9; border-radius: 5px; font-family: monospace; resize: vertical;">name,phone
|
||||
John Doe,+1234567890
|
||||
Jane Smith,0987654321
|
||||
Bob Johnson,1555123456
|
||||
Alice Brown,invalid-phone
|
||||
Charlie Wilson,+441234567890</textarea>
|
||||
</div>
|
||||
|
||||
<button type="submit" class="upload-btn" id="uploadBtn">Upload & Process</button> <div class="progress-bar" id="progressBar">
|
||||
<div class="progress-fill" id="progressFill"></div>
|
||||
</div>
|
||||
|
||||
<div class="status-message" id="statusMessage"></div>
|
||||
</div>
|
||||
|
||||
<script>
|
||||
const uploadArea = document.getElementById('uploadArea');
|
||||
const fileInput = document.getElementById('fileInput');
|
||||
const phoneDataTextarea = document.getElementById('phoneData');
|
||||
const uploadBtn = document.getElementById('uploadBtn');
|
||||
const uploadForm = document.getElementById('uploadForm');
|
||||
|
||||
// Upload area click handler
|
||||
uploadArea.addEventListener('click', () => {
|
||||
fileInput.click();
|
||||
});
|
||||
|
||||
// File input change handler
|
||||
fileInput.addEventListener('change', (e) => {
|
||||
const file = e.target.files[0];
|
||||
if (file) {
|
||||
// Clear textarea if file is selected
|
||||
phoneDataTextarea.value = '';
|
||||
phoneDataTextarea.disabled = true;
|
||||
} else {
|
||||
phoneDataTextarea.disabled = false;
|
||||
}
|
||||
});
|
||||
|
||||
// Textarea input handler
|
||||
phoneDataTextarea.addEventListener('input', () => {
|
||||
if (phoneDataTextarea.value.trim()) {
|
||||
// Clear file input if textarea has content
|
||||
fileInput.value = '';
|
||||
}
|
||||
});
|
||||
|
||||
// Form submission handler
|
||||
uploadForm.addEventListener('submit', (e) => {
|
||||
uploadBtn.textContent = 'Processing...';
|
||||
uploadBtn.disabled = true;
|
||||
});
|
||||
|
||||
// Drag and drop handlers
|
||||
uploadArea.addEventListener('dragover', (e) => {
|
||||
e.preventDefault();
|
||||
uploadArea.classList.add('dragover');
|
||||
});
|
||||
|
||||
uploadArea.addEventListener('dragleave', () => {
|
||||
uploadArea.classList.remove('dragover');
|
||||
});
|
||||
|
||||
uploadArea.addEventListener('drop', (e) => {
|
||||
e.preventDefault();
|
||||
uploadArea.classList.remove('dragover');
|
||||
const files = e.dataTransfer.files;
|
||||
if (files.length > 0) {
|
||||
fileInput.files = files;
|
||||
fileInput.dispatchEvent(new Event('change'));
|
||||
}
|
||||
});
|
||||
</script>
|
||||
</body>
|
||||
</html>`
|
||||
|
||||
parser := jet.NewWithMemory(jet.WithDelims("{{", "}}"))
|
||||
rs, err := parser.ParseTemplate(htmlContent, map[string]any{})
|
||||
if err != nil {
|
||||
return mq.Result{Error: err, Ctx: ctx}
|
||||
}
|
||||
|
||||
updatedPayload, _ := json.Marshal(data)
|
||||
return mq.Result{Payload: updatedPayload, Ctx: ctx}
|
||||
ctx = context.WithValue(ctx, consts.ContentType, consts.TypeHtml)
|
||||
resultData := map[string]any{
|
||||
"html_content": rs,
|
||||
"step": "upload",
|
||||
"data": data,
|
||||
}
|
||||
|
||||
resultPayload, _ := json.Marshal(resultData)
|
||||
return mq.Result{
|
||||
Payload: resultPayload,
|
||||
Ctx: ctx,
|
||||
}
|
||||
}
|
||||
|
||||
type ParsePhoneNumbers struct {
|
||||
|
@@ -24,7 +24,7 @@ func subDAG() *dag.DAG {
|
||||
return f
|
||||
}
|
||||
|
||||
func main() {
|
||||
func mai2n() {
|
||||
flow := dag.NewDAG("Sample DAG", "sample-dag", func(taskID string, result mq.Result) {
|
||||
fmt.Printf("DAG Final result for task %s: %s\n", taskID, string(result.Payload))
|
||||
})
|
||||
|
@@ -11,7 +11,7 @@ import (
|
||||
)
|
||||
|
||||
// Enhanced DAG Example demonstrates how to use the enhanced DAG system with workflow capabilities
|
||||
func main() {
|
||||
func mai1n() {
|
||||
fmt.Println("🚀 Starting Enhanced DAG with Workflow Engine Demo...")
|
||||
|
||||
// Create enhanced DAG configuration
|
||||
|
1028
examples/form.go
1028
examples/form.go
File diff suppressed because it is too large
Load Diff
97
examples/reset_to_example.go
Normal file
97
examples/reset_to_example.go
Normal file
@@ -0,0 +1,97 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
"github.com/oarkflow/json"
|
||||
"github.com/oarkflow/mq"
|
||||
"github.com/oarkflow/mq/dag"
|
||||
)
|
||||
|
||||
// ResetToExample demonstrates the ResetTo functionality
|
||||
type ResetToExample struct {
|
||||
dag.Operation
|
||||
}
|
||||
|
||||
func (r *ResetToExample) Process(ctx context.Context, task *mq.Task) mq.Result {
|
||||
payload := string(task.Payload)
|
||||
log.Printf("Processing node %s with payload: %s", task.Topic, payload)
|
||||
|
||||
// Simulate some processing logic
|
||||
if task.Topic == "step1" {
|
||||
// For step1, we'll return a result that resets to step2
|
||||
return mq.Result{
|
||||
Status: mq.Completed,
|
||||
Payload: json.RawMessage(`{"message": "Step 1 completed, resetting to step2"}`),
|
||||
Ctx: ctx,
|
||||
TaskID: task.ID,
|
||||
Topic: task.Topic,
|
||||
ResetTo: "step2", // Reset to step2
|
||||
}
|
||||
} else if task.Topic == "step2" {
|
||||
// For step2, we'll return a result that resets to the previous page node
|
||||
return mq.Result{
|
||||
Status: mq.Completed,
|
||||
Payload: json.RawMessage(`{"message": "Step 2 completed, resetting to back"}`),
|
||||
Ctx: ctx,
|
||||
TaskID: task.ID,
|
||||
Topic: task.Topic,
|
||||
ResetTo: "back", // Reset to previous page node
|
||||
}
|
||||
} else if task.Topic == "step3" {
|
||||
// Final step
|
||||
return mq.Result{
|
||||
Status: mq.Completed,
|
||||
Payload: json.RawMessage(`{"message": "Step 3 completed - final result"}`),
|
||||
Ctx: ctx,
|
||||
TaskID: task.ID,
|
||||
Topic: task.Topic,
|
||||
}
|
||||
}
|
||||
|
||||
return mq.Result{
|
||||
Status: mq.Failed,
|
||||
Error: fmt.Errorf("unknown step: %s", task.Topic),
|
||||
Ctx: ctx,
|
||||
TaskID: task.ID,
|
||||
Topic: task.Topic,
|
||||
}
|
||||
}
|
||||
|
||||
func runResetToExample() {
|
||||
// Create a DAG with ResetTo functionality
|
||||
flow := dag.NewDAG("ResetTo Example", "reset-to-example", func(taskID string, result mq.Result) {
|
||||
log.Printf("Final result for task %s: %s", taskID, string(result.Payload))
|
||||
})
|
||||
|
||||
// Add nodes
|
||||
flow.AddNode(dag.Function, "Step 1", "step1", &ResetToExample{}, true)
|
||||
flow.AddNode(dag.Page, "Step 2", "step2", &ResetToExample{})
|
||||
flow.AddNode(dag.Page, "Step 3", "step3", &ResetToExample{})
|
||||
|
||||
// Add edges
|
||||
flow.AddEdge(dag.Simple, "Step 1 to Step 2", "step1", "step2")
|
||||
flow.AddEdge(dag.Simple, "Step 2 to Step 3", "step2", "step3")
|
||||
|
||||
// Validate the DAG
|
||||
if err := flow.Validate(); err != nil {
|
||||
log.Fatalf("DAG validation failed: %v", err)
|
||||
}
|
||||
|
||||
// Process a task
|
||||
data := json.RawMessage(`{"initial": "data"}`)
|
||||
log.Println("Starting DAG processing...")
|
||||
result := flow.Process(context.Background(), data)
|
||||
|
||||
if result.Error != nil {
|
||||
log.Printf("Processing failed: %v", result.Error)
|
||||
} else {
|
||||
log.Printf("Processing completed successfully: %s", string(result.Payload))
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
runResetToExample()
|
||||
}
|
@@ -551,7 +551,7 @@ func Logger() HandlerFunc {
|
||||
// Example
|
||||
// ----------------------------
|
||||
|
||||
func main() {
|
||||
func mai3n() {
|
||||
app := New()
|
||||
app.Use(Recover())
|
||||
|
||||
|
@@ -24,7 +24,7 @@ func enhancedSubDAG() *dag.DAG {
|
||||
return f
|
||||
}
|
||||
|
||||
func main() {
|
||||
func mai4n() {
|
||||
fmt.Println("🚀 Starting Simple Enhanced DAG Demo...")
|
||||
|
||||
// Create enhanced DAG - simple configuration, just like regular DAG but with enhanced features
|
||||
|
@@ -116,7 +116,7 @@ func demonstrateTaskRecovery() {
|
||||
log.Println("💡 In a real scenario, the recovered task would continue processing from the 'process' node")
|
||||
}
|
||||
|
||||
func main() {
|
||||
func mai5n() {
|
||||
fmt.Println("=== DAG Task Recovery Example ===")
|
||||
demonstrateTaskRecovery()
|
||||
}
|
||||
|
1
mq.go
1
mq.go
@@ -45,6 +45,7 @@ type Result struct {
|
||||
ConditionStatus string `json:"condition_status"`
|
||||
Ctx context.Context `json:"-"`
|
||||
Payload json.RawMessage `json:"payload"`
|
||||
ResetTo string `json:"reset_to,omitempty"` // Node ID to reset to, or "back" for previous page node
|
||||
Last bool
|
||||
}
|
||||
|
||||
|
11
rename/go.mod
Normal file
11
rename/go.mod
Normal file
@@ -0,0 +1,11 @@
|
||||
module rename
|
||||
|
||||
go 1.24.2
|
||||
|
||||
require github.com/esimov/pigo v1.4.6
|
||||
|
||||
require (
|
||||
github.com/corona10/goimagehash v1.1.0 // indirect
|
||||
github.com/mattn/go-sqlite3 v1.14.32 // indirect
|
||||
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 // indirect
|
||||
)
|
17
rename/go.sum
Normal file
17
rename/go.sum
Normal file
@@ -0,0 +1,17 @@
|
||||
github.com/corona10/goimagehash v1.1.0 h1:teNMX/1e+Wn/AYSbLHX8mj+mF9r60R1kBeqE9MkoYwI=
|
||||
github.com/corona10/goimagehash v1.1.0/go.mod h1:VkvE0mLn84L4aF8vCb6mafVajEb6QYMHl2ZJLn0mOGI=
|
||||
github.com/disintegration/imaging v1.6.2/go.mod h1:44/5580QXChDfwIclfc/PCwrr44amcmDAg8hxG0Ewe4=
|
||||
github.com/esimov/pigo v1.4.6 h1:wpB9FstbqeGP/CZP+nTR52tUJe7XErq8buG+k4xCXlw=
|
||||
github.com/esimov/pigo v1.4.6/go.mod h1:uqj9Y3+3IRYhFK071rxz1QYq0ePhA6+R9jrUZavi46M=
|
||||
github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
|
||||
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k=
|
||||
github.com/mattn/go-sqlite3 v1.14.32 h1:JD12Ag3oLy1zQA+BNn74xRgaBbdhbNIDYvQUEuuErjs=
|
||||
github.com/mattn/go-sqlite3 v1.14.32/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
|
||||
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 h1:zYyBkD/k9seD2A7fsi6Oo2LfFZAehjjQMERAvZLEDnQ=
|
||||
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646/go.mod h1:jpp1/29i3P1S/RLdc7JQKbRpFeM1dOBd8T9ki5s+AY8=
|
||||
golang.org/x/image v0.0.0-20191009234506-e7c1f5e7dbb8/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
|
||||
golang.org/x/image v0.0.0-20200927104501-e162460cd6b5/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
|
||||
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20201107080550-4d91cf3a1aaf/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/term v0.0.0-20191110171634-ad39bd3f0407/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
506
rename/main.go
Normal file
506
rename/main.go
Normal file
@@ -0,0 +1,506 @@
|
||||
package main
|
||||
|
||||
/*
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"image"
|
||||
"image/color"
|
||||
"image/draw"
|
||||
"image/jpeg"
|
||||
"image/png"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/corona10/goimagehash"
|
||||
pigo "github.com/esimov/pigo/core"
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
)
|
||||
|
||||
// FaceDetector wraps the Pigo classifier
|
||||
type FaceDetector struct {
|
||||
classifier *pigo.Pigo
|
||||
}
|
||||
|
||||
// NewFaceDetector creates a new face detector instance
|
||||
func NewFaceDetector(cascadeFile string) (*FaceDetector, error) {
|
||||
cascadeData, err := ioutil.ReadFile(cascadeFile)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to read cascade file: %v", err)
|
||||
}
|
||||
|
||||
p := pigo.NewPigo()
|
||||
classifier, err := p.Unpack(cascadeData)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to unpack cascade: %v", err)
|
||||
}
|
||||
|
||||
return &FaceDetector{
|
||||
classifier: classifier,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// DetectFaces detects faces in an image and returns detection results
|
||||
func (fd *FaceDetector) DetectFaces(img image.Image, minSize, maxSize int, shiftFactor, scaleFactor float64, iouThreshold float64) []pigo.Detection {
|
||||
// Convert image to grayscale
|
||||
src := pigo.ImgToNRGBA(img)
|
||||
pixels := pigo.RgbToGrayscale(src)
|
||||
cols, rows := src.Bounds().Max.X, src.Bounds().Max.Y
|
||||
|
||||
// Canny edge detection parameters
|
||||
cParams := pigo.CascadeParams{
|
||||
MinSize: minSize,
|
||||
MaxSize: maxSize,
|
||||
ShiftFactor: shiftFactor,
|
||||
ScaleFactor: scaleFactor,
|
||||
ImageParams: pigo.ImageParams{
|
||||
Pixels: pixels,
|
||||
Rows: rows,
|
||||
Cols: cols,
|
||||
Dim: cols,
|
||||
},
|
||||
}
|
||||
|
||||
// Run the classifier over the obtained leaf nodes and return the detection results
|
||||
dets := fd.classifier.RunCascade(cParams, 0.0)
|
||||
|
||||
// Calculate the intersection over union (IoU) of two clusters
|
||||
dets = fd.classifier.ClusterDetections(dets, iouThreshold)
|
||||
|
||||
return dets
|
||||
}
|
||||
|
||||
// DrawDetections draws bounding boxes around detected faces
|
||||
func DrawDetections(img image.Image, detections []pigo.Detection) image.Image {
|
||||
// Create a new RGBA image for drawing
|
||||
bounds := img.Bounds()
|
||||
dst := image.NewRGBA(bounds)
|
||||
draw.Draw(dst, bounds, img, bounds.Min, draw.Src)
|
||||
|
||||
// Draw rectangles around detected faces
|
||||
for _, det := range detections {
|
||||
if det.Q > 150.0 { // Quality threshold (very high for best detection)
|
||||
// Calculate rectangle coordinates
|
||||
x1 := det.Col - det.Scale/2
|
||||
y1 := det.Row - det.Scale/2
|
||||
x2 := det.Col + det.Scale/2
|
||||
y2 := det.Row + det.Scale/2
|
||||
|
||||
// Draw rectangle
|
||||
drawRect(dst, x1, y1, x2, y2, color.RGBA{255, 0, 0, 255})
|
||||
}
|
||||
}
|
||||
|
||||
return dst
|
||||
}
|
||||
|
||||
// drawRect draws a rectangle on the image
|
||||
func drawRect(img *image.RGBA, x1, y1, x2, y2 int, col color.RGBA) {
|
||||
// Draw horizontal lines
|
||||
for x := x1; x <= x2; x++ {
|
||||
if x >= 0 && x < img.Bounds().Max.X {
|
||||
if y1 >= 0 && y1 < img.Bounds().Max.Y {
|
||||
img.Set(x, y1, col)
|
||||
}
|
||||
if y2 >= 0 && y2 < img.Bounds().Max.Y {
|
||||
img.Set(x, y2, col)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Draw vertical lines
|
||||
for y := y1; y <= y2; y++ {
|
||||
if y >= 0 && y < img.Bounds().Max.Y {
|
||||
if x1 >= 0 && x1 < img.Bounds().Max.X {
|
||||
img.Set(x1, y, col)
|
||||
}
|
||||
if x2 >= 0 && x2 < img.Bounds().Max.X {
|
||||
img.Set(x2, y, col)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// loadImage loads an image from file
|
||||
func loadImage(filename string) (image.Image, error) {
|
||||
file, err := os.Open(filename)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
ext := strings.ToLower(filepath.Ext(filename))
|
||||
var img image.Image
|
||||
|
||||
switch ext {
|
||||
case ".jpg", ".jpeg":
|
||||
img, err = jpeg.Decode(file)
|
||||
case ".png":
|
||||
img, err = png.Decode(file)
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported image format: %s", ext)
|
||||
}
|
||||
|
||||
return img, err
|
||||
}
|
||||
|
||||
// saveImage saves an image to file
|
||||
func saveImage(img image.Image, filename string) error {
|
||||
file, err := os.Create(filename)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
ext := strings.ToLower(filepath.Ext(filename))
|
||||
switch ext {
|
||||
case ".jpg", ".jpeg":
|
||||
return jpeg.Encode(file, img, &jpeg.Options{Quality: 95})
|
||||
case ".png":
|
||||
return png.Encode(file, img)
|
||||
default:
|
||||
return fmt.Errorf("unsupported output format: %s", ext)
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
if len(os.Args) < 2 {
|
||||
fmt.Println("Usage:")
|
||||
fmt.Println(" go run . <image_file> [output_file] (detect faces)")
|
||||
fmt.Println(" go run . <image1> <image2> (compare faces)")
|
||||
fmt.Println(" go run . add <name> <image_file> (add face to database)")
|
||||
fmt.Println(" go run . recognize <image_file> (recognize face from database)")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
// Open database
|
||||
db, err := sql.Open("sqlite3", "./faces.db")
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
// Create table
|
||||
_, err = db.Exec(`CREATE TABLE IF NOT EXISTS faces (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
name TEXT NOT NULL,
|
||||
hash TEXT NOT NULL,
|
||||
image_path TEXT
|
||||
)`)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
// Create face detector
|
||||
detector, err := NewFaceDetector("./cascade/facefinder")
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to create face detector: %v", err)
|
||||
}
|
||||
|
||||
if len(os.Args) > 2 && os.Args[1] == "add" {
|
||||
if len(os.Args) < 4 {
|
||||
fmt.Println("Usage: go run . add <name> <image_file>")
|
||||
os.Exit(1)
|
||||
}
|
||||
name := os.Args[2]
|
||||
imageFile := os.Args[3]
|
||||
addFace(db, detector, name, imageFile)
|
||||
} else if len(os.Args) > 2 && os.Args[1] == "recognize" {
|
||||
if len(os.Args) < 3 {
|
||||
fmt.Println("Usage: go run . recognize <image_file>")
|
||||
os.Exit(1)
|
||||
}
|
||||
imageFile := os.Args[2]
|
||||
recognizeFace(db, detector, imageFile)
|
||||
} else {
|
||||
// Original logic for detection/comparison
|
||||
imageFile1 := os.Args[1]
|
||||
var imageFile2 string
|
||||
var outputFile string
|
||||
|
||||
if len(os.Args) > 2 {
|
||||
// Check if second arg is an image file
|
||||
if strings.HasSuffix(os.Args[2], ".jpg") || strings.HasSuffix(os.Args[2], ".jpeg") || strings.HasSuffix(os.Args[2], ".png") {
|
||||
imageFile2 = os.Args[2]
|
||||
if len(os.Args) > 3 {
|
||||
outputFile = os.Args[3]
|
||||
}
|
||||
} else {
|
||||
outputFile = os.Args[2]
|
||||
}
|
||||
}
|
||||
|
||||
if outputFile == "" {
|
||||
ext := filepath.Ext(imageFile1)
|
||||
outputFile = strings.TrimSuffix(imageFile1, ext) + "_detected" + ext
|
||||
}
|
||||
|
||||
// Process first image
|
||||
hashes1 := processImage(detector, imageFile1, outputFile)
|
||||
|
||||
if imageFile2 != "" {
|
||||
// Process second image for comparison
|
||||
outputFile2 := strings.TrimSuffix(imageFile2, filepath.Ext(imageFile2)) + "_detected" + filepath.Ext(imageFile2)
|
||||
hashes2 := processImage(detector, imageFile2, outputFile2)
|
||||
|
||||
// Compare faces
|
||||
compareFaces(hashes1, hashes2)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func processImage(detector *FaceDetector, imageFile, outputFile string) []*goimagehash.ImageHash {
|
||||
// Load image
|
||||
img, err := loadImage(imageFile)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to load image %s: %v", imageFile, err)
|
||||
}
|
||||
|
||||
fmt.Printf("Image %s loaded: %dx%d\n", imageFile, img.Bounds().Dx(), img.Bounds().Dy())
|
||||
|
||||
// Detection parameters
|
||||
minSize := 10 // Minimum face size
|
||||
maxSize := 2000 // Maximum face size
|
||||
shiftFactor := 0.05 // How much to shift the detection window (0.05 = 5%)
|
||||
scaleFactor := 1.05 // How much to scale between detection sizes
|
||||
iouThreshold := 0.4 // Intersection over Union threshold for clustering
|
||||
|
||||
// Detect faces
|
||||
fmt.Printf("Detecting faces in %s...\n", imageFile)
|
||||
detections := detector.DetectFaces(img, minSize, maxSize, shiftFactor, scaleFactor, iouThreshold)
|
||||
|
||||
// Filter detections by quality - very restrictive threshold
|
||||
var validDetections []pigo.Detection
|
||||
for _, det := range detections {
|
||||
if det.Q > 150.0 { // Very high quality threshold to get only the best detection
|
||||
validDetections = append(validDetections, det)
|
||||
}
|
||||
}
|
||||
|
||||
fmt.Printf("Found %d face(s) in %s\n", len(validDetections), imageFile)
|
||||
|
||||
// Print detection details
|
||||
for i, det := range validDetections {
|
||||
fmt.Printf("Face %d: Position(x=%d, y=%d), Size=%d, Quality=%.2f\n",
|
||||
i+1, det.Col, det.Row, det.Scale, det.Q)
|
||||
}
|
||||
|
||||
var faceHashes []*goimagehash.ImageHash
|
||||
|
||||
// Crop and save individual faces
|
||||
for i, det := range validDetections {
|
||||
// Calculate crop coordinates
|
||||
x1 := det.Col - det.Scale/2
|
||||
y1 := det.Row - det.Scale/2
|
||||
x2 := det.Col + det.Scale/2
|
||||
y2 := det.Row + det.Scale/2
|
||||
|
||||
// Ensure coordinates are within bounds
|
||||
if x1 < 0 {
|
||||
x1 = 0
|
||||
}
|
||||
if y1 < 0 {
|
||||
y1 = 0
|
||||
}
|
||||
if x2 > img.Bounds().Max.X {
|
||||
x2 = img.Bounds().Max.X
|
||||
}
|
||||
if y2 > img.Bounds().Max.Y {
|
||||
y2 = img.Bounds().Max.Y
|
||||
}
|
||||
|
||||
// Crop the face
|
||||
faceRect := image.Rect(x1, y1, x2, y2)
|
||||
faceImg := img.(interface {
|
||||
SubImage(image.Rectangle) image.Image
|
||||
}).SubImage(faceRect)
|
||||
|
||||
// Save the face
|
||||
faceFilename := fmt.Sprintf("face_%s_%d.jpg", strings.TrimSuffix(filepath.Base(imageFile), filepath.Ext(imageFile)), i+1)
|
||||
if err := saveImage(faceImg, faceFilename); err != nil {
|
||||
log.Printf("Failed to save face %d: %v", i+1, err)
|
||||
} else {
|
||||
fmt.Printf("Saved face %d to: %s\n", i+1, faceFilename)
|
||||
|
||||
// Compute perceptual hash for face recognition
|
||||
hash, err := goimagehash.PerceptionHash(faceImg)
|
||||
if err != nil {
|
||||
log.Printf("Failed to compute hash for face %d: %v", i+1, err)
|
||||
} else {
|
||||
fmt.Printf("Face %d hash: %s\n", i+1, hash.ToString())
|
||||
faceHashes = append(faceHashes, hash)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Draw detections on image
|
||||
resultImg := DrawDetections(img, validDetections)
|
||||
|
||||
// Save result
|
||||
if err := saveImage(resultImg, outputFile); err != nil {
|
||||
log.Fatalf("Failed to save image: %v", err)
|
||||
}
|
||||
|
||||
fmt.Printf("Result saved to: %s\n", outputFile)
|
||||
|
||||
return faceHashes
|
||||
}
|
||||
|
||||
func compareFaces(hashes1, hashes2 []*goimagehash.ImageHash) {
|
||||
if len(hashes1) == 0 || len(hashes2) == 0 {
|
||||
fmt.Println("Cannot compare: one or both images have no faces")
|
||||
return
|
||||
}
|
||||
|
||||
// Compare first faces
|
||||
hash1 := hashes1[0]
|
||||
hash2 := hashes2[0]
|
||||
|
||||
distance, err := hash1.Distance(hash2)
|
||||
if err != nil {
|
||||
log.Printf("Failed to compute distance: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
fmt.Printf("Hash distance between faces: %d\n", distance)
|
||||
|
||||
// Threshold for similarity (lower distance means more similar)
|
||||
threshold := 10
|
||||
if distance <= threshold {
|
||||
fmt.Println("Faces are likely the same person")
|
||||
} else {
|
||||
fmt.Println("Faces are likely different people")
|
||||
}
|
||||
}
|
||||
|
||||
func addFace(db *sql.DB, detector *FaceDetector, name, imageFile string) {
|
||||
fmt.Printf("Adding face for %s from %s\n", name, imageFile)
|
||||
|
||||
// Process image to get hashes
|
||||
outputFile := strings.TrimSuffix(imageFile, filepath.Ext(imageFile)) + "_detected" + filepath.Ext(imageFile)
|
||||
hashes := processImage(detector, imageFile, outputFile)
|
||||
|
||||
if len(hashes) == 0 {
|
||||
fmt.Println("No face found in image")
|
||||
return
|
||||
}
|
||||
|
||||
hash := hashes[0] // Use the first face
|
||||
|
||||
// Check if hash already exists
|
||||
var existingName string
|
||||
err := db.QueryRow("SELECT name FROM faces WHERE hash = ?", hash.ToString()).Scan(&existingName)
|
||||
if err == nil {
|
||||
fmt.Printf("Face already exists in database as: %s\n", existingName)
|
||||
return
|
||||
} else if err != sql.ErrNoRows {
|
||||
log.Printf("Failed to check existing face: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Insert into database
|
||||
_, err = db.Exec("INSERT INTO faces (name, hash, image_path) VALUES (?, ?, ?)", name, hash.ToString(), imageFile)
|
||||
if err != nil {
|
||||
log.Printf("Failed to insert face: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
fmt.Printf("Added face for %s to database\n", name)
|
||||
}
|
||||
|
||||
func recognizeFace(db *sql.DB, detector *FaceDetector, imageFile string) {
|
||||
fmt.Printf("Recognizing face in %s\n", imageFile)
|
||||
|
||||
// Process image to get hashes
|
||||
outputFile := strings.TrimSuffix(imageFile, filepath.Ext(imageFile)) + "_detected" + filepath.Ext(imageFile)
|
||||
hashes := processImage(detector, imageFile, outputFile)
|
||||
|
||||
if len(hashes) == 0 {
|
||||
fmt.Println("No face found in image")
|
||||
return
|
||||
}
|
||||
|
||||
// Cluster hashes by similarity to avoid multiple detections of same person
|
||||
var clusters [][]*goimagehash.ImageHash
|
||||
for _, hash := range hashes {
|
||||
found := false
|
||||
for i, cluster := range clusters {
|
||||
dist, _ := hash.Distance(cluster[0])
|
||||
if dist <= 5 { // Same person threshold
|
||||
clusters[i] = append(clusters[i], hash)
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
clusters = append(clusters, []*goimagehash.ImageHash{hash})
|
||||
}
|
||||
}
|
||||
|
||||
fmt.Printf("Clustered into %d person(s)\n", len(clusters))
|
||||
|
||||
// Query all faces from database
|
||||
rows, err := db.Query("SELECT name, hash FROM faces")
|
||||
if err != nil {
|
||||
log.Printf("Failed to query faces: %v", err)
|
||||
return
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var bestMatch string
|
||||
minDistance := 9999
|
||||
|
||||
for _, cluster := range clusters {
|
||||
repHash := cluster[0] // Use first hash as representative
|
||||
for rows.Next() {
|
||||
var dbName, dbHashStr string
|
||||
err := rows.Scan(&dbName, &dbHashStr)
|
||||
if err != nil {
|
||||
log.Printf("Failed to scan row: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
// Parse hash string "p:hex"
|
||||
parts := strings.Split(dbHashStr, ":")
|
||||
if len(parts) != 2 {
|
||||
log.Printf("Invalid hash format: %s", dbHashStr)
|
||||
continue
|
||||
}
|
||||
hashValue, err := strconv.ParseUint(parts[1], 16, 64)
|
||||
if err != nil {
|
||||
log.Printf("Failed to parse hash value: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
dbHash := goimagehash.NewImageHash(hashValue, goimagehash.PHash)
|
||||
|
||||
distance, err := repHash.Distance(dbHash)
|
||||
if err != nil {
|
||||
log.Printf("Failed to compute distance: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
if distance < minDistance {
|
||||
minDistance = distance
|
||||
bestMatch = dbName
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if bestMatch != "" && minDistance <= 10 {
|
||||
fmt.Printf("Recognized as: %s (distance: %d)\n", bestMatch, minDistance)
|
||||
if minDistance <= 5 {
|
||||
fmt.Println("High confidence match")
|
||||
} else {
|
||||
fmt.Println("Low confidence match")
|
||||
}
|
||||
} else {
|
||||
fmt.Println("No match found in database")
|
||||
}
|
||||
}
|
||||
|
||||
*/
|
Reference in New Issue
Block a user