feat: 新增接口

1. /api/v1/device/session/list
2. /api/v1/device/session/stop
This commit is contained in:
ydajiang
2025-08-30 17:09:54 +08:00
parent 3eb5f95810
commit 368223e25c
5 changed files with 177 additions and 45 deletions

104
api.go
View File

@@ -2,6 +2,7 @@ package main
import (
"context"
"encoding/json"
"fmt"
"gb-cms/common"
"gb-cms/dao"
@@ -15,6 +16,7 @@ import (
"io"
"math"
"net/http"
"net/url"
"os"
"strconv"
"strings"
@@ -86,7 +88,7 @@ type RecordParams struct {
}
type StreamIDParams struct {
StreamID common.StreamID `json:"stream_id"`
StreamID string `json:"streamid"`
}
type PageQuery struct {
@@ -170,6 +172,8 @@ func startApiServer(addr string) {
apiServer.router.HandleFunc("/api/v1/stream/stop", withVerify(common.WithFormDataParams(apiServer.OnCloseStream, InviteParams{})))
apiServer.router.HandleFunc("/api/v1/device/list", withVerify(common.WithQueryStringParams(apiServer.OnDeviceList, QueryDeviceChannel{}))) // 查询设备列表
apiServer.router.HandleFunc("/api/v1/device/session/list", withVerify(common.WithQueryStringParams(apiServer.OnSessionList, QueryDeviceChannel{}))) // 推流列表
apiServer.router.HandleFunc("/api/v1/device/session/stop", withVerify(common.WithFormDataParams(apiServer.OnSessionStop, StreamIDParams{}))) // 关闭流
apiServer.router.HandleFunc("/api/v1/device/channellist", withVerify(common.WithQueryStringParams(apiServer.OnChannelList, QueryDeviceChannel{}))) // 查询通道列表
apiServer.router.HandleFunc("/api/v1/device/fetchcatalog", withVerify(common.WithQueryStringParams(apiServer.OnCatalogQuery, QueryDeviceChannel{}))) // 更新通道
apiServer.router.HandleFunc("/api/v1/playback/recordlist", withVerify(common.WithQueryStringParams(apiServer.OnRecordList, QueryRecordParams{}))) // 查询录像列表
@@ -566,14 +570,7 @@ func (api *ApiServer) DoInvite(inviteType common.InviteType, params *InviteParam
func (api *ApiServer) OnCloseStream(v *InviteParams, w http.ResponseWriter, _ *http.Request) (interface{}, error) {
streamID := common.GenerateStreamID(common.InviteTypePlay, v.DeviceID, v.ChannelID, "", "")
mode, err := dao.Stream.DeleteStream(streamID)
if err != nil {
log.Sugar.Errorf("删除流失败 err: %s", err.Error())
return nil, err
}
(&stack.Stream{mode}).Close(true, true)
stack.CloseStream(streamID, true)
return "OK", nil
}
@@ -586,6 +583,9 @@ type QueryDeviceChannel struct {
Keyword string `json:"q"`
Online string `json:"online"`
ChannelType string `json:"channel_type"`
Order string `json:"order"`
Sort string `json:"sort"`
SMS string `json:"sms"`
//pageNumber int
//pageSize int
@@ -1201,48 +1201,76 @@ func (api *ApiServer) OnCatalogQuery(params *QueryDeviceChannel, _ http.Response
}
func (api *ApiServer) OnStreamInfo(w http.ResponseWriter, r *http.Request) {
// 构建目标URL
targetURL := common.Config.MediaServer + r.URL.Path
if r.URL.RawQuery != "" {
targetURL += "?" + r.URL.RawQuery
}
// 创建转发请求
proxyReq, err := http.NewRequest(r.Method, targetURL, r.Body)
response, err := stack.MSQueryStreamInfo(r.Header, r.URL.RawQuery)
if err != nil {
http.Error(w, "Error creating proxy request", http.StatusInternalServerError)
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte(err.Error()))
return
}
// 复制请求头
for name, values := range r.Header {
for _, value := range values {
proxyReq.Header.Add(name, value)
}
}
// 发送请求
client := &http.Client{
Timeout: 30 * time.Second,
}
resp, err := client.Do(proxyReq)
if err != nil {
http.Error(w, "Error forwarding request", http.StatusBadGateway)
return
}
defer resp.Body.Close()
defer response.Body.Close()
// 复制响应头
for name, values := range resp.Header {
for name, values := range response.Header {
for _, value := range values {
w.Header().Add(name, value)
}
}
// 设置状态码并转发响应体
w.WriteHeader(resp.StatusCode)
_, err = io.Copy(w, resp.Body)
w.WriteHeader(response.StatusCode)
_, err = io.Copy(w, response.Body)
if err != nil {
log.Sugar.Errorf("Failed to copy response body: %v", err)
}
}
func (api *ApiServer) OnSessionList(q *QueryDeviceChannel, _ http.ResponseWriter, r *http.Request) (interface{}, error) {
streams, _, err := dao.Stream.QueryStreams(q.Keyword, (q.Start/q.Limit)+1, q.Limit)
if err != nil {
return nil, err
}
response := struct {
SessionCount int
SessionList []*StreamInfo
}{}
bytes := make([]byte, 4096)
for _, stream := range streams {
values := url.Values{}
values.Set("streamid", string(stream.StreamID))
resp, err := stack.MSQueryStreamInfo(r.Header, values.Encode())
if err != nil {
return nil, err
}
var n int
n, err = resp.Body.Read(bytes)
resp.Body.Close()
if n < 1 {
break
}
info := &StreamInfo{}
err = json.Unmarshal(bytes[:n], info)
if err != nil {
return nil, err
}
info.ChannelName = stream.Name
response.SessionList = append(response.SessionList, info)
}
response.SessionCount = len(response.SessionList)
return &response, nil
}
func (api *ApiServer) OnSessionStop(params *StreamIDParams, w http.ResponseWriter, req *http.Request) (interface{}, error) {
err := stack.MSCloseSource(params.StreamID)
if err != nil {
return nil, err
}
return "OK", nil
}

View File

@@ -43,6 +43,46 @@ type ModifyPasswordReq struct {
NewPwd string `json:"newpassword"`
}
type StreamInfo struct {
AudioEnable bool `json:"AudioEnable"`
CDN string `json:"CDN"`
CascadeSize int `json:"CascadeSize"`
ChannelID string `json:"ChannelID"`
ChannelName string `json:"ChannelName"`
CloudRecord bool `json:"CloudRecord"`
DecodeSize int `json:"DecodeSize"`
DeviceID string `json:"DeviceID"`
Duration int `json:"Duration"`
FLV string `json:"FLV"`
HLS string `json:"HLS"`
InBitRate int `json:"InBitRate"`
InBytes int `json:"InBytes"`
NumOutputs int `json:"NumOutputs"`
Ondemand bool `json:"Ondemand"`
OutBytes int `json:"OutBytes"`
RTMP string `json:"RTMP"`
RTPCount int `json:"RTPCount"`
RTPLostCount int `json:"RTPLostCount"`
RTPLostRate int `json:"RTPLostRate"`
RTSP string `json:"RTSP"`
RecordStartAt string `json:"RecordStartAt"`
RelaySize int `json:"RelaySize"`
SMSID string `json:"SMSID"`
SnapURL string `json:"SnapURL"`
SourceAudioCodecName string `json:"SourceAudioCodecName"`
SourceAudioSampleRate int `json:"SourceAudioSampleRate"`
SourceVideoCodecName string `json:"SourceVideoCodecName"`
SourceVideoFrameRate int `json:"SourceVideoFrameRate"`
SourceVideoHeight int `json:"SourceVideoHeight"`
SourceVideoWidth int `json:"SourceVideoWidth"`
StartAt string `json:"StartAt"`
StreamID string `json:"StreamID"`
Transport string `json:"Transport"`
VideoFrameCount int `json:"VideoFrameCount"`
WEBRTC string `json:"WEBRTC"`
WS_FLV string `json:"WS_FLV"`
}
func GetUptime() time.Duration {
return time.Since(StartUpTime)
}

View File

@@ -9,15 +9,16 @@ import (
type StreamModel struct {
GBModel
DeviceID string // 下级设备ID, 统计某个设备的所有流/1078设备为sim number
ChannelID string // 下级通道ID, 统计某个设备下的某个通道的所有流/1078设备为 channel number
StreamID common.StreamID `json:"stream_id"` // 流ID
Protocol int `json:"protocol,omitempty"` // 推流协议, rtmp/28181/1078/gb_talk
Dialog *common.RequestWrapper `json:"dialog,omitempty"` // 国标流的SipCall会话
SinkCount int32 `json:"sink_count"` // 拉流端计数(包含级联转发)
DeviceID string `gorm:"index"` // 下级设备ID, 统计某个设备的所有流/1078设备为sim number
ChannelID string `gorm:"index"` // 下级通道ID, 统计某个设备下的某个通道的所有流/1078设备为 channel number
StreamID common.StreamID `json:"stream_id" gorm:"index"` // 流ID
Protocol int `json:"protocol,omitempty"` // 推流协议, rtmp/28181/1078/gb_talk
Dialog *common.RequestWrapper `json:"dialog,omitempty"` // 国标流的SipCall会话
SinkCount int32 `json:"sink_count"` // 拉流端计数(包含级联转发)
SetupType common.SetupType
CallID string `json:"call_id"`
CallID string `json:"call_id" gorm:"index"`
Urls []string `gorm:"serializer:json"` // 从流媒体服务器返回的拉流地址
Name string `gorm:"index"` // 视频通道名
}
func (s *StreamModel) TableName() string {
@@ -45,6 +46,8 @@ type DaoStream interface {
QueryStream(streamId common.StreamID) (*StreamModel, error)
QueryStreams(keyword string, page, size int) ([]*StreamModel, int, error)
QueryStreamByCallID(callID string) (*StreamModel, error)
DeleteStreamByCallID(callID string) (*StreamModel, error)
@@ -176,3 +179,28 @@ func (d *daoStream) DeleteStreamByDeviceID(deviceID string) ([]*StreamModel, err
return streams, nil
}
func (d *daoStream) QueryStreams(keyword string, page, size int) ([]*StreamModel, int, error) {
var streams []*StreamModel
var total int64
tx := db.Model(&StreamModel{}).Offset((page - 1) * size).Limit(size)
if keyword != "" {
tx.Where("name like ? or device_id like ? or channel_id like ?", "%"+keyword+"%", "%"+keyword+"%", "%"+keyword+"%")
}
if tx = tx.Find(&streams); tx.Error != nil {
return nil, 0, tx.Error
}
tx = db.Model(&StreamModel{})
if keyword != "" {
tx.Where("name like ? or device_id like ? or channel_id like ?", "%"+keyword+"%", "%"+keyword+"%", "%"+keyword+"%")
}
if tx = tx.Count(&total); tx.Error != nil {
return nil, 0, tx.Error
}
return streams, int(total), nil
}

View File

@@ -17,11 +17,19 @@ import (
)
func (d *Device) StartStream(inviteType common.InviteType, streamId common.StreamID, channelId, startTime, stopTime, setup string, speed int, sync bool) (*dao.StreamModel, error) {
channel, err := dao.Channel.QueryChannel(d.DeviceID, channelId)
if err != nil {
return nil, err
} else if channel == nil {
return nil, fmt.Errorf("channel not found")
}
stream := &dao.StreamModel{
DeviceID: streamId.DeviceID(),
ChannelID: streamId.ChannelID(),
StreamID: streamId,
Protocol: SourceType28181,
Name: channel.Name,
}
// 先添加占位置, 防止重复请求

View File

@@ -240,3 +240,31 @@ func MSAddForwardSink(protocol int, source, addr, offerSetup, answerSetup, ssrc,
port, _ := strconv.Atoi(p)
return host, uint16(port), data.Data.Sink, data.Data.SSRC, nil
}
func MSQueryStreamInfo(header http.Header, queryParams string) (*http.Response, error) {
// 构建目标URL
targetURL := common.Config.MediaServer + "/api/v1/stream/info"
if queryParams != "" {
targetURL += "?" + queryParams
}
// 创建转发请求
proxyReq, err := http.NewRequest("POST", targetURL, nil)
if err != nil {
return nil, err
}
// 复制请求头
for name, values := range header {
for _, value := range values {
proxyReq.Header.Add(name, value)
}
}
// 发送请求
client := &http.Client{
Timeout: 30 * time.Second,
}
return client.Do(proxyReq)
}