diff --git a/ffmpeg/parse/parser.go b/ffmpeg/parse/parser.go index 9fe31590..bdf664b4 100644 --- a/ffmpeg/parse/parser.go +++ b/ffmpeg/parse/parser.go @@ -524,10 +524,7 @@ func (p *parser) Progress() Progress { p.progress.ffmpeg.exportTo(&progress) - fmt.Printf("%+v\n", p.progress.avstream) - for i, io := range progress.Input { - fmt.Printf("checking for %s\n", io.Address) av, ok := p.progress.avstream[io.Address] if !ok { continue diff --git a/http/api/progress.go b/http/api/progress.go index a402d55a..f4c149d2 100644 --- a/http/api/progress.go +++ b/http/api/progress.go @@ -38,7 +38,7 @@ type ProgressIO struct { Channels uint64 `json:"channels,omitempty" format:"uint64"` // avstream - AVstream *AVstream `json:"avstream"` + AVstream *AVstream `json:"avstream" jsonschema:"anyof_type=null;object"` } // Unmarshal converts a restreamer ProgressIO to a ProgressIO in API representation diff --git a/http/handler/api/filesystems_test.go b/http/handler/api/filesystems_test.go new file mode 100644 index 00000000..7e4e84a0 --- /dev/null +++ b/http/handler/api/filesystems_test.go @@ -0,0 +1,141 @@ +package api + +import ( + "encoding/json" + "io" + "net/http" + "testing" + + "github.com/datarhei/core/v16/http/api" + httpfs "github.com/datarhei/core/v16/http/fs" + "github.com/datarhei/core/v16/http/handler" + "github.com/datarhei/core/v16/http/mock" + "github.com/datarhei/core/v16/io/fs" + "github.com/stretchr/testify/require" + + "github.com/labstack/echo/v4" +) + +func getDummyFilesystemsHandler(filesystem httpfs.FS) (*FSHandler, error) { + handler := NewFS(map[string]FSConfig{ + filesystem.Name: { + Type: filesystem.Filesystem.Type(), + Mountpoint: filesystem.Mountpoint, + Handler: handler.NewFS(filesystem), + }, + }) + + return handler, nil +} + +func getDummyFilesystemsRouter(filesystem fs.Filesystem) (*echo.Echo, error) { + router := mock.DummyEcho() + + fs := httpfs.FS{ + Name: "foo", + Mountpoint: "/", + AllowWrite: true, + EnableAuth: false, + Username: "", + Password: "", + DefaultFile: "", + DefaultContentType: "text/html", + Gzip: false, + Filesystem: filesystem, + Cache: nil, + } + + handler, err := getDummyFilesystemsHandler(fs) + if err != nil { + return nil, err + } + + router.GET("/:name/*", handler.GetFile) + router.PUT("/:name/*", handler.PutFile) + router.DELETE("/:name/*", handler.DeleteFile) + router.GET("/:name", handler.ListFiles) + router.GET("/", handler.List) + + return router, nil +} + +func TestFilesystems(t *testing.T) { + memfs, err := fs.NewMemFilesystem(fs.MemConfig{}) + require.NoError(t, err) + + router, err := getDummyFilesystemsRouter(memfs) + require.NoError(t, err) + + response := mock.Request(t, http.StatusOK, router, "GET", "/", nil) + + mock.Validate(t, &[]api.FilesystemInfo{}, response.Data) + + f := []api.FilesystemInfo{} + err = json.Unmarshal(response.Raw, &f) + require.NoError(t, err) + + require.Equal(t, 1, len(f)) + + mock.Request(t, http.StatusNotFound, router, "GET", "/bar", nil) + + response = mock.Request(t, http.StatusOK, router, "GET", "/foo", nil) + + mock.Validate(t, &[]api.FileInfo{}, response.Data) + + l := []api.FileInfo{} + err = json.Unmarshal(response.Raw, &l) + require.NoError(t, err) + + require.Equal(t, 0, len(l)) + + mock.Request(t, http.StatusNotFound, router, "GET", "/bar/file", nil) + mock.Request(t, http.StatusNotFound, router, "GET", "/foo/file", nil) + + data := mock.Read(t, "./fixtures/addProcess.json") + require.NoError(t, err) + + mock.Request(t, http.StatusCreated, router, "PUT", "/foo/file", data) + mock.Request(t, http.StatusNotFound, router, "PUT", "/bar/file", data) + + data = mock.Read(t, "./fixtures/addProcess.json") + require.NoError(t, err) + + mock.Request(t, http.StatusNoContent, router, "PUT", "/foo/file", data) + + require.Equal(t, 1, len(memfs.List("/", ""))) + + response = mock.Request(t, http.StatusOK, router, "GET", "/foo", nil) + + mock.Validate(t, &[]api.FileInfo{}, response.Data) + + l = []api.FileInfo{} + err = json.Unmarshal(response.Raw, &l) + require.NoError(t, err) + + require.Equal(t, 1, len(l)) + + mock.Request(t, http.StatusNotFound, router, "GET", "/foo/elif", nil) + + response = mock.Request(t, http.StatusOK, router, "GET", "/foo/file", nil) + + databytes, err := io.ReadAll(mock.Read(t, "./fixtures/addProcess.json")) + require.NoError(t, err) + require.Equal(t, databytes, response.Raw) + + mock.Request(t, http.StatusNotFound, router, "DELETE", "/foo/elif", nil) + mock.Request(t, http.StatusNotFound, router, "DELETE", "/bar/elif", nil) + mock.Request(t, http.StatusOK, router, "DELETE", "/foo/file", nil) + mock.Request(t, http.StatusNotFound, router, "GET", "/foo/file", nil) + + require.Equal(t, 0, len(memfs.List("/", ""))) + + response = mock.Request(t, http.StatusOK, router, "GET", "/foo", nil) + + mock.Validate(t, &[]api.FileInfo{}, response.Data) + + l = []api.FileInfo{} + err = json.Unmarshal(response.Raw, &l) + require.NoError(t, err) + + require.Equal(t, 0, len(l)) +} diff --git a/http/handler/api/restream_test.go b/http/handler/api/restream_test.go index 516db9ce..f470b9f0 100644 --- a/http/handler/api/restream_test.go +++ b/http/handler/api/restream_test.go @@ -4,7 +4,9 @@ import ( "bytes" "encoding/json" "net/http" + "strconv" "testing" + "time" "github.com/datarhei/core/v16/http/api" "github.com/datarhei/core/v16/http/mock" @@ -41,10 +43,22 @@ func getDummyRestreamRouter() (*echo.Echo, error) { router.GET("/", restream.GetAll) router.POST("/", restream.Add) router.GET("/:id", restream.Get) + router.GET("/:id/config", restream.GetConfig) router.GET("/:id/report", restream.GetReport) + router.GET("/:id/state", restream.GetState) + router.GET("/:id/report/:at", restream.GetReportAt) router.PUT("/:id", restream.Update) router.DELETE("/:id", restream.Delete) router.PUT("/:id/command", restream.Command) + router.GET("/:id/metadata", restream.GetProcessMetadata) + router.GET("/:id/metadata/:key", restream.GetProcessMetadata) + router.PUT("/:id/metadata/:key", restream.SetProcessMetadata) + + router.GET("/metadata", restream.GetMetadata) + router.GET("/metadata/:key", restream.GetMetadata) + router.PUT("/metadata/:key", restream.SetMetadata) + + router.GET("/report/process", restream.SearchReportHistory) return router, nil } @@ -203,6 +217,49 @@ func TestRemoveProcess(t *testing.T) { mock.Request(t, http.StatusOK, router, "DELETE", "/test", nil) } +func TestAllProcesses(t *testing.T) { + router, err := getDummyRestreamRouter() + require.NoError(t, err) + + response := mock.Request(t, http.StatusOK, router, "GET", "/", nil) + + mock.Validate(t, &[]api.Process{}, response.Data) + + p := []api.Process{} + err = json.Unmarshal(response.Raw, &p) + require.NoError(t, err) + + require.Equal(t, 0, len(p)) + + data := mock.Read(t, "./fixtures/addProcess.json") + + mock.Request(t, http.StatusOK, router, "POST", "/", data) + + response = mock.Request(t, http.StatusOK, router, "GET", "/", nil) + + mock.Validate(t, &[]api.Process{}, response.Data) + + p = []api.Process{} + err = json.Unmarshal(response.Raw, &p) + require.NoError(t, err) + + require.Equal(t, 1, len(p)) +} + +func TestProcess(t *testing.T) { + router, err := getDummyRestreamRouter() + require.NoError(t, err) + + mock.Request(t, http.StatusNotFound, router, "GET", "/test", nil) + + data := mock.Read(t, "./fixtures/addProcess.json") + + mock.Request(t, http.StatusOK, router, "POST", "/", data) + response := mock.Request(t, http.StatusOK, router, "GET", "/test", nil) + + mock.Validate(t, &api.Process{}, response.Data) +} + func TestProcessInfo(t *testing.T) { router, err := getDummyRestreamRouter() require.NoError(t, err) @@ -215,6 +272,31 @@ func TestProcessInfo(t *testing.T) { mock.Validate(t, &api.Process{}, response.Data) } +func TestProcessConfig(t *testing.T) { + router, err := getDummyRestreamRouter() + require.NoError(t, err) + + data := mock.Read(t, "./fixtures/addProcess.json") + + mock.Request(t, http.StatusOK, router, "POST", "/", data) + + response := mock.Request(t, http.StatusOK, router, "GET", "/test/config", nil) + + mock.Validate(t, &api.ProcessConfig{}, response.Data) +} + +func TestProcessState(t *testing.T) { + router, err := getDummyRestreamRouter() + require.NoError(t, err) + + data := mock.Read(t, "./fixtures/addProcess.json") + + mock.Request(t, http.StatusOK, router, "POST", "/", data) + response := mock.Request(t, http.StatusOK, router, "GET", "/test/state", nil) + + mock.Validate(t, &api.ProcessState{}, response.Data) +} + func TestProcessReportNotFound(t *testing.T) { router, err := getDummyRestreamRouter() require.NoError(t, err) @@ -234,6 +316,126 @@ func TestProcessReport(t *testing.T) { mock.Validate(t, &api.ProcessReport{}, response.Data) } +func TestProcessReportAt(t *testing.T) { + router, err := getDummyRestreamRouter() + require.NoError(t, err) + + data := mock.Read(t, "./fixtures/addProcess.json") + + mock.Request(t, http.StatusOK, router, "POST", "/", data) + + command := mock.Read(t, "./fixtures/commandStart.json") + mock.Request(t, http.StatusOK, router, "PUT", "/test/command", command) + mock.Request(t, http.StatusOK, router, "GET", "/test", nil) + + time.Sleep(2 * time.Second) + + command = mock.Read(t, "./fixtures/commandStop.json") + mock.Request(t, http.StatusOK, router, "PUT", "/test/command", command) + mock.Request(t, http.StatusOK, router, "GET", "/test", nil) + + response := mock.Request(t, http.StatusOK, router, "GET", "/test/report", nil) + + x := api.ProcessReport{} + err = json.Unmarshal(response.Raw, &x) + require.NoError(t, err) + + require.Equal(t, 1, len(x.History)) + + at := x.History[0].CreatedAt + + mock.Request(t, http.StatusOK, router, "GET", "/test/report/"+strconv.FormatInt(at, 10), nil) + mock.Request(t, http.StatusNotFound, router, "GET", "/test/report/1234", nil) +} + +func TestSearchReportHistory(t *testing.T) { + router, err := getDummyRestreamRouter() + require.NoError(t, err) + + data := mock.Read(t, "./fixtures/addProcess.json") + + mock.Request(t, http.StatusOK, router, "POST", "/", data) + + command := mock.Read(t, "./fixtures/commandStart.json") + mock.Request(t, http.StatusOK, router, "PUT", "/test/command", command) + mock.Request(t, http.StatusOK, router, "GET", "/test", nil) + + time.Sleep(2 * time.Second) + + command = mock.Read(t, "./fixtures/commandStop.json") + mock.Request(t, http.StatusOK, router, "PUT", "/test/command", command) + mock.Request(t, http.StatusOK, router, "GET", "/test", nil) + + command = mock.Read(t, "./fixtures/commandStart.json") + mock.Request(t, http.StatusOK, router, "PUT", "/test/command", command) + mock.Request(t, http.StatusOK, router, "GET", "/test", nil) + + time.Sleep(2 * time.Second) + + command = mock.Read(t, "./fixtures/commandStop.json") + mock.Request(t, http.StatusOK, router, "PUT", "/test/command", command) + mock.Request(t, http.StatusOK, router, "GET", "/test", nil) + + response := mock.Request(t, http.StatusOK, router, "GET", "/test/report", nil) + + x := api.ProcessReport{} + err = json.Unmarshal(response.Raw, &x) + require.NoError(t, err) + + require.Equal(t, 2, len(x.History)) + + time1 := x.History[0].CreatedAt + time2 := x.History[1].CreatedAt + + response = mock.Request(t, http.StatusOK, router, "GET", "/report/process", nil) + + r := []api.ProcessReportSearchResult{} + err = json.Unmarshal(response.Raw, &r) + require.NoError(t, err) + + require.Equal(t, 2, len(r)) + + response = mock.Request(t, http.StatusOK, router, "GET", "/report/process?state=failed", nil) + + r = []api.ProcessReportSearchResult{} + err = json.Unmarshal(response.Raw, &r) + require.NoError(t, err) + + require.Equal(t, 0, len(r)) + + response = mock.Request(t, http.StatusOK, router, "GET", "/report/process?state=finished", nil) + + r = []api.ProcessReportSearchResult{} + err = json.Unmarshal(response.Raw, &r) + require.NoError(t, err) + + require.Equal(t, 2, len(r)) + + response = mock.Request(t, http.StatusOK, router, "GET", "/report/process?from="+strconv.FormatInt(time1, 10), nil) + + r = []api.ProcessReportSearchResult{} + err = json.Unmarshal(response.Raw, &r) + require.NoError(t, err) + + require.Equal(t, 2, len(r)) + + response = mock.Request(t, http.StatusOK, router, "GET", "/report/process?to="+strconv.FormatInt(time2, 10), nil) + + r = []api.ProcessReportSearchResult{} + err = json.Unmarshal(response.Raw, &r) + require.NoError(t, err) + + require.Equal(t, 1, len(r)) + + response = mock.Request(t, http.StatusOK, router, "GET", "/report/process?from="+strconv.FormatInt(time1, 10)+"&to="+strconv.FormatInt(time2+1, 10), nil) + + r = []api.ProcessReportSearchResult{} + err = json.Unmarshal(response.Raw, &r) + require.NoError(t, err) + + require.Equal(t, 2, len(r)) +} + func TestProcessCommandNotFound(t *testing.T) { router, err := getDummyRestreamRouter() require.NoError(t, err) @@ -270,3 +472,71 @@ func TestProcessCommand(t *testing.T) { mock.Request(t, http.StatusOK, router, "PUT", "/test/command", command) mock.Request(t, http.StatusOK, router, "GET", "/test", data) } + +func TestProcessMetadata(t *testing.T) { + router, err := getDummyRestreamRouter() + require.NoError(t, err) + + data := mock.Read(t, "./fixtures/addProcess.json") + + mock.Request(t, http.StatusOK, router, "POST", "/", data) + + response := mock.Request(t, http.StatusOK, router, "GET", "/test/metadata", nil) + require.Equal(t, nil, response.Data) + + mock.Request(t, http.StatusNotFound, router, "GET", "/test/metadata/foobar", nil) + + data = bytes.NewReader([]byte("hello")) + mock.Request(t, http.StatusBadRequest, router, "PUT", "/test/metadata/foobar", data) + + data = bytes.NewReader([]byte(`"hello"`)) + mock.Request(t, http.StatusOK, router, "PUT", "/test/metadata/foobar", data) + + response = mock.Request(t, http.StatusOK, router, "GET", "/test/metadata/foobar", nil) + + x := "" + err = json.Unmarshal(response.Raw, &x) + require.NoError(t, err) + + require.Equal(t, "hello", x) + + data = bytes.NewReader([]byte(`null`)) + mock.Request(t, http.StatusOK, router, "PUT", "/test/metadata/foobar", data) + + mock.Request(t, http.StatusNotFound, router, "GET", "/test/metadata/foobar", nil) + + response = mock.Request(t, http.StatusOK, router, "GET", "/test/metadata", nil) + require.Equal(t, nil, response.Data) +} + +func TestMetadata(t *testing.T) { + router, err := getDummyRestreamRouter() + require.NoError(t, err) + + response := mock.Request(t, http.StatusOK, router, "GET", "/metadata", nil) + require.Equal(t, nil, response.Data) + + mock.Request(t, http.StatusNotFound, router, "GET", "/metadata/foobar", nil) + + data := bytes.NewReader([]byte("hello")) + mock.Request(t, http.StatusBadRequest, router, "PUT", "/metadata/foobar", data) + + data = bytes.NewReader([]byte(`"hello"`)) + mock.Request(t, http.StatusOK, router, "PUT", "/metadata/foobar", data) + + response = mock.Request(t, http.StatusOK, router, "GET", "/metadata/foobar", nil) + + x := "" + err = json.Unmarshal(response.Raw, &x) + require.NoError(t, err) + + require.Equal(t, "hello", x) + + data = bytes.NewReader([]byte(`null`)) + mock.Request(t, http.StatusOK, router, "PUT", "/metadata/foobar", data) + + mock.Request(t, http.StatusNotFound, router, "GET", "/metadata/foobar", nil) + + response = mock.Request(t, http.StatusOK, router, "GET", "/metadata", nil) + require.Equal(t, nil, response.Data) +} diff --git a/http/handler/api/widget_test.go b/http/handler/api/widget_test.go new file mode 100644 index 00000000..0e7d2434 --- /dev/null +++ b/http/handler/api/widget_test.go @@ -0,0 +1,59 @@ +package api + +import ( + "encoding/json" + "io" + "net/http" + "testing" + + "github.com/datarhei/core/v16/http/api" + "github.com/datarhei/core/v16/http/mock" + "github.com/datarhei/core/v16/restream" + "github.com/stretchr/testify/require" + + "github.com/labstack/echo/v4" +) + +func getDummyWidgetHandler(rs restream.Restreamer) (*WidgetHandler, error) { + handler := NewWidget(WidgetConfig{ + Restream: rs, + Registry: nil, + }) + + return handler, nil +} + +func getDummyWidgetRouter(rs restream.Restreamer) (*echo.Echo, error) { + router := mock.DummyEcho() + + widget, err := getDummyWidgetHandler(rs) + if err != nil { + return nil, err + } + + router.GET("/:id", widget.Get) + + return router, nil +} + +func TestWidget(t *testing.T) { + rs, err := mock.DummyRestreamer("../../mock") + require.NoError(t, err) + + router, err := getDummyWidgetRouter(rs) + require.NoError(t, err) + + data, err := io.ReadAll(mock.Read(t, "./fixtures/addProcess.json")) + require.NoError(t, err) + + process := api.ProcessConfig{} + err = json.Unmarshal(data, &process) + require.NoError(t, err) + + err = rs.AddProcess(process.Marshal()) + require.NoError(t, err) + + response := mock.Request(t, http.StatusOK, router, "GET", "/test", nil) + + mock.Validate(t, &api.WidgetProcess{}, response.Data) +} diff --git a/http/mock/mock.go b/http/mock/mock.go index 4285e5ef..2d93d7cb 100644 --- a/http/mock/mock.go +++ b/http/mock/mock.go @@ -46,7 +46,9 @@ func DummyRestreamer(pathPrefix string) (restream.Restreamer, error) { } ffmpeg, err := ffmpeg.New(ffmpeg.Config{ - Binary: binary, + Binary: binary, + MaxLogLines: 100, + LogHistoryLength: 3, }) if err != nil { return nil, err @@ -78,6 +80,7 @@ func DummyEcho() *echo.Echo { type Response struct { Code int `json:"code"` Message string `json:"message"` + Raw []byte Data interface{} } @@ -91,7 +94,7 @@ func Request(t *testing.T, httpstatus int, router *echo.Echo, method, path strin response := CheckResponse(t, w.Result()) - require.Equal(t, httpstatus, w.Code, response.Data) + require.Equal(t, httpstatus, w.Code, string(response.Raw)) return response } @@ -104,6 +107,8 @@ func CheckResponse(t *testing.T, res *http.Response) *Response { body, err := io.ReadAll(res.Body) require.Equal(t, nil, err) + response.Raw = body + if strings.Contains(res.Header.Get("Content-Type"), "application/json") { err := json.Unmarshal(body, &response.Data) require.Equal(t, nil, err) @@ -121,7 +126,8 @@ func CheckResponse(t *testing.T, res *http.Response) *Response { } func Validate(t *testing.T, datatype, data interface{}) bool { - schema, _ := jsonschema.Reflect(datatype).MarshalJSON() + schema, err := jsonschema.Reflect(datatype).MarshalJSON() + require.NoError(t, err) schemaLoader := gojsonschema.NewStringLoader(string(schema)) documentLoader := gojsonschema.NewGoLoader(data) diff --git a/restream/restream.go b/restream/restream.go index 59b1b2ab..8e8d6193 100644 --- a/restream/restream.go +++ b/restream/restream.go @@ -1435,6 +1435,22 @@ func (r *restream) GetProcessLog(id string) (*app.Log, error) { convertProgressFromParser(&e.Progress, h.Progress) + for i, p := range e.Progress.Input { + if int(p.Index) >= len(task.process.Config.Input) { + continue + } + + e.Progress.Input[i].ID = task.process.Config.Input[p.Index].ID + } + + for i, p := range e.Progress.Output { + if int(p.Index) >= len(task.process.Config.Output) { + continue + } + + e.Progress.Output[i].ID = task.process.Config.Output[p.Index].ID + } + e.LogEntry.Log = make([]app.LogLine, len(h.Log)) for i, line := range h.Log { e.LogEntry.Log[i] = app.LogLine{ @@ -1689,6 +1705,10 @@ func (r *restream) GetMetadata(key string) (interface{}, error) { defer r.lock.RUnlock() if len(key) == 0 { + if len(r.metadata) == 0 { + return nil, nil + } + return r.metadata, nil }