1.增加事件录像,事件录像列表接口

2.将下载时间范围内的录像及播放时间范围内的录像参数统一为start和end
3.增加事件录像入数据库功能
This commit is contained in:
pg
2024-08-18 23:28:42 +08:00
parent 699868b602
commit deb8cfedae
8 changed files with 482 additions and 15 deletions

44
entity.go Normal file
View File

@@ -0,0 +1,44 @@
package record
import "time"
// mysql数据库eventrecord表
type EventRecord struct {
Id uint `json:"id" desc:"自增长id" gorm:"primaryKey"`
StreamPath string `json:"streamPath" desc:"流路径" gorm:"column:streamPath"`
EventId string `json:"eventId" desc:"事件编号" gorm:"column:eventId"`
EventType string `json:"eventType" desc:"事件类型" gorm:"column:eventType"`
EventName string `json:"eventName" desc:"事件名称" gorm:"column:eventName"`
BeforeDuration string `json:"beforeDuration" desc:"事件前缓存时长" gorm:"column:beforeDuration"`
AfterDuration string `json:"afterDuration" desc:"事件后缓存时长" gorm:"column:afterDuration"`
RecordTime string `json:"recordTime" desc:"录像时间" gorm:"column:recordTime"`
StartTime string `json:"startTime" desc:"录像开始时间" gorm:"column:startTime"`
EndTime string `json:"endTime" desc:"录像结束时间" gorm:"column:endTime"`
Filepath string `json:"filepath" desc:"录像文件路径" gorm:"column:filepath"`
IsDelete string `json:"isDelete" desc:"是否删除0表示正常1表示删除默认0" gorm:"column:isDelete"`
UserId string `json:"useId" desc:"用户id" gorm:"-;column:useId"`
Filename string `json:"filename" desc:"文件名" gorm:"column:filename"`
Fragment string `json:"fragment" desc:"切片大小" gorm:"-"`
}
// TableName 返回自定义的表名
func (EventRecord) TableName() string {
return "eventrecord"
}
// mysql数据库里Exception 定义异常结构体
type Exception struct {
Timestamp string `json:"Timestamp" gorm:"autoCreateTime"`
AlarmType string `json:"AlarmType"`
AlarmDesc string `json:"AlarmDesc"`
ServerIP string `json:"ServerIP"`
Channel string `json:"Channel"`
}
// sqlite数据库用来存放每个flv文件的关键帧对应的offset及abstime数据
type FLVKeyframe struct {
FLVFileName string `gorm:"not null"`
FrameOffset int64 `gorm:"not null"`
FrameAbstime uint32 `gorm:"not null"`
CreatedAt time.Time `gorm:"autoCreateTime"`
}

44
exception.go Normal file
View File

@@ -0,0 +1,44 @@
package record
import (
"bytes"
"encoding/json"
"fmt"
"github.com/shirou/gopsutil/v3/disk"
"net/http"
"time"
)
// 向第三方发送异常报警
func SendToThirdPartyAPI(exception *Exception) {
exception.Timestamp = time.Now().Format("20060102150405")
exception.ServerIP = RecordPluginConfig.LocalIp
data, err := json.Marshal(exception)
if err != nil {
fmt.Println("Error marshalling exception:", err)
return
}
resp, err := http.Post(RecordPluginConfig.ExceptionPostUrl, "application/json", bytes.NewBuffer(data))
if err != nil {
fmt.Println("Error sending exception to third party API:", err)
return
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
fmt.Println("Failed to send exception, status code:", resp.StatusCode)
} else {
fmt.Println("Exception sent successfully!")
}
}
// 磁盘超上限报警
func getDisckException(streamPath string) bool {
d, _ := disk.Usage("/")
if d.UsedPercent >= RecordPluginConfig.DiskMaxPercent {
exceptionChannel <- &Exception{AlarmType: "disk alarm", AlarmDesc: "disk is full", Channel: streamPath}
return true
}
return false
}

13
flv.go
View File

@@ -5,6 +5,8 @@ import (
"io"
"net"
"os"
"strconv"
"strings"
"time"
"go.uber.org/zap"
@@ -156,6 +158,17 @@ func (r *FLVRecorder) OnEvent(event any) {
v.Seek(0, io.SeekEnd)
}
}
case VideoFrame:
if r.VideoReader.Value.IFrame {
go func() { //将视频关键帧的数据存入sqlite数据库中
var flvKeyfram = &FLVKeyframe{FLVFileName: r.Path + "/" + strings.ReplaceAll(r.filePath, "\\", "/"), FrameOffset: r.Offset, FrameAbstime: r.VideoReader.AbsTime}
sqlitedb.Create(flvKeyfram)
}()
r.Info("这是关键帧且取到了r.filePath是" + r.Path + r.filePath)
r.Info("这是关键帧且取到了r.VideoReader.AbsTime是" + strconv.FormatUint(uint64(r.VideoReader.AbsTime), 10))
r.Info("这是关键帧且取到了r.Offset是" + strconv.Itoa(int(r.Offset)))
r.Info("这是关键帧且取到了r.Offset是" + r.Stream.Path)
}
case FLVFrame:
check := false
var absTime uint32

58
main.go
View File

@@ -4,6 +4,7 @@ import (
_ "embed"
"errors"
"io"
"net"
"sync"
. "m7s.live/engine/v4"
@@ -15,13 +16,20 @@ import (
type RecordConfig struct {
config.Subscribe
config.HTTP
Flv Record `desc:"flv录制配置"`
Mp4 Record `desc:"mp4录制配置"`
Fmp4 Record `desc:"fmp4录制配置"`
Hls Record `desc:"hls录制配置"`
Raw Record `desc:"视频裸流录制配置"`
RawAudio Record `desc:"音频裸流录制配置"`
recordings sync.Map
Flv Record `desc:"flv录制配置"`
Mp4 Record `desc:"mp4录制配置"`
Fmp4 Record `desc:"fmp4录制配置"`
Hls Record `desc:"hls录制配置"`
Raw Record `desc:"视频裸流录制配置"`
RawAudio Record `desc:"音频裸流录制配置"`
recordings sync.Map
beforeDuration int `desc:"事件前缓存时长"`
afterDuration int `desc:"事件后缓存时长"`
MysqlDSN string `desc:"mysql数据库连接字符串"`
ExceptionPostUrl string `desc:"第三方异常上报地址"`
SqliteDbPath string `desc:"sqlite数据库路径"`
DiskMaxPercent float64 `desc:"硬盘使用百分之上限值,超过后报警"`
LocalIp string `desc:"本机IP"`
}
//go:embed default.yaml
@@ -53,13 +61,33 @@ var RecordPluginConfig = &RecordConfig{
Path: "record/raw",
Ext: ".", // 默认aac扩展名为.aac,pcma扩展名为.pcma,pcmu扩展名为.pcmu
},
beforeDuration: 30,
afterDuration: 30,
MysqlDSN: "",
ExceptionPostUrl: "http://www.163.com",
SqliteDbPath: "./sqlite.db",
DiskMaxPercent: 80.00,
LocalIp: getLocalIP(),
}
var plugin = InstallPlugin(RecordPluginConfig, defaultYaml)
var exceptionChannel = make(chan *Exception)
func (conf *RecordConfig) OnEvent(event any) {
switch v := event.(type) {
case FirstConfig, config.Config:
if conf.MysqlDSN == "" {
plugin.Error("mysqlDSN 数据库连接配置为空无法运行请在config.yaml里配置")
}
plugin.Info("mysqlDSN is" + conf.MysqlDSN)
go func() { //处理所有异常,录像中断异常、录像读取异常、录像导出文件中断、磁盘容量低于阈值异常、磁盘异常
for exception := range exceptionChannel {
SendToThirdPartyAPI(exception)
}
}()
initMysqlDB(conf.MysqlDSN)
initSqliteDB(conf.SqliteDbPath)
conf.Flv.Init()
conf.Mp4.Init()
conf.Fmp4.Init()
@@ -122,3 +150,19 @@ func getFLVDuration(file io.ReadSeeker) uint32 {
}
return 0
}
func getLocalIP() string {
addrs, err := net.InterfaceAddrs()
if err != nil {
return ""
}
for _, addr := range addrs {
if ipNet, ok := addr.(*net.IPNet); ok && !ipNet.IP.IsLoopback() {
if ipNet.IP.To4() != nil {
return ipNet.IP.String()
}
}
}
return ""
}

95
mysqldb.go Normal file
View File

@@ -0,0 +1,95 @@
package record
import (
"gorm.io/driver/mysql"
"gorm.io/gorm"
"log"
)
var mysqldb *gorm.DB
var err error
var createDataBaseSql = `CREATE DATABASE IF NOT EXISTS m7srecord;`
var useDataBaseSql = `USE m7srecord;`
var initsql = `CREATE TABLE IF NOT EXISTS eventrecord (
id int(11) NOT NULL AUTO_INCREMENT,
streamPath varchar(255) NOT NULL COMMENT '流路径',
eventId varchar(255) DEFAULT NULL COMMENT '事件编号',
eventType varchar(255) DEFAULT NULL COMMENT '事件类型',
eventName varchar(255) DEFAULT NULL COMMENT '事件名称',
beforeDuration int(255) DEFAULT NULL COMMENT '事件前缓存时长',
afterDuration int(255) DEFAULT NULL COMMENT '事件后缓存时长',
recordTime datetime DEFAULT NULL COMMENT '录像时间',
startTime datetime DEFAULT NULL COMMENT '录像开始时间',
endTime datetime DEFAULT NULL COMMENT '录像结束时间',
filepath varchar(255) DEFAULT NULL COMMENT '录像文件路径',
isDelete varchar(255) DEFAULT '0' COMMENT '是否删除0表示正常1表示删除默认0',
fileName varchar(255) DEFAULT NULL COMMENT '文件名',
userId int(11) DEFAULT NULL COMMENT '用户id',
PRIMARY KEY (id)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;`
func initMysqlDB(MysqlDSN string) {
mysqldb, err = gorm.Open(mysql.Open(MysqlDSN), &gorm.Config{})
if err != nil {
log.Fatal(err)
}
mysqldb.Exec(createDataBaseSql)
mysqldb.Exec(useDataBaseSql)
mysqldb.Exec(initsql)
//mysqldb.AutoMigrate(&EventRecord{})
mysqldb.AutoMigrate(&Exception{})
}
func paginate[T any](model T, pageNum, pageSize int, filters map[string]interface{}) ([]T, int64, error) {
var results []T
var totalCount int64
// 计算偏移量
offset := (pageNum - 1) * pageSize
// 查询总记录数
countQuery := mysqldb.Model(model)
for field, value := range filters {
if valueStr, ok := value.(string); ok && (valueStr != "") {
// countQuery = countQuery.Where(field+" LIKE ?", "%"+valueStr+"%")
//} else {
if field == "startTime" {
countQuery = countQuery.Where("recordTime >= ?", valueStr)
} else if field == "endTime" {
countQuery = countQuery.Where("recordTime <= ?", valueStr)
} else {
countQuery = countQuery.Where(field+" = ?", value)
}
}
}
result := countQuery.Count(&totalCount)
if result.Error != nil {
return nil, 0, result.Error
}
// 查询当前页的数据
query := mysqldb.Model(model).Limit(pageSize).Offset(offset)
for field, value := range filters {
if valueStr, ok := value.(string); ok && (valueStr != "") {
// query = query.Where(field+" LIKE ?", "%"+valueStr+"%")
//} else {
if field == "startTime" {
query = query.Where("recordTime >= ?", valueStr)
} else if field == "endTime" {
query = query.Where("recordTime <= ?", valueStr)
} else {
query = query.Where(field+" = ?", value)
}
}
}
result = query.Find(&results)
if result.Error != nil {
return nil, 0, result.Error
}
return results, totalCount, nil
}

195
restful_event.go Normal file
View File

@@ -0,0 +1,195 @@
package record
import (
"encoding/json"
"go.uber.org/zap"
"io"
"net/http"
"strconv"
"time"
"m7s.live/engine/v4/util"
)
func errorJsonString(args map[string]interface{}) string {
resultJsonData := make(map[string]interface{})
for field, value := range args {
resultJsonData[field] = value
}
jsonString, _ := json.Marshal(resultJsonData)
return string(jsonString)
}
// 事件录像
func (conf *RecordConfig) API_event_list(w http.ResponseWriter, r *http.Request) {
token := r.Header.Get("token")
resultJsonData := make(map[string]interface{})
resultJsonData["code"] = -1
if token == "" || token != "m7s" {
resultJsonData["msg"] = "token错误"
util.ReturnError(-1, errorJsonString(resultJsonData), w, r)
return
}
//TODO 用token验证用户信息是否有效并获取用户信息换取userid
var postData map[string]interface{}
body, err := io.ReadAll(r.Body)
if err != nil {
resultJsonData["msg"] = "Unable to read request body "
util.ReturnError(-1, errorJsonString(resultJsonData), w, r)
return
}
err = json.Unmarshal(body, &postData)
pageNum := postData["pageNum"].(float64)
if pageNum <= 0 {
resultJsonData["msg"] = "pageNum error"
util.ReturnError(-1, errorJsonString(resultJsonData), w, r)
return
}
pageSize := postData["pageSize"].(float64)
if pageSize <= 0 {
resultJsonData["msg"] = "pageSize error"
util.ReturnError(-1, errorJsonString(resultJsonData), w, r)
return
}
eventRecords, totalCount, err := paginate(EventRecord{}, int(pageNum), int(pageSize), postData)
if err != nil {
resultJsonData["msg"] = err.Error()
util.ReturnError(-1, errorJsonString(resultJsonData), w, r)
return
}
resultJsonData["totalCount"] = totalCount
resultJsonData["pageNum"] = pageNum
resultJsonData["pageSize"] = pageSize
resultJsonData["list"] = eventRecords
resultJsonData["code"] = 0
resultJsonData["msg"] = ""
util.ReturnError(util.APIErrorNone, errorJsonString(resultJsonData), w, r)
}
// 事件录像
func (conf *RecordConfig) API_event_start(w http.ResponseWriter, r *http.Request) {
token := r.Header.Get("token")
resultJsonData := make(map[string]interface{})
resultJsonData["code"] = -1
if token == "" || token != "m7s" {
resultJsonData["msg"] = "token错误"
util.ReturnError(-1, errorJsonString(resultJsonData), w, r)
return
}
//TODO 用token验证用户信息是否有效并获取用户信息换取userid
var eventRecordMode EventRecord
// 读取请求体
body, err := io.ReadAll(r.Body)
if err != nil {
resultJsonData["msg"] = "Unable to read request body "
util.ReturnError(-1, errorJsonString(resultJsonData), w, r)
return
}
// 解析JSON数据到map
err = json.Unmarshal(body, &eventRecordMode)
if err != nil {
resultJsonData["msg"] = "Invalid JSON format "
util.ReturnError(-1, errorJsonString(resultJsonData), w, r)
return
}
streamPath := eventRecordMode.StreamPath
if streamPath == "" {
resultJsonData["msg"] = "no streamPath"
util.ReturnError(-1, errorJsonString(resultJsonData), w, r)
return
}
//TODO 获取到磁盘容量低,磁盘报错的情况下需要报异常,并且根据事件类型做出处理
if getDisckException(streamPath) {
resultJsonData["msg"] = "disk is full"
util.ReturnError(-1, errorJsonString(resultJsonData), w, r)
return
}
//var streamExist = false
//conf.recordings.Range(func(key, value any) bool {
// existStreamPath := value.(IRecorder).GetSubscriber().Stream.Path
// if existStreamPath == streamPath {
// resultJsonData["msg"] = "streamPath is exist"
// util.ReturnError(-1, errorJsonString(resultJsonData), w, r)
// streamExist = true
// return !streamExist
// }
// return !streamExist
//})
//if streamExist {
// return
//}
eventId := eventRecordMode.EventId
if eventId == "" {
resultJsonData["msg"] = "no eventId"
util.ReturnError(-1, errorJsonString(resultJsonData), w, r)
return
}
eventType := eventRecordMode.EventType
if eventType == "" {
resultJsonData["msg"] = "no eventType"
util.ReturnError(-1, errorJsonString(resultJsonData), w, r)
return
}
eventName := eventRecordMode.EventName
if eventName == "" {
resultJsonData["msg"] = "no eventName"
util.ReturnError(-1, errorJsonString(resultJsonData), w, r)
return
}
beforeDuration := eventRecordMode.BeforeDuration
if beforeDuration == "" {
beforeDuration = strconv.Itoa(conf.beforeDuration)
}
afterDuration := eventRecordMode.AfterDuration
if afterDuration == "" {
afterDuration = strconv.Itoa(conf.afterDuration)
}
recordTime := time.Now().Format("2006-01-02 15:04:05")
fileName := time.Now().Format("20060102150405")
startTime := time.Now().Add(-30 * time.Second).Format("2006-01-02 15:04:05")
endTime := time.Now().Add(30 * time.Second).Format("2006-01-02 15:04:05")
//切片大小
fragment := eventRecordMode.Fragment
//var id string
irecorder := NewFLVRecorder()
recorder := irecorder.GetRecorder()
recorder.FileName = fileName
recorder.append = false
filepath := conf.Flv.Path + "/" + streamPath + "/" + fileName + recorder.Ext
if fragment != "" {
if f, err := time.ParseDuration(fragment); err == nil {
recorder.Fragment = f
}
}
err = irecorder.StartWithFileName(streamPath, fileName)
go func() {
timer := time.NewTimer(30 * time.Second)
// 等待计时器到期
<-timer.C
recorder.Stop(zap.String("reason", "api"))
}()
//id = recorder.ID
if err != nil {
exceptionChannel <- &Exception{AlarmType: "record", AlarmDesc: "录像失败", Channel: streamPath}
resultJsonData["msg"] = err.Error()
util.ReturnError(-1, errorJsonString(resultJsonData), w, r)
return
}
eventRecordMode.Filepath = filepath
eventRecordMode.Filename = fileName + recorder.Ext
eventRecord := EventRecord{StreamPath: streamPath, EventId: eventId, EventType: eventType, EventName: eventName, BeforeDuration: beforeDuration,
AfterDuration: afterDuration, RecordTime: recordTime, StartTime: startTime, EndTime: endTime, Filepath: filepath, Filename: fileName + recorder.Ext}
err = mysqldb.Omit("id", "fragment", "isDelete").Create(&eventRecord).Error
if err != nil {
resultJsonData["msg"] = err.Error()
util.ReturnError(-1, errorJsonString(resultJsonData), w, r)
return
}
outid := eventRecord.Id
resultJsonData["id"] = outid
resultJsonData["code"] = 0
resultJsonData["msg"] = ""
util.ReturnError(util.APIErrorNone, errorJsonString(resultJsonData), w, r)
}

22
sqlitedb.go Normal file
View File

@@ -0,0 +1,22 @@
package record
import (
"github.com/glebarez/sqlite"
"gorm.io/gorm"
"log"
)
var sqlitedb *gorm.DB
// sqlite数据库初始化用来存放视频的关键帧等信息
func initSqliteDB(sqliteDbPath string) {
// 打开数据库连接
sqlitedb, err = gorm.Open(sqlite.Open(sqliteDbPath), &gorm.Config{})
if err != nil {
log.Fatal(err)
}
err = sqlitedb.AutoMigrate(&FLVKeyframe{})
if err != nil {
log.Fatal(err)
}
}

26
vod.go
View File

@@ -50,12 +50,15 @@ func (conf *RecordConfig) Play_flv_(w http.ResponseWriter, r *http.Request) {
singleFile := filepath.Join(conf.Flv.Path, streamPath+".flv")
query := r.URL.Query()
startTimeStr := query.Get("start")
s, err := strconv.Atoi(startTimeStr)
endTimeStr := query.Get("end")
//s, err := strconv.Atoi(startTimeStr)
startTime, err := time.ParseInLocation("20060102150405", startTimeStr, time.Local)
endTime, err := time.ParseInLocation("20060102150405", endTimeStr, time.Local)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
startTime := time.UnixMilli(int64(s))
//startTime := time.UnixMilli(int64(s))
speedStr := query.Get("speed")
speed, err := strconv.ParseFloat(speedStr, 64)
if err != nil {
@@ -97,6 +100,9 @@ func (conf *RecordConfig) Play_flv_(w http.ResponseWriter, r *http.Request) {
return nil
}
}
if modTime.After(endTime) {
return nil
}
fileList = append(fileList, info)
return nil
})
@@ -220,19 +226,23 @@ func (conf *RecordConfig) Download_flv_(w http.ResponseWriter, r *http.Request)
streamPath := strings.TrimSuffix(strings.TrimPrefix(r.URL.Path, "/download/flv/"), ".flv")
singleFile := filepath.Join(conf.Flv.Path, streamPath+".flv")
query := r.URL.Query()
rangeStr := strings.Split(query.Get("range"), "-")
s, err := strconv.Atoi(rangeStr[0])
//rangeStr := strings.Split(query.Get("range"), "-")
//s, err := strconv.Atoi(rangeStr[0])
startTimeStr := query.Get("start")
endTimeStr := query.Get("end")
startTime, err := time.ParseInLocation("20060102150405", startTimeStr, time.Local)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
startTime := time.UnixMilli(int64(s))
e, err := strconv.Atoi(rangeStr[1])
//startTime := time.UnixMilli(int64(s))
//e, err := strconv.Atoi(rangeStr[1])
endTime, err := time.ParseInLocation("20060102150405", endTimeStr, time.Local)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
endTime := time.UnixMilli(int64(e))
//endTime := time.UnixMilli(int64(e))
timeRange := endTime.Sub(startTime)
plugin.Info("download", zap.String("stream", streamPath), zap.Time("start", startTime), zap.Time("end", endTime))
dir := filepath.Join(conf.Flv.Path, streamPath)
@@ -305,7 +315,7 @@ func (conf *RecordConfig) Download_flv_(w http.ResponseWriter, r *http.Request)
offsetDelta := amf.Len() + 15
offset := offsetDelta + len(flvHead)
contentLength += uint64(offset)
metaData["duration"] = timeRange.Seconds()
metaData["duration"] = times[len(times)-1]
metaData["filesize"] = contentLength
for i := range filepositions {
filepositions[i] += uint64(offset)