diff --git a/broker.go b/broker.go index 7ead9af..82c6384 100644 --- a/broker.go +++ b/broker.go @@ -483,3 +483,7 @@ func (b *Broker) backoffRetry(queue *Queue, task *QueuedTask, delay time.Duratio } return delay } + +func (b *Broker) URL() string { + return b.opts.brokerAddr +} diff --git a/dag/dag.go b/dag/dag.go index abca5dc..4fac96e 100644 --- a/dag/dag.go +++ b/dag/dag.go @@ -126,7 +126,7 @@ func (tm *DAG) GetKey() string { } func (tm *DAG) AssignTopic(topic string) { - tm.consumer = mq.NewConsumer(topic, topic, tm.ProcessTask, mq.WithRespondPendingResult(false)) + tm.consumer = mq.NewConsumer(topic, topic, tm.ProcessTask, mq.WithRespondPendingResult(false), mq.WithBrokerURL(tm.server.URL())) tm.consumerTopic = topic }