feat: add hls pull support

langhuihui
2024-11-14 18:58:39 +08:00
parent 4f9cb3305b
commit 3ede6ec08f
10 changed files with 422 additions and 29 deletions

View File

@@ -9,10 +9,10 @@ package main
import (
	"context"
-	"m7s.live/m7s/v5"
-	_ "m7s.live/m7s/v5/plugin/debug"
-	_ "m7s.live/m7s/v5/plugin/flv"
-	_ "m7s.live/m7s/v5/plugin/rtmp"
+	"m7s.live/v5"
+	_ "m7s.live/v5/plugin/debug"
+	_ "m7s.live/v5/plugin/flv"
+	_ "m7s.live/v5/plugin/rtmp"
)
func main() {
@@ -26,6 +26,9 @@ func main() {
|-----------|-------------|
| disable_rm | Disables the memory pool |
| sqlite | Enables the sqlite DB |
| sqliteCGO | Enables the CGO version of the sqlite DB |
| mysql | Enables the mysql DB |
| postgres | Enables the postgres DB |
| duckdb | Enables the duckdb DB |
| taskpanic | Throws panic, for testing |
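
These are ordinary Go build tags; assuming the example's main package, opting into one or more of them would look like:

```shell
go build -tags sqlite .
go build -tags "sqlite,duckdb" .
```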

View File

@@ -24,7 +24,9 @@ func main() {
|-----------|-----------------|
| disable_rm | Disables the memory pool |
| sqlite | Enables sqlite |
| sqliteCGO | Enables the sqlite CGO version |
| mysql | Enables mysql |
| postgres | Enables postgres |
| duckdb | Enables duckdb |
| taskpanic | Throws panic, for testing |
@@ -36,3 +38,13 @@ func main() {
# Create a Plugin
See README_CN.md in the plugin directory.
# Prometheus
```yaml
scrape_configs:
  - job_name: "monibuca"
    metrics_path: "/api/metrics"
    static_configs:
      - targets: ["localhost:8080"]
```

View File

@@ -0,0 +1,5 @@
global:
  loglevel: debug
hls:
  pull:
    live/test: http://live4.nbs.cn/channels/njtv/glgc/hd.m3u8

View File

@@ -10,6 +10,7 @@ import (
_ "m7s.live/v5/plugin/debug" _ "m7s.live/v5/plugin/debug"
_ "m7s.live/v5/plugin/flv" _ "m7s.live/v5/plugin/flv"
_ "m7s.live/v5/plugin/gb28181" _ "m7s.live/v5/plugin/gb28181"
_ "m7s.live/v5/plugin/hls"
_ "m7s.live/v5/plugin/logrotate" _ "m7s.live/v5/plugin/logrotate"
_ "m7s.live/v5/plugin/monitor" _ "m7s.live/v5/plugin/monitor"
_ "m7s.live/v5/plugin/mp4" _ "m7s.live/v5/plugin/mp4"

View File

@@ -42,6 +42,7 @@ func (a *AnnexB) GetSize() int {
func (a *AnnexB) GetTimestamp() time.Duration {
	return a.DTS * time.Millisecond / 90
}

func (a *AnnexB) GetCTS() time.Duration {
	return (a.PTS - a.DTS) * time.Millisecond / 90
}
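
Both getters convert MPEG-TS 90 kHz clock ticks into a `time.Duration`; a small standalone sketch with hypothetical tick values:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Hypothetical PES timestamps, expressed in 90 kHz ticks.
	dts := time.Duration(180000) // 180000 ticks = 2.000 s
	pts := time.Duration(183600) // 183600 ticks = 2.040 s
	fmt.Println(dts * time.Millisecond / 90)         // timestamp: 2s
	fmt.Println((pts - dts) * time.Millisecond / 90) // CTS: 40ms
}
```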

View File

@@ -12,6 +12,12 @@ import (
"m7s.live/v5/pkg/util" "m7s.live/v5/pkg/util"
) )
const (
RelayModeRemux = "remux"
RelayModeRelay = "relay"
RelayModeMix = "mix"
)
type ( type (
Publish struct { Publish struct {
MaxCount int `default:"0" desc:"最大发布者数量"` // 最大发布者数量 MaxCount int `default:"0" desc:"最大发布者数量"` // 最大发布者数量
@@ -27,6 +33,7 @@ type (
		Speed float64 `default:"0" desc:"倍速"` // playback speed multiplier; 0 means unlimited
		Key string `desc:"发布鉴权key"` // publish authentication key
		RingSize util.Range[int] `default:"20-1024" desc:"RingSize范围"` // ring buffer size range
		RelayMode string `default:"remux" desc:"转发模式" enum:"remux:转格式,relay:纯转发,mix:混合转发"` // relay mode: remux, relay, or mix
		Dump bool
	}
	Subscribe struct {
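
Read from how the HLS puller below uses these values (not an authoritative definition): `remux` re-parses the pulled TS and republishes it as internal frames, `relay` skips the re-parse and only keeps the raw m3u8/TS in memory for pass-through playback, and `mix` does both. A minimal sketch with hypothetical callback names:

```go
// Hypothetical helper; the real dispatch lives in the HLS puller's pull loop.
func handleSegment(mode string, seg io.Reader, remux func(io.Reader), keepRaw func([]byte)) error {
	switch mode {
	case RelayModeRemux: // decode the TS and republish frames only
		remux(seg)
	case RelayModeRelay: // keep the raw segment only, no decoding
		raw, err := io.ReadAll(seg)
		if err != nil {
			return err
		}
		keepRaw(raw)
	case RelayModeMix: // republish frames and also keep the raw bytes
		var buf bytes.Buffer
		remux(io.TeeReader(seg, &buf))
		keepRaw(buf.Bytes())
	}
	return nil
}
```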

View File

@@ -2,6 +2,7 @@ package hls
import (
	"compress/gzip"
	"fmt"
	"io"
	"net/http"
	"net/url"
@@ -10,6 +11,9 @@ import (
"github.com/quangngotan95/go-m3u8/m3u8" "github.com/quangngotan95/go-m3u8/m3u8"
) )
var memoryM3u8 sync.Map
var memoryTs sync.Map
type M3u8Info struct { type M3u8Info struct {
Req *http.Request Req *http.Request
M3U8Count int //一共拉取的m3u8文件数量 M3U8Count int //一共拉取的m3u8文件数量
@@ -52,3 +56,67 @@ func readM3U8(res *http.Response) (playlist *m3u8.Playlist, err error) {
	}
	return
}
const (
	HLS_KEY_METHOD_AES_128 = "AES-128"
)

// https://datatracker.ietf.org/doc/draft-pantos-http-live-streaming/
// Lines beginning with "#EXT" are tags; any other comment line is simply ignored.
type Playlist struct {
	io.Writer
	ExtM3U         string      // indicates that the file is an Extended M3U [M3U] Playlist file. (4.3.3.1) -- must be the first line of every M3U file.
	Version        int         // indicates the compatibility version of the Playlist file. (4.3.1.2) -- protocol version number.
	Sequence       int         // indicates the Media Sequence Number of the first Media Segment that appears in a Playlist file. (4.3.3.2) -- sequence number of the first media segment.
	Targetduration int         // specifies the maximum Media Segment duration. (4.3.3.1) -- maximum duration of a single segment, in seconds.
	PlaylistType   int         // provides mutability information about the Media Playlist file. (4.3.3.5) -- mutability information about the playlist.
	Discontinuity  int         // indicates a discontinuity between the Media Segment that follows it and the one that preceded it. (4.3.2.3) -- the following segment is encoded differently from the preceding one (e.g. when ads are spliced in).
	Key            PlaylistKey // specifies how to decrypt them. (4.3.2.4) -- the information needed to decrypt the media segments.
	EndList        string      // indicates that no more Media Segments will be added to the Media Playlist file. (4.3.3.4) -- may appear anywhere in the playlist file, but at most once.
	Inf            PlaylistInf // specifies the duration of a Media Segment. (4.3.2.1) -- duration of a single media (TS) segment.
	tsCount        int
}
// Discontinuity: the segments on either side of the tag may differ in:
//   - file format
//   - number, type and identifiers of tracks
//   - timestamp sequence
//   - encoding parameters
//   - encoding sequence
type PlaylistKey struct {
	Method string // specifies the encryption method. (4.3.2.4)
	Uri    string // key url. (4.3.2.4)
	IV     string // key iv. (4.3.2.4)
}
type PlaylistInf struct {
	Duration float64
	Title    string
	FilePath string
}
func (pl *Playlist) Init() (err error) {
	// ss := fmt.Sprintf("#EXTM3U\n"+
	// 	"#EXT-X-VERSION:%d\n"+
	// 	"#EXT-X-MEDIA-SEQUENCE:%d\n"+
	// 	"#EXT-X-TARGETDURATION:%d\n"+
	// 	"#EXT-X-PLAYLIST-TYPE:%d\n"+
	// 	"#EXT-X-DISCONTINUITY:%d\n"+
	// 	"#EXT-X-KEY:METHOD=%s,URI=%s,IV=%s\n"+
	// 	"#EXT-X-ENDLIST", hls.Version, hls.Sequence, hls.Targetduration, hls.PlaylistType, hls.Discontinuity, hls.Key.Method, hls.Key.Uri, hls.Key.IV)
	_, err = fmt.Fprintf(pl, "#EXTM3U\n"+
		"#EXT-X-VERSION:%d\n"+
		"#EXT-X-MEDIA-SEQUENCE:%d\n"+
		"#EXT-X-TARGETDURATION:%d\n", pl.Version, pl.Sequence, pl.Targetduration)
	pl.Sequence++
	return
}

func (pl *Playlist) WriteInf(inf PlaylistInf) (err error) {
	_, err = fmt.Fprintf(pl, "#EXTINF:%.3f,\n"+
		"%s\n", inf.Duration, inf.Title)
	pl.tsCount++
	return
}
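
A minimal usage sketch of this writer (the buffer and values are hypothetical; the real caller is the relay playlist in the puller below):

```go
var buf bytes.Buffer
pl := Playlist{Writer: &buf, Version: 3, Sequence: 100, Targetduration: 10}
pl.Init()                                                         // header tags
pl.WriteInf(PlaylistInf{Duration: 9.96, Title: "live/test/0.ts"}) // one segment entry
fmt.Print(buf.String())
// #EXTM3U
// #EXT-X-VERSION:3
// #EXT-X-MEDIA-SEQUENCE:100
// #EXT-X-TARGETDURATION:10
// #EXTINF:9.960,
// live/test/0.ts
```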

View File

@@ -2,16 +2,28 @@ package hls
import (
	"context"
	"fmt"
	"io"
	"net/http"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"github.com/quangngotan95/go-m3u8/m3u8"
	"m7s.live/v5"
	"m7s.live/v5/pkg"
	"m7s.live/v5/pkg/codec"
	"m7s.live/v5/pkg/config"
	"m7s.live/v5/pkg/task"
	"m7s.live/v5/pkg/util"
	mpegts "m7s.live/v5/plugin/hls/pkg/ts"
)
type Puller struct {
-	m7s.HTTPFilePuller
+	task.Job
	PullJob m7s.PullJob
	Video   M3u8Info
	Audio   M3u8Info
	TsHead  http.Header `json:"-" yaml:"-"` // HTTP headers (e.g. cookies or other credentials) sent with TS requests
@@ -25,10 +37,288 @@ func NewPuller(_ config.Pull) m7s.IPuller {
	return p
}
func (p *Puller) GetPullJob() *m7s.PullJob {
	return &p.PullJob
}
func (p *Puller) GetTs(key string) (any, bool) {
	return p.memoryTs.Load(key)
}
-func (p *Puller) Run() (err error) {
+func (p *Puller) Start() (err error) {
	if err = p.PullJob.Publish(); err != nil {
		return
	}
	p.PullJob.Publisher.Speed = 1
	if p.PullJob.PublishConfig.RelayMode != config.RelayModeRemux {
		p.memoryTs.Store(p.PullJob.StreamPath, p)
	}
	return
}
func (p *Puller) Dispose() {
	if p.PullJob.PublishConfig.RelayMode == config.RelayModeRelay {
		memoryTs.Delete(p.PullJob.StreamPath)
	}
}
func (p *Puller) Run() (err error) {
	p.Video.Req, err = http.NewRequest("GET", p.PullJob.RemoteURL, nil)
	if err != nil {
		return
	}
	return p.pull(&p.Video)
}
// writePublisher drains demuxed PES packets and republishes them as video/audio frames.
func (p *Puller) writePublisher(t *mpegts.MpegTsStream) {
	var audioCodec codec.FourCC
	var audioStreamType, videoStreamType byte
	for pes := range t.PESChan {
		if p.Err() != nil {
			continue
		}
		if pes.Header.Dts == 0 {
			pes.Header.Dts = pes.Header.Pts
		}
		switch pes.Header.StreamID & 0xF0 {
		case mpegts.STREAM_ID_VIDEO:
			if videoStreamType == 0 {
				for _, s := range t.PMT.Stream {
					videoStreamType = s.StreamType
					break
				}
			}
			switch videoStreamType {
			case mpegts.STREAM_TYPE_H264:
				var annexb pkg.AnnexB
				annexb.PTS = time.Duration(pes.Header.Pts)
				annexb.DTS = time.Duration(pes.Header.Dts)
				annexb.AppendOne(pes.Payload)
				p.PullJob.Publisher.WriteVideo(&annexb)
			case mpegts.STREAM_TYPE_H265:
				var annexb pkg.AnnexB
				annexb.PTS = time.Duration(pes.Header.Pts)
				annexb.DTS = time.Duration(pes.Header.Dts)
				annexb.Hevc = true
				annexb.AppendOne(pes.Payload)
				p.PullJob.Publisher.WriteVideo(&annexb)
			default:
				if audioStreamType == 0 {
					for _, s := range t.PMT.Stream {
						audioStreamType = s.StreamType
						switch s.StreamType {
						case mpegts.STREAM_TYPE_AAC:
							audioCodec = codec.FourCC_MP4A
						case mpegts.STREAM_TYPE_G711A:
							audioCodec = codec.FourCC_ALAW
						case mpegts.STREAM_TYPE_G711U:
							audioCodec = codec.FourCC_ULAW
						}
					}
				}
				switch audioStreamType {
				case mpegts.STREAM_TYPE_AAC:
					var adts pkg.ADTS
					adts.DTS = time.Duration(pes.Header.Dts)
					adts.AppendOne(pes.Payload)
					p.PullJob.Publisher.WriteAudio(&adts)
				default:
					var raw pkg.RawAudio
					raw.FourCC = audioCodec
					raw.Timestamp = time.Duration(pes.Header.Pts) * time.Millisecond / 90
					raw.AppendOne(pes.Payload)
					p.PullJob.Publisher.WriteAudio(&raw)
				}
			}
		}
	}
}
func (p *Puller) pull(info *M3u8Info) (err error) {
	// exits automatically once a request fails
	req := info.Req.WithContext(p.Context)
	client := p.PullJob.HTTPClient
	sequence := -1
	lastTs := make(map[string]bool)
	tsbuffer := make(chan io.ReadCloser)
	tsRing := util.NewRing[string](6)
	var tsReader *mpegts.MpegTsStream
	var closer io.Closer
	p.OnDispose(func() {
		if closer != nil {
			closer.Close()
		}
	})
	if p.PullJob.PublishConfig.RelayMode != config.RelayModeRelay {
		tsReader = &mpegts.MpegTsStream{
			PESChan:   make(chan *mpegts.MpegTsPESPacket, 50),
			PESBuffer: make(map[uint16]*mpegts.MpegTsPESPacket),
		}
		go p.writePublisher(tsReader)
		defer close(tsReader.PESChan)
	}
	defer close(tsbuffer)
	var maxResolution *m3u8.PlaylistItem
	for errcount := 0; err == nil; err = p.Err() {
		resp, err1 := client.Do(req)
		if err1 != nil {
			return err1
		}
		req = resp.Request
		if playlist, err2 := readM3U8(resp); err2 == nil {
			errcount = 0
			info.LastM3u8 = playlist.String()
			//if !playlist.Live {
			//	log.Println(p.LastM3u8)
			//	return
			//}
			if playlist.Sequence <= sequence {
				p.Warn("same sequence", "sequence", playlist.Sequence, "max", sequence)
				time.Sleep(time.Second)
				continue
			}
			info.M3U8Count++
			sequence = playlist.Sequence
			thisTs := make(map[string]bool)
			tsItems := make([]*m3u8.SegmentItem, 0)
			discontinuity := false
			for _, item := range playlist.Items {
				switch v := item.(type) {
				case *m3u8.PlaylistItem:
					if (maxResolution == nil || maxResolution.Resolution != nil && (maxResolution.Resolution.Width < v.Resolution.Width || maxResolution.Resolution.Height < v.Resolution.Height)) || maxResolution.Bandwidth < v.Bandwidth {
						maxResolution = v
					}
				case *m3u8.DiscontinuityItem:
					discontinuity = true
				case *m3u8.SegmentItem:
					thisTs[v.Segment] = true
					if _, ok := lastTs[v.Segment]; ok && !discontinuity {
						continue
					}
					tsItems = append(tsItems, v)
				case *m3u8.MediaItem:
					if p.Audio.Req == nil {
						if url, err := req.URL.Parse(*v.URI); err == nil {
							newReq, _ := http.NewRequest("GET", url.String(), nil)
							newReq.Header = req.Header
							p.Audio.Req = newReq
							go p.pull(&p.Audio)
						}
					}
				}
			}
			// master playlist: switch to the selected variant and fetch it instead
			if maxResolution != nil && len(tsItems) == 0 {
				if url, err := req.URL.Parse(maxResolution.URI); err == nil {
					if strings.HasSuffix(url.Path, ".m3u8") {
						p.Video.Req, _ = http.NewRequest("GET", url.String(), nil)
						p.Video.Req.Header = req.Header
						req = p.Video.Req
						continue
					}
				}
			}
			tsCount := len(tsItems)
			p.Debug("readM3U8", "sequence", sequence, "tscount", tsCount)
			lastTs = thisTs
			// only download the most recent 3 new segments
			if tsCount > 3 {
				tsItems = tsItems[tsCount-3:]
			}
			var plBuffer util.Buffer
			relayPlayList := Playlist{
				Writer:         &plBuffer,
				Targetduration: playlist.Target,
				Sequence:       playlist.Sequence,
			}
			if p.PullJob.PublishConfig.RelayMode != config.RelayModeRemux {
				relayPlayList.Init()
			}
			var tsDownloaders = make([]*TSDownloader, len(tsItems))
			for i, v := range tsItems {
				if p.Err() != nil {
					return p.Err()
				}
				tsUrl, _ := info.Req.URL.Parse(v.Segment)
				tsReq, _ := http.NewRequestWithContext(p.Context, "GET", tsUrl.String(), nil)
				tsReq.Header = p.TsHead
				// t1 := time.Now()
				tsDownloaders[i] = &TSDownloader{
					client: client,
					req:    tsReq,
					url:    tsUrl,
					dur:    v.Duration,
				}
				tsDownloaders[i].Start()
			}
			ts := time.Now().UnixMilli()
			for i, v := range tsDownloaders {
				p.Debug("start download ts", "tsUrl", v.url.String())
				v.wg.Wait()
				if v.res != nil {
					info.TSCount++
					var reader io.Reader = v.res.Body
					closer = v.res.Body
					if p.SaveContext != nil && p.SaveContext.Err() == nil {
						savePath := p.SaveContext.Value("path").(string)
						os.MkdirAll(filepath.Join(savePath, p.PullJob.StreamPath), 0766)
						if f, err := os.OpenFile(filepath.Join(savePath, p.PullJob.StreamPath, filepath.Base(v.url.Path)), os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666); err == nil {
							reader = io.TeeReader(v.res.Body, f)
							closer = f
						}
					}
					var tsBytes *util.Buffer
					switch p.PullJob.PublishConfig.RelayMode {
					case config.RelayModeRelay:
						tsBytes = &util.Buffer{}
						io.Copy(tsBytes, reader)
					case config.RelayModeMix:
						tsBytes = &util.Buffer{}
						reader = io.TeeReader(reader, tsBytes)
						fallthrough
					case config.RelayModeRemux:
						tsReader.Feed(reader)
					}
					if tsBytes != nil {
						tsFilename := fmt.Sprintf("%d_%d.ts", ts, i)
						tsFilePath := p.PullJob.StreamPath + "/" + tsFilename
						var plInfo = PlaylistInf{
							Title:    p.PullJob.StreamPath + "/" + tsFilename,
							Duration: v.dur,
							FilePath: tsFilePath,
						}
						relayPlayList.WriteInf(plInfo)
						p.memoryTs.Store(tsFilePath, tsBytes)
						// the 6-entry ring keeps only the latest segments; evict the slot being reused
						next := tsRing.Next()
						if next.Value != "" {
							item, _ := p.memoryTs.LoadAndDelete(next.Value)
							if item == nil {
								p.Warn("memoryTs delete nil", "tsFilePath", next.Value)
							} else {
								// item.Recycle()
							}
						}
						next.Value = tsFilePath
						tsRing = next
					}
					closer.Close()
				} else if v.err != nil {
					p.Error("reqTs", "streamPath", p.PullJob.StreamPath, "err", v.err)
				} else {
					p.Error("reqTs", "streamPath", p.PullJob.StreamPath)
				}
				p.Debug("finish download ts", "tsUrl", v.url.String())
			}
			if p.PullJob.PublishConfig.RelayMode != config.RelayModeRemux {
				m3u8 := string(plBuffer)
				p.Debug("write m3u8", "streamPath", p.PullJob.StreamPath, "m3u8", m3u8)
				memoryM3u8.Store(p.PullJob.StreamPath, m3u8)
			}
		} else {
			p.Error("readM3u8", "streamPath", p.PullJob.StreamPath, "err", err2)
			errcount++
			if errcount > 10 {
				return err2
			}
		}
	}
	return
}
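
The HTTP side is not part of this diff, but a hypothetical handler sketch shows what the relay-mode data is for: the playlist text stored in `memoryM3u8` can be handed straight to HLS players, and segments would be looked up the same way via the puller's `GetTs`:

```go
// Hypothetical sketch, not part of this commit; streamPath is assumed to be
// derived from the request URL by the plugin's HTTP routing.
func serveMemoryM3u8(w http.ResponseWriter, r *http.Request, streamPath string) {
	if v, ok := memoryM3u8.Load(streamPath); ok {
		w.Header().Set("Content-Type", "application/vnd.apple.mpegurl")
		io.WriteString(w, v.(string))
		return
	}
	http.NotFound(w, r)
}
```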

View File

@@ -60,9 +60,9 @@ func (s *SpeedControl) speedControl(speed float64, ts time.Duration) {
		}
		should := time.Duration(float64(ts-s.beginTimestamp) / speed)
		s.Delta = should - elapsed
		//fmt.Println(speed, elapsed, should, s.Delta)
		if s.Delta > threshold {
-			time.Sleep(s.Delta)
+			time.Sleep(min(s.Delta, time.Millisecond*500))
		}
	}
}
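
A worked example of the capped sleep with hypothetical numbers: at speed 2, a frame stamped 2 s into the stream should be released 1 s after the first frame; if only 400 ms have elapsed, Delta is 600 ms, and the sleep is now capped at 500 ms so the loop re-checks at least twice per second:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	speed := 2.0
	elapsed := 400 * time.Millisecond
	should := time.Duration(float64(2*time.Second) / speed) // 1s
	delta := should - elapsed                               // 600ms
	fmt.Println(min(delta, 500*time.Millisecond))           // 500ms
}
```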
@@ -200,8 +200,14 @@ func (p *Publisher) Start() (err error) {
			}
		}
	}
-	p.audioReady = util.NewPromise(p)
-	p.videoReady = util.NewPromise(p)
+	p.audioReady = util.NewPromiseWithTimeout(p, p.PublishTimeout)
+	if !p.PubAudio {
+		p.audioReady.Reject(ErrMuted)
+	}
+	p.videoReady = util.NewPromiseWithTimeout(p, p.PublishTimeout)
+	if !p.PubVideo {
+		p.videoReady.Reject(ErrMuted)
+	}
	if p.Dump {
		f := filepath.Join("./dump", p.StreamPath)
		os.MkdirAll(filepath.Dir(f), 0666)
@@ -362,16 +368,16 @@ func (p *Publisher) writeAV(t *AVTrack, data IAVFrame) {
	if t.FPS > 0 {
		frameDur := float64(time.Second) / float64(t.FPS)
		if math.Abs(float64(frame.Timestamp-t.LastTs)) > 10*frameDur { // timestamp jump
-			p.Warn("timestamp mutation", "fps", t.FPS, "lastTs", t.LastTs, "ts", frame.Timestamp, "frameDur", time.Duration(frameDur))
+			p.Warn("timestamp mutation", "fps", t.FPS, "lastTs", uint32(t.LastTs/time.Millisecond), "ts", uint32(frame.Timestamp/time.Millisecond), "frameDur", time.Duration(frameDur))
-			frame.Timestamp = t.LastTs + time.Duration(frameDur)
			t.BaseTs = frame.Timestamp - ts
+			frame.Timestamp = t.LastTs + time.Duration(frameDur)
		}
	}
	t.LastTs = frame.Timestamp
	if p.Enabled(p, task.TraceLevel) {
		codec := t.FourCC().String()
		data := frame.Wraps[0].String()
-		p.Trace("write", "seq", frame.Sequence, "ts0", ts, "ts", uint32(frame.Timestamp/time.Millisecond), "codec", codec, "size", bytesIn, "data", data)
+		p.Trace("write", "seq", frame.Sequence, "baseTs", int32(t.BaseTs/time.Millisecond), "ts0", uint32(ts/time.Millisecond), "ts", uint32(frame.Timestamp/time.Millisecond), "codec", codec, "size", bytesIn, "data", data)
	}
}

View File

@@ -36,7 +36,7 @@ type (
	PullJob struct {
		Connection
		Publisher *Publisher
-		publishConfig *config.Publish
+		PublishConfig *config.Publish
		puller IPuller
		conf *config.Pull
	}
@@ -83,9 +83,9 @@ func (p *PullJob) GetPullJob() *PullJob {
func (p *PullJob) Init(puller IPuller, plugin *Plugin, streamPath string, conf config.Pull) *PullJob {
	if conf.PubConf != nil {
-		p.publishConfig = conf.PubConf
+		p.PublishConfig = conf.PubConf
	} else {
-		p.publishConfig = &plugin.config.Publish
+		p.PublishConfig = &plugin.config.Publish
	}
	p.Args = url.Values(conf.Args.DeepClone())
	p.conf = &conf
@@ -129,7 +129,7 @@ func (p *PullJob) Publish() (err error) {
	if len(p.Args) > 0 {
		streamPath += "?" + p.Args.Encode()
	}
-	p.Publisher, err = p.Plugin.PublishWithConfig(p.puller.GetTask().Context, streamPath, *p.publishConfig)
+	p.Publisher, err = p.Plugin.PublishWithConfig(p.puller.GetTask().Context, streamPath, *p.PublishConfig)
	p.Publisher.Type = PublishTypePull
	if err == nil && p.conf.MaxRetry != 0 {
		p.Publisher.OnDispose(func() {