diff --git a/README.md b/README.md index 0950bc3..42269ed 100644 --- a/README.md +++ b/README.md @@ -63,3 +63,9 @@ record: - `http://localhost:8080/record/live/test.flv` 将会读取对应的flv文件 - `http://localhost:8080/record/live/test.mp4` 将会读取对应的fmp4文件 + +// GO仓库刷新INDEX + +GOPROXY=proxy.golang.org go list -m github.com/eanfs/plugin-record/v4@v4.9.3 + +GOPROXY=proxy.golang.org go list -m github.com/eanfs/plugin-transform/v1@v1.0.0 diff --git a/checker.go b/checker.go new file mode 100644 index 0000000..8e4a3a5 --- /dev/null +++ b/checker.go @@ -0,0 +1,99 @@ +package record + +import ( + "bytes" + "encoding/json" + "fmt" + "net/http" + "strings" + "time" + + "go.uber.org/zap" +) + +// 查询所有正在录制中的记录 +func (conf *RecordConfig) CheckRecordDB() { + go func() { + for { + plugin.Info("启动录制检查器,开始检查未在录制的视频流信息:", zap.Time("start", time.Now())) + var outRecordings []any + var eventRecords []EventRecord + + var recordings []any + conf.recordings.Range(func(key, value any) bool { + recordings = append(recordings, value) + return true + }) + err = db.Where("is_delete = ?", "0").Find(&eventRecords).Error + if err != nil { + plugin.Error("查询数据库失败 %s", zap.Error(err)) + return + } else { + plugin.Info("DB中处于录制中的数量:", zap.Int("recordings", len(eventRecords))) + // 查询出在eventRecords中fileName字段包含recordings中的filename的记录 + // 遍历eventRecords,判断fileName字段是否包含recordings中的filename + for _, record := range eventRecords { + plugin.Info("DB中处于录制中的流内容:", zap.String("recording-Id", record.RecId), zap.String("record-Filepath", record.Filepath), zap.String("record-Filename", record.Filename)) + // 如果 recordings 为空,将所有的录制流都添加到 outRecordings + if len(recordings) == 0 { + outRecordings = append(outRecordings, record) + } else { + // 遍历 recordings,判断是否有包含当前 record.Filename 的录制流 + found := false + for _, recording := range recordings { + if strings.Contains(recording.(IRecorder).GetRecorder().ID, record.RecId) { + found = true + break + } + } + // 如果没有找到,则添加到 outRecordings + if !found { + outRecordings = append(outRecordings, record) + } + } + } + // 打印未在录制的视频流 + for _, outRecording := range outRecordings { + streamPath := outRecording.(EventRecord).StreamPath + fileName := outRecording.(EventRecord).Filename + plugin.Info("已停止录制的视频流", zap.Any("StreamPath", streamPath), zap.Any("Filename", fileName)) + + exception := Exception{AlarmType: "Recording-Stop", AlarmDesc: "Recording Stopped Exception", StreamPath: streamPath, FileName: fileName} + callback(&exception) + } + } + + // 等待 1 分钟后继续执行 + <-time.After(60 * time.Second) + } + }() + +} + +// 向第三方发送异常报警 +func callback(exception *Exception) { + exception.CreateTime = time.Now().Format("2006-01-02 15:04:05") + exception.ServerIP = RecordPluginConfig.LocalIp + data, err := json.Marshal(exception) + if err != nil { + fmt.Println("Error marshalling exception:", err) + return + } + err = db.Create(&exception).Error + if err != nil { + fmt.Println("异常数据插入数据库失败:", err) + return + } + resp, err := http.Post(RecordPluginConfig.ExceptionPostUrl, "application/json", bytes.NewBuffer(data)) + if err != nil { + fmt.Println("Error sending exception to third party API:", err) + return + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + fmt.Println("Failed to send exception, status code:", resp.StatusCode) + } else { + fmt.Println("Exception sent successfully!") + } +} diff --git a/config.go b/config.go index 80e2af2..125d6a1 100644 --- a/config.go +++ b/config.go @@ -52,11 +52,12 @@ type VideoFileInfo struct { } type Record struct { - Ext string `desc:"文件扩展名"` //文件扩展名 - Path string `desc:"存储文件的目录"` //存储文件的目录 - AutoRecord bool `desc:"是否自动录制"` //是否自动录制 - Filter config.Regexp `desc:"录制过滤器"` //录制过滤器 - Fragment time.Duration `desc:"分片大小,0表示不分片"` //分片大小,0表示不分片 + Ext string `desc:"文件扩展名"` //文件扩展名 + Path string `desc:"存储文件的目录"` //存储文件的目录 + AutoRecord bool `desc:"是否自动录制"` //是否自动录制 + Filter config.Regexp `desc:"录制过滤器"` //录制过滤器 + Fragment time.Duration `desc:"分片大小,0表示不分片"` //分片大小,0表示不分片 + Duration time.Duration `desc:"视频最大录制时长,0表示不限制"` //分片大小,0表示不分片 http.Handler `json:"-" yaml:"-"` CreateFileFn func(filename string, append bool) (FileWr, error) `json:"-" yaml:"-"` GetDurationFn func(file io.ReadSeeker) uint32 `json:"-" yaml:"-"` diff --git a/entity.go b/entity.go index 78567e6..9814572 100644 --- a/entity.go +++ b/entity.go @@ -6,7 +6,7 @@ import "time" type EventRecord struct { Id uint `json:"id" desc:"自增长id" gorm:"primaryKey;autoIncrement"` StreamPath string `json:"streamPath" desc:"流路径" gorm:"type:varchar(255);comment:流路径"` - EventId string `json:"eventId" desc:"事件编号" gorm:"type:varchar(255);comment:事件编号"` + RecId string `json:"RecId" desc:"录制编号" gorm:"type:varchar(255);comment:录制编号"` RecordMode string `json:"recordMode" desc:"事件类型,0=连续录像模式,1=事件录像模式" gorm:"type:varchar(255);comment:事件类型,0=连续录像模式,1=事件录像模式"` EventName string `json:"eventName" desc:"事件名称" gorm:"type:varchar(255);comment:事件名称"` BeforeDuration string `json:"beforeDuration" desc:"事件前缓存时长" gorm:"type:varchar(255);comment:事件前缓存时长"` @@ -37,6 +37,7 @@ type Exception struct { AlarmDesc string `json:"alarmDesc" gorm:"type:varchar(50)"` ServerIP string `json:"serverIP" gorm:"type:varchar(50)"` StreamPath string `json:"streamPath" gorm:"type:varchar(50)"` + FileName string `json:"fileName" gorm:"type:varchar(100)"` } // sqlite数据库用来存放每个flv文件的关键帧对应的offset及abstime数据 diff --git a/flv.go b/flv.go index 8534c83..f206927 100644 --- a/flv.go +++ b/flv.go @@ -2,7 +2,6 @@ package record import ( "fmt" - "go.uber.org/zap/zapcore" "io" "net" "os" @@ -10,6 +9,8 @@ import ( "sync" "time" + "go.uber.org/zap/zapcore" + "go.uber.org/zap" . "m7s.live/engine/v4" "m7s.live/engine/v4/codec" @@ -114,7 +115,7 @@ func (r *FLVRecorder) Start(streamPath string) (err error) { } func (r *FLVRecorder) StartWithFileName(streamPath string, fileName string) error { - r.ID = fmt.Sprintf("%s/flv/%s", streamPath, r.GetRecordModeString(r.RecordMode)) + r.ID = fmt.Sprintf("%s/flv/%s", streamPath, fileName) return r.start(r, streamPath, SUBTYPE_FLV) } @@ -296,7 +297,7 @@ func (r *FLVRecorder) OnEvent(event any) { } } -func (r *FLVRecorder) Close() error { +func (r *FLVRecorder) Close() (err error) { if r.File != nil { if !r.append { go func() { @@ -318,7 +319,14 @@ func (r *FLVRecorder) Close() error { go r.writeMetaData(r.File, r.duration) } else { plugin.Info("====into close append true===recordid is===" + r.ID + "====record type is " + r.GetRecordModeString(r.RecordMode)) - return r.File.Close() + err = r.File.Close() + if err != nil { + r.Error("FLV File Close", zap.Error(err)) + } else { + r.Info("FLV File Close", zap.Error(err)) + go r.UploadFile(r.Path, r.filePath) + } + return err } } return nil diff --git a/fmp4.go b/fmp4.go index 80d5db1..f0680b5 100644 --- a/fmp4.go +++ b/fmp4.go @@ -3,6 +3,7 @@ package record import ( "github.com/Eyevinn/mp4ff/aac" "github.com/Eyevinn/mp4ff/mp4" + "go.uber.org/zap" . "m7s.live/engine/v4" "m7s.live/engine/v4/codec" "time" @@ -67,6 +68,7 @@ func (r *FMP4Recorder) UpdateTimeout(timeout time.Duration) { func NewFMP4Recorder() *FMP4Recorder { r := &FMP4Recorder{} r.Record = RecordPluginConfig.Fmp4 + r.Storage = RecordPluginConfig.Storage return r } @@ -80,7 +82,7 @@ func (r *FMP4Recorder) StartWithFileName(streamPath string, fileName string) err return r.start(r, streamPath, SUBTYPE_RAW) } -func (r *FMP4Recorder) Close() error { +func (r *FMP4Recorder) Close() (err error) { if r.File != nil { if r.video.fragment != nil { r.video.fragment.Encode(r.File) @@ -90,7 +92,14 @@ func (r *FMP4Recorder) Close() error { r.audio.fragment.Encode(r.File) r.audio.fragment = nil } - r.File.Close() + err = r.File.Close() + if err != nil { + r.Error("Fmp4 File Close", zap.Error(err)) + } else { + r.Info("Fmp4 File Close", zap.Error(err)) + go r.UploadFile(r.Path, r.filePath) + } + return err } return nil } diff --git a/go.mod b/go.mod index 56fa107..396b1b7 100644 --- a/go.mod +++ b/go.mod @@ -1,13 +1,14 @@ -module m7s.live/plugin/record/v4 +module github.com/eanfs/plugin-record/v4 go 1.19 require ( github.com/Eyevinn/mp4ff v0.40.1 github.com/glebarez/sqlite v1.11.0 - github.com/shirou/gopsutil/v3 v3.23.8 - github.com/yapingcat/gomedia v0.0.0-20230905155010-55b9713fcec1 - go.uber.org/zap v1.26.0 + github.com/minio/minio-go/v7 v7.0.66 + github.com/shirou/gopsutil/v3 v3.24.2 + github.com/yapingcat/gomedia v0.0.0-20240906162731-17feea57090c + go.uber.org/zap v1.27.0 gorm.io/driver/mysql v1.5.7 gorm.io/gorm v1.25.12 m7s.live/engine/v4 v4.15.2 @@ -15,13 +16,13 @@ require ( ) require ( - github.com/abema/go-mp4 v1.1.1 // indirect + github.com/abema/go-mp4 v1.2.0 // indirect github.com/aler9/writerseeker v1.1.0 // indirect github.com/asticode/go-astikit v0.30.0 // indirect github.com/asticode/go-astits v1.13.0 // indirect github.com/bluenviron/gohlslib v1.0.0 // indirect - github.com/bluenviron/gortsplib/v4 v4.6.2 // indirect - github.com/bluenviron/mediacommon v1.5.1 // indirect + github.com/bluenviron/gortsplib/v4 v4.8.0 // indirect + github.com/bluenviron/mediacommon v1.9.2 // indirect github.com/deepch/vdk v0.0.27 // indirect github.com/denisbrodbeck/machineid v1.0.1 // indirect github.com/dustin/go-humanize v1.0.1 // indirect @@ -31,37 +32,47 @@ require ( github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect github.com/golang/mock v1.6.0 // indirect github.com/google/pprof v0.0.0-20230309165930-d61513b1440d // indirect - github.com/google/uuid v1.4.0 // indirect + github.com/google/uuid v1.6.0 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/klauspost/compress v1.17.4 // indirect + github.com/klauspost/cpuid/v2 v2.2.6 // indirect github.com/logrusorgru/aurora/v4 v4.0.0 // indirect github.com/lufia/plan9stats v0.0.0-20230326075908-cb1d2100619a // indirect github.com/mattn/go-isatty v0.0.19 // indirect github.com/mcuadros/go-defaults v1.2.0 // indirect + github.com/minio/md5-simd v1.1.2 // indirect + github.com/minio/sha256-simd v1.0.1 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect github.com/onsi/ginkgo/v2 v2.9.5 // indirect github.com/pion/randutil v0.1.0 // indirect github.com/pion/rtp v1.8.3 // indirect - github.com/pion/webrtc/v3 v3.2.20 // indirect + github.com/pion/webrtc/v3 v3.2.29 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b // indirect github.com/q191201771/naza v0.30.48 // indirect github.com/quangngotan95/go-m3u8 v0.1.0 // indirect github.com/quic-go/qtls-go1-20 v0.3.3 // indirect - github.com/quic-go/quic-go v0.38.1 // indirect + github.com/quic-go/quic-go v0.41.0 // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect + github.com/rs/xid v1.5.0 // indirect github.com/shoenig/go-m1cpu v0.1.6 // indirect - github.com/tklauser/go-sysconf v0.3.12 // indirect + github.com/sirupsen/logrus v1.9.3 // indirect + github.com/tklauser/go-sysconf v0.3.13 // indirect github.com/tklauser/numcpus v0.6.1 // indirect github.com/yusufpapurcu/wmi v1.2.3 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/crypto v0.16.0 // indirect - golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect + golang.org/x/crypto v0.21.0 // indirect + golang.org/x/exp v0.0.0-20240314144324-c7f7c6466f7f // indirect golang.org/x/mod v0.12.0 // indirect - golang.org/x/net v0.19.0 // indirect - golang.org/x/sync v0.3.0 // indirect - golang.org/x/sys v0.15.0 // indirect + golang.org/x/net v0.22.0 // indirect + golang.org/x/sync v0.6.0 // indirect + golang.org/x/sys v0.18.0 // indirect golang.org/x/text v0.14.0 // indirect golang.org/x/tools v0.13.0 // indirect + gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect modernc.org/libc v1.22.5 // indirect modernc.org/mathutil v1.5.0 // indirect diff --git a/go.sum b/go.sum index f3a411a..dcf39ad 100644 --- a/go.sum +++ b/go.sum @@ -68,19 +68,25 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/pprof v0.0.0-20230309165930-d61513b1440d h1:um9/pc7tKMINFfP1eE7Wv6PRGXlcCSJkVajF7KJw3uQ= github.com/google/pprof v0.0.0-20230309165930-d61513b1440d/go.mod h1:79YE0hCXdHag9sBkw2o+N/YnZtTkXi0UT9Nnixa5eYk= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= -github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU= +github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= -github.com/klauspost/cpuid/v2 v2.2.4 h1:acbojRNwl3o09bUq+yDCtZFc1aiwaAAxtcn8YkZXnvk= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4= +github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= +github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= +github.com/klauspost/cpuid/v2 v2.2.6 h1:ndNyv040zDGIDh8thGkXYjnFtiN02M1PVVF+JE/48xc= +github.com/klauspost/cpuid/v2 v2.2.6/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw= @@ -96,8 +102,17 @@ github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APP github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mcuadros/go-defaults v1.2.0 h1:FODb8WSf0uGaY8elWJAkoLL0Ri6AlZ1bFlenk56oZtc= github.com/mcuadros/go-defaults v1.2.0/go.mod h1:WEZtHEVIGYVDqkKSWBdWKUVdRyKlMfulPaGDWIVeCWY= +github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34= +github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM= +github.com/minio/minio-go/v7 v7.0.66 h1:bnTOXOHjOqv/gcMuiVbN9o2ngRItvqE774dG9nq0Dzw= +github.com/minio/minio-go/v7 v7.0.66/go.mod h1:DHAgmyQEGdW3Cif0UooKOyrT3Vxs82zNdV6tkKhRtbs= +github.com/minio/sha256-simd v1.0.1 h1:6kaan5IFmwTNynnKKpDHe6FWHohJOHhCPchzK49dzMM= +github.com/minio/sha256-simd v1.0.1/go.mod h1:Pz6AKMiUdngCLpeTL/RJY1M9rUuPMYujV5xJjtbRSN8= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= @@ -158,6 +173,8 @@ github.com/quic-go/quic-go v0.38.1/go.mod h1:ijnZM7JsFIkp4cRyjxJNIzdSfCLmUMg9wdy github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +github.com/rs/xid v1.5.0 h1:mKX4bl4iPYJtEIxp6CYiUuLQ/8DYMoz0PUdtGgMFRVc= +github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/sclevine/agouti v3.0.0+incompatible/go.mod h1:b4WX9W9L1sfQKXeJf1mUTLZKJ48R1S7H23Ji7oFO5Bw= github.com/shirou/gopsutil/v3 v3.23.8 h1:xnATPiybo6GgdRoC4YoGnxXZFRc3dqQTGi73oLvvBrE= github.com/shirou/gopsutil/v3 v3.23.8/go.mod h1:7hmCaBn+2ZwaZOr6jmPBZDfawwMGuo1id3C6aM8EDqQ= @@ -165,12 +182,16 @@ github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFt github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ= github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU= github.com/shoenig/test v0.6.4/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= @@ -259,6 +280,7 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -319,6 +341,8 @@ gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA= +gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/src-d/go-billy.v4 v4.3.2 h1:0SQA1pRztfTFx2miS8sA97XvooFeNOmvUenF4o0EcVg= gopkg.in/src-d/go-billy.v4 v4.3.2/go.mod h1:nDjArDMp+XMs1aFAESLRjfGSgfvoYN0hDfzEk0GjC98= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= diff --git a/hls.go b/hls.go index f4bd4b9..2106372 100644 --- a/hls.go +++ b/hls.go @@ -47,6 +47,7 @@ func (h *HLSRecorder) UpdateTimeout(timeout time.Duration) { func NewHLSRecorder() (r *HLSRecorder) { r = &HLSRecorder{} r.Record = RecordPluginConfig.Hls + r.Storage = RecordPluginConfig.Storage return r } @@ -69,8 +70,14 @@ func (r *HLSRecorder) Close() (err error) { r.playlist.WriteInf(inf) r.tsStartTime = 0 err = r.File.Close() + if err != nil { + r.Error("HLS File Close", zap.Error(err)) + } else { + r.Info("HLS File Close", zap.Error(err)) + go r.UploadFile(r.Path, r.filePath) + } } - return + return err } func (h *HLSRecorder) OnEvent(event any) { var err error diff --git a/main.go b/main.go index d08ed7f..d0233a4 100644 --- a/main.go +++ b/main.go @@ -4,16 +4,17 @@ import ( _ "embed" "errors" "fmt" - "gorm.io/gorm" "io" - . "m7s.live/engine/v4" - "m7s.live/engine/v4/codec" - "m7s.live/engine/v4/config" - "m7s.live/engine/v4/util" "net" "os" "sync" "time" + + "gorm.io/gorm" + . "m7s.live/engine/v4" + "m7s.live/engine/v4/codec" + "m7s.live/engine/v4/config" + "m7s.live/engine/v4/util" ) type RecordConfig struct { @@ -26,15 +27,16 @@ type RecordConfig struct { Raw Record `desc:"视频裸流录制配置"` RawAudio Record `desc:"音频裸流录制配置"` recordings sync.Map - beforeDuration int `desc:"事件前缓存时长"` - afterDuration int `desc:"事件后缓存时长"` - MysqlDSN string `desc:"mysql数据库连接字符串"` - ExceptionPostUrl string `desc:"第三方异常上报地址"` - SqliteDbPath string `desc:"sqlite数据库路径"` - DiskMaxPercent float64 `desc:"硬盘使用百分之上限值,超过后报警"` - LocalIp string `desc:"本机IP"` - RecordFileExpireDays int `desc:"录像自动删除的天数,0或未设置表示不自动删除"` - RecordPathNotShowStreamPath bool `desc:"录像路径中是否包含streamPath,默认true"` + beforeDuration int `desc:"事件前缓存时长"` + afterDuration int `desc:"事件后缓存时长"` + MysqlDSN string `desc:"mysql数据库连接字符串"` + ExceptionPostUrl string `desc:"第三方异常上报地址"` + SqliteDbPath string `desc:"sqlite数据库路径"` + DiskMaxPercent float64 `desc:"硬盘使用百分之上限值,超过后报警"` + LocalIp string `desc:"本机IP"` + RecordFileExpireDays int `desc:"录像自动删除的天数,0或未设置表示不自动删除"` + RecordPathNotShowStreamPath bool `desc:"录像路径中是否包含streamPath,默认true"` + Storage StorageConfig `desc:"MINIO 配置"` } //go:embed default.yaml @@ -70,7 +72,7 @@ var RecordPluginConfig = &RecordConfig{ afterDuration: 30, MysqlDSN: "", ExceptionPostUrl: "http://www.163.com", - SqliteDbPath: "./sqlite.db", + SqliteDbPath: "./m7sv4.db", DiskMaxPercent: 80.00, LocalIp: getLocalIP(), RecordFileExpireDays: 0, @@ -109,15 +111,15 @@ func (conf *RecordConfig) OnEvent(event any) { var eventRecords []EventRecord expireTime := time.Now().AddDate(0, 0, -conf.RecordFileExpireDays) // 创建包含查询条件的 EventRecord 对象 - queryRecord := EventRecord{ - EventLevel: "1", // 查询条件:event_level = 1 - } - fmt.Printf(" Create Time: %s\n", expireTime.Format("2006-01-02 15:04:05")) - err = db.Where(&queryRecord).Where("create_time < ?", expireTime).Find(&eventRecords).Error + // queryRecord := EventRecord{ + // IsDelete: "0", // 查询条件:is_delete = 1 + // } + fmt.Printf(" 进行录像文件自动删除: 即将删除创建时间小于 %s 的录像文件。\n", expireTime.Format("2006-01-02 15:04:05")) + err = db.Where("create_time < ?", expireTime).Find(&eventRecords).Error if err == nil { if len(eventRecords) > 0 { for _, record := range eventRecords { - fmt.Printf("ID: %d, Create Time: %s,filepath is %s\n", record.Id, record.CreateTime, record.Filepath) + fmt.Printf("执行删除 录像ID: %d, 创建时间: %s, 录像文件: %s\n", record.RecId, record.CreateTime, record.Filepath) err = os.Remove(record.Filepath) if err != nil { fmt.Println("error is " + err.Error()) @@ -135,6 +137,9 @@ func (conf *RecordConfig) OnEvent(event any) { } }() } + //检查录像任务是否存在,不存在则启动 + conf.CheckRecordDB() + conf.Flv.Init() conf.Mp4.Init() conf.Fmp4.Init() diff --git a/mp4.go b/mp4.go index 3df983c..ca10346 100644 --- a/mp4.go +++ b/mp4.go @@ -2,8 +2,6 @@ package record import ( "net" - "os" - "path/filepath" "time" "github.com/yapingcat/gomedia/go-mp4" @@ -43,6 +41,7 @@ func (r *MP4Recorder) UpdateTimeout(timeout time.Duration) { func NewMP4Recorder() *MP4Recorder { r := &MP4Recorder{} r.Record = RecordPluginConfig.Mp4 + r.Storage = RecordPluginConfig.Storage return r } @@ -58,31 +57,24 @@ func (r *MP4Recorder) StartWithFileName(streamPath string, fileName string) erro func (r *MP4Recorder) Close() (err error) { if r.File != nil { - if !isWrifeFrame { - fullPath := filepath.Join(r.Path, "/", r.filePath) - go func(f FileWr) { - err = r.File.Close() - err = os.Remove(fullPath) - if err != nil { - r.Info("未写入帧,文件为空,直接删除,删除结果为=======" + err.Error()) - } - }(r.File) + err = r.Movmuxer.WriteTrailer() + if err != nil { + r.Error("mp4 write trailer", zap.Error(err)) } else { - go func(f FileWr) { - err = r.Movmuxer.WriteTrailer() - if err != nil { - r.Error("mp4 write trailer", zap.Error(err)) - } else { - // _, err = r.file.Write(r.cache.buf) - r.Info("mp4 write trailer", zap.Error(err)) - } - err = f.Close() - }(r.File) + // _, err = r.file.Write(r.cache.buf) + r.Info("mp4 write trailer", zap.Error(err)) + } + err = r.File.Close() + if err != nil { + r.Error("mp4 File Close", zap.Error(err)) + } else { + r.Info("mp4 File Close", zap.Error(err)) + go r.UploadFile(r.Path, r.filePath) } } - isWrifeFrame = false return } + func (r *MP4Recorder) setTracks() { if r.Audio != nil { switch r.Audio.CodecID { diff --git a/raw.go b/raw.go index 885c6d7..685b2cd 100644 --- a/raw.go +++ b/raw.go @@ -36,6 +36,7 @@ func (r *RawRecorder) UpdateTimeout(timeout time.Duration) { func NewRawRecorder() (r *RawRecorder) { r = &RawRecorder{} r.Record = RecordPluginConfig.Raw + r.Storage = RecordPluginConfig.Storage return r } @@ -64,6 +65,12 @@ func (r *RawRecorder) StartWithFileName(streamPath string, fileName string) erro func (r *RawRecorder) Close() (err error) { if r.File != nil { err = r.File.Close() + if err != nil { + r.Error("Raw File Close", zap.Error(err)) + } else { + r.Info("Raw File Close", zap.Error(err)) + go r.UploadFile(r.Path, r.filePath) + } } return } diff --git a/restful.go b/restful.go index 7949f35..7f4c867 100644 --- a/restful.go +++ b/restful.go @@ -53,6 +53,7 @@ func (conf *RecordConfig) API_start(w http.ResponseWriter, r *http.Request) { return } t := query.Get("type") + duration := query.Get("duration") var id string var err error var irecorder IRecorder @@ -94,6 +95,13 @@ func (conf *RecordConfig) API_start(w http.ResponseWriter, r *http.Request) { util.ReturnError(util.APIErrorInternal, err.Error(), w, r) return } + + // 设置录制时长限制 + if duration != "" { + if d, err := time.ParseDuration(duration); err == nil { + recorder.Duration = d + } + } util.ReturnError(util.APIErrorNone, id, w, r) } @@ -108,7 +116,8 @@ func (conf *RecordConfig) API_list_recording(w http.ResponseWriter, r *http.Requ } func (conf *RecordConfig) API_stop(w http.ResponseWriter, r *http.Request) { - if recorder, ok := conf.recordings.Load(r.URL.Query().Get("id")); ok { + id := r.URL.Query().Get("id") + if recorder, ok := conf.recordings.Load(id); ok { recorder.(ISubscriber).Stop(zap.String("reason", "api")) util.ReturnOK(w, r) return @@ -248,12 +257,12 @@ func (conf *RecordConfig) API_list_recording_page(w http.ResponseWriter, r *http query := r.URL.Query() pageSize := query.Get("pageSize") pageNum := query.Get("pageNum") - ID := query.Get("ID") //搜索条件 + RecId := query.Get("id") //搜索条件 var outRecordings []any var totalPageCount int = 1 - if ID != "" { + if RecId != "" { for _, record := range recordings { - if strings.Contains(record.(IRecorder).GetRecorder().ID, ID) { + if record.(IRecorder).GetRecorder().ID == RecId { outRecordings = append(outRecordings, record) } } diff --git a/restful_event.go b/restful_event.go index 0c58bea..7146466 100644 --- a/restful_event.go +++ b/restful_event.go @@ -212,13 +212,15 @@ func (conf *RecordConfig) API_event_start(w http.ResponseWriter, r *http.Request util.ReturnError(-1, errorJsonString(resultJsonData), w, r) return } + //TODO 获取到磁盘容量低,磁盘报错的情况下需要报异常,并且根据事件类型做出处理 - if getDisckException(streamPath) { - resultJsonData["msg"] = "disk is full" - util.ReturnError(-1, errorJsonString(resultJsonData), w, r) - return - } - eventId := eventRecordModel.EventId + // if getDisckException(streamPath) { + // resultJsonData["msg"] = "disk is full" + // util.ReturnError(-1, errorJsonString(resultJsonData), w, r) + // return + // } + + eventId := eventRecordModel.RecId if eventId == "" { resultJsonData["msg"] = "no eventId" util.ReturnError(-1, errorJsonString(resultJsonData), w, r) @@ -292,11 +294,11 @@ func (conf *RecordConfig) API_event_start(w http.ResponseWriter, r *http.Request // 定义 User 结构体作为查询条件 queryRecord := EventRecord{StreamPath: streamPath} db.Where(&queryRecord).Order("id DESC").First(&oldeventRecord) - eventRecord = EventRecord{StreamPath: streamPath, EventId: eventId, RecordMode: "1", EventName: eventName, BeforeDuration: beforeDuration, + eventRecord = EventRecord{StreamPath: streamPath, RecId: eventId, RecordMode: "1", EventName: eventName, BeforeDuration: beforeDuration, AfterDuration: afterDuration, CreateTime: recordTime, StartTime: startTime, EndTime: endTime, Filepath: oldeventRecord.Filepath, Filename: oldeventRecord.Filename, EventDesc: eventRecordModel.EventDesc, Urlpath: oldeventRecord.Urlpath, Type: t} } else { - eventRecord = EventRecord{StreamPath: streamPath, EventId: eventId, RecordMode: "1", EventName: eventName, BeforeDuration: beforeDuration, + eventRecord = EventRecord{StreamPath: streamPath, RecId: eventId, RecordMode: "1", EventName: eventName, BeforeDuration: beforeDuration, AfterDuration: afterDuration, CreateTime: recordTime, StartTime: startTime, EndTime: endTime, Filepath: filepath, Filename: fileName + recorder.Ext, EventDesc: eventRecordModel.EventDesc, Urlpath: urlpath, Type: t} } err = db.Omit("id", "fragment", "isDelete").Create(&eventRecord).Error diff --git a/sqlitedb.go b/sqlitedb.go index 84bbaa5..4e44787 100644 --- a/sqlitedb.go +++ b/sqlitedb.go @@ -1,9 +1,14 @@ package record import ( - "github.com/glebarez/sqlite" - "gorm.io/gorm" + "fmt" "log" + "strings" + "time" + + "github.com/glebarez/sqlite" + "go.uber.org/zap" + "gorm.io/gorm" ) // sqlite数据库初始化,用来存放视频的关键帧等信息 @@ -21,3 +26,44 @@ func initSqliteDB(sqliteDbPath string) *gorm.DB { } return sqlitedb } + +// 保存Recorder到数据库中 +func (r *Recorder) SaveToDB() { + startTime := time.Now().Format("2006-01-02 15:04:05") + endTime := time.Now().Format("2006-01-02 15:04:05") + fileName := r.FileName + if r.FileName == "" { + fileName = strings.ReplaceAll(r.Stream.Path, "/", "-") + "-" + time.Now().Format("2006-01-02-15-04-05") + } + filepath := RecordPluginConfig.Mp4.Path + "/" + r.Stream.Path + "/" + fileName + r.Ext //录像文件存入的完整路径(相对路径) + eventRecord := EventRecord{StreamPath: r.Stream.Path, RecId: r.ID, RecordMode: "0", BeforeDuration: "0", + AfterDuration: fmt.Sprintf("%.0f", r.Fragment.Seconds()), CreateTime: startTime, StartTime: startTime, + EndTime: endTime, Filepath: filepath, Filename: fileName + r.Ext, Urlpath: "record/" + strings.ReplaceAll(r.filePath, "\\", "/"), Fragment: fmt.Sprintf("%.0f", r.Fragment.Seconds()), Type: r.Ext} + err = db.Create(&eventRecord).Error + if err != nil { + r.Error("save to db error", zap.Error(err)) + } else { + r.Info("save to db success", zap.String("filepath", filepath)) + } +} + +// 更新录像文件表中记录,包括录像文件的大小、结束时间以及录像状态 +func (r *Recorder) RemoveRecordById() { + endTime := time.Now().Format("2006-01-02 15:04:05") + fileName := r.FileName + if r.FileName == "" { + fileName = strings.ReplaceAll(r.Stream.Path, "/", "-") + "-" + time.Now().Format("2006-01-02-15-04-05") + } + filepath := RecordPluginConfig.Mp4.Path + "/" + r.Stream.Path + "/" + fileName + r.Ext //录像文件存入的完整路径(相对路径) + + eventRecord := EventRecord{Filepath: filepath, RecordMode: "1", BeforeDuration: "0", + AfterDuration: fmt.Sprintf("%.0f", r.Fragment.Seconds()), + EndTime: endTime, IsDelete: "1"} + + err = db.Where("filepath = ? AND is_delete = ?", filepath, "0").Updates(eventRecord).Error + if err != nil { + r.Error("update to db error", zap.Error(err)) + } else { + r.Info("update to db success", zap.String("filepath", filepath)) + } +} diff --git a/storage.go b/storage.go new file mode 100644 index 0000000..826dd33 --- /dev/null +++ b/storage.go @@ -0,0 +1,94 @@ +package record + +import ( + "context" + "sync" + + "github.com/minio/minio-go/v7" + "github.com/minio/minio-go/v7/pkg/credentials" + "go.uber.org/zap" +) + +var uploadSemaphore = make(chan struct{}, 8) +var once sync.Once + +type StorageConfig struct { + Endpoint string + AccessKey string + SecretKey string + Bucket string + UseSSL bool +} + +func (r *Recorder) UploadFile(filePath string, fileName string) { + // 使用信号量控制并发数 + uploadSemaphore <- struct{}{} + defer func() { <-uploadSemaphore }() + // 判断Storage是否配置,未配置就不上传 + if r.Storage.Endpoint == "" || r.Storage.SecretKey == "" || r.Storage.AccessKey == "" || r.Storage.Bucket == "" { + r.Info("Minio Storage Config Not Configured") + return + } + + ctx := context.Background() + + endpoint := r.Storage.Endpoint + accessKeyID := r.Storage.AccessKey + secretAccessKey := r.Storage.SecretKey + bucketName := r.Storage.Bucket + useSSL := r.Storage.UseSSL + // Initialize minio client object. + minioClient, err := minio.New(endpoint, &minio.Options{ + Creds: credentials.NewStaticV4(accessKeyID, secretAccessKey, ""), + Secure: useSSL, + }) + + if err != nil { + r.Error("create minioClient error:", zap.Error(err)) + } + + // Make a new bucket called testbucket. + location := "us-east-1" + + // 检查Bucket是否存在,不存在就创建Bucket + exists, err := minioClient.BucketExists(ctx, bucketName) + if err != nil { + r.Error("Failed to check bucket existence:", zap.Error(err)) + return + } + + if !exists { + err = minioClient.MakeBucket(ctx, bucketName, minio.MakeBucketOptions{Region: location}) + if err != nil { + r.Error("Create Bucket Error:", zap.Error(err)) + return + } + r.Info("Successfully created Bucket:", zap.String("bucket", bucketName)) + } else { + r.Info("Bucket already exists:", zap.String("bucket", bucketName)) + } + + // Change the value of filePath if the file is in another location + objectName := fileName + fileFullPath := filePath + "/" + objectName + r.Info("Prepare Upload Path: fileName:", zap.String("objectName", objectName)) + contentType := "application/octet-stream" + + // Upload the test file with FPutObject + info, err := minioClient.FPutObject(ctx, bucketName, objectName, fileFullPath, minio.PutObjectOptions{ContentType: contentType}) + if err != nil { + r.Error("Minio PutObject Error:", zap.Error(err)) + } + + r.Info("Successfully uploaded of size ", zap.String("Key", info.Key), zap.Int64("Size", info.Size)) + + r.RemoveRecordById() + + // Remove the file after upload + // 使用定时删除几天前的数据,减少并发录制时写入+删除的磁盘I/O + // err = os.Remove(fileFullPath) + // if err != nil { + // r.Error("Remove file Error:", zap.Error(err)) + // } + // r.Info("Successfully Removed of size ", zap.String("fileFullPath", fileFullPath)) +} diff --git a/subscriber.go b/subscriber.go index 2e86bcd..29ca4ab 100644 --- a/subscriber.go +++ b/subscriber.go @@ -38,6 +38,7 @@ type IRecorder interface { type Recorder struct { Subscriber + Storage StorageConfig SkipTS uint32 Record `json:"-" yaml:"-"` File FileWr `json:"-" yaml:"-"` @@ -67,6 +68,7 @@ func (r *Recorder) CreateFile() (f FileWr, err error) { logFields = append(logFields, zap.Error(err)) r.Error("create file", logFields...) } + r.SaveToDB() return } @@ -121,12 +123,21 @@ func (r *Recorder) cut(absTime uint32) { } } +func (r *Recorder) stopByDuration(absTime uint32) { + if ts := absTime - r.SkipTS; time.Duration(ts)*time.Millisecond >= r.Duration { + r.Info("stop recorder by duration") + r.SkipTS = absTime + r.Stop() + } +} + // func (r *Recorder) Stop(reason ...zap.Field) { // r.Close() // r.Subscriber.Stop(reason...) // } func (r *Recorder) OnEvent(event any) { + // r.Debug("🟡->🟡->🟡 Recorder OnEvent: ", zap.String("event", reflect.TypeOf(event).String())) switch v := event.(type) { case IRecorder: if file, err := r.Spesific.(IRecorder).CreateFile(); err == nil { @@ -156,6 +167,9 @@ func (r *Recorder) OnEvent(event any) { if r.Fragment > 0 && v.IFrame { r.cut(v.AbsTime) } + if r.Duration > 0 && v.IFrame { + r.stopByDuration(v.AbsTime) + } default: r.Subscriber.OnEvent(event) }