From 71c3206f09ef1bc297d125d5ea2fbd1afbf41af3 Mon Sep 17 00:00:00 2001 From: ydajiang Date: Wed, 1 Oct 2025 15:17:23 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=94=AF=E6=8C=81=E5=AE=9A=E6=97=B6?= =?UTF-8?q?=E5=88=B7=E6=96=B0=E7=9B=AE=E5=BD=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- api.go | 46 ++++++++++++++++++++++++++++++++++++-- dao/device.go | 49 +++++++++++++++++++++++++++++++++++++++++ main.go | 12 +++------- stack/device.go | 2 ++ stack/online_devices.go | 11 +++++++++ stack/schedule.go | 32 +++++++++++++++++++++++++++ stack/sip_handler.go | 14 +++++++++--- stats.go | 11 +++++++++ 8 files changed, 163 insertions(+), 14 deletions(-) create mode 100644 stack/schedule.go diff --git a/api.go b/api.go index 0e13d73..ecaa15d 100644 --- a/api.go +++ b/api.go @@ -157,6 +157,33 @@ type CustomChannel struct { CustomID string `json:"id"` } +type DeviceInfo struct { + DeviceID string `json:"serial"` + CustomName string `json:"custom_name"` + MediaTransport string `json:"media_transport"` + MediaTransportMode string `json:"media_transport_mode"` + StreamMode string `json:"stream_mode"` + SMSID string `json:"sms_id"` + SMSGroupID string `json:"sms_group_id"` + RecvStreamIP string `json:"recv_stream_ip"` + ContactIP string `json:"contact_ip"` + Charset string `json:"charset"` + CatalogInterval int `json:"catalog_interval"` + SubscribeInterval int `json:"subscribe_interval"` + CatalogSubscribe bool `json:"catalog_subscribe"` + AlarmSubscribe bool `json:"alarm_subscribe"` + PositionSubscribe bool `json:"position_subscribe"` + PTZSubscribe bool `json:"ptz_subscribe"` + RecordCenter bool `json:"record_center"` + RecordIndistinct bool `json:"record_indistinct"` + CivilCodeFirst bool `json:"civil_code_first"` + KeepOriginalTree bool `json:"keep_original_tree"` + Password string `json:"password"` + DropChannelType string `json:"drop_channel_type"` + Longitude float64 `json:"longitude"` + Latitude float64 `json:"latitude"` +} + var apiServer *ApiServer func init() { @@ -248,6 +275,7 @@ func startApiServer(addr string) { apiServer.router.HandleFunc("/api/v1/cascade/removechannels", withVerify(apiServer.OnPlatformChannelUnbind)) // 级联解绑通道 apiServer.router.HandleFunc("/api/v1/cascade/setshareallchannel", withVerify(common.WithFormDataParams(apiServer.OnShareAllChannel, SetEnable{}))) // 开启或取消级联所有通道 apiServer.router.HandleFunc("/api/v1/cascade/pushcatalog", withVerify(common.WithFormDataParams(apiServer.OnCatalogPush, SetEnable{}))) // 推送目录 + apiServer.router.HandleFunc("/api/v1/device/setinfo", withVerify(common.WithFormDataParams(apiServer.OnDeviceInfoSet, DeviceInfo{}))) // 编辑设备信息 // 暂未开发 apiServer.router.HandleFunc("/api/v1/alarm/list", withVerify(func(w http.ResponseWriter, req *http.Request) {})) // 报警查询 @@ -748,9 +776,13 @@ func (api *ApiServer) OnDeviceList(q *QueryDeviceChannel, _ http.ResponseWriter, lastKeealiveTime = offlineTime } + if device.CatalogInterval < 1 { + device.CatalogInterval = dao.DefaultCatalogInterval + } + response.DeviceList_ = append(response.DeviceList_, LiveGBSDevice{ - AlarmSubscribe: false, // 报警订阅 - CatalogInterval: 3600, // 目录刷新时间 + AlarmSubscribe: false, // 报警订阅 + CatalogInterval: device.CatalogInterval, // 目录刷新时间 CatalogProgress: catalogProgress, CatalogSubscribe: false, // 目录订阅 ChannelCount: device.ChannelsTotal, @@ -1601,3 +1633,13 @@ func (api *ApiServer) OnPlaybackControl(params *StreamIDParams, _ http.ResponseW return "OK", nil } + +func (api *ApiServer) OnDeviceInfoSet(params *DeviceInfo, w http.ResponseWriter, req *http.Request) (interface{}, error) { + // 目前仅实现修改目录订阅间隔 + if params.CatalogInterval < 60 { + return nil, errors.New("catalog_interval error") + } else if err := dao.Device.UpdateCatalogInterval(params.DeviceID, params.CatalogInterval); err != nil { + return nil, err + } + return "OK", nil +} diff --git a/dao/device.go b/dao/device.go index f40e5fd..3008e2e 100644 --- a/dao/device.go +++ b/dao/device.go @@ -8,6 +8,10 @@ import ( "time" ) +const ( + DefaultCatalogInterval = 3600 // 默认目录刷新间隔,单位秒 +) + type DeviceModel struct { GBModel DeviceID string `json:"device_id" gorm:"index"` @@ -26,6 +30,10 @@ type DeviceModel struct { ChannelsTotal int `json:"total_channels"` // 通道总数 ChannelsOnline int `json:"online_channels"` // 通道在线数量 Setup common.SetupType + + CatalogInterval int // 录像目录刷新间隔,单位秒, 默认3600每小时刷新 + LastRefreshCatalog time.Time `gorm:"type:datetime"` // 最后刷新目录时间 + //ScheduleRecord [7]uint64 // 录像计划,0-6表示周一至周日,一天的时间刻度用一个uint64表示,从高位开始代表0点,每bit半小时,共占用48位, 1表示录像,0表示不录像 } func (d *DeviceModel) TableName() string { @@ -229,3 +237,44 @@ func (d *daoDevice) Count() (int, error) { db.Model(&DeviceModel{}).Count(&count) return int(count), nil } + +func (d *daoDevice) UpdateRefreshCatalogTime(deviceId string, now time.Time) error { + return DBTransaction(func(tx *gorm.DB) error { + return tx.Model(&DeviceModel{}).Where("device_id =?", deviceId).Update("last_refresh_catalog", now.Format("2006-01-02 15:04:05")).Error + }) +} + +// QueryRefreshCatalogExpiredDevices 查询刷新目录到期的设备列表 +func (d *daoDevice) QueryRefreshCatalogExpiredDevices(now time.Time) ([]*DeviceModel, error) { + var devices []*DeviceModel + tx := db.Where( + "(datetime(last_refresh_catalog, '+'||IFNULL(catalog_interval, ?)||' seconds') < ? OR last_refresh_catalog IS NULL) AND status = ?", + DefaultCatalogInterval, + now, + common.ON, + ).Find(&devices) + if tx.Error != nil { + return nil, tx.Error + } + + return devices, nil +} + +// QueryNeedRefreshCatalog 查询设备是否需要刷新目录 +func (d *daoDevice) QueryNeedRefreshCatalog(deviceId string, now time.Time) bool { + var devices int64 + _ = db.Model(&DeviceModel{}).Where( + "device_id = ? AND (datetime(last_refresh_catalog, '+'||IFNULL(catalog_interval, ?)||' seconds') < ? OR last_refresh_catalog IS NULL)", + deviceId, + DefaultCatalogInterval, + now, + ).Count(&devices) + + return devices > 0 +} + +func (d *daoDevice) UpdateCatalogInterval(id string, interval int) error { + return DBTransaction(func(tx *gorm.DB) error { + return tx.Model(&DeviceModel{}).Where("device_id =?", id).Update("catalog_interval", interval).Error + }) +} diff --git a/main.go b/main.go index e0d6ea2..677a47b 100644 --- a/main.go +++ b/main.go @@ -10,7 +10,6 @@ import ( "gb-cms/log" "gb-cms/stack" "github.com/pretty66/websocketproxy" - "github.com/shirou/gopsutil/v3/host" "go.uber.org/zap" "go.uber.org/zap/zapcore" "net" @@ -24,7 +23,6 @@ var ( AdminMD5 string // 明文密码"admin"的MD5值 PwdMD5 string StartUpTime time.Time - KernelArch string ) func init() { @@ -53,13 +51,6 @@ func main() { indent, _ := json.MarshalIndent(common.Config, "", "\t") log.Sugar.Infof("server config:\r\n%s", indent) - info, err := host.Info() - if err != nil { - log.Sugar.Errorf(err.Error()) - } else { - KernelArch = info.KernelArch - } - if config.Hooks.OnInvite != "" { hook.RegisterEventUrl(hook.EventTypeDeviceOnInvite, config.Hooks.OnInvite) } @@ -134,6 +125,9 @@ func main() { log.Sugar.Infof("启动http server. addr: %s", httpAddr) go startApiServer(httpAddr) + // 启动目录刷新任务 + stack.AddScheduledTask(time.Minute, true, stack.RefreshCatalogScheduleTask) + err = http.ListenAndServe(":19000", nil) if err != nil { println(err) diff --git a/stack/device.go b/stack/device.go index b75fc4f..6a1aeb6 100644 --- a/stack/device.go +++ b/stack/device.go @@ -212,6 +212,8 @@ func (d *Device) QueryCatalog(timeoutSeconds int) ([]*dao.ChannelModel, error) { // 批量保存通道 result, err = d.SaveChannels(list) + // 更新查询目录的时间 + _ = dao.Device.UpdateRefreshCatalogTime(d.DeviceID, time.Now()) } if !UniqueTaskManager.Commit(GenerateCatalogTaskID(d.DeviceID), query, catalogProgress) { diff --git a/stack/online_devices.go b/stack/online_devices.go index 749bf95..110c578 100644 --- a/stack/online_devices.go +++ b/stack/online_devices.go @@ -22,6 +22,17 @@ func (m *onlineDeviceManager) Add(deviceId string, t time.Time) { m.devices[deviceId] = t } +func (m *onlineDeviceManager) Refresh(deviceId string, t time.Time) bool { + m.lock.Lock() + defer m.lock.Unlock() + if _, ok := m.devices[deviceId]; ok { + m.devices[deviceId] = t + return true + } + + return false +} + func (m *onlineDeviceManager) Remove(deviceId string) { m.lock.Lock() defer m.lock.Unlock() diff --git a/stack/schedule.go b/stack/schedule.go new file mode 100644 index 0000000..f10e439 --- /dev/null +++ b/stack/schedule.go @@ -0,0 +1,32 @@ +package stack + +import ( + "gb-cms/dao" + "time" +) + +func RefreshCatalogScheduleTask() { + now := time.Now() + devices, _ := dao.Device.QueryRefreshCatalogExpiredDevices(now) + // 发起查询目录请求 + for _, device := range devices { + d := &Device{device} + go func() { + _, _ = d.QueryCatalog(30) + }() + } +} + +func AddScheduledTask(interval time.Duration, firstRun bool, task func()) { + ticker := time.NewTicker(interval) + if firstRun { + go task() + } + + for { + select { + case <-ticker.C: + go task() + } + } +} diff --git a/stack/sip_handler.go b/stack/sip_handler.go index 8adfe35..c7c5bc5 100644 --- a/stack/sip_handler.go +++ b/stack/sip_handler.go @@ -34,6 +34,11 @@ func (e *EventHandler) OnUnregister(id string) { _ = dao.Device.UpdateDeviceStatus(id, common.OFF) } +// OnRegister 处理设备注册请求 +// +// int - 注册有效期(秒) +// GBDevice - 注册成功后返回的设备信息, 返回nil表示注册失败 +// bool - 是否需要发送目录查询(true表示需要) func (e *EventHandler) OnRegister(id, transport, addr, userAgent string) (int, GBDevice, bool) { now := time.Now() host, p, _ := net.SplitHostPort(addr) @@ -53,18 +58,21 @@ func (e *EventHandler) OnRegister(id, transport, addr, userAgent string) (int, G log.Sugar.Errorf("保存设备信息到数据库失败 device: %s err: %s", id, err.Error()) } + OnlineDeviceManager.Add(id, now) count, _ := dao.Channel.QueryChanelCount(id, true) - return 3600, &Device{device}, count < 1 + return 3600, &Device{device}, count < 1 || dao.Device.QueryNeedRefreshCatalog(id, now) } func (e *EventHandler) OnKeepAlive(id string, addr string) bool { now := time.Now() - if err := dao.Device.RefreshHeartbeat(id, now, addr); err != nil { + if !OnlineDeviceManager.Refresh(id, now) { + // 拒绝设备离线后收到的心跳, 让设备重新发起注册 + return false + } else if err := dao.Device.RefreshHeartbeat(id, now, addr); err != nil { log.Sugar.Errorf("更新有效期失败. device: %s err: %s", id, err.Error()) return false } - OnlineDeviceManager.Add(id, now) return true } diff --git a/stats.go b/stats.go index be7fbcb..a5e91e1 100644 --- a/stats.go +++ b/stats.go @@ -10,6 +10,7 @@ import ( "gb-cms/stack" "github.com/shirou/gopsutil/v3/cpu" "github.com/shirou/gopsutil/v3/disk" + "github.com/shirou/gopsutil/v3/host" "github.com/shirou/gopsutil/v3/mem" "github.com/shirou/gopsutil/v3/net" "math" @@ -28,6 +29,8 @@ var ( ChannelTotalCount int // 包含目录 ChannelOnlineCount int // 不包含目录 DeviceCount int // 设备基数 + + KernelArch string ) const ( @@ -285,6 +288,14 @@ func isPhysicalInterface(name string, stats net.IOCountersStat) bool { } func StartStats() { + // 硬件信息统计一次 + info, err := host.Info() + if err != nil { + log.Sugar.Errorf(err.Error()) + } else { + KernelArch = info.KernelArch + } + // 统计间隔 refreshInterval := 2 * time.Second