feat: update

This commit is contained in:
sujit
2024-10-13 11:02:55 +05:45
parent f5ce142544
commit 2f39fb40d7
6 changed files with 54 additions and 17 deletions

View File

@@ -38,11 +38,11 @@ type Broker struct {
queues xsync.IMap[string, *Queue]
consumers xsync.IMap[string, *consumer]
publishers xsync.IMap[string, *publisher]
opts Options
opts *Options
}
func NewBroker(opts ...Option) *Broker {
options := setupOptions(opts...)
options := SetupOptions(opts...)
return &Broker{
queues: xsync.NewMap[string, *Queue](),
publishers: xsync.NewMap[string, *publisher](),
@@ -51,7 +51,7 @@ func NewBroker(opts ...Option) *Broker {
}
}
func (b *Broker) Options() Options {
func (b *Broker) Options() *Options {
return b.opts
}

View File

@@ -32,11 +32,11 @@ type Consumer struct {
pool *Pool
id string
queue string
opts Options
opts *Options
}
func NewConsumer(id string, queue string, handler Handler, opts ...Option) *Consumer {
options := setupOptions(opts...)
options := SetupOptions(opts...)
return &Consumer{
id: id,
opts: options,

32
examples/dag_consumer.go Normal file
View File

@@ -0,0 +1,32 @@
package main
import (
"context"
"github.com/oarkflow/mq"
"github.com/oarkflow/mq/examples/tasks"
"github.com/oarkflow/mq/dag"
)
func main() {
d := dag.NewDAG("Sample DAG", "sample-dag",
mq.WithSyncMode(true),
mq.WithNotifyResponse(tasks.NotifyResponse),
mq.WithSecretKey([]byte("wKWa6GKdBd0njDKNQoInBbh6P0KTjmob")),
)
d.AddNode("C", "C", tasks.Node3, true)
d.AddNode("D", "D", tasks.Node4)
d.AddNode("E", "E", tasks.Node5)
d.AddNode("F", "F", tasks.Node6)
d.AddNode("G", "G", tasks.Node7)
d.AddNode("H", "H", tasks.Node8)
d.AddCondition("C", map[dag.When]dag.Then{"PASS": "D", "FAIL": "E"})
d.AddEdge("Label 1", "B", "C")
d.AddEdge("Label 2", "D", "F")
d.AddEdge("Label 3", "E", "F")
d.AddEdge("Label 4", "F", "G", "H")
// d.AssignTopic("queue1")
d.Consume(context.Background())
}

View File

@@ -3,24 +3,25 @@ package main
import (
"context"
"fmt"
mq2 "github.com/oarkflow/mq"
"time"
"github.com/oarkflow/mq"
)
func main() {
payload := []byte(`{"message":"Message Publisher \n Task"}`)
task := mq2.Task{
payload := []byte(`{"user_id": 2, "age": 34}`)
task := mq.Task{
Payload: payload,
}
publisher := mq2.NewPublisher("publish-1")
publisher := mq.NewPublisher("publish-1")
// publisher := mq.NewPublisher("publish-1", mq.WithTLS(true, "./certs/server.crt", "./certs/server.key"))
err := publisher.Publish(context.Background(), task, "queue1")
if err != nil {
panic(err)
}
fmt.Println("Async task published successfully")
payload = []byte(`{"message":"Fire-and-Forget \n Task"}`)
task = mq2.Task{
payload = []byte(`{"user_id": 2, "age": 34}`)
task = mq.Task{
Payload: payload,
}
for i := 0; i < 100; i++ {

View File

@@ -81,8 +81,12 @@ type Options struct {
respondPendingResult bool
}
func defaultOptions() Options {
return Options{
func (o *Options) SetSyncMode(sync bool) {
o.syncMode = sync
}
func defaultOptions() *Options {
return &Options{
brokerAddr: ":8080",
maxRetries: 5,
respondPendingResult: true,
@@ -99,10 +103,10 @@ func defaultOptions() Options {
// Option defines a function type for setting options.
type Option func(*Options)
func setupOptions(opts ...Option) Options {
func SetupOptions(opts ...Option) *Options {
options := defaultOptions()
for _, opt := range opts {
opt(&options)
opt(options)
}
return options
}

View File

@@ -15,11 +15,11 @@ import (
type Publisher struct {
id string
opts Options
opts *Options
}
func NewPublisher(id string, opts ...Option) *Publisher {
options := setupOptions(opts...)
options := SetupOptions(opts...)
return &Publisher{id: id, opts: options}
}