diff --git a/dag/v2/v2.go b/dag/v2/v2.go new file mode 100644 index 0000000..c766434 --- /dev/null +++ b/dag/v2/v2.go @@ -0,0 +1,291 @@ +package v2 + +import ( + "context" + "encoding/json" + "fmt" + "github.com/oarkflow/jet" + "log" + "math/rand" + "net/http" + "strings" + "time" +) + +type Node interface { + ProcessTask(ctx context.Context, task *Task) Result + GetNodeType() string +} + +type Result struct { + ConditionStatus string + Payload json.RawMessage + Error error +} + +type Task struct { + ID string + CurrentNodeID string + Payload json.RawMessage + FinalResult string +} + +type Operation struct { + ID string + Type string + Content string + Func func(task *Task) Result +} + +func (n *Operation) ProcessTask(ctx context.Context, task *Task) Result { + var data map[string]any + if task.Payload != nil { + err := json.Unmarshal(task.Payload, &data) + if err != nil { + return Result{Error: err} + } + } + if data == nil { + data = make(map[string]any) + } + if n.Type == "process" && n.Func != nil { + return n.Func(task) + } + if n.Type == "page" && n.Content != "" { + parser := jet.NewWithMemory(jet.WithDelims("{{", "}}")) + data["taskID"] = task.ID + tmpl := fmt.Sprintf("%s

Request

{{request_data|writeJson}}

", n.Content) + rs, err := parser.ParseTemplate(tmpl, map[string]any{ + "request_data": data, + "taskID": task.ID, + }) + if err != nil { + return Result{Error: err} + } + return Result{Payload: []byte(rs)} + } + return Result{Payload: task.Payload} +} + +func (n *Operation) GetNodeType() string { + return n.Type +} + +type Graph struct { + Nodes map[string]Node + Edges map[string][]string + tm *TaskManager +} + +func NewGraph() *Graph { + g := &Graph{ + Nodes: make(map[string]Node), + Edges: make(map[string][]string), + } + tm := NewTaskManager(g) + g.tm = tm + return g +} + +func (g *Graph) Start() { + http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + startTaskHandler(w, r, g.tm) + }) + http.HandleFunc("/render", func(w http.ResponseWriter, r *http.Request) { + renderHandler(w, r, g.tm) + }) + http.HandleFunc("/submit", func(w http.ResponseWriter, r *http.Request) { + submitHandler(w, r, g.tm) + }) + http.HandleFunc("/verify", func(w http.ResponseWriter, r *http.Request) { + verifyHandler(w, r, g.tm) + }) + + fmt.Println("Server starting on :8080...") + log.Fatal(http.ListenAndServe(":8080", nil)) +} + +func (g *Graph) AddNode(nt Node) { + switch n := nt.(type) { + case *Operation: + g.Nodes[n.ID] = n + } +} + +func (g *Graph) AddEdge(fromID, toID string) { + g.Edges[fromID] = append(g.Edges[fromID], toID) +} + +type TaskManager struct { + tasks map[string]*Task + Graph *Graph +} + +func NewTaskManager(graph *Graph) *TaskManager { + return &TaskManager{ + tasks: make(map[string]*Task), + Graph: graph, + } +} + +func (tm *TaskManager) GetTask(taskID string) (*Task, bool) { + task, exists := tm.tasks[taskID] + return task, exists +} + +func (tm *TaskManager) StartTask() *Task { + taskID := generateTaskID() + task := &Task{ + ID: taskID, + CurrentNodeID: "customRegistration", + } + tm.tasks[taskID] = task + return task +} + +func (tm *TaskManager) UpdateTask(task *Task) { + tm.tasks[task.ID] = task +} + +func (tm *TaskManager) GetNextNode(task *Task) (Node, bool) { + if task == nil { + return nil, false + } + edges, _ := tm.Graph.Edges[task.CurrentNodeID] + if len(edges) > 0 { + nextNodeID := edges[0] + nextNode, exists := tm.Graph.Nodes[nextNodeID] + if exists { + return nextNode, true + } + } + return nil, false +} + +func generateTaskID() string { + rand.Seed(time.Now().UnixNano()) + const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" + var result strings.Builder + for i := 0; i < 8; i++ { + result.WriteByte(charset[rand.Intn(len(charset))]) + } + return result.String() +} + +func processNode(w http.ResponseWriter, r *http.Request, task *Task, tm *TaskManager) { + for { + log.Printf("Processing taskID: %s, Current Node: %s", task.ID, task.CurrentNodeID) + node, exists := tm.Graph.Nodes[task.CurrentNodeID] + if !exists { + http.Error(w, "Node not found", http.StatusInternalServerError) + return + } + result := node.ProcessTask(context.Background(), task) + log.Printf("Node %s processed. Result ConditionStatus: %s", task.CurrentNodeID, result.ConditionStatus) + if node.GetNodeType() == "page" { + fmt.Fprintf(w, string(result.Payload)) + return + } + if result.Error != nil { + http.Error(w, result.Error.Error(), http.StatusInternalServerError) + return + } + nextNodeID := result.ConditionStatus + if nextNodeID == "" { + edges := tm.Graph.Edges[task.CurrentNodeID] + if len(edges) > 0 { + nextNodeID = edges[0] + log.Printf("No ConditionStatus found, following edge to next Operation: %s", nextNodeID) + } else { + log.Printf("Task %s completed. Final result: %s", task.ID, task.FinalResult) + fmt.Fprintf(w, "

Process Completed

%s

", task.FinalResult) + return + } + } + task.CurrentNodeID = nextNodeID + tm.UpdateTask(task) + if nextNode, nextExists := tm.Graph.Nodes[nextNodeID]; nextExists && nextNode.GetNodeType() == "page" { + log.Printf("Redirecting to next page: %s", nextNodeID) + http.Redirect(w, r, fmt.Sprintf("/render?taskID=%s", task.ID), http.StatusFound) + return + } + } +} + +func renderHandler(w http.ResponseWriter, r *http.Request, tm *TaskManager) { + taskID := r.URL.Query().Get("taskID") + task, exists := tm.GetTask(taskID) + if !exists { + http.Error(w, "Task not found", http.StatusNotFound) + return + } + processNode(w, r, task, tm) +} + +func submitHandler(w http.ResponseWriter, r *http.Request, tm *TaskManager) { + taskID := r.URL.Query().Get("taskID") + task, exists := tm.GetTask(taskID) + if !exists { + http.Error(w, "Task not found", http.StatusNotFound) + return + } + err := r.ParseForm() + if err != nil { + http.Error(w, "Failed to parse form data", http.StatusBadRequest) + return + } + inputData := make(map[string]string) + for key, values := range r.Form { + if len(values) > 0 { + inputData[key] = values[0] + } + } + rawInputs, _ := json.Marshal(inputData) + task.Payload = rawInputs + nextNode, exists := tm.GetNextNode(task) + if !exists { + log.Printf("Task %s completed. Final result: %s", task.ID, task.FinalResult) + fmt.Fprintf(w, "

Process Completed

%s

", task.FinalResult) + return + } + switch nextNode := nextNode.(type) { + case *Operation: + task.CurrentNodeID = nextNode.ID + } + processNode(w, r, task, tm) +} + +func startTaskHandler(w http.ResponseWriter, r *http.Request, tm *TaskManager) { + task := tm.StartTask() + http.Redirect(w, r, fmt.Sprintf("/render?taskID=%s", task.ID), http.StatusFound) +} + +func verifyHandler(w http.ResponseWriter, r *http.Request, tm *TaskManager) { + taskID := r.URL.Query().Get("taskID") + if taskID == "" { + http.Error(w, "Missing taskID", http.StatusBadRequest) + return + } + task, exists := tm.GetTask(taskID) + if !exists { + http.Error(w, "Task not found", http.StatusNotFound) + return + } + data := map[string]any{ + "email_verified": "true", + } + bt, _ := json.Marshal(data) + task.Payload = bt + log.Printf("Email for taskID %s successfully verified.", task.ID) + nextNode, exists := tm.Graph.Nodes["dashboard"] + if !exists { + http.Error(w, "Dashboard Operation not found", http.StatusInternalServerError) + return + } + result := nextNode.ProcessTask(context.Background(), task) + if result.Error != nil { + http.Error(w, result.Error.Error(), http.StatusInternalServerError) + return + } + fmt.Fprintf(w, string(result.Payload)) +} diff --git a/examples/html.go b/examples/html.go index 05d8a6b..1634a28 100644 --- a/examples/html.go +++ b/examples/html.go @@ -1,370 +1,95 @@ package main import ( - "context" "encoding/json" "fmt" - "github.com/oarkflow/jet" - "log" - "math/rand" - "net/http" - "strings" - "time" + "github.com/oarkflow/mq/dag/v2" ) -const ( - PageType = "page" - FunctionType = "function" -) - -type Node interface { - ProcessTask(ctx context.Context, task *Task) Result - GetNodeType() string -} - -type Result struct { - ConditionStatus string - Payload json.RawMessage - Error error -} - -type Task struct { - ID string - CurrentNodeID string - Payload json.RawMessage - FinalResult string -} - -type PageNode struct { - ID string - Content string -} - -func (n *PageNode) ProcessTask(ctx context.Context, task *Task) Result { - var data map[string]any - if task.Payload != nil { - err := json.Unmarshal(task.Payload, &data) - if err != nil { - return Result{Error: err} - } - } - if data == nil { - data = make(map[string]any) - } - parser := jet.NewWithMemory(jet.WithDelims("{{", "}}")) - data["taskID"] = task.ID - tmpl := fmt.Sprintf("%s

Request

{{request_data|writeJson}}

", n.Content) - rs, err := parser.ParseTemplate(tmpl, map[string]any{ - "request_data": data, - "taskID": task.ID, - }) - if err != nil { - return Result{Error: err} - } - return Result{Payload: []byte(rs)} -} - -func (n *PageNode) GetNodeType() string { - return PageType -} - -type FunctionNode struct { - ID string - Func func(task *Task) Result -} - -func (n *FunctionNode) ProcessTask(ctx context.Context, task *Task) Result { - return n.Func(task) -} - -func (n *FunctionNode) GetNodeType() string { - return FunctionType -} - -type Graph struct { - Nodes map[string]Node - Edges map[string][]string -} - -func NewGraph() *Graph { - return &Graph{ - Nodes: make(map[string]Node), - Edges: make(map[string][]string), - } -} - -func (g *Graph) AddNode(node Node) { - switch n := node.(type) { - case *PageNode: - g.Nodes[n.ID] = n - case *FunctionNode: - g.Nodes[n.ID] = n - } -} - -func (g *Graph) AddEdge(fromID, toID string) { - g.Edges[fromID] = append(g.Edges[fromID], toID) -} - -type TaskManager struct { - tasks map[string]*Task - graph *Graph -} - -func NewTaskManager(graph *Graph) *TaskManager { - return &TaskManager{ - tasks: make(map[string]*Task), - graph: graph, - } -} - -func (tm *TaskManager) GetTask(taskID string) (*Task, bool) { - task, exists := tm.tasks[taskID] - return task, exists -} - -func (tm *TaskManager) StartTask() *Task { - taskID := generateTaskID() - task := &Task{ - ID: taskID, - CurrentNodeID: "customRegistration", - } - tm.tasks[taskID] = task - return task -} - -func (tm *TaskManager) UpdateTask(task *Task) { - tm.tasks[task.ID] = task -} - -func (tm *TaskManager) GetNextNode(task *Task) (Node, bool) { - if task == nil { - return nil, false - } - edges, _ := tm.graph.Edges[task.CurrentNodeID] - if len(edges) > 0 { - nextNodeID := edges[0] - nextNode, exists := tm.graph.Nodes[nextNodeID] - if exists { - return nextNode, true - } - } - return nil, false -} - -func generateTaskID() string { - rand.Seed(time.Now().UnixNano()) - const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" - var result strings.Builder - for i := 0; i < 8; i++ { - result.WriteByte(charset[rand.Intn(len(charset))]) - } - return result.String() -} - -func processNode(w http.ResponseWriter, r *http.Request, task *Task, tm *TaskManager) { - for { - log.Printf("Processing taskID: %s, Current Node: %s", task.ID, task.CurrentNodeID) - node, exists := tm.graph.Nodes[task.CurrentNodeID] - if !exists { - http.Error(w, "Node not found", http.StatusInternalServerError) - return - } - result := node.ProcessTask(context.Background(), task) - log.Printf("Node %s processed. Result ConditionStatus: %s", task.CurrentNodeID, result.ConditionStatus) - if node.GetNodeType() == PageType { - fmt.Fprintf(w, string(result.Payload)) - return - } - if result.Error != nil { - http.Error(w, result.Error.Error(), http.StatusInternalServerError) - return - } - nextNodeID := result.ConditionStatus - if nextNodeID == "" { - edges := tm.graph.Edges[task.CurrentNodeID] - if len(edges) > 0 { - nextNodeID = edges[0] - log.Printf("No ConditionStatus found, following edge to next node: %s", nextNodeID) - } else { - log.Printf("Task %s completed. Final result: %s", task.ID, task.FinalResult) - fmt.Fprintf(w, "

Process Completed

%s

", task.FinalResult) - return - } - } - task.CurrentNodeID = nextNodeID - tm.UpdateTask(task) - if nextNode, nextExists := tm.graph.Nodes[nextNodeID]; nextExists && nextNode.GetNodeType() == PageType { - log.Printf("Redirecting to next page: %s", nextNodeID) - http.Redirect(w, r, fmt.Sprintf("/render?taskID=%s", task.ID), http.StatusFound) - return - } - } -} - -func renderHandler(w http.ResponseWriter, r *http.Request, tm *TaskManager) { - taskID := r.URL.Query().Get("taskID") - task, exists := tm.GetTask(taskID) - if !exists { - http.Error(w, "Task not found", http.StatusNotFound) - return - } - processNode(w, r, task, tm) -} - -func submitHandler(w http.ResponseWriter, r *http.Request, tm *TaskManager) { - taskID := r.URL.Query().Get("taskID") - task, exists := tm.GetTask(taskID) - if !exists { - http.Error(w, "Task not found", http.StatusNotFound) - return - } - err := r.ParseForm() - if err != nil { - http.Error(w, "Failed to parse form data", http.StatusBadRequest) - return - } - inputData := make(map[string]string) - for key, values := range r.Form { - if len(values) > 0 { - inputData[key] = values[0] - } - } - rawInputs, _ := json.Marshal(inputData) - task.Payload = rawInputs - nextNode, exists := tm.GetNextNode(task) - if !exists { - log.Printf("Task %s completed. Final result: %s", task.ID, task.FinalResult) - fmt.Fprintf(w, "

Process Completed

%s

", task.FinalResult) - return - } - switch nextNode := nextNode.(type) { - case *PageNode: - task.CurrentNodeID = nextNode.ID - case *FunctionNode: - task.CurrentNodeID = nextNode.ID - } - processNode(w, r, task, tm) -} - -func startTaskHandler(w http.ResponseWriter, r *http.Request, tm *TaskManager) { - task := tm.StartTask() - http.Redirect(w, r, fmt.Sprintf("/render?taskID=%s", task.ID), http.StatusFound) -} - -func isValidEmail(email string) bool { - return email != "" -} - -func isValidPhone(phone string) bool { - return phone != "" -} - -func verifyHandler(w http.ResponseWriter, r *http.Request, tm *TaskManager) { - taskID := r.URL.Query().Get("taskID") - if taskID == "" { - http.Error(w, "Missing taskID", http.StatusBadRequest) - return - } - task, exists := tm.GetTask(taskID) - if !exists { - http.Error(w, "Task not found", http.StatusNotFound) - return - } - data := map[string]any{ - "email_verified": "true", - } - bt, _ := json.Marshal(data) - task.Payload = bt - log.Printf("Email for taskID %s successfully verified.", task.ID) - nextNode, exists := tm.graph.Nodes["dashboard"] - if !exists { - http.Error(w, "Dashboard node not found", http.StatusInternalServerError) - return - } - result := nextNode.ProcessTask(context.Background(), task) - if result.Error != nil { - http.Error(w, result.Error.Error(), http.StatusInternalServerError) - return - } - fmt.Fprintf(w, string(result.Payload)) -} - func main() { - graph := NewGraph() - tm := NewTaskManager(graph) - customRegistrationNode := &PageNode{ + graph := v2.NewGraph() + customRegistrationNode := &v2.Operation{ + Type: "page", ID: "customRegistration", Content: `

Custom Registration




`, } - checkValidityNode := &FunctionNode{ - ID: "checkValidity", - Func: func(task *Task) Result { + checkValidityNode := &v2.Operation{ + Type: "process", + ID: "checkValidity", + Func: func(task *v2.Task) v2.Result { var inputs map[string]string if err := json.Unmarshal(task.Payload, &inputs); err != nil { - return Result{ConditionStatus: "customRegistration", Error: fmt.Errorf("Invalid input format")} + return v2.Result{ConditionStatus: "customRegistration", Error: fmt.Errorf("Invalid input format")} } email, phone := inputs["email"], inputs["phone"] if !isValidEmail(email) || !isValidPhone(phone) { - return Result{ + return v2.Result{ ConditionStatus: "customRegistration", Error: fmt.Errorf("Invalid email or phone number. Please try again."), } } - return Result{ConditionStatus: "checkManualVerification"} + return v2.Result{ConditionStatus: "checkManualVerification"} }, } - checkManualVerificationNode := &FunctionNode{ - ID: "checkManualVerification", - Func: func(task *Task) Result { + checkManualVerificationNode := &v2.Operation{ + Type: "process", + ID: "checkManualVerification", + Func: func(task *v2.Task) v2.Result { var inputs map[string]string if err := json.Unmarshal(task.Payload, &inputs); err != nil { - return Result{ConditionStatus: "customRegistration", Error: fmt.Errorf("Invalid input format")} + return v2.Result{ConditionStatus: "customRegistration", Error: fmt.Errorf("Invalid input format")} } city := inputs["city"] if city != "Kathmandu" { - return Result{ConditionStatus: "manualVerificationPage"} + return v2.Result{ConditionStatus: "manualVerificationPage"} } - return Result{ConditionStatus: "approveCustomer"} + return v2.Result{ConditionStatus: "approveCustomer"} }, } - approveCustomerNode := &FunctionNode{ - ID: "approveCustomer", - Func: func(task *Task) Result { + approveCustomerNode := &v2.Operation{ + Type: "process", + ID: "approveCustomer", + Func: func(task *v2.Task) v2.Result { task.FinalResult = "Customer approved" - return Result{} + return v2.Result{} }, } - sendVerificationEmailNode := &FunctionNode{ - ID: "sendVerificationEmail", - Func: func(task *Task) Result { - return Result{} + sendVerificationEmailNode := &v2.Operation{ + Type: "process", + ID: "sendVerificationEmail", + Func: func(task *v2.Task) v2.Result { + return v2.Result{} }, } - verificationLinkPageNode := &PageNode{ + verificationLinkPageNode := &v2.Operation{ + Type: "page", ID: "verificationLinkPage", Content: `

Verify Email

Click here to verify your email

Verify`, } - dashboardNode := &PageNode{ + dashboardNode := &v2.Operation{ + Type: "page", ID: "dashboard", Content: `

Dashboard

Welcome to your dashboard!

`, } - manualVerificationNode := &PageNode{ + manualVerificationNode := &v2.Operation{ + Type: "page", ID: "manualVerificationPage", Content: `

Manual Verification

Please verify the user's information manually.

`, } - verifyApprovedNode := &FunctionNode{ - ID: "verifyApproved", - Func: func(task *Task) Result { - return Result{} + verifyApprovedNode := &v2.Operation{ + Type: "process", + ID: "verifyApproved", + Func: func(task *v2.Task) v2.Result { + return v2.Result{} }, } - denyVerificationNode := &FunctionNode{ - ID: "denyVerification", - Func: func(task *Task) Result { + denyVerificationNode := &v2.Operation{ + Type: "process", + ID: "denyVerification", + Func: func(task *v2.Task) v2.Result { task.FinalResult = "Verification Denied" - return Result{} + return v2.Result{} }, } @@ -390,20 +115,13 @@ func main() { graph.AddEdge("manualVerificationPage", "denyVerification") graph.AddEdge("verifyApproved", "approveCustomer") graph.AddEdge("denyVerification", "verificationLinkPage") - - http.HandleFunc("/start", func(w http.ResponseWriter, r *http.Request) { - startTaskHandler(w, r, tm) - }) - http.HandleFunc("/render", func(w http.ResponseWriter, r *http.Request) { - renderHandler(w, r, tm) - }) - http.HandleFunc("/submit", func(w http.ResponseWriter, r *http.Request) { - submitHandler(w, r, tm) - }) - http.HandleFunc("/verify", func(w http.ResponseWriter, r *http.Request) { - verifyHandler(w, r, tm) - }) - - fmt.Println("Server starting on :8080...") - log.Fatal(http.ListenAndServe(":8080", nil)) + graph.Start() +} + +func isValidEmail(email string) bool { + return email != "" +} + +func isValidPhone(phone string) bool { + return phone != "" }