1.优化gorm调用

2.增加HTTP向第三方接口报警的功能
3.根据事件录像id将对应的录像文件拉取为HDL流
This commit is contained in:
pg
2024-09-14 17:17:06 +08:00
parent deb8cfedae
commit d31d4a34ad
5 changed files with 216 additions and 80 deletions

View File

@@ -4,35 +4,37 @@ import "time"
// mysql数据库eventrecord表
type EventRecord struct {
Id uint `json:"id" desc:"自增长id" gorm:"primaryKey"`
StreamPath string `json:"streamPath" desc:"流路径" gorm:"column:streamPath"`
EventId string `json:"eventId" desc:"事件编号" gorm:"column:eventId"`
EventType string `json:"eventType" desc:"事件类型" gorm:"column:eventType"`
EventName string `json:"eventName" desc:"事件名称" gorm:"column:eventName"`
BeforeDuration string `json:"beforeDuration" desc:"事件前缓存时长" gorm:"column:beforeDuration"`
AfterDuration string `json:"afterDuration" desc:"事件后缓存时长" gorm:"column:afterDuration"`
RecordTime string `json:"recordTime" desc:"录像时间" gorm:"column:recordTime"`
StartTime string `json:"startTime" desc:"录像开始时间" gorm:"column:startTime"`
EndTime string `json:"endTime" desc:"录像结束时间" gorm:"column:endTime"`
Filepath string `json:"filepath" desc:"录像文件路径" gorm:"column:filepath"`
IsDelete string `json:"isDelete" desc:"是否删除0表示正常1表示删除默认0" gorm:"column:isDelete"`
UserId string `json:"useId" desc:"用户id" gorm:"-;column:useId"`
Filename string `json:"filename" desc:"文件名" gorm:"column:filename"`
Fragment string `json:"fragment" desc:"切片大小" gorm:"-"`
Id uint `json:"id" desc:"自增长id" gorm:"primaryKey;autoIncrement"`
StreamPath string `json:"streamPath" desc:"流路径" gorm:"type:varchar(255)"`
EventId string `json:"eventId" desc:"事件编号" gorm:"type:varchar(255)"`
RecordMode string `json:"recordMode" desc:"事件类型,0=连续录像模式1=事件录像模式" gorm:"type:varchar(255)"`
EventName string `json:"eventName" desc:"事件名称" gorm:"type:varchar(255)"`
BeforeDuration string `json:"beforeDuration" desc:"事件前缓存时长" gorm:"type:varchar(255);"`
AfterDuration string `json:"afterDuration" desc:"事件后缓存时长" gorm:"type:varchar(255)"`
CreateTime string `json:"createTime" desc:"录像时间" gorm:"type:varchar(255)"`
StartTime string `json:"startTime" desc:"录像开始时间" gorm:"type:varchar(255)"`
EndTime string `json:"endTime" desc:"录像结束时间" gorm:"type:varchar(255)"`
Filepath string `json:"filePath" desc:"录像文件物理路径" gorm:"type:varchar(255)"`
Urlpath string `json:"urlPath" desc:"录像文件下载URL路径" gorm:"type:varchar(255)"`
IsDelete string `json:"isDelete" desc:"是否删除0表示正常1表示删除默认0" gorm:"type:varchar(255);default:'0'"`
UserId string `json:"useId" desc:"用户id" gorm:"type:varchar(255)"`
Filename string `json:"fileName" desc:"文件名" gorm:"type:varchar(255)"`
Fragment string `json:"fragment" desc:"切片大小" gorm:"type:varchar(255)"`
EventDesc string `json:"eventDesc" desc:"事件描述" gorm:"type:varchar(255)"`
}
// TableName 返回自定义的表名
func (EventRecord) TableName() string {
return "eventrecord"
}
//// TableName 返回自定义的表名
//func (EventRecord) TableName() string {
// return "eventrecord"
//}
// mysql数据库里Exception 定义异常结构体
type Exception struct {
Timestamp string `json:"Timestamp" gorm:"autoCreateTime"`
AlarmType string `json:"AlarmType"`
AlarmDesc string `json:"AlarmDesc"`
ServerIP string `json:"ServerIP"`
Channel string `json:"Channel"`
CreateTime string `json:"createTime" gorm:"type:varchar(50)"`
AlarmType string `json:"alarmType" gorm:"type:varchar(50)"`
AlarmDesc string `json:"alarmDesc" gorm:"type:varchar(50)"`
ServerIP string `json:"serverIP" gorm:"type:varchar(50)"`
StreamPath string `json:"streamPath" gorm:"type:varchar(50)"`
}
// sqlite数据库用来存放每个flv文件的关键帧对应的offset及abstime数据

View File

@@ -11,14 +11,18 @@ import (
// 向第三方发送异常报警
func SendToThirdPartyAPI(exception *Exception) {
exception.Timestamp = time.Now().Format("20060102150405")
exception.CreateTime = time.Now().Format("2006-01-02 15:04:05")
exception.ServerIP = RecordPluginConfig.LocalIp
data, err := json.Marshal(exception)
if err != nil {
fmt.Println("Error marshalling exception:", err)
return
}
err = mysqldb.Create(&exception).Error
if err != nil {
fmt.Println("异常数据插入数据库失败:", err)
return
}
resp, err := http.Post(RecordPluginConfig.ExceptionPostUrl, "application/json", bytes.NewBuffer(data))
if err != nil {
fmt.Println("Error sending exception to third party API:", err)
@@ -37,7 +41,7 @@ func SendToThirdPartyAPI(exception *Exception) {
func getDisckException(streamPath string) bool {
d, _ := disk.Usage("/")
if d.UsedPercent >= RecordPluginConfig.DiskMaxPercent {
exceptionChannel <- &Exception{AlarmType: "disk alarm", AlarmDesc: "disk is full", Channel: streamPath}
exceptionChannel <- &Exception{AlarmType: "disk alarm", AlarmDesc: "disk is full", StreamPath: streamPath}
return true
}
return false

1
httputil.go Normal file
View File

@@ -0,0 +1 @@
package record

View File

@@ -1,9 +1,11 @@
package record
import (
"errors"
"gorm.io/driver/mysql"
"gorm.io/gorm"
"log"
"reflect"
)
var mysqldb *gorm.DB
@@ -13,24 +15,6 @@ var createDataBaseSql = `CREATE DATABASE IF NOT EXISTS m7srecord;`
var useDataBaseSql = `USE m7srecord;`
var initsql = `CREATE TABLE IF NOT EXISTS eventrecord (
id int(11) NOT NULL AUTO_INCREMENT,
streamPath varchar(255) NOT NULL COMMENT '流路径',
eventId varchar(255) DEFAULT NULL COMMENT '事件编号',
eventType varchar(255) DEFAULT NULL COMMENT '事件类型',
eventName varchar(255) DEFAULT NULL COMMENT '事件名称',
beforeDuration int(255) DEFAULT NULL COMMENT '事件前缓存时长',
afterDuration int(255) DEFAULT NULL COMMENT '事件后缓存时长',
recordTime datetime DEFAULT NULL COMMENT '录像时间',
startTime datetime DEFAULT NULL COMMENT '录像开始时间',
endTime datetime DEFAULT NULL COMMENT '录像结束时间',
filepath varchar(255) DEFAULT NULL COMMENT '录像文件路径',
isDelete varchar(255) DEFAULT '0' COMMENT '是否删除0表示正常1表示删除默认0',
fileName varchar(255) DEFAULT NULL COMMENT '文件名',
userId int(11) DEFAULT NULL COMMENT '用户id',
PRIMARY KEY (id)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;`
func initMysqlDB(MysqlDSN string) {
mysqldb, err = gorm.Open(mysql.Open(MysqlDSN), &gorm.Config{})
if err != nil {
@@ -38,8 +22,7 @@ func initMysqlDB(MysqlDSN string) {
}
mysqldb.Exec(createDataBaseSql)
mysqldb.Exec(useDataBaseSql)
mysqldb.Exec(initsql)
//mysqldb.AutoMigrate(&EventRecord{})
mysqldb.AutoMigrate(&EventRecord{})
mysqldb.AutoMigrate(&Exception{})
}
@@ -52,19 +35,35 @@ func paginate[T any](model T, pageNum, pageSize int, filters map[string]interfac
// 查询总记录数
countQuery := mysqldb.Model(model)
// 使用反射设置字段值
modelValue := reflect.ValueOf(&model).Elem() // 获取结构体值
modelType := modelValue.Type()
for field, value := range filters {
if valueStr, ok := value.(string); ok && (valueStr != "") {
// countQuery = countQuery.Where(field+" LIKE ?", "%"+valueStr+"%")
//} else {
if valueStr, ok := value.(string); ok && valueStr != "" {
if field == "startTime" {
countQuery = countQuery.Where("recordTime >= ?", valueStr)
countQuery = countQuery.Where("create_time >= ?", valueStr)
} else if field == "endTime" {
countQuery = countQuery.Where("recordTime <= ?", valueStr)
countQuery = countQuery.Where("create_time <= ?", valueStr)
} else {
countQuery = countQuery.Where(field+" = ?", value)
// 使用反射查找字段并设置值
fieldName, err := findFieldByName(modelType, field)
if err != nil {
return nil, 0, err
}
// 设置字段值
if modelField := modelValue.FieldByName(fieldName); modelField.IsValid() && modelField.CanSet() {
modelField.Set(reflect.ValueOf(valueStr))
countQuery = countQuery.Where(&model)
} else {
return nil, 0, errors.New("invalid field: " + field)
}
}
}
}
result := countQuery.Count(&totalCount)
if result.Error != nil {
return nil, 0, result.Error
@@ -72,16 +71,26 @@ func paginate[T any](model T, pageNum, pageSize int, filters map[string]interfac
// 查询当前页的数据
query := mysqldb.Model(model).Limit(pageSize).Offset(offset)
for field, value := range filters {
if valueStr, ok := value.(string); ok && (valueStr != "") {
// query = query.Where(field+" LIKE ?", "%"+valueStr+"%")
//} else {
if valueStr, ok := value.(string); ok && valueStr != "" {
if field == "startTime" {
query = query.Where("recordTime >= ?", valueStr)
query = query.Where("create_time >= ?", valueStr)
} else if field == "endTime" {
query = query.Where("recordTime <= ?", valueStr)
query = query.Where("create_time <= ?", valueStr)
} else {
query = query.Where(field+" = ?", value)
// 使用反射设置查询字段值
fieldName, err := findFieldByName(modelType, field)
if err != nil {
return nil, 0, err
}
if modelField := modelValue.FieldByName(fieldName); modelField.IsValid() && modelField.CanSet() {
modelField.Set(reflect.ValueOf(valueStr))
query = query.Where(&model)
} else {
return nil, 0, errors.New("invalid field: " + field)
}
}
}
}
@@ -93,3 +102,14 @@ func paginate[T any](model T, pageNum, pageSize int, filters map[string]interfac
return results, totalCount, nil
}
// findFieldByName 查找结构体中的字段名
func findFieldByName(modelType reflect.Type, field string) (string, error) {
for i := 0; i < modelType.NumField(); i++ {
structField := modelType.Field(i)
if structField.Tag.Get("json") == field || structField.Name == field {
return structField.Name, nil
}
}
return "", errors.New("field not found: " + field)
}

View File

@@ -2,10 +2,14 @@ package record
import (
"encoding/json"
"fmt"
"go.uber.org/zap"
"io"
"io/ioutil"
"log"
"net/http"
"strconv"
"strings"
"time"
"m7s.live/engine/v4/util"
@@ -20,7 +24,113 @@ func errorJsonString(args map[string]interface{}) string {
return string(jsonString)
}
// 事件录像
// 根据事件id拉取录像
func (conf *RecordConfig) API_event_pull(w http.ResponseWriter, r *http.Request) {
token := r.Header.Get("token")
resultJsonData := make(map[string]interface{})
resultJsonData["code"] = -1
if token == "" || token != "m7s" {
resultJsonData["msg"] = "token错误"
util.ReturnError(-1, errorJsonString(resultJsonData), w, r)
return
}
var postData map[string]interface{}
body, err := io.ReadAll(r.Body)
if err != nil {
resultJsonData["msg"] = "Unable to read request body "
util.ReturnError(-1, errorJsonString(resultJsonData), w, r)
return
}
err = json.Unmarshal(body, &postData)
id := int(postData["id"].(float64))
if id > 0 {
var eventRecord EventRecord
result := mysqldb.First(&eventRecord, id) // 根据主键查询
if result.Error != nil {
log.Println("Error finding eventrecord:", result.Error)
}
fileUrlPath := eventRecord.Urlpath
if fileUrlPath != "" {
// 判断协议类型
scheme := "http"
if r.TLS != nil {
scheme = "https" // 如果 r.TLS 不为空,则表示是 HTTPS
}
// 获取完整请求的地址:包含协议、主机和端口
requestHost := fmt.Sprintf("%s://%s", scheme, r.Host)
fileUrlPath = requestHost + "/" + fileUrlPath
newStreamPath := eventRecord.StreamPath + "/" + strings.TrimSuffix(eventRecord.Filename, ".flv")
requestFullPath := fmt.Sprintf(requestHost+"/hdl/api/pull?streamPath=%s&target=%s", newStreamPath, fileUrlPath)
resp, err := http.Get(requestFullPath)
if err != nil {
log.Fatal("Error while making GET request:", err)
}
defer resp.Body.Close() // 确保函数结束后关闭响应体
// 读取响应体
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal("Error reading response body:", err)
}
// 打印响应内容
fmt.Println("Response Status:", resp.Status)
fmt.Println("Response Body:", string(body))
resultJsonData["streamPath"] = newStreamPath
resultJsonData["code"] = 0
resultJsonData["msg"] = ""
util.ReturnError(util.APIErrorNone, errorJsonString(resultJsonData), w, r)
}
}
}
// 录像报警列表
func (conf *RecordConfig) API_alarm_list(w http.ResponseWriter, r *http.Request) {
token := r.Header.Get("token")
resultJsonData := make(map[string]interface{})
resultJsonData["code"] = -1
if token == "" || token != "m7s" {
resultJsonData["msg"] = "token错误"
util.ReturnError(-1, errorJsonString(resultJsonData), w, r)
return
}
//TODO 用token验证用户信息是否有效并获取用户信息换取userid
var postData map[string]interface{}
body, err := io.ReadAll(r.Body)
if err != nil {
resultJsonData["msg"] = "Unable to read request body "
util.ReturnError(-1, errorJsonString(resultJsonData), w, r)
return
}
err = json.Unmarshal(body, &postData)
pageNum := postData["pageNum"].(float64)
if pageNum <= 0 {
resultJsonData["msg"] = "pageNum error"
util.ReturnError(-1, errorJsonString(resultJsonData), w, r)
return
}
pageSize := postData["pageSize"].(float64)
if pageSize <= 0 {
resultJsonData["msg"] = "pageSize error"
util.ReturnError(-1, errorJsonString(resultJsonData), w, r)
return
}
exceptions, totalCount, err := paginate(Exception{}, int(pageNum), int(pageSize), postData)
if err != nil {
resultJsonData["msg"] = err.Error()
util.ReturnError(-1, errorJsonString(resultJsonData), w, r)
return
}
resultJsonData["totalCount"] = totalCount
resultJsonData["pageNum"] = pageNum
resultJsonData["pageSize"] = pageSize
resultJsonData["list"] = exceptions
resultJsonData["code"] = 0
resultJsonData["msg"] = ""
util.ReturnError(util.APIErrorNone, errorJsonString(resultJsonData), w, r)
}
// 事件录像列表
func (conf *RecordConfig) API_event_list(w http.ResponseWriter, r *http.Request) {
token := r.Header.Get("token")
resultJsonData := make(map[string]interface{})
@@ -78,7 +188,7 @@ func (conf *RecordConfig) API_event_start(w http.ResponseWriter, r *http.Request
return
}
//TODO 用token验证用户信息是否有效并获取用户信息换取userid
var eventRecordMode EventRecord
var eventRecordModel EventRecord
// 读取请求体
body, err := io.ReadAll(r.Body)
if err != nil {
@@ -87,13 +197,13 @@ func (conf *RecordConfig) API_event_start(w http.ResponseWriter, r *http.Request
return
}
// 解析JSON数据到map
err = json.Unmarshal(body, &eventRecordMode)
err = json.Unmarshal(body, &eventRecordModel)
if err != nil {
resultJsonData["msg"] = "Invalid JSON format "
util.ReturnError(-1, errorJsonString(resultJsonData), w, r)
return
}
streamPath := eventRecordMode.StreamPath
streamPath := eventRecordModel.StreamPath
if streamPath == "" {
resultJsonData["msg"] = "no streamPath"
util.ReturnError(-1, errorJsonString(resultJsonData), w, r)
@@ -119,29 +229,29 @@ func (conf *RecordConfig) API_event_start(w http.ResponseWriter, r *http.Request
//if streamExist {
// return
//}
eventId := eventRecordMode.EventId
eventId := eventRecordModel.EventId
if eventId == "" {
resultJsonData["msg"] = "no eventId"
util.ReturnError(-1, errorJsonString(resultJsonData), w, r)
return
}
eventType := eventRecordMode.EventType
if eventType == "" {
resultJsonData["msg"] = "no eventType"
util.ReturnError(-1, errorJsonString(resultJsonData), w, r)
return
}
eventName := eventRecordMode.EventName
//recordMode := eventRecordModel.RecordMode
//if recordMode == "" {
// resultJsonData["msg"] = "no recordMode"
// util.ReturnError(-1, errorJsonString(resultJsonData), w, r)
// return
//}
eventName := eventRecordModel.EventName
if eventName == "" {
resultJsonData["msg"] = "no eventName"
util.ReturnError(-1, errorJsonString(resultJsonData), w, r)
return
}
beforeDuration := eventRecordMode.BeforeDuration
beforeDuration := eventRecordModel.BeforeDuration
if beforeDuration == "" {
beforeDuration = strconv.Itoa(conf.beforeDuration)
}
afterDuration := eventRecordMode.AfterDuration
afterDuration := eventRecordModel.AfterDuration
if afterDuration == "" {
afterDuration = strconv.Itoa(conf.afterDuration)
}
@@ -150,13 +260,14 @@ func (conf *RecordConfig) API_event_start(w http.ResponseWriter, r *http.Request
startTime := time.Now().Add(-30 * time.Second).Format("2006-01-02 15:04:05")
endTime := time.Now().Add(30 * time.Second).Format("2006-01-02 15:04:05")
//切片大小
fragment := eventRecordMode.Fragment
fragment := eventRecordModel.Fragment
//var id string
irecorder := NewFLVRecorder()
recorder := irecorder.GetRecorder()
recorder.FileName = fileName
recorder.append = false
filepath := conf.Flv.Path + "/" + streamPath + "/" + fileName + recorder.Ext
urlpath := "record/" + streamPath + "/" + fileName + recorder.Ext
if fragment != "" {
if f, err := time.ParseDuration(fragment); err == nil {
recorder.Fragment = f
@@ -168,19 +279,17 @@ func (conf *RecordConfig) API_event_start(w http.ResponseWriter, r *http.Request
// 等待计时器到期
<-timer.C
recorder.Stop(zap.String("reason", "api"))
irecorder.Stop(zap.String("reason", "api"))
}()
//id = recorder.ID
if err != nil {
exceptionChannel <- &Exception{AlarmType: "record", AlarmDesc: "录像失败", Channel: streamPath}
exceptionChannel <- &Exception{AlarmType: "record", AlarmDesc: "录像失败", StreamPath: streamPath}
resultJsonData["msg"] = err.Error()
util.ReturnError(-1, errorJsonString(resultJsonData), w, r)
return
}
eventRecordMode.Filepath = filepath
eventRecordMode.Filename = fileName + recorder.Ext
eventRecord := EventRecord{StreamPath: streamPath, EventId: eventId, EventType: eventType, EventName: eventName, BeforeDuration: beforeDuration,
AfterDuration: afterDuration, RecordTime: recordTime, StartTime: startTime, EndTime: endTime, Filepath: filepath, Filename: fileName + recorder.Ext}
eventRecord := EventRecord{StreamPath: streamPath, EventId: eventId, RecordMode: "1", EventName: eventName, BeforeDuration: beforeDuration,
AfterDuration: afterDuration, CreateTime: recordTime, StartTime: startTime, EndTime: endTime, Filepath: filepath, Filename: fileName + recorder.Ext, EventDesc: eventRecordModel.EventDesc, Urlpath: urlpath}
err = mysqldb.Omit("id", "fragment", "isDelete").Create(&eventRecord).Error
if err != nil {
resultJsonData["msg"] = err.Error()