增加查询拉流列表接口

This commit is contained in:
yangjiechina
2024-10-29 19:57:52 +08:00
parent ec707c8dc1
commit 6841c4725f
11 changed files with 108 additions and 31 deletions

34
api.go
View File

@@ -376,7 +376,7 @@ func (api *ApiServer) OnSourceList(w http.ResponseWriter, r *http.Request) {
sources := stream.SourceManager.All() sources := stream.SourceManager.All()
type SourceDetails struct { type SourceDetails struct {
ID string `json:"id,omitempty"` ID string `json:"id"`
Protocol string `json:"protocol"` // 推流协议 Protocol string `json:"protocol"` // 推流协议
Time time.Time `json:"time"` // 推流时间 Time time.Time `json:"time"` // 推流时间
SinkCount int `json:"sink_count"` // 播放端计数 SinkCount int `json:"sink_count"` // 播放端计数
@@ -394,7 +394,7 @@ func (api *ApiServer) OnSourceList(w http.ResponseWriter, r *http.Request) {
details = append(details, SourceDetails{ details = append(details, SourceDetails{
ID: source.GetID(), ID: source.GetID(),
Protocol: source.GetType().ToString(), Protocol: source.GetType().String(),
Time: source.CreateTime(), Time: source.CreateTime(),
SinkCount: source.SinkCount(), SinkCount: source.SinkCount(),
Bitrate: "", // 后续开发 Bitrate: "", // 后续开发
@@ -406,7 +406,33 @@ func (api *ApiServer) OnSourceList(w http.ResponseWriter, r *http.Request) {
} }
func (api *ApiServer) OnSinkList(v *IDS, w http.ResponseWriter, r *http.Request) { func (api *ApiServer) OnSinkList(v *IDS, w http.ResponseWriter, r *http.Request) {
source := stream.SourceManager.Find(v.Source)
if source == nil {
httpResponseOK(w, nil)
return
}
type SinkDetails struct {
ID string `json:"id"`
Protocol string `json:"protocol"` // 拉流协议
Time time.Time `json:"time"` // 拉流时间
Bitrate string `json:"bitrate"` // 码率统计
Tracks []string `json:"tracks"` // 每路流编码器ID
}
var details []SinkDetails
sinks := source.Sinks()
for _, sink := range sinks {
details = append(details,
SinkDetails{
ID: stream.SinkId2String(sink.GetID()),
Protocol: sink.GetProtocol().String(),
Time: sink.CreateTime(),
},
)
}
httpResponseOK(w, details)
} }
func (api *ApiServer) OnSourceClose(v *IDS, w http.ResponseWriter, r *http.Request) { func (api *ApiServer) OnSourceClose(v *IDS, w http.ResponseWriter, r *http.Request) {
@@ -433,7 +459,9 @@ func (api *ApiServer) OnSinkClose(v *IDS, w http.ResponseWriter, r *http.Request
} }
if source := stream.SourceManager.Find(v.Source); source != nil { if source := stream.SourceManager.Find(v.Source); source != nil {
source.RemoveSinkWithID(sinkId) if sink := source.FindSink(sinkId); sink != nil {
sink.Close()
}
} else { } else {
log.Sugar.Warnf("Source with ID %s does not exist.", v.Source) log.Sugar.Warnf("Source with ID %s does not exist.", v.Source)
} }

View File

@@ -20,7 +20,7 @@ const (
type MalformedRequest struct { type MalformedRequest struct {
Code int `json:"code"` Code int `json:"code"`
Msg string `json:"msg"` Msg string `json:"msg"`
Data interface{} `json:"data,omitempty"` Data interface{} `json:"data"`
} }
func (mr *MalformedRequest) Error() string { func (mr *MalformedRequest) Error() string {

View File

@@ -199,7 +199,7 @@ func DumpStream2File(sourceType SourceType, conn net.Conn, data []byte) {
return return
} }
path := fmt.Sprintf("dump/%s-%s", sourceType.ToString(), conn.RemoteAddr().String()) path := fmt.Sprintf("dump/%s-%s", sourceType.String(), conn.RemoteAddr().String())
path = strings.ReplaceAll(path, ":", ".") path = strings.ReplaceAll(path, ":", ".")
file, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666) file, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666)

View File

@@ -71,11 +71,11 @@ func Hook(event HookEvent, params string, body interface{}) (*http.Response, err
} }
func NewHookPlayEventInfo(sink Sink) eventInfo { func NewHookPlayEventInfo(sink Sink) eventInfo {
return eventInfo{Stream: sink.GetSourceID(), Protocol: sink.GetProtocol().ToString(), RemoteAddr: sink.String()} return eventInfo{Stream: sink.GetSourceID(), Protocol: sink.GetProtocol().String(), RemoteAddr: sink.String()}
} }
func NewHookPublishEventInfo(source Source) eventInfo { func NewHookPublishEventInfo(source Source) eventInfo {
return eventInfo{Stream: source.GetID(), Protocol: source.GetType().ToString(), RemoteAddr: source.RemoteAddr()} return eventInfo{Stream: source.GetID(), Protocol: source.GetType().String(), RemoteAddr: source.RemoteAddr()}
} }
func NewRecordEventInfo(source Source, path string) interface{} { func NewRecordEventInfo(source Source, path string) interface{} {

View File

@@ -16,7 +16,7 @@ func PreparePlaySinkWithReady(sink Sink, ok bool) (*http.Response, utils.HookSta
if AppConfig.Hooks.IsEnableOnPlay() { if AppConfig.Hooks.IsEnableOnPlay() {
hook, err := Hook(HookEventPlay, sink.UrlValues().Encode(), NewHookPlayEventInfo(sink)) hook, err := Hook(HookEventPlay, sink.UrlValues().Encode(), NewHookPlayEventInfo(sink))
if err != nil { if err != nil {
log.Sugar.Errorf("播放事件-通知失败 err:%s sink:%s-%v source:%s", err.Error(), sink.GetProtocol().ToString(), sink.GetID(), sink.GetSourceID()) log.Sugar.Errorf("播放事件-通知失败 err:%s sink:%s-%v source:%s", err.Error(), sink.GetProtocol().String(), sink.GetID(), sink.GetSourceID())
return hook, utils.HookStateFailure return hook, utils.HookStateFailure
} }
@@ -27,7 +27,7 @@ func PreparePlaySinkWithReady(sink Sink, ok bool) (*http.Response, utils.HookSta
sink.SetReady(ok) sink.SetReady(ok)
source := SourceManager.Find(sink.GetSourceID()) source := SourceManager.Find(sink.GetSourceID())
if source == nil { if source == nil {
log.Sugar.Infof("添加sink到等待队列 sink:%s-%v source:%s", sink.GetProtocol().ToString(), sink.GetID(), sink.GetSourceID()) log.Sugar.Infof("添加sink到等待队列 sink:%s-%v source:%s", sink.GetProtocol().String(), sink.GetID(), sink.GetSourceID())
{ {
sink.Lock() sink.Lock()
@@ -54,7 +54,7 @@ func HookPlayDoneEvent(sink Sink) (*http.Response, bool) {
if AppConfig.Hooks.IsEnableOnPlayDone() { if AppConfig.Hooks.IsEnableOnPlayDone() {
hook, err := Hook(HookEventPlayDone, sink.UrlValues().Encode(), NewHookPlayEventInfo(sink)) hook, err := Hook(HookEventPlayDone, sink.UrlValues().Encode(), NewHookPlayEventInfo(sink))
if err != nil { if err != nil {
log.Sugar.Errorf("播放结束事件-通知失败 err:%s sink:%s-%v source:%s", err.Error(), sink.GetProtocol().ToString(), sink.GetID(), sink.GetSourceID()) log.Sugar.Errorf("播放结束事件-通知失败 err:%s sink:%s-%v source:%s", err.Error(), sink.GetProtocol().String(), sink.GetID(), sink.GetSourceID())
return hook, false return hook, false
} }

View File

@@ -37,7 +37,7 @@ func PreparePublishSource(source Source, hook bool) (*http.Response, utils.HookS
urls := GetStreamPlayUrls(source.GetID()) urls := GetStreamPlayUrls(source.GetID())
indent, _ := json.MarshalIndent(urls, "", "\t") indent, _ := json.MarshalIndent(urls, "", "\t")
log.Sugar.Infof("%s准备推流 source:%s 拉流地址:\r\n%s", source.GetType().ToString(), source.GetID(), indent) log.Sugar.Infof("%s准备推流 source:%s 拉流地址:\r\n%s", source.GetType().String(), source.GetID(), indent)
return response, utils.HookStateOK return response, utils.HookStateOK
} }

View File

@@ -6,6 +6,7 @@ import (
"net" "net"
"net/url" "net/url"
"sync" "sync"
"time"
) )
// Sink 对拉流端的封装 // Sink 对拉流端的封装
@@ -78,6 +79,10 @@ type Sink interface {
IsReady() bool IsReady() bool
SetReady(ok bool) SetReady(ok bool)
CreateTime() time.Time
SetCreateTime(time time.Time)
} }
type BaseSink struct { type BaseSink struct {
@@ -99,6 +104,7 @@ type BaseSink struct {
SentPacketCount int // 发包计数 SentPacketCount int // 发包计数
Ready bool Ready bool
createTime time.Time
} }
func (s *BaseSink) GetID() SinkID { func (s *BaseSink) GetID() SinkID {
@@ -218,7 +224,7 @@ func (s *BaseSink) Close() {
} }
func (s *BaseSink) String() string { func (s *BaseSink) String() string {
return fmt.Sprintf("%s-%v source:%s", s.GetProtocol().ToString(), s.ID, s.SourceID) return fmt.Sprintf("%s-%v source:%s", s.GetProtocol().String(), s.ID, s.SourceID)
} }
func (s *BaseSink) RemoteAddr() string { func (s *BaseSink) RemoteAddr() string {
@@ -271,4 +277,15 @@ func (s *BaseSink) IsReady() bool {
func (s *BaseSink) SetReady(ok bool) { func (s *BaseSink) SetReady(ok bool) {
s.Ready = ok s.Ready = ok
if ok {
s.SetCreateTime(time.Now())
}
}
func (s *BaseSink) CreateTime() time.Time {
return s.createTime
}
func (s *BaseSink) SetCreateTime(time time.Time) {
s.createTime = time
} }

View File

@@ -7,6 +7,7 @@ import (
"github.com/lkmio/lkm/log" "github.com/lkmio/lkm/log"
"net" "net"
"net/url" "net/url"
"sync"
"time" "time"
"github.com/lkmio/avformat/stream" "github.com/lkmio/avformat/stream"
@@ -44,6 +45,8 @@ type Source interface {
RemoveSinkWithID(id SinkID) RemoveSinkWithID(id SinkID)
FindSink(id SinkID) Sink
SetState(state SessionState) SetState(state SessionState)
// Close 关闭Source // Close 关闭Source
@@ -116,7 +119,7 @@ type Source interface {
SetCreateTime(time time.Time) SetCreateTime(time time.Time)
PlaySink(sin Sink) Sinks() []Sink
} }
type PublishSource struct { type PublishSource struct {
@@ -145,7 +148,7 @@ type PublishSource struct {
idleTimer *time.Timer // 拉流空闲计时器 idleTimer *time.Timer // 拉流空闲计时器
TransStreams map[TransStreamID]TransStream // 所有的输出流, 持有Sink TransStreams map[TransStreamID]TransStream // 所有的输出流, 持有Sink
Sinks map[SinkID]Sink // 保存所有Sink sinks map[SinkID]Sink // 保存所有Sink
TransStreamSinks map[TransStreamID]map[SinkID]Sink // 输出流对应的Sink TransStreamSinks map[TransStreamID]map[SinkID]Sink // 输出流对应的Sink
streamPipe chan []byte // 推流数据管道 streamPipe chan []byte // 推流数据管道
@@ -212,7 +215,7 @@ func (s *PublishSource) Init(receiveQueueSize int) {
s.mainContextEvents = make(chan func(), 128) s.mainContextEvents = make(chan func(), 128)
s.TransStreams = make(map[TransStreamID]TransStream, 10) s.TransStreams = make(map[TransStreamID]TransStream, 10)
s.Sinks = make(map[SinkID]Sink, 128) s.sinks = make(map[SinkID]Sink, 128)
s.TransStreamSinks = make(map[TransStreamID]map[SinkID]Sink, len(transStreamFactories)+1) s.TransStreamSinks = make(map[TransStreamID]map[SinkID]Sink, len(transStreamFactories)+1)
} }
@@ -293,11 +296,11 @@ func IsSupportMux(protocol TransStreamProtocol, audioCodecId, videoCodecId utils
} }
func (s *PublishSource) CreateTransStream(id TransStreamID, protocol TransStreamProtocol, streams []utils.AVStream) (TransStream, error) { func (s *PublishSource) CreateTransStream(id TransStreamID, protocol TransStreamProtocol, streams []utils.AVStream) (TransStream, error) {
log.Sugar.Debugf("创建%s-stream source: %s", protocol.ToString(), s.ID) log.Sugar.Debugf("创建%s-stream source: %s", protocol.String(), s.ID)
transStream, err := CreateTransStream(s, protocol, streams) transStream, err := CreateTransStream(s, protocol, streams)
if err != nil { if err != nil {
log.Sugar.Errorf("创建传输流失败 err:%s source:%s", err.Error(), s.ID) log.Sugar.Errorf("创建传输流失败 err: %s source: %s", err.Error(), s.ID)
return nil, err return nil, err
} }
@@ -473,7 +476,7 @@ func (s *PublishSource) doAddSink(sink Sink) bool {
log.Sugar.Infof("sink count: %d source: %s", s.sinkCount, s.ID) log.Sugar.Infof("sink count: %d source: %s", s.sinkCount, s.ID)
} }
s.Sinks[sink.GetID()] = sink s.sinks[sink.GetID()] = sink
s.TransStreamSinks[transStreamId][sink.GetID()] = sink s.TransStreamSinks[transStreamId][sink.GetID()] = sink
// 新建传输流,发送已经缓存的音视频帧 // 新建传输流,发送已经缓存的音视频帧
@@ -504,16 +507,34 @@ func (s *PublishSource) RemoveSink(sink Sink) {
func (s *PublishSource) RemoveSinkWithID(id SinkID) { func (s *PublishSource) RemoveSinkWithID(id SinkID) {
s.PostEvent(func() { s.PostEvent(func() {
sink, ok := s.Sinks[id] sink, ok := s.sinks[id]
if ok { if ok {
s.doRemoveSink(sink) s.doRemoveSink(sink)
} }
}) })
} }
func (s *PublishSource) FindSink(id SinkID) Sink {
var result Sink
group := sync.WaitGroup{}
group.Add(1)
s.PostEvent(func() {
sink, ok := s.sinks[id]
if ok {
result = sink
}
group.Done()
})
group.Wait()
return result
}
func (s *PublishSource) doRemoveSink(sink Sink) bool { func (s *PublishSource) doRemoveSink(sink Sink) bool {
transStreamSinks := s.TransStreamSinks[sink.GetTransStreamID()] transStreamSinks := s.TransStreamSinks[sink.GetTransStreamID()]
delete(s.Sinks, sink.GetID()) delete(s.sinks, sink.GetID())
delete(transStreamSinks, sink.GetID()) delete(transStreamSinks, sink.GetID())
s.sinkCount-- s.sinkCount--
@@ -599,7 +620,7 @@ func (s *PublishSource) DoClose() {
} }
// 将所有sink添加到等待队列 // 将所有sink添加到等待队列
for _, sink := range s.Sinks { for _, sink := range s.sinks {
transStreamID := sink.GetTransStreamID() transStreamID := sink.GetTransStreamID()
sink.SetTransStreamID(0) sink.SetTransStreamID(0)
if s.recordSink == sink { if s.recordSink == sink {
@@ -625,7 +646,7 @@ func (s *PublishSource) DoClose() {
} }
s.TransStreams = nil s.TransStreams = nil
s.Sinks = nil s.sinks = nil
s.TransStreamSinks = nil s.TransStreamSinks = nil
// 异步hook // 异步hook
@@ -786,7 +807,7 @@ func (s *PublishSource) RemoteAddr() string {
} }
func (s *PublishSource) String() string { func (s *PublishSource) String() string {
return fmt.Sprintf("source: %s type: %s conn: %s ", s.ID, s.Type.ToString(), s.RemoteAddr()) return fmt.Sprintf("source: %s type: %s conn: %s ", s.ID, s.Type.String(), s.RemoteAddr())
} }
func (s *PublishSource) State() SessionState { func (s *PublishSource) State() SessionState {
@@ -813,8 +834,19 @@ func (s *PublishSource) SetCreateTime(time time.Time) {
s.createTime = time s.createTime = time
} }
func (s *PublishSource) PlaySink(sink Sink) { func (s *PublishSource) Sinks() []Sink {
s.PostEvent(func() { var sinks []Sink
group := sync.WaitGroup{}
group.Add(1)
s.PostEvent(func() {
for _, sink := range s.sinks {
sinks = append(sinks, sink)
}
group.Done()
}) })
group.Wait()
return sinks
} }

View File

@@ -45,7 +45,7 @@ const (
SessionStateClosed = SessionState(7) // 关闭状态 SessionStateClosed = SessionState(7) // 关闭状态
) )
func (s SourceType) ToString() string { func (s SourceType) String() string {
if SourceTypeRtmp == s { if SourceTypeRtmp == s {
return "rtmp" return "rtmp"
} else if SourceType28181 == s { } else if SourceType28181 == s {
@@ -57,7 +57,7 @@ func (s SourceType) ToString() string {
panic(fmt.Sprintf("unknown source type %d", s)) panic(fmt.Sprintf("unknown source type %d", s))
} }
func (p TransStreamProtocol) ToString() string { func (p TransStreamProtocol) String() string {
if TransStreamRtmp == p { if TransStreamRtmp == p {
return "rtmp" return "rtmp"
} else if TransStreamFlv == p { } else if TransStreamFlv == p {

View File

@@ -21,7 +21,7 @@ func init() {
func RegisterTransStreamFactory(protocol TransStreamProtocol, streamFunc TransStreamFactory) { func RegisterTransStreamFactory(protocol TransStreamProtocol, streamFunc TransStreamFactory) {
_, ok := transStreamFactories[protocol] _, ok := transStreamFactories[protocol]
if ok { if ok {
panic(fmt.Sprintf("%s has been registered", protocol.ToString())) panic(fmt.Sprintf("%s has been registered", protocol.String()))
} }
transStreamFactories[protocol] = streamFunc transStreamFactories[protocol] = streamFunc
@@ -30,7 +30,7 @@ func RegisterTransStreamFactory(protocol TransStreamProtocol, streamFunc TransSt
func FindTransStreamFactory(protocol TransStreamProtocol) (TransStreamFactory, error) { func FindTransStreamFactory(protocol TransStreamProtocol) (TransStreamFactory, error) {
f, ok := transStreamFactories[protocol] f, ok := transStreamFactories[protocol]
if !ok { if !ok {
return nil, fmt.Errorf("unknown protocol %s", protocol.ToString()) return nil, fmt.Errorf("unknown protocol %s", protocol.String())
} }
return f, nil return f, nil

View File

@@ -18,7 +18,7 @@ type StreamServer[T any] struct {
} }
func (s *StreamServer[T]) OnConnected(conn net.Conn) []byte { func (s *StreamServer[T]) OnConnected(conn net.Conn) []byte {
log.Sugar.Debugf("%s连接 conn:%s", s.SourceType.ToString(), conn.RemoteAddr().String()) log.Sugar.Debugf("%s连接 conn:%s", s.SourceType.String(), conn.RemoteAddr().String())
conn.(*transport.Conn).Data = s.Handler.OnNewSession(conn) conn.(*transport.Conn).Data = s.Handler.OnNewSession(conn)
return nil return nil
} }
@@ -32,7 +32,7 @@ func (s *StreamServer[T]) OnPacket(conn net.Conn, data []byte) []byte {
} }
func (s *StreamServer[T]) OnDisConnected(conn net.Conn, err error) { func (s *StreamServer[T]) OnDisConnected(conn net.Conn, err error) {
log.Sugar.Debugf("%s断开连接 conn:%s", s.SourceType.ToString(), conn.RemoteAddr().String()) log.Sugar.Debugf("%s断开连接 conn:%s", s.SourceType.String(), conn.RemoteAddr().String())
t := conn.(*transport.Conn) t := conn.(*transport.Conn)
if t.Data != nil { if t.Data != nil {