From f127efb343cbf7e197cf210fd33f24bc77375f8b Mon Sep 17 00:00:00 2001 From: ydajiang Date: Fri, 5 Sep 2025 20:18:14 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=AE=9E=E7=8E=B0=E6=8E=A8=E6=8B=89?= =?UTF-8?q?=E6=B5=81=E7=BB=9F=E8=AE=A1=E5=92=8C=E6=9F=A5=E8=AF=A2=E6=8E=A5?= =?UTF-8?q?=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- api.go | 59 +++++++++++++++++++++++++++++++++++++--------- dao/sink.go | 55 ++++++++++++++++++++++++++++++++---------- dao/stream.go | 57 +++++++++++++++++++++++++++++++++++--------- stack/jt_device.go | 2 +- stack/live.go | 11 +++++---- stack/platform.go | 2 +- stack/sink.go | 2 +- stack/stream.go | 20 ---------------- stats.go | 53 +++++++++++++++++++++++++++++++---------- 9 files changed, 187 insertions(+), 74 deletions(-) diff --git a/api.go b/api.go index 73ce4bb..9c510b0 100644 --- a/api.go +++ b/api.go @@ -127,6 +127,7 @@ type QueryDeviceChannel struct { Order string `json:"order"` // asc/desc Sort string `json:"sort"` // Channel-根据数据库ID排序/iD-根据通道ID排序 SMS string `json:"sms"` + Filter string `json:"filter"` } type DeleteDevice struct { @@ -213,7 +214,7 @@ func withVerify2(onSuccess func(w http.ResponseWriter, req *http.Request), onFai } func startApiServer(addr string) { - apiServer.router.HandleFunc("/api/v1/hook/on_play", common.WithJsonParams(apiServer.OnPlay, &StreamParams{})) + apiServer.router.HandleFunc("/api/v1/hook/on_play", common.WithJsonParams(apiServer.OnPlay, &PlayDoneParams{})) apiServer.router.HandleFunc("/api/v1/hook/on_play_done", common.WithJsonParams(apiServer.OnPlayDone, &PlayDoneParams{})) apiServer.router.HandleFunc("/api/v1/hook/on_publish", common.WithJsonParams(apiServer.OnPublish, &StreamParams{})) apiServer.router.HandleFunc("/api/v1/hook/on_publish_done", common.WithJsonParams(apiServer.OnPublishDone, &StreamParams{})) @@ -321,7 +322,7 @@ func startApiServer(addr string) { } } -func (api *ApiServer) OnPlay(params *StreamParams, w http.ResponseWriter, r *http.Request) { +func (api *ApiServer) OnPlay(params *PlayDoneParams, w http.ResponseWriter, r *http.Request) { log.Sugar.Infof("播放事件. protocol: %s stream: %s", params.Protocol, params.Stream) // [注意]: windows上使用cmd/power shell推拉流如果要携带多个参数, 请用双引号将与号引起来("&") @@ -379,15 +380,20 @@ func (api *ApiServer) OnPlay(params *StreamParams, w http.ResponseWriter, r *htt if TokenManager.Find(streamToken) == nil { w.WriteHeader(http.StatusUnauthorized) log.Sugar.Errorf("播放鉴权失败, token不存在 token: %s", streamToken) - return - } - - if stream, _ := dao.Stream.QueryStream(params.Stream); stream == nil { + } else if stream, _ := dao.Stream.QueryStream(params.Stream); stream == nil { w.WriteHeader(http.StatusNotFound) - return + } else { + _ = dao.Sink.SaveForwardSink(&dao.SinkModel{ + SinkID: params.Sink, + StreamID: params.Stream, + Protocol: params.Protocol, + RemoteAddr: params.RemoteAddr, + }) } + return } + // 对讲/级联, 在此处请求流 inviteParams := &InviteParams{ DeviceID: deviceId, ChannelID: channelId, @@ -414,6 +420,13 @@ func (api *ApiServer) OnPlay(params *StreamParams, w http.ResponseWriter, r *htt utils.Assert(http.StatusOK != code) } else if http.StatusOK == code { _ = stream.ID + + _ = dao.Sink.SaveForwardSink(&dao.SinkModel{ + SinkID: params.Sink, + StreamID: params.Stream, + Protocol: params.Protocol, + RemoteAddr: params.RemoteAddr, + }) } } @@ -423,7 +436,7 @@ func (api *ApiServer) OnPlay(params *StreamParams, w http.ResponseWriter, r *htt func (api *ApiServer) OnPlayDone(params *PlayDoneParams, _ http.ResponseWriter, _ *http.Request) { log.Sugar.Debugf("播放结束事件. protocol: %s stream: %s", params.Protocol, params.Stream) - sink, _ := dao.Sink.DeleteForwardSink(params.Stream, params.Sink) + sink, _ := dao.Sink.DeleteForwardSink(params.Sink) if sink == nil { return } @@ -994,13 +1007,13 @@ func (api *ApiServer) OnBroadcast(v *BroadcastParams, _ http.ResponseWriter, r * sink := &dao.SinkModel{ StreamID: v.StreamId, SinkStreamID: sinkStreamId, - Protocol: "gb_talk", + Protocol: stack.SourceTypeGBTalk, CreateTime: time.Now().Unix(), SetupType: setupType, } streamWaiting := &stack.StreamWaiting{Data: sink} - if err := dao.Sink.SaveForwardSink(v.StreamId, sink); err != nil { + if err := dao.Sink.SaveForwardSink(sink); err != nil { log.Sugar.Errorf("广播失败, 设备正在广播中. stream: %s", sinkStreamId) return nil, fmt.Errorf("设备正在广播中") } else if _, ok = stack.EarlyDialogs.Add(InviteSourceId, streamWaiting); !ok { @@ -1312,7 +1325,31 @@ func (api *ApiServer) OnStreamInfo(w http.ResponseWriter, r *http.Request) { } func (api *ApiServer) OnSessionList(q *QueryDeviceChannel, _ http.ResponseWriter, r *http.Request) (interface{}, error) { - streams, _, err := dao.Stream.QueryStreams(q.Keyword, (q.Start/q.Limit)+1, q.Limit) + //filter := q.Filter // playing-正在播放/stream-不包含回放和下载/record-正在回放的流/hevc-h265流/cascade-级联 + var streams []*dao.StreamModel + var err error + if "cascade" == q.Filter { + protocols := []int{stack.TransStreamGBCascaded} + var ids []string + ids, _, err = dao.Sink.QueryStreamIds(protocols, (q.Start/q.Limit)+1, q.Limit) + if len(ids) > 0 { + streams, err = dao.Stream.QueryStreamsByIds(ids) + } + } else if "stream" == q.Filter { + streams, _, err = dao.Stream.QueryStreams(q.Keyword, (q.Start/q.Limit)+1, q.Limit, "play") + } else if "record" == q.Filter { + streams, _, err = dao.Stream.QueryStreams(q.Keyword, (q.Start/q.Limit)+1, q.Limit, "playback") + } else if "playing" == q.Filter { + protocols := []int{stack.TransStreamRtmp, stack.TransStreamFlv, stack.TransStreamRtsp, stack.TransStreamHls, stack.TransStreamRtc} + var ids []string + ids, _, err = dao.Sink.QueryStreamIds(protocols, (q.Start/q.Limit)+1, q.Limit) + if len(ids) > 0 { + streams, err = dao.Stream.QueryStreamsByIds(ids) + } + } else { + streams, _, err = dao.Stream.QueryStreams(q.Keyword, (q.Start/q.Limit)+1, q.Limit, "") + } + if err != nil { return nil, err } diff --git a/dao/sink.go b/dao/sink.go index d81fd65..27bd1c0 100644 --- a/dao/sink.go +++ b/dao/sink.go @@ -1,7 +1,6 @@ package dao import ( - "fmt" "gb-cms/common" "gorm.io/gorm" ) @@ -10,14 +9,15 @@ import ( type SinkModel struct { GBModel SinkID string `json:"sink_id"` // 流媒体服务器中的sink id - StreamID common.StreamID `json:"stream_id"` // 推流ID + StreamID common.StreamID `json:"stream_id"` // 所属的推流ID SinkStreamID common.StreamID `json:"sink_stream_id"` // 广播使用, 每个广播设备的唯一ID - Protocol string `json:"protocol,omitempty"` // 转发流协议, gb_cascaded/gb_talk/gb_gateway + Protocol int `json:"protocol,omitempty"` // 拉流协议, @See stack.TransStreamRtmp Dialog *common.RequestWrapper `json:"dialog,omitempty"` CallID string `json:"call_id,omitempty"` ServerAddr string `json:"server_addr,omitempty"` // 级联上级地址 CreateTime int64 `json:"create_time"` SetupType common.SetupType // 流转发类型 + RemoteAddr string } func (d *SinkModel) TableName() string { @@ -61,19 +61,13 @@ func (d *daoSink) QueryForwardSinks(stream common.StreamID) (map[string]*SinkMod return sinkMap, nil } -func (d *daoSink) SaveForwardSink(stream common.StreamID, sink *SinkModel) error { - var old SinkModel - tx := db.Select("id").Where("sink_id =?", sink.SinkID).Take(&old) - if tx.Error == nil { - return fmt.Errorf("sink already exists") - } - +func (d *daoSink) SaveForwardSink(sink *SinkModel) error { return DBTransaction(func(tx *gorm.DB) error { - return tx.Save(sink).Error + return tx.Create(sink).Error }) } -func (d *daoSink) DeleteForwardSink(stream common.StreamID, sinkId string) (*SinkModel, error) { +func (d *daoSink) DeleteForwardSink(sinkId string) (*SinkModel, error) { var sink SinkModel tx := db.Where("sink_id =?", sinkId).Take(&sink) if tx.Error != nil { @@ -159,3 +153,40 @@ func (d *daoSink) DeleteForwardSinksByServerAddr(addr string) ([]*SinkModel, err return tx.Where("server_addr =?", addr).Unscoped().Delete(&SinkModel{}).Error }) } + +func (d *daoSink) QuerySinkCountByProtocol(protocol int) (int, error) { + var count int64 + tx := db.Model(&SinkModel{}).Where("protocol = ?", protocol).Count(&count) + if tx.Error != nil { + return 0, tx.Error + } + return int(count), nil +} + +func (d *daoSink) Count() (int, error) { + var count int64 + tx := db.Model(&SinkModel{}).Count(&count) + if tx.Error != nil { + return 0, tx.Error + } + return int(count), nil +} + +// QueryStreamIds 指定多个protocol查询streamIds +func (d *daoSink) QueryStreamIds(protocols []int, page, size int) ([]string, int, error) { + // 查询总数 + var total int64 + tx := db.Model(&SinkModel{}).Where("protocol in ?", protocols).Group("stream_id").Count(&total) + if tx.Error != nil { + return nil, 0, tx.Error + } + + var streamIds []string + // 分页查询 + tx = db.Model(&SinkModel{}).Select("stream_id").Where("protocol in ?", protocols).Group("stream_id").Offset((page - 1) * size).Limit(size).Find(&streamIds) + if tx.Error != nil { + return nil, 0, tx.Error + } + + return streamIds, int(total), nil +} diff --git a/dao/stream.go b/dao/stream.go index 7710867..85b0ed6 100644 --- a/dao/stream.go +++ b/dao/stream.go @@ -9,16 +9,17 @@ import ( type StreamModel struct { GBModel - DeviceID string `gorm:"index"` // 下级设备ID, 统计某个设备的所有流/1078设备为sim number - ChannelID string `gorm:"index"` // 下级通道ID, 统计某个设备下的某个通道的所有流/1078设备为 channel number - StreamID common.StreamID `json:"stream_id" gorm:"index,unique"` // 流ID - Protocol int `json:"protocol,omitempty"` // 推流协议, rtmp/28181/1078/gb_talk - Dialog *common.RequestWrapper `json:"dialog,omitempty"` // 国标流的SipCall会话 - SinkCount int32 `json:"sink_count"` // 拉流端计数(包含级联转发) - SetupType common.SetupType - CallID string `json:"call_id" gorm:"index"` - Urls []string `gorm:"serializer:json"` // 从流媒体服务器返回的拉流地址 - Name string `gorm:"index"` // 视频通道名 + DeviceID string `gorm:"index"` // 下级设备ID, 统计某个设备的所有流/1078设备为sim number + ChannelID string `gorm:"index"` // 下级通道ID, 统计某个设备下的某个通道的所有流/1078设备为 channel number + StreamID common.StreamID `json:"stream_id" gorm:"index,unique"` // 流ID + Protocol int `json:"protocol,omitempty"` // 推流协议, @See stack.SourceTypeRtmp + StreamType string // play/playback/download + Dialog *common.RequestWrapper `json:"dialog,omitempty"` // 国标流的SipCall会话 + SetupType common.SetupType // 取流方式 + CallID string `json:"call_id" gorm:"index"` + Urls []string `gorm:"serializer:json"` // 从流媒体服务器返回的拉流地址 + Name string `gorm:"index"` // 视频通道名 + RemoteAddr string } func (s *StreamModel) TableName() string { @@ -156,7 +157,7 @@ func (d *daoStream) DeleteStreamByDeviceID(deviceID string) ([]*StreamModel, err return streams, nil } -func (d *daoStream) QueryStreams(keyword string, page, size int) ([]*StreamModel, int, error) { +func (d *daoStream) QueryStreams(keyword string, page, size int, streamType string) ([]*StreamModel, int, error) { var streams []*StreamModel var total int64 @@ -164,6 +165,9 @@ func (d *daoStream) QueryStreams(keyword string, page, size int) ([]*StreamModel if keyword != "" { tx.Where("name like ? or device_id like ? or channel_id like ?", "%"+keyword+"%", "%"+keyword+"%", "%"+keyword+"%") } + if streamType != "" { + tx.Where("stream_type = ?", streamType) + } if tx = tx.Find(&streams); tx.Error != nil { return nil, 0, tx.Error @@ -174,9 +178,40 @@ func (d *daoStream) QueryStreams(keyword string, page, size int) ([]*StreamModel tx.Where("name like ? or device_id like ? or channel_id like ?", "%"+keyword+"%", "%"+keyword+"%", "%"+keyword+"%") } + if streamType != "" { + tx.Where("stream_type = ?", streamType) + } + if tx = tx.Count(&total); tx.Error != nil { return nil, 0, tx.Error } return streams, int(total), nil } + +// QueryStreamsByIds 通过ids查询stream列表 +func (d *daoStream) QueryStreamsByIds(ids []string) ([]*StreamModel, error) { + var streams []*StreamModel + tx := db.Where("stream_id in ?", ids).Find(&streams) + if tx.Error != nil { + return nil, tx.Error + } + return streams, nil +} + +// QueryStreamCountByType 根据streamType计数stream +func (d *daoStream) QueryStreamCountByType(streamType string) (int, error) { + var count int64 + tx := db.Model(&StreamModel{}).Where("stream_type = ?", streamType).Count(&count) + if tx.Error != nil { + return 0, tx.Error + } + return int(count), nil +} + +// Count 返回表记录数 +func (d *daoStream) Count() (int, error) { + var count int64 + db.Model(&StreamModel{}).Count(&count) + return int(count), nil +} diff --git a/stack/jt_device.go b/stack/jt_device.go index dc5566c..b3a1604 100644 --- a/stack/jt_device.go +++ b/stack/jt_device.go @@ -48,7 +48,7 @@ func (g *JTDevice) OnInvite(request sip.Request, user string) sip.Response { sink := &dao.SinkModel{ StreamID: streamId, ServerAddr: g.ServerAddr, - Protocol: "gb_gateway"} + Protocol: TransStreamGBGateway} response, err := AddForwardSink(TransStreamGBGateway, request, user, &Sink{sink}, streamId, gbsdp, inviteType, "96 PS/90000") if err != nil { diff --git a/stack/live.go b/stack/live.go index 945e382..7be7d81 100644 --- a/stack/live.go +++ b/stack/live.go @@ -23,11 +23,12 @@ func (d *Device) StartStream(inviteType common.InviteType, streamId common.Strea } stream := &dao.StreamModel{ - DeviceID: streamId.DeviceID(), - ChannelID: streamId.ChannelID(), - StreamID: streamId, - Protocol: SourceType28181, - Name: channel.Name, + DeviceID: streamId.DeviceID(), + ChannelID: streamId.ChannelID(), + StreamID: streamId, + Protocol: SourceType28181, + StreamType: string(inviteType), + Name: channel.Name, } // 先添加占位置, 防止重复请求 diff --git a/stack/platform.go b/stack/platform.go index 89b57f8..58bf087 100644 --- a/stack/platform.go +++ b/stack/platform.go @@ -88,7 +88,7 @@ func (g *Platform) OnInvite(request sip.Request, user string) sip.Response { sink := &dao.SinkModel{ StreamID: streamId, ServerAddr: g.ServerAddr, - Protocol: "gb_cascaded"} + Protocol: TransStreamGBCascaded} // 添加转发sink到流媒体服务器 response, err := AddForwardSink(TransStreamGBCascaded, request, user, &Sink{sink}, streamId, gbSdp, inviteType, "96 PS/90000") diff --git a/stack/sink.go b/stack/sink.go index 4d9a913..4dce8f3 100644 --- a/stack/sink.go +++ b/stack/sink.go @@ -121,7 +121,7 @@ func AddForwardSink(forwardType int, request sip.Request, user string, sink *Sin sink.SetDialog(CreateDialogRequestFromAnswer(response, true, request.Source())) - if err = dao.Sink.SaveForwardSink(streamId, sink.SinkModel); err != nil { + if err = dao.Sink.SaveForwardSink(sink.SinkModel); err != nil { log.Sugar.Errorf("保存sink到数据库失败, stream: %s sink: %s err: %s", streamId, sink.SinkID, err.Error()) } diff --git a/stack/stream.go b/stack/stream.go index cf76856..0da3a98 100644 --- a/stack/stream.go +++ b/stack/stream.go @@ -7,7 +7,6 @@ import ( "gb-cms/log" "github.com/ghettovoice/gosip/sip" "github.com/ghettovoice/gosip/sip/parser" - "sync/atomic" ) type Stream struct { @@ -59,25 +58,6 @@ func (s *Stream) UnmarshalJSON(data []byte) error { return nil } -func (s *Stream) GetSinkCount() int32 { - return atomic.LoadInt32(&s.SinkCount) -} - -func (s *Stream) IncreaseSinkCount() int32 { - value := atomic.AddInt32(&s.SinkCount, 1) - //Sugar.Infof("拉流计数: %d stream: %s ", value, s.StreamID) - // 启动协程去更新拉流计数, 可能会不一致 - //go Stream.SaveStream(s) - return value -} - -func (s *Stream) DecreaseSinkCount() int32 { - value := atomic.AddInt32(&s.SinkCount, -1) - //Sugar.Infof("拉流计数: %d stream: %s ", value, s.StreamID) - //go Stream.SaveStream(s) - return value -} - func (s *Stream) Close(bye, ms bool) { // 断开与推流通道的sip会话 if bye { diff --git a/stats.go b/stats.go index dda5bdf..9763448 100644 --- a/stats.go +++ b/stats.go @@ -36,12 +36,7 @@ const ( func init() { topStats = &TopStats{ - Load: []struct { - Time string `json:"time"` - Load float64 `json:"load"` - Serial string `json:"serial"` - Name string `json:"name"` - }{ + Load: []*StreamStats{ { Time: time.Now().Format("2006-01-02 15:04:05"), Load: 0, @@ -83,18 +78,20 @@ func init() { } } +type StreamStats struct { + Time string `json:"time"` + Load float64 `json:"load"` + Serial string `json:"serial"` + Name string `json:"name"` +} + type TopStats struct { CPU []struct { Time string `json:"time"` Use float64 `json:"use"` } `json:"cpuData"` - Load []struct { - Time string `json:"time"` - Load float64 `json:"load"` - Serial string `json:"serial"` - Name string `json:"name"` - } `json:"loadData"` + Load []*StreamStats `json:"loadData"` Mem []struct { Time string `json:"time"` @@ -373,6 +370,38 @@ func StartStats() { } } + // 统计流 + var liveStreamCount, playbackStreamCount, playStreamCount, recordStreamCount, h265StreamCount, cascadeStreamCount int + streamCount, err := dao.Stream.Count() + if streamCount > 0 { + liveStreamCount, _ = dao.Stream.QueryStreamCountByType("play") + playbackStreamCount, _ = dao.Stream.QueryStreamCountByType("playback") + + if i, _ := dao.Sink.Count(); i > 0 { + // 查询级联 + cascadeStreamCount, _ = dao.Sink.QuerySinkCountByProtocol(stack.TransStreamGBCascaded) + playStreamCount = i - cascadeStreamCount + } + } + + for _, s := range topStats.Load { + s.Time = now + if "直播" == s.Name { + s.Load = float64(liveStreamCount) + } else if "回放" == s.Name { + s.Load = float64(playbackStreamCount) + } else if "播放" == s.Name { + s.Load = float64(playStreamCount) + } else if "录像" == s.Name { + s.Load = float64(recordStreamCount) + } else if "H265" == s.Name { + s.Load = float64(h265StreamCount) + } else if "级联" == s.Name { + s.Load = float64(cascadeStreamCount) + } + } + + // json序列化 marshal, err := json.Marshal(common.MalformedRequest{ Code: http.StatusOK, Msg: "Success",