feat: [wip] - Implement html node

This commit is contained in:
sujit
2024-11-26 10:22:37 +05:45
parent e87803c0ef
commit 0673ad5641
4 changed files with 116 additions and 153 deletions

View File

@@ -173,7 +173,7 @@ func (tm *DAG) ExportDOT() string {
sb.WriteString(` labelloc="b"; labeljust="c"; fontsize=16;`)
sb.WriteString("\n")
sb.WriteString(` margin=50;`)
sb.WriteString(` style=filled,bold; color=gray90;`)
sb.WriteString(` style="filled,bold"; color="gray90";`)
sb.WriteString("\n")
for _, subNodeKey := range subDAG.TopologicalSort() {
subNode, _ := subDAG.nodes.Get(subNodeKey)
@@ -214,7 +214,7 @@ func renderNode(sb *strings.Builder, node *Node, prefix ...string) {
switch node.NodeType {
case Function:
nodeColor = "#D4EDDA" // Light green background for Function
labelSuffix = " ƒ" // Function symbol (ƒ) as a graphical representation
labelSuffix = " ƒ(x)" // Function symbol (ƒ) as a graphical representation
case Page:
nodeColor = "#f0d2d1" // Light red background for Page

View File

@@ -4,62 +4,119 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/oarkflow/mq"
"github.com/oarkflow/mq/dag"
"github.com/oarkflow/mq/examples/tasks"
)
func main() {
f := dag.NewDAG("Sample DAG", "sample-dag", func(taskID string, result mq.Result) {
fmt.Printf("Final result for task %s: %s\n", taskID, string(result.Payload))
}, mq.WithSyncMode(true))
f.SetNotifyResponse(func(ctx context.Context, result mq.Result) error {
if f.Notifier != nil {
f.Notifier.ToRoom("global", "final-message", result)
}
return nil
})
setup(f)
err := f.Validate()
if err != nil {
panic(err)
}
f.Start(context.Background(), ":8082")
sendData(f)
}
func subDAG() *dag.DAG {
f := dag.NewDAG("Sub DAG", "sub-dag", func(taskID string, result mq.Result) {
fmt.Printf("Final result for task %s: %s\n", taskID, string(result.Payload))
}, mq.WithSyncMode(true))
f.
AddNode(dag.Function, "Store data", "store:data", &tasks.StoreData{Operation: dag.Operation{Type: "process"}}, true).
AddNode(dag.Function, "Send SMS", "send:sms", &tasks.SendSms{Operation: dag.Operation{Type: "process"}}).
AddNode(dag.Function, "Notification", "notification", &tasks.InAppNotification{Operation: dag.Operation{Type: "process"}}).
AddNode(dag.Function, "Store data", "store:data", &tasks.StoreData{Operation: dag.Operation{Type: dag.Function}}, true).
AddNode(dag.Function, "Send SMS", "send:sms", &tasks.SendSms{Operation: dag.Operation{Type: dag.Function}}).
AddNode(dag.Function, "Notification", "notification", &tasks.InAppNotification{Operation: dag.Operation{Type: dag.Function}}).
AddEdge(dag.Simple, "Store Payload to send sms", "store:data", "send:sms").
AddEdge(dag.Simple, "Store Payload to notification", "send:sms", "notification")
return f
}
func setup(f *dag.DAG) {
f.
AddNode(dag.Function, "Email Delivery", "email:deliver", &tasks.EmailDelivery{Operation: dag.Operation{Type: "process"}}).
AddNode(dag.Function, "Prepare Email", "prepare:email", &tasks.PrepareEmail{Operation: dag.Operation{Type: "process"}}).
AddNode(dag.Function, "Get Input", "get:input", &tasks.GetData{Operation: dag.Operation{Type: "input"}}, true).
AddNode(dag.Function, "Iterator Processor", "loop", &tasks.Loop{Operation: dag.Operation{Type: "loop"}}).
AddNode(dag.Function, "Condition", "condition", &tasks.Condition{Operation: dag.Operation{Type: "condition"}}).
AddDAGNode("Persistent", "persistent", subDAG()).
AddEdge(dag.Simple, "Get input to loop", "get:input", "loop").
AddEdge(dag.Iterator, "Loop to prepare email", "loop", "prepare:email").
AddEdge(dag.Simple, "Prepare Email to condition", "prepare:email", "condition").
AddCondition("condition", map[string]string{"pass": "email:deliver", "fail": "persistent"})
func main() {
flow := dag.NewDAG("Sample DAG", "sample-dag", func(taskID string, result mq.Result) {
// fmt.Printf("Final result for task %s: %s\n", taskID, string(result.Payload))
})
flow.AddNode(dag.Function, "GetData", "GetData", &GetData{}, true)
flow.AddNode(dag.Function, "Loop", "Loop", &Loop{})
flow.AddNode(dag.Function, "ValidateAge", "ValidateAge", &ValidateAge{})
flow.AddNode(dag.Function, "ValidateGender", "ValidateGender", &ValidateGender{})
flow.AddNode(dag.Function, "Final", "Final", &Final{})
flow.AddDAGNode("Check", "persistent", subDAG())
flow.AddEdge(dag.Simple, "GetData", "GetData", "Loop")
flow.AddEdge(dag.Iterator, "Validate age for each item", "Loop", "ValidateAge")
flow.AddCondition("ValidateAge", map[string]string{"pass": "ValidateGender", "default": "persistent"})
flow.AddEdge(dag.Simple, "Mark as Done", "Loop", "Final")
// flow.Start(":8080")
data := []byte(`[{"age": "15", "gender": "female"}, {"age": "18", "gender": "male"}]`)
if flow.Error != nil {
panic(flow.Error)
}
fmt.Println(flow.ExportDOT())
rs := flow.Process(context.Background(), data)
if rs.Error != nil {
panic(rs.Error)
}
fmt.Println(rs.Status, rs.Topic, string(rs.Payload))
}
func sendData(f *dag.DAG) {
data := []map[string]any{
{"phone": "+123456789", "email": "abc.xyz@gmail.com"}, {"phone": "+98765412", "email": "xyz.abc@gmail.com"},
}
bt, _ := json.Marshal(data)
result := f.Process(context.Background(), bt)
fmt.Println(string(result.Payload))
type GetData struct {
dag.Operation
}
func (p *GetData) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
return mq.Result{Ctx: ctx, Payload: task.Payload}
}
type Loop struct {
dag.Operation
}
func (p *Loop) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
return mq.Result{Ctx: ctx, Payload: task.Payload}
}
type ValidateAge struct {
dag.Operation
}
func (p *ValidateAge) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
var data map[string]any
if err := json.Unmarshal(task.Payload, &data); err != nil {
return mq.Result{Error: fmt.Errorf("ValidateAge Error: %s", err.Error()), Ctx: ctx}
}
var status string
if data["age"] == "18" {
status = "pass"
} else {
status = "default"
}
updatedPayload, _ := json.Marshal(data)
return mq.Result{Payload: updatedPayload, Ctx: ctx, ConditionStatus: status}
}
type ValidateGender struct {
dag.Operation
}
func (p *ValidateGender) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
var data map[string]any
if err := json.Unmarshal(task.Payload, &data); err != nil {
return mq.Result{Error: fmt.Errorf("ValidateGender Error: %s", err.Error()), Ctx: ctx}
}
data["female_voter"] = data["gender"] == "female"
updatedPayload, _ := json.Marshal(data)
return mq.Result{Payload: updatedPayload, Ctx: ctx}
}
type Final struct {
dag.Operation
}
func (p *Final) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
var data []map[string]any
if err := json.Unmarshal(task.Payload, &data); err != nil {
return mq.Result{Error: fmt.Errorf("Final Error: %s", err.Error()), Ctx: ctx}
}
for i, row := range data {
row["done"] = true
data[i] = row
}
updatedPayload, err := json.Marshal(data)
if err != nil {
panic(err)
}
return mq.Result{Payload: updatedPayload, Ctx: ctx}
}

View File

@@ -2,7 +2,9 @@ package tasks
import (
"context"
"github.com/oarkflow/json"
v2 "github.com/oarkflow/mq/dag"
"github.com/oarkflow/mq"
@@ -85,7 +87,9 @@ func (e *SendSms) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
if err != nil {
panic(err)
}
return mq.Result{Payload: task.Payload, Error: nil, Ctx: ctx}
data["sms_sent"] = true
d, _ := json.Marshal(data)
return mq.Result{Payload: d, Ctx: ctx}
}
type StoreData struct {
@@ -93,7 +97,14 @@ type StoreData struct {
}
func (e *StoreData) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
return mq.Result{Payload: task.Payload, Ctx: ctx}
var data map[string]any
err := json.Unmarshal(task.Payload, &data)
if err != nil {
panic(err)
}
data["stored"] = true
d, _ := json.Marshal(data)
return mq.Result{Payload: d, Ctx: ctx}
}
type InAppNotification struct {
@@ -106,7 +117,9 @@ func (e *InAppNotification) ProcessTask(ctx context.Context, task *mq.Task) mq.R
if err != nil {
panic(err)
}
return mq.Result{Payload: task.Payload, Ctx: ctx}
data["notified"] = true
d, _ := json.Marshal(data)
return mq.Result{Payload: d, Ctx: ctx}
}
type Final struct {

View File

@@ -1,107 +0,0 @@
package main
import (
"context"
"encoding/json"
"fmt"
"github.com/oarkflow/mq"
"github.com/oarkflow/mq/dag"
)
func main() {
flow := dag.NewDAG("Sample DAG", "sample-dag", func(taskID string, result mq.Result) {
// fmt.Printf("Final result for task %s: %s\n", taskID, string(result.Payload))
})
flow.AddNode(dag.Function, "GetData", "GetData", &GetData{}, true)
flow.AddNode(dag.Function, "Loop", "Loop", &Loop{})
flow.AddNode(dag.Function, "ValidateAge", "ValidateAge", &ValidateAge{})
flow.AddNode(dag.Function, "ValidateGender", "ValidateGender", &ValidateGender{})
flow.AddNode(dag.Function, "Final", "Final", &Final{})
flow.AddEdge(dag.Simple, "GetData", "GetData", "Loop")
flow.AddEdge(dag.Iterator, "Validate age for each item", "Loop", "ValidateAge")
flow.AddCondition("ValidateAge", map[string]string{"pass": "ValidateGender"})
flow.AddEdge(dag.Simple, "Mark as Done", "Loop", "Final")
// flow.Start(":8080")
data := []byte(`[{"age": "15", "gender": "female"}, {"age": "18", "gender": "male"}]`)
if flow.Error != nil {
panic(flow.Error)
}
fmt.Println(flow.ExportDOT())
rs := flow.Process(context.Background(), data)
if rs.Error != nil {
panic(rs.Error)
}
fmt.Println(rs.Status, rs.Topic, string(rs.Payload))
}
type GetData struct {
dag.Operation
}
func (p *GetData) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
return mq.Result{Ctx: ctx, Payload: task.Payload}
}
type Loop struct {
dag.Operation
}
func (p *Loop) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
return mq.Result{Ctx: ctx, Payload: task.Payload}
}
type ValidateAge struct {
dag.Operation
}
func (p *ValidateAge) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
var data map[string]any
if err := json.Unmarshal(task.Payload, &data); err != nil {
return mq.Result{Error: fmt.Errorf("ValidateAge Error: %s", err.Error()), Ctx: ctx}
}
var status string
if data["age"] == "18" {
status = "pass"
} else {
status = "default"
}
updatedPayload, _ := json.Marshal(data)
return mq.Result{Payload: updatedPayload, Ctx: ctx, ConditionStatus: status}
}
type ValidateGender struct {
dag.Operation
}
func (p *ValidateGender) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
var data map[string]any
if err := json.Unmarshal(task.Payload, &data); err != nil {
return mq.Result{Error: fmt.Errorf("ValidateGender Error: %s", err.Error()), Ctx: ctx}
}
data["female_voter"] = data["gender"] == "female"
updatedPayload, _ := json.Marshal(data)
return mq.Result{Payload: updatedPayload, Ctx: ctx}
}
type Final struct {
dag.Operation
}
func (p *Final) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
var data []map[string]any
if err := json.Unmarshal(task.Payload, &data); err != nil {
return mq.Result{Error: fmt.Errorf("Final Error: %s", err.Error()), Ctx: ctx}
}
for i, row := range data {
row["done"] = true
data[i] = row
}
updatedPayload, err := json.Marshal(data)
if err != nil {
panic(err)
}
return mq.Result{Payload: updatedPayload, Ctx: ctx}
}