diff --git a/api.go b/api.go index a3d3a68..8aab425 100644 --- a/api.go +++ b/api.go @@ -315,14 +315,13 @@ func (api *ApiServer) OnPublish(params *StreamParams, w http.ResponseWriter, r * stream.Put(200) } - // 对讲websocket已连接 // 创建stream - if params.Protocol == SourceTypeGBTalk { - Sugar.Infof("对讲websocket已连接, stream: %s", params.Stream) - + if params.Protocol == SourceTypeGBTalk || params.Protocol == SourceType1078 { s := &Stream{ - StreamID: params.Stream, - Protocol: params.Protocol, + DeviceID: params.Stream.DeviceID(), + ChannelID: params.Stream.ChannelID(), + StreamID: params.Stream, + Protocol: params.Protocol, } _, ok := StreamDao.SaveStream(s) @@ -743,7 +742,7 @@ func (api *ApiServer) OnPlatformAdd(v *PlatformModel, w http.ResponseWriter, r * err := fmt.Errorf("用户名长度必须20位") Sugar.Errorf("添加级联设备失败 err: %s", err.Error()) return nil, err - } else if len(v.SeverID) != 20 { + } else if len(v.ServerID) != 20 { err := fmt.Errorf("上级ID长度必须20位") Sugar.Errorf("添加级联设备失败 err: %s", err.Error()) return nil, err diff --git a/api_jt.go b/api_jt.go index 203b53e..66bac9b 100644 --- a/api_jt.go +++ b/api_jt.go @@ -5,61 +5,144 @@ import ( "net/http" ) -func (api *ApiServer) OnVirtualDeviceAdd(device *JTDeviceModel, w http.ResponseWriter, r *http.Request) (interface{}, error) { - Sugar.Infof("add virtual device: %v", *device) - - if len(device.Username) != 20 { - Sugar.Errorf("invalid username: %s", device.Username) - return nil, fmt.Errorf("invalid username: %s", device.Username) - } else if len(device.SeverID) != 20 { - Sugar.Errorf("invalid server id: %s", device.SeverID) - return nil, fmt.Errorf("invalid server id: %s", device.SeverID) +func CheckJTDeviceOptions(device *JTDeviceModel) error { + if err := CheckSipUAOptions(&SIPUAOptions{ + Username: device.Username, + ServerAddr: device.ServerAddr, + ServerID: device.SeverID, + }); err != nil { + return err } else if device.SimNumber == "" { // sim卡号必选项 - Sugar.Errorf("sim number is required") - return nil, fmt.Errorf("sim number is required") - } - - if JTDeviceDao.ExistDevice(device.Username, device.SimNumber) { - // 用户名或sim卡号已存在 - Sugar.Errorf("username or sim number already exists") - return nil, fmt.Errorf("username or sim number already exists") + return fmt.Errorf("sim number is required") } else if DeviceDao.ExistDevice(device.Username) { // 用户名与下级设备冲突 - Sugar.Errorf("username already exists") - return nil, fmt.Errorf("username already exists") + return fmt.Errorf("username already exists") } + return nil +} + +func SaveAndStartJTDevice(device *JTDeviceModel) error { jtDevice, err := NewJTDevice(device, SipStack) if err != nil { Sugar.Errorf("create virtual device failed: %s", err.Error()) - return nil, err + return err } if !JTDeviceManager.Add(device.Username, jtDevice) { - return nil, fmt.Errorf("ua添加失败, id冲突. key: %s", device.Username) - } else if err = JTDeviceDao.SaveDevice(device); err != nil { - JTDeviceManager.Remove(device.Username) + return fmt.Errorf("ua添加失败, id冲突. key: %s", device.Username) + } + + jtDevice.Start() + return nil +} + +func EqualJTDeviceOptions(old, new *JTDeviceModel) bool { + return EqualSipUAOptions(&SIPUAOptions{ + Username: old.Username, + ServerAddr: old.ServerAddr, + Transport: old.Transport, + RegisterExpires: old.RegisterExpires, + Password: old.Password, + KeepaliveInterval: old.KeepaliveInterval, + ServerID: old.SeverID, + }, &SIPUAOptions{ + Username: new.Username, + ServerAddr: new.ServerAddr, + Transport: new.Transport, + RegisterExpires: new.RegisterExpires, + Password: new.Password, + KeepaliveInterval: new.KeepaliveInterval, + ServerID: new.SeverID, + }) +} + +func (api *ApiServer) OnVirtualDeviceAdd(device *JTDeviceModel, w http.ResponseWriter, r *http.Request) (interface{}, error) { + Sugar.Infof("add virtual device: %v", *device) + + err := CheckJTDeviceOptions(device) + if err != nil { + Sugar.Errorf("%s", err.Error()) + return nil, err + } else if JTDeviceManager.Find(device.Username) != nil { + Sugar.Errorf("jt device already exists: %s", device.Username) + return nil, fmt.Errorf("jt device already exists: %s", device.Username) + } + + err = JTDeviceDao.SaveDevice(device) + if err != nil { Sugar.Errorf("save device failed: %s", err.Error()) return nil, err } - jtDevice.Start() - + err = SaveAndStartJTDevice(device) if err != nil { - Sugar.Errorf("add jt device failed: %s", err.Error()) - return nil, err + Sugar.Errorf("start device failed: %s", err.Error()) } - return nil, nil + return nil, err } func (api *ApiServer) OnVirtualDeviceEdit(device *JTDeviceModel, w http.ResponseWriter, r *http.Request) (interface{}, error) { + Sugar.Infof("edit virtual device: %v", *device) - return nil, nil + err := CheckJTDeviceOptions(device) + if err != nil { + Sugar.Errorf("%s", err.Error()) + return nil, err + } + + oldDevice, err := JTDeviceDao.QueryDeviceByID(device.ID) + if err != nil { + Sugar.Errorf("jt device not found: %d", device.ID) + return nil, err + } + + if err = JTDeviceDao.UpdateDevice(device); err != nil { + Sugar.Errorf("update device failed: %s", err.Error()) + return nil, err + } + + // 国标ID发生变更, 更新通道的RootID + if oldDevice.Username != device.Username { + _ = ChannelDao.UpdateRootID(oldDevice.Username, device.Username) + } + + // sim卡号发生变更, 告知media server关闭推流, 关闭与上级的转发sink + if oldDevice.SimNumber != device.SimNumber { + Sugar.Infof("sim number changed, close streams") + streams, _ := StreamDao.DeleteStreamByDeviceID(oldDevice.SimNumber) + for _, stream := range streams { + stream.Close(true, true) + } + } + + // SipUA信息发生变更, 则需要重启设备 + if !EqualJTDeviceOptions(oldDevice, device) { + Sugar.Infof("sipua options changed, restart device") + // 重启设备 + if client := JTDeviceManager.Remove(oldDevice.Username); client != nil { + client.Stop() + } + + err = SaveAndStartJTDevice(device) + if err != nil { + Sugar.Errorf("update device failed: %s", err.Error()) + } + } else { + Sugar.Infof("device info changed, update device info") + if client := JTDeviceManager.Find(oldDevice.Username); client != nil { + client.SetDeviceInfo(device.Name, device.Manufacturer, device.Model, device.Firmware) + } + } + + return nil, err } func (api *ApiServer) OnVirtualDeviceRemove(device *JTDeviceModel, w http.ResponseWriter, r *http.Request) (interface{}, error) { + Sugar.Infof("remove virtual device: %v", *device) + err := JTDeviceDao.DeleteDevice(device.Username) if err != nil { return nil, err @@ -95,8 +178,25 @@ func (api *ApiServer) OnVirtualChannelAdd(channel *Channel, w http.ResponseWrite } func (api *ApiServer) OnVirtualChannelEdit(channel *Channel, w http.ResponseWriter, r *http.Request) (interface{}, error) { + Sugar.Infof("edit virtual channel: %v", *channel) + + oldChannel, err := ChannelDao.QueryChannelByID(channel.ID) + if err != nil { + Sugar.Errorf("query channel failed: %s", err.Error()) + return nil, err + } - return nil, nil + if oldChannel.Name == channel.Name { + // 目前只支持修改通道名称 + Sugar.Warnf("channel name not changed: %s", channel.Name) + return nil, nil + } + + err = ChannelDao.UpdateChannel(channel) + if err != nil { + Sugar.Errorf("update channel failed: %s", err.Error()) + } + return nil, err } func (api *ApiServer) OnVirtualChannelRemove(channel *Channel, w http.ResponseWriter, r *http.Request) (interface{}, error) { diff --git a/client.go b/client.go index aefab1e..0565722 100644 --- a/client.go +++ b/client.go @@ -70,7 +70,7 @@ func (g *gbClient) SendMessage(msg interface{}) { panic(err) } - request, err := BuildMessageRequest(g.sipUA.Username, g.sipUA.ListenAddr, g.sipUA.SeverID, g.sipUA.ServerAddr, g.sipUA.Transport, string(marshal)) + request, err := BuildMessageRequest(g.sipUA.Username, g.sipUA.ListenAddr, g.sipUA.ServerID, g.sipUA.ServerAddr, g.sipUA.Transport, string(marshal)) if err != nil { panic(err) } diff --git a/dao_channel.go b/dao_channel.go index 49890ea..1a13f15 100644 --- a/dao_channel.go +++ b/dao_channel.go @@ -10,6 +10,8 @@ type DaoChannel interface { UpdateChannelStatus(deviceId, channelId, status string) error + QueryChannelByID(id uint) (*Channel, error) + QueryChannel(deviceId string, channelId string) (*Channel, error) QueryChannels(deviceId, groupId, string, page, size int) ([]*Channel, int, error) @@ -33,6 +35,10 @@ type DaoChannel interface { QueryJTChannelBySimNumber(simNumber string) (*Channel, error) DeleteChannel(deviceId string, channelId string) error + + UpdateRootID(rootId, newRootId string) error + + UpdateChannel(channel *Channel) error } type daoChannel struct { @@ -52,6 +58,15 @@ func (d *daoChannel) UpdateChannelStatus(deviceId, channelId, status string) err return db.Model(&Channel{}).Where("root_id =? and device_id =?", deviceId, channelId).Update("status", status).Error } +func (d *daoChannel) QueryChannelByID(id uint) (*Channel, error) { + var channel Channel + tx := db.Where("id =?", id).Take(&channel) + if tx.Error != nil { + return nil, tx.Error + } + return &channel, nil +} + func (d *daoChannel) QueryChannel(deviceId string, channelId string) (*Channel, error) { var channel Channel tx := db.Where("root_id =? and device_id =?", deviceId, channelId).Take(&channel) @@ -153,3 +168,18 @@ func (d *daoChannel) QueryChannelsByChannelID(channelId string) ([]*Channel, err } return channels, nil } + +func (d *daoChannel) UpdateRootID(rootId, newRootId string) error { + channel := &Channel{ + RootID: newRootId, + GroupID: newRootId, + ParentID: newRootId, + } + return db.Model(channel).Where("root_id =?", rootId).Select("root_id", "group_id", "parent_id").Updates(channel).Error +} + +func (d *daoChannel) UpdateChannel(channel *Channel) error { + return DBTransaction(func(tx *gorm.DB) error { + return tx.Model(channel).Where("id =?", channel.ID).Updates(channel).Error + }) +} diff --git a/dao_jt.go b/dao_jt.go index a3c29c0..9f04e42 100644 --- a/dao_jt.go +++ b/dao_jt.go @@ -1,18 +1,28 @@ package main import ( - "fmt" "gorm.io/gorm" ) // JTDeviceModel 数据库表结构 type JTDeviceModel struct { GBModel - SIPUAOptions + // SIPUAOptions + + Name string `json:"name"` // display name, 国标DeviceInfo消息中的Name + Username string `json:"username" gorm:"uniqueIndex"` // 用户名 + SeverID string `json:"server_id"` // 上级ID, 必选. 作为主键, 不能重复. + ServerAddr string `json:"server_addr"` // 上级地址, 必选 + Transport string `json:"transport"` // 上级通信方式, UDP/TCP + Password string `json:"password"` // 密码 + RegisterExpires int `json:"register_expires"` // 注册有效期 + KeepaliveInterval int `json:"keepalive_interval"` // 心跳间隔 + Status OnlineStatus `json:"status"` // 在线状态 + Manufacturer string `json:"manufacturer"` Model string `json:"model"` Firmware string `json:"firmware"` - SimNumber string `json:"sim_number"` + SimNumber string `json:"sim_number" gorm:"uniqueIndex"` } func (g *JTDeviceModel) TableName() string { @@ -29,6 +39,8 @@ type DaoJTDevice interface { QueryDeviceBySimNumber(simNumber string) (*JTDeviceModel, error) + QueryDeviceByID(id uint) (*JTDeviceModel, error) + ExistDevice(username, simNumber string) bool DeleteDevice(username string) error @@ -77,7 +89,11 @@ func (d *daoJTDevice) QueryDevice(id string) (*JTDeviceModel, error) { func (d *daoJTDevice) DeleteDevice(id string) error { return DBTransaction(func(tx *gorm.DB) error { - return tx.Where("username =?", id).Unscoped().Delete(&JTDeviceModel{}).Error + err := tx.Where("username =?", id).Unscoped().Delete(&JTDeviceModel{}).Error + if err != nil { + return err + } + return tx.Where("root_id =?", id).Unscoped().Delete(&Channel{}).Error }) } @@ -91,28 +107,23 @@ func (d *daoJTDevice) QueryDeviceBySimNumber(simNumber string) (*JTDeviceModel, return &device, nil } +func (d *daoJTDevice) QueryDeviceByID(id uint) (*JTDeviceModel, error) { + var device JTDeviceModel + tx := db.Where("id =?", id).Take(&device) + if tx.Error != nil { + return nil, tx.Error + } + return &device, nil +} + func (d *daoJTDevice) SaveDevice(model *JTDeviceModel) error { return DBTransaction(func(tx *gorm.DB) error { - var old JTDeviceModel - tx = tx.Where("username =? or sim_number =?", model.Username, model.SimNumber).Select("id").First(&old) - if tx.Error == nil { - return fmt.Errorf("username or sim number already exists") - } - - return db.Save(model).Error + return db.Create(model).Error }) } func (d *daoJTDevice) UpdateDevice(model *JTDeviceModel) error { return DBTransaction(func(tx *gorm.DB) error { - var old JTDeviceModel - tx = tx.Where("username =? or sim_number =?", model.Username, model.SimNumber).Select("id").First(&old) - if tx.Error != nil { - return tx.Error - } else { - model.ID = old.ID - } - return db.Save(model).Error }) } diff --git a/dao_sink.go b/dao_sink.go index 9f66d88..8c96b3a 100644 --- a/dao_sink.go +++ b/dao_sink.go @@ -29,6 +29,8 @@ type DaoSink interface { DeleteForwardSinkByCallID(callID string) (*Sink, error) DeleteForwardSinkBySinkStreamID(sinkStreamID StreamID) (*Sink, error) + + DeleteForwardSinksByServerAddr(addr string) ([]*Sink, error) } type daoSink struct { @@ -155,3 +157,14 @@ func (d *daoSink) DeleteForwardSinksByIds(ids []uint) error { return tx.Where("id in?", ids).Unscoped().Delete(&Sink{}).Error }) } + +func (d *daoSink) DeleteForwardSinksByServerAddr(addr string) ([]*Sink, error) { + var sinks []*Sink + tx := db.Where("server_addr =?", addr).Find(&sinks) + if tx.Error != nil { + return nil, tx.Error + } + return sinks, DBTransaction(func(tx *gorm.DB) error { + return tx.Where("server_addr =?", addr).Unscoped().Delete(&Sink{}).Error + }) +} diff --git a/dao_stream.go b/dao_stream.go index 5ec8e51..9da12fd 100644 --- a/dao_stream.go +++ b/dao_stream.go @@ -23,6 +23,8 @@ type DaoStream interface { QueryStreamByCallID(callID string) (*Stream, error) DeleteStreamByCallID(callID string) (*Stream, error) + + DeleteStreamByDeviceID(deviceID string) ([]*Stream, error) } type daoStream struct { @@ -133,3 +135,19 @@ func (d *daoStream) DeleteStreamByCallID(callID string) (*Stream, error) { return tx.Where("call_id =?", callID).Unscoped().Delete(&Stream{}).Error }) } + +func (d *daoStream) DeleteStreamByDeviceID(deviceID string) ([]*Stream, error) { + var streams []*Stream + tx := db.Where("device_id =?", deviceID).Find(&streams) + if tx.Error != nil { + return nil, tx.Error + } + _ = DBTransaction(func(tx *gorm.DB) error { + for _, stream := range streams { + _ = tx.Where("stream_id =?", stream.StreamID).Unscoped().Delete(&Stream{}) + } + return nil + }) + + return streams, nil +} diff --git a/device.go b/device.go index 6636daf..3ba3dec 100644 --- a/device.go +++ b/device.go @@ -272,19 +272,6 @@ func (d *Device) Close() { // 更新在数据库中的状态 d.Status = OFF _ = DeviceDao.UpdateDeviceStatus(d.DeviceID, OFF) - - // 释放所有推流 - //all := StreamManager.All() - //var streams []*Stream - //for _, stream := range all { - // if d.DeviceID == stream.StreamID.DeviceID() { - // streams = append(streams, stream) - // } - //} - // - //for _, stream := range streams { - // stream.Close(true, true) - //} } // CreateDialogRequestFromAnswer 根据invite的应答创建Dialog请求 diff --git a/jt_device.go b/jt_device.go index a86cb00..cba036c 100644 --- a/jt_device.go +++ b/jt_device.go @@ -14,6 +14,8 @@ type JTDevice struct { } func (g *JTDevice) OnInvite(request sip.Request, user string) sip.Response { + Sugar.Infof("收到1078的Invite请求 sim number: %s device: %s channel: %s", g.simNumber, g.username, user) + // 通知1078的信令服务器 channels, _ := ChannelDao.QueryChannelsByChannelID(user) if len(channels) < 1 { @@ -54,8 +56,43 @@ func (g *JTDevice) OnInvite(request sip.Request, user string) sip.Response { return response } +func (g *JTDevice) Start() { + Sugar.Infof("启动部标设备, deivce: %s transport: %s addr: %s", g.Username, g.sipUA.Transport, g.sipUA.ServerAddr) + g.sipUA.Start() + g.sipUA.SetOnRegisterHandler(g.Online, g.Offline) +} + +func (g *JTDevice) Online() { + Sugar.Infof("部标设备上线 sim number: %s device: %s server addr: %s", g.simNumber, g.Username, g.ServerAddr) + + if err := JTDeviceDao.UpdateOnlineStatus(ON, g.Username); err != nil { + Sugar.Infof("更新部标设备状态失败 err: %s", err.Error()) + } +} + +func (g *JTDevice) Offline() { + Sugar.Infof("部标设备离线 sim number: %s device: %s server addr: %s", g.simNumber, g.Username, g.ServerAddr) + + if err := JTDeviceDao.UpdateOnlineStatus(OFF, g.Username); err != nil { + Sugar.Infof("更新部标设备状态失败 err: %s", err.Error()) + } + + // 释放所有推流 + g.CloseStreams(true, true) +} + func NewJTDevice(model *JTDeviceModel, ua SipServer) (*JTDevice, error) { - platform, err := NewPlatform(&model.SIPUAOptions, ua) + platform, err := NewPlatform(&SIPUAOptions{ + Name: model.Name, + Username: model.Username, + ServerID: model.SeverID, + ServerAddr: model.ServerAddr, + Transport: model.Transport, + Password: model.Password, + RegisterExpires: model.RegisterExpires, + KeepaliveInterval: model.KeepaliveInterval, + Status: model.Status, + }, ua) if err != nil { return nil, err } diff --git a/live.go b/live.go index e3fe76a..ee7d5e4 100644 --- a/live.go +++ b/live.go @@ -40,8 +40,10 @@ func (i *InviteType) SessionName2Type(name string) { func (d *Device) StartStream(inviteType InviteType, streamId StreamID, channelId, startTime, stopTime, setup string, speed int, sync bool) (*Stream, error) { stream := &Stream{ - StreamID: streamId, - Protocol: SourceType28181, + DeviceID: streamId.DeviceID(), + ChannelID: streamId.ChannelID(), + StreamID: streamId, + Protocol: SourceType28181, } // 先添加占位置, 防止重复请求 diff --git a/platform.go b/platform.go index 448aa12..1a04f24 100644 --- a/platform.go +++ b/platform.go @@ -7,7 +7,6 @@ import ( "net/http" "net/netip" "strings" - "sync" ) const ( @@ -17,22 +16,6 @@ const ( type Platform struct { *gbClient - lock sync.Mutex - sinks map[string]StreamID // 保存级联转发的sink, 方便离线的时候关闭sink -} - -func (g *Platform) addSink(callId string, stream StreamID) { - g.lock.Lock() - defer g.lock.Unlock() - g.sinks[callId] = stream -} - -func (g *Platform) removeSink(callId string) StreamID { - g.lock.Lock() - defer g.lock.Unlock() - stream := g.sinks[callId] - delete(g.sinks, callId) - return stream } // OnBye 被上级挂断 @@ -43,36 +26,23 @@ func (g *Platform) OnBye(request sip.Request) { // CloseStream 关闭级联会话 func (g *Platform) CloseStream(callId string, bye, ms bool) { - _ = g.removeSink(callId) - sink := RemoveForwardSinkWithCallId(callId) - if sink == nil { - Sugar.Errorf("关闭转发sink失败, 找不到sink. callid: %s", callId) - return + sink, _ := SinkDao.DeleteForwardSinkByCallID(callId) + if sink != nil { + sink.Close(bye, ms) } - - sink.Close(bye, ms) } // CloseStreams 关闭所有级联会话 func (g *Platform) CloseStreams(bye, ms bool) { - var callIds []string - g.lock.Lock() - - for k := range g.sinks { - callIds = append(callIds, k) - } - - g.sinks = make(map[string]StreamID) - g.lock.Unlock() - - for _, id := range callIds { - g.CloseStream(id, bye, ms) + sinks, _ := SinkDao.DeleteForwardSinksByServerAddr(g.ServerAddr) + for _, sink := range sinks { + sink.Close(bye, ms) } } // OnInvite 被上级呼叫 func (g *Platform) OnInvite(request sip.Request, user string) sip.Response { - Sugar.Infof("收到上级Invite请求 platform: %s channel: %s sdp: %s", g.SeverID, user, request.Body()) + Sugar.Infof("收到上级Invite请求 platform: %s channel: %s sdp: %s", g.ServerID, user, request.Body()) source := request.Source() platform := PlatformManager.Find(source) @@ -80,7 +50,7 @@ func (g *Platform) OnInvite(request sip.Request, user string) sip.Response { deviceId, channel, err := PlatformDao.QueryPlatformChannel(g.ServerAddr, user) if err != nil { - Sugar.Errorf("处理上级Invite失败, 查询数据库失败 err: %s platform: %s channel: %s", err.Error(), g.SeverID, user) + Sugar.Errorf("处理上级Invite失败, 查询数据库失败 err: %s platform: %s channel: %s", err.Error(), g.ServerID, user) return CreateResponseWithStatusCode(request, http.StatusInternalServerError) } @@ -139,33 +109,33 @@ func (g *Platform) Stop() { } func (g *Platform) Online() { - Sugar.Infof("ua上线 device: %s server addr: %s", g.Username, g.ServerAddr) + Sugar.Infof("级联设备上线 device: %s server addr: %s", g.Username, g.ServerAddr) if err := PlatformDao.UpdateOnlineStatus(ON, g.ServerAddr); err != nil { - Sugar.Infof("ua状态失败 err: %s server addr: %s", err.Error(), g.ServerAddr) + Sugar.Infof("更新级联设备状态失败 err: %s server addr: %s", err.Error(), g.ServerAddr) } } func (g *Platform) Offline() { - Sugar.Infof("ua离线 device: %s server addr: %s", g.Username, g.ServerAddr) + Sugar.Infof("级联设备离线 device: %s server addr: %s", g.Username, g.ServerAddr) if err := PlatformDao.UpdateOnlineStatus(OFF, g.ServerAddr); err != nil { - Sugar.Infof("ua状态失败 err: %s server addr: %s", err.Error(), g.ServerAddr) + Sugar.Infof("更新级联设备状态失败 err: %s server addr: %s", err.Error(), g.ServerAddr) } // 释放所有推流 g.CloseStreams(true, true) } -func NewPlatform(record *SIPUAOptions, ua SipServer) (*Platform, error) { - if len(record.SeverID) != 20 { - return nil, fmt.Errorf("SeverID must be exactly 20 characters long") +func NewPlatform(options *SIPUAOptions, ua SipServer) (*Platform, error) { + if len(options.ServerID) != 20 { + return nil, fmt.Errorf("ServerID must be exactly 20 characters long") } - if _, err := netip.ParseAddrPort(record.ServerAddr); err != nil { + if _, err := netip.ParseAddrPort(options.ServerAddr); err != nil { return nil, err } - client := NewGBClient(record, ua) - return &Platform{gbClient: client.(*gbClient), sinks: make(map[string]StreamID, 8)}, nil + client := NewGBClient(options, ua) + return &Platform{gbClient: client.(*gbClient)}, nil } diff --git a/recover.go b/recover.go index 44baf46..1b55fb7 100644 --- a/recover.go +++ b/recover.go @@ -20,7 +20,7 @@ func startPlatformDevices() { utils.Assert(PlatformManager.Add(platform.ServerAddr, platform)) if err := PlatformDao.UpdateOnlineStatus(OFF, record.ServerAddr); err != nil { - Sugar.Infof("更新级联设备状态失败 err: %s device: %s", err.Error(), record.SeverID) + Sugar.Infof("更新级联设备状态失败 err: %s device: %s", err.Error(), record.ServerID) } platform.Start() diff --git a/sip_server.go b/sip_server.go index bec4e66..d2a760e 100644 --- a/sip_server.go +++ b/sip_server.go @@ -270,7 +270,7 @@ func (s *sipServer) OnMessage(wrapper *SipRequestSource) { } if ok = device != nil; !ok { - Sugar.Errorf("处理上级请求消息失败, 找不到级联设备 addr: %s request: %s", wrapper.req.Source(), wrapper.req.String()) + Sugar.Errorf("处理上级请求消息失败, 找不到设备 addr: %s request: %s", wrapper.req.Source(), wrapper.req.String()) return } diff --git a/sip_ua.go b/sip_ua.go index 579f01e..19715b5 100644 --- a/sip_ua.go +++ b/sip_ua.go @@ -7,6 +7,7 @@ import ( "github.com/lkmio/avformat/utils" "math" "net" + "net/netip" "strconv" "time" ) @@ -44,7 +45,7 @@ type SIPUA interface { type SIPUAOptions struct { Name string `json:"name"` // display name, 国标DeviceInfo消息中的Name Username string `json:"username"` // 用户名 - SeverID string `json:"server_id"` // 上级ID, 必选. 作为主键, 不能重复. + ServerID string `json:"server_id"` // 上级ID, 必选. 作为主键, 不能重复. ServerAddr string `json:"server_addr"` // 上级地址, 必选 Transport string `json:"transport"` // 上级通信方式, UDP/TCP Password string `json:"password"` // 密码 @@ -53,6 +54,24 @@ type SIPUAOptions struct { Status OnlineStatus `json:"status"` // 在线状态 } +func EqualSipUAOptions(old, new *SIPUAOptions) bool { + if old.Username != new.Username || old.ServerID != new.ServerID || old.ServerAddr != new.ServerAddr || old.Transport != new.Transport || old.Password != new.Password || old.RegisterExpires != new.RegisterExpires || old.KeepaliveInterval != new.KeepaliveInterval { + return false + } + return true +} + +func CheckSipUAOptions(options *SIPUAOptions) error { + if len(options.Username) != 20 { + return fmt.Errorf("invalid username: %s", options.Username) + } else if len(options.ServerID) != 20 { + return fmt.Errorf("invalid server id: %s", options.ServerID) + } else if _, err := netip.ParseAddrPort(options.ServerAddr); err != nil { + return err + } + return nil +} + type sipUA struct { SIPUAOptions @@ -117,7 +136,7 @@ func (g *sipUA) doRegister(request sip.Request) bool { } func (g *sipUA) startNewRegister() bool { - builder := NewRequestBuilder(sip.REGISTER, g.Username, g.ListenAddr, g.SeverID, g.ServerAddr, g.Transport) + builder := NewRequestBuilder(sip.REGISTER, g.Username, g.ListenAddr, g.ServerID, g.ServerAddr, g.Transport) expires := sip.Expires(g.RegisterExpires) builder.SetExpires(&expires) @@ -175,7 +194,7 @@ func (g *sipUA) doUnregister() { func (g *sipUA) doKeepalive() bool { body := fmt.Sprintf(KeepAliveBody, time.Now().UnixMilli()/1000, g.Username) - request, err := BuildMessageRequest(g.Username, g.ListenAddr, g.SeverID, g.ServerAddr, g.Transport, body) + request, err := BuildMessageRequest(g.Username, g.ListenAddr, g.ServerID, g.ServerAddr, g.Transport, body) if err != nil { panic(err) } diff --git a/stream.go b/stream.go index ef047f5..6d2b28d 100644 --- a/stream.go +++ b/stream.go @@ -79,6 +79,8 @@ func (r *RequestWrapper) Scan(value interface{}) error { type Stream struct { GBModel + DeviceID string // 下级设备ID, 统计某个设备的所有流/1078设备为sim number + ChannelID string // 下级通道ID, 统计某个设备下的某个通道的所有流/1078设备为 channel number StreamID StreamID `json:"stream_id"` // 流ID Protocol int `json:"protocol,omitempty"` // 推流协议, rtmp/28181/1078/gb_talk Dialog *RequestWrapper `json:"dialog,omitempty"` // 国标流的SipCall会话