feat: remove un-necessary dependencies

This commit is contained in:
sujit
2024-10-15 20:27:04 +05:45
parent 9f4de4453c
commit 8d96882d95
12 changed files with 890 additions and 139 deletions

View File

@@ -22,8 +22,10 @@ type Processor interface {
Pause(ctx context.Context) error Pause(ctx context.Context) error
Resume(ctx context.Context) error Resume(ctx context.Context) error
Stop(ctx context.Context) error Stop(ctx context.Context) error
GetKey() string
Close() error Close() error
GetKey() string
SetKey(key string)
GetType() string
} }
type Consumer struct { type Consumer struct {
@@ -62,6 +64,14 @@ func (c *Consumer) GetKey() string {
return c.id return c.id
} }
func (c *Consumer) GetType() string {
return "consumer"
}
func (c *Consumer) SetKey(key string) {
c.id = key
}
func (c *Consumer) subscribe(ctx context.Context, queue string) error { func (c *Consumer) subscribe(ctx context.Context, queue string) error {
headers := HeadersWithConsumerID(ctx, c.id) headers := HeadersWithConsumerID(ctx, c.id)
msg := codec.NewMessage(consts.SUBSCRIBE, utils.ToByte("{}"), queue, headers) msg := codec.NewMessage(consts.SUBSCRIBE, utils.ToByte("{}"), queue, headers)

View File

@@ -77,6 +77,14 @@ type DAG struct {
taskCleanupCh chan string taskCleanupCh chan string
} }
func (tm *DAG) SetKey(key string) {
tm.key = key
}
func (tm *DAG) GetType() string {
return tm.key
}
func (tm *DAG) listenForTaskCleanup() { func (tm *DAG) listenForTaskCleanup() {
for taskID := range tm.taskCleanupCh { for taskID := range tm.taskCleanupCh {
tm.mu.Lock() tm.mu.Lock()
@@ -121,6 +129,7 @@ func (tm *DAG) AssignTopic(topic string) {
} }
func NewDAG(name, key string, opts ...mq.Option) *DAG { func NewDAG(name, key string, opts ...mq.Option) *DAG {
callback := func(ctx context.Context, result mq.Result) error { return nil }
d := &DAG{ d := &DAG{
name: name, name: name,
key: key, key: key,
@@ -132,11 +141,10 @@ func NewDAG(name, key string, opts ...mq.Option) *DAG {
opts = append(opts, mq.WithCallback(d.onTaskCallback), mq.WithConsumerOnSubscribe(d.onConsumerJoin), mq.WithConsumerOnClose(d.onConsumerClose)) opts = append(opts, mq.WithCallback(d.onTaskCallback), mq.WithConsumerOnSubscribe(d.onConsumerJoin), mq.WithConsumerOnClose(d.onConsumerClose))
d.server = mq.NewBroker(opts...) d.server = mq.NewBroker(opts...)
d.opts = opts d.opts = opts
d.pool = mq.NewPool(d.server.Options().NumOfWorkers(), d.server.Options().QueueSize(), d.server.Options().MaxMemoryLoad(), d.ProcessTask, func(ctx context.Context, result mq.Result) error { options := d.server.Options()
return nil d.pool = mq.NewPool(options.NumOfWorkers(), options.QueueSize(), options.MaxMemoryLoad(), d.ProcessTask, callback)
})
d.pool.Start(d.server.Options().NumOfWorkers()) d.pool.Start(d.server.Options().NumOfWorkers())
go d.listenForTaskCleanup() // Start listening for task cleanup signals go d.listenForTaskCleanup()
return d return d
} }
@@ -227,10 +235,10 @@ func (tm *DAG) AddDAGNode(name string, key string, dag *DAG, firstNode ...bool)
} }
} }
func (tm *DAG) AddNode(name, key string, handler mq.Handler, firstNode ...bool) { func (tm *DAG) AddNode(name, key string, handler mq.Processor, firstNode ...bool) *DAG {
tm.mu.Lock() tm.mu.Lock()
defer tm.mu.Unlock() defer tm.mu.Unlock()
con := mq.NewConsumer(key, key, handler, tm.opts...) con := mq.NewConsumer(key, key, handler.ProcessTask, tm.opts...)
tm.nodes[key] = &Node{ tm.nodes[key] = &Node{
Name: name, Name: name,
Key: key, Key: key,
@@ -240,6 +248,7 @@ func (tm *DAG) AddNode(name, key string, handler mq.Handler, firstNode ...bool)
if len(firstNode) > 0 && firstNode[0] { if len(firstNode) > 0 && firstNode[0] {
tm.startNode = key tm.startNode = key
} }
return tm
} }
func (tm *DAG) AddDeferredNode(name, key string, firstNode ...bool) error { func (tm *DAG) AddDeferredNode(name, key string, firstNode ...bool) error {
@@ -269,18 +278,21 @@ func (tm *DAG) IsReady() bool {
return true return true
} }
func (tm *DAG) AddCondition(fromNode FromNode, conditions map[When]Then) { func (tm *DAG) AddCondition(fromNode FromNode, conditions map[When]Then) *DAG {
tm.mu.Lock() tm.mu.Lock()
defer tm.mu.Unlock() defer tm.mu.Unlock()
tm.conditions[fromNode] = conditions tm.conditions[fromNode] = conditions
return tm
} }
func (tm *DAG) AddLoop(label, from string, targets ...string) { func (tm *DAG) AddLoop(label, from string, targets ...string) *DAG {
tm.addEdge(Iterator, label, from, targets...) tm.addEdge(Iterator, label, from, targets...)
return tm
} }
func (tm *DAG) AddEdge(label, from string, targets ...string) { func (tm *DAG) AddEdge(label, from string, targets ...string) *DAG {
tm.addEdge(Simple, label, from, targets...) tm.addEdge(Simple, label, from, targets...)
return tm
} }
func (tm *DAG) addEdge(edgeType EdgeType, label, from string, targets ...string) { func (tm *DAG) addEdge(edgeType EdgeType, label, from string, targets ...string) {
@@ -334,6 +346,9 @@ func (tm *DAG) Process(ctx context.Context, payload []byte) mq.Result {
if err != nil { if err != nil {
return mq.Result{Error: err} return mq.Result{Error: err}
} }
if tm.server.SyncMode() {
ctx = mq.SetHeaders(ctx, map[string]string{consts.AwaitResponseKey: "true"})
}
task := NewTask(mq.NewID(), payload, initialNode) task := NewTask(mq.NewID(), payload, initialNode)
awaitResponse, _ := mq.GetAwaitResponse(ctx) awaitResponse, _ := mq.GetAwaitResponse(ctx)
if awaitResponse != "true" { if awaitResponse != "true" {
@@ -342,7 +357,9 @@ func (tm *DAG) Process(ctx context.Context, payload []byte) mq.Result {
if ok { if ok {
ctxx = mq.SetHeaders(ctxx, headers.AsMap()) ctxx = mq.SetHeaders(ctxx, headers.AsMap())
} }
tm.pool.AddTask(ctxx, task) if err := tm.pool.AddTask(ctxx, task); err != nil {
return mq.Result{CreatedAt: task.CreatedAt, TaskID: task.ID, Topic: initialNode, Status: "FAILED", Error: err}
}
return mq.Result{CreatedAt: task.CreatedAt, TaskID: task.ID, Topic: initialNode, Status: "PENDING"} return mq.Result{CreatedAt: task.CreatedAt, TaskID: task.ID, Topic: initialNode, Status: "PENDING"}
} }
return tm.ProcessTask(ctx, task) return tm.ProcessTask(ctx, task)

View File

@@ -155,10 +155,16 @@ func (tm *TaskManager) handleResult(ctx context.Context, results any) mq.Result
if err != nil { if err != nil {
return mq.HandleError(ctx, err) return mq.HandleError(ctx, err)
} }
return mq.Result{TaskID: tm.taskID, Payload: finalOutput, Status: status, Topic: topic} return mq.Result{TaskID: tm.taskID, Payload: finalOutput, Status: status, Topic: topic, Ctx: ctx}
case mq.Result: case mq.Result:
if res.Ctx == nil {
res.Ctx = ctx
}
return res return res
} }
if rs.Ctx == nil {
rs.Ctx = ctx
}
return rs return rs
} }
@@ -188,7 +194,7 @@ func (tm *TaskManager) processNode(ctx context.Context, node *Node, payload json
} }
select { select {
case <-ctx.Done(): case <-ctx.Done():
result = mq.Result{TaskID: tm.taskID, Topic: node.Key, Error: ctx.Err()} result = mq.Result{TaskID: tm.taskID, Topic: node.Key, Error: ctx.Err(), Ctx: ctx}
tm.appendFinalResult(result) tm.appendFinalResult(result)
return return
default: default:

View File

@@ -4,107 +4,70 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "time"
"net/http"
"github.com/oarkflow/mq/consts"
"github.com/oarkflow/mq/examples/tasks" "github.com/oarkflow/mq/examples/tasks"
"github.com/oarkflow/mq/services"
"github.com/oarkflow/mq" "github.com/oarkflow/mq"
"github.com/oarkflow/mq/dag" "github.com/oarkflow/mq/dag"
) )
var (
d = dag.NewDAG(
"Sample DAG",
"sample-dag",
mq.WithSyncMode(true),
mq.WithNotifyResponse(tasks.NotifyResponse),
)
// d = dag.NewDAG(mq.WithSyncMode(true), mq.WithTLS(true, "./certs/server.crt", "./certs/server.key"), mq.WithCAPath("./certs/ca.cert"))
)
func main() { func main() {
subDag := dag.NewDAG( sync()
"Sub DAG", async()
"D",
mq.WithNotifyResponse(tasks.NotifySubDAGResponse),
)
subDag.AddNode("I", "I", tasks.Node4, true)
subDag.AddNode("F", "F", tasks.Node6)
subDag.AddNode("G", "G", tasks.Node7)
subDag.AddNode("H", "H", tasks.Node8)
subDag.AddEdge("Label 2", "I", "F")
subDag.AddEdge("Label 4", "F", "G", "H")
d.AddNode("A", "A", tasks.Node1, true)
d.AddNode("B", "B", tasks.Node2)
d.AddNode("C", "C", tasks.Node3)
d.AddDAGNode("D", "D", subDag)
d.AddNode("E", "E", tasks.Node5)
d.AddLoop("Send each item", "A", "B")
d.AddCondition("C", map[dag.When]dag.Then{"PASS": "D", "FAIL": "E"})
d.AddEdge("Label 1", "B", "C")
// Classify edges
// d.ClassifyEdges()
// fmt.Println(d.ExportDOT())
http.HandleFunc("POST /publish", requestHandler("publish"))
http.HandleFunc("POST /request", requestHandler("request"))
http.HandleFunc("/pause-consumer/{id}", func(writer http.ResponseWriter, request *http.Request) {
id := request.PathValue("id")
if id != "" {
d.PauseConsumer(request.Context(), id)
}
})
http.HandleFunc("/resume-consumer/{id}", func(writer http.ResponseWriter, request *http.Request) {
id := request.PathValue("id")
if id != "" {
d.ResumeConsumer(request.Context(), id)
}
})
http.HandleFunc("/pause", func(writer http.ResponseWriter, request *http.Request) {
d.Pause(request.Context())
})
http.HandleFunc("/resume", func(writer http.ResponseWriter, request *http.Request) {
d.Resume(request.Context())
})
err := d.Start(context.TODO(), ":8083")
if err != nil {
panic(err)
}
} }
func requestHandler(requestType string) func(w http.ResponseWriter, r *http.Request) { func setup(f *dag.DAG) {
return func(w http.ResponseWriter, r *http.Request) { f.
if r.Method != http.MethodPost { AddNode("Email Delivery", "email:deliver", &tasks.EmailDelivery{Operation: services.Operation{Type: "process"}}).
http.Error(w, "Invalid request method", http.StatusMethodNotAllowed) AddNode("Prepare Email", "prepare:email", &tasks.PrepareEmail{Operation: services.Operation{Type: "process"}}).
return AddNode("Get Input", "get:input", &tasks.GetData{Operation: services.Operation{Type: "input"}}, true).
} AddNode("Iterator Processor", "loop", &tasks.Loop{Operation: services.Operation{Type: "loop"}}).
var payload []byte AddNode("Condition", "condition", &tasks.Condition{Operation: services.Operation{Type: "condition"}}).
if r.Body != nil { AddNode("Store data", "store:data", &tasks.StoreData{Operation: services.Operation{Type: "process"}}).
defer r.Body.Close() AddNode("Send SMS", "send:sms", &tasks.SendSms{Operation: services.Operation{Type: "process"}}).
var err error AddNode("Notification", "notification", &tasks.InAppNotification{Operation: services.Operation{Type: "process"}}).
payload, err = io.ReadAll(r.Body) AddNode("Data Branch", "data-branch", &tasks.DataBranchHandler{Operation: services.Operation{Type: "condition"}}).
if err != nil { AddCondition("condition", map[dag.When]dag.Then{"pass": "email:deliver", "fail": "store:data"}).
http.Error(w, "Failed to read request body", http.StatusBadRequest) AddEdge("Get input to loop", "get:input", "loop").
return AddLoop("Loop to prepare email", "loop", "prepare:email").
} AddEdge("Prepare Email to condition", "prepare:email", "condition").
} else { AddEdge("Store Data to send sms and notification", "store:data", "send:sms", "notification")
http.Error(w, "Empty request body", http.StatusBadRequest) }
return
} func sendData(f *dag.DAG) {
ctx := r.Context() data := []map[string]any{
if requestType == "request" { {
ctx = mq.SetHeaders(ctx, map[string]string{consts.AwaitResponseKey: "true"}) "phone": "+123456789",
} "email": "abc.xyz@gmail.com",
// ctx = context.WithValue(ctx, "initial_node", "E") },
rs := d.Process(ctx, payload) {
if rs.Error != nil { "phone": "+98765412",
http.Error(w, fmt.Sprintf("[DAG Error] - %v", rs.Error), http.StatusInternalServerError) "email": "xyz.abc@gmail.com",
return },
} }
w.Header().Set("Content-Type", "application/json") bt, _ := json.Marshal(data)
json.NewEncoder(w).Encode(rs) result := f.Process(context.Background(), bt)
} fmt.Println(string(result.Payload), result)
}
func sync() {
f := dag.NewDAG("Sample DAG", "sample-dag", mq.WithSyncMode(true), mq.WithNotifyResponse(tasks.NotifyResponse))
setup(f)
sendData(f)
}
func async() {
f := dag.NewDAG("Sample DAG", "sample-dag", mq.WithNotifyResponse(tasks.NotifyResponse))
setup(f)
go func() {
err := f.Start(context.TODO(), ":8083")
if err != nil {
panic(err)
}
}()
time.Sleep(3 * time.Second)
sendData(f)
time.Sleep(10 * time.Second)
} }

View File

@@ -14,12 +14,12 @@ func main() {
mq.WithSyncMode(true), mq.WithSyncMode(true),
mq.WithNotifyResponse(tasks.NotifyResponse), mq.WithNotifyResponse(tasks.NotifyResponse),
) )
d.AddNode("C", "C", tasks.Node3, true) d.AddNode("C", "C", &tasks.Node3{}, true)
d.AddNode("D", "D", tasks.Node4) d.AddNode("D", "D", &tasks.Node4{})
d.AddNode("E", "E", tasks.Node5) d.AddNode("E", "E", &tasks.Node5{})
d.AddNode("F", "F", tasks.Node6) d.AddNode("F", "F", &tasks.Node6{})
d.AddNode("G", "G", tasks.Node7) d.AddNode("G", "G", &tasks.Node7{})
d.AddNode("H", "H", tasks.Node8) d.AddNode("H", "H", &tasks.Node8{})
d.AddCondition("C", map[dag.When]dag.Then{"PASS": "D", "FAIL": "E"}) d.AddCondition("C", map[dag.When]dag.Then{"PASS": "D", "FAIL": "E"})
d.AddEdge("Label 1", "B", "C") d.AddEdge("Label 1", "B", "C")

110
examples/subdag.go Normal file
View File

@@ -0,0 +1,110 @@
package main
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"github.com/oarkflow/mq/consts"
"github.com/oarkflow/mq/examples/tasks"
"github.com/oarkflow/mq"
"github.com/oarkflow/mq/dag"
)
var (
d = dag.NewDAG(
"Sample DAG",
"sample-dag",
mq.WithSyncMode(true),
mq.WithNotifyResponse(tasks.NotifyResponse),
)
// d = dag.NewDAG(mq.WithSyncMode(true), mq.WithTLS(true, "./certs/server.crt", "./certs/server.key"), mq.WithCAPath("./certs/ca.cert"))
)
func main() {
subDag := dag.NewDAG(
"Sub DAG",
"D",
mq.WithNotifyResponse(tasks.NotifySubDAGResponse),
)
subDag.AddNode("I", "I", &tasks.Node4{}, true)
subDag.AddNode("F", "F", &tasks.Node6{})
subDag.AddNode("G", "G", &tasks.Node7{})
subDag.AddNode("H", "H", &tasks.Node8{})
subDag.AddEdge("Label 2", "I", "F")
subDag.AddEdge("Label 4", "F", "G", "H")
d.AddNode("A", "A", &tasks.Node1{}, true)
d.AddNode("B", "B", &tasks.Node2{})
d.AddNode("C", "C", &tasks.Node3{})
d.AddDAGNode("D", "D", subDag)
d.AddNode("E", "E", &tasks.Node5{})
d.AddLoop("Send each item", "A", "B")
d.AddCondition("C", map[dag.When]dag.Then{"PASS": "D", "FAIL": "E"})
d.AddEdge("Label 1", "B", "C")
// Classify edges
// d.ClassifyEdges()
// fmt.Println(d.ExportDOT())
http.HandleFunc("POST /publish", requestHandler("publish"))
http.HandleFunc("POST /request", requestHandler("request"))
http.HandleFunc("/pause-consumer/{id}", func(writer http.ResponseWriter, request *http.Request) {
id := request.PathValue("id")
if id != "" {
d.PauseConsumer(request.Context(), id)
}
})
http.HandleFunc("/resume-consumer/{id}", func(writer http.ResponseWriter, request *http.Request) {
id := request.PathValue("id")
if id != "" {
d.ResumeConsumer(request.Context(), id)
}
})
http.HandleFunc("/pause", func(writer http.ResponseWriter, request *http.Request) {
d.Pause(request.Context())
})
http.HandleFunc("/resume", func(writer http.ResponseWriter, request *http.Request) {
d.Resume(request.Context())
})
err := d.Start(context.TODO(), ":8083")
if err != nil {
panic(err)
}
}
func requestHandler(requestType string) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Invalid request method", http.StatusMethodNotAllowed)
return
}
var payload []byte
if r.Body != nil {
defer r.Body.Close()
var err error
payload, err = io.ReadAll(r.Body)
if err != nil {
http.Error(w, "Failed to read request body", http.StatusBadRequest)
return
}
} else {
http.Error(w, "Empty request body", http.StatusBadRequest)
return
}
ctx := r.Context()
if requestType == "request" {
ctx = mq.SetHeaders(ctx, map[string]string{consts.AwaitResponseKey: "true"})
}
// ctx = context.WithValue(ctx, "initial_node", "E")
rs := d.Process(ctx, payload)
if rs.Error != nil {
http.Error(w, fmt.Sprintf("[DAG Error] - %v", rs.Error), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(rs)
}
}

View File

@@ -0,0 +1,161 @@
package tasks
import (
"context"
"fmt"
"github.com/oarkflow/json"
"github.com/oarkflow/dipper"
"github.com/oarkflow/mq"
"github.com/oarkflow/mq/services"
)
type GetData struct {
services.Operation
}
func (e *GetData) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
fmt.Println("Getting Input", string(task.Payload))
ctx = context.WithValue(ctx, "extra_params", map[string]any{"iphone": true})
return mq.Result{Payload: task.Payload, Ctx: ctx}
}
type Loop struct {
services.Operation
}
func (e *Loop) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
fmt.Println(ctx.Value("extra_params"), "LOOOOOP")
fmt.Println("Looping...", string(task.Payload))
return mq.Result{Payload: task.Payload, Ctx: ctx}
}
type Condition struct {
services.Operation
}
func (e *Condition) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
var data map[string]any
err := json.Unmarshal(task.Payload, &data)
if err != nil {
panic(err)
}
switch email := data["email"].(type) {
case string:
if email == "abc.xyz@gmail.com" {
fmt.Println("Checking...", data, "Pass...")
return mq.Result{Payload: task.Payload, Status: "pass", Ctx: ctx}
}
return mq.Result{Payload: task.Payload, Status: "fail", Ctx: ctx}
default:
return mq.Result{Payload: task.Payload, Status: "fail", Ctx: ctx}
}
}
type PrepareEmail struct {
services.Operation
}
func (e *PrepareEmail) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
var data map[string]any
err := json.Unmarshal(task.Payload, &data)
if err != nil {
fmt.Println("Prepare Email")
panic(err)
}
data["email_valid"] = true
d, _ := json.Marshal(data)
fmt.Println("Preparing Email...", string(d))
return mq.Result{Payload: d, Ctx: ctx}
}
type EmailDelivery struct {
services.Operation
}
func (e *EmailDelivery) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
var data map[string]any
err := json.Unmarshal(task.Payload, &data)
if err != nil {
fmt.Println("Email Delivery")
panic(err)
}
fmt.Println("Sending Email...", data)
return mq.Result{Payload: task.Payload, Ctx: ctx}
}
type SendSms struct {
services.Operation
}
func (e *SendSms) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
var data map[string]any
err := json.Unmarshal(task.Payload, &data)
if err != nil {
panic(err)
}
fmt.Println("Sending Sms...", data)
return mq.Result{Payload: task.Payload, Error: nil, Ctx: ctx}
}
type StoreData struct {
services.Operation
}
func (e *StoreData) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
fmt.Println("Storing Data...", string(task.Payload))
return mq.Result{Payload: task.Payload, Ctx: ctx}
}
type InAppNotification struct {
services.Operation
}
func (e *InAppNotification) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
var data map[string]any
err := json.Unmarshal(task.Payload, &data)
if err != nil {
panic(err)
}
fmt.Println("In App notification...", data)
return mq.Result{Payload: task.Payload, Ctx: ctx}
}
type DataBranchHandler struct{ services.Operation }
func (v *DataBranchHandler) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
ctx = context.WithValue(ctx, "extra_params", map[string]any{"iphone": true})
var row map[string]any
var result mq.Result
result.Payload = task.Payload
err := json.Unmarshal(result.Payload, &row)
if err != nil {
result.Error = err
return result
}
fmt.Println("Data Branch...")
b := make(map[string]any)
switch branches := row["data_branch"].(type) {
case map[string]any:
for field, handler := range branches {
data, err := dipper.Get(row, field)
if err != nil {
break
}
b[handler.(string)] = data
}
break
}
br, err := json.Marshal(b)
if err != nil {
result.Error = err
return result
}
result.Status = "branches"
result.Payload = br
result.Ctx = ctx
return result
}

View File

@@ -7,19 +7,26 @@ import (
"log" "log"
"github.com/oarkflow/mq" "github.com/oarkflow/mq"
"github.com/oarkflow/mq/services"
) )
func Node1(_ context.Context, task *mq.Task) mq.Result { type Node1 struct{ services.Operation }
func (t *Node1) ProcessTask(_ context.Context, task *mq.Task) mq.Result {
fmt.Println("Node 1", string(task.Payload)) fmt.Println("Node 1", string(task.Payload))
return mq.Result{Payload: task.Payload, TaskID: task.ID} return mq.Result{Payload: task.Payload, TaskID: task.ID}
} }
func Node2(_ context.Context, task *mq.Task) mq.Result { type Node2 struct{ services.Operation }
func (t *Node2) ProcessTask(_ context.Context, task *mq.Task) mq.Result {
fmt.Println("Node 2", string(task.Payload)) fmt.Println("Node 2", string(task.Payload))
return mq.Result{Payload: task.Payload, TaskID: task.ID} return mq.Result{Payload: task.Payload, TaskID: task.ID}
} }
func Node3(_ context.Context, task *mq.Task) mq.Result { type Node3 struct{ services.Operation }
func (t *Node3) ProcessTask(_ context.Context, task *mq.Task) mq.Result {
var user map[string]any var user map[string]any
_ = json.Unmarshal(task.Payload, &user) _ = json.Unmarshal(task.Payload, &user)
age := int(user["age"].(float64)) age := int(user["age"].(float64))
@@ -32,7 +39,9 @@ func Node3(_ context.Context, task *mq.Task) mq.Result {
return mq.Result{Payload: resultPayload, Status: status} return mq.Result{Payload: resultPayload, Status: status}
} }
func Node4(_ context.Context, task *mq.Task) mq.Result { type Node4 struct{ services.Operation }
func (t *Node4) ProcessTask(_ context.Context, task *mq.Task) mq.Result {
var user map[string]any var user map[string]any
_ = json.Unmarshal(task.Payload, &user) _ = json.Unmarshal(task.Payload, &user)
user["node"] = "D" user["node"] = "D"
@@ -40,7 +49,9 @@ func Node4(_ context.Context, task *mq.Task) mq.Result {
return mq.Result{Payload: resultPayload} return mq.Result{Payload: resultPayload}
} }
func Node5(_ context.Context, task *mq.Task) mq.Result { type Node5 struct{ services.Operation }
func (t *Node5) ProcessTask(_ context.Context, task *mq.Task) mq.Result {
var user map[string]any var user map[string]any
_ = json.Unmarshal(task.Payload, &user) _ = json.Unmarshal(task.Payload, &user)
user["node"] = "E" user["node"] = "E"
@@ -48,14 +59,18 @@ func Node5(_ context.Context, task *mq.Task) mq.Result {
return mq.Result{Payload: resultPayload} return mq.Result{Payload: resultPayload}
} }
func Node6(_ context.Context, task *mq.Task) mq.Result { type Node6 struct{ services.Operation }
func (t *Node6) ProcessTask(_ context.Context, task *mq.Task) mq.Result {
var user map[string]any var user map[string]any
_ = json.Unmarshal(task.Payload, &user) _ = json.Unmarshal(task.Payload, &user)
resultPayload, _ := json.Marshal(map[string]any{"storage": user}) resultPayload, _ := json.Marshal(map[string]any{"storage": user})
return mq.Result{Payload: resultPayload} return mq.Result{Payload: resultPayload}
} }
func Node7(_ context.Context, task *mq.Task) mq.Result { type Node7 struct{ services.Operation }
func (t *Node7) ProcessTask(_ context.Context, task *mq.Task) mq.Result {
var user map[string]any var user map[string]any
_ = json.Unmarshal(task.Payload, &user) _ = json.Unmarshal(task.Payload, &user)
user["node"] = "G" user["node"] = "G"
@@ -63,7 +78,9 @@ func Node7(_ context.Context, task *mq.Task) mq.Result {
return mq.Result{Payload: resultPayload} return mq.Result{Payload: resultPayload}
} }
func Node8(_ context.Context, task *mq.Task) mq.Result { type Node8 struct{ services.Operation }
func (t *Node8) ProcessTask(_ context.Context, task *mq.Task) mq.Result {
var user map[string]any var user map[string]any
_ = json.Unmarshal(task.Payload, &user) _ = json.Unmarshal(task.Payload, &user)
user["node"] = "H" user["node"] = "H"

6
go.mod
View File

@@ -3,6 +3,12 @@ module github.com/oarkflow/mq
go 1.23.0 go 1.23.0
require ( require (
github.com/oarkflow/date v0.0.4
github.com/oarkflow/dipper v0.0.6
github.com/oarkflow/errors v0.0.6
github.com/oarkflow/expr v0.0.11
github.com/oarkflow/json v0.0.13
github.com/oarkflow/xid v1.2.5 github.com/oarkflow/xid v1.2.5
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c
golang.org/x/time v0.7.0 golang.org/x/time v0.7.0
) )

12
go.sum
View File

@@ -1,4 +1,16 @@
github.com/oarkflow/date v0.0.4 h1:EwY/wiS3CqZNBx7b2x+3kkJwVNuGk+G0dls76kL/fhU=
github.com/oarkflow/date v0.0.4/go.mod h1:xQTFc6p6O5VX6J75ZrPJbelIFGca1ASmhpgirFqL8vM=
github.com/oarkflow/dipper v0.0.6 h1:E+ak9i4R1lxx0B04CjfG5DTLTmwuWA1nrdS6KIHdUxQ=
github.com/oarkflow/dipper v0.0.6/go.mod h1:bnXQ6465eP8WZ9U3M7R24zeBG3P6IU5SASuvpAyCD9w=
github.com/oarkflow/errors v0.0.6 h1:qTBzVblrX6bFbqYLfatsrZHMBPchOZiIE3pfVzh1+k8=
github.com/oarkflow/errors v0.0.6/go.mod h1:UETn0Q55PJ+YUbpR4QImIoBavd6QvJtyW/oeTT7ghZM=
github.com/oarkflow/expr v0.0.11 h1:H6h+dIUlU+xDlijMXKQCh7TdE6MGVoFPpZU7q/dziRI=
github.com/oarkflow/expr v0.0.11/go.mod h1:WgMZqP44h7SBwKyuGZwC15vj46lHtI0/QpKdEZpRVE4=
github.com/oarkflow/json v0.0.13 h1:/ZKW924/v4U1ht34WY7rj/GC/qW9+10IiV5+MR2vO0A=
github.com/oarkflow/json v0.0.13/go.mod h1:S5BZA4/rM87+MY8mFrga3jISzxCL9RtLE6xHSk63VxI=
github.com/oarkflow/xid v1.2.5 h1:6RcNJm9+oZ/B647gkME9trCzhpxGQaSdNoD56Vmkeho= github.com/oarkflow/xid v1.2.5 h1:6RcNJm9+oZ/B647gkME9trCzhpxGQaSdNoD56Vmkeho=
github.com/oarkflow/xid v1.2.5/go.mod h1:jG4YBh+swbjlWApGWDBYnsJEa7hi3CCpmuqhB3RAxVo= github.com/oarkflow/xid v1.2.5/go.mod h1:jG4YBh+swbjlWApGWDBYnsJEa7hi3CCpmuqhB3RAxVo=
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c h1:7dEasQXItcW1xKJ2+gg5VOiBnqWrJc+rq0DPKyvvdbY=
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c/go.mod h1:NQtJDoLvd6faHhE7m4T/1IY708gDefGGjR/iUW8yQQ8=
golang.org/x/time v0.7.0 h1:ntUhktv3OPE6TgYxXWv9vKvUSJyIFJlyohwbkEwPrKQ= golang.org/x/time v0.7.0 h1:ntUhktv3OPE6TgYxXWv9vKvUSJyIFJlyohwbkEwPrKQ=
golang.org/x/time v0.7.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/time v0.7.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=

View File

@@ -17,6 +17,7 @@ type Result struct {
Topic string `json:"topic"` Topic string `json:"topic"`
TaskID string `json:"task_id"` TaskID string `json:"task_id"`
Status string `json:"status"` Status string `json:"status"`
Ctx context.Context `json:"-"`
Payload json.RawMessage `json:"payload"` Payload json.RawMessage `json:"payload"`
} }
@@ -27,15 +28,16 @@ func (r Result) Unmarshal(data any) error {
return json.Unmarshal(r.Payload, data) return json.Unmarshal(r.Payload, data)
} }
func HandleError(_ context.Context, err error, status ...string) Result { func HandleError(ctx context.Context, err error, status ...string) Result {
st := "Failed" st := "Failed"
if len(status) > 0 { if len(status) > 0 {
st = status[0] st = status[0]
} }
if err == nil { if err == nil {
return Result{} return Result{Ctx: ctx}
} }
return Result{ return Result{
Ctx: ctx,
Status: st, Status: st,
Error: err, Error: err,
} }
@@ -49,6 +51,7 @@ func (r Result) WithData(status string, data []byte) Result {
Status: status, Status: status,
Payload: data, Payload: data,
Error: nil, Error: nil,
Ctx: r.Ctx,
} }
} }
@@ -66,8 +69,6 @@ type Options struct {
tlsConfig TLSConfig tlsConfig TLSConfig
brokerAddr string brokerAddr string
callback []func(context.Context, Result) Result callback []func(context.Context, Result) Result
aesKey json.RawMessage
hmacKey json.RawMessage
maxRetries int maxRetries int
initialDelay time.Duration initialDelay time.Duration
maxBackoff time.Duration maxBackoff time.Duration
@@ -76,7 +77,6 @@ type Options struct {
numOfWorkers int numOfWorkers int
maxMemoryLoad int64 maxMemoryLoad int64
syncMode bool syncMode bool
enableEncryption bool
enableWorkerPool bool enableWorkerPool bool
respondPendingResult bool respondPendingResult bool
} }
@@ -106,7 +106,6 @@ func defaultOptions() *Options {
maxBackoff: 20 * time.Second, maxBackoff: 20 * time.Second,
jitterPercent: 0.5, jitterPercent: 0.5,
queueSize: 100, queueSize: 100,
hmacKey: []byte(`475f3adc6be9ee6f5357020e2922ff5b8f971598e175878e617d19df584bc648`),
numOfWorkers: runtime.NumCPU(), numOfWorkers: runtime.NumCPU(),
maxMemoryLoad: 5000000, maxMemoryLoad: 5000000,
} }
@@ -150,19 +149,6 @@ func WithConsumerOnClose(handler func(ctx context.Context, topic, consumerName s
} }
} }
func WithSecretKey(aesKey json.RawMessage) Option {
return func(opts *Options) {
opts.aesKey = aesKey
opts.enableEncryption = true
}
}
func WithHMACKey(hmacKey json.RawMessage) Option {
return func(opts *Options) {
opts.hmacKey = hmacKey
}
}
// WithBrokerURL - // WithBrokerURL -
func WithBrokerURL(url string) Option { func WithBrokerURL(url string) Option {
return func(opts *Options) { return func(opts *Options) {

463
services/operation.go Normal file
View File

@@ -0,0 +1,463 @@
package services
import (
"context"
"fmt"
"slices"
"strings"
"time"
"github.com/oarkflow/json"
"github.com/oarkflow/date"
"github.com/oarkflow/dipper"
"github.com/oarkflow/errors"
"github.com/oarkflow/expr"
"github.com/oarkflow/xid"
"golang.org/x/exp/maps"
"github.com/oarkflow/mq"
)
type Provider struct {
Mapping map[string]any `json:"mapping"`
UpdateMapping map[string]any `json:"update_mapping"`
InsertMapping map[string]any `json:"insert_mapping"`
Defaults map[string]any `json:"defaults"`
ProviderType string `json:"provider_type"`
Database string `json:"database"`
Source string `json:"source"`
Query string `json:"query"`
}
type Payload struct {
Data map[string]any `json:"data"`
Mapping map[string]string `json:"mapping"`
GeneratedFields []string `json:"generated_fields"`
Providers []Provider `json:"providers"`
}
type Operation struct {
ID string `json:"id"`
Type string `json:"type"`
Key string `json:"key"`
RequiredFields []string `json:"required_fields"`
OptionalFields []string `json:"optional_fields"`
GeneratedFields []string `json:"generated_fields"`
Payload Payload
}
func (e *Operation) Consume(ctx context.Context) error {
return nil
}
func (e *Operation) Pause(ctx context.Context) error {
return nil
}
func (e *Operation) Resume(ctx context.Context) error {
return nil
}
func (e *Operation) Stop(ctx context.Context) error {
return nil
}
func (e *Operation) Close() error {
return nil
}
func (e *Operation) ProcessTask(ctx context.Context, task *mq.Task) mq.Result {
return mq.Result{Payload: task.Payload}
}
func (e *Operation) SetPayload(payload Payload) {
e.Payload = payload
e.GeneratedFields = slices.Compact(append(e.GeneratedFields, payload.GeneratedFields...))
}
func (e *Operation) GetType() string {
return e.Type
}
func (e *Operation) GetKey() string {
return e.Key
}
func (e *Operation) SetKey(key string) {
e.Key = key
}
func (e *Operation) ValidateFields(c context.Context, payload []byte) (map[string]any, error) {
var keys []string
var data map[string]any
err := json.Unmarshal(payload, &data)
if err != nil {
return nil, err
}
for k, v := range e.Payload.Mapping {
_, val := GetVal(c, v, data)
if val != nil {
keys = append(keys, k)
}
}
for k := range e.Payload.Data {
keys = append(keys, k)
}
for _, k := range e.RequiredFields {
if !slices.Contains(keys, k) {
return nil, errors.New("Required field doesn't exist")
}
}
return data, nil
}
func GetVal(c context.Context, v string, data map[string]any) (key string, val any) {
key, val = getVal(c, v, data)
if val == nil {
if strings.Contains(v, "+") {
vPartsG := strings.Split(v, "+")
var value []string
for _, v := range vPartsG {
key, val = getVal(c, strings.TrimSpace(v), data)
if val == nil {
continue
}
value = append(value, val.(string))
}
val = strings.Join(value, "")
} else {
key, val = getVal(c, v, data)
}
}
return
}
func Header(c context.Context, headerKey string) (val map[string]any, exists bool) {
header := c.Value("header")
switch header := header.(type) {
case map[string]any:
if p, exist := header[headerKey]; exist && p != nil {
val = p.(map[string]any)
exists = exist
}
}
return
}
func HeaderVal(c context.Context, headerKey string, key string) (val any) {
header := c.Value("header")
switch header := header.(type) {
case map[string]any:
if p, exists := header[headerKey]; exists && p != nil {
headerField := p.(map[string]any)
if v, e := headerField[key]; e {
val = v
}
}
}
return
}
func getVal(c context.Context, v string, data map[string]any) (key string, val any) {
var param, query, consts map[string]any
var enums map[string]map[string]any
headerData := make(map[string]any)
header := c.Value("header")
switch header := header.(type) {
case map[string]any:
if p, exists := header["param"]; exists && p != nil {
param = p.(map[string]any)
}
if p, exists := header["query"]; exists && p != nil {
query = p.(map[string]any)
}
if p, exists := header["consts"]; exists && p != nil {
consts = p.(map[string]any)
}
if p, exists := header["enums"]; exists && p != nil {
enums = p.(map[string]map[string]any)
}
params := []string{"param", "query", "consts", "enums", "scopes"}
// add other data in header, other than param, query, consts, enums to data
for k, v := range header {
if !slices.Contains(params, k) {
headerData[k] = v
}
}
}
v = strings.TrimPrefix(v, "header.")
vParts := strings.Split(v, ".")
switch vParts[0] {
case "body":
v := vParts[1]
if strings.Contains(v, "*_") {
fieldSuffix := strings.ReplaceAll(v, "*", "")
for k, vt := range data {
if strings.HasSuffix(k, fieldSuffix) {
val = vt
key = k
}
}
} else {
if vd, ok := data[v]; ok {
val = vd
key = v
}
}
case "param":
v := vParts[1]
if strings.Contains(v, "*_") {
fieldSuffix := strings.ReplaceAll(v, "*", "")
for k, vt := range param {
if strings.HasSuffix(k, fieldSuffix) {
val = vt
key = k
}
}
} else {
if vd, ok := param[v]; ok {
val = vd
key = v
}
}
case "query":
v := vParts[1]
if strings.Contains(v, "*_") {
fieldSuffix := strings.ReplaceAll(v, "*", "")
for k, vt := range query {
if strings.HasSuffix(k, fieldSuffix) {
val = vt
key = k
}
}
} else {
if vd, ok := query[v]; ok {
val = vd
key = v
}
}
case "eval":
// connect string except the first one if more than two parts exist
var v string
if len(vParts) > 2 {
v = strings.Join(vParts[1:], ".")
} else {
v = vParts[1]
}
// remove '{{' and '}}'
v = v[2 : len(v)-2]
// parse the expression
p, err := expr.Parse(v)
if err != nil {
return "", nil
}
// evaluate the expression
val, err := p.Eval(data)
if err != nil {
val, err := p.Eval(headerData)
if err == nil {
return v, val
}
return "", nil
} else {
return v, val
}
case "eval_raw", "gorm_eval":
// connect string except the first one if more than two parts exist
var v string
if len(vParts) > 2 {
v = strings.Join(vParts[1:], ".")
} else {
v = vParts[1]
}
// remove '{{' and '}}'
v = v[2 : len(v)-2]
// parse the expression
p, err := expr.Parse(v)
if err != nil {
return "", nil
}
dt := map[string]any{
"header": header,
}
for k, vt := range data {
dt[k] = vt
}
// evaluate the expression
val, err := p.Eval(dt)
if err != nil {
val, err := p.Eval(headerData)
if err == nil {
return v, val
}
return "", nil
} else {
return v, val
}
case "consts":
constG := vParts[1]
if constVal, ok := consts[constG]; ok {
val = constVal
key = v
}
case "enums":
enumG := vParts[1]
if enumGVal, ok := enums[enumG]; ok {
if enumVal, ok := enumGVal[vParts[2]]; ok {
val = enumVal
key = v
}
}
default:
if strings.Contains(v, "*_") {
fieldSuffix := strings.ReplaceAll(v, "*", "")
for k, vt := range data {
if strings.HasSuffix(k, fieldSuffix) {
val = vt
key = k
}
}
} else {
vd, err := dipper.Get(data, v)
if err == nil {
val = vd
key = v
} else {
vd, err := dipper.Get(headerData, v)
if err == nil {
val = vd
key = v
}
}
}
}
return
}
func init() {
// define custom functions for use in config
expr.AddFunction("trim", func(params ...interface{}) (interface{}, error) {
if len(params) == 0 || len(params) > 1 || params[0] == nil {
return nil, errors.New("Invalid number of arguments")
}
val, ok := params[0].(string)
if !ok {
return nil, errors.New("Invalid argument type")
}
return strings.TrimSpace(val), nil
})
expr.AddFunction("upper", func(params ...interface{}) (interface{}, error) {
if len(params) == 0 || len(params) > 1 || params[0] == nil {
return nil, errors.New("Invalid number of arguments")
}
val, ok := params[0].(string)
if !ok {
return nil, errors.New("Invalid argument type")
}
return strings.ToUpper(val), nil
})
expr.AddFunction("lower", func(params ...interface{}) (interface{}, error) {
if len(params) == 0 || len(params) > 1 || params[0] == nil {
return nil, errors.New("Invalid number of arguments")
}
val, ok := params[0].(string)
if !ok {
return nil, errors.New("Invalid argument type")
}
return strings.ToLower(val), nil
})
expr.AddFunction("date", func(params ...interface{}) (interface{}, error) {
if len(params) == 0 || len(params) > 1 || params[0] == nil {
return nil, errors.New("Invalid number of arguments")
}
val, ok := params[0].(string)
if !ok {
return nil, errors.New("Invalid argument type")
}
t, err := date.Parse(val)
if err != nil {
return nil, err
}
return t.Format("2006-01-02"), nil
})
expr.AddFunction("datetime", func(params ...interface{}) (interface{}, error) {
if len(params) == 0 || len(params) > 1 || params[0] == nil {
return nil, errors.New("Invalid number of arguments")
}
val, ok := params[0].(string)
if !ok {
return nil, errors.New("Invalid argument type")
}
t, err := date.Parse(val)
if err != nil {
return nil, err
}
return t.Format(time.RFC3339), nil
})
expr.AddFunction("addSecondsToNow", func(params ...interface{}) (interface{}, error) {
if len(params) == 0 || len(params) > 1 || params[0] == nil {
return nil, errors.New("Invalid number of arguments")
}
// if type of params[0] is not float64 or int, return error
tt, isFloat := params[0].(float64)
if !isFloat {
if _, ok := params[0].(int); !ok {
return nil, errors.New("Invalid argument type")
}
}
// add expiry to the current time
// convert parms[0] to int from float64
if isFloat {
params[0] = int(tt)
}
t := time.Now().UTC()
t = t.Add(time.Duration(params[0].(int)) * time.Second)
return t, nil
})
expr.AddFunction("values", func(params ...interface{}) (interface{}, error) {
if len(params) == 0 || len(params) > 2 {
return nil, errors.New("Invalid number of arguments")
}
// get values from map
mapList, ok := params[0].([]any)
if !ok {
return nil, errors.New("Invalid argument type")
}
keyToGet, hasKey := params[1].(string)
var values []any
if hasKey {
for _, m := range mapList {
mp := m.(map[string]any)
if val, ok := mp[keyToGet]; ok {
values = append(values, val)
}
}
} else {
for _, m := range mapList {
mp := m.(map[string]any)
vals := maps.Values(mp)
values = append(values, vals...)
}
}
return values, nil
})
expr.AddFunction("uniqueid", func(params ...interface{}) (interface{}, error) {
// create a new xid
return xid.New().String(), nil
})
expr.AddFunction("now", func(params ...interface{}) (interface{}, error) {
// get the current time in UTC
return time.Now().UTC(), nil
})
expr.AddFunction("toString", func(params ...interface{}) (interface{}, error) {
if len(params) == 0 || len(params) > 1 || params[0] == nil {
return nil, errors.New("Invalid number of arguments")
}
// convert to string
return fmt.Sprint(params[0]), nil
})
}