diff --git a/cluster/api.go b/cluster/api.go index 92777ea5..49ab5199 100644 --- a/cluster/api.go +++ b/cluster/api.go @@ -62,10 +62,6 @@ func NewAPI(config APIConfig) (API, error) { a.router.HideBanner = true a.router.HidePort = true - mwlog.NewWithConfig(mwlog.Config{ - Logger: a.logger, - }) - a.router.Use(mwlog.NewWithConfig(mwlog.Config{ Logger: a.logger, })) diff --git a/cluster/cluster.go b/cluster/cluster.go index d3216093..0bb5d067 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -61,6 +61,8 @@ type Cluster interface { Shutdown() error + ListProcesses() []store.Process + GetProcess(id string) (store.Process, error) AddProcess(origin string, config *app.Config) error RemoveProcess(origin, id string) error UpdateProcess(origin, id string, config *app.Config) error @@ -686,6 +688,14 @@ func (c *cluster) trackLeaderChanges() { } } +func (c *cluster) ListProcesses() []store.Process { + return c.store.ProcessList() +} + +func (c *cluster) GetProcess(id string) (store.Process, error) { + return c.store.GetProcess(id) +} + func (c *cluster) AddProcess(origin string, config *app.Config) error { if !c.IsRaftLeader() { return c.forwarder.AddProcess(origin, config) diff --git a/cluster/proxy/node.go b/cluster/proxy/node.go index 7810006e..79935a58 100644 --- a/cluster/proxy/node.go +++ b/cluster/proxy/node.go @@ -32,6 +32,7 @@ type Node interface { ProcessStart(id string) error ProcessStop(id string) error ProcessDelete(id string) error + ProcessUpdate(id string, config *app.Config) error NodeReader } @@ -746,8 +747,15 @@ func (n *node) ProcessAdd(config *app.Config) error { return fmt.Errorf("not connected") } + cfg := convertConfig(config) + + return n.peer.ProcessAdd(cfg) +} + +func convertConfig(config *app.Config) clientapi.ProcessConfig { cfg := clientapi.ProcessConfig{ ID: config.ID, + Type: "ffmpeg", Reference: config.Reference, Input: []clientapi.ProcessConfigIO{}, Output: []clientapi.ProcessConfigIO{}, @@ -791,7 +799,7 @@ func (n *node) ProcessAdd(config *app.Config) error { cfg.Output = append(cfg.Output, output) } - return n.peer.ProcessAdd(cfg) + return cfg } func (n *node) ProcessStart(id string) error { @@ -826,3 +834,16 @@ func (n *node) ProcessDelete(id string) error { return n.peer.ProcessDelete(id) } + +func (n *node) ProcessUpdate(id string, config *app.Config) error { + n.peerLock.RLock() + defer n.peerLock.RUnlock() + + if n.peer == nil { + return fmt.Errorf("not connected") + } + + cfg := convertConfig(config) + + return n.peer.ProcessUpdate(id, cfg) +} diff --git a/cluster/proxy/proxy.go b/cluster/proxy/proxy.go index 866bbd07..24ddcbde 100644 --- a/cluster/proxy/proxy.go +++ b/cluster/proxy/proxy.go @@ -531,10 +531,10 @@ func (p *proxy) ProcessUpdate(nodeid string, id string, config *app.Config) erro p.lock.RLock() defer p.lock.RUnlock() - _, ok := p.nodes[nodeid] + node, ok := p.nodes[nodeid] if !ok { return fmt.Errorf("node not found") } - return fmt.Errorf("not implemented") + return node.ProcessUpdate(id, config) } diff --git a/cluster/store/store.go b/cluster/store/store.go index 1342d147..528ce99f 100644 --- a/cluster/store/store.go +++ b/cluster/store/store.go @@ -21,6 +21,7 @@ type Store interface { } type Process struct { + CreatedAt time.Time UpdatedAt time.Time Config *app.Config } @@ -103,8 +104,10 @@ func (s *store) Apply(entry *raft.Log) interface{} { s.lock.Lock() _, ok := s.Process[cmd.ID] if !ok { + now := time.Now() s.Process[cmd.ID] = Process{ - UpdatedAt: time.Now(), + CreatedAt: now, + UpdatedAt: now, Config: &cmd.Config, } } diff --git a/docs/docs.go b/docs/docs.go index 6b005990..2015a527 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -275,6 +275,35 @@ const docTemplate = `{ } } }, + "/api/v3/cluster/node/process": { + "get": { + "security": [ + { + "ApiKeyAuth": [] + } + ], + "description": "List of processes in the cluster", + "produces": [ + "application/json" + ], + "tags": [ + "v16.?.?" + ], + "summary": "List of processes in the cluster", + "operationId": "cluster-3-list-node-processes", + "responses": { + "200": { + "description": "OK", + "schema": { + "type": "array", + "items": { + "$ref": "#/definitions/api.ClusterProcess" + } + } + } + } + } + }, "/api/v3/cluster/node/{id}": { "get": { "security": [ @@ -379,7 +408,7 @@ const docTemplate = `{ "schema": { "type": "array", "items": { - "$ref": "#/definitions/api.ClusterProcess" + "$ref": "#/definitions/api.Process" } } } @@ -431,6 +460,63 @@ const docTemplate = `{ } }, "/api/v3/cluster/process/{id}": { + "put": { + "security": [ + { + "ApiKeyAuth": [] + } + ], + "description": "Replace an existing process.", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "v16.?.?" + ], + "summary": "Replace an existing process", + "operationId": "cluster-3-update-process", + "parameters": [ + { + "type": "string", + "description": "Process ID", + "name": "id", + "in": "path", + "required": true + }, + { + "description": "Process config", + "name": "config", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/api.ProcessConfig" + } + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/api.ProcessConfig" + } + }, + "400": { + "description": "Bad Request", + "schema": { + "$ref": "#/definitions/api.Error" + } + }, + "404": { + "description": "Not Found", + "schema": { + "$ref": "#/definitions/api.Error" + } + } + } + }, "delete": { "security": [ { diff --git a/docs/swagger.json b/docs/swagger.json index 5409f3b1..bfdf9149 100644 --- a/docs/swagger.json +++ b/docs/swagger.json @@ -268,6 +268,35 @@ } } }, + "/api/v3/cluster/node/process": { + "get": { + "security": [ + { + "ApiKeyAuth": [] + } + ], + "description": "List of processes in the cluster", + "produces": [ + "application/json" + ], + "tags": [ + "v16.?.?" + ], + "summary": "List of processes in the cluster", + "operationId": "cluster-3-list-node-processes", + "responses": { + "200": { + "description": "OK", + "schema": { + "type": "array", + "items": { + "$ref": "#/definitions/api.ClusterProcess" + } + } + } + } + } + }, "/api/v3/cluster/node/{id}": { "get": { "security": [ @@ -372,7 +401,7 @@ "schema": { "type": "array", "items": { - "$ref": "#/definitions/api.ClusterProcess" + "$ref": "#/definitions/api.Process" } } } @@ -424,6 +453,63 @@ } }, "/api/v3/cluster/process/{id}": { + "put": { + "security": [ + { + "ApiKeyAuth": [] + } + ], + "description": "Replace an existing process.", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "v16.?.?" + ], + "summary": "Replace an existing process", + "operationId": "cluster-3-update-process", + "parameters": [ + { + "type": "string", + "description": "Process ID", + "name": "id", + "in": "path", + "required": true + }, + { + "description": "Process config", + "name": "config", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/api.ProcessConfig" + } + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/api.ProcessConfig" + } + }, + "400": { + "description": "Bad Request", + "schema": { + "$ref": "#/definitions/api.Error" + } + }, + "404": { + "description": "Not Found", + "schema": { + "$ref": "#/definitions/api.Error" + } + } + } + }, "delete": { "security": [ { diff --git a/docs/swagger.yaml b/docs/swagger.yaml index 24c12f3f..b05033eb 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -2302,6 +2302,24 @@ paths: summary: List the files of a proxy node by its ID tags: - v16.?.? + /api/v3/cluster/node/process: + get: + description: List of processes in the cluster + operationId: cluster-3-list-node-processes + produces: + - application/json + responses: + "200": + description: OK + schema: + items: + $ref: '#/definitions/api.ClusterProcess' + type: array + security: + - ApiKeyAuth: [] + summary: List of processes in the cluster + tags: + - v16.?.? /api/v3/cluster/process: get: description: List of processes in the cluster @@ -2313,7 +2331,7 @@ paths: description: OK schema: items: - $ref: '#/definitions/api.ClusterProcess' + $ref: '#/definitions/api.Process' type: array security: - ApiKeyAuth: [] @@ -2374,6 +2392,43 @@ paths: summary: Delete a process by its ID tags: - v16.?.? + put: + consumes: + - application/json + description: Replace an existing process. + operationId: cluster-3-update-process + parameters: + - description: Process ID + in: path + name: id + required: true + type: string + - description: Process config + in: body + name: config + required: true + schema: + $ref: '#/definitions/api.ProcessConfig' + produces: + - application/json + responses: + "200": + description: OK + schema: + $ref: '#/definitions/api.ProcessConfig' + "400": + description: Bad Request + schema: + $ref: '#/definitions/api.Error' + "404": + description: Not Found + schema: + $ref: '#/definitions/api.Error' + security: + - ApiKeyAuth: [] + summary: Replace an existing process + tags: + - v16.?.? /api/v3/config: get: description: Retrieve the currently active Restreamer configuration diff --git a/http/handler/api/cluster.go b/http/handler/api/cluster.go index 598ba7cc..d47dd240 100644 --- a/http/handler/api/cluster.go +++ b/http/handler/api/cluster.go @@ -11,6 +11,7 @@ import ( "github.com/datarhei/core/v16/encoding/json" "github.com/datarhei/core/v16/http/api" "github.com/datarhei/core/v16/http/handler/util" + "github.com/datarhei/core/v16/restream" "github.com/labstack/echo/v4" "github.com/lithammer/shortuuid/v4" @@ -101,6 +102,30 @@ func (h *ClusterHandler) GetNode(c echo.Context) error { return c.JSON(http.StatusOK, node) } +// GetNodeVersion returns the proxy node version with the given ID +// @Summary List a proxy node by its ID +// @Description List a proxy node by its ID +// @Tags v16.?.? +// @ID cluster-3-get-node +// @Produce json +// @Param id path string true "Node ID" +// @Success 200 {object} api.ClusterNode +// @Failure 404 {object} api.Error +// @Security ApiKeyAuth +// @Router /api/v3/cluster/node/{id} [get] +func (h *ClusterHandler) GetNodeVersion(c echo.Context) error { + id := util.PathParam(c, "id") + + peer, err := h.proxy.GetNode(id) + if err != nil { + return api.Err(http.StatusNotFound, "Node not found", "%s", err) + } + + version := peer.Version() + + return c.JSON(http.StatusOK, version) +} + // GetNodeFiles returns the files from the proxy node with the given ID // @Summary List the files of a proxy node by its ID // @Description List the files of a proxy node by its ID @@ -180,16 +205,16 @@ func (h *ClusterHandler) About(c echo.Context) error { return c.JSON(http.StatusOK, about) } -// ListProcesses returns the list of processes in the cluster +// ListNodeProcesses returns the list of processes running on the nodes of the cluster // @Summary List of processes in the cluster // @Description List of processes in the cluster // @Tags v16.?.? -// @ID cluster-3-list-processes +// @ID cluster-3-list-node-processes // @Produce json // @Success 200 {array} api.ClusterProcess // @Security ApiKeyAuth -// @Router /api/v3/cluster/process [get] -func (h *ClusterHandler) ListProcesses(c echo.Context) error { +// @Router /api/v3/cluster/node/process [get] +func (h *ClusterHandler) ListNodeProcesses(c echo.Context) error { procs := h.proxy.ListProcesses() processes := []api.ClusterProcess{} @@ -210,6 +235,40 @@ func (h *ClusterHandler) ListProcesses(c echo.Context) error { return c.JSON(http.StatusOK, processes) } +// ListStoreProcesses returns the list of processes stored in the DB of the cluster +// @Summary List of processes in the cluster +// @Description List of processes in the cluster +// @Tags v16.?.? +// @ID cluster-3-list-processes +// @Produce json +// @Success 200 {array} api.Process +// @Security ApiKeyAuth +// @Router /api/v3/cluster/process [get] +func (h *ClusterHandler) ListProcesses(c echo.Context) error { + procs := h.cluster.ListProcesses() + + processes := []api.Process{} + + for _, p := range procs { + process := api.Process{ + ID: p.Config.ID, + Type: "ffmpeg", + Reference: p.Config.Reference, + CreatedAt: 0, + UpdatedAt: p.UpdatedAt.Unix(), + } + + config := &api.ProcessConfig{} + config.Unmarshal(p.Config) + + process.Config = config + + processes = append(processes, process) + } + + return c.JSON(http.StatusOK, processes) +} + // Add adds a new process to the cluster // @Summary Add a new process // @Description Add a new FFmpeg process @@ -250,6 +309,54 @@ func (h *ClusterHandler) AddProcess(c echo.Context) error { return c.JSON(http.StatusOK, process) } +// Update replaces an existing process +// @Summary Replace an existing process +// @Description Replace an existing process. +// @Tags v16.?.? +// @ID cluster-3-update-process +// @Accept json +// @Produce json +// @Param id path string true "Process ID" +// @Param config body api.ProcessConfig true "Process config" +// @Success 200 {object} api.ProcessConfig +// @Failure 400 {object} api.Error +// @Failure 404 {object} api.Error +// @Security ApiKeyAuth +// @Router /api/v3/cluster/process/{id} [put] +func (h *ClusterHandler) UpdateProcess(c echo.Context) error { + id := util.PathParam(c, "id") + + process := api.ProcessConfig{ + ID: id, + Type: "ffmpeg", + Autostart: true, + } + + current, err := h.cluster.GetProcess(id) + if err != nil { + return api.Err(http.StatusNotFound, "Process not found", "%s", id) + } + + // Prefill the config with the current values + process.Unmarshal(current.Config) + + if err := util.ShouldBindJSON(c, &process); err != nil { + return api.Err(http.StatusBadRequest, "Invalid JSON", "%s", err) + } + + config := process.Marshal() + + if err := h.cluster.UpdateProcess("", id, config); err != nil { + if err == restream.ErrUnknownProcess { + return api.Err(http.StatusNotFound, "Process not found", "%s", id) + } + + return api.Err(http.StatusBadRequest, "Process can't be updated", "%s", err) + } + + return c.JSON(http.StatusOK, process) +} + // Delete deletes the process with the given ID from the cluster // @Summary Delete a process by its ID // @Description Delete a process by its ID @@ -270,157 +377,3 @@ func (h *ClusterHandler) DeleteProcess(c echo.Context) error { return c.JSON(http.StatusOK, "OK") } - -/* -// AddNode adds a new node -// @Summary Add a new node -// @Description Add a new node to the cluster -// @ID cluster-3-add-node -// @Accept json -// @Produce json -// @Param config body api.ClusterNodeConfig true "Node config" -// @Success 200 {string} string -// @Failure 400 {object} api.Error -// @Security ApiKeyAuth -// @Router /api/v3/cluster/node [post] -func (h *ClusterHandler) AddNode(c echo.Context) error { - node := api.ClusterNodeConfig{} - - if err := util.ShouldBindJSON(c, &node); err != nil { - return api.Err(http.StatusBadRequest, "Invalid JSON", "%s", err) - } - - id, err := h.cluster.AddNodeX(node.Address, "", "") - if err != nil { - return api.Err(http.StatusBadRequest, "Failed to add node", "%s", err) - } - - return c.JSON(http.StatusOK, id) -} - -// DeleteNode deletes the node with the given ID -// @Summary Delete a node by its ID -// @Description Delete a node by its ID -// @ID cluster-3-delete-node -// @Produce json -// @Param id path string true "Node ID" -// @Success 200 {string} string -// @Failure 404 {object} api.Error -// @Failure 500 {object} api.Error -// @Security ApiKeyAuth -// @Router /api/v3/cluster/node/{id} [delete] -func (h *ClusterHandler) DeleteNode(c echo.Context) error { - id := util.PathParam(c, "id") - - if err := h.cluster.RemoveNodeX(id); err != nil { - if err == cluster.ErrNodeNotFound { - return api.Err(http.StatusNotFound, err.Error(), "%s", id) - } - - return api.Err(http.StatusInternalServerError, "Failed to remove node", "%s", err) - } - - return c.JSON(http.StatusOK, "OK") -} - -// GetNode returns the node with the given ID -// @Summary List a node by its ID -// @Description List a node by its ID -// @ID cluster-3-get-node -// @Produce json -// @Param id path string true "Node ID" -// @Success 200 {object} api.ClusterNode -// @Failure 404 {object} api.Error -// @Security ApiKeyAuth -// @Router /api/v3/cluster/node/{id} [get] -func (h *ClusterHandler) GetNode(c echo.Context) error { - id := util.PathParam(c, "id") - - peer, err := h.cluster.GetNodeX(id) - if err != nil { - return api.Err(http.StatusNotFound, "Node not found", "%s", err) - } - - state := peer.State() - - node := api.ClusterNode{ - Address: peer.Address(), - ID: state.ID, - LastUpdate: state.LastUpdate.Unix(), - State: state.State, - } - - return c.JSON(http.StatusOK, node) -} - -// GetNodeProxy returns the files from the node with the given ID -// @Summary List the files of a node by its ID -// @Description List the files of a node by its ID -// @ID cluster-3-get-node-proxy -// @Produce json -// @Param id path string true "Node ID" -// @Success 200 {object} api.ClusterNodeFiles -// @Failure 404 {object} api.Error -// @Security ApiKeyAuth -// @Router /api/v3/cluster/node/{id}/proxy [get] -func (h *ClusterHandler) GetNodeProxy(c echo.Context) error { - id := util.PathParam(c, "id") - - peer, err := h.cluster.GetNodeX(id) - if err != nil { - return api.Err(http.StatusNotFound, "Node not found", "%s", err) - } - - files := api.ClusterNodeFiles{} - - state := peer.State() - - sort.Strings(state.Files) - - for _, path := range state.Files { - prefix := strings.TrimSuffix(h.prefix.FindString(path), ":") - path = h.prefix.ReplaceAllString(path, "") - - files[prefix] = append(files[prefix], path) - } - - return c.JSON(http.StatusOK, files) -} - -// UpdateNode replaces an existing node -// @Summary Replaces an existing node -// @Description Replaces an existing node and returns the new node ID -// @ID cluster-3-update-node -// @Accept json -// @Produce json -// @Param id path string true "Node ID" -// @Param config body api.ClusterNodeConfig true "Node config" -// @Success 200 {string} string -// @Failure 400 {object} api.Error -// @Failure 404 {object} api.Error -// @Security ApiKeyAuth -// @Router /api/v3/cluster/node/{id} [put] -func (h *ClusterHandler) UpdateNode(c echo.Context) error { - id := util.PathParam(c, "id") - - node := api.ClusterNodeConfig{} - - if err := util.ShouldBindJSON(c, &node); err != nil { - return api.Err(http.StatusBadRequest, "Invalid JSON", "%s", err) - } - - if err := h.cluster.RemoveNode(id); err != nil { - if err == cluster.ErrNodeNotFound { - return api.Err(http.StatusNotFound, err.Error(), "%s", id) - } - return api.Err(http.StatusBadRequest, "Failed to remove node", "%s", err) - } - - id, err := h.cluster.AddNodeX(node.Address, "", "") - if err != nil { - return api.Err(http.StatusBadRequest, "Failed to add node", "%s", err) - } - - return c.JSON(http.StatusOK, id) -} -*/ diff --git a/http/server.go b/http/server.go index 99b8d988..ae9c5ae3 100644 --- a/http/server.go +++ b/http/server.go @@ -660,15 +660,17 @@ func (s *server) setRoutesV3(v3 *echo.Group) { // v3 Cluster if s.v3handler.cluster != nil { v3.GET("/cluster", s.v3handler.cluster.About) + v3.GET("/cluster/process", s.v3handler.cluster.ListProcesses) v3.GET("/cluster/node", s.v3handler.cluster.GetNodes) + v3.GET("/cluster/node/process", s.v3handler.cluster.ListNodeProcesses) v3.GET("/cluster/node/:id", s.v3handler.cluster.GetNode) v3.GET("/cluster/node/:id/files", s.v3handler.cluster.GetNodeFiles) - - v3.GET("/cluster/process", s.v3handler.cluster.ListProcesses) + v3.GET("/cluster/node/:id/version", s.v3handler.cluster.GetNodeVersion) if !s.readOnly { v3.POST("/cluster/process", s.v3handler.cluster.AddProcess) + v3.PUT("/cluster/process/:id", s.v3handler.cluster.UpdateProcess) v3.DELETE("/cluster/process/:id", s.v3handler.cluster.DeleteProcess) } }