feat: gb28181 support add platform and platform channel from config.yaml

This commit is contained in:
pggiroro
2025-06-04 20:22:51 +08:00
parent cf218215ff
commit 75791fe93f
11 changed files with 337 additions and 265 deletions

View File

@@ -433,10 +433,10 @@ func (gb *GB28181Plugin) SyncDevice(ctx context.Context, req *pb.SyncDeviceReque
if !ok && gb.DB != nil {
// 如果内存中没有且数据库存在,则从数据库查询
var device Device
if err := gb.DB.Where("id = ?", req.DeviceId).First(&device).Error; err == nil {
if err := gb.DB.Where("device_id = ?", req.DeviceId).First(&device).Error; err == nil {
d = &device
// 恢复设备的必要字段
d.Logger = gb.Logger.With("id", req.DeviceId)
d.Logger = gb.With("deviceid", req.DeviceId)
d.channels.L = new(sync.RWMutex)
d.plugin = gb
@@ -1143,7 +1143,7 @@ func (gb *GB28181Plugin) QueryRecord(ctx context.Context, req *pb.QueryRecordReq
return resp, nil
}
channel, ok := device.channels.Get(req.ChannelId)
channel, ok := device.channels.Get(req.DeviceId + "_" + req.ChannelId)
if !ok {
resp.Code = 404
resp.Message = "channel not found"
@@ -1533,6 +1533,13 @@ func (gb *GB28181Plugin) AddPlatformChannel(ctx context.Context, req *pb.AddPlat
resp.Message = fmt.Sprintf("提交事务失败: %v", err)
return resp, nil
}
if platform, ok := gb.platforms.Get(req.PlatformId); !ok {
for _, channelId := range req.ChannelIds {
if channel, ok := gb.channels.Get(channelId); ok {
platform.channels.Set(channel)
}
}
}
resp.Code = 0
resp.Message = "success"
@@ -1593,7 +1600,7 @@ func (gb *GB28181Plugin) Recording(ctx context.Context, req *pb.RecordingRequest
}
// 从device.channels中查找实际通道
_, ok = actualDevice.channels.Get(result.ChannelID)
_, ok = actualDevice.channels.Get(result.DeviceID + "_" + result.ChannelID)
if !ok {
resp.Code = 404
resp.Message = "实际通道未找到"
@@ -1626,7 +1633,7 @@ func (gb *GB28181Plugin) Recording(ctx context.Context, req *pb.RecordingRequest
}
// 检查通道是否存在
_, ok = device.channels.Get(req.ChannelId)
_, ok = device.channels.Get(req.DeviceId + "_" + req.ChannelId)
if !ok {
resp.Code = 404
resp.Message = "通道未找到"
@@ -1712,7 +1719,7 @@ func (gb *GB28181Plugin) GetSnap(ctx context.Context, req *pb.GetSnapRequest) (*
}
// 从device.channels中查找实际通道
_, ok = actualDevice.channels.Get(result.ChannelID)
_, ok = actualDevice.channels.Get(result.DeviceID + "_" + result.ChannelID)
if !ok {
resp.Code = 404
resp.Message = "实际通道未找到"
@@ -1756,7 +1763,7 @@ func (gb *GB28181Plugin) GetSnap(ctx context.Context, req *pb.GetSnapRequest) (*
}
// 检查通道是否存在
_, ok = device.channels.Get(req.ChannelId)
_, ok = device.channels.Get(req.DeviceId + "_" + req.ChannelId)
if !ok {
resp.Code = 404
resp.Message = "通道未找到"

View File

@@ -51,11 +51,11 @@ type Channel struct {
RecordReqs util.Collection[int, *RecordRequest]
PresetReqs util.Collection[int, *PresetRequest] // 预置位请求集合
*slog.Logger
gb28181.DeviceChannel
*gb28181.DeviceChannel
}
func (c *Channel) GetKey() string {
return c.ChannelID
return c.ID
}
type PullProxy struct {
@@ -75,7 +75,7 @@ func (p *PullProxy) Start() error {
streamPaths := strings.Split(p.GetStreamPath(), "/")
deviceId, channelId := streamPaths[0], streamPaths[1]
if device, ok := p.Plugin.GetHandler().(*GB28181Plugin).devices.Get(deviceId); ok {
if _, ok := device.channels.Get(channelId); ok {
if _, ok := device.channels.Get(deviceId + "_" + channelId); ok {
p.ChangeStatus(m7s.PullProxyStatusOnline)
}
}

View File

@@ -230,7 +230,7 @@ func (d *Device) onMessage(req *sip.Request, tx sip.ServerTransaction, msg *gb28
d.catalogReqs.RemoveByKey(msg.SN)
}
case "RecordInfo":
if channel, ok := d.channels.Get(msg.DeviceID); ok {
if channel, ok := d.channels.Get(d.DeviceId + "_" + msg.DeviceID); ok {
if req, ok := channel.RecordReqs.Get(msg.SN); ok {
// 添加响应并检查是否完成
if req.AddResponse(*msg) {
@@ -239,7 +239,7 @@ func (d *Device) onMessage(req *sip.Request, tx sip.ServerTransaction, msg *gb28
}
}
case "PresetQuery":
if channel, ok := d.channels.Get(msg.DeviceID); ok {
if channel, ok := d.channels.Get(d.DeviceId + "_" + msg.DeviceID); ok {
if req, ok := channel.PresetReqs.Get(msg.SN); ok {
// 添加预置位响应
req.Response = msg.PresetList.Item
@@ -616,15 +616,16 @@ func (d *Device) frontEndCmdString(cmdCode int32, parameter1 int32, parameter2 i
}
func (d *Device) addOrUpdateChannel(c gb28181.DeviceChannel) {
if channel, ok := d.channels.Get(c.ChannelID); ok {
channel.DeviceChannel = c
if channel, ok := d.channels.Get(c.ID); ok {
channel.DeviceChannel = &c
} else {
channel = &Channel{
Device: d,
Logger: d.Logger.With("channel", c.ChannelID),
DeviceChannel: c,
Logger: d.Logger.With("channel", c.ID),
DeviceChannel: &c,
}
d.channels.Set(channel)
d.plugin.channels.Set(channel.DeviceChannel)
}
}

View File

@@ -83,7 +83,7 @@ func (d *Dialog) Start() (err error) {
var device *Device
if deviceTmp, ok := d.gb.devices.Get(deviceId); ok {
device = deviceTmp
if channel, ok := deviceTmp.channels.Get(channelId); ok {
if channel, ok := deviceTmp.channels.Get(deviceId + "_" + channelId); ok {
d.Channel = channel
d.StreamMode = device.StreamMode
} else {

View File

@@ -70,7 +70,7 @@ func (d *ForwardDialog) Start() (err error) {
var device *Device
if deviceTmp, ok := d.gb.devices.Get(deviceId); ok {
device = deviceTmp
if channel, ok := deviceTmp.channels.Get(channelId); ok {
if channel, ok := deviceTmp.channels.Get(deviceId + "_" + channelId); ok {
d.channel = channel
} else {
return fmt.Errorf("channel %s not found", channelId)

View File

@@ -56,6 +56,8 @@ type GB28181Plugin struct {
SipIP string `desc:"sip发送命令的IP一般是本地IP多网卡时需要配置正确的IP"`
MediaIP string `desc:"流媒体IP用于接收流"`
deviceManager task.Manager[string, *DeviceRegisterQueueTask]
Platforms []*gb28181.PlatformModel
channels util.Collection[string, *gb28181.DeviceChannel]
}
var _ = m7s.InstallPlugin[GB28181Plugin](m7s.PluginMeta{
@@ -127,6 +129,7 @@ func (gb *GB28181Plugin) initDatabase() error {
}
func (gb *GB28181Plugin) OnInit() (err error) {
gb.Info("GB28181 initing", gb.Platforms)
gb.AddTask(&gb.deviceManager)
logger := zerolog.New(os.Stdout)
gb.ua, err = sipgo.NewUA(sipgo.WithUserAgent("M7S/" + m7s.Version)) // Build user agent
@@ -368,11 +371,44 @@ func (gb *GB28181Plugin) checkPlatform() {
gb.Info("找到启用状态的平台", "count", len(platformModels))
if gb.Platforms != nil && len(gb.Platforms) > 0 {
platformModels = append(platformModels, gb.Platforms...)
}
// 遍历所有平台进行初始化和注册
for _, platformModel := range platformModels {
// 创建Platform实例
platform := NewPlatform(platformModel, gb, true)
if platformModel.PlatformChannels != nil && len(platformModel.PlatformChannels) > 0 {
for i := range platformModel.PlatformChannels {
channelDbId := platformModel.PlatformChannels[i].ChannelDBID
if channelDbId != "" {
if channel, ok := gb.channels.Get(channelDbId); ok {
platform.channels.Set(channel)
}
}
}
} else {
// 查询通道列表
var channels []gb28181.DeviceChannel
if gb.DB != nil {
if err := gb.DB.Table("gb28181_channel gc").
Select(`gc.*`).
Joins("left join gb28181_platform_channel gpc on gc.id=gpc.channel_db_id").
Where("gpc.platform_server_gb_id = ? and gc.status='ON'", platformModel.ServerGBID).
Find(&channels).Error; err != nil {
gb.Error("<UNK>", "error", err.Error())
}
if channels != nil && len(channels) > 0 {
for i := range channels {
if channel, ok := gb.channels.Get(channels[i].ID); ok {
platform.channels.Set(channel)
}
}
}
}
}
//go platform.Unregister()
//if err != nil {
// gb.Error("unregister err ", err)
@@ -446,10 +482,15 @@ func (gb *GB28181Plugin) OnMessage(req *sip.Request, tx sip.ServerTransaction) {
d, _ = gb.devices.Get(id)
// 检查是否是平台
if gb.DB != nil {
var platform gb28181.PlatformModel
if err := gb.DB.First(&platform, gb28181.PlatformModel{ServerGBID: id, Enable: true}).Error; err == nil {
p = &platform
//if gb.DB != nil {
// var platform gb28181.PlatformModel
// if err := gb.DB.First(&platform, gb28181.PlatformModel{ServerGBID: id, Enable: true}).Error; err == nil {
// p = &platform
// }
//}
if platformtmp, ok := gb.platforms.Get(id); ok {
if platformtmp.PlatformModel.Enable {
p = platformtmp.PlatformModel
}
}
@@ -693,214 +734,222 @@ func (gb *GB28181Plugin) OnInvite(req *sip.Request, tx sip.ServerTransaction) {
// 首先从数据库中查询平台
var platform *Platform
var platformModel = &gb28181.PlatformModel{}
if gb.DB != nil {
// 使用requesterId查询平台类似于Java代码中的queryPlatformByServerGBId
result := gb.DB.Where("server_gb_id = ?", inviteInfo.RequesterId).First(&platformModel)
if result.Error == nil {
// 数据库中找到平台根据平台ID从运行时实例中查找
if platformTmp, platformFound := gb.platforms.Get(platformModel.ServerGBID); !platformFound {
gb.Error("OnInvite", "error", "platform found in DB but not in runtime", "platformId", inviteInfo.RequesterId)
_ = tx.Respond(sip.NewResponseFromRequest(req, sip.StatusNotFound, "Platform Not Found In Runtime", nil))
return
} else {
platform = platformTmp
}
//var platformModel = &gb28181.PlatformModel{}
//if gb.DB != nil {
// // 使用requesterId查询平台类似于Java代码中的queryPlatformByServerGBId
// result := gb.DB.Where("server_gb_id = ?", inviteInfo.RequesterId).First(&platformModel)
// if result.Error == nil {
// 数据库中找到平台根据平台ID从运行时实例中查找
if platformTmp, platformFound := gb.platforms.Get(inviteInfo.RequesterId); !platformFound {
gb.Error("OnInvite", "error", "platform found in DB but not in runtime", "platformId", inviteInfo.RequesterId)
_ = tx.Respond(sip.NewResponseFromRequest(req, sip.StatusNotFound, "Platform Not Found In Runtime", nil))
return
} else {
platform = platformTmp
}
gb.Info("OnInvite", "action", "platform found", "platformId", inviteInfo.RequesterId, "platformName", platform.PlatformModel.Name)
gb.Info("OnInvite", "action", "platform found", "platformId", inviteInfo.RequesterId, "platformName", platform.PlatformModel.Name)
// 使用GORM的模型查询方式更加符合GORM的使用习惯
// 默认情况下GORM会自动处理软删除只查询未删除的记录
var deviceChannels []gb28181.DeviceChannel
channelResult := gb.DB.Model(&gb28181.DeviceChannel{}).
Joins("LEFT JOIN gb28181_platform_channel ON gb28181_channel.id = gb28181_platform_channel.channel_db_id").
Where("gb28181_platform_channel.platform_server_gb_id = ? AND gb28181_channel.channel_id = ?",
platform.PlatformModel.ServerGBID, inviteInfo.TargetChannelId).
Order("gb28181_channel.id").
Find(&deviceChannels)
// 使用GORM的模型查询方式更加符合GORM的使用习惯
// 默认情况下GORM会自动处理软删除只查询未删除的记录
//var deviceChannels []gb28181.DeviceChannel
//channelResult := gb.DB.Model(&gb28181.DeviceChannel{}).
// Joins("LEFT JOIN gb28181_platform_channel ON gb28181_channel.id = gb28181_platform_channel.channel_db_id").
// Where("gb28181_platform_channel.platform_server_gb_id = ? AND gb28181_channel.channel_id = ?",
// platform.PlatformModel.ServerGBID, inviteInfo.TargetChannelId).
// Order("gb28181_channel.id").
// Find(&deviceChannels)
//
//if channelResult.Error != nil || len(deviceChannels) == 0 {
// gb.Error("OnInvite", "error", "channel not found", "channelId", inviteInfo.TargetChannelId, "err", channelResult.Error)
// _ = tx.Respond(sip.NewResponseFromRequest(req, sip.StatusNotFound, "Channel Not Found", nil))
// return
//}
if channelResult.Error != nil || len(deviceChannels) == 0 {
gb.Error("OnInvite", "error", "channel not found", "channelId", inviteInfo.TargetChannelId, "err", channelResult.Error)
_ = tx.Respond(sip.NewResponseFromRequest(req, sip.StatusNotFound, "Channel Not Found", nil))
return
}
// 找到了通道
var channel *gb28181.DeviceChannel
// 找到了通道
channel := deviceChannels[len(deviceChannels)-1]
gb.Info("OnInvite", "action", "channel found", "channelId", channel.ChannelID, "channelName", channel.Name)
platform.channels.Range(func(channelTmp *gb28181.DeviceChannel) bool {
if channelTmp.ChannelID == inviteInfo.TargetChannelId {
channel = channelTmp
}
return true
})
var channelTmp *Channel
if deviceFound, ok := gb.devices.Get(channel.DeviceID); ok {
if channelFound, ok := deviceFound.channels.Get(channel.ChannelID); ok {
channelTmp = channelFound
} else {
gb.Error("OnInvite", "channel not found memory,ChannelID is ", channel.ChannelID)
_ = tx.Respond(sip.NewResponseFromRequest(req, sip.StatusBadRequest, "SSRC Not Found", nil))
return
}
} else {
gb.Error("OnInvite", "device not found memory,deviceID is ", channel.DeviceID)
_ = tx.Respond(sip.NewResponseFromRequest(req, sip.StatusBadRequest, "SSRC Not Found", nil))
return
}
gb.Info("OnInvite", "action", "channel found", "channelId", channel.ChannelID, "channelName", channel.Name)
// 通道存在发送100 Trying响应
tryingResp := sip.NewResponseFromRequest(req, sip.StatusTrying, "Trying", nil)
if err := tx.Respond(tryingResp); err != nil {
gb.Error("OnInvite", "error", "send trying response failed", "err", err.Error())
return
}
// 检查SSRC
if inviteInfo.SSRC == "" {
gb.Error("OnInvite", "error", "ssrc not found in invite")
_ = tx.Respond(sip.NewResponseFromRequest(req, sip.StatusBadRequest, "SSRC Not Found", nil))
return
}
// 获取媒体信息
mediaPort := uint16(0)
if gb.MediaPort.Valid() {
select {
case port := <-gb.tcpPorts:
mediaPort = port
gb.Debug("OnInvite", "action", "allocate port", "port", port)
default:
gb.Error("OnInvite", "error", "no available port")
_ = tx.Respond(sip.NewResponseFromRequest(req, sip.StatusServiceUnavailable, "No Available Port", nil))
return
}
} else {
mediaPort = gb.MediaPort[0]
gb.Debug("OnInvite", "action", "use default port", "port", mediaPort)
}
// 构建SDP响应
// 使用平台和通道的信息构建响应
sdpIP := platform.PlatformModel.DeviceIP
// 如果平台配置了SendStreamIP则使用此IP
if platform.PlatformModel.SendStreamIP != "" {
sdpIP = platform.PlatformModel.SendStreamIP
}
// 构建SDP内容参考Java代码createSendSdp方法
content := []string{
"v=0",
fmt.Sprintf("o=%s 0 0 IN IP4 %s", channel.ChannelID, sdpIP),
fmt.Sprintf("s=%s", inviteInfo.SessionName),
fmt.Sprintf("c=IN IP4 %s", sdpIP),
}
// 处理播放时间
if strings.EqualFold("Playback", inviteInfo.SessionName) && inviteInfo.StartTime > 0 && inviteInfo.StopTime > 0 {
content = append(content, fmt.Sprintf("t=%d %d", inviteInfo.StartTime, inviteInfo.StopTime))
} else {
content = append(content, "t=0 0")
}
// 处理传输模式
if inviteInfo.TCP {
content = append(content, fmt.Sprintf("m=video %d TCP/RTP/AVP 96", mediaPort))
if inviteInfo.TCPActive {
content = append(content, "a=setup:passive")
} else {
content = append(content, "a=setup:active")
}
if inviteInfo.TCP {
content = append(content, "a=connection:new")
}
} else {
content = append(content, fmt.Sprintf("m=video %d RTP/AVP 96", mediaPort))
}
// 添加其他属性参考Java代码
content = append(content,
"a=sendonly",
"a=rtpmap:96 PS/90000",
fmt.Sprintf("y=%s", inviteInfo.SSRC),
"f=",
)
// 发送200 OK响应
response := sip.NewResponseFromRequest(req, sip.StatusOK, "OK", nil)
contentType := sip.ContentTypeHeader("application/sdp")
response.AppendHeader(&contentType)
response.SetBody([]byte(strings.Join(content, "\r\n") + "\r\n"))
// 创建并保存SendRtpInfo以供OnAck方法使用
forwardDialog := &ForwardDialog{
gb: gb,
platformIP: inviteInfo.IP,
platformPort: inviteInfo.Port,
platformSSRC: inviteInfo.SSRC,
TCP: inviteInfo.TCP,
TCPActive: inviteInfo.TCPActive,
platformCallId: req.CallID().Value(),
start: inviteInfo.StartTime,
end: inviteInfo.StopTime,
channel: channelTmp,
upIP: sdpIP,
upPort: mediaPort,
}
forwardDialog.forwarder = gb28181.NewRTPForwarder()
forwardDialog.forwarder.TCP = forwardDialog.TCP
forwardDialog.forwarder.TCPActive = forwardDialog.TCPActive
forwardDialog.forwarder.StreamMode = forwardDialog.channel.Device.StreamMode
if forwardDialog.TCPActive {
forwardDialog.forwarder.UpListenAddr = fmt.Sprintf(":%d", forwardDialog.upPort)
} else {
forwardDialog.forwarder.UpListenAddr = fmt.Sprintf("%s:%d", forwardDialog.upIP, forwardDialog.platformPort)
}
// 设置监听地址和端口
if strings.ToUpper(forwardDialog.channel.Device.StreamMode) == "TCP-ACTIVE" {
forwardDialog.forwarder.DownListenAddr = fmt.Sprintf("%s:%d", forwardDialog.downIP, forwardDialog.downPort)
} else {
forwardDialog.forwarder.DownListenAddr = fmt.Sprintf(":%d", forwardDialog.MediaPort)
}
// 设置转发目标
if inviteInfo.IP != "" && forwardDialog.platformPort > 0 {
err = forwardDialog.forwarder.SetTarget(forwardDialog.platformIP, forwardDialog.platformPort)
if err != nil {
gb.Error("set target error", "err", err)
return
}
} else {
gb.Error("no target set, will only receive but not forward")
return
}
// 设置目标SSRC
if forwardDialog.platformSSRC != "" {
forwardDialog.forwarder.TargetSSRC = forwardDialog.platformSSRC
gb.Info("set target ssrc", "ssrc", forwardDialog.platformSSRC)
}
// 保存到集合中
gb.forwardDialogs.Set(forwardDialog)
gb.Info("OnInvite", "action", "sendRtpInfo created", "callId", req.CallID().Value())
if err := tx.Respond(response); err != nil {
gb.Error("OnInvite", "error", "send response failed", "err", err.Error())
return
}
gb.Info("OnInvite", "action", "complete", "platformId", inviteInfo.RequesterId, "channelId", channel.ChannelID,
"ip", inviteInfo.IP, "port", inviteInfo.Port, "tcp", inviteInfo.TCP, "tcpActive", inviteInfo.TCPActive)
return
var channelTmp *Channel
if deviceFound, ok := gb.devices.Get(channel.DeviceID); ok {
if channelFound, ok := deviceFound.channels.Get(channel.ID); ok {
channelTmp = channelFound
} else {
// 数据库中未找到平台响应not found
gb.Error("OnInvite", "error", "platform not found in database", "platformId", inviteInfo.RequesterId)
_ = tx.Respond(sip.NewResponseFromRequest(req, sip.StatusNotFound, "Platform Not Found", nil))
gb.Error("OnInvite", "channel not found memory,ChannelID is ", channel.ChannelID)
_ = tx.Respond(sip.NewResponseFromRequest(req, sip.StatusBadRequest, "SSRC Not Found", nil))
return
}
} else {
// 数据库未初始化,响应服务不可用
gb.Error("OnInvite", "error", "database not initialized")
_ = tx.Respond(sip.NewResponseFromRequest(req, sip.StatusServiceUnavailable, "Database Not Initialized", nil))
gb.Error("OnInvite", "device not found memory,deviceID is ", channel.DeviceID)
_ = tx.Respond(sip.NewResponseFromRequest(req, sip.StatusBadRequest, "SSRC Not Found", nil))
return
}
// 通道存在发送100 Trying响应
tryingResp := sip.NewResponseFromRequest(req, sip.StatusTrying, "Trying", nil)
if err := tx.Respond(tryingResp); err != nil {
gb.Error("OnInvite", "error", "send trying response failed", "err", err.Error())
return
}
// 检查SSRC
if inviteInfo.SSRC == "" {
gb.Error("OnInvite", "error", "ssrc not found in invite")
_ = tx.Respond(sip.NewResponseFromRequest(req, sip.StatusBadRequest, "SSRC Not Found", nil))
return
}
// 获取媒体信息
mediaPort := uint16(0)
if gb.MediaPort.Valid() {
select {
case port := <-gb.tcpPorts:
mediaPort = port
gb.Debug("OnInvite", "action", "allocate port", "port", port)
default:
gb.Error("OnInvite", "error", "no available port")
_ = tx.Respond(sip.NewResponseFromRequest(req, sip.StatusServiceUnavailable, "No Available Port", nil))
return
}
} else {
mediaPort = gb.MediaPort[0]
gb.Debug("OnInvite", "action", "use default port", "port", mediaPort)
}
// 构建SDP响应
// 使用平台和通道的信息构建响应
sdpIP := platform.PlatformModel.DeviceIP
// 如果平台配置了SendStreamIP则使用此IP
if platform.PlatformModel.SendStreamIP != "" {
sdpIP = platform.PlatformModel.SendStreamIP
}
// 构建SDP内容参考Java代码createSendSdp方法
content := []string{
"v=0",
fmt.Sprintf("o=%s 0 0 IN IP4 %s", channel.ChannelID, sdpIP),
fmt.Sprintf("s=%s", inviteInfo.SessionName),
fmt.Sprintf("c=IN IP4 %s", sdpIP),
}
// 处理播放时间
if strings.EqualFold("Playback", inviteInfo.SessionName) && inviteInfo.StartTime > 0 && inviteInfo.StopTime > 0 {
content = append(content, fmt.Sprintf("t=%d %d", inviteInfo.StartTime, inviteInfo.StopTime))
} else {
content = append(content, "t=0 0")
}
// 处理传输模式
if inviteInfo.TCP {
content = append(content, fmt.Sprintf("m=video %d TCP/RTP/AVP 96", mediaPort))
if inviteInfo.TCPActive {
content = append(content, "a=setup:passive")
} else {
content = append(content, "a=setup:active")
}
if inviteInfo.TCP {
content = append(content, "a=connection:new")
}
} else {
content = append(content, fmt.Sprintf("m=video %d RTP/AVP 96", mediaPort))
}
// 添加其他属性参考Java代码
content = append(content,
"a=sendonly",
"a=rtpmap:96 PS/90000",
fmt.Sprintf("y=%s", inviteInfo.SSRC),
"f=",
)
// 发送200 OK响应
response := sip.NewResponseFromRequest(req, sip.StatusOK, "OK", nil)
contentType := sip.ContentTypeHeader("application/sdp")
response.AppendHeader(&contentType)
response.SetBody([]byte(strings.Join(content, "\r\n") + "\r\n"))
// 创建并保存SendRtpInfo以供OnAck方法使用
forwardDialog := &ForwardDialog{
gb: gb,
platformIP: inviteInfo.IP,
platformPort: inviteInfo.Port,
platformSSRC: inviteInfo.SSRC,
TCP: inviteInfo.TCP,
TCPActive: inviteInfo.TCPActive,
platformCallId: req.CallID().Value(),
start: inviteInfo.StartTime,
end: inviteInfo.StopTime,
channel: channelTmp,
upIP: sdpIP,
upPort: mediaPort,
}
forwardDialog.forwarder = gb28181.NewRTPForwarder()
forwardDialog.forwarder.TCP = forwardDialog.TCP
forwardDialog.forwarder.TCPActive = forwardDialog.TCPActive
forwardDialog.forwarder.StreamMode = forwardDialog.channel.Device.StreamMode
if forwardDialog.TCPActive {
forwardDialog.forwarder.UpListenAddr = fmt.Sprintf(":%d", forwardDialog.upPort)
} else {
forwardDialog.forwarder.UpListenAddr = fmt.Sprintf("%s:%d", forwardDialog.upIP, forwardDialog.platformPort)
}
// 设置监听地址和端口
if strings.ToUpper(forwardDialog.channel.Device.StreamMode) == "TCP-ACTIVE" {
forwardDialog.forwarder.DownListenAddr = fmt.Sprintf("%s:%d", forwardDialog.downIP, forwardDialog.downPort)
} else {
forwardDialog.forwarder.DownListenAddr = fmt.Sprintf(":%d", forwardDialog.MediaPort)
}
// 设置转发目标
if inviteInfo.IP != "" && forwardDialog.platformPort > 0 {
err = forwardDialog.forwarder.SetTarget(forwardDialog.platformIP, forwardDialog.platformPort)
if err != nil {
gb.Error("set target error", "err", err)
return
}
} else {
gb.Error("no target set, will only receive but not forward")
return
}
// 设置目标SSRC
if forwardDialog.platformSSRC != "" {
forwardDialog.forwarder.TargetSSRC = forwardDialog.platformSSRC
gb.Info("set target ssrc", "ssrc", forwardDialog.platformSSRC)
}
// 保存到集合中
gb.forwardDialogs.Set(forwardDialog)
gb.Info("OnInvite", "action", "sendRtpInfo created", "callId", req.CallID().Value())
if err := tx.Respond(response); err != nil {
gb.Error("OnInvite", "error", "send response failed", "err", err.Error())
return
}
gb.Info("OnInvite", "action", "complete", "platformId", inviteInfo.RequesterId, "channelId", channel.ChannelID,
"ip", inviteInfo.IP, "port", inviteInfo.Port, "tcp", inviteInfo.TCP, "tcpActive", inviteInfo.TCPActive)
return
//} else {
// // 数据库中未找到平台响应not found
// gb.Error("OnInvite", "error", "platform not found in database", "platformId", inviteInfo.RequesterId)
// _ = tx.Respond(sip.NewResponseFromRequest(req, sip.StatusNotFound, "Platform Not Found", nil))
// return
//}
//} else {
// // 数据库未初始化,响应服务不可用
// gb.Error("OnInvite", "error", "database not initialized")
// _ = tx.Respond(sip.NewResponseFromRequest(req, sip.StatusServiceUnavailable, "Database Not Initialized", nil))
// return
//}
}
func (gb *GB28181Plugin) OnAck(req *sip.Request, tx sip.ServerTransaction) {

View File

@@ -300,3 +300,7 @@ func (d *DeviceChannel) appendInfoContent(content *string) {
*content += " <SVCTimeSupportMode>" + strconv.Itoa(d.SVCTimeSupportMode) + "</SVCTimeSupportMode>\n"
}
}
func (d *DeviceChannel) GetKey() string {
return d.ID
}

View File

@@ -66,3 +66,8 @@ type PlatformChannel struct {
func (*PlatformChannel) TableName() string {
return "gb28181_platform_channel"
}
func (p *PlatformChannel) GetKey() string {
return p.PlatformServerGBID + "_" + p.ChannelDBID
}

View File

@@ -9,43 +9,44 @@ import (
// 包含了平台的基本信息、SIP服务配置、设备信息、认证信息等。
// 用于存储和管理GB28181平台的所有相关参数。
type PlatformModel struct {
Enable bool `gorm:"column:enable" json:"enable"` // Enable表示该平台配置是否启用
Name string `gorm:"column:name;omitempty" json:"name"` // Name表示平台的名称
ServerGBID string `gorm:"primaryKey;column:server_gb_id;omitempty" json:"serverGBId"` // ServerGBID表示SIP服务器的国标编码
ServerGBDomain string `gorm:"column:server_gb_domain;omitempty" json:"serverGBDomain"` // ServerGBDomain表示SIP服务器的国标域
ServerIP string `gorm:"column:server_ip;omitempty" json:"serverIp"` // ServerIP表示SIP服务器的IP地址
ServerPort int `gorm:"column:server_port;omitempty" json:"serverPort"` // ServerPort表示SIP服务器的端口号
DeviceGBID string `gorm:"column:device_gb_id;omitempty" json:"deviceGBId"` // DeviceGBID表示设备的国标编号
DeviceIP string `gorm:"column:device_ip;omitempty" json:"deviceIp"` // DeviceIP表示设备的IP地址
DevicePort int `gorm:"column:device_port;omitempty" json:"devicePort"` // DevicePort表示设备的端口号
Username string `gorm:"column:username;omitempty" json:"username"` // Username表示SIP认证的用户名默认使用设备国标编号
Password string `gorm:"column:password;omitempty" json:"password"` // Password表示SIP认证的密码
Expires int `gorm:"column:expires;omitempty" json:"expires"` // Expires表示注册的过期时间单位为秒
KeepTimeout int `gorm:"column:keep_timeout;omitempty" json:"keepTimeout"` // KeepTimeout表示心跳超时时间单位为秒
Transport string `gorm:"column:transport;omitempty" json:"transport"` // Transport表示传输协议类型
CharacterSet string `gorm:"column:character_set;omitempty" json:"characterSet"` // CharacterSet表示字符集编码
PTZ bool `gorm:"column:ptz" json:"ptz"` // PTZ表示是否允许云台控制
RTCP bool `gorm:"column:rtcp" json:"rtcp"` // RTCP表示是否启用RTCP流保活
Status bool `gorm:"column:status" json:"status"` // Status表示平台当前的在线状态
ChannelCount int `gorm:"column:channel_count;omitempty" json:"channelCount"` // ChannelCount表示通道数量
CatalogSubscribe bool `gorm:"column:catalog_subscribe" json:"catalogSubscribe"` // CatalogSubscribe表示是否已订阅目录信息
AlarmSubscribe bool `gorm:"column:alarm_subscribe" json:"alarmSubscribe"` // AlarmSubscribe表示是否已订阅报警信息
MobilePositionSubscribe bool `gorm:"column:mobile_position_subscribe" json:"mobilePositionSubscribe"` // MobilePositionSubscribe表示是否已订阅移动位置信息
CatalogGroup int `gorm:"column:catalog_group;omitempty" json:"catalogGroup"` // CatalogGroup表示目录分组大小每次向上级发送通道数量
UpdateTime string `gorm:"column:update_time;omitempty" json:"updateTime"` // UpdateTime表示最后更新时间
CreateTime string `gorm:"column:create_time;omitempty" json:"createTime"` // CreateTime表示创建时间
AsMessageChannel bool `gorm:"column:as_message_channel" json:"asMessageChannel"` // AsMessageChannel表示是否作为消息通道使用
SendStreamIP string `gorm:"column:send_stream_ip;omitempty" json:"sendStreamIp"` // SendStreamIP表示点播回复200OK时使用的IP地址
AutoPushChannel bool `gorm:"column:auto_push_channel" json:"autoPushChannel"` // AutoPushChannel表示是否自动推送通道变化
CatalogWithPlatform int `gorm:"column:catalog_with_platform;omitempty" json:"catalogWithPlatform"` // CatalogWithPlatform表示目录信息是否包含平台信息(0:关闭,1:打开)
CatalogWithGroup int `gorm:"column:catalog_with_group;omitempty" json:"catalogWithGroup"` // CatalogWithGroup表示目录信息是否包含分组信息(0:关闭,1:打开)
CatalogWithRegion int `gorm:"column:catalog_with_region;omitempty" json:"catalogWithRegion"` // CatalogWithRegion表示目录信息是否包含行政区划(0:关闭,1:打开)
CivilCode string `gorm:"column:civil_code;omitempty" json:"civilCode"` // CivilCode表示行政区划代码
Manufacturer string `gorm:"column:manufacturer;omitempty" json:"manufacturer"` // Manufacturer表示平台厂商
Model string `gorm:"column:model;omitempty" json:"model"` // Model表示平台型号
Address string `gorm:"column:address;omitempty" json:"address"` // Address表示平台安装地址
RegisterWay int `gorm:"column:register_way;omitempty" json:"registerWay"` // RegisterWay表示注册方式(1:标准认证注册,2:口令认证,3:数字证书双向认证,4:数字证书单向认证)
Secrecy int `gorm:"column:secrecy;omitempty" json:"secrecy"` // Secrecy表示保密属性(0:不涉密,1:涉密)
Enable bool `gorm:"column:enable" json:"enable"` // Enable表示该平台配置是否启用
Name string `gorm:"column:name;omitempty" json:"name"` // Name表示平台的名称
ServerGBID string `gorm:"primaryKey;column:server_gb_id;omitempty" json:"serverGBId"` // ServerGBID表示SIP服务器的国标编码
ServerGBDomain string `gorm:"column:server_gb_domain;omitempty" json:"serverGBDomain"` // ServerGBDomain表示SIP服务器的国标域
ServerIP string `gorm:"column:server_ip;omitempty" json:"serverIp"` // ServerIP表示SIP服务器的IP地址
ServerPort int `gorm:"column:server_port;omitempty" json:"serverPort"` // ServerPort表示SIP服务器的端口号
DeviceGBID string `gorm:"column:device_gb_id;omitempty" json:"deviceGBId"` // DeviceGBID表示设备的国标编号
DeviceIP string `gorm:"column:device_ip;omitempty" json:"deviceIp"` // DeviceIP表示设备的IP地址
DevicePort int `gorm:"column:device_port;omitempty" json:"devicePort"` // DevicePort表示设备的端口号
Username string `gorm:"column:username;omitempty" json:"username"` // Username表示SIP认证的用户名默认使用设备国标编号
Password string `gorm:"column:password;omitempty" json:"password"` // Password表示SIP认证的密码
Expires int `gorm:"column:expires;omitempty" json:"expires"` // Expires表示注册的过期时间单位为秒
KeepTimeout int `gorm:"column:keep_timeout;omitempty" json:"keepTimeout"` // KeepTimeout表示心跳超时时间单位为秒
Transport string `gorm:"column:transport;omitempty" json:"transport"` // Transport表示传输协议类型
CharacterSet string `gorm:"column:character_set;omitempty" json:"characterSet"` // CharacterSet表示字符集编码
PTZ bool `gorm:"column:ptz" json:"ptz"` // PTZ表示是否允许云台控制
RTCP bool `gorm:"column:rtcp" json:"rtcp"` // RTCP表示是否启用RTCP流保活
Status bool `gorm:"column:status" json:"status"` // Status表示平台当前的在线状态
ChannelCount int `gorm:"column:channel_count;omitempty" json:"channelCount"` // ChannelCount表示通道数量
CatalogSubscribe bool `gorm:"column:catalog_subscribe" json:"catalogSubscribe"` // CatalogSubscribe表示是否已订阅目录信息
AlarmSubscribe bool `gorm:"column:alarm_subscribe" json:"alarmSubscribe"` // AlarmSubscribe表示是否已订阅报警信息
MobilePositionSubscribe bool `gorm:"column:mobile_position_subscribe" json:"mobilePositionSubscribe"` // MobilePositionSubscribe表示是否已订阅移动位置信息
CatalogGroup int `gorm:"column:catalog_group;omitempty" json:"catalogGroup"` // CatalogGroup表示目录分组大小每次向上级发送通道数量
UpdateTime string `gorm:"column:update_time;omitempty" json:"updateTime"` // UpdateTime表示最后更新时间
CreateTime string `gorm:"column:create_time;omitempty" json:"createTime"` // CreateTime表示创建时间
AsMessageChannel bool `gorm:"column:as_message_channel" json:"asMessageChannel"` // AsMessageChannel表示是否作为消息通道使用
SendStreamIP string `gorm:"column:send_stream_ip;omitempty" json:"sendStreamIp"` // SendStreamIP表示点播回复200OK时使用的IP地址
AutoPushChannel bool `gorm:"column:auto_push_channel" json:"autoPushChannel"` // AutoPushChannel表示是否自动推送通道变化
CatalogWithPlatform int `gorm:"column:catalog_with_platform;omitempty" json:"catalogWithPlatform"` // CatalogWithPlatform表示目录信息是否包含平台信息(0:关闭,1:打开)
CatalogWithGroup int `gorm:"column:catalog_with_group;omitempty" json:"catalogWithGroup"` // CatalogWithGroup表示目录信息是否包含分组信息(0:关闭,1:打开)
CatalogWithRegion int `gorm:"column:catalog_with_region;omitempty" json:"catalogWithRegion"` // CatalogWithRegion表示目录信息是否包含行政区划(0:关闭,1:打开)
CivilCode string `gorm:"column:civil_code;omitempty" json:"civilCode"` // CivilCode表示行政区划代码
Manufacturer string `gorm:"column:manufacturer;omitempty" json:"manufacturer"` // Manufacturer表示平台厂商
Model string `gorm:"column:model;omitempty" json:"model"` // Model表示平台型号
Address string `gorm:"column:address;omitempty" json:"address"` // Address表示平台安装地址
RegisterWay int `gorm:"column:register_way;omitempty" json:"registerWay"` // RegisterWay表示注册方式(1:标准认证注册,2:口令认证,3:数字证书双向认证,4:数字证书单向认证)
Secrecy int `gorm:"column:secrecy;omitempty" json:"secrecy"` // Secrecy表示保密属性(0:不涉密,1:涉密)
PlatformChannels []*PlatformChannel `gorm:"-:all"`
}
// TableName 指定数据库表名

View File

@@ -3,6 +3,7 @@ package plugin_gb28181pro
import (
"context"
"fmt"
"m7s.live/v5/pkg/util"
"net/http"
"strconv"
"strings"
@@ -40,6 +41,7 @@ type Platform struct {
plugin *GB28181Plugin
ctx context.Context
unRegister bool
channels util.Collection[string, *gb28181.DeviceChannel] `gorm:"-:all"`
}
func NewPlatform(pm *gb28181.PlatformModel, plugin *GB28181Plugin, unRegister bool) *Platform {
@@ -457,14 +459,17 @@ func (p *Platform) handleCatalog(req *sip.Request, tx sip.ServerTransaction, msg
// 查询通道列表
var channels []gb28181.DeviceChannel
if p.plugin.DB != nil {
if err := p.plugin.DB.Table("gb28181_channel gc").
Select(`gc.*`).
Joins("left join gb28181_platform_channel gpc on gc.id=gpc.channel_db_id").
Where("gpc.platform_server_gb_id = ? and gc.status='ON'", p.PlatformModel.ServerGBID).
Find(&channels).Error; err != nil {
return fmt.Errorf("query channels error: %v", err)
}
//if p.plugin.DB != nil {
// if err := p.plugin.DB.Table("gb28181_channel gc").
// Select(`gc.*`).
// Joins("left join gb28181_platform_channel gpc on gc.id=gpc.channel_db_id").
// Where("gpc.platform_server_gb_id = ? and gc.status='ON'", p.PlatformModel.ServerGBID).
// Find(&channels).Error; err != nil {
// return fmt.Errorf("query channels error: %v", err)
// }
//}
for channel := range p.channels.Range {
channels = append(channels, *channel)
}
// 发送目录响应,无论是否有通道
@@ -826,7 +831,7 @@ func (p *Platform) buildChannelItem(channel gb28181.DeviceChannel) string {
channel.RegisterWay, // 直接使用整数值
channel.Secrecy, // 直接使用整数值
parentID,
channel.Parental, // 直接使用整数值
channel.Parental, // 直接使用整数值
channel.SafetyWay) // 直接使用整数值
}

View File

@@ -17,7 +17,7 @@ func (gb *GB28181Plugin) RecordInfoQuery(deviceID string, channelID string, star
return nil, fmt.Errorf("device not found: %s", deviceID)
}
channel, ok := device.channels.Get(channelID)
channel, ok := device.channels.Get(deviceID + "_" + channelID)
if !ok {
return nil, fmt.Errorf("channel not found: %s", channelID)
}