fix: gb28181 check from.Address.User when onRegister,delete device from db when device is not register

This commit is contained in:
pggiroro
2025-06-11 22:17:04 +08:00
parent 0b731e468b
commit 23f2ed39a1
4 changed files with 71 additions and 24 deletions

View File

@@ -3,7 +3,6 @@ package plugin_gb28181pro
import (
"context"
"fmt"
"gorm.io/gorm"
"net/http"
"net/url"
"os"
@@ -12,6 +11,8 @@ import (
"sync"
"time"
"gorm.io/gorm"
"github.com/emiago/sipgo"
"github.com/emiago/sipgo/sip"
"m7s.live/v5/pkg/util"
@@ -2849,14 +2850,43 @@ func (gb *GB28181Plugin) RemoveDevice(ctx context.Context, req *pb.RemoveDeviceR
device.Stop(fmt.Errorf("device removed"))
device.WaitStopped()
// device.Stop() 会调用 Dispose(),其中已包含从 gb.devices 中移除设备的逻辑
} else {
resp.Code = 404
resp.Message = "设备未找到"
return resp, nil
// 开启数据库事务
tx := gb.DB.Begin()
if tx.Error != nil {
resp.Code = 500
resp.Message = "开启事务失败"
return resp, tx.Error
}
// 删除设备
if err := tx.Delete(&Device{DeviceId: req.Id}).Error; err != nil {
tx.Rollback()
resp.Code = 500
resp.Message = "删除设备失败"
return resp, err
}
// 删除设备关联的通道
if err := tx.Delete(&gb28181.DeviceChannel{DeviceID: req.Id}).Error; err != nil {
tx.Rollback()
resp.Code = 500
resp.Message = "删除设备通道失败"
return resp, err
}
// 提交事务
if err := tx.Commit().Error; err != nil {
tx.Rollback()
resp.Code = 500
resp.Message = "提交事务失败"
return resp, err
}
resp.Code = 200
resp.Message = "设备删除成功"
}
resp.Code = 0
resp.Message = "success"
return resp, nil
}

View File

@@ -138,7 +138,7 @@ func (r *CatalogRequest) IsComplete(channelsLength int) bool {
}
func (d *Device) onMessage(req *sip.Request, tx sip.ServerTransaction, msg *gb28181.Message) (err error) {
d.plugin.Debug("into onMessage,deviceid is ", d.DeviceId)
d.plugin.Trace("into onMessage,deviceid is ", d.DeviceId)
source := req.Source()
hostname, portStr, _ := net.SplitHostPort(source)
port, _ := strconv.Atoi(portStr)
@@ -159,7 +159,7 @@ func (d *Device) onMessage(req *sip.Request, tx sip.ServerTransaction, msg *gb28
case "Keepalive":
d.KeepaliveInterval = int(time.Since(d.KeepaliveTime).Seconds())
d.KeepaliveTime = time.Now()
d.Debug("into keeplive,deviceid is ", d.DeviceId, "d.KeepaliveTime is", d.KeepaliveTime)
d.Trace("into keeplive,deviceid is ", d.DeviceId, "d.KeepaliveTime is", d.KeepaliveTime)
if d.plugin.DB != nil {
if err := d.plugin.DB.Model(d).Updates(map[string]interface{}{
"keepalive_interval": d.KeepaliveInterval,
@@ -189,7 +189,7 @@ func (d *Device) onMessage(req *sip.Request, tx sip.ServerTransaction, msg *gb28
if d.plugin.DB != nil {
// 如果是第一个响应,先清空现有通道
if isFirst {
d.Debug("清空现有通道", "deviceId", d.DeviceId)
d.Trace("清空现有通道", "deviceId", d.DeviceId)
if err := d.plugin.DB.Where("device_id = ?", d.DeviceId).Delete(&gb28181.DeviceChannel{}).Error; err != nil {
d.Error("删除通道失败", "error", err, "deviceId", d.DeviceId)
}
@@ -213,7 +213,7 @@ func (d *Device) onMessage(req *sip.Request, tx sip.ServerTransaction, msg *gb28
// 更新当前设备的通道数
d.ChannelCount = msg.SumNum
d.UpdateTime = time.Now()
d.Debug("save channel", "deviceid", d.DeviceId, "channels count", d.channels.Length)
d.Trace("save channel", "deviceid", d.DeviceId, "channels count", d.channels.Length)
if err := d.plugin.DB.Model(d).Updates(map[string]interface{}{
"channel_count": d.ChannelCount,
"update_time": d.UpdateTime,
@@ -401,12 +401,12 @@ func (d *Device) onMessage(req *sip.Request, tx sip.ServerTransaction, msg *gb28
func (d *Device) send(req *sip.Request) (*sip.Response, error) {
d.SN++
d.Debug("send", "req", req.String())
d.Trace("send", "req", req.String())
return d.client.Do(context.Background(), req)
}
func (d *Device) Go() (err error) {
d.Debug("into device.Go,deviceid is ", d.DeviceId)
d.Trace("into device.Go,deviceid is ", d.DeviceId)
var response *sip.Response
// 初始化catalogReqs
@@ -424,7 +424,7 @@ func (d *Device) Go() (err error) {
if err != nil {
d.Error("catalog", "err", err)
} else {
d.Debug("catalog", "response", response.String())
d.Trace("catalog", "response", response.String())
}
// 创建并启动目录订阅任务
@@ -451,7 +451,7 @@ func (d *Device) Go() (err error) {
select {
case <-d.Done():
case <-keepLiveTick.C:
d.Debug("keepLiveTick,deviceid is", d.DeviceId, "d.KeepaliveTime is ", d.KeepaliveTime)
d.Trace("keepLiveTick,deviceid is", d.DeviceId, "d.KeepaliveTime is ", d.KeepaliveTime)
if timeDiff := time.Since(d.KeepaliveTime); timeDiff > time.Duration(3*keepaliveSeconds)*time.Second {
d.Online = false
d.Status = DeviceOfflineStatus
@@ -472,7 +472,7 @@ func (d *Device) Go() (err error) {
if err != nil {
d.Error("catalog", "err", err)
} else {
d.Debug("catalogTick", "response", response.String())
d.Trace("catalogTick", "response", response.String())
}
//case event := <-d.eventChan:
// d.Debug("eventChan", "event", event)

View File

@@ -149,6 +149,7 @@ func (d *Dialog) Start() (err error) {
}
sdpInfo = append(sdpInfo, mediaLine)
sdpInfo = append(sdpInfo, "a=recvonly")
if d.stream != "" {
sdpInfo = append(sdpInfo, "a="+d.stream)
@@ -248,14 +249,14 @@ func (d *Dialog) Start() (err error) {
}
func (d *Dialog) Run() (err error) {
d.Channel.Info("before WaitAnswer")
d.gb.Info("before WaitAnswer")
err = d.session.WaitAnswer(d.gb, sipgo.AnswerOptions{})
d.Channel.Info("after WaitAnswer")
d.gb.Info("after WaitAnswer")
if err != nil {
return errors.New("wait answer error" + err.Error())
}
inviteResponseBody := string(d.session.InviteResponse.Body())
d.Channel.Info("inviteResponse", "body", inviteResponseBody)
d.gb.Info("inviteResponse", "body", inviteResponseBody)
ds := strings.Split(inviteResponseBody, "\r\n")
for _, l := range ds {
if ls := strings.Split(l, "="); len(ls) > 1 {
@@ -302,6 +303,10 @@ func (d *Dialog) Run() (err error) {
}
pub.Receiver.StreamMode = d.StreamMode
d.AddTask(&pub.Receiver)
startResult := pub.Receiver.WaitStarted()
if startResult != nil {
return fmt.Errorf("pub.Receiver.WaitStarted %s", startResult)
}
pub.Demux()
return
}

View File

@@ -3,15 +3,17 @@ package plugin_gb28181pro
import (
"errors"
"fmt"
"m7s.live/v5/pkg"
"net/http"
"os"
"regexp"
"slices"
"strconv"
"strings"
"sync"
"time"
"m7s.live/v5/pkg"
"github.com/emiago/sipgo"
"github.com/emiago/sipgo/sip"
"github.com/rs/zerolog"
@@ -39,7 +41,7 @@ type GB28181Plugin struct {
pb.UnimplementedApiServer
m7s.Plugin
Serial string `default:"34020000002000000001" desc:"sip 服务 id"` //sip 服务器 id, 默认 34020000002000000001
Realm string `default:"3402000000" desc:"sip 服务域"` //sip 服务器域,默认 3402000000
Realm string `default:"3402000000" desc:"sip 服务域"` //sip 服务器域,默认 3402000000
Password string
Sip SipConfig
MediaPort util.Range[uint16] `default:"10001-20000" desc:"媒体端口范围"` //媒体端口范围
@@ -436,9 +438,22 @@ func (gb *GB28181Plugin) OnRegister(req *sip.Request, tx sip.ServerTransaction)
from := req.From()
if from == nil || from.Address.User == "" {
gb.Error("OnRegister", "error", "no user")
response := sip.NewResponseFromRequest(req, sip.StatusBadRequest, "Invalid sip from format", nil)
if err := tx.Respond(response); err != nil {
gb.Error("respond BadRequest", "error", err.Error())
}
return
}
deviceId := from.Address.User
// 验证设备ID是否符合GB28181规范(20位数字)
if match, _ := regexp.MatchString(`^\d{20}$`, deviceId); !match {
gb.Error("OnRegister", "error", "invalid device id format, must be 20 digits", "deviceId", deviceId)
response := sip.NewResponseFromRequest(req, sip.StatusBadRequest, "Invalid device ID format", nil)
if err := tx.Respond(response); err != nil {
gb.Error("respond BadRequest", "error", err.Error())
}
return
}
registerHandlerTask := registerHandlerTask{
gb: gb,
req: req,
@@ -501,7 +516,6 @@ func (gb *GB28181Plugin) OnMessage(req *sip.Request, tx sip.ServerTransaction) {
}
}
gb.Debug("00000000000001,deviceid is ", id)
// 如果设备和平台都存在,通过源地址判断真实来源
if d != nil && d.Online && p != nil {
source := req.Source()
@@ -513,7 +527,6 @@ func (gb *GB28181Plugin) OnMessage(req *sip.Request, tx sip.ServerTransaction) {
d = nil
}
}
gb.Debug("00000000000002,deviceid is ", id)
// 如果既不是设备也不是平台返回404
if (d == nil && p == nil) || (d != nil && !d.Online) {
@@ -526,7 +539,6 @@ func (gb *GB28181Plugin) OnMessage(req *sip.Request, tx sip.ServerTransaction) {
gb.Debug("after on message respond")
return
}
gb.Debug("00000000000003,deviceid is ", id)
// 根据来源调用不同的处理方法
if d != nil && d.Online {