feat: add record type

This commit is contained in:
langhuihui
2025-02-05 16:45:05 +08:00
parent da4b8b4f5a
commit de986bde24
10 changed files with 34 additions and 36 deletions

12
api.go
View File

@@ -185,10 +185,10 @@ func (s *Server) StreamInfo(ctx context.Context, req *pb.StreamSnapRequest) (res
for record := range s.Records.Range {
if record.StreamPath == req.StreamPath {
recordings = append(recordings, &pb.RecordingDetail{
FilePath: record.FilePath,
FilePath: record.RecConf.FilePath,
Mode: record.Mode,
Fragment: durationpb.New(record.Fragment),
Append: record.Append,
Fragment: durationpb.New(record.RecConf.Fragment),
Append: record.RecConf.Append,
PluginName: record.Plugin.Meta.Name,
})
}
@@ -589,10 +589,10 @@ func (s *Server) StreamList(_ context.Context, req *pb.StreamListRequest) (res *
s.Records.Call(func() error {
for record := range s.Records.Range {
recordingMap[record.StreamPath] = append(recordingMap[record.StreamPath], &pb.RecordingDetail{
FilePath: record.FilePath,
FilePath: record.RecConf.FilePath,
Mode: record.Mode,
Fragment: durationpb.New(record.Fragment),
Append: record.Append,
Fragment: durationpb.New(record.RecConf.Fragment),
Append: record.RecConf.Append,
PluginName: record.Plugin.Meta.Name,
Pointer: uint64(record.GetTaskPointer()),
})

View File

@@ -29,12 +29,12 @@ func main() {
conf := flag.String("c", "config.yaml", "config file")
flag.Parse()
mp4.CustomFileName = func(job *m7s.RecordJob) string {
if job.Fragment == 0 {
return job.FilePath + ".mp4"
if job.RecConf.Fragment == 0 {
return job.RecConf.FilePath + ".mp4"
}
ss := strings.Split(job.StreamPath, "/")
lastPart := ss[len(ss)-1]
return filepath.Join(job.FilePath, fmt.Sprintf("%s_%s%s", lastPart, time.Now().Local().Format("2006-01-02-15-04-05"), ".mp4"))
return filepath.Join(job.RecConf.FilePath, fmt.Sprintf("%s_%s%s", lastPart, time.Now().Local().Format("2006-01-02-15-04-05"), ".mp4"))
}
// ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(time.Second*100))
m7s.Run(context.Background(), *conf)

View File

@@ -72,7 +72,7 @@ func main() {
mp4.CustomFileName = func(job *m7s.RecordJob) string {
fileDir := strings.ReplaceAll(job.FilePath, job.StreamPath, "")
fileDir := strings.ReplaceAll(job.RecConf.FilePath, job.StreamPath, "")
if err := os.MkdirAll(fileDir, 0755); err != nil {
log.Default().Printf("创建目录失败:%s", err)
return fmt.Sprintf("%s_%s%s", job.StreamPath, time.Now().Local().Format("2006-01-02-15-04-05"), ".mp4")

View File

@@ -75,6 +75,7 @@ type (
Header HTTPValues
}
Record struct {
Type string `desc:"录制类型"` // 录制类型 mp4、flv、hls、hlsv7
FilePath string `desc:"录制文件路径"` // 录制文件路径
Fragment time.Duration `desc:"分片时长"` // 分片时长
Append bool `desc:"是否追加录制"` // 是否追加录制

View File

@@ -644,7 +644,7 @@ func (p *Plugin) Push(streamPath string, conf config.Push, subConf *config.Subsc
}
func (p *Plugin) Record(pub *Publisher, conf config.Record, subConf *config.Subscribe) *RecordJob {
recorder := p.Meta.Recorder()
recorder := p.Meta.Recorder(conf)
job := recorder.GetRecordJob().Init(recorder, p, pub.StreamPath, conf, subConf)
job.Depend(pub)
return job

View File

@@ -11,6 +11,7 @@ import (
"gorm.io/gorm"
"m7s.live/v5"
"m7s.live/v5/pkg"
"m7s.live/v5/pkg/config"
"m7s.live/v5/pkg/task"
"m7s.live/v5/pkg/util"
rtmp "m7s.live/v5/plugin/rtmp/pkg"
@@ -137,7 +138,7 @@ func writeMetaTag(file *os.File, suber *m7s.Subscriber, filepositions []uint64,
writeMetaTagQueueTask.AddTask(task)
}
func NewRecorder() m7s.IRecorder {
func NewRecorder(conf config.Record) m7s.IRecorder {
return &Recorder{}
}
@@ -147,10 +148,10 @@ type Recorder struct {
}
var CustomFileName = func(job *m7s.RecordJob) string {
if job.Fragment == 0 || job.Append {
return fmt.Sprintf("%s.flv", job.FilePath)
if job.RecConf.Fragment == 0 || job.RecConf.Append {
return fmt.Sprintf("%s.flv", job.RecConf.FilePath)
}
return filepath.Join(job.FilePath, fmt.Sprintf("%d.flv", time.Now().Unix()))
return filepath.Join(job.RecConf.FilePath, fmt.Sprintf("%d.flv", time.Now().Unix()))
}
func (r *Recorder) createStream(start time.Time) (err error) {
@@ -245,7 +246,7 @@ func (r *Recorder) Run() (err error) {
var duration int64
ctx := &r.RecordJob
suber := ctx.Subscriber
noFragment := ctx.Fragment == 0 || ctx.Append
noFragment := ctx.RecConf.Fragment == 0 || ctx.RecConf.Append
startTime := time.Now()
if ctx.BeforeDuration > 0 {
startTime = startTime.Add(-ctx.BeforeDuration)
@@ -254,13 +255,13 @@ func (r *Recorder) Run() (err error) {
return
}
if noFragment {
file, err = os.OpenFile(r.stream.FilePath, os.O_CREATE|os.O_RDWR|util.Conditional(ctx.Append, os.O_APPEND, os.O_TRUNC), 0666)
file, err = os.OpenFile(r.stream.FilePath, os.O_CREATE|os.O_RDWR|util.Conditional(ctx.RecConf.Append, os.O_APPEND, os.O_TRUNC), 0666)
if err != nil {
return
}
defer writeMetaTag(file, suber, filepositions, times, &duration)
}
if ctx.Append {
if ctx.RecConf.Append {
var metaData rtmp.EcmaArray
metaData, err = ReadMetaData(file)
keyframes := metaData["keyframes"].(map[string]any)
@@ -287,7 +288,7 @@ func (r *Recorder) Run() (err error) {
suber.StartVideoTS = time.Duration(ts) * time.Millisecond
offset, err = file.Seek(0, io.SeekEnd)
}
} else if ctx.Fragment == 0 {
} else if ctx.RecConf.Fragment == 0 {
_, err = file.Write(FLVHead)
} else {
if file, err = os.OpenFile(r.stream.FilePath, os.O_CREATE|os.O_RDWR, 0666); err != nil {
@@ -297,7 +298,7 @@ func (r *Recorder) Run() (err error) {
}
writer := NewFlvWriter(file)
checkFragment := func(absTime uint32) {
if duration = int64(absTime); time.Duration(duration)*time.Millisecond >= ctx.Fragment {
if duration = int64(absTime); time.Duration(duration)*time.Millisecond >= ctx.RecConf.Fragment {
writeMetaTag(file, suber, filepositions, times, &duration)
r.writeTailer(time.Now())
filepositions = []uint64{0}

View File

@@ -10,12 +10,13 @@ import (
"m7s.live/v5"
"m7s.live/v5/pkg"
"m7s.live/v5/pkg/codec"
"m7s.live/v5/pkg/config"
"m7s.live/v5/pkg/task"
"m7s.live/v5/pkg/util"
mpegts "m7s.live/v5/plugin/hls/pkg/ts"
)
func NewRecorder() m7s.IRecorder {
func NewRecorder(conf config.Record) m7s.IRecorder {
return &Recorder{}
}
@@ -31,10 +32,10 @@ type Recorder struct {
}
var CustomFileName = func(job *m7s.RecordJob) string {
if job.Fragment == 0 || job.Append {
return fmt.Sprintf("%s/%s.ts", job.FilePath, time.Now().Format("20060102150405"))
if job.RecConf.Fragment == 0 || job.RecConf.Append {
return fmt.Sprintf("%s/%s.ts", job.RecConf.FilePath, time.Now().Format("20060102150405"))
}
return filepath.Join(job.FilePath, time.Now().Format("20060102150405")+".ts")
return filepath.Join(job.RecConf.FilePath, time.Now().Format("20060102150405")+".ts")
}
func (r *Recorder) createStream(start time.Time) (err error) {
@@ -141,7 +142,7 @@ func (r *Recorder) createNewTs() {
}
func (r *Recorder) writeSegment(ts time.Duration) (err error) {
if dur := ts - r.lastTs; dur >= r.RecordJob.Fragment || r.lastTs == 0 {
if dur := ts - r.lastTs; dur >= r.RecordJob.RecConf.Fragment || r.lastTs == 0 {
if dur == ts && r.lastTs == 0 { //时间戳不对的情况首个默认为2s
dur = time.Duration(2) * time.Second
}

View File

@@ -176,7 +176,7 @@ func (p *MP4Plugin) StartRecord(ctx context.Context, req *mp4pb.ReqStartRecord)
res = &mp4pb.ResponseStartRecord{}
p.Server.Records.Call(func() error {
_, recordExists = p.Server.Records.Find(func(job *m7s.RecordJob) bool {
return job.StreamPath == req.StreamPath && job.FilePath == req.FilePath
return job.StreamPath == req.StreamPath && job.RecConf.FilePath == req.FilePath
})
return nil
})
@@ -215,7 +215,7 @@ func (p *MP4Plugin) EventStart(ctx context.Context, req *mp4pb.ReqEventRecord) (
p.Error("EventStart", "error", err)
}
}
recorder := p.Meta.Recorder()
recorder := p.Meta.Recorder(config.Record{})
var tmpJob *m7s.RecordJob
p.Server.Records.Call(func() error {
tmpJob, _ = p.Server.Records.Find(func(job *m7s.RecordJob) bool {

View File

@@ -43,7 +43,6 @@ type (
PullOnStart, Audio, StopOnIdle bool
config.Pull `gorm:"embedded;embeddedPrefix:pull_"`
config.Record `gorm:"embedded;embeddedPrefix:record_"`
RecordType string
ParentID uint
Type string
Status byte

View File

@@ -26,16 +26,14 @@ type (
task.ITask
GetRecordJob() *RecordJob
}
Recorder = func() IRecorder
Recorder = func(config.Record) IRecorder
RecordJob struct {
task.Job
StreamPath string // 对应本地流
Plugin *Plugin
Subscriber *Subscriber
SubConf *config.Subscribe
Fragment time.Duration
Append bool
FilePath string
RecConf *config.Record
recorder IRecorder
EventId string `json:"eventId" desc:"事件编号"`
Mode RecordMode `json:"mode" desc:"事件类型,auto=连续录像模式event=事件录像模式"`
@@ -77,7 +75,7 @@ func (r *DefaultRecorder) Start() (err error) {
}
func (p *RecordJob) GetKey() string {
return p.FilePath
return p.RecConf.FilePath
}
func (p *RecordJob) Subscribe() (err error) {
@@ -88,9 +86,7 @@ func (p *RecordJob) Subscribe() (err error) {
func (p *RecordJob) Init(recorder IRecorder, plugin *Plugin, streamPath string, conf config.Record, subConf *config.Subscribe) *RecordJob {
p.Plugin = plugin
p.Fragment = conf.Fragment
p.Append = conf.Append
p.FilePath = conf.FilePath
p.RecConf = &conf
p.StreamPath = streamPath
if subConf == nil {
conf := p.Plugin.config.Subscribe