diff --git a/cluster/api.go b/cluster/api.go index 49ab5199..aa2570dc 100644 --- a/cluster/api.go +++ b/cluster/api.go @@ -217,6 +217,28 @@ func NewAPI(config APIConfig) (API, error) { return c.JSON(http.StatusOK, "OK") }) + a.router.POST("/v1/iam/user", func(c echo.Context) error { + r := client.AddIdentityRequest{} + + if err := util.ShouldBindJSON(c, &r); err != nil { + return httpapi.Err(http.StatusBadRequest, "Invalid JSON", "%s", err) + } + + if r.Origin == a.id { + return httpapi.Err(http.StatusLoopDetected, "", "breaking circuit") + } + + a.logger.Debug().WithField("identity", r.Identity).Log("Add identity request") + + err := a.cluster.AddIdentity(r.Origin, r.Identity) + if err != nil { + a.logger.Debug().WithError(err).WithField("identity", r.Identity).Log("Unable to add identity") + return httpapi.Err(http.StatusInternalServerError, "unable to add identity", "%s", err) + } + + return c.JSON(http.StatusOK, "OK") + }) + a.router.GET("/v1/core", func(c echo.Context) error { address, _ := a.cluster.CoreAPIAddress("") return c.JSON(http.StatusOK, address) diff --git a/cluster/client/client.go b/cluster/client/client.go index 06575778..254f3c1d 100644 --- a/cluster/client/client.go +++ b/cluster/client/client.go @@ -39,6 +39,11 @@ type UpdateProcessRequest struct { Config app.Config `json:"config"` } +type AddIdentityRequest struct { + Origin string `json:"origin"` + Identity any `json:"identity"` +} + type APIClient struct { Address string Client *http.Client @@ -114,6 +119,17 @@ func (c *APIClient) UpdateProcess(r UpdateProcessRequest) error { return err } +func (c *APIClient) AddIdentity(r AddIdentityRequest) error { + data, err := json.Marshal(r) + if err != nil { + return err + } + + _, err = c.call(http.MethodPost, "/iam/user", "application/json", bytes.NewReader(data)) + + return err +} + func (c *APIClient) Snapshot() (io.ReadCloser, error) { return c.stream(http.MethodGet, "/snapshot", "", nil) } diff --git a/cluster/cluster.go b/cluster/cluster.go index 0bb5d067..dd020bd8 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -67,6 +67,8 @@ type Cluster interface { RemoveProcess(origin, id string) error UpdateProcess(origin, id string, config *app.Config) error + AddIdentity(origin string, identity any) error + ProxyReader() proxy.ProxyReader } @@ -742,6 +744,21 @@ func (c *cluster) UpdateProcess(origin, id string, config *app.Config) error { return c.applyCommand(cmd) } +func (c *cluster) AddIdentity(origin string, identity any) error { + if !c.IsRaftLeader() { + return c.forwarder.AddIdentity(origin, identity) + } + + cmd := &store.Command{ + Operation: store.OpAddIdentity, + Data: &store.CommandAddIdentity{ + Identity: identity, + }, + } + + return c.applyCommand(cmd) +} + func (c *cluster) applyCommand(cmd *store.Command) error { b, err := json.Marshal(cmd) if err != nil { diff --git a/cluster/forwarder/forwarder.go b/cluster/forwarder/forwarder.go index a128d30d..40b25887 100644 --- a/cluster/forwarder/forwarder.go +++ b/cluster/forwarder/forwarder.go @@ -15,12 +15,16 @@ import ( type Forwarder interface { SetLeader(address string) HasLeader() bool + Join(origin, id, raftAddress, peerAddress string) error Leave(origin, id string) error Snapshot() (io.ReadCloser, error) + AddProcess(origin string, config *app.Config) error UpdateProcess(origin, id string, config *app.Config) error RemoveProcess(origin, id string) error + + AddIdentity(origin string, identity any) error } type forwarder struct { @@ -186,3 +190,20 @@ func (f *forwarder) RemoveProcess(origin, id string) error { return client.RemoveProcess(r) } + +func (f *forwarder) AddIdentity(origin string, identity any) error { + if origin == "" { + origin = f.id + } + + r := apiclient.AddIdentityRequest{ + Origin: origin, + Identity: identity, + } + + f.lock.RLock() + client := f.client + f.lock.RUnlock() + + return client.AddIdentity(r) +} diff --git a/cluster/store/store.go b/cluster/store/store.go index 528ce99f..6c713277 100644 --- a/cluster/store/store.go +++ b/cluster/store/store.go @@ -7,6 +7,8 @@ import ( "sync" "time" + "github.com/datarhei/core/v16/iam/access" + "github.com/datarhei/core/v16/iam/identity" "github.com/datarhei/core/v16/log" "github.com/datarhei/core/v16/restream/app" @@ -26,12 +28,23 @@ type Process struct { Config *app.Config } +type Users struct { + UpdatedAt time.Time + Users []identity.User +} + +type Policies struct { + UpdatedAt time.Time + Policies []access.Policy +} + type operation string const ( OpAddProcess operation = "addProcess" OpRemoveProcess operation = "removeProcess" OpUpdateProcess operation = "updateProcess" + OpAddIdentity operation = "addIdentity" ) type Command struct { @@ -52,6 +65,10 @@ type CommandRemoveProcess struct { ID string } +type CommandAddIdentity struct { + Identity any +} + // Implement a FSM type store struct { lock sync.RWMutex diff --git a/http/api/process.go b/http/api/process.go index 4d7bcd2d..00de1750 100644 --- a/http/api/process.go +++ b/http/api/process.go @@ -45,7 +45,8 @@ type ProcessConfigLimits struct { // ProcessConfig represents the configuration of an ffmpeg process type ProcessConfig struct { ID string `json:"id"` - Group string `json:"group"` + Owner string `json:"owner"` + Domain string `json:"domain"` Type string `json:"type" validate:"oneof='ffmpeg' ''" jsonschema:"enum=ffmpeg,enum="` Reference string `json:"reference"` Input []ProcessConfigIO `json:"input" validate:"required"` @@ -62,7 +63,8 @@ type ProcessConfig struct { func (cfg *ProcessConfig) Marshal() *app.Config { p := &app.Config{ ID: cfg.ID, - Domain: cfg.Group, + Owner: cfg.Owner, + Domain: cfg.Domain, Reference: cfg.Reference, Options: cfg.Options, Reconnect: cfg.Reconnect, @@ -142,7 +144,8 @@ func (cfg *ProcessConfig) Unmarshal(c *app.Config) { } cfg.ID = c.ID - cfg.Group = c.Domain + cfg.Owner = c.Owner + cfg.Domain = c.Domain cfg.Reference = c.Reference cfg.Type = "ffmpeg" cfg.Reconnect = c.Reconnect diff --git a/http/handler/api/cluster.go b/http/handler/api/cluster.go index d47dd240..9490639d 100644 --- a/http/handler/api/cluster.go +++ b/http/handler/api/cluster.go @@ -377,3 +377,23 @@ func (h *ClusterHandler) DeleteProcess(c echo.Context) error { return c.JSON(http.StatusOK, "OK") } + +func (h *ClusterHandler) AddIdentity(c echo.Context) error { + process := api.ProcessConfig{ + ID: shortuuid.New(), + Type: "ffmpeg", + Autostart: true, + } + + if err := util.ShouldBindJSON(c, &process); err != nil { + return api.Err(http.StatusBadRequest, "Invalid JSON", "%s", err) + } + + config := process.Marshal() + + if err := h.cluster.AddIdentity("", config); err != nil { + return api.Err(http.StatusBadRequest, "Invalid process config", "%s", err.Error()) + } + + return c.JSON(http.StatusOK, process) +} diff --git a/http/handler/api/restream.go b/http/handler/api/restream.go index dbe9674e..26b7f942 100644 --- a/http/handler/api/restream.go +++ b/http/handler/api/restream.go @@ -44,6 +44,7 @@ func (h *RestreamHandler) Add(c echo.Context) error { process := api.ProcessConfig{ ID: shortuuid.New(), + Owner: user, Type: "ffmpeg", Autostart: true, } @@ -52,7 +53,7 @@ func (h *RestreamHandler) Add(c echo.Context) error { return api.Err(http.StatusBadRequest, "Invalid JSON", "%s", err) } - if !h.iam.Enforce(user, process.Group, "process:"+process.ID, "write") { + if !h.iam.Enforce(process.Owner, process.Domain, "process:"+process.ID, "write") { return api.Err(http.StatusForbidden, "Forbidden") } @@ -65,7 +66,6 @@ func (h *RestreamHandler) Add(c echo.Context) error { } config := process.Marshal() - config.Owner = user if err := h.restream.AddProcess(config); err != nil { return api.Err(http.StatusBadRequest, "Invalid process config", "%s", err.Error()) diff --git a/iam/identity/identity.go b/iam/identity/identity.go index a25778ee..fcf8750c 100644 --- a/iam/identity/identity.go +++ b/iam/identity/identity.go @@ -56,6 +56,15 @@ func (u *User) validate() error { return fmt.Errorf("the name can only contain [%s]", chars) } + if len(u.Auth.API.Auth0.User) != 0 { + t, err := newAuth0Tenant(u.Auth.API.Auth0.Tenant) + if err != nil { + return fmt.Errorf("auth0: %w", err) + } + + t.Cancel() + } + return nil }