diff --git a/README.md b/README.md new file mode 100644 index 0000000..1318a51 --- /dev/null +++ b/README.md @@ -0,0 +1,221 @@ +# Introduction MQ (Message Queue Broker) + +A simple Pub/Sub system memory based task processing. It uses centralized server to manage consumers and publishers. + + +## Examples: + +[tasks.go](./examples/tasks/tasks.go) + +```go +package tasks + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/oarkflow/mq" +) + +func Node1(ctx context.Context, task mq.Task) mq.Result { + fmt.Println("Processing queue1") + return mq.Result{Payload: task.Payload, MessageID: task.ID} +} + +func Node2(ctx context.Context, task mq.Task) mq.Result { + return mq.Result{Payload: task.Payload, MessageID: task.ID} +} + +func Node3(ctx context.Context, task mq.Task) mq.Result { + var data map[string]any + err := json.Unmarshal(task.Payload, &data) + if err != nil { + return mq.Result{Error: err} + } + data["salary"] = fmt.Sprintf("12000%v", data["user_id"]) + bt, _ := json.Marshal(data) + return mq.Result{Payload: bt, MessageID: task.ID} +} + +func Node4(ctx context.Context, task mq.Task) mq.Result { + var data []map[string]any + err := json.Unmarshal(task.Payload, &data) + if err != nil { + return mq.Result{Error: err} + } + payload := map[string]any{"storage": data} + bt, _ := json.Marshal(payload) + return mq.Result{Payload: bt, MessageID: task.ID} +} + +func Callback(ctx context.Context, task mq.Result) mq.Result { + fmt.Println("Received task", task.MessageID, "Payload", string(task.Payload), task.Error, task.Queue) + return mq.Result{} +} +``` + +## Start Server + +[server.go](./examples/server.go) + +```go +package main + +import ( + "context" + + "github.com/oarkflow/mq" + "github.com/oarkflow/mq/examples/tasks" +) + +func main() { + b := mq.NewBroker(mq.WithCallback(tasks.Callback)) + b.NewQueue("queue1") + b.NewQueue("queue2") + b.Start(context.Background()) +} +``` + +## Start Consumer + +[consumer.go](./examples/consumer.go) + +```go +package main + +import ( + "context" + + "github.com/oarkflow/mq" + "github.com/oarkflow/mq/examples/tasks" +) + +func main() { + consumer := mq.NewConsumer("consumer-1") + consumer.RegisterHandler("queue1", tasks.Node1) + consumer.RegisterHandler("queue2", tasks.Node2) + consumer.Consume(context.Background()) +} +``` + +## Publish tasks + +[publisher.go](./examples/publisher.go) + +```go +package main + +import ( + "context" + "fmt" + + "github.com/oarkflow/mq" +) + +func main() { + payload := []byte(`{"message":"Message Publisher \n Task"}`) + task := mq.Task{ + Payload: payload, + } + publisher := mq.NewPublisher("publish-1") + err := publisher.Publish(context.Background(), "queue1", task) + if err != nil { + panic(err) + } + fmt.Println("Async task published successfully") + payload = []byte(`{"message":"Fire-and-Forget \n Task"}`) + task = mq.Task{ + Payload: payload, + } + result, err := publisher.Request(context.Background(), "queue1", task) + if err != nil { + panic(err) + } + fmt.Printf("Sync task published. Result: %v\n", string(result.Payload)) +} +``` + +# DAG (Directed Acyclic Graph) + +In this package, you can use the `DAG` feature to create a directed acyclic graph of tasks. The `DAG` feature allows you to define a sequence of tasks that need to be executed in a specific order. + +## Example + +[dag.go](./examples/dag.go) + +```go +package main + +import ( + "context" + "encoding/json" + "io" + "log" + "net/http" + + "github.com/oarkflow/mq" + "github.com/oarkflow/mq/dag" + "github.com/oarkflow/mq/examples/tasks" +) + +var d *dag.DAG + +func main() { + d = dag.New() + d.AddNode("queue1", tasks.Node1) + d.AddNode("queue2", tasks.Node2) + d.AddNode("queue3", tasks.Node3) + d.AddNode("queue4", tasks.Node4) + + d.AddEdge("queue1", "queue2") + d.AddLoop("queue2", "queue3") + d.AddEdge("queue2", "queue4") + d.Prepare() + go func() { + err := d.Start(context.TODO()) + if err != nil { + panic(err) + } + }() + http.HandleFunc("/publish", requestHandler("publish")) + http.HandleFunc("/request", requestHandler("request")) + log.Println("HTTP server started on http://localhost:8083") + log.Fatal(http.ListenAndServe(":8083", nil)) +} + +func requestHandler(requestType string) func(w http.ResponseWriter, r *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "Invalid request method", http.StatusMethodNotAllowed) + return + } + var payload []byte + if r.Body != nil { + defer r.Body.Close() + var err error + payload, err = io.ReadAll(r.Body) + if err != nil { + http.Error(w, "Failed to read request body", http.StatusBadRequest) + return + } + } else { + http.Error(w, "Empty request body", http.StatusBadRequest) + return + } + var rs mq.Result + if requestType == "request" { + rs = d.Request(context.Background(), payload) + } else { + rs = d.Send(context.Background(), payload) + } + w.Header().Set("Content-Type", "application/json") + result := map[string]any{ + "message_id": rs.MessageID, + "payload": string(rs.Payload), + "error": rs.Error, + } + json.NewEncoder(w).Encode(result) + } +} +``` \ No newline at end of file diff --git a/examples/consumer.go b/examples/consumer.go index fa0d23d..708a0cd 100644 --- a/examples/consumer.go +++ b/examples/consumer.go @@ -2,20 +2,14 @@ package main import ( "context" - "fmt" "github.com/oarkflow/mq" + "github.com/oarkflow/mq/examples/tasks" ) func main() { consumer := mq.NewConsumer("consumer-1") - consumer.RegisterHandler("queue1", func(ctx context.Context, task mq.Task) mq.Result { - fmt.Println("Handling task for queue1:", string(task.Payload)) - return mq.Result{Payload: []byte(`{"task": 123}`), MessageID: task.ID} - }) - consumer.RegisterHandler("queue2", func(ctx context.Context, task mq.Task) mq.Result { - fmt.Println("Handling task for queue2:", task.ID) - return mq.Result{Payload: task.Payload, MessageID: task.ID} - }) + consumer.RegisterHandler("queue1", tasks.Node1) + consumer.RegisterHandler("queue2", tasks.Node2) consumer.Consume(context.Background()) } diff --git a/examples/publisher.go b/examples/publisher.go index f1a9e9b..a74281b 100644 --- a/examples/publisher.go +++ b/examples/publisher.go @@ -2,43 +2,29 @@ package main import ( "context" - "encoding/json" "fmt" "github.com/oarkflow/mq" ) func main() { - publishAsync() - publishSync() -} - -func publishAsync() error { - taskPayload := map[string]string{"message": "Fire-and-Forget \n Task"} - payload, _ := json.Marshal(taskPayload) + payload := []byte(`{"message":"Message Publisher \n Task"}`) task := mq.Task{ Payload: payload, } publisher := mq.NewPublisher("publish-1") err := publisher.Publish(context.Background(), "queue1", task) if err != nil { - return fmt.Errorf("failed to publish async task: %w", err) + panic(err) } fmt.Println("Async task published successfully") - return nil -} - -func publishSync() error { - taskPayload := map[string]string{"message": "Request/Response \n Task"} - payload, _ := json.Marshal(taskPayload) - task := mq.Task{ + payload = []byte(`{"message":"Fire-and-Forget \n Task"}`) + task = mq.Task{ Payload: payload, } - publisher := mq.NewPublisher("publish-2") result, err := publisher.Request(context.Background(), "queue1", task) if err != nil { - return fmt.Errorf("failed to publish sync task: %w", err) + panic(err) } fmt.Printf("Sync task published. Result: %v\n", string(result.Payload)) - return nil } diff --git a/examples/server.go b/examples/server.go index bad655d..91aa3e5 100644 --- a/examples/server.go +++ b/examples/server.go @@ -2,16 +2,13 @@ package main import ( "context" - "fmt" "github.com/oarkflow/mq" + "github.com/oarkflow/mq/examples/tasks" ) func main() { - b := mq.NewBroker(mq.WithCallback(func(ctx context.Context, task mq.Result) mq.Result { - fmt.Println("Received task", task.MessageID, "Payload", string(task.Payload), task.Error, task.Queue) - return mq.Result{} - })) + b := mq.NewBroker(mq.WithCallback(tasks.Callback)) b.NewQueue("queue1") b.NewQueue("queue2") b.Start(context.Background()) diff --git a/examples/tasks/tasks.go b/examples/tasks/tasks.go index e37b84b..42e11a2 100644 --- a/examples/tasks/tasks.go +++ b/examples/tasks/tasks.go @@ -38,3 +38,8 @@ func Node4(ctx context.Context, task mq.Task) mq.Result { bt, _ := json.Marshal(payload) return mq.Result{Payload: bt, MessageID: task.ID} } + +func Callback(ctx context.Context, task mq.Result) mq.Result { + fmt.Println("Received task", task.MessageID, "Payload", string(task.Payload), task.Error, task.Queue) + return mq.Result{} +}