diff --git a/internal/streams/handlers.go b/internal/streams/handlers.go index ecb76d7c..3009dd66 100644 --- a/internal/streams/handlers.go +++ b/internal/streams/handlers.go @@ -73,3 +73,25 @@ func Location(url string) (string, error) { return "", nil } + +// TODO: rework + +type ConsumerHandler func(url string) (core.Consumer, func(), error) + +var consumerHandlers = map[string]ConsumerHandler{} + +func HandleConsumerFunc(scheme string, handler ConsumerHandler) { + consumerHandlers[scheme] = handler +} + +func GetConsumer(url string) (core.Consumer, func(), error) { + if i := strings.IndexByte(url, ':'); i > 0 { + scheme := url[:i] + + if handler, ok := consumerHandlers[scheme]; ok { + return handler(url) + } + } + + return nil, nil, errors.New("streams: unsupported scheme: " + url) +} diff --git a/internal/streams/publish.go b/internal/streams/publish.go new file mode 100644 index 00000000..259ddb8d --- /dev/null +++ b/internal/streams/publish.go @@ -0,0 +1,19 @@ +package streams + +func (s *Stream) Publish(url string) error { + cons, run, err := GetConsumer(url) + if err != nil { + return err + } + + if err = s.AddConsumer(cons); err != nil { + return err + } + + go func() { + run() + s.RemoveConsumer(cons) + }() + + return nil +} diff --git a/internal/streams/streams.go b/internal/streams/streams.go index bc23bf54..14bc09c8 100644 --- a/internal/streams/streams.go +++ b/internal/streams/streams.go @@ -172,6 +172,10 @@ func streamsHandler(w http.ResponseWriter, r *http.Request) { } else { api.ResponseJSON(w, stream) } + } else if stream = Get(src); stream != nil { + if err := stream.Publish(dst); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } } else { http.Error(w, "", http.StatusNotFound) }