diff --git a/dag/ui.go b/dag/ui.go index 2aeefdb..8124bb4 100644 --- a/dag/ui.go +++ b/dag/ui.go @@ -173,7 +173,7 @@ func (tm *DAG) ExportDOT() string { sb.WriteString(` labelloc="b"; labeljust="c"; fontsize=16;`) sb.WriteString("\n") sb.WriteString(` margin=50;`) - sb.WriteString(` style=filled,bold; color=gray90;`) + sb.WriteString(` style="filled,bold"; color="gray90";`) sb.WriteString("\n") for _, subNodeKey := range subDAG.TopologicalSort() { subNode, _ := subDAG.nodes.Get(subNodeKey) @@ -214,7 +214,7 @@ func renderNode(sb *strings.Builder, node *Node, prefix ...string) { switch node.NodeType { case Function: nodeColor = "#D4EDDA" // Light green background for Function - labelSuffix = " ƒ" // Function symbol (ƒ) as a graphical representation + labelSuffix = " ƒ(x)" // Function symbol (ƒ) as a graphical representation case Page: nodeColor = "#f0d2d1" // Light red background for Page diff --git a/examples/dag.go b/examples/dag.go index 6edc258..3c86666 100644 --- a/examples/dag.go +++ b/examples/dag.go @@ -4,62 +4,119 @@ import ( "context" "encoding/json" "fmt" + "github.com/oarkflow/mq" "github.com/oarkflow/mq/dag" "github.com/oarkflow/mq/examples/tasks" ) -func main() { - f := dag.NewDAG("Sample DAG", "sample-dag", func(taskID string, result mq.Result) { - fmt.Printf("Final result for task %s: %s\n", taskID, string(result.Payload)) - }, mq.WithSyncMode(true)) - f.SetNotifyResponse(func(ctx context.Context, result mq.Result) error { - if f.Notifier != nil { - f.Notifier.ToRoom("global", "final-message", result) - } - return nil - }) - setup(f) - err := f.Validate() - if err != nil { - panic(err) - } - f.Start(context.Background(), ":8082") - sendData(f) -} - func subDAG() *dag.DAG { f := dag.NewDAG("Sub DAG", "sub-dag", func(taskID string, result mq.Result) { fmt.Printf("Final result for task %s: %s\n", taskID, string(result.Payload)) }, mq.WithSyncMode(true)) f. - AddNode(dag.Function, "Store data", "store:data", &tasks.StoreData{Operation: dag.Operation{Type: "process"}}, true). - AddNode(dag.Function, "Send SMS", "send:sms", &tasks.SendSms{Operation: dag.Operation{Type: "process"}}). - AddNode(dag.Function, "Notification", "notification", &tasks.InAppNotification{Operation: dag.Operation{Type: "process"}}). + AddNode(dag.Function, "Store data", "store:data", &tasks.StoreData{Operation: dag.Operation{Type: dag.Function}}, true). + AddNode(dag.Function, "Send SMS", "send:sms", &tasks.SendSms{Operation: dag.Operation{Type: dag.Function}}). + AddNode(dag.Function, "Notification", "notification", &tasks.InAppNotification{Operation: dag.Operation{Type: dag.Function}}). AddEdge(dag.Simple, "Store Payload to send sms", "store:data", "send:sms"). AddEdge(dag.Simple, "Store Payload to notification", "send:sms", "notification") return f } -func setup(f *dag.DAG) { - f. - AddNode(dag.Function, "Email Delivery", "email:deliver", &tasks.EmailDelivery{Operation: dag.Operation{Type: "process"}}). - AddNode(dag.Function, "Prepare Email", "prepare:email", &tasks.PrepareEmail{Operation: dag.Operation{Type: "process"}}). - AddNode(dag.Function, "Get Input", "get:input", &tasks.GetData{Operation: dag.Operation{Type: "input"}}, true). - AddNode(dag.Function, "Iterator Processor", "loop", &tasks.Loop{Operation: dag.Operation{Type: "loop"}}). - AddNode(dag.Function, "Condition", "condition", &tasks.Condition{Operation: dag.Operation{Type: "condition"}}). - AddDAGNode("Persistent", "persistent", subDAG()). - AddEdge(dag.Simple, "Get input to loop", "get:input", "loop"). - AddEdge(dag.Iterator, "Loop to prepare email", "loop", "prepare:email"). - AddEdge(dag.Simple, "Prepare Email to condition", "prepare:email", "condition"). - AddCondition("condition", map[string]string{"pass": "email:deliver", "fail": "persistent"}) +func main() { + flow := dag.NewDAG("Sample DAG", "sample-dag", func(taskID string, result mq.Result) { + // fmt.Printf("Final result for task %s: %s\n", taskID, string(result.Payload)) + }) + flow.AddNode(dag.Function, "GetData", "GetData", &GetData{}, true) + flow.AddNode(dag.Function, "Loop", "Loop", &Loop{}) + flow.AddNode(dag.Function, "ValidateAge", "ValidateAge", &ValidateAge{}) + flow.AddNode(dag.Function, "ValidateGender", "ValidateGender", &ValidateGender{}) + flow.AddNode(dag.Function, "Final", "Final", &Final{}) + flow.AddDAGNode("Check", "persistent", subDAG()) + flow.AddEdge(dag.Simple, "GetData", "GetData", "Loop") + flow.AddEdge(dag.Iterator, "Validate age for each item", "Loop", "ValidateAge") + flow.AddCondition("ValidateAge", map[string]string{"pass": "ValidateGender", "default": "persistent"}) + flow.AddEdge(dag.Simple, "Mark as Done", "Loop", "Final") + + // flow.Start(":8080") + data := []byte(`[{"age": "15", "gender": "female"}, {"age": "18", "gender": "male"}]`) + if flow.Error != nil { + panic(flow.Error) + } + + fmt.Println(flow.ExportDOT()) + rs := flow.Process(context.Background(), data) + if rs.Error != nil { + panic(rs.Error) + } + fmt.Println(rs.Status, rs.Topic, string(rs.Payload)) } -func sendData(f *dag.DAG) { - data := []map[string]any{ - {"phone": "+123456789", "email": "abc.xyz@gmail.com"}, {"phone": "+98765412", "email": "xyz.abc@gmail.com"}, - } - bt, _ := json.Marshal(data) - result := f.Process(context.Background(), bt) - fmt.Println(string(result.Payload)) +type GetData struct { + dag.Operation +} + +func (p *GetData) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + return mq.Result{Ctx: ctx, Payload: task.Payload} +} + +type Loop struct { + dag.Operation +} + +func (p *Loop) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + return mq.Result{Ctx: ctx, Payload: task.Payload} +} + +type ValidateAge struct { + dag.Operation +} + +func (p *ValidateAge) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + var data map[string]any + if err := json.Unmarshal(task.Payload, &data); err != nil { + return mq.Result{Error: fmt.Errorf("ValidateAge Error: %s", err.Error()), Ctx: ctx} + } + var status string + if data["age"] == "18" { + status = "pass" + } else { + status = "default" + } + updatedPayload, _ := json.Marshal(data) + return mq.Result{Payload: updatedPayload, Ctx: ctx, ConditionStatus: status} +} + +type ValidateGender struct { + dag.Operation +} + +func (p *ValidateGender) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + var data map[string]any + if err := json.Unmarshal(task.Payload, &data); err != nil { + return mq.Result{Error: fmt.Errorf("ValidateGender Error: %s", err.Error()), Ctx: ctx} + } + data["female_voter"] = data["gender"] == "female" + updatedPayload, _ := json.Marshal(data) + return mq.Result{Payload: updatedPayload, Ctx: ctx} +} + +type Final struct { + dag.Operation +} + +func (p *Final) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + var data []map[string]any + if err := json.Unmarshal(task.Payload, &data); err != nil { + return mq.Result{Error: fmt.Errorf("Final Error: %s", err.Error()), Ctx: ctx} + } + for i, row := range data { + row["done"] = true + data[i] = row + } + updatedPayload, err := json.Marshal(data) + if err != nil { + panic(err) + } + return mq.Result{Payload: updatedPayload, Ctx: ctx} } diff --git a/examples/tasks/operations.go b/examples/tasks/operations.go index dea31d0..9f1513e 100644 --- a/examples/tasks/operations.go +++ b/examples/tasks/operations.go @@ -2,7 +2,9 @@ package tasks import ( "context" + "github.com/oarkflow/json" + v2 "github.com/oarkflow/mq/dag" "github.com/oarkflow/mq" @@ -85,7 +87,9 @@ func (e *SendSms) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { if err != nil { panic(err) } - return mq.Result{Payload: task.Payload, Error: nil, Ctx: ctx} + data["sms_sent"] = true + d, _ := json.Marshal(data) + return mq.Result{Payload: d, Ctx: ctx} } type StoreData struct { @@ -93,7 +97,14 @@ type StoreData struct { } func (e *StoreData) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { - return mq.Result{Payload: task.Payload, Ctx: ctx} + var data map[string]any + err := json.Unmarshal(task.Payload, &data) + if err != nil { + panic(err) + } + data["stored"] = true + d, _ := json.Marshal(data) + return mq.Result{Payload: d, Ctx: ctx} } type InAppNotification struct { @@ -106,7 +117,9 @@ func (e *InAppNotification) ProcessTask(ctx context.Context, task *mq.Task) mq.R if err != nil { panic(err) } - return mq.Result{Payload: task.Payload, Ctx: ctx} + data["notified"] = true + d, _ := json.Marshal(data) + return mq.Result{Payload: d, Ctx: ctx} } type Final struct { diff --git a/examples/v3.go b/examples/v3.go deleted file mode 100644 index bb589fb..0000000 --- a/examples/v3.go +++ /dev/null @@ -1,107 +0,0 @@ -package main - -import ( - "context" - "encoding/json" - "fmt" - "github.com/oarkflow/mq" - "github.com/oarkflow/mq/dag" -) - -func main() { - flow := dag.NewDAG("Sample DAG", "sample-dag", func(taskID string, result mq.Result) { - // fmt.Printf("Final result for task %s: %s\n", taskID, string(result.Payload)) - }) - flow.AddNode(dag.Function, "GetData", "GetData", &GetData{}, true) - flow.AddNode(dag.Function, "Loop", "Loop", &Loop{}) - flow.AddNode(dag.Function, "ValidateAge", "ValidateAge", &ValidateAge{}) - flow.AddNode(dag.Function, "ValidateGender", "ValidateGender", &ValidateGender{}) - flow.AddNode(dag.Function, "Final", "Final", &Final{}) - - flow.AddEdge(dag.Simple, "GetData", "GetData", "Loop") - flow.AddEdge(dag.Iterator, "Validate age for each item", "Loop", "ValidateAge") - flow.AddCondition("ValidateAge", map[string]string{"pass": "ValidateGender"}) - flow.AddEdge(dag.Simple, "Mark as Done", "Loop", "Final") - - // flow.Start(":8080") - data := []byte(`[{"age": "15", "gender": "female"}, {"age": "18", "gender": "male"}]`) - if flow.Error != nil { - panic(flow.Error) - } - - fmt.Println(flow.ExportDOT()) - rs := flow.Process(context.Background(), data) - if rs.Error != nil { - panic(rs.Error) - } - fmt.Println(rs.Status, rs.Topic, string(rs.Payload)) -} - -type GetData struct { - dag.Operation -} - -func (p *GetData) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { - return mq.Result{Ctx: ctx, Payload: task.Payload} -} - -type Loop struct { - dag.Operation -} - -func (p *Loop) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { - return mq.Result{Ctx: ctx, Payload: task.Payload} -} - -type ValidateAge struct { - dag.Operation -} - -func (p *ValidateAge) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { - var data map[string]any - if err := json.Unmarshal(task.Payload, &data); err != nil { - return mq.Result{Error: fmt.Errorf("ValidateAge Error: %s", err.Error()), Ctx: ctx} - } - var status string - if data["age"] == "18" { - status = "pass" - } else { - status = "default" - } - updatedPayload, _ := json.Marshal(data) - return mq.Result{Payload: updatedPayload, Ctx: ctx, ConditionStatus: status} -} - -type ValidateGender struct { - dag.Operation -} - -func (p *ValidateGender) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { - var data map[string]any - if err := json.Unmarshal(task.Payload, &data); err != nil { - return mq.Result{Error: fmt.Errorf("ValidateGender Error: %s", err.Error()), Ctx: ctx} - } - data["female_voter"] = data["gender"] == "female" - updatedPayload, _ := json.Marshal(data) - return mq.Result{Payload: updatedPayload, Ctx: ctx} -} - -type Final struct { - dag.Operation -} - -func (p *Final) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { - var data []map[string]any - if err := json.Unmarshal(task.Payload, &data); err != nil { - return mq.Result{Error: fmt.Errorf("Final Error: %s", err.Error()), Ctx: ctx} - } - for i, row := range data { - row["done"] = true - data[i] = row - } - updatedPayload, err := json.Marshal(data) - if err != nil { - panic(err) - } - return mq.Result{Payload: updatedPayload, Ctx: ctx} -}