diff --git a/cluster/api.go b/cluster/api.go index 873779c4..75a92475 100644 --- a/cluster/api.go +++ b/cluster/api.go @@ -17,6 +17,7 @@ import ( mwlog "github.com/datarhei/core/v16/http/middleware/log" "github.com/datarhei/core/v16/http/validator" "github.com/datarhei/core/v16/log" + "github.com/datarhei/core/v16/restream/app" "github.com/labstack/echo/v4" "github.com/labstack/echo/v4/middleware" @@ -52,6 +53,16 @@ type LeaveRequest struct { ID string `json:"id"` } +type AddProcessRequest struct { + Origin string `json:"origin"` + Config app.Config `json:"config"` +} + +type RemoveProcessRequest struct { + Origin string `json:"origin"` + ID string `json:"id"` +} + func NewAPI(config APIConfig) (API, error) { a := &api{ id: config.ID, @@ -150,11 +161,47 @@ func NewAPI(config APIConfig) (API, error) { }) a.router.POST("/v1/process", func(c echo.Context) error { - return httpapi.Err(http.StatusNotImplemented, "") + r := AddProcessRequest{} + + if err := util.ShouldBindJSON(c, &r); err != nil { + return httpapi.Err(http.StatusBadRequest, "Invalid JSON", "%s", err) + } + + a.logger.Debug().WithField("id", r.Config.ID).Log("got add process request") + + if r.Origin == a.id { + return httpapi.Err(http.StatusLoopDetected, "", "breaking circuit") + } + + err := a.cluster.AddProcess(r.Origin, &r.Config) + if err != nil { + a.logger.Debug().WithError(err).WithField("id", r.Config.ID).Log("unable to add process") + return httpapi.Err(http.StatusInternalServerError, "unable to add process", "%s", err) + } + + return c.JSON(http.StatusOK, "OK") }) - a.router.DELETE("/v1/process/:id", func(c echo.Context) error { - return httpapi.Err(http.StatusNotImplemented, "") + a.router.POST("/v1/process/:id", func(c echo.Context) error { + r := RemoveProcessRequest{} + + if err := util.ShouldBindJSON(c, &r); err != nil { + return httpapi.Err(http.StatusBadRequest, "Invalid JSON", "%s", err) + } + + a.logger.Debug().WithField("id", r.ID).Log("got remove process request") + + if r.Origin == a.id { + return httpapi.Err(http.StatusLoopDetected, "", "breaking circuit") + } + + err := a.cluster.RemoveProcess(r.Origin, r.ID) + if err != nil { + a.logger.Debug().WithError(err).WithField("id", r.ID).Log("unable to remove process") + return httpapi.Err(http.StatusInternalServerError, "unable to remove process", "%s", err) + } + + return c.JSON(http.StatusOK, "OK") }) a.router.GET("/v1/core", func(c echo.Context) error { @@ -216,6 +263,28 @@ func (c *APIClient) Leave(r LeaveRequest) error { return err } +func (c *APIClient) AddProcess(r AddProcessRequest) error { + data, err := json.Marshal(r) + if err != nil { + return err + } + + _, err = c.call(http.MethodPost, "/process", "application/json", bytes.NewReader(data)) + + return err +} + +func (c *APIClient) RemoveProcess(r RemoveProcessRequest) error { + data, err := json.Marshal(r) + if err != nil { + return err + } + + _, err = c.call(http.MethodPost, "/process/"+r.ID, "application/json", bytes.NewReader(data)) + + return err +} + func (c *APIClient) Snapshot() (io.ReadCloser, error) { return c.stream(http.MethodGet, "/snapshot", "", nil) } diff --git a/cluster/cluster.go b/cluster/cluster.go index 5e571bcf..2c23a5c3 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -72,6 +72,9 @@ type Cluster interface { ListNodes() []addNodeCommand GetNode(id string) (addNodeCommand, error) + AddProcess(origin string, config *app.Config) error + RemoveProcess(origin, id string) error + ProxyReader() ProxyReader } @@ -976,15 +979,30 @@ func (c *cluster) followerLoop(stopCh chan struct{}) { } } -func (c *cluster) AddProcess(config app.Config) error { +func (c *cluster) AddProcess(origin string, config *app.Config) error { if !c.IsRaftLeader() { - return c.forwarder.AddProcess() + return c.forwarder.AddProcess(origin, config) } cmd := &command{ - Operation: "addProcess", + Operation: opAddProcess, Data: &addProcessCommand{ - Config: nil, + Config: *config, + }, + } + + return c.applyCommand(cmd) +} + +func (c *cluster) RemoveProcess(origin, id string) error { + if !c.IsRaftLeader() { + return c.forwarder.RemoveProcess(origin, id) + } + + cmd := &command{ + Operation: opRemoveProcess, + Data: &removeProcessCommand{ + ID: id, }, } diff --git a/cluster/forwarder.go b/cluster/forwarder.go index a837cf90..45098c55 100644 --- a/cluster/forwarder.go +++ b/cluster/forwarder.go @@ -8,6 +8,7 @@ import ( "time" "github.com/datarhei/core/v16/log" + "github.com/datarhei/core/v16/restream/app" ) // Forwarder forwards any HTTP request from a follower to the leader @@ -17,9 +18,9 @@ type Forwarder interface { Join(origin, id, raftAddress, peerAddress string) error Leave(origin, id string) error Snapshot() (io.ReadCloser, error) - AddProcess() error + AddProcess(origin string, config *app.Config) error UpdateProcess() error - RemoveProcess() error + RemoveProcess(origin, id string) error } type forwarder struct { @@ -134,14 +135,40 @@ func (f *forwarder) Snapshot() (io.ReadCloser, error) { return client.Snapshot() } -func (f *forwarder) AddProcess() error { - return fmt.Errorf("not implemented") +func (f *forwarder) AddProcess(origin string, config *app.Config) error { + if origin == "" { + origin = f.id + } + + r := AddProcessRequest{ + Origin: origin, + Config: *config, + } + + f.lock.RLock() + client := f.client + f.lock.RUnlock() + + return client.AddProcess(r) } func (f *forwarder) UpdateProcess() error { return fmt.Errorf("not implemented") } -func (f *forwarder) RemoveProcess() error { - return fmt.Errorf("not implemented") +func (f *forwarder) RemoveProcess(origin, id string) error { + if origin == "" { + origin = f.id + } + + r := RemoveProcessRequest{ + Origin: origin, + ID: id, + } + + f.lock.RLock() + client := f.client + f.lock.RUnlock() + + return client.RemoveProcess(r) } diff --git a/cluster/node.go b/cluster/node.go index e519f9e5..130c504e 100644 --- a/cluster/node.go +++ b/cluster/node.go @@ -26,6 +26,12 @@ type Node interface { GetURL(path string) (string, error) GetFile(path string) (io.ReadCloser, error) + ProcessGetAll() ([]string, error) + ProcessSet() error + ProcessStart(id string) error + ProcessStop(id string) error + ProcessDelete(id string) error + NodeReader } @@ -463,3 +469,34 @@ func (n *node) GetFile(path string) (io.ReadCloser, error) { return nil, fmt.Errorf("unknown prefix") } + +func (n *node) ProcessGetAll() ([]string, error) { + processes, err := n.peer.ProcessList(nil, nil) + if err != nil { + return nil, err + } + + list := []string{} + + for _, p := range processes { + list = append(list, p.ID) + } + + return list, nil +} + +func (n *node) ProcessSet() error { + return nil +} + +func (n *node) ProcessStart(id string) error { + return n.peer.ProcessCommand(id, "start") +} + +func (n *node) ProcessStop(id string) error { + return n.peer.ProcessCommand(id, "stop") +} + +func (n *node) ProcessDelete(id string) error { + return n.peer.ProcessDelete(id) +} diff --git a/cluster/store.go b/cluster/store.go index 441f92c1..17cb43ae 100644 --- a/cluster/store.go +++ b/cluster/store.go @@ -6,6 +6,8 @@ import ( "io" "sync" + "github.com/datarhei/core/v16/restream/app" + "github.com/hashicorp/raft" ) @@ -14,6 +16,9 @@ type Store interface { ListNodes() []StoreNode GetNode(id string) (StoreNode, error) + + ListProcesses() []app.Config + GetProcess(id string) (app.Config, error) } type operation string @@ -45,18 +50,24 @@ type StoreNode struct { } type addProcessCommand struct { - Config []byte + app.Config +} + +type removeProcessCommand struct { + ID string } // Implement a FSM type store struct { - lock sync.RWMutex - Nodes map[string]string + lock sync.RWMutex + Nodes map[string]string + Process map[string]app.Config } func NewStore() (Store, error) { return &store{ - Nodes: map[string]string{}, + Nodes: map[string]string{}, + Process: map[string]app.Config{}, }, nil } @@ -95,10 +106,26 @@ func (s *store) Apply(log *raft.Log) interface{} { s.lock.Lock() delete(s.Nodes, cmd.ID) s.lock.Unlock() + case opAddProcess: + b, _ := json.Marshal(c.Data) + cmd := addProcessCommand{} + json.Unmarshal(b, &cmd) + + s.lock.Lock() + s.Process[cmd.ID] = cmd.Config + s.lock.Unlock() + case opRemoveProcess: + b, _ := json.Marshal(c.Data) + cmd := removeProcessCommand{} + json.Unmarshal(b, &cmd) + + s.lock.Lock() + delete(s.Process, cmd.ID) + s.lock.Unlock() } s.lock.RLock() - fmt.Printf("\n==> %+v\n\n", s.Nodes) + fmt.Printf("\n==> %+v\n\n", s.Process) s.lock.RUnlock() return nil } @@ -166,6 +193,31 @@ func (s *store) GetNode(id string) (StoreNode, error) { }, nil } +func (s *store) ListProcesses() []app.Config { + s.lock.RLock() + defer s.lock.RUnlock() + + processes := []app.Config{} + + for _, cfg := range s.Process { + processes = append(processes, *cfg.Clone()) + } + + return processes +} + +func (s *store) GetProcess(id string) (app.Config, error) { + s.lock.RLock() + defer s.lock.RUnlock() + + cfg, ok := s.Process[id] + if !ok { + return app.Config{}, fmt.Errorf("not found") + } + + return *cfg.Clone(), nil +} + type fsmSnapshot struct { data []byte } diff --git a/http/handler/api/cluster.go b/http/handler/api/cluster.go index d7ad8b1a..5b9beb1f 100644 --- a/http/handler/api/cluster.go +++ b/http/handler/api/cluster.go @@ -10,6 +10,7 @@ import ( "github.com/datarhei/core/v16/http/api" "github.com/datarhei/core/v16/http/handler/util" "github.com/labstack/echo/v4" + "github.com/lithammer/shortuuid/v4" ) // The ClusterHandler type provides handler functions for manipulating the cluster config. @@ -31,6 +32,7 @@ func NewCluster(cluster cluster.Cluster) *ClusterHandler { // GetProxyNodes returns the list of proxy nodes in the cluster // @Summary List of proxy nodes in the cluster // @Description List of proxy nodes in the cluster +// @Tags v16.?.? // @ID cluster-3-get-proxy-nodes // @Produce json // @Success 200 {array} api.ClusterNode @@ -62,6 +64,7 @@ func (h *ClusterHandler) GetProxyNodes(c echo.Context) error { // GetProxyNode returns the proxy node with the given ID // @Summary List a proxy node by its ID // @Description List a proxy node by its ID +// @Tags v16.?.? // @ID cluster-3-get-proxy-node // @Produce json // @Param id path string true "Node ID" @@ -92,6 +95,7 @@ func (h *ClusterHandler) GetProxyNode(c echo.Context) error { // GetProxyNodeFiles returns the files from the proxy node with the given ID // @Summary List the files of a proxy node by its ID // @Description List the files of a proxy node by its ID +// @Tags v16.?.? // @ID cluster-3-get-proxy-node-files // @Produce json // @Param id path string true "Node ID" @@ -126,6 +130,7 @@ func (h *ClusterHandler) GetProxyNodeFiles(c echo.Context) error { // GetCluster returns the list of nodes in the cluster // @Summary List of nodes in the cluster // @Description List of nodes in the cluster +// @Tags v16.?.? // @ID cluster-3-get-cluster // @Produce json // @Success 200 {object} api.ClusterAbout @@ -160,6 +165,67 @@ func (h *ClusterHandler) About(c echo.Context) error { return c.JSON(http.StatusOK, about) } +// Add adds a new process to the cluster +// @Summary Add a new process +// @Description Add a new FFmpeg process +// @Tags v16.?.? +// @ID cluster-3-add-process +// @Accept json +// @Produce json +// @Param config body api.ProcessConfig true "Process config" +// @Success 200 {object} api.ProcessConfig +// @Failure 400 {object} api.Error +// @Security ApiKeyAuth +// @Router /api/v3/cluster/process [post] +func (h *ClusterHandler) AddProcess(c echo.Context) error { + process := api.ProcessConfig{ + ID: shortuuid.New(), + Type: "ffmpeg", + Autostart: true, + } + + if err := util.ShouldBindJSON(c, &process); err != nil { + return api.Err(http.StatusBadRequest, "Invalid JSON", "%s", err) + } + + if process.Type != "ffmpeg" { + return api.Err(http.StatusBadRequest, "Unsupported process type", "Supported process types are: ffmpeg") + } + + if len(process.Input) == 0 || len(process.Output) == 0 { + return api.Err(http.StatusBadRequest, "At least one input and one output need to be defined") + } + + config := process.Marshal() + + if err := h.cluster.AddProcess("", config); err != nil { + return api.Err(http.StatusBadRequest, "Invalid process config", "%s", err.Error()) + } + + return c.JSON(http.StatusOK, process) +} + +// Delete deletes the process with the given ID from the cluster +// @Summary Delete a process by its ID +// @Description Delete a process by its ID +// @Tags v16.?.? +// @ID cluster-3-delete-process +// @Produce json +// @Param id path string true "Process ID" +// @Success 200 {string} string +// @Failure 404 {object} api.Error +// @Security ApiKeyAuth +// @Router /api/v3/cluster/process/{id} [delete] +func (h *ClusterHandler) DeleteProcess(c echo.Context) error { + id := util.PathParam(c, "id") + + if err := h.cluster.RemoveProcess("", id); err != nil { + return api.Err(http.StatusInternalServerError, "Process can't be deleted", "%s", err) + } + + return c.JSON(http.StatusOK, "OK") +} + /* // AddNode adds a new node // @Summary Add a new node diff --git a/http/server.go b/http/server.go index e4d9d54d..9b7e5583 100644 --- a/http/server.go +++ b/http/server.go @@ -664,13 +664,11 @@ func (s *server) setRoutesV3(v3 *echo.Group) { v3.GET("/cluster/proxy", s.v3handler.cluster.GetProxyNodes) v3.GET("/cluster/proxy/node/:id", s.v3handler.cluster.GetProxyNode) v3.GET("/cluster/proxy/node/:id/files", s.v3handler.cluster.GetProxyNodeFiles) - /* - if !s.readOnly { - v3.POST("/cluster/node", s.v3handler.cluster.AddNode) - v3.PUT("/cluster/node/:id", s.v3handler.cluster.UpdateNode) - v3.DELETE("/cluster/node/:id", s.v3handler.cluster.DeleteNode) - } - */ + + if !s.readOnly { + v3.POST("/cluster/process", s.v3handler.cluster.AddProcess) + v3.DELETE("/cluster/process/:id", s.v3handler.cluster.DeleteProcess) + } } // v3 Log