From beb50d0d73b10e139b06fea684a23427e61bac42 Mon Sep 17 00:00:00 2001 From: ydajiang Date: Thu, 28 Aug 2025 10:12:09 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E9=80=82=E9=85=8Dlivegbs=E6=8E=A5?= =?UTF-8?q?=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- api.go | 180 ++++-- api_livegbs.go | 6 +- common/http_request.go | 16 +- config.json | 2 +- dao/channel.go | 36 ++ livegbs_bean.go | 1 + stack/client_benchmark_test.go | 719 ++++++++++++------------ stack/client_benchmark_test_config.json | 6 +- stack/client_manager.go | 6 + stack/device.go | 198 ++++++- stack/media_server.go | 2 +- stack/online_devices.go | 10 + stack/sip_handler.go | 44 +- stack/sip_server.go | 4 +- stack/unique_task.go | 51 ++ stack/xml_test.go | 5 +- stats.go | 43 +- 17 files changed, 842 insertions(+), 487 deletions(-) create mode 100644 stack/unique_task.go diff --git a/api.go b/api.go index 7a42883..ef76776 100644 --- a/api.go +++ b/api.go @@ -12,6 +12,7 @@ import ( "github.com/gorilla/mux" "github.com/gorilla/websocket" "github.com/lkmio/avformat/utils" + "io" "math" "net/http" "os" @@ -101,6 +102,12 @@ type PageQueryChannel struct { GroupID string `json:"group_id"` } +type SetMediaTransportReq struct { + DeviceID string `json:"serial"` + MediaTransport string `json:"media_transport"` + MediaTransportMode string `json:"media_transport_mode"` +} + var apiServer *ApiServer func init() { @@ -161,12 +168,15 @@ func startApiServer(addr string) { // 关闭国标流. 如果是实时流, 等收流或空闲超时自行删除. 回放或下载流立即删除. apiServer.router.HandleFunc("/api/v1/stream/close", common.WithJsonParams(apiServer.OnCloseStream, &StreamIDParams{})) - apiServer.router.HandleFunc("/api/v1/device/list", withVerify(common.WithQueryStringParams(apiServer.OnDeviceList, QueryDeviceChannel{}))) // 查询设备列表 - apiServer.router.HandleFunc("/api/v1/device/channellist", withVerify(common.WithQueryStringParams(apiServer.OnChannelList, QueryDeviceChannel{}))) // 查询通道列表 - apiServer.router.HandleFunc("/api/v1/playback/recordlist", withVerify(common.WithQueryStringParams(apiServer.OnRecordList, QueryRecordParams{}))) // 查询录像列表 - apiServer.router.HandleFunc("/api/v1/position/sub", common.WithJsonResponse(apiServer.OnSubscribePosition, &DeviceChannelID{})) // 订阅移动位置 - apiServer.router.HandleFunc("/api/v1/playback/seek", common.WithJsonResponse(apiServer.OnSeekPlayback, &SeekParams{})) // 回放seek - apiServer.router.HandleFunc("/api/v1/control/ptz", apiServer.OnPTZControl) // 云台控制 + apiServer.router.HandleFunc("/api/v1/device/list", withVerify(common.WithQueryStringParams(apiServer.OnDeviceList, QueryDeviceChannel{}))) // 查询设备列表 + apiServer.router.HandleFunc("/api/v1/device/channellist", withVerify(common.WithQueryStringParams(apiServer.OnChannelList, QueryDeviceChannel{}))) // 查询通道列表 + apiServer.router.HandleFunc("/api/v1/device/fetchcatalog", withVerify(common.WithQueryStringParams(apiServer.OnCatalogQuery, QueryDeviceChannel{}))) // 更新通道 + apiServer.router.HandleFunc("/api/v1/playback/recordlist", withVerify(common.WithQueryStringParams(apiServer.OnRecordList, QueryRecordParams{}))) // 查询录像列表 + apiServer.router.HandleFunc("/api/v1/stream/info", withVerify(apiServer.OnStreamInfo)) + + apiServer.router.HandleFunc("/api/v1/position/sub", common.WithJsonResponse(apiServer.OnSubscribePosition, &DeviceChannelID{})) // 订阅移动位置 + apiServer.router.HandleFunc("/api/v1/playback/seek", common.WithJsonResponse(apiServer.OnSeekPlayback, &SeekParams{})) // 回放seek + apiServer.router.HandleFunc("/api/v1/control/ptz", apiServer.OnPTZControl) // 云台控制 apiServer.router.HandleFunc("/api/v1/platform/list", apiServer.OnPlatformList) // 级联设备列表 apiServer.router.HandleFunc("/api/v1/platform/add", common.WithJsonResponse(apiServer.OnPlatformAdd, &dao.PlatformModel{})) // 添加级联设备 @@ -186,7 +196,7 @@ func startApiServer(addr string) { apiServer.router.HandleFunc("/api/v1/jt/channel/add", common.WithJsonResponse(apiServer.OnVirtualChannelAdd, &dao.ChannelModel{})) apiServer.router.HandleFunc("/api/v1/jt/channel/edit", common.WithJsonResponse(apiServer.OnVirtualChannelEdit, &dao.ChannelModel{})) apiServer.router.HandleFunc("/api/v1/jt/channel/remove", common.WithJsonResponse(apiServer.OnVirtualChannelRemove, &dao.ChannelModel{})) - apiServer.router.HandleFunc("/api/v1/device/setmediatransport", withVerify(common.WithJsonResponse2(apiServer.OnDeviceMediaTransportSet))) + apiServer.router.HandleFunc("/api/v1/device/setmediatransport", withVerify(common.WithFormDataParams(apiServer.OnDeviceMediaTransportSet, SetMediaTransportReq{}))) registerLiveGBSApi() @@ -321,7 +331,7 @@ func (api *ApiServer) OnPlay(params *StreamParams, w http.ResponseWriter, r *htt w.WriteHeader(code) } -func (api *ApiServer) OnPlayDone(params *PlayDoneParams, w http.ResponseWriter, r *http.Request) { +func (api *ApiServer) OnPlayDone(params *PlayDoneParams, _ http.ResponseWriter, _ *http.Request) { log.Sugar.Debugf("播放结束事件. protocol: %s stream: %s", params.Protocol, params.Stream) sink, _ := dao.Sink.DeleteForwardSink(params.Stream, params.Sink) @@ -340,7 +350,7 @@ func (api *ApiServer) OnPlayDone(params *PlayDoneParams, w http.ResponseWriter, } } -func (api *ApiServer) OnPublish(params *StreamParams, w http.ResponseWriter, r *http.Request) { +func (api *ApiServer) OnPublish(params *StreamParams, w http.ResponseWriter, _ *http.Request) { log.Sugar.Debugf("推流事件. protocol: %s stream: %s", params.Protocol, params.Stream) if stack.SourceTypeRtmp == params.Protocol { @@ -375,7 +385,7 @@ func (api *ApiServer) OnPublish(params *StreamParams, w http.ResponseWriter, r * } } -func (api *ApiServer) OnPublishDone(params *StreamParams, w http.ResponseWriter, r *http.Request) { +func (api *ApiServer) OnPublishDone(params *StreamParams, _ http.ResponseWriter, _ *http.Request) { log.Sugar.Debugf("推流结束事件. protocol: %s stream: %s", params.Protocol, params.Stream) stack.CloseStream(params.Stream, false) @@ -385,7 +395,7 @@ func (api *ApiServer) OnPublishDone(params *StreamParams, w http.ResponseWriter, } } -func (api *ApiServer) OnIdleTimeout(params *StreamParams, w http.ResponseWriter, req *http.Request) { +func (api *ApiServer) OnIdleTimeout(params *StreamParams, w http.ResponseWriter, _ *http.Request) { log.Sugar.Debugf("推流空闲超时事件. protocol: %s stream: %s", params.Protocol, params.Stream) // 非rtmp空闲超时, 返回非200应答, 删除会话 @@ -395,7 +405,7 @@ func (api *ApiServer) OnIdleTimeout(params *StreamParams, w http.ResponseWriter, } } -func (api *ApiServer) OnReceiveTimeout(params *StreamParams, w http.ResponseWriter, req *http.Request) { +func (api *ApiServer) OnReceiveTimeout(params *StreamParams, w http.ResponseWriter, _ *http.Request) { log.Sugar.Debugf("收流超时事件. protocol: %s stream: %s", params.Protocol, params.Stream) // 非rtmp推流超时, 返回非200应答, 删除会话 @@ -405,11 +415,11 @@ func (api *ApiServer) OnReceiveTimeout(params *StreamParams, w http.ResponseWrit } } -func (api *ApiServer) OnRecord(params *RecordParams, w http.ResponseWriter, req *http.Request) { +func (api *ApiServer) OnRecord(params *RecordParams, _ http.ResponseWriter, _ *http.Request) { log.Sugar.Infof("录制事件. protocol: %s stream: %s path:%s ", params.Protocol, params.Stream, params.Path) } -func (api *ApiServer) OnInvite(v *InviteParams, w http.ResponseWriter, r *http.Request) (interface{}, error) { +func (api *ApiServer) OnInvite(v *InviteParams, _ http.ResponseWriter, r *http.Request) (interface{}, error) { vars := mux.Vars(r) action := strings.ToLower(vars["action"]) @@ -553,7 +563,7 @@ func (api *ApiServer) DoInvite(inviteType common.InviteType, params *InviteParam return http.StatusOK, stream, nil } -func (api *ApiServer) OnCloseStream(v *StreamIDParams, w http.ResponseWriter, r *http.Request) { +func (api *ApiServer) OnCloseStream(v *StreamIDParams, w http.ResponseWriter, _ *http.Request) { //stream := StreamManager.Find(v.StreamID) // //// 等空闲或收流超时会自动关闭 @@ -561,7 +571,7 @@ func (api *ApiServer) OnCloseStream(v *StreamIDParams, w http.ResponseWriter, r // CloseStream(v.StreamID, true) //} - common.HttpResponseOK(w, nil) + _ = common.HttpResponseOK(w, nil) } // QueryDeviceChannel 查询设备和通道的参数 @@ -581,7 +591,7 @@ type QueryDeviceChannel struct { //channelType string // device/dir, 查询通道列表使用 } -func (api *ApiServer) OnDeviceList(q *QueryDeviceChannel, w http.ResponseWriter, r *http.Request) (interface{}, error) { +func (api *ApiServer) OnDeviceList(q *QueryDeviceChannel, _ http.ResponseWriter, r *http.Request) (interface{}, error) { values := r.URL.Query() log.Sugar.Debugf("查询设备列表 %s", values.Encode()) @@ -612,10 +622,22 @@ func (api *ApiServer) OnDeviceList(q *QueryDeviceChannel, w http.ResponseWriter, remoteIP := split[0] remotePort, _ := strconv.Atoi(split[1]) + // 更新正在查询通道的进度 + var catalogProgress string + data := stack.UniqueTaskManager.Find(stack.GenerateCatalogTaskID(device.GetID())) + if data != nil { + catalogSize := data.(*stack.CatalogProgress) + + if catalogSize.TotalSize > 0 { + catalogProgress = fmt.Sprintf("%d/%d", catalogSize.RecvSize, catalogSize.TotalSize) + } + } + response.DeviceList_ = append(response.DeviceList_, LiveGBSDevice{ - AlarmSubscribe: false, - CatalogInterval: 3600, - CatalogSubscribe: false, + AlarmSubscribe: false, // 报警订阅 + CatalogInterval: 3600, // 目录刷新时间 + CatalogProgress: catalogProgress, + CatalogSubscribe: false, // 目录订阅 ChannelCount: device.ChannelsTotal, ChannelOverLoad: false, Charset: "GB2312", @@ -637,9 +659,9 @@ func (api *ApiServer) OnDeviceList(q *QueryDeviceChannel, w http.ResponseWriter, MediaTransportMode: device.Setup.String(), Name: device.Name, Online: device.Online(), - PTZSubscribe: false, + PTZSubscribe: false, // PTZ订阅2022 Password: "", - PositionSubscribe: false, + PositionSubscribe: false, // 位置订阅 RecordCenter: false, RecordIndistinct: false, RecvStreamIP: "", @@ -658,7 +680,7 @@ func (api *ApiServer) OnDeviceList(q *QueryDeviceChannel, w http.ResponseWriter, return &response, nil } -func (api *ApiServer) OnChannelList(q *QueryDeviceChannel, w http.ResponseWriter, r *http.Request) (interface{}, error) { +func (api *ApiServer) OnChannelList(q *QueryDeviceChannel, _ http.ResponseWriter, r *http.Request) (interface{}, error) { values := r.URL.Query() log.Sugar.Debugf("查询通道列表 %s", values.Encode()) @@ -765,7 +787,7 @@ func (api *ApiServer) OnChannelList(q *QueryDeviceChannel, w http.ResponseWriter return response, nil } -func (api *ApiServer) OnRecordList(v *QueryRecordParams, w http.ResponseWriter, r *http.Request) (interface{}, error) { +func (api *ApiServer) OnRecordList(v *QueryRecordParams, _ http.ResponseWriter, _ *http.Request) (interface{}, error) { log.Sugar.Debugf("查询录像列表 %v", *v) model, _ := dao.Device.QueryDevice(v.DeviceID) @@ -848,7 +870,7 @@ func (api *ApiServer) OnRecordList(v *QueryRecordParams, w http.ResponseWriter, return &response, nil } -func (api *ApiServer) OnSubscribePosition(v *DeviceChannelID, w http.ResponseWriter, r *http.Request) (interface{}, error) { +func (api *ApiServer) OnSubscribePosition(v *DeviceChannelID, _ http.ResponseWriter, _ *http.Request) (interface{}, error) { log.Sugar.Debugf("订阅位置 %v", *v) model, _ := dao.Device.QueryDevice(v.DeviceID) @@ -866,7 +888,7 @@ func (api *ApiServer) OnSubscribePosition(v *DeviceChannelID, w http.ResponseWri return nil, nil } -func (api *ApiServer) OnSeekPlayback(v *SeekParams, w http.ResponseWriter, r *http.Request) (interface{}, error) { +func (api *ApiServer) OnSeekPlayback(v *SeekParams, _ http.ResponseWriter, _ *http.Request) (interface{}, error) { log.Sugar.Debugf("快进回放 %v", *v) model, _ := dao.Stream.QueryStream(v.StreamId) @@ -887,11 +909,11 @@ func (api *ApiServer) OnSeekPlayback(v *SeekParams, w http.ResponseWriter, r *ht return nil, nil } -func (api *ApiServer) OnPTZControl(w http.ResponseWriter, r *http.Request) { +func (api *ApiServer) OnPTZControl(_ http.ResponseWriter, _ *http.Request) { } -func (api *ApiServer) OnHangup(v *BroadcastParams, w http.ResponseWriter, r *http.Request) (interface{}, error) { +func (api *ApiServer) OnHangup(v *BroadcastParams, _ http.ResponseWriter, _ *http.Request) (interface{}, error) { log.Sugar.Debugf("广播挂断 %v", *v) id := common.GenerateStreamID(common.InviteTypeBroadcast, v.DeviceID, v.ChannelID, "", "") @@ -902,7 +924,7 @@ func (api *ApiServer) OnHangup(v *BroadcastParams, w http.ResponseWriter, r *htt return nil, nil } -func (api *ApiServer) OnBroadcast(v *BroadcastParams, w http.ResponseWriter, r *http.Request) (interface{}, error) { +func (api *ApiServer) OnBroadcast(v *BroadcastParams, _ http.ResponseWriter, r *http.Request) (interface{}, error) { log.Sugar.Debugf("广播邀请 %v", *v) var sinkStreamId common.StreamID @@ -1003,11 +1025,11 @@ func (api *ApiServer) OnBroadcast(v *BroadcastParams, w http.ResponseWriter, r * return nil, nil } -func (api *ApiServer) OnTalk(w http.ResponseWriter, r *http.Request) { +func (api *ApiServer) OnTalk(_ http.ResponseWriter, _ *http.Request) { } -func (api *ApiServer) OnStarted(w http.ResponseWriter, req *http.Request) { +func (api *ApiServer) OnStarted(_ http.ResponseWriter, _ *http.Request) { log.Sugar.Infof("lkm启动") streams, _ := dao.Stream.DeleteStreams() @@ -1021,7 +1043,7 @@ func (api *ApiServer) OnStarted(w http.ResponseWriter, req *http.Request) { } } -func (api *ApiServer) OnPlatformAdd(v *dao.PlatformModel, w http.ResponseWriter, r *http.Request) (interface{}, error) { +func (api *ApiServer) OnPlatformAdd(v *dao.PlatformModel, _ http.ResponseWriter, _ *http.Request) (interface{}, error) { log.Sugar.Debugf("添加级联设备 %v", *v) if v.Username == "" { @@ -1059,7 +1081,7 @@ func (api *ApiServer) OnPlatformAdd(v *dao.PlatformModel, w http.ResponseWriter, return nil, err } -func (api *ApiServer) OnPlatformRemove(v *dao.PlatformModel, w http.ResponseWriter, r *http.Request) (interface{}, error) { +func (api *ApiServer) OnPlatformRemove(v *dao.PlatformModel, _ http.ResponseWriter, _ *http.Request) (interface{}, error) { log.Sugar.Debugf("删除级联设备 %v", *v) err := dao.Platform.DeleteUAByAddr(v.ServerAddr) @@ -1072,12 +1094,12 @@ func (api *ApiServer) OnPlatformRemove(v *dao.PlatformModel, w http.ResponseWrit return nil, err } -func (api *ApiServer) OnPlatformList(w http.ResponseWriter, r *http.Request) { +func (api *ApiServer) OnPlatformList(_ http.ResponseWriter, _ *http.Request) { //platforms := LoadPlatforms() //httpResponseOK(w, platforms) } -func (api *ApiServer) OnPlatformChannelBind(v *PlatformChannel, w http.ResponseWriter, r *http.Request) (interface{}, error) { +func (api *ApiServer) OnPlatformChannelBind(v *PlatformChannel, _ http.ResponseWriter, _ *http.Request) (interface{}, error) { log.Sugar.Debugf("级联绑定通道 %v", *v) platform := stack.PlatformManager.Find(v.ServerAddr) @@ -1096,7 +1118,7 @@ func (api *ApiServer) OnPlatformChannelBind(v *PlatformChannel, w http.ResponseW return channels, nil } -func (api *ApiServer) OnPlatformChannelUnbind(v *PlatformChannel, w http.ResponseWriter, r *http.Request) (interface{}, error) { +func (api *ApiServer) OnPlatformChannelUnbind(v *PlatformChannel, _ http.ResponseWriter, _ *http.Request) (interface{}, error) { log.Sugar.Debugf("级联解绑通道 %v", *v) platform := stack.PlatformManager.Find(v.ServerAddr) @@ -1114,26 +1136,94 @@ func (api *ApiServer) OnPlatformChannelUnbind(v *PlatformChannel, w http.Respons return channels, nil } -func (api *ApiServer) OnDeviceMediaTransportSet(w http.ResponseWriter, r *http.Request) (interface{}, error) { - serial := r.FormValue("serial") - mediaTransport := r.FormValue("media_transport") - mediaTransportMode := r.FormValue("media_transport_mode") - +func (api *ApiServer) OnDeviceMediaTransportSet(req *SetMediaTransportReq, _ http.ResponseWriter, _ *http.Request) (interface{}, error) { var setupType common.SetupType - if "udp" == strings.ToLower(mediaTransport) { + if "udp" == strings.ToLower(req.MediaTransport) { setupType = common.SetupTypeUDP - } else if "passive" == strings.ToLower(mediaTransportMode) { + } else if "passive" == strings.ToLower(req.MediaTransportMode) { setupType = common.SetupTypePassive - } else if "active" == strings.ToLower(mediaTransportMode) { + } else if "active" == strings.ToLower(req.MediaTransportMode) { setupType = common.SetupTypeActive } else { return nil, fmt.Errorf("media_transport_mode error") } - err := dao.Device.UpdateMediaTransport(serial, setupType) + err := dao.Device.UpdateMediaTransport(req.DeviceID, setupType) if err != nil { return nil, err } return "OK", nil } + +func (api *ApiServer) OnCatalogQuery(params *QueryDeviceChannel, _ http.ResponseWriter, _ *http.Request) (interface{}, error) { + deviceModel, err := dao.Device.QueryDevice(params.DeviceID) + if err != nil { + return nil, err + } + + if deviceModel == nil { + return nil, fmt.Errorf("not found device") + } + + list, err := (&stack.Device{deviceModel}).QueryCatalog(15) + if err != nil { + return nil, err + } + + response := struct { + ChannelCount int `json:"ChannelCount"` + ChannelList []*dao.ChannelModel `json:"ChannelList"` + }{ + ChannelCount: len(list), + ChannelList: list, + } + return &response, nil +} + +func (api *ApiServer) OnStreamInfo(w http.ResponseWriter, r *http.Request) { + // 构建目标URL + targetURL := common.Config.MediaServer + r.URL.Path + if r.URL.RawQuery != "" { + targetURL += "?" + r.URL.RawQuery + } + + // 创建转发请求 + proxyReq, err := http.NewRequest(r.Method, targetURL, r.Body) + if err != nil { + http.Error(w, "Error creating proxy request", http.StatusInternalServerError) + return + } + + // 复制请求头 + for name, values := range r.Header { + for _, value := range values { + proxyReq.Header.Add(name, value) + } + } + + // 发送请求 + client := &http.Client{ + Timeout: 30 * time.Second, + } + resp, err := client.Do(proxyReq) + if err != nil { + http.Error(w, "Error forwarding request", http.StatusBadGateway) + return + } + defer resp.Body.Close() + + // 复制响应头 + for name, values := range resp.Header { + for _, value := range values { + w.Header().Add(name, value) + } + } + + // 设置状态码并转发响应体 + w.WriteHeader(resp.StatusCode) + _, err = io.Copy(w, resp.Body) + if err != nil { + log.Sugar.Errorf("Failed to copy response body: %v", err) + } +} diff --git a/api_livegbs.go b/api_livegbs.go index cd00f0e..26c8249 100644 --- a/api_livegbs.go +++ b/api_livegbs.go @@ -89,8 +89,8 @@ func registerLiveGBSApi() { DeviceTotal int `json:"DeviceTotal"` }{ ChannelCount: 16, - ChannelOnline: 1, - ChannelTotal: 1, + ChannelOnline: ChannelOnlineCount, + ChannelTotal: ChannelTotalCount, DeviceOnline: stack.OnlineDeviceManager.Count(), DeviceTotal: dao.DeviceCount, } @@ -120,7 +120,7 @@ func registerLiveGBSApi() { Hardware: KernelArch, InterfaceVersion: "v1", - RemainDays: 0, + RemainDays: 99, RunningTime: FormatUptime(GetUptime()), Server: "github.com/lkmio/gb-cms dev", ServerTime: time.Now().Format("2006-01-02 15:04:05"), diff --git a/common/http_request.go b/common/http_request.go index b671374..9ec560b 100644 --- a/common/http_request.go +++ b/common/http_request.go @@ -92,17 +92,6 @@ func WithJsonResponse[T any](f func(params T, w http.ResponseWriter, req *http.R } } -func WithJsonResponse2(f func(w http.ResponseWriter, req *http.Request) (interface{}, error)) func(http.ResponseWriter, *http.Request) { - return func(w http.ResponseWriter, req *http.Request) { - responseBody, err := f(w, req) - if err != nil { - _ = HttpResponseError(w, err.Error()) - } else if responseBody != nil { - _ = HttpResponseJson(w, responseBody) - } - } -} - func WithQueryStringParams[T any](f func(params T, w http.ResponseWriter, req *http.Request) (interface{}, error), params interface{}) func(http.ResponseWriter, *http.Request) { return func(w http.ResponseWriter, req *http.Request) { var newParams T @@ -125,7 +114,8 @@ func WithQueryStringParams[T any](f func(params T, w http.ResponseWriter, req *h responseBody, err := f(result.(T), w, req) if err != nil { - _ = HttpResponseError(w, err.Error()) + w.WriteHeader(http.StatusBadRequest) + _ = HttpResponseJson(w, err.Error()) } else if responseBody != nil { _ = HttpResponseJson(w, responseBody) } @@ -153,7 +143,7 @@ func WithFormDataParams[T any](f func(params T, w http.ResponseWriter, req *http responseBody, err := f(result.(T), w, req) if err != nil { - _ = HttpResponseError(w, err.Error()) + _ = HttpResponseJson(w, err.Error()) } else if responseBody != nil { _ = HttpResponseJson(w, responseBody) } diff --git a/config.json b/config.json index 978bc64..a991431 100644 --- a/config.json +++ b/config.json @@ -9,7 +9,7 @@ "alive_expires": 180, "mobile_position_interval": 10, - "media_server": "0.0.0.0:8080", + "media_server": "http://0.0.0.0:8080", "?auto_close_on_idle": "拉流空闲时, 立即关闭流", "auto_close_on_idle": true, diff --git a/dao/channel.go b/dao/channel.go index 5b3bd7c..2ef9c3d 100644 --- a/dao/channel.go +++ b/dao/channel.go @@ -65,6 +65,8 @@ func (d *ChannelModel) Online() bool { type DaoChannel interface { SaveChannel(channel *ChannelModel) error + SaveChannels(channel []*ChannelModel) error + UpdateChannelStatus(deviceId, channelId, status string) error QueryChannelByID(id uint) (*ChannelModel, error) @@ -93,9 +95,15 @@ type DaoChannel interface { DeleteChannel(deviceId string, channelId string) error + DeleteChannels(deviceId string) error + UpdateRootID(rootId, newRootId string) error UpdateChannel(channel *ChannelModel) error + + TotalCount() (int, error) + + OnlineCount(ids []string) (int, error) } type daoChannel struct { @@ -111,6 +119,12 @@ func (d *daoChannel) SaveChannel(channel *ChannelModel) error { }) } +func (d *daoChannel) SaveChannels(channels []*ChannelModel) error { + return DBTransaction(func(tx *gorm.DB) error { + return tx.Save(channels).Error + }) +} + func (d *daoChannel) UpdateChannelStatus(deviceId, channelId, status string) error { return db.Model(&ChannelModel{}).Where("root_id =? and device_id =?", deviceId, channelId).Update("status", status).Error } @@ -235,6 +249,10 @@ func (d *daoChannel) SaveJTChannel(channel *ChannelModel) error { }) } +func (d *daoChannel) DeleteChannels(deviceId string) error { + return db.Where("root_id =?", deviceId).Unscoped().Delete(&ChannelModel{}).Error +} + func (d *daoChannel) DeleteChannel(deviceId string, channelId string) error { return db.Where("root_id =? and device_id =?", deviceId, channelId).Unscoped().Delete(&ChannelModel{}).Error } @@ -262,3 +280,21 @@ func (d *daoChannel) UpdateChannel(channel *ChannelModel) error { return tx.Model(channel).Where("id =?", channel.ID).Updates(channel).Error }) } + +func (d *daoChannel) TotalCount() (int, error) { + var total int64 + tx := db.Model(&ChannelModel{}).Count(&total) + if tx.Error != nil { + return 0, tx.Error + } + return int(total), nil +} + +func (d *daoChannel) OnlineCount(ids []string) (int, error) { + var total int64 + tx := db.Model(&ChannelModel{}).Where("status =? and root_id in ?", "ON", ids).Count(&total) + if tx.Error != nil { + return 0, tx.Error + } + return int(total), nil +} diff --git a/livegbs_bean.go b/livegbs_bean.go index d3ac1d7..913d389 100644 --- a/livegbs_bean.go +++ b/livegbs_bean.go @@ -3,6 +3,7 @@ package main type LiveGBSDevice struct { AlarmSubscribe bool `json:"AlarmSubscribe"` CatalogInterval int `json:"CatalogInterval"` + CatalogProgress string `json:"CatalogProgress,omitempty"` // 查询目录进度recvSize/totalSize CatalogSubscribe bool `json:"CatalogSubscribe"` ChannelCount int `json:"ChannelCount"` ChannelOverLoad bool `json:"ChannelOverLoad"` diff --git a/stack/client_benchmark_test.go b/stack/client_benchmark_test.go index 73f0833..fd54267 100644 --- a/stack/client_benchmark_test.go +++ b/stack/client_benchmark_test.go @@ -1,361 +1,370 @@ package stack +import ( + "context" + "encoding/binary" + "encoding/json" + "fmt" + "gb-cms/common" + "gb-cms/dao" + "github.com/ghettovoice/gosip/sip" + "net" + "net/http" + "os" + "strconv" + "sync" + "testing" + "time" +) + +var ( + rtpPackets [][]byte + locks map[uint32]*sync.RWMutex +) + +type MediaStream struct { + ssrc uint32 + tcp bool + conn net.Conn + //transport transport.Transport + cancel context.CancelFunc + dialog sip.Request + ctx context.Context + + closedCB func(sendBye bool) +} + +func (m *MediaStream) write() { + //var index int + //length := len(rtpPackets) + //for m.ctx.Err() == nil && index < length { + // time.Sleep(time.Millisecond * 40) + // + // //一次发送某个时间范围内的所有rtp包 + // ts := binary.BigEndian.Uint32(rtpPackets[index][2+4:]) + // mutex := locks[ts] + // { + // mutex.Lock() + // + // for ; m.ctx.Err() == nil && index < length; index++ { + // bytes := rtpPackets[index] + // nextTS := binary.BigEndian.Uint32(bytes[2+4:]) + // if nextTS != ts { + // break + // } + // + // rtp.ModifySSRC(bytes[2:], m.ssrc) + // + // if m.tcp { + // m.conn.Write(bytes) + // } else { + // m.transport.(*transport.UDPClient).Write(bytes[2:]) + // } + // } + // + // mutex.Unlock() + // } + //} + + println("推流结束") + m.Close(true) +} + +func (m *MediaStream) Start() { + m.ctx, m.cancel = context.WithCancel(context.Background()) + go m.write() +} + +func (m *MediaStream) Close(sendBye bool) { + m.cancel() + + if m.closedCB != nil { + m.closedCB(sendBye) + } +} + +func (m *MediaStream) OnConnected(conn net.Conn) []byte { + m.conn = conn + fmt.Printf("tcp连接:%s", conn.RemoteAddr()) + return nil +} + +func (m *MediaStream) OnPacket(conn net.Conn, data []byte) []byte { + return nil +} + +func (m *MediaStream) OnDisConnected(conn net.Conn, err error) { + fmt.Printf("tcp断开连接:%s", conn.RemoteAddr()) + m.Close(true) +} + +type vDevice struct { + *gbClient + streams map[string]*MediaStream + lock sync.Locker +} + +// func CreateTransport(ip string, port int, setup string, handler transport.Handler) (transport.Transport, bool, error) { +// if "passive" == setup { +// tcpClient := &transport.TCPClient{} +// tcpClient.SetHandler(handler) // -//import ( -// "context" -// "encoding/binary" -// "encoding/json" -// "fmt" -// "github.com/ghettovoice/gosip/sip" -// "github.com/lkmio/rtp" -// "github.com/lkmio/transport" -// "net" -// "net/http" -// "os" -// "strconv" -// "strings" -// "sync" -// "testing" -// "time" -//) +// _, err := tcpClient.Connect(nil, &net.TCPAddr{IP: net.ParseIP(ip), Port: port}) +// return tcpClient, true, err +// } else if "active" == setup { +// tcpServer := &transport.TCPServer{} +// tcpServer.SetHandler(handler) +// err := tcpServer.Bind(nil) // -//var ( -// rtpPackets [][]byte -// locks map[uint32]*sync.RWMutex -//) -// -//type MediaStream struct { -// ssrc uint32 -// tcp bool -// conn net.Conn -// transport transport.Transport -// cancel context.CancelFunc -// dialog sip.Request -// ctx context.Context -// -// closedCB func(sendBye bool) -//} -// -//func (m *MediaStream) write() { -// var index int -// length := len(rtpPackets) -// for m.ctx.Err() == nil && index < length { -// time.Sleep(time.Millisecond * 40) -// -// //一次发送某个时间范围内的所有rtp包 -// ts := binary.BigEndian.Uint32(rtpPackets[index][2+4:]) -// mutex := locks[ts] -// { -// mutex.Lock() -// -// for ; m.ctx.Err() == nil && index < length; index++ { -// bytes := rtpPackets[index] -// nextTS := binary.BigEndian.Uint32(bytes[2+4:]) -// if nextTS != ts { -// break -// } -// -// rtp.ModifySSRC(bytes[2:], m.ssrc) -// -// if m.tcp { -// m.conn.Write(bytes) -// } else { -// m.transport.(*transport.UDPClient).Write(bytes[2:]) -// } -// } -// -// mutex.Unlock() +// return tcpServer, true, err +// } else { +// udp := &transport.UDPClient{} +// err := udp.Connect(nil, &net.UDPAddr{IP: net.ParseIP(ip), Port: port}) +// return udp, false, err // } // } -// -// println("推流结束") -// m.Close(true) -//} -// -//func (m *MediaStream) Start() { -// m.ctx, m.cancel = context.WithCancel(context.Background()) -// go m.write() -//} -// -//func (m *MediaStream) Close(sendBye bool) { -// m.cancel() -// -// if m.closedCB != nil { -// m.closedCB(sendBye) -// } -//} -// -//func (m *MediaStream) OnConnected(conn net.Conn) []byte { -// m.conn = conn -// fmt.Printf("tcp连接:%s", conn.RemoteAddr()) -// return nil -//} -// -//func (m *MediaStream) OnPacket(conn net.Conn, data []byte) []byte { -// return nil -//} -// -//func (m *MediaStream) OnDisConnected(conn net.Conn, err error) { -// fmt.Printf("tcp断开连接:%s", conn.RemoteAddr()) -// m.Close(true) -//} -// -//type Platform struct { -// *gbClient -// streams map[string]*MediaStream -// lock sync.Locker -//} -// -//func CreateTransport(ip string, port int, setup string, handler transport.Handler) (transport.Transport, bool, error) { -// if "passive" == setup { -// tcpClient := &transport.TCPClient{} -// tcpClient.SetHandler(handler) -// -// _, err := tcpClient.Connect(nil, &net.TCPAddr{IP: net.ParseIP(ip), Port: port}) -// return tcpClient, true, err -// } else if "active" == setup { -// tcpServer := &transport.TCPServer{} -// tcpServer.SetHandler(handler) -// err := tcpServer.Bind(nil) -// -// return tcpServer, true, err -// } else { -// udp := &transport.UDPClient{} -// err := udp.Connect(nil, &net.UDPAddr{IP: net.ParseIP(ip), Port: port}) -// return udp, false, err -// } -//} -// -//func (v Platform) OnInvite(request sip.Request, user string) sip.Response { -// if len(rtpPackets) < 1 { -// return CreateResponseWithStatusCode(request, http.StatusInternalServerError) -// } -// -// offer, ssrc, speed, media, offerSetup, answerSetup, err := ParseGBSDP(request.Body()) -// if err != nil { -// return CreateResponseWithStatusCode(request, http.StatusBadRequest) -// } -// -// stream := &MediaStream{} -// socket, tcp, err := CreateTransport(offer.Addr, int(media.Port), offerSetup, stream) -// if err != nil { -// return CreateResponseWithStatusCode(request, http.StatusBadRequest) -// } -// -// time := strings.Split(offer.Time, " ") -// if len(time) < 2 { -// return CreateResponseWithStatusCode(request, http.StatusBadRequest) -// } -// -// var ip string -// var port sip.Port -// var contactAddr string -// if v.sipUA.NatAddr != "" { -// contactAddr = v.sipUA.NatAddr -// } else { -// contactAddr = v.sipUA.ListenAddr -// } -// -// host, p, _ := net.SplitHostPort(contactAddr) -// ip = host -// atoi, _ := strconv.Atoi(p) -// port = sip.Port(atoi) -// -// contactAddress := &sip.Address{ -// Uri: &sip.SipUri{ -// FUser: sip.String{Str: user}, -// FHost: ip, -// FPort: &port, -// }, -// } -// -// answer := BuildSDP(user, offer.Session, ip, uint16(socket.ListenPort()), time[0], time[1], answerSetup, speed, ssrc) -// response := CreateResponseWithStatusCode(request, http.StatusOK) -// response.RemoveHeader("Contact") -// response.AppendHeader(contactAddress.AsContactHeader()) -// response.AppendHeader(&SDPMessageType) -// response.SetBody(answer, true) -// setToTag(response) -// -// i, _ := strconv.Atoi(ssrc) -// stream.ssrc = uint32(i) -// stream.tcp = tcp -// stream.dialog = CreateDialogRequestFromAnswer(response, true, v.sipUA.Domain) -// callId, _ := response.CallID() -// -// { -// v.lock.Lock() -// defer v.lock.Unlock() -// v.streams[callId.Value()] = stream -// } -// -// // 设置网络断开回调 -// stream.closedCB = func(sendBye bool) { -// if stream.dialog != nil { -// id, _ := stream.dialog.CallID() -// StreamManager.RemoveWithCallId(id.Value()) -// -// { -// v.lock.Lock() -// delete(v.streams, id.Value()) -// v.lock.Unlock() -// } -// -// if sendBye { -// bye := CreateRequestFromDialog(stream.dialog, sip.BYE) -// v.sipUA.stack.SendRequest(bye) -// } -// -// stream.dialog = nil -// } -// -// if stream.transport != nil { -// stream.transport.Close() -// stream.transport = nil -// } -// } -// -// stream.transport = socket -// stream.Start() -// -// // 绑定到StreamManager, bye请求才会找到设备回调 -// streamId := GenerateStreamID(InviteTypePlay, v.sipUA.Username, user, "", "") -// s := StreamID{StreamID: streamId, Dialog: stream.dialog} -// StreamManager.Add(&s) -// -// callID, _ := request.CallID() -// StreamManager.AddWithCallId(callID.Value(), &s) -// return response -//} -// -//func (v Platform) OnBye(request sip.Request) { -// id, _ := request.CallID() -// stream, ok := v.streams[id.Value()] -// if !ok { -// return -// } -// -// { -// // 此作用域内defer不会生效 -// v.lock.Lock() -// delete(v.streams, id.Value()) -// v.lock.Unlock() -// } -// -// stream.Close(false) -//} -// -//func (v Platform) Offline() { -// for _, stream := range v.streams { -// stream.Close(true) -// } -// -// v.streams = nil -//} -// -//type ClientConfig struct { -// DeviceIDPrefix string `json:"device_id_prefix"` -// ChannelIDPrefix string `json:"channel_id_prefix"` -// ServerAddr string `json:"server_id"` -// Domain string `json:"domain"` -// Password string `json:"password"` -// ListenAddr string `json:"listenAddr"` -// Count int `json:"count"` -// RawFilePath string `json:"rtp_over_tcp_raw_file_path"` // rtp over tcp源文件 -//} -// -//func TestGBClient(t *testing.T) { -// configData, err := os.ReadFile("./client_benchmark_test_config.json") -// if err != nil { -// panic(err) -// } -// -// clientConfig := &ClientConfig{} -// if err = json.Unmarshal(configData, clientConfig); err != nil { -// panic(err) -// } -// -// rtpData, err := os.ReadFile(clientConfig.RawFilePath) -// if err != nil { -// println("读取rtp源文件失败 不能推流") -// } else { -// // 分割rtp包 -// offset := 2 -// length := len(rtpData) -// locks = make(map[uint32]*sync.RWMutex, 128) -// for rtpSize := 0; offset < length; offset += rtpSize + 2 { -// rtpSize = int(binary.BigEndian.Uint16(rtpData[offset-2:])) -// if length-offset < rtpSize { -// break -// } -// -// bytes := rtpData[offset : offset+rtpSize] -// ts := binary.BigEndian.Uint32(bytes[4:]) -// // 每个相同时间戳共用一把互斥锁, 只允许同时一路流发送该时间戳内的rtp包, 保护ssrc被不同的流修改 -// if _, ok := locks[ts]; !ok { -// locks[ts] = &sync.RWMutex{} -// } -// -// rtpPackets = append(rtpPackets, rtpData[offset-2:offset+rtpSize]) -// } -// } -// -// println("========================================") -// println("源码地址: https://github.com/lkmio/gb-cms") -// println("视频来源于网络,如有侵权,请联系删除") -// println("========================================\r\n") -// -// time.Sleep(3 * time.Second) -// -// // 初始化UA配置, 防止SipServer使用时空指针 -// Config = &Config_{} -// -// listenIP, listenPort, err := net.SplitHostPort(clientConfig.ListenAddr) -// if err != nil { -// panic(err) -// } -// -// atoi, err := strconv.Atoi(listenPort) -// if err != nil { -// panic(err) -// } -// -// server, err := StartSipServer("", listenIP, listenIP, atoi) -// if err != nil { -// panic(err) -// } -// DeviceChannelsManager = &DeviceChannels{ -// channels: make(map[string][]*Channel, clientConfig.Count), -// } -// -// for i := 0; i < clientConfig.Count; i++ { -// deviceId := clientConfig.DeviceIDPrefix + fmt.Sprintf("%07d", i+1) -// channelId := clientConfig.ChannelIDPrefix + fmt.Sprintf("%07d", i+1) -// client := NewGBClient(deviceId, clientConfig.ServerAddr, clientConfig.Domain, "UDP", clientConfig.Password, 500, 40, server) -// -// device := Platform{client.(*gbClient), map[string]*MediaStream{}, &sync.Mutex{}} -// device.SetDeviceInfo(fmt.Sprintf("测试设备%d", i+1), "lkmio", "lkmio_gb", "dev-0.0.1") -// -// channel := &Channel{ -// DeviceID: channelId, -// Name: "1", -// ParentID: deviceId, -// } -// -// DeviceManager.Add(device) -// DeviceChannelsManager.AddChannel(deviceId, channel) -// -// device.Start() -// -// device.SetOnRegisterHandler(func() { -// fmt.Printf(deviceId + " 注册成功\r\n") -// }, func() { -// fmt.Printf(deviceId + " 离线\r\n") -// device.Offline() -// }) -// } -// -// for { -// time.Sleep(time.Second * 3) -// } -//} +func (v vDevice) OnInvite(request sip.Request, user string) sip.Response { + //if len(rtpPackets) < 1 { + return CreateResponseWithStatusCode(request, http.StatusInternalServerError) + //} + // + //offer, ssrc, speed, media, offerSetup, answerSetup, err := ParseGBSDP(request.Body()) + //if err != nil { + // return CreateResponseWithStatusCode(request, http.StatusBadRequest) + //} + // + //stream := &MediaStream{} + //socket, tcp, err := CreateTransport(offer.Addr, int(media.Port), offerSetup, stream) + //if err != nil { + // return CreateResponseWithStatusCode(request, http.StatusBadRequest) + //} + // + //time := strings.Split(offer.Time, " ") + //if len(time) < 2 { + // return CreateResponseWithStatusCode(request, http.StatusBadRequest) + //} + // + //var ip string + //var port sip.Port + //var contactAddr string + //if v.sipUA.NatAddr != "" { + // contactAddr = v.sipUA.NatAddr + //} else { + // contactAddr = v.sipUA.ListenAddr + //} + // + //host, p, _ := net.SplitHostPort(contactAddr) + //ip = host + //atoi, _ := strconv.Atoi(p) + //port = sip.Port(atoi) + // + //contactAddress := &sip.Address{ + // Uri: &sip.SipUri{ + // FUser: sip.String{Str: user}, + // FHost: ip, + // FPort: &port, + // }, + //} + // + //answer := BuildSDP(user, offer.Session, ip, uint16(socket.ListenPort()), time[0], time[1], answerSetup, speed, ssrc) + //response := CreateResponseWithStatusCode(request, http.StatusOK) + //response.RemoveHeader("Contact") + //response.AppendHeader(contactAddress.AsContactHeader()) + //response.AppendHeader(&SDPMessageType) + //response.SetBody(answer, true) + //setToTag(response) + // + //i, _ := strconv.Atoi(ssrc) + //stream.ssrc = uint32(i) + //stream.tcp = tcp + //stream.dialog = CreateDialogRequestFromAnswer(response, true, v.sipUA.Domain) + //callId, _ := response.CallID() + // + //{ + // v.lock.Lock() + // defer v.lock.Unlock() + // v.streams[callId.Value()] = stream + //} + // + //// 设置网络断开回调 + //stream.closedCB = func(sendBye bool) { + // if stream.dialog != nil { + // id, _ := stream.dialog.CallID() + // StreamManager.RemoveWithCallId(id.Value()) + // + // { + // v.lock.Lock() + // delete(v.streams, id.Value()) + // v.lock.Unlock() + // } + // + // if sendBye { + // bye := CreateRequestFromDialog(stream.dialog, sip.BYE) + // v.sipUA.stack.SendRequest(bye) + // } + // + // stream.dialog = nil + // } + // + // if stream.transport != nil { + // stream.transport.Close() + // stream.transport = nil + // } + //} + // + //stream.transport = socket + //stream.Start() + // + //// 绑定到StreamManager, bye请求才会找到设备回调 + //streamId := common.GenerateStreamID(common.InviteTypePlay, v.sipUA.Username, user, "", "") + //s := StreamID{StreamID: streamId, Dialog: stream.dialog} + //StreamManager.Add(&s) + // + //callID, _ := request.CallID() + //StreamManager.AddWithCallId(callID.Value(), &s) + //return response +} + +func (v vDevice) OnBye(request sip.Request) { + id, _ := request.CallID() + stream, ok := v.streams[id.Value()] + if !ok { + return + } + + { + // 此作用域内defer不会生效 + v.lock.Lock() + delete(v.streams, id.Value()) + v.lock.Unlock() + } + + stream.Close(false) +} + +func (v vDevice) Offline() { + for _, stream := range v.streams { + stream.Close(true) + } + + v.streams = nil +} + +type ClientConfig struct { + DeviceIDPrefix string `json:"device_id_prefix"` + ChannelIDPrefix string `json:"channel_id_prefix"` + ServerID string `json:"server_id"` + Domain string `json:"domain"` + Password string `json:"password"` + ListenAddr string `json:"listenAddr"` + Count int `json:"count"` + RawFilePath string `json:"rtp_over_tcp_raw_file_path"` // rtp over tcp源文件 +} + +func TestGBClient(t *testing.T) { + configData, err := os.ReadFile("./client_benchmark_test_config.json") + if err != nil { + panic(err) + } + + clientConfig := &ClientConfig{} + if err = json.Unmarshal(configData, clientConfig); err != nil { + panic(err) + } + + rtpData, err := os.ReadFile(clientConfig.RawFilePath) + if err != nil { + println("读取rtp源文件失败 不能推流") + } else { + // 分割rtp包 + offset := 2 + length := len(rtpData) + locks = make(map[uint32]*sync.RWMutex, 128) + for rtpSize := 0; offset < length; offset += rtpSize + 2 { + rtpSize = int(binary.BigEndian.Uint16(rtpData[offset-2:])) + if length-offset < rtpSize { + break + } + + bytes := rtpData[offset : offset+rtpSize] + ts := binary.BigEndian.Uint32(bytes[4:]) + // 每个相同时间戳共用一把互斥锁, 只允许同时一路流发送该时间戳内的rtp包, 保护ssrc被不同的流修改 + if _, ok := locks[ts]; !ok { + locks[ts] = &sync.RWMutex{} + } + + rtpPackets = append(rtpPackets, rtpData[offset-2:offset+rtpSize]) + } + } + + println("========================================") + println("源码地址: https://github.com/lkmio/gb-cms") + println("视频来源于网络,如有侵权,请联系删除") + println("========================================\r\n") + + time.Sleep(3 * time.Second) + + // 初始化UA配置, 防止SipServer使用时空指针 + common.Config = &common.Config_{} + + listenIP, listenPort, err := net.SplitHostPort(clientConfig.ListenAddr) + if err != nil { + panic(err) + } + + atoi, err := strconv.Atoi(listenPort) + if err != nil { + panic(err) + } + + server, err := StartSipServer("", listenIP, listenIP, atoi) + if err != nil { + panic(err) + } + DeviceChannelsManager = &DeviceChannels{ + channels: make(map[string][]*dao.ChannelModel, clientConfig.Count), + } + + for i := 0; i < clientConfig.Count; i++ { + deviceId := clientConfig.DeviceIDPrefix + fmt.Sprintf("%07d", i+1) + options := &common.SIPUAOptions{ + Username: deviceId, + ServerID: clientConfig.ServerID, + ServerAddr: clientConfig.Domain, + Transport: "UDP", + Password: clientConfig.Password, + RegisterExpires: 500, + KeepaliveInterval: 40, + } + client := NewGBClient(options, server) + + device := vDevice{client.(*gbClient), map[string]*MediaStream{}, &sync.Mutex{}} + device.SetDeviceInfo(fmt.Sprintf("测试设备%d", i+1), "lkmio", "lkmio_gb", "dev-0.0.1") + + DeviceManager.Add(deviceId, device) + + for j := 0; j < 100; j++ { + channelId := clientConfig.ChannelIDPrefix + fmt.Sprintf("%07d", i+1+j) + channel := &dao.ChannelModel{ + DeviceID: channelId, + Name: "1", + ParentID: deviceId, + } + + DeviceChannelsManager.AddChannel(deviceId, channel) + } + + device.Start() + + device.SetOnRegisterHandler(func() { + fmt.Printf(deviceId + " 注册成功\r\n") + }, func() { + fmt.Printf(deviceId + " 离线\r\n") + device.Offline() + }) + } + + for { + time.Sleep(time.Second * 3) + } +} diff --git a/stack/client_benchmark_test_config.json b/stack/client_benchmark_test_config.json index 37f6452..2128eef 100644 --- a/stack/client_benchmark_test_config.json +++ b/stack/client_benchmark_test_config.json @@ -3,10 +3,10 @@ "channel_id_prefix": "3402000000131", "server_id": "34020000002000000001", "?domain": "国标上级域的地址", - "domain": "192.168.2.148:5060", + "domain": "160.202.253.143:15060", "password": "12345678", - "listenAddr": "192.168.2.148:15062", - "count": 100, + "listenAddr": "192.168.2.119:15062", + "count": 1, "?rtp_over_tcp_raw_file_path": "rtp over tcp的推流源文件", "rtp_over_tcp_raw_file_path": "./rtp.raw" } \ No newline at end of file diff --git a/stack/client_manager.go b/stack/client_manager.go index 759c3dd..ef3b792 100644 --- a/stack/client_manager.go +++ b/stack/client_manager.go @@ -17,6 +17,12 @@ var ( clients: make(map[string]GBClient, 8), // username->client addrMap: make(map[string]int, 8), } + + // DeviceManager 模拟国标设备 + DeviceManager = &ClientManager{ + clients: make(map[string]GBClient, 8), // username->client + addrMap: make(map[string]int, 8), + } ) type ClientManager struct { diff --git a/stack/device.go b/stack/device.go index 9de7957..e8822ee 100644 --- a/stack/device.go +++ b/stack/device.go @@ -1,12 +1,18 @@ package stack import ( + "context" + "errors" "fmt" "gb-cms/common" "gb-cms/dao" + "gb-cms/log" "github.com/ghettovoice/gosip/sip" "net" + "net/http" "strconv" + "strings" + "time" ) const ( @@ -14,7 +20,7 @@ const ( "\r\n" + "Catalog\r\n" + "" + - "%s" + + "%d" + "\r\n" + "" + "%s" + @@ -25,7 +31,7 @@ const ( "\r\n" + "DeviceInfo\r\n" + "" + - "%s" + + "%d" + "\r\n" + "" + "%s" + @@ -46,7 +52,7 @@ type GBDevice interface { QueryDeviceInfo() // QueryCatalog 发送查询目录命令 - QueryCatalog() + QueryCatalog(timeout int) ([]*dao.ChannelModel, error) // QueryRecord 发送查询录像命令 QueryRecord(channelId, startTime, endTime string, sn int, type_ string) error @@ -80,6 +86,11 @@ type GBDevice interface { Close() } +type CatalogProgress struct { + TotalSize int + RecvSize int +} + type Device struct { *dao.DeviceModel } @@ -94,15 +105,180 @@ func (d *Device) BuildMessageRequest(to, body string) sip.Request { } func (d *Device) QueryDeviceInfo() { - body := fmt.Sprintf(DeviceInfoFormat, "1", d.DeviceID) + body := fmt.Sprintf(DeviceInfoFormat, GetSN(), d.DeviceID) request := d.BuildMessageRequest(d.DeviceID, body) common.SipStack.SendRequest(request) } -func (d *Device) QueryCatalog() { - body := fmt.Sprintf(CatalogFormat, "1", d.DeviceID) - request := d.BuildMessageRequest(d.DeviceID, body) - common.SipStack.SendRequest(request) +func (d *Device) QueryCatalog(timeoutSeconds int) ([]*dao.ChannelModel, error) { + catalogProgress := &CatalogProgress{} + + var timeoutCtx context.Context + var timeoutCancelFunc context.CancelFunc + if timeoutSeconds > 0 { + timeoutCtx, timeoutCancelFunc = context.WithTimeout(context.Background(), time.Duration(timeoutSeconds)*time.Second) + } + + var err error + var result []*dao.ChannelModel + query := func() { + defer func() { + if timeoutCancelFunc != nil { + timeoutCancelFunc() + } + }() + + // 下发查询指令 + finish := make(chan byte, 1) + sn := GetSN() + body := fmt.Sprintf(CatalogFormat, sn, d.DeviceID) + request := d.BuildMessageRequest(d.DeviceID, body) + tx := common.SipStack.SendRequest(request) + // 异步等待响应 + go func() { + response := <-tx.Responses() + if response != nil && response.StatusCode() != http.StatusOK { + err = fmt.Errorf("query catalog res[%d] %s", response.StatusCode(), StatusCode2Reason(int(response.StatusCode()))) + finish <- 1 + return + } + }() + + // 处理目录消息 + lastTime := time.Now() + var list []*CatalogResponse + SNManager.AddEvent(sn, func(response interface{}) { + lastTime = time.Now() + catalog := response.(*CatalogResponse) + catalogProgress.TotalSize = catalog.SumNum + catalogProgress.RecvSize += catalog.DeviceList.Num + + list = append(list, catalog) + if catalogProgress.RecvSize >= catalogProgress.TotalSize { + finish <- 1 + } + }) + + // 定时检测是否超时或完成 + timeout := 10 * time.Second + ticker := time.NewTicker(timeout) + for { + var end bool + select { + case <-ticker.C: + if time.Since(lastTime) > timeout { + // 超时, 则直接返回 + err = fmt.Errorf("query catalog timeout[%ds]", int(timeout.Seconds())) + ticker.Stop() + end = true + break + } + case <-finish: + ticker.Stop() + end = true + break + } + + if end { + break + } + } + + if err != nil { + return + } + + // 如果查询不完整, 并且数据库中通道列表不为空, 则丢弃本次查询的数据, 否则依旧入库 + var oldChannelCount int + oldChannelCount, err = dao.Channel.QueryChanelCount(d.DeviceID) + if err != nil { + log.Sugar.Errorf("query channel count failed, device: %s, err: %s", d.DeviceID, err.Error()) + return + } else if len(list) < 1 || (oldChannelCount > 0 && catalogProgress.RecvSize < catalogProgress.TotalSize) { + log.Sugar.Errorf("query catalog failed, device: %s, count: %d, recvSize: %d, totalSize: %d", d.DeviceID, oldChannelCount, catalogProgress.RecvSize, catalogProgress.TotalSize) + return + } + + // 删除旧的通道列表 + if oldChannelCount > 0 { + err = dao.Channel.DeleteChannels(d.DeviceID) + if err != nil { + log.Sugar.Errorf("delete channels failed, device: %s, err: %s", d.DeviceID, err.Error()) + return + } + } + + // 批量保存通道 + result, err = d.SaveChannels(list) + } + + if !UniqueTaskManager.Commit(GenerateCatalogTaskID(d.DeviceID), query, catalogProgress) { + return nil, errors.New("device busy") + } + + // web接口的查询超时 + if timeoutCtx != nil { + select { + case <-timeoutCtx.Done(): + if err == nil && catalogProgress.RecvSize < catalogProgress.TotalSize { + err = fmt.Errorf("wait for catalog[%d/%d] timeout[%ds]", catalogProgress.RecvSize, catalogProgress.TotalSize, timeoutSeconds) + } + break + } + } + + return result, err +} + +func (d *Device) SaveChannels(list []*CatalogResponse) ([]*dao.ChannelModel, error) { + var channels []*dao.ChannelModel + for _, response := range list { + for _, channel := range response.DeviceList.Devices { + // 状态转为大写 + channel.Status = common.OnlineStatus(strings.ToUpper(channel.Status.String())) + + // 默认在线 + if common.OFF != channel.Status { + channel.Status = common.ON + } + + // 下级设备的系统ID, 更新DeviceInfo + if channel.DeviceID == d.DeviceID && dao.Device.ExistDevice(d.DeviceID) { + _ = dao.Device.UpdateDeviceInfo(d.DeviceID, &dao.DeviceModel{ + Manufacturer: channel.Manufacturer, + Model: channel.Model, + Name: channel.Name, + }) + } + + typeCode := GetTypeCode(channel.DeviceID) + if typeCode == "" { + log.Sugar.Errorf("保存通道时, 获取设备类型失败 device: %s", channel.DeviceID) + } + + var groupId string + if channel.ParentID != "" { + layers := strings.Split(channel.ParentID, "/") + groupId = layers[len(layers)-1] + } else if channel.BusinessGroupID != "" { + groupId = channel.BusinessGroupID + } + + code, _ := strconv.Atoi(typeCode) + channel.RootID = d.DeviceID + channel.TypeCode = code + channel.GroupID = groupId + channels = append(channels, channel) + } + } + + err := dao.Channel.SaveChannels(channels) + if err != nil { + log.Sugar.Errorf("save channels failed, device: %s, err: %s", d.DeviceID, err.Error()) + return nil, err + } + + return channels, nil } func (d *Device) QueryRecord(channelId, startTime, endTime string, sn int, type_ string) error { @@ -123,7 +299,7 @@ func (d *Device) SubscribePosition(channelId string) error { //暂时不考虑级联 builder := d.NewRequestBuilder(sip.SUBSCRIBE, common.Config.SipID, common.Config.SipContactAddr, channelId) - body := fmt.Sprintf(MobilePositionMessageFormat, 1, channelId, common.Config.MobilePositionInterval) + body := fmt.Sprintf(MobilePositionMessageFormat, GetSN(), channelId, common.Config.MobilePositionInterval) expiresHeader := sip.Expires(common.Config.MobilePositionExpires) builder.SetExpires(&expiresHeader) @@ -151,7 +327,7 @@ func (d *Device) SubscribePosition(channelId string) error { } func (d *Device) Broadcast(sourceId, channelId string) sip.ClientTransaction { - body := fmt.Sprintf(BroadcastFormat, 1, sourceId, channelId) + body := fmt.Sprintf(BroadcastFormat, GetSN(), sourceId, channelId) request := d.BuildMessageRequest(channelId, body) return common.SipStack.SendRequest(request) } @@ -161,7 +337,7 @@ func (d *Device) UpdateChannel(id string, event string) { } func (d *Device) BuildCatalogRequest() (sip.Request, error) { - body := fmt.Sprintf(CatalogFormat, "1", d.DeviceID) + body := fmt.Sprintf(CatalogFormat, GetSN(), d.DeviceID) request := d.BuildMessageRequest(d.DeviceID, body) return request, nil } diff --git a/stack/media_server.go b/stack/media_server.go index 7db9374..b5f7045 100644 --- a/stack/media_server.go +++ b/stack/media_server.go @@ -79,7 +79,7 @@ func SendWithUrlParams(path string, body interface{}, values url.Values) (*http. } } - url := fmt.Sprintf("http://%s/%s", common.Config.MediaServer, path) + url := fmt.Sprintf("%s/%s", common.Config.MediaServer, path) data, err := json.Marshal(body) if err != nil { diff --git a/stack/online_devices.go b/stack/online_devices.go index 6654988..749bf95 100644 --- a/stack/online_devices.go +++ b/stack/online_devices.go @@ -41,6 +41,16 @@ func (m *onlineDeviceManager) Count() int { return len(m.devices) } +func (m *onlineDeviceManager) GetDeviceIds() []string { + m.lock.RLock() + defer m.lock.RUnlock() + ids := make([]string, 0, len(m.devices)) + for id := range m.devices { + ids = append(ids, id) + } + return ids +} + func (m *onlineDeviceManager) Start(interval time.Duration, keepalive time.Duration, OnExpires func(platformId int, deviceId string)) { // 精度有偏差 var timer *time.Timer diff --git a/stack/sip_handler.go b/stack/sip_handler.go index 6cd4e32..6114c1d 100644 --- a/stack/sip_handler.go +++ b/stack/sip_handler.go @@ -5,8 +5,6 @@ import ( "gb-cms/dao" "gb-cms/log" "github.com/lkmio/avformat/utils" - "strconv" - "strings" "time" ) @@ -66,44 +64,10 @@ func (e *EventHandler) OnKeepAlive(id string, addr string) bool { func (e *EventHandler) OnCatalog(device string, response *CatalogResponse) { utils.Assert(device == response.DeviceID) - for _, channel := range response.DeviceList.Devices { - // 状态转为大写 - channel.Status = common.OnlineStatus(strings.ToUpper(channel.Status.String())) - - // 默认在线 - if common.OFF != channel.Status { - channel.Status = common.ON - } - - // 下级设备的系统ID, 更新DeviceInfo - if channel.DeviceID == device && dao.Device.ExistDevice(device) { - _ = dao.Device.UpdateDeviceInfo(device, &dao.DeviceModel{ - Manufacturer: channel.Manufacturer, - Model: channel.Model, - Name: channel.Name, - }) - } - - typeCode := GetTypeCode(channel.DeviceID) - if typeCode == "" { - log.Sugar.Errorf("保存通道时, 获取设备类型失败 device: %s", channel.DeviceID) - } - - var groupId string - if channel.ParentID != "" { - layers := strings.Split(channel.ParentID, "/") - groupId = layers[len(layers)-1] - } else if channel.BusinessGroupID != "" { - groupId = channel.BusinessGroupID - } - - code, _ := strconv.Atoi(typeCode) - channel.RootID = device - channel.TypeCode = code - channel.GroupID = groupId - if err := dao.Channel.SaveChannel(channel); err != nil { - log.Sugar.Infof("保存通道到数据库失败 err: %s", err.Error()) - } + if event := SNManager.FindEvent(response.SN); event != nil { + event(response) + } else { + log.Sugar.Errorf("处理目录响应失败 SN: %d", response.SN) } } diff --git a/stack/sip_server.go b/stack/sip_server.go index f909a5c..64b2bf7 100644 --- a/stack/sip_server.go +++ b/stack/sip_server.go @@ -97,7 +97,7 @@ func (s *sipServer) OnRegister(wrapper *SipRequestSource) { } if queryCatalog { - device.QueryCatalog() + _, _ = device.QueryCatalog(0) } } @@ -250,6 +250,8 @@ func (s *sipServer) OnMessage(wrapper *SipRequestSource) { device = PlatformManager.Find(wrapper.req.Source()) } else if wrapper.fromJt { device = JTDeviceManager.Find(deviceId) + } else { + device = DeviceManager.Find(deviceId) } if ok = device != nil; !ok { diff --git a/stack/unique_task.go b/stack/unique_task.go new file mode 100644 index 0000000..8ef7621 --- /dev/null +++ b/stack/unique_task.go @@ -0,0 +1,51 @@ +package stack + +import ( + "fmt" + "sync" +) + +var ( + UniqueTaskManager = &taskManager{ + tasks: make(map[string]interface{}), + } +) + +func GenerateCatalogTaskID(deviceID string) string { + return fmt.Sprintf("%s_catalog", deviceID) +} + +type taskManager struct { + lock sync.Mutex + tasks map[string]interface{} +} + +func (t *taskManager) Commit(id string, task func(), data interface{}) bool { + t.lock.Lock() + defer t.lock.Unlock() + + if _, ok := t.tasks[id]; ok { + return false + } + + t.tasks[id] = data + + go func() { + task() + t.lock.Lock() + defer t.lock.Unlock() + delete(t.tasks, id) + }() + + return true +} + +func (t *taskManager) Find(id string) interface{} { + t.lock.Lock() + defer t.lock.Unlock() + + if data, ok := t.tasks[id]; ok { + return data + } + return nil +} diff --git a/stack/xml_test.go b/stack/xml_test.go index 1f3b592..a7798f7 100644 --- a/stack/xml_test.go +++ b/stack/xml_test.go @@ -30,7 +30,7 @@ func TestDecodeXML(t *testing.T) { if err != nil { panic(err) } - handler := EventHandler{} + //handler := EventHandler{} for i := 0; i < len(file); { size := binary.BigEndian.Uint32(file[i:]) i += 4 @@ -42,7 +42,8 @@ func TestDecodeXML(t *testing.T) { panic(err) } - handler.OnCatalog(catalogResponse.DeviceID, catalogResponse) + println(string(body)) + //handler.OnCatalog(catalogResponse.DeviceID, catalogResponse) } }) diff --git a/stats.go b/stats.go index 50e9a8a..67304a2 100644 --- a/stats.go +++ b/stats.go @@ -5,7 +5,9 @@ import ( "encoding/json" "fmt" "gb-cms/common" + "gb-cms/dao" "gb-cms/log" + "gb-cms/stack" "github.com/shirou/gopsutil/v3/cpu" "github.com/shirou/gopsutil/v3/disk" "github.com/shirou/gopsutil/v3/mem" @@ -22,6 +24,9 @@ var ( diskStatsJson string lastNetStatsJson string lastNetStats []net.IOCountersStat + + ChannelTotalCount int + ChannelOnlineCount int ) const ( @@ -379,26 +384,40 @@ func StartStats() { topStatsJson = string(marshal) } - // 统计磁盘信息 - count++ - if count >= 5 { - count = 0 + if count%5 == 0 { + // 统计磁盘信息 usage, err := stateDiskUsage() if err != nil { log.Sugar.Errorf("获取磁盘信息失败: %v", err) - continue + } else { + bytes, err := json.Marshal(common.MalformedRequest{ + Code: http.StatusOK, + Msg: "Success", + Data: usage, + }) + if err != nil { + log.Sugar.Errorf("序列化磁盘信息失败: %v", err) + } else { + diskStatsJson = string(bytes) + } } - bytes, err := json.Marshal(common.MalformedRequest{ - Code: http.StatusOK, - Msg: "Success", - Data: usage, - }) + // 统计通道总数和在线数 + i, err := dao.Channel.TotalCount() if err != nil { - log.Sugar.Errorf("序列化磁盘信息失败: %v", err) + log.Sugar.Errorf("获取通道总数失败: %v", err) } else { - diskStatsJson = string(bytes) + ChannelTotalCount = i + } + + onlineCount, err := dao.Channel.OnlineCount(stack.OnlineDeviceManager.GetDeviceIds()) + if err != nil { + log.Sugar.Errorf("获取在线通道数失败: %v", err) + } else { + ChannelOnlineCount = onlineCount } } + + count++ } }