diff --git a/api.go b/api.go index f3d3e02..de620f0 100644 --- a/api.go +++ b/api.go @@ -3,6 +3,7 @@ package main import ( "context" "encoding/json" + "errors" "fmt" "gb-cms/common" "gb-cms/dao" @@ -15,6 +16,7 @@ import ( "github.com/lkmio/avformat/utils" "io" "math" + "net" "net/http" "net/url" "os" @@ -120,9 +122,10 @@ type QueryDeviceChannel struct { Limit int `json:"limit"` Keyword string `json:"q"` Online string `json:"online"` - ChannelType string `json:"channel_type"` - Order string `json:"order"` // asc/desc - Sort string `json:"sort"` // Channel-根据数据库ID排序/iD-根据通道ID排序 + Enable string `json:"enable"` + ChannelType string `json:"channel_type"` // dir-查询子目录 + Order string `json:"order"` // asc/desc + Sort string `json:"sort"` // Channel-根据数据库ID排序/iD-根据通道ID排序 SMS string `json:"sms"` } @@ -133,6 +136,35 @@ type DeleteDevice struct { UA string `json:"ua"` } +type SetEnable struct { + ID int `json:"id"` + Enable bool `json:"enable"` + ShareAllChannel bool `json:"shareallchannel"` +} + +type QueryCascadeChannelList struct { + QueryDeviceChannel + ID string `json:"id"` + Related bool `json:"related"` // 只看已选择 + Reverse bool `json:"reverse"` +} + +type ChannelListResult struct { + ChannelCount int `json:"ChannelCount"` + ChannelList []*LiveGBSChannel `json:"ChannelList"` +} + +type CascadeChannel struct { + CascadeID string + *LiveGBSChannel +} + +type CustomChannel struct { + DeviceID string `json:"serial"` + ChannelID string `json:"code"` + CustomID string `json:"id"` +} + var apiServer *ApiServer func init() { @@ -203,16 +235,28 @@ func startApiServer(addr string) { apiServer.router.HandleFunc("/api/v1/stream/info", withVerify(apiServer.OnStreamInfo)) apiServer.router.HandleFunc("/api/v1/device/session/list", withVerify(common.WithQueryStringParams(apiServer.OnSessionList, QueryDeviceChannel{}))) // 推流列表 apiServer.router.HandleFunc("/api/v1/device/session/stop", withVerify(common.WithFormDataParams(apiServer.OnSessionStop, StreamIDParams{}))) // 关闭流 + apiServer.router.HandleFunc("/api/v1/device/setchannelid", withVerify(common.WithFormDataParams(apiServer.OnCustomChannelSet, CustomChannel{}))) // 关闭流 apiServer.router.HandleFunc("/api/v1/position/sub", common.WithJsonResponse(apiServer.OnSubscribePosition, &DeviceChannelID{})) // 订阅移动位置 apiServer.router.HandleFunc("/api/v1/playback/seek", common.WithJsonResponse(apiServer.OnSeekPlayback, &SeekParams{})) // 回放seek apiServer.router.HandleFunc("/api/v1/control/ptz", withVerify(common.WithFormDataParams(apiServer.OnPTZControl, QueryRecordParams{}))) // 云台控制 - apiServer.router.HandleFunc("/api/v1/cascade/list", apiServer.OnPlatformList) // 级联设备列表 - apiServer.router.HandleFunc("/api/v1/platform/add", common.WithJsonResponse(apiServer.OnPlatformAdd, &dao.PlatformModel{})) // 添加级联设备 - apiServer.router.HandleFunc("/api/v1/platform/remove", common.WithJsonResponse(apiServer.OnPlatformRemove, &dao.PlatformModel{})) // 删除级联设备 - apiServer.router.HandleFunc("/api/v1/platform/channel/bind", common.WithJsonResponse(apiServer.OnPlatformChannelBind, &PlatformChannel{})) // 级联绑定通道 - apiServer.router.HandleFunc("/api/v1/platform/channel/unbind", common.WithJsonResponse(apiServer.OnPlatformChannelUnbind, &PlatformChannel{})) // 级联解绑通道 + apiServer.router.HandleFunc("/api/v1/cascade/list", withVerify(common.WithQueryStringParams(apiServer.OnPlatformList, QueryDeviceChannel{}))) // 级联设备列表 + apiServer.router.HandleFunc("/api/v1/cascade/save", withVerify(common.WithFormDataParams(apiServer.OnPlatformAdd, LiveGBSCascade{}))) // 添加级联设备 + apiServer.router.HandleFunc("/api/v1/cascade/setenable", withVerify(common.WithFormDataParams(apiServer.OnEnableSet, SetEnable{}))) // 添加级联设备 + apiServer.router.HandleFunc("/api/v1/cascade/remove", withVerify(common.WithFormDataParams(apiServer.OnPlatformRemove, SetEnable{}))) // 删除级联设备 + apiServer.router.HandleFunc("/api/v1/cascade/channellist", withVerify(common.WithQueryStringParams(apiServer.OnPlatformChannelList, QueryCascadeChannelList{}))) // 级联设备通道列表 + + apiServer.router.HandleFunc("/api/v1/cascade/savechannels", withVerify(apiServer.OnPlatformChannelBind)) // 级联绑定通道 + apiServer.router.HandleFunc("/api/v1/cascade/removechannels", withVerify(apiServer.OnPlatformChannelUnbind)) // 级联解绑通道 + apiServer.router.HandleFunc("/api/v1/cascade/setshareallchannel", withVerify(common.WithFormDataParams(apiServer.OnShareAllChannel, SetEnable{}))) // 开启或取消级联所有通道 + apiServer.router.HandleFunc("/api/v1/cascade/pushcatalog", withVerify(common.WithFormDataParams(apiServer.OnCatalogPush, SetEnable{}))) // 推送目录 + + // 暂未开发 + apiServer.router.HandleFunc("/api/v1/alarm/list", withVerify(func(w http.ResponseWriter, req *http.Request) {})) // 报警查询 + apiServer.router.HandleFunc("/api/v1/cloudrecord/querychannels", withVerify(func(w http.ResponseWriter, req *http.Request) {})) // 云端录像 + apiServer.router.HandleFunc("/api/v1/user/list", withVerify(func(w http.ResponseWriter, req *http.Request) {})) // 用户管理 + apiServer.router.HandleFunc("/api/v1/log/list", withVerify(func(w http.ResponseWriter, req *http.Request) {})) // 操作日志 apiServer.router.HandleFunc("/api/v1/broadcast/invite", common.WithJsonResponse(apiServer.OnBroadcast, &BroadcastParams{Setup: &common.DefaultSetupType})) // 发起语音广播 apiServer.router.HandleFunc("/api/v1/broadcast/hangup", common.WithJsonResponse(apiServer.OnHangup, &BroadcastParams{})) // 挂断广播会话 @@ -281,15 +325,6 @@ func (api *ApiServer) OnPlay(params *StreamParams, w http.ResponseWriter, r *htt // 拉流地址携带的参数 query := r.URL.Query() - - // 播放授权 - streamToken := query.Get("stream_token") - if TokenManager.Find(streamToken) == nil { - w.WriteHeader(http.StatusUnauthorized) - log.Sugar.Errorf("播放鉴权失败, token不存在 token: %s", streamToken) - return - } - jtSource := query.Get("forward_type") == "gateway_1078" // 跳过非国标拉流 @@ -324,12 +359,21 @@ func (api *ApiServer) OnPlay(params *StreamParams, w http.ResponseWriter, r *htt log.Sugar.Errorf("通知1078信令服务器失败. 响应状态码: %d sim number: %s channel number: %s", response.StatusCode, simNumber, channelNumber) } } else { - // livegbs前端即使退出的播放,还是会拉流. 如果在hook中发起invite, 会造成不必要的请求. // 流不存在, 返回404 - if stream, _ := dao.Stream.QueryStream(params.Stream); stream == nil { - w.WriteHeader(http.StatusNotFound) - return + if params.Protocol < stack.TransStreamGBCascaded { + // 播放授权 + streamToken := query.Get("stream_token") + if TokenManager.Find(streamToken) == nil { + w.WriteHeader(http.StatusUnauthorized) + log.Sugar.Errorf("播放鉴权失败, token不存在 token: %s", streamToken) + return + } + + if stream, _ := dao.Stream.QueryStream(params.Stream); stream == nil { + w.WriteHeader(http.StatusNotFound) + return + } } inviteParams := &InviteParams{ @@ -421,11 +465,11 @@ func (api *ApiServer) OnPublish(params *StreamParams, w http.ResponseWriter, _ * func (api *ApiServer) OnPublishDone(params *StreamParams, _ http.ResponseWriter, _ *http.Request) { log.Sugar.Debugf("推流结束事件. protocol: %s stream: %s", params.Protocol, params.Stream) - stack.CloseStream(params.Stream, false) - // 对讲websocket断开连接 - if stack.SourceTypeGBTalk == params.Protocol { - - } + //stack.CloseStream(params.Stream, false) + //// 对讲websocket断开连接 + //if stack.SourceTypeGBTalk == params.Protocol { + // + //} } func (api *ApiServer) OnIdleTimeout(params *StreamParams, w http.ResponseWriter, _ *http.Request) { @@ -705,10 +749,15 @@ func (api *ApiServer) OnChannelList(q *QueryDeviceChannel, _ http.ResponseWriter values := r.URL.Query() log.Sugar.Debugf("查询通道列表 %s", values.Encode()) - device, err := dao.Device.QueryDevice(q.DeviceID) - if err != nil { - log.Sugar.Errorf("查询设备失败 err: %s", err.Error()) - return nil, err + var deviceName string + if q.DeviceID != "" { + device, err := dao.Device.QueryDevice(q.DeviceID) + if err != nil { + log.Sugar.Errorf("查询设备失败 err: %s", err.Error()) + return nil, err + } + + deviceName = device.Name } var status string @@ -723,98 +772,19 @@ func (api *ApiServer) OnChannelList(q *QueryDeviceChannel, _ http.ResponseWriter q.Order = "asc" } - channels, total, err := dao.Channel.QueryChannels(q.DeviceID, q.GroupID, (q.Start/q.Limit)+1, q.Limit, status, q.Keyword, q.Order, q.Sort) + channels, total, err := dao.Channel.QueryChannels(q.DeviceID, q.GroupID, (q.Start/q.Limit)+1, q.Limit, status, q.Keyword, q.Order, q.Sort, q.ChannelType == "dir") if err != nil { log.Sugar.Errorf("查询通道列表失败 err: %s", err.Error()) return nil, err } - response := struct { - ChannelCount int - ChannelList []LiveGBSChannel - }{ + response := ChannelListResult{ ChannelCount: total, } index := q.Start + 1 - for _, channel := range channels { - parental, _ := strconv.Atoi(channel.Parental) - port, _ := strconv.Atoi(channel.Port) - registerWay, _ := strconv.Atoi(channel.RegisterWay) - secrecy, _ := strconv.Atoi(channel.Secrecy) - - streamID := common.GenerateStreamID(common.InviteTypePlay, channel.RootID, channel.DeviceID, "", "") - if stream, err := dao.Stream.QueryStream(streamID); err != nil || stream == nil { - streamID = "" - } - - response.ChannelList = append(response.ChannelList, LiveGBSChannel{ - Address: channel.Address, - Altitude: 0, - AudioEnable: true, - BatteryLevel: 0, - Channel: index, - CivilCode: channel.CivilCode, - CloudRecord: false, - CreatedAt: channel.CreatedAt.Format("2006-01-02 15:04:05"), - Custom: false, - CustomAddress: "", - CustomBlock: "", - CustomCivilCode: "", - CustomFirmware: "", - CustomID: "", - CustomIPAddress: "", - CustomLatitude: 0, - CustomLongitude: 0, - CustomManufacturer: "", - CustomModel: "", - CustomName: "", - CustomPTZType: 0, - CustomParentID: "", - CustomPort: 0, - CustomSerialNumber: "", - CustomStatus: "", - Description: "", - DeviceCustomName: "", - DeviceID: channel.RootID, - DeviceName: device.Name, - DeviceOnline: device.Online(), - DeviceType: "GB", - Direction: 0, - DownloadSpeed: "", - Firmware: "", - ID: channel.DeviceID, - IPAddress: channel.IPAddress, - Latitude: 0, - Longitude: 0, - Manufacturer: channel.Manufacturer, - Model: channel.Model, - Name: channel.Name, - NumOutputs: 0, - Ondemand: true, - Owner: channel.Owner, - PTZType: 0, - ParentID: channel.ParentID, - Parental: parental, - Port: port, - Quality: "", - RegisterWay: registerWay, - Secrecy: secrecy, - SerialNumber: "", - Shared: false, - SignalLevel: 0, - SnapURL: "", - Speed: 0, - Status: channel.Status.String(), - StreamID: string(streamID), // 实时流ID - SubCount: channel.SubCount, - UpdatedAt: channel.UpdatedAt.Format("2006-01-02 15:04:05"), - }) - - index++ - } - - return response, nil + response.ChannelList = ChannelModels2LiveGBSChannels(index, channels, deviceName) + return &response, nil } func (api *ApiServer) OnRecordList(v *QueryRecordParams, _ http.ResponseWriter, _ *http.Request) (interface{}, error) { @@ -1084,7 +1054,7 @@ func (api *ApiServer) OnStarted(_ http.ResponseWriter, _ *http.Request) { } } -func (api *ApiServer) OnPlatformAdd(v *dao.PlatformModel, _ http.ResponseWriter, _ *http.Request) (interface{}, error) { +func (api *ApiServer) OnPlatformAdd(v *LiveGBSCascade, _ http.ResponseWriter, _ *http.Request) (interface{}, error) { log.Sugar.Debugf("添加级联设备 %v", *v) if v.Username == "" { @@ -1092,89 +1062,171 @@ func (api *ApiServer) OnPlatformAdd(v *dao.PlatformModel, _ http.ResponseWriter, log.Sugar.Infof("级联设备使用本级域: %s", common.Config.SipID) } + var err error if len(v.Username) != 20 { - err := fmt.Errorf("用户名长度必须20位") - log.Sugar.Errorf("添加级联设备失败 err: %s", err.Error()) + err = fmt.Errorf("用户名长度必须20位") return nil, err - } else if len(v.ServerID) != 20 { - err := fmt.Errorf("上级ID长度必须20位") + } else if len(v.Serial) != 20 { + err = fmt.Errorf("上级ID长度必须20位") + return nil, err + } + + if err != nil { log.Sugar.Errorf("添加级联设备失败 err: %s", err.Error()) return nil, err } v.Status = "OFF" - platform, err := stack.NewPlatform(&v.SIPUAOptions, common.SipStack) + model := dao.PlatformModel{ + SIPUAOptions: common.SIPUAOptions{ + Name: v.Name, + Username: v.Username, + Password: v.Password, + ServerID: v.Serial, + ServerAddr: net.JoinHostPort(v.Host, strconv.Itoa(v.Port)), + Transport: v.CommandTransport, + RegisterExpires: v.RegisterInterval, + KeepaliveInterval: v.KeepaliveInterval, + Status: common.OFF, + }, + } + + platform, err := stack.NewPlatform(&model.SIPUAOptions, common.SipStack) if err != nil { - log.Sugar.Errorf("创建级联设备失败 err: %s", err.Error()) return nil, err } - if !stack.PlatformManager.Add(v.ServerAddr, platform) { - log.Sugar.Errorf("ua添加失败, id冲突. key: %s", v.ServerAddr) - return fmt.Errorf("ua添加失败, id冲突. key: %s", v.ServerAddr), nil - } else if err = dao.Platform.SavePlatform(v); err != nil { - stack.PlatformManager.Remove(v.ServerAddr) - log.Sugar.Errorf("保存级联设备失败 err: %s", err.Error()) + // 编辑国标设备 + if v.ID != "" { + // 停止旧的 + oldPlatform := stack.PlatformManager.Remove(model.ServerAddr) + if oldPlatform != nil { + oldPlatform.Stop() + } + + // 更新数据库 + id, _ := strconv.ParseInt(v.ID, 10, 64) + model.ID = uint(id) + err = dao.Platform.UpdatePlatform(&model) + } else { + err = dao.Platform.SavePlatform(&model) + } + + if err == nil && v.Enable { + if !stack.PlatformManager.Add(model.ServerAddr, platform) { + err = fmt.Errorf("地址冲突. key: %s", model.ServerAddr) + if err != nil { + _ = dao.Platform.DeletePlatformByAddr(model.ServerAddr) + } + } else { + platform.Start() + } + } + + if err != nil { + log.Sugar.Errorf("添加级联设备失败 err: %s", err.Error()) return nil, err } - platform.Start() - return nil, err + return "OK", nil } -func (api *ApiServer) OnPlatformRemove(v *dao.PlatformModel, _ http.ResponseWriter, _ *http.Request) (interface{}, error) { +func (api *ApiServer) OnPlatformRemove(v *SetEnable, _ http.ResponseWriter, _ *http.Request) (interface{}, error) { log.Sugar.Debugf("删除级联设备 %v", *v) - - err := dao.Platform.DeleteUAByAddr(v.ServerAddr) - if err != nil { - return nil, err - } else if platform := stack.PlatformManager.Remove(v.ServerAddr); platform != nil { - platform.Stop() - } - - return nil, err -} - -func (api *ApiServer) OnPlatformList(_ http.ResponseWriter, _ *http.Request) { - //platforms := LoadPlatforms() - //httpResponseOK(w, platforms) -} - -func (api *ApiServer) OnPlatformChannelBind(v *PlatformChannel, _ http.ResponseWriter, _ *http.Request) (interface{}, error) { - log.Sugar.Debugf("级联绑定通道 %v", *v) - - platform := stack.PlatformManager.Find(v.ServerAddr) + platform, _ := dao.Platform.QueryPlatformByID(v.ID) if platform == nil { - log.Sugar.Errorf("绑定通道失败, 级联设备不存在 addr: %s", v.ServerAddr) - return nil, fmt.Errorf("not found platform") + return nil, fmt.Errorf("级联设备不存在") } - // 级联功能,通道号必须唯一 - channels, err := dao.Platform.BindChannels(v.ServerAddr, v.Channels) - if err != nil { - log.Sugar.Errorf("绑定通道失败 err: %s", err.Error()) - return nil, err + _ = dao.Platform.DeletePlatformByID(v.ID) + client := stack.PlatformManager.Remove(platform.ServerAddr) + if client != nil { + client.Stop() } - return channels, nil + return "OK", nil } -func (api *ApiServer) OnPlatformChannelUnbind(v *PlatformChannel, _ http.ResponseWriter, _ *http.Request) (interface{}, error) { - log.Sugar.Debugf("级联解绑通道 %v", *v) +func (api *ApiServer) OnPlatformList(q *QueryDeviceChannel, _ http.ResponseWriter, _ *http.Request) (interface{}, error) { + response := struct { + CascadeCount int `json:"CascadeCount"` + CascadeList []*LiveGBSCascade `json:"CascadeList"` + }{} - platform := stack.PlatformManager.Find(v.ServerAddr) - if platform == nil { - log.Sugar.Errorf("解绑通道失败, 级联设备不存在 addr: %s", v.ServerAddr) - return nil, fmt.Errorf("not found platform") + platforms, total, err := dao.Platform.QueryPlatforms((q.Start/q.Limit)+1, q.Limit, q.Keyword, q.Enable, q.Online) + if err == nil { + response.CascadeCount = total + for _, platform := range platforms { + host, p, _ := net.SplitHostPort(platform.ServerAddr) + port, _ := strconv.Atoi(p) + response.CascadeList = append(response.CascadeList, &LiveGBSCascade{ + ID: strconv.Itoa(int(platform.ID)), + Enable: platform.Enable, + Name: platform.Name, + Serial: platform.ServerID, + Realm: platform.ServerID[:10], + Host: host, + Port: port, + LocalSerial: platform.Username, + Username: platform.Username, + Password: platform.Password, + Online: platform.Status == common.ON, + Status: platform.Status, + RegisterInterval: platform.RegisterExpires, + KeepaliveInterval: platform.KeepaliveInterval, + CommandTransport: platform.Transport, + Charset: "GB2312", + CatalogGroupSize: 1, + LoadLimit: 0, + CivilCodeLimit: 8, + DigestAlgorithm: "", + GM: false, + Cert: "***", + CreateAt: platform.CreatedAt.Format("2006-01-02 15:04:05"), + UpdateAt: platform.UpdatedAt.Format("2006-01-02 15:04:05"), + }) + } + } + + return response, nil +} + +func (api *ApiServer) OnPlatformChannelBind(w http.ResponseWriter, r *http.Request) { + idStr := r.FormValue("id") + channels := r.Form["channels[]"] + + var err error + id, _ := strconv.Atoi(idStr) + _, err = dao.Platform.QueryPlatformByID(id) + if err == nil { + err = dao.Platform.BindChannels(id, channels) } - channels, err := dao.Platform.UnbindChannels(v.ServerAddr, v.Channels) if err != nil { - log.Sugar.Errorf("解绑通道失败 err: %s", err.Error()) - return nil, err + w.WriteHeader(http.StatusBadRequest) + _ = common.HttpResponseJson(w, err.Error()) + } else { + _ = common.HttpResponseJson(w, "OK") + } +} + +func (api *ApiServer) OnPlatformChannelUnbind(w http.ResponseWriter, r *http.Request) { + idStr := r.FormValue("id") + channels := r.Form["channels[]"] + + var err error + id, _ := strconv.Atoi(idStr) + _, err = dao.Platform.QueryPlatformByID(id) + if err == nil { + err = dao.Platform.UnbindChannels(id, channels) } - return channels, nil + if err != nil { + w.WriteHeader(http.StatusBadRequest) + _ = common.HttpResponseJson(w, err.Error()) + } else { + _ = common.HttpResponseJson(w, "OK") + } } func (api *ApiServer) OnDeviceMediaTransportSet(req *SetMediaTransportReq, _ http.ResponseWriter, _ *http.Request) (interface{}, error) { @@ -1318,7 +1370,7 @@ func (api *ApiServer) OnDeviceTree(q *QueryDeviceChannel, w http.ResponseWriter, if q.PCode == "" { q.PCode = q.DeviceID } - channels, _, _ := dao.Channel.QueryChannels(q.DeviceID, q.PCode, -1, 0, "", "", "asc", "") + channels, _, _ := dao.Channel.QueryChannels(q.DeviceID, q.PCode, -1, 0, "", "", "asc", "", false) for _, channel := range channels { id := channel.RootID + ":" + channel.DeviceID latitude, _ := strconv.ParseFloat(channel.Latitude, 10) @@ -1365,3 +1417,141 @@ func (api *ApiServer) OnDeviceRemove(q *DeleteDevice, w http.ResponseWriter, req return "OK", nil } + +func (api *ApiServer) OnEnableSet(params *SetEnable, w http.ResponseWriter, req *http.Request) (interface{}, error) { + model, err := dao.Platform.QueryPlatformByID(params.ID) + if err != nil { + return nil, err + } + + err = dao.Platform.UpdateEnable(params.ID, params.Enable) + if err != nil { + return nil, err + } + + if params.Enable { + if stack.PlatformManager.Find(model.ServerAddr) != nil { + return nil, errors.New("device already started") + } + + platform, err := stack.NewPlatform(&model.SIPUAOptions, common.SipStack) + if err != nil { + _ = dao.Platform.UpdateEnable(params.ID, false) + return nil, err + } + + stack.PlatformManager.Add(platform.ServerAddr, platform) + platform.Start() + } else if client := stack.PlatformManager.Remove(model.ServerAddr); client != nil { + client.Stop() + } + + return "OK", nil +} + +func (api *ApiServer) OnPlatformChannelList(q *QueryCascadeChannelList, w http.ResponseWriter, req *http.Request) (interface{}, error) { + response := struct { + ChannelCount int `json:"ChannelCount"` + ChannelList []*CascadeChannel `json:"ChannelList"` + + ChannelRelateCount *int `json:"ChannelRelateCount,omitempty"` + ShareAllChannel *bool `json:"ShareAllChannel,omitempty"` + }{} + + id, err := strconv.Atoi(q.ID) + if err != nil { + return nil, err + } + + // livegbs前端, 如果开启级联所有通道, 是不允许再只看已选择或取消绑定通道 + platform, err := dao.Platform.QueryPlatformByID(id) + if err != nil { + return nil, err + } + + // 只看已选择 + if q.Related == true { + list, total, err := dao.Platform.QueryPlatformChannelList(id) + if err != nil { + return nil, err + } + + response.ChannelCount = total + ChannelList := ChannelModels2LiveGBSChannels(q.Start+1, list, "") + for _, channel := range ChannelList { + response.ChannelList = append(response.ChannelList, &CascadeChannel{ + CascadeID: q.ID, + LiveGBSChannel: channel, + }) + } + } else { + list, err := api.OnChannelList(&q.QueryDeviceChannel, w, req) + if err != nil { + return nil, err + } + + result := list.(*ChannelListResult) + response.ChannelCount = result.ChannelCount + + for _, channel := range result.ChannelList { + var cascadeId string + if exist, _ := dao.Platform.QueryPlatformChannelExist(id, channel.DeviceID, channel.ID); exist { + cascadeId = q.ID + } + + // 判断该通道是否选中 + response.ChannelList = append(response.ChannelList, &CascadeChannel{ + cascadeId, channel, + }) + } + + response.ChannelRelateCount = new(int) + response.ShareAllChannel = new(bool) + + // 级联设备通道总数 + if count, err := dao.Platform.QueryPlatformChannelCount(id); err != nil { + return nil, err + } else { + response.ChannelRelateCount = &count + } + + *response.ShareAllChannel = platform.ShareAll + } + + return &response, nil +} + +func (api *ApiServer) OnShareAllChannel(q *SetEnable, w http.ResponseWriter, req *http.Request) (interface{}, error) { + var err error + if q.ShareAllChannel { + // 删除所有已经绑定的通道, 设置级联所有通道为true + if err = dao.Platform.DeletePlatformChannels(q.ID); err == nil { + err = dao.Platform.SetShareAllChannel(q.ID, true) + } + } else { + // 设置级联所有通道为false + err = dao.Platform.SetShareAllChannel(q.ID, false) + } + + if err != nil { + return nil, err + } + + return "OK", nil +} + +func (api *ApiServer) OnCustomChannelSet(q *CustomChannel, w http.ResponseWriter, req *http.Request) (interface{}, error) { + if len(q.CustomID) != 20 { + return nil, fmt.Errorf("20位国标ID") + } + + if err := dao.Channel.UpdateCustomID(q.DeviceID, q.ChannelID, q.CustomID); err != nil { + return nil, err + } + + return "OK", nil +} + +func (api *ApiServer) OnCatalogPush(q *SetEnable, w http.ResponseWriter, req *http.Request) (interface{}, error) { + return "OK", nil +} diff --git a/common/http_request.go b/common/http_request.go index 9ec560b..e1b3e9d 100644 --- a/common/http_request.go +++ b/common/http_request.go @@ -27,6 +27,16 @@ func parseQueryParams(c func(key string) string, v interface{}) (interface{}, er field := typ.Field(i) fieldValue := val.Field(i) + // 处理组合字段 + if field.Anonymous { + embedded := reflect.New(field.Type).Elem() + if _, err := parseQueryParams(c, embedded.Addr().Interface()); err != nil { + return nil, err + } + fieldValue.Set(embedded) + continue + } + // 获取字段名 fieldName := field.Tag.Get("json") if fieldName == "" { diff --git a/dao/blacklist.go b/dao/blacklist.go index de7e4d7..6f42771 100644 --- a/dao/blacklist.go +++ b/dao/blacklist.go @@ -12,22 +12,6 @@ func (d *BlacklistModel) TableName() string { return "lkm_blacklist" } -type DaoBlacklist interface { - Load() ([]*BlacklistModel, error) - - SaveIP(ip string) error - - DeleteIP(ip string) error - - SaveUA(ua string) error - - DeleteUA(ua string) error - - QueryIP(ip string) (*BlacklistModel, error) - - QueryUA(ua string) (*BlacklistModel, error) -} - type daoBlacklist struct { } diff --git a/dao/channel.go b/dao/channel.go index 09e6b41..f9b6598 100644 --- a/dao/channel.go +++ b/dao/channel.go @@ -53,6 +53,7 @@ type ChannelModel struct { ChannelNumber int `json:"channel_number" xml:"-"` // 对应1078的通道号 SubCount int `json:"-" xml:"-"` // 子节点数量 IsDir bool `json:"-" xml:"-"` // 是否是目录 + CustomID *string `gorm:"unique"` // 自定义通道ID } func (d *ChannelModel) TableName() string { @@ -63,54 +64,6 @@ func (d *ChannelModel) Online() bool { return d.Status == common.ON } -type DaoChannel interface { - SaveChannel(channel *ChannelModel) error - - SaveChannels(channel []*ChannelModel) error - - UpdateChannelStatus(deviceId, channelId, status string) error - - QueryChannelByID(id uint) (*ChannelModel, error) - - QueryChannel(deviceId string, channelId string) (*ChannelModel, error) - - QueryChannels(deviceId, groupId, string, page, size int, keyword string, order string) ([]*ChannelModel, int, error) - - QueryChannelsByRootID(rootId string) ([]*ChannelModel, error) - - QueryChannelsByChannelID(channelId string) ([]*ChannelModel, error) - - QueryChanelCount(deviceId string, hasDir bool) (int, error) - - QuerySubChannelCount(deviceId string, groupId string, hasDir bool) (int, error) - - QueryOnlineChanelCount(deviceId string, hasDir bool) (int, error) - - QueryOnlineSubChannelCount(deviceId string, groupId string, hasDir bool) (int, error) - - QueryChannelByTypeCode(codecs ...int) ([]*ChannelModel, error) - - ExistChannel(channelId string) bool - - SaveJTChannel(channel *ChannelModel) error - - ExistJTChannel(simNumber string, channelNumber int) bool - - QueryJTChannelBySimNumber(simNumber string) (*ChannelModel, error) - - DeleteChannel(deviceId string, channelId string) error - - DeleteChannels(deviceId string) error - - UpdateRootID(rootId, newRootId string) error - - UpdateChannel(channel *ChannelModel) error - - TotalCount() (int, error) - - OnlineCount(ids []string) (int, error) -} - type daoChannel struct { } @@ -152,15 +105,21 @@ func (d *daoChannel) QueryChannel(deviceId string, channelId string) (*ChannelMo return &channel, nil } -func (d *daoChannel) QueryChannels(deviceId, groupId string, page, size int, status string, keyword string, order, sort string) ([]*ChannelModel, int, error) { +func (d *daoChannel) QueryChannels(deviceId, groupId string, page, size int, status string, keyword string, order, sort string, isDir bool) ([]*ChannelModel, int, error) { conditions := map[string]interface{}{} - conditions["root_id"] = deviceId + if deviceId != "" { + conditions["root_id"] = deviceId + } + if groupId != "" { conditions["group_id"] = groupId } if status != "" { conditions["status"] = status } + if isDir { + conditions["is_dir"] = 1 + } cTx := db.Where(conditions) @@ -278,6 +237,16 @@ func (d *daoChannel) QueryChannelsByChannelID(channelId string) ([]*ChannelModel return channels, nil } +// QueryChannelByCustomID 根据自定义通道ID查询通道 +func (d *daoChannel) QueryChannelByCustomID(customID string) (*ChannelModel, error) { + var channel ChannelModel + tx := db.Where("custom_id =?", customID).Take(&channel) + if tx.Error != nil { + return nil, tx.Error + } + return &channel, nil +} + func (d *daoChannel) UpdateRootID(rootId, newRootId string) error { channel := &ChannelModel{ RootID: newRootId, @@ -335,3 +304,8 @@ func (d *daoChannel) QueryOnlineSubChannelCount(rootId string, groupId string, h } return int(total), nil } + +// UpdateCustomID 更新自定义通道ID +func (d *daoChannel) UpdateCustomID(rootId, channelId string, customID string) error { + return db.Model(&ChannelModel{}).Where("root_id =? and device_id =?", rootId, channelId).Update("custom_id", customID).Error +} diff --git a/dao/device.go b/dao/device.go index 194fc3e..86dfa13 100644 --- a/dao/device.go +++ b/dao/device.go @@ -44,36 +44,6 @@ func (d *DeviceModel) GetID() string { return d.DeviceID } -type DaoDevice interface { - LoadOnlineDevices() (map[string]*DeviceModel, error) - - LoadDevices() (map[string]*DeviceModel, error) - - SaveDevice(device *DeviceModel) error - - RefreshHeartbeat(deviceId string, now time.Time, addr string) error - - QueryDevice(id string) (*DeviceModel, error) - - QueryDevices(page int, size int, status string, keyword string, order string) ([]*DeviceModel, int, error) - - UpdateDeviceStatus(deviceId string, status common.OnlineStatus) error - - UpdateDeviceInfo(deviceId string, device *DeviceModel) error - - UpdateOfflineDevices(deviceIds []string) error - - ExistDevice(deviceId string) bool - - UpdateMediaTransport(deviceId string, setupType common.SetupType) error - - DeleteDevice(deviceId string) error - - DeleteDevicesByIP(ip string) error - - DeleteDevicesByUA(ua string) error -} - type daoDevice struct { } @@ -113,7 +83,7 @@ func (d *daoDevice) SaveDevice(device *DeviceModel) error { } return err } else { - return tx.Model(device).Select("Transport", "RemoteAddr", "Status", "RegisterTime", "LastHeartbeat").Updates(*device).Error + return tx.Model(device).Select("Transport", "RemoteIP", "RemotePort", "Status", "RegisterTime", "LastHeartbeat").Updates(*device).Error } }) } diff --git a/dao/jt.go b/dao/jt.go index f1baddf..e71a4d1 100644 --- a/dao/jt.go +++ b/dao/jt.go @@ -30,29 +30,6 @@ func (g *JTDeviceModel) TableName() string { return "lkm_jt_device" } -// DaoJTDevice 保存级联和1078设备的sipua参数项 -type DaoJTDevice interface { - LoadDevices() ([]*JTDeviceModel, error) - - UpdateOnlineStatus(status common.OnlineStatus, username string) error - - QueryDevice(user string) (*JTDeviceModel, error) - - QueryDeviceBySimNumber(simNumber string) (*JTDeviceModel, error) - - QueryDeviceByID(id uint) (*JTDeviceModel, error) - - ExistDevice(username, simNumber string) bool - - DeleteDevice(username string) error - - SaveDevice(model *JTDeviceModel) error - - UpdateDevice(model *JTDeviceModel) error - - QueryDevices(page int, size int) ([]*JTDeviceModel, int, error) -} - type daoJTDevice struct { } diff --git a/dao/platform.go b/dao/platform.go index 5968aed..223d468 100644 --- a/dao/platform.go +++ b/dao/platform.go @@ -2,39 +2,31 @@ package dao import ( "gb-cms/common" - "gb-cms/log" + "gorm.io/gorm" + "strings" ) // PlatformModel 数据库表结构 type PlatformModel struct { GBModel common.SIPUAOptions + Enable bool // 启用/禁用 + ShareAll bool // 级联所有通道 } func (g *PlatformModel) TableName() string { return "lkm_platform" } -// DaoVirtualDevice 保存级联和1078设备的sipua参数项 -type DaoVirtualDevice interface { - LoadPlatforms() ([]*PlatformModel, error) +type PlatformChannelModel struct { + GBModel + DeviceID string `json:"device_id"` + ChannelID string `json:"channel_id"` + PID uint `json:"pid"` // 级联设备数据库ID +} - QueryPlatform(addr string) (*PlatformModel, error) - - SavePlatform(platform *PlatformModel) error - - DeletePlatform(addr string) error - - UpdatePlatform(platform *PlatformModel) error - - BindChannels(addr string, channels [][2]string) ([][2]string, error) - - UnbindChannels(addr string, channels [][2]string) ([][2]string, error) - - // QueryPlatformChannel 查询级联设备的某个通道, 返回通道所属设备ID、通道. - QueryPlatformChannel(addr string, channelId string) (string, *ChannelModel, error) - - QueryPlatformChannels(addr string) ([]*ChannelModel, error) +func (d *PlatformChannelModel) TableName() string { + return "lkm_platform_channel" } type daoPlatform struct { @@ -50,7 +42,7 @@ func (d *daoPlatform) LoadPlatforms() ([]*PlatformModel, error) { return platforms, nil } -func (d *daoPlatform) QueryUAByAddr(addr string) (*PlatformModel, error) { +func (d *daoPlatform) QueryPlatformByAddr(addr string) (*PlatformModel, error) { var platform PlatformModel tx := db.Where("server_addr =?", addr).First(&platform) if tx.Error != nil { @@ -69,95 +61,232 @@ func (d *daoPlatform) SavePlatform(platform *PlatformModel) error { return db.Save(platform).Error } -func (d *daoPlatform) DeleteUAByAddr(addr string) error { +func (d *daoPlatform) DeletePlatformByAddr(addr string) error { + // 删除绑定的通道 return db.Where("server_addr =?", addr).Unscoped().Delete(&PlatformModel{}).Error } func (d *daoPlatform) UpdatePlatform(platform *PlatformModel) error { - //TODO implement me - panic("implement me") + return DBTransaction(func(tx *gorm.DB) error { + return tx.Save(platform).Error + }) } func (d *daoPlatform) UpdateOnlineStatus(status common.OnlineStatus, addr string) error { return db.Model(&PlatformModel{}).Where("server_addr =?", addr).Update("status", status).Error } -type PlatformChannelModel struct { - GBModel - DeviceID string `json:"device_id"` - Channel string `json:"channel_id"` - ServerAddr string `json:"server_addr"` -} +func (d *daoPlatform) BindChannels(pid int, channels []string) error { + return DBTransaction(func(tx *gorm.DB) error { + for _, channel := range channels { + ids := strings.Split(channel, ":") + var old PlatformChannelModel + // 检查是否已经绑定 + tx.Where("device_id =? and channel_id =? and p_id =?", ids[0], ids[1], pid).First(&old) + if old.ID != 0 { + continue + } -func (d *PlatformChannelModel) TableName() string { - return "lkm_platform_channel" -} + // 检查通道是否存在 + _, err := Channel.QueryChannel(ids[0], ids[1]) + if err != nil { + continue + } -func (d *daoPlatform) BindChannels(addr string, channels [][2]string) ([][2]string, error) { - var res [][2]string - for _, channel := range channels { - - var old PlatformChannelModel - _ = db.Where("device_id =? and channel_id =? and server_addr =?", channel[0], channel[1], addr).First(&old) - if old.ID == 0 { - _ = db.Create(&PlatformChannelModel{ - DeviceID: channel[0], - Channel: channel[1], + // 插入绑定关系 + _ = tx.Create(&PlatformChannelModel{ + DeviceID: ids[0], + ChannelID: ids[1], + PID: uint(pid), }) } - res = append(res, channel) - } - - return res, nil + return nil + }) } -func (d *daoPlatform) UnbindChannels(addr string, channels [][2]string) ([][2]string, error) { - var res [][2]string - for _, channel := range channels { - tx := db.Unscoped().Delete(&PlatformChannelModel{}, "device_id =? and channel_id =? and server_addr =?", channel[0], channel[1], addr) - if tx.Error == nil { - res = append(res, channel) - } else { - log.Sugar.Errorf("解绑级联设备通道失败. device_id: %s, channel_id: %s err: %s", channel[0], channel[1], tx.Error) +func (d *daoPlatform) UnbindChannels(pid int, channels []string) error { + return DBTransaction(func(tx *gorm.DB) error { + for _, channel := range channels { + ids := strings.Split(channel, ":") + tx.Unscoped().Delete(&PlatformChannelModel{}, "device_id =? and channel_id =? and p_id =?", ids[0], ids[1], pid) } - } - - return res, nil + return nil + }) } func (d *daoPlatform) QueryPlatformChannel(addr string, channelId string) (string, *ChannelModel, error) { + model, err := d.QueryPlatformByAddr(addr) + if err != nil { + return "", nil, err + } + + if model.ShareAll { + channel, _ := Channel.QueryChannelByCustomID(channelId) + if channel != nil { + return channel.RootID, channel, nil + } + + channels, err := Channel.QueryChannelsByChannelID(channelId) + if err != nil { + return "", nil, err + } + return channels[0].RootID, channels[0], nil + } + var platformChannel PlatformChannelModel - tx := db.Model(&PlatformChannelModel{}).Where("channel_id =? and server_addr =?", channelId, addr).First(&platformChannel) + tx := db.Model(&PlatformChannelModel{}).Where("channel_id =? and p_id =?", channelId, model.ID).First(&platformChannel) if tx.Error != nil { return "", nil, tx.Error } - var channel ChannelModel - tx = db.Where("device_id =? and channel_id =?", platformChannel.DeviceID, platformChannel.Channel).First(&channel) + // 优先查询自定义通道 + channel, _ := Channel.QueryChannelByCustomID(channelId) + if channel != nil { + return channel.RootID, channel, nil + } + + tx = db.Where("root_id =? and device_id =?", platformChannel.DeviceID, platformChannel.ChannelID).First(&channel) if tx.Error != nil { return "", nil, tx.Error } - return platformChannel.DeviceID, &channel, nil + return channel.RootID, channel, nil } func (d *daoPlatform) QueryPlatformChannels(addr string) ([]*ChannelModel, error) { + model, err := d.QueryPlatformByAddr(addr) + if err != nil { + return nil, err + } + + // 返回所有通道 + if model.ShareAll { + channels, _, _ := Channel.QueryChannels("", "", -1, -1, "", "", "", "", false) + return channels, nil + } + var platformChannels []*PlatformChannelModel - tx := db.Where("server_addr =?", addr).Find(&platformChannels) + tx := db.Where("p_id =?", model.ID).Find(&platformChannels) if tx.Error != nil { return nil, tx.Error } var channels []*ChannelModel for _, platformChannel := range platformChannels { - var channel ChannelModel - tx = db.Where("device_id =? and channel_id =?", platformChannel.DeviceID, platformChannel.Channel).First(&channel) - if tx.Error == nil { - channels = append(channels, &channel) - } else { - log.Sugar.Errorf("查询级联设备通道失败. device_id: %s, channel_id: %s err: %s", platformChannel.DeviceID, platformChannel.Channel, tx.Error) + queryChannel, err := Channel.QueryChannel(platformChannel.DeviceID, platformChannel.ChannelID) + if err != nil { + continue } + channels = append(channels, queryChannel) } return channels, nil } + +func (d *daoPlatform) QueryPlatforms(page, size int, keyword, enable, status string) ([]*PlatformModel, int, error) { + var platforms []*PlatformModel + var total int64 + query := db.Model(&PlatformModel{}) + if keyword != "" { + query = query.Where("username like ?", "%"+keyword+"%") + } + + if enable == "true" { + query = query.Where("enable = ?", 1) + } else if enable == "false" { + query = query.Where("enable = ?", 0) + } + + if status == "true" { + query = query.Where("status = ?", "ON") + } else if status == "false" { + query = query.Where("status = ?", "OFF") + } + + query.Count(&total) + query.Offset((page - 1) * size).Limit(size).Find(&platforms) + return platforms, int(total), nil +} + +func (d *daoPlatform) UpdateEnable(id int, enable bool) error { + return DBTransaction(func(tx *gorm.DB) error { + return tx.Model(&PlatformModel{}).Where("id =?", id).Update("enable", enable).Error + }) +} + +func (d *daoPlatform) QueryPlatformByID(id int) (*PlatformModel, error) { + var platform PlatformModel + tx := db.Where("id =?", id).First(&platform) + if tx.Error != nil { + return nil, tx.Error + } + + return &platform, nil +} + +func (d *daoPlatform) DeletePlatformByID(id int) error { + return DBTransaction(func(tx *gorm.DB) error { + return tx.Unscoped().Delete(&PlatformModel{}, id).Error + }) +} + +// QueryPlatformChannelList 查询级联设备通道列表 +func (d *daoPlatform) QueryPlatformChannelList(id int) ([]*ChannelModel, int, error) { + var platformChannels []*PlatformChannelModel + tx := db.Where("p_id =?", id).Find(&platformChannels) + if tx.Error != nil { + return nil, 0, tx.Error + } + + // 查询通道总数 + count, err := d.QueryPlatformChannelCount(id) + if err != nil { + return nil, 0, err + } + + var channels []*ChannelModel + for _, platformChannel := range platformChannels { + channel, err := Channel.QueryChannel(platformChannel.DeviceID, platformChannel.ChannelID) + if err == nil { + channels = append(channels, channel) + } + } + + return channels, count, nil +} + +// QueryPlatformChannelCount 查询级联设备的通道总数 +func (d *daoPlatform) QueryPlatformChannelCount(id int) (int, error) { + var total int64 + tx := db.Model(&PlatformChannelModel{}).Where("p_id =?", id).Count(&total) + if tx.Error != nil { + return 0, tx.Error + } + + return int(total), nil +} + +// QueryPlatformChannelExist 查询某个通道是否绑定到某个级联设备 +func (d *daoPlatform) QueryPlatformChannelExist(pid int, deviceId, channelId string) (bool, error) { + var total int64 + tx := db.Model(&PlatformChannelModel{}).Where("p_id =? and device_id =? and channel_id =?", pid, deviceId, channelId).Count(&total) + if tx.Error != nil { + return false, tx.Error + } + + return total > 0, nil +} + +// DeletePlatformChannels 删除级联设备的所有通道 +func (d *daoPlatform) DeletePlatformChannels(id int) error { + return DBTransaction(func(tx *gorm.DB) error { + return tx.Unscoped().Delete(&PlatformChannelModel{}, "p_id =?", id).Error + }) +} + +// SetShareAllChannel 设置级联设备是否分享所有通道 +func (d *daoPlatform) SetShareAllChannel(id int, shareAll bool) error { + return DBTransaction(func(tx *gorm.DB) error { + return tx.Model(&PlatformModel{}).Where("id =?", id).Update("share_all", shareAll).Error + }) +} diff --git a/dao/sink.go b/dao/sink.go index f5f6057..d81fd65 100644 --- a/dao/sink.go +++ b/dao/sink.go @@ -24,34 +24,6 @@ func (d *SinkModel) TableName() string { return "lkm_sink" } -type DaoSink interface { - LoadForwardSinks() (map[string]*SinkModel, error) - - // QueryForwardSink 查询转发流Sink - QueryForwardSink(stream common.StreamID, sink string) (*SinkModel, error) - - QueryForwardSinks(stream common.StreamID) (map[string]*SinkModel, error) - - // SaveForwardSink 保存转发流Sink - SaveForwardSink(stream common.StreamID, sink *SinkModel) error - - DeleteForwardSink(stream common.StreamID, sink string) (*SinkModel, error) - - DeleteForwardSinksByStreamID(stream common.StreamID) ([]*SinkModel, error) - - DeleteForwardSinks() ([]*SinkModel, error) - - DeleteForwardSinksByIds(ids []uint) error - - QueryForwardSinkByCallID(callID string) (*SinkModel, error) - - DeleteForwardSinkByCallID(callID string) (*SinkModel, error) - - DeleteForwardSinkBySinkStreamID(sinkStreamID common.StreamID) (*SinkModel, error) - - DeleteForwardSinksByServerAddr(addr string) ([]*SinkModel, error) -} - type daoSink struct { } diff --git a/dao/stream.go b/dao/stream.go index 723fa43..7710867 100644 --- a/dao/stream.go +++ b/dao/stream.go @@ -9,12 +9,12 @@ import ( type StreamModel struct { GBModel - DeviceID string `gorm:"index"` // 下级设备ID, 统计某个设备的所有流/1078设备为sim number - ChannelID string `gorm:"index"` // 下级通道ID, 统计某个设备下的某个通道的所有流/1078设备为 channel number - StreamID common.StreamID `json:"stream_id" gorm:"index"` // 流ID - Protocol int `json:"protocol,omitempty"` // 推流协议, rtmp/28181/1078/gb_talk - Dialog *common.RequestWrapper `json:"dialog,omitempty"` // 国标流的SipCall会话 - SinkCount int32 `json:"sink_count"` // 拉流端计数(包含级联转发) + DeviceID string `gorm:"index"` // 下级设备ID, 统计某个设备的所有流/1078设备为sim number + ChannelID string `gorm:"index"` // 下级通道ID, 统计某个设备下的某个通道的所有流/1078设备为 channel number + StreamID common.StreamID `json:"stream_id" gorm:"index,unique"` // 流ID + Protocol int `json:"protocol,omitempty"` // 推流协议, rtmp/28181/1078/gb_talk + Dialog *common.RequestWrapper `json:"dialog,omitempty"` // 国标流的SipCall会话 + SinkCount int32 `json:"sink_count"` // 拉流端计数(包含级联转发) SetupType common.SetupType CallID string `json:"call_id" gorm:"index"` Urls []string `gorm:"serializer:json"` // 从流媒体服务器返回的拉流地址 @@ -31,30 +31,6 @@ func (s *StreamModel) SetDialog(dialog sip.Request) { s.CallID = id.Value() } -type DaoStream interface { - LoadStreams() (map[string]*StreamModel, error) - - SaveStream(stream *StreamModel) (*StreamModel, bool) - - UpdateStream(stream *StreamModel) error - - DeleteStream(streamId common.StreamID) (*StreamModel, error) - - DeleteStreams() ([]*StreamModel, error) - - DeleteStreamsByIds(ids []uint) error - - QueryStream(streamId common.StreamID) (*StreamModel, error) - - QueryStreams(keyword string, page, size int) ([]*StreamModel, int, error) - - QueryStreamByCallID(callID string) (*StreamModel, error) - - DeleteStreamByCallID(callID string) (*StreamModel, error) - - DeleteStreamByDeviceID(deviceID string) ([]*StreamModel, error) -} - type daoStream struct { } diff --git a/livegbs_bean.go b/livegbs_bean.go index af5c1d0..ae43793 100644 --- a/livegbs_bean.go +++ b/livegbs_bean.go @@ -1,5 +1,12 @@ package main +import ( + "gb-cms/common" + "gb-cms/dao" + "gb-cms/stack" + "strconv" +) + type LiveGBSDevice struct { AlarmSubscribe bool `json:"AlarmSubscribe"` CatalogInterval int `json:"CatalogInterval"` @@ -44,67 +51,67 @@ type LiveGBSDevice struct { } type LiveGBSChannel struct { - Address string `json:"Address"` - Altitude int `json:"Altitude"` - AudioEnable bool `json:"AudioEnable"` - BatteryLevel int `json:"BatteryLevel"` - Block string `json:"Block"` - Channel int `json:"Channel"` - CivilCode string `json:"CivilCode"` - CloudRecord bool `json:"CloudRecord"` - CreatedAt string `json:"CreatedAt"` - Custom bool `json:"Custom"` - CustomAddress string `json:"CustomAddress"` - CustomBlock string `json:"CustomBlock"` - CustomCivilCode string `json:"CustomCivilCode"` - CustomFirmware string `json:"CustomFirmware"` - CustomID string `json:"CustomID"` - CustomIPAddress string `json:"CustomIPAddress"` - CustomLatitude int `json:"CustomLatitude"` - CustomLongitude int `json:"CustomLongitude"` - CustomManufacturer string `json:"CustomManufacturer"` - CustomModel string `json:"CustomModel"` - CustomName string `json:"CustomName"` - CustomPTZType int `json:"CustomPTZType"` - CustomParentID string `json:"CustomParentID"` - CustomPort int `json:"CustomPort"` - CustomSerialNumber string `json:"CustomSerialNumber"` - CustomStatus string `json:"CustomStatus"` - Description string `json:"Description"` - DeviceCustomName string `json:"DeviceCustomName"` - DeviceID string `json:"DeviceID"` - DeviceName string `json:"DeviceName"` - DeviceOnline bool `json:"DeviceOnline"` - DeviceType string `json:"DeviceType"` - Direction int `json:"Direction"` - DownloadSpeed string `json:"DownloadSpeed"` - Firmware string `json:"Firmware"` - ID string `json:"ID"` - IPAddress string `json:"IPAddress"` - Latitude int `json:"Latitude"` - Longitude int `json:"Longitude"` - Manufacturer string `json:"Manufacturer"` - Model string `json:"Model"` - Name string `json:"Name"` - NumOutputs int `json:"NumOutputs"` - Ondemand bool `json:"Ondemand"` - Owner string `json:"Owner"` - PTZType int `json:"PTZType"` - ParentID string `json:"ParentID"` - Parental int `json:"Parental"` - Port int `json:"Port"` - Quality string `json:"Quality"` - RegisterWay int `json:"RegisterWay"` - Secrecy int `json:"Secrecy"` - SerialNumber string `json:"SerialNumber"` - Shared bool `json:"Shared"` - SignalLevel int `json:"SignalLevel"` - SnapURL string `json:"SnapURL"` - Speed int `json:"Speed"` - Status string `json:"Status"` - StreamID string `json:"StreamID"` - SubCount int `json:"SubCount"` - UpdatedAt string `json:"UpdatedAt"` + Address string `json:"Address"` + Altitude int `json:"Altitude"` + AudioEnable bool `json:"AudioEnable"` + BatteryLevel int `json:"BatteryLevel"` + Block string `json:"Block"` + Channel int `json:"Channel"` + CivilCode string `json:"CivilCode"` + CloudRecord bool `json:"CloudRecord"` + CreatedAt string `json:"CreatedAt"` + Custom bool `json:"Custom"` + CustomAddress string `json:"CustomAddress"` + CustomBlock string `json:"CustomBlock"` + CustomCivilCode string `json:"CustomCivilCode"` + CustomFirmware string `json:"CustomFirmware"` + CustomID string `json:"CustomID"` + CustomIPAddress string `json:"CustomIPAddress"` + CustomLatitude int `json:"CustomLatitude"` + CustomLongitude int `json:"CustomLongitude"` + CustomManufacturer string `json:"CustomManufacturer"` + CustomModel string `json:"CustomModel"` + CustomName string `json:"CustomName"` + CustomPTZType int `json:"CustomPTZType"` + CustomParentID string `json:"CustomParentID"` + CustomPort int `json:"CustomPort"` + CustomSerialNumber string `json:"CustomSerialNumber"` + CustomStatus string `json:"CustomStatus"` + Description string `json:"Description"` + DeviceCustomName string `json:"DeviceCustomName"` + DeviceID string `json:"DeviceID"` + DeviceName string `json:"DeviceName"` + DeviceOnline bool `json:"DeviceOnline"` + DeviceType string `json:"DeviceType"` + Direction int `json:"Direction"` + DownloadSpeed string `json:"DownloadSpeed"` + Firmware string `json:"Firmware"` + ID string `json:"ID"` + IPAddress string `json:"IPAddress"` + Latitude float64 `json:"Latitude"` + Longitude float64 `json:"Longitude"` + Manufacturer string `json:"Manufacturer"` + Model string `json:"Model"` + Name string `json:"Name"` + NumOutputs int `json:"NumOutputs"` + Ondemand bool `json:"Ondemand"` + Owner string `json:"Owner"` + PTZType int `json:"PTZType"` + ParentID string `json:"ParentID"` + Parental int `json:"Parental"` + Port int `json:"Port"` + Quality string `json:"Quality"` + RegisterWay int `json:"RegisterWay"` + Secrecy int `json:"Secrecy"` + SerialNumber string `json:"SerialNumber"` + Shared bool `json:"Shared"` + SignalLevel int `json:"SignalLevel"` + SnapURL string `json:"SnapURL"` + Speed int `json:"Speed"` + Status string `json:"Status"` + StreamID string `json:"StreamID"` + SubCount int `json:"SubCount"` + UpdatedAt string `json:"UpdatedAt"` } type LiveGBSStreamStart struct { @@ -171,3 +178,135 @@ type LiveGBSDeviceTree struct { SubCount int `json:"subCount"` // 包含目录的总数 SubCountDevice int `json:"subCountDevice"` // 不包含目录的总数 } + +type LiveGBSCascade struct { + Load int + Manufacturer string + ID string + Enable bool // 是否启用 + Name string + Serial string // 上级ID + Realm string // 上级域 + Host string // 上级IP + Port int // 上级端口 + LocalSerial string + LocalHost string + LocalPort int + Username string // 向上级sip通信的用户名 + Password string // 向上级注册的密码 + Online bool + Status common.OnlineStatus + RegisterTimeout int + KeepaliveInterval int + RegisterInterval int + StreamKeepalive bool + StreamReader bool + BindLocalIP bool + AllowControl bool + ShareRecord bool + MergeRecord bool + ShareAllChannel bool + CommandTransport string + Charset string + CatalogGroupSize int + LoadLimit int + CivilCodeLimit int + DigestAlgorithm string + GM bool + Cert string + CreateAt string + UpdateAt string +} + +func ChannelModels2LiveGBSChannels(index int, channels []*dao.ChannelModel, deviceName string) []*LiveGBSChannel { + var ChannelList []*LiveGBSChannel + + for _, channel := range channels { + parental, _ := strconv.Atoi(channel.Parental) + port, _ := strconv.Atoi(channel.Port) + registerWay, _ := strconv.Atoi(channel.RegisterWay) + secrecy, _ := strconv.Atoi(channel.Secrecy) + + streamID := common.GenerateStreamID(common.InviteTypePlay, channel.RootID, channel.DeviceID, "", "") + if stream, err := dao.Stream.QueryStream(streamID); err != nil || stream == nil { + streamID = "" + } + + _, online := stack.OnlineDeviceManager.Find(channel.RootID) + // 转换经纬度 + latitude, _ := strconv.ParseFloat(channel.Latitude, 64) + longitude, _ := strconv.ParseFloat(channel.Longitude, 64) + + var customID string + if channel.CustomID != nil { + customID = *channel.CustomID + } + + ChannelList = append(ChannelList, &LiveGBSChannel{ + Address: channel.Address, + Altitude: 0, + AudioEnable: true, + BatteryLevel: 0, + Channel: index, + CivilCode: channel.CivilCode, + CloudRecord: false, + CreatedAt: channel.CreatedAt.Format("2006-01-02 15:04:05"), + Custom: false, + CustomAddress: "", + CustomBlock: "", + CustomCivilCode: "", + CustomFirmware: "", + CustomID: customID, + CustomIPAddress: "", + CustomLatitude: 0, + CustomLongitude: 0, + CustomManufacturer: "", + CustomModel: "", + CustomName: "", + CustomPTZType: 0, + CustomParentID: "", + CustomPort: 0, + CustomSerialNumber: "", + CustomStatus: "", + Description: "", + DeviceCustomName: "", + DeviceID: channel.RootID, + DeviceName: deviceName, + DeviceOnline: online, + DeviceType: "GB", + Direction: 0, + DownloadSpeed: "", + Firmware: "", + ID: channel.DeviceID, + IPAddress: channel.IPAddress, + Latitude: latitude, + Longitude: longitude, + Manufacturer: channel.Manufacturer, + Model: channel.Model, + Name: channel.Name, + NumOutputs: 0, + Ondemand: true, + Owner: channel.Owner, + PTZType: 0, + ParentID: channel.ParentID, + Parental: parental, + Port: port, + Quality: "", + RegisterWay: registerWay, + Secrecy: secrecy, + SerialNumber: "", + Shared: false, + SignalLevel: 0, + SnapURL: "", + Speed: 0, + Status: channel.Status.String(), + StreamID: string(streamID), // 实时流ID + SubCount: channel.SubCount, + UpdatedAt: channel.UpdatedAt.Format("2006-01-02 15:04:05"), + }) + + index++ + } + + return ChannelList +} diff --git a/recover.go b/recover.go index ea99abb..d0a6ef9 100644 --- a/recover.go +++ b/recover.go @@ -18,15 +18,19 @@ func startPlatformDevices() { } for _, record := range platforms { + if err := dao.Platform.UpdateOnlineStatus(common.OFF, record.ServerAddr); err != nil { + log.Sugar.Infof("更新级联设备状态失败 err: %s device: %s", err.Error(), record.ServerID) + } + + if !record.Enable { + continue + } + platform, err := stack.NewPlatform(&record.SIPUAOptions, common.SipStack) // 都入库了不允许失败, 程序有BUG, 及时修复 utils.Assert(err == nil) utils.Assert(stack.PlatformManager.Add(platform.ServerAddr, platform)) - if err := dao.Platform.UpdateOnlineStatus(common.OFF, record.ServerAddr); err != nil { - log.Sugar.Infof("更新级联设备状态失败 err: %s device: %s", err.Error(), record.ServerID) - } - platform.Start() } } diff --git a/stack/client.go b/stack/client.go index 7443cda..1ce501d 100644 --- a/stack/client.go +++ b/stack/client.go @@ -51,6 +51,11 @@ func (g *gbClient) OnQueryCatalog(sn int, channels []*dao.ChannelModel) { for i, _ := range channels { channel := *channels[i] + // 向上级推送自定义的通道ID + if channel.CustomID != nil { + channel.DeviceID = *channel.CustomID + } + response.DeviceList.Devices = nil response.DeviceList.Num = 1 // 一次发一个通道 response.DeviceList.Devices = append(response.DeviceList.Devices, &channel) diff --git a/stack/client_manager.go b/stack/client_manager.go index ef3b792..fcaa597 100644 --- a/stack/client_manager.go +++ b/stack/client_manager.go @@ -91,7 +91,7 @@ func (p *ClientManager) ExistClientByServerAddr(addr string) bool { } func RemovePlatform(key string) (GBClient, error) { - err := dao.Platform.DeleteUAByAddr(key) + err := dao.Platform.DeletePlatformByAddr(key) if err != nil { return nil, err } diff --git a/stack/platform.go b/stack/platform.go index 7fc4dbf..89b57f8 100644 --- a/stack/platform.go +++ b/stack/platform.go @@ -74,6 +74,7 @@ func (g *Platform) OnInvite(request sip.Request, user string) sip.Response { return CreateResponseWithStatusCode(request, http.StatusNotFound) } + // 解析sdp gbSdp, err := ParseGBSDP(request.Body()) if err != nil { log.Sugar.Errorf("处理上级Invite失败,err: %s sdp: %s", err.Error(), request.Body()) @@ -84,24 +85,16 @@ func (g *Platform) OnInvite(request sip.Request, user string) sip.Response { inviteType.SessionName2Type(strings.ToLower(gbSdp.SDP.Session)) streamId := common.GenerateStreamID(inviteType, channel.RootID, channel.DeviceID, gbSdp.StartTime, gbSdp.StopTime) - // 如果流不存在, 向通道发送Invite请求 - stream, _ := dao.Stream.QueryStream(streamId) - if stream == nil { - stream, err = (&Device{device}).StartStream(inviteType, streamId, user, gbSdp.StartTime, gbSdp.StopTime, channel.Setup.String(), 0, true) - if err != nil { - log.Sugar.Errorf("处理上级Invite失败 err: %s stream: %s", err.Error(), streamId) - return CreateResponseWithStatusCode(request, http.StatusBadRequest) - } - } - sink := &dao.SinkModel{ StreamID: streamId, ServerAddr: g.ServerAddr, Protocol: "gb_cascaded"} + // 添加转发sink到流媒体服务器 response, err := AddForwardSink(TransStreamGBCascaded, request, user, &Sink{sink}, streamId, gbSdp, inviteType, "96 PS/90000") if err != nil { log.Sugar.Errorf("处理上级Invite失败 err: %s stream: %s", err.Error(), streamId) + response = CreateResponseWithStatusCode(request, http.StatusInternalServerError) } return response diff --git a/stack/sip_server.go b/stack/sip_server.go index 8a9c5a5..125bb4c 100644 --- a/stack/sip_server.go +++ b/stack/sip_server.go @@ -358,12 +358,12 @@ func filterRequest(f func(wrapper *SipRequestSource)) gosip.RequestHandler { userAgent := req.GetHeaders("User-Agent") // 过滤黑名单 - if _, err := dao.Blacklist.QueryIP(req.Source()); err == nil { + if model, _ := dao.Blacklist.QueryIP(req.Source()); model != nil { SendResponseWithStatusCode(req, tx, http.StatusForbidden) log2.Sugar.Errorf("处理%s请求失败, IP被黑名单过滤: %s request: %s ", req.Method(), req.Source(), req.String()) return } else if len(userAgent) > 0 { - if _, err = dao.Blacklist.QueryUA(userAgent[0].Value()); err == nil { + if model, _ = dao.Blacklist.QueryUA(userAgent[0].Value()); model != nil { SendResponseWithStatusCode(req, tx, http.StatusForbidden) log2.Sugar.Errorf("处理%s请求失败, UA被黑名单过滤: %s request: %s ", req.Method(), userAgent[0].Value(), req.String()) return diff --git a/stack/sip_ua.go b/stack/sip_ua.go index dd7177f..140cd90 100644 --- a/stack/sip_ua.go +++ b/stack/sip_ua.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "gb-cms/common" + "gb-cms/log" "github.com/ghettovoice/gosip/sip" "github.com/lkmio/avformat/utils" "math" @@ -87,10 +88,10 @@ func (g *sipUA) doRegister(request sip.Request) bool { hop.Params.Add("received", &empty) for i := 0; i < 2; i++ { - //发起注册, 第一次未携带授权头, 第二次携带授权头 + // 发起注册, 第一次未携带授权头, 第二次携带授权头 clientTransaction := g.stack.SendRequest(request) - //等待响应 + // 等待响应 responses := clientTransaction.Responses() var response sip.Response select { @@ -112,6 +113,12 @@ func (g *sipUA) doRegister(request sip.Request) bool { } return true } else if response.StatusCode() == 401 || response.StatusCode() == 407 { + if i == 1 { + // 密码错误 + log.Sugar.Errorf("注册失败, 密码错误. username: %s, server id: %s, server addr: %s password: %s", g.Username, g.ServerID, g.ServerAddr, g.Password) + return false + } + authorizer := sip.DefaultAuthorizer{Password: sip.String{Str: g.Password}, User: sip.String{Str: g.Username}} if err := authorizer.AuthorizeRequest(request, response); err != nil { break