mirror of
https://github.com/langhuihui/monibuca.git
synced 2025-12-24 13:48:04 +08:00
Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
db265e0ef0 | ||
|
|
b6ee2843b0 | ||
|
|
1a8e2bc816 | ||
|
|
bc0c761aa8 | ||
|
|
cabd0e3088 |
@@ -143,10 +143,10 @@ func (s *Server) api_Config_YAML_All(rw http.ResponseWriter, r *http.Request) {
|
||||
|
||||
// 3. Process plugin configs.
|
||||
for _, meta := range plugins {
|
||||
if filterName != "" && meta.Name != filterName {
|
||||
if filterName != "" && !strings.EqualFold(meta.Name, filterName) {
|
||||
continue
|
||||
}
|
||||
|
||||
name := strings.ToLower(meta.Name)
|
||||
configType := meta.Type
|
||||
if configType.Kind() == reflect.Ptr {
|
||||
configType = configType.Elem()
|
||||
@@ -168,12 +168,12 @@ func (s *Server) api_Config_YAML_All(rw http.ResponseWriter, r *http.Request) {
|
||||
configSections = append(configSections, struct {
|
||||
name string
|
||||
data any
|
||||
}{meta.Name, mergedConf})
|
||||
}{name, mergedConf})
|
||||
} else {
|
||||
configSections = append(configSections, struct {
|
||||
name string
|
||||
data any
|
||||
}{meta.Name, pluginConf})
|
||||
}{name, pluginConf})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package plugin_crontab
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sort"
|
||||
"strings"
|
||||
@@ -327,11 +328,9 @@ func (ct *CrontabPlugin) ListRecordPlanStreams(ctx context.Context, req *cronpb.
|
||||
}
|
||||
|
||||
func (ct *CrontabPlugin) AddRecordPlanStream(ctx context.Context, req *cronpb.PlanStream) (*cronpb.Response, error) {
|
||||
if req.PlanId == 0 {
|
||||
return &cronpb.Response{
|
||||
Code: 400,
|
||||
Message: "record_plan_id is required",
|
||||
}, nil
|
||||
planId := 1
|
||||
if req.PlanId > 0 {
|
||||
planId = int(req.PlanId)
|
||||
}
|
||||
|
||||
if strings.TrimSpace(req.StreamPath) == "" {
|
||||
@@ -342,7 +341,7 @@ func (ct *CrontabPlugin) AddRecordPlanStream(ctx context.Context, req *cronpb.Pl
|
||||
}
|
||||
|
||||
// 从内存中获取录制计划
|
||||
plan, ok := ct.recordPlans.Get(uint(req.PlanId))
|
||||
plan, ok := ct.recordPlans.Get(uint(planId))
|
||||
if !ok {
|
||||
return &cronpb.Response{
|
||||
Code: 404,
|
||||
@@ -353,7 +352,7 @@ func (ct *CrontabPlugin) AddRecordPlanStream(ctx context.Context, req *cronpb.Pl
|
||||
// 检查是否已存在相同的记录
|
||||
var count int64
|
||||
searchModel := pkg.RecordPlanStream{
|
||||
PlanID: uint(req.PlanId),
|
||||
PlanID: uint(planId),
|
||||
StreamPath: req.StreamPath,
|
||||
}
|
||||
if err := ct.DB.Model(&searchModel).Where(&searchModel).Count(&count).Error; err != nil {
|
||||
@@ -370,10 +369,16 @@ func (ct *CrontabPlugin) AddRecordPlanStream(ctx context.Context, req *cronpb.Pl
|
||||
}, nil
|
||||
}
|
||||
|
||||
fragment := "60s"
|
||||
|
||||
if req.Fragment != "" {
|
||||
fragment = req.Fragment
|
||||
}
|
||||
|
||||
stream := &pkg.RecordPlanStream{
|
||||
PlanID: uint(req.PlanId),
|
||||
StreamPath: req.StreamPath,
|
||||
Fragment: req.Fragment,
|
||||
Fragment: fragment,
|
||||
FilePath: req.FilePath,
|
||||
Enable: req.Enable,
|
||||
RecordType: req.RecordType,
|
||||
@@ -406,11 +411,9 @@ func (ct *CrontabPlugin) AddRecordPlanStream(ctx context.Context, req *cronpb.Pl
|
||||
}
|
||||
|
||||
func (ct *CrontabPlugin) UpdateRecordPlanStream(ctx context.Context, req *cronpb.PlanStream) (*cronpb.Response, error) {
|
||||
if req.PlanId == 0 {
|
||||
return &cronpb.Response{
|
||||
Code: 400,
|
||||
Message: "record_plan_id is required",
|
||||
}, nil
|
||||
planId := 1
|
||||
if req.PlanId > 0 {
|
||||
planId = int(req.PlanId)
|
||||
}
|
||||
|
||||
if strings.TrimSpace(req.StreamPath) == "" {
|
||||
@@ -423,7 +426,7 @@ func (ct *CrontabPlugin) UpdateRecordPlanStream(ctx context.Context, req *cronpb
|
||||
// 检查记录是否存在
|
||||
var existingStream pkg.RecordPlanStream
|
||||
searchModel := pkg.RecordPlanStream{
|
||||
PlanID: uint(req.PlanId),
|
||||
PlanID: uint(planId),
|
||||
StreamPath: req.StreamPath,
|
||||
}
|
||||
if err := ct.DB.Where(&searchModel).First(&existingStream).Error; err != nil {
|
||||
@@ -524,7 +527,7 @@ func (ct *CrontabPlugin) RemoveRecordPlanStream(ctx context.Context, req *cronpb
|
||||
// 停止所有相关的定时任务
|
||||
ct.crontabs.Range(func(crontab *Crontab) bool {
|
||||
if crontab.RecordPlanStream.StreamPath == req.StreamPath && crontab.RecordPlan.ID == uint(req.PlanId) {
|
||||
crontab.Stop(nil)
|
||||
crontab.Stop(errors.New("remove record plan"))
|
||||
}
|
||||
return true
|
||||
})
|
||||
|
||||
@@ -359,13 +359,16 @@ func (cron *Crontab) startRecording() {
|
||||
|
||||
// 发送开始录制请求
|
||||
resp, err := http.Post(fmt.Sprintf("http://%s/mp4/api/start/%s", addr, cron.StreamPath), "application/json", bytes.NewBuffer(jsonBody))
|
||||
cron.Debug("record request", "url is ", fmt.Sprintf("http://%s/mp4/api/start/%s", addr, cron.StreamPath), "jsonBody is ", string(jsonBody))
|
||||
if err != nil {
|
||||
time.Sleep(time.Second)
|
||||
cron.Error("开始录制失败: %v", err)
|
||||
return
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
time.Sleep(time.Second)
|
||||
cron.Error("开始录制失败,HTTP状态码: %d", resp.StatusCode)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package plugin_crontab
|
||||
|
||||
import (
|
||||
"gorm.io/gorm"
|
||||
"strings"
|
||||
|
||||
"m7s.live/v5/plugin/crontab/pkg"
|
||||
@@ -9,16 +10,44 @@ import (
|
||||
// InitDefaultPlans 初始化默认的录制计划
|
||||
// 包括工作日录制计划和周末录制计划
|
||||
func (ct *CrontabPlugin) InitDefaultPlans() {
|
||||
|
||||
// 创建全天24小时录制计划(七天全天录制)的计划字符串
|
||||
allDayPlanStr := buildPlanString(true, true, true, true, true, true, true) // 周日到周六
|
||||
|
||||
// 检查是否已存在相同内容的工作日录制计划
|
||||
var count int64
|
||||
if err := ct.DB.Model(&pkg.RecordPlan{}).Where("plan = ?", allDayPlanStr).Count(&count).Error; err != nil {
|
||||
ct.Error("检查24小时录制计划失败: %v", err)
|
||||
} else if count == 0 {
|
||||
// 不存在相同内容的计划,创建新计划
|
||||
workdayPlan := &pkg.RecordPlan{
|
||||
Model: gorm.Model{ID: 1},
|
||||
Name: "七天全天录制计划",
|
||||
Plan: allDayPlanStr,
|
||||
Enable: true,
|
||||
}
|
||||
|
||||
if err := ct.DB.Create(workdayPlan).Error; err != nil {
|
||||
ct.Error("创建七天全天录制计划失败: %v", err)
|
||||
} else {
|
||||
ct.Info("成功创建七天全天录制计划")
|
||||
// 添加到内存中
|
||||
ct.recordPlans.Add(workdayPlan)
|
||||
}
|
||||
} else {
|
||||
ct.Info("已存在相同内容的七天全天录制计划,跳过创建")
|
||||
}
|
||||
|
||||
// 创建工作日录制计划(周一到周五全天录制)的计划字符串
|
||||
workdayPlanStr := buildPlanString(false, true, true, true, true, true, false) // 周一到周五
|
||||
|
||||
// 检查是否已存在相同内容的工作日录制计划
|
||||
var count int64
|
||||
if err := ct.DB.Model(&pkg.RecordPlan{}).Where("plan = ?", workdayPlanStr).Count(&count).Error; err != nil {
|
||||
ct.Error("检查工作日录制计划失败: %v", err)
|
||||
} else if count == 0 {
|
||||
// 不存在相同内容的计划,创建新计划
|
||||
workdayPlan := &pkg.RecordPlan{
|
||||
Model: gorm.Model{ID: 2},
|
||||
Name: "工作日录制计划",
|
||||
Plan: workdayPlanStr,
|
||||
Enable: true,
|
||||
@@ -44,6 +73,7 @@ func (ct *CrontabPlugin) InitDefaultPlans() {
|
||||
} else if count == 0 {
|
||||
// 不存在相同内容的计划,创建新计划
|
||||
weekendPlan := &pkg.RecordPlan{
|
||||
Model: gorm.Model{ID: 3},
|
||||
Name: "周末录制计划",
|
||||
Plan: weekendPlanStr,
|
||||
Enable: true,
|
||||
@@ -69,7 +99,7 @@ func buildPlanString(sun, mon, tue, wed, thu, fri, sat bool) string {
|
||||
|
||||
// 按照周日、周一、...、周六的顺序
|
||||
days := []bool{sun, mon, tue, wed, thu, fri, sat}
|
||||
|
||||
|
||||
for _, record := range days {
|
||||
if record {
|
||||
// 该天录制,24小时都为1
|
||||
|
||||
@@ -1,9 +1,40 @@
|
||||
-- 初始化录制计划的 SQL 脚本
|
||||
-- 包含两个预设计划:工作日全天录制和周末全天录制
|
||||
-- 包含三个预设计划:工作日全天录制,周末全天录制,每天全天录制
|
||||
|
||||
-- 24小时不间断录制计划(每天全天录制)
|
||||
INSERT INTO record_plans (id, name, plan, enable, created_at, updated_at)
|
||||
SELECT 1,'每天全天录制',
|
||||
-- 168位的计划字符串,格式为:
|
||||
-- 前24位为周日,接着24位为周一,以此类推到周六
|
||||
-- 0表示不录制,1表示录制
|
||||
-- 工作日录制:周一到周五全为1,周六周日全为0
|
||||
CONCAT(
|
||||
-- 周日(0):24个1
|
||||
REPEAT('1', 24),
|
||||
-- 周一(1):24个1
|
||||
REPEAT('1', 24),
|
||||
-- 周二(2):24个1
|
||||
REPEAT('1', 24),
|
||||
-- 周三(3):24个1
|
||||
REPEAT('1', 24),
|
||||
-- 周四(4):24个1
|
||||
REPEAT('1', 24),
|
||||
-- 周五(5):24个1
|
||||
REPEAT('1', 24),
|
||||
-- 周六(6):24个1
|
||||
REPEAT('1', 24)
|
||||
),
|
||||
TRUE, -- 启用状态
|
||||
NOW(), -- 创建时间
|
||||
NOW() -- 更新时间
|
||||
WHERE NOT EXISTS (
|
||||
SELECT 1 FROM record_plans WHERE name = '每天全天录制'
|
||||
);
|
||||
|
||||
|
||||
-- 工作日计划(周一到周五全天录制)
|
||||
INSERT INTO record_plans (name, plan, enable, created_at, updated_at)
|
||||
SELECT '工作日录制计划',
|
||||
INSERT INTO record_plans (id,name, plan, enable, created_at, updated_at)
|
||||
SELECT 2,'工作日录制计划',
|
||||
-- 168位的计划字符串,格式为:
|
||||
-- 前24位为周日,接着24位为周一,以此类推到周六
|
||||
-- 0表示不录制,1表示录制
|
||||
@@ -32,8 +63,8 @@ WHERE NOT EXISTS (
|
||||
);
|
||||
|
||||
-- 周末计划(周六和周日全天录制)
|
||||
INSERT INTO record_plans (name, plan, enable, created_at, updated_at)
|
||||
SELECT '周末录制计划',
|
||||
INSERT INTO record_plans (id,name, plan, enable, created_at, updated_at)
|
||||
SELECT 3,'周末录制计划',
|
||||
-- 168位的计划字符串
|
||||
-- 周末录制:周六周日全为1,周一到周五全为0
|
||||
CONCAT(
|
||||
|
||||
@@ -8,7 +8,7 @@ import (
|
||||
type RecordPlan struct {
|
||||
gorm.Model
|
||||
Name string `json:"name" gorm:"default:''"`
|
||||
Plan string `json:"plan" gorm:"type:text"`
|
||||
Plan string `json:"plan" gorm:"type:varchar(255)"`
|
||||
Enable bool `json:"enable" gorm:"default:false"` // 是否启用
|
||||
}
|
||||
|
||||
|
||||
@@ -9,7 +9,7 @@ import (
|
||||
type RecordPlanStream struct {
|
||||
PlanID uint `json:"plan_id" gorm:"primaryKey;type:bigint;not null"` // 录制计划ID
|
||||
StreamPath string `json:"stream_path" gorm:"primaryKey;type:varchar(255)"`
|
||||
Fragment string `json:"fragment" gorm:"type:text"`
|
||||
Fragment string `json:"fragment" gorm:"type:varchar(255)"`
|
||||
FilePath string `json:"file_path" gorm:"type:varchar(255)"`
|
||||
CreatedAt time.Time
|
||||
UpdatedAt time.Time
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"gorm.io/gorm"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
@@ -642,7 +643,13 @@ func (gb *GB28181Plugin) UpdateDevice(ctx context.Context, req *pb.Device) (*pb.
|
||||
}
|
||||
}
|
||||
} else {
|
||||
d.Stop(fmt.Errorf("password changed"))
|
||||
d.Status = DeviceOfflineStatus
|
||||
d.Online = false
|
||||
d.channels.Range(func(c *Channel) bool {
|
||||
c.Status = gb28181.ChannelOffStatus
|
||||
return true
|
||||
})
|
||||
//d.Stop(fmt.Errorf("password changed"))
|
||||
}
|
||||
|
||||
resp.Code = 0
|
||||
@@ -2815,49 +2822,17 @@ func (gb *GB28181Plugin) RemoveDevice(ctx context.Context, req *pb.RemoveDeviceR
|
||||
|
||||
// 使用数据库中的 DeviceId 从内存中查找设备
|
||||
if device, ok := gb.devices.Get(req.Id); ok {
|
||||
device.channels.Range(func(channel *Channel) bool {
|
||||
if err := device.plugin.DB.Where("device_id = ?", device.DeviceId).Delete(&gb28181.DeviceChannel{}).Error; err != nil {
|
||||
device.Error("删除设备通道记录失败", "error", err)
|
||||
}
|
||||
return true
|
||||
})
|
||||
if device.Online {
|
||||
// 停止设备相关任务
|
||||
device.Stop(fmt.Errorf("device removed"))
|
||||
device.WaitStopped()
|
||||
}
|
||||
// device.Stop() 会调用 Dispose(),其中已包含从 gb.devices 中移除设备的逻辑
|
||||
|
||||
// 开启数据库事务
|
||||
//tx := gb.DB.Begin()
|
||||
//if tx.Error != nil {
|
||||
// resp.Code = 500
|
||||
// resp.Message = "开启事务失败"
|
||||
// return resp, tx.Error
|
||||
//}
|
||||
//
|
||||
//// 删除设备
|
||||
//if err := tx.Delete(&Device{DeviceId: req.Id}).Error; err != nil {
|
||||
// tx.Rollback()
|
||||
// resp.Code = 500
|
||||
// resp.Message = "删除设备失败"
|
||||
// return resp, err
|
||||
//}
|
||||
//
|
||||
//// 删除设备关联的通道
|
||||
//if err := tx.Where("device_id = ?", req.Id).Delete(&gb28181.DeviceChannel{}).Error; err != nil {
|
||||
// tx.Rollback()
|
||||
// resp.Code = 500
|
||||
// resp.Message = "删除设备通道失败"
|
||||
// return resp, err
|
||||
//}
|
||||
//
|
||||
//// 提交事务
|
||||
//if err := tx.Commit().Error; err != nil {
|
||||
// tx.Rollback()
|
||||
// resp.Code = 500
|
||||
// resp.Message = "提交事务失败"
|
||||
// return resp, err
|
||||
//device.channels.Range(func(channel *Channel) bool {
|
||||
// if err := device.plugin.DB.Where("device_id = ?", device.DeviceId).Delete(&gb28181.DeviceChannel{}).Error; err != nil {
|
||||
// device.Error("删除设备通道记录失败", "error", err)
|
||||
// }
|
||||
// return true
|
||||
//})
|
||||
//if device.Online {
|
||||
// 停止设备相关任务
|
||||
device.DeletedAt = gorm.DeletedAt{Time: time.Now(), Valid: true}
|
||||
device.Stop(fmt.Errorf("device removed"))
|
||||
device.WaitStopped()
|
||||
//}
|
||||
|
||||
resp.Code = 200
|
||||
|
||||
@@ -78,7 +78,7 @@ type Device struct {
|
||||
KeepaliveCount int `gorm:"default:3" default:"3"` // 心跳次数
|
||||
ChannelCount int // 通道个数
|
||||
Expires int // 注册有效期
|
||||
CreateTime time.Time // 创建时间
|
||||
CreateTime time.Time `gorm:"primaryKey"` // 创建时间
|
||||
UpdateTime time.Time // 更新时间
|
||||
Charset string // 字符集, 支持 UTF-8 与 GB2312
|
||||
SubscribeCatalog int `gorm:"default:0"` // 目录订阅周期,0为不订阅
|
||||
@@ -154,8 +154,8 @@ func (d *Device) GetKey() string {
|
||||
|
||||
// CatalogRequest 目录请求结构体
|
||||
type CatalogRequest struct {
|
||||
SN, SumNum int
|
||||
FirstResponse bool // 是否为第一个响应
|
||||
SN, SumNum, TotalCount int
|
||||
FirstResponse bool // 是否为第一个响应
|
||||
*util.Promise
|
||||
sync.Mutex // 保护并发访问
|
||||
}
|
||||
@@ -168,21 +168,99 @@ func (r *CatalogRequest) GetKey() int {
|
||||
func (r *CatalogRequest) AddResponse() bool {
|
||||
r.Lock()
|
||||
defer r.Unlock()
|
||||
|
||||
fmt.Println("r.FirstResponse: " + fmt.Sprintf("%v", r.FirstResponse))
|
||||
wasFirst := r.FirstResponse
|
||||
r.FirstResponse = false
|
||||
fmt.Println("r.FirstResponse after: " + fmt.Sprintf("%v", r.FirstResponse))
|
||||
|
||||
return wasFirst
|
||||
}
|
||||
|
||||
// IsComplete 检查是否完成接收
|
||||
func (r *CatalogRequest) IsComplete(channelsLength int) bool {
|
||||
func (r *CatalogRequest) IsComplete() bool {
|
||||
r.Lock()
|
||||
defer r.Unlock()
|
||||
return channelsLength >= r.SumNum
|
||||
return r.TotalCount >= r.SumNum
|
||||
}
|
||||
|
||||
type CatalogHandlerQueueTask struct {
|
||||
task.Work
|
||||
}
|
||||
|
||||
var catalogHandlerQueueTask CatalogHandlerQueueTask
|
||||
|
||||
type catalogHandlerTask struct {
|
||||
task.Task
|
||||
d *Device
|
||||
msg *gb28181.Message
|
||||
}
|
||||
|
||||
func (c *catalogHandlerTask) Run() (err error) {
|
||||
// 处理目录信息
|
||||
d := c.d
|
||||
msg := c.msg
|
||||
catalogReq, exists := d.catalogReqs.Get(msg.SN)
|
||||
d.Debug("into catalog", "msg.SN", msg.SN, "exists", exists)
|
||||
if !exists {
|
||||
// 创建新的目录请求
|
||||
catalogReq = &CatalogRequest{
|
||||
SN: msg.SN,
|
||||
SumNum: msg.SumNum,
|
||||
TotalCount: 0,
|
||||
FirstResponse: true,
|
||||
Promise: util.NewPromise(context.Background()),
|
||||
}
|
||||
d.catalogReqs.Set(catalogReq)
|
||||
d.Debug("into catalog", "msg.SN", msg.SN, "d.catalogReqs", d.catalogReqs.Length)
|
||||
}
|
||||
|
||||
// 添加响应并获取是否是第一个响应
|
||||
isFirst := catalogReq.AddResponse()
|
||||
|
||||
// 更新设备信息到数据库
|
||||
// 如果是第一个响应,将所有通道状态标记为OFF
|
||||
if isFirst {
|
||||
d.Debug("将所有通道状态标记为OFF", "deviceId", d.DeviceId)
|
||||
// 标记所有通道为OFF状态
|
||||
d.channels.Range(func(channel *Channel) bool {
|
||||
if channel.DeviceChannel != nil {
|
||||
channel.DeviceChannel.Status = gb28181.ChannelOffStatus
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
// 更新通道信息
|
||||
for _, c := range msg.DeviceList.DeviceChannelList {
|
||||
// 设置关联的设备数据库ID
|
||||
c.ChannelId = c.DeviceId
|
||||
c.DeviceId = d.DeviceId
|
||||
c.ID = d.DeviceId + "_" + c.ChannelId
|
||||
if c.CustomChannelId == "" {
|
||||
c.CustomChannelId = c.ChannelId
|
||||
}
|
||||
d.Debug("msg.DeviceList.DeviceChannelList range", "c.ChannelId", c.ChannelId, "c.Status", c.Status)
|
||||
// 使用 Save 进行 upsert 操作
|
||||
d.addOrUpdateChannel(c)
|
||||
catalogReq.TotalCount++
|
||||
}
|
||||
|
||||
// 更新当前设备的通道数
|
||||
d.ChannelCount = msg.SumNum
|
||||
d.UpdateTime = time.Now()
|
||||
d.Debug("save channel", "deviceid", d.DeviceId, " d.channels.Length", d.channels.Length, "d.ChannelCount", d.ChannelCount, "d.UpdateTime", d.UpdateTime)
|
||||
|
||||
// 在所有通道都添加完成后,检查是否完成接收
|
||||
if catalogReq.IsComplete() {
|
||||
d.Debug("IsComplete")
|
||||
catalogReq.Resolve()
|
||||
d.catalogReqs.RemoveByKey(msg.SN)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (d *Device) onMessage(req *sip.Request, tx sip.ServerTransaction, msg *gb28181.Message) (err error) {
|
||||
d.plugin.Trace("into onMessage,deviceid is ", d.DeviceId)
|
||||
d.plugin.Debug("into onMessage", "deviceid is ", d.DeviceId, "msg is", msg)
|
||||
source := req.Source()
|
||||
hostname, portStr, _ := net.SplitHostPort(source)
|
||||
port, _ := strconv.Atoi(portStr)
|
||||
@@ -216,68 +294,11 @@ func (d *Device) onMessage(req *sip.Request, tx sip.ServerTransaction, msg *gb28
|
||||
}
|
||||
}
|
||||
case "Catalog":
|
||||
// 处理目录信息
|
||||
catalogReq, exists := d.catalogReqs.Get(msg.SN)
|
||||
if !exists {
|
||||
// 创建新的目录请求
|
||||
catalogReq = &CatalogRequest{
|
||||
SN: msg.SN,
|
||||
SumNum: msg.SumNum,
|
||||
FirstResponse: true,
|
||||
Promise: util.NewPromise(context.Background()),
|
||||
}
|
||||
d.catalogReqs.Set(catalogReq)
|
||||
}
|
||||
|
||||
// 添加响应并获取是否是第一个响应
|
||||
isFirst := catalogReq.AddResponse()
|
||||
|
||||
// 更新设备信息到数据库
|
||||
// 如果是第一个响应,将所有通道状态标记为OFF
|
||||
if isFirst {
|
||||
d.Trace("将所有通道状态标记为OFF", "deviceId", d.DeviceId)
|
||||
// 标记所有通道为OFF状态
|
||||
d.channels.Range(func(channel *Channel) bool {
|
||||
if channel.DeviceChannel != nil {
|
||||
channel.DeviceChannel.Status = gb28181.ChannelOffStatus
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
// 更新通道信息
|
||||
for _, c := range msg.DeviceChannelList {
|
||||
// 设置关联的设备数据库ID
|
||||
c.ChannelId = c.DeviceId
|
||||
c.DeviceId = d.DeviceId
|
||||
c.ID = d.DeviceId + "_" + c.ChannelId
|
||||
if c.CustomChannelId == "" {
|
||||
c.CustomChannelId = c.ChannelId
|
||||
}
|
||||
// 使用 Save 进行 upsert 操作
|
||||
d.addOrUpdateChannel(c)
|
||||
}
|
||||
|
||||
// 更新当前设备的通道数
|
||||
d.ChannelCount = msg.SumNum
|
||||
d.UpdateTime = time.Now()
|
||||
d.Debug("save channel", "deviceid", d.DeviceId, " d.channels.Length", d.channels.Length, "d.ChannelCount", d.ChannelCount, "d.UpdateTime", d.UpdateTime)
|
||||
|
||||
// 删除所有状态为OFF的通道
|
||||
// d.channels.Range(func(channel *Channel) bool {
|
||||
// if channel.DeviceChannel != nil && channel.DeviceChannel.Status == gb28181.ChannelOffStatus {
|
||||
// d.Debug("删除不存在的通道", "channelId", channel.ID)
|
||||
// d.channels.RemoveByKey(channel.ID)
|
||||
// d.plugin.channels.RemoveByKey(channel.ID)
|
||||
// }
|
||||
// return true
|
||||
// })
|
||||
|
||||
// 在所有通道都添加完成后,检查是否完成接收
|
||||
if catalogReq.IsComplete(d.channels.Length) {
|
||||
catalogReq.Resolve()
|
||||
d.catalogReqs.RemoveByKey(msg.SN)
|
||||
catalogHandler := &catalogHandlerTask{
|
||||
d: d,
|
||||
msg: msg,
|
||||
}
|
||||
catalogHandlerQueueTask.AddTask(catalogHandler)
|
||||
case "RecordInfo":
|
||||
if channel, ok := d.channels.Get(d.DeviceId + "_" + msg.DeviceID); ok {
|
||||
if req, ok := channel.RecordReqs.Get(msg.SN); ok {
|
||||
@@ -614,6 +635,10 @@ func (d *Device) addOrUpdateChannel(c gb28181.DeviceChannel) {
|
||||
}
|
||||
// 更新通道信息
|
||||
channel.DeviceChannel = &c
|
||||
d.channels.Range(func(channel *Channel) bool {
|
||||
d.Debug("range d.channels", "channel.ChannelId", channel.ChannelId, "channel.status", channel.Status)
|
||||
return true
|
||||
})
|
||||
} else {
|
||||
// 创建新通道
|
||||
channel = &Channel{
|
||||
@@ -840,12 +865,12 @@ func (d *Device) onNotify(req *sip.Request, tx sip.ServerTransaction, msg *gb281
|
||||
|
||||
// handleCatalog 处理设备目录更新
|
||||
func (d *Device) handleCatalog(msg *gb28181.Message) error {
|
||||
if msg.DeviceChannelList == nil || len(msg.DeviceChannelList) == 0 {
|
||||
if msg.DeviceList.DeviceChannelList == nil || len(msg.DeviceList.DeviceChannelList) == 0 {
|
||||
return fmt.Errorf("no device items in catalog")
|
||||
}
|
||||
|
||||
// 遍历并更新设备列表
|
||||
for _, item := range msg.DeviceChannelList {
|
||||
for _, item := range msg.DeviceList.DeviceChannelList {
|
||||
channel := &gb28181.DeviceChannel{
|
||||
DeviceId: item.DeviceId,
|
||||
Name: item.Name,
|
||||
|
||||
@@ -104,17 +104,33 @@ func (d *Dialog) Start() (err error) {
|
||||
}
|
||||
|
||||
//defer d.gb.dialogs.Remove(d)
|
||||
if d.gb.tcpPort > 0 {
|
||||
d.MediaPort = d.gb.tcpPort
|
||||
} else {
|
||||
if d.gb.MediaPort.Valid() {
|
||||
select {
|
||||
case d.MediaPort = <-d.gb.tcpPorts:
|
||||
default:
|
||||
return fmt.Errorf("no available tcp port")
|
||||
}
|
||||
if d.StreamMode == "TCP-PASSIVE" {
|
||||
if d.gb.tcpPort > 0 {
|
||||
d.MediaPort = d.gb.tcpPort
|
||||
} else {
|
||||
d.MediaPort = d.gb.MediaPort[0]
|
||||
if d.gb.MediaPort.Valid() {
|
||||
select {
|
||||
case d.MediaPort = <-d.gb.tcpPorts:
|
||||
default:
|
||||
return fmt.Errorf("no available tcp port")
|
||||
}
|
||||
} else {
|
||||
d.MediaPort = d.gb.MediaPort[0]
|
||||
}
|
||||
}
|
||||
} else if d.StreamMode == "UDP" {
|
||||
if d.gb.udpPort > 0 {
|
||||
d.MediaPort = d.gb.udpPort
|
||||
} else {
|
||||
if d.gb.MediaPort.Valid() {
|
||||
select {
|
||||
case d.MediaPort = <-d.gb.udpPorts:
|
||||
default:
|
||||
return fmt.Errorf("no available udp port")
|
||||
}
|
||||
} else {
|
||||
d.MediaPort = d.gb.MediaPort[0]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -179,7 +195,9 @@ func (d *Dialog) Start() (err error) {
|
||||
"a=connection:new",
|
||||
)
|
||||
case "UDP":
|
||||
/* 支持udp收流 yjx
|
||||
return errors.New("do not support udp mode")
|
||||
*/
|
||||
default:
|
||||
sdpInfo = append(sdpInfo,
|
||||
"a=setup:passive",
|
||||
@@ -310,9 +328,9 @@ func (d *Dialog) Run() (err error) {
|
||||
pub := gb28181.NewPSPublisher(d.pullCtx.Publisher)
|
||||
if d.StreamMode == "TCP-ACTIVE" {
|
||||
pub.Receiver.ListenAddr = fmt.Sprintf("%s:%d", d.targetIP, d.targetPort)
|
||||
} else {
|
||||
} else if d.StreamMode == "TCP-PASSIVE" {
|
||||
if d.gb.tcpPort > 0 {
|
||||
d.Info("into single port mode,use gb.tcpPort", d.gb.tcpPort)
|
||||
d.Info("into single port mode, use gb.tcpPort", d.gb.tcpPort)
|
||||
if d.gb.netListener != nil {
|
||||
d.Info("use gb.netListener", d.gb.netListener.Addr())
|
||||
pub.Receiver.Listener = d.gb.netListener
|
||||
@@ -324,14 +342,44 @@ func (d *Dialog) Run() (err error) {
|
||||
pub.Receiver.SSRC = d.SSRC
|
||||
}
|
||||
pub.Receiver.ListenAddr = fmt.Sprintf(":%d", d.MediaPort)
|
||||
} else if d.StreamMode == "UDP" {
|
||||
pub.Receiver.IsSinglePort = false
|
||||
if d.gb.udpPort > 0 {
|
||||
d.Info("into single port mode, use gb.udpPort", d.gb.udpPort)
|
||||
if d.gb.netUDPListener != nil {
|
||||
d.Info("use gb.netUDPListener", d.gb.netUDPListener.LocalAddr())
|
||||
pub.Receiver.ListenerUdp = d.gb.netUDPListener
|
||||
} else {
|
||||
d.Info("listen udp4", fmt.Sprintf(":%d", d.gb.udpPort))
|
||||
pub.Receiver.ListenerUdp, err = util.ListenUDP(fmt.Sprintf(":%d", d.gb.udpPort), 1024*1024*4)
|
||||
if err != nil {
|
||||
d.Error("listen udp4", fmt.Sprintf(":%d", d.gb.udpPort), "err", err)
|
||||
return errors.New("start udp listen, err" + err.Error())
|
||||
}
|
||||
|
||||
d.gb.netUDPListener = pub.Receiver.ListenerUdp
|
||||
}
|
||||
|
||||
pub.Receiver.IsSinglePort = true
|
||||
|
||||
}
|
||||
pub.Receiver.SSRC = d.SSRC
|
||||
pub.Receiver.ListenAddr = fmt.Sprintf(":%d", d.MediaPort)
|
||||
pub.Receiver.UdpCacheSize = 10
|
||||
}
|
||||
|
||||
pub.Receiver.StreamMode = d.StreamMode
|
||||
d.AddTask(&pub.Receiver)
|
||||
startResult := pub.Receiver.WaitStarted()
|
||||
if startResult != nil {
|
||||
return fmt.Errorf("pub.Receiver.WaitStarted %s", startResult)
|
||||
}
|
||||
|
||||
d.gb.udpPubs.Set(pub)
|
||||
|
||||
pub.Demux()
|
||||
|
||||
d.gb.udpPubs.Remove(pub)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -340,9 +388,17 @@ func (d *Dialog) GetKey() string {
|
||||
}
|
||||
|
||||
func (d *Dialog) Dispose() {
|
||||
if d.gb.tcpPort == 0 {
|
||||
// 如果没有设置tcp端口,则将MediaPort设置为0,表示不再使用
|
||||
d.gb.tcpPorts <- d.MediaPort
|
||||
|
||||
if d.StreamMode == "UDP" {
|
||||
if d.gb.udpPort == 0 {
|
||||
// 如果没有设置udp端口,则将MediaPort设置为0,表示不再使用
|
||||
d.gb.udpPorts <- d.MediaPort
|
||||
}
|
||||
} else {
|
||||
if d.gb.tcpPort == 0 {
|
||||
// 如果没有设置tcp端口,则将MediaPort设置为0,表示不再使用
|
||||
d.gb.tcpPorts <- d.MediaPort
|
||||
}
|
||||
}
|
||||
d.Info("dialog dispose", "ssrc", d.SSRC, "mediaPort", d.MediaPort, "streamMode", d.StreamMode, "deviceId", d.Channel.DeviceId, "channelId", d.Channel.ChannelId)
|
||||
if d.session != nil {
|
||||
|
||||
@@ -17,6 +17,7 @@ import (
|
||||
|
||||
"github.com/emiago/sipgo"
|
||||
"github.com/emiago/sipgo/sip"
|
||||
"github.com/pion/rtp"
|
||||
"github.com/rs/zerolog"
|
||||
m7s "m7s.live/v5"
|
||||
"m7s.live/v5/pkg/config"
|
||||
@@ -64,6 +65,10 @@ type GB28181Plugin struct {
|
||||
Platforms []*gb28181.PlatformModel
|
||||
channels util.Collection[string, *Channel]
|
||||
netListener net.Listener
|
||||
udpPorts chan uint16
|
||||
udpPort uint16
|
||||
netUDPListener *net.UDPConn
|
||||
udpPubs task.Manager[uint32, *gb28181.PSPublisher]
|
||||
}
|
||||
|
||||
var _ = m7s.InstallPlugin[GB28181Plugin](m7s.PluginMeta{
|
||||
@@ -155,6 +160,7 @@ func (gb *GB28181Plugin) OnInit() (err error) {
|
||||
gb.ua, err = sipgo.NewUA(sipgo.WithUserAgent("M7S/" + m7s.Version)) // Build user agent
|
||||
// Creating client handle for ua
|
||||
if len(gb.Sip.ListenAddr) > 0 {
|
||||
gb.AddTask(&catalogHandlerQueueTask)
|
||||
gb.AddTask(&gb.devices)
|
||||
gb.AddTask(&gb.platforms)
|
||||
gb.AddTask(&gb.dialogs)
|
||||
@@ -176,15 +182,42 @@ func (gb *GB28181Plugin) OnInit() (err error) {
|
||||
if gb.MediaPort.Valid() {
|
||||
gb.SetDescription("tcp", fmt.Sprintf("%d-%d", gb.MediaPort[0], gb.MediaPort[1]))
|
||||
gb.tcpPorts = make(chan uint16, gb.MediaPort.Size())
|
||||
gb.udpPorts = make(chan uint16, gb.MediaPort.Size())
|
||||
if gb.MediaPort.Size() == 0 {
|
||||
gb.tcpPort = gb.MediaPort[0]
|
||||
gb.netListener, _ = net.Listen("tcp4", fmt.Sprintf(":%d", gb.tcpPort))
|
||||
|
||||
//support udp
|
||||
{
|
||||
gb.udpPort = gb.MediaPort[0]
|
||||
gb.netUDPListener, err = util.ListenUDP(fmt.Sprintf(":%d", gb.udpPort), 1024*1024*4)
|
||||
|
||||
if err != nil {
|
||||
gb.Error("start listen", "err", err)
|
||||
return errors.New("start udp listen, err" + err.Error())
|
||||
}
|
||||
go gb.ReadUdpInsinglePort()
|
||||
}
|
||||
} else if gb.MediaPort.Size() == 1 {
|
||||
gb.tcpPort = gb.MediaPort[0] + 1
|
||||
gb.netListener, _ = net.Listen("tcp4", fmt.Sprintf(":%d", gb.tcpPort))
|
||||
|
||||
//support udp
|
||||
{
|
||||
gb.udpPort = gb.MediaPort[0] + 1
|
||||
gb.netUDPListener, err = util.ListenUDP(fmt.Sprintf(":%d", gb.udpPort), 1024*1024*4)
|
||||
|
||||
if err != nil {
|
||||
gb.Error("start listen", "err", err)
|
||||
return errors.New("start udp listen, err" + err.Error())
|
||||
}
|
||||
|
||||
go gb.ReadUdpInsinglePort()
|
||||
}
|
||||
} else {
|
||||
for i := range gb.MediaPort.Size() {
|
||||
gb.tcpPorts <- gb.MediaPort[0] + i
|
||||
gb.udpPorts <- gb.MediaPort[0] + i
|
||||
}
|
||||
}
|
||||
} else {
|
||||
@@ -282,13 +315,17 @@ func (gb *GB28181Plugin) checkDeviceExpire() (err error) {
|
||||
// 检查设备是否过期
|
||||
expireTime := device.RegisterTime.Add(time.Duration(device.Expires) * time.Second)
|
||||
isExpired := now.After(expireTime)
|
||||
|
||||
// 设置设备基本属性
|
||||
device.Status = DeviceOfflineStatus
|
||||
if !isExpired {
|
||||
device.Status = DeviceOnlineStatus
|
||||
if device.CustomName == "" {
|
||||
device.CustomName = device.Name
|
||||
}
|
||||
if device.Online || device.Status == DeviceOnlineStatus {
|
||||
// 设置设备基本属性
|
||||
device.Status = DeviceOfflineStatus
|
||||
if !isExpired {
|
||||
device.Status = DeviceOnlineStatus
|
||||
}
|
||||
device.Online = !isExpired
|
||||
}
|
||||
device.Online = !isExpired
|
||||
|
||||
// 设置事件通道
|
||||
device.eventChan = make(chan any, 10)
|
||||
@@ -389,6 +426,12 @@ func (gb *GB28181Plugin) checkDeviceExpire() (err error) {
|
||||
|
||||
// 初始化设备通道并更新到数据库
|
||||
for _, channel := range channels {
|
||||
if channel.CustomName == "" {
|
||||
channel.CustomName = channel.Name
|
||||
}
|
||||
if channel.CustomChannelId == "" {
|
||||
channel.CustomChannelId = channel.ChannelId
|
||||
}
|
||||
if isExpired {
|
||||
channel.Status = "OFF"
|
||||
} else {
|
||||
@@ -403,7 +446,7 @@ func (gb *GB28181Plugin) checkDeviceExpire() (err error) {
|
||||
|
||||
// 添加设备任务
|
||||
gb.devices.Add(device)
|
||||
gb.Info("设备有效", "deviceId", device.DeviceId, "registerTime", device.RegisterTime, "expireTime", expireTime)
|
||||
gb.Info("设备有效", "deviceId", device.DeviceId, "registerTime", device.RegisterTime, "expireTime", expireTime, "isExpired", isExpired, "device.Name", device.Name)
|
||||
|
||||
}
|
||||
return nil
|
||||
@@ -424,13 +467,10 @@ func (gb *GB28181Plugin) checkPlatform() {
|
||||
gb.Error("查询平台失败", "error", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
gb.Info("找到启用状态的平台", "count", len(platformModels))
|
||||
|
||||
if gb.Platforms != nil && len(gb.Platforms) > 0 {
|
||||
platformModels = append(platformModels, gb.Platforms...)
|
||||
}
|
||||
|
||||
gb.Info("找到启用状态的平台", "count", len(platformModels))
|
||||
// 遍历所有平台进行初始化和注册
|
||||
for _, platformModel := range platformModels {
|
||||
if platformModel.Enable {
|
||||
@@ -1041,3 +1081,24 @@ func (gb *GB28181Plugin) OnAck(req *sip.Request, tx sip.ServerTransaction) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (gb *GB28181Plugin) ReadUdpInsinglePort() (err error) {
|
||||
buffer := make(util.Buffer, 1024*1024)
|
||||
var rtpPacket rtp.Packet
|
||||
for {
|
||||
n, _, err := gb.netUDPListener.ReadFromUDP(buffer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ps := buffer[:n]
|
||||
if err := rtpPacket.Unmarshal(ps); err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
pub, ret := gb.udpPubs.Get(rtpPacket.SSRC)
|
||||
if ret {
|
||||
pub.Receiver.ReadUdpRTP(buffer[:n])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -141,19 +141,22 @@ func BuildKeepAliveXML(sn int, id string) []byte {
|
||||
|
||||
type (
|
||||
Message struct {
|
||||
XMLName xml.Name
|
||||
CmdType string
|
||||
SN int // 请求序列号,一般用于对应 request 和 response
|
||||
DeviceID string
|
||||
Longitude string // 经度
|
||||
Latitude string // 纬度
|
||||
DeviceName string
|
||||
Manufacturer string
|
||||
Model string
|
||||
Channel string
|
||||
Firmware string
|
||||
DeviceChannelList []DeviceChannel `xml:"DeviceList>Item"`
|
||||
RecordList struct {
|
||||
XMLName xml.Name
|
||||
CmdType string
|
||||
SN int // 请求序列号,一般用于对应 request 和 response
|
||||
DeviceID string
|
||||
Longitude string // 经度
|
||||
Latitude string // 纬度
|
||||
DeviceName string
|
||||
Manufacturer string
|
||||
Model string
|
||||
Channel string
|
||||
Firmware string
|
||||
DeviceList struct {
|
||||
DeviceChannelList []DeviceChannel `xml:"Item"`
|
||||
DeviceNum int `xml:"Num,attr"` // 将 Num 属性映射到 DeviceNum
|
||||
} `xml:"DeviceList"`
|
||||
RecordList struct {
|
||||
Num int `xml:"Num,attr"`
|
||||
Item []RecordItem `xml:"Item"`
|
||||
} `xml:"RecordList"`
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
"m7s.live/v5/pkg"
|
||||
"m7s.live/v5/pkg/task"
|
||||
"m7s.live/v5/pkg/util"
|
||||
"m7s.live/v5/plugin/gb28181/udputils"
|
||||
rtp2 "m7s.live/v5/plugin/rtp/pkg"
|
||||
)
|
||||
|
||||
@@ -36,17 +37,24 @@ var ErrRTPReceiveLost = errors.New("rtp receive lost")
|
||||
type Receiver struct {
|
||||
task.Task
|
||||
rtp.Packet
|
||||
FeedChan chan []byte
|
||||
psm util.Memory
|
||||
dump *os.File
|
||||
dumpLen []byte
|
||||
psVideo PSVideo
|
||||
psAudio PSAudio
|
||||
RTPReader *rtp2.TCP
|
||||
ListenAddr string
|
||||
Listener net.Listener
|
||||
StreamMode string // 数据流传输模式(UDP:udp传输/TCP-ACTIVE:tcp主动模式/TCP-PASSIVE:tcp被动模式)
|
||||
SSRC uint32 // RTP SSRC
|
||||
FeedChan chan []byte
|
||||
psm util.Memory
|
||||
dump *os.File
|
||||
dumpLen []byte
|
||||
psVideo PSVideo
|
||||
psAudio PSAudio
|
||||
RTPReader *rtp2.TCP
|
||||
ListenAddr string
|
||||
Listener net.Listener
|
||||
StreamMode string // 数据流传输模式(UDP:udp传输/TCP-ACTIVE:tcp主动模式/TCP-PASSIVE:tcp被动模式)
|
||||
SSRC uint32 // RTP SSRC
|
||||
ListenerUdp *net.UDPConn
|
||||
RTPReaderUdp *rtp2.UDP
|
||||
IsSinglePort bool
|
||||
SingleStop chan struct{}
|
||||
udpCache *udputils.PriorityQueueRtp
|
||||
UdpCacheSize int
|
||||
lastSeq uint16
|
||||
}
|
||||
|
||||
func NewPSPublisher(puber *m7s.Publisher) *PSPublisher {
|
||||
@@ -142,6 +150,10 @@ func (dec *PSPublisher) decProgramStreamMap() (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *PSPublisher) GetKey() uint32 {
|
||||
return p.Receiver.SSRC
|
||||
}
|
||||
|
||||
func (p *Receiver) ReadRTP(rtp util.Buffer) (err error) {
|
||||
lastSeq := p.SequenceNumber
|
||||
if err = p.Unmarshal(rtp); err != nil {
|
||||
@@ -172,8 +184,63 @@ func (p *Receiver) ReadRTP(rtp util.Buffer) (err error) {
|
||||
return task.ErrTaskComplete
|
||||
}
|
||||
return
|
||||
} else {
|
||||
p.Error("rtp seq mismatch,", "lastSeq", lastSeq, "seq", p.SequenceNumber)
|
||||
return ErrRTPReceiveLost
|
||||
}
|
||||
return ErrRTPReceiveLost
|
||||
|
||||
}
|
||||
|
||||
func (p *Receiver) ReadUdpRTP(rtp util.Buffer) (err error) {
|
||||
//解析rtp
|
||||
if err = p.Unmarshal(rtp); err != nil {
|
||||
p.Error("unmarshal error", "err", err)
|
||||
return nil
|
||||
}
|
||||
//判断ssrc
|
||||
if p.SSRC != 0 && p.SSRC != p.Packet.SSRC {
|
||||
p.Info("ReadUdpRTP, ssrc mismatch", "expected", p.SSRC, "actual", p.Packet.SSRC)
|
||||
if p.TraceEnabled() {
|
||||
p.Trace("rtp ssrc mismatch, skip", "expected", p.SSRC, "actual", p.Packet.SSRC)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
if p.UdpCacheSize > 0 && p.udpCache == nil {
|
||||
p.udpCache = udputils.NewPqRtp()
|
||||
}
|
||||
//p.Info("ReadUdpRTP, seq", "rtpSeq", p.SequenceNumber)
|
||||
//加入缓存,自动排序
|
||||
rtpTmpCache := p.Packet
|
||||
rtpTmpCache.Payload = make([]byte, len(p.Payload))
|
||||
copy(rtpTmpCache.Payload, p.Payload)
|
||||
p.udpCache.Push(rtpTmpCache)
|
||||
|
||||
rtpTmp := p.Packet
|
||||
if p.udpCache.Len() < p.UdpCacheSize-1 {
|
||||
return nil
|
||||
} else {
|
||||
rtpTmp, _ = p.udpCache.Pop()
|
||||
}
|
||||
|
||||
//p.Info("ReadUdpRTP, seq", "rtpTmpSeq", rtpTmp.SequenceNumber)
|
||||
|
||||
p.lastSeq = rtpTmp.SequenceNumber
|
||||
|
||||
if p.TraceEnabled() {
|
||||
p.Trace("rtp", "len", rtp.Len(), "seq", p.SequenceNumber, "payloadType", p.PayloadType, "ssrc", p.Packet.SSRC)
|
||||
}
|
||||
|
||||
copyData := make([]byte, len(rtpTmp.Payload))
|
||||
copy(copyData, rtpTmp.Payload)
|
||||
select {
|
||||
case p.FeedChan <- copyData:
|
||||
// 成功发送数据
|
||||
case <-p.Done():
|
||||
// 任务已停止,返回错误
|
||||
return task.ErrTaskComplete
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Receiver) Start() (err error) {
|
||||
@@ -181,17 +248,28 @@ func (p *Receiver) Start() (err error) {
|
||||
// TCP主动模式不需要监听,直接返回
|
||||
p.Info("TCP-ACTIVE mode, no need to listen")
|
||||
return nil
|
||||
}
|
||||
// TCP被动模式
|
||||
if p.Listener == nil {
|
||||
p.Info("start new listener", "addr", p.ListenAddr)
|
||||
p.Listener, err = net.Listen("tcp4", p.ListenAddr)
|
||||
if err != nil {
|
||||
p.Error("start listen", "err", err)
|
||||
return errors.New("start listen,err" + err.Error())
|
||||
} else if strings.ToUpper(p.StreamMode) == "TCP-PASSIVE" {
|
||||
// TCP被动模式
|
||||
if p.Listener == nil {
|
||||
p.Info("start new listener", "addr", p.ListenAddr)
|
||||
p.Listener, err = net.Listen("tcp4", p.ListenAddr)
|
||||
if err != nil {
|
||||
p.Error("start listen", "err", err)
|
||||
return errors.New("start listen,err" + err.Error())
|
||||
}
|
||||
}
|
||||
p.Info("start listen", "addr", p.ListenAddr)
|
||||
} else {
|
||||
if p.ListenerUdp == nil {
|
||||
p.Info("start new listener", "addr", p.ListenAddr)
|
||||
|
||||
p.ListenerUdp, err = util.ListenUDP(p.ListenAddr, 1024*1024*10)
|
||||
if err != nil {
|
||||
p.Error("start listen", "err", err)
|
||||
return errors.New("start listen,err" + err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
p.Info("start listen", "addr", p.ListenAddr)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -205,6 +283,13 @@ func (p *Receiver) Dispose() {
|
||||
if p.RTPReader != nil {
|
||||
p.RTPReader.Close()
|
||||
}
|
||||
if p.ListenerUdp != nil && !p.IsSinglePort {
|
||||
p.ListenerUdp.Close()
|
||||
}
|
||||
if p.IsSinglePort {
|
||||
close(p.SingleStop)
|
||||
}
|
||||
|
||||
if p.FeedChan != nil {
|
||||
close(p.FeedChan)
|
||||
}
|
||||
@@ -230,15 +315,27 @@ func (p *Receiver) Go() error {
|
||||
p.RTPReader = (*rtp2.TCP)(conn.(*net.TCPConn))
|
||||
p.Info("connected to device", "addr", conn.RemoteAddr())
|
||||
return p.RTPReader.Read(p.ReadRTP)
|
||||
} else if strings.ToUpper(p.StreamMode) == "TCP-PASSIVE" { // TCP被动模式
|
||||
p.Info("start accept")
|
||||
conn, err := p.Listener.Accept()
|
||||
if err != nil {
|
||||
p.Error("accept", "err", err)
|
||||
return err
|
||||
}
|
||||
p.RTPReader = (*rtp2.TCP)(conn.(*net.TCPConn))
|
||||
p.Info("accept", "addr", conn.RemoteAddr())
|
||||
return p.RTPReader.Read(p.ReadRTP)
|
||||
} else { //UDP模式
|
||||
if p.IsSinglePort {
|
||||
p.SingleStop = make(chan struct{})
|
||||
<-p.SingleStop
|
||||
p.Info("stop udp accept", "ssrc", p.SSRC)
|
||||
return nil
|
||||
|
||||
} else {
|
||||
p.Info("start udp accept")
|
||||
p.RTPReaderUdp = (*rtp2.UDP)(p.ListenerUdp)
|
||||
return p.RTPReaderUdp.Read(p.ReadUdpRTP)
|
||||
}
|
||||
}
|
||||
// TCP被动模式
|
||||
p.Info("start accept")
|
||||
conn, err := p.Listener.Accept()
|
||||
if err != nil {
|
||||
p.Error("accept", "err", err)
|
||||
return err
|
||||
}
|
||||
p.RTPReader = (*rtp2.TCP)(conn.(*net.TCPConn))
|
||||
p.Info("accept", "addr", conn.RemoteAddr())
|
||||
return p.RTPReader.Read(p.ReadRTP)
|
||||
}
|
||||
|
||||
@@ -97,7 +97,7 @@ func NewPlatform(pm *gb28181.PlatformModel, plugin *GB28181Plugin, unRegister bo
|
||||
p.DialogClient = sipgo.NewDialogClientCache(p.Client, *p.ContactHDR)
|
||||
|
||||
p.MaxForwardsHDR = sip.MaxForwardsHeader(70)
|
||||
p.plugin.platforms.Set(p)
|
||||
//p.plugin.platforms.Set(p)
|
||||
p.OnDispose(func() {
|
||||
if plugin.platforms.RemoveByKey(p.PlatformModel.ServerGBID) {
|
||||
//for c := range d.channels.Range {
|
||||
|
||||
@@ -217,7 +217,7 @@ func (task *registerHandlerTask) Run() (err error) {
|
||||
channel.Status = "OFF"
|
||||
return true
|
||||
})
|
||||
d.Stop(errors.New("unregister"))
|
||||
//d.Stop(errors.New("unregister"))
|
||||
}
|
||||
} else {
|
||||
if recover {
|
||||
@@ -264,7 +264,7 @@ func (task *registerHandlerTask) RecoverDevice(d *Device, req *sip.Request) {
|
||||
if sourceIPParse.IsPrivate() { // 源IP是内网IP
|
||||
myWanIP = myLanIP // 使用内网IP作为外网IP
|
||||
}
|
||||
} else { // 目标地址是IP
|
||||
} else { // 目标地址是IP
|
||||
if sourceIPParse.IsPrivate() { // 源IP是内网IP
|
||||
myLanIP, myWanIP = myIP, myIP // 使用目标IP作为内外网IP
|
||||
}
|
||||
@@ -372,7 +372,7 @@ func (task *registerHandlerTask) StoreDevice(deviceid string, req *sip.Request,
|
||||
if sourceIPParse.IsPrivate() { // 源IP是内网IP
|
||||
myWanIP = myLanIP // 使用内网IP作为外网IP
|
||||
}
|
||||
} else { // 目标地址是IP
|
||||
} else { // 目标地址是IP
|
||||
if sourceIPParse.IsPrivate() { // 源IP是内网IP
|
||||
myLanIP, myWanIP = myIP, myIP // 使用目标IP作为内外网IP
|
||||
}
|
||||
@@ -387,7 +387,9 @@ func (task *registerHandlerTask) StoreDevice(deviceid string, req *sip.Request,
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
d.CreateTime = now
|
||||
if d.CreateTime.IsZero() {
|
||||
d.CreateTime = now
|
||||
}
|
||||
d.UpdateTime = now
|
||||
d.RegisterTime = now
|
||||
d.KeepaliveTime = now
|
||||
@@ -454,15 +456,15 @@ func (task *registerHandlerTask) StoreDevice(deviceid string, req *sip.Request,
|
||||
task.gb.devices.Add(d).WaitStarted()
|
||||
|
||||
if task.gb.DB != nil {
|
||||
var existing Device
|
||||
if err := task.gb.DB.First(&existing, Device{DeviceId: d.DeviceId}).Error; err == nil {
|
||||
d.ID = existing.ID // 保持原有的自增ID
|
||||
task.gb.DB.Save(d).Omit("create_time")
|
||||
task.gb.Info("StoreDevice", "type", "更新设备", "deviceId", d.DeviceId)
|
||||
} else {
|
||||
task.gb.DB.Save(d)
|
||||
task.gb.Info("StoreDevice", "type", "新增设备", "deviceId", d.DeviceId)
|
||||
}
|
||||
//var existing Device
|
||||
//if err := task.gb.DB.First(&existing, Device{DeviceId: d.DeviceId}).Error; err == nil {
|
||||
// d.ID = existing.ID // 保持原有的自增ID
|
||||
// task.gb.DB.Save(d).Omit("create_time")
|
||||
// task.gb.Info("StoreDevice", "type", "更新设备", "deviceId", d.DeviceId)
|
||||
//} else {
|
||||
task.gb.DB.Save(d)
|
||||
task.gb.Info("StoreDevice", "type", "新增设备", "deviceId", d.DeviceId)
|
||||
//}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
98
plugin/gb28181/udputils/rtp_sort.go
Normal file
98
plugin/gb28181/udputils/rtp_sort.go
Normal file
@@ -0,0 +1,98 @@
|
||||
package udputils
|
||||
|
||||
import (
|
||||
"container/heap"
|
||||
"errors"
|
||||
|
||||
"github.com/pion/rtp"
|
||||
)
|
||||
|
||||
const MaxRtpDiff = 65000 //相邻两个包之间的最大差值
|
||||
|
||||
type PriorityQueueRtp struct {
|
||||
itemHeap *packets
|
||||
current *rtp.Packet
|
||||
priorityMap map[uint16]bool
|
||||
lastPacket *rtp.Packet
|
||||
}
|
||||
|
||||
func NewPqRtp() *PriorityQueueRtp {
|
||||
return &PriorityQueueRtp{
|
||||
itemHeap: &packets{},
|
||||
priorityMap: make(map[uint16]bool),
|
||||
}
|
||||
}
|
||||
|
||||
func (p *PriorityQueueRtp) Len() int {
|
||||
return p.itemHeap.Len()
|
||||
}
|
||||
|
||||
func (p *PriorityQueueRtp) Push(v rtp.Packet) {
|
||||
if p.priorityMap[v.SequenceNumber] {
|
||||
return
|
||||
}
|
||||
newItem := &packet{
|
||||
value: v,
|
||||
priority: v.SequenceNumber,
|
||||
}
|
||||
heap.Push(p.itemHeap, newItem)
|
||||
}
|
||||
|
||||
func (p *PriorityQueueRtp) Pop() (rtp.Packet, error) {
|
||||
if len(*p.itemHeap) == 0 {
|
||||
return rtp.Packet{}, errors.New("empty queue")
|
||||
}
|
||||
|
||||
item := heap.Pop(p.itemHeap).(*packet)
|
||||
return item.value, nil
|
||||
}
|
||||
|
||||
func (p *PriorityQueueRtp) Empty() {
|
||||
old := *p.itemHeap
|
||||
*p.itemHeap = old[:0]
|
||||
}
|
||||
|
||||
type packets []*packet
|
||||
|
||||
type packet struct {
|
||||
value rtp.Packet
|
||||
priority uint16
|
||||
index int
|
||||
}
|
||||
|
||||
func (p *packets) Len() int {
|
||||
return len(*p)
|
||||
}
|
||||
|
||||
func (p *packets) Less(i, j int) bool {
|
||||
a, b := (*p)[i].priority, (*p)[j].priority
|
||||
if int(a)-int(b) > MaxRtpDiff || int(b)-int(a) > MaxRtpDiff {
|
||||
if a < b {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
return a < b
|
||||
}
|
||||
|
||||
func (p *packets) Swap(i, j int) {
|
||||
(*p)[i], (*p)[j] = (*p)[j], (*p)[i]
|
||||
(*p)[i].index = i
|
||||
(*p)[j].index = j
|
||||
}
|
||||
|
||||
func (p *packets) Push(x interface{}) {
|
||||
it := x.(*packet)
|
||||
it.index = len(*p)
|
||||
*p = append(*p, it)
|
||||
}
|
||||
|
||||
func (p *packets) Pop() interface{} {
|
||||
old := *p
|
||||
n := len(old)
|
||||
item := old[n-1]
|
||||
old[n-1] = nil // avoid memory leak
|
||||
item.index = -1 // for safety
|
||||
*p = old[0 : n-1]
|
||||
return item
|
||||
}
|
||||
25
plugin/rtp/pkg/udp.go
Normal file
25
plugin/rtp/pkg/udp.go
Normal file
@@ -0,0 +1,25 @@
|
||||
package rtp
|
||||
|
||||
import (
|
||||
"net"
|
||||
|
||||
"m7s.live/v5/pkg/util"
|
||||
)
|
||||
|
||||
type UDP net.UDPConn
|
||||
|
||||
func (t *UDP) Read(onRTP func(util.Buffer) error) (err error) {
|
||||
buffer := make(util.Buffer, 1024*1024)
|
||||
|
||||
for {
|
||||
n, _, err := (*net.UDPConn)(t).ReadFromUDP(buffer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = onRTP(buffer[:n])
|
||||
if err != nil {
|
||||
//return err
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -9,8 +9,10 @@ import (
|
||||
_ "image/jpeg"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
@@ -158,7 +160,7 @@ func (p *SnapPlugin) doSnap(rw http.ResponseWriter, r *http.Request) {
|
||||
|
||||
// 处理保存逻辑
|
||||
savePath := query.Get("savePath")
|
||||
now := time.Now()
|
||||
now := time.Now().UTC()
|
||||
if savePath != "" {
|
||||
os.Mkdir(savePath, 0755)
|
||||
filename := fmt.Sprintf("%s_%s.jpg", streamPath, now.Format("20060102150405.000"))
|
||||
@@ -255,7 +257,8 @@ func (p *SnapPlugin) querySnap(rw http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
targetTime := time.Unix(snapTimeUnix+1, 0)
|
||||
// 将时间戳转换为UTC时间,确保与数据库中存储的UTC时间一致
|
||||
targetTime := time.Unix(snapTimeUnix+1, 0).UTC()
|
||||
var record snap_pkg.SnapRecord
|
||||
|
||||
// 查询小于等于目标时间的最近一条记录
|
||||
@@ -363,17 +366,17 @@ func (p *SnapPlugin) batchSnap(rw http.ResponseWriter, r *http.Request) {
|
||||
|
||||
// 检查截图时间点是否为空
|
||||
if len(snapTimes) == 0 {
|
||||
p.Warn("no valid snapshot times available",
|
||||
p.Warn("no valid snapshot times available",
|
||||
"streamPath", streamPath,
|
||||
"startTime", startTime.Format(time.RFC3339),
|
||||
"endTime", endTime.Format(time.RFC3339),
|
||||
"granularity", granularity)
|
||||
|
||||
|
||||
response := BatchSnapResponse{
|
||||
Success: false,
|
||||
Message: "No valid snapshot times available. Please check your time range and try again.",
|
||||
}
|
||||
|
||||
|
||||
rw.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(rw).Encode(response)
|
||||
return
|
||||
@@ -408,7 +411,7 @@ func (p *SnapPlugin) calculateSnapTimes(publisher *m7s.Publisher, startTime, end
|
||||
// 检查开始时间是否早于当前时间,如果是,则从当前时间开始
|
||||
now := time.Now()
|
||||
if startTime.Before(now) {
|
||||
p.Info("adjusting start time from past to current time",
|
||||
p.Info("adjusting start time from past to current time",
|
||||
"originalStartTime", startTime.Format(time.RFC3339),
|
||||
"adjustedStartTime", now.Format(time.RFC3339))
|
||||
startTime = now
|
||||
@@ -416,7 +419,7 @@ func (p *SnapPlugin) calculateSnapTimes(publisher *m7s.Publisher, startTime, end
|
||||
|
||||
// 检查结束时间是否晚于开始时间
|
||||
if endTime.Before(startTime) || endTime.Equal(startTime) {
|
||||
p.Warn("invalid time range: end time is not after start time",
|
||||
p.Warn("invalid time range: end time is not after start time",
|
||||
"startTime", startTime.Format(time.RFC3339),
|
||||
"endTime", endTime.Format(time.RFC3339))
|
||||
return nil
|
||||
@@ -455,17 +458,17 @@ func (p *SnapPlugin) calculateSnapTimes(publisher *m7s.Publisher, startTime, end
|
||||
if idrRing != nil {
|
||||
// 将时间戳转换为time.Time(从纳秒转为秒)
|
||||
keyframeTime := time.Unix(0, int64(idrRing.Value.Timestamp))
|
||||
|
||||
|
||||
// 检查是否在指定时间范围内
|
||||
if (keyframeTime.Equal(startTime) || keyframeTime.After(startTime)) &&
|
||||
(keyframeTime.Equal(endTime) || keyframeTime.Before(endTime)) {
|
||||
if (keyframeTime.Equal(startTime) || keyframeTime.After(startTime)) &&
|
||||
(keyframeTime.Equal(endTime) || keyframeTime.Before(endTime)) {
|
||||
snapTimes = append(snapTimes, keyframeTime)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
videoTrack.RUnlock()
|
||||
|
||||
|
||||
// 如果没有找到关键帧,但有GOP信息,则使用估算的GOP间隔生成时间点
|
||||
if len(snapTimes) == 0 && gopDuration > 0 {
|
||||
p.Info("no keyframes found in range, using estimated GOP interval")
|
||||
@@ -509,12 +512,12 @@ func (p *SnapPlugin) executeBatchSnapTask(publisher *m7s.Publisher, streamPath s
|
||||
now := time.Now()
|
||||
if firstSnapTime.After(now) {
|
||||
waitDuration := firstSnapTime.Sub(now)
|
||||
p.Info("batch snap task scheduled for future",
|
||||
"streamPath", streamPath,
|
||||
"totalSnapshots", len(snapTimes),
|
||||
p.Info("batch snap task scheduled for future",
|
||||
"streamPath", streamPath,
|
||||
"totalSnapshots", len(snapTimes),
|
||||
"startTime", firstSnapTime.Format(time.RFC3339),
|
||||
"waitDuration", waitDuration.String())
|
||||
|
||||
|
||||
// 等待到开始时间
|
||||
time.Sleep(waitDuration)
|
||||
}
|
||||
@@ -531,7 +534,7 @@ func (p *SnapPlugin) executeBatchSnapTask(publisher *m7s.Publisher, streamPath s
|
||||
for i, snapTime := range snapTimes {
|
||||
// 打印日志,记录当前截图时间和进度
|
||||
p.Debug("taking snapshot", "progress", fmt.Sprintf("%d/%d", i+1, len(snapTimes)), "time", snapTime.Format(time.RFC3339))
|
||||
|
||||
|
||||
// 当前实现不支持指定时间截图,所以这里只能截取当前帧
|
||||
// 注意:这里每次都会重新创建一个读取器,确保获取到最新的帧
|
||||
buf, err := p.snap(publisher, nil)
|
||||
@@ -540,7 +543,7 @@ func (p *SnapPlugin) executeBatchSnapTask(publisher *m7s.Publisher, streamPath s
|
||||
failCount++
|
||||
continue
|
||||
}
|
||||
|
||||
|
||||
// 如果是按间隔截图,每次截图后等待指定时间
|
||||
if granularity > 0 && i < len(snapTimes)-1 { // 不是最后一帧才需要等待
|
||||
// 等待granularity秒,确保下一次截图与当前截图有足够的时间差
|
||||
@@ -565,7 +568,7 @@ func (p *SnapPlugin) executeBatchSnapTask(publisher *m7s.Publisher, streamPath s
|
||||
record := snap_pkg.SnapRecord{
|
||||
StreamName: streamPath,
|
||||
SnapMode: 3, // 批量截图模式
|
||||
SnapTime: snapTime,
|
||||
SnapTime: snapTime.UTC(),
|
||||
SnapPath: filePath,
|
||||
}
|
||||
if err := p.DB.Create(&record).Error; err != nil {
|
||||
@@ -579,20 +582,392 @@ func (p *SnapPlugin) executeBatchSnapTask(publisher *m7s.Publisher, streamPath s
|
||||
// 记录任务完成时间和结果
|
||||
taskEndTime := time.Now()
|
||||
taskDuration := taskEndTime.Sub(taskStartTime)
|
||||
p.Info("batch snap task completed",
|
||||
"streamPath", streamPath,
|
||||
"total", len(snapTimes),
|
||||
"success", successCount,
|
||||
"failed", failCount,
|
||||
p.Info("batch snap task completed",
|
||||
"streamPath", streamPath,
|
||||
"total", len(snapTimes),
|
||||
"success", successCount,
|
||||
"failed", failCount,
|
||||
"duration", taskDuration.String())
|
||||
}
|
||||
|
||||
// batchPlayBack 处理从MP4录像文件中按时间范围和颗粒度进行截图的请求
|
||||
func (p *SnapPlugin) batchPlayBack(rw http.ResponseWriter, r *http.Request) {
|
||||
// 只接受GET请求
|
||||
if r.Method != http.MethodGet {
|
||||
http.Error(rw, "Method not allowed", http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
|
||||
// 检查数据库连接
|
||||
if p.DB == nil {
|
||||
responseWithError(rw, "数据库未初始化")
|
||||
return
|
||||
}
|
||||
|
||||
// 获取streamPath
|
||||
streamPath := r.PathValue("streamPath")
|
||||
if streamPath == "" {
|
||||
responseWithError(rw, "streamPath参数必须提供")
|
||||
return
|
||||
}
|
||||
|
||||
// 获取查询参数
|
||||
query := r.URL.Query()
|
||||
|
||||
// 解析时间范围
|
||||
startTime, endTime, err := util.TimeRangeQueryParse(query)
|
||||
if err != nil {
|
||||
responseWithError(rw, "无效的时间范围: "+err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
// 验证时间范围
|
||||
if endTime.Before(startTime) {
|
||||
responseWithError(rw, "结束时间必须晚于开始时间")
|
||||
return
|
||||
}
|
||||
|
||||
// 获取granularity参数
|
||||
granularity := 0
|
||||
granularityStr := query.Get("granularity")
|
||||
if granularityStr != "" {
|
||||
granularityVal, err := strconv.Atoi(granularityStr)
|
||||
if err != nil {
|
||||
responseWithError(rw, "无效的颗粒度格式: "+err.Error())
|
||||
return
|
||||
}
|
||||
if granularityVal < 0 {
|
||||
responseWithError(rw, "颗粒度必须为非负数")
|
||||
return
|
||||
}
|
||||
granularity = granularityVal
|
||||
}
|
||||
|
||||
// 创建保存目录
|
||||
savePath := filepath.Join("snap", "playback", streamPath)
|
||||
os.MkdirAll(savePath, 0755)
|
||||
savePath = strings.ReplaceAll(savePath, "/", "_")
|
||||
os.MkdirAll(savePath, 0755)
|
||||
|
||||
// 立即返回成功响应,表示任务已接收
|
||||
response := BatchSnapResponse{
|
||||
Success: true,
|
||||
Message: fmt.Sprintf("回放截图任务已开始。正在后台处理。时间范围: %s 到 %s (使用参数 start 和 end)", startTime.Format(time.RFC3339), endTime.Format(time.RFC3339)),
|
||||
}
|
||||
|
||||
rw.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(rw).Encode(response)
|
||||
|
||||
// 在后台异步执行截图任务
|
||||
go p.executePlayBackSnapTask(streamPath, startTime, endTime, savePath, granularity)
|
||||
}
|
||||
|
||||
// executePlayBackSnapTask 在后台执行从MP4录像文件中截图的任务
|
||||
func (p *SnapPlugin) executePlayBackSnapTask(streamPath string, startTime, endTime time.Time, savePath string, granularity int) {
|
||||
// 记录任务开始时间
|
||||
taskStartTime := time.Now()
|
||||
p.Info("playback snap task started", "streamPath", streamPath, "startTime", startTime, "endTime", endTime)
|
||||
|
||||
// 从数据库中查询指定时间范围内的MP4录像文件
|
||||
var streams []m7s.RecordStream
|
||||
queryRecord := m7s.RecordStream{
|
||||
Type: "mp4",
|
||||
}
|
||||
|
||||
// 查询条件:结束时间大于请求的开始时间,开始时间小于请求的结束时间,流路径匹配
|
||||
p.DB.Where(&queryRecord).Find(&streams, "end_time>? AND start_time<? AND stream_path=?", startTime, endTime, streamPath)
|
||||
|
||||
// 检查是否找到录像文件
|
||||
if len(streams) == 0 {
|
||||
p.Warn("no mp4 records found for playback snap", "streamPath", streamPath, "startTime", startTime, "endTime", endTime)
|
||||
return
|
||||
}
|
||||
|
||||
p.Info("found mp4 records for playback snap", "streamPath", streamPath, "count", len(streams))
|
||||
|
||||
// 按开始时间排序录像文件,确保时间连续性
|
||||
sort.Slice(streams, func(i, j int) bool {
|
||||
return streams[i].StartTime.Before(streams[j].StartTime)
|
||||
})
|
||||
|
||||
// 全局截图时间点列表
|
||||
var allSnapTimes []time.Time
|
||||
|
||||
// 如果颜粒度小于等于0,则对每个文件提取关键帧
|
||||
if granularity <= 0 {
|
||||
// 对每个文件分别提取关键帧
|
||||
for _, stream := range streams {
|
||||
// 检查文件是否存在
|
||||
if _, err := os.Stat(stream.FilePath); os.IsNotExist(err) {
|
||||
p.Warn("mp4 file not found", "path", stream.FilePath)
|
||||
continue
|
||||
}
|
||||
|
||||
// 计算此文件的有效时间范围(与请求时间范围的交集)
|
||||
fileStartTime := stream.StartTime
|
||||
if fileStartTime.Before(startTime) {
|
||||
fileStartTime = startTime
|
||||
}
|
||||
|
||||
fileEndTime := stream.EndTime
|
||||
if fileEndTime.After(endTime) {
|
||||
fileEndTime = endTime
|
||||
}
|
||||
|
||||
// 提取关键帧
|
||||
keyFrameTimes, err := p.extractKeyFrameTimes(stream.FilePath, fileStartTime, fileEndTime)
|
||||
if err != nil {
|
||||
p.Error("extract key frames failed", "error", err.Error())
|
||||
// 如果提取失败,使用默认的每2秒截图
|
||||
defaultGranularity := 2 * time.Second
|
||||
for t := fileStartTime; t.Before(fileEndTime); t = t.Add(defaultGranularity) {
|
||||
allSnapTimes = append(allSnapTimes, t)
|
||||
}
|
||||
} else {
|
||||
// 将关键帧时间点添加到全局列表
|
||||
allSnapTimes = append(allSnapTimes, keyFrameTimes...)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// 当指定颜粒度时,基于整个时间范围生成均匀的截图时间点
|
||||
// 这样可以确保在不同文件之间保持一致的颜粒度
|
||||
for t := startTime; t.Before(endTime); t = t.Add(time.Duration(granularity) * time.Second) {
|
||||
allSnapTimes = append(allSnapTimes, t)
|
||||
}
|
||||
}
|
||||
|
||||
// 按时间排序并去重
|
||||
sort.Slice(allSnapTimes, func(i, j int) bool {
|
||||
return allSnapTimes[i].Before(allSnapTimes[j])
|
||||
})
|
||||
|
||||
// 去除重复的时间点(如果有)
|
||||
var uniqueSnapTimes []time.Time
|
||||
if len(allSnapTimes) > 0 {
|
||||
uniqueSnapTimes = append(uniqueSnapTimes, allSnapTimes[0])
|
||||
for i := 1; i < len(allSnapTimes); i++ {
|
||||
// 如果与前一个时间点不同,则添加
|
||||
if !allSnapTimes[i].Equal(allSnapTimes[i-1]) {
|
||||
uniqueSnapTimes = append(uniqueSnapTimes, allSnapTimes[i])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
p.Info("generated snapshot times", "count", len(uniqueSnapTimes))
|
||||
|
||||
// 处理每个截图时间点
|
||||
var successCount, failCount int
|
||||
for _, snapTime := range uniqueSnapTimes {
|
||||
// 找到包含该时间点的录像文件
|
||||
var targetStream *m7s.RecordStream
|
||||
for j := range streams {
|
||||
if (snapTime.Equal(streams[j].StartTime) || snapTime.After(streams[j].StartTime)) &&
|
||||
(snapTime.Equal(streams[j].EndTime) || snapTime.Before(streams[j].EndTime)) {
|
||||
targetStream = &streams[j]
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// 如果找不到对应的文件,跳过该时间点
|
||||
if targetStream == nil {
|
||||
p.Warn("no mp4 file found for time point", "time", snapTime.Format(time.RFC3339))
|
||||
failCount++
|
||||
continue
|
||||
}
|
||||
|
||||
// 检查文件是否存在
|
||||
if _, err := os.Stat(targetStream.FilePath); os.IsNotExist(err) {
|
||||
p.Warn("mp4 file not found", "path", targetStream.FilePath)
|
||||
failCount++
|
||||
continue
|
||||
}
|
||||
|
||||
// 计算在文件中的时间偏移(毫秒)
|
||||
// 使用文件的duration字段来计算时间偏移
|
||||
// 首先计算截图时间点在整个文件时间范围内的相对位置
|
||||
fileStartTime := targetStream.StartTime
|
||||
fileEndTime := targetStream.EndTime
|
||||
fileDuration := targetStream.Duration
|
||||
|
||||
// 如果数据库中的duration字段有效,则使用它来计算时间偏移
|
||||
var timeOffset int64
|
||||
if fileDuration > 0 {
|
||||
// 注意:duration字段存储的是毫秒值,如 69792 表示 69.792 秒
|
||||
// 计算截图时间点在整个文件时间范围内的相对位置(百分比)
|
||||
totalDuration := fileEndTime.Sub(fileStartTime).Milliseconds()
|
||||
if totalDuration > 0 {
|
||||
position := float64(snapTime.Sub(fileStartTime).Milliseconds()) / float64(totalDuration)
|
||||
// 根据百分比位置和实际duration计算出时间偏移
|
||||
// duration已经是毫秒值,直接使用
|
||||
timeOffset = int64(position * float64(fileDuration))
|
||||
p.Debug("using duration for time offset calculation", "position", position, "duration_ms", fileDuration, "timeOffset_ms", timeOffset)
|
||||
} else {
|
||||
// 如果计算出问题,回退到直接使用时间差
|
||||
timeOffset = snapTime.Sub(fileStartTime).Milliseconds()
|
||||
p.Debug("fallback to direct time difference", "timeOffset", timeOffset)
|
||||
}
|
||||
} else {
|
||||
// 如果duration无效,则使用时间差
|
||||
timeOffset = snapTime.Sub(fileStartTime).Milliseconds()
|
||||
p.Debug("invalid duration, using time difference", "timeOffset", timeOffset)
|
||||
}
|
||||
|
||||
// 使用FFmpeg从MP4文件中截取指定时间点的图片
|
||||
// 文件名包含截图时间点和颜粒度信息,避免不同颜粒度的截图相互覆盖
|
||||
var granularityInfo string
|
||||
if granularity <= 0 {
|
||||
granularityInfo = "keyframe"
|
||||
} else {
|
||||
granularityInfo = fmt.Sprintf("%ds", granularity)
|
||||
}
|
||||
|
||||
filename := fmt.Sprintf("%s_%s_%s.jpg",
|
||||
streamPath,
|
||||
snapTime.Format("20060102150405"),
|
||||
granularityInfo)
|
||||
filename = strings.ReplaceAll(filename, "/", "_")
|
||||
filePath := filepath.Join(savePath, filename)
|
||||
|
||||
// 调用截图函数
|
||||
err := p.snapFromMP4(targetStream.FilePath, filePath, timeOffset)
|
||||
if err != nil {
|
||||
p.Error("playback snap failed", "error", err.Error(), "time", snapTime.Format(time.RFC3339))
|
||||
failCount++
|
||||
continue
|
||||
}
|
||||
|
||||
// 保存截图记录到数据库
|
||||
if p.DB != nil {
|
||||
record := snap_pkg.SnapRecord{
|
||||
StreamName: streamPath,
|
||||
SnapMode: 4, // 回放截图模式
|
||||
SnapTime: snapTime,
|
||||
SnapPath: filePath,
|
||||
}
|
||||
if err := p.DB.Create(&record).Error; err != nil {
|
||||
p.Error("save playback snapshot record failed", "error", err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
successCount++
|
||||
}
|
||||
|
||||
// 记录任务完成时间和结果
|
||||
taskEndTime := time.Now()
|
||||
taskDuration := taskEndTime.Sub(taskStartTime)
|
||||
p.Info("playback snap task completed",
|
||||
"streamPath", streamPath,
|
||||
"success", successCount,
|
||||
"failed", failCount,
|
||||
"duration", taskDuration.String())
|
||||
}
|
||||
|
||||
// snapFromMP4 从MP4文件中截取指定时间点的图片
|
||||
func (p *SnapPlugin) snapFromMP4(mp4FilePath, outputPath string, timeOffsetMs int64) error {
|
||||
// 将时间偏移转换为秒
|
||||
timeOffsetSec := float64(timeOffsetMs) / 1000.0
|
||||
|
||||
// 构建ffmpeg命令
|
||||
cmd := exec.Command(
|
||||
"ffmpeg",
|
||||
"-hide_banner",
|
||||
"-ss", fmt.Sprintf("%f", timeOffsetSec), // 设置时间偏移
|
||||
"-i", mp4FilePath, // 输入文件
|
||||
"-vframes", "1", // 只截取一帧
|
||||
"-q:v", "2", // 设置图片质量
|
||||
"-y", // 覆盖输出文件
|
||||
outputPath, // 输出文件路径
|
||||
)
|
||||
|
||||
// 执行命令
|
||||
output, err := cmd.CombinedOutput()
|
||||
if err != nil {
|
||||
p.Error("ffmpeg command failed", "error", err.Error(), "output", string(output))
|
||||
return fmt.Errorf("ffmpeg error: %s, output: %s", err.Error(), string(output))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// extractKeyFrameTimes 从MP4文件中提取关键帧时间点
|
||||
func (p *SnapPlugin) extractKeyFrameTimes(mp4FilePath string, startTime, endTime time.Time) ([]time.Time, error) {
|
||||
// 使用FFmpeg的-skip_frame nokey参数和-show_entries frame=pkt_pts_time参数提取关键帧时间
|
||||
cmd := exec.Command(
|
||||
"ffprobe",
|
||||
"-v", "quiet",
|
||||
"-select_streams", "v",
|
||||
"-skip_frame", "nokey", // 只处理关键帧
|
||||
"-show_entries", "frame=pkt_pts_time", // 显示帧的时间戳
|
||||
"-of", "csv=p=0", // 输出为CSV格式
|
||||
"-i", mp4FilePath,
|
||||
)
|
||||
|
||||
// 执行命令
|
||||
output, err := cmd.CombinedOutput()
|
||||
if err != nil {
|
||||
p.Error("ffprobe command failed", "error", err.Error(), "output", string(output))
|
||||
return nil, fmt.Errorf("ffprobe error: %s", err.Error())
|
||||
}
|
||||
|
||||
// 解析输出结果,提取时间戳
|
||||
lines := strings.Split(string(output), "\n")
|
||||
|
||||
// 获取MP4文件的开始时间信息
|
||||
// 注意:ffprobe返回的时间戳是相对于文件开始的秒数
|
||||
// 我们需要将其转换为绝对时间
|
||||
fileStartTimeUnix := time.Time{}
|
||||
// 使用数据库中记录的文件开始时间
|
||||
// 查询数据库获取文件信息
|
||||
var fileInfo m7s.RecordStream
|
||||
if err := p.DB.Where("file_path = ?", mp4FilePath).First(&fileInfo).Error; err == nil {
|
||||
fileStartTimeUnix = fileInfo.StartTime
|
||||
} else {
|
||||
p.Warn("failed to get file start time from database, using request start time", "error", err.Error())
|
||||
fileStartTimeUnix = startTime
|
||||
}
|
||||
|
||||
p.Info("file start time", "time", fileStartTimeUnix.Format(time.RFC3339))
|
||||
|
||||
// 存储关键帧时间点
|
||||
var keyFrameTimes []time.Time
|
||||
|
||||
// 处理每一行输出
|
||||
for _, line := range lines {
|
||||
line = strings.TrimSpace(line)
|
||||
if line == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
// 将时间戳转换为浮点数(秒)
|
||||
timeOffsetSec, err := strconv.ParseFloat(line, 64)
|
||||
if err != nil {
|
||||
p.Warn("invalid time format in ffprobe output", "line", line)
|
||||
continue
|
||||
}
|
||||
|
||||
// 计算实际时间:文件开始时间 + 偏移秒数
|
||||
frameTime := fileStartTimeUnix.Add(time.Duration(timeOffsetSec * float64(time.Second)))
|
||||
|
||||
// 只保留在请求时间范围内的关键帧
|
||||
if (frameTime.Equal(startTime) || frameTime.After(startTime)) &&
|
||||
(frameTime.Equal(endTime) || frameTime.Before(endTime)) {
|
||||
keyFrameTimes = append(keyFrameTimes, frameTime)
|
||||
}
|
||||
}
|
||||
|
||||
// 如果没有找到关键帧,返回错误
|
||||
if len(keyFrameTimes) == 0 {
|
||||
return nil, fmt.Errorf("no key frames found in the specified time range")
|
||||
}
|
||||
|
||||
return keyFrameTimes, nil
|
||||
}
|
||||
|
||||
func (p *SnapPlugin) RegisterHandler() map[string]http.HandlerFunc {
|
||||
return map[string]http.HandlerFunc{
|
||||
"/{streamPath...}": p.doSnap,
|
||||
"/query/{streamPath...}": p.querySnap,
|
||||
"/batch/{streamPath...}": p.batchSnap,
|
||||
"/{streamPath...}": p.doSnap,
|
||||
"/query/{streamPath...}": p.querySnap,
|
||||
"/batch/{streamPath...}": p.batchSnap,
|
||||
"/batchplayback/{streamPath...}": p.batchPlayBack,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user