使用redis持久化保存设备、通道、级联等信息

This commit is contained in:
yangjiechina
2024-12-11 10:00:40 +08:00
parent 9f21399d3a
commit ea518d71a4
23 changed files with 1709 additions and 377 deletions

224
api.go
View File

@@ -43,8 +43,8 @@ type PlayDoneParams struct {
}
type QueryRecordParams struct {
DeviceId string `json:"device_id"`
ChannelId string `json:"channel_id"`
DeviceID string `json:"device_id"`
ChannelID string `json:"channel_id"`
Timeout int `json:"timeout"`
StartTime string `json:"start_time"`
EndTime string `json:"end_time"`
@@ -88,6 +88,19 @@ type StreamIDParams struct {
StreamID StreamID `json:"stream_id"`
}
type PageQuery struct {
PageNumber *int `json:"page_number"` // 页数
PageSize *int `json:"page_size"` // 每页大小
TotalPages int `json:"total_pages"` // 总页数
TotalCount int `json:"total_count"` // 总记录数
Data interface{} `json:"data"`
}
type PageQueryChannel struct {
PageQuery
DeviceID string `json:"device_id"`
}
var apiServer *ApiServer
func init() {
@@ -102,7 +115,7 @@ func init() {
}
}
func filterRequestBodyParams[T any](f func(params T, w http.ResponseWriter, req *http.Request), params interface{}) func(http.ResponseWriter, *http.Request) {
func withDecodedParams[T any](f func(params T, w http.ResponseWriter, req *http.Request), params interface{}) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, req *http.Request) {
if err := HttpDecodeJSONBody(w, req, params); err != nil {
Sugar.Errorf("处理http请求失败 err: %s path: %s", err.Error(), req.URL.Path)
@@ -114,38 +127,46 @@ func filterRequestBodyParams[T any](f func(params T, w http.ResponseWriter, req
}
}
func withDecodedParams2[T any](f func(params T, w http.ResponseWriter, req *http.Request), params interface{}) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, req *http.Request) {
_ = HttpDecodeJSONBody(w, req, params)
f(params.(T), w, req)
}
}
func startApiServer(addr string) {
apiServer.router.HandleFunc("/api/v1/hook/on_play", filterRequestBodyParams(apiServer.OnPlay, &StreamParams{}))
apiServer.router.HandleFunc("/api/v1/hook/on_play_done", filterRequestBodyParams(apiServer.OnPlayDone, &PlayDoneParams{}))
apiServer.router.HandleFunc("/api/v1/hook/on_publish", filterRequestBodyParams(apiServer.OnPublish, &StreamParams{}))
apiServer.router.HandleFunc("/api/v1/hook/on_publish_done", filterRequestBodyParams(apiServer.OnPublishDone, &StreamParams{}))
apiServer.router.HandleFunc("/api/v1/hook/on_idle_timeout", filterRequestBodyParams(apiServer.OnIdleTimeout, &StreamParams{}))
apiServer.router.HandleFunc("/api/v1/hook/on_receive_timeout", filterRequestBodyParams(apiServer.OnReceiveTimeout, &StreamParams{}))
apiServer.router.HandleFunc("/api/v1/hook/on_record", filterRequestBodyParams(apiServer.OnRecord, &RecordParams{}))
apiServer.router.HandleFunc("/api/v1/hook/on_play", withDecodedParams(apiServer.OnPlay, &StreamParams{}))
apiServer.router.HandleFunc("/api/v1/hook/on_play_done", withDecodedParams(apiServer.OnPlayDone, &PlayDoneParams{}))
apiServer.router.HandleFunc("/api/v1/hook/on_publish", withDecodedParams(apiServer.OnPublish, &StreamParams{}))
apiServer.router.HandleFunc("/api/v1/hook/on_publish_done", withDecodedParams(apiServer.OnPublishDone, &StreamParams{}))
apiServer.router.HandleFunc("/api/v1/hook/on_idle_timeout", withDecodedParams(apiServer.OnIdleTimeout, &StreamParams{}))
apiServer.router.HandleFunc("/api/v1/hook/on_receive_timeout", withDecodedParams(apiServer.OnReceiveTimeout, &StreamParams{}))
apiServer.router.HandleFunc("/api/v1/hook/on_record", withDecodedParams(apiServer.OnRecord, &RecordParams{}))
apiServer.router.HandleFunc("/api/v1/hook/on_started", apiServer.OnStarted)
// 统一处理live/playback/download请求
apiServer.router.HandleFunc("/api/v1/{action}/start", filterRequestBodyParams(apiServer.OnInvite, &InviteParams{}))
apiServer.router.HandleFunc("/api/v1/{action}/start", withDecodedParams(apiServer.OnInvite, &InviteParams{}))
// 关闭国标流. 如果是实时流, 等收流或空闲超时自行删除. 回放或下载流立即删除.
apiServer.router.HandleFunc("/api/v1/stream/close", filterRequestBodyParams(apiServer.OnCloseStream, &StreamIDParams{}))
apiServer.router.HandleFunc("/api/v1/stream/close", withDecodedParams(apiServer.OnCloseStream, &StreamIDParams{}))
apiServer.router.HandleFunc("/api/v1/device/list", apiServer.OnDeviceList) // 查询在线设备
apiServer.router.HandleFunc("/api/v1/record/list", filterRequestBodyParams(apiServer.OnRecordList, &QueryRecordParams{})) // 查询录像列表
apiServer.router.HandleFunc("/api/v1/position/sub", filterRequestBodyParams(apiServer.OnSubscribePosition, &DeviceChannelID{})) // 订阅移动位置
apiServer.router.HandleFunc("/api/v1/playback/seek", filterRequestBodyParams(apiServer.OnSeekPlayback, &SeekParams{})) // 回放seek
apiServer.router.HandleFunc("/api/v1/ptz/control", apiServer.OnPTZControl) // 云台控制
apiServer.router.HandleFunc("/api/v1/device/list", withDecodedParams2(apiServer.OnDeviceList, &PageQuery{})) // 查询设备列表
apiServer.router.HandleFunc("/api/v1/channel/list", withDecodedParams(apiServer.OnChannelList, &PageQueryChannel{})) // 查询通道列表
apiServer.router.HandleFunc("/api/v1/record/list", withDecodedParams(apiServer.OnRecordList, &QueryRecordParams{})) // 查询录像列表
apiServer.router.HandleFunc("/api/v1/position/sub", withDecodedParams(apiServer.OnSubscribePosition, &DeviceChannelID{})) // 订阅移动位置
apiServer.router.HandleFunc("/api/v1/playback/seek", withDecodedParams(apiServer.OnSeekPlayback, &SeekParams{})) // 回放seek
apiServer.router.HandleFunc("/api/v1/ptz/control", apiServer.OnPTZControl) // 云台控制
apiServer.router.HandleFunc("/api/v1/platform/add", filterRequestBodyParams(apiServer.OnPlatformAdd, &GBPlatformRecord{})) // 添加上级平台
apiServer.router.HandleFunc("/api/v1/platform/remove", filterRequestBodyParams(apiServer.OnPlatformRemove, &GBPlatformRecord{})) // 删除上级平台
apiServer.router.HandleFunc("/api/v1/platform/list", apiServer.OnPlatformList) // 上级平台列表
apiServer.router.HandleFunc("/api/v1/platform/channel/bind", filterRequestBodyParams(apiServer.OnPlatformChannelBind, &PlatformChannel{})) // 级联绑定通道
apiServer.router.HandleFunc("/api/v1/platform/channel/unbind", filterRequestBodyParams(apiServer.OnPlatformChannelUnbind, &PlatformChannel{})) // 级联取消绑定通道
apiServer.router.HandleFunc("/api/v1/platform/list", apiServer.OnPlatformList) // 级联设备列表
apiServer.router.HandleFunc("/api/v1/platform/add", withDecodedParams(apiServer.OnPlatformAdd, &GBPlatformRecord{})) // 添加级联设备
apiServer.router.HandleFunc("/api/v1/platform/remove", withDecodedParams(apiServer.OnPlatformRemove, &GBPlatformRecord{})) // 删除级联设备
apiServer.router.HandleFunc("/api/v1/platform/channel/bind", withDecodedParams(apiServer.OnPlatformChannelBind, &PlatformChannel{})) // 级联绑定通道
apiServer.router.HandleFunc("/api/v1/platform/channel/unbind", withDecodedParams(apiServer.OnPlatformChannelUnbind, &PlatformChannel{})) // 级联解绑通道
apiServer.router.HandleFunc("/ws/v1/talk", apiServer.OnWSTalk) // 语音广播/对讲, 主讲音频传输链路
apiServer.router.HandleFunc("/api/v1/broadcast/invite", filterRequestBodyParams(apiServer.OnBroadcast, &BroadcastParams{Type: int(BroadcastTypeTCP)})) // 发起语音广播
apiServer.router.HandleFunc("/api/v1/broadcast/hangup", filterRequestBodyParams(apiServer.OnHangup, &HangupParams{})) // 挂断广播会话
apiServer.router.HandleFunc("/api/v1/talk", apiServer.OnTalk) // 语音对讲
apiServer.router.HandleFunc("/ws/v1/talk", apiServer.OnWSTalk) // 语音广播/对讲, 主讲音频传输链路
apiServer.router.HandleFunc("/api/v1/broadcast/invite", withDecodedParams(apiServer.OnBroadcast, &BroadcastParams{Type: int(BroadcastTypeTCP)})) // 发起语音广播
apiServer.router.HandleFunc("/api/v1/broadcast/hangup", withDecodedParams(apiServer.OnHangup, &HangupParams{})) // 挂断广播会话
apiServer.router.HandleFunc("/api/v1/talk", apiServer.OnTalk) // 语音对讲
apiServer.router.HandleFunc("/broadcast.html", func(writer http.ResponseWriter, request *http.Request) {
http.ServeFile(writer, request, "./broadcast.html")
})
@@ -398,20 +419,73 @@ func CloseStream(streamId StreamID) {
}
}
func (api *ApiServer) OnDeviceList(w http.ResponseWriter, r *http.Request) {
devices := DeviceManager.AllDevices()
httpResponseOK(w, devices)
func (api *ApiServer) OnDeviceList(v *PageQuery, w http.ResponseWriter, r *http.Request) {
if v.PageNumber == nil {
var defaultPageNumber = 1
v.PageNumber = &defaultPageNumber
}
if v.PageSize == nil {
var defaultPageSize = 10
v.PageSize = &defaultPageSize
}
devices, total, err := DB.QueryDevices(*v.PageNumber, *v.PageSize)
if err != nil {
Sugar.Errorf("查询设备列表失败 page number: %d, size: %d err: %s", *v.PageNumber, *v.PageSize, err.Error())
httpResponseError(w, err.Error())
return
}
query := PageQuery{
PageNumber: v.PageNumber,
PageSize: v.PageSize,
TotalCount: total,
TotalPages: int(math.Ceil(float64(total) / float64(*v.PageSize))),
Data: devices,
}
httpResponseOK(w, query)
}
func (api *ApiServer) OnChannelList(v *PageQueryChannel, w http.ResponseWriter, r *http.Request) {
if v.PageNumber == nil {
var defaultPageNumber = 1
v.PageNumber = &defaultPageNumber
}
if v.PageSize == nil {
var defaultPageSize = 10
v.PageSize = &defaultPageSize
}
channels, total, err := DB.QueryChannels(v.DeviceID, *v.PageNumber, *v.PageSize)
if err != nil {
Sugar.Errorf("查询设备列表失败 device: %s page number: %d, size: %d err: %s", v.DeviceID, *v.PageNumber, *v.PageSize, err.Error())
httpResponseError(w, err.Error())
return
}
query := PageQuery{
PageNumber: v.PageNumber,
PageSize: v.PageSize,
TotalCount: total,
TotalPages: int(math.Ceil(float64(total) / float64(*v.PageSize))),
Data: channels,
}
httpResponseOK(w, query)
}
func (api *ApiServer) OnRecordList(v *QueryRecordParams, w http.ResponseWriter, r *http.Request) {
device := DeviceManager.Find(v.DeviceId)
device := DeviceManager.Find(v.DeviceID)
if device == nil {
httpResponseError(w, "设备离线")
return
}
sn := GetSN()
err := device.QueryRecord(v.ChannelId, v.StartTime, v.EndTime, sn, v.Type_)
err := device.QueryRecord(v.ChannelID, v.StartTime, v.EndTime, sn, v.Type_)
if err != nil {
logger.Error("发送查询录像请求失败 err: %s", err.Error())
httpResponseError(w, err.Error())
@@ -646,39 +720,53 @@ func (api *ApiServer) OnStarted(w http.ResponseWriter, req *http.Request) {
}
func (api *ApiServer) OnPlatformAdd(v *GBPlatformRecord, w http.ResponseWriter, r *http.Request) {
Sugar.Infof("添加级联 %v", *v)
Sugar.Infof("添加级联设备 %v", *v)
var err error
// 响应错误消息
defer func() {
if err != nil {
Sugar.Errorf("添加级联失败 err: %s", err.Error())
Sugar.Errorf("添加级联设备失败 err: %s", err.Error())
httpResponseError(w, err.Error())
}
}()
if PlatformManager.ExistPlatform(v.SeverID) {
err = fmt.Errorf("id冲突")
if v.Username == "" {
v.Username = Config.SipId
Sugar.Infof("级联设备使用默认的ID")
}
if len(v.Username) != 20 {
err = fmt.Errorf("用户名长度必须20位")
return
} else if PlatformManager.ExistPlatformWithServerAddr(v.ServerAddr) {
err = fmt.Errorf("地址冲突")
} else if len(v.SeverID) != 20 {
err = fmt.Errorf("上级ID长度必须20位")
return
}
platform, err := NewGBPlatform(v, SipUA)
if err != nil {
return
} else if !PlatformManager.AddPlatform(platform) {
err = fmt.Errorf("已经存在")
return
}
var platform *GBPlatform
v.CreateTime = strconv.FormatInt(time.Now().UnixMilli(), 10)
v.Status = "OFF"
platform, err = NewGBPlatform(v, SipUA)
platform.Start()
httpResponseOK(w, nil)
if err == nil {
if err = DB.SavePlatform(v); err == nil {
utils.Assert(PlatformManager.AddPlatform(platform))
platform.Start()
httpResponseOK(w, nil)
}
}
}
func (api *ApiServer) OnPlatformRemove(v *GBPlatformRecord, w http.ResponseWriter, r *http.Request) {
Sugar.Infof("删除级联 %v", *v)
Sugar.Infof("删除级联设备 %v", *v)
if err := DB.DeletePlatform(v); err != nil {
Sugar.Errorf("删除级联设备失败 err: %s", err.Error())
httpResponseOK(w, err.Error())
return
}
platform := PlatformManager.RemovePlatform(v.SeverID)
if platform != nil {
@@ -689,48 +777,48 @@ func (api *ApiServer) OnPlatformRemove(v *GBPlatformRecord, w http.ResponseWrite
}
func (api *ApiServer) OnPlatformList(w http.ResponseWriter, r *http.Request) {
platforms := PlatformManager.Platforms()
platforms, err := DB.LoadPlatforms()
if err != nil {
Sugar.Errorf("查询级联设备列表失败 err: %s", err.Error())
}
httpResponseOK(w, platforms)
}
func (api *ApiServer) OnPlatformChannelBind(v *PlatformChannel, w http.ResponseWriter, r *http.Request) {
platform := PlatformManager.FindPlatform(v.ServerID)
if platform == nil {
Sugar.Errorf("绑定通道失败, id: %s", v.ServerID)
Sugar.Errorf("绑定通道失败, 级联设备不存在 device: %s", v.ServerID)
httpResponseError(w, "级联设备不存在")
return
}
var channels []*Channel
for _, pair := range v.Channels {
device := DeviceManager.Find(pair[0])
if device == nil {
continue
}
channel := device.FindChannel(pair[1])
if channel == nil {
continue
}
channels = append(channels, channel)
// 级联功能,通道号必须唯一
channels, err := DB.BindChannels(v.ServerID, v.Channels)
if err != nil {
Sugar.Errorf("绑定通道失败 err: %s", err.Error())
httpResponseError(w, err.Error())
return
}
platform.AddChannels(channels)
httpResponseOK(w, nil)
httpResponseOK(w, channels)
}
func (api *ApiServer) OnPlatformChannelUnbind(v *PlatformChannel, w http.ResponseWriter, r *http.Request) {
platform := PlatformManager.FindPlatform(v.ServerID)
if platform == nil {
Sugar.Errorf("取消绑定通道失败, id: %s", v.ServerID)
Sugar.Errorf("解绑通道失败, 级联设备不存在 device: %s", v.ServerID)
httpResponseError(w, "级联设备不存在")
return
}
for _, pair := range v.Channels {
platform.RemoveChannel(pair[1])
channels, err := DB.UnbindChannels(v.ServerID, v.Channels)
if err != nil {
Sugar.Errorf("解绑通道失败 err: %s", err.Error())
httpResponseError(w, err.Error())
return
}
httpResponseOK(w, nil)
httpResponseOK(w, channels)
}

View File

@@ -16,15 +16,12 @@ type GBClient interface {
SetDeviceInfo(name, manufacturer, model, firmware string)
// OnQueryCatalog 被查询目录
OnQueryCatalog(sn int)
OnQueryCatalog(sn int, channels []*Channel)
// OnQueryDeviceInfo 被查询设备信息
OnQueryDeviceInfo(sn int)
OnSubscribeCatalog(sn int)
// AddChannels 重写添加通道函数, 增加SIP通知通道变化
AddChannels(channels []*Channel)
}
type Client struct {
@@ -33,18 +30,18 @@ type Client struct {
deviceInfo *DeviceInfoResponse
}
func (g *Client) OnQueryCatalog(sn int) {
channels := g.GetChannels()
if len(channels) == 0 {
return
}
func (g *Client) OnQueryCatalog(sn int, channels []*Channel) {
response := CatalogResponse{}
response.SN = sn
response.CmdType = CmdCatalog
response.DeviceID = g.sipClient.Username
response.SumNum = len(channels)
if response.SumNum < 1 {
g.SendMessage(&response)
return
}
for i, _ := range channels {
channel := *channels[i]
@@ -148,6 +145,6 @@ func NewGBClient(username, serverId, serverAddr, transport, password string, reg
ua: ua,
}
client := &Client{sip, Device{ID: username, Channels: map[string]*Channel{}}, &DeviceInfoResponse{BaseResponse: BaseResponse{BaseMessage: BaseMessage{DeviceID: username, CmdType: CmdDeviceInfo}, Result: "OK"}}}
client := &Client{sip, Device{ID: username}, &DeviceInfoResponse{BaseResponse: BaseResponse{BaseMessage: BaseMessage{DeviceID: username, CmdType: CmdDeviceInfo}, Result: "OK"}}}
return client
}

View File

@@ -110,7 +110,7 @@ func CreateTransport(ip string, port int, setup string, handler transport.Handle
tcpClient := &transport.TCPClient{}
tcpClient.SetHandler(handler)
err := tcpClient.Connect(nil, &net.TCPAddr{IP: net.ParseIP(ip), Port: port})
_, err := tcpClient.Connect(nil, &net.TCPAddr{IP: net.ParseIP(ip), Port: port})
return tcpClient, true, err
} else if "active" == setup {
tcpServer := &transport.TCPServer{}
@@ -323,6 +323,9 @@ func TestGBClient(t *testing.T) {
if err != nil {
panic(err)
}
DeviceChannelsManager = &DeviceChannels{
channels: make(map[string][]*Channel, clientConfig.Count),
}
for i := 0; i < clientConfig.Count; i++ {
deviceId := clientConfig.DeviceIDPrefix + fmt.Sprintf("%07d", i+1)
@@ -332,15 +335,15 @@ func TestGBClient(t *testing.T) {
device := VirtualDevice{client.(*Client), map[string]*MediaStream{}, &sync.Mutex{}}
device.SetDeviceInfo(fmt.Sprintf("测试设备%d", i+1), "lkmio", "lkmio_gb", "dev-0.0.1")
var channels []*Channel
channels = append(channels, &Channel{
channel := &Channel{
DeviceID: channelId,
Name: "1",
ParentID: deviceId,
})
}
DeviceManager.Add(device)
device.AddChannels(channels)
DeviceChannelsManager.AddChannel(deviceId, channel)
device.Start()
device.SetOnRegisterHandler(func() {

View File

@@ -6,7 +6,7 @@
"domain": "192.168.2.148:5060",
"password": "12345678",
"listenAddr": "192.168.2.148:15062",
"count": 1,
"count": 100,
"?rtp_over_tcp_raw_file_path": "rtp over tcp的推流源文件",
"rtp_over_tcp_raw_file_path": "./rtp.raw"
}

33
client_channels.go Normal file
View File

@@ -0,0 +1,33 @@
package main
import "sync"
var (
// DeviceChannelsManager 目前只用作模拟多个国标客户端. 设备接入和级联都会走数据库
DeviceChannelsManager *DeviceChannels
)
type DeviceChannels struct {
channels map[string][]*Channel
lock sync.RWMutex
}
func (d *DeviceChannels) AddChannel(deviceId string, channel *Channel) {
d.lock.Lock()
defer d.lock.Unlock()
if d.channels == nil {
d.channels = make(map[string][]*Channel, 5)
}
channels := d.channels[deviceId]
channels = append(channels, channel)
d.channels[deviceId] = channels
}
func (d *DeviceChannels) FindChannels(deviceId string) []*Channel {
d.lock.RLock()
defer d.lock.RUnlock()
return d.channels[deviceId]
}

View File

@@ -20,6 +20,11 @@ type Config_ struct {
MobilePositionExpires int `json:"mobile_position_expires"`
MediaServer string `json:"media_server"`
Port []int `json:"port"` //语音广播/对讲需要的端口
Redis struct {
Addr string `json:"addr"`
Password string `json:"password"`
}
}
type LogConfig struct {

View File

@@ -10,5 +10,21 @@
"mobile_position_interval": 10,
"media_server": "0.0.0.0:8080",
"port": [20030,20050]
"port": [20030,20050],
"redis": {
"addr": "0.0.0.0:6379",
"password": ""
},
"hooks": {
"?online": "设备上线通知",
"online": "",
"?offline": "设备下线通知",
"offline": "",
"?position" : "设备位置通知",
"position": ""
}
}

44
db.go Normal file
View File

@@ -0,0 +1,44 @@
package main
type GB28181DB interface {
LoadOnlineDevices() (map[string]*Device, error)
LoadDevices() (map[string]*Device, error)
SaveDevice(device *Device) error
SaveChannel(deviceId string, channel *Channel) error
UpdateDeviceStatus(deviceId string, status OnlineStatus) error
UpdateChannelStatus(channelId, status string) error
RefreshHeartbeat(deviceId string) error
QueryDevice(id string) (*Device, error)
QueryDevices(page int, size int) ([]*Device, int, error)
QueryChannel(deviceId string, channelId string) (*Channel, error)
QueryChannels(deviceId string, page, size int) ([]*Channel, int, error)
LoadPlatforms() ([]*GBPlatformRecord, error)
QueryPlatform(id string) (*GBPlatformRecord, error)
SavePlatform(platform *GBPlatformRecord) error
DeletePlatform(platform *GBPlatformRecord) error
UpdatePlatform(platform *GBPlatformRecord) error
UpdatePlatformStatus(serverId string, status OnlineStatus) error
BindChannels(id string, channels [][2]string) ([][2]string, error)
UnbindChannels(id string, channels [][2]string) ([][2]string, error)
// QueryPlatformChannel 查询级联设备的某个通道, 返回通道所属设备ID、通道.
QueryPlatformChannel(platformId string, channelId string) (string, *Channel, error)
}

View File

@@ -1,56 +0,0 @@
package main
// LocalDB 使用json文件保存设备和通道信息
type LocalDB struct {
}
func (m LocalDB) LoadDevices() []*Device {
return nil
}
func (m LocalDB) RegisterDevice(device *Device) (error, bool) {
//持久化...
device.Status = "ON"
oldDevice := DeviceManager.Find(device.ID)
if oldDevice != nil {
oldDevice.(*Device).Status = "ON"
oldDevice.(*Device).RemoteAddr = device.RemoteAddr
oldDevice.(*Device).Name = device.Name
oldDevice.(*Device).Transport = device.Transport
device = oldDevice.(*Device)
} else if err := DeviceManager.Add(device); err != nil {
return err, false
}
return nil, oldDevice == nil || len(device.Channels) == 0
}
func (m LocalDB) UnRegisterDevice(id string) {
device := DeviceManager.Find(id)
if device == nil {
return
}
device.(*Device).Status = "OFF"
}
func (m LocalDB) KeepAliveDevice(device *Device) {
}
func (m LocalDB) AddPlatform(record GBPlatformRecord) error {
//if ExistPlatform(record.SeverID) {
// return
//}
return nil
}
func (m LocalDB) LoadPlatforms() []GBPlatformRecord {
//if ExistPlatform(record.SeverID) {
// return
//}
return nil
}

View File

@@ -1 +1,549 @@
package main
import (
"encoding/json"
"fmt"
"strconv"
"strings"
"sync"
"time"
)
const (
RedisKeyDevices = "devices" // 使用map保存所有设备信息(不包含通道信息)
RedisKeyDevicesSort = "devices_sort" // 使用zset有序保存所有设备ID(按照入库时间)
RedisKeyChannels = "channels" // 使用map保存所有通道信息
RedisKeyDeviceChannels = "%s_channels" // 使用zset保存设备下的所有通道ID
RedisKeyPlatforms = "platforms" // 使用zset有序保存所有级联设备
RedisKeyStreams = "streams" // 保存所有推流信息, 以便在崩溃后恢复
RedisUniqueChannelID = "%s_%s" // 通道号的唯一ID, 设备_通道号
)
type RedisDB struct {
utils *RedisUtils
platformsLock sync.Mutex
}
type ChannelKey string
func (c ChannelKey) Device() string {
return strings.Split(string(c), "_")[0]
}
func (c ChannelKey) Channel() string {
return strings.Split(string(c), "_")[1]
}
func (c ChannelKey) String() string {
return string(c)
}
// DeviceChannelsKey 返回设备通道列表的主键
func DeviceChannelsKey(id string) string {
return fmt.Sprintf(RedisKeyDeviceChannels, id)
}
// GenerateChannelKey 使用设备号+通道号作为通道的主键,兼容通道号可能重复的情况
func GenerateChannelKey(device, channel string) ChannelKey {
return ChannelKey(fmt.Sprintf(RedisUniqueChannelID, device, channel))
}
func (r *RedisDB) LoadOnlineDevices() (map[string]*Device, error) {
executor, err := r.utils.CreateExecutor()
if err != nil {
return nil, err
}
keys, err := executor.Keys()
if err != nil {
return nil, err
}
devices := make(map[string]*Device, len(keys))
for _, key := range keys {
device, err := r.findDevice(key, executor)
if err != nil || device == nil {
continue
}
devices[key] = device
}
return devices, nil
}
func (r *RedisDB) findDevice(id string, executor Executor) (*Device, error) {
value, err := executor.Key(RedisKeyDevices).HGet(id)
if err != nil {
return nil, err
} else if value == nil {
return nil, nil
}
device := &Device{}
err = json.Unmarshal(value, device)
if err != nil {
return nil, err
}
return device, nil
}
func (r *RedisDB) findChannel(id ChannelKey, executor Executor) (*Channel, error) {
value, err := executor.HGet(id.String())
if err != nil {
return nil, err
} else if value == nil {
return nil, nil
}
channel := &Channel{}
err = json.Unmarshal(value, channel)
if err != nil {
return nil, err
}
return channel, nil
}
func (r *RedisDB) LoadDevices() (map[string]*Device, error) {
executor, err := r.utils.CreateExecutor()
if err != nil {
return nil, err
}
entries, err := executor.Key(RedisKeyDevices).HGetAll()
devices := make(map[string]*Device, len(entries))
for k, v := range entries {
device := &Device{}
if err = json.Unmarshal(v, device); err != nil {
continue
}
devices[k] = device
}
return devices, err
}
func (r *RedisDB) SaveDevice(device *Device) error {
data, err := json.Marshal(device)
if err != nil {
return err
}
executor, err := r.utils.CreateExecutor()
if err != nil {
return err
// 保存设备信息
} else if err = executor.Key(RedisKeyDevices).HSet(device.ID, string(data)); err != nil {
return err
}
return r.UpdateDeviceStatus(device.ID, device.Status)
}
func (r *RedisDB) SaveChannel(deviceId string, channel *Channel) error {
data, err := json.Marshal(channel)
if err != nil {
return err
}
channelKey := GenerateChannelKey(deviceId, channel.DeviceID)
executor, err := r.utils.CreateExecutor()
if err != nil {
return err
// 保存通道信息
} else if err = executor.Key(RedisKeyChannels).HSet(channelKey.String(), string(data)); err != nil {
return err
// 通道关联到Device
} else if err = executor.Key(fmt.Sprintf(RedisKeyDeviceChannels, deviceId)).ZAddWithNotExists(float64(time.Now().UnixMilli()), channelKey); err != nil {
return err
}
return nil
}
func (r *RedisDB) UpdateDeviceStatus(deviceId string, status OnlineStatus) error {
executor, err := r.utils.CreateExecutor()
if err != nil {
return err
}
// 如果在线, 设置有效期key, 添加到设备排序表
if ON == status {
// 设置有效期key
if err = executor.Key(deviceId).Set(nil); err != nil {
return err
} else if err = executor.SetExpires(Config.AliveExpires); err != nil {
return err
// 排序Device根据入库时间
} else if err = executor.Key(RedisKeyDevicesSort).ZAddWithNotExists(float64(time.Now().UnixMilli()), deviceId); err != nil {
return err
}
} else {
// 删除有效key
return executor.Key(deviceId).Del()
}
return nil
}
func (r *RedisDB) UpdateChannelStatus(channelId, status string) error {
//TODO implement me
panic("implement me")
}
func (r *RedisDB) RefreshHeartbeat(deviceId string) error {
executor, err := r.utils.CreateExecutor()
if err != nil {
return err
} else if err = executor.Key(deviceId).Set(strconv.FormatInt(time.Now().UnixMilli(), 10)); err != nil {
return err
}
return executor.SetExpires(Config.AliveExpires)
}
func (r *RedisDB) QueryDevice(id string) (*Device, error) {
executor, err := r.utils.CreateExecutor()
if err != nil {
return nil, err
}
return r.findDevice(id, executor)
}
func (r *RedisDB) QueryDevices(page int, size int) ([]*Device, int, error) {
executor, err := r.utils.CreateExecutor()
if err != nil {
return nil, 0, err
}
keys, err := executor.Key(RedisKeyDevicesSort).ZRangeWithAsc(page, size)
if err != nil {
return nil, 0, err
}
var devices []*Device
for _, key := range keys {
device, err := r.findDevice(key, executor)
if err != nil {
continue
}
devices = append(devices, device)
}
// 查询总记录数
total, err := executor.Key(RedisKeyDevicesSort).CountZSet()
if err != nil {
return nil, 0, err
}
return devices, total, nil
}
func (r *RedisDB) QueryChannel(deviceId string, channelId string) (*Channel, error) {
executor, err := r.utils.CreateExecutor()
if err != nil {
return nil, err
}
executor.Key(RedisKeyChannels)
return r.findChannel(GenerateChannelKey(deviceId, channelId), executor)
}
func (r *RedisDB) QueryChannels(deviceId string, page, size int) ([]*Channel, int, error) {
executor, err := r.utils.CreateExecutor()
if err != nil {
return nil, 0, err
}
id := fmt.Sprintf(RedisKeyDeviceChannels, deviceId)
keys, err := executor.Key(id).ZRangeWithAsc(page, size)
if err != nil {
return nil, 0, err
}
executor.Key(RedisKeyChannels)
var channels []*Channel
for _, key := range keys {
channel, err := r.findChannel(ChannelKey(key), executor)
if err != nil {
continue
} else if channel == nil {
continue
}
channels = append(channels, channel)
}
// 查询总记录数
total, err := executor.Key(id).CountZSet()
if err != nil {
return nil, 0, err
}
return channels, total, nil
}
func (r *RedisDB) LoadPlatforms() ([]*GBPlatformRecord, error) {
executor, err := r.utils.CreateExecutor()
if err != nil {
return nil, err
}
var platforms []*GBPlatformRecord
pairs, err := executor.Key(RedisKeyPlatforms).ZRange()
if err == nil {
for _, pair := range pairs {
platform := &GBPlatformRecord{}
if err := json.Unmarshal([]byte(pair[0]), platform); err != nil {
continue
}
platform.CreateTime = pair[1]
platforms = append(platforms, platform)
}
}
return platforms, err
}
func (r *RedisDB) findPlatformWithServerID(id string) (*GBPlatformRecord, error) {
platforms, err := r.LoadPlatforms()
if err != nil {
return nil, err
}
for _, platform := range platforms {
if platform.SeverID == id {
return platform, nil
}
}
return nil, err
}
func (r *RedisDB) QueryPlatform(id string) (*GBPlatformRecord, error) {
return r.findPlatformWithServerID(id)
}
func (r *RedisDB) SavePlatform(platform *GBPlatformRecord) error {
r.platformsLock.Lock()
defer r.platformsLock.Unlock()
executor, err := r.utils.CreateExecutor()
if err != nil {
return err
}
platforms, _ := r.LoadPlatforms()
for _, old := range platforms {
if old.SeverID == platform.SeverID {
return fmt.Errorf("id冲突")
} else if old.ServerAddr == platform.ServerAddr {
return fmt.Errorf("地址冲突")
}
}
data, err := json.Marshal(platform)
if err != nil {
return err
}
return executor.Key(RedisKeyPlatforms).ZAddWithNotExists(platform.CreateTime, data)
}
func (r *RedisDB) DeletePlatform(v *GBPlatformRecord) error {
r.platformsLock.Lock()
defer r.platformsLock.Unlock()
executor, err := r.utils.CreateExecutor()
if err != nil {
return err
}
platform, _ := r.findPlatformWithServerID(v.SeverID)
if platform == nil {
return fmt.Errorf("platform with ID %s not find", v.SeverID)
}
// 删除所有通道, 没有事务
if err = executor.Key(fmt.Sprintf(RedisKeyDeviceChannels, platform.SeverID)).Del(); err != nil {
return err
}
return executor.Key(RedisKeyPlatforms).ZDelWithScore(platform.CreateTime)
}
func (r *RedisDB) UpdatePlatform(platform *GBPlatformRecord) error {
r.platformsLock.Lock()
defer r.platformsLock.Unlock()
executor, err := r.utils.CreateExecutor()
if err != nil {
return err
}
oldPlatform, _ := r.findPlatformWithServerID(platform.SeverID)
if oldPlatform == nil {
return fmt.Errorf("platform with ID %s not find", platform.SeverID)
}
data, err := json.Marshal(platform)
if err != nil {
return err
}
return executor.ZAdd(oldPlatform.CreateTime, data)
}
func (r *RedisDB) UpdatePlatformStatus(serverId string, status OnlineStatus) error {
r.platformsLock.Lock()
defer r.platformsLock.Unlock()
executor, err := r.utils.CreateExecutor()
if err != nil {
return err
}
oldPlatform, _ := r.findPlatformWithServerID(serverId)
if oldPlatform == nil {
return fmt.Errorf("platform with ID %s not find", serverId)
}
oldPlatform.Status = status
data, err := json.Marshal(oldPlatform)
if err != nil {
return err
}
return executor.ZAdd(oldPlatform.CreateTime, data)
}
func (r *RedisDB) BindChannels(id string, channels [][2]string) ([][2]string, error) {
r.platformsLock.Lock()
defer r.platformsLock.Unlock()
platform, err := r.QueryPlatform(id)
if err != nil {
return nil, err
} else if platform == nil {
return nil, fmt.Errorf("platform with ID %s not find", platform.SeverID)
}
executor, err := r.utils.CreateExecutor()
if err != nil {
return nil, err
}
// 返回成功的设备通道号
var result [][2]string
for _, v := range channels {
deviceId := v[0]
channelId := v[1]
channelKey := GenerateChannelKey(deviceId, channelId)
// 检查通道是否存在, 以及通道是否冲突
channel, err := r.findChannel(channelKey, executor.Key(RedisKeyChannels))
if err != nil {
Sugar.Errorf("添加通道失败, err: %s device: %s channel: %s", err.Error(), deviceId, channelId)
} else if channel == nil {
Sugar.Errorf("添加通道失败, 通道不存在. device: %s channel: %s", deviceId, channelId)
} else if score, _ := executor.Key(DeviceChannelsKey(id)).ZGetScore(channelKey); score != nil {
Sugar.Errorf("添加通道失败, 通道冲突. device: %s channel: %s", deviceId, channelId)
} else if err = executor.Key(DeviceChannelsKey(id)).ZAddWithNotExists(time.Now().UnixMilli(), channelKey); err != nil {
Sugar.Errorf("添加通道失败, err: %s device: %s channel: %s", err.Error(), deviceId, channelId)
} else {
result = append(result, v)
}
}
return result, nil
}
func (r *RedisDB) UnbindChannels(id string, channels [][2]string) ([][2]string, error) {
r.platformsLock.Lock()
defer r.platformsLock.Unlock()
platform, err := r.QueryPlatform(id)
if err != nil {
return nil, err
} else if platform == nil {
return nil, fmt.Errorf("platform with ID %s not find", platform.SeverID)
}
executor, err := r.utils.CreateExecutor()
if err != nil {
return nil, err
}
// 返回成功的设备通道号
var result [][2]string
for _, v := range channels {
if err := executor.Key(DeviceChannelsKey(id)).ZDel(GenerateChannelKey(v[0], v[1])); err != nil {
continue
}
result = append(result, v)
}
return result, nil
}
func (r *RedisDB) QueryPlatformChannel(platformId string, channelId string) (string, *Channel, error) {
executor, err := r.utils.CreateExecutor()
if err != nil {
return "", nil, err
}
score, err := executor.Key(DeviceChannelsKey(platformId)).ZGetScore(channelId)
if err != nil {
return "", nil, err
}
deviceId := score.(string)
channel, err := r.findChannel(GenerateChannelKey(deviceId, channelId), executor.Key(RedisKeyChannels))
if err != nil {
return "", nil, err
}
return deviceId, channel, nil
}
// OnExpires Redis设备ID到期回调
func (r *RedisDB) OnExpires(db int, id string) {
Sugar.Infof("设备心跳过期 device: %s", id)
device := DeviceManager.Find(id)
if device == nil {
Sugar.Errorf("设备不存在 device: %s", id)
return
}
device.(*Device).Status = OFF
if err := DB.SaveDevice(device.(*Device)); err != nil {
Sugar.Errorf("更新设备在线状态失败 err: %s device: %s ", err.Error(), id)
}
}
func NewRedisDB(addr, password string) *RedisDB {
db := &RedisDB{
utils: NewRedisUtils(addr, password),
}
for {
err := StartExpiredKeysSubscription(db.utils, 0, db.OnExpires)
if err == nil {
break
}
Sugar.Errorf("监听redis过期key失败, err: %s", err.Error())
time.Sleep(3 * time.Second)
}
return db
}

160
device.go
View File

@@ -5,7 +5,6 @@ import (
"github.com/ghettovoice/gosip/sip"
"net"
"strconv"
"sync"
)
const (
@@ -19,6 +18,17 @@ const (
"%s" +
"</DeviceID>\r\n" +
"</Query>\r\n"
DeviceInfoFormat = "<?xml version=\"1.0\"?>\r\n" +
"<Query>\r\n" +
"<CmdType>DeviceInfo</CmdType>\r\n" +
"<SN>" +
"%s" +
"</SN>\r\n" +
"<DeviceID>" +
"%s" +
"</DeviceID>\r\n" +
"</Query>\r\n"
)
var (
@@ -27,29 +37,37 @@ var (
SDPMessageType sip.ContentType = "application/sdp"
)
type OnlineStatus string
const (
ON = OnlineStatus("ON")
OFF = OnlineStatus("OFF")
)
func (s OnlineStatus) String() string {
return string(s)
}
type GBDevice interface {
GetID() string
// QueryDeviceInfo 发送查询设备信息命令
QueryDeviceInfo()
// QueryCatalog 发送查询目录命令
QueryCatalog()
// QueryRecord 发送查询录像命令
QueryRecord(channelId, startTime, endTime string, sn int, type_ string) error
//Invite(channel string, setup string)
OnCatalog(response *CatalogResponse)
OnRecord(response *QueryRecordInfoResponse)
OnDeviceInfo(response *DeviceInfoResponse)
// OnInvite 语音广播
OnInvite(request sip.Request, user string) sip.Response
// OnBye 设备侧主动挂断
OnBye(request sip.Request)
OnNotifyPosition(notify *MobilePositionNotify)
//
//OnNotifyCatalog()
//
@@ -63,20 +81,6 @@ type GBDevice interface {
Broadcast(sourceId, channelId string) sip.ClientTransaction
OnKeepalive()
// AddChannels 批量添加通道
AddChannels(channels []*Channel)
// GetChannels 获取所有通道
GetChannels() []*Channel
// FindChannel 根据通道ID查找通道
FindChannel(id string) *Channel
// RemoveChannel 根据通道ID删除通道
RemoveChannel(id string) *Channel
// UpdateChannel 订阅目录,通道发生改变
// 附录P.4.2.2
// @Params event ON-上线/OFF-离线/VLOST-视频丢失/DEFECT-故障/ADD-增加/DEL-删除/UPDATE-更新
@@ -84,27 +88,42 @@ type GBDevice interface {
}
type Device struct {
ID string `json:"id"`
Name string `json:"name"`
RemoteAddr string `json:"remote_addr"`
Transport string `json:"transport"`
Status string `json:"Status,omitempty"` //在线状态 ON-在线/OFF-离线
Channels map[string]*Channel `json:"channels"`
lock sync.RWMutex
ID string `json:"id"`
Name string `json:"name"`
RemoteAddr string `json:"remote_addr"`
Transport string `json:"transport"`
Status OnlineStatus `json:"status"` //在线状态 ON-在线/OFF-离线
Manufacturer string `json:"manufacturer"`
Model string `json:"model"`
Firmware string `json:"firmware"`
ChannelsTotal int `json:"total_channels"` // 通道总数
ChannelsOnline int `json:"online_channels"` // 通道在线数量
}
func (d *Device) GetID() string {
return d.ID
}
func (d *Device) Online() bool {
return d.Status == ON
}
func (d *Device) BuildMessageRequest(to, body string) sip.Request {
request, err := BuildMessageRequest(Config.SipId, net.JoinHostPort(GlobalContactAddress.Uri.Host(), GlobalContactAddress.Uri.Port().String()), to, d.RemoteAddr, d.Transport, body)
if err != nil {
panic(err)
}
return request
}
func (d *Device) QueryDeviceInfo() {
body := fmt.Sprintf(DeviceInfoFormat, "1", d.ID)
request := d.BuildMessageRequest(d.ID, body)
SipUA.SendRequest(request)
}
func (d *Device) QueryCatalog() {
body := fmt.Sprintf(CatalogFormat, "1", d.ID)
request := d.BuildMessageRequest(d.ID, body)
@@ -122,32 +141,6 @@ func (d *Device) OnBye(request sip.Request) {
}
func (d *Device) OnCatalog(response *CatalogResponse) {
for _, device := range response.DeviceList.Devices {
device.ParentID = d.ID
}
d.AddChannels(response.DeviceList.Devices)
}
func (d *Device) OnRecord(response *QueryRecordInfoResponse) {
event := SNManager.FindEvent(response.SN)
if event == nil {
Sugar.Errorf("处理录像查询响应失败 SN:%d", response.SN)
return
}
event(response)
}
func (d *Device) OnDeviceInfo(response *DeviceInfoResponse) {
}
func (d *Device) OnNotifyPosition(notify *MobilePositionNotify) {
}
func (d *Device) SubscribePosition(channelId string) error {
if channelId == "" {
channelId = d.ID
@@ -188,60 +181,8 @@ func (d *Device) Broadcast(sourceId, channelId string) sip.ClientTransaction {
return SipUA.SendRequest(request)
}
func (d *Device) OnKeepalive() {
}
func (d *Device) AddChannels(channels []*Channel) {
d.lock.Lock()
defer d.lock.Unlock()
if d.Channels == nil {
d.Channels = make(map[string]*Channel, 5)
}
for i, _ := range channels {
d.Channels[channels[i].DeviceID] = channels[i]
}
}
func (d *Device) GetChannels() []*Channel {
d.lock.RLock()
defer d.lock.RUnlock()
var channels []*Channel
for _, channel := range d.Channels {
channels = append(channels, channel)
}
return channels
}
func (d *Device) RemoveChannel(id string) *Channel {
d.lock.Lock()
defer d.lock.Unlock()
if channel, ok := d.Channels[id]; ok {
delete(d.Channels, id)
return channel
}
return nil
}
func (d *Device) FindChannel(id string) *Channel {
d.lock.RLock()
defer d.lock.RUnlock()
if channel, ok := d.Channels[id]; ok {
return channel
}
return nil
}
func (d *Device) UpdateChannel(id string, event string) {
d.lock.RLock()
defer d.lock.RUnlock()
}
func (d *Device) BuildCatalogRequest() (sip.Request, error) {
@@ -323,7 +264,6 @@ func (d *Device) BuildDownloadRequest(channelId, ip string, port uint16, startTi
// CreateDialogRequestFromAnswer 根据invite的应答创建Dialog请求
// 应答的to头域需携带tag
func CreateDialogRequestFromAnswer(message sip.Response, uas bool, remoteAddr string) sip.Request {
from, _ := message.From()
to, _ := message.To()

View File

@@ -1,23 +0,0 @@
package main
type DeviceDB interface {
LoadDevices() []*Device
RegisterDevice(device *Device) (error, bool)
UnRegisterDevice(id string)
KeepAliveDevice(device *Device)
LoadPlatforms() []GBPlatformRecord
AddPlatform(record GBPlatformRecord) error
//RemovePlatform(record GBPlatformRecord) (GBPlatformRecord, bool)
//
//PlatformList() []GBPlatformRecord
//
//BindPlatformChannel() bool
//
//UnbindPlatformChannel() bool
}

View File

@@ -5,6 +5,7 @@ import (
"sync"
)
// DeviceManager 位于内存中的所有设备和通道
var DeviceManager *deviceManager
func init() {
@@ -33,16 +34,16 @@ func (s *deviceManager) Find(id string) GBDevice {
return nil
}
func (s *deviceManager) Remove(id string) (GBDevice, error) {
func (s *deviceManager) Remove(id string) GBDevice {
value, loaded := s.m.LoadAndDelete(id)
if loaded {
return value.(GBDevice), nil
return value.(GBDevice)
}
return nil, fmt.Errorf("device with id %s was not find", id)
return nil
}
func (s *deviceManager) AllDevices() []GBDevice {
func (s *deviceManager) All() []GBDevice {
var devices []GBDevice
s.m.Range(func(key, value any) bool {
devices = append(devices, value.(GBDevice))

9
go.mod
View File

@@ -7,8 +7,8 @@ require (
github.com/gorilla/mux v1.8.1
github.com/gorilla/websocket v1.5.1
github.com/natefinch/lumberjack v2.0.0+incompatible
github.com/sirupsen/logrus v1.9.3
github.com/x-cray/logrus-prefixed-formatter v0.5.2
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/x-cray/logrus-prefixed-formatter v0.5.2 // indirect
go.uber.org/zap v1.27.0
golang.org/x/net v0.21.0
golang.org/x/text v0.16.0
@@ -32,6 +32,9 @@ require (
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
)
require github.com/lkmio/avformat v0.0.0
require (
github.com/gomodule/redigo v1.9.2
github.com/lkmio/avformat v0.0.0
)
replace github.com/lkmio/avformat => ../avformat

2
go.sum
View File

@@ -24,6 +24,8 @@ github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrU
github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/gomodule/redigo v1.9.2 h1:HrutZBLhSIU8abiSfW8pj8mPhOyMYjZT/wcA4/L9L9s=
github.com/gomodule/redigo v1.9.2/go.mod h1:KsU3hiK/Ay8U42qpaJk+kuNa3C+spxapWpM+ywhcgtw=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=

View File

@@ -86,21 +86,21 @@ func startLive(deviceId, channelId, setup string) (bool, string) {
}
func startLiveAll(setup string) {
devices := queryAllDevices()
if len(devices) == 0 {
return
}
max := 50
for _, device := range devices {
for _, channel := range device.Channels {
go startLive(device.ID, channel.DeviceID, setup)
max--
if max < 1 {
return
}
}
}
//devices := queryAllDevices()
//if len(devices) == 0 {
// return
//}
//
//max := 50
//for _, device := range devices {
// for _, channel := range device.Channels {
// go startLive(device.ID, channel.DeviceID, setup)
// max--
// if max < 1 {
// return
// }
// }
//}
}
func TestLiveAll(t *testing.T) {

70
main.go
View File

@@ -3,6 +3,7 @@ package main
import (
"encoding/json"
"github.com/lkmio/avformat/transport"
"github.com/lkmio/avformat/utils"
"go.uber.org/zap/zapcore"
"net"
"strconv"
@@ -12,7 +13,7 @@ var (
Config *Config_
SipUA SipServer
TransportManager transport.Manager
DB DeviceDB
DB GB28181DB
)
func init() {
@@ -38,14 +39,56 @@ func main() {
indent, _ := json.MarshalIndent(Config, "", "\t")
Sugar.Infof("server config:\r\n%s", indent)
TransportManager = transport.NewTransportManager(uint16(Config.Port[0]), uint16(Config.Port[1]))
DB = NewRedisDB(Config.Redis.Addr, Config.Redis.Password)
DB = &LocalDB{}
devices := DB.LoadDevices()
for _, device := range devices {
DeviceManager.Add(device)
// 查询在线设备, 更新设备在线状态
onlineDevices, err := DB.LoadOnlineDevices()
if err != nil {
panic(err)
}
devices, err := DB.LoadDevices()
if err != nil {
panic(err)
} else if len(devices) > 0 {
for key, device := range devices {
status := OFF
if _, ok := onlineDevices[key]; ok {
status = ON
}
// 根据通道在线状态,统计通道总数和离线数量
var total int
var online int
channels, _, err := DB.QueryChannels(key, 1, 0xFFFFFFFF)
if err != nil {
Sugar.Errorf("查询通道列表失败 err: %s device: %s", err.Error(), key)
} else {
total = len(channels)
for _, channel := range channels {
if channel.Online() {
online++
}
}
}
device.ChannelsTotal = total
device.ChannelsOnline = online
device.Status = status
if err = DB.SaveDevice(device); err != nil {
Sugar.Errorf("更新设备状态失败 device: %s status: %s", key, status)
continue
}
DeviceManager.Add(device)
}
}
// 设置语音广播端口
TransportManager = transport.NewTransportManager(uint16(Config.Port[0]), uint16(Config.Port[1]))
// 启动sip server
server, err := StartSipServer(config.SipId, config.ListenIP, config.PublicIP, config.SipPort)
if err != nil {
panic(err)
@@ -55,6 +98,21 @@ func main() {
Config.SipContactAddr = net.JoinHostPort(config.PublicIP, strconv.Itoa(config.SipPort))
SipUA = server
// 启动级联设备
platforms, err := DB.LoadPlatforms()
for _, record := range platforms {
platform, err := NewGBPlatform(record, SipUA)
// 都入库了不允许失败, 程序有BUG, 及时修复
utils.Assert(err == nil)
utils.Assert(PlatformManager.AddPlatform(platform))
if err := DB.UpdatePlatformStatus(record.SeverID, OFF); err != nil {
Sugar.Infof("更新级联设备状态失败 err: %s device: %s", err.Error(), record.SeverID)
}
platform.Start()
}
httpAddr := net.JoinHostPort(config.ListenIP, strconv.Itoa(config.HttpPort))
Sugar.Infof("启动http server. addr: %s", httpAddr)
startApiServer(httpAddr)

View File

@@ -10,15 +10,17 @@ import (
"strings"
)
// GBPlatformRecord 国标级信息持久化结构体
// GBPlatformRecord 国标级联设备信息持久化结构体
type GBPlatformRecord struct {
Username string `json:"username"` //用户名
SeverID string `json:"server_id"` //上级ID, 必选
ServerAddr string `json:"server_addr"` //上级地址, 必选
Transport string `json:"transport"` //上级通信方式, UDP/TCP
Password string `json:"password"` //密码
RegisterExpires int `json:"register_expires"` //注册有效期
KeepAliveInterval int `json:"keep_alive_interval"` //心跳间隔
Username string `json:"username"` // 用户名
SeverID string `json:"server_id"` // 上级ID, 必选. 作为主键, 不能重复.
ServerAddr string `json:"server_addr"` // 上级地址, 必选
Transport string `json:"transport"` // 上级通信方式, UDP/TCP
Password string `json:"password"` // 密码
RegisterExpires int `json:"register_expires"` // 注册有效期
KeepAliveInterval int `json:"keep_alive_interval"` // 心跳间隔
CreateTime string `json:"create_time"` // 入库时间
Status OnlineStatus `json:"status"` // 在线状态
}
type GBPlatform struct {
@@ -57,20 +59,28 @@ func (g *GBPlatform) CloseStream(id string, bye, ms bool) {
// OnInvite 被上级呼叫
func (g *GBPlatform) OnInvite(request sip.Request, user string) sip.Response {
Sugar.Infof("收到上级预览 上级id: %s 请求通道id: %s sdp: %s", g.SeverId, user, request.Body())
Sugar.Infof("收到级联Invite请求 platform: %s channel: %s sdp: %s", g.SeverId, user, request.Body())
channel := g.FindChannel(user)
source := request.Source()
platform := PlatformManager.FindPlatformWithServerAddr(source)
utils.Assert(platform != nil)
deviceId, channel, err := DB.QueryPlatformChannel(g.SeverId, user)
if err != nil {
Sugar.Errorf("级联转发失败, 查询数据库失败 err: %s platform: %s channel: %s", err.Error(), g.SeverId, user)
return CreateResponseWithStatusCode(request, http.StatusInternalServerError)
}
// 查找通道对应的设备
device := DeviceManager.Find(channel.ParentID)
device := DeviceManager.Find(deviceId)
if device == nil {
Sugar.Errorf("级联转发失败 设备不存在 DeviceID: %s ChannelID: %s", channel.DeviceID, user)
Sugar.Errorf("级联转发失败, 设备不存在 device: %s channel: %s", device, user)
return CreateResponseWithStatusCode(request, http.StatusNotFound)
}
parse, ssrc, speed, media, offerSetup, answerSetup, err := ParseGBSDP(request.Body())
if err != nil {
Sugar.Errorf("级联转发失败 解析上级SDP发生错误 err: %s sdp: %s", err.Error(), request.Body())
Sugar.Errorf("级联转发失败, 解析上级SDP发生错误 err: %s sdp: %s", err.Error(), request.Body())
return CreateResponseWithStatusCode(request, http.StatusBadRequest)
}
@@ -110,7 +120,7 @@ func (g *GBPlatform) OnInvite(request sip.Request, user string) sip.Response {
ssrcInt, _ := strconv.Atoi(ssrc)
ip, port, sinkID, err := AddForwardStreamSink(string(streamId), addr, offerSetup, uint32(ssrcInt))
if err != nil {
Sugar.Errorf("级联转发失败 向流媒体服务添加转发Sink失败 err: %s", err.Error())
Sugar.Errorf("级联转发失败,向流媒体服务添加转发Sink失败 err: %s", err.Error())
if "play" != parse.Session {
CloseStream(streamId)
@@ -138,6 +148,33 @@ func (g *GBPlatform) OnInvite(request sip.Request, user string) sip.Response {
return response
}
func (g *GBPlatform) Start() {
Sugar.Infof("启动级联设备, deivce: %s transport: %s addr: %s", g.SeverId, g.sipClient.Transport, g.sipClient.Domain)
g.sipClient.Start()
g.sipClient.SetOnRegisterHandler(g.onlineCB, g.offlineCB)
}
func (g *GBPlatform) Stop() {
g.sipClient.Stop()
g.sipClient.SetOnRegisterHandler(nil, nil)
}
func (g *GBPlatform) Online() {
Sugar.Infof("级联设备上线 device: %s", g.SeverId)
if err := DB.UpdatePlatformStatus(g.SeverId, ON); err != nil {
Sugar.Infof("更新级联设备状态失败 err: %s device: %s", err.Error(), g.SeverId)
}
}
func (g *GBPlatform) Offline() {
Sugar.Infof("级联设备离线 device: %s", g.SeverId)
if err := DB.UpdatePlatformStatus(g.SeverId, OFF); err != nil {
Sugar.Infof("更新级联设备状态失败 err: %s device: %s", err.Error(), g.SeverId)
}
}
func NewGBPlatform(record *GBPlatformRecord, ua SipServer) (*GBPlatform, error) {
if len(record.SeverID) != 20 {
return nil, fmt.Errorf("SeverID must be exactly 20 characters long")

345
redis.go Normal file
View File

@@ -0,0 +1,345 @@
package main
import (
"fmt"
"github.com/gomodule/redigo/redis"
)
type RedisUtils struct {
pool *redis.Pool
password string
}
func (utils *RedisUtils) CreateExecutor() (Executor, error) {
conn := utils.pool.Get()
if utils.password != "" {
if _, err := conn.Do("auth", utils.password); err != nil {
return nil, err
}
}
//if _, err := conn.Do("select", index); err != nil {
// return nil, err
//}
return &RedisExecutor{
conn: conn,
}, nil
}
type Executor interface {
DB(index int) Executor
Key(key string) Executor
Do(commandName string, args ...interface{}) (reply interface{}, err error)
// Keys 返回所有主键
Keys() ([]string, error)
Set(value interface{}) error
Get() (interface{}, error)
Exist() (bool, error)
Del() error
HSet(k string, v interface{}) error
HGet(k string) ([]byte, error)
HGetAll() (map[string][]byte, error)
HExist(k string) (bool, error)
HDel(k string) error
HSan(page, size int) ([][]string, error)
ZAdd(score interface{}, v interface{}) error
ZAddWithNotExists(score interface{}, v interface{}) error
// ZRangeWithDesc 降序查询
ZRangeWithDesc(page, size int) ([]string, error)
// ZRangeWithAsc 升序查询
ZRangeWithAsc(page, size int) ([]string, error)
// ZRange 返回zset所有元素
ZRange() ([][2]string, error)
ZGetScore(member interface{}) (interface{}, error)
ZDel(member interface{}) error
ZDelWithScore(score interface{}) error
// CountZSet 返回zset元素个数
CountZSet() (int, error)
SetExpires(expires int) error
}
type RedisExecutor struct {
conn redis.Conn
db int
key string
}
func (e *RedisExecutor) DB(index int) Executor {
e.db = index
return e
}
func (e *RedisExecutor) Key(key string) Executor {
e.key = key
return e
}
func (e *RedisExecutor) Do(commandName string, args ...interface{}) (interface{}, error) {
if _, err := e.conn.Do("select", e.db); err != nil {
return nil, err
}
return e.conn.Do(commandName, args...)
}
func (e *RedisExecutor) Keys() ([]string, error) {
data, err := e.Do("keys", "*")
if err != nil {
return nil, err
}
var keys []string
for _, key := range data.([]interface{}) {
keys = append(keys, string(key.([]uint8)))
}
return keys, nil
}
func (e *RedisExecutor) SetExpires(expires int) error {
_, err := e.Do("expire", e.key, expires)
return err
}
// HSet 设置map元素
func (e *RedisExecutor) HSet(entryK string, entryV interface{}) error {
_, err := e.Do("hset", e.key, entryK, entryV)
return err
}
func (e *RedisExecutor) Del() error {
_, err := e.Do("del", e.key)
return err
}
func (e *RedisExecutor) HDel(entryK string) error {
_, err := e.Do("del", e.key, entryK)
return err
}
func (e *RedisExecutor) Set(value interface{}) error {
_, err := e.Do("set", e.key, value)
return err
}
func (e *RedisExecutor) Get() (interface{}, error) {
return e.Do("get", e.key)
}
func (e *RedisExecutor) Exist() (bool, error) {
return redis.Bool(e.Do("exist", e.key))
}
func (e *RedisExecutor) HGet(k string) ([]byte, error) {
data, err := e.Do("hget", e.key, k)
if err != nil {
return nil, err
} else if data == nil {
return nil, nil
}
return data.([]byte), err
}
func (e *RedisExecutor) HGetAll() (map[string][]byte, error) {
data, err := e.Do("hgetall", e.key)
if err != nil {
return nil, err
} else if data == nil {
return nil, err
}
entries := data.([]interface{})
n := len(entries) / 2
result := make(map[string][]byte, n)
for i := 0; i < n; i++ {
result[string(entries[i*2].([]uint8))] = entries[i*2+1].([]uint8)
}
return result, err
}
func (e *RedisExecutor) HExist(k string) (bool, error) {
return redis.Bool(e.Do("exist", e.key, k))
}
func (e *RedisExecutor) HSan(page, size int) ([][]string, error) {
reply, err := e.Do("hscan", e.key, (page-1)*size, "count", size)
if err != nil {
return nil, err
}
response, _ := reply.([]interface{})
_ = response[0]
data := response[1].([]interface{})
n := len(data) / 2
var result [][]string
for i := 0; i < n; i++ {
pair := make([]string, 2)
pair[0] = string(data[i*2].([]uint8))
pair[1] = string(data[i*2+1].([]uint8))
result = append(result, pair)
}
return result, err
}
func (e *RedisExecutor) ZAdd(score interface{}, v interface{}) error {
_, err := e.Do("zadd", e.key, score, v)
return err
}
func (e *RedisExecutor) ZAddWithNotExists(score interface{}, v interface{}) error {
_, err := e.Do("zadd", e.key, "nx", score, v)
return err
}
func (e *RedisExecutor) ZRangeWithDesc(page, size int) ([]string, error) {
reply, err := e.Do("zrevrange", e.key, (page-1)*size, (page-1)*size+size-1)
data := reply.([]interface{})
var result []string
for _, v := range data {
result = append(result, string(v.([]uint8)))
}
return result, err
}
func (e *RedisExecutor) ZRangeWithAsc(page, size int) ([]string, error) {
reply, err := e.Do("zrange", e.key, (page-1)*size, (page-1)*size+size-1)
data := reply.([]interface{})
var result []string
for _, v := range data {
result = append(result, string(v.([]uint8)))
}
return result, err
}
func (e *RedisExecutor) ZDel(member interface{}) error {
_, err := e.Do("zrem", e.key, member)
return err
}
func (e *RedisExecutor) ZDelWithScore(score interface{}) error {
_, err := e.Do("zremrangebyscore", e.key, score, score)
return err
}
func (e *RedisExecutor) ZRange() ([][2]string, error) {
reply, err := e.Do("zrange", e.key, 0, -1, "withscores")
if err != nil {
return nil, err
}
data := reply.([]interface{})
n := len(data) / 2
var result [][2]string
for i := 0; i < n; i++ {
var pair [2]string
pair[0] = string(data[i*2].([]uint8))
pair[1] = string(data[i*2+1].([]uint8))
result = append(result, pair)
}
return result, nil
}
func (e *RedisExecutor) ZGetScore(member interface{}) (interface{}, error) {
return e.Do("zrank", e.key, member)
}
func (e *RedisExecutor) CountZSet() (int, error) {
do, err := e.Do("zcard", e.key)
if err != nil {
return 0, err
}
return int(do.(int64)), err
}
func StartExpiredKeysSubscription(utils *RedisUtils, db int, cb func(db int, key string)) error {
conn := utils.pool.Get()
if "" != utils.password {
if _, err := conn.Do("auth", utils.password); err != nil {
return err
}
}
if _, err := conn.Do("config", "set", "protected-mode", "no"); err != nil {
return err
}
if _, err := conn.Do("config", "set", "notify-keyspace-events", "AE"); err != nil {
return err
}
redisClient := redis.PubSubConn{Conn: conn}
pattern := fmt.Sprintf("__keyevent@%d__:expired", db)
if err := redisClient.PSubscribe(pattern); err != nil {
return err
}
go func() {
for {
switch msg := redisClient.Receive().(type) {
case redis.Message:
if pattern == msg.Pattern {
key := string(msg.Data)
go cb(db, key)
}
break
case redis.Subscription:
break
case error:
break
}
}
}()
return nil
}
func NewRedisUtils(addr, password string) *RedisUtils {
return &RedisUtils{
pool: &redis.Pool{
MaxIdle: 50, // 最大空闲连接数
MaxActive: 0, // 和数据库的最大连接数0 表示没有限制
IdleTimeout: 1000, // 最大空闲时间
Dial: func() (redis.Conn, error) { // 初始化连接的代码
return redis.Dial("tcp", addr)
},
},
password: password,
}
}

101
redis_test.go Normal file
View File

@@ -0,0 +1,101 @@
package main
import (
utils2 "github.com/lkmio/avformat/utils"
"strconv"
"testing"
"time"
)
func TestRedisUtils(t *testing.T) {
utils := NewRedisUtils("localhost:6379", "")
executor, err := utils.CreateExecutor()
if err != nil {
panic(err)
}
executor.DB(1)
t.Run("map", func(t *testing.T) {
err = executor.Key("utils").HSet("user", "name")
if err != nil {
panic(err)
}
get, err2 := executor.HGet("user")
utils2.Assert(err2 == nil)
println(get)
all, err2 := executor.HGetAll()
utils2.Assert(err2 == nil)
println(all)
for i := 0; i < 10000; i++ {
executor.HSet(strconv.Itoa(i), strconv.Itoa(i))
}
executor.HSan(1, 10)
err = executor.Key("key_expires").Set("name")
err = executor.SetExpires(10)
})
t.Run("zset", func(t *testing.T) {
_, err = executor.ZRange()
err = executor.Key("zset").ZAddWithNotExists(float64(time.Now().UnixMilli()), 1)
utils2.Assert(err == nil)
err = executor.Key("zset").ZAddWithNotExists(float64(time.Now().UnixMilli()), 1)
utils2.Assert(err == nil)
for i := 0; i < 10; i++ {
if err = executor.Key("zset").ZAdd(float64(i), i); err != nil {
panic(err)
}
}
score, err := executor.Key("zset").ZGetScore(9)
utils2.Assert(err != nil)
println(score)
score, err = executor.Key("zset").ZGetScore(100)
utils2.Assert(err != nil)
println(score)
_, err = executor.ZRange()
utils2.Assert(err == nil)
asc, err2 := executor.ZRangeWithAsc(1, 5)
if err2 != nil {
panic(err2)
}
println(asc)
values, err2 := executor.ZRangeWithDesc(1, 5)
if err2 != nil {
panic(err2)
}
println(values)
for i := 0; i < 5; i++ {
executor.ZDel(i)
}
set, err := executor.CountZSet()
println(set)
})
err = StartExpiredKeysSubscription(utils, 1, func(db int, key string) {
println("redis 过期. key: " + key)
})
keys, err := executor.Keys()
utils2.Assert(err == nil)
println(keys)
select {}
}

159
sip_handler.go Normal file
View File

@@ -0,0 +1,159 @@
package main
import "strings"
// Handler 处理下级设备的消息
type Handler interface {
OnUnregister(id string)
OnRegister(id, transport, addr string) (int, GBDevice, bool)
OnKeepAlive(id string) bool
OnCatalog(device GBDevice, response *CatalogResponse)
OnRecord(device GBDevice, response *QueryRecordInfoResponse)
OnDeviceInfo(device GBDevice, response *DeviceInfoResponse)
OnNotifyPosition(notify *MobilePositionNotify)
}
type EventHandler struct {
}
func (e *EventHandler) OnUnregister(id string) {
device := DeviceManager.Find(id)
if device != nil {
device.(*Device).Status = OFF
}
if DB != nil {
_ = DB.SaveDevice(device.(*Device))
}
}
func (e *EventHandler) OnRegister(id, transport, addr string) (int, GBDevice, bool) {
// 不能和级联设备的上级ID冲突
if PlatformManager.FindPlatform(id) != nil {
Sugar.Errorf("注册失败, ID与级联设备冲突. device: %s", id)
return -1, nil, false
}
var device *Device
old := DeviceManager.Find(id)
if old != nil {
old.(*Device).ID = id
old.(*Device).Transport = transport
old.(*Device).RemoteAddr = addr
device = old.(*Device)
} else {
device = &Device{
ID: id,
Transport: transport,
RemoteAddr: addr,
}
DeviceManager.Add(device)
}
device.Status = ON
if DB != nil {
if err := DB.SaveDevice(device); err != nil {
Sugar.Errorf("保存设备信息到数据库失败 device: %s err: %s", id, err.Error())
}
}
return 3600, device, device.ChannelsTotal < 1
}
func (e *EventHandler) OnKeepAlive(id string) bool {
device := DeviceManager.Find(id)
if device == nil {
Sugar.Errorf("更新心跳失败, 设备不存在. device: %s", id)
return false
}
if !device.(*Device).Online() {
Sugar.Errorf("更新心跳失败, 设备离线. device: %s", id)
}
if DB != nil {
if err := DB.RefreshHeartbeat(id); err != nil {
Sugar.Errorf("更新有效期失败. device: %s err: %s", id, err.Error())
}
}
return true
}
func (e *EventHandler) OnCatalog(device GBDevice, response *CatalogResponse) {
if DB == nil {
return
}
id := device.GetID()
for _, channel := range response.DeviceList.Devices {
// 状态转为大写
channel.Status = OnlineStatus(strings.ToUpper(channel.Status.String()))
// 默认在线
if OFF != channel.Status {
channel.Status = ON
}
// 判断之前是否已经存在通道, 如果不存在累加总数
old, _ := DB.QueryChannel(id, channel.DeviceID)
if err := DB.SaveChannel(id, channel); err != nil {
Sugar.Infof("保存通道到数据库失败 err: %s", err.Error())
}
if old == nil {
device.(*Device).ChannelsTotal++
device.(*Device).ChannelsOnline++
} else if old.Status != channel.Status {
// 保留处理其他状态
if ON == channel.Status {
device.(*Device).ChannelsOnline++
} else if OFF == channel.Status {
device.(*Device).ChannelsOnline--
} else {
return
}
}
if err := DB.SaveDevice(device.(*Device)); err != nil {
Sugar.Errorf("更新设备在线数失败 err: %s", err.Error())
}
}
}
func (e *EventHandler) OnRecord(device GBDevice, response *QueryRecordInfoResponse) {
event := SNManager.FindEvent(response.SN)
if event == nil {
Sugar.Errorf("处理录像查询响应失败 SN: %d", response.SN)
return
}
event(response)
}
func (e *EventHandler) OnDeviceInfo(device GBDevice, response *DeviceInfoResponse) {
device.(*Device).Manufacturer = response.Manufacturer
device.(*Device).Model = response.Model
device.(*Device).Firmware = response.Firmware
device.(*Device).Name = response.DeviceName
if DB != nil {
if err := DB.SaveDevice(device.(*Device)); err != nil {
Sugar.Errorf("保存设备信息到数据库失败 device: %s err: %s", device.GetID(), err.Error())
}
}
}
func (e *EventHandler) OnNotifyPosition(notify *MobilePositionNotify) {
}

View File

@@ -60,6 +60,7 @@ type sipServer struct {
sip gosip.Server
listenAddr string
xmlReflectTypes map[string]reflect.Type
handler EventHandler
}
func (s *sipServer) Send(msg sip.Message) error {
@@ -73,36 +74,39 @@ func setToTag(response sip.Message) {
}
func (s *sipServer) OnRegister(req sip.Request, tx sip.ServerTransaction, parent bool) {
var device *Device
var query bool
var device GBDevice
var queryCatalog bool
_ = req.GetHeaders("Authorization")
fromHeader := req.GetHeaders("From")[0].(*sip.FromHeader)
expiresHeader := req.GetHeaders("Expires")
response := sip.NewResponseFromRequest("", req, 200, "OK", "")
id := fromHeader.Address.User().String()
if expiresHeader != nil && "0" == expiresHeader[0].Value() {
Sugar.Infof("注销信令 from:%s", fromHeader.Address.User())
DB.UnRegisterDevice(fromHeader.Name())
Sugar.Infof("设备注销 Device: %s", id)
s.handler.OnUnregister(id)
} else /*if authorizationHeader == nil*/ {
expires := sip.Expires(3600)
response.AppendHeader(&expires)
//sip.NewResponseFromRequest("", req, 401, "Unauthorized", "")
device = &Device{
ID: fromHeader.Address.User().String(),
Transport: req.Transport(),
RemoteAddr: req.Source(),
var expires int
expires, device, queryCatalog = s.handler.OnRegister(id, req.Transport(), req.Source())
if device != nil {
Sugar.Infof("注册成功 Device: %s", id)
expiresHeader := sip.Expires(expires)
response.AppendHeader(&expiresHeader)
} else {
Sugar.Infof("注册失败 Device: %s", id)
response = sip.NewResponseFromRequest("", req, 401, "Unauthorized", "")
}
err, b := DB.RegisterDevice(device)
query = err != nil || b
}
SendResponse(tx, response)
if device != nil && query {
if device != nil {
// 查询设备信息
device.QueryDeviceInfo()
}
if queryCatalog {
device.QueryCatalog()
}
}
@@ -182,20 +186,20 @@ func (s *sipServer) OnNotify(req sip.Request, tx sip.ServerTransaction, parent b
mobilePosition := MobilePositionNotify{}
if err := DecodeXML([]byte(req.Body()), &mobilePosition); err != nil {
Sugar.Errorf("解析位置通知失败 err:%s body:%s", err.Error(), req.Body())
Sugar.Errorf("解析位置通知失败 err: %s request: %s", err.Error(), req.String())
return
}
if device := DeviceManager.Find(mobilePosition.DeviceID); device != nil {
device.OnNotifyPosition(&mobilePosition)
s.handler.OnNotifyPosition(&mobilePosition)
}
}
func (s *sipServer) OnMessage(req sip.Request, tx sip.ServerTransaction, parent bool) {
var online bool
var ok bool
defer func() {
var response sip.Response
if online {
if ok {
response = CreateResponseWithStatusCode(req, http.StatusOK)
} else {
response = CreateResponseWithStatusCode(req, http.StatusForbidden)
@@ -209,12 +213,13 @@ func (s *sipServer) OnMessage(req sip.Request, tx sip.ServerTransaction, parent
cmd := GetCmdType(body)
src, ok := s.xmlReflectTypes[xmlName+"."+cmd]
if !ok {
Sugar.Errorf("处理XML消息失败, 找不到结构体. request: %s", req.String())
return
}
message := reflect.New(src).Interface()
if err := DecodeXML([]byte(body), message); err != nil {
Sugar.Errorf("解析xml异常 >>> %s %s", err.Error(), body)
Sugar.Errorf("解析XML消息失败 err: %s request: %s", err.Error(), body)
return
}
@@ -227,8 +232,8 @@ func (s *sipServer) OnMessage(req sip.Request, tx sip.ServerTransaction, parent
device = DeviceManager.Find(deviceId)
}
if online = device != nil; !online {
Sugar.Errorf("处理Msg失败 设备离线: %s Msg: %s", deviceId, body)
if ok = device != nil; !ok {
Sugar.Errorf("处理XML消息失败, 设备离线: %s request: %s", deviceId, req.String())
return
}
@@ -236,29 +241,54 @@ func (s *sipServer) OnMessage(req sip.Request, tx sip.ServerTransaction, parent
case XmlNameControl:
break
case XmlNameQuery:
client, ok := device.(GBClient)
// 被上级查询
var client GBClient
client, ok = device.(GBClient)
if !ok {
online = false
Sugar.Errorf("处理XML消息失败, 类型转换失败. request: %s", req.String())
return
}
if CmdDeviceInfo == cmd {
client.OnQueryDeviceInfo(message.(*BaseMessage).SN)
} else if CmdCatalog == cmd {
client.OnQueryCatalog(message.(*BaseMessage).SN)
var channels []*Channel
// 查询出所有通道
if DB != nil {
result, _, err := DB.QueryChannels(client.GetID(), 1, 0xFFFFFFFF)
if err != nil {
Sugar.Errorf("查询设备通道列表失败 err: %s device: %s", err.Error(), client.GetID())
}
channels = result
} else {
// 从模拟多个国标客户端中查找
channels = DeviceChannelsManager.FindChannels(client.GetID())
}
client.OnQueryCatalog(message.(*BaseMessage).SN, channels)
}
break
case XmlNameNotify:
if CmdKeepalive == cmd {
device.OnKeepalive()
// 下级设备心跳通知
ok = s.handler.OnKeepAlive(deviceId)
}
break
case XmlNameResponse:
// 查询下级的应答
if CmdCatalog == cmd {
device.OnCatalog(message.(*CatalogResponse))
go s.handler.OnCatalog(device, message.(*CatalogResponse))
} else if CmdRecordInfo == cmd {
device.OnRecord(message.(*QueryRecordInfoResponse))
go s.handler.OnRecord(device, message.(*QueryRecordInfoResponse))
} else if CmdDeviceInfo == cmd {
go s.handler.OnDeviceInfo(device, message.(*DeviceInfoResponse))
}
break
}
}
@@ -272,29 +302,25 @@ func SendResponseWithStatusCode(request sip.Request, tx sip.ServerTransaction, c
}
func SendResponse(tx sip.ServerTransaction, response sip.Response) bool {
Sugar.Infof("send response >>> %s", response.String())
sendError := tx.Respond(response)
if sendError != nil {
Sugar.Infof("send response error %s %s", sendError.Error(), response.String())
Sugar.Errorf("发送响应消息失败, error: %s response: %s", sendError.Error(), response.String())
}
return sendError == nil
}
func (s *sipServer) SendRequestWithContext(ctx context.Context, request sip.Request, options ...gosip.RequestWithContextOption) {
Sugar.Infof("send reqeust: %s", request.String())
s.sip.RequestWithContext(ctx, request, options...)
}
func (s *sipServer) SendRequestWithTimeout(seconds int, request sip.Request, options ...gosip.RequestWithContextOption) (sip.Response, error) {
Sugar.Infof("send reqeust: %s", request.String())
reqCtx, _ := context.WithTimeout(context.Background(), time.Duration(seconds)*time.Second)
return s.sip.RequestWithContext(reqCtx, request, options...)
}
func (s *sipServer) SendRequest(request sip.Request) sip.ClientTransaction {
Sugar.Infof("send reqeust: %s", request.String())
transaction, err := s.sip.Request(request)
if err != nil {
panic(err)
@@ -310,7 +336,6 @@ func (s *sipServer) ListenAddr() string {
// 过滤SIP消息、超找消息来源
func filterRequest(f func(req sip.Request, tx sip.ServerTransaction, parent bool)) gosip.RequestHandler {
return func(req sip.Request, tx sip.ServerTransaction) {
Sugar.Infof("process request: %s", req.String())
source := req.Source()
platform := PlatformManager.FindPlatformWithServerAddr(source)
@@ -319,6 +344,7 @@ func filterRequest(f func(req sip.Request, tx sip.ServerTransaction, parent bool
if platform == nil {
// SUBSCRIBE/INFO只能上级发起
SendResponseWithStatusCode(req, tx, http.StatusBadRequest)
Sugar.Errorf("处理%s请求失败, %s消息只能上级发起. request: %s", req.Method(), req.Method(), req.String())
return
}
break
@@ -326,6 +352,7 @@ func filterRequest(f func(req sip.Request, tx sip.ServerTransaction, parent bool
if platform != nil {
// NOTIFY和REGISTER只能下级发起
SendResponseWithStatusCode(req, tx, http.StatusBadRequest)
Sugar.Errorf("处理%s请求失败, %s消息只能下级发起. request: %s", req.Method(), req.Method(), req.String())
return
}
break

50
xml.go
View File

@@ -3,29 +3,33 @@ package main
import "encoding/xml"
type Channel struct {
DeviceID string `xml:"DeviceID"`
Name string `xml:"Name,omitempty"`
Manufacturer string `xml:"Manufacturer,omitempty"`
Model string `xml:"Model,omitempty"`
Owner string `xml:"Owner,omitempty"`
CivilCode string `xml:"CivilCode,omitempty"`
Block string `xml:"Block,omitempty"`
Address string `xml:"Address,omitempty"`
Parental string `xml:"Parental,omitempty"`
ParentID string `xml:"ParentID,omitempty"`
SafetyWay string `xml:"SafetyWay,omitempty"`
RegisterWay string `xml:"RegisterWay,omitempty"`
CertNum string `xml:"CertNum,omitempty"`
Certifiable string `xml:"Certifiable,omitempty"`
ErrCode string `xml:"ErrCode,omitempty"`
EndTime string `xml:"EndTime,omitempty"`
Secrecy string `xml:"Secrecy,omitempty"`
IPAddress string `xml:"IPAddress,omitempty"`
Port string `xml:"Port,omitempty"`
Password string `xml:"Password,omitempty"`
Status string `xml:"Status,omitempty"`
Longitude string `xml:"Longitude,omitempty"`
Latitude string `xml:"Latitude,omitempty"`
DeviceID string `xml:"DeviceID"`
Name string `xml:"Name,omitempty"`
Manufacturer string `xml:"Manufacturer,omitempty"`
Model string `xml:"Model,omitempty"`
Owner string `xml:"Owner,omitempty"`
CivilCode string `xml:"CivilCode,omitempty"`
Block string `xml:"Block,omitempty"`
Address string `xml:"Address,omitempty"`
Parental string `xml:"Parental,omitempty"`
ParentID string `xml:"ParentID,omitempty"`
SafetyWay string `xml:"SafetyWay,omitempty"`
RegisterWay string `xml:"RegisterWay,omitempty"`
CertNum string `xml:"CertNum,omitempty"`
Certifiable string `xml:"Certifiable,omitempty"`
ErrCode string `xml:"ErrCode,omitempty"`
EndTime string `xml:"EndTime,omitempty"`
Secrecy string `xml:"Secrecy,omitempty"`
IPAddress string `xml:"IPAddress,omitempty"`
Port string `xml:"Port,omitempty"`
Password string `xml:"Password,omitempty"`
Status OnlineStatus `xml:"Status,omitempty"`
Longitude string `xml:"Longitude,omitempty"`
Latitude string `xml:"Latitude,omitempty"`
}
func (d *Channel) Online() bool {
return d.Status == ON
}
type BaseMessageGetter interface {