feat: separate broker

This commit is contained in:
Oarkflow
2024-09-29 18:44:22 +05:45
parent 3269351988
commit fb7e1e77c4
5 changed files with 56 additions and 14 deletions

View File

@@ -129,6 +129,10 @@ func (b *Broker) Send(ctx context.Context, cmd Command) error {
return nil return nil
} }
func (b *Broker) SyncMode() bool {
return b.opts.syncMode
}
func (b *Broker) sendToPublisher(ctx context.Context, publisherID string, result Result) error { func (b *Broker) sendToPublisher(ctx context.Context, publisherID string, result Result) error {
pub, ok := b.publishers.Get(publisherID) pub, ok := b.publishers.Get(publisherID)
if !ok { if !ok {

View File

@@ -3,6 +3,7 @@ package dag
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"fmt"
"sync" "sync"
"github.com/oarkflow/mq" "github.com/oarkflow/mq"
@@ -17,6 +18,7 @@ type taskContext struct {
} }
type DAG struct { type DAG struct {
FirstNode string
server *mq.Broker server *mq.Broker
nodes map[string]*mq.Consumer nodes map[string]*mq.Consumer
edges map[string][]string edges map[string][]string
@@ -39,8 +41,11 @@ func New(opts ...mq.Option) *DAG {
return d return d
} }
func (d *DAG) AddNode(name string, handler mq.Handler) { func (d *DAG) AddNode(name string, handler mq.Handler, firstNode ...bool) {
con := mq.NewConsumer(name) con := mq.NewConsumer(name)
if len(firstNode) > 0 {
d.FirstNode = name
}
con.RegisterHandler(name, handler) con.RegisterHandler(name, handler)
d.nodes[name] = con d.nodes[name] = con
} }
@@ -54,6 +59,15 @@ func (d *DAG) AddLoop(fromNode string, toNode ...string) {
} }
func (d *DAG) Start(ctx context.Context) error { func (d *DAG) Start(ctx context.Context) error {
if d.FirstNode == "" {
firstNode, ok := d.FindFirstNode()
if ok && firstNode != "" {
d.FirstNode = firstNode
}
}
if d.server.SyncMode() {
return nil
}
for _, con := range d.nodes { for _, con := range d.nodes {
go con.Consume(ctx) go con.Consume(ctx)
} }
@@ -70,11 +84,37 @@ func (d *DAG) PublishTask(ctx context.Context, payload []byte, queueName string,
return d.server.Publish(ctx, task, queueName) return d.server.Publish(ctx, task, queueName)
} }
func (d *DAG) FindFirstNode() (string, bool) {
inDegree := make(map[string]int)
for n, _ := range d.nodes {
inDegree[n] = 0
}
for _, targets := range d.edges {
for _, outNode := range targets {
inDegree[outNode]++
}
}
for _, targets := range d.loopEdges {
for _, outNode := range targets {
inDegree[outNode]++
}
}
for n, count := range inDegree {
if count == 0 {
return n, true
}
}
return "", false
}
func (d *DAG) Send(payload []byte) mq.Result { func (d *DAG) Send(payload []byte) mq.Result {
if d.FirstNode == "" {
return mq.Result{Error: fmt.Errorf("initial node not defined")}
}
resultCh := make(chan mq.Result) resultCh := make(chan mq.Result)
task, err := d.PublishTask(context.TODO(), payload, "queue2") task, err := d.PublishTask(context.TODO(), payload, d.FirstNode)
if err != nil { if err != nil {
panic(err) return mq.Result{Error: err}
} }
d.mu.Lock() d.mu.Lock()
d.taskChMap[task.ID] = resultCh d.taskChMap[task.ID] = resultCh

View File

@@ -43,7 +43,6 @@ func main() {
d.AddEdge("queue1", "queue2") d.AddEdge("queue1", "queue2")
d.AddLoop("queue2", "queue3") d.AddLoop("queue2", "queue3")
d.AddEdge("queue2", "queue4") d.AddEdge("queue2", "queue4")
go func() { go func() {
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
finalResult := d.Send([]byte(`[{"user_id": 1}, {"user_id": 2}]`)) finalResult := d.Send([]byte(`[{"user_id": 1}, {"user_id": 2}]`))

View File

@@ -13,42 +13,32 @@ func main() {
publishSync() publishSync()
} }
// publishAsync sends a task in Fire-and-Forget (async) mode
func publishAsync() error { func publishAsync() error {
taskPayload := map[string]string{"message": "Fire-and-Forget \n Task"} taskPayload := map[string]string{"message": "Fire-and-Forget \n Task"}
payload, _ := json.Marshal(taskPayload) payload, _ := json.Marshal(taskPayload)
task := mq.Task{ task := mq.Task{
Payload: payload, Payload: payload,
} }
// Create publisher and send the task without waiting for a result
publisher := mq.NewPublisher("publish-1") publisher := mq.NewPublisher("publish-1")
err := publisher.Publish(context.Background(), "queue1", task) err := publisher.Publish(context.Background(), "queue1", task)
if err != nil { if err != nil {
return fmt.Errorf("failed to publish async task: %w", err) return fmt.Errorf("failed to publish async task: %w", err)
} }
fmt.Println("Async task published successfully") fmt.Println("Async task published successfully")
return nil return nil
} }
// publishSync sends a task in Request/Response (sync) mode
func publishSync() error { func publishSync() error {
taskPayload := map[string]string{"message": "Request/Response \n Task"} taskPayload := map[string]string{"message": "Request/Response \n Task"}
payload, _ := json.Marshal(taskPayload) payload, _ := json.Marshal(taskPayload)
task := mq.Task{ task := mq.Task{
Payload: payload, Payload: payload,
} }
// Create publisher and send the task, waiting for the result
publisher := mq.NewPublisher("publish-2") publisher := mq.NewPublisher("publish-2")
result, err := publisher.Request(context.Background(), "queue1", task) result, err := publisher.Request(context.Background(), "queue1", task)
if err != nil { if err != nil {
return fmt.Errorf("failed to publish sync task: %w", err) return fmt.Errorf("failed to publish sync task: %w", err)
} }
fmt.Printf("Sync task published. Result: %v\n", string(result.Payload)) fmt.Printf("Sync task published. Result: %v\n", string(result.Payload))
return nil return nil
} }

View File

@@ -6,6 +6,7 @@ import (
) )
type Options struct { type Options struct {
syncMode bool
brokerAddr string brokerAddr string
messageHandler MessageHandler messageHandler MessageHandler
closeHandler CloseHandler closeHandler CloseHandler
@@ -19,6 +20,7 @@ type Options struct {
func defaultOptions() Options { func defaultOptions() Options {
return Options{ return Options{
syncMode: true,
brokerAddr: ":8080", brokerAddr: ":8080",
maxRetries: 5, maxRetries: 5,
initialDelay: 2 * time.Second, initialDelay: 2 * time.Second,
@@ -37,6 +39,13 @@ func WithBrokerURL(url string) Option {
} }
} }
// WithSyncMode -
func WithSyncMode(mode bool) Option {
return func(opts *Options) {
opts.syncMode = mode
}
}
// WithMaxRetries - // WithMaxRetries -
func WithMaxRetries(val int) Option { func WithMaxRetries(val int) Option {
return func(opts *Options) { return func(opts *Options) {