diff --git a/flv.go b/flv.go index cb2fa33..8ff569d 100644 --- a/flv.go +++ b/flv.go @@ -2,11 +2,13 @@ package record import ( "fmt" + "go.uber.org/zap/zapcore" "io" "net" "os" "strconv" "strings" + "sync" "time" "go.uber.org/zap" @@ -21,10 +23,71 @@ type FLVRecorder struct { times []float64 Offset int64 duration int64 + timer *time.Timer + stopCh chan struct{} + mu sync.Mutex +} + +// Goroutine 等待定时器停止录像 +func (r *FLVRecorder) waitForStop(streamPath string) { + select { + case <-r.timer.C: // 定时器到期 + r.StopTimerRecord(zap.String("reason", "timer expired")) + case <-r.stopCh: // 手动停止 + return + } +} + +// 停止定时录像 +func (r *FLVRecorder) StopTimerRecord(reason ...zapcore.Field) { + r.mu.Lock() + defer r.mu.Unlock() + + // 停止录像 + r.Stop(reason...) + + // 关闭 stop 通道,停止 Goroutine + close(r.stopCh) +} + +// 重置定时器 +func (r *FLVRecorder) resetTimer(timeout time.Duration) { + if r.timer != nil { + r.Info("事件录像", zap.String("timeout seconeds is reset to", fmt.Sprintf("%.0f", timeout.Seconds()))) + r.timer.Reset(timeout) + } else { + r.Info("事件录像", zap.String("timeout seconeds is first set to", fmt.Sprintf("%.0f", timeout.Seconds()))) + r.timer = time.NewTimer(timeout) + } +} + +func (r *FLVRecorder) StartWithDynamicTimeout(streamPath, fileName string, timeout time.Duration) error { + // 启动录像 + if err := r.StartWithFileName(streamPath, fileName); err != nil { + return err + } + + // 创建定时器 + r.resetTimer(timeout) + + // 启动 Goroutine 监听定时器 + go r.waitForStop(streamPath) + + return nil +} + +func (r *FLVRecorder) UpdateTimeout(timeout time.Duration) { + r.mu.Lock() + defer r.mu.Unlock() + + // 停止旧的定时器并重置 + r.resetTimer(timeout) } func NewFLVRecorder() (r *FLVRecorder) { - r = &FLVRecorder{} + r = &FLVRecorder{ + stopCh: make(chan struct{}), + } r.Record = RecordPluginConfig.Flv return r } diff --git a/fmp4.go b/fmp4.go index 6f21c7b..a9c222f 100644 --- a/fmp4.go +++ b/fmp4.go @@ -5,6 +5,7 @@ import ( "github.com/Eyevinn/mp4ff/mp4" . "m7s.live/engine/v4" "m7s.live/engine/v4/codec" + "time" ) type mediaContext struct { @@ -43,6 +44,16 @@ type FMP4Recorder struct { ftyp *mp4.FtypBox } +func (r *FMP4Recorder) StartWithDynamicTimeout(streamPath, fileName string, timeout time.Duration) error { + //TODO implement me + panic("implement me") +} + +func (r *FMP4Recorder) UpdateTimeout(timeout time.Duration) { + //TODO implement me + panic("implement me") +} + func NewFMP4Recorder() *FMP4Recorder { r := &FMP4Recorder{} r.Record = RecordPluginConfig.Fmp4 diff --git a/hls.go b/hls.go index 254585a..aed4e9d 100644 --- a/hls.go +++ b/hls.go @@ -24,6 +24,16 @@ type HLSRecorder struct { MemoryTs } +func (h *HLSRecorder) StartWithDynamicTimeout(streamPath, fileName string, timeout time.Duration) error { + //TODO implement me + panic("implement me") +} + +func (h *HLSRecorder) UpdateTimeout(timeout time.Duration) { + //TODO implement me + panic("implement me") +} + func NewHLSRecorder() (r *HLSRecorder) { r = &HLSRecorder{} r.Record = RecordPluginConfig.Hls @@ -81,7 +91,7 @@ func (h *HLSRecorder) OnEvent(event any) { case AudioFrame: if h.tsStartTime == 0 { h.tsStartTime = v.AbsTime - } + } h.tsLastTime = v.AbsTime h.Recorder.OnEvent(event) pes := &mpegts.MpegtsPESFrame{ diff --git a/main.go b/main.go index 36cfdce..7ad7dc1 100644 --- a/main.go +++ b/main.go @@ -76,18 +76,22 @@ var exceptionChannel = make(chan *Exception) func (conf *RecordConfig) OnEvent(event any) { switch v := event.(type) { case FirstConfig, config.Config: - if conf.MysqlDSN == "" { - plugin.Error("mysqlDSN 数据库连接配置为空,无法运行,请在config.yaml里配置") - } - plugin.Info("mysqlDSN is" + conf.MysqlDSN) + //if conf.MysqlDSN == "" { + // plugin.Error("mysqlDSN 数据库连接配置为空,无法运行,请在config.yaml里配置") + //} go func() { //处理所有异常,录像中断异常、录像读取异常、录像导出文件中断、磁盘容量低于阈值异常、磁盘异常 for exception := range exceptionChannel { SendToThirdPartyAPI(exception) } }() - initMysqlDB(conf.MysqlDSN) - initSqliteDB(conf.SqliteDbPath) + if conf.MysqlDSN == "" { + plugin.Info("sqliteDb filepath is" + conf.SqliteDbPath) + initSqliteDB(conf.SqliteDbPath) + } else { + plugin.Info("mysqlDSN is" + conf.MysqlDSN) + initMysqlDB(conf.MysqlDSN) + } conf.Flv.Init() conf.Mp4.Init() conf.Fmp4.Init() diff --git a/mp4.go b/mp4.go index fe76443..3d75186 100644 --- a/mp4.go +++ b/mp4.go @@ -2,6 +2,7 @@ package record import ( "net" + "time" "github.com/yapingcat/gomedia/go-mp4" "go.uber.org/zap" @@ -17,6 +18,16 @@ type MP4Recorder struct { audioId uint32 } +func (r *MP4Recorder) StartWithDynamicTimeout(streamPath, fileName string, timeout time.Duration) error { + //TODO implement me + panic("implement me") +} + +func (r *MP4Recorder) UpdateTimeout(timeout time.Duration) { + //TODO implement me + panic("implement me") +} + func NewMP4Recorder() *MP4Recorder { r := &MP4Recorder{} r.Record = RecordPluginConfig.Mp4 diff --git a/raw.go b/raw.go index e62dd80..2ee9179 100644 --- a/raw.go +++ b/raw.go @@ -5,6 +5,7 @@ import ( . "m7s.live/engine/v4" "m7s.live/engine/v4/codec" "m7s.live/engine/v4/track" + "time" ) type RawRecorder struct { @@ -12,6 +13,16 @@ type RawRecorder struct { IsAudio bool } +func (r *RawRecorder) StartWithDynamicTimeout(streamPath, fileName string, timeout time.Duration) error { + //TODO implement me + panic("implement me") +} + +func (r *RawRecorder) UpdateTimeout(timeout time.Duration) { + //TODO implement me + panic("implement me") +} + func NewRawRecorder() (r *RawRecorder) { r = &RawRecorder{} r.Record = RecordPluginConfig.Raw diff --git a/restful_event.go b/restful_event.go index b13ea86..d56af01 100644 --- a/restful_event.go +++ b/restful_event.go @@ -3,18 +3,20 @@ package record import ( "encoding/json" "fmt" - "go.uber.org/zap" "io" "io/ioutil" "log" "net/http" "strconv" "strings" + "sync" "time" "m7s.live/engine/v4/util" ) +var mu sync.Mutex + func errorJsonString(args map[string]interface{}) string { resultJsonData := make(map[string]interface{}) for field, value := range args { @@ -178,7 +180,8 @@ func (conf *RecordConfig) API_event_list(w http.ResponseWriter, r *http.Request) // 事件录像 func (conf *RecordConfig) API_event_start(w http.ResponseWriter, r *http.Request) { - + mu.Lock() + defer mu.Unlock() token := r.Header.Get("token") resultJsonData := make(map[string]interface{}) resultJsonData["code"] = -1 @@ -215,20 +218,6 @@ func (conf *RecordConfig) API_event_start(w http.ResponseWriter, r *http.Request util.ReturnError(-1, errorJsonString(resultJsonData), w, r) return } - //var streamExist = false - //conf.recordings.Range(func(key, value any) bool { - // existStreamPath := value.(IRecorder).GetSubscriber().Stream.Path - // if existStreamPath == streamPath { - // resultJsonData["msg"] = "streamPath is exist" - // util.ReturnError(-1, errorJsonString(resultJsonData), w, r) - // streamExist = true - // return !streamExist - // } - // return !streamExist - //}) - //if streamExist { - // return - //} eventId := eventRecordModel.EventId if eventId == "" { resultJsonData["msg"] = "no eventId" @@ -263,6 +252,16 @@ func (conf *RecordConfig) API_event_start(w http.ResponseWriter, r *http.Request fragment := eventRecordModel.Fragment //var id string irecorder := NewFLVRecorder() + found := false + conf.recordings.Range(func(key, value any) bool { + tmpIRecorder := value.(*FLVRecorder) + existStreamPath := tmpIRecorder.GetSubscriber().Stream.Path + if existStreamPath == streamPath { + irecorder = tmpIRecorder + found = true + } + return found + }) recorder := irecorder.GetRecorder() recorder.FileName = fileName recorder.append = false @@ -273,14 +272,19 @@ func (conf *RecordConfig) API_event_start(w http.ResponseWriter, r *http.Request recorder.Fragment = f } } - err = irecorder.StartWithFileName(streamPath, fileName) - go func() { - timer := time.NewTimer(30 * time.Second) - // 等待计时器到期 - <-timer.C - irecorder.Stop(zap.String("reason", "api")) - }() + if found { + irecorder.UpdateTimeout(30 * time.Second) + } else { + err = irecorder.StartWithDynamicTimeout(streamPath, fileName, 30*time.Second) + } + //go func() { + // timer := time.NewTimer(30 * time.Second) + // + // // 等待计时器到期 + // <-timer.C + // irecorder.Stop(zap.String("reason", "api")) + //}() //id = recorder.ID if err != nil { exceptionChannel <- &Exception{AlarmType: "record", AlarmDesc: "录像失败", StreamPath: streamPath} diff --git a/sqlitedb.go b/sqlitedb.go index 7982e26..6f087c6 100644 --- a/sqlitedb.go +++ b/sqlitedb.go @@ -16,6 +16,8 @@ func initSqliteDB(sqliteDbPath string) { log.Fatal(err) } err = sqlitedb.AutoMigrate(&FLVKeyframe{}) + err = sqlitedb.AutoMigrate(&EventRecord{}) + err = sqlitedb.AutoMigrate(&Exception{}) if err != nil { log.Fatal(err) } diff --git a/subscriber.go b/subscriber.go index 6215b1a..b68a076 100644 --- a/subscriber.go +++ b/subscriber.go @@ -4,7 +4,7 @@ import ( "bufio" "io" "path/filepath" - "strconv" + "strings" "time" "go.uber.org/zap" @@ -18,6 +18,8 @@ type IRecorder interface { StartWithFileName(streamPath string, fileName string) error io.Closer CreateFile() (FileWr, error) + StartWithDynamicTimeout(streamPath, fileName string, timeout time.Duration) error + UpdateTimeout(timeout time.Duration) } type Recorder struct { @@ -61,7 +63,7 @@ func (r *Recorder) getFileName(streamPath string) (filename string) { filename = filepath.Join(filename, r.FileName) } } else { - filename = filepath.Join(filename, strconv.FormatInt(time.Now().Unix(), 10)) + filename = filepath.Join(filename, strings.ReplaceAll(streamPath, "/", "-")+"-"+time.Now().Format("2006-01-02-15-04-05")) } return } @@ -114,6 +116,16 @@ func (r *Recorder) OnEvent(event any) { r.cut(v.AbsTime) } case VideoFrame: + if v.IFrame { + //go func() { //将视频关键帧的数据存入sqlite数据库中 + // var flvKeyfram = &FLVKeyframe{FLVFileName: r.Path + "/" + strings.ReplaceAll(r.filePath, "\\", "/"), FrameOffset: r.VideoReader, FrameAbstime: v.AbsTime} + // sqlitedb.Create(flvKeyfram) + //}() + r.Info("这是关键帧,且取到了r.filePath是" + r.Path + r.filePath) + //r.Info("这是关键帧,且取到了r.VideoReader.AbsTime是" + strconv.FormatUint(uint64(v.FrameAbstime), 10)) + //r.Info("这是关键帧,且取到了r.Offset是" + strconv.Itoa(int(v.FrameOffset))) + //r.Info("这是关键帧,且取到了r.Offset是" + r.Stream.Path) + } if r.Fragment > 0 && v.IFrame { r.cut(v.AbsTime) }