Add call chain to add a new user

This commit is contained in:
Ingo Oppermann
2023-05-26 12:25:05 +02:00
parent cb22c2bb2e
commit 56e03308c2
9 changed files with 130 additions and 5 deletions

View File

@@ -217,6 +217,28 @@ func NewAPI(config APIConfig) (API, error) {
return c.JSON(http.StatusOK, "OK")
})
a.router.POST("/v1/iam/user", func(c echo.Context) error {
r := client.AddIdentityRequest{}
if err := util.ShouldBindJSON(c, &r); err != nil {
return httpapi.Err(http.StatusBadRequest, "Invalid JSON", "%s", err)
}
if r.Origin == a.id {
return httpapi.Err(http.StatusLoopDetected, "", "breaking circuit")
}
a.logger.Debug().WithField("identity", r.Identity).Log("Add identity request")
err := a.cluster.AddIdentity(r.Origin, r.Identity)
if err != nil {
a.logger.Debug().WithError(err).WithField("identity", r.Identity).Log("Unable to add identity")
return httpapi.Err(http.StatusInternalServerError, "unable to add identity", "%s", err)
}
return c.JSON(http.StatusOK, "OK")
})
a.router.GET("/v1/core", func(c echo.Context) error {
address, _ := a.cluster.CoreAPIAddress("")
return c.JSON(http.StatusOK, address)

View File

@@ -39,6 +39,11 @@ type UpdateProcessRequest struct {
Config app.Config `json:"config"`
}
type AddIdentityRequest struct {
Origin string `json:"origin"`
Identity any `json:"identity"`
}
type APIClient struct {
Address string
Client *http.Client
@@ -114,6 +119,17 @@ func (c *APIClient) UpdateProcess(r UpdateProcessRequest) error {
return err
}
func (c *APIClient) AddIdentity(r AddIdentityRequest) error {
data, err := json.Marshal(r)
if err != nil {
return err
}
_, err = c.call(http.MethodPost, "/iam/user", "application/json", bytes.NewReader(data))
return err
}
func (c *APIClient) Snapshot() (io.ReadCloser, error) {
return c.stream(http.MethodGet, "/snapshot", "", nil)
}

View File

@@ -67,6 +67,8 @@ type Cluster interface {
RemoveProcess(origin, id string) error
UpdateProcess(origin, id string, config *app.Config) error
AddIdentity(origin string, identity any) error
ProxyReader() proxy.ProxyReader
}
@@ -742,6 +744,21 @@ func (c *cluster) UpdateProcess(origin, id string, config *app.Config) error {
return c.applyCommand(cmd)
}
func (c *cluster) AddIdentity(origin string, identity any) error {
if !c.IsRaftLeader() {
return c.forwarder.AddIdentity(origin, identity)
}
cmd := &store.Command{
Operation: store.OpAddIdentity,
Data: &store.CommandAddIdentity{
Identity: identity,
},
}
return c.applyCommand(cmd)
}
func (c *cluster) applyCommand(cmd *store.Command) error {
b, err := json.Marshal(cmd)
if err != nil {

View File

@@ -15,12 +15,16 @@ import (
type Forwarder interface {
SetLeader(address string)
HasLeader() bool
Join(origin, id, raftAddress, peerAddress string) error
Leave(origin, id string) error
Snapshot() (io.ReadCloser, error)
AddProcess(origin string, config *app.Config) error
UpdateProcess(origin, id string, config *app.Config) error
RemoveProcess(origin, id string) error
AddIdentity(origin string, identity any) error
}
type forwarder struct {
@@ -186,3 +190,20 @@ func (f *forwarder) RemoveProcess(origin, id string) error {
return client.RemoveProcess(r)
}
func (f *forwarder) AddIdentity(origin string, identity any) error {
if origin == "" {
origin = f.id
}
r := apiclient.AddIdentityRequest{
Origin: origin,
Identity: identity,
}
f.lock.RLock()
client := f.client
f.lock.RUnlock()
return client.AddIdentity(r)
}

View File

@@ -7,6 +7,8 @@ import (
"sync"
"time"
"github.com/datarhei/core/v16/iam/access"
"github.com/datarhei/core/v16/iam/identity"
"github.com/datarhei/core/v16/log"
"github.com/datarhei/core/v16/restream/app"
@@ -26,12 +28,23 @@ type Process struct {
Config *app.Config
}
type Users struct {
UpdatedAt time.Time
Users []identity.User
}
type Policies struct {
UpdatedAt time.Time
Policies []access.Policy
}
type operation string
const (
OpAddProcess operation = "addProcess"
OpRemoveProcess operation = "removeProcess"
OpUpdateProcess operation = "updateProcess"
OpAddIdentity operation = "addIdentity"
)
type Command struct {
@@ -52,6 +65,10 @@ type CommandRemoveProcess struct {
ID string
}
type CommandAddIdentity struct {
Identity any
}
// Implement a FSM
type store struct {
lock sync.RWMutex

View File

@@ -45,7 +45,8 @@ type ProcessConfigLimits struct {
// ProcessConfig represents the configuration of an ffmpeg process
type ProcessConfig struct {
ID string `json:"id"`
Group string `json:"group"`
Owner string `json:"owner"`
Domain string `json:"domain"`
Type string `json:"type" validate:"oneof='ffmpeg' ''" jsonschema:"enum=ffmpeg,enum="`
Reference string `json:"reference"`
Input []ProcessConfigIO `json:"input" validate:"required"`
@@ -62,7 +63,8 @@ type ProcessConfig struct {
func (cfg *ProcessConfig) Marshal() *app.Config {
p := &app.Config{
ID: cfg.ID,
Domain: cfg.Group,
Owner: cfg.Owner,
Domain: cfg.Domain,
Reference: cfg.Reference,
Options: cfg.Options,
Reconnect: cfg.Reconnect,
@@ -142,7 +144,8 @@ func (cfg *ProcessConfig) Unmarshal(c *app.Config) {
}
cfg.ID = c.ID
cfg.Group = c.Domain
cfg.Owner = c.Owner
cfg.Domain = c.Domain
cfg.Reference = c.Reference
cfg.Type = "ffmpeg"
cfg.Reconnect = c.Reconnect

View File

@@ -377,3 +377,23 @@ func (h *ClusterHandler) DeleteProcess(c echo.Context) error {
return c.JSON(http.StatusOK, "OK")
}
func (h *ClusterHandler) AddIdentity(c echo.Context) error {
process := api.ProcessConfig{
ID: shortuuid.New(),
Type: "ffmpeg",
Autostart: true,
}
if err := util.ShouldBindJSON(c, &process); err != nil {
return api.Err(http.StatusBadRequest, "Invalid JSON", "%s", err)
}
config := process.Marshal()
if err := h.cluster.AddIdentity("", config); err != nil {
return api.Err(http.StatusBadRequest, "Invalid process config", "%s", err.Error())
}
return c.JSON(http.StatusOK, process)
}

View File

@@ -44,6 +44,7 @@ func (h *RestreamHandler) Add(c echo.Context) error {
process := api.ProcessConfig{
ID: shortuuid.New(),
Owner: user,
Type: "ffmpeg",
Autostart: true,
}
@@ -52,7 +53,7 @@ func (h *RestreamHandler) Add(c echo.Context) error {
return api.Err(http.StatusBadRequest, "Invalid JSON", "%s", err)
}
if !h.iam.Enforce(user, process.Group, "process:"+process.ID, "write") {
if !h.iam.Enforce(process.Owner, process.Domain, "process:"+process.ID, "write") {
return api.Err(http.StatusForbidden, "Forbidden")
}
@@ -65,7 +66,6 @@ func (h *RestreamHandler) Add(c echo.Context) error {
}
config := process.Marshal()
config.Owner = user
if err := h.restream.AddProcess(config); err != nil {
return api.Err(http.StatusBadRequest, "Invalid process config", "%s", err.Error())

View File

@@ -56,6 +56,15 @@ func (u *User) validate() error {
return fmt.Errorf("the name can only contain [%s]", chars)
}
if len(u.Auth.API.Auth0.User) != 0 {
t, err := newAuth0Tenant(u.Auth.API.Auth0.Tenant)
if err != nil {
return fmt.Errorf("auth0: %w", err)
}
t.Cancel()
}
return nil
}