feat: 实现推拉流统计和查询接口

This commit is contained in:
ydajiang
2025-09-05 20:18:14 +08:00
parent 013c27b742
commit f127efb343
9 changed files with 187 additions and 74 deletions

59
api.go
View File

@@ -127,6 +127,7 @@ type QueryDeviceChannel struct {
Order string `json:"order"` // asc/desc
Sort string `json:"sort"` // Channel-根据数据库ID排序/iD-根据通道ID排序
SMS string `json:"sms"`
Filter string `json:"filter"`
}
type DeleteDevice struct {
@@ -213,7 +214,7 @@ func withVerify2(onSuccess func(w http.ResponseWriter, req *http.Request), onFai
}
func startApiServer(addr string) {
apiServer.router.HandleFunc("/api/v1/hook/on_play", common.WithJsonParams(apiServer.OnPlay, &StreamParams{}))
apiServer.router.HandleFunc("/api/v1/hook/on_play", common.WithJsonParams(apiServer.OnPlay, &PlayDoneParams{}))
apiServer.router.HandleFunc("/api/v1/hook/on_play_done", common.WithJsonParams(apiServer.OnPlayDone, &PlayDoneParams{}))
apiServer.router.HandleFunc("/api/v1/hook/on_publish", common.WithJsonParams(apiServer.OnPublish, &StreamParams{}))
apiServer.router.HandleFunc("/api/v1/hook/on_publish_done", common.WithJsonParams(apiServer.OnPublishDone, &StreamParams{}))
@@ -321,7 +322,7 @@ func startApiServer(addr string) {
}
}
func (api *ApiServer) OnPlay(params *StreamParams, w http.ResponseWriter, r *http.Request) {
func (api *ApiServer) OnPlay(params *PlayDoneParams, w http.ResponseWriter, r *http.Request) {
log.Sugar.Infof("播放事件. protocol: %s stream: %s", params.Protocol, params.Stream)
// [注意]: windows上使用cmd/power shell推拉流如果要携带多个参数, 请用双引号将与号引起来("&")
@@ -379,15 +380,20 @@ func (api *ApiServer) OnPlay(params *StreamParams, w http.ResponseWriter, r *htt
if TokenManager.Find(streamToken) == nil {
w.WriteHeader(http.StatusUnauthorized)
log.Sugar.Errorf("播放鉴权失败, token不存在 token: %s", streamToken)
return
}
if stream, _ := dao.Stream.QueryStream(params.Stream); stream == nil {
} else if stream, _ := dao.Stream.QueryStream(params.Stream); stream == nil {
w.WriteHeader(http.StatusNotFound)
return
} else {
_ = dao.Sink.SaveForwardSink(&dao.SinkModel{
SinkID: params.Sink,
StreamID: params.Stream,
Protocol: params.Protocol,
RemoteAddr: params.RemoteAddr,
})
}
return
}
// 对讲/级联, 在此处请求流
inviteParams := &InviteParams{
DeviceID: deviceId,
ChannelID: channelId,
@@ -414,6 +420,13 @@ func (api *ApiServer) OnPlay(params *StreamParams, w http.ResponseWriter, r *htt
utils.Assert(http.StatusOK != code)
} else if http.StatusOK == code {
_ = stream.ID
_ = dao.Sink.SaveForwardSink(&dao.SinkModel{
SinkID: params.Sink,
StreamID: params.Stream,
Protocol: params.Protocol,
RemoteAddr: params.RemoteAddr,
})
}
}
@@ -423,7 +436,7 @@ func (api *ApiServer) OnPlay(params *StreamParams, w http.ResponseWriter, r *htt
func (api *ApiServer) OnPlayDone(params *PlayDoneParams, _ http.ResponseWriter, _ *http.Request) {
log.Sugar.Debugf("播放结束事件. protocol: %s stream: %s", params.Protocol, params.Stream)
sink, _ := dao.Sink.DeleteForwardSink(params.Stream, params.Sink)
sink, _ := dao.Sink.DeleteForwardSink(params.Sink)
if sink == nil {
return
}
@@ -994,13 +1007,13 @@ func (api *ApiServer) OnBroadcast(v *BroadcastParams, _ http.ResponseWriter, r *
sink := &dao.SinkModel{
StreamID: v.StreamId,
SinkStreamID: sinkStreamId,
Protocol: "gb_talk",
Protocol: stack.SourceTypeGBTalk,
CreateTime: time.Now().Unix(),
SetupType: setupType,
}
streamWaiting := &stack.StreamWaiting{Data: sink}
if err := dao.Sink.SaveForwardSink(v.StreamId, sink); err != nil {
if err := dao.Sink.SaveForwardSink(sink); err != nil {
log.Sugar.Errorf("广播失败, 设备正在广播中. stream: %s", sinkStreamId)
return nil, fmt.Errorf("设备正在广播中")
} else if _, ok = stack.EarlyDialogs.Add(InviteSourceId, streamWaiting); !ok {
@@ -1312,7 +1325,31 @@ func (api *ApiServer) OnStreamInfo(w http.ResponseWriter, r *http.Request) {
}
func (api *ApiServer) OnSessionList(q *QueryDeviceChannel, _ http.ResponseWriter, r *http.Request) (interface{}, error) {
streams, _, err := dao.Stream.QueryStreams(q.Keyword, (q.Start/q.Limit)+1, q.Limit)
//filter := q.Filter // playing-正在播放/stream-不包含回放和下载/record-正在回放的流/hevc-h265流/cascade-级联
var streams []*dao.StreamModel
var err error
if "cascade" == q.Filter {
protocols := []int{stack.TransStreamGBCascaded}
var ids []string
ids, _, err = dao.Sink.QueryStreamIds(protocols, (q.Start/q.Limit)+1, q.Limit)
if len(ids) > 0 {
streams, err = dao.Stream.QueryStreamsByIds(ids)
}
} else if "stream" == q.Filter {
streams, _, err = dao.Stream.QueryStreams(q.Keyword, (q.Start/q.Limit)+1, q.Limit, "play")
} else if "record" == q.Filter {
streams, _, err = dao.Stream.QueryStreams(q.Keyword, (q.Start/q.Limit)+1, q.Limit, "playback")
} else if "playing" == q.Filter {
protocols := []int{stack.TransStreamRtmp, stack.TransStreamFlv, stack.TransStreamRtsp, stack.TransStreamHls, stack.TransStreamRtc}
var ids []string
ids, _, err = dao.Sink.QueryStreamIds(protocols, (q.Start/q.Limit)+1, q.Limit)
if len(ids) > 0 {
streams, err = dao.Stream.QueryStreamsByIds(ids)
}
} else {
streams, _, err = dao.Stream.QueryStreams(q.Keyword, (q.Start/q.Limit)+1, q.Limit, "")
}
if err != nil {
return nil, err
}

View File

@@ -1,7 +1,6 @@
package dao
import (
"fmt"
"gb-cms/common"
"gorm.io/gorm"
)
@@ -10,14 +9,15 @@ import (
type SinkModel struct {
GBModel
SinkID string `json:"sink_id"` // 流媒体服务器中的sink id
StreamID common.StreamID `json:"stream_id"` // 推流ID
StreamID common.StreamID `json:"stream_id"` // 所属的推流ID
SinkStreamID common.StreamID `json:"sink_stream_id"` // 广播使用, 每个广播设备的唯一ID
Protocol string `json:"protocol,omitempty"` // 转发流协议, gb_cascaded/gb_talk/gb_gateway
Protocol int `json:"protocol,omitempty"` // 流协议, @See stack.TransStreamRtmp
Dialog *common.RequestWrapper `json:"dialog,omitempty"`
CallID string `json:"call_id,omitempty"`
ServerAddr string `json:"server_addr,omitempty"` // 级联上级地址
CreateTime int64 `json:"create_time"`
SetupType common.SetupType // 流转发类型
RemoteAddr string
}
func (d *SinkModel) TableName() string {
@@ -61,19 +61,13 @@ func (d *daoSink) QueryForwardSinks(stream common.StreamID) (map[string]*SinkMod
return sinkMap, nil
}
func (d *daoSink) SaveForwardSink(stream common.StreamID, sink *SinkModel) error {
var old SinkModel
tx := db.Select("id").Where("sink_id =?", sink.SinkID).Take(&old)
if tx.Error == nil {
return fmt.Errorf("sink already exists")
}
func (d *daoSink) SaveForwardSink(sink *SinkModel) error {
return DBTransaction(func(tx *gorm.DB) error {
return tx.Save(sink).Error
return tx.Create(sink).Error
})
}
func (d *daoSink) DeleteForwardSink(stream common.StreamID, sinkId string) (*SinkModel, error) {
func (d *daoSink) DeleteForwardSink(sinkId string) (*SinkModel, error) {
var sink SinkModel
tx := db.Where("sink_id =?", sinkId).Take(&sink)
if tx.Error != nil {
@@ -159,3 +153,40 @@ func (d *daoSink) DeleteForwardSinksByServerAddr(addr string) ([]*SinkModel, err
return tx.Where("server_addr =?", addr).Unscoped().Delete(&SinkModel{}).Error
})
}
func (d *daoSink) QuerySinkCountByProtocol(protocol int) (int, error) {
var count int64
tx := db.Model(&SinkModel{}).Where("protocol = ?", protocol).Count(&count)
if tx.Error != nil {
return 0, tx.Error
}
return int(count), nil
}
func (d *daoSink) Count() (int, error) {
var count int64
tx := db.Model(&SinkModel{}).Count(&count)
if tx.Error != nil {
return 0, tx.Error
}
return int(count), nil
}
// QueryStreamIds 指定多个protocol查询streamIds
func (d *daoSink) QueryStreamIds(protocols []int, page, size int) ([]string, int, error) {
// 查询总数
var total int64
tx := db.Model(&SinkModel{}).Where("protocol in ?", protocols).Group("stream_id").Count(&total)
if tx.Error != nil {
return nil, 0, tx.Error
}
var streamIds []string
// 分页查询
tx = db.Model(&SinkModel{}).Select("stream_id").Where("protocol in ?", protocols).Group("stream_id").Offset((page - 1) * size).Limit(size).Find(&streamIds)
if tx.Error != nil {
return nil, 0, tx.Error
}
return streamIds, int(total), nil
}

View File

@@ -9,16 +9,17 @@ import (
type StreamModel struct {
GBModel
DeviceID string `gorm:"index"` // 下级设备ID, 统计某个设备的所有流/1078设备为sim number
ChannelID string `gorm:"index"` // 下级通道ID, 统计某个设备下的某个通道的所有流/1078设备为 channel number
StreamID common.StreamID `json:"stream_id" gorm:"index,unique"` // 流ID
Protocol int `json:"protocol,omitempty"` // 推流协议, rtmp/28181/1078/gb_talk
Dialog *common.RequestWrapper `json:"dialog,omitempty"` // 国标流的SipCall会话
SinkCount int32 `json:"sink_count"` // 拉流端计数(包含级联转发)
SetupType common.SetupType
CallID string `json:"call_id" gorm:"index"`
Urls []string `gorm:"serializer:json"` // 从流媒体服务器返回的拉流地址
Name string `gorm:"index"` // 视频通道名
DeviceID string `gorm:"index"` // 下级设备ID, 统计某个设备的所有流/1078设备为sim number
ChannelID string `gorm:"index"` // 下级通道ID, 统计某个设备下的某个通道的所有流/1078设备为 channel number
StreamID common.StreamID `json:"stream_id" gorm:"index,unique"` // 流ID
Protocol int `json:"protocol,omitempty"` // 推流协议, @See stack.SourceTypeRtmp
StreamType string // play/playback/download
Dialog *common.RequestWrapper `json:"dialog,omitempty"` // 国标流的SipCall会话
SetupType common.SetupType // 取流方式
CallID string `json:"call_id" gorm:"index"`
Urls []string `gorm:"serializer:json"` // 从流媒体服务器返回的拉流地址
Name string `gorm:"index"` // 视频通道名
RemoteAddr string
}
func (s *StreamModel) TableName() string {
@@ -156,7 +157,7 @@ func (d *daoStream) DeleteStreamByDeviceID(deviceID string) ([]*StreamModel, err
return streams, nil
}
func (d *daoStream) QueryStreams(keyword string, page, size int) ([]*StreamModel, int, error) {
func (d *daoStream) QueryStreams(keyword string, page, size int, streamType string) ([]*StreamModel, int, error) {
var streams []*StreamModel
var total int64
@@ -164,6 +165,9 @@ func (d *daoStream) QueryStreams(keyword string, page, size int) ([]*StreamModel
if keyword != "" {
tx.Where("name like ? or device_id like ? or channel_id like ?", "%"+keyword+"%", "%"+keyword+"%", "%"+keyword+"%")
}
if streamType != "" {
tx.Where("stream_type = ?", streamType)
}
if tx = tx.Find(&streams); tx.Error != nil {
return nil, 0, tx.Error
@@ -174,9 +178,40 @@ func (d *daoStream) QueryStreams(keyword string, page, size int) ([]*StreamModel
tx.Where("name like ? or device_id like ? or channel_id like ?", "%"+keyword+"%", "%"+keyword+"%", "%"+keyword+"%")
}
if streamType != "" {
tx.Where("stream_type = ?", streamType)
}
if tx = tx.Count(&total); tx.Error != nil {
return nil, 0, tx.Error
}
return streams, int(total), nil
}
// QueryStreamsByIds 通过ids查询stream列表
func (d *daoStream) QueryStreamsByIds(ids []string) ([]*StreamModel, error) {
var streams []*StreamModel
tx := db.Where("stream_id in ?", ids).Find(&streams)
if tx.Error != nil {
return nil, tx.Error
}
return streams, nil
}
// QueryStreamCountByType 根据streamType计数stream
func (d *daoStream) QueryStreamCountByType(streamType string) (int, error) {
var count int64
tx := db.Model(&StreamModel{}).Where("stream_type = ?", streamType).Count(&count)
if tx.Error != nil {
return 0, tx.Error
}
return int(count), nil
}
// Count 返回表记录数
func (d *daoStream) Count() (int, error) {
var count int64
db.Model(&StreamModel{}).Count(&count)
return int(count), nil
}

View File

@@ -48,7 +48,7 @@ func (g *JTDevice) OnInvite(request sip.Request, user string) sip.Response {
sink := &dao.SinkModel{
StreamID: streamId,
ServerAddr: g.ServerAddr,
Protocol: "gb_gateway"}
Protocol: TransStreamGBGateway}
response, err := AddForwardSink(TransStreamGBGateway, request, user, &Sink{sink}, streamId, gbsdp, inviteType, "96 PS/90000")
if err != nil {

View File

@@ -23,11 +23,12 @@ func (d *Device) StartStream(inviteType common.InviteType, streamId common.Strea
}
stream := &dao.StreamModel{
DeviceID: streamId.DeviceID(),
ChannelID: streamId.ChannelID(),
StreamID: streamId,
Protocol: SourceType28181,
Name: channel.Name,
DeviceID: streamId.DeviceID(),
ChannelID: streamId.ChannelID(),
StreamID: streamId,
Protocol: SourceType28181,
StreamType: string(inviteType),
Name: channel.Name,
}
// 先添加占位置, 防止重复请求

View File

@@ -88,7 +88,7 @@ func (g *Platform) OnInvite(request sip.Request, user string) sip.Response {
sink := &dao.SinkModel{
StreamID: streamId,
ServerAddr: g.ServerAddr,
Protocol: "gb_cascaded"}
Protocol: TransStreamGBCascaded}
// 添加转发sink到流媒体服务器
response, err := AddForwardSink(TransStreamGBCascaded, request, user, &Sink{sink}, streamId, gbSdp, inviteType, "96 PS/90000")

View File

@@ -121,7 +121,7 @@ func AddForwardSink(forwardType int, request sip.Request, user string, sink *Sin
sink.SetDialog(CreateDialogRequestFromAnswer(response, true, request.Source()))
if err = dao.Sink.SaveForwardSink(streamId, sink.SinkModel); err != nil {
if err = dao.Sink.SaveForwardSink(sink.SinkModel); err != nil {
log.Sugar.Errorf("保存sink到数据库失败, stream: %s sink: %s err: %s", streamId, sink.SinkID, err.Error())
}

View File

@@ -7,7 +7,6 @@ import (
"gb-cms/log"
"github.com/ghettovoice/gosip/sip"
"github.com/ghettovoice/gosip/sip/parser"
"sync/atomic"
)
type Stream struct {
@@ -59,25 +58,6 @@ func (s *Stream) UnmarshalJSON(data []byte) error {
return nil
}
func (s *Stream) GetSinkCount() int32 {
return atomic.LoadInt32(&s.SinkCount)
}
func (s *Stream) IncreaseSinkCount() int32 {
value := atomic.AddInt32(&s.SinkCount, 1)
//Sugar.Infof("拉流计数: %d stream: %s ", value, s.StreamID)
// 启动协程去更新拉流计数, 可能会不一致
//go Stream.SaveStream(s)
return value
}
func (s *Stream) DecreaseSinkCount() int32 {
value := atomic.AddInt32(&s.SinkCount, -1)
//Sugar.Infof("拉流计数: %d stream: %s ", value, s.StreamID)
//go Stream.SaveStream(s)
return value
}
func (s *Stream) Close(bye, ms bool) {
// 断开与推流通道的sip会话
if bye {

View File

@@ -36,12 +36,7 @@ const (
func init() {
topStats = &TopStats{
Load: []struct {
Time string `json:"time"`
Load float64 `json:"load"`
Serial string `json:"serial"`
Name string `json:"name"`
}{
Load: []*StreamStats{
{
Time: time.Now().Format("2006-01-02 15:04:05"),
Load: 0,
@@ -83,18 +78,20 @@ func init() {
}
}
type StreamStats struct {
Time string `json:"time"`
Load float64 `json:"load"`
Serial string `json:"serial"`
Name string `json:"name"`
}
type TopStats struct {
CPU []struct {
Time string `json:"time"`
Use float64 `json:"use"`
} `json:"cpuData"`
Load []struct {
Time string `json:"time"`
Load float64 `json:"load"`
Serial string `json:"serial"`
Name string `json:"name"`
} `json:"loadData"`
Load []*StreamStats `json:"loadData"`
Mem []struct {
Time string `json:"time"`
@@ -373,6 +370,38 @@ func StartStats() {
}
}
// 统计流
var liveStreamCount, playbackStreamCount, playStreamCount, recordStreamCount, h265StreamCount, cascadeStreamCount int
streamCount, err := dao.Stream.Count()
if streamCount > 0 {
liveStreamCount, _ = dao.Stream.QueryStreamCountByType("play")
playbackStreamCount, _ = dao.Stream.QueryStreamCountByType("playback")
if i, _ := dao.Sink.Count(); i > 0 {
// 查询级联
cascadeStreamCount, _ = dao.Sink.QuerySinkCountByProtocol(stack.TransStreamGBCascaded)
playStreamCount = i - cascadeStreamCount
}
}
for _, s := range topStats.Load {
s.Time = now
if "直播" == s.Name {
s.Load = float64(liveStreamCount)
} else if "回放" == s.Name {
s.Load = float64(playbackStreamCount)
} else if "播放" == s.Name {
s.Load = float64(playStreamCount)
} else if "录像" == s.Name {
s.Load = float64(recordStreamCount)
} else if "H265" == s.Name {
s.Load = float64(h265StreamCount)
} else if "级联" == s.Name {
s.Load = float64(cascadeStreamCount)
}
}
// json序列化
marshal, err := json.Marshal(common.MalformedRequest{
Code: http.StatusOK,
Msg: "Success",