From 0d4f27c007fdb9fff360836419f87400c165bd2e Mon Sep 17 00:00:00 2001 From: pggiroro Date: Sun, 14 Dec 2025 21:12:04 +0800 Subject: [PATCH] fix: gb28181 Resolve concurrency issues with fromHeader. --- plugin/gb28181/device.go | 11 +++++++--- plugin/gb28181/index.go | 1 - plugin/gb28181/platform.go | 35 ++++++++++++++++--------------- plugin/gb28181/registerhandler.go | 5 ++--- 4 files changed, 28 insertions(+), 24 deletions(-) diff --git a/plugin/gb28181/device.go b/plugin/gb28181/device.go index ea07b39..a85660f 100644 --- a/plugin/gb28181/device.go +++ b/plugin/gb28181/device.go @@ -17,7 +17,7 @@ import ( "github.com/emiago/sipgo" "github.com/emiago/sipgo/sip" - "github.com/langhuihui/gotask" + task "github.com/langhuihui/gotask" "m7s.live/v5/pkg/util" gb28181 "m7s.live/v5/plugin/gb28181/pkg" mrtp "m7s.live/v5/plugin/rtp/pkg" @@ -114,7 +114,6 @@ type Device struct { client *sipgo.Client contactHDR sip.ContactHeader fromHDR sip.FromHeader - toHDR sip.ToHeader plugin *GB28181Plugin `gorm:"-:all"` LocalPort int CatalogSubscribeTask *CatalogSubscribeTask `gorm:"-:all"` @@ -531,7 +530,13 @@ func (d *Device) CreateRequest(Method sip.RequestMethod, Recipient any) *sip.Req } else { req = sip.NewRequest(Method, d.Recipient) } - fromHDR := d.fromHDR + // 创建新的 FromHeader 并克隆 Params,避免并发问题 + // 因为 HeaderParams 是 map 类型,直接拷贝会共享同一个 map 引用 + fromHDR := sip.FromHeader{ + DisplayName: d.fromHDR.DisplayName, + Address: d.fromHDR.Address, + Params: d.fromHDR.Params.Clone(), + } fromHDR.Params.Add("tag", sip.GenerateTagN(32)) req.AppendHeader(&fromHDR) contentType := sip.ContentTypeHeader("Application/MANSCDP+xml") diff --git a/plugin/gb28181/index.go b/plugin/gb28181/index.go index aebb18b..12e9690 100644 --- a/plugin/gb28181/index.go +++ b/plugin/gb28181/index.go @@ -438,7 +438,6 @@ func (gb *GB28181Plugin) checkDeviceExpire() (err error) { }, Params: sip.NewParams(), } - device.fromHDR.Params.Add("tag", sip.GenerateTagN(16)) // 设置接收者 device.Recipient = sip.Uri{ diff --git a/plugin/gb28181/platform.go b/plugin/gb28181/platform.go index e3c2a80..decf818 100644 --- a/plugin/gb28181/platform.go +++ b/plugin/gb28181/platform.go @@ -979,7 +979,7 @@ func (p *Platform) buildChannelItem(channel gb28181.DeviceChannel) string { channel.RegisterWay, // 直接使用整数值 channel.Secrecy, // 直接使用整数值 parentID, - channel.Parental, // 直接使用整数值 + channel.Parental, // 直接使用整数值 channel.SafetyWay) // 直接使用整数值 } @@ -1012,19 +1012,20 @@ func (p *Platform) handleDeviceControl(req *sip.Request, tx sip.ServerTransactio } // 创建转发请求 - request := sip.NewRequest(sip.MESSAGE, device.Recipient) - - // 设置From头部,使用平台信息 - fromHeader := device.fromHDR - fromTag, _ := req.From().Params.Get("tag") - fromHeader.Params.Add("tag", fromTag) - request.AppendHeader(&fromHeader) - - // 添加To头部,使用设备信息 - toHeader := sip.ToHeader{ - Address: device.Recipient, - } - request.AppendHeader(&toHeader) + request := device.CreateRequest(sip.MESSAGE, nil) + //request := sip.NewRequest(sip.MESSAGE, device.Recipient) + // + //// 设置From头部,使用平台信息 + //fromHeader := device.fromHDR + //fromTag, _ := req.From().Params.Get("tag") + //fromHeader.Params.Add("tag", fromTag) + //request.AppendHeader(&fromHeader) + // + //// 添加To头部,使用设备信息 + //toHeader := sip.ToHeader{ + // Address: device.Recipient, + //} + //request.AppendHeader(&toHeader) // 添加Via头部 //viaHeader := sip.ViaHeader{ @@ -1039,8 +1040,8 @@ func (p *Platform) handleDeviceControl(req *sip.Request, tx sip.ServerTransactio //request.AppendHeader(&viaHeader) // 设置Content-Type - contentTypeHeader := sip.ContentTypeHeader("Application/MANSCDP+xml") - request.AppendHeader(&contentTypeHeader) + //contentTypeHeader := sip.ContentTypeHeader("Application/MANSCDP+xml") + //request.AppendHeader(&contentTypeHeader) // 直接使用原始消息体 request.SetBody(req.Body()) @@ -1049,7 +1050,7 @@ func (p *Platform) handleDeviceControl(req *sip.Request, tx sip.ServerTransactio request.SetTransport(strings.ToUpper(device.Transport)) // 发送请求 - _, err = device.client.Do(p, request) + _, err = device.send(request) if err != nil { p.Error("发送控制命令失败", "error", err.Error()) return fmt.Errorf("send control command failed: %v", err) diff --git a/plugin/gb28181/registerhandler.go b/plugin/gb28181/registerhandler.go index 697fa3e..cad8a2a 100644 --- a/plugin/gb28181/registerhandler.go +++ b/plugin/gb28181/registerhandler.go @@ -266,7 +266,7 @@ func (task *registerHandlerTask) RecoverDevice(d *Device, req *sip.Request) { if sourceIPParse.IsPrivate() { // 源IP是内网IP myWanIP = myLanIP // 使用内网IP作为外网IP } - } else { // 目标地址是IP + } else { // 目标地址是IP if sourceIPParse.IsPrivate() { // 源IP是内网IP myLanIP, myWanIP = myIP, myIP // 使用目标IP作为内外网IP } @@ -384,7 +384,7 @@ func (task *registerHandlerTask) StoreDevice(deviceid string, req *sip.Request, if sourceIPParse.IsPrivate() { // 源IP是内网IP myWanIP = myLanIP // 使用内网IP作为外网IP } - } else { // 目标地址是IP + } else { // 目标地址是IP if sourceIPParse.IsPrivate() { // 源IP是内网IP myLanIP, myWanIP = myIP, myIP // 使用目标IP作为内外网IP } @@ -442,7 +442,6 @@ func (task *registerHandlerTask) StoreDevice(deviceid string, req *sip.Request, d.LocalPort = myPort d.Logger = task.gb.Logger.With("deviceid", deviceid) - d.fromHDR.Params.Add("tag", sip.GenerateTagN(16)) // 根据设备访问的本地IP、端口和传输协议获取或创建对应的Client client, err := task.gb.getOrCreateClient(d.SipIp, d.LocalPort, d.Transport) if err != nil {