From aa324f4435ecaf492fdd532410111b4538053a0a Mon Sep 17 00:00:00 2001 From: Oarkflow Date: Wed, 9 Oct 2024 11:29:06 +0545 Subject: [PATCH] feat: separate broker --- README.md | 139 ++++++++++++++++++++++++++++++------------------------ 1 file changed, 77 insertions(+), 62 deletions(-) diff --git a/README.md b/README.md index f2a5445..f7c0180 100644 --- a/README.md +++ b/README.md @@ -28,45 +28,63 @@ import ( "context" "encoding/json" "fmt" - + "log" + "github.com/oarkflow/mq" ) -func Node1(ctx context.Context, task mq.Task) mq.Result { - fmt.Println("Processing queue1") - return mq.Result{Payload: task.Payload, MessageID: task.ID} +func Node1(ctx context.Context, task *mq.Task) mq.Result { + return mq.Result{Payload: task.Payload, TaskID: task.ID} } -func Node2(ctx context.Context, task mq.Task) mq.Result { - return mq.Result{Payload: task.Payload, MessageID: task.ID} +func Node2(ctx context.Context, task *mq.Task) mq.Result { + return mq.Result{Payload: task.Payload, TaskID: task.ID} } -func Node3(ctx context.Context, task mq.Task) mq.Result { - var data map[string]any - err := json.Unmarshal(task.Payload, &data) - if err != nil { - return mq.Result{Error: err} +func Node3(ctx context.Context, task *mq.Task) mq.Result { + var user map[string]any + json.Unmarshal(task.Payload, &user) + age := int(user["age"].(float64)) + status := "FAIL" + if age > 20 { + status = "PASS" } - data["salary"] = fmt.Sprintf("12000%v", data["user_id"]) - bt, _ := json.Marshal(data) - return mq.Result{Payload: bt, MessageID: task.ID} + user["status"] = status + resultPayload, _ := json.Marshal(user) + return mq.Result{Payload: resultPayload, Status: status} } -func Node4(ctx context.Context, task mq.Task) mq.Result { - var data []map[string]any - err := json.Unmarshal(task.Payload, &data) - if err != nil { - return mq.Result{Error: err} - } - payload := map[string]any{"storage": data} - bt, _ := json.Marshal(payload) - return mq.Result{Payload: bt, MessageID: task.ID} +func Node4(ctx context.Context, task *mq.Task) mq.Result { + var user map[string]any + json.Unmarshal(task.Payload, &user) + user["final"] = "D" + resultPayload, _ := json.Marshal(user) + return mq.Result{Payload: resultPayload} +} + +func Node5(ctx context.Context, task *mq.Task) mq.Result { + var user map[string]any + json.Unmarshal(task.Payload, &user) + user["salary"] = "E" + resultPayload, _ := json.Marshal(user) + return mq.Result{Payload: resultPayload} +} + +func Node6(ctx context.Context, task *mq.Task) mq.Result { + var user map[string]any + json.Unmarshal(task.Payload, &user) + resultPayload, _ := json.Marshal(map[string]any{"storage": user}) + return mq.Result{Payload: resultPayload} } func Callback(ctx context.Context, task mq.Result) mq.Result { - fmt.Println("Received task", task.MessageID, "Payload", string(task.Payload), task.Error, task.Queue) + fmt.Println("Received task", task.TaskID, "Payload", string(task.Payload), task.Error, task.Topic) return mq.Result{} } + +func NotifyResponse(ctx context.Context, result mq.Result) { + log.Printf("DAG Final response: TaskID: %s, Payload: %s, Topic: %s", result.TaskID, result.Payload, result.Topic) +} ``` ## Start Server @@ -100,16 +118,18 @@ package main import ( "context" - + "github.com/oarkflow/mq" + "github.com/oarkflow/mq/examples/tasks" ) func main() { - consumer := mq.NewConsumer("consumer-1") - consumer.RegisterHandler("queue1", tasks.Node1) - consumer.RegisterHandler("queue2", tasks.Node2) - consumer.Consume(context.Background()) + consumer1 := mq.NewConsumer("consumer-1", "queue1", tasks.Node1) + consumer2 := mq.NewConsumer("consumer-2", "queue2", tasks.Node2) + // consumer := mq.NewConsumer("consumer-1", mq.WithTLS(true, "./certs/server.crt", "./certs/server.key")) + go consumer1.Consume(context.Background()) + consumer2.Consume(context.Background()) } ``` @@ -164,38 +184,38 @@ package main import ( "context" "encoding/json" + "github.com/oarkflow/mq/consts" + "github.com/oarkflow/mq/examples/tasks" "io" - "log" "net/http" "github.com/oarkflow/mq" "github.com/oarkflow/mq/dag" - "github.com/oarkflow/mq/examples/tasks" ) -var d *dag.DAG +var ( + d = dag.NewDAG(mq.WithSyncMode(false), mq.WithNotifyResponse(tasks.NotifyResponse)) + // d = dag.NewDAG(mq.WithSyncMode(true), mq.WithTLS(true, "./certs/server.crt", "./certs/server.key"), mq.WithCAPath("./certs/ca.cert")) +) func main() { - d = dag.New() - d.AddNode("queue1", tasks.Node1) - d.AddNode("queue2", tasks.Node2) - d.AddNode("queue3", tasks.Node3) - d.AddNode("queue4", tasks.Node4) - - d.AddEdge("queue1", "queue2") - d.AddLoop("queue2", "queue3") - d.AddEdge("queue2", "queue4") - d.Prepare() - go func() { - err := d.Start(context.TODO()) - if err != nil { - panic(err) - } - }() - http.HandleFunc("/publish", requestHandler("publish")) - http.HandleFunc("/request", requestHandler("request")) - log.Println("HTTP server started on http://localhost:8083") - log.Fatal(http.ListenAndServe(":8083", nil)) + d.AddNode("A", tasks.Node1, true) + d.AddNode("B", tasks.Node2) + d.AddNode("C", tasks.Node3) + d.AddNode("D", tasks.Node4) + d.AddNode("E", tasks.Node5) + d.AddNode("F", tasks.Node6) + d.AddEdge("A", "B", dag.LoopEdge) + d.AddCondition("C", map[string]string{"PASS": "D", "FAIL": "E"}) + d.AddEdge("B", "C") + d.AddEdge("D", "F") + d.AddEdge("E", "F") + http.HandleFunc("POST /publish", requestHandler("publish")) + http.HandleFunc("POST /request", requestHandler("request")) + err := d.Start(context.TODO(), ":8083") + if err != nil { + panic(err) + } } func requestHandler(requestType string) func(w http.ResponseWriter, r *http.Request) { @@ -217,19 +237,14 @@ func requestHandler(requestType string) func(w http.ResponseWriter, r *http.Requ http.Error(w, "Empty request body", http.StatusBadRequest) return } - var rs mq.Result + ctx := context.Background() if requestType == "request" { - rs = d.Request(context.Background(), payload) - } else { - rs = d.Send(context.Background(), payload) + ctx = mq.SetHeaders(ctx, map[string]string{consts.AwaitResponseKey: "true"}) } + // ctx = context.WithValue(ctx, "initial_node", "E") + rs := d.ProcessTask(ctx, payload) w.Header().Set("Content-Type", "application/json") - result := map[string]any{ - "message_id": rs.MessageID, - "payload": string(rs.Payload), - "error": rs.Error, - } - json.NewEncoder(w).Encode(result) + json.NewEncoder(w).Encode(rs) } } ```