diff --git a/README.md b/README.md index 87df892..f753e85 100644 --- a/README.md +++ b/README.md @@ -9,10 +9,10 @@ package main import ( "context" - "m7s.live/m7s/v5" - _ "m7s.live/m7s/v5/plugin/debug" - _ "m7s.live/m7s/v5/plugin/flv" - _ "m7s.live/m7s/v5/plugin/rtmp" + "m7s.live/v5" + _ "m7s.live/v5/plugin/debug" + _ "m7s.live/v5/plugin/flv" + _ "m7s.live/v5/plugin/rtmp" ) func main() { @@ -25,7 +25,10 @@ func main() { | Build Tag | Description | |-----------|-------------| | disable_rm | Disables the memory pool | -| sqlite | Enables the sqlite DB | +| sqlite | Enables the sqlite DB | +| sqliteCGO | Enables the sqlite cgo version DB | +| mysql | Enables the mysql DB | +| postgres | Enables the postgres DB | | duckdb | Enables the duckdb DB | | taskpanic | Throws panic, for testing | diff --git a/README_CN.md b/README_CN.md index 0b3411f..d9bfe74 100644 --- a/README_CN.md +++ b/README_CN.md @@ -24,7 +24,9 @@ func main() { |-----------|-----------------| | disable_rm | 禁用内存池 | | sqlite | 启用 sqlite | -|sqliteCGO | 启用 sqlite cgo版本 | +| sqliteCGO | 启用 sqlite cgo版本 | +| mysql | 启用 mysql | +| postgres | 启用 postgres | | duckdb | 启用 duckdb | | taskpanic | 抛出 panic,用于测试 | @@ -36,3 +38,13 @@ func main() { # 创建插件 到 plugin 目录下查看 README_CN.md + +# Prometheus + +```yaml +scrape_configs: + - job_name: "monibuca" + metrics_path: "/api/metrics" + static_configs: + - targets: ["localhost:8080"] +``` diff --git a/example/8080/pull_hls.yaml b/example/8080/pull_hls.yaml new file mode 100644 index 0000000..d318857 --- /dev/null +++ b/example/8080/pull_hls.yaml @@ -0,0 +1,5 @@ +global: + loglevel: debug +hls: + pull: + live/test: http://live4.nbs.cn/channels/njtv/glgc/hd.m3u8 diff --git a/example/default/main.go b/example/default/main.go index c0c02a5..07924ee 100644 --- a/example/default/main.go +++ b/example/default/main.go @@ -10,6 +10,7 @@ import ( _ "m7s.live/v5/plugin/debug" _ "m7s.live/v5/plugin/flv" _ "m7s.live/v5/plugin/gb28181" + _ "m7s.live/v5/plugin/hls" _ "m7s.live/v5/plugin/logrotate" _ "m7s.live/v5/plugin/monitor" _ "m7s.live/v5/plugin/mp4" diff --git a/pkg/annexb.go b/pkg/annexb.go index a5097f8..f1720dc 100644 --- a/pkg/annexb.go +++ b/pkg/annexb.go @@ -42,6 +42,7 @@ func (a *AnnexB) GetSize() int { func (a *AnnexB) GetTimestamp() time.Duration { return a.DTS * time.Millisecond / 90 } + func (a *AnnexB) GetCTS() time.Duration { return (a.PTS - a.DTS) * time.Millisecond / 90 } diff --git a/pkg/config/types.go b/pkg/config/types.go index f3bb24a..7586808 100755 --- a/pkg/config/types.go +++ b/pkg/config/types.go @@ -12,21 +12,28 @@ import ( "m7s.live/v5/pkg/util" ) +const ( + RelayModeRemux = "remux" + RelayModeRelay = "relay" + RelayModeMix = "mix" +) + type ( Publish struct { MaxCount int `default:"0" desc:"最大发布者数量"` // 最大发布者数量 PubAudio bool `default:"true" desc:"是否发布音频"` PubVideo bool `default:"true" desc:"是否发布视频"` - KickExist bool `desc:"是否踢掉已经存在的发布者"` // 是否踢掉已经存在的发布者 - PublishTimeout time.Duration `default:"10s" desc:"发布无数据超时"` // 发布无数据超时 - WaitCloseTimeout time.Duration `desc:"延迟自动关闭(等待重连)"` // 延迟自动关闭(等待重连) - DelayCloseTimeout time.Duration `desc:"延迟自动关闭(无订阅时)"` // 延迟自动关闭(无订阅时) - IdleTimeout time.Duration `desc:"空闲(无订阅)超时"` // 空闲(无订阅)超时 - PauseTimeout time.Duration `default:"30s" desc:"暂停超时时间"` // 暂停超时 - BufferTime time.Duration `desc:"缓冲时长,0代表取最近关键帧"` // 缓冲长度(单位:秒),0代表取最近关键帧 - Speed float64 `default:"0" desc:"倍速"` // 倍速,0 为不限速 - Key string `desc:"发布鉴权key"` // 发布鉴权key - RingSize util.Range[int] `default:"20-1024" desc:"RingSize范围"` // 缓冲区大小范围 + KickExist bool `desc:"是否踢掉已经存在的发布者"` // 是否踢掉已经存在的发布者 + PublishTimeout time.Duration `default:"10s" desc:"发布无数据超时"` // 发布无数据超时 + WaitCloseTimeout time.Duration `desc:"延迟自动关闭(等待重连)"` // 延迟自动关闭(等待重连) + DelayCloseTimeout time.Duration `desc:"延迟自动关闭(无订阅时)"` // 延迟自动关闭(无订阅时) + IdleTimeout time.Duration `desc:"空闲(无订阅)超时"` // 空闲(无订阅)超时 + PauseTimeout time.Duration `default:"30s" desc:"暂停超时时间"` // 暂停超时 + BufferTime time.Duration `desc:"缓冲时长,0代表取最近关键帧"` // 缓冲长度(单位:秒),0代表取最近关键帧 + Speed float64 `default:"0" desc:"倍速"` // 倍速,0 为不限速 + Key string `desc:"发布鉴权key"` // 发布鉴权key + RingSize util.Range[int] `default:"20-1024" desc:"RingSize范围"` // 缓冲区大小范围 + RelayMode string `default:"remux" desc:"转发模式" enum:"remux:转格式,relay:纯转发,mix:混合转发"` // 转发模式 Dump bool } Subscribe struct { diff --git a/plugin/hls/pkg/m3u8.go b/plugin/hls/pkg/m3u8.go index 28edfd4..c8c6988 100644 --- a/plugin/hls/pkg/m3u8.go +++ b/plugin/hls/pkg/m3u8.go @@ -2,6 +2,7 @@ package hls import ( "compress/gzip" + "fmt" "io" "net/http" "net/url" @@ -10,6 +11,9 @@ import ( "github.com/quangngotan95/go-m3u8/m3u8" ) +var memoryM3u8 sync.Map +var memoryTs sync.Map + type M3u8Info struct { Req *http.Request M3U8Count int //一共拉取的m3u8文件数量 @@ -52,3 +56,67 @@ func readM3U8(res *http.Response) (playlist *m3u8.Playlist, err error) { } return } + +const ( + HLS_KEY_METHOD_AES_128 = "AES-128" +) + +// https://datatracker.ietf.org/doc/draft-pantos-http-live-streaming/ + +// 以”#EXT“开头的表示一个”tag“,否则表示注释,直接忽略 +type Playlist struct { + io.Writer + ExtM3U string // indicates that the file is an Extended M3U [M3U] Playlist file. (4.3.3.1) -- 每个M3U文件第一行必须是这个tag. + Version int // indicates the compatibility version of the Playlist file. (4.3.1.2) -- 协议版本号. + Sequence int // indicates the Media Sequence Number of the first Media Segment that appears in a Playlist file. (4.3.3.2) -- 第一个媒体段的序列号. + Targetduration int // specifies the maximum Media Segment duration. (4.3.3.1) -- 每个视频分段最大的时长(单位秒). + PlaylistType int // rovides mutability information about the Media Playlist file. (4.3.3.5) -- 提供关于PlayList的可变性的信息. + Discontinuity int // indicates a discontinuity between theMedia Segment that follows it and the one that preceded it. (4.3.2.3) -- 该标签后边的媒体文件和之前的媒体文件之间的编码不连贯(即发生改变)(场景用于插播广告等等). + Key PlaylistKey // specifies how to decrypt them. (4.3.2.4) -- 解密媒体文件的必要信息(表示怎么对media segments进行解码). + EndList string // indicates that no more Media Segments will be added to the Media Playlist file. (4.3.3.4) -- 标示没有更多媒体文件将会加入到播放列表中,它可能会出现在播放列表文件的任何地方,但是不能出现两次或以上. + Inf PlaylistInf // specifies the duration of a Media Segment. (4.3.2.1) -- 指定每个媒体段(ts)的持续时间. + tsCount int +} + +// Discontinuity : +// file format +// number, type and identifiers of tracks +// timestamp sequence +// encoding parameters +// encoding sequence + +type PlaylistKey struct { + Method string // specifies the encryption method. (4.3.2.4) + Uri string // key url. (4.3.2.4) + IV string // key iv. (4.3.2.4) +} + +type PlaylistInf struct { + Duration float64 + Title string + FilePath string +} + +func (pl *Playlist) Init() (err error) { + // ss := fmt.Sprintf("#EXTM3U\n"+ + // "#EXT-X-VERSION:%d\n"+ + // "#EXT-X-MEDIA-SEQUENCE:%d\n"+ + // "#EXT-X-TARGETDURATION:%d\n"+ + // "#EXT-X-PLAYLIST-TYPE:%d\n"+ + // "#EXT-X-DISCONTINUITY:%d\n"+ + // "#EXT-X-KEY:METHOD=%s,URI=%s,IV=%s\n"+ + // "#EXT-X-ENDLIST", hls.Version, hls.Sequence, hls.Targetduration, hls.PlaylistType, hls.Discontinuity, hls.Key.Method, hls.Key.Uri, hls.Key.IV) + _, err = fmt.Fprintf(pl, "#EXTM3U\n"+ + "#EXT-X-VERSION:%d\n"+ + "#EXT-X-MEDIA-SEQUENCE:%d\n"+ + "#EXT-X-TARGETDURATION:%d\n", pl.Version, pl.Sequence, pl.Targetduration) + pl.Sequence++ + return +} + +func (pl *Playlist) WriteInf(inf PlaylistInf) (err error) { + _, err = fmt.Fprintf(pl, "#EXTINF:%.3f,\n"+ + "%s\n", inf.Duration, inf.Title) + pl.tsCount++ + return +} diff --git a/plugin/hls/pkg/pull.go b/plugin/hls/pkg/pull.go index f2d3e58..66acc71 100644 --- a/plugin/hls/pkg/pull.go +++ b/plugin/hls/pkg/pull.go @@ -2,16 +2,28 @@ package hls import ( "context" + "fmt" + "io" "net/http" + "os" + "path/filepath" + "strings" "sync" + "time" + "github.com/quangngotan95/go-m3u8/m3u8" "m7s.live/v5" + "m7s.live/v5/pkg" + "m7s.live/v5/pkg/codec" "m7s.live/v5/pkg/config" "m7s.live/v5/pkg/task" + "m7s.live/v5/pkg/util" + mpegts "m7s.live/v5/plugin/hls/pkg/ts" ) type Puller struct { - m7s.HTTPFilePuller + task.Job + PullJob m7s.PullJob Video M3u8Info Audio M3u8Info TsHead http.Header `json:"-" yaml:"-"` //用于提供cookie等特殊身份的http头 @@ -25,10 +37,288 @@ func NewPuller(_ config.Pull) m7s.IPuller { return p } +func (p *Puller) GetPullJob() *m7s.PullJob { + return &p.PullJob +} + func (p *Puller) GetTs(key string) (any, bool) { return p.memoryTs.Load(key) } -func (p *Puller) Run() (err error) { +func (p *Puller) Start() (err error) { + if err = p.PullJob.Publish(); err != nil { + return + } + p.PullJob.Publisher.Speed = 1 + if p.PullJob.PublishConfig.RelayMode != config.RelayModeRemux { + p.memoryTs.Store(p.PullJob.StreamPath, p) + } + return +} + +func (p *Puller) Dispose() { + if p.PullJob.PublishConfig.RelayMode == config.RelayModeRelay { + memoryTs.Delete(p.PullJob.StreamPath) + } +} + +func (p *Puller) Run() (err error) { + p.Video.Req, err = http.NewRequest("GET", p.PullJob.RemoteURL, nil) + if err != nil { + return + } + return p.pull(&p.Video) +} + +func (p *Puller) writePublisher(t *mpegts.MpegTsStream) { + var audioCodec codec.FourCC + var audioStreamType, videoStreamType byte + for pes := range t.PESChan { + if p.Err() != nil { + continue + } + if pes.Header.Dts == 0 { + pes.Header.Dts = pes.Header.Pts + } + switch pes.Header.StreamID & 0xF0 { + case mpegts.STREAM_ID_VIDEO: + if videoStreamType == 0 { + for _, s := range t.PMT.Stream { + videoStreamType = s.StreamType + break + } + } + switch videoStreamType { + case mpegts.STREAM_TYPE_H264: + var annexb pkg.AnnexB + annexb.PTS = time.Duration(pes.Header.Pts) + annexb.DTS = time.Duration(pes.Header.Dts) + annexb.AppendOne(pes.Payload) + p.PullJob.Publisher.WriteVideo(&annexb) + case mpegts.STREAM_TYPE_H265: + var annexb pkg.AnnexB + annexb.PTS = time.Duration(pes.Header.Pts) + annexb.DTS = time.Duration(pes.Header.Dts) + annexb.Hevc = true + annexb.AppendOne(pes.Payload) + p.PullJob.Publisher.WriteVideo(&annexb) + default: + if audioStreamType == 0 { + for _, s := range t.PMT.Stream { + audioStreamType = s.StreamType + switch s.StreamType { + case mpegts.STREAM_TYPE_AAC: + audioCodec = codec.FourCC_MP4A + case mpegts.STREAM_TYPE_G711A: + audioCodec = codec.FourCC_ALAW + case mpegts.STREAM_TYPE_G711U: + audioCodec = codec.FourCC_ULAW + } + } + } + switch audioStreamType { + case mpegts.STREAM_TYPE_AAC: + var adts pkg.ADTS + adts.DTS = time.Duration(pes.Header.Dts) + adts.AppendOne(pes.Payload) + p.PullJob.Publisher.WriteAudio(&adts) + default: + var raw pkg.RawAudio + raw.FourCC = audioCodec + raw.Timestamp = time.Duration(pes.Header.Pts) * time.Millisecond / 90 + raw.AppendOne(pes.Payload) + p.PullJob.Publisher.WriteAudio(&raw) + } + } + } + } +} + +func (p *Puller) pull(info *M3u8Info) (err error) { + //请求失败自动退出 + req := info.Req.WithContext(p.Context) + client := p.PullJob.HTTPClient + sequence := -1 + lastTs := make(map[string]bool) + tsbuffer := make(chan io.ReadCloser) + tsRing := util.NewRing[string](6) + var tsReader *mpegts.MpegTsStream + var closer io.Closer + p.OnDispose(func() { + if closer != nil { + closer.Close() + } + }) + if p.PullJob.PublishConfig.RelayMode != config.RelayModeRelay { + tsReader = &mpegts.MpegTsStream{ + PESChan: make(chan *mpegts.MpegTsPESPacket, 50), + PESBuffer: make(map[uint16]*mpegts.MpegTsPESPacket), + } + go p.writePublisher(tsReader) + defer close(tsReader.PESChan) + } + defer close(tsbuffer) + var maxResolution *m3u8.PlaylistItem + for errcount := 0; err == nil; err = p.Err() { + resp, err1 := client.Do(req) + if err1 != nil { + return err1 + } + req = resp.Request + if playlist, err2 := readM3U8(resp); err2 == nil { + errcount = 0 + info.LastM3u8 = playlist.String() + //if !playlist.Live { + // log.Println(p.LastM3u8) + // return + //} + if playlist.Sequence <= sequence { + p.Warn("same sequence", "sequence", playlist.Sequence, "max", sequence) + time.Sleep(time.Second) + continue + } + info.M3U8Count++ + sequence = playlist.Sequence + thisTs := make(map[string]bool) + tsItems := make([]*m3u8.SegmentItem, 0) + discontinuity := false + for _, item := range playlist.Items { + switch v := item.(type) { + case *m3u8.PlaylistItem: + if (maxResolution == nil || maxResolution.Resolution != nil && (maxResolution.Resolution.Width < v.Resolution.Width || maxResolution.Resolution.Height < v.Resolution.Height)) || maxResolution.Bandwidth < v.Bandwidth { + maxResolution = v + } + case *m3u8.DiscontinuityItem: + discontinuity = true + case *m3u8.SegmentItem: + thisTs[v.Segment] = true + if _, ok := lastTs[v.Segment]; ok && !discontinuity { + continue + } + tsItems = append(tsItems, v) + case *m3u8.MediaItem: + if p.Audio.Req == nil { + if url, err := req.URL.Parse(*v.URI); err == nil { + newReq, _ := http.NewRequest("GET", url.String(), nil) + newReq.Header = req.Header + p.Audio.Req = newReq + go p.pull(&p.Audio) + } + } + } + } + if maxResolution != nil && len(tsItems) == 0 { + if url, err := req.URL.Parse(maxResolution.URI); err == nil { + if strings.HasSuffix(url.Path, ".m3u8") { + p.Video.Req, _ = http.NewRequest("GET", url.String(), nil) + p.Video.Req.Header = req.Header + req = p.Video.Req + continue + } + } + } + tsCount := len(tsItems) + p.Debug("readM3U8", "sequence", sequence, "tscount", tsCount) + lastTs = thisTs + if tsCount > 3 { + tsItems = tsItems[tsCount-3:] + } + var plBuffer util.Buffer + relayPlayList := Playlist{ + Writer: &plBuffer, + Targetduration: playlist.Target, + Sequence: playlist.Sequence, + } + if p.PullJob.PublishConfig.RelayMode != config.RelayModeRemux { + relayPlayList.Init() + } + var tsDownloaders = make([]*TSDownloader, len(tsItems)) + for i, v := range tsItems { + if p.Err() != nil { + return p.Err() + } + tsUrl, _ := info.Req.URL.Parse(v.Segment) + tsReq, _ := http.NewRequestWithContext(p.Context, "GET", tsUrl.String(), nil) + tsReq.Header = p.TsHead + // t1 := time.Now() + tsDownloaders[i] = &TSDownloader{ + client: client, + req: tsReq, + url: tsUrl, + dur: v.Duration, + } + tsDownloaders[i].Start() + } + ts := time.Now().UnixMilli() + for i, v := range tsDownloaders { + p.Debug("start download ts", "tsUrl", v.url.String()) + v.wg.Wait() + if v.res != nil { + info.TSCount++ + var reader io.Reader = v.res.Body + closer = v.res.Body + if p.SaveContext != nil && p.SaveContext.Err() == nil { + savePath := p.SaveContext.Value("path").(string) + os.MkdirAll(filepath.Join(savePath, p.PullJob.StreamPath), 0766) + if f, err := os.OpenFile(filepath.Join(savePath, p.PullJob.StreamPath, filepath.Base(v.url.Path)), os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666); err == nil { + reader = io.TeeReader(v.res.Body, f) + closer = f + } + } + var tsBytes *util.Buffer + switch p.PullJob.PublishConfig.RelayMode { + case config.RelayModeRelay: + tsBytes = &util.Buffer{} + io.Copy(tsBytes, reader) + case config.RelayModeMix: + tsBytes = &util.Buffer{} + reader = io.TeeReader(reader, tsBytes) + fallthrough + case config.RelayModeRemux: + tsReader.Feed(reader) + } + if tsBytes != nil { + tsFilename := fmt.Sprintf("%d_%d.ts", ts, i) + tsFilePath := p.PullJob.StreamPath + "/" + tsFilename + var plInfo = PlaylistInf{ + Title: p.PullJob.StreamPath + "/" + tsFilename, + Duration: v.dur, + FilePath: tsFilePath, + } + relayPlayList.WriteInf(plInfo) + p.memoryTs.Store(tsFilePath, tsBytes) + next := tsRing.Next() + if next.Value != "" { + item, _ := p.memoryTs.LoadAndDelete(next.Value) + if item == nil { + p.Warn("memoryTs delete nil", "tsFilePath", next.Value) + } else { + // item.Recycle() + } + } + next.Value = tsFilePath + tsRing = next + } + closer.Close() + } else if v.err != nil { + p.Error("reqTs", "streamPath", p.PullJob.StreamPath, "err", v.err) + } else { + p.Error("reqTs", "streamPath", p.PullJob.StreamPath) + } + p.Debug("finish download ts", "tsUrl", v.url.String()) + } + if p.PullJob.PublishConfig.RelayMode != config.RelayModeRemux { + m3u8 := string(plBuffer) + p.Debug("write m3u8", "streamPath", p.PullJob.StreamPath, "m3u8", m3u8) + memoryM3u8.Store(p.PullJob.StreamPath, m3u8) + } + } else { + p.Error("readM3u8", "streamPath", p.PullJob.StreamPath, "err", err2) + errcount++ + if errcount > 10 { + return err2 + } + } + } return } diff --git a/publisher.go b/publisher.go index 278713f..ccb9144 100644 --- a/publisher.go +++ b/publisher.go @@ -60,9 +60,9 @@ func (s *SpeedControl) speedControl(speed float64, ts time.Duration) { } should := time.Duration(float64(ts-s.beginTimestamp) / speed) s.Delta = should - elapsed - // fmt.Println(speed, elapsed, should, s.Delta) + //fmt.Println(speed, elapsed, should, s.Delta) if s.Delta > threshold { - time.Sleep(s.Delta) + time.Sleep(min(s.Delta, time.Millisecond*500)) } } } @@ -200,8 +200,14 @@ func (p *Publisher) Start() (err error) { } } } - p.audioReady = util.NewPromise(p) - p.videoReady = util.NewPromise(p) + p.audioReady = util.NewPromiseWithTimeout(p, p.PublishTimeout) + if !p.PubAudio { + p.audioReady.Reject(ErrMuted) + } + p.videoReady = util.NewPromiseWithTimeout(p, p.PublishTimeout) + if !p.PubVideo { + p.videoReady.Reject(ErrMuted) + } if p.Dump { f := filepath.Join("./dump", p.StreamPath) os.MkdirAll(filepath.Dir(f), 0666) @@ -362,16 +368,16 @@ func (p *Publisher) writeAV(t *AVTrack, data IAVFrame) { if t.FPS > 0 { frameDur := float64(time.Second) / float64(t.FPS) if math.Abs(float64(frame.Timestamp-t.LastTs)) > 10*frameDur { //时间戳突变 - p.Warn("timestamp mutation", "fps", t.FPS, "lastTs", t.LastTs, "ts", frame.Timestamp, "frameDur", time.Duration(frameDur)) - frame.Timestamp = t.LastTs + time.Duration(frameDur) + p.Warn("timestamp mutation", "fps", t.FPS, "lastTs", uint32(t.LastTs/time.Millisecond), "ts", uint32(frame.Timestamp/time.Millisecond), "frameDur", time.Duration(frameDur)) t.BaseTs = frame.Timestamp - ts + frame.Timestamp = t.LastTs + time.Duration(frameDur) } } t.LastTs = frame.Timestamp if p.Enabled(p, task.TraceLevel) { codec := t.FourCC().String() data := frame.Wraps[0].String() - p.Trace("write", "seq", frame.Sequence, "ts0", ts, "ts", uint32(frame.Timestamp/time.Millisecond), "codec", codec, "size", bytesIn, "data", data) + p.Trace("write", "seq", frame.Sequence, "baseTs", int32(t.BaseTs/time.Millisecond), "ts0", uint32(ts/time.Millisecond), "ts", uint32(frame.Timestamp/time.Millisecond), "codec", codec, "size", bytesIn, "data", data) } } diff --git a/puller.go b/puller.go index 107c2e2..b85feb3 100644 --- a/puller.go +++ b/puller.go @@ -36,7 +36,7 @@ type ( PullJob struct { Connection Publisher *Publisher - publishConfig *config.Publish + PublishConfig *config.Publish puller IPuller conf *config.Pull } @@ -83,9 +83,9 @@ func (p *PullJob) GetPullJob() *PullJob { func (p *PullJob) Init(puller IPuller, plugin *Plugin, streamPath string, conf config.Pull) *PullJob { if conf.PubConf != nil { - p.publishConfig = conf.PubConf + p.PublishConfig = conf.PubConf } else { - p.publishConfig = &plugin.config.Publish + p.PublishConfig = &plugin.config.Publish } p.Args = url.Values(conf.Args.DeepClone()) p.conf = &conf @@ -129,7 +129,7 @@ func (p *PullJob) Publish() (err error) { if len(p.Args) > 0 { streamPath += "?" + p.Args.Encode() } - p.Publisher, err = p.Plugin.PublishWithConfig(p.puller.GetTask().Context, streamPath, *p.publishConfig) + p.Publisher, err = p.Plugin.PublishWithConfig(p.puller.GetTask().Context, streamPath, *p.PublishConfig) p.Publisher.Type = PublishTypePull if err == nil && p.conf.MaxRetry != 0 { p.Publisher.OnDispose(func() {