mirror of
https://github.com/lkmio/gb-cms.git
synced 2025-09-26 19:51:22 +08:00
feat: 适配livegbs接口
This commit is contained in:
180
api.go
180
api.go
@@ -12,6 +12,7 @@ import (
|
|||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
"github.com/gorilla/websocket"
|
"github.com/gorilla/websocket"
|
||||||
"github.com/lkmio/avformat/utils"
|
"github.com/lkmio/avformat/utils"
|
||||||
|
"io"
|
||||||
"math"
|
"math"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
@@ -101,6 +102,12 @@ type PageQueryChannel struct {
|
|||||||
GroupID string `json:"group_id"`
|
GroupID string `json:"group_id"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type SetMediaTransportReq struct {
|
||||||
|
DeviceID string `json:"serial"`
|
||||||
|
MediaTransport string `json:"media_transport"`
|
||||||
|
MediaTransportMode string `json:"media_transport_mode"`
|
||||||
|
}
|
||||||
|
|
||||||
var apiServer *ApiServer
|
var apiServer *ApiServer
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
@@ -161,12 +168,15 @@ func startApiServer(addr string) {
|
|||||||
// 关闭国标流. 如果是实时流, 等收流或空闲超时自行删除. 回放或下载流立即删除.
|
// 关闭国标流. 如果是实时流, 等收流或空闲超时自行删除. 回放或下载流立即删除.
|
||||||
apiServer.router.HandleFunc("/api/v1/stream/close", common.WithJsonParams(apiServer.OnCloseStream, &StreamIDParams{}))
|
apiServer.router.HandleFunc("/api/v1/stream/close", common.WithJsonParams(apiServer.OnCloseStream, &StreamIDParams{}))
|
||||||
|
|
||||||
apiServer.router.HandleFunc("/api/v1/device/list", withVerify(common.WithQueryStringParams(apiServer.OnDeviceList, QueryDeviceChannel{}))) // 查询设备列表
|
apiServer.router.HandleFunc("/api/v1/device/list", withVerify(common.WithQueryStringParams(apiServer.OnDeviceList, QueryDeviceChannel{}))) // 查询设备列表
|
||||||
apiServer.router.HandleFunc("/api/v1/device/channellist", withVerify(common.WithQueryStringParams(apiServer.OnChannelList, QueryDeviceChannel{}))) // 查询通道列表
|
apiServer.router.HandleFunc("/api/v1/device/channellist", withVerify(common.WithQueryStringParams(apiServer.OnChannelList, QueryDeviceChannel{}))) // 查询通道列表
|
||||||
apiServer.router.HandleFunc("/api/v1/playback/recordlist", withVerify(common.WithQueryStringParams(apiServer.OnRecordList, QueryRecordParams{}))) // 查询录像列表
|
apiServer.router.HandleFunc("/api/v1/device/fetchcatalog", withVerify(common.WithQueryStringParams(apiServer.OnCatalogQuery, QueryDeviceChannel{}))) // 更新通道
|
||||||
apiServer.router.HandleFunc("/api/v1/position/sub", common.WithJsonResponse(apiServer.OnSubscribePosition, &DeviceChannelID{})) // 订阅移动位置
|
apiServer.router.HandleFunc("/api/v1/playback/recordlist", withVerify(common.WithQueryStringParams(apiServer.OnRecordList, QueryRecordParams{}))) // 查询录像列表
|
||||||
apiServer.router.HandleFunc("/api/v1/playback/seek", common.WithJsonResponse(apiServer.OnSeekPlayback, &SeekParams{})) // 回放seek
|
apiServer.router.HandleFunc("/api/v1/stream/info", withVerify(apiServer.OnStreamInfo))
|
||||||
apiServer.router.HandleFunc("/api/v1/control/ptz", apiServer.OnPTZControl) // 云台控制
|
|
||||||
|
apiServer.router.HandleFunc("/api/v1/position/sub", common.WithJsonResponse(apiServer.OnSubscribePosition, &DeviceChannelID{})) // 订阅移动位置
|
||||||
|
apiServer.router.HandleFunc("/api/v1/playback/seek", common.WithJsonResponse(apiServer.OnSeekPlayback, &SeekParams{})) // 回放seek
|
||||||
|
apiServer.router.HandleFunc("/api/v1/control/ptz", apiServer.OnPTZControl) // 云台控制
|
||||||
|
|
||||||
apiServer.router.HandleFunc("/api/v1/platform/list", apiServer.OnPlatformList) // 级联设备列表
|
apiServer.router.HandleFunc("/api/v1/platform/list", apiServer.OnPlatformList) // 级联设备列表
|
||||||
apiServer.router.HandleFunc("/api/v1/platform/add", common.WithJsonResponse(apiServer.OnPlatformAdd, &dao.PlatformModel{})) // 添加级联设备
|
apiServer.router.HandleFunc("/api/v1/platform/add", common.WithJsonResponse(apiServer.OnPlatformAdd, &dao.PlatformModel{})) // 添加级联设备
|
||||||
@@ -186,7 +196,7 @@ func startApiServer(addr string) {
|
|||||||
apiServer.router.HandleFunc("/api/v1/jt/channel/add", common.WithJsonResponse(apiServer.OnVirtualChannelAdd, &dao.ChannelModel{}))
|
apiServer.router.HandleFunc("/api/v1/jt/channel/add", common.WithJsonResponse(apiServer.OnVirtualChannelAdd, &dao.ChannelModel{}))
|
||||||
apiServer.router.HandleFunc("/api/v1/jt/channel/edit", common.WithJsonResponse(apiServer.OnVirtualChannelEdit, &dao.ChannelModel{}))
|
apiServer.router.HandleFunc("/api/v1/jt/channel/edit", common.WithJsonResponse(apiServer.OnVirtualChannelEdit, &dao.ChannelModel{}))
|
||||||
apiServer.router.HandleFunc("/api/v1/jt/channel/remove", common.WithJsonResponse(apiServer.OnVirtualChannelRemove, &dao.ChannelModel{}))
|
apiServer.router.HandleFunc("/api/v1/jt/channel/remove", common.WithJsonResponse(apiServer.OnVirtualChannelRemove, &dao.ChannelModel{}))
|
||||||
apiServer.router.HandleFunc("/api/v1/device/setmediatransport", withVerify(common.WithJsonResponse2(apiServer.OnDeviceMediaTransportSet)))
|
apiServer.router.HandleFunc("/api/v1/device/setmediatransport", withVerify(common.WithFormDataParams(apiServer.OnDeviceMediaTransportSet, SetMediaTransportReq{})))
|
||||||
|
|
||||||
registerLiveGBSApi()
|
registerLiveGBSApi()
|
||||||
|
|
||||||
@@ -321,7 +331,7 @@ func (api *ApiServer) OnPlay(params *StreamParams, w http.ResponseWriter, r *htt
|
|||||||
w.WriteHeader(code)
|
w.WriteHeader(code)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (api *ApiServer) OnPlayDone(params *PlayDoneParams, w http.ResponseWriter, r *http.Request) {
|
func (api *ApiServer) OnPlayDone(params *PlayDoneParams, _ http.ResponseWriter, _ *http.Request) {
|
||||||
log.Sugar.Debugf("播放结束事件. protocol: %s stream: %s", params.Protocol, params.Stream)
|
log.Sugar.Debugf("播放结束事件. protocol: %s stream: %s", params.Protocol, params.Stream)
|
||||||
|
|
||||||
sink, _ := dao.Sink.DeleteForwardSink(params.Stream, params.Sink)
|
sink, _ := dao.Sink.DeleteForwardSink(params.Stream, params.Sink)
|
||||||
@@ -340,7 +350,7 @@ func (api *ApiServer) OnPlayDone(params *PlayDoneParams, w http.ResponseWriter,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (api *ApiServer) OnPublish(params *StreamParams, w http.ResponseWriter, r *http.Request) {
|
func (api *ApiServer) OnPublish(params *StreamParams, w http.ResponseWriter, _ *http.Request) {
|
||||||
log.Sugar.Debugf("推流事件. protocol: %s stream: %s", params.Protocol, params.Stream)
|
log.Sugar.Debugf("推流事件. protocol: %s stream: %s", params.Protocol, params.Stream)
|
||||||
|
|
||||||
if stack.SourceTypeRtmp == params.Protocol {
|
if stack.SourceTypeRtmp == params.Protocol {
|
||||||
@@ -375,7 +385,7 @@ func (api *ApiServer) OnPublish(params *StreamParams, w http.ResponseWriter, r *
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (api *ApiServer) OnPublishDone(params *StreamParams, w http.ResponseWriter, r *http.Request) {
|
func (api *ApiServer) OnPublishDone(params *StreamParams, _ http.ResponseWriter, _ *http.Request) {
|
||||||
log.Sugar.Debugf("推流结束事件. protocol: %s stream: %s", params.Protocol, params.Stream)
|
log.Sugar.Debugf("推流结束事件. protocol: %s stream: %s", params.Protocol, params.Stream)
|
||||||
|
|
||||||
stack.CloseStream(params.Stream, false)
|
stack.CloseStream(params.Stream, false)
|
||||||
@@ -385,7 +395,7 @@ func (api *ApiServer) OnPublishDone(params *StreamParams, w http.ResponseWriter,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (api *ApiServer) OnIdleTimeout(params *StreamParams, w http.ResponseWriter, req *http.Request) {
|
func (api *ApiServer) OnIdleTimeout(params *StreamParams, w http.ResponseWriter, _ *http.Request) {
|
||||||
log.Sugar.Debugf("推流空闲超时事件. protocol: %s stream: %s", params.Protocol, params.Stream)
|
log.Sugar.Debugf("推流空闲超时事件. protocol: %s stream: %s", params.Protocol, params.Stream)
|
||||||
|
|
||||||
// 非rtmp空闲超时, 返回非200应答, 删除会话
|
// 非rtmp空闲超时, 返回非200应答, 删除会话
|
||||||
@@ -395,7 +405,7 @@ func (api *ApiServer) OnIdleTimeout(params *StreamParams, w http.ResponseWriter,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (api *ApiServer) OnReceiveTimeout(params *StreamParams, w http.ResponseWriter, req *http.Request) {
|
func (api *ApiServer) OnReceiveTimeout(params *StreamParams, w http.ResponseWriter, _ *http.Request) {
|
||||||
log.Sugar.Debugf("收流超时事件. protocol: %s stream: %s", params.Protocol, params.Stream)
|
log.Sugar.Debugf("收流超时事件. protocol: %s stream: %s", params.Protocol, params.Stream)
|
||||||
|
|
||||||
// 非rtmp推流超时, 返回非200应答, 删除会话
|
// 非rtmp推流超时, 返回非200应答, 删除会话
|
||||||
@@ -405,11 +415,11 @@ func (api *ApiServer) OnReceiveTimeout(params *StreamParams, w http.ResponseWrit
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (api *ApiServer) OnRecord(params *RecordParams, w http.ResponseWriter, req *http.Request) {
|
func (api *ApiServer) OnRecord(params *RecordParams, _ http.ResponseWriter, _ *http.Request) {
|
||||||
log.Sugar.Infof("录制事件. protocol: %s stream: %s path:%s ", params.Protocol, params.Stream, params.Path)
|
log.Sugar.Infof("录制事件. protocol: %s stream: %s path:%s ", params.Protocol, params.Stream, params.Path)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (api *ApiServer) OnInvite(v *InviteParams, w http.ResponseWriter, r *http.Request) (interface{}, error) {
|
func (api *ApiServer) OnInvite(v *InviteParams, _ http.ResponseWriter, r *http.Request) (interface{}, error) {
|
||||||
vars := mux.Vars(r)
|
vars := mux.Vars(r)
|
||||||
action := strings.ToLower(vars["action"])
|
action := strings.ToLower(vars["action"])
|
||||||
|
|
||||||
@@ -553,7 +563,7 @@ func (api *ApiServer) DoInvite(inviteType common.InviteType, params *InviteParam
|
|||||||
return http.StatusOK, stream, nil
|
return http.StatusOK, stream, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (api *ApiServer) OnCloseStream(v *StreamIDParams, w http.ResponseWriter, r *http.Request) {
|
func (api *ApiServer) OnCloseStream(v *StreamIDParams, w http.ResponseWriter, _ *http.Request) {
|
||||||
//stream := StreamManager.Find(v.StreamID)
|
//stream := StreamManager.Find(v.StreamID)
|
||||||
//
|
//
|
||||||
//// 等空闲或收流超时会自动关闭
|
//// 等空闲或收流超时会自动关闭
|
||||||
@@ -561,7 +571,7 @@ func (api *ApiServer) OnCloseStream(v *StreamIDParams, w http.ResponseWriter, r
|
|||||||
// CloseStream(v.StreamID, true)
|
// CloseStream(v.StreamID, true)
|
||||||
//}
|
//}
|
||||||
|
|
||||||
common.HttpResponseOK(w, nil)
|
_ = common.HttpResponseOK(w, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
// QueryDeviceChannel 查询设备和通道的参数
|
// QueryDeviceChannel 查询设备和通道的参数
|
||||||
@@ -581,7 +591,7 @@ type QueryDeviceChannel struct {
|
|||||||
//channelType string // device/dir, 查询通道列表使用
|
//channelType string // device/dir, 查询通道列表使用
|
||||||
}
|
}
|
||||||
|
|
||||||
func (api *ApiServer) OnDeviceList(q *QueryDeviceChannel, w http.ResponseWriter, r *http.Request) (interface{}, error) {
|
func (api *ApiServer) OnDeviceList(q *QueryDeviceChannel, _ http.ResponseWriter, r *http.Request) (interface{}, error) {
|
||||||
values := r.URL.Query()
|
values := r.URL.Query()
|
||||||
|
|
||||||
log.Sugar.Debugf("查询设备列表 %s", values.Encode())
|
log.Sugar.Debugf("查询设备列表 %s", values.Encode())
|
||||||
@@ -612,10 +622,22 @@ func (api *ApiServer) OnDeviceList(q *QueryDeviceChannel, w http.ResponseWriter,
|
|||||||
remoteIP := split[0]
|
remoteIP := split[0]
|
||||||
remotePort, _ := strconv.Atoi(split[1])
|
remotePort, _ := strconv.Atoi(split[1])
|
||||||
|
|
||||||
|
// 更新正在查询通道的进度
|
||||||
|
var catalogProgress string
|
||||||
|
data := stack.UniqueTaskManager.Find(stack.GenerateCatalogTaskID(device.GetID()))
|
||||||
|
if data != nil {
|
||||||
|
catalogSize := data.(*stack.CatalogProgress)
|
||||||
|
|
||||||
|
if catalogSize.TotalSize > 0 {
|
||||||
|
catalogProgress = fmt.Sprintf("%d/%d", catalogSize.RecvSize, catalogSize.TotalSize)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
response.DeviceList_ = append(response.DeviceList_, LiveGBSDevice{
|
response.DeviceList_ = append(response.DeviceList_, LiveGBSDevice{
|
||||||
AlarmSubscribe: false,
|
AlarmSubscribe: false, // 报警订阅
|
||||||
CatalogInterval: 3600,
|
CatalogInterval: 3600, // 目录刷新时间
|
||||||
CatalogSubscribe: false,
|
CatalogProgress: catalogProgress,
|
||||||
|
CatalogSubscribe: false, // 目录订阅
|
||||||
ChannelCount: device.ChannelsTotal,
|
ChannelCount: device.ChannelsTotal,
|
||||||
ChannelOverLoad: false,
|
ChannelOverLoad: false,
|
||||||
Charset: "GB2312",
|
Charset: "GB2312",
|
||||||
@@ -637,9 +659,9 @@ func (api *ApiServer) OnDeviceList(q *QueryDeviceChannel, w http.ResponseWriter,
|
|||||||
MediaTransportMode: device.Setup.String(),
|
MediaTransportMode: device.Setup.String(),
|
||||||
Name: device.Name,
|
Name: device.Name,
|
||||||
Online: device.Online(),
|
Online: device.Online(),
|
||||||
PTZSubscribe: false,
|
PTZSubscribe: false, // PTZ订阅2022
|
||||||
Password: "",
|
Password: "",
|
||||||
PositionSubscribe: false,
|
PositionSubscribe: false, // 位置订阅
|
||||||
RecordCenter: false,
|
RecordCenter: false,
|
||||||
RecordIndistinct: false,
|
RecordIndistinct: false,
|
||||||
RecvStreamIP: "",
|
RecvStreamIP: "",
|
||||||
@@ -658,7 +680,7 @@ func (api *ApiServer) OnDeviceList(q *QueryDeviceChannel, w http.ResponseWriter,
|
|||||||
return &response, nil
|
return &response, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (api *ApiServer) OnChannelList(q *QueryDeviceChannel, w http.ResponseWriter, r *http.Request) (interface{}, error) {
|
func (api *ApiServer) OnChannelList(q *QueryDeviceChannel, _ http.ResponseWriter, r *http.Request) (interface{}, error) {
|
||||||
values := r.URL.Query()
|
values := r.URL.Query()
|
||||||
log.Sugar.Debugf("查询通道列表 %s", values.Encode())
|
log.Sugar.Debugf("查询通道列表 %s", values.Encode())
|
||||||
|
|
||||||
@@ -765,7 +787,7 @@ func (api *ApiServer) OnChannelList(q *QueryDeviceChannel, w http.ResponseWriter
|
|||||||
return response, nil
|
return response, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (api *ApiServer) OnRecordList(v *QueryRecordParams, w http.ResponseWriter, r *http.Request) (interface{}, error) {
|
func (api *ApiServer) OnRecordList(v *QueryRecordParams, _ http.ResponseWriter, _ *http.Request) (interface{}, error) {
|
||||||
log.Sugar.Debugf("查询录像列表 %v", *v)
|
log.Sugar.Debugf("查询录像列表 %v", *v)
|
||||||
|
|
||||||
model, _ := dao.Device.QueryDevice(v.DeviceID)
|
model, _ := dao.Device.QueryDevice(v.DeviceID)
|
||||||
@@ -848,7 +870,7 @@ func (api *ApiServer) OnRecordList(v *QueryRecordParams, w http.ResponseWriter,
|
|||||||
return &response, nil
|
return &response, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (api *ApiServer) OnSubscribePosition(v *DeviceChannelID, w http.ResponseWriter, r *http.Request) (interface{}, error) {
|
func (api *ApiServer) OnSubscribePosition(v *DeviceChannelID, _ http.ResponseWriter, _ *http.Request) (interface{}, error) {
|
||||||
log.Sugar.Debugf("订阅位置 %v", *v)
|
log.Sugar.Debugf("订阅位置 %v", *v)
|
||||||
|
|
||||||
model, _ := dao.Device.QueryDevice(v.DeviceID)
|
model, _ := dao.Device.QueryDevice(v.DeviceID)
|
||||||
@@ -866,7 +888,7 @@ func (api *ApiServer) OnSubscribePosition(v *DeviceChannelID, w http.ResponseWri
|
|||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (api *ApiServer) OnSeekPlayback(v *SeekParams, w http.ResponseWriter, r *http.Request) (interface{}, error) {
|
func (api *ApiServer) OnSeekPlayback(v *SeekParams, _ http.ResponseWriter, _ *http.Request) (interface{}, error) {
|
||||||
log.Sugar.Debugf("快进回放 %v", *v)
|
log.Sugar.Debugf("快进回放 %v", *v)
|
||||||
|
|
||||||
model, _ := dao.Stream.QueryStream(v.StreamId)
|
model, _ := dao.Stream.QueryStream(v.StreamId)
|
||||||
@@ -887,11 +909,11 @@ func (api *ApiServer) OnSeekPlayback(v *SeekParams, w http.ResponseWriter, r *ht
|
|||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (api *ApiServer) OnPTZControl(w http.ResponseWriter, r *http.Request) {
|
func (api *ApiServer) OnPTZControl(_ http.ResponseWriter, _ *http.Request) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (api *ApiServer) OnHangup(v *BroadcastParams, w http.ResponseWriter, r *http.Request) (interface{}, error) {
|
func (api *ApiServer) OnHangup(v *BroadcastParams, _ http.ResponseWriter, _ *http.Request) (interface{}, error) {
|
||||||
log.Sugar.Debugf("广播挂断 %v", *v)
|
log.Sugar.Debugf("广播挂断 %v", *v)
|
||||||
|
|
||||||
id := common.GenerateStreamID(common.InviteTypeBroadcast, v.DeviceID, v.ChannelID, "", "")
|
id := common.GenerateStreamID(common.InviteTypeBroadcast, v.DeviceID, v.ChannelID, "", "")
|
||||||
@@ -902,7 +924,7 @@ func (api *ApiServer) OnHangup(v *BroadcastParams, w http.ResponseWriter, r *htt
|
|||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (api *ApiServer) OnBroadcast(v *BroadcastParams, w http.ResponseWriter, r *http.Request) (interface{}, error) {
|
func (api *ApiServer) OnBroadcast(v *BroadcastParams, _ http.ResponseWriter, r *http.Request) (interface{}, error) {
|
||||||
log.Sugar.Debugf("广播邀请 %v", *v)
|
log.Sugar.Debugf("广播邀请 %v", *v)
|
||||||
|
|
||||||
var sinkStreamId common.StreamID
|
var sinkStreamId common.StreamID
|
||||||
@@ -1003,11 +1025,11 @@ func (api *ApiServer) OnBroadcast(v *BroadcastParams, w http.ResponseWriter, r *
|
|||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (api *ApiServer) OnTalk(w http.ResponseWriter, r *http.Request) {
|
func (api *ApiServer) OnTalk(_ http.ResponseWriter, _ *http.Request) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (api *ApiServer) OnStarted(w http.ResponseWriter, req *http.Request) {
|
func (api *ApiServer) OnStarted(_ http.ResponseWriter, _ *http.Request) {
|
||||||
log.Sugar.Infof("lkm启动")
|
log.Sugar.Infof("lkm启动")
|
||||||
|
|
||||||
streams, _ := dao.Stream.DeleteStreams()
|
streams, _ := dao.Stream.DeleteStreams()
|
||||||
@@ -1021,7 +1043,7 @@ func (api *ApiServer) OnStarted(w http.ResponseWriter, req *http.Request) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (api *ApiServer) OnPlatformAdd(v *dao.PlatformModel, w http.ResponseWriter, r *http.Request) (interface{}, error) {
|
func (api *ApiServer) OnPlatformAdd(v *dao.PlatformModel, _ http.ResponseWriter, _ *http.Request) (interface{}, error) {
|
||||||
log.Sugar.Debugf("添加级联设备 %v", *v)
|
log.Sugar.Debugf("添加级联设备 %v", *v)
|
||||||
|
|
||||||
if v.Username == "" {
|
if v.Username == "" {
|
||||||
@@ -1059,7 +1081,7 @@ func (api *ApiServer) OnPlatformAdd(v *dao.PlatformModel, w http.ResponseWriter,
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (api *ApiServer) OnPlatformRemove(v *dao.PlatformModel, w http.ResponseWriter, r *http.Request) (interface{}, error) {
|
func (api *ApiServer) OnPlatformRemove(v *dao.PlatformModel, _ http.ResponseWriter, _ *http.Request) (interface{}, error) {
|
||||||
log.Sugar.Debugf("删除级联设备 %v", *v)
|
log.Sugar.Debugf("删除级联设备 %v", *v)
|
||||||
|
|
||||||
err := dao.Platform.DeleteUAByAddr(v.ServerAddr)
|
err := dao.Platform.DeleteUAByAddr(v.ServerAddr)
|
||||||
@@ -1072,12 +1094,12 @@ func (api *ApiServer) OnPlatformRemove(v *dao.PlatformModel, w http.ResponseWrit
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (api *ApiServer) OnPlatformList(w http.ResponseWriter, r *http.Request) {
|
func (api *ApiServer) OnPlatformList(_ http.ResponseWriter, _ *http.Request) {
|
||||||
//platforms := LoadPlatforms()
|
//platforms := LoadPlatforms()
|
||||||
//httpResponseOK(w, platforms)
|
//httpResponseOK(w, platforms)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (api *ApiServer) OnPlatformChannelBind(v *PlatformChannel, w http.ResponseWriter, r *http.Request) (interface{}, error) {
|
func (api *ApiServer) OnPlatformChannelBind(v *PlatformChannel, _ http.ResponseWriter, _ *http.Request) (interface{}, error) {
|
||||||
log.Sugar.Debugf("级联绑定通道 %v", *v)
|
log.Sugar.Debugf("级联绑定通道 %v", *v)
|
||||||
|
|
||||||
platform := stack.PlatformManager.Find(v.ServerAddr)
|
platform := stack.PlatformManager.Find(v.ServerAddr)
|
||||||
@@ -1096,7 +1118,7 @@ func (api *ApiServer) OnPlatformChannelBind(v *PlatformChannel, w http.ResponseW
|
|||||||
return channels, nil
|
return channels, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (api *ApiServer) OnPlatformChannelUnbind(v *PlatformChannel, w http.ResponseWriter, r *http.Request) (interface{}, error) {
|
func (api *ApiServer) OnPlatformChannelUnbind(v *PlatformChannel, _ http.ResponseWriter, _ *http.Request) (interface{}, error) {
|
||||||
log.Sugar.Debugf("级联解绑通道 %v", *v)
|
log.Sugar.Debugf("级联解绑通道 %v", *v)
|
||||||
|
|
||||||
platform := stack.PlatformManager.Find(v.ServerAddr)
|
platform := stack.PlatformManager.Find(v.ServerAddr)
|
||||||
@@ -1114,26 +1136,94 @@ func (api *ApiServer) OnPlatformChannelUnbind(v *PlatformChannel, w http.Respons
|
|||||||
return channels, nil
|
return channels, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (api *ApiServer) OnDeviceMediaTransportSet(w http.ResponseWriter, r *http.Request) (interface{}, error) {
|
func (api *ApiServer) OnDeviceMediaTransportSet(req *SetMediaTransportReq, _ http.ResponseWriter, _ *http.Request) (interface{}, error) {
|
||||||
serial := r.FormValue("serial")
|
|
||||||
mediaTransport := r.FormValue("media_transport")
|
|
||||||
mediaTransportMode := r.FormValue("media_transport_mode")
|
|
||||||
|
|
||||||
var setupType common.SetupType
|
var setupType common.SetupType
|
||||||
if "udp" == strings.ToLower(mediaTransport) {
|
if "udp" == strings.ToLower(req.MediaTransport) {
|
||||||
setupType = common.SetupTypeUDP
|
setupType = common.SetupTypeUDP
|
||||||
} else if "passive" == strings.ToLower(mediaTransportMode) {
|
} else if "passive" == strings.ToLower(req.MediaTransportMode) {
|
||||||
setupType = common.SetupTypePassive
|
setupType = common.SetupTypePassive
|
||||||
} else if "active" == strings.ToLower(mediaTransportMode) {
|
} else if "active" == strings.ToLower(req.MediaTransportMode) {
|
||||||
setupType = common.SetupTypeActive
|
setupType = common.SetupTypeActive
|
||||||
} else {
|
} else {
|
||||||
return nil, fmt.Errorf("media_transport_mode error")
|
return nil, fmt.Errorf("media_transport_mode error")
|
||||||
}
|
}
|
||||||
|
|
||||||
err := dao.Device.UpdateMediaTransport(serial, setupType)
|
err := dao.Device.UpdateMediaTransport(req.DeviceID, setupType)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return "OK", nil
|
return "OK", nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (api *ApiServer) OnCatalogQuery(params *QueryDeviceChannel, _ http.ResponseWriter, _ *http.Request) (interface{}, error) {
|
||||||
|
deviceModel, err := dao.Device.QueryDevice(params.DeviceID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if deviceModel == nil {
|
||||||
|
return nil, fmt.Errorf("not found device")
|
||||||
|
}
|
||||||
|
|
||||||
|
list, err := (&stack.Device{deviceModel}).QueryCatalog(15)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
response := struct {
|
||||||
|
ChannelCount int `json:"ChannelCount"`
|
||||||
|
ChannelList []*dao.ChannelModel `json:"ChannelList"`
|
||||||
|
}{
|
||||||
|
ChannelCount: len(list),
|
||||||
|
ChannelList: list,
|
||||||
|
}
|
||||||
|
return &response, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (api *ApiServer) OnStreamInfo(w http.ResponseWriter, r *http.Request) {
|
||||||
|
// 构建目标URL
|
||||||
|
targetURL := common.Config.MediaServer + r.URL.Path
|
||||||
|
if r.URL.RawQuery != "" {
|
||||||
|
targetURL += "?" + r.URL.RawQuery
|
||||||
|
}
|
||||||
|
|
||||||
|
// 创建转发请求
|
||||||
|
proxyReq, err := http.NewRequest(r.Method, targetURL, r.Body)
|
||||||
|
if err != nil {
|
||||||
|
http.Error(w, "Error creating proxy request", http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// 复制请求头
|
||||||
|
for name, values := range r.Header {
|
||||||
|
for _, value := range values {
|
||||||
|
proxyReq.Header.Add(name, value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 发送请求
|
||||||
|
client := &http.Client{
|
||||||
|
Timeout: 30 * time.Second,
|
||||||
|
}
|
||||||
|
resp, err := client.Do(proxyReq)
|
||||||
|
if err != nil {
|
||||||
|
http.Error(w, "Error forwarding request", http.StatusBadGateway)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
// 复制响应头
|
||||||
|
for name, values := range resp.Header {
|
||||||
|
for _, value := range values {
|
||||||
|
w.Header().Add(name, value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 设置状态码并转发响应体
|
||||||
|
w.WriteHeader(resp.StatusCode)
|
||||||
|
_, err = io.Copy(w, resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
log.Sugar.Errorf("Failed to copy response body: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@@ -89,8 +89,8 @@ func registerLiveGBSApi() {
|
|||||||
DeviceTotal int `json:"DeviceTotal"`
|
DeviceTotal int `json:"DeviceTotal"`
|
||||||
}{
|
}{
|
||||||
ChannelCount: 16,
|
ChannelCount: 16,
|
||||||
ChannelOnline: 1,
|
ChannelOnline: ChannelOnlineCount,
|
||||||
ChannelTotal: 1,
|
ChannelTotal: ChannelTotalCount,
|
||||||
DeviceOnline: stack.OnlineDeviceManager.Count(),
|
DeviceOnline: stack.OnlineDeviceManager.Count(),
|
||||||
DeviceTotal: dao.DeviceCount,
|
DeviceTotal: dao.DeviceCount,
|
||||||
}
|
}
|
||||||
@@ -120,7 +120,7 @@ func registerLiveGBSApi() {
|
|||||||
Hardware: KernelArch,
|
Hardware: KernelArch,
|
||||||
InterfaceVersion: "v1",
|
InterfaceVersion: "v1",
|
||||||
|
|
||||||
RemainDays: 0,
|
RemainDays: 99,
|
||||||
RunningTime: FormatUptime(GetUptime()),
|
RunningTime: FormatUptime(GetUptime()),
|
||||||
Server: "github.com/lkmio/gb-cms dev",
|
Server: "github.com/lkmio/gb-cms dev",
|
||||||
ServerTime: time.Now().Format("2006-01-02 15:04:05"),
|
ServerTime: time.Now().Format("2006-01-02 15:04:05"),
|
||||||
|
@@ -92,17 +92,6 @@ func WithJsonResponse[T any](f func(params T, w http.ResponseWriter, req *http.R
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func WithJsonResponse2(f func(w http.ResponseWriter, req *http.Request) (interface{}, error)) func(http.ResponseWriter, *http.Request) {
|
|
||||||
return func(w http.ResponseWriter, req *http.Request) {
|
|
||||||
responseBody, err := f(w, req)
|
|
||||||
if err != nil {
|
|
||||||
_ = HttpResponseError(w, err.Error())
|
|
||||||
} else if responseBody != nil {
|
|
||||||
_ = HttpResponseJson(w, responseBody)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithQueryStringParams[T any](f func(params T, w http.ResponseWriter, req *http.Request) (interface{}, error), params interface{}) func(http.ResponseWriter, *http.Request) {
|
func WithQueryStringParams[T any](f func(params T, w http.ResponseWriter, req *http.Request) (interface{}, error), params interface{}) func(http.ResponseWriter, *http.Request) {
|
||||||
return func(w http.ResponseWriter, req *http.Request) {
|
return func(w http.ResponseWriter, req *http.Request) {
|
||||||
var newParams T
|
var newParams T
|
||||||
@@ -125,7 +114,8 @@ func WithQueryStringParams[T any](f func(params T, w http.ResponseWriter, req *h
|
|||||||
|
|
||||||
responseBody, err := f(result.(T), w, req)
|
responseBody, err := f(result.(T), w, req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
_ = HttpResponseError(w, err.Error())
|
w.WriteHeader(http.StatusBadRequest)
|
||||||
|
_ = HttpResponseJson(w, err.Error())
|
||||||
} else if responseBody != nil {
|
} else if responseBody != nil {
|
||||||
_ = HttpResponseJson(w, responseBody)
|
_ = HttpResponseJson(w, responseBody)
|
||||||
}
|
}
|
||||||
@@ -153,7 +143,7 @@ func WithFormDataParams[T any](f func(params T, w http.ResponseWriter, req *http
|
|||||||
|
|
||||||
responseBody, err := f(result.(T), w, req)
|
responseBody, err := f(result.(T), w, req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
_ = HttpResponseError(w, err.Error())
|
_ = HttpResponseJson(w, err.Error())
|
||||||
} else if responseBody != nil {
|
} else if responseBody != nil {
|
||||||
_ = HttpResponseJson(w, responseBody)
|
_ = HttpResponseJson(w, responseBody)
|
||||||
}
|
}
|
||||||
|
@@ -9,7 +9,7 @@
|
|||||||
"alive_expires": 180,
|
"alive_expires": 180,
|
||||||
"mobile_position_interval": 10,
|
"mobile_position_interval": 10,
|
||||||
|
|
||||||
"media_server": "0.0.0.0:8080",
|
"media_server": "http://0.0.0.0:8080",
|
||||||
|
|
||||||
"?auto_close_on_idle": "拉流空闲时, 立即关闭流",
|
"?auto_close_on_idle": "拉流空闲时, 立即关闭流",
|
||||||
"auto_close_on_idle": true,
|
"auto_close_on_idle": true,
|
||||||
|
@@ -65,6 +65,8 @@ func (d *ChannelModel) Online() bool {
|
|||||||
type DaoChannel interface {
|
type DaoChannel interface {
|
||||||
SaveChannel(channel *ChannelModel) error
|
SaveChannel(channel *ChannelModel) error
|
||||||
|
|
||||||
|
SaveChannels(channel []*ChannelModel) error
|
||||||
|
|
||||||
UpdateChannelStatus(deviceId, channelId, status string) error
|
UpdateChannelStatus(deviceId, channelId, status string) error
|
||||||
|
|
||||||
QueryChannelByID(id uint) (*ChannelModel, error)
|
QueryChannelByID(id uint) (*ChannelModel, error)
|
||||||
@@ -93,9 +95,15 @@ type DaoChannel interface {
|
|||||||
|
|
||||||
DeleteChannel(deviceId string, channelId string) error
|
DeleteChannel(deviceId string, channelId string) error
|
||||||
|
|
||||||
|
DeleteChannels(deviceId string) error
|
||||||
|
|
||||||
UpdateRootID(rootId, newRootId string) error
|
UpdateRootID(rootId, newRootId string) error
|
||||||
|
|
||||||
UpdateChannel(channel *ChannelModel) error
|
UpdateChannel(channel *ChannelModel) error
|
||||||
|
|
||||||
|
TotalCount() (int, error)
|
||||||
|
|
||||||
|
OnlineCount(ids []string) (int, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type daoChannel struct {
|
type daoChannel struct {
|
||||||
@@ -111,6 +119,12 @@ func (d *daoChannel) SaveChannel(channel *ChannelModel) error {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *daoChannel) SaveChannels(channels []*ChannelModel) error {
|
||||||
|
return DBTransaction(func(tx *gorm.DB) error {
|
||||||
|
return tx.Save(channels).Error
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func (d *daoChannel) UpdateChannelStatus(deviceId, channelId, status string) error {
|
func (d *daoChannel) UpdateChannelStatus(deviceId, channelId, status string) error {
|
||||||
return db.Model(&ChannelModel{}).Where("root_id =? and device_id =?", deviceId, channelId).Update("status", status).Error
|
return db.Model(&ChannelModel{}).Where("root_id =? and device_id =?", deviceId, channelId).Update("status", status).Error
|
||||||
}
|
}
|
||||||
@@ -235,6 +249,10 @@ func (d *daoChannel) SaveJTChannel(channel *ChannelModel) error {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *daoChannel) DeleteChannels(deviceId string) error {
|
||||||
|
return db.Where("root_id =?", deviceId).Unscoped().Delete(&ChannelModel{}).Error
|
||||||
|
}
|
||||||
|
|
||||||
func (d *daoChannel) DeleteChannel(deviceId string, channelId string) error {
|
func (d *daoChannel) DeleteChannel(deviceId string, channelId string) error {
|
||||||
return db.Where("root_id =? and device_id =?", deviceId, channelId).Unscoped().Delete(&ChannelModel{}).Error
|
return db.Where("root_id =? and device_id =?", deviceId, channelId).Unscoped().Delete(&ChannelModel{}).Error
|
||||||
}
|
}
|
||||||
@@ -262,3 +280,21 @@ func (d *daoChannel) UpdateChannel(channel *ChannelModel) error {
|
|||||||
return tx.Model(channel).Where("id =?", channel.ID).Updates(channel).Error
|
return tx.Model(channel).Where("id =?", channel.ID).Updates(channel).Error
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *daoChannel) TotalCount() (int, error) {
|
||||||
|
var total int64
|
||||||
|
tx := db.Model(&ChannelModel{}).Count(&total)
|
||||||
|
if tx.Error != nil {
|
||||||
|
return 0, tx.Error
|
||||||
|
}
|
||||||
|
return int(total), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *daoChannel) OnlineCount(ids []string) (int, error) {
|
||||||
|
var total int64
|
||||||
|
tx := db.Model(&ChannelModel{}).Where("status =? and root_id in ?", "ON", ids).Count(&total)
|
||||||
|
if tx.Error != nil {
|
||||||
|
return 0, tx.Error
|
||||||
|
}
|
||||||
|
return int(total), nil
|
||||||
|
}
|
||||||
|
@@ -3,6 +3,7 @@ package main
|
|||||||
type LiveGBSDevice struct {
|
type LiveGBSDevice struct {
|
||||||
AlarmSubscribe bool `json:"AlarmSubscribe"`
|
AlarmSubscribe bool `json:"AlarmSubscribe"`
|
||||||
CatalogInterval int `json:"CatalogInterval"`
|
CatalogInterval int `json:"CatalogInterval"`
|
||||||
|
CatalogProgress string `json:"CatalogProgress,omitempty"` // 查询目录进度recvSize/totalSize
|
||||||
CatalogSubscribe bool `json:"CatalogSubscribe"`
|
CatalogSubscribe bool `json:"CatalogSubscribe"`
|
||||||
ChannelCount int `json:"ChannelCount"`
|
ChannelCount int `json:"ChannelCount"`
|
||||||
ChannelOverLoad bool `json:"ChannelOverLoad"`
|
ChannelOverLoad bool `json:"ChannelOverLoad"`
|
||||||
|
@@ -1,361 +1,370 @@
|
|||||||
package stack
|
package stack
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/binary"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"gb-cms/common"
|
||||||
|
"gb-cms/dao"
|
||||||
|
"github.com/ghettovoice/gosip/sip"
|
||||||
|
"net"
|
||||||
|
"net/http"
|
||||||
|
"os"
|
||||||
|
"strconv"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
rtpPackets [][]byte
|
||||||
|
locks map[uint32]*sync.RWMutex
|
||||||
|
)
|
||||||
|
|
||||||
|
type MediaStream struct {
|
||||||
|
ssrc uint32
|
||||||
|
tcp bool
|
||||||
|
conn net.Conn
|
||||||
|
//transport transport.Transport
|
||||||
|
cancel context.CancelFunc
|
||||||
|
dialog sip.Request
|
||||||
|
ctx context.Context
|
||||||
|
|
||||||
|
closedCB func(sendBye bool)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MediaStream) write() {
|
||||||
|
//var index int
|
||||||
|
//length := len(rtpPackets)
|
||||||
|
//for m.ctx.Err() == nil && index < length {
|
||||||
|
// time.Sleep(time.Millisecond * 40)
|
||||||
|
//
|
||||||
|
// //一次发送某个时间范围内的所有rtp包
|
||||||
|
// ts := binary.BigEndian.Uint32(rtpPackets[index][2+4:])
|
||||||
|
// mutex := locks[ts]
|
||||||
|
// {
|
||||||
|
// mutex.Lock()
|
||||||
|
//
|
||||||
|
// for ; m.ctx.Err() == nil && index < length; index++ {
|
||||||
|
// bytes := rtpPackets[index]
|
||||||
|
// nextTS := binary.BigEndian.Uint32(bytes[2+4:])
|
||||||
|
// if nextTS != ts {
|
||||||
|
// break
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// rtp.ModifySSRC(bytes[2:], m.ssrc)
|
||||||
|
//
|
||||||
|
// if m.tcp {
|
||||||
|
// m.conn.Write(bytes)
|
||||||
|
// } else {
|
||||||
|
// m.transport.(*transport.UDPClient).Write(bytes[2:])
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// mutex.Unlock()
|
||||||
|
// }
|
||||||
|
//}
|
||||||
|
|
||||||
|
println("推流结束")
|
||||||
|
m.Close(true)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MediaStream) Start() {
|
||||||
|
m.ctx, m.cancel = context.WithCancel(context.Background())
|
||||||
|
go m.write()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MediaStream) Close(sendBye bool) {
|
||||||
|
m.cancel()
|
||||||
|
|
||||||
|
if m.closedCB != nil {
|
||||||
|
m.closedCB(sendBye)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MediaStream) OnConnected(conn net.Conn) []byte {
|
||||||
|
m.conn = conn
|
||||||
|
fmt.Printf("tcp连接:%s", conn.RemoteAddr())
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MediaStream) OnPacket(conn net.Conn, data []byte) []byte {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MediaStream) OnDisConnected(conn net.Conn, err error) {
|
||||||
|
fmt.Printf("tcp断开连接:%s", conn.RemoteAddr())
|
||||||
|
m.Close(true)
|
||||||
|
}
|
||||||
|
|
||||||
|
type vDevice struct {
|
||||||
|
*gbClient
|
||||||
|
streams map[string]*MediaStream
|
||||||
|
lock sync.Locker
|
||||||
|
}
|
||||||
|
|
||||||
|
// func CreateTransport(ip string, port int, setup string, handler transport.Handler) (transport.Transport, bool, error) {
|
||||||
|
// if "passive" == setup {
|
||||||
|
// tcpClient := &transport.TCPClient{}
|
||||||
|
// tcpClient.SetHandler(handler)
|
||||||
//
|
//
|
||||||
//import (
|
// _, err := tcpClient.Connect(nil, &net.TCPAddr{IP: net.ParseIP(ip), Port: port})
|
||||||
// "context"
|
// return tcpClient, true, err
|
||||||
// "encoding/binary"
|
// } else if "active" == setup {
|
||||||
// "encoding/json"
|
// tcpServer := &transport.TCPServer{}
|
||||||
// "fmt"
|
// tcpServer.SetHandler(handler)
|
||||||
// "github.com/ghettovoice/gosip/sip"
|
// err := tcpServer.Bind(nil)
|
||||||
// "github.com/lkmio/rtp"
|
|
||||||
// "github.com/lkmio/transport"
|
|
||||||
// "net"
|
|
||||||
// "net/http"
|
|
||||||
// "os"
|
|
||||||
// "strconv"
|
|
||||||
// "strings"
|
|
||||||
// "sync"
|
|
||||||
// "testing"
|
|
||||||
// "time"
|
|
||||||
//)
|
|
||||||
//
|
//
|
||||||
//var (
|
// return tcpServer, true, err
|
||||||
// rtpPackets [][]byte
|
// } else {
|
||||||
// locks map[uint32]*sync.RWMutex
|
// udp := &transport.UDPClient{}
|
||||||
//)
|
// err := udp.Connect(nil, &net.UDPAddr{IP: net.ParseIP(ip), Port: port})
|
||||||
//
|
// return udp, false, err
|
||||||
//type MediaStream struct {
|
|
||||||
// ssrc uint32
|
|
||||||
// tcp bool
|
|
||||||
// conn net.Conn
|
|
||||||
// transport transport.Transport
|
|
||||||
// cancel context.CancelFunc
|
|
||||||
// dialog sip.Request
|
|
||||||
// ctx context.Context
|
|
||||||
//
|
|
||||||
// closedCB func(sendBye bool)
|
|
||||||
//}
|
|
||||||
//
|
|
||||||
//func (m *MediaStream) write() {
|
|
||||||
// var index int
|
|
||||||
// length := len(rtpPackets)
|
|
||||||
// for m.ctx.Err() == nil && index < length {
|
|
||||||
// time.Sleep(time.Millisecond * 40)
|
|
||||||
//
|
|
||||||
// //一次发送某个时间范围内的所有rtp包
|
|
||||||
// ts := binary.BigEndian.Uint32(rtpPackets[index][2+4:])
|
|
||||||
// mutex := locks[ts]
|
|
||||||
// {
|
|
||||||
// mutex.Lock()
|
|
||||||
//
|
|
||||||
// for ; m.ctx.Err() == nil && index < length; index++ {
|
|
||||||
// bytes := rtpPackets[index]
|
|
||||||
// nextTS := binary.BigEndian.Uint32(bytes[2+4:])
|
|
||||||
// if nextTS != ts {
|
|
||||||
// break
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// rtp.ModifySSRC(bytes[2:], m.ssrc)
|
|
||||||
//
|
|
||||||
// if m.tcp {
|
|
||||||
// m.conn.Write(bytes)
|
|
||||||
// } else {
|
|
||||||
// m.transport.(*transport.UDPClient).Write(bytes[2:])
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// mutex.Unlock()
|
|
||||||
// }
|
// }
|
||||||
// }
|
// }
|
||||||
//
|
func (v vDevice) OnInvite(request sip.Request, user string) sip.Response {
|
||||||
// println("推流结束")
|
//if len(rtpPackets) < 1 {
|
||||||
// m.Close(true)
|
return CreateResponseWithStatusCode(request, http.StatusInternalServerError)
|
||||||
//}
|
//}
|
||||||
//
|
//
|
||||||
//func (m *MediaStream) Start() {
|
//offer, ssrc, speed, media, offerSetup, answerSetup, err := ParseGBSDP(request.Body())
|
||||||
// m.ctx, m.cancel = context.WithCancel(context.Background())
|
//if err != nil {
|
||||||
// go m.write()
|
// return CreateResponseWithStatusCode(request, http.StatusBadRequest)
|
||||||
//}
|
//}
|
||||||
//
|
//
|
||||||
//func (m *MediaStream) Close(sendBye bool) {
|
//stream := &MediaStream{}
|
||||||
// m.cancel()
|
//socket, tcp, err := CreateTransport(offer.Addr, int(media.Port), offerSetup, stream)
|
||||||
//
|
//if err != nil {
|
||||||
// if m.closedCB != nil {
|
// return CreateResponseWithStatusCode(request, http.StatusBadRequest)
|
||||||
// m.closedCB(sendBye)
|
//}
|
||||||
// }
|
//
|
||||||
//}
|
//time := strings.Split(offer.Time, " ")
|
||||||
//
|
//if len(time) < 2 {
|
||||||
//func (m *MediaStream) OnConnected(conn net.Conn) []byte {
|
// return CreateResponseWithStatusCode(request, http.StatusBadRequest)
|
||||||
// m.conn = conn
|
//}
|
||||||
// fmt.Printf("tcp连接:%s", conn.RemoteAddr())
|
//
|
||||||
// return nil
|
//var ip string
|
||||||
//}
|
//var port sip.Port
|
||||||
//
|
//var contactAddr string
|
||||||
//func (m *MediaStream) OnPacket(conn net.Conn, data []byte) []byte {
|
//if v.sipUA.NatAddr != "" {
|
||||||
// return nil
|
// contactAddr = v.sipUA.NatAddr
|
||||||
//}
|
//} else {
|
||||||
//
|
// contactAddr = v.sipUA.ListenAddr
|
||||||
//func (m *MediaStream) OnDisConnected(conn net.Conn, err error) {
|
//}
|
||||||
// fmt.Printf("tcp断开连接:%s", conn.RemoteAddr())
|
//
|
||||||
// m.Close(true)
|
//host, p, _ := net.SplitHostPort(contactAddr)
|
||||||
//}
|
//ip = host
|
||||||
//
|
//atoi, _ := strconv.Atoi(p)
|
||||||
//type Platform struct {
|
//port = sip.Port(atoi)
|
||||||
// *gbClient
|
//
|
||||||
// streams map[string]*MediaStream
|
//contactAddress := &sip.Address{
|
||||||
// lock sync.Locker
|
// Uri: &sip.SipUri{
|
||||||
//}
|
// FUser: sip.String{Str: user},
|
||||||
//
|
// FHost: ip,
|
||||||
//func CreateTransport(ip string, port int, setup string, handler transport.Handler) (transport.Transport, bool, error) {
|
// FPort: &port,
|
||||||
// if "passive" == setup {
|
// },
|
||||||
// tcpClient := &transport.TCPClient{}
|
//}
|
||||||
// tcpClient.SetHandler(handler)
|
//
|
||||||
//
|
//answer := BuildSDP(user, offer.Session, ip, uint16(socket.ListenPort()), time[0], time[1], answerSetup, speed, ssrc)
|
||||||
// _, err := tcpClient.Connect(nil, &net.TCPAddr{IP: net.ParseIP(ip), Port: port})
|
//response := CreateResponseWithStatusCode(request, http.StatusOK)
|
||||||
// return tcpClient, true, err
|
//response.RemoveHeader("Contact")
|
||||||
// } else if "active" == setup {
|
//response.AppendHeader(contactAddress.AsContactHeader())
|
||||||
// tcpServer := &transport.TCPServer{}
|
//response.AppendHeader(&SDPMessageType)
|
||||||
// tcpServer.SetHandler(handler)
|
//response.SetBody(answer, true)
|
||||||
// err := tcpServer.Bind(nil)
|
//setToTag(response)
|
||||||
//
|
//
|
||||||
// return tcpServer, true, err
|
//i, _ := strconv.Atoi(ssrc)
|
||||||
// } else {
|
//stream.ssrc = uint32(i)
|
||||||
// udp := &transport.UDPClient{}
|
//stream.tcp = tcp
|
||||||
// err := udp.Connect(nil, &net.UDPAddr{IP: net.ParseIP(ip), Port: port})
|
//stream.dialog = CreateDialogRequestFromAnswer(response, true, v.sipUA.Domain)
|
||||||
// return udp, false, err
|
//callId, _ := response.CallID()
|
||||||
// }
|
//
|
||||||
//}
|
//{
|
||||||
//
|
// v.lock.Lock()
|
||||||
//func (v Platform) OnInvite(request sip.Request, user string) sip.Response {
|
// defer v.lock.Unlock()
|
||||||
// if len(rtpPackets) < 1 {
|
// v.streams[callId.Value()] = stream
|
||||||
// return CreateResponseWithStatusCode(request, http.StatusInternalServerError)
|
//}
|
||||||
// }
|
//
|
||||||
//
|
//// 设置网络断开回调
|
||||||
// offer, ssrc, speed, media, offerSetup, answerSetup, err := ParseGBSDP(request.Body())
|
//stream.closedCB = func(sendBye bool) {
|
||||||
// if err != nil {
|
// if stream.dialog != nil {
|
||||||
// return CreateResponseWithStatusCode(request, http.StatusBadRequest)
|
// id, _ := stream.dialog.CallID()
|
||||||
// }
|
// StreamManager.RemoveWithCallId(id.Value())
|
||||||
//
|
//
|
||||||
// stream := &MediaStream{}
|
// {
|
||||||
// socket, tcp, err := CreateTransport(offer.Addr, int(media.Port), offerSetup, stream)
|
// v.lock.Lock()
|
||||||
// if err != nil {
|
// delete(v.streams, id.Value())
|
||||||
// return CreateResponseWithStatusCode(request, http.StatusBadRequest)
|
// v.lock.Unlock()
|
||||||
// }
|
// }
|
||||||
//
|
//
|
||||||
// time := strings.Split(offer.Time, " ")
|
// if sendBye {
|
||||||
// if len(time) < 2 {
|
// bye := CreateRequestFromDialog(stream.dialog, sip.BYE)
|
||||||
// return CreateResponseWithStatusCode(request, http.StatusBadRequest)
|
// v.sipUA.stack.SendRequest(bye)
|
||||||
// }
|
// }
|
||||||
//
|
//
|
||||||
// var ip string
|
// stream.dialog = nil
|
||||||
// var port sip.Port
|
// }
|
||||||
// var contactAddr string
|
//
|
||||||
// if v.sipUA.NatAddr != "" {
|
// if stream.transport != nil {
|
||||||
// contactAddr = v.sipUA.NatAddr
|
// stream.transport.Close()
|
||||||
// } else {
|
// stream.transport = nil
|
||||||
// contactAddr = v.sipUA.ListenAddr
|
// }
|
||||||
// }
|
//}
|
||||||
//
|
//
|
||||||
// host, p, _ := net.SplitHostPort(contactAddr)
|
//stream.transport = socket
|
||||||
// ip = host
|
//stream.Start()
|
||||||
// atoi, _ := strconv.Atoi(p)
|
//
|
||||||
// port = sip.Port(atoi)
|
//// 绑定到StreamManager, bye请求才会找到设备回调
|
||||||
//
|
//streamId := common.GenerateStreamID(common.InviteTypePlay, v.sipUA.Username, user, "", "")
|
||||||
// contactAddress := &sip.Address{
|
//s := StreamID{StreamID: streamId, Dialog: stream.dialog}
|
||||||
// Uri: &sip.SipUri{
|
//StreamManager.Add(&s)
|
||||||
// FUser: sip.String{Str: user},
|
//
|
||||||
// FHost: ip,
|
//callID, _ := request.CallID()
|
||||||
// FPort: &port,
|
//StreamManager.AddWithCallId(callID.Value(), &s)
|
||||||
// },
|
//return response
|
||||||
// }
|
}
|
||||||
//
|
|
||||||
// answer := BuildSDP(user, offer.Session, ip, uint16(socket.ListenPort()), time[0], time[1], answerSetup, speed, ssrc)
|
func (v vDevice) OnBye(request sip.Request) {
|
||||||
// response := CreateResponseWithStatusCode(request, http.StatusOK)
|
id, _ := request.CallID()
|
||||||
// response.RemoveHeader("Contact")
|
stream, ok := v.streams[id.Value()]
|
||||||
// response.AppendHeader(contactAddress.AsContactHeader())
|
if !ok {
|
||||||
// response.AppendHeader(&SDPMessageType)
|
return
|
||||||
// response.SetBody(answer, true)
|
}
|
||||||
// setToTag(response)
|
|
||||||
//
|
{
|
||||||
// i, _ := strconv.Atoi(ssrc)
|
// 此作用域内defer不会生效
|
||||||
// stream.ssrc = uint32(i)
|
v.lock.Lock()
|
||||||
// stream.tcp = tcp
|
delete(v.streams, id.Value())
|
||||||
// stream.dialog = CreateDialogRequestFromAnswer(response, true, v.sipUA.Domain)
|
v.lock.Unlock()
|
||||||
// callId, _ := response.CallID()
|
}
|
||||||
//
|
|
||||||
// {
|
stream.Close(false)
|
||||||
// v.lock.Lock()
|
}
|
||||||
// defer v.lock.Unlock()
|
|
||||||
// v.streams[callId.Value()] = stream
|
func (v vDevice) Offline() {
|
||||||
// }
|
for _, stream := range v.streams {
|
||||||
//
|
stream.Close(true)
|
||||||
// // 设置网络断开回调
|
}
|
||||||
// stream.closedCB = func(sendBye bool) {
|
|
||||||
// if stream.dialog != nil {
|
v.streams = nil
|
||||||
// id, _ := stream.dialog.CallID()
|
}
|
||||||
// StreamManager.RemoveWithCallId(id.Value())
|
|
||||||
//
|
type ClientConfig struct {
|
||||||
// {
|
DeviceIDPrefix string `json:"device_id_prefix"`
|
||||||
// v.lock.Lock()
|
ChannelIDPrefix string `json:"channel_id_prefix"`
|
||||||
// delete(v.streams, id.Value())
|
ServerID string `json:"server_id"`
|
||||||
// v.lock.Unlock()
|
Domain string `json:"domain"`
|
||||||
// }
|
Password string `json:"password"`
|
||||||
//
|
ListenAddr string `json:"listenAddr"`
|
||||||
// if sendBye {
|
Count int `json:"count"`
|
||||||
// bye := CreateRequestFromDialog(stream.dialog, sip.BYE)
|
RawFilePath string `json:"rtp_over_tcp_raw_file_path"` // rtp over tcp源文件
|
||||||
// v.sipUA.stack.SendRequest(bye)
|
}
|
||||||
// }
|
|
||||||
//
|
func TestGBClient(t *testing.T) {
|
||||||
// stream.dialog = nil
|
configData, err := os.ReadFile("./client_benchmark_test_config.json")
|
||||||
// }
|
if err != nil {
|
||||||
//
|
panic(err)
|
||||||
// if stream.transport != nil {
|
}
|
||||||
// stream.transport.Close()
|
|
||||||
// stream.transport = nil
|
clientConfig := &ClientConfig{}
|
||||||
// }
|
if err = json.Unmarshal(configData, clientConfig); err != nil {
|
||||||
// }
|
panic(err)
|
||||||
//
|
}
|
||||||
// stream.transport = socket
|
|
||||||
// stream.Start()
|
rtpData, err := os.ReadFile(clientConfig.RawFilePath)
|
||||||
//
|
if err != nil {
|
||||||
// // 绑定到StreamManager, bye请求才会找到设备回调
|
println("读取rtp源文件失败 不能推流")
|
||||||
// streamId := GenerateStreamID(InviteTypePlay, v.sipUA.Username, user, "", "")
|
} else {
|
||||||
// s := StreamID{StreamID: streamId, Dialog: stream.dialog}
|
// 分割rtp包
|
||||||
// StreamManager.Add(&s)
|
offset := 2
|
||||||
//
|
length := len(rtpData)
|
||||||
// callID, _ := request.CallID()
|
locks = make(map[uint32]*sync.RWMutex, 128)
|
||||||
// StreamManager.AddWithCallId(callID.Value(), &s)
|
for rtpSize := 0; offset < length; offset += rtpSize + 2 {
|
||||||
// return response
|
rtpSize = int(binary.BigEndian.Uint16(rtpData[offset-2:]))
|
||||||
//}
|
if length-offset < rtpSize {
|
||||||
//
|
break
|
||||||
//func (v Platform) OnBye(request sip.Request) {
|
}
|
||||||
// id, _ := request.CallID()
|
|
||||||
// stream, ok := v.streams[id.Value()]
|
bytes := rtpData[offset : offset+rtpSize]
|
||||||
// if !ok {
|
ts := binary.BigEndian.Uint32(bytes[4:])
|
||||||
// return
|
// 每个相同时间戳共用一把互斥锁, 只允许同时一路流发送该时间戳内的rtp包, 保护ssrc被不同的流修改
|
||||||
// }
|
if _, ok := locks[ts]; !ok {
|
||||||
//
|
locks[ts] = &sync.RWMutex{}
|
||||||
// {
|
}
|
||||||
// // 此作用域内defer不会生效
|
|
||||||
// v.lock.Lock()
|
rtpPackets = append(rtpPackets, rtpData[offset-2:offset+rtpSize])
|
||||||
// delete(v.streams, id.Value())
|
}
|
||||||
// v.lock.Unlock()
|
}
|
||||||
// }
|
|
||||||
//
|
println("========================================")
|
||||||
// stream.Close(false)
|
println("源码地址: https://github.com/lkmio/gb-cms")
|
||||||
//}
|
println("视频来源于网络,如有侵权,请联系删除")
|
||||||
//
|
println("========================================\r\n")
|
||||||
//func (v Platform) Offline() {
|
|
||||||
// for _, stream := range v.streams {
|
time.Sleep(3 * time.Second)
|
||||||
// stream.Close(true)
|
|
||||||
// }
|
// 初始化UA配置, 防止SipServer使用时空指针
|
||||||
//
|
common.Config = &common.Config_{}
|
||||||
// v.streams = nil
|
|
||||||
//}
|
listenIP, listenPort, err := net.SplitHostPort(clientConfig.ListenAddr)
|
||||||
//
|
if err != nil {
|
||||||
//type ClientConfig struct {
|
panic(err)
|
||||||
// DeviceIDPrefix string `json:"device_id_prefix"`
|
}
|
||||||
// ChannelIDPrefix string `json:"channel_id_prefix"`
|
|
||||||
// ServerAddr string `json:"server_id"`
|
atoi, err := strconv.Atoi(listenPort)
|
||||||
// Domain string `json:"domain"`
|
if err != nil {
|
||||||
// Password string `json:"password"`
|
panic(err)
|
||||||
// ListenAddr string `json:"listenAddr"`
|
}
|
||||||
// Count int `json:"count"`
|
|
||||||
// RawFilePath string `json:"rtp_over_tcp_raw_file_path"` // rtp over tcp源文件
|
server, err := StartSipServer("", listenIP, listenIP, atoi)
|
||||||
//}
|
if err != nil {
|
||||||
//
|
panic(err)
|
||||||
//func TestGBClient(t *testing.T) {
|
}
|
||||||
// configData, err := os.ReadFile("./client_benchmark_test_config.json")
|
DeviceChannelsManager = &DeviceChannels{
|
||||||
// if err != nil {
|
channels: make(map[string][]*dao.ChannelModel, clientConfig.Count),
|
||||||
// panic(err)
|
}
|
||||||
// }
|
|
||||||
//
|
for i := 0; i < clientConfig.Count; i++ {
|
||||||
// clientConfig := &ClientConfig{}
|
deviceId := clientConfig.DeviceIDPrefix + fmt.Sprintf("%07d", i+1)
|
||||||
// if err = json.Unmarshal(configData, clientConfig); err != nil {
|
options := &common.SIPUAOptions{
|
||||||
// panic(err)
|
Username: deviceId,
|
||||||
// }
|
ServerID: clientConfig.ServerID,
|
||||||
//
|
ServerAddr: clientConfig.Domain,
|
||||||
// rtpData, err := os.ReadFile(clientConfig.RawFilePath)
|
Transport: "UDP",
|
||||||
// if err != nil {
|
Password: clientConfig.Password,
|
||||||
// println("读取rtp源文件失败 不能推流")
|
RegisterExpires: 500,
|
||||||
// } else {
|
KeepaliveInterval: 40,
|
||||||
// // 分割rtp包
|
}
|
||||||
// offset := 2
|
client := NewGBClient(options, server)
|
||||||
// length := len(rtpData)
|
|
||||||
// locks = make(map[uint32]*sync.RWMutex, 128)
|
device := vDevice{client.(*gbClient), map[string]*MediaStream{}, &sync.Mutex{}}
|
||||||
// for rtpSize := 0; offset < length; offset += rtpSize + 2 {
|
device.SetDeviceInfo(fmt.Sprintf("测试设备%d", i+1), "lkmio", "lkmio_gb", "dev-0.0.1")
|
||||||
// rtpSize = int(binary.BigEndian.Uint16(rtpData[offset-2:]))
|
|
||||||
// if length-offset < rtpSize {
|
DeviceManager.Add(deviceId, device)
|
||||||
// break
|
|
||||||
// }
|
for j := 0; j < 100; j++ {
|
||||||
//
|
channelId := clientConfig.ChannelIDPrefix + fmt.Sprintf("%07d", i+1+j)
|
||||||
// bytes := rtpData[offset : offset+rtpSize]
|
channel := &dao.ChannelModel{
|
||||||
// ts := binary.BigEndian.Uint32(bytes[4:])
|
DeviceID: channelId,
|
||||||
// // 每个相同时间戳共用一把互斥锁, 只允许同时一路流发送该时间戳内的rtp包, 保护ssrc被不同的流修改
|
Name: "1",
|
||||||
// if _, ok := locks[ts]; !ok {
|
ParentID: deviceId,
|
||||||
// locks[ts] = &sync.RWMutex{}
|
}
|
||||||
// }
|
|
||||||
//
|
DeviceChannelsManager.AddChannel(deviceId, channel)
|
||||||
// rtpPackets = append(rtpPackets, rtpData[offset-2:offset+rtpSize])
|
}
|
||||||
// }
|
|
||||||
// }
|
device.Start()
|
||||||
//
|
|
||||||
// println("========================================")
|
device.SetOnRegisterHandler(func() {
|
||||||
// println("源码地址: https://github.com/lkmio/gb-cms")
|
fmt.Printf(deviceId + " 注册成功\r\n")
|
||||||
// println("视频来源于网络,如有侵权,请联系删除")
|
}, func() {
|
||||||
// println("========================================\r\n")
|
fmt.Printf(deviceId + " 离线\r\n")
|
||||||
//
|
device.Offline()
|
||||||
// time.Sleep(3 * time.Second)
|
})
|
||||||
//
|
}
|
||||||
// // 初始化UA配置, 防止SipServer使用时空指针
|
|
||||||
// Config = &Config_{}
|
for {
|
||||||
//
|
time.Sleep(time.Second * 3)
|
||||||
// listenIP, listenPort, err := net.SplitHostPort(clientConfig.ListenAddr)
|
}
|
||||||
// if err != nil {
|
}
|
||||||
// panic(err)
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// atoi, err := strconv.Atoi(listenPort)
|
|
||||||
// if err != nil {
|
|
||||||
// panic(err)
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// server, err := StartSipServer("", listenIP, listenIP, atoi)
|
|
||||||
// if err != nil {
|
|
||||||
// panic(err)
|
|
||||||
// }
|
|
||||||
// DeviceChannelsManager = &DeviceChannels{
|
|
||||||
// channels: make(map[string][]*Channel, clientConfig.Count),
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// for i := 0; i < clientConfig.Count; i++ {
|
|
||||||
// deviceId := clientConfig.DeviceIDPrefix + fmt.Sprintf("%07d", i+1)
|
|
||||||
// channelId := clientConfig.ChannelIDPrefix + fmt.Sprintf("%07d", i+1)
|
|
||||||
// client := NewGBClient(deviceId, clientConfig.ServerAddr, clientConfig.Domain, "UDP", clientConfig.Password, 500, 40, server)
|
|
||||||
//
|
|
||||||
// device := Platform{client.(*gbClient), map[string]*MediaStream{}, &sync.Mutex{}}
|
|
||||||
// device.SetDeviceInfo(fmt.Sprintf("测试设备%d", i+1), "lkmio", "lkmio_gb", "dev-0.0.1")
|
|
||||||
//
|
|
||||||
// channel := &Channel{
|
|
||||||
// DeviceID: channelId,
|
|
||||||
// Name: "1",
|
|
||||||
// ParentID: deviceId,
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// DeviceManager.Add(device)
|
|
||||||
// DeviceChannelsManager.AddChannel(deviceId, channel)
|
|
||||||
//
|
|
||||||
// device.Start()
|
|
||||||
//
|
|
||||||
// device.SetOnRegisterHandler(func() {
|
|
||||||
// fmt.Printf(deviceId + " 注册成功\r\n")
|
|
||||||
// }, func() {
|
|
||||||
// fmt.Printf(deviceId + " 离线\r\n")
|
|
||||||
// device.Offline()
|
|
||||||
// })
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// for {
|
|
||||||
// time.Sleep(time.Second * 3)
|
|
||||||
// }
|
|
||||||
//}
|
|
||||||
|
@@ -3,10 +3,10 @@
|
|||||||
"channel_id_prefix": "3402000000131",
|
"channel_id_prefix": "3402000000131",
|
||||||
"server_id": "34020000002000000001",
|
"server_id": "34020000002000000001",
|
||||||
"?domain": "国标上级域的地址",
|
"?domain": "国标上级域的地址",
|
||||||
"domain": "192.168.2.148:5060",
|
"domain": "160.202.253.143:15060",
|
||||||
"password": "12345678",
|
"password": "12345678",
|
||||||
"listenAddr": "192.168.2.148:15062",
|
"listenAddr": "192.168.2.119:15062",
|
||||||
"count": 100,
|
"count": 1,
|
||||||
"?rtp_over_tcp_raw_file_path": "rtp over tcp的推流源文件",
|
"?rtp_over_tcp_raw_file_path": "rtp over tcp的推流源文件",
|
||||||
"rtp_over_tcp_raw_file_path": "./rtp.raw"
|
"rtp_over_tcp_raw_file_path": "./rtp.raw"
|
||||||
}
|
}
|
@@ -17,6 +17,12 @@ var (
|
|||||||
clients: make(map[string]GBClient, 8), // username->client
|
clients: make(map[string]GBClient, 8), // username->client
|
||||||
addrMap: make(map[string]int, 8),
|
addrMap: make(map[string]int, 8),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DeviceManager 模拟国标设备
|
||||||
|
DeviceManager = &ClientManager{
|
||||||
|
clients: make(map[string]GBClient, 8), // username->client
|
||||||
|
addrMap: make(map[string]int, 8),
|
||||||
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
type ClientManager struct {
|
type ClientManager struct {
|
||||||
|
198
stack/device.go
198
stack/device.go
@@ -1,12 +1,18 @@
|
|||||||
package stack
|
package stack
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"gb-cms/common"
|
"gb-cms/common"
|
||||||
"gb-cms/dao"
|
"gb-cms/dao"
|
||||||
|
"gb-cms/log"
|
||||||
"github.com/ghettovoice/gosip/sip"
|
"github.com/ghettovoice/gosip/sip"
|
||||||
"net"
|
"net"
|
||||||
|
"net/http"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@@ -14,7 +20,7 @@ const (
|
|||||||
"<Query>\r\n" +
|
"<Query>\r\n" +
|
||||||
"<CmdType>Catalog</CmdType>\r\n" +
|
"<CmdType>Catalog</CmdType>\r\n" +
|
||||||
"<SN>" +
|
"<SN>" +
|
||||||
"%s" +
|
"%d" +
|
||||||
"</SN>\r\n" +
|
"</SN>\r\n" +
|
||||||
"<DeviceID>" +
|
"<DeviceID>" +
|
||||||
"%s" +
|
"%s" +
|
||||||
@@ -25,7 +31,7 @@ const (
|
|||||||
"<Query>\r\n" +
|
"<Query>\r\n" +
|
||||||
"<CmdType>DeviceInfo</CmdType>\r\n" +
|
"<CmdType>DeviceInfo</CmdType>\r\n" +
|
||||||
"<SN>" +
|
"<SN>" +
|
||||||
"%s" +
|
"%d" +
|
||||||
"</SN>\r\n" +
|
"</SN>\r\n" +
|
||||||
"<DeviceID>" +
|
"<DeviceID>" +
|
||||||
"%s" +
|
"%s" +
|
||||||
@@ -46,7 +52,7 @@ type GBDevice interface {
|
|||||||
QueryDeviceInfo()
|
QueryDeviceInfo()
|
||||||
|
|
||||||
// QueryCatalog 发送查询目录命令
|
// QueryCatalog 发送查询目录命令
|
||||||
QueryCatalog()
|
QueryCatalog(timeout int) ([]*dao.ChannelModel, error)
|
||||||
|
|
||||||
// QueryRecord 发送查询录像命令
|
// QueryRecord 发送查询录像命令
|
||||||
QueryRecord(channelId, startTime, endTime string, sn int, type_ string) error
|
QueryRecord(channelId, startTime, endTime string, sn int, type_ string) error
|
||||||
@@ -80,6 +86,11 @@ type GBDevice interface {
|
|||||||
Close()
|
Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type CatalogProgress struct {
|
||||||
|
TotalSize int
|
||||||
|
RecvSize int
|
||||||
|
}
|
||||||
|
|
||||||
type Device struct {
|
type Device struct {
|
||||||
*dao.DeviceModel
|
*dao.DeviceModel
|
||||||
}
|
}
|
||||||
@@ -94,15 +105,180 @@ func (d *Device) BuildMessageRequest(to, body string) sip.Request {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (d *Device) QueryDeviceInfo() {
|
func (d *Device) QueryDeviceInfo() {
|
||||||
body := fmt.Sprintf(DeviceInfoFormat, "1", d.DeviceID)
|
body := fmt.Sprintf(DeviceInfoFormat, GetSN(), d.DeviceID)
|
||||||
request := d.BuildMessageRequest(d.DeviceID, body)
|
request := d.BuildMessageRequest(d.DeviceID, body)
|
||||||
common.SipStack.SendRequest(request)
|
common.SipStack.SendRequest(request)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Device) QueryCatalog() {
|
func (d *Device) QueryCatalog(timeoutSeconds int) ([]*dao.ChannelModel, error) {
|
||||||
body := fmt.Sprintf(CatalogFormat, "1", d.DeviceID)
|
catalogProgress := &CatalogProgress{}
|
||||||
request := d.BuildMessageRequest(d.DeviceID, body)
|
|
||||||
common.SipStack.SendRequest(request)
|
var timeoutCtx context.Context
|
||||||
|
var timeoutCancelFunc context.CancelFunc
|
||||||
|
if timeoutSeconds > 0 {
|
||||||
|
timeoutCtx, timeoutCancelFunc = context.WithTimeout(context.Background(), time.Duration(timeoutSeconds)*time.Second)
|
||||||
|
}
|
||||||
|
|
||||||
|
var err error
|
||||||
|
var result []*dao.ChannelModel
|
||||||
|
query := func() {
|
||||||
|
defer func() {
|
||||||
|
if timeoutCancelFunc != nil {
|
||||||
|
timeoutCancelFunc()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// 下发查询指令
|
||||||
|
finish := make(chan byte, 1)
|
||||||
|
sn := GetSN()
|
||||||
|
body := fmt.Sprintf(CatalogFormat, sn, d.DeviceID)
|
||||||
|
request := d.BuildMessageRequest(d.DeviceID, body)
|
||||||
|
tx := common.SipStack.SendRequest(request)
|
||||||
|
// 异步等待响应
|
||||||
|
go func() {
|
||||||
|
response := <-tx.Responses()
|
||||||
|
if response != nil && response.StatusCode() != http.StatusOK {
|
||||||
|
err = fmt.Errorf("query catalog res[%d] %s", response.StatusCode(), StatusCode2Reason(int(response.StatusCode())))
|
||||||
|
finish <- 1
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// 处理目录消息
|
||||||
|
lastTime := time.Now()
|
||||||
|
var list []*CatalogResponse
|
||||||
|
SNManager.AddEvent(sn, func(response interface{}) {
|
||||||
|
lastTime = time.Now()
|
||||||
|
catalog := response.(*CatalogResponse)
|
||||||
|
catalogProgress.TotalSize = catalog.SumNum
|
||||||
|
catalogProgress.RecvSize += catalog.DeviceList.Num
|
||||||
|
|
||||||
|
list = append(list, catalog)
|
||||||
|
if catalogProgress.RecvSize >= catalogProgress.TotalSize {
|
||||||
|
finish <- 1
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
// 定时检测是否超时或完成
|
||||||
|
timeout := 10 * time.Second
|
||||||
|
ticker := time.NewTicker(timeout)
|
||||||
|
for {
|
||||||
|
var end bool
|
||||||
|
select {
|
||||||
|
case <-ticker.C:
|
||||||
|
if time.Since(lastTime) > timeout {
|
||||||
|
// 超时, 则直接返回
|
||||||
|
err = fmt.Errorf("query catalog timeout[%ds]", int(timeout.Seconds()))
|
||||||
|
ticker.Stop()
|
||||||
|
end = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
case <-finish:
|
||||||
|
ticker.Stop()
|
||||||
|
end = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
if end {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// 如果查询不完整, 并且数据库中通道列表不为空, 则丢弃本次查询的数据, 否则依旧入库
|
||||||
|
var oldChannelCount int
|
||||||
|
oldChannelCount, err = dao.Channel.QueryChanelCount(d.DeviceID)
|
||||||
|
if err != nil {
|
||||||
|
log.Sugar.Errorf("query channel count failed, device: %s, err: %s", d.DeviceID, err.Error())
|
||||||
|
return
|
||||||
|
} else if len(list) < 1 || (oldChannelCount > 0 && catalogProgress.RecvSize < catalogProgress.TotalSize) {
|
||||||
|
log.Sugar.Errorf("query catalog failed, device: %s, count: %d, recvSize: %d, totalSize: %d", d.DeviceID, oldChannelCount, catalogProgress.RecvSize, catalogProgress.TotalSize)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// 删除旧的通道列表
|
||||||
|
if oldChannelCount > 0 {
|
||||||
|
err = dao.Channel.DeleteChannels(d.DeviceID)
|
||||||
|
if err != nil {
|
||||||
|
log.Sugar.Errorf("delete channels failed, device: %s, err: %s", d.DeviceID, err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 批量保存通道
|
||||||
|
result, err = d.SaveChannels(list)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !UniqueTaskManager.Commit(GenerateCatalogTaskID(d.DeviceID), query, catalogProgress) {
|
||||||
|
return nil, errors.New("device busy")
|
||||||
|
}
|
||||||
|
|
||||||
|
// web接口的查询超时
|
||||||
|
if timeoutCtx != nil {
|
||||||
|
select {
|
||||||
|
case <-timeoutCtx.Done():
|
||||||
|
if err == nil && catalogProgress.RecvSize < catalogProgress.TotalSize {
|
||||||
|
err = fmt.Errorf("wait for catalog[%d/%d] timeout[%ds]", catalogProgress.RecvSize, catalogProgress.TotalSize, timeoutSeconds)
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return result, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *Device) SaveChannels(list []*CatalogResponse) ([]*dao.ChannelModel, error) {
|
||||||
|
var channels []*dao.ChannelModel
|
||||||
|
for _, response := range list {
|
||||||
|
for _, channel := range response.DeviceList.Devices {
|
||||||
|
// 状态转为大写
|
||||||
|
channel.Status = common.OnlineStatus(strings.ToUpper(channel.Status.String()))
|
||||||
|
|
||||||
|
// 默认在线
|
||||||
|
if common.OFF != channel.Status {
|
||||||
|
channel.Status = common.ON
|
||||||
|
}
|
||||||
|
|
||||||
|
// 下级设备的系统ID, 更新DeviceInfo
|
||||||
|
if channel.DeviceID == d.DeviceID && dao.Device.ExistDevice(d.DeviceID) {
|
||||||
|
_ = dao.Device.UpdateDeviceInfo(d.DeviceID, &dao.DeviceModel{
|
||||||
|
Manufacturer: channel.Manufacturer,
|
||||||
|
Model: channel.Model,
|
||||||
|
Name: channel.Name,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
typeCode := GetTypeCode(channel.DeviceID)
|
||||||
|
if typeCode == "" {
|
||||||
|
log.Sugar.Errorf("保存通道时, 获取设备类型失败 device: %s", channel.DeviceID)
|
||||||
|
}
|
||||||
|
|
||||||
|
var groupId string
|
||||||
|
if channel.ParentID != "" {
|
||||||
|
layers := strings.Split(channel.ParentID, "/")
|
||||||
|
groupId = layers[len(layers)-1]
|
||||||
|
} else if channel.BusinessGroupID != "" {
|
||||||
|
groupId = channel.BusinessGroupID
|
||||||
|
}
|
||||||
|
|
||||||
|
code, _ := strconv.Atoi(typeCode)
|
||||||
|
channel.RootID = d.DeviceID
|
||||||
|
channel.TypeCode = code
|
||||||
|
channel.GroupID = groupId
|
||||||
|
channels = append(channels, channel)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
err := dao.Channel.SaveChannels(channels)
|
||||||
|
if err != nil {
|
||||||
|
log.Sugar.Errorf("save channels failed, device: %s, err: %s", d.DeviceID, err.Error())
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return channels, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Device) QueryRecord(channelId, startTime, endTime string, sn int, type_ string) error {
|
func (d *Device) QueryRecord(channelId, startTime, endTime string, sn int, type_ string) error {
|
||||||
@@ -123,7 +299,7 @@ func (d *Device) SubscribePosition(channelId string) error {
|
|||||||
|
|
||||||
//暂时不考虑级联
|
//暂时不考虑级联
|
||||||
builder := d.NewRequestBuilder(sip.SUBSCRIBE, common.Config.SipID, common.Config.SipContactAddr, channelId)
|
builder := d.NewRequestBuilder(sip.SUBSCRIBE, common.Config.SipID, common.Config.SipContactAddr, channelId)
|
||||||
body := fmt.Sprintf(MobilePositionMessageFormat, 1, channelId, common.Config.MobilePositionInterval)
|
body := fmt.Sprintf(MobilePositionMessageFormat, GetSN(), channelId, common.Config.MobilePositionInterval)
|
||||||
|
|
||||||
expiresHeader := sip.Expires(common.Config.MobilePositionExpires)
|
expiresHeader := sip.Expires(common.Config.MobilePositionExpires)
|
||||||
builder.SetExpires(&expiresHeader)
|
builder.SetExpires(&expiresHeader)
|
||||||
@@ -151,7 +327,7 @@ func (d *Device) SubscribePosition(channelId string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (d *Device) Broadcast(sourceId, channelId string) sip.ClientTransaction {
|
func (d *Device) Broadcast(sourceId, channelId string) sip.ClientTransaction {
|
||||||
body := fmt.Sprintf(BroadcastFormat, 1, sourceId, channelId)
|
body := fmt.Sprintf(BroadcastFormat, GetSN(), sourceId, channelId)
|
||||||
request := d.BuildMessageRequest(channelId, body)
|
request := d.BuildMessageRequest(channelId, body)
|
||||||
return common.SipStack.SendRequest(request)
|
return common.SipStack.SendRequest(request)
|
||||||
}
|
}
|
||||||
@@ -161,7 +337,7 @@ func (d *Device) UpdateChannel(id string, event string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (d *Device) BuildCatalogRequest() (sip.Request, error) {
|
func (d *Device) BuildCatalogRequest() (sip.Request, error) {
|
||||||
body := fmt.Sprintf(CatalogFormat, "1", d.DeviceID)
|
body := fmt.Sprintf(CatalogFormat, GetSN(), d.DeviceID)
|
||||||
request := d.BuildMessageRequest(d.DeviceID, body)
|
request := d.BuildMessageRequest(d.DeviceID, body)
|
||||||
return request, nil
|
return request, nil
|
||||||
}
|
}
|
||||||
|
@@ -79,7 +79,7 @@ func SendWithUrlParams(path string, body interface{}, values url.Values) (*http.
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
url := fmt.Sprintf("http://%s/%s", common.Config.MediaServer, path)
|
url := fmt.Sprintf("%s/%s", common.Config.MediaServer, path)
|
||||||
|
|
||||||
data, err := json.Marshal(body)
|
data, err := json.Marshal(body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@@ -41,6 +41,16 @@ func (m *onlineDeviceManager) Count() int {
|
|||||||
return len(m.devices)
|
return len(m.devices)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *onlineDeviceManager) GetDeviceIds() []string {
|
||||||
|
m.lock.RLock()
|
||||||
|
defer m.lock.RUnlock()
|
||||||
|
ids := make([]string, 0, len(m.devices))
|
||||||
|
for id := range m.devices {
|
||||||
|
ids = append(ids, id)
|
||||||
|
}
|
||||||
|
return ids
|
||||||
|
}
|
||||||
|
|
||||||
func (m *onlineDeviceManager) Start(interval time.Duration, keepalive time.Duration, OnExpires func(platformId int, deviceId string)) {
|
func (m *onlineDeviceManager) Start(interval time.Duration, keepalive time.Duration, OnExpires func(platformId int, deviceId string)) {
|
||||||
// 精度有偏差
|
// 精度有偏差
|
||||||
var timer *time.Timer
|
var timer *time.Timer
|
||||||
|
@@ -5,8 +5,6 @@ import (
|
|||||||
"gb-cms/dao"
|
"gb-cms/dao"
|
||||||
"gb-cms/log"
|
"gb-cms/log"
|
||||||
"github.com/lkmio/avformat/utils"
|
"github.com/lkmio/avformat/utils"
|
||||||
"strconv"
|
|
||||||
"strings"
|
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -66,44 +64,10 @@ func (e *EventHandler) OnKeepAlive(id string, addr string) bool {
|
|||||||
|
|
||||||
func (e *EventHandler) OnCatalog(device string, response *CatalogResponse) {
|
func (e *EventHandler) OnCatalog(device string, response *CatalogResponse) {
|
||||||
utils.Assert(device == response.DeviceID)
|
utils.Assert(device == response.DeviceID)
|
||||||
for _, channel := range response.DeviceList.Devices {
|
if event := SNManager.FindEvent(response.SN); event != nil {
|
||||||
// 状态转为大写
|
event(response)
|
||||||
channel.Status = common.OnlineStatus(strings.ToUpper(channel.Status.String()))
|
} else {
|
||||||
|
log.Sugar.Errorf("处理目录响应失败 SN: %d", response.SN)
|
||||||
// 默认在线
|
|
||||||
if common.OFF != channel.Status {
|
|
||||||
channel.Status = common.ON
|
|
||||||
}
|
|
||||||
|
|
||||||
// 下级设备的系统ID, 更新DeviceInfo
|
|
||||||
if channel.DeviceID == device && dao.Device.ExistDevice(device) {
|
|
||||||
_ = dao.Device.UpdateDeviceInfo(device, &dao.DeviceModel{
|
|
||||||
Manufacturer: channel.Manufacturer,
|
|
||||||
Model: channel.Model,
|
|
||||||
Name: channel.Name,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
typeCode := GetTypeCode(channel.DeviceID)
|
|
||||||
if typeCode == "" {
|
|
||||||
log.Sugar.Errorf("保存通道时, 获取设备类型失败 device: %s", channel.DeviceID)
|
|
||||||
}
|
|
||||||
|
|
||||||
var groupId string
|
|
||||||
if channel.ParentID != "" {
|
|
||||||
layers := strings.Split(channel.ParentID, "/")
|
|
||||||
groupId = layers[len(layers)-1]
|
|
||||||
} else if channel.BusinessGroupID != "" {
|
|
||||||
groupId = channel.BusinessGroupID
|
|
||||||
}
|
|
||||||
|
|
||||||
code, _ := strconv.Atoi(typeCode)
|
|
||||||
channel.RootID = device
|
|
||||||
channel.TypeCode = code
|
|
||||||
channel.GroupID = groupId
|
|
||||||
if err := dao.Channel.SaveChannel(channel); err != nil {
|
|
||||||
log.Sugar.Infof("保存通道到数据库失败 err: %s", err.Error())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -97,7 +97,7 @@ func (s *sipServer) OnRegister(wrapper *SipRequestSource) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if queryCatalog {
|
if queryCatalog {
|
||||||
device.QueryCatalog()
|
_, _ = device.QueryCatalog(0)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -250,6 +250,8 @@ func (s *sipServer) OnMessage(wrapper *SipRequestSource) {
|
|||||||
device = PlatformManager.Find(wrapper.req.Source())
|
device = PlatformManager.Find(wrapper.req.Source())
|
||||||
} else if wrapper.fromJt {
|
} else if wrapper.fromJt {
|
||||||
device = JTDeviceManager.Find(deviceId)
|
device = JTDeviceManager.Find(deviceId)
|
||||||
|
} else {
|
||||||
|
device = DeviceManager.Find(deviceId)
|
||||||
}
|
}
|
||||||
|
|
||||||
if ok = device != nil; !ok {
|
if ok = device != nil; !ok {
|
||||||
|
51
stack/unique_task.go
Normal file
51
stack/unique_task.go
Normal file
@@ -0,0 +1,51 @@
|
|||||||
|
package stack
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
UniqueTaskManager = &taskManager{
|
||||||
|
tasks: make(map[string]interface{}),
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
func GenerateCatalogTaskID(deviceID string) string {
|
||||||
|
return fmt.Sprintf("%s_catalog", deviceID)
|
||||||
|
}
|
||||||
|
|
||||||
|
type taskManager struct {
|
||||||
|
lock sync.Mutex
|
||||||
|
tasks map[string]interface{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *taskManager) Commit(id string, task func(), data interface{}) bool {
|
||||||
|
t.lock.Lock()
|
||||||
|
defer t.lock.Unlock()
|
||||||
|
|
||||||
|
if _, ok := t.tasks[id]; ok {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
t.tasks[id] = data
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
task()
|
||||||
|
t.lock.Lock()
|
||||||
|
defer t.lock.Unlock()
|
||||||
|
delete(t.tasks, id)
|
||||||
|
}()
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *taskManager) Find(id string) interface{} {
|
||||||
|
t.lock.Lock()
|
||||||
|
defer t.lock.Unlock()
|
||||||
|
|
||||||
|
if data, ok := t.tasks[id]; ok {
|
||||||
|
return data
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
@@ -30,7 +30,7 @@ func TestDecodeXML(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
handler := EventHandler{}
|
//handler := EventHandler{}
|
||||||
for i := 0; i < len(file); {
|
for i := 0; i < len(file); {
|
||||||
size := binary.BigEndian.Uint32(file[i:])
|
size := binary.BigEndian.Uint32(file[i:])
|
||||||
i += 4
|
i += 4
|
||||||
@@ -42,7 +42,8 @@ func TestDecodeXML(t *testing.T) {
|
|||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
handler.OnCatalog(catalogResponse.DeviceID, catalogResponse)
|
println(string(body))
|
||||||
|
//handler.OnCatalog(catalogResponse.DeviceID, catalogResponse)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
43
stats.go
43
stats.go
@@ -5,7 +5,9 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"gb-cms/common"
|
"gb-cms/common"
|
||||||
|
"gb-cms/dao"
|
||||||
"gb-cms/log"
|
"gb-cms/log"
|
||||||
|
"gb-cms/stack"
|
||||||
"github.com/shirou/gopsutil/v3/cpu"
|
"github.com/shirou/gopsutil/v3/cpu"
|
||||||
"github.com/shirou/gopsutil/v3/disk"
|
"github.com/shirou/gopsutil/v3/disk"
|
||||||
"github.com/shirou/gopsutil/v3/mem"
|
"github.com/shirou/gopsutil/v3/mem"
|
||||||
@@ -22,6 +24,9 @@ var (
|
|||||||
diskStatsJson string
|
diskStatsJson string
|
||||||
lastNetStatsJson string
|
lastNetStatsJson string
|
||||||
lastNetStats []net.IOCountersStat
|
lastNetStats []net.IOCountersStat
|
||||||
|
|
||||||
|
ChannelTotalCount int
|
||||||
|
ChannelOnlineCount int
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@@ -379,26 +384,40 @@ func StartStats() {
|
|||||||
topStatsJson = string(marshal)
|
topStatsJson = string(marshal)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 统计磁盘信息
|
if count%5 == 0 {
|
||||||
count++
|
// 统计磁盘信息
|
||||||
if count >= 5 {
|
|
||||||
count = 0
|
|
||||||
usage, err := stateDiskUsage()
|
usage, err := stateDiskUsage()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Sugar.Errorf("获取磁盘信息失败: %v", err)
|
log.Sugar.Errorf("获取磁盘信息失败: %v", err)
|
||||||
continue
|
} else {
|
||||||
|
bytes, err := json.Marshal(common.MalformedRequest{
|
||||||
|
Code: http.StatusOK,
|
||||||
|
Msg: "Success",
|
||||||
|
Data: usage,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.Sugar.Errorf("序列化磁盘信息失败: %v", err)
|
||||||
|
} else {
|
||||||
|
diskStatsJson = string(bytes)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bytes, err := json.Marshal(common.MalformedRequest{
|
// 统计通道总数和在线数
|
||||||
Code: http.StatusOK,
|
i, err := dao.Channel.TotalCount()
|
||||||
Msg: "Success",
|
|
||||||
Data: usage,
|
|
||||||
})
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Sugar.Errorf("序列化磁盘信息失败: %v", err)
|
log.Sugar.Errorf("获取通道总数失败: %v", err)
|
||||||
} else {
|
} else {
|
||||||
diskStatsJson = string(bytes)
|
ChannelTotalCount = i
|
||||||
|
}
|
||||||
|
|
||||||
|
onlineCount, err := dao.Channel.OnlineCount(stack.OnlineDeviceManager.GetDeviceIds())
|
||||||
|
if err != nil {
|
||||||
|
log.Sugar.Errorf("获取在线通道数失败: %v", err)
|
||||||
|
} else {
|
||||||
|
ChannelOnlineCount = onlineCount
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
count++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user