fix: pull job publisher panic

This commit is contained in:
langhuihui
2025-09-22 19:33:44 +08:00
parent 7f05a1f24d
commit 4e6abef720

View File

@@ -239,13 +239,15 @@ func (p *PullJob) Publish() (err error) {
if len(p.Connection.Args) > 0 { if len(p.Connection.Args) > 0 {
streamPath += "?" + p.Connection.Args.Encode() streamPath += "?" + p.Connection.Args.Encode()
} }
p.Publisher, err = p.Plugin.PublishWithConfig(p.puller, streamPath, p.PublishConfig) var publisher *Publisher
publisher, err = p.Plugin.PublishWithConfig(p.puller, streamPath, p.PublishConfig)
if err == nil { if err == nil {
p.Publisher.OnDispose(func() { p.Publisher = publisher
if p.Publisher.StopReasonIs(pkg.ErrPublishDelayCloseTimeout, task.ErrStopByUser) || p.MaxRetry == 0 { publisher.OnDispose(func() {
p.Stop(p.Publisher.StopReason()) if publisher.StopReasonIs(pkg.ErrPublishDelayCloseTimeout, task.ErrStopByUser) || p.MaxRetry == 0 {
p.Stop(publisher.StopReason())
} else { } else {
p.puller.Stop(p.Publisher.StopReason()) p.puller.Stop(publisher.StopReason())
} }
}) })
} }