fix: gb28181 update use taskManager

This commit is contained in:
pggiroro
2025-08-03 20:34:31 +08:00
parent 2bbee90a9f
commit 96b9cbfc08
5 changed files with 200 additions and 243 deletions

View File

@@ -836,7 +836,7 @@ func (gb *GB28181Plugin) AddPlatform(ctx context.Context, req *pb.Platform) (*pb
// 创建Platform实例
platform := NewPlatform(platformModel, gb, false)
// 添加到任务系统
gb.AddTask(platform)
gb.platforms.Add(platform)
}
resp.Code = 0
@@ -983,7 +983,7 @@ func (gb *GB28181Plugin) UpdatePlatform(ctx context.Context, req *pb.Platform) (
// 创建新的Platform实例
platformInstance := NewPlatform(&platform, gb, false)
// 添加到任务系统
gb.AddTask(platformInstance)
gb.platforms.Add(platformInstance)
} else {
// 如果平台被禁用停止并移除旧的platform实例
if oldPlatform, ok := gb.platforms.Get(platform.ServerGBID); ok {

View File

@@ -31,6 +31,33 @@ const (
DeviceAlarmedStatus DeviceStatus = "ALARMED"
)
type DeviceKeepaliveTickTask struct {
task.TickTask
device *Device
seconds time.Duration
}
func (d *DeviceKeepaliveTickTask) GetTickInterval() time.Duration {
return d.seconds
}
func (d *DeviceKeepaliveTickTask) Tick(any) {
keepaliveSeconds := 60
if d.device.KeepaliveInterval >= 5 {
keepaliveSeconds = d.device.KeepaliveInterval
}
d.Debug("keepLiveTick,deviceid is", d.device.DeviceId, "d.KeepaliveTime is ", d.device.KeepaliveTime, "d.KeepaliveInterval is ", d.device.KeepaliveInterval, "d.KeepaliveCount is ", d.device.KeepaliveCount)
if timeDiff := time.Since(d.device.KeepaliveTime); timeDiff > time.Duration(d.device.KeepaliveCount*keepaliveSeconds)*time.Second {
d.device.Online = false
d.device.Status = DeviceOfflineStatus
// 设置所有通道状态为off
d.device.channels.Range(func(channel *Channel) bool {
channel.Status = "OFF"
return true
})
}
}
type Device struct {
task.Job `gorm:"-:all"`
DeviceId string `gorm:"primaryKey"` // 设备国标编号
@@ -46,7 +73,8 @@ type Device struct {
Online bool // 是否在线true为在线false为离线
RegisterTime time.Time // 注册时间
KeepaliveTime time.Time // 心跳时间
KeepaliveInterval int `gorm:"default:60"` // 心跳间隔
KeepaliveInterval int `gorm:"default:60" default:"60"` // 心跳间隔
KeepaliveCount int `gorm:"default:3" default:"3"` // 心跳次数
ChannelCount int // 通道个数
Expires int // 注册有效期
CreateTime time.Time // 创建时间
@@ -91,6 +119,8 @@ func (d *Device) TableName() string {
}
func (d *Device) Dispose() {
//d.Online = false
//d.Status = DeviceOfflineStatus
if d.plugin.DB != nil {
// 先删除该设备关联的所有channels
if err := d.plugin.DB.Where("device_id = ?", d.DeviceId).Delete(&gb28181.DeviceChannel{}).Error; err != nil {
@@ -103,6 +133,9 @@ func (d *Device) Dispose() {
if err := d.plugin.DB.Create(channel.DeviceChannel).Error; err != nil {
d.Error("保存设备通道记录失败", "error", err)
}
if channel.PullProxyTask != nil {
channel.PullProxyTask.ChangeStatus(m7s.PullProxyStatusOffline)
}
d.plugin.channels.RemoveByKey(channel.ID)
return true
})
@@ -110,7 +143,6 @@ func (d *Device) Dispose() {
// 保存设备信息
d.plugin.DB.Save(d)
}
d.plugin.devices.RemoveByKey(d.DeviceId)
}
func (d *Device) GetKey() string {
@@ -168,7 +200,7 @@ func (d *Device) onMessage(req *sip.Request, tx sip.ServerTransaction, msg *gb28
case "Keepalive":
d.KeepaliveInterval = int(time.Since(d.KeepaliveTime).Seconds())
d.KeepaliveTime = time.Now()
d.Trace("into keeplive,deviceid is ", d.DeviceId, "d.KeepaliveTime is", d.KeepaliveTime)
d.Debug("into keeplive,deviceid is ", d.DeviceId, "d.KeepaliveTime is", d.KeepaliveTime, "d.KeepaliveInterval is", d.KeepaliveInterval)
if d.plugin.DB != nil {
if err := d.plugin.DB.Model(d).Updates(map[string]interface{}{
"keepalive_interval": d.KeepaliveInterval,
@@ -405,75 +437,21 @@ func (d *Device) Go() (err error) {
if d.SubscribeCatalog > 0 {
catalogSubTask := NewCatalogSubscribeTask(d)
d.AddTask(catalogSubTask)
catalogSubTask.Depend(d)
}
// 创建并启动位置订阅任务
if d.SubscribePosition > 0 {
positionSubTask := NewPositionSubscribeTask(d)
d.AddTask(positionSubTask)
positionSubTask.Depend(d)
}
catalogTick := time.NewTicker(time.Minute * 1000)
keepaliveSeconds := 60
if d.KeepaliveInterval >= 5 {
keepaliveSeconds = d.KeepaliveInterval
}
keepLiveTick := time.NewTicker(time.Second * 10)
defer keepLiveTick.Stop()
defer catalogTick.Stop()
for {
select {
case <-d.Done():
case <-keepLiveTick.C:
d.Trace("keepLiveTick,deviceid is", d.DeviceId, "d.KeepaliveTime is ", d.KeepaliveTime)
if timeDiff := time.Since(d.KeepaliveTime); timeDiff > time.Duration(3*keepaliveSeconds)*time.Second {
d.Online = false
d.Status = DeviceOfflineStatus
// 设置所有通道状态为off
d.channels.Range(func(channel *Channel) bool {
channel.Status = "OFF"
return true
})
//d.Stop(fmt.Errorf("device keepalive timeout after %v,deviceid is %s", timeDiff, d.DeviceId))
return
}
case <-catalogTick.C:
if time.Since(d.KeepaliveTime) > time.Second*time.Duration(d.Expires) {
d.Error("keepalive timeout", "keepaliveTime", d.KeepaliveTime)
return
}
response, err = d.catalog()
if err != nil {
d.Error("catalog", "err", err)
} else {
d.Trace("catalogTick", "response", response.String())
}
//case event := <-d.eventChan:
// d.Debug("eventChan", "event", event)
// switch v := event.(type) {
// case []gb28181.DeviceChannel:
// for _, c := range v {
// //当父设备非空且存在时、父设备节点增加通道
// if c.ParentId != "" {
// path := strings.Split(c.ParentId, "/")
// parentId := path[len(path)-1]
// //如果父ID并非本身所属设备一般情况下这是因为下级设备上传了目录信息该信息通常不需要处理。
// // 暂时不考虑级联目录的实现
// if d.DeviceId != parentId {
// if parent, ok := d.plugin.devices.Get(parentId); ok {
// parent.addOrUpdateChannel(c)
// continue
// } else {
// c.Model = "Directory " + c.Model
// c.Status = "NoParent"
// }
// }
// }
// d.addOrUpdateChannel(c)
// }
// }
}
deviceKeepaliveTickTask := &DeviceKeepaliveTickTask{
seconds: time.Second * 30,
device: d,
}
d.AddTask(deviceKeepaliveTickTask)
return deviceKeepaliveTickTask.WaitStopped()
}
func (d *Device) CreateRequest(Method sip.RequestMethod, Recipient any) *sip.Request {

View File

@@ -150,11 +150,15 @@ func (gb *GB28181Plugin) OnInit() (err error) {
return pkg.ErrNoDB
}
gb.Info("GB28181 initing", gb.Platforms)
gb.AddTask(&gb.deviceRegisterManager)
logger := zerolog.New(os.Stdout)
gb.ua, err = sipgo.NewUA(sipgo.WithUserAgent("M7S/" + m7s.Version)) // Build user agent
// Creating client handle for ua
if len(gb.Sip.ListenAddr) > 0 {
gb.AddTask(&gb.devices)
gb.AddTask(&gb.platforms)
gb.AddTask(&gb.dialogs)
gb.AddTask(&gb.forwardDialogs)
gb.AddTask(&gb.deviceRegisterManager)
gb.server, _ = sipgo.NewServer(gb.ua, sipgo.WithServerLogger(logger)) // Creating server handle for ua
gb.server.OnMessage(gb.OnMessage)
gb.server.OnRegister(gb.OnRegister)
@@ -274,145 +278,132 @@ func (gb *GB28181Plugin) checkDeviceExpire() (err error) {
now := time.Now()
for _, device := range devices {
if device.Online {
// 检查设备是否过期
expireTime := device.RegisterTime.Add(time.Duration(device.Expires) * time.Second)
isExpired := now.After(expireTime)
// 检查设备是否过期
expireTime := device.RegisterTime.Add(time.Duration(device.Expires) * time.Second)
isExpired := now.After(expireTime)
// 设置设备基本属性
device.Status = DeviceOfflineStatus
if !isExpired {
device.Status = DeviceOnlineStatus
}
device.Online = !isExpired
// 设置事件通道
device.eventChan = make(chan any, 10)
// 设置Logger
device.Logger = gb.Logger.With("deviceid", device.DeviceId)
// 初始化通道集合
device.channels.L = new(sync.RWMutex)
// 初始化目录请求集合
device.catalogReqs.L = new(sync.RWMutex)
// 设置plugin引用
device.plugin = gb
// 设置联系人头信息
device.contactHDR = sip.ContactHeader{
Address: sip.Uri{
User: gb.Serial,
Host: device.SipIp,
Port: device.LocalPort,
},
}
// 设置来源头信息
device.fromHDR = sip.FromHeader{
Address: sip.Uri{
User: gb.Serial,
Host: device.SipIp,
Port: device.LocalPort,
},
Params: sip.NewParams(),
}
device.fromHDR.Params.Add("tag", sip.GenerateTagN(16))
// 设置接收者
device.Recipient = sip.Uri{
Host: device.IP,
Port: device.Port,
User: device.DeviceId,
}
// 创建SIP客户端
device.client, _ = sipgo.NewClient(gb.ua, sipgo.WithClientLogger(zerolog.New(os.Stdout)), sipgo.WithClientHostname(device.SipIp))
device.Info("checkDeviceExpire", "d.SipIp", device.SipIp, "d.LocalPort", device.LocalPort, "d.contactHDR", device.contactHDR)
// 设置设备ID的hash值作为任务ID
var hash uint32
for i := 0; i < len(device.DeviceId); i++ {
ch := device.DeviceId[i]
hash = hash*31 + uint32(ch)
}
device.Task.ID = hash
// 设置启动和销毁回调
device.OnStart(func() {
gb.devices.Set(device)
})
device.channels.OnAdd(func(c *Channel) {
if absDevice, ok := gb.Server.PullProxies.SafeFind(func(absDevice m7s.IPullProxy) bool {
conf := absDevice.GetConfig()
return conf.Type == "gb28181" && conf.URL == fmt.Sprintf("%s/%s", device.DeviceId, c.ChannelId)
}); ok {
c.PullProxyTask = absDevice.(*PullProxy)
absDevice.ChangeStatus(m7s.PullProxyStatusOnline)
}
})
device.OnDispose(func() {
device.Online = false
device.Status = DeviceOfflineStatus
if gb.devices.RemoveByKey(device.DeviceId) {
for c := range device.channels.Range {
c.DeviceChannel.Status = "OFF"
if c.PullProxyTask != nil {
c.PullProxyTask.ChangeStatus(m7s.PullProxyStatusOffline)
}
}
}
})
// 加载设备的通道
var channels []gb28181.DeviceChannel
if err := gb.DB.Where(&gb28181.DeviceChannel{DeviceId: device.DeviceId}).Find(&channels).Error; err != nil {
gb.Error("加载通道失败", "error", err, "deviceId", device.DeviceId)
continue
}
if gb.SipIP != "" {
device.SipIp = gb.SipIP
}
if gb.MediaIP != "" {
device.MediaIp = gb.MediaIP
}
// 更新设备状态到数据库
if err := gb.DB.Model(&Device{}).Where(&Device{DeviceId: device.DeviceId}).Updates(map[string]interface{}{
"online": device.Online,
"status": device.Status,
}).Error; err != nil {
gb.Error("更新设备状态到数据库失败", "error", err, "deviceId", device.DeviceId)
}
// 初始化设备通道并更新到数据库
for _, channel := range channels {
if isExpired {
channel.Status = "OFF"
} else {
channel.Status = "ON"
}
// 更新通道状态到数据库
if err := gb.DB.Model(&gb28181.DeviceChannel{}).Where(&gb28181.DeviceChannel{ID: channel.ID}).Update("status", channel.Status).Error; err != nil {
gb.Error("更新通道状态到数据库失败", "error", err, "channelId", channel.ChannelId)
}
device.addOrUpdateChannel(channel)
}
// 添加设备任务
if !isExpired {
gb.AddTask(device)
gb.Info("设备有效", "deviceId", device.DeviceId, "registerTime", device.RegisterTime, "expireTime", expireTime)
} else {
gb.Info("设备已过期", "deviceId", device.DeviceId, "registerTime", device.RegisterTime, "expireTime", expireTime)
gb.deleteDevice(device, "设备已过期")
}
} else {
// 不在线的设备,进行数据库删除
gb.deleteDevice(device, "设备不在线")
// 设置设备基本属性
device.Status = DeviceOfflineStatus
if !isExpired {
device.Status = DeviceOnlineStatus
}
device.Online = !isExpired
// 设置事件通道
device.eventChan = make(chan any, 10)
// 设置Logger
device.Logger = gb.Logger.With("deviceid", device.DeviceId)
// 初始化通道集合
device.channels.L = new(sync.RWMutex)
// 初始化目录请求集合
device.catalogReqs.L = new(sync.RWMutex)
// 设置plugin引用
device.plugin = gb
// 设置联系人头信息
device.contactHDR = sip.ContactHeader{
Address: sip.Uri{
User: gb.Serial,
Host: device.SipIp,
Port: device.LocalPort,
},
}
// 设置来源头信息
device.fromHDR = sip.FromHeader{
Address: sip.Uri{
User: gb.Serial,
Host: device.SipIp,
Port: device.LocalPort,
},
Params: sip.NewParams(),
}
device.fromHDR.Params.Add("tag", sip.GenerateTagN(16))
// 设置接收者
device.Recipient = sip.Uri{
Host: device.IP,
Port: device.Port,
User: device.DeviceId,
}
// 创建SIP客户端
device.client, _ = sipgo.NewClient(gb.ua, sipgo.WithClientLogger(zerolog.New(os.Stdout)), sipgo.WithClientHostname(device.SipIp))
device.Info("checkDeviceExpire", "d.SipIp", device.SipIp, "d.LocalPort", device.LocalPort, "d.contactHDR", device.contactHDR)
// 设置设备ID的hash值作为任务ID
var hash uint32
for i := 0; i < len(device.DeviceId); i++ {
ch := device.DeviceId[i]
hash = hash*31 + uint32(ch)
}
device.Task.ID = hash
device.channels.OnAdd(func(c *Channel) {
if absDevice, ok := gb.Server.PullProxies.SafeFind(func(absDevice m7s.IPullProxy) bool {
conf := absDevice.GetConfig()
return conf.Type == "gb28181" && conf.URL == fmt.Sprintf("%s/%s", device.DeviceId, c.ChannelId)
}); ok {
c.PullProxyTask = absDevice.(*PullProxy)
absDevice.ChangeStatus(m7s.PullProxyStatusOnline)
}
})
//device.OnDispose(func() {
// device.Online = false
// device.Status = DeviceOfflineStatus
// if gb.devices.RemoveByKey(device.DeviceId) {
// for c := range device.channels.Range {
// c.DeviceChannel.Status = "OFF"
// if c.PullProxyTask != nil {
// c.PullProxyTask.ChangeStatus(m7s.PullProxyStatusOffline)
// }
// }
// }
//})
// 加载设备的通道
var channels []gb28181.DeviceChannel
if err := gb.DB.Where(&gb28181.DeviceChannel{DeviceId: device.DeviceId}).Find(&channels).Error; err != nil {
gb.Error("加载通道失败", "error", err, "deviceId", device.DeviceId)
continue
}
if gb.SipIP != "" {
device.SipIp = gb.SipIP
}
if gb.MediaIP != "" {
device.MediaIp = gb.MediaIP
}
// 更新设备状态到数据库
if err := gb.DB.Model(&Device{}).Where(&Device{DeviceId: device.DeviceId}).Updates(map[string]interface{}{
"online": device.Online,
"status": device.Status,
}).Error; err != nil {
gb.Error("更新设备状态到数据库失败", "error", err, "deviceId", device.DeviceId)
}
// 初始化设备通道并更新到数据库
for _, channel := range channels {
if isExpired {
channel.Status = "OFF"
} else {
channel.Status = "ON"
}
// 更新通道状态到数据库
if err := gb.DB.Model(&gb28181.DeviceChannel{}).Where(&gb28181.DeviceChannel{ID: channel.ID}).Update("status", channel.Status).Error; err != nil {
gb.Error("更新通道状态到数据库失败", "error", err, "channelId", channel.ChannelId)
}
device.addOrUpdateChannel(channel)
}
// 添加设备任务
gb.devices.Add(device)
gb.Info("设备有效", "deviceId", device.DeviceId, "registerTime", device.RegisterTime, "expireTime", expireTime)
}
return nil
}
@@ -480,7 +471,7 @@ func (gb *GB28181Plugin) checkPlatform() {
// gb.Error("unregister err ", err)
//}
// 添加到任务系统
gb.AddTask(platform)
gb.platforms.Add(platform)
gb.Info("平台初始化完成", "ID", platformModel.ServerGBID, "Name", platformModel.Name)
}
}

View File

@@ -96,16 +96,16 @@ func AutoMigrateAll(db interface{}) error {
return nil
}
// BeforeCreate GORM钩子在创建记录前设置创建时间和更新时间
func (g *GroupsModel) BeforeCreate() error {
now := time.Now()
g.CreateTime = now
g.UpdateTime = now
return nil
}
// BeforeUpdate GORM钩子在更新记录前设置更新时间
func (g *GroupsModel) BeforeUpdate() error {
g.UpdateTime = time.Now()
return nil
}
//// BeforeCreate GORM钩子在创建记录前设置创建时间和更新时间
//func (g *GroupsModel) BeforeCreate() error {
// now := time.Now()
// g.CreateTime = now
// g.UpdateTime = now
// return nil
//}
//
//// BeforeUpdate GORM钩子在更新记录前设置更新时间
//func (g *GroupsModel) BeforeUpdate() error {
// g.UpdateTime = time.Now()
// return nil
//}

View File

@@ -256,7 +256,7 @@ func (task *registerHandlerTask) RecoverDevice(d *Device, req *sip.Request) {
myLanIP := myip.InternalIPv4()
myWanIP := myip.ExternalIPv4()
task.gb.Info("Start RecoverDevice", "source", source, "desc", desc, "myLanIP", myLanIP, "myWanIP", myWanIP)
task.gb.Info("Start RecoverDevice", "source", source, "desc", desc, "myLanIP", myLanIP, "myWanIP", myWanIP, "deviceid", d.DeviceId)
// 处理目标地址和源地址的IP映射关系
if sourceIPParse != nil { // 源IP有效时才进行处理
@@ -264,7 +264,7 @@ func (task *registerHandlerTask) RecoverDevice(d *Device, req *sip.Request) {
if sourceIPParse.IsPrivate() { // 源IP是内网IP
myWanIP = myLanIP // 使用内网IP作为外网IP
}
} else { // 目标地址是IP
} else { // 目标地址是IP
if sourceIPParse.IsPrivate() { // 源IP是内网IP
myLanIP, myWanIP = myIP, myIP // 使用目标IP作为内外网IP
}
@@ -299,13 +299,14 @@ func (task *registerHandlerTask) RecoverDevice(d *Device, req *sip.Request) {
d.HostAddress = d.IP + ":" + sourcePortStr
d.Status = DeviceOnlineStatus
d.UpdateTime = time.Now()
d.KeepaliveTime = time.Now()
d.RegisterTime = time.Now()
d.Online = true
d.client, _ = sipgo.NewClient(task.gb.ua, sipgo.WithClientLogger(zerolog.New(os.Stdout)), sipgo.WithClientHostname(d.SipIp))
d.channels.L = new(sync.RWMutex)
d.catalogReqs.L = new(sync.RWMutex)
d.plugin = task.gb
d.plugin.Info("RecoverDevice", "source", source, "desc", desc, "device.SipIp", myLanIP, "device.WanIP", myWanIP, "recipient", req.Recipient, "myPort", myPort)
d.plugin.Info("RecoverDevice", "source", source, "desc", desc, "device.SipIp", myLanIP, "device.WanIP", myWanIP, "recipient", req.Recipient, "myPort", myPort, "deviceid", d.DeviceId)
if task.gb.DB != nil {
//var existing Device
@@ -371,7 +372,7 @@ func (task *registerHandlerTask) StoreDevice(deviceid string, req *sip.Request,
if sourceIPParse.IsPrivate() { // 源IP是内网IP
myWanIP = myLanIP // 使用内网IP作为外网IP
}
} else { // 目标地址是IP
} else { // 目标地址是IP
if sourceIPParse.IsPrivate() { // 源IP是内网IP
myLanIP, myWanIP = myIP, myIP // 使用目标IP作为内外网IP
}
@@ -441,29 +442,16 @@ func (task *registerHandlerTask) StoreDevice(deviceid string, req *sip.Request,
}
d.Task.ID = hash
d.OnStart(func() {
task.gb.devices.Set(d)
d.channels.OnAdd(func(c *Channel) {
if absDevice, ok := task.gb.Server.PullProxies.SafeFind(func(absDevice m7s.IPullProxy) bool {
conf := absDevice.GetConfig()
return conf.Type == "gb28181" && conf.URL == fmt.Sprintf("%s/%s", d.DeviceId, c.ChannelId)
}); ok {
c.PullProxyTask = absDevice.(*PullProxy)
absDevice.ChangeStatus(m7s.PullProxyStatusOnline)
}
})
})
d.OnDispose(func() {
d.Status = DeviceOfflineStatus
if task.gb.devices.RemoveByKey(d.DeviceId) {
for c := range d.channels.Range {
if c.PullProxyTask != nil {
c.PullProxyTask.ChangeStatus(m7s.PullProxyStatusOffline)
}
}
d.channels.OnAdd(func(c *Channel) {
if absDevice, ok := task.gb.Server.PullProxies.SafeFind(func(absDevice m7s.IPullProxy) bool {
conf := absDevice.GetConfig()
return conf.Type == "gb28181" && conf.URL == fmt.Sprintf("%s/%s", d.DeviceId, c.ChannelId)
}); ok {
c.PullProxyTask = absDevice.(*PullProxy)
absDevice.ChangeStatus(m7s.PullProxyStatusOnline)
}
})
task.gb.AddTask(d).WaitStarted()
task.gb.devices.Add(d).WaitStarted()
if task.gb.DB != nil {
var existing Device