mirror of
https://github.com/lkmio/gb-cms.git
synced 2025-09-26 19:51:22 +08:00
feat: 支持部标设备和通道编辑
This commit is contained in:
13
api.go
13
api.go
@@ -315,14 +315,13 @@ func (api *ApiServer) OnPublish(params *StreamParams, w http.ResponseWriter, r *
|
||||
stream.Put(200)
|
||||
}
|
||||
|
||||
// 对讲websocket已连接
|
||||
// 创建stream
|
||||
if params.Protocol == SourceTypeGBTalk {
|
||||
Sugar.Infof("对讲websocket已连接, stream: %s", params.Stream)
|
||||
|
||||
if params.Protocol == SourceTypeGBTalk || params.Protocol == SourceType1078 {
|
||||
s := &Stream{
|
||||
StreamID: params.Stream,
|
||||
Protocol: params.Protocol,
|
||||
DeviceID: params.Stream.DeviceID(),
|
||||
ChannelID: params.Stream.ChannelID(),
|
||||
StreamID: params.Stream,
|
||||
Protocol: params.Protocol,
|
||||
}
|
||||
|
||||
_, ok := StreamDao.SaveStream(s)
|
||||
@@ -743,7 +742,7 @@ func (api *ApiServer) OnPlatformAdd(v *PlatformModel, w http.ResponseWriter, r *
|
||||
err := fmt.Errorf("用户名长度必须20位")
|
||||
Sugar.Errorf("添加级联设备失败 err: %s", err.Error())
|
||||
return nil, err
|
||||
} else if len(v.SeverID) != 20 {
|
||||
} else if len(v.ServerID) != 20 {
|
||||
err := fmt.Errorf("上级ID长度必须20位")
|
||||
Sugar.Errorf("添加级联设备失败 err: %s", err.Error())
|
||||
return nil, err
|
||||
|
160
api_jt.go
160
api_jt.go
@@ -5,61 +5,144 @@ import (
|
||||
"net/http"
|
||||
)
|
||||
|
||||
func (api *ApiServer) OnVirtualDeviceAdd(device *JTDeviceModel, w http.ResponseWriter, r *http.Request) (interface{}, error) {
|
||||
Sugar.Infof("add virtual device: %v", *device)
|
||||
|
||||
if len(device.Username) != 20 {
|
||||
Sugar.Errorf("invalid username: %s", device.Username)
|
||||
return nil, fmt.Errorf("invalid username: %s", device.Username)
|
||||
} else if len(device.SeverID) != 20 {
|
||||
Sugar.Errorf("invalid server id: %s", device.SeverID)
|
||||
return nil, fmt.Errorf("invalid server id: %s", device.SeverID)
|
||||
func CheckJTDeviceOptions(device *JTDeviceModel) error {
|
||||
if err := CheckSipUAOptions(&SIPUAOptions{
|
||||
Username: device.Username,
|
||||
ServerAddr: device.ServerAddr,
|
||||
ServerID: device.SeverID,
|
||||
}); err != nil {
|
||||
return err
|
||||
} else if device.SimNumber == "" {
|
||||
// sim卡号必选项
|
||||
Sugar.Errorf("sim number is required")
|
||||
return nil, fmt.Errorf("sim number is required")
|
||||
}
|
||||
|
||||
if JTDeviceDao.ExistDevice(device.Username, device.SimNumber) {
|
||||
// 用户名或sim卡号已存在
|
||||
Sugar.Errorf("username or sim number already exists")
|
||||
return nil, fmt.Errorf("username or sim number already exists")
|
||||
return fmt.Errorf("sim number is required")
|
||||
} else if DeviceDao.ExistDevice(device.Username) {
|
||||
// 用户名与下级设备冲突
|
||||
Sugar.Errorf("username already exists")
|
||||
return nil, fmt.Errorf("username already exists")
|
||||
return fmt.Errorf("username already exists")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func SaveAndStartJTDevice(device *JTDeviceModel) error {
|
||||
jtDevice, err := NewJTDevice(device, SipStack)
|
||||
if err != nil {
|
||||
Sugar.Errorf("create virtual device failed: %s", err.Error())
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
|
||||
if !JTDeviceManager.Add(device.Username, jtDevice) {
|
||||
return nil, fmt.Errorf("ua添加失败, id冲突. key: %s", device.Username)
|
||||
} else if err = JTDeviceDao.SaveDevice(device); err != nil {
|
||||
JTDeviceManager.Remove(device.Username)
|
||||
return fmt.Errorf("ua添加失败, id冲突. key: %s", device.Username)
|
||||
}
|
||||
|
||||
jtDevice.Start()
|
||||
return nil
|
||||
}
|
||||
|
||||
func EqualJTDeviceOptions(old, new *JTDeviceModel) bool {
|
||||
return EqualSipUAOptions(&SIPUAOptions{
|
||||
Username: old.Username,
|
||||
ServerAddr: old.ServerAddr,
|
||||
Transport: old.Transport,
|
||||
RegisterExpires: old.RegisterExpires,
|
||||
Password: old.Password,
|
||||
KeepaliveInterval: old.KeepaliveInterval,
|
||||
ServerID: old.SeverID,
|
||||
}, &SIPUAOptions{
|
||||
Username: new.Username,
|
||||
ServerAddr: new.ServerAddr,
|
||||
Transport: new.Transport,
|
||||
RegisterExpires: new.RegisterExpires,
|
||||
Password: new.Password,
|
||||
KeepaliveInterval: new.KeepaliveInterval,
|
||||
ServerID: new.SeverID,
|
||||
})
|
||||
}
|
||||
|
||||
func (api *ApiServer) OnVirtualDeviceAdd(device *JTDeviceModel, w http.ResponseWriter, r *http.Request) (interface{}, error) {
|
||||
Sugar.Infof("add virtual device: %v", *device)
|
||||
|
||||
err := CheckJTDeviceOptions(device)
|
||||
if err != nil {
|
||||
Sugar.Errorf("%s", err.Error())
|
||||
return nil, err
|
||||
} else if JTDeviceManager.Find(device.Username) != nil {
|
||||
Sugar.Errorf("jt device already exists: %s", device.Username)
|
||||
return nil, fmt.Errorf("jt device already exists: %s", device.Username)
|
||||
}
|
||||
|
||||
err = JTDeviceDao.SaveDevice(device)
|
||||
if err != nil {
|
||||
Sugar.Errorf("save device failed: %s", err.Error())
|
||||
return nil, err
|
||||
}
|
||||
|
||||
jtDevice.Start()
|
||||
|
||||
err = SaveAndStartJTDevice(device)
|
||||
if err != nil {
|
||||
Sugar.Errorf("add jt device failed: %s", err.Error())
|
||||
return nil, err
|
||||
Sugar.Errorf("start device failed: %s", err.Error())
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
return nil, err
|
||||
}
|
||||
|
||||
func (api *ApiServer) OnVirtualDeviceEdit(device *JTDeviceModel, w http.ResponseWriter, r *http.Request) (interface{}, error) {
|
||||
Sugar.Infof("edit virtual device: %v", *device)
|
||||
|
||||
return nil, nil
|
||||
err := CheckJTDeviceOptions(device)
|
||||
if err != nil {
|
||||
Sugar.Errorf("%s", err.Error())
|
||||
return nil, err
|
||||
}
|
||||
|
||||
oldDevice, err := JTDeviceDao.QueryDeviceByID(device.ID)
|
||||
if err != nil {
|
||||
Sugar.Errorf("jt device not found: %d", device.ID)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err = JTDeviceDao.UpdateDevice(device); err != nil {
|
||||
Sugar.Errorf("update device failed: %s", err.Error())
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 国标ID发生变更, 更新通道的RootID
|
||||
if oldDevice.Username != device.Username {
|
||||
_ = ChannelDao.UpdateRootID(oldDevice.Username, device.Username)
|
||||
}
|
||||
|
||||
// sim卡号发生变更, 告知media server关闭推流, 关闭与上级的转发sink
|
||||
if oldDevice.SimNumber != device.SimNumber {
|
||||
Sugar.Infof("sim number changed, close streams")
|
||||
streams, _ := StreamDao.DeleteStreamByDeviceID(oldDevice.SimNumber)
|
||||
for _, stream := range streams {
|
||||
stream.Close(true, true)
|
||||
}
|
||||
}
|
||||
|
||||
// SipUA信息发生变更, 则需要重启设备
|
||||
if !EqualJTDeviceOptions(oldDevice, device) {
|
||||
Sugar.Infof("sipua options changed, restart device")
|
||||
// 重启设备
|
||||
if client := JTDeviceManager.Remove(oldDevice.Username); client != nil {
|
||||
client.Stop()
|
||||
}
|
||||
|
||||
err = SaveAndStartJTDevice(device)
|
||||
if err != nil {
|
||||
Sugar.Errorf("update device failed: %s", err.Error())
|
||||
}
|
||||
} else {
|
||||
Sugar.Infof("device info changed, update device info")
|
||||
if client := JTDeviceManager.Find(oldDevice.Username); client != nil {
|
||||
client.SetDeviceInfo(device.Name, device.Manufacturer, device.Model, device.Firmware)
|
||||
}
|
||||
}
|
||||
|
||||
return nil, err
|
||||
}
|
||||
|
||||
func (api *ApiServer) OnVirtualDeviceRemove(device *JTDeviceModel, w http.ResponseWriter, r *http.Request) (interface{}, error) {
|
||||
Sugar.Infof("remove virtual device: %v", *device)
|
||||
|
||||
err := JTDeviceDao.DeleteDevice(device.Username)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -95,8 +178,25 @@ func (api *ApiServer) OnVirtualChannelAdd(channel *Channel, w http.ResponseWrite
|
||||
}
|
||||
|
||||
func (api *ApiServer) OnVirtualChannelEdit(channel *Channel, w http.ResponseWriter, r *http.Request) (interface{}, error) {
|
||||
Sugar.Infof("edit virtual channel: %v", *channel)
|
||||
|
||||
oldChannel, err := ChannelDao.QueryChannelByID(channel.ID)
|
||||
if err != nil {
|
||||
Sugar.Errorf("query channel failed: %s", err.Error())
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
if oldChannel.Name == channel.Name {
|
||||
// 目前只支持修改通道名称
|
||||
Sugar.Warnf("channel name not changed: %s", channel.Name)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
err = ChannelDao.UpdateChannel(channel)
|
||||
if err != nil {
|
||||
Sugar.Errorf("update channel failed: %s", err.Error())
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
func (api *ApiServer) OnVirtualChannelRemove(channel *Channel, w http.ResponseWriter, r *http.Request) (interface{}, error) {
|
||||
|
@@ -70,7 +70,7 @@ func (g *gbClient) SendMessage(msg interface{}) {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
request, err := BuildMessageRequest(g.sipUA.Username, g.sipUA.ListenAddr, g.sipUA.SeverID, g.sipUA.ServerAddr, g.sipUA.Transport, string(marshal))
|
||||
request, err := BuildMessageRequest(g.sipUA.Username, g.sipUA.ListenAddr, g.sipUA.ServerID, g.sipUA.ServerAddr, g.sipUA.Transport, string(marshal))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
@@ -10,6 +10,8 @@ type DaoChannel interface {
|
||||
|
||||
UpdateChannelStatus(deviceId, channelId, status string) error
|
||||
|
||||
QueryChannelByID(id uint) (*Channel, error)
|
||||
|
||||
QueryChannel(deviceId string, channelId string) (*Channel, error)
|
||||
|
||||
QueryChannels(deviceId, groupId, string, page, size int) ([]*Channel, int, error)
|
||||
@@ -33,6 +35,10 @@ type DaoChannel interface {
|
||||
QueryJTChannelBySimNumber(simNumber string) (*Channel, error)
|
||||
|
||||
DeleteChannel(deviceId string, channelId string) error
|
||||
|
||||
UpdateRootID(rootId, newRootId string) error
|
||||
|
||||
UpdateChannel(channel *Channel) error
|
||||
}
|
||||
|
||||
type daoChannel struct {
|
||||
@@ -52,6 +58,15 @@ func (d *daoChannel) UpdateChannelStatus(deviceId, channelId, status string) err
|
||||
return db.Model(&Channel{}).Where("root_id =? and device_id =?", deviceId, channelId).Update("status", status).Error
|
||||
}
|
||||
|
||||
func (d *daoChannel) QueryChannelByID(id uint) (*Channel, error) {
|
||||
var channel Channel
|
||||
tx := db.Where("id =?", id).Take(&channel)
|
||||
if tx.Error != nil {
|
||||
return nil, tx.Error
|
||||
}
|
||||
return &channel, nil
|
||||
}
|
||||
|
||||
func (d *daoChannel) QueryChannel(deviceId string, channelId string) (*Channel, error) {
|
||||
var channel Channel
|
||||
tx := db.Where("root_id =? and device_id =?", deviceId, channelId).Take(&channel)
|
||||
@@ -153,3 +168,18 @@ func (d *daoChannel) QueryChannelsByChannelID(channelId string) ([]*Channel, err
|
||||
}
|
||||
return channels, nil
|
||||
}
|
||||
|
||||
func (d *daoChannel) UpdateRootID(rootId, newRootId string) error {
|
||||
channel := &Channel{
|
||||
RootID: newRootId,
|
||||
GroupID: newRootId,
|
||||
ParentID: newRootId,
|
||||
}
|
||||
return db.Model(channel).Where("root_id =?", rootId).Select("root_id", "group_id", "parent_id").Updates(channel).Error
|
||||
}
|
||||
|
||||
func (d *daoChannel) UpdateChannel(channel *Channel) error {
|
||||
return DBTransaction(func(tx *gorm.DB) error {
|
||||
return tx.Model(channel).Where("id =?", channel.ID).Updates(channel).Error
|
||||
})
|
||||
}
|
||||
|
49
dao_jt.go
49
dao_jt.go
@@ -1,18 +1,28 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
// JTDeviceModel 数据库表结构
|
||||
type JTDeviceModel struct {
|
||||
GBModel
|
||||
SIPUAOptions
|
||||
// SIPUAOptions
|
||||
|
||||
Name string `json:"name"` // display name, 国标DeviceInfo消息中的Name
|
||||
Username string `json:"username" gorm:"uniqueIndex"` // 用户名
|
||||
SeverID string `json:"server_id"` // 上级ID, 必选. 作为主键, 不能重复.
|
||||
ServerAddr string `json:"server_addr"` // 上级地址, 必选
|
||||
Transport string `json:"transport"` // 上级通信方式, UDP/TCP
|
||||
Password string `json:"password"` // 密码
|
||||
RegisterExpires int `json:"register_expires"` // 注册有效期
|
||||
KeepaliveInterval int `json:"keepalive_interval"` // 心跳间隔
|
||||
Status OnlineStatus `json:"status"` // 在线状态
|
||||
|
||||
Manufacturer string `json:"manufacturer"`
|
||||
Model string `json:"model"`
|
||||
Firmware string `json:"firmware"`
|
||||
SimNumber string `json:"sim_number"`
|
||||
SimNumber string `json:"sim_number" gorm:"uniqueIndex"`
|
||||
}
|
||||
|
||||
func (g *JTDeviceModel) TableName() string {
|
||||
@@ -29,6 +39,8 @@ type DaoJTDevice interface {
|
||||
|
||||
QueryDeviceBySimNumber(simNumber string) (*JTDeviceModel, error)
|
||||
|
||||
QueryDeviceByID(id uint) (*JTDeviceModel, error)
|
||||
|
||||
ExistDevice(username, simNumber string) bool
|
||||
|
||||
DeleteDevice(username string) error
|
||||
@@ -77,7 +89,11 @@ func (d *daoJTDevice) QueryDevice(id string) (*JTDeviceModel, error) {
|
||||
|
||||
func (d *daoJTDevice) DeleteDevice(id string) error {
|
||||
return DBTransaction(func(tx *gorm.DB) error {
|
||||
return tx.Where("username =?", id).Unscoped().Delete(&JTDeviceModel{}).Error
|
||||
err := tx.Where("username =?", id).Unscoped().Delete(&JTDeviceModel{}).Error
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return tx.Where("root_id =?", id).Unscoped().Delete(&Channel{}).Error
|
||||
})
|
||||
}
|
||||
|
||||
@@ -91,28 +107,23 @@ func (d *daoJTDevice) QueryDeviceBySimNumber(simNumber string) (*JTDeviceModel,
|
||||
return &device, nil
|
||||
}
|
||||
|
||||
func (d *daoJTDevice) QueryDeviceByID(id uint) (*JTDeviceModel, error) {
|
||||
var device JTDeviceModel
|
||||
tx := db.Where("id =?", id).Take(&device)
|
||||
if tx.Error != nil {
|
||||
return nil, tx.Error
|
||||
}
|
||||
return &device, nil
|
||||
}
|
||||
|
||||
func (d *daoJTDevice) SaveDevice(model *JTDeviceModel) error {
|
||||
return DBTransaction(func(tx *gorm.DB) error {
|
||||
var old JTDeviceModel
|
||||
tx = tx.Where("username =? or sim_number =?", model.Username, model.SimNumber).Select("id").First(&old)
|
||||
if tx.Error == nil {
|
||||
return fmt.Errorf("username or sim number already exists")
|
||||
}
|
||||
|
||||
return db.Save(model).Error
|
||||
return db.Create(model).Error
|
||||
})
|
||||
}
|
||||
|
||||
func (d *daoJTDevice) UpdateDevice(model *JTDeviceModel) error {
|
||||
return DBTransaction(func(tx *gorm.DB) error {
|
||||
var old JTDeviceModel
|
||||
tx = tx.Where("username =? or sim_number =?", model.Username, model.SimNumber).Select("id").First(&old)
|
||||
if tx.Error != nil {
|
||||
return tx.Error
|
||||
} else {
|
||||
model.ID = old.ID
|
||||
}
|
||||
|
||||
return db.Save(model).Error
|
||||
})
|
||||
}
|
||||
|
13
dao_sink.go
13
dao_sink.go
@@ -29,6 +29,8 @@ type DaoSink interface {
|
||||
DeleteForwardSinkByCallID(callID string) (*Sink, error)
|
||||
|
||||
DeleteForwardSinkBySinkStreamID(sinkStreamID StreamID) (*Sink, error)
|
||||
|
||||
DeleteForwardSinksByServerAddr(addr string) ([]*Sink, error)
|
||||
}
|
||||
|
||||
type daoSink struct {
|
||||
@@ -155,3 +157,14 @@ func (d *daoSink) DeleteForwardSinksByIds(ids []uint) error {
|
||||
return tx.Where("id in?", ids).Unscoped().Delete(&Sink{}).Error
|
||||
})
|
||||
}
|
||||
|
||||
func (d *daoSink) DeleteForwardSinksByServerAddr(addr string) ([]*Sink, error) {
|
||||
var sinks []*Sink
|
||||
tx := db.Where("server_addr =?", addr).Find(&sinks)
|
||||
if tx.Error != nil {
|
||||
return nil, tx.Error
|
||||
}
|
||||
return sinks, DBTransaction(func(tx *gorm.DB) error {
|
||||
return tx.Where("server_addr =?", addr).Unscoped().Delete(&Sink{}).Error
|
||||
})
|
||||
}
|
||||
|
@@ -23,6 +23,8 @@ type DaoStream interface {
|
||||
QueryStreamByCallID(callID string) (*Stream, error)
|
||||
|
||||
DeleteStreamByCallID(callID string) (*Stream, error)
|
||||
|
||||
DeleteStreamByDeviceID(deviceID string) ([]*Stream, error)
|
||||
}
|
||||
|
||||
type daoStream struct {
|
||||
@@ -133,3 +135,19 @@ func (d *daoStream) DeleteStreamByCallID(callID string) (*Stream, error) {
|
||||
return tx.Where("call_id =?", callID).Unscoped().Delete(&Stream{}).Error
|
||||
})
|
||||
}
|
||||
|
||||
func (d *daoStream) DeleteStreamByDeviceID(deviceID string) ([]*Stream, error) {
|
||||
var streams []*Stream
|
||||
tx := db.Where("device_id =?", deviceID).Find(&streams)
|
||||
if tx.Error != nil {
|
||||
return nil, tx.Error
|
||||
}
|
||||
_ = DBTransaction(func(tx *gorm.DB) error {
|
||||
for _, stream := range streams {
|
||||
_ = tx.Where("stream_id =?", stream.StreamID).Unscoped().Delete(&Stream{})
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
return streams, nil
|
||||
}
|
||||
|
13
device.go
13
device.go
@@ -272,19 +272,6 @@ func (d *Device) Close() {
|
||||
// 更新在数据库中的状态
|
||||
d.Status = OFF
|
||||
_ = DeviceDao.UpdateDeviceStatus(d.DeviceID, OFF)
|
||||
|
||||
// 释放所有推流
|
||||
//all := StreamManager.All()
|
||||
//var streams []*Stream
|
||||
//for _, stream := range all {
|
||||
// if d.DeviceID == stream.StreamID.DeviceID() {
|
||||
// streams = append(streams, stream)
|
||||
// }
|
||||
//}
|
||||
//
|
||||
//for _, stream := range streams {
|
||||
// stream.Close(true, true)
|
||||
//}
|
||||
}
|
||||
|
||||
// CreateDialogRequestFromAnswer 根据invite的应答创建Dialog请求
|
||||
|
39
jt_device.go
39
jt_device.go
@@ -14,6 +14,8 @@ type JTDevice struct {
|
||||
}
|
||||
|
||||
func (g *JTDevice) OnInvite(request sip.Request, user string) sip.Response {
|
||||
Sugar.Infof("收到1078的Invite请求 sim number: %s device: %s channel: %s", g.simNumber, g.username, user)
|
||||
|
||||
// 通知1078的信令服务器
|
||||
channels, _ := ChannelDao.QueryChannelsByChannelID(user)
|
||||
if len(channels) < 1 {
|
||||
@@ -54,8 +56,43 @@ func (g *JTDevice) OnInvite(request sip.Request, user string) sip.Response {
|
||||
return response
|
||||
}
|
||||
|
||||
func (g *JTDevice) Start() {
|
||||
Sugar.Infof("启动部标设备, deivce: %s transport: %s addr: %s", g.Username, g.sipUA.Transport, g.sipUA.ServerAddr)
|
||||
g.sipUA.Start()
|
||||
g.sipUA.SetOnRegisterHandler(g.Online, g.Offline)
|
||||
}
|
||||
|
||||
func (g *JTDevice) Online() {
|
||||
Sugar.Infof("部标设备上线 sim number: %s device: %s server addr: %s", g.simNumber, g.Username, g.ServerAddr)
|
||||
|
||||
if err := JTDeviceDao.UpdateOnlineStatus(ON, g.Username); err != nil {
|
||||
Sugar.Infof("更新部标设备状态失败 err: %s", err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
func (g *JTDevice) Offline() {
|
||||
Sugar.Infof("部标设备离线 sim number: %s device: %s server addr: %s", g.simNumber, g.Username, g.ServerAddr)
|
||||
|
||||
if err := JTDeviceDao.UpdateOnlineStatus(OFF, g.Username); err != nil {
|
||||
Sugar.Infof("更新部标设备状态失败 err: %s", err.Error())
|
||||
}
|
||||
|
||||
// 释放所有推流
|
||||
g.CloseStreams(true, true)
|
||||
}
|
||||
|
||||
func NewJTDevice(model *JTDeviceModel, ua SipServer) (*JTDevice, error) {
|
||||
platform, err := NewPlatform(&model.SIPUAOptions, ua)
|
||||
platform, err := NewPlatform(&SIPUAOptions{
|
||||
Name: model.Name,
|
||||
Username: model.Username,
|
||||
ServerID: model.SeverID,
|
||||
ServerAddr: model.ServerAddr,
|
||||
Transport: model.Transport,
|
||||
Password: model.Password,
|
||||
RegisterExpires: model.RegisterExpires,
|
||||
KeepaliveInterval: model.KeepaliveInterval,
|
||||
Status: model.Status,
|
||||
}, ua)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
6
live.go
6
live.go
@@ -40,8 +40,10 @@ func (i *InviteType) SessionName2Type(name string) {
|
||||
|
||||
func (d *Device) StartStream(inviteType InviteType, streamId StreamID, channelId, startTime, stopTime, setup string, speed int, sync bool) (*Stream, error) {
|
||||
stream := &Stream{
|
||||
StreamID: streamId,
|
||||
Protocol: SourceType28181,
|
||||
DeviceID: streamId.DeviceID(),
|
||||
ChannelID: streamId.ChannelID(),
|
||||
StreamID: streamId,
|
||||
Protocol: SourceType28181,
|
||||
}
|
||||
|
||||
// 先添加占位置, 防止重复请求
|
||||
|
66
platform.go
66
platform.go
@@ -7,7 +7,6 @@ import (
|
||||
"net/http"
|
||||
"net/netip"
|
||||
"strings"
|
||||
"sync"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -17,22 +16,6 @@ const (
|
||||
|
||||
type Platform struct {
|
||||
*gbClient
|
||||
lock sync.Mutex
|
||||
sinks map[string]StreamID // 保存级联转发的sink, 方便离线的时候关闭sink
|
||||
}
|
||||
|
||||
func (g *Platform) addSink(callId string, stream StreamID) {
|
||||
g.lock.Lock()
|
||||
defer g.lock.Unlock()
|
||||
g.sinks[callId] = stream
|
||||
}
|
||||
|
||||
func (g *Platform) removeSink(callId string) StreamID {
|
||||
g.lock.Lock()
|
||||
defer g.lock.Unlock()
|
||||
stream := g.sinks[callId]
|
||||
delete(g.sinks, callId)
|
||||
return stream
|
||||
}
|
||||
|
||||
// OnBye 被上级挂断
|
||||
@@ -43,36 +26,23 @@ func (g *Platform) OnBye(request sip.Request) {
|
||||
|
||||
// CloseStream 关闭级联会话
|
||||
func (g *Platform) CloseStream(callId string, bye, ms bool) {
|
||||
_ = g.removeSink(callId)
|
||||
sink := RemoveForwardSinkWithCallId(callId)
|
||||
if sink == nil {
|
||||
Sugar.Errorf("关闭转发sink失败, 找不到sink. callid: %s", callId)
|
||||
return
|
||||
sink, _ := SinkDao.DeleteForwardSinkByCallID(callId)
|
||||
if sink != nil {
|
||||
sink.Close(bye, ms)
|
||||
}
|
||||
|
||||
sink.Close(bye, ms)
|
||||
}
|
||||
|
||||
// CloseStreams 关闭所有级联会话
|
||||
func (g *Platform) CloseStreams(bye, ms bool) {
|
||||
var callIds []string
|
||||
g.lock.Lock()
|
||||
|
||||
for k := range g.sinks {
|
||||
callIds = append(callIds, k)
|
||||
}
|
||||
|
||||
g.sinks = make(map[string]StreamID)
|
||||
g.lock.Unlock()
|
||||
|
||||
for _, id := range callIds {
|
||||
g.CloseStream(id, bye, ms)
|
||||
sinks, _ := SinkDao.DeleteForwardSinksByServerAddr(g.ServerAddr)
|
||||
for _, sink := range sinks {
|
||||
sink.Close(bye, ms)
|
||||
}
|
||||
}
|
||||
|
||||
// OnInvite 被上级呼叫
|
||||
func (g *Platform) OnInvite(request sip.Request, user string) sip.Response {
|
||||
Sugar.Infof("收到上级Invite请求 platform: %s channel: %s sdp: %s", g.SeverID, user, request.Body())
|
||||
Sugar.Infof("收到上级Invite请求 platform: %s channel: %s sdp: %s", g.ServerID, user, request.Body())
|
||||
|
||||
source := request.Source()
|
||||
platform := PlatformManager.Find(source)
|
||||
@@ -80,7 +50,7 @@ func (g *Platform) OnInvite(request sip.Request, user string) sip.Response {
|
||||
|
||||
deviceId, channel, err := PlatformDao.QueryPlatformChannel(g.ServerAddr, user)
|
||||
if err != nil {
|
||||
Sugar.Errorf("处理上级Invite失败, 查询数据库失败 err: %s platform: %s channel: %s", err.Error(), g.SeverID, user)
|
||||
Sugar.Errorf("处理上级Invite失败, 查询数据库失败 err: %s platform: %s channel: %s", err.Error(), g.ServerID, user)
|
||||
return CreateResponseWithStatusCode(request, http.StatusInternalServerError)
|
||||
}
|
||||
|
||||
@@ -139,33 +109,33 @@ func (g *Platform) Stop() {
|
||||
}
|
||||
|
||||
func (g *Platform) Online() {
|
||||
Sugar.Infof("ua上线 device: %s server addr: %s", g.Username, g.ServerAddr)
|
||||
Sugar.Infof("级联设备上线 device: %s server addr: %s", g.Username, g.ServerAddr)
|
||||
|
||||
if err := PlatformDao.UpdateOnlineStatus(ON, g.ServerAddr); err != nil {
|
||||
Sugar.Infof("ua状态失败 err: %s server addr: %s", err.Error(), g.ServerAddr)
|
||||
Sugar.Infof("更新级联设备状态失败 err: %s server addr: %s", err.Error(), g.ServerAddr)
|
||||
}
|
||||
}
|
||||
|
||||
func (g *Platform) Offline() {
|
||||
Sugar.Infof("ua离线 device: %s server addr: %s", g.Username, g.ServerAddr)
|
||||
Sugar.Infof("级联设备离线 device: %s server addr: %s", g.Username, g.ServerAddr)
|
||||
|
||||
if err := PlatformDao.UpdateOnlineStatus(OFF, g.ServerAddr); err != nil {
|
||||
Sugar.Infof("ua状态失败 err: %s server addr: %s", err.Error(), g.ServerAddr)
|
||||
Sugar.Infof("更新级联设备状态失败 err: %s server addr: %s", err.Error(), g.ServerAddr)
|
||||
}
|
||||
|
||||
// 释放所有推流
|
||||
g.CloseStreams(true, true)
|
||||
}
|
||||
|
||||
func NewPlatform(record *SIPUAOptions, ua SipServer) (*Platform, error) {
|
||||
if len(record.SeverID) != 20 {
|
||||
return nil, fmt.Errorf("SeverID must be exactly 20 characters long")
|
||||
func NewPlatform(options *SIPUAOptions, ua SipServer) (*Platform, error) {
|
||||
if len(options.ServerID) != 20 {
|
||||
return nil, fmt.Errorf("ServerID must be exactly 20 characters long")
|
||||
}
|
||||
|
||||
if _, err := netip.ParseAddrPort(record.ServerAddr); err != nil {
|
||||
if _, err := netip.ParseAddrPort(options.ServerAddr); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
client := NewGBClient(record, ua)
|
||||
return &Platform{gbClient: client.(*gbClient), sinks: make(map[string]StreamID, 8)}, nil
|
||||
client := NewGBClient(options, ua)
|
||||
return &Platform{gbClient: client.(*gbClient)}, nil
|
||||
}
|
||||
|
@@ -20,7 +20,7 @@ func startPlatformDevices() {
|
||||
utils.Assert(PlatformManager.Add(platform.ServerAddr, platform))
|
||||
|
||||
if err := PlatformDao.UpdateOnlineStatus(OFF, record.ServerAddr); err != nil {
|
||||
Sugar.Infof("更新级联设备状态失败 err: %s device: %s", err.Error(), record.SeverID)
|
||||
Sugar.Infof("更新级联设备状态失败 err: %s device: %s", err.Error(), record.ServerID)
|
||||
}
|
||||
|
||||
platform.Start()
|
||||
|
@@ -270,7 +270,7 @@ func (s *sipServer) OnMessage(wrapper *SipRequestSource) {
|
||||
}
|
||||
|
||||
if ok = device != nil; !ok {
|
||||
Sugar.Errorf("处理上级请求消息失败, 找不到级联设备 addr: %s request: %s", wrapper.req.Source(), wrapper.req.String())
|
||||
Sugar.Errorf("处理上级请求消息失败, 找不到设备 addr: %s request: %s", wrapper.req.Source(), wrapper.req.String())
|
||||
return
|
||||
}
|
||||
|
||||
|
25
sip_ua.go
25
sip_ua.go
@@ -7,6 +7,7 @@ import (
|
||||
"github.com/lkmio/avformat/utils"
|
||||
"math"
|
||||
"net"
|
||||
"net/netip"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
@@ -44,7 +45,7 @@ type SIPUA interface {
|
||||
type SIPUAOptions struct {
|
||||
Name string `json:"name"` // display name, 国标DeviceInfo消息中的Name
|
||||
Username string `json:"username"` // 用户名
|
||||
SeverID string `json:"server_id"` // 上级ID, 必选. 作为主键, 不能重复.
|
||||
ServerID string `json:"server_id"` // 上级ID, 必选. 作为主键, 不能重复.
|
||||
ServerAddr string `json:"server_addr"` // 上级地址, 必选
|
||||
Transport string `json:"transport"` // 上级通信方式, UDP/TCP
|
||||
Password string `json:"password"` // 密码
|
||||
@@ -53,6 +54,24 @@ type SIPUAOptions struct {
|
||||
Status OnlineStatus `json:"status"` // 在线状态
|
||||
}
|
||||
|
||||
func EqualSipUAOptions(old, new *SIPUAOptions) bool {
|
||||
if old.Username != new.Username || old.ServerID != new.ServerID || old.ServerAddr != new.ServerAddr || old.Transport != new.Transport || old.Password != new.Password || old.RegisterExpires != new.RegisterExpires || old.KeepaliveInterval != new.KeepaliveInterval {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func CheckSipUAOptions(options *SIPUAOptions) error {
|
||||
if len(options.Username) != 20 {
|
||||
return fmt.Errorf("invalid username: %s", options.Username)
|
||||
} else if len(options.ServerID) != 20 {
|
||||
return fmt.Errorf("invalid server id: %s", options.ServerID)
|
||||
} else if _, err := netip.ParseAddrPort(options.ServerAddr); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type sipUA struct {
|
||||
SIPUAOptions
|
||||
|
||||
@@ -117,7 +136,7 @@ func (g *sipUA) doRegister(request sip.Request) bool {
|
||||
}
|
||||
|
||||
func (g *sipUA) startNewRegister() bool {
|
||||
builder := NewRequestBuilder(sip.REGISTER, g.Username, g.ListenAddr, g.SeverID, g.ServerAddr, g.Transport)
|
||||
builder := NewRequestBuilder(sip.REGISTER, g.Username, g.ListenAddr, g.ServerID, g.ServerAddr, g.Transport)
|
||||
expires := sip.Expires(g.RegisterExpires)
|
||||
builder.SetExpires(&expires)
|
||||
|
||||
@@ -175,7 +194,7 @@ func (g *sipUA) doUnregister() {
|
||||
|
||||
func (g *sipUA) doKeepalive() bool {
|
||||
body := fmt.Sprintf(KeepAliveBody, time.Now().UnixMilli()/1000, g.Username)
|
||||
request, err := BuildMessageRequest(g.Username, g.ListenAddr, g.SeverID, g.ServerAddr, g.Transport, body)
|
||||
request, err := BuildMessageRequest(g.Username, g.ListenAddr, g.ServerID, g.ServerAddr, g.Transport, body)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
@@ -79,6 +79,8 @@ func (r *RequestWrapper) Scan(value interface{}) error {
|
||||
|
||||
type Stream struct {
|
||||
GBModel
|
||||
DeviceID string // 下级设备ID, 统计某个设备的所有流/1078设备为sim number
|
||||
ChannelID string // 下级通道ID, 统计某个设备下的某个通道的所有流/1078设备为 channel number
|
||||
StreamID StreamID `json:"stream_id"` // 流ID
|
||||
Protocol int `json:"protocol,omitempty"` // 推流协议, rtmp/28181/1078/gb_talk
|
||||
Dialog *RequestWrapper `json:"dialog,omitempty"` // 国标流的SipCall会话
|
||||
|
Reference in New Issue
Block a user