fix: gb28181 register too fast will start too many task

This commit is contained in:
pggiroro
2025-05-22 22:55:49 +08:00
parent 47884b6880
commit 80ad1044e3
2 changed files with 63 additions and 62 deletions

View File

@@ -140,7 +140,7 @@ func (r *CatalogRequest) IsComplete(channelsLength int) bool {
}
func (d *Device) onMessage(req *sip.Request, tx sip.ServerTransaction, msg *gb28181.Message) (err error) {
d.Debug("into onMessage,deviceid is ", d.DeviceId)
d.plugin.Debug("into onMessage,deviceid is ", d.DeviceId)
source := req.Source()
hostname, portStr, _ := net.SplitHostPort(source)
port, _ := strconv.Atoi(portStr)

View File

@@ -42,7 +42,7 @@ type GB28181Plugin struct {
pb.UnimplementedApiServer
m7s.Plugin
Serial string `default:"34020000002000000001" desc:"sip 服务 id"` //sip 服务器 id, 默认 34020000002000000001
Realm string `default:"3402000000" desc:"sip 服务域"` //sip 服务器域,默认 3402000000
Realm string `default:"3402000000" desc:"sip 服务域"` //sip 服务器域,默认 3402000000
Password string
Sip SipConfig
MediaPort util.Range[uint16] `default:"10001-20000" desc:"媒体端口范围"` //媒体端口范围
@@ -545,13 +545,17 @@ func (gb *GB28181Plugin) OnRegister(req *sip.Request, tx sip.ServerTransaction)
d.Stop(errors.New("unregister"))
}
} else {
if d, ok := gb.devices.Get(deviceid); ok && d.Online {
if d, ok := gb.devices.Get(deviceid); ok {
gb.Info("into recoverdevice", "deviceId", d.DeviceId)
d.Status = DeviceOnlineStatus
gb.RecoverDevice(d, req)
gb.StoreDevice(deviceid, req, d)
} else {
d := &Device{
DeviceId: deviceid,
}
gb.devices.Set(d)
gb.Info("into StoreDevice", "deviceId", from)
gb.StoreDevice(deviceid, req)
gb.StoreDevice(deviceid, req, d)
}
}
}
@@ -593,7 +597,7 @@ func (gb *GB28181Plugin) OnMessage(req *sip.Request, tx sip.ServerTransaction) {
gb.Debug("00000000000001,deviceid is ", id)
// 如果设备和平台都存在,通过源地址判断真实来源
if d != nil && p != nil {
if d != nil && d.Online && p != nil {
source := req.Source()
if d.HostAddress == source {
// 如果源地址匹配设备地址,则确认是设备消息
@@ -606,7 +610,7 @@ func (gb *GB28181Plugin) OnMessage(req *sip.Request, tx sip.ServerTransaction) {
gb.Debug("00000000000002,deviceid is ", id)
// 如果既不是设备也不是平台返回404
if d == nil && p == nil {
if (d == nil && p == nil) || (d != nil && !d.Online) {
var response *sip.Response
gb.Info("OnMessage", "error", "device/platform not found", "id", id)
response = sip.NewResponseFromRequest(req, sip.StatusNotFound, "Not Found", nil)
@@ -619,7 +623,7 @@ func (gb *GB28181Plugin) OnMessage(req *sip.Request, tx sip.ServerTransaction) {
gb.Debug("00000000000003,deviceid is ", id)
// 根据来源调用不同的处理方法
if d != nil {
if d != nil && d.Online {
d.UpdateTime = time.Now()
if err = d.onMessage(req, tx, temp); err != nil {
gb.Error("onMessage", "error", err.Error(), "type", "device,deviceid is", d.DeviceId)
@@ -750,7 +754,7 @@ func (gb *GB28181Plugin) RecoverDevice(d *Device, req *sip.Request) {
if sourceIPParse.IsPrivate() { // 源IP是内网IP
myWanIP = myLanIP // 使用内网IP作为外网IP
}
} else { // 目标地址是IP
} else { // 目标地址是IP
if sourceIPParse.IsPrivate() { // 源IP是内网IP
myLanIP, myWanIP = myIP, myIP // 使用目标IP作为内外网IP
}
@@ -790,21 +794,22 @@ func (gb *GB28181Plugin) RecoverDevice(d *Device, req *sip.Request) {
d.client, _ = sipgo.NewClient(gb.ua, sipgo.WithClientLogger(zerolog.New(os.Stdout)), sipgo.WithClientHostname(d.SipIp))
d.channels.L = new(sync.RWMutex)
d.catalogReqs.L = new(sync.RWMutex)
d.Info("StoreDevice", "source", source, "desc", desc, "device.SipIp", myLanIP, "device.WanIP", myWanIP, "recipient", req.Recipient, "myPort", myPort)
d.plugin = gb
d.plugin.Info("RecoverDevice", "source", source, "desc", desc, "device.SipIp", myLanIP, "device.WanIP", myWanIP, "recipient", req.Recipient, "myPort", myPort)
if gb.DB != nil {
var existing Device
if err := gb.DB.First(&existing, Device{DeviceId: d.DeviceId}).Error; err == nil {
d.ID = existing.ID // 保持原有的自增ID
gb.Info("RecoverDevice", "type", "更新设备", "deviceId", d.DeviceId)
} else {
gb.Info("RecoverDevice", "type", "新增设备", "deviceId", d.DeviceId)
}
//var existing Device
//if err := gb.DB.First(&existing, Device{DeviceId: d.DeviceId}).Error; err == nil {
// d.ID = existing.ID // 保持原有的自增ID
// gb.Info("RecoverDevice", "type", "更新设备", "deviceId", d.DeviceId)
//} else {
// gb.Info("RecoverDevice", "type", "新增设备", "deviceId", d.DeviceId)
//}
gb.DB.Save(d)
}
}
func (gb *GB28181Plugin) StoreDevice(deviceid string, req *sip.Request) (d *Device) {
func (gb *GB28181Plugin) StoreDevice(deviceid string, req *sip.Request, d *Device) {
source := req.Source()
sourceIP, sourcePortStr, _ := net.SplitHostPort(source)
sourcePort, _ := strconv.Atoi(sourcePortStr)
@@ -854,7 +859,7 @@ func (gb *GB28181Plugin) StoreDevice(deviceid string, req *sip.Request) (d *Devi
if sourceIPParse.IsPrivate() { // 源IP是内网IP
myWanIP = myLanIP // 使用内网IP作为外网IP
}
} else { // 目标地址是IP
} else { // 目标地址是IP
if sourceIPParse.IsPrivate() { // 源IP是内网IP
myLanIP, myWanIP = myIP, myIP // 使用目标IP作为内外网IP
}
@@ -869,48 +874,45 @@ func (gb *GB28181Plugin) StoreDevice(deviceid string, req *sip.Request) (d *Devi
}
now := time.Now()
d = &Device{
DeviceId: deviceid,
CreateTime: now,
UpdateTime: now,
RegisterTime: now,
KeepaliveTime: now,
Status: DeviceOnlineStatus,
Online: true,
StreamMode: "TCP-PASSIVE", // 默认UDP传输
Charset: "GB2312", // 默认GB2312字符集
GeoCoordSys: "WGS84", // 默认WGS84坐标系
Transport: req.Transport(), // 传输协议
IP: sourceIP,
Port: sourcePort,
HostAddress: sourceIP + ":" + sourcePortStr,
SipIp: myLanIP,
MediaIp: myWanIP,
Expires: int(expSec),
eventChan: make(chan any, 10),
Recipient: sip.Uri{
Host: sourceIP,
Port: sourcePort,
User: deviceid,
},
contactHDR: sip.ContactHeader{
Address: sip.Uri{
User: gb.Serial,
Host: myWanIP,
Port: myPort,
},
},
fromHDR: sip.FromHeader{
Address: sip.Uri{
User: gb.Serial,
Host: myWanIP,
Port: myPort,
},
Params: sip.NewParams(),
},
plugin: gb,
LocalPort: myPort,
d.CreateTime = now
d.UpdateTime = now
d.RegisterTime = now
d.KeepaliveTime = now
d.Status = DeviceOnlineStatus
d.Online = true
d.StreamMode = "TCP-PASSIVE" // 默认UDP传输
d.Charset = "GB2312" // 默认GB2312字符集
d.GeoCoordSys = "WGS84" // 默认WGS84坐标系
d.Transport = req.Transport() // 传输协议
d.IP = sourceIP
d.Port = sourcePort
d.HostAddress = sourceIP + ":" + sourcePortStr
d.SipIp = myLanIP
d.MediaIp = myWanIP
d.Expires = int(expSec)
d.eventChan = make(chan any, 10)
d.Recipient = sip.Uri{
Host: sourceIP,
Port: sourcePort,
User: deviceid,
}
d.contactHDR = sip.ContactHeader{
Address: sip.Uri{
User: gb.Serial,
Host: myWanIP,
Port: myPort,
},
}
d.fromHDR = sip.FromHeader{
Address: sip.Uri{
User: gb.Serial,
Host: myWanIP,
Port: myPort,
},
Params: sip.NewParams(),
}
d.plugin = gb
d.LocalPort = myPort
d.Logger = gb.With("deviceid", deviceid)
d.fromHDR.Params.Add("tag", sip.GenerateTagN(16))
@@ -949,7 +951,7 @@ func (gb *GB28181Plugin) StoreDevice(deviceid string, req *sip.Request) (d *Devi
}
}
})
gb.AddTask(d)
gb.AddTask(d).WaitStarted()
if gb.DB != nil {
var existing Device
@@ -962,7 +964,6 @@ func (gb *GB28181Plugin) StoreDevice(deviceid string, req *sip.Request) (d *Devi
gb.Info("StoreDevice", "type", "新增设备", "deviceId", d.DeviceId)
}
}
return
}
func (gb *GB28181Plugin) Pull(streamPath string, conf config.Pull, pubConf *config.Publish) {