From 8d96882d958446e0645670a242fb4906178512fb Mon Sep 17 00:00:00 2001 From: sujit Date: Tue, 15 Oct 2024 20:27:04 +0545 Subject: [PATCH] feat: remove un-necessary dependencies --- consumer.go | 12 +- dag/dag.go | 37 ++- dag/task_manager.go | 10 +- examples/dag.go | 149 +++++------ examples/dag_consumer.go | 12 +- examples/subdag.go | 110 +++++++++ examples/tasks/operations.go | 161 ++++++++++++ examples/tasks/tasks.go | 33 ++- go.mod | 6 + go.sum | 12 + options.go | 24 +- services/operation.go | 463 +++++++++++++++++++++++++++++++++++ 12 files changed, 890 insertions(+), 139 deletions(-) create mode 100644 examples/subdag.go create mode 100644 examples/tasks/operations.go create mode 100644 services/operation.go diff --git a/consumer.go b/consumer.go index 9b85247..6ae840b 100644 --- a/consumer.go +++ b/consumer.go @@ -22,8 +22,10 @@ type Processor interface { Pause(ctx context.Context) error Resume(ctx context.Context) error Stop(ctx context.Context) error - GetKey() string Close() error + GetKey() string + SetKey(key string) + GetType() string } type Consumer struct { @@ -62,6 +64,14 @@ func (c *Consumer) GetKey() string { return c.id } +func (c *Consumer) GetType() string { + return "consumer" +} + +func (c *Consumer) SetKey(key string) { + c.id = key +} + func (c *Consumer) subscribe(ctx context.Context, queue string) error { headers := HeadersWithConsumerID(ctx, c.id) msg := codec.NewMessage(consts.SUBSCRIBE, utils.ToByte("{}"), queue, headers) diff --git a/dag/dag.go b/dag/dag.go index 833f956..1c29fc0 100644 --- a/dag/dag.go +++ b/dag/dag.go @@ -77,6 +77,14 @@ type DAG struct { taskCleanupCh chan string } +func (tm *DAG) SetKey(key string) { + tm.key = key +} + +func (tm *DAG) GetType() string { + return tm.key +} + func (tm *DAG) listenForTaskCleanup() { for taskID := range tm.taskCleanupCh { tm.mu.Lock() @@ -121,6 +129,7 @@ func (tm *DAG) AssignTopic(topic string) { } func NewDAG(name, key string, opts ...mq.Option) *DAG { + callback := func(ctx context.Context, result mq.Result) error { return nil } d := &DAG{ name: name, key: key, @@ -132,11 +141,10 @@ func NewDAG(name, key string, opts ...mq.Option) *DAG { opts = append(opts, mq.WithCallback(d.onTaskCallback), mq.WithConsumerOnSubscribe(d.onConsumerJoin), mq.WithConsumerOnClose(d.onConsumerClose)) d.server = mq.NewBroker(opts...) d.opts = opts - d.pool = mq.NewPool(d.server.Options().NumOfWorkers(), d.server.Options().QueueSize(), d.server.Options().MaxMemoryLoad(), d.ProcessTask, func(ctx context.Context, result mq.Result) error { - return nil - }) + options := d.server.Options() + d.pool = mq.NewPool(options.NumOfWorkers(), options.QueueSize(), options.MaxMemoryLoad(), d.ProcessTask, callback) d.pool.Start(d.server.Options().NumOfWorkers()) - go d.listenForTaskCleanup() // Start listening for task cleanup signals + go d.listenForTaskCleanup() return d } @@ -227,10 +235,10 @@ func (tm *DAG) AddDAGNode(name string, key string, dag *DAG, firstNode ...bool) } } -func (tm *DAG) AddNode(name, key string, handler mq.Handler, firstNode ...bool) { +func (tm *DAG) AddNode(name, key string, handler mq.Processor, firstNode ...bool) *DAG { tm.mu.Lock() defer tm.mu.Unlock() - con := mq.NewConsumer(key, key, handler, tm.opts...) + con := mq.NewConsumer(key, key, handler.ProcessTask, tm.opts...) tm.nodes[key] = &Node{ Name: name, Key: key, @@ -240,6 +248,7 @@ func (tm *DAG) AddNode(name, key string, handler mq.Handler, firstNode ...bool) if len(firstNode) > 0 && firstNode[0] { tm.startNode = key } + return tm } func (tm *DAG) AddDeferredNode(name, key string, firstNode ...bool) error { @@ -269,18 +278,21 @@ func (tm *DAG) IsReady() bool { return true } -func (tm *DAG) AddCondition(fromNode FromNode, conditions map[When]Then) { +func (tm *DAG) AddCondition(fromNode FromNode, conditions map[When]Then) *DAG { tm.mu.Lock() defer tm.mu.Unlock() tm.conditions[fromNode] = conditions + return tm } -func (tm *DAG) AddLoop(label, from string, targets ...string) { +func (tm *DAG) AddLoop(label, from string, targets ...string) *DAG { tm.addEdge(Iterator, label, from, targets...) + return tm } -func (tm *DAG) AddEdge(label, from string, targets ...string) { +func (tm *DAG) AddEdge(label, from string, targets ...string) *DAG { tm.addEdge(Simple, label, from, targets...) + return tm } func (tm *DAG) addEdge(edgeType EdgeType, label, from string, targets ...string) { @@ -334,6 +346,9 @@ func (tm *DAG) Process(ctx context.Context, payload []byte) mq.Result { if err != nil { return mq.Result{Error: err} } + if tm.server.SyncMode() { + ctx = mq.SetHeaders(ctx, map[string]string{consts.AwaitResponseKey: "true"}) + } task := NewTask(mq.NewID(), payload, initialNode) awaitResponse, _ := mq.GetAwaitResponse(ctx) if awaitResponse != "true" { @@ -342,7 +357,9 @@ func (tm *DAG) Process(ctx context.Context, payload []byte) mq.Result { if ok { ctxx = mq.SetHeaders(ctxx, headers.AsMap()) } - tm.pool.AddTask(ctxx, task) + if err := tm.pool.AddTask(ctxx, task); err != nil { + return mq.Result{CreatedAt: task.CreatedAt, TaskID: task.ID, Topic: initialNode, Status: "FAILED", Error: err} + } return mq.Result{CreatedAt: task.CreatedAt, TaskID: task.ID, Topic: initialNode, Status: "PENDING"} } return tm.ProcessTask(ctx, task) diff --git a/dag/task_manager.go b/dag/task_manager.go index 83fe500..346043d 100644 --- a/dag/task_manager.go +++ b/dag/task_manager.go @@ -155,10 +155,16 @@ func (tm *TaskManager) handleResult(ctx context.Context, results any) mq.Result if err != nil { return mq.HandleError(ctx, err) } - return mq.Result{TaskID: tm.taskID, Payload: finalOutput, Status: status, Topic: topic} + return mq.Result{TaskID: tm.taskID, Payload: finalOutput, Status: status, Topic: topic, Ctx: ctx} case mq.Result: + if res.Ctx == nil { + res.Ctx = ctx + } return res } + if rs.Ctx == nil { + rs.Ctx = ctx + } return rs } @@ -188,7 +194,7 @@ func (tm *TaskManager) processNode(ctx context.Context, node *Node, payload json } select { case <-ctx.Done(): - result = mq.Result{TaskID: tm.taskID, Topic: node.Key, Error: ctx.Err()} + result = mq.Result{TaskID: tm.taskID, Topic: node.Key, Error: ctx.Err(), Ctx: ctx} tm.appendFinalResult(result) return default: diff --git a/examples/dag.go b/examples/dag.go index 5d3e6f6..08b3b74 100644 --- a/examples/dag.go +++ b/examples/dag.go @@ -4,107 +4,70 @@ import ( "context" "encoding/json" "fmt" - "io" - "net/http" + "time" - "github.com/oarkflow/mq/consts" "github.com/oarkflow/mq/examples/tasks" + "github.com/oarkflow/mq/services" "github.com/oarkflow/mq" "github.com/oarkflow/mq/dag" ) -var ( - d = dag.NewDAG( - "Sample DAG", - "sample-dag", - mq.WithSyncMode(true), - mq.WithNotifyResponse(tasks.NotifyResponse), - ) - // d = dag.NewDAG(mq.WithSyncMode(true), mq.WithTLS(true, "./certs/server.crt", "./certs/server.key"), mq.WithCAPath("./certs/ca.cert")) -) - func main() { - subDag := dag.NewDAG( - "Sub DAG", - "D", - mq.WithNotifyResponse(tasks.NotifySubDAGResponse), - ) - subDag.AddNode("I", "I", tasks.Node4, true) - subDag.AddNode("F", "F", tasks.Node6) - subDag.AddNode("G", "G", tasks.Node7) - subDag.AddNode("H", "H", tasks.Node8) - subDag.AddEdge("Label 2", "I", "F") - subDag.AddEdge("Label 4", "F", "G", "H") - - d.AddNode("A", "A", tasks.Node1, true) - d.AddNode("B", "B", tasks.Node2) - d.AddNode("C", "C", tasks.Node3) - d.AddDAGNode("D", "D", subDag) - d.AddNode("E", "E", tasks.Node5) - d.AddLoop("Send each item", "A", "B") - d.AddCondition("C", map[dag.When]dag.Then{"PASS": "D", "FAIL": "E"}) - d.AddEdge("Label 1", "B", "C") - // Classify edges - // d.ClassifyEdges() - // fmt.Println(d.ExportDOT()) - - http.HandleFunc("POST /publish", requestHandler("publish")) - http.HandleFunc("POST /request", requestHandler("request")) - http.HandleFunc("/pause-consumer/{id}", func(writer http.ResponseWriter, request *http.Request) { - id := request.PathValue("id") - if id != "" { - d.PauseConsumer(request.Context(), id) - } - }) - http.HandleFunc("/resume-consumer/{id}", func(writer http.ResponseWriter, request *http.Request) { - id := request.PathValue("id") - if id != "" { - d.ResumeConsumer(request.Context(), id) - } - }) - http.HandleFunc("/pause", func(writer http.ResponseWriter, request *http.Request) { - d.Pause(request.Context()) - }) - http.HandleFunc("/resume", func(writer http.ResponseWriter, request *http.Request) { - d.Resume(request.Context()) - }) - err := d.Start(context.TODO(), ":8083") - if err != nil { - panic(err) - } + sync() + async() } -func requestHandler(requestType string) func(w http.ResponseWriter, r *http.Request) { - return func(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodPost { - http.Error(w, "Invalid request method", http.StatusMethodNotAllowed) - return - } - var payload []byte - if r.Body != nil { - defer r.Body.Close() - var err error - payload, err = io.ReadAll(r.Body) - if err != nil { - http.Error(w, "Failed to read request body", http.StatusBadRequest) - return - } - } else { - http.Error(w, "Empty request body", http.StatusBadRequest) - return - } - ctx := r.Context() - if requestType == "request" { - ctx = mq.SetHeaders(ctx, map[string]string{consts.AwaitResponseKey: "true"}) - } - // ctx = context.WithValue(ctx, "initial_node", "E") - rs := d.Process(ctx, payload) - if rs.Error != nil { - http.Error(w, fmt.Sprintf("[DAG Error] - %v", rs.Error), http.StatusInternalServerError) - return - } - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(rs) - } +func setup(f *dag.DAG) { + f. + AddNode("Email Delivery", "email:deliver", &tasks.EmailDelivery{Operation: services.Operation{Type: "process"}}). + AddNode("Prepare Email", "prepare:email", &tasks.PrepareEmail{Operation: services.Operation{Type: "process"}}). + AddNode("Get Input", "get:input", &tasks.GetData{Operation: services.Operation{Type: "input"}}, true). + AddNode("Iterator Processor", "loop", &tasks.Loop{Operation: services.Operation{Type: "loop"}}). + AddNode("Condition", "condition", &tasks.Condition{Operation: services.Operation{Type: "condition"}}). + AddNode("Store data", "store:data", &tasks.StoreData{Operation: services.Operation{Type: "process"}}). + AddNode("Send SMS", "send:sms", &tasks.SendSms{Operation: services.Operation{Type: "process"}}). + AddNode("Notification", "notification", &tasks.InAppNotification{Operation: services.Operation{Type: "process"}}). + AddNode("Data Branch", "data-branch", &tasks.DataBranchHandler{Operation: services.Operation{Type: "condition"}}). + AddCondition("condition", map[dag.When]dag.Then{"pass": "email:deliver", "fail": "store:data"}). + AddEdge("Get input to loop", "get:input", "loop"). + AddLoop("Loop to prepare email", "loop", "prepare:email"). + AddEdge("Prepare Email to condition", "prepare:email", "condition"). + AddEdge("Store Data to send sms and notification", "store:data", "send:sms", "notification") +} + +func sendData(f *dag.DAG) { + data := []map[string]any{ + { + "phone": "+123456789", + "email": "abc.xyz@gmail.com", + }, + { + "phone": "+98765412", + "email": "xyz.abc@gmail.com", + }, + } + bt, _ := json.Marshal(data) + result := f.Process(context.Background(), bt) + fmt.Println(string(result.Payload), result) +} + +func sync() { + f := dag.NewDAG("Sample DAG", "sample-dag", mq.WithSyncMode(true), mq.WithNotifyResponse(tasks.NotifyResponse)) + setup(f) + sendData(f) +} + +func async() { + f := dag.NewDAG("Sample DAG", "sample-dag", mq.WithNotifyResponse(tasks.NotifyResponse)) + setup(f) + go func() { + err := f.Start(context.TODO(), ":8083") + if err != nil { + panic(err) + } + }() + time.Sleep(3 * time.Second) + sendData(f) + time.Sleep(10 * time.Second) } diff --git a/examples/dag_consumer.go b/examples/dag_consumer.go index b371e09..bbb1959 100644 --- a/examples/dag_consumer.go +++ b/examples/dag_consumer.go @@ -14,12 +14,12 @@ func main() { mq.WithSyncMode(true), mq.WithNotifyResponse(tasks.NotifyResponse), ) - d.AddNode("C", "C", tasks.Node3, true) - d.AddNode("D", "D", tasks.Node4) - d.AddNode("E", "E", tasks.Node5) - d.AddNode("F", "F", tasks.Node6) - d.AddNode("G", "G", tasks.Node7) - d.AddNode("H", "H", tasks.Node8) + d.AddNode("C", "C", &tasks.Node3{}, true) + d.AddNode("D", "D", &tasks.Node4{}) + d.AddNode("E", "E", &tasks.Node5{}) + d.AddNode("F", "F", &tasks.Node6{}) + d.AddNode("G", "G", &tasks.Node7{}) + d.AddNode("H", "H", &tasks.Node8{}) d.AddCondition("C", map[dag.When]dag.Then{"PASS": "D", "FAIL": "E"}) d.AddEdge("Label 1", "B", "C") diff --git a/examples/subdag.go b/examples/subdag.go new file mode 100644 index 0000000..ff372ad --- /dev/null +++ b/examples/subdag.go @@ -0,0 +1,110 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + + "github.com/oarkflow/mq/consts" + "github.com/oarkflow/mq/examples/tasks" + + "github.com/oarkflow/mq" + "github.com/oarkflow/mq/dag" +) + +var ( + d = dag.NewDAG( + "Sample DAG", + "sample-dag", + mq.WithSyncMode(true), + mq.WithNotifyResponse(tasks.NotifyResponse), + ) + // d = dag.NewDAG(mq.WithSyncMode(true), mq.WithTLS(true, "./certs/server.crt", "./certs/server.key"), mq.WithCAPath("./certs/ca.cert")) +) + +func main() { + subDag := dag.NewDAG( + "Sub DAG", + "D", + mq.WithNotifyResponse(tasks.NotifySubDAGResponse), + ) + subDag.AddNode("I", "I", &tasks.Node4{}, true) + subDag.AddNode("F", "F", &tasks.Node6{}) + subDag.AddNode("G", "G", &tasks.Node7{}) + subDag.AddNode("H", "H", &tasks.Node8{}) + subDag.AddEdge("Label 2", "I", "F") + subDag.AddEdge("Label 4", "F", "G", "H") + + d.AddNode("A", "A", &tasks.Node1{}, true) + d.AddNode("B", "B", &tasks.Node2{}) + d.AddNode("C", "C", &tasks.Node3{}) + d.AddDAGNode("D", "D", subDag) + d.AddNode("E", "E", &tasks.Node5{}) + d.AddLoop("Send each item", "A", "B") + d.AddCondition("C", map[dag.When]dag.Then{"PASS": "D", "FAIL": "E"}) + d.AddEdge("Label 1", "B", "C") + // Classify edges + // d.ClassifyEdges() + // fmt.Println(d.ExportDOT()) + + http.HandleFunc("POST /publish", requestHandler("publish")) + http.HandleFunc("POST /request", requestHandler("request")) + http.HandleFunc("/pause-consumer/{id}", func(writer http.ResponseWriter, request *http.Request) { + id := request.PathValue("id") + if id != "" { + d.PauseConsumer(request.Context(), id) + } + }) + http.HandleFunc("/resume-consumer/{id}", func(writer http.ResponseWriter, request *http.Request) { + id := request.PathValue("id") + if id != "" { + d.ResumeConsumer(request.Context(), id) + } + }) + http.HandleFunc("/pause", func(writer http.ResponseWriter, request *http.Request) { + d.Pause(request.Context()) + }) + http.HandleFunc("/resume", func(writer http.ResponseWriter, request *http.Request) { + d.Resume(request.Context()) + }) + err := d.Start(context.TODO(), ":8083") + if err != nil { + panic(err) + } +} + +func requestHandler(requestType string) func(w http.ResponseWriter, r *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "Invalid request method", http.StatusMethodNotAllowed) + return + } + var payload []byte + if r.Body != nil { + defer r.Body.Close() + var err error + payload, err = io.ReadAll(r.Body) + if err != nil { + http.Error(w, "Failed to read request body", http.StatusBadRequest) + return + } + } else { + http.Error(w, "Empty request body", http.StatusBadRequest) + return + } + ctx := r.Context() + if requestType == "request" { + ctx = mq.SetHeaders(ctx, map[string]string{consts.AwaitResponseKey: "true"}) + } + // ctx = context.WithValue(ctx, "initial_node", "E") + rs := d.Process(ctx, payload) + if rs.Error != nil { + http.Error(w, fmt.Sprintf("[DAG Error] - %v", rs.Error), http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(rs) + } +} diff --git a/examples/tasks/operations.go b/examples/tasks/operations.go new file mode 100644 index 0000000..3329cbb --- /dev/null +++ b/examples/tasks/operations.go @@ -0,0 +1,161 @@ +package tasks + +import ( + "context" + "fmt" + + "github.com/oarkflow/json" + + "github.com/oarkflow/dipper" + + "github.com/oarkflow/mq" + "github.com/oarkflow/mq/services" +) + +type GetData struct { + services.Operation +} + +func (e *GetData) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + fmt.Println("Getting Input", string(task.Payload)) + ctx = context.WithValue(ctx, "extra_params", map[string]any{"iphone": true}) + return mq.Result{Payload: task.Payload, Ctx: ctx} +} + +type Loop struct { + services.Operation +} + +func (e *Loop) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + fmt.Println(ctx.Value("extra_params"), "LOOOOOP") + fmt.Println("Looping...", string(task.Payload)) + return mq.Result{Payload: task.Payload, Ctx: ctx} +} + +type Condition struct { + services.Operation +} + +func (e *Condition) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + var data map[string]any + err := json.Unmarshal(task.Payload, &data) + if err != nil { + panic(err) + } + switch email := data["email"].(type) { + case string: + if email == "abc.xyz@gmail.com" { + fmt.Println("Checking...", data, "Pass...") + return mq.Result{Payload: task.Payload, Status: "pass", Ctx: ctx} + } + return mq.Result{Payload: task.Payload, Status: "fail", Ctx: ctx} + default: + return mq.Result{Payload: task.Payload, Status: "fail", Ctx: ctx} + } +} + +type PrepareEmail struct { + services.Operation +} + +func (e *PrepareEmail) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + var data map[string]any + err := json.Unmarshal(task.Payload, &data) + if err != nil { + fmt.Println("Prepare Email") + panic(err) + } + data["email_valid"] = true + d, _ := json.Marshal(data) + fmt.Println("Preparing Email...", string(d)) + return mq.Result{Payload: d, Ctx: ctx} +} + +type EmailDelivery struct { + services.Operation +} + +func (e *EmailDelivery) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + var data map[string]any + err := json.Unmarshal(task.Payload, &data) + if err != nil { + fmt.Println("Email Delivery") + panic(err) + } + fmt.Println("Sending Email...", data) + return mq.Result{Payload: task.Payload, Ctx: ctx} +} + +type SendSms struct { + services.Operation +} + +func (e *SendSms) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + var data map[string]any + err := json.Unmarshal(task.Payload, &data) + if err != nil { + panic(err) + } + fmt.Println("Sending Sms...", data) + return mq.Result{Payload: task.Payload, Error: nil, Ctx: ctx} +} + +type StoreData struct { + services.Operation +} + +func (e *StoreData) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + + fmt.Println("Storing Data...", string(task.Payload)) + return mq.Result{Payload: task.Payload, Ctx: ctx} +} + +type InAppNotification struct { + services.Operation +} + +func (e *InAppNotification) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + var data map[string]any + err := json.Unmarshal(task.Payload, &data) + if err != nil { + panic(err) + } + fmt.Println("In App notification...", data) + return mq.Result{Payload: task.Payload, Ctx: ctx} +} + +type DataBranchHandler struct{ services.Operation } + +func (v *DataBranchHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + ctx = context.WithValue(ctx, "extra_params", map[string]any{"iphone": true}) + var row map[string]any + var result mq.Result + result.Payload = task.Payload + err := json.Unmarshal(result.Payload, &row) + if err != nil { + result.Error = err + return result + } + fmt.Println("Data Branch...") + b := make(map[string]any) + switch branches := row["data_branch"].(type) { + case map[string]any: + for field, handler := range branches { + data, err := dipper.Get(row, field) + if err != nil { + break + } + b[handler.(string)] = data + } + break + } + br, err := json.Marshal(b) + if err != nil { + result.Error = err + return result + } + result.Status = "branches" + result.Payload = br + result.Ctx = ctx + return result +} diff --git a/examples/tasks/tasks.go b/examples/tasks/tasks.go index 574179b..ec478ae 100644 --- a/examples/tasks/tasks.go +++ b/examples/tasks/tasks.go @@ -7,19 +7,26 @@ import ( "log" "github.com/oarkflow/mq" + "github.com/oarkflow/mq/services" ) -func Node1(_ context.Context, task *mq.Task) mq.Result { +type Node1 struct{ services.Operation } + +func (t *Node1) ProcessTask(_ context.Context, task *mq.Task) mq.Result { fmt.Println("Node 1", string(task.Payload)) return mq.Result{Payload: task.Payload, TaskID: task.ID} } -func Node2(_ context.Context, task *mq.Task) mq.Result { +type Node2 struct{ services.Operation } + +func (t *Node2) ProcessTask(_ context.Context, task *mq.Task) mq.Result { fmt.Println("Node 2", string(task.Payload)) return mq.Result{Payload: task.Payload, TaskID: task.ID} } -func Node3(_ context.Context, task *mq.Task) mq.Result { +type Node3 struct{ services.Operation } + +func (t *Node3) ProcessTask(_ context.Context, task *mq.Task) mq.Result { var user map[string]any _ = json.Unmarshal(task.Payload, &user) age := int(user["age"].(float64)) @@ -32,7 +39,9 @@ func Node3(_ context.Context, task *mq.Task) mq.Result { return mq.Result{Payload: resultPayload, Status: status} } -func Node4(_ context.Context, task *mq.Task) mq.Result { +type Node4 struct{ services.Operation } + +func (t *Node4) ProcessTask(_ context.Context, task *mq.Task) mq.Result { var user map[string]any _ = json.Unmarshal(task.Payload, &user) user["node"] = "D" @@ -40,7 +49,9 @@ func Node4(_ context.Context, task *mq.Task) mq.Result { return mq.Result{Payload: resultPayload} } -func Node5(_ context.Context, task *mq.Task) mq.Result { +type Node5 struct{ services.Operation } + +func (t *Node5) ProcessTask(_ context.Context, task *mq.Task) mq.Result { var user map[string]any _ = json.Unmarshal(task.Payload, &user) user["node"] = "E" @@ -48,14 +59,18 @@ func Node5(_ context.Context, task *mq.Task) mq.Result { return mq.Result{Payload: resultPayload} } -func Node6(_ context.Context, task *mq.Task) mq.Result { +type Node6 struct{ services.Operation } + +func (t *Node6) ProcessTask(_ context.Context, task *mq.Task) mq.Result { var user map[string]any _ = json.Unmarshal(task.Payload, &user) resultPayload, _ := json.Marshal(map[string]any{"storage": user}) return mq.Result{Payload: resultPayload} } -func Node7(_ context.Context, task *mq.Task) mq.Result { +type Node7 struct{ services.Operation } + +func (t *Node7) ProcessTask(_ context.Context, task *mq.Task) mq.Result { var user map[string]any _ = json.Unmarshal(task.Payload, &user) user["node"] = "G" @@ -63,7 +78,9 @@ func Node7(_ context.Context, task *mq.Task) mq.Result { return mq.Result{Payload: resultPayload} } -func Node8(_ context.Context, task *mq.Task) mq.Result { +type Node8 struct{ services.Operation } + +func (t *Node8) ProcessTask(_ context.Context, task *mq.Task) mq.Result { var user map[string]any _ = json.Unmarshal(task.Payload, &user) user["node"] = "H" diff --git a/go.mod b/go.mod index 25387bc..ce058dd 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,12 @@ module github.com/oarkflow/mq go 1.23.0 require ( + github.com/oarkflow/date v0.0.4 + github.com/oarkflow/dipper v0.0.6 + github.com/oarkflow/errors v0.0.6 + github.com/oarkflow/expr v0.0.11 + github.com/oarkflow/json v0.0.13 github.com/oarkflow/xid v1.2.5 + golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c golang.org/x/time v0.7.0 ) diff --git a/go.sum b/go.sum index 18c6a80..00f370b 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,16 @@ +github.com/oarkflow/date v0.0.4 h1:EwY/wiS3CqZNBx7b2x+3kkJwVNuGk+G0dls76kL/fhU= +github.com/oarkflow/date v0.0.4/go.mod h1:xQTFc6p6O5VX6J75ZrPJbelIFGca1ASmhpgirFqL8vM= +github.com/oarkflow/dipper v0.0.6 h1:E+ak9i4R1lxx0B04CjfG5DTLTmwuWA1nrdS6KIHdUxQ= +github.com/oarkflow/dipper v0.0.6/go.mod h1:bnXQ6465eP8WZ9U3M7R24zeBG3P6IU5SASuvpAyCD9w= +github.com/oarkflow/errors v0.0.6 h1:qTBzVblrX6bFbqYLfatsrZHMBPchOZiIE3pfVzh1+k8= +github.com/oarkflow/errors v0.0.6/go.mod h1:UETn0Q55PJ+YUbpR4QImIoBavd6QvJtyW/oeTT7ghZM= +github.com/oarkflow/expr v0.0.11 h1:H6h+dIUlU+xDlijMXKQCh7TdE6MGVoFPpZU7q/dziRI= +github.com/oarkflow/expr v0.0.11/go.mod h1:WgMZqP44h7SBwKyuGZwC15vj46lHtI0/QpKdEZpRVE4= +github.com/oarkflow/json v0.0.13 h1:/ZKW924/v4U1ht34WY7rj/GC/qW9+10IiV5+MR2vO0A= +github.com/oarkflow/json v0.0.13/go.mod h1:S5BZA4/rM87+MY8mFrga3jISzxCL9RtLE6xHSk63VxI= github.com/oarkflow/xid v1.2.5 h1:6RcNJm9+oZ/B647gkME9trCzhpxGQaSdNoD56Vmkeho= github.com/oarkflow/xid v1.2.5/go.mod h1:jG4YBh+swbjlWApGWDBYnsJEa7hi3CCpmuqhB3RAxVo= +golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c h1:7dEasQXItcW1xKJ2+gg5VOiBnqWrJc+rq0DPKyvvdbY= +golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c/go.mod h1:NQtJDoLvd6faHhE7m4T/1IY708gDefGGjR/iUW8yQQ8= golang.org/x/time v0.7.0 h1:ntUhktv3OPE6TgYxXWv9vKvUSJyIFJlyohwbkEwPrKQ= golang.org/x/time v0.7.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= diff --git a/options.go b/options.go index 083b477..3488e95 100644 --- a/options.go +++ b/options.go @@ -17,6 +17,7 @@ type Result struct { Topic string `json:"topic"` TaskID string `json:"task_id"` Status string `json:"status"` + Ctx context.Context `json:"-"` Payload json.RawMessage `json:"payload"` } @@ -27,15 +28,16 @@ func (r Result) Unmarshal(data any) error { return json.Unmarshal(r.Payload, data) } -func HandleError(_ context.Context, err error, status ...string) Result { +func HandleError(ctx context.Context, err error, status ...string) Result { st := "Failed" if len(status) > 0 { st = status[0] } if err == nil { - return Result{} + return Result{Ctx: ctx} } return Result{ + Ctx: ctx, Status: st, Error: err, } @@ -49,6 +51,7 @@ func (r Result) WithData(status string, data []byte) Result { Status: status, Payload: data, Error: nil, + Ctx: r.Ctx, } } @@ -66,8 +69,6 @@ type Options struct { tlsConfig TLSConfig brokerAddr string callback []func(context.Context, Result) Result - aesKey json.RawMessage - hmacKey json.RawMessage maxRetries int initialDelay time.Duration maxBackoff time.Duration @@ -76,7 +77,6 @@ type Options struct { numOfWorkers int maxMemoryLoad int64 syncMode bool - enableEncryption bool enableWorkerPool bool respondPendingResult bool } @@ -106,7 +106,6 @@ func defaultOptions() *Options { maxBackoff: 20 * time.Second, jitterPercent: 0.5, queueSize: 100, - hmacKey: []byte(`475f3adc6be9ee6f5357020e2922ff5b8f971598e175878e617d19df584bc648`), numOfWorkers: runtime.NumCPU(), maxMemoryLoad: 5000000, } @@ -150,19 +149,6 @@ func WithConsumerOnClose(handler func(ctx context.Context, topic, consumerName s } } -func WithSecretKey(aesKey json.RawMessage) Option { - return func(opts *Options) { - opts.aesKey = aesKey - opts.enableEncryption = true - } -} - -func WithHMACKey(hmacKey json.RawMessage) Option { - return func(opts *Options) { - opts.hmacKey = hmacKey - } -} - // WithBrokerURL - func WithBrokerURL(url string) Option { return func(opts *Options) { diff --git a/services/operation.go b/services/operation.go new file mode 100644 index 0000000..45b0f0b --- /dev/null +++ b/services/operation.go @@ -0,0 +1,463 @@ +package services + +import ( + "context" + "fmt" + "slices" + "strings" + "time" + + "github.com/oarkflow/json" + + "github.com/oarkflow/date" + "github.com/oarkflow/dipper" + "github.com/oarkflow/errors" + "github.com/oarkflow/expr" + "github.com/oarkflow/xid" + "golang.org/x/exp/maps" + + "github.com/oarkflow/mq" +) + +type Provider struct { + Mapping map[string]any `json:"mapping"` + UpdateMapping map[string]any `json:"update_mapping"` + InsertMapping map[string]any `json:"insert_mapping"` + Defaults map[string]any `json:"defaults"` + ProviderType string `json:"provider_type"` + Database string `json:"database"` + Source string `json:"source"` + Query string `json:"query"` +} + +type Payload struct { + Data map[string]any `json:"data"` + Mapping map[string]string `json:"mapping"` + GeneratedFields []string `json:"generated_fields"` + Providers []Provider `json:"providers"` +} + +type Operation struct { + ID string `json:"id"` + Type string `json:"type"` + Key string `json:"key"` + RequiredFields []string `json:"required_fields"` + OptionalFields []string `json:"optional_fields"` + GeneratedFields []string `json:"generated_fields"` + Payload Payload +} + +func (e *Operation) Consume(ctx context.Context) error { + return nil +} + +func (e *Operation) Pause(ctx context.Context) error { + return nil +} + +func (e *Operation) Resume(ctx context.Context) error { + return nil +} + +func (e *Operation) Stop(ctx context.Context) error { + return nil +} + +func (e *Operation) Close() error { + return nil +} + +func (e *Operation) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + return mq.Result{Payload: task.Payload} +} + +func (e *Operation) SetPayload(payload Payload) { + e.Payload = payload + e.GeneratedFields = slices.Compact(append(e.GeneratedFields, payload.GeneratedFields...)) +} + +func (e *Operation) GetType() string { + return e.Type +} + +func (e *Operation) GetKey() string { + return e.Key +} + +func (e *Operation) SetKey(key string) { + e.Key = key +} + +func (e *Operation) ValidateFields(c context.Context, payload []byte) (map[string]any, error) { + var keys []string + var data map[string]any + err := json.Unmarshal(payload, &data) + if err != nil { + return nil, err + } + for k, v := range e.Payload.Mapping { + _, val := GetVal(c, v, data) + if val != nil { + keys = append(keys, k) + } + } + for k := range e.Payload.Data { + keys = append(keys, k) + } + for _, k := range e.RequiredFields { + if !slices.Contains(keys, k) { + return nil, errors.New("Required field doesn't exist") + } + } + return data, nil +} + +func GetVal(c context.Context, v string, data map[string]any) (key string, val any) { + key, val = getVal(c, v, data) + if val == nil { + if strings.Contains(v, "+") { + vPartsG := strings.Split(v, "+") + var value []string + for _, v := range vPartsG { + key, val = getVal(c, strings.TrimSpace(v), data) + if val == nil { + continue + } + value = append(value, val.(string)) + } + val = strings.Join(value, "") + } else { + key, val = getVal(c, v, data) + } + } + + return +} + +func Header(c context.Context, headerKey string) (val map[string]any, exists bool) { + header := c.Value("header") + switch header := header.(type) { + case map[string]any: + if p, exist := header[headerKey]; exist && p != nil { + val = p.(map[string]any) + exists = exist + } + } + return +} + +func HeaderVal(c context.Context, headerKey string, key string) (val any) { + header := c.Value("header") + switch header := header.(type) { + case map[string]any: + if p, exists := header[headerKey]; exists && p != nil { + headerField := p.(map[string]any) + if v, e := headerField[key]; e { + val = v + } + } + } + return +} + +func getVal(c context.Context, v string, data map[string]any) (key string, val any) { + var param, query, consts map[string]any + var enums map[string]map[string]any + headerData := make(map[string]any) + header := c.Value("header") + switch header := header.(type) { + case map[string]any: + if p, exists := header["param"]; exists && p != nil { + param = p.(map[string]any) + } + if p, exists := header["query"]; exists && p != nil { + query = p.(map[string]any) + } + if p, exists := header["consts"]; exists && p != nil { + consts = p.(map[string]any) + } + if p, exists := header["enums"]; exists && p != nil { + enums = p.(map[string]map[string]any) + } + params := []string{"param", "query", "consts", "enums", "scopes"} + // add other data in header, other than param, query, consts, enums to data + for k, v := range header { + if !slices.Contains(params, k) { + headerData[k] = v + } + } + } + v = strings.TrimPrefix(v, "header.") + vParts := strings.Split(v, ".") + switch vParts[0] { + case "body": + v := vParts[1] + if strings.Contains(v, "*_") { + fieldSuffix := strings.ReplaceAll(v, "*", "") + for k, vt := range data { + if strings.HasSuffix(k, fieldSuffix) { + val = vt + key = k + } + } + } else { + if vd, ok := data[v]; ok { + val = vd + key = v + } + } + case "param": + v := vParts[1] + if strings.Contains(v, "*_") { + fieldSuffix := strings.ReplaceAll(v, "*", "") + for k, vt := range param { + if strings.HasSuffix(k, fieldSuffix) { + val = vt + key = k + } + } + } else { + if vd, ok := param[v]; ok { + val = vd + key = v + } + } + case "query": + v := vParts[1] + if strings.Contains(v, "*_") { + fieldSuffix := strings.ReplaceAll(v, "*", "") + for k, vt := range query { + if strings.HasSuffix(k, fieldSuffix) { + val = vt + key = k + } + } + } else { + if vd, ok := query[v]; ok { + val = vd + key = v + } + } + case "eval": + // connect string except the first one if more than two parts exist + var v string + if len(vParts) > 2 { + v = strings.Join(vParts[1:], ".") + } else { + v = vParts[1] + } + // remove '{{' and '}}' + v = v[2 : len(v)-2] + + // parse the expression + p, err := expr.Parse(v) + if err != nil { + return "", nil + } + // evaluate the expression + val, err := p.Eval(data) + if err != nil { + val, err := p.Eval(headerData) + if err == nil { + return v, val + } + return "", nil + } else { + return v, val + } + case "eval_raw", "gorm_eval": + // connect string except the first one if more than two parts exist + var v string + if len(vParts) > 2 { + v = strings.Join(vParts[1:], ".") + } else { + v = vParts[1] + } + // remove '{{' and '}}' + v = v[2 : len(v)-2] + + // parse the expression + p, err := expr.Parse(v) + if err != nil { + return "", nil + } + dt := map[string]any{ + "header": header, + } + for k, vt := range data { + dt[k] = vt + } + // evaluate the expression + val, err := p.Eval(dt) + if err != nil { + val, err := p.Eval(headerData) + if err == nil { + return v, val + } + return "", nil + } else { + return v, val + } + case "consts": + constG := vParts[1] + if constVal, ok := consts[constG]; ok { + val = constVal + key = v + } + case "enums": + enumG := vParts[1] + if enumGVal, ok := enums[enumG]; ok { + if enumVal, ok := enumGVal[vParts[2]]; ok { + val = enumVal + key = v + } + } + default: + if strings.Contains(v, "*_") { + fieldSuffix := strings.ReplaceAll(v, "*", "") + for k, vt := range data { + if strings.HasSuffix(k, fieldSuffix) { + val = vt + key = k + } + } + } else { + vd, err := dipper.Get(data, v) + if err == nil { + val = vd + key = v + } else { + vd, err := dipper.Get(headerData, v) + if err == nil { + val = vd + key = v + } + } + } + } + return +} + +func init() { + // define custom functions for use in config + expr.AddFunction("trim", func(params ...interface{}) (interface{}, error) { + if len(params) == 0 || len(params) > 1 || params[0] == nil { + return nil, errors.New("Invalid number of arguments") + } + val, ok := params[0].(string) + if !ok { + return nil, errors.New("Invalid argument type") + } + return strings.TrimSpace(val), nil + }) + expr.AddFunction("upper", func(params ...interface{}) (interface{}, error) { + if len(params) == 0 || len(params) > 1 || params[0] == nil { + return nil, errors.New("Invalid number of arguments") + } + val, ok := params[0].(string) + if !ok { + return nil, errors.New("Invalid argument type") + } + return strings.ToUpper(val), nil + }) + expr.AddFunction("lower", func(params ...interface{}) (interface{}, error) { + if len(params) == 0 || len(params) > 1 || params[0] == nil { + return nil, errors.New("Invalid number of arguments") + } + val, ok := params[0].(string) + if !ok { + return nil, errors.New("Invalid argument type") + } + return strings.ToLower(val), nil + }) + expr.AddFunction("date", func(params ...interface{}) (interface{}, error) { + if len(params) == 0 || len(params) > 1 || params[0] == nil { + return nil, errors.New("Invalid number of arguments") + } + val, ok := params[0].(string) + if !ok { + return nil, errors.New("Invalid argument type") + } + t, err := date.Parse(val) + if err != nil { + return nil, err + } + return t.Format("2006-01-02"), nil + }) + expr.AddFunction("datetime", func(params ...interface{}) (interface{}, error) { + if len(params) == 0 || len(params) > 1 || params[0] == nil { + return nil, errors.New("Invalid number of arguments") + } + val, ok := params[0].(string) + if !ok { + return nil, errors.New("Invalid argument type") + } + t, err := date.Parse(val) + if err != nil { + return nil, err + } + return t.Format(time.RFC3339), nil + }) + expr.AddFunction("addSecondsToNow", func(params ...interface{}) (interface{}, error) { + if len(params) == 0 || len(params) > 1 || params[0] == nil { + return nil, errors.New("Invalid number of arguments") + } + // if type of params[0] is not float64 or int, return error + tt, isFloat := params[0].(float64) + if !isFloat { + if _, ok := params[0].(int); !ok { + return nil, errors.New("Invalid argument type") + } + } + // add expiry to the current time + // convert parms[0] to int from float64 + if isFloat { + params[0] = int(tt) + } + t := time.Now().UTC() + t = t.Add(time.Duration(params[0].(int)) * time.Second) + return t, nil + }) + expr.AddFunction("values", func(params ...interface{}) (interface{}, error) { + if len(params) == 0 || len(params) > 2 { + return nil, errors.New("Invalid number of arguments") + } + // get values from map + mapList, ok := params[0].([]any) + if !ok { + return nil, errors.New("Invalid argument type") + } + keyToGet, hasKey := params[1].(string) + var values []any + if hasKey { + for _, m := range mapList { + mp := m.(map[string]any) + if val, ok := mp[keyToGet]; ok { + values = append(values, val) + } + } + } else { + for _, m := range mapList { + mp := m.(map[string]any) + vals := maps.Values(mp) + values = append(values, vals...) + } + } + return values, nil + }) + expr.AddFunction("uniqueid", func(params ...interface{}) (interface{}, error) { + // create a new xid + return xid.New().String(), nil + }) + expr.AddFunction("now", func(params ...interface{}) (interface{}, error) { + // get the current time in UTC + return time.Now().UTC(), nil + }) + expr.AddFunction("toString", func(params ...interface{}) (interface{}, error) { + if len(params) == 0 || len(params) > 1 || params[0] == nil { + return nil, errors.New("Invalid number of arguments") + } + // convert to string + return fmt.Sprint(params[0]), nil + }) +}