From 96b9cbfc080de877de2afc1cbff8b8a539625e1d Mon Sep 17 00:00:00 2001 From: pggiroro Date: Sun, 3 Aug 2025 20:34:31 +0800 Subject: [PATCH] fix: gb28181 update use taskManager --- plugin/gb28181/api.go | 4 +- plugin/gb28181/device.go | 106 +++++------- plugin/gb28181/index.go | 269 +++++++++++++++--------------- plugin/gb28181/pkg/groupsmodel.go | 26 +-- plugin/gb28181/registerhandler.go | 38 ++--- 5 files changed, 200 insertions(+), 243 deletions(-) diff --git a/plugin/gb28181/api.go b/plugin/gb28181/api.go index 2cbb20c..857a7b0 100644 --- a/plugin/gb28181/api.go +++ b/plugin/gb28181/api.go @@ -836,7 +836,7 @@ func (gb *GB28181Plugin) AddPlatform(ctx context.Context, req *pb.Platform) (*pb // 创建Platform实例 platform := NewPlatform(platformModel, gb, false) // 添加到任务系统 - gb.AddTask(platform) + gb.platforms.Add(platform) } resp.Code = 0 @@ -983,7 +983,7 @@ func (gb *GB28181Plugin) UpdatePlatform(ctx context.Context, req *pb.Platform) ( // 创建新的Platform实例 platformInstance := NewPlatform(&platform, gb, false) // 添加到任务系统 - gb.AddTask(platformInstance) + gb.platforms.Add(platformInstance) } else { // 如果平台被禁用,停止并移除旧的platform实例 if oldPlatform, ok := gb.platforms.Get(platform.ServerGBID); ok { diff --git a/plugin/gb28181/device.go b/plugin/gb28181/device.go index 736918c..5d2f93b 100644 --- a/plugin/gb28181/device.go +++ b/plugin/gb28181/device.go @@ -31,6 +31,33 @@ const ( DeviceAlarmedStatus DeviceStatus = "ALARMED" ) +type DeviceKeepaliveTickTask struct { + task.TickTask + device *Device + seconds time.Duration +} + +func (d *DeviceKeepaliveTickTask) GetTickInterval() time.Duration { + return d.seconds +} + +func (d *DeviceKeepaliveTickTask) Tick(any) { + keepaliveSeconds := 60 + if d.device.KeepaliveInterval >= 5 { + keepaliveSeconds = d.device.KeepaliveInterval + } + d.Debug("keepLiveTick,deviceid is", d.device.DeviceId, "d.KeepaliveTime is ", d.device.KeepaliveTime, "d.KeepaliveInterval is ", d.device.KeepaliveInterval, "d.KeepaliveCount is ", d.device.KeepaliveCount) + if timeDiff := time.Since(d.device.KeepaliveTime); timeDiff > time.Duration(d.device.KeepaliveCount*keepaliveSeconds)*time.Second { + d.device.Online = false + d.device.Status = DeviceOfflineStatus + // 设置所有通道状态为off + d.device.channels.Range(func(channel *Channel) bool { + channel.Status = "OFF" + return true + }) + } +} + type Device struct { task.Job `gorm:"-:all"` DeviceId string `gorm:"primaryKey"` // 设备国标编号 @@ -46,7 +73,8 @@ type Device struct { Online bool // 是否在线,true为在线,false为离线 RegisterTime time.Time // 注册时间 KeepaliveTime time.Time // 心跳时间 - KeepaliveInterval int `gorm:"default:60"` // 心跳间隔 + KeepaliveInterval int `gorm:"default:60" default:"60"` // 心跳间隔 + KeepaliveCount int `gorm:"default:3" default:"3"` // 心跳次数 ChannelCount int // 通道个数 Expires int // 注册有效期 CreateTime time.Time // 创建时间 @@ -91,6 +119,8 @@ func (d *Device) TableName() string { } func (d *Device) Dispose() { + //d.Online = false + //d.Status = DeviceOfflineStatus if d.plugin.DB != nil { // 先删除该设备关联的所有channels if err := d.plugin.DB.Where("device_id = ?", d.DeviceId).Delete(&gb28181.DeviceChannel{}).Error; err != nil { @@ -103,6 +133,9 @@ func (d *Device) Dispose() { if err := d.plugin.DB.Create(channel.DeviceChannel).Error; err != nil { d.Error("保存设备通道记录失败", "error", err) } + if channel.PullProxyTask != nil { + channel.PullProxyTask.ChangeStatus(m7s.PullProxyStatusOffline) + } d.plugin.channels.RemoveByKey(channel.ID) return true }) @@ -110,7 +143,6 @@ func (d *Device) Dispose() { // 保存设备信息 d.plugin.DB.Save(d) } - d.plugin.devices.RemoveByKey(d.DeviceId) } func (d *Device) GetKey() string { @@ -168,7 +200,7 @@ func (d *Device) onMessage(req *sip.Request, tx sip.ServerTransaction, msg *gb28 case "Keepalive": d.KeepaliveInterval = int(time.Since(d.KeepaliveTime).Seconds()) d.KeepaliveTime = time.Now() - d.Trace("into keeplive,deviceid is ", d.DeviceId, "d.KeepaliveTime is", d.KeepaliveTime) + d.Debug("into keeplive,deviceid is ", d.DeviceId, "d.KeepaliveTime is", d.KeepaliveTime, "d.KeepaliveInterval is", d.KeepaliveInterval) if d.plugin.DB != nil { if err := d.plugin.DB.Model(d).Updates(map[string]interface{}{ "keepalive_interval": d.KeepaliveInterval, @@ -405,75 +437,21 @@ func (d *Device) Go() (err error) { if d.SubscribeCatalog > 0 { catalogSubTask := NewCatalogSubscribeTask(d) d.AddTask(catalogSubTask) + catalogSubTask.Depend(d) } // 创建并启动位置订阅任务 if d.SubscribePosition > 0 { positionSubTask := NewPositionSubscribeTask(d) d.AddTask(positionSubTask) + positionSubTask.Depend(d) } - - catalogTick := time.NewTicker(time.Minute * 1000) - keepaliveSeconds := 60 - if d.KeepaliveInterval >= 5 { - keepaliveSeconds = d.KeepaliveInterval - } - keepLiveTick := time.NewTicker(time.Second * 10) - defer keepLiveTick.Stop() - defer catalogTick.Stop() - for { - select { - case <-d.Done(): - case <-keepLiveTick.C: - d.Trace("keepLiveTick,deviceid is", d.DeviceId, "d.KeepaliveTime is ", d.KeepaliveTime) - if timeDiff := time.Since(d.KeepaliveTime); timeDiff > time.Duration(3*keepaliveSeconds)*time.Second { - d.Online = false - d.Status = DeviceOfflineStatus - // 设置所有通道状态为off - d.channels.Range(func(channel *Channel) bool { - channel.Status = "OFF" - return true - }) - //d.Stop(fmt.Errorf("device keepalive timeout after %v,deviceid is %s", timeDiff, d.DeviceId)) - return - } - case <-catalogTick.C: - if time.Since(d.KeepaliveTime) > time.Second*time.Duration(d.Expires) { - d.Error("keepalive timeout", "keepaliveTime", d.KeepaliveTime) - return - } - response, err = d.catalog() - if err != nil { - d.Error("catalog", "err", err) - } else { - d.Trace("catalogTick", "response", response.String()) - } - //case event := <-d.eventChan: - // d.Debug("eventChan", "event", event) - // switch v := event.(type) { - // case []gb28181.DeviceChannel: - // for _, c := range v { - // //当父设备非空且存在时、父设备节点增加通道 - // if c.ParentId != "" { - // path := strings.Split(c.ParentId, "/") - // parentId := path[len(path)-1] - // //如果父ID并非本身所属设备,一般情况下这是因为下级设备上传了目录信息,该信息通常不需要处理。 - // // 暂时不考虑级联目录的实现 - // if d.DeviceId != parentId { - // if parent, ok := d.plugin.devices.Get(parentId); ok { - // parent.addOrUpdateChannel(c) - // continue - // } else { - // c.Model = "Directory " + c.Model - // c.Status = "NoParent" - // } - // } - // } - // d.addOrUpdateChannel(c) - // } - // } - } + deviceKeepaliveTickTask := &DeviceKeepaliveTickTask{ + seconds: time.Second * 30, + device: d, } + d.AddTask(deviceKeepaliveTickTask) + return deviceKeepaliveTickTask.WaitStopped() } func (d *Device) CreateRequest(Method sip.RequestMethod, Recipient any) *sip.Request { diff --git a/plugin/gb28181/index.go b/plugin/gb28181/index.go index 80cf6c1..43b5cdc 100644 --- a/plugin/gb28181/index.go +++ b/plugin/gb28181/index.go @@ -150,11 +150,15 @@ func (gb *GB28181Plugin) OnInit() (err error) { return pkg.ErrNoDB } gb.Info("GB28181 initing", gb.Platforms) - gb.AddTask(&gb.deviceRegisterManager) logger := zerolog.New(os.Stdout) gb.ua, err = sipgo.NewUA(sipgo.WithUserAgent("M7S/" + m7s.Version)) // Build user agent // Creating client handle for ua if len(gb.Sip.ListenAddr) > 0 { + gb.AddTask(&gb.devices) + gb.AddTask(&gb.platforms) + gb.AddTask(&gb.dialogs) + gb.AddTask(&gb.forwardDialogs) + gb.AddTask(&gb.deviceRegisterManager) gb.server, _ = sipgo.NewServer(gb.ua, sipgo.WithServerLogger(logger)) // Creating server handle for ua gb.server.OnMessage(gb.OnMessage) gb.server.OnRegister(gb.OnRegister) @@ -274,145 +278,132 @@ func (gb *GB28181Plugin) checkDeviceExpire() (err error) { now := time.Now() for _, device := range devices { - if device.Online { - // 检查设备是否过期 - expireTime := device.RegisterTime.Add(time.Duration(device.Expires) * time.Second) - isExpired := now.After(expireTime) + // 检查设备是否过期 + expireTime := device.RegisterTime.Add(time.Duration(device.Expires) * time.Second) + isExpired := now.After(expireTime) - // 设置设备基本属性 - device.Status = DeviceOfflineStatus - if !isExpired { - device.Status = DeviceOnlineStatus - } - device.Online = !isExpired - - // 设置事件通道 - device.eventChan = make(chan any, 10) - - // 设置Logger - device.Logger = gb.Logger.With("deviceid", device.DeviceId) - - // 初始化通道集合 - device.channels.L = new(sync.RWMutex) - - // 初始化目录请求集合 - device.catalogReqs.L = new(sync.RWMutex) - - // 设置plugin引用 - device.plugin = gb - - // 设置联系人头信息 - device.contactHDR = sip.ContactHeader{ - Address: sip.Uri{ - User: gb.Serial, - Host: device.SipIp, - Port: device.LocalPort, - }, - } - - // 设置来源头信息 - device.fromHDR = sip.FromHeader{ - Address: sip.Uri{ - User: gb.Serial, - Host: device.SipIp, - Port: device.LocalPort, - }, - Params: sip.NewParams(), - } - device.fromHDR.Params.Add("tag", sip.GenerateTagN(16)) - - // 设置接收者 - device.Recipient = sip.Uri{ - Host: device.IP, - Port: device.Port, - User: device.DeviceId, - } - - // 创建SIP客户端 - device.client, _ = sipgo.NewClient(gb.ua, sipgo.WithClientLogger(zerolog.New(os.Stdout)), sipgo.WithClientHostname(device.SipIp)) - device.Info("checkDeviceExpire", "d.SipIp", device.SipIp, "d.LocalPort", device.LocalPort, "d.contactHDR", device.contactHDR) - - // 设置设备ID的hash值作为任务ID - var hash uint32 - for i := 0; i < len(device.DeviceId); i++ { - ch := device.DeviceId[i] - hash = hash*31 + uint32(ch) - } - device.Task.ID = hash - // 设置启动和销毁回调 - device.OnStart(func() { - gb.devices.Set(device) - }) - device.channels.OnAdd(func(c *Channel) { - if absDevice, ok := gb.Server.PullProxies.SafeFind(func(absDevice m7s.IPullProxy) bool { - conf := absDevice.GetConfig() - return conf.Type == "gb28181" && conf.URL == fmt.Sprintf("%s/%s", device.DeviceId, c.ChannelId) - }); ok { - c.PullProxyTask = absDevice.(*PullProxy) - absDevice.ChangeStatus(m7s.PullProxyStatusOnline) - } - }) - device.OnDispose(func() { - device.Online = false - device.Status = DeviceOfflineStatus - if gb.devices.RemoveByKey(device.DeviceId) { - for c := range device.channels.Range { - c.DeviceChannel.Status = "OFF" - if c.PullProxyTask != nil { - c.PullProxyTask.ChangeStatus(m7s.PullProxyStatusOffline) - } - } - } - }) - - // 加载设备的通道 - var channels []gb28181.DeviceChannel - if err := gb.DB.Where(&gb28181.DeviceChannel{DeviceId: device.DeviceId}).Find(&channels).Error; err != nil { - gb.Error("加载通道失败", "error", err, "deviceId", device.DeviceId) - continue - } - - if gb.SipIP != "" { - device.SipIp = gb.SipIP - } - if gb.MediaIP != "" { - device.MediaIp = gb.MediaIP - } - - // 更新设备状态到数据库 - if err := gb.DB.Model(&Device{}).Where(&Device{DeviceId: device.DeviceId}).Updates(map[string]interface{}{ - "online": device.Online, - "status": device.Status, - }).Error; err != nil { - gb.Error("更新设备状态到数据库失败", "error", err, "deviceId", device.DeviceId) - } - - // 初始化设备通道并更新到数据库 - for _, channel := range channels { - if isExpired { - channel.Status = "OFF" - } else { - channel.Status = "ON" - } - // 更新通道状态到数据库 - if err := gb.DB.Model(&gb28181.DeviceChannel{}).Where(&gb28181.DeviceChannel{ID: channel.ID}).Update("status", channel.Status).Error; err != nil { - gb.Error("更新通道状态到数据库失败", "error", err, "channelId", channel.ChannelId) - } - device.addOrUpdateChannel(channel) - } - - // 添加设备任务 - if !isExpired { - gb.AddTask(device) - gb.Info("设备有效", "deviceId", device.DeviceId, "registerTime", device.RegisterTime, "expireTime", expireTime) - } else { - gb.Info("设备已过期", "deviceId", device.DeviceId, "registerTime", device.RegisterTime, "expireTime", expireTime) - gb.deleteDevice(device, "设备已过期") - } - } else { - // 不在线的设备,进行数据库删除 - gb.deleteDevice(device, "设备不在线") + // 设置设备基本属性 + device.Status = DeviceOfflineStatus + if !isExpired { + device.Status = DeviceOnlineStatus } + device.Online = !isExpired + + // 设置事件通道 + device.eventChan = make(chan any, 10) + + // 设置Logger + device.Logger = gb.Logger.With("deviceid", device.DeviceId) + + // 初始化通道集合 + device.channels.L = new(sync.RWMutex) + + // 初始化目录请求集合 + device.catalogReqs.L = new(sync.RWMutex) + + // 设置plugin引用 + device.plugin = gb + + // 设置联系人头信息 + device.contactHDR = sip.ContactHeader{ + Address: sip.Uri{ + User: gb.Serial, + Host: device.SipIp, + Port: device.LocalPort, + }, + } + + // 设置来源头信息 + device.fromHDR = sip.FromHeader{ + Address: sip.Uri{ + User: gb.Serial, + Host: device.SipIp, + Port: device.LocalPort, + }, + Params: sip.NewParams(), + } + device.fromHDR.Params.Add("tag", sip.GenerateTagN(16)) + + // 设置接收者 + device.Recipient = sip.Uri{ + Host: device.IP, + Port: device.Port, + User: device.DeviceId, + } + + // 创建SIP客户端 + device.client, _ = sipgo.NewClient(gb.ua, sipgo.WithClientLogger(zerolog.New(os.Stdout)), sipgo.WithClientHostname(device.SipIp)) + device.Info("checkDeviceExpire", "d.SipIp", device.SipIp, "d.LocalPort", device.LocalPort, "d.contactHDR", device.contactHDR) + + // 设置设备ID的hash值作为任务ID + var hash uint32 + for i := 0; i < len(device.DeviceId); i++ { + ch := device.DeviceId[i] + hash = hash*31 + uint32(ch) + } + device.Task.ID = hash + device.channels.OnAdd(func(c *Channel) { + if absDevice, ok := gb.Server.PullProxies.SafeFind(func(absDevice m7s.IPullProxy) bool { + conf := absDevice.GetConfig() + return conf.Type == "gb28181" && conf.URL == fmt.Sprintf("%s/%s", device.DeviceId, c.ChannelId) + }); ok { + c.PullProxyTask = absDevice.(*PullProxy) + absDevice.ChangeStatus(m7s.PullProxyStatusOnline) + } + }) + //device.OnDispose(func() { + // device.Online = false + // device.Status = DeviceOfflineStatus + // if gb.devices.RemoveByKey(device.DeviceId) { + // for c := range device.channels.Range { + // c.DeviceChannel.Status = "OFF" + // if c.PullProxyTask != nil { + // c.PullProxyTask.ChangeStatus(m7s.PullProxyStatusOffline) + // } + // } + // } + //}) + + // 加载设备的通道 + var channels []gb28181.DeviceChannel + if err := gb.DB.Where(&gb28181.DeviceChannel{DeviceId: device.DeviceId}).Find(&channels).Error; err != nil { + gb.Error("加载通道失败", "error", err, "deviceId", device.DeviceId) + continue + } + + if gb.SipIP != "" { + device.SipIp = gb.SipIP + } + if gb.MediaIP != "" { + device.MediaIp = gb.MediaIP + } + + // 更新设备状态到数据库 + if err := gb.DB.Model(&Device{}).Where(&Device{DeviceId: device.DeviceId}).Updates(map[string]interface{}{ + "online": device.Online, + "status": device.Status, + }).Error; err != nil { + gb.Error("更新设备状态到数据库失败", "error", err, "deviceId", device.DeviceId) + } + + // 初始化设备通道并更新到数据库 + for _, channel := range channels { + if isExpired { + channel.Status = "OFF" + } else { + channel.Status = "ON" + } + // 更新通道状态到数据库 + if err := gb.DB.Model(&gb28181.DeviceChannel{}).Where(&gb28181.DeviceChannel{ID: channel.ID}).Update("status", channel.Status).Error; err != nil { + gb.Error("更新通道状态到数据库失败", "error", err, "channelId", channel.ChannelId) + } + device.addOrUpdateChannel(channel) + } + + // 添加设备任务 + gb.devices.Add(device) + gb.Info("设备有效", "deviceId", device.DeviceId, "registerTime", device.RegisterTime, "expireTime", expireTime) + } return nil } @@ -480,7 +471,7 @@ func (gb *GB28181Plugin) checkPlatform() { // gb.Error("unregister err ", err) //} // 添加到任务系统 - gb.AddTask(platform) + gb.platforms.Add(platform) gb.Info("平台初始化完成", "ID", platformModel.ServerGBID, "Name", platformModel.Name) } } diff --git a/plugin/gb28181/pkg/groupsmodel.go b/plugin/gb28181/pkg/groupsmodel.go index ad8d085..0bbc472 100644 --- a/plugin/gb28181/pkg/groupsmodel.go +++ b/plugin/gb28181/pkg/groupsmodel.go @@ -96,16 +96,16 @@ func AutoMigrateAll(db interface{}) error { return nil } -// BeforeCreate GORM钩子,在创建记录前设置创建时间和更新时间 -func (g *GroupsModel) BeforeCreate() error { - now := time.Now() - g.CreateTime = now - g.UpdateTime = now - return nil -} - -// BeforeUpdate GORM钩子,在更新记录前设置更新时间 -func (g *GroupsModel) BeforeUpdate() error { - g.UpdateTime = time.Now() - return nil -} +//// BeforeCreate GORM钩子,在创建记录前设置创建时间和更新时间 +//func (g *GroupsModel) BeforeCreate() error { +// now := time.Now() +// g.CreateTime = now +// g.UpdateTime = now +// return nil +//} +// +//// BeforeUpdate GORM钩子,在更新记录前设置更新时间 +//func (g *GroupsModel) BeforeUpdate() error { +// g.UpdateTime = time.Now() +// return nil +//} diff --git a/plugin/gb28181/registerhandler.go b/plugin/gb28181/registerhandler.go index 1a2e2c7..63d0f49 100644 --- a/plugin/gb28181/registerhandler.go +++ b/plugin/gb28181/registerhandler.go @@ -256,7 +256,7 @@ func (task *registerHandlerTask) RecoverDevice(d *Device, req *sip.Request) { myLanIP := myip.InternalIPv4() myWanIP := myip.ExternalIPv4() - task.gb.Info("Start RecoverDevice", "source", source, "desc", desc, "myLanIP", myLanIP, "myWanIP", myWanIP) + task.gb.Info("Start RecoverDevice", "source", source, "desc", desc, "myLanIP", myLanIP, "myWanIP", myWanIP, "deviceid", d.DeviceId) // 处理目标地址和源地址的IP映射关系 if sourceIPParse != nil { // 源IP有效时才进行处理 @@ -264,7 +264,7 @@ func (task *registerHandlerTask) RecoverDevice(d *Device, req *sip.Request) { if sourceIPParse.IsPrivate() { // 源IP是内网IP myWanIP = myLanIP // 使用内网IP作为外网IP } - } else { // 目标地址是IP + } else { // 目标地址是IP if sourceIPParse.IsPrivate() { // 源IP是内网IP myLanIP, myWanIP = myIP, myIP // 使用目标IP作为内外网IP } @@ -299,13 +299,14 @@ func (task *registerHandlerTask) RecoverDevice(d *Device, req *sip.Request) { d.HostAddress = d.IP + ":" + sourcePortStr d.Status = DeviceOnlineStatus d.UpdateTime = time.Now() + d.KeepaliveTime = time.Now() d.RegisterTime = time.Now() d.Online = true d.client, _ = sipgo.NewClient(task.gb.ua, sipgo.WithClientLogger(zerolog.New(os.Stdout)), sipgo.WithClientHostname(d.SipIp)) d.channels.L = new(sync.RWMutex) d.catalogReqs.L = new(sync.RWMutex) d.plugin = task.gb - d.plugin.Info("RecoverDevice", "source", source, "desc", desc, "device.SipIp", myLanIP, "device.WanIP", myWanIP, "recipient", req.Recipient, "myPort", myPort) + d.plugin.Info("RecoverDevice", "source", source, "desc", desc, "device.SipIp", myLanIP, "device.WanIP", myWanIP, "recipient", req.Recipient, "myPort", myPort, "deviceid", d.DeviceId) if task.gb.DB != nil { //var existing Device @@ -371,7 +372,7 @@ func (task *registerHandlerTask) StoreDevice(deviceid string, req *sip.Request, if sourceIPParse.IsPrivate() { // 源IP是内网IP myWanIP = myLanIP // 使用内网IP作为外网IP } - } else { // 目标地址是IP + } else { // 目标地址是IP if sourceIPParse.IsPrivate() { // 源IP是内网IP myLanIP, myWanIP = myIP, myIP // 使用目标IP作为内外网IP } @@ -441,29 +442,16 @@ func (task *registerHandlerTask) StoreDevice(deviceid string, req *sip.Request, } d.Task.ID = hash - d.OnStart(func() { - task.gb.devices.Set(d) - d.channels.OnAdd(func(c *Channel) { - if absDevice, ok := task.gb.Server.PullProxies.SafeFind(func(absDevice m7s.IPullProxy) bool { - conf := absDevice.GetConfig() - return conf.Type == "gb28181" && conf.URL == fmt.Sprintf("%s/%s", d.DeviceId, c.ChannelId) - }); ok { - c.PullProxyTask = absDevice.(*PullProxy) - absDevice.ChangeStatus(m7s.PullProxyStatusOnline) - } - }) - }) - d.OnDispose(func() { - d.Status = DeviceOfflineStatus - if task.gb.devices.RemoveByKey(d.DeviceId) { - for c := range d.channels.Range { - if c.PullProxyTask != nil { - c.PullProxyTask.ChangeStatus(m7s.PullProxyStatusOffline) - } - } + d.channels.OnAdd(func(c *Channel) { + if absDevice, ok := task.gb.Server.PullProxies.SafeFind(func(absDevice m7s.IPullProxy) bool { + conf := absDevice.GetConfig() + return conf.Type == "gb28181" && conf.URL == fmt.Sprintf("%s/%s", d.DeviceId, c.ChannelId) + }); ok { + c.PullProxyTask = absDevice.(*PullProxy) + absDevice.ChangeStatus(m7s.PullProxyStatusOnline) } }) - task.gb.AddTask(d).WaitStarted() + task.gb.devices.Add(d).WaitStarted() if task.gb.DB != nil { var existing Device