From de986bde245fcca2ad33a20328507aebd15da479 Mon Sep 17 00:00:00 2001 From: langhuihui <178529795@qq.com> Date: Wed, 5 Feb 2025 16:45:05 +0800 Subject: [PATCH] feat: add record type --- api.go | 12 ++++++------ example/custom/main.go | 6 +++--- example/qiaopin/main.go | 2 +- pkg/config/types.go | 1 + plugin.go | 2 +- plugin/flv/pkg/record.go | 19 ++++++++++--------- plugin/hls/pkg/record.go | 11 ++++++----- plugin/mp4/api.go | 4 ++-- pull-proxy.go | 1 - recoder.go | 12 ++++-------- 10 files changed, 34 insertions(+), 36 deletions(-) diff --git a/api.go b/api.go index 407a4fc..76220eb 100644 --- a/api.go +++ b/api.go @@ -185,10 +185,10 @@ func (s *Server) StreamInfo(ctx context.Context, req *pb.StreamSnapRequest) (res for record := range s.Records.Range { if record.StreamPath == req.StreamPath { recordings = append(recordings, &pb.RecordingDetail{ - FilePath: record.FilePath, + FilePath: record.RecConf.FilePath, Mode: record.Mode, - Fragment: durationpb.New(record.Fragment), - Append: record.Append, + Fragment: durationpb.New(record.RecConf.Fragment), + Append: record.RecConf.Append, PluginName: record.Plugin.Meta.Name, }) } @@ -589,10 +589,10 @@ func (s *Server) StreamList(_ context.Context, req *pb.StreamListRequest) (res * s.Records.Call(func() error { for record := range s.Records.Range { recordingMap[record.StreamPath] = append(recordingMap[record.StreamPath], &pb.RecordingDetail{ - FilePath: record.FilePath, + FilePath: record.RecConf.FilePath, Mode: record.Mode, - Fragment: durationpb.New(record.Fragment), - Append: record.Append, + Fragment: durationpb.New(record.RecConf.Fragment), + Append: record.RecConf.Append, PluginName: record.Plugin.Meta.Name, Pointer: uint64(record.GetTaskPointer()), }) diff --git a/example/custom/main.go b/example/custom/main.go index d8bcd59..25d2c64 100644 --- a/example/custom/main.go +++ b/example/custom/main.go @@ -29,12 +29,12 @@ func main() { conf := flag.String("c", "config.yaml", "config file") flag.Parse() mp4.CustomFileName = func(job *m7s.RecordJob) string { - if job.Fragment == 0 { - return job.FilePath + ".mp4" + if job.RecConf.Fragment == 0 { + return job.RecConf.FilePath + ".mp4" } ss := strings.Split(job.StreamPath, "/") lastPart := ss[len(ss)-1] - return filepath.Join(job.FilePath, fmt.Sprintf("%s_%s%s", lastPart, time.Now().Local().Format("2006-01-02-15-04-05"), ".mp4")) + return filepath.Join(job.RecConf.FilePath, fmt.Sprintf("%s_%s%s", lastPart, time.Now().Local().Format("2006-01-02-15-04-05"), ".mp4")) } // ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(time.Second*100)) m7s.Run(context.Background(), *conf) diff --git a/example/qiaopin/main.go b/example/qiaopin/main.go index 7906d71..bb28d43 100644 --- a/example/qiaopin/main.go +++ b/example/qiaopin/main.go @@ -72,7 +72,7 @@ func main() { mp4.CustomFileName = func(job *m7s.RecordJob) string { - fileDir := strings.ReplaceAll(job.FilePath, job.StreamPath, "") + fileDir := strings.ReplaceAll(job.RecConf.FilePath, job.StreamPath, "") if err := os.MkdirAll(fileDir, 0755); err != nil { log.Default().Printf("创建目录失败:%s", err) return fmt.Sprintf("%s_%s%s", job.StreamPath, time.Now().Local().Format("2006-01-02-15-04-05"), ".mp4") diff --git a/pkg/config/types.go b/pkg/config/types.go index 37ac6cb..59264ff 100755 --- a/pkg/config/types.go +++ b/pkg/config/types.go @@ -75,6 +75,7 @@ type ( Header HTTPValues } Record struct { + Type string `desc:"录制类型"` // 录制类型 mp4、flv、hls、hlsv7 FilePath string `desc:"录制文件路径"` // 录制文件路径 Fragment time.Duration `desc:"分片时长"` // 分片时长 Append bool `desc:"是否追加录制"` // 是否追加录制 diff --git a/plugin.go b/plugin.go index 87e5a0c..3b40712 100644 --- a/plugin.go +++ b/plugin.go @@ -644,7 +644,7 @@ func (p *Plugin) Push(streamPath string, conf config.Push, subConf *config.Subsc } func (p *Plugin) Record(pub *Publisher, conf config.Record, subConf *config.Subscribe) *RecordJob { - recorder := p.Meta.Recorder() + recorder := p.Meta.Recorder(conf) job := recorder.GetRecordJob().Init(recorder, p, pub.StreamPath, conf, subConf) job.Depend(pub) return job diff --git a/plugin/flv/pkg/record.go b/plugin/flv/pkg/record.go index 73836a1..de406d1 100644 --- a/plugin/flv/pkg/record.go +++ b/plugin/flv/pkg/record.go @@ -11,6 +11,7 @@ import ( "gorm.io/gorm" "m7s.live/v5" "m7s.live/v5/pkg" + "m7s.live/v5/pkg/config" "m7s.live/v5/pkg/task" "m7s.live/v5/pkg/util" rtmp "m7s.live/v5/plugin/rtmp/pkg" @@ -137,7 +138,7 @@ func writeMetaTag(file *os.File, suber *m7s.Subscriber, filepositions []uint64, writeMetaTagQueueTask.AddTask(task) } -func NewRecorder() m7s.IRecorder { +func NewRecorder(conf config.Record) m7s.IRecorder { return &Recorder{} } @@ -147,10 +148,10 @@ type Recorder struct { } var CustomFileName = func(job *m7s.RecordJob) string { - if job.Fragment == 0 || job.Append { - return fmt.Sprintf("%s.flv", job.FilePath) + if job.RecConf.Fragment == 0 || job.RecConf.Append { + return fmt.Sprintf("%s.flv", job.RecConf.FilePath) } - return filepath.Join(job.FilePath, fmt.Sprintf("%d.flv", time.Now().Unix())) + return filepath.Join(job.RecConf.FilePath, fmt.Sprintf("%d.flv", time.Now().Unix())) } func (r *Recorder) createStream(start time.Time) (err error) { @@ -245,7 +246,7 @@ func (r *Recorder) Run() (err error) { var duration int64 ctx := &r.RecordJob suber := ctx.Subscriber - noFragment := ctx.Fragment == 0 || ctx.Append + noFragment := ctx.RecConf.Fragment == 0 || ctx.RecConf.Append startTime := time.Now() if ctx.BeforeDuration > 0 { startTime = startTime.Add(-ctx.BeforeDuration) @@ -254,13 +255,13 @@ func (r *Recorder) Run() (err error) { return } if noFragment { - file, err = os.OpenFile(r.stream.FilePath, os.O_CREATE|os.O_RDWR|util.Conditional(ctx.Append, os.O_APPEND, os.O_TRUNC), 0666) + file, err = os.OpenFile(r.stream.FilePath, os.O_CREATE|os.O_RDWR|util.Conditional(ctx.RecConf.Append, os.O_APPEND, os.O_TRUNC), 0666) if err != nil { return } defer writeMetaTag(file, suber, filepositions, times, &duration) } - if ctx.Append { + if ctx.RecConf.Append { var metaData rtmp.EcmaArray metaData, err = ReadMetaData(file) keyframes := metaData["keyframes"].(map[string]any) @@ -287,7 +288,7 @@ func (r *Recorder) Run() (err error) { suber.StartVideoTS = time.Duration(ts) * time.Millisecond offset, err = file.Seek(0, io.SeekEnd) } - } else if ctx.Fragment == 0 { + } else if ctx.RecConf.Fragment == 0 { _, err = file.Write(FLVHead) } else { if file, err = os.OpenFile(r.stream.FilePath, os.O_CREATE|os.O_RDWR, 0666); err != nil { @@ -297,7 +298,7 @@ func (r *Recorder) Run() (err error) { } writer := NewFlvWriter(file) checkFragment := func(absTime uint32) { - if duration = int64(absTime); time.Duration(duration)*time.Millisecond >= ctx.Fragment { + if duration = int64(absTime); time.Duration(duration)*time.Millisecond >= ctx.RecConf.Fragment { writeMetaTag(file, suber, filepositions, times, &duration) r.writeTailer(time.Now()) filepositions = []uint64{0} diff --git a/plugin/hls/pkg/record.go b/plugin/hls/pkg/record.go index 3c4dc38..8a3a347 100644 --- a/plugin/hls/pkg/record.go +++ b/plugin/hls/pkg/record.go @@ -10,12 +10,13 @@ import ( "m7s.live/v5" "m7s.live/v5/pkg" "m7s.live/v5/pkg/codec" + "m7s.live/v5/pkg/config" "m7s.live/v5/pkg/task" "m7s.live/v5/pkg/util" mpegts "m7s.live/v5/plugin/hls/pkg/ts" ) -func NewRecorder() m7s.IRecorder { +func NewRecorder(conf config.Record) m7s.IRecorder { return &Recorder{} } @@ -31,10 +32,10 @@ type Recorder struct { } var CustomFileName = func(job *m7s.RecordJob) string { - if job.Fragment == 0 || job.Append { - return fmt.Sprintf("%s/%s.ts", job.FilePath, time.Now().Format("20060102150405")) + if job.RecConf.Fragment == 0 || job.RecConf.Append { + return fmt.Sprintf("%s/%s.ts", job.RecConf.FilePath, time.Now().Format("20060102150405")) } - return filepath.Join(job.FilePath, time.Now().Format("20060102150405")+".ts") + return filepath.Join(job.RecConf.FilePath, time.Now().Format("20060102150405")+".ts") } func (r *Recorder) createStream(start time.Time) (err error) { @@ -141,7 +142,7 @@ func (r *Recorder) createNewTs() { } func (r *Recorder) writeSegment(ts time.Duration) (err error) { - if dur := ts - r.lastTs; dur >= r.RecordJob.Fragment || r.lastTs == 0 { + if dur := ts - r.lastTs; dur >= r.RecordJob.RecConf.Fragment || r.lastTs == 0 { if dur == ts && r.lastTs == 0 { //时间戳不对的情况,首个默认为2s dur = time.Duration(2) * time.Second } diff --git a/plugin/mp4/api.go b/plugin/mp4/api.go index 28e1f3d..3c527a6 100644 --- a/plugin/mp4/api.go +++ b/plugin/mp4/api.go @@ -176,7 +176,7 @@ func (p *MP4Plugin) StartRecord(ctx context.Context, req *mp4pb.ReqStartRecord) res = &mp4pb.ResponseStartRecord{} p.Server.Records.Call(func() error { _, recordExists = p.Server.Records.Find(func(job *m7s.RecordJob) bool { - return job.StreamPath == req.StreamPath && job.FilePath == req.FilePath + return job.StreamPath == req.StreamPath && job.RecConf.FilePath == req.FilePath }) return nil }) @@ -215,7 +215,7 @@ func (p *MP4Plugin) EventStart(ctx context.Context, req *mp4pb.ReqEventRecord) ( p.Error("EventStart", "error", err) } } - recorder := p.Meta.Recorder() + recorder := p.Meta.Recorder(config.Record{}) var tmpJob *m7s.RecordJob p.Server.Records.Call(func() error { tmpJob, _ = p.Server.Records.Find(func(job *m7s.RecordJob) bool { diff --git a/pull-proxy.go b/pull-proxy.go index 5266db7..386c356 100644 --- a/pull-proxy.go +++ b/pull-proxy.go @@ -43,7 +43,6 @@ type ( PullOnStart, Audio, StopOnIdle bool config.Pull `gorm:"embedded;embeddedPrefix:pull_"` config.Record `gorm:"embedded;embeddedPrefix:record_"` - RecordType string ParentID uint Type string Status byte diff --git a/recoder.go b/recoder.go index 19b11c7..9a3bc1f 100644 --- a/recoder.go +++ b/recoder.go @@ -26,16 +26,14 @@ type ( task.ITask GetRecordJob() *RecordJob } - Recorder = func() IRecorder + Recorder = func(config.Record) IRecorder RecordJob struct { task.Job StreamPath string // 对应本地流 Plugin *Plugin Subscriber *Subscriber SubConf *config.Subscribe - Fragment time.Duration - Append bool - FilePath string + RecConf *config.Record recorder IRecorder EventId string `json:"eventId" desc:"事件编号"` Mode RecordMode `json:"mode" desc:"事件类型,auto=连续录像模式,event=事件录像模式"` @@ -77,7 +75,7 @@ func (r *DefaultRecorder) Start() (err error) { } func (p *RecordJob) GetKey() string { - return p.FilePath + return p.RecConf.FilePath } func (p *RecordJob) Subscribe() (err error) { @@ -88,9 +86,7 @@ func (p *RecordJob) Subscribe() (err error) { func (p *RecordJob) Init(recorder IRecorder, plugin *Plugin, streamPath string, conf config.Record, subConf *config.Subscribe) *RecordJob { p.Plugin = plugin - p.Fragment = conf.Fragment - p.Append = conf.Append - p.FilePath = conf.FilePath + p.RecConf = &conf p.StreamPath = streamPath if subConf == nil { conf := p.Plugin.config.Subscribe