diff --git a/cmd/api/api.go b/cmd/api/api.go index 21c6d95a..64225d5e 100644 --- a/cmd/api/api.go +++ b/cmd/api/api.go @@ -83,7 +83,7 @@ func fileServerHandlder(w http.ResponseWriter, r *http.Request) { func statsHandler(w http.ResponseWriter, _ *http.Request) { v := map[string]interface{}{ - "streams": streams.Streams, + "streams": streams.All(), } data, err := json.Marshal(v) if err != nil { diff --git a/cmd/hass/hass.go b/cmd/hass/hass.go index de66bc9b..eb8b32a0 100644 --- a/cmd/hass/hass.go +++ b/cmd/hass/hass.go @@ -79,7 +79,7 @@ func handler(w http.ResponseWriter, r *http.Request) { return } - stream := streams.NewStream(url) + stream := streams.Get(url) str, err = webrtc.ExchangeSDP(stream, string(offer), r.UserAgent()) if err != nil { log.Error().Err(err).Msg("[api.hass] exchange SDP") diff --git a/cmd/streams/handlers.go b/cmd/streams/handlers.go index aad7be11..36f47b33 100644 --- a/cmd/streams/handlers.go +++ b/cmd/streams/handlers.go @@ -17,6 +17,11 @@ func HandleFunc(scheme string, handler Handler) { handlers[scheme] = handler } +func HasProducer(url string) bool { + i := strings.IndexByte(url, ':') + return handlers[url[:i]] != nil +} + func GetProducer(url string) (streamer.Producer, error) { i := strings.IndexByte(url, ':') handler := handlers[url[:i]] @@ -24,4 +29,4 @@ func GetProducer(url string) (streamer.Producer, error) { return nil, fmt.Errorf("unsupported scheme: %s", url) } return handler(url) -} \ No newline at end of file +} diff --git a/cmd/streams/stream.go b/cmd/streams/stream.go index f33ea8b0..31cf10ce 100644 --- a/cmd/streams/stream.go +++ b/cmd/streams/stream.go @@ -120,6 +120,20 @@ func (s *Stream) RemoveProducer(prod streamer.Producer) { panic("not implemented") } +func (s *Stream) Active() bool { + if len(s.consumers) > 0{ + return true + } + + for _, prod := range s.producers { + if prod.element != nil { + return true + } + } + + return false +} + func (s *Stream) MarshalJSON() ([]byte, error) { var v []interface{} for _, prod := range s.producers { diff --git a/cmd/streams/streams.go b/cmd/streams/streams.go index a9a1e558..9d9f052d 100644 --- a/cmd/streams/streams.go +++ b/cmd/streams/streams.go @@ -5,8 +5,6 @@ import ( "github.com/rs/zerolog" ) -var Streams = map[string]*Stream{} - func Init() { var cfg struct { Mod map[string]interface{} `yaml:"streams"` @@ -17,12 +15,34 @@ func Init() { log = app.GetLogger("streams") for name, item := range cfg.Mod { - Streams[name] = NewStream(item) + streams[name] = NewStream(item) } } func Get(name string) *Stream { - return Streams[name] + if stream, ok := streams[name]; ok { + return stream + } + + if HasProducer(name) { + log.Info().Str("url", name).Msg("[streams] create new stream") + stream := NewStream(name) + streams[name] = stream + return stream + } + + return nil +} + +func All() map[string]interface{} { + active := map[string]interface{}{} + for name, stream := range streams { + if stream.Active() { + active[name] = stream + } + } + return active } var log zerolog.Logger +var streams = map[string]*Stream{}