diff --git a/plugin/gb28181/device.go b/plugin/gb28181/device.go index 62b087a..c5cbb9b 100644 --- a/plugin/gb28181/device.go +++ b/plugin/gb28181/device.go @@ -140,7 +140,7 @@ func (r *CatalogRequest) IsComplete(channelsLength int) bool { } func (d *Device) onMessage(req *sip.Request, tx sip.ServerTransaction, msg *gb28181.Message) (err error) { - d.Debug("into onMessage,deviceid is ", d.DeviceId) + d.plugin.Debug("into onMessage,deviceid is ", d.DeviceId) source := req.Source() hostname, portStr, _ := net.SplitHostPort(source) port, _ := strconv.Atoi(portStr) diff --git a/plugin/gb28181/index.go b/plugin/gb28181/index.go index c1a8f2b..45ae96d 100644 --- a/plugin/gb28181/index.go +++ b/plugin/gb28181/index.go @@ -42,7 +42,7 @@ type GB28181Plugin struct { pb.UnimplementedApiServer m7s.Plugin Serial string `default:"34020000002000000001" desc:"sip 服务 id"` //sip 服务器 id, 默认 34020000002000000001 - Realm string `default:"3402000000" desc:"sip 服务域"` //sip 服务器域,默认 3402000000 + Realm string `default:"3402000000" desc:"sip 服务域"` //sip 服务器域,默认 3402000000 Password string Sip SipConfig MediaPort util.Range[uint16] `default:"10001-20000" desc:"媒体端口范围"` //媒体端口范围 @@ -545,13 +545,17 @@ func (gb *GB28181Plugin) OnRegister(req *sip.Request, tx sip.ServerTransaction) d.Stop(errors.New("unregister")) } } else { - if d, ok := gb.devices.Get(deviceid); ok && d.Online { + if d, ok := gb.devices.Get(deviceid); ok { gb.Info("into recoverdevice", "deviceId", d.DeviceId) d.Status = DeviceOnlineStatus - gb.RecoverDevice(d, req) + gb.StoreDevice(deviceid, req, d) } else { + d := &Device{ + DeviceId: deviceid, + } + gb.devices.Set(d) gb.Info("into StoreDevice", "deviceId", from) - gb.StoreDevice(deviceid, req) + gb.StoreDevice(deviceid, req, d) } } } @@ -593,7 +597,7 @@ func (gb *GB28181Plugin) OnMessage(req *sip.Request, tx sip.ServerTransaction) { gb.Debug("00000000000001,deviceid is ", id) // 如果设备和平台都存在,通过源地址判断真实来源 - if d != nil && p != nil { + if d != nil && d.Online && p != nil { source := req.Source() if d.HostAddress == source { // 如果源地址匹配设备地址,则确认是设备消息 @@ -606,7 +610,7 @@ func (gb *GB28181Plugin) OnMessage(req *sip.Request, tx sip.ServerTransaction) { gb.Debug("00000000000002,deviceid is ", id) // 如果既不是设备也不是平台,返回404 - if d == nil && p == nil { + if (d == nil && p == nil) || (d != nil && !d.Online) { var response *sip.Response gb.Info("OnMessage", "error", "device/platform not found", "id", id) response = sip.NewResponseFromRequest(req, sip.StatusNotFound, "Not Found", nil) @@ -619,7 +623,7 @@ func (gb *GB28181Plugin) OnMessage(req *sip.Request, tx sip.ServerTransaction) { gb.Debug("00000000000003,deviceid is ", id) // 根据来源调用不同的处理方法 - if d != nil { + if d != nil && d.Online { d.UpdateTime = time.Now() if err = d.onMessage(req, tx, temp); err != nil { gb.Error("onMessage", "error", err.Error(), "type", "device,deviceid is", d.DeviceId) @@ -750,7 +754,7 @@ func (gb *GB28181Plugin) RecoverDevice(d *Device, req *sip.Request) { if sourceIPParse.IsPrivate() { // 源IP是内网IP myWanIP = myLanIP // 使用内网IP作为外网IP } - } else { // 目标地址是IP + } else { // 目标地址是IP if sourceIPParse.IsPrivate() { // 源IP是内网IP myLanIP, myWanIP = myIP, myIP // 使用目标IP作为内外网IP } @@ -790,21 +794,22 @@ func (gb *GB28181Plugin) RecoverDevice(d *Device, req *sip.Request) { d.client, _ = sipgo.NewClient(gb.ua, sipgo.WithClientLogger(zerolog.New(os.Stdout)), sipgo.WithClientHostname(d.SipIp)) d.channels.L = new(sync.RWMutex) d.catalogReqs.L = new(sync.RWMutex) - d.Info("StoreDevice", "source", source, "desc", desc, "device.SipIp", myLanIP, "device.WanIP", myWanIP, "recipient", req.Recipient, "myPort", myPort) + d.plugin = gb + d.plugin.Info("RecoverDevice", "source", source, "desc", desc, "device.SipIp", myLanIP, "device.WanIP", myWanIP, "recipient", req.Recipient, "myPort", myPort) if gb.DB != nil { - var existing Device - if err := gb.DB.First(&existing, Device{DeviceId: d.DeviceId}).Error; err == nil { - d.ID = existing.ID // 保持原有的自增ID - gb.Info("RecoverDevice", "type", "更新设备", "deviceId", d.DeviceId) - } else { - gb.Info("RecoverDevice", "type", "新增设备", "deviceId", d.DeviceId) - } + //var existing Device + //if err := gb.DB.First(&existing, Device{DeviceId: d.DeviceId}).Error; err == nil { + // d.ID = existing.ID // 保持原有的自增ID + // gb.Info("RecoverDevice", "type", "更新设备", "deviceId", d.DeviceId) + //} else { + // gb.Info("RecoverDevice", "type", "新增设备", "deviceId", d.DeviceId) + //} gb.DB.Save(d) } } -func (gb *GB28181Plugin) StoreDevice(deviceid string, req *sip.Request) (d *Device) { +func (gb *GB28181Plugin) StoreDevice(deviceid string, req *sip.Request, d *Device) { source := req.Source() sourceIP, sourcePortStr, _ := net.SplitHostPort(source) sourcePort, _ := strconv.Atoi(sourcePortStr) @@ -854,7 +859,7 @@ func (gb *GB28181Plugin) StoreDevice(deviceid string, req *sip.Request) (d *Devi if sourceIPParse.IsPrivate() { // 源IP是内网IP myWanIP = myLanIP // 使用内网IP作为外网IP } - } else { // 目标地址是IP + } else { // 目标地址是IP if sourceIPParse.IsPrivate() { // 源IP是内网IP myLanIP, myWanIP = myIP, myIP // 使用目标IP作为内外网IP } @@ -869,48 +874,45 @@ func (gb *GB28181Plugin) StoreDevice(deviceid string, req *sip.Request) (d *Devi } now := time.Now() - d = &Device{ - DeviceId: deviceid, - CreateTime: now, - UpdateTime: now, - RegisterTime: now, - KeepaliveTime: now, - Status: DeviceOnlineStatus, - Online: true, - StreamMode: "TCP-PASSIVE", // 默认UDP传输 - Charset: "GB2312", // 默认GB2312字符集 - GeoCoordSys: "WGS84", // 默认WGS84坐标系 - Transport: req.Transport(), // 传输协议 - IP: sourceIP, - Port: sourcePort, - HostAddress: sourceIP + ":" + sourcePortStr, - SipIp: myLanIP, - MediaIp: myWanIP, - Expires: int(expSec), - eventChan: make(chan any, 10), - Recipient: sip.Uri{ - Host: sourceIP, - Port: sourcePort, - User: deviceid, - }, - contactHDR: sip.ContactHeader{ - Address: sip.Uri{ - User: gb.Serial, - Host: myWanIP, - Port: myPort, - }, - }, - fromHDR: sip.FromHeader{ - Address: sip.Uri{ - User: gb.Serial, - Host: myWanIP, - Port: myPort, - }, - Params: sip.NewParams(), - }, - plugin: gb, - LocalPort: myPort, + d.CreateTime = now + d.UpdateTime = now + d.RegisterTime = now + d.KeepaliveTime = now + d.Status = DeviceOnlineStatus + d.Online = true + d.StreamMode = "TCP-PASSIVE" // 默认UDP传输 + d.Charset = "GB2312" // 默认GB2312字符集 + d.GeoCoordSys = "WGS84" // 默认WGS84坐标系 + d.Transport = req.Transport() // 传输协议 + d.IP = sourceIP + d.Port = sourcePort + d.HostAddress = sourceIP + ":" + sourcePortStr + d.SipIp = myLanIP + d.MediaIp = myWanIP + d.Expires = int(expSec) + d.eventChan = make(chan any, 10) + d.Recipient = sip.Uri{ + Host: sourceIP, + Port: sourcePort, + User: deviceid, } + d.contactHDR = sip.ContactHeader{ + Address: sip.Uri{ + User: gb.Serial, + Host: myWanIP, + Port: myPort, + }, + } + d.fromHDR = sip.FromHeader{ + Address: sip.Uri{ + User: gb.Serial, + Host: myWanIP, + Port: myPort, + }, + Params: sip.NewParams(), + } + d.plugin = gb + d.LocalPort = myPort d.Logger = gb.With("deviceid", deviceid) d.fromHDR.Params.Add("tag", sip.GenerateTagN(16)) @@ -949,7 +951,7 @@ func (gb *GB28181Plugin) StoreDevice(deviceid string, req *sip.Request) (d *Devi } } }) - gb.AddTask(d) + gb.AddTask(d).WaitStarted() if gb.DB != nil { var existing Device @@ -962,7 +964,6 @@ func (gb *GB28181Plugin) StoreDevice(deviceid string, req *sip.Request) (d *Devi gb.Info("StoreDevice", "type", "新增设备", "deviceId", d.DeviceId) } } - return } func (gb *GB28181Plugin) Pull(streamPath string, conf config.Pull, pubConf *config.Publish) {