diff --git a/plugin/gb28181/api.go b/plugin/gb28181/api.go index 28a5204..ec04a14 100644 --- a/plugin/gb28181/api.go +++ b/plugin/gb28181/api.go @@ -433,10 +433,10 @@ func (gb *GB28181Plugin) SyncDevice(ctx context.Context, req *pb.SyncDeviceReque if !ok && gb.DB != nil { // 如果内存中没有且数据库存在,则从数据库查询 var device Device - if err := gb.DB.Where("id = ?", req.DeviceId).First(&device).Error; err == nil { + if err := gb.DB.Where("device_id = ?", req.DeviceId).First(&device).Error; err == nil { d = &device // 恢复设备的必要字段 - d.Logger = gb.Logger.With("id", req.DeviceId) + d.Logger = gb.With("deviceid", req.DeviceId) d.channels.L = new(sync.RWMutex) d.plugin = gb @@ -1143,7 +1143,7 @@ func (gb *GB28181Plugin) QueryRecord(ctx context.Context, req *pb.QueryRecordReq return resp, nil } - channel, ok := device.channels.Get(req.ChannelId) + channel, ok := device.channels.Get(req.DeviceId + "_" + req.ChannelId) if !ok { resp.Code = 404 resp.Message = "channel not found" @@ -1533,6 +1533,13 @@ func (gb *GB28181Plugin) AddPlatformChannel(ctx context.Context, req *pb.AddPlat resp.Message = fmt.Sprintf("提交事务失败: %v", err) return resp, nil } + if platform, ok := gb.platforms.Get(req.PlatformId); !ok { + for _, channelId := range req.ChannelIds { + if channel, ok := gb.channels.Get(channelId); ok { + platform.channels.Set(channel) + } + } + } resp.Code = 0 resp.Message = "success" @@ -1593,7 +1600,7 @@ func (gb *GB28181Plugin) Recording(ctx context.Context, req *pb.RecordingRequest } // 从device.channels中查找实际通道 - _, ok = actualDevice.channels.Get(result.ChannelID) + _, ok = actualDevice.channels.Get(result.DeviceID + "_" + result.ChannelID) if !ok { resp.Code = 404 resp.Message = "实际通道未找到" @@ -1626,7 +1633,7 @@ func (gb *GB28181Plugin) Recording(ctx context.Context, req *pb.RecordingRequest } // 检查通道是否存在 - _, ok = device.channels.Get(req.ChannelId) + _, ok = device.channels.Get(req.DeviceId + "_" + req.ChannelId) if !ok { resp.Code = 404 resp.Message = "通道未找到" @@ -1712,7 +1719,7 @@ func (gb *GB28181Plugin) GetSnap(ctx context.Context, req *pb.GetSnapRequest) (* } // 从device.channels中查找实际通道 - _, ok = actualDevice.channels.Get(result.ChannelID) + _, ok = actualDevice.channels.Get(result.DeviceID + "_" + result.ChannelID) if !ok { resp.Code = 404 resp.Message = "实际通道未找到" @@ -1756,7 +1763,7 @@ func (gb *GB28181Plugin) GetSnap(ctx context.Context, req *pb.GetSnapRequest) (* } // 检查通道是否存在 - _, ok = device.channels.Get(req.ChannelId) + _, ok = device.channels.Get(req.DeviceId + "_" + req.ChannelId) if !ok { resp.Code = 404 resp.Message = "通道未找到" diff --git a/plugin/gb28181/channel.go b/plugin/gb28181/channel.go index 4b1011a..ecd8fe8 100644 --- a/plugin/gb28181/channel.go +++ b/plugin/gb28181/channel.go @@ -51,11 +51,11 @@ type Channel struct { RecordReqs util.Collection[int, *RecordRequest] PresetReqs util.Collection[int, *PresetRequest] // 预置位请求集合 *slog.Logger - gb28181.DeviceChannel + *gb28181.DeviceChannel } func (c *Channel) GetKey() string { - return c.ChannelID + return c.ID } type PullProxy struct { @@ -75,7 +75,7 @@ func (p *PullProxy) Start() error { streamPaths := strings.Split(p.GetStreamPath(), "/") deviceId, channelId := streamPaths[0], streamPaths[1] if device, ok := p.Plugin.GetHandler().(*GB28181Plugin).devices.Get(deviceId); ok { - if _, ok := device.channels.Get(channelId); ok { + if _, ok := device.channels.Get(deviceId + "_" + channelId); ok { p.ChangeStatus(m7s.PullProxyStatusOnline) } } diff --git a/plugin/gb28181/device.go b/plugin/gb28181/device.go index c5cbb9b..238d490 100644 --- a/plugin/gb28181/device.go +++ b/plugin/gb28181/device.go @@ -230,7 +230,7 @@ func (d *Device) onMessage(req *sip.Request, tx sip.ServerTransaction, msg *gb28 d.catalogReqs.RemoveByKey(msg.SN) } case "RecordInfo": - if channel, ok := d.channels.Get(msg.DeviceID); ok { + if channel, ok := d.channels.Get(d.DeviceId + "_" + msg.DeviceID); ok { if req, ok := channel.RecordReqs.Get(msg.SN); ok { // 添加响应并检查是否完成 if req.AddResponse(*msg) { @@ -239,7 +239,7 @@ func (d *Device) onMessage(req *sip.Request, tx sip.ServerTransaction, msg *gb28 } } case "PresetQuery": - if channel, ok := d.channels.Get(msg.DeviceID); ok { + if channel, ok := d.channels.Get(d.DeviceId + "_" + msg.DeviceID); ok { if req, ok := channel.PresetReqs.Get(msg.SN); ok { // 添加预置位响应 req.Response = msg.PresetList.Item @@ -616,15 +616,16 @@ func (d *Device) frontEndCmdString(cmdCode int32, parameter1 int32, parameter2 i } func (d *Device) addOrUpdateChannel(c gb28181.DeviceChannel) { - if channel, ok := d.channels.Get(c.ChannelID); ok { - channel.DeviceChannel = c + if channel, ok := d.channels.Get(c.ID); ok { + channel.DeviceChannel = &c } else { channel = &Channel{ Device: d, - Logger: d.Logger.With("channel", c.ChannelID), - DeviceChannel: c, + Logger: d.Logger.With("channel", c.ID), + DeviceChannel: &c, } d.channels.Set(channel) + d.plugin.channels.Set(channel.DeviceChannel) } } diff --git a/plugin/gb28181/dialog.go b/plugin/gb28181/dialog.go index 8c4cdd2..f0a75c6 100644 --- a/plugin/gb28181/dialog.go +++ b/plugin/gb28181/dialog.go @@ -83,7 +83,7 @@ func (d *Dialog) Start() (err error) { var device *Device if deviceTmp, ok := d.gb.devices.Get(deviceId); ok { device = deviceTmp - if channel, ok := deviceTmp.channels.Get(channelId); ok { + if channel, ok := deviceTmp.channels.Get(deviceId + "_" + channelId); ok { d.Channel = channel d.StreamMode = device.StreamMode } else { diff --git a/plugin/gb28181/forwarddialog.go b/plugin/gb28181/forwarddialog.go index c2b9d35..2dcad6d 100644 --- a/plugin/gb28181/forwarddialog.go +++ b/plugin/gb28181/forwarddialog.go @@ -70,7 +70,7 @@ func (d *ForwardDialog) Start() (err error) { var device *Device if deviceTmp, ok := d.gb.devices.Get(deviceId); ok { device = deviceTmp - if channel, ok := deviceTmp.channels.Get(channelId); ok { + if channel, ok := deviceTmp.channels.Get(deviceId + "_" + channelId); ok { d.channel = channel } else { return fmt.Errorf("channel %s not found", channelId) diff --git a/plugin/gb28181/index.go b/plugin/gb28181/index.go index cd1dbd7..cb67fca 100644 --- a/plugin/gb28181/index.go +++ b/plugin/gb28181/index.go @@ -56,6 +56,8 @@ type GB28181Plugin struct { SipIP string `desc:"sip发送命令的IP,一般是本地IP,多网卡时需要配置正确的IP"` MediaIP string `desc:"流媒体IP,用于接收流"` deviceManager task.Manager[string, *DeviceRegisterQueueTask] + Platforms []*gb28181.PlatformModel + channels util.Collection[string, *gb28181.DeviceChannel] } var _ = m7s.InstallPlugin[GB28181Plugin](m7s.PluginMeta{ @@ -127,6 +129,7 @@ func (gb *GB28181Plugin) initDatabase() error { } func (gb *GB28181Plugin) OnInit() (err error) { + gb.Info("GB28181 initing", gb.Platforms) gb.AddTask(&gb.deviceManager) logger := zerolog.New(os.Stdout) gb.ua, err = sipgo.NewUA(sipgo.WithUserAgent("M7S/" + m7s.Version)) // Build user agent @@ -368,11 +371,44 @@ func (gb *GB28181Plugin) checkPlatform() { gb.Info("找到启用状态的平台", "count", len(platformModels)) + if gb.Platforms != nil && len(gb.Platforms) > 0 { + platformModels = append(platformModels, gb.Platforms...) + } + // 遍历所有平台进行初始化和注册 for _, platformModel := range platformModels { // 创建Platform实例 platform := NewPlatform(platformModel, gb, true) + if platformModel.PlatformChannels != nil && len(platformModel.PlatformChannels) > 0 { + for i := range platformModel.PlatformChannels { + channelDbId := platformModel.PlatformChannels[i].ChannelDBID + if channelDbId != "" { + if channel, ok := gb.channels.Get(channelDbId); ok { + platform.channels.Set(channel) + } + } + } + } else { + // 查询通道列表 + var channels []gb28181.DeviceChannel + if gb.DB != nil { + if err := gb.DB.Table("gb28181_channel gc"). + Select(`gc.*`). + Joins("left join gb28181_platform_channel gpc on gc.id=gpc.channel_db_id"). + Where("gpc.platform_server_gb_id = ? and gc.status='ON'", platformModel.ServerGBID). + Find(&channels).Error; err != nil { + gb.Error("", "error", err.Error()) + } + if channels != nil && len(channels) > 0 { + for i := range channels { + if channel, ok := gb.channels.Get(channels[i].ID); ok { + platform.channels.Set(channel) + } + } + } + } + } //go platform.Unregister() //if err != nil { // gb.Error("unregister err ", err) @@ -446,10 +482,15 @@ func (gb *GB28181Plugin) OnMessage(req *sip.Request, tx sip.ServerTransaction) { d, _ = gb.devices.Get(id) // 检查是否是平台 - if gb.DB != nil { - var platform gb28181.PlatformModel - if err := gb.DB.First(&platform, gb28181.PlatformModel{ServerGBID: id, Enable: true}).Error; err == nil { - p = &platform + //if gb.DB != nil { + // var platform gb28181.PlatformModel + // if err := gb.DB.First(&platform, gb28181.PlatformModel{ServerGBID: id, Enable: true}).Error; err == nil { + // p = &platform + // } + //} + if platformtmp, ok := gb.platforms.Get(id); ok { + if platformtmp.PlatformModel.Enable { + p = platformtmp.PlatformModel } } @@ -693,214 +734,222 @@ func (gb *GB28181Plugin) OnInvite(req *sip.Request, tx sip.ServerTransaction) { // 首先从数据库中查询平台 var platform *Platform - var platformModel = &gb28181.PlatformModel{} - if gb.DB != nil { - // 使用requesterId查询平台,类似于Java代码中的queryPlatformByServerGBId - result := gb.DB.Where("server_gb_id = ?", inviteInfo.RequesterId).First(&platformModel) - if result.Error == nil { - // 数据库中找到平台,根据平台ID从运行时实例中查找 - if platformTmp, platformFound := gb.platforms.Get(platformModel.ServerGBID); !platformFound { - gb.Error("OnInvite", "error", "platform found in DB but not in runtime", "platformId", inviteInfo.RequesterId) - _ = tx.Respond(sip.NewResponseFromRequest(req, sip.StatusNotFound, "Platform Not Found In Runtime", nil)) - return - } else { - platform = platformTmp - } + //var platformModel = &gb28181.PlatformModel{} + //if gb.DB != nil { + // // 使用requesterId查询平台,类似于Java代码中的queryPlatformByServerGBId + // result := gb.DB.Where("server_gb_id = ?", inviteInfo.RequesterId).First(&platformModel) + // if result.Error == nil { + // 数据库中找到平台,根据平台ID从运行时实例中查找 + if platformTmp, platformFound := gb.platforms.Get(inviteInfo.RequesterId); !platformFound { + gb.Error("OnInvite", "error", "platform found in DB but not in runtime", "platformId", inviteInfo.RequesterId) + _ = tx.Respond(sip.NewResponseFromRequest(req, sip.StatusNotFound, "Platform Not Found In Runtime", nil)) + return + } else { + platform = platformTmp + } - gb.Info("OnInvite", "action", "platform found", "platformId", inviteInfo.RequesterId, "platformName", platform.PlatformModel.Name) + gb.Info("OnInvite", "action", "platform found", "platformId", inviteInfo.RequesterId, "platformName", platform.PlatformModel.Name) - // 使用GORM的模型查询方式,更加符合GORM的使用习惯 - // 默认情况下GORM会自动处理软删除,只查询未删除的记录 - var deviceChannels []gb28181.DeviceChannel - channelResult := gb.DB.Model(&gb28181.DeviceChannel{}). - Joins("LEFT JOIN gb28181_platform_channel ON gb28181_channel.id = gb28181_platform_channel.channel_db_id"). - Where("gb28181_platform_channel.platform_server_gb_id = ? AND gb28181_channel.channel_id = ?", - platform.PlatformModel.ServerGBID, inviteInfo.TargetChannelId). - Order("gb28181_channel.id"). - Find(&deviceChannels) + // 使用GORM的模型查询方式,更加符合GORM的使用习惯 + // 默认情况下GORM会自动处理软删除,只查询未删除的记录 + //var deviceChannels []gb28181.DeviceChannel + //channelResult := gb.DB.Model(&gb28181.DeviceChannel{}). + // Joins("LEFT JOIN gb28181_platform_channel ON gb28181_channel.id = gb28181_platform_channel.channel_db_id"). + // Where("gb28181_platform_channel.platform_server_gb_id = ? AND gb28181_channel.channel_id = ?", + // platform.PlatformModel.ServerGBID, inviteInfo.TargetChannelId). + // Order("gb28181_channel.id"). + // Find(&deviceChannels) + // + //if channelResult.Error != nil || len(deviceChannels) == 0 { + // gb.Error("OnInvite", "error", "channel not found", "channelId", inviteInfo.TargetChannelId, "err", channelResult.Error) + // _ = tx.Respond(sip.NewResponseFromRequest(req, sip.StatusNotFound, "Channel Not Found", nil)) + // return + //} - if channelResult.Error != nil || len(deviceChannels) == 0 { - gb.Error("OnInvite", "error", "channel not found", "channelId", inviteInfo.TargetChannelId, "err", channelResult.Error) - _ = tx.Respond(sip.NewResponseFromRequest(req, sip.StatusNotFound, "Channel Not Found", nil)) - return - } + // 找到了通道 + var channel *gb28181.DeviceChannel - // 找到了通道 - channel := deviceChannels[len(deviceChannels)-1] - gb.Info("OnInvite", "action", "channel found", "channelId", channel.ChannelID, "channelName", channel.Name) + platform.channels.Range(func(channelTmp *gb28181.DeviceChannel) bool { + if channelTmp.ChannelID == inviteInfo.TargetChannelId { + channel = channelTmp + } + return true + }) - var channelTmp *Channel - if deviceFound, ok := gb.devices.Get(channel.DeviceID); ok { - if channelFound, ok := deviceFound.channels.Get(channel.ChannelID); ok { - channelTmp = channelFound - } else { - gb.Error("OnInvite", "channel not found memory,ChannelID is ", channel.ChannelID) - _ = tx.Respond(sip.NewResponseFromRequest(req, sip.StatusBadRequest, "SSRC Not Found", nil)) - return - } - } else { - gb.Error("OnInvite", "device not found memory,deviceID is ", channel.DeviceID) - _ = tx.Respond(sip.NewResponseFromRequest(req, sip.StatusBadRequest, "SSRC Not Found", nil)) - return - } + gb.Info("OnInvite", "action", "channel found", "channelId", channel.ChannelID, "channelName", channel.Name) - // 通道存在,发送100 Trying响应 - tryingResp := sip.NewResponseFromRequest(req, sip.StatusTrying, "Trying", nil) - if err := tx.Respond(tryingResp); err != nil { - gb.Error("OnInvite", "error", "send trying response failed", "err", err.Error()) - return - } - - // 检查SSRC - if inviteInfo.SSRC == "" { - gb.Error("OnInvite", "error", "ssrc not found in invite") - _ = tx.Respond(sip.NewResponseFromRequest(req, sip.StatusBadRequest, "SSRC Not Found", nil)) - return - } - - // 获取媒体信息 - mediaPort := uint16(0) - if gb.MediaPort.Valid() { - select { - case port := <-gb.tcpPorts: - mediaPort = port - gb.Debug("OnInvite", "action", "allocate port", "port", port) - default: - gb.Error("OnInvite", "error", "no available port") - _ = tx.Respond(sip.NewResponseFromRequest(req, sip.StatusServiceUnavailable, "No Available Port", nil)) - return - } - } else { - mediaPort = gb.MediaPort[0] - gb.Debug("OnInvite", "action", "use default port", "port", mediaPort) - } - - // 构建SDP响应 - // 使用平台和通道的信息构建响应 - sdpIP := platform.PlatformModel.DeviceIP - // 如果平台配置了SendStreamIP,则使用此IP - if platform.PlatformModel.SendStreamIP != "" { - sdpIP = platform.PlatformModel.SendStreamIP - } - - // 构建SDP内容,参考Java代码createSendSdp方法 - content := []string{ - "v=0", - fmt.Sprintf("o=%s 0 0 IN IP4 %s", channel.ChannelID, sdpIP), - fmt.Sprintf("s=%s", inviteInfo.SessionName), - fmt.Sprintf("c=IN IP4 %s", sdpIP), - } - - // 处理播放时间 - if strings.EqualFold("Playback", inviteInfo.SessionName) && inviteInfo.StartTime > 0 && inviteInfo.StopTime > 0 { - content = append(content, fmt.Sprintf("t=%d %d", inviteInfo.StartTime, inviteInfo.StopTime)) - } else { - content = append(content, "t=0 0") - } - - // 处理传输模式 - if inviteInfo.TCP { - content = append(content, fmt.Sprintf("m=video %d TCP/RTP/AVP 96", mediaPort)) - if inviteInfo.TCPActive { - content = append(content, "a=setup:passive") - } else { - content = append(content, "a=setup:active") - } - if inviteInfo.TCP { - content = append(content, "a=connection:new") - } - } else { - content = append(content, fmt.Sprintf("m=video %d RTP/AVP 96", mediaPort)) - } - - // 添加其他属性,参考Java代码 - content = append(content, - "a=sendonly", - "a=rtpmap:96 PS/90000", - fmt.Sprintf("y=%s", inviteInfo.SSRC), - "f=", - ) - - // 发送200 OK响应 - response := sip.NewResponseFromRequest(req, sip.StatusOK, "OK", nil) - contentType := sip.ContentTypeHeader("application/sdp") - response.AppendHeader(&contentType) - response.SetBody([]byte(strings.Join(content, "\r\n") + "\r\n")) - - // 创建并保存SendRtpInfo,以供OnAck方法使用 - forwardDialog := &ForwardDialog{ - gb: gb, - platformIP: inviteInfo.IP, - platformPort: inviteInfo.Port, - platformSSRC: inviteInfo.SSRC, - TCP: inviteInfo.TCP, - TCPActive: inviteInfo.TCPActive, - platformCallId: req.CallID().Value(), - start: inviteInfo.StartTime, - end: inviteInfo.StopTime, - channel: channelTmp, - upIP: sdpIP, - upPort: mediaPort, - } - forwardDialog.forwarder = gb28181.NewRTPForwarder() - forwardDialog.forwarder.TCP = forwardDialog.TCP - forwardDialog.forwarder.TCPActive = forwardDialog.TCPActive - forwardDialog.forwarder.StreamMode = forwardDialog.channel.Device.StreamMode - - if forwardDialog.TCPActive { - forwardDialog.forwarder.UpListenAddr = fmt.Sprintf(":%d", forwardDialog.upPort) - } else { - forwardDialog.forwarder.UpListenAddr = fmt.Sprintf("%s:%d", forwardDialog.upIP, forwardDialog.platformPort) - } - - // 设置监听地址和端口 - if strings.ToUpper(forwardDialog.channel.Device.StreamMode) == "TCP-ACTIVE" { - forwardDialog.forwarder.DownListenAddr = fmt.Sprintf("%s:%d", forwardDialog.downIP, forwardDialog.downPort) - } else { - forwardDialog.forwarder.DownListenAddr = fmt.Sprintf(":%d", forwardDialog.MediaPort) - } - - // 设置转发目标 - if inviteInfo.IP != "" && forwardDialog.platformPort > 0 { - err = forwardDialog.forwarder.SetTarget(forwardDialog.platformIP, forwardDialog.platformPort) - if err != nil { - gb.Error("set target error", "err", err) - return - } - } else { - gb.Error("no target set, will only receive but not forward") - return - } - - // 设置目标SSRC - if forwardDialog.platformSSRC != "" { - forwardDialog.forwarder.TargetSSRC = forwardDialog.platformSSRC - gb.Info("set target ssrc", "ssrc", forwardDialog.platformSSRC) - } - // 保存到集合中 - gb.forwardDialogs.Set(forwardDialog) - gb.Info("OnInvite", "action", "sendRtpInfo created", "callId", req.CallID().Value()) - - if err := tx.Respond(response); err != nil { - gb.Error("OnInvite", "error", "send response failed", "err", err.Error()) - return - } - - gb.Info("OnInvite", "action", "complete", "platformId", inviteInfo.RequesterId, "channelId", channel.ChannelID, - "ip", inviteInfo.IP, "port", inviteInfo.Port, "tcp", inviteInfo.TCP, "tcpActive", inviteInfo.TCPActive) - return + var channelTmp *Channel + if deviceFound, ok := gb.devices.Get(channel.DeviceID); ok { + if channelFound, ok := deviceFound.channels.Get(channel.ID); ok { + channelTmp = channelFound } else { - // 数据库中未找到平台,响应not found - gb.Error("OnInvite", "error", "platform not found in database", "platformId", inviteInfo.RequesterId) - _ = tx.Respond(sip.NewResponseFromRequest(req, sip.StatusNotFound, "Platform Not Found", nil)) + gb.Error("OnInvite", "channel not found memory,ChannelID is ", channel.ChannelID) + _ = tx.Respond(sip.NewResponseFromRequest(req, sip.StatusBadRequest, "SSRC Not Found", nil)) return } } else { - // 数据库未初始化,响应服务不可用 - gb.Error("OnInvite", "error", "database not initialized") - _ = tx.Respond(sip.NewResponseFromRequest(req, sip.StatusServiceUnavailable, "Database Not Initialized", nil)) + gb.Error("OnInvite", "device not found memory,deviceID is ", channel.DeviceID) + _ = tx.Respond(sip.NewResponseFromRequest(req, sip.StatusBadRequest, "SSRC Not Found", nil)) return } + + // 通道存在,发送100 Trying响应 + tryingResp := sip.NewResponseFromRequest(req, sip.StatusTrying, "Trying", nil) + if err := tx.Respond(tryingResp); err != nil { + gb.Error("OnInvite", "error", "send trying response failed", "err", err.Error()) + return + } + + // 检查SSRC + if inviteInfo.SSRC == "" { + gb.Error("OnInvite", "error", "ssrc not found in invite") + _ = tx.Respond(sip.NewResponseFromRequest(req, sip.StatusBadRequest, "SSRC Not Found", nil)) + return + } + + // 获取媒体信息 + mediaPort := uint16(0) + if gb.MediaPort.Valid() { + select { + case port := <-gb.tcpPorts: + mediaPort = port + gb.Debug("OnInvite", "action", "allocate port", "port", port) + default: + gb.Error("OnInvite", "error", "no available port") + _ = tx.Respond(sip.NewResponseFromRequest(req, sip.StatusServiceUnavailable, "No Available Port", nil)) + return + } + } else { + mediaPort = gb.MediaPort[0] + gb.Debug("OnInvite", "action", "use default port", "port", mediaPort) + } + + // 构建SDP响应 + // 使用平台和通道的信息构建响应 + sdpIP := platform.PlatformModel.DeviceIP + // 如果平台配置了SendStreamIP,则使用此IP + if platform.PlatformModel.SendStreamIP != "" { + sdpIP = platform.PlatformModel.SendStreamIP + } + + // 构建SDP内容,参考Java代码createSendSdp方法 + content := []string{ + "v=0", + fmt.Sprintf("o=%s 0 0 IN IP4 %s", channel.ChannelID, sdpIP), + fmt.Sprintf("s=%s", inviteInfo.SessionName), + fmt.Sprintf("c=IN IP4 %s", sdpIP), + } + + // 处理播放时间 + if strings.EqualFold("Playback", inviteInfo.SessionName) && inviteInfo.StartTime > 0 && inviteInfo.StopTime > 0 { + content = append(content, fmt.Sprintf("t=%d %d", inviteInfo.StartTime, inviteInfo.StopTime)) + } else { + content = append(content, "t=0 0") + } + + // 处理传输模式 + if inviteInfo.TCP { + content = append(content, fmt.Sprintf("m=video %d TCP/RTP/AVP 96", mediaPort)) + if inviteInfo.TCPActive { + content = append(content, "a=setup:passive") + } else { + content = append(content, "a=setup:active") + } + if inviteInfo.TCP { + content = append(content, "a=connection:new") + } + } else { + content = append(content, fmt.Sprintf("m=video %d RTP/AVP 96", mediaPort)) + } + + // 添加其他属性,参考Java代码 + content = append(content, + "a=sendonly", + "a=rtpmap:96 PS/90000", + fmt.Sprintf("y=%s", inviteInfo.SSRC), + "f=", + ) + + // 发送200 OK响应 + response := sip.NewResponseFromRequest(req, sip.StatusOK, "OK", nil) + contentType := sip.ContentTypeHeader("application/sdp") + response.AppendHeader(&contentType) + response.SetBody([]byte(strings.Join(content, "\r\n") + "\r\n")) + + // 创建并保存SendRtpInfo,以供OnAck方法使用 + forwardDialog := &ForwardDialog{ + gb: gb, + platformIP: inviteInfo.IP, + platformPort: inviteInfo.Port, + platformSSRC: inviteInfo.SSRC, + TCP: inviteInfo.TCP, + TCPActive: inviteInfo.TCPActive, + platformCallId: req.CallID().Value(), + start: inviteInfo.StartTime, + end: inviteInfo.StopTime, + channel: channelTmp, + upIP: sdpIP, + upPort: mediaPort, + } + forwardDialog.forwarder = gb28181.NewRTPForwarder() + forwardDialog.forwarder.TCP = forwardDialog.TCP + forwardDialog.forwarder.TCPActive = forwardDialog.TCPActive + forwardDialog.forwarder.StreamMode = forwardDialog.channel.Device.StreamMode + + if forwardDialog.TCPActive { + forwardDialog.forwarder.UpListenAddr = fmt.Sprintf(":%d", forwardDialog.upPort) + } else { + forwardDialog.forwarder.UpListenAddr = fmt.Sprintf("%s:%d", forwardDialog.upIP, forwardDialog.platformPort) + } + + // 设置监听地址和端口 + if strings.ToUpper(forwardDialog.channel.Device.StreamMode) == "TCP-ACTIVE" { + forwardDialog.forwarder.DownListenAddr = fmt.Sprintf("%s:%d", forwardDialog.downIP, forwardDialog.downPort) + } else { + forwardDialog.forwarder.DownListenAddr = fmt.Sprintf(":%d", forwardDialog.MediaPort) + } + + // 设置转发目标 + if inviteInfo.IP != "" && forwardDialog.platformPort > 0 { + err = forwardDialog.forwarder.SetTarget(forwardDialog.platformIP, forwardDialog.platformPort) + if err != nil { + gb.Error("set target error", "err", err) + return + } + } else { + gb.Error("no target set, will only receive but not forward") + return + } + + // 设置目标SSRC + if forwardDialog.platformSSRC != "" { + forwardDialog.forwarder.TargetSSRC = forwardDialog.platformSSRC + gb.Info("set target ssrc", "ssrc", forwardDialog.platformSSRC) + } + // 保存到集合中 + gb.forwardDialogs.Set(forwardDialog) + gb.Info("OnInvite", "action", "sendRtpInfo created", "callId", req.CallID().Value()) + + if err := tx.Respond(response); err != nil { + gb.Error("OnInvite", "error", "send response failed", "err", err.Error()) + return + } + + gb.Info("OnInvite", "action", "complete", "platformId", inviteInfo.RequesterId, "channelId", channel.ChannelID, + "ip", inviteInfo.IP, "port", inviteInfo.Port, "tcp", inviteInfo.TCP, "tcpActive", inviteInfo.TCPActive) + return + //} else { + // // 数据库中未找到平台,响应not found + // gb.Error("OnInvite", "error", "platform not found in database", "platformId", inviteInfo.RequesterId) + // _ = tx.Respond(sip.NewResponseFromRequest(req, sip.StatusNotFound, "Platform Not Found", nil)) + // return + //} + //} else { + // // 数据库未初始化,响应服务不可用 + // gb.Error("OnInvite", "error", "database not initialized") + // _ = tx.Respond(sip.NewResponseFromRequest(req, sip.StatusServiceUnavailable, "Database Not Initialized", nil)) + // return + //} } func (gb *GB28181Plugin) OnAck(req *sip.Request, tx sip.ServerTransaction) { diff --git a/plugin/gb28181/pkg/devicechannel.go b/plugin/gb28181/pkg/devicechannel.go index fe40649..830744d 100644 --- a/plugin/gb28181/pkg/devicechannel.go +++ b/plugin/gb28181/pkg/devicechannel.go @@ -300,3 +300,7 @@ func (d *DeviceChannel) appendInfoContent(content *string) { *content += " " + strconv.Itoa(d.SVCTimeSupportMode) + "\n" } } + +func (d *DeviceChannel) GetKey() string { + return d.ID +} diff --git a/plugin/gb28181/pkg/platformchannel.go b/plugin/gb28181/pkg/platformchannel.go index d488388..355a3c3 100644 --- a/plugin/gb28181/pkg/platformchannel.go +++ b/plugin/gb28181/pkg/platformchannel.go @@ -66,3 +66,8 @@ type PlatformChannel struct { func (*PlatformChannel) TableName() string { return "gb28181_platform_channel" } + +func (p *PlatformChannel) GetKey() string { + return p.PlatformServerGBID + "_" + p.ChannelDBID + +} diff --git a/plugin/gb28181/pkg/platformmodel.go b/plugin/gb28181/pkg/platformmodel.go index 23cec16..68ee85d 100644 --- a/plugin/gb28181/pkg/platformmodel.go +++ b/plugin/gb28181/pkg/platformmodel.go @@ -9,43 +9,44 @@ import ( // 包含了平台的基本信息、SIP服务配置、设备信息、认证信息等。 // 用于存储和管理GB28181平台的所有相关参数。 type PlatformModel struct { - Enable bool `gorm:"column:enable" json:"enable"` // Enable表示该平台配置是否启用 - Name string `gorm:"column:name;omitempty" json:"name"` // Name表示平台的名称 - ServerGBID string `gorm:"primaryKey;column:server_gb_id;omitempty" json:"serverGBId"` // ServerGBID表示SIP服务器的国标编码 - ServerGBDomain string `gorm:"column:server_gb_domain;omitempty" json:"serverGBDomain"` // ServerGBDomain表示SIP服务器的国标域 - ServerIP string `gorm:"column:server_ip;omitempty" json:"serverIp"` // ServerIP表示SIP服务器的IP地址 - ServerPort int `gorm:"column:server_port;omitempty" json:"serverPort"` // ServerPort表示SIP服务器的端口号 - DeviceGBID string `gorm:"column:device_gb_id;omitempty" json:"deviceGBId"` // DeviceGBID表示设备的国标编号 - DeviceIP string `gorm:"column:device_ip;omitempty" json:"deviceIp"` // DeviceIP表示设备的IP地址 - DevicePort int `gorm:"column:device_port;omitempty" json:"devicePort"` // DevicePort表示设备的端口号 - Username string `gorm:"column:username;omitempty" json:"username"` // Username表示SIP认证的用户名,默认使用设备国标编号 - Password string `gorm:"column:password;omitempty" json:"password"` // Password表示SIP认证的密码 - Expires int `gorm:"column:expires;omitempty" json:"expires"` // Expires表示注册的过期时间,单位为秒 - KeepTimeout int `gorm:"column:keep_timeout;omitempty" json:"keepTimeout"` // KeepTimeout表示心跳超时时间,单位为秒 - Transport string `gorm:"column:transport;omitempty" json:"transport"` // Transport表示传输协议类型 - CharacterSet string `gorm:"column:character_set;omitempty" json:"characterSet"` // CharacterSet表示字符集编码 - PTZ bool `gorm:"column:ptz" json:"ptz"` // PTZ表示是否允许云台控制 - RTCP bool `gorm:"column:rtcp" json:"rtcp"` // RTCP表示是否启用RTCP流保活 - Status bool `gorm:"column:status" json:"status"` // Status表示平台当前的在线状态 - ChannelCount int `gorm:"column:channel_count;omitempty" json:"channelCount"` // ChannelCount表示通道数量 - CatalogSubscribe bool `gorm:"column:catalog_subscribe" json:"catalogSubscribe"` // CatalogSubscribe表示是否已订阅目录信息 - AlarmSubscribe bool `gorm:"column:alarm_subscribe" json:"alarmSubscribe"` // AlarmSubscribe表示是否已订阅报警信息 - MobilePositionSubscribe bool `gorm:"column:mobile_position_subscribe" json:"mobilePositionSubscribe"` // MobilePositionSubscribe表示是否已订阅移动位置信息 - CatalogGroup int `gorm:"column:catalog_group;omitempty" json:"catalogGroup"` // CatalogGroup表示目录分组大小,每次向上级发送通道数量 - UpdateTime string `gorm:"column:update_time;omitempty" json:"updateTime"` // UpdateTime表示最后更新时间 - CreateTime string `gorm:"column:create_time;omitempty" json:"createTime"` // CreateTime表示创建时间 - AsMessageChannel bool `gorm:"column:as_message_channel" json:"asMessageChannel"` // AsMessageChannel表示是否作为消息通道使用 - SendStreamIP string `gorm:"column:send_stream_ip;omitempty" json:"sendStreamIp"` // SendStreamIP表示点播回复200OK时使用的IP地址 - AutoPushChannel bool `gorm:"column:auto_push_channel" json:"autoPushChannel"` // AutoPushChannel表示是否自动推送通道变化 - CatalogWithPlatform int `gorm:"column:catalog_with_platform;omitempty" json:"catalogWithPlatform"` // CatalogWithPlatform表示目录信息是否包含平台信息(0:关闭,1:打开) - CatalogWithGroup int `gorm:"column:catalog_with_group;omitempty" json:"catalogWithGroup"` // CatalogWithGroup表示目录信息是否包含分组信息(0:关闭,1:打开) - CatalogWithRegion int `gorm:"column:catalog_with_region;omitempty" json:"catalogWithRegion"` // CatalogWithRegion表示目录信息是否包含行政区划(0:关闭,1:打开) - CivilCode string `gorm:"column:civil_code;omitempty" json:"civilCode"` // CivilCode表示行政区划代码 - Manufacturer string `gorm:"column:manufacturer;omitempty" json:"manufacturer"` // Manufacturer表示平台厂商 - Model string `gorm:"column:model;omitempty" json:"model"` // Model表示平台型号 - Address string `gorm:"column:address;omitempty" json:"address"` // Address表示平台安装地址 - RegisterWay int `gorm:"column:register_way;omitempty" json:"registerWay"` // RegisterWay表示注册方式(1:标准认证注册,2:口令认证,3:数字证书双向认证,4:数字证书单向认证) - Secrecy int `gorm:"column:secrecy;omitempty" json:"secrecy"` // Secrecy表示保密属性(0:不涉密,1:涉密) + Enable bool `gorm:"column:enable" json:"enable"` // Enable表示该平台配置是否启用 + Name string `gorm:"column:name;omitempty" json:"name"` // Name表示平台的名称 + ServerGBID string `gorm:"primaryKey;column:server_gb_id;omitempty" json:"serverGBId"` // ServerGBID表示SIP服务器的国标编码 + ServerGBDomain string `gorm:"column:server_gb_domain;omitempty" json:"serverGBDomain"` // ServerGBDomain表示SIP服务器的国标域 + ServerIP string `gorm:"column:server_ip;omitempty" json:"serverIp"` // ServerIP表示SIP服务器的IP地址 + ServerPort int `gorm:"column:server_port;omitempty" json:"serverPort"` // ServerPort表示SIP服务器的端口号 + DeviceGBID string `gorm:"column:device_gb_id;omitempty" json:"deviceGBId"` // DeviceGBID表示设备的国标编号 + DeviceIP string `gorm:"column:device_ip;omitempty" json:"deviceIp"` // DeviceIP表示设备的IP地址 + DevicePort int `gorm:"column:device_port;omitempty" json:"devicePort"` // DevicePort表示设备的端口号 + Username string `gorm:"column:username;omitempty" json:"username"` // Username表示SIP认证的用户名,默认使用设备国标编号 + Password string `gorm:"column:password;omitempty" json:"password"` // Password表示SIP认证的密码 + Expires int `gorm:"column:expires;omitempty" json:"expires"` // Expires表示注册的过期时间,单位为秒 + KeepTimeout int `gorm:"column:keep_timeout;omitempty" json:"keepTimeout"` // KeepTimeout表示心跳超时时间,单位为秒 + Transport string `gorm:"column:transport;omitempty" json:"transport"` // Transport表示传输协议类型 + CharacterSet string `gorm:"column:character_set;omitempty" json:"characterSet"` // CharacterSet表示字符集编码 + PTZ bool `gorm:"column:ptz" json:"ptz"` // PTZ表示是否允许云台控制 + RTCP bool `gorm:"column:rtcp" json:"rtcp"` // RTCP表示是否启用RTCP流保活 + Status bool `gorm:"column:status" json:"status"` // Status表示平台当前的在线状态 + ChannelCount int `gorm:"column:channel_count;omitempty" json:"channelCount"` // ChannelCount表示通道数量 + CatalogSubscribe bool `gorm:"column:catalog_subscribe" json:"catalogSubscribe"` // CatalogSubscribe表示是否已订阅目录信息 + AlarmSubscribe bool `gorm:"column:alarm_subscribe" json:"alarmSubscribe"` // AlarmSubscribe表示是否已订阅报警信息 + MobilePositionSubscribe bool `gorm:"column:mobile_position_subscribe" json:"mobilePositionSubscribe"` // MobilePositionSubscribe表示是否已订阅移动位置信息 + CatalogGroup int `gorm:"column:catalog_group;omitempty" json:"catalogGroup"` // CatalogGroup表示目录分组大小,每次向上级发送通道数量 + UpdateTime string `gorm:"column:update_time;omitempty" json:"updateTime"` // UpdateTime表示最后更新时间 + CreateTime string `gorm:"column:create_time;omitempty" json:"createTime"` // CreateTime表示创建时间 + AsMessageChannel bool `gorm:"column:as_message_channel" json:"asMessageChannel"` // AsMessageChannel表示是否作为消息通道使用 + SendStreamIP string `gorm:"column:send_stream_ip;omitempty" json:"sendStreamIp"` // SendStreamIP表示点播回复200OK时使用的IP地址 + AutoPushChannel bool `gorm:"column:auto_push_channel" json:"autoPushChannel"` // AutoPushChannel表示是否自动推送通道变化 + CatalogWithPlatform int `gorm:"column:catalog_with_platform;omitempty" json:"catalogWithPlatform"` // CatalogWithPlatform表示目录信息是否包含平台信息(0:关闭,1:打开) + CatalogWithGroup int `gorm:"column:catalog_with_group;omitempty" json:"catalogWithGroup"` // CatalogWithGroup表示目录信息是否包含分组信息(0:关闭,1:打开) + CatalogWithRegion int `gorm:"column:catalog_with_region;omitempty" json:"catalogWithRegion"` // CatalogWithRegion表示目录信息是否包含行政区划(0:关闭,1:打开) + CivilCode string `gorm:"column:civil_code;omitempty" json:"civilCode"` // CivilCode表示行政区划代码 + Manufacturer string `gorm:"column:manufacturer;omitempty" json:"manufacturer"` // Manufacturer表示平台厂商 + Model string `gorm:"column:model;omitempty" json:"model"` // Model表示平台型号 + Address string `gorm:"column:address;omitempty" json:"address"` // Address表示平台安装地址 + RegisterWay int `gorm:"column:register_way;omitempty" json:"registerWay"` // RegisterWay表示注册方式(1:标准认证注册,2:口令认证,3:数字证书双向认证,4:数字证书单向认证) + Secrecy int `gorm:"column:secrecy;omitempty" json:"secrecy"` // Secrecy表示保密属性(0:不涉密,1:涉密) + PlatformChannels []*PlatformChannel `gorm:"-:all"` } // TableName 指定数据库表名 diff --git a/plugin/gb28181/platform.go b/plugin/gb28181/platform.go index 2f9d113..892fe4e 100644 --- a/plugin/gb28181/platform.go +++ b/plugin/gb28181/platform.go @@ -3,6 +3,7 @@ package plugin_gb28181pro import ( "context" "fmt" + "m7s.live/v5/pkg/util" "net/http" "strconv" "strings" @@ -40,6 +41,7 @@ type Platform struct { plugin *GB28181Plugin ctx context.Context unRegister bool + channels util.Collection[string, *gb28181.DeviceChannel] `gorm:"-:all"` } func NewPlatform(pm *gb28181.PlatformModel, plugin *GB28181Plugin, unRegister bool) *Platform { @@ -457,14 +459,17 @@ func (p *Platform) handleCatalog(req *sip.Request, tx sip.ServerTransaction, msg // 查询通道列表 var channels []gb28181.DeviceChannel - if p.plugin.DB != nil { - if err := p.plugin.DB.Table("gb28181_channel gc"). - Select(`gc.*`). - Joins("left join gb28181_platform_channel gpc on gc.id=gpc.channel_db_id"). - Where("gpc.platform_server_gb_id = ? and gc.status='ON'", p.PlatformModel.ServerGBID). - Find(&channels).Error; err != nil { - return fmt.Errorf("query channels error: %v", err) - } + //if p.plugin.DB != nil { + // if err := p.plugin.DB.Table("gb28181_channel gc"). + // Select(`gc.*`). + // Joins("left join gb28181_platform_channel gpc on gc.id=gpc.channel_db_id"). + // Where("gpc.platform_server_gb_id = ? and gc.status='ON'", p.PlatformModel.ServerGBID). + // Find(&channels).Error; err != nil { + // return fmt.Errorf("query channels error: %v", err) + // } + //} + for channel := range p.channels.Range { + channels = append(channels, *channel) } // 发送目录响应,无论是否有通道 @@ -826,7 +831,7 @@ func (p *Platform) buildChannelItem(channel gb28181.DeviceChannel) string { channel.RegisterWay, // 直接使用整数值 channel.Secrecy, // 直接使用整数值 parentID, - channel.Parental, // 直接使用整数值 + channel.Parental, // 直接使用整数值 channel.SafetyWay) // 直接使用整数值 } diff --git a/plugin/gb28181/recordinfo.go b/plugin/gb28181/recordinfo.go index 714016c..6d207b1 100644 --- a/plugin/gb28181/recordinfo.go +++ b/plugin/gb28181/recordinfo.go @@ -17,7 +17,7 @@ func (gb *GB28181Plugin) RecordInfoQuery(deviceID string, channelID string, star return nil, fmt.Errorf("device not found: %s", deviceID) } - channel, ok := device.channels.Get(channelID) + channel, ok := device.channels.Get(deviceID + "_" + channelID) if !ok { return nil, fmt.Errorf("channel not found: %s", channelID) }