diff --git a/README.md b/README.md index d47c42f..87df892 100644 --- a/README.md +++ b/README.md @@ -26,7 +26,8 @@ func main() { |-----------|-------------| | disable_rm | Disables the memory pool | | sqlite | Enables the sqlite DB | - +| duckdb | Enables the duckdb DB | +| taskpanic | Throws panic, for testing | ## More Example diff --git a/README_CN.md b/README_CN.md index 5173013..a280377 100644 --- a/README_CN.md +++ b/README_CN.md @@ -24,7 +24,8 @@ func main() { |-----------|-------------| | disable_rm | 禁用内存池 | | sqlite | 启用 sqlite | - +| duckdb | 启用 duckdb | +| taskpanic | 抛出 panic,用于测试 | ## 更多示例 查看 example 目录 diff --git a/doc/pull.md b/doc/pull.md new file mode 100644 index 0000000..a6cbde5 --- /dev/null +++ b/doc/pull.md @@ -0,0 +1,132 @@ +# sequence +```mermaid +sequenceDiagram + participant P as Plugin + participant M as PluginMeta + participant PJ as PullJob + participant S as Server + participant IPuller as IPuller + + P->>P: Pull(streamPath, conf) + P->>M: Meta.Puller(conf) + M-->>P: puller (IPuller) + P->>PJ: GetPullJob() + PJ-->>P: pullJob + P->>PJ: Init(puller, p, streamPath, conf) + PJ->>S: Server.Pulls.Add(p, logger) + S->>PJ: Start() + PJ->>IPuller: SetRetry(conf.MaxRetry, conf.RetryInterval) + PJ->>PJ: Description = {...} + Note over PJ: Set description with plugin info, streamPath, URL, etc. + PJ->>IPuller: Start() +``` + +# simple config +## flv plugin +### local file +```yaml +flv: + pull: + live/test: /Users/dexter/Movies/jb-demo.flv +``` +### remote file +```yaml +flv: + pull: + live/test: http://192.168.1.100/live/stream.flv +``` + +## mp4 plugin +### local file +```yaml +mp4: + pull: + live/test: /Users/dexter/Movies/jb-demo.mp4 +``` +### remote file +```yaml +mp4: + pull: + live/test: http://192.168.1.100/live/stream.mp4 +``` + +## srt plugin +### local file +```yaml +srt: + pull: + live/test: srt://127.0.0.1:6000?streamid=subscribe:/live/stream&passphrase=foobarfoobar +``` + +## rtmp plugin +```yaml +rtmp: + pull: + live/test: rtmp://127.0.0.1/live/stream +``` + +## rtsp plugin +```yaml +rtsp: + pull: + live/test: rtsp://127.0.0.1/live/stream +``` + +## hls plugin +```yaml +hls: + pull: + live/test: http://127.0.0.1/live/stream.m3u8 +``` + +## gb28181 plugin +deivceID/channelID +```yaml +gb28181: + pull: + live/test: 34020000002000000001/34020000002000000001 +``` +# full config +## pull on subscribe +```yaml +xxx: + onsub: + pull: + .*: $0 +``` + +## config retry +```yaml +xxx: + pull: + live/test: xxxx + maxRetry: 3 + retryInterval: 5s +``` + +## config proxy +```yaml +xxx: + pull: + live/test: xxxx + proxy: http://127.0.0.1:8080 +``` + +## config header +```yaml +xxx: + pull: + live/test: xxxx + header: + User-Agent: xxx +``` + +## config args +```yaml +xxx: + pull: + live/test: xxxx + args: + user: xxx + password: xxx +``` \ No newline at end of file diff --git a/example/custom/config.yaml b/example/custom/config.yaml new file mode 100644 index 0000000..6dff680 --- /dev/null +++ b/example/custom/config.yaml @@ -0,0 +1,23 @@ +global: + loglevel: debug + http: :8081 + tcp: :50052 + +rtsp: + pull: + live/test: rtsp://127.0.0.1:8554/live/test + +#transcode: +# onpub: +# transform: +# ^live.+: +# output: +# - target: rtmp://localhost/trans/$0/small +# conf: -loglevel debug -c:a aac -c:v h264 -vf scale=320:240 + +mp4: + onpub: + record: + ^live/.+: + fragment: 30s + filepath: record/mp4 \ No newline at end of file diff --git a/example/custom/main.go b/example/custom/main.go new file mode 100644 index 0000000..f90835a --- /dev/null +++ b/example/custom/main.go @@ -0,0 +1,42 @@ +package main + +import ( + "context" + "flag" + "fmt" + "m7s.live/m7s/v5" + _ "m7s.live/m7s/v5/plugin/console" + _ "m7s.live/m7s/v5/plugin/debug" + _ "m7s.live/m7s/v5/plugin/flv" + _ "m7s.live/m7s/v5/plugin/gb28181" + _ "m7s.live/m7s/v5/plugin/logrotate" + _ "m7s.live/m7s/v5/plugin/monitor" + _ "m7s.live/m7s/v5/plugin/mp4" + mp4 "m7s.live/m7s/v5/plugin/mp4/pkg" + _ "m7s.live/m7s/v5/plugin/preview" + _ "m7s.live/m7s/v5/plugin/rtmp" + _ "m7s.live/m7s/v5/plugin/rtsp" + _ "m7s.live/m7s/v5/plugin/sei" + _ "m7s.live/m7s/v5/plugin/srt" + _ "m7s.live/m7s/v5/plugin/stress" + _ "m7s.live/m7s/v5/plugin/transcode" + _ "m7s.live/m7s/v5/plugin/webrtc" + "path/filepath" + "strings" + "time" +) + +func main() { + conf := flag.String("c", "config.yaml", "config file") + flag.Parse() + mp4.CustomFileName = func(job *m7s.RecordJob) string { + if job.Fragment == 0 { + return job.FilePath + ".mp4" + } + ss := strings.Split(job.StreamPath, "/") + lastPart := ss[len(ss)-1] + return filepath.Join(job.FilePath, fmt.Sprintf("%s_%s%s", lastPart, time.Now().Local().Format("2006-01-02-15-04-05"), ".mp4")) + } + // ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(time.Second*100)) + m7s.Run(context.Background(), *conf) +} diff --git a/example/default/readflv.yaml b/example/default/readflv.yaml index 960b0f1..9e2dff9 100644 --- a/example/default/readflv.yaml +++ b/example/default/readflv.yaml @@ -3,7 +3,7 @@ global: disableall: true rtsp: enable: true - listenaddr: :554 + tcp: :8554 flv: enable: true pull: diff --git a/example/default/transcode.yaml b/example/default/transcode.yaml index 2b8cfe2..a182ebd 100644 --- a/example/default/transcode.yaml +++ b/example/default/transcode.yaml @@ -1,25 +1,16 @@ global: - loglevel: debug - -#rtsp: -# tcp: -# listenaddr: :10554 - + loglevel: trace + http: :8081 + tcp: :50052 +rtsp: + pull: + live/test: rtsp://127.0.0.1:8554/live/test +debug: + profile: cpu.prof transcode: onpub: transform: -# .+: -# output: -# - target: rtmp://localhost/$0/h265 -# conf: -loglevel debug -c:a aac -c:v hevc_videotoolbox - live/.*: - output: - - target: rtmp://localhost/$0/h264 - overlay: rtmp://localhost/overlay/test -# filter: drawtext=fontfile=/usr/share/fonts/truetype/noto/NotoSans-Regular.ttf:fontsize=24:fontcolor=white:box=1:boxcolor=black@0.5:boxborderw=5:x=(w-tw)/2:y=h-th-10:text='%{localtime\:%Y-%m-%d %H.%M.%S}' - filter: | - [1:v]crop=400:300:10:10[overlay]; - [0:v][overlay]overlay=600:100[base]; - [base]drawtext=fontfile=/usr/share/fonts/truetype/noto/NotoSans-Regular.ttf:fontsize=24:fontcolor=white:box=1:boxcolor=black@0.5:boxborderw=5:x=(w-tw)/2:y=h-th-10:text='%{localtime\:%Y-%m-%d %H.%M.%S}'[out] - conf: -loglevel debug -c:a copy -c:v h264 - + ^live.+: + output: + - target: rtmp://localhost/trans/$0/small + conf: -loglevel debug -c:a aac -c:v h264 -vf scale=320:240 diff --git a/pkg/config/types.go b/pkg/config/types.go index bf7d6fc..e7d8999 100755 --- a/pkg/config/types.go +++ b/pkg/config/types.go @@ -39,9 +39,9 @@ type ( } Pull struct { URL string `desc:"拉流地址"` - MaxRetry int `desc:"断开后自动重试次数,0:不重试,-1:无限重试"` // 断开后自动重拉,0 表示不自动重拉,-1 表示无限重拉,高于0 的数代表最大重拉次数 - RetryInterval time.Duration `default:"5s" desc:"重试间隔"` // 重试间隔 - Proxy string `desc:"代理地址"` // 代理地址 + MaxRetry int `default:"-1" desc:"断开后自动重试次数,0:不重试,-1:无限重试"` // 断开后自动重拉,0 表示不自动重拉,-1 表示无限重拉,高于0 的数代表最大重拉次数 + RetryInterval time.Duration `default:"5s" desc:"重试间隔"` // 重试间隔 + Proxy string `desc:"代理地址"` // 代理地址 Header map[string][]string Args url.Values } diff --git a/pkg/ring-writer.go b/pkg/ring-writer.go index 49be893..ee4eff8 100644 --- a/pkg/ring-writer.go +++ b/pkg/ring-writer.go @@ -150,6 +150,7 @@ func (rb *RingWriter) Step() (normal bool) { // do not remove only idr if next == rb.IDRingList.Back().Value { if rb.Size < rb.SizeRange[1] { + rb.SLogger.Debug("only idr") rb.glow(5) next = rb.Next() } @@ -160,7 +161,7 @@ func (rb *RingWriter) Step() (normal bool) { rb.IDRingList.Remove(oldIDR) rb.Unlock() } else { - rb.SLogger.Log(nil, task.TraceLevel, "not enough buffer") + rb.SLogger.Debug("not enough buffer") rb.glow(5) next = rb.Next() } diff --git a/pkg/task/panic.go b/pkg/task/panic.go index 0216d01..749d2be 100644 --- a/pkg/task/panic.go +++ b/pkg/task/panic.go @@ -1,3 +1,6 @@ +//go:build !taskpanic +// +build !taskpanic + package task -var ThrowPanic = false \ No newline at end of file +var ThrowPanic = false diff --git a/pkg/task/task.go b/pkg/task/task.go index 4027cbf..e28634e 100644 --- a/pkg/task/task.go +++ b/pkg/task/task.go @@ -228,7 +228,7 @@ func (task *Task) GetSignal() any { } func (task *Task) checkRetry(err error) bool { - if errors.Is(err, ErrTaskComplete) { + if errors.Is(err, ErrTaskComplete) || errors.Is(err, ErrExit) { return false } if task.retry.MaxRetry < 0 || task.retry.RetryCount < task.retry.MaxRetry { @@ -349,9 +349,21 @@ func (task *Task) ResetRetryCount() { } func (task *Task) run(handler func() error) { - if err := handler(); err == nil { - task.Stop(ErrTaskComplete) - } else { - task.Stop(err) - } + var err error + defer func() { + if !ThrowPanic { + if r := recover(); r != nil { + err = errors.New(fmt.Sprint(r)) + if task.Logger != nil { + task.Error("panic", "error", err, "stack", string(debug.Stack())) + } + } + } + if err == nil { + task.Stop(ErrTaskComplete) + } else { + task.Stop(err) + } + }() + err = handler() } diff --git a/pkg/util/buf-reader.go b/pkg/util/buf-reader.go index 8e85f1c..8a56561 100644 --- a/pkg/util/buf-reader.go +++ b/pkg/util/buf-reader.go @@ -41,8 +41,6 @@ func NewBufReaderWithBufLen(reader io.Reader, bufLen int) (r *BufReader) { func NewBufReaderBuffersChan(feedChan chan net.Buffers) (r *BufReader) { r = &BufReader{ - Allocator: NewScalableMemoryAllocator(defaultBufSize), - BufLen: defaultBufSize, feedData: func() error { data, ok := <-feedChan if !ok { @@ -63,8 +61,6 @@ func NewBufReaderBuffersChan(feedChan chan net.Buffers) (r *BufReader) { func NewBufReaderChan(feedChan chan []byte) (r *BufReader) { r = &BufReader{ - Allocator: NewScalableMemoryAllocator(defaultBufSize), - BufLen: defaultBufSize, feedData: func() error { data, ok := <-feedChan if !ok { @@ -87,7 +83,9 @@ func NewBufReader(reader io.Reader) (r *BufReader) { func (r *BufReader) Recycle() { r.buf = MemoryReader{} - r.Allocator.Recycle() + if r.Allocator != nil { + r.Allocator.Recycle() + } } func (r *BufReader) Buffered() int { diff --git a/plugin/debug/index.go b/plugin/debug/index.go index ec4f383..0081537 100644 --- a/plugin/debug/index.go +++ b/plugin/debug/index.go @@ -1,17 +1,17 @@ package plugin_debug import ( + myproc "github.com/cloudwego/goref/pkg/proc" "github.com/go-delve/delve/pkg/config" "github.com/go-delve/delve/service/debugger" "io" + "m7s.live/m7s/v5" "net/http" "net/http/pprof" "os" + runtimePPROF "runtime/pprof" "strings" "time" - - myproc "github.com/cloudwego/goref/pkg/proc" - "m7s.live/m7s/v5" ) var _ = m7s.InstallPlugin[DebugPlugin]() @@ -19,8 +19,10 @@ var conf, _ = config.LoadConfig() type DebugPlugin struct { m7s.Plugin - ChartPeriod time.Duration `default:"1s" desc:"图表更新周期"` - Grfout string `default:"grf.out" desc:"grf输出文件"` + ProfileDuration time.Duration `default:"10s" desc:"profile持续时间"` + Profile string `desc:"采集profile存储文件"` + ChartPeriod time.Duration `default:"1s" desc:"图表更新周期"` + Grfout string `default:"grf.out" desc:"grf输出文件"` } type WriteToFile struct { @@ -41,6 +43,24 @@ func (w *WriteToFile) WriteHeader(statusCode int) { // w.w.WriteHeader(statusCode) } +func (p *DebugPlugin) OnInit() error { + if p.Profile != "" { + go func() { + file, err := os.Create(p.Profile) + if err != nil { + return + } + defer file.Close() + p.Info("cpu profile start") + err = runtimePPROF.StartCPUProfile(file) + time.Sleep(p.ProfileDuration) + runtimePPROF.StopCPUProfile() + p.Info("cpu profile done") + }() + } + return nil +} + func (p *DebugPlugin) Pprof_Trace(w http.ResponseWriter, r *http.Request) { r.URL.Path = "/debug" + r.URL.Path pprof.Trace(w, r) diff --git a/plugin/flv/pkg/record.go b/plugin/flv/pkg/record.go index ec4e8d2..66d3d04 100644 --- a/plugin/flv/pkg/record.go +++ b/plugin/flv/pkg/record.go @@ -6,7 +6,6 @@ import ( "os" "path/filepath" "slices" - "strings" "time" "m7s.live/m7s/v5" @@ -145,6 +144,13 @@ type Recorder struct { m7s.DefaultRecorder } +var CustomFileName = func(job *m7s.RecordJob) string { + if job.Fragment == 0 || job.Append { + return fmt.Sprintf("%s.flv", job.FilePath) + } + return filepath.Join(job.FilePath, time.Now().Local().Format("2006-01-02T15:04:05")+".flv") +} + func (r *Recorder) Run() (err error) { var file *os.File var filepositions []uint64 @@ -155,11 +161,7 @@ func (r *Recorder) Run() (err error) { suber := ctx.Subscriber noFragment := ctx.Fragment == 0 || ctx.Append if noFragment { - filePath := ctx.FilePath - if !strings.HasSuffix(filePath, ".flv") { - filePath += ".flv" - } - if file, err = os.OpenFile(filePath, os.O_CREATE|os.O_RDWR|util.Conditional(ctx.Append, os.O_APPEND, os.O_TRUNC), 0666); err != nil { + if file, err = os.OpenFile(CustomFileName(ctx), os.O_CREATE|os.O_RDWR|util.Conditional(ctx.Append, os.O_APPEND, os.O_TRUNC), 0666); err != nil { return } defer writeMetaTag(file, suber, filepositions, times, &duration) @@ -194,7 +196,7 @@ func (r *Recorder) Run() (err error) { } else if ctx.Fragment == 0 { _, err = file.Write(FLVHead) } else { - if file, err = os.OpenFile(filepath.Join(ctx.FilePath, fmt.Sprintf("%d.flv", time.Now().Unix())), os.O_CREATE|os.O_RDWR, 0666); err != nil { + if file, err = os.OpenFile(CustomFileName(ctx), os.O_CREATE|os.O_RDWR, 0666); err != nil { return } _, err = file.Write(FLVHead) @@ -206,7 +208,7 @@ func (r *Recorder) Run() (err error) { filepositions = []uint64{0} times = []float64{0} offset = 0 - if file, err = os.OpenFile(filepath.Join(ctx.FilePath, fmt.Sprintf("%d.flv", time.Now().Unix())), os.O_CREATE|os.O_RDWR, 0666); err != nil { + if file, err = os.OpenFile(CustomFileName(ctx), os.O_CREATE|os.O_RDWR, 0666); err != nil { return } _, err = file.Write(FLVHead) diff --git a/plugin/mp4/api.go b/plugin/mp4/api.go index f2fc6e2..60f2dff 100644 --- a/plugin/mp4/api.go +++ b/plugin/mp4/api.go @@ -47,7 +47,7 @@ func (p *MP4Plugin) List(ctx context.Context, req *pb.ReqRecordList) (resp *pb.R if req.FilePath == "" { p.DB.Find(&streams, "end_time>? AND start_time? AND start_time? AND start_time? AND file_path=?", p.PullStartTime, p.PullJob.RemoteURL) + tx := p.PullJob.Plugin.DB.Find(&p.Streams, "end_time>? AND stream_path=?", p.PullStartTime, p.PullJob.RemoteURL) if tx.Error != nil { return tx.Error } diff --git a/recoder.go b/recoder.go index 9b7320b..b542483 100644 --- a/recoder.go +++ b/recoder.go @@ -35,6 +35,7 @@ type ( ID uint `gorm:"primarykey"` StartTime, EndTime time.Time FilePath string + StreamPath string AudioCodec, VideoCodec string } ) diff --git a/server.go b/server.go index 68cf69a..80ad96e 100644 --- a/server.go +++ b/server.go @@ -13,6 +13,8 @@ import ( "strings" "time" + "github.com/shirou/gopsutil/v3/cpu" + "m7s.live/m7s/v5/pkg/task" "m7s.live/m7s/v5/pkg/config" @@ -121,7 +123,7 @@ func NewServer(conf any) (s *Server) { "arch": sysruntime.GOARCH, "cpus": int32(sysruntime.NumCPU()), } - s.Transforms.PublishEvent = make(chan *Publisher, 1) + s.Transforms.PublishEvent = make(chan *Publisher, 10) s.prometheusDesc.init() return } @@ -356,6 +358,13 @@ func (c *CheckSubWaitTimeout) GetTickInterval() time.Duration { } func (c *CheckSubWaitTimeout) Tick(any) { + percents, err := cpu.Percent(time.Second, false) + if err == nil { + for _, cpu := range percents { + c.Info("tick", "cpu", cpu, "streams", c.s.Streams.Length, "subscribers", c.s.Subscribers.Length, "waits", c.s.Waiting.Length) + } + } + for waits := range c.s.Waiting.Range { for sub := range waits.Range { select {