feature: reinit device from db

This commit is contained in:
pg
2025-03-06 21:50:28 +08:00
committed by pggiroro
parent 1764a9f7e7
commit 4a52cc89bc
4 changed files with 189 additions and 28 deletions

View File

@@ -5,6 +5,7 @@ import (
"fmt"
"net/http"
"os"
"sort"
"strings"
"sync"
"time"
@@ -463,7 +464,7 @@ func (gb *GB28181ProPlugin) SyncDevice(ctx context.Context, req *pb.SyncDeviceRe
// 初始化 SIP 客户端
d.client, _ = sipgo.NewClient(gb.ua, sipgo.WithClientLogger(zerolog.New(os.Stdout)), sipgo.WithClientHostname(d.LocalIP))
if d.client != nil {
d.dialogClient = sipgo.NewDialogClient(d.client, d.contactHDR)
d.dialogClient = sipgo.NewDialogClientCache(d.client, d.contactHDR)
} else {
return resp, fmt.Errorf("failed to create sip client")
}
@@ -1129,6 +1130,11 @@ func (gb *GB28181ProPlugin) QueryRecord(ctx context.Context, req *pb.QueryRecord
resp.Code = 0
resp.Message = fmt.Sprintf("success, received %d/%d records", recordReq.ReceivedNum, recordReq.SumNum)
// 排序录像列表按StartTime升序排序
sort.Slice(resp.Records, func(i, j int) bool {
return resp.Records[i].StartTime < resp.Records[j].StartTime
})
// 清理请求
channel.RecordReqs.Remove(recordReq)
@@ -1219,7 +1225,7 @@ func (gb *GB28181ProPlugin) TestSip(ctx context.Context, req *pb.TestSipRequest)
resp.Message = "failed to create sip client"
return resp, nil
}
device.dialogClient = sipgo.NewDialogClient(device.client, device.contactHDR)
device.dialogClient = sipgo.NewDialogClientCache(device.client, device.contactHDR)
// 构建目标URI
recipient := sip.Uri{

View File

@@ -70,7 +70,7 @@ type Device struct {
Longitude, Latitude string // 经度,纬度
eventChan chan any
client *sipgo.Client
dialogClient *sipgo.DialogClient
dialogClient *sipgo.DialogClientCache
contactHDR sip.ContactHeader
fromHDR sip.FromHeader
toHDR sip.ToHeader

View File

@@ -44,7 +44,7 @@ type GB28181ProPlugin struct {
m7s.Plugin
AutoInvite bool `default:"true" desc:"自动邀请"`
Serial string `default:"34020000002000000001" desc:"sip 服务 id"` //sip 服务器 id, 默认 34020000002000000001
Realm string `default:"3402000000" desc:"sip 服务域"` //sip 服务器域,默认 3402000000
Realm string `default:"3402000000" desc:"sip 服务域"` //sip 服务器域,默认 3402000000
Username string
Password string
Sip SipConfig
@@ -75,8 +75,8 @@ func (gb *GB28181ProPlugin) OnInit() (err error) {
// Creating client handle for ua
if len(gb.Sip.ListenAddr) > 0 {
gb.server, _ = sipgo.NewServer(gb.ua, sipgo.WithServerLogger(logger)) // Creating server handle for ua
gb.server.OnRegister(gb.OnRegister)
gb.server.OnMessage(gb.OnMessage)
gb.server.OnRegister(gb.OnRegister)
gb.server.OnBye(gb.OnBye)
gb.devices.L = new(sync.RWMutex)
gb.server.OnInvite(gb.OnInvite)
@@ -120,6 +120,11 @@ func (gb *GB28181ProPlugin) OnInit() (err error) {
gb.Error("检查设备过期状态失败", "error", err)
}
// 初始化数据库中的设备
if err := gb.checkDevices(); err != nil {
gb.Error("检查设备有效性失败", "error", err)
}
// 检查并初始化平台
gb.checkPlatform()
}
@@ -133,6 +138,158 @@ func (gb *GB28181ProPlugin) OnInit() (err error) {
return
}
// InitDevicesFromDB 从数据库中加载并初始化在线设备
func (gb *GB28181ProPlugin) checkDevices() error {
// 检查数据库是否已初始化
if gb.DB == nil {
gb.Warn("InitDevicesFromDB", "warning", "数据库未初始化")
return nil
}
// 查询所有有效设备:在线且注册未过期
var devices []*Device
now := time.Now()
// 先查询所有在线设备
if err := gb.DB.Where("online = ?", true).Find(&devices).Error; err != nil {
gb.Error("InitDevicesFromDB", "error", err.Error())
return err
}
// 过滤出未过期的设备
validDevices := make([]*Device, 0, len(devices))
for _, d := range devices {
expireTime := d.RegisterTime.Add(time.Duration(d.Expires) * time.Second)
if !now.After(expireTime) {
validDevices = append(validDevices, d)
} else {
gb.Debug("InitDevicesFromDB", "跳过过期设备", d.DeviceID, "注册时间", d.RegisterTime, "过期时间", expireTime)
}
}
gb.Info("InitDevicesFromDB", "找到有效设备数量", len(validDevices), "总在线设备数量", len(devices))
// 初始化每个设备
for _, device := range validDevices {
d := device // 创建副本以避免循环变量问题
// 设置设备基本属性
d.Status = DeviceRecoverStatus
// 设置事件通道
d.eventChan = make(chan any, 10)
// 设置Logger
d.Logger = gb.With("deviceid", d.DeviceID)
// 初始化通道集合
d.channels.L = new(sync.RWMutex)
// 设置plugin引用
d.plugin = gb
// 配置SIP相关参数
host := d.LocalIP
if host == "" {
host = myip.InternalIPv4()
d.LocalIP = host
d.mediaIp = host
}
// 获取公网或内网IP配置
if !net.ParseIP(d.IP).IsPrivate() {
host = gb.GetPublicIP(host)
}
// 设置联系人头信息
d.contactHDR = sip.ContactHeader{
Address: sip.Uri{
User: gb.Serial,
Host: host,
Port: d.LocalPort,
},
}
// 设置来源头信息
d.fromHDR = sip.FromHeader{
Address: sip.Uri{
User: gb.Serial,
Host: gb.Realm,
},
Params: sip.NewParams(),
}
d.fromHDR.Params.Add("tag", sip.GenerateTagN(16))
// 设置接收者
d.Recipient = sip.Uri{
Host: d.IP,
Port: d.Port,
User: d.DeviceID,
}
// 创建SIP客户端
d.client, _ = sipgo.NewClient(gb.ua, sipgo.WithClientLogger(zerolog.New(os.Stdout)), sipgo.WithClientHostname(host), sipgo.WithClientPort(d.LocalPort))
d.dialogClient = sipgo.NewDialogClientCache(d.client, d.contactHDR)
// 设置设备ID的hash值作为任务ID
var hash uint32
for i := 0; i < len(d.DeviceID); i++ {
ch := d.DeviceID[i]
hash = hash*31 + uint32(ch)
}
d.Task.ID = hash
// 设置启动和销毁回调
d.OnStart(func() {
gb.devices.Add(d)
d.channels.OnAdd(func(c *Channel) {
if absDevice, ok := gb.Server.PullProxies.Find(func(absDevice *m7s.PullProxy) bool {
return absDevice.Type == "gb28181" && absDevice.URL == fmt.Sprintf("%s/%s", d.DeviceID, c.DeviceID)
}); ok {
c.AbstractDevice = absDevice
absDevice.Handler = c
absDevice.ChangeStatus(m7s.PullProxyStatusOnline)
}
if gb.AutoInvite {
gb.Pull(fmt.Sprintf("%s/%s", d.DeviceID, c.DeviceID), config.Pull{
MaxRetry: 0,
URL: fmt.Sprintf("%s/%s", d.DeviceID, c.DeviceID),
}, nil)
}
})
})
d.OnDispose(func() {
d.Status = DeviceOfflineStatus
if gb.devices.RemoveByKey(d.DeviceID) {
for c := range d.channels.Range {
if c.AbstractDevice != nil {
c.AbstractDevice.ChangeStatus(m7s.PullProxyStatusOffline)
}
}
}
})
// 添加设备任务
gb.AddTask(d)
expireTime := d.RegisterTime.Add(time.Duration(d.Expires) * time.Second)
gb.Info("InitDevicesFromDB", "已初始化设备", d.DeviceID, "注册时间", d.RegisterTime, "过期时间", expireTime)
// 加载设备的通道
var channels []gb28181.DeviceChannel
if err := gb.DB.Where(&gb28181.DeviceChannel{DeviceDBID: d.ID}).Find(&channels).Error; err != nil {
gb.Error("InitDevicesFromDB", "加载通道失败", d.DeviceID, "error", err.Error())
} else {
// 初始化设备通道
for _, channel := range channels {
d.addOrUpdateChannel(channel)
}
gb.Info("InitDevicesFromDB", "已加载通道数量", len(channels), "设备ID", d.DeviceID)
}
}
return nil
}
// checkPlatform 从数据库中查找启用状态的平台,初始化它们,并进行注册和定时任务设置
func (gb *GB28181ProPlugin) checkPlatform() {
// 检查数据库是否初始化
@@ -362,6 +519,17 @@ func (gb *GB28181ProPlugin) OnRegister(req *sip.Request, tx sip.ServerTransactio
}
func (gb *GB28181ProPlugin) OnMessage(req *sip.Request, tx sip.ServerTransaction) {
// 解析消息内容
temp := &gb28181.Message{}
err := gb28181.DecodeXML(temp, req.Body())
if err != nil {
gb.Error("OnMessage", "error", err.Error())
response := sip.NewResponseFromRequest(req, sip.StatusBadRequest, "Bad Request", nil)
if err := tx.Respond(response); err != nil {
gb.Error("respond BadRequest", "error", err.Error())
}
return
}
from := req.From()
if from == nil || from.Address.User == "" {
gb.Error("OnMessage", "error", "no user")
@@ -406,18 +574,6 @@ func (gb *GB28181ProPlugin) OnMessage(req *sip.Request, tx sip.ServerTransaction
return
}
// 解析消息内容
temp := &gb28181.Message{}
err := gb28181.DecodeXML(temp, req.Body())
if err != nil {
gb.Error("OnMessage", "error", err.Error())
response := sip.NewResponseFromRequest(req, sip.StatusBadRequest, "Bad Request", nil)
if err := tx.Respond(response); err != nil {
gb.Error("respond BadRequest", "error", err.Error())
}
return
}
// 根据来源调用不同的处理方法
if d != nil {
d.UpdateTime = time.Now()
@@ -500,7 +656,6 @@ func (gb *GB28181ProPlugin) RecoverDevice(d *Device, req *sip.Request) {
}
func (gb *GB28181ProPlugin) StoreDevice(deviceid string, req *sip.Request) (d *Device) {
from := req.From()
source := req.Source()
desc := req.Destination()
servIp, sPortStr, _ := net.SplitHostPort(desc)
@@ -561,7 +716,7 @@ func (gb *GB28181ProPlugin) StoreDevice(deviceid string, req *sip.Request) (d *D
Recipient: sip.Uri{
Host: hostname,
Port: port,
User: from.Address.User,
User: deviceid,
},
contactHDR: sip.ContactHeader{
Address: sip.Uri{
@@ -585,7 +740,7 @@ func (gb *GB28181ProPlugin) StoreDevice(deviceid string, req *sip.Request) (d *D
d.fromHDR.Params.Add("tag", sip.GenerateTagN(16))
d.client, _ = sipgo.NewClient(gb.ua, sipgo.WithClientLogger(zerolog.New(os.Stdout)), sipgo.WithClientHostname(host), sipgo.WithClientPort(serverPort))
gb.Info("get serverport is ", serverPort)
d.dialogClient = sipgo.NewDialogClient(d.client, d.contactHDR)
d.dialogClient = sipgo.NewDialogClientCache(d.client, d.contactHDR)
d.channels.L = new(sync.RWMutex)
d.Info("StoreDevice", "source", source, "desc", desc, "servIp", servIp, "publicIP", host, "recipient", req.Recipient)

View File

@@ -23,12 +23,12 @@ type Platform struct {
PlatformModel *gb28181.PlatformModel
// SIP相关字段不存储到数据库
Client *sipgo.Client `gorm:"-" json:"-"` // SIP客户端
DialogClient *sipgo.DialogClient `gorm:"-" json:"-"` // SIP对话客户端
Recipient sip.Uri `gorm:"-" json:"-"` // 接收者地址
ContactHDR *sip.ContactHeader `gorm:"-" json:"-"` // 联系人头部
UserAgentHDR sip.Header `gorm:"-" json:"-"`
MaxForwardsHDR sip.MaxForwardsHeader `gorm:"-" json:"-"`
Client *sipgo.Client `gorm:"-" json:"-"` // SIP客户端
DialogClient *sipgo.DialogClientCache `gorm:"-" json:"-"` // SIP对话客户端
Recipient sip.Uri `gorm:"-" json:"-"` // 接收者地址
ContactHDR *sip.ContactHeader `gorm:"-" json:"-"` // 联系人头部
UserAgentHDR sip.Header `gorm:"-" json:"-"`
MaxForwardsHDR sip.MaxForwardsHeader `gorm:"-" json:"-"`
// 运行时字段
KeepAliveReply int `gorm:"-" json:"keepAliveReply"` // KeepAliveReply表示心跳未回复次数
@@ -75,7 +75,7 @@ func NewPlatform(pm *gb28181.PlatformModel, plugin *GB28181ProPlugin) *Platform
p.ContactHDR = &contactHdr
// 创建对话客户端
p.DialogClient = sipgo.NewDialogClient(p.Client, *p.ContactHDR)
p.DialogClient = sipgo.NewDialogClientCache(p.Client, *p.ContactHDR)
p.MaxForwardsHDR = sip.MaxForwardsHeader(70)
p.plugin.platforms.Set(p)
@@ -563,7 +563,7 @@ func (p *Platform) buildChannelItem(channel gb28181.CommonGBChannel) string {
channel.GbRegisterWay, // 直接使用整数值
channel.GbSecrecy, // 直接使用整数值
parentID,
channel.GbParental, // 直接使用整数值
channel.GbParental, // 直接使用整数值
channel.GbSafetyWay) // 直接使用整数值
}