diff --git a/examples/dag.go b/examples/dag.go index b3c0c20..08c91c0 100644 --- a/examples/dag.go +++ b/examples/dag.go @@ -4,15 +4,17 @@ import ( "context" "encoding/json" "fmt" - "log" - "time" - "github.com/oarkflow/mq" "github.com/oarkflow/mq/dag" + "io" + "log" + "net/http" ) +var d *dag.DAG + func main() { - d := dag.New() + d = dag.New() d.AddNode("queue1", func(ctx context.Context, task mq.Task) mq.Result { return mq.Result{Payload: task.Payload, MessageID: task.ID} }) @@ -39,18 +41,44 @@ func main() { bt, _ := json.Marshal(payload) return mq.Result{Payload: bt, MessageID: task.ID} }) - d.AddEdge("queue1", "queue2") d.AddLoop("queue2", "queue3") d.AddEdge("queue2", "queue4") go func() { - time.Sleep(2 * time.Second) - finalResult := d.Send([]byte(`[{"user_id": 1}, {"user_id": 2}]`)) - log.Printf("Result received: %s %s", finalResult.MessageID, string(finalResult.Payload)) + err := d.Start(context.TODO()) + if err != nil { + panic(err) + } }() - - err := d.Start(context.TODO()) - if err != nil { - panic(err) - } + http.HandleFunc("/send-task", sendTaskHandler) + log.Println("HTTP server started on http://localhost:8083") + log.Fatal(http.ListenAndServe(":8083", nil)) +} +func sendTaskHandler(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "Invalid request method", http.StatusMethodNotAllowed) + return + } + var payload []byte + if r.Body != nil { + defer r.Body.Close() + var err error + payload, err = io.ReadAll(r.Body) + if err != nil { + http.Error(w, "Failed to read request body", http.StatusBadRequest) + return + } + } else { + http.Error(w, "Empty request body", http.StatusBadRequest) + return + } + finalResult := d.Send(payload) + w.Header().Set("Content-Type", "application/json") + result := map[string]any{ + "message_id": finalResult.MessageID, + "payload": string(finalResult.Payload), + "error": finalResult.Error, + } + + json.NewEncoder(w).Encode(result) } diff --git a/options.go b/options.go index cf23d7c..207f04d 100644 --- a/options.go +++ b/options.go @@ -20,7 +20,7 @@ type Options struct { func defaultOptions() Options { return Options{ - syncMode: true, + syncMode: false, brokerAddr: ":8080", maxRetries: 5, initialDelay: 2 * time.Second,