From 43298a5b50df7e9a38d7c8dda080dc4194ff3f3e Mon Sep 17 00:00:00 2001 From: ydajiang Date: Mon, 25 Aug 2025 10:28:12 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E9=80=82=E9=85=8Dlivegbs=E6=8E=A5?= =?UTF-8?q?=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- api.go | 492 ++++++++++++++++++++++++++++++++++++---------- api_livegbs.go | 244 +++++++++++++++++++++++ dao_channel.go | 32 ++- dao_device.go | 41 +++- dao_stream.go | 2 +- db_sqlite.go | 2 +- device.go | 13 +- go.mod | 10 +- http_request.go | 191 ++++++++++++++++++ http_response.go | 36 +++- live.go | 3 +- livegbs_bean.go | 153 ++++++++++++++ main.go | 30 ++- online_devices.go | 6 + platform.go | 12 +- sip_server.go | 1 + stats.go | 402 +++++++++++++++++++++++++++++++++++++ stream.go | 22 ++- temp_pwd.go | 47 +++++ token_manager.go | 97 +++++++++ xml.go | 3 +- xml_record.go | 15 +- 22 files changed, 1701 insertions(+), 153 deletions(-) create mode 100644 api_livegbs.go create mode 100644 http_request.go create mode 100644 livegbs_bean.go create mode 100644 stats.go create mode 100644 temp_pwd.go create mode 100644 token_manager.go diff --git a/api.go b/api.go index 9e93cf0..f56be40 100644 --- a/api.go +++ b/api.go @@ -10,6 +10,7 @@ import ( "github.com/lkmio/avformat/utils" "math" "net/http" + "os" "strconv" "strings" "time" @@ -21,12 +22,13 @@ type ApiServer struct { } type InviteParams struct { - DeviceID string `json:"device_id"` - ChannelID string `json:"channel_id"` - StartTime string `json:"start_time"` - EndTime string `json:"end_time"` + DeviceID string `json:"serial"` + ChannelID string `json:"code"` + StartTime string `json:"starttime"` + EndTime string `json:"endtime"` Setup string `json:"setup"` Speed string `json:"speed"` + Token string `json:"token"` streamId StreamID } @@ -42,12 +44,12 @@ type PlayDoneParams struct { } type QueryRecordParams struct { - DeviceID string `json:"device_id"` - ChannelID string `json:"channel_id"` + DeviceID string `json:"serial"` + ChannelID string `json:"code"` Timeout int `json:"timeout"` - StartTime string `json:"start_time"` - EndTime string `json:"end_time"` - Type_ string `json:"type"` + StartTime string `json:"starttime"` + EndTime string `json:"endtime"` + //Type_ string `json:"type"` } type DeviceChannelID struct { @@ -109,37 +111,6 @@ func init() { } } -func withJsonParams[T any](f func(params T, w http.ResponseWriter, req *http.Request), params T) func(http.ResponseWriter, *http.Request) { - return func(w http.ResponseWriter, req *http.Request) { - newParams := new(T) - if err := HttpDecodeJSONBody(w, req, newParams); err != nil { - Sugar.Errorf("解析请求体失败 err: %s path: %s", err.Error(), req.URL.Path) - httpResponseError(w, err.Error()) - return - } - - f(*newParams, w, req) - } -} - -func withJsonResponse[T any](f func(params T, w http.ResponseWriter, req *http.Request) (interface{}, error), params interface{}) func(http.ResponseWriter, *http.Request) { - return func(w http.ResponseWriter, req *http.Request) { - newParams := new(T) - if err := HttpDecodeJSONBody(w, req, newParams); err != nil { - Sugar.Errorf("解析请求体失败 err: %s path: %s", err.Error(), req.URL.Path) - httpResponseError(w, err.Error()) - return - } - - responseBody, err := f(*newParams, w, req) - if err != nil { - httpResponseError(w, err.Error()) - } else { - httpResponseOK(w, responseBody) - } - } -} - func startApiServer(addr string) { apiServer.router.HandleFunc("/api/v1/hook/on_play", withJsonParams(apiServer.OnPlay, &StreamParams{})) apiServer.router.HandleFunc("/api/v1/hook/on_play_done", withJsonParams(apiServer.OnPlayDone, &PlayDoneParams{})) @@ -152,16 +123,16 @@ func startApiServer(addr string) { apiServer.router.HandleFunc("/api/v1/hook/on_started", apiServer.OnStarted) // 统一处理live/playback/download请求 - apiServer.router.HandleFunc("/api/v1/{action}/start", withJsonParams(apiServer.OnInvite, &InviteParams{})) + apiServer.router.HandleFunc("/api/v1/{action}/start", withVerify(withFormDataParams(apiServer.OnInvite, InviteParams{}))) // 关闭国标流. 如果是实时流, 等收流或空闲超时自行删除. 回放或下载流立即删除. apiServer.router.HandleFunc("/api/v1/stream/close", withJsonParams(apiServer.OnCloseStream, &StreamIDParams{})) - apiServer.router.HandleFunc("/api/v1/device/list", withJsonResponse(apiServer.OnDeviceList, &PageQuery{})) // 查询设备列表 - apiServer.router.HandleFunc("/api/v1/channel/list", withJsonResponse(apiServer.OnChannelList, &PageQueryChannel{})) // 查询通道列表 - apiServer.router.HandleFunc("/api/v1/record/list", withJsonResponse(apiServer.OnRecordList, &QueryRecordParams{})) // 查询录像列表 - apiServer.router.HandleFunc("/api/v1/position/sub", withJsonResponse(apiServer.OnSubscribePosition, &DeviceChannelID{})) // 订阅移动位置 - apiServer.router.HandleFunc("/api/v1/playback/seek", withJsonResponse(apiServer.OnSeekPlayback, &SeekParams{})) // 回放seek - apiServer.router.HandleFunc("/api/v1/ptz/control", apiServer.OnPTZControl) // 云台控制 + apiServer.router.HandleFunc("/api/v1/device/list", withVerify(withQueryStringParams(apiServer.OnDeviceList, QueryDeviceChannel{}))) // 查询设备列表 + apiServer.router.HandleFunc("/api/v1/device/channellist", withVerify(withQueryStringParams(apiServer.OnChannelList, QueryDeviceChannel{}))) // 查询通道列表 + apiServer.router.HandleFunc("/api/v1/playback/recordlist", withVerify(withQueryStringParams(apiServer.OnRecordList, QueryRecordParams{}))) // 查询录像列表 + apiServer.router.HandleFunc("/api/v1/position/sub", withJsonResponse(apiServer.OnSubscribePosition, &DeviceChannelID{})) // 订阅移动位置 + apiServer.router.HandleFunc("/api/v1/playback/seek", withJsonResponse(apiServer.OnSeekPlayback, &SeekParams{})) // 回放seek + apiServer.router.HandleFunc("/api/v1/control/ptz", apiServer.OnPTZControl) // 云台控制 apiServer.router.HandleFunc("/api/v1/platform/list", apiServer.OnPlatformList) // 级联设备列表 apiServer.router.HandleFunc("/api/v1/platform/add", withJsonResponse(apiServer.OnPlatformAdd, &PlatformModel{})) // 添加级联设备 @@ -181,8 +152,29 @@ func startApiServer(addr string) { apiServer.router.HandleFunc("/api/v1/jt/channel/add", withJsonResponse(apiServer.OnVirtualChannelAdd, &Channel{})) apiServer.router.HandleFunc("/api/v1/jt/channel/edit", withJsonResponse(apiServer.OnVirtualChannelEdit, &Channel{})) apiServer.router.HandleFunc("/api/v1/jt/channel/remove", withJsonResponse(apiServer.OnVirtualChannelRemove, &Channel{})) + apiServer.router.HandleFunc("/api/v1/device/setmediatransport", withVerify(withJsonResponse2(apiServer.OnDeviceMediaTransportSet))) - http.Handle("/", apiServer.router) + registerLiveGBSApi() + + // 前端路由 + htmlRoot := "../www/" + fileServer := http.FileServer(http.Dir(htmlRoot)) + apiServer.router.PathPrefix("/").HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { + // 处理无扩展名的路径,自动添加.html扩展名 + path := request.URL.Path + if !strings.Contains(path, ".") { + // 检查是否存在对应的.html文件 + htmlPath := htmlRoot + path + ".html" + if _, err := os.Stat(htmlPath); err == nil { + // 如果存在对应的.html文件,则直接返回该文件 + http.ServeFile(writer, request, htmlPath) + return + } + } + + // 供静态文件服务 + fileServer.ServeHTTP(writer, request) + }) srv := &http.Server{ Handler: apiServer.router, @@ -215,6 +207,15 @@ func (api *ApiServer) OnPlay(params *StreamParams, w http.ResponseWriter, r *htt // 拉流地址携带的参数 query := r.URL.Query() + + // 播放授权 + streamToken := query.Get("stream_token") + if TokenManager.Find(streamToken) == nil { + w.WriteHeader(http.StatusUnauthorized) + Sugar.Errorf("播放鉴权失败, token不存在 token: %s", streamToken) + return + } + jtSource := query.Get("forward_type") == "gateway_1078" // 跳过非国标拉流 @@ -269,11 +270,11 @@ func (api *ApiServer) OnPlay(params *StreamParams, w http.ResponseWriter, r *htt var err error streamType := strings.ToLower(query.Get("stream_type")) if "playback" == streamType { - code, stream, err = api.DoInvite(InviteTypePlay, inviteParams, false, w, r) + code, stream, err = api.DoInvite(InviteTypePlay, inviteParams, false) } else if "download" == streamType { - code, stream, err = api.DoInvite(InviteTypeDownload, inviteParams, false, w, r) + code, stream, err = api.DoInvite(InviteTypeDownload, inviteParams, false) } else { - code, stream, err = api.DoInvite(InviteTypePlay, inviteParams, false, w, r) + code, stream, err = api.DoInvite(InviteTypePlay, inviteParams, false) } if err != nil { @@ -316,6 +317,8 @@ func (api *ApiServer) OnPublish(params *StreamParams, w http.ResponseWriter, r * stream := EarlyDialogs.Find(string(params.Stream)) if stream != nil { stream.Put(200) + } else { + Sugar.Infof("推流事件. 未找到stream. stream: %s", params.Stream) } // 创建stream @@ -373,7 +376,7 @@ func (api *ApiServer) OnRecord(params *RecordParams, w http.ResponseWriter, req Sugar.Infof("录制事件. protocol: %s stream: %s path:%s ", params.Protocol, params.Stream, params.Path) } -func (api *ApiServer) OnInvite(v *InviteParams, w http.ResponseWriter, r *http.Request) { +func (api *ApiServer) OnInvite(v *InviteParams, w http.ResponseWriter, r *http.Request) (interface{}, error) { vars := mux.Vars(r) action := strings.ToLower(vars["action"]) @@ -381,35 +384,99 @@ func (api *ApiServer) OnInvite(v *InviteParams, w http.ResponseWriter, r *http.R var stream *Stream var err error if "playback" == action { - code, stream, err = apiServer.DoInvite(InviteTypePlayback, v, true, w, r) + code, stream, err = apiServer.DoInvite(InviteTypePlayback, v, true) } else if "download" == action { - code, stream, err = apiServer.DoInvite(InviteTypeDownload, v, true, w, r) - } else if "live" == action { - code, stream, err = apiServer.DoInvite(InviteTypePlay, v, true, w, r) + code, stream, err = apiServer.DoInvite(InviteTypeDownload, v, true) + } else if "stream" == action { + code, stream, err = apiServer.DoInvite(InviteTypePlay, v, true) } else { - w.WriteHeader(http.StatusNotFound) - return + return nil, fmt.Errorf("action not found") } if http.StatusOK != code { Sugar.Errorf("请求流失败 err: %s", err.Error()) - httpResponseError(w, err.Error()) - } else { - // 返回stream id和拉流地址 - response := struct { - Stream string `json:"stream_id"` - Urls []string `json:"urls"` - }{ - string(stream.StreamID), - stream.urls, - } - httpResponseOK(w, response) + return nil, err } + + var urls map[string]string + urls = make(map[string]string, 10) + for _, url := range stream.Urls { + var streamName string + + if strings.HasPrefix(url, "ws") { + streamName = "WS_FLV" + } else if strings.HasSuffix(url, ".flv") { + streamName = "FLV" + } else if strings.HasSuffix(url, ".m3u8") { + streamName = "HLS" + } else if strings.HasSuffix(url, ".rtc") { + streamName = "WEBRTC" + } else if strings.HasPrefix(url, "rtmp") { + streamName = "RTMP" + } else if strings.HasPrefix(url, "rtsp") { + streamName = "RTSP" + } + + // 加上登录的token, 播放授权 + url += "?stream_token=" + v.Token + + // 兼容livegbs前端播放webrtc + if streamName == "WEBRTC" { + if strings.HasPrefix(url, "http") { + url = strings.Replace(url, "http", "webrtc", 1) + } else if strings.HasPrefix(url, "https") { + url = strings.Replace(url, "https", "webrtcs", 1) + } + + url += "&wf=livegbs" + } + + urls[streamName] = url + } + + response := LiveGBSStream{ + AudioEnable: false, + CDN: "", + CascadeSize: 0, + ChannelID: v.ChannelID, + ChannelName: "未读取通道名", + ChannelPTZType: 0, + CloudRecord: false, + DecodeSize: 0, + DeviceID: v.DeviceID, + Duration: 1, + FLV: urls["FLV"], + HLS: urls["HLS"], + InBitRate: 0, + InBytes: 0, + NumOutputs: 0, + Ondemand: true, + OutBytes: 0, + RTMP: urls["RTMP"], + RecordStartAt: "", + RelaySize: 0, + SMSID: "", + SnapURL: "", + SourceAudioCodecName: "", + SourceAudioSampleRate: 0, + SourceVideoCodecName: "", + SourceVideoFrameRate: 0, + SourceVideoHeight: 0, + SourceVideoWidth: 0, + StartAt: "", + StreamID: string(stream.StreamID), + Transport: "TCP", + VideoFrameCount: 0, + WEBRTC: urls["WEBRTC"], + WS_FLV: urls["WS_FLV"], + } + + return response, err } // DoInvite 发起Invite请求 // @params sync 是否异步等待流媒体的publish事件(确认收到流), 目前请求流分两种方式,流媒体hook和http接口, hook方式同步等待确认收到流再应答, http接口直接应答成功。 -func (api *ApiServer) DoInvite(inviteType InviteType, params *InviteParams, sync bool, w http.ResponseWriter, r *http.Request) (int, *Stream, error) { +func (api *ApiServer) DoInvite(inviteType InviteType, params *InviteParams, sync bool) (int, *Stream, error) { device, _ := DeviceDao.QueryDevice(params.DeviceID) if device == nil || !device.Online() { return http.StatusNotFound, nil, fmt.Errorf("设备离线 id: %s", params.DeviceID) @@ -419,12 +486,12 @@ func (api *ApiServer) DoInvite(inviteType InviteType, params *InviteParams, sync var startTimeSeconds string var endTimeSeconds string if InviteTypePlay != inviteType { - startTime, err := time.ParseInLocation("2006-01-02t15:04:05", params.StartTime, time.Local) + startTime, err := time.ParseInLocation("2006-01-02T15:04:05", params.StartTime, time.Local) if err != nil { return http.StatusBadRequest, nil, err } - endTime, err := time.ParseInLocation("2006-01-02t15:04:05", params.EndTime, time.Local) + endTime, err := time.ParseInLocation("2006-01-02T15:04:05", params.EndTime, time.Local) if err != nil { return http.StatusBadRequest, nil, err } @@ -437,6 +504,10 @@ func (api *ApiServer) DoInvite(inviteType InviteType, params *InviteParams, sync params.streamId = GenerateStreamID(inviteType, device.GetID(), params.ChannelID, params.StartTime, params.EndTime) } + if params.Setup == "" { + params.Setup = device.Setup.String() + } + // 解析回放或下载速度参数 speed, _ := strconv.Atoi(params.Speed) speed = int(math.Min(4, float64(speed))) @@ -459,64 +530,205 @@ func (api *ApiServer) OnCloseStream(v *StreamIDParams, w http.ResponseWriter, r httpResponseOK(w, nil) } -func (api *ApiServer) OnDeviceList(v *PageQuery, w http.ResponseWriter, r *http.Request) (interface{}, error) { - Sugar.Infof("查询设备列表 %v", *v) +// QueryDeviceChannel 查询设备和通道的参数 +type QueryDeviceChannel struct { + DeviceID string `json:"serial"` + GroupID string `json:"dir_serial"` + Start int `json:"start"` + Limit int `json:"limit"` + Keyword string `json:"q"` + Online string `json:"online"` + ChannelType string `json:"channel_type"` - if v.PageNumber == nil { - var defaultPageNumber = 1 - v.PageNumber = &defaultPageNumber + //pageNumber int + //pageSize int + //keyword string + //online string // true/false + //channelType string // device/dir, 查询通道列表使用 +} + +func (api *ApiServer) OnDeviceList(q *QueryDeviceChannel, w http.ResponseWriter, r *http.Request) (interface{}, error) { + values := r.URL.Query() + + Sugar.Infof("查询设备列表 %s", values.Encode()) + + var status string + if "" == q.Online { + } else if "true" == q.Online { + status = "ON" + } else if "false" == q.Online { + status = "OFF" } - if v.PageSize == nil { - var defaultPageSize = 10 - v.PageSize = &defaultPageSize - } - - devices, total, err := DeviceDao.QueryDevices(*v.PageNumber, *v.PageSize) + devices, total, err := DeviceDao.QueryDevices((q.Start/q.Limit)+1, q.Limit, status, q.Keyword) if err != nil { Sugar.Errorf("查询设备列表失败 err: %s", err.Error()) return nil, err } - query := &PageQuery{ - PageNumber: v.PageNumber, - PageSize: v.PageSize, - TotalCount: total, - TotalPages: int(math.Ceil(float64(total) / float64(*v.PageSize))), - Data: devices, + response := struct { + DeviceCount int + DeviceList_ []LiveGBSDevice `json:"DeviceList"` + }{ + DeviceCount: total, } - return query, nil + for _, device := range devices { + split := strings.Split(device.RemoteAddr, ":") + remoteIP := split[0] + remotePort, _ := strconv.Atoi(split[1]) + + response.DeviceList_ = append(response.DeviceList_, LiveGBSDevice{ + AlarmSubscribe: false, + CatalogInterval: 3600, + CatalogSubscribe: false, + ChannelCount: device.ChannelsTotal, + ChannelOverLoad: false, + Charset: "GB2312", + CivilCodeFirst: false, + CommandTransport: device.Transport, + ContactIP: "", + CreatedAt: device.CreatedAt.Format("2006-01-02 15:04:05"), + CustomName: "", + DropChannelType: "", + GBVer: "", + ID: device.GetID(), + KeepOriginalTree: false, + LastKeepaliveAt: device.LastHeartbeat.Format("2006-01-02 15:04:05"), + LastRegisterAt: device.RegisterTime.Format("2006-01-02 15:04:05"), + Latitude: 0, + Longitude: 0, + Manufacturer: device.Manufacturer, + MediaTransport: device.Setup.Transport(), + MediaTransportMode: device.Setup.String(), + Name: device.Name, + Online: device.Online(), + PTZSubscribe: false, + Password: "", + PositionSubscribe: false, + RecordCenter: false, + RecordIndistinct: false, + RecvStreamIP: "", + RemoteIP: remoteIP, + RemotePort: remotePort, + RemoteRegion: "", + SMSGroupID: "", + SMSID: "", + StreamMode: "", + SubscribeInterval: 0, + Type: "GB", + UpdatedAt: device.UpdatedAt.Format("2006-01-02 15:04:05"), + }) + } + + return &response, nil } -func (api *ApiServer) OnChannelList(v *PageQueryChannel, w http.ResponseWriter, r *http.Request) (interface{}, error) { - Sugar.Infof("查询通道列表 %v", *v) +func (api *ApiServer) OnChannelList(q *QueryDeviceChannel, w http.ResponseWriter, r *http.Request) (interface{}, error) { + values := r.URL.Query() + Sugar.Infof("查询通道列表 %s", values.Encode()) - if v.PageNumber == nil { - var defaultPageNumber = 1 - v.PageNumber = &defaultPageNumber + device, err := DeviceDao.QueryDevice(q.DeviceID) + if err != nil { + Sugar.Errorf("查询设备失败 err: %s", err.Error()) + return nil, err } - if v.PageSize == nil { - var defaultPageSize = 10 - v.PageSize = &defaultPageSize + var status string + if "" == q.Online { + } else if "true" == q.Online { + status = "ON" + } else if "false" == q.Online { + status = "OFF" } - channels, total, err := ChannelDao.QueryChannels(v.DeviceID, v.GroupID, *v.PageNumber, *v.PageSize) + channels, total, err := ChannelDao.QueryChannels(q.DeviceID, q.GroupID, (q.Start/q.Limit)+1, q.Limit, status, q.Keyword) if err != nil { Sugar.Errorf("查询通道列表失败 err: %s", err.Error()) return nil, err } - query := &PageQuery{ - PageNumber: v.PageNumber, - PageSize: v.PageSize, - TotalCount: total, - TotalPages: int(math.Ceil(float64(total) / float64(*v.PageSize))), - Data: channels, + response := struct { + ChannelCount int + ChannelList []LiveGBSChannel + }{ + ChannelCount: total, } - return query, nil + index := q.Start + 1 + for _, channel := range channels { + parental, _ := strconv.Atoi(channel.Parental) + port, _ := strconv.Atoi(channel.Port) + registerWay, _ := strconv.Atoi(channel.RegisterWay) + secrecy, _ := strconv.Atoi(channel.Secrecy) + + response.ChannelList = append(response.ChannelList, LiveGBSChannel{ + Address: channel.Address, + Altitude: 0, + AudioEnable: true, + BatteryLevel: 0, + Channel: index, + CivilCode: channel.CivilCode, + CloudRecord: false, + CreatedAt: channel.CreatedAt.Format("2006-01-02 15:04:05"), + Custom: false, + CustomAddress: "", + CustomBlock: "", + CustomCivilCode: "", + CustomFirmware: "", + CustomID: "", + CustomIPAddress: "", + CustomLatitude: 0, + CustomLongitude: 0, + CustomManufacturer: "", + CustomModel: "", + CustomName: "", + CustomPTZType: 0, + CustomParentID: "", + CustomPort: 0, + CustomSerialNumber: "", + CustomStatus: "", + Description: "", + DeviceCustomName: "", + DeviceID: channel.RootID, + DeviceName: device.Name, + DeviceOnline: device.Online(), + DeviceType: "GB", + Direction: 0, + DownloadSpeed: "", + Firmware: "", + ID: channel.DeviceID, + IPAddress: channel.IPAddress, + Latitude: 0, + Longitude: 0, + Manufacturer: channel.Manufacturer, + Model: channel.Model, + Name: channel.Name, + NumOutputs: 0, + Ondemand: true, + Owner: channel.Owner, + PTZType: 0, + ParentID: channel.ParentID, + Parental: parental, + Port: port, + Quality: "", + RegisterWay: registerWay, + Secrecy: secrecy, + SerialNumber: "", + Shared: false, + SignalLevel: 0, + SnapURL: "", + Speed: 0, + Status: channel.Status.String(), + StreamID: "", + SubCount: channel.SubCount, + UpdatedAt: channel.UpdatedAt.Format("2006-01-02 15:04:05"), + }) + + index++ + } + + return response, nil } func (api *ApiServer) OnRecordList(v *QueryRecordParams, w http.ResponseWriter, r *http.Request) (interface{}, error) { @@ -529,7 +741,7 @@ func (api *ApiServer) OnRecordList(v *QueryRecordParams, w http.ResponseWriter, } sn := GetSN() - err := device.QueryRecord(v.ChannelID, v.StartTime, v.EndTime, sn, v.Type_) + err := device.QueryRecord(v.ChannelID, v.StartTime, v.EndTime, sn, "all") if err != nil { Sugar.Errorf("发送查询录像请求失败 err: %s", err.Error()) return nil, err @@ -558,7 +770,47 @@ func (api *ApiServer) OnRecordList(v *QueryRecordParams, w http.ResponseWriter, break } - return recordList, nil + response := struct { + DeviceID string + Name string + RecordList []struct { + DeviceID string + EndTime string + FileSize uint64 + Name string + Secrecy string + StartTime string + Type string + } + SumNum int `json:"sumNum"` + }{ + DeviceID: v.DeviceID, + Name: device.Name, + SumNum: len(recordList), + } + + for _, record := range recordList { + Sugar.Infof("查询录像列表 %v", record) + response.RecordList = append(response.RecordList, struct { + DeviceID string + EndTime string + FileSize uint64 + Name string + Secrecy string + StartTime string + Type string + }{ + DeviceID: record.DeviceID, + EndTime: record.EndTime, + FileSize: record.FileSize, + Name: record.Name, + Secrecy: record.Secrecy, + StartTime: record.StartTime, + Type: record.Type, + }) + } + + return &response, nil } func (api *ApiServer) OnSubscribePosition(v *DeviceChannelID, w http.ResponseWriter, r *http.Request) (interface{}, error) { @@ -823,3 +1075,27 @@ func (api *ApiServer) OnPlatformChannelUnbind(v *PlatformChannel, w http.Respons return channels, nil } + +func (api *ApiServer) OnDeviceMediaTransportSet(w http.ResponseWriter, r *http.Request) (interface{}, error) { + serial := r.FormValue("serial") + mediaTransport := r.FormValue("media_transport") + mediaTransportMode := r.FormValue("media_transport_mode") + + var setupType SetupType + if "udp" == strings.ToLower(mediaTransport) { + setupType = SetupTypeUDP + } else if "passive" == strings.ToLower(mediaTransportMode) { + setupType = SetupTypePassive + } else if "active" == strings.ToLower(mediaTransportMode) { + setupType = SetupTypeActive + } else { + return nil, fmt.Errorf("media_transport_mode error") + } + + err := DeviceDao.UpdateMediaTransport(serial, setupType) + if err != nil { + return nil, err + } + + return "OK", nil +} diff --git a/api_livegbs.go b/api_livegbs.go new file mode 100644 index 0000000..bf69733 --- /dev/null +++ b/api_livegbs.go @@ -0,0 +1,244 @@ +package main + +import ( + "fmt" + "net/http" + "os" + "sync" + "time" +) + +var ( + ModifyPasswordLock sync.Mutex +) + +type LoginReq struct { + Username string `json:"username"` + Pwd string `json:"password"` // MD5加密 + RememberMe bool `json:"remember_me"` +} + +type ServerInfoBase struct { + CopyrightText string `json:"CopyrightText"` + DemoUser string `json:"DemoUser"` + LiveStreamAuth bool `json:"LiveStreamAuth"` + LoginRequestMethod string `json:"LoginRequestMethod"` + LogoMiniText string `json:"LogoMiniText"` + LogoText string `json:"LogoText"` + + MapInfo struct { + Center []float64 `json:"Center"` + MaxZoom int `json:"MaxZoom"` + MinZoom int `json:"MinZoom"` + Zoom int `json:"Zoom"` + } `json:"MapInfo"` +} + +type ModifyPasswordReq struct { + OldPwd string `json:"oldpassword"` + NewPwd string `json:"newpassword"` +} + +func GetUptime() time.Duration { + return time.Since(StartUpTime) +} + +func FormatUptime(d time.Duration) string { + days := int(d.Hours()) / 24 + hours := int(d.Hours()) % 24 + minutes := int(d.Minutes()) % 60 + seconds := int(d.Seconds()) % 60 + return fmt.Sprintf("%d Days %d Hours %d Mins %d Secs", days, hours, minutes, seconds) +} + +func registerLiveGBSApi() { + + serverInfoBase := ServerInfoBase{ + CopyrightText: fmt.Sprintf("Copyright © %d \u003ca href=\"//github.com/lkmio\" target=\"_blank\"\u003egithub.com/lkmio\u003c/a\u003e Released under MIT License", time.Now().Year()), + DemoUser: "", + LiveStreamAuth: true, + LoginRequestMethod: "post", + LogoMiniText: "GBS", + LogoText: "LKMGBS", + MapInfo: struct { + Center []float64 `json:"Center"` + MaxZoom int `json:"MaxZoom"` + MinZoom int `json:"MinZoom"` + Zoom int `json:"Zoom"` + }{ + Center: []float64{0.0, 0.0}, + MaxZoom: 16, + MinZoom: 8, + Zoom: 12, + }, + } + + apiServer.router.HandleFunc("/api/v1/login", withFormDataParams(apiServer.OnLogin, LoginReq{})) + apiServer.router.HandleFunc("/api/v1/modifypassword", withVerify(withFormDataParams(apiServer.OnModifyPassword, ModifyPasswordReq{}))) + + apiServer.router.HandleFunc("/api/v1/dashboard/auth", withVerify(func(writer http.ResponseWriter, request *http.Request) { + response := struct { + ChannelCount int `json:"ChannelCount"` + ChannelOnline int `json:"ChannelOnline"` + ChannelTotal int `json:"ChannelTotal"` + DeviceOnline int `json:"DeviceOnline"` + DeviceTotal int `json:"DeviceTotal"` + }{ + ChannelCount: 16, + ChannelOnline: 1, + ChannelTotal: 1, + DeviceOnline: OnlineDeviceManager.Count(), + DeviceTotal: DeviceCount, + } + + _ = httpResponseSuccess(writer, response) + })) + + apiServer.router.HandleFunc("/api/v1/getserverinfo", withVerify2(func(writer http.ResponseWriter, request *http.Request) { + response := struct { + ServerInfoBase + + Authorization string `json:"Authorization"` + ChannelCount int `json:"ChannelCount"` + Hardware string `json:"Hardware"` + InterfaceVersion string `json:"InterfaceVersion"` + + RemainDays int `json:"RemainDays"` + RunningTime string `json:"RunningTime"` + Server string `json:"Server"` + ServerTime string `json:"ServerTime"` + StartUpTime string `json:"StartUpTime"` + VersionType string `json:"VersionType"` + }{ + ServerInfoBase: serverInfoBase, + Authorization: "Users", + ChannelCount: 16, + Hardware: KernelArch, + InterfaceVersion: "v1", + + RemainDays: 0, + RunningTime: FormatUptime(GetUptime()), + Server: "github.com/lkmio/gb-cms dev", + ServerTime: time.Now().Format("2006-01-02 15:04:05"), + StartUpTime: StartUpTime.Format("2006-01-02 15:04:05"), + VersionType: "开源版", + } + + _ = httpResponseJson(writer, response) + }, func(w http.ResponseWriter, req *http.Request) { + _ = httpResponseJson(w, &serverInfoBase) + })) + + apiServer.router.HandleFunc("/api/v1/userinfo", withVerify(func(writer http.ResponseWriter, request *http.Request) { + cookie, _ := request.Cookie("token") + session := TokenManager.Find(cookie.Value) + if session == nil { + writer.WriteHeader(http.StatusUnauthorized) + return + } + + response := struct { + Token string `json:"Token"` + ID int `json:"ID"` + Name string `json:"Name"` + Roles []string `json:"Roles"` + HasAllChannel bool `json:"HasAllChannel"` + LoginAt string `json:"LoginAt"` + RemoteIP string `json:"RemoteIP"` + }{ + Token: cookie.Value, + ID: 1, + Name: "admin", + Roles: []string{"超级管理员"}, + HasAllChannel: true, + LoginAt: session.LoginTime.Format("2006-01-02 15:04:05"), + RemoteIP: request.RemoteAddr, + } + + _ = httpResponseJson(writer, response) + })) + + apiServer.router.HandleFunc("/api/v1/ispasswordchanged", func(writer http.ResponseWriter, request *http.Request) { + _ = httpResponseJson(writer, map[string]bool{ + "PasswordChanged": true, + "UserChanged": false, + }) + }) + + apiServer.router.HandleFunc("api/v1/dashboard/auth", withVerify(func(writer http.ResponseWriter, request *http.Request) { + + })) + + apiServer.router.HandleFunc("/api/v1/dashboard/top", withVerify(func(writer http.ResponseWriter, request *http.Request) { + _ = httpResponseJsonStr(writer, topStatsJson) + })) + + // 实时统计上下行流量 + apiServer.router.HandleFunc("/api/v1/dashboard/top/net", withVerify(func(writer http.ResponseWriter, request *http.Request) { + _ = httpResponseJsonStr(writer, lastNetStatsJson) + })) + + apiServer.router.HandleFunc("/api/v1/dashboard/store", withVerify(func(writer http.ResponseWriter, request *http.Request) { + _ = httpResponseJsonStr(writer, diskStatsJson) + })) +} + +func (api *ApiServer) OnLogin(v *LoginReq, w http.ResponseWriter, r *http.Request) (interface{}, error) { + if PwdMD5 != v.Pwd { + Sugar.Errorf("登录失败, 密码错误 pwd: %s remote addr: %s", v.Pwd, r.RemoteAddr) + w.WriteHeader(http.StatusUnauthorized) + w.Write([]byte("用户名或密码错误")) + return nil, nil + } + + token := GenerateToken() + TokenManager.Add(token, v.Username, v.Pwd) + + http.SetCookie(w, &http.Cookie{ + Name: "token", + Value: token, + Path: "/", + HttpOnly: true, + }) + + response := struct { + AuthToken string + CookieToken string + Token string + TokenTimeout int + URLToken string + }{ + AuthToken: token, + CookieToken: token, + Token: token, + TokenTimeout: 0, + URLToken: token, + } + + return response, nil +} + +func (api *ApiServer) OnModifyPassword(v *ModifyPasswordReq, w http.ResponseWriter, r *http.Request) (interface{}, error) { + ModifyPasswordLock.Lock() + defer ModifyPasswordLock.Unlock() + if PwdMD5 != v.OldPwd { + Sugar.Errorf("修改密码失败, 旧密码错误 oldPwd: %s remote addr: %s", v.OldPwd, r.RemoteAddr) + w.WriteHeader(http.StatusBadRequest) + _, _ = w.Write([]byte("原密码不正确")) + return nil, nil + } + + // 写入新密码 + err := os.WriteFile("./data/pwd.txt", []byte(v.NewPwd), 0644) + if err != nil { + Sugar.Errorf("修改密码失败, 写入文件失败 err: %s pwd: %s", err.Error(), v.NewPwd) + w.WriteHeader(http.StatusInternalServerError) + _, _ = w.Write([]byte("系统错误")) + return nil, nil + } + + // 删除所有token? + TokenManager.Clear() + PwdMD5 = v.NewPwd + return nil, nil +} diff --git a/dao_channel.go b/dao_channel.go index 1a13f15..ca925c3 100644 --- a/dao_channel.go +++ b/dao_channel.go @@ -76,25 +76,47 @@ func (d *daoChannel) QueryChannel(deviceId string, channelId string) (*Channel, return &channel, nil } -func (d *daoChannel) QueryChannels(deviceId, groupId string, page, size int) ([]*Channel, int, error) { +func (d *daoChannel) QueryChannels(deviceId, groupId string, page, size int, status string, keyword string) ([]*Channel, int, error) { conditions := map[string]interface{}{} conditions["root_id"] = deviceId if groupId != "" { conditions["group_id"] = groupId } + if status != "" { + conditions["status"] = status + } + + cTx := db.Limit(size).Offset((page - 1) * size).Where(conditions) + if keyword != "" { + cTx.Where("name like ? or device_id like ?", "%"+keyword+"%", "%"+keyword+"%") + } var channels []*Channel - tx := db.Limit(size).Offset((page - 1) * size).Where(conditions).Find(&channels) - if tx.Error != nil { + if tx := cTx.Find(&channels); tx.Error != nil { return nil, 0, tx.Error } + countTx := db.Model(&Channel{}).Select("id").Where(conditions) + if keyword != "" { + countTx.Where("name like ? or device_id like ?", "%"+keyword+"%", "%"+keyword+"%") + } + var total int64 - tx = db.Model(&Channel{}).Select("id").Where(conditions).Count(&total) - if tx.Error != nil { + if tx := countTx.Count(&total); tx.Error != nil { return nil, 0, tx.Error } + // 查询每个通道的子节点通道数量 + for _, channel := range channels { + // 查询子节点数量 + var subCount int64 + tx := db.Model(&Channel{}).Where("root_id =? and group_id =?", deviceId, channel.DeviceID).Count(&subCount) + if tx.Error != nil { + return nil, 0, tx.Error + } + channel.SubCount = int(subCount) + } + return channels, int(total), nil } diff --git a/dao_device.go b/dao_device.go index 04919c8..8e2f7ba 100644 --- a/dao_device.go +++ b/dao_device.go @@ -5,6 +5,10 @@ import ( "time" ) +var ( + DeviceCount int +) + type DaoDevice interface { LoadOnlineDevices() (map[string]*Device, error) @@ -25,6 +29,8 @@ type DaoDevice interface { UpdateOfflineDevices(deviceIds []string) error ExistDevice(deviceId string) bool + + UpdateMediaTransport(deviceId string, setupType SetupType) error } type daoDevice struct { @@ -47,6 +53,7 @@ func (d *daoDevice) LoadDevices() (map[string]*Device, error) { deviceMap[device.DeviceID] = device } + DeviceCount = len(devices) return deviceMap, nil } @@ -59,7 +66,11 @@ func (d *daoDevice) SaveDevice(device *Device) error { if device.ID == 0 { //return tx.Create(&old).Error - return tx.Save(device).Error + err := tx.Save(device).Error + if err == nil { + DeviceCount++ + } + return err } else { return tx.Model(device).Select("Transport", "RemoteAddr", "Status", "RegisterTime", "LastHeartbeat").Updates(*device).Error } @@ -114,16 +125,29 @@ func (d *daoDevice) QueryDevice(id string) (*Device, error) { return &device, nil } -func (d *daoDevice) QueryDevices(page int, size int) ([]*Device, int, error) { +func (d *daoDevice) QueryDevices(page int, size int, status string, keyword string) ([]*Device, int, error) { + var cond = make(map[string]interface{}) + if status != "" { + cond["status"] = status + } + + devicesTx := db.Where(cond).Limit(size).Offset((page - 1) * size) + if keyword != "" { + devicesTx.Where("device_id like ? or name like ?", "%"+keyword+"%", "%"+keyword+"%") + } + var devices []*Device - tx := db.Limit(size).Offset((page - 1) * size).Find(&devices) - if tx.Error != nil { + if tx := devicesTx.Find(&devices); tx.Error != nil { return nil, 0, tx.Error } + countTx := db.Where(cond).Model(&Device{}) + if keyword != "" { + countTx.Where("device_id like ? or name like ?", "%"+keyword+"%", "%"+keyword+"%") + } + var total int64 - tx = db.Model(&Device{}).Count(&total) - if tx.Error != nil { + if tx := countTx.Count(&total); tx.Error != nil { return nil, 0, tx.Error } @@ -152,3 +176,8 @@ func (d *daoDevice) ExistDevice(deviceId string) bool { return true } +func (d *daoDevice) UpdateMediaTransport(deviceId string, setupType SetupType) error { + return DBTransaction(func(tx *gorm.DB) error { + return tx.Model(&Device{}).Where("device_id =?", deviceId).Update("setup", setupType).Error + }) +} diff --git a/dao_stream.go b/dao_stream.go index 9da12fd..1be42ed 100644 --- a/dao_stream.go +++ b/dao_stream.go @@ -47,7 +47,7 @@ func (d *daoStream) LoadStreams() (map[string]*Stream, error) { func (d *daoStream) SaveStream(stream *Stream) (*Stream, bool) { var old Stream - tx := db.Select("id").Where("stream_id =?", stream.StreamID).Take(&old) + tx := db.Where("stream_id =?", stream.StreamID).Take(&old) if old.ID != 0 { return &old, false } diff --git a/db_sqlite.go b/db_sqlite.go index f03236d..7b94194 100644 --- a/db_sqlite.go +++ b/db_sqlite.go @@ -9,7 +9,7 @@ import ( ) const ( - DBNAME = "lkm_gb.db" + DBNAME = "data/lkm_gb.db" //DBNAME = ":memory:" ) diff --git a/device.go b/device.go index 4cbf5c5..38236ec 100644 --- a/device.go +++ b/device.go @@ -92,19 +92,20 @@ type GBDevice interface { type Device struct { GBModel - DeviceID string `json:"device_id" gorm:"uniqueIndex"` - Name string `json:"name"` + DeviceID string `json:"device_id" gorm:"index"` + Name string `json:"name" gorm:"index"` RemoteAddr string `json:"remote_addr"` - Transport string `json:"transport"` - Status OnlineStatus `json:"status"` //在线状态 ON-在线/OFF-离线 + Transport string `json:"transport"` // 信令传输模式 + Status OnlineStatus `json:"status"` // 在线状态 ON-在线/OFF-离线 Manufacturer string `json:"manufacturer"` Model string `json:"model"` Firmware string `json:"firmware"` - RegisterTime time.Time `json:"register_time"` - LastHeartbeat time.Time `json:"last_heartbeat"` + RegisterTime time.Time `json:"register_time"` // 注册时间 + LastHeartbeat time.Time `json:"last_heartbeat"` // 最后心跳时间 ChannelsTotal int `json:"total_channels"` // 通道总数 ChannelsOnline int `json:"online_channels"` // 通道在线数量 + Setup SetupType } func (d *Device) GetID() string { diff --git a/go.mod b/go.mod index 644ef79..8a4f2ce 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module gb-cms -go 1.19 +go 1.23 require ( github.com/ghettovoice/gosip v0.0.0-20240401112151-56d750b16008 @@ -17,18 +17,25 @@ require ( github.com/discoviking/fsm v0.0.0-20150126104936-f4a273feecca // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/glebarez/go-sqlite v1.21.2 // indirect + github.com/go-ole/go-ole v1.2.6 // indirect github.com/gobwas/httphead v0.1.0 // indirect github.com/gobwas/pool v0.2.1 // indirect github.com/gobwas/ws v1.4.0 // indirect github.com/google/uuid v1.3.0 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect + github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d // indirect + github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b // indirect + github.com/shoenig/go-m1cpu v0.1.6 // indirect github.com/tevino/abool v1.2.0 // indirect + github.com/tklauser/go-sysconf v0.3.12 // indirect + github.com/tklauser/numcpus v0.6.1 // indirect + github.com/yusufpapurcu/wmi v1.2.4 // indirect go.uber.org/multierr v1.10.0 // indirect golang.org/x/crypto v0.24.0 // indirect golang.org/x/sys v0.21.0 // indirect @@ -46,5 +53,6 @@ require ( github.com/gorilla/mux v1.8.1 github.com/gorilla/websocket v1.5.3 github.com/lkmio/avformat v0.0.1 + github.com/shirou/gopsutil/v3 v3.24.5 gorm.io/gorm v1.26.1 ) diff --git a/http_request.go b/http_request.go new file mode 100644 index 0000000..9e36c72 --- /dev/null +++ b/http_request.go @@ -0,0 +1,191 @@ +package main + +import ( + "net/http" + "reflect" + "strconv" + "time" +) + +// parseQueryParams 使用反射解析 URL 查询参数并填充到结构体中 +func parseQueryParams(c func(key string) string, v interface{}) (interface{}, error) { + val := reflect.ValueOf(v) + if val.Kind() == reflect.Ptr { + if val.IsNil() { + // 如果指针为 nil,创建一个新的实例 + val = reflect.New(val.Type().Elem()) + } + val = val.Elem() + } + + if val.Kind() != reflect.Struct { + return nil, nil + } + + typ := val.Type() + for i := 0; i < val.NumField(); i++ { + field := typ.Field(i) + fieldValue := val.Field(i) + + // 获取字段名 + fieldName := field.Tag.Get("json") + if fieldName == "" { + fieldName = field.Name + } + + // 从 URL 参数中获取值 + value := c(fieldName) + if value == "" { + continue + } + + // 根据字段类型设置值 + switch fieldValue.Kind() { + case reflect.String: + fieldValue.SetString(value) + case reflect.Int: + intValue, err := strconv.Atoi(value) + if err != nil { + return nil, err + } + fieldValue.SetInt(int64(intValue)) + case reflect.Bool: + boolValue, err := strconv.ParseBool(value) + if err != nil { + return nil, err + } + fieldValue.SetBool(boolValue) + } + } + + return val.Addr().Interface(), nil +} + +func withJsonParams[T any](f func(params T, w http.ResponseWriter, req *http.Request), params T) func(http.ResponseWriter, *http.Request) { + return func(w http.ResponseWriter, req *http.Request) { + newParams := new(T) + if err := HttpDecodeJSONBody(w, req, newParams); err != nil { + Sugar.Errorf("解析请求体失败 err: %s path: %s", err.Error(), req.URL.Path) + _ = httpResponseError(w, err.Error()) + return + } + + f(*newParams, w, req) + } +} + +func withJsonResponse[T any](f func(params T, w http.ResponseWriter, req *http.Request) (interface{}, error), params interface{}) func(http.ResponseWriter, *http.Request) { + return func(w http.ResponseWriter, req *http.Request) { + newParams := new(T) + if err := HttpDecodeJSONBody(w, req, newParams); err != nil { + Sugar.Errorf("解析请求体失败 err: %s path: %s", err.Error(), req.URL.Path) + _ = httpResponseError(w, err.Error()) + return + } + + responseBody, err := f(*newParams, w, req) + if err != nil { + _ = httpResponseError(w, err.Error()) + } else if responseBody != nil { + _ = httpResponseOK(w, responseBody) + } + } +} + +func withJsonResponse2(f func(w http.ResponseWriter, req *http.Request) (interface{}, error)) func(http.ResponseWriter, *http.Request) { + return func(w http.ResponseWriter, req *http.Request) { + responseBody, err := f(w, req) + if err != nil { + _ = httpResponseError(w, err.Error()) + } else if responseBody != nil { + _ = httpResponseJson(w, responseBody) + } + } +} + +func withQueryStringParams[T any](f func(params T, w http.ResponseWriter, req *http.Request) (interface{}, error), params interface{}) func(http.ResponseWriter, *http.Request) { + return func(w http.ResponseWriter, req *http.Request) { + var newParams T + query := req.URL.Query() + result, err := parseQueryParams(func(key string) string { + if key == "token" { + cookie, err := req.Cookie("token") + if err != nil { + panic(err) + } + return cookie.Value + } + + return query.Get(key) + }, newParams) + if err != nil { + _ = httpResponseError(w, err.Error()) + return + } + + responseBody, err := f(result.(T), w, req) + if err != nil { + _ = httpResponseError(w, err.Error()) + } else if responseBody != nil { + _ = httpResponseJson(w, responseBody) + } + } +} + +func withFormDataParams[T any](f func(params T, w http.ResponseWriter, req *http.Request) (interface{}, error), params interface{}) func(http.ResponseWriter, *http.Request) { + return func(w http.ResponseWriter, req *http.Request) { + var newParams T + result, err := parseQueryParams(func(key string) string { + if key == "token" { + cookie, err := req.Cookie("token") + if err != nil { + panic(err) + } + return cookie.Value + } + + return req.FormValue(key) + }, newParams) + if err != nil { + _ = httpResponseError(w, err.Error()) + return + } + + responseBody, err := f(result.(T), w, req) + if err != nil { + _ = httpResponseError(w, err.Error()) + } else if responseBody != nil { + _ = httpResponseJson(w, responseBody) + } + } +} + +// 验证和刷新token +func withVerify(f func(w http.ResponseWriter, req *http.Request)) func(http.ResponseWriter, *http.Request) { + return func(w http.ResponseWriter, req *http.Request) { + cookie, err := req.Cookie("token") + if err != nil { + w.WriteHeader(http.StatusUnauthorized) + return + } + + ok := TokenManager.Refresh(cookie.Value, time.Now()) + if !ok { + w.WriteHeader(http.StatusUnauthorized) + return + } + + f(w, req) + } +} + +func withVerify2(onSuccess func(w http.ResponseWriter, req *http.Request), onFailure func(w http.ResponseWriter, req *http.Request)) func(http.ResponseWriter, *http.Request) { + return func(w http.ResponseWriter, req *http.Request) { + cookie, err := req.Cookie("token") + if err == nil && TokenManager.Refresh(cookie.Value, time.Now()) { + onSuccess(w, req) + } else { + onFailure(w, req) + } + } +} diff --git a/http_response.go b/http_response.go index 1f9273f..5d17391 100644 --- a/http_response.go +++ b/http_response.go @@ -11,31 +11,49 @@ type Response[T any] struct { Data T `json:"data"` } -func httpResponse(w http.ResponseWriter, code int, msg string) { - httpResponseJson(w, MalformedRequest{ +func httpResponse(w http.ResponseWriter, code int, msg string) error { + return httpResponseJson(w, MalformedRequest{ Code: code, Msg: msg, }) } -func httpResponseJson(w http.ResponseWriter, payload interface{}) { - body, _ := json.Marshal(payload) +func httpResponseJson(w http.ResponseWriter, payload interface{}) error { + body, err := json.Marshal(payload) + if err != nil { + return err + } + + return httpResponseJsonStr(w, string(body)) +} + +func httpResponseJsonStr(w http.ResponseWriter, payload string) error { w.Header().Set("Content-Type", "application/json") w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT") - w.Write(body) + + _, err := w.Write([]byte(payload)) + return err } -func httpResponseOK(w http.ResponseWriter, data interface{}) { - httpResponseJson(w, MalformedRequest{ +func httpResponseOK(w http.ResponseWriter, data interface{}) error { + return httpResponseJson(w, MalformedRequest{ Code: http.StatusOK, Msg: "ok", Data: data, }) } -func httpResponseError(w http.ResponseWriter, msg string) { - httpResponseJson(w, MalformedRequest{ +func httpResponseSuccess(w http.ResponseWriter, data interface{}) error { + return httpResponseJson(w, MalformedRequest{ + Code: http.StatusOK, + Msg: "Success", + Data: data, + }) +} + +func httpResponseError(w http.ResponseWriter, msg string) error { + return httpResponseJson(w, MalformedRequest{ Code: -1, Msg: msg, Data: nil, diff --git a/live.go b/live.go index ee7d5e4..f70f18e 100644 --- a/live.go +++ b/live.go @@ -66,6 +66,7 @@ func (d *Device) StartStream(inviteType InviteType, streamId StreamID, channelId // 等待流媒体服务发送推流通知 wait := func() bool { waiting := StreamWaiting{} + logger.Infof("等待收流通知 streamId: %s", streamId) _, _ = EarlyDialogs.Add(string(streamId), &waiting) defer EarlyDialogs.Remove(string(streamId)) @@ -83,7 +84,7 @@ func (d *Device) StartStream(inviteType InviteType, streamId StreamID, channelId return nil, fmt.Errorf("receiving stream timed out") } - stream.urls = urls + stream.Urls = urls // 保存到数据库 _ = StreamDao.UpdateStream(stream) diff --git a/livegbs_bean.go b/livegbs_bean.go new file mode 100644 index 0000000..d3ac1d7 --- /dev/null +++ b/livegbs_bean.go @@ -0,0 +1,153 @@ +package main + +type LiveGBSDevice struct { + AlarmSubscribe bool `json:"AlarmSubscribe"` + CatalogInterval int `json:"CatalogInterval"` + CatalogSubscribe bool `json:"CatalogSubscribe"` + ChannelCount int `json:"ChannelCount"` + ChannelOverLoad bool `json:"ChannelOverLoad"` + Charset string `json:"Charset"` + CivilCodeFirst bool `json:"CivilCodeFirst"` + CommandTransport string `json:"CommandTransport"` + ContactIP string `json:"ContactIP"` + CreatedAt string `json:"CreatedAt"` + CustomName string `json:"CustomName"` + DropChannelType string `json:"DropChannelType"` + GBVer string `json:"GBVer"` + ID string `json:"ID"` + KeepOriginalTree bool `json:"KeepOriginalTree"` + LastKeepaliveAt string `json:"LastKeepaliveAt"` + LastRegisterAt string `json:"LastRegisterAt"` + Latitude float64 `json:"Latitude"` + Longitude float64 `json:"Longitude"` + Manufacturer string `json:"Manufacturer"` + MediaTransport string `json:"MediaTransport"` + MediaTransportMode string `json:"MediaTransportMode"` + Name string `json:"Name"` + Online bool `json:"Online"` + PTZSubscribe bool `json:"PTZSubscribe"` + Password string `json:"Password"` + PositionSubscribe bool `json:"PositionSubscribe"` + RecordCenter bool `json:"RecordCenter"` + RecordIndistinct bool `json:"RecordIndistinct"` + RecvStreamIP string `json:"RecvStreamIP"` + RemoteIP string `json:"RemoteIP"` + RemotePort int `json:"RemotePort"` + RemoteRegion string `json:"RemoteRegion"` + SMSGroupID string `json:"SMSGroupID"` + SMSID string `json:"SMSID"` + StreamMode string `json:"StreamMode"` + SubscribeInterval int `json:"SubscribeInterval"` + Type string `json:"Type"` + UpdatedAt string `json:"UpdatedAt"` +} + +type LiveGBSChannel struct { + Address string `json:"Address"` + Altitude int `json:"Altitude"` + AudioEnable bool `json:"AudioEnable"` + BatteryLevel int `json:"BatteryLevel"` + Block string `json:"Block"` + Channel int `json:"Channel"` + CivilCode string `json:"CivilCode"` + CloudRecord bool `json:"CloudRecord"` + CreatedAt string `json:"CreatedAt"` + Custom bool `json:"Custom"` + CustomAddress string `json:"CustomAddress"` + CustomBlock string `json:"CustomBlock"` + CustomCivilCode string `json:"CustomCivilCode"` + CustomFirmware string `json:"CustomFirmware"` + CustomID string `json:"CustomID"` + CustomIPAddress string `json:"CustomIPAddress"` + CustomLatitude int `json:"CustomLatitude"` + CustomLongitude int `json:"CustomLongitude"` + CustomManufacturer string `json:"CustomManufacturer"` + CustomModel string `json:"CustomModel"` + CustomName string `json:"CustomName"` + CustomPTZType int `json:"CustomPTZType"` + CustomParentID string `json:"CustomParentID"` + CustomPort int `json:"CustomPort"` + CustomSerialNumber string `json:"CustomSerialNumber"` + CustomStatus string `json:"CustomStatus"` + Description string `json:"Description"` + DeviceCustomName string `json:"DeviceCustomName"` + DeviceID string `json:"DeviceID"` + DeviceName string `json:"DeviceName"` + DeviceOnline bool `json:"DeviceOnline"` + DeviceType string `json:"DeviceType"` + Direction int `json:"Direction"` + DownloadSpeed string `json:"DownloadSpeed"` + Firmware string `json:"Firmware"` + ID string `json:"ID"` + IPAddress string `json:"IPAddress"` + Latitude int `json:"Latitude"` + Longitude int `json:"Longitude"` + Manufacturer string `json:"Manufacturer"` + Model string `json:"Model"` + Name string `json:"Name"` + NumOutputs int `json:"NumOutputs"` + Ondemand bool `json:"Ondemand"` + Owner string `json:"Owner"` + PTZType int `json:"PTZType"` + ParentID string `json:"ParentID"` + Parental int `json:"Parental"` + Port int `json:"Port"` + Quality string `json:"Quality"` + RegisterWay int `json:"RegisterWay"` + Secrecy int `json:"Secrecy"` + SerialNumber string `json:"SerialNumber"` + Shared bool `json:"Shared"` + SignalLevel int `json:"SignalLevel"` + SnapURL string `json:"SnapURL"` + Speed int `json:"Speed"` + Status string `json:"Status"` + StreamID string `json:"StreamID"` + SubCount int `json:"SubCount"` + UpdatedAt string `json:"UpdatedAt"` +} + +type LiveGBSStreamStart struct { + Serial string + Code string +} + +type LiveGBSStream struct { + AudioEnable bool `json:"AudioEnable"` + CDN string `json:"CDN"` + CascadeSize int `json:"CascadeSize"` + ChannelID string `json:"ChannelID"` + ChannelName string `json:"ChannelName"` + ChannelPTZType int `json:"ChannelPTZType"` + CloudRecord bool `json:"CloudRecord"` + DecodeSize int `json:"DecodeSize"` + DeviceID string `json:"DeviceID"` + Duration int `json:"Duration"` + FLV string `json:"FLV"` + HLS string `json:"HLS"` + InBitRate int `json:"InBitRate"` + InBytes int `json:"InBytes"` + NumOutputs int `json:"NumOutputs"` + Ondemand bool `json:"Ondemand"` + OutBytes int `json:"OutBytes"` + RTMP string `json:"RTMP"` + RTPCount int `json:"RTPCount"` + RTPLostCount int `json:"RTPLostCount"` + RTPLostRate int `json:"RTPLostRate"` + RTSP string `json:"RTSP"` + RecordStartAt string `json:"RecordStartAt"` + RelaySize int `json:"RelaySize"` + SMSID string `json:"SMSID"` + SnapURL string `json:"SnapURL"` + SourceAudioCodecName string `json:"SourceAudioCodecName"` + SourceAudioSampleRate int `json:"SourceAudioSampleRate"` + SourceVideoCodecName string `json:"SourceVideoCodecName"` + SourceVideoFrameRate int `json:"SourceVideoFrameRate"` + SourceVideoHeight int `json:"SourceVideoHeight"` + SourceVideoWidth int `json:"SourceVideoWidth"` + StartAt string `json:"StartAt"` + StreamID string `json:"StreamID"` + Transport string `json:"Transport"` + VideoFrameCount int `json:"VideoFrameCount"` + WEBRTC string `json:"WEBRTC"` + WS_FLV string `json:"WS_FLV"` +} diff --git a/main.go b/main.go index bd78b24..f52700a 100644 --- a/main.go +++ b/main.go @@ -3,6 +3,7 @@ package main import ( "encoding/json" "gb-cms/hook" + "github.com/shirou/gopsutil/v3/host" "go.uber.org/zap/zapcore" "net" "net/http" @@ -12,11 +13,16 @@ import ( ) var ( - Config *Config_ - SipStack SipServer + Config *Config_ + SipStack SipServer + PwdMD5 string + StartUpTime time.Time + KernelArch string ) func init() { + StartUpTime = time.Now() + logConfig := LogConfig{ Level: int(zapcore.DebugLevel), Name: "./logs/clog", @@ -39,10 +45,28 @@ func main() { indent, _ := json.MarshalIndent(Config, "", "\t") Sugar.Infof("server config:\r\n%s", indent) + info, err := host.Info() + if err != nil { + Sugar.Errorf(err.Error()) + } else { + KernelArch = info.KernelArch + } + if config.Hooks.OnInvite != "" { hook.RegisterEventUrl(hook.EventTypeDeviceOnInvite, config.Hooks.OnInvite) } + plaintext, md5 := ReadTempPwd() + if plaintext != "" { + Sugar.Infof("temp pwd: %s", plaintext) + } + + PwdMD5 = md5 + + // 启动session超时管理 + go TokenManager.Start(5 * time.Minute) + + // 启动设备在线超时管理 OnlineDeviceManager.Start(time.Duration(Config.AliveExpires)*time.Second/4, time.Duration(Config.AliveExpires)*time.Second, OnExpires) // 从数据库中恢复会话 @@ -61,6 +85,8 @@ func main() { panic(err) } + go StartStats() + Sugar.Infof("启动sip server成功. addr: %s:%d", config.ListenIP, config.SipPort) Config.SipContactAddr = net.JoinHostPort(config.PublicIP, strconv.Itoa(config.SipPort)) SipStack = server diff --git a/online_devices.go b/online_devices.go index b0e4114..662d320 100644 --- a/online_devices.go +++ b/online_devices.go @@ -33,6 +33,12 @@ func (m *onlineDeviceManager) Find(deviceId string) (time.Time, bool) { return t, ok } +func (m *onlineDeviceManager) Count() int { + m.lock.RLock() + defer m.lock.RUnlock() + return len(m.devices) +} + func (m *onlineDeviceManager) Start(interval time.Duration, keepalive time.Duration, OnExpires func(platformId int, deviceId string)) { // 精度有偏差 var timer *time.Timer diff --git a/platform.go b/platform.go index 1a04f24..c05078d 100644 --- a/platform.go +++ b/platform.go @@ -24,6 +24,16 @@ func (g *Platform) OnBye(request sip.Request) { g.CloseStream(id.Value(), false, true) } +func (g *Platform) OnQueryCatalog(sn int, channels []*Channel) { + // 添加本级域 + channels = append(channels, &Channel{ + DeviceID: g.ServerID, + Setup: SetupTypePassive, + }) + + g.gbClient.OnQueryCatalog(sn, channels) +} + // CloseStream 关闭级联会话 func (g *Platform) CloseStream(callId string, bye, ms bool) { sink, _ := SinkDao.DeleteForwardSinkByCallID(callId) @@ -74,7 +84,7 @@ func (g *Platform) OnInvite(request sip.Request, user string) sip.Response { // 如果流不存在, 向通道发送Invite请求 stream, _ := StreamDao.QueryStream(streamId) if stream == nil { - stream, err = device.StartStream(inviteType, streamId, user, gbSdp.startTime, gbSdp.stopTime, channel.SetupType.String(), 0, true) + stream, err = device.StartStream(inviteType, streamId, user, gbSdp.startTime, gbSdp.stopTime, channel.Setup.String(), 0, true) if err != nil { Sugar.Errorf("处理上级Invite失败 err: %s stream: %s", err.Error(), streamId) return CreateResponseWithStatusCode(request, http.StatusBadRequest) diff --git a/sip_server.go b/sip_server.go index d2a760e..9e7c78e 100644 --- a/sip_server.go +++ b/sip_server.go @@ -311,6 +311,7 @@ func (s *sipServer) OnMessage(wrapper *SipRequestSource) { if CmdCatalog == cmd { s.handler.OnCatalog(deviceId, message.(*CatalogResponse)) } else if CmdRecordInfo == cmd { + Sugar.Infof("查询录像列表 %s", body) s.handler.OnRecord(deviceId, message.(*QueryRecordInfoResponse)) } else if CmdDeviceInfo == cmd { s.handler.OnDeviceInfo(deviceId, message.(*DeviceInfoResponse)) diff --git a/stats.go b/stats.go new file mode 100644 index 0000000..56427d1 --- /dev/null +++ b/stats.go @@ -0,0 +1,402 @@ +package main + +// 每秒钟统计系统资源占用, 包括: cpu/流量/磁盘/内存 +import ( + "encoding/json" + "fmt" + "github.com/shirou/gopsutil/v3/cpu" + "github.com/shirou/gopsutil/v3/disk" + "github.com/shirou/gopsutil/v3/mem" + "github.com/shirou/gopsutil/v3/net" + "math" + "net/http" + "strings" + "time" +) + +var ( + topStats *TopStats + topStatsJson string + diskStatsJson string + lastNetStatsJson string + lastNetStats []net.IOCountersStat +) + +const ( + // MaxStatsCount 最大统计条数 + MaxStatsCount = 30 +) + +func init() { + topStats = &TopStats{ + Load: []struct { + Time string `json:"time"` + Load float64 `json:"load"` + Serial string `json:"serial"` + Name string `json:"name"` + }{ + { + Time: time.Now().Format("2006-01-02 15:04:05"), + Load: 0, + Serial: "", + Name: "直播", + }, + { + Time: time.Now().Format("2006-01-02 15:04:05"), + Load: 0, + Serial: "", + Name: "回放", + }, + { + Time: time.Now().Format("2006-01-02 15:04:05"), + Load: 0, + Serial: "", + Name: "播放", + }, + { + Time: time.Now().Format("2006-01-02 15:04:05"), + Load: 0, + Serial: "", + Name: "H265", + }, + { + Time: time.Now().Format("2006-01-02 15:04:05"), + Load: 0, + Serial: "", + Name: "录像", + }, + + { + Time: time.Now().Format("2006-01-02 15:04:05"), + Load: 0, + Serial: "", + Name: "级联", + }, + }, + } +} + +type TopStats struct { + CPU []struct { + Time string `json:"time"` + Use float64 `json:"use"` + } `json:"cpuData"` + + Load []struct { + Time string `json:"time"` + Load float64 `json:"load"` + Serial string `json:"serial"` + Name string `json:"name"` + } `json:"loadData"` + + Mem []struct { + Time string `json:"time"` + Use float64 `json:"use"` + } `json:"memData"` + + Net []struct { + Time string `json:"time"` + Sent float64 `json:"sent"` + Recv float64 `json:"recv"` + } `json:"netData"` +} + +// FormatDiskSize 返回大小和单位 +func FormatDiskSize(size uint64) (string, string) { + const ( + KB = 1024 + MB = 1024 * KB + GB = 1024 * MB + ) + + switch { + case size >= GB: + return fmt.Sprintf("%.2f", float64(size)/GB), "G" + case size >= MB: + return fmt.Sprintf("%.2f MB", float64(size)/MB), "M" + case size >= KB: + return fmt.Sprintf("%.2f KB", float64(size)/KB), "K" + default: + return fmt.Sprintf("%d B", size), "B" + } +} + +// {{ edit_1 }} 添加磁盘信息显示函数 +func stateDiskUsage() ([]struct { + Name string + Unit string + Size string + FreeSpace string + Used string + Percent string + Threshold string +}, error) { + + // 获取所有磁盘分区 + partitions, err := disk.Partitions(false) // true表示获取所有分区,包括远程分区 + if err != nil { + return nil, err + } + + var result []struct { + Name string + Unit string + Size string + FreeSpace string + Used string + Percent string + Threshold string + } + for _, partition := range partitions { + // 跳过某些特殊文件系统类型 + if partition.Fstype == "tmpfs" || partition.Fstype == "devtmpfs" || partition.Fstype == "squashfs" { + continue + } + + // 获取分区使用情况 + usage, err := disk.Usage(partition.Mountpoint) + if err != nil { + // 某些分区可能无法访问,跳过 + continue + } + + // 格式化磁盘大小 + size, unit := FormatDiskSize(usage.Total) + freeSpace, unit := FormatDiskSize(usage.Free) + used, unit := FormatDiskSize(usage.Used) + percent := fmt.Sprintf("%.2f", usage.UsedPercent) + + result = append(result, struct { + Name string + Unit string + Size string + FreeSpace string + Used string + Percent string + Threshold string + }{Name: partition.Mountpoint, Unit: unit, Size: size, FreeSpace: freeSpace, Used: used, Percent: percent, Threshold: ""}) + } + + return result, nil +} + +// 统计上下行流量 +func stateNet(refreshInterval time.Duration) (float64, float64, error) { + if lastNetStats == nil { + // 获取初始的网络 IO 计数器 + lastStats, err := net.IOCounters(true) // pernic=true 表示按接口分别统计 + if err != nil { + return 0, 0, err + } + lastNetStats = lastStats + } + + currentStats, err := net.IOCounters(true) + if err != nil { + return 0, 0, err + } + + var rxTotal float64 + var txTotal float64 + for _, current := range currentStats { + if !isPhysicalInterface(current.Name, current) { + continue + } + + for _, last := range lastNetStats { + if current.Name == last.Name { + // 核心计算逻辑 + rxRate := float64(current.BytesRecv-last.BytesRecv) / refreshInterval.Seconds() + txRate := float64(current.BytesSent-last.BytesSent) / refreshInterval.Seconds() + rxTotal += rxRate + txTotal += txRate + break + } + } + } + + // 更新 lastStats + lastNetStats = currentStats + // 按照Mbps统计, 保留3位小数 + rxTotal = math.Round(rxTotal*8/1024/1024*1000) / 1000 + txTotal = math.Round(txTotal*8/1024/1024*1000) / 1000 + return rxTotal, txTotal, nil +} + +func isPhysicalInterface(name string, stats net.IOCountersStat) bool { + // 跳过本地回环接口 + if name == "lo" || strings.Contains(strings.ToLower(name), "loopback") { + return false + } + + // 跳过虚拟接口 - 基于名称特征 + virtualKeywords := []string{ + "virtual", "vmnet", "veth", "docker", "bridge", "tun", "tap", + "npcap", "wfp", "lightweight", "filter", "vethernet", "isatap", + "teredo", "6to4", "vpn", "ras", "ppp", "slip", "wlanusb", + } + + lowerName := strings.ToLower(name) + for _, keyword := range virtualKeywords { + if strings.Contains(lowerName, keyword) { + return false + } + } + + // 特殊处理"本地连接"前缀的接口 + if strings.HasPrefix(name, "本地连接") { + // 但排除那些明显是虚拟的本地连接 + if strings.Contains(lowerName, "virtual") || strings.Contains(lowerName, "npcap") { + return false + } + // 如果有实际流量数据,更可能是物理接口 + return stats.BytesRecv > 0 || stats.BytesSent > 0 + } + + // 基于流量数据判断(物理接口通常有流量) + // 如果接口名称不包含虚拟特征且有流量数据,则认为是物理接口 + if stats.BytesRecv > 0 || stats.BytesSent > 0 { + return true + } + + // 对于没有流量的接口,基于名称判断 + physicalKeywords := []string{ + "ethernet", "以太网", "eth", "wlan", "wi-fi", "wireless", "wifi", + "lan", "net", "intel", "realtek", "broadcom", "atheros", "qualcomm", + } + + for _, keyword := range physicalKeywords { + if strings.Contains(lowerName, keyword) { + return true + } + } + + // 默认情况下,排除本地连接*这类虚拟接口 + if strings.HasPrefix(name, "本地连接*") { + return false + } + + return false +} + +func StartStats() { + // 统计间隔 + refreshInterval := 2 * time.Second + + ticker := time.NewTicker(refreshInterval) + defer ticker.Stop() + + var count int + for range ticker.C { + now := time.Now().Format("2006-01-02 15:04:05") + + // 获取CPU使用率 + cpuPercent, err := cpu.Percent(time.Second, false) + if err != nil { + Sugar.Errorf("获取CPU信息失败: %v", err) + } else { + // 所有核心 + var cpuPercentTotal float64 + for _, f := range cpuPercent { + cpuPercentTotal += f + } + + // float64保留两位小数 + cpuPercentTotal = math.Round(cpuPercentTotal*100) / 100 + + // 只统计30条,超过30条,删除最旧的 + if len(topStats.CPU) >= MaxStatsCount { + topStats.CPU = topStats.CPU[1:] + } + + topStats.CPU = append(topStats.CPU, struct { + Time string `json:"time"` + Use float64 `json:"use"` + }{ + Time: now, + Use: float64(cpuPercentTotal) / 100, + }) + + } + + // 获取内存信息 + memInfo, err := mem.VirtualMemory() + if err != nil { + Sugar.Errorf("获取内存信息失败: %v", err) + } else { + + // 只统计30条,超过30条,删除最旧的 + if len(topStats.Mem) >= MaxStatsCount { + topStats.Mem = topStats.Mem[1:] + } + + topStats.Mem = append(topStats.Mem, struct { + Time string `json:"time"` + Use float64 `json:"use"` + }{ + Time: now, + Use: math.Round(memInfo.UsedPercent) / 100, + }) + } + + // 获取网络信息 + rx, tx, err := stateNet(refreshInterval) + if err != nil { + Sugar.Errorf("获取网络信息失败: %v", err) + } else { + if len(topStats.Net) >= MaxStatsCount { + topStats.Net = topStats.Net[1:] + } + + topStats.Net = append(topStats.Net, struct { + Time string `json:"time"` + Sent float64 `json:"sent"` + Recv float64 `json:"recv"` + }{ + Time: now, + Sent: tx, + Recv: rx, + }) + + marshal, err := json.Marshal(topStats.Net[len(topStats.Net)-1]) + if err != nil { + Sugar.Errorf("序列化网络信息失败: %v", err) + } else { + lastNetStatsJson = string(marshal) + } + } + + marshal, err := json.Marshal(MalformedRequest{ + Code: http.StatusOK, + Msg: "Success", + Data: topStats, + }) + if err != nil { + Sugar.Errorf("序列化统计信息失败: %v", err) + } else { + topStatsJson = string(marshal) + } + + // 统计磁盘信息 + count++ + if count >= 5 { + count = 0 + usage, err := stateDiskUsage() + if err != nil { + Sugar.Errorf("获取磁盘信息失败: %v", err) + continue + } + + bytes, err := json.Marshal(MalformedRequest{ + Code: http.StatusOK, + Msg: "Success", + Data: usage, + }) + if err != nil { + Sugar.Errorf("序列化磁盘信息失败: %v", err) + } else { + diskStatsJson = string(bytes) + } + } + } +} diff --git a/stream.go b/stream.go index 3ac63d1..49a1d7e 100644 --- a/stream.go +++ b/stream.go @@ -12,7 +12,7 @@ import ( type SetupType int const ( - SetupTypeUDP SetupType = iota + SetupTypeUDP SetupType = iota + 1 SetupTypePassive SetupTypeActive ) @@ -23,15 +23,13 @@ var ( func (s SetupType) String() string { switch s { - case SetupTypeUDP: - return "udp" case SetupTypePassive: return "passive" case SetupTypeActive: return "active" + default: + return "udp" } - - panic("invalid setup type") } func (s SetupType) MediaProtocol() string { @@ -43,6 +41,15 @@ func (s SetupType) MediaProtocol() string { } } +func (s SetupType) Transport() string { + switch s { + case SetupTypePassive, SetupTypeActive: + return "TCP" + default: + return "UDP" + } +} + // RequestWrapper sql序列化 type RequestWrapper struct { sip.Request @@ -86,9 +93,8 @@ type Stream struct { Dialog *RequestWrapper `json:"dialog,omitempty"` // 国标流的SipCall会话 SinkCount int32 `json:"sink_count"` // 拉流端计数(包含级联转发) SetupType SetupType - CallID string `json:"call_id"` - - urls []string // 从流媒体服务器返回的拉流地址 + CallID string `json:"call_id"` + Urls []string `gorm:"serializer:json"` // 从流媒体服务器返回的拉流地址 } func (s *Stream) MarshalJSON() ([]byte, error) { diff --git a/temp_pwd.go b/temp_pwd.go new file mode 100644 index 0000000..79e2760 --- /dev/null +++ b/temp_pwd.go @@ -0,0 +1,47 @@ +package main + +import ( + "crypto/md5" + "encoding/hex" + "math/rand" + "os" + "time" +) + +func GenerateTempPwd() string { + // 根据字母数字符号生成12位随机密码 + // 字母数字符号 + const letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!@#$%^&*()" + // 随机数 + rand.Seed(time.Now().UnixNano()) + // 生成12位随机密码 + b := make([]byte, 12) + for i := range b { + b[i] = letters[rand.Intn(len(letters))] + } + + return string(b) +} + +// ReadTempPwd 读取临时密码 +func ReadTempPwd() (plaintext string, md5Hex string) { + // 从文件中读取密码 + pwd, err := os.ReadFile("./data/pwd.txt") + if err != nil { + // 生成密码 + plaintext = GenerateTempPwd() + + // 计算md5 + hash := md5.Sum([]byte(plaintext)) + pwd = []byte(hex.EncodeToString(hash[:])) + + // 写入文件 + err = os.WriteFile("./data/pwd.txt", pwd, 0644) + if err != nil { + panic(err) + } + } + + md5Hex = string(pwd) + return +} diff --git a/token_manager.go b/token_manager.go new file mode 100644 index 0000000..5b91cbf --- /dev/null +++ b/token_manager.go @@ -0,0 +1,97 @@ +package main + +import ( + "math/rand" + "sync" + "time" +) + +var ( + TokenManager = tokenManager{ + tokens: make(map[string]*UserSession), + } +) + +type UserSession struct { + Username string + Pwd string + LoginTime time.Time + AliveTime time.Time +} + +type tokenManager struct { + tokens map[string]*UserSession + + lock sync.RWMutex +} + +func (t *tokenManager) Add(token string, username string, pwd string) { + t.lock.Lock() + defer t.lock.Unlock() + t.tokens[token] = &UserSession{ + Username: username, + Pwd: pwd, + LoginTime: time.Now(), + AliveTime: time.Now(), + } +} + +func (t *tokenManager) Find(token string) *UserSession { + t.lock.RLock() + defer t.lock.RUnlock() + return t.tokens[token] +} + +func (t *tokenManager) Remove(token string) { + t.lock.Lock() + defer t.lock.Unlock() + delete(t.tokens, token) +} + +func (t *tokenManager) Refresh(token string, time2 time.Time) bool { + t.lock.Lock() + defer t.lock.Unlock() + + session, ok := t.tokens[token] + if !ok { + return false + } + + session.AliveTime = time2 + return true +} + +func (t *tokenManager) Start(timeout time.Duration) { + ticker := time.NewTicker(30 * time.Second) + for { + select { + case <-ticker.C: + t.lock.Lock() + for token, session := range t.tokens { + if time.Since(session.AliveTime) > timeout { + delete(t.tokens, token) + } + } + t.lock.Unlock() + } + } +} + +func (t *tokenManager) Clear() { + // 清空所有token + t.lock.Lock() + defer t.lock.Unlock() + t.tokens = make(map[string]*UserSession) +} + +// GenerateToken 生成token +func GenerateToken() string { + // 从大小写字母和数字中随机选择 + charset := "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" + // 随机选择16个字符 + token := make([]byte, 16) + for i := range token { + token[i] = charset[rand.Intn(len(charset))] + } + return string(token) +} diff --git a/xml.go b/xml.go index 5c31bce..bd75ed6 100644 --- a/xml.go +++ b/xml.go @@ -48,8 +48,9 @@ type Channel struct { Status OnlineStatus `json:"status" xml:"Status,omitempty"` Longitude string `json:"longitude" xml:"Longitude,omitempty"` Latitude string `json:"latitude" xml:"Latitude,omitempty"` - SetupType SetupType `json:"setup_type,omitempty"` + Setup SetupType `json:"setup,omitempty"` ChannelNumber int `json:"channel_number" xml:"-"` // 对应1078的通道号 + SubCount int `json:"-" xml:"-" gorm:"-"` // 子节点数量 } func (d *Channel) Online() bool { diff --git a/xml_record.go b/xml_record.go index ab1f84e..b08c93a 100644 --- a/xml_record.go +++ b/xml_record.go @@ -32,6 +32,7 @@ type QueryRecordInfoResponse struct { DeviceID string `xml:"DeviceID"` SumNum int `xml:"SumNum"` DeviceList RecordList `xml:"RecordList"` + BaseMessage } type RecordList struct { @@ -41,13 +42,21 @@ type RecordList struct { } type RecordInfo struct { - FileSize uint64 `xml:"FileSize" json:"fileSize"` + DeviceID string `xml:"DeviceID"` + Name string `xml:"Name"` + FilePath string `xml:"FilePath" json:"filePath"` + Address string `xml:"Address"` StartTime string `xml:"StartTime" json:"startTime"` EndTime string `xml:"EndTime" json:"endTime"` - FilePath string `xml:"FilePath" json:"filePath"` + Secrecy string `xml:"Secrecy"` + Type string `xml:"Type"` + RecorderID string `xml:"RecorderID" json:"recorderId"` + FileSize uint64 `xml:"FileSize" json:"fileSize"` + RecordLocation string `xml:"RecordLocation"` + StreamNumber int `xml:"StreamNumber"` + ResourceType string `xml:"ResourceType" json:"type"` ResourceId string `xml:"ResourceId" json:"resourceId"` - RecorderId string `xml:"RecorderId" json:"recorderId"` UserId string `xml:"UserId" json:"userId"` UserName string `xml:"UserName" json:"userName"` ResourceName string `xml:"ResourceName" json:"resourceName"`