From f5bd30b801b944daed473e97bfc749badcc47d1d Mon Sep 17 00:00:00 2001 From: sujit Date: Wed, 30 Oct 2024 12:57:49 +0545 Subject: [PATCH] feat: add task completion --- broker.go | 4 ++++ dag/dag.go | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/broker.go b/broker.go index 7ead9af..82c6384 100644 --- a/broker.go +++ b/broker.go @@ -483,3 +483,7 @@ func (b *Broker) backoffRetry(queue *Queue, task *QueuedTask, delay time.Duratio } return delay } + +func (b *Broker) URL() string { + return b.opts.brokerAddr +} diff --git a/dag/dag.go b/dag/dag.go index abca5dc..4fac96e 100644 --- a/dag/dag.go +++ b/dag/dag.go @@ -126,7 +126,7 @@ func (tm *DAG) GetKey() string { } func (tm *DAG) AssignTopic(topic string) { - tm.consumer = mq.NewConsumer(topic, topic, tm.ProcessTask, mq.WithRespondPendingResult(false)) + tm.consumer = mq.NewConsumer(topic, topic, tm.ProcessTask, mq.WithRespondPendingResult(false), mq.WithBrokerURL(tm.server.URL())) tm.consumerTopic = topic }