fix: gb28181 Resolve concurrency issues with fromHeader.

This commit is contained in:
pggiroro
2025-12-14 21:12:04 +08:00
parent 3adcb6211c
commit 0d4f27c007
4 changed files with 28 additions and 24 deletions

View File

@@ -17,7 +17,7 @@ import (
"github.com/emiago/sipgo"
"github.com/emiago/sipgo/sip"
"github.com/langhuihui/gotask"
task "github.com/langhuihui/gotask"
"m7s.live/v5/pkg/util"
gb28181 "m7s.live/v5/plugin/gb28181/pkg"
mrtp "m7s.live/v5/plugin/rtp/pkg"
@@ -114,7 +114,6 @@ type Device struct {
client *sipgo.Client
contactHDR sip.ContactHeader
fromHDR sip.FromHeader
toHDR sip.ToHeader
plugin *GB28181Plugin `gorm:"-:all"`
LocalPort int
CatalogSubscribeTask *CatalogSubscribeTask `gorm:"-:all"`
@@ -531,7 +530,13 @@ func (d *Device) CreateRequest(Method sip.RequestMethod, Recipient any) *sip.Req
} else {
req = sip.NewRequest(Method, d.Recipient)
}
fromHDR := d.fromHDR
// 创建新的 FromHeader 并克隆 Params避免并发问题
// 因为 HeaderParams 是 map 类型,直接拷贝会共享同一个 map 引用
fromHDR := sip.FromHeader{
DisplayName: d.fromHDR.DisplayName,
Address: d.fromHDR.Address,
Params: d.fromHDR.Params.Clone(),
}
fromHDR.Params.Add("tag", sip.GenerateTagN(32))
req.AppendHeader(&fromHDR)
contentType := sip.ContentTypeHeader("Application/MANSCDP+xml")

View File

@@ -438,7 +438,6 @@ func (gb *GB28181Plugin) checkDeviceExpire() (err error) {
},
Params: sip.NewParams(),
}
device.fromHDR.Params.Add("tag", sip.GenerateTagN(16))
// 设置接收者
device.Recipient = sip.Uri{

View File

@@ -979,7 +979,7 @@ func (p *Platform) buildChannelItem(channel gb28181.DeviceChannel) string {
channel.RegisterWay, // 直接使用整数值
channel.Secrecy, // 直接使用整数值
parentID,
channel.Parental, // 直接使用整数值
channel.Parental, // 直接使用整数值
channel.SafetyWay) // 直接使用整数值
}
@@ -1012,19 +1012,20 @@ func (p *Platform) handleDeviceControl(req *sip.Request, tx sip.ServerTransactio
}
// 创建转发请求
request := sip.NewRequest(sip.MESSAGE, device.Recipient)
// 设置From头部使用平台信息
fromHeader := device.fromHDR
fromTag, _ := req.From().Params.Get("tag")
fromHeader.Params.Add("tag", fromTag)
request.AppendHeader(&fromHeader)
// 添加To头部使用设备信息
toHeader := sip.ToHeader{
Address: device.Recipient,
}
request.AppendHeader(&toHeader)
request := device.CreateRequest(sip.MESSAGE, nil)
//request := sip.NewRequest(sip.MESSAGE, device.Recipient)
//
//// 设置From头部使用平台信息
//fromHeader := device.fromHDR
//fromTag, _ := req.From().Params.Get("tag")
//fromHeader.Params.Add("tag", fromTag)
//request.AppendHeader(&fromHeader)
//
//// 添加To头部使用设备信息
//toHeader := sip.ToHeader{
// Address: device.Recipient,
//}
//request.AppendHeader(&toHeader)
// 添加Via头部
//viaHeader := sip.ViaHeader{
@@ -1039,8 +1040,8 @@ func (p *Platform) handleDeviceControl(req *sip.Request, tx sip.ServerTransactio
//request.AppendHeader(&viaHeader)
// 设置Content-Type
contentTypeHeader := sip.ContentTypeHeader("Application/MANSCDP+xml")
request.AppendHeader(&contentTypeHeader)
//contentTypeHeader := sip.ContentTypeHeader("Application/MANSCDP+xml")
//request.AppendHeader(&contentTypeHeader)
// 直接使用原始消息体
request.SetBody(req.Body())
@@ -1049,7 +1050,7 @@ func (p *Platform) handleDeviceControl(req *sip.Request, tx sip.ServerTransactio
request.SetTransport(strings.ToUpper(device.Transport))
// 发送请求
_, err = device.client.Do(p, request)
_, err = device.send(request)
if err != nil {
p.Error("发送控制命令失败", "error", err.Error())
return fmt.Errorf("send control command failed: %v", err)

View File

@@ -266,7 +266,7 @@ func (task *registerHandlerTask) RecoverDevice(d *Device, req *sip.Request) {
if sourceIPParse.IsPrivate() { // 源IP是内网IP
myWanIP = myLanIP // 使用内网IP作为外网IP
}
} else { // 目标地址是IP
} else { // 目标地址是IP
if sourceIPParse.IsPrivate() { // 源IP是内网IP
myLanIP, myWanIP = myIP, myIP // 使用目标IP作为内外网IP
}
@@ -384,7 +384,7 @@ func (task *registerHandlerTask) StoreDevice(deviceid string, req *sip.Request,
if sourceIPParse.IsPrivate() { // 源IP是内网IP
myWanIP = myLanIP // 使用内网IP作为外网IP
}
} else { // 目标地址是IP
} else { // 目标地址是IP
if sourceIPParse.IsPrivate() { // 源IP是内网IP
myLanIP, myWanIP = myIP, myIP // 使用目标IP作为内外网IP
}
@@ -442,7 +442,6 @@ func (task *registerHandlerTask) StoreDevice(deviceid string, req *sip.Request,
d.LocalPort = myPort
d.Logger = task.gb.Logger.With("deviceid", deviceid)
d.fromHDR.Params.Add("tag", sip.GenerateTagN(16))
// 根据设备访问的本地IP、端口和传输协议获取或创建对应的Client
client, err := task.gb.getOrCreateClient(d.SipIp, d.LocalPort, d.Transport)
if err != nil {