// Mirror of https://github.com/langhuihui/monibuca.git
// (synced 2025-09-27 03:25:56 +08:00; 213 lines, 5.6 KiB, Go).

package m7s
|
|
|
|
import (
|
|
"os"
|
|
"path/filepath"
|
|
"time"
|
|
|
|
"gorm.io/gorm"
|
|
|
|
"m7s.live/v5/pkg/config"
|
|
|
|
"m7s.live/v5/pkg/task"
|
|
|
|
"m7s.live/v5/pkg"
|
|
)
|
|
|
|
type (
|
|
IRecorder interface {
|
|
task.ITask
|
|
GetRecordJob() *RecordJob
|
|
}
|
|
RecorderFactory = func(config.Record) IRecorder
|
|
// RecordEvent 包含录像事件的公共字段
|
|
|
|
EventRecordStream struct {
|
|
*config.RecordEvent
|
|
RecordStream
|
|
}
|
|
RecordJob struct {
|
|
task.Job
|
|
Event *config.RecordEvent
|
|
StreamPath string // 对应本地流
|
|
Plugin *Plugin
|
|
Subscriber *Subscriber
|
|
SubConf *config.Subscribe
|
|
RecConf *config.Record
|
|
recorder IRecorder
|
|
}
|
|
DefaultRecorder struct {
|
|
task.Task
|
|
RecordJob RecordJob
|
|
Event EventRecordStream
|
|
}
|
|
RecordStream struct {
|
|
ID uint `gorm:"primarykey"`
|
|
StartTime time.Time `gorm:"default:NULL"`
|
|
EndTime time.Time `gorm:"default:NULL"`
|
|
Duration uint32 `gorm:"comment:录像时长;default:0"`
|
|
Filename string `json:"fileName" desc:"文件名" gorm:"type:varchar(255);comment:文件名"`
|
|
Type string `json:"type" desc:"录像文件类型" gorm:"type:varchar(255);comment:录像文件类型,flv,mp4,raw,fmp4,hls"`
|
|
FilePath string
|
|
StreamPath string
|
|
AudioCodec string
|
|
VideoCodec string
|
|
CreatedAt time.Time
|
|
DeletedAt gorm.DeletedAt `gorm:"index" yaml:"-"`
|
|
}
|
|
)
|
|
|
|
func (r *DefaultRecorder) GetRecordJob() *RecordJob {
|
|
return &r.RecordJob
|
|
}
|
|
|
|
func (r *DefaultRecorder) Start() (err error) {
|
|
return r.RecordJob.Subscribe()
|
|
}
|
|
|
|
func (r *DefaultRecorder) CreateStream(start time.Time, customFileName func(*RecordJob) string) (err error) {
|
|
recordJob := &r.RecordJob
|
|
sub := recordJob.Subscriber
|
|
r.Event.RecordStream = RecordStream{
|
|
StartTime: start,
|
|
StreamPath: sub.StreamPath,
|
|
FilePath: customFileName(recordJob),
|
|
Type: recordJob.RecConf.Type,
|
|
}
|
|
dir := filepath.Dir(r.Event.FilePath)
|
|
if err = os.MkdirAll(dir, 0755); err != nil {
|
|
return
|
|
}
|
|
if sub.Publisher.HasAudioTrack() {
|
|
r.Event.AudioCodec = sub.Publisher.AudioTrack.ICodecCtx.String()
|
|
}
|
|
if sub.Publisher.HasVideoTrack() {
|
|
r.Event.VideoCodec = sub.Publisher.VideoTrack.ICodecCtx.String()
|
|
}
|
|
if recordJob.Plugin.DB != nil {
|
|
if recordJob.Event != nil {
|
|
r.Event.RecordEvent = recordJob.Event
|
|
recordJob.Plugin.DB.Save(&r.Event)
|
|
} else {
|
|
recordJob.Plugin.DB.Save(&r.Event.RecordStream)
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
func (r *DefaultRecorder) WriteTail(end time.Time, tailJob task.IJob) {
|
|
r.Event.EndTime = end
|
|
if r.RecordJob.Plugin.DB != nil {
|
|
// 将事件和录像记录关联
|
|
if r.RecordJob.Event != nil {
|
|
r.RecordJob.Plugin.DB.Save(&r.Event)
|
|
} else {
|
|
r.RecordJob.Plugin.DB.Save(&r.Event.RecordStream)
|
|
}
|
|
}
|
|
if tailJob == nil {
|
|
return
|
|
}
|
|
tailJob.AddTask(NewEventRecordCheck(r.Event.Type, r.Event.StreamPath, r.RecordJob.Plugin.DB))
|
|
}
|
|
|
|
func (p *RecordJob) GetKey() string {
|
|
return p.RecConf.FilePath
|
|
}
|
|
|
|
func (p *RecordJob) Subscribe() (err error) {
|
|
|
|
p.Subscriber, err = p.Plugin.SubscribeWithConfig(p.recorder.GetTask().Context, p.StreamPath, *p.SubConf)
|
|
return
|
|
}
|
|
|
|
func (p *RecordJob) Init(recorder IRecorder, plugin *Plugin, streamPath string, conf config.Record, subConf *config.Subscribe) *RecordJob {
|
|
p.Plugin = plugin
|
|
p.RecConf = &conf
|
|
p.StreamPath = streamPath
|
|
if subConf == nil {
|
|
conf := p.Plugin.config.Subscribe
|
|
subConf = &conf
|
|
}
|
|
subConf.SubType = SubscribeTypeVod
|
|
p.SubConf = subConf
|
|
p.recorder = recorder
|
|
p.SetDescriptions(task.Description{
|
|
"plugin": plugin.Meta.Name,
|
|
"streamPath": streamPath,
|
|
"filePath": conf.FilePath,
|
|
"append": conf.Append,
|
|
"fragment": conf.Fragment,
|
|
})
|
|
recorder.SetRetry(-1, time.Second)
|
|
if sender := plugin.getHookSender(config.HookOnRecordStart); sender != nil {
|
|
recorder.OnStart(func() {
|
|
webhookData := map[string]interface{}{
|
|
"event": config.HookOnRecordStart,
|
|
"streamPath": streamPath,
|
|
"filePath": conf.FilePath,
|
|
"pluginName": plugin.Meta.Name,
|
|
"timestamp": time.Now().Unix(),
|
|
}
|
|
sender(config.HookOnRecordStart, webhookData)
|
|
})
|
|
}
|
|
|
|
if sender := plugin.getHookSender(config.HookOnRecordEnd); sender != nil {
|
|
recorder.OnDispose(func() {
|
|
webhookData := map[string]interface{}{
|
|
"event": config.HookOnRecordEnd,
|
|
"streamPath": streamPath,
|
|
"filePath": conf.FilePath,
|
|
"reason": recorder.StopReason().Error(),
|
|
"timestamp": time.Now().Unix(),
|
|
}
|
|
sender(config.HookOnRecordEnd, webhookData)
|
|
})
|
|
}
|
|
|
|
plugin.Server.Records.Add(p, plugin.Logger.With("filePath", conf.FilePath, "streamPath", streamPath))
|
|
return p
|
|
}
|
|
|
|
func (p *RecordJob) Start() (err error) {
|
|
s := p.Plugin.Server
|
|
if _, ok := s.Records.Get(p.GetKey()); ok {
|
|
return pkg.ErrRecordSamePath
|
|
}
|
|
// dir := p.FilePath
|
|
// if p.Fragment == 0 || p.Append {
|
|
// dir = filepath.Dir(p.FilePath)
|
|
// }
|
|
// p.SetDescription("filePath", p.FilePath)
|
|
// if err = os.MkdirAll(dir, 0755); err != nil {
|
|
// return
|
|
// }
|
|
p.AddTask(p.recorder, p.Logger)
|
|
return
|
|
}
|
|
|
|
func NewEventRecordCheck(t string, streamPath string, db *gorm.DB) *eventRecordCheck {
|
|
return &eventRecordCheck{
|
|
DB: db,
|
|
streamPath: streamPath,
|
|
Type: t,
|
|
}
|
|
}
|
|
|
|
type eventRecordCheck struct {
|
|
task.Task
|
|
DB *gorm.DB
|
|
streamPath string
|
|
Type string
|
|
}
|
|
|
|
func (t *eventRecordCheck) Run() (err error) {
|
|
var eventRecordStreams []EventRecordStream
|
|
t.DB.Find(&eventRecordStreams, "type=? AND event_level=high AND stream_path=?", t.Type, t.streamPath) //搜索事件录像,且为重要事件(无法自动删除)
|
|
for _, recordStream := range eventRecordStreams {
|
|
t.DB.Model(&EventRecordStream{}).Where(`event_level=low AND start_time <= ? and end_time >= ?`, recordStream.EndTime, recordStream.StartTime).Update("event_level", config.EventLevelHigh)
|
|
}
|
|
return
|
|
}
|