fix: dialog.getKey() change from ssrc to callid;data source of api device/list from db change to memory

This commit is contained in:
pggiroro
2025-07-06 23:00:44 +08:00
parent 4391ad2d8d
commit 966153f873
4 changed files with 147 additions and 114 deletions

View File

@@ -28,85 +28,84 @@ import (
func (gb *GB28181Plugin) List(ctx context.Context, req *pb.GetDevicesRequest) (*pb.DevicesPageInfo, error) {
resp := &pb.DevicesPageInfo{}
if gb.DB == nil {
resp.Code = 500
resp.Message = "数据库未初始化"
return resp, nil
}
// 从内存中读取设备信息,而不是从数据库查询
var pbDevices []*pb.Device
var filteredDevices []*Device
var devices []Device
var total int64
// 遍历内存中的设备集合
gb.devices.Range(func(device *Device) bool {
// 应用筛选条件
if req.Query != "" {
// 检查设备ID或名称是否包含查询字符串
if !strings.Contains(device.DeviceId, req.Query) && !strings.Contains(device.Name, req.Query) {
return true // 继续遍历
}
}
// 构建查询条件
query := gb.DB.Model(&Device{})
if req.Query != "" {
query = query.Where("device_id LIKE ? OR name LIKE ?",
"%"+req.Query+"%", "%"+req.Query+"%")
}
if req.Status {
query = query.Where("online = ?", true)
}
// 如果需要筛选在线设备
if req.Status && !device.Online {
return true // 继续遍历
}
// 获取总数
if err := query.Count(&total).Error; err != nil {
resp.Code = 500
resp.Message = fmt.Sprintf("查询总数失败: %v", err)
return resp, nil
}
// 添加到过滤后的设备列表
filteredDevices = append(filteredDevices, device)
return true
})
// 查询设备列表
// 当Page和Count都为0时不做分页返回所有数据
if req.Page == 0 && req.Count == 0 {
// 不分页,查询所有数据
if err := query.Find(&devices).Error; err != nil {
resp.Code = 500
resp.Message = fmt.Sprintf("查询设备列表失败: %v", err)
// 计算总数
total := len(filteredDevices)
resp.Total = int32(total)
// 处理分页
if req.Page > 0 && req.Count > 0 {
// 计算起始和结束索引
start := int(req.Page-1) * int(req.Count)
end := start + int(req.Count)
// 边界检查
if start >= total {
// 超出范围,返回空列表
resp.Code = 0
resp.Message = "success"
resp.Data = pbDevices
return resp, nil
}
} else {
// 分页查询设备列表
if err := query.
Offset(int(req.Page-1) * int(req.Count)).
Limit(int(req.Count)).
Find(&devices).Error; err != nil {
resp.Code = 500
resp.Message = fmt.Sprintf("查询设备列表失败: %v", err)
return resp, nil
if end > total {
end = total
}
// 应用分页
filteredDevices = filteredDevices[start:end]
}
// 转换为proto消息
var pbDevices []*pb.Device
for _, d := range devices {
// 查询设备对应的通道
var channels []gb28181.DeviceChannel
if err := gb.DB.Where(&gb28181.DeviceChannel{DeviceID: d.DeviceId}).Find(&channels).Error; err != nil {
gb.Error("查询通道失败", "error", err)
continue
}
for _, d := range filteredDevices {
var pbChannels []*pb.Channel
for _, c := range channels {
// 从设备的内存通道集合中获取通道信息
d.channels.Range(func(channel *Channel) bool {
pbChannels = append(pbChannels, &pb.Channel{
DeviceId: c.ChannelID,
ParentId: c.ParentID,
Name: c.Name,
Manufacturer: c.Manufacturer,
Model: c.Model,
Owner: c.Owner,
CivilCode: c.CivilCode,
Address: c.Address,
Port: int32(c.Port),
Parental: int32(c.Parental),
SafetyWay: int32(c.SafetyWay),
RegisterWay: int32(c.RegisterWay),
Secrecy: int32(c.Secrecy),
Status: string(c.Status),
Longitude: fmt.Sprintf("%f", c.GbLongitude),
Latitude: fmt.Sprintf("%f", c.GbLatitude),
DeviceId: channel.ChannelID,
ParentId: channel.ParentID,
Name: channel.Name,
Manufacturer: channel.Manufacturer,
Model: channel.Model,
Owner: channel.Owner,
CivilCode: channel.CivilCode,
Address: channel.Address,
Port: int32(channel.Port),
Parental: int32(channel.Parental),
SafetyWay: int32(channel.SafetyWay),
RegisterWay: int32(channel.RegisterWay),
Secrecy: int32(channel.Secrecy),
Status: string(channel.Status),
Longitude: fmt.Sprintf("%f", channel.GbLongitude),
Latitude: fmt.Sprintf("%f", channel.GbLatitude),
GpsTime: timestamppb.New(time.Now()),
})
}
return true
})
pbDevices = append(pbDevices, &pb.Device{
DeviceId: d.DeviceId,
@@ -131,7 +130,6 @@ func (gb *GB28181Plugin) List(ctx context.Context, req *pb.GetDevicesRequest) (*
resp.Code = 0
resp.Message = "success"
resp.Total = int32(total)
resp.Data = pbDevices
return resp, nil
@@ -2841,9 +2839,11 @@ func (gb *GB28181Plugin) RemoveDevice(ctx context.Context, req *pb.RemoveDeviceR
channel.DeletedAt = gorm.DeletedAt{Time: time.Now(), Valid: true}
return true
})
// 停止设备相关任务
device.Stop(fmt.Errorf("device removed"))
device.WaitStopped()
if device.Online {
// 停止设备相关任务
device.Stop(fmt.Errorf("device removed"))
device.WaitStopped()
}
// device.Stop() 会调用 Dispose(),其中已包含从 gb.devices 中移除设备的逻辑
// 开启数据库事务

View File

@@ -460,7 +460,7 @@ func (d *Device) Go() (err error) {
channel.Status = "OFF"
return true
})
d.Stop(fmt.Errorf("device keepalive timeout after %v,deviceid is %s", timeDiff, d.DeviceId))
//d.Stop(fmt.Errorf("device keepalive timeout after %v,deviceid is %s", timeDiff, d.DeviceId))
return
}
case <-catalogTick.C:

View File

@@ -98,7 +98,6 @@ func (d *Dialog) Start() (err error) {
return fmt.Errorf("device %s not found", deviceId)
}
d.gb.dialogs.Set(d)
//defer d.gb.dialogs.Remove(d)
if d.gb.tcpPort > 0 {
d.MediaPort = d.gb.tcpPort
@@ -251,6 +250,8 @@ func (d *Dialog) Start() (err error) {
if err != nil {
return errors.New("dialog invite error" + err.Error())
}
d.gb.dialogs.Set(d)
return
}
@@ -329,8 +330,8 @@ func (d *Dialog) Run() (err error) {
return
}
func (d *Dialog) GetKey() uint32 {
return d.SSRC
func (d *Dialog) GetKey() string {
return d.GetCallID()
}
func (d *Dialog) Dispose() {

View File

@@ -40,29 +40,29 @@ type PositionConfig struct {
type GB28181Plugin struct {
pb.UnimplementedApiServer
m7s.Plugin
Serial string `default:"34020000002000000001" desc:"sip 服务 id"` //sip 服务器 id, 默认 34020000002000000001
Realm string `default:"3402000000" desc:"sip 服务域"` //sip 服务器域,默认 3402000000
Password string
Sip SipConfig
MediaPort util.Range[uint16] `default:"10001-20000" desc:"媒体端口范围"` //媒体端口范围
Position PositionConfig
Parent string `desc:"父级设备"`
AutoMigrate bool `default:"true" desc:"自动迁移数据库结构并初始化根组织"`
ua *sipgo.UserAgent
server *sipgo.Server
devices util.Collection[string, *Device]
dialogs util.Collection[uint32, *Dialog]
forwardDialogs util.Collection[uint32, *ForwardDialog]
platforms util.Collection[string, *Platform]
tcpPorts chan uint16
tcpPort uint16
sipPorts []int
SipIP string `desc:"sip发送命令的IP一般是本地IP多网卡时需要配置正确的IP"`
MediaIP string `desc:"流媒体IP用于接收流"`
deviceManager task.Manager[string, *DeviceRegisterQueueTask]
Platforms []*gb28181.PlatformModel
channels util.Collection[string, *gb28181.DeviceChannel]
netListener net.Listener
Serial string `default:"34020000002000000001" desc:"sip 服务 id"` //sip 服务器 id, 默认 34020000002000000001
Realm string `default:"3402000000" desc:"sip 服务域"` //sip 服务器域,默认 3402000000
Password string
Sip SipConfig
MediaPort util.Range[uint16] `default:"10001-20000" desc:"媒体端口范围"` //媒体端口范围
Position PositionConfig
Parent string `desc:"父级设备"`
AutoMigrate bool `default:"true" desc:"自动迁移数据库结构并初始化根组织"`
ua *sipgo.UserAgent
server *sipgo.Server
devices task.Manager[string, *Device]
dialogs task.Manager[uint32, *Dialog]
forwardDialogs task.Manager[uint32, *ForwardDialog]
platforms task.Manager[string, *Platform]
tcpPorts chan uint16
tcpPort uint16
sipPorts []int
SipIP string `desc:"sip发送命令的IP一般是本地IP多网卡时需要配置正确的IP"`
MediaIP string `desc:"流媒体IP用于接收流"`
deviceRegisterManager task.Manager[string, *DeviceRegisterQueueTask]
Platforms []*gb28181.PlatformModel
channels util.Collection[string, *gb28181.DeviceChannel]
netListener net.Listener
}
var _ = m7s.InstallPlugin[GB28181Plugin](m7s.PluginMeta{
@@ -150,7 +150,7 @@ func (gb *GB28181Plugin) OnInit() (err error) {
return pkg.ErrNoDB
}
gb.Info("GB28181 initing", gb.Platforms)
gb.AddTask(&gb.deviceManager)
gb.AddTask(&gb.deviceRegisterManager)
logger := zerolog.New(os.Stdout)
gb.ua, err = sipgo.NewUA(sipgo.WithUserAgent("M7S/" + m7s.Version)) // Build user agent
// Creating client handle for ua
@@ -225,6 +225,41 @@ func (gb *GB28181Plugin) OnInit() (err error) {
return
}
func (gb *GB28181Plugin) deleteDevice(device *Device, reason string) bool {
gb.Info(fmt.Sprintf("准备删除设备: %s", reason), "deviceId", device.DeviceId)
// 开启数据库事务
tx := gb.DB.Begin()
if tx.Error != nil {
gb.Error("开启事务失败", "error", tx.Error)
return false
}
// 删除设备
if err := tx.Delete(&Device{DeviceId: device.DeviceId}).Error; err != nil {
tx.Rollback()
gb.Error(fmt.Sprintf("删除设备失败: %s", reason), "error", err, "deviceId", device.DeviceId)
return false
}
// 删除设备关联的通道
if err := tx.Where("device_id = ?", device.DeviceId).Delete(&gb28181.DeviceChannel{}).Error; err != nil {
tx.Rollback()
gb.Error(fmt.Sprintf("删除设备通道失败: %s", reason), "error", err, "deviceId", device.DeviceId)
return false
}
// 提交事务
if err := tx.Commit().Error; err != nil {
tx.Rollback()
gb.Error("提交事务失败", "error", err, "deviceId", device.DeviceId)
return false
}
gb.Info(fmt.Sprintf("已删除设备: %s", reason), "deviceId", device.DeviceId)
return true
}
func (gb *GB28181Plugin) checkDeviceExpire() (err error) {
// 从数据库中查询所有设备
var devices []*Device
@@ -314,9 +349,11 @@ func (gb *GB28181Plugin) checkDeviceExpire() (err error) {
}
})
device.OnDispose(func() {
device.Online = false
device.Status = DeviceOfflineStatus
if gb.devices.RemoveByKey(device.DeviceId) {
for c := range device.channels.Range {
c.DeviceChannel.Status = "OFF"
if c.PullProxyTask != nil {
c.PullProxyTask.ChangeStatus(m7s.PullProxyStatusOffline)
}
@@ -363,19 +400,14 @@ func (gb *GB28181Plugin) checkDeviceExpire() (err error) {
// 添加设备任务
if !isExpired {
gb.AddTask(device)
} else {
//gb.devices.Set(device)
//_, err := device.queryDeviceInfo()
//if err != nil {
// device.Error("queryDeviceInfo when checkDeviceExpire", "err", err)
//}
}
if isExpired {
gb.Info("设备已过期", "deviceId", device.DeviceId, "registerTime", device.RegisterTime, "expireTime", expireTime)
} else {
gb.Info("设备有效", "deviceId", device.DeviceId, "registerTime", device.RegisterTime, "expireTime", expireTime)
} else {
gb.Info("设备已过期", "deviceId", device.DeviceId, "registerTime", device.RegisterTime, "expireTime", expireTime)
gb.deleteDevice(device, "设备已过期")
}
} else {
// 不在线的设备,进行数据库删除
gb.deleteDevice(device, "设备不在线")
}
}
return nil
@@ -470,17 +502,17 @@ func (gb *GB28181Plugin) OnRegister(req *sip.Request, tx sip.ServerTransaction)
}
gb.Debug("onregister start", "deviceId", deviceId)
gb.Debug("get gb.deviceManager.length", "length", gb.deviceManager.Length)
if deviceRegisterQueueTask, ok := gb.deviceManager.SafeGet(deviceId); ok {
gb.Debug("gb.deviceManager.SafeGet", "deviceId", deviceId)
gb.Debug("gb.deviceManager.SafeGet", "deviceRegisterQueueTask", deviceRegisterQueueTask)
gb.Debug("get gb.deviceRegisterManager.length", "length", gb.deviceRegisterManager.Length)
if deviceRegisterQueueTask, ok := gb.deviceRegisterManager.SafeGet(deviceId); ok {
gb.Debug("gb.deviceRegisterManager.SafeGet", "deviceId", deviceId)
gb.Debug("gb.deviceRegisterManager.SafeGet", "deviceRegisterQueueTask", deviceRegisterQueueTask)
deviceRegisterQueueTask.AddTask(&registerHandlerTask)
} else {
deviceRegisterQueueTask := &DeviceRegisterQueueTask{
deviceId: deviceId,
}
gb.Debug("do not safeget deviceRegisterQueueTask", "deviceId", deviceId)
gb.deviceManager.Add(deviceRegisterQueueTask)
gb.deviceRegisterManager.Add(deviceRegisterQueueTask)
deviceRegisterQueueTask.AddTask(&registerHandlerTask)
}
}
@@ -526,7 +558,7 @@ func (gb *GB28181Plugin) OnMessage(req *sip.Request, tx sip.ServerTransaction) {
}
// 如果设备和平台都存在,通过源地址判断真实来源
if d != nil && d.Online && p != nil {
if d != nil && p != nil {
source := req.Source()
if d.HostAddress == source {
// 如果源地址匹配设备地址,则确认是设备消息