diff --git a/plugin/gb28181/api.go b/plugin/gb28181/api.go index 0d9b456..0db1ab0 100644 --- a/plugin/gb28181/api.go +++ b/plugin/gb28181/api.go @@ -3,7 +3,6 @@ package plugin_gb28181pro import ( "context" "fmt" - "gorm.io/gorm" "net/http" "net/url" "os" @@ -12,6 +11,8 @@ import ( "sync" "time" + "gorm.io/gorm" + "github.com/emiago/sipgo" "github.com/emiago/sipgo/sip" "m7s.live/v5/pkg/util" @@ -2849,14 +2850,43 @@ func (gb *GB28181Plugin) RemoveDevice(ctx context.Context, req *pb.RemoveDeviceR device.Stop(fmt.Errorf("device removed")) device.WaitStopped() // device.Stop() 会调用 Dispose(),其中已包含从 gb.devices 中移除设备的逻辑 - } else { - resp.Code = 404 - resp.Message = "设备未找到" - return resp, nil + + // 开启数据库事务 + tx := gb.DB.Begin() + if tx.Error != nil { + resp.Code = 500 + resp.Message = "开启事务失败" + return resp, tx.Error + } + + // 删除设备 + if err := tx.Delete(&Device{DeviceId: req.Id}).Error; err != nil { + tx.Rollback() + resp.Code = 500 + resp.Message = "删除设备失败" + return resp, err + } + + // 删除设备关联的通道 + if err := tx.Delete(&gb28181.DeviceChannel{DeviceID: req.Id}).Error; err != nil { + tx.Rollback() + resp.Code = 500 + resp.Message = "删除设备通道失败" + return resp, err + } + + // 提交事务 + if err := tx.Commit().Error; err != nil { + tx.Rollback() + resp.Code = 500 + resp.Message = "提交事务失败" + return resp, err + } + + resp.Code = 200 + resp.Message = "设备删除成功" } - resp.Code = 0 - resp.Message = "success" return resp, nil } diff --git a/plugin/gb28181/device.go b/plugin/gb28181/device.go index 1e6c112..25ea6eb 100644 --- a/plugin/gb28181/device.go +++ b/plugin/gb28181/device.go @@ -138,7 +138,7 @@ func (r *CatalogRequest) IsComplete(channelsLength int) bool { } func (d *Device) onMessage(req *sip.Request, tx sip.ServerTransaction, msg *gb28181.Message) (err error) { - d.plugin.Debug("into onMessage,deviceid is ", d.DeviceId) + d.plugin.Trace("into onMessage,deviceid is ", d.DeviceId) source := req.Source() hostname, portStr, _ := net.SplitHostPort(source) port, _ := strconv.Atoi(portStr) @@ -159,7 +159,7 @@ func (d *Device) onMessage(req *sip.Request, tx sip.ServerTransaction, msg *gb28 case "Keepalive": d.KeepaliveInterval = int(time.Since(d.KeepaliveTime).Seconds()) d.KeepaliveTime = time.Now() - d.Debug("into keeplive,deviceid is ", d.DeviceId, "d.KeepaliveTime is", d.KeepaliveTime) + d.Trace("into keeplive,deviceid is ", d.DeviceId, "d.KeepaliveTime is", d.KeepaliveTime) if d.plugin.DB != nil { if err := d.plugin.DB.Model(d).Updates(map[string]interface{}{ "keepalive_interval": d.KeepaliveInterval, @@ -189,7 +189,7 @@ func (d *Device) onMessage(req *sip.Request, tx sip.ServerTransaction, msg *gb28 if d.plugin.DB != nil { // 如果是第一个响应,先清空现有通道 if isFirst { - d.Debug("清空现有通道", "deviceId", d.DeviceId) + d.Trace("清空现有通道", "deviceId", d.DeviceId) if err := d.plugin.DB.Where("device_id = ?", d.DeviceId).Delete(&gb28181.DeviceChannel{}).Error; err != nil { d.Error("删除通道失败", "error", err, "deviceId", d.DeviceId) } @@ -213,7 +213,7 @@ func (d *Device) onMessage(req *sip.Request, tx sip.ServerTransaction, msg *gb28 // 更新当前设备的通道数 d.ChannelCount = msg.SumNum d.UpdateTime = time.Now() - d.Debug("save channel", "deviceid", d.DeviceId, "channels count", d.channels.Length) + d.Trace("save channel", "deviceid", d.DeviceId, "channels count", d.channels.Length) if err := d.plugin.DB.Model(d).Updates(map[string]interface{}{ "channel_count": d.ChannelCount, "update_time": d.UpdateTime, @@ -401,12 +401,12 @@ func (d *Device) onMessage(req *sip.Request, tx sip.ServerTransaction, msg *gb28 func (d *Device) send(req *sip.Request) (*sip.Response, error) { d.SN++ - d.Debug("send", "req", req.String()) + d.Trace("send", "req", req.String()) return d.client.Do(context.Background(), req) } func (d *Device) Go() (err error) { - d.Debug("into device.Go,deviceid is ", d.DeviceId) + d.Trace("into device.Go,deviceid is ", d.DeviceId) var response *sip.Response // 初始化catalogReqs @@ -424,7 +424,7 @@ func (d *Device) Go() (err error) { if err != nil { d.Error("catalog", "err", err) } else { - d.Debug("catalog", "response", response.String()) + d.Trace("catalog", "response", response.String()) } // 创建并启动目录订阅任务 @@ -451,7 +451,7 @@ func (d *Device) Go() (err error) { select { case <-d.Done(): case <-keepLiveTick.C: - d.Debug("keepLiveTick,deviceid is", d.DeviceId, "d.KeepaliveTime is ", d.KeepaliveTime) + d.Trace("keepLiveTick,deviceid is", d.DeviceId, "d.KeepaliveTime is ", d.KeepaliveTime) if timeDiff := time.Since(d.KeepaliveTime); timeDiff > time.Duration(3*keepaliveSeconds)*time.Second { d.Online = false d.Status = DeviceOfflineStatus @@ -472,7 +472,7 @@ func (d *Device) Go() (err error) { if err != nil { d.Error("catalog", "err", err) } else { - d.Debug("catalogTick", "response", response.String()) + d.Trace("catalogTick", "response", response.String()) } //case event := <-d.eventChan: // d.Debug("eventChan", "event", event) diff --git a/plugin/gb28181/dialog.go b/plugin/gb28181/dialog.go index 5e4ffb5..81d4a50 100644 --- a/plugin/gb28181/dialog.go +++ b/plugin/gb28181/dialog.go @@ -149,6 +149,7 @@ func (d *Dialog) Start() (err error) { } sdpInfo = append(sdpInfo, mediaLine) + sdpInfo = append(sdpInfo, "a=recvonly") if d.stream != "" { sdpInfo = append(sdpInfo, "a="+d.stream) @@ -248,14 +249,14 @@ func (d *Dialog) Start() (err error) { } func (d *Dialog) Run() (err error) { - d.Channel.Info("before WaitAnswer") + d.gb.Info("before WaitAnswer") err = d.session.WaitAnswer(d.gb, sipgo.AnswerOptions{}) - d.Channel.Info("after WaitAnswer") + d.gb.Info("after WaitAnswer") if err != nil { return errors.New("wait answer error" + err.Error()) } inviteResponseBody := string(d.session.InviteResponse.Body()) - d.Channel.Info("inviteResponse", "body", inviteResponseBody) + d.gb.Info("inviteResponse", "body", inviteResponseBody) ds := strings.Split(inviteResponseBody, "\r\n") for _, l := range ds { if ls := strings.Split(l, "="); len(ls) > 1 { @@ -302,6 +303,10 @@ func (d *Dialog) Run() (err error) { } pub.Receiver.StreamMode = d.StreamMode d.AddTask(&pub.Receiver) + startResult := pub.Receiver.WaitStarted() + if startResult != nil { + return fmt.Errorf("pub.Receiver.WaitStarted %s", startResult) + } pub.Demux() return } diff --git a/plugin/gb28181/index.go b/plugin/gb28181/index.go index 9b254fd..482b3c9 100644 --- a/plugin/gb28181/index.go +++ b/plugin/gb28181/index.go @@ -3,15 +3,17 @@ package plugin_gb28181pro import ( "errors" "fmt" - "m7s.live/v5/pkg" "net/http" "os" + "regexp" "slices" "strconv" "strings" "sync" "time" + "m7s.live/v5/pkg" + "github.com/emiago/sipgo" "github.com/emiago/sipgo/sip" "github.com/rs/zerolog" @@ -39,7 +41,7 @@ type GB28181Plugin struct { pb.UnimplementedApiServer m7s.Plugin Serial string `default:"34020000002000000001" desc:"sip 服务 id"` //sip 服务器 id, 默认 34020000002000000001 - Realm string `default:"3402000000" desc:"sip 服务域"` //sip 服务器域,默认 3402000000 + Realm string `default:"3402000000" desc:"sip 服务域"` //sip 服务器域,默认 3402000000 Password string Sip SipConfig MediaPort util.Range[uint16] `default:"10001-20000" desc:"媒体端口范围"` //媒体端口范围 @@ -436,9 +438,22 @@ func (gb *GB28181Plugin) OnRegister(req *sip.Request, tx sip.ServerTransaction) from := req.From() if from == nil || from.Address.User == "" { gb.Error("OnRegister", "error", "no user") + response := sip.NewResponseFromRequest(req, sip.StatusBadRequest, "Invalid sip from format", nil) + if err := tx.Respond(response); err != nil { + gb.Error("respond BadRequest", "error", err.Error()) + } return } deviceId := from.Address.User + // 验证设备ID是否符合GB28181规范(20位数字) + if match, _ := regexp.MatchString(`^\d{20}$`, deviceId); !match { + gb.Error("OnRegister", "error", "invalid device id format, must be 20 digits", "deviceId", deviceId) + response := sip.NewResponseFromRequest(req, sip.StatusBadRequest, "Invalid device ID format", nil) + if err := tx.Respond(response); err != nil { + gb.Error("respond BadRequest", "error", err.Error()) + } + return + } registerHandlerTask := registerHandlerTask{ gb: gb, req: req, @@ -501,7 +516,6 @@ func (gb *GB28181Plugin) OnMessage(req *sip.Request, tx sip.ServerTransaction) { } } - gb.Debug("00000000000001,deviceid is ", id) // 如果设备和平台都存在,通过源地址判断真实来源 if d != nil && d.Online && p != nil { source := req.Source() @@ -513,7 +527,6 @@ func (gb *GB28181Plugin) OnMessage(req *sip.Request, tx sip.ServerTransaction) { d = nil } } - gb.Debug("00000000000002,deviceid is ", id) // 如果既不是设备也不是平台,返回404 if (d == nil && p == nil) || (d != nil && !d.Online) { @@ -526,7 +539,6 @@ func (gb *GB28181Plugin) OnMessage(req *sip.Request, tx sip.ServerTransaction) { gb.Debug("after on message respond") return } - gb.Debug("00000000000003,deviceid is ", id) // 根据来源调用不同的处理方法 if d != nil && d.Online {