Fix update process, list process in raft store

This commit is contained in:
Ingo Oppermann
2023-05-13 21:21:47 +02:00
parent 29242b96ca
commit 6a4c715f75
10 changed files with 383 additions and 171 deletions

View File

@@ -62,10 +62,6 @@ func NewAPI(config APIConfig) (API, error) {
a.router.HideBanner = true
a.router.HidePort = true
mwlog.NewWithConfig(mwlog.Config{
Logger: a.logger,
})
a.router.Use(mwlog.NewWithConfig(mwlog.Config{
Logger: a.logger,
}))

View File

@@ -61,6 +61,8 @@ type Cluster interface {
Shutdown() error
ListProcesses() []store.Process
GetProcess(id string) (store.Process, error)
AddProcess(origin string, config *app.Config) error
RemoveProcess(origin, id string) error
UpdateProcess(origin, id string, config *app.Config) error
@@ -686,6 +688,14 @@ func (c *cluster) trackLeaderChanges() {
}
}
func (c *cluster) ListProcesses() []store.Process {
return c.store.ProcessList()
}
func (c *cluster) GetProcess(id string) (store.Process, error) {
return c.store.GetProcess(id)
}
func (c *cluster) AddProcess(origin string, config *app.Config) error {
if !c.IsRaftLeader() {
return c.forwarder.AddProcess(origin, config)

View File

@@ -32,6 +32,7 @@ type Node interface {
ProcessStart(id string) error
ProcessStop(id string) error
ProcessDelete(id string) error
ProcessUpdate(id string, config *app.Config) error
NodeReader
}
@@ -746,8 +747,15 @@ func (n *node) ProcessAdd(config *app.Config) error {
return fmt.Errorf("not connected")
}
cfg := convertConfig(config)
return n.peer.ProcessAdd(cfg)
}
func convertConfig(config *app.Config) clientapi.ProcessConfig {
cfg := clientapi.ProcessConfig{
ID: config.ID,
Type: "ffmpeg",
Reference: config.Reference,
Input: []clientapi.ProcessConfigIO{},
Output: []clientapi.ProcessConfigIO{},
@@ -791,7 +799,7 @@ func (n *node) ProcessAdd(config *app.Config) error {
cfg.Output = append(cfg.Output, output)
}
return n.peer.ProcessAdd(cfg)
return cfg
}
func (n *node) ProcessStart(id string) error {
@@ -826,3 +834,16 @@ func (n *node) ProcessDelete(id string) error {
return n.peer.ProcessDelete(id)
}
func (n *node) ProcessUpdate(id string, config *app.Config) error {
n.peerLock.RLock()
defer n.peerLock.RUnlock()
if n.peer == nil {
return fmt.Errorf("not connected")
}
cfg := convertConfig(config)
return n.peer.ProcessUpdate(id, cfg)
}

View File

@@ -531,10 +531,10 @@ func (p *proxy) ProcessUpdate(nodeid string, id string, config *app.Config) erro
p.lock.RLock()
defer p.lock.RUnlock()
_, ok := p.nodes[nodeid]
node, ok := p.nodes[nodeid]
if !ok {
return fmt.Errorf("node not found")
}
return fmt.Errorf("not implemented")
return node.ProcessUpdate(id, config)
}

View File

@@ -21,6 +21,7 @@ type Store interface {
}
type Process struct {
CreatedAt time.Time
UpdatedAt time.Time
Config *app.Config
}
@@ -103,8 +104,10 @@ func (s *store) Apply(entry *raft.Log) interface{} {
s.lock.Lock()
_, ok := s.Process[cmd.ID]
if !ok {
now := time.Now()
s.Process[cmd.ID] = Process{
UpdatedAt: time.Now(),
CreatedAt: now,
UpdatedAt: now,
Config: &cmd.Config,
}
}

View File

@@ -275,6 +275,35 @@ const docTemplate = `{
}
}
},
"/api/v3/cluster/node/process": {
"get": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "List of processes in the cluster",
"produces": [
"application/json"
],
"tags": [
"v16.?.?"
],
"summary": "List of processes in the cluster",
"operationId": "cluster-3-list-node-processes",
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "array",
"items": {
"$ref": "#/definitions/api.ClusterProcess"
}
}
}
}
}
},
"/api/v3/cluster/node/{id}": {
"get": {
"security": [
@@ -379,7 +408,7 @@ const docTemplate = `{
"schema": {
"type": "array",
"items": {
"$ref": "#/definitions/api.ClusterProcess"
"$ref": "#/definitions/api.Process"
}
}
}
@@ -431,6 +460,63 @@ const docTemplate = `{
}
},
"/api/v3/cluster/process/{id}": {
"put": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "Replace an existing process.",
"consumes": [
"application/json"
],
"produces": [
"application/json"
],
"tags": [
"v16.?.?"
],
"summary": "Replace an existing process",
"operationId": "cluster-3-update-process",
"parameters": [
{
"type": "string",
"description": "Process ID",
"name": "id",
"in": "path",
"required": true
},
{
"description": "Process config",
"name": "config",
"in": "body",
"required": true,
"schema": {
"$ref": "#/definitions/api.ProcessConfig"
}
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/api.ProcessConfig"
}
},
"400": {
"description": "Bad Request",
"schema": {
"$ref": "#/definitions/api.Error"
}
},
"404": {
"description": "Not Found",
"schema": {
"$ref": "#/definitions/api.Error"
}
}
}
},
"delete": {
"security": [
{

View File

@@ -268,6 +268,35 @@
}
}
},
"/api/v3/cluster/node/process": {
"get": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "List of processes in the cluster",
"produces": [
"application/json"
],
"tags": [
"v16.?.?"
],
"summary": "List of processes in the cluster",
"operationId": "cluster-3-list-node-processes",
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "array",
"items": {
"$ref": "#/definitions/api.ClusterProcess"
}
}
}
}
}
},
"/api/v3/cluster/node/{id}": {
"get": {
"security": [
@@ -372,7 +401,7 @@
"schema": {
"type": "array",
"items": {
"$ref": "#/definitions/api.ClusterProcess"
"$ref": "#/definitions/api.Process"
}
}
}
@@ -424,6 +453,63 @@
}
},
"/api/v3/cluster/process/{id}": {
"put": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "Replace an existing process.",
"consumes": [
"application/json"
],
"produces": [
"application/json"
],
"tags": [
"v16.?.?"
],
"summary": "Replace an existing process",
"operationId": "cluster-3-update-process",
"parameters": [
{
"type": "string",
"description": "Process ID",
"name": "id",
"in": "path",
"required": true
},
{
"description": "Process config",
"name": "config",
"in": "body",
"required": true,
"schema": {
"$ref": "#/definitions/api.ProcessConfig"
}
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/api.ProcessConfig"
}
},
"400": {
"description": "Bad Request",
"schema": {
"$ref": "#/definitions/api.Error"
}
},
"404": {
"description": "Not Found",
"schema": {
"$ref": "#/definitions/api.Error"
}
}
}
},
"delete": {
"security": [
{

View File

@@ -2302,6 +2302,24 @@ paths:
summary: List the files of a proxy node by its ID
tags:
- v16.?.?
/api/v3/cluster/node/process:
get:
description: List of processes in the cluster
operationId: cluster-3-list-node-processes
produces:
- application/json
responses:
"200":
description: OK
schema:
items:
$ref: '#/definitions/api.ClusterProcess'
type: array
security:
- ApiKeyAuth: []
summary: List of processes in the cluster
tags:
- v16.?.?
/api/v3/cluster/process:
get:
description: List of processes in the cluster
@@ -2313,7 +2331,7 @@ paths:
description: OK
schema:
items:
$ref: '#/definitions/api.ClusterProcess'
$ref: '#/definitions/api.Process'
type: array
security:
- ApiKeyAuth: []
@@ -2374,6 +2392,43 @@ paths:
summary: Delete a process by its ID
tags:
- v16.?.?
put:
consumes:
- application/json
description: Replace an existing process.
operationId: cluster-3-update-process
parameters:
- description: Process ID
in: path
name: id
required: true
type: string
- description: Process config
in: body
name: config
required: true
schema:
$ref: '#/definitions/api.ProcessConfig'
produces:
- application/json
responses:
"200":
description: OK
schema:
$ref: '#/definitions/api.ProcessConfig'
"400":
description: Bad Request
schema:
$ref: '#/definitions/api.Error'
"404":
description: Not Found
schema:
$ref: '#/definitions/api.Error'
security:
- ApiKeyAuth: []
summary: Replace an existing process
tags:
- v16.?.?
/api/v3/config:
get:
description: Retrieve the currently active Restreamer configuration

View File

@@ -11,6 +11,7 @@ import (
"github.com/datarhei/core/v16/encoding/json"
"github.com/datarhei/core/v16/http/api"
"github.com/datarhei/core/v16/http/handler/util"
"github.com/datarhei/core/v16/restream"
"github.com/labstack/echo/v4"
"github.com/lithammer/shortuuid/v4"
@@ -101,6 +102,30 @@ func (h *ClusterHandler) GetNode(c echo.Context) error {
return c.JSON(http.StatusOK, node)
}
// GetNodeVersion returns the proxy node version with the given ID
// @Summary List a proxy node by its ID
// @Description List a proxy node by its ID
// @Tags v16.?.?
// @ID cluster-3-get-node
// @Produce json
// @Param id path string true "Node ID"
// @Success 200 {object} api.ClusterNode
// @Failure 404 {object} api.Error
// @Security ApiKeyAuth
// @Router /api/v3/cluster/node/{id} [get]
func (h *ClusterHandler) GetNodeVersion(c echo.Context) error {
id := util.PathParam(c, "id")
peer, err := h.proxy.GetNode(id)
if err != nil {
return api.Err(http.StatusNotFound, "Node not found", "%s", err)
}
version := peer.Version()
return c.JSON(http.StatusOK, version)
}
// GetNodeFiles returns the files from the proxy node with the given ID
// @Summary List the files of a proxy node by its ID
// @Description List the files of a proxy node by its ID
@@ -180,16 +205,16 @@ func (h *ClusterHandler) About(c echo.Context) error {
return c.JSON(http.StatusOK, about)
}
// ListProcesses returns the list of processes in the cluster
// ListNodeProcesses returns the list of processes running on the nodes of the cluster
// @Summary List of processes in the cluster
// @Description List of processes in the cluster
// @Tags v16.?.?
// @ID cluster-3-list-processes
// @ID cluster-3-list-node-processes
// @Produce json
// @Success 200 {array} api.ClusterProcess
// @Security ApiKeyAuth
// @Router /api/v3/cluster/process [get]
func (h *ClusterHandler) ListProcesses(c echo.Context) error {
// @Router /api/v3/cluster/node/process [get]
func (h *ClusterHandler) ListNodeProcesses(c echo.Context) error {
procs := h.proxy.ListProcesses()
processes := []api.ClusterProcess{}
@@ -210,6 +235,40 @@ func (h *ClusterHandler) ListProcesses(c echo.Context) error {
return c.JSON(http.StatusOK, processes)
}
// ListStoreProcesses returns the list of processes stored in the DB of the cluster
// @Summary List of processes in the cluster
// @Description List of processes in the cluster
// @Tags v16.?.?
// @ID cluster-3-list-processes
// @Produce json
// @Success 200 {array} api.Process
// @Security ApiKeyAuth
// @Router /api/v3/cluster/process [get]
func (h *ClusterHandler) ListProcesses(c echo.Context) error {
procs := h.cluster.ListProcesses()
processes := []api.Process{}
for _, p := range procs {
process := api.Process{
ID: p.Config.ID,
Type: "ffmpeg",
Reference: p.Config.Reference,
CreatedAt: 0,
UpdatedAt: p.UpdatedAt.Unix(),
}
config := &api.ProcessConfig{}
config.Unmarshal(p.Config)
process.Config = config
processes = append(processes, process)
}
return c.JSON(http.StatusOK, processes)
}
// Add adds a new process to the cluster
// @Summary Add a new process
// @Description Add a new FFmpeg process
@@ -250,6 +309,54 @@ func (h *ClusterHandler) AddProcess(c echo.Context) error {
return c.JSON(http.StatusOK, process)
}
// Update replaces an existing process
// @Summary Replace an existing process
// @Description Replace an existing process.
// @Tags v16.?.?
// @ID cluster-3-update-process
// @Accept json
// @Produce json
// @Param id path string true "Process ID"
// @Param config body api.ProcessConfig true "Process config"
// @Success 200 {object} api.ProcessConfig
// @Failure 400 {object} api.Error
// @Failure 404 {object} api.Error
// @Security ApiKeyAuth
// @Router /api/v3/cluster/process/{id} [put]
func (h *ClusterHandler) UpdateProcess(c echo.Context) error {
id := util.PathParam(c, "id")
process := api.ProcessConfig{
ID: id,
Type: "ffmpeg",
Autostart: true,
}
current, err := h.cluster.GetProcess(id)
if err != nil {
return api.Err(http.StatusNotFound, "Process not found", "%s", id)
}
// Prefill the config with the current values
process.Unmarshal(current.Config)
if err := util.ShouldBindJSON(c, &process); err != nil {
return api.Err(http.StatusBadRequest, "Invalid JSON", "%s", err)
}
config := process.Marshal()
if err := h.cluster.UpdateProcess("", id, config); err != nil {
if err == restream.ErrUnknownProcess {
return api.Err(http.StatusNotFound, "Process not found", "%s", id)
}
return api.Err(http.StatusBadRequest, "Process can't be updated", "%s", err)
}
return c.JSON(http.StatusOK, process)
}
// Delete deletes the process with the given ID from the cluster
// @Summary Delete a process by its ID
// @Description Delete a process by its ID
@@ -270,157 +377,3 @@ func (h *ClusterHandler) DeleteProcess(c echo.Context) error {
return c.JSON(http.StatusOK, "OK")
}
/*
// AddNode adds a new node
// @Summary Add a new node
// @Description Add a new node to the cluster
// @ID cluster-3-add-node
// @Accept json
// @Produce json
// @Param config body api.ClusterNodeConfig true "Node config"
// @Success 200 {string} string
// @Failure 400 {object} api.Error
// @Security ApiKeyAuth
// @Router /api/v3/cluster/node [post]
func (h *ClusterHandler) AddNode(c echo.Context) error {
node := api.ClusterNodeConfig{}
if err := util.ShouldBindJSON(c, &node); err != nil {
return api.Err(http.StatusBadRequest, "Invalid JSON", "%s", err)
}
id, err := h.cluster.AddNodeX(node.Address, "", "")
if err != nil {
return api.Err(http.StatusBadRequest, "Failed to add node", "%s", err)
}
return c.JSON(http.StatusOK, id)
}
// DeleteNode deletes the node with the given ID
// @Summary Delete a node by its ID
// @Description Delete a node by its ID
// @ID cluster-3-delete-node
// @Produce json
// @Param id path string true "Node ID"
// @Success 200 {string} string
// @Failure 404 {object} api.Error
// @Failure 500 {object} api.Error
// @Security ApiKeyAuth
// @Router /api/v3/cluster/node/{id} [delete]
func (h *ClusterHandler) DeleteNode(c echo.Context) error {
id := util.PathParam(c, "id")
if err := h.cluster.RemoveNodeX(id); err != nil {
if err == cluster.ErrNodeNotFound {
return api.Err(http.StatusNotFound, err.Error(), "%s", id)
}
return api.Err(http.StatusInternalServerError, "Failed to remove node", "%s", err)
}
return c.JSON(http.StatusOK, "OK")
}
// GetNode returns the node with the given ID
// @Summary List a node by its ID
// @Description List a node by its ID
// @ID cluster-3-get-node
// @Produce json
// @Param id path string true "Node ID"
// @Success 200 {object} api.ClusterNode
// @Failure 404 {object} api.Error
// @Security ApiKeyAuth
// @Router /api/v3/cluster/node/{id} [get]
func (h *ClusterHandler) GetNode(c echo.Context) error {
id := util.PathParam(c, "id")
peer, err := h.cluster.GetNodeX(id)
if err != nil {
return api.Err(http.StatusNotFound, "Node not found", "%s", err)
}
state := peer.State()
node := api.ClusterNode{
Address: peer.Address(),
ID: state.ID,
LastUpdate: state.LastUpdate.Unix(),
State: state.State,
}
return c.JSON(http.StatusOK, node)
}
// GetNodeProxy returns the files from the node with the given ID
// @Summary List the files of a node by its ID
// @Description List the files of a node by its ID
// @ID cluster-3-get-node-proxy
// @Produce json
// @Param id path string true "Node ID"
// @Success 200 {object} api.ClusterNodeFiles
// @Failure 404 {object} api.Error
// @Security ApiKeyAuth
// @Router /api/v3/cluster/node/{id}/proxy [get]
func (h *ClusterHandler) GetNodeProxy(c echo.Context) error {
id := util.PathParam(c, "id")
peer, err := h.cluster.GetNodeX(id)
if err != nil {
return api.Err(http.StatusNotFound, "Node not found", "%s", err)
}
files := api.ClusterNodeFiles{}
state := peer.State()
sort.Strings(state.Files)
for _, path := range state.Files {
prefix := strings.TrimSuffix(h.prefix.FindString(path), ":")
path = h.prefix.ReplaceAllString(path, "")
files[prefix] = append(files[prefix], path)
}
return c.JSON(http.StatusOK, files)
}
// UpdateNode replaces an existing node
// @Summary Replaces an existing node
// @Description Replaces an existing node and returns the new node ID
// @ID cluster-3-update-node
// @Accept json
// @Produce json
// @Param id path string true "Node ID"
// @Param config body api.ClusterNodeConfig true "Node config"
// @Success 200 {string} string
// @Failure 400 {object} api.Error
// @Failure 404 {object} api.Error
// @Security ApiKeyAuth
// @Router /api/v3/cluster/node/{id} [put]
func (h *ClusterHandler) UpdateNode(c echo.Context) error {
id := util.PathParam(c, "id")
node := api.ClusterNodeConfig{}
if err := util.ShouldBindJSON(c, &node); err != nil {
return api.Err(http.StatusBadRequest, "Invalid JSON", "%s", err)
}
if err := h.cluster.RemoveNode(id); err != nil {
if err == cluster.ErrNodeNotFound {
return api.Err(http.StatusNotFound, err.Error(), "%s", id)
}
return api.Err(http.StatusBadRequest, "Failed to remove node", "%s", err)
}
id, err := h.cluster.AddNodeX(node.Address, "", "")
if err != nil {
return api.Err(http.StatusBadRequest, "Failed to add node", "%s", err)
}
return c.JSON(http.StatusOK, id)
}
*/

View File

@@ -660,15 +660,17 @@ func (s *server) setRoutesV3(v3 *echo.Group) {
// v3 Cluster
if s.v3handler.cluster != nil {
v3.GET("/cluster", s.v3handler.cluster.About)
v3.GET("/cluster/process", s.v3handler.cluster.ListProcesses)
v3.GET("/cluster/node", s.v3handler.cluster.GetNodes)
v3.GET("/cluster/node/process", s.v3handler.cluster.ListNodeProcesses)
v3.GET("/cluster/node/:id", s.v3handler.cluster.GetNode)
v3.GET("/cluster/node/:id/files", s.v3handler.cluster.GetNodeFiles)
v3.GET("/cluster/process", s.v3handler.cluster.ListProcesses)
v3.GET("/cluster/node/:id/version", s.v3handler.cluster.GetNodeVersion)
if !s.readOnly {
v3.POST("/cluster/process", s.v3handler.cluster.AddProcess)
v3.PUT("/cluster/process/:id", s.v3handler.cluster.UpdateProcess)
v3.DELETE("/cluster/process/:id", s.v3handler.cluster.DeleteProcess)
}
}