feat: 支持过滤通道

This commit is contained in:
ydajiang
2025-10-31 13:44:32 +08:00
parent 7a3bffe219
commit 4f4fc016f4
8 changed files with 137 additions and 21 deletions

View File

@@ -83,7 +83,7 @@ func (api *ApiServer) OnDeviceList(q *QueryDeviceChannel, _ http.ResponseWriter,
ContactIP: "",
CreatedAt: device.CreatedAt.Format("2006-01-02 15:04:05"),
CustomName: "",
DropChannelType: "",
DropChannelType: device.DropChannelType,
GBVer: "",
ID: device.GetID(),
KeepOriginalTree: false,
@@ -413,6 +413,14 @@ func (api *ApiServer) OnDeviceInfoSet(params *DeviceInfo, w http.ResponseWriter,
if err = dao.Device.UpdateDevice(params.DeviceID, conditions); err != nil {
return nil, err
}
} else if params.DropChannelType != model.DropChannelType {
var dropChannelTypes []string
if params.DropChannelType != "" {
dropChannelTypes = strings.Split(params.DropChannelType, ",")
}
if err = dao.Device.SetDropChannelType(params.DeviceID, dropChannelTypes); err != nil {
return nil, err
}
}
return "OK", nil

View File

@@ -61,6 +61,7 @@ func (api *ApiServer) OnGetBaseConfig(_ *Empty, _ http.ResponseWriter, _ *http.R
BlackUAList: ua,
Captcha: false,
DevicePassword: common.Config.Password,
DropChannelType: common.Config.GlobalDropChannelType,
GlobalChannelAudio: true,
GlobalChannelShared: false,
GlobalDeviceAlarmSubscribeInterval: common.Config.SubAlarmGlobalInterval,
@@ -173,6 +174,23 @@ func (api *ApiServer) OnSetBaseConfig(baseConfig *BaseConfig, _ http.ResponseWri
changed = true
}
// 更新全局过滤通道类型
if baseConfig.DropChannelType != common.Config.GlobalDropChannelType {
iniConfig.Section("sip").Key("global_drop_channel_type").SetValue(baseConfig.DropChannelType)
changed = true
var codes []string
if baseConfig.DropChannelType != "" {
codes = strings.Split(baseConfig.DropChannelType, ",")
}
err = dao.Channel.DropChannel("", codes, nil)
if err != nil {
log.Sugar.Errorf("更新全局过滤通道类型失败: %s", err.Error())
return nil, err
}
}
// 更新默认媒体传输方式
var setup = "udp"
if strings.ToLower(baseConfig.MediaTransport) == "udp" {

View File

@@ -35,6 +35,8 @@ type Config_ struct {
SubPositionGlobalInterval int `json:"sub_position_global_interval"`
SubPTZGlobalInterval int `json:"sub_ptz_global_interval"`
GlobalDropChannelType string `json:"global_drop_channel_type"`
DeviceDefaultMediaTransport string `json:"device_default_media_transport"`
Hooks struct {
@@ -81,6 +83,7 @@ func ParseConfig(path string) (*Config_, error) {
SubPositionGlobalInterval: load.Section("sip").Key("sub_position_global_interval").MustInt(),
SubPTZGlobalInterval: load.Section("sip").Key("sub_ptz_global_interval").MustInt(),
DeviceDefaultMediaTransport: load.Section("sip").Key("device_default_media_transport").String(),
GlobalDropChannelType: load.Section("sip").Key("global_drop_channel_type").String(),
}
config_.Hooks.Online = load.Section("hooks").Key("online").String()

View File

@@ -31,6 +31,8 @@ sub_alarm_global_interval = 3600
sub_position_global_interval = 3600
# 全局订阅PTZ
sub_ptz_global_interval = 3600
# 全局过滤通道类型, 逗号分隔
global_drop_channel_type =
[http]
port = 9000

View File

@@ -4,6 +4,7 @@ import (
"fmt"
"gb-cms/common"
"gorm.io/gorm"
"gorm.io/gorm/clause"
"time"
)
@@ -55,6 +56,7 @@ type ChannelModel struct {
IsDir bool `json:"-" xml:"-"` // 是否是目录
CustomID *string `gorm:"unique" xml:"-"` // 自定义通道ID
Event string `json:"-" xml:"Event,omitempty" gorm:"-"` // <!-- 状态改变事件ON:上线,OFF:离线,VLOST:视频丢失,DEFECT:故障,ADD:增加,DEL:删除,UPDATE:更新(必选)-->
DropMark int `json:"-" xml:"-"` // 是否被过滤 0-不被过滤/非0-被过滤
}
func (d *ChannelModel) TableName() string {
@@ -122,7 +124,7 @@ func (d *daoChannel) QueryChannels(deviceId, groupId string, page, size int, sta
conditions["is_dir"] = 1
}
cTx := db.Where(conditions)
cTx := db.Where(conditions).Where("(drop_mark != 1 OR drop_mark IS NULL)")
if page > 0 {
cTx.Limit(size).Offset((page - 1) * size)
@@ -142,11 +144,26 @@ func (d *daoChannel) QueryChannels(deviceId, groupId string, page, size int, sta
}
}
countTx := db.Model(&ChannelModel{}).Select("id").Where(conditions)
countTx := db.Model(&ChannelModel{}).Select("id").Where(conditions).Where("(drop_mark != 1 OR drop_mark IS NULL)")
if keyword != "" {
countTx.Where("name like ? or device_id like ?", "%"+keyword+"%", "%"+keyword+"%")
}
// 重新统计子节点数量
for _, channel := range channels {
if !channel.IsDir {
continue
}
var total int64
// 统计子节点数量
if tx := db.Model(&ChannelModel{}).Where("root_id =? and group_id =? and (drop_mark != 1 OR drop_mark IS NULL)", channel.RootID, channel.DeviceID).Select("id").Count(&total); tx.Error != nil {
return nil, 0, tx.Error
}
channel.SubCount = int(total)
}
var total int64
if tx := countTx.Count(&total); tx.Error != nil {
return nil, 0, tx.Error
@@ -166,12 +183,12 @@ func (d *daoChannel) QueryChannelsByRootID(rootId string) ([]*ChannelModel, erro
func (d *daoChannel) QueryChanelCount(deviceId string, hasDir bool) (int, error) {
var total int64
tx := db.Model(&ChannelModel{}).Where("root_id =?", deviceId)
tx := db.Model(&ChannelModel{}).Where("root_id =? and (drop_mark != 1 OR drop_mark IS NULL)", deviceId)
if !hasDir {
tx.Where("is_dir =?", 0)
}
if tx = tx.Count(&total); tx.Error != nil {
if tx = tx.Select("id").Count(&total); tx.Error != nil {
return 0, tx.Error
}
return int(total), nil
@@ -179,12 +196,12 @@ func (d *daoChannel) QueryChanelCount(deviceId string, hasDir bool) (int, error)
func (d *daoChannel) QueryOnlineChanelCount(deviceId string, hasDir bool) (int, error) {
var total int64
tx := db.Model(&ChannelModel{}).Where("root_id =? and status =?", deviceId, "ON")
tx := db.Model(&ChannelModel{}).Where("root_id =? and status =? and (drop_mark != 1 OR drop_mark IS NULL)", deviceId, "ON")
if !hasDir {
tx.Where("is_dir =?", 0)
}
if tx = tx.Count(&total); tx.Error != nil {
if tx = tx.Select("id").Count(&total); tx.Error != nil {
return 0, tx.Error
}
@@ -193,7 +210,7 @@ func (d *daoChannel) QueryOnlineChanelCount(deviceId string, hasDir bool) (int,
func (d *daoChannel) QueryChannelByTypeCode(codecs ...int) ([]*ChannelModel, error) {
var channels []*ChannelModel
tx := db.Where("type_code in ?", codecs).Find(&channels)
tx := db.Where("type_code in ? and (drop_mark != 1 OR drop_mark IS NULL)", codecs).Find(&channels)
if tx.Error != nil {
return nil, tx.Error
}
@@ -265,7 +282,7 @@ func (d *daoChannel) UpdateChannel(channel *ChannelModel) error {
func (d *daoChannel) TotalCount() (int, error) {
var total int64
tx := db.Model(&ChannelModel{}).Count(&total)
tx := db.Model(&ChannelModel{}).Where("(drop_mark != 1 OR drop_mark IS NULL)").Select("id").Count(&total)
if tx.Error != nil {
return 0, tx.Error
}
@@ -274,7 +291,7 @@ func (d *daoChannel) TotalCount() (int, error) {
func (d *daoChannel) OnlineCount(ids []string) (int, error) {
var total int64
tx := db.Model(&ChannelModel{}).Where("status =? and root_id in ?", "ON", ids).Count(&total)
tx := db.Model(&ChannelModel{}).Where("status =? and root_id in ? and (drop_mark != 1 OR drop_mark IS NULL)", "ON", ids).Select("id").Count(&total)
if tx.Error != nil {
return 0, tx.Error
}
@@ -283,12 +300,12 @@ func (d *daoChannel) OnlineCount(ids []string) (int, error) {
func (d *daoChannel) QuerySubChannelCount(rootId string, groupId string, hasDir bool) (int, error) {
var total int64
tx := db.Model(&ChannelModel{}).Where("root_id =? and group_id =?", rootId, groupId)
tx := db.Model(&ChannelModel{}).Where("root_id =? and group_id =? and (drop_mark != 1 OR drop_mark IS NULL)", rootId, groupId)
if !hasDir {
tx.Where("is_dir =?", 0)
}
if tx = tx.Count(&total); tx.Error != nil {
if tx = tx.Select("id").Count(&total); tx.Error != nil {
return 0, tx.Error
}
return int(total), nil
@@ -296,11 +313,11 @@ func (d *daoChannel) QuerySubChannelCount(rootId string, groupId string, hasDir
func (d *daoChannel) QueryOnlineSubChannelCount(rootId string, groupId string, hasDir bool) (int, error) {
var total int64
tx := db.Model(&ChannelModel{}).Where("root_id =? and group_id =? and status =?", rootId, groupId, "ON")
tx := db.Model(&ChannelModel{}).Where("root_id =? and group_id =? and status =? and (drop_mark != 1 OR drop_mark IS NULL)", rootId, groupId, "ON")
if !hasDir {
tx.Where("is_dir =?", 0)
}
if tx = tx.Count(&total); tx.Error != nil {
if tx = tx.Select("id").Count(&total); tx.Error != nil {
return 0, tx.Error
}
return int(total), nil
@@ -339,3 +356,46 @@ func (d *daoChannel) QueryCustomID(rootId string, channelId string) (string, err
return *channel.CustomID, nil
}
// DropChannel 过滤通道
func (d *daoChannel) DropChannel(rootId string, typeCodes []string, tx *gorm.DB) error {
// 如果rootId为空, 过滤所有typeCode相同的通道
// 如果typeCodecs为空, 所有通道都不被过滤
update := func(tx *gorm.DB) error {
var conditions []clause.Expression
if rootId != "" {
conditions = append(conditions, gorm.Expr("root_id = ?", rootId))
} else {
// 全局过滤时,跳过单独设置过滤的设备
var rootIds []string
tx.Model(DeviceModel{}).Where("drop_channel_type != '' and drop_channel_type is not null").Pluck("device_id", &rootIds)
if len(rootIds) > 0 {
conditions = append(conditions, gorm.Expr("root_id NOT IN ?", rootIds))
}
}
// 处理typeCodes条件
if len(typeCodes) > 0 {
// 先重置所有符合条件的通道为不过滤
conditions = append(conditions, gorm.Expr("type_code NOT IN ?", typeCodes))
if err := tx.Model(&ChannelModel{}).Clauses(conditions...).Update("drop_mark", 0).Error; err != nil {
return err
}
// 设置指定typeCodes的通道为过滤
conditions[len(conditions)-1] = gorm.Expr("type_code IN ?", typeCodes)
return tx.Model(&ChannelModel{}).Clauses(conditions...).Update("drop_mark", 1).Error
}
// typeCodes为空时重置所有符合条件的通道为不过滤
return tx.Session(&gorm.Session{AllowGlobalUpdate: true}).Model(&ChannelModel{}).Clauses(conditions...).Update("drop_mark", 0).Error
}
if tx != nil {
return update(tx)
}
return DBTransaction(func(tx *gorm.DB) error {
return update(tx)
})
}

View File

@@ -5,6 +5,7 @@ import (
"gorm.io/gorm"
"net"
"strconv"
"strings"
"time"
)
@@ -39,6 +40,7 @@ type DeviceModel struct {
PositionSubscribe bool `json:"position_subscribe"` // 是否开启位置订阅
Longitude float64
Latitude float64
DropChannelType string
}
func (d *DeviceModel) TableName() string {
@@ -307,3 +309,14 @@ func (d *daoDevice) QueryDeviceName(deviceId string) (string, error) {
return device.Name, nil
}
func (d *daoDevice) SetDropChannelType(deviceId string, dropChannelTypes []string) error {
return DBTransaction(func(tx *gorm.DB) error {
err := tx.Model(&DeviceModel{}).Where("device_id =?", deviceId).Update("drop_channel_type", strings.Join(dropChannelTypes, ",")).Error
if err != nil {
return err
}
return Channel.DropChannel(deviceId, dropChannelTypes, tx)
})
}

View File

@@ -70,6 +70,13 @@ func (m *daoDialog) DeleteDialogsByType(id string, t int) (*SipDialogModel, erro
return &dialog, err
}
// DeleteDialogByCallID 根据callid删除会话
func (m *daoDialog) DeleteDialogByCallID(callid string) error {
return DBTransaction(func(tx *gorm.DB) error {
return tx.Where("call_id = ?", callid).Unscoped().Delete(&SipDialogModel{}).Error
})
}
// Save 保存会话
func (m *daoDialog) Save(dialog *SipDialogModel) error {
return DBTransaction(func(tx *gorm.DB) error {

View File

@@ -18,23 +18,28 @@ func RefreshCatalogScheduleTask() {
}
func RefreshSubscribeScheduleTask() {
dialogs, _ := dao.Dialog.QueryExpiredDialogs(time.Now())
now := time.Now()
dialogs, _ := dao.Dialog.QueryExpiredDialogs(now)
for _, dialog := range dialogs {
go func(t int, id string) {
device, _ := dao.Device.QueryDevice(id)
go func(dialog *dao.SipDialogModel) {
device, _ := dao.Device.QueryDevice(dialog.DeviceID)
if device == nil {
// 被上级订阅, 由上级刷新, 过期删除
if dialog.RefreshTime.Before(now) {
_ = dao.Dialog.DeleteDialogByCallID(dialog.CallID)
}
return
}
d := &Device{device}
if dao.SipDialogTypeSubscribeCatalog == t {
if dao.SipDialogTypeSubscribeCatalog == dialog.Type {
d.RefreshSubscribeCatalog()
} else if dao.SipDialogTypeSubscribePosition == t {
} else if dao.SipDialogTypeSubscribePosition == dialog.Type {
d.RefreshSubscribePosition()
} else if dao.SipDialogTypeSubscribeAlarm == t {
} else if dao.SipDialogTypeSubscribeAlarm == dialog.Type {
d.RefreshSubscribeAlarm()
}
}(dialog.Type, dialog.DeviceID)
}(dialog)
}
}