feat: separate broker

This commit is contained in:
Oarkflow
2024-09-29 20:58:33 +05:45
parent fb7e1e77c4
commit 014d2d17c6
2 changed files with 42 additions and 14 deletions

View File

@@ -4,15 +4,17 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"log"
"time"
"github.com/oarkflow/mq" "github.com/oarkflow/mq"
"github.com/oarkflow/mq/dag" "github.com/oarkflow/mq/dag"
"io"
"log"
"net/http"
) )
var d *dag.DAG
func main() { func main() {
d := dag.New() d = dag.New()
d.AddNode("queue1", func(ctx context.Context, task mq.Task) mq.Result { d.AddNode("queue1", func(ctx context.Context, task mq.Task) mq.Result {
return mq.Result{Payload: task.Payload, MessageID: task.ID} return mq.Result{Payload: task.Payload, MessageID: task.ID}
}) })
@@ -39,18 +41,44 @@ func main() {
bt, _ := json.Marshal(payload) bt, _ := json.Marshal(payload)
return mq.Result{Payload: bt, MessageID: task.ID} return mq.Result{Payload: bt, MessageID: task.ID}
}) })
d.AddEdge("queue1", "queue2") d.AddEdge("queue1", "queue2")
d.AddLoop("queue2", "queue3") d.AddLoop("queue2", "queue3")
d.AddEdge("queue2", "queue4") d.AddEdge("queue2", "queue4")
go func() { go func() {
time.Sleep(2 * time.Second) err := d.Start(context.TODO())
finalResult := d.Send([]byte(`[{"user_id": 1}, {"user_id": 2}]`)) if err != nil {
log.Printf("Result received: %s %s", finalResult.MessageID, string(finalResult.Payload)) panic(err)
}
}() }()
http.HandleFunc("/send-task", sendTaskHandler)
err := d.Start(context.TODO()) log.Println("HTTP server started on http://localhost:8083")
if err != nil { log.Fatal(http.ListenAndServe(":8083", nil))
panic(err) }
} func sendTaskHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Invalid request method", http.StatusMethodNotAllowed)
return
}
var payload []byte
if r.Body != nil {
defer r.Body.Close()
var err error
payload, err = io.ReadAll(r.Body)
if err != nil {
http.Error(w, "Failed to read request body", http.StatusBadRequest)
return
}
} else {
http.Error(w, "Empty request body", http.StatusBadRequest)
return
}
finalResult := d.Send(payload)
w.Header().Set("Content-Type", "application/json")
result := map[string]any{
"message_id": finalResult.MessageID,
"payload": string(finalResult.Payload),
"error": finalResult.Error,
}
json.NewEncoder(w).Encode(result)
} }

View File

@@ -20,7 +20,7 @@ type Options struct {
func defaultOptions() Options { func defaultOptions() Options {
return Options{ return Options{
syncMode: true, syncMode: false,
brokerAddr: ":8080", brokerAddr: ":8080",
maxRetries: 5, maxRetries: 5,
initialDelay: 2 * time.Second, initialDelay: 2 * time.Second,