diff --git a/api.go b/api.go index 50df79e..d8d37d9 100644 --- a/api.go +++ b/api.go @@ -376,7 +376,7 @@ func (api *ApiServer) OnSourceList(w http.ResponseWriter, r *http.Request) { sources := stream.SourceManager.All() type SourceDetails struct { - ID string `json:"id,omitempty"` + ID string `json:"id"` Protocol string `json:"protocol"` // 推流协议 Time time.Time `json:"time"` // 推流时间 SinkCount int `json:"sink_count"` // 播放端计数 @@ -394,7 +394,7 @@ func (api *ApiServer) OnSourceList(w http.ResponseWriter, r *http.Request) { details = append(details, SourceDetails{ ID: source.GetID(), - Protocol: source.GetType().ToString(), + Protocol: source.GetType().String(), Time: source.CreateTime(), SinkCount: source.SinkCount(), Bitrate: "", // 后续开发 @@ -406,7 +406,33 @@ func (api *ApiServer) OnSourceList(w http.ResponseWriter, r *http.Request) { } func (api *ApiServer) OnSinkList(v *IDS, w http.ResponseWriter, r *http.Request) { + source := stream.SourceManager.Find(v.Source) + if source == nil { + httpResponseOK(w, nil) + return + } + type SinkDetails struct { + ID string `json:"id"` + Protocol string `json:"protocol"` // 拉流协议 + Time time.Time `json:"time"` // 拉流时间 + Bitrate string `json:"bitrate"` // 码率统计 + Tracks []string `json:"tracks"` // 每路流编码器ID + } + + var details []SinkDetails + sinks := source.Sinks() + for _, sink := range sinks { + details = append(details, + SinkDetails{ + ID: stream.SinkId2String(sink.GetID()), + Protocol: sink.GetProtocol().String(), + Time: sink.CreateTime(), + }, + ) + } + + httpResponseOK(w, details) } func (api *ApiServer) OnSourceClose(v *IDS, w http.ResponseWriter, r *http.Request) { @@ -433,7 +459,9 @@ func (api *ApiServer) OnSinkClose(v *IDS, w http.ResponseWriter, r *http.Request } if source := stream.SourceManager.Find(v.Source); source != nil { - source.RemoveSinkWithID(sinkId) + if sink := source.FindSink(sinkId); sink != nil { + sink.Close() + } } else { log.Sugar.Warnf("Source with ID %s does not exist.", v.Source) } diff --git a/http_json_body_decode.go b/http_json_body_decode.go index eb7a818..b4849df 100644 --- a/http_json_body_decode.go +++ b/http_json_body_decode.go @@ -20,7 +20,7 @@ const ( type MalformedRequest struct { Code int `json:"code"` Msg string `json:"msg"` - Data interface{} `json:"data,omitempty"` + Data interface{} `json:"data"` } func (mr *MalformedRequest) Error() string { diff --git a/stream/config.go b/stream/config.go index 4786770..b901f68 100644 --- a/stream/config.go +++ b/stream/config.go @@ -199,7 +199,7 @@ func DumpStream2File(sourceType SourceType, conn net.Conn, data []byte) { return } - path := fmt.Sprintf("dump/%s-%s", sourceType.ToString(), conn.RemoteAddr().String()) + path := fmt.Sprintf("dump/%s-%s", sourceType.String(), conn.RemoteAddr().String()) path = strings.ReplaceAll(path, ":", ".") file, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666) diff --git a/stream/hook.go b/stream/hook.go index 11d0d74..58a5adf 100644 --- a/stream/hook.go +++ b/stream/hook.go @@ -71,11 +71,11 @@ func Hook(event HookEvent, params string, body interface{}) (*http.Response, err } func NewHookPlayEventInfo(sink Sink) eventInfo { - return eventInfo{Stream: sink.GetSourceID(), Protocol: sink.GetProtocol().ToString(), RemoteAddr: sink.String()} + return eventInfo{Stream: sink.GetSourceID(), Protocol: sink.GetProtocol().String(), RemoteAddr: sink.String()} } func NewHookPublishEventInfo(source Source) eventInfo { - return eventInfo{Stream: source.GetID(), Protocol: source.GetType().ToString(), RemoteAddr: source.RemoteAddr()} + return eventInfo{Stream: source.GetID(), Protocol: source.GetType().String(), RemoteAddr: source.RemoteAddr()} } func NewRecordEventInfo(source Source, path string) interface{} { diff --git a/stream/hook_sink.go b/stream/hook_sink.go index 9e7a698..618451c 100644 --- a/stream/hook_sink.go +++ b/stream/hook_sink.go @@ -16,7 +16,7 @@ func PreparePlaySinkWithReady(sink Sink, ok bool) (*http.Response, utils.HookSta if AppConfig.Hooks.IsEnableOnPlay() { hook, err := Hook(HookEventPlay, sink.UrlValues().Encode(), NewHookPlayEventInfo(sink)) if err != nil { - log.Sugar.Errorf("播放事件-通知失败 err:%s sink:%s-%v source:%s", err.Error(), sink.GetProtocol().ToString(), sink.GetID(), sink.GetSourceID()) + log.Sugar.Errorf("播放事件-通知失败 err:%s sink:%s-%v source:%s", err.Error(), sink.GetProtocol().String(), sink.GetID(), sink.GetSourceID()) return hook, utils.HookStateFailure } @@ -27,7 +27,7 @@ func PreparePlaySinkWithReady(sink Sink, ok bool) (*http.Response, utils.HookSta sink.SetReady(ok) source := SourceManager.Find(sink.GetSourceID()) if source == nil { - log.Sugar.Infof("添加sink到等待队列 sink:%s-%v source:%s", sink.GetProtocol().ToString(), sink.GetID(), sink.GetSourceID()) + log.Sugar.Infof("添加sink到等待队列 sink:%s-%v source:%s", sink.GetProtocol().String(), sink.GetID(), sink.GetSourceID()) { sink.Lock() @@ -54,7 +54,7 @@ func HookPlayDoneEvent(sink Sink) (*http.Response, bool) { if AppConfig.Hooks.IsEnableOnPlayDone() { hook, err := Hook(HookEventPlayDone, sink.UrlValues().Encode(), NewHookPlayEventInfo(sink)) if err != nil { - log.Sugar.Errorf("播放结束事件-通知失败 err:%s sink:%s-%v source:%s", err.Error(), sink.GetProtocol().ToString(), sink.GetID(), sink.GetSourceID()) + log.Sugar.Errorf("播放结束事件-通知失败 err:%s sink:%s-%v source:%s", err.Error(), sink.GetProtocol().String(), sink.GetID(), sink.GetSourceID()) return hook, false } diff --git a/stream/hook_source.go b/stream/hook_source.go index 81a00ee..1bce324 100644 --- a/stream/hook_source.go +++ b/stream/hook_source.go @@ -37,7 +37,7 @@ func PreparePublishSource(source Source, hook bool) (*http.Response, utils.HookS urls := GetStreamPlayUrls(source.GetID()) indent, _ := json.MarshalIndent(urls, "", "\t") - log.Sugar.Infof("%s准备推流 source:%s 拉流地址:\r\n%s", source.GetType().ToString(), source.GetID(), indent) + log.Sugar.Infof("%s准备推流 source:%s 拉流地址:\r\n%s", source.GetType().String(), source.GetID(), indent) return response, utils.HookStateOK } diff --git a/stream/sink.go b/stream/sink.go index 5bc4074..b82fe7c 100644 --- a/stream/sink.go +++ b/stream/sink.go @@ -6,6 +6,7 @@ import ( "net" "net/url" "sync" + "time" ) // Sink 对拉流端的封装 @@ -78,6 +79,10 @@ type Sink interface { IsReady() bool SetReady(ok bool) + + CreateTime() time.Time + + SetCreateTime(time time.Time) } type BaseSink struct { @@ -99,6 +104,7 @@ type BaseSink struct { SentPacketCount int // 发包计数 Ready bool + createTime time.Time } func (s *BaseSink) GetID() SinkID { @@ -218,7 +224,7 @@ func (s *BaseSink) Close() { } func (s *BaseSink) String() string { - return fmt.Sprintf("%s-%v source:%s", s.GetProtocol().ToString(), s.ID, s.SourceID) + return fmt.Sprintf("%s-%v source:%s", s.GetProtocol().String(), s.ID, s.SourceID) } func (s *BaseSink) RemoteAddr() string { @@ -271,4 +277,15 @@ func (s *BaseSink) IsReady() bool { func (s *BaseSink) SetReady(ok bool) { s.Ready = ok + if ok { + s.SetCreateTime(time.Now()) + } +} + +func (s *BaseSink) CreateTime() time.Time { + return s.createTime +} + +func (s *BaseSink) SetCreateTime(time time.Time) { + s.createTime = time } diff --git a/stream/source.go b/stream/source.go index 5ae1335..4e08155 100644 --- a/stream/source.go +++ b/stream/source.go @@ -7,6 +7,7 @@ import ( "github.com/lkmio/lkm/log" "net" "net/url" + "sync" "time" "github.com/lkmio/avformat/stream" @@ -44,6 +45,8 @@ type Source interface { RemoveSinkWithID(id SinkID) + FindSink(id SinkID) Sink + SetState(state SessionState) // Close 关闭Source @@ -116,7 +119,7 @@ type Source interface { SetCreateTime(time time.Time) - PlaySink(sin Sink) + Sinks() []Sink } type PublishSource struct { @@ -145,7 +148,7 @@ type PublishSource struct { idleTimer *time.Timer // 拉流空闲计时器 TransStreams map[TransStreamID]TransStream // 所有的输出流, 持有Sink - Sinks map[SinkID]Sink // 保存所有Sink + sinks map[SinkID]Sink // 保存所有Sink TransStreamSinks map[TransStreamID]map[SinkID]Sink // 输出流对应的Sink streamPipe chan []byte // 推流数据管道 @@ -212,7 +215,7 @@ func (s *PublishSource) Init(receiveQueueSize int) { s.mainContextEvents = make(chan func(), 128) s.TransStreams = make(map[TransStreamID]TransStream, 10) - s.Sinks = make(map[SinkID]Sink, 128) + s.sinks = make(map[SinkID]Sink, 128) s.TransStreamSinks = make(map[TransStreamID]map[SinkID]Sink, len(transStreamFactories)+1) } @@ -293,11 +296,11 @@ func IsSupportMux(protocol TransStreamProtocol, audioCodecId, videoCodecId utils } func (s *PublishSource) CreateTransStream(id TransStreamID, protocol TransStreamProtocol, streams []utils.AVStream) (TransStream, error) { - log.Sugar.Debugf("创建%s-stream source: %s", protocol.ToString(), s.ID) + log.Sugar.Debugf("创建%s-stream source: %s", protocol.String(), s.ID) transStream, err := CreateTransStream(s, protocol, streams) if err != nil { - log.Sugar.Errorf("创建传输流失败 err:%s source:%s", err.Error(), s.ID) + log.Sugar.Errorf("创建传输流失败 err: %s source: %s", err.Error(), s.ID) return nil, err } @@ -473,7 +476,7 @@ func (s *PublishSource) doAddSink(sink Sink) bool { log.Sugar.Infof("sink count: %d source: %s", s.sinkCount, s.ID) } - s.Sinks[sink.GetID()] = sink + s.sinks[sink.GetID()] = sink s.TransStreamSinks[transStreamId][sink.GetID()] = sink // 新建传输流,发送已经缓存的音视频帧 @@ -504,16 +507,34 @@ func (s *PublishSource) RemoveSink(sink Sink) { func (s *PublishSource) RemoveSinkWithID(id SinkID) { s.PostEvent(func() { - sink, ok := s.Sinks[id] + sink, ok := s.sinks[id] if ok { s.doRemoveSink(sink) } }) } +func (s *PublishSource) FindSink(id SinkID) Sink { + var result Sink + group := sync.WaitGroup{} + group.Add(1) + + s.PostEvent(func() { + sink, ok := s.sinks[id] + if ok { + result = sink + } + + group.Done() + }) + + group.Wait() + return result +} + func (s *PublishSource) doRemoveSink(sink Sink) bool { transStreamSinks := s.TransStreamSinks[sink.GetTransStreamID()] - delete(s.Sinks, sink.GetID()) + delete(s.sinks, sink.GetID()) delete(transStreamSinks, sink.GetID()) s.sinkCount-- @@ -599,7 +620,7 @@ func (s *PublishSource) DoClose() { } // 将所有sink添加到等待队列 - for _, sink := range s.Sinks { + for _, sink := range s.sinks { transStreamID := sink.GetTransStreamID() sink.SetTransStreamID(0) if s.recordSink == sink { @@ -625,7 +646,7 @@ func (s *PublishSource) DoClose() { } s.TransStreams = nil - s.Sinks = nil + s.sinks = nil s.TransStreamSinks = nil // 异步hook @@ -786,7 +807,7 @@ func (s *PublishSource) RemoteAddr() string { } func (s *PublishSource) String() string { - return fmt.Sprintf("source: %s type: %s conn: %s ", s.ID, s.Type.ToString(), s.RemoteAddr()) + return fmt.Sprintf("source: %s type: %s conn: %s ", s.ID, s.Type.String(), s.RemoteAddr()) } func (s *PublishSource) State() SessionState { @@ -813,8 +834,19 @@ func (s *PublishSource) SetCreateTime(time time.Time) { s.createTime = time } -func (s *PublishSource) PlaySink(sink Sink) { - s.PostEvent(func() { +func (s *PublishSource) Sinks() []Sink { + var sinks []Sink + group := sync.WaitGroup{} + group.Add(1) + s.PostEvent(func() { + for _, sink := range s.sinks { + sinks = append(sinks, sink) + } + + group.Done() }) + + group.Wait() + return sinks } diff --git a/stream/source_utils.go b/stream/source_utils.go index c3288cf..a10ab88 100644 --- a/stream/source_utils.go +++ b/stream/source_utils.go @@ -45,7 +45,7 @@ const ( SessionStateClosed = SessionState(7) // 关闭状态 ) -func (s SourceType) ToString() string { +func (s SourceType) String() string { if SourceTypeRtmp == s { return "rtmp" } else if SourceType28181 == s { @@ -57,7 +57,7 @@ func (s SourceType) ToString() string { panic(fmt.Sprintf("unknown source type %d", s)) } -func (p TransStreamProtocol) ToString() string { +func (p TransStreamProtocol) String() string { if TransStreamRtmp == p { return "rtmp" } else if TransStreamFlv == p { diff --git a/stream/stream_factory.go b/stream/stream_factory.go index 501f8b0..3343da1 100644 --- a/stream/stream_factory.go +++ b/stream/stream_factory.go @@ -21,7 +21,7 @@ func init() { func RegisterTransStreamFactory(protocol TransStreamProtocol, streamFunc TransStreamFactory) { _, ok := transStreamFactories[protocol] if ok { - panic(fmt.Sprintf("%s has been registered", protocol.ToString())) + panic(fmt.Sprintf("%s has been registered", protocol.String())) } transStreamFactories[protocol] = streamFunc @@ -30,7 +30,7 @@ func RegisterTransStreamFactory(protocol TransStreamProtocol, streamFunc TransSt func FindTransStreamFactory(protocol TransStreamProtocol) (TransStreamFactory, error) { f, ok := transStreamFactories[protocol] if !ok { - return nil, fmt.Errorf("unknown protocol %s", protocol.ToString()) + return nil, fmt.Errorf("unknown protocol %s", protocol.String()) } return f, nil diff --git a/stream/stream_server.go b/stream/stream_server.go index 258ef8d..b8279de 100644 --- a/stream/stream_server.go +++ b/stream/stream_server.go @@ -18,7 +18,7 @@ type StreamServer[T any] struct { } func (s *StreamServer[T]) OnConnected(conn net.Conn) []byte { - log.Sugar.Debugf("%s连接 conn:%s", s.SourceType.ToString(), conn.RemoteAddr().String()) + log.Sugar.Debugf("%s连接 conn:%s", s.SourceType.String(), conn.RemoteAddr().String()) conn.(*transport.Conn).Data = s.Handler.OnNewSession(conn) return nil } @@ -32,7 +32,7 @@ func (s *StreamServer[T]) OnPacket(conn net.Conn, data []byte) []byte { } func (s *StreamServer[T]) OnDisConnected(conn net.Conn, err error) { - log.Sugar.Debugf("%s断开连接 conn:%s", s.SourceType.ToString(), conn.RemoteAddr().String()) + log.Sugar.Debugf("%s断开连接 conn:%s", s.SourceType.String(), conn.RemoteAddr().String()) t := conn.(*transport.Conn) if t.Data != nil {