feat: 实现订阅位置、报警、目录信令

This commit is contained in:
ydajiang
2025-10-06 20:16:27 +08:00
parent 71c3206f09
commit 11cfb29c24
21 changed files with 654 additions and 143 deletions

90
api.go
View File

@@ -261,7 +261,6 @@ func startApiServer(addr string) {
apiServer.router.HandleFunc("/api/v1/device/session/stop", withVerify(common.WithFormDataParams(apiServer.OnSessionStop, StreamIDParams{}))) // 关闭流
apiServer.router.HandleFunc("/api/v1/device/setchannelid", withVerify(common.WithFormDataParams(apiServer.OnCustomChannelSet, CustomChannel{}))) // 关闭流
apiServer.router.HandleFunc("/api/v1/position/sub", common.WithJsonResponse(apiServer.OnSubscribePosition, &DeviceChannelID{})) // 订阅移动位置
apiServer.router.HandleFunc("/api/v1/playback/seek", common.WithJsonResponse(apiServer.OnSeekPlayback, &SeekParams{})) // 回放seek
apiServer.router.HandleFunc("/api/v1/control/ptz", withVerify(common.WithFormDataParams(apiServer.OnPTZControl, QueryRecordParams{}))) // 云台控制
@@ -279,6 +278,7 @@ func startApiServer(addr string) {
// 暂未开发
apiServer.router.HandleFunc("/api/v1/alarm/list", withVerify(func(w http.ResponseWriter, req *http.Request) {})) // 报警查询
apiServer.router.HandleFunc("/api/v1/sms/list", withVerify(func(w http.ResponseWriter, req *http.Request) {})) // 报警查询
apiServer.router.HandleFunc("/api/v1/cloudrecord/querychannels", withVerify(func(w http.ResponseWriter, req *http.Request) {})) // 云端录像
apiServer.router.HandleFunc("/api/v1/user/list", withVerify(func(w http.ResponseWriter, req *http.Request) {})) // 用户管理
apiServer.router.HandleFunc("/api/v1/log/list", withVerify(func(w http.ResponseWriter, req *http.Request) {})) // 操作日志
@@ -726,10 +726,6 @@ func (api *ApiServer) OnCloseLiveStream(v *InviteParams, _ http.ResponseWriter,
}
func (api *ApiServer) OnDeviceList(q *QueryDeviceChannel, _ http.ResponseWriter, r *http.Request) (interface{}, error) {
values := r.URL.Query()
log.Sugar.Debugf("查询设备列表 %s", values.Encode())
var status string
if "" == q.Online {
} else if "true" == q.Online {
@@ -781,10 +777,10 @@ func (api *ApiServer) OnDeviceList(q *QueryDeviceChannel, _ http.ResponseWriter,
}
response.DeviceList_ = append(response.DeviceList_, LiveGBSDevice{
AlarmSubscribe: false, // 报警订阅
AlarmSubscribe: device.AlarmSubscribe, // 报警订阅
CatalogInterval: device.CatalogInterval, // 目录刷新时间
CatalogProgress: catalogProgress,
CatalogSubscribe: false, // 目录订阅
CatalogSubscribe: device.CatalogSubscribe, // 目录订阅
ChannelCount: device.ChannelsTotal,
ChannelOverLoad: false,
Charset: "GB2312",
@@ -809,7 +805,7 @@ func (api *ApiServer) OnDeviceList(q *QueryDeviceChannel, _ http.ResponseWriter,
Online: device.Online(),
PTZSubscribe: false, // PTZ订阅2022
Password: "",
PositionSubscribe: false, // 位置订阅
PositionSubscribe: device.PositionSubscribe, // 位置订阅
RecordCenter: false,
RecordIndistinct: false,
RecvStreamIP: "",
@@ -829,9 +825,6 @@ func (api *ApiServer) OnDeviceList(q *QueryDeviceChannel, _ http.ResponseWriter,
}
func (api *ApiServer) OnChannelList(q *QueryDeviceChannel, _ http.ResponseWriter, r *http.Request) (interface{}, error) {
values := r.URL.Query()
log.Sugar.Debugf("查询通道列表 %s", values.Encode())
var deviceName string
if q.DeviceID != "" {
device, err := dao.Device.QueryDevice(q.DeviceID)
@@ -953,24 +946,6 @@ func (api *ApiServer) OnRecordList(v *QueryRecordParams, _ http.ResponseWriter,
return &response, nil
}
func (api *ApiServer) OnSubscribePosition(v *DeviceChannelID, _ http.ResponseWriter, _ *http.Request) (interface{}, error) {
log.Sugar.Debugf("订阅位置 %v", *v)
model, _ := dao.Device.QueryDevice(v.DeviceID)
if model == nil || !model.Online() {
log.Sugar.Errorf("订阅位置失败, 设备离线 device: %s", v.DeviceID)
return nil, fmt.Errorf("设备离线")
}
device := &stack.Device{DeviceModel: model}
if err := device.SubscribePosition(v.ChannelID); err != nil {
log.Sugar.Errorf("订阅位置失败 err: %s", err.Error())
return nil, err
}
return nil, nil
}
func (api *ApiServer) OnSeekPlayback(v *SeekParams, _ http.ResponseWriter, _ *http.Request) (interface{}, error) {
log.Sugar.Debugf("快进回放 %v", *v)
@@ -1635,11 +1610,60 @@ func (api *ApiServer) OnPlaybackControl(params *StreamIDParams, _ http.ResponseW
}
func (api *ApiServer) OnDeviceInfoSet(params *DeviceInfo, w http.ResponseWriter, req *http.Request) (interface{}, error) {
// 目前仅实现修改目录订阅间隔
if params.CatalogInterval < 60 {
return nil, errors.New("catalog_interval error")
} else if err := dao.Device.UpdateCatalogInterval(params.DeviceID, params.CatalogInterval); err != nil {
model, err := dao.Device.QueryDevice(params.DeviceID)
if err != nil {
return nil, err
}
device := stack.Device{DeviceModel: model}
// 更新的字段和值
conditions := make(map[string]interface{}, 0)
// 刷新目录间隔
if params.CatalogInterval != model.CatalogInterval && params.CatalogInterval != dao.DefaultCatalogInterval && params.CatalogInterval >= 60 {
conditions["catalog_interval"] = params.CatalogInterval
}
if model.CatalogSubscribe != params.CatalogSubscribe {
conditions["catalog_subscribe"] = params.CatalogSubscribe
// 开启目录订阅
if params.CatalogSubscribe {
_ = device.SubscribeCatalog()
} else {
// 取消目录订阅
device.UnsubscribeCatalog()
}
}
if model.PositionSubscribe != params.PositionSubscribe {
conditions["position_subscribe"] = params.PositionSubscribe
// 开启位置订阅
if params.PositionSubscribe {
_ = device.SubscribePosition()
} else {
// 取消位置订阅
device.UnsubscribePosition()
}
}
if model.AlarmSubscribe != params.AlarmSubscribe {
conditions["alarm_subscribe"] = params.AlarmSubscribe
// 开启报警订阅
if params.AlarmSubscribe {
_ = device.SubscribeAlarm()
} else {
// 取消报警订阅
device.UnsubscribeAlarm()
}
}
// 更新设备信息
if len(conditions) > 0 {
if err = dao.Device.UpdateDevice(params.DeviceID, conditions); err != nil {
return nil, err
}
}
return "OK", nil
}

View File

@@ -21,7 +21,7 @@ type Config_ struct {
AliveExpires int `json:"alive_expires"`
MobilePositionInterval int `json:"mobile_position_interval"`
MobilePositionExpires int `json:"mobile_position_expires"`
SubscribeExpires int `json:"subscribe_expires"`
MediaServer string `json:"media_server"`
AutoCloseOnIdle bool `json:"auto_close_on_idle"`

View File

@@ -8,6 +8,7 @@
"password":"123456",
"alive_expires": 180,
"mobile_position_interval": 10,
"subscribe_expires": 3600,
"media_server": "http://0.0.0.0:8080",

View File

@@ -50,10 +50,11 @@ type ChannelModel struct {
Longitude string `json:"longitude" xml:"Longitude,omitempty"`
Latitude string `json:"latitude" xml:"Latitude,omitempty"`
Setup common.SetupType `json:"setup,omitempty"`
ChannelNumber int `json:"channel_number" xml:"-"` // 对应1078的通道号
SubCount int `json:"-" xml:"-"` // 子节点数量
IsDir bool `json:"-" xml:"-"` // 是否是目录
CustomID *string `gorm:"unique"` // 自定义通道ID
ChannelNumber int `json:"channel_number" xml:"-"` // 对应1078的通道号
SubCount int `json:"-" xml:"-"` // 子节点数量
IsDir bool `json:"-" xml:"-"` // 是否是目录
CustomID *string `gorm:"unique"` // 自定义通道ID
Event string `json:"-" xml:"Event,omitempty" gorm:"-"` // <!-- 状态改变事件ON:上线,OFF:离线,VLOST:视频丢失,DEFECT:故障,ADD:增加,DEL:删除,UPDATE:更新(必选)-->
}
func (d *ChannelModel) TableName() string {

View File

@@ -34,6 +34,9 @@ type DeviceModel struct {
CatalogInterval int // 录像目录刷新间隔,单位秒, 默认3600每小时刷新
LastRefreshCatalog time.Time `gorm:"type:datetime"` // 最后刷新目录时间
//ScheduleRecord [7]uint64 // 录像计划0-6表示周一至周日一天的时间刻度用一个uint64表示从高位开始代表0点每bit半小时共占用48位, 1表示录像0表示不录像
CatalogSubscribe bool `json:"catalog_subscribe"` // 是否开启目录订阅
AlarmSubscribe bool `json:"alarm_subscribe"` // 是否开启报警订阅
PositionSubscribe bool `json:"position_subscribe"` // 是否开启位置订阅
}
func (d *DeviceModel) TableName() string {
@@ -278,3 +281,9 @@ func (d *daoDevice) UpdateCatalogInterval(id string, interval int) error {
return tx.Model(&DeviceModel{}).Where("device_id =?", id).Update("catalog_interval", interval).Error
})
}
func (d *daoDevice) UpdateDevice(deviceId string, conditions map[string]interface{}) error {
return DBTransaction(func(tx *gorm.DB) error {
return tx.Model(&DeviceModel{}).Where("device_id =?", deviceId).Updates(conditions).Error
})
}

87
dao/dialog.go Normal file
View File

@@ -0,0 +1,87 @@
package dao
import (
"gb-cms/common"
"gorm.io/gorm"
"time"
)
const (
SipDialogTypeSubscribeCatalog = iota + 1
SipDialogTypeSubscribeAlarm
SipDialogTypeSubscribePosition
)
// SipDialogModel 持久化SIP会话
type SipDialogModel struct {
GBModel
DeviceID string
ChannelID string
CallID string
Dialog *common.RequestWrapper `json:"message,omitempty"`
Type int
RefreshTime time.Time
}
func (m *SipDialogModel) TableName() string {
return "lkm_dialog"
}
type daoDialog struct {
}
func (m *daoDialog) QueryDialogs(id string) ([]*SipDialogModel, error) {
var dialogs []*SipDialogModel
err := db.Where("device_id = ?", id).Find(&dialogs).Error
if err != nil {
return nil, err
}
return dialogs, nil
}
func (m *daoDialog) QueryDialogsByType(id string, t int) ([]*SipDialogModel, error) {
var dialogs []*SipDialogModel
err := db.Where("device_id = ? and type = ?", id, t).Find(&dialogs).Error
if err != nil {
return nil, err
}
return dialogs, nil
}
// DeleteDialogs 删除设备下的所有会话
func (m *daoDialog) DeleteDialogs(id string) error {
return DBTransaction(func(tx *gorm.DB) error {
return tx.Where("device_id = ?", id).Unscoped().Delete(&SipDialogModel{}).Error
})
}
// DeleteDialogsByType 删除设备下的指定类型会话
func (m *daoDialog) DeleteDialogsByType(id string, t int) (*SipDialogModel, error) {
var dialog SipDialogModel
err := DBTransaction(func(tx *gorm.DB) error {
err := tx.Where("device_id = ? and type = ?", id, t).First(&dialog).Error
if err != nil {
return err
}
return tx.Unscoped().Delete(&dialog).Error
})
return &dialog, err
}
// Save 保存会话
func (m *daoDialog) Save(dialog *SipDialogModel) error {
return DBTransaction(func(tx *gorm.DB) error {
return tx.Save(dialog).Error
})
}
// QueryExpiredDialogs 查找即将过期的订阅会话
func (m *daoDialog) QueryExpiredDialogs(now time.Time) ([]*SipDialogModel, error) {
var dialogs []*SipDialogModel
err := db.Where("refresh_time >= ?", now).Find(&dialogs).Error
if err != nil {
return nil, err
}
return dialogs, nil
}

View File

@@ -25,6 +25,7 @@ var (
Sink = &daoSink{}
JTDevice = &daoJTDevice{}
Blacklist = &daoBlacklist{}
Dialog = &daoDialog{}
)
func init() {
@@ -62,11 +63,6 @@ func init() {
s.SetMaxOpenConns(40)
s.SetMaxIdleConns(10)
// devices
// channels
// platforms
// streams
// sinks
if err = db.AutoMigrate(&DeviceModel{}); err != nil {
panic(err)
} else if err = db.AutoMigrate(&ChannelModel{}); err != nil {
@@ -83,6 +79,8 @@ func init() {
panic(err)
} else if err = db.AutoMigrate(&BlacklistModel{}); err != nil {
panic(err)
} else if err = db.AutoMigrate(&SipDialogModel{}); err != nil {
panic(err)
}
StartSaveTask()

View File

@@ -79,7 +79,7 @@ func main() {
}
}
// 启动session超时管理
// 启动web session超时管理
go TokenManager.Start(5 * time.Minute)
// 启动设备在线超时管理
@@ -126,7 +126,9 @@ func main() {
go startApiServer(httpAddr)
// 启动目录刷新任务
stack.AddScheduledTask(time.Minute, true, stack.RefreshCatalogScheduleTask)
go stack.AddScheduledTask(time.Minute, true, stack.RefreshCatalogScheduleTask)
// 启动订阅刷新任务
go stack.AddScheduledTask(time.Minute, true, stack.RefreshSubscribeScheduleTask)
err = http.ListenAndServe(":19000", nil)
if err != nil {

View File

@@ -141,17 +141,8 @@ func updateDevicesStatus() {
offlineDevices = append(offlineDevices, key)
}
if len(offlineDevices) > 0 {
// 每次更新100个
for i := 0; i < len(offlineDevices); i += 100 {
end := i + 100
if end > len(offlineDevices) {
end = len(offlineDevices)
}
if err = dao.Device.UpdateOfflineDevices(offlineDevices[i:end]); err != nil {
log.Sugar.Errorf("更新设备状态失败 device: %s", offlineDevices[i:end])
}
}
for _, device := range offlineDevices {
stack.CloseDevice(device)
}
}
}

109
stack/alarm.go Normal file
View File

@@ -0,0 +1,109 @@
package stack
import (
"fmt"
"gb-cms/common"
"gb-cms/dao"
"gb-cms/log"
"github.com/ghettovoice/gosip/sip"
"time"
)
const (
AlarmFormat = "<?xml version=\"1.0\"?>\r\n" +
"<Query>\r\n" +
"<CmdType>Alarm</CmdType>\r\n" +
"<SN>%d</SN>\r\n" +
"<DeviceID>%s</DeviceID>\r\n" +
"<StartAlarmPriority>1</StartAlarmPriority>\r\n" + // <!-- 报警起始级别(可选),0为全部,1为一级警情,2为二级警情,3为三级警情,4为四级警情-->
"<EndAlarmPriority>4</EndAlarmPriority>\r\n" + // <!-- 报警终止级别(可选),0为全部,1为一级警情,2为二级警情,3为三级警情,4为四级警情-->
"<AlarmMethod>0</AlarmMethod>\r\n" + // <!-- 报警方式条件(可选),取值0为全部,1为电话报警,2为设备报警,3为短信报警,4为GPS报警,5为视频报警,6为设备故障报警,7其他报警;可以为直接组合如12为电话报警或设备报警-->
"<StartTime>%s</StartTime>\r\n" +
"<EndTime>%s</EndTime>\r\n" +
"</Query>"
AlarmResponseFormat = "<?xml version=\"1.0\"?>\r\n" +
"<Response>\r\n" +
"<CmdType>Alarm</CmdType>\r\n" +
"<SN>%d</SN>\r\n" +
"<DeviceID>%s</DeviceID>\r\n" +
"<Result>%s</Result>\r\n" +
"</Response>"
)
type AlarmNotify struct {
BaseMessage
AlarmPriority string `xml:"AlarmPriority"` // <!-- 报警级别(必选),1为一级警情,2为二级警情,3为三级警情,4为四级警情-->
AlarmMethod string `xml:"AlarmMethod"` // <!-- 报警方式(必选),取值1为电话报警,2为设备报警,3为短信报警,4为GPS报警,5为视频报警,6为设备故障报警,7其他报警-->
AlarmTime string `xml:"AlarmTime"` // <!--报警时间(必选)-->
AlarmDescription string `xml:"AlarmDescription"` // <!--报警内容描述(可选)-->
Longitude string `xml:"Longitude"` // <!-- 经度(可选)-->
Latitude string `xml:"Latitude"` // <!-- 纬度(可选)-->
Info *struct {
// <!-- 报警类型。报警方式为2时,不携带AlarmType为默认的报警设备报警,携带AlarmType取值及对应报警类型如下:
// 1-视频丢失报警;2-设备防拆报警;3-存储设备磁盘满报警;4-设备高温报警;5-设备低温报警。报警方式为5时,取值如下:
// 1-人工视频报警;2-运动目标检测报警;3-遗留物检测报警;4-物体移除检测报警;5-绊线检测报警;6-入侵检测报警;7-逆行检测报警;8-徘徊检测报警;9-流量统计报警;
// 10-密度检测报警;11-视频异常检测报警;12-快速移动报警。报警方式为6时,取值如下:1-存储设备磁盘故障报警;2-存储设备风扇故障报警。-->
AlarmType *int `xml:"AlarmType"`
// <!—报警类型扩展参数。在入侵检测报警时可携带<EventType>事件类型</EventType>,事件类型取值:1-进入区域;2-离开区域。-->
AlarmTypeParam *struct {
EventType *int `xml:"EventType"`
}
} `xml:"Info"`
}
func (d *Device) SubscribeAlarm() error {
now := time.Now()
end := now.Add(time.Duration(common.Config.SubscribeExpires) * time.Second)
builder := d.NewRequestBuilder(sip.SUBSCRIBE, common.Config.SipID, common.Config.SipContactAddr, d.DeviceID)
body := fmt.Sprintf(AlarmFormat, GetSN(), d.DeviceID, now.Format("2006-01-02T15:04:05"), end.Format("2006-01-02T15:04:05"))
expiresHeader := sip.Expires(common.Config.SubscribeExpires)
builder.SetExpires(&expiresHeader)
builder.SetContentType(&XmlMessageType)
builder.SetContact(GlobalContactAddress)
builder.SetBody(body)
request, err := builder.Build()
if err != nil {
return err
}
err = SendSubscribeMessage(d.DeviceID, request, dao.SipDialogTypeSubscribeAlarm, EventPresence)
if err != nil {
log.Sugar.Errorf("订阅报警失败 err: %s deviceID: %s", err.Error(), d.DeviceID)
}
return err
}
func (d *Device) UnsubscribeAlarm() {
body := fmt.Sprintf(MobilePositionMessageFormatUnsubscribe, GetSN(), d.DeviceID)
err := Unsubscribe(d.DeviceID, dao.SipDialogTypeSubscribeAlarm, EventPresence, []byte(body), d.RemoteIP, d.RemotePort)
if err != nil {
log.Sugar.Errorf("取消订阅报警失败 err: %s deviceID: %s", err.Error(), d.DeviceID)
}
}
func (d *Device) RefreshSubscribeAlarm() {
now := time.Now()
end := now.Add(time.Duration(common.Config.SubscribeExpires) * time.Second)
body := fmt.Sprintf(AlarmFormat, GetSN(), d.DeviceID, now.Format("2006-01-02T15:04:05"), end.Format("2006-01-02T15:04:05"))
err := RefreshSubscribe(d.DeviceID, dao.SipDialogTypeSubscribeAlarm, EventPresence, common.Config.SubscribeExpires, []byte(body), d.RemoteIP, d.RemotePort)
if err != nil {
log.Sugar.Errorf("刷新报警订阅失败 err: %s deviceID: %s", err.Error(), d.DeviceID)
}
}
// SendAlarmNotificationResponseCmd 设备主动上报的报警信息,需要回复确认
func (d *Device) SendAlarmNotificationResponseCmd(sn int, id string) {
builder := d.NewRequestBuilder(sip.SUBSCRIBE, common.Config.SipID, common.Config.SipContactAddr, d.DeviceID)
builder.SetBody(fmt.Sprintf(AlarmResponseFormat, sn, id, "OK"))
request, err := builder.Build()
if err != nil {
return
}
common.SipStack.SendRequest(request)
}

53
stack/catalog.go Normal file
View File

@@ -0,0 +1,53 @@
package stack
import (
"fmt"
"gb-cms/common"
"gb-cms/dao"
"gb-cms/log"
"github.com/ghettovoice/gosip/sip"
)
const (
EventCatalog = "catalog"
)
// SubscribeCatalog 域间目录订阅
func (d *Device) SubscribeCatalog() error {
builder := d.NewRequestBuilder(sip.SUBSCRIBE, common.Config.SipID, common.Config.SipContactAddr, d.DeviceID)
body := fmt.Sprintf(CatalogFormat, GetSN(), d.DeviceID)
expiresHeader := sip.Expires(common.Config.SubscribeExpires)
builder.SetExpires(&expiresHeader)
builder.SetContentType(&XmlMessageType)
builder.SetContact(GlobalContactAddress)
builder.SetBody(body)
request, err := builder.Build()
if err != nil {
return err
}
err = SendSubscribeMessage(d.DeviceID, request, dao.SipDialogTypeSubscribeCatalog, EventCatalog)
if err != nil {
log.Sugar.Errorf("订阅目录失败 err: %s deviceID: %s", err.Error(), d.DeviceID)
}
return err
}
func (d *Device) UnsubscribeCatalog() {
body := fmt.Sprintf(CatalogFormat, GetSN(), d.DeviceID)
err := Unsubscribe(d.DeviceID, dao.SipDialogTypeSubscribeCatalog, EventCatalog, []byte(body), d.RemoteIP, d.RemotePort)
if err != nil {
log.Sugar.Errorf("取消订阅目录失败 err: %s deviceID: %s", err.Error(), d.DeviceID)
}
}
func (d *Device) RefreshSubscribeCatalog() {
body := fmt.Sprintf(CatalogFormat, GetSN(), d.DeviceID)
err := RefreshSubscribe(d.DeviceID, dao.SipDialogTypeSubscribeCatalog, EventCatalog, common.Config.SubscribeExpires, []byte(body), d.RemoteIP, d.RemotePort)
if err != nil {
log.Sugar.Errorf("刷新目录订阅失败 err: %s deviceID: %s", err.Error(), d.DeviceID)
}
}

View File

@@ -72,7 +72,8 @@ type GBDevice interface {
//
//OnNotifyAlarm()
SubscribePosition(channelId string) error
SubscribeEvent()
//SubscribePosition(channelId string) error
//SubscribeCatalog()
//
@@ -327,40 +328,6 @@ func (d *Device) OnBye(request sip.Request) {
}
func (d *Device) SubscribePosition(channelId string) error {
if channelId == "" {
channelId = d.DeviceID
}
//暂时不考虑级联
builder := d.NewRequestBuilder(sip.SUBSCRIBE, common.Config.SipID, common.Config.SipContactAddr, channelId)
body := fmt.Sprintf(MobilePositionMessageFormat, GetSN(), channelId, common.Config.MobilePositionInterval)
expiresHeader := sip.Expires(common.Config.MobilePositionExpires)
builder.SetExpires(&expiresHeader)
builder.SetContentType(&XmlMessageType)
builder.SetContact(GlobalContactAddress)
builder.SetBody(body)
request, err := builder.Build()
if err != nil {
return err
}
event := Event(EventPresence)
request.AppendHeader(&event)
response, err := common.SipStack.SendRequestWithTimeout(5, request)
if err != nil {
return err
}
if response.StatusCode() != 200 {
return fmt.Errorf("err code %d", response.StatusCode())
}
return nil
}
func (d *Device) Broadcast(sourceId, channelId string) sip.ClientTransaction {
body := fmt.Sprintf(BroadcastFormat, GetSN(), sourceId, channelId)
request := d.BuildMessageRequest(channelId, body)
@@ -450,6 +417,8 @@ func (d *Device) Close() {
// 更新在数据库中的状态
d.Status = common.OFF
_ = dao.Device.UpdateDeviceStatus(d.DeviceID, common.OFF)
// 删除设备下的所有会话
_ = dao.Dialog.DeleteDialogs(d.DeviceID)
}
// CreateDialogRequestFromAnswer 根据invite的应答创建Dialog请求

View File

@@ -91,7 +91,10 @@ func NewOnlineDeviceManager() *onlineDeviceManager {
// OnExpires Redis设备ID到期回调
func OnExpires(db int, id string) {
log.Sugar.Infof("设备心跳过期 device: %s", id)
CloseDevice(id)
}
func CloseDevice(id string) {
device, _ := dao.Device.QueryDevice(id)
if device == nil {
log.Sugar.Errorf("设备不存在 device: %s", id)

View File

@@ -13,7 +13,7 @@ const (
)
func (d *Device) ScalePlayback(dialog sip.Request, speed float64) {
infoRequest := CreateRequestFromDialog(dialog, sip.INFO)
infoRequest := CreateRequestFromDialog(dialog, sip.INFO, d.RemoteIP, d.RemotePort)
sn := GetSN()
body := fmt.Sprintf(RTSPBodyFormat, sn, speed)
infoRequest.SetBody(body, true)
@@ -22,13 +22,5 @@ func (d *Device) ScalePlayback(dialog sip.Request, speed float64) {
infoRequest.RemoveHeader("Contact")
infoRequest.AppendHeader(GlobalContactAddress.AsContactHeader())
// 替換到device的真實地址
recipient := infoRequest.Recipient()
if uri, ok := recipient.(*sip.SipUri); ok {
sipPort := sip.Port(d.RemotePort)
uri.FHost = d.RemoteIP
uri.FPort = &sipPort
}
common.SipStack.SendRequest(infoRequest)
}

View File

@@ -3,20 +3,34 @@ package stack
import (
"fmt"
"gb-cms/common"
"gb-cms/dao"
"gb-cms/log"
"github.com/ghettovoice/gosip/sip"
)
const (
EventPresence = "presence" //SIP 的事件通知机制(如 RFC 3856 和 RFC 6665实现
//MobilePositionMessageFormat = "<?xml version=\"1.0\"?>\r\n" +
// "<Query>\r\n" +
// "<CmdType>MobilePosition</CmdType>\r\n" +
// "<SN>%s</SN>\r\n" +
// "<DeviceID>%s</DeviceID>\r\n" +
// "<Interval>%d</Interval>\r\n" +
// "</Query>\r\n"
MobilePositionMessageFormat = "<Query><CmdType>MobilePosition</CmdType><SN>%d</SN><DeviceID>%s</DeviceID><Interval>%d</Interval></Query>"
EventPresence = "presence" //SIP 的事件通知机制(如 RFC 3856 和 RFC 6665实现
MobilePositionMessageFormat = "<?xml version=\"1.0\"?>\r\n" +
"<Query>\r\n" +
"<CmdType>MobilePosition</CmdType>\r\n" +
"<SN>%d</SN>\r\n" +
"<DeviceID>%s</DeviceID>\r\n" +
"<Interval>%d</Interval>\r\n" +
"</Query>\r\n"
MobilePositionMessageFormatUnsubscribe = "<?xml version=\"1.0\"?>\r\n" +
"<Query>\r\n" +
"<CmdType>MobilePosition</CmdType>\r\n" +
"<SN>%d</SN>\r\n" +
"<DeviceID>%s</DeviceID>\r\n" +
"</Query>\r\n"
//MobilePositionMessageFormat = "<Query>" +
// "<CmdType>MobilePosition</CmdType>" +
// "<SN>%d</SN>" +
// "<DeviceID>%s</DeviceID>" +
// "<Interval>%d</Interval>" +
// "</Query>"
)
type MobilePositionNotify struct {
@@ -31,16 +45,14 @@ type MobilePositionNotify struct {
Altitude string `xml:"Altitude"`
}
func (d *Device) DoSubscribePosition(channelId string) error {
if channelId == "" {
channelId = d.DeviceID
}
func (d *Device) SubscribePosition() error {
channelId := d.DeviceID
//暂时不考虑级联
// 暂时不考虑级联
builder := d.NewRequestBuilder(sip.SUBSCRIBE, common.Config.SipID, common.Config.SipContactAddr, channelId)
body := fmt.Sprintf(MobilePositionMessageFormat, 1, channelId, common.Config.MobilePositionInterval)
body := fmt.Sprintf(MobilePositionMessageFormat, GetSN(), channelId, common.Config.MobilePositionInterval)
expiresHeader := sip.Expires(common.Config.MobilePositionExpires)
expiresHeader := sip.Expires(common.Config.SubscribeExpires)
builder.SetExpires(&expiresHeader)
builder.SetContentType(&XmlMessageType)
builder.SetContact(GlobalContactAddress)
@@ -51,20 +63,26 @@ func (d *Device) DoSubscribePosition(channelId string) error {
return err
}
event := Event(EventPresence)
request.AppendHeader(&event)
response, err := common.SipStack.SendRequestWithTimeout(5, request)
err = SendSubscribeMessage(d.DeviceID, request, dao.SipDialogTypeSubscribePosition, EventPresence)
if err != nil {
return err
log.Sugar.Errorf("订阅位置失败 err: %s deviceID: %s", err.Error(), d.DeviceID)
}
if response.StatusCode() != 200 {
return fmt.Errorf("err code %d", response.StatusCode())
return err
}
func (d *Device) UnsubscribePosition() {
body := fmt.Sprintf(MobilePositionMessageFormatUnsubscribe, GetSN(), d.DeviceID)
err := Unsubscribe(d.DeviceID, dao.SipDialogTypeSubscribePosition, EventPresence, []byte(body), d.RemoteIP, d.RemotePort)
if err != nil {
log.Sugar.Errorf("取消订阅位置失败 err: %s deviceID: %s", err.Error(), d.DeviceID)
}
return nil
}
func (d *Device) OnMobilePositionNotify(notify *MobilePositionNotify) {
log.Sugar.Infof("收到位置信息 device:%s data:%v", d.DeviceID, notify)
func (d *Device) RefreshSubscribePosition() {
body := fmt.Sprintf(MobilePositionMessageFormat, GetSN(), d.DeviceID, common.Config.MobilePositionInterval)
err := RefreshSubscribe(d.DeviceID, dao.SipDialogTypeSubscribePosition, EventPresence, common.Config.SubscribeExpires, []byte(body), d.RemoteIP, d.RemotePort)
if err != nil {
log.Sugar.Errorf("刷新位置订阅失败 err: %s deviceID: %s", err.Error(), d.DeviceID)
}
}

View File

@@ -17,6 +17,27 @@ func RefreshCatalogScheduleTask() {
}
}
func RefreshSubscribeScheduleTask() {
dialogs, _ := dao.Dialog.QueryExpiredDialogs(time.Now())
for _, dialog := range dialogs {
go func(t int, id string) {
device, _ := dao.Device.QueryDevice(id)
if device == nil {
return
}
d := &Device{device}
if dao.SipDialogTypeSubscribeCatalog == t {
d.RefreshSubscribeCatalog()
} else if dao.SipDialogTypeSubscribePosition == t {
d.RefreshSubscribePosition()
} else if dao.SipDialogTypeSubscribeAlarm == t {
d.RefreshSubscribeAlarm()
}
}(dialog.Type, dialog.DeviceID)
}
}
func AddScheduledTask(interval time.Duration, firstRun bool, task func()) {
ticker := time.NewTicker(interval)
if firstRun {

View File

@@ -54,7 +54,7 @@ func (s *Sink) MarshalJSON() ([]byte, error) {
func (s *Sink) Bye() {
if s.Dialog != nil && s.Dialog.Request != nil {
byeRequest := CreateRequestFromDialog(s.Dialog.Request, sip.BYE)
byeRequest := CreateRequestFromDialog(s.Dialog.Request, sip.BYE, "", 0)
go common.SipStack.SendRequest(byeRequest)
}
}

View File

@@ -25,6 +25,8 @@ type Handler interface {
OnDeviceInfo(device string, response *DeviceInfoResponse)
OnNotifyPosition(notify *MobilePositionNotify)
OnNotifyCatalog(catalog *CatalogResponse)
}
type EventHandler struct {
@@ -56,6 +58,10 @@ func (e *EventHandler) OnRegister(id, transport, addr, userAgent string) (int, G
if err := dao.Device.SaveDevice(device); err != nil {
log.Sugar.Errorf("保存设备信息到数据库失败 device: %s err: %s", id, err.Error())
return 0, nil, false
} else if d, err := dao.Device.QueryDevice(id); err == nil {
// 查询所有字段
device = d
}
OnlineDeviceManager.Add(id, now)
@@ -114,6 +120,46 @@ func (e *EventHandler) OnDeviceInfo(device string, response *DeviceInfoResponse)
}
}
func (e *EventHandler) OnNotifyPosition(notify *MobilePositionNotify) {
func (e *EventHandler) OnNotifyPositionMessage(notify *MobilePositionNotify) {
log.Sugar.Infof("收到位置通知 device:%s data:%v", notify.DeviceID, notify)
}
func (e *EventHandler) OnNotifyCatalogMessage(catalog *CatalogResponse) {
log.Sugar.Infof("收到目录通知 device:%s data:%v", catalog.DeviceID, catalog)
for _, channel := range catalog.DeviceList.Devices {
if channel.Event == "" {
log.Sugar.Warnf("目录事件为空 设备ID: %s", channel.DeviceID)
continue
}
channel.RootID = catalog.DeviceID
switch channel.Event {
case "ON":
_ = dao.Channel.UpdateChannelStatus(catalog.DeviceID, channel.DeviceID, string(common.ON))
break
case "OFF":
_ = dao.Channel.UpdateChannelStatus(catalog.DeviceID, channel.DeviceID, string(common.OFF))
break
case "VLOST":
break
case "DEFECT":
break
case "ADD":
_ = dao.Channel.SaveChannel(channel)
break
case "DEL":
_ = dao.Channel.DeleteChannel(catalog.DeviceID, channel.DeviceID)
break
case "UPDATE":
_ = dao.Channel.SaveChannel(channel)
break
default:
log.Sugar.Warnf("未知的目录事件 %s 设备ID: %s", channel.Event, channel.DeviceID)
}
}
}
func (e *EventHandler) OnNotifyAlarmMessage(alarm *AlarmNotify) {
log.Sugar.Infof("收到报警通知 device:%s data:%v", alarm.DeviceID, alarm)
}

View File

@@ -38,6 +38,7 @@ const (
CmdKeepalive = "Keepalive"
CmdBroadcast = "Broadcast"
CmdMediaStatus = "MediaStatus"
CmdAlarm = "Alarm"
)
type sipServer struct {
@@ -98,9 +99,12 @@ func (s *sipServer) OnRegister(wrapper *SipRequestSource) {
SendResponse(wrapper.tx, response)
// 注册成功
if device != nil {
// 查询设备信息
device.QueryDeviceInfo()
// 处理各种订阅
device.SubscribeEvent()
}
if queryCatalog {
@@ -199,13 +203,34 @@ func (s *sipServer) OnNotify(wrapper *SipRequestSource) {
response := sip.NewResponseFromRequest("", wrapper.req, 200, "OK", "")
SendResponse(wrapper.tx, response)
mobilePosition := MobilePositionNotify{}
if err := DecodeXML([]byte(wrapper.req.Body()), &mobilePosition); err != nil {
log2.Sugar.Errorf("解析位置通知失败 err: %s request: %s", err.Error(), wrapper.req.String())
return
// 位置通知/目录通知/报警通知/PTZ?
cmd := GetCmdType(wrapper.req.Body())
switch cmd {
case CmdMobilePosition:
mobilePosition := MobilePositionNotify{}
if err := DecodeXML([]byte(wrapper.req.Body()), &mobilePosition); err != nil {
log2.Sugar.Errorf("解析位置通知失败 err: %s request: %s", err.Error(), wrapper.req.String())
return
}
s.handler.OnNotifyPositionMessage(&mobilePosition)
break
case CmdCatalog:
catalog := CatalogResponse{}
if err := DecodeXML([]byte(wrapper.req.Body()), &catalog); err != nil {
log2.Sugar.Errorf("解析目录通知失败 err: %s request: %s", err.Error(), wrapper.req.String())
return
}
s.handler.OnNotifyCatalogMessage(&catalog)
break
case CmdAlarm:
alarm := AlarmNotify{}
if err := DecodeXML([]byte(wrapper.req.Body()), &alarm); err != nil {
log2.Sugar.Errorf("解析报警通知失败 err: %s request: %s", err.Error(), wrapper.req.String())
return
}
s.handler.OnNotifyAlarmMessage(&alarm)
break
}
s.handler.OnNotifyPosition(&mobilePosition)
}
func (s *sipServer) OnMessage(wrapper *SipRequestSource) {
@@ -296,6 +321,33 @@ func (s *sipServer) OnMessage(wrapper *SipRequestSource) {
ok = true
id, _ := wrapper.req.CallID()
CloseStreamByCallID(id.Value())
} else if CmdAlarm == cmd {
ok = true
// 9.4 报警事件通知和分发
notify := AlarmNotify{}
if err := DecodeXML([]byte(wrapper.req.Body()), &notify); err != nil {
log2.Sugar.Errorf("解析报警通知失败 err: %s request: %s", err.Error(), wrapper.req.String())
return
}
// 发送响应命令
d, err := dao.Device.QueryDevice(deviceId)
if err != nil {
return
} else {
device := Device{d}
device.SendAlarmNotificationResponseCmd(notify.SN, notify.DeviceID)
s.handler.OnNotifyAlarmMessage(&notify)
}
} else if CmdCatalog == cmd {
ok = true
catalog := CatalogResponse{}
if err := DecodeXML([]byte(wrapper.req.Body()), &catalog); err != nil {
log2.Sugar.Errorf("解析目录通知失败 err: %s request: %s", err.Error(), wrapper.req.String())
return
}
s.handler.OnNotifyCatalogMessage(&catalog)
}
break

View File

@@ -83,7 +83,7 @@ func (s *Stream) Bye() {
}
}
func CreateRequestFromDialog(dialog sip.Request, method sip.RequestMethod) sip.Request {
func CreateRequestFromDialog(dialog sip.Request, method sip.RequestMethod, remoteIP string, remotePort int) sip.Request {
{
seq, _ := dialog.CSeq()
seq.SeqNo++
@@ -93,11 +93,22 @@ func CreateRequestFromDialog(dialog sip.Request, method sip.RequestMethod) sip.R
request := dialog.Clone().(sip.Request)
request.SetMethod(method)
request.SetSource("")
request.SetDestination("")
// 替換到device的真實地址
if remoteIP != "" {
recipient := request.Recipient()
if uri, ok := recipient.(*sip.SipUri); ok {
sipPort := sip.Port(remotePort)
uri.FHost = remoteIP
uri.FPort = &sipPort
}
}
return request
}
func (s *Stream) CreateRequestFromDialog(method sip.RequestMethod) sip.Request {
return CreateRequestFromDialog(s.Dialog, method)
return CreateRequestFromDialog(s.Dialog, method, "", 0)
}
func CloseStream(streamId common.StreamID, ms bool) {

View File

@@ -1,13 +1,137 @@
package stack
import "github.com/ghettovoice/gosip/sip"
import (
"fmt"
"gb-cms/common"
"gb-cms/dao"
"github.com/ghettovoice/gosip/sip"
"time"
)
type GBSubscribe struct {
PositionDialog sip.Request
CatalogDialog sip.Request
AlarmDialog sip.Request
// SubscribeEvent 发起订阅
func (d *Device) SubscribeEvent() {
if d.PositionSubscribe {
// 先取消订阅之前的,再重新发起订阅
dialogs, _ := dao.Dialog.QueryDialogsByType(d.DeviceID, dao.SipDialogTypeSubscribePosition)
if len(dialogs) > 0 {
d.UnsubscribePosition()
}
_ = d.SubscribePosition()
}
if d.CatalogSubscribe {
// 先取消订阅之前的,再重新发起订阅
dialogs, _ := dao.Dialog.QueryDialogsByType(d.DeviceID, dao.SipDialogTypeSubscribeCatalog)
if len(dialogs) > 0 {
d.UnsubscribeCatalog()
}
_ = d.SubscribeCatalog()
}
if d.AlarmSubscribe {
// 先取消订阅之前的,再重新发起订阅
dialogs, _ := dao.Dialog.QueryDialogsByType(d.DeviceID, dao.SipDialogTypeSubscribeAlarm)
if len(dialogs) > 0 {
d.UnsubscribeAlarm()
}
_ = d.SubscribeAlarm()
}
}
func RefreshSubscribe(expires int) {
// SendSubscribeMessage 通用发送订阅消息
func SendSubscribeMessage(deviceId string, request sip.Request, t int, event Event) error {
request.AppendHeader(&event)
transaction := common.SipStack.SendRequest(request)
response := <-transaction.Responses()
if response == nil {
return fmt.Errorf("no response")
} else if response.StatusCode() != 200 {
return fmt.Errorf("error response code: %d", response.StatusCode())
}
// 保存dialog到数据库
dialog := CreateDialogRequestFromAnswer(response, false, request.Source())
callid, _ := dialog.CallID()
model := &dao.SipDialogModel{
DeviceID: deviceId,
CallID: callid.Value(),
Dialog: &common.RequestWrapper{Request: dialog},
Type: t,
RefreshTime: time.Now().Add(time.Duration(common.Config.SubscribeExpires-60) * time.Second), // 刷新订阅时间, -60秒预留计时器出发间隔, 确保订阅在过期前刷新
}
return dao.Dialog.Save(model)
}
// Unsubscribe 通用取消订阅消息
func Unsubscribe(deviceId string, t int, event Event, body []byte, remoteIP string, remotePort int) error {
model, err := dao.Dialog.DeleteDialogsByType(deviceId, t)
if err != nil {
return err
}
request := CreateRequestFromDialog(model.Dialog.Request, sip.SUBSCRIBE, remoteIP, remotePort)
// 添加事件头
expiresHeader := sip.Expires(0)
contactHeader := sip.ContactHeader{DisplayName: GlobalContactAddress.DisplayName, Address: GlobalContactAddress.Uri, Params: GlobalContactAddress.Params}
request.RemoveHeader("Event")
request.RemoveHeader("Expires")
request.RemoveHeader("Contact")
request.RemoveHeader("Content-Type")
request.AppendHeader(&event)
request.AppendHeader(&expiresHeader)
request.AppendHeader(&contactHeader)
request.AppendHeader(&XmlMessageType)
if body != nil {
request.SetBody(string(body), true)
}
common.SipStack.SendRequest(request)
return nil
}
func RefreshSubscribe(deviceId string, t int, event Event, expires int, body []byte, remoteIP string, remotePort int) error {
dialogs, _ := dao.Dialog.QueryDialogsByType(deviceId, t)
if len(dialogs) == 0 {
return fmt.Errorf("no dialog")
}
request := CreateRequestFromDialog(dialogs[0].Dialog.Request, sip.SUBSCRIBE, remoteIP, remotePort)
expiresHeader := sip.Expires(expires)
contactHeader := sip.ContactHeader{DisplayName: GlobalContactAddress.DisplayName, Address: GlobalContactAddress.Uri, Params: GlobalContactAddress.Params}
request.RemoveHeader("Event")
request.RemoveHeader("Expires")
request.RemoveHeader("Contact")
request.RemoveHeader("Content-Type")
request.AppendHeader(&event)
request.AppendHeader(&expiresHeader)
request.AppendHeader(&contactHeader)
request.AppendHeader(&XmlMessageType)
if body != nil {
request.SetBody(string(body), true)
}
transaction := common.SipStack.SendRequest(request)
response := <-transaction.Responses()
if response == nil {
return fmt.Errorf("no response")
} else if response.StatusCode() != 200 {
return fmt.Errorf("error response code: %d", response.StatusCode())
} else {
// 刷新订阅时间, -60秒预留计时器出发间隔, 确保订阅在过期前刷新
dialogs[0].RefreshTime = time.Now().Add(time.Duration(common.Config.SubscribeExpires-60) * time.Second)
err := dao.Dialog.Save(dialogs[0])
if err != nil {
return err
}
}
return nil
}