1.mysql数据库配置未配置时,默认使用sqlite

2.当前该流有事件录像时,重复请求该流的事件录像,录像结束时间顺延,仅实现flv
This commit is contained in:
pg
2024-09-19 17:12:34 +08:00
parent d31d4a34ad
commit b15c776865
9 changed files with 161 additions and 33 deletions

65
flv.go
View File

@@ -2,11 +2,13 @@ package record
import ( import (
"fmt" "fmt"
"go.uber.org/zap/zapcore"
"io" "io"
"net" "net"
"os" "os"
"strconv" "strconv"
"strings" "strings"
"sync"
"time" "time"
"go.uber.org/zap" "go.uber.org/zap"
@@ -21,10 +23,71 @@ type FLVRecorder struct {
times []float64 times []float64
Offset int64 Offset int64
duration int64 duration int64
timer *time.Timer
stopCh chan struct{}
mu sync.Mutex
}
// Goroutine 等待定时器停止录像
func (r *FLVRecorder) waitForStop(streamPath string) {
select {
case <-r.timer.C: // 定时器到期
r.StopTimerRecord(zap.String("reason", "timer expired"))
case <-r.stopCh: // 手动停止
return
}
}
// 停止定时录像
func (r *FLVRecorder) StopTimerRecord(reason ...zapcore.Field) {
r.mu.Lock()
defer r.mu.Unlock()
// 停止录像
r.Stop(reason...)
// 关闭 stop 通道,停止 Goroutine
close(r.stopCh)
}
// 重置定时器
func (r *FLVRecorder) resetTimer(timeout time.Duration) {
if r.timer != nil {
r.Info("事件录像", zap.String("timeout seconeds is reset to", fmt.Sprintf("%.0f", timeout.Seconds())))
r.timer.Reset(timeout)
} else {
r.Info("事件录像", zap.String("timeout seconeds is first set to", fmt.Sprintf("%.0f", timeout.Seconds())))
r.timer = time.NewTimer(timeout)
}
}
func (r *FLVRecorder) StartWithDynamicTimeout(streamPath, fileName string, timeout time.Duration) error {
// 启动录像
if err := r.StartWithFileName(streamPath, fileName); err != nil {
return err
}
// 创建定时器
r.resetTimer(timeout)
// 启动 Goroutine 监听定时器
go r.waitForStop(streamPath)
return nil
}
func (r *FLVRecorder) UpdateTimeout(timeout time.Duration) {
r.mu.Lock()
defer r.mu.Unlock()
// 停止旧的定时器并重置
r.resetTimer(timeout)
} }
func NewFLVRecorder() (r *FLVRecorder) { func NewFLVRecorder() (r *FLVRecorder) {
r = &FLVRecorder{} r = &FLVRecorder{
stopCh: make(chan struct{}),
}
r.Record = RecordPluginConfig.Flv r.Record = RecordPluginConfig.Flv
return r return r
} }

11
fmp4.go
View File

@@ -5,6 +5,7 @@ import (
"github.com/Eyevinn/mp4ff/mp4" "github.com/Eyevinn/mp4ff/mp4"
. "m7s.live/engine/v4" . "m7s.live/engine/v4"
"m7s.live/engine/v4/codec" "m7s.live/engine/v4/codec"
"time"
) )
type mediaContext struct { type mediaContext struct {
@@ -43,6 +44,16 @@ type FMP4Recorder struct {
ftyp *mp4.FtypBox ftyp *mp4.FtypBox
} }
func (r *FMP4Recorder) StartWithDynamicTimeout(streamPath, fileName string, timeout time.Duration) error {
//TODO implement me
panic("implement me")
}
func (r *FMP4Recorder) UpdateTimeout(timeout time.Duration) {
//TODO implement me
panic("implement me")
}
func NewFMP4Recorder() *FMP4Recorder { func NewFMP4Recorder() *FMP4Recorder {
r := &FMP4Recorder{} r := &FMP4Recorder{}
r.Record = RecordPluginConfig.Fmp4 r.Record = RecordPluginConfig.Fmp4

10
hls.go
View File

@@ -24,6 +24,16 @@ type HLSRecorder struct {
MemoryTs MemoryTs
} }
func (h *HLSRecorder) StartWithDynamicTimeout(streamPath, fileName string, timeout time.Duration) error {
//TODO implement me
panic("implement me")
}
func (h *HLSRecorder) UpdateTimeout(timeout time.Duration) {
//TODO implement me
panic("implement me")
}
func NewHLSRecorder() (r *HLSRecorder) { func NewHLSRecorder() (r *HLSRecorder) {
r = &HLSRecorder{} r = &HLSRecorder{}
r.Record = RecordPluginConfig.Hls r.Record = RecordPluginConfig.Hls

16
main.go
View File

@@ -76,18 +76,22 @@ var exceptionChannel = make(chan *Exception)
func (conf *RecordConfig) OnEvent(event any) { func (conf *RecordConfig) OnEvent(event any) {
switch v := event.(type) { switch v := event.(type) {
case FirstConfig, config.Config: case FirstConfig, config.Config:
if conf.MysqlDSN == "" { //if conf.MysqlDSN == "" {
plugin.Error("mysqlDSN 数据库连接配置为空无法运行请在config.yaml里配置") // plugin.Error("mysqlDSN 数据库连接配置为空无法运行请在config.yaml里配置")
} //}
plugin.Info("mysqlDSN is" + conf.MysqlDSN)
go func() { //处理所有异常,录像中断异常、录像读取异常、录像导出文件中断、磁盘容量低于阈值异常、磁盘异常 go func() { //处理所有异常,录像中断异常、录像读取异常、录像导出文件中断、磁盘容量低于阈值异常、磁盘异常
for exception := range exceptionChannel { for exception := range exceptionChannel {
SendToThirdPartyAPI(exception) SendToThirdPartyAPI(exception)
} }
}() }()
initMysqlDB(conf.MysqlDSN) if conf.MysqlDSN == "" {
initSqliteDB(conf.SqliteDbPath) plugin.Info("sqliteDb filepath is" + conf.SqliteDbPath)
initSqliteDB(conf.SqliteDbPath)
} else {
plugin.Info("mysqlDSN is" + conf.MysqlDSN)
initMysqlDB(conf.MysqlDSN)
}
conf.Flv.Init() conf.Flv.Init()
conf.Mp4.Init() conf.Mp4.Init()
conf.Fmp4.Init() conf.Fmp4.Init()

11
mp4.go
View File

@@ -2,6 +2,7 @@ package record
import ( import (
"net" "net"
"time"
"github.com/yapingcat/gomedia/go-mp4" "github.com/yapingcat/gomedia/go-mp4"
"go.uber.org/zap" "go.uber.org/zap"
@@ -17,6 +18,16 @@ type MP4Recorder struct {
audioId uint32 audioId uint32
} }
func (r *MP4Recorder) StartWithDynamicTimeout(streamPath, fileName string, timeout time.Duration) error {
//TODO implement me
panic("implement me")
}
func (r *MP4Recorder) UpdateTimeout(timeout time.Duration) {
//TODO implement me
panic("implement me")
}
func NewMP4Recorder() *MP4Recorder { func NewMP4Recorder() *MP4Recorder {
r := &MP4Recorder{} r := &MP4Recorder{}
r.Record = RecordPluginConfig.Mp4 r.Record = RecordPluginConfig.Mp4

11
raw.go
View File

@@ -5,6 +5,7 @@ import (
. "m7s.live/engine/v4" . "m7s.live/engine/v4"
"m7s.live/engine/v4/codec" "m7s.live/engine/v4/codec"
"m7s.live/engine/v4/track" "m7s.live/engine/v4/track"
"time"
) )
type RawRecorder struct { type RawRecorder struct {
@@ -12,6 +13,16 @@ type RawRecorder struct {
IsAudio bool IsAudio bool
} }
func (r *RawRecorder) StartWithDynamicTimeout(streamPath, fileName string, timeout time.Duration) error {
//TODO implement me
panic("implement me")
}
func (r *RawRecorder) UpdateTimeout(timeout time.Duration) {
//TODO implement me
panic("implement me")
}
func NewRawRecorder() (r *RawRecorder) { func NewRawRecorder() (r *RawRecorder) {
r = &RawRecorder{} r = &RawRecorder{}
r.Record = RecordPluginConfig.Raw r.Record = RecordPluginConfig.Raw

View File

@@ -3,18 +3,20 @@ package record
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"go.uber.org/zap"
"io" "io"
"io/ioutil" "io/ioutil"
"log" "log"
"net/http" "net/http"
"strconv" "strconv"
"strings" "strings"
"sync"
"time" "time"
"m7s.live/engine/v4/util" "m7s.live/engine/v4/util"
) )
var mu sync.Mutex
func errorJsonString(args map[string]interface{}) string { func errorJsonString(args map[string]interface{}) string {
resultJsonData := make(map[string]interface{}) resultJsonData := make(map[string]interface{})
for field, value := range args { for field, value := range args {
@@ -178,7 +180,8 @@ func (conf *RecordConfig) API_event_list(w http.ResponseWriter, r *http.Request)
// 事件录像 // 事件录像
func (conf *RecordConfig) API_event_start(w http.ResponseWriter, r *http.Request) { func (conf *RecordConfig) API_event_start(w http.ResponseWriter, r *http.Request) {
mu.Lock()
defer mu.Unlock()
token := r.Header.Get("token") token := r.Header.Get("token")
resultJsonData := make(map[string]interface{}) resultJsonData := make(map[string]interface{})
resultJsonData["code"] = -1 resultJsonData["code"] = -1
@@ -215,20 +218,6 @@ func (conf *RecordConfig) API_event_start(w http.ResponseWriter, r *http.Request
util.ReturnError(-1, errorJsonString(resultJsonData), w, r) util.ReturnError(-1, errorJsonString(resultJsonData), w, r)
return return
} }
//var streamExist = false
//conf.recordings.Range(func(key, value any) bool {
// existStreamPath := value.(IRecorder).GetSubscriber().Stream.Path
// if existStreamPath == streamPath {
// resultJsonData["msg"] = "streamPath is exist"
// util.ReturnError(-1, errorJsonString(resultJsonData), w, r)
// streamExist = true
// return !streamExist
// }
// return !streamExist
//})
//if streamExist {
// return
//}
eventId := eventRecordModel.EventId eventId := eventRecordModel.EventId
if eventId == "" { if eventId == "" {
resultJsonData["msg"] = "no eventId" resultJsonData["msg"] = "no eventId"
@@ -263,6 +252,16 @@ func (conf *RecordConfig) API_event_start(w http.ResponseWriter, r *http.Request
fragment := eventRecordModel.Fragment fragment := eventRecordModel.Fragment
//var id string //var id string
irecorder := NewFLVRecorder() irecorder := NewFLVRecorder()
found := false
conf.recordings.Range(func(key, value any) bool {
tmpIRecorder := value.(*FLVRecorder)
existStreamPath := tmpIRecorder.GetSubscriber().Stream.Path
if existStreamPath == streamPath {
irecorder = tmpIRecorder
found = true
}
return found
})
recorder := irecorder.GetRecorder() recorder := irecorder.GetRecorder()
recorder.FileName = fileName recorder.FileName = fileName
recorder.append = false recorder.append = false
@@ -273,14 +272,19 @@ func (conf *RecordConfig) API_event_start(w http.ResponseWriter, r *http.Request
recorder.Fragment = f recorder.Fragment = f
} }
} }
err = irecorder.StartWithFileName(streamPath, fileName)
go func() {
timer := time.NewTimer(30 * time.Second)
// 等待计时器到期 if found {
<-timer.C irecorder.UpdateTimeout(30 * time.Second)
irecorder.Stop(zap.String("reason", "api")) } else {
}() err = irecorder.StartWithDynamicTimeout(streamPath, fileName, 30*time.Second)
}
//go func() {
// timer := time.NewTimer(30 * time.Second)
//
// // 等待计时器到期
// <-timer.C
// irecorder.Stop(zap.String("reason", "api"))
//}()
//id = recorder.ID //id = recorder.ID
if err != nil { if err != nil {
exceptionChannel <- &Exception{AlarmType: "record", AlarmDesc: "录像失败", StreamPath: streamPath} exceptionChannel <- &Exception{AlarmType: "record", AlarmDesc: "录像失败", StreamPath: streamPath}

View File

@@ -16,6 +16,8 @@ func initSqliteDB(sqliteDbPath string) {
log.Fatal(err) log.Fatal(err)
} }
err = sqlitedb.AutoMigrate(&FLVKeyframe{}) err = sqlitedb.AutoMigrate(&FLVKeyframe{})
err = sqlitedb.AutoMigrate(&EventRecord{})
err = sqlitedb.AutoMigrate(&Exception{})
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }

View File

@@ -4,7 +4,7 @@ import (
"bufio" "bufio"
"io" "io"
"path/filepath" "path/filepath"
"strconv" "strings"
"time" "time"
"go.uber.org/zap" "go.uber.org/zap"
@@ -18,6 +18,8 @@ type IRecorder interface {
StartWithFileName(streamPath string, fileName string) error StartWithFileName(streamPath string, fileName string) error
io.Closer io.Closer
CreateFile() (FileWr, error) CreateFile() (FileWr, error)
StartWithDynamicTimeout(streamPath, fileName string, timeout time.Duration) error
UpdateTimeout(timeout time.Duration)
} }
type Recorder struct { type Recorder struct {
@@ -61,7 +63,7 @@ func (r *Recorder) getFileName(streamPath string) (filename string) {
filename = filepath.Join(filename, r.FileName) filename = filepath.Join(filename, r.FileName)
} }
} else { } else {
filename = filepath.Join(filename, strconv.FormatInt(time.Now().Unix(), 10)) filename = filepath.Join(filename, strings.ReplaceAll(streamPath, "/", "-")+"-"+time.Now().Format("2006-01-02-15-04-05"))
} }
return return
} }
@@ -114,6 +116,16 @@ func (r *Recorder) OnEvent(event any) {
r.cut(v.AbsTime) r.cut(v.AbsTime)
} }
case VideoFrame: case VideoFrame:
if v.IFrame {
//go func() { //将视频关键帧的数据存入sqlite数据库中
// var flvKeyfram = &FLVKeyframe{FLVFileName: r.Path + "/" + strings.ReplaceAll(r.filePath, "\\", "/"), FrameOffset: r.VideoReader, FrameAbstime: v.AbsTime}
// sqlitedb.Create(flvKeyfram)
//}()
r.Info("这是关键帧且取到了r.filePath是" + r.Path + r.filePath)
//r.Info("这是关键帧且取到了r.VideoReader.AbsTime是" + strconv.FormatUint(uint64(v.FrameAbstime), 10))
//r.Info("这是关键帧且取到了r.Offset是" + strconv.Itoa(int(v.FrameOffset)))
//r.Info("这是关键帧且取到了r.Offset是" + r.Stream.Path)
}
if r.Fragment > 0 && v.IFrame { if r.Fragment > 0 && v.IFrame {
r.cut(v.AbsTime) r.cut(v.AbsTime)
} }