fix: device offline when device unregister,fix delete device failed

This commit is contained in:
pggiroro
2025-08-21 22:40:48 +08:00
parent bc0c761aa8
commit 1a8e2bc816
5 changed files with 163 additions and 147 deletions

View File

@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"gorm.io/gorm"
"net/http"
"net/url"
"os"
@@ -642,7 +643,13 @@ func (gb *GB28181Plugin) UpdateDevice(ctx context.Context, req *pb.Device) (*pb.
}
}
} else {
d.Stop(fmt.Errorf("password changed"))
d.Status = DeviceOfflineStatus
d.Online = false
d.channels.Range(func(c *Channel) bool {
c.Status = gb28181.ChannelOffStatus
return true
})
//d.Stop(fmt.Errorf("password changed"))
}
resp.Code = 0
@@ -2815,49 +2822,17 @@ func (gb *GB28181Plugin) RemoveDevice(ctx context.Context, req *pb.RemoveDeviceR
// 使用数据库中的 DeviceId 从内存中查找设备
if device, ok := gb.devices.Get(req.Id); ok {
device.channels.Range(func(channel *Channel) bool {
if err := device.plugin.DB.Where("device_id = ?", device.DeviceId).Delete(&gb28181.DeviceChannel{}).Error; err != nil {
device.Error("删除设备通道记录失败", "error", err)
}
return true
})
if device.Online {
// 停止设备相关任务
device.Stop(fmt.Errorf("device removed"))
device.WaitStopped()
}
// device.Stop() 会调用 Dispose(),其中已包含从 gb.devices 中移除设备的逻辑
// 开启数据库事务
//tx := gb.DB.Begin()
//if tx.Error != nil {
// resp.Code = 500
// resp.Message = "开启事务失败"
// return resp, tx.Error
//}
//
//// 删除设备
//if err := tx.Delete(&Device{DeviceId: req.Id}).Error; err != nil {
// tx.Rollback()
// resp.Code = 500
// resp.Message = "删除设备失败"
// return resp, err
//}
//
//// 删除设备关联的通道
//if err := tx.Where("device_id = ?", req.Id).Delete(&gb28181.DeviceChannel{}).Error; err != nil {
// tx.Rollback()
// resp.Code = 500
// resp.Message = "删除设备通道失败"
// return resp, err
//}
//
//// 提交事务
//if err := tx.Commit().Error; err != nil {
// tx.Rollback()
// resp.Code = 500
// resp.Message = "提交事务失败"
// return resp, err
//device.channels.Range(func(channel *Channel) bool {
// if err := device.plugin.DB.Where("device_id = ?", device.DeviceId).Delete(&gb28181.DeviceChannel{}).Error; err != nil {
// device.Error("删除设备通道记录失败", "error", err)
// }
// return true
//})
//if device.Online {
// 停止设备相关任务
device.DeletedAt = gorm.DeletedAt{Time: time.Now(), Valid: true}
device.Stop(fmt.Errorf("device removed"))
device.WaitStopped()
//}
resp.Code = 200

View File

@@ -78,7 +78,7 @@ type Device struct {
KeepaliveCount int `gorm:"default:3" default:"3"` // 心跳次数
ChannelCount int // 通道个数
Expires int // 注册有效期
CreateTime time.Time // 创建时间
CreateTime time.Time `gorm:"primaryKey"` // 创建时间
UpdateTime time.Time // 更新时间
Charset string // 字符集, 支持 UTF-8 与 GB2312
SubscribeCatalog int `gorm:"default:0"` // 目录订阅周期0为不订阅
@@ -154,8 +154,8 @@ func (d *Device) GetKey() string {
// CatalogRequest 目录请求结构体
type CatalogRequest struct {
SN, SumNum int
FirstResponse bool // 是否为第一个响应
SN, SumNum, TotalCount int
FirstResponse bool // 是否为第一个响应
*util.Promise
sync.Mutex // 保护并发访问
}
@@ -168,21 +168,99 @@ func (r *CatalogRequest) GetKey() int {
func (r *CatalogRequest) AddResponse() bool {
r.Lock()
defer r.Unlock()
fmt.Println("r.FirstResponse: " + fmt.Sprintf("%v", r.FirstResponse))
wasFirst := r.FirstResponse
r.FirstResponse = false
fmt.Println("r.FirstResponse after: " + fmt.Sprintf("%v", r.FirstResponse))
return wasFirst
}
// IsComplete 检查是否完成接收
func (r *CatalogRequest) IsComplete(channelsLength int) bool {
func (r *CatalogRequest) IsComplete() bool {
r.Lock()
defer r.Unlock()
return channelsLength >= r.SumNum
return r.TotalCount >= r.SumNum
}
type CatalogHandlerQueueTask struct {
task.Work
}
var catalogHandlerQueueTask CatalogHandlerQueueTask
type catalogHandlerTask struct {
task.Task
d *Device
msg *gb28181.Message
}
func (c *catalogHandlerTask) Run() (err error) {
// 处理目录信息
d := c.d
msg := c.msg
catalogReq, exists := d.catalogReqs.Get(msg.SN)
d.Debug("into catalog", "msg.SN", msg.SN, "exists", exists)
if !exists {
// 创建新的目录请求
catalogReq = &CatalogRequest{
SN: msg.SN,
SumNum: msg.SumNum,
TotalCount: 0,
FirstResponse: true,
Promise: util.NewPromise(context.Background()),
}
d.catalogReqs.Set(catalogReq)
d.Debug("into catalog", "msg.SN", msg.SN, "d.catalogReqs", d.catalogReqs.Length)
}
// 添加响应并获取是否是第一个响应
isFirst := catalogReq.AddResponse()
// 更新设备信息到数据库
// 如果是第一个响应将所有通道状态标记为OFF
if isFirst {
d.Debug("将所有通道状态标记为OFF", "deviceId", d.DeviceId)
// 标记所有通道为OFF状态
d.channels.Range(func(channel *Channel) bool {
if channel.DeviceChannel != nil {
channel.DeviceChannel.Status = gb28181.ChannelOffStatus
}
return true
})
}
// 更新通道信息
for _, c := range msg.DeviceList.DeviceChannelList {
// 设置关联的设备数据库ID
c.ChannelId = c.DeviceId
c.DeviceId = d.DeviceId
c.ID = d.DeviceId + "_" + c.ChannelId
if c.CustomChannelId == "" {
c.CustomChannelId = c.ChannelId
}
d.Debug("msg.DeviceList.DeviceChannelList range", "c.ChannelId", c.ChannelId, "c.Status", c.Status)
// 使用 Save 进行 upsert 操作
d.addOrUpdateChannel(c)
catalogReq.TotalCount++
}
// 更新当前设备的通道数
d.ChannelCount = msg.SumNum
d.UpdateTime = time.Now()
d.Debug("save channel", "deviceid", d.DeviceId, " d.channels.Length", d.channels.Length, "d.ChannelCount", d.ChannelCount, "d.UpdateTime", d.UpdateTime)
// 在所有通道都添加完成后,检查是否完成接收
if catalogReq.IsComplete() {
d.Debug("IsComplete")
catalogReq.Resolve()
d.catalogReqs.RemoveByKey(msg.SN)
}
return
}
func (d *Device) onMessage(req *sip.Request, tx sip.ServerTransaction, msg *gb28181.Message) (err error) {
d.plugin.Trace("into onMessage,deviceid is ", d.DeviceId)
d.plugin.Debug("into onMessage", "deviceid is ", d.DeviceId, "msg is", msg)
source := req.Source()
hostname, portStr, _ := net.SplitHostPort(source)
port, _ := strconv.Atoi(portStr)
@@ -216,68 +294,11 @@ func (d *Device) onMessage(req *sip.Request, tx sip.ServerTransaction, msg *gb28
}
}
case "Catalog":
// 处理目录信息
catalogReq, exists := d.catalogReqs.Get(msg.SN)
if !exists {
// 创建新的目录请求
catalogReq = &CatalogRequest{
SN: msg.SN,
SumNum: msg.SumNum,
FirstResponse: true,
Promise: util.NewPromise(context.Background()),
}
d.catalogReqs.Set(catalogReq)
}
// 添加响应并获取是否是第一个响应
isFirst := catalogReq.AddResponse()
// 更新设备信息到数据库
// 如果是第一个响应将所有通道状态标记为OFF
if isFirst {
d.Trace("将所有通道状态标记为OFF", "deviceId", d.DeviceId)
// 标记所有通道为OFF状态
d.channels.Range(func(channel *Channel) bool {
if channel.DeviceChannel != nil {
channel.DeviceChannel.Status = gb28181.ChannelOffStatus
}
return true
})
}
// 更新通道信息
for _, c := range msg.DeviceChannelList {
// 设置关联的设备数据库ID
c.ChannelId = c.DeviceId
c.DeviceId = d.DeviceId
c.ID = d.DeviceId + "_" + c.ChannelId
if c.CustomChannelId == "" {
c.CustomChannelId = c.ChannelId
}
// 使用 Save 进行 upsert 操作
d.addOrUpdateChannel(c)
}
// 更新当前设备的通道数
d.ChannelCount = msg.SumNum
d.UpdateTime = time.Now()
d.Debug("save channel", "deviceid", d.DeviceId, " d.channels.Length", d.channels.Length, "d.ChannelCount", d.ChannelCount, "d.UpdateTime", d.UpdateTime)
// 删除所有状态为OFF的通道
// d.channels.Range(func(channel *Channel) bool {
// if channel.DeviceChannel != nil && channel.DeviceChannel.Status == gb28181.ChannelOffStatus {
// d.Debug("删除不存在的通道", "channelId", channel.ID)
// d.channels.RemoveByKey(channel.ID)
// d.plugin.channels.RemoveByKey(channel.ID)
// }
// return true
// })
// 在所有通道都添加完成后,检查是否完成接收
if catalogReq.IsComplete(d.channels.Length) {
catalogReq.Resolve()
d.catalogReqs.RemoveByKey(msg.SN)
catalogHandler := &catalogHandlerTask{
d: d,
msg: msg,
}
catalogHandlerQueueTask.AddTask(catalogHandler)
case "RecordInfo":
if channel, ok := d.channels.Get(d.DeviceId + "_" + msg.DeviceID); ok {
if req, ok := channel.RecordReqs.Get(msg.SN); ok {
@@ -614,6 +635,10 @@ func (d *Device) addOrUpdateChannel(c gb28181.DeviceChannel) {
}
// 更新通道信息
channel.DeviceChannel = &c
d.channels.Range(func(channel *Channel) bool {
d.Debug("range d.channels", "channel.ChannelId", channel.ChannelId, "channel.status", channel.Status)
return true
})
} else {
// 创建新通道
channel = &Channel{
@@ -840,12 +865,12 @@ func (d *Device) onNotify(req *sip.Request, tx sip.ServerTransaction, msg *gb281
// handleCatalog 处理设备目录更新
func (d *Device) handleCatalog(msg *gb28181.Message) error {
if msg.DeviceChannelList == nil || len(msg.DeviceChannelList) == 0 {
if msg.DeviceList.DeviceChannelList == nil || len(msg.DeviceList.DeviceChannelList) == 0 {
return fmt.Errorf("no device items in catalog")
}
// 遍历并更新设备列表
for _, item := range msg.DeviceChannelList {
for _, item := range msg.DeviceList.DeviceChannelList {
channel := &gb28181.DeviceChannel{
DeviceId: item.DeviceId,
Name: item.Name,

View File

@@ -155,6 +155,7 @@ func (gb *GB28181Plugin) OnInit() (err error) {
gb.ua, err = sipgo.NewUA(sipgo.WithUserAgent("M7S/" + m7s.Version)) // Build user agent
// Creating client handle for ua
if len(gb.Sip.ListenAddr) > 0 {
gb.AddTask(&catalogHandlerQueueTask)
gb.AddTask(&gb.devices)
gb.AddTask(&gb.platforms)
gb.AddTask(&gb.dialogs)
@@ -282,13 +283,17 @@ func (gb *GB28181Plugin) checkDeviceExpire() (err error) {
// 检查设备是否过期
expireTime := device.RegisterTime.Add(time.Duration(device.Expires) * time.Second)
isExpired := now.After(expireTime)
// 设置设备基本属性
device.Status = DeviceOfflineStatus
if !isExpired {
device.Status = DeviceOnlineStatus
if device.CustomName == "" {
device.CustomName = device.Name
}
if device.Online || device.Status == DeviceOnlineStatus {
// 设置设备基本属性
device.Status = DeviceOfflineStatus
if !isExpired {
device.Status = DeviceOnlineStatus
}
device.Online = !isExpired
}
device.Online = !isExpired
// 设置事件通道
device.eventChan = make(chan any, 10)
@@ -389,6 +394,12 @@ func (gb *GB28181Plugin) checkDeviceExpire() (err error) {
// 初始化设备通道并更新到数据库
for _, channel := range channels {
if channel.CustomName == "" {
channel.CustomName = channel.Name
}
if channel.CustomChannelId == "" {
channel.CustomChannelId = channel.ChannelId
}
if isExpired {
channel.Status = "OFF"
} else {
@@ -403,7 +414,7 @@ func (gb *GB28181Plugin) checkDeviceExpire() (err error) {
// 添加设备任务
gb.devices.Add(device)
gb.Info("设备有效", "deviceId", device.DeviceId, "registerTime", device.RegisterTime, "expireTime", expireTime)
gb.Info("设备有效", "deviceId", device.DeviceId, "registerTime", device.RegisterTime, "expireTime", expireTime, "isExpired", isExpired, "device.Name", device.Name)
}
return nil

View File

@@ -141,19 +141,22 @@ func BuildKeepAliveXML(sn int, id string) []byte {
type (
Message struct {
XMLName xml.Name
CmdType string
SN int // 请求序列号,一般用于对应 request 和 response
DeviceID string
Longitude string // 经度
Latitude string // 纬度
DeviceName string
Manufacturer string
Model string
Channel string
Firmware string
DeviceChannelList []DeviceChannel `xml:"DeviceList>Item"`
RecordList struct {
XMLName xml.Name
CmdType string
SN int // 请求序列号,一般用于对应 request 和 response
DeviceID string
Longitude string // 经度
Latitude string // 纬度
DeviceName string
Manufacturer string
Model string
Channel string
Firmware string
DeviceList struct {
DeviceChannelList []DeviceChannel `xml:"Item"`
DeviceNum int `xml:"Num,attr"` // 将 Num 属性映射到 DeviceNum
} `xml:"DeviceList"`
RecordList struct {
Num int `xml:"Num,attr"`
Item []RecordItem `xml:"Item"`
} `xml:"RecordList"`

View File

@@ -217,7 +217,7 @@ func (task *registerHandlerTask) Run() (err error) {
channel.Status = "OFF"
return true
})
d.Stop(errors.New("unregister"))
//d.Stop(errors.New("unregister"))
}
} else {
if recover {
@@ -264,7 +264,7 @@ func (task *registerHandlerTask) RecoverDevice(d *Device, req *sip.Request) {
if sourceIPParse.IsPrivate() { // 源IP是内网IP
myWanIP = myLanIP // 使用内网IP作为外网IP
}
} else { // 目标地址是IP
} else { // 目标地址是IP
if sourceIPParse.IsPrivate() { // 源IP是内网IP
myLanIP, myWanIP = myIP, myIP // 使用目标IP作为内外网IP
}
@@ -372,7 +372,7 @@ func (task *registerHandlerTask) StoreDevice(deviceid string, req *sip.Request,
if sourceIPParse.IsPrivate() { // 源IP是内网IP
myWanIP = myLanIP // 使用内网IP作为外网IP
}
} else { // 目标地址是IP
} else { // 目标地址是IP
if sourceIPParse.IsPrivate() { // 源IP是内网IP
myLanIP, myWanIP = myIP, myIP // 使用目标IP作为内外网IP
}
@@ -387,7 +387,9 @@ func (task *registerHandlerTask) StoreDevice(deviceid string, req *sip.Request,
}
now := time.Now()
d.CreateTime = now
if d.CreateTime.IsZero() {
d.CreateTime = now
}
d.UpdateTime = now
d.RegisterTime = now
d.KeepaliveTime = now
@@ -454,15 +456,15 @@ func (task *registerHandlerTask) StoreDevice(deviceid string, req *sip.Request,
task.gb.devices.Add(d).WaitStarted()
if task.gb.DB != nil {
var existing Device
if err := task.gb.DB.First(&existing, Device{DeviceId: d.DeviceId}).Error; err == nil {
d.ID = existing.ID // 保持原有的自增ID
task.gb.DB.Save(d).Omit("create_time")
task.gb.Info("StoreDevice", "type", "更新设备", "deviceId", d.DeviceId)
} else {
task.gb.DB.Save(d)
task.gb.Info("StoreDevice", "type", "新增设备", "deviceId", d.DeviceId)
}
//var existing Device
//if err := task.gb.DB.First(&existing, Device{DeviceId: d.DeviceId}).Error; err == nil {
// d.ID = existing.ID // 保持原有的自增ID
// task.gb.DB.Save(d).Omit("create_time")
// task.gb.Info("StoreDevice", "type", "更新设备", "deviceId", d.DeviceId)
//} else {
task.gb.DB.Save(d)
task.gb.Info("StoreDevice", "type", "新增设备", "deviceId", d.DeviceId)
//}
}
return
}